Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Workflow:DataTalksClub Data engineering zoomcamp Kafka Stream Processing

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Stream_Processing, Real_Time_Analytics
Last Updated 2026-02-09 07:00 GMT

Overview

End-to-end stream processing pipeline using Apache Kafka for message transport and PySpark Structured Streaming for real-time aggregation of NYC taxi ride data.

Description

This workflow demonstrates real-time data processing using a Kafka-based streaming architecture. A Python producer reads taxi ride records from CSV files and publishes them as JSON messages to a Kafka topic. A PySpark Structured Streaming consumer subscribes to the topic, parses the incoming messages, and performs both simple and windowed aggregations (trip counts by vendor, windowed trip counts). Results are written to the console for debugging or back to a Kafka topic for downstream consumption. The architecture also supports Avro serialization with Schema Registry and alternative brokers like Redpanda.

Usage

Execute this workflow when you need to process taxi ride data in real-time or near-real-time as it arrives. Use it when you have a running Kafka (or Redpanda) cluster and need to compute streaming aggregations such as trip counts per vendor within sliding time windows. This is appropriate when batch processing latency is unacceptable and you need continuous, incremental results.

Execution Steps

Step 1: Infrastructure Setup

Deploy Apache Kafka (or Redpanda as a drop-in replacement) and its supporting services using Docker Compose. This includes the broker, Zookeeper (for Kafka), and optionally the Confluent Schema Registry for Avro serialization support.

Key considerations:

  • Kafka requires Zookeeper for cluster coordination (Redpanda does not)
  • Schema Registry is needed only when using Avro serialization
  • Broker configuration must expose both internal (Docker) and external (host) listeners
  • Docker Compose networking allows producers and consumers to connect via container hostnames

Step 2: Data Model Definition

Define the ride data model class that maps CSV columns to structured fields. The model includes vendor ID, pickup/dropoff timestamps, location IDs, passenger count, trip distance, rate code, and fare components. Serialization methods convert the model to JSON for Kafka transport.

Key considerations:

  • The data model must support both serialization (to JSON/Avro) and deserialization
  • Key field (pickup location ID) is used for Kafka message partitioning
  • Date fields require special handling in JSON serialization (default=str)

Step 3: Producer Implementation

Build a Kafka producer that reads taxi ride records from a CSV file, constructs Ride objects, and publishes each record as a JSON-serialized message to a Kafka topic. The producer configures bootstrap servers, key serializer (string encoding), and value serializer (JSON encoding).

Key considerations:

  • CSV reader skips the header row and maps columns positionally to the Ride constructor
  • Key serializer encodes the pickup location ID as the partition key
  • Value serializer converts the Ride object to JSON using its __dict__ attribute
  • Error handling catches KafkaTimeoutError for broker connectivity issues

Step 4: Consumer Implementation

Build a Kafka consumer that subscribes to the ride topic and deserializes JSON messages back into Ride objects. The consumer polls for messages in a loop, printing each record for verification. It handles graceful shutdown on keyboard interrupt.

Key considerations:

  • Consumer group ID ensures coordinated consumption across multiple instances
  • auto_offset_reset='earliest' starts from the beginning of the topic on first run
  • enable_auto_commit=True automatically tracks consumed offsets
  • Poll timeout of 1 second allows keyboard interrupt handling

Step 5: Stream Processing with PySpark

Build a PySpark Structured Streaming application that reads from the Kafka topic as a streaming DataFrame, parses the CSV-formatted message values into typed columns, and applies aggregation operations. Two aggregation patterns are demonstrated: simple groupBy (count by vendor) and windowed groupBy (count by vendor within sliding time windows).

Key considerations:

  • readStream with Kafka format connects Spark to the Kafka topic
  • Message values are parsed by splitting on comma delimiter and casting to schema types
  • Window aggregation uses 10-minute windows with 5-minute slide duration
  • Results can be written to console (debugging), memory (testing), or back to Kafka (production)

Step 6: Output Sink Configuration

Configure where streaming results are written. Console sink provides debugging output. Kafka sink publishes aggregated results back to a separate Kafka topic for downstream consumers. Both sinks support configurable output modes (complete for aggregations, append for raw events).

Key considerations:

  • Complete output mode rewrites all aggregation results each trigger
  • Append output mode only emits new rows (used for non-aggregated streams)
  • Checkpoint location ensures exactly-once processing semantics across restarts
  • awaitAnyTermination blocks until all streaming queries complete

Execution Diagram

GitHub URL

Workflow Repository