Implementation:Kubeflow Pipelines Metadata Helpers
| Knowledge Sources | |
|---|---|
| Domains | Metadata_Management, Lineage_Tracking |
| Last Updated | 2026-02-13 14:00 GMT |
Overview
A Python helper library that creates and retrieves ML Metadata (MLMD) entities. The KFP metadata writer service uses it to record pipeline execution lineage.
Description
The metadata_helpers module is the core abstraction layer between KFP's pipeline execution model and the MLMD storage system. It provides functions for connecting to the MLMD gRPC service, creating and retrieving typed entities (artifacts, executions, contexts), and recording lineage relationships (events, attributions). Every pipeline run's lineage — which artifacts were produced by which tasks, in which run context — is recorded through these helpers.
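To make the lineage relationships concrete, here is a minimal in-memory sketch of the records involved when one task produces an artifact and another consumes it. It does not call MLMD. The `LineageLog` class and all of its method names are hypothetical, and the integer IDs are made up. The relationship names follow MLMD's data model (events link artifacts to executions, attributions link artifacts to contexts, associations link executions to contexts); whether the helpers write exactly these records in this order is an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class LineageLog:
    """Hypothetical in-memory stand-in for the relationships MLMD stores."""

    # (artifact_id, execution_id, "INPUT" or "OUTPUT")
    events: List[Tuple[int, int, str]] = field(default_factory=list)
    # (artifact_id, context_id): the artifact belongs to a run context
    attributions: List[Tuple[int, int]] = field(default_factory=list)
    # (execution_id, context_id): the execution belongs to a run context
    associations: List[Tuple[int, int]] = field(default_factory=list)

    def record_execution(self, execution_id: int, context_id: int) -> None:
        self.associations.append((execution_id, context_id))

    def _attribute(self, artifact_id: int, context_id: int) -> None:
        # Record each artifact/context pair only once.
        if (artifact_id, context_id) not in self.attributions:
            self.attributions.append((artifact_id, context_id))

    def record_output(self, artifact_id: int, execution_id: int, context_id: int) -> None:
        self.events.append((artifact_id, execution_id, "OUTPUT"))
        self._attribute(artifact_id, context_id)

    def record_input(self, artifact_id: int, execution_id: int, context_id: int) -> None:
        self.events.append((artifact_id, execution_id, "INPUT"))
        self._attribute(artifact_id, context_id)

    def producers_of(self, artifact_id: int) -> List[int]:
        return [e for a, e, kind in self.events if a == artifact_id and kind == "OUTPUT"]

    def consumers_of(self, artifact_id: int) -> List[int]:
        return [e for a, e, kind in self.events if a == artifact_id and kind == "INPUT"]

    def artifacts_in_run(self, context_id: int) -> List[int]:
        return sorted(a for a, c in self.attributions if c == context_id)


log = LineageLog()
log.record_execution(execution_id=10, context_id=1)                # trainer task in run 1
log.record_output(artifact_id=100, execution_id=10, context_id=1)  # trainer writes a model
log.record_execution(execution_id=11, context_id=1)                # evaluator task in run 1
log.record_input(artifact_id=100, execution_id=11, context_id=1)   # evaluator reads the model
```

With these records in place, the questions "which task produced this artifact?" and "which artifacts belong to this run?" reduce to simple lookups over events and attributions.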
Key constants:
- `RUN_CONTEXT_TYPE_NAME = "KfpRun"` — Context type for pipeline runs
- `KFP_EXECUTION_TYPE_NAME_PREFIX = "components."` — Execution type prefix for pipeline components
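As a small illustration of how these constants might be used, the sketch below builds an execution type name from a component name and checks whether a type name belongs to a KFP component. The helper functions `execution_type_name` and `is_kfp_execution_type` are hypothetical and are not part of the module; only the two constant values come from the list above.

```python
RUN_CONTEXT_TYPE_NAME = "KfpRun"
KFP_EXECUTION_TYPE_NAME_PREFIX = "components."


def execution_type_name(component_name: str) -> str:
    """Hypothetical helper: prefix a component name to form an execution type name."""
    return KFP_EXECUTION_TYPE_NAME_PREFIX + component_name


def is_kfp_execution_type(type_name: str) -> bool:
    """Hypothetical helper: true if the type name carries the KFP component prefix."""
    return type_name.startswith(KFP_EXECUTION_TYPE_NAME_PREFIX)


print(execution_type_name("MyTrainer"))  # components.MyTrainer
```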
Usage
Import this module when building services that need to record pipeline metadata into MLMD. It is used by the metadata_writer pod to track artifact provenance and execution history for KFP v1 pipelines.
Code Reference
Source Location
- Repository: Kubeflow_Pipelines
- File: backend/metadata_writer/src/metadata_helpers.py
- Lines: 1-444
Signature
def connect_to_mlmd() -> metadata_store.MetadataStore:
    """Establishes gRPC connection to MLMD with retry logic (100 attempts),
    IPv6 support, optional TLS, and configurable max message length."""

def get_or_create_artifact_type(store, type_name, properties=None) -> int:
    """Returns type ID, creating the artifact type if it doesn't exist."""

def get_or_create_execution_type(store, type_name, properties=None) -> int:
    """Returns type ID, creating the execution type if it doesn't exist."""

def get_or_create_context_type(store, type_name, properties=None) -> int:
    """Returns type ID, creating the context type if it doesn't exist."""

def create_artifact_with_type(store, uri, type_name, properties=None) -> metadata_store_pb2.Artifact:
    """Creates a new MLMD artifact with the specified type."""

def create_execution_with_type(store, type_name, properties=None) -> metadata_store_pb2.Execution:
    """Creates a new MLMD execution with the specified type."""

def get_or_create_run_context(store, run_id, pipeline_name=None) -> metadata_store_pb2.Context:
    """Creates or retrieves a KfpRun context with pipeline name and run ID."""

def create_new_execution_in_existing_run_context(
    store, context_id, type_name, custom_properties=None
) -> metadata_store_pb2.Execution:
    """Records a task execution within an existing run context."""

def create_new_output_artifact(
    store, execution, context, uri, type_name, properties=None
) -> metadata_store_pb2.Artifact:
    """Creates an output artifact and links it to execution and context via events and attributions."""

def link_execution_to_input_artifact(store, execution, context, artifact) -> None:
    """Links an input artifact to an execution via INPUT event and attribution."""
Import
from metadata_helpers import connect_to_mlmd, get_or_create_run_context
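The `get_or_create_*_type` helpers listed under Signature are described as sharing one idempotent pattern: return the existing type if there is one, and register it otherwise. The sketch below shows that pattern against a hypothetical in-memory `FakeTypeStore`, so it runs without an MLMD server. The store class, its methods, and the use of `KeyError` to signal a missing type are assumptions for illustration; the real helpers operate on an MLMD `MetadataStore`.

```python
from typing import Dict, Optional


class FakeTypeStore:
    """Hypothetical in-memory stand-in for a type registry."""

    def __init__(self) -> None:
        self._ids_by_name: Dict[str, int] = {}
        self.put_calls = 0  # counts how many times a type was registered

    def get_type_id(self, type_name: str) -> int:
        # Raises KeyError when the type has not been registered yet.
        return self._ids_by_name[type_name]

    def put_type(self, type_name: str, properties: Optional[dict] = None) -> int:
        self.put_calls += 1
        type_id = len(self._ids_by_name) + 1
        self._ids_by_name[type_name] = type_id
        return type_id


def get_or_create_type(store: FakeTypeStore, type_name: str,
                       properties: Optional[dict] = None) -> int:
    """Return the existing type ID, registering the type only on a miss."""
    try:
        return store.get_type_id(type_name)
    except KeyError:
        return store.put_type(type_name, properties)


store = FakeTypeStore()
first_id = get_or_create_type(store, "Model")
second_id = get_or_create_type(store, "Model")  # no second registration
```

Because the second call finds the type already registered, callers can invoke the helper on every task without checking first.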
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| METADATA_GRPC_SERVICE_HOST | env var | Yes | MLMD gRPC server hostname |
| METADATA_GRPC_SERVICE_PORT | env var | Yes | MLMD gRPC server port |
| METADATA_TLS_ENABLED | env var | No | Enable TLS for gRPC connection |
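A sketch of how these variables might be turned into a connection target, followed by a bounded retry loop of the kind `connect_to_mlmd` is described as using. Everything here is illustrative: `MlmdSettings`, `read_mlmd_settings`, `connect_with_retry`, the `"true"` convention for `METADATA_TLS_ENABLED`, the bracket handling for IPv6 literals, and the one-second delay are assumptions, not the module's actual code. Only the variable names and the 100-attempt limit are taken from this page.

```python
import time
from dataclasses import dataclass
from typing import Callable, Mapping, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class MlmdSettings:
    host: str
    port: int
    tls_enabled: bool

    @property
    def target(self) -> str:
        # IPv6 literals need brackets so the port separator is unambiguous.
        host = self.host
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"
        return f"{host}:{self.port}"


def read_mlmd_settings(env: Mapping[str, str]) -> MlmdSettings:
    """Build settings from an environment mapping such as os.environ."""
    return MlmdSettings(
        host=env["METADATA_GRPC_SERVICE_HOST"],
        port=int(env["METADATA_GRPC_SERVICE_PORT"]),
        # Assumed convention: only the string "true" enables TLS.
        tls_enabled=env.get("METADATA_TLS_ENABLED", "false").lower() == "true",
    )


def connect_with_retry(
    connect: Callable[[], T],
    attempts: int = 100,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call connect() until it succeeds or the attempt budget is exhausted."""
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            return connect()
        except Exception as error:  # broad on purpose: any failure triggers a retry
            last_error = error
            sleep(delay_seconds)
    raise RuntimeError("Could not connect to the metadata store") from last_error


settings = read_mlmd_settings({
    "METADATA_GRPC_SERVICE_HOST": "::1",
    "METADATA_GRPC_SERVICE_PORT": "8080",
})
print(settings.target)  # [::1]:8080
```

Passing the environment as a mapping and the sleep function as a parameter keeps the sketch testable without a live server or real delays.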
Outputs
| Name | Type | Description |
|---|---|---|
| MetadataStore | metadata_store.MetadataStore | Connected MLMD client instance |
| Artifact | metadata_store_pb2.Artifact | Created/retrieved MLMD artifact |
| Execution | metadata_store_pb2.Execution | Created/retrieved MLMD execution |
| Context | metadata_store_pb2.Context | Created/retrieved MLMD context (run scope) |
Usage Examples
Recording Pipeline Execution Metadata
from metadata_helpers import (
    connect_to_mlmd,
    get_or_create_run_context,
    create_new_execution_in_existing_run_context,
    create_new_output_artifact,
)

# 1. Connect to MLMD store
store = connect_to_mlmd()

# 2. Get or create a run context
run_ctx = get_or_create_run_context(
    store,
    run_id="run-abc-123",
    pipeline_name="my-pipeline",
)

# 3. Record an execution
execution = create_new_execution_in_existing_run_context(
    store,
    context_id=run_ctx.id,
    type_name="components.MyTrainer",
)

# 4. Record output artifact
artifact = create_new_output_artifact(
    store,
    execution=execution,
    context=run_ctx,
    uri="gs://bucket/model/output",
    type_name="Model",
)