
Implementation:Datahub project Datahub Ingest CLI Run

From Leeroopedia


Field Value
Implementation Name Ingest CLI Run
Overview Concrete tool for executing metadata ingestion pipelines via the DataHub CLI datahub ingest run command.
Type API Doc
Implements Datahub_project_Datahub_Batch_Ingestion_Execution
Status Active
Domains Data_Integration, Metadata_Management
Source DataHub Repository -- metadata-ingestion/src/datahub/cli/ingest_cli.py (lines 45-258) and metadata-ingestion/src/datahub/ingestion/run/pipeline.py (lines 418-570)
Last Updated 2026-02-10
Knowledge Sources DataHub Repository

Description

The datahub ingest run command is the primary CLI entry point for executing a metadata ingestion pipeline. It reads a YAML recipe file, creates a Pipeline instance, executes the full ETL cycle, and reports results.

The command is implemented as a Click command in ingest_cli.py that delegates to the Pipeline class in pipeline.py. The execution flow is:

  1. Load and parse the recipe YAML file via load_config_file()
  2. Create a Pipeline instance via Pipeline.create(config_dict)
  3. Execute the pipeline via pipeline.run()
  4. Print a summary report via pipeline.pretty_print_summary()
  5. Exit with appropriate return code (0 for success, 1 for failures)
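The five-step flow can be sketched with stand-in stubs. The class and function names below are illustrative only, not DataHub's actual classes, and JSON stands in for the YAML recipe so the sketch stays within the standard library:

```python
import json
import sys

# Illustrative stand-ins for the five steps above; NOT DataHub's real classes.
class SketchPipeline:
    def __init__(self, config: dict):
        self.config = config
        self.failures = 0

    @classmethod
    def create(cls, config_dict: dict) -> "SketchPipeline":
        return cls(config_dict)                      # step 2: build the pipeline

    def run(self) -> None:
        pass                                         # step 3: ETL cycle (elided)

    def pretty_print_summary(self) -> int:
        # step 4: print a summary; the return value maps to the exit code
        print(f"source={self.config['source']['type']}: {self.failures} failures")
        return 0 if self.failures == 0 else 1

def main(recipe_path: str) -> int:
    with open(recipe_path) as f:
        config = json.load(f)                        # step 1: load the recipe
    pipeline = SketchPipeline.create(config)
    pipeline.run()
    return pipeline.pretty_print_summary()           # steps 4-5

if __name__ == "__main__":
    sys.exit(main(sys.argv[1]))                      # step 5: exit code
```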

CLI Command Signature

datahub ingest run -c <config_file> [OPTIONS]

CLI Options

Option Type Default Description
-c, --config string (file path) (required) Path to the recipe config file in YAML or TOML format.
-n, --dry-run flag False Perform a dry run, skipping writes to the sink.
--preview flag False Perform limited ingestion for a quick preview.
--preview-workunits integer 10 Number of work units to produce in preview mode.
--strict-warnings/--no-strict-warnings flag False If enabled, warnings yield a non-zero exit code.
--test-source-connection flag False Only test the source connection details from the recipe.
--report-to string "datahub" Destination for structured reports. Default sends to DataHub server; any other value is treated as a file path.
--no-default-report flag False Turn off default reporting to DataHub.
--no-spinner flag False Turn off the progress spinner.
--no-progress flag False Mute intermediate progress reports.
--record flag False Enable recording of the ingestion run for debugging.
--record-password string None Password for encrypting the recording archive.
--record-output-path string None Path to save the recording archive.
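A rough sketch of how the --report-to value is interpreted, per the table above. This is illustrative logic only, not DataHub's implementation:

```python
from typing import Optional, Tuple

def resolve_report_destination(report_to: Optional[str]) -> Tuple[str, Optional[str]]:
    # Per the table above: "datahub" (the default) sends the structured report
    # to the DataHub server; any other value is treated as a file path.
    # Illustrative sketch, not DataHub's actual routing code.
    if report_to is None:
        return ("disabled", None)
    if report_to == "datahub":
        return ("server", None)
    return ("file", report_to)

print(resolve_report_destination("datahub"))      # ('server', None)
print(resolve_report_destination("report.json"))  # ('file', 'report.json')
```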

Python Function Signature

# ingest_cli.py, lines 160-176
def run(
    config: str,
    dry_run: bool,
    preview: bool,
    strict_warnings: bool,
    preview_workunits: int,
    test_source_connection: bool,
    report_to: Optional[str],
    no_default_report: bool,
    no_spinner: bool,
    no_progress: bool,
    record: bool,
    record_password: Optional[str],
    record_output_path: Optional[str],
    no_s3_upload: bool,
    no_secret_redaction: bool,
) -> None:

Pipeline.create() Class Method

# pipeline.py, lines 418-437
@classmethod
def create(
    cls,
    config_dict: dict,
    dry_run: bool = False,
    preview_mode: bool = False,
    preview_workunits: int = 10,
    report_to: Optional[str] = "datahub",
    no_progress: bool = False,
    raw_config: Optional[dict] = None,
) -> "Pipeline":
    config = PipelineConfig.from_dict(config_dict, raw_config)
    return cls(
        config,
        dry_run=dry_run,
        preview_mode=preview_mode,
        preview_workunits=preview_workunits,
        report_to=report_to,
        no_progress=no_progress,
    )

Pipeline.run() Method

# pipeline.py, lines 461-570
def run(self) -> None:
    self._set_platform()
    self._warn_old_cli_version()
    with self.exit_stack, self.inner_exit_stack:
        # ... memory profiling setup ...
        self.final_status = PipelineStatus.UNKNOWN
        self._notify_reporters_on_ingestion_start()
        try:
            for wu in itertools.islice(
                self.source.get_workunits(),
                self.preview_workunits if self.preview_mode else None,
            ):
                record_envelopes = list(self.extractor.get_records(wu))
                for record_envelope in self.transform(record_envelopes):
                    if not self.dry_run:
                        self.sink.write_record_async(record_envelope, callback)  # callback defined earlier in run() (elided here)

            # Process end-of-stream for transformers
            # Close source for stateful ingestion
            self.inner_exit_stack.close()
            self.process_commits()
            self.final_status = PipelineStatus.COMPLETED
        except (SystemExit, KeyboardInterrupt):
            self.final_status = PipelineStatus.CANCELLED
            raise
        except Exception as exc:
            self.final_status = PipelineStatus.ERROR
            self._handle_uncaught_pipeline_exception(exc)
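The itertools.islice call above is what implements preview mode: a stop value of None drains the whole work-unit stream, while an integer caps it at preview_workunits. A standalone sketch of the same pattern (stand-in generator, not DataHub's source API):

```python
import itertools
from typing import Iterable, List, Optional

def fetch_workunits(n: int = 100) -> Iterable[str]:
    # Stand-in for source.get_workunits(): a stream of work units.
    return (f"wu-{i}" for i in range(n))

def consume(stream: Iterable[str], preview_mode: bool,
            preview_workunits: int = 10) -> List[str]:
    # islice(stream, None) drains the whole stream;
    # islice(stream, k) stops after k items, as in Pipeline.run().
    stop: Optional[int] = preview_workunits if preview_mode else None
    return list(itertools.islice(stream, stop))

print(len(consume(fetch_workunits(), preview_mode=True, preview_workunits=5)))  # 5
print(len(consume(fetch_workunits(), preview_mode=False)))                      # 100
```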

Import

from datahub.ingestion.run.pipeline import Pipeline

I/O Contract

Inputs

  • A recipe YAML file path (via -c flag) or a config dictionary (via Pipeline.create())
  • Optional CLI flags for dry run, preview, reporting, and recording

Outputs

  • Metadata change proposals emitted to the configured sink (REST API, Kafka, or file)
  • A structured ingestion report printed to stdout
  • An ingestion run summary sent to the DataHub server (unless reporting is disabled)
  • Exit code: 0 on success; 1 on failures, or on warnings when --strict-warnings is enabled
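The exit-code rule can be written out as a small predicate. This is a sketch of the contract above, not DataHub's code:

```python
def ingest_exit_code(failures: int, warnings: int,
                     strict_warnings: bool = False) -> int:
    # Failures always produce a non-zero exit; warnings do so only
    # when --strict-warnings is enabled. Illustrative sketch only.
    if failures > 0:
        return 1
    if strict_warnings and warnings > 0:
        return 1
    return 0

print(ingest_exit_code(failures=0, warnings=3))                        # 0
print(ingest_exit_code(failures=0, warnings=3, strict_warnings=True))  # 1
```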

Usage Examples

Basic Ingestion

datahub ingest run -c recipe.yml

Dry Run

datahub ingest run -c recipe.yml --dry-run

Preview Mode

datahub ingest run -c recipe.yml --preview --preview-workunits 5

Test Source Connection

datahub ingest run -c recipe.yml --test-source-connection

Programmatic Usage

from datahub.ingestion.run.pipeline import Pipeline

config = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "user",
            "password": "pass",
        },
    },
}

pipeline = Pipeline.create(config)
pipeline.run()
exit_code = pipeline.pretty_print_summary()  # returns 0 on success, 1 on failure
