Implementation:DataTalksClub Data engineering zoomcamp Confluent CSV Producer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Python class RideCSVProducer using confluent-kafka's Producer (not kafka-python) to read CSV taxi ride data and publish location-pair records to Confluent Cloud Kafka topics, supporting both green and FHV taxi types via CLI arguments.
Description
The RideCSVProducer class publishes taxi ride data to Confluent Cloud Kafka. Unlike the Redpanda examples, which use kafka-python, this implementation uses the confluent_kafka.Producer client. That client takes librdkafka-style configuration with dot-separated keys (for example, bootstrap.servers) rather than kafka-python's underscored keyword arguments (for example, bootstrap_servers).
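To make the naming difference concrete, the sketch below maps a few kafka-python KafkaProducer keyword arguments to their librdkafka equivalents used by confluent_kafka.Producer. The mapping covers only the connection and SASL settings relevant here. The KAFKA_PYTHON_TO_CONFLUENT table and the to_confluent_config helper are hypothetical and are not part of the source, and all values are placeholders. Note that the keys cannot be converted by simply swapping underscores for dots (sasl_plain_username becomes sasl.username).

```python
from typing import Dict

# Hypothetical mapping from kafka-python KafkaProducer keyword arguments
# to the equivalent librdkafka properties accepted by confluent_kafka.Producer.
KAFKA_PYTHON_TO_CONFLUENT = {
    "bootstrap_servers": "bootstrap.servers",
    "security_protocol": "security.protocol",
    "sasl_mechanism": "sasl.mechanisms",
    "sasl_plain_username": "sasl.username",
    "sasl_plain_password": "sasl.password",
}


def to_confluent_config(kafka_python_kwargs: Dict[str, str]) -> Dict[str, str]:
    """Rename known kafka-python keyword arguments to confluent-kafka keys."""
    unknown = set(kafka_python_kwargs) - set(KAFKA_PYTHON_TO_CONFLUENT)
    if unknown:
        raise KeyError(f"No known confluent-kafka equivalent for: {sorted(unknown)}")
    return {KAFKA_PYTHON_TO_CONFLUENT[k]: v for k, v in kafka_python_kwargs.items()}


if __name__ == "__main__":
    # Placeholder values, not real credentials
    conf = to_confluent_config({
        "bootstrap_servers": "<cluster-endpoint>:9092",
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": "<api-key>",
        "sasl_plain_password": "<api-secret>",
    })
    print(conf)
```

Rejecting unknown keys, rather than guessing a translation, avoids silently passing a setting that librdkafka would not recognize.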
The class supports two taxi ride types:
- green -- Uses vendor_id (zero-based column 0) as the key and joins PULocationID (column 5) and DOLocationID (column 6) into a "PULocationID, DOLocationID" string as the record.
- fhv -- Uses dispatching_base_num (column 0) as the key and joins PULocationID (column 3) and DOLocationID (column 4) into the same string format.
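The column selection above can be restated as a small table-driven function. This is a sketch, not the source's method: COLUMNS and the standalone parse_row are hypothetical, and unlike the class method (which fails with UnboundLocalError for an unrecognized ride type) this version raises a ValueError with a clear message.

```python
from typing import List, Tuple

# Hypothetical lookup of zero-based (key, pickup, dropoff) column indices,
# matching the indices RideCSVProducer.parse_row uses for each ride type.
COLUMNS = {
    "green": (0, 5, 6),  # vendor_id, PULocationID, DOLocationID
    "fhv": (0, 3, 4),    # dispatching_base_num, PULocationID, DOLocationID
}


def parse_row(row: List[str], ride_type: str) -> Tuple[str, str]:
    """Return (key, "PULocationID, DOLocationID") for one CSV row."""
    if ride_type not in COLUMNS:
        raise ValueError(f"ride_type must be one of {sorted(COLUMNS)}, got {ride_type!r}")
    key_idx, pu_idx, do_idx = COLUMNS[ride_type]
    return str(row[key_idx]), f"{row[pu_idx]}, {row[do_idx]}"


if __name__ == "__main__":
    # Made-up rows; only the column positions matter here
    print(parse_row(["2", "t_pickup", "t_dropoff", "N", "1", "97", "49"], "green"))
    print(parse_row(["B00001", "t_pickup", "t_dropoff", "264", "265"], "fhv"))
```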
Key methods:
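- __init__(self, probs: Dict, ride_type: str) -- Creates a confluent_kafka.Producer by unpacking the probs configuration dictionary as keyword arguments, and stores the ride type.
- parse_row(self, row) -- Returns a (key, record) tuple for one CSV row based on the ride type. Any ride type other than green or fhv leaves both names unassigned, so the method raises UnboundLocalError.
- read_records(self, resource_path: str) -- Reads the CSV file, skips the header, parses every row into in-memory lists of keys and records, and returns them as a zip object (a single-pass iterator of (key, record) tuples).
- publish(self, records, topic: str) -- Iterates over the (key, value) pairs, calling producer.poll(0) and then producer.produce() for each (the confluent-kafka API, as opposed to producer.send() in kafka-python). A KeyboardInterrupt stops the loop. On BufferError (the local producer queue is full) it calls poll(0.1) so deliveries can drain the queue, but the record that raised the error is not re-sent. Other exceptions are printed. After the loop it calls flush(), which blocks until queued messages are delivered, and then sleeps for 10 seconds.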
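Because the source's BufferError handler polls but moves on, the record that hit the full queue is dropped. A common alternative is to retry produce() after polling. The sketch below shows that pattern under stated assumptions: publish_with_retry is hypothetical, and FakeProducer is a hypothetical in-memory stand-in that only imitates the produce/poll/flush method names of confluent_kafka.Producer so the example runs without a broker. Its simulated deliveries complete only when poll is given a non-zero timeout.

```python
from typing import Iterable, List, Optional, Tuple


class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer with a tiny local queue."""

    def __init__(self, capacity: int = 2):
        self.capacity = capacity
        self.queue: List[Tuple[str, Optional[str], Optional[str]]] = []
        self.delivered: List[Tuple[str, Optional[str], Optional[str]]] = []
        self.buffer_errors = 0

    def produce(self, topic: str, key: Optional[str] = None, value: Optional[str] = None) -> None:
        if len(self.queue) >= self.capacity:
            self.buffer_errors += 1
            raise BufferError("Local: Queue full")
        self.queue.append((topic, key, value))

    def poll(self, timeout: float = 0) -> int:
        # Simulation: queued messages are "delivered" only when poll may wait
        if timeout == 0:
            return 0
        served = len(self.queue)
        self.delivered.extend(self.queue)
        self.queue.clear()
        return served

    def flush(self, timeout: float = -1) -> int:
        # Deliver everything still queued; return how many remain (always 0 here)
        self.delivered.extend(self.queue)
        self.queue.clear()
        return 0


def publish_with_retry(producer, records: Iterable[Tuple[str, str]], topic: str,
                       max_retries: int = 10) -> None:
    """Like RideCSVProducer.publish, but re-sends a record after BufferError."""
    for key, value in records:
        for _ in range(max_retries + 1):
            try:
                producer.produce(topic=topic, key=key, value=value)
                break
            except BufferError:
                # Local queue full: serve delivery events to free space, then retry
                producer.poll(0.1)
        else:
            raise BufferError(f"Queue still full after {max_retries} retries (key={key!r})")
        # Serve pending delivery events without blocking
        producer.poll(0)
    producer.flush()
```

With a real Producer, the same function can be called with the instance created in __init__; the retry cap keeps an unreachable cluster from turning the loop into an infinite wait.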
The __main__ block uses argparse to accept a --type argument (defaulting to green) and selects the appropriate topic and data path from settings.
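The CLI selection can be sketched as follows, assuming a dictionary stands in for the settings constants. RIDE_SETTINGS and its placeholder values are hypothetical (this page names only GREEN_TAXI_TOPIC and GREEN_TRIP_DATA_PATH), and the choices= restriction is an addition that is not confirmed by the source; it rejects an unknown type before parse_row is ever reached.

```python
import argparse
from typing import List, Optional, Tuple

# Hypothetical (topic, csv_path) lookup; the real values live in settings.
RIDE_SETTINGS = {
    "green": ("<green-topic>", "<green-csv-path>"),
    "fhv": ("<fhv-topic>", "<fhv-csv-path>"),
}


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Publish taxi rides to Kafka")
    parser.add_argument("--type", type=str, default="green",
                        choices=sorted(RIDE_SETTINGS), help="ride type to produce")
    return parser.parse_args(argv)


def select_topic_and_path(ride_type: str) -> Tuple[str, str]:
    return RIDE_SETTINGS[ride_type]


# Example: parse_args(["--type", "fhv"]).type == "fhv"
```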
Usage
Use this implementation to publish taxi ride data to Confluent Cloud Kafka rather than to a local Redpanda or Kafka broker. SASL_SSL authentication is configured entirely through the CONFLUENT_CLOUD_CONFIG dictionary in the settings module, which is read from a properties file and passed to the producer as probs. This producer is the data source for the Confluent PySpark Streaming pipeline.
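The parsing code behind CONFLUENT_CLOUD_CONFIG is not shown on this page, so the following is only a minimal sketch of turning a key=value properties file into the dictionary confluent_kafka.Producer expects. read_properties is a hypothetical helper; it splits on the first "=" only, so values that themselves contain "=" survive intact.

```python
from typing import Dict


def read_properties(path: str) -> Dict[str, str]:
    """Parse a simple key=value .properties file into a dict.

    Skips blank lines and lines starting with '#' or '!'. This does not
    implement the full Java properties syntax (continuations, escapes,
    ':' separators).
    """
    config: Dict[str, str] = {}
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith(("#", "!")):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                continue  # ignore lines without '='
            config[key.strip()] = value.strip()
    return config
```

A dictionary built this way keeps the dot-separated keys from the file unchanged, which is the form confluent_kafka.Producer accepts.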
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: cohorts/2023/week_6_stream_processing/producer_confluent.py
- Lines: 1-72
Signature
```python
import csv
from time import sleep
from typing import Dict

from confluent_kafka import Producer


class RideCSVProducer:
    def __init__(self, probs: Dict, ride_type: str):
        # probs: Confluent Cloud settings with dot-separated librdkafka keys
        self.producer = Producer(**probs)
        self.ride_type = ride_type

    def parse_row(self, row):
        if self.ride_type == 'green':
            record = f'{row[5]}, {row[6]}'  # PULocationID, DOLocationID
            key = str(row[0])  # vendor_id
        elif self.ride_type == 'fhv':
            record = f'{row[3]}, {row[4]}'  # PULocationID, DOLocationID
            key = str(row[0])  # dispatching_base_num
        # Any other ride_type leaves key/record unbound (UnboundLocalError here)
        return key, record

    def read_records(self, resource_path: str):
        records, ride_keys = [], []
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)  # skip the header
            for row in reader:
                key, record = self.parse_row(row)
                ride_keys.append(key)
                records.append(record)
        return zip(ride_keys, records)

    def publish(self, records: [str, str], topic: str):
        for key_value in records:
            key, value = key_value
            try:
                # Service delivery reports and other events without blocking
                self.producer.poll(0)
                # Enqueue asynchronously; this does not confirm delivery
                self.producer.produce(topic=topic, key=key, value=value)
                print(f"Producing record for <key: {key}, value:{value}>")
            except KeyboardInterrupt:
                break
            except BufferError as bfer:
                # Local queue is full: wait briefly so deliveries free space.
                # The record that raised the error is not re-sent.
                self.producer.poll(0.1)
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")

        # Block until all queued messages are delivered, then pause
        self.producer.flush()
        sleep(10)
```
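read_records builds two complete lists before zipping them, so the whole CSV is held in memory, and the returned zip object can be iterated only once. Where memory matters, a generator can yield pairs while the file is being read. The sketch below is hypothetical (iter_records is not in the source) and takes the row parser as a parameter so it runs without confluent_kafka installed.

```python
import csv
from typing import Callable, Iterator, List, Tuple


def iter_records(resource_path: str,
                 parse_row: Callable[[List[str]], Tuple[str, str]]) -> Iterator[Tuple[str, str]]:
    """Yield (key, record) pairs lazily instead of building lists first."""
    with open(resource_path, "r", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header; tolerates an empty file
        for row in reader:
            yield parse_row(row)
```

The file stays open until the generator is exhausted or closed, so the consumer (for example, a publish loop) should iterate it to completion.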
Import
```python
from producer_confluent import RideCSVProducer
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| probs | Dict | Yes | Confluent Cloud configuration dictionary (read from client_original.properties) with dot-separated librdkafka keys. Expected keys include bootstrap.servers, sasl.username, sasl.password, and other SASL_SSL parameters such as security.protocol and sasl.mechanisms. |
| ride_type | str | Yes | Type of taxi ride data to process. Must be green or fhv. Determines which CSV columns are extracted. |
| resource_path | str | Yes | File system path to the CSV file containing taxi ride data. |
| topic | str | Yes | The Confluent Cloud Kafka topic name to publish records to. |
| records | Iterable of (str, str) tuples | Yes | The (key, value) string pairs to publish, typically the zip object returned by read_records. |
Outputs
| Name | Type | Description |
|---|---|---|
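| records | zip of (str, str) tuples | read_records returns a single-pass zip iterator of (ride_key, location_pair_string) tuples. |
| (stdout) | str | publish prints a "Producing record" line for each record when produce() enqueues it; because produce() is asynchronous, this line does not confirm delivery. Exceptions raised while producing are also printed. |
| (Kafka topic) | messages | publish sends each (key, value) pair to the given topic. |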
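Since the printed line only reflects enqueueing, confirming delivery requires a delivery callback. confluent-kafka's produce() accepts an on_delivery callback that is invoked with (err, msg) from poll() or flush(); the source does not register one. The sketch below is a minimal example of such a callback. delivery_report is a hypothetical name, and StubMessage is a hypothetical stand-in exposing the same accessor methods as a confluent_kafka Message so the callback can be exercised without a broker.

```python
class StubMessage:
    """Hypothetical stand-in exposing the accessors of confluent_kafka.Message."""

    def __init__(self, topic: str, partition: int, offset: int, key: bytes):
        self._topic, self._partition, self._offset, self._key = topic, partition, offset, key

    def topic(self) -> str:
        return self._topic

    def partition(self) -> int:
        return self._partition

    def offset(self) -> int:
        return self._offset

    def key(self) -> bytes:
        return self._key


def delivery_report(err, msg) -> str:
    """Delivery callback with the (err, msg) shape confluent-kafka calls."""
    if err is not None:
        line = f"Delivery failed for key {msg.key()}: {err}"
    else:
        line = (f"Delivered key {msg.key()} to {msg.topic()} "
                f"[{msg.partition()}] @ offset {msg.offset()}")
    print(line)
    return line  # confluent-kafka ignores the return value; returned for testing


# With a real producer (not run here):
# producer.produce(topic=topic, key=key, value=value, on_delivery=delivery_report)
```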
Usage Examples
Basic Usage
```python
from producer_confluent import RideCSVProducer
from settings import CONFLUENT_CLOUD_CONFIG, GREEN_TAXI_TOPIC, GREEN_TRIP_DATA_PATH

producer = RideCSVProducer(ride_type='green', probs=CONFLUENT_CLOUD_CONFIG)
ride_records = producer.read_records(resource_path=GREEN_TRIP_DATA_PATH)
producer.publish(records=ride_records, topic=GREEN_TAXI_TOPIC)
```
CLI Usage
```bash
# Produce green taxi data (default):
python producer_confluent.py

# Produce FHV taxi data:
python producer_confluent.py --type fhv
```