
Implementation:DataTalksClub Data engineering zoomcamp Confluent CSV Producer



Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

RideCSVProducer is a Python class that uses confluent-kafka's Producer (not kafka-python) to read CSV taxi ride data and publish location-pair records to Confluent Cloud Kafka topics. A CLI argument selects between green and FHV taxi data.

Description

The RideCSVProducer class publishes taxi ride data to Confluent Cloud Kafka. Unlike the Redpanda examples, which use kafka-python, this implementation uses the confluent_kafka.Producer client. Its configuration uses dot-separated property names such as bootstrap.servers instead of kafka-python's underscored keyword arguments such as bootstrap_servers.

The class supports two taxi ride types:

  1. green -- Builds the record value "PULocationID, DOLocationID" from columns 5 and 6 (zero-based) and uses vendor_id (column 0) as the key.
  2. fhv -- Builds the record value "PULocationID, DOLocationID" from columns 3 and 4 (zero-based) and uses dispatching_base_num (column 0) as the key.
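The column mapping above can be sketched as a standalone function, which makes the zero-based indices easy to check. This is a minimal sketch, not the project's code: the helper names extract_key_and_record and read_pairs are hypothetical, the CSV text is made up, and each sample row is truncated to the leading columns that matter.

```python
import csv
import io
from typing import Iterator, List, Tuple

# Zero-based column indices as documented on this page (hypothetical helper table).
COLUMNS = {
    "green": {"key": 0, "pu": 5, "do": 6},  # vendor_id, PULocationID, DOLocationID
    "fhv": {"key": 0, "pu": 3, "do": 4},    # dispatching_base_num, PULocationID, DOLocationID
}


def extract_key_and_record(row: List[str], ride_type: str) -> Tuple[str, str]:
    """Return (key, "PU, DO") for one CSV row, mirroring the documented mapping."""
    if ride_type not in COLUMNS:
        raise ValueError(f"Unsupported ride_type: {ride_type!r}")
    cols = COLUMNS[ride_type]
    return str(row[cols["key"]]), f"{row[cols['pu']]}, {row[cols['do']]}"


def read_pairs(text: str, ride_type: str) -> Iterator[Tuple[str, str]]:
    """Skip the header row, then yield (key, record) for every data row."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header
    for row in reader:
        yield extract_key_and_record(row, ride_type)


# Made-up sample data, shaped like the first columns of each taxi CSV.
green_csv = (
    "VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,"
    "RatecodeID,PULocationID,DOLocationID\n"
    "2,t0,t1,N,1,74,41\n"
)
fhv_csv = (
    "dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID\n"
    "B00001,t0,t1,264,265\n"
)

print(list(read_pairs(green_csv, "green")))  # [('2', '74, 41')]
print(list(read_pairs(fhv_csv, "fhv")))      # [('B00001', '264, 265')]
```

Keeping the indices in a lookup table means that supporting another ride type would only need a new entry, and an unknown type fails loudly instead of leaving key and record unset.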

Key methods:

  • __init__(self, probs: Dict, ride_type: str) -- Initializes the Producer with the provided configuration and stores the ride type.
  • parse_row(self, row) -- Extracts the key and record from a CSV row based on the ride type.
  • read_records(self, resource_path: str) -- Reads all rows from the CSV file (skipping the header), parses each row, and returns a zipped iterator of (key, record) tuples.
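  • publish(self, records, topic: str) -- Iterates over (key, value) pairs and calls producer.produce() (the confluent-kafka API, as opposed to producer.send() in kafka-python), calling producer.poll(0) before each produce to serve delivery callbacks. It handles BufferError, which is raised when the local producer queue is full, by polling so the queue can drain. After the loop it calls flush() and then sleeps for 10 seconds.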
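The poll/produce/BufferError interplay in publish can be exercised without a broker by using a stand-in producer. This is a sketch under loudly stated assumptions: FakeProducer is a hypothetical class, not part of confluent-kafka, and it simplifies the real client by treating poll(0) as "no time passed, nothing delivered" and any positive timeout as "everything queued was delivered". The publish function here polls, then retries the same record after a BufferError.

```python
from typing import Iterable, List, Tuple


class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer (illustration only)."""

    def __init__(self, capacity: int = 2):
        self.capacity = capacity
        self.queue: List[Tuple[str, str, str]] = []
        self.delivered: List[Tuple[str, str, str]] = []

    def produce(self, topic: str, key: str, value: str) -> None:
        # Like the real client, refuse new messages when the local queue is full.
        if len(self.queue) >= self.capacity:
            raise BufferError("Local: Queue full")
        self.queue.append((topic, key, value))

    def poll(self, timeout: float = 0) -> int:
        if timeout <= 0:
            return 0  # simplification: no time passes, nothing is delivered
        n = len(self.queue)
        self.delivered.extend(self.queue)  # "deliver" everything queued
        self.queue.clear()
        return n

    def flush(self, timeout: float = -1) -> int:
        self.delivered.extend(self.queue)
        self.queue.clear()
        return 0  # number of messages still queued


def publish(producer, records: Iterable[Tuple[str, str]], topic: str) -> None:
    """Sketch of the publish loop: poll, produce, and retry once on BufferError."""
    for key, value in records:
        try:
            producer.poll(0)  # serve pending delivery callbacks, non-blocking
            producer.produce(topic=topic, key=key, value=value)
        except BufferError:
            producer.poll(0.1)  # give the local queue time to drain
            producer.produce(topic=topic, key=key, value=value)  # retry same record
    producer.flush()  # block until everything queued has been delivered


fake = FakeProducer(capacity=2)
rides = [(str(i), f"{i}, {i + 1}") for i in range(5)]  # made-up (key, value) pairs
publish(fake, rides, topic="demo-topic")
print(len(fake.delivered))  # 5
```

With a capacity of 2, the third and fifth records hit BufferError; retrying after the poll is what keeps them from being skipped.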

The __main__ block uses argparse to accept a --type argument (defaulting to green) and selects the appropriate topic and data path from settings.
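The CLI selection described above might look roughly like the following. This is a hypothetical sketch: only GREEN_TAXI_TOPIC and GREEN_TRIP_DATA_PATH are named on this page, so the strings below, the FHV entries, and the helper names parse_args and select_topic_and_path are placeholders. The choices= restriction is also an addition for illustration, not something the page says the script does.

```python
import argparse

# Placeholder stand-ins for values the real script reads from its settings module.
TOPIC_AND_PATH = {
    "green": ("<GREEN_TAXI_TOPIC>", "<GREEN_TRIP_DATA_PATH>"),
    "fhv": ("<FHV_TAXI_TOPIC>", "<FHV_TRIP_DATA_PATH>"),  # hypothetical names
}


def parse_args(argv=None) -> argparse.Namespace:
    """Parse --type, defaulting to green as documented."""
    parser = argparse.ArgumentParser(description="Produce taxi rides to Kafka")
    parser.add_argument(
        "--type", type=str, default="green", choices=sorted(TOPIC_AND_PATH)
    )
    return parser.parse_args(argv)


def select_topic_and_path(ride_type: str):
    """Map the ride type to its (topic, CSV path) pair."""
    return TOPIC_AND_PATH[ride_type]


args = parse_args(["--type", "fhv"])
print(args.type, select_topic_and_path(args.type))
```

Passing an explicit argv list keeps the parser testable; the script itself would call parse_args() with no arguments so that argparse reads sys.argv.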

Usage

Use this implementation to publish taxi ride data to Confluent Cloud Kafka rather than to a local Redpanda/Kafka broker. The class performs no authentication of its own: SASL_SSL credentials are supplied through the CONFLUENT_CLOUD_CONFIG settings dictionary, which is read from a properties file and passed directly to the Producer. This producer is the data source for the Confluent PySpark Streaming pipeline.
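The page states that CONFLUENT_CLOUD_CONFIG is read from a properties file but does not show how. A minimal reader, assuming simple key=value lines with "#" comments, could look like this. The function name read_properties is hypothetical. The sample uses placeholder credentials and standard librdkafka property names; continuation lines and escape sequences are deliberately not handled.

```python
import io
from typing import Dict, TextIO


def read_properties(stream: TextIO) -> Dict[str, str]:
    """Parse simple `key=value` lines into a dict of librdkafka-style settings.

    Blank lines and lines starting with '#' are skipped. Only the first '='
    separates key from value, so values may themselves contain '='.
    """
    conf: Dict[str, str] = {}
    for line in stream:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without '='
            conf[key.strip()] = value.strip()
    return conf


sample = io.StringIO(
    "# Confluent Cloud connection settings (placeholder values)\n"
    "bootstrap.servers=<BOOTSTRAP_SERVER>\n"
    "security.protocol=SASL_SSL\n"
    "sasl.mechanisms=PLAIN\n"
    "sasl.username=<API_KEY>\n"
    "sasl.password=<API_SECRET>\n"
)
conf = read_properties(sample)
print(sorted(conf))
```

With a real file, the same function would be called as `read_properties(open("client_original.properties"))`, ideally inside a with block. The resulting dict has the dotted-key shape that the probs argument expects.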

Code Reference

Source Location

Signature

import csv
from time import sleep
from typing import Dict, Iterable, Tuple

from confluent_kafka import Producer


class RideCSVProducer:
    def __init__(self, probs: Dict, ride_type: str):
        # probs holds librdkafka-style keys (bootstrap.servers, sasl.username, ...)
        self.producer = Producer(**probs)
        self.ride_type = ride_type

    def parse_row(self, row):
        if self.ride_type == 'green':
            record = f'{row[5]}, {row[6]}'  # PULocationID, DOLocationID
            key = str(row[0])  # vendor_id
        elif self.ride_type == 'fhv':
            record = f'{row[3]}, {row[4]}'  # PULocationID, DOLocationID
            key = str(row[0])  # dispatching_base_num
        else:
            raise ValueError(f"Unsupported ride_type: {self.ride_type!r}")
        return key, record

    def read_records(self, resource_path: str):
        records, ride_keys = [], []
        with open(resource_path, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader)  # skip the header
            for row in reader:
                key, record = self.parse_row(row)
                ride_keys.append(key)
                records.append(record)
        return zip(ride_keys, records)

    def publish(self, records: Iterable[Tuple[str, str]], topic: str):
        for key, value in records:
            try:
                self.producer.poll(0)  # serve delivery callbacks without blocking
                self.producer.produce(topic=topic, key=key, value=value)
                print(f"Producing record for <key: {key}, value:{value}>")
            except KeyboardInterrupt:
                break
            except BufferError:
                # Local producer queue is full: wait briefly for it to drain,
                # then retry so this record is not skipped.
                self.producer.poll(0.1)
                self.producer.produce(topic=topic, key=key, value=value)
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")
        self.producer.flush()  # block until all queued messages are delivered
        sleep(10)

Import

from producer_confluent import RideCSVProducer

I/O Contract

Inputs

  • probs (Dict, required) -- Confluent Cloud configuration dictionary (read from client_original.properties). Expected keys include bootstrap.servers, sasl.username, sasl.password, and other SASL_SSL parameters.
  • ride_type (str, required) -- Type of taxi ride data to process: green or fhv. Determines which CSV columns are extracted.
  • resource_path (str, required) -- File system path to the CSV file containing taxi ride data.
  • topic (str, required) -- The Confluent Cloud Kafka topic to publish records to.
  • records (iterable of (str, str) tuples, required) -- The (key, value) pairs to publish, typically the value returned by read_records.

Outputs

  • read_records return value (zip of (str, str) tuples) -- A zip iterator of (ride_key, location_pair_string) tuples.
  • stdout (str) -- Each produced record is logged to the console; exceptions raised while producing are printed as well.
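Because read_records returns a zip object, its result is a single-pass iterator: once publish has consumed it, iterating it again yields nothing. The short sketch below uses made-up keys and location pairs to show the behavior and the usual workaround of materializing the pairs into a list.

```python
# zip() returns a single-pass iterator in Python 3.
keys = ["2", "1"]               # made-up ride keys
values = ["74, 41", "42, 42"]   # made-up "PU, DO" records

pairs = zip(keys, values)
first_pass = list(pairs)   # consumes the iterator
second_pass = list(pairs)  # nothing left

print(len(first_pass), len(second_pass))  # 2 0

# Materialize once if the same records must be published more than once.
reusable = list(zip(keys, values))
print(len(reusable), len(reusable))  # 2 2
```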

Usage Examples

Basic Usage

from producer_confluent import RideCSVProducer
from settings import CONFLUENT_CLOUD_CONFIG, GREEN_TAXI_TOPIC, GREEN_TRIP_DATA_PATH

producer = RideCSVProducer(ride_type='green', probs=CONFLUENT_CLOUD_CONFIG)
ride_records = producer.read_records(resource_path=GREEN_TRIP_DATA_PATH)
producer.publish(records=ride_records, topic=GREEN_TAXI_TOPIC)

CLI Usage

# Produce green taxi data (default):
# python producer_confluent.py

# Produce FHV taxi data:
# python producer_confluent.py --type fhv
