Implementation:Datahub project Datahub WriteToDataSourceV2Visitor

From Leeroopedia
Revision as of 14:44, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Datahub_project_Datahub_WriteToDataSourceV2Visitor.md)

Knowledge Sources
Domains: Spark_Lineage, OpenLineage
Last Updated: 2026-02-10 00:00 GMT

Overview

WriteToDataSourceV2Visitor is a query plan visitor in the io.openlineage.spark.agent.lifecycle.plan package that extracts output datasets from Spark Structured Streaming write operations. It extends QueryPlanVisitor<WriteToDataSourceV2, OutputDataset> and handles the WriteToDataSourceV2 logical plan node, which is the plan representation of a streaming micro-batch write.

The visitor supports Kafka streaming writes via a dedicated KafkaStreamWriteProxy inner class and file-based streaming writes (including Delta, Parquet, ForeachBatch, and Console sinks). For unsupported sink types it logs a warning. It relies heavily on reflection to read private fields of Spark internal classes.

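The class is described as "shadowed from OpenLineage to support foreachBatch in streaming": it lives in an io.openlineage package but ships in the DataHub source tree, presumably so that it takes the place of the upstream OpenLineage class of the same name.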

Source file: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java (286 lines)

Code Reference

Class Declaration

@Slf4j
public final class WriteToDataSourceV2Visitor
    extends QueryPlanVisitor<WriteToDataSourceV2, OutputDataset> {

Constants

private static final String KAFKA_STREAMING_WRITE_CLASS_NAME =
    "org.apache.spark.sql.kafka010.KafkaStreamingWrite";
private static final String FOREACH_BATCH_SINK_CLASS_NAME =
    "org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink";

Constructor

public WriteToDataSourceV2Visitor(@NonNull OpenLineageContext context)

Key Methods

isDefinedAt

@Override
public boolean isDefinedAt(LogicalPlan plan)

Returns true when the supplied logical plan is an instance of WriteToDataSourceV2.

apply

@Override
public List<OutputDataset> apply(LogicalPlan plan)

Main entry point. Casts the plan to WriteToDataSourceV2, inspects the BatchWrite (expecting MicroBatchWrite), and dispatches to:

  • handleKafkaStreamingWrite for Kafka sinks
  • handleFileBasedStreamingWrite for file-based sinks (FileStreamSink, ForeachBatchSink, ConsoleSink, DeltaSink, ParquetSink)
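
The dispatch described above can be sketched without Spark on the classpath. This is a simplified illustration under stated assumptions, not the visitor's actual code: the SinkKind enum, the classify method, and the matching on simple class names are hypothetical, and only the two class-name constants are taken from the source.

```java
/** Hypothetical stand-in for the dispatch in apply(); not the real visitor code. */
final class StreamingWriteDispatchSketch {
    // These two constants are quoted from the source file.
    static final String KAFKA_STREAMING_WRITE_CLASS_NAME =
        "org.apache.spark.sql.kafka010.KafkaStreamingWrite";
    static final String FOREACH_BATCH_SINK_CLASS_NAME =
        "org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink";

    /** Illustrative categories; the real class calls private handler methods instead. */
    enum SinkKind { KAFKA, FILE_BASED, UNSUPPORTED }

    /** Classifies a streaming write by its fully qualified class name. */
    static SinkKind classify(String className) {
        if (KAFKA_STREAMING_WRITE_CLASS_NAME.equals(className)) {
            return SinkKind.KAFKA;
        }
        // Assumption: file-based sinks are recognised by their simple class name.
        String simpleName = className.substring(className.lastIndexOf('.') + 1);
        switch (simpleName) {
            case "FileStreamSink":
            case "ForeachBatchSink":
            case "ConsoleSink":
            case "DeltaSink":
            case "ParquetSink":
                return SinkKind.FILE_BASED;
            default:
                // The real visitor logs a warning for unsupported sinks.
                return SinkKind.UNSUPPORTED;
        }
    }
}
```

Matching on class names rather than on class literals keeps optional connectors, such as the Kafka one, off the compile-time classpath, which is a plausible reason for the string constants in the source.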

handleKafkaStreamingWrite

private List<OutputDataset> handleKafkaStreamingWrite(StreamingWrite streamingWrite)

Uses the KafkaStreamWriteProxy to extract topic name, schema, and bootstrap servers via reflection, then creates an OutputDataset with the topic as the dataset name and the resolved bootstrap server as namespace.
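
As a rough illustration of how a topic and a bootstrap.servers value could be turned into a dataset name and namespace, the sketch below follows the OpenLineage naming convention for Kafka (namespace kafka://host:port, name equal to the topic). The Identifier class and the identify method are hypothetical, and picking the first entry of a comma-separated server list is an assumption about what "resolved bootstrap server" means here.

```java
import java.util.Optional;

/** Hypothetical sketch of Kafka dataset naming; not the visitor's actual code. */
final class KafkaDatasetNamingSketch {
    /** Minimal namespace/name pair standing in for an OpenLineage dataset identifier. */
    static final class Identifier {
        final String namespace;
        final String name;

        Identifier(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }
    }

    /** Returns an identifier, or empty when the topic or servers are missing. */
    static Optional<Identifier> identify(String topic, String bootstrapServers) {
        if (topic == null || bootstrapServers == null) {
            return Optional.empty();
        }
        // Assumption: bootstrap.servers may hold several brokers; use the first one.
        int comma = bootstrapServers.indexOf(',');
        String first =
            (comma >= 0 ? bootstrapServers.substring(0, comma) : bootstrapServers).trim();
        if (first.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Identifier("kafka://" + first, topic));
    }
}
```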

handleFileBasedStreamingWrite

private List<OutputDataset> handleFileBasedStreamingWrite(
    StreamingWrite streamingWrite, WriteToDataSourceV2 write)

Attempts to extract a file path from the streaming write using reflection, creates a DatasetIdentifier from the URI via PathUtils.fromURI, and builds an OutputDataset with the schema taken from the write query.
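
The split of a path URI into namespace and name can be approximated with java.net.URI alone. The sketch below is an assumption-laden stand-in for PathUtils.fromURI, whose exact rules may differ: the Identifier class, the "file" default namespace, and the trailing-slash trimming are all illustrative choices.

```java
import java.net.URI;

/** Hypothetical approximation of URI-to-identifier mapping; not PathUtils itself. */
final class UriIdentifierSketch {
    /** Minimal namespace/name pair standing in for DatasetIdentifier. */
    static final class Identifier {
        final String namespace;
        final String name;

        Identifier(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }
    }

    static Identifier fromUri(URI uri) {
        // Assumption: a URI without a scheme refers to the local file system.
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
        // With an authority (bucket, host) the namespace is scheme://authority.
        String namespace =
            uri.getAuthority() == null ? scheme : scheme + "://" + uri.getAuthority();
        // Opaque URIs have no path component; fall back to an empty name.
        String name = uri.getPath() == null ? "" : uri.getPath();
        // Trim trailing slashes so "/data/" and "/data" map to the same dataset.
        while (name.length() > 1 && name.endsWith("/")) {
            name = name.substring(0, name.length() - 1);
        }
        return new Identifier(namespace, name);
    }
}
```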

extractPathFromStreamingWrite

private Optional<String> extractPathFromStreamingWrite(StreamingWrite streamingWrite)

Dispatches path extraction to specialized methods based on the streaming write class name:

  • tryExtractPathFromForeachBatch - returns an empty Optional, since lineage for foreachBatch is captured from the batch operations themselves
  • tryExtractPathFromFileSink - tries fields path, outputPath, location via reflection
  • Console sinks return "console://output"
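
Probing several candidate field names via reflection, as tryExtractPathFromFileSink is described as doing, might look like the following. This is a minimal sketch, not the actual method: firstStringField and DemoSink are hypothetical names, and the real implementation may handle superclass fields or value types differently.

```java
import java.lang.reflect.Field;
import java.util.Optional;

/** Hypothetical sketch of probing candidate field names via reflection. */
final class FieldProbeSketch {
    /** Returns the first non-null value among the named declared fields, as a String. */
    static Optional<String> firstStringField(Object target, String... fieldNames) {
        for (String fieldName : fieldNames) {
            try {
                Field field = target.getClass().getDeclaredField(fieldName);
                field.setAccessible(true); // the fields of interest are private
                Object value = field.get(target);
                if (value != null) {
                    return Optional.of(value.toString());
                }
            } catch (ReflectiveOperationException | RuntimeException e) {
                // Field is missing or inaccessible on this class: try the next name.
            }
        }
        return Optional.empty();
    }

    /** Illustrative sink used only to exercise the sketch. */
    static final class DemoSink {
        private final String outputPath;

        DemoSink(String outputPath) {
            this.outputPath = outputPath;
        }
    }
}
```

Calling firstStringField(sink, "path", "outputPath", "location") mirrors the field order listed above. Swallowing the exception and moving on keeps the lookup tolerant of Spark versions that rename or drop a field.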

Inner Class: KafkaStreamWriteProxy

@Slf4j
private static final class KafkaStreamWriteProxy {

A reflection-based proxy around Kafka's KafkaStreamingWrite class. Exposes:

  • getTopic() - reads the topic field (Scala Option[String])
  • getSchema() - reads the schema field (StructType)
  • getBootstrapServers() - reads producerParams map and extracts bootstrap.servers
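
A reflection-based proxy of this kind can be sketched as a thin wrapper that reads a private map field and looks up a key. The example below is illustrative only: ReflectiveProxySketch and FakeKafkaWrite are hypothetical, and the assumption that producerParams is a java.util.Map is not stated in this page.

```java
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a reflective proxy; not the real KafkaStreamWriteProxy. */
final class ReflectiveProxySketch {
    private final Object target;

    ReflectiveProxySketch(Object target) {
        this.target = target;
    }

    /** Reads the producerParams field and extracts the bootstrap.servers entry. */
    Optional<String> getBootstrapServers() {
        Optional<Object> params = readField("producerParams");
        if (params.isPresent() && params.get() instanceof Map) {
            Object servers = ((Map<?, ?>) params.get()).get("bootstrap.servers");
            return Optional.ofNullable(servers).map(Object::toString);
        }
        return Optional.empty();
    }

    /** Reads a private declared field, returning empty on any reflection failure. */
    private Optional<Object> readField(String name) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            return Optional.ofNullable(field.get(target));
        } catch (ReflectiveOperationException | RuntimeException e) {
            return Optional.empty();
        }
    }

    /** Illustrative target with the same field name as described above. */
    static final class FakeKafkaWrite {
        private final Map<String, Object> producerParams;

        FakeKafkaWrite(Map<String, Object> producerParams) {
            this.producerParams = producerParams;
        }
    }
}
```

Returning Optional from every accessor lets the caller skip dataset creation when a field cannot be read, instead of failing the whole plan traversal.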

I/O Contract

Direction | Type | Description
Input | LogicalPlan (specifically WriteToDataSourceV2) | A Spark logical plan node representing a V2 data source write in a streaming micro-batch.
Input | OpenLineageContext | Context with OpenLineage client, Spark context, and dataset factory methods.
Output | List<OutputDataset> | Zero or one OpenLineage OutputDataset representing the streaming sink (Kafka topic, file path, console, etc.).

Usage Examples

This visitor is registered in the output visitor list, alongside the other query plan visitors, and is invoked during plan traversal:

// Registered as part of the output visitor list
List<QueryPlanVisitor<?, ? extends OutputDataset>> outputVisitors = Arrays.asList(
    // ... other visitors,
    new WriteToDataSourceV2Visitor(context)
);

// Framework dispatches automatically:
// visitor.isDefinedAt(plan) -> true if WriteToDataSourceV2
// visitor.apply(plan) -> returns List<OutputDataset>

For a Kafka streaming write, the proxy extracts fields reflectively:

// Internal flow:
// 1. WriteToDataSourceV2 -> MicroBatchWrite -> KafkaStreamingWrite
// 2. KafkaStreamWriteProxy reads topic, schema, producerParams via reflection
// 3. OutputDataset created with topic name and bootstrap server namespace
