Implementation:Risingwavelabs Risingwave PostgresSchema
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, PostgreSQL, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Patched Debezium PostgresSchema class managing the in-memory representation of the PostgreSQL database schema for CDC.
Description
PostgresSchema extends Debezium's RelationalDatabaseSchema and implements HistorizedDatabaseSchema<TableId> to manage the in-memory representation of all PostgreSQL tables being tracked by a CDC source. This is a RisingWave-patched override of the upstream Debezium class, customized for RisingWave's CDC integration.
The class maintains several key data structures:
- tableIdToToastableColumns: A map from TableId to the names of columns whose storage strategy allows TOAST (pg_attribute.attstorage of x = extended, e = external, or m = main). During logical replication, values in these columns may be replaced with unchanged-toast markers.
- relationIdToTableId: A map from PostgreSQL relation OIDs to TableId objects. It is used to look up tables by the internal identifiers carried in replication messages.
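The toastable-columns bookkeeping can be pictured with the minimal, self-contained sketch below. It is not the RisingWave code. The class and method names (ToastableColumnsSketch, refreshTable) are hypothetical. Tables are keyed by a plain "schema.table" string instead of Debezium's TableId. The attstorage codes are passed in by the caller rather than queried from pg_attribute.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: tables are keyed by "schema.table" strings
// instead of Debezium's TableId, and attstorage codes are supplied by the caller.
public class ToastableColumnsSketch {
    // pg_attribute.attstorage codes: p = plain, e = external, m = main, x = extended.
    // Every strategy except plain permits TOAST (compression and/or out-of-line storage).
    private static final Set<Character> TOASTABLE_STORAGE = Set.of('x', 'e', 'm');

    private final Map<String, List<String>> tableIdToToastableColumns = new HashMap<>();

    static boolean isToastable(char attstorage) {
        return TOASTABLE_STORAGE.contains(attstorage);
    }

    // Rebuilds the toastable-column list for one table from (column name -> attstorage) pairs.
    void refreshTable(String tableId, Map<String, Character> attstorageByColumn) {
        List<String> toastable = new ArrayList<>();
        for (Map.Entry<String, Character> entry : attstorageByColumn.entrySet()) {
            if (isToastable(entry.getValue())) {
                toastable.add(entry.getKey());
            }
        }
        tableIdToToastableColumns.put(tableId, toastable);
    }

    // Returns an empty list for tables that have not been refreshed.
    List<String> getToastableColumnsForTableId(String tableId) {
        return tableIdToToastableColumns.getOrDefault(tableId, List.of());
    }

    public static void main(String[] args) {
        ToastableColumnsSketch sketch = new ToastableColumnsSketch();
        Map<String, Character> columns = new LinkedHashMap<>(); // keeps column order
        columns.put("id", 'p');      // int4 defaults to plain storage
        columns.put("payload", 'x'); // text defaults to extended storage
        columns.put("blob", 'e');    // e.g. after ALTER COLUMN ... SET STORAGE EXTERNAL
        sketch.refreshTable("public.orders", columns);
        System.out.println(sketch.getToastableColumnsForTableId("public.orders")); // [payload, blob]
    }
}
```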
Key operations include:
- refresh(PostgresConnection, boolean): Reads the complete database schema from the connection, optionally printing replica identity information, and refreshes the internal schema cache.
- refresh(PostgresConnection, TableId, boolean): Refreshes the schema for a single table, with support for removing generated columns (used by the pgoutput decoder).
- applySchemaChangesForTable(int, Table): Called when a RELATION message is received during streaming to update the schema mapping for a specific relation ID.
- tableFor(int): Resolves a Table object from a relation ID, used during INSERT/UPDATE/DELETE message decoding.
- refreshToastableColumnsMap(): Queries pg_attribute to identify columns with TOAST-eligible storage strategies.
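The relation-ID lookup behind applySchemaChangesForTable(int, Table) and tableFor(int) can be modeled roughly as shown in the simplified sketch below, which rests on several assumptions. TableDef is a hypothetical stand-in for Debezium's Table, and table IDs are plain strings. Other parts of the real class, such as building Kafka Connect schemas, are left out. This sketch returns null for an OID that has not yet appeared in a RELATION message.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of relation-ID resolution (not the RisingWave code).
public class RelationRegistrySketch {
    // Stand-in for Debezium's Table: an id plus ordered column names.
    record TableDef(String id, List<String> columns) {}

    // Plays the role of relationIdToTableId: OID from the replication stream -> table id.
    private final Map<Integer, String> relationIdToTableId = new HashMap<>();
    // Stand-in for the cached table definitions held by the schema.
    private final Map<String, TableDef> tables = new HashMap<>();

    // Called for each RELATION message: (re)binds the OID and replaces the cached definition.
    void applySchemaChangesForTable(int relationId, TableDef table) {
        relationIdToTableId.put(relationId, table.id());
        tables.put(table.id(), table);
    }

    // Resolves the table for INSERT/UPDATE/DELETE; null if the OID is unknown in this sketch.
    TableDef tableFor(int relationId) {
        String tableId = relationIdToTableId.get(relationId);
        return tableId == null ? null : tables.get(tableId);
    }

    public static void main(String[] args) {
        RelationRegistrySketch schema = new RelationRegistrySketch();
        schema.applySchemaChangesForTable(16384, new TableDef("public.orders", List.of("id", "amount")));
        System.out.println(schema.tableFor(16384).columns()); // [id, amount]
        System.out.println(schema.tableFor(99999));           // null
        // A later RELATION message for the same OID replaces the cached definition.
        schema.applySchemaChangesForTable(16384, new TableDef("public.orders", List.of("id", "amount", "note")));
        System.out.println(schema.tableFor(16384).columns()); // [id, amount, note]
    }
}
```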
The class also includes a companion PostgresSchemaHistory class. This is a no-op implementation of SchemaHistory whose exists() and storageExists() methods both return true. Unlike the MySQL connector, the PostgreSQL connector does not rely on a DDL-based schema history.
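The no-op history pattern can be illustrated with the hedged sketch below. Debezium's actual SchemaHistory interface declares more methods than this. MinimalSchemaHistory and NoOpSchemaHistory are hypothetical names, used only to show a history that always reports itself as present and discards whatever is recorded.

```java
// Hypothetical names throughout; Debezium's real SchemaHistory interface has more methods.
public class NoOpHistorySketch {
    interface MinimalSchemaHistory {
        boolean exists();
        boolean storageExists();
        void record(String ddl);
    }

    static final class NoOpSchemaHistory implements MinimalSchemaHistory {
        @Override
        public boolean exists() {
            return true; // always report that a history already exists
        }

        @Override
        public boolean storageExists() {
            return true; // always report that the backing storage already exists
        }

        @Override
        public void record(String ddl) {
            // Intentionally discarded: nothing is persisted
        }
    }

    public static void main(String[] args) {
        MinimalSchemaHistory history = new NoOpSchemaHistory();
        history.record("ALTER TABLE orders ADD COLUMN note text");
        System.out.println(history.exists() && history.storageExists()); // true
    }
}
```

The table definitions come from refresh(...) and from RELATION messages rather than from replayed DDL. A persisted DDL log would therefore presumably have nothing to restore.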
Usage
This class is instantiated during PostgreSQL CDC source initialization and is used throughout the streaming lifecycle to resolve table schemas for incoming replication messages.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java (L69-479)
Signature
```java
@NotThreadSafe
public class PostgresSchema extends RelationalDatabaseSchema
        implements HistorizedDatabaseSchema<TableId> {

    protected static final String PUBLIC_SCHEMA_NAME = "public";

    protected PostgresSchema(PostgresConnectorConfig config,
                             PostgresDefaultValueConverter defaultValueConverter,
                             TopicNamingStrategy<TableId> topicNamingStrategy,
                             PostgresValueConverter valueConverter);

    protected PostgresSchema refresh(PostgresConnection connection,
                                     boolean printReplicaIdentityInfo) throws SQLException;

    protected void refresh(PostgresConnection connection, TableId tableId,
                           boolean refreshToastableColumns) throws SQLException;

    protected void refreshFromIncrementalSnapshot(
            PostgresConnection connection, TableId tableId) throws SQLException;

    public void applySchemaChangesForTable(int relationId, Table table);

    public Table tableFor(int relationId);

    public List<String> getToastableColumnsForTableId(TableId tableId);

    protected static TableId parse(String table);

    @Override
    public SchemaHistory getSchemaHistory();

    @Override
    public boolean isHistorized();
}
```
Import
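```java
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.spi.topic.TopicNamingStrategy;
```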
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | PostgresConnectorConfig | Yes | Connector configuration including table filters, column filters, and key mapper |
| defaultValueConverter | PostgresDefaultValueConverter | Yes | Converter for PostgreSQL default column values |
| topicNamingStrategy | TopicNamingStrategy<TableId> | Yes | Strategy for naming Kafka topics from table identifiers |
| valueConverter | PostgresValueConverter | Yes | Converter for PostgreSQL column values to Kafka Connect types |
| connection | PostgresConnection | Yes (for refresh) | JDBC connection used to read schema metadata |
| relationId | int | Yes (for applySchemaChangesForTable) | PostgreSQL OID for the table relation |
Outputs
| Name | Type | Description |
|---|---|---|
| Table | Table | Resolved table definition including columns, types, and primary keys |
| toastableColumns | List<String> | Column names that use TOAST storage strategies for a given table |
| Schema | Kafka Connect Schema | The Kafka Connect schema for a specific table, derived from the table definition |
| SchemaHistory | SchemaHistory | A no-op PostgresSchemaHistory instance (PostgreSQL does not need DDL-based history) |
Usage Examples
Initializing and Refreshing Schema
```java
// Create the schema during task initialization. The constructor and refresh(...)
// are protected, so this code must live in the io.debezium.connector.postgresql package.
PostgresSchema schema = new PostgresSchema(
        connectorConfig, defaultValueConverter,
        topicNamingStrategy, valueConverter);

// Load all table schemas from the database (true = print replica identity info)
schema.refresh(connection, true);

// Resolve a table by relation ID during streaming
Table table = schema.tableFor(relationId);

// Get TOAST-able columns for unchanged-toast handling
List<String> toastCols = schema.getToastableColumnsForTableId(tableId);
```
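The sketch below shows one possible use of the list returned by getToastableColumnsForTableId. It assumes a decoder that omits columns whose TOASTed value did not change. The helper name fillUnchangedToast and the placeholder value are hypothetical, and this page does not show how RisingWave actually represents unchanged-toast markers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of PostgresSchema or RisingWave.
public class UnchangedToastSketch {
    // Builds a full row: present values are copied, missing toastable columns get the
    // placeholder, and any other missing column is left null in this sketch.
    static Map<String, Object> fillUnchangedToast(Map<String, Object> decodedRow,
                                                  List<String> allColumns,
                                                  List<String> toastableColumns,
                                                  Object placeholder) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String column : allColumns) {
            if (decodedRow.containsKey(column)) {
                row.put(column, decodedRow.get(column));  // value was present in the message
            } else if (toastableColumns.contains(column)) {
                row.put(column, placeholder);             // assumed: omitted because TOAST value unchanged
            } else {
                row.put(column, null);                    // missing, non-toastable column
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> decoded = new LinkedHashMap<>();
        decoded.put("id", 1);
        Map<String, Object> row = fillUnchangedToast(
                decoded, List.of("id", "payload", "note"), List.of("payload"), "__unavailable__");
        System.out.println(row); // {id=1, payload=__unavailable__, note=null}
    }
}
```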
Applying Schema Changes from Replication Stream
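```java
// When a RELATION message is received during pgoutput decoding.
// The 'R' message-type byte has already been read; for a non-streamed
// transaction the relation OID comes next.
int relationId = buffer.getInt();
// Decoder-side helper (not part of PostgresSchema) that builds a Table
// from the column metadata carried in the same message
Table table = resolveRelationFromMetadata(metadata);
schema.applySchemaChangesForTable(relationId, table);

// Subsequent INSERT/UPDATE/DELETE messages can now resolve the table
Table resolvedTable = schema.tableFor(relationId);
```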
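The relation ID passed to applySchemaChangesForTable comes from a RELATION message. The sketch below decodes the body of such a message using the layout in PostgreSQL's logical replication message-format documentation. The fields are, in order:
- relation OID
- namespace
- relation name
- replica identity byte
- column count
- per column: a flags byte, name, type OID, and type modifier

It assumes the 'R' type byte has already been consumed and that the transaction is not streamed (streamed transactions carry an Xid before the OID). The record and method names are hypothetical, and the sketch stops short of building a Debezium Table.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder sketch for the body of a pgoutput RELATION ('R') message.
public class RelationMessageSketch {
    record Column(String name, int typeOid, int typeModifier, boolean partOfKey) {}
    record Relation(int relationId, String namespace, String name,
                    char replicaIdentity, List<Column> columns) {}

    // Reads a null-terminated UTF-8 string, as used by the pgoutput wire format.
    static String readCString(ByteBuffer buf) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        byte b;
        while ((b = buf.get()) != 0) {
            bytes.write(b);
        }
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }

    static void putCString(ByteBuffer buf, String s) {
        buf.put(s.getBytes(StandardCharsets.UTF_8));
        buf.put((byte) 0);
    }

    // Assumes the 'R' byte was consumed and the transaction is not streamed.
    static Relation parseRelation(ByteBuffer buf) {
        int relationId = buf.getInt();            // relation OID (Int32, big-endian)
        String namespace = readCString(buf);
        String name = readCString(buf);
        char replicaIdentity = (char) buf.get();  // 'd' default, 'n' nothing, 'f' full, 'i' index
        int columnCount = buf.getShort();
        List<Column> columns = new ArrayList<>();
        for (int i = 0; i < columnCount; i++) {
            boolean partOfKey = (buf.get() & 1) != 0; // flag 1 = column is part of the key
            String columnName = readCString(buf);
            int typeOid = buf.getInt();
            int typeModifier = buf.getInt();
            columns.add(new Column(columnName, typeOid, typeModifier, partOfKey));
        }
        return new Relation(relationId, namespace, name, replicaIdentity, columns);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(128); // big-endian by default, like the wire format
        buf.putInt(16384);
        putCString(buf, "public");
        putCString(buf, "orders");
        buf.put((byte) 'd');
        buf.putShort((short) 2);
        buf.put((byte) 1); putCString(buf, "id");   buf.putInt(23); buf.putInt(-1); // int4 key column
        buf.put((byte) 0); putCString(buf, "note"); buf.putInt(25); buf.putInt(-1); // text column
        buf.flip();
        Relation rel = parseRelation(buf);
        System.out.println(rel.namespace() + "." + rel.name() + " oid=" + rel.relationId()
                + " columns=" + rel.columns().size()); // public.orders oid=16384 columns=2
    }
}
```

In a real decoder, the parsed columns would be turned into a Table and passed to applySchemaChangesForTable together with the OID.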