Implementation:Datahub project Datahub SparkOpenLineageExtensionVisitorWrapper

From Leeroopedia
Revision as of 14:44, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Datahub_project_Datahub_SparkOpenLineageExtensionVisitorWrapper.md)


Knowledge Sources
Domains Spark_Lineage, OpenLineage
Last Updated 2026-02-10 00:00 GMT

Overview

A reflection-based wrapper that discovers and invokes OpenLineage extension visitors loaded via the Java ServiceLoader mechanism, enabling Spark connectors to provide custom lineage extraction logic through the spark-extension-interfaces package.

Description

SparkOpenLineageExtensionVisitorWrapper bridges the OpenLineage Spark agent and third-party Spark connectors that implement the OpenLineageExtensionProvider interface. Because a connector may be loaded by a different classloader than the agent itself, the class manages the classloaders needed to discover and invoke extension visitors across those boundaries.

At initialization, the wrapper:

  1. Collects all available classloaders from running threads.
  2. Attempts to ensure the OpenLineageExtensionProvider class is available in each classloader (using safe, non-reflective approaches).
  3. Creates an ExtensionClassloader that aggregates all available classloaders.
  4. Uses ServiceLoader to discover all OpenLineageExtensionProvider implementations and instantiate their visitor classes.
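
The initialization steps above can be sketched roughly as follows. This is a minimal illustration, not the class's actual code: DemoExtensionProvider, ExtensionDiscoverySketch, and AggregatingClassLoader are hypothetical names standing in for OpenLineageExtensionProvider, the wrapper, and ExtensionClassloader, and step 2 (making the provider class visible in each classloader) is left out.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;

/** Hypothetical stand-in for the provider interface; not the real OpenLineage type. */
interface DemoExtensionProvider {
    String getVisitorClassName();
}

final class ExtensionDiscoverySketch {

    /** Step 1: gather the distinct context classloaders of all live threads. */
    static Set<ClassLoader> collectClassLoaders() {
        Set<ClassLoader> loaders = new LinkedHashSet<>();
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            ClassLoader loader = thread.getContextClassLoader();
            if (loader != null) {
                loaders.add(loader);
            }
        }
        // Include the loader that defined this class as a fallback.
        ClassLoader own = ExtensionDiscoverySketch.class.getClassLoader();
        if (own != null) {
            loaders.add(own);
        }
        return loaders;
    }

    /** Step 3: a classloader that asks each delegate in turn. */
    static final class AggregatingClassLoader extends ClassLoader {
        private final List<ClassLoader> delegates;

        AggregatingClassLoader(Set<ClassLoader> delegates) {
            super(null); // no parent; core JDK classes still resolve via the bootstrap loader
            this.delegates = new ArrayList<>(delegates);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            for (ClassLoader delegate : delegates) {
                try {
                    return delegate.loadClass(name);
                } catch (ClassNotFoundException ignored) {
                    // Not visible here; try the next delegate.
                }
            }
            throw new ClassNotFoundException(name);
        }

        @Override
        protected Enumeration<URL> findResources(String name) throws IOException {
            // ServiceLoader reads META-INF/services/* entries through this method.
            List<URL> urls = new ArrayList<>();
            Set<String> seen = new LinkedHashSet<>();
            for (ClassLoader delegate : delegates) {
                for (URL url : Collections.list(delegate.getResources(name))) {
                    if (seen.add(url.toString())) {
                        urls.add(url);
                    }
                }
            }
            return Collections.enumeration(urls);
        }
    }

    /** Step 4: discover providers and collect the visitor class names they advertise. */
    static List<String> discoverVisitorClassNames(ClassLoader loader) {
        List<String> names = new ArrayList<>();
        for (DemoExtensionProvider provider
                : ServiceLoader.load(DemoExtensionProvider.class, loader)) {
            names.add(provider.getVisitorClassName());
        }
        return names;
    }
}
```

With no provider registered under META-INF/services, discoverVisitorClassNames returns an empty list. A connector jar that registers an implementation would be picked up as long as one of the collected classloaders can see it.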

Once initialized, the wrapper provides methods to check whether an extension can handle a given lineage node (isDefinedAt) and to extract dataset identifiers, input datasets, and output datasets from those nodes via reflection-based method invocation. All method calls on extension objects are performed via java.lang.reflect.Method to avoid classloader conflicts.
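
A minimal sketch of the reflective call pattern described above, assuming visitors are held as plain Object references and expose a public isDefinedAt(Object) method. ReflectiveVisitorSketch and DemoVisitor are hypothetical names used only for illustration; the real wrapper's internals may differ.

```java
import java.lang.reflect.Method;
import java.util.List;

final class ReflectiveVisitorSketch {

    /** Hypothetical visitor; real extension visitors come from connector jars. */
    public static final class DemoVisitor {
        public boolean isDefinedAt(Object node) {
            return node instanceof String && ((String) node).startsWith("jdbc:");
        }
    }

    /**
     * Returns true if any visitor reports that it can handle the node.
     * Visitors are never cast to a shared interface type, so a visitor class
     * defined by another classloader cannot trigger a ClassCastException here.
     */
    static boolean isDefinedAt(List<Object> visitors, Object node) {
        for (Object visitor : visitors) {
            try {
                // Look the method up by name and parameter type on the runtime class.
                Method method = visitor.getClass().getMethod("isDefinedAt", Object.class);
                if (Boolean.TRUE.equals(method.invoke(visitor, node))) {
                    return true;
                }
            } catch (ReflectiveOperationException e) {
                // Method missing, inaccessible, or it threw: skip this visitor.
            }
        }
        return false;
    }
}
```

Objects without a matching method are skipped rather than treated as errors, so one misbehaving extension does not prevent the others from being consulted. That choice is an assumption of this sketch.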

The class also contains Jackson mixins (DatasetIdentifierMixin and SymlinkMixin) that enable proper JSON deserialization of DatasetIdentifier and Symlink objects returned by extension visitors.

Usage

This class is used internally by the OpenLineage Spark agent whenever a Spark logical plan node is encountered that may be handled by a connector-provided extension. It is constructed once per agent lifecycle with a SparkOpenLineageConfig and then queried during plan traversal. Consumers such as SaveIntoDataSourceCommandVisitor call isDefinedAt to check if an extension handles a particular data source, and then call getLineageDatasetIdentifier, getInputs, or getOutputs to extract lineage information.

Code Reference

Source Location

Signature

public final class SparkOpenLineageExtensionVisitorWrapper {

    public SparkOpenLineageExtensionVisitorWrapper(SparkOpenLineageConfig config);

    public boolean isDefinedAt(Object object);

    public DatasetIdentifier getLineageDatasetIdentifier(
        Object lineageNode, String sparkListenerEventName,
        Object sqlContext, Object parameters);

    public DatasetIdentifier getLineageDatasetIdentifier(
        Object lineageNode, String sparkListenerEventName);

    public Pair<List<InputDataset>, List<Object>> getInputs(
        Object lineageNode, String sparkListenerEventName);

    public Pair<List<OpenLineage.OutputDataset>, List<Object>> getOutputs(
        Object lineageNode, String sparkListenerEventName);
}

Import

import io.openlineage.spark.agent.lifecycle.SparkOpenLineageExtensionVisitorWrapper;

I/O Contract

Inputs

Name | Type | Required | Description
config | SparkOpenLineageConfig | Yes | Configuration object containing optional test extension provider name used to filter which extensions are loaded.
lineageNode | Object | Yes | A Spark logical plan node or data source object to be inspected by extension visitors.
sparkListenerEventName | String | Yes | The fully-qualified class name of the Spark listener event that triggered the lineage extraction.
sqlContext | Object | No | The Spark SQL context, passed to the four-argument apply method of extension visitors.
parameters | Object | No | Additional parameters (e.g., command options), passed to the four-argument apply method.

Outputs

Name | Type | Description
isDefinedAt result | boolean | Whether any loaded extension visitor can handle the given lineage node.
datasetIdentifier | DatasetIdentifier | A dataset identifier (name + namespace + symlinks) extracted from the lineage node, or null if no extension could produce one.
inputs | Pair<List<InputDataset>, List<Object>> | A pair of input datasets and delegate nodes returned by the extension visitor's apply method.
outputs | Pair<List<OutputDataset>, List<Object>> | A pair of output datasets and delegate nodes returned by the extension visitor's apply method.

Usage Examples

// Construct the wrapper during agent initialization
SparkOpenLineageConfig config = new SparkOpenLineageConfig();
SparkOpenLineageExtensionVisitorWrapper wrapper =
    new SparkOpenLineageExtensionVisitorWrapper(config);

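// Check if an extension handles a particular data source.
// `command`, `event`, and `sparkSession` are assumed to be in scope,
// e.g. inside a visitor handling a save-into-data-source command.
Object dataSource = command.dataSource();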
if (wrapper.isDefinedAt(dataSource)) {
    // Extract dataset identifier using the four-argument overload
    DatasetIdentifier identifier = wrapper.getLineageDatasetIdentifier(
        dataSource,
        event.getClass().getName(),
        sparkSession.sqlContext(),
        command.options()
    );

    if (identifier != null) {
        // Use the identifier to construct an OpenLineage dataset
        System.out.println("Dataset: " + identifier.getNamespace()
            + "/" + identifier.getName());
    }
}

// Extract input datasets from a lineage node (`lineageNode` is assumed to be in scope).
// The second argument is the fully-qualified class name of the triggering
// Spark listener event.
Pair<List<InputDataset>, List<Object>> inputs =
    wrapper.getInputs(lineageNode,
        "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart");

// Extract output datasets from a lineage node
Pair<List<OpenLineage.OutputDataset>, List<Object>> outputs =
    wrapper.getOutputs(lineageNode,
        "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd");
