Implementation:Kubeflow Pipelines Dsl Pipeline Decorator
| Field | Value |
|---|---|
| Sources | Kubeflow Pipelines, KFP SDK |
| Domains | ML_Pipelines, Orchestration |
| Last Updated | 2026-02-13 00:00 GMT |
Overview
A concrete tool, provided by the KFP SDK, for composing pipeline DAGs from component tasks.
Description
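The @dsl.pipeline decorator transforms a Python function into a pipeline definition. Component tasks are instantiated inside the function body and wired together by passing one task's result to the next: task.output when the upstream component has a single output, or task.outputs["key"] when it has named outputs. These data dependencies determine execution order. When two tasks share no data, task.after(other_task) imposes the ordering explicitly.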
Usage
Use when defining a multi-step ML pipeline. The decorated function becomes the pipeline graph.
Code Reference
Source Location: Repository: kubeflow/pipelines, File: samples/core/sequential/sequential.py (L38-46), samples/core/parallel_join/parallel_join.py (L38-51)
Signature:
@dsl.pipeline(
    name: str = None,
    description: str = None,
    pipeline_root: str = None,
)
def pipeline_func(
    param1: str = 'default',
    param2: int = 0,
):
    task1 = component_op(param=param1)
    task2 = another_op(data=task1.output)
    task3 = final_op(input=task1.outputs["key"])
    task3.after(task2)
Import:
from kfp import dsl
I/O Contract
| Direction | Name | Type | Required | Description |
|---|---|---|---|---|
| Input | name | str | No | Name of the pipeline; defaults to a sanitized form of the function name |
| Input | description | str | No | Description of the pipeline |
| Input | pipeline_root | str | No | Artifact storage root |
| Input | function parameters | various | varies | Pipeline runtime arguments |
| Output | PipelineSpec | object | — | Pipeline graph spec |
| Output | YAML IR | file | — | Produced when compiled |
Usage Examples
Example 1: Sequential pipeline
From sequential.py:
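from kfp import dsl, compiler
# gcs_download_op and echo_op are components defined elsewhere in the sample (not shown here).
@dsl.pipeline(
    name='sequential-pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline(url: str = 'gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
    # echo_task consumes download_task.output, so it runs after the download.
    download_task = gcs_download_op(url=url)
    echo_task = echo_op(text=download_task.output)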
Example 2: Parallel fan-out/fan-in
From parallel_join.py:
@dsl.pipeline(
    name='parallel-pipeline',
    description='Download two messages in parallel and prints the concatenated result.'
)
def download_and_join(
    url1: str='gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt',
    url2: str='gs://ml-pipeline/sample-data/shakespeare/shakespeare2.txt'
):
    download1_task = gcs_download_op(url=url1)
    download2_task = gcs_download_op(url=url2)
    echo_task = echo2_op(text1=download1_task.output, text2=download2_task.output)