
Implementation:DataExpert io Data engineer handbook Create events source kafka

From Leeroopedia


Overview

create_events_source_kafka is an API function that creates a Flink SQL source table backed by a Kafka topic. It constructs and executes a CREATE TABLE DDL statement that defines the events table, configuring the Kafka connector, SASL_SSL authentication, and JSON deserialization.

Type

API Doc

Source

start_job.py:L83-113

Signature

def create_events_source_kafka(t_env: StreamTableEnvironment) -> str

Detailed Description

The function builds a CREATE TABLE DDL statement for the events table and executes it against the provided StreamTableEnvironment. The table schema includes:

Column           Type           Description
url              VARCHAR        The URL of the web event
referrer         VARCHAR        The referring URL
user_agent       VARCHAR        Browser user agent string
host             VARCHAR        The host of the request
ip               VARCHAR        Client IP address
headers          VARCHAR        HTTP headers
event_time       VARCHAR        Raw event timestamp as string
event_timestamp  TIMESTAMP(3)   Computed column -- AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')
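
The computed column expects event_time strings shaped like an ISO-8601 timestamp with a literal 'T' and six fractional digits. A minimal standard-library sketch of the equivalent parse (the sample value below is a hypothetical illustration, not taken from the handbook):

```python
from datetime import datetime

# Hypothetical sample value (assumption): ISO-8601 date, literal 'T',
# six fractional digits -- the shape the Flink pattern
# 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS' expects.
raw_event_time = "2024-06-01T12:34:56.789012"

# %f accepts 1-6 fractional digits, mirroring the SSSSSS part; the
# TIMESTAMP(3) column in Flink would then keep millisecond precision.
parsed = datetime.strptime(raw_event_time, "%Y-%m-%dT%H:%M:%S.%f")
print(parsed)
```

Strings that do not match this shape would fail the parse; in the Flink table, TO_TIMESTAMP returns NULL for such rows rather than raising.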

The WITH clause configures:

  • 'connector' = 'kafka'
  • 'properties.bootstrap.servers' -- from environment variable KAFKA_URL
  • 'topic' -- from environment variable KAFKA_TOPIC
  • 'properties.group.id' -- from environment variable KAFKA_GROUP
  • 'properties.security.protocol' = 'SASL_SSL'
  • 'properties.sasl.mechanism' = 'PLAIN'
  • 'properties.sasl.jaas.config' -- constructed from KAFKA_WEB_TRAFFIC_KEY and KAFKA_WEB_TRAFFIC_SECRET
  • 'json.ignore-parse-errors' = 'true'
  • 'format' = 'json'
import os

from pyflink.table import StreamTableEnvironment


def create_events_source_kafka(t_env: StreamTableEnvironment) -> str:
    kafka_key = os.environ["KAFKA_WEB_TRAFFIC_KEY"]
    kafka_secret = os.environ["KAFKA_WEB_TRAFFIC_SECRET"]
    table_name = "events"
    source_ddl = f"""
        CREATE TABLE {table_name} (
            url VARCHAR,
            referrer VARCHAR,
            user_agent VARCHAR,
            host VARCHAR,
            ip VARCHAR,
            headers VARCHAR,
            event_time VARCHAR,
            event_timestamp AS TO_TIMESTAMP(event_time,
                'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = '{os.environ['KAFKA_URL']}',
            'topic' = '{os.environ['KAFKA_TOPIC']}',
            'properties.group.id' = '{os.environ['KAFKA_GROUP']}',
            'properties.security.protocol' = 'SASL_SSL',
            'properties.sasl.mechanism' = 'PLAIN',
            'properties.sasl.jaas.config' =
                'org.apache.kafka.common.security.plain.PlainLoginModule required'
                ' username=\"{kafka_key}\" password=\"{kafka_secret}\";',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        );
    """
    # Register the source table with the table environment.
    t_env.execute_sql(source_ddl)
    return table_name
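
The WITH clause the function interpolates can be previewed without a Flink runtime. A standalone sketch that reproduces the same connector options (all broker, topic, and credential strings below are illustrative placeholders, not real values):

```python
# Placeholder values for illustration only (assumptions, not real endpoints).
env = {
    "KAFKA_URL": "broker-1:9092",
    "KAFKA_TOPIC": "web-traffic",
    "KAFKA_GROUP": "web-traffic-reader",
    "KAFKA_WEB_TRAFFIC_KEY": "demo-key",
    "KAFKA_WEB_TRAFFIC_SECRET": "demo-secret",
}

# The JAAS config string the function builds from the key/secret pair.
jaas = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    f"username=\"{env['KAFKA_WEB_TRAFFIC_KEY']}\" "
    f"password=\"{env['KAFKA_WEB_TRAFFIC_SECRET']}\";"
)

# The same key/value options the function's WITH clause interpolates.
with_options = {
    "connector": "kafka",
    "properties.bootstrap.servers": env["KAFKA_URL"],
    "topic": env["KAFKA_TOPIC"],
    "properties.group.id": env["KAFKA_GROUP"],
    "properties.security.protocol": "SASL_SSL",
    "properties.sasl.mechanism": "PLAIN",
    "properties.sasl.jaas.config": jaas,
    "json.ignore-parse-errors": "true",
    "format": "json",
}

# Render the clause body as it would appear in the DDL.
with_clause = ",\n".join(f"    '{k}' = '{v}'" for k, v in with_options.items())
print(with_clause)
```

This keeps the secret material out of the printed DDL only if you avoid logging with_clause in production; the real function embeds the credentials directly in the statement it executes.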

Inputs / Outputs

Inputs:

  • t_env -- a StreamTableEnvironment instance
  • Environment variables:
    • KAFKA_URL -- Kafka bootstrap servers
    • KAFKA_TOPIC -- topic name to consume
    • KAFKA_GROUP -- consumer group ID
    • KAFKA_WEB_TRAFFIC_KEY -- SASL username / API key
    • KAFKA_WEB_TRAFFIC_SECRET -- SASL password / API secret

Outputs:

  • Returns the table name string "events"
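
Because the function reads all five variables with os.environ[...] or f-string interpolation (raising KeyError when one is absent), a small pre-flight check can fail fast with a clearer message. This helper is a hypothetical addition, not part of start_job.py:

```python
import os

# Hypothetical helper (not in the handbook): verify the variables
# create_events_source_kafka reads before building the Flink job.
REQUIRED_VARS = (
    "KAFKA_URL",
    "KAFKA_TOPIC",
    "KAFKA_GROUP",
    "KAFKA_WEB_TRAFFIC_KEY",
    "KAFKA_WEB_TRAFFIC_SECRET",
)

def missing_kafka_env(environ=os.environ) -> list:
    """Return the names of required Kafka variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]

# Usage: abort early instead of failing inside the Kafka connector.
missing = missing_kafka_env({"KAFKA_URL": "broker-1:9092"})
print(missing)
```

Calling this before create_events_source_kafka turns an opaque KeyError mid-job-setup into an explicit list of what to export.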
