
Implementation:DataTalksClub Data engineering zoomcamp Kestra SQL Merge

From Leeroopedia


Metadata
Knowledge Sources: repo DataTalksClub/data-engineering-zoomcamp; docs: Kestra Documentation, PostgreSQL MERGE Documentation
Domains: Data Loading, Idempotency, SQL, Upsert
Last Updated: 2026-02-09 14:00 GMT

Overview

A concrete tool for idempotent loading from staging to production tables: a SQL MERGE statement executed through the Kestra PostgreSQL Queries plugin. Because it inserts only records whose unique_row_id is not already in production, the pipeline can be re-run safely.

Description

This implementation uses io.kestra.plugin.jdbc.postgresql.Queries to execute a MERGE INTO...USING...ON...WHEN NOT MATCHED THEN INSERT statement that promotes data from the staging table to the production table. The MERGE matches rows based on the unique_row_id column (the MD5 hash computed in the previous deduplication step).
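The matching rule can be modeled in plain Python to make the insert-only behavior concrete. This is a sketch of the semantics only, assuming rows are dicts keyed by column name and every row has a non-null unique_row_id; `merge_insert_only` is a hypothetical helper, not part of Kestra or PostgreSQL.

```python
def merge_insert_only(target: list[dict], source: list[dict]) -> int:
    """Hypothetical model of the insert-only MERGE (semantics only).

    Mirrors: ON T.unique_row_id = S.unique_row_id
             WHEN NOT MATCHED THEN INSERT
    There is no WHEN MATCHED branch, so rows already in `target` are never
    modified. Assumes every row has a non-null unique_row_id.
    Returns the number of rows inserted.
    """
    # Take the target keys once, before inserting, since MERGE joins
    # against the target as it was when the statement started.
    existing = {row["unique_row_id"] for row in target}
    inserted = 0
    for row in source:
        if row["unique_row_id"] not in existing:
            target.append(dict(row))  # copy the staging row into production
            inserted += 1
    return inserted


production = [{"unique_row_id": "a1", "fare_amount": 10.0}]
staging = [
    {"unique_row_id": "a1", "fare_amount": 99.0},  # matched: skipped, not updated
    {"unique_row_id": "b2", "fare_amount": 7.5},   # not matched: inserted
]
print(merge_insert_only(production, staging))  # 1
print(production[0]["fare_amount"])            # 10.0
print(merge_insert_only(production, staging))  # 0 on a re-run
```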

Key characteristics:

  • Insert-only semantics -- the MERGE uses only the WHEN NOT MATCHED THEN INSERT clause. There is no WHEN MATCHED THEN UPDATE clause, meaning existing records in the production table are never modified.
  • Full column projection -- every staging column is listed explicitly, in the same order, in both the INSERT column list and the VALUES clause, so the mapping between staging and production columns is explicit rather than dependent on the tables' physical column order.
  • PostgreSQL 15+ required -- the MERGE statement is only available in PostgreSQL 15 and later. The Docker Compose stack provisions postgres:18, which supports this feature.
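Because MERGE is a syntax error on servers older than PostgreSQL 15, a pipeline could check the version before running this task. A sketch, assuming the value of `SHOW server_version_num` has already been fetched as a string by whatever client is in use; since PostgreSQL 10 this number is major * 10000 + minor (15.4 is `150004`). `supports_merge` is a hypothetical helper, not a Kestra feature.

```python
# Hypothetical pre-flight check; not part of the Kestra flow.
# Assumes `server_version_num` is the text returned by
# `SHOW server_version_num`, e.g. "150004" for PostgreSQL 15.4.
MIN_MERGE_VERSION_NUM = 150000  # PostgreSQL 15.0, the first release with MERGE


def supports_merge(server_version_num: str) -> bool:
    """Return True if the server is PostgreSQL 15.0 or later."""
    return int(server_version_num.strip()) >= MIN_MERGE_VERSION_NUM


print(supports_merge("140011"))  # False (14.11)
print(supports_merge("180000"))  # True  (18.0)
```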

The column lists differ between yellow (20 columns) and green (22 columns) taxi datasets, handled by separate MERGE statements within their respective conditional branches.

Usage

This task runs after the MD5 deduplication step has populated unique_row_id on all staging rows, and before the file cleanup step. It is the final data movement operation in the pipeline.
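The ordering matters because SQL equality treats NULL as unknown: if a staging row still has a NULL unique_row_id when the merge runs, `T.unique_row_id = S.unique_row_id` is never true for it, so that row is inserted again on every run. The sketch below demonstrates this with Python's built-in `sqlite3`. SQLite has no MERGE statement, so the sketch swaps in an `INSERT ... SELECT ... WHERE NOT EXISTS` anti-join that applies the same match condition; the two-column `staging` and `prod` tables are simplified, hypothetical stand-ins for the real schema.

```python
import sqlite3

# SQLite has no MERGE, so this emulates the insert-only MERGE with an
# anti-join that uses the same match condition (T.unique_row_id = S.unique_row_id).
MERGE_SQL = """
INSERT INTO prod (unique_row_id, filename)
SELECT S.unique_row_id, S.filename
FROM staging AS S
WHERE NOT EXISTS (
    SELECT 1 FROM prod AS T WHERE T.unique_row_id = S.unique_row_id
)
"""


def rows_after_two_runs(staging_rows: list[tuple]) -> int:
    """Merge the same staging rows twice; return the production row count."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE staging (unique_row_id TEXT, filename TEXT)")
    conn.execute("CREATE TABLE prod (unique_row_id TEXT, filename TEXT)")
    conn.executemany("INSERT INTO staging VALUES (?, ?)", staging_rows)
    for _ in range(2):
        conn.execute(MERGE_SQL)
    count = conn.execute("SELECT COUNT(*) FROM prod").fetchone()[0]
    conn.close()
    return count


# Hashes populated first: the second run inserts nothing.
print(rows_after_two_runs([("h1", "a.csv"), ("h2", "a.csv")]))  # 2
# One hash still NULL: NULL = NULL is not true, so that row is inserted twice.
print(rows_after_two_runs([("h1", "a.csv"), (None, "a.csv")]))  # 3
```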

Code Reference

Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 132-150 (yellow), Lines 241-259 (green)

Signature (yellow taxi):

      - id: yellow_merge_data
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          MERGE INTO {{render(vars.table)}} AS T
          USING {{render(vars.staging_table)}} AS S
          ON T.unique_row_id = S.unique_row_id
          WHEN NOT MATCHED THEN
            INSERT (
              unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
              passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
              DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
              improvement_surcharge, total_amount, congestion_surcharge
            )
            VALUES (
              S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
              S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
              S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
              S.improvement_surcharge, S.total_amount, S.congestion_surcharge
            );

Signature (green taxi):

      - id: green_merge_data
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          MERGE INTO {{render(vars.table)}} AS T
          USING {{render(vars.staging_table)}} AS S
          ON T.unique_row_id = S.unique_row_id
          WHEN NOT MATCHED THEN
            INSERT (
              unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
              store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
              trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
              improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
            )
            VALUES (
              S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
              S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
              S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
              S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
            );
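The two statements differ only in their column lists, and each list is written out twice (once for INSERT, once for VALUES). One way to keep the two lists aligned, which is not what the flow does since it writes them by hand, is to generate both from a single list. `build_merge_sql` is a hypothetical helper; table names are interpolated verbatim, which assumes they come from trusted configuration.

```python
# Hypothetical helper: the flow writes both lists by hand, this generates them.
YELLOW_COLUMNS = [
    "unique_row_id", "filename", "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID", "store_and_fwd_flag", "PULocationID",
    "DOLocationID", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge",
]


def build_merge_sql(table: str, staging_table: str, columns: list[str]) -> str:
    """Build an insert-only MERGE whose INSERT and VALUES lists share one source.

    Assumes `table` and `staging_table` come from trusted config: they are
    interpolated verbatim, not quoted or escaped.
    """
    if "unique_row_id" not in columns:
        raise ValueError("columns must include the match key unique_row_id")
    if len(set(columns)) != len(columns):
        raise ValueError("duplicate column names")
    insert_list = ", ".join(columns)
    values_list = ", ".join(f"S.{col}" for col in columns)
    return (
        f"MERGE INTO {table} AS T\n"
        f"USING {staging_table} AS S\n"
        "ON T.unique_row_id = S.unique_row_id\n"
        "WHEN NOT MATCHED THEN\n"
        f"  INSERT ({insert_list})\n"
        f"  VALUES ({values_list});"
    )


sql = build_merge_sql("public.yellow_tripdata", "public.yellow_tripdata_staging", YELLOW_COLUMNS)
print(len(YELLOW_COLUMNS))   # 20
print(sql.splitlines()[0])   # MERGE INTO public.yellow_tripdata AS T
```

The same function would produce the green statement from a 22-entry list.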

Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection settings are inherited from the flow-level pluginDefaults.

I/O Contract

Inputs:

  • vars.table (rendered template) -- production table: public.yellow_tripdata or public.green_tripdata
  • vars.staging_table (rendered template) -- source staging table: public.yellow_tripdata_staging or public.green_tripdata_staging
  • Staging data (PostgreSQL rows) -- rows whose unique_row_id and filename were populated by the deduplication step
  • Production data (PostgreSQL rows) -- previously loaded rows (an empty table on the first run)

Outputs:

  • Updated production table (PostgreSQL rows) -- the previously existing rows plus the new, non-duplicate rows from staging
  • Rows inserted (integer) -- number of new rows added to production (available in execution metrics)
  • Rows matched (integer) -- number of staging rows whose unique_row_id already existed in production and were therefore skipped; PostgreSQL does not report this figure directly, so it is derived as staging rows minus rows inserted
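PostgreSQL reports the outcome of a MERGE as a command tag of the form `MERGE total_count` (psql prints it after the statement), where total_count is the number of rows changed. For this insert-only statement that is the number of rows inserted, and the skipped rows follow by subtraction. A sketch with hypothetical helper names, assuming the tag text and the staging row count are already available:

```python
def parse_merge_tag(tag: str) -> int:
    """Return total_count from a PostgreSQL command tag such as 'MERGE 1200'."""
    verb, _, count = tag.strip().partition(" ")
    if verb != "MERGE" or not count.isdigit():
        raise ValueError(f"not a MERGE command tag: {tag!r}")
    return int(count)


def merge_summary(tag: str, staging_rows: int) -> dict[str, int]:
    """Split staging rows into inserted and matched (skipped) counts.

    With only a WHEN NOT MATCHED THEN INSERT clause, every changed row is an
    insert, and every staging row that was not inserted must have matched.
    """
    inserted = parse_merge_tag(tag)
    if inserted > staging_rows:
        raise ValueError("more rows inserted than staged")
    return {"inserted": inserted, "matched": staging_rows - inserted}


print(merge_summary("MERGE 1200", 1500))  # {'inserted': 1200, 'matched': 300}
```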

Usage Examples

Resolved SQL for yellow taxi MERGE:

MERGE INTO public.yellow_tripdata AS T
USING public.yellow_tripdata_staging AS S
ON T.unique_row_id = S.unique_row_id
WHEN NOT MATCHED THEN
  INSERT (
    unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
    passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
    DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
    improvement_surcharge, total_amount, congestion_surcharge
  )
  VALUES (
    S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
    S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
    S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
    S.improvement_surcharge, S.total_amount, S.congestion_surcharge
  );

Verifying idempotency after two runs of the same data:

-- After first run:
SELECT COUNT(*) FROM public.yellow_tripdata;
-- Returns: N (all rows inserted)

-- After second run with same input:
SELECT COUNT(*) FROM public.yellow_tripdata;
-- Returns: N (same count -- no duplicates added)

Checking merge results by source file:

SELECT filename, COUNT(*) AS row_count
FROM public.yellow_tripdata
GROUP BY filename
ORDER BY filename;
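Both checks can be rehearsed locally before running them against PostgreSQL. This sketch uses Python's built-in `sqlite3` with simplified two-column tables and hypothetical file names and hashes. It swaps the MERGE for an `INSERT ... SELECT ... WHERE NOT EXISTS` anti-join with the same match condition (SQLite has no MERGE), drops the `public.` schema prefix, and assumes each run empties and refills the staging table before merging.

```python
import sqlite3

# SQLite stand-in for the MERGE (SQLite has no MERGE): insert staging rows
# whose unique_row_id is not already in production. The "public." schema
# prefix is dropped because SQLite has no such schema.
MERGE_SQL = """
INSERT INTO yellow_tripdata (unique_row_id, filename)
SELECT S.unique_row_id, S.filename
FROM yellow_tripdata_staging AS S
WHERE NOT EXISTS (
    SELECT 1 FROM yellow_tripdata AS T WHERE T.unique_row_id = S.unique_row_id
)
"""

# The per-file check from the text, minus the schema prefix.
BY_FILE_SQL = """
SELECT filename, COUNT(*) AS row_count
FROM yellow_tripdata
GROUP BY filename
ORDER BY filename
"""


def run_pipeline(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Assumed run shape: empty staging, load one file's rows, then merge."""
    conn.execute("DELETE FROM yellow_tripdata_staging")
    conn.executemany("INSERT INTO yellow_tripdata_staging VALUES (?, ?)", rows)
    conn.execute(MERGE_SQL)


conn = sqlite3.connect(":memory:")
for table in ("yellow_tripdata", "yellow_tripdata_staging"):
    conn.execute(f"CREATE TABLE {table} (unique_row_id TEXT, filename TEXT)")

file_a = [("h1", "file_a.csv"), ("h2", "file_a.csv")]  # hypothetical hashes/files
file_b = [("h3", "file_b.csv")]
run_pipeline(conn, file_a)
run_pipeline(conn, file_b)
run_pipeline(conn, file_a)  # re-run of file_a: nothing new is inserted

print(conn.execute("SELECT COUNT(*) FROM yellow_tripdata").fetchone()[0])  # 3
print(conn.execute(BY_FILE_SQL).fetchall())
# [('file_a.csv', 2), ('file_b.csv', 1)]
```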
