
DspyAdapter

gepa.adapters.dspy_adapter.dspy_adapter.DspyAdapter(student_module, metric_fn: Callable, feedback_map: dict[str, Callable], failure_score=0.0, num_threads: int | None = None, add_format_failure_as_feedback: bool = False, rng: random.Random | None = None, reflection_lm=None, custom_instruction_proposer: ProposalFn | None = None, warn_on_score_mismatch: bool = True, enable_tool_optimization: bool = False, reflection_minibatch_size: int | None = None)

Bases: GEPAAdapter[Example, TraceData, Prediction]

Source code in gepa/adapters/dspy_adapter/dspy_adapter.py
def __init__(
    self,
    student_module,
    metric_fn: Callable,
    feedback_map: dict[str, Callable],
    failure_score=0.0,
    num_threads: int | None = None,
    add_format_failure_as_feedback: bool = False,
    rng: random.Random | None = None,
    reflection_lm=None,
    custom_instruction_proposer: "ProposalFn | None" = None,
    warn_on_score_mismatch: bool = True,
    enable_tool_optimization: bool = False,
    reflection_minibatch_size: int | None = None,
):
    self.student = student_module
    self.metric_fn = metric_fn
    self.feedback_map = feedback_map
    self.failure_score = failure_score
    self.num_threads = num_threads
    self.add_format_failure_as_feedback = add_format_failure_as_feedback
    self.rng = rng or random.Random(0)
    self.reflection_lm = reflection_lm
    self.custom_instruction_proposer = custom_instruction_proposer
    self.warn_on_score_mismatch = warn_on_score_mismatch
    self.enable_tool_optimization = enable_tool_optimization
    self.reflection_minibatch_size = reflection_minibatch_size
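
The example below is a minimal, illustrative sketch of wiring the adapter to a DSPy program. The program, metric, feedback function, reflection model, and the "predict" key are assumptions for illustration, not part of the adapter's API; feedback_map keys must match the names returned by student_module.named_predictors(), and feedback functions receive the keyword arguments shown in make_reflective_dataset below.

import random

import dspy

from gepa.adapters.dspy_adapter.dspy_adapter import DspyAdapter

# Illustrative student program: a single chain-of-thought QA predictor.
program = dspy.ChainOfThought("question -> answer")

def my_metric(example, prediction, trace=None):
    # Conventional DSPy metric: 1.0 if the predicted answer matches the gold answer.
    return float(example.answer.strip().lower() == prediction.answer.strip().lower())

def qa_feedback(predictor_output, predictor_inputs, module_inputs, module_outputs, captured_trace):
    # Feedback functions return a dict with "score" and "feedback"
    # (or an object exposing those attributes).
    correct = module_inputs.answer.strip().lower() == module_outputs.answer.strip().lower()
    feedback = "Correct." if correct else f"Incorrect. The expected answer is: {module_inputs.answer}"
    return {"score": float(correct), "feedback": feedback}

adapter = DspyAdapter(
    student_module=program,
    metric_fn=my_metric,
    feedback_map={"predict": qa_feedback},  # keyed by predictor names from named_predictors()
    reflection_lm=dspy.LM("openai/gpt-4o-mini"),  # illustrative model choice
    rng=random.Random(0),
)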

Attributes

student = student_module instance-attribute

metric_fn = metric_fn instance-attribute

feedback_map = feedback_map instance-attribute

failure_score = failure_score instance-attribute

num_threads = num_threads instance-attribute

add_format_failure_as_feedback = add_format_failure_as_feedback instance-attribute

rng = rng or random.Random(0) instance-attribute

reflection_lm = reflection_lm instance-attribute

custom_instruction_proposer = custom_instruction_proposer instance-attribute

warn_on_score_mismatch = warn_on_score_mismatch instance-attribute

enable_tool_optimization = enable_tool_optimization instance-attribute

reflection_minibatch_size = reflection_minibatch_size instance-attribute

Functions

propose_new_texts(candidate: dict[str, str], reflective_dataset: dict[str, list[dict[str, Any]]], components_to_update: list[str]) -> dict[str, str]

Source code in gepa/adapters/dspy_adapter/dspy_adapter.py
def propose_new_texts(
    self,
    candidate: dict[str, str],
    reflective_dataset: dict[str, list[dict[str, Any]]],
    components_to_update: list[str],
) -> dict[str, str]:
    reflection_lm = self.reflection_lm or dspy.settings.lm
    # If custom proposer provided, override everything with custom proposer
    if self.custom_instruction_proposer:
        with dspy.context(lm=reflection_lm):
            return self.custom_instruction_proposer(
                candidate=candidate,
                reflective_dataset=reflective_dataset,
                components_to_update=components_to_update,
            )

    # Otherwise, route to appropriate proposers
    # Separate into two categories: tool-using modules (ReAct) vs regular instructions
    # TODO: Add generic tool module support when DSPy trace lineage is improved
    tool_components = []
    instruction_components = []

    for c in components_to_update:
        if c.startswith(TOOL_MODULE_PREFIX):
            tool_components.append(c)
        else:
            instruction_components.append(c)

    results: dict[str, str] = {}

    with dspy.context(lm=reflection_lm):
        # Handle regular instruction components
        if instruction_components:
            for name in instruction_components:
                base_instruction = candidate[name]
                dataset_with_feedback = reflective_dataset[name]
                results[name] = InstructionProposalSignature.run(
                    lm=(lambda x: self.stripped_lm_call(x)[0]),
                    input_dict={
                        "current_instruction_doc": base_instruction,
                        "dataset_with_feedback": dataset_with_feedback,
                    },
                )["new_instruction"]

        # Handle ReAct modules
        if tool_components:
            from gepa.adapters.dspy_adapter.instruction_proposal import ToolProposer

            tool_proposer = ToolProposer()
            results.update(
                tool_proposer(
                    candidate=candidate,
                    reflective_dataset=reflective_dataset,
                    components_to_update=tool_components,
                )
            )

    return results
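
A minimal sketch of calling propose_new_texts directly, continuing the illustrative adapter above with a single predictor named "predict"; the candidate text and the feedback entry are made up for illustration.

candidate = {"predict": "Answer the question concisely."}
reflective_dataset = {
    "predict": [
        {
            "Inputs": {"question": "What is 2 + 2?"},
            "Generated Outputs": {"answer": "5"},
            "Feedback": "Incorrect. The expected answer is: 4",
        }
    ]
}
new_texts = adapter.propose_new_texts(
    candidate=candidate,
    reflective_dataset=reflective_dataset,
    components_to_update=["predict"],
)
# new_texts maps each updated component name to its proposed instruction text.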

build_program(candidate: dict[str, str])

Source code in gepa/adapters/dspy_adapter/dspy_adapter.py
def build_program(self, candidate: dict[str, str]):
    new_prog = self.student.deepcopy()

    # Start with plain string instructions from candidate
    predictor_candidates = {k: v for k, v in candidate.items() if not k.startswith(TOOL_MODULE_PREFIX)}

    tool_candidates = {}
    if self.enable_tool_optimization:
        for key, value in candidate.items():
            if not key.startswith(TOOL_MODULE_PREFIX):
                continue

            config = json.loads(value)

            for pred_name, instruction in config.items():
                if isinstance(instruction, str):
                    predictor_candidates[pred_name] = instruction

            tool_candidates.update(config.get("tools", {}))

    # Update predictor instructions
    for name, pred in new_prog.named_predictors():
        if name in predictor_candidates:
            pred.signature = pred.signature.with_instructions(predictor_candidates[name])

    # Update tool descriptions
    if tool_candidates:
        self._update_tool_descriptions(new_prog, tool_candidates)

    return new_prog
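
A minimal sketch of instantiating a program variant from a candidate; the "predict" key and the instruction text are illustrative and must correspond to names from student.named_predictors().

new_program = adapter.build_program({"predict": "Answer with a single word or number."})
for name, pred in new_program.named_predictors():
    print(name, pred.signature.instructions)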

evaluate(batch, candidate, capture_traces=False)

Source code in gepa/adapters/dspy_adapter/dspy_adapter.py
def evaluate(self, batch, candidate, capture_traces=False):
    program = self.build_program(candidate)
    callback_metadata = (
        {"metric_key": "eval_full"}
        if self.reflection_minibatch_size is None or len(batch) > self.reflection_minibatch_size
        else {"disable_logging": True}
    )

    outputs: list[Prediction] = []
    scores: list[float] = []
    subscores: list[dict[str, float]] = []
    trajs: list[TraceData] | None = None

    if capture_traces:
        # bootstrap_trace_data-like flow with trace capture
        from dspy.teleprompt import bootstrap_trace as bootstrap_trace_module

        trajs = bootstrap_trace_module.bootstrap_trace_data(
            program=program,
            dataset=batch,
            metric=self.metric_fn,
            num_threads=self.num_threads,
            raise_on_error=False,
            capture_failed_parses=True,
            failure_score=self.failure_score,
            format_failure_score=self.failure_score,
            callback_metadata=callback_metadata,
        )
        for t in trajs:
            outputs.append(t["prediction"])
            score_val, subscore_dict = self._extract_score_and_subscores(t.get("score"))
            if score_val is None:
                score_val = self.failure_score
            scores.append(score_val)
            subscores.append(subscore_dict)
    else:
        evaluator = Evaluate(
            devset=batch,
            metric=self.metric_fn,
            num_threads=self.num_threads,
            return_all_scores=True,
            return_outputs=True,
            failure_score=self.failure_score,
            provide_traceback=True,
            max_errors=len(batch) * 100,
            callback_metadata=callback_metadata,
        )
        res = evaluator(program)
        outputs = [r[1] for r in res.results]
        raw_scores = [r[2] for r in res.results]
        for raw_score in raw_scores:
            score_val, subscore_dict = self._extract_score_and_subscores(raw_score)
            if score_val is None:
                score_val = self.failure_score
            scores.append(score_val)
            subscores.append(subscore_dict)

    has_subscores = any(subscores)
    # Map DSPy "subscores" into GEPA objective score payloads.
    return EvaluationBatch(
        outputs=outputs,
        scores=scores,
        trajectories=trajs,
        objective_scores=subscores if has_subscores else None,
    )
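
A minimal sketch of evaluating a candidate on a small batch; trainset and the candidate mapping are illustrative. With capture_traces=True the returned EvaluationBatch also carries per-example trajectories, which make_reflective_dataset consumes.

trainset = [
    dspy.Example(question="What is 2 + 2?", answer="4").with_inputs("question"),
]
eval_batch = adapter.evaluate(trainset, candidate={"predict": "Answer the question."}, capture_traces=True)
print(eval_batch.scores)        # one float per example
print(eval_batch.trajectories)  # populated only when capture_traces=True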

make_reflective_dataset(candidate, eval_batch, components_to_update) -> dict[str, list[ReflectiveExample]]

Source code in gepa/adapters/dspy_adapter/dspy_adapter.py
def make_reflective_dataset(
    self, candidate, eval_batch, components_to_update
) -> dict[str, list[ReflectiveExample]]:
    program = self.build_program(candidate)

    ret_d: dict[str, list[ReflectiveExample]] = {}

    for pred_name in components_to_update:
        # Extract predictor name from component key
        if pred_name.startswith(TOOL_MODULE_PREFIX):
            target_name = pred_name.removeprefix(f"{TOOL_MODULE_PREFIX}:")
        else:
            target_name = pred_name

        # Find the predictor object
        module = None
        for name, m in program.named_predictors():
            if name == target_name:
                module = m
                break
        assert module is not None, f"Predictor not found: {target_name}"

        # Create reflective examples from traces
        items: list[ReflectiveExample] = []
        for data in eval_batch.trajectories or []:
            trace = data["trace"]
            example = data["example"]
            prediction = data["prediction"]
            module_score_obj = data.get("score")
            module_score, _ = self._extract_score_and_subscores(module_score_obj)

            trace_instances = [t for t in trace if t[0].signature.equals(module.signature)]
            if not self.add_format_failure_as_feedback:
                trace_instances = [t for t in trace_instances if not isinstance(t[2], FailedPrediction)]
            if len(trace_instances) == 0:
                continue

            selected = None
            for t in trace_instances:
                if isinstance(t[2], FailedPrediction):
                    selected = t
                    break

            if selected is None:
                if isinstance(prediction, FailedPrediction):
                    continue
                selected = self.rng.choice(trace_instances)

            inputs = selected[1]
            outputs = selected[2]

            new_inputs = {}
            new_outputs = {}

            contains_history = False
            history_key_name = None
            for input_key, input_val in inputs.items():
                if isinstance(input_val, History):
                    contains_history = True
                    assert history_key_name is None
                    history_key_name = input_key

            if contains_history:
                s = "```json\n"
                for i, message in enumerate(inputs[history_key_name].messages):
                    s += f"  {i}: {message}\n"
                s += "```"
                new_inputs["Context"] = s

            for input_key, input_val in inputs.items():
                if contains_history and input_key == history_key_name:
                    continue

                if isinstance(input_val, Type) and self.custom_instruction_proposer is not None:
                    # Keep original object - will be properly formatted when sent to reflection LM
                    new_inputs[input_key] = input_val
                else:
                    new_inputs[input_key] = str(input_val)

            if isinstance(outputs, FailedPrediction):
                s = "Couldn't parse the output as per the expected output format. The model's raw response was:\n"
                s += "```\n"
                s += outputs.completion_text + "\n"
                s += "```\n\n"
                new_outputs = s
            else:
                for output_key, output_val in outputs.items():
                    new_outputs[output_key] = str(output_val)

            d = {"Inputs": new_inputs, "Generated Outputs": new_outputs}
            if isinstance(outputs, FailedPrediction):
                adapter = ChatAdapter()
                structure_instruction = ""
                for dd in adapter.format(module.signature, [], {}):
                    structure_instruction += dd["role"] + ": " + dd["content"] + "\n"
                d["Feedback"] = "Your output failed to parse. Follow this structure:\n" + structure_instruction
                # d['score'] = self.failure_score
            else:
                # Use actual predictor name for feedback lookup
                feedback_fn = self.feedback_map[target_name]
                fb = feedback_fn(
                    predictor_output=outputs,
                    predictor_inputs=inputs,
                    module_inputs=example,
                    module_outputs=prediction,
                    captured_trace=trace,
                )
                if isinstance(fb, dict):
                    feedback_score = fb.get("score")
                    feedback_text = fb.get("feedback", "")
                else:
                    feedback_score = getattr(fb, "score", None)
                    feedback_text = getattr(fb, "feedback", "")
                d["Feedback"] = feedback_text
                if module_score is not None and feedback_score is not None:
                    if abs(feedback_score - module_score) > 1e-8:
                        if self.warn_on_score_mismatch:
                            logger.warning(
                                "The score returned by the metric with pred_name is different from the overall metric score. This can indicate 2 things: Either the metric is non-deterministic (e.g., LLM-as-judge, Semantic score, etc.) or the metric returned a score specific to pred_name that differs from the module level score. Currently, GEPA does not support predictor level scoring (support coming soon), and only requires a feedback text to be provided, which can be specific to the predictor or program level. GEPA will ignore the differing score returned, and instead use module level score. You can safely ignore this warning if using a semantic metric, however, if this mismatch is caused due to predictor scoring, please return module-level scores. To disable this warning, set warn_on_score_mismatch=False."
                            )
                            self.warn_on_score_mismatch = False

            items.append(d)

        if len(items) == 0:
            logger.warning(f"  No valid reflective examples found for {pred_name}")
            continue

        ret_d[pred_name] = items

    if len(ret_d) == 0:
        raise Exception("No valid predictions found for any module.")

    return ret_d
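
A minimal sketch of building a reflective dataset from a traced evaluation; the candidate and component name are illustrative. The eval_batch must come from evaluate(..., capture_traces=True) so that trajectories are populated.

candidate = {"predict": "Answer the question."}
eval_batch = adapter.evaluate(trainset, candidate, capture_traces=True)
reflective = adapter.make_reflective_dataset(
    candidate=candidate,
    eval_batch=eval_batch,
    components_to_update=["predict"],
)
# reflective["predict"] is a list of dicts with "Inputs", "Generated Outputs",
# and "Feedback" keys, ready to be passed to propose_new_texts.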

stripped_lm_call(x: str) -> list[str]

Source code in gepa/adapters/dspy_adapter/dspy_adapter.py
def stripped_lm_call(self, x: str) -> list[str]:
    raw_outputs = self.reflection_lm(x)
    outputs = []
    for raw_output in raw_outputs:
        if type(raw_output) == str:
            outputs.append(raw_output)
        elif type(raw_output) == dict:
            if "text" not in raw_output:
                raise KeyError("Missing 'text' field in the output from the base LM!")
            outputs.append(raw_output["text"])
        else:
            raise TypeError("Unexpected output type from the base LM! Expected str or dict")

    return outputs
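
A minimal sketch of the helper's contract, assuming adapter.reflection_lm is set: it forwards the prompt to the reflection LM and normalizes each returned completion to a plain string (dict completions must carry a "text" field).

texts = adapter.stripped_lm_call("Summarize the feedback above in one sentence.")
assert all(isinstance(t, str) for t in texts)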