Implementation:DataTalksClub Data engineering zoomcamp Kestra PostgreSQL Queries DDL
| Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, docs: Kestra Documentation, Kestra PostgreSQL Queries Plugin |
| Domains | Data Modeling, DDL, Schema Management, Conditional Logic, PostgreSQL |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete tool for creating PostgreSQL tables whose schema is chosen at run time, using the Kestra PostgreSQL Queries plugin together with the Kestra If flowable task to branch on the taxi type input.
Description
This implementation uses io.kestra.plugin.jdbc.postgresql.Queries to execute CREATE TABLE IF NOT EXISTS DDL statements, wrapped inside io.kestra.plugin.core.flow.If conditional blocks that evaluate the inputs.taxi parameter. The conditional branching selects one of two schema definitions:
- Yellow taxi schema (20 columns): includes tpep_pickup_datetime, tpep_dropoff_datetime, and 18 other columns specific to yellow taxi trip data.
- Green taxi schema (22 columns): includes lpep_pickup_datetime, lpep_dropoff_datetime, plus additional columns ehail_fee and trip_type.
Both schemas include two infrastructure columns (unique_row_id and filename) that are populated in later pipeline stages. For each schema variant, both a production table and a staging table are created, followed by a TRUNCATE of the staging table to ensure a clean landing zone.
All PostgreSQL tasks inherit connection defaults from the flow-level pluginDefaults block: jdbc:postgresql://pgdatabase:5432/ny_taxi with user root and password root.
Usage
These tasks execute after the data extraction step and before the CopyIn bulk load. The conditional branching ensures the correct schema is applied based on the user's taxi type selection.
Code Reference
Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 47-215
Signature (yellow taxi branch):
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    - id: yellow_create_table
      type: io.kestra.plugin.jdbc.postgresql.Queries
      sql: |
        CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
            unique_row_id text,
            filename text,
            VendorID text,
            tpep_pickup_datetime timestamp,
            tpep_dropoff_datetime timestamp,
            passenger_count integer,
            trip_distance double precision,
            RatecodeID text,
            store_and_fwd_flag text,
            PULocationID text,
            DOLocationID text,
            payment_type integer,
            fare_amount double precision,
            extra double precision,
            mta_tax double precision,
            tip_amount double precision,
            tolls_amount double precision,
            improvement_surcharge double precision,
            total_amount double precision,
            congestion_surcharge double precision
        );
    - id: yellow_create_staging_table
      type: io.kestra.plugin.jdbc.postgresql.Queries
      sql: |
        CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
            unique_row_id text,
            filename text,
            VendorID text,
            tpep_pickup_datetime timestamp,
            tpep_dropoff_datetime timestamp,
            passenger_count integer,
            trip_distance double precision,
            RatecodeID text,
            store_and_fwd_flag text,
            PULocationID text,
            DOLocationID text,
            payment_type integer,
            fare_amount double precision,
            extra double precision,
            mta_tax double precision,
            tip_amount double precision,
            tolls_amount double precision,
            improvement_surcharge double precision,
            total_amount double precision,
            congestion_surcharge double precision
        );
    - id: yellow_truncate_staging_table
      type: io.kestra.plugin.jdbc.postgresql.Queries
      sql: |
        TRUNCATE TABLE {{render(vars.staging_table)}};
Signature (green taxi branch):
- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
    - id: green_create_table
      type: io.kestra.plugin.jdbc.postgresql.Queries
      sql: |
        CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
            unique_row_id text,
            filename text,
            VendorID text,
            lpep_pickup_datetime timestamp,
            lpep_dropoff_datetime timestamp,
            store_and_fwd_flag text,
            RatecodeID text,
            PULocationID text,
            DOLocationID text,
            passenger_count integer,
            trip_distance double precision,
            fare_amount double precision,
            extra double precision,
            mta_tax double precision,
            tip_amount double precision,
            tolls_amount double precision,
            ehail_fee double precision,
            improvement_surcharge double precision,
            total_amount double precision,
            payment_type integer,
            trip_type integer,
            congestion_surcharge double precision
        );
Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection defaults configured at flow level:
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://pgdatabase:5432/ny_taxi
      username: root
      password: root
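Properties set directly on a task generally take precedence over plugin defaults unless the default is marked as forced. The fragment below is a sketch of such an override, not part of the source flow: the task id, host name, user name, and secret name are all hypothetical placeholders, and the secret() call assumes a secret named POSTGRES_PASSWORD has been configured in the Kestra instance.

```yaml
# Hypothetical task: id, host, user, and secret name are placeholders.
- id: truncate_staging_on_other_host
  type: io.kestra.plugin.jdbc.postgresql.Queries
  # These three properties replace the flow-level pluginDefaults for this task only.
  url: jdbc:postgresql://another-host:5432/ny_taxi
  username: etl_user
  password: "{{ secret('POSTGRES_PASSWORD') }}"
  sql: |
    TRUNCATE TABLE {{render(vars.staging_table)}};
```

Tasks that do not set these properties keep using the shared connection defined in pluginDefaults.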
I/O Contract
Inputs:
| Name | Type | Description |
|---|---|---|
| inputs.taxi | SELECT (yellow, green) | Determines which conditional branch and schema definition to use |
| vars.table | Rendered template | Production table name: public.{{inputs.taxi}}_tripdata |
| vars.staging_table | Rendered template | Staging table name: public.{{inputs.taxi}}_tripdata_staging |
| JDBC connection | Plugin default | jdbc:postgresql://pgdatabase:5432/ny_taxi (root/root) |
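The two template variables in the table above resolve from the taxi input. A minimal sketch of the flow-level declarations they imply follows; the variable values mirror the table names listed above, while the exact layout of the inputs block is an assumption rather than a quotation from the source file.

```yaml
# Sketch of the declarations the DDL tasks rely on (layout assumed).
inputs:
  - id: taxi
    type: SELECT
    values: [yellow, green]

variables:
  # Each value embeds an expression that is resolved per execution.
  table: "public.{{inputs.taxi}}_tripdata"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
```

Because the variable values themselves contain expressions, the tasks call render(vars.table) and render(vars.staging_table) so that the nested inputs.taxi expression is evaluated before the SQL is sent to PostgreSQL.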
Outputs:
| Name | Type | Description |
|---|---|---|
| Production table | PostgreSQL table | public.yellow_tripdata or public.green_tripdata (created if not exists) |
| Staging table | PostgreSQL table | public.yellow_tripdata_staging or public.green_tripdata_staging (created and truncated) |
Usage Examples
Resolved SQL for yellow taxi (inputs.taxi = "yellow", vars.table = "public.yellow_tripdata"):
CREATE TABLE IF NOT EXISTS public.yellow_tripdata (
    unique_row_id text,
    filename text,
    VendorID text,
    tpep_pickup_datetime timestamp,
    tpep_dropoff_datetime timestamp,
    passenger_count integer,
    trip_distance double precision,
    RatecodeID text,
    store_and_fwd_flag text,
    PULocationID text,
    DOLocationID text,
    payment_type integer,
    fare_amount double precision,
    extra double precision,
    mta_tax double precision,
    tip_amount double precision,
    tolls_amount double precision,
    improvement_surcharge double precision,
    total_amount double precision,
    congestion_surcharge double precision
);
Staging table truncation:
TRUNCATE TABLE public.yellow_tripdata_staging;
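One way to confirm which schema a run produced is to count the columns PostgreSQL has recorded for the table. The task below is a sketch and not part of the source flow: the task id check_yellow_columns is hypothetical, and fetchType: FETCH_ONE assumes a plugin version that supports the fetchType property.

```yaml
# Hypothetical verification task; it would sit inside the yellow branch
# after yellow_create_table and inherits the connection from pluginDefaults.
- id: check_yellow_columns
  type: io.kestra.plugin.jdbc.postgresql.Queries
  fetchType: FETCH_ONE   # assumed property; returns the single result row
  sql: |
    -- Count the columns registered for the production table.
    SELECT count(*) AS column_count
    FROM information_schema.columns
    WHERE table_schema = 'public'
      AND table_name = 'yellow_tripdata';
```

For a table created by the yellow DDL above, the count should be 20. Because CREATE TABLE IF NOT EXISTS leaves an existing table untouched, a different count would suggest the table predates the current schema definition. PostgreSQL also folds unquoted identifiers to lowercase, so a column declared as VendorID is stored as vendorid.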