Implementation:DataTalksClub Data engineering zoomcamp Confluent PySpark Streaming
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
PySpark Structured Streaming pipeline that connects to Confluent Cloud Kafka with SASL_SSL authentication, merges green and FHV taxi streams into a unified RIDES_TOPIC, parses the combined data, aggregates by PULocationID, and sinks the results to the console.
Description
This module implements a multi-stage PySpark Structured Streaming pipeline designed to work with Confluent Cloud Kafka (requiring SASL_SSL authentication). The pipeline performs the following steps:
- Step 1: Consume from multiple topics -- Reads streaming DataFrames from GREEN_TAXI_TOPIC and FHV_TAXI_TOPIC using read_from_kafka, which configures SASL_SSL authentication using credentials from CONFLUENT_CLOUD_CONFIG.
- Step 2: Merge streams -- Publishes both green and FHV ride streams to a unified RIDES_TOPIC using sink_kafka in append mode.
- Step 3: Read unified topic -- Reads the merged RIDES_TOPIC back from Kafka and parses the CSV values using parse_rides with ALL_RIDE_SCHEMA (PULocationID and DOLocationID, both StringType). Null rows are dropped.
- Step 4: Aggregate -- Applies op_groupby on PULocationID to count rides per pickup location, then sorts the result by count in descending order with a separate sort call.
- Step 5: Sink to console -- Writes the aggregated DataFrame to the console in complete output mode.
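The merge, count, and sort logic of steps 2, 4, and 5 can be illustrated without Spark or Kafka. The sketch below is a plain-Python batch analogue, not the streaming code itself: the helper name count_by_pickup and the sample records are made up for illustration, and the real pipeline merges by writing both streams to RIDES_TOPIC rather than by concatenating lists.

```python
from collections import Counter


def count_by_pickup(*streams):
    """Batch analogue of the pipeline: merge record lists, count per PULocationID, sort descending."""
    # Step 2 analogue: combine all input "streams" into one collection.
    merged = [ride for stream in streams for ride in stream]
    # Step 4 analogue: group by PULocationID and count.
    counts = Counter(ride["PULocationID"] for ride in merged)
    # Sort by count, highest first, as the pipeline does before the console sink.
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)


# Hypothetical sample data; location IDs are strings, matching ALL_RIDE_SCHEMA.
green_rides = [
    {"PULocationID": "74", "DOLocationID": "42"},
    {"PULocationID": "41", "DOLocationID": "74"},
    {"PULocationID": "74", "DOLocationID": "166"},
]
fhv_rides = [
    {"PULocationID": "74", "DOLocationID": "7"},
    {"PULocationID": "41", "DOLocationID": "42"},
    {"PULocationID": "7", "DOLocationID": "41"},
]

result = count_by_pickup(green_rides, fhv_rides)
print(result)
```

In complete output mode the streaming query re-emits this whole table on every trigger, which is what makes sorting the aggregated result possible.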
Key functions:
- read_from_kafka(consume_topic: str) -- Creates a Spark Streaming DataFrame with SASL_SSL options including kafka.security.protocol, kafka.sasl.mechanism, and kafka.sasl.jaas.config using PLAIN authentication with credentials from CONFLUENT_CLOUD_CONFIG.
- parse_rides(df, schema) -- Casts key and value to strings, splits each value on the ', ' delimiter, expands the parts into typed columns per schema, and drops rows that contain any null.
- sink_console(df, output_mode, processing_time) -- Writes streaming DataFrame to console with awaitTermination() (blocking, unlike the Redpanda version).
- sink_kafka(df, topic, output_mode) -- Writes streaming DataFrame to a Confluent Cloud Kafka topic with full SASL_SSL configuration.
- op_groupby(df, column_names) -- Groups by specified columns and counts.
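The row-level effect of parse_rides may be easier to see on a single record. The following is a plain-Python sketch under two assumptions: Spark runs with its non-ANSI behavior, where an out-of-range getItem returns null, and every schema field is a string. The name parse_value is hypothetical and not part of the module.

```python
def parse_value(key, value, field_names):
    """Plain-Python analogue of parse_rides for one Kafka record.

    Returns a dict of parsed fields, or None where na.drop() would discard the row.
    """
    # na.drop() runs while key and value are still columns, so a null in
    # either one removes the row before the final select.
    if key is None or value is None:
        return None
    parts = value.split(", ")  # same delimiter as F.split(df['value'], ', ')
    row = {}
    for idx, name in enumerate(field_names):
        if idx >= len(parts):
            return None  # missing field -> null column -> row dropped
        row[name] = parts[idx]
    return row


FIELDS = ["PULocationID", "DOLocationID"]

print(parse_value("k1", "74, 42", FIELDS))  # both fields present
print(parse_value("k2", "74,42", FIELDS))   # no space after the comma: one part only
print(parse_value(None, "74, 42", FIELDS))  # null key
```

Two consequences follow from the code in the Signature section: the producer must join fields with a comma followed by a space, and messages published without a key are dropped along with malformed values.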
Usage
Use this implementation when you need to build a PySpark Structured Streaming pipeline that connects to Confluent Cloud Kafka (with SASL_SSL authentication), merges multiple taxi ride streams, and performs aggregations. It demonstrates how to configure Spark's Kafka connector for Confluent Cloud, which requires additional security options compared to local Redpanda/Kafka setups.
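The additional security options can be collected in one place so that readers and writers share them. This is a minimal sketch, assuming the same three CONFLUENT_CLOUD_CONFIG keys that read_from_kafka uses; the helper name confluent_kafka_options and the example credentials are hypothetical.

```python
def confluent_kafka_options(config):
    """Build the Spark Kafka connector options for SASL_SSL with PLAIN credentials."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{config["sasl.username"]}" '
        f'password="{config["sasl.password"]}";'
    )
    return {
        "kafka.bootstrap.servers": config["bootstrap.servers"],
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


# Placeholder values only; real credentials come from CONFLUENT_CLOUD_CONFIG.
example_config = {
    "bootstrap.servers": "broker.example.invalid:9092",
    "sasl.username": "API_KEY",
    "sasl.password": "API_SECRET",
}
options = confluent_kafka_options(example_config)
print(options["kafka.sasl.jaas.config"])
```

The resulting dict could be applied with .options(**options) on a Kafka reader or writer. A local Redpanda or Kafka setup would typically need only kafka.bootstrap.servers from this set.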
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: cohorts/2023/week_6_stream_processing/streaming_confluent.py
- Lines: 1-100
Signature
def read_from_kafka(consume_topic: str):
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", CONFLUENT_CLOUD_CONFIG['bootstrap.servers']) \
        .option("subscribe", consume_topic) \
        .option("startingOffsets", "earliest") \
        .option("checkpointLocation", "checkpoint") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{CONFLUENT_CLOUD_CONFIG["sasl.username"]}" '
                f'password="{CONFLUENT_CLOUD_CONFIG["sasl.password"]}";') \
        .option("failOnDataLoss", False) \
        .load()
    return df_stream

def parse_rides(df, schema):
    assert df.isStreaming is True, "DataFrame doesn't receive streaming data"
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    col = F.split(df['value'], ', ')
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    df = df.na.drop()
    return df.select([field.name for field in schema])

def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    ...

def sink_kafka(df, topic, output_mode: str = 'complete'):
    ...

def op_groupby(df, column_names):
    df_aggregation = df.groupBy(column_names).count()
    return df_aggregation
Import
from streaming_confluent import read_from_kafka, parse_rides, sink_console, sink_kafka, op_groupby
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| consume_topic | str | Yes | Confluent Cloud Kafka topic name to subscribe to and read streaming data from. |
| df | pyspark.sql.DataFrame | Yes | A Spark Streaming DataFrame (for parse, sink, and aggregation functions). |
| schema | pyspark.sql.types.StructType | Yes | Schema for parsing CSV values. The pipeline passes ALL_RIDE_SCHEMA, which has 2 fields: PULocationID (StringType) and DOLocationID (StringType). |
| output_mode | str | No | Spark output mode for sinks: complete, append, or update. Defaults to complete. |
| processing_time | str | No | Trigger interval for console sink. Defaults to 5 seconds. |
| topic | str | Yes (for sink_kafka) | Confluent Cloud Kafka topic to write streaming results to. |
| column_names | list | Yes (for op_groupby) | Column names to group by for aggregation. |
Outputs
| Name | Type | Description |
|---|---|---|
| df_stream | pyspark.sql.DataFrame | Streaming DataFrame returned by read_from_kafka with raw Kafka message columns and SASL_SSL authentication configured. |
| df_parsed | pyspark.sql.DataFrame | Streaming DataFrame returned by parse_rides with columns PULocationID and DOLocationID (both StringType), with null rows dropped. |
| query | pyspark.sql.streaming.StreamingQuery | Streaming query handle returned by sink_console and sink_kafka. Note: sink_console calls awaitTermination() and blocks. |
| df_aggregation | pyspark.sql.DataFrame | Aggregated DataFrame returned by op_groupby with a count column. |
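The body of sink_console is elided in the Signature section, so the following is only one plausible shape for a blocking console sink, not the module's actual code. It uses standard DataStreamWriter calls (outputMode, trigger, format, start) and is exercised here with a MagicMock stand-in, since a real run needs a streaming DataFrame and a Spark session. The name sink_console_sketch is hypothetical.

```python
from unittest.mock import MagicMock


def sink_console_sketch(df, output_mode="complete", processing_time="5 seconds"):
    """Hypothetical console sink: start the query, then block until it terminates."""
    query = (
        df.writeStream
        .outputMode(output_mode)
        .trigger(processingTime=processing_time)
        .format("console")
        .start()
    )
    # awaitTermination() blocks the calling thread, so the return statement
    # is reached only after the query stops or fails.
    query.awaitTermination()
    return query


# Stand-in object that records the call chain; not a real DataFrame.
fake_df = MagicMock()
query = sink_console_sketch(fake_df, output_mode="complete")
print(fake_df.writeStream.outputMode.call_args)
```

Because the call blocks, any other sinks (such as the two sink_kafka queries) have to be started before sink_console, which is the order the usage example follows.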
Usage Examples
Basic Usage
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import CONFLUENT_CLOUD_CONFIG, GREEN_TAXI_TOPIC, FHV_TAXI_TOPIC, RIDES_TOPIC, ALL_RIDE_SCHEMA
spark = SparkSession.builder.appName('streaming-homework').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# Step 1: Consume from green and FHV topics
df_green_rides = read_from_kafka(consume_topic=GREEN_TAXI_TOPIC)
df_fhv_rides = read_from_kafka(consume_topic=FHV_TAXI_TOPIC)
# Step 2: Merge into unified RIDES_TOPIC
kafka_sink_green_query = sink_kafka(df=df_green_rides, topic=RIDES_TOPIC, output_mode='append')
kafka_sink_fhv_query = sink_kafka(df=df_fhv_rides, topic=RIDES_TOPIC, output_mode='append')
# Step 3: Read and parse unified topic
df_all_rides = read_from_kafka(consume_topic=RIDES_TOPIC)
df_all_rides = parse_rides(df_all_rides, ALL_RIDE_SCHEMA)
# Step 4: Aggregate by PULocationID
df_pu_location_count = op_groupby(df_all_rides, ['PULocationID'])
df_pu_location_count = df_pu_location_count.sort(F.col('count').desc())
# Step 5: Sink to console
console_sink = sink_console(df_pu_location_count, output_mode='complete')
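The example imports CONFLUENT_CLOUD_CONFIG from a settings module that is not shown on this page. One way to populate it without hard-coding credentials is to read them from environment variables. This is a sketch under assumptions: the function name load_confluent_config and the environment variable names are hypothetical, and only the three dictionary keys are taken from read_from_kafka.

```python
import os


def load_confluent_config(env=None):
    """Read Confluent Cloud connection settings from a mapping (default: os.environ)."""
    env = os.environ if env is None else env
    # A missing variable raises KeyError, which surfaces misconfiguration
    # before any stream is started.
    return {
        "bootstrap.servers": env["CONFLUENT_BOOTSTRAP_SERVERS"],
        "sasl.username": env["CONFLUENT_API_KEY"],
        "sasl.password": env["CONFLUENT_API_SECRET"],
    }


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {
    "CONFLUENT_BOOTSTRAP_SERVERS": "broker.example.invalid:9092",
    "CONFLUENT_API_KEY": "API_KEY",
    "CONFLUENT_API_SECRET": "API_SECRET",
}
CONFLUENT_CLOUD_CONFIG = load_confluent_config(demo_env)
print(sorted(CONFLUENT_CLOUD_CONFIG))
```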