Implementation:DataTalksClub Data engineering zoomcamp Airflow GCS Ingestion DAG
| Knowledge Sources | |
|---|---|
| Domains | Workflow_Orchestration, Airflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An Airflow DAG that orchestrates downloading NYC yellow taxi CSV data, converting it to Parquet, uploading to GCS, and creating a BigQuery external table.
Description
This DAG, identified as data_ingestion_gcs_dag, implements a four-task pipeline for ingesting NYC yellow taxi trip data into Google Cloud Platform. The pipeline consists of:
- download_dataset_task (BashOperator): Downloads the CSV file from the NYC TLC S3 bucket using curl.
- format_to_parquet_task (PythonOperator): Converts the downloaded CSV to Parquet format via the format_to_parquet helper function, which uses PyArrow's pv.read_csv and pq.write_table functions.
- local_to_gcs_task (PythonOperator): Uploads the Parquet file to a GCS bucket using the upload_to_gcs helper function, which includes a workaround for upload timeouts on files larger than 6 MB by setting custom multipart and chunk sizes (5 MB each).
- bigquery_external_table_task (BigQueryCreateExternalTableOperator): Creates a BigQuery external table pointing to the uploaded Parquet file in GCS.
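The download step can be sketched in plain Python as the shell command a BashOperator would run. This is a minimal sketch, not the repository's code: build_download_command, the example.com URL, and the curl flags (-sSLf, -o) are assumptions chosen for illustration; the source only states that curl is used.

```python
import os
import shlex


def build_download_command(url: str, dest_path: str) -> str:
    """Return a curl command line that saves `url` to `dest_path`.

    Flags are an assumption, not taken from the DAG:
    -s silent, -S still report errors, -L follow redirects,
    -f fail on HTTP errors instead of saving an error page.
    """
    # shlex.quote guards against spaces or shell metacharacters in either value.
    return f"curl -sSLf {shlex.quote(url)} -o {shlex.quote(dest_path)}"


# Hypothetical values for illustration only.
dataset_file = "yellow_tripdata_2021-01.csv"
local_csv = os.path.join("/opt/airflow/", dataset_file)
command = build_download_command(f"https://example.com/data/{dataset_file}", local_csv)
print(command)
```

The resulting string could be passed as the bash_command argument of a BashOperator.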
The DAG runs on a @daily schedule with catchup disabled and at most one active run at a time, and is tagged with dtc-de. Configuration is driven by environment variables: GCP_PROJECT_ID, GCP_GCS_BUCKET, AIRFLOW_HOME, and BIGQUERY_DATASET.
Usage
Use this implementation as a reference for building Airflow DAGs that ingest CSV data from an external source, convert it to a columnar format, upload it to GCS, and register it as a BigQuery external table. It demonstrates the download-transform-upload-register pattern common in data warehouse pipelines.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: cohorts/2022/week_2_data_ingestion/airflow/dags/data_ingestion_gcs_dag.py
- Lines: 1-110
Signature
def format_to_parquet(src_file):
    ...

def upload_to_gcs(bucket, object_name, local_file):
    ...

# DAG declaration
with DAG(
    dag_id="data_ingestion_gcs_dag",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=['dtc-de'],
) as dag:
    ...
Import
import os
import logging
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from google.cloud import storage
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator
import pyarrow.csv as pv
import pyarrow.parquet as pq
I/O Contract
Inputs
format_to_parquet:
| Name | Type | Required | Description |
|---|---|---|---|
| src_file | str | Yes | Path to the source CSV file to convert |
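A conversion helper with this contract might look like the sketch below. It is an illustration under assumptions, not the repository's implementation: the parquet_path_for helper and the choice to raise ValueError on non-CSV input are hypothetical, while pv.read_csv and pq.write_table are the PyArrow calls named in the description.

```python
import os


def parquet_path_for(src_file: str) -> str:
    """Return the destination path: same location and stem, .parquet suffix."""
    stem, _ext = os.path.splitext(src_file)
    return stem + ".parquet"


def format_to_parquet(src_file: str) -> None:
    """Convert a CSV file to Parquet next to the source file."""
    if not src_file.endswith(".csv"):
        # Assumption: reject anything that is not a CSV path up front.
        raise ValueError(f"expected a .csv file, got {src_file!r}")

    # Imported here so the sketch can be loaded without PyArrow installed.
    import pyarrow.csv as pv
    import pyarrow.parquet as pq

    table = pv.read_csv(src_file)  # column types are inferred from the data
    pq.write_table(table, parquet_path_for(src_file))
```

Because the output path is derived from the input path, the downstream upload task only needs to know the CSV file name to locate the Parquet file.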
upload_to_gcs:
| Name | Type | Required | Description |
|---|---|---|---|
| bucket | str | Yes | GCS bucket name |
| object_name | str | Yes | Target path and file name in GCS |
| local_file | str | Yes | Source path and file name on local filesystem |
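The upload helper and its timeout workaround can be sketched as follows. This is a hedged illustration, not the repository's code: it assumes the workaround sets the private module attributes _MAX_MULTIPART_SIZE and _DEFAULT_CHUNKSIZE of google.cloud.storage.blob, which are not part of the public API and may change between library versions. The FIVE_MB constant name is hypothetical.

```python
FIVE_MB = 5 * 1024 * 1024  # 5 MB, the size quoted in the description


def upload_to_gcs(bucket: str, object_name: str, local_file: str) -> None:
    """Upload local_file to gs://<bucket>/<object_name>."""
    # Imported here so the sketch can be loaded without google-cloud-storage.
    from google.cloud import storage

    # Assumed workaround: lower the multipart threshold and the default chunk
    # size so large files are sent as a resumable upload in small chunks.
    # These are private attributes; treat them as version-dependent.
    storage.blob._MAX_MULTIPART_SIZE = FIVE_MB
    storage.blob._DEFAULT_CHUNKSIZE = FIVE_MB

    client = storage.Client()  # uses application default credentials
    blob = client.bucket(bucket).blob(object_name)
    blob.upload_from_filename(local_file)
```

A public alternative would be to pass chunk_size when creating the blob; GCS requires chunk sizes to be a multiple of 256 KiB, which 5 MB satisfies.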
Environment Variables:
| Name | Type | Required | Description |
|---|---|---|---|
| GCP_PROJECT_ID | str | Yes | Google Cloud project ID |
| GCP_GCS_BUCKET | str | Yes | Target GCS bucket name |
| AIRFLOW_HOME | str | No | Airflow home directory (defaults to /opt/airflow/) |
| BIGQUERY_DATASET | str | No | BigQuery dataset name (defaults to trips_data_all) |
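The table above can be read as a small configuration loader. The sketch below is one way to express it, assuming a hypothetical IngestionConfig dataclass and load_config function; failing fast on a missing required variable is an assumption of this sketch, while the variable names and defaults come from the table.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class IngestionConfig:
    project_id: str
    bucket: str
    airflow_home: str
    bigquery_dataset: str


def load_config(env: Optional[Mapping[str, str]] = None) -> IngestionConfig:
    """Read the DAG settings from environment variables."""
    env = os.environ if env is None else env

    # Required variables have no default; report all missing ones at once.
    missing = [name for name in ("GCP_PROJECT_ID", "GCP_GCS_BUCKET") if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")

    return IngestionConfig(
        project_id=env["GCP_PROJECT_ID"],
        bucket=env["GCP_GCS_BUCKET"],
        airflow_home=env.get("AIRFLOW_HOME", "/opt/airflow/"),
        bigquery_dataset=env.get("BIGQUERY_DATASET", "trips_data_all"),
    )
```

Passing a plain dictionary as env makes the loader easy to exercise outside an Airflow worker.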
Outputs
| Name | Type | Description |
|---|---|---|
| Parquet file in GCS | GCS object | The converted Parquet file uploaded to raw/{parquet_file} in the GCS bucket |
| BigQuery external table | BigQuery table | An external table named external_table in the configured BigQuery dataset pointing to the GCS Parquet file |
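The two outputs are linked by the GCS URI of the uploaded file. As a sketch, the table definition handed to BigQueryCreateExternalTableOperator through its table_resource argument could be built as below. The external_table_resource function and the sample project, dataset, and bucket names are hypothetical; the raw/ prefix and the external_table name come from the table above, and the dictionary keys follow the BigQuery REST table resource.

```python
def external_table_resource(
    project_id: str,
    dataset: str,
    bucket: str,
    parquet_file: str,
    table_id: str = "external_table",
) -> dict:
    """Build a BigQuery table resource for an external Parquet table."""
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset,
            "tableId": table_id,
        },
        "externalDataConfiguration": {
            "sourceFormat": "PARQUET",
            # Same object path that the upload step writes to.
            "sourceUris": [f"gs://{bucket}/raw/{parquet_file}"],
        },
    }


# Hypothetical values for illustration only.
resource = external_table_resource(
    "my-project", "trips_data_all", "my-gcs-bucket", "yellow_tripdata_2021-01.parquet"
)
print(resource["externalDataConfiguration"]["sourceUris"][0])
```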
Usage Examples
Basic Usage
# This DAG is triggered automatically by Airflow on a daily schedule.
# To trigger manually via the Airflow CLI:
# airflow dags trigger data_ingestion_gcs_dag
# The format_to_parquet function can be used standalone:
format_to_parquet("/opt/airflow/yellow_tripdata_2021-01.csv")
# The upload_to_gcs function can be used standalone:
upload_to_gcs(
    bucket="my-gcs-bucket",
    object_name="raw/yellow_tripdata_2021-01.parquet",
    local_file="/opt/airflow/yellow_tripdata_2021-01.parquet",
)