Implementation:DataExpert io Data engineer handbook Log aggregation
Overview
log_aggregation is the main orchestration function for the Flink windowed aggregation pipeline. It sets up the execution environment, creates a watermarked Kafka source table and two PostgreSQL sink tables, and runs two parallel windowed aggregation queries that compute per-host and per-host-per-referrer hit counts over 5-minute tumbling windows.
Type
API Doc
Source
aggregation_job.py:L80-131
Signature
def log_aggregation() -> None
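The function takes no arguments and returns nothing; it is typically invoked as a script entry point. A minimal sketch, assuming the module is executed directly:

if __name__ == "__main__":
    log_aggregation()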
Detailed Description
The log_aggregation function orchestrates the windowed aggregation pipeline through the following steps:
Step 1: Create the Execution Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(3)
env.enable_checkpointing(10)
Parallelism is set to 3 for throughput, and checkpointing is enabled with a 10-millisecond interval (enable_checkpointing takes its interval in milliseconds).
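A 10 ms checkpoint interval is unusually aggressive for production use (see the checkpointing interval tuning heuristic under Related Pages). A sketch of a more conventional configuration; the values here are illustrative, not from the source:

env.enable_checkpointing(10_000)  # checkpoint every 10 seconds rather than every 10 ms
env.get_checkpoint_config().set_min_pause_between_checkpoints(5_000)  # leave headroom between checkpoints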
Step 2: Create StreamTableEnvironment
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
Step 3: Create Kafka Source with Watermark
The Kafka source table includes a watermark definition that tolerates events arriving up to 15 seconds out of order:
event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS'),
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '15' SECOND
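For reference, a minimal sketch of the full source DDL this fragment sits inside; the table name, column list, and connector options are assumptions, with connection details read from the Kafka environment variables mentioned under Inputs:

CREATE TABLE process_events_kafka (
    ip VARCHAR,
    event_time VARCHAR,
    referrer VARCHAR,
    host VARCHAR,
    url VARCHAR,
    event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS'),
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '15' SECOND
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '...',  -- e.g. from a KAFKA_URL environment variable
    'topic' = '...',                         -- e.g. from a KAFKA_TOPIC environment variable
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);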
Step 4: Create Two PostgreSQL Sink Tables
Two sink tables are created:
| Sink Table | Columns | Purpose |
|---|---|---|
| processed_events_aggregated | event_hour TIMESTAMP(3), host VARCHAR, num_hits BIGINT | Per-host hit counts |
| processed_events_aggregated_source | event_hour TIMESTAMP(3), host VARCHAR, referrer VARCHAR, num_hits BIGINT | Per-host-per-referrer hit counts |
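Both sinks use Flink's JDBC connector. A minimal sketch of the DDL for the first sink; the connector options are assumptions, with connection details read from the PostgreSQL environment variables mentioned under Inputs:

CREATE TABLE processed_events_aggregated (
    event_hour TIMESTAMP(3),
    host VARCHAR,
    num_hits BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = '...',        -- e.g. jdbc:postgresql://host:5432/db from a POSTGRES_URL variable
    'table-name' = 'processed_events_aggregated',
    'username' = '...',
    'password' = '...',
    'driver' = 'org.postgresql.Driver'
);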
Step 5: Aggregation 1 -- Group by Window and Host
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

source_table.window(
    Tumble.over(lit(5).minutes).on(col("event_timestamp")).alias("w")
).group_by(
    col("w"), col("host")
).select(
    col("w").start.alias("event_hour"),
    col("host"),
    col("host").count.alias("num_hits")
).execute_insert("processed_events_aggregated")
This aggregation computes the number of hits per host within each 5-minute tumbling window.
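For readers more familiar with Flink SQL, the Table API call above corresponds roughly to the following windowing-TVF query; the source table name process_events_kafka is an assumption:

INSERT INTO processed_events_aggregated
SELECT window_start AS event_hour, host, COUNT(host) AS num_hits
FROM TABLE(
    TUMBLE(TABLE process_events_kafka, DESCRIPTOR(event_timestamp), INTERVAL '5' MINUTES)
)
GROUP BY window_start, window_end, host;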
Step 6: Aggregation 2 -- Group by Window, Host, and Referrer
source_table.window(
    Tumble.over(lit(5).minutes).on(col("event_timestamp")).alias("w")
).group_by(
    col("w"), col("host"), col("referrer")
).select(
    col("w").start.alias("event_hour"),
    col("host"),
    col("referrer"),
    col("host").count.alias("num_hits")
).execute_insert("processed_events_aggregated_source")
This aggregation computes the number of hits per host-referrer combination within each 5-minute tumbling window.
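Note that execute_insert submits each INSERT as its own Flink job and returns a TableResult without blocking. If the script needs to stay alive until a job terminates, the result can be waited on; a sketch, where windowed_table is a hypothetical name for the expression above:

table_result = windowed_table.execute_insert("processed_events_aggregated_source")
table_result.wait()  # blocks until the streaming job terminates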
Inputs / Outputs
Inputs:
- Kafka source with watermarked timestamps
- Environment variables for Kafka and PostgreSQL configuration
Outputs:
- Two PostgreSQL aggregation tables:
  - processed_events_aggregated -- windowed counts grouped by host
  - processed_events_aggregated_source -- windowed counts grouped by host and referrer
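Once the job is running, the results can be inspected directly in PostgreSQL. An illustrative query against the first sink:

SELECT event_hour, host, num_hits
FROM processed_events_aggregated
ORDER BY event_hour DESC, num_hits DESC
LIMIT 10;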
Related Pages
- Principle:DataExpert_io_Data_engineer_handbook_Windowed_Aggregation
- Environment:DataExpert_io_Data_engineer_handbook_Flink_Kafka_Docker_Environment
- Heuristic:DataExpert_io_Data_engineer_handbook_Flink_Checkpointing_Interval_Tuning
- Heuristic:DataExpert_io_Data_engineer_handbook_Watermark_Late_Arrival_Tolerance