Implementation:Datahub project Datahub SparkDatasetExtractor
| Knowledge Sources | |
|---|---|
| Domains | Spark_Lineage |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
DEPRECATED: This implementation is part of the legacy spark-lineage-legacy module and has been superseded by acryl-spark-lineage. See Heuristic:Datahub_project_Datahub_Warning_Deprecated_Spark_Lineage_Legacy for migration guidance.
A static utility class that maps Apache Spark LogicalPlan nodes, SparkPlan nodes, and BaseRelation objects to DataHub SparkDataset instances, enabling dataset extraction from Spark query execution plans for lineage tracking.
Description
DatasetExtractor is the core dataset identification component of the legacy Spark lineage listener. It uses three static maps to dispatch plan nodes to dataset extraction logic:
PLAN_TO_DATASET -- Maps LogicalPlan subclasses to dataset extractors:
- InsertIntoHadoopFsRelationCommand -- Extracts HDFS path or catalog table datasets
- LogicalRelation -- Delegates to REL_TO_DATASET based on the underlying BaseRelation
- SaveIntoDataSourceCommand -- Extracts JDBC or HDFS path datasets from command options
- CreateDataSourceTableAsSelectCommand -- Extracts catalog table datasets
- CreateHiveTableAsSelectCommand -- Extracts Hive catalog table datasets
- InsertIntoHiveTable -- Extracts Hive table datasets
- HiveTableRelation -- Extracts Hive table metadata
- InMemoryRelation -- Traverses cached plan leaves via SPARKPLAN_TO_DATASET
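The SaveIntoDataSourceCommand entry above derives its target from the command's options. The following is a minimal sketch of that classification, assuming the decision keys off Spark's standard data source options url and dbtable (for JDBC sinks) and path (for file-based sinks). SaveOptionsSketch and the string labels it returns are hypothetical stand-ins for JdbcDataset and HdfsPathDataset, not the module's own code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: classify a SaveIntoDataSourceCommand-style write from its options.
// Assumes Spark's standard data source option names "url", "dbtable", and "path".
final class SaveOptionsSketch {
  private SaveOptionsSketch() {}

  /** Returns a label describing the sink, or empty if no JDBC or path target is present. */
  static Optional<String> classify(Map<String, String> options) {
    String url = options.getOrDefault("url", "");
    if (url.startsWith("jdbc:")) {
      // JDBC sink: the URL identifies the database and "dbtable" identifies the table.
      String table = options.get("dbtable");
      return table == null ? Optional.empty() : Optional.of("jdbc " + url + " " + table);
    }
    if (options.containsKey("path")) {
      // File-based sink: the "path" option identifies the target location.
      return Optional.of("path " + options.get("path"));
    }
    // Neither target form is present, so no dataset is reported.
    return Optional.empty();
  }
}
```

For example, options {url=jdbc:postgresql://db:5432/sales, dbtable=orders} classify as a JDBC sink, {path=/data/out} as a path sink, and a console sink with neither option yields an empty result.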
SPARKPLAN_TO_DATASET -- Maps physical SparkPlan nodes:
- FileSourceScanExec -- Delegates to REL_TO_DATASET
- HiveTableScanExec -- Delegates to PLAN_TO_DATASET
- RowDataSourceScanExec -- Delegates to REL_TO_DATASET
- InMemoryTableScanExec -- Delegates to PLAN_TO_DATASET
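The delegation between maps described above can be modeled with class-keyed lookup tables. This is a self-contained sketch under stated assumptions: the records FileRelation, LogicalRelationNode, CachedRelationNode, and FileScanNode are hypothetical stand-ins for HadoopFsRelation, LogicalRelation, InMemoryRelation, and FileSourceScanExec; datasets are plain path strings instead of SparkDataset objects; and the Extractor interface loosely mirrors the PlanToDataset family without the SparkContext and Config parameters. It requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for Spark node types; the real maps key on Spark's own classes.
record FileRelation(String path) {}                       // ~ HadoopFsRelation
record LogicalRelationNode(Object relation) {}            // ~ LogicalRelation
record CachedRelationNode(List<Object> physicalLeaves) {} // ~ InMemoryRelation
record FileScanNode(Object relation) {}                   // ~ FileSourceScanExec

final class DispatchSketch {
  @FunctionalInterface
  interface Extractor {
    Optional<List<String>> extract(Object node);
  }

  static final Map<Class<?>, Extractor> PLAN_TO_DATASET = new HashMap<>();
  static final Map<Class<?>, Extractor> SPARKPLAN_TO_DATASET = new HashMap<>();
  static final Map<Class<?>, Extractor> REL_TO_DATASET = new HashMap<>();

  static {
    // Relation map: a file relation yields its path.
    REL_TO_DATASET.put(FileRelation.class, r -> Optional.of(List.of(((FileRelation) r).path())));
    // Logical relation: delegate to the relation map using the wrapped relation.
    PLAN_TO_DATASET.put(LogicalRelationNode.class,
        p -> lookup(REL_TO_DATASET, ((LogicalRelationNode) p).relation()));
    // Physical file scan: also delegates to the relation map.
    SPARKPLAN_TO_DATASET.put(FileScanNode.class,
        p -> lookup(REL_TO_DATASET, ((FileScanNode) p).relation()));
    // Cached relation: gather datasets from each physical leaf via the physical-plan map.
    PLAN_TO_DATASET.put(CachedRelationNode.class, p -> {
      List<String> all = new ArrayList<>();
      for (Object leaf : ((CachedRelationNode) p).physicalLeaves()) {
        lookup(SPARKPLAN_TO_DATASET, leaf).ifPresent(all::addAll);
      }
      return all.isEmpty() ? Optional.empty() : Optional.of(all);
    });
  }

  private DispatchSketch() {}

  // Dispatch on the node's exact runtime class; unsupported types yield an empty result.
  static Optional<List<String>> lookup(Map<Class<?>, Extractor> map, Object node) {
    Extractor extractor = map.get(node.getClass());
    return extractor == null ? Optional.empty() : extractor.extract(node);
  }
}
```

Keying on the exact runtime class keeps dispatch to a single map lookup, at the cost that subclasses of a registered type are not matched automatically.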
REL_TO_DATASET -- Maps BaseRelation types:
- HadoopFsRelation -- Extracts root paths, resolves directories
- JDBCRelation -- Extracts JDBC URL and table name
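A minimal sketch of the HadoopFsRelation step, assuming "resolves directories" means that a root path pointing at a file is replaced by its parent directory and duplicates are then dropped. It uses java.nio.file.Path in place of Hadoop's Path, and the isFile predicate is a hypothetical stand-in for a file-status check against the Hadoop FileSystem; RootPathSketch and resolveDirectories are illustrative names.

```java
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: normalize relation root paths to directories.
final class RootPathSketch {
  private RootPathSketch() {}

  // Assumption: isFile stands in for a file-status lookup on the real file system.
  static List<Path> resolveDirectories(List<Path> rootPaths, Predicate<Path> isFile) {
    return rootPaths.stream()
        // A root path that is a file is replaced by its parent directory...
        .map(p -> isFile.test(p) && p.getParent() != null ? p.getParent() : p)
        // ...and duplicates are dropped, so sibling files collapse into one directory.
        .distinct()
        .collect(Collectors.toList());
  }
}
```

With roots /data/a/part-0.parquet, /data/a/part-1.parquet, and /data/b (where only the first two are files), the result is [/data/a, /data/b].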
Output filtering: The OUTPUT_CMD set ensures that output commands (insert/create statements) are only returned when outputNode=true, preventing them from being double-counted as inputs.
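The OUTPUT_CMD guard can be sketched as a membership check that runs before dispatch. Assumptions: node types are modeled as plain strings, the set is assumed to contain the five insert/create commands from the PLAN_TO_DATASET list, and OutputFilterSketch with its asDataset signature is hypothetical rather than the module's real method.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the OUTPUT_CMD guard; the real set holds Spark command classes.
final class OutputFilterSketch {
  private OutputFilterSketch() {}

  // Assumed membership: the insert/create commands listed under PLAN_TO_DATASET.
  static final Set<String> OUTPUT_CMD = Set.of(
      "InsertIntoHadoopFsRelationCommand",
      "SaveIntoDataSourceCommand",
      "CreateDataSourceTableAsSelectCommand",
      "CreateHiveTableAsSelectCommand",
      "InsertIntoHiveTable");

  /** Returns the node's datasets, except that write commands are suppressed when extracting inputs. */
  static Optional<List<String>> asDataset(String nodeType, List<String> datasets, boolean outputNode) {
    if (!outputNode && OUTPUT_CMD.contains(nodeType)) {
      // A write command met while collecting inputs is skipped,
      // so its target is not also reported as a source.
      return Optional.empty();
    }
    return Optional.of(datasets);
  }
}
```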
Configuration: Dataset properties (environment, platform instance, Hive platform alias, include scheme, partition pattern removal) are read from the DataHub Spark config.
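Two of these settings, include scheme and partition pattern removal, change how a path becomes a dataset name. The sketch below is illustrative only: the source states just that these properties are read from the DataHub Spark config, so the method, its parameter names, and the exact transformations (dropping a "scheme://" prefix, deleting regex matches) are assumptions, and no real config keys are shown.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: how "include scheme" and "partition pattern removal" could shape a name.
final class PathNameSketch {
  // Matches a leading URI scheme plus "://", e.g. "s3://" or "hdfs://".
  private static final Pattern SCHEME_PREFIX = Pattern.compile("^[A-Za-z][A-Za-z0-9+.-]*://");

  private PathNameSketch() {}

  /**
   * includeScheme: keep or drop the "scheme://" prefix.
   * partitionPattern: regex whose matches are deleted; null disables removal.
   */
  static String datasetName(String path, boolean includeScheme, Pattern partitionPattern) {
    String name = includeScheme ? path : SCHEME_PREFIX.matcher(path).replaceFirst("");
    if (partitionPattern != null) {
      // e.g. "/dt=[^/]+" turns ".../orders/dt=2024-01-01" into ".../orders"
      name = partitionPattern.matcher(name).replaceAll("");
    }
    return name;
  }
}
```

Presumably the point of partition removal is that every partition directory of one table maps to a single dataset rather than one dataset per partition.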
Usage
Use DatasetExtractor within DatahubSparkListener to identify the input and output datasets of Spark SQL execution plans. It is called during SqlStartTask.run() to build DatasetLineage objects. Because asDataset has package-private access, callers must reside in the datahub.spark package.
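The call pattern can be sketched as a tree walk. Assumptions: the root is queried once with outputNode = true to obtain sinks, and every node, root included, is queried with outputNode = false to obtain sources; under that assumption the OUTPUT_CMD guard is what keeps the root's write target out of the sources. PlanNode, LineageSketch, and LineageWalkSketch are hypothetical, and the extractor is passed in as a function so the sketch does not depend on Spark. It requires Java 16+ for records.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical plan node: a type name, the datasets it references, and its children.
record PlanNode(String type, List<String> datasets, List<PlanNode> children) {}

// Hypothetical stand-in for DatasetLineage: sources (inputs) and sinks (outputs).
record LineageSketch(Set<String> sources, Set<String> sinks) {}

final class LineageWalkSketch {
  private LineageWalkSketch() {}

  /** Returns lineage for the plan, or empty if the root yields no output datasets. */
  static Optional<LineageSketch> build(
      PlanNode root, BiFunction<PlanNode, Boolean, Optional<List<String>>> asDataset) {
    // Outputs come from the root only, with outputNode = true.
    Optional<List<String>> outputs = asDataset.apply(root, true);
    if (outputs.isEmpty() || outputs.get().isEmpty()) {
      return Optional.empty();
    }
    Set<String> sources = new LinkedHashSet<>();
    collectInputs(root, asDataset, sources);
    return Optional.of(new LineageSketch(sources, new LinkedHashSet<>(outputs.get())));
  }

  // Pre-order walk over every node, root included, with outputNode = false.
  private static void collectInputs(
      PlanNode node, BiFunction<PlanNode, Boolean, Optional<List<String>>> asDataset,
      Set<String> sources) {
    asDataset.apply(node, false).ifPresent(sources::addAll);
    for (PlanNode child : node.children()) {
      collectInputs(child, asDataset, sources);
    }
  }
}
```

With an extractor that returns empty for output commands when outputNode is false, an InsertIntoHiveTable root over a Project over a scan of /data/in yields the Hive table as the only sink and /data/in as the only source.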
Code Reference
Source Location
- Repository: Datahub_project_Datahub
- File: metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/DatasetExtractor.java
Signature
```java
@Slf4j
public class DatasetExtractor {
  // Primary entry point
  static Optional<? extends Collection<SparkDataset>> asDataset(
      LogicalPlan logicalPlan, SparkContext ctx, boolean outputNode);

  // Internal interfaces
  private static interface PlanToDataset {
    Optional<? extends Collection<SparkDataset>> fromPlanNode(
        LogicalPlan plan, SparkContext ctx, Config datahubConfig);
  }

  private static interface RelationToDataset {
    Optional<? extends Collection<SparkDataset>> fromRelation(
        BaseRelation rel, SparkContext ctx, Config datahubConfig);
  }

  private static interface SparkPlanToDataset {
    Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(
        SparkPlan plan, SparkContext ctx, Config datahubConfig);
  }
}
```
Import
```java
import datahub.spark.DatasetExtractor;
```
I/O Contract
| Input | Type | Description |
|---|---|---|
| logicalPlan | LogicalPlan | Spark SQL optimized logical plan node |
| ctx | SparkContext | Active Spark context for Hadoop config access |
| outputNode | boolean | True if extracting output (sink) datasets |
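| Output | Type | Description |
|---|---|---|
| SparkDataset collection | Optional<? extends Collection<SparkDataset>> | Extracted datasets (HdfsPathDataset, JdbcDataset, or CatalogTableDataset); empty when no dataset can be extracted, for example for an output command when outputNode is false |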
Usage Examples
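```java
package datahub.spark;

import datahub.spark.model.dataset.SparkDataset;
import java.util.Collection;
import java.util.Optional;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

// Example class; it lives in datahub.spark because asDataset is package-private.
class DatasetExtractorExample {
  static void printOutputs(LogicalPlan optimizedPlan, LogicalPlan childPlan, SparkContext sparkContext) {
    // Extract output (sink) datasets from the root of the logical plan
    Optional<? extends Collection<SparkDataset>> outputDS =
        DatasetExtractor.asDataset(optimizedPlan, sparkContext, true);
    // Extract input (source) datasets from a child plan node
    Optional<? extends Collection<SparkDataset>> inputDS =
        DatasetExtractor.asDataset(childPlan, sparkContext, false);
    // Datasets can be HdfsPathDataset, JdbcDataset, or CatalogTableDataset
    outputDS.ifPresent(datasets -> {
      for (SparkDataset ds : datasets) {
        System.out.println("Dataset URN: " + ds.urn());
      }
    });
  }
}
```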
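To make the printed URNs concrete, here is a minimal stand-in for the three dataset kinds. Assumptions: DatasetSketch and its subclasses are hypothetical, path datasets are assumed to use the hdfs platform, and the JDBC and Hive platforms are passed in rather than derived from the URL or the Hive platform alias setting. Only the URN shape urn:li:dataset:(urn:li:dataPlatform:PLATFORM,NAME,ENV) follows DataHub's dataset URN format; platform instances are ignored.

```java
// Hypothetical stand-in for the SparkDataset hierarchy; only the URN shape is DataHub's.
abstract class DatasetSketch {
  private final String urn;

  DatasetSketch(String platform, String name, String env) {
    // DataHub dataset URN: urn:li:dataset:(<platform urn>,<name>,<environment>)
    this.urn = "urn:li:dataset:(urn:li:dataPlatform:" + platform + "," + name + "," + env + ")";
  }

  String urn() {
    return urn;
  }
}

final class PathDatasetSketch extends DatasetSketch {
  // Assumes an HDFS path; other schemes would map to other platforms.
  PathDatasetSketch(String path, String env) {
    super("hdfs", path, env);
  }
}

final class JdbcDatasetSketch extends DatasetSketch {
  // The platform (e.g. "postgres") is supplied by the caller in this sketch.
  JdbcDatasetSketch(String platform, String table, String env) {
    super(platform, table, env);
  }
}

final class CatalogTableDatasetSketch extends DatasetSketch {
  // "hive" by default; a Hive platform alias setting could substitute another name.
  CatalogTableDatasetSketch(String hivePlatform, String dbTable, String env) {
    super(hivePlatform, dbTable, env);
  }
}
```

For instance, a catalog table sales.orders in PROD would print as urn:li:dataset:(urn:li:dataPlatform:hive,sales.orders,PROD).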