

Implementation:Risingwavelabs Risingwave ChangeEventSourceCoordinator

From Leeroopedia


Knowledge Sources
Domains CDC, Connectors, Software_Architecture, Java
Last Updated 2026-02-09 07:00 GMT

Overview

Patched Debezium ChangeEventSourceCoordinator that orchestrates the lifecycle of snapshot and streaming phases for CDC sources.

Description

ChangeEventSourceCoordinator is the core orchestration component controlling the CDC pipeline execution lifecycle in RisingWave. It is a RisingWave-patched override of the upstream Debezium class, annotated @ThreadSafe, that coordinates the sequencing of initial snapshot, streaming, and incremental snapshot phases while managing threads and error handling.

The coordinator manages the following components:

  • ChangeEventSourceFactory: Creates snapshot and streaming change event sources.
  • EventDispatcher: Routes change events to the downstream pipeline.
  • DatabaseSchema: The in-memory schema representation (may be historized).
  • SignalProcessor: Handles signal-based actions like blocking snapshots.
  • NotificationService: Manages lifecycle notifications.
  • ErrorHandler: Propagates errors from background threads.

Startup (start): Initializes the snapshot and streaming metrics, then submits the main execution to a single-threaded executor. The background thread first recovers the schema history if the schema is historized, then calls executeChangeEventSources(), which runs the snapshot phase followed by the streaming phase.
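The startup ordering can be illustrated with a minimal sketch. It is not the RisingWave class: StartupSketch, its step names, and awaitSteps() are hypothetical stand-ins that only record the order in which the phases would run on a single background thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator's startup sequence (assumption, not the real class).
class StartupSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> steps = Collections.synchronizedList(new ArrayList<>());
    private final boolean historized;

    StartupSketch(boolean historized) {
        this.historized = historized;
    }

    // Returns immediately; the phases themselves run on the single background thread.
    synchronized void start() {
        steps.add("init-metrics");
        executor.submit(() -> {
            if (historized) {
                steps.add("recover-schema"); // only when the schema keeps a history
            }
            steps.add("snapshot");
            steps.add("streaming");
        });
    }

    // Waits for the background work to finish and returns the recorded order.
    List<String> awaitSteps() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(steps);
    }
}
```

Because the executor has exactly one worker thread, the snapshot step always precedes the streaming step without any extra synchronization between them.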

Snapshot Phase (doSnapshot): Executes optional catch-up streaming, then runs the snapshot via snapshotSource.execute(). If the snapshot completes successfully and the schema has table information, it ensures the schema is non-empty.

Streaming Phase (streamEvents): Initializes the streaming source via initStreamEvents() (which creates the streaming source, registers metrics, initializes incremental snapshot support, and emits an initial heartbeat), then registers signal action providers and starts streaming via streamingSource.execute().

Blocking Snapshot (doBlockingSnapshot): Runs on a separate executor. Using the lock-based coordination provided by ChangeEventSourceContextImpl, it pauses streaming, executes the snapshot, and then resumes streaming.

Offset Commit (commitOffset): Thread-safe forwarding of offset commits to the streaming source, protected by a ReentrantLock.
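One way to forward commits under a ReentrantLock is sketched below. This is an assumption about the general pattern, not the patched class: CommitForwarderSketch, its nested StreamingSource interface, and the boolean return value are hypothetical, and the real coordinator may use its lock differently (for example, only checking whether it is currently held).

```java
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of lock-protected commit forwarding (assumption).
class CommitForwarderSketch {
    // Minimal stand-in for the streaming source's commit hook.
    interface StreamingSource {
        void commitOffset(Map<String, ?> partition, Map<String, ?> offset);
    }

    private final ReentrantLock commitOffsetLock = new ReentrantLock();
    private volatile StreamingSource streamingSource; // null until streaming has started

    void setStreamingSource(StreamingSource source) {
        this.streamingSource = source;
    }

    // Returns true if the commit was forwarded, false if it was skipped.
    boolean commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
        StreamingSource source = streamingSource; // read the volatile field once
        if (source == null || offset == null) {
            return false; // nothing to commit to, or nothing to commit
        }
        commitOffsetLock.lock();
        try {
            source.commitOffset(partition, offset);
            return true;
        } finally {
            commitOffsetLock.unlock();
        }
    }
}
```

Skipping the call while no streaming source exists means a checkpoint that arrives during the snapshot phase is simply ignored in this sketch.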

Shutdown (stop): Sets the running flag to false, gracefully shuts down both executors with a configurable timeout, stops the signal processor and notification service, closes the event dispatcher, and unregisters metrics.
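The graceful-shutdown-with-timeout step follows a common java.util.concurrent pattern, sketched here under assumptions: ShutdownSketch and shutdownGracefully are hypothetical names, and the timeout is passed in directly rather than read from the connector configuration.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing graceful executor shutdown with a forced fallback (assumption).
class ShutdownSketch {
    // Returns true if the executor terminated within the timeout, false if it had to be forced.
    static boolean shutdownGracefully(ExecutorService executor, Duration timeout) {
        executor.shutdown(); // stop accepting new tasks, let running ones finish
        try {
            if (executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                return true;
            }
            executor.shutdownNow(); // interrupt whatever is still running
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

In a coordinator with two executors, the same helper would be applied to each in turn before the remaining services are stopped.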

The inner class ChangeEventSourceContextImpl implements ChangeEventSourceContext with Lock/Condition pairs for coordinating pause/resume between the streaming thread and blocking snapshot thread.
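The pause/resume handshake between the two threads can be sketched with one lock and two conditions. This is a simplified illustration, not the actual ChangeEventSourceContextImpl: the class, method, and field names are hypothetical, and runDemo() exists only to exercise the handshake.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of Lock/Condition pause-resume coordination (assumption).
class PauseResumeSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition streamingParked = lock.newCondition();
    private final Condition snapshotFinished = lock.newCondition();
    private boolean pauseRequested;
    private boolean parked;

    // Snapshot thread: request a pause and wait until the streaming thread acknowledges it.
    void pauseStreamingAndWait() throws InterruptedException {
        lock.lock();
        try {
            pauseRequested = true;
            while (!parked) {
                streamingParked.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Snapshot thread: release the streaming thread once the snapshot is done.
    void resumeStreaming() {
        lock.lock();
        try {
            pauseRequested = false;
            parked = false;
            snapshotFinished.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Streaming thread: called between events; blocks while a pause is in effect.
    void parkIfPauseRequested() throws InterruptedException {
        lock.lock();
        try {
            if (!pauseRequested) {
                return;
            }
            parked = true;
            streamingParked.signalAll();
            while (pauseRequested) {
                snapshotFinished.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Runs one streaming thread and one blocking snapshot; returns the ordered event log.
    static List<String> runDemo() {
        PauseResumeSketch ctx = new PauseResumeSketch();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean running = new AtomicBoolean(true);
        Thread streaming = new Thread(() -> {
            try {
                while (running.get()) {
                    ctx.parkIfPauseRequested();
                    log.add("stream");
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        streaming.start();
        try {
            ctx.pauseStreamingAndWait();
            log.add("snapshot-start");
            Thread.sleep(20); // stands in for the snapshot work
            log.add("snapshot-end");
            ctx.resumeStreaming();
            running.set(false);
            streaming.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }
}
```

Both waits sit inside while loops that re-check their flag, which guards against spurious wakeups. In the demo's log, no "stream" entry can fall between "snapshot-start" and "snapshot-end", because the streaming thread is parked for that whole interval.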

Usage

This coordinator is instantiated by the Debezium connector task and is the entry point for the entire CDC pipeline. It is used for both MySQL and PostgreSQL CDC sources in RisingWave.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java (L82-615)

Signature

@ThreadSafe
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {

    public ChangeEventSourceCoordinator(
            Offsets<P, O> previousOffsets,
            ErrorHandler errorHandler,
            Class<? extends SourceConnector> connectorType,
            CommonConnectorConfig connectorConfig,
            ChangeEventSourceFactory<P, O> changeEventSourceFactory,
            ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory,
            EventDispatcher<P, ?> eventDispatcher,
            DatabaseSchema<?> schema,
            SignalProcessor<P, O> signalProcessor,
            NotificationService<P, O> notificationService,
            SnapshotterService snapshotterService);

    public synchronized void start(CdcSourceTaskContext taskContext,
            ChangeEventQueueMetrics changeEventQueueMetrics,
            EventMetadataProvider metadataProvider);

    protected void executeChangeEventSources(
            CdcSourceTaskContext taskContext,
            SnapshotChangeEventSource<P, O> snapshotSource,
            Offsets<P, O> previousOffsets,
            AtomicReference<LoggingContext.PreviousContext> previousLogContext,
            ChangeEventSourceContext context) throws InterruptedException;

    protected void streamEvents(ChangeEventSourceContext context,
            P partition, O offsetContext) throws InterruptedException;

    protected void initStreamEvents(P partition, O offsetContext)
            throws InterruptedException;

    public void commitOffset(Map<String, ?> partition, Map<String, ?> offset);

    public synchronized void stop() throws InterruptedException;

    public void doBlockingSnapshot(P partition, OffsetContext offsetContext,
            SnapshotConfiguration snapshotConfiguration);
}

Import

import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DatabaseSchema;

I/O Contract

Inputs

Name | Type | Required | Description
previousOffsets | Offsets<P, O> | Yes | Previously stored offsets for resuming from a checkpoint
errorHandler | ErrorHandler | Yes | Handler for propagating errors from background threads
connectorConfig | CommonConnectorConfig | Yes | Connector configuration including shutdown timeout and streaming delay
changeEventSourceFactory | ChangeEventSourceFactory<P, O> | Yes | Factory for creating snapshot and streaming sources
eventDispatcher | EventDispatcher<P, ?> | Yes | Dispatcher for routing change events
schema | DatabaseSchema<?> | Yes | In-memory database schema representation
signalProcessor | SignalProcessor<P, O> | No | Optional signal processor for handling actions like blocking snapshots
taskContext | CdcSourceTaskContext | Yes (for start) | Task context providing logging configuration and metrics

Outputs

Name | Type | Description
Change events | Via EventDispatcher | Snapshot and streaming change events dispatched to the downstream pipeline
Metrics | SnapshotChangeEventSourceMetrics, StreamingChangeEventSourceMetrics | Registered JMX metrics for monitoring snapshot and streaming phases
Heartbeat | Via EventDispatcher | Initial heartbeat emitted when streaming is established
Side effect | Thread management | Background executor threads for snapshot, streaming, and blocking snapshot execution

Usage Examples

Coordinator Lifecycle

// Create the coordinator
ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> coordinator =
        new ChangeEventSourceCoordinator<>(
                previousOffsets, errorHandler, PostgresConnector.class,
                connectorConfig, changeEventSourceFactory,
                metricsFactory, eventDispatcher, schema,
                signalProcessor, notificationService, snapshotterService);

// Start the CDC pipeline (non-blocking, runs on background thread)
coordinator.start(taskContext, queueMetrics, metadataProvider);

// Commit offsets during checkpointing
coordinator.commitOffset(partitionMap, offsetMap);

// Stop the pipeline gracefully
coordinator.stop();

Execution Flow

start()
  |
  +--> [Background Thread]
       |
       +--> recover schema history (if historized)
       |
       +--> executeChangeEventSources()
            |
            +--> doSnapshot()
            |    |
            |    +--> executeCatchUpStreaming() (optional)
            |    +--> snapshotSource.execute()
            |
            +--> delayStreamingIfNeeded()
            |
            +--> streamEvents()
                 |
                 +--> initStreamEvents()
                 |    +--> create streaming source
                 |    +--> register metrics
                 |    +--> init incremental snapshot
                 |    +--> emit initial heartbeat
                 |
                 +--> registerSignalActionsAndStartProcessor()
                 |
                 +--> streamingSource.execute() [blocks until stopped]
