
Implementation:Datahub project Datahub Proto2DataHub RestEmitter Emit

From Leeroopedia


Field Value
Implementation Name Proto2DataHub_RestEmitter_Emit
Type API Doc
Workflow Protobuf_Schema_Ingestion
Repository https://github.com/datahub-project/datahub
Implements Principle:Datahub_project_Datahub_Protobuf_Metadata_Emission
Last Updated 2026-02-09 17:00 GMT

Overview

Description

Proto2DataHub RestEmitter Emit documents the emission loop inside the Proto2DataHub.main method. The loop iterates over every Metadata Change Proposal (MCP) produced by ProtobufDataset.getAllMetadataChangeProposals() and sends each one to DataHub via the RestEmitter.emit() method of the DataHub Java SDK. This is the final stage of the protobuf ingestion pipeline: here the schema-derived metadata is persisted to the DataHub backend.

The emission uses the Emitter interface, which abstracts over the RestEmitter (HTTP REST transport) and FileEmitter (local file transport) implementations. Each MCP is emitted synchronously, with the future's .get() call blocking until the server acknowledges receipt.
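The blocking-emit pattern described above can be sketched with a minimal stand-in for the SDK's Emitter abstraction. The interface and class names below only mirror the real SDK for illustration (the real Emitter returns Future<MetadataWriteResponse> and takes a Callback); the in-memory transport is hypothetical:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SyncEmitSketch {
    // Illustrative stand-in for datahub.client.Emitter, not the real SDK interface
    interface Emitter extends AutoCloseable {
        Future<String> emit(String mcpw) throws Exception;
        void close();
    }

    // Fake transport that acknowledges immediately; a real RestEmitter would POST to GMS
    static class InMemoryEmitter implements Emitter {
        final AtomicInteger acked = new AtomicInteger();
        public Future<String> emit(String mcpw) {
            acked.incrementAndGet();
            return CompletableFuture.completedFuture("OK: " + mcpw);
        }
        public void close() {}
    }

    // Mirrors Proto2DataHub's loop: emit each MCP and block on .get() until acknowledged
    static int emitAll(Emitter emitter, List<String> mcps) throws Exception {
        AtomicInteger totalEvents = new AtomicInteger();
        for (String mcpw : mcps) {
            emitter.emit(mcpw).get();      // synchronous: wait for server acknowledgement
            totalEvents.getAndIncrement(); // count only acknowledged MCPs
        }
        return totalEvents.get();
    }

    public static void main(String[] args) throws Exception {
        try (InMemoryEmitter emitter = new InMemoryEmitter()) {
            int emitted = emitAll(emitter, List.of("schemaMetadata", "ownership", "globalTags"));
            System.out.println("Emitted " + emitted + " events");
        }
    }
}
```

Because each .get() blocks, throughput is bounded by one round-trip per MCP; this is the trade-off Proto2DataHub makes for simple, ordered, per-MCP error reporting.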

Usage

The emission loop runs automatically as part of the Proto2DataHub.main method. It is not typically called directly by users, but understanding the emission pattern is important for debugging ingestion failures and monitoring pipeline throughput.

Code Reference

Source Location

metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/Proto2DataHub.java, lines 80-441.

The core emission loop is at lines 359-407:

filePathStream.forEach(
    filePath -> {
        totalFiles.incrementAndGet();
        try {
            String textSchema = Files.readString(filePath);

            ProtobufDataset dataset = ProtobufDataset.builder()
                .setDataPlatformUrn(new DataPlatformUrn(config.dataPlatform))
                .setProtocIn(new FileInputStream(config.protoc))
                .setEnableProtocCustomProperty(config.enableProtocCustomProperty)
                .setFilename(filePath.toString())
                .setSchema(textSchema)
                .setAuditStamp(auditStamp)
                .setFabricType(config.fabricType)
                .setGithubOrganization(config.githubOrg)
                .setSlackTeamId(config.slackId)
                .setSubType(config.subType)
                .setMessageName(config.messageName)
                .build();

            dataset.getAllMetadataChangeProposals()
                .flatMap(Collection::stream)
                .forEach(mcpw -> {
                    try {
                        finalEmitter.emit(mcpw, null).get();
                        totalEvents.getAndIncrement();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
        } catch (Exception e) {
            // Error handling with exit code tracking
        }
    });

Signature

Emitter interface (from DataHub Java SDK):

public interface Emitter {
    Future<MetadataWriteResponse> emit(
        MetadataChangeProposalWrapper mcpw,
        Callback callback
    ) throws IOException;

    void close() throws IOException;
}

RestEmitter factory (from DataHub Java SDK):

public class RestEmitter implements Emitter {
    public static RestEmitter create(
        Consumer<RestEmitterConfig.RestEmitterConfigBuilder> builderFunc
    );
}

Import

import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;

I/O Contract

Direction Type Description
Input MetadataChangeProposalWrapper A single MCP wrapping an aspect (e.g., SchemaMetadata, Ownership, GlobalTags) with entity URN, change type, and aspect name.
Input Callback (nullable) Optional callback for async notification. Passed as null in Proto2DataHub since .get() is used for synchronous blocking.
Output Future<MetadataWriteResponse> A future that resolves to the server's response. The .get() call blocks until the MCP is acknowledged.
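The fields in the input row above (entity URN, change type, aspect name, aspect payload) can be pictured as a simple record. This is an illustrative stand-in, not the SDK's MetadataChangeProposalWrapper class, and the URN and payload values are hypothetical examples:

```java
public class McpSketch {
    // Illustrative stand-in for the key fields an MCP wrapper carries
    record Mcp(String entityType, String entityUrn, String changeType,
               String aspectName, Object aspect) {}

    public static void main(String[] args) {
        // Hypothetical dataset URN in DataHub's urn:li:dataset format
        Mcp mcp = new Mcp(
            "dataset",
            "urn:li:dataset:(urn:li:dataPlatform:kafka,protobuf.MessageA,PROD)",
            "UPSERT",
            "schemaMetadata",
            "<schema-derived aspect payload>");
        System.out.println(mcp.entityUrn() + " -> " + mcp.aspectName());
    }
}
```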

Emitter Instantiation by Transport

Transport Emitter Creation Configuration
rest RestEmitter.create(b -> b.server(config.datahubAPI).token(config.datahubToken)) Server URL and auth token.
file new FileEmitter(FileEmitterConfig.builder().fileName(config.filename).build()) Output file path.
kafka Not yet implemented Throws UnsupportedOperationException.

Status Tracking

Counter Type Description
totalEvents AtomicInteger Incremented after each successful emit().get() call. Reports total MCPs emitted.
totalFiles AtomicInteger Incremented at the start of processing each file. Reports total files attempted.
exitCode AtomicInteger Set to 1 if any file processing fails. Remains 0 on full success.
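These counters feed the success summary printed at the end of the run (see the example outputs below). A minimal sketch of how the counters combine into that message, assuming the format string matches the output shown in this page:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StatusSketch {
    // Counter names follow the source table; the formatting logic is illustrative
    static String summary(AtomicInteger totalEvents, AtomicInteger totalFiles, String transport) {
        return String.format("Successfully emitted %d events for %d files to DataHub %s",
            totalEvents.get(), totalFiles.get(), transport);
    }

    public static void main(String[] args) {
        AtomicInteger totalEvents = new AtomicInteger();
        AtomicInteger totalFiles = new AtomicInteger();
        totalFiles.incrementAndGet();                                // one file attempted
        for (int i = 0; i < 12; i++) totalEvents.getAndIncrement();  // 12 MCPs acknowledged
        System.out.println(summary(totalEvents, totalFiles, "REST"));
    }
}
```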

Usage Examples

REST Emission (Default)

java -jar datahub-protobuf.jar \
  --descriptor schemas/events.protoc \
  --file schemas/events.proto \
  --datahub_api http://datahub-gms:8080 \
  --datahub_token mytoken123

Output on success:

Successfully emitted 12 events for 1 files to DataHub REST

File Emission for Offline Processing

java -jar datahub-protobuf.jar \
  --descriptor schemas/all.dsc \
  --directory ./schemas \
  --transport file \
  --filename mcps_output.json

Output on success:

Successfully emitted 156 events for 13 files to DataHub FILE

Programmatic Emission (Java SDK)

import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;

// Create emitter
RestEmitter emitter = RestEmitter.create(b ->
    b.server("http://datahub-gms:8080").token("mytoken"));

try {
    // Build ProtobufDataset (see ProtobufDataset_Builder)
    ProtobufDataset dataset = ProtobufDataset.builder()
        // ... configure builder ...
        .build();

    // Emit all MCPs
    dataset.getAllMetadataChangeProposals()
        .flatMap(Collection::stream)
        .forEach(mcpw -> {
            try {
                emitter.emit(mcpw, null).get();
            } catch (Exception e) {
                System.err.println("Failed to emit: " + e.getMessage());
            }
        });
} finally {
    emitter.close();
}

Error Handling Behavior

The emission loop handles errors at the file level:

// Per-file error handling (simplified)
try {
    // Build dataset and emit MCPs for this file
} catch (Exception e) {
    if (e.getMessage() != null
        && e.getMessage().equals("Cannot autodetect protobuf Message.")) {
        // Treated as warning -- no top-level message found in this file
        System.err.printf("WARN: Top-level schema not found in %s%n", filePath);
    } else {
        // Treated as error -- sets exit code to 1 but continues processing
        e.printStackTrace();
        exitCode.set(1);
    }
}
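The warning-versus-error split above can be isolated into a small classifier. This is an illustrative re-creation of the per-file decision, not the actual Proto2DataHub code; the sentinel message string is taken from the snippet above:

```java
public class ErrorClassifySketch {
    // Returns 0 when the exception is the "no top-level message" warning
    // (exit code untouched), 1 for any other failure (exitCode.set(1)).
    // Either way, Proto2DataHub continues with the remaining files.
    static int classify(Exception e) {
        if (e.getMessage() != null
            && e.getMessage().equals("Cannot autodetect protobuf Message.")) {
            return 0; // warning: file skipped
        }
        return 1;     // error: recorded in the exit code
    }

    public static void main(String[] args) {
        System.out.println(classify(new IllegalStateException("Cannot autodetect protobuf Message.")));
        System.out.println(classify(new RuntimeException("connection refused")));
    }
}
```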
