
Implementation:DataTalksClub Data engineering zoomcamp PySpark Kafka ReadStream

From Leeroopedia


Page Metadata
Knowledge Sources repo: DataTalksClub/data-engineering-zoomcamp
Domains Data_Engineering, Stream_Processing
Last Updated 2026-02-09 14:00 GMT

Overview

This page documents the PySpark Structured Streaming functions that read from a Kafka topic, parse CSV-formatted messages into a typed DataFrame using a predefined schema, and perform windowed group-by aggregations on the resulting stream.

Description

This implementation provides three core functions for stream ingestion and transformation:

  • read_from_kafka(consume_topic): Creates a streaming DataFrame by connecting to a Kafka broker. It configures the Kafka source with dual bootstrap servers (localhost:9092 for host access and broker:29092 for container access), subscribes to the specified topic, reads from the earliest available offset, and uses a checkpoint directory for fault tolerance.
  • parse_ride_from_kafka_message(df, schema): Transforms the raw Kafka message DataFrame into a typed, columnar DataFrame. It casts the binary key and value to strings, splits the value by comma-space delimiter, and maps each element to a named column with the correct data type as defined by the RIDE_SCHEMA. The schema defines 7 fields: vendor_id (Int), tpep_pickup_datetime (Timestamp), tpep_dropoff_datetime (Timestamp), passenger_count (Int), trip_distance (Float), payment_type (Int), and total_amount (Float).
  • op_windowed_groupby(df, window_duration, slide_duration): Applies a sliding window aggregation over the streaming DataFrame. It groups records by a time window on tpep_pickup_datetime and by vendor_id, then counts the number of rides in each window-vendor combination. The default configuration uses a 10-minute window duration with a 5-minute slide duration.

Usage

Use this implementation to:

  • Ingest real-time taxi ride data from a Kafka topic into a PySpark streaming pipeline.
  • Parse CSV-formatted Kafka messages into a strongly-typed DataFrame for downstream SQL-like operations.
  • Compute rolling ride counts per vendor within sliding time windows.
  • Chain the parsed or aggregated DataFrame into output sinks (console, Kafka, memory).
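The comma-space parsing contract can be illustrated in plain Python. This is not from the source: the field list mirrors RIDE_SCHEMA, the sample message is hypothetical, and timestamps stay as strings here whereas Spark casts them to TimestampType.

```python
# Plain-Python illustration (not from the source) of the ", " split that
# parse_ride_from_kafka_message performs on each Kafka message value.
RIDE_FIELDS = [
    ("vendor_id", int),
    ("tpep_pickup_datetime", str),   # TimestampType in Spark
    ("tpep_dropoff_datetime", str),  # TimestampType in Spark
    ("passenger_count", int),
    ("trip_distance", float),
    ("payment_type", int),
    ("total_amount", float),
]

def parse_csv_message(value: str) -> dict:
    parts = value.split(", ")
    return {name: cast(raw) for (name, cast), raw in zip(RIDE_FIELDS, parts)}

# Hypothetical message on the 'rides_csv' topic:
msg = "1, 2020-07-01 00:25:32, 2020-07-01 00:33:39, 1, 1.50, 1, 9.35"
print(parse_csv_message(msg)["trip_distance"])  # 1.5
```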

Code Reference

Source Location

File 07-streaming/python/streams-example/pyspark/streaming.py
Lines 1-87
Repository DataTalksClub/data-engineering-zoomcamp

Signature

def read_from_kafka(consume_topic: str) -> DataFrame: ...

def parse_ride_from_kafka_message(df: DataFrame, schema: StructType) -> DataFrame: ...

def op_windowed_groupby(df: DataFrame, window_duration: str, slide_duration: str) -> DataFrame: ...

Import

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT

I/O Contract

Inputs

  • consume_topic (str): Kafka topic name to subscribe to (e.g., 'rides_csv').
  • df (streaming DataFrame): a streaming DataFrame with Kafka's default schema (key, value, topic, partition, offset, timestamp, timestampType).
  • schema (StructType): PySpark schema defining field names and data types for parsing the CSV value.
  • window_duration (str): window length as a string (e.g., '10 minutes').
  • slide_duration (str): slide interval as a string (e.g., '5 minutes').

Outputs

  • read_from_kafka: streaming DataFrame with columns key (binary), value (binary), topic, partition, offset, timestamp, timestampType; the raw Kafka message stream.
  • parse_ride_from_kafka_message: streaming DataFrame with columns vendor_id (Int), tpep_pickup_datetime (Timestamp), tpep_dropoff_datetime (Timestamp), passenger_count (Int), trip_distance (Float), payment_type (Int), total_amount (Float); the typed ride record stream.
  • op_windowed_groupby: streaming DataFrame with columns window (struct: start, end), vendor_id (Int), count (Long); windowed ride counts per vendor.

Usage Examples

Complete streaming pipeline from Kafka source to windowed aggregation:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV

# Initialize Spark session
spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Step 1: Read raw stream from Kafka
df_consume_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_consume_stream.printSchema()  # printSchema prints directly; wrapping it in print() would emit an extra 'None'
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)

# Step 2: Parse raw messages into typed ride records
df_rides = parse_ride_from_kafka_message(df_consume_stream, RIDE_SCHEMA)
df_rides.printSchema()
# root
#  |-- vendor_id: integer (nullable = true)
#  |-- tpep_pickup_datetime: timestamp (nullable = true)
#  |-- tpep_dropoff_datetime: timestamp (nullable = true)
#  |-- passenger_count: integer (nullable = true)
#  |-- trip_distance: float (nullable = true)
#  |-- payment_type: integer (nullable = true)
#  |-- total_amount: float (nullable = true)

# Step 3: Apply windowed aggregation
df_windowed = op_windowed_groupby(
    df_rides,
    window_duration='10 minutes',
    slide_duration='5 minutes'
)
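To finish the pipeline, a sink can be attached to the windowed stream. The helper below is a hedged sketch, not part of the source file; 'complete' output mode is used because the windowed count is an aggregation without a watermark.

```python
# Hedged sketch (not in the source): attach the console sink to a
# streaming DataFrame such as df_windowed above.
def sink_console(df, output_mode: str = "complete"):
    # Returns a StreamingQuery; call .awaitTermination() to block.
    return (df.writeStream
            .outputMode(output_mode)
            .format("console")
            .option("truncate", False)
            .start())
```

Usage: `query = sink_console(df_windowed)` followed by `query.awaitTermination()`.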

RIDE_SCHEMA definition from settings.py:

import pyspark.sql.types as T

RIDE_SCHEMA = T.StructType([
    T.StructField("vendor_id", T.IntegerType()),
    T.StructField("tpep_pickup_datetime", T.TimestampType()),
    T.StructField("tpep_dropoff_datetime", T.TimestampType()),
    T.StructField("passenger_count", T.IntegerType()),
    T.StructField("trip_distance", T.FloatType()),
    T.StructField("payment_type", T.IntegerType()),
    T.StructField("total_amount", T.FloatType()),
])

CONSUME_TOPIC_RIDES_CSV = 'rides_csv'
TOPIC_WINDOWED_VENDOR_ID_COUNT = 'vendor_counts_windowed'
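The TOPIC_WINDOWED_VENDOR_ID_COUNT constant suggests the windowed counts are published back to Kafka. The sketch below is an assumption, not the source implementation (the helper name, broker address, and checkpoint path are illustrative): Spark's Kafka sink requires string or binary key and value columns, so the aggregate is serialized first.

```python
# Hedged sketch (not from the source): publish windowed vendor counts to
# a Kafka topic. The Kafka sink needs 'key' and 'value' columns.
def sink_kafka(df, topic: str):
    serialized = df.selectExpr(
        "CAST(vendor_id AS STRING) AS key",
        "CAST(`count` AS STRING) AS value",
    )
    return (serialized.writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", topic)
            .option("checkpointLocation", "checkpoint")
            .outputMode("complete")
            .start())
```

Usage: `sink_kafka(df_windowed, TOPIC_WINDOWED_VENDOR_ID_COUNT)`.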

Submitting the streaming job via spark-submit:

spark-submit \
    --master spark://spark-master:7077 \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
    streaming.py
