
Implementation:DataTalksClub Data engineering zoomcamp Airflow GCS To BQ DAG

From Leeroopedia


Knowledge Sources
Domains Workflow_Orchestration, Airflow
Last Updated 2026-02-09 00:00 GMT

Overview

An Airflow DAG that moves Parquet files within GCS, creates BigQuery external tables, and builds date-partitioned BigQuery tables for both yellow and green taxi trip data.

Description

This DAG, identified as gcs_2_bq_dag, orchestrates the promotion of raw Parquet data in GCS into queryable, partitioned BigQuery tables. It iterates over a COLOUR_RANGE dictionary mapping taxi colors to their respective pickup datetime columns (tpep_pickup_datetime for yellow, lpep_pickup_datetime for green), creating a three-task pipeline for each color:

  1. move_{colour}_tripdata_files_task (GCSToGCSOperator): Moves Parquet files from the raw/ prefix to a color-specific prefix within the same GCS bucket. For example, objects matching raw/yellow_tripdata*.parquet are moved under the yellow/yellow_tripdata prefix. Setting move_object=True deletes the source objects after they are copied.
  2. bq_{colour}_tripdata_external_table_task (BigQueryCreateExternalTableOperator): Creates a BigQuery external table with schema autodetection enabled. The table points to all Parquet files under the color-specific GCS prefix (e.g., gs://{BUCKET}/yellow/*).
  3. bq_create_{colour}_tripdata_partitioned_table_task (BigQueryInsertJobOperator): Executes a CREATE OR REPLACE TABLE SQL statement that creates a date-partitioned table from the external table, partitioning by the appropriate pickup datetime column using PARTITION BY DATE().
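
The external-table step can be sketched as the Table resource that BigQueryCreateExternalTableOperator receives through its table_resource argument. This is a minimal sketch that assumes the resource follows the BigQuery REST API Table layout (tableReference plus externalDataConfiguration). The helper name external_table_resource and the example project and bucket values are hypothetical and do not come from the source DAG.

```python
def external_table_resource(project_id: str, dataset: str, bucket: str, colour: str) -> dict:
    """Build a BigQuery Table resource for a Parquet-backed external table.

    Hypothetical helper: the layout follows the BigQuery REST API Table
    resource, and the table name mirrors {colour}_tripdata_external_table.
    """
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset,
            "tableId": f"{colour}_tripdata_external_table",
        },
        "externalDataConfiguration": {
            "sourceFormat": "PARQUET",
            "autodetect": True,  # let BigQuery infer the schema from the files
            # Every object under the colour prefix, e.g. gs://my-bucket/yellow/*
            "sourceUris": [f"gs://{bucket}/{colour}/*"],
        },
    }


if __name__ == "__main__":
    # Example values only; in the DAG they would come from environment variables.
    resource = external_table_resource("my-project", "trips_data_all", "my-bucket", "yellow")
    print(resource["tableReference"]["tableId"])  # yellow_tripdata_external_table
```

In a DAG, a dict shaped like this would be passed as table_resource= to BigQueryCreateExternalTableOperator, with one call per colour.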

The DAG runs on a @daily schedule with catchup disabled and at most one active run, and it is tagged dtc-de. Configuration comes from three environment variables: GCP_PROJECT_ID, GCP_GCS_BUCKET, and BIGQUERY_DATASET (which defaults to trips_data_all).
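
The configuration step can be sketched in a few lines. This minimal sketch assumes the values are read once, when Airflow parses the DAG file. DagConfig and load_config are hypothetical names. Failing fast with a KeyError on the two required variables is a choice made in this sketch; the source DAG may read them with os.environ.get instead, which returns None when a variable is unset.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class DagConfig:
    project_id: str
    bucket: str
    dataset: str


def load_config(env: Mapping[str, str] = os.environ) -> DagConfig:
    """Read the DAG settings from environment variables.

    GCP_PROJECT_ID and GCP_GCS_BUCKET are required, so a missing value raises
    KeyError at parse time. BIGQUERY_DATASET falls back to trips_data_all.
    """
    return DagConfig(
        project_id=env["GCP_PROJECT_ID"],
        bucket=env["GCP_GCS_BUCKET"],
        dataset=env.get("BIGQUERY_DATASET", "trips_data_all"),
    )
```

Passing a plain dict as env makes the function easy to exercise without touching the real process environment.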

Usage

Use this implementation as a reference for Airflow DAGs that promote raw GCS data into optimized BigQuery tables. It demonstrates a three-step pattern that is common in data warehouse loading pipelines: reorganize files in GCS, expose them through an external table, and then materialize a partitioned native table.

Code Reference

Source Location

Signature

# DAG declaration
with DAG(
    dag_id="gcs_2_bq_dag",
    schedule_interval="@daily",  # Airflow 2.4+ prefers schedule="@daily"; schedule_interval is deprecated there
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=['dtc-de'],
) as dag:
    for colour, ds_col in COLOUR_RANGE.items():
        ...

Import

import os
import logging

from airflow import DAG
from airflow.utils.dates import days_ago  # deprecated in later Airflow 2.x releases
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

I/O Contract

Inputs

Environment Variables:

Name | Type | Required | Description
GCP_PROJECT_ID | str | Yes | Google Cloud project ID
GCP_GCS_BUCKET | str | Yes | GCS bucket containing the raw Parquet files
BIGQUERY_DATASET | str | No | BigQuery dataset name (defaults to trips_data_all)

GCS Source Data:

Name | Type | Required | Description
raw/{colour}_tripdata*.parquet | Parquet files | Yes | Raw taxi trip data files in Parquet format under the raw/ prefix in GCS
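
The naming contract can be made concrete with a standard-library-only sketch. It shows which raw objects a colour's move task would pick up and where they would land. The sketch assumes the GCSToGCSOperator wildcard behaviour: the part of the source path before `*` is replaced by destination_object, and the matched remainder is kept. plan_moves is a hypothetical helper, and fnmatch only approximates GCS's own wildcard matching.

```python
from fnmatch import fnmatchcase
from typing import Dict, Iterable


def plan_moves(object_names: Iterable[str], colour: str) -> Dict[str, str]:
    """Return {source_object: destination_object} for one colour.

    Assumption: as with GCSToGCSOperator wildcards, the source prefix before
    '*' is swapped for the destination prefix and the remainder is kept.
    """
    source_prefix = f"raw/{colour}_tripdata"
    pattern = f"{source_prefix}*.parquet"
    destination_prefix = f"{colour}/{colour}_tripdata"
    return {
        name: destination_prefix + name[len(source_prefix):]
        for name in object_names
        if fnmatchcase(name, pattern)  # only Parquet files for this colour
    }


if __name__ == "__main__":
    names = [
        "raw/yellow_tripdata_2021-01.parquet",
        "raw/green_tripdata_2021-01.parquet",
        "raw/yellow_tripdata_2021-01.csv",
    ]
    print(plan_moves(names, "yellow"))
    # {'raw/yellow_tripdata_2021-01.parquet': 'yellow/yellow_tripdata_2021-01.parquet'}
```

Non-Parquet files and other colours' files are left in raw/. This matches the Required column above: each colour's task only needs its own raw/{colour}_tripdata*.parquet objects.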

Outputs

Name | Type | Description
GCS reorganized files | GCS objects | Parquet files moved from raw/ to color-specific prefixes (yellow/, green/)
BigQuery external tables | BigQuery tables | External tables named {colour}_tripdata_external_table pointing to GCS Parquet files
BigQuery partitioned tables | BigQuery tables | Materialized tables named {colour}_tripdata, partitioned by the pickup datetime column

Usage Examples

Basic Usage

# This DAG is triggered automatically by Airflow on a daily schedule.
# To trigger manually via the Airflow CLI:
# airflow dags trigger gcs_2_bq_dag

# The SQL generated for creating a partitioned table looks like:
# CREATE OR REPLACE TABLE trips_data_all.yellow_tripdata
# PARTITION BY DATE(tpep_pickup_datetime)
# AS
# SELECT * FROM trips_data_all.yellow_tripdata_external_table;
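
A small string template can produce the statement above for each colour. The sketch below also wraps the statement in the job configuration shape that BigQueryInsertJobOperator accepts through configuration=. Both function names are hypothetical, and the source DAG may assemble the string differently.

```python
def partitioned_table_sql(dataset: str, colour: str, ds_col: str) -> str:
    """Render the CREATE OR REPLACE TABLE ... PARTITION BY DATE(...) statement."""
    return (
        f"CREATE OR REPLACE TABLE {dataset}.{colour}_tripdata\n"
        f"PARTITION BY DATE({ds_col})\n"
        "AS\n"
        f"SELECT * FROM {dataset}.{colour}_tripdata_external_table;"
    )


def insert_job_configuration(sql: str) -> dict:
    """Query-job configuration for BigQueryInsertJobOperator(configuration=...).

    DDL such as CREATE TABLE is only supported in GoogleSQL (standard SQL),
    so legacy SQL is switched off explicitly.
    """
    return {"query": {"query": sql, "useLegacySql": False}}


if __name__ == "__main__":
    # Prints the same four lines as the commented SQL above.
    print(partitioned_table_sql("trips_data_all", "yellow", "tpep_pickup_datetime"))
```

For green data, the same call with "green" and "lpep_pickup_datetime" yields the matching statement. This is how a single loop body can serve every entry in COLOUR_RANGE.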

Extending with Additional Colors

# To add FHV taxi data, extend the COLOUR_RANGE dictionary:
COLOUR_RANGE = {
    'yellow': 'tpep_pickup_datetime',
    'green': 'lpep_pickup_datetime',
    'fhv': 'pickup_datetime'
}
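# The loop then creates the same three tasks for the new key. This assumes
# raw/fhv_tripdata*.parquet files exist in the bucket and contain a pickup_datetime column.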
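
Every task_id is derived from a dictionary key, so a new entry must produce IDs that Airflow accepts. The hypothetical helper below lists the IDs the per-colour loop would create, in creation order, and rejects keys that would yield invalid IDs. It mirrors Airflow's rule that task IDs contain only alphanumerics, underscores, dashes, and dots. It is a sketch, not Airflow's own validator.

```python
import re
from typing import Dict, List

# Airflow task_ids may contain only letters, digits, underscores, dashes and dots.
_TASK_ID_RE = re.compile(r"[\w.-]+")

_TEMPLATES = (
    "move_{c}_tripdata_files_task",
    "bq_{c}_tripdata_external_table_task",
    "bq_create_{c}_tripdata_partitioned_table_task",
)


def task_ids_for(colour_range: Dict[str, str]) -> List[str]:
    """List the task_ids the per-colour loop would generate, in creation order."""
    ids = []
    for colour in colour_range:
        for template in _TEMPLATES:
            task_id = template.format(c=colour)
            if not _TASK_ID_RE.fullmatch(task_id):
                raise ValueError(f"invalid task_id: {task_id!r}")
            ids.append(task_id)
    return ids


if __name__ == "__main__":
    for task_id in task_ids_for({"yellow": "tpep_pickup_datetime", "fhv": "pickup_datetime"}):
        print(task_id)
```

Dictionary keys are unique, so each colour yields a distinct trio of IDs. A key such as "fhv data" (with a space) would be rejected before Airflow ever parses the DAG.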
