Implementation:DataExpert io Data engineer handbook Create processed events sink postgres

From Leeroopedia


Overview

create_processed_events_sink_postgres is a function that registers a Flink SQL sink table backed by a PostgreSQL table through the JDBC connector. It builds and executes a DDL statement that defines the processed_events table, so the Flink streaming pipeline can write enriched event data into PostgreSQL.

Type

API Doc

Source

start_job.py:L36-56

Signature

def create_processed_events_sink_postgres(t_env: StreamTableEnvironment) -> str

Detailed Description

The function builds a CREATE TABLE DDL statement for the processed_events table and executes it against the provided StreamTableEnvironment. The table schema includes:

Column           Type           Description
ip               VARCHAR        Client IP address
event_timestamp  TIMESTAMP(3)   Parsed event timestamp
referrer         VARCHAR        The referring URL
host             VARCHAR        The host of the request
url              VARCHAR        The URL of the web event
geodata          VARCHAR        JSON string with geolocation data
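
The Flink DDL only registers processed_events in Flink's catalog. The Flink JDBC connector writes into a PostgreSQL table that must already exist and does not create it. Below is a minimal sketch, assuming you want the physical table derived from the same column list as the schema above. PROCESSED_EVENTS_COLUMNS, render_columns and postgres_table_ddl are hypothetical names, not part of start_job.py. PostgreSQL accepts VARCHAR (without a length) and TIMESTAMP(3) as written, so the same type strings work on both sides.

```python
# Hypothetical single source of truth for the processed_events schema,
# mirroring the column table above as (name, SQL type) pairs.
PROCESSED_EVENTS_COLUMNS = [
    ("ip", "VARCHAR"),
    ("event_timestamp", "TIMESTAMP(3)"),
    ("referrer", "VARCHAR"),
    ("host", "VARCHAR"),
    ("url", "VARCHAR"),
    ("geodata", "VARCHAR"),
]


def render_columns(columns: list[tuple[str, str]]) -> str:
    """Render 'name TYPE' column definitions, comma-separated, one per line."""
    return ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns)


def postgres_table_ddl(table_name: str, columns: list[tuple[str, str]]) -> str:
    """DDL for the physical PostgreSQL table that the JDBC sink writes into."""
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n{render_columns(columns)}\n);"


if __name__ == "__main__":
    # Run this DDL once against PostgreSQL (e.g. with psql) before starting the job.
    print(postgres_table_ddl("processed_events", PROCESSED_EVENTS_COLUMNS))
```

Keeping the column list in one place avoids the Flink and PostgreSQL definitions drifting apart when a column is added.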

The WITH clause configures:

  • 'connector' = 'jdbc'
  • 'url' -- from environment variable POSTGRES_URL, used verbatim, so the variable must already hold a full JDBC URL beginning with jdbc:postgresql://
  • 'table-name' = 'processed_events'
  • 'username' -- from environment variable POSTGRES_USER
  • 'password' -- from environment variable POSTGRES_PASSWORD
  • 'driver' = 'org.postgresql.Driver'
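
import os

from pyflink.table import StreamTableEnvironment


def create_processed_events_sink_postgres(t_env: StreamTableEnvironment) -> str:
    """Register the processed_events JDBC sink table and return its name."""
    table_name = "processed_events"
    # Connection settings come from the environment; os.environ[...] raises
    # KeyError if a variable is missing. POSTGRES_URL is used verbatim, so it
    # must be a full jdbc:postgresql:// URL.
    sink_ddl = f"""
        CREATE TABLE {table_name} (
            ip VARCHAR,
            event_timestamp TIMESTAMP(3),
            referrer VARCHAR,
            host VARCHAR,
            url VARCHAR,
            geodata VARCHAR
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{os.environ['POSTGRES_URL']}',
            'table-name' = '{table_name}',
            'username' = '{os.environ['POSTGRES_USER']}',
            'password' = '{os.environ['POSTGRES_PASSWORD']}',
            'driver' = 'org.postgresql.Driver'
        );
    """
    # Registers the table in Flink's catalog only; the PostgreSQL table
    # itself must already exist.
    t_env.execute_sql(sink_ddl)
    return table_name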
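
The function places the environment values directly between single quotes. A value containing a ' (a password, say) would therefore end the SQL string literal early and break the DDL. Below is a minimal sketch of rendering the WITH options with escaping. It assumes Flink SQL follows the standard SQL rule of writing a quote inside a string literal as two quotes. sql_string and render_with_options are hypothetical helpers, not part of start_job.py.

```python
def sql_string(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def render_with_options(options: dict[str, str]) -> str:
    """Render a WITH (...) clause from connector options, in insertion order."""
    body = ",\n".join(
        f"    {sql_string(key)} = {sql_string(value)}" for key, value in options.items()
    )
    return f"WITH (\n{body}\n)"


if __name__ == "__main__":
    # Placeholder values for illustration only; the real job reads url,
    # username and password from POSTGRES_URL, POSTGRES_USER, POSTGRES_PASSWORD.
    print(render_with_options({
        "connector": "jdbc",
        "url": "jdbc:postgresql://localhost:5432/postgres",
        "table-name": "processed_events",
        "username": "postgres",
        "password": "it's-a-secret",
        "driver": "org.postgresql.Driver",
    }))
```

The keys and their order match the WITH clause in the function. Only the quoting of the values changes.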

Inputs / Outputs

Inputs:

  • t_env -- a StreamTableEnvironment instance
  • Environment variables:
    • POSTGRES_URL -- JDBC connection URL for PostgreSQL
    • POSTGRES_USER -- database username
    • POSTGRES_PASSWORD -- database password
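
The source reads these variables with os.environ[...], so a missing variable surfaces as a bare KeyError while the DDL is being built. A POSTGRES_URL without the jdbc:postgresql:// scheme is only rejected later, when the sink is actually used. Below is a minimal sketch of checking all three up front, assuming a fail-fast check is wanted. read_postgres_settings is a hypothetical helper, and treating an empty value as missing is an assumption of this sketch.

```python
import os
from typing import Mapping

REQUIRED_VARS = ("POSTGRES_URL", "POSTGRES_USER", "POSTGRES_PASSWORD")


def read_postgres_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the JDBC settings, failing fast on missing or malformed values."""
    # Empty strings are treated as missing, as well as absent keys.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    url = env["POSTGRES_URL"]
    # The function passes POSTGRES_URL through unchanged, so it must already
    # carry the JDBC scheme.
    if not url.startswith("jdbc:postgresql://"):
        raise ValueError(f"POSTGRES_URL must start with 'jdbc:postgresql://', got {url!r}")
    return {
        "url": url,
        "username": env["POSTGRES_USER"],
        "password": env["POSTGRES_PASSWORD"],
    }
```

Calling read_postgres_settings() with no arguments checks the real process environment. Passing a plain dict makes it easy to test.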

Outputs:

  • Returns the table name string "processed_events"
