
Implementation:Risingwavelabs Risingwave PostgresSchema



Knowledge Sources
Domains: CDC, Connectors, PostgreSQL, Java
Last Updated: 2026-02-09 07:00 GMT

Overview

RisingWave's patched copy of Debezium's PostgresSchema class. It manages the in-memory representation of the PostgreSQL database schema used for CDC.

Description

PostgresSchema extends Debezium's RelationalDatabaseSchema and implements HistorizedDatabaseSchema<TableId>. Its job is to manage the in-memory representation of all PostgreSQL tables tracked by a CDC source. It is a RisingWave-patched copy of the upstream Debezium class, kept in the same package (io.debezium.connector.postgresql) and customized for RisingWave's CDC integration.

The class maintains two key data structures:

  • tableIdToToastableColumns: Maps each TableId to the names of its columns whose storage strategy allows TOAST (pg_attribute.attstorage of x = extended, e = external, or m = main). When an UPDATE leaves an out-of-line TOAST value unchanged, logical replication may omit that value. For these columns the connector can then substitute an unchanged-toast marker.
  • relationIdToTableId: Maps PostgreSQL relation OIDs, as carried in replication messages, to TableId objects. This lets tables be looked up by their internal database identifier.
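The TOAST bookkeeping above can be sketched as a small self-contained model. This is not the patched class's code. The SQL string is an illustrative query against pg_attribute, and UNCHANGED_TOAST_PLACEHOLDER is a hypothetical marker value. The sketch also assumes that a column the decoded row does not carry at all is an unchanged TOAST value, while a column present with a null value is a real SQL NULL.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: classifies pg_attribute.attstorage codes and fills in a placeholder
// for TOAST-able columns that a decoded row left out.
final class ToastSketch {
    // 'x' = extended, 'e' = external, 'm' = main are TOAST-able; 'p' = plain is not.
    private static final Set<Character> TOASTABLE_STORAGE = Set.of('x', 'e', 'm');

    // Hypothetical marker value, not Debezium's actual placeholder.
    static final String UNCHANGED_TOAST_PLACEHOLDER = "__unchanged_toast__";

    // Illustrative query: user columns of one table whose storage strategy is TOAST-able.
    static final String TOASTABLE_COLUMNS_SQL =
            "SELECT attname FROM pg_catalog.pg_attribute "
            + "WHERE attrelid = ?::regclass AND attnum > 0 AND NOT attisdropped "
            + "AND attstorage IN ('x', 'e', 'm')";

    static boolean isToastable(char attstorage) {
        return TOASTABLE_STORAGE.contains(attstorage);
    }

    // Returns a copy of the row in which every TOAST-able column missing from the
    // row is set to the placeholder; columns present with null are left as SQL NULL.
    static Map<String, Object> fillUnchangedToast(Map<String, Object> decodedRow,
                                                  List<String> toastableColumns) {
        Map<String, Object> result = new LinkedHashMap<>(decodedRow);
        for (String column : toastableColumns) {
            if (!result.containsKey(column)) {
                result.put(column, UNCHANGED_TOAST_PLACEHOLDER);
            }
        }
        return result;
    }
}
```

Checking key presence rather than null-ness matters here. A TOAST-able column can legitimately be NULL, and that must not be confused with "value not sent".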

Key operations include:

  • refresh(PostgresConnection, boolean): Reads the complete database schema from the connection, optionally printing replica identity information, and refreshes the internal schema cache.
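  • refresh(PostgresConnection, TableId, boolean): Refreshes the schema for a single table, with support for removing generated columns (used by the pgoutput decoder). In the signature the boolean is named refreshToastableColumns. It controls whether the table's TOAST-able column list is re-read as well.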
  • applySchemaChangesForTable(int, Table): Called when a RELATION message is received during streaming to update the schema mapping for a specific relation ID.
  • tableFor(int): Resolves a Table object from a relation ID, used during INSERT/UPDATE/DELETE message decoding.
  • refreshToastableColumnsMap(): Queries pg_attribute to identify columns with TOAST-eligible storage strategies.
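The following minimal model shows how applySchemaChangesForTable and tableFor cooperate through relationIdToTableId. TableRef and TableDef are hypothetical stand-ins for Debezium's TableId and Table. Returning null for an unknown OID is an assumption of this sketch, not a statement about the patched class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Debezium's TableId and Table.
record TableRef(String schema, String table) {}

record TableDef(TableRef id, List<String> columns, List<String> primaryKey) {}

// Sketch only: the OID -> table id -> table definition lookup chain.
final class RelationRegistry {
    private final Map<Integer, TableRef> relationIdToTableId = new HashMap<>();
    private final Map<TableRef, TableDef> tables = new HashMap<>();

    // Models applySchemaChangesForTable(int, Table): remember which table the OID
    // refers to and cache (or replace) that table's definition.
    void applySchemaChangesForTable(int relationId, TableDef table) {
        relationIdToTableId.put(relationId, table.id());
        tables.put(table.id(), table);
    }

    // Models tableFor(int): resolve an OID from an INSERT/UPDATE/DELETE message.
    // Returns null when no definition has been registered for the OID.
    TableDef tableFor(int relationId) {
        TableRef id = relationIdToTableId.get(relationId);
        return id == null ? null : tables.get(id);
    }
}
```

In this model, a later RELATION message for the same OID overwrites both the mapping and the cached definition. Subsequent row messages are therefore decoded against the newest column list.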

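The class is accompanied by a companion PostgresSchemaHistory class. It is a no-op SchemaHistory implementation whose exists() and storageExists() always return true. No real history is needed because the PostgreSQL connector does not rebuild table schemas from a recorded DDL history, as the MySQL connector does. Instead it reads them from the database catalog (refresh) and from RELATION messages in the replication stream.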
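The no-op history idea can be shown in miniature. HistoryProbe is a hypothetical, cut-down interface. Debezium's actual SchemaHistory interface declares more methods, and this sketch does not reproduce their signatures. It only shows that an implementation which always reports existing history, and discards anything recorded, satisfies callers that probe for history.

```java
// Hypothetical cut-down interface; not Debezium's SchemaHistory.
interface HistoryProbe {
    boolean exists();

    boolean storageExists();

    void recordDdl(String ddl); // hypothetical hook for a DDL event
}

// Sketch only: always reports history as available and stores nothing.
final class NoOpSchemaHistory implements HistoryProbe {
    @Override
    public boolean exists() {
        return true; // history is always reported as present
    }

    @Override
    public boolean storageExists() {
        return true; // backing storage is always reported as present
    }

    @Override
    public void recordDdl(String ddl) {
        // Intentionally ignored: table schemas come from the catalog and RELATION messages.
    }
}
```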

Usage

This class is instantiated during PostgreSQL CDC source initialization and is used throughout the streaming lifecycle to resolve table schemas for incoming replication messages.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java (L69-479)

Signature

@NotThreadSafe
public class PostgresSchema extends RelationalDatabaseSchema
        implements HistorizedDatabaseSchema<TableId> {

    protected static final String PUBLIC_SCHEMA_NAME = "public";

    protected PostgresSchema(PostgresConnectorConfig config,
            PostgresDefaultValueConverter defaultValueConverter,
            TopicNamingStrategy<TableId> topicNamingStrategy,
            PostgresValueConverter valueConverter);

    protected PostgresSchema refresh(PostgresConnection connection,
            boolean printReplicaIdentityInfo) throws SQLException;

    protected void refresh(PostgresConnection connection, TableId tableId,
            boolean refreshToastableColumns) throws SQLException;

    protected void refreshFromIncrementalSnapshot(
            PostgresConnection connection, TableId tableId) throws SQLException;

    public void applySchemaChangesForTable(int relationId, Table table);
    public Table tableFor(int relationId);
    public List<String> getToastableColumnsForTableId(TableId tableId);

    protected static TableId parse(String table);

    @Override
    public SchemaHistory getSchemaHistory();

    @Override
    public boolean isHistorized();
}

Import

import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.HistorizedDatabaseSchema;
import java.util.List;

I/O Contract

Inputs

Name | Type | Required | Description
config | PostgresConnectorConfig | Yes | Connector configuration, including table filters, column filters, and the key mapper
defaultValueConverter | PostgresDefaultValueConverter | Yes | Converter for PostgreSQL default column values
topicNamingStrategy | TopicNamingStrategy<TableId> | Yes | Strategy for naming Kafka topics from table identifiers
valueConverter | PostgresValueConverter | Yes | Converter from PostgreSQL column values to Kafka Connect types
connection | PostgresConnection | Yes (for refresh) | JDBC connection used to read schema metadata
relationId | int | Yes (for applySchemaChangesForTable and tableFor) | PostgreSQL OID of the table relation
tableId | TableId | Yes (for single-table refresh, refreshFromIncrementalSnapshot, and getToastableColumnsForTableId) | Identifier of the table to refresh or query

Outputs

Name | Type | Description
Table | Table | Resolved table definition, including columns, types, and primary keys
toastableColumns | List<String> | Names of the columns that use TOAST storage strategies for a given table
Schema | Kafka Connect Schema | The Kafka Connect schema for a specific table, derived from the table definition
SchemaHistory | SchemaHistory | A no-op PostgresSchemaHistory instance (PostgreSQL does not need DDL-based history)

Usage Examples

Initializing and Refreshing Schema

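// Create the schema during task initialization. The constructor and refresh(...)
// are protected, so this code must live in package io.debezium.connector.postgresql
// or in a subclass.
PostgresSchema schema = new PostgresSchema(
        connectorConfig, defaultValueConverter,
        topicNamingStrategy, valueConverter);

// Load all captured table schemas from the database (true = print replica identity info)
schema.refresh(connection, true);

// During streaming, resolve a table by the relation OID carried in a replication message;
// may return null if that OID has not been mapped yet
Table table = schema.tableFor(relationId);

// Get a table's TOAST-able columns for unchanged-toast handling
List<String> toastCols = schema.getToastableColumnsForTableId(tableId);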

Applying Schema Changes from Replication Stream

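// Inside the pgoutput decoder, after the 'R' (RELATION) message type byte has been read
int relationId = buffer.getInt();
// resolveRelationFromMetadata is a decoder-side helper (not part of PostgresSchema)
// that builds a Table from the column metadata in the rest of the message
Table table = resolveRelationFromMetadata(metadata);
schema.applySchemaChangesForTable(relationId, table);

// Subsequent INSERT/UPDATE/DELETE messages carry the same relation OID,
// so the decoder can resolve the table from it
Table resolvedTable = schema.tableFor(relationId);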
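For context on where relationId and the column metadata come from, the sketch below decodes the body of a pgoutput RELATION ('R') message. It follows the layout in PostgreSQL's logical replication message format documentation. It assumes the message type byte has already been consumed and that the transaction is not streamed (streamed transactions in protocol version 2 and later prepend a 4-byte transaction ID). RelationMessage, RelationColumn, and RelationMessageParser are hypothetical names, not Debezium classes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value types for the decoded message.
record RelationColumn(String name, int typeOid, int typeModifier, boolean isKey) {}

record RelationMessage(int relationId, String schemaName, String tableName,
                       char replicaIdentity, List<RelationColumn> columns) {}

// Sketch only: parses a RELATION message body (big-endian, as ByteBuffer defaults to).
final class RelationMessageParser {
    static RelationMessage parse(ByteBuffer buffer) {
        int relationId = buffer.getInt();           // Int32: relation OID
        String schemaName = readCString(buffer);    // namespace
        String tableName = readCString(buffer);     // relation name
        char replicaIdentity = (char) buffer.get(); // Int8: relreplident ('d', 'n', 'f', 'i')
        short columnCount = buffer.getShort();      // Int16: number of columns
        List<RelationColumn> columns = new ArrayList<>(columnCount);
        for (int i = 0; i < columnCount; i++) {
            boolean isKey = (buffer.get() & 1) != 0; // Int8 flags: 1 = part of the key
            String name = readCString(buffer);
            int typeOid = buffer.getInt();           // Int32: data type OID
            int typeModifier = buffer.getInt();      // Int32: atttypmod
            columns.add(new RelationColumn(name, typeOid, typeModifier, isKey));
        }
        return new RelationMessage(relationId, schemaName, tableName, replicaIdentity, columns);
    }

    // Reads a NUL-terminated UTF-8 string and leaves the buffer just past the terminator.
    private static String readCString(ByteBuffer buffer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        byte b;
        while ((b = buffer.get()) != 0) {
            bytes.write(b);
        }
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

In the decoder shown above, fields like these are turned into a Debezium Table before schema.applySchemaChangesForTable(relationId, table) is called.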
