Implementation:Kubeflow Pipelines Metadata Writer
| Knowledge Sources | |
|---|---|
| Domains | Metadata_Management, Lineage_Tracking, Kubernetes |
| Last Updated | 2026-02-13 14:00 GMT |
Overview
Long-running Kubernetes pod watcher that observes Argo Workflow pods and records their execution metadata into the ML Metadata (MLMD) store for KFP v1 pipelines.
Description
The metadata_writer module runs as a backend service (Deployment) that uses the Kubernetes watch API to stream pod events filtered by the `workflows.argoproj.io/workflow` label. For each pipeline step pod, it extracts component metadata from pod annotations and environment variables, creates MLMD execution and artifact records, and links them to the appropriate run context. It handles both input and output artifacts, supporting S3/MinIO URI construction.
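Below is a minimal sketch of the label filter. It assumes events shaped like those yielded by the Kubernetes Python client's `kubernetes.watch.Watch().stream(...)`, which is a dict with `type` and `object` keys. Pods are plain dicts here so the sketch runs without a cluster. `iter_workflow_pods` is a hypothetical helper, and skipping `DELETED` events is an assumption rather than confirmed behavior of the writer.

```python
from typing import Any, Dict, Iterable, Iterator

WORKFLOW_LABEL = "workflows.argoproj.io/workflow"


def iter_workflow_pods(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield pods from a watch-style event stream that belong to an Argo Workflow.

    Each event is assumed to look like {"type": "ADDED" | "MODIFIED" | "DELETED",
    "object": pod}, where pod is a plain dict with metadata.labels.
    """
    for event in events:
        # Assumption: a deleted pod has nothing left to record, so skip it.
        if event.get("type") == "DELETED":
            continue
        pod = event.get("object") or {}
        labels = (pod.get("metadata") or {}).get("labels") or {}
        # Only pods created by an Argo Workflow carry this label.
        if WORKFLOW_LABEL in labels:
            yield pod
```

Against a live cluster, the same filter is usually pushed to the API server with a label selector, for example `watch.Watch().stream(core_api.list_namespaced_pod, namespace, label_selector="workflows.argoproj.io/workflow")`. A bare key in a label selector matches any pod that carries that label, whatever its value.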
Processing flow:
- Watches for Argo Workflow pods via Kubernetes watch API
- Skips TFX pods and KFP v2 pods (they have their own metadata writers)
- Extracts Argo template, component spec, and arguments from pod metadata
- Creates or retrieves an MLMD run context for the workflow
- Creates execution records and links input artifacts by URI
- Patches pods with `metadata_execution_id` and `metadata_context_id` labels
- On pod completion, records output artifacts from `workflows.argoproj.io/outputs` annotation
- Patches pods with `metadata_written=true` label
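The per-pod decision in the steps above can be sketched as a pure function that returns the actions to take. This illustrates the flow and is not the writer's actual control loop. The action names are hypothetical. The TFX label key and the KFP v2 annotation key are assumptions. Treating both `Succeeded` and `Failed` as "completion" is also an assumption.

```python
from typing import Any, Dict, List

# Assumed keys (not taken from the source) for recognizing TFX and KFP v2 pods.
SDK_TYPE_LABEL = "pipelines.kubeflow.org/pipeline-sdk-type"
V2_COMPONENT_ANNOTATION = "pipelines.kubeflow.org/v2_component"
TERMINAL_PHASES = ("Succeeded", "Failed")  # assumption: both count as completion


def plan_actions(pod: Dict[str, Any]) -> List[str]:
    """Return the (hypothetical) steps the writer would take for one pod event."""
    meta = pod.get("metadata") or {}
    labels = meta.get("labels") or {}
    annotations = meta.get("annotations") or {}
    phase = (pod.get("status") or {}).get("phase")

    # Pods handled by other metadata writers are skipped entirely.
    if "tfx" in labels.get(SDK_TYPE_LABEL, ""):
        return []
    if annotations.get(V2_COMPONENT_ANNOTATION) == "true":
        return []
    # Already fully recorded on an earlier event.
    if labels.get("metadata_written") == "true":
        return []

    actions: List[str] = []
    # First sighting: no execution has been recorded for this pod yet.
    if "metadata_execution_id" not in labels:
        actions += [
            "get_or_create_run_context",
            "create_execution",
            "link_input_artifacts",
            "patch_execution_and_context_ids",
        ]
    # Completion: record outputs from the Argo outputs annotation, then mark done.
    if phase in TERMINAL_PHASES:
        actions += ["record_output_artifacts", "patch_metadata_written"]
    return actions
```

In this sketch the decision depends only on the pod's own labels, so replayed watch events are harmless. A pod that already carries `metadata_execution_id` does not get a second execution, and a pod labeled `metadata_written=true` is ignored.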
Usage
This module is deployed as the `metadata-writer` Kubernetes Deployment. It is not imported directly by user code but runs as a backend service automatically recording lineage for KFP v1 pipelines.
Code Reference
Source Location
- Repository: Kubeflow_Pipelines
- File: backend/metadata_writer/src/metadata_writer.py
- Lines: 1-407
Signature
```python
from kubernetes.client import V1Pod

# Signatures only; the implementations live in metadata_writer.py.

def patch_pod_metadata(namespace, pod_name, patch_dict) -> None:
    """Patches a Kubernetes pod with metadata labels."""

def output_name_to_argo(name: str) -> str:
    """Converts component output name to Argo annotation key format."""

def is_s3_endpoint(endpoint: str) -> bool:
    """Checks whether an endpoint URL points to Amazon S3 rather than another S3-compatible store such as MinIO."""

def get_object_store_provider(endpoint: str) -> str:
    """Returns 's3' or 'minio' based on endpoint URL."""

def argo_artifact_to_uri(artifact: dict) -> str:
    """Converts Argo artifact annotation to storage URI (s3://, gs://, etc.)."""

def is_tfx_pod(pod: V1Pod) -> bool:
    """Returns True if the pod belongs to a TFX pipeline (skipped by this writer)."""

def is_kfp_v2_pod(pod: V1Pod) -> bool:
    """Returns True if the pod is a KFP v2 pod (has its own metadata handling)."""
```
Import
```python
# Not typically imported; runs as a standalone service
# Internal imports:
from metadata_helpers import connect_to_mlmd, get_or_create_run_context
```
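The get-or-create pattern behind `get_or_create_run_context` (the "creates or retrieves an MLMD run context" step) can be sketched with an in-memory store. Everything in this sketch is hypothetical: `InMemoryContextStore` and `run_context_id` stand in for the real MLMD client and helper, and both the `KfpRun` context type name and keying the context by workflow name are assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple

RUN_CONTEXT_TYPE = "KfpRun"  # assumption: the context type name used for runs


@dataclass
class InMemoryContextStore:
    """Hypothetical stand-in for an MLMD store: context ids keyed by (type, name)."""
    contexts: Dict[Tuple[str, str], int] = field(default_factory=dict)
    next_id: int = 1

    def get_or_create_context(self, type_name: str, name: str) -> int:
        key = (type_name, name)
        if key not in self.contexts:  # create only on the first request
            self.contexts[key] = self.next_id
            self.next_id += 1
        return self.contexts[key]


def run_context_id(store: InMemoryContextStore, workflow_name: str) -> int:
    """All step pods of one workflow resolve to the same run context."""
    return store.get_or_create_context(RUN_CONTEXT_TYPE, workflow_name)
```

Because the lookup is idempotent, every step pod of a workflow links its execution to the same context, and a restarted writer does not create duplicates. With the real `ml_metadata` client, the lookup half would correspond to `MetadataStore.get_context_by_type_and_name` and creation to `MetadataStore.put_contexts`.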
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Kubernetes Watch Events | kubernetes.watch.Watch | Yes | Pod event stream filtered by Argo workflow label |
| Pod Annotations | dict | Yes | Argo workflow annotations with template, outputs, component spec |
| METADATA_GRPC_SERVICE_HOST | env var | Yes | MLMD gRPC server hostname |
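The following sketch resolves the MLMD endpoint from the environment and opens a client. Requiring `METADATA_GRPC_SERVICE_HOST` follows the Inputs table. Reading `METADATA_GRPC_SERVICE_PORT` with a default of 8080 is an assumption based on the Deployment example. `mlmd_address` and `connect` are hypothetical helpers. `connect` uses the `ml_metadata` client classes and imports them lazily so the address logic runs without that package.

```python
import os
from typing import Mapping, Tuple


def mlmd_address(env: Mapping[str, str] = os.environ) -> Tuple[str, int]:
    """Return (host, port) for the MLMD gRPC server."""
    host = env.get("METADATA_GRPC_SERVICE_HOST")
    if not host:
        raise RuntimeError("METADATA_GRPC_SERVICE_HOST must be set")
    # Assumption: fall back to 8080, the port used in the Deployment example.
    port = int(env.get("METADATA_GRPC_SERVICE_PORT", "8080"))
    return host, port


def connect(env: Mapping[str, str] = os.environ):
    """Open an MLMD client for the configured endpoint (requires ml_metadata)."""
    from ml_metadata.metadata_store import metadata_store
    from ml_metadata.proto import metadata_store_pb2

    host, port = mlmd_address(env)
    config = metadata_store_pb2.MetadataStoreClientConfig(host=host, port=port)
    return metadata_store.MetadataStore(config)
```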
Outputs
| Name | Type | Description |
|---|---|---|
| MLMD Records | side effect | Execution, artifact, context, and event records in MLMD store |
| Pod Patches | side effect | Labels: metadata_execution_id, metadata_context_id, metadata_written |
| Debug files | files | YAML dumps written to /tmp/ for debugging |
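The pod patches listed above can be sketched as a builder for the patch body. Label values must be strings, while MLMD ids are integers. Kubernetes also limits label values to 63 characters drawn from `[A-Za-z0-9_.-]`, starting and ending with an alphanumeric. `label_patch` is a hypothetical helper; the three label keys come from the table.

```python
import re
from typing import Any, Dict, Optional

# Kubernetes label-value syntax (empty, or alphanumeric at both ends).
LABEL_VALUE_RE = re.compile(r"^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$")


def label_patch(
    execution_id: Optional[int] = None,
    context_id: Optional[int] = None,
    written: bool = False,
) -> Dict[str, Any]:
    """Build a pod patch body that sets the metadata labels."""
    labels: Dict[str, str] = {}
    if execution_id is not None:
        labels["metadata_execution_id"] = str(execution_id)
    if context_id is not None:
        labels["metadata_context_id"] = str(context_id)
    if written:
        labels["metadata_written"] = "true"
    for value in labels.values():
        if len(value) > 63 or not LABEL_VALUE_RE.match(value):
            raise ValueError(f"invalid label value: {value!r}")
    return {"metadata": {"labels": labels}}
```

The resulting dict could then be sent with `kubernetes.client.CoreV1Api().patch_namespaced_pod(name=pod_name, namespace=namespace, body=patch)`. That call is presumably what `patch_pod_metadata` wraps, although this is an assumption about its internals.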
Usage Examples
Deployment Configuration
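```yaml
# The metadata writer runs as a Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metadata-writer
spec:
  replicas: 1
  # apps/v1 requires a selector that matches the pod template labels
  # (the app label value here is illustrative)
  selector:
    matchLabels:
      app: metadata-writer
  template:
    metadata:
      labels:
        app: metadata-writer
    spec:
      # The pod also needs RBAC permission to watch and patch pods (not shown)
      containers:
      - name: main
        image: gcr.io/ml-pipeline/metadata-writer
        env:
        - name: METADATA_GRPC_SERVICE_HOST
          value: "metadata-grpc-service"
        - name: METADATA_GRPC_SERVICE_PORT
          value: "8080"
```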
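The artifact-URI helpers listed under Signature can be sketched in plain Python. The function names and docstrings come from the source, but the bodies are assumptions. In particular, the sanitization regex, detecting Amazon S3 by an `amazonaws.com` host, and the `gs://` branch are assumed. The sketch also assumes that the `workflows.argoproj.io/outputs` annotation is a JSON object with an `artifacts` list and that URIs take the form `<provider>://<bucket>/<key>`. `output_artifact_uris` is a hypothetical helper.

```python
import json
import re
from typing import Any, Dict, List, Optional, Tuple


def output_name_to_argo(name: str) -> str:
    """Replace runs of characters outside [-_0-9A-Za-z] with '-', collapse dashes, trim."""
    return re.sub("-+", "-", re.sub("[^-_0-9A-Za-z]+", "-", name)).strip("-")


def is_s3_endpoint(endpoint: str) -> bool:
    # Assumption: Amazon S3 is recognized by an amazonaws.com host.
    return "amazonaws.com" in endpoint


def get_object_store_provider(endpoint: str) -> str:
    return "s3" if is_s3_endpoint(endpoint) else "minio"


def argo_artifact_to_uri(artifact: Dict[str, Any]) -> Optional[str]:
    if "s3" in artifact:  # any S3-API store: Amazon S3 or MinIO
        s3 = artifact["s3"]
        provider = get_object_store_provider(s3.get("endpoint", ""))
        return f"{provider}://{s3.get('bucket', '')}/{s3.get('key', '')}"
    if "gcs" in artifact:  # assumption: GCS artifacts map to gs:// URIs
        gcs = artifact["gcs"]
        return f"gs://{gcs.get('bucket', '')}/{gcs.get('key', '')}"
    return None  # e.g. raw (inline) artifacts have no storage location


def output_artifact_uris(outputs_annotation: str) -> List[Tuple[str, str]]:
    """Parse a workflows.argoproj.io/outputs annotation into (name, uri) pairs."""
    outputs = json.loads(outputs_annotation)
    pairs = []
    for artifact in outputs.get("artifacts", []):
        uri = argo_artifact_to_uri(artifact)
        if uri is not None:
            pairs.append((artifact["name"], uri))
    return pairs
```

Artifacts without a storage location are dropped rather than recorded with an empty URI, so only artifacts that can actually be fetched end up in the store.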