Implementation:Datahub project Datahub Proto2DataHub RestEmitter Emit
| Field | Value |
|---|---|
| Implementation Name | Proto2DataHub_RestEmitter_Emit |
| Type | API Doc |
| Workflow | Protobuf_Schema_Ingestion |
| Repository | https://github.com/datahub-project/datahub |
| Implements | Principle:Datahub_project_Datahub_Protobuf_Metadata_Emission |
| Last Updated | 2026-02-09 17:00 GMT |
Overview
Description
Proto2DataHub RestEmitter Emit documents the emission loop within the Proto2DataHub.main method that iterates over all Metadata Change Proposals produced by ProtobufDataset.getAllMetadataChangeProposals() and sends each one to DataHub using the RestEmitter.emit() method from the DataHub Java SDK. This is the final stage of the protobuf ingestion pipeline, where schema-derived metadata is persisted in the DataHub backend.
The emission uses the Emitter interface, which abstracts over the RestEmitter (HTTP REST transport) and FileEmitter (local file transport) implementations. Each MCP is emitted synchronously, with the future's .get() call blocking until the server acknowledges receipt.
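To make the abstraction concrete, the following is a minimal sketch of transport selection behind the Emitter interface. The chooseEmitter helper and its parameters are illustrative and not part of the source; only the SDK classes listed under Import below are assumed.
import datahub.client.Emitter;
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.client.rest.RestEmitter;

// Hypothetical helper: both concrete emitters implement the same Emitter
// interface, so the emission loop never needs to know which transport is in use.
static Emitter chooseEmitter(String transport, String server, String token, String outFile) {
  if ("rest".equals(transport)) {
    return RestEmitter.create(b -> b.server(server).token(token));
  }
  return new FileEmitter(FileEmitterConfig.builder().fileName(outFile).build());
}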
Usage
The emission loop runs automatically as part of the Proto2DataHub.main method. It is not typically called directly by users, but understanding the emission pattern is important for debugging ingestion failures and monitoring pipeline throughput.
Code Reference
Source Location
metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/Proto2DataHub.java, lines 80-441.
The core emission loop is at lines 359-407:
filePathStream.forEach(
    filePath -> {
      totalFiles.incrementAndGet();
      try {
        String textSchema = Files.readString(filePath);
        ProtobufDataset dataset = ProtobufDataset.builder()
            .setDataPlatformUrn(new DataPlatformUrn(config.dataPlatform))
            .setProtocIn(new FileInputStream(config.protoc))
            .setEnableProtocCustomProperty(config.enableProtocCustomProperty)
            .setFilename(filePath.toString())
            .setSchema(textSchema)
            .setAuditStamp(auditStamp)
            .setFabricType(config.fabricType)
            .setGithubOrganization(config.githubOrg)
            .setSlackTeamId(config.slackId)
            .setSubType(config.subType)
            .setMessageName(config.messageName)
            .build();
        dataset.getAllMetadataChangeProposals()
            .flatMap(Collection::stream)
            .forEach(mcpw -> {
              try {
                finalEmitter.emit(mcpw, null).get();
                totalEvents.getAndIncrement();
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            });
      } catch (Exception e) {
        // Error handling with exit code tracking
      }
    });
Signature
Emitter interface (from DataHub Java SDK):
public interface Emitter {
  Future<MetadataWriteResponse> emit(MetadataChangeProposalWrapper mcpw, Callback callback)
      throws IOException;

  void close() throws IOException;
}
RestEmitter factory (from DataHub Java SDK):
public class RestEmitter implements Emitter {
  public static RestEmitter create(
      Consumer<RestEmitterConfig.RestEmitterConfigBuilder> builderFunc) { ... }
}
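Because emission is programmed against this interface, a stand-in implementation can be used for dry runs or tests. The class below is a minimal sketch, not part of the DataHub SDK, and it implements only the two methods shown above (the actual SDK interface may declare additional overloads):
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.event.MetadataChangeProposalWrapper;

// Hypothetical dry-run emitter: counts MCPs instead of sending them anywhere.
public class CountingEmitter implements Emitter {
  private final AtomicInteger count = new AtomicInteger();

  @Override
  public Future<MetadataWriteResponse> emit(MetadataChangeProposalWrapper mcpw, Callback callback)
      throws IOException {
    count.incrementAndGet();
    // Resolve immediately; no network call is made.
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public void close() throws IOException {}

  public int getCount() {
    return count.get();
  }
}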
Import
import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | `MetadataChangeProposalWrapper` | A single MCP wrapping an aspect (e.g., SchemaMetadata, Ownership, GlobalTags) with entity URN, change type, and aspect name. |
| Input | `Callback` (nullable) | Optional callback for async notification. Passed as null in Proto2DataHub since `.get()` is used for synchronous blocking. |
| Output | `Future<MetadataWriteResponse>` | A future that resolves to the server's response. The `.get()` call blocks until the MCP is acknowledged. |
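Since emit() accepts an optional Callback, callers that do not want to block on .get() can be notified asynchronously instead. A hedged sketch, assuming the Callback interface from the DataHub Java SDK (datahub.client.Callback) with onCompletion and onFailure hooks:
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;

// Sketch: asynchronous emission; the callback fires when the server responds.
emitter.emit(mcpw, new Callback() {
  @Override
  public void onCompletion(MetadataWriteResponse response) {
    System.out.println("MCP acknowledged by server");
  }

  @Override
  public void onFailure(Throwable exception) {
    System.err.println("Emit failed: " + exception.getMessage());
  }
});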
Emitter Instantiation by Transport
| Transport | Emitter Creation | Configuration |
|---|---|---|
| rest | `RestEmitter.create(b -> b.server(config.datahubAPI).token(config.datahubToken))` | Server URL and auth token. |
| file | `new FileEmitter(FileEmitterConfig.builder().fileName(config.filename).build())` | Output file path. |
| kafka | Not yet implemented | Throws `UnsupportedOperationException`. |
Status Tracking
| Counter | Type | Description |
|---|---|---|
| `totalEvents` | `AtomicInteger` | Incremented after each successful `emit().get()` call. Reports total MCPs emitted. |
| `totalFiles` | `AtomicInteger` | Incremented at the start of processing each file. Reports total files attempted. |
| `exitCode` | `AtomicInteger` | Set to 1 if any file processing fails. Remains 0 on full success. |
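At the end of the run these counters drive the summary line and the process exit status. A simplified sketch (the config.transport field name is illustrative; the resulting message format matches the usage examples below):
// After all files are processed: report totals and exit with the tracked status.
System.out.println(
    String.format(
        "Successfully emitted %d events for %d files to DataHub %s",
        totalEvents.get(), totalFiles.get(), config.transport.toString().toUpperCase()));
System.exit(exitCode.get());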
Usage Examples
REST Emission (Default)
java -jar datahub-protobuf.jar \
--descriptor schemas/events.protoc \
--file schemas/events.proto \
--datahub_api http://datahub-gms:8080 \
--datahub_token mytoken123
Output on success:
Successfully emitted 12 events for 1 files to DataHub REST
File Emission for Offline Processing
java -jar datahub-protobuf.jar \
--descriptor schemas/all.dsc \
--directory ./schemas \
--transport file \
--filename mcps_output.json
Output on success:
Successfully emitted 156 events for 13 files to DataHub FILE
Programmatic Emission (Java SDK)
import java.util.Collection;

import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.protobuf.ProtobufDataset;

// Create emitter
RestEmitter emitter = RestEmitter.create(b ->
    b.server("http://datahub-gms:8080").token("mytoken"));

try {
  // Build ProtobufDataset (see ProtobufDataset_Builder)
  ProtobufDataset dataset = ProtobufDataset.builder()
      // ... configure builder ...
      .build();

  // Emit all MCPs
  dataset.getAllMetadataChangeProposals()
      .flatMap(Collection::stream)
      .forEach(mcpw -> {
        try {
          emitter.emit(mcpw, null).get();
        } catch (Exception e) {
          System.err.println("Failed to emit: " + e.getMessage());
        }
      });
} finally {
  emitter.close();
}
Error Handling Behavior
The emission loop handles errors at the file level:
// Per-file error handling (simplified)
try {
  // Build dataset and emit MCPs for this file
} catch (Exception e) {
  if (e.getMessage() != null
      && e.getMessage().equals("Cannot autodetect protobuf Message.")) {
    // Treated as a warning -- no top-level message found in this file
    System.err.printf("WARN: Top-level schema not found in %s%n", filePath);
  } else {
    // Treated as an error -- sets the exit code to 1 but continues processing
    e.printStackTrace();
    exitCode.set(1);
  }
}