Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave SourceHandler Interface

From Leeroopedia
Revision as of 16:32, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Risingwavelabs_Risingwave_SourceHandler_Interface.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Metadata

Property Value
File java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceHandler.java
Language Java
Module connector-api
Package com.risingwave.connector.api.source
Classes SourceHandler (interface)
Lines 27

Overview

SourceHandler is an interface that defines the contract for handling RPC requests to start a CDC source stream. It declares a single method, startSource, that accepts a gRPC ServerCallStreamObserver and begins streaming GetEventStreamResponse messages to the connected client.

This interface serves as the primary abstraction for source connector implementations, defining how CDC change events are streamed from the connector node to the RisingWave compute engine.

Code Reference

Source Location

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceHandler.java

Signature

/** Handler for RPC request */
public interface SourceHandler {
    void startSource(
            ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>
                    responseObserver);
}

Imports

import com.risingwave.proto.ConnectorServiceProto;
import io.grpc.stub.ServerCallStreamObserver;

I/O Contract

startSource

Parameter Type Description
responseObserver ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse> A gRPC server-side stream observer for sending CDC event responses back to the caller

Contract:

  • The implementation must start a CDC engine (or equivalent mechanism) to capture change events from the source database
  • Change events must be serialized as GetEventStreamResponse protobuf messages and sent via responseObserver.onNext()
  • The implementation should call responseObserver.onCompleted() when the stream ends normally
  • The implementation should call responseObserver.onError() if an unrecoverable error occurs
  • The ServerCallStreamObserver provides back-pressure awareness via isReady() and setOnReadyHandler()

Usage Examples

// Implementing a SourceHandler for a specific database
public class MyDbSourceHandler implements SourceHandler {
    private final CdcEngineRunner runner;

    public MyDbSourceHandler(CdcEngineRunner runner) {
        this.runner = runner;
    }

    @Override
    public void startSource(
            ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>
                    responseObserver) {
        try {
            runner.start();
            CdcEngine engine = runner.getEngine();
            BlockingQueue<ConnectorServiceProto.GetEventStreamResponse> output =
                    engine.getOutputChannel();

            while (runner.isRunning()) {
                ConnectorServiceProto.GetEventStreamResponse event = output.take();
                responseObserver.onNext(event);
            }
            responseObserver.onCompleted();
        } catch (Exception e) {
            responseObserver.onError(e);
        }
    }
}

// Used by the gRPC service implementation
SourceHandler handler = createHandler(sourceType, config);
handler.startSource(responseObserver);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment