Implementation: Kubeflow Pipelines Runtime Resource Request Sample
| Knowledge Sources | |
|---|---|
| Domains | Pipeline_Sample, Resource_Management |
| Last Updated | 2026-02-13 14:00 GMT |
Overview
Sample pipeline demonstrating the pattern for generating CPU and memory resource requests at runtime from a component, with static fallback due to SDK limitations.
Description
This sample (49 lines) sketches the pattern for runtime-dynamic resource allocation. The `generate_resource_request` component returns a NamedTuple with `memory` and `cpu` values. Because the SDK's `PipelineParameterChannel` does not yet accept pipeline-channel values as resource inputs, the actual limits are hardcoded via `set_memory_limit('500Mi')` and `set_cpu_limit('200m')`; the component output is produced but not consumed.
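The component's return contract can be illustrated in plain Python. This is a sketch with the `@dsl.component` decorator omitted, so it shows only the NamedTuple shape the real component returns, not a registerable KFP component:

```python
from typing import NamedTuple

# Plain-Python sketch of the return contract used by the sample's
# generate_resource_request component (KFP decorator omitted here).
Outputs = NamedTuple('Outputs', [('memory', str), ('cpu', str)])

def generate_resource_request() -> Outputs:
    # Kubernetes quantity strings: 500Mi of memory, 200 millicores of CPU.
    return Outputs(memory='500Mi', cpu='200m')
```

Downstream code can then read `result.memory` and `result.cpu` by field name rather than by position.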
Usage
Reference this sample for understanding the intended API for dynamic resource allocation in KFP pipelines and the current workaround using static resource limits.
Code Reference
Source Location
- Repository: Kubeflow_Pipelines
- File: samples/core/resource_spec/runtime_resource_request.py
- Lines: 1-49
Signature
    @dsl.component
    def training_op():
        """Allocates memory to exercise resource limits."""

    @dsl.component
    def generate_resource_request() -> NamedTuple('Outputs', [('memory', str), ('cpu', str)]):
        """Returns resource request values: memory='500Mi', cpu='200m'."""

    @dsl.pipeline
    def resource_request_pipeline():
        """Pipeline: generate resources -> training with static limits."""
Import
    from typing import NamedTuple

    from kfp import dsl
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| (none) | — | — | Pipeline takes no inputs |
Outputs
| Name | Type | Description |
|---|---|---|
| Compiled YAML | file | Pipeline IR YAML for submission to KFP |
Usage Examples
Resource Request Pattern
    from typing import NamedTuple

    from kfp import dsl

    @dsl.component
    def training_op():
        """Training step whose resource limits are set statically below."""

    @dsl.component
    def generate_resource_request() -> NamedTuple('Outputs', [('memory', str), ('cpu', str)]):
        outputs = NamedTuple('Outputs', [('memory', str), ('cpu', str)])
        return outputs(memory='500Mi', cpu='200m')

    @dsl.pipeline
    def resource_request_pipeline():
        resource_task = generate_resource_request()
        training_task = training_op()
        # TODO: Use resource_task.outputs when PipelineParameterChannel supports it
        training_task.set_memory_limit('500Mi')
        training_task.set_cpu_limit('200m')
        training_task.set_cpu_request('200m')
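The literal values passed to the limit setters are Kubernetes resource quantities: `500Mi` means 500 mebibytes (500 × 2^20 bytes) and `200m` means 200 millicores, i.e. 0.2 of one CPU core. A small hypothetical helper, not part of the sample, makes the conversion explicit:

```python
# Hypothetical helpers (not in the sample) that decode the Kubernetes
# quantity strings used in the pipeline above.

def memory_mi_to_bytes(quantity: str) -> int:
    """Convert a mebibyte quantity like '500Mi' to bytes."""
    assert quantity.endswith('Mi'), 'only Mi quantities handled in this sketch'
    return int(quantity[:-2]) * 1024 ** 2

def cpu_millicores_to_cores(quantity: str) -> float:
    """Convert a millicore quantity like '200m' to fractional cores."""
    assert quantity.endswith('m'), 'only millicore quantities handled here'
    return int(quantity[:-1]) / 1000

print(memory_mi_to_bytes('500Mi'))      # 524288000
print(cpu_millicores_to_cores('200m'))  # 0.2
```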