Implementation:Datahub project Datahub WriteToDataSourceV2Visitor
| Knowledge Sources | |
|---|---|
| Domains | Spark_Lineage, OpenLineage |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
WriteToDataSourceV2Visitor is a query plan visitor in the io.openlineage.spark.agent.lifecycle.plan package that extracts output datasets from Spark Structured Streaming write operations. It extends QueryPlanVisitor<WriteToDataSourceV2, OutputDataset> and handles the WriteToDataSourceV2 logical plan node, which is the plan representation of a streaming micro-batch write.
The visitor supports Kafka streaming writes via a dedicated KafkaStreamWriteProxy inner class, file-based streaming writes (including Delta, Parquet, ForeachBatch, and Console sinks), and logs warnings for unsupported sink types. It uses reflection extensively to access private fields of Spark internal classes.
This class is noted as being "shadowed from OpenLineage to support foreachBatch in streaming."
Source file: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java (286 lines)
Code Reference
Class Declaration
@Slf4j
public final class WriteToDataSourceV2Visitor
extends QueryPlanVisitor<WriteToDataSourceV2, OutputDataset> {
Constants
private static final String KAFKA_STREAMING_WRITE_CLASS_NAME =
"org.apache.spark.sql.kafka010.KafkaStreamingWrite";
private static final String FOREACH_BATCH_SINK_CLASS_NAME =
"org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink";
Constructor
public WriteToDataSourceV2Visitor(@NonNull OpenLineageContext context)
Key Methods
isDefinedAt
@Override public boolean isDefinedAt(LogicalPlan plan)
Returns true when the supplied logical plan is an instance of WriteToDataSourceV2.
apply
@Override public List<OutputDataset> apply(LogicalPlan plan)
Main entry point. Casts the plan to WriteToDataSourceV2, inspects the BatchWrite (expecting MicroBatchWrite), and dispatches to:
- handleKafkaStreamingWrite for Kafka sinks
- handleFileBasedStreamingWrite for file-based sinks (FileStreamSink, ForeachBatchSink, ConsoleSink, DeltaSink, ParquetSink)
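The dispatch can be pictured as a classification of the sink's class name. The sketch below is illustrative only. It works on plain strings rather than real Spark objects, and the SinkKind enum, the classify method, and matching file-based sinks by simple class name are assumptions, not the visitor's actual logic. The two fully qualified names are copied from the constants listed earlier.

```java
import java.util.Set;

/** Hypothetical sketch of the sink dispatch performed in apply(); not the real visitor code. */
final class SinkDispatchSketch {

  enum SinkKind { KAFKA, FILE_BASED, UNSUPPORTED }

  // Copied from the visitor's constants.
  static final String KAFKA_STREAMING_WRITE_CLASS_NAME =
      "org.apache.spark.sql.kafka010.KafkaStreamingWrite";
  static final String FOREACH_BATCH_SINK_CLASS_NAME =
      "org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink";

  // Assumption: file-based sinks are recognised by their simple class name.
  static final Set<String> FILE_BASED_SIMPLE_NAMES =
      Set.of("FileStreamSink", "ForeachBatchSink", "ConsoleSink", "DeltaSink", "ParquetSink");

  static SinkKind classify(String streamingWriteClassName) {
    if (KAFKA_STREAMING_WRITE_CLASS_NAME.equals(streamingWriteClassName)) {
      return SinkKind.KAFKA;
    }
    // Strip the package prefix (lastIndexOf returns -1 when there is no dot).
    String simpleName =
        streamingWriteClassName.substring(streamingWriteClassName.lastIndexOf('.') + 1);
    if (FILE_BASED_SIMPLE_NAMES.contains(simpleName)) {
      return SinkKind.FILE_BASED;
    }
    // The visitor is described as logging a warning for unsupported sink types.
    return SinkKind.UNSUPPORTED;
  }

  public static void main(String[] args) {
    System.out.println(classify(KAFKA_STREAMING_WRITE_CLASS_NAME)); // KAFKA
    System.out.println(classify(FOREACH_BATCH_SINK_CLASS_NAME)); // FILE_BASED
    System.out.println(classify("com.example.CustomSink")); // UNSUPPORTED
  }
}
```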
handleKafkaStreamingWrite
private List<OutputDataset> handleKafkaStreamingWrite(StreamingWrite streamingWrite)
Uses the KafkaStreamWriteProxy to extract topic name, schema, and bootstrap servers via reflection, then creates an OutputDataset with the topic as the dataset name and the resolved bootstrap server as namespace.
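How the topic and bootstrap servers become a dataset identity can be sketched as follows. This is a hypothetical, simplified version. It assumes the OpenLineage Kafka naming convention (namespace kafka://host:port, name equal to the topic) and assumes that only the first broker in the bootstrap.servers list is used, with any protocol prefix stripped. The names KafkaNamespaceSketch, resolveNamespace, toDataset, and DatasetId are invented for illustration.

```java
import java.util.Optional;

/** Hypothetical sketch: turning a Kafka sink's topic and bootstrap servers into a dataset identity. */
final class KafkaNamespaceSketch {

  /** Minimal stand-in for an OpenLineage dataset identity (namespace + name). */
  record DatasetId(String namespace, String name) {}

  // Assumption: only the first configured broker is used, with any "PROTOCOL://" prefix removed.
  static String resolveNamespace(String bootstrapServers) {
    String first = bootstrapServers.split(",")[0].trim();
    int schemeEnd = first.indexOf("://");
    if (schemeEnd >= 0) {
      first = first.substring(schemeEnd + 3);
    }
    return "kafka://" + first;
  }

  // Assumption: without both a topic and a broker address, no dataset is reported.
  static Optional<DatasetId> toDataset(Optional<String> topic, Optional<String> bootstrapServers) {
    if (topic.isEmpty() || bootstrapServers.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(new DatasetId(resolveNamespace(bootstrapServers.get()), topic.get()));
  }

  public static void main(String[] args) {
    System.out.println(toDataset(Optional.of("events"), Optional.of("broker1:9092,broker2:9092")));
    // Optional[DatasetId[namespace=kafka://broker1:9092, name=events]]
  }
}
```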
handleFileBasedStreamingWrite
private List<OutputDataset> handleFileBasedStreamingWrite(
StreamingWrite streamingWrite, WriteToDataSourceV2 write)
Attempts to extract a file path from the streaming write using reflection, creates a DatasetIdentifier from the URI via PathUtils.fromURI, and builds an OutputDataset with the schema taken from the write query.
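The URI-to-identifier step can be approximated as below. This is a simplified stand-in for PathUtils.fromURI, not its real implementation. It assumes local paths map to the namespace "file" with the absolute path as the name, and remote URIs map to scheme://authority with the remaining path as the name. Leading-slash handling and other edge cases in the real utility may differ, and PathIdentifierSketch, fromUri, and DatasetId are hypothetical names.

```java
import java.net.URI;

/** Hypothetical, simplified sketch of deriving a (namespace, name) pair from a sink location URI. */
final class PathIdentifierSketch {

  record DatasetId(String namespace, String name) {}

  // Assumption: approximates the OpenLineage file naming convention; PathUtils.fromURI may differ.
  static DatasetId fromUri(URI location) {
    String scheme = location.getScheme();
    if (scheme == null || scheme.equals("file")) {
      // Local paths: namespace is just "file", name is the absolute path.
      return new DatasetId("file", location.getPath());
    }
    // Remote stores: scheme plus authority (bucket, host:port) form the namespace.
    String authority = location.getAuthority() == null ? "" : location.getAuthority();
    String path = location.getPath();
    String name = path.startsWith("/") ? path.substring(1) : path;
    return new DatasetId(scheme + "://" + authority, name);
  }

  public static void main(String[] args) {
    System.out.println(fromUri(URI.create("s3://my-bucket/tables/events")));
    // DatasetId[namespace=s3://my-bucket, name=tables/events]
    System.out.println(fromUri(URI.create("file:/tmp/checkpointed/output")));
    // DatasetId[namespace=file, name=/tmp/checkpointed/output]
  }
}
```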
extractPathFromStreamingWrite
private Optional<String> extractPathFromStreamingWrite(StreamingWrite streamingWrite)
Dispatches path extraction to specialized methods based on the streaming write class name:
- tryExtractPathFromForeachBatch - returns empty (lineage is captured from the batch operations)
- tryExtractPathFromFileSink - tries the fields path, outputPath, and location via reflection
- Console sinks return "console://output"
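The reflective field probing in tryExtractPathFromFileSink can be sketched with plain Java reflection. The field names path, outputPath, and location come from the description above. Everything else is an assumption: the walk up the class hierarchy, the treatment of null or inaccessible fields, and the FakeFileSink stand-in used in place of a real Spark sink.

```java
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of probing a sink object for a path-like private field via reflection. */
final class PathFieldProbeSketch {

  // Field names tried in order, as described for tryExtractPathFromFileSink.
  static final List<String> CANDIDATE_FIELDS = List.of("path", "outputPath", "location");

  static Optional<String> tryExtractPath(Object sink) {
    for (String fieldName : CANDIDATE_FIELDS) {
      // Assumption: the field may be declared on a superclass, so walk up the hierarchy.
      for (Class<?> c = sink.getClass(); c != null; c = c.getSuperclass()) {
        try {
          Field field = c.getDeclaredField(fieldName);
          field.setAccessible(true);
          Object value = field.get(sink);
          if (value != null) {
            return Optional.of(value.toString());
          }
          break; // field exists but is null: move on to the next candidate name
        } catch (NoSuchFieldException e) {
          // not declared on this class; keep walking up
        } catch (IllegalAccessException | RuntimeException e) {
          break; // inaccessible (e.g. module restrictions): give up on this name
        }
      }
    }
    return Optional.empty();
  }

  /** Hypothetical stand-in for a file sink that keeps its location in a private field. */
  static final class FakeFileSink {
    private final String outputPath;

    FakeFileSink(String outputPath) {
      this.outputPath = outputPath;
    }
  }

  public static void main(String[] args) {
    System.out.println(tryExtractPath(new FakeFileSink("s3://bucket/out"))); // Optional[s3://bucket/out]
    System.out.println(tryExtractPath(new Object())); // Optional.empty
  }
}
```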
Inner Class: KafkaStreamWriteProxy
@Slf4j
private static final class KafkaStreamWriteProxy {
A reflection-based proxy around Kafka's KafkaStreamingWrite class. Exposes:
- getTopic() - reads the topic field (Scala Option<String>)
- getSchema() - reads the schema field (StructType)
- getBootstrapServers() - reads the producerParams map and extracts bootstrap.servers
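A reflection-based proxy of this kind can be sketched without Spark on the classpath by pointing it at a stand-in object. In this hypothetical version, java.util.Optional replaces scala.Option, FakeKafkaStreamingWrite replaces KafkaStreamingWrite, and getSchema() is omitted because reading a StructType would require Spark. The field names topic and producerParams and the bootstrap.servers key come from the description above; ReflectiveKafkaProxySketch and readField are invented names.

```java
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a reflection-based proxy in the spirit of KafkaStreamWriteProxy. */
final class ReflectiveKafkaProxySketch {

  private final Object target;

  ReflectiveKafkaProxySketch(Object target) {
    this.target = target;
  }

  // Reads a private field by name; returns empty if it is missing, null, or inaccessible.
  private Optional<Object> readField(String name) {
    try {
      Field field = target.getClass().getDeclaredField(name);
      field.setAccessible(true);
      return Optional.ofNullable(field.get(target));
    } catch (ReflectiveOperationException | RuntimeException e) {
      return Optional.empty();
    }
  }

  // Spark stores the topic as a scala.Option[String]; java.util.Optional stands in for it here.
  Optional<String> getTopic() {
    Optional<Object> topic = readField("topic");
    if (topic.isPresent() && topic.get() instanceof Optional) {
      return ((Optional<?>) topic.get()).map(Object::toString);
    }
    return Optional.empty();
  }

  // Looks up "bootstrap.servers" in the producerParams map.
  Optional<String> getBootstrapServers() {
    Optional<Object> params = readField("producerParams");
    if (params.isPresent() && params.get() instanceof Map) {
      Object servers = ((Map<?, ?>) params.get()).get("bootstrap.servers");
      return Optional.ofNullable(servers).map(Object::toString);
    }
    return Optional.empty();
  }

  /** Hypothetical stand-in for Spark's KafkaStreamingWrite, holding the values in private fields. */
  static final class FakeKafkaStreamingWrite {
    private final Optional<String> topic;
    private final Map<String, Object> producerParams;

    FakeKafkaStreamingWrite(Optional<String> topic, Map<String, Object> producerParams) {
      this.topic = topic;
      this.producerParams = producerParams;
    }
  }

  public static void main(String[] args) {
    Object write =
        new FakeKafkaStreamingWrite(
            Optional.of("events"), Map.of("bootstrap.servers", "broker1:9092"));
    ReflectiveKafkaProxySketch proxy = new ReflectiveKafkaProxySketch(write);
    System.out.println(proxy.getTopic()); // Optional[events]
    System.out.println(proxy.getBootstrapServers()); // Optional[broker1:9092]
  }
}
```

Returning Optional from every accessor means a missing or renamed private field yields an empty result instead of an exception. That is plausibly why a proxy like this is useful here, since the fields being read are Spark internals that are not a stable API.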
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | LogicalPlan (specifically WriteToDataSourceV2) | A Spark logical plan node representing a V2 data source write in a streaming micro-batch. |
| Input | OpenLineageContext | Context with the OpenLineage client, Spark context, and dataset factory methods. |
| Output | List<OutputDataset> | Zero or one OpenLineage OutputDataset representing the streaming sink (Kafka topic, file path, console, etc.). |
Usage Examples
This visitor is registered alongside the input visitor and invoked during plan traversal:
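import java.util.Arrays;
import java.util.List;

// Registered as part of the output visitor list
// (context is the OpenLineageContext passed to the visitor's constructor)
List<QueryPlanVisitor<?, ? extends OutputDataset>> outputVisitors = Arrays.asList(
    new WriteToDataSourceV2Visitor(context)
    // , ... other output visitors
);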
// Framework dispatches automatically:
// visitor.isDefinedAt(plan) -> true if WriteToDataSourceV2
// visitor.apply(plan) -> returns List<OutputDataset>
For a Kafka streaming write, the proxy extracts fields reflectively:
// Internal flow:
// 1. WriteToDataSourceV2 -> MicroBatchWrite -> KafkaStreamingWrite
// 2. KafkaStreamWriteProxy reads topic, schema, and producerParams via reflection
// 3. An OutputDataset is created with the topic as its name and the bootstrap server as its namespace
Related Pages
- Datahub_project_Datahub_StreamingDataSourceV2RelationVisitor - Companion visitor handling input datasets for streaming reads
- Datahub_project_Datahub_SparkStreamingEventToDatahub - Higher-level streaming event converter
- Datahub_project_Datahub_SparkPathUtils - URI-to-DatasetIdentifier resolution used by file-based sink handling
- Datahub_project_Datahub_RemovePathPatternUtils - Path pattern cleanup for extracted dataset names