Implementation:DataTalksClub Data engineering zoomcamp Kestra SQL Merge
| Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, docs: Kestra Documentation, PostgreSQL MERGE Documentation |
| Domains | Data Loading, Idempotency, SQL, Upsert |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete tool for performing idempotent data loading from staging to production tables using the SQL MERGE statement executed through the Kestra PostgreSQL Queries plugin, ensuring safe pipeline re-runs by inserting only non-duplicate records.
Description
This implementation uses io.kestra.plugin.jdbc.postgresql.Queries to execute a MERGE INTO...USING...ON...WHEN NOT MATCHED THEN INSERT statement that promotes data from the staging table to the production table. The MERGE matches rows based on the unique_row_id column (the MD5 hash computed in the previous deduplication step).
Key characteristics:
- Insert-only semantics -- the MERGE uses only the WHEN NOT MATCHED THEN INSERT clause. There is no WHEN MATCHED THEN UPDATE clause, meaning existing records in the production table are never modified.
- Full column projection -- all columns from the staging table are explicitly listed in both the INSERT column list and the VALUES clause, ensuring no columns are missed or misaligned.
- PostgreSQL 15+ required -- the MERGE statement is only available in PostgreSQL 15 and later. The Docker Compose stack provisions postgres:18, which supports this feature.
The column lists differ between yellow (20 columns) and green (22 columns) taxi datasets, handled by separate MERGE statements within their respective conditional branches.
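Because the flow depends on MERGE, a quick version check against the target database can rule out a pre-15 server before the first run. This is a minimal sketch, assuming it is run by hand (for example in psql) against the same database the flow connects to; the supports_merge alias is illustrative and not part of the flow.

```sql
-- server_version_num is an integer such as 150004 for PostgreSQL 15.4.
-- MERGE was introduced in PostgreSQL 15, so a server below 150000 rejects the statement.
SELECT current_setting('server_version_num')::int >= 150000 AS supports_merge;
```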
Usage
This task runs after the MD5 deduplication step has populated unique_row_id on all staging rows, and before the file cleanup step. It is the final data movement operation in the pipeline.
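The ON clause compares unique_row_id values, so the MERGE relies on the previous step having filled that column with unique, non-NULL values. A staging row whose unique_row_id is NULL never satisfies the equality and would be inserted again on every run. Two staging rows sharing an id that is not yet in production would both be inserted (or fail on a unique constraint, if the production table has one), because MERGE does not see rows it inserted earlier in the same statement. A hedged pre-check for the yellow staging table, with illustrative column aliases, might look like this:

```sql
-- Both counts should be 0 before the MERGE runs.
SELECT
    COUNT(*) FILTER (WHERE unique_row_id IS NULL)        AS missing_ids,    -- rows the hash step did not populate
    COUNT(unique_row_id) - COUNT(DISTINCT unique_row_id) AS duplicate_ids   -- surplus rows sharing an id
FROM public.yellow_tripdata_staging;
```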
Code Reference
Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 132-150 (yellow), Lines 241-259 (green)
Signature (yellow taxi):
- id: yellow_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
        passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
        DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
        improvement_surcharge, total_amount, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
        S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
        S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
        S.improvement_surcharge, S.total_amount, S.congestion_surcharge
      );
Signature (green taxi):
- id: green_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
        store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
        trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
        improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
        S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
        S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
        S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
      );
Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection defaults are inherited from the flow-level pluginDefaults.
I/O Contract
Inputs:
| Name | Type | Description |
|---|---|---|
| vars.table | Rendered template | Production table: public.yellow_tripdata or public.green_tripdata |
| vars.staging_table | Rendered template | Source staging table: public.yellow_tripdata_staging or public.green_tripdata_staging |
| Staging data | PostgreSQL rows | Rows with populated unique_row_id and filename from the deduplication step |
| Production data | PostgreSQL rows | Previously loaded rows (or empty table on first run) |
Outputs:
| Name | Type | Description |
|---|---|---|
| Updated production table | PostgreSQL rows | Production table now contains the union of previously existing rows and new (non-duplicate) rows from staging |
| Rows inserted | Integer | Count of new rows added to production (available in execution metrics) |
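| Rows matched | Integer | Count of staging rows that already existed in production and were skipped. MERGE reports only the rows it changed, so this value is derived as the staging row count minus the rows inserted |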
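The inserted and matched counts can also be estimated directly from the two tables before the MERGE runs. The query below is a sketch rather than part of the flow: it uses the yellow table names from the Inputs table, and the aliases in_production, rows_to_insert, and rows_already_present are illustrative.

```sql
-- Run before the MERGE: classify each staging row by whether production already holds its id.
SELECT
    COUNT(*) FILTER (WHERE NOT in_production) AS rows_to_insert,
    COUNT(*) FILTER (WHERE in_production)     AS rows_already_present
FROM (
    SELECT EXISTS (
        SELECT 1
        FROM public.yellow_tripdata AS T
        WHERE T.unique_row_id = S.unique_row_id
    ) AS in_production
    FROM public.yellow_tripdata_staging AS S
) AS classified;
```

EXISTS is used instead of a join so that each staging row is counted once even if production happens to contain repeated ids.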
Usage Examples
Resolved SQL for yellow taxi MERGE:
MERGE INTO public.yellow_tripdata AS T
USING public.yellow_tripdata_staging AS S
ON T.unique_row_id = S.unique_row_id
WHEN NOT MATCHED THEN
  INSERT (
    unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
    passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
    DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
    improvement_surcharge, total_amount, congestion_surcharge
  )
  VALUES (
    S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
    S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
    S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
    S.improvement_surcharge, S.total_amount, S.congestion_surcharge
  );
Verifying idempotency after two runs of the same data:
-- After first run:
SELECT COUNT(*) FROM public.yellow_tripdata;
-- Returns: N (all rows inserted)
-- After second run with same input:
SELECT COUNT(*) FROM public.yellow_tripdata;
-- Returns: N (same count -- no duplicates added)
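Comparing counts requires remembering the value of N between runs. A complementary check that needs no earlier baseline is to look for repeated ids directly in production. The sketch below assumes unique_row_id is meant to be unique per trip; the copies alias and the LIMIT of 20 are illustrative choices.

```sql
-- Expect zero rows; any row returned is an id that was loaded more than once.
SELECT unique_row_id, COUNT(*) AS copies
FROM public.yellow_tripdata
GROUP BY unique_row_id
HAVING COUNT(*) > 1
ORDER BY copies DESC
LIMIT 20;
```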
Checking merge results by source file:
SELECT filename, COUNT(*) AS row_count
FROM public.yellow_tripdata
GROUP BY filename
ORDER BY filename;