Implementation:DataTalksClub Data engineering zoomcamp PySpark Kafka ReadStream
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp |
| Domains | Data_Engineering, Stream_Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete API documentation for the PySpark Structured Streaming functions that read from a Kafka topic, parse CSV-formatted messages into a typed DataFrame using a predefined schema, and perform windowed group-by aggregations on the streaming data.
Description
This implementation provides three core functions for stream ingestion and transformation:
`read_from_kafka(consume_topic)`: Creates a streaming DataFrame by connecting to a Kafka broker. It configures the Kafka source with dual bootstrap servers (`localhost:9092` for host access and `broker:29092` for container access), subscribes to the specified topic, reads from the earliest available offset, and uses a checkpoint directory for fault tolerance.
`parse_ride_from_kafka_message(df, schema)`: Transforms the raw Kafka message DataFrame into a typed, columnar DataFrame. It casts the binary key and value to strings, splits the value by the comma-space delimiter, and maps each element to a named column with the correct data type as defined by the `RIDE_SCHEMA`. The schema defines 7 fields: `vendor_id` (Int), `tpep_pickup_datetime` (Timestamp), `tpep_dropoff_datetime` (Timestamp), `passenger_count` (Int), `trip_distance` (Float), `payment_type` (Int), and `total_amount` (Float).
`op_windowed_groupby(df, window_duration, slide_duration)`: Applies a sliding window aggregation over the streaming DataFrame. It groups records by a time window on `tpep_pickup_datetime` and by `vendor_id`, then counts the number of rides in each window-vendor combination. The default configuration uses a 10-minute window duration with a 5-minute slide duration.
Usage
Use this implementation to:
- Ingest real-time taxi ride data from a Kafka topic into a PySpark streaming pipeline.
- Parse CSV-formatted Kafka messages into a strongly-typed DataFrame for downstream SQL-like operations.
- Compute rolling ride counts per vendor within sliding time windows.
- Chain the parsed or aggregated DataFrame into output sinks (console, Kafka, memory).
Code Reference
Source Location
| File | 07-streaming/python/streams-example/pyspark/streaming.py |
|---|---|
| Lines | L1-87 |
| Repository | DataTalksClub/data-engineering-zoomcamp |
Signature
def read_from_kafka(consume_topic: str) -> DataFrame: ...
def parse_ride_from_kafka_message(df: DataFrame, schema: StructType) -> DataFrame: ...
def op_windowed_groupby(df: DataFrame, window_duration: str, slide_duration: str) -> DataFrame: ...
Import
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| consume_topic | str | Kafka topic name to subscribe to (e.g., 'rides_csv') |
| df | DataFrame (streaming) | A streaming DataFrame with Kafka's default schema (key, value, topic, partition, offset, timestamp, timestampType) |
| schema | StructType | PySpark schema defining field names and data types for parsing the CSV value |
| window_duration | str | Window length as a string (e.g., '10 minutes') |
| slide_duration | str | Slide interval as a string (e.g., '5 minutes') |
Outputs
| Function | Output Type | Columns | Description |
|---|---|---|---|
| read_from_kafka | Streaming DataFrame | key (binary), value (binary), topic, partition, offset, timestamp, timestampType | Raw Kafka message stream |
| parse_ride_from_kafka_message | Streaming DataFrame | vendor_id (Int), tpep_pickup_datetime (Timestamp), tpep_dropoff_datetime (Timestamp), passenger_count (Int), trip_distance (Float), payment_type (Int), total_amount (Float) | Typed ride record stream |
| op_windowed_groupby | Streaming DataFrame | window (struct: start, end), vendor_id (Int), count (Long) | Windowed ride counts per vendor |
Usage Examples
Complete streaming pipeline from Kafka source to windowed aggregation:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV
# Initialize Spark session
spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# Step 1: Read raw stream from Kafka
df_consume_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_consume_stream.printSchema()
# root
# |-- key: binary (nullable = true)
# |-- value: binary (nullable = true)
# |-- topic: string (nullable = true)
# |-- partition: integer (nullable = true)
# |-- offset: long (nullable = true)
# |-- timestamp: timestamp (nullable = true)
# |-- timestampType: integer (nullable = true)
# Step 2: Parse raw messages into typed ride records
df_rides = parse_ride_from_kafka_message(df_consume_stream, RIDE_SCHEMA)
df_rides.printSchema()
# root
# |-- vendor_id: integer (nullable = true)
# |-- tpep_pickup_datetime: timestamp (nullable = true)
# |-- tpep_dropoff_datetime: timestamp (nullable = true)
# |-- passenger_count: integer (nullable = true)
# |-- trip_distance: float (nullable = true)
# |-- payment_type: integer (nullable = true)
# |-- total_amount: float (nullable = true)
# Step 3: Apply windowed aggregation
df_windowed = op_windowed_groupby(
df_rides,
window_duration='10 minutes',
slide_duration='5 minutes'
)
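To actually start the computation, the aggregated stream still needs an output sink. A minimal console-sink helper (a sketch, not necessarily the repo's exact code) could be:

```python
def sink_console(df, output_mode: str = "complete"):
    # Print each micro-batch to stdout. Windowed aggregations without a
    # watermark require "complete" mode, which re-emits the whole result
    # table on every trigger.
    return (
        df.writeStream
        .outputMode(output_mode)
        .format("console")
        .option("truncate", False)
        .start()
    )
```

Started with `query = sink_console(df_windowed)` followed by `query.awaitTermination()`.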
RIDE_SCHEMA definition from settings.py:
import pyspark.sql.types as T
RIDE_SCHEMA = T.StructType([
T.StructField("vendor_id", T.IntegerType()),
T.StructField('tpep_pickup_datetime', T.TimestampType()),
T.StructField('tpep_dropoff_datetime', T.TimestampType()),
T.StructField("passenger_count", T.IntegerType()),
T.StructField("trip_distance", T.FloatType()),
T.StructField("payment_type", T.IntegerType()),
T.StructField("total_amount", T.FloatType()),
])
CONSUME_TOPIC_RIDES_CSV = 'rides_csv'
TOPIC_WINDOWED_VENDOR_ID_COUNT = 'vendor_counts_windowed'
Submitting the streaming job via spark-submit:
spark-submit \
--master spark://spark-master:7077 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
streaming.py
Related Pages
- Principle:DataTalksClub_Data_engineering_zoomcamp_Structured_Stream_Processing
- Implementation:DataTalksClub_Data_engineering_zoomcamp_PySpark_WriteStream_Sink
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Kafka_Docker_Compose_Setup
- Environment:DataTalksClub_Data_engineering_zoomcamp_Kafka_Confluent_Environment