
Workflow:DataTalksClub Data engineering zoomcamp Kestra ETL Pipeline

From Leeroopedia



Knowledge Sources
Domains: Data_Engineering, Workflow_Orchestration, ETL
Last Updated: 2026-02-09 07:00 GMT

Overview

End-to-end orchestrated ETL pipeline using Kestra to extract NYC taxi trip CSV data, load it into a PostgreSQL staging table, apply deduplication, and merge into a final production table.

Description

This workflow defines a production-grade ETL pattern using Kestra as the workflow orchestrator. The pipeline downloads compressed CSV files from a remote source, creates both staging and final tables in PostgreSQL with proper schemas, bulk-loads data into the staging table using COPY, generates unique row identifiers via MD5 hashing, and performs an idempotent MERGE operation to deduplicate records. The flow supports both yellow and green taxi datasets with conditional branching. It can be triggered manually or on a cron schedule.

Usage

Execute this workflow when you need an orchestrated, repeatable ETL pipeline that can handle incremental data loads with deduplication. Use it when you have Kestra running as your orchestration platform and need to load NYC taxi data into PostgreSQL with proper staging and merge semantics. The scheduled variant runs monthly to pick up new data automatically.

Execution Steps

Step 1: Infrastructure Setup

Deploy the Kestra orchestrator and PostgreSQL database using Docker Compose. Kestra runs as a web service with its own UI for flow management, while PostgreSQL serves as both the Kestra metadata store and the target data warehouse.

Key considerations:

  • Docker Compose defines both Kestra and PostgreSQL services
  • Kestra connects to PostgreSQL for its internal state management
  • The target database for taxi data uses separate credentials from Kestra's internal database

Step 2: Data Extraction

Download the compressed CSV file from the NYC TLC data repository using a shell command task. The file is selected based on input parameters (taxi type, year, month) and decompressed on the fly using gunzip.

Key considerations:

  • Piping wget into gunzip decompresses the download as it streams, so the compressed archive is never written to disk
  • Output files are registered as Kestra internal storage artifacts
  • File naming follows the pattern: {color}_tripdata_{year}-{month}.csv
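The naming pattern and the streaming decompression can be sketched in Python as an illustration. This is not the flow's shell task: the function names are hypothetical, and zero-padding the month is an assumption about how the month input is formatted.

```python
import gzip
import io
import shutil


def build_filename(color: str, year: int, month: int) -> str:
    """Return the CSV name following {color}_tripdata_{year}-{month}.csv."""
    if color not in ("yellow", "green"):
        raise ValueError(f"unsupported taxi type: {color!r}")
    # Assumption: the month is zero-padded to two digits.
    return f"{color}_tripdata_{year}-{month:02d}.csv"


def gunzip_stream(compressed, out_path: str) -> int:
    """Decompress a gzip byte stream into out_path chunk by chunk.

    Returns the number of decompressed bytes written. The compressed
    bytes are read from the stream and never stored on disk.
    """
    with gzip.GzipFile(fileobj=compressed) as src, open(out_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
        return dst.tell()
```

Any binary file-like object works as the `compressed` argument, for example an HTTP response body or an `io.BytesIO` buffer in a test.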

Step 3: Table Creation

Create both the final destination table and a staging table in PostgreSQL if they do not already exist. The schema differs between yellow and green taxi datasets (different datetime column names, green has additional fields like ehail_fee and trip_type), so conditional branching selects the appropriate DDL.

Key considerations:

  • Uses CREATE TABLE IF NOT EXISTS for idempotency
  • Staging table mirrors the final table schema exactly
  • Both tables include metadata columns (unique_row_id, filename) not present in the source data
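A minimal Python sketch of the conditional DDL selection follows. The column lists are deliberately abbreviated and the SQL types are assumptions; only the datetime column prefixes (tpep_ for yellow, lpep_ for green) and the green-only ehail_fee and trip_type fields reflect the schema difference described above. The function name is hypothetical.

```python
def create_table_ddl(taxi: str, table: str) -> str:
    """Build an idempotent CREATE TABLE statement for one taxi type."""
    # Metadata columns that are not present in the source CSV.
    columns = [("unique_row_id", "text"), ("filename", "text")]
    # Abbreviated trip columns; the real tables carry the full schema.
    columns.append(("VendorID", "text"))
    if taxi == "yellow":
        columns += [
            ("tpep_pickup_datetime", "timestamp"),
            ("tpep_dropoff_datetime", "timestamp"),
        ]
    elif taxi == "green":
        columns += [
            ("lpep_pickup_datetime", "timestamp"),
            ("lpep_dropoff_datetime", "timestamp"),
            ("ehail_fee", "double precision"),
            ("trip_type", "integer"),
        ]
    else:
        raise ValueError(f"unsupported taxi type: {taxi!r}")
    body = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n{body}\n);"
```

Calling the same function twice, once with the final table name and once with the staging table name, keeps the two schemas identical by construction.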

Step 4: Staging Load

Truncate the staging table to clear any previous data, then bulk-load the extracted CSV file using the PostgreSQL COPY command. COPY is generally the fastest built-in way to load large CSV files into PostgreSQL because it avoids per-row INSERT overhead.

Key considerations:

  • Truncate before load ensures clean staging state
  • COPY with CSV format and header skip handles the file format
  • Column list is explicitly specified to match the CSV column order
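The two statements of this step can be sketched as follows. This Python helper only builds the SQL text; it assumes the column list is taken from the CSV header row, and the function names are hypothetical. The flow itself may issue the load through a Kestra task rather than raw SQL.

```python
import csv
import io


def copy_statement(table: str, csv_file) -> str:
    """Build a COPY statement whose column list follows the CSV header order."""
    header = next(csv.reader(csv_file))
    columns = ", ".join(header)
    # HEADER true tells PostgreSQL to skip the first line of the input.
    return f"COPY {table} ({columns}) FROM STDIN WITH (FORMAT csv, HEADER true)"


def staging_load_sql(table: str, csv_file) -> list:
    """Return the statements for a clean staging load: truncate, then copy."""
    return [f"TRUNCATE TABLE {table}", copy_statement(table, csv_file)]
```

Listing the columns explicitly matters here because the staging table also contains unique_row_id and filename, which the CSV does not supply.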

Step 5: Deduplication and Enrichment

Update staging records with a computed unique row identifier (MD5 hash of key business columns) and the source filename. The MD5 hash combines vendor, pickup/dropoff times, locations, fare, and distance to create a deterministic row fingerprint.

Key considerations:

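  • The MD5 hash is deterministic, so the same source row yields the same unique_row_id on every load, which is what the later merge relies on to detect duplicates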
  • COALESCE handles NULL values in hash computation
  • Filename column tracks data provenance
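The fingerprint logic can be illustrated in Python with hashlib. This is a sketch of the idea, not the flow's SQL: the key column names are simplified placeholders, and replacing a missing value with an empty string mirrors what COALESCE(col, '') would do.

```python
import hashlib

# Hypothetical, simplified names for the key business columns.
KEY_COLUMNS = (
    "VendorID",
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
    "trip_distance",
)


def unique_row_id(row: dict) -> str:
    """Return a deterministic MD5 fingerprint of the key columns."""
    # None (or a missing key) becomes "", like COALESCE(col, '') in SQL.
    parts = ("" if row.get(col) is None else str(row[col]) for col in KEY_COLUMNS)
    return hashlib.md5("".join(parts).encode("utf-8")).hexdigest()
```

Because the values are concatenated without a separator, two different rows could in principle produce the same input string; adding a delimiter between parts would reduce that risk if it matters for a given dataset.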

Step 6: Merge to Production

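Execute a MERGE statement that compares staging records against the production table using the unique_row_id. Because the statement has only a WHEN NOT MATCHED branch, it inserts records that are not already present and leaves existing rows untouched (an insert-only merge rather than a full upsert), so loads are idempotent and can be safely re-run.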

Key considerations:

  • MERGE with WHEN NOT MATCHED inserts only rows whose unique_row_id is absent from the production table
  • The operation is idempotent: re-running with the same data produces no duplicates
  • This pattern supports both initial loads and incremental loads
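The idempotency property can be demonstrated with a small, self-contained Python sketch. It uses the standard-library sqlite3 module, which has no MERGE statement, so INSERT ... SELECT ... WHERE NOT EXISTS stands in for the WHEN NOT MATCHED branch. Table names, columns, and sample rows are hypothetical. On PostgreSQL, MERGE itself requires version 15 or later.

```python
import sqlite3


def merge_new_rows(conn: sqlite3.Connection) -> int:
    """Insert staging rows whose unique_row_id is not yet in trips.

    Returns how many rows were added to the final table.
    """
    before = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
    conn.execute(
        """
        INSERT INTO trips (unique_row_id, filename, fare_amount)
        SELECT s.unique_row_id, s.filename, s.fare_amount
        FROM trips_staging AS s
        WHERE NOT EXISTS (
            SELECT 1 FROM trips AS t WHERE t.unique_row_id = s.unique_row_id
        )
        """
    )
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
    return after - before


def demo() -> tuple:
    """Run the merge twice on the same staging data."""
    conn = sqlite3.connect(":memory:")
    schema = "(unique_row_id TEXT, filename TEXT, fare_amount REAL)"
    conn.execute(f"CREATE TABLE trips {schema}")
    conn.execute(f"CREATE TABLE trips_staging {schema}")
    conn.executemany(
        "INSERT INTO trips_staging VALUES (?, ?, ?)",
        [("a", "sample.csv", 9.5), ("b", "sample.csv", 12.0)],
    )
    first = merge_new_rows(conn)   # both rows are new
    second = merge_new_rows(conn)  # nothing new on the re-run
    total = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
    conn.close()
    return first, second, total
```

The second call adds nothing because every staging row already has a matching unique_row_id in the final table, which is the behavior that makes re-running a load safe.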

Step 7: Cleanup

Purge temporary files from Kestra internal storage to free resources. This step removes the downloaded CSV files that were stored as execution artifacts.

Key considerations:

  • Can be disabled during development to inspect intermediate outputs
  • Runs automatically as the final step in the flow

GitHub URL

Workflow Repository