Implementation:Risingwavelabs Risingwave DbzCdcEngineRunner Start
| Knowledge Sources | |
|---|---|
| Domains | CDC, Data_Consistency |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
A concrete tool, provided by the RisingWave connector node, for managing the Debezium CDC engine lifecycle, including snapshot execution and the transition to streaming.
Description
DbzCdcEngineRunner manages the lifecycle of a Debezium embedded engine instance. The start() method launches the engine in a dedicated thread, which first executes the initial snapshot (if configured) and then transitions to continuous streaming from the transaction log. The create() factory method builds the engine from the supplied configuration and wires in the offset backing store and the change event consumer.
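The lifecycle described above can be illustrated with a minimal, self-contained sketch. It is not the RisingWave implementation: `EngineLifecycleSketch`, `runDemo`, `stop`, `lastError`, and the fake engine are hypothetical names introduced for illustration, and a plain `Runnable` stands in for the Debezium embedded engine. It only shows the general pattern of submitting an engine to a dedicated thread and reporting whether the submission was accepted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DbzCdcEngineRunner; not the RisingWave implementation.
class EngineLifecycleSketch {
    // One dedicated thread runs the engine for its whole lifetime.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Runnable engine;
    private volatile Throwable lastError;

    EngineLifecycleSketch(Runnable engine) {
        this.engine = engine;
    }

    // Submits the engine to the executor; true means the task was accepted.
    boolean start() {
        try {
            executor.execute(() -> {
                try {
                    engine.run();
                } catch (RuntimeException e) {
                    lastError = e; // keep the failure for the caller to inspect
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            return false; // executor was already shut down
        }
    }

    // Stops accepting work and waits briefly for the engine thread to finish.
    void stop() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    Throwable lastError() {
        return lastError;
    }

    static List<String> runDemo() {
        List<String> phases = Collections.synchronizedList(new ArrayList<>());
        // Fake engine: snapshot first, then streaming (a real engine would keep streaming).
        Runnable fakeEngine = () -> {
            phases.add("snapshot");
            phases.add("streaming");
        };
        EngineLifecycleSketch runner = new EngineLifecycleSketch(fakeEngine);
        boolean started = runner.start();
        runner.stop();
        phases.add("started=" + started);
        return phases;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In this sketch the boolean only reflects that the task was handed to the executor; whether the actual start() waits for the engine to report a running state is not stated here, so treat that part as an assumption.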
Usage
JniDbzSourceHandler creates and starts the runner when processing a CDC source creation request. The runner manages the engine's thread pool and error handling.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
- Lines: L74-157 (create: L74-118, start: L133-157)
Signature
public class DbzCdcEngineRunner {
    public static DbzCdcEngineRunner create(
            DbzConnectorConfig config,
            long channelPtr,
            boolean isCdcSourceJob
    ) {
        // Creates DebeziumEngine with config
        // Sets up DbzChangeEventConsumer
        // Configures offset backing store
    }
    public boolean start() {
        // Submits engine to thread pool
        // Returns true if started successfully
    }
}
Import
import com.risingwave.connector.source.core.DbzCdcEngineRunner;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | DbzConnectorConfig | Yes | Debezium connector configuration (source type, connection, snapshot mode) |
| channelPtr | long | Yes | JNI channel pointer for event delivery |
| isCdcSourceJob | boolean | Yes | Whether this is a shared CDC source job |
Outputs
| Name | Type | Description |
|---|---|---|
| return value | boolean | true if engine started successfully |
| CDC event stream | Side effect | Continuous change events via DbzChangeEventConsumer.handleBatch() |
Usage Examples
Engine Lifecycle (Internal)
// Called internally by JniDbzSourceHandler
DbzConnectorConfig config = new DbzConnectorConfig(
    SourceTypeE.MYSQL, // source type
    sourceId,          // unique source ID
    startOffset,       // resume position
    userProps,         // CDC properties
    snapshotDone,      // skip snapshot if true
    isCdcSourceJob     // shared source mode
);
DbzCdcEngineRunner runner = DbzCdcEngineRunner.create(config, channelPtr, isCdcSourceJob);
boolean started = runner.start();