

Implementation:DataTalksClub Data engineering zoomcamp Redpanda CSV Producer

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

Python class RideCSVProducer that reads CSV taxi ride data, extracts selected columns (vendor_id, pickup/dropoff datetimes, passenger_count, trip_distance, payment_type, total_amount), and publishes them as comma-separated string messages to a Kafka/Redpanda topic using kafka-python.

Description

The RideCSVProducer class provides a lightweight producer that publishes taxi ride data as raw CSV strings rather than structured JSON. It wraps kafka-python's KafkaProducer and offers two key methods:

  1. read_records(resource_path: str) -- A static method that opens a CSV file, skips the header row, and builds a value string from seven selected columns: vendor_id, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, total_amount (column indices 0, 1, 2, 3, 4, 9, 16). It joins the fields with a comma followed by a space (", "). It also collects vendor_id (column 0) as the message key. It reads only the first 5 data rows and returns a zip iterator of (key, value) tuples.
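  2. publish(topic: str, records) -- Iterates over records, an iterable of (key, value) string tuples, and sends each pair to the given Kafka topic. A KeyboardInterrupt stops the loop. Any other exception raised by send() is printed and the loop continues. After the loop, the producer is flushed and the method sleeps for 1 second. Because send() is asynchronous, it returns a future, and that future carries any broker-side delivery failures. Those failures are not raised inside this loop.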
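The following sketch shows how the column selection and record limit described above can work as a standalone function. It is not the module's code. The name `read_ride_records` and the `limit` parameter are hypothetical. Accepting any iterable of CSV lines, such as an open file or `io.StringIO`, is an assumption that lets the function run without a file on disk. It reuses the `", "` separator and the 0, 1, 2, 3, 4, 9, 16 column indices from `read_records`. It also returns a list rather than a zip object, which is another assumption.

```python
import csv
from itertools import islice
from typing import Iterable, List, Tuple

# Column indices selected by read_records: vendor_id, tpep_pickup_datetime,
# tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, total_amount.
SELECTED_COLUMNS = (0, 1, 2, 3, 4, 9, 16)


def read_ride_records(lines: Iterable[str], limit: int = 5) -> List[Tuple[str, str]]:
    """Hypothetical helper: return up to `limit` (key, value) pairs from CSV lines."""
    reader = csv.reader(lines)
    next(reader, None)  # skip the header row (no-op on empty input)
    pairs = []
    for row in islice(reader, limit):  # stop after `limit` data rows
        value = ", ".join(row[i] for i in SELECTED_COLUMNS)  # same ", " separator
        pairs.append((str(row[0]), value))  # vendor_id doubles as the message key
    return pairs
```

`islice` replaces the manual counter-and-break loop. It stops reading the file once the limit is reached.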

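The module also defines a standalone delivery_report(err, msg) function. Its (err, msg) signature and its calls to msg.key(), msg.topic(), msg.partition(), and msg.offset() follow confluent-kafka's delivery-callback convention, not kafka-python's. It is never registered with the producer, so it has no effect in this implementation.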
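To get delivery reporting with kafka-python, you can attach callbacks to the future that send() returns. The sketch below takes that approach. The names on_send_success, on_send_error, and publish_with_callbacks are hypothetical. The sketch relies on kafka-python's add_callback and add_errback methods on the returned future. These pass any extra positional arguments, here the record key, before the RecordMetadata or the exception. RecordMetadata exposes topic, partition, and offset as attributes, not methods.

```python
from typing import Iterable, Tuple


def on_send_success(key: str, record_metadata) -> str:
    # kafka-python passes a RecordMetadata whose topic/partition/offset are attributes.
    message = (f"Record {key} successfully produced to {record_metadata.topic} "
               f"[{record_metadata.partition}] at offset {record_metadata.offset}")
    print(message)
    return message


def on_send_error(key: str, exc: BaseException) -> str:
    # kafka-python passes the exception that caused the send to fail.
    message = f"Delivery failed for record {key}: {exc!r}"
    print(message)
    return message


def publish_with_callbacks(producer, topic: str, records: Iterable[Tuple[str, str]]) -> int:
    """Hypothetical variant of publish() that reports per-record delivery results."""
    sent = 0
    for key, value in records:
        future = producer.send(topic, key=key, value=value)
        # add_callback/add_errback return the future, so they can be chained;
        # the key is bound as the first positional argument of each callback.
        future.add_callback(on_send_success, key).add_errback(on_send_error, key)
        sent += 1
    producer.flush()  # wait for outstanding sends to complete or fail
    return sent
```

A RideCSVProducer instance keeps its KafkaProducer in the producer attribute. To use this sketch with one, you would call publish_with_callbacks(instance.producer, topic, records).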

Usage

Use this implementation when you need to produce CSV-formatted string messages to a Kafka/Redpanda topic, particularly as the ingestion stage for downstream PySpark Structured Streaming pipelines that parse CSV values. This producer pairs with the RideCSVConsumer and the streaming.py PySpark pipeline.
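Each value is a plain string with fields separated by a comma and a space. A consumer therefore has to split it and strip whitespace before casting the fields to their types. The sketch below does this in plain Python. The RideRecord dataclass, its field types, and the parse_ride_value name are illustrative assumptions. They are not part of the module or of the streaming.py pipeline. Real data may contain empty or differently formatted fields that need more lenient handling.

```python
from dataclasses import dataclass


@dataclass
class RideRecord:
    """Hypothetical typed view of one CSV value produced by RideCSVProducer."""
    vendor_id: int
    tpep_pickup_datetime: str
    tpep_dropoff_datetime: str
    passenger_count: int
    trip_distance: float
    payment_type: int
    total_amount: float


def parse_ride_value(value: str) -> RideRecord:
    # Split on commas and strip the space that follows each separator.
    fields = [part.strip() for part in value.split(",")]
    if len(fields) != 7:
        raise ValueError(f"expected 7 fields, got {len(fields)}: {value!r}")
    return RideRecord(
        vendor_id=int(fields[0]),
        tpep_pickup_datetime=fields[1],
        tpep_dropoff_datetime=fields[2],
        passenger_count=int(fields[3]),
        trip_distance=float(fields[4]),
        payment_type=int(fields[5]),
        total_amount=float(fields[6]),
    )
```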

Code Reference

Source Location

Signature

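import csv
from time import sleep
from typing import Dict, Iterable, Iterator, Tuple

from kafka import KafkaProducer


def delivery_report(err, msg):
    # (err, msg) callback in the confluent-kafka style; it is not registered
    # with the kafka-python producer below, so it is never called.
    if err is not None:
        print("Delivery failed for record {}: {}".format(msg.key(), err))
        return
    print('Record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


class RideCSVProducer:
    def __init__(self, props: Dict):
        # props is forwarded unchanged to kafka-python's KafkaProducer.
        self.producer = KafkaProducer(**props)

    @staticmethod
    def read_records(resource_path: str) -> Iterator[Tuple[str, str]]:
        records, ride_keys = [], []
        i = 0
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row
            for row in reader:
                # vendor_id, pickup/dropoff datetimes, passenger_count,
                # trip_distance, payment_type, total_amount joined with ", "
                records.append(f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[9]}, {row[16]}')
                ride_keys.append(str(row[0]))  # vendor_id is the message key
                i += 1
                if i == 5:  # only the first 5 data rows are produced
                    break
        return zip(ride_keys, records)

    def publish(self, topic: str, records: Iterable[Tuple[str, str]]):
        for key, value in records:
            try:
                # send() is asynchronous: it buffers the record and returns a future,
                # so broker-side delivery failures are not raised here.
                self.producer.send(topic=topic, key=key, value=value)
                print(f"Producing record for <key: {key}, value:{value}>")
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")
        self.producer.flush()  # block until all buffered records are sent or fail
        sleep(1)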

Import

from producer import RideCSVProducer

I/O Contract

Inputs

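Name | Type | Required | Description
props | Dict | Yes | Configuration dictionary passed to KafkaProducer. Expected keys include bootstrap_servers, key_serializer, and value_serializer. Keys and values are str, so the serializers must encode them to bytes (for example, as UTF-8).
resource_path | str | Yes | File system path to a CSV file of taxi ride data with a header row and at least 17 columns (column index 16 is read).
topic | str | Yes | Name of the Kafka topic that receives the CSV string records.
records | Iterable[Tuple[str, str]] | Yes | An iterable of (key, value) string tuples to publish, such as the zip object returned by read_records.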

Outputs

Name | Type | Description
records | Iterator[Tuple[str, str]] (a zip object) | Returned by the read_records static method: (ride_key, csv_record_string) tuples for at most the first 5 data rows.
(stdout) | str | Each produced record is printed to the console with its key and value. Exceptions raised while sending are printed as well.
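The value returned by read_records is a zip object, and a zip object is a single-use iterator. If you pass the same ride_records to publish a second time, nothing is sent. The sketch below uses made-up sample pairs to show this behavior and the common workaround of wrapping the result in list().

```python
# A zip object, like the one read_records returns, can be iterated only once.
ride_records = zip(["1", "2"], ["1, 9.30", "2, 12.80"])

first_pass = list(ride_records)   # consumes the iterator
second_pass = list(ride_records)  # nothing left to yield

# Materializing the pairs once gives a sequence that can be iterated repeatedly.
reusable = list(zip(["1", "2"], ["1, 9.30", "2, 12.80"]))
```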

Usage Examples

Basic Usage

from producer import RideCSVProducer
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, PRODUCE_TOPIC_RIDES_CSV

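# Keys and values are plain str, so both serializers encode them to UTF-8 bytes.
config = {
    'bootstrap_servers': [BOOTSTRAP_SERVERS],
    'key_serializer': lambda x: x.encode('utf-8'),
    'value_serializer': lambda x: x.encode('utf-8')
}

producer = RideCSVProducer(props=config)
# Read (vendor_id, csv_string) pairs from the first 5 data rows of the input CSV.
ride_records = producer.read_records(resource_path=INPUT_DATA_PATH)
# Send each pair, then flush so buffered records are delivered before the script exits.
producer.publish(topic=PRODUCE_TOPIC_RIDES_CSV, records=ride_records)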
