
Principle:DataExpert io Data engineer handbook Kafka Source Table Definition

From Leeroopedia


Overview

Kafka Source Table Definition describes the theory and practice of defining Apache Kafka topics as SQL tables within Apache Flink. By expressing a Kafka source as a DDL statement, Flink's Table API can treat an unbounded stream of records as a queryable table, enabling SQL-based stream processing.

DDL-Based Connector Configuration

In Flink SQL, a Kafka source table is created using a CREATE TABLE DDL statement. The WITH clause specifies all connector-level configuration properties:

Note that generic Kafka client properties are passed through to the underlying consumer by prefixing them with 'properties.':

  • 'connector' -- set to 'kafka' to use the Kafka connector.
  • 'properties.bootstrap.servers' -- the comma-separated list of Kafka broker addresses.
  • 'topic' -- the Kafka topic to consume from.
  • 'properties.security.protocol' -- the protocol used to communicate with brokers (e.g., SASL_SSL).
  • 'format' -- the deserialization format for record values (e.g., 'json').
  • 'json.ignore-parse-errors' -- when set to 'true', records that fail JSON parsing are skipped instead of failing the job.
  • 'properties.group.id' -- the consumer group identifier.

CREATE TABLE events (
    url VARCHAR,
    referrer VARCHAR,
    user_agent VARCHAR,
    host VARCHAR,
    ip VARCHAR,
    headers VARCHAR,
    event_time VARCHAR,
    event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '...',
    'topic' = '...',
    'properties.security.protocol' = 'SASL_SSL',
    'json.ignore-parse-errors' = 'true',
    'format' = 'json'
);

Computed Columns

Flink DDL supports computed columns, which are virtual columns derived from other columns using expressions. A common pattern is to parse a string-typed timestamp into a proper TIMESTAMP type using the TO_TIMESTAMP function:

event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')

This allows downstream queries to work with native timestamp semantics, enabling windowed aggregations and time-based operations without additional casting.
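As an illustrative sketch (not part of the original handbook example), the computed column can be paired with a WATERMARK declaration so Flink treats it as an event-time attribute; the five-second out-of-orderness bound and the one-minute window below are arbitrary choices:

```sql
-- Hypothetical addition to the table definition: declare event_timestamp
-- as the event-time attribute with a 5-second out-of-orderness bound.
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND

-- A downstream tumbling-window query counting events per host per minute,
-- relying on the parsed TIMESTAMP type rather than the raw string column:
SELECT
    host,
    TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) AS window_start,
    COUNT(*) AS event_count
FROM events
GROUP BY host, TUMBLE(event_timestamp, INTERVAL '1' MINUTE);
```

Without the watermark declaration, event_timestamp is an ordinary TIMESTAMP column and time-windowed aggregations over event time are not possible.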

SASL_SSL Authentication with PLAIN Mechanism

When Kafka brokers require authentication, Flink supports SASL_SSL with the PLAIN mechanism. The DDL WITH clause includes:

  • 'properties.security.protocol' = 'SASL_SSL'
  • 'properties.sasl.mechanism' = 'PLAIN'
  • 'properties.sasl.jaas.config' -- a JAAS configuration string embedding the API key and secret.

This ensures that Flink consumers authenticate securely against managed Kafka services (e.g., Confluent Cloud, Amazon MSK with SASL).
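Putting the three options together, a hedged sketch of the authentication portion of the WITH clause might look like the following; PlainLoginModule is the standard Kafka login module for the PLAIN mechanism, and the API key and secret placeholders are illustrative:

```sql
-- Authentication-related entries inside the WITH clause of the CREATE TABLE
-- statement. Replace the <API_KEY>/<API_SECRET> placeholders with credentials
-- issued by the managed Kafka service.
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";'
```

Embedding the JAAS configuration as a string avoids shipping a separate JAAS file to every Flink task manager.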

When to Use

Use Kafka Source Table Definition when:

  • Consuming streaming events from a Kafka topic in a Flink pipeline.
  • You want to express the source schema declaratively using SQL DDL.
  • You need computed columns (e.g., timestamp parsing) directly in the table definition.
  • You require secure authentication (SASL_SSL/PLAIN) to connect to the Kafka cluster.
