Implementation:Risingwavelabs Risingwave DbzChangeEventConsumer
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, Debezium, gRPC |
| Language | Java |
| Lines | 378 |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Consumes Debezium change events and converts them into RisingWave GetEventStreamResponse protobuf messages, handling DATA, HEARTBEAT, TRANSACTION, and SCHEMA_CHANGE event types.
Description
DbzChangeEventConsumer implements Debezium's ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> interface and serves as the critical bridge between Debezium's internal change event format and RisingWave's CDC protobuf event stream protocol.
The class performs the following operations:
- Event classification -- Each incoming SourceRecord is classified into one of four EventType values based on its topic:
  - HEARTBEAT -- Topic starts with the configured heartbeat topic prefix.
  - TRANSACTION -- Topic matches the transaction topic name.
  - SCHEMA_CHANGE -- Topic matches the schema change topic name.
  - DATA -- All other events (actual data change events).
- Offset serialization -- For every event, a DebeziumOffset is constructed from the source partition and offset, then serialized to JSON via DebeziumOffsetSerializer.
- Payload conversion -- The record value is converted to JSON using a custom DbzJsonConverter (which includes the schema field for proper downstream parsing). Keys are similarly converted for DATA events.
- Batch handling (handleBatch) -- Events are processed in batches:
  - DATA events extract the full table name from the topic (stripping the topic prefix) and include key, payload, and source timestamp.
  - SCHEMA_CHANGE events are emitted as individual responses immediately (flushing the current batch).
  - HEARTBEAT events include only the offset.
  - TRANSACTION events include the payload and transaction timestamp.
  - All messages are placed on a BlockingQueue<GetEventStreamResponse> for downstream consumption.
- Offset commitment (commitOffset) -- For Postgres sources, offset commitment is deferred until the epoch is committed. The commitOffset method wraps the given DebeziumOffset in a synthetic SourceRecord and passes it to the Debezium committer. As part of this, adjustSourceOffset coerces the values of Postgres LSN offset keys from Integer to Long.
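The topic-based classification rules listed above can be sketched as follows. This is a minimal illustration, not the class's actual code: the `EventClassifierSketch` class, the `classify` helper, the order in which the rules are checked, and the treatment of a null topic as DATA are all assumptions made for the example.

```java
// Illustrative stand-in; the real consumer works on Debezium SourceRecord objects.
class EventClassifierSketch {
    enum EventType { HEARTBEAT, TRANSACTION, DATA, SCHEMA_CHANGE }

    // Hypothetical helper: applies the rules in the order they are listed in the text.
    static EventType classify(
            String topic,
            String heartbeatTopicPrefix,
            String transactionTopic,
            String schemaChangeTopic) {
        if (topic == null) {
            return EventType.DATA; // assumption: no topic means an ordinary data event
        }
        if (topic.startsWith(heartbeatTopicPrefix)) {
            return EventType.HEARTBEAT; // prefix match
        }
        if (topic.equals(transactionTopic)) {
            return EventType.TRANSACTION; // exact match
        }
        if (topic.equals(schemaChangeTopic)) {
            return EventType.SCHEMA_CHANGE; // exact match
        }
        return EventType.DATA; // everything else is a data change event
    }

    public static void main(String[] args) {
        // Topic names below are made up for the demonstration.
        System.out.println(classify(
                "server1.mydb.orders",
                "__debezium-heartbeat",
                "server1.transaction",
                "server1.schema-changes")); // prints DATA
    }
}
```

Note that only the heartbeat rule is a prefix match; the transaction and schema change rules compare the whole topic name.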
Usage
Instantiated by DbzCdcEngineRunner when starting a Debezium embedded engine for CDC source processing.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
- Lines: 1-378
Signature
enum EventType {
    HEARTBEAT,
    TRANSACTION,
    DATA,
    SCHEMA_CHANGE,
}
public class DbzChangeEventConsumer
        implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    DbzChangeEventConsumer(
            SourceTypeE connector,
            long sourceId,
            String heartbeatTopicPrefix,
            String transactionTopic,
            String schemaChangeTopic,
            BlockingQueue<GetEventStreamResponse> queue);
    @Override
    public void handleBatch(
            List<ChangeEvent<SourceRecord, SourceRecord>> events,
            DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
            throws InterruptedException;
    public BlockingQueue<GetEventStreamResponse> getOutputChannel();
    public void commitOffset(DebeziumOffset offset) throws InterruptedException;
    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset);
}
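To illustrate the Integer to Long coercion that adjustSourceOffset is described as performing, here is a minimal sketch. The `OffsetCoercionSketch` class, the `widenIntegerValues` helper, and the key name `lsn` used in the demo are hypothetical; the actual set of Postgres offset keys that the method touches is not listed in this page. The sketch also assumes the Integer values appear because small numbers come back as Integer after the offset is deserialized from JSON.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class OffsetCoercionSketch {
    // Hypothetical helper: returns a copy of the offset map in which the values of the
    // given keys are widened from Integer to Long. Other entries are left unchanged.
    static Map<String, Object> widenIntegerValues(
            Map<String, Object> sourceOffset, Set<String> keysToWiden) {
        Map<String, Object> adjusted = new HashMap<>(sourceOffset);
        for (String key : keysToWiden) {
            Object value = adjusted.get(key);
            if (value instanceof Integer) {
                adjusted.put(key, ((Integer) value).longValue());
            }
        }
        return adjusted;
    }

    public static void main(String[] args) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("lsn", 42);          // illustrative key, stored as Integer
        offset.put("txId", 7);          // not in the key set, stays an Integer
        Map<String, Object> adjusted = widenIntegerValues(offset, Set.of("lsn"));
        System.out.println(adjusted.get("lsn") instanceof Long);     // prints true
        System.out.println(adjusted.get("txId") instanceof Integer); // prints true
    }
}
```

Returning a copy rather than mutating the input is a choice made for the sketch; the real method may adjust the map in place.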
Import
import com.risingwave.connector.source.core.DbzChangeEventConsumer;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| connector | SourceTypeE | Yes | Source database type (MYSQL, POSTGRES, CITUS, MONGODB, SQL_SERVER) |
| sourceId | long | Yes | Unique identifier for this CDC source, used as partition identifier in messages |
| heartbeatTopicPrefix | String | Yes | Topic prefix for identifying heartbeat events |
| transactionTopic | String | Yes | Topic name for transaction metadata events |
| schemaChangeTopic | String | Yes | Topic name for schema change events |
| queue | BlockingQueue<GetEventStreamResponse> | Yes | Output queue for converted protobuf responses |
| events | List<ChangeEvent<SourceRecord, SourceRecord>> | Yes | Batch of Debezium change events to process (passed to handleBatch) |
Outputs
| Name | Type | Description |
|---|---|---|
| GetEventStreamResponse | Protobuf message | Batched CDC messages placed on the output BlockingQueue, containing CdcMessage entries with offset, payload, key, source type, message type, full table name, and source timestamp |
Event Type Mapping
| EventType | CdcMessageType | Payload Content |
|---|---|---|
| HEARTBEAT | HEARTBEAT | Offset only (no payload) |
| TRANSACTION | TRANSACTION_META | JSON-serialized transaction metadata with ts_ms |
| SCHEMA_CHANGE | SCHEMA_CHANGE | JSON-serialized schema change event (emitted as individual response) |
| DATA | DATA | JSON-serialized data change event with key, full table name, and source timestamp |
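The full table name carried by DATA messages is derived from the record topic by stripping the topic prefix. A minimal sketch of that step follows, assuming the prefix is the first dot-separated segment of the topic (for example `server1.mydb.orders`). The `TableNameSketch` class and the `stripTopicPrefix` helper are hypothetical names, and the class's actual stripping logic may differ.

```java
class TableNameSketch {
    // Hypothetical helper: drops everything up to and including the first '.'.
    // If the topic contains no '.', indexOf returns -1 and the topic is returned as-is.
    static String stripTopicPrefix(String topic) {
        return topic.substring(topic.indexOf('.') + 1);
    }

    public static void main(String[] args) {
        // Topic names below are made up for the demonstration.
        System.out.println(stripTopicPrefix("server1.mydb.orders"));   // prints mydb.orders
        System.out.println(stripTopicPrefix("server1.public.orders")); // prints public.orders
    }
}
```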
Usage Examples
Creating a Consumer for MySQL CDC
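// Hypothetical source ID; in practice it comes from the CDC source configuration.
long sourceId = 1L;
BlockingQueue<GetEventStreamResponse> queue = new LinkedBlockingQueue<>();
DbzChangeEventConsumer consumer = new DbzChangeEventConsumer(
        SourceTypeE.MYSQL,
        sourceId,
        "__debezium-heartbeat",    // heartbeatTopicPrefix
        "server1.transaction",     // transactionTopic
        "server1.schema-changes",  // schemaChangeTopic
        queue);
// The consumer is passed to the Debezium EmbeddedEngine builder
// Events are consumed from the queue by the gRPC response stream
// take() blocks until a response is available and may throw InterruptedException
GetEventStreamResponse response = queue.take();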
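On the consuming side, a caller may prefer a bounded wait over a blocking take(). The sketch below shows one way to drain an output queue with a timeout. It is generic, so plain strings stand in for GetEventStreamResponse; the `QueueDrainSketch` class, the `drain` helper, and its parameters are hypothetical and not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueDrainSketch {
    // Hypothetical helper: collects up to maxItems elements, waiting at most timeoutMs
    // for each one. Stops early when the queue stays empty or the thread is interrupted.
    static <T> List<T> drain(BlockingQueue<T> queue, int maxItems, long timeoutMs) {
        List<T> drained = new ArrayList<>();
        try {
            while (drained.size() < maxItems) {
                T item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (item == null) {
                    break; // timed out: nothing more available right now
                }
                drained.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("response-1"); // stand-ins for GetEventStreamResponse messages
        queue.add("response-2");
        System.out.println(drain(queue, 10, 10)); // prints [response-1, response-2]
    }
}
```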