Implementation:Risingwavelabs Risingwave PostgresConnectorConfig
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, PostgreSQL, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Patched Debezium PostgresConnectorConfig providing extensive configuration options for PostgreSQL CDC source connectors.
Description
PostgresConnectorConfig extends Debezium's RelationalDatabaseConnectorConfig to define all configuration properties governing PostgreSQL CDC behavior in RisingWave. This is a large (1637-line) configuration class that overrides the upstream Debezium version with RisingWave-specific customizations.
The class defines several enumerated configuration types:
- HStoreHandlingMode: Controls how PostgreSQL hstore columns are represented -- either as JSON strings or as Map objects.
- IntervalHandlingMode: Controls how interval columns are represented -- as numeric microseconds or ISO 8601 strings.
- SnapshotMode: Defines when snapshots are taken -- ALWAYS, INITIAL, NEVER/NO_DATA, INITIAL_ONLY, WHEN_NEEDED, CONFIGURATION_BASED, or CUSTOM.
- SnapshotIsolationMode: Sets the transaction isolation level for snapshots -- SERIALIZABLE, REPEATABLE_READ, READ_COMMITTED, or READ_UNCOMMITTED.
- SecureConnectionMode: Maps to PostgreSQL sslmode settings -- DISABLED, ALLOW, PREFER, REQUIRED, VERIFY_CA, VERIFY_FULL.
- LogicalDecoder: Defines the logical decoding plugin -- PGOUTPUT (using PgOutputMessageDecoder) or DECODERBUFS (using PgProtoMessageDecoder). Each decoder declares whether it supports truncate events and logical decoding messages.
- SchemaRefreshMode: Controls when the in-memory schema cache is refreshed -- COLUMNS_DIFF or variations that account for column type differences.
Each enum implements the EnumeratedValue interface and provides static parse() methods for converting string configuration values.
Usage
This class is instantiated by the PostgreSQL CDC connector during initialization. It is passed to all major components including the schema manager, streaming change event source, replication connection, and message decoder to govern their behavior.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java (L62-1637)
Signature
public class PostgresConnectorConfig extends RelationalDatabaseConnectorConfig {
public enum HStoreHandlingMode implements EnumeratedValue { JSON, MAP; }
public enum IntervalHandlingMode implements EnumeratedValue { NUMERIC, STRING; }
public enum SnapshotMode implements EnumeratedValue {
ALWAYS, INITIAL, NEVER, NO_DATA, INITIAL_ONLY, WHEN_NEEDED,
CONFIGURATION_BASED, CUSTOM;
}
public enum SnapshotIsolationMode implements EnumeratedValue {
SERIALIZABLE, REPEATABLE_READ, READ_COMMITTED, READ_UNCOMMITTED;
}
public enum SecureConnectionMode implements EnumeratedValue {
DISABLED, ALLOW, PREFER, REQUIRED, VERIFY_CA, VERIFY_FULL;
}
public enum LogicalDecoder implements EnumeratedValue {
PGOUTPUT, DECODERBUFS;
public abstract MessageDecoder messageDecoder(
MessageDecoderContext config, PostgresConnection connection);
public abstract String getPostgresPluginName();
public abstract boolean supportsTruncate();
public abstract boolean supportsLogicalDecodingMessage();
}
public enum SchemaRefreshMode implements EnumeratedValue { COLUMNS_DIFF; }
}
Import
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SecureConnectionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | Configuration | Yes | Debezium Configuration object containing all connector properties |
Outputs
| Name | Type | Description |
|---|---|---|
| Configuration values | Various enums and primitives | Typed accessor methods for all PostgreSQL CDC configuration options |
| LogicalDecoder | MessageDecoder | Factory method on LogicalDecoder enum creates the appropriate message decoder instance |
| JdbcConfiguration | JdbcConfiguration | JDBC connection parameters derived from the connector config |
Usage Examples
Configuration Properties
// Properties typically set by RisingWave CDC engine
Properties props = new Properties();
props.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
props.put("plugin.name", "pgoutput");
props.put("slot.name", "rw_cdc_slot_12345");
props.put("publication.name", "rw_publication");
props.put("snapshot.mode", "initial");
props.put("snapshot.isolation.mode", "repeatable_read");
props.put("database.sslmode", "require");
props.put("hstore.handling.mode", "json");
props.put("interval.handling.mode", "numeric");
Accessing Configuration Values
PostgresConnectorConfig config = new PostgresConnectorConfig(Configuration.from(props));
// Get the logical decoder
LogicalDecoder decoder = config.plugin();
MessageDecoder messageDecoder = decoder.messageDecoder(decoderContext, connection);
// Check snapshot mode
SnapshotMode snapshotMode = config.snapshotMode();
// Get SSL mode
SecureConnectionMode sslMode = config.secureConnectionMode();