Implementation: Kubeflow Pipelines Recursive Training Loop Pattern
| Sources | Kubeflow Pipelines, KFP Control Flow |
|---|---|
| Domains | Machine_Learning, Control_Flow |
| Last Updated | 2026-02-13 |
Overview
Pattern Doc for implementing a train-evaluate-check recursive loop with MSE-based termination in KFP.
Description
This Pattern Doc shows how to combine @dsl.graph_component, xgboost_train_on_csv_op, xgboost_predict_on_csv_op, calculate_regression_metrics_from_csv_op, and dsl.Condition into a recursive training loop. The pattern has four phases:
- Phase 1: Retrain the model with additional iterations
- Phase 2: Generate predictions
- Phase 3: Calculate metrics
- Phase 4: Check whether MSE is still above the threshold and recurse if so
The recursive call passes the updated model back to the loop entry.
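The four phases above can be sketched in plain Python to illustrate the termination semantics. This is a hypothetical stand-in, not the KFP implementation: train_step, predict, and the toy model below are illustrative placeholders, and the bounded for-loop plays the role of the pipeline's recursive @dsl.graph_component call guarded by dsl.Condition.

```python
def mse(true_values, predicted_values):
    """Mean squared error between two equal-length sequences."""
    n = len(true_values)
    return sum((t - p) ** 2 for t, p in zip(true_values, predicted_values)) / n

def train_until_low_error(model, training_data, true_values, train_step, predict,
                          threshold=0.01, max_rounds=100):
    """Iterate train -> predict -> evaluate until MSE <= threshold (or rounds run out)."""
    for _ in range(max_rounds):
        model = train_step(model, training_data)        # Phase 1: retrain from current model
        predictions = predict(model, training_data)     # Phase 2: predict on training data
        if mse(true_values, predictions) <= threshold:  # Phases 3-4: evaluate and check
            break                                       # converged: stop "recursing"
    return model

# Toy stand-ins for the XGBoost components: the "model" is a single weight
# nudged halfway toward the mean label on each training round.
def toy_train_step(model, data):
    target = sum(data) / len(data)
    return model + 0.5 * (target - model)

def toy_predict(model, data):
    return [model] * len(data)
```

In the real pattern each phase is a pipeline task and the loop body is re-entered by the graph component calling itself, so the "updated model" is threaded through as the starting_model argument of the recursive call.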
Usage
Use this pattern to retrain a model iteratively until its error drops below a target threshold.
Interface Specification
@dsl.graph_component
def train_until_low_error(starting_model, training_data, true_values):
    # Phase 1: Retrain, continuing from the model produced by the previous round
    model = xgboost_train_on_csv_op(
        training_data=training_data,
        starting_model=starting_model,
        label_column=0,
        objective='reg:squarederror',
        num_iterations=50,
    ).outputs['model']
    # Phase 2: Predict on the training data
    predictions = xgboost_predict_on_csv_op(
        data=training_data,
        model=model,
        label_column=0,
    ).output
    # Phase 3: Evaluate predictions against the true values
    metrics_task = calculate_regression_metrics_from_csv_op(
        true_values=true_values,
        predicted_values=predictions,
    )
    # Phase 4: If MSE is still above the threshold, recurse with the new model
    with dsl.Condition(metrics_task.outputs['mean_squared_error'] > 0.01):
        train_until_low_error(
            starting_model=model,
            training_data=training_data,
            true_values=true_values,
        )
Code Reference
Source: samples/core/train_until_good/train_until_good.py (L33-64 recursive loop, L68-98 main pipeline). Import: from kfp import dsl, components
Usage Examples
Complete pipeline using the pattern:
@dsl.pipeline()
def train_until_good_pipeline():
    # Load one month of Chicago taxi trips as training data
    training_data = chicago_taxi_dataset_op(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,...',
        limit=10000,
    ).output
    # Extract the label column and strip the CSV header
    true_values_table = pandas_transform_csv_op(
        table=training_data,
        transform_code='df = df[["tips"]]',
    ).output
    true_values = drop_header_op(true_values_table).output
    # Train the initial model
    first_model = xgboost_train_on_csv_op(
        training_data=training_data,
        label_column=0,
        objective='reg:squarederror',
        num_iterations=100,
    ).outputs['model']
    # Enter the recursive train-evaluate-check loop
    train_until_low_error(
        starting_model=first_model,
        training_data=training_data,
        true_values=true_values,
    )