
Implementation:Datahub project Datahub ProtobufDataset Builder

From Leeroopedia


Field | Value
Implementation Name | ProtobufDataset_Builder
Type | API Doc
Workflow | Protobuf_Schema_Ingestion
Repository | https://github.com/datahub-project/datahub
Implements | Principle:Datahub_project_Datahub_Protobuf_Schema_Conversion
Last Updated | 2026-02-09 17:00 GMT

Overview

Description

ProtobufDataset Builder is the core API for transforming compiled protobuf descriptor sets into DataHub metadata. The ProtobufDataset class uses the Builder pattern to configure all parameters needed for schema conversion, then constructs a ProtobufGraph internally and applies a set of visitor classes to produce a stream of MetadataChangeProposalWrapper collections. These MCPs represent the full set of DataHub aspects derived from the protobuf schema, including SchemaMetadata, DatasetProperties, Ownership, GlobalTags, Domains, GlossaryTerms, InstitutionalMemory, Deprecation, Status, and SubTypes.

The builder internally coordinates three key components:

  • ProtobufGraph: A JGraphT-based directed graph modeling messages, fields, and type relationships (489 lines).
  • DatasetVisitor: A composite visitor that aggregates dataset-level metadata from sub-visitors (180 lines).
  • SchemaFieldVisitor / ProtobufExtensionFieldVisitor: Field-level visitors that produce SchemaField records with type information, descriptions, and field paths (27 lines / extended).

Usage

The builder is instantiated via ProtobufDataset.builder(), configured with setter methods, and finalized with .build(). After building, getAllMetadataChangeProposals() is called to produce the output stream of MCPs.

Code Reference

Source Location

  • metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/ProtobufDataset.java, lines 1-315
  • metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/model/ProtobufGraph.java, lines 1-489
  • metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/model/ProtobufField.java, lines 1-399

Signature

Builder construction and build:

public static ProtobufDataset.Builder builder()

public class Builder {
    public Builder setDataPlatformUrn(@Nullable DataPlatformUrn dataPlatformUrn)
    public Builder setDatasetUrn(@Nullable DatasetUrn datasetUrn)
    public Builder setProtocIn(InputStream protocIn) throws IOException
    public Builder setProtocBytes(byte[] protocBytes)
    public Builder setFabricType(FabricType fabricType)
    public Builder setAuditStamp(AuditStamp auditStamp)
    public Builder setMessageName(@Nullable String messageName)
    public Builder setFilename(@Nullable String filename)
    public Builder setSchema(@Nullable String schema)
    public Builder setGithubOrganization(@Nullable String githubOrganization)
    public Builder setSlackTeamId(@Nullable String slackTeamId)
    public Builder setSubType(@Nullable String subType)
    public Builder setEnableProtocCustomProperty(boolean enableProtocCustomProperty)
    public ProtobufDataset build() throws IOException
}

MCP output:

public Stream<Collection<MetadataChangeProposalWrapper<? extends RecordTemplate>>>
    getAllMetadataChangeProposals()

Import

import datahub.protobuf.ProtobufDataset;

I/O Contract

Direction | Type | Description
Input | InputStream or byte[] | Binary protobuf descriptor set (compiled .protoc / .dsc file contents).
Input | DataPlatformUrn | Platform identifier (defaults to kafka if null).
Input | FabricType | Environment type (DEV, PROD, etc.).
Input | AuditStamp | Timestamp and actor for metadata provenance.
Input | String filename | Path to the source .proto file (used for root message autodetection).
Input | String schema | Raw text content of the source .proto file (stored as KafkaSchema in SchemaMetadata).
Output | Stream<Collection<MetadataChangeProposalWrapper>> | Two collections: visitor MCPs (Tag entities) and dataset MCPs (all dataset aspects).
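The descriptor-set input described above is produced by protoc rather than by this library. A minimal sketch of loading it, with a typical generating command shown in a comment (file paths here are illustrative, not from the source):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LoadDescriptorSet {
    public static void main(String[] args) throws IOException {
        // Generate the descriptor set first, keeping imports and source comments
        // (source info is needed because comments carry the annotations the
        // visitors read), e.g.:
        //   protoc --descriptor_set_out=schemas/my_event.protoc \
        //          --include_imports --include_source_info schemas/my_event.proto
        byte[] protocBytes = Files.readAllBytes(Path.of("schemas/my_event.protoc"));
        System.out.println("Descriptor set size: " + protocBytes.length);
    }
}
```

The resulting byte array can be handed to the builder via setProtocBytes, or the file can be streamed directly with setProtocIn.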

Builder Parameters

Parameter | Type | Required | Default | Description
dataPlatformUrn | DataPlatformUrn | No | kafka | The data platform URN for the generated dataset.
datasetUrn | DatasetUrn | No | Auto-generated | Explicit dataset URN. If null, constructed from platform + message full name + fabric type.
protocIn / protocBytes | InputStream / byte[] | Yes | -- | The compiled protobuf descriptor set binary content.
fabricType | FabricType | Yes | -- | Environment (DEV, PROD, STAGING, etc.).
auditStamp | AuditStamp | Yes | -- | Metadata provenance timestamp and actor.
messageName | String | No | Auto-detect | Fully qualified message name. If null, the root message is auto-detected from the graph.
filename | String | No | -- | Source file path, used for root message autodetection when messageName is null.
schema | String | No | -- | Raw proto source text, stored in the SchemaMetadata platform schema.
githubOrganization | String | No | -- | GitHub org for resolving team references in comments.
slackTeamId | String | No | -- | Slack team ID for resolving channel references in comments.
subType | String | No | -- | Custom subtype (e.g., schema, event).
enableProtocCustomProperty | boolean | No | false | Whether to store the base64-encoded protoc binary as a dataset custom property.
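Since only the descriptor bytes, fabric type, and audit stamp are required, a minimal build can lean on the defaults in the table above: the platform falls back to kafka, and the dataset URN and root message are derived automatically. A sketch, with an illustrative file path:

```java
import datahub.protobuf.ProtobufDataset;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.CorpuserUrn;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MinimalBuild {
    public static void main(String[] args) throws IOException {
        // Only the required parameters: descriptor bytes, fabric type, audit stamp.
        // Platform defaults to kafka; the dataset URN is constructed from
        // platform + message full name + fabric type; the root message is
        // auto-detected from the graph.
        ProtobufDataset dataset = ProtobufDataset.builder()
            .setProtocBytes(Files.readAllBytes(Path.of("schemas/my_event.protoc")))
            .setFabricType(FabricType.DEV)
            .setAuditStamp(new AuditStamp()
                .setTime(System.currentTimeMillis())
                .setActor(new CorpuserUrn("datahub")))
            .build();
    }
}
```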

Output Aspects

The getAllMetadataChangeProposals() method returns a stream of two collections:

Collection 1 -- Visitor MCPs (from getVisitorMCPs()):

  • Tag entity MCPs produced by TagVisitor

Collection 2 -- Dataset MCPs (from getDatasetMCPs()):

Aspect | Aspect Name | Source
DatasetProperties | datasetProperties | DatasetVisitor (name, qualifiedName, description, customProperties)
InstitutionalMemory | institutionalMemory | InstitutionalMemoryVisitor (documentation links from comments)
GlobalTags | globalTags | TagAssociationVisitor (meta.tag annotations)
GlossaryTerms | glossaryTerms | TermAssociationVisitor (glossary term annotations)
Ownership | ownership | OwnershipVisitor (meta.ownership annotations)
Domains | domains | DomainVisitor (meta.domain annotations)
Deprecation | deprecation | DeprecationVisitor (meta.deprecation annotations, emitted only if present)
SchemaMetadata | schemaMetadata | ProtobufDataset directly (schema fields, platform schema, version, hash)
Status | status | ProtobufDataset directly (always removed=false)
SubTypes | subTypes | ProtobufDataset directly (if subType is configured)
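Because the stream carries exactly two collections in the order described above (visitor MCPs first, then dataset MCPs), the two groups can be separated by collecting the stream. A sketch, assuming a ProtobufDataset instance built as in the examples below:

```java
import com.linkedin.data.template.RecordTemplate;
import datahub.event.MetadataChangeProposalWrapper;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Collect the stream: element 0 holds the Tag-entity MCPs from the visitors,
// element 1 holds the dataset-aspect MCPs.
List<Collection<MetadataChangeProposalWrapper<? extends RecordTemplate>>> collections =
    dataset.getAllMetadataChangeProposals().collect(Collectors.toList());

Collection<MetadataChangeProposalWrapper<? extends RecordTemplate>> visitorMcps = collections.get(0);
Collection<MetadataChangeProposalWrapper<? extends RecordTemplate>> datasetMcps = collections.get(1);

// Individual aspects can then be picked out by aspect name, e.g. schemaMetadata:
datasetMcps.stream()
    .filter(mcpw -> "schemaMetadata".equals(mcpw.getAspectName()))
    .forEach(mcpw -> System.out.println("Schema MCP for " + mcpw.getEntityUrn()));
```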

Usage Examples

Basic Builder Usage

import datahub.protobuf.ProtobufDataset;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.DataPlatformUrn;

import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;

AuditStamp auditStamp = new AuditStamp()
    .setTime(System.currentTimeMillis())
    .setActor(new CorpuserUrn("datahub"));

ProtobufDataset dataset = ProtobufDataset.builder()
    .setDataPlatformUrn(new DataPlatformUrn("kafka"))
    .setProtocIn(new FileInputStream("schemas/my_event.protoc"))
    .setFilename("schemas/my_event.proto")
    .setSchema(Files.readString(Path.of("schemas/my_event.proto")))
    .setAuditStamp(auditStamp)
    .setFabricType(FabricType.PROD)
    .setGithubOrganization("datahub-project")
    .setSlackTeamId("TUMKD5EGJ")
    .setSubType("schema")
    .build();

Extracting All MCPs

import com.linkedin.data.template.RecordTemplate;
import datahub.event.MetadataChangeProposalWrapper;
import java.util.Collection;
import java.util.stream.Stream;

Stream<Collection<MetadataChangeProposalWrapper<? extends RecordTemplate>>> mcpStream =
    dataset.getAllMetadataChangeProposals();

mcpStream.flatMap(Collection::stream).forEach(mcpw -> {
    System.out.println("Aspect: " + mcpw.getAspectName());
    System.out.println("Entity: " + mcpw.getEntityUrn());
});

Accessing Schema Metadata Directly

import com.linkedin.schema.SchemaMetadata;

SchemaMetadata schemaMetadata = dataset.getSchemaMetadata();
System.out.println("Schema name: " + schemaMetadata.getSchemaName());
System.out.println("Field count: " + schemaMetadata.getFields().size());
schemaMetadata.getFields().forEach(field -> {
    System.out.println("  " + field.getFieldPath() + " : " + field.getNativeDataType());
});
