Implementation:Dagster io Dagster MIPROv2 Optimizer Pattern

From Leeroopedia


Property | Value
Type | Implementation
Category | AI, Prompt_Engineering, Optimization
Repository | Dagster_io_Dagster
Implements | Principle:Dagster_io_Dagster_DSPy_Prompt_Optimization

Overview

Concrete implementation pattern for DSPy prompt optimization using MIPROv2 within Dagster assets, with custom resource management and quality gates.

Description

This implementation demonstrates how to integrate DSPy's MIPROv2 optimizer into a Dagster asset pipeline. A ConnectionsSolver module uses dspy.ChainOfThought to solve word puzzles iteratively. The optimization asset loads a baseline model, runs MIPROv2 with a configurable metric function and thread count, and saves the optimized model through a DSPyResource. The resource manages DSPy configuration, model serialization, and loading.

Usage

Define a DSPy module (e.g., ConnectionsSolver), create a baseline by evaluating it on training data, then use the optimized_model asset to run MIPROv2 optimization. The DSPyResource handles configuration and persistence.
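
The baseline evaluation and the quality gate mentioned in the Overview are not shown on this page. The following is a minimal sketch of how they might fit together, assuming a metric that takes an example and a prediction. The names `average_score`, `passes_quality_gate`, and `min_improvement`, and the toy programs and examples, are hypothetical and not part of the project.

```python
from statistics import mean


def average_score(program, examples, metric):
    """Mean metric value of `program` over `examples` (0.0 for an empty set)."""
    if not examples:
        return 0.0
    return mean(metric(ex, program(ex)) for ex in examples)


def passes_quality_gate(baseline_score, optimized_score, min_improvement=0.05):
    """Accept the optimized model only if it beats the baseline by a margin."""
    return optimized_score - baseline_score >= min_improvement


# Toy stand-ins (hypothetical): each "program" is a plain callable, not a DSPy module.
examples = [{"answer": 1}, {"answer": 2}, {"answer": 3}, {"answer": 4}]


def exact_match(example, prediction):
    return float(prediction == example["answer"])


def baseline_program(example):
    return 1  # always guesses 1, so it is right on one example out of four


def optimized_program(example):
    # right on every example except the last one
    return example["answer"] if example["answer"] != 4 else 0


baseline_score = average_score(baseline_program, examples, exact_match)
optimized_score = average_score(optimized_program, examples, exact_match)
promote = passes_quality_gate(baseline_score, optimized_score)
```

In a Dagster pipeline, a check like this could decide whether the optimized model is saved or the baseline is kept; the margin is a tuning choice, not a value taken from the source.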

Code Reference

Source Location

  • examples/docs_projects/project_dspy/src/project_dspy/components/ds_py_model_builder.py:L246-347
  • examples/docs_projects/project_dspy/dspy_modules/solver.py

Signature/Pattern

Solver module with ChainOfThought:

import dspy
from dspy.teleprompt import MIPROv2

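# MAX_ATTEMPTS and RULES are assumed to be module-level constants defined
# elsewhere in solver.py; their values are not shown here.

class ConnectionsSolver(dspy.Module):
    def __init__(self):
        super().__init__()  # standard dspy.Module initialization
        self.predict = dspy.ChainOfThought(
            "rules, available_words, history_feedback, guess_index -> guess"
        )

    def forward(self, puzzle):
        # Iterative solving with feedback. `remaining` (words still in play) and
        # `feedback` (results of earlier guesses) are derived from `puzzle` and
        # updated after every guess; that bookkeeping is elided in this pattern.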
        for i in range(MAX_ATTEMPTS):
            prediction = self.predict(
                rules=RULES,
                available_words=remaining,
                history_feedback=feedback,
                guess_index=str(i),
            )
            # ... validate and update ...
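
The elided validate-and-update step can be sketched without a language model. This is an illustration only: `solve_with_feedback`, `guess_fn` (standing in for `self.predict`), the dict-of-groups puzzle shape, and the feedback strings are all hypothetical and are not taken from solver.py.

```python
def solve_with_feedback(groups, guess_fn, max_attempts=6):
    """Ask guess_fn for a group, validate it, and feed the result back in."""
    remaining = sorted(word for words in groups.values() for word in words)
    solutions = {frozenset(words) for words in groups.values()}
    feedback, solved = [], []
    for attempt in range(max_attempts):
        if not remaining:
            break  # every group has been found
        guess = guess_fn(remaining, feedback, attempt)
        # A guess is correct if it matches a real group and uses only unused words.
        if frozenset(guess) in solutions and set(guess) <= set(remaining):
            solved.append(sorted(guess))
            remaining = [w for w in remaining if w not in guess]
            feedback.append(f"attempt {attempt}: {sorted(guess)} correct")
        else:
            feedback.append(f"attempt {attempt}: {sorted(guess)} incorrect")
    return solved, feedback


# Scripted guesses replace the model call so the loop can run on its own.
groups = {"fruits": ["apple", "pear"], "colors": ["red", "blue"]}
scripted = iter([["apple", "red"], ["apple", "pear"], ["red", "blue"]])


def scripted_guess(remaining, feedback, attempt):
    return next(scripted)


solved, feedback = solve_with_feedback(groups, scripted_guess)
```

Passing the accumulated feedback back into each call mirrors the `history_feedback` input of the signature above, which is what lets later guesses avoid repeating earlier mistakes.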

Optimization asset:

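import dagster as dg

# DSPyResource, ConnectionsSolver, and optimization_metric are assumed to be
# imported from the project's own modules.
@dg.asset(group_name="optimization")
def optimized_model(context, connections_puzzle_data: dict, dspy_resource: DSPyResource):
    # Configure DSPy, then load the previously saved baseline module
    dspy_resource.configure_dspy()
    baseline = dspy_resource.load_model(ConnectionsSolver, "baseline")

    optimizer = MIPROv2(
        auto="light",
        metric=optimization_metric,
        num_threads=4,
    )

    # The first 30 training puzzles drive the optimization; the next 20 are held out for validation
    optimized = optimizer.compile(
        baseline,
        trainset=connections_puzzle_data["train_puzzles"][:30],
        valset=connections_puzzle_data["train_puzzles"][30:50],
    )

    # Persist the optimized module and return metadata describing it
    dspy_resource.save_model(optimized, "optimized")
    return {"model_name": "optimized", "optimized": True}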
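
The asset above slices the first 30 puzzles for training and the next 20 for validation. Python slicing does not fail when the list is shorter than expected, so with fewer than 50 puzzles the validation set would silently shrink or come back empty. A small guard, sketched here under the hypothetical name `split_train_val`, makes that failure explicit.

```python
def split_train_val(puzzles, n_train=30, n_val=20):
    """Return disjoint (train, val) slices, failing loudly if data is short."""
    needed = n_train + n_val
    if len(puzzles) < needed:
        raise ValueError(
            f"need at least {needed} puzzles, got {len(puzzles)}"
        )
    return puzzles[:n_train], puzzles[n_train:needed]


# Placeholder data standing in for connections_puzzle_data["train_puzzles"].
puzzles = [f"puzzle-{i}" for i in range(60)]
train, val = split_train_val(puzzles)

try:
    split_train_val(puzzles[:40])
    too_short_rejected = False
except ValueError:
    too_short_rejected = True
```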

Import

import dagster as dg
import dspy
from dspy.teleprompt import MIPROv2

I/O Contract

Direction | Name | Type | Description
Input | connections_puzzle_data | dict | Training and validation puzzle data with ground truth
Input | dspy_resource | DSPyResource | Resource managing DSPy config, model save/load
Input | baseline model | ConnectionsSolver | Pre-trained baseline module loaded from storage
Output | optimized model | dict | Metadata dict with model name and optimization status
Output | saved model | ConnectionsSolver (serialized) | Optimized module persisted via DSPyResource

Usage Examples

Defining the optimization pipeline:

import dagster as dg

defs = dg.Definitions(
    assets=[connections_puzzle_data, baseline_model, optimized_model],
    resources={
        "dspy_resource": DSPyResource(
            model_name="gpt-4o-mini",
            model_dir="models/",
        ),
    },
)

Configuring MIPROv2 optimization levels:

# "light" mode for quick optimization
optimizer = MIPROv2(auto="light", metric=optimization_metric, num_threads=4)

# "medium" mode for more thorough search
optimizer = MIPROv2(auto="medium", metric=optimization_metric, num_threads=8)

# "heavy" mode for the largest search budget (slowest and most expensive)
optimizer = MIPROv2(auto="heavy", metric=optimization_metric, num_threads=16)

Custom metric function:

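def optimization_metric(example, prediction, trace=None):
    # Fraction of ground-truth groups matched, compared position by position
    # (assumes predicted and true groups are listed in the same order)
    if not example.groups:
        return 0.0  # avoid division by zero on an example with no groups
    correct = sum(1 for pred, truth in zip(prediction.groups, example.groups) if pred == truth)
    return correct / len(example.groups)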
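
The metric above compares groups position by position, so a correct group returned in a different slot, or with its words in a different order, scores zero. If that is too strict for a given dataset, an order-insensitive variant might look like the sketch below. It assumes each group is a list of words; `order_insensitive_metric` is a hypothetical name, and `SimpleNamespace` objects stand in for DSPy examples and predictions.

```python
from types import SimpleNamespace


def order_insensitive_metric(example, prediction, trace=None):
    """Fraction of true groups found, ignoring group order and word order."""
    truth = {frozenset(group) for group in example.groups}
    if not truth:
        return 0.0
    predicted = {frozenset(group) for group in prediction.groups}
    return len(truth & predicted) / len(truth)


example = SimpleNamespace(groups=[["a", "b"], ["c", "d"]])
# The second true group appears first and reversed; the other guess is wrong.
prediction = SimpleNamespace(groups=[["d", "c"], ["a", "x"]])
score = order_insensitive_metric(example, prediction)
```

Keeping the `(example, prediction, trace=None)` signature means the variant can be passed as `metric=` in the same way as the original.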
