Implementation:DataExpert_io_Data_engineer_handbook_Log_processing
Overview
log_processing is the main orchestration function for the Flink Kafka-to-PostgreSQL streaming ETL pipeline. It sets up the execution environment, registers the geolocation UDF, creates the source and sink tables, and executes a continuous INSERT INTO ... SELECT query that enriches web traffic events with geodata.
Type
API Doc
Source
start_job.py:L115-150
Signature
def log_processing() -> None
Detailed Description
The log_processing function orchestrates the entire streaming ETL pipeline through the following steps:
Step 1: Create the Execution Environment
env = StreamExecutionEnvironment.get_execution_environment()
Step 2: Enable Checkpointing
Checkpointing is enabled with a 10-second interval for fault tolerance:
env.enable_checkpointing(10000)
Step 3: Set Parallelism
Parallelism is set to 1 for ordered processing:
env.set_parallelism(1)
Step 4: Create StreamTableEnvironment
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
Step 5: Register the Geolocation UDF
The get_location UDF is registered so it can be used in SQL queries:
t_env.create_temporary_function("get_location", get_location)
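The source does not show the body of get_location, which wraps a geocoding lookup. As a rough, hypothetical sketch of the shape such a function takes (the in-memory lookup table and its field names here are invented stand-ins for the real external API call):

```python
import json

# Hypothetical in-memory lookup standing in for the real geocoding call.
# The actual UDF in start_job.py resolves the IP against a geolocation service.
_FAKE_GEO_DB = {
    "203.0.113.5": {"country": "US", "state": "CA", "city": "San Francisco"},
}

def get_location(ip_address: str) -> str:
    """Return geodata for an IP as a JSON string, or an empty JSON object."""
    return json.dumps(_FAKE_GEO_DB.get(ip_address, {}))
```

In the real job this plain function is wrapped with PyFlink's @udf decorator (declaring a string result type) before being registered on the table environment, which is what makes it callable from SQL as get_location(ip).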
Step 6: Create Kafka Source Table
source_table = create_events_source_kafka(t_env)
Step 7: Create PostgreSQL Sink Table
sink_table = create_processed_events_sink_postgres(t_env)
Step 8: Execute the Streaming Query
The continuous ETL query selects from the Kafka source, applies the geolocation UDF, and inserts into the PostgreSQL sink:
t_env.execute_sql(f"""
INSERT INTO {sink_table}
SELECT
ip,
event_timestamp,
referrer,
host,
url,
get_location(ip) AS geodata
FROM {source_table}
""")
Full Function
def log_processing() -> None:
env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10000)
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.create_temporary_function("get_location", get_location)
source_table = create_events_source_kafka(t_env)
sink_table = create_processed_events_sink_postgres(t_env)
t_env.execute_sql(f"""
INSERT INTO {sink_table}
SELECT
ip,
event_timestamp,
referrer,
host,
url,
get_location(ip) AS geodata
FROM {source_table}
""")
Inputs / Outputs
Inputs:
- Environment variables for Kafka configuration:
KAFKA_URL, KAFKA_TOPIC, KAFKA_GROUP, KAFKA_WEB_TRAFFIC_KEY, KAFKA_WEB_TRAFFIC_SECRET
- Environment variables for PostgreSQL configuration:
POSTGRES_URL, POSTGRES_USER, POSTGRES_PASSWORD
Outputs:
- A continuous streaming job that writes enriched events to the
processed_events table in PostgreSQL
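The connection settings above are presumably read from the environment inside the table-DDL helper functions. A minimal, hypothetical sketch of that pattern (the variable names match the list above; the option keys are illustrative, not the exact Flink connector property names):

```python
import os

def kafka_options() -> dict:
    """Collect Kafka connection settings from the environment.

    Using os.environ[...] (rather than .get with a default) makes a
    missing variable fail loudly at job startup instead of mid-stream.
    """
    return {
        "url": os.environ["KAFKA_URL"],
        "topic": os.environ["KAFKA_TOPIC"],
        "group": os.environ["KAFKA_GROUP"],
        "key": os.environ["KAFKA_WEB_TRAFFIC_KEY"],
        "secret": os.environ["KAFKA_WEB_TRAFFIC_SECRET"],
    }
```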
Related Pages
- Principle:DataExpert_io_Data_engineer_handbook_Streaming_ETL_Pipeline
- Environment:DataExpert_io_Data_engineer_handbook_Flink_Kafka_Docker_Environment
- Heuristic:DataExpert_io_Data_engineer_handbook_Flink_Checkpointing_Interval_Tuning