
Implementation:Kubeflow KFP SDK Pipeline Definition

From Leeroopedia
Knowledge Sources
Domains MLOps, Pipeline Orchestration, Python SDK
Last Updated 2026-02-13 00:00 GMT

Overview

A concrete tool, provided by the Kubeflow Pipelines SDK v2, for defining, compiling, and executing reproducible ML pipelines.

Description

The KFP SDK v2 provides a Python-native API for building ML pipelines. Engineers define pipeline components using the @kfp.dsl.component decorator (for lightweight Python function-based components) or @kfp.dsl.container_component (for container-based components), then compose them into a DAG using the @kfp.dsl.pipeline decorator. The kfp.compiler.Compiler class compiles the pipeline definition into an IR YAML file, and kfp.Client submits runs to the KFP backend.

The SDK enforces typed interfaces between components through KFP artifact types (Input[Dataset], Output[Model], Output[Metrics], and so on), so mismatched data flows are caught at compile time. Each pipeline run is tracked in ML Metadata (MLMD), which records all parameters, artifacts, and execution metadata for full lineage.

External Reference

Usage

Use the KFP SDK pipeline definition when:

  • Prototype code must be formalized into a reproducible, version-controlled workflow.
  • Multi-step ML processes need automated orchestration on Kubernetes.
  • Artifact tracking and experiment comparison across runs are required.
  • Pipelines need to be shared, reviewed, and triggered programmatically or on a schedule.

Code Reference

Source Location

Signature

import kfp
from kfp import dsl, compiler

@dsl.component(base_image="python:3.11", packages_to_install=["pandas"])
def my_component(input_data: dsl.Input[dsl.Dataset], output_data: dsl.Output[dsl.Dataset]):
    ...

@dsl.pipeline(name="my-pipeline", description="Pipeline description")
def my_pipeline(param1: str, param2: int = 10):
    step1 = my_component(input_data=...)
    # another_component is a second @dsl.component defined analogously
    step2 = another_component(input_data=step1.outputs["output_data"])

compiler.Compiler().compile(pipeline_func=my_pipeline, package_path="pipeline.yaml")

client = kfp.Client(host="https://kubeflow.example.com/pipeline")
client.create_run_from_pipeline_func(my_pipeline, arguments={"param1": "value"})

Import

pip install kfp==2.11.0

I/O Contract

Inputs

  • pipeline_func (Python callable, required): Function decorated with @dsl.pipeline defining the DAG
  • name (string, required): Human-readable pipeline name
  • description (string, optional): Description of the pipeline's purpose
  • package_path (string, required for compile): Output path for the compiled IR YAML file
  • arguments (dict, optional): Runtime parameter overrides for the pipeline run
  • component definitions (@dsl.component functions, required): One or more component functions forming the DAG steps

Outputs

  • Compiled pipeline YAML (IR YAML file): Portable pipeline definition for submission to the KFP backend
  • Pipeline run (KFP Run object): Tracked execution with a unique run ID and status
  • MLMD artifacts (metadata records): Input/output artifacts, parameters, and metrics recorded in ML Metadata
  • Experiment record (KFP Experiment): Logical grouping of related pipeline runs for comparison

Usage Examples

Basic Usage

from kfp import dsl, compiler

@dsl.component(base_image="python:3.11", packages_to_install=["scikit-learn"])
def train_model(
    dataset: dsl.Input[dsl.Dataset],
    model: dsl.Output[dsl.Model],
    learning_rate: float = 0.01,
):
    import pickle
    from sklearn.linear_model import SGDClassifier
    import pandas as pd

    df = pd.read_csv(dataset.path)
    X, y = df.drop("target", axis=1), df["target"]
    clf = SGDClassifier(learning_rate="constant", eta0=learning_rate)
    clf.fit(X, y)

    with open(model.path, "wb") as f:
        pickle.dump(clf, f)

@dsl.component(base_image="python:3.11")
def evaluate_model(
    model: dsl.Input[dsl.Model],
    test_data: dsl.Input[dsl.Dataset],
    metrics: dsl.Output[dsl.Metrics],
):
    import pickle
    import pandas as pd

    with open(model.path, "rb") as f:
        clf = pickle.load(f)

    df = pd.read_csv(test_data.path)
    X, y = df.drop("target", axis=1), df["target"]
    accuracy = clf.score(X, y)
    metrics.log_metric("accuracy", accuracy)

@dsl.pipeline(name="train-eval-pipeline", description="Train and evaluate a model")
def train_eval_pipeline(learning_rate: float = 0.01):
    train_data = dsl.importer(artifact_uri=..., artifact_class=dsl.Dataset)
    train_step = train_model(dataset=train_data.output, learning_rate=learning_rate)
    test_set = dsl.importer(artifact_uri=..., artifact_class=dsl.Dataset)
    evaluate_model(model=train_step.outputs["model"], test_data=test_set.output)

compiler.Compiler().compile(
    pipeline_func=train_eval_pipeline,
    package_path="train_eval_pipeline.yaml",
)

Related Pages

Implements Principle

Requires Environment
