
Implementation:Datahub project Datahub DatahubRestSink Write Record

From Leeroopedia


Property Value
Page Type Implementation (API Doc)
Workflow Metadata_Ingestion_Pipeline
API DatahubRestSink.write_record_async(record_envelope: RecordEnvelope, write_callback: WriteCallback) -> None
Source File metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Repository https://github.com/datahub-project/datahub
Implements Principle:Datahub_project_Datahub_Metadata_Sink_Verification
Last Updated 2026-02-09 17:00 GMT

Overview

Description

The DatahubRestSink class is the primary sink implementation for delivering metadata records to a DataHub instance via the GMS REST API. Its write_record_async() method accepts a RecordEnvelope containing a metadata change event (a MetadataChangeEvent, MetadataChangeProposal, or MetadataChangeProposalWrapper) and a WriteCallback for delivery confirmation, then dispatches the record for emission according to the configured delivery mode.

The sink supports three delivery modes:

  • SYNC: Records are emitted synchronously on the calling thread. The callback is invoked immediately upon success or failure.
  • ASYNC: Records are submitted to a PartitionExecutor thread pool, partitioned by entity URN to preserve per-entity ordering. The callback is invoked when the future completes.
  • ASYNC_BATCH: Records are submitted to a BatchPartitionExecutor that groups records into batches (up to max_per_batch) and emits them via the ingestProposalBatch endpoint. This is the most efficient mode for high-volume ingestion.
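The per-entity ordering guarantee of the ASYNC mode can be illustrated with a minimal stdlib sketch (this is not DataHub's PartitionExecutor, just the same idea): records hashing to the same URN always land on the same single-threaded executor, so writes for one entity stay ordered while different entities proceed in parallel.

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch (not DataHub code): URN-partitioned async dispatch.
# Each partition is a single-threaded executor, so records sharing an
# entity URN are emitted in submission order.
N_PARTITIONS = 4
executors = [ThreadPoolExecutor(max_workers=1) for _ in range(N_PARTITIONS)]
results: list[tuple[str, int]] = []

def emit(urn: str, seq: int) -> None:
    # Stand-in for the real HTTP emission; records the call order.
    results.append((urn, seq))

def submit(urn: str, seq: int):
    # Same URN -> same partition -> same worker thread.
    part = hash(urn) % N_PARTITIONS
    return executors[part].submit(emit, urn, seq)

futures = [submit("urn:li:dataset:a", i) for i in range(5)]
futures += [submit("urn:li:dataset:b", i) for i in range(5)]
for f in futures:
    f.result()
for ex in executors:
    ex.shutdown()
```

Within each URN, the emitted sequence numbers come out in submission order, even though the two URNs may interleave with each other.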

The sink uses one DataHubRestEmitter per thread (via threading.local()) to ensure thread-safe HTTP session management.
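The emitter-per-thread arrangement is the standard threading.local() pattern. A minimal stdlib sketch (FakeEmitter is a hypothetical stand-in, not the real DataHubRestEmitter):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class FakeEmitter:
    """Stand-in for DataHubRestEmitter; would own an HTTP session."""

_local = threading.local()

def get_emitter() -> FakeEmitter:
    # Lazily create one emitter per thread; repeated calls on the
    # same thread return the same instance.
    if not hasattr(_local, "emitter"):
        _local.emitter = FakeEmitter()
    return _local.emitter

def emitter_id() -> int:
    return id(get_emitter())

main_id = emitter_id()
with ThreadPoolExecutor(max_workers=1) as pool:
    worker_id = pool.submit(emitter_id).result()

# The worker thread got its own emitter; the main thread keeps reusing its own.
assert main_id != worker_id
assert emitter_id() == main_id
```

Because each thread owns its emitter (and therefore its HTTP session), no locking is needed around session state.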

Usage

The write_record_async() method is called by the Pipeline.run() loop for each transformed record. Users do not typically call this method directly; it is invoked by the pipeline orchestration layer.

# Called internally by Pipeline.run() for each record:
self.sink.write_record_async(record_envelope, callback)

Code Reference

Source Location

File Lines Description
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py L72-92 DatahubRestSinkConfig - configuration class with mode, endpoint, threading params
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py L132-367 DatahubRestSink - sink class with initialization, write, and close methods
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py L296-343 write_record_async() - main write dispatch method
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py L205-249 _write_done_callback() - future completion handler with error classification
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py L263-294 _emit_batch_wrapper() - batch emission for ASYNC_BATCH mode
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py L57-69 RestSinkMode enum (SYNC, ASYNC, ASYNC_BATCH)

Signature

class DatahubRestSinkConfig(DatahubClientConfig):
    mode: RestSinkMode = _DEFAULT_REST_SINK_MODE     # Default: ASYNC_BATCH
    endpoint: RestSinkEndpoint = DEFAULT_REST_EMITTER_ENDPOINT
    max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS
    max_pending_requests: pydantic.PositiveInt = 2000
    max_per_batch: pydantic.PositiveInt = 100


class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]):

    def write_record_async(
        self,
        record_envelope: RecordEnvelope[
            Union[
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
            ]
        ],
        write_callback: WriteCallback,
    ) -> None:
        ...

    def close(self) -> None:
        ...

Import

from datahub.ingestion.sink.datahub_rest import DatahubRestSink, DatahubRestSinkConfig

I/O Contract

Direction Type Description
Input record_envelope: RecordEnvelope Envelope containing a MetadataChangeEvent, MetadataChangeProposal, or MetadataChangeProposalWrapper, plus metadata dict with workunit_id
Input write_callback: WriteCallback Callback object with on_success(record, metadata) and on_failure(record, exception, metadata) methods
Output None Method returns immediately in async modes; callback is invoked when write completes
Side Effect HTTP POST to GMS Record emitted to DataHub GMS server via REST API
Side Effect Report updated DataHubRestSinkReport updated with record written/failure/warning counts and pending request count

Configuration parameters:

Parameter Type Default Description
mode RestSinkMode ASYNC_BATCH Delivery mode: SYNC, ASYNC, or ASYNC_BATCH
endpoint RestSinkEndpoint Default REST endpoint REST API endpoint variant
max_threads PositiveInt Environment-dependent Number of worker threads for async modes
max_pending_requests PositiveInt 2000 Maximum in-flight write operations before backpressure blocks the main thread
max_per_batch PositiveInt 100 Maximum records per batch in ASYNC_BATCH mode
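The max_pending_requests backpressure can be sketched with a bounded semaphore (an illustrative stdlib analogue, not the sink's actual implementation): the submitting thread blocks once the in-flight limit is reached and resumes as earlier writes complete.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch (not DataHub code): cap in-flight async writes,
# analogous to max_pending_requests. submit() blocks the caller when
# MAX_PENDING writes are outstanding.
MAX_PENDING = 2
pending = threading.BoundedSemaphore(MAX_PENDING)

def write(record) -> None:
    try:
        pass  # ... emit record over HTTP ...
    finally:
        pending.release()  # free a slot whether the write succeeded or failed

def submit(pool: ThreadPoolExecutor, record):
    pending.acquire()  # blocks here when the pending limit is reached
    return pool.submit(write, record)

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [submit(pool, r) for r in range(10)]
```

This keeps a fast source from queuing unbounded work (and memory) ahead of a slow GMS server.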

Callback behavior by mode:

Mode Execution Callback Timing
SYNC Record emitted synchronously on calling thread Invoked immediately after emit completes or fails
ASYNC Record submitted to PartitionExecutor, partitioned by URN Invoked when the future completes via _write_done_callback
ASYNC_BATCH Record submitted to BatchPartitionExecutor, batched and emitted via ingestProposalBatch Invoked when the batch future completes via _write_done_callback
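A custom callback only needs the on_success/on_failure interface from the I/O contract above. A minimal sketch (CountingCallback is a hypothetical example class, not part of DataHub):

```python
# Illustrative sketch (not DataHub code): a WriteCallback-style object
# that tallies delivery outcomes reported by the sink.
class CountingCallback:
    def __init__(self) -> None:
        self.successes = 0
        self.failures: list[Exception] = []

    def on_success(self, record_envelope, success_metadata: dict) -> None:
        self.successes += 1

    def on_failure(
        self, record_envelope, failure_exception: Exception, failure_metadata: dict
    ) -> None:
        self.failures.append(failure_exception)

cb = CountingCallback()
# The sink would invoke these per the timing rules in the table above:
cb.on_success({"record": "mcp-1"}, {})
cb.on_failure({"record": "mcp-2"}, RuntimeError("GMS returned 500"), {})
```

In SYNC mode these methods fire on the calling thread; in the async modes they fire on executor threads, so a real callback should be thread-safe.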

Usage Examples

Example 1: Default ASYNC_BATCH mode via pipeline

from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "mysql",
        "config": {
            "host_port": "localhost:3306",
            "database": "analytics",
            "username": "datahub",
            "password": "datahub",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "http://localhost:8080",
            "mode": "ASYNC_BATCH",
            "max_threads": 15,
            "max_per_batch": 100,
        },
    },
}

pipeline = Pipeline.create(recipe)
pipeline.run()
# Sink report shows: total_records_written, pending_requests, async_batches_prepared
pipeline.pretty_print_summary()

Example 2: Synchronous mode for debugging

recipe = {
    "source": {"type": "file", "config": {"path": "metadata.json"}},
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "http://localhost:8080",
            "mode": "SYNC",
        },
    },
}

pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.pretty_print_summary()

Example 3: Direct sink usage (advanced)

from datahub.ingestion.sink.datahub_rest import DatahubRestSink, DatahubRestSinkConfig
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DatasetPropertiesClass

ctx = PipelineContext(run_id="manual-run")
config = DatahubRestSinkConfig(server="http://localhost:8080")

with DatahubRestSink.create(config.model_dump(), ctx) as sink:
    mcp = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
        aspect=DatasetPropertiesClass(description="Example dataset"),
    )
    envelope = RecordEnvelope(record=mcp, metadata={})
    sink.write_record_async(envelope, NoopWriteCallback())
