Implementation:DataTalksClub Data engineering zoomcamp Airflow Homework Solution
| Knowledge Sources | |
|---|---|
| Domains | Workflow_Orchestration, Airflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A multi-DAG Airflow solution that uses a factory function to create download-parquetize-upload-cleanup pipelines for yellow taxi, green taxi, FHV taxi, and zone lookup datasets, with Jinja-templated date-based file paths.
Description
This solution defines a reusable DAG factory function, donwload_parquetize_upload_dag (the identifier is spelled this way throughout the source), that builds a four-task pipeline shared by several NYC taxi datasets. The factory accepts a DAG object and four template strings (source URL, local CSV path, local Parquet path, and GCS destination path), then wires the following tasks together in order:
- download_dataset_task (BashOperator): Downloads the CSV file from S3 using curl -sSLf.
- format_to_parquet_task (PythonOperator): Converts CSV to Parquet using PyArrow via the format_to_parquet(src_file, dest_file) helper function.
- local_to_gcs_task (PythonOperator): Uploads the Parquet file to GCS via the upload_to_gcs(bucket, object_name, local_file) helper function.
- rm_task (BashOperator): Cleans up the local CSV and Parquet files to free disk space.
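The two BashOperator steps reduce to shell commands assembled from the template strings. The sketch below shows one plausible way to build them. The helper names build_download_command and build_cleanup_command are hypothetical and do not appear in the solution file, and redirecting curl's output into the local CSV path is an assumption about how the download is stored.

```python
from typing import List

# Task order as described in the list above.
TASK_ORDER: List[str] = [
    "download_dataset_task",
    "format_to_parquet_task",
    "local_to_gcs_task",
    "rm_task",
]


def build_download_command(url_template: str, local_csv_path_template: str) -> str:
    """Return the shell command for the download step (hypothetical helper).

    curl flags: -s silent, -S still show errors, -L follow redirects,
    -f exit non-zero on HTTP errors so the task fails for a missing file.
    The arguments may still contain Jinja expressions; Airflow renders
    them before the command reaches the shell.
    """
    return f"curl -sSLf {url_template} > {local_csv_path_template}"


def build_cleanup_command(local_csv_path_template: str,
                          local_parquet_path_template: str) -> str:
    """Return the shell command that removes both local files (hypothetical helper)."""
    return f"rm {local_csv_path_template} {local_parquet_path_template}"
```

Because of -f, a month whose file does not exist upstream makes the download task fail instead of silently saving an error page as CSV.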
The factory pattern is applied to four separate DAG instances:
- yellow_taxi_data_v2: Yellow taxi data, scheduled at 0 6 2 * * (6 AM on the 2nd of each month), starting from January 2019, with catchup enabled and up to 3 concurrent runs.
- green_taxi_data_v1: Green taxi data, scheduled at 0 7 2 * * (7 AM on the 2nd of each month), starting from January 2019.
- hfv_taxi_data_v1: FHV taxi data, scheduled at 0 8 2 * * (8 AM on the 2nd of each month), running from January 2019 and bounded by an end_date of January 2020.
- zones_data_v1: Taxi zone lookup data, scheduled @once.
All DAGs use Jinja templates with {{ execution_date.strftime('%Y-%m') }} for date-based file naming, so each run resolves to its own month's file and Airflow's backfill capability can process historical months.
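To make the backfill behaviour concrete, the following sketch lists the logical dates a monthly schedule such as 0 6 2 * * would generate and the GCS path each one would resolve to. It does not invoke Airflow or Jinja; it only mimics the rendered result with plain strftime calls. The function names monthly_execution_dates and render_gcs_path are hypothetical, and the path layout is taken from the example in the Outputs table.

```python
from datetime import datetime
from typing import Iterator


def monthly_execution_dates(start: datetime, end: datetime,
                            day: int = 2, hour: int = 6) -> Iterator[datetime]:
    """Yield one datetime per month (given day and hour) within [start, end]."""
    year, month = start.year, start.month
    while True:
        current = datetime(year, month, day, hour)
        if current > end:
            break
        if current >= start:
            yield current
        # Advance to the next calendar month.
        month += 1
        if month > 12:
            year, month = year + 1, 1


def render_gcs_path(execution_date: datetime, prefix: str = "yellow_tripdata") -> str:
    """Mimic the rendered GCS path template for one logical date."""
    year = execution_date.strftime("%Y")
    year_month = execution_date.strftime("%Y-%m")
    return f"raw/{prefix}/{year}/{prefix}_{year_month}.parquet"


dates = list(monthly_execution_dates(datetime(2019, 1, 1), datetime(2019, 3, 31)))
paths = [render_gcs_path(d) for d in dates]
```

Each logical date maps to exactly one monthly file, which is why the runs of a backfill do not overwrite each other's output.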
Usage
Use this implementation as a reference for building reusable, parameterized Airflow DAG factories that process several related datasets with identical pipeline logic but different data sources and schedules. It shows one way to make DAGs backfill-capable with Jinja templating and to clean up local files once the upload has finished.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: cohorts/2022/week_2_data_ingestion/homework/solution.py
- Lines: 1-188
Signature
def format_to_parquet(src_file, dest_file):
    ...

def upload_to_gcs(bucket, object_name, local_file):
    ...

def donwload_parquetize_upload_dag(
    dag,
    url_template,
    local_csv_path_template,
    local_parquet_path_template,
    gcs_path_template
):
    ...
Import
import os
import logging
from datetime import datetime
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from google.cloud import storage
import pyarrow.csv as pv
import pyarrow.parquet as pq
I/O Contract
Inputs
donwload_parquetize_upload_dag:
| Name | Type | Required | Description |
|---|---|---|---|
| dag | airflow.DAG | Yes | The Airflow DAG object to attach tasks to |
| url_template | str | Yes | Jinja-templated URL for downloading the source CSV file |
| local_csv_path_template | str | Yes | Jinja-templated local path for the downloaded CSV file |
| local_parquet_path_template | str | Yes | Jinja-templated local path for the converted Parquet file |
| gcs_path_template | str | Yes | Jinja-templated destination path within the GCS bucket |
format_to_parquet:
| Name | Type | Required | Description |
|---|---|---|---|
| src_file | str | Yes | Path to the source CSV file |
| dest_file | str | Yes | Path to the output Parquet file |
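A minimal sketch of what format_to_parquet might look like, based on the PyArrow imports listed above. The boolean return value and the lazy imports inside the function are assumptions made for this example, not details confirmed by the signature.

```python
import logging


def format_to_parquet(src_file: str, dest_file: str) -> bool:
    """Convert a CSV file to Parquet; return True if a file was written."""
    if not src_file.endswith(".csv"):
        # Refuse anything that is not a CSV file instead of guessing the format.
        logging.error("Can only convert CSV files, got: %s", src_file)
        return False

    # Imported lazily (an assumption of this sketch) so that loading the
    # module does not require PyArrow until a conversion actually runs.
    import pyarrow.csv as pv
    import pyarrow.parquet as pq

    table = pv.read_csv(src_file)     # read the whole CSV into an Arrow table
    pq.write_table(table, dest_file)  # write that table out as Parquet
    return True
```

Note that pv.read_csv loads the entire file into memory, so the worker needs enough RAM for the largest monthly file.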
upload_to_gcs:
| Name | Type | Required | Description |
|---|---|---|---|
| bucket | str | Yes | GCS bucket name |
| object_name | str | Yes | Target object path in GCS |
| local_file | str | Yes | Local file path to upload |
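The upload step can be sketched with the google-cloud-storage client imported above. The optional client parameter is a hypothetical addition, not part of the documented signature; it is included here only so the function can be exercised with a stand-in object instead of real GCP credentials.

```python
from typing import Any, Optional


def upload_to_gcs(bucket: str, object_name: str, local_file: str,
                  client: Optional[Any] = None) -> None:
    """Upload local_file to gs://<bucket>/<object_name>."""
    if client is None:
        # Imported lazily so the sketch can be loaded without the GCP SDK.
        from google.cloud import storage
        client = storage.Client()

    blob = client.bucket(bucket).blob(object_name)  # handle to the target object
    blob.upload_from_filename(local_file)           # stream the local file to GCS
```

In the PythonOperator the three documented arguments would typically be passed through op_kwargs, with the bucket name coming from GCP_GCS_BUCKET.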
Environment Variables:
| Name | Type | Required | Description |
|---|---|---|---|
| GCP_PROJECT_ID | str | Yes | Google Cloud project ID |
| GCP_GCS_BUCKET | str | Yes | Target GCS bucket name |
| AIRFLOW_HOME | str | No | Airflow home directory (defaults to /opt/airflow/) |
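The variables in this table can be read once at module load. The sketch below, which assumes a hypothetical load_settings helper and Settings tuple that are not part of the solution file, fails early when a required variable is missing and applies the documented default for AIRFLOW_HOME.

```python
import os
from typing import Mapping, NamedTuple


class Settings(NamedTuple):
    project_id: str
    bucket: str
    airflow_home: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the variables from the table above (hypothetical helper)."""
    required = ("GCP_PROJECT_ID", "GCP_GCS_BUCKET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail at load time rather than halfway through a pipeline run.
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    return Settings(
        project_id=env["GCP_PROJECT_ID"],
        bucket=env["GCP_GCS_BUCKET"],
        airflow_home=env.get("AIRFLOW_HOME", "/opt/airflow/"),
    )
```

Accepting the environment as a mapping keeps the helper easy to test with a plain dictionary.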
Outputs
| Name | Type | Description |
|---|---|---|
| GCS Parquet files | GCS objects | Parquet files organized by service type and year in GCS (e.g., raw/yellow_tripdata/2019/yellow_tripdata_2019-01.parquet) |
Usage Examples
Basic Usage
import os
from datetime import datetime

from airflow import DAG

# Assumes donwload_parquetize_upload_dag is defined in, or imported into, this module.

# Illustrative defaults (an assumption); adjust them to match your deployment.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
}

# Define a new DAG to pass to the factory function
my_dag = DAG(
    dag_id="custom_taxi_data",
    schedule_interval="0 6 2 * *",
    start_date=datetime(2021, 1, 1),
    default_args=default_args,
    catchup=True,
    max_active_runs=3,
    tags=['dtc-de'],
)

URL_PREFIX = 'https://s3.amazonaws.com/nyc-tlc/trip+data'
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")

# Attach the download, parquetize, upload, and cleanup tasks to the DAG
donwload_parquetize_upload_dag(
    dag=my_dag,
    url_template=URL_PREFIX + '/yellow_tripdata_{{ execution_date.strftime(\'%Y-%m\') }}.csv',
    local_csv_path_template=AIRFLOW_HOME + '/yellow_tripdata_{{ execution_date.strftime(\'%Y-%m\') }}.csv',
    local_parquet_path_template=AIRFLOW_HOME + '/yellow_tripdata_{{ execution_date.strftime(\'%Y-%m\') }}.parquet',
    gcs_path_template="raw/yellow_tripdata/{{ execution_date.strftime(\'%Y\') }}/yellow_tripdata_{{ execution_date.strftime(\'%Y-%m\') }}.parquet"
)