Implementation:DataTalksClub Data engineering zoomcamp Airflow Ingest Script

From Leeroopedia


Knowledge Sources
Domains Workflow_Orchestration, Airflow
Last Updated 2026-02-09 00:00 GMT

Overview

A Python function that ingests CSV data into a PostgreSQL database using chunked Pandas reads and SQLAlchemy, designed to be called as a PythonOperator callable within an Airflow DAG.

Description

The ingest_callable function implements a chunked data ingestion pattern for loading CSV files into PostgreSQL. It establishes a database connection using SQLAlchemy with a dynamically constructed PostgreSQL connection string, then reads the CSV file in chunks of 100,000 rows using pd.read_csv with iterator=True and chunksize=100000.
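The connection string and the chunked reader described above can be sketched as follows. This is a minimal illustration, not the original source: the helper name build_postgres_url, the in-memory CSV, and the chunk size of 2 are assumptions chosen to keep the example small, and the URL is only built, not used to open a connection.

```python
import io

import pandas as pd


def build_postgres_url(user, password, host, port, db):
    """Assemble a SQLAlchemy-style PostgreSQL URL (hypothetical helper)."""
    return f"postgresql://{user}:{password}@{host}:{port}/{db}"


# In the real function this URL would be handed to sqlalchemy.create_engine.
url = build_postgres_url("airflow", "airflow", "pgdatabase", "5432", "ny_taxi")

# Tiny in-memory CSV standing in for the taxi file (assumption).
csv_text = "trip_id,fare\n1,7.5\n2,12.0\n3,4.25\n"

# iterator=True with chunksize returns a reader that yields DataFrames;
# the source uses chunksize=100000, 2 is used here only for demonstration.
reader = pd.read_csv(io.StringIO(csv_text), iterator=True, chunksize=2)

first_chunk = next(reader)   # rows 1-2
second_chunk = next(reader)  # remaining row
```

Each call to next(reader) parses only the next block of rows, so the full file is never loaded into memory at once.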

For the first chunk, the function:

  1. Converts the tpep_pickup_datetime and tpep_dropoff_datetime columns to proper datetime types using pd.to_datetime.
  2. Creates the target table schema by inserting an empty DataFrame header with if_exists='replace'.
  3. Appends the first chunk of data to the newly created table.
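The three first-chunk steps can be sketched as below. To keep the example runnable without a PostgreSQL server, an in-memory SQLite connection stands in for the SQLAlchemy engine; that substitution, the sample rows, and index=False are assumptions for illustration and are not taken from the original script.

```python
import sqlite3

import pandas as pd

# Stand-in for the PostgreSQL engine (assumption: SQLite in memory).
conn = sqlite3.connect(":memory:")
table_name = "yellow_taxi_trips"

# A tiny first chunk with the two datetime columns still stored as strings.
chunk = pd.DataFrame(
    {
        "tpep_pickup_datetime": ["2021-01-01 00:30:10", "2021-01-01 00:51:20"],
        "tpep_dropoff_datetime": ["2021-01-01 00:36:12", "2021-01-01 00:52:19"],
        "passenger_count": [1, 1],
    }
)

# Step 1: convert the string columns to datetime types.
chunk["tpep_pickup_datetime"] = pd.to_datetime(chunk["tpep_pickup_datetime"])
chunk["tpep_dropoff_datetime"] = pd.to_datetime(chunk["tpep_dropoff_datetime"])

# Step 2: write only the header (zero rows) to (re)create the table schema.
chunk.head(n=0).to_sql(name=table_name, con=conn, if_exists="replace", index=False)
rows_after_schema = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]

# Step 3: append the actual rows of the first chunk.
chunk.to_sql(name=table_name, con=conn, if_exists="append", index=False)
rows_after_append = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
```

Writing the empty header with if_exists='replace' is what drops any previous table, which is why each run starts from a fresh table.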

Subsequent chunks follow the same datetime conversion pattern and are appended to the existing table. The function includes timing instrumentation that logs how long each chunk insertion takes. The loop terminates when a StopIteration exception is raised, indicating all data has been processed.
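Putting the pieces together, the loop might look like the following sketch. The function name ingest_in_chunks, the return value, the log messages, and the use of an in-memory SQLite connection in place of PostgreSQL are all assumptions made so the example runs on its own; only the overall pattern (next on the reader, datetime conversion, append, timing, StopIteration as the exit condition) comes from the description above.

```python
import io
import sqlite3
from time import time

import pandas as pd

DATETIME_COLUMNS = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]


def ingest_in_chunks(csv_source, con, table_name, chunksize=100000):
    """Load a CSV into a table chunk by chunk; returns the number of chunks written."""
    reader = pd.read_csv(csv_source, iterator=True, chunksize=chunksize)
    chunks_written = 0
    while True:
        t_start = time()
        try:
            df = next(reader)
        except StopIteration:
            # The reader is exhausted: every row has been processed.
            print("ingestion finished")
            break

        for column in DATETIME_COLUMNS:
            df[column] = pd.to_datetime(df[column])

        if chunks_written == 0:
            # First chunk: recreate the table from the header only.
            df.head(n=0).to_sql(name=table_name, con=con, if_exists="replace", index=False)

        df.to_sql(name=table_name, con=con, if_exists="append", index=False)
        chunks_written += 1
        print(f"inserted chunk {chunks_written}, took {time() - t_start:.3f} s")
    return chunks_written


# Demonstration with five rows and a chunk size of 2 (assumed sample data).
sample_csv = (
    "tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count\n"
    "2021-01-01 00:30:10,2021-01-01 00:36:12,1\n"
    "2021-01-01 00:51:20,2021-01-01 00:52:19,1\n"
    "2021-01-01 00:43:30,2021-01-01 01:11:06,1\n"
    "2021-01-01 00:15:48,2021-01-01 00:31:01,0\n"
    "2021-01-01 00:31:49,2021-01-01 00:48:21,1\n"
)
conn = sqlite3.connect(":memory:")
n_chunks = ingest_in_chunks(io.StringIO(sample_csv), conn, "yellow_taxi_trips", chunksize=2)
n_rows = conn.execute("SELECT COUNT(*) FROM yellow_taxi_trips").fetchone()[0]
```

Catching StopIteration around next(reader) alone, rather than around the whole loop body, keeps an unrelated error during the insert from being mistaken for the end of the file.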

The function accepts an execution_date parameter, making it compatible with Airflow's execution context for logging and tracking purposes.

Usage

Use this implementation when you need to ingest large CSV files into PostgreSQL within an Airflow pipeline. Because the datetime conversion targets the tpep_pickup_datetime and tpep_dropoff_datetime columns, it is particularly suited to NYC yellow taxi trip data, which contains both. Only one chunk of 100,000 rows is held in memory at a time, so memory use stays bounded even when the CSV file is very large.

Code Reference

Source Location

Signature

def ingest_callable(user, password, host, port, db, table_name, csv_file, execution_date):
    ...

Import

import os
from time import time
import pandas as pd
from sqlalchemy import create_engine

I/O Contract

Inputs

Name            Type      Required  Description
user            str       Yes       PostgreSQL username
password        str       Yes       PostgreSQL password
host            str       Yes       PostgreSQL host address
port            str       Yes       PostgreSQL port number
db              str       Yes       PostgreSQL database name
table_name      str       Yes       Target table name for data insertion
csv_file        str       Yes       Path to the CSV file to ingest
execution_date  datetime  Yes       Airflow execution date passed from the DAG context

Outputs

Name              Type            Description
PostgreSQL table  Database table  The target table populated with all rows from the CSV file, with datetime columns properly converted. The table is recreated (replaced) on each run.

Usage Examples

Basic Usage

# Called as an Airflow PythonOperator callable:
from airflow.operators.python import PythonOperator

ingest_task = PythonOperator(
    task_id="ingest_task",
    python_callable=ingest_callable,
    op_kwargs={
        "user": "airflow",
        "password": "airflow",
        "host": "pgdatabase",
        "port": "5432",
        "db": "ny_taxi",
        "table_name": "yellow_taxi_trips",
        "csv_file": "/opt/airflow/output.csv",
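        # op_kwargs is a templated field, so this Jinja expression is rendered
        # at run time (to a string by default) before the callable receives it.
        "execution_date": "{{ execution_date }}",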
    },
)

Standalone Usage

from datetime import datetime
from ingest_script import ingest_callable

ingest_callable(
    user="postgres",
    password="postgres",
    host="localhost",
    port="5432",
    db="ny_taxi",
    table_name="yellow_taxi_trips",
    csv_file="yellow_tripdata_2021-01.csv",
    execution_date=datetime(2021, 1, 1)
)
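For standalone runs, the connection settings could be read from environment variables instead of being hard-coded. The sketch below is one possible approach, not part of the original script: the helper load_pg_settings, the variable names PG_USER, PG_PASSWORD, PG_HOST, PG_PORT, and PG_DATABASE, and the fallback defaults are all hypothetical.

```python
import os


def load_pg_settings(env=None):
    """Collect PostgreSQL settings from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    return {
        "user": env.get("PG_USER", "postgres"),
        "password": env.get("PG_PASSWORD", "postgres"),
        "host": env.get("PG_HOST", "localhost"),
        "port": env.get("PG_PORT", "5432"),
        "db": env.get("PG_DATABASE", "ny_taxi"),
    }


# Passing a plain dict makes the helper easy to exercise without touching
# the real process environment.
settings = load_pg_settings({"PG_HOST": "pgdatabase", "PG_USER": "airflow"})
```

The resulting dictionary has the same keys as the first five parameters of ingest_callable, so it could be unpacked into the call with **settings alongside table_name, csv_file, and execution_date.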
