Principle:DataExpert io Data engineer handbook JDBC Sink Table Definition
Overview
JDBC Sink Table Definition covers how to define JDBC-backed sink tables in Apache Flink. A JDBC sink table lets Flink write streaming query results to a relational database (such as PostgreSQL) using standard SQL DDL, so that stream-processing output lands directly in persistent storage.
DDL-Based JDBC Connector Configuration
In Flink SQL, a JDBC sink table is created using a CREATE TABLE DDL statement. The WITH clause specifies all JDBC connector properties:
- connector -- set to 'jdbc' to use the JDBC connector.
- url -- the JDBC connection URL (e.g., jdbc:postgresql://host:port/database).
- table-name -- the target table in the relational database.
- username -- the database user for authentication.
- password -- the database password for authentication.
- driver -- the fully qualified JDBC driver class name (optional; when omitted, Flink derives it from the URL).
CREATE TABLE processed_events (
    ip VARCHAR,
    event_timestamp TIMESTAMP(3),
    referrer VARCHAR,
    host VARCHAR,
    url VARCHAR,
    geodata VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://...',
    'table-name' = 'processed_events',
    'username' = '...',
    'password' = '...',
    'driver' = 'org.postgresql.Driver'
);
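In a PyFlink job, connection details are usually kept out of the source and substituted into the DDL at runtime. The following is a minimal sketch of that idea, not a definitive implementation: the helper names sql_string and build_jdbc_sink_ddl and the setting keys POSTGRES_URL, POSTGRES_USER, and POSTGRES_PASSWORD are assumptions made for illustration. The function only renders the statement text, so it can be inspected without a running Flink cluster.

```python
def sql_string(value):
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def build_jdbc_sink_ddl(table_name, settings):
    """Render the Flink SQL DDL for the JDBC sink table shown above.

    `settings` is any mapping (for example os.environ) that holds the
    hypothetical keys POSTGRES_URL, POSTGRES_USER and POSTGRES_PASSWORD.
    A missing key raises KeyError instead of silently using a default.
    """
    options = {
        "connector": "jdbc",
        "url": settings["POSTGRES_URL"],
        "table-name": table_name,
        "username": settings["POSTGRES_USER"],
        "password": settings["POSTGRES_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
    # One "'key' = 'value'" line per connector option, in insertion order.
    with_clause = ",\n".join(
        f"    {sql_string(key)} = {sql_string(value)}"
        for key, value in options.items()
    )
    return (
        f"CREATE TABLE {table_name} (\n"
        "    ip VARCHAR,\n"
        "    event_timestamp TIMESTAMP(3),\n"
        "    referrer VARCHAR,\n"
        "    host VARCHAR,\n"
        "    url VARCHAR,\n"
        "    geodata VARCHAR\n"
        ") WITH (\n"
        f"{with_clause}\n"
        ")"
    )
```

Assuming t_env is a PyFlink TableEnvironment, the rendered statement would be registered with t_env.execute_sql(build_jdbc_sink_ddl("processed_events", os.environ)). Doubling single quotes in sql_string keeps a password that contains a quote character from breaking the WITH clause.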
PostgreSQL Driver Configuration
When targeting PostgreSQL, the JDBC connector requires:
- The org.postgresql.Driver class to be available on the Flink classpath.
- A JDBC URL in the format jdbc:postgresql://host:port/database.
- Proper credentials with write permissions on the target table.
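The target table in PostgreSQL must already exist with a schema compatible with the Flink table definition. The JDBC connector does not create the table; if it is missing or its columns do not match, the job fails, typically at runtime when the sink first attempts to write.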
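Because the table has to exist before the job writes to it, it can help to generate the PostgreSQL-side DDL from the same column list used for the Flink definition. The sketch below assumes a one-to-one type mapping (VARCHAR to VARCHAR, TIMESTAMP(3) to TIMESTAMP(3)); the COLUMNS list and the postgres_create_table helper are hypothetical names, and the rendered statement would still need to be run against the database with psql or another client.

```python
# Column names mirror the Flink table definition; the PostgreSQL types are
# an assumed mapping chosen for this illustration.
COLUMNS = [
    ("ip", "VARCHAR"),
    ("event_timestamp", "TIMESTAMP(3)"),
    ("referrer", "VARCHAR"),
    ("host", "VARCHAR"),
    ("url", "VARCHAR"),
    ("geodata", "VARCHAR"),
]


def postgres_create_table(table_name, columns):
    """Render a PostgreSQL CREATE TABLE IF NOT EXISTS statement."""
    column_lines = ",\n".join(
        f"    {name} {pg_type}" for name, pg_type in columns
    )
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n{column_lines}\n);"


# Print the statement so it can be reviewed or piped to a database client.
print(postgres_create_table("processed_events", COLUMNS))
```

Using IF NOT EXISTS makes the statement safe to rerun before each deployment, although it will not alter a table that already exists with a different schema.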
Write Semantics
JDBC sink tables in Flink support:
- Append mode -- each result row is inserted into the database.
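- Upsert mode -- if a primary key is defined, Flink performs upserts; for PostgreSQL this takes the form INSERT ... ON CONFLICT ... DO UPDATE.
The choice of mode depends on whether the Flink table definition includes a PRIMARY KEY constraint, which Flink SQL requires to be declared as PRIMARY KEY (...) NOT ENFORCED. For upserts to succeed, the PostgreSQL table also needs a primary key or unique constraint on the same columns.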
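To make the difference between the two modes concrete, the sketch below renders the general shape of a PostgreSQL upsert for a given set of key columns. It illustrates the statement form only and is not claimed to be the exact SQL text Flink emits; the helper name postgres_upsert_statement and the example table and columns used with it are hypothetical.

```python
def postgres_upsert_statement(table_name, columns, key_columns):
    """Render an INSERT ... ON CONFLICT statement with ? placeholders.

    `key_columns` plays the role of the PRIMARY KEY columns declared on
    the Flink table; every other column is overwritten on conflict.
    """
    if not key_columns:
        raise ValueError("upsert needs at least one key column")
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    conflict_target = ", ".join(key_columns)
    # EXCLUDED refers to the row that was proposed for insertion.
    updates = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in columns if col not in key_columns
    )
    # With no non-key columns there is nothing to update on conflict.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (
        f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_target}) {action}"
    )
```

For a hypothetical aggregate table keyed on host, postgres_upsert_statement("hits_by_host", ["host", "num_hits"], ["host"]) yields a statement that inserts a new host row or overwrites num_hits for an existing one. Without key columns, the sink falls back to plain inserts, which matches append mode.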
When to Use
Use JDBC Sink Table Definition when:
- Persisting streaming query results to a relational database such as PostgreSQL.
- You need to write processed or aggregated data from a Flink pipeline into a JDBC-compatible store.
- The downstream consumers expect data to be available in a traditional SQL database.