Implementation:DataTalksClub Data engineering zoomcamp PySpark WriteStream Sink
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp |
| Domains | Data_Engineering, Stream_Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete API documentation for the PySpark Structured Streaming sink functions that direct processed stream results to console output for debugging and to Kafka topics for production consumption, including a DataFrame preparation function that transforms multi-column aggregation results into Kafka's key-value schema.
Description
This implementation provides three functions for writing streaming results to output destinations:
sink_console(df, output_mode, processing_time): Writes the streaming DataFrame to the console. Configured with output_mode='complete' by default (required for aggregation queries, where all result rows must be emitted on each trigger) and a processingTime='5 seconds' trigger interval. The truncate=False option ensures wide columns are fully displayed. Returns a StreamingQuery handle for monitoring and termination.
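A minimal sketch of what this sink amounts to, assuming the standard DataFrame.writeStream API (the exact body in streaming.py may differ):
def sink_console(df, output_mode='complete', processing_time='5 seconds'):
    # 'complete' re-emits the full result table on every trigger, as
    # aggregation queries require; truncate=False keeps wide columns readable
    return (df.writeStream
            .outputMode(output_mode)
            .trigger(processingTime=processing_time)
            .format('console')
            .option('truncate', False)
            .start())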
sink_kafka(df, topic): Writes the streaming DataFrame to a Kafka topic for downstream consumption. Connects to the brokers at localhost:9092,broker:29092, uses complete output mode, and persists progress to a checkpoint directory. The input DataFrame must already conform to Kafka's expected schema with key and value string columns.
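A comparable sketch of the Kafka sink; the broker list comes from the description above, while the checkpoint path shown is a placeholder, not the repo's actual value:
def sink_kafka(df, topic):
    # df must already carry string 'key' and 'value' columns
    return (df.writeStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092,broker:29092')
            .outputMode('complete')
            .option('topic', topic)
            # checkpointing lets the query resume after restarts;
            # 'checkpoint' is a placeholder directory name
            .option('checkpointLocation', 'checkpoint')
            .start())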
prepare_df_to_kafka_sink(df, value_columns, key_column): Transforms a multi-column aggregation DataFrame into Kafka's required two-column schema. It concatenates the specified value columns with a comma-space separator into a single value string column. If a key_column is provided, it renames that column to key and casts it to a string. Returns a DataFrame with only the key and value columns.
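A sketch reconstructing the described transformation with standard pyspark.sql.functions calls (again, the repo's exact implementation may differ):
import pyspark.sql.functions as F

def prepare_df_to_kafka_sink(df, value_columns, key_column=None):
    # Join the value columns into one comma-space-separated string
    df = df.withColumn('value', F.concat_ws(', ', *value_columns))
    if key_column:
        # Rename the key column and cast it to string for Kafka
        df = df.withColumnRenamed(key_column, 'key')
        df = df.withColumn('key', df.key.cast('string'))
    return df.select(['key', 'value'])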
Usage
Use this implementation to:
- Debug streaming pipelines by printing intermediate and final results to the console.
- Publish aggregated streaming results (e.g., windowed ride counts per vendor) to a Kafka topic for downstream services.
- Transform multi-column DataFrames into Kafka-compatible key-value format before writing to a Kafka sink.
- Control the latency-throughput tradeoff by adjusting the trigger processing time (see the sketch below).
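A shorter trigger interval lowers end-to-end latency but produces smaller, more frequent micro-batches; a longer one batches more rows per write. A hypothetical tuning, reusing the df_rides stream from the examples below:
# Hypothetical: same sink, two different trigger intervals
fast_query = sink_console(df_rides, output_mode='append', processing_time='1 second')
slow_query = sink_console(df_rides, output_mode='append', processing_time='60 seconds')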
Code Reference
Source Location
| Field | Value |
|---|---|
| File | 07-streaming/python/streams-example/pyspark/streaming.py |
| Lines | L35-74 (sink_console at L35, sink_kafka at L56, prepare_df_to_kafka_sink at L67) |
| Repository | DataTalksClub/data-engineering-zoomcamp |
Signature
def sink_console(df: DataFrame, output_mode: str = 'complete',
                 processing_time: str = '5 seconds') -> StreamingQuery: ...
def sink_kafka(df: DataFrame, topic: str) -> StreamingQuery: ...
def prepare_df_to_kafka_sink(df: DataFrame, value_columns: List[str],
                             key_column: str = None) -> DataFrame: ...
Import
from typing import List
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import StreamingQuery
import pyspark.sql.functions as F
from settings import TOPIC_WINDOWED_VENDOR_ID_COUNT
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| df (sink_console) | Streaming DataFrame | Any streaming DataFrame to be printed to the console |
| output_mode | str | One of 'complete', 'append', or 'update'. Default: 'complete' |
| processing_time | str | Trigger interval as a duration string. Default: '5 seconds' |
| df (sink_kafka) | Streaming DataFrame | A streaming DataFrame with key (string) and value (string) columns |
| topic | str | Kafka topic name to write results to (e.g., 'vendor_counts_windowed') |
| df (prepare) | DataFrame | A multi-column DataFrame to be transformed for Kafka |
| value_columns | List[str] | Column names to concatenate into the value field |
| key_column | str or None | Column name to use as the Kafka message key. If None, no key is set. |
Outputs
| Function | Output Type | Description |
|---|---|---|
| sink_console | StreamingQuery | A running query handle; results are printed to stdout every processing_time interval |
| sink_kafka | StreamingQuery | A running query handle; results are written to the specified Kafka topic with checkpointing |
| prepare_df_to_kafka_sink | DataFrame | A DataFrame with two columns: key (string) and value (string, comma-separated concatenation) |
Usage Examples
Writing parsed ride data to the console for debugging:
from streaming import read_from_kafka, parse_ride_from_kafka_message, sink_console
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV
# Read and parse the stream
df_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_rides = parse_ride_from_kafka_message(df_stream, RIDE_SCHEMA)
# Write to console in append mode (no aggregation)
query = sink_console(df_rides, output_mode='append')
query.awaitTermination()
# Console output every 5 seconds:
# +----------+--------------------+---------------------+---------------+-------------+------------+------------+
# | vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|
# +----------+--------------------+---------------------+---------------+-------------+------------+------------+
# | 1| 2021-01-01 00:15:56| 2021-01-01 00:31:07| 2| 4.5| 1| 18.66|
# ...
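Note that output_mode='append' works here only because df_rides carries no aggregation; Spark rejects append mode for aggregated streaming queries unless a watermark is defined, which is why the aggregation sinks below rely on the 'complete' default.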
Writing windowed aggregation results to a Kafka topic:
from streaming import (
read_from_kafka, parse_ride_from_kafka_message,
op_windowed_groupby, prepare_df_to_kafka_sink, sink_kafka
)
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT
# Build the streaming pipeline
df_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_rides = parse_ride_from_kafka_message(df_stream, RIDE_SCHEMA)
# Windowed aggregation: count rides per vendor in 10-min windows sliding every 5 min
df_windowed = op_windowed_groupby(
df_rides,
window_duration='10 minutes',
slide_duration='5 minutes'
)
# Prepare the aggregation result for Kafka's key-value schema
df_kafka_ready = prepare_df_to_kafka_sink(
df=df_windowed,
value_columns=['count'],
key_column='vendor_id'
)
# Resulting schema: key (string), value (string)
# Example row: key="1", value="42"
# Write to Kafka topic
kafka_query = sink_kafka(
df=df_kafka_ready,
topic=TOPIC_WINDOWED_VENDOR_ID_COUNT # 'vendor_counts_windowed'
)
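Like the console sink, sink_kafka returns a StreamingQuery; blocking the driver on the handle keeps the pipeline running:
kafka_query.awaitTermination()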
Running multiple sinks simultaneously:
from pyspark.sql import SparkSession
from streaming import (
read_from_kafka, parse_ride_from_kafka_message,
op_windowed_groupby, prepare_df_to_kafka_sink,
sink_console, sink_kafka
)
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT
spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
df_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_rides = parse_ride_from_kafka_message(df_stream, RIDE_SCHEMA)
# Sink 1: Raw rides to console (append mode)
sink_console(df_rides, output_mode='append')
# Sink 2: Aggregated counts to console (complete mode)
df_vendor_counts = df_rides.groupBy('vendor_id').count()
sink_console(df_vendor_counts)
# Sink 3: Windowed counts to Kafka topic
df_windowed = op_windowed_groupby(df_rides, '10 minutes', '5 minutes')
df_kafka_ready = prepare_df_to_kafka_sink(df_windowed, ['count'], 'vendor_id')
sink_kafka(df_kafka_ready, TOPIC_WINDOWED_VENDOR_ID_COUNT)
# Wait for all streaming queries to terminate
spark.streams.awaitAnyTermination()
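Each sink_* call above returns a StreamingQuery even though the handles are discarded; the running queries stay reachable through spark.streams.active, which is handy for monitoring or stopping an individual sink:
# Inspect every running sink via the standard StreamingQuery API
for query in spark.streams.active:
    print(query.status)        # current trigger state of this sink
    print(query.lastProgress)  # metrics for its most recent micro-batch
    # query.stop() would gracefully terminate just this one sink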
Related Pages
- Principle:DataTalksClub_Data_engineering_zoomcamp_Stream_Output_Sink
- Implementation:DataTalksClub_Data_engineering_zoomcamp_PySpark_Kafka_ReadStream
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Kafka_Docker_Compose_Setup
- Environment:DataTalksClub_Data_engineering_zoomcamp_Kafka_Confluent_Environment