Implementation:Kubeflow Pipelines Metadata Writer

From Leeroopedia
Revision as of 13:11, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Kubeflow_Pipelines_Metadata_Writer.md)
Knowledge Sources
Domains: Metadata_Management, Lineage_Tracking, Kubernetes
Last Updated: 2026-02-13 14:00 GMT

Overview

A long-running Kubernetes pod watcher that observes Argo Workflow pods and records their execution metadata in the ML Metadata (MLMD) store for KFP v1 pipelines.

Description

The metadata_writer module runs as a backend service (Deployment) that uses the Kubernetes watch API to stream pod events filtered by the `workflows.argoproj.io/workflow` label. For each pipeline step pod, it extracts component metadata from pod annotations and environment variables, creates MLMD execution and artifact records, and links them to the appropriate run context. It handles both input and output artifacts, supporting S3/MinIO URI construction.
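As a rough illustration of the label-filtered watch described above, the following sketch uses the official `kubernetes` Python client. The function name `watch_workflow_pods`, the default namespace, and the timeout are hypothetical and not taken from the metadata_writer source; only the label key comes from this page.

```python
# Label key named in the description; a bare key in a label selector matches
# any pod that carries the label, whatever its value.
ARGO_WORKFLOW_LABEL_KEY = "workflows.argoproj.io/workflow"


def watch_workflow_pods(namespace="kubeflow", timeout_seconds=60):
    """Yield (event_type, pod) pairs for pods that carry the Argo workflow label.

    Hypothetical helper: the name, the default namespace, and the timeout are
    illustrative assumptions.
    """
    # Imported lazily so this file can be loaded without the client installed.
    from kubernetes import client, config, watch

    config.load_incluster_config()  # assumes the code runs inside a pod
    core_v1 = client.CoreV1Api()
    watcher = watch.Watch()
    for event in watcher.stream(
        core_v1.list_namespaced_pod,
        namespace=namespace,
        label_selector=ARGO_WORKFLOW_LABEL_KEY,
        timeout_seconds=timeout_seconds,
    ):
        # Each event is a dict with a 'type' (ADDED, MODIFIED, DELETED)
        # and the deserialized V1Pod under 'object'.
        yield event["type"], event["object"]
```

Because the stream ends when the timeout expires, a long-running service would typically call such a generator inside an outer loop and reconnect after each timeout or error.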

Processing flow:

  1. Watches for Argo Workflow pods via Kubernetes watch API
  2. Skips TFX pods and KFP v2 pods (they have their own metadata writers)
  3. Extracts Argo template, component spec, and arguments from pod metadata
  4. Creates or retrieves an MLMD run context for the workflow
  5. Creates execution records and links input artifacts by URI
  6. Patches pods with `metadata_execution_id` and `metadata_context_id` labels
  7. On pod completion, records output artifacts from the `workflows.argoproj.io/outputs` annotation
  8. Patches pods with `metadata_written=true` label
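One way to read the steps above is that the pod labels act as checkpoints, so a pod can be revisited on every event without repeating work. The sketch below expresses that reading with plain dictionaries. The helper names (`next_action`, `execution_patch`, `written_patch`) and the `completed` flag are hypothetical; this page does not describe how the module detects completion or how it organizes this logic.

```python
# Label names quoted in steps 6 and 8 above.
EXECUTION_ID_LABEL = "metadata_execution_id"
CONTEXT_ID_LABEL = "metadata_context_id"
WRITTEN_LABEL = "metadata_written"


def next_action(labels, is_tfx=False, is_kfp_v2=False, completed=False):
    """Decide what to do with a pod given its current labels (illustrative)."""
    if is_tfx or is_kfp_v2:
        return "skip"                # step 2: handled by other writers
    if labels.get(WRITTEN_LABEL) == "true":
        return "skip"                # step 8 already happened
    if EXECUTION_ID_LABEL not in labels:
        return "create_execution"    # steps 3 to 6 still pending
    if completed:
        return "record_outputs"      # steps 7 and 8
    return "wait"                    # execution recorded, pod still running


def execution_patch(execution_id, context_id):
    """Patch body for step 6; Kubernetes label values must be strings."""
    return {"metadata": {"labels": {
        EXECUTION_ID_LABEL: str(execution_id),
        CONTEXT_ID_LABEL: str(context_id),
    }}}


def written_patch():
    """Patch body for step 8."""
    return {"metadata": {"labels": {WRITTEN_LABEL: "true"}}}
```

A body built this way could be sent with the Kubernetes client call `CoreV1Api.patch_namespaced_pod(name, namespace, body)`.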

Usage

This module is deployed as the `metadata-writer` Kubernetes Deployment. User code does not import it directly; it runs as a backend service that automatically records lineage for KFP v1 pipelines.

Code Reference

Source Location

Signature

from kubernetes.client import V1Pod  # type used in the annotations below

def patch_pod_metadata(namespace, pod_name, patch_dict) -> None:
    """Patches a Kubernetes pod with metadata labels."""

def output_name_to_argo(name: str) -> str:
    """Converts component output name to Argo annotation key format."""

def is_s3_endpoint(endpoint: str) -> bool:
    """Checks if an endpoint URL is S3-compatible."""

def get_object_store_provider(endpoint: str) -> str:
    """Returns 's3' or 'minio' based on endpoint URL."""

def argo_artifact_to_uri(artifact: dict) -> str:
    """Converts Argo artifact annotation to storage URI (s3://, gs://, etc.)."""

def is_tfx_pod(pod: V1Pod) -> bool:
    """Returns True if the pod belongs to a TFX pipeline (skipped by this writer)."""

def is_kfp_v2_pod(pod: V1Pod) -> bool:
    """Returns True if the pod is a KFP v2 pod (has its own metadata handling)."""

Import

# Not typically imported; runs as a standalone service
# Internal imports:
from metadata_helpers import connect_to_mlmd, get_or_create_run_context

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| Kubernetes Watch Events | kubernetes.watch.Watch | Yes | Pod event stream filtered by Argo workflow label |
| Pod Annotations | dict | Yes | Argo workflow annotations with template, outputs, component spec |
| METADATA_GRPC_SERVICE_HOST | env var | Yes | MLMD gRPC server hostname |
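The environment-variable input can be illustrated with a short sketch that resolves the MLMD address and opens a client through the `ml-metadata` package. The helper names `mlmd_address` and `connect` are hypothetical stand-ins, not the `connect_to_mlmd` helper itself, and the port fallback of 8080 is an assumption borrowed from the Deployment example on this page.

```python
import os


def mlmd_address(env=os.environ):
    """Return (host, port) for the MLMD gRPC server.

    The host is treated as required, matching the table above. The port
    fallback mirrors the Deployment example and is an assumption.
    """
    host = env["METADATA_GRPC_SERVICE_HOST"]  # raises KeyError if unset
    port = int(env.get("METADATA_GRPC_SERVICE_PORT", "8080"))
    return host, port


def connect(env=os.environ):
    """Open an MLMD gRPC client (illustrative, requires ml-metadata)."""
    # Imported lazily so mlmd_address can be used without the package.
    from ml_metadata.metadata_store import metadata_store
    from ml_metadata.proto import metadata_store_pb2

    host, port = mlmd_address(env)
    config = metadata_store_pb2.MetadataStoreClientConfig(host=host, port=port)
    return metadata_store.MetadataStore(config)
```

Passing the environment as a mapping keeps the address logic easy to exercise without a running MLMD server.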

Outputs

| Name | Type | Description |
|------|------|-------------|
| MLMD Records | side effect | Execution, artifact, context, and event records in MLMD store |
| Pod Patches | side effect | Labels: metadata_execution_id, metadata_context_id, metadata_written |
| Debug files | files | YAML dumps written to /tmp/ for debugging |

Usage Examples

Deployment Configuration

# The metadata writer runs as a Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metadata-writer
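spec:
  replicas: 1
  # apps/v1 requires a selector that matches the pod template labels;
  # the label value used here is illustrative.
  selector:
    matchLabels:
      app: metadata-writer
  template:
    metadata:
      labels:
        app: metadata-writer
    spec: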
      containers:
      - name: main
        image: gcr.io/ml-pipeline/metadata-writer
        env:
        - name: METADATA_GRPC_SERVICE_HOST
          value: "metadata-grpc-service"
        - name: METADATA_GRPC_SERVICE_PORT
          value: "8080"
