Principle: Structured Stream Processing (DataTalksClub Data Engineering Zoomcamp)
| Page Metadata | |
|---|---|
| Knowledge Sources | DataTalksClub/data-engineering-zoomcamp (07-streaming) |
| Domains | Data_Engineering, Stream_Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Continuous stream processing using micro-batch execution enables a streaming DataFrame to read from a message broker, parse messages according to a schema, and apply SQL-like transformations on live data.
Description
Structured stream processing bridges the gap between batch and real-time analytics by treating a continuous data stream as an unbounded table. Rather than processing one message at a time, the engine collects messages into micro-batches and applies the same DataFrame or SQL operations used in batch processing.
The key components of this pattern are:
- Streaming DataFrame: A DataFrame backed by a continuously updating data source. It exposes the same API as a static DataFrame (select, filter, groupBy, join) but operates on data that arrives incrementally.
- Source connector: The streaming DataFrame connects to a message broker (such as Kafka) using a format-specific reader. Configuration includes broker addresses, topic subscriptions, and starting offsets; the checkpoint location is typically configured on the output sink rather than on the source.
- Schema-based parsing: Raw messages from the broker arrive as key-value byte pairs. The parsing step casts these bytes to strings, splits the value by a delimiter, and maps each element to a named, typed column according to a predefined schema.
- Windowed aggregation: Streaming data is grouped into time-based windows (tumbling or sliding). A sliding window with a 10-minute duration and a 5-minute slide creates overlapping windows that capture events in rolling intervals. This enables time-series analytics like counting events per vendor per time window.
- Checkpoint management: The engine periodically saves its progress (which offsets have been processed) to a checkpoint directory. This allows the pipeline to recover from failures without reprocessing the entire topic.
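The schema-based parsing step above can be sketched in plain Python. The field names and delimiter below are hypothetical stand-ins for illustration; a real pipeline would express the same casts through the engine's DataFrame API.

```python
# Minimal sketch of schema-based parsing: raw broker messages arrive as
# key/value byte pairs; the value is a delimited string that is split and
# cast element-by-element according to a predefined schema.

# Hypothetical schema: (column name, cast function) per field.
SCHEMA = [
    ("vendor_id", int),
    ("pickup_datetime", str),
    ("passenger_count", int),
    ("total_amount", float),
]

def parse_message(value: bytes, delimiter: str = ", ") -> dict:
    """Cast raw bytes to a string, split on the delimiter, and map each
    element to a named, typed column."""
    fields = value.decode("utf-8").split(delimiter)
    return {name: cast(raw) for (name, cast), raw in zip(SCHEMA, fields)}

row = parse_message(b"1, 2019-01-15 10:05:00, 2, 17.5")
# row == {"vendor_id": 1, "pickup_datetime": "2019-01-15 10:05:00",
#         "passenger_count": 2, "total_amount": 17.5}
```

Because the schema lives in one data structure, adding a field means appending one tuple rather than touching the parsing code.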
Usage
Use this principle when:
- You need to run aggregations, joins, or transformations on data as it arrives, rather than waiting for a full batch.
- You want to reuse familiar DataFrame or SQL syntax for real-time analytics.
- You need windowed aggregations (tumbling, sliding, session windows) over time-series event data.
- You require fault-tolerant processing with exactly-once semantics via checkpoint recovery (end-to-end, this also assumes a replayable source and an idempotent or transactional sink).
Theoretical Basis
Structured stream processing follows a read-parse-transform-output pipeline:
```
FUNCTION process_stream(consume_topic, schema):
    -- Step 1: Connect to the message broker
    raw_stream = streaming_engine
        .readStream
        .format("message_broker")
        .option("bootstrap.servers", "broker:9092")
        .option("subscribe", consume_topic)
        .option("startingOffsets", "earliest")
        .load()

    -- Step 2: Parse raw bytes into typed columns
    parsed_stream = raw_stream
        .cast(key AS STRING, value AS STRING)
        .split(value, delimiter=", ")
    FOR EACH index, field IN schema:
        parsed_stream.add_column(
            name  = field.name,
            value = split_array[index].cast(field.type)
        )

    -- Step 3: Apply windowed aggregation
    windowed_counts = parsed_stream
        .groupBy(
            window(
                time_column     = "pickup_datetime",
                window_duration = "10 minutes",
                slide_duration  = "5 minutes"
            ),
            "vendor_id"
        )
        .count()

    -- Step 4: Write results to the output sink; the checkpoint location
    -- is supplied here so the engine can track committed progress
    RETURN windowed_counts
        .writeStream
        .option("checkpointLocation", "checkpoint/")
        .start()
```
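Step 3's sliding-window count can be illustrated outside the engine. The sketch below (plain Python, with made-up timestamps and vendor IDs) assigns each event to every 10-minute window, sliding by 5 minutes, that contains it, then counts events per (window, vendor) pair:

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # window duration D
SLIDE = timedelta(minutes=5)    # slide interval S

def windows_for(ts: datetime):
    """Yield the start of every sliding window containing ts.
    With D=10min and S=5min, each event falls into D/S = 2 windows."""
    epoch = datetime(1970, 1, 1)
    # Start of the latest window that begins at or before ts.
    start = epoch + SLIDE * ((ts - epoch) // SLIDE)
    while start > ts - WINDOW:  # window [start, start + D) still covers ts
        yield start
        start -= SLIDE

events = [
    (datetime(2019, 1, 15, 10, 3), "vendor_1"),
    (datetime(2019, 1, 15, 10, 7), "vendor_1"),
    (datetime(2019, 1, 15, 10, 7), "vendor_2"),
]

counts = Counter(
    (win, vendor) for ts, vendor in events for win in windows_for(ts)
)
# The window starting at 10:00 counts vendor_1 twice (10:03 and 10:07).
```

The streaming engine performs the same assignment incrementally as micro-batches arrive, keeping per-window partial counts as intermediate state.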
The design principles of this pattern are:
- Unified batch and streaming API: The same DataFrame operations work on both static and streaming data. This eliminates the need to learn a separate streaming API and enables code reuse.
- Schema-driven parsing: The schema is defined separately from the parsing logic. Adding or removing fields requires only a schema change, not a code change.
- Sliding window semantics: A window with duration D and slide S emits a result for every interval of length S, where each result covers a window of length D. Events near window boundaries appear in multiple windows, ensuring no data is lost at boundaries.
- Checkpoint-based recovery: By persisting offsets and intermediate state to a checkpoint directory, the engine can restart from the last committed position after a failure, providing exactly-once processing guarantees when the source is replayable and the sink is idempotent or transactional.
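The recovery idea can be reduced to a minimal offset checkpoint. The file layout and transformation below are hypothetical simplifications; real engines persist offsets and operator state together, and commit per micro-batch rather than per message.

```python
import json
import os
import tempfile

def load_offset(path: str) -> int:
    """Return the last committed offset, or 0 for a fresh start."""
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return json.load(f)["offset"]

def commit_offset(path: str, offset: int) -> None:
    """Atomically persist progress: write a temp file, then rename."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"offset": offset}, f)
    os.replace(tmp, path)

def process(topic: list, path: str) -> list:
    """Process messages after the checkpointed offset, committing as we go."""
    start = load_offset(path)
    out = []
    for i, msg in enumerate(topic[start:], start=start):
        out.append(msg.upper())      # stand-in transformation
        commit_offset(path, i + 1)   # record progress after each message
    return out

ckpt = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
topic = ["a", "b", "c"]
first = process(topic, ckpt)   # processes all three messages
again = process(topic, ckpt)   # restart: resumes past committed offset
# first == ["A", "B", "C"]; again == []
```

The atomic rename matters: a crash mid-commit leaves either the old offset or the new one on disk, never a half-written checkpoint.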