
Implementation:DataTalksClub Data engineering zoomcamp Pandas Chunked CSV Loading

From Leeroopedia


Metadata
Knowledge Sources DataTalksClub/data-engineering-zoomcamp
Domains pandas, Data Ingestion, PostgreSQL, Batch Processing
Last Updated 2026-02-09 14:00 GMT

Overview

Concrete tool for reading a remote gzip-compressed CSV file in fixed-size chunks using pd.read_csv() with iterator=True and chunksize, creating the target PostgreSQL table schema from the first chunk, and appending all chunks via DataFrame.to_sql().

Description

This implementation uses pandas read_csv() in iterator mode to create a TextFileReader object that yields DataFrame chunks of a configurable size (default 100,000 rows); passing chunksize alone already returns a TextFileReader, so iterator=True is explicit rather than strictly required. The URL points to a gzip-compressed CSV file hosted on GitHub Releases, and pandas handles the HTTP fetching and gzip decompression transparently.
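A minimal sketch of this chunked-read behavior, using an in-memory CSV via io.StringIO as a stand-in for the remote gzip file (the column names and row counts here are illustrative, not the taxi dataset):

```python
import io

import pandas as pd

# In-memory stand-in for the remote CSV; the real pipeline passes a
# URL to a .csv.gz file and pandas fetches/decompresses it itself.
csv_data = "a,b\n" + "\n".join(f"{i},{i * 2}" for i in range(250))

# iterator=True/chunksize turn read_csv into a TextFileReader that
# yields DataFrame chunks instead of one large DataFrame.
reader = pd.read_csv(io.StringIO(csv_data), iterator=True, chunksize=100)

chunks = list(reader)
print([len(c) for c in chunks])  # 250 rows at chunksize=100 -> 100, 100, 50
```

Note the final chunk is simply smaller than chunksize; no padding or special-casing is needed in the loop.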

The ingestion loop has two phases:

  1. Schema creation: On the first iteration, df_chunk.head(0).to_sql(name=target_table, con=engine, if_exists='replace') creates the target table with the correct column types but zero rows. The head(0) call returns an empty DataFrame with the same schema, and if_exists='replace' drops any pre-existing table.
  2. Data insertion: Every chunk (including the first) is written via df_chunk.to_sql(name=target_table, con=engine, if_exists='append'), which appends rows to the existing table.

Progress is tracked using tqdm, which wraps the chunk iterator; because the total number of chunks is not known in advance, it displays a running count of chunks processed (with rate and elapsed time) rather than a percentage bar.
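The two-phase pattern above can be sketched self-contained. This uses an in-memory SQLite engine purely so the example runs without a database server; the to_sql() calls are identical against the pipeline's PostgreSQL engine, and the table name demo is assumed for illustration:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# SQLite stands in for PostgreSQL so the sketch is self-contained.
engine = create_engine("sqlite://")

# Three small stand-in "chunks" of 3 rows each.
chunks = [pd.DataFrame({"id": range(i, i + 3), "val": [0.5] * 3})
          for i in (0, 3, 6)]

first = True
for df_chunk in chunks:
    if first:
        # Phase 1: head(0) yields an empty DataFrame with the same
        # schema; if_exists='replace' drops any pre-existing table.
        df_chunk.head(0).to_sql(name="demo", con=engine, if_exists="replace")
        first = False
    # Phase 2: every chunk (including the first) appends its rows.
    df_chunk.to_sql(name="demo", con=engine, if_exists="append")

with engine.connect() as conn:
    count = conn.execute(text("SELECT COUNT(*) FROM demo")).scalar()
print(count)  # 3 chunks x 3 rows = 9
```

Because the first chunk is both used for schema creation and then appended, no rows are skipped; the replace/append split only separates DDL from inserts.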

Usage

This is the core data loading step of the ingestion pipeline. It is called after the engine has been created and the dtype/parse_dates configuration has been defined. It handles the entire lifecycle from HTTP download through database insertion.

Code Reference

Source Location: 01-docker-terraform/docker-sql/pipeline/ingest_data.py:L51-74

Signature:

df_iter = pd.read_csv(
    url,
    dtype=dtype,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=chunksize,
)

first = True

for df_chunk in tqdm(df_iter):
    if first:
        df_chunk.head(0).to_sql(
            name=target_table,
            con=engine,
            if_exists='replace'
        )
        first = False

    df_chunk.to_sql(
        name=target_table,
        con=engine,
        if_exists='append'
    )

Imports:

import pandas as pd
from tqdm.auto import tqdm

External Dependencies:

Package Version Purpose
pandas >=2.3.3 CSV reading, DataFrame operations, to_sql() for database writes
pyarrow >=22.0.0 Optional Arrow-backed dtype support in pandas (the "Int64" dtype used here is the NumPy-backed nullable integer and does not itself require pyarrow)
tqdm >=4.67.1 Progress bar for tracking chunk processing

I/O Contract

Inputs

Name Type Default Description
url str (constructed from year/month) URL to the gzip-compressed CSV file on GitHub Releases
dtype dict[str, str] (module-level dict) Column name to pandas dtype mapping for 16 columns
parse_dates list[str] (module-level list) List of 2 datetime column names to parse
chunksize int 100000 Number of rows per chunk (CLI: --chunksize)
target_table str yellow_taxi_data Name of the PostgreSQL table to create/populate (CLI: --target-table)
engine sqlalchemy.engine.Engine (created in run()) SQLAlchemy engine connected to the target PostgreSQL database

Outputs

Name Type Description
PostgreSQL table Database table The target table (e.g., yellow_taxi_data) populated with all rows from the CSV source, with correct column types and parsed datetime columns

Usage Examples

Full ingestion workflow (from ingest_data.py):

import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm

# Configuration (normally from CLI params and module-level dicts)
engine = create_engine('postgresql+psycopg://root:root@localhost:5432/ny_taxi')
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'
target_table = 'yellow_taxi_data'
chunksize = 100000

dtype = {
    "VendorID": "Int64", "passenger_count": "Int64",
    "trip_distance": "float64", "RatecodeID": "Int64",
    "store_and_fwd_flag": "string", "PULocationID": "Int64",
    "DOLocationID": "Int64", "payment_type": "Int64",
    "fare_amount": "float64", "extra": "float64",
    "mta_tax": "float64", "tip_amount": "float64",
    "tolls_amount": "float64", "improvement_surcharge": "float64",
    "total_amount": "float64", "congestion_surcharge": "float64"
}
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Chunked reading
df_iter = pd.read_csv(url, dtype=dtype, parse_dates=parse_dates,
                       iterator=True, chunksize=chunksize)

first = True
for df_chunk in tqdm(df_iter):
    if first:
        df_chunk.head(0).to_sql(name=target_table, con=engine, if_exists='replace')
        first = False
    df_chunk.to_sql(name=target_table, con=engine, if_exists='append')

Adjusting chunk size for memory-constrained environments:

# Use a smaller chunk size (10,000 rows) for containers with limited RAM
df_iter = pd.read_csv(url, dtype=dtype, parse_dates=parse_dates,
                       iterator=True, chunksize=10000)
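To pick a chunk size deliberately rather than by trial and error, one rough sizing aid is to measure a representative chunk's in-memory footprint with DataFrame.memory_usage(deep=True). The two-column frame below is a simplified stand-in for the 16-column taxi schema, so real per-row costs will be several times larger:

```python
import pandas as pd

# Simplified stand-in chunk: one nullable-integer column and one float
# column, 10,000 rows (the real schema has 16 columns plus datetimes).
sample = pd.DataFrame({
    "id": pd.array(range(10_000), dtype="Int64"),
    "amount": [1.5] * 10_000,
})

# deep=True counts the actual buffer sizes, including the Int64 null mask.
bytes_per_chunk = sample.memory_usage(deep=True).sum()
print(f"{bytes_per_chunk / 1e6:.2f} MB per 10,000-row chunk")
```

Scaling that figure by the intended chunksize gives a lower bound on peak DataFrame memory; to_sql() and the CSV parser add their own transient overhead on top.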
