Implementation:DataTalksClub Data engineering zoomcamp Kestra Shell Commands

From Leeroopedia


Metadata
Knowledge Sources: repo: DataTalksClub/data-engineering-zoomcamp; docs: Kestra Documentation, Kestra Shell Commands Plugin
Domains: Data Extraction, Shell Scripting, Workflow Orchestration, ETL
Last Updated: 2026-02-09 14:00 GMT

Overview

Concrete tool for extracting data from remote HTTP sources using the Kestra Shell Commands plugin, which executes wget and gunzip within a managed process to download and decompress NYC taxi trip data files.

Description

This implementation uses the io.kestra.plugin.scripts.shell.Commands task type with an io.kestra.plugin.core.runner.Process task runner to execute a shell pipeline. The pipeline downloads compressed CSV data from the DataTalksClub/nyc-tlc-data GitHub releases and decompresses it in a single streaming operation, so no intermediate .gz file is written to disk.

The task is parameterized by three user-selectable inputs defined at the flow level:

  • inputs.taxi -- SELECT type with values [yellow, green], defaulting to yellow.
  • inputs.year -- SELECT type with values ["2019", "2020"], defaulting to "2019".
  • inputs.month -- SELECT type with values ["01" through "12"], defaulting to "01".

These inputs are interpolated into the download URL and output filename using Kestra's Pebble template expressions. The outputFiles: ["*.csv"] directive registers all produced CSV files as task outputs in Kestra's internal storage, making them available to subsequent tasks via the expression {{outputs.extract.outputFiles['filename.csv']}}.
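As a rough illustration of that interpolation, the shell sketch below reproduces by hand the strings Pebble would be expected to render for the default inputs. The shell variables taxi, year, and month are stand-ins for the flow inputs and are not part of the flow itself.

```shell
#!/bin/sh
# Stand-ins for the flow inputs, set to their documented defaults.
taxi="yellow"
year="2019"
month="01"

# Mirrors the filename pattern: <taxi>_tripdata_<year>-<month>.csv
file="${taxi}_tripdata_${year}-${month}.csv"

# Mirrors the download URL assembled in the extract command.
url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/${taxi}/${file}.gz"

echo "$file"
echo "$url"
```

The resulting filename is also the key under which the file would appear in outputs.extract.outputFiles.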

Usage

This task is the first data-processing step in the 04_postgres_taxi flow. It executes after the set_label task and before the conditional DDL tasks. The downloaded CSV is consumed by the CopyIn task for bulk loading into the staging table.

Code Reference

Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 38-45

Signature:

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

Import: Requires the io.kestra.plugin.scripts.shell plugin (included in the default Kestra distribution).
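The streaming shape of the command can be tried offline with the minimal sketch below. It is an illustration rather than part of the flow: the produce_gz function is a hypothetical stand-in for wget -qO- <url>, and the sample rows and output filename are made up.

```shell
#!/bin/sh
# Hypothetical scratch directory and output name, used only for this demo.
workdir="$(mktemp -d)"
out="$workdir/sample_tripdata.csv"

# Stand-in for `wget -qO- <url>`: writes gzip-compressed CSV bytes to stdout.
produce_gz() {
  printf 'VendorID,fare_amount\n1,7.5\n' | gzip -c
}

# Same shape as the extract command: compressed bytes flow straight into
# gunzip, so only the decompressed CSV is written to disk.
produce_gz | gunzip > "$out"

# Count the lines of the decompressed file (header plus one data row).
line_count="$(wc -l < "$out" | tr -d ' ')"
echo "$line_count"
```

Because the compressed stream never lands on disk, the only file left behind is the CSV itself.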

I/O Contract

Inputs:

Name | Type | Description
inputs.taxi | SELECT (yellow, green) | Taxi dataset type, determines download URL path segment
inputs.year | SELECT (2019, 2020) | Year partition of the dataset
inputs.month | SELECT (01-12) | Month partition of the dataset
vars.file | Rendered template | Filename template: {{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv

Outputs:

Name | Type | Description
outputs.extract.outputFiles | Map<String, URI> | Map of matched filenames to Kestra internal storage URIs. Key is the CSV filename (e.g., yellow_tripdata_2019-01.csv), value is the internal storage reference.
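To get a feel for which files a "*.csv" pattern would pick up, the sketch below mimics the match with an ordinary shell glob in a scratch directory. Kestra performs its own matching when collecting outputFiles, so this is only an approximation of the pattern's semantics; the directory and the notes.txt file are hypothetical.

```shell
#!/bin/sh
# Hypothetical scratch directory standing in for the task's working directory.
workdir="$(mktemp -d)"
touch "$workdir/yellow_tripdata_2019-01.csv"   # the decompressed output
touch "$workdir/notes.txt"                     # hypothetical unrelated file

# Collect the basenames that a *.csv glob matches, space-separated.
matched=""
for path in "$workdir"/*.csv; do
  [ -e "$path" ] || continue   # skip the literal pattern when nothing matches
  matched="${matched:+$matched }$(basename "$path")"
done

echo "$matched"
```

Only the CSV is matched here, which corresponds to the single key expected in the output map for one run of the extract task.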

Usage Examples

Full flow inputs and extraction task in context:

id: 04_postgres_taxi
namespace: zoomcamp

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: [yellow, green]
    defaults: yellow

  - id: year
    type: SELECT
    displayName: Select year
    values: ["2019", "2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Select month
    values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
    defaults: "01"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"

tasks:
  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

Referencing the output in a downstream task:

  - id: copy_in_to_staging
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    from: "{{render(vars.data)}}"
    # vars.data resolves to: outputs.extract.outputFiles['yellow_tripdata_2019-01.csv']
