Implementation:Kubeflow Pipelines Dsl ParallelFor
| Sources | Domains | Last Updated |
|---|---|---|
| Kubeflow Pipelines, KFP Control Flow | ML_Pipelines, Parallelism | 2026-02-13 |
Overview
A concrete tool for running loop iterations in parallel within a pipeline, provided by the KFP DSL as the dsl.ParallelFor context manager.
Description
dsl.ParallelFor is a context manager that iterates over a list and runs every task defined in its body once per item, with the iterations executing in parallel. The items can be a static list, a list of dicts, or a PipelineChannel produced by a preceding task's output. For dict items, the loop variable supports attribute access (item.field_name). The parallelism parameter caps the number of iterations that run concurrently.
Usage
Use within a @dsl.pipeline function when the same operations must be applied to each item in a collection.
Code Reference
Source Location: Repository: kubeflow/pipelines, Files: samples/core/loop_parallelism/loop_parallelism.py (L21-26), samples/core/loop_output/loop_output.py (L33-37), samples/core/loop_parameter/loop_parameter.py (L20-28)
Signature:
class ParallelFor:
    def __init__(
        self,
        items: Union[PipelineChannel, List],
        parallelism: Optional[int] = None,
        name: Optional[str] = None,
    ): ...
    def __enter__(self) -> LoopArgVariable: ...
    def __exit__(self, *args): ...
Import: from kfp import dsl
I/O Contract
Inputs:
| Parameter | Type | Required | Description |
|---|---|---|---|
| items | Union[PipelineChannel, List] | Yes | Iterable of items |
| parallelism | int | No | Max concurrent iterations |
| name | str | No | Display name |
Outputs:
LoopArgVariable: the loop variable representing the current item; supports attribute access for structured (dict) items
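To make the effect of the parallelism cap concrete, here is a plain-Python analogy. It is not KFP code: KFP schedules iterations through its backend, not through threads. The helper names run_capped and slow_square are hypothetical, and a thread pool stands in for the scheduler purely for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_capped(items, parallelism, work):
    """Apply work to every item with at most `parallelism` calls in flight."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def tracked(item):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)  # record the highest concurrency seen
        try:
            return work(item)
        finally:
            with lock:
                running -= 1

    # max_workers plays the role of the parallelism argument.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        results = list(pool.map(tracked, items))
    return results, peak


def slow_square(x):
    time.sleep(0.01)  # simulate a task that takes some time
    return x * x


results, peak = run_capped(range(6), parallelism=2, work=slow_square)
print(results, peak)
```

All six items are processed, but the observed concurrency never exceeds the cap of 2.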
Usage Examples
Example 1 — Static structured items with parallelism (loop_parallelism.py):
@dsl.pipeline(name='my-pipeline')
def pipeline():
    loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]
    with dsl.ParallelFor(items=loop_args, parallelism=10) as item:
        print_op(s=item.A_a)
        print_op(s=item.B_b)
Example 2 — Simple list iteration:
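# Fragment from inside a @dsl.pipeline function; `flip` is an upstream task defined earlier (not shown).
with dsl.ParallelFor(['heads', 'tails']) as expected_result:
    with dsl.Condition(flip.output == expected_result):
        print_op(message=expected_result)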