Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:DataTalksClub Data engineering zoomcamp Confluent PySpark Streaming

From Leeroopedia


Knowledge Sources
Domains Streaming, Kafka
Last Updated 2026-02-09 00:00 GMT

Overview

PySpark Structured Streaming pipeline that connects to Confluent Cloud Kafka with SASL_SSL authentication, merges green and FHV taxi streams into a unified RIDES_TOPIC, parses the combined data, aggregates by PULocationID, and sinks the results to the console.

Description

This module implements a multi-stage PySpark Structured Streaming pipeline designed to work with Confluent Cloud Kafka (requiring SASL_SSL authentication). The pipeline performs the following steps:

  1. Step 1: Consume from multiple topics -- Reads streaming DataFrames from GREEN_TAXI_TOPIC and FHV_TAXI_TOPIC using read_from_kafka, which configures SASL_SSL authentication using credentials from CONFLUENT_CLOUD_CONFIG.
  2. Step 2: Merge streams -- Publishes both green and FHV ride streams to a unified RIDES_TOPIC using sink_kafka in append mode.
  3. Step 3: Read unified topic -- Reads the merged RIDES_TOPIC back from Kafka and parses the CSV values using parse_rides with ALL_RIDE_SCHEMA (PULocationID and DOLocationID, both StringType). Null rows are dropped.
  4. Step 4: Aggregate -- Applies op_groupby on PULocationID to count rides per pickup location, sorted in descending order.
  5. Step 5: Sink to console -- Writes the aggregated DataFrame to the console in complete output mode.

Key functions:

  • read_from_kafka(consume_topic: str) -- Creates a Spark Streaming DataFrame with SASL_SSL options including kafka.security.protocol, kafka.sasl.mechanism, and kafka.sasl.jaas.config using PLAIN authentication with credentials from CONFLUENT_CLOUD_CONFIG.
  • parse_rides(df, schema) -- Casts key/value to strings, splits CSV values, expands to typed columns per schema, and drops null rows.
  • sink_console(df, output_mode, processing_time) -- Writes streaming DataFrame to console with awaitTermination() (blocking, unlike the Redpanda version).
  • sink_kafka(df, topic, output_mode) -- Writes streaming DataFrame to a Confluent Cloud Kafka topic with full SASL_SSL configuration.
  • op_groupby(df, column_names) -- Groups by specified columns and counts.

Usage

Use this implementation when you need to build a PySpark Structured Streaming pipeline that connects to Confluent Cloud Kafka (with SASL_SSL authentication), merges multiple taxi ride streams, and performs aggregations. It demonstrates how to configure Spark's Kafka connector for Confluent Cloud, which requires additional security options compared to local Redpanda/Kafka setups.

Code Reference

Source Location

Signature

def read_from_kafka(consume_topic: str):
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", CONFLUENT_CLOUD_CONFIG['bootstrap.servers']) \
        .option("subscribe", consume_topic) \
        .option("startingOffsets", "earliest") \
        .option("checkpointLocation", "checkpoint") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{CONFLUENT_CLOUD_CONFIG["sasl.username"]}" '
                f'password="{CONFLUENT_CLOUD_CONFIG["sasl.password"]}";') \
        .option("failOnDataLoss", False) \
        .load()
    return df_stream


def parse_rides(df, schema):
    assert df.isStreaming is True, "DataFrame doesn't receive streaming data"
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    col = F.split(df['value'], ', ')
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    df = df.na.drop()
    return df.select([field.name for field in schema])


def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    ...

def sink_kafka(df, topic, output_mode: str = 'complete'):
    ...

def op_groupby(df, column_names):
    df_aggregation = df.groupBy(column_names).count()
    return df_aggregation

Import

from streaming_confluent import read_from_kafka, parse_rides, sink_console, sink_kafka, op_groupby

I/O Contract

Inputs

Name Type Required Description
consume_topic str Yes Confluent Cloud Kafka topic name to subscribe to and read streaming data from.
df pyspark.sql.DataFrame Yes A Spark Streaming DataFrame (for parse, sink, and aggregation functions).
schema pyspark.sql.types.StructType Yes Schema for parsing CSV values. Default is ALL_RIDE_SCHEMA with 2 fields: PULocationID (StringType) and DOLocationID (StringType).
output_mode str No Spark output mode for sinks: complete, append, or update. Defaults to complete.
processing_time str No Trigger interval for console sink. Defaults to 5 seconds.
topic str Yes (for sink_kafka) Confluent Cloud Kafka topic to write streaming results to.
column_names list Yes (for op_groupby) Column names to group by for aggregation.

Outputs

Name Type Description
df_stream pyspark.sql.DataFrame Streaming DataFrame returned by read_from_kafka with raw Kafka message columns and SASL_SSL authentication configured.
df_parsed pyspark.sql.DataFrame Streaming DataFrame returned by parse_rides with columns PULocationID and DOLocationID (both StringType), with null rows dropped.
query pyspark.sql.streaming.StreamingQuery Streaming query handle returned by sink_console and sink_kafka. Note: sink_console calls awaitTermination() and blocks.
df_aggregation pyspark.sql.DataFrame Aggregated DataFrame returned by op_groupby with a count column.

Usage Examples

Basic Usage

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import CONFLUENT_CLOUD_CONFIG, GREEN_TAXI_TOPIC, FHV_TAXI_TOPIC, RIDES_TOPIC, ALL_RIDE_SCHEMA

spark = SparkSession.builder.appName('streaming-homework').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Step 1: Consume from green and FHV topics
df_green_rides = read_from_kafka(consume_topic=GREEN_TAXI_TOPIC)
df_fhv_rides = read_from_kafka(consume_topic=FHV_TAXI_TOPIC)

# Step 2: Merge into unified RIDES_TOPIC
kafka_sink_green_query = sink_kafka(df=df_green_rides, topic=RIDES_TOPIC, output_mode='append')
kafka_sink_fhv_query = sink_kafka(df=df_fhv_rides, topic=RIDES_TOPIC, output_mode='append')

# Step 3: Read and parse unified topic
df_all_rides = read_from_kafka(consume_topic=RIDES_TOPIC)
df_all_rides = parse_rides(df_all_rides, ALL_RIDE_SCHEMA)

# Step 4: Aggregate by PULocationID
df_pu_location_count = op_groupby(df_all_rides, ['PULocationID'])
df_pu_location_count = df_pu_location_count.sort(F.col('count').desc())

# Step 5: Sink to console
console_sink = sink_console(df_pu_location_count, output_mode='complete')

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment