Implementation:Datahub project Datahub SparkOpenLineageExtensionVisitorWrapper
| Knowledge Sources | |
|---|---|
| Domains | Spark_Lineage, OpenLineage |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A reflection-based wrapper that discovers and invokes OpenLineage extension visitors loaded via the Java ServiceLoader mechanism, enabling Spark connectors to provide custom lineage extraction logic through the spark-extension-interfaces package.
Description
SparkOpenLineageExtensionVisitorWrapper serves as the bridge between the OpenLineage Spark agent and third-party Spark connectors that implement the OpenLineageExtensionProvider interface. Since connectors may be loaded by different classloaders than the OpenLineage agent itself, this class handles the complex classloader management required to discover and invoke extension visitors across classloader boundaries.
At initialization, the wrapper:
- Collects all available classloaders from running threads.
- Attempts to ensure the OpenLineageExtensionProvider class is available in each classloader (using safe, non-reflective approaches).
- Creates an ExtensionClassloader that aggregates all available classloaders.
- Uses ServiceLoader to discover all OpenLineageExtensionProvider implementations and instantiate their visitor classes.
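The initialization steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions and is not the project's ExtensionClassloader. The names AggregatingClassLoaderSketch, AggregatingClassLoader, and discover are hypothetical, and the step that makes OpenLineageExtensionProvider visible in each classloader is left out. The sketch collects the distinct context classloaders of live threads, wraps them in a loader that tries each delegate in turn, and hands that loader to ServiceLoader.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;

/** Hypothetical sketch of classloader aggregation for ServiceLoader discovery. */
final class AggregatingClassLoaderSketch {

  /** Collects the distinct, non-null context classloaders of all live threads. */
  static List<ClassLoader> collectThreadClassLoaders() {
    Set<ClassLoader> loaders = new LinkedHashSet<>();
    for (Thread thread : Thread.getAllStackTraces().keySet()) {
      ClassLoader loader = thread.getContextClassLoader();
      if (loader != null) {
        loaders.add(loader);
      }
    }
    return new ArrayList<>(loaders);
  }

  /** Loads classes and resources by asking each delegate in turn (hypothetical). */
  static final class AggregatingClassLoader extends ClassLoader {
    private final List<ClassLoader> delegates;

    AggregatingClassLoader(List<ClassLoader> delegates) {
      super(null); // parent is the bootstrap loader; all other lookups go to the delegates
      this.delegates = new ArrayList<>(delegates);
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
      for (ClassLoader delegate : delegates) {
        try {
          return delegate.loadClass(name);
        } catch (ClassNotFoundException ignored) {
          // not visible to this delegate; try the next one
        }
      }
      throw new ClassNotFoundException(name);
    }

    @Override
    protected URL findResource(String name) {
      for (ClassLoader delegate : delegates) {
        URL url = delegate.getResource(name);
        if (url != null) {
          return url;
        }
      }
      return null;
    }

    @Override
    protected Enumeration<URL> findResources(String name) throws IOException {
      // ServiceLoader locates META-INF/services files through getResources, which calls this
      List<URL> urls = new ArrayList<>();
      for (ClassLoader delegate : delegates) {
        urls.addAll(Collections.list(delegate.getResources(name)));
      }
      return Collections.enumeration(urls);
    }
  }

  /** Instantiates every provider of the given service visible through the loader. */
  static <S> List<S> discover(Class<S> service, ClassLoader loader) {
    List<S> providers = new ArrayList<>();
    for (S provider : ServiceLoader.load(service, loader)) {
      providers.add(provider);
    }
    return providers;
  }
}
```

Passing null as the parent means the bootstrap loader is consulted first, so java.* classes resolve normally, and every other class or resource lookup falls through to the delegates in the order the threads were visited.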
Once initialized, the wrapper provides methods to check whether an extension can handle a given lineage node (isDefinedAt) and to extract dataset identifiers, input datasets, and output datasets from those nodes via reflection-based method invocation. All method calls on extension objects are performed via java.lang.reflect.Method to avoid classloader conflicts.
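A visitor defined by a different classloader may not be castable to the agent's own copy of its interface, which is why calls go through java.lang.reflect.Method. The following is a minimal sketch of that pattern, assuming visitors expose a public isDefinedAt(Object) method. The names ReflectiveVisitorCalls, anyDefinedAt, and StringOnlyVisitor are hypothetical, and reporting every reflection failure as "not defined" is a choice made for the sketch rather than documented behavior of the wrapper.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;

/** Hypothetical sketch: query visitors of unknown type through reflection only. */
final class ReflectiveVisitorCalls {

  /** Invokes visitor.isDefinedAt(node); any reflection failure is reported as false. */
  static boolean isDefinedAt(Object visitor, Object node) {
    try {
      Method method = visitor.getClass().getMethod("isDefinedAt", Object.class);
      return Boolean.TRUE.equals(method.invoke(visitor, node));
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
      return false;
    }
  }

  /** Returns true if at least one visitor reports that it can handle the node. */
  static boolean anyDefinedAt(List<?> visitors, Object node) {
    for (Object visitor : visitors) {
      if (isDefinedAt(visitor, node)) {
        return true;
      }
    }
    return false;
  }

  /** Example visitor standing in for a connector-provided one; handles String nodes only. */
  public static final class StringOnlyVisitor {
    public boolean isDefinedAt(Object node) {
      return node instanceof String;
    }
  }
}
```

Because the lookup is by method name and parameter type, the caller never needs the visitor's interface on its own classpath.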
The class also contains Jackson mixins (DatasetIdentifierMixin and SymlinkMixin) that enable proper JSON deserialization of DatasetIdentifier and Symlink objects returned by extension visitors.
Usage
This class is used internally by the OpenLineage Spark agent whenever a Spark logical plan node is encountered that may be handled by a connector-provided extension. It is constructed once per agent lifecycle with a SparkOpenLineageConfig and then queried during plan traversal. Consumers such as SaveIntoDataSourceCommandVisitor call isDefinedAt to check if an extension handles a particular data source, and then call getLineageDatasetIdentifier, getInputs, or getOutputs to extract lineage information.
Code Reference
Source Location
- Repository: Datahub_project_Datahub
- File: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/SparkOpenLineageExtensionVisitorWrapper.java
- Lines: 1-378
Signature
```java
public final class SparkOpenLineageExtensionVisitorWrapper {
  public SparkOpenLineageExtensionVisitorWrapper(SparkOpenLineageConfig config);

  public boolean isDefinedAt(Object object);

  public DatasetIdentifier getLineageDatasetIdentifier(
      Object lineageNode, String sparkListenerEventName,
      Object sqlContext, Object parameters);

  public DatasetIdentifier getLineageDatasetIdentifier(
      Object lineageNode, String sparkListenerEventName);

  public Pair<List<InputDataset>, List<Object>> getInputs(
      Object lineageNode, String sparkListenerEventName);

  public Pair<List<OpenLineage.OutputDataset>, List<Object>> getOutputs(
      Object lineageNode, String sparkListenerEventName);
}
```
Import
```java
import io.openlineage.spark.agent.lifecycle.SparkOpenLineageExtensionVisitorWrapper;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | SparkOpenLineageConfig | Yes | Configuration object containing an optional test extension provider name, used to filter which extensions are loaded. |
| lineageNode | Object | Yes | A Spark logical plan node or data source object to be inspected by extension visitors. |
| sparkListenerEventName | String | Yes | The fully-qualified class name of the Spark listener event that triggered the lineage extraction. |
| sqlContext | Object | No | The Spark SQL context, passed to the four-argument apply method of extension visitors. Accepted only by the four-argument getLineageDatasetIdentifier overload. |
| parameters | Object | No | Additional parameters (e.g., command options), passed to the four-argument apply method. Accepted only by the four-argument getLineageDatasetIdentifier overload. |
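Since sqlContext and parameters reach only the four-argument apply method, one way to route each getLineageDatasetIdentifier overload to the matching apply overload is to look the method up by its exact parameter types. The following is a minimal sketch, assuming the visitor declares public apply(Object, String) and apply(Object, String, Object, Object) methods. ApplyOverloadSketch and EchoVisitor are hypothetical names, and return values are typed as Object because the sketch makes no assumption about what a real visitor returns.

```java
import java.lang.reflect.Method;

/** Hypothetical sketch: pick the apply overload by exact parameter types. */
final class ApplyOverloadSketch {

  /** Two-argument path: calls apply(Object, String). */
  static Object apply(Object visitor, Object node, String eventName) {
    return invoke(visitor, new Class<?>[] {Object.class, String.class}, node, eventName);
  }

  /** Four-argument path: also forwards the SQL context and parameters. */
  static Object apply(
      Object visitor, Object node, String eventName, Object sqlContext, Object parameters) {
    return invoke(
        visitor,
        new Class<?>[] {Object.class, String.class, Object.class, Object.class},
        node, eventName, sqlContext, parameters);
  }

  /** Looks up apply with the given parameter types and invokes it reflectively. */
  private static Object invoke(Object visitor, Class<?>[] parameterTypes, Object... args) {
    try {
      Method method = visitor.getClass().getMethod("apply", parameterTypes);
      return method.invoke(visitor, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(
          "apply call failed on " + visitor.getClass().getName(), e);
    }
  }

  /** Example visitor exposing both overloads; the result shows which one ran. */
  public static final class EchoVisitor {
    public String apply(Object node, String eventName) {
      return "2:" + node;
    }

    public String apply(Object node, String eventName, Object sqlContext, Object parameters) {
      return "4:" + node + ":" + parameters;
    }
  }
}
```

Failing loudly with IllegalStateException is a choice of this sketch; a production wrapper might instead log and return null, which matches the "or null" contract described for datasetIdentifier.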
Outputs
| Name | Type | Description |
|---|---|---|
| isDefinedAt result | boolean | Whether any loaded extension visitor can handle the given lineage node. |
| datasetIdentifier | DatasetIdentifier | A dataset identifier (name, namespace, and symlinks) extracted from the lineage node, or null if no extension could produce one. |
| inputs | Pair<List<InputDataset>, List<Object>> | A pair of input datasets and delegate nodes returned by the extension visitor's apply method. |
| outputs | Pair<List<OutputDataset>, List<Object>> | A pair of output datasets and delegate nodes returned by the extension visitor's apply method. |
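The second element of each pair holds delegate nodes rather than finished datasets. Assuming that a delegate is a node the extension hands back for the agent's own visitors to process, a caller could merge both halves roughly as follows. This is a hypothetical sketch: it uses Map.Entry in place of Pair and plain strings in place of InputDataset or OutputDataset, and the names DelegateResolutionSketch, resolve, and pair are illustrative only.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: combine direct datasets with those found via delegate nodes. */
final class DelegateResolutionSketch {

  /**
   * Runs the extension on the node, keeps the datasets it returned directly, and passes
   * each delegate node to the built-in visitor. Delegates are resolved one level deep.
   */
  static List<String> resolve(
      Object node,
      Function<Object, Map.Entry<List<String>, List<Object>>> extension,
      Function<Object, List<String>> builtInVisitor) {
    Map.Entry<List<String>, List<Object>> result = extension.apply(node);
    List<String> datasets = new ArrayList<>(result.getKey());
    for (Object delegate : result.getValue()) {
      datasets.addAll(builtInVisitor.apply(delegate));
    }
    return datasets;
  }

  /** Builds an immutable (datasets, delegates) pair. */
  static Map.Entry<List<String>, List<Object>> pair(List<String> datasets, List<Object> delegates) {
    return new AbstractMap.SimpleImmutableEntry<>(datasets, delegates);
  }
}
```

A real traversal might recurse into delegates that themselves produce further delegates; the single level here keeps the flow visible.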
Usage Examples
```java
// Construct the wrapper once during agent initialization
SparkOpenLineageConfig config = new SparkOpenLineageConfig();
SparkOpenLineageExtensionVisitorWrapper wrapper =
    new SparkOpenLineageExtensionVisitorWrapper(config);

// Inside a visitor such as SaveIntoDataSourceCommandVisitor; command, event,
// sparkSession, and lineageNode come from the surrounding visitor context.
Object dataSource = command.dataSource();
if (wrapper.isDefinedAt(dataSource)) {
  // Four-argument overload: forwards the SQL context and command options to the extension
  DatasetIdentifier identifier = wrapper.getLineageDatasetIdentifier(
      dataSource,
      event.getClass().getName(),
      sparkSession.sqlContext(),
      command.options());
  if (identifier != null) {
    // Use the identifier to construct an OpenLineage dataset
    System.out.println("Dataset: " + identifier.getNamespace()
        + "/" + identifier.getName());
  }
}

// The event name is the class name of the Spark listener event being handled
String eventName = event.getClass().getName();

// Extract input datasets from a lineage node
Pair<List<InputDataset>, List<Object>> inputs =
    wrapper.getInputs(lineageNode, eventName);

// Extract output datasets from a lineage node
Pair<List<OpenLineage.OutputDataset>, List<Object>> outputs =
    wrapper.getOutputs(lineageNode, eventName);
```
Related Pages
- Environment:Datahub_project_Datahub_Spark_Lineage_Environment
- SaveIntoDataSourceCommandVisitor - Uses this wrapper to check and extract lineage from extension-provided data sources
- PlanUtils - Utility functions used alongside this wrapper during plan traversal