Implementation:DataTalksClub Data engineering zoomcamp Redpanda JsonProducer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Python class JsonProducer that reads CSV taxi ride data, constructs Ride objects, and publishes them as JSON-serialized messages to a Kafka/Redpanda topic using kafka-python's KafkaProducer.
Description
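The JsonProducer class subclasses KafkaProducer but never calls super().__init__(). Instead, its constructor builds a separate KafkaProducer from props and stores it as self.producer, and all sending goes through that instance. The class provides two core capabilities: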
- read_records -- A static method that reads a CSV file from the given resource_path, skips the header row, and constructs a list of Ride objects from each row.
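- publish_rides -- An instance method that iterates over a list of Ride messages and sends each one to the specified Kafka topic, using the ride's pu_location_id (pickup location ID) as the message key. Because it calls .get() on the future returned by send(), each send blocks until it completes, and the resulting offset is printed. KafkaTimeoutError exceptions are caught and their message is printed; any other exception propagates to the caller.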
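The control flow of publish_rides can be exercised without a running broker. The sketch below is a minimal illustration under stated assumptions: InMemoryProducer, ResolvedFuture, StandInTimeoutError, the one-field Ride dataclass, and the topic name rides_json are all hypothetical stand-ins that mimic only the part of kafka-python's API the method touches (send() returning an object whose get() yields metadata with an offset). The loop body mirrors the original method.

```python
from dataclasses import dataclass
from typing import List


class StandInTimeoutError(Exception):
    """Hypothetical stand-in for kafka.errors.KafkaTimeoutError."""


@dataclass
class RecordMetadata:
    # Only the field publish_rides reads from the real metadata object.
    offset: int


class ResolvedFuture:
    """Hypothetical stand-in for the future returned by KafkaProducer.send."""

    def __init__(self, metadata: RecordMetadata):
        self._metadata = metadata

    def get(self, timeout=None):
        return self._metadata


class InMemoryProducer:
    """Hypothetical broker-free producer that appends to a per-topic log."""

    def __init__(self, fail_on_key=None):
        self.log = {}
        self.fail_on_key = fail_on_key

    def send(self, topic, key=None, value=None):
        if key == self.fail_on_key:
            raise StandInTimeoutError(f'timed out sending key {key}')
        topic_log = self.log.setdefault(topic, [])
        topic_log.append((key, value))
        return ResolvedFuture(RecordMetadata(offset=len(topic_log) - 1))


@dataclass
class Ride:
    # Hypothetical minimal ride; the real Ride class has many more fields.
    pu_location_id: int


def publish_rides(producer, topic: str, messages: List[Ride]) -> None:
    # Same control flow as JsonProducer.publish_rides.
    for ride in messages:
        try:
            record = producer.send(topic=topic, key=ride.pu_location_id, value=ride)
            # Blocking on .get() makes every send synchronous.
            print('Record {} successfully produced at offset {}'.format(
                ride.pu_location_id, record.get().offset))
        except StandInTimeoutError as e:
            print(str(e))


producer = InMemoryProducer(fail_on_key=7)
publish_rides(producer, 'rides_json', [Ride(142), Ride(7), Ride(236)])
# Record 142 successfully produced at offset 0
# timed out sending key 7
# Record 236 successfully produced at offset 1
```

A failed send is reported and skipped rather than aborting the loop. Waiting on each future one at a time typically limits throughput, since the client cannot batch records; with the real client, an alternative is to keep the futures and call KafkaProducer.flush() once at the end.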
The default configuration in the __main__ block serializes each key by converting it to a string and encoding it as UTF-8, and each value by dumping the Ride object's __dict__ to JSON with default=str, so that fields JSON cannot encode natively (such as Decimal and datetime) are written as strings. The producer reads from the path defined in settings.INPUT_DATA_PATH and publishes to settings.KAFKA_TOPIC.
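The effect of these two serializers can be checked in isolation. This sketch assumes a hypothetical three-field Ride stand-in (the real class has more fields and is built from a CSV row); the serializer bodies are the same expressions used in the __main__ configuration.

```python
import json
from datetime import datetime
from decimal import Decimal


class Ride:
    """Hypothetical three-field stand-in for the example's Ride class."""

    def __init__(self, pu_location_id: int, tpep_pickup_datetime: datetime,
                 total_amount: Decimal):
        self.pu_location_id = pu_location_id
        self.tpep_pickup_datetime = tpep_pickup_datetime
        self.total_amount = total_amount


# Same expressions as the key_serializer / value_serializer in __main__.
def key_serializer(key) -> bytes:
    return str(key).encode()


def value_serializer(x) -> bytes:
    return json.dumps(x.__dict__, default=str).encode('utf-8')


ride = Ride(142, datetime(2020, 1, 1, 0, 28, 15), Decimal('11.27'))
print(key_serializer(ride.pu_location_id))  # b'142'
print(value_serializer(ride))
# b'{"pu_location_id": 142, "tpep_pickup_datetime": "2020-01-01 00:28:15", "total_amount": "11.27"}'

# Without default=str, json.dumps rejects the datetime field with a TypeError.
try:
    json.dumps(ride.__dict__)
except TypeError as e:
    print('TypeError:', e)
```

One consequence of default=str is that the Decimal amount arrives as the JSON string "11.27" rather than a number, so a consumer has to convert it back explicitly.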
Usage
Use this implementation when you need to ingest taxi ride data from a CSV file and publish each ride as a JSON-serialized Kafka message to a Redpanda or Kafka broker. This is the producer counterpart to JsonConsumer, and together they form a complete JSON-based produce/consume pipeline.
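On the consuming side, the bytes written by this producer have to be decoded back into Python values. The following is a hedged sketch of what matching deserializers could look like; key_deserializer and value_deserializer are hypothetical names, the field names and values in the sample payload are illustrative, and JsonConsumer's actual configuration may differ.

```python
import json
from datetime import datetime
from decimal import Decimal


# Hypothetical consumer-side counterparts of the producer's serializers.
def key_deserializer(raw: bytes) -> int:
    return int(raw.decode('utf-8'))


def value_deserializer(raw: bytes) -> dict:
    return json.loads(raw.decode('utf-8'))


# Example bytes in the shape the producer's serializers emit (hypothetical ride).
raw_key = b'142'
raw_value = (b'{"pu_location_id": 142, "tpep_pickup_datetime": "2020-01-01 00:28:15", '
             b'"total_amount": "11.27"}')

key = key_deserializer(raw_key)
payload = value_deserializer(raw_value)

# default=str on the producer side turned these fields into strings,
# so the consumer restores the original types itself.
pickup = datetime.fromisoformat(payload['tpep_pickup_datetime'])
total = Decimal(payload['total_amount'])
print(key, pickup, total)  # 142 2020-01-01 00:28:15 11.27
```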
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/python/redpanda_example/producer.py
- Lines: 1-45
Signature
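```python
import csv
from typing import Dict, List

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
from ride import Ride  # assumed: Ride is defined in ride.py next to producer.py


class JsonProducer(KafkaProducer):
    def __init__(self, props: Dict):
        # Composition: the working client lives in self.producer;
        # KafkaProducer.__init__ is never called on self.
        self.producer = KafkaProducer(**props)

    @staticmethod
    def read_records(resource_path: str):
        records = []
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)  # skip the header row
            for row in reader:
                records.append(Ride(arr=row))
        return records

    def publish_rides(self, topic: str, messages: List[Ride]):
        for ride in messages:
            try:
                record = self.producer.send(topic=topic, key=ride.pu_location_id, value=ride)
                # .get() blocks until the send completes and returns its RecordMetadata
                print('Record {} successfully produced at offset {}'.format(ride.pu_location_id, record.get().offset))
            except KafkaTimeoutError as e:
                print(e.__str__())
```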
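The read_records logic can be tried on a small file without Kafka. In this sketch the Ride class is a hypothetical two-column stand-in for the real Ride(arr=row), and the CSV content is made up for illustration; the parsing loop is the same as in the static method, except that the file is opened with newline='' as the csv module documentation recommends.

```python
import csv
import os
import tempfile
from typing import List


class Ride:
    """Hypothetical stand-in: the real Ride(arr=row) parses many typed columns."""

    def __init__(self, arr: List[str]):
        self.vendor_id = arr[0]
        self.pu_location_id = int(arr[1])


def read_records(resource_path: str) -> List[Ride]:
    # Same loop as JsonProducer.read_records.
    records = []
    with open(resource_path, 'r', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        for row in reader:
            records.append(Ride(arr=row))
    return records


# Write a tiny hypothetical CSV (header plus two rows) to a temporary file.
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False, newline='') as tmp:
    tmp.write('VendorID,PULocationID\n1,142\n2,236\n')
    path = tmp.name

try:
    rides = read_records(path)
    print([r.pu_location_id for r in rides])  # [142, 236]
finally:
    os.remove(path)
```

Because the method builds a full list, the whole CSV is held in memory before publishing starts; for large files a generator that yields one Ride per row would avoid that. Note also that next(reader) raises StopIteration on an empty file.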
Import
```python
from producer import JsonProducer
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| props | Dict | Yes | Configuration dictionary passed to KafkaProducer. Expected keys include bootstrap_servers, key_serializer, and value_serializer. |
| resource_path | str | Yes | File system path to a CSV file containing taxi ride data with a header row. |
| topic | str | Yes | The Kafka topic name to publish ride messages to. |
| messages | List[Ride] | Yes | A list of Ride objects to be serialized and published. |
Outputs
| Name | Type | Description |
|---|---|---|
| records | List[Ride] | The read_records static method returns a list of Ride objects parsed from the CSV file. |
| (stdout) | str | Each successfully produced record prints its key (pickup location ID) and offset. A caught KafkaTimeoutError prints its message; other exceptions are not caught. |
Usage Examples
Basic Usage
```python
import json

from producer import JsonProducer
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC

config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,
    'key_serializer': lambda key: str(key).encode(),
    'value_serializer': lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')
}
producer = JsonProducer(props=config)
rides = producer.read_records(resource_path=INPUT_DATA_PATH)
producer.publish_rides(topic=KAFKA_TOPIC, messages=rides)
```