Implementation:Risingwavelabs Risingwave DbzSourceHandler
| Knowledge Sources | |
|---|---|
| Domains | Connectors, CDC, Streaming, gRPC, Debezium |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
gRPC-based source handler that starts a Debezium CDC engine and streams change events to gRPC clients with backpressure support.
Description
The DbzSourceHandler class implements the SourceHandler interface to manage the lifecycle of Debezium CDC engine instances and stream their output over gRPC. It consists of two key components:
DbzSourceHandler (outer class): Holds the DbzConnectorConfig and implements the startSource method. startSource creates a DbzCdcEngineRunner through a static factory method, starts the engine, registers an OnReadyHandler callback on the gRPC response observer, disables auto-request (switching to manual flow control), and initiates the first polling cycle. If the runner cannot be created, the method completes the response immediately. If an error occurs during startup, it stops the runner and logs the failure.
OnReadyHandler (inner class): Implements Runnable and serves as the gRPC flow-control callback. When invoked, it enters a polling loop that:
- Checks whether the gRPC context has been cancelled (client disconnected) and, if so, stops the engine.
- Checks whether the response observer is ready to accept more data (backpressure check).
- Polls the engine output channel with a 500 ms timeout to retrieve batched CDC events.
- Sends the event batch to the gRPC client and increments the sourceRowsReceived metric via ConnectorNodeMetrics.
- Returns immediately if backpressure is detected (observer not ready), allowing gRPC to re-invoke the handler when capacity is available.
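The loop described above can be sketched with standard-library stand-ins. This is a minimal illustration, not the actual OnReadyHandler source: BatchSink, FakeRunner, and PollingLoopSketch are hypothetical names, the cancelled flag stands in for the gRPC context check, and rowsReceived stands in for the sourceRowsReceived metric.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the outbound side of the gRPC response observer. */
interface BatchSink {
    boolean isReady();
    void onNext(List<String> batch);
}

/** Hypothetical stand-in for the engine runner: a running flag plus an output queue. */
class FakeRunner {
    final BlockingQueue<List<String>> output = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    boolean isRunning() { return running.get(); }
    void stop() { running.set(false); }
}

/** Sketch of the polling loop under the assumptions named above. */
class PollingLoopSketch implements Runnable {
    private final FakeRunner runner;
    private final BatchSink sink;
    private final AtomicBoolean cancelled;            // assumed substitute for the gRPC context
    final AtomicLong rowsReceived = new AtomicLong(); // assumed substitute for the metric

    PollingLoopSketch(FakeRunner runner, BatchSink sink, AtomicBoolean cancelled) {
        this.runner = runner;
        this.sink = sink;
        this.cancelled = cancelled;
    }

    @Override
    public void run() {
        while (runner.isRunning()) {
            // Client gone: stop the engine and leave the loop.
            if (cancelled.get()) {
                runner.stop();
                return;
            }
            // Backpressure: return and wait to be invoked again.
            if (!sink.isReady()) {
                return;
            }
            try {
                // Wait up to 500 ms for the next batch; null means nothing arrived.
                List<String> batch = runner.output.poll(500, TimeUnit.MILLISECONDS);
                if (batch != null) {
                    sink.onNext(batch);
                    rowsReceived.addAndGet(batch.size());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and shut down.
                Thread.currentThread().interrupt();
                runner.stop();
                return;
            }
        }
    }
}
```

Returning instead of sleeping when the sink is not ready keeps the calling thread free; the sketch relies on the caller to invoke run() again once capacity is available.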
Usage
This handler is instantiated when the Rust streaming engine requests a CDC event stream from the Java connector service. The handler runs the Debezium engine and feeds events back to the Rust side through the gRPC stream.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java
- Lines: 118
Signature
public class DbzSourceHandler implements SourceHandler {
    public DbzSourceHandler(DbzConnectorConfig config) { ... }

    @Override
    public void startSource(ServerCallStreamObserver<GetEventStreamResponse> responseObserver) { ... }

    class OnReadyHandler implements Runnable {
        public OnReadyHandler(
                DbzCdcEngineRunner runner,
                ServerCallStreamObserver<GetEventStreamResponse> responseObserver) { ... }

        @Override
        public void run() { ... }
    }
}
Import
import com.risingwave.connector.source.core.DbzSourceHandler;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | DbzConnectorConfig | Yes (constructor) | Debezium connector configuration including source type, source ID, and all connection properties |
| responseObserver | ServerCallStreamObserver<GetEventStreamResponse> | Yes (startSource) | gRPC stream observer for sending CDC events back to the client |
Outputs
| Name | Type | Description |
|---|---|---|
| GetEventStreamResponse | gRPC stream messages | Batched CDC events containing change records from the upstream database |
Flow Control
The handler implements gRPC backpressure by:
- Disabling auto-request via responseObserver.disableAutoRequest(), so that inbound messages are delivered only when explicitly requested
- Setting the OnReadyHandler as the callback via responseObserver.setOnReadyHandler()
- Checking responseObserver.isReady() before each send
- Returning from the handler when the observer is not ready (backpressure detected)
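The return-and-re-invoke contract in the list above can be modelled without gRPC. In the sketch below, WindowedStream and FlowControlSketch are hypothetical names, and the fixed-size send window is an assumption made only so that readiness changes are observable. The model leaves out disableAutoRequest(), which in grpc-java governs inbound message delivery rather than the outbound window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of an outbound stream with a bounded send window. */
class WindowedStream {
    private final int capacity;
    private int inFlight = 0;
    private Runnable onReadyHandler = () -> {};
    final List<String> delivered = new ArrayList<>();

    WindowedStream(int capacity) { this.capacity = capacity; }

    void setOnReadyHandler(Runnable handler) { this.onReadyHandler = handler; }

    boolean isReady() { return inFlight < capacity; }

    void onNext(String message) {
        inFlight++;
        delivered.add(message);
    }

    /** Models the peer consuming everything in flight. */
    void drain() {
        boolean wasReady = isReady();
        inFlight = 0;
        // The handler fires only on a not-ready to ready transition.
        if (!wasReady) {
            onReadyHandler.run();
        }
    }
}

class FlowControlSketch {
    /** Registers a handler that sends only while the stream is ready, then runs it once. */
    static void start(WindowedStream stream, Queue<String> pending) {
        Runnable handler = () -> {
            while (!pending.isEmpty()) {
                if (!stream.isReady()) {
                    return; // backpressure detected: wait to be re-invoked
                }
                stream.onNext(pending.poll());
            }
        };
        stream.setOnReadyHandler(handler);
        handler.run(); // first sending cycle
    }
}
```

With a window of two and five pending messages, the first cycle sends two messages and returns; each later drain() call that restores readiness triggers another cycle until the queue is empty.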
Usage Examples
Starting a CDC Source Handler
DbzConnectorConfig config = new DbzConnectorConfig(
        SourceTypeE.POSTGRES,
        sourceId,
        "postgres-server",
        properties,
        isBackfillTable
);
DbzSourceHandler handler = new DbzSourceHandler(config);
handler.startSource(responseObserver);
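What a startSource call does on its success and failure paths, as outlined in the Description, can be sketched with stand-in types. EngineRunner, RecordingObserver, and StartSourceSketch are hypothetical names, the relative order of the two observer calls is an assumption, and logging is reduced to a single line on standard error.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the engine runner. */
interface EngineRunner {
    void start() throws Exception;
    void stop();
}

/** Hypothetical stand-in for the response observer; it only records the calls it receives. */
class RecordingObserver {
    final List<String> calls = new ArrayList<>();

    void disableAutoRequest() { calls.add("disableAutoRequest"); }
    void setOnReadyHandler(Runnable handler) { calls.add("setOnReadyHandler"); }
    void onCompleted() { calls.add("onCompleted"); }
}

class StartSourceSketch {
    static void startSource(
            EngineRunner runner, RecordingObserver observer, Runnable onReadyHandler) {
        // Runner could not be created: complete the response immediately.
        if (runner == null) {
            observer.onCompleted();
            return;
        }
        try {
            runner.start();
            observer.disableAutoRequest();
            observer.setOnReadyHandler(onReadyHandler);
            onReadyHandler.run(); // first polling cycle
        } catch (Throwable t) {
            // Startup failed: log and stop the runner.
            System.err.println("CDC engine failed: " + t);
            runner.stop();
        }
    }
}
```

Passing the runner in as a parameter, rather than creating it inside the method, is a simplification that makes the null and failure paths easy to exercise.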
CDC Source SQL Trigger
-- Creating a CDC source triggers DbzSourceHandler on the Java side
CREATE TABLE orders_cdc (
order_id INT PRIMARY KEY,
customer_id INT,
amount DECIMAL
) WITH (
connector = 'postgres-cdc',
hostname = 'localhost',
port = '5432',
username = 'repl_user',
password = 'secret',
database.name = 'mydb',
schema.name = 'public',
table.name = 'orders',
slot.name = 'rw_orders_slot'
);
Related Pages
Related Implementations
- Implementation:Risingwavelabs_Risingwave_DbzCdcEngineRunner_Start
- Implementation:Risingwavelabs_Risingwave_ConnectorNodeMetrics
- Implementation:Risingwavelabs_Risingwave_SourceValidateHandler
- Implementation:Risingwavelabs_Risingwave_DbzSourceUtils
- Implementation:Risingwavelabs_Risingwave_CDC_Database_Configuration