

Implementation:DataTalksClub Data engineering zoomcamp Redpanda PySpark Streaming



Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

PySpark Structured Streaming pipeline that reads CSV-formatted ride data from a Kafka/Redpanda topic, parses it into a typed DataFrame, applies groupBy and windowed aggregations, and sinks results to both console and Kafka.

Description

This module implements a complete PySpark Structured Streaming pipeline with the following functions:

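  1. read_from_kafka(consume_topic: str) -- Creates a streaming DataFrame that connects to the brokers at localhost:9092,broker:29092, subscribes to the given topic, and starts from the earliest available offset. It also sets a checkpointLocation option, but Spark only honors checkpointLocation on the write side (writeStream), so the option has no effect on the reader.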
  2. parse_ride_from_kafka_message(df, schema) -- Asserts that the DataFrame is streaming, casts key and value to strings, splits the CSV value on ", " (a comma followed by a space), and turns each element into a top-level column named and typed by the matching field of schema. In the pipeline, schema is RIDE_SCHEMA, a StructType with the fields vendor_id, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, payment_type, and total_amount. The key column is dropped from the result.
  3. sink_console(df, output_mode, processing_time) -- Writes the streaming DataFrame to the console with configurable output mode (default complete) and trigger interval (default 5 seconds). Returns a StreamingQuery.
  4. sink_memory(df, query_name, query_template) -- Writes the streaming DataFrame to an in-memory table, then runs a SQL query against it. Returns both the query results and the StreamingQuery.
  5. sink_kafka(df, topic) -- Writes the streaming DataFrame to a Kafka topic at localhost:9092,broker:29092 in complete output mode with checkpointing.
  6. prepare_df_to_kafka_sink(df, value_columns, key_column) -- Prepares a DataFrame for the Kafka sink by concatenating value_columns into a single value column and, if key_column is given, renaming that column to key.
  7. op_groupby(df, column_names) -- Applies a groupBy aggregation with count() on the specified columns.
  8. op_windowed_groupby(df, window_duration, slide_duration) -- Counts rides per vendor_id within sliding time windows over tpep_pickup_datetime, with configurable window length and slide interval.
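To make the parsing step in item 2 concrete, here is a plain-Python model of what parse_ride_from_kafka_message does to one message value. It is a sketch, not Spark code: RIDE_FIELDS, to_timestamp, the "YYYY-MM-DD HH:MM:SS" timestamp format, and the sample message are all assumptions. It also assumes spark.sql.ansi.enabled is false, the setting under which an out-of-range getItem and an unparsable CAST both produce null instead of an error.

```python
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple


def to_timestamp(text: str) -> datetime:
    # Assumed wire format for the two tpep_* columns.
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# Hypothetical plain-Python stand-in for RIDE_SCHEMA: (column name, converter),
# in the same order as the comma-separated fields of the message.
RIDE_FIELDS: List[Tuple[str, Callable[[str], object]]] = [
    ("vendor_id", int),
    ("tpep_pickup_datetime", to_timestamp),
    ("tpep_dropoff_datetime", to_timestamp),
    ("passenger_count", int),
    ("trip_distance", float),
    ("payment_type", int),
    ("total_amount", float),
]


def cast_or_none(convert: Callable[[str], object], raw: Optional[str]) -> object:
    # Mimics non-ANSI CAST: text that cannot be parsed becomes None (null).
    if raw is None:
        return None
    try:
        return convert(raw)
    except ValueError:
        return None


def parse_ride(value: str) -> Dict[str, object]:
    """Model parse_ride_from_kafka_message for a single message value."""
    parts = value.split(", ")  # same delimiter as F.split(df['value'], ', ')
    row: Dict[str, object] = {}
    for idx, (name, convert) in enumerate(RIDE_FIELDS):
        # Like getItem(idx) with ANSI off, a missing element yields None.
        raw = parts[idx] if idx < len(parts) else None
        row[name] = cast_or_none(convert, raw)
    return row
```

For example, parse_ride("1, 2020-07-01 00:25:32, 2020-07-01 00:33:39, 1, 1.5, 2, 9.3") yields vendor_id 1, a datetime pickup time, and trip_distance 1.5, while a truncated message such as "2, not-a-date" keeps vendor_id 2 and fills every other column with None. Note that Python's float is double precision, whereas RIDE_SCHEMA uses FloatType.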

The __main__ block wires the full pipeline together. It reads from the rides_csv topic, parses the messages with RIDE_SCHEMA, and sinks the raw rows to the console in append mode (Spark accepts complete mode only for streaming aggregations). It then computes the trip count per vendor_id and the windowed trip count, sinks the groupBy result to the console, and publishes the windowed result to the vendor_counts_windowed Kafka topic.
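The two console sinks in the __main__ block behave differently from trigger to trigger. The plain-Python model below (not Spark code) treats each micro-batch as a list of vendor_id values; the batch contents are hypothetical. Append mode prints only the rows that arrived in that batch. Complete mode on the op_groupby result keeps running counts as state and re-prints the whole result table on every trigger.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def append_mode_outputs(batches: Iterable[List[int]]) -> List[List[int]]:
    # Raw stream in append mode: each trigger emits only that batch's new rows.
    return [list(batch) for batch in batches]


def complete_mode_outputs(batches: Iterable[List[int]]) -> List[List[Tuple[int, int]]]:
    # op_groupby(df, ['vendor_id']) in complete mode: running counts persist
    # across batches, and the full (vendor_id, count) table is emitted each time.
    counts: Counter = Counter()
    outputs = []
    for batch in batches:
        counts.update(batch)
        outputs.append(sorted(counts.items()))
    return outputs


# Hypothetical vendor_id values arriving in two micro-batches.
batches = [[1, 2, 1], [2]]
```

With these batches, append mode emits [1, 2, 1] and then [2]. Complete mode emits [(1, 2), (2, 1)] and then [(1, 2), (2, 2)]. Vendor 1 reappears in the second output even though no new rows arrived for it.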

Usage

Use this implementation as a reference for PySpark Structured Streaming pipelines that consume from Kafka/Redpanda, parse CSV data into typed DataFrames, apply aggregations (including time-windowed aggregations), and sink the results to the console or back to Kafka. It requires a Spark runtime (a cluster or local mode) with the Kafka connector (spark-sql-kafka-0-10) on the classpath, plus a reachable Kafka or Redpanda broker.
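The Kafka connector is usually pulled in by its Maven coordinate rather than by copying a JAR by hand. The helper below builds that coordinate and a matching spark-submit command. Both function names are hypothetical, and the Spark version 3.3.1 and Scala version 2.12 are illustrative assumptions: both must match your own Spark build.

```python
from typing import List


def kafka_connector_package(spark_version: str, scala_binary_version: str = "2.12") -> str:
    """Maven coordinate of Spark's Kafka connector for Structured Streaming."""
    # Both parts must match the Spark build in use (compare pyspark.__version__).
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary_version}:{spark_version}"


def spark_submit_command(script: str, spark_version: str,
                         scala_binary_version: str = "2.12") -> List[str]:
    # --packages resolves the connector and its dependencies (such as
    # kafka-clients) from Maven when the job is submitted.
    return ["spark-submit", "--packages",
            kafka_connector_package(spark_version, scala_binary_version), script]


# Illustrative versions only; substitute the ones from your installation.
print(" ".join(spark_submit_command("streaming.py", "3.3.1")))
# prints: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 streaming.py
```

You can also pass the same coordinate through the spark.jars.packages configuration when building the SparkSession, provided it is set before the session starts.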

Code Reference

Source Location

Signature

import pyspark.sql.functions as F

# Assumes a SparkSession bound to the module-level name `spark`.


def read_from_kafka(consume_topic: str):
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .option("subscribe", consume_topic) \
        .option("startingOffsets", "earliest") \
        .option("checkpointLocation", "checkpoint") \
        .load()
    return df_stream


def parse_ride_from_kafka_message(df, schema):
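    assert df.isStreaming is True, "DataFrame doesn't receive streaming data"
    # Kafka delivers key and value as binary; decode both to strings.
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    # Split the CSV payload on ", " into an array column.
    col = F.split(df['value'], ', ')
    # Array element idx becomes a column named and typed by the idx-th schema field.
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    # Keep only the schema columns (key is dropped).
    return df.select([field.name for field in schema])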


def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    ...

def sink_memory(df, query_name, query_template):
    ...

def sink_kafka(df, topic):
    ...

def prepare_df_to_kafka_sink(df, value_columns, key_column=None):
    ...

def op_groupby(df, column_names):
    df_aggregation = df.groupBy(column_names).count()
    return df_aggregation


def op_windowed_groupby(df, window_duration, slide_duration):
    df_windowed_aggregation = df.groupBy(
        F.window(timeColumn=df.tpep_pickup_datetime, windowDuration=window_duration, slideDuration=slide_duration),
        df.vendor_id
    ).count()
    return df_windowed_aggregation
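op_windowed_groupby relies on F.window to assign each ride to every sliding window that contains its pickup time. The plain-Python model below illustrates that assignment and the resulting per-(window, vendor_id) counts. It is a sketch under one assumption: that window starts are multiples of slide_duration counted from the Unix epoch (UTC), which is how F.window behaves with its default startTime. The helper names and sample rides are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
Window = Tuple[datetime, datetime]


def sliding_windows(ts: datetime, window: timedelta, slide: timedelta) -> List[Window]:
    """Return every [start, end) window containing ts, earliest first."""
    start = ts - (ts - EPOCH) % slide  # latest epoch-aligned start <= ts
    windows = []
    while start > ts - window:  # [start, start + window) still covers ts
        windows.append((start, start + window))
        start -= slide
    return windows[::-1]


def windowed_counts(rides: Iterable[Tuple[datetime, int]],
                    window: timedelta, slide: timedelta) -> Counter:
    # Model of op_windowed_groupby: count rides per (window, vendor_id).
    counts: Counter = Counter()
    for pickup, vendor_id in rides:
        for win in sliding_windows(pickup, window, slide):
            counts[(win, vendor_id)] += 1
    return counts
```

With window_duration="10 minutes" and slide_duration="5 minutes", a pickup at 00:07 falls into both [00:00, 00:10) and [00:05, 00:15). Each ride is therefore counted twice across overlapping windows. When the window and slide are equal, the windows become tumbling, and each ride is counted once.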

Import

from streaming import read_from_kafka, parse_ride_from_kafka_message, sink_console, sink_memory, sink_kafka, prepare_df_to_kafka_sink, op_groupby, op_windowed_groupby

I/O Contract

Inputs

Name | Type | Required | Description
consume_topic | str | Yes | Kafka topic to subscribe to and read streaming data from.
df | pyspark.sql.DataFrame | Yes | A streaming DataFrame (input to the parse, sink, and aggregation functions).
schema | pyspark.sql.types.StructType | Yes | Column names and types used to parse the CSV values. The pipeline passes RIDE_SCHEMA, which has 7 fields: vendor_id (IntegerType), tpep_pickup_datetime (TimestampType), tpep_dropoff_datetime (TimestampType), passenger_count (IntegerType), trip_distance (FloatType), payment_type (IntegerType), total_amount (FloatType).
output_mode | str | No | Output mode for sink_console: complete, append, or update. Defaults to complete.
processing_time | str | No | Trigger interval for sink_console. Defaults to 5 seconds.
topic | str | Yes (sink_kafka) | Kafka topic to write aggregated results to.
window_duration | str | Yes (op_windowed_groupby) | Window length for the time-based aggregation (e.g., 10 minutes).
slide_duration | str | Yes (op_windowed_groupby) | Slide interval of the sliding window (e.g., 5 minutes).

Outputs

Name | Type | Description
df_stream | pyspark.sql.DataFrame | Streaming DataFrame returned by read_from_kafka, with the raw Kafka message columns (key, value, topic, partition, offset, timestamp, timestampType).
df_parsed | pyspark.sql.DataFrame | Streaming DataFrame returned by parse_ride_from_kafka_message, with typed columns matching the provided schema.
write_query | pyspark.sql.streaming.StreamingQuery | Handle for a running streaming query, returned by sink_console and sink_kafka; sink_memory returns it together with the SQL query results.
df_aggregation | pyspark.sql.DataFrame | Aggregated streaming DataFrame returned by op_groupby and op_windowed_groupby, with a count column (op_windowed_groupby also produces a window column).
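Spark's Kafka sink expects a value column of string or binary type and an optional key column. This is why the aggregated df_aggregation goes through prepare_df_to_kafka_sink before sink_kafka. The sketch below models that preparation for a single result row in plain Python. The function name to_kafka_record, the ", " separator, and the sample row are assumptions; the separator was chosen only to mirror the input CSV format.

```python
from typing import Dict, Iterable, Optional, Tuple


def to_kafka_record(row: Dict[str, object], value_columns: Iterable[str],
                    key_column: Optional[str] = None,
                    sep: str = ", ") -> Tuple[Optional[str], str]:
    """Build the (key, value) string pair a Kafka sink expects from one row."""
    # Concatenate the chosen columns into the single required `value` string.
    value = sep.join(str(row[c]) for c in value_columns)
    # The optional `key` is the renamed key_column, rendered as a string.
    key = str(row[key_column]) if key_column is not None else None
    return key, value


# Hypothetical aggregated row, shaped like the op_windowed_groupby output.
row = {"window": ("00:00", "00:10"), "vendor_id": 2, "count": 17}
print(to_kafka_record(row, value_columns=["count"], key_column="vendor_id"))
# prints: ('2', '17')
```

With key_column set to vendor_id, all counts for a vendor share one message key. Kafka's default partitioner therefore routes them to the same partition.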

Usage Examples

Basic Usage

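from pyspark.sql import SparkSession
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT
import streaming
from streaming import (read_from_kafka, parse_ride_from_kafka_message, sink_console,
                       sink_kafka, prepare_df_to_kafka_sink, op_groupby, op_windowed_groupby)

spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# The streaming functions look up `spark` in their own module's globals;
# bind it there in case streaming.py only creates a session under __main__.
streaming.spark = spark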

# Read from Kafka
df_consume_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)

# Parse CSV messages into typed DataFrame
df_rides = parse_ride_from_kafka_message(df_consume_stream, RIDE_SCHEMA)

# Sink raw data to console
sink_console(df_rides, output_mode='append')

# Apply aggregations
df_trip_count_by_vendor_id = op_groupby(df_rides, ['vendor_id'])
df_windowed = op_windowed_groupby(df_rides, window_duration="10 minutes", slide_duration='5 minutes')

# Sink aggregation to console and Kafka
sink_console(df_trip_count_by_vendor_id)
df_messages = prepare_df_to_kafka_sink(df=df_windowed, value_columns=['count'], key_column='vendor_id')
sink_kafka(df=df_messages, topic=TOPIC_WINDOWED_VENDOR_ID_COUNT)

spark.streams.awaitAnyTermination()
