Implementation:Risingwavelabs Risingwave JniDbzSourceHandler RunJniDbzSourceThread
| Knowledge Sources | |
|---|---|
| Domains | CDC, JNI, Data_Replication |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
A concrete tool, provided by the RisingWave Java connector node, for running Debezium CDC source threads via JNI.
Description
JniDbzSourceHandler.runJniDbzSourceThread() is the JNI entry point for starting a CDC capture thread. Called from the Rust engine via JNI, it deserializes the protobuf request, creates a DbzConnectorConfig, launches a DbzCdcEngineRunner, and streams change events back to Rust through a CdcSourceChannel.
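The four steps above (decode the request, build a config, launch a runner, stream events to a channel) can be sketched in plain Java. This is only an illustrative sketch under stated assumptions: `CdcThreadFlowSketch`, `Request`, `Config`, `decode`, and `run` are hypothetical stand-ins, a `key=value` text payload replaces the protobuf encoding, and a `BlockingQueue` replaces the native channel behind `channelPtr`. None of it reflects the actual RisingWave classes or their signatures.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins only: none of these types are the real RisingWave classes.
final class CdcThreadFlowSketch {

    // Stand-in for the decoded GetEventStreamRequest (the real one is protobuf).
    static final class Request {
        final long sourceId;
        final String sourceType;
        final Map<String, String> properties;

        Request(long sourceId, String sourceType, Map<String, String> properties) {
            this.sourceId = sourceId;
            this.sourceType = sourceType;
            this.properties = properties;
        }
    }

    // Stand-in for DbzConnectorConfig: the settings handed to the capture engine.
    static final class Config {
        final long sourceId;
        final String sourceType;
        final Map<String, String> properties;

        Config(Request request) {
            this.sourceId = request.sourceId;
            this.sourceType = request.sourceType;
            this.properties = Map.copyOf(request.properties);
        }
    }

    // Step 1: decode the request bytes. "key=value" lines replace protobuf here.
    static Request decode(byte[] bytes) {
        Map<String, String> kv = new LinkedHashMap<>();
        for (String line : new String(bytes, StandardCharsets.UTF_8).split("\n")) {
            int eq = line.indexOf('=');
            if (eq > 0) {
                kv.put(line.substring(0, eq), line.substring(eq + 1));
            }
        }
        long sourceId = Long.parseLong(kv.remove("sourceId"));
        String sourceType = kv.remove("sourceType");
        return new Request(sourceId, sourceType, kv);
    }

    // Steps 2-4: build the config, run a capture thread, collect what it streamed.
    static List<String> run(byte[] requestBytes, List<String> fakeChanges) {
        Config config = new Config(decode(requestBytes));
        // Stand-in for the native channel that channelPtr would point to.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();

        Thread runner = new Thread(() -> {
            for (String change : fakeChanges) {
                channel.add(config.sourceType + "#" + config.sourceId + ":" + change);
            }
        }, "dbz-sketch-" + config.sourceId);
        runner.start();
        try {
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for runner", e);
        }

        List<String> received = new ArrayList<>();
        channel.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        byte[] bytes = "sourceId=7\nsourceType=MYSQL\nhostname=mysql\nport=3306"
                .getBytes(StandardCharsets.UTF_8);
        run(bytes, List.of("insert", "update", "delete")).forEach(System.out::println);
    }
}
```

The sketch joins the runner thread before draining the queue so the result is deterministic; a long-running capture thread would instead keep pushing events until it is stopped.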
Usage
This method is called by the Rust engine when a user creates a CDC source table (e.g., CREATE TABLE ... WITH (connector='mysql-cdc')). It runs in a dedicated thread managed by the JNI runtime.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
- Lines: L61-101
Signature
public class JniDbzSourceHandler {
    public static void runJniDbzSourceThread(
            byte[] getEventStreamRequestBytes,
            long channelPtr
    ) {
        // Deserializes GetEventStreamRequest protobuf
        // Creates DbzConnectorConfig
        // Launches DbzCdcEngineRunner
        // Streams events via CdcSourceChannel(channelPtr)
    }
}
Import
import com.risingwave.connector.source.core.JniDbzSourceHandler;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| getEventStreamRequestBytes | byte[] | Yes | Serialized protobuf GetEventStreamRequest containing sourceId, sourceType, and CDC properties |
| channelPtr | long | Yes | JNI pointer to CdcSourceChannel for sending events to Rust |
Outputs
| Name | Type | Description |
|---|---|---|
| CDC events | CdcSourceChannel stream | Change events (insert/update/delete) streamed to Rust via JNI |
| Handler registration | Side effect | Handler registered in JniDbzSourceRegistry for lifecycle management |
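The registration side effect can be pictured as a map from source ID to live handler, so that another thread can find and stop a running source. The following is a minimal sketch of that idea, not the actual `JniDbzSourceRegistry` API: `SourceRegistrySketch`, `Handler`, `register`, `get`, and `unregister` are hypothetical names chosen for illustration, and keying by source ID is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: not the real JniDbzSourceRegistry API.
final class SourceRegistrySketch {

    // Minimal handler stand-in that only tracks whether it should keep running.
    static final class Handler {
        private volatile boolean running = true;

        boolean isRunning() {
            return running;
        }

        void stop() {
            running = false;
        }
    }

    // Thread-safe map, since registration and lookup may happen on different threads.
    private static final Map<Long, Handler> HANDLERS = new ConcurrentHashMap<>();

    // Register the handler under its source ID when its thread starts.
    static void register(long sourceId, Handler handler) {
        HANDLERS.put(sourceId, handler);
    }

    // Look up a registered handler, or null if none exists for this source ID.
    static Handler get(long sourceId) {
        return HANDLERS.get(sourceId);
    }

    // Remove and stop the handler; returns true if one was registered.
    static boolean unregister(long sourceId) {
        Handler removed = HANDLERS.remove(sourceId);
        if (removed != null) {
            removed.stop();
        }
        return removed != null;
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        register(42L, handler);
        System.out.println("registered and running: " + get(42L).isRunning());
        System.out.println("unregistered: " + unregister(42L));
        System.out.println("still running: " + handler.isRunning());
    }
}
```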
Usage Examples
Create MySQL CDC Source
-- This SQL triggers JniDbzSourceHandler on the Java side
CREATE TABLE mysql_orders (
    order_id INT PRIMARY KEY,
    customer_name VARCHAR,
    total_amount DECIMAL
) WITH (
    connector = 'mysql-cdc',
    hostname = 'mysql',
    port = '3306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    table.name = 'orders',
    server.id = '1'
);
Create PostgreSQL CDC Source
CREATE TABLE pg_users (
    user_id INT PRIMARY KEY,
    username VARCHAR,
    email VARCHAR
) WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'postgres',
    password = '123456',
    database.name = 'mydb',
    table.name = 'users',
    slot.name = 'rw_slot_users'
);