Implementation:Risingwavelabs Risingwave PostgresStreamingChangeEventSource
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, PostgreSQL, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
A patched copy of Debezium's PostgreSQL streaming change event source, which processes WAL events received over logical replication.
Description
PostgresStreamingChangeEventSource implements Debezium's StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> interface to drive PostgreSQL CDC event capture in RisingWave. It is the core streaming engine component that connects to a PostgreSQL replication slot, consumes WAL (Write-Ahead Log) events, and dispatches them as change events.
The class manages the following lifecycle:
- Initialization (init): Sets the effective offset context and refreshes the database schema via taskContext.refreshSchema().
- Execution (execute): Establishes a replication stream from the stored LSN (Log Sequence Number) position. It starts a keep-alive thread to prevent connection timeouts, optionally performs WAL position search for catch-up streaming, and then enters the main message processing loop.
- Message Processing (processMessages): Continuously reads from the replication stream using stream.readPending(). Each received message is processed via processReplicationMessages(), which handles three categories:
  - Transactional messages (BEGIN/COMMIT): Updates the WAL position and dispatches transaction boundary events.
  - Logical decoding messages: Dispatches custom logical decoding messages.
  - DML events (INSERT/UPDATE/DELETE): Resolves the table ID from the message, updates the WAL position, and dispatches data change events via PostgresChangeRecordEmitter.
- Cleanup (close): Stops the keep-alive thread and closes the replication connection.
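The init / execute / close lifecycle above can be modeled in plain Java without Debezium. This is a minimal, hypothetical sketch: `FakeStream`, `LifecycleSketch`, and the `BooleanSupplier` that stands in for the context's "is running" check are illustrative names, not RisingWave or Debezium APIs. The real `readPending()` has a different signature; here it simply returns the next buffered message or `null`, and the keep-alive task is an empty placeholder for the status updates the real thread sends.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the replication stream: readPending() returns
// the next buffered message, or null when nothing is available.
final class FakeStream {
    private final Queue<String> pending = new ArrayDeque<>();

    FakeStream(List<String> messages) {
        pending.addAll(messages);
    }

    String readPending() {
        return pending.poll();
    }

    boolean isEmpty() {
        return pending.isEmpty();
    }
}

// Hypothetical model of the init / execute / close lifecycle.
final class LifecycleSketch {
    final List<String> processed = new ArrayList<>();
    int schemaRefreshes = 0;
    private String effectiveOffset;
    private ScheduledExecutorService keepAlive;

    // init: remember the offset and refresh the schema (modeled as a counter).
    void init(String offsetContext) {
        effectiveOffset = offsetContext;
        schemaRefreshes++;
    }

    // execute: start a keep-alive task, then poll until the context stops.
    void execute(BooleanSupplier isRunning, FakeStream stream) {
        keepAlive = Executors.newSingleThreadScheduledExecutor();
        // Placeholder for periodic status updates sent to the server.
        keepAlive.scheduleAtFixedRate(() -> { }, 0, 10, TimeUnit.SECONDS);
        while (isRunning.getAsBoolean()) {
            String message = stream.readPending();
            if (message == null) {
                // Nothing buffered: back off for 1 ms before polling again.
                LockSupport.parkNanos(1_000_000L);
                continue;
            }
            processed.add(message);
        }
    }

    // close: stop the keep-alive task (the real class also closes the connection).
    void close() {
        if (keepAlive != null) {
            keepAlive.shutdownNow();
        }
    }

    String effectiveOffset() {
        return effectiveOffset;
    }
}
```

Putting `close()` in a `finally` block around `execute()` guarantees the keep-alive thread is stopped even if the loop exits with an exception.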
Key features include:
- LSN flushing control: The lsnFlushingAllowed flag prevents premature LSN flushing during recovery, only enabling it after the first message is successfully received.
- Commit offset failure handling: The commitOffsetFailure flag triggers graceful termination when an asynchronous offset commit fails.
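- WAL backlog monitoring: A counter tracks consecutive events that were received but filtered out instead of being forwarded. Every GROWING_WAL_WARNING_LOG_INTERVAL (10,000) such events, a warning is logged: while nothing is forwarded, no offset can be committed, so the replication slot cannot release WAL and the backlog on the server keeps growing.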
- Connection probing: Periodic connection health checks via connectionProbeTimer to detect stale connections.
- Pause/resume support: The processing loop supports pausing for blocking snapshots and resuming afterward.
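The LSN-flush gating, commit-failure termination, and filtered-event warning counter described in this list can be modeled as one small state holder. This is a sketch under stated assumptions: `StreamingGuards` and all of its methods are hypothetical; only the names `lsnFlushingAllowed`, `commitOffsetFailure`, and the 10,000-event `GROWING_WAL_WARNING_LOG_INTERVAL` come from the description above. The real offset commit is asynchronous; here its outcome is passed in as a boolean.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for the guard flags described above.
final class StreamingGuards {
    static final int GROWING_WAL_WARNING_LOG_INTERVAL = 10_000;

    final List<String> warnings = new ArrayList<>();
    private volatile boolean lsnFlushingAllowed = false;
    private volatile boolean commitOffsetFailure = false;
    private long filteredSinceLastDispatchOrWarning = 0;

    // The first successfully received message enables LSN flushing.
    void onMessageReceived() {
        lsnFlushingAllowed = true;
    }

    // Count consecutive messages that were filtered out instead of dispatched.
    void onMessageHandled(boolean dispatched) {
        if (dispatched) {
            filteredSinceLastDispatchOrWarning = 0;
            return;
        }
        filteredSinceLastDispatchOrWarning++;
        if (filteredSinceLastDispatchOrWarning >= GROWING_WAL_WARNING_LOG_INTERVAL) {
            warnings.add("Received " + filteredSinceLastDispatchOrWarning
                    + " events that were all filtered out; WAL may be accumulating");
            filteredSinceLastDispatchOrWarning = 0;
        }
    }

    // Offset-commit callback. Returns true only if the LSN would be flushed.
    boolean commitOffset(boolean flushSucceeded) {
        if (!lsnFlushingAllowed) {
            return false; // still recovering: ignore the commit request
        }
        if (!flushSucceeded) {
            commitOffsetFailure = true; // the loop will stop gracefully
            return false;
        }
        return true;
    }

    // Checked by the processing loop on each iteration.
    boolean shouldTerminate() {
        return commitOffsetFailure;
    }
}
```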
Usage
This class is instantiated by the PostgreSQL CDC connector's ChangeEventSourceFactory and executed by the ChangeEventSourceCoordinator during the streaming phase of the CDC pipeline.
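While the coordinator runs the streaming phase, it can pause the processing loop (for example, for a blocking snapshot) and resume it afterward. The sketch below shows one way such a pause point can work with a Java monitor. It is hypothetical throughout: `PausableContext`, `waitWhilePaused`, and `PauseResumeDemo` are illustrative names, and Debezium's actual `ChangeEventSourceContext` methods may be named and structured differently.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical context; the real ChangeEventSourceContext API may differ.
final class PausableContext {
    final CountDownLatch streamingPaused = new CountDownLatch(1);
    private boolean running = true;
    private boolean paused = false;

    synchronized boolean isRunning() { return running; }
    synchronized boolean isPaused() { return paused; }
    synchronized void pause() { paused = true; }
    synchronized void resume() { paused = false; notifyAll(); }
    synchronized void stop() { running = false; paused = false; notifyAll(); }

    // Blocks the streaming thread until resume() or stop() is called.
    synchronized void waitWhilePaused() throws InterruptedException {
        while (paused && running) {
            wait();
        }
    }
}

final class PauseResumeDemo {
    // Returns {messages processed while paused, messages processed in total}.
    static int[] run() {
        PausableContext ctx = new PausableContext();
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ctx.pause(); // e.g. a blocking snapshot is requested before streaming

        Thread worker = new Thread(() -> {
            try {
                while (ctx.isRunning()) {
                    if (ctx.isPaused()) {
                        ctx.streamingPaused.countDown(); // acknowledge the pause
                        ctx.waitWhilePaused();
                        continue;
                    }
                    String message = stream.poll(10, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        processed.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            ctx.streamingPaused.await();            // worker has stopped reading
            stream.addAll(List.of("INSERT", "UPDATE", "DELETE"));
            int whilePaused = processed.get();      // worker is blocked: still 0
            ctx.resume();                           // snapshot finished
            while (processed.get() < 3) {
                Thread.sleep(1);
            }
            ctx.stop();
            worker.join();
            return new int[] {whilePaused, processed.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The latch lets the coordinator know the loop has actually stopped reading before it starts the snapshot, so no WAL events are consumed while the snapshot runs.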
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java (L65-757)
Signature
```java
// Signatures only; method bodies are omitted.
public class PostgresStreamingChangeEventSource
        implements StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> {

    public PostgresStreamingChangeEventSource(
            PostgresConnectorConfig connectorConfig,
            SnapshotterService snapshotterService,
            PostgresConnection connection,
            PostgresEventDispatcher<TableId> dispatcher,
            ErrorHandler errorHandler,
            Clock clock,
            PostgresSchema schema,
            PostgresTaskContext taskContext,
            ReplicationConnection replicationConnection);

    @Override
    public void init(PostgresOffsetContext offsetContext);

    @Override
    public void execute(ChangeEventSourceContext context,
                        PostgresPartition partition,
                        PostgresOffsetContext offsetContext) throws InterruptedException;

    @Override
    public void close();
}
```
Import
```java
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| connectorConfig | PostgresConnectorConfig | Yes | Configuration for the PostgreSQL CDC connector |
| connection | PostgresConnection | Yes | JDBC connection for schema queries and transaction commits |
| dispatcher | PostgresEventDispatcher<TableId> | Yes | Event dispatcher for forwarding change events and heartbeats |
| replicationConnection | ReplicationConnection | Yes | Logical replication connection for consuming WAL events |
| schema | PostgresSchema | Yes | In-memory schema representation for table resolution |
| offsetContext | PostgresOffsetContext | Yes (for init/execute) | The current offset state including LSN position |
Outputs
| Name | Type | Description |
|---|---|---|
| Change events | Dispatched via EventDispatcher | INSERT, UPDATE, DELETE events dispatched to the RisingWave CDC pipeline |
| Transaction events | Dispatched via EventDispatcher | BEGIN and COMMIT boundary events for transaction tracking |
| Heartbeat events | Dispatched via EventDispatcher | Periodic heartbeat events when no data messages are received |
| Offset updates | PostgresOffsetContext mutations | WAL position (LSN), commit LSN, and transaction metadata updated in the offset context |
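The offset updates above record WAL positions as LSNs. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, holding the high and low 32 bits of a 64-bit WAL position (for example `16/B374D848`). The hypothetical `LsnSketch` helper below converts between that text form and a `long`; in the connector this role is played by Debezium's `io.debezium.connector.postgresql.connection.Lsn` class, whose API is not reproduced here.

```java
// Hypothetical helper; the connector itself uses Debezium's Lsn class.
final class LsnSketch {
    private static final long LOW_32_BITS = 0xFFFFFFFFL;

    // Parses PostgreSQL's "X/Y" text form (high/low 32 bits, in hex).
    static long parse(String text) {
        int slash = text.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("expected X/Y, got " + text);
        }
        long high = Long.parseLong(text.substring(0, slash), 16);
        long low = Long.parseLong(text.substring(slash + 1), 16);
        if (high < 0 || high > LOW_32_BITS || low < 0 || low > LOW_32_BITS) {
            throw new IllegalArgumentException("LSN half out of range: " + text);
        }
        return (high << 32) | low;
    }

    // Formats a 64-bit position back into "X/Y" with upper-case hex digits.
    static String format(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & LOW_32_BITS);
    }
}
```

Because the high half occupies the upper 32 bits, parsed values compare in WAL order: `1/0` sorts after `0/FFFFFFFF`.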
Usage Examples
Streaming Lifecycle
```java
// Created by the connector's ChangeEventSourceFactory
PostgresStreamingChangeEventSource streamingSource =
        new PostgresStreamingChangeEventSource(
                connectorConfig, snapshotterService, connection,
                dispatcher, errorHandler, clock, schema,
                taskContext, replicationConnection);

// Initialize with the saved offset, then stream from that same offset
streamingSource.init(offsetContext);
try {
    // Blocks until the context stops running or an error occurs;
    // declared to throw InterruptedException
    streamingSource.execute(context, partition, offsetContext);
}
finally {
    // Always stop the keep-alive thread and close the replication connection
    streamingSource.close();
}
```
Message Processing Flow
```text
ReplicationStream
   |
   v
readPending() --> ReplicationMessage
   |
   +-- isTransactionalMessage? --> updateWalPosition + dispatch BEGIN/COMMIT
   |
   +-- isLogicalDecodingMessage? --> dispatch LogicalDecodingMessage
   |
   +-- isDML? --> resolve TableId --> updateWalPosition
                  --> dispatch DataChangeEvent via PostgresChangeRecordEmitter
```
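The flow above can be sketched as a single dispatch method. Everything here is hypothetical and simplified: `Op`, `Msg`, and `DispatchSketch` stand in for `ReplicationMessage`, the offset context, and the event dispatcher; dispatched events are recorded as strings; and `resolveTableId` assumes a default schema of `public` purely for illustration. Following the diagram, only the transactional and DML branches update the WAL position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified message model; the real ReplicationMessage differs.
enum Op { BEGIN, COMMIT, MESSAGE, INSERT, UPDATE, DELETE }

final class Msg {
    final Op op;
    final long lsn;
    final String table;

    Msg(Op op, long lsn, String table) {
        this.op = op;
        this.lsn = lsn;
        this.table = table;
    }
}

final class DispatchSketch {
    final List<String> dispatched = new ArrayList<>();
    long walPosition = -1; // last LSN recorded in the (modeled) offset
    long commitLsn = -1;   // LSN of the last COMMIT seen

    void process(Msg m) {
        switch (m.op) {
            case BEGIN:
                walPosition = m.lsn;
                dispatched.add("TX_BEGIN@" + m.lsn);
                break;
            case COMMIT:
                walPosition = m.lsn;
                commitLsn = m.lsn;
                dispatched.add("TX_COMMIT@" + m.lsn);
                break;
            case MESSAGE:
                // Diagram shows only dispatch for logical decoding messages.
                dispatched.add("LOGICAL_MESSAGE@" + m.lsn);
                break;
            default: {
                // INSERT / UPDATE / DELETE: resolve table, record position, emit.
                String tableId = resolveTableId(m.table);
                walPosition = m.lsn;
                dispatched.add(m.op + ":" + tableId + "@" + m.lsn);
            }
        }
    }

    // Hypothetical table resolution: qualify bare names with "public".
    private static String resolveTableId(String table) {
        return table.contains(".") ? table : "public." + table;
    }
}
```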