

Implementation:Datahub project Datahub SaveIntoDataSourceCommandVisitor

From Leeroopedia
Revision as of 14:44, 16 February 2026 by Admin (auto-imported from implementations/Datahub_project_Datahub_SaveIntoDataSourceCommandVisitor.md)


Knowledge Sources
Domains: Spark_Lineage, OpenLineage
Last Updated: 2026-02-10 00:00 GMT

Overview

A Spark logical plan visitor that intercepts SaveIntoDataSourceCommand nodes to extract output dataset lineage information for a wide range of data sources including Delta Lake, JDBC, Kafka, Azure Kusto, and extension-provided connectors.

Description

SaveIntoDataSourceCommandVisitor extends AbstractQueryPlanDatasetBuilder and implements JobNameSuffixProvider to handle Spark write operations that go through the SaveIntoDataSourceCommand code path. This is one of the primary entry points for capturing output lineage in Spark, as many DataFrame write operations (e.g., df.write.format("delta").save(path)) are represented internally as SaveIntoDataSourceCommand nodes in the logical plan.

The visitor implements a dispatch chain that handles different data source types in the following priority order:

  1. OpenLineage extensions -- If the SparkOpenLineageExtensionVisitorWrapper recognizes the data source, it delegates lineage extraction to the extension, allowing third-party connectors to provide their own dataset identifiers.
  2. Kafka -- Uses KafkaRelationVisitor to handle the asymmetry between Kafka sources and sinks: writes name the target with the "topic" option, whereas reads use "subscribe".
  3. Azure Kusto -- Uses KustoRelationVisitor for Kusto-specific dataset extraction.
  4. Delta Lake -- Handles path-based Delta tables (via the path option), catalog-based tables (via the table option), and saveAsTable scenarios by attempting to extract the table name from the QueryExecution SQL text.
  5. JDBC -- Uses JdbcDatasetUtils to construct a dataset identifier from the JDBC URL and table name.
  6. Generic RelationProvider / SchemaRelationProvider -- As a fallback, creates the relation using the data source's createRelation method, wraps it in a LogicalRelation, and delegates to other registered plan visitors.
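The priority order above amounts to a first-match-wins chain. The following Java sketch illustrates that idea with simplified, hypothetical types (WriteCommand, Handler, DispatchChainSketch) rather than the visitor's real classes. The format names, the option keys, and the extension stand-in "com.example.custom" are assumptions; Kusto is left out, Delta checking "path" before "table" is an assumed order, and the generic fallback here only reads a "path" option instead of building a LogicalRelation and delegating.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of a write: the data source's format name plus its options.
record WriteCommand(String format, Map<String, String> options) {}

// One link in the chain: when it applies, and how it names the write target.
record Handler(String name, Predicate<WriteCommand> matches,
               Function<WriteCommand, Optional<String>> extract) {}

final class DispatchChainSketch {
  // Hypothetical stand-in for formats claimed by an OpenLineage extension.
  static final Set<String> EXTENSION_FORMATS = Set.of("com.example.custom");

  // Tried strictly in list order; the first matching handler wins.
  static final List<Handler> CHAIN = List.of(
      // A real extension supplies its own identifier; here we echo the format.
      new Handler("extension", c -> EXTENSION_FORMATS.contains(c.format()),
          c -> Optional.of(c.format())),
      // Kafka writes name the target with the "topic" option.
      new Handler("kafka", c -> c.format().equals("kafka"), c -> opt(c, "topic")),
      // Path-based Delta tables first, then catalog tables (assumed order).
      new Handler("delta", c -> c.format().equals("delta"),
          c -> opt(c, "path").or(() -> opt(c, "table"))),
      new Handler("jdbc", c -> c.format().equals("jdbc"), c -> opt(c, "dbtable")),
      // Generic fallback: matches everything, so it must come last.
      new Handler("generic", c -> true, c -> opt(c, "path")));

  static Optional<String> opt(WriteCommand c, String key) {
    return Optional.ofNullable(c.options().get(key));
  }

  static String resolve(WriteCommand cmd) {
    for (Handler h : CHAIN) {
      if (h.matches().test(cmd)) {
        return h.name() + ":" + h.extract().apply(cmd).orElse("<unresolved>");
      }
    }
    return "<none>"; // unreachable: the generic handler matches everything
  }
}
```

Because the generic handler accepts every command, placing it anywhere but last would shadow the specific handlers, which is why a fixed priority order matters.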

For the generic fallback path, the visitor also enriches the resulting output datasets with a LifecycleStateChangeDatasetFacet set to OVERWRITE.
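Enriching the fallback datasets means returning copies that carry one extra facet. A minimal sketch, assuming a simplified dataset record (SketchDataset, hypothetical) whose facets are plain strings rather than OpenLineage's typed facet classes; only the facet key "lifecycleStateChange" and the value OVERWRITE come from the OpenLineage model.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for an OpenLineage output dataset; real facets are
// typed objects, modelled here as name -> value strings.
record SketchDataset(String namespace, String name, Map<String, String> facets) {}

final class LifecycleEnrichmentSketch {
  // Returns copies of the datasets with a lifecycle state change facet set to
  // OVERWRITE, keeping any facets the delegate visitors already attached.
  // The input datasets are not modified.
  static List<SketchDataset> markOverwrite(List<SketchDataset> datasets) {
    return datasets.stream()
        .map(ds -> {
          Map<String, String> facets = new HashMap<>(ds.facets());
          facets.put("lifecycleStateChange", "OVERWRITE");
          return new SketchDataset(ds.namespace(), ds.name(), Map.copyOf(facets));
        })
        .toList();
  }
}
```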

The class also provides job name suffix logic, returning a meaningful suffix (table name or path) based on the data source type, which is used for constructing OpenLineage job names.
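A sketch of deriving a suffix from write options. The precedence used here ("dbtable" for JDBC, otherwise "table" before "path") and the class name JobNameSuffixSketch are assumptions for illustration; the visitor's actual per-source rules are not spelled out on this page.

```java
import java.util.Map;
import java.util.Optional;

final class JobNameSuffixSketch {
  // Assumed precedence (hypothetical): JDBC writes use "dbtable"; other
  // sources prefer a catalog "table" option and fall back to "path".
  // Returns empty when no usable option is present.
  static Optional<String> suffix(String format, Map<String, String> options) {
    if ("jdbc".equals(format)) {
      return Optional.ofNullable(options.get("dbtable"));
    }
    return Optional.ofNullable(options.get("table"))
        .or(() -> Optional.ofNullable(options.get("path")));
  }
}
```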

Usage

This visitor is registered as part of the OpenLineage Spark agent's plan visitor chain and is invoked automatically during Spark event processing when a SaveIntoDataSourceCommand is detected in the optimized logical plan. User code typically should not instantiate or call it directly. The visitor requires a SparkSession to be present in the OpenLineageContext.
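A visitor chain of this kind can be pictured as a list of partial functions, each declaring which plan nodes it handles. The sketch below uses hypothetical stand-in types (PlanNode, SaveNode, ProjectNode, PlanVisitor, SaveNodeVisitor, VisitorChain), not the OpenLineage or Spark APIs, and models the SparkSession requirement as a simple boolean flag.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plan node types standing in for Spark's LogicalPlan classes.
interface PlanNode {}
record SaveNode(String target) implements PlanNode {}
record ProjectNode() implements PlanNode {}

// Partial-function style visitor: declares which nodes it handles.
interface PlanVisitor {
  boolean isDefinedAt(PlanNode node);
  List<String> apply(PlanNode node);
}

final class SaveNodeVisitor implements PlanVisitor {
  private final boolean sessionPresent; // models "a SparkSession is available"

  SaveNodeVisitor(boolean sessionPresent) { this.sessionPresent = sessionPresent; }

  // Declines every node when no session is available.
  public boolean isDefinedAt(PlanNode node) {
    return sessionPresent && node instanceof SaveNode;
  }

  public List<String> apply(PlanNode node) {
    return List.of(((SaveNode) node).target());
  }
}

final class VisitorChain {
  private final List<PlanVisitor> visitors;

  VisitorChain(List<PlanVisitor> visitors) { this.visitors = List.copyOf(visitors); }

  // Every visitor that accepts the node contributes outputs; a node that no
  // visitor accepts yields an empty list rather than an error.
  List<String> outputsFor(PlanNode node) {
    return visitors.stream()
        .filter(v -> v.isDefinedAt(node))
        .flatMap(v -> v.apply(node).stream())
        .collect(Collectors.toList());
  }
}
```

In this arrangement the chain is consulted for every plan node, but only the visitors whose isDefinedAt check passes contribute datasets.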

Code Reference

Source Location

Signature

public class SaveIntoDataSourceCommandVisitor
    extends AbstractQueryPlanDatasetBuilder<
        SparkListenerEvent, SaveIntoDataSourceCommand, OutputDataset>
    implements JobNameSuffixProvider<SaveIntoDataSourceCommand> {

    public SaveIntoDataSourceCommandVisitor(OpenLineageContext context);

    @Override
    public boolean isDefinedAtLogicalPlan(LogicalPlan x);

    @Override
    public boolean isDefinedAt(SparkListenerEvent x);

    @Override
    public List<OutputDataset> apply(SaveIntoDataSourceCommand cmd);

    @Override
    public List<OutputDataset> apply(
        SparkListenerEvent event, SaveIntoDataSourceCommand command);

    @Override
    public Optional<String> jobNameSuffix(OpenLineageContext context);

    public Optional<String> jobNameSuffix(SaveIntoDataSourceCommand command);
}

Import

import io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor;

I/O Contract

Inputs

Name | Type | Required | Description
context | OpenLineageContext | Yes | The OpenLineage context containing the Spark session, query execution, extension visitor wrapper, and registered dataset builders/visitors.
event | SparkListenerEvent | Yes | The Spark listener event (e.g., SparkListenerSQLExecutionStart or SparkListenerSQLExecutionEnd) that triggered plan analysis.
command | SaveIntoDataSourceCommand | Yes | The logical plan node representing a Spark write operation, containing the data source, options (path, table, url, dbtable, etc.), save mode, schema, and query sub-plan.

Outputs

Name | Type | Description
datasets | List<OutputDataset> | A list of OpenLineage output datasets representing the write targets. Each dataset typically includes a namespace, a name, and a schema facet; datasets produced by the generic fallback path also carry a lifecycle state change facet. Returns an empty list if the data source cannot be resolved.
jobNameSuffix | Optional<String> | An optional suffix for the OpenLineage job name, derived from the target table name or path (e.g., "my_table" or "/data/output").

Usage Examples

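import java.util.List;
import io.openlineage.client.OpenLineage.OutputDataset;
import io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;

// The visitor is normally registered in the OpenLineage plan visitor chain and
// invoked automatically. This snippet only shows how it is invoked; it assumes
// openLineageContext (OpenLineageContext), queryExecution (QueryExecution) and
// sparkListenerEvent (SparkListenerEvent) are already in scope.
SaveIntoDataSourceCommandVisitor visitor =
    new SaveIntoDataSourceCommandVisitor(openLineageContext);

// 1. Check whether the visitor handles this plan node
LogicalPlan plan = queryExecution.optimizedPlan();
if (plan instanceof SaveIntoDataSourceCommand
        && visitor.isDefinedAtLogicalPlan(plan)) {
    // 2. Extract output datasets
    SaveIntoDataSourceCommand command = (SaveIntoDataSourceCommand) plan;
    List<OutputDataset> outputs = visitor.apply(sparkListenerEvent, command);

    // 3. Each output dataset carries a namespace, a name, and facets
    for (OutputDataset ds : outputs) {
        System.out.println("Output: " + ds.getNamespace() + "/" + ds.getName());
    }
}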

// The visitor handles Delta Lake writes:
//   df.write.format("delta").save("/data/my_table")
//   -> OutputDataset with path-based identifier

// JDBC writes:
//   df.write.format("jdbc").option("url", jdbcUrl)
//     .option("dbtable", "schema.table").save()
//   -> OutputDataset with JDBC-derived identifier

// Kafka writes:
//   df.write.format("kafka").option("topic", "my-topic").save()
//   -> Delegates to KafkaRelationVisitor
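The JDBC case above turns the url and dbtable options into a dataset identifier. How JdbcDatasetUtils does this is not documented on this page; the sketch below instead assumes the OpenLineage dataset naming convention for PostgreSQL (namespace postgres://{host}:{port}, name {database}.{schema}.{table}) and handles only that driver. JdbcNamingSketch and JdbcIdentifier are hypothetical names.

```java
import java.net.URI;

record JdbcIdentifier(String namespace, String name) {}

final class JdbcNamingSketch {
  // Handles PostgreSQL URLs only, e.g. jdbc:postgresql://host:5432/sales.
  // Assumes dbtable is "schema.table"; other drivers, default schemas and
  // credential stripping are out of scope for this sketch.
  static JdbcIdentifier fromPostgres(String jdbcUrl, String dbtable) {
    if (!jdbcUrl.startsWith("jdbc:postgresql://")) {
      throw new IllegalArgumentException("not a PostgreSQL JDBC URL: " + jdbcUrl);
    }
    // Drop "jdbc:" so java.net.URI sees "postgresql://host:port/db?...".
    URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
    String path = uri.getPath();
    if (uri.getHost() == null || path == null || path.length() < 2) {
      throw new IllegalArgumentException("missing host or database: " + jdbcUrl);
    }
    int port = uri.getPort() == -1 ? 5432 : uri.getPort(); // PostgreSQL default port
    String database = path.substring(1); // strip the leading "/"
    return new JdbcIdentifier("postgres://" + uri.getHost() + ":" + port,
        database + "." + dbtable);
  }
}
```

Query parameters such as ?ssl=true are ignored because URI.getPath() excludes the query string.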
