Implementation:DataTalksClub Data engineering zoomcamp Kestra Shell Commands
| Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp, docs: Kestra Documentation, Kestra Shell Commands Plugin |
| Domains | Data Extraction, Shell Scripting, Workflow Orchestration, ETL |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete tool for extracting data from remote HTTP sources using the Kestra Shell Commands plugin, which executes wget and gunzip within a managed process to download and decompress NYC taxi trip data files.
Description
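This implementation uses the io.kestra.plugin.scripts.shell.Commands task type with an io.kestra.plugin.core.runner.Process task runner to execute a shell pipeline. The pipeline downloads a gzip-compressed CSV file from the DataTalksClub GitHub releases and decompresses it in a single streaming operation: wget writes the compressed bytes to standard output, and gunzip reads them from the pipe and writes the decompressed CSV to disk.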
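The streaming pattern described above can be tried locally without network access. The following is a minimal sketch, not the flow's actual command: `cat` stands in for `wget -qO-`, and the sample file name, the temporary directory, and the two CSV columns are assumptions made for illustration.

```shell
# Local stand-in for the extract command: `cat` plays the role of `wget -qO-`.
workdir="$(mktemp -d)"

# Build a small gzip-compressed CSV (hypothetical sample data).
printf 'VendorID,fare_amount\n1,7.5\n2,12.0\n' | gzip -c > "$workdir/sample.csv.gz"

# Stream the compressed bytes to stdout and decompress them on the fly.
# gunzip reads from the pipe, so no extra decompressed copy is staged first.
cat "$workdir/sample.csv.gz" | gunzip > "$workdir/sample.csv"

# Count the lines of the decompressed file (header plus two data rows).
line_count="$(wc -l < "$workdir/sample.csv" | tr -d ' ')"
echo "$line_count"
```

In the real task the left side of the pipe is the HTTP download, so the compressed archive is never written to the working directory; only the decompressed CSV is.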
The task is parameterized by three user-selectable inputs defined at the flow level:
- inputs.taxi -- SELECT type with values [yellow, green], defaulting to yellow.
- inputs.year -- SELECT type with values ["2019", "2020"], defaulting to "2019".
- inputs.month -- SELECT type with values ["01" through "12"], defaulting to "01".
These inputs are interpolated into the download URL and output filename using Kestra's Pebble template expressions. The outputFiles: ["*.csv"] directive registers all produced CSV files as task outputs in Kestra's internal storage, making them available to subsequent tasks via the expression {{outputs.extract.outputFiles['filename.csv']}}.
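To make the interpolation concrete, here is a small sketch of what the filename and download URL look like once the default inputs are substituted. Plain shell variables stand in for the Pebble expressions; the variable names `taxi`, `year`, `month`, `file`, and `url` are illustrative and not part of the flow.

```shell
# Hypothetical stand-ins for the flow inputs, set to their default values.
taxi="yellow"
year="2019"
month="01"

# Mirrors the filename template: <taxi>_tripdata_<year>-<month>.csv
file="${taxi}_tripdata_${year}-${month}.csv"

# Mirrors the URL template used by the extract command.
url="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/${taxi}/${file}.gz"

echo "$file"   # yellow_tripdata_2019-01.csv
echo "$url"    # https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz
```

The same filename appears twice in the rendered command: once with a .gz suffix in the URL, and once as the redirect target that the "*.csv" pattern later picks up.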
Usage
This task is the first data-processing step in the 04_postgres_taxi flow. It executes after the set_label task and before the conditional DDL tasks. The downloaded CSV is consumed by the CopyIn task for bulk loading into the staging table.
Code Reference
Source Location: 02-workflow-orchestration/flows/04_postgres_taxi.yaml, Lines 38-45
Signature:
- id: extract
  type: io.kestra.plugin.scripts.shell.Commands
  outputFiles:
    - "*.csv"
  taskRunner:
    type: io.kestra.plugin.core.runner.Process
  commands:
    - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
Import: Requires the io.kestra.plugin.scripts.shell plugin (included in the default Kestra distribution).
I/O Contract
Inputs:
| Name | Type | Description |
|---|---|---|
| inputs.taxi | SELECT (yellow, green) | Taxi dataset type, determines download URL path segment |
| inputs.year | SELECT (2019, 2020) | Year partition of the dataset |
| inputs.month | SELECT (01-12) | Month partition of the dataset |
| vars.file | Rendered template | Filename template: {{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv |
Outputs:
| Name | Type | Description |
|---|---|---|
| outputs.extract.outputFiles | Map<String, URI> | Map of matched filenames to Kestra internal storage URIs. Key is the CSV filename (e.g., yellow_tripdata_2019-01.csv), value is the internal storage reference. |
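As a rough illustration of how a "*.csv" pattern selects the keys of the output map, the sketch below applies the equivalent shell glob to a temporary directory. This only approximates the idea: Kestra performs its own pattern matching, and the directory and the extra `download.log` file are hypothetical.

```shell
# Hypothetical working directory with one CSV and one unrelated file.
workdir="$(mktemp -d)"
touch "$workdir/yellow_tripdata_2019-01.csv" "$workdir/download.log"

# Collect the base names of every file matching *.csv.
matched=""
for path in "$workdir"/*.csv; do
  [ -e "$path" ] || continue          # skip the literal pattern if nothing matched
  matched="$matched$(basename "$path") "
done

echo "$matched"
```

Only the CSV name is collected; the log file does not match the pattern and would not become a key in the output map.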
Usage Examples
Full flow inputs and extraction task in context:
id: 04_postgres_taxi
namespace: zoomcamp

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: [yellow, green]
    defaults: yellow

  - id: year
    type: SELECT
    displayName: Select year
    values: ["2019", "2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Select month
    values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
    defaults: "01"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"

tasks:
  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
Referencing the output in a downstream task:
- id: copy_in_to_staging
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  from: "{{render(vars.data)}}"
  # With the default inputs, vars.data resolves to:
  # outputs.extract.outputFiles['yellow_tripdata_2019-01.csv']