

Implementation:DataTalksClub Data engineering zoomcamp Kestra MD5 Deduplication

From Leeroopedia


Metadata
Knowledge Sources: repo DataTalksClub/data-engineering-zoomcamp; docs: Kestra Documentation, PostgreSQL String Functions
Domains: Data Quality, Deduplication, SQL, Data Integrity
Last Updated: 2026-02-09 14:00 GMT

Overview

Concrete tool for computing MD5-based deduplication keys and setting lineage tracking filenames on staging table rows using the Kestra PostgreSQL Queries plugin with an UPDATE statement.

Description

This implementation uses io.kestra.plugin.jdbc.postgresql.Queries to execute an UPDATE statement that populates two infrastructure columns on every row in the staging table:

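  • unique_row_id -- set to the MD5 hash of 7 concatenated business key columns: VendorID, pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, fare_amount, and trip_distance. Each column is wrapped in COALESCE(..., '') so that a NULL contributes an empty string; without it, a single NULL would make the whole || concatenation, and therefore the hash, NULL. All columns except PULocationID and DOLocationID are explicitly cast to text; those two are used as-is, which works only if the staging table already stores them as text.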
  • filename -- set to the rendered source file name (e.g., yellow_tripdata_2019-01.csv) for data lineage tracking.
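The hash expression can be modeled in Python to show what COALESCE and || each contribute. This is a minimal sketch under stated assumptions, not code from the flow: the helper name unique_row_id is hypothetical, each input is assumed to be the column value already rendered the way PostgreSQL's CAST(... AS text) would render it (None stands in for SQL NULL), and the text is assumed to be UTF-8 encoded before hashing.

```python
import hashlib
from typing import Optional, Sequence


def unique_row_id(key_values: Sequence[Optional[str]]) -> str:
    """Mimic md5(COALESCE(c1, '') || COALESCE(c2, '') || ...).

    Each element is assumed to be the column value *after* CAST(... AS text);
    None plays the role of SQL NULL. Hypothetical helper, for illustration.
    """
    # COALESCE(x, '') turns NULL into '' so one NULL cannot null out the
    # whole || concatenation (and with it the md5 result).
    concatenated = "".join(v if v is not None else "" for v in key_values)
    # PostgreSQL's md5() returns 32 lowercase hexadecimal characters, as does
    # hashlib's hexdigest().
    return hashlib.md5(concatenated.encode("utf-8")).hexdigest()


# Hypothetical yellow-taxi values, in key-column order:
# VendorID, pickup, dropoff, PULocationID, DOLocationID, fare_amount, trip_distance
row = ["1", "2019-01-01 00:46:40", "2019-01-01 00:53:20", "151", "239", "7", "1.5"]
print(unique_row_id(row))         # some 32-character hex string
print(unique_row_id([None] * 7))  # d41d8cd98f00b204e9800998ecf8427e (md5 of '')
```

Because PostgreSQL's text rendering of timestamps and floating-point values is what actually gets hashed, a client-side recomputation only matches if it reproduces that exact text. Passing strings in, rather than Python datetime or float objects, keeps that requirement explicit.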

The datetime column names differ between taxi types: yellow uses tpep_pickup_datetime / tpep_dropoff_datetime, while green uses lpep_pickup_datetime / lpep_dropoff_datetime. This difference is handled by the conditional branching -- each taxi type branch has its own UPDATE statement with the correct column references.
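The branching only changes two column names. As a hypothetical illustration (the flow itself uses two separate Queries tasks, not this function), the Python helper build_update_sql and its DATETIME_COLUMNS mapping render one statement per taxi type from a shared template; the staging table and file name are passed in the way the flow supplies vars.staging_table and vars.file.

```python
# Hypothetical mapping from taxi type to its pickup/dropoff column names.
DATETIME_COLUMNS = {
    "yellow": ("tpep_pickup_datetime", "tpep_dropoff_datetime"),
    "green": ("lpep_pickup_datetime", "lpep_dropoff_datetime"),
}


def build_update_sql(taxi: str, staging_table: str, file: str) -> str:
    """Render the dedup-key UPDATE for one taxi type (illustrative only)."""
    pickup, dropoff = DATETIME_COLUMNS[taxi]  # KeyError for an unknown taxi type
    key_exprs = [
        "CAST(VendorID AS text)",
        f"CAST({pickup} AS text)",
        f"CAST({dropoff} AS text)",
        "PULocationID",  # used without a cast, as in the flow
        "DOLocationID",
        "CAST(fare_amount AS text)",
        "CAST(trip_distance AS text)",
    ]
    hash_input = " ||\n    ".join(f"COALESCE({e}, '')" for e in key_exprs)
    # `file` is pasted into a SQL string literal, like the flow's template,
    # so it must be a trusted value: a single quote in it would break the SQL.
    return (
        f"UPDATE {staging_table}\n"
        "SET\n"
        f"  unique_row_id = md5(\n    {hash_input}\n  ),\n"
        f"  filename = '{file}';"
    )


print(build_update_sql("green", "public.green_tripdata_staging",
                       "green_tripdata_2019-01.csv"))
```

For the yellow inputs (public.yellow_tripdata_staging, yellow_tripdata_2019-01.csv), this sketch renders the same text as the resolved SQL shown under Usage Examples.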

Usage

This task runs immediately after the CopyIn bulk load and before the MERGE upsert. It transforms the staging table from a raw data landing zone to a deduplication-ready dataset by populating the unique_row_id that the MERGE step uses for matching.
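Why the key matters for the MERGE step can be sketched in Python. This assumes, rather than quotes, a MERGE of the form MERGE INTO target USING staging ON target.unique_row_id = staging.unique_row_id WHEN NOT MATCHED THEN INSERT; the function merge_new_rows is a hypothetical model of that behaviour using in-memory lists of row dictionaries.

```python
from typing import List


def merge_new_rows(target: List[dict], staging: List[dict]) -> int:
    """Append staging rows whose unique_row_id is absent from target.

    Assumed model of a MERGE ... WHEN NOT MATCHED THEN INSERT keyed on
    unique_row_id. Returns the number of rows inserted.
    """
    # Like a single MERGE statement, matching is evaluated against the target
    # as it was before the statement; rows inserted here are not re-matched,
    # so two staging rows sharing a new key would both be inserted.
    existing = {row["unique_row_id"] for row in target}
    new_rows = [row for row in staging if row["unique_row_id"] not in existing]
    target.extend(new_rows)
    return len(new_rows)


# Loading the same month twice: the second run inserts nothing.
trips: List[dict] = []
batch = [{"unique_row_id": "k1", "fare": 7}, {"unique_row_id": "k2", "fare": 9}]
print(merge_new_rows(trips, batch))  # 2
print(merge_new_rows(trips, batch))  # 0
```

The design point is idempotency: because the key is derived from row content rather than load order, re-running the flow for a file that was already loaded leaves the target table unchanged.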

Code Reference

Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 116-131 (yellow), Lines 225-239 (green)

Signature (yellow taxi):

      - id: yellow_add_unique_id_and_filename
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          UPDATE {{render(vars.staging_table)}}
          SET
            unique_row_id = md5(
              COALESCE(CAST(VendorID AS text), '') ||
              COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
              COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
              COALESCE(PULocationID, '') ||
              COALESCE(DOLocationID, '') ||
              COALESCE(CAST(fare_amount AS text), '') ||
              COALESCE(CAST(trip_distance AS text), '')
            ),
            filename = '{{render(vars.file)}}';

Signature (green taxi):

      - id: green_add_unique_id_and_filename
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          UPDATE {{render(vars.staging_table)}}
          SET
            unique_row_id = md5(
              COALESCE(CAST(VendorID AS text), '') ||
              COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
              COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
              COALESCE(PULocationID, '') ||
              COALESCE(DOLocationID, '') ||
              COALESCE(CAST(fare_amount AS text), '') ||
              COALESCE(CAST(trip_distance AS text), '')
            ),
            filename = '{{render(vars.file)}}';

Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection settings are inherited from the flow-level pluginDefaults.
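Because the seven values are concatenated with no separator, column boundaries are ambiguous: two rows whose adjacent values split differently (for example PULocationID 1 with DOLocationID 23, versus 12 with 3) produce the same hash input and therefore the same unique_row_id. The Python sketch below demonstrates this with hypothetical values; key_with_delimiter is an illustrative alternative, not part of the flow, and adopting anything like it would change every key already stored in the target table.

```python
import hashlib
from typing import Optional, Sequence


def key_no_delimiter(values: Sequence[Optional[str]]) -> str:
    # Same shape as the flow's expression: values glued together directly.
    return hashlib.md5("".join(v or "" for v in values).encode()).hexdigest()


def key_with_delimiter(values: Sequence[Optional[str]], sep: str = "|") -> str:
    # Hypothetical variant: a separator keeps column boundaries visible.
    return hashlib.md5(sep.join(v or "" for v in values).encode()).hexdigest()


common = ["1", "2019-01-01 00:46:40", "2019-01-01 00:53:20"]
tail = ["7", "1.5"]
a = common + ["1", "23"] + tail  # PULocationID=1,  DOLocationID=23
b = common + ["12", "3"] + tail  # PULocationID=12, DOLocationID=3

print(key_no_delimiter(a) == key_no_delimiter(b))      # True: same key
print(key_with_delimiter(a) == key_with_delimiter(b))  # False
```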

I/O Contract

Inputs:

  • vars.staging_table (rendered template) -- target staging table: public.yellow_tripdata_staging or public.green_tripdata_staging
  • vars.file (rendered template) -- source filename recorded for lineage, e.g., yellow_tripdata_2019-01.csv
  • Staging table data (PostgreSQL rows) -- rows loaded by the CopyIn step, with unique_row_id and filename still NULL

Outputs:

  • Updated staging table (PostgreSQL rows) -- every row now has unique_row_id (32-character hex MD5 hash) and filename (source file name) populated
  • Rows affected (integer) -- number of rows updated; because the UPDATE has no WHERE clause, this should equal the staging table's total row count
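The output contract can be checked after the task runs. The sketch below is hypothetical: check_staging_rows is an illustrative helper, and it assumes the (unique_row_id, filename) pairs were fetched from the staging table with any PostgreSQL client, for example via SELECT unique_row_id, filename FROM public.yellow_tripdata_staging.

```python
import re
from typing import Iterable, Optional, Tuple

# PostgreSQL md5() output: exactly 32 lowercase hexadecimal characters.
MD5_HEX = re.compile(r"[0-9a-f]{32}")


def check_staging_rows(
    rows: Iterable[Tuple[Optional[str], Optional[str]]], expected_file: str
) -> int:
    """Validate (unique_row_id, filename) pairs and return how many were checked.

    Raises ValueError on the first row that violates the output contract.
    """
    checked = 0
    for row_id, filename in rows:
        if row_id is None or not MD5_HEX.fullmatch(row_id):
            raise ValueError(f"bad unique_row_id: {row_id!r}")
        if filename != expected_file:
            raise ValueError(f"unexpected filename: {filename!r}")
        checked += 1
    return checked
```

Comparing the returned count with the rows-affected figure from the UPDATE confirms that every staging row was populated.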

Usage Examples

Resolved SQL for yellow taxi (January 2019):

UPDATE public.yellow_tripdata_staging
SET
  unique_row_id = md5(
    COALESCE(CAST(VendorID AS text), '') ||
    COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
    COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
    COALESCE(PULocationID, '') ||
    COALESCE(DOLocationID, '') ||
    COALESCE(CAST(fare_amount AS text), '') ||
    COALESCE(CAST(trip_distance AS text), '')
  ),
  filename = 'yellow_tripdata_2019-01.csv';

Verifying hash generation:

SELECT unique_row_id, filename, VendorID, tpep_pickup_datetime
FROM public.yellow_tripdata_staging
LIMIT 5;
-- unique_row_id: e.g., 'a1b2c3d4e5f6...' (32-char hex string)
-- filename: 'yellow_tripdata_2019-01.csv'

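Checking for duplicate keys (diagnostic). Rows that share a unique_row_id almost always repeat the same seven key values (for example, duplicate records in the source file); a true MD5 collision is extremely unlikely at this data volume: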

SELECT unique_row_id, COUNT(*) AS cnt
FROM public.yellow_tripdata_staging
GROUP BY unique_row_id
HAVING COUNT(*) > 1;
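A duplicated unique_row_id can come from two different situations: the source file repeats an entire record, or distinct records happen to share all seven key columns. The Python sketch below is hypothetical (duplicate_groups and classify are illustrative names); it assumes rows have been fetched from the staging table as dictionaries of column values, and it separates the two cases by comparing full rows within each duplicated key.

```python
from collections import defaultdict
from typing import Dict, List


def duplicate_groups(rows: List[dict]) -> Dict[str, List[dict]]:
    """Group rows by unique_row_id and keep keys seen more than once
    (the Python analogue of GROUP BY unique_row_id HAVING COUNT(*) > 1)."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        groups[row["unique_row_id"]].append(row)
    return {key: members for key, members in groups.items() if len(members) > 1}


def classify(group: List[dict]) -> str:
    """Label a duplicated key by comparing the full rows that share it."""
    first = group[0]
    if all(row == first for row in group):
        return "exact duplicate"  # the source repeats the whole record
    return "key clash"  # same key, but the rows differ in some column


rows = [
    {"unique_row_id": "k1", "passenger_count": 1},
    {"unique_row_id": "k1", "passenger_count": 1},
    {"unique_row_id": "k2", "passenger_count": 1},
    {"unique_row_id": "k2", "passenger_count": 2},
    {"unique_row_id": "k3", "passenger_count": 1},
]
for key, members in duplicate_groups(rows).items():
    print(key, len(members), classify(members))
# k1 2 exact duplicate
# k2 2 key clash
```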
