Implementation:DataTalksClub Data engineering zoomcamp Redpanda PySpark Streaming
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
PySpark Structured Streaming pipeline that reads CSV-formatted ride data from a Kafka/Redpanda topic, parses it into a typed DataFrame, applies groupBy and windowed aggregations, and sinks results to both console and Kafka.
Description
This module implements a complete PySpark Structured Streaming pipeline with the following functions:
- read_from_kafka(consume_topic: str) -- Creates a streaming DataFrame from Kafka using the bootstrap servers localhost:9092,broker:29092, subscribes to the specified topic, and starts from the earliest offset. It also sets checkpointLocation to checkpoint, although that option only takes effect on a writeStream sink, not on a reader.
- parse_ride_from_kafka_message(df, schema) -- Asserts that the DataFrame is streaming, casts key and value to strings, splits the CSV value column on ", " (comma followed by a space), and casts each element by position into a top-level column defined by schema, typically RIDE_SCHEMA (a StructType with fields: vendor_id, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, total_amount). Only the schema columns are returned; key is dropped.
- sink_console(df, output_mode, processing_time) -- Writes the streaming DataFrame to the console with configurable output mode (default complete) and trigger interval (default 5 seconds). Returns a StreamingQuery.
- sink_memory(df, query_name, query_template) -- Writes the streaming DataFrame to an in-memory table, then runs a SQL query against it. Returns both the query results and the StreamingQuery.
- sink_kafka(df, topic) -- Writes the streaming DataFrame to a Kafka topic at localhost:9092,broker:29092 in complete output mode with checkpointing.
- prepare_df_to_kafka_sink(df, value_columns, key_column) -- Prepares a DataFrame for Kafka output by concatenating value_columns into a single value column and optionally renaming a column to key.
- op_groupby(df, column_names) -- Applies a groupBy aggregation with count() on the specified columns.
- op_windowed_groupby(df, window_duration, slide_duration) -- Applies a windowed aggregation on tpep_pickup_datetime grouped by vendor_id, with configurable window and slide durations.
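The positional parsing done by parse_ride_from_kafka_message can be illustrated without Spark. The sketch below is a pure-Python approximation, not the module's code: RIDE_FIELDS and parse_ride are hypothetical names, the sample message is made up, and the error handling only roughly mimics Spark's non-ANSI cast (null on failure, null for missing trailing elements) without reproducing every edge case.

```python
from datetime import datetime

# Hypothetical pure-Python stand-in for RIDE_SCHEMA: (column name, caster)
# pairs in the same positional order as the CSV payload.
RIDE_FIELDS = [
    ("vendor_id", int),
    ("tpep_pickup_datetime", datetime.fromisoformat),
    ("tpep_dropoff_datetime", datetime.fromisoformat),
    ("passenger_count", int),
    ("trip_distance", float),
    ("payment_type", int),
    ("total_amount", float),
]


def parse_ride(value, fields=RIDE_FIELDS):
    """Split a CSV Kafka value on ', ' and cast each element by position."""
    parts = value.split(", ")
    row = {}
    for idx, (name, cast) in enumerate(fields):
        # Like getItem(idx) past the end of the array: missing elements become None
        raw = parts[idx] if idx < len(parts) else None
        try:
            row[name] = None if raw is None else cast(raw)
        except ValueError:
            # Roughly like a failed cast in Spark's non-ANSI mode: null instead of an error
            row[name] = None
    return row


# Hypothetical message in the expected ", "-separated layout
ride = parse_ride("1, 2020-07-01 00:25:32, 2020-07-01 00:33:39, 1, 1.5, 2, 9.3")
```

Because fields are matched purely by position, a producer that reorders columns or uses a plain "," delimiter would silently yield wrong or null columns rather than an error.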
The __main__ block orchestrates the full pipeline: reads from rides_csv topic, parses with RIDE_SCHEMA, sinks raw data to console in append mode, computes trip count by vendor_id, computes windowed trip count, sinks the groupBy result to console, and publishes the windowed result to the vendor_counts_windowed Kafka topic.
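The choice of output modes in the __main__ block follows Spark's rules: complete mode is only allowed for streaming aggregations, so the raw parsed rows go to the console in append mode, while the groupBy count can use complete mode (append is rejected for an aggregation without a watermark). The pure-Python simulation below sketches what a sink receives per trigger for a running count; simulate_count_by_key and the sample batches are hypothetical and only model the emission semantics, not Spark itself.

```python
from collections import Counter


def simulate_count_by_key(batches, output_mode="complete"):
    """Return the rows a sink would receive after each micro-batch for a running count-by-key."""
    if output_mode not in ("complete", "update"):
        # Spark rejects append mode for a streaming aggregation that has no watermark
        raise ValueError(f"unsupported output mode for this aggregation: {output_mode}")
    state = Counter()  # running aggregation state kept across micro-batches
    emitted = []
    for batch in batches:
        batch_counts = Counter(batch)
        state.update(batch_counts)
        if output_mode == "complete":
            # complete: the whole result table is re-emitted on every trigger
            rows = sorted(state.items())
        else:
            # update: only keys touched by this micro-batch are emitted
            rows = sorted((key, state[key]) for key in batch_counts)
        emitted.append(rows)
    return emitted


# Hypothetical vendor_id values arriving in two micro-batches
batches = [[1, 2, 1], [2]]
complete = simulate_count_by_key(batches, "complete")
update = simulate_count_by_key(batches, "update")
```

In complete mode the second trigger repeats vendor 1's unchanged count; in update mode only vendor 2 is re-emitted.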
Usage
Use this implementation as a reference for building PySpark Structured Streaming pipelines that consume from Kafka/Redpanda, parse CSV data into typed DataFrames, apply aggregations (including time-windowed aggregations), and sink results to the console or back to Kafka. It requires a Spark runtime (for example, a running Spark cluster) with the Kafka connector package (spark-sql-kafka-0-10) on the classpath, and a Kafka/Redpanda broker reachable at one of the configured bootstrap servers.
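The connector is usually supplied at launch time as a Maven coordinate whose version must match the Spark version and whose artifact suffix must match Spark's Scala binary version (2.12 for the default Spark 3.x builds, 2.13 for Spark 4). The helpers below are hypothetical conveniences that only build that coordinate and a spark-submit argument list; the version numbers are examples, not the versions the repository pins.

```python
def kafka_connector_package(spark_version, scala_version="2.12"):
    """Maven coordinate of the Structured Streaming Kafka connector for a given Spark build."""
    # The artifact suffix is the Scala binary version; the version must match Spark's
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def spark_submit_args(script, spark_version, scala_version="2.12"):
    """Argument list for launching the pipeline with the connector pulled in via --packages."""
    return [
        "spark-submit",
        "--packages",
        kafka_connector_package(spark_version, scala_version),
        script,
    ]


# Example values only; use the Spark version you actually run
args = spark_submit_args("streaming.py", "3.3.1")
```

The same coordinate can instead be set through the spark.jars.packages configuration when building the SparkSession.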
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/python/streams-example/redpanda/streaming.py
- Lines: 1-128
Signature
```python
import pyspark.sql.functions as F

# `spark` is a SparkSession expected to exist as a global in this module.

def read_from_kafka(consume_topic: str):
    # Subscribe to a Kafka/Redpanda topic (needs the spark-sql-kafka-0-10 connector).
    # Note: checkpointLocation only takes effect on writeStream; the reader ignores it.
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .option("subscribe", consume_topic) \
        .option("startingOffsets", "earliest") \
        .option("checkpointLocation", "checkpoint") \
        .load()
    return df_stream

def parse_ride_from_kafka_message(df, schema):
    assert df.isStreaming is True, "DataFrame doesn't receive streaming data"
    # Kafka delivers key and value as binary; cast both to strings
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    # Split the CSV payload on ", " (comma followed by a space)
    col = F.split(df['value'], ', ')
    # Expand each element into a top-level column, cast to the schema's type by position
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return df.select([field.name for field in schema])

def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    ...

def sink_memory(df, query_name, query_template):
    ...

def sink_kafka(df, topic):
    ...

def prepare_df_to_kafka_sink(df, value_columns, key_column=None):
    ...

def op_groupby(df, column_names):
    # Running count of rows per distinct combination of column_names
    df_aggregation = df.groupBy(column_names).count()
    return df_aggregation

def op_windowed_groupby(df, window_duration, slide_duration):
    # Count rides per vendor_id within sliding event-time windows on the pickup timestamp
    df_windowed_aggregation = df.groupBy(
        F.window(timeColumn=df.tpep_pickup_datetime, windowDuration=window_duration, slideDuration=slide_duration),
        df.vendor_id
    ).count()
    return df_windowed_aggregation
```
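op_windowed_groupby delegates window assignment to F.window. The pure-Python sketch below approximates that assignment: window starts are multiples of slide_duration counted from the Unix epoch (F.window's default startTime offset of zero), window ends are exclusive, and each event is counted once in every window that contains it. The names sliding_windows and windowed_counts are hypothetical, naive datetimes are treated as UTC, and the sample events are made up.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # window starts are aligned to the Unix epoch (naive = UTC here)


def sliding_windows(ts, window, slide):
    """Return every [start, end) window of length `window`, stepping by `slide`, that contains ts."""
    # Latest window start at or before ts: ts rounded down to a multiple of `slide`
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk back one slide at a time while the window still covers ts (end is exclusive)
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


def windowed_counts(events, window, slide):
    """Count (timestamp, vendor_id) events per (window, vendor_id), like groupBy(window, vendor_id).count()."""
    counts = Counter()
    for ts, vendor_id in events:
        for w in sliding_windows(ts, window, slide):
            counts[(w, vendor_id)] += 1
    return counts


ten, five = timedelta(minutes=10), timedelta(minutes=5)
events = [
    (datetime(2020, 7, 1, 0, 7), 1),
    (datetime(2020, 7, 1, 0, 12), 1),
    (datetime(2020, 7, 1, 0, 3), 2),
]
counts = windowed_counts(events, ten, five)
```

With a 10-minute window and a 5-minute slide, every ride lands in exactly two overlapping windows, so the windowed counts sum to twice the number of rides.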
Import
```python
from streaming import (
    read_from_kafka, parse_ride_from_kafka_message, sink_console, sink_memory,
    sink_kafka, prepare_df_to_kafka_sink, op_groupby, op_windowed_groupby,
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| consume_topic | str | Yes | Kafka topic name to subscribe to and read streaming data from. |
| df | pyspark.sql.DataFrame | Yes | A Spark Streaming DataFrame (for parse, sink, and aggregation functions). |
| schema | pyspark.sql.types.StructType | Yes | Schema defining the column names and types for parsing CSV values, matched to the split values by position. The pipeline passes RIDE_SCHEMA, which has 7 fields: vendor_id (IntegerType), tpep_pickup_datetime (TimestampType), tpep_dropoff_datetime (TimestampType), passenger_count (IntegerType), trip_distance (FloatType), payment_type (IntegerType), total_amount (FloatType). |
| output_mode | str | No | Output mode for sink_console: complete, append, or update. Defaults to complete. sink_kafka always writes in complete mode. |
| processing_time | str | No | Trigger interval for console sink. Defaults to 5 seconds. |
| topic | str | Yes (for sink_kafka) | Kafka topic to write aggregated results to. |
| window_duration | str | Yes (for op_windowed_groupby) | Window size for time-based aggregation (e.g., 10 minutes). |
| slide_duration | str | Yes (for op_windowed_groupby) | Slide interval for the sliding window (e.g., 5 minutes). |
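window_duration and slide_duration are passed straight to F.window as interval strings, and Spark requires the slide to be no longer than the window. A small pre-check such as the one below can catch mistakes before the query starts; parse_duration and check_window_args are hypothetical helpers that accept only simple "<N> <unit>" strings like the ones used here, a small subset of what Spark's interval parser supports.

```python
import re
from datetime import timedelta

# Hypothetical helper covering only simple "<N> <unit>" strings;
# Spark's own interval parser accepts much more.
_UNIT_TO_KWARG = {
    "second": "seconds",
    "minute": "minutes",
    "hour": "hours",
    "day": "days",
    "week": "weeks",
}


def parse_duration(text):
    """Convert e.g. '10 minutes' or '1 hour' into a timedelta."""
    match = re.fullmatch(r"\s*(\d+)\s+(second|minute|hour|day|week)s?\s*", text.lower())
    if match is None:
        raise ValueError(f"unsupported duration string: {text!r}")
    amount, unit = int(match.group(1)), match.group(2)
    return timedelta(**{_UNIT_TO_KWARG[unit]: amount})


def check_window_args(window_duration, slide_duration):
    """Validate window/slide strings before they reach op_windowed_groupby."""
    window = parse_duration(window_duration)
    slide = parse_duration(slide_duration)
    if window <= timedelta(0) or slide <= timedelta(0):
        raise ValueError("durations must be positive")
    if slide > window:
        raise ValueError("slide_duration must not exceed window_duration")
    return window, slide
```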
Outputs
| Name | Type | Description |
|---|---|---|
| df_stream | pyspark.sql.DataFrame | Streaming DataFrame returned by read_from_kafka with raw Kafka message columns (key, value, topic, partition, offset, timestamp, timestampType). |
| df_parsed | pyspark.sql.DataFrame | Streaming DataFrame returned by parse_ride_from_kafka_message with typed columns matching the provided schema. |
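| write_query | pyspark.sql.streaming.StreamingQuery | Streaming query handle returned by sink_console and sink_kafka; sink_memory returns it together with the SQL query results. |
| df_aggregation | pyspark.sql.DataFrame | Aggregated streaming DataFrame returned by op_groupby and op_windowed_groupby with a count column; op_windowed_groupby also includes a window struct column (start, end) alongside vendor_id. |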
Usage Examples
Basic Usage
```python
# Mirrors the __main__ block of streaming.py. The helper functions read a
# module-level `spark` global, so this code belongs in that module.
from pyspark.sql import SparkSession
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT

spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Read from Kafka
df_consume_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
# Parse CSV messages into a typed DataFrame
df_rides = parse_ride_from_kafka_message(df_consume_stream, RIDE_SCHEMA)
# Sink raw rows to the console in append mode
sink_console(df_rides, output_mode='append')
# Apply aggregations
df_trip_count_by_vendor_id = op_groupby(df_rides, ['vendor_id'])
df_windowed = op_windowed_groupby(df_rides, window_duration='10 minutes', slide_duration='5 minutes')
# Sink the groupBy result to the console and the windowed result to Kafka
sink_console(df_trip_count_by_vendor_id)
df_messages = prepare_df_to_kafka_sink(df=df_windowed, value_columns=['count'], key_column='vendor_id')
sink_kafka(df=df_messages, topic=TOPIC_WINDOWED_VENDOR_ID_COUNT)
# Block until any of the started streaming queries terminates
spark.streams.awaitAnyTermination()
```
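To smoke-test the pipeline, the rides_csv topic needs messages whose value is the ", "-separated layout that parse_ride_from_kafka_message splits, in RIDE_SCHEMA column order. The helper below is a hypothetical sketch of that serialization, not the repository's producer; the resulting string would be sent as the Kafka message value with any Kafka client, and the sample ride is made up.

```python
from datetime import datetime

# Hypothetical column order copied from RIDE_SCHEMA; the parser matches fields by position.
RIDE_COLUMNS = [
    "vendor_id",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "payment_type",
    "total_amount",
]


def format_ride(row):
    """Serialize a ride dict into the ', '-separated value the Spark parser splits."""
    parts = []
    for name in RIDE_COLUMNS:
        value = row[name]
        if isinstance(value, datetime):
            # "yyyy-MM-dd HH:mm:ss" is a layout Spark can cast to TimestampType
            value = value.strftime("%Y-%m-%d %H:%M:%S")
        text = str(value)
        if ", " in text:
            # An embedded ", " would shift every following column in the parser
            raise ValueError(f"{name} contains the ', ' delimiter: {text!r}")
        parts.append(text)
    return ", ".join(parts)


message = format_ride({
    "vendor_id": 1,
    "tpep_pickup_datetime": datetime(2020, 7, 1, 0, 25, 32),
    "tpep_dropoff_datetime": datetime(2020, 7, 1, 0, 33, 39),
    "passenger_count": 1,
    "trip_distance": 1.5,
    "payment_type": 2,
    "total_amount": 9.3,
})
```

Rejecting values that contain the delimiter is a deliberate choice: the parser has no quoting, so such a value would misalign every later column instead of failing loudly.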