Implementation:Datahub project Datahub SparkDatasetExtractor

From Leeroopedia


Knowledge Sources
Domains Spark_Lineage
Last Updated 2026-02-10 00:00 GMT

Overview

DEPRECATED: This implementation is part of the legacy spark-lineage-legacy module and has been superseded by acryl-spark-lineage. See Heuristic:Datahub_project_Datahub_Warning_Deprecated_Spark_Lineage_Legacy for migration guidance.

A static utility class that maps Apache Spark LogicalPlan nodes, SparkPlan nodes, and BaseRelation objects to DataHub SparkDataset instances, enabling dataset extraction from Spark query execution plans for lineage tracking.

Description

DatasetExtractor is the core dataset identification component of the legacy Spark lineage listener. It uses three static maps to dispatch plan nodes to dataset extraction logic:

PLAN_TO_DATASET -- Maps LogicalPlan subclasses to dataset extractors:

  • InsertIntoHadoopFsRelationCommand -- Extracts HDFS path or catalog table datasets
  • LogicalRelation -- Delegates to REL_TO_DATASET based on the underlying BaseRelation
  • SaveIntoDataSourceCommand -- Extracts JDBC or HDFS path datasets from command options
  • CreateDataSourceTableAsSelectCommand -- Extracts catalog table datasets
  • CreateHiveTableAsSelectCommand -- Extracts Hive catalog table datasets
  • InsertIntoHiveTable -- Extracts Hive table datasets
  • HiveTableRelation -- Extracts Hive table metadata
  • InMemoryRelation -- Traverses cached plan leaves via SPARKPLAN_TO_DATASET

SPARKPLAN_TO_DATASET -- Maps physical SparkPlan nodes:

  • FileSourceScanExec -- Delegates to REL_TO_DATASET
  • HiveTableScanExec -- Delegates to PLAN_TO_DATASET
  • RowDataSourceScanExec -- Delegates to REL_TO_DATASET
  • InMemoryTableScanExec -- Delegates to PLAN_TO_DATASET

REL_TO_DATASET -- Maps BaseRelation types:

  • HadoopFsRelation -- Extracts root paths, resolves directories
  • JDBCRelation -- Extracts JDBC URL and table name

Output filtering: The OUTPUT_CMD set ensures that output commands (insert/create statements) are only returned when outputNode=true, preventing them from being double-counted as inputs.
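A minimal sketch of this filtering rule, under stated assumptions: `Plan`, `InsertCmd`, and `ScanNode` are hypothetical stand-ins for Spark plan classes, and a dataset is just a string. The guard shows one way a set of write-command classes can be used to suppress output nodes when the caller is collecting inputs.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class OutputFilterSketch {
    // Hypothetical stand-ins for plan node classes; each carries a dataset name.
    static class Plan {
        final String dataset;
        Plan(String dataset) { this.dataset = dataset; }
    }

    static final class InsertCmd extends Plan {
        InsertCmd(String dataset) { super(dataset); }
    }

    static final class ScanNode extends Plan {
        ScanNode(String dataset) { super(dataset); }
    }

    // Classes treated as output (write) commands.
    static final Set<Class<? extends Plan>> OUTPUT_CMD = new HashSet<>();

    static {
        OUTPUT_CMD.add(InsertCmd.class);
    }

    static Optional<String> asDataset(Plan plan, boolean outputNode) {
        // A write command seen while collecting inputs is skipped,
        // so it is never reported as an input dataset.
        if (!outputNode && OUTPUT_CMD.contains(plan.getClass())) {
            return Optional.empty();
        }
        return Optional.of(plan.dataset);
    }

    public static void main(String[] args) {
        Plan insert = new InsertCmd("warehouse.orders");
        System.out.println(asDataset(insert, true).isPresent());
        System.out.println(asDataset(insert, false).isPresent());
    }
}
```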

Configuration: Dataset properties (environment, platform instance, Hive platform alias, include scheme, partition pattern removal) are read from the DataHub Spark config.
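To make two of these properties concrete, the sketch below shows how an include-scheme flag and a partition-pattern regex could plausibly shape a path-based dataset name. The method name `normalize`, its parameters, and the sample pattern are hypothetical; the actual config keys and the exact normalization DataHub applies are not given on this page.

```java
import java.net.URI;

public class PathNormalizationSketch {
    // Hypothetical helper: includeScheme and removePartitionPattern mirror the
    // "include scheme" and "partition pattern removal" properties by name only.
    static String normalize(String rawPath, boolean includeScheme, String removePartitionPattern) {
        // Without the scheme, keep only the path component of the URI.
        String path = includeScheme ? rawPath : URI.create(rawPath).getPath();
        // Strip whatever the regex matches, e.g. a trailing partition directory.
        if (removePartitionPattern != null && !removePartitionPattern.isEmpty()) {
            path = path.replaceAll(removePartitionPattern, "");
        }
        return path;
    }

    public static void main(String[] args) {
        String raw = "hdfs://nn:8020/data/events/dt=2024-01-01";
        System.out.println(normalize(raw, true, "/dt=[^/]+"));
        System.out.println(normalize(raw, false, "/dt=[^/]+"));
    }
}
```

Removing partition segments makes every partition of a table resolve to the same dataset name, which is why such a pattern is usually applied before the dataset is identified.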

Usage

Use DatasetExtractor within the DatahubSparkListener to identify input and output datasets from Spark SQL execution plans. It is called during SqlStartTask.run() to build DatasetLineage objects.

Code Reference

Source Location

Signature

@Slf4j
public class DatasetExtractor {

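    // Primary entry point. It has no access modifier, so it is package-private
    // and can only be called from within the same package.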
    static Optional<? extends Collection<SparkDataset>> asDataset(
        LogicalPlan logicalPlan, SparkContext ctx, boolean outputNode);

    // Internal interfaces
    private static interface PlanToDataset {
        Optional<? extends Collection<SparkDataset>> fromPlanNode(
            LogicalPlan plan, SparkContext ctx, Config datahubConfig);
    }

    private static interface RelationToDataset {
        Optional<? extends Collection<SparkDataset>> fromRelation(
            BaseRelation rel, SparkContext ctx, Config datahubConfig);
    }

    private static interface SparkPlanToDataset {
        Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(
            SparkPlan plan, SparkContext ctx, Config datahubConfig);
    }
}

Import

import datahub.spark.DatasetExtractor;

I/O Contract

Input        Type          Description
logicalPlan  LogicalPlan   Spark SQL optimized logical plan node
ctx          SparkContext  Active Spark context for Hadoop config access
outputNode   boolean       True if extracting output (sink) datasets

Output                  Type                                          Description
SparkDataset collection Optional<? extends Collection<SparkDataset>>  Extracted datasets: HdfsPathDataset, JdbcDataset, or CatalogTableDataset

Usage Examples

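// Assumes optimizedPlan and childPlan are LogicalPlan nodes and
// sparkContext is the active SparkContext.
// Extract output datasets from a logical plan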
Optional<? extends Collection<SparkDataset>> outputDS =
    DatasetExtractor.asDataset(optimizedPlan, sparkContext, true);

// Extract input datasets from a child plan node
Optional<? extends Collection<SparkDataset>> inputDS =
    DatasetExtractor.asDataset(childPlan, sparkContext, false);

// Datasets can be HdfsPathDataset, JdbcDataset, or CatalogTableDataset
outputDS.ifPresent(datasets -> {
    for (SparkDataset ds : datasets) {
        System.out.println("Dataset URN: " + ds.urn());
    }
});
