Implementation:DataTalksClub Data engineering zoomcamp Redpanda CSV Producer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Python class RideCSVProducer that reads CSV taxi ride data, extracts selected columns (vendor_id, pickup/dropoff datetimes, passenger_count, trip_distance, payment_type, total_amount), and publishes them as comma-separated string messages to a Kafka/Redpanda topic using kafka-python.
Description
The RideCSVProducer class provides a lightweight producer that publishes taxi ride data as raw CSV strings rather than structured JSON. It wraps kafka-python's KafkaProducer and offers two key methods:
- read_records(resource_path: str) -- A static method that opens a CSV file, skips the header row, and builds one string per row from selected columns: vendor_id, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, total_amount (column indices 0, 1, 2, 3, 4, 9, 16). The fields are joined with ", " (a comma followed by a space). The vendor_id (column 0) is also collected separately as the message key. Only the first 5 data rows are read, and the method returns a zip iterator of (key, value) tuples.
- publish(topic: str, records: [str, str]) -- Iterates over the (key, value) pairs and sends each one to the given Kafka topic. Each send is wrapped in a try block: a KeyboardInterrupt stops the loop, while any other exception is printed and the loop moves on to the next record. After the loop, the producer is flushed and the method sleeps for 1 second.
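The column selection in read_records can be checked without a real CSV file or a broker. The sketch below is a hypothetical generator, extract_ride_records, that swaps the manual counter for itertools.islice and yields pairs lazily instead of building two lists and zipping them. It assumes the same 0, 1, 2, 3, 4, 9, 16 column layout and runs on a synthetic 17-column CSV.

```python
import csv
import io
from itertools import islice
from typing import Iterable, Iterator, Tuple

# Column indices used by read_records: vendor_id, tpep_pickup_datetime,
# tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, total_amount
SELECTED_COLUMNS = (0, 1, 2, 3, 4, 9, 16)


def extract_ride_records(lines: Iterable[str], limit: int = 5) -> Iterator[Tuple[str, str]]:
    """Hypothetical helper: yield (vendor_id, csv_value) for the first `limit` data rows."""
    reader = csv.reader(lines)
    next(reader)  # skip the header row, as read_records does
    for row in islice(reader, limit):
        # Join with ", " (comma plus space) to mirror the f-string in read_records
        value = ", ".join(row[i] for i in SELECTED_COLUMNS)
        yield str(row[0]), value


# Synthetic 17-column CSV (header + 7 rows), only to exercise the logic
header = ",".join(f"col{i}" for i in range(17))
rows = [",".join(f"{n}-{i}" for i in range(17)) for n in range(7)]
sample = io.StringIO("\n".join([header, *rows]) + "\n")

pairs = list(extract_ride_records(sample))
print(len(pairs))  # 5
print(pairs[0])
```

The first pair printed is ('0-0', '0-0, 0-1, 0-2, 0-3, 0-4, 0-9, 0-16'): the key is column 0 and the value carries the seven selected columns, so rows 5 and 6 are never read.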
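A standalone delivery_report(err, msg) function is also defined in the module for callback-style delivery reporting, but RideCSVProducer never uses it. Its (err, msg) signature and its msg.key(), msg.topic(), msg.partition() and msg.offset() method calls follow the confluent-kafka delivery-callback convention. In kafka-python, send() instead returns a future, and success callbacks receive a RecordMetadata object that exposes topic, partition and offset as attributes.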
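If delivery reporting is wanted with kafka-python, one option is to attach callbacks to the future returned by KafkaProducer.send() via add_callback and add_errback. The sketch below is an assumption, not code from the repository: on_send_success, on_send_error and send_with_report are hypothetical names, and the demo uses a namedtuple stand-in for RecordMetadata (with a made-up topic name, rides_csv) so that it runs without a broker.

```python
from collections import namedtuple


def on_send_success(key, record_metadata):
    """Success callback: kafka-python passes the RecordMetadata as the last argument."""
    msg = (f"Record {key} successfully produced to {record_metadata.topic} "
           f"[{record_metadata.partition}] at offset {record_metadata.offset}")
    print(msg)
    return msg


def on_send_error(key, exc):
    """Error callback: kafka-python passes the exception as the last argument."""
    msg = f"Delivery failed for record {key}: {exc}"
    print(msg)
    return msg


def send_with_report(producer, topic, key, value):
    # KafkaProducer.send() returns a future; extra positional args given to
    # add_callback/add_errback (here: key) are passed before the metadata/exception.
    return (producer.send(topic, key=key, value=value)
            .add_callback(on_send_success, key)
            .add_errback(on_send_error, key))


# Hypothetical stand-in for kafka-python's RecordMetadata, for a broker-free demo only
FakeMetadata = namedtuple("FakeMetadata", ["topic", "partition", "offset"])
ok = on_send_success("1", FakeMetadata("rides_csv", 0, 42))
failed = on_send_error("2", RuntimeError("boom"))
```

With a real KafkaProducer, send_with_report(producer, topic, key, value) would replace the bare send() call inside publish. The callbacks fire when the broker acknowledges or rejects the record, not when send() returns.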
Usage
Use this implementation when you need to produce CSV-formatted string messages to a Kafka/Redpanda topic, particularly as the ingestion stage for downstream PySpark Structured Streaming pipelines that parse CSV values. This producer pairs with the RideCSVConsumer and the streaming.py PySpark pipeline.
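Because each value is joined with ", ", a downstream consumer has to split on commas and trim whitespace. The parsing done by RideCSVConsumer and streaming.py is not shown on this page, so the helper below, parse_ride_value, is a hypothetical sketch. It assumes UTF-8 encoded values in the seven-field order listed above, and the sample message is synthetic.

```python
from typing import Dict, Union

# Field order produced by RideCSVProducer.read_records
FIELDS = ("vendor_id", "tpep_pickup_datetime", "tpep_dropoff_datetime",
          "passenger_count", "trip_distance", "payment_type", "total_amount")


def parse_ride_value(value: Union[bytes, str]) -> Dict[str, str]:
    """Hypothetical helper: split one message value back into named string fields."""
    if isinstance(value, bytes):
        value = value.decode("utf-8")  # matches the UTF-8 serializer in the usage example
    parts = [p.strip() for p in value.split(",")]  # drop the space after each comma
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return dict(zip(FIELDS, parts))


ride = parse_ride_value(b"1, 2020-07-01 00:25:32, 2020-07-01 00:33:39, 1, 1.5, 2, 9.3")
print(ride["trip_distance"], ride["total_amount"])  # 1.5 9.3
```

All fields stay strings here; casting passenger_count, trip_distance and total_amount to numeric types is left to the consumer, as a schema-driven pipeline such as PySpark would do.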
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/python/streams-example/redpanda/producer.py
- Lines: 1-63
Signature
```python
import csv
from time import sleep
from typing import Dict

from kafka import KafkaProducer


def delivery_report(err, msg):
    # Callback-style delivery reporter; not called anywhere by RideCSVProducer
    if err is not None:
        print("Delivery failed for record {}: {}".format(msg.key(), err))
        return
    print('Record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


class RideCSVProducer:
    def __init__(self, props: Dict):
        # props are forwarded as keyword arguments to kafka-python's KafkaProducer
        self.producer = KafkaProducer(**props)

    @staticmethod
    def read_records(resource_path: str):
        records, ride_keys = [], []
        i = 0
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)  # skip the header
            for row in reader:
                # vendor_id, pickup/dropoff datetimes, passenger_count, trip_distance, payment_type, total_amount
                records.append(f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[9]}, {row[16]}')
                ride_keys.append(str(row[0]))  # vendor_id doubles as the message key
                i += 1
                if i == 5:  # only the first 5 data rows are read
                    break
        return zip(ride_keys, records)

    def publish(self, topic: str, records: [str, str]):
        for key_value in records:
            key, value = key_value
            try:
                self.producer.send(topic=topic, key=key, value=value)
                print(f"Producing record for <key: {key}, value:{value}>")
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")

        # Flush once after the loop so all buffered records are sent
        self.producer.flush()
        sleep(1)
```
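The error handling in publish can be observed without Redpanda by swapping in a test double. StubProducer and the free function publish below are hypothetical: publish copies the loop from RideCSVProducer.publish but leaves out the final sleep(1) to keep the demo fast. The topic name rides_csv is made up.

```python
class StubProducer:
    """Hypothetical in-memory stand-in for KafkaProducer (no broker needed)."""

    def __init__(self, fail_on=None):
        self.sent = []
        self.flushed = False
        self.fail_on = fail_on  # a value whose send() should raise

    def send(self, topic, key=None, value=None):
        if value == self.fail_on:
            raise ValueError("simulated send failure")
        self.sent.append((topic, key, value))

    def flush(self):
        self.flushed = True


def publish(producer, topic, records):
    # Same control flow as RideCSVProducer.publish: per-record try/except,
    # then a single flush after the loop (the original's sleep(1) is omitted).
    for key, value in records:
        try:
            producer.send(topic=topic, key=key, value=value)
            print(f"Producing record for <key: {key}, value:{value}>")
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f"Exception while producing record - {value}: {e}")
    producer.flush()


stub = StubProducer(fail_on="bad")
publish(stub, "rides_csv", [("1", "a"), ("2", "bad"), ("3", "c")])
print(stub.sent)     # [('rides_csv', '1', 'a'), ('rides_csv', '3', 'c')]
print(stub.flushed)  # True
```

Catching Exception per record keeps one bad record from aborting the batch. Only errors raised synchronously by send() reach this handler, though; broker-side delivery failures surface later through the future that send() returns.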
Import
```python
from producer import RideCSVProducer
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
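| props | Dict | Yes | Configuration dictionary passed to KafkaProducer as keyword arguments. Expected keys include bootstrap_servers, key_serializer, and value_serializer. Because read_records yields str keys and values, the serializers must encode them to bytes (for example with UTF-8). |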
| resource_path | str | Yes | File system path to a CSV file containing taxi ride data with a header row. |
| topic | str | Yes | The Kafka topic name to publish CSV string records to. |
| records | Iterable[tuple[str, str]] | Yes | An iterable of (key, value) string tuples to publish, typically the zip object returned by read_records. |
Outputs
| Name | Type | Description |
|---|---|---|
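| records | zip (iterator of tuple[str, str]) | The read_records static method returns a zip iterator of (ride_key, csv_record_string) tuples, limited to the first 5 data rows. The iterator is single-pass: once consumed (for example by publish), it yields nothing more. |
| (stdout) | str | Each record is logged to the console with its key and value right after send() enqueues it; the log line does not confirm broker delivery. Exceptions raised by send() are printed as well. |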
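Since read_records returns a zip object, its records can only be iterated once. The short check below uses synthetic pairs of the same shape to show that a second pass yields nothing, and that wrapping the result in list() gives a reusable sequence if the same records need to be published more than once.

```python
# Same shape as the value returned by read_records: zip of (key, csv_value)
records = zip(["1", "2"], ["1, a", "2, b"])
first_pass = list(records)
second_pass = list(records)  # the zip iterator is already exhausted
print(len(first_pass), len(second_pass))  # 2 0

# Materialize once to get a sequence that can be iterated repeatedly
reusable = list(zip(["1", "2"], ["1, a", "2, b"]))
```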
Usage Examples
Basic Usage
```python
from producer import RideCSVProducer
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, PRODUCE_TOPIC_RIDES_CSV

config = {
    'bootstrap_servers': [BOOTSTRAP_SERVERS],
    'key_serializer': lambda x: x.encode('utf-8'),
    'value_serializer': lambda x: x.encode('utf-8')
}
producer = RideCSVProducer(props=config)
ride_records = producer.read_records(resource_path=INPUT_DATA_PATH)
producer.publish(topic=PRODUCE_TOPIC_RIDES_CSV, records=ride_records)
```