

Implementation:DataTalksClub Data engineering zoomcamp Kestra PostgreSQL Queries DDL

From Leeroopedia


Metadata
Knowledge Sources: repo DataTalksClub/data-engineering-zoomcamp; docs: Kestra Documentation, Kestra PostgreSQL Queries Plugin
Domains: Data Modeling, DDL, Schema Management, Conditional Logic, PostgreSQL
Last Updated: 2026-02-09 14:00 GMT

Overview

Concrete tool for creating PostgreSQL tables whose schema depends on a runtime input: the Kestra PostgreSQL Queries plugin executes the DDL, and the Kestra If flowable task branches on the taxi type input to choose which schema to apply.

Description

This implementation uses io.kestra.plugin.jdbc.postgresql.Queries to execute DDL statements (CREATE TABLE IF NOT EXISTS and TRUNCATE TABLE), wrapped inside io.kestra.plugin.core.flow.If conditional blocks that evaluate the inputs.taxi parameter. The conditional branching selects one of two schema definitions:

  • Yellow taxi schema (20 columns): the two infrastructure columns (unique_row_id, filename) plus 18 trip columns, with pickup and dropoff times stored as tpep_pickup_datetime and tpep_dropoff_datetime.
  • Green taxi schema (22 columns): the same two infrastructure columns plus 20 trip columns, with pickup and dropoff times stored as lpep_pickup_datetime and lpep_dropoff_datetime, and two additional columns, ehail_fee and trip_type.
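The column lists in the sketch below are copied from the two CREATE TABLE statements on this page. It is plain Python, not part of the Kestra flow: it models the two If tasks as a single lookup on the taxi value (the function name select_schema is hypothetical) and checks the column counts and the differences between the schemas.

```python
# Illustrative model only: the Kestra flow branches with two If tasks,
# not with a Python dict. Column lists are copied from the DDL on this page.
INFRA = ["unique_row_id", "filename"]

YELLOW = INFRA + [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID", "store_and_fwd_flag",
    "PULocationID", "DOLocationID", "payment_type", "fare_amount", "extra",
    "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge",
    "total_amount", "congestion_surcharge",
]

GREEN = INFRA + [
    "VendorID", "lpep_pickup_datetime", "lpep_dropoff_datetime",
    "store_and_fwd_flag", "RatecodeID", "PULocationID", "DOLocationID",
    "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge",
    "total_amount", "payment_type", "trip_type", "congestion_surcharge",
]

SCHEMAS = {"yellow": YELLOW, "green": GREEN}


def select_schema(taxi: str) -> list[str]:
    """Hypothetical stand-in for the two If tasks: one branch per taxi type."""
    if taxi not in SCHEMAS:
        raise ValueError(f"unsupported taxi type: {taxi!r}")
    return SCHEMAS[taxi]


if __name__ == "__main__":
    print(len(select_schema("yellow")), len(select_schema("green")))  # 20 22
    print(sorted(set(GREEN) - set(YELLOW)))
    # ['ehail_fee', 'lpep_dropoff_datetime', 'lpep_pickup_datetime', 'trip_type']
    print(sorted(set(YELLOW) - set(GREEN)))
    # ['tpep_dropoff_datetime', 'tpep_pickup_datetime']
```

Because inputs.taxi only offers yellow and green, exactly one If condition is true on any run; the ValueError path exists only for this sketch.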

Both schemas include two infrastructure columns (unique_row_id and filename) that are populated in later pipeline stages. For each schema variant, both a production table and a staging table are created, followed by a TRUNCATE of the staging table to ensure a clean landing zone.
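As a minimal sketch (plain Python, not Kestra code), the helper below lists the statements one branch issues, in task order, assuming the rendered names public.<taxi>_tripdata and public.<taxi>_tripdata_staging. The helper branch_statements is hypothetical, and the column definitions are elided.

```python
# Hypothetical helper: the SQL one If branch issues, in task order.
# Column definitions are elided ("..."); the real DDL lists every column.


def branch_statements(taxi: str, columns_sql: str = "...") -> list[str]:
    table = f"public.{taxi}_tripdata"            # assumed rendered vars.table
    staging = f"public.{taxi}_tripdata_staging"  # assumed rendered vars.staging_table
    return [
        # IF NOT EXISTS makes both CREATEs no-ops on re-runs.
        f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql});",
        f"CREATE TABLE IF NOT EXISTS {staging} ({columns_sql});",
        # Only the staging table is emptied; production rows are kept.
        f"TRUNCATE TABLE {staging};",
    ]


if __name__ == "__main__":
    for statement in branch_statements("yellow"):
        print(statement)
```

The order matters: TRUNCATE raises an error on a table that does not exist, which is the situation on a first run, so the staging CREATE has to run before it.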

All PostgreSQL tasks inherit connection defaults from the flow-level pluginDefaults block: jdbc:postgresql://pgdatabase:5432/ny_taxi with user root and password root.

Usage

These tasks execute after the data extraction step and before the CopyIn bulk load. The conditional branching ensures the correct schema is applied based on the user's taxi type selection.

Code Reference

Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 47-215

Signature (yellow taxi branch):

  - id: if_yellow_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'yellow'}}"
    then:
      - id: yellow_create_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              tpep_pickup_datetime   timestamp,
              tpep_dropoff_datetime  timestamp,
              passenger_count        integer,
              trip_distance          double precision,
              RatecodeID             text,
              store_and_fwd_flag     text,
              PULocationID           text,
              DOLocationID           text,
              payment_type           integer,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              congestion_surcharge   double precision
          );

      - id: yellow_create_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              tpep_pickup_datetime   timestamp,
              tpep_dropoff_datetime  timestamp,
              passenger_count        integer,
              trip_distance          double precision,
              RatecodeID             text,
              store_and_fwd_flag     text,
              PULocationID           text,
              DOLocationID           text,
              payment_type           integer,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              congestion_surcharge   double precision
          );

      - id: yellow_truncate_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          TRUNCATE TABLE {{render(vars.staging_table)}};

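Signature (green taxi branch, production table shown; per the description above, the staging-table creation and TRUNCATE follow the same pattern as in the yellow branch):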

  - id: if_green_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'green'}}"
    then:
      - id: green_create_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              lpep_pickup_datetime   timestamp,
              lpep_dropoff_datetime  timestamp,
              store_and_fwd_flag     text,
              RatecodeID             text,
              PULocationID           text,
              DOLocationID           text,
              passenger_count        integer,
              trip_distance          double precision,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              ehail_fee              double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              payment_type           integer,
              trip_type              integer,
              congestion_surcharge   double precision
          );

Import: Requires the io.kestra.plugin.jdbc.postgresql plugin. Connection defaults configured at flow level:

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://pgdatabase:5432/ny_taxi
      username: root
      password: root
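The following is a simplified Python model of how these defaults reach individual tasks. It assumes two Kestra behaviours: a default whose type is a package prefix applies to every task type under that prefix, and a value set explicitly on a task wins over the default (the forced option, which reverses this, is not modelled). The functions matches and effective_properties are hypothetical, not Kestra APIs.

```python
# Simplified, assumed model of Kestra pluginDefaults resolution; this is not
# Kestra's implementation. Assumptions: a default applies when the task type
# equals its `type` or starts with it as a dotted package prefix, and values
# set on the task itself take precedence (the `forced` option is ignored).

PLUGIN_DEFAULTS = [
    {
        "type": "io.kestra.plugin.jdbc.postgresql",
        "values": {
            "url": "jdbc:postgresql://pgdatabase:5432/ny_taxi",
            "username": "root",
            "password": "root",
        },
    },
]


def matches(task_type: str, default_type: str) -> bool:
    """Exact type match, or default_type is a dotted prefix of task_type."""
    return task_type == default_type or task_type.startswith(default_type + ".")


def effective_properties(task: dict) -> dict:
    """Hypothetical helper: the properties a task would run with."""
    merged: dict = {}
    for default in PLUGIN_DEFAULTS:
        if matches(task["type"], default["type"]):
            merged.update(default["values"])
    merged.update(task)  # explicit task properties override defaults
    return merged


if __name__ == "__main__":
    create = {"id": "yellow_create_table",
              "type": "io.kestra.plugin.jdbc.postgresql.Queries"}
    print(effective_properties(create)["url"])
    # jdbc:postgresql://pgdatabase:5432/ny_taxi
```

Under this model the If tasks fall outside the prefix and receive nothing, while every task in the io.kestra.plugin.jdbc.postgresql package, including the later CopyIn load, picks up the shared connection without repeating it.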

I/O Contract

Inputs:

Name                 Type                     Description
inputs.taxi          SELECT (yellow, green)   Determines which conditional branch and schema definition to use
vars.table           Rendered template        Production table name: public.{{inputs.taxi}}_tripdata
vars.staging_table   Rendered template        Staging table name: public.{{inputs.taxi}}_tripdata_staging
JDBC connection      Plugin default           jdbc:postgresql://pgdatabase:5432/ny_taxi (user root, password root)

Outputs:

Name               Type               Description
Production table   PostgreSQL table   public.yellow_tripdata or public.green_tripdata (created if not exists)
Staging table      PostgreSQL table   public.yellow_tripdata_staging or public.green_tripdata_staging (created if not exists, then truncated)
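vars.table holds a string that itself contains a template expression, which is why the DDL wraps it in render(). In Kestra releases where recursive rendering is off by default, {{vars.table}} alone would yield the literal text public.{{inputs.taxi}}_tripdata rather than a table name. The sketch below models this with a toy one-pass substitution: render_once is hypothetical and understands only {{dotted.name}} lookups, not real Pebble syntax, and the VARS values are assumed from the naming in this page.

```python
import re

# Toy stand-in for Pebble rendering: only {{dotted.name}} lookups, one pass.
PATTERN = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")

# Assumed flow variables, matching the table names used on this page.
VARS = {
    "table": "public.{{inputs.taxi}}_tripdata",
    "staging_table": "public.{{inputs.taxi}}_tripdata_staging",
}


def render_once(template: str, context: dict) -> str:
    """Replace each {{dotted.name}} with its value; the result is not re-scanned."""
    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return PATTERN.sub(lookup, template)


if __name__ == "__main__":
    ctx = {"inputs": {"taxi": "green"}, "vars": VARS}
    # One pass: the variable's own template survives as literal text.
    print(render_once("{{vars.table}}", ctx))
    # public.{{inputs.taxi}}_tripdata
    # render(): evaluate the resulting string a second time.
    print(render_once(render_once("{{vars.table}}", ctx), ctx))
    # public.green_tripdata
```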

Usage Examples

Resolved SQL for yellow taxi (inputs.taxi = "yellow", vars.table = "public.yellow_tripdata"):

CREATE TABLE IF NOT EXISTS public.yellow_tripdata (
    unique_row_id          text,
    filename               text,
    VendorID               text,
    tpep_pickup_datetime   timestamp,
    tpep_dropoff_datetime  timestamp,
    passenger_count        integer,
    trip_distance          double precision,
    RatecodeID             text,
    store_and_fwd_flag     text,
    PULocationID           text,
    DOLocationID           text,
    payment_type           integer,
    fare_amount            double precision,
    extra                  double precision,
    mta_tax                double precision,
    tip_amount             double precision,
    tolls_amount           double precision,
    improvement_surcharge  double precision,
    total_amount           double precision,
    congestion_surcharge   double precision
);

Staging table truncation:

TRUNCATE TABLE public.yellow_tripdata_staging;
