Implementation:Dagster io Dagster Dagster Pipes Client

From Leeroopedia


Field | Value
Implementation Name | Dagster Pipes Client
Source (core) | python_modules/dagster/dagster/_core/pipes/client.py:L28-54
Source (Modal) | Integration via dagster-modal package
Repository | dagster-io/dagster
Domains | Data_Engineering, Serverless, GPU_Computing

Overview

Client interface for orchestrating external processes through the Dagster Pipes protocol. The abstract base class lives in the Dagster core library; the dagster-modal integration supplies a concrete client for Modal.

Description

PipesClient is an abstract base class in Dagster's core library that defines the interface for launching and communicating with external processes. Concrete implementations, such as ModalClient and PipesSubprocessClient, handle the specifics of each execution environment. In the external process, the lightweight dagster-pipes package provides open_dagster_pipes(), which reports materializations, metadata, and logs back to Dagster through the configured transport layer.
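The split between an abstract interface and environment-specific clients can be sketched with the standard library alone. This is a minimal illustration, not Dagster's code: BaseClient, CompletedInvocation, and EchoClient are hypothetical stand-ins for PipesClient, PipesClientCompletedInvocation, and a concrete client, and the toy client only echoes its extras instead of launching anything.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class CompletedInvocation:
    """Hypothetical stand-in for PipesClientCompletedInvocation."""

    metadata: Dict[str, Any] = field(default_factory=dict)


class BaseClient(ABC):
    """Hypothetical stand-in for PipesClient: a single abstract, keyword-only run()."""

    @abstractmethod
    def run(
        self,
        *,
        context: Any,
        extras: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> CompletedInvocation:
        ...


class EchoClient(BaseClient):
    """Toy environment-specific client that echoes extras back as metadata."""

    def run(self, *, context, extras=None, **kwargs):
        # A real client would launch an external process here and wait for it.
        return CompletedInvocation(metadata=dict(extras or {}))


# The base class cannot be instantiated; only concrete subclasses can.
result = EchoClient().run(context=None, extras={"audio_file_path": "data/episode.mp3"})
print(result.metadata)
```

Keeping run() keyword-only, as the real signature does, lets each subclass add its own environment-specific arguments without breaking the shared context and extras parameters.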

Usage

Use PipesClient subclasses when you need to run computation in an external environment while maintaining full observability in the Dagster asset graph. The orchestrator side uses the client to launch the process and collect results; the external side uses open_dagster_pipes() to report back.
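The launch-and-report round trip described above can be imitated with a subprocess, an environment variable, and a temporary file. The sketch below is a simplified stand-in for the idea only: the variable names DEMO_EXTRAS and DEMO_MESSAGES_PATH, the message shape, and launch_and_collect are all hypothetical and do not reproduce the actual Pipes wire format.

```python
import json
import os
import subprocess
import sys
import tempfile

# "External" side: read extras from the environment, do some work,
# then append one JSON message per line to the file the orchestrator named.
EXTERNAL_SCRIPT = """
import json, os
extras = json.loads(os.environ["DEMO_EXTRAS"])
message = {
    "method": "report_asset_materialization",
    "metadata": {"path_length": len(extras["audio_file_path"])},
}
with open(os.environ["DEMO_MESSAGES_PATH"], "a") as f:
    f.write(json.dumps(message) + "\\n")
"""


def launch_and_collect(extras):
    """Orchestrator side: launch the process, then read back what it reported."""
    with tempfile.TemporaryDirectory() as tmp:
        messages_path = os.path.join(tmp, "messages.jsonl")
        env = {
            **os.environ,
            "DEMO_EXTRAS": json.dumps(extras),  # hypothetical variable name
            "DEMO_MESSAGES_PATH": messages_path,  # hypothetical variable name
        }
        # check=True raises if the external process exits with an error.
        subprocess.run([sys.executable, "-c", EXTERNAL_SCRIPT], env=env, check=True)
        with open(messages_path) as f:
            return [json.loads(line) for line in f]


messages = launch_and_collect({"audio_file_path": "data/episode.mp3"})
print(messages)
```

In real use, a PipesClient subclass and open_dagster_pipes() take care of both halves, so neither side handles the transport directly.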

Code Reference

Source Location

  • Core base class: python_modules/dagster/dagster/_core/pipes/client.py:L28-54
  • Modal integration: dagster-modal package

Signature / Pattern

# Core PipesClient base class
class PipesClient(ABC):
    """Pipes client base class.

    Pipes clients for specific external environments should subclass this.
    """

    @abstractmethod
    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ) -> PipesClientCompletedInvocation:
        ...

# Dagster side (orchestrator) using Modal
import dagster as dg
from dagster_aws.s3 import S3Resource  # assumed source of S3Resource
from dagster_modal import ModalClient

@dg.asset(kinds={"modal", "gpu"})
def transcription(context: dg.AssetExecutionContext, modal_client: ModalClient, s3: S3Resource):
    result = modal_client.run(
        func_ref="modal_project.transcribe",
        context=context,
        env={"R2_BUCKET_NAME": "my-bucket"},
        extras={"audio_file_path": "data/episode.mp3"},
    ).get_materialize_result()
    return result

# External side (Modal function)
import modal
from dagster_pipes import open_dagster_pipes

app = modal.App("modal_project")  # app name is illustrative

@app.function(gpu="any", timeout=900)
def transcribe():
    with open_dagster_pipes() as context:
        # ... do work and bind the transcript to `text` ...
        context.report_asset_materialization(
            metadata={"transcript_length": len(text), "model": "whisper-base.en"}
        )

Import

  • Orchestrator: from dagster_modal import ModalClient
  • External process: from dagster_pipes import open_dagster_pipes

I/O Contract

Direction | Name | Type | Description
Input | func_ref | str | Reference to the external function to invoke
Input | env | dict | Environment variables passed to the external process
Input | extras | dict | Arbitrary data passed to the external environment via Pipes context
Input | context | AssetExecutionContext | The Dagster execution context for the asset
Output | MaterializeResult | MaterializeResult | Returned via get_materialize_result() on the completed invocation
Output | metadata | various | Metadata reported from the external process (e.g., transcript_length, model name)

Usage Examples

import dagster as dg
from dagster_aws.s3 import S3Resource  # assumed source of S3Resource
from dagster_modal import ModalClient

@dg.asset(kinds={"modal", "gpu"})
def transcription(
    context: dg.AssetExecutionContext,
    modal_client: ModalClient,
    s3: S3Resource,
):
    result = modal_client.run(
        func_ref="modal_project.transcribe",
        context=context,
        env={"R2_BUCKET_NAME": "my-bucket"},
        extras={"audio_file_path": "data/episode.mp3"},
    ).get_materialize_result()
    return result

defs = dg.Definitions(
    assets=[transcription],
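    resources={
        "modal_client": ModalClient(),
        # Assuming dagster_aws's S3Resource: it takes connection settings
        # (e.g. region_name, endpoint_url), not a bucket name. The bucket is
        # passed to the external process through env in the asset above.
        "s3": S3Resource(),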
    },
)
