Implementation:DataExpert io Data engineer handbook Create processed events sink postgres
Overview
create_processed_events_sink_postgres is an API function that registers a Flink SQL sink table backed by a PostgreSQL database through the JDBC connector. It builds and executes a DDL statement that defines the processed_events table, so that the Flink streaming pipeline can write enriched event data into PostgreSQL.
Type
API Doc
Source
start_job.py:L36-56
Signature
def create_processed_events_sink_postgres(t_env: StreamTableEnvironment) -> str
Detailed Description
The function builds a CREATE TABLE DDL statement for the processed_events table and executes it against the provided StreamTableEnvironment. The table schema includes:
| Column | Type | Description |
|---|---|---|
| ip | VARCHAR | Client IP address |
| event_timestamp | TIMESTAMP(3) | Parsed event timestamp |
| referrer | VARCHAR | The referring URL |
| host | VARCHAR | The host of the request |
| url | VARCHAR | The URL of the web event |
| geodata | VARCHAR | JSON string with geolocation data |
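The Flink DDL only registers the table in Flink's catalog. The JDBC sink writes into a PostgreSQL table that must already exist. The following is a minimal sketch that renders a matching PostgreSQL CREATE TABLE statement from the schema above. It assumes that Flink VARCHAR columns map to PostgreSQL TEXT and that TIMESTAMP(3) maps to TIMESTAMP(3). The helper name postgres_target_ddl and the IF NOT EXISTS choice are illustrative and are not taken from start_job.py.

```python
# Schema of the Flink sink table as documented above: (column, Flink type).
PROCESSED_EVENTS_SCHEMA = [
    ("ip", "VARCHAR"),
    ("event_timestamp", "TIMESTAMP(3)"),
    ("referrer", "VARCHAR"),
    ("host", "VARCHAR"),
    ("url", "VARCHAR"),
    ("geodata", "VARCHAR"),
]

# Assumed Flink -> PostgreSQL type mapping, covering only the types used here.
FLINK_TO_POSTGRES = {
    "VARCHAR": "TEXT",
    "TIMESTAMP(3)": "TIMESTAMP(3)",
}


def postgres_target_ddl(table_name: str, schema: list[tuple[str, str]]) -> str:
    """Render a PostgreSQL CREATE TABLE statement for the JDBC sink's target table (hypothetical helper)."""
    columns = ",\n".join(
        f"    {name} {FLINK_TO_POSTGRES[flink_type]}" for name, flink_type in schema
    )
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n{columns}\n);"


if __name__ == "__main__":
    print(postgres_target_ddl("processed_events", PROCESSED_EVENTS_SCHEMA))
```

The statement would be run once against PostgreSQL, for example with psql, before the Flink job starts writing.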
The WITH clause configures:
- 'connector' = 'jdbc'
- 'url' -- read from the environment variable POSTGRES_URL. The value is used verbatim, so it must already be a full JDBC URL beginning with jdbc:postgresql://
- 'table-name' = 'processed_events'
- 'username' -- read from the environment variable POSTGRES_USER
- 'password' -- read from the environment variable POSTGRES_PASSWORD
- 'driver' = 'org.postgresql.Driver'
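The options listed above can be assembled and checked before the DDL is built. The following is a minimal sketch that assumes the environment is passed in as a mapping, so it can be tested without touching os.environ. The helpers jdbc_sink_options, sql_literal and render_with_clause are hypothetical. The fail-fast checks and quote escaping are additions: start_job.py indexes os.environ directly and raises KeyError when a variable is missing.

```python
from collections.abc import Mapping

# Environment variables the sink reads, per the list above.
REQUIRED_VARS = ("POSTGRES_URL", "POSTGRES_USER", "POSTGRES_PASSWORD")


def jdbc_sink_options(env: Mapping[str, str], table_name: str) -> dict[str, str]:
    """Build the JDBC connector options for the sink table, failing fast on bad input."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))
    url = env["POSTGRES_URL"]
    # The URL is handed to the connector unchanged, so it must already be a JDBC URL.
    if not url.startswith("jdbc:postgresql://"):
        raise ValueError(f"POSTGRES_URL must start with 'jdbc:postgresql://', got {url!r}")
    return {
        "connector": "jdbc",
        "url": url,
        "table-name": table_name,
        "username": env["POSTGRES_USER"],
        "password": env["POSTGRES_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def render_with_clause(options: Mapping[str, str]) -> str:
    """Render options as the key/value lines inside a Flink SQL WITH (...) clause."""
    return ",\n".join(
        f"    {sql_literal(key)} = {sql_literal(value)}" for key, value in options.items()
    )


if __name__ == "__main__":
    # Example values only; a deployment would pass os.environ instead.
    sample_env = {
        "POSTGRES_URL": "jdbc:postgresql://localhost:5432/postgres",
        "POSTGRES_USER": "postgres",
        "POSTGRES_PASSWORD": "postgres",
    }
    print(render_with_clause(jdbc_sink_options(sample_env, "processed_events")))
```

Escaping matters here because the password is interpolated directly into the DDL. A single quote in the password would otherwise end the string literal early.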
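```python
import os

from pyflink.table import StreamTableEnvironment


def create_processed_events_sink_postgres(t_env: StreamTableEnvironment) -> str:
    """Register the processed_events JDBC sink table and return its name."""
    table_name = "processed_events"
    # Connection settings come from the environment; a missing variable raises KeyError.
    # POSTGRES_URL is used verbatim, so it must already be a full JDBC URL
    # (jdbc:postgresql://host:port/database).
    sink_ddl = f"""
        CREATE TABLE {table_name} (
            ip VARCHAR,
            event_timestamp TIMESTAMP(3),
            referrer VARCHAR,
            host VARCHAR,
            url VARCHAR,
            geodata VARCHAR
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{os.environ['POSTGRES_URL']}',
            'table-name' = '{table_name}',
            'username' = '{os.environ['POSTGRES_USER']}',
            'password' = '{os.environ['POSTGRES_PASSWORD']}',
            'driver' = 'org.postgresql.Driver'
        );
    """
    # Registers the table in Flink's catalog; it does not create the table in PostgreSQL.
    t_env.execute_sql(sink_ddl)
    return table_name
```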
Inputs / Outputs
Inputs:
- t_env -- a StreamTableEnvironment instance
- Environment variables:
  - POSTGRES_URL -- JDBC connection URL for PostgreSQL
  - POSTGRES_USER -- database username
  - POSTGRES_PASSWORD -- database password

Outputs:
- Returns the table name string "processed_events"
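The inputs and output can be exercised without a Flink cluster or a database. To do this, pass a stand-in for StreamTableEnvironment that records SQL instead of executing it. The following is a minimal sketch that copies the documented function and imports PyFlink only for type checking. RecordingTableEnv is a hypothetical class. The INSERT INTO statement shows one plausible way a caller uses the returned name, and events_source is a hypothetical source table.

```python
import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only for type checkers; PyFlink is not imported at runtime
    from pyflink.table import StreamTableEnvironment


class RecordingTableEnv:
    """Hypothetical stand-in for StreamTableEnvironment that records SQL instead of running it."""

    def __init__(self) -> None:
        self.statements: list[str] = []

    def execute_sql(self, stmt: str) -> None:
        self.statements.append(stmt)


def create_processed_events_sink_postgres(t_env: "StreamTableEnvironment") -> str:
    # Mirrors the documented function: one CREATE TABLE statement, table name returned.
    table_name = "processed_events"
    t_env.execute_sql(f"""
        CREATE TABLE {table_name} (
            ip VARCHAR,
            event_timestamp TIMESTAMP(3),
            referrer VARCHAR,
            host VARCHAR,
            url VARCHAR,
            geodata VARCHAR
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{os.environ['POSTGRES_URL']}',
            'table-name' = '{table_name}',
            'username' = '{os.environ['POSTGRES_USER']}',
            'password' = '{os.environ['POSTGRES_PASSWORD']}',
            'driver' = 'org.postgresql.Driver'
        );
    """)
    return table_name


if __name__ == "__main__":
    # Example values; a real deployment supplies these through its environment.
    os.environ.setdefault("POSTGRES_URL", "jdbc:postgresql://localhost:5432/postgres")
    os.environ.setdefault("POSTGRES_USER", "postgres")
    os.environ.setdefault("POSTGRES_PASSWORD", "postgres")

    t_env = RecordingTableEnv()
    sink = create_processed_events_sink_postgres(t_env)
    # The returned name is what a caller would target in an INSERT INTO statement.
    t_env.execute_sql(f"INSERT INTO {sink} SELECT * FROM events_source")
    print(sink, len(t_env.statements))
```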
Related Pages
- Principle:DataExpert_io_Data_engineer_handbook_JDBC_Sink_Table_Definition
- Environment:DataExpert_io_Data_engineer_handbook_Flink_Kafka_Docker_Environment