Implementation:Datahub project Datahub SparkPlanUtils
| Knowledge Sources | |
|---|---|
| Domains | Spark_Lineage, OpenLineage |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A utility class providing helper functions for traversing Spark logical plans, constructing OpenLineage facets, resolving HDFS paths with DataHub PathSpec support, and safely invoking Scala partial functions during lineage extraction.
Description
PlanUtils is a central utility class used throughout the Spark lineage extraction pipeline. It provides several categories of functionality:
Partial Function Merging: The merge method combines a collection of Scala PartialFunction instances into a single OpenLineageAbstractPartialFunction. The merged function's isDefinedAt returns true as soon as any of the provided functions is defined at the input. Its apply invokes, in order, every function that is defined at the input and concatenates their results into one collection. This is the core mechanism by which multiple plan visitors are composed to handle different types of logical plan nodes.
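The merge semantics above can be sketched in plain Java. This is a minimal, self-contained sketch, not the PlanUtils source: PartialFn is a hypothetical stand-in for scala.PartialFunction and MergeSketch is a hypothetical class name, chosen so the example runs without Scala or Spark on the classpath. It shows isDefinedAt stopping at the first match while apply collects results from every defined function.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for scala.PartialFunction (not the real Scala type).
interface PartialFn<T, R> {
  boolean isDefinedAt(T x);

  R apply(T x);

  static <T, R> PartialFn<T, R> of(Predicate<T> defined, Function<T, R> body) {
    return new PartialFn<T, R>() {
      public boolean isDefinedAt(T x) { return defined.test(x); }
      public R apply(T x) { return body.apply(x); }
    };
  }
}

// Hypothetical sketch class illustrating the documented merge behavior.
class MergeSketch {
  static <T, D> PartialFn<T, Collection<D>> merge(
      Collection<? extends PartialFn<T, ? extends Collection<D>>> fns) {
    return new PartialFn<T, Collection<D>>() {
      // Defined if ANY function is defined; anyMatch stops at the first match.
      public boolean isDefinedAt(T x) {
        return fns.stream().anyMatch(f -> safeIsDefinedAt(f, x));
      }

      // Runs EVERY defined function, in iteration order, and concatenates results.
      public Collection<D> apply(T x) {
        List<D> out = new ArrayList<>();
        for (PartialFn<T, ? extends Collection<D>> f : fns) {
          if (!safeIsDefinedAt(f, x)) {
            continue;
          }
          try {
            out.addAll(f.apply(x));
          } catch (RuntimeException | NoClassDefFoundError e) {
            // Sketch choice: a visitor that throws is skipped instead of failing the merge.
          }
        }
        return out;
      }
    };
  }

  // A visitor that throws (or references a missing class) counts as "not defined".
  static <T> boolean safeIsDefinedAt(PartialFn<T, ?> f, T x) {
    try {
      return f.isDefinedAt(x);
    } catch (RuntimeException | NoClassDefFoundError e) {
      return false;
    }
  }
}
```

For example, merging a function defined on even numbers with one defined on positive numbers gives a function whose apply(4) returns the output of both (in registration order), while apply(3) returns only the second function's output.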
Schema Construction: The schemaFacet method converts a Spark StructType into an OpenLineage SchemaDatasetFacet, and toStructType converts a list of Spark Attribute objects back into a StructType. These are used whenever schema information needs to be attached to dataset lineage events.
OpenLineage Facet Construction: The class provides factory methods for creating common OpenLineage facets:
- datasourceFacet: Creates a DatasourceDatasetFacet from a namespace URI.
- parentRunFacet: Creates a ParentRunFacet with parent and root job/run information, supporting the OpenLineage run hierarchy model.
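The namespace string passed to datasourceFacet can be produced by namespaceUri (listed in the signature), whose documented output is scheme://authority, or only the scheme when the URI has no authority. A minimal sketch of that contract using java.net.URI follows; NamespaceSketch is a hypothetical class name and this is not the PlanUtils source.

```java
import java.net.URI;

// Hypothetical sketch of the documented namespaceUri contract.
class NamespaceSketch {
  static String namespaceUri(URI outputPath) {
    String authority = outputPath.getAuthority();
    // No authority (e.g. "file:/tmp/out"): the namespace is just the scheme.
    if (authority == null) {
      return outputPath.getScheme();
    }
    // Otherwise combine scheme and authority, e.g. "hdfs://namenode:8020".
    return outputPath.getScheme() + "://" + authority;
  }
}
```

The resulting string is what would then be handed to PlanUtils.datasourceFacet(openLineage, namespaceUri).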
Path Resolution: Two implementations of directory path resolution are provided:
- getDirectoryPathOl: The original OpenLineage implementation, which uses the Hadoop filesystem API to determine whether a path is a file (returning its parent) or a directory (returning it unchanged).
- getDirectoryPath: A DataHub-specific replacement that reads spark.datahub.* configuration properties and applies DataHub PathSpec transformations via HdfsPathDataset, enabling custom path normalization for lineage tracking.
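The file-versus-directory decision in getDirectoryPathOl can be sketched against the local filesystem. In this sketch java.nio.file stands in for the Hadoop FileSystem API used by the real method, DirectoryPathSketch is a hypothetical name, and the DataHub PathSpec step of getDirectoryPath is not modeled at all.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-filesystem analogue of getDirectoryPathOl.
class DirectoryPathSketch {
  static Path directoryPath(Path p) {
    if (Files.isRegularFile(p)) {
      // A data file: lineage is tracked at its containing directory.
      return p.getParent();
    }
    // A directory, or a path that does not exist or cannot be read, is returned unchanged.
    return p;
  }
}
```

Collapsing individual part files such as part-00000.parquet to their table directory keeps one lineage dataset per table instead of one per file.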
Safe Invocation Helpers: The class provides defensive wrappers for Scala partial functions:
- safeIsDefinedAt: Calls isDefinedAt on a partial function, catching ClassCastException, general exceptions, and NoClassDefFoundError to prevent visitor failures from propagating.
- safeIsInstanceOf: A class-name-based instanceof check that does not fail if the target class is not on the classpath.
- safeApply: Calls apply on a partial function, returning an empty list on any error.
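A minimal sketch of the defensive pattern behind safeIsInstanceOf and safeApply, assuming a java.util.function.Function stand-in for the Scala partial function; SafeInvokeSketch is a hypothetical name and this is not the PlanUtils source. Loading the class by name and catching ClassNotFoundException is what lets the check return false, rather than throw, when an optional connector is absent.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the safe-invocation helpers.
class SafeInvokeSketch {
  // instanceof by class name: false instead of an error when the class is missing.
  static boolean safeIsInstanceOf(Object instance, String className) {
    try {
      Class<?> c = Class.forName(className, false, Thread.currentThread().getContextClassLoader());
      return c.isInstance(instance);
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  // Apply-or-empty: any runtime failure or linkage error yields an empty list.
  static <D, T> List<T> safeApply(Function<D, List<T>> fn, D x) {
    try {
      return fn.apply(x);
    } catch (RuntimeException | LinkageError e) {
      return Collections.emptyList();
    }
  }
}
```

Passing false as the initialize argument to Class.forName avoids running static initializers of the probed class just to answer a type check.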
RDD Path Discovery: The findRDDPaths method collects distinct data location directories from a list of RDDs by delegating to RddPathUtils.
Usage
This utility class is used extensively by plan visitors (SaveIntoDataSourceCommandVisitor, KafkaRelationVisitor, etc.) and the core lineage extraction framework. The merge method is central to the visitor dispatch mechanism. The safe invocation helpers ensure robustness when dealing with Spark's heterogeneous classloader environment, where visitor functions may reference classes not available at runtime. Path resolution methods are used whenever file-based datasets are encountered.
Code Reference
Source Location
- Repository: Datahub_project_Datahub
- File: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
- Lines: 1-311
Signature
```java
public class PlanUtils {
  public static <T, D> OpenLineageAbstractPartialFunction<T, Collection<D>> merge(
      Collection<? extends PartialFunction<T, ? extends Collection<D>>> fns);
  public static OpenLineage.SchemaDatasetFacet schemaFacet(
      OpenLineage openLineage, StructType structType);
  public static StructType toStructType(List<Attribute> attributes);
  public static String namespaceUri(URI outputPath);
  public static OpenLineage.DatasourceDatasetFacet datasourceFacet(
      OpenLineage openLineage, String namespaceUri);
  public static OpenLineage.ParentRunFacet parentRunFacet(
      UUID parentRunId, String parentJobName, String parentJobNamespace,
      UUID rootParentRunId, String rootParentJobName, String rootParentJobNamespace);
  public static Path getDirectoryPathOl(Path p, Configuration hadoopConf);
  public static Path getDirectoryPath(Path p, Configuration hadoopConf);
  public static List<Path> findRDDPaths(List<RDD<?>> fileRdds);
  public static boolean safeIsInstanceOf(Object instance, String classCanonicalName);
  public static boolean safeIsDefinedAt(PartialFunction pfn, Object x);
  public static <T, D> List<T> safeApply(PartialFunction<D, List<T>> pfn, D x);
}
```
Import
```java
import io.openlineage.spark.agent.util.PlanUtils;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| fns | Collection<? extends PartialFunction<T, ? extends Collection<D>>> | Yes | A collection of Scala partial functions (plan visitors) to be merged into a single dispatch function. |
| openLineage | OpenLineage | Yes | The OpenLineage client instance used as a factory for constructing facet objects. |
| structType | StructType | Yes | A Spark schema to convert into an OpenLineage schema facet. |
| attributes | List<Attribute> | Yes | A list of Spark plan output attributes to convert into a StructType. |
| outputPath | URI | Yes | A URI from which to extract the namespace (scheme, plus authority when present). |
| namespaceUri | String | Yes | The namespace URI from which datasourceFacet builds the facet. |
| p | Path | Yes | A Hadoop Path to resolve to a directory path, potentially applying DataHub PathSpec transformations. |
| hadoopConf | Configuration | Yes | Hadoop configuration used for filesystem operations during path resolution. |
| parentRunId / rootParentRunId | UUID | Yes | Run identifiers for constructing the parent run facet hierarchy. |
| parentJobName / rootParentJobName | String | Yes | Job names for constructing the parent run facet hierarchy. |
| parentJobNamespace / rootParentJobNamespace | String | Yes | Job namespaces for constructing the parent run facet hierarchy. |
| instance | Object | Yes | An object to check against a class name in safeIsInstanceOf. |
| classCanonicalName | String | Yes | The canonical class name to check against in safeIsInstanceOf. |
| pfn | PartialFunction | Yes | A Scala partial function to invoke safely in safeIsDefinedAt or safeApply. |
| x | Object / D | Yes | The input passed to pfn in safeIsDefinedAt or safeApply. |
| fileRdds | List<RDD<?>> | Yes | A list of RDDs from which to extract data location paths. |
Outputs
| Name | Type | Description |
|---|---|---|
| mergedFunction | OpenLineageAbstractPartialFunction<T, Collection<D>> | A single partial function that dispatches to all provided visitors and aggregates their results. |
| schemaFacet | OpenLineage.SchemaDatasetFacet | An OpenLineage schema facet containing field names and types. |
| structType | StructType | A Spark StructType constructed from the given attributes. |
| namespaceUri | String | A namespace URI string of the form scheme://authority, or just the scheme when the URI has no authority. |
| datasourceFacet | OpenLineage.DatasourceDatasetFacet | A datasource facet containing the URI and name of the data source. |
| parentRunFacet | OpenLineage.ParentRunFacet | A parent run facet encoding the run/job hierarchy, including root information. |
| directoryPath | Path | A resolved directory path (the parent if the input is a file), potentially transformed by DataHub PathSpecs. |
| rddPaths | List<Path> | A list of distinct directory paths found across the given RDDs. |
| safeIsInstanceOf result | boolean | Whether the object is an instance of the named class; false if the class is not found. |
| safeIsDefinedAt result | boolean | Whether the partial function is defined at the input; false on any error. |
| safeApply result | List<T> | The result of applying the partial function, or an empty list on any error. |
Usage Examples
```java
// Fragment: assumes context (OpenLineageContext), queryExecution, dataFrame, openLineage,
// hadoopConf, dataSource, parentRunId and rootRunId are already in scope.

// Merge the output-dataset plan visitors registered on the context into one dispatch function.
// Passing the visitors straight to merge avoids declaring a mismatched generic type for them.
OpenLineageAbstractPartialFunction<LogicalPlan, Collection<OutputDataset>> merged =
    PlanUtils.merge(context.getOutputDatasetQueryPlanVisitors());

// Check and apply the merged function
LogicalPlan plan = queryExecution.optimizedPlan();
if (merged.isDefinedAt(plan)) {
  // Results from every visitor defined at this plan node
  Collection<OutputDataset> datasets = merged.apply(plan);
}

// Convert Spark schema to OpenLineage schema facet
StructType schema = dataFrame.schema();
OpenLineage.SchemaDatasetFacet schemaFacet =
    PlanUtils.schemaFacet(openLineage, schema);

// Convert plan output attributes to StructType
List<Attribute> attrs = ScalaConversionUtils.fromSeq(plan.output());
StructType structType = PlanUtils.toStructType(attrs);

// Resolve a directory path with DataHub PathSpec support
Path inputPath = new Path("hdfs://namenode/data/warehouse/my_table/part-00000.parquet");
Path dirPath = PlanUtils.getDirectoryPath(inputPath, hadoopConf);
// Result may be transformed based on spark.datahub.* configuration

// Safely check if an object is an instance of a potentially missing class
boolean isBigQuery = PlanUtils.safeIsInstanceOf(
    dataSource, "com.google.cloud.spark.bigquery.BigQueryRelationProvider");

// Construct a parent run facet for the OpenLineage event hierarchy
OpenLineage.ParentRunFacet parentFacet = PlanUtils.parentRunFacet(
    parentRunId, "parent_job", "namespace",
    rootRunId, "root_job", "namespace");
```
Related Pages
- Environment:Datahub_project_Datahub_Spark_Lineage_Environment
- SaveIntoDataSourceCommandVisitor - Primary consumer of PlanUtils methods for schema conversion and safe type checking
- SparkOpenLineageExtensionVisitorWrapper - Works alongside plan utilities during the visitor dispatch pipeline