Implementation:DataTalksClub Data engineering zoomcamp Kestra PostgreSQL CopyIn
| Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, docs: Kestra Documentation, Kestra PostgreSQL CopyIn Plugin |
| Domains | Data Loading, Bulk Transfer, PostgreSQL, ETL |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete tool for performing high-throughput bulk data loading from CSV files into PostgreSQL staging tables using the Kestra CopyIn plugin, which wraps the PostgreSQL COPY protocol.
Description
This implementation uses io.kestra.plugin.jdbc.postgresql.CopyIn to stream CSV data from Kestra's internal storage directly into a PostgreSQL staging table using the native COPY protocol. The task is configured with:
- format: CSV -- specifies the input file format as comma-separated values.
- header: true -- indicates the first row of the CSV contains column names (to be skipped during loading).
- from -- references the Kestra internal storage URI of the extracted CSV file, resolved via the {{render(vars.data)}} template expression.
- table -- the target staging table name, resolved via {{render(vars.staging_table)}}.
- columns -- an explicit list of CSV column names that maps file columns to table columns. This is essential because the staging table contains two additional columns (unique_row_id and filename) that are not present in the source CSV.
The column list differs between yellow and green taxi datasets:
- Yellow taxi: 18 columns (VendorID through congestion_surcharge, with tpep_ datetime prefixes).
- Green taxi: 20 columns (VendorID through congestion_surcharge, with lpep_ datetime prefixes, plus ehail_fee and trip_type).
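Since the columns list has to line up with both the CSV file and the staging table, it can help to list the staging columns before loading. The query below is a minimal sketch, assuming the yellow staging table lives in the public schema under the name used elsewhere on this page. PostgreSQL's COPY, which this task wraps, discards the header line when HEADER is true rather than matching names against it, so the order of the list should follow the column order of the file.

```sql
-- List the staging columns in table order, leaving out the two
-- columns that the CSV does not supply.
-- Note: names appear in lowercase here if the table was created
-- with unquoted identifiers (e.g. vendorid rather than VendorID).
SELECT ordinal_position, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'yellow_tripdata_staging'
  AND column_name NOT IN ('unique_row_id', 'filename')
ORDER BY ordinal_position;
```

If the staging table matches the lists above, the query should return 18 rows for the yellow table and 20 for the green one (after swapping in green_tripdata_staging).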
Usage
This task runs after the staging table has been created and truncated, and before the deduplication (MD5 hash) step. The staging table must be empty before the CopyIn operation to ensure a clean load.
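The ordering described above can be sketched in SQL as follows. This is an illustration under stated assumptions, not the flow's exact statements: the set of columns fed into the hash is a shortened example, and the filename literal is hypothetical.

```sql
-- Before CopyIn: empty the staging table so the load starts clean.
TRUNCATE TABLE public.yellow_tripdata_staging;

-- (The CopyIn task runs here and fills the CSV-backed columns.)

-- After CopyIn: populate the two columns the CSV did not provide.
-- The hash inputs below are an assumed subset chosen for illustration.
UPDATE public.yellow_tripdata_staging
SET unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
        COALESCE(CAST(tpep_dropoff_datetime AS text), '')
    ),
    filename = 'yellow_tripdata_2019-01.csv';  -- hypothetical file name
```

Truncating first matters because the UPDATE has no WHERE clause: any rows left over from a previous file would be stamped with the new file name.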
Code Reference
Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 108-114 (yellow), Lines 217-223 (green)
Signature (yellow taxi):
- id: yellow_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  from: "{{render(vars.data)}}"
  table: "{{render(vars.staging_table)}}"
  header: true
  columns: [VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge]
Signature (green taxi):
- id: green_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  from: "{{render(vars.data)}}"
  table: "{{render(vars.staging_table)}}"
  header: true
  columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]
Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection defaults are configured at the flow level:
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://pgdatabase:5432/ny_taxi
      username: root
      password: root
I/O Contract
Inputs:
| Name | Type | Description |
|---|---|---|
| from | Kestra internal URI | Reference to the extracted CSV file: {{render(vars.data)}} resolving to outputs.extract.outputFiles['..csv'] |
| table | String (rendered) | Target staging table: public.yellow_tripdata_staging or public.green_tripdata_staging |
| columns | List<String> | Explicit column list mapping CSV columns to table columns (excludes unique_row_id and filename) |
| format | String | CSV |
| header | Boolean | true -- first row is a header to skip |
Outputs:
| Name | Type | Description |
|---|---|---|
| Populated staging table | PostgreSQL table | Staging table containing all rows from the CSV file. The unique_row_id and filename columns are NULL at this point. |
| Row count | Long | Number of rows loaded (available in Kestra execution metrics) |
Usage Examples
Resolved COPY command for yellow taxi data (conceptual SQL equivalent):
COPY public.yellow_tripdata_staging (
VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
passenger_count, trip_distance, RatecodeID, store_and_fwd_flag,
PULocationID, DOLocationID, payment_type, fare_amount, extra,
mta_tax, tip_amount, tolls_amount, improvement_surcharge,
total_amount, congestion_surcharge
)
FROM STDIN
WITH (FORMAT CSV, HEADER true);
Verifying the load with a count query:
SELECT COUNT(*) FROM public.yellow_tripdata_staging;
Inspecting NULL infrastructure columns after CopyIn:
SELECT unique_row_id, filename, VendorID, tpep_pickup_datetime
FROM public.yellow_tripdata_staging
LIMIT 5;
-- unique_row_id and filename will be NULL
-- VendorID and tpep_pickup_datetime will contain data from CSV
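Sampling five rows only shows part of the table. As a complementary check, the aggregate below is a small sketch that covers every row, relying on the fact that COUNT(column) counts only non-NULL values. It assumes the same yellow staging table as the queries above.

```sql
-- Immediately after CopyIn, total_rows should equal the number of
-- loaded rows, while the other two counts should both be zero.
SELECT COUNT(*)             AS total_rows,
       COUNT(unique_row_id) AS rows_with_id,
       COUNT(filename)      AS rows_with_filename
FROM public.yellow_tripdata_staging;
```

A non-zero value in either of the last two columns would suggest the staging table was not truncated before the load.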