Skip to content

ExperimentTracker

gepa.logging.experiment_tracker.ExperimentTracker(use_wandb: bool = False, wandb_api_key: str | None = None, wandb_init_kwargs: dict[str, Any] | None = None, use_mlflow: bool = False, mlflow_tracking_uri: str | None = None, mlflow_experiment_name: str | None = None)

Unified experiment tracking that supports both wandb and mlflow.

Source code in gepa/logging/experiment_tracker.py
def __init__(
    self,
    use_wandb: bool = False,
    wandb_api_key: str | None = None,
    wandb_init_kwargs: dict[str, Any] | None = None,
    use_mlflow: bool = False,
    mlflow_tracking_uri: str | None = None,
    mlflow_experiment_name: str | None = None,
):
    self.use_wandb = use_wandb
    self.use_mlflow = use_mlflow

    self.wandb_api_key = wandb_api_key
    self.wandb_init_kwargs = wandb_init_kwargs or {}
    self.mlflow_tracking_uri = mlflow_tracking_uri
    self.mlflow_experiment_name = mlflow_experiment_name

    self._created_mlflow_run = False

Attributes

use_wandb = use_wandb instance-attribute

use_mlflow = use_mlflow instance-attribute

wandb_api_key = wandb_api_key instance-attribute

wandb_init_kwargs = wandb_init_kwargs or {} instance-attribute

mlflow_tracking_uri = mlflow_tracking_uri instance-attribute

mlflow_experiment_name = mlflow_experiment_name instance-attribute

Functions

__enter__()

Context manager entry.

Source code in gepa/logging/experiment_tracker.py
def __enter__(self):
    """Context manager entry.

    Initializes the configured backends, starts a run, and returns the
    tracker so it can be bound with ``with ... as tracker``.

    Returns:
        This tracker instance.

    Raises:
        Whatever ``initialize()`` or ``start_run()`` raises; the exception
        is propagated unchanged.
    """
    self.initialize()
    try:
        self.start_run()
    except BaseException:
        # ``__exit__`` is never invoked when ``__enter__`` raises, so a
        # backend that opened its run before a later step failed would
        # otherwise stay open. Close whatever was started, then re-raise.
        self.end_run()
        raise
    return self

__exit__(exc_type, exc_val, exc_tb)

Context manager exit - always end the run.

Source code in gepa/logging/experiment_tracker.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the run when the ``with`` block is left, raised or not.

    Args:
        exc_type: Exception class raised inside the block, if any (unused).
        exc_val: Exception instance raised inside the block, if any (unused).
        exc_tb: Traceback of that exception, if any (unused).

    Returns:
        ``False``, so an exception from the block keeps propagating.
    """
    suppress_exception = False
    self.end_run()
    return suppress_exception

initialize()

Initialize the logging backends.

Source code in gepa/logging/experiment_tracker.py
def initialize(self):
    """Set up each enabled backend, wandb first and then mlflow.

    A backend whose ``use_*`` flag is falsy is left untouched.
    """
    if self.use_wandb:
        self._initialize_wandb()

    if not self.use_mlflow:
        return
    self._initialize_mlflow()

start_run()

Start a new run.

Source code in gepa/logging/experiment_tracker.py
def start_run(self):
    """Open a run on every enabled backend.

    wandb is started with the stored ``wandb_init_kwargs``. For mlflow, a
    run is opened only when none is active, and ``_created_mlflow_run``
    records whether this tracker opened it.
    """
    if self.use_wandb:
        import wandb  # type: ignore

        wandb.init(**self.wandb_init_kwargs)

    if not self.use_mlflow:
        return

    import mlflow  # type: ignore

    # Reuse a run that is already active instead of opening another one.
    opened_here = mlflow.active_run() is None
    if opened_here:
        mlflow.start_run()
    self._created_mlflow_run = opened_here

log_metrics(metrics: dict[str, Any], step: int | None = None)

Log metrics to the active backends.

Source code in gepa/logging/experiment_tracker.py
def log_metrics(self, metrics: dict[str, Any], step: int | None = None):
    """Log metrics to the active backends."""
    if self.use_wandb:
        try:
            import wandb  # type: ignore

            wandb.log(metrics, step=step)
        except Exception as e:
            print(f"Warning: Failed to log to wandb: {e}")

    if self.use_mlflow:
        try:
            import mlflow  # type: ignore

            # MLflow only accepts numeric metrics, filter out non-numeric values
            numeric_metrics = {k: float(v) for k, v in metrics.items() if isinstance(v, int | float)}
            if numeric_metrics:
                mlflow.log_metrics(numeric_metrics, step=step)
        except Exception as e:
            print(f"Warning: Failed to log to mlflow: {e}")

end_run()

End the current run.

Source code in gepa/logging/experiment_tracker.py
def end_run(self):
    """Close the current run on every enabled backend.

    wandb is finished whenever it reports a run. mlflow is ended only if
    this tracker opened the run (``_created_mlflow_run``) and a run is
    still active. Errors are printed as warnings rather than raised.
    """
    if self.use_wandb:
        try:
            import wandb  # type: ignore

            has_wandb_run = wandb.run is not None
            if has_wandb_run:
                wandb.finish()
        except Exception as err:
            print(f"Warning: Failed to end wandb run: {err}")

    if not self.use_mlflow:
        return

    try:
        import mlflow  # type: ignore

        # A run this tracker did not open is left alone.
        if not self._created_mlflow_run or mlflow.active_run() is None:
            return
        mlflow.end_run()
        self._created_mlflow_run = False
    except Exception as err:
        print(f"Warning: Failed to end mlflow run: {err}")

is_active() -> bool

Check if any backend has an active run.

Source code in gepa/logging/experiment_tracker.py
def is_active(self) -> bool:
    """Report whether any enabled backend currently has a run.

    wandb is checked first, then mlflow. An error while probing a backend
    counts as "not active" for that backend.

    Returns:
        ``True`` if an enabled backend reports a run, otherwise ``False``.
    """
    if self.use_wandb:
        try:
            import wandb  # type: ignore

            wandb_running = wandb.run is not None
        except Exception:
            wandb_running = False
        if wandb_running:
            return True

    if not self.use_mlflow:
        return False

    try:
        import mlflow  # type: ignore

        return mlflow.active_run() is not None
    except Exception:
        return False