Implementation:Datahub project Datahub SparkStreamingEventToDatahub
| Knowledge Sources | |
|---|---|
| Domains | Spark_Lineage, OpenLineage |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
SparkStreamingEventToDatahub is a utility class in the datahub.spark.converter package that converts Spark Structured Streaming progress events into DataHub metadata change proposals (MCPs). It transforms a StreamingQueryProgress event into a list of MetadataChangeProposalWrapper objects representing DataFlow, DataJob, and DataJobInputOutput entities, thereby capturing end-to-end lineage for streaming pipelines.
The class is entirely static (the constructor is private) and operates as a stateless converter. It parses the JSON payload embedded in StreamingQueryProgress to extract source and sink descriptions, maps those descriptions to DataHub platform names and dataset URNs, and assembles the corresponding metadata aspects.
Source file: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java (255 lines)
Code Reference
Class Declaration
```java
@Slf4j
public class SparkStreamingEventToDatahub {
  private SparkStreamingEventToDatahub() {}
  // ... static constants and converter methods ...
}
```
Platform Constants
```java
public static final String DELTA_LAKE_PLATFORM = "delta-lake";
public static final String FILE_PLATFORM = "file";
public static final String KAFKA_PLATFORM = "kafka";
```
Key Methods
generateMcpFromStreamingProgressEvent
```java
public static List<MetadataChangeProposalWrapper> generateMcpFromStreamingProgressEvent(
    StreamingQueryProgress event,
    SparkLineageConf conf,
    Map<String, MetadataChangeProposalWrapper> schemaMap)
```
Primary entry point. Accepts a Spark streaming progress event, configuration, and a schema map, then produces a list of MCPs. The method:
- Derives a stable pipelineName from the configured pipeline name, falling back to the streaming query name, or synthesizing one from the sink description.
- Creates a DataFlowInfo aspect with custom properties (createdAt, plan JSON) and emits a DataFlow MCP.
- Creates a DataJobInfo aspect with batch metrics (batchId, inputRowsPerSecond, processedRowsPerSecond, numInputRows) and emits a DataJob MCP.
- Parses the sources and sink from the event JSON, resolves each to a DatasetUrn via generateUrnFromStreamingDescription, and builds the DataJobInputOutput aspect.
- Optionally materializes datasets and attaches schema metadata if configured.
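The pipeline-name fallback and the batch-metric properties from the list above can be sketched in plain Java. This is a minimal illustration, not the DataHub source: the class StreamingJobMetadataSketch and its methods are hypothetical, the synthesized-name rule (a spark_streaming_ prefix followed by the sink description with non-alphanumeric characters replaced by underscores) is an invented placeholder, and the property keys are assumed to match the metric names listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; illustrates the documented behavior, not DataHub's code.
final class StreamingJobMetadataSketch {
  private StreamingJobMetadataSketch() {}

  private static boolean isBlank(String s) {
    return s == null || s.trim().isEmpty();
  }

  // Fallback order: configured pipeline name, then streaming query name,
  // then a name synthesized from the sink description.
  static String resolvePipelineName(String configuredName, String queryName, String sinkDescription) {
    if (!isBlank(configuredName)) {
      return configuredName;
    }
    if (!isBlank(queryName)) {
      return queryName;
    }
    // ASSUMPTION: this synthesis rule is invented for illustration.
    return "spark_streaming_" + sinkDescription.replaceAll("[^A-Za-z0-9]", "_");
  }

  // Batch metrics as a string-to-string map, the shape DataHub custom properties use.
  // ASSUMPTION: property keys equal the metric names.
  static Map<String, String> batchMetricProperties(
      long batchId, double inputRowsPerSecond, double processedRowsPerSecond, long numInputRows) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("batchId", Long.toString(batchId));
    props.put("inputRowsPerSecond", Double.toString(inputRowsPerSecond));
    props.put("processedRowsPerSecond", Double.toString(processedRowsPerSecond));
    props.put("numInputRows", Long.toString(numInputRows));
    return props;
  }
}
```

In this sketch the synthesized name depends only on the sink description, so every micro-batch of the same query resolves to the same name, which is what a stable DataFlow identity requires.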
generateUrnFromStreamingDescription
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf)
```
Parses a streaming source or sink description string of the form Namespace[path]. Maps the namespace to a DataHub platform via getDatahubPlatform and constructs a DatasetUrn. For Kafka, it extracts the topic name. For file-based and Delta Lake platforms, it delegates to HdfsPathDataset.create for URI normalization.
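A sketch of the Namespace[path] split and of the dataset URN string it leads to, assuming a regular-expression parse. The class StreamingDescriptionSketch, its methods, and the example description KafkaV2[Subscribe[events]] are illustrative assumptions. The real method returns an Optional<DatasetUrn>, also receives a SparkLineageConf, and delegates file and Delta Lake paths to HdfsPathDataset.create; none of that is reproduced here.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; assumes a regex-based split of "Namespace[path]".
final class StreamingDescriptionSketch {
  private StreamingDescriptionSketch() {}

  // Lazy group 1 stops at the first '['; greedy group 2 runs to the final ']',
  // so nested brackets stay inside the path.
  private static final Pattern DESCRIPTION = Pattern.compile("(.*?)\\[(.*)\\]");

  // Returns {namespace, path}, or empty if the description has no bracketed path.
  static Optional<String[]> split(String description) {
    Matcher m = DESCRIPTION.matcher(description);
    if (!m.matches()) {
      return Optional.empty();
    }
    return Optional.of(new String[] {m.group(1), m.group(2)});
  }

  // DataHub dataset URN string: urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
  static String datasetUrn(String platform, String name, String env) {
    return "urn:li:dataset:(urn:li:dataPlatform:" + platform + "," + name + "," + env + ")";
  }
}
```

For KafkaV2[Subscribe[events]] the sketch yields the namespace KafkaV2 and the path Subscribe[events]; the nested brackets are left intact so that a topic-extraction step can narrow the path further.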
getDatahubPlatform
```java
public static String getDatahubPlatform(String namespace)
```
Maps Spark streaming namespace identifiers to DataHub platform names:
- KafkaV2 → kafka
- DeltaSink → delta-lake
- CloudFilesSource → dbfs
- FileSink, FileStreamSource → file
- All other namespaces pass through unchanged.
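The mapping above reads naturally as a string switch with a pass-through default. This sketch reproduces only the table documented here; the class name StreamingPlatformMapping is hypothetical, and the sketch does not guard against a null namespace (a switch on a null String throws NullPointerException).

```java
// Hypothetical class; mirrors the documented namespace-to-platform table.
final class StreamingPlatformMapping {
  private StreamingPlatformMapping() {}

  static String getDatahubPlatform(String namespace) {
    switch (namespace) {
      case "KafkaV2":
        return "kafka";
      case "DeltaSink":
        return "delta-lake";
      case "CloudFilesSource":
        return "dbfs";
      case "FileSink":
      case "FileStreamSource":
        return "file";
      default:
        // Unrecognized namespaces are returned unchanged.
        return namespace;
    }
  }
}
```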
getKafkaTopicFromPath
```java
public static String getKafkaTopicFromPath(String path)
```
Extracts a Kafka topic name by returning the substring between the first pair of square brackets in the path string.
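A plain-JDK sketch of the bracket extraction. The class name KafkaTopicSketch is hypothetical, and returning null when no complete bracket pair is present is an assumption about edge-case behavior that this page does not document.

```java
// Hypothetical class; returns the text between the first '[' and the next ']'.
final class KafkaTopicSketch {
  private KafkaTopicSketch() {}

  static String getKafkaTopicFromPath(String path) {
    int open = path.indexOf('[');
    if (open < 0) {
      return null; // ASSUMPTION: no opening bracket yields null
    }
    int close = path.indexOf(']', open + 1);
    if (close < 0) {
      return null; // ASSUMPTION: unterminated bracket yields null
    }
    return path.substring(open + 1, close);
  }
}
```

Because only the first bracket pair is read, a bracketed list such as Subscribe[a, b] comes back from this sketch as the single string "a, b".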
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | StreamingQueryProgress | Spark streaming micro-batch progress event containing JSON with sources, sink, and batch metrics. |
| Input | SparkLineageConf | Configuration object carrying pipeline name, platform instance, fabric type, and flags for dataset materialization and schema inclusion. |
| Input | Map<String, MetadataChangeProposalWrapper> | Pre-built schema MCPs keyed by dataset URN string, used when schema metadata inclusion is enabled. |
| Output | List<MetadataChangeProposalWrapper> | Ordered list of MCPs: one DataFlow, one DataJob, zero or more input/output Dataset MCPs, and one DataJobInputOutput aspect. |
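The schema-map input can be illustrated with a generic lookup keyed by URN string. This is a hypothetical sketch: the class and method names are invented, the type parameter T stands in for MetadataChangeProposalWrapper so the example compiles without DataHub on the classpath, and skipping datasets that have no pre-computed schema is an assumption rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; T stands in for MetadataChangeProposalWrapper.
final class SchemaAttachmentSketch {
  private SchemaAttachmentSketch() {}

  static <T> List<T> schemaMcpsFor(List<String> datasetUrns, Map<String, T> schemaMap, boolean includeSchema) {
    List<T> out = new ArrayList<>();
    if (!includeSchema) {
      return out; // schema inclusion disabled: attach nothing
    }
    for (String urn : datasetUrns) {
      T schemaMcp = schemaMap.get(urn);
      // ASSUMPTION: datasets without a pre-computed schema are skipped.
      if (schemaMcp != null) {
        out.add(schemaMcp);
      }
    }
    return out;
  }
}
```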
Usage Examples
This class is invoked from the DataHub Spark listener whenever a streaming micro-batch completes:
```java
StreamingQueryProgress progress = ...; // from SparkListener callback
SparkLineageConf conf = ...; // built from Spark config
Map<String, MetadataChangeProposalWrapper> schemaMap = ...; // pre-computed schemas

List<MetadataChangeProposalWrapper> mcps =
    SparkStreamingEventToDatahub.generateMcpFromStreamingProgressEvent(progress, conf, schemaMap);

// Emit each MCP to the DataHub REST emitter
for (MetadataChangeProposalWrapper mcp : mcps) {
  emitter.emit(mcp);
}
```
Related Pages
- Datahub_project_Datahub_SparkConfigParser_ParseSparkConfig - Configuration parsing for Spark lineage
- Datahub_project_Datahub_SparkLineageConf_Builder - Builder for the SparkLineageConf consumed by this class
- Datahub_project_Datahub_DatahubSparkListener_Init - The Spark listener that invokes this converter
- Datahub_project_Datahub_StreamingDataSourceV2RelationVisitor - Visitor for streaming input datasets in logical plans
- Datahub_project_Datahub_WriteToDataSourceV2Visitor - Visitor for streaming output datasets in logical plans
- Datahub_project_Datahub_SparkPathUtils - Path-to-DatasetIdentifier resolution used by streaming description parsing