
Implementation:Datahub project Datahub SparkStreamingEventToDatahub

From Leeroopedia


Knowledge Sources
Domains: Spark_Lineage, OpenLineage
Last Updated: 2026-02-10 00:00 GMT

Overview

SparkStreamingEventToDatahub is a utility class in the datahub.spark.converter package that converts Spark Structured Streaming progress events into DataHub metadata change proposals (MCPs). It transforms a StreamingQueryProgress event into a list of MetadataChangeProposalWrapper objects describing a DataFlow entity, a DataJob entity, and the job's DataJobInputOutput aspect. Together these capture source-to-sink lineage for streaming pipelines.

The class is entirely static (the constructor is private) and operates as a stateless converter. It parses the JSON payload embedded in StreamingQueryProgress to extract source and sink descriptions, maps those descriptions to DataHub platform names and dataset URNs, and assembles the corresponding metadata aspects.

Source file: metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java (255 lines)

Code Reference

Class Declaration

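@Slf4j // Lombok: generates a static SLF4J logger named "log"
public class SparkStreamingEventToDatahub {
  private SparkStreamingEventToDatahub() {} // not instantiable; all members are static

  // ... static constants and methods (see below)
}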

Platform Constants

public static final String DELTA_LAKE_PLATFORM = "delta-lake";
public static final String FILE_PLATFORM = "file";
public static final String KAFKA_PLATFORM = "kafka";

Key Methods

generateMcpFromStreamingProgressEvent

public static List<MetadataChangeProposalWrapper> generateMcpFromStreamingProgressEvent(
    StreamingQueryProgress event,
    SparkLineageConf conf,
    Map<String, MetadataChangeProposalWrapper> schemaMap)

Primary entry point. Accepts a Spark streaming progress event, configuration, and a schema map, then produces a list of MCPs. The method:

  1. Derives a stable pipelineName from the configured pipeline name, or falls back to the streaming query name, or synthesizes one from the sink description.
  2. Creates a DataFlowInfo aspect with custom properties (createdAt, plan JSON) and emits a DataFlow MCP.
  3. Creates a DataJobInfo aspect with batch metrics (batchId, inputRowsPerSecond, processedRowsPerSecond, numInputRows) and emits a DataJob MCP.
  4. Parses sources and sink from the event JSON, resolves each to a DatasetUrn via generateUrnFromStreamingDescription, and builds the DataJobInputOutput aspect.
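  5. If enabled in SparkLineageConf, emits Dataset MCPs for the resolved inputs and outputs (dataset materialization) and adds the corresponding schema MCPs from schemaMap (schema inclusion).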
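Steps 1 and 3 can be sketched in plain Java without any DataHub dependency. StreamingJobSketch and its methods are hypothetical, not part of DataHub. In particular, the format of the synthesized fallback name (a "streaming_" prefix plus the sink description with characters outside [A-Za-z0-9_] replaced by underscores) is an assumption, because this page does not say how the real converter builds it. The metric keys follow the property names listed in step 3.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of steps 1 and 3: pipeline-name fallback and DataJob metric properties.
 * Not the DataHub implementation; the synthesized-name format is an assumption.
 */
final class StreamingJobSketch {
  private StreamingJobSketch() {}

  /** Returns the configured name, else the query name, else a name derived from the sink. */
  static String derivePipelineName(String configuredName, String queryName, String sinkDescription) {
    if (configuredName != null && !configuredName.isBlank()) {
      return configuredName;
    }
    if (queryName != null && !queryName.isBlank()) {
      return queryName;
    }
    // Assumption: make the sink description identifier-friendly and prefix it.
    return "streaming_" + sinkDescription.replaceAll("[^A-Za-z0-9_]", "_");
  }

  /** Collects batch metrics as string custom properties, in insertion order. */
  static Map<String, String> batchMetricProperties(
      long batchId, double inputRowsPerSecond, double processedRowsPerSecond, long numInputRows) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("batchId", Long.toString(batchId));
    props.put("inputRowsPerSecond", Double.toString(inputRowsPerSecond));
    props.put("processedRowsPerSecond", Double.toString(processedRowsPerSecond));
    props.put("numInputRows", Long.toString(numInputRows));
    return props;
  }
}
```

String-valued properties are used here because DataHub custom properties are string maps; numeric metrics are therefore stringified before being attached.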

generateUrnFromStreamingDescription

public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf)

Parses a streaming source or sink description string of the form Namespace[path]. Maps the namespace to a DataHub platform via getDatahubPlatform and constructs a DatasetUrn. For Kafka, it extracts the topic name. For file-based and Delta Lake platforms, it delegates to HdfsPathDataset.create for URI normalization.
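The parsing step and the final identifier can be illustrated as follows. This is a hypothetical sketch, not the DataHub code: StreamingDescriptionSketch is an invented name, it returns plain strings instead of DatasetUrn objects, and it skips the HdfsPathDataset.create normalization and any platform-instance handling. It splits a description such as KafkaV2[Subscribe[orders]] at the first "[" and the last "]", and formats the standard DataHub dataset URN layout urn:li:dataset:(urn:li:dataPlatform:PLATFORM,NAME,ENV).

```java
import java.util.Optional;

/**
 * Hypothetical sketch: split a "Namespace[path]" description and format a DataHub dataset URN string.
 * Path normalization and platform instances are deliberately not handled.
 */
final class StreamingDescriptionSketch {
  private StreamingDescriptionSketch() {}

  /** Namespace (text before the first '[') and path (text inside the outermost brackets). */
  static final class Description {
    final String namespace;
    final String path;

    Description(String namespace, String path) {
      this.namespace = namespace;
      this.path = path;
    }
  }

  static Optional<Description> parse(String description) {
    if (description == null) {
      return Optional.empty();
    }
    int open = description.indexOf('[');
    int close = description.lastIndexOf(']');
    // Require a non-empty namespace and a closing bracket after the opening one.
    if (open <= 0 || close <= open) {
      return Optional.empty();
    }
    return Optional.of(
        new Description(description.substring(0, open).trim(), description.substring(open + 1, close)));
  }

  /** Formats urn:li:dataset:(urn:li:dataPlatform:PLATFORM,NAME,ENV). */
  static String datasetUrn(String platform, String name, String env) {
    return "urn:li:dataset:(urn:li:dataPlatform:" + platform + "," + name + "," + env + ")";
  }
}
```

Because the sketch keeps everything inside the outermost brackets, a Kafka path like Subscribe[orders] still needs topic extraction, which is the job getKafkaTopicFromPath does in the real class.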

getDatahubPlatform

public static String getDatahubPlatform(String namespace)

Maps Spark streaming namespace identifiers to DataHub platform names:

  • KafkaV2 → kafka
  • DeltaSink → delta-lake
  • CloudFilesSource → dbfs
  • FileSink, FileStreamSource → file
  • All others pass through as-is.
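The mapping above reads naturally as a switch over the namespace string. This sketch mirrors the listed cases but is not the DataHub source; PlatformMappingSketch is a hypothetical name, and the sketch assumes a non-null namespace (switching on a null String throws NullPointerException).

```java
/** Hypothetical sketch of the namespace-to-platform mapping listed above. */
final class PlatformMappingSketch {
  private PlatformMappingSketch() {}

  static String getDatahubPlatform(String namespace) {
    switch (namespace) {
      case "KafkaV2":
        return "kafka";
      case "DeltaSink":
        return "delta-lake";
      case "CloudFilesSource":
        return "dbfs";
      case "FileSink":
      case "FileStreamSource":
        return "file";
      default:
        return namespace; // unknown namespaces pass through unchanged
    }
  }
}
```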

getKafkaTopicFromPath

public static String getKafkaTopicFromPath(String path)

Extracts a Kafka topic name by returning the substring between the first pair of square brackets in the path string.
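A minimal sketch of that extraction, assuming the input is the bracketed part of a Kafka description (for example Subscribe[orders]). KafkaTopicSketch is hypothetical, and returning the input unchanged when no complete bracket pair is found is an assumption; this page does not say what the real method does in that case.

```java
/** Hypothetical sketch: returns the text between the first '[' and the next ']' in the path. */
final class KafkaTopicSketch {
  private KafkaTopicSketch() {}

  static String getKafkaTopicFromPath(String path) {
    int start = path.indexOf('[');
    if (start < 0) {
      return path; // assumption: no brackets, treat the path as a bare topic name
    }
    int end = path.indexOf(']', start + 1);
    if (end < 0) {
      return path; // assumption: unbalanced brackets, return the input unchanged
    }
    return path.substring(start + 1, end);
  }
}
```

Because only the first bracket pair is read, a multi-topic subscription such as Subscribe[a, b] comes back from this sketch as the single string "a, b".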

I/O Contract

Inputs:
  • StreamingQueryProgress: Spark streaming micro-batch progress event whose JSON carries the sources, sink, and batch metrics.
  • SparkLineageConf: configuration object carrying the pipeline name, platform instance, fabric type, and flags for dataset materialization and schema inclusion.
  • Map<String, MetadataChangeProposalWrapper>: pre-built schema MCPs keyed by dataset URN string, used when schema metadata inclusion is enabled.

Output:
  • List<MetadataChangeProposalWrapper>: ordered list of MCPs consisting of one DataFlow, one DataJob, zero or more input/output Dataset MCPs, and one DataJobInputOutput aspect.

Usage Examples

This class is invoked from the DataHub Spark listener whenever a streaming micro-batch completes:

StreamingQueryProgress progress = ...; // from SparkListener callback
SparkLineageConf conf = ...; // built from Spark config
Map<String, MetadataChangeProposalWrapper> schemaMap = ...; // pre-computed schemas

List<MetadataChangeProposalWrapper> mcps =
    SparkStreamingEventToDatahub.generateMcpFromStreamingProgressEvent(progress, conf, schemaMap);

// Emit each MCP to the DataHub REST emitter
for (MetadataChangeProposalWrapper mcp : mcps) {
    emitter.emit(mcp);
}
