MergeProposer

gepa.proposer.merge.MergeProposer(logger: LoggerProtocol, valset: DataLoader[DataId, DataInst], evaluator: Callable[[list[DataInst], dict[str, str]], tuple[list[RolloutOutput], list[float], Sequence[ObjectiveScores] | None]], use_merge: bool, max_merge_invocations: int, val_overlap_floor: int = 5, rng: random.Random | None = None, callbacks: list[GEPACallback] | None = None)

Bases: ProposeNewCandidate[DataId]

Implements the merge flow that combines compatible descendants of a common ancestor.

  • Find merge candidates among Pareto front dominators
  • Attempt a merge via sample_and_attempt_merge_programs_by_common_predictors
  • Evaluate the merged candidate on a subsample of validation indices drawn from both parents' coverage
  • Return a proposal if the merge's subsample score is >= max(parents); the engine handles the full evaluation and adds the accepted candidate to the state.
Source code in gepa/proposer/merge.py
def __init__(
    self,
    logger: LoggerProtocol,
    valset: DataLoader[DataId, DataInst],
    evaluator: Callable[
        [list[DataInst], dict[str, str]],
        tuple[list[RolloutOutput], list[float], Sequence[ObjectiveScores] | None],
    ],
    use_merge: bool,
    max_merge_invocations: int,
    val_overlap_floor: int = 5,
    rng: random.Random | None = None,
    callbacks: list[GEPACallback] | None = None,
):
    self.logger = logger
    self.valset = valset
    self.evaluator = evaluator
    self.use_merge = use_merge
    self.max_merge_invocations = max_merge_invocations
    self.rng = rng if rng is not None else random.Random(0)
    self.callbacks = callbacks

    if val_overlap_floor <= 0:
        raise ValueError("val_overlap_floor should be a positive integer")
    self.val_overlap_floor = val_overlap_floor
    # Internal counters matching original behavior
    self.merges_due = 0
    self.total_merges_tested = 0
    self.merges_performed: tuple[list[AncestorLog], list[MergeDescription]] = ([], [])

    # Toggle controlled by engine: set True when last iter found new program
    self.last_iter_found_new_program = False
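
A minimal construction sketch (illustrative only): my_logger, my_valset, and my_evaluator are hypothetical placeholders for your own LoggerProtocol, DataLoader, and evaluation callable; in normal use the engine wires this proposer up itself.

import random

from gepa.proposer.merge import MergeProposer

# Hypothetical wiring; my_logger, my_valset, and my_evaluator are placeholders.
proposer = MergeProposer(
    logger=my_logger,        # any LoggerProtocol implementation
    valset=my_valset,        # DataLoader[DataId, DataInst] over the validation set
    evaluator=my_evaluator,  # (instances, candidate) -> (outputs, scores, objective_scores or None)
    use_merge=True,
    max_merge_invocations=5,
    val_overlap_floor=5,
    rng=random.Random(0),
)

# The engine sets this flag after an iteration that produced a new program,
# then schedules merge attempts before calling propose():
proposer.last_iter_found_new_program = True
proposer.schedule_if_needed()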

Attributes

logger = logger instance-attribute

valset = valset instance-attribute

evaluator = evaluator instance-attribute

use_merge = use_merge instance-attribute

max_merge_invocations = max_merge_invocations instance-attribute

rng = rng if rng is not None else random.Random(0) instance-attribute

callbacks = callbacks instance-attribute

val_overlap_floor = val_overlap_floor instance-attribute

merges_due = 0 instance-attribute

total_merges_tested = 0 instance-attribute

merges_performed: tuple[list[AncestorLog], list[MergeDescription]] = ([], []) instance-attribute

last_iter_found_new_program = False instance-attribute

Functions

schedule_if_needed() -> None

Source code in gepa/proposer/merge.py
def schedule_if_needed(self) -> None:
    if self.use_merge and self.total_merges_tested < self.max_merge_invocations:
        self.merges_due += 1

select_eval_subsample_for_merged_program(scores1: dict[DataId, float], scores2: dict[DataId, float], num_subsample_ids: int = 5) -> list[DataId]

Source code in gepa/proposer/merge.py
def select_eval_subsample_for_merged_program(
    self,
    scores1: dict[DataId, float],
    scores2: dict[DataId, float],
    num_subsample_ids: int = 5,
) -> list[DataId]:
    common_ids = list(set(scores1.keys()) & set(scores2.keys()))

    p1 = [idx for idx in common_ids if scores1[idx] > scores2[idx]]
    p2 = [idx for idx in common_ids if scores2[idx] > scores1[idx]]
    p3 = [idx for idx in common_ids if idx not in p1 and idx not in p2]

    n_each = max(1, math.ceil(num_subsample_ids / 3))
    selected: list[DataId] = []
    for bucket in (p1, p2, p3):
        if len(selected) >= num_subsample_ids:
            break
        available = [idx for idx in bucket if idx not in selected]
        take = min(len(available), n_each, num_subsample_ids - len(selected))
        if take > 0:
            selected += self.rng.sample(available, k=take)

    remaining = num_subsample_ids - len(selected)
    if remaining > 0:
        unused = [idx for idx in common_ids if idx not in selected]
        if len(unused) >= remaining:
            selected += self.rng.sample(unused, k=remaining)
        elif common_ids:
            selected += self.rng.choices(common_ids, k=remaining)

    return selected[:num_subsample_ids]
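
A small worked example of the bucketing, using made-up per-id scores and the hypothetical proposer from the construction sketch above:

scores1 = {"a": 1.0, "b": 0.0, "c": 0.5, "d": 1.0, "e": 0.0}  # parent 1 val subscores (made up)
scores2 = {"a": 0.0, "b": 1.0, "c": 0.5, "d": 0.0, "e": 1.0}  # parent 2 val subscores (made up)

# Buckets over the common ids: parent-1 wins {"a", "d"}, parent-2 wins {"b", "e"}, ties {"c"}.
# With num_subsample_ids=5 the method samples ceil(5/3) = 2 ids per bucket (capped by
# bucket size), then tops up from the remaining common ids if still short.
subsample_ids = proposer.select_eval_subsample_for_merged_program(scores1, scores2)
assert set(subsample_ids) <= set(scores1) & set(scores2)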

propose(state: GEPAState[RolloutOutput, DataId]) -> CandidateProposal[DataId] | None

Source code in gepa/proposer/merge.py
def propose(self, state: GEPAState[RolloutOutput, DataId]) -> CandidateProposal[DataId] | None:
    i = state.i + 1
    state.full_program_trace[-1]["invoked_merge"] = True

    # Only attempt when scheduled by engine and after a new program in last iteration
    if not (self.use_merge and self.last_iter_found_new_program and self.merges_due > 0):
        self.logger.log(f"Iteration {i}: No merge candidates scheduled")
        return None

    pareto_front_programs = state.get_pareto_front_mapping()

    tracked_scores: Sequence[float] = getattr(
        state, "per_program_tracked_scores", state.program_full_scores_val_set
    )
    merge_candidates = find_dominator_programs(pareto_front_programs, list(tracked_scores))

    def has_val_support_overlap(id1: ProgramIdx, id2: ProgramIdx) -> bool:
        common_ids = set(state.prog_candidate_val_subscores[id1].keys()) & set(
            state.prog_candidate_val_subscores[id2].keys()
        )
        return len(common_ids) >= self.val_overlap_floor

    merge_output = sample_and_attempt_merge_programs_by_common_predictors(
        agg_scores=list(tracked_scores),
        rng=self.rng,
        merge_candidates=merge_candidates,
        merges_performed=self.merges_performed,
        program_candidates=state.program_candidates,
        parent_program_for_candidate=state.parent_program_for_candidate,
        has_val_support_overlap=has_val_support_overlap,
    )

    if merge_output is None:
        self.logger.log(f"Iteration {i}: No merge candidates found")
        return None

    new_program, id1, id2, ancestor = merge_output
    state.full_program_trace[-1]["merged"] = True
    state.full_program_trace[-1]["merged_entities"] = (id1, id2, ancestor)
    self.merges_performed[0].append((id1, id2, ancestor))
    self.logger.log(f"Iteration {i}: Merged programs {id1} and {id2} via ancestor {ancestor}")

    subsample_ids = self.select_eval_subsample_for_merged_program(
        state.prog_candidate_val_subscores[id1],
        state.prog_candidate_val_subscores[id2],
    )
    if not subsample_ids:
        self.logger.log(
            f"Iteration {i}: Skipping merge of {id1} and {id2} due to insufficient overlapping val coverage"
        )
        return None

    assert set(subsample_ids).issubset(state.prog_candidate_val_subscores[id1].keys())
    assert set(subsample_ids).issubset(state.prog_candidate_val_subscores[id2].keys())
    id1_sub_scores = [state.prog_candidate_val_subscores[id1][k] for k in subsample_ids]
    id2_sub_scores = [state.prog_candidate_val_subscores[id2][k] for k in subsample_ids]
    state.full_program_trace[-1]["subsample_ids"] = subsample_ids

    mini_devset = self.valset.fetch(subsample_ids)

    # Notify evaluation start for merged candidate
    notify_callbacks(
        self.callbacks,
        "on_evaluation_start",
        EvaluationStartEvent(
            iteration=i,
            candidate_idx=None,
            batch_size=len(mini_devset),
            capture_traces=False,
            parent_ids=[id1, id2],
            inputs=mini_devset,
            is_seed_candidate=False,
        ),
    )

    outputs_by_id, scores_by_id, objective_by_id, actual_evals_count = state.cached_evaluate_full(
        new_program, subsample_ids, self.valset.fetch, self.evaluator
    )
    new_sub_scores = [scores_by_id[eid] for eid in subsample_ids]
    outputs = [outputs_by_id[eid] for eid in subsample_ids]

    notify_callbacks(
        self.callbacks,
        "on_evaluation_end",
        EvaluationEndEvent(
            iteration=i,
            candidate_idx=None,
            scores=new_sub_scores,
            has_trajectories=False,
            parent_ids=[id1, id2],
            outputs=outputs,
            trajectories=None,
            objective_scores=[objective_by_id[eid] for eid in subsample_ids] if objective_by_id else None,
            is_seed_candidate=False,
        ),
    )

    state.full_program_trace[-1]["id1_subsample_scores"] = id1_sub_scores
    state.full_program_trace[-1]["id2_subsample_scores"] = id2_sub_scores
    state.full_program_trace[-1]["new_program_subsample_scores"] = new_sub_scores

    # Count evals via hook mechanism
    state.increment_evals(actual_evals_count)

    # Acceptance will be evaluated by engine (>= max(parents))
    return CandidateProposal(
        candidate=new_program,
        parent_program_ids=[id1, id2],
        subsample_indices=subsample_ids,
        subsample_scores_before=[sum(id1_sub_scores), sum(id2_sub_scores)],
        subsample_scores_after=new_sub_scores,
        tag="merge",
        metadata={"ancestor": ancestor},
    )
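
The engine, not the proposer, decides acceptance. A hedged sketch of the check implied by the comment above (the actual engine logic lives elsewhere in gepa):

# Hypothetical acceptance rule mirroring the ">= max(parents)" note above:
# the merged candidate's total subsample score must match or beat the better parent.
def accepts(proposal: CandidateProposal) -> bool:
    return sum(proposal.subsample_scores_after) >= max(proposal.subsample_scores_before)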