Implementation:Risingwavelabs Risingwave SinkCoordinatorStreamObserver
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Data_Delivery, Connectors, gRPC |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
gRPC stream observer that handles the sink coordinator protocol for managing distributed sink writer coordination.
Description
The SinkCoordinatorStreamObserver class implements the gRPC StreamObserver interface to handle bidirectional streaming for sink coordinator requests. It processes two types of messages: Start (initialize the coordinator) and Commit (commit an epoch with metadata). On receiving a Start request, it resolves the sink connector factory via SinkUtils.getSinkFactory, constructs the TableSchema from the protobuf parameter, and creates a SinkCoordinator instance through the factory. On receiving a Commit request, it delegates the epoch commit with associated metadata to the coordinator. The observer manages the full lifecycle of the coordinator, including cleanup on error or stream completion by calling coordinator.drop().
Usage
This observer is invoked by the Rust streaming engine during distributed sink operations. When multiple sink writers need coordination (e.g., for transactional or exactly-once sinks), the Rust side establishes a gRPC coordinator stream. The coordinator manages commit ordering and metadata aggregation across writer instances.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkCoordinatorStreamObserver.java
- Lines: 113
Signature
public class SinkCoordinatorStreamObserver
implements StreamObserver<ConnectorServiceProto.SinkCoordinatorStreamRequest> {
public SinkCoordinatorStreamObserver(
StreamObserver<ConnectorServiceProto.SinkCoordinatorStreamResponse> responseObserver) { ... }
@Override
public void onNext(ConnectorServiceProto.SinkCoordinatorStreamRequest request) { ... }
@Override
public void onError(Throwable throwable) { ... }
@Override
public void onCompleted() { ... }
void cleanUp() { ... }
}
Import
import com.risingwave.connector.SinkCoordinatorStreamObserver;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| request (Start) | SinkCoordinatorStreamRequest | Yes (first message) | Contains SinkParam with connector name, table schema, and properties to initialize the coordinator |
| request (Commit) | SinkCoordinatorStreamRequest | Yes (commit phase) | Contains epoch number and list of metadata entries from sink writers |
Outputs
| Name | Type | Description |
|---|---|---|
| StartResponse | SinkCoordinatorStreamResponse | Acknowledges successful coordinator initialization |
| CommitResponse | SinkCoordinatorStreamResponse | Confirms epoch commit, echoes back the committed epoch number |
Error Handling
- If Start is received when a coordinator is already initialized, throws INVALID_ARGUMENT status.
- If Commit is received before the coordinator is initialized, throws INVALID_ARGUMENT status.
- Unrecognized request types trigger INVALID_ARGUMENT status.
- All exceptions during processing are caught and forwarded as INTERNAL errors to the response observer.
Usage Examples
Coordinated Sink with Iceberg
-- When creating a sink that requires coordination, the coordinator stream is used internally
CREATE SINK iceberg_sink FROM orders
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
warehouse.path = 's3://my-bucket/warehouse',
catalog.type = 'storage'
);