Implementation:Datahub project Datahub Scheduled Ingestion Orchestration
| Field | Value |
|---|---|
| Implementation Name | Scheduled Ingestion Orchestration |
| Overview | Concrete tool for wrapping the DataHub ingest CLI command in external schedulers like Airflow, cron, or Kubernetes CronJobs, as well as using DataHub's built-in server-side scheduling. |
| Type | External Tool Doc |
| Implements | Datahub_project_Datahub_Scheduled_Ingestion |
| Status | Active |
| Domains | Data_Integration, Metadata_Management |
| Source | DataHub Repository -- metadata-ingestion/src/datahub/cli/ingest_cli.py (lines 45-258 for run command, lines 260-388 for deploy command) |
| Last Updated | 2026-02-10 |
| Knowledge Sources | DataHub Repository |
Description
Scheduled ingestion orchestration wraps the datahub ingest run CLI command in an external scheduling system to automate recurring metadata ingestion. The CLI is an ordinary Unix command: it reads a recipe file, runs to completion and exits with a status code. Any scheduler that can execute a shell command and check its exit status can therefore drive it.
Additionally, DataHub provides a built-in server-side scheduling mechanism via the datahub ingest deploy command, which registers a recipe with the DataHub server along with a cron schedule. The server then executes the recipe at the specified intervals using its built-in executor.
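With an external scheduler, the only contract is the process exit status. The scheduler runs the command and treats a non-zero code as a failure. The minimal Python sketch below shows that contract using the standard library's subprocess module. `run_ingestion` is a hypothetical helper, and the sketch assumes the datahub executable is on PATH.

```python
import subprocess
from typing import Sequence

# Assumed default: the DataHub CLI's `ingest run` subcommand, found on PATH.
DEFAULT_CLI: Sequence[str] = ("datahub", "ingest", "run")


def run_ingestion(recipe_path: str, cli: Sequence[str] = DEFAULT_CLI) -> int:
    """Run one ingestion pass and return the CLI's exit code (0 means success).

    Hypothetical helper for illustration; `cli` is overridable so the same
    wrapper can point at a different install location.
    """
    # check=False: return the exit code instead of raising CalledProcessError.
    completed = subprocess.run([*cli, "-c", recipe_path], check=False)
    return completed.returncode
```

A cron entry or container command could call `run_ingestion` and pass the result to `sys.exit()`. The surrounding scheduler then sees the same status the CLI reported.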
Key Parameters
The parameters for the underlying datahub ingest run command remain the same as documented in Datahub_project_Datahub_Ingest_CLI_Run. The scheduling configuration is external to the CLI itself.
For server-side scheduling via datahub ingest deploy:
| Parameter | Type | Default | Description |
|---|---|---|---|
| -c, --config | string (file path) | (required) | Path to the recipe config file. |
| -n, --name | string | None | Recipe name for identification. |
| --urn | string | None | URN of an existing recipe to update. |
| --schedule | string (cron) | None | Cron expression for scheduling (e.g., "0 2 * * *" for daily at 2 AM). |
| --time-zone | string | UTC | Timezone for the schedule (e.g., "America/New_York"). |
| --executor-id | string | None | Custom executor ID for routing execution requests. |
| --cli-version | string | None | Custom CLI version to use for ingestion. |
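The following minimal Python sketch assembles a datahub ingest deploy argument list from the options in the table above. `build_deploy_args` is a hypothetical helper. Its five-field check assumes a standard cron expression. Options left as None are not passed, so their defaults stay with the CLI.

```python
from typing import List, Optional


def build_deploy_args(
    config: str,
    name: Optional[str] = None,
    urn: Optional[str] = None,
    schedule: Optional[str] = None,
    time_zone: Optional[str] = None,
    executor_id: Optional[str] = None,
    cli_version: Optional[str] = None,
) -> List[str]:
    """Assemble a `datahub ingest deploy` argv (hypothetical helper).

    Only flags from the parameter table are emitted; omitted options are
    left to the CLI's own defaults.
    """
    if schedule is not None and len(schedule.split()) != 5:
        # Assumption: a standard cron expression (minute hour day month weekday).
        raise ValueError(f"expected a 5-field cron expression, got {schedule!r}")

    args = ["datahub", "ingest", "deploy", "-c", config]
    optional_flags = [
        ("--name", name),
        ("--urn", urn),
        ("--schedule", schedule),
        ("--time-zone", time_zone),
        ("--executor-id", executor_id),
        ("--cli-version", cli_version),
    ]
    for flag, value in optional_flags:
        if value is not None:
            args.extend([flag, value])
    return args
```

A provisioning script could pass the resulting list to `subprocess.run(args, check=True)`. Building a list rather than a shell string keeps the cron expression's asterisks from being glob-expanded by a shell.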
I/O Contract
Inputs
- A recipe YAML file (same as datahub ingest run)
- Scheduling configuration (cron expression, Airflow schedule, etc.)
- Environment variables for credentials and secrets
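DataHub recipes can reference environment variables such as ${SNOWFLAKE_PASSWORD}, which the CLI substitutes when it loads the recipe. A scheduled job often runs with a different environment than an interactive shell. A pre-flight check can therefore catch unset variables before the run starts. The sketch below uses a hypothetical helper, `missing_recipe_env_vars`, that recognizes only the braced ${NAME} form; it is not part of DataHub.

```python
import os
import re
from typing import Mapping, Optional, Set

# Matches braced references such as ${SNOWFLAKE_PASSWORD}; other expansion
# forms (e.g. $NAME or ${NAME:-default}) are deliberately not handled here.
_ENV_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def missing_recipe_env_vars(
    recipe_text: str, env: Optional[Mapping[str, str]] = None
) -> Set[str]:
    """Return names referenced as ${NAME} in a recipe that are unset in `env`.

    Hypothetical pre-flight helper; `env` defaults to the current process environment.
    """
    environment = os.environ if env is None else env
    referenced = set(_ENV_REF.findall(recipe_text))
    return {name for name in referenced if name not in environment}
```

A wrapper script could read the recipe file, call this function and exit with a non-zero status if the returned set is not empty.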
Outputs
- Periodic metadata ingestion runs with independent execution reports
- Exit codes per run: 0 for success, 1 for failures
- Ingestion run summaries stored in DataHub for monitoring
Usage Examples
Cron (Unix Scheduler)
Schedule a nightly ingestion at 2:00 AM:
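```shell
# crontab -e
# cron starts jobs with a minimal environment: define any variables the recipe
# references (credentials, tokens) in the crontab or in a wrapper script.
0 2 * * * /usr/local/bin/datahub ingest -c /opt/datahub/recipes/snowflake.yml >> /var/log/datahub-ingest.log 2>&1
```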
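The five fields in 0 2 * * * are minute, hour, day of month, month and day of week. The entry fires once a day at 02:00 in the cron daemon's time zone, which is normally the host's local time. The small matcher below makes that reading concrete. It is a simplified illustration, not a cron implementation, and `cron_matches` is a hypothetical name.

```python
from datetime import datetime


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` matches a five-field cron expression.

    Simplified sketch: every field must be "*" or a single integer; ranges,
    lists, and steps such as "1-5", "1,15", or "*/10" are not supported.
    """
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {expression!r}")
    minute, hour, day_of_month, month, day_of_week = fields

    def matches(field: str, value: int) -> bool:
        return field == "*" or int(field) == value

    # Cron numbers weekdays from Sunday = 0 (7 is an alias for Sunday);
    # Python's weekday() numbers them from Monday = 0.
    cron_weekday = (moment.weekday() + 1) % 7
    dom_ok = matches(day_of_month, moment.day)
    dow_ok = day_of_week == "*" or int(day_of_week) % 7 == cron_weekday

    # Classic cron semantics: when both day fields are restricted, either may match.
    if day_of_month != "*" and day_of_week != "*":
        day_ok = dom_ok or dow_ok
    else:
        day_ok = dom_ok and dow_ok

    return (
        matches(minute, moment.minute)
        and matches(hour, moment.hour)
        and matches(month, moment.month)
        and day_ok
    )
```

For example, `cron_matches("0 2 * * *", datetime(2026, 1, 1, 2, 0))` is True, while the same expression does not match 02:01 or 14:00.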
Apache Airflow (BashOperator)
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path

default_args = {
    "owner": "datahub",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="datahub_snowflake_ingestion",
    default_args=default_args,
    schedule="0 2 * * *",  # Daily at 2 AM; `schedule` replaces `schedule_interval` (Airflow 2.4+)
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    ingest_snowflake = BashOperator(
        task_id="ingest_snowflake_metadata",
        bash_command="datahub ingest -c /opt/datahub/recipes/snowflake.yml",
        # `env` is a templated field, so these Jinja expressions are rendered at run time.
        env={
            "SNOWFLAKE_PASSWORD": "{{ var.value.snowflake_password }}",
            "DATAHUB_TOKEN": "{{ var.value.datahub_token }}",
        },
        # Without append_env=True, `env` replaces the worker's environment (including PATH).
        append_env=True,
    )
```
Apache Airflow (PythonOperator)
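```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator  # Airflow 2.x import path


def run_datahub_ingestion():
    # Import inside the callable so the DAG file parses quickly on the scheduler.
    from datahub.ingestion.run.pipeline import Pipeline

    # PythonOperator does not render Jinja inside the callable's body, so read
    # Airflow Variables directly at run time instead of "{{ var.value.* }}".
    config = {
        "source": {
            "type": "snowflake",
            "config": {
                "account_id": "myaccount",
                "username": "datahub_user",
                "password": Variable.get("snowflake_password"),
            },
        },
        # Explicit REST sink pointing at DataHub GMS.
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": "http://datahub-gms:8080",
                "token": Variable.get("datahub_token"),
            },
        },
    }
    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()  # Raise on failures so Airflow marks the task failed


with DAG(
    dag_id="datahub_snowflake_python",
    schedule="0 2 * * *",  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_snowflake",
        python_callable=run_datahub_ingestion,
    )
```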
Kubernetes CronJob
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: datahub-snowflake-ingestion
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: datahub-ingest
              image: acryldata/datahub-ingestion:latest
              command: ["datahub", "ingest", "-c", "/recipes/snowflake.yml"]
              envFrom:
                - secretRef:
                    name: datahub-ingestion-secrets
              volumeMounts:
                - name: recipes
                  mountPath: /recipes
          volumes:
            - name: recipes
              configMap:
                name: datahub-recipes
          restartPolicy: OnFailure
```
DataHub Server-Side Scheduling (datahub ingest deploy)
```shell
# Deploy a recipe that runs daily at 2 AM New York time (America/New_York)
datahub ingest deploy \
  -c /opt/datahub/recipes/snowflake.yml \
  --name "Snowflake Nightly Sync" \
  --schedule "0 2 * * *" \
  --time-zone "America/New_York"
```
The deploy command registers the recipe with the DataHub server via the updateIngestionSource GraphQL mutation. The server then manages execution according to the cron schedule.
Deploy Command Signature
```python
# ingest_cli.py, lines 327-338 (Click decorators and function body omitted)
def deploy(
    name: Optional[str],
    config: str,
    urn: Optional[str],
    executor_id: Optional[str],
    cli_version: Optional[str],
    schedule: Optional[str],
    time_zone: Optional[str],
    extra_pip: Optional[str],
    extra_env: Optional[str],
    debug: bool = False,
) -> None:
    ...
```
Monitoring
Regardless of the scheduling approach, each ingestion run produces:
- CLI output -- A structured summary printed to stdout/stderr with success/failure counts
- DataHub run report -- An ingestion run summary stored in DataHub (accessible in the DataHub UI under the "Ingestion" tab)
- Exit code -- 0 for success, 1 for failures, enabling scheduler-level alerting