Implementation:Datahub project Datahub SaveIntoDataSourceCommandVisitor
| Knowledge Sources | |
|---|---|
| Domains | Spark_Lineage, OpenLineage |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A Spark logical plan visitor that intercepts SaveIntoDataSourceCommand nodes to extract output dataset lineage information for a wide range of data sources including Delta Lake, JDBC, Kafka, Azure Kusto, and extension-provided connectors.
Description
SaveIntoDataSourceCommandVisitor extends AbstractQueryPlanDatasetBuilder and implements JobNameSuffixProvider to handle Spark write operations that go through the SaveIntoDataSourceCommand code path. This is one of the primary entry points for capturing output lineage in Spark, as many DataFrame write operations (e.g., df.write.format("delta").save(path)) are represented internally as SaveIntoDataSourceCommand nodes in the logical plan.
The visitor implements a dispatch chain that handles different data source types in the following priority order:
- OpenLineage extensions -- If the SparkOpenLineageExtensionVisitorWrapper recognizes the data source, the visitor delegates lineage extraction to the extension, which lets third-party connectors supply their own dataset identifiers.
- Kafka -- Uses KafkaRelationVisitor to handle the asymmetry between Kafka sources and sinks: writes use the "topic" option, whereas reads use "subscribe".
- Azure Kusto -- Uses KustoRelationVisitor for Kusto-specific dataset extraction.
- Delta Lake -- Handles path-based Delta tables (via the "path" option), catalog-based tables (via the "table" option), and saveAsTable scenarios, for which it attempts to extract the table name from the QueryExecution SQL text.
- JDBC -- Uses JdbcDatasetUtils to construct a dataset identifier from the JDBC URL and table name.
- Generic RelationProvider / SchemaRelationProvider -- As a fallback, creates the relation with the data source's createRelation method, wraps it in a LogicalRelation, and delegates to the other registered plan visitors.
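The priority order above behaves like a first-match-wins chain. The following is a minimal, self-contained sketch of that idea, not the visitor's code: WriteRouteSketch, its Route enum, and the class-name substrings used for detection are hypothetical stand-ins. The real visitor inspects the SaveIntoDataSourceCommand itself and consults the extension wrapper and helper visitors.

```java
// Hypothetical sketch of the visitor's first-match-wins dispatch order.
public class WriteRouteSketch {

  // Illustrative route labels, one per branch of the priority order.
  public enum Route { EXTENSION, KAFKA, KUSTO, DELTA, JDBC, GENERIC }

  // Returns the first branch that claims the data source. The class-name
  // substrings are assumptions chosen for illustration, not the visitor's
  // exact detection logic.
  public static Route route(String dataSourceClass, boolean extensionRecognizes) {
    if (extensionRecognizes) {
      return Route.EXTENSION; // extensions are consulted before anything else
    }
    if (dataSourceClass.contains("KafkaSourceProvider")) {
      return Route.KAFKA;
    }
    if (dataSourceClass.toLowerCase().contains("kusto")) {
      return Route.KUSTO;
    }
    if (dataSourceClass.contains("DeltaDataSource")) {
      return Route.DELTA;
    }
    if (dataSourceClass.contains("JdbcRelationProvider")) {
      return Route.JDBC;
    }
    // Fallback: createRelation, wrap in a LogicalRelation, delegate to other visitors.
    return Route.GENERIC;
  }

  public static void main(String[] args) {
    System.out.println(route("org.apache.spark.sql.delta.sources.DeltaDataSource", false)); // DELTA
    System.out.println(route("com.example.CustomRelationProvider", false)); // GENERIC
  }
}
```

Because the extension check runs first in this sketch, a connector claimed by an extension never reaches the built-in branches, even if its class name would match one of them.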
For the generic fallback path, the visitor also enriches the resulting output datasets with a LifecycleStateChangeDatasetFacet set to OVERWRITE.
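The fallback-path enrichment amounts to copying each delegated dataset and adding one facet. This sketch models that with a hypothetical Dataset record and a string-valued facet map instead of the OpenLineage client's generated classes. The facet key "lifecycleStateChange" is an assumption about naming.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the OVERWRITE enrichment on the generic fallback path.
public class LifecycleEnrichSketch {

  // Simplified stand-in for an output dataset; not the OpenLineage client class.
  public record Dataset(String namespace, String name, Map<String, String> facets) {}

  // Copies each delegated dataset and adds a lifecycle state facet set to
  // OVERWRITE, keeping any facets the delegate visitor already attached.
  public static List<Dataset> markOverwrite(List<Dataset> delegated) {
    List<Dataset> enriched = new ArrayList<>();
    for (Dataset ds : delegated) {
      Map<String, String> facets = new LinkedHashMap<>(ds.facets());
      facets.put("lifecycleStateChange", "OVERWRITE"); // assumed facet key
      enriched.add(new Dataset(ds.namespace(), ds.name(), Collections.unmodifiableMap(facets)));
    }
    return enriched;
  }

  public static void main(String[] args) {
    List<Dataset> delegated =
        List.of(new Dataset("file", "/data/out", Map.of("schema", "id:int")));
    System.out.println(markOverwrite(delegated).get(0).facets());
  }
}
```

The sketch builds new datasets rather than mutating the delegated ones, so the facets produced by other visitors are left untouched.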
The class also provides job name suffix logic, returning a meaningful suffix (table name or path) based on the data source type, which is used for constructing OpenLineage job names.
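Here is a sketch of the suffix selection. It assumes the suffix comes from the first non-blank table-like option and otherwise from "path". JobSuffixSketch and its precedence order are hypothetical. The actual method also considers the data source type, not just the options.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of deriving a job-name suffix from write options.
public class JobSuffixSketch {

  // Assumed precedence: a catalog table name, then a JDBC table, then a path.
  private static final List<String> KEYS = List.of("table", "dbtable", "path");

  // Returns the first non-blank value among KEYS, or empty if none is set.
  public static Optional<String> suffix(Map<String, String> options) {
    for (String key : KEYS) {
      String value = options.get(key);
      if (value != null && !value.isBlank()) {
        return Optional.of(value);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(suffix(Map.of("path", "/data/output")).orElse("<none>")); // /data/output
    System.out.println(suffix(Map.of()).orElse("<none>")); // <none>
  }
}
```

Returning Optional.empty() for unrecognized options mirrors the Optional<String> return type in the signature, which leaves the job name without a suffix.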
Usage
This visitor is registered as part of the OpenLineage Spark agent's plan visitor chain. It is invoked automatically during Spark event processing whenever a SaveIntoDataSourceCommand is detected in the optimized logical plan, so user code does not normally instantiate or call it directly. The visitor requires a SparkSession to be present in the OpenLineageContext.
Code Reference
Source Location
- Repository: Datahub_project_Datahub
- File: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java
- Lines: 1-353
Signature
```java
public class SaveIntoDataSourceCommandVisitor
    extends AbstractQueryPlanDatasetBuilder<
        SparkListenerEvent, SaveIntoDataSourceCommand, OutputDataset>
    implements JobNameSuffixProvider<SaveIntoDataSourceCommand> {

  public SaveIntoDataSourceCommandVisitor(OpenLineageContext context);

  @Override
  public boolean isDefinedAtLogicalPlan(LogicalPlan x);

  @Override
  public boolean isDefinedAt(SparkListenerEvent x);

  @Override
  public List<OutputDataset> apply(SaveIntoDataSourceCommand cmd);

  @Override
  public List<OutputDataset> apply(
      SparkListenerEvent event, SaveIntoDataSourceCommand command);

  @Override
  public Optional<String> jobNameSuffix(OpenLineageContext context);

  public Optional<String> jobNameSuffix(SaveIntoDataSourceCommand command);
}
```
Import
```java
import io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| context | OpenLineageContext | Yes | The OpenLineage context containing the Spark session, query execution, extension visitor wrapper, and registered dataset builders/visitors. |
| event | SparkListenerEvent | Yes | The Spark listener event (e.g., SparkListenerSQLExecutionStart or SparkListenerSQLExecutionEnd) that triggered plan analysis. |
| command | SaveIntoDataSourceCommand | Yes | The logical plan node representing a Spark write operation, containing the data source, options (path, table, url, dbtable, etc.), save mode, schema, and query sub-plan. |
Outputs
| Name | Type | Description |
|---|---|---|
| datasets | List<OutputDataset> | A list of OpenLineage output datasets representing the write targets. Each dataset carries a namespace, a name, and facets such as the schema facet; datasets produced by the generic fallback path also carry a lifecycle state change facet. Returns an empty list if the data source cannot be resolved. |
| jobNameSuffix | Optional<String> | An optional suffix for the OpenLineage job name, derived from the target table name or path (e.g., "my_table" or "/data/output"). |
Usage Examples
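```java
import io.openlineage.client.OpenLineage.OutputDataset;
import io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor;
import io.openlineage.spark.api.OpenLineageContext;
import java.util.List;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;

// The visitor is normally registered in the OpenLineage plan visitor chain and
// invoked automatically. This illustrative helper walks through the same
// dispatch steps by hand.
class SaveIntoDataSourceCommandVisitorExample {
  static void printOutputs(
      OpenLineageContext openLineageContext,
      QueryExecution queryExecution,
      SparkListenerEvent sparkListenerEvent) {
    // 1. Check whether the visitor handles this plan node
    SaveIntoDataSourceCommandVisitor visitor =
        new SaveIntoDataSourceCommandVisitor(openLineageContext);
    LogicalPlan plan = queryExecution.optimizedPlan();
    if (visitor.isDefinedAtLogicalPlan(plan)) {
      // 2. Extract output datasets
      SaveIntoDataSourceCommand command = (SaveIntoDataSourceCommand) plan;
      List<OutputDataset> outputs = visitor.apply(sparkListenerEvent, command);
      // 3. Each output dataset carries a namespace, a name, and facets
      for (OutputDataset ds : outputs) {
        System.out.println("Output: " + ds.getNamespace() + "/" + ds.getName());
      }
    }
  }
}

// The visitor handles Delta Lake writes:
//   df.write.format("delta").save("/data/my_table")
//   -> OutputDataset with a path-based identifier
// JDBC writes:
//   df.write.format("jdbc").option("url", jdbcUrl)
//     .option("dbtable", "schema.table").save()
//   -> OutputDataset with a JDBC-derived identifier
// Kafka writes:
//   df.write.format("kafka").option("kafka.bootstrap.servers", servers)
//     .option("topic", "my-topic").save()
//   -> Delegates to KafkaRelationVisitor
```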
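The path-based and JDBC-derived identifiers in the examples above both come down to splitting a location into a namespace and a name. The sketch below is a simplified, hypothetical illustration; IdentifierSketch is not part of the codebase. The real visitor delegates this work to helpers such as JdbcDatasetUtils, whose normalization (for example, of JDBC schemes or database/schema handling) may differ. The sketch only accepts JDBC URLs of the form jdbc:<scheme>://host:port/database.

```java
import java.net.URI;

// Hypothetical sketch of turning write targets into (namespace, name) pairs.
public class IdentifierSketch {

  public record Identifier(String namespace, String name) {}

  // Path target, e.g. a Delta "path" option. A path without a scheme is
  // treated as a local file; otherwise scheme[://authority] is the namespace.
  public static Identifier fromPath(String path) {
    URI uri = URI.create(path);
    String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
    String namespace =
        uri.getAuthority() == null ? scheme : scheme + "://" + uri.getAuthority();
    return new Identifier(namespace, uri.getPath());
  }

  // JDBC target: strip "jdbc:", use scheme://host:port as the namespace and
  // join the database with the dbtable option as the name.
  public static Identifier fromJdbc(String url, String dbtable) {
    if (!url.startsWith("jdbc:") || !url.contains("://")) {
      throw new IllegalArgumentException("unsupported JDBC URL: " + url);
    }
    URI uri = URI.create(url.substring("jdbc:".length()));
    // The URI path is "/database"; drop the leading slash.
    String database = uri.getPath() == null ? "" : uri.getPath().replaceFirst("^/", "");
    String name = database.isEmpty() ? dbtable : database + "." + dbtable;
    return new Identifier(uri.getScheme() + "://" + uri.getAuthority(), name);
  }

  public static void main(String[] args) {
    System.out.println(fromPath("/data/my_table"));
    System.out.println(fromJdbc("jdbc:postgresql://db.example.com:5432/sales", "public.orders"));
  }
}
```

Query parameters in the JDBC URL (such as ?user=...) are ignored because URI.getPath() excludes the query component.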
Related Pages
- Environment:Datahub_project_Datahub_Spark_Lineage_Environment
- SparkOpenLineageExtensionVisitorWrapper - Used to check and extract lineage from extension-provided data sources
- PlanUtils - Utility functions for schema conversion and safe type checking used by this visitor