Implementation:DataTalksClub Data engineering zoomcamp Pandas Chunked CSV Loading
| Metadata | |
|---|---|
| Knowledge Sources | DataTalksClub/data-engineering-zoomcamp |
| Domains | pandas, Data Ingestion, PostgreSQL, Batch Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete tool for reading a remote CSV.gz file in fixed-size chunks using pd.read_csv() with iterator=True, creating the target PostgreSQL table schema from the first chunk, and appending all chunks via DataFrame.to_sql().
Description
This implementation uses pandas read_csv() in iterator mode to create a TextFileReader object that yields DataFrame chunks of a configurable size (default 100,000 rows). The URL points to a gzip-compressed CSV file hosted on GitHub Releases, and pandas handles HTTP fetching and gzip decompression transparently.
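As a minimal sketch of what iterator mode returns (using the same GitHub Releases URL as the usage example further down; the chunk size here is illustrative), the reader object can be advanced manually with next() before being handed to a loop:
import pandas as pd

# Illustrative only: same source file as the usage example below.
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'

df_iter = pd.read_csv(url, iterator=True, chunksize=100000)
print(type(df_iter).__name__)  # TextFileReader -- yields DataFrames lazily
first_chunk = next(df_iter)    # HTTP fetch and gzip decompression happen transparently
print(len(first_chunk))        # at most 100000 rows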
The ingestion loop has two phases:
- Schema creation: On the first iteration, `df_chunk.head(0).to_sql(name=target_table, con=engine, if_exists='replace')` creates the target table with the correct column types but zero rows. The `head(0)` call returns an empty DataFrame with the same schema, and `if_exists='replace'` drops any pre-existing table (see the standalone illustration after this list).
- Data insertion: Every chunk (including the first) is written via `df_chunk.to_sql(name=target_table, con=engine, if_exists='append')`, which appends rows to the existing table.
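A small standalone illustration (not code from the repository) of why head(0) is suitable for schema creation: it keeps every column and dtype while carrying zero rows, so to_sql() only issues the CREATE TABLE.
import pandas as pd

# Toy DataFrame standing in for the first chunk; the two columns are taken from the dtype mapping.
df = pd.DataFrame({
    'VendorID': pd.array([1, 2], dtype='Int64'),
    'trip_distance': [1.5, 3.2],
})
empty = df.head(0)
print(len(empty))     # 0 -- no rows would be inserted
print(empty.dtypes)   # same columns and dtypes as df, so the generated table schema matches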
Progress is tracked using tqdm, which wraps the chunk iterator and displays a progress bar with the number of chunks processed.
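Because a TextFileReader has no known length, the bar shows elapsed time and a running chunk count rather than a percentage. A hedged variant (not in the course script) passes total= when an approximate row count is known, so tqdm can render a percentage and ETA:
from tqdm.auto import tqdm

# Sketch only: approx_rows is a made-up figure, and df_iter is assumed to exist
# as in the usage example below.
approx_rows = 1_500_000
total_chunks = -(-approx_rows // 100000)  # ceiling division
for df_chunk in tqdm(df_iter, total=total_chunks, unit='chunk'):
    ...  # same two-phase to_sql() logic as in the Code Reference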
Usage
This is the core data loading step of the ingestion pipeline. It is called after the engine has been created and the dtype/parse_dates configuration has been defined. It handles the entire lifecycle from HTTP download through database insertion.
Code Reference
Source Location: 01-docker-terraform/docker-sql/pipeline/ingest_data.py:L51-74
Signature:
df_iter = pd.read_csv(
    url,
    dtype=dtype,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=chunksize,
)

first = True
for df_chunk in tqdm(df_iter):
    if first:
        df_chunk.head(0).to_sql(
            name=target_table,
            con=engine,
            if_exists='replace'
        )
        first = False
    df_chunk.to_sql(
        name=target_table,
        con=engine,
        if_exists='append'
    )
Imports:
import pandas as pd
from tqdm.auto import tqdm
External Dependencies:
| Package | Version | Purpose |
|---|---|---|
| pandas | >=2.3.3 | CSV reading, DataFrame operations, to_sql() for database writes |
| pyarrow | >=22.0.0 | Backend engine for pandas nullable dtypes (e.g., Int64) |
| tqdm | >=4.67.1 | Progress bar for tracking chunk processing |
I/O Contract
Inputs
| Name | Type | Default | Description |
|---|---|---|---|
| url | str | (constructed from year/month) | URL to the gzip-compressed CSV file on GitHub Releases |
| dtype | dict[str, str] | (module-level dict) | Column name to pandas dtype mapping for 16 columns |
| parse_dates | list[str] | (module-level list) | List of 2 datetime column names to parse |
| chunksize | int | 100000 | Number of rows per chunk (CLI: --chunksize) |
| target_table | str | yellow_taxi_data | Name of the PostgreSQL table to create/populate (CLI: --target-table) |
| engine | sqlalchemy.engine.Engine | (created in run()) | SQLAlchemy engine connected to the target PostgreSQL database |
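The --chunksize and --target-table defaults above come from the script's CLI layer. A hypothetical argparse sketch (the actual argument parsing in ingest_data.py is not reproduced here) shows how such flags typically map onto these parameters:
import argparse

# Hypothetical CLI sketch; only the flag names are taken from the table above.
parser = argparse.ArgumentParser(description='Ingest NYC taxi CSV into PostgreSQL')
parser.add_argument('--chunksize', type=int, default=100000,
                    help='rows per chunk for pd.read_csv')
parser.add_argument('--target-table', default='yellow_taxi_data',
                    help='PostgreSQL table to create/populate')
args = parser.parse_args()
# args.chunksize and args.target_table then feed the chunked-loading loop.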
Outputs
| Name | Type | Description |
|---|---|---|
| PostgreSQL table | Database table | The target table (e.g., yellow_taxi_data) populated with all rows from the CSV source, with correct column types and parsed datetime columns |
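As a quick sanity check on the output (a sketch; the connection string and table name match the usage example below but are otherwise assumptions), the row count of the populated table can be compared against the source file:
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg://root:root@localhost:5432/ny_taxi')
counts = pd.read_sql('SELECT COUNT(*) AS row_count FROM yellow_taxi_data', con=engine)
print(counts)  # should equal the number of data rows in the source CSV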
Usage Examples
Full ingestion workflow (from ingest_data.py):
import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm

# Configuration (normally from CLI params and module-level dicts)
engine = create_engine('postgresql+psycopg://root:root@localhost:5432/ny_taxi')
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'
target_table = 'yellow_taxi_data'
chunksize = 100000

dtype = {
    "VendorID": "Int64", "passenger_count": "Int64",
    "trip_distance": "float64", "RatecodeID": "Int64",
    "store_and_fwd_flag": "string", "PULocationID": "Int64",
    "DOLocationID": "Int64", "payment_type": "Int64",
    "fare_amount": "float64", "extra": "float64",
    "mta_tax": "float64", "tip_amount": "float64",
    "tolls_amount": "float64", "improvement_surcharge": "float64",
    "total_amount": "float64", "congestion_surcharge": "float64",
}
parse_dates = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Chunked reading
df_iter = pd.read_csv(url, dtype=dtype, parse_dates=parse_dates,
                      iterator=True, chunksize=chunksize)

first = True
for df_chunk in tqdm(df_iter):
    if first:
        df_chunk.head(0).to_sql(name=target_table, con=engine, if_exists='replace')
        first = False
    df_chunk.to_sql(name=target_table, con=engine, if_exists='append')
Adjusting chunk size for memory-constrained environments:
# Use a smaller chunk size (10,000 rows) for containers with limited RAM
df_iter = pd.read_csv(url, dtype=dtype, parse_dates=parse_dates,
                      iterator=True, chunksize=10000)
Related Pages
- Principle:DataTalksClub_Data_engineering_zoomcamp_Chunked_Data_Ingestion
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Pandas_Dtype_Configuration
- Implementation:DataTalksClub_Data_engineering_zoomcamp_SQLAlchemy_Create_Engine
- Heuristic:DataTalksClub_Data_engineering_zoomcamp_CSV_Chunk_Size_Optimization
- Environment:DataTalksClub_Data_engineering_zoomcamp_Docker_PostgreSQL_Python_Environment