Implementation:Risingwavelabs Risingwave SinkWriterStreamObserver OnNext
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Data_Delivery, Connectors, gRPC |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Concrete tool for handling streaming sink write requests via gRPC provided by the Java connector service.
Description
The SinkWriterStreamObserver class implements the gRPC StreamObserver interface to handle bidirectional streaming of sink write requests. Its onNext method processes three types of messages: StartSink (initialize writer), WriteBatch (write data rows), and Barrier (checkpoint). It manages epoch-based consistency, delegates actual writes to the appropriate SinkWriter implementation, and sends responses back through the gRPC stream.
Usage
This is invoked by the Rust core engine when executing a CREATE SINK statement. The Rust side establishes a gRPC stream and sends write requests; this observer processes them on the Java side.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java
- Lines: L94-182
Signature
public class SinkWriterStreamObserver
implements StreamObserver<ConnectorServiceProto.SinkWriterStreamRequest> {
@Override
public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
// Handles: sinkTask.hasStart(), sinkTask.hasWriteBatch(), sinkTask.hasBarrier()
}
}
Import
import com.risingwave.connector.SinkWriterStreamObserver;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| sinkTask (StartSink) | SinkWriterStreamRequest | Yes (first message) | Initializes the sink with sinkParam, payloadSchema |
| sinkTask (WriteBatch) | SinkWriterStreamRequest | Yes (data) | Contains epoch, batchId, and serialized row data |
| sinkTask (Barrier) | SinkWriterStreamRequest | Yes (checkpoint) | Contains epoch and isCheckpoint flag |
Outputs
| Name | Type | Description |
|---|---|---|
| StartResponse | SinkWriterStreamResponse | Acknowledges sink initialization |
| BatchWrittenResponse | SinkWriterStreamResponse | Confirms batch write with epoch and batchId |
| CommitResponse | SinkWriterStreamResponse | Checkpoint commit with epoch and optional metadata |
Usage Examples
Create a Kafka Sink
-- This SQL triggers the SinkWriterStreamObserver flow
CREATE SINK kafka_sink FROM user_event_counts
WITH (
connector = 'kafka',
topic = 'output_events',
properties.bootstrap.server = 'broker:9092',
type = 'append-only'
) FORMAT PLAIN ENCODE JSON;
Create a JDBC Sink
CREATE SINK pg_sink FROM user_event_counts
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://postgres:5432/target_db',
table.name = 'event_counts',
type = 'upsert',
primary_key = 'user_id,event_type'
);