Implementation:Kubeflow Pipelines Control Flow Composition Pattern

From Leeroopedia
Sources: Kubeflow Pipelines, KFP Control Flow
Domains: ML_Pipelines, Control_Flow
Last Updated: 2026-02-13

Overview

A user-defined pattern for composing ExitHandler, ParallelFor, and Condition within a single KFP pipeline.

Description

This is a Pattern Doc: it documents how to combine dsl.ExitHandler, dsl.ParallelFor, and dsl.Condition in a single pipeline. There is no single API to call; the pattern is a composition of several DSL constructs. In the reference sample, a top-level ExitHandler block wraps the failure-prone task so that the exit task runs even when that task fails, a ParallelFor block fans out over a list of items, and inside each loop iteration a Condition block gates the work, with further nested Conditions forming a multi-level decision tree.

Usage

Use this pattern when your pipeline needs complex execution logic combining guaranteed cleanup, parallel iteration, and conditional branching.

Interface Specification

@dsl.pipeline(name='combined-control-flow')
def combined_pipeline():
    # 1. Define exit handler task
    exit_task = cleanup_op(...)

    # 2. Wrap failure-prone section
    with dsl.ExitHandler(exit_task):
        risky_op(...)

    # Upstream task whose output drives the branching (placeholder name)
    task = upstream_op(...)

    # 3. Fan-out with parallel loop
    with dsl.ParallelFor(items_list) as item:
        # 4. Branch based on conditions
        with dsl.Condition(task.output == item):
            result = process_op(input=item)
            # 5. Nested condition
            with dsl.Condition(result.output > threshold):
                handle_high_op(...)
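
As a mental model for the skeleton above, the three constructs map loosely onto ordinary Python control flow: ExitHandler behaves like try/finally, ParallelFor like a for loop, and Condition like an if statement. The sketch below is only an analogy and does not use KFP. The function names, the log list, and the sequential loop are assumptions made for illustration; a real pipeline may run loop iterations concurrently, and a failed task still marks the run as failed.

```python
from typing import Callable, List


def run_with_exit_handler(risky: Callable[[], None], log: List[str]) -> None:
    """Analogy for dsl.ExitHandler: the exit step runs whether or not risky() raises."""
    try:
        risky()
    except Exception as exc:
        # The error is swallowed only so this sketch can keep going;
        # in a real pipeline the run would still be reported as failed.
        log.append(f"risky step failed: {exc}")
    finally:
        log.append("exit handler ran")


def fan_out_and_branch(
    items: List[str],
    observed: str,
    draw: Callable[[], int],
    threshold: int,
    log: List[str],
) -> None:
    """Analogy for ParallelFor with nested Conditions."""
    for item in items:  # ParallelFor: one iteration per item
        if observed == item:  # outer Condition
            number = draw()
            if number > threshold:  # nested Condition
                log.append(f"{item} and {number} > {threshold}")
            if number <= threshold:  # complementary nested Condition
                log.append(f"{item} and {number} <= {threshold}")
```

The two nested checks are written as separate if statements rather than if/else on purpose: that mirrors how the pattern expresses an either/or branch with two complementary Condition blocks.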

Code Reference

  • Source Location: Repository: kubeflow/pipelines, File: samples/tutorials/DSL - Control structures/DSL - Control structures.py (L71-95)
  • Import: from kfp import dsl

Usage Examples

Example — Full combined control flow (DSL - Control structures.py):

@dsl.pipeline(
    name='tutorial-control-flows',
    description='Shows how to use dsl.Condition(), dsl.ParallelFor, and dsl.ExitHandler().'
)
def control_flows_pipeline():
    exit_task = print_op(message='Exit handler has worked!')
    with dsl.ExitHandler(exit_task):
        fail_op(message="Failing the run to demonstrate that exit handler still gets executed.")

    flip = flip_coin_op()

    with dsl.ParallelFor(['heads', 'tails']) as expected_result:
        with dsl.Condition(flip.output == expected_result):
            random_num_head = get_random_int_op(minimum=0, maximum=9)
            with dsl.Condition(random_num_head.output > 5):
                print_op(message=f'{expected_result} and {random_num_head.output} > 5!')
            with dsl.Condition(random_num_head.output <= 5):
                print_op(message=f'{expected_result} and {random_num_head.output} <= 5!')
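
The example above calls four components (print_op, fail_op, flip_coin_op, get_random_int_op) whose definitions are not shown on this page. The following is a minimal sketch of what their bodies might look like, written as plain Python functions so it runs without KFP installed. The function names and bodies here are assumptions for illustration, not the sample's actual code; in a real pipeline each function would additionally be wrapped as a KFP component (for example with the dsl.component decorator in KFP v2).

```python
import random
import sys


def flip_coin() -> str:
    """Return 'heads' or 'tails' with equal probability."""
    return random.choice(["heads", "tails"])


def get_random_int(minimum: int, maximum: int) -> int:
    """Return a random integer between minimum and maximum, both inclusive."""
    return random.randint(minimum, maximum)


def print_message(message: str) -> str:
    """Print the message and return it unchanged."""
    print(message)
    return message


def fail(message: str) -> None:
    """Print the message, then exit with a non-zero status so the step counts as failed."""
    print(message)
    sys.exit(1)
```

With bodies like these, get_random_int(minimum=0, maximum=9) yields a value from 0 to 9, so exactly one of the two nested conditions (> 5 or <= 5) holds for the branch that matches the coin flip.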
