Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Kubeflow Pipelines PipelineTask Configuration Methods

From Leeroopedia
Sources Kubeflow Pipelines, KFP SDK
Domains ML_Pipelines, Resource_Management
Last Updated 2026-02-13

Overview

Concrete tool for configuring task-level resources, caching, and retry policies provided by the KFP SDK PipelineTask fluent API.

Description

PipelineTask objects returned from component invocations expose chainable methods: set_cpu_limit(), set_memory_limit(), set_caching_options(), set_retry(). These methods return the task itself enabling method chaining.

Usage

Call on any PipelineTask instance within a pipeline function to set resource constraints or execution policies.

Code Reference

Source Location: Repository: kubeflow/pipelines, Files: samples/core/resource_spec/resource_spec.py (L18-35), samples/core/caching/caching_sample.py (L61-64), samples/core/retry/retry.py (L31-37)

Signature:

class PipelineTask:
    def set_cpu_limit(self, cpu: str) -> 'PipelineTask': ...
    def set_memory_limit(self, memory: str) -> 'PipelineTask': ...
    def set_caching_options(self, enable_caching: bool) -> 'PipelineTask': ...
    def set_retry(self, num_retries: int, backoff_duration: str = None,
                  backoff_factor: float = None,
                  backoff_max_duration: str = None) -> 'PipelineTask': ...

Import: from kfp import dsl (PipelineTask returned from component calls)

I/O Contract

Direction Parameter Type Description
Input cpu str Kubernetes CPU units e.g. '1'
Input memory str Kubernetes memory e.g. '650M', '1Gi'
Input enable_caching bool Whether to enable execution caching
Input num_retries int Number of retry attempts on failure
Output PipelineTask PipelineTask Same task with config applied (chainable)

Usage Examples

Example 1 -- Resource limits (resource_spec.py):

training_task = training_op(n=n).set_cpu_limit('1').set_memory_limit('650M')

Example 2 -- Caching (caching_sample.py):

work_task = do_work_op(seconds=seconds)
work_task.set_caching_options(enable_caching=False)

Example 3 -- Retry (retry.py):

op1 = random_failure_op(exit_codes='0,1,2,3').set_retry(10)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment