Implementation:Datahub project Datahub Ingest CLI Run

From Leeroopedia
Revision as of 14:43, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Datahub_project_Datahub_Ingest_CLI_Run.md)


Field | Value
Implementation Name | Ingest CLI Run
Overview | Concrete tool for executing metadata ingestion pipelines via the DataHub CLI datahub ingest run command.
Type | API Doc
Implements | Datahub_project_Datahub_Batch_Ingestion_Execution
Status | Active
Domains | Data_Integration, Metadata_Management
Source | DataHub Repository -- metadata-ingestion/src/datahub/cli/ingest_cli.py (lines 45-258) and metadata-ingestion/src/datahub/ingestion/run/pipeline.py (lines 418-570)
Last Updated | 2026-02-10
Knowledge Sources | DataHub Repository

Description

The datahub ingest run command is the primary CLI entry point for executing a metadata ingestion pipeline. It reads a recipe file (YAML or TOML), creates a Pipeline instance, runs the full extract-transform-load cycle, and reports the results.

The command is implemented as a Click command in ingest_cli.py that delegates to the Pipeline class in pipeline.py. The execution flow is:

  1. Load and parse the recipe file via load_config_file()
  2. Create a Pipeline instance via Pipeline.create(config_dict)
  3. Execute the pipeline via pipeline.run()
  4. Print a summary report via pipeline.pretty_print_summary()
  5. Exit with appropriate return code (0 for success, 1 for failures)
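
The steps above can be sketched as a small driver function. This is a minimal illustration, not the code in ingest_cli.py: run_recipe, PipelineLike, and StubPipeline are hypothetical names introduced here so the sketch runs without DataHub installed, and it assumes pretty_print_summary() returns the integer exit status.

```python
from typing import Any, Callable, Dict, Protocol


class PipelineLike(Protocol):
    """The subset of the Pipeline interface that the flow relies on."""

    def run(self) -> None: ...

    def pretty_print_summary(self) -> int: ...


def run_recipe(
    config_dict: Dict[str, Any],
    create: Callable[[Dict[str, Any]], PipelineLike],
) -> int:
    """Steps 2-4: create the pipeline, run it, and print the summary.

    The returned integer is intended to become the process exit code (step 5).
    """
    pipeline = create(config_dict)
    pipeline.run()
    return pipeline.pretty_print_summary()


class StubPipeline:
    """Hypothetical stand-in for Pipeline, used only to exercise the sketch."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config
        self.ran = False

    def run(self) -> None:
        self.ran = True

    def pretty_print_summary(self) -> int:
        print(f"source type: {self.config['source']['type']}")
        return 0 if self.ran else 1


exit_code = run_recipe({"source": {"type": "demo"}}, StubPipeline)
```

With DataHub installed, passing Pipeline.create in place of StubPipeline and handing the result to sys.exit() would approximate what the CLI does, minus option handling and recipe loading.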

CLI Command Signature

datahub ingest run -c <config_file> [OPTIONS]

CLI Options

Option | Type | Default | Description
-c, --config | string (file path) | (required) | Path to the recipe config file in YAML or TOML format.
-n, --dry-run | flag | False | Perform a dry run, skipping writes to the sink.
--preview | flag | False | Perform limited ingestion for a quick preview.
--preview-workunits | integer | 10 | Number of work units to produce in preview mode.
--strict-warnings/--no-strict-warnings | flag | False | If enabled, warnings yield a non-zero exit code.
--test-source-connection | flag | False | Only test the source connection details from the recipe.
--report-to | string | "datahub" | Destination for structured reports. Default sends to DataHub server; any other value is treated as a file path.
--no-default-report | flag | False | Turn off default reporting to DataHub.
--no-spinner | flag | False | Turn off the progress spinner.
--no-progress | flag | False | Mute intermediate progress reports.
--record | flag | False | Enable recording of the ingestion run for debugging.
--record-password | string | None | Password for encrypting the recording archive.
--record-output-path | string | None | Path to save the recording archive.
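
When the CLI is driven from a script, the options in the table can be assembled into an argument list. The helper below is a hypothetical convenience (build_ingest_argv is not part of DataHub) that covers only a few of the listed flags, and it only builds the list without running anything.

```python
import shlex
from typing import List, Optional


def build_ingest_argv(
    config: str,
    dry_run: bool = False,
    preview: bool = False,
    preview_workunits: Optional[int] = None,
    strict_warnings: bool = False,
    report_to: Optional[str] = None,
) -> List[str]:
    """Build an argv list for `datahub ingest run` from a subset of options."""
    argv = ["datahub", "ingest", "run", "-c", config]
    if dry_run:
        argv.append("--dry-run")
    if preview:
        argv.append("--preview")
        # The work-unit limit is only meaningful in preview mode.
        if preview_workunits is not None:
            argv += ["--preview-workunits", str(preview_workunits)]
    if strict_warnings:
        argv.append("--strict-warnings")
    if report_to is not None:
        argv += ["--report-to", report_to]
    return argv


argv = build_ingest_argv("recipe.yml", preview=True, preview_workunits=5)
print(shlex.join(argv))
```

The resulting list can be passed to subprocess.run(argv), whose returncode then carries the exit status of the ingestion run.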

Python Function Signature

# ingest_cli.py, lines 160-176
def run(
    config: str,
    dry_run: bool,
    preview: bool,
    strict_warnings: bool,
    preview_workunits: int,
    test_source_connection: bool,
    report_to: Optional[str],
    no_default_report: bool,
    no_spinner: bool,
    no_progress: bool,
    record: bool,
    record_password: Optional[str],
    record_output_path: Optional[str],
    no_s3_upload: bool,
    no_secret_redaction: bool,
) -> None:

Pipeline.create() Class Method

# pipeline.py, lines 418-437
@classmethod
def create(
    cls,
    config_dict: dict,
    dry_run: bool = False,
    preview_mode: bool = False,
    preview_workunits: int = 10,
    report_to: Optional[str] = "datahub",
    no_progress: bool = False,
    raw_config: Optional[dict] = None,
) -> "Pipeline":
    config = PipelineConfig.from_dict(config_dict, raw_config)
    return cls(
        config,
        dry_run=dry_run,
        preview_mode=preview_mode,
        preview_workunits=preview_workunits,
        report_to=report_to,
        no_progress=no_progress,
    )

Pipeline.run() Method

# pipeline.py, lines 461-570
def run(self) -> None:
    self._set_platform()
    self._warn_old_cli_version()
    with self.exit_stack, self.inner_exit_stack:
        # ... memory profiling setup ...
        self.final_status = PipelineStatus.UNKNOWN
        self._notify_reporters_on_ingestion_start()
        try:
            # Abridged excerpt: `callback` is set up in code elided here.
            # In preview mode, islice caps the stream at preview_workunits;
            # a stop value of None applies no cap.
            for wu in itertools.islice(
                self.source.get_workunits(),
                self.preview_workunits if self.preview_mode else None,
            ):
                record_envelopes = list(self.extractor.get_records(wu))
                for record_envelope in self.transform(record_envelopes):
                    # A dry run still extracts and transforms, but skips the sink.
                    if not self.dry_run:
                        self.sink.write_record_async(record_envelope, callback)

            # Process end-of-stream for transformers
            # Close source for stateful ingestion
            self.inner_exit_stack.close()
            self.process_commits()
            self.final_status = PipelineStatus.COMPLETED
        except (SystemExit, KeyboardInterrupt):
            # Interrupted runs are marked CANCELLED and the exception propagates.
            self.final_status = PipelineStatus.CANCELLED
            raise
        except Exception as exc:
            # Any other exception marks the run as ERROR and is handled here.
            self.final_status = PipelineStatus.ERROR
            self._handle_uncaught_pipeline_exception(exc)
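
The preview and dry-run behavior of the loop above can be reproduced in isolation. The following is a simplified sketch, not the pipeline code: generate_workunits and run_loop are hypothetical stand-ins, and a plain list plays the role of the sink.

```python
import itertools
from typing import Iterable, Iterator, List


def generate_workunits(n: int) -> Iterator[str]:
    """Stand-in for source.get_workunits(): lazily yields n work units."""
    for i in range(n):
        yield f"workunit-{i}"


def run_loop(
    workunits: Iterable[str],
    preview_mode: bool,
    preview_workunits: int,
    dry_run: bool,
) -> List[str]:
    """Consume work units and return the ones that reached the 'sink'."""
    written: List[str] = []
    # islice with a stop of None consumes the whole iterable.
    limit = preview_workunits if preview_mode else None
    for wu in itertools.islice(workunits, limit):
        if not dry_run:
            written.append(wu)
    return written


full = run_loop(generate_workunits(25), False, 10, False)
preview = run_loop(generate_workunits(25), True, 10, False)
dry = run_loop(generate_workunits(25), False, 10, True)
print(len(full), len(preview), len(dry))
```

Because islice is lazy, a preview run stops pulling from the source once the limit is reached instead of generating every work unit and discarding the rest.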

Import

from datahub.ingestion.run.pipeline import Pipeline

I/O Contract

Inputs

  • A recipe file path in YAML or TOML format (via the -c flag), or a config dictionary (via Pipeline.create())
  • Optional CLI flags for dry run, preview, reporting, and recording

Outputs

  • Metadata change proposals emitted to the configured sink (REST API, Kafka, or file)
  • A structured ingestion report printed to stdout
  • An ingestion run summary sent to the DataHub server (unless reporting is disabled)
  • Exit code: 0 on success; 1 if the run reported failures, or if it reported warnings while --strict-warnings is enabled
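
The exit-code rule stated above can be written as a small function. This is a sketch of the rule as described on this page, not the DataHub implementation; ingest_exit_code and its parameter names are hypothetical.

```python
def ingest_exit_code(
    has_failures: bool,
    has_warnings: bool,
    strict_warnings: bool = False,
) -> int:
    """Return 1 on failures, or on warnings in strict mode; otherwise 0."""
    if has_failures:
        return 1
    if has_warnings and strict_warnings:
        return 1
    return 0


# Warnings alone do not fail the run unless strict mode is enabled.
lenient = ingest_exit_code(has_failures=False, has_warnings=True)
strict = ingest_exit_code(has_failures=False, has_warnings=True, strict_warnings=True)
print(lenient, strict)
```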

Usage Examples

Basic Ingestion

datahub ingest -c recipe.yml

Dry Run

datahub ingest -c recipe.yml --dry-run

Preview Mode

datahub ingest -c recipe.yml --preview --preview-workunits 5

Test Source Connection

datahub ingest -c recipe.yml --test-source-connection

Programmatic Usage

from datahub.ingestion.run.pipeline import Pipeline

config = {
    "source": {
        "type": "snowflake",
        "config": {
            "account_id": "myaccount",
            "username": "user",
            "password": "pass",
        },
    },
}

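# Build the pipeline from the config dictionary, then execute it.
pipeline = Pipeline.create(config)
pipeline.run()
# Print the summary report; the return value is the exit status.
ret = pipeline.pretty_print_summary()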
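
The example above hardcodes credentials for brevity. One way to avoid that is to build the same dictionary from an environment mapping. This is a sketch under assumptions: snowflake_recipe_from_env and the SNOWFLAKE_* variable names are hypothetical choices, not DataHub conventions.

```python
from typing import Any, Dict, Mapping


def snowflake_recipe_from_env(env: Mapping[str, str]) -> Dict[str, Any]:
    """Build the config dictionary from an environment-like mapping.

    Raises KeyError if a required variable is missing, so a misconfigured
    environment fails before any pipeline is created.
    """
    return {
        "source": {
            "type": "snowflake",
            "config": {
                "account_id": env["SNOWFLAKE_ACCOUNT_ID"],
                "username": env["SNOWFLAKE_USERNAME"],
                "password": env["SNOWFLAKE_PASSWORD"],
            },
        },
    }


# Demonstration with an explicit mapping; real code would pass os.environ.
demo_env = {
    "SNOWFLAKE_ACCOUNT_ID": "myaccount",
    "SNOWFLAKE_USERNAME": "user",
    "SNOWFLAKE_PASSWORD": "pass",
}
config = snowflake_recipe_from_env(demo_env)
```

The resulting dictionary has the same shape as the one above and can be passed to Pipeline.create() unchanged.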
