

Implementation:Kubeflow Pipelines Metadata Helpers

From Leeroopedia
Knowledge Sources
Domains Metadata_Management, Lineage_Tracking
Last Updated 2026-02-13 14:00 GMT

Overview

Python helper library providing CRUD operations for ML Metadata (MLMD) entities used by the KFP metadata writer service to record pipeline execution lineage.

Description

The metadata_helpers module is the core abstraction layer between KFP's pipeline execution model and the MLMD storage system. It provides functions for connecting to the MLMD gRPC service, creating and retrieving typed entities (artifacts, executions, contexts), and recording lineage relationships (events, attributions). Every pipeline run's lineage — which artifacts were produced by which tasks, in which run context — is recorded through these helpers.
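To make the lineage relationships concrete, here is a minimal in-memory sketch of what gets recorded when a task produces an artifact. It does not use the MLMD client. `LineageGraph`, `record_output`, `producers_of`, and `artifacts_in` are hypothetical names introduced for illustration. The sketch assumes the usual MLMD model, in which an event ties an artifact to an execution, an attribution ties an artifact to a context, and an association ties an execution to a context.

```python
from dataclasses import dataclass, field
from enum import Enum


class EventType(Enum):
    INPUT = "INPUT"
    OUTPUT = "OUTPUT"


@dataclass
class LineageGraph:
    """Hypothetical in-memory stand-in for the relationships MLMD stores."""

    events: list = field(default_factory=list)        # (artifact, execution, EventType)
    attributions: list = field(default_factory=list)  # (artifact, context)
    associations: list = field(default_factory=list)  # (execution, context)

    def record_output(self, artifact, execution, context):
        # An OUTPUT event says "this execution produced this artifact".
        self.events.append((artifact, execution, EventType.OUTPUT))
        # An attribution places the artifact inside the run context.
        self.attributions.append((artifact, context))
        # An association places the execution inside the run context.
        if (execution, context) not in self.associations:
            self.associations.append((execution, context))

    def producers_of(self, artifact):
        return [e for a, e, t in self.events
                if a == artifact and t is EventType.OUTPUT]

    def artifacts_in(self, context):
        return [a for a, c in self.attributions if c == context]


graph = LineageGraph()
graph.record_output("gs://bucket/model/output", "components.MyTrainer", "run-abc-123")
```

Querying `graph.producers_of(...)` answers "which task produced this artifact", and `graph.artifacts_in(...)` answers "which artifacts belong to this run". These are the two questions the lineage records are meant to support.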

Key constants:

  • `RUN_CONTEXT_TYPE_NAME = "KfpRun"` — Context type for pipeline runs
  • `KFP_EXECUTION_TYPE_NAME_PREFIX = "components."` — Execution type prefix for pipeline components
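As a small illustration of how these constants shape type names, the sketch below builds and recognizes a prefixed execution type name such as `components.MyTrainer`. The two helper functions are hypothetical and are not claimed to exist in the module. Only the constant values come from this page.

```python
RUN_CONTEXT_TYPE_NAME = "KfpRun"
KFP_EXECUTION_TYPE_NAME_PREFIX = "components."


def execution_type_name(component_name: str) -> str:
    """Hypothetical helper: prefix a component name to form an execution type name."""
    return KFP_EXECUTION_TYPE_NAME_PREFIX + component_name


def is_kfp_execution_type(type_name: str) -> bool:
    """Hypothetical helper: check whether a type name carries the KFP prefix."""
    return type_name.startswith(KFP_EXECUTION_TYPE_NAME_PREFIX)
```

Keeping the prefix in one constant lets a consumer distinguish KFP component executions from other execution types stored in the same MLMD instance.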

Usage

Import this module when building services that need to record pipeline metadata into MLMD. It is used by the metadata_writer pod to track artifact provenance and execution history for KFP v1 pipelines.

Code Reference

Source Location

Signature

def connect_to_mlmd() -> metadata_store.MetadataStore:
    """Establishes gRPC connection to MLMD with retry logic (100 attempts),
    IPv6 support, optional TLS, and configurable max message length."""

def get_or_create_artifact_type(store, type_name, properties=None) -> int:
    """Returns type ID, creating the artifact type if it doesn't exist."""

def get_or_create_execution_type(store, type_name, properties=None) -> int:
    """Returns type ID, creating the execution type if it doesn't exist."""

def get_or_create_context_type(store, type_name, properties=None) -> int:
    """Returns type ID, creating the context type if it doesn't exist."""

def create_artifact_with_type(store, uri, type_name, properties=None) -> metadata_store_pb2.Artifact:
    """Creates a new MLMD artifact with the specified type."""

def create_execution_with_type(store, type_name, properties=None) -> metadata_store_pb2.Execution:
    """Creates a new MLMD execution with the specified type."""

def get_or_create_run_context(store, run_id, pipeline_name=None) -> metadata_store_pb2.Context:
    """Creates or retrieves a KfpRun context with pipeline name and run ID."""

def create_new_execution_in_existing_run_context(
    store, context_id, type_name, custom_properties=None
) -> metadata_store_pb2.Execution:
    """Records a task execution within an existing run context."""

def create_new_output_artifact(
    store, execution, context, uri, type_name, properties=None
) -> metadata_store_pb2.Artifact:
    """Creates an output artifact and links it to execution and context via events and attributions."""

def link_execution_to_input_artifact(store, execution, context, artifact) -> None:
    """Links an input artifact to an execution via INPUT event and attribution."""

Import

from metadata_helpers import connect_to_mlmd, get_or_create_run_context

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| METADATA_GRPC_SERVICE_HOST | env var | Yes | MLMD gRPC server hostname |
| METADATA_GRPC_SERVICE_PORT | env var | Yes | MLMD gRPC server port |
| METADATA_TLS_ENABLED | env var | No | Enable TLS for gRPC connection |
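The following sketch shows one way these inputs could be read and combined with a bounded retry loop like the one `connect_to_mlmd` is described as using. It is not the module's code. `load_mlmd_settings` and `connect_with_retry` are hypothetical names, the set of values treated as "TLS enabled" is an assumption, and the real connection call is replaced by a caller-supplied function so that the example runs without an MLMD server.

```python
import os
import time


def load_mlmd_settings(env=None):
    """Read connection settings; raises KeyError if a required variable is missing."""
    env = os.environ if env is None else env
    host = env["METADATA_GRPC_SERVICE_HOST"]
    port = int(env["METADATA_GRPC_SERVICE_PORT"])
    # Assumption: these strings count as "enabled"; the real parsing may differ.
    tls_raw = env.get("METADATA_TLS_ENABLED", "false")
    tls_enabled = tls_raw.strip().lower() in ("1", "true", "yes")
    return {"host": host, "port": port, "tls_enabled": tls_enabled}


def connect_with_retry(connect, attempts=100, delay_seconds=1.0):
    """Call `connect` until it succeeds or the attempt budget is exhausted."""
    last_error = None
    for attempt in range(attempts):
        try:
            return connect()
        except Exception as error:  # broad on purpose: any failure triggers a retry
            last_error = error
            if attempt < attempts - 1:
                time.sleep(delay_seconds)
    raise RuntimeError(f"could not connect after {attempts} attempts") from last_error
```

A caller would pass a function that builds the client from the loaded settings as `connect`. Injecting it this way keeps the retry policy testable in isolation.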

Outputs

| Name | Type | Description |
|---|---|---|
| MetadataStore | metadata_store.MetadataStore | Connected MLMD client instance |
| Artifact | metadata_store_pb2.Artifact | Created/retrieved MLMD artifact |
| Execution | metadata_store_pb2.Execution | Created/retrieved MLMD execution |
| Context | metadata_store_pb2.Context | Created/retrieved MLMD context (run scope) |

Usage Examples

Recording Pipeline Execution Metadata

from metadata_helpers import (
    connect_to_mlmd,
    get_or_create_run_context,
    create_new_execution_in_existing_run_context,
    create_new_output_artifact,
)

# 1. Connect to MLMD store
store = connect_to_mlmd()

# 2. Get or create a run context
run_ctx = get_or_create_run_context(
    store,
    run_id="run-abc-123",
    pipeline_name="my-pipeline",
)

# 3. Record an execution
execution = create_new_execution_in_existing_run_context(
    store,
    context_id=run_ctx.id,
    type_name="components.MyTrainer",
)

# 4. Record output artifact
artifact = create_new_output_artifact(
    store,
    execution=execution,
    context=run_ctx,
    uri="gs://bucket/model/output",
    type_name="Model",
)
