Implementation:Dagster io Dagster MIPROv2 Optimizer Pattern
| Property | Value |
|---|---|
| Type | Implementation |
| Category | AI, Prompt_Engineering, Optimization |
| Repository | Dagster_io_Dagster |
| Implements | Principle:Dagster_io_Dagster_DSPy_Prompt_Optimization |
Overview
Concrete implementation pattern for DSPy prompt optimization using MIPROv2 within Dagster assets, with custom resource management and quality gates.
Description
This implementation demonstrates how to integrate DSPy's MIPROv2 optimizer into a Dagster asset pipeline. A ConnectionsSolver module uses dspy.ChainOfThought to solve word puzzles iteratively. The optimization asset loads a baseline model, runs MIPROv2 with a configurable metric function and thread count, and saves the optimized model through a DSPyResource. The resource manages DSPy configuration, model serialization, and loading.
Usage
Define a DSPy module (e.g., ConnectionsSolver), create a baseline by evaluating it on training data, then use the optimized_model asset to run MIPROv2 optimization. The DSPyResource handles configuration and persistence.
Code Reference
Source Location
examples/docs_projects/project_dspy/src/project_dspy/components/ds_py_model_builder.py:L246-347
examples/docs_projects/project_dspy/dspy_modules/solver.py
Signature/Pattern
Solver module with ChainOfThought:
import dspy
from dspy.teleprompt import MIPROv2

# MAX_ATTEMPTS, RULES, remaining, and feedback are defined in code
# omitted from this excerpt.

class ConnectionsSolver(dspy.Module):
    def __init__(self):
        super().__init__()
        # One ChainOfThought predictor is reused for every guess
        self.predict = dspy.ChainOfThought(
            "rules, available_words, history_feedback, guess_index -> guess"
        )

    def forward(self, puzzle):
        # Iterative solving with feedback
        for i in range(MAX_ATTEMPTS):
            prediction = self.predict(
                rules=RULES,
                available_words=remaining,
                history_feedback=feedback,
                guess_index=str(i),
            )
            # ... validate and update ...
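The validate-and-update step is elided in the excerpt above. The following is a minimal, library-free sketch of what such a loop could look like, not the source's implementation. The names solve_iteratively and guess_fn, the feedback strings, and the default of 6 attempts are all hypothetical; guess_fn stands in for the call to self.predict.

```python
from typing import Callable, List, Sequence, Set

# Hypothetical stand-in for self.predict: receives the remaining words and the
# feedback collected so far, and returns one guessed group of words.
GuessFn = Callable[[List[str], List[str]], Sequence[str]]


def solve_iteratively(
    words: Sequence[str],
    groups: Sequence[Set[str]],
    guess_fn: GuessFn,
    max_attempts: int = 6,  # assumed value, not taken from the source
) -> List[Set[str]]:
    """Guess one group per attempt, feeding each result into later guesses."""
    remaining = list(words)
    feedback: List[str] = []
    solved: List[Set[str]] = []

    for i in range(max_attempts):
        if not remaining:
            break  # every word has been placed in a group
        guess = set(guess_fn(remaining, feedback))
        if guess in groups and guess not in solved:
            # Correct guess: record the group and shrink the candidate pool.
            solved.append(guess)
            remaining = [w for w in remaining if w not in guess]
            feedback.append(f"guess {i}: correct")
        else:
            # Wrong guess: keep the words and tell the next call what failed.
            feedback.append(f"guess {i}: {sorted(guess)} is not a group")
    return solved
```

Keeping the loop independent of the language model makes it possible to unit test the bookkeeping with a deterministic guess_fn before any model calls are made.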
Optimization asset:
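import dagster as dg
from dspy.teleprompt import MIPROv2

# DSPyResource, ConnectionsSolver, and optimization_metric are defined
# elsewhere in the project.

@dg.asset(group_name="optimization")
def optimized_model(
    context: dg.AssetExecutionContext,
    connections_puzzle_data: dict,
    dspy_resource: DSPyResource,
):
    # Configure the language model before loading or compiling anything
    dspy_resource.configure_dspy()
    baseline = dspy_resource.load_model(ConnectionsSolver, "baseline")
    optimizer = MIPROv2(
        auto="light",
        metric=optimization_metric,
        num_threads=4,
    )
    # First 30 puzzles train the optimizer; the next 20 validate candidates
    optimized = optimizer.compile(
        baseline,
        trainset=connections_puzzle_data["train_puzzles"][:30],
        valset=connections_puzzle_data["train_puzzles"][30:50],
    )
    dspy_resource.save_model(optimized, "optimized")
    return {"model_name": "optimized", "optimized": True}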
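The overview mentions quality gates, but the asset excerpt above saves the optimized model unconditionally. As a rough sketch of how a gate could decide which model to promote, assuming both models have already been scored on the same held-out puzzles: the function names and the 0.05 margin below are hypothetical and do not come from the source.

```python
def passes_quality_gate(
    baseline_score: float,
    optimized_score: float,
    min_improvement: float = 0.05,  # assumed margin, tune per project
) -> bool:
    """Return True when the optimized score beats the baseline by the margin."""
    return optimized_score - baseline_score >= min_improvement


def select_model_name(
    baseline_score: float,
    optimized_score: float,
    min_improvement: float = 0.05,
) -> str:
    """Pick which saved model name downstream assets should load."""
    if passes_quality_gate(baseline_score, optimized_score, min_improvement):
        return "optimized"
    return "baseline"
```

Under these assumptions the asset could put the result of select_model_name in its returned metadata, so that a failed gate falls back to the baseline instead of failing the run.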
Import
import dspy
from dspy.teleprompt import MIPROv2
I/O Contract
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | connections_puzzle_data | dict | Puzzle data with ground truth; train_puzzles[:30] is used for training and train_puzzles[30:50] for validation |
| Input | dspy_resource | DSPyResource | Resource managing DSPy configuration and model save/load |
| Input | baseline model | ConnectionsSolver | Unoptimized baseline module loaded through DSPyResource under the name "baseline" |
| Output | optimized_model | dict | Metadata dict with the model name ("optimized") and an optimization status flag |
| Output | saved model | ConnectionsSolver (serialized) | Optimized module persisted through DSPyResource under the name "optimized" |
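The asset slices train_puzzles by fixed indices, so the input needs at least 50 puzzles. Python slicing does not raise on short lists; it silently returns fewer items. A small helper can make that requirement explicit. This is a sketch only: split_train_val is a hypothetical name, and its defaults mirror the 30/20 slices used in the asset.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_train_val(
    puzzles: Sequence[T],
    n_train: int = 30,
    n_val: int = 20,
) -> Tuple[List[T], List[T]]:
    """Split puzzles into disjoint training and validation lists."""
    needed = n_train + n_val
    if len(puzzles) < needed:
        # Fail loudly instead of handing the optimizer a short validation set.
        raise ValueError(f"need at least {needed} puzzles, got {len(puzzles)}")
    trainset = list(puzzles[:n_train])
    valset = list(puzzles[n_train:needed])
    return trainset, valset
```

With this helper the compile call would receive the two returned lists as trainset and valset in place of the inline slices.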
Usage Examples
Defining the optimization pipeline:
import dagster as dg
defs = dg.Definitions(
    assets=[connections_puzzle_data, baseline_model, optimized_model],
    resources={
        "dspy_resource": DSPyResource(
            model_name="gpt-4o-mini",
            model_dir="models/",
        ),
    },
)
Configuring MIPROv2 optimization levels:
# "light" mode for quick optimization
optimizer = MIPROv2(auto="light", metric=optimization_metric, num_threads=4)
# "medium" mode for more thorough search
optimizer = MIPROv2(auto="medium", metric=optimization_metric, num_threads=8)
# "heavy" mode for the largest search budget (slowest and most costly)
optimizer = MIPROv2(auto="heavy", metric=optimization_metric, num_threads=16)
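Since the description calls the thread count configurable, the three presets above can be collected in one lookup so that a single configuration value selects both settings. The sketch below is illustrative: miprov2_kwargs is a hypothetical helper, and the thread counts simply repeat the examples above rather than any DSPy default.

```python
from typing import Any, Dict

# Thread counts mirror the three examples in this section; they are not
# defaults or recommendations from DSPy.
_THREADS_BY_LEVEL: Dict[str, int] = {"light": 4, "medium": 8, "heavy": 16}


def miprov2_kwargs(level: str) -> Dict[str, Any]:
    """Build the auto/num_threads keyword arguments for a named level."""
    if level not in _THREADS_BY_LEVEL:
        valid = ", ".join(sorted(_THREADS_BY_LEVEL))
        raise ValueError(f"unknown level {level!r}; expected one of: {valid}")
    return {"auto": level, "num_threads": _THREADS_BY_LEVEL[level]}


# Intended use inside the asset (requires dspy and the project's metric):
#   optimizer = MIPROv2(metric=optimization_metric, **miprov2_kwargs("medium"))
```

Validating the level up front surfaces a typo before any optimization run starts.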
Custom metric function:
def optimization_metric(example, prediction, trace=None):
    # Compare predicted groups against ground truth
    correct = sum(1 for pred, truth in zip(prediction.groups, example.groups) if pred == truth)
    return correct / len(example.groups)
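The metric above compares groups position by position, so a correct group listed in a different order, or with its words in a different order, scores zero. It also divides by zero when an example has no groups. An order-insensitive variant could look like the sketch below. It assumes that groups is an iterable of word collections on both objects; order_insensitive_metric is a hypothetical name, not part of the source project.

```python
def _as_group_set(groups):
    """Normalize an iterable of word collections into a set of frozensets."""
    return {frozenset(group) for group in groups}


def order_insensitive_metric(example, prediction, trace=None) -> float:
    """Fraction of ground-truth groups that the prediction reproduces exactly."""
    truth = _as_group_set(example.groups)
    if not truth:
        return 0.0  # avoid dividing by zero on an empty example
    # Treat a missing or empty groups attribute as no groups predicted.
    predicted = _as_group_set(getattr(prediction, "groups", None) or [])
    return len(truth & predicted) / len(truth)
```

The signature keeps the trace=None parameter so the function can be passed as metric in the same way as optimization_metric.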