Implementation:DataTalksClub Data engineering zoomcamp Ride Data Model
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp |
| Domains | Data_Engineering, Stream_Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete API documentation for the Ride class, a strongly-typed Python data model representing a NYC taxi trip record with 18 fields, used as the serialization and deserialization contract between Kafka producers and consumers.
Description
The Ride class models a single NYC taxi trip record from the TLC Trip Record dataset. It accepts a list of string values (as read from a CSV row) and parses each element into its appropriate Python type:
- String fields: `vendor_id`, `store_and_fwd_flag`, `payment_type`
- Integer fields: `passenger_count`, `rate_code_id`, `pu_location_id`, `do_location_id`
- Decimal fields: `trip_distance`, `fare_amount`, `extra`, `mta_tax`, `tip_amount`, `tolls_amount`, `improvement_surcharge`, `total_amount`, `congestion_surcharge`
- Datetime fields: `tpep_pickup_datetime`, `tpep_dropoff_datetime` (parsed with format `%Y-%m-%d %H:%M:%S`)
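Each conversion in the list above maps onto a standard-library call. The following is a minimal standalone sketch, not the `ride.py` source. It applies those calls to a few raw strings borrowed from the sample row used in the usage examples.

```python
from datetime import datetime
from decimal import Decimal

# Timestamp format documented for tpep_pickup_datetime and tpep_dropoff_datetime
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Raw CSV strings (values borrowed from this page's sample row)
pickup = datetime.strptime("2021-01-01 00:15:56", DATETIME_FORMAT)
passenger_count = int("2")
trip_distance = Decimal("4.50")

print(pickup.isoformat())  # 2021-01-01T00:15:56
print(passenger_count)     # 2
print(trip_distance)       # 4.50 (the two-decimal scale from the CSV is kept)

# Decimal sums of currency strings are exact
amounts = ["14.00", "0.50", "0.50", "3.36", "0.00", "0.30"]
subtotal = sum(Decimal(a) for a in amounts)
print(subtotal)  # 18.66

# Binary floats can drift where Decimal does not
print(0.1 + 0.2 == 0.3)                                   # False
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True
```

Using `Decimal` for the money fields keeps the scale written in the source data and makes addition exact, which binary floats cannot guarantee.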
The class provides a from_dict classmethod for reconstructing a Ride from a dictionary (used during JSON deserialization) and a __repr__ method for debugging output.
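The `from_dict` and `__repr__` pair follows a common pattern. The classmethod rebuilds the positional input the constructor expects, and `__repr__` prints the class name followed by the instance's attribute dictionary. This is consistent with the `Ride: {...}` output shown in the usage examples. The three-field `MiniRide` class below is a hypothetical, reduced illustration of that pattern. It is not the actual `ride.py` code.

```python
from decimal import Decimal
from typing import Dict, List


class MiniRide:
    """Hypothetical three-field stand-in used only to illustrate the pattern."""

    def __init__(self, arr: List[str]) -> None:
        # Positional parsing in column order, as with Ride(arr=...)
        self.vendor_id = arr[0]
        self.passenger_count = int(arr[1])
        self.trip_distance = Decimal(arr[2])

    @classmethod
    def from_dict(cls, d: Dict) -> "MiniRide":
        # Rebuild the positional list from named keys and reuse __init__,
        # so all parsing logic lives in one place
        return cls(arr=[d["vendor_id"], d["passenger_count"], d["trip_distance"]])

    def __repr__(self) -> str:
        # "ClassName: {attribute dict}" format
        return f"{self.__class__.__name__}: {self.__dict__}"


ride = MiniRide.from_dict({"vendor_id": "1", "passenger_count": "2", "trip_distance": "4.50"})
print(ride)
# MiniRide: {'vendor_id': '1', 'passenger_count': 2, 'trip_distance': Decimal('4.50')}
```

Routing `from_dict` through the constructor means a dictionary and a CSV row go through identical type conversions.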
Usage
Use this implementation to:
- Parse CSV rows from the taxi trip dataset into typed Python objects for Kafka production.
- Deserialize JSON messages from Kafka back into `Ride` objects on the consumer side.
- Provide a shared schema contract between the `JsonProducer` and `JsonConsumer` classes.
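On the producer side, the CSV rows must first be read as lists of strings. The following is a minimal sketch using the standard `csv` module on an in-memory file. The header line reuses this page's attribute names for readability, which is an assumption: the dataset's actual header names may differ. `read_rows` is a hypothetical helper, not part of the repository.

```python
import csv
import io
from typing import Iterator, List, TextIO

# Hypothetical two-line CSV in the column order documented on this page
SAMPLE_CSV = (
    "vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,"
    "trip_distance,rate_code_id,store_and_fwd_flag,pu_location_id,do_location_id,"
    "payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,"
    "improvement_surcharge,total_amount,congestion_surcharge\n"
    "1,2021-01-01 00:15:56,2021-01-01 00:31:07,2,4.50,1,N,142,236,1,"
    "14.00,0.50,0.50,3.36,0.00,0.30,18.66,2.50\n"
)


def read_rows(f: TextIO) -> Iterator[List[str]]:
    """Yield each data row as a list of strings, skipping the header line."""
    reader = csv.reader(f)
    next(reader)  # skip the header
    for row in reader:
        yield row


rows = list(read_rows(io.StringIO(SAMPLE_CSV)))
print(len(rows), len(rows[0]))  # 1 18
# Each row has the List[str] shape that Ride(arr=row) accepts
```

With a real file, `open(path, newline="")` would replace the `io.StringIO` wrapper.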
Code Reference
Source Location
| Property | Value |
|---|---|
| File | 07-streaming/python/json_example/ride.py |
| Lines | L1-52 |
| Repository | DataTalksClub/data-engineering-zoomcamp |
Signature
```python
from typing import Dict, List

class Ride:
    def __init__(self, arr: List[str]) -> None: ...

    @classmethod
    def from_dict(cls, d: Dict) -> 'Ride': ...

    def __repr__(self) -> str: ...
```
Import
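```python
from ride import Ride
```
This assumes `ride.py` from `07-streaming/python/json_example/` is on the import path, for example when scripts are run from that directory.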
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| arr | List[str] | A list of 18 string values representing a single CSV row from the taxi trip dataset, in column order |
| d (from_dict) | Dict | A dictionary keyed by attribute name, as obtained by calling `json.loads` on the output of `json.dumps(ride.__dict__, default=str)` |
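A JSON round trip does not preserve every Python type, so the dictionary handed to `from_dict` is not simply `ride.__dict__`. The following minimal sketch uses a hypothetical attribute dict with a string, an int and a `Decimal`. It shows that `default=str` turns the `Decimal` into a string, while the int survives as a JSON number.

```python
import json
from decimal import Decimal

# Hypothetical attribute dict mixing value types a Ride holds
attrs = {
    "vendor_id": "1",
    "passenger_count": 2,
    "trip_distance": Decimal("4.50"),
}

# json cannot encode Decimal natively; default=str is called for it instead
payload = json.dumps(attrs, default=str)
d = json.loads(payload)

print(payload)  # {"vendor_id": "1", "passenger_count": 2, "trip_distance": "4.50"}
print({k: type(v).__name__ for k, v in d.items()})
# {'vendor_id': 'str', 'passenger_count': 'int', 'trip_distance': 'str'}
```

This is why the consumer side has to parse the values again. `Decimal(d["trip_distance"])` recovers the original value exactly, because the string form keeps every digit.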
Outputs
| Attribute | Type | Index | Description |
|---|---|---|---|
| vendor_id | str | 0 | Taxi vendor identifier |
| tpep_pickup_datetime | datetime | 1 | Trip pickup timestamp |
| tpep_dropoff_datetime | datetime | 2 | Trip dropoff timestamp |
| passenger_count | int | 3 | Number of passengers |
| trip_distance | Decimal | 4 | Trip distance in miles |
| rate_code_id | int | 5 | Rate code for the trip |
| store_and_fwd_flag | str | 6 | Store and forward flag |
| pu_location_id | int | 7 | Pickup location zone ID |
| do_location_id | int | 8 | Dropoff location zone ID |
| payment_type | str | 9 | Payment method |
| fare_amount | Decimal | 10 | Base fare amount |
| extra | Decimal | 11 | Extra charges |
| mta_tax | Decimal | 12 | MTA tax |
| tip_amount | Decimal | 13 | Tip amount |
| tolls_amount | Decimal | 14 | Tolls amount |
| improvement_surcharge | Decimal | 15 | Improvement surcharge |
| total_amount | Decimal | 16 | Total trip amount |
| congestion_surcharge | Decimal | 17 | Congestion surcharge |
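The Index column above can also be read as a lookup table. The following is a hypothetical, table-driven sketch of the same mapping. It keeps each (attribute, converter) pair in a list whose position equals the column index, and it adds a length check. `COLUMNS` and `parse_row` are illustrative names, not part of `ride.py`, and the sketch returns a plain dict rather than a `Ride`.

```python
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Dict, List, Tuple


def _ts(s: str) -> datetime:
    # Same format as documented for the tpep_*_datetime fields
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")


# (attribute, converter) in CSV column order; list index == column index
COLUMNS: List[Tuple[str, Callable[[str], Any]]] = [
    ("vendor_id", str),
    ("tpep_pickup_datetime", _ts),
    ("tpep_dropoff_datetime", _ts),
    ("passenger_count", int),
    ("trip_distance", Decimal),
    ("rate_code_id", int),
    ("store_and_fwd_flag", str),
    ("pu_location_id", int),
    ("do_location_id", int),
    ("payment_type", str),
    ("fare_amount", Decimal),
    ("extra", Decimal),
    ("mta_tax", Decimal),
    ("tip_amount", Decimal),
    ("tolls_amount", Decimal),
    ("improvement_surcharge", Decimal),
    ("total_amount", Decimal),
    ("congestion_surcharge", Decimal),
]


def parse_row(row: List[str]) -> Dict[str, Any]:
    """Convert an 18-value CSV row into a dict keyed by attribute name."""
    if len(row) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} values, got {len(row)}")
    return {name: conv(value) for (name, conv), value in zip(COLUMNS, row)}


row = ["1", "2021-01-01 00:15:56", "2021-01-01 00:31:07", "2", "4.50", "1", "N",
       "142", "236", "1", "14.00", "0.50", "0.50", "3.36", "0.00", "0.30",
       "18.66", "2.50"]
fields = parse_row(row)
print(fields["pu_location_id"], fields["trip_distance"])  # 142 4.50
```

Keeping names and converters in one list makes a column-order mismatch fail loudly on the length check instead of silently shifting values.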
Usage Examples
Constructing a Ride from a CSV row:
```python
from ride import Ride

# Simulating a CSV row as a list of strings
row = [
    "1",                    # vendor_id
    "2021-01-01 00:15:56",  # tpep_pickup_datetime
    "2021-01-01 00:31:07",  # tpep_dropoff_datetime
    "2",                    # passenger_count
    "4.50",                 # trip_distance
    "1",                    # rate_code_id
    "N",                    # store_and_fwd_flag
    "142",                  # pu_location_id
    "236",                  # do_location_id
    "1",                    # payment_type
    "14.00",                # fare_amount
    "0.50",                 # extra
    "0.50",                 # mta_tax
    "3.36",                 # tip_amount
    "0.00",                 # tolls_amount
    "0.30",                 # improvement_surcharge
    "18.66",                # total_amount
    "2.50"                  # congestion_surcharge
]
ride = Ride(arr=row)
print(ride)
# Output: Ride: {'vendor_id': '1', 'tpep_pickup_datetime': ..., ...}
```
Reconstructing a Ride from a deserialized JSON dictionary. Integer fields arrive as JSON numbers and Decimal fields as strings. The two timestamp values are single-element lists in this dictionary, not plain strings:
```python
from ride import Ride

# Dictionary as received from JSON deserialization
ride_dict = {
    'vendor_id': '1',
    'tpep_pickup_datetime': ['2021-01-01 00:15:56'],
    'tpep_dropoff_datetime': ['2021-01-01 00:31:07'],
    'passenger_count': 2,
    'trip_distance': '4.50',
    'rate_code_id': 1,
    'store_and_fwd_flag': 'N',
    'pu_location_id': 142,
    'do_location_id': 236,
    'payment_type': '1',
    'fare_amount': '14.00',
    'extra': '0.50',
    'mta_tax': '0.50',
    'tip_amount': '3.36',
    'tolls_amount': '0.00',
    'improvement_surcharge': '0.30',
    'total_amount': '18.66',
    'congestion_surcharge': '2.50'
}
ride = Ride.from_dict(ride_dict)
print(ride.pu_location_id)
# Output: 142
```
Using the Ride model's __dict__ for JSON serialization:
```python
import json

from ride import Ride

ride = Ride(arr=["1", "2021-01-01 00:15:56", "2021-01-01 00:31:07",
                 "2", "4.50", "1", "N", "142", "236", "1",
                 "14.00", "0.50", "0.50", "3.36", "0.00", "0.30",
                 "18.66", "2.50"])

# default=str converts values json cannot encode natively (Decimal, datetime) to strings
json_bytes = json.dumps(ride.__dict__, default=str).encode('utf-8')
print(json_bytes)
# Output: b'{"vendor_id": "1", "tpep_pickup_datetime": ...}'
```
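The consumer side reverses this encoding. It decodes the bytes, parses the JSON, and hands the resulting dict to `Ride.from_dict`. The following functions are a hypothetical serializer/deserializer pair of the kind a Kafka client typically accepts as value (de)serialization hooks. `serialize`, `deserialize` and the stand-in object are assumptions, and the actual `JsonProducer`/`JsonConsumer` wiring may differ.

```python
import json
from decimal import Decimal
from types import SimpleNamespace
from typing import Any, Dict


def serialize(obj: Any) -> bytes:
    """Encode an object's attribute dict as UTF-8 JSON (same call as the example above)."""
    return json.dumps(obj.__dict__, default=str).encode("utf-8")


def deserialize(data: bytes) -> Dict[str, Any]:
    """Decode UTF-8 JSON bytes into a plain dict, ready for a from_dict call."""
    return json.loads(data.decode("utf-8"))


# Hypothetical stand-in object; a Ride instance would be passed the same way
obj = SimpleNamespace(vendor_id="1", pu_location_id=142, fare_amount=Decimal("14.00"))
data = serialize(obj)
print(data)               # b'{"vendor_id": "1", "pu_location_id": 142, "fare_amount": "14.00"}'
print(deserialize(data))  # {'vendor_id': '1', 'pu_location_id': 142, 'fare_amount': '14.00'}
```

Keeping the two functions symmetric (UTF-8 on both sides, JSON in between) is what lets the `Ride` attribute names act as the shared contract between producer and consumer.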
Related Pages
- Principle:DataTalksClub_Data_engineering_zoomcamp_Streaming_Data_Model
- Implementation:DataTalksClub_Data_engineering_zoomcamp_JsonProducer_Implementation
- Implementation:DataTalksClub_Data_engineering_zoomcamp_JsonConsumer_Implementation
- Environment:DataTalksClub_Data_engineering_zoomcamp_Kafka_Confluent_Environment