Implementation:Risingwavelabs Risingwave SourceHandler Interface
Metadata
| Property | Value |
|---|---|
| File | java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceHandler.java |
| Language | Java |
| Module | connector-api |
| Package | com.risingwave.connector.api.source |
| Classes | SourceHandler (interface) |
| Lines | 27 |
Overview
SourceHandler is an interface that defines the contract for handling RPC requests to start a CDC source stream. It declares a single method, startSource, that accepts a gRPC ServerCallStreamObserver and begins streaming GetEventStreamResponse messages to the connected client.
This interface serves as the primary abstraction for source connector implementations, defining how CDC change events are streamed from the connector node to the RisingWave compute engine.
Code Reference
Source Location
Signature
/** Handler for RPC request */
public interface SourceHandler {
    void startSource(
            ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>
                    responseObserver);
}
Imports
import com.risingwave.proto.ConnectorServiceProto;
import io.grpc.stub.ServerCallStreamObserver;
I/O Contract
startSource
| Parameter | Type | Description |
|---|---|---|
| responseObserver | ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse> | A gRPC server-side stream observer for sending CDC event responses back to the caller |
Contract:
- The implementation must start a CDC engine (or equivalent mechanism) to capture change events from the source database
- Change events must be serialized as GetEventStreamResponse protobuf messages and sent via responseObserver.onNext()
- The implementation should call responseObserver.onCompleted() when the stream ends normally
- The implementation should call responseObserver.onError() if an unrecoverable error occurs
- The ServerCallStreamObserver provides back-pressure awareness via isReady() and setOnReadyHandler()
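
The contract above can be illustrated without a gRPC dependency. The following is a minimal sketch, not RisingWave code: EventSink is a hypothetical stand-in that mirrors only the onNext(), onCompleted(), onError(), and isReady() methods of ServerCallStreamObserver, and ContractSketch, RecordingSink, and the running flag are names invented for this example. It polls isReady() for simplicity, where a real handler would more likely register a callback through setOnReadyHandler().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the subset of ServerCallStreamObserver used below. */
interface EventSink<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable t);
    boolean isReady();
}

/** Sketch of the contract: forward events while running, then send a terminal signal. */
final class ContractSketch {
    private ContractSketch() {}

    static <T> void stream(BlockingQueue<T> events, BooleanSupplier running, EventSink<T> sink) {
        try {
            while (running.getAsBoolean()) {
                if (!sink.isReady()) {
                    // Back-pressure: the client cannot accept more data yet, so wait briefly.
                    TimeUnit.MILLISECONDS.sleep(1);
                    continue;
                }
                // Poll with a timeout so the running flag is re-checked even when idle.
                T event = events.poll(10, TimeUnit.MILLISECONDS);
                if (event != null) {
                    sink.onNext(event);
                }
            }
            sink.onCompleted(); // normal end of stream
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            sink.onError(e);
        } catch (RuntimeException e) {
            sink.onError(e); // unrecoverable failure
        }
    }
}

/** Test double that records calls and reports "not ready" for the first few checks. */
final class RecordingSink<T> implements EventSink<T> {
    final List<T> received = new ArrayList<>();
    boolean completed = false;
    Throwable error = null;
    private int notReadyChecks;

    RecordingSink(int notReadyChecks) {
        this.notReadyChecks = notReadyChecks;
    }

    @Override public void onNext(T value) { received.add(value); }
    @Override public void onCompleted() { completed = true; }
    @Override public void onError(Throwable t) { error = t; }

    @Override public boolean isReady() {
        if (notReadyChecks > 0) {
            notReadyChecks--;
            return false;
        }
        return true;
    }
}
```

Calling ContractSketch.stream(queue, () -> !queue.isEmpty(), new RecordingSink<>(2)) on a queue preloaded with a few events forwards them in order once the sink reports ready, then calls onCompleted(). Polling the queue with a timeout, rather than blocking indefinitely, lets the loop notice a stop request even when no events arrive.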
Usage Examples
// Implementing a SourceHandler for a specific database
public class MyDbSourceHandler implements SourceHandler {
    private final CdcEngineRunner runner;

    public MyDbSourceHandler(CdcEngineRunner runner) {
        this.runner = runner;
    }

    @Override
    public void startSource(
            ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>
                    responseObserver) {
        try {
            runner.start();
            CdcEngine engine = runner.getEngine();
            BlockingQueue<ConnectorServiceProto.GetEventStreamResponse> output =
                    engine.getOutputChannel();
            while (runner.isRunning()) {
                ConnectorServiceProto.GetEventStreamResponse event = output.take();
                responseObserver.onNext(event);
            }
            responseObserver.onCompleted();
        } catch (Exception e) {
            responseObserver.onError(e);
        }
    }
}

// Used by the gRPC service implementation
SourceHandler handler = createHandler(sourceType, config);
handler.startSource(responseObserver);
Related Pages
- CdcEngine Interface - The engine that produces CDC events consumed by SourceHandler
- CdcEngineRunner Interface - Manages the lifecycle of the CdcEngine used by SourceHandler
- SourceTypeE - Enum that determines which SourceHandler implementation to instantiate
- JniDbzSourceHandler RunJniDbzSourceThread - A JNI-based source handler implementation
- ConnectorService Main - The main service that routes RPC requests to SourceHandler implementations