Implementation:DataTalksClub Data engineering zoomcamp PySpark WriteStream Sink


Page Metadata
Knowledge Sources: repo DataTalksClub/data-engineering-zoomcamp
Domains: Data_Engineering, Stream_Processing
Last Updated: 2026-02-09 14:00 GMT

Overview

Concrete API documentation for the PySpark Structured Streaming sink functions. They write processed stream results to the console for debugging and to Kafka topics for production consumption. A companion DataFrame preparation function reshapes multi-column aggregation results into Kafka's key-value schema.

Description

This implementation provides three functions for writing streaming results to output destinations:

  • sink_console(df, output_mode, processing_time): Writes the streaming DataFrame to the console. output_mode defaults to 'complete', which re-emits the full result table on every trigger and is valid only for aggregation queries; pass 'append' for non-aggregated streams. The trigger defaults to processingTime='5 seconds'. The truncate=False option prevents long cell values from being cut off in the printed table. Returns a StreamingQuery handle for monitoring and termination.
  • sink_kafka(df, topic): Writes the streaming DataFrame to a Kafka topic for downstream consumption. Uses the bootstrap server list localhost:9092,broker:29092, writes in complete output mode, and persists progress to a checkpoint directory. The input DataFrame must already conform to Kafka's expected schema, with key and value string columns.
  • prepare_df_to_kafka_sink(df, value_columns, key_column): Transforms a multi-column aggregation DataFrame into Kafka's two-column schema. It concatenates the specified value columns, separated by a comma and a space, into a single string column named value. If key_column is provided, that column is renamed to key and cast to string. Returns a DataFrame containing only the key and value columns.
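The two sink functions described above map onto Spark's DataStreamWriter API roughly as follows. This is a minimal sketch, not the repository's code: the `_sketch` function names are illustrative, and the `checkpoint_dir` parameter with its default path is an assumption, since this page only states that progress is persisted to a checkpoint directory.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported for type hints only
    from pyspark.sql import DataFrame
    from pyspark.sql.streaming import StreamingQuery

# Bootstrap server list quoted in the description above.
BOOTSTRAP_SERVERS = "localhost:9092,broker:29092"


def sink_console_sketch(df: "DataFrame", output_mode: str = "complete",
                        processing_time: str = "5 seconds") -> "StreamingQuery":
    """Print each micro-batch of a streaming DataFrame to stdout."""
    return (
        df.writeStream
        .outputMode(output_mode)
        .trigger(processingTime=processing_time)
        .format("console")
        .option("truncate", False)  # do not cut off long cell values
        .start()
    )


def sink_kafka_sketch(df: "DataFrame", topic: str,
                      checkpoint_dir: str = "checkpoint") -> "StreamingQuery":
    """Write a streaming DataFrame with key/value columns to a Kafka topic.

    checkpoint_dir is a hypothetical parameter; the actual path is not
    given on this page.
    """
    return (
        df.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("complete")
        .start()
    )
```

Running the Kafka variant against a real cluster also requires the Spark Kafka connector package to be available to the Spark session.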

Usage

Use this implementation to:

  • Debug streaming pipelines by printing intermediate and final results to the console.
  • Publish aggregated streaming results (e.g., windowed ride counts per vendor) to a Kafka topic for downstream services.
  • Transform multi-column DataFrames into Kafka-compatible key-value format before writing to a Kafka sink.
  • Control the latency-throughput tradeoff by adjusting the trigger processing time.

Code Reference

Source Location

File: 07-streaming/python/streams-example/pyspark/streaming.py
Lines: L35-74 (sink_console at L35, sink_kafka at L56, prepare_df_to_kafka_sink at L67)
Repository: DataTalksClub/data-engineering-zoomcamp

Signature

def sink_console(df: DataFrame, output_mode: str = 'complete',
                 processing_time: str = '5 seconds') -> StreamingQuery: ...

def sink_kafka(df: DataFrame, topic: str) -> StreamingQuery: ...

def prepare_df_to_kafka_sink(df: DataFrame, value_columns: List[str],
                              key_column: Optional[str] = None) -> DataFrame: ...

Import

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import TOPIC_WINDOWED_VENDOR_ID_COUNT

I/O Contract

Inputs

Parameter | Type | Description
df (sink_console) | Streaming DataFrame | Any streaming DataFrame to be printed to the console
output_mode | str | One of 'complete', 'append', or 'update'. Default: 'complete'
processing_time | str | Trigger interval as a duration string. Default: '5 seconds'
df (sink_kafka) | Streaming DataFrame | A streaming DataFrame with key (string) and value (string) columns
topic | str | Kafka topic name to write results to (e.g., 'vendor_counts_windowed')
df (prepare) | DataFrame | A multi-column DataFrame to be transformed for Kafka
value_columns | List[str] | Column names to concatenate into the value field
key_column | str or None | Column name to use as the Kafka message key. If None, no key is set.
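To make the value_columns and key_column contract concrete, the following plain-Python model shows what the preparation step does to a single row. It is only an illustration for simple scalar values, not the PySpark implementation: `to_kafka_record` is a hypothetical name, and skipping null values is an assumption modelled on Spark's `concat_ws`, which is one common way to perform such a join.

```python
from typing import Any, Dict, List, Optional


def to_kafka_record(row: Dict[str, Any], value_columns: List[str],
                    key_column: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Plain-Python model of the Kafka preparation step for one row."""
    # Join the selected columns with ", ", skipping nulls (assumed behaviour).
    value = ", ".join(
        str(row[col]) for col in value_columns if row[col] is not None
    )
    # When a key column is given, its value becomes the string-typed key.
    if key_column is None or row[key_column] is None:
        key = None
    else:
        key = str(row[key_column])
    return {"key": key, "value": value}


record = to_kafka_record(
    {"vendor_id": 1, "count": 42},
    value_columns=["vendor_id", "count"],
    key_column="vendor_id",
)
print(record)  # {'key': '1', 'value': '1, 42'}
```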

Outputs

Function | Output Type | Description
sink_console | StreamingQuery | A running query handle; results are printed to stdout every processing_time interval
sink_kafka | StreamingQuery | A running query handle; results are written to the specified Kafka topic with checkpointing
prepare_df_to_kafka_sink | DataFrame | A DataFrame with two columns: key (string) and value (string, comma-separated concatenation)
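Because both sinks return a StreamingQuery handle, the caller can monitor and stop the query. The sketch below shows one possible pattern, assuming a handle as returned by `sink_console` or `sink_kafka`; `summarize_progress` and `run_for` are hypothetical helpers, and the progress keys read here (`batchId`, `numInputRows`) are accessed defensively because the report's contents can vary between Spark versions.

```python
from typing import Any, Dict, Optional


def summarize_progress(progress: Optional[Dict[str, Any]]) -> str:
    """Format the most recent progress report of a streaming query."""
    if progress is None:  # no micro-batch has completed yet
        return "no progress reported yet"
    return "batch {} processed {} input rows".format(
        progress.get("batchId"), progress.get("numInputRows")
    )


def run_for(query, seconds: float) -> bool:
    """Let a query run for at most `seconds`, then stop it if still active.

    Returns True if the query terminated on its own within the timeout.
    """
    finished = query.awaitTermination(seconds)  # False when the timeout expires
    print(summarize_progress(query.lastProgress))
    if query.isActive:
        query.stop()
    return finished
```

A bounded run like this is handy when debugging with the console sink, where an unbounded `awaitTermination()` would block the script indefinitely.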

Usage Examples

Writing parsed ride data to the console for debugging:

from streaming import read_from_kafka, parse_ride_from_kafka_message, sink_console
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV

# Read and parse the stream
df_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_rides = parse_ride_from_kafka_message(df_stream, RIDE_SCHEMA)

# Write to console in append mode (no aggregation)
query = sink_console(df_rides, output_mode='append')
query.awaitTermination()
# Console output every 5 seconds:
# +----------+--------------------+---------------------+---------------+-------------+------------+------------+
# | vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|
# +----------+--------------------+---------------------+---------------+-------------+------------+------------+
# |         1| 2021-01-01 00:15:56|  2021-01-01 00:31:07|              2|          4.5|           1|       18.66|
# ...

Writing windowed aggregation results to a Kafka topic:

from streaming import (
    read_from_kafka, parse_ride_from_kafka_message,
    op_windowed_groupby, prepare_df_to_kafka_sink, sink_kafka
)
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT

# Build the streaming pipeline
df_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_rides = parse_ride_from_kafka_message(df_stream, RIDE_SCHEMA)

# Windowed aggregation: count rides per vendor in 10-min windows sliding every 5 min
df_windowed = op_windowed_groupby(
    df_rides,
    window_duration='10 minutes',
    slide_duration='5 minutes'
)

# Prepare the aggregation result for Kafka's key-value schema
df_kafka_ready = prepare_df_to_kafka_sink(
    df=df_windowed,
    value_columns=['count'],
    key_column='vendor_id'
)
# Resulting schema: key (string), value (string)
# Example row: key="1", value="42"

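# Write to Kafka topic
kafka_query = sink_kafka(
    df=df_kafka_ready,
    topic=TOPIC_WINDOWED_VENDOR_ID_COUNT  # 'vendor_counts_windowed'
)
# Block until the query stops; otherwise the script exits right after starting it
kafka_query.awaitTermination()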
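A downstream service reading this topic receives the value as a single string. A minimal sketch of how a consumer might split it back into named fields is shown below; `parse_kafka_value` is a hypothetical helper, and it assumes no field itself contains the ", " separator.

```python
from typing import Dict, List


def parse_kafka_value(value: str, columns: List[str]) -> Dict[str, str]:
    """Split a ', '-joined value string back into named fields."""
    parts = value.split(", ")
    if len(parts) != len(columns):
        raise ValueError(
            f"expected {len(columns)} fields, got {len(parts)}: {value!r}"
        )
    return dict(zip(columns, parts))


print(parse_kafka_value("42", ["count"]))  # {'count': '42'}
```

The parsed fields remain strings, so the consumer still has to convert numeric columns such as count itself.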

Running multiple sinks simultaneously:

from pyspark.sql import SparkSession
from streaming import (
    read_from_kafka, parse_ride_from_kafka_message,
    op_windowed_groupby, prepare_df_to_kafka_sink,
    sink_console, sink_kafka
)
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT

spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

df_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
df_rides = parse_ride_from_kafka_message(df_stream, RIDE_SCHEMA)

# Sink 1: Raw rides to console (append mode)
sink_console(df_rides, output_mode='append')

# Sink 2: Aggregated counts to console (complete mode)
df_vendor_counts = df_rides.groupBy('vendor_id').count()
sink_console(df_vendor_counts)

# Sink 3: Windowed counts to Kafka topic
df_windowed = op_windowed_groupby(df_rides, '10 minutes', '5 minutes')
df_kafka_ready = prepare_df_to_kafka_sink(df_windowed, ['count'], 'vendor_id')
sink_kafka(df_kafka_ready, TOPIC_WINDOWED_VENDOR_ID_COUNT)

# Block until any one of the streaming queries terminates
spark.streams.awaitAnyTermination()
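When several sinks run in one session, it can help to shut the remaining queries down explicitly once the blocking call returns. The following is a sketch of one way to do that through the session's query manager; `stop_all_queries` is a hypothetical helper, not part of the source file.

```python
from typing import List


def stop_all_queries(spark) -> List[str]:
    """Stop every active streaming query and return the ids that were stopped."""
    stopped = []
    for query in spark.streams.active:  # currently running StreamingQuery objects
        query.stop()
        stopped.append(query.id)
    return stopped


# Possible usage with a live session (left commented out because it needs Spark):
# try:
#     spark.streams.awaitAnyTermination()
# finally:
#     stop_all_queries(spark)
```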
