Implementation:DataTalksClub Data engineering zoomcamp Airflow Ingest Script
| Knowledge Sources | |
|---|---|
| Domains | Workflow_Orchestration, Airflow |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A Python function that ingests CSV data into a PostgreSQL database using chunked Pandas reads and SQLAlchemy, designed to be called as a PythonOperator callable within an Airflow DAG.
Description
The ingest_callable function implements a chunked data ingestion pattern for loading CSV files into PostgreSQL. It establishes a database connection using SQLAlchemy with a dynamically constructed PostgreSQL connection string, then reads the CSV file in chunks of 100,000 rows using pd.read_csv with iterator=True and chunksize=100000.
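The connection string is a SQLAlchemy PostgreSQL URL of the form `postgresql://user:password@host:port/db`. The minimal sketch below assumes that f-string layout. The helper name `build_postgres_url` is hypothetical. The `quote_plus` escaping is an addition that the description does not mention; it helps when a password contains characters such as `@` or `/`, which would otherwise be read as URL delimiters.

```python
from urllib.parse import quote_plus


def build_postgres_url(user: str, password: str, host: str, port: str, db: str) -> str:
    """Build a SQLAlchemy-style PostgreSQL URL (hypothetical helper).

    quote_plus() percent-escapes characters such as '@', ':' or '/' in the
    credentials so they are not mistaken for URL delimiters.
    """
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"


url = build_postgres_url("airflow", "p@ss/word", "pgdatabase", "5432", "ny_taxi")
print(url)  # postgresql://airflow:p%40ss%2Fword@pgdatabase:5432/ny_taxi
# The resulting string can then be passed to sqlalchemy.create_engine(url).
```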
For the first chunk, the function:
- Converts the tpep_pickup_datetime and tpep_dropoff_datetime columns to proper datetime types using pd.to_datetime.
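- Creates the target table by writing only the column header (df.head(n=0)) with to_sql(..., if_exists='replace'). This drops any existing table with that name and recreates it empty, using a schema inferred from the chunk's dtypes.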
- Appends the first chunk of data to the newly created table.
Each subsequent chunk receives the same datetime conversion and is appended to the existing table. The function prints how long each chunk insertion takes. The loop ends when next() on the chunk iterator raises StopIteration, which the function catches once every row has been processed.
The function also accepts an execution_date parameter, so Airflow's execution context can be passed in (for example through a templated op_kwargs value) for logging and tracking. The ingestion logic itself does not depend on it.
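The steps above can be sketched end to end as follows. This is an illustrative reconstruction, not the repository code. It makes four substitutions: an in-memory `sqlite3` connection replaces the SQLAlchemy PostgreSQL engine (pandas' `to_sql` accepts a `sqlite3.Connection` directly), the CSV rows are hypothetical, the function names `convert_datetimes` and `ingest_chunks` are hypothetical, and `chunksize=2` replaces 100,000 so that several chunks occur.

```python
import io
import sqlite3
from time import time

import pandas as pd

# Hypothetical sample rows shaped like the yellow taxi CSV (columns trimmed).
CSV_TEXT = """VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance
1,2021-01-01 00:30:10,2021-01-01 00:36:12,2.10
1,2021-01-01 00:51:20,2021-01-01 00:52:19,0.20
1,2021-01-01 00:43:30,2021-01-01 01:11:06,14.70
1,2021-01-01 00:15:48,2021-01-01 00:31:01,10.60
2,2021-01-01 00:31:49,2021-01-01 00:48:21,4.94
"""


def convert_datetimes(df: pd.DataFrame) -> pd.DataFrame:
    # Parse the two timestamp columns from text into datetime64 values.
    df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
    df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
    return df


def ingest_chunks(csv_source, table_name, con, chunksize=100_000):
    # iterator=True + chunksize returns a reader that yields DataFrames lazily.
    df_iter = pd.read_csv(csv_source, iterator=True, chunksize=chunksize)

    # First chunk: create the empty table from the header, then append rows.
    t_start = time()
    df = convert_datetimes(next(df_iter))
    df.head(n=0).to_sql(name=table_name, con=con, if_exists="replace")
    df.to_sql(name=table_name, con=con, if_exists="append")
    print(f"inserted the first chunk, took {time() - t_start:.3f} seconds")

    # Remaining chunks: append until the reader raises StopIteration.
    while True:
        t_start = time()
        try:
            df = next(df_iter)
        except StopIteration:
            print("completed")
            break
        convert_datetimes(df).to_sql(name=table_name, con=con, if_exists="append")
        print(f"inserted another chunk, took {time() - t_start:.3f} seconds")


con = sqlite3.connect(":memory:")
ingest_chunks(io.StringIO(CSV_TEXT), "yellow_taxi_trips", con, chunksize=2)
print(con.execute("SELECT COUNT(*) FROM yellow_taxi_trips").fetchone()[0])  # 5
```

A plain `for chunk in df_iter:` loop would consume StopIteration implicitly and is the more idiomatic form. The explicit `next()`/`try` version is kept here to mirror the description.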
Usage
Use this implementation when you need to ingest large CSV files into PostgreSQL within an Airflow pipeline. It is written specifically for NYC yellow taxi trip data, because it hard-codes the tpep_pickup_datetime and tpep_dropoff_datetime columns; other datasets would need those column names changed. Reading in chunks keeps memory usage bounded to roughly one chunk (100,000 rows) at a time rather than the whole file.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: cohorts/2022/week_2_data_ingestion/airflow/dags_local/ingest_script.py
- Lines: 1-48
Signature
```python
def ingest_callable(user, password, host, port, db, table_name, csv_file, execution_date):
    ...
```
Import
```python
import os
from time import time

import pandas as pd
from sqlalchemy import create_engine
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| user | str | Yes | PostgreSQL username |
| password | str | Yes | PostgreSQL password |
| host | str | Yes | PostgreSQL host address |
| port | str | Yes | PostgreSQL port number |
| db | str | Yes | PostgreSQL database name |
| table_name | str | Yes | Target table name for data insertion |
| csv_file | str | Yes | Path to the CSV file to ingest |
| execution_date | datetime or str | Yes | Airflow execution date from the DAG context. When passed as the template "{{ execution_date }}" in op_kwargs, it arrives as a rendered string. |
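Because `op_kwargs` is a templated field of `PythonOperator`, a value of `"{{ execution_date }}"` reaches the callable as a rendered string. The exception is a DAG created with `render_template_as_native_obj=True`. If a real `datetime` were ever needed, a normalizing step like the sketch below could be used. The helper name is hypothetical, and the sample string assumes Airflow 2's ISO 8601 rendering with a UTC offset.

```python
from datetime import datetime, timezone
from typing import Union


def normalize_execution_date(value: Union[str, datetime]) -> datetime:
    """Return execution_date as a datetime, whether rendered or native (hypothetical helper)."""
    if isinstance(value, datetime):
        return value
    # fromisoformat parses ISO 8601 strings such as "2021-01-01T00:00:00+00:00".
    return datetime.fromisoformat(value)


rendered = normalize_execution_date("2021-01-01T00:00:00+00:00")
print(rendered.year, rendered.tzinfo == timezone.utc)  # 2021 True
```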
Outputs
| Name | Type | Description |
|---|---|---|
| PostgreSQL table | Database table | The target table populated with all rows from the CSV file, with datetime columns properly converted. The table is recreated (replaced) on each run. |
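Because the first chunk is written with `if_exists='replace'`, rerunning the task rebuilds the table instead of duplicating rows. The sketch below checks that property under three assumptions: an in-memory `sqlite3` connection stands in for PostgreSQL, the three-row DataFrame is hypothetical, and `load_once` is a hypothetical helper that reproduces only the replace-then-append writes.

```python
import sqlite3

import pandas as pd


def load_once(df: pd.DataFrame, table_name: str, con) -> int:
    # Recreate the empty table from the header, append the rows, return the row count.
    df.head(n=0).to_sql(name=table_name, con=con, if_exists="replace")
    df.to_sql(name=table_name, con=con, if_exists="append")
    return con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]


con = sqlite3.connect(":memory:")
trips = pd.DataFrame({"VendorID": [1, 2, 2], "trip_distance": [2.1, 0.2, 4.94]})
first_run = load_once(trips, "yellow_taxi_trips", con)
second_run = load_once(trips, "yellow_taxi_trips", con)
print(first_run, second_run)  # 3 3
```

The flip side of this design is that every run discards whatever the previous run loaded into the same table.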
Usage Examples
Basic Usage
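```python
# Called as an Airflow PythonOperator callable (Airflow 2.x syntax).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from ingest_script import ingest_callable

with DAG(
    dag_id="local_ingestion_dag",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval="@monthly",  # newer Airflow releases call this `schedule`
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_task",
        python_callable=ingest_callable,
        op_kwargs={
            "user": "airflow",
            "password": "airflow",
            "host": "pgdatabase",
            "port": "5432",
            "db": "ny_taxi",
            "table_name": "yellow_taxi_trips",
            "csv_file": "/opt/airflow/output.csv",
            # op_kwargs is templated, so this renders to a date string at run time.
            "execution_date": "{{ execution_date }}",
        },
    )
```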
Standalone Usage
```python
from datetime import datetime

from ingest_script import ingest_callable

ingest_callable(
    user="postgres",
    password="postgres",
    host="localhost",
    port="5432",
    db="ny_taxi",
    table_name="yellow_taxi_trips",
    csv_file="yellow_tripdata_2021-01.csv",
    execution_date=datetime(2021, 1, 1),
)
```