

Implementation:Dagster io Dagster ML Pipeline Assets

From Leeroopedia


Property Value
Type Implementation
Category Machine_Learning, MLOps
Repository Dagster_io_Dagster
Implements Principle:Dagster_io_Dagster_ML_Model_Lifecycle

Overview

Concrete implementation pattern for ML model lifecycle management using Dagster assets, Config classes, and an abstract storage resource built on Dagster's ConfigurableResource.

Description

This implementation expresses the full ML model lifecycle as Dagster assets. It uses Config classes for hyperparameter management, an abstract ModelStoreResource for pluggable model storage, and a quality gate that deploys a model only when it meets a configurable accuracy threshold. The pattern covers training a CNN digit classifier, evaluating it on test data, deploying it conditionally, and serving batch predictions.

Usage

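Define a ModelConfig with hyperparameters, implement a ModelStoreResource for your storage backend, then wire the assets together in a Dagster Definitions object. Dagster injects each asset's inputs by parameter name: upstream assets (such as processed_mnist_data) are matched by asset name, resources (such as model_storage) by resource key, and configuration through the parameter annotated with a Config subclass.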

Code Reference

Source Location

  • examples/docs_projects/project_ml/src/project_ml/defs/assets/model_assets.py:L29-657
  • examples/docs_projects/project_ml/src/project_ml/defs/resources.py

Signature/Pattern

Model configuration with Dagster Config:

class ModelConfig(dg.Config):
    conv1_channels: int = 32
    conv2_channels: int = 64
    hidden_size: int = 256
    batch_size: int = 64
    learning_rate: float = 0.001
    epochs: int = 30
    patience: int = 7
    use_early_stopping: bool = True
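The page does not show how patience and use_early_stopping are consumed. A common interpretation, assumed here rather than taken from the project, is: stop once the validation loss has failed to improve for patience consecutive epochs, and never train for more than epochs. A framework-free sketch of that rule, where the hypothetical val_losses list stands in for per-epoch validation losses:

```python
from typing import Sequence


def epochs_to_run(val_losses: Sequence[float], epochs: int = 30, patience: int = 7,
                  use_early_stopping: bool = True) -> int:
    """Return how many epochs train under a patience-based early-stopping rule.

    val_losses[i] is the (hypothetical) validation loss observed after epoch i + 1.
    """
    best = float("inf")
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses[:epochs], start=1):
        if loss < best:
            best = loss  # new best: reset the patience counter
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
        if use_early_stopping and epochs_without_improvement >= patience:
            return epoch  # stop after `patience` epochs with no new best loss
    return min(len(val_losses), epochs)


losses = [0.9, 0.5, 0.4, 0.41, 0.42, 0.43, 0.44, 0.45, 0.46, 0.47, 0.48]
print(epochs_to_run(losses, patience=7))                            # 10
print(epochs_to_run(losses, patience=7, use_early_stopping=False))  # 11
```

The best loss occurs at epoch 3, so seven non-improving epochs later (epoch 10) training stops; with early stopping disabled, all eleven recorded epochs run.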

Abstract model storage resource:

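import pickle
from abc import ABC, abstractmethod
from pathlib import Path
import dagster as dg

class ModelStoreResource(dg.ConfigurableResource, ABC):
    @abstractmethod
    def save_model(self, model, name: str): ...
    @abstractmethod
    def load_model(self, name: str): ...

class LocalModelStoreResource(ModelStoreResource):
    base_dir: str = "models/"
    def save_model(self, model, name: str):
        path = Path(self.base_dir) / f"{name}.pkl"
        path.parent.mkdir(parents=True, exist_ok=True)  # create base_dir on first save
        with open(path, "wb") as f:  # close the file handle deterministically
            pickle.dump(model, f)
    # load_model must be implemented too, or the class stays abstract.
    def load_model(self, name: str):
        with open(Path(self.base_dir) / f"{name}.pkl", "rb") as f:
            return pickle.load(f)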
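Because every backend must implement both save_model and load_model, a subclass that omits one cannot be instantiated. The sketch below mirrors the local pickle store with a plain abc.ABC instead of dg.ConfigurableResource so it runs without Dagster; the class names ModelStore, PickleModelStore, and WriteOnlyStore are hypothetical. It round-trips an object through a temporary directory and shows that an incomplete subclass is rejected with a TypeError.

```python
import pickle
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any


class ModelStore(ABC):
    """Plain-Python stand-in for ModelStoreResource (no Dagster dependency)."""

    @abstractmethod
    def save_model(self, model: Any, name: str) -> None: ...

    @abstractmethod
    def load_model(self, name: str) -> Any: ...


class PickleModelStore(ModelStore):
    def __init__(self, base_dir: str) -> None:
        self.base_dir = Path(base_dir)

    def _path(self, name: str) -> Path:
        return self.base_dir / f"{name}.pkl"

    def save_model(self, model: Any, name: str) -> None:
        self.base_dir.mkdir(parents=True, exist_ok=True)
        with open(self._path(name), "wb") as f:
            pickle.dump(model, f)

    def load_model(self, name: str) -> Any:
        with open(self._path(name), "rb") as f:
            return pickle.load(f)


class WriteOnlyStore(ModelStore):
    # Deliberately incomplete: load_model is not implemented.
    def save_model(self, model: Any, name: str) -> None:
        pass


with tempfile.TemporaryDirectory() as tmp:
    store = PickleModelStore(tmp)
    store.save_model({"weights": [0.1, 0.2]}, "digit_classifier")
    restored = store.load_model("digit_classifier")
print(restored)  # {'weights': [0.1, 0.2]}

try:
    WriteOnlyStore()
    incomplete_rejected = False
except TypeError:
    incomplete_rejected = True  # the abstract load_model blocks instantiation
print("incomplete subclass rejected:", incomplete_rejected)  # True
```

The same enforcement is what makes the abstract resource pluggable: any backend that can be constructed is guaranteed to offer both operations the assets call.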

Training asset:

@dg.asset(group_name="model_pipeline")
def digit_classifier(config: ModelConfig, processed_mnist_data: dict, model_storage: ModelStoreResource) -> DigitCNN:
    model = DigitCNN(config)
    # ... training loop ...
    model_storage.save_model(model, "digit_classifier")
    return model
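The page elides DigitCNN's architecture. Purely as a worked example of how the ModelConfig fields drive model size, the sketch below assumes a common MNIST layout that the project may not actually use: two 3x3 convolutions with same padding, each followed by 2x2 max pooling, then a hidden fully connected layer of hidden_size units and a 10-way output. The function name digit_cnn_sizes is hypothetical.

```python
def digit_cnn_sizes(conv1_channels: int = 32, conv2_channels: int = 64,
                    hidden_size: int = 256, image_size: int = 28,
                    num_classes: int = 10, kernel: int = 3) -> dict:
    """Size bookkeeping for a HYPOTHETICAL DigitCNN layout:
    conv(k x k, same padding) -> 2x2 pool -> conv -> 2x2 pool -> fc(hidden) -> fc(classes).
    """
    spatial = image_size // 2 // 2  # two 2x2 poolings: 28 -> 14 -> 7
    flattened = conv2_channels * spatial * spatial
    params = (
        (1 * kernel * kernel + 1) * conv1_channels                # conv1 (1 input channel) + biases
        + (conv1_channels * kernel * kernel + 1) * conv2_channels  # conv2 + biases
        + (flattened + 1) * hidden_size                            # fc1 + biases
        + (hidden_size + 1) * num_classes                          # output layer + biases
    )
    return {"flattened_features": flattened, "parameters": params}


print(digit_cnn_sizes())  # {'flattened_features': 3136, 'parameters': 824458}
```

Under these assumptions the first fully connected layer holds about 97% of the weights, so hidden_size and conv2_channels dominate the saved model's size.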

Deployment with quality gate:

class DeploymentConfig(dg.Config):
    accuracy_threshold: float = 0.90
    force_deploy: bool = False

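# deps= only orders this asset after digit_classifier; the model itself is
# read back from model_storage rather than passed in as an argument.
@dg.asset(deps=["digit_classifier"])
def production_digit_classifier(config: DeploymentConfig, model_evaluation: dict, model_storage: ModelStoreResource):
    # Promote the model if it meets the accuracy threshold, or if deployment is forced.
    if model_evaluation["test_accuracy"] >= config.accuracy_threshold or config.force_deploy:
        model = model_storage.load_model("digit_classifier")
        model_storage.save_model(model, "production_digit_classifier")
        return model
    # Quality gate failed: nothing is promoted and the asset value is None.
    return None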
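The gate reduces to a single predicate. A framework-free restatement (the function name should_deploy is hypothetical) makes the two edge cases explicit: the comparison is inclusive, so accuracy exactly at the threshold passes, and force_deploy bypasses the threshold entirely.

```python
def should_deploy(test_accuracy: float, accuracy_threshold: float = 0.90,
                  force_deploy: bool = False) -> bool:
    """Mirror of the condition in production_digit_classifier."""
    return test_accuracy >= accuracy_threshold or force_deploy


print(should_deploy(0.90))                     # True  (inclusive threshold)
print(should_deploy(0.89))                     # False (gate blocks deployment)
print(should_deploy(0.50, force_deploy=True))  # True  (manual override)
```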

Import

import dagster as dg
from dagster import ConfigurableResource

I/O Contract

Direction Name Type Description
Input processed_mnist_data dict Preprocessed training and test data
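Input config (ModelConfig) ModelConfig Hyperparameters for model training
Input config (DeploymentConfig) DeploymentConfig Accuracy threshold and force-deploy override for the quality gate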
Input model_storage ModelStoreResource Pluggable storage backend for model artifacts
Output digit_classifier DigitCNN Trained model artifact
Output model_evaluation dict Evaluation metrics such as test_accuracy (read by the quality gate) and loss
Output production_digit_classifier Optional[DigitCNN] Deployed model (None if quality gate fails)
Output predictions dict Batch inference predictions
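The contract only fixes that model_evaluation is a dict and, from the deployment asset, that it carries a test_accuracy key. A minimal sketch of producing such a dict from true and predicted labels, and of turning per-class scores into a batch-predictions dict, follows; the function names and every other key (correct, total, labels, confidences) are hypothetical.

```python
from typing import Dict, List, Sequence


def evaluate(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[str, float]:
    """Build an evaluation dict with the test_accuracy key the quality gate reads."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("y_true and y_pred must be non-empty and equally long")
    correct = sum(int(t == p) for t, p in zip(y_true, y_pred))
    return {"test_accuracy": correct / len(y_true), "correct": correct, "total": len(y_true)}


def batch_predict(scores: Sequence[Sequence[float]]) -> Dict[str, List]:
    """Turn per-class scores (one row per image) into predicted digits and their scores."""
    labels = [max(range(len(row)), key=row.__getitem__) for row in scores]  # argmax per row
    confidences = [row[i] for row, i in zip(scores, labels)]
    return {"labels": labels, "confidences": confidences}


print(evaluate([3, 1, 4, 1], [3, 1, 4, 7]))
# {'test_accuracy': 0.75, 'correct': 3, 'total': 4}
print(batch_predict([[0.1, 0.7, 0.2], [0.6, 0.3, 0.1]]))
# {'labels': [1, 0], 'confidences': [0.7, 0.6]}
```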

Usage Examples

Defining the asset graph with resources:

import dagster as dg

defs = dg.Definitions(
    assets=[digit_classifier, model_evaluation, production_digit_classifier, batch_predictions],
    resources={
        "model_storage": LocalModelStoreResource(base_dir="models/"),
    },
)

Materializing with custom config:

# Override hyperparameters at runtime
dg.materialize(
    [digit_classifier],
    run_config=dg.RunConfig(
        ops={"digit_classifier": ModelConfig(epochs=50, learning_rate=0.0001)}
    ),
    resources={"model_storage": LocalModelStoreResource()},
)

Related Pages

Requires Environment
