GEPAState

gepa.core.state.GEPAState(seed_candidate: dict[str, str], base_evaluation: ValsetEvaluation[RolloutOutput, DataId], track_best_outputs: bool = False, frontier_type: FrontierType = 'instance', evaluation_cache: EvaluationCache[RolloutOutput, DataId] | None = None)

Bases: Generic[RolloutOutput, DataId]

Persistent optimizer state tracking candidates, sparse validation coverage, and objective frontiers.

Source code in gepa/core/state.py
def __init__(
    self,
    seed_candidate: dict[str, str],
    base_evaluation: ValsetEvaluation[RolloutOutput, DataId],
    track_best_outputs: bool = False,
    frontier_type: FrontierType = "instance",
    evaluation_cache: "EvaluationCache[RolloutOutput, DataId] | None" = None,
):
    self.program_candidates = [dict(seed_candidate)]
    self.prog_candidate_val_subscores = [dict(base_evaluation.scores_by_val_id)]

    base_objective_aggregates = self._aggregate_objective_scores(base_evaluation.objective_scores_by_val_id)
    self.prog_candidate_objective_scores = [base_objective_aggregates]

    self.parent_program_for_candidate = [[None]]

    self.frontier_type: FrontierType = frontier_type
    self.pareto_front_valset = {val_id: score for val_id, score in base_evaluation.scores_by_val_id.items()}
    self.program_at_pareto_front_valset = {val_id: {0} for val_id in base_evaluation.scores_by_val_id.keys()}
    self.objective_pareto_front = dict(base_objective_aggregates)
    self.program_at_pareto_front_objectives = {objective: {0} for objective in base_objective_aggregates.keys()}

    # Validate that objective scores are provided for frontier types that require them
    if frontier_type in ("objective", "hybrid", "cartesian"):
        if not base_evaluation.objective_scores_by_val_id:
            raise ValueError(
                f"frontier_type='{frontier_type}' requires objective_scores to be provided by the evaluator, "
                f"but none were found. Use an evaluator that returns objective_scores or use frontier_type='instance'."
            )

    # Cartesian frontier will be base_evaluation.objective_scores_by_val_id
    if frontier_type == "cartesian":
        assert base_evaluation.objective_scores_by_val_id is not None  # Already validated above
        self.pareto_front_cartesian = {
            (val_id, objective): objective_score
            for val_id, objective_scores in base_evaluation.objective_scores_by_val_id.items()
            for objective, objective_score in objective_scores.items()
        }
        self.program_at_pareto_front_cartesian = {
            (val_id, objective): {0}
            for val_id, objective_scores in base_evaluation.objective_scores_by_val_id.items()
            for objective in objective_scores.keys()
        }
    else:
        self.pareto_front_cartesian = {}
        self.program_at_pareto_front_cartesian = {}

    self.list_of_named_predictors = list(seed_candidate.keys())
    self.named_predictor_id_to_update_next_for_program_candidate = [0]
    self.i = -1

    self.num_metric_calls_by_discovery = [0]

    if track_best_outputs:
        self.best_outputs_valset = {
            val_id: [(0, output)] for val_id, output in base_evaluation.outputs_by_val_id.items()
        }
    else:
        self.best_outputs_valset = None

    self.full_program_trace = []
    self.validation_schema_version = self._VALIDATION_SCHEMA_VERSION
    self.evaluation_cache = evaluation_cache
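
A minimal sketch of seeding the state. The import location and constructor of ValsetEvaluation are assumptions; only the attribute names (scores_by_val_id, outputs_by_val_id, objective_scores_by_val_id) are taken from the source above.

from gepa.core.state import GEPAState, ValsetEvaluation  # ValsetEvaluation location is an assumption

seed = {"summarizer": "Summarize the input in two sentences."}

# Hypothetical base evaluation over two validation examples; the real
# ValsetEvaluation constructor may take different arguments.
base_eval = ValsetEvaluation(
    scores_by_val_id={"ex-1": 0.4, "ex-2": 0.7},
    outputs_by_val_id={"ex-1": "summary 1", "ex-2": "summary 2"},
    objective_scores_by_val_id={
        "ex-1": {"accuracy": 0.4, "brevity": 0.9},
        "ex-2": {"accuracy": 0.7, "brevity": 0.8},
    },
)

state = GEPAState(
    seed_candidate=seed,
    base_evaluation=base_eval,
    track_best_outputs=True,
    frontier_type="instance",  # "objective", "hybrid", and "cartesian" require objective scores
)
print(state.program_candidates)              # [{'summarizer': 'Summarize the input in two sentences.'}]
print(state.pareto_front_valset)             # {'ex-1': 0.4, 'ex-2': 0.7}
print(state.program_at_pareto_front_valset)  # {'ex-1': {0}, 'ex-2': {0}}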

Attributes

pareto_front_cartesian: dict[tuple[DataId, str], float] instance-attribute

program_at_pareto_front_cartesian: dict[tuple[DataId, str], set[ProgramIdx]] instance-attribute

num_full_ds_evals: int instance-attribute

total_num_evals: int instance-attribute

best_outputs_valset: dict[DataId, list[tuple[ProgramIdx, RolloutOutput]]] | None instance-attribute

program_candidates: list[dict[str, str]] = [dict(seed_candidate)] instance-attribute

prog_candidate_val_subscores: list[dict[DataId, float]] = [dict(base_evaluation.scores_by_val_id)] instance-attribute

prog_candidate_objective_scores: list[ObjectiveScores] = [base_objective_aggregates] instance-attribute

parent_program_for_candidate: list[list[ProgramIdx | None]] = [[None]] instance-attribute

frontier_type: FrontierType = frontier_type instance-attribute

pareto_front_valset: dict[DataId, float] = {val_id: score for val_id, score in (base_evaluation.scores_by_val_id.items())} instance-attribute

program_at_pareto_front_valset: dict[DataId, set[ProgramIdx]] = {val_id: {0} for val_id in (base_evaluation.scores_by_val_id.keys())} instance-attribute

objective_pareto_front: ObjectiveScores = dict(base_objective_aggregates) instance-attribute

program_at_pareto_front_objectives: dict[str, set[ProgramIdx]] = {objective: {0} for objective in (base_objective_aggregates.keys())} instance-attribute

list_of_named_predictors: list[str] = list(seed_candidate.keys()) instance-attribute

named_predictor_id_to_update_next_for_program_candidate: list[int] = [0] instance-attribute

i: int = -1 instance-attribute

num_metric_calls_by_discovery: list[int] = [0] instance-attribute

full_program_trace: list[dict[str, Any]] = [] instance-attribute

validation_schema_version: int = self._VALIDATION_SCHEMA_VERSION instance-attribute

evaluation_cache: EvaluationCache[RolloutOutput, DataId] | None = evaluation_cache instance-attribute

valset_evaluations: dict[DataId, list[ProgramIdx]] property

Mapping from validation example id to the program indices that have been evaluated on it. Keys include only validation ids that have been scored at least once.

program_full_scores_val_set: list[float] property

per_program_tracked_scores: list[float] property

Functions

is_consistent() -> bool

Source code in gepa/core/state.py
def is_consistent(self) -> bool:
    assert len(self.program_candidates) == len(self.parent_program_for_candidate)
    assert len(self.program_candidates) == len(self.named_predictor_id_to_update_next_for_program_candidate)
    assert len(self.program_candidates) == len(self.prog_candidate_val_subscores)
    assert len(self.program_candidates) == len(self.prog_candidate_objective_scores)
    assert len(self.program_candidates) == len(self.num_metric_calls_by_discovery)

    assert len(self.pareto_front_valset) == len(self.program_at_pareto_front_valset)
    assert set(self.pareto_front_valset.keys()) == set(self.program_at_pareto_front_valset.keys())
    assert set(self.objective_pareto_front.keys()) == set(self.program_at_pareto_front_objectives.keys())

    for front in self.program_at_pareto_front_valset.values():
        for prog_idx in front:
            assert prog_idx < len(self.program_candidates), (
                "Program index in valset pareto front exceeds number of program candidates"
            )

    return True

add_budget_hook(hook: Callable[[int, int], None]) -> None

Register a callback to be called whenever total_num_evals changes.

Parameters:

hook (Callable[[int, int], None], required): A callable that receives (new_total, delta) when evals are incremented.

Source code in gepa/core/state.py
def add_budget_hook(self, hook: Callable[[int, int], None]) -> None:
    """Register a callback to be called whenever total_num_evals changes.

    Args:
        hook: A callable that receives (new_total, delta) when evals are incremented.
    """
    if not hasattr(self, "_budget_hooks"):
        self._budget_hooks: list[Callable[[int, int], None]] = []
    self._budget_hooks.append(hook)

increment_evals(count: int) -> None

Increment total_num_evals and notify all registered hooks.

Parameters:

count (int, required): Number of evaluations to add.

Source code in gepa/core/state.py
def increment_evals(self, count: int) -> None:
    """Increment total_num_evals and notify all registered hooks.

    Args:
        count: Number of evaluations to add.
    """
    self.total_num_evals += count
    # Lazy init handles states loaded from disk (which won't have _budget_hooks)
    hooks = getattr(self, "_budget_hooks", None)
    if hooks:
        for hook in hooks:
            hook(self.total_num_evals, count)
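
A short usage sketch of the budget-hook mechanism. total_num_evals is normally initialized by the optimizer; it is set explicitly here only so the sketch runs standalone.

def log_budget(new_total: int, delta: int) -> None:
    # Called every time increment_evals() bumps the counter.
    print(f"+{delta} evals, {new_total} total")

state.add_budget_hook(log_budget)
state.total_num_evals = 0     # assumption: usually set by the optimizer, not by user code

state.increment_evals(25)     # prints "+25 evals, 25 total"
state.increment_evals(10)     # prints "+10 evals, 35 total"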

save(run_dir: str | None, *, use_cloudpickle: bool = False) -> None

Source code in gepa/core/state.py
def save(self, run_dir: str | None, *, use_cloudpickle: bool = False) -> None:
    if run_dir is None:
        return
    with open(os.path.join(run_dir, "gepa_state.bin"), "wb") as f:
        if use_cloudpickle:
            import cloudpickle as pickle  # type: ignore[import-not-found]
        else:
            import pickle
        # Exclude runtime-only attributes that can't be serialized (e.g., callback hooks)
        serialized = {k: v for k, v in self.__dict__.items() if k not in self._EXCLUDED_FROM_SERIALIZATION}
        serialized["validation_schema_version"] = GEPAState._VALIDATION_SCHEMA_VERSION
        pickle.dump(serialized, f)

load(run_dir: str) -> GEPAState[RolloutOutput, DataId] staticmethod

Source code in gepa/core/state.py
@staticmethod
def load(run_dir: str) -> "GEPAState[RolloutOutput, DataId]":
    with open(os.path.join(run_dir, "gepa_state.bin"), "rb") as f:
        import pickle

        data = pickle.load(f)

    # handle schema migration
    version = data.get("validation_schema_version")
    if version is None or version < 2:
        GEPAState._migrate_from_legacy_state_v0(data)
        version = data.get("validation_schema_version")
    if version is None or version < GEPAState._VALIDATION_SCHEMA_VERSION:
        GEPAState._upgrade_state_dict(data)

    state = GEPAState.__new__(GEPAState)
    state.__dict__.update(data)

    state.validation_schema_version = GEPAState._VALIDATION_SCHEMA_VERSION
    assert len(state.program_candidates) == len(state.prog_candidate_val_subscores)
    assert len(state.program_candidates) == len(state.prog_candidate_objective_scores)
    assert len(state.program_candidates) == len(state.num_metric_calls_by_discovery)
    assert len(state.program_candidates) == len(state.parent_program_for_candidate)
    assert len(state.program_candidates) == len(state.named_predictor_id_to_update_next_for_program_candidate)
    assert len(state.pareto_front_valset) == len(state.program_at_pareto_front_valset)
    assert set(state.pareto_front_valset.keys()) == set(state.program_at_pareto_front_valset.keys())
    assert set(state.objective_pareto_front.keys()) == set(state.program_at_pareto_front_objectives.keys())
    return state
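
A round-trip sketch: save writes gepa_state.bin into run_dir (and is a no-op when run_dir is None), while the load staticmethod restores the state and migrates older schema versions.

import os
import tempfile

run_dir = tempfile.mkdtemp()

state.save(run_dir)                # writes <run_dir>/gepa_state.bin
state.save(None)                   # no-op: nothing is written

restored = GEPAState.load(run_dir)
assert restored.is_consistent()
assert os.path.exists(os.path.join(run_dir, "gepa_state.bin"))

# Requires the optional cloudpickle dependency for objects plain pickle can't handle.
state.save(run_dir, use_cloudpickle=True)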

get_program_average_val_subset(program_idx: int) -> tuple[float, int]

Source code in gepa/core/state.py
def get_program_average_val_subset(self, program_idx: int) -> tuple[float, int]:
    # TODO: This should be only used/handled by the val_evaluation_policy, and never used directly.
    scores = self.prog_candidate_val_subscores[program_idx]
    if not scores:
        return float("-inf"), 0
    num_samples = len(scores)
    avg = sum(scores.values()) / num_samples
    return avg, num_samples

update_state_with_new_program(parent_program_idx: list[ProgramIdx], new_program: dict[str, str], valset_evaluation: ValsetEvaluation, run_dir: str | None, num_metric_calls_by_discovery_of_new_program: int) -> ProgramIdx

Source code in gepa/core/state.py
def update_state_with_new_program(
    self,
    parent_program_idx: list[ProgramIdx],
    new_program: dict[str, str],
    valset_evaluation: ValsetEvaluation,
    run_dir: str | None,
    num_metric_calls_by_discovery_of_new_program: int,
) -> ProgramIdx:
    new_program_idx = len(self.program_candidates)
    self.program_candidates.append(dict(new_program))
    self.num_metric_calls_by_discovery.append(num_metric_calls_by_discovery_of_new_program)

    max_predictor_id = max(
        [self.named_predictor_id_to_update_next_for_program_candidate[p] for p in parent_program_idx],
        default=0,
    )
    self.named_predictor_id_to_update_next_for_program_candidate.append(max_predictor_id)
    self.parent_program_for_candidate.append(list(parent_program_idx))

    valset_scores = dict(valset_evaluation.scores_by_val_id)
    self.prog_candidate_val_subscores.append(valset_scores)
    objective_scores = self._aggregate_objective_scores(valset_evaluation.objective_scores_by_val_id)
    self.prog_candidate_objective_scores.append(objective_scores)

    for val_id, score in valset_scores.items():
        output = valset_evaluation.outputs_by_val_id.get(val_id) if valset_evaluation.outputs_by_val_id else None
        self._update_pareto_front_for_val_id(
            val_id,
            score,
            new_program_idx,
            output,
            run_dir,
            self.i + 1,
        )

    self._update_objective_pareto_front(objective_scores, new_program_idx)

    if self.frontier_type in ("objective", "hybrid", "cartesian"):
        if not valset_evaluation.objective_scores_by_val_id:
            raise ValueError(
                f"frontier_type='{self.frontier_type}' requires objective_scores to be provided by the evaluator, "
                f"but none were found in the evaluation result."
            )

    if self.frontier_type == "cartesian":
        assert valset_evaluation.objective_scores_by_val_id is not None  # Validated above
        for val_id, objective_scores in valset_evaluation.objective_scores_by_val_id.items():
            for objective, objective_score in objective_scores.items():
                self._update_pareto_front_for_cartesian(
                    val_id,
                    objective,
                    objective_score,
                    new_program_idx,
                )

    return new_program_idx
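
A sketch of registering a mutated candidate. new_eval is assumed to be a ValsetEvaluation for the new program, built the same way as base_eval above; a sparse subset of validation ids is acceptable.

new_program = {"summarizer": "Summarize the input in one crisp sentence."}

new_idx = state.update_state_with_new_program(
    parent_program_idx=[0],      # the seed candidate is the parent
    new_program=new_program,
    valset_evaluation=new_eval,  # assumed ValsetEvaluation for the new program
    run_dir=None,
    num_metric_calls_by_discovery_of_new_program=50,
)
print(new_idx)                                        # 1
print(state.parent_program_for_candidate[new_idx])    # [0]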

get_pareto_front_mapping() -> dict[FrontierKey, set[ProgramIdx]]

Return frontier key to best-program-indices mapping based on configured frontier_type.

Source code in gepa/core/state.py
def get_pareto_front_mapping(self) -> dict[FrontierKey, set[ProgramIdx]]:
    """Return frontier key to best-program-indices mapping based on configured frontier_type."""
    return self._get_pareto_front_mapping(self.frontier_type)
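
The keys of the returned mapping depend on frontier_type: validation ids for "instance", objective names for "objective", and (val_id, objective) tuples for "cartesian", mirroring the frontier attributes above. A small inspection sketch:

front = state.get_pareto_front_mapping()
for key, program_indices in front.items():
    # key is a DataId, an objective name, or a (DataId, objective) tuple,
    # depending on state.frontier_type; the value is the set of best program indices.
    print(key, sorted(program_indices))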

cached_evaluate(candidate: dict[str, str], example_ids: list[DataId], fetcher: Callable[[list[DataId]], Any], evaluator: Callable[[Any, dict[str, str]], tuple[Any, list[float], Sequence[ObjectiveScores] | None]]) -> tuple[list[float], int]

Evaluate with optional caching. Returns (scores, num_actual_evals).

Source code in gepa/core/state.py
def cached_evaluate(
    self,
    candidate: dict[str, str],
    example_ids: list[DataId],
    fetcher: Callable[[list[DataId]], Any],
    evaluator: Callable[[Any, dict[str, str]], tuple[Any, list[float], Sequence[ObjectiveScores] | None]],
) -> tuple[list[float], int]:
    """Evaluate with optional caching. Returns (scores, num_actual_evals)."""
    _, scores_by_id, _, num_actual_evals = self.cached_evaluate_full(candidate, example_ids, fetcher, evaluator)
    return [scores_by_id[eid] for eid in example_ids], num_actual_evals

cached_evaluate_full(candidate: dict[str, str], example_ids: list[DataId], fetcher: Callable[[list[DataId]], Any], evaluator: Callable[[Any, dict[str, str]], tuple[Any, list[float], Sequence[ObjectiveScores] | None]]) -> tuple[dict[DataId, RolloutOutput], dict[DataId, float], dict[DataId, ObjectiveScores] | None, int]

Evaluate with optional caching, returning full results.

Source code in gepa/core/state.py
def cached_evaluate_full(
    self,
    candidate: dict[str, str],
    example_ids: list[DataId],
    fetcher: Callable[[list[DataId]], Any],
    evaluator: Callable[[Any, dict[str, str]], tuple[Any, list[float], Sequence[ObjectiveScores] | None]],
) -> tuple[dict[DataId, RolloutOutput], dict[DataId, float], dict[DataId, ObjectiveScores] | None, int]:
    """Evaluate with optional caching, returning full results."""
    if self.evaluation_cache is not None:
        return self.evaluation_cache.evaluate_with_cache_full(candidate, example_ids, fetcher, evaluator)
    batch = fetcher(example_ids)
    outputs, scores, objective_scores = evaluator(batch, candidate)
    outputs_by_id = dict(zip(example_ids, outputs, strict=False))
    scores_by_id = dict(zip(example_ids, scores, strict=False))
    objective_by_id = dict(zip(example_ids, objective_scores, strict=False)) if objective_scores else None
    return outputs_by_id, scores_by_id, objective_by_id, len(example_ids)
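
A sketch of the fetcher/evaluator contract shared by cached_evaluate and cached_evaluate_full: fetcher maps example ids to a batch, and evaluator returns (outputs, scores, objective_scores) for that batch. The dataset and metric below are placeholders.

dataset = {"ex-1": {"text": "first example"}, "ex-2": {"text": "second example"}}

def fetcher(ids):
    # Resolve validation ids to concrete examples.
    return [dataset[i] for i in ids]

def evaluator(batch, candidate):
    outputs = [f"{candidate['summarizer']} -> {ex['text']}" for ex in batch]
    scores = [0.5 for _ in batch]                         # placeholder metric
    objective_scores = [{"accuracy": 0.5} for _ in batch]
    return outputs, scores, objective_scores

scores, num_actual_evals = state.cached_evaluate(
    candidate=state.program_candidates[0],
    example_ids=["ex-1", "ex-2"],
    fetcher=fetcher,
    evaluator=evaluator,
)
# With no evaluation_cache configured, every id is evaluated: num_actual_evals == 2.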