Implementation: Kubeflow KFP SDK Pipeline Definition
| Knowledge Sources | |
|---|---|
| Domains | MLOps, Pipeline Orchestration, Python SDK |
| Last Updated | 2026-02-13 00:00 GMT |
Overview
A concrete tool, provided by the Kubeflow Pipelines (KFP) SDK v2, for defining, compiling, and executing reproducible ML pipelines.
Description
The KFP SDK v2 provides a Python-native API for building ML pipelines. Engineers define pipeline components using the @kfp.dsl.component decorator (for lightweight Python function-based components) or @kfp.dsl.container_component (for container-based components), then compose them into a DAG using the @kfp.dsl.pipeline decorator. The kfp.compiler.Compiler class compiles the pipeline definition into an IR YAML file, and kfp.Client submits runs to the KFP backend.
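The compiled IR YAML follows the PipelineSpec schema. An abbreviated, illustrative skeleton of its top-level structure (field values here are placeholders, not output of a real compile):

```yaml
# Abbreviated sketch of a compiled IR YAML (PipelineSpec); values are illustrative.
pipelineInfo:
  name: my-pipeline
schemaVersion: 2.1.0
sdkVersion: kfp-2.11.0
root:
  dag:
    tasks:
      my-component:
        componentRef:
          name: comp-my-component
components:
  comp-my-component:
    executorLabel: exec-my-component
deploymentSpec:
  executors:
    exec-my-component:
      container:
        image: python:3.11
```

Because this file is self-contained and backend-agnostic, it can be checked into version control and submitted to any KFP v2 backend.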
The SDK enforces typed interfaces between components via KFP artifact types (Input[Dataset], Output[Model], Output[Metrics], etc.), so data flows between components are validated at compile time. Each pipeline run is tracked in ML Metadata (MLMD), recording all parameters, artifacts, and execution metadata for full lineage.
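Conceptually, the compiler derives execution order from these typed data dependencies: a task that consumes another task's output must run after it. A minimal, KFP-free sketch of that resolution using the standard library (task names here are illustrative, not KFP APIs):

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map mirroring how one step consumes another's output:
# each task maps to the set of tasks whose outputs it consumes.
deps = {
    "ingest": set(),
    "train": {"ingest"},
    "evaluate": {"train", "ingest"},
}

# Resolve a valid execution order over the DAG.
order = list(TopologicalSorter(deps).static_order())
# Tasks with no dependency edge between them may run in parallel; here the
# data edges force ingest -> train -> evaluate.
```

KFP performs this resolution on the compiled IR, so an output consumed before it is produced is a compile-time error rather than a runtime surprise.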
Usage
Use the KFP SDK pipeline definition when:
- Prototype code must be formalized into a reproducible, version-controlled workflow.
- Multi-step ML processes need automated orchestration on Kubernetes.
- Artifact tracking and experiment comparison across runs are required.
- Pipelines need to be shared, reviewed, and triggered programmatically or on a schedule.
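For the scheduled-trigger case, kfp.Client exposes create_recurring_run. A hedged sketch of wiring it up (argument names follow the kfp 2.x client and should be checked against your installed SDK version; the job name, cron string, and parameters are illustrative):

```python
def schedule_nightly_run(client, experiment_id: str):
    """Submit a recurring run of a compiled pipeline package.

    `client` is expected to behave like kfp.Client, e.g.
    kfp.Client(host="https://kubeflow.example.com/pipeline").
    """
    return client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="nightly-train",               # illustrative job name
        pipeline_package_path="pipeline.yaml",  # compiled IR YAML
        cron_expression="0 0 2 * * *",          # KFP cron format includes a seconds field
        max_concurrency=1,
        params={"param1": "value"},
    )
```

Keeping the client as a parameter (rather than constructing it inside) also makes the scheduling logic easy to exercise with a stub in tests.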
Code Reference
Source Location
- Repository: kubeflow/pipelines
- SDK Repository: kubeflow/sdk
- File: sdk/python/kfp/dsl/_pipeline.py, sdk/python/kfp/compiler/compiler.py
Signature
```python
import kfp
from kfp import dsl, compiler

@dsl.component(base_image="python:3.11", packages_to_install=["pandas"])
def my_component(input_data: dsl.Input[dsl.Dataset], output_data: dsl.Output[dsl.Dataset]):
    ...

@dsl.pipeline(name="my-pipeline", description="Pipeline description")
def my_pipeline(param1: str, param2: int = 10):
    step1 = my_component(input_data=...)
    step2 = another_component(input_data=step1.outputs["output_data"])

compiler.Compiler().compile(pipeline_func=my_pipeline, package_path="pipeline.yaml")

client = kfp.Client(host="https://kubeflow.example.com/pipeline")
client.create_run_from_pipeline_func(my_pipeline, arguments={"param1": "value"})
```
Import
```shell
pip install kfp==2.11.0
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| pipeline_func | Python callable | Yes | Function decorated with @dsl.pipeline defining the DAG |
| name | string | Yes | Human-readable pipeline name |
| description | string | No | Description of the pipeline purpose |
| package_path | string | Yes (for compile) | Output path for the compiled IR YAML file |
| arguments | dict | No | Runtime parameter overrides for the pipeline run |
| component definitions | @dsl.component functions | Yes | One or more component functions forming the DAG steps |
Outputs
| Name | Type | Description |
|---|---|---|
| Compiled pipeline YAML | IR YAML file | Portable pipeline definition for submission to KFP backend |
| Pipeline run | KFP Run object | Tracked execution with unique run ID and status |
| MLMD artifacts | Metadata records | All input/output artifacts, parameters, and metrics recorded in ML Metadata |
| Experiment record | KFP Experiment | Logical grouping of related pipeline runs for comparison |
Usage Examples
Basic Usage
```python
from kfp import dsl, compiler

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas"],  # pandas is needed for read_csv
)
def train_model(
    dataset: dsl.Input[dsl.Dataset],
    model: dsl.Output[dsl.Model],
    learning_rate: float = 0.01,
):
    import pickle

    import pandas as pd
    from sklearn.linear_model import SGDClassifier

    df = pd.read_csv(dataset.path)
    X, y = df.drop("target", axis=1), df["target"]
    clf = SGDClassifier(learning_rate="constant", eta0=learning_rate)
    clf.fit(X, y)
    with open(model.path, "wb") as f:
        pickle.dump(clf, f)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas"],  # scikit-learn is needed to unpickle the model
)
def evaluate_model(
    model: dsl.Input[dsl.Model],
    test_data: dsl.Input[dsl.Dataset],
    metrics: dsl.Output[dsl.Metrics],
):
    import pickle

    import pandas as pd

    with open(model.path, "rb") as f:
        clf = pickle.load(f)
    df = pd.read_csv(test_data.path)
    X, y = df.drop("target", axis=1), df["target"]
    accuracy = clf.score(X, y)
    metrics.log_metric("accuracy", accuracy)


@dsl.pipeline(name="train-eval-pipeline", description="Train and evaluate a model")
def train_eval_pipeline(learning_rate: float = 0.01):
    train_step = train_model(dataset=dsl.importer(...).output, learning_rate=learning_rate)
    evaluate_model(model=train_step.outputs["model"], test_data=dsl.importer(...).output)


compiler.Compiler().compile(
    pipeline_func=train_eval_pipeline,
    package_path="train_eval_pipeline.yaml",
)
```