Implementation:Dagster io Dagster Dagster Pipes Client
| Field | Value |
|---|---|
| Implementation Name | Dagster Pipes Client |
| Source (core) | python_modules/dagster/dagster/_core/pipes/client.py:L28-54 |
| Source (Modal) | Integration via dagster-modal package |
| Repository | dagster-io/dagster |
| Domains | Data_Engineering, Serverless, GPU_Computing |
Overview
Concrete client for orchestrating external processes over the Dagster Pipes protocol. The abstract base class is provided by the Dagster core library; the Modal implementation is provided by the dagster-modal integration.
Description
The PipesClient is an abstract base class in Dagster's core that defines the interface for launching and communicating with external processes. Concrete implementations (such as ModalClient, PipesSubprocessClient, and others) handle the specifics of each execution environment. On the external side, the dagster-pipes lightweight package provides open_dagster_pipes() for reporting materializations, metadata, and logs back to Dagster through the configured transport layer.
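The abstract-base-class pattern described above can be illustrated with a minimal, standard-library-only sketch. It does not use Dagster itself: `ToyPipesClient`, `ToyInvocationResult`, `InProcessClient`, and the `target` keyword are hypothetical names invented for illustration, and the "external environment" is assumed to be a plain Python callable.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class ToyInvocationResult:
    """Hypothetical stand-in for a completed invocation; not a Dagster type."""

    metadata: Dict[str, Any] = field(default_factory=dict)


class ToyPipesClient(ABC):
    """Hypothetical base class that mirrors the keyword-only shape of run()."""

    @abstractmethod
    def run(
        self,
        *,
        context: Any,
        extras: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> ToyInvocationResult:
        ...


class InProcessClient(ToyPipesClient):
    """Runs a plain Python callable instead of a remote job (sketch assumption)."""

    def run(self, *, context, extras=None, **kwargs):
        target: Callable[[Dict[str, Any]], Dict[str, Any]] = kwargs["target"]
        # The "external" code receives the extras and returns the metadata
        # it wants reported back to the orchestrator.
        return ToyInvocationResult(metadata=target(dict(extras or {})))


def count_words(extras):
    """Toy external workload: count the words in extras["text"]."""
    return {"word_count": len(extras["text"].split())}


result = InProcessClient().run(
    context=None,
    extras={"text": "hello pipes world"},
    target=count_words,
)
print(result.metadata)  # {'word_count': 3}
```

Each concrete subclass decides how the work is launched, while callers rely only on the shared `run()` shape. Instantiating `ToyPipesClient` directly raises `TypeError` because `run()` is abstract.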
Usage
Use PipesClient subclasses when you need to run computation in an external environment while maintaining full observability in the Dagster asset graph. The orchestrator side uses the client to launch the process and collect results; the external side uses open_dagster_pipes() to report back.
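The two-sided flow described above can be sketched with the standard library alone. This is a simplified model, not the Pipes wire protocol: the environment variable names `TOY_PIPES_EXTRAS` and `TOY_PIPES_MESSAGES`, the JSON-lines message format, and the `run_external` helper are all assumptions made for illustration. The real protocol defines its own bootstrap parameters and message schema.

```python
import json
import os
import subprocess
import sys
import tempfile

# Code run by the "external" process. It reads extras from an environment
# variable and appends one JSON message per line to a messages file.
EXTERNAL_SCRIPT = """
import json, os
extras = json.loads(os.environ["TOY_PIPES_EXTRAS"])
with open(os.environ["TOY_PIPES_MESSAGES"], "a") as f:
    f.write(json.dumps({"method": "log", "params": {"message": "starting"}}) + "\\n")
    f.write(json.dumps({
        "method": "report_asset_materialization",
        "params": {"metadata": {"path_length": len(extras["audio_file_path"])}},
    }) + "\\n")
"""


def run_external(extras):
    """Launch the external script and return the messages it reported."""
    with tempfile.TemporaryDirectory() as tmp:
        messages_path = os.path.join(tmp, "messages.jsonl")
        # Orchestrator side: hand over the extras and the message location.
        env = dict(
            os.environ,
            TOY_PIPES_EXTRAS=json.dumps(extras),
            TOY_PIPES_MESSAGES=messages_path,
        )
        subprocess.run([sys.executable, "-c", EXTERNAL_SCRIPT], env=env, check=True)
        # Collect everything the external process reported.
        with open(messages_path) as f:
            return [json.loads(line) for line in f if line.strip()]


messages = run_external({"audio_file_path": "data/episode.mp3"})
materializations = [
    m for m in messages if m["method"] == "report_asset_materialization"
]
print(materializations[0]["params"]["metadata"])  # {'path_length': 16}
```

The orchestrator owns the launch and the collection of results; the external process only needs to know where to read its inputs and where to write its messages.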
Code Reference
Source Location
- Core base class: python_modules/dagster/dagster/_core/pipes/client.py:L28-54
- Modal integration: dagster-modal package
Signature / Pattern

    # Core PipesClient base class
    class PipesClient(ABC):
        """Pipes client base class.

        Pipes clients for specific external environments should subclass this.
        """

        @abstractmethod
        def run(
            self,
            *,
            context: Union[OpExecutionContext, AssetExecutionContext],
            extras: Optional[PipesExtras] = None,
            **kwargs,
        ) -> PipesClientCompletedInvocation:
            ...
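
    # Dagster side (orchestrator) using Modal
    import dagster as dg
    from dagster_modal import ModalClient

    # S3Resource is not imported in the original snippet; it is presumably
    # dagster_aws.s3.S3Resource or a project-defined resource.

    @dg.asset(kinds={"modal", "gpu"})
    def transcription(context: dg.AssetExecutionContext, modal_client: ModalClient, s3: S3Resource):
        # Launch the Modal function, then convert the completed invocation
        # into a MaterializeResult for this asset.
        result = modal_client.run(
            func_ref="modal_project.transcribe",
            context=context,
            env={"R2_BUCKET_NAME": "my-bucket"},
            extras={"audio_file_path": "data/episode.mp3"},
        ).get_materialize_result()
        return result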
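
    # External side (Modal function)
    import modal
    from dagster_pipes import open_dagster_pipes

    # Modal registers functions on an App object; the app name is a placeholder.
    app = modal.App("example-app")

    @app.function(gpu="any", timeout=900)
    def transcribe():
        with open_dagster_pipes() as context:
            # ... do work ...
            # `text` stands for the transcript produced by the elided work above.
            context.report_asset_materialization(
                metadata={"transcript_length": len(text), "model": "whisper-base.en"}
            )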
Import
- Orchestrator: from dagster_modal import ModalClient
- External process: from dagster_pipes import open_dagster_pipes
I/O Contract
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | func_ref | str | Reference to the external function to invoke |
| Input | env | dict | Environment variables passed to the external process |
| Input | extras | dict | Arbitrary data passed to the external environment via Pipes context |
| Input | context | AssetExecutionContext | The Dagster execution context for the asset |
| Output | MaterializeResult | MaterializeResult | Returned via get_materialize_result() on the completed invocation |
| Output | metadata | various | Metadata reported from the external process (e.g., transcript_length, model name) |
Usage Examples
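
    import dagster as dg
    from dagster_modal import ModalClient

    # S3Resource is not imported in the original snippet; it is presumably
    # dagster_aws.s3.S3Resource or a project-defined resource.

    @dg.asset(kinds={"modal", "gpu"})
    def transcription(
        context: dg.AssetExecutionContext,
        modal_client: ModalClient,
        s3: S3Resource,
    ):
        # Launch the Modal function, then convert the completed invocation
        # into a MaterializeResult for this asset.
        result = modal_client.run(
            func_ref="modal_project.transcribe",
            context=context,
            env={"R2_BUCKET_NAME": "my-bucket"},
            extras={"audio_file_path": "data/episode.mp3"},
        ).get_materialize_result()
        return result

    # Register the asset and bind the resources it requests by parameter name.
    defs = dg.Definitions(
        assets=[transcription],
        resources={
            "modal_client": ModalClient(),
            "s3": S3Resource(bucket_name="my-bucket"),
        },
    )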