Implementation: Dagster_io_Dagster ML Pipeline Assets
| Property | Value |
|---|---|
| Type | Implementation |
| Category | Machine_Learning, MLOps |
| Repository | Dagster_io_Dagster |
| Implements | Principle:Dagster_io_Dagster_ML_Model_Lifecycle |
Overview
Concrete implementation pattern for ML model lifecycle management using Dagster assets, Config classes for hyperparameters, and an abstract resource for pluggable model storage, built on primitives provided by the Dagster framework.
Description
This implementation demonstrates the full ML model lifecycle as Dagster assets. It uses Config classes for hyperparameter management, an abstract ModelStoreResource for pluggable model storage, and quality-gated deployment through configurable accuracy thresholds. The pattern covers training a CNN digit classifier, evaluating metrics, conditionally deploying based on quality gates, and serving batch predictions.
Usage
Define a ModelConfig with hyperparameters, implement a ModelStoreResource for your storage backend, then wire the assets together in a Dagster definitions module. Each asset receives its dependencies through Dagster's dependency injection.
Code Reference
Source Location
examples/docs_projects/project_ml/src/project_ml/defs/assets/model_assets.py:L29-657
examples/docs_projects/project_ml/src/project_ml/defs/resources.py
Signature/Pattern
Model configuration with Dagster Config:
```python
class ModelConfig(dg.Config):
    conv1_channels: int = 32
    conv2_channels: int = 64
    hidden_size: int = 256
    batch_size: int = 64
    learning_rate: float = 0.001
    epochs: int = 30
    patience: int = 7
    use_early_stopping: bool = True
```
Abstract model storage resource:
```python
class ModelStoreResource(dg.ConfigurableResource, ABC):
    @abstractmethod
    def save_model(self, model, name: str): ...

    @abstractmethod
    def load_model(self, name: str): ...

class LocalModelStoreResource(ModelStoreResource):
    base_dir: str = "models/"

    def save_model(self, model, name: str):
        path = Path(self.base_dir) / f"{name}.pkl"
        with open(path, "wb") as f:  # context manager closes the file even on error
            pickle.dump(model, f)

    def load_model(self, name: str):
        path = Path(self.base_dir) / f"{name}.pkl"
        with open(path, "rb") as f:
            return pickle.load(f)
```
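Stripped of the Dagster base class, the local store is a thin wrapper over pickle and pathlib. A standalone sketch of the same save/load contract (class name illustrative, not from the repo):

```python
import pickle
import tempfile
from pathlib import Path

class LocalModelStore:
    """Filesystem-backed store mirroring ModelStoreResource's save/load contract."""

    def __init__(self, base_dir: str = "models/"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def save_model(self, model, name: str) -> Path:
        path = self.base_dir / f"{name}.pkl"
        with path.open("wb") as f:  # context manager closes the file even on error
            pickle.dump(model, f)
        return path

    def load_model(self, name: str):
        path = self.base_dir / f"{name}.pkl"
        with path.open("rb") as f:
            return pickle.load(f)

# Round-trip with a placeholder "model" (any picklable object works):
with tempfile.TemporaryDirectory() as tmp:
    store = LocalModelStore(base_dir=tmp)
    store.save_model({"weights": [1, 2, 3]}, "digit_classifier")
    print(store.load_model("digit_classifier"))  # {'weights': [1, 2, 3]}
```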
Training asset:
```python
@dg.asset(group_name="model_pipeline")
def digit_classifier(
    config: ModelConfig,
    processed_mnist_data: dict,
    model_storage: ModelStoreResource,
) -> DigitCNN:
    model = DigitCNN(config)
    # ... training loop ...
    model_storage.save_model(model, "digit_classifier")
    return model
```
Deployment with quality gate:
```python
class DeploymentConfig(dg.Config):
    accuracy_threshold: float = 0.90
    force_deploy: bool = False

@dg.asset(deps=["digit_classifier"])
def production_digit_classifier(
    config: DeploymentConfig,
    model_evaluation: dict,
    model_storage: ModelStoreResource,
):
    if model_evaluation["test_accuracy"] >= config.accuracy_threshold or config.force_deploy:
        model = model_storage.load_model("digit_classifier")
        model_storage.save_model(model, "production_digit_classifier")
        return model
    return None
```
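The gate itself is just a predicate on the evaluation metrics. Pulled out as a plain function (name illustrative, not from the repo), it can be unit-tested without Dagster:

```python
def passes_quality_gate(metrics: dict, threshold: float, force_deploy: bool = False) -> bool:
    """Return True when the model may be promoted to production.

    Promotes when test accuracy meets the threshold, or unconditionally
    when force_deploy is set (e.g. for a manual rollout).
    """
    return force_deploy or metrics["test_accuracy"] >= threshold

print(passes_quality_gate({"test_accuracy": 0.93}, threshold=0.90))  # True
print(passes_quality_gate({"test_accuracy": 0.85}, threshold=0.90))  # False
print(passes_quality_gate({"test_accuracy": 0.85}, 0.90, force_deploy=True))  # True
```

The asset body then reduces to: promote and return the model if the predicate holds, otherwise return None so downstream consumers can detect the failed gate.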
Import
```python
import dagster as dg
from abc import ABC, abstractmethod
from pathlib import Path
import pickle
```
I/O Contract
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | processed_mnist_data | dict | Preprocessed training and test data |
| Input | config (ModelConfig) | ModelConfig | Hyperparameters for model training |
| Input | model_storage | ModelStoreResource | Pluggable storage backend for model artifacts |
| Output | digit_classifier | DigitCNN | Trained model artifact |
| Output | model_evaluation | dict | Evaluation metrics (accuracy, loss, etc.) |
| Output | production_digit_classifier | Optional[DigitCNN] | Deployed model (None if quality gate fails) |
| Output | predictions | dict | Batch inference predictions |
Usage Examples
Defining the asset graph with resources:
```python
import dagster as dg

defs = dg.Definitions(
    assets=[digit_classifier, model_evaluation, production_digit_classifier, batch_predictions],
    resources={
        "model_storage": LocalModelStoreResource(base_dir="models/"),
    },
)
```
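Because the assets depend only on the abstract interface, swapping backends means implementing the same two methods. A dict-backed variant (illustrative, not in the repo; real code would subclass ModelStoreResource) is handy for fast tests:

```python
class InMemoryModelStore:
    """Dict-backed store with the same save/load contract; useful in unit tests."""

    def __init__(self):
        self._models = {}

    def save_model(self, model, name: str):
        self._models[name] = model

    def load_model(self, name: str):
        return self._models[name]

store = InMemoryModelStore()
store.save_model("fake-model", "digit_classifier")
print(store.load_model("digit_classifier"))  # fake-model
```

In a test suite, this store would be passed as the `model_storage` resource in place of `LocalModelStoreResource`, so assets run without touching disk.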
Materializing with custom config:
```python
# Override hyperparameters at runtime
dg.materialize(
    [digit_classifier],
    run_config=dg.RunConfig(
        ops={"digit_classifier": ModelConfig(epochs=50, learning_rate=0.0001)}
    ),
    resources={"model_storage": LocalModelStoreResource()},
)
```