Implementation:DataTalksClub Data engineering zoomcamp Airflow GCS To BQ DAG
| Knowledge Sources | |
|---|---|
| Domains | Workflow_Orchestration, Airflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An Airflow DAG that moves Parquet files within GCS, creates BigQuery external tables, and builds date-partitioned BigQuery tables for both yellow and green taxi trip data.
Description
This DAG, identified as gcs_2_bq_dag, promotes raw Parquet data in GCS into queryable, partitioned BigQuery tables. It iterates over a COLOUR_RANGE dictionary that maps each taxi color to its pickup datetime column (tpep_pickup_datetime for yellow, lpep_pickup_datetime for green). For each color it creates a three-task pipeline whose tasks run in sequence, in the order listed:
- move_{colour}_tripdata_files_task (GCSToGCSOperator): Moves Parquet files from the raw/ prefix to a color-specific prefix within the same GCS bucket (e.g., raw/yellow_tripdata*.parquet to yellow/yellow_tripdata), using move_object=True to remove the source files after transfer.
- bq_{colour}_tripdata_external_table_task (BigQueryCreateExternalTableOperator): Creates a BigQuery external table with schema autodetection (autodetect) enabled, pointing to all Parquet files under the color-specific GCS prefix (e.g., gs://{BUCKET}/yellow/*).
- bq_create_{colour}_tripdata_partitioned_table_task (BigQueryInsertJobOperator): Runs a CREATE OR REPLACE TABLE ... AS SELECT statement that materializes the external table into a native BigQuery table partitioned with PARTITION BY DATE(), i.e. by the day of the color's pickup datetime column.
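The external-table step needs a BigQuery table definition for each color. The sketch below builds one in plain Python so the naming convention is easy to inspect. It assumes the operator is configured through its table_resource argument; the keys follow the BigQuery REST Table resource (tableReference, externalDataConfiguration), while the helper name external_table_resource and the sample project and bucket values are hypothetical.

```python
def external_table_resource(project_id: str, dataset: str, bucket: str, colour: str) -> dict:
    """Build a BigQuery Table resource for one colour's external table (hypothetical helper)."""
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset,
            "tableId": f"{colour}_tripdata_external_table",
        },
        "externalDataConfiguration": {
            "autodetect": True,  # let BigQuery infer the schema from the Parquet files
            "sourceFormat": "PARQUET",
            # Every object under the colour-specific prefix backs the table.
            "sourceUris": [f"gs://{bucket}/{colour}/*"],
        },
    }


resource = external_table_resource("my-project", "trips_data_all", "my-bucket", "yellow")
print(resource["tableReference"]["tableId"])  # yellow_tripdata_external_table
print(resource["externalDataConfiguration"]["sourceUris"])  # ['gs://my-bucket/yellow/*']
```

Inside the DAG, a dictionary of this shape would be built once per iteration of the COLOUR_RANGE loop.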
The DAG runs on a @daily schedule with catchup disabled and at most one active run at a time, and it is tagged dtc-de. Configuration comes from environment variables: GCP_PROJECT_ID, GCP_GCS_BUCKET, and BIGQUERY_DATASET (which defaults to trips_data_all).
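A minimal sketch of reading those three variables, written as a function so it can be exercised with a plain dictionary instead of the real environment. The helper name load_config and its fail-fast check on the two required variables are assumptions for illustration, not part of the DAG as described.

```python
import os
from typing import Mapping


def load_config(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the DAG settings from environment variables (hypothetical helper).

    Assumption: fail fast when a required variable is missing or empty,
    rather than passing None on to the operators.
    """
    required = ("GCP_PROJECT_ID", "GCP_GCS_BUCKET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")
    return {
        "project_id": env["GCP_PROJECT_ID"],
        "bucket": env["GCP_GCS_BUCKET"],
        # Optional; falls back to the documented default dataset.
        "dataset": env.get("BIGQUERY_DATASET", "trips_data_all"),
    }


cfg = load_config({"GCP_PROJECT_ID": "my-project", "GCP_GCS_BUCKET": "my-bucket"})
print(cfg["dataset"])  # trips_data_all
```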
Usage
Use this implementation as a reference for building Airflow DAGs that promote raw GCS data into optimized BigQuery tables. It demonstrates the pattern of GCS file reorganization, external table creation, and partitioned table materialization, which is a common approach for data warehouse loading pipelines.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: cohorts/2022/week_3_data_warehouse/airflow/dags/gcs_to_bq_dag.py
- Lines: 1-81
Signature
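```python
# DAG declaration
with DAG(
    dag_id="gcs_2_bq_dag",
    schedule_interval="@daily",  # Airflow 2.x argument; Airflow 2.4+ prefers `schedule`
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=['dtc-de'],
) as dag:
    for colour, ds_col in COLOUR_RANGE.items():
        ...  # one move / external-table / partitioned-table task set per colour
```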
Import
```python
import os
import logging

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
```
I/O Contract
Inputs
Environment Variables:
| Name | Type | Required | Description |
|---|---|---|---|
| GCP_PROJECT_ID | str | Yes | Google Cloud project ID |
| GCP_GCS_BUCKET | str | Yes | GCS bucket containing the raw Parquet files |
| BIGQUERY_DATASET | str | No | BigQuery dataset name (defaults to trips_data_all) |
GCS Source Data:
| Name | Type | Required | Description |
|---|---|---|---|
| raw/{colour}_tripdata*.parquet | Parquet files | Yes | Raw taxi trip data files in Parquet format under the raw/ prefix in GCS |
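The move step selects raw objects with a single wildcard and renames them under the color prefix. The sketch below approximates that locally, assuming GCSToGCSOperator's documented wildcard behaviour: the part of source_object before the `*` is dropped and destination_object is prepended to what remains. fnmatch stands in for the operator's own listing logic, so edge cases (for example nested paths under raw/) may differ; plan_moves and the sample object names are hypothetical.

```python
from fnmatch import fnmatchcase


def plan_moves(object_names, colour, src_prefix="raw", dataset="tripdata", filetype="parquet"):
    """Return (source, destination) pairs for one colour's move task (hypothetical helper)."""
    source_pattern = f"{src_prefix}/{colour}_{dataset}*.{filetype}"
    before_wildcard = source_pattern.split("*", 1)[0]   # e.g. "raw/yellow_tripdata"
    destination_object = f"{colour}/{colour}_{dataset}"  # e.g. "yellow/yellow_tripdata"
    moves = []
    for name in object_names:
        if fnmatchcase(name, source_pattern):
            # Replace the pre-wildcard portion with the destination prefix.
            moves.append((name, destination_object + name[len(before_wildcard):]))
    return moves


objects = [
    "raw/yellow_tripdata_2021-01.parquet",
    "raw/green_tripdata_2021-01.parquet",
    "raw/yellow_tripdata_2021-01.csv",
]
for src, dst in plan_moves(objects, "yellow"):
    print(src, "->", dst)
# raw/yellow_tripdata_2021-01.parquet -> yellow/yellow_tripdata_2021-01.parquet
```

Only the matching Parquet object for the requested color is moved; the green file and the CSV are left alone.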
Outputs
| Name | Type | Description |
|---|---|---|
| GCS reorganized files | GCS objects | Parquet files moved from raw/ to color-specific prefixes (yellow/, green/) |
| BigQuery external tables | BigQuery tables | External tables named {colour}_tripdata_external_table pointing to GCS Parquet files |
| BigQuery partitioned tables | BigQuery tables | Materialized tables named {colour}_tripdata partitioned by the pickup datetime column |
Usage Examples
Basic Usage
```python
# Once unpaused, the DAG is scheduled automatically by Airflow on a daily schedule.
# To trigger a run manually via the Airflow CLI:
#   airflow dags trigger gcs_2_bq_dag
# For the yellow colour and the default dataset, the generated SQL for the
# partitioned table is:
#   CREATE OR REPLACE TABLE trips_data_all.yellow_tripdata
#   PARTITION BY DATE(tpep_pickup_datetime)
#   AS
#   SELECT * FROM trips_data_all.yellow_tripdata_external_table;
```
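The statement above varies per color only in the dataset, table prefix, and partition column. A minimal sketch that renders it and wraps it in the query-job configuration shape that BigQueryInsertJobOperator takes through its configuration argument (the query and useLegacySql keys come from the BigQuery Jobs API); the helper name partitioned_table_job is hypothetical.

```python
def partitioned_table_job(dataset: str, colour: str, ds_col: str) -> dict:
    """Build a BigQuery query-job configuration for one colour (hypothetical helper)."""
    query = (
        f"CREATE OR REPLACE TABLE {dataset}.{colour}_tripdata "
        f"PARTITION BY DATE({ds_col}) "
        "AS "
        f"SELECT * FROM {dataset}.{colour}_tripdata_external_table;"
    )
    # DDL such as CREATE OR REPLACE TABLE requires standard SQL, not legacy SQL.
    return {"query": {"query": query, "useLegacySql": False}}


job = partitioned_table_job("trips_data_all", "yellow", "tpep_pickup_datetime")
print(job["query"]["query"])
```

Partitioning on DATE() of the pickup column lets queries that filter on pickup date scan only the matching daily partitions.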
Extending with Additional Colors
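```python
# To add for-hire vehicle (FHV) trip data, extend the COLOUR_RANGE dictionary
# with the FHV pickup datetime column:
COLOUR_RANGE = {
    'yellow': 'tpep_pickup_datetime',
    'green': 'lpep_pickup_datetime',
    'fhv': 'pickup_datetime',
}
# The loop then creates a move / external-table / partitioned-table task set
# for every key, so matching raw/fhv_tripdata*.parquet files must exist in the bucket.
```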
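Because every task ID embeds the dictionary key, adding 'fhv' yields three new, non-colliding task IDs. The plain-Python sketch below previews the generated task sets without importing Airflow; it reproduces the task-ID pattern from the Description and assumes the three tasks are chained in the order listed there. task_chain is a hypothetical helper.

```python
COLOUR_RANGE = {
    "yellow": "tpep_pickup_datetime",
    "green": "lpep_pickup_datetime",
    "fhv": "pickup_datetime",
}


def task_chain(colour: str) -> list:
    """Task IDs for one key, in run order: move >> external table >> partitioned table."""
    return [
        f"move_{colour}_tripdata_files_task",
        f"bq_{colour}_tripdata_external_table_task",
        f"bq_create_{colour}_tripdata_partitioned_table_task",
    ]


# One line per key, showing the dependency chain the loop would build.
for colour in COLOUR_RANGE:
    print(" >> ".join(task_chain(colour)))
```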