Implementation:Kubeflow Pipelines Control Flow Composition Pattern
| Sources | Domains | Last Updated |
|---|---|---|
| Kubeflow Pipelines, KFP Control Flow | ML_Pipelines, Control_Flow | 2026-02-13 |
Overview
A user-defined pattern for composing ExitHandler, ParallelFor, and Condition within a single KFP pipeline.
Description
This is a Pattern Doc — it documents how to combine dsl.ExitHandler, dsl.ParallelFor, and dsl.Condition in a single pipeline. There is no single API; rather, it is the composition of multiple APIs. The pattern shows nesting: ExitHandler at the outermost level, ParallelFor for fan-out, and Condition for branching decisions, with further nested Conditions for multi-level decision trees.
Usage
Use this pattern when your pipeline needs complex execution logic combining guaranteed cleanup, parallel iteration, and conditional branching.
Interface Specification
@dsl.pipeline(name='combined-control-flow')
def combined_pipeline():
# 1. Define exit handler task
exit_task = cleanup_op(...)
# 2. Wrap failure-prone section
with dsl.ExitHandler(exit_task):
risky_op(...)
# 3. Fan-out with parallel loop
with dsl.ParallelFor(items_list) as item:
# 4. Branch based on conditions
with dsl.Condition(task.output == item):
result = process_op(input=item)
# 5. Nested condition
with dsl.Condition(result.output > threshold):
handle_high_op(...)
Code Reference
- Source Location: Repository:
kubeflow/pipelines, File:samples/tutorials/DSL - Control structures/DSL - Control structures.py(L71-95) - Import:
from kfp import dsl
Usage Examples
Example — Full combined control flow (DSL - Control structures.py):
@dsl.pipeline(
name='tutorial-control-flows',
description='Shows how to use dsl.Condition(), dsl.ParallelFor, and dsl.ExitHandler().'
)
def control_flows_pipeline():
exit_task = print_op(message='Exit handler has worked!')
with dsl.ExitHandler(exit_task):
fail_op(message="Failing the run to demonstrate that exit handler still gets executed.")
flip = flip_coin_op()
with dsl.ParallelFor(['heads', 'tails']) as expected_result:
with dsl.Condition(flip.output == expected_result):
random_num_head = get_random_int_op(minimum=0, maximum=9)
with dsl.Condition(random_num_head.output > 5):
print_op(message=f'{expected_result} and {random_num_head.output} > 5!')
with dsl.Condition(random_num_head.output <= 5):
print_op(message=f'{expected_result} and {random_num_head.output} <= 5!')