
Implementation:DataTalksClub Data engineering zoomcamp Kestra PostgreSQL CopyIn

From Leeroopedia


Metadata
Knowledge Sources: repo DataTalksClub/data-engineering-zoomcamp; docs: Kestra Documentation, Kestra PostgreSQL CopyIn Plugin
Domains: Data Loading, Bulk Transfer, PostgreSQL, ETL
Last Updated: 2026-02-09 14:00 GMT

Overview

Concrete tool for performing high-throughput bulk data loading from CSV files into PostgreSQL staging tables using the Kestra CopyIn plugin, which wraps the PostgreSQL COPY protocol.

Description

This implementation uses io.kestra.plugin.jdbc.postgresql.CopyIn to stream CSV data from Kestra's internal storage directly into a PostgreSQL staging table using the native COPY protocol. The task is configured with:

  • format: CSV -- specifies the input file format as comma-separated values.
  • header: true -- indicates that the first row of the CSV contains column names. COPY discards this row; it does not match the names against the column list.
  • from -- references the Kestra internal storage URI of the extracted CSV file, resolved via the {{render(vars.data)}} template expression.
  • table -- the target staging table name, resolved via {{render(vars.staging_table)}}.
  • columns -- an explicit list of target table columns, given in the same order as the fields in the CSV file. COPY assigns each field to the listed column at the same position, so the list must follow the file's column order. The list is essential because the staging table contains two additional columns (unique_row_id and filename) that are not present in the source CSV; table columns left out of the list receive their default value, which is NULL unless a default is defined.
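The column-mapping rules above can be modeled in a few lines of Python. This is a toy sketch, not Kestra or PostgreSQL code: the helper simulate_copy_in and the two-column sample file are hypothetical. It mirrors the documented COPY FROM behavior: with a header, the first line is discarded; each remaining field goes, in order, into the listed column; and table columns missing from the list (here unique_row_id and filename) receive their default, which is NULL (None below).

```python
import csv
import io


def simulate_copy_in(csv_text, table_columns, copy_columns):
    """Hypothetical toy model of COPY t (cols...) FROM STDIN WITH (FORMAT CSV, HEADER true)."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # HEADER true: the first line is discarded, not matched by name
    rows = []
    for fields in reader:
        if len(fields) != len(copy_columns):
            raise ValueError(f"expected {len(copy_columns)} fields, got {len(fields)}")
        # Columns not in the COPY list get their default (NULL here).
        row = {col: None for col in table_columns}
        # Fields are assigned to the listed columns by position.
        row.update(zip(copy_columns, fields))
        rows.append(row)
    return rows


table_columns = ["unique_row_id", "filename", "VendorID", "fare_amount"]
copy_columns = ["VendorID", "fare_amount"]
csv_text = "VendorID,fare_amount\n1,12.5\n2,8.0\n"  # made-up sample data

for row in simulate_copy_in(csv_text, table_columns, copy_columns):
    print(row)
```

Because the mapping is positional, swapping two entries in copy_columns would silently swap the values between those columns (or fail on a type conversion in a real typed table).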

The column list differs between the yellow and green taxi datasets, both in which columns appear and in their order:

  • Yellow taxi: 18 columns (VendorID through congestion_surcharge, with tpep_ datetime prefixes).
  • Green taxi: 20 columns (VendorID through congestion_surcharge, with lpep_ datetime prefixes, plus ehail_fee and trip_type).
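A quick way to confirm the counts and the differences is to compare the two lists directly. The sketch below copies the lists from the task definitions on this page; the normalize helper, which treats the tpep_ and lpep_ datetime columns as the same logical field, is a hypothetical convenience.

```python
# Column lists copied from the two CopyIn task definitions on this page.
YELLOW_COLUMNS = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
    "trip_distance", "RatecodeID", "store_and_fwd_flag", "PULocationID", "DOLocationID",
    "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge",
]
GREEN_COLUMNS = [
    "VendorID", "lpep_pickup_datetime", "lpep_dropoff_datetime", "store_and_fwd_flag",
    "RatecodeID", "PULocationID", "DOLocationID", "passenger_count", "trip_distance",
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",
    "improvement_surcharge", "total_amount", "payment_type", "trip_type",
    "congestion_surcharge",
]


def normalize(col):
    # Hypothetical helper: strip the dataset-specific datetime prefixes.
    return col.replace("tpep_", "").replace("lpep_", "")


yellow = {normalize(c) for c in YELLOW_COLUMNS}
green = {normalize(c) for c in GREEN_COLUMNS}

print(len(YELLOW_COLUMNS), len(GREEN_COLUMNS))  # 18 20
print(sorted(green - yellow))  # ['ehail_fee', 'trip_type']
print(sorted(yellow - green))  # []
# Zero-based positions of a shared column in each list.
print(YELLOW_COLUMNS.index("payment_type"), GREEN_COLUMNS.index("payment_type"))  # 9 17
```

Since COPY maps fields by position, the differing order matters as much as the extra columns: payment_type is the 10th field in the yellow list but the 18th in the green list.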

Usage

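This task runs after the staging table has been created and truncated, and before the deduplication (MD5 hash) step. The staging table must be empty before the CopyIn operation because COPY FROM appends to any rows already in the table; loading into a non-empty table would mix rows left over from a previous run with rows from the current file.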
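The reason for truncating first can be shown with a toy model in which a Python list stands in for the staging table. The helpers copy_in and truncate are hypothetical; copy_in only appends, which is how COPY FROM treats a table that already holds rows.

```python
# Toy model: a Python list stands in for the staging table.
staging = []


def copy_in(table, rows):
    # Like COPY FROM: add the new rows, never remove existing ones.
    table.extend(rows)


def truncate(table):
    # Like TRUNCATE TABLE: remove all rows.
    table.clear()


batch = [("1", "12.5"), ("2", "8.0")]  # made-up rows from one CSV file

copy_in(staging, batch)
copy_in(staging, batch)  # second run without truncating first
print(len(staging))  # 4: every row is now duplicated

truncate(staging)
copy_in(staging, batch)
print(len(staging))  # 2: clean load
```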

Code Reference

Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 108-114 (yellow), Lines 217-223 (green)

Signature (yellow taxi):

      - id: yellow_copy_in_to_staging_table
        type: io.kestra.plugin.jdbc.postgresql.CopyIn
        format: CSV
        from: "{{render(vars.data)}}"
        table: "{{render(vars.staging_table)}}"
        header: true
        columns: [VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge]

Signature (green taxi):

      - id: green_copy_in_to_staging_table
        type: io.kestra.plugin.jdbc.postgresql.CopyIn
        format: CSV
        from: "{{render(vars.data)}}"
        table: "{{render(vars.staging_table)}}"
        header: true
        columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]
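With header: true the first line is only skipped, so PostgreSQL does not verify that the file's header agrees with the columns list (PostgreSQL 15 and later offer a separate HEADER MATCH option for that). A pre-flight check such as the one below, run against the downloaded file before the CopyIn task, would catch a reordered or renamed column. The header_mismatches helper, the three-column list, and the sample rows are all hypothetical and made up for illustration.

```python
import csv
import io


def header_mismatches(csv_text, expected_columns):
    """Hypothetical check: compare the CSV header with the CopyIn columns list by position."""
    header = next(csv.reader(io.StringIO(csv_text)))
    problems = []
    if len(header) != len(expected_columns):
        problems.append(("count", len(expected_columns), len(header)))
    for pos, (expected, found) in enumerate(zip(expected_columns, header)):
        if expected != found:
            problems.append((pos, expected, found))
    return problems


expected = ["VendorID", "tpep_pickup_datetime", "passenger_count"]
good = "VendorID,tpep_pickup_datetime,passenger_count\n1,2019-01-01 00:00:00,1\n"
swapped = "VendorID,passenger_count,tpep_pickup_datetime\n1,1,2019-01-01 00:00:00\n"

print(header_mismatches(good, expected))  # []
print(header_mismatches(swapped, expected))
# [(1, 'tpep_pickup_datetime', 'passenger_count'), (2, 'passenger_count', 'tpep_pickup_datetime')]
```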

Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection defaults are configured at the flow level through pluginDefaults:

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://pgdatabase:5432/ny_taxi
      username: root
      password: root
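The effect of pluginDefaults can be approximated as a dictionary merge. This is a simplified model under stated assumptions, not Kestra's implementation: a default is assumed to apply when the task's type starts with the configured type (which is how the package-level io.kestra.plugin.jdbc.postgresql entry reaches the CopyIn task), and properties set on the task itself are assumed to take precedence. The apply_defaults helper is hypothetical.

```python
# Values copied from the pluginDefaults block above.
PLUGIN_DEFAULTS = [
    {
        "type": "io.kestra.plugin.jdbc.postgresql",
        "values": {
            "url": "jdbc:postgresql://pgdatabase:5432/ny_taxi",
            "username": "root",
            "password": "root",
        },
    }
]


def apply_defaults(task, plugin_defaults):
    """Hypothetical model: merge matching defaults, then let task properties win."""
    resolved = {}
    for default in plugin_defaults:
        # Assumption: a default matches when the task type starts with the configured type.
        if task["type"].startswith(default["type"]):
            resolved.update(default["values"])
    resolved.update(task)  # properties set on the task override the defaults
    return resolved


task = {
    "id": "yellow_copy_in_to_staging_table",
    "type": "io.kestra.plugin.jdbc.postgresql.CopyIn",
    "format": "CSV",
    "header": True,
}

resolved = apply_defaults(task, PLUGIN_DEFAULTS)
print(resolved["url"])  # jdbc:postgresql://pgdatabase:5432/ny_taxi
```

Keeping the connection settings in one flow-level block means the CopyIn tasks, and any other PostgreSQL tasks in the flow, do not each repeat url, username, and password.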

I/O Contract

Inputs:

  • from (Kestra internal URI) -- Reference to the extracted CSV file: {{render(vars.data)}}, resolving to outputs.extract.outputFiles['..csv']
  • table (String, rendered) -- Target staging table: public.yellow_tripdata_staging or public.green_tripdata_staging
  • columns (List<String>) -- Table columns to fill, listed in the same order as the CSV fields (excludes unique_row_id and filename)
  • format (String) -- CSV
  • header (Boolean) -- true; the first row is a header line and is skipped
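The from and table inputs are wrapped in render() because, assuming Kestra's default setting in which variable values are not rendered recursively, a variable whose value is itself a template string would otherwise come back unevaluated; render() evaluates that inner template. The toy Python model below illustrates the two-pass idea. It is not Pebble, the template engine Kestra uses, and the staging_table definition in ctx (public.{{inputs.taxi}}_tripdata_staging) is an assumed value consistent with the table names listed above.

```python
import re

# Toy placeholder syntax: {{ dotted.path }} only (no filters, functions, or brackets).
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(ctx, path):
    value = ctx
    for key in path.split("."):
        value = value[key]
    return value


def render_once(template, ctx):
    # One pass: substitute each placeholder with its value, without re-rendering it.
    return PLACEHOLDER.sub(lambda m: str(lookup(ctx, m.group(1))), template)


def render(template, ctx, max_passes=10):
    # Keep rendering until no placeholders remain, like evaluating render(...).
    result = template
    for _ in range(max_passes):
        if not PLACEHOLDER.search(result):
            return result
        result = render_once(result, ctx)
    raise ValueError("template did not resolve")


ctx = {
    "inputs": {"taxi": "yellow"},
    "vars": {"staging_table": "public.{{inputs.taxi}}_tripdata_staging"},  # assumed definition
}

print(render_once("{{ vars.staging_table }}", ctx))  # public.{{inputs.taxi}}_tripdata_staging
print(render("{{ vars.staging_table }}", ctx))  # public.yellow_tripdata_staging
```

In this model, render_once plays the role of a plain {{vars.staging_table}} expression, which would leave the inner placeholder in the table name.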

Outputs:

  • Populated staging table (PostgreSQL table) -- Staging table containing all rows from the CSV file. The unique_row_id and filename columns are NULL at this point.
  • Row count (Long) -- Number of rows loaded (available in Kestra execution metrics)

Usage Examples

Resolved COPY command for yellow taxi data (conceptual SQL equivalent):

COPY public.yellow_tripdata_staging (
    VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
    passenger_count, trip_distance, RatecodeID, store_and_fwd_flag,
    PULocationID, DOLocationID, payment_type, fare_amount, extra,
    mta_tax, tip_amount, tolls_amount, improvement_surcharge,
    total_amount, congestion_surcharge
)
FROM STDIN
WITH (FORMAT CSV, HEADER true);
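The mapping from the task properties to the conceptual statement above can be written as a small string builder. The build_copy_sql helper is hypothetical and does not describe how the plugin constructs its command internally; it only shows which property feeds which part of the SQL. Identifiers are emitted unquoted, so PostgreSQL would fold names such as VendorID to lower case, matching tables whose columns were also created unquoted.

```python
def build_copy_sql(table, columns, fmt="CSV", header=True):
    """Hypothetical helper: render the conceptual COPY statement for a CopyIn task."""
    column_list = ", ".join(columns)  # the `columns` property, in CSV field order
    options = [f"FORMAT {fmt}"]  # the `format` property
    if header:  # the `header` property
        options.append("HEADER true")
    return f"COPY {table} ({column_list}) FROM STDIN WITH ({', '.join(options)});"


sql = build_copy_sql("public.yellow_tripdata_staging", ["VendorID", "tpep_pickup_datetime"])
print(sql)
# COPY public.yellow_tripdata_staging (VendorID, tpep_pickup_datetime) FROM STDIN WITH (FORMAT CSV, HEADER true);
```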

Verifying the load with a count query:

SELECT COUNT(*) FROM public.yellow_tripdata_staging;

Inspecting NULL infrastructure columns after CopyIn:

SELECT unique_row_id, filename, VendorID, tpep_pickup_datetime
FROM public.yellow_tripdata_staging
LIMIT 5;
-- unique_row_id and filename will be NULL
-- VendorID and tpep_pickup_datetime will contain data from CSV
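The verification queries can be tried without a PostgreSQL server by using Python's built-in sqlite3 module as a stand-in. SQLite has no COPY command, so an INSERT with the same explicit column list plays that role, and the table lives in SQLite's default schema rather than public; the trimmed four-column table and the sample rows are made up. The point is the outcome: the row count matches the loaded rows, and the infrastructure columns stay NULL.

```python
import sqlite3

# In-memory SQLite database standing in for PostgreSQL (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE yellow_tripdata_staging (
        unique_row_id TEXT,
        filename TEXT,
        VendorID TEXT,
        tpep_pickup_datetime TEXT
    )"""
)

# Made-up rows, loaded into the listed columns only (the role COPY plays in PostgreSQL).
rows = [("1", "2019-01-01 00:00:00"), ("2", "2019-01-01 00:05:00")]
conn.executemany(
    "INSERT INTO yellow_tripdata_staging (VendorID, tpep_pickup_datetime) VALUES (?, ?)",
    rows,
)

count = conn.execute("SELECT COUNT(*) FROM yellow_tripdata_staging").fetchone()[0]
print(count)  # 2

for row in conn.execute(
    "SELECT unique_row_id, filename, VendorID, tpep_pickup_datetime "
    "FROM yellow_tripdata_staging LIMIT 5"
):
    print(row)  # unique_row_id and filename print as None (SQL NULL)
```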
