Implementation:Kubeflow Pipelines PipelineTask Configuration Methods
| Sources | Kubeflow Pipelines, KFP SDK |
|---|---|
| Domains | ML_Pipelines, Resource_Management |
| Last Updated | 2026-02-13 |
Overview
The KFP SDK's PipelineTask fluent API provides concrete methods for configuring task-level resource limits, caching, and retry policies.
Description
PipelineTask objects returned from component invocations expose chainable methods: set_cpu_limit(), set_memory_limit(), set_caching_options(), and set_retry(). Each method returns the task itself, which enables method chaining.
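The chaining behavior described above comes down to each setter returning the object it was called on. The following is a minimal sketch of that pattern in plain Python. `SketchTask` is a hypothetical stand-in written for illustration; it is not the SDK's PipelineTask and records settings in a plain dictionary only.

```python
class SketchTask:
    """Hypothetical stand-in for a PipelineTask; not part of the KFP SDK."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.settings = {}

    def set_cpu_limit(self, cpu: str) -> "SketchTask":
        self.settings["cpu_limit"] = cpu
        return self  # returning self is what makes chaining possible

    def set_memory_limit(self, memory: str) -> "SketchTask":
        self.settings["memory_limit"] = memory
        return self

    def set_caching_options(self, enable_caching: bool) -> "SketchTask":
        self.settings["enable_caching"] = enable_caching
        return self


task = SketchTask("train")
chained = task.set_cpu_limit("1").set_memory_limit("650M").set_caching_options(False)

# The chain hands back the very same object that was configured.
print(chained is task)
print(task.settings)
```

Because every call returns the same object, the chained form and the one-call-per-line form configure the task identically; the choice is purely stylistic.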
Usage
Call these methods on any PipelineTask instance inside a pipeline function to set resource limits or execution policies.
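As a rough end-to-end sketch of this usage, assuming the KFP SDK v2 is installed, the snippet below defines a small component and configures its task inside a pipeline function. The names `build_demo_pipeline`, `compile_demo`, `train_op`, `demo_pipeline`, and the output file name are hypothetical and chosen only for illustration.

```python
def build_demo_pipeline():
    """Build a one-step pipeline whose task is configured inside the pipeline function."""
    # Imported here so this module can be loaded without the KFP SDK installed.
    from kfp import dsl

    @dsl.component
    def train_op(n: int) -> int:
        # Hypothetical placeholder workload.
        return n * 2

    @dsl.pipeline(name="task-config-demo")
    def demo_pipeline(n: int = 10):
        # Calling a component inside a pipeline function yields a PipelineTask.
        train_task = train_op(n=n)
        train_task.set_cpu_limit("1").set_memory_limit("650M")
        train_task.set_caching_options(enable_caching=False)
        train_task.set_retry(num_retries=3)

    return demo_pipeline


def compile_demo(package_path: str = "task_config_demo.yaml") -> None:
    """Compile the demo pipeline to a YAML pipeline spec (requires the KFP SDK)."""
    from kfp import compiler

    compiler.Compiler().compile(
        pipeline_func=build_demo_pipeline(),
        package_path=package_path,
    )
```

Calling `compile_demo()` should write a pipeline spec in which the task carries the configured limits and policies. Nothing is scheduled until the compiled pipeline is submitted to a KFP backend.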
Code Reference
Source Location: Repository: kubeflow/pipelines, Files: samples/core/resource_spec/resource_spec.py (L18-35), samples/core/caching/caching_sample.py (L61-64), samples/core/retry/retry.py (L31-37)
Signature:
class PipelineTask:
    def set_cpu_limit(self, cpu: str) -> 'PipelineTask': ...
    def set_memory_limit(self, memory: str) -> 'PipelineTask': ...
    def set_caching_options(self, enable_caching: bool) -> 'PipelineTask': ...
    def set_retry(self, num_retries: int, backoff_duration: str = None,
                  backoff_factor: float = None,
                  backoff_max_duration: str = None) -> 'PipelineTask': ...
Import: from kfp import dsl (a PipelineTask is returned by each component call inside a pipeline function)
I/O Contract
| Direction | Parameter | Type | Description |
|---|---|---|---|
| Input | cpu | str | CPU limit as a Kubernetes quantity, e.g. '1' (one core) |
| Input | memory | str | Memory limit as a Kubernetes quantity, e.g. '650M', '1Gi' |
| Input | enable_caching | bool | Whether to enable execution caching |
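| Input | num_retries | int | Number of retry attempts on failure |
| Input | backoff_duration | str | Optional wait before a retry, given as a duration string |
| Input | backoff_factor | float | Optional multiplier applied to backoff_duration between retries |
| Input | backoff_max_duration | str | Optional maximum duration during which the task is retried |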
| Output | PipelineTask | PipelineTask | Same task with config applied (chainable) |
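The memory examples in the table mix decimal ('650M') and binary ('1Gi') suffixes, which are easy to confuse. The sketch below shows the difference using the standard Kubernetes quantity notation. The helpers `memory_to_bytes` and `cpu_to_cores` are hypothetical, cover only a subset of suffixes, and are not part of the KFP SDK, which accepts the strings directly.

```python
from decimal import Decimal

# Subset of suffix multipliers from the Kubernetes quantity notation.
_BINARY_SUFFIXES = {"Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
_DECIMAL_SUFFIXES = {"M": 10**6, "G": 10**9, "T": 10**12}


def memory_to_bytes(quantity: str) -> int:
    """Convert a memory quantity such as '650M' or '1Gi' to bytes (hypothetical helper)."""
    # Two-letter binary suffixes are checked first; none of them end in a
    # one-letter decimal suffix, but this order keeps the intent explicit.
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    for suffix, factor in _DECIMAL_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    return int(Decimal(quantity))  # plain number of bytes


def cpu_to_cores(quantity: str) -> Decimal:
    """Convert a CPU quantity such as '1' or '500m' to a number of cores."""
    if quantity.endswith("m"):  # millicores: 1000m equals one core
        return Decimal(quantity[:-1]) / 1000
    return Decimal(quantity)


print(memory_to_bytes("650M"))   # 650000000
print(memory_to_bytes("650Mi"))  # 681574400
print(memory_to_bytes("1Gi"))    # 1073741824
print(cpu_to_cores("500m"))      # 0.5
```

The gap between '650M' and '650Mi' is roughly 5%, which can matter when a container runs close to its memory limit.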
Usage Examples
Example 1 -- Resource limits (resource_spec.py):
training_task = training_op(n=n).set_cpu_limit('1').set_memory_limit('650M')
Example 2 -- Caching (caching_sample.py):
work_task = do_work_op(seconds=seconds)
work_task.set_caching_options(enable_caching=False)
Example 3 -- Retry (retry.py):
op1 = random_failure_op(exit_codes='0,1,2,3').set_retry(10)
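The retry example above passes only num_retries. To illustrate how the backoff parameters from the signature would interact, here is a minimal sketch assuming conventional exponential backoff, where each wait is the previous one multiplied by backoff_factor. The function `backoff_schedule` and its numeric `backoff_duration_s` parameter are hypothetical (the SDK takes duration strings), and the sketch does not model backoff_max_duration or how a particular backend enforces these values.

```python
from typing import List


def backoff_schedule(num_retries: int, backoff_duration_s: float,
                     backoff_factor: float) -> List[float]:
    """Return the wait in seconds before each retry under exponential backoff."""
    return [backoff_duration_s * backoff_factor ** attempt
            for attempt in range(num_retries)]


# Four retries starting at 60 seconds and doubling each time.
print(backoff_schedule(num_retries=4, backoff_duration_s=60, backoff_factor=2))
# [60, 120, 240, 480]
```

Under these assumptions the cumulative wait grows quickly, so a high num_retries combined with a large factor is best paired with a bound on the total retry window.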