Implementation:DataExpert_io_Data_engineer_handbook_Create_events_source_kafka
Overview
`create_events_source_kafka` is an API function that creates a Flink SQL source table backed by a Kafka topic. It constructs and executes a DDL statement that defines the events table, configuring the Kafka connector, SASL_SSL authentication, and JSON deserialization.
Type
API Doc
Source
start_job.py:L83-113
Signature
def create_events_source_kafka(t_env: StreamTableEnvironment) -> str
Detailed Description
The function builds a CREATE TABLE DDL statement for the events table and executes it against the provided StreamTableEnvironment. The table schema includes:
| Column | Type | Description |
|---|---|---|
| url | VARCHAR | The URL of the web event |
| referrer | VARCHAR | The referring URL |
| user_agent | VARCHAR | Browser user agent string |
| host | VARCHAR | The host of the request |
| ip | VARCHAR | Client IP address |
| headers | VARCHAR | HTTP headers |
| event_time | VARCHAR | Raw event timestamp as string |
| event_timestamp | TIMESTAMP(3) | Computed column: `AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')` |
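The computed column uses Flink's SimpleDateFormat-style pattern, in which the doubled single quotes escape the literal `T` separator. As a rough illustration (not part of the source), the equivalent parse in plain Python would use `strptime` with `%f` for the microsecond fraction; the sample timestamp below is invented:

```python
from datetime import datetime

# Flink pattern 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS' corresponds roughly to this
# strptime format. The sample value is illustrative only.
sample_event_time = "2024-01-15T09:30:00.123456"
parsed = datetime.strptime(sample_event_time, "%Y-%m-%dT%H:%M:%S.%f")
```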
The WITH clause configures:
- `'connector' = 'kafka'`
- `'properties.bootstrap.servers'` -- from environment variable `KAFKA_URL`
- `'topic'` -- from environment variable `KAFKA_TOPIC`
- `'properties.group.id'` -- from environment variable `KAFKA_GROUP`
- `'properties.security.protocol' = 'SASL_SSL'`
- `'properties.sasl.mechanism' = 'PLAIN'`
- `'properties.sasl.jaas.config'` -- constructed from `KAFKA_WEB_TRAFFIC_KEY` and `KAFKA_WEB_TRAFFIC_SECRET`
- `'json.ignore-parse-errors' = 'true'`
- `'format' = 'json'`
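The JAAS config value is assembled by string concatenation from the API key and secret. A standalone sketch of that assembly, with dummy credentials standing in for `KAFKA_WEB_TRAFFIC_KEY` and `KAFKA_WEB_TRAFFIC_SECRET`:

```python
# Dummy values standing in for the real environment variables.
kafka_key = "demo-key"        # KAFKA_WEB_TRAFFIC_KEY
kafka_secret = "demo-secret"  # KAFKA_WEB_TRAFFIC_SECRET

# Two adjacent string literals concatenate, mirroring the layout in the source;
# note the leading space in the second literal and the trailing semicolon.
jaas_config = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required"
    f' username="{kafka_key}" password="{kafka_secret}";'
)
```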
```python
import os

from pyflink.table import StreamTableEnvironment


def create_events_source_kafka(t_env: StreamTableEnvironment) -> str:
    kafka_key = os.environ["KAFKA_WEB_TRAFFIC_KEY"]
    kafka_secret = os.environ["KAFKA_WEB_TRAFFIC_SECRET"]
    table_name = "events"
    source_ddl = f"""
        CREATE TABLE {table_name} (
            url VARCHAR,
            referrer VARCHAR,
            user_agent VARCHAR,
            host VARCHAR,
            ip VARCHAR,
            headers VARCHAR,
            event_time VARCHAR,
            event_timestamp AS TO_TIMESTAMP(event_time,
                'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = '{os.environ['KAFKA_URL']}',
            'topic' = '{os.environ['KAFKA_TOPIC']}',
            'properties.group.id' = '{os.environ['KAFKA_GROUP']}',
            'properties.security.protocol' = 'SASL_SSL',
            'properties.sasl.mechanism' = 'PLAIN',
            'properties.sasl.jaas.config' =
                'org.apache.kafka.common.security.plain.PlainLoginModule required'
                ' username=\"{kafka_key}\" password=\"{kafka_secret}\";',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        );
    """
    t_env.execute_sql(source_ddl)
    return table_name
```
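Because the function reads `KAFKA_WEB_TRAFFIC_KEY` and `KAFKA_WEB_TRAFFIC_SECRET` via `os.environ[...]` indexing, a missing variable raises `KeyError` only at call time. A hypothetical pre-flight helper (not part of the source) can report all missing variables at once before the job starts:

```python
import os

# All environment variables the function depends on (from the source above).
REQUIRED_VARS = [
    "KAFKA_URL", "KAFKA_TOPIC", "KAFKA_GROUP",
    "KAFKA_WEB_TRAFFIC_KEY", "KAFKA_WEB_TRAFFIC_SECRET",
]


def missing_kafka_env(env=os.environ):
    """Hypothetical helper: return required variables that are not set."""
    return [name for name in REQUIRED_VARS if name not in env]
```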
Inputs / Outputs
Inputs:
- `t_env` -- a `StreamTableEnvironment` instance
- Environment variables:
  - `KAFKA_URL` -- Kafka bootstrap servers
  - `KAFKA_TOPIC` -- topic name to consume
  - `KAFKA_GROUP` -- consumer group ID
  - `KAFKA_WEB_TRAFFIC_KEY` -- SASL username / API key
  - `KAFKA_WEB_TRAFFIC_SECRET` -- SASL password / API secret
Outputs:
- Returns the table name string `"events"`
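Returning the name rather than a table object lets callers interpolate it into follow-up Flink SQL. A minimal sketch (the aggregation query is invented for illustration):

```python
# Value returned by create_events_source_kafka; the query below is a
# hypothetical downstream use, not taken from the source.
table_name = "events"
query = f"SELECT host, COUNT(*) AS hits FROM {table_name} GROUP BY host"
```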
Related Pages
- Principle:DataExpert_io_Data_engineer_handbook_Kafka_Source_Table_Definition
- Environment:DataExpert_io_Data_engineer_handbook_Flink_Kafka_Docker_Environment