Implementation:Datahub project Datahub Ingest CLI Run
| Field | Value |
|---|---|
| Implementation Name | Ingest CLI Run |
| Overview | Concrete tool for executing metadata ingestion pipelines via the DataHub CLI `datahub ingest run` command. |
| Type | API Doc |
| Implements | Datahub_project_Datahub_Batch_Ingestion_Execution |
| Status | Active |
| Domains | Data_Integration, Metadata_Management |
| Source | DataHub Repository -- `metadata-ingestion/src/datahub/cli/ingest_cli.py` (lines 45-258) and `metadata-ingestion/src/datahub/ingestion/run/pipeline.py` (lines 418-570) |
| Last Updated | 2026-02-10 |
| Knowledge Sources | DataHub Repository |
Description
The datahub ingest run command is the primary CLI entry point for executing a metadata ingestion pipeline. It reads a YAML recipe file, creates a Pipeline instance, executes the full ETL cycle, and reports results.
The command is implemented as a Click command in ingest_cli.py that delegates to the Pipeline class in pipeline.py. The execution flow is:
- Load and parse the recipe YAML file via `load_config_file()`
- Create a `Pipeline` instance via `Pipeline.create(config_dict)`
- Execute the pipeline via `pipeline.run()`
- Print a summary report via `pipeline.pretty_print_summary()`
- Exit with the appropriate return code (0 for success, 1 for failures)
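The flow above can be sketched with stand-in stubs. All names here (`StubSource`, `StubSink`, `run_pipeline`) are simplified placeholders for illustration, not the actual DataHub internals:

```python
import itertools
from typing import Iterator

class StubSource:
    """Stand-in for a DataHub source: yields work units."""
    def get_workunits(self) -> Iterator[str]:
        yield from ("wu-1", "wu-2", "wu-3")

class StubSink:
    """Stand-in for a DataHub sink: collects written records."""
    def __init__(self) -> None:
        self.records: list = []

    def write_record_async(self, record: str) -> None:
        self.records.append(record)

def run_pipeline(source: StubSource, sink: StubSink,
                 dry_run: bool = False,
                 preview_mode: bool = False,
                 preview_workunits: int = 10) -> int:
    """Mirror the CLI flow: iterate work units, cap the stream in preview
    mode, skip sink writes on dry runs, and return a process exit code."""
    try:
        limit = preview_workunits if preview_mode else None
        for wu in itertools.islice(source.get_workunits(), limit):
            if not dry_run:
                sink.write_record_async(wu)
        return 0  # success
    except Exception:
        return 1  # failure

sink = StubSink()
assert run_pipeline(StubSource(), sink) == 0
print(sink.records)  # all three work units reach the sink
```

A dry run (`dry_run=True`) leaves the sink untouched, and preview mode truncates the work-unit stream, matching the semantics of the corresponding CLI flags.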
CLI Command Signature
```shell
datahub ingest run -c <config_file> [OPTIONS]
```
CLI Options
| Option | Type | Default | Description |
|---|---|---|---|
| `-c, --config` | string (file path) | (required) | Path to the recipe config file in YAML or TOML format. |
| `-n, --dry-run` | flag | `False` | Perform a dry run, skipping writes to the sink. |
| `--preview` | flag | `False` | Perform limited ingestion for a quick preview. |
| `--preview-workunits` | integer | `10` | Number of work units to produce in preview mode. |
| `--strict-warnings/--no-strict-warnings` | flag | `False` | If enabled, warnings yield a non-zero exit code. |
| `--test-source-connection` | flag | `False` | Only test the source connection details from the recipe. |
| `--report-to` | string | `"datahub"` | Destination for structured reports. The default sends to the DataHub server; any other value is treated as a file path. |
| `--no-default-report` | flag | `False` | Turn off default reporting to DataHub. |
| `--no-spinner` | flag | `False` | Turn off the progress spinner. |
| `--no-progress` | flag | `False` | Mute intermediate progress reports. |
| `--record` | flag | `False` | Enable recording of the ingestion run for debugging. |
| `--record-password` | string | `None` | Password for encrypting the recording archive. |
| `--record-output-path` | string | `None` | Path to save the recording archive. |
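A minimal recipe file passed via `-c` pairs a source with a sink. The block below is illustrative only: the account, credentials, and server URL are placeholders, not real values:

```yaml
# recipe.yml -- illustrative values only
source:
  type: snowflake
  config:
    account_id: myaccount   # placeholder
    username: user          # placeholder
    password: pass          # placeholder
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080  # placeholder DataHub GMS URL
```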
Python Function Signature
```python
# ingest_cli.py, lines 160-176
def run(
    config: str,
    dry_run: bool,
    preview: bool,
    strict_warnings: bool,
    preview_workunits: int,
    test_source_connection: bool,
    report_to: Optional[str],
    no_default_report: bool,
    no_spinner: bool,
    no_progress: bool,
    record: bool,
    record_password: Optional[str],
    record_output_path: Optional[str],
    no_s3_upload: bool,
    no_secret_redaction: bool,
) -> None:
```
Pipeline.create() Class Method
```python
# pipeline.py, lines 418-437
@classmethod
def create(
    cls,
    config_dict: dict,
    dry_run: bool = False,
    preview_mode: bool = False,
    preview_workunits: int = 10,
    report_to: Optional[str] = "datahub",
    no_progress: bool = False,
    raw_config: Optional[dict] = None,
) -> "Pipeline":
    config = PipelineConfig.from_dict(config_dict, raw_config)
    return cls(
        config,
        dry_run=dry_run,
        preview_mode=preview_mode,
        preview_workunits=preview_workunits,
        report_to=report_to,
        no_progress=no_progress,
    )
```
Pipeline.run() Method
```python
# pipeline.py, lines 461-570 (excerpted)
def run(self) -> None:
    self._set_platform()
    self._warn_old_cli_version()
    with self.exit_stack, self.inner_exit_stack:
        # ... memory profiling setup ...
        self.final_status = PipelineStatus.UNKNOWN
        self._notify_reporters_on_ingestion_start()
        try:
            for wu in itertools.islice(
                self.source.get_workunits(),
                self.preview_workunits if self.preview_mode else None,
            ):
                record_envelopes = list(self.extractor.get_records(wu))
                for record_envelope in self.transform(record_envelopes):
                    if not self.dry_run:
                        # callback is the sink write-completion handler
                        # defined earlier in pipeline.py (elided here)
                        self.sink.write_record_async(record_envelope, callback)
            # Process end-of-stream for transformers
            # Close source for stateful ingestion
            self.inner_exit_stack.close()
            self.process_commits()
            self.final_status = PipelineStatus.COMPLETED
        except (SystemExit, KeyboardInterrupt):
            self.final_status = PipelineStatus.CANCELLED
            raise
        except Exception as exc:
            self.final_status = PipelineStatus.ERROR
            self._handle_uncaught_pipeline_exception(exc)
```
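The status handling at the end of `run()` (completed / cancelled / error) can be isolated as a small sketch. The enum and `execute` helper below are illustrative stand-ins, not DataHub's actual classes:

```python
import enum

class PipelineStatus(enum.Enum):
    """Illustrative stand-in for DataHub's pipeline status enum."""
    UNKNOWN = "unknown"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    ERROR = "error"

def execute(body) -> PipelineStatus:
    """Mirror run()'s exception handling: user interrupts mark the run
    cancelled and re-raise; any other exception marks it errored."""
    status = PipelineStatus.UNKNOWN
    try:
        body()
        status = PipelineStatus.COMPLETED
    except (SystemExit, KeyboardInterrupt):
        status = PipelineStatus.CANCELLED
        raise
    except Exception:
        status = PipelineStatus.ERROR
    return status
```

Note that cancellation re-raises rather than swallowing the interrupt, so Ctrl-C still terminates the process after the status is recorded.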
Import
```python
from datahub.ingestion.run.pipeline import Pipeline
```
I/O Contract
Inputs
- A recipe YAML file path (via the `-c` flag) or a config dictionary (via `Pipeline.create()`)
- Optional CLI flags for dry run, preview, reporting, and recording
Outputs
- Metadata change proposals emitted to the configured sink (REST API, Kafka, or file)
- A structured ingestion report printed to stdout
- An ingestion run summary sent to the DataHub server (unless reporting is disabled)
- Exit code: 0 for success, 1 for failures or strict-warning mode with warnings
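The exit-code rule, including the `--strict-warnings` behavior, can be expressed as a small helper. This is a sketch of the documented contract, not DataHub's actual code:

```python
def ingest_exit_code(failures: int, warnings: int,
                     strict_warnings: bool = False) -> int:
    """Return 1 if the run had failures, or had warnings while
    --strict-warnings was enabled; otherwise 0."""
    if failures > 0:
        return 1
    if strict_warnings and warnings > 0:
        return 1
    return 0
```

Under the default (`--no-strict-warnings`), a run that only produced warnings still exits 0; enabling `--strict-warnings` makes such runs fail CI checks.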
Usage Examples
Basic Ingestion
```shell
datahub ingest -c recipe.yml
```
Dry Run
```shell
datahub ingest -c recipe.yml --dry-run
```
Preview Mode
```shell
datahub ingest -c recipe.yml --preview --preview-workunits 5
```
Test Source Connection
```shell
datahub ingest -c recipe.yml --test-source-connection
```
Programmatic Usage
```python
from datahub.ingestion.run.pipeline import Pipeline

config = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "user",
            "password": "pass",
        },
    },
}

pipeline = Pipeline.create(config)
pipeline.run()
ret = pipeline.pretty_print_summary()
```
Related Pages
- Implements: Datahub_project_Datahub_Batch_Ingestion_Execution
- Related: Datahub_project_Datahub_PipelineConfig
- Related: Datahub_project_Datahub_DatahubClientConfig
- Related: Datahub_project_Datahub_Scheduled_Ingestion_Orchestration
- Environment: Environment:Datahub_project_Datahub_Python_3_10_Ingestion_Environment
- Heuristic: Heuristic:Datahub_project_Datahub_Batch_Size_And_Timeout_Tuning