
Implementation:Risingwavelabs Risingwave DbzChangeEventConsumer

From Leeroopedia


Knowledge Sources
Domains: CDC, Connectors, Debezium, gRPC
Language: Java
Lines: 378
Last Updated: 2026-02-09 07:00 GMT

Overview

Consumes Debezium change events and converts them into RisingWave GetEventStreamResponse protobuf messages, handling DATA, HEARTBEAT, TRANSACTION, and SCHEMA_CHANGE event types.

Description

DbzChangeEventConsumer implements Debezium's ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> interface and bridges Debezium's internal change event format and RisingWave's CDC protobuf event stream protocol.

The class performs the following operations:

  1. Event classification -- Each incoming SourceRecord is classified into one of four EventType values based on its topic:
    • HEARTBEAT -- Topic starts with the configured heartbeat topic prefix.
    • TRANSACTION -- Topic matches the transaction topic name.
    • SCHEMA_CHANGE -- Topic matches the schema change topic name.
    • DATA -- All other events (actual data change events).
  2. Offset serialization -- For every event, a DebeziumOffset is constructed from the record's source partition and source offset, then serialized to JSON via DebeziumOffsetSerializer.
  3. Payload conversion -- The record value is converted to JSON using a custom DbzJsonConverter, which includes the schema field so that downstream parsers can interpret the payload. For DATA events, the record key is converted the same way.
  4. Batch handling (handleBatch) -- Events are processed in batches:
    • DATA events extract the full table name from the topic (stripping the topic prefix) and include the key, payload, and source timestamp.
    • SCHEMA_CHANGE events flush the current batch and are then emitted immediately as individual responses.
    • HEARTBEAT events include only the offset.
    • TRANSACTION events include the payload and the transaction timestamp.
    • All messages are placed on a BlockingQueue<GetEventStreamResponse> for downstream consumption.
  5. Offset commit (commitOffset) -- For Postgres sources, offsets are not committed as events are consumed; the commit is deferred until the corresponding epoch is committed. commitOffset wraps the offset in a synthetic SourceRecord and passes it to the Debezium committer. Before that, adjustSourceOffset converts Integer-typed LSN values in Postgres offsets to Long to work around a type coercion issue.
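The topic-based classification in step 1 can be sketched without any Debezium types. The class below is a hypothetical stand-alone illustration, not the RisingWave source: it takes the record's topic as a plain String, and treating a null topic as DATA is an assumption of this sketch. The heartbeat check is a prefix match, while the transaction and schema change checks are exact matches.

```java
import java.util.Objects;

// Hypothetical sketch of topic-based event classification.
// In DbzChangeEventConsumer the three topic names come from the constructor.
final class EventClassifierSketch {

    enum EventType { HEARTBEAT, TRANSACTION, DATA, SCHEMA_CHANGE }

    private final String heartbeatTopicPrefix;
    private final String transactionTopic;
    private final String schemaChangeTopic;

    EventClassifierSketch(String heartbeatTopicPrefix, String transactionTopic, String schemaChangeTopic) {
        this.heartbeatTopicPrefix = Objects.requireNonNull(heartbeatTopicPrefix);
        this.transactionTopic = Objects.requireNonNull(transactionTopic);
        this.schemaChangeTopic = Objects.requireNonNull(schemaChangeTopic);
    }

    /** Classifies a record by its topic; anything that is not a control topic is DATA. */
    EventType classify(String topic) {
        if (topic == null) {
            return EventType.DATA; // assumption: topic-less records are treated as data
        }
        if (topic.startsWith(heartbeatTopicPrefix)) { // prefix match
            return EventType.HEARTBEAT;
        }
        if (topic.equals(transactionTopic)) { // exact match
            return EventType.TRANSACTION;
        }
        if (topic.equals(schemaChangeTopic)) { // exact match
            return EventType.SCHEMA_CHANGE;
        }
        return EventType.DATA;
    }
}
```

For example, with the heartbeat prefix "__debezium-heartbeat", a topic such as "__debezium-heartbeat.server1" is classified as HEARTBEAT, while "server1.inventory.orders" falls through to DATA.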

Usage

Instantiated by DbzCdcEngineRunner when starting a Debezium embedded engine for CDC source processing.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
  • Lines: 1-378

Signature

enum EventType {
    HEARTBEAT,
    TRANSACTION,
    DATA,
    SCHEMA_CHANGE,
}

public class DbzChangeEventConsumer
        implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {

    DbzChangeEventConsumer(
            SourceTypeE connector,
            long sourceId,
            String heartbeatTopicPrefix,
            String transactionTopic,
            String schemaChangeTopic,
            BlockingQueue<GetEventStreamResponse> queue);

    @Override
    public void handleBatch(
            List<ChangeEvent<SourceRecord, SourceRecord>> events,
            DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
            throws InterruptedException;

    public BlockingQueue<GetEventStreamResponse> getOutputChannel();

    public void commitOffset(DebeziumOffset offset) throws InterruptedException;

    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset);
}
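The Integer-to-Long fix performed by adjustSourceOffset can be illustrated with plain maps. This is a hypothetical sketch: the key names "lsn", "lsn_commit", and "lsn_proc" are assumed to be Debezium's Postgres LSN offset keys, and the assumed cause is that a small LSN read back from JSON is deserialized as an Integer while the Postgres connector expects a Long. Unlike a method that may update the map in place, this version returns a copy and leaves the input untouched.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the LSN type coercion applied to Postgres offsets.
final class OffsetAdjustSketch {

    // Assumed Debezium Postgres offset keys that hold LSN values.
    static final List<String> LSN_KEYS = List.of("lsn", "lsn_commit", "lsn_proc");

    /** Returns a copy of the offset map in which Integer-typed LSN values are widened to Long. */
    static Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
        Map<String, Object> adjusted = new HashMap<>(sourceOffset);
        for (String key : LSN_KEYS) {
            Object value = adjusted.get(key);
            if (value instanceof Integer) {
                // Widen to long; autoboxing stores it as a Long.
                adjusted.put(key, ((Integer) value).longValue());
            }
        }
        return adjusted;
    }
}
```

Keys outside the LSN list, such as a transaction id, are copied unchanged.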

Import

import com.risingwave.connector.source.core.DbzChangeEventConsumer;

I/O Contract

Inputs

  • connector (SourceTypeE, required): source database type (MYSQL, POSTGRES, CITUS, MONGODB, SQL_SERVER).
  • sourceId (long, required): unique identifier for this CDC source, used as the partition identifier in messages.
  • heartbeatTopicPrefix (String, required): topic prefix that identifies heartbeat events.
  • transactionTopic (String, required): topic name for transaction metadata events.
  • schemaChangeTopic (String, required): topic name for schema change events.
  • queue (BlockingQueue<GetEventStreamResponse>, required): output queue for the converted protobuf responses.
  • events (List<ChangeEvent<SourceRecord, SourceRecord>>, required): batch of Debezium change events to process; passed to handleBatch rather than to the constructor.

Outputs

  • GetEventStreamResponse (protobuf message): batched CDC messages placed on the output BlockingQueue. Each response contains CdcMessage entries carrying the offset, payload, key, source type, message type, full table name, and source timestamp.

Event Type Mapping

Each EventType maps to a CdcMessageType and payload content as follows:
  • HEARTBEAT → HEARTBEAT: offset only (no payload).
  • TRANSACTION → TRANSACTION_META: JSON-serialized transaction metadata, plus the transaction timestamp (ts_ms).
  • SCHEMA_CHANGE → SCHEMA_CHANGE: JSON-serialized schema change event, emitted as its own response.
  • DATA → DATA: JSON-serialized data change event, plus the key, full table name, and source timestamp.
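The mapping above, combined with the batching rule of handleBatch (a schema change flushes the pending batch and is then emitted alone), can be sketched with plain Java collections. This is a hypothetical simplification: strings stand in for CdcMessage and GetEventStreamResponse, offsets and timestamps are omitted, skipping empty batches is a choice of this sketch, and fullTableName assumes topics shaped like <prefix>.<db>.<table> with no dot inside the prefix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of handleBatch's routing and schema-change flushing.
// Each List<String> put on the queue stands in for one GetEventStreamResponse.
final class BatchSketch {

    enum EventType { HEARTBEAT, TRANSACTION, DATA, SCHEMA_CHANGE }

    static final class Event {
        final EventType type;
        final String topic;
        final String payload;

        Event(EventType type, String topic, String payload) {
            this.type = type;
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Strips the topic prefix, assuming "<prefix>.<db>.<table>" with no dot in the prefix. */
    static String fullTableName(String topic) {
        return topic.substring(topic.indexOf('.') + 1);
    }

    static void handleBatch(List<Event> events, BlockingQueue<List<String>> queue)
            throws InterruptedException {
        List<String> pending = new ArrayList<>();
        for (Event e : events) {
            switch (e.type) {
                case SCHEMA_CHANGE:
                    if (!pending.isEmpty()) {
                        queue.put(pending); // flush everything collected so far
                        pending = new ArrayList<>();
                    }
                    queue.put(List.of("SCHEMA_CHANGE:" + e.payload)); // emitted on its own
                    break;
                case DATA:
                    pending.add("DATA:" + fullTableName(e.topic) + ":" + e.payload);
                    break;
                case TRANSACTION:
                    pending.add("TRANSACTION_META:" + e.payload);
                    break;
                case HEARTBEAT:
                    pending.add("HEARTBEAT"); // offset only; no payload
                    break;
            }
        }
        if (!pending.isEmpty()) {
            queue.put(pending); // flush the remainder of the batch
        }
    }
}
```

Given a DATA event, then a schema change, then a heartbeat, this sketch produces three responses in order: the flushed DATA batch, the schema change alone, and a final batch holding the heartbeat.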

Usage Examples

Creating a Consumer for MySQL CDC

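import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// The constructor is declared without `public`, so this code is assumed to live in
// com.risingwave.connector.source.core (as DbzCdcEngineRunner does).
// SourceTypeE and GetEventStreamResponse come from RisingWave's connector modules.
BlockingQueue<GetEventStreamResponse> queue = new LinkedBlockingQueue<>();
long sourceId = 1L; // illustrative CDC source id

DbzChangeEventConsumer consumer = new DbzChangeEventConsumer(
        SourceTypeE.MYSQL,
        sourceId,
        "__debezium-heartbeat",   // heartbeat topic prefix
        "server1.transaction",    // transaction metadata topic
        "server1.schema-changes", // schema change topic (illustrative name)
        queue);

// The consumer is registered with the Debezium engine builder via notifying(consumer);
// the gRPC response stream then reads converted events from the queue.
// take() blocks until a response is available and may throw InterruptedException.
GetEventStreamResponse response = queue.take();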
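The example above blocks indefinitely on take(). A downstream reader that also needs to notice shutdown could instead wait with a timeout and then drain whatever is already queued. The helper below is a hypothetical, generic sketch of that pattern using only java.util.concurrent; it is not the RisingWave gRPC handler, and it assumes maxItems is at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer-side sketch: wait at most timeoutMs for the first item,
// then take up to maxItems - 1 more that are already queued, without blocking.
final class QueueDrainSketch {

    static <T> List<T> drainWithTimeout(BlockingQueue<T> queue, long timeoutMs, int maxItems)
            throws InterruptedException {
        List<T> out = new ArrayList<>();
        T first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS); // null if the timeout elapses
        if (first == null) {
            return out; // nothing arrived; the caller can recheck a shutdown flag and retry
        }
        out.add(first);
        queue.drainTo(out, maxItems - 1); // non-blocking bulk removal
        return out;
    }
}
```

An empty result signals a timeout rather than end of stream, so a caller would typically loop until its own shutdown condition is met.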
