Implementation:DataTalksClub Data engineering zoomcamp Redpanda Ride Model
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Ride is a Python data model class that represents an NYC taxi ride record with 18 typed fields. It supports construction from CSV row arrays and deserialization from dictionaries.
Description
The Ride class serves as the canonical data model for taxi ride records in the Redpanda/Kafka streaming examples. It provides two construction paths:
- __init__(self, arr: List[str]) -- The primary constructor. It accepts a list of string values (typically a CSV row) and parses each element into the appropriate Python type: str for vendor_id, store_and_fwd_flag, and payment_type; datetime for tpep_pickup_datetime and tpep_dropoff_datetime (parsed via datetime.strptime with format "%Y-%m-%d %H:%M:%S", and stored as one-element tuples because each assignment ends with a trailing comma); int for passenger_count, rate_code_id, pu_location_id, and do_location_id; and Decimal for trip_distance and the monetary fields fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, and congestion_surcharge.
- from_dict(cls, d: Dict) -- A class method that reconstructs a Ride from a dictionary (as produced by JSON deserialization), mapping dictionary keys back to the positional array format expected by __init__. It reads element [0] of the two timestamp values, so it expects each of them to be a one-element sequence holding a timestamp string. This is used as the object_hook in the JsonConsumer deserialization pipeline.
The class also implements __repr__ for readable string representation using the instance's __dict__.
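The two timestamp assignments in the constructor end with a trailing comma, which in Python turns the right-hand side into a one-element tuple. The following minimal sketch isolates that behavior outside the class; the variable names are illustrative only and do not appear in the source.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Trailing comma, as in the constructor: the value becomes a 1-tuple.
pickup = datetime.strptime("2021-01-01 00:15:56", FMT),

# Without the comma the value is a plain datetime.
dropoff = datetime.strptime("2021-01-01 00:31:07", FMT)

print(type(pickup).__name__)   # tuple
print(type(dropoff).__name__)  # datetime
print(pickup[0].minute)        # 15
```

This is why from_dict indexes the timestamp values with [0] before handing them back to __init__.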
Usage
Use this data model whenever you need to represent a taxi ride record in the streaming pipeline. It is used by both JsonProducer (to construct rides from CSV data) and JsonConsumer (to deserialize rides from JSON messages). It can also serve as a reference schema for the 18 fields present in the NYC taxi ride dataset.
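The producer-to-consumer round trip can be sketched roughly as follows. MiniRide is a hypothetical four-field stand-in for Ride, used only to keep the example short, and the json.dumps(..., default=str) serializer on the producer side is an assumption about how JsonProducer is wired rather than something shown on this page.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Dict, List


class MiniRide:
    """Hypothetical four-field stand-in for Ride, for illustration only."""

    def __init__(self, arr: List[str]):
        self.vendor_id = arr[0]
        # Trailing comma kept on purpose to mirror the source: a 1-tuple.
        self.tpep_pickup_datetime = datetime.strptime(arr[1], "%Y-%m-%d %H:%M:%S"),
        self.passenger_count = int(arr[2])
        self.total_amount = Decimal(arr[3])

    @classmethod
    def from_dict(cls, d: Dict):
        return cls(arr=[
            d["vendor_id"], d["tpep_pickup_datetime"][0],
            d["passenger_count"], d["total_amount"],
        ])


ride = MiniRide(["1", "2021-01-01 00:15:56", "2", "21.36"])

# Producer side (assumed): default=str turns datetime and Decimal into strings,
# and the 1-tuple is written as a one-element JSON array.
payload = json.dumps(ride.__dict__, default=str).encode("utf-8")

# Consumer side: object_hook is called with the decoded JSON object.
restored = json.loads(payload.decode("utf-8"), object_hook=MiniRide.from_dict)

print(payload)
print(restored.total_amount)
```

Because the timestamp arrives as a one-element array of strings, the [0] lookup in from_dict yields a string that datetime.strptime can parse again.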
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/python/redpanda_example/ride.py
- Lines: 1-53
Signature
from datetime import datetime
from decimal import Decimal
from typing import Dict, List


class Ride:
    def __init__(self, arr: List[str]):
        self.vendor_id = arr[0]
        # Note: the trailing commas make each timestamp a one-element tuple.
        self.tpep_pickup_datetime = datetime.strptime(arr[1], "%Y-%m-%d %H:%M:%S"),
        self.tpep_dropoff_datetime = datetime.strptime(arr[2], "%Y-%m-%d %H:%M:%S"),
        self.passenger_count = int(arr[3])
        self.trip_distance = Decimal(arr[4])
        self.rate_code_id = int(arr[5])
        self.store_and_fwd_flag = arr[6]
        self.pu_location_id = int(arr[7])
        self.do_location_id = int(arr[8])
        self.payment_type = arr[9]
        self.fare_amount = Decimal(arr[10])
        self.extra = Decimal(arr[11])
        self.mta_tax = Decimal(arr[12])
        self.tip_amount = Decimal(arr[13])
        self.tolls_amount = Decimal(arr[14])
        self.improvement_surcharge = Decimal(arr[15])
        self.total_amount = Decimal(arr[16])
        self.congestion_surcharge = Decimal(arr[17])

    @classmethod
    def from_dict(cls, d: Dict):
        # [0] unwraps the one-element timestamp sequences.
        return cls(arr=[
            d['vendor_id'], d['tpep_pickup_datetime'][0],
            d['tpep_dropoff_datetime'][0], d['passenger_count'],
            d['trip_distance'], d['rate_code_id'],
            d['store_and_fwd_flag'], d['pu_location_id'],
            d['do_location_id'], d['payment_type'],
            d['fare_amount'], d['extra'], d['mta_tax'],
            d['tip_amount'], d['tolls_amount'],
            d['improvement_surcharge'], d['total_amount'],
            d['congestion_surcharge'],
        ])

    def __repr__(self):
        return f'{self.__class__.__name__}: {self.__dict__}'
Import
from ride import Ride
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| arr | List[str] | Yes | A list of 18 string values representing a single taxi ride record, typically sourced from a CSV row. Fields are positionally mapped: [0] vendor_id, [1] tpep_pickup_datetime, [2] tpep_dropoff_datetime, [3] passenger_count, [4] trip_distance, [5] rate_code_id, [6] store_and_fwd_flag, [7] pu_location_id, [8] do_location_id, [9] payment_type, [10] fare_amount, [11] extra, [12] mta_tax, [13] tip_amount, [14] tolls_amount, [15] improvement_surcharge, [16] total_amount, [17] congestion_surcharge. |
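| d | Dict | Yes (for from_dict) | A dictionary with keys matching the field names of the Ride class. Used for reconstructing a Ride from a deserialized JSON object. The tpep_pickup_datetime and tpep_dropoff_datetime values are read with [0], so each must be a one-element sequence holding the timestamp string. |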
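As a rough illustration of the positional mapping in the table above, the sketch below pairs each raw value with its field name and rejects rows of the wrong length. RIDE_FIELDS and label_row are hypothetical names introduced here; they are not part of ride.py.

```python
from typing import Dict, List

# Field names in the positional order listed in the Inputs table.
RIDE_FIELDS = [
    "vendor_id", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "rate_code_id",
    "store_and_fwd_flag", "pu_location_id", "do_location_id",
    "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge",
]


def label_row(row: List[str]) -> Dict[str, str]:
    """Hypothetical helper: pair each raw CSV value with its field name."""
    if len(row) != len(RIDE_FIELDS):
        raise ValueError(f"expected {len(RIDE_FIELDS)} values, got {len(row)}")
    return dict(zip(RIDE_FIELDS, row))


row = ['1', '2021-01-01 00:15:56', '2021-01-01 00:31:07', '2', '4.50',
       '1', 'N', '230', '166', '1', '16.00', '0.50', '0.50',
       '4.06', '0.00', '0.30', '21.36', '2.50']
labeled = label_row(row)
print(labeled["pu_location_id"])  # prints 230 (still a string at this point)
```

A length check of this kind surfaces a malformed row as a clear ValueError instead of an IndexError partway through the constructor.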
Outputs
| Name | Type | Description |
|---|---|---|
| Ride | Ride | A fully constructed Ride instance with typed fields: str for vendor_id, store_and_fwd_flag, and payment_type; a one-element tuple holding a datetime for each of tpep_pickup_datetime and tpep_dropoff_datetime (a consequence of the trailing commas in __init__); int for passenger_count, rate_code_id, pu_location_id, and do_location_id; Decimal for trip_distance and all monetary fields. |
Usage Examples
Basic Usage
from ride import Ride
# Construct from CSV row
row = ['1', '2021-01-01 00:15:56', '2021-01-01 00:31:07', '2', '4.50',
'1', 'N', '230', '166', '1', '16.00', '0.50', '0.50',
'4.06', '0.00', '0.30', '21.36', '2.50']
ride = Ride(arr=row)
print(ride.vendor_id) # '1'
print(ride.pu_location_id) # 230
print(ride.total_amount) # Decimal('21.36')
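# Reconstruct from a dictionary after a JSON round trip.
# Passing ride.__dict__ directly fails: the timestamps are already datetime
# objects, and datetime.strptime only accepts strings.
import json
ride_dict = json.loads(json.dumps(ride.__dict__, default=str))
ride_copy = Ride.from_dict(ride_dict)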