Implementation:Kubeflow Pipelines Dsl ParallelFor

From Leeroopedia
Sources: Kubeflow Pipelines, KFP Control Flow
Domains: ML_Pipelines, Parallelism
Last Updated: 2026-02-13

Overview

Concrete tool for parallel loop execution in pipelines, provided by the KFP DSL as the dsl.ParallelFor context manager.

Description

dsl.ParallelFor is a context manager that iterates over a list and runs the tasks in its body once per item, with the iterations executing in parallel. The items can be a static list, a list of dicts, or a PipelineChannel produced by a preceding task's output. When the items are dicts, the loop variable supports attribute access to their fields (item.field_name). The parallelism parameter caps the number of iterations that run concurrently.
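As a rough mental model of the parallelism cap, the plain-Python analogy below runs one job per item on a thread pool whose size plays the role of parallelism. It is only an illustration under stated assumptions: KFP schedules iterations on the pipeline backend rather than with threads, and fan_out, run_iteration, and the peak counter are hypothetical names invented for this sketch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fan_out(items, parallelism):
    """Run one job per item with at most `parallelism` jobs in flight.

    Plain-Python analogy only; this is not how KFP executes ParallelFor.
    Returns (results in item order, peak number of concurrent jobs).
    """
    lock = threading.Lock()
    state = {'running': 0, 'peak': 0}

    def run_iteration(item):
        # Record how many iterations are active at the same time.
        with lock:
            state['running'] += 1
            state['peak'] = max(state['peak'], state['running'])
        time.sleep(0.01)  # stand-in for the tasks inside the loop body
        with lock:
            state['running'] -= 1
        return item * 2

    # max_workers plays the role of the parallelism argument.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        results = list(pool.map(run_iteration, items))
    return results, state['peak']


results, peak = fan_out([1, 2, 3, 4, 5, 6], parallelism=2)
print(results, peak)
```

Six items are submitted, but the observed peak never exceeds 2, which mirrors how parallelism=2 would limit the number of loop iterations running at once.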

Usage

Use within a @dsl.pipeline function when the same operations must be applied to each item in a collection.

Code Reference

Source Location: Repository: kubeflow/pipelines, Files: samples/core/loop_parallelism/loop_parallelism.py (L21-26), samples/core/loop_output/loop_output.py (L33-37), samples/core/loop_parameter/loop_parameter.py (L20-28)

Signature:

class ParallelFor:
    def __init__(
        self,
        items: Union[PipelineChannel, List],
        parallelism: Optional[int] = None,
        name: Optional[str] = None,
    ): ...
    def __enter__(self) -> LoopArgVariable: ...
    def __exit__(self, *args): ...

Import: from kfp import dsl

I/O Contract

Inputs:

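Parameter | Type | Required | Description
items | Union[PipelineChannel, List] | Yes | Static list (scalars or dicts) or a PipelineChannel from a preceding task output
parallelism | Optional[int] | No | Maximum number of concurrent iterations (default: None)
name | Optional[str] | No | Display name for the loop (default: None)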

Outputs:

  • LoopArgVariable — loop variable representing current item; supports attribute access for structured items

Usage Examples

Example 1 — Static structured items with parallelism (loop_parallelism.py):

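from kfp import dsl

# print_op is a component defined elsewhere in the sample (not shown here).
@dsl.pipeline(name='my-pipeline')
def pipeline():
    # Each dict is one loop item; its keys are read as attributes of the loop variable.
    loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]
    # At most 10 iterations may run concurrently (only 2 items exist here).
    with dsl.ParallelFor(items=loop_args, parallelism=10) as item:
        print_op(s=item.A_a)
        print_op(s=item.B_b)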
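
To make the fan-out in Example 1 concrete, this plain-Python sketch lists the task invocations the loop implies: each item yields one print_op call per statement in the loop body. The expand_loop helper and the tuple representation are hypothetical and only model the expansion; they are not part of KFP.

```python
loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]


def expand_loop(items, fields):
    """List (iteration index, field, value) for every task the loop body implies."""
    return [(index, field, item[field])
            for index, item in enumerate(items)
            for field in fields]


invocations = expand_loop(loop_args, fields=['A_a', 'B_b'])
for index, field, value in invocations:
    print(f'iteration {index}: print_op(s=item.{field}) receives {value}')
```

Two items and two tasks in the body give four task invocations in total; in the real pipeline the two iterations are independent of each other and may run at the same time.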

Example 2 — Simple list iteration:

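# Runs inside a @dsl.pipeline function. flip is assumed to be an upstream task
# (not shown) whose output is either 'heads' or 'tails'.
with dsl.ParallelFor(['heads', 'tails']) as expected_result:
    # Only the iteration whose item matches the flip result runs print_op.
    with dsl.Condition(flip.output == expected_result):
        print_op(message=expected_result)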
