Implementation:DataTalksClub Data engineering zoomcamp Airflow Homework Solution

From Leeroopedia


Knowledge Sources
Domains: Workflow_Orchestration, Airflow
Last Updated: 2026-02-09 00:00 GMT

Overview

A multi-DAG Airflow solution that uses a factory function to create download-parquetize-upload-cleanup pipelines for yellow taxi, green taxi, FHV taxi, and zone lookup datasets, with Jinja-templated date-based file paths.

Description

This solution defines a reusable DAG factory function, donwload_parquetize_upload_dag (the identifier is spelled this way throughout the code), that builds a four-task pipeline applicable to multiple NYC taxi datasets. The factory accepts a DAG object and four template strings (source URL, local CSV path, local Parquet path, and GCS destination path), then wires together the following tasks in order:

  1. download_dataset_task (BashOperator): Downloads the CSV file from S3 using curl -sSLf.
  2. format_to_parquet_task (PythonOperator): Converts CSV to Parquet using PyArrow via the format_to_parquet(src_file, dest_file) helper function.
  3. local_to_gcs_task (PythonOperator): Uploads the Parquet file to GCS via the upload_to_gcs(bucket, object_name, local_file) helper function.
  4. rm_task (BashOperator): Cleans up the local CSV and Parquet files to free disk space.
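
The two BashOperator steps in the list above reduce to short shell commands. The sketch below builds them as plain strings so they can be inspected outside Airflow. The helper names download_command and cleanup_command, the TASK_ORDER list, and the exact command layout (redirecting curl output to the CSV path) are assumptions for illustration, since this page does not show the operator bodies.

```python
def download_command(url: str, dest: str) -> str:
    """Shell command for the download step (hypothetical helper).

    curl flags: -s silences the progress meter, -S still prints errors,
    -L follows redirects, -f turns HTTP errors (e.g. 404) into a
    non-zero exit code instead of saving the error page.
    """
    return f"curl -sSLf {url} > {dest}"


def cleanup_command(*paths: str) -> str:
    """Shell command for the cleanup step: remove the local files."""
    return "rm " + " ".join(paths)


# Task order as described in the list above (assumed to be a linear chain).
TASK_ORDER = [
    "download_dataset_task",
    "format_to_parquet_task",
    "local_to_gcs_task",
    "rm_task",
]

print(download_command("https://example.com/data.csv", "/tmp/data.csv"))
print(cleanup_command("/tmp/data.csv", "/tmp/data.parquet"))
```

Because -f makes curl exit with a non-zero status on an HTTP error, a missing month should fail the download task rather than pass an error page on to the Parquet conversion.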

The factory pattern is applied to four separate DAG instances:

  • yellow_taxi_data_v2: Yellow taxi data, scheduled at 0 6 2 * * (6 AM on the 2nd of each month), starting from January 2019, with catchup enabled and up to 3 concurrent runs.
  • green_taxi_data_v1: Green taxi data, scheduled at 0 7 2 * * (7 AM on the 2nd of each month), starting from January 2019.
  • hfv_taxi_data_v1: FHV taxi data, scheduled at 0 8 2 * * (8 AM on the 2nd of each month), from January 2019 to January 2020 (with an end_date).
  • zones_data_v1: Taxi zone lookup data, scheduled @once.
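
To get a feel for how many monthly runs a bounded schedule such as the FHV one implies, the cron ticks can be enumerated with the standard library alone. This is a rough sketch, not Airflow's scheduler: monthly_ticks is a hypothetical helper, the window bounds (1 January 2019 and 1 January 2020) are assumed from the description above, and how Airflow maps ticks to actual run times depends on its version and data-interval semantics.

```python
from datetime import datetime
from typing import Iterator


def monthly_ticks(start: datetime, end: datetime, day: int, hour: int) -> Iterator[datetime]:
    """Yield each 'minute 0 of `hour` on day `day` of every month' tick in [start, end]."""
    year, month = start.year, start.month
    while True:
        tick = datetime(year, month, day, hour)
        if tick > end:
            return
        if tick >= start:
            yield tick
        # Advance one calendar month, rolling over the year after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


# Cron "0 8 2 * *" over the assumed FHV window.
fhv_ticks = list(monthly_ticks(datetime(2019, 1, 1), datetime(2020, 1, 1), day=2, hour=8))
print(len(fhv_ticks))  # 12
print(fhv_ticks[0], fhv_ticks[-1])
```

Under these assumptions the window contains twelve ticks, one per month of 2019, which is what catchup would be expected to work through.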

All DAGs use Jinja templates with {{ execution_date.strftime('%Y-%m') }} for dynamic date-based file naming, enabling Airflow's backfill capability to process historical months.
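
To see what these templates expand to, the same strftime calls can be applied to a plain datetime outside Airflow. This is a minimal sketch: render_paths is a hypothetical helper that imitates the result of the Jinja rendering, which Airflow itself performs at run time, and the file-name layout follows the output example given on this page.

```python
from datetime import datetime


def render_paths(execution_date: datetime, service: str = "yellow_tripdata") -> dict:
    """Imitate the rendered file names for one run (hypothetical helper)."""
    month = execution_date.strftime("%Y-%m")  # e.g. "2019-01"
    year = execution_date.strftime("%Y")      # e.g. "2019"
    return {
        "csv": f"{service}_{month}.csv",
        "parquet": f"{service}_{month}.parquet",
        "gcs": f"raw/{service}/{year}/{service}_{month}.parquet",
    }


paths = render_paths(datetime(2019, 1, 2, 6, 0))
print(paths["gcs"])  # raw/yellow_tripdata/2019/yellow_tripdata_2019-01.parquet
```

Since every path is derived from the run's date, each backfilled run reads and writes its own month's files without colliding with other runs.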

Usage

Use this implementation as a reference for creating reusable, parameterized Airflow DAG factories that process multiple related datasets with identical pipeline logic but different data sources and schedules. It demonstrates best practices for backfill-capable DAGs using Jinja templating and local file cleanup.

Code Reference

Source Location

Signature

def format_to_parquet(src_file, dest_file):
    ...

def upload_to_gcs(bucket, object_name, local_file):
    ...

def donwload_parquetize_upload_dag(
    dag,
    url_template,
    local_csv_path_template,
    local_parquet_path_template,
    gcs_path_template
):
    ...

Import

import os
import logging
from datetime import datetime

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from google.cloud import storage
import pyarrow.csv as pv
import pyarrow.parquet as pq

I/O Contract

Inputs

donwload_parquetize_upload_dag:

| Name | Type | Required | Description |
|---|---|---|---|
| dag | airflow.DAG | Yes | The Airflow DAG object to attach tasks to |
| url_template | str | Yes | Jinja-templated URL for downloading the source CSV file |
| local_csv_path_template | str | Yes | Jinja-templated local path for the downloaded CSV file |
| local_parquet_path_template | str | Yes | Jinja-templated local path for the converted Parquet file |
| gcs_path_template | str | Yes | Jinja-templated destination path within the GCS bucket |

format_to_parquet:

| Name | Type | Required | Description |
|---|---|---|---|
| src_file | str | Yes | Path to the source CSV file |
| dest_file | str | Yes | Path to the output Parquet file |

upload_to_gcs:

| Name | Type | Required | Description |
|---|---|---|---|
| bucket | str | Yes | GCS bucket name |
| object_name | str | Yes | Target object path in GCS |
| local_file | str | Yes | Local file path to upload |

Environment Variables:

| Name | Type | Required | Description |
|---|---|---|---|
| GCP_PROJECT_ID | str | Yes | Google Cloud project ID |
| GCP_GCS_BUCKET | str | Yes | Target GCS bucket name |
| AIRFLOW_HOME | str | No | Airflow home directory (defaults to /opt/airflow/) |
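
One way to read these variables is sketched below. The helper load_settings and the returned key names are hypothetical; the only details taken from this page are the variable names, which ones are required, and the /opt/airflow/ default. Failing early on a missing required variable is a design choice of this sketch, not necessarily what the original module does.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the pipeline settings from an environment mapping (hypothetical helper)."""
    required = ("GCP_PROJECT_ID", "GCP_GCS_BUCKET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return {
        "project_id": env["GCP_PROJECT_ID"],
        "bucket": env["GCP_GCS_BUCKET"],
        # Optional variable with the documented default.
        "airflow_home": env.get("AIRFLOW_HOME", "/opt/airflow/"),
    }


# Demo with an explicit mapping instead of the real process environment.
settings = load_settings({"GCP_PROJECT_ID": "demo-project", "GCP_GCS_BUCKET": "demo-bucket"})
print(settings["airflow_home"])  # /opt/airflow/
```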

Outputs

| Name | Type | Description |
|---|---|---|
| GCS Parquet files | GCS objects | Parquet files organized by service type and year in GCS (e.g., raw/yellow_tripdata/2019/yellow_tripdata_2019-01.parquet) |

Usage Examples

Basic Usage

import os
from datetime import datetime

from airflow import DAG

# The factory function donwload_parquetize_upload_dag is assumed to be defined
# or imported in this module (see Signature above).

# Illustrative default task arguments; these values are assumptions.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
}

URL_PREFIX = "https://s3.amazonaws.com/nyc-tlc/trip+data"
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")

# Jinja expressions that Airflow renders per run, e.g. "2021-01" and "2021".
MONTH = "{{ execution_date.strftime('%Y-%m') }}"
YEAR = "{{ execution_date.strftime('%Y') }}"

# Define a new DAG; catchup=True lets Airflow backfill months since start_date.
my_dag = DAG(
    dag_id="custom_taxi_data",
    schedule_interval="0 6 2 * *",
    start_date=datetime(2021, 1, 1),
    default_args=default_args,
    catchup=True,
    max_active_runs=3,
    tags=["dtc-de"],
)

# Attach the download, parquetize, upload, and cleanup tasks to the DAG.
donwload_parquetize_upload_dag(
    dag=my_dag,
    url_template=f"{URL_PREFIX}/yellow_tripdata_{MONTH}.csv",
    local_csv_path_template=f"{AIRFLOW_HOME}/yellow_tripdata_{MONTH}.csv",
    local_parquet_path_template=f"{AIRFLOW_HOME}/yellow_tripdata_{MONTH}.parquet",
    gcs_path_template=f"raw/yellow_tripdata/{YEAR}/yellow_tripdata_{MONTH}.parquet",
)
