Implementation:Risingwavelabs Risingwave PostgresStreamingChangeEventSource

From Leeroopedia


Knowledge Sources
Domains CDC, Connectors, PostgreSQL, Java
Last Updated 2026-02-09 07:00 GMT

Overview

Patched Debezium streaming change event source for PostgreSQL that handles WAL-based logical replication event processing.

Description

PostgresStreamingChangeEventSource implements Debezium's StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> interface to drive PostgreSQL CDC event capture in RisingWave. It is the core streaming engine component that connects to a PostgreSQL replication slot, consumes WAL (Write-Ahead Log) events, and dispatches them as change events.

The class manages the following lifecycle:

  1. Initialization (init): Sets the effective offset context and refreshes the database schema via taskContext.refreshSchema().
  2. Execution (execute): Establishes a replication stream starting from the stored LSN (Log Sequence Number) position. It starts a keep-alive thread to prevent connection timeouts, optionally performs a WAL position search for catch-up streaming, and then enters the main message processing loop.
  3. Message Processing (processMessages): Continuously reads from the replication stream using stream.readPending(). Each received message is processed via processReplicationMessages(), which handles three categories:
    • Transactional messages (BEGIN/COMMIT): Updates WAL position and dispatches transaction boundary events.
    • Logical decoding messages: Dispatches custom logical decoding messages.
    • DML events (INSERT/UPDATE/DELETE): Resolves the table ID from the message, updates the WAL position, and dispatches data change events via PostgresChangeRecordEmitter.
  4. Cleanup (close): Stops the keep-alive thread and closes the replication connection.

Key features include:

  • LSN flushing control: The lsnFlushingAllowed flag prevents premature LSN flushing during recovery, only enabling it after the first message is successfully received.
  • Commit offset failure handling: The commitOffsetFailure flag triggers graceful termination when an asynchronous offset commit fails.
  • WAL backlog monitoring: A counter tracks events received without forwarding, issuing warnings at GROWING_WAL_WARNING_LOG_INTERVAL (10,000 events) to alert about potential WAL growth.
  • Connection probing: Periodic health checks, driven by connectionProbeTimer, detect stale connections.
  • Pause/resume support: The processing loop supports pausing for blocking snapshots and resuming afterward.
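The first and third features above can be illustrated with a minimal sketch. It is not the RisingWave or Debezium source: the class StreamingGuards, its method names, and the choice to reset the counter after each warning are assumptions made for illustration. Only the lsnFlushingAllowed flag name and the GROWING_WAL_WARNING_LOG_INTERVAL value of 10,000 come from the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration only; not the actual connector class.
class StreamingGuards {
    // Interval value taken from the description above (10,000 events).
    static final int GROWING_WAL_WARNING_LOG_INTERVAL = 10_000;

    // Stays false during recovery until the first message has been received.
    private boolean lsnFlushingAllowed = false;
    // Number of events received since the last forwarded event or warning.
    private long eventsSinceLastForward = 0;
    // Last LSN that was accepted for flushing, or -1 if none yet.
    long lastFlushedLsn = -1;
    // Collected warnings (a real implementation would use a logger).
    final List<String> warnings = new ArrayList<>();

    // Called for every message read from the replication stream.
    void onMessageReceived(boolean forwarded) {
        lsnFlushingAllowed = true; // first successful receive opens the gate
        if (forwarded) {
            eventsSinceLastForward = 0;
            return;
        }
        eventsSinceLastForward++;
        if (eventsSinceLastForward >= GROWING_WAL_WARNING_LOG_INTERVAL) {
            warnings.add("Received " + eventsSinceLastForward
                    + " events without forwarding any; WAL may be growing");
            eventsSinceLastForward = 0; // assumption: reset after warning
        }
    }

    // Accepts the flush only once flushing has been allowed.
    boolean tryFlush(long lsn) {
        if (!lsnFlushingAllowed) {
            return false;
        }
        lastFlushedLsn = lsn;
        return true;
    }
}
```

In this sketch a flush request issued before any message arrives is refused, which mirrors the intent of preventing premature LSN flushing during recovery.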

Usage

This class is instantiated by the PostgreSQL CDC connector's ChangeEventSourceFactory and executed by the ChangeEventSourceCoordinator during the streaming phase of the CDC pipeline.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java (L65-757)

Signature

public class PostgresStreamingChangeEventSource
        implements StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> {

    public PostgresStreamingChangeEventSource(
            PostgresConnectorConfig connectorConfig,
            SnapshotterService snapshotterService,
            PostgresConnection connection,
            PostgresEventDispatcher<TableId> dispatcher,
            ErrorHandler errorHandler,
            Clock clock,
            PostgresSchema schema,
            PostgresTaskContext taskContext,
            ReplicationConnection replicationConnection);

    @Override
    public void init(PostgresOffsetContext offsetContext);

    @Override
    public void execute(ChangeEventSourceContext context,
            PostgresPartition partition,
            PostgresOffsetContext offsetContext) throws InterruptedException;

    @Override
    public void close();
}

Import

import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;

I/O Contract

Inputs

Name | Type | Required | Description
--- | --- | --- | ---
connectorConfig | PostgresConnectorConfig | Yes | Configuration for the PostgreSQL CDC connector
connection | PostgresConnection | Yes | JDBC connection for schema queries and transaction commits
dispatcher | PostgresEventDispatcher<TableId> | Yes | Event dispatcher for forwarding change events and heartbeats
replicationConnection | ReplicationConnection | Yes | Logical replication connection for consuming WAL events
schema | PostgresSchema | Yes | In-memory schema representation for table resolution
offsetContext | PostgresOffsetContext | Yes (for init/execute) | The current offset state, including the LSN position

Outputs

Name | Type | Description
--- | --- | ---
Change events | Dispatched via EventDispatcher | INSERT, UPDATE, DELETE events dispatched to the RisingWave CDC pipeline
Transaction events | Dispatched via EventDispatcher | BEGIN and COMMIT boundary events for transaction tracking
Heartbeat events | Dispatched via EventDispatcher | Periodic heartbeat events when no data messages are received
Offset updates | PostgresOffsetContext mutations | WAL position (LSN), commit LSN, and transaction metadata updated in the offset context

Usage Examples

Streaming Lifecycle

// Created by the ChangeEventSourceFactory
PostgresStreamingChangeEventSource streamingSource =
        new PostgresStreamingChangeEventSource(
                connectorConfig, snapshotterService, connection,
                dispatcher, errorHandler, clock, schema,
                taskContext, replicationConnection);

// Initialize with saved offset
streamingSource.init(savedOffsetContext);

// Execute streaming (blocks until stopped or error)
streamingSource.execute(context, partition, offsetContext);

// Cleanup
streamingSource.close();

Message Processing Flow

ReplicationStream
    |
    v
readPending() --> ReplicationMessage
    |
    +-- isTransactionalMessage? --> updateWalPosition + dispatch BEGIN/COMMIT
    |
    +-- isLogicalDecodingMessage? --> dispatch LogicalDecodingMessage
    |
    +-- isDML? --> resolve TableId --> updateWalPosition
                    --> dispatch DataChangeEvent via PostgresChangeRecordEmitter
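
The flow above can be sketched as a small routing routine. This is an illustrative stand-in under stated assumptions, not the connector's code: the class MessageRouterSketch, its Operation enum, and the string labels it records are hypothetical, and the real implementation dispatches through PostgresEventDispatcher and PostgresChangeRecordEmitter rather than collecting strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three-way routing shown in the diagram.
class MessageRouterSketch {
    // Assumed operation kinds; the real message type lives in Debezium.
    enum Operation { BEGIN, COMMIT, MESSAGE, INSERT, UPDATE, DELETE }

    // Records what would be dispatched, in order.
    final List<String> dispatched = new ArrayList<>();
    // Last WAL position written to the (modelled) offset context.
    long walPosition = -1;

    void process(Operation op, long lsn, String table) {
        switch (op) {
            case BEGIN:
            case COMMIT:
                // Transactional message: update WAL position, emit boundary.
                walPosition = lsn;
                dispatched.add("TX:" + op);
                break;
            case MESSAGE:
                // Logical decoding message: dispatched as in the diagram.
                dispatched.add("LOGICAL_MESSAGE");
                break;
            default:
                // DML: resolve the table, update WAL position, emit change.
                walPosition = lsn;
                dispatched.add("DML:" + op + ":" + table);
                break;
        }
    }
}
```

Keeping the branch order the same as the diagram makes it easy to see that transaction boundaries and DML events both advance the WAL position before anything is dispatched.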
