
Implementation:DataExpert io Data engineer handbook Log processing



Overview

log_processing is the main orchestration function for the Flink Kafka-to-PostgreSQL streaming ETL pipeline. It sets up the execution environment, registers the geolocation UDF, creates the source and sink tables, and executes a continuous INSERT INTO ... SELECT query that enriches web traffic events with geolocation data.

Type: API Doc

Source: start_job.py:L115-150

Signature: def log_processing() -> None

Detailed Description

The log_processing function orchestrates the entire streaming ETL pipeline through the following steps:

Step 1: Create the Execution Environment

env = StreamExecutionEnvironment.get_execution_environment()

Step 2: Enable Checkpointing

Checkpointing is enabled with a 10-second interval for fault tolerance:

env.enable_checkpointing(10000)
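
The 10-second interval is the only checkpoint setting the pipeline configures. Further tuning is possible through the environment's CheckpointConfig; the knobs below are illustrative assumptions, not part of the handbook code:

checkpoint_config = env.get_checkpoint_config()
checkpoint_config.set_checkpoint_timeout(60000)            # abort checkpoints that exceed 60 s
checkpoint_config.set_min_pause_between_checkpoints(5000)  # keep 5 s between consecutive checkpoints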

Step 3: Set Parallelism

Parallelism is set to 1 so that all events flow through a single task instance, preserving event order at the cost of throughput:

env.set_parallelism(1)

Step 4: Create StreamTableEnvironment

t_env = StreamTableEnvironment.create(stream_execution_environment=env)

Step 5: Register the Geolocation UDF

The get_location UDF is registered so it can be used in SQL queries:

t_env.create_temporary_function("get_location", get_location)
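
The UDF itself is defined earlier in start_job.py. As a rough sketch of the general shape of a PyFlink scalar UDF (the body here is a placeholder, not the handbook's implementation):

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def get_location(ip_address: str) -> str:
    # Placeholder body: the real UDF looks up the IP address against a
    # geolocation source and returns the result as a string for the
    # geodata column.
    return '{}'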

Step 6: Create Kafka Source Table

source_table = create_events_source_kafka(t_env)
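
The helper is defined earlier in start_job.py and returns the name of the registered source table. As a hedged sketch of what such a helper might look like with the Flink SQL Kafka connector (the table name, column types, and option choices here are assumptions):

import os

def create_events_source_kafka_sketch(t_env):
    # Hypothetical DDL; the real statement lives in create_events_source_kafka.
    table_name = "process_events_source"
    t_env.execute_sql(f"""
        CREATE TABLE {table_name} (
            ip VARCHAR,
            event_timestamp VARCHAR,
            referrer VARCHAR,
            host VARCHAR,
            url VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{os.environ['KAFKA_TOPIC']}',
            'properties.bootstrap.servers' = '{os.environ['KAFKA_URL']}',
            'properties.group.id' = '{os.environ['KAFKA_GROUP']}',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)
    return table_name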

Step 7: Create PostgreSQL Sink Table

sink_table = create_processed_events_sink_postgres(t_env)
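
Similarly, create_processed_events_sink_postgres registers the JDBC sink table and returns its name. A hedged sketch using the Flink JDBC connector (column types and options beyond the documented environment variables are assumptions; the processed_events table name comes from the Outputs section below):

import os

def create_processed_events_sink_postgres_sketch(t_env):
    # Hypothetical DDL; the real statement lives in create_processed_events_sink_postgres.
    table_name = "processed_events"
    t_env.execute_sql(f"""
        CREATE TABLE {table_name} (
            ip VARCHAR,
            event_timestamp VARCHAR,
            referrer VARCHAR,
            host VARCHAR,
            url VARCHAR,
            geodata VARCHAR
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{os.environ['POSTGRES_URL']}',
            'table-name' = '{table_name}',
            'username' = '{os.environ['POSTGRES_USER']}',
            'password' = '{os.environ['POSTGRES_PASSWORD']}',
            'driver' = 'org.postgresql.Driver'
        )
    """)
    return table_name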

Step 8: Execute the Streaming Query

The continuous ETL query selects from the Kafka source, applies the geolocation UDF, and inserts into the PostgreSQL sink:

t_env.execute_sql(f"""
    INSERT INTO {sink_table}
    SELECT
        ip,
        event_timestamp,
        referrer,
        host,
        url,
        get_location(ip) AS geodata
    FROM {source_table}
""")

Full Function

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# get_location, create_events_source_kafka, and
# create_processed_events_sink_postgres are defined earlier in start_job.py.

def log_processing() -> None:
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(10000)  # checkpoint every 10 s for fault tolerance
    env.set_parallelism(1)           # single task instance preserves event order

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.create_temporary_function("get_location", get_location)  # expose the UDF to SQL

    source_table = create_events_source_kafka(t_env)
    sink_table = create_processed_events_sink_postgres(t_env)

    t_env.execute_sql(f"""
        INSERT INTO {sink_table}
        SELECT
            ip,
            event_timestamp,
            referrer,
            host,
            url,
            get_location(ip) AS geodata
        FROM {source_table}
    """)

Inputs / Outputs

Inputs:

  • Environment variables for Kafka configuration:
    • KAFKA_URL, KAFKA_TOPIC, KAFKA_GROUP
    • KAFKA_WEB_TRAFFIC_KEY, KAFKA_WEB_TRAFFIC_SECRET
  • Environment variables for PostgreSQL configuration:
    • POSTGRES_URL, POSTGRES_USER, POSTGRES_PASSWORD

Outputs:

  • A continuous streaming job that writes enriched events to the processed_events table in PostgreSQL
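
The KAFKA_WEB_TRAFFIC_KEY / KAFKA_WEB_TRAFFIC_SECRET pair suggests the broker requires SASL authentication. A hedged sketch of how the credentials might map onto the Kafka connector's pass-through client properties (the mechanism and option values are assumptions):

import os

# Hypothetical: additional WITH options for the Kafka source table.
sasl_options = {
    'properties.security.protocol': 'SASL_SSL',
    'properties.sasl.mechanism': 'PLAIN',
    'properties.sasl.jaas.config': (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{os.environ['KAFKA_WEB_TRAFFIC_KEY']}' "
        f"password='{os.environ['KAFKA_WEB_TRAFFIC_SECRET']}';"
    ),
}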
