Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Datahub project Datahub MergeIntoCommandEdgeInputDatasetBuilder

From Leeroopedia
Revision as of 14:43, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Datahub_project_Datahub_MergeIntoCommandEdgeInputDatasetBuilder.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Knowledge Sources
Domains Spark_Lineage, OpenLineage
Last Updated 2026-02-10 00:00 GMT

Overview

MergeIntoCommandEdgeInputDatasetBuilder is a query plan input dataset builder in the io.openlineage.spark3.agent.lifecycle.plan package that extracts input datasets from Databricks-specific MergeIntoCommandEdge logical plan nodes. It extends AbstractQueryPlanInputDatasetBuilder<LogicalPlan> and uses reflection to access the target() and source() methods on the Databricks proprietary class, since that class is not available at compile time.

This builder handles the Databricks variant of Delta Lake's MERGE INTO command, which uses the MergeIntoCommandEdge class instead of the open-source MergeIntoCommand. When standard delegation fails to extract datasets from complex source plans (e.g., subqueries with DISTINCT or PROJECT nodes), the builder performs a breadth-first traversal of the logical plan tree to find datasets in child nodes.

Source file: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java (95 lines)

Code Reference

Class Declaration

@Slf4j
public class MergeIntoCommandEdgeInputDatasetBuilder
    extends AbstractQueryPlanInputDatasetBuilder<LogicalPlan> {

Constructor

public MergeIntoCommandEdgeInputDatasetBuilder(OpenLineageContext context) {
    super(context, false);
}

The false parameter disables the built-in recursive subquery traversal in the parent class, since this builder implements its own traversal logic.

Key Methods

isDefinedAtLogicalPlan

@Override
public boolean isDefinedAtLogicalPlan(LogicalPlan x) {
    return x.getClass()
        .getCanonicalName()
        .endsWith("sql.transaction.tahoe.commands.MergeIntoCommandEdge");
}

Matches by class name suffix rather than instanceof because the Databricks class is not on the compile-time classpath.

apply

@Override
protected List<InputDataset> apply(SparkListenerEvent event, LogicalPlan x)

Extracts input datasets by:

  1. Invoking target() and source() on the plan node via reflection (MethodUtils.invokeExactMethod).
  2. Delegating each result (if it is a LogicalPlan) to the standard dataset extraction framework.
  3. If the source delegation returns empty, falling back to extractInputDatasetsFromComplexSource for deeper traversal.

extractInputDatasetsFromComplexSource

private List<InputDataset> extractInputDatasetsFromComplexSource(
    LogicalPlan source, SparkListenerEvent event)

Performs a breadth-first traversal of the logical plan tree. For each node:

  • Attempts delegation to registered builders.
  • If no datasets are found, enqueues all child nodes for further traversal.
  • Stops descending a branch once datasets are successfully extracted.

I/O Contract

Direction Type Description
Input LogicalPlan (Databricks MergeIntoCommandEdge) A Databricks-specific Delta Lake MERGE INTO logical plan node.
Input SparkListenerEvent The Spark event that triggered lineage extraction.
Input OpenLineageContext Context with the OpenLineage client and registered dataset builders.
Output List<InputDataset> All input datasets found in both the target and source branches of the MERGE INTO command.

Usage Examples

This builder is registered in the OpenLineage Spark integration and invoked automatically when the plan walker encounters a MergeIntoCommandEdge node:

// Registration
List<AbstractQueryPlanInputDatasetBuilder<?>> builders = Arrays.asList(
    new MergeIntoCommandEdgeInputDatasetBuilder(context),
    new MergeIntoCommandInputDatasetBuilder(context),
    // ... other builders
);

// Automatic invocation during plan traversal:
// builder.isDefinedAtLogicalPlan(plan) -> true if class name ends with MergeIntoCommandEdge
// builder.apply(event, plan) -> extracts target + source input datasets

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment