Implementation:Risingwavelabs Risingwave ChangeEventSourceCoordinator
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, Software_Architecture, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
A RisingWave-patched copy of Debezium's ChangeEventSourceCoordinator, which orchestrates the lifecycle of the snapshot and streaming phases for CDC sources.
Description
ChangeEventSourceCoordinator is the core orchestration component controlling the CDC pipeline execution lifecycle in RisingWave. It is a RisingWave-patched override of the upstream Debezium class, annotated @ThreadSafe, that coordinates the sequencing of initial snapshot, streaming, and incremental snapshot phases while managing threads and error handling.
The coordinator manages the following components:
- ChangeEventSourceFactory: Creates snapshot and streaming change event sources.
- EventDispatcher: Routes change events to the downstream pipeline.
- DatabaseSchema: The in-memory schema representation (may be historized).
- SignalProcessor: Handles signal-based actions like blocking snapshots.
- NotificationService: Manages lifecycle notifications.
- ErrorHandler: Propagates errors from background threads.
Startup (start): Initializes metrics (snapshot and streaming), then submits the main execution to a single-threaded executor. The background thread first recovers the historized schema if it exists, then calls executeChangeEventSources() which runs the snapshot phase followed by the streaming phase.
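The startup behavior described above can be sketched with a plain single-threaded executor. This is a minimal illustration, not the patched class: `StartupSketch`, `awaitSteps`, and the recorded step names are hypothetical, and the real phases are replaced by strings so the ordering is visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the start() sequence; not the RisingWave class.
class StartupSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> steps = Collections.synchronizedList(new ArrayList<>());
    private final boolean historized;

    StartupSketch(boolean historized) {
        this.historized = historized;
    }

    // Registers metrics on the caller's thread, then returns immediately;
    // the remaining steps run in order on the single background thread.
    synchronized void start() {
        steps.add("register metrics");
        executor.submit(() -> {
            if (historized) {
                steps.add("recover schema history");
            }
            steps.add("snapshot phase");
            steps.add("streaming phase");
        });
    }

    // Waits for the background work to finish and returns the recorded order.
    List<String> awaitSteps() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        synchronized (steps) {
            return new ArrayList<>(steps);
        }
    }
}
```

Because the executor has exactly one worker thread, the snapshot phase cannot overlap with the streaming phase that follows it, which is the property the coordinator relies on.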
Snapshot Phase (doSnapshot): Executes optional catch-up streaming, then runs the snapshot via snapshotSource.execute(). If the snapshot completes successfully and the schema has table information, it ensures the schema is non-empty.
Streaming Phase (streamEvents): Initializes the streaming source via initStreamEvents() (which creates the streaming source, registers metrics, initializes incremental snapshot support, and emits an initial heartbeat), then registers signal action providers and starts streaming via streamingSource.execute().
Blocking Snapshot (doBlockingSnapshot): Runs on a separate executor. Using lock-based coordination (ChangeEventSourceContextImpl), it pauses streaming, executes the snapshot, and then resumes streaming.
Offset Commit (commitOffset): Thread-safe forwarding of offset commits to the streaming source, protected by a ReentrantLock.
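A rough sketch of lock-protected offset forwarding follows. It assumes the lock is held around the forward and that the streaming source is absent until streaming starts; the patched class may guard the call differently (for example by only testing the lock state). `OffsetCommitSketch`, its nested `StreamingSource` interface, and the boolean return value are hypothetical and exist only for illustration.

```java
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the commitOffset() path; not the RisingWave class.
class OffsetCommitSketch {
    // Minimal illustrative interface, not Debezium's StreamingChangeEventSource.
    interface StreamingSource {
        void commitOffset(Map<String, ?> partition, Map<String, ?> offset);
    }

    private final ReentrantLock commitOffsetLock = new ReentrantLock();
    private StreamingSource streamingSource; // null until streaming starts

    // Swaps the streaming source under the same lock used for commits.
    void setStreamingSource(StreamingSource source) {
        commitOffsetLock.lock();
        try {
            streamingSource = source;
        } finally {
            commitOffsetLock.unlock();
        }
    }

    // Forwards the commit if a streaming source exists; returns whether it was forwarded.
    boolean commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
        if (offset == null) {
            return false;
        }
        commitOffsetLock.lock();
        try {
            if (streamingSource == null) {
                return false;
            }
            streamingSource.commitOffset(partition, offset);
            return true;
        } finally {
            commitOffsetLock.unlock();
        }
    }
}
```

Guarding both the reference swap and the forward with one lock means a commit never reaches a streaming source that is being replaced at that moment.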
Shutdown (stop): Sets the running flag to false, gracefully shuts down both executors with a configurable timeout, stops the signal processor and notification service, closes the event dispatcher, and unregisters metrics.
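The executor part of this shutdown sequence can be sketched as follows. `ShutdownSketch`, its simulated streaming loop, and `isTerminated` are hypothetical; stopping the signal processor and notification service, closing the dispatcher, and unregistering metrics are left out.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the stop() sequence; not the RisingWave class.
class ShutdownSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final ExecutorService blockingSnapshotExecutor = Executors.newSingleThreadExecutor();
    private final Duration shutdownTimeout;
    private volatile boolean running = true;

    ShutdownSketch(Duration shutdownTimeout) {
        this.shutdownTimeout = shutdownTimeout;
        // Simulated streaming loop that exits once the running flag is cleared.
        executor.submit(() -> {
            while (running && !Thread.currentThread().isInterrupted()) {
                Thread.yield();
            }
        });
    }

    synchronized void stop() throws InterruptedException {
        running = false; // cooperative signal for the background loop
        shutdown(executor);
        shutdown(blockingSnapshotExecutor);
        // Remaining cleanup (signal processor, notification service,
        // event dispatcher, metrics) is omitted in this sketch.
    }

    private void shutdown(ExecutorService service) throws InterruptedException {
        service.shutdown(); // stop accepting work, let running tasks finish
        if (!service.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            service.shutdownNow(); // interrupt tasks that did not finish in time
            service.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    boolean isTerminated() {
        return executor.isTerminated() && blockingSnapshotExecutor.isTerminated();
    }
}
```

Clearing the flag first gives the streaming loop a chance to exit on its own; `shutdownNow()` is only the fallback when the timeout elapses.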
The inner class ChangeEventSourceContextImpl implements ChangeEventSourceContext with Lock/Condition pairs for coordinating pause/resume between the streaming thread and blocking snapshot thread.
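The pause/resume handshake between the streaming thread and the blocking snapshot thread can be illustrated with one lock and two conditions. This is a sketch under assumptions: `PauseGate` and its method names are hypothetical and do not mirror the methods of ChangeEventSourceContextImpl.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical pause/resume gate; not the RisingWave or Debezium class.
class PauseGate {
    private final Lock lock = new ReentrantLock();
    private final Condition pausedCond = lock.newCondition();  // streaming has parked
    private final Condition resumedCond = lock.newCondition(); // snapshot has finished
    private boolean pauseRequested = false;
    private boolean paused = false;

    // Snapshot thread: request a pause and wait until streaming has parked.
    boolean requestPauseAndAwait(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            pauseRequested = true;
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (!paused) {
                if (nanos <= 0) {
                    pauseRequested = false; // give up and let streaming continue
                    return false;
                }
                nanos = pausedCond.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Streaming thread: call between events; parks while a pause is requested.
    void checkpoint() throws InterruptedException {
        lock.lock();
        try {
            if (!pauseRequested) {
                return;
            }
            paused = true;
            pausedCond.signalAll();
            while (pauseRequested) {
                resumedCond.await();
            }
            paused = false;
        } finally {
            lock.unlock();
        }
    }

    // Snapshot thread: release the streaming thread.
    void resume() {
        lock.lock();
        try {
            pauseRequested = false;
            resumedCond.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch the snapshot thread would call `requestPauseAndAwait`, run the snapshot, and then call `resume`, while the streaming loop calls `checkpoint` between events. Both waits are wrapped in `while` loops so that spurious wakeups do not break the handshake.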
Usage
This coordinator is instantiated by the Debezium connector task and is the entry point for the entire CDC pipeline. It is used for both MySQL and PostgreSQL CDC sources in RisingWave.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java (L82-615)
Signature
@ThreadSafe
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {

    public ChangeEventSourceCoordinator(
            Offsets<P, O> previousOffsets,
            ErrorHandler errorHandler,
            Class<? extends SourceConnector> connectorType,
            CommonConnectorConfig connectorConfig,
            ChangeEventSourceFactory<P, O> changeEventSourceFactory,
            ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory,
            EventDispatcher<P, ?> eventDispatcher,
            DatabaseSchema<?> schema,
            SignalProcessor<P, O> signalProcessor,
            NotificationService<P, O> notificationService,
            SnapshotterService snapshotterService);

    public synchronized void start(CdcSourceTaskContext taskContext,
                                   ChangeEventQueueMetrics changeEventQueueMetrics,
                                   EventMetadataProvider metadataProvider);

    protected void executeChangeEventSources(
            CdcSourceTaskContext taskContext,
            SnapshotChangeEventSource<P, O> snapshotSource,
            Offsets<P, O> previousOffsets,
            AtomicReference<LoggingContext.PreviousContext> previousLogContext,
            ChangeEventSourceContext context) throws InterruptedException;

    protected void streamEvents(ChangeEventSourceContext context,
                                P partition, O offsetContext) throws InterruptedException;

    protected void initStreamEvents(P partition, O offsetContext)
            throws InterruptedException;

    public void commitOffset(Map<String, ?> partition, Map<String, ?> offset);

    public synchronized void stop() throws InterruptedException;

    public void doBlockingSnapshot(P partition, OffsetContext offsetContext,
                                   SnapshotConfiguration snapshotConfiguration);
}
Import
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DatabaseSchema;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| previousOffsets | Offsets<P, O> | Yes | Previously stored offsets for resuming from a checkpoint |
| errorHandler | ErrorHandler | Yes | Handler for propagating errors from background threads |
| connectorConfig | CommonConnectorConfig | Yes | Connector configuration including shutdown timeout and streaming delay |
| changeEventSourceFactory | ChangeEventSourceFactory<P, O> | Yes | Factory for creating snapshot and streaming sources |
| eventDispatcher | EventDispatcher<P, ?> | Yes | Dispatcher for routing change events |
| schema | DatabaseSchema<?> | Yes | In-memory database schema representation |
| signalProcessor | SignalProcessor<P, O> | No | Optional signal processor for handling actions like blocking snapshots |
| taskContext | CdcSourceTaskContext | Yes (for start) | Task context providing logging configuration and metrics |
Outputs
| Name | Type | Description |
|---|---|---|
| Change events | Via EventDispatcher | Snapshot and streaming change events dispatched to the downstream pipeline |
| Metrics | SnapshotChangeEventSourceMetrics, StreamingChangeEventSourceMetrics | Registered JMX metrics for monitoring snapshot and streaming phases |
| Heartbeat | Via EventDispatcher | Initial heartbeat emitted when streaming is established |
| Side effect | Thread management | Background executor threads for snapshot, streaming, and blocking snapshot execution |
Usage Examples
Coordinator Lifecycle
// Create the coordinator
ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> coordinator =
        new ChangeEventSourceCoordinator<>(
                previousOffsets, errorHandler, PostgresConnector.class,
                connectorConfig, changeEventSourceFactory,
                metricsFactory, eventDispatcher, schema,
                signalProcessor, notificationService, snapshotterService);

// Start the CDC pipeline (non-blocking, runs on background thread)
coordinator.start(taskContext, queueMetrics, metadataProvider);

// Commit offsets during checkpointing
coordinator.commitOffset(partitionMap, offsetMap);

// Stop the pipeline gracefully
coordinator.stop();
Execution Flow
start()
  |
  +--> [Background Thread]
         |
         +--> recover schema history (if historized)
         |
         +--> executeChangeEventSources()
                |
                +--> doSnapshot()
                |      |
                |      +--> executeCatchUpStreaming() (optional)
                |      +--> snapshotSource.execute()
                |
                +--> delayStreamingIfNeeded()
                |
                +--> streamEvents()
                       |
                       +--> initStreamEvents()
                       |      +--> create streaming source
                       |      +--> register metrics
                       |      +--> init incremental snapshot
                       |      +--> emit initial heartbeat
                       |
                       +--> registerSignalActionsAndStartProcessor()
                       |
                       +--> streamingSource.execute() [blocks until stopped]