Implementation:DataExpert io Data engineer handbook Log aggregation

From Leeroopedia


Overview

log_aggregation is the main orchestration function for the Flink windowed aggregation pipeline. It sets up the execution environment, creates a watermarked Kafka source table and two PostgreSQL sink tables, and runs two windowed aggregation queries in parallel. The queries compute per-host and per-host-per-referrer hit counts over 5-minute tumbling windows.

Type: API Doc

Source: aggregation_job.py:L80-131

Signature

def log_aggregation() -> None

Detailed Description

The log_aggregation function orchestrates the windowed aggregation pipeline through the following steps:

Step 1: Create the Execution Environment

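env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(3)        # default parallelism for the job's operators
env.enable_checkpointing(10)  # checkpoint interval, in milliseconds

Parallelism is set to 3 so that operators run as three parallel subtasks for throughput. enable_checkpointing takes its interval in milliseconds, so the job requests a checkpoint every 10 ms. That is an unusually short interval; a 10-second interval would be written as enable_checkpointing(10000).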

Step 2: Create StreamTableEnvironment

t_env = StreamTableEnvironment.create(stream_execution_environment=env)

Step 3: Create Kafka Source with Watermark

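The Kafka source table's DDL derives an event-time column from the raw event_time string and declares a watermark on it, so that Flink tolerates events arriving up to 15 seconds out of order: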

event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS'),
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '15' SECOND
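
To make the watermark rule concrete, here is a minimal pure-Python sketch of the two expressions above. It is not PyFlink code and it simplifies Flink's behavior to a single input partition. The helper names parse_event_time and watermark_after are hypothetical, and the sample timestamps are made up for illustration.

```python
from datetime import datetime, timedelta
from typing import Iterable

# Python counterpart of the Flink pattern yyyy-MM-dd'T'HH:mm:ss.SSSSSS
EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
# Mirrors INTERVAL '15' SECOND in the WATERMARK clause
MAX_OUT_OF_ORDERNESS = timedelta(seconds=15)


def parse_event_time(raw: str) -> datetime:
    """Parse a raw event_time string into a naive datetime."""
    return datetime.strptime(raw, EVENT_TIME_FORMAT)


def watermark_after(event_times: Iterable[datetime]) -> datetime:
    """Watermark once these events have been seen: max timestamp minus 15 s."""
    return max(event_times) - MAX_OUT_OF_ORDERNESS


# Illustrative input: two events straddling a 5-minute window boundary.
seen = [
    parse_event_time("2023-01-01T12:04:50.000000"),
    parse_event_time("2023-01-01T12:05:10.000000"),
]
wm = watermark_after(seen)

# Simplification: treat the 12:00-12:05 window as open until the
# watermark reaches its end.
window_end = datetime(2023, 1, 1, 12, 5, 0)
window_still_open = wm < window_end
```

In this example the newest event is at 12:05:10, so the watermark is 12:04:55 and the 12:00 window is still open: an event stamped 12:04:58 that shows up next would still be counted in it.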

Step 4: Create Two PostgreSQL Sink Tables

Two sink tables are created:

| Sink Table | Columns | Purpose |
|---|---|---|
| processed_events_aggregated | event_hour TIMESTAMP(3), host VARCHAR, num_hits BIGINT | Per-host hit counts |
| processed_events_aggregated_source | event_hour TIMESTAMP(3), host VARCHAR, referrer VARCHAR, num_hits BIGINT | Per-host-per-referrer hit counts |
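
As an illustration of what such a sink definition might look like, the sketch below builds a Flink SQL CREATE TABLE statement for the first sink using the JDBC connector. The function name build_jdbc_sink_ddl, the JDBC URL, and the credentials are placeholders assumed for this example; the actual DDL in aggregation_job.py may differ.

```python
from typing import Sequence, Tuple


def build_jdbc_sink_ddl(
    table_name: str,
    columns: Sequence[Tuple[str, str]],
    url: str,
    username: str,
    password: str,
) -> str:
    """Return a Flink SQL DDL string for a JDBC-backed sink table (hypothetical helper)."""
    column_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE {table_name} (\n"
        f"    {column_sql}\n"
        f") WITH (\n"
        f"    'connector' = 'jdbc',\n"
        f"    'url' = '{url}',\n"
        f"    'table-name' = '{table_name}',\n"
        f"    'username' = '{username}',\n"
        f"    'password' = '{password}',\n"
        f"    'driver' = 'org.postgresql.Driver'\n"
        f")"
    )


# Columns taken from the table above; connection values are placeholders.
ddl = build_jdbc_sink_ddl(
    table_name="processed_events_aggregated",
    columns=[
        ("event_hour", "TIMESTAMP(3)"),
        ("host", "VARCHAR"),
        ("num_hits", "BIGINT"),
    ],
    url="jdbc:postgresql://localhost:5432/postgres",
    username="postgres",
    password="postgres",
)
print(ddl)
```

A string like this would typically be passed to t_env.execute_sql(...) before the aggregation queries are submitted.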

Step 5: Aggregation 1 -- Group by Window and Host

source_table.window(
    Tumble.over(lit(5).minutes).on(col("event_timestamp")).alias("w")
).group_by(
    col("w"), col("host")
).select(
    col("w").start.alias("event_hour"),
    col("host"),
    col("host").count.alias("num_hits")
).execute_insert("processed_events_aggregated")

This aggregation counts hits per host within each 5-minute tumbling window. Despite its name, the event_hour column holds the start of the 5-minute window (w.start), not an hourly bucket.
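
The grouping logic can be mimicked in plain Python to see which rows the query would emit. This is a sketch of the semantics only, not of Flink's execution: it ignores watermarks and assumes every event has a non-null host (a SQL-style count on a column skips nulls). The names window_start and hits_per_host and the sample events are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def hits_per_host(events):
    """Count events per (window start, host), mirroring Aggregation 1."""
    return Counter((window_start(ts), host) for ts, host in events)


# Illustrative events: (event_timestamp, host)
events = [
    (datetime(2023, 1, 1, 12, 1, 30), "a.com"),
    (datetime(2023, 1, 1, 12, 4, 59), "a.com"),
    (datetime(2023, 1, 1, 12, 5, 0), "a.com"),   # falls in the next window
    (datetime(2023, 1, 1, 12, 2, 0), "b.com"),
]
counts = hits_per_host(events)

for (start, host), num_hits in sorted(counts.items()):
    print(start, host, num_hits)
```

Windows are half-open intervals, so the event at exactly 12:05:00 belongs to the 12:05 window rather than the 12:00 one.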

Step 6: Aggregation 2 -- Group by Window, Host, and Referrer

source_table.window(
    Tumble.over(lit(5).minutes).on(col("event_timestamp")).alias("w")
).group_by(
    col("w"), col("host"), col("referrer")
).select(
    col("w").start.alias("event_hour"),
    col("host"),
    col("referrer"),
    col("host").count.alias("num_hits")
).execute_insert("processed_events_aggregated_source")

This aggregation computes the number of hits per host-referrer combination within each 5-minute tumbling window.
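
The second aggregation is a finer-grained breakdown of the first, so summing its counts over referrers should reproduce the per-host counts, assuming both queries see the same events and no host is null. The pure-Python sketch below checks that relationship on made-up data; hits_per_host_and_referrer and roll_up_to_host are hypothetical helper names, and a missing referrer is modeled as None, which forms its own group.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def hits_per_host_and_referrer(events):
    """Count events per (window start, host, referrer), mirroring Aggregation 2."""
    return Counter(
        (window_start(ts), host, referrer) for ts, host, referrer in events
    )


def roll_up_to_host(detailed):
    """Sum the detailed counts over referrers to get per-host counts."""
    rolled = Counter()
    for (start, host, _referrer), num_hits in detailed.items():
        rolled[(start, host)] += num_hits
    return rolled


# Illustrative events: (event_timestamp, host, referrer)
events = [
    (datetime(2023, 1, 1, 12, 1, 0), "a.com", "google.com"),
    (datetime(2023, 1, 1, 12, 2, 0), "a.com", "google.com"),
    (datetime(2023, 1, 1, 12, 3, 0), "a.com", None),
    (datetime(2023, 1, 1, 12, 6, 0), "a.com", "google.com"),
]
detailed = hits_per_host_and_referrer(events)
per_host = roll_up_to_host(detailed)
```

A roll-up like this can serve as a sanity check between the two sink tables, since a persistent mismatch would suggest the two queries are not processing the same set of events.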

Inputs / Outputs

Inputs:

  • Kafka source with watermarked timestamps
  • Environment variables for Kafka and PostgreSQL configuration
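
One way to gather such configuration is sketched below. The variable names (KAFKA_URL, KAFKA_TOPIC, POSTGRES_URL, POSTGRES_USER, POSTGRES_PASSWORD), the JobConfig class, and load_config are assumptions made for this example, not names confirmed by this page; the point is only to fail early when a required setting is absent.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class JobConfig:
    """Connection settings for the job (hypothetical container)."""
    kafka_url: str
    kafka_topic: str
    postgres_url: str
    postgres_user: str
    postgres_password: str


# Field name -> environment variable name (assumed names)
REQUIRED_VARS = {
    "kafka_url": "KAFKA_URL",
    "kafka_topic": "KAFKA_TOPIC",
    "postgres_url": "POSTGRES_URL",
    "postgres_user": "POSTGRES_USER",
    "postgres_password": "POSTGRES_PASSWORD",
}


def load_config(env: Mapping[str, str] = os.environ) -> JobConfig:
    """Build a JobConfig from env, raising KeyError if any variable is unset or empty."""
    missing = sorted(var for var in REQUIRED_VARS.values() if not env.get(var))
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return JobConfig(**{field: env[var] for field, var in REQUIRED_VARS.items()})
```

Passing a plain dictionary instead of os.environ makes the loader easy to exercise in tests without touching the real environment.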

Outputs:

  • Two PostgreSQL aggregation tables:
    • processed_events_aggregated -- windowed counts grouped by host
    • processed_events_aggregated_source -- windowed counts grouped by host and referrer
