Implementation:Risingwavelabs Risingwave DbzSourceHandler

From Leeroopedia


Knowledge Sources
Domains Connectors, CDC, Streaming, gRPC, Debezium
Last Updated 2026-02-09 07:00 GMT

Overview

gRPC-based source handler that starts a Debezium CDC engine and streams change events to gRPC clients with backpressure support.

Description

The DbzSourceHandler class implements the SourceHandler interface to manage the lifecycle of Debezium CDC engine instances and stream their output over gRPC. It consists of two key components:

DbzSourceHandler (outer class): Holds the DbzConnectorConfig and implements the startSource method. This method creates a DbzCdcEngineRunner via its static factory method, starts the engine, registers an OnReadyHandler callback on the gRPC response observer, disables auto-request (switching the stream to manual flow control), and runs the first polling cycle. If the engine runner cannot be created, the method completes the response immediately and returns. If an error occurs during startup, it stops the runner and logs the failure.

OnReadyHandler (inner class): Implements Runnable and serves as the gRPC flow-control callback. When invoked, it enters a polling loop that:

  1. Checks whether the gRPC context has been cancelled (the client disconnected) and, if so, stops the engine.
  2. Checks whether the response observer is ready to accept more data (the backpressure check).
  3. Polls the engine output channel with a 500 ms timeout to retrieve a batch of CDC events.
  4. Sends the event batch to the gRPC client and increments the sourceRowsReceived metric via ConnectorNodeMetrics.
  5. Returns immediately when the check in step 2 finds the observer not ready, so that gRPC can re-invoke the handler once capacity is available.
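
The polling steps above can be sketched in plain Java without the gRPC or Debezium dependencies. This is a minimal illustration, not the RisingWave source: FakeObserver, runOnce, demo, and the maxIdlePolls bound are hypothetical names introduced here so the loop can terminate in a standalone program, and a BlockingQueue stands in for the engine output channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PollLoopSketch {
    /** Hypothetical stand-in for the gRPC observer: ready until `capacity` batches were sent. */
    static final class FakeObserver {
        final int capacity;
        volatile boolean cancelled = false;
        final List<List<String>> sent = new ArrayList<>();

        FakeObserver(int capacity) { this.capacity = capacity; }
        boolean isReady() { return sent.size() < capacity; }
        void onNext(List<String> batch) { sent.add(batch); }
    }

    /** Runs one handler invocation and returns the reason it exited. */
    static String runOnce(BlockingQueue<List<String>> channel, FakeObserver observer,
                          AtomicLong sourceRowsReceived, int maxIdlePolls) {
        int idlePolls = 0;
        while (true) {
            if (observer.cancelled) {
                return "cancelled";     // step 1: the real handler also stops the engine here
            }
            if (!observer.isReady()) {
                return "backpressure";  // steps 2 and 5: return and wait to be re-invoked
            }
            List<String> batch;
            try {
                batch = channel.poll(500, TimeUnit.MILLISECONDS); // step 3: 500 ms timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
            if (batch == null) {
                // Hypothetical bound so this sketch terminates when the channel stays empty.
                if (++idlePolls >= maxIdlePolls) return "idle";
                continue;
            }
            observer.onNext(batch);                     // step 4: send the batch
            sourceRowsReceived.addAndGet(batch.size()); // step 4: count the rows
        }
    }

    /** Feeds three batches (four rows in total) through one handler invocation. */
    static String demo(int capacity, boolean cancelled, AtomicLong rows) {
        BlockingQueue<List<String>> channel = new LinkedBlockingQueue<>();
        channel.add(List.of("insert#1", "insert#2"));
        channel.add(List.of("update#1"));
        channel.add(List.of("delete#1"));
        FakeObserver observer = new FakeObserver(capacity);
        observer.cancelled = cancelled;
        return runOnce(channel, observer, rows, 1);
    }

    public static void main(String[] args) {
        AtomicLong rows = new AtomicLong();
        System.out.println(demo(2, false, rows) + ", rows=" + rows.get());
    }
}
```

With a capacity of two batches, the invocation sends the first two batches (three rows) and then exits on the readiness check, leaving the third batch in the channel for the next invocation.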

Usage

This handler is instantiated when the Rust streaming engine requests a CDC event stream from the Java connector service. The handler runs the Debezium engine and feeds events back to the Rust side through the gRPC stream.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java
  • Lines: 118

Signature

public class DbzSourceHandler implements SourceHandler {

    public DbzSourceHandler(DbzConnectorConfig config) { ... }

    @Override
    public void startSource(ServerCallStreamObserver<GetEventStreamResponse> responseObserver) { ... }

    class OnReadyHandler implements Runnable {
        public OnReadyHandler(
                DbzCdcEngineRunner runner,
                ServerCallStreamObserver<GetEventStreamResponse> responseObserver) { ... }

        @Override
        public void run() { ... }
    }
}

Import

import com.risingwave.connector.source.core.DbzSourceHandler;

I/O Contract

Inputs

Name | Type | Required | Description
config | DbzConnectorConfig | Yes (constructor) | Debezium connector configuration, including source type, source ID, and all connection properties
responseObserver | ServerCallStreamObserver<GetEventStreamResponse> | Yes (startSource) | gRPC stream observer for sending CDC events back to the client

Outputs

Name | Type | Description
GetEventStreamResponse | gRPC stream messages | Batched CDC events containing change records from the upstream database

Flow Control

The handler implements gRPC backpressure by:

  • Disabling auto-request via responseObserver.disableAutoRequest()
  • Setting the OnReadyHandler as the callback via responseObserver.setOnReadyHandler()
  • Checking responseObserver.isReady() before each send
  • Returning from the handler when the observer is not ready (backpressure detected)
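
The re-invocation half of this handshake can be illustrated with a small simulation. The sketch below is an assumption-laden model rather than gRPC itself: FakeObserver, bufferLimit, drain, and run are hypothetical names, and the fake observer re-runs the registered handler whenever isReady() flips from false back to true, which is the transition on which gRPC invokes an on-ready handler.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BackpressureSketch {
    /** Hypothetical stand-in for ServerCallStreamObserver with a tiny outbound buffer. */
    static final class FakeObserver {
        private final int bufferLimit;
        private int inFlight = 0;
        private int delivered = 0;
        private Runnable onReadyHandler = () -> {};

        FakeObserver(int bufferLimit) { this.bufferLimit = bufferLimit; }

        void setOnReadyHandler(Runnable handler) { this.onReadyHandler = handler; }
        boolean isReady() { return inFlight < bufferLimit; }
        void onNext(String message) { inFlight++; }

        /** Simulates the client consuming the buffer; a not-ready to ready flip re-runs the handler. */
        void drain() {
            boolean wasReady = isReady();
            delivered += inFlight;
            inFlight = 0;
            if (!wasReady) {
                onReadyHandler.run();
            }
        }
    }

    /** Returns {handler invocations, messages delivered}. */
    static int[] run(int totalEvents, int bufferLimit) {
        Deque<String> pending = new ArrayDeque<>();
        for (int i = 0; i < totalEvents; i++) {
            pending.add("event-" + i);
        }
        FakeObserver observer = new FakeObserver(bufferLimit);
        int[] invocations = {0};

        // The handler sends only while the observer is ready, then returns.
        Runnable handler = () -> {
            invocations[0]++;
            while (observer.isReady() && !pending.isEmpty()) {
                observer.onNext(pending.poll());
            }
        };
        observer.setOnReadyHandler(handler);
        handler.run(); // first cycle is started by hand, as startSource does

        while (!pending.isEmpty() || observer.inFlight > 0) {
            observer.drain();
        }
        return new int[] {invocations[0], observer.delivered};
    }

    public static void main(String[] args) {
        int[] result = run(5, 2);
        System.out.println("invocations=" + result[0] + ", delivered=" + result[1]);
    }
}
```

The point of the design is that the handler never blocks waiting for capacity: it gives the thread back as soon as the observer reports not ready, and the transport decides when to call it again.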

Usage Examples

Starting a CDC Source Handler

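// sourceId, properties, isBackfillTable, and responseObserver are assumed
// to be defined by the caller.
DbzConnectorConfig config = new DbzConnectorConfig(
    SourceTypeE.POSTGRES,
    sourceId,
    "postgres-server",
    properties,
    isBackfillTable
);
DbzSourceHandler handler = new DbzSourceHandler(config);
// Starts the Debezium engine and begins streaming events to responseObserver.
handler.startSource(responseObserver);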
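
What the startSource call does internally, as outlined in the Description, can be sketched with stand-in types. This is an illustrative model only: FakeRunner, FakeObserver, simulate, and the trace strings are hypothetical, and a null runner is an assumed way of modelling a factory call that fails to produce an engine runner.

```java
import java.util.ArrayList;
import java.util.List;

public class StartSourceSketch {
    /** Hypothetical stand-in for DbzCdcEngineRunner; only start and stop matter here. */
    static final class FakeRunner {
        private final boolean failOnStart;
        private final List<String> trace;

        FakeRunner(boolean failOnStart, List<String> trace) {
            this.failOnStart = failOnStart;
            this.trace = trace;
        }

        void start() {
            if (failOnStart) {
                throw new IllegalStateException("engine failed to start");
            }
            trace.add("engine started");
        }

        void stop() { trace.add("engine stopped"); }
    }

    /** Hypothetical stand-in for the observer methods used during startup. */
    static final class FakeObserver {
        private final List<String> trace;
        private Runnable onReady = () -> {};

        FakeObserver(List<String> trace) { this.trace = trace; }

        void setOnReadyHandler(Runnable handler) {
            onReady = handler;
            trace.add("handler registered");
        }
        void disableAutoRequest() { trace.add("auto-request disabled"); }
        void onCompleted() { trace.add("stream completed"); }
    }

    /** Follows the startup order from the Description; runner == null models a failed factory call. */
    static void startSource(FakeRunner runner, FakeObserver observer, List<String> trace) {
        if (runner == null) {
            observer.onCompleted(); // nothing to stream, so finish the response right away
            return;
        }
        try {
            runner.start();
            observer.setOnReadyHandler(() -> trace.add("poll cycle"));
            observer.disableAutoRequest();
            observer.onReady.run(); // the first polling cycle is started by hand
        } catch (RuntimeException e) {
            trace.add("startup failed: " + e.getMessage()); // stands in for the error log
            runner.stop();
        }
    }

    static List<String> simulate(boolean runnerCreated, boolean failOnStart) {
        List<String> trace = new ArrayList<>();
        FakeRunner runner = runnerCreated ? new FakeRunner(failOnStart, trace) : null;
        startSource(runner, new FakeObserver(trace), trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true, false));  // normal startup
        System.out.println(simulate(false, false)); // runner could not be created
        System.out.println(simulate(true, true));   // engine start throws
    }
}
```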

CDC Source SQL Trigger

-- Creating a CDC source triggers DbzSourceHandler on the Java side
CREATE TABLE orders_cdc (
    order_id INT PRIMARY KEY,
    customer_id INT,
    amount DECIMAL
) WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'repl_user',
    password = 'secret',
    database.name = 'mydb',
    schema.name = 'public',
    table.name = 'orders',
    slot.name = 'rw_orders_slot'
);
