Kafka Source Table Definition (DataExpert.io Data Engineer Handbook)
Overview
Kafka Source Table Definition describes the theory and practice of defining Apache Kafka topics as SQL tables within Apache Flink. By expressing a Kafka source as a DDL statement, Flink's Table API can treat an unbounded stream of records as a queryable table, enabling SQL-based stream processing.
DDL-Based Connector Configuration
In Flink SQL, a Kafka source table is created using a CREATE TABLE DDL statement. The WITH clause specifies all connector-level configuration properties:
- connector -- set to 'kafka' to use the Kafka connector.
- properties.bootstrap.servers -- the comma-separated list of Kafka broker addresses.
- topic -- the Kafka topic to consume from.
- properties.security.protocol -- the protocol used to communicate with brokers (e.g., SASL_SSL).
- format -- the deserialization format for record values (e.g., 'json').
- properties.group.id -- the consumer group identifier.
CREATE TABLE events (
url VARCHAR,
referrer VARCHAR,
user_agent VARCHAR,
host VARCHAR,
ip VARCHAR,
headers VARCHAR,
event_time VARCHAR,
event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '...',
    'topic' = '...',
    'properties.group.id' = '...',
    'properties.security.protocol' = 'SASL_SSL',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);
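Once the table is defined, it can be queried like any other Flink table; the result is a continuous query that updates as new records arrive on the topic. A minimal sketch (the filter predicate here is illustrative, not part of the handbook):

```sql
-- Continuous query over the Kafka-backed table: Flink keeps emitting
-- results as new records are consumed from the topic.
SELECT host, ip, event_timestamp
FROM events
WHERE referrer IS NOT NULL;
```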
Computed Columns
Flink DDL supports computed columns, which are virtual columns derived from other columns using expressions. A common pattern is to parse a string-typed timestamp into a proper TIMESTAMP type using the TO_TIMESTAMP function:
event_timestamp AS TO_TIMESTAMP(event_time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')
This allows downstream queries to work with native timestamp semantics, enabling windowed aggregations and time-based operations without additional casting.
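To use the computed column for event-time windowing, Flink additionally requires a WATERMARK declaration on it in the DDL. A minimal sketch, assuming a 15-second out-of-orderness bound (the interval is an illustrative choice, not from the handbook):

```sql
-- In the CREATE TABLE statement, after the computed column:
--   WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '15' SECOND

-- Hourly tumbling-window count per host, driven by the parsed timestamp.
SELECT
    host,
    TUMBLE_START(event_timestamp, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS event_count
FROM events
GROUP BY host, TUMBLE(event_timestamp, INTERVAL '1' HOUR);
```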
SASL_SSL Authentication with PLAIN Mechanism
When Kafka brokers require authentication, Flink supports SASL_SSL with the PLAIN mechanism. The DDL WITH clause includes:
- 'properties.security.protocol' = 'SASL_SSL'
- 'properties.sasl.mechanism' = 'PLAIN'
- 'properties.sasl.jaas.config' -- a JAAS configuration string embedding the API key and secret.
This ensures that Flink consumers authenticate securely against managed Kafka services (e.g., Confluent Cloud, Amazon MSK with SASL).
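Put together, the authentication-related options in the WITH clause look like the following sketch; the API key and secret placeholders stand in for real credentials, which should be injected from a secrets store rather than hard-coded:

```sql
-- SASL_SSL/PLAIN options for the Kafka connector. The JAAS string must
-- end with a semicolon; <API_KEY>/<API_SECRET> are placeholders.
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' =
    'org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";'
```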
When to Use
Use Kafka Source Table Definition when:
- Consuming streaming events from a Kafka topic in a Flink pipeline.
- You want to express the source schema declaratively using SQL DDL.
- You need computed columns (e.g., timestamp parsing) directly in the table definition.
- You require secure authentication (SASL_SSL/PLAIN) to connect to the Kafka cluster.