

Implementation:Datahub project Datahub SparkPlanUtils

From Leeroopedia


Knowledge Sources
Domains Spark_Lineage, OpenLineage
Last Updated 2026-02-10 00:00 GMT

Overview

A utility class providing helper functions for traversing Spark logical plans, constructing OpenLineage facets, resolving HDFS paths with DataHub PathSpec support, and safely invoking Scala partial functions during lineage extraction.

Description

PlanUtils is a central utility class used throughout the Spark lineage extraction pipeline. It provides several categories of functionality:

Partial Function Merging: The merge method combines a collection of Scala PartialFunction instances into a single OpenLineageAbstractPartialFunction. The merged function is defined for an input if at least one of the provided functions is defined for it; when applied, it invokes every provided function that is defined for the input, in collection order, and concatenates their results into one collection. This is the core mechanism by which multiple plan visitors are composed to handle different types of logical plan nodes.
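The merge semantics described above can be illustrated with a small, self-contained Java sketch. It is not the PlanUtils source: PartialFn is a hypothetical stand-in for scala.PartialFunction so the example runs without Scala, and MergeSketch, of, and safeIsDefinedAt are illustrative names. Skipping a visitor whose apply throws is an assumption modeled on the safe invocation helpers.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for scala.PartialFunction so the sketch runs without Scala.
interface PartialFn<T, R> {
  boolean isDefinedAt(T x);

  R apply(T x);
}

class MergeSketch {
  // Hypothetical helper: build a partial function from a guard and a body.
  static <T, R> PartialFn<T, R> of(Predicate<T> guard, Function<T, R> body) {
    return new PartialFn<T, R>() {
      @Override
      public boolean isDefinedAt(T x) {
        return guard.test(x);
      }

      @Override
      public R apply(T x) {
        return body.apply(x);
      }
    };
  }

  // Reports "not defined" instead of propagating a visitor failure.
  static <T> boolean safeIsDefinedAt(PartialFn<T, ?> fn, T x) {
    try {
      return fn.isDefinedAt(x);
    } catch (RuntimeException | LinkageError e) {
      return false;
    }
  }

  // Defined if ANY function is defined; apply() concatenates the results of EVERY
  // function defined at the input, skipping functions whose apply() throws.
  static <T, D> PartialFn<T, Collection<D>> merge(
      Collection<? extends PartialFn<T, ? extends Collection<D>>> fns) {
    return new PartialFn<T, Collection<D>>() {
      @Override
      public boolean isDefinedAt(T x) {
        // anyMatch stops at the first function that accepts the input
        return fns.stream().anyMatch(fn -> safeIsDefinedAt(fn, x));
      }

      @Override
      public Collection<D> apply(T x) {
        return fns.stream()
            .filter(fn -> safeIsDefinedAt(fn, x))
            .map(fn -> {
              try {
                Collection<D> result = fn.apply(x);
                return result;
              } catch (RuntimeException | LinkageError e) {
                return null; // a failing visitor contributes nothing
              }
            })
            .filter(Objects::nonNull)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
      }
    };
  }
}
```

For example, merging a visitor defined for paths ending in .csv with one defined for any non-empty path yields a function whose apply("a.csv") returns both visitors' results in order, while isDefinedAt("") is false because no visitor accepts the empty string.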

Schema Construction: The schemaFacet method converts a Spark StructType into an OpenLineage SchemaDatasetFacet, and toStructType converts a list of Spark Attribute objects back into a StructType. These are used whenever schema information needs to be attached to dataset lineage events.
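As a rough illustration of the two conversions, the sketch below uses hypothetical record types (AttrLite, StructFieldLite, SchemaFieldLite) in place of Spark's Attribute and StructField and OpenLineage's schema field class, all of which carry more detail than shown. It only demonstrates the shape of the mapping: one output field per input, in order, keeping the name and type.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins for Spark's Attribute / StructField and
// OpenLineage's schema facet field; the real classes carry more information.
record AttrLite(String name, String typeName, boolean nullable) {}

record StructFieldLite(String name, String typeName, boolean nullable) {}

record SchemaFieldLite(String name, String type) {}

class SchemaSketch {
  // Idea of toStructType: one struct field per plan output attribute, order preserved.
  static List<StructFieldLite> toStructType(List<AttrLite> attributes) {
    return attributes.stream()
        .map(a -> new StructFieldLite(a.name(), a.typeName(), a.nullable()))
        .collect(Collectors.toList());
  }

  // Idea of schemaFacet: keep each field's name and type for the lineage event.
  static List<SchemaFieldLite> schemaFacet(List<StructFieldLite> structType) {
    return structType.stream()
        .map(f -> new SchemaFieldLite(f.name(), f.typeName()))
        .collect(Collectors.toList());
  }
}
```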

OpenLineage Facet Construction: The class provides factory methods for creating common OpenLineage facets:

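  • datasourceFacet -- Creates a DatasourceDatasetFacet from a namespace URI, such as the string returned by namespaceUri (scheme://authority, or just the scheme when the URI has no authority).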
  • parentRunFacet -- Creates a ParentRunFacet with parent and root job/run information, supporting the OpenLineage run hierarchy model.
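The namespace string described for namespaceUri in the Outputs table (scheme://authority, or just the scheme) can be sketched with java.net.URI alone. This is an illustrative reimplementation under that description, not the PlanUtils source; NamespaceSketch is a hypothetical name.

```java
import java.net.URI;
import java.util.Optional;

class NamespaceSketch {
  // "scheme://authority" when the URI has an authority, otherwise just "scheme".
  static String namespaceUri(URI outputPath) {
    return Optional.ofNullable(outputPath.getAuthority())
        .map(authority -> outputPath.getScheme() + "://" + authority)
        .orElse(outputPath.getScheme());
  }
}
```

Under this sketch, hdfs://namenode:8020/data/warehouse/my_table maps to hdfs://namenode:8020, and file:/tmp/data, which has no authority, maps to file.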

Path Resolution: Two implementations of directory path resolution are provided:

  • getDirectoryPathOl -- The original OpenLineage implementation, which uses the Hadoop filesystem API to check whether a path is a file. A file resolves to its parent directory; a directory is returned unchanged.
  • getDirectoryPath -- A DataHub-specific replacement that reads spark.datahub.* configuration properties and applies DataHub PathSpec transformations via HdfsPathDataset, enabling custom path normalization for lineage tracking.
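The file-versus-directory rule of getDirectoryPathOl can be sketched with java.nio.file standing in for Hadoop's FileSystem and Path classes, an assumption made so the example runs without Hadoop. The DataHub PathSpec transformations applied by getDirectoryPath are not modeled, since this page does not specify them; DirectoryPathSketch is a hypothetical name.

```java
import java.nio.file.Files;
import java.nio.file.Path;

class DirectoryPathSketch {
  // java.nio stands in for Hadoop's FileSystem here: a regular file resolves to its
  // parent directory; anything else (a directory, or a path that cannot be confirmed
  // to be a regular file) is returned unchanged.
  static Path getDirectoryPath(Path p) {
    return Files.isRegularFile(p) ? p.getParent() : p;
  }
}
```

Given a part-00000.parquet file inside a table directory, the sketch returns the table directory; given the directory itself, it returns the directory unchanged.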

Safe Invocation Helpers: The class provides defensive wrappers for Scala partial functions:

  • safeIsDefinedAt -- Calls isDefinedAt on a partial function, catching ClassCastException, general exceptions, and NoClassDefFoundError to prevent visitor failures from propagating.
  • safeIsInstanceOf -- A class-name-based instanceof check that does not fail if the target class is not on the classpath.
  • safeApply -- Calls apply on a partial function, returning an empty list on any error.
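The defensive pattern behind safeIsInstanceOf and safeApply can be sketched in plain Java. This is an assumption-based illustration rather than the PlanUtils code: it uses Class.forName and Class.isInstance for the name-based check, and java.util.function.Function in place of scala.PartialFunction for safeApply; SafeInvokeSketch is a hypothetical name.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class SafeInvokeSketch {
  // Name-based instance check: returns false instead of throwing when the class
  // cannot be loaded (for example, an optional connector missing from the classpath).
  static boolean safeIsInstanceOf(Object instance, String className) {
    try {
      return Class.forName(className).isInstance(instance);
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  // Applies fn and returns an empty list on any failure instead of propagating it.
  // java.util.function.Function stands in for scala.PartialFunction here.
  static <D, T> List<T> safeApply(Function<D, List<T>> fn, D x) {
    try {
      return fn.apply(x);
    } catch (RuntimeException | LinkageError e) {
      return Collections.emptyList();
    }
  }
}
```

Returning false or an empty list keeps a missing optional class, or a failing visitor, from aborting lineage extraction for the whole plan.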

RDD Path Discovery: The findRDDPaths method collects distinct data location directories from a list of RDDs by delegating to RddPathUtils.
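The de-duplication step of findRDDPaths can be sketched as follows. The per-RDD location lookup that PlanUtils delegates to RddPathUtils is not reproduced; instead, each inner list is a hypothetical precomputed list of locations for one RDD, and RddPathsSketch is an illustrative name.

```java
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

class RddPathsSketch {
  // Hypothetical input: one list of data locations per RDD. Locations are
  // flattened and de-duplicated, keeping first-seen order.
  static List<Path> findRddPaths(List<List<Path>> locationsPerRdd) {
    return locationsPerRdd.stream()
        .flatMap(List::stream)
        .distinct()
        .collect(Collectors.toList());
  }
}
```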

Usage

This utility class is used extensively by plan visitors (SaveIntoDataSourceCommandVisitor, KafkaRelationVisitor, etc.) and the core lineage extraction framework. The merge method is central to the visitor dispatch mechanism. The safe invocation helpers ensure robustness when dealing with Spark's heterogeneous classloader environment, where visitor functions may reference classes not available at runtime. Path resolution methods are used whenever file-based datasets are encountered.

Code Reference

Source Location

Signature

public class PlanUtils {

    public static <T, D> OpenLineageAbstractPartialFunction<T, Collection<D>> merge(
        Collection<? extends PartialFunction<T, ? extends Collection<D>>> fns);

    public static OpenLineage.SchemaDatasetFacet schemaFacet(
        OpenLineage openLineage, StructType structType);

    public static StructType toStructType(List<Attribute> attributes);

    public static String namespaceUri(URI outputPath);

    public static OpenLineage.DatasourceDatasetFacet datasourceFacet(
        OpenLineage openLineage, String namespaceUri);

    public static OpenLineage.ParentRunFacet parentRunFacet(
        UUID parentRunId, String parentJobName, String parentJobNamespace,
        UUID rootParentRunId, String rootParentJobName, String rootParentJobNamespace);

    public static Path getDirectoryPathOl(Path p, Configuration hadoopConf);

    public static Path getDirectoryPath(Path p, Configuration hadoopConf);

    public static List<Path> findRDDPaths(List<RDD<?>> fileRdds);

    public static boolean safeIsInstanceOf(Object instance, String classCanonicalName);

    public static boolean safeIsDefinedAt(PartialFunction pfn, Object x);

    public static <T, D> List<T> safeApply(PartialFunction<D, List<T>> pfn, D x);
}

Import

import io.openlineage.spark.agent.util.PlanUtils;

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| fns | Collection<? extends PartialFunction<T, ? extends Collection<D>>> | Yes | A collection of Scala partial functions (plan visitors) to merge into a single dispatch function. |
| openLineage | OpenLineage | Yes | The OpenLineage client instance used as a factory for constructing facet objects. |
| structType | StructType | Yes | A Spark schema to convert into an OpenLineage schema facet. |
| attributes | List<Attribute> | Yes | A list of Spark plan output attributes to convert into a StructType. |
| outputPath | URI | Yes | A URI from which to extract the namespace (scheme + authority). |
| namespaceUri | String | Yes | The namespace URI from which datasourceFacet builds the facet. |
| p | Path | Yes | A Hadoop Path to resolve to a directory path, potentially applying DataHub PathSpec transformations. |
| hadoopConf | Configuration | Yes | Hadoop configuration used for filesystem operations during path resolution. |
| parentRunId / rootParentRunId | UUID | Yes | Run identifiers of the parent and root runs in the parent run facet. |
| parentJobName / rootParentJobName | String | Yes | Job names of the parent and root jobs in the parent run facet. |
| parentJobNamespace / rootParentJobNamespace | String | Yes | Job namespaces of the parent and root jobs in the parent run facet. |
| instance | Object | Yes | The object to check against a class name in safeIsInstanceOf. |
| classCanonicalName | String | Yes | The canonical class name to check against in safeIsInstanceOf. |
| pfn | PartialFunction | Yes | A Scala partial function to invoke safely in safeIsDefinedAt or safeApply. |
| x | Object (safeIsDefinedAt) / D (safeApply) | Yes | The input passed to pfn. |
| fileRdds | List<RDD<?>> | Yes | A list of RDDs from which to extract data location paths. |

Outputs

| Name | Type | Description |
|------|------|-------------|
| mergedFunction | OpenLineageAbstractPartialFunction<T, Collection<D>> | A single partial function that dispatches to all provided visitors and aggregates their results. |
| schemaFacet | OpenLineage.SchemaDatasetFacet | An OpenLineage schema facet containing field names and types. |
| structType | StructType | A Spark StructType constructed from the given attributes. |
| namespaceUri | String | A namespace URI string of the form scheme://authority, or just the scheme. |
| datasourceFacet | OpenLineage.DatasourceDatasetFacet | A datasource facet containing the URI and name of the data source. |
| parentRunFacet | OpenLineage.ParentRunFacet | A parent run facet encoding the run/job hierarchy, including root information. |
| directoryPath | Path | A resolved directory path (the parent if the input is a file), potentially transformed by DataHub PathSpecs. |
| rddPaths | List<Path> | A list of distinct directory paths found across the given RDDs. |
| safeIsInstanceOf result | boolean | Whether the object is an instance of the named class; false if the class is not found. |
| safeIsDefinedAt result | boolean | Whether the partial function is defined at the input; false on any error. |
| safeApply result | List<T> | The result of applying the partial function, or an empty list on any error. |

Usage Examples

// Merge multiple plan visitors into a single dispatch function.
// merge() accepts Collection<? extends PartialFunction<T, ? extends Collection<D>>>,
// so the context's visitor list can be passed as-is without redeclaring its type.
OpenLineageAbstractPartialFunction<LogicalPlan, Collection<OutputDataset>> merged =
    PlanUtils.merge(context.getOutputDatasetQueryPlanVisitors());

// Check and apply the merged function
LogicalPlan plan = queryExecution.optimizedPlan();
if (merged.isDefinedAt(plan)) {
    // Concatenated results from every visitor that is defined for this plan
    Collection<OutputDataset> datasets = merged.apply(plan);
}

// Convert Spark schema to OpenLineage schema facet
StructType schema = dataFrame.schema();
OpenLineage.SchemaDatasetFacet schemaFacet =
    PlanUtils.schemaFacet(openLineage, schema);

// Convert plan output attributes to StructType
List<Attribute> attrs = ScalaConversionUtils.fromSeq(plan.output());
StructType structType = PlanUtils.toStructType(attrs);

// Resolve a directory path with DataHub PathSpec support
Path inputPath = new Path("hdfs://namenode/data/warehouse/my_table/part-00000.parquet");
Path dirPath = PlanUtils.getDirectoryPath(inputPath, hadoopConf);
// Result may be transformed based on spark.datahub.* configuration

// Safely check if an object is an instance of a potentially missing class
boolean isBigQuery = PlanUtils.safeIsInstanceOf(
    dataSource, "com.google.cloud.spark.bigquery.BigQueryRelationProvider");

// Construct a parent run facet for OpenLineage event hierarchy
OpenLineage.ParentRunFacet parentFacet = PlanUtils.parentRunFacet(
    parentRunId, "parent_job", "namespace",
    rootRunId, "root_job", "namespace");
