
Implementation:Kubeflow Pipelines Dsl Pipeline Decorator

From Leeroopedia
Field Value
Sources Kubeflow Pipelines, KFP SDK
Domains ML_Pipelines, Orchestration
Last Updated 2026-02-13 00:00 GMT

Overview

A decorator provided by the KFP SDK for composing pipeline DAGs from component tasks.

Description

The @dsl.pipeline decorator transforms a Python function into a pipeline definition. Component tasks are instantiated inside the function body and wired together via task.output (single output) or task.outputs["key"] (named outputs). When no data dependency exists, explicit ordering uses task.after(other_task).
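The wiring rules above can be sketched with a small toy model (pure Python, no KFP dependency; the Task class and helper names here are illustrative, not part of the KFP API): each task records its upstream dependencies, whether introduced by consuming another task's output or by an explicit after() call, and a topological sort recovers a valid execution order.

```python
# Toy model of KFP-style task wiring (illustrative only, not the KFP API).
from graphlib import TopologicalSorter

class Task:
    """A fake component task: holds named outputs and upstream deps."""
    def __init__(self, name, upstream=()):
        self.name = name
        self.deps = {t.name for t in upstream}
        self.output = (self, "output")            # single unnamed output
        self.outputs = {"key": (self, "key")}     # named outputs

    def after(self, *tasks):
        # Explicit ordering, analogous to task.after(other_task) in KFP.
        self.deps |= {t.name for t in tasks}
        return self

def consumes(*refs):
    """Tasks whose outputs are consumed become upstream dependencies."""
    return [task for task, _ in refs]

# Mirror the signature example: task3 consumes task1 and runs after task2.
task1 = Task("task1")
task2 = Task("task2", upstream=consumes(task1.output))
task3 = Task("task3", upstream=consumes(task1.outputs["key"])).after(task2)

graph = {t.name: t.deps for t in (task1, task2, task3)}
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['task1', 'task2', 'task3']
```

In real KFP the same graph is extracted automatically by the compiler; the decorated function is traced, not executed step by step at runtime.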

Usage

Use when defining a multi-step ML pipeline. The decorated function becomes the pipeline graph.

Code Reference

Source Location: Repository: kubeflow/pipelines, File: samples/core/sequential/sequential.py (L38-46), samples/core/parallel_join/parallel_join.py (L38-51)

Signature:

@dsl.pipeline(
    name: str = None,
    description: str = None,
    pipeline_root: str = None,
)
def pipeline_func(
    param1: str = 'default',
    param2: int = 0,
):
    task1 = component_op(param=param1)
    task2 = another_op(data=task1.output)
    task3 = final_op(input=task1.outputs["key"])
    task3.after(task2)

Import:

from kfp import dsl

I/O Contract

Direction | Name | Type | Required | Description
Input | name | str | No | Display name for the pipeline
Input | description | str | No | Description of the pipeline
Input | pipeline_root | str | No | Artifact storage root
Input | function parameters | various | varies | Pipeline runtime arguments
Output | PipelineSpec | object | - | Pipeline graph spec
Output | YAML IR | file | - | Produced when compiled
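The decorated function's parameters become the pipeline's runtime inputs, with their annotations and defaults. As a rough sketch of that part of the contract, the stdlib inspect module can read the same information off a plain function signature (illustrative only; the real KFP compiler builds a full PipelineSpec, not this dict):

```python
import inspect

# The same parameter list as the Signature section above.
def pipeline_func(param1: str = 'default', param2: int = 0):
    ...

# Read the runtime-argument contract off the function signature:
# parameter name -> (type annotation, default value).
contract = {
    name: (p.annotation.__name__, p.default)
    for name, p in inspect.signature(pipeline_func).parameters.items()
}
print(contract)  # {'param1': ('str', 'default'), 'param2': ('int', 0)}
```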

Usage Examples

Example 1: Sequential pipeline

From sequential.py:

from kfp import dsl, compiler

@dsl.pipeline(
    name='sequential-pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline(url: str = 'gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
    download_task = gcs_download_op(url=url)
    echo_task = echo_op(text=download_task.output)

Example 2: Parallel fan-out/fan-in

From parallel_join.py:

@dsl.pipeline(
    name='parallel-pipeline',
    description='Download two messages in parallel and print the concatenated result.'
)
def download_and_join(
    url1: str = 'gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt',
    url2: str = 'gs://ml-pipeline/sample-data/shakespeare/shakespeare2.txt'
):
    download1_task = gcs_download_op(url=url1)
    download2_task = gcs_download_op(url=url2)
    echo_task = echo2_op(text1=download1_task.output, text2=download2_task.output)
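Because the two download tasks share no dependency on each other, a KFP backend is free to run them concurrently; echo2_op then fans in on both outputs. The same shape can be sketched with stdlib threads (the fake_* functions and URLs below are hypothetical stand-ins, not the sample's real components):

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the sample's components (hypothetical, not the real ops).
def fake_gcs_download(url):
    return f"contents of {url}"

def fake_echo2(text1, text2):
    return text1 + " " + text2

# Fan-out: the two downloads have no edge between them, so they may run
# in parallel. Fan-in: the join step waits on both results.
with ThreadPoolExecutor(max_workers=2) as pool:
    f1 = pool.submit(fake_gcs_download, "gs://bucket/shakespeare1.txt")
    f2 = pool.submit(fake_gcs_download, "gs://bucket/shakespeare2.txt")
    joined = fake_echo2(f1.result(), f2.result())

print(joined)
```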
