Implementation:DataTalksClub Data engineering zoomcamp Airflow GCS Ingestion DAG

From Leeroopedia


Knowledge Sources
Domains Workflow_Orchestration, Airflow
Last Updated 2026-02-09 00:00 GMT

Overview

An Airflow DAG that orchestrates downloading NYC yellow taxi CSV data, converting it to Parquet, uploading to GCS, and creating a BigQuery external table.

Description

This DAG, identified as data_ingestion_gcs_dag, implements a four-task pipeline for ingesting NYC yellow taxi trip data into Google Cloud Platform. The pipeline consists of:

  1. download_dataset_task (BashOperator): Downloads the CSV file from the NYC TLC S3 bucket using curl.
  2. format_to_parquet_task (PythonOperator): Converts the downloaded CSV to Parquet via the format_to_parquet helper function, which uses PyArrow's pv.read_csv and pq.write_table functions.
  3. local_to_gcs_task (PythonOperator): Uploads the Parquet file to a GCS bucket using the upload_to_gcs helper function. The helper works around upload timeouts on files larger than 6 MB by setting custom multipart and chunk sizes (5 MB each).
  4. bigquery_external_table_task (BigQueryCreateExternalTableOperator): Creates a BigQuery external table pointing to the uploaded Parquet file in GCS.
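
Step 1 in the list above can be sketched as a small command builder. This is a minimal sketch, not the DAG's actual command: the helper name build_download_command, the curl flags, and the example URL and path are all assumptions made for illustration.

```python
import shlex


def build_download_command(url: str, dest_path: str) -> str:
    """Return a shell command that downloads url to dest_path with curl."""
    # -sS: quiet, but still print errors; -L: follow redirects;
    # -f: exit non-zero on HTTP errors so the task fails instead of saving an error page.
    # shlex.quote guards against spaces or shell metacharacters in either argument.
    return f"curl -sSLf {shlex.quote(url)} -o {shlex.quote(dest_path)}"


# Hypothetical URL and destination, for illustration only.
command = build_download_command(
    "https://example.com/yellow_tripdata_2021-01.csv",
    "/opt/airflow/yellow_tripdata_2021-01.csv",
)
print(command)
```

The resulting string could be passed as the bash_command argument of a BashOperator. Failing on HTTP errors is a design choice here: without -f, curl would write an error page to disk and the conversion step would fail later with a less obvious message.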

The DAG runs on a @daily schedule with catchup disabled and at most one active run at a time, and it is tagged dtc-de. Configuration is driven by four environment variables: GCP_PROJECT_ID, GCP_GCS_BUCKET, AIRFLOW_HOME, and BIGQUERY_DATASET.

Usage

Use this implementation as a reference for building Airflow DAGs that ingest CSV data from external sources, convert to columnar format, upload to GCS, and register as BigQuery external tables. It demonstrates the standard download-transform-upload-register pattern used in data warehouse pipelines.

Code Reference

Source Location

Signature

def format_to_parquet(src_file):
    ...

def upload_to_gcs(bucket, object_name, local_file):
    ...

# DAG declaration
with DAG(
    dag_id="data_ingestion_gcs_dag",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=['dtc-de'],
) as dag:
    ...

Import

import os
import logging

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from google.cloud import storage
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator
import pyarrow.csv as pv
import pyarrow.parquet as pq

I/O Contract

Inputs

format_to_parquet:

Name | Type | Required | Description
src_file | str | Yes | Path to the source CSV file to convert
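
One possible shape for format_to_parquet, given this contract, is sketched below. Only the pv.read_csv and pq.write_table calls come from the Description; the extension check, the rule for deriving the output path, the return value, and the helper parquet_path_for are assumptions.

```python
import logging
import os


def parquet_path_for(src_file: str) -> str:
    """Hypothetical helper: swap a trailing .csv extension for .parquet."""
    root, ext = os.path.splitext(src_file)
    if ext.lower() != ".csv":
        raise ValueError(f"expected a .csv file, got: {src_file}")
    return root + ".parquet"


def format_to_parquet(src_file: str) -> str:
    """Convert a CSV file to Parquet next to the source; return the new path."""
    dest_file = parquet_path_for(src_file)  # rejects non-CSV input early

    # Imported inside the callable so the DAG file can be parsed
    # without loading PyArrow (requires the pyarrow package at run time).
    import pyarrow.csv as pv
    import pyarrow.parquet as pq

    table = pv.read_csv(src_file)     # parse the CSV into an Arrow table
    pq.write_table(table, dest_file)  # write the table out as Parquet
    logging.info("Wrote %s", dest_file)
    return dest_file
```

Raising on a non-CSV path, rather than logging and returning, makes the Airflow task fail visibly; the original helper may handle this case differently.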

upload_to_gcs:

Name | Type | Required | Description
bucket | str | Yes | GCS bucket name
object_name | str | Yes | Target path and file name in GCS
local_file | str | Yes | Source path and file name on local filesystem
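
The upload helper and its timeout workaround might look roughly like the sketch below. It assumes the workaround is applied through two private module-level attributes of google.cloud.storage.blob; these are not part of the library's public API and may change between versions. The constant name CHUNK_SIZE_BYTES is hypothetical.

```python
CHUNK_SIZE_BYTES = 5 * 1024 * 1024  # 5 MB, the size quoted in the Description


def upload_to_gcs(bucket: str, object_name: str, local_file: str) -> None:
    """Upload local_file to gs://<bucket>/<object_name>."""
    # Imported lazily; requires the google-cloud-storage package
    # and application credentials at run time.
    from google.cloud import storage

    # Workaround (assumed form): lower the multipart threshold and the
    # chunk size so that larger files are sent in 5 MB pieces.
    # Both names are private attributes of the blob module.
    storage.blob._MAX_MULTIPART_SIZE = CHUNK_SIZE_BYTES
    storage.blob._DEFAULT_CHUNKSIZE = CHUNK_SIZE_BYTES

    client = storage.Client()
    blob = client.bucket(bucket).blob(object_name)
    blob.upload_from_filename(local_file)
```

Because the two assignments change module-level state, they affect every upload made in the same Python process, not only this call.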

Environment Variables:

Name | Type | Required | Description
GCP_PROJECT_ID | str | Yes | Google Cloud project ID
GCP_GCS_BUCKET | str | Yes | Target GCS bucket name
AIRFLOW_HOME | str | No | Airflow home directory (defaults to /opt/airflow/)
BIGQUERY_DATASET | str | No | BigQuery dataset name (defaults to trips_data_all)
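
A minimal sketch of reading these variables with the defaults listed above follows. The IngestionConfig class and load_config function are hypothetical names, and failing fast on a missing required variable is an assumption rather than documented behavior of the DAG.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class IngestionConfig:
    project_id: str
    bucket: str
    airflow_home: str
    bigquery_dataset: str


def load_config(env: Mapping[str, str] = os.environ) -> IngestionConfig:
    """Read the four variables, raising if a required one is missing or empty."""
    required = ("GCP_PROJECT_ID", "GCP_GCS_BUCKET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"missing required environment variables: {', '.join(missing)}")
    return IngestionConfig(
        project_id=env["GCP_PROJECT_ID"],
        bucket=env["GCP_GCS_BUCKET"],
        # Optional variables fall back to the documented defaults.
        airflow_home=env.get("AIRFLOW_HOME", "/opt/airflow/"),
        bigquery_dataset=env.get("BIGQUERY_DATASET", "trips_data_all"),
    )
```

Accepting any mapping instead of reading os.environ directly keeps the function easy to exercise with a plain dict.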

Outputs

Name | Type | Description
Parquet file in GCS | GCS object | The converted Parquet file uploaded to raw/{parquet_file} in the GCS bucket
BigQuery external table | BigQuery table | An external table named external_table in the configured BigQuery dataset pointing to the GCS Parquet file
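
The two outputs are linked by the object path: the external table points at the object that the upload step wrote. The sketch below builds a table definition in the shape of the BigQuery REST API's Table resource. The function name external_table_resource is hypothetical, and the example argument values are placeholders.

```python
def external_table_resource(
    project_id: str,
    dataset_id: str,
    bucket: str,
    parquet_file: str,
    table_id: str = "external_table",
) -> dict:
    """Build a BigQuery Table resource for an external Parquet table in GCS."""
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        },
        "externalDataConfiguration": {
            "sourceFormat": "PARQUET",
            # Same raw/{parquet_file} object path that the upload step writes to.
            "sourceUris": [f"gs://{bucket}/raw/{parquet_file}"],
        },
    }


# Placeholder values, for illustration only.
resource = external_table_resource(
    "my-project", "trips_data_all", "my-gcs-bucket", "yellow_tripdata_2021-01.parquet"
)
print(resource["externalDataConfiguration"]["sourceUris"][0])
```

A dictionary of this shape can be supplied as the table_resource argument of BigQueryCreateExternalTableOperator.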

Usage Examples

Basic Usage

# This DAG is triggered automatically by Airflow on a daily schedule.
# To trigger manually via the Airflow CLI:
# airflow dags trigger data_ingestion_gcs_dag

# The format_to_parquet function can be used standalone:
format_to_parquet("/opt/airflow/yellow_tripdata_2021-01.csv")

# The upload_to_gcs function can be used standalone:
upload_to_gcs(
    bucket="my-gcs-bucket",
    object_name="raw/yellow_tripdata_2021-01.parquet",
    local_file="/opt/airflow/yellow_tripdata_2021-01.parquet"
)
