Implementation:Datahub project Datahub ProtobufDataset
| Knowledge Sources | |
|---|---|
| Domains | Protobuf_Integration |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Represents a DataHub dataset derived from a protobuf message definition, orchestrating the conversion of a ProtobufGraph into DataHub metadata aspects including schema metadata, tags, ownership, and dataset properties.
Description
ProtobufDataset is the central model class that ties together a protobuf graph, visitor pattern infrastructure, and DataHub metadata generation. It uses the Builder pattern for construction, accepting parameters such as the data platform URN, fabric type, audit stamp, protobuf descriptor bytes, source schema text, and organizational metadata (GitHub org, Slack team ID).
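The Builder flow can be shown with a minimal, self-contained sketch. MiniDatasetSpec and its setters are hypothetical names, not part of datahub-protobuf. The sketch assumes that build() is where required inputs are checked and defaults are applied, and it uses the kafka platform default listed in the I/O contract.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for ProtobufDataset.Builder. It only illustrates the
// builder flow: fluent setters that return the builder, then checks and defaults in build().
final class MiniDatasetSpec {
    final String platform;
    final byte[] protocBytes;
    final String githubOrganization; // optional organizational metadata, may be null

    private MiniDatasetSpec(String platform, byte[] protocBytes, String githubOrganization) {
        this.platform = platform;
        this.protocBytes = protocBytes;
        this.githubOrganization = githubOrganization;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String platform;
        private byte[] protocBytes;
        private String githubOrganization;

        Builder setPlatform(String platform) { this.platform = platform; return this; }
        Builder setProtocBytes(byte[] protocBytes) { this.protocBytes = protocBytes; return this; }
        Builder setGithubOrganization(String org) { this.githubOrganization = org; return this; }

        MiniDatasetSpec build() {
            // In this sketch the descriptor bytes are mandatory: there is nothing to parse without them.
            Objects.requireNonNull(protocBytes, "protoc descriptor bytes are required");
            // Fall back to the kafka platform when none was set (assumed to mirror the documented default).
            String resolvedPlatform = Optional.ofNullable(platform).orElse("kafka");
            return new MiniDatasetSpec(resolvedPlatform, protocBytes, githubOrganization);
        }
    }
}
```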
The build() method on the Builder:
- Parses the protobuf descriptor bytes into a FileDescriptorSet
- Creates a ProtobufGraph from the file set
- Wires up a suite of visitors for dataset-level metadata (ownership, tags, terms, domains, properties, institutional memory) and field-level metadata (schema fields with protobuf extensions)
- Configures a TagVisitor for MCP-level tag generation
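The visitor wiring can be pictured as a fan-out in which every registered visitor sees every field and contributes zero or more results. The sketch below is a simplified model under that assumption: FieldInfo, FieldVisitor and VisitorSuiteSketch are hypothetical names, and the real visitors presumably work against the ProtobufGraph rather than plain field records.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical miniature of the visitor wiring. Names here are illustrative, not DataHub APIs.
final class VisitorSuiteSketch {
    // A field name plus its (assumed) protobuf option values.
    record FieldInfo(String name, Map<String, String> options) {}

    // Each visitor may emit zero or more results for a single field.
    interface FieldVisitor<T> {
        Stream<T> visitField(FieldInfo field);
    }

    // Apply every visitor to every field and concatenate the results,
    // field by field, in visitor registration order.
    static <T> List<T> run(List<FieldInfo> fields, List<FieldVisitor<T>> visitors) {
        return fields.stream()
            .flatMap(f -> visitors.stream().flatMap(v -> v.visitField(f)))
            .collect(Collectors.toList());
    }
}
```

For example, a tag-style visitor that only fires when a field carries a "tag" option, combined with a visitor that fires for every field, yields results for tagged fields plus one entry per field.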
The class exposes two primary MCP generation methods, whose results getAllMetadataChangeProposals() combines:
- getVisitorMCPs(): produces MCPs from the registered ProtobufModelVisitor instances (e.g., tag creation)
- getDatasetMCPs(): produces MCPs for the dataset entity itself, namely the DatasetVisitor output, SchemaMetadata, Status, and (optionally) SubTypes
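The relationship between these methods can be sketched with plain strings standing in for MetadataChangeProposalWrapper<?> objects. The sketch assumes the combined stream simply yields the visitor batch followed by the dataset batch, which matches the "visitor + dataset" description in the I/O contract; McpStreamSketch is a hypothetical name.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch only: strings stand in for MetadataChangeProposalWrapper<?> instances.
final class McpStreamSketch {
    // Assumption: one collection per source, visitor MCPs first, then dataset MCPs.
    static Stream<Collection<String>> allProposals(List<String> visitorMcps, List<String> datasetMcps) {
        return Stream.<Collection<String>>of(visitorMcps, datasetMcps);
    }

    // Flatten the batches into a single ordered list of proposals.
    static List<String> flatten(Stream<Collection<String>> batches) {
        return batches.flatMap(Collection::stream).collect(Collectors.toList());
    }
}
```

Because the result is a stream of collections rather than a flat stream, callers flatten it with flatMap(Collection::stream) before emitting, as the usage example does.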
Schema fields are sorted using a two-level comparator: first by root message field weight (preserving original field order), then by field path.
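A minimal sketch of the two-level ordering, built with Comparator.comparingInt(...).thenComparing(...). Field, FieldSortSketch and the rootWeights map (root-message field name to declaration position) are hypothetical stand-ins for the DataHub SchemaField objects and for the weights the real class derives from the graph.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: each field records the root-message field it descends from
// and its full field path. The real class sorts DataHub SchemaField objects.
final class FieldSortSketch {
    record Field(String rootField, String fieldPath) {}

    // rootWeights: root message field name -> its position in the .proto declaration (assumed).
    static List<Field> sort(List<Field> fields, Map<String, Integer> rootWeights) {
        // Level 1: declaration order of the root field; unknown roots sort last.
        Comparator<Field> byRootWeight =
            Comparator.comparingInt(f -> rootWeights.getOrDefault(f.rootField(), Integer.MAX_VALUE));
        List<Field> sorted = new ArrayList<>(fields);
        // Level 2: lexicographic field path within the same root field.
        sorted.sort(byRootWeight.thenComparing(Field::fieldPath));
        return sorted;
    }
}
```

With weights {b=0, a=1}, fields under b sort before fields under a even though "a" is alphabetically first; within each root field, paths are ordered lexicographically.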
Usage
Use ProtobufDataset when you need to convert a protobuf message into a complete set of DataHub metadata proposals. It is the primary abstraction used by Proto2DataHub (the CLI tool) and by the Python-based protobuf ingestion source.
Code Reference
Source Location
- Repository: Datahub_project_Datahub
- File: metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/ProtobufDataset.java
Signature
```java
public class ProtobufDataset {
    public static ProtobufDataset.Builder builder();

    // Builder methods
    public static class Builder {
        public Builder setDataPlatformUrn(DataPlatformUrn dataPlatformUrn);
        public Builder setProtocIn(InputStream protocIn) throws IOException;
        public Builder setFabricType(FabricType fabricType);
        public Builder setAuditStamp(AuditStamp auditStamp);
        public Builder setFilename(String filename);
        public Builder setSchema(String schema);
        public Builder setGithubOrganization(String githubOrganization);
        public Builder setSlackTeamId(String slackTeamId);
        public Builder setSubType(String subType);
        public Builder setMessageName(String messageName);
        public ProtobufDataset build() throws IOException;
    }

    // MCP generation
    public Stream<Collection<MetadataChangeProposalWrapper<?>>> getAllMetadataChangeProposals();
    public List<MetadataChangeProposalWrapper<?>> getVisitorMCPs();
    public List<MetadataChangeProposalWrapper<?>> getDatasetMCPs();
    public SchemaMetadata getSchemaMetadata();

    // Accessors
    public DatasetUrn getDatasetUrn();
    public ProtobufGraph getGraph();
    public AuditStamp getAuditStamp();
}
```
Import
```java
import datahub.protobuf.ProtobufDataset;
```
I/O Contract
| Input | Type | Description |
|---|---|---|
| protocIn / protocBytes | InputStream / byte[] | Compiled protobuf descriptor bytes |
| schema | String | Raw .proto source text for the platform schema |
| dataPlatformUrn | DataPlatformUrn | Target platform (defaults to kafka) |
| fabricType | FabricType | Environment (DEV, PROD, etc.) |
| filename | String | Source .proto file path for autodetection |
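The dataPlatformUrn and fabricType inputs end up in the dataset's URN, which DataHub renders as urn:li:dataset:(<platform urn>,<name>,<fabric>). The helper below only illustrates that shape as a string; the real getDatasetUrn() returns a DatasetUrn object. How ProtobufDataset derives the dataset name is not covered here, so datasetName (and the example value com.example.MyEvent) is an assumption, as is the helper name DatasetUrnSketch.

```java
// Hypothetical helper that renders the DataHub dataset URN shape as a plain string.
// "datasetName" is an assumed input; the real class derives it internally.
final class DatasetUrnSketch {
    static String datasetUrn(String platform, String datasetName, String fabric) {
        // urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<fabric>)
        return "urn:li:dataset:(urn:li:dataPlatform:" + platform + "," + datasetName + "," + fabric + ")";
    }
}
```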
| Output | Type | Description |
|---|---|---|
| MCPs | Stream<Collection<MetadataChangeProposalWrapper>> | All metadata change proposals (visitor + dataset) |
| SchemaMetadata | SchemaMetadata | DataHub schema with fields, hash, platform schema |
Usage Examples
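```java
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import datahub.protobuf.ProtobufDataset;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;

// Assumes `auditStamp` and `emitter` (e.g. a RestEmitter) are in scope; the enclosing method throws IOException.
ProtobufDataset dataset = ProtobufDataset.builder()
    .setDataPlatformUrn(new DataPlatformUrn("kafka"))
    .setProtocIn(new FileInputStream("repo.dsc"))  // compiled descriptor set produced by protoc
    .setFilename("src/main/proto/MyEvent.proto")
    .setSchema(Files.readString(Path.of("src/main/proto/MyEvent.proto")))
    .setAuditStamp(auditStamp)
    .setFabricType(FabricType.PROD)
    .setGithubOrganization("datahub-project")
    .setSubType("schema")
    .build();
// Flatten the MCP collections; Future.get() throws checked exceptions, so wrap them for the lambda.
dataset.getAllMetadataChangeProposals()
    .flatMap(Collection::stream)
    .forEach(mcpw -> {
        try { emitter.emit(mcpw, null).get(); }
        catch (Exception e) { throw new RuntimeException(e); }
    });
```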