Implementation:Datahub project Datahub Scheduled Ingestion Orchestration


| Field | Value |
|---|---|
| Implementation Name | Scheduled Ingestion Orchestration |
| Overview | Concrete tool for wrapping the DataHub ingest CLI command in external schedulers like Airflow, cron, or Kubernetes CronJobs, as well as using DataHub's built-in server-side scheduling. |
| Type | External Tool Doc |
| Implements | Datahub_project_Datahub_Scheduled_Ingestion |
| Status | Active |
| Domains | Data_Integration, Metadata_Management |
| Source | DataHub Repository -- metadata-ingestion/src/datahub/cli/ingest_cli.py (lines 45-258 for run command, lines 260-388 for deploy command) |
| Last Updated | 2026-02-10 |
| Knowledge Sources | DataHub Repository |

Description

Scheduled ingestion orchestration wraps the datahub ingest run CLI command in an external scheduling system to automate recurring metadata ingestion. Because the CLI is a standard Unix command that reads a recipe file and exits with a status code, any scheduler that can execute shell commands can run it and react to the result.

Additionally, DataHub provides a built-in server-side scheduling mechanism via the datahub ingest deploy command, which registers a recipe with the DataHub server along with a cron schedule. The server then executes the recipe at the specified intervals using its built-in executor.

Key Parameters

The parameters for the underlying datahub ingest run command remain the same as documented in Datahub_project_Datahub_Ingest_CLI_Run. The scheduling configuration is external to the CLI itself.

For server-side scheduling via datahub ingest deploy:

| Parameter | Type | Default | Description |
|---|---|---|---|
| -c, --config | string (file path) | (required) | Path to the recipe config file. |
| -n, --name | string | None | Recipe name for identification. |
| --urn | string | None | URN of an existing recipe to update. |
| --schedule | string (cron) | None | Cron expression for scheduling (e.g., "0 2 * * *" for daily at 2 AM). |
| --time-zone | string | UTC | Timezone for the schedule (e.g., "America/New_York"). |
| --executor-id | string | None | Custom executor ID for routing execution requests. |
| --cli-version | string | None | Custom CLI version to use for ingestion. |
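
When the deploy command is driven from a script, assembling the argument list programmatically can help keep optional flags consistent. The following is a minimal sketch that uses only the flags listed in the table above; the helper name build_deploy_command is hypothetical and is not part of the DataHub CLI.

```python
from typing import List, Optional


def build_deploy_command(
    config: str,
    name: Optional[str] = None,
    urn: Optional[str] = None,
    schedule: Optional[str] = None,
    time_zone: Optional[str] = None,
    executor_id: Optional[str] = None,
    cli_version: Optional[str] = None,
) -> List[str]:
    """Build an argv list for `datahub ingest deploy` (hypothetical helper)."""
    # The recipe path is the only required argument.
    command = ["datahub", "ingest", "deploy", "-c", config]
    optional_flags = [
        ("--name", name),
        ("--urn", urn),
        ("--schedule", schedule),
        ("--time-zone", time_zone),
        ("--executor-id", executor_id),
        ("--cli-version", cli_version),
    ]
    # Emit a flag only when a value was supplied, so CLI defaults still apply.
    for flag, value in optional_flags:
        if value is not None:
            command.extend([flag, value])
    return command


argv = build_deploy_command(
    config="/opt/datahub/recipes/snowflake.yml",
    name="Snowflake Nightly Sync",
    schedule="0 2 * * *",
    time_zone="America/New_York",
)
print(argv)
```

Returning a list rather than a single string means the result can be passed to subprocess.run without shell quoting concerns.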

I/O Contract

Inputs

  • A recipe YAML file (same as datahub ingest run)
  • Scheduling configuration (cron expression, Airflow schedule, etc.)
  • Environment variables for credentials and secrets
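
To illustrate how credentials from the environment can be kept out of the recipe itself, the sketch below substitutes ${VAR} placeholders in a recipe-like dictionary. The helper name resolve_placeholders, the placeholder pattern, and the example secret are assumptions made for illustration; this is not DataHub's own resolver.

```python
import re
from typing import Any, Mapping

# Matches placeholders such as ${SNOWFLAKE_PASSWORD}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_placeholders(value: Any, env: Mapping[str, str]) -> Any:
    """Recursively replace ${VAR} in strings; raise KeyError if VAR is unset."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda match: env[match.group(1)], value)
    if isinstance(value, dict):
        return {key: resolve_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_placeholders(item, env) for item in value]
    return value


recipe = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "datahub_user",
            "password": "${SNOWFLAKE_PASSWORD}",
        },
    },
}

# A plain dict stands in for os.environ so the example is reproducible.
resolved = resolve_placeholders(recipe, {"SNOWFLAKE_PASSWORD": "example-secret"})
print(resolved["source"]["config"]["password"])
```

Failing with KeyError on an unset variable is a deliberate choice in this sketch: a scheduled run that starts with a missing secret is usually better stopped early than allowed to fail partway through.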

Outputs

  • Periodic metadata ingestion runs with independent execution reports
  • Exit codes per run: 0 for success, 1 for failures
  • Ingestion run summaries stored in DataHub for monitoring
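
The exit-code contract above is what a scheduler or wrapper script acts on. As a minimal sketch, assuming a hypothetical helper named run_ingestion, a wrapper could run the command and treat any non-zero exit code as a failure. Stand-in commands are used so the example runs without DataHub installed.

```python
import subprocess
import sys
from typing import Sequence


def run_ingestion(command: Sequence[str]) -> bool:
    """Run one ingestion command; return True only when it exits with 0."""
    completed = subprocess.run(list(command), check=False)
    if completed.returncode != 0:
        print(
            f"ingestion failed with exit code {completed.returncode}",
            file=sys.stderr,
        )
        return False
    return True


# Stand-in commands that exit with 0 and 1. In practice the command would be
# something like:
#   ["datahub", "ingest", "run", "-c", "/opt/datahub/recipes/snowflake.yml"]
ok = run_ingestion([sys.executable, "-c", "raise SystemExit(0)"])
failed = run_ingestion([sys.executable, "-c", "raise SystemExit(1)"])
print(ok, failed)
```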

Usage Examples

Cron (Unix Scheduler)

Schedule a nightly ingestion at 2:00 AM:

# crontab -e
0 2 * * * /usr/local/bin/datahub ingest -c /opt/datahub/recipes/snowflake.yml >> /var/log/datahub-ingest.log 2>&1

Apache Airflow (BashOperator)

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "datahub",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="datahub_snowflake_ingestion",
    default_args=default_args,
    schedule_interval="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

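    ingest_snowflake = BashOperator(
        task_id="ingest_snowflake_metadata",
        bash_command="datahub ingest -c /opt/datahub/recipes/snowflake.yml",
        # "env" is a templated field, so the Jinja expressions are rendered at run time.
        env={
            "SNOWFLAKE_PASSWORD": "{{ var.value.snowflake_password }}",
            "DATAHUB_TOKEN": "{{ var.value.datahub_token }}",
        },
        # Without append_env=True, "env" replaces the inherited environment
        # (including PATH), which can prevent the datahub executable from being found.
        append_env=True,
    )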

Apache Airflow (PythonOperator)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

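def run_datahub_ingestion():
    # Imported inside the callable so that parsing the DAG file stays lightweight.
    from airflow.models import Variable
    from datahub.ingestion.run.pipeline import Pipeline

    # Jinja templates such as "{{ var.value.x }}" are not rendered inside a
    # Python callable, so the secrets are read from Airflow Variables directly.
    config = {
        "source": {
            "type": "snowflake",
            "config": {
                "account_id": "myaccount",
                "username": "datahub_user",
                "password": Variable.get("snowflake_password"),
            },
        },
        "datahub_api": {
            "server": "http://datahub-gms:8080",
            "token": Variable.get("datahub_token"),
        },
    }
    pipeline = Pipeline.create(config)
    pipeline.run()
    # Raise if the run reported failures so that Airflow marks the task as failed.
    pipeline.raise_from_status()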

with DAG(
    dag_id="datahub_snowflake_python",
    schedule_interval="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    ingest_task = PythonOperator(
        task_id="ingest_snowflake",
        python_callable=run_datahub_ingestion,
    )

Kubernetes CronJob

apiVersion: batch/v1
kind: CronJob
metadata:
  name: datahub-snowflake-ingestion
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: datahub-ingest
            image: acryldata/datahub-ingestion:latest
            command: ["datahub", "ingest", "-c", "/recipes/snowflake.yml"]
            envFrom:
            - secretRef:
                name: datahub-ingestion-secrets
            volumeMounts:
            - name: recipes
              mountPath: /recipes
          volumes:
          - name: recipes
            configMap:
              name: datahub-recipes
          restartPolicy: OnFailure

DataHub Server-Side Scheduling (datahub ingest deploy)

# Deploy a recipe scheduled daily at 2 AM in the America/New_York time zone
datahub ingest deploy \
  -c /opt/datahub/recipes/snowflake.yml \
  --name "Snowflake Nightly Sync" \
  --schedule "0 2 * * *" \
  --time-zone "America/New_York"

The deploy command registers the recipe with the DataHub server via the updateIngestionSource GraphQL mutation. The server then manages execution according to the cron schedule.

Deploy Command Signature

# ingest_cli.py, lines 327-338
def deploy(
    name: Optional[str],
    config: str,
    urn: Optional[str],
    executor_id: Optional[str],
    cli_version: Optional[str],
    schedule: Optional[str],
    time_zone: Optional[str],
    extra_pip: Optional[str],
    extra_env: Optional[str],
    debug: bool = False,
) -> None:

Monitoring

Regardless of the scheduling approach, each ingestion run produces:

  • CLI output -- A structured summary printed to stdout/stderr with success/failure counts
  • DataHub run report -- An ingestion run summary stored in DataHub (accessible via the DataHub UI under "Ingestion" tab)
  • Exit code -- 0 for success, 1 for failures, enabling scheduler-level alerting
