Implementation:DataTalksClub Data engineering zoomcamp Kestra MD5 Deduplication
| Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, docs: Kestra Documentation, PostgreSQL String Functions |
| Domains | Data Quality, Deduplication, SQL, Data Integrity |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete tool for computing MD5-based deduplication keys and setting lineage tracking filenames on staging table rows using the Kestra PostgreSQL Queries plugin with an UPDATE statement.
Description
This implementation uses io.kestra.plugin.jdbc.postgresql.Queries to execute an UPDATE statement that populates two infrastructure columns on every row in the staging table:
- unique_row_id -- set to the MD5 hash of 7 concatenated business key columns: VendorID, pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, fare_amount, and trip_distance. Each column is wrapped in COALESCE(..., '') to handle NULLs. All columns are cast to text first, except PULocationID and DOLocationID, which the SQL passes to COALESCE without a cast.
- filename -- set to the rendered source file name (e.g., yellow_tripdata_2019-01.csv) for data lineage tracking.
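As a rough illustration of the key construction described above, the following Python sketch mirrors the md5(COALESCE(...) || ...) expression outside the database. The helper name unique_row_id and the sample row are hypothetical. The sketch assumes UTF-8 encoding and values that are already formatted as text; the text PostgreSQL produces when casting timestamps and numerics can differ from Python's str(), so hashes only match the database when the text forms match exactly.

```python
import hashlib

# Business key columns in the order used by the yellow-taxi UPDATE statement.
KEY_COLUMNS = [
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
    "trip_distance",
]

def unique_row_id(row, key_columns=KEY_COLUMNS):
    """Hypothetical helper: MD5 over the concatenated key columns.

    A missing or None value (SQL NULL) becomes an empty string,
    like COALESCE(col, '') in the SQL.
    """
    parts = ["" if row.get(col) is None else str(row[col]) for col in key_columns]
    return hashlib.md5("".join(parts).encode("utf-8")).hexdigest()

# Illustrative row; values are given as text to sidestep formatting differences.
sample = {
    "VendorID": "1",
    "tpep_pickup_datetime": "2019-01-01 00:46:40",
    "tpep_dropoff_datetime": "2019-01-01 00:53:20",
    "PULocationID": "151",
    "DOLocationID": "239",
    "fare_amount": "7",
    "trip_distance": "1.5",
}
key = unique_row_id(sample)  # 32-character lowercase hex string
```

A row in which every key column is NULL hashes the empty string, so all such rows share one unique_row_id.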
The datetime column names differ between taxi types: yellow uses tpep_pickup_datetime / tpep_dropoff_datetime, while green uses lpep_pickup_datetime / lpep_dropoff_datetime. This difference is handled by the conditional branching -- each taxi type branch has its own UPDATE statement with the correct column references.
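To make the per-type difference concrete, here is a minimal Python sketch that builds the UPDATE text for either taxi type. It is not how the flow works (the flow keeps one hand-written statement per branch and relies on Kestra templating); the names DATETIME_PREFIX and build_update_sql are hypothetical, and plain string formatting like this is only suitable for illustration, not for untrusted input.

```python
# Hypothetical mapping from taxi type to the prefix of its datetime columns.
DATETIME_PREFIX = {"yellow": "tpep", "green": "lpep"}

def build_update_sql(taxi, staging_table, file_name):
    """Return the UPDATE statement text for one taxi type (illustration only)."""
    prefix = DATETIME_PREFIX[taxi]  # raises KeyError for an unknown taxi type
    return "\n".join([
        f"UPDATE {staging_table}",
        "SET",
        "  unique_row_id = md5(",
        "    COALESCE(CAST(VendorID AS text), '') ||",
        f"    COALESCE(CAST({prefix}_pickup_datetime AS text), '') ||",
        f"    COALESCE(CAST({prefix}_dropoff_datetime AS text), '') ||",
        "    COALESCE(PULocationID, '') ||",
        "    COALESCE(DOLocationID, '') ||",
        "    COALESCE(CAST(fare_amount AS text), '') ||",
        "    COALESCE(CAST(trip_distance AS text), '')",
        "  ),",
        f"  filename = '{file_name}';",
    ])

# Illustrative table and file names.
green_sql = build_update_sql(
    "green", "public.green_tripdata_staging", "green_tripdata_2019-01.csv"
)
yellow_sql = build_update_sql(
    "yellow", "public.yellow_tripdata_staging", "yellow_tripdata_2019-01.csv"
)
```

The only lines that change between the two results are the table name, the two datetime columns, and the filename.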
Usage
This task runs immediately after the CopyIn bulk load and before the MERGE upsert. It transforms the staging table from a raw data landing zone to a deduplication-ready dataset by populating the unique_row_id that the MERGE step uses for matching.
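The role of unique_row_id in the later matching step can be sketched in memory. This is only an analogue under the assumption that the MERGE inserts staging rows whose key is not yet in the target and leaves matched rows alone; the function merge_staging and the sample data are hypothetical. Unlike this sketch, a SQL MERGE may handle duplicate keys within a single staging batch differently.

```python
def merge_staging(target, staging):
    """Insert staging rows whose unique_row_id is not yet in target.

    `target` maps unique_row_id -> row and is updated in place.
    Returns the number of rows inserted.
    """
    inserted = 0
    for row in staging:
        key = row["unique_row_id"]
        if key not in target:
            target[key] = row
            inserted += 1
    return inserted

# Illustrative staging batch with already-populated keys.
staging = [
    {"unique_row_id": "aaa", "filename": "yellow_tripdata_2019-01.csv"},
    {"unique_row_id": "bbb", "filename": "yellow_tripdata_2019-01.csv"},
]
target = {}
first_load = merge_staging(target, staging)   # both rows are new
second_load = merge_staging(target, staging)  # re-running the same file adds nothing
```

This is why the key must be populated before the upsert: with a NULL unique_row_id there would be nothing to match on, and re-running a file could load its rows again.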
Code Reference
Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 116-131 (yellow), Lines 225-239 (green)
Signature (yellow taxi):
- id: yellow_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
        COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
        COALESCE(PULocationID, '') ||
        COALESCE(DOLocationID, '') ||
        COALESCE(CAST(fare_amount AS text), '') ||
        COALESCE(CAST(trip_distance AS text), '')
      ),
      filename = '{{render(vars.file)}}';
Signature (green taxi):
- id: green_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
        COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
        COALESCE(PULocationID, '') ||
        COALESCE(DOLocationID, '') ||
        COALESCE(CAST(fare_amount AS text), '') ||
        COALESCE(CAST(trip_distance AS text), '')
      ),
      filename = '{{render(vars.file)}}';
Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection defaults are inherited from the flow-level pluginDefaults.
I/O Contract
Inputs:
| Name | Type | Description |
|---|---|---|
| vars.staging_table | Rendered template | Target staging table: public.yellow_tripdata_staging or public.green_tripdata_staging |
| vars.file | Rendered template | Source filename for lineage: yellow_tripdata_2019-01.csv |
| Staging table data | PostgreSQL rows | Rows with NULL unique_row_id and filename from the CopyIn step |
Outputs:
| Name | Type | Description |
|---|---|---|
| Updated staging table | PostgreSQL rows | All rows now have populated unique_row_id (32-char hex MD5 hash) and filename (source file name) |
| Rows affected | Integer | Number of rows updated (should equal total staging table row count) |
Usage Examples
Resolved SQL for yellow taxi (January 2019):
UPDATE public.yellow_tripdata_staging
SET
  unique_row_id = md5(
    COALESCE(CAST(VendorID AS text), '') ||
    COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
    COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
    COALESCE(PULocationID, '') ||
    COALESCE(DOLocationID, '') ||
    COALESCE(CAST(fare_amount AS text), '') ||
    COALESCE(CAST(trip_distance AS text), '')
  ),
  filename = 'yellow_tripdata_2019-01.csv';
Verifying hash generation:
SELECT unique_row_id, filename, VendorID, tpep_pickup_datetime
FROM public.yellow_tripdata_staging
LIMIT 5;
-- unique_row_id: e.g., 'a1b2c3d4e5f6...' (32-char hex string)
-- filename: 'yellow_tripdata_2019-01.csv'
Checking for repeated unique_row_id values (diagnostic; a hit usually means duplicate source rows rather than a true MD5 collision):
SELECT unique_row_id, COUNT(*) AS cnt
FROM public.yellow_tripdata_staging
GROUP BY unique_row_id
HAVING COUNT(*) > 1;
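A repeated unique_row_id can mean two different things: the rows share identical key values (plain duplicates), or they differ in some key column yet hash to the same id. The second case does not require an MD5 weakness, because the key columns are concatenated without a separator, so value pairs such as ('1', '23') and ('12', '3') produce the same input string. The Python sketch below separates the two cases for rows already fetched from the staging table; the function name classify_repeated_ids and the sample rows are hypothetical.

```python
from collections import defaultdict

def classify_repeated_ids(rows, key_columns):
    """Split repeated unique_row_id values into duplicates and collisions.

    duplicates: ids seen more than once whose rows have identical key values.
    collisions: ids whose rows differ in at least one key column.
    """
    distinct_keys = defaultdict(set)  # id -> set of distinct key tuples
    counts = defaultdict(int)         # id -> number of rows
    for row in rows:
        rid = row["unique_row_id"]
        distinct_keys[rid].add(tuple(row.get(col) for col in key_columns))
        counts[rid] += 1
    duplicates = sorted(
        rid for rid, n in counts.items() if n > 1 and len(distinct_keys[rid]) == 1
    )
    collisions = sorted(rid for rid, keys in distinct_keys.items() if len(keys) > 1)
    return duplicates, collisions

# Illustrative rows with placeholder ids and only two key columns.
rows = [
    {"unique_row_id": "h1", "VendorID": "1", "fare_amount": "7"},
    {"unique_row_id": "h1", "VendorID": "1", "fare_amount": "7"},
    {"unique_row_id": "h2", "VendorID": "1", "fare_amount": "23"},
    {"unique_row_id": "h2", "VendorID": "12", "fare_amount": "3"},
    {"unique_row_id": "h3", "VendorID": "2", "fare_amount": "5"},
]
duplicates, collisions = classify_repeated_ids(rows, ["VendorID", "fare_amount"])
```

Here h1 is reported as a duplicate and h2 as a collision; ids that appear only once, such as h3, are ignored.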