Implementation:Datahub project Datahub MergeIntoCommandInputDatasetBuilder

Knowledge Sources
Domains: Spark_Lineage, OpenLineage
Last Updated: 2026-02-10 00:00 GMT

Overview

MergeIntoCommandInputDatasetBuilder is a query plan input dataset builder in the io.openlineage.spark3.agent.lifecycle.plan package. It extracts input datasets from open-source Delta Lake MergeIntoCommand logical plan nodes. It extends AbstractQueryPlanInputDatasetBuilder<MergeIntoCommand> and calls target() and source() directly on the Delta Lake class, which is available at compile time; the class used by the Databricks variant is not.

This builder enables recursive subquery traversal (via the true parameter to the superclass constructor) and includes a fallback mechanism for complex source plans where standard delegation does not capture all input datasets.

Source file: metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java (85 lines)

Code Reference

Class Declaration

@Slf4j
public class MergeIntoCommandInputDatasetBuilder
    extends AbstractQueryPlanInputDatasetBuilder<MergeIntoCommand> {

Constructor

public MergeIntoCommandInputDatasetBuilder(OpenLineageContext context) {
    super(context, true); // Enables recursive traversal of subqueries
}

The true parameter enables the parent class's built-in recursive subquery traversal, ensuring that nested subqueries within the MERGE INTO source are processed.

Key Methods

isDefinedAtLogicalPlan

@Override
public boolean isDefinedAtLogicalPlan(LogicalPlan x) {
    return x instanceof MergeIntoCommand;
}

Uses instanceof for type checking since the open-source MergeIntoCommand class is available at compile time.

apply

@Override
protected List<OpenLineage.InputDataset> apply(SparkListenerEvent event, MergeIntoCommand x)

Extracts input datasets by:

  1. Delegating the target table (x.target()) to the standard dataset extraction framework.
  2. Delegating the source plan (x.source()) for recursive processing.
  3. If source delegation returns empty, falling back to extractInputDatasetsFromComplexSource for deeper plan tree traversal.
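
The three steps above can be sketched with plain Java stand-ins. This is a minimal illustration under stated assumptions, not the actual source: MergePlan, the string-based plans, and the two function parameters are hypothetical placeholders for MergeIntoCommand, LogicalPlan, the framework's delegation, and extractInputDatasetsFromComplexSource.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class MergeApplySketch {

    // Hypothetical stand-in for MergeIntoCommand: plans are plain strings here.
    static final class MergePlan {
        final String target;
        final String source;

        MergePlan(String target, String source) {
            this.target = target;
            this.source = source;
        }
    }

    // Mirrors the three steps: delegate the target, delegate the source,
    // and fall back to a deeper traversal only when the source yields nothing.
    static List<String> apply(
            MergePlan merge,
            Function<String, List<String>> delegate,
            Function<String, List<String>> complexSourceFallback) {
        List<String> inputs = new ArrayList<>(delegate.apply(merge.target));
        List<String> sourceInputs = delegate.apply(merge.source);
        if (sourceInputs.isEmpty()) {
            sourceInputs = complexSourceFallback.apply(merge.source);
        }
        inputs.addAll(sourceInputs);
        return inputs;
    }

    public static void main(String[] args) {
        // Assumed delegate: only resolves plans that are bare table references.
        Function<String, List<String>> delegate = plan ->
                plan.startsWith("table:")
                        ? Collections.singletonList(plan.substring("table:".length()))
                        : Collections.<String>emptyList();
        // Assumed fallback: pretends the deeper traversal found one table.
        Function<String, List<String>> fallback = plan -> Arrays.asList("staging_orders");

        MergePlan merge = new MergePlan("table:orders", "subquery:distinct(staging_orders)");
        System.out.println(apply(merge, delegate, fallback));
    }
}
```

In this sketch the fallback runs only when source delegation returns an empty list, so a source that resolves directly never triggers the extra traversal.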

extractInputDatasetsFromComplexSource

private List<OpenLineage.InputDataset> extractInputDatasetsFromComplexSource(
    LogicalPlan source, SparkListenerEvent event)

Performs a breadth-first traversal of the logical plan tree using a queue. For each node:

  • Attempts delegation to registered dataset builders.
  • If delegation produces results, those are collected and the branch is not traversed further.
  • If delegation produces no results, all child nodes are enqueued for further processing.

This handles cases where intermediate logical plan nodes (such as Distinct, Project, or Filter) have no registered builder, but the table scans at their leaves do.
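
The queue-based traversal described above can be sketched as follows. This is an illustrative model only: PlanNode and the delegate function are hypothetical stand-ins for Spark's LogicalPlan and the registered dataset builders, and dataset names are plain strings rather than OpenLineage.InputDataset objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

class ComplexSourceTraversalSketch {

    // Hypothetical stand-in for a logical plan node.
    static final class PlanNode {
        final String name;
        final List<PlanNode> children;

        PlanNode(String name, PlanNode... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // Breadth-first walk: stop descending as soon as a node yields datasets.
    static List<String> extractInputs(
            PlanNode source, Function<PlanNode, List<String>> delegate) {
        List<String> inputs = new ArrayList<>();
        Queue<PlanNode> queue = new ArrayDeque<>();
        queue.add(source);
        while (!queue.isEmpty()) {
            PlanNode node = queue.poll();
            List<String> found = delegate.apply(node);
            if (!found.isEmpty()) {
                inputs.addAll(found);          // branch resolved, do not traverse further
            } else {
                queue.addAll(node.children);   // no builder matched, try the children
            }
        }
        return inputs;
    }

    public static void main(String[] args) {
        // Distinct -> Join -> (Project -> Filter -> Scan:customers, Scan:regions)
        PlanNode plan = new PlanNode("Distinct",
                new PlanNode("Join",
                        new PlanNode("Project",
                                new PlanNode("Filter", new PlanNode("Scan:customers"))),
                        new PlanNode("Scan:regions")));

        // Assumed delegate: only scan nodes have a "registered builder".
        Function<PlanNode, List<String>> delegate = node ->
                node.name.startsWith("Scan:")
                        ? Collections.singletonList(node.name.substring("Scan:".length()))
                        : Collections.<String>emptyList();

        System.out.println(extractInputs(plan, delegate));
    }
}
```

Because the walk is breadth-first, shallower scans are collected before deeper ones: in this example `regions` is found before `customers`, even though `customers` appears first in the tree.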

I/O Contract

| Direction | Type | Description |
|---|---|---|
| Input | MergeIntoCommand | An open-source Delta Lake MERGE INTO logical plan node. |
| Input | SparkListenerEvent | The Spark event that triggered lineage extraction. |
| Input | OpenLineageContext | Context with the OpenLineage client and registered dataset builders. |
| Output | List<OpenLineage.InputDataset> | All input datasets from both the target and source branches of the MERGE INTO command. |

Usage Examples

This builder is automatically registered and invoked during plan traversal:

// Illustrative registration alongside the Databricks variant
List<AbstractQueryPlanInputDatasetBuilder<?>> builders = Arrays.asList(
    new MergeIntoCommandInputDatasetBuilder(context),     // open-source Delta Lake
    new MergeIntoCommandEdgeInputDatasetBuilder(context)  // Databricks Delta Lake
    // ... other builders
);

// During plan traversal:
// builder.isDefinedAtLogicalPlan(plan) -> true if plan instanceof MergeIntoCommand
// builder.apply(event, mergeCommand) -> extracts all input datasets

A typical MERGE INTO operation processes both target and source:

// MERGE INTO target_table USING source_table ON condition
// WHEN MATCHED THEN UPDATE ...
// WHEN NOT MATCHED THEN INSERT ...

// Builder extracts:
// 1. target_table as InputDataset (via delegate(x.target(), event))
// 2. source_table as InputDataset (via delegate(x.source(), event))
// For complex sources (subqueries), breadth-first traversal finds leaf table scans
