Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave PostgresConnectorConfig

From Leeroopedia


Knowledge Sources
Domains CDC, Connectors, PostgreSQL, Java
Last Updated 2026-02-09 07:00 GMT

Overview

Patched Debezium PostgresConnectorConfig providing extensive configuration options for PostgreSQL CDC source connectors.

Description

PostgresConnectorConfig extends Debezium's RelationalDatabaseConnectorConfig to define all configuration properties governing PostgreSQL CDC behavior in RisingWave. This is a large (1637-line) configuration class that overrides the upstream Debezium version with RisingWave-specific customizations.

The class defines several enumerated configuration types:

  • HStoreHandlingMode: Controls how PostgreSQL hstore columns are represented -- either as JSON strings or as Map objects.
  • IntervalHandlingMode: Controls how interval columns are represented -- as numeric microseconds or ISO 8601 strings.
  • SnapshotMode: Defines when snapshots are taken -- ALWAYS, INITIAL, NEVER/NO_DATA, INITIAL_ONLY, WHEN_NEEDED, CONFIGURATION_BASED, or CUSTOM.
  • SnapshotIsolationMode: Sets the transaction isolation level for snapshots -- SERIALIZABLE, REPEATABLE_READ, READ_COMMITTED, or READ_UNCOMMITTED.
  • SecureConnectionMode: Maps to PostgreSQL sslmode settings -- DISABLED, ALLOW, PREFER, REQUIRED, VERIFY_CA, VERIFY_FULL.
  • LogicalDecoder: Defines the logical decoding plugin -- PGOUTPUT (using PgOutputMessageDecoder) or DECODERBUFS (using PgProtoMessageDecoder). Each decoder declares whether it supports truncate events and logical decoding messages.
  • SchemaRefreshMode: Controls when the in-memory schema cache is refreshed -- COLUMNS_DIFF or variations that account for column type differences.

Each enum implements the EnumeratedValue interface and provides static parse() methods for converting string configuration values.

Usage

This class is instantiated by the PostgreSQL CDC connector during initialization. It is passed to all major components including the schema manager, streaming change event source, replication connection, and message decoder to govern their behavior.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java (L62-1637)

Signature

public class PostgresConnectorConfig extends RelationalDatabaseConnectorConfig {

    public enum HStoreHandlingMode implements EnumeratedValue { JSON, MAP; }
    public enum IntervalHandlingMode implements EnumeratedValue { NUMERIC, STRING; }
    public enum SnapshotMode implements EnumeratedValue {
        ALWAYS, INITIAL, NEVER, NO_DATA, INITIAL_ONLY, WHEN_NEEDED,
        CONFIGURATION_BASED, CUSTOM;
    }
    public enum SnapshotIsolationMode implements EnumeratedValue {
        SERIALIZABLE, REPEATABLE_READ, READ_COMMITTED, READ_UNCOMMITTED;
    }
    public enum SecureConnectionMode implements EnumeratedValue {
        DISABLED, ALLOW, PREFER, REQUIRED, VERIFY_CA, VERIFY_FULL;
    }
    public enum LogicalDecoder implements EnumeratedValue {
        PGOUTPUT, DECODERBUFS;
        public abstract MessageDecoder messageDecoder(
                MessageDecoderContext config, PostgresConnection connection);
        public abstract String getPostgresPluginName();
        public abstract boolean supportsTruncate();
        public abstract boolean supportsLogicalDecodingMessage();
    }
    public enum SchemaRefreshMode implements EnumeratedValue { COLUMNS_DIFF; }
}

Import

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SecureConnectionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;

I/O Contract

Inputs

Name Type Required Description
config Configuration Yes Debezium Configuration object containing all connector properties

Outputs

Name Type Description
Configuration values Various enums and primitives Typed accessor methods for all PostgreSQL CDC configuration options
LogicalDecoder MessageDecoder Factory method on LogicalDecoder enum creates the appropriate message decoder instance
JdbcConfiguration JdbcConfiguration JDBC connection parameters derived from the connector config

Usage Examples

Configuration Properties

// Properties typically set by RisingWave CDC engine
Properties props = new Properties();
props.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
props.put("plugin.name", "pgoutput");
props.put("slot.name", "rw_cdc_slot_12345");
props.put("publication.name", "rw_publication");
props.put("snapshot.mode", "initial");
props.put("snapshot.isolation.mode", "repeatable_read");
props.put("database.sslmode", "require");
props.put("hstore.handling.mode", "json");
props.put("interval.handling.mode", "numeric");

Accessing Configuration Values

PostgresConnectorConfig config = new PostgresConnectorConfig(Configuration.from(props));

// Get the logical decoder
LogicalDecoder decoder = config.plugin();
MessageDecoder messageDecoder = decoder.messageDecoder(decoderContext, connection);

// Check snapshot mode
SnapshotMode snapshotMode = config.snapshotMode();

// Get SSL mode
SecureConnectionMode sslMode = config.secureConnectionMode();

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment