Workflow:DataExpert io Data engineer handbook Flink Kafka Streaming Pipeline

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, Stream_Processing, Apache_Flink, Apache_Kafka
Last Updated: 2026-02-09 06:00 GMT

Overview

End-to-end process for deploying a PyFlink streaming pipeline that consumes web traffic events from Kafka, enriches them with geolocation data, and sinks results to PostgreSQL.

Description

This workflow sets up and runs a real-time data streaming pipeline as part of Week 4 (Apache Flink Training) of the intermediate bootcamp. It provisions a Flink cluster (JobManager + TaskManager), PostgreSQL sink, and connects to a cloud-hosted Confluent Kafka topic producing live web traffic events. The pipeline implements two processing patterns: a pass-through enrichment job that adds IP geolocation via an external API, and a windowed aggregation job that computes 5-minute tumbling window hit counts per host. Both jobs use PyFlink's Table API with SQL DDL for source/sink definitions and JDBC connectors for PostgreSQL output.

Usage

Execute this workflow when you need to process real-time streaming data from Kafka using Apache Flink and persist results to a relational database. This is appropriate when you have completed the batch processing exercises (Weeks 1-3) and are ready to work with streaming semantics, watermarks, checkpointing, and windowed aggregations.

Execution Steps

Step 1: Configure_Credentials

Copy the example environment file and populate it with Kafka credentials, IP geolocation API key, and PostgreSQL connection settings. The credentials authenticate against a Confluent Cloud Kafka cluster and the ip2location.io geocoding service.

Key considerations:

  • Kafka credentials (key/secret) are obtained from the bootcamp portal
  • IP geocoding requires a free account at ip2location.io
  • PostgreSQL defaults to username postgres and password postgres, reached through the Docker-internal host
  • The environment file must NOT be committed to version control
  • SASL_SSL protocol is used for Kafka authentication
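
As a minimal sketch of the credential check described above, the following Python validates that the expected variables are present and derives the Kafka security settings from them. The variable names (KAFKA_KEY, KAFKA_SECRET, IP_GEO_API_KEY, POSTGRES_URL) are hypothetical placeholders, not the names from the project's example environment file, and the PLAIN SASL mechanism is an assumption based on key/secret authentication.

```python
# Hypothetical variable names; substitute the names used in the example env file.
REQUIRED_VARS = ("KAFKA_KEY", "KAFKA_SECRET", "IP_GEO_API_KEY", "POSTGRES_URL")


def missing_vars(env):
    """Return the required variables that are unset or empty in `env`."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


def kafka_sasl_options(env):
    """Build Kafka client security settings for SASL_SSL key/secret auth."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{env["KAFKA_KEY"]}" password="{env["KAFKA_SECRET"]}";'
    )
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",  # assumption: PLAIN is used with key/secret
        "sasl.jaas.config": jaas,
    }
```

Calling missing_vars(os.environ) before submitting a job gives an early, readable failure instead of an authentication error inside the running job. If the Kafka connector jar relocates the Kafka client classes, the login module class path in the JAAS string may need to be adjusted.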

Step 2: Build_And_Deploy_Infrastructure

Build the Docker image and deploy the Flink cluster using Docker Compose. The stack includes a Flink JobManager, TaskManager, and PostgreSQL database. The build process installs PyFlink, Kafka connectors, and JDBC drivers into the Flink image.

What happens:

  • Docker builds a custom Flink image with Python and connector dependencies
  • JobManager starts, including its built-in resource manager
  • TaskManager connects to the JobManager and registers its available task slots
  • PostgreSQL starts with the processed_events sink table
  • The Flink UI becomes available at localhost:8081

Step 3: Initialize_Sink_Tables

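Create the PostgreSQL sink tables that Flink will write to. The init.sql script from the Week 1-2 environment creates the processed_events table. For the aggregation job, the additional tables (processed_events_aggregated, processed_events_aggregated_source) are declared in the job's Flink DDL. A Flink JDBC table definition only maps onto a database table and does not create it in PostgreSQL, so matching tables must exist in the database before the aggregation job writes to them.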

Key considerations:

  • The processed_events table stores enriched individual events
  • Aggregation tables store windowed hit counts by host and referrer
  • Flink reaches these tables through its JDBC connector, so the column names and types in the Flink DDL must match the PostgreSQL schema
  • Ensure the PostgreSQL container is fully started before running init SQL
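
The pairing between a PostgreSQL table and its Flink-side definition might be sketched as follows. The column list is illustrative only (the actual schema lives in init.sql and the job scripts), and the helper names are hypothetical. The connector options used ('connector', 'url', 'table-name', 'username', 'password', 'driver') are the standard Flink JDBC connector options.

```python
def postgres_table_ddl(pg_table, columns):
    """DDL to run in PostgreSQL itself so the sink table exists."""
    column_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return f"CREATE TABLE IF NOT EXISTS {pg_table} (\n    {column_sql}\n)"


def jdbc_sink_ddl(flink_table, pg_table, columns, url, username, password):
    """Flink SQL DDL that maps a Flink table onto an existing PostgreSQL table."""
    column_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE {flink_table} (\n"
        f"    {column_sql}\n"
        ") WITH (\n"
        "    'connector' = 'jdbc',\n"
        f"    'url' = '{url}',\n"
        f"    'table-name' = '{pg_table}',\n"
        f"    'username' = '{username}',\n"
        f"    'password' = '{password}',\n"
        "    'driver' = 'org.postgresql.Driver'\n"
        ")"
    )


# Illustrative columns only; not taken from the project's schema.
EXAMPLE_COLUMNS = [
    ("host", "VARCHAR"),
    ("event_hour", "TIMESTAMP(3)"),
    ("num_hits", "BIGINT"),
]
```

Generating both statements from one column list keeps the two schemas in step. Values are interpolated without escaping, so this sketch is only suitable for trusted configuration values.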

Step 4: Submit_Streaming_Job

Submit the PyFlink job to the Flink cluster for execution. The job defines a Kafka source table (reading JSON-formatted web traffic events), a PostgreSQL sink table, and a transformation that enriches events with geolocation data from an external API via a User Defined Function (UDF).

What happens:

  • StreamExecutionEnvironment is configured with checkpointing every 10 seconds
  • StreamTableEnvironment provides the Table API for SQL-based stream processing
  • Kafka source table is created via SQL DDL with SASL_SSL authentication
  • GetLocation UDF is registered for IP-to-geolocation enrichment
  • INSERT INTO ... SELECT query streams enriched events to PostgreSQL
  • The job runs continuously until stopped
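
The sequence above could look roughly like the sketch below. It is not the bootcamp's job script: the function names, the get_location UDF name, and the selected columns are assumptions made for illustration. The PyFlink imports sit inside run_job so the DDL helpers can be used without a Flink installation.

```python
def kafka_source_ddl(table, topic, bootstrap_servers, group_id, columns, kafka_props):
    """Flink SQL DDL for a JSON-formatted Kafka source table."""
    column_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "properties.group.id": group_id,
        "scan.startup.mode": "latest-offset",
        "format": "json",
    }
    # Kafka client settings (e.g. SASL_SSL) are passed with a "properties." prefix.
    options.update({f"properties.{key}": value for key, value in kafka_props.items()})
    option_sql = ",\n    ".join(f"'{key}' = '{value}'" for key, value in options.items())
    return f"CREATE TABLE {table} (\n    {column_sql}\n) WITH (\n    {option_sql}\n)"


def enrich_insert_sql(sink, source):
    """INSERT INTO ... SELECT statement that calls the (assumed) get_location UDF."""
    return (
        f"INSERT INTO {sink} "
        "SELECT ip, event_time, referrer, host, url, get_location(ip) AS geodata "
        f"FROM {source}"
    )


def run_job(source_ddl, sink_ddl, insert_sql, lookup):
    """Wire the pieces together; `lookup` is a Python callable mapping ip -> str."""
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(10 * 1000)  # checkpoint interval in milliseconds
    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    t_env.create_temporary_function(
        "get_location", udf(lookup, result_type=DataTypes.STRING())
    )
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)
    t_env.execute_sql(insert_sql).wait()  # blocks while the streaming job runs
```

Because the INSERT runs as an unbounded streaming query, the final wait() call does not return until the job is cancelled or fails.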

Step 5: Submit_Aggregation_Job

Optionally submit the windowed aggregation job that computes 5-minute tumbling window statistics. This job reads from the same Kafka topic but applies WATERMARK-based event time processing and tumbling window aggregations to produce per-host and per-host-per-referrer hit counts.

What happens:

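  • The watermark is defined as the event time minus 15 seconds, so events arriving up to 15 seconds out of order are still assigned to their window
  • Tumble.over(...) with a 5-minute interval (written lit(5).minutes in PyFlink expression syntax) defines the window size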
  • Two aggregations run: host-level and host+referrer-level hit counts
  • Results are inserted into separate PostgreSQL aggregation tables
  • Parallelism is set to 3 for the aggregation job
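
To make the window and watermark arithmetic concrete, here is a simplified plain-Python model (not Flink code) of how events map to 5-minute tumbling windows and when a window can be emitted under a 15-second watermark delay. In Flink SQL the delay is typically declared with a clause of the form WATERMARK FOR ts AS ts - INTERVAL '15' SECOND, where ts stands in for the event-time column; the function names below are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)
WATERMARK_DELAY = timedelta(seconds=15)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(event_time, size=WINDOW):
    """Start of the epoch-aligned tumbling window containing event_time."""
    return event_time - (event_time - EPOCH) % size


def count_hits(events):
    """Count hits per (window start, host) for an iterable of (time, host)."""
    counts = Counter()
    for event_time, host in events:
        counts[(window_start(event_time), host)] += 1
    return counts


def window_is_complete(start, max_event_time_seen, size=WINDOW):
    """True once the watermark has passed the last timestamp in the window."""
    watermark = max_event_time_seen - WATERMARK_DELAY
    last_timestamp = start + size - timedelta(milliseconds=1)
    return watermark >= last_timestamp
```

For example, an event at 12:03:10 falls in the window starting at 12:00:00, and that window is only treated as complete once an event stamped 12:05:15 or later has been seen. Events that arrive after that point are late and, in this model, would no longer change the emitted count.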

Step 6: Verify_And_Monitor

Confirm the pipeline is operating correctly by checking the Flink UI for running jobs, triggering events from the web traffic source, and querying the PostgreSQL sink tables to verify data flow.

Key considerations:

  • Flink UI at localhost:8081 shows running jobs and task status
  • Trigger events by visiting the bootcamp web traffic generator
  • Query processed_events table to confirm rows are being inserted
  • Use make psql to connect to the containerized PostgreSQL
  • Data persists in a local postgres-data directory across container restarts
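
The check for running jobs can also be scripted against the Flink REST API, which is served on the same port as the UI. The sketch below assumes the cluster is reachable at localhost:8081 and uses the /jobs/overview endpoint; the helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_jobs_overview(base_url="http://localhost:8081"):
    """Fetch the job overview document from the Flink REST API."""
    with urlopen(f"{base_url}/jobs/overview", timeout=5) as response:
        return json.load(response)


def running_job_names(overview):
    """Return the sorted names of jobs whose state is RUNNING."""
    return sorted(
        job["name"]
        for job in overview.get("jobs", [])
        if job.get("state") == "RUNNING"
    )
```

With the cluster up, running_job_names(fetch_jobs_overview()) should list the submitted jobs. An empty list suggests that the job failed at submission or has since stopped.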
