Implementation:Datahub project Datahub DatahubRestSink Write Record
| Property | Value |
|---|---|
| Page Type | Implementation (API Doc) |
| Workflow | Metadata_Ingestion_Pipeline |
| API | DatahubRestSink.write_record_async(record_envelope: RecordEnvelope, write_callback: WriteCallback) -> None |
| Source File | metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py |
| Repository | https://github.com/datahub-project/datahub |
| Implements | Principle:Datahub_project_Datahub_Metadata_Sink_Verification |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
The DatahubRestSink class is the primary sink implementation for delivering metadata records to a DataHub instance via the GMS REST API. Its write_record_async() method accepts a RecordEnvelope containing a metadata change event (MCE, MCP, or MCPWrapper) and a WriteCallback for delivery confirmation, then dispatches the record for emission according to the configured delivery mode.
The sink supports three delivery modes:
- SYNC: Records are emitted synchronously on the calling thread. The callback is invoked immediately upon success or failure.
- ASYNC: Records are submitted to a PartitionExecutor thread pool, partitioned by entity URN to preserve per-entity ordering. The callback is invoked when the future completes.
- ASYNC_BATCH: Records are submitted to a BatchPartitionExecutor that groups records into batches (up to max_per_batch) and emits them via the ingestProposalBatch endpoint. This is the most efficient mode for high-volume ingestion.
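The batch-sizing behavior of ASYNC_BATCH can be sketched in a few lines. This is an illustration only; chunk_records is a hypothetical helper, not part of the DataHub codebase:

```python
def chunk_records(records, max_per_batch):
    """Yield lists of at most max_per_batch records, mirroring how
    ASYNC_BATCH groups records before a single ingestProposalBatch call.
    (Hypothetical helper for illustration; not DataHub code.)"""
    batch = []
    for rec in records:
        batch.append(rec)
        if len(batch) == max_per_batch:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```

Fewer HTTP round trips per record is what makes this mode the best fit for high-volume ingestion.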
The sink uses one DataHubRestEmitter per thread (via threading.local()) to ensure thread-safe HTTP session management.
Usage
The write_record_async() method is called by the Pipeline.run() loop for each transformed record. Users do not typically call this method directly; it is invoked by the pipeline orchestration layer.
# Called internally by Pipeline.run() for each record:
self.sink.write_record_async(record_envelope, callback)
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py | L72-92 | DatahubRestSinkConfig - configuration class with mode, endpoint, threading params |
| metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py | L132-367 | DatahubRestSink - sink class with initialization, write, and close methods |
| metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py | L296-343 | write_record_async() - main write dispatch method |
| metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py | L205-249 | _write_done_callback() - future completion handler with error classification |
| metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py | L263-294 | _emit_batch_wrapper() - batch emission for ASYNC_BATCH mode |
| metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py | L57-69 | RestSinkMode enum (SYNC, ASYNC, ASYNC_BATCH) |
Signature
class DatahubRestSinkConfig(DatahubClientConfig):
mode: RestSinkMode = _DEFAULT_REST_SINK_MODE # Default: ASYNC_BATCH
endpoint: RestSinkEndpoint = DEFAULT_REST_EMITTER_ENDPOINT
max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS
max_pending_requests: pydantic.PositiveInt = 2000
max_per_batch: pydantic.PositiveInt = 100
class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]):
def write_record_async(
self,
record_envelope: RecordEnvelope[
Union[
MetadataChangeEvent,
MetadataChangeProposal,
MetadataChangeProposalWrapper,
]
],
write_callback: WriteCallback,
) -> None:
...
def close(self) -> None:
...
Import
from datahub.ingestion.sink.datahub_rest import DatahubRestSink, DatahubRestSinkConfig
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | record_envelope: RecordEnvelope | Envelope containing a MetadataChangeEvent, MetadataChangeProposal, or MetadataChangeProposalWrapper, plus a metadata dict with workunit_id |
| Input | write_callback: WriteCallback | Callback object with on_success(record, metadata) and on_failure(record, exception, metadata) methods |
| Output | None | Method returns immediately in async modes; the callback is invoked when the write completes |
| Side Effect | HTTP POST to GMS | Record emitted to the DataHub GMS server via the REST API |
| Side Effect | Report updated | DataHubRestSinkReport updated with record written/failure/warning counts and pending request count |
Configuration parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| mode | RestSinkMode | ASYNC_BATCH | Delivery mode: SYNC, ASYNC, or ASYNC_BATCH |
| endpoint | RestSinkEndpoint | Default REST endpoint | REST API endpoint variant |
| max_threads | PositiveInt | Environment-dependent | Number of worker threads for async modes |
| max_pending_requests | PositiveInt | 2000 | Maximum in-flight write operations before backpressure blocks the main thread |
| max_per_batch | PositiveInt | 100 | Maximum records per batch in ASYNC_BATCH mode |
Callback behavior by mode:
| Mode | Execution | Callback Timing |
|---|---|---|
| SYNC | Record emitted synchronously on calling thread | Invoked immediately after emit completes or fails |
| ASYNC | Record submitted to PartitionExecutor, partitioned by URN | Invoked when the future completes via _write_done_callback |
| ASYNC_BATCH | Record submitted to BatchPartitionExecutor, batched and emitted via ingestProposalBatch | Invoked when the batch future completes via _write_done_callback |
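A custom callback typically just records outcomes. The sketch below is duck-typed against the on_success/on_failure interface described above and deliberately avoids importing the datahub package, so the class name and fields are illustrative only:

```python
class CountingWriteCallback:
    """Minimal callback matching WriteCallback's interface
    (on_success / on_failure); for illustration only."""

    def __init__(self):
        self.successes = 0
        self.failures = []

    def on_success(self, record_envelope, success_metadata):
        # Called once per record (or batch member) that was written.
        self.successes += 1

    def on_failure(self, record_envelope, failure_exception, failure_metadata):
        # Called with the exception that caused the write to fail.
        self.failures.append(failure_exception)
```

Note that in SYNC mode these methods fire on the calling thread, while in the async modes they fire on executor threads, so any shared state a real callback mutates should be protected accordingly.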
Usage Examples
Example 1: Default ASYNC_BATCH mode via pipeline
from datahub.ingestion.run.pipeline import Pipeline
recipe = {
"source": {
"type": "mysql",
"config": {
"host_port": "localhost:3306",
"database": "analytics",
"username": "datahub",
"password": "datahub",
},
},
"sink": {
"type": "datahub-rest",
"config": {
"server": "http://localhost:8080",
"mode": "ASYNC_BATCH",
"max_threads": 15,
"max_per_batch": 100,
},
},
}
pipeline = Pipeline.create(recipe)
pipeline.run()
# Sink report shows: total_records_written, pending_requests, async_batches_prepared
pipeline.pretty_print_summary()
Example 2: Synchronous mode for debugging
recipe = {
"source": {"type": "file", "config": {"path": "metadata.json"}},
"sink": {
"type": "datahub-rest",
"config": {
"server": "http://localhost:8080",
"mode": "SYNC",
},
},
}
pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.pretty_print_summary()
Example 3: Direct sink usage (advanced)
from datahub.ingestion.sink.datahub_rest import DatahubRestSink, DatahubRestSinkConfig
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DatasetPropertiesClass
ctx = PipelineContext(run_id="manual-run")
config = DatahubRestSinkConfig(server="http://localhost:8080")
with DatahubRestSink.create(config.model_dump(), ctx) as sink:
mcp = MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
aspect=DatasetPropertiesClass(description="Example dataset"),
)
envelope = RecordEnvelope(record=mcp, metadata={})
sink.write_record_async(envelope, NoopWriteCallback())