

Implementation:Risingwavelabs Risingwave SinkWriterStreamObserver OnNext

From Leeroopedia
Revision as of 16:32, 16 February 2026 by Admin (auto-imported from implementations/Risingwavelabs_Risingwave_SinkWriterStreamObserver_OnNext.md)


Knowledge Sources
Domains: Streaming, Data_Delivery, Connectors, gRPC
Last Updated: 2026-02-09 07:00 GMT

Overview

A concrete tool, provided by the Java connector service, for handling streaming sink write requests over gRPC.

Description

The SinkWriterStreamObserver class implements the gRPC StreamObserver interface to handle bidirectional streaming of sink write requests. Its onNext method processes three types of messages: StartSink (initialize writer), WriteBatch (write data rows), and Barrier (checkpoint). It manages epoch-based consistency, delegates actual writes to the appropriate SinkWriter implementation, and sends responses back through the gRPC stream.
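
The three-way dispatch described above can be sketched with plain Java stand-ins. This is a simplified illustration, not the RisingWave source: the class SinkDispatchSketch, its nested Start, WriteBatch and Barrier types, and the string responses are all hypothetical. The sketch also assumes that a stale epoch is rejected and that a commit response is sent only for checkpoint barriers; neither detail is confirmed by this page.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the onNext dispatch; not the RisingWave source. */
class SinkDispatchSketch {
    // Stand-ins for the StartSink, WriteBatch and Barrier payloads (hypothetical types).
    static final class Start {}

    static final class WriteBatch {
        final long epoch;
        final long batchId;
        WriteBatch(long epoch, long batchId) { this.epoch = epoch; this.batchId = batchId; }
    }

    static final class Barrier {
        final long epoch;
        final boolean isCheckpoint;
        Barrier(long epoch, boolean isCheckpoint) { this.epoch = epoch; this.isCheckpoint = isCheckpoint; }
    }

    private boolean initialized = false;
    private Long currentEpoch = null;                   // last epoch seen, null before any batch
    final List<String> responses = new ArrayList<>();   // stands in for the response stream

    void onNext(Object request) {
        if (request instanceof Start) {
            if (initialized) throw new IllegalStateException("sink is already initialized");
            initialized = true;
            responses.add("StartResponse");
        } else if (request instanceof WriteBatch) {
            requireInitialized();
            WriteBatch batch = (WriteBatch) request;
            // Assumption: epochs must never go backwards within one stream.
            if (currentEpoch != null && batch.epoch < currentEpoch) {
                throw new IllegalArgumentException("stale epoch " + batch.epoch);
            }
            currentEpoch = batch.epoch;
            responses.add("BatchWrittenResponse(epoch=" + batch.epoch + ", batchId=" + batch.batchId + ")");
        } else if (request instanceof Barrier) {
            requireInitialized();
            Barrier barrier = (Barrier) request;
            currentEpoch = barrier.epoch;
            // Assumption: only checkpoint barriers produce a commit response.
            if (barrier.isCheckpoint) responses.add("CommitResponse(epoch=" + barrier.epoch + ")");
        } else {
            throw new IllegalArgumentException("unknown request type");
        }
    }

    private void requireInitialized() {
        if (!initialized) throw new IllegalStateException("sink is not initialized; send Start first");
    }

    public static void main(String[] args) {
        SinkDispatchSketch sink = new SinkDispatchSketch();
        sink.onNext(new Start());
        sink.onNext(new WriteBatch(1, 0));
        sink.onNext(new Barrier(1, true));
        // prints [StartResponse, BatchWrittenResponse(epoch=1, batchId=0), CommitResponse(epoch=1)]
        System.out.println(sink.responses);
    }
}
```

Keeping the initialization flag and the last epoch as fields on the observer means each stream carries its own state, so a new stream starts from a clean slate.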

Usage

The onNext method is invoked once for each request that the Rust core engine sends after a CREATE SINK statement creates a sink served by the Java connector service. The Rust side establishes a gRPC stream and sends write requests; this observer processes them on the Java side.
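
The request/response plumbing of such a bidirectional stream can be sketched without a gRPC dependency. The nested StreamObserver interface below is a local mirror of the three callbacks on gRPC's io.grpc.stub.StreamObserver (onNext, onError, onCompleted). Everything else is hypothetical: RequestHandler, RecordingObserver, the string messages, and the choice to ignore requests after a failure are illustration only, not taken from the RisingWave code.

```java
import java.util.ArrayList;
import java.util.List;

class BidiStreamSketch {
    /** Local mirror of the three callbacks on gRPC's io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical server-side handler: receives requests, replies on responseObserver. */
    static final class RequestHandler implements StreamObserver<String> {
        private final StreamObserver<String> responseObserver;
        private boolean failed = false;

        RequestHandler(StreamObserver<String> responseObserver) {
            this.responseObserver = responseObserver;
        }

        @Override
        public void onNext(String request) {
            if (failed) return; // assumption: nothing is processed after a failure
            try {
                if (request.isEmpty()) throw new IllegalArgumentException("invalid sink task");
                responseObserver.onNext("ack:" + request);
            } catch (RuntimeException e) {
                failed = true;
                responseObserver.onError(e); // report the failure to the peer
            }
        }

        @Override
        public void onError(Throwable t) {
            failed = true; // the peer aborted; a real handler would release resources here
        }

        @Override
        public void onCompleted() {
            if (!failed) responseObserver.onCompleted();
        }
    }

    /** Records what the handler sends back, standing in for the Rust-side receiver. */
    static final class RecordingObserver implements StreamObserver<String> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(String value) { events.add(value); }
        @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    public static void main(String[] args) {
        RecordingObserver client = new RecordingObserver();
        RequestHandler handler = new RequestHandler(client);
        handler.onNext("start");
        handler.onNext("write_batch");
        handler.onNext("barrier");
        handler.onCompleted();
        // prints [ack:start, ack:write_batch, ack:barrier, completed]
        System.out.println(client.events);
    }
}
```

The handler holds the response observer as a field, which is what lets a single onNext call answer on the same stream it was received on.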

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java
  • Lines: L94-182

Signature

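import io.grpc.stub.StreamObserver;

public class SinkWriterStreamObserver
        implements StreamObserver<ConnectorServiceProto.SinkWriterStreamRequest> {

    @Override
    public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
        // Dispatches on the request payload:
        //   sinkTask.hasStart()      -> initialize the writer
        //   sinkTask.hasWriteBatch() -> write data rows
        //   sinkTask.hasBarrier()    -> handle the barrier (checkpoint)
    }

    // onError(Throwable) and onCompleted(), also required by StreamObserver, are omitted here.
}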

Import

import com.risingwave.connector.SinkWriterStreamObserver;

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| sinkTask (StartSink) | SinkWriterStreamRequest | Yes (first message) | Initializes the sink with sinkParam and payloadSchema |
| sinkTask (WriteBatch) | SinkWriterStreamRequest | Yes (data) | Contains epoch, batchId, and serialized row data |
| sinkTask (Barrier) | SinkWriterStreamRequest | Yes (checkpoint) | Contains epoch and isCheckpoint flag |

Outputs

| Name | Type | Description |
|------|------|-------------|
| StartResponse | SinkWriterStreamResponse | Acknowledges sink initialization |
| BatchWrittenResponse | SinkWriterStreamResponse | Confirms a batch write, echoing epoch and batchId |
| CommitResponse | SinkWriterStreamResponse | Confirms a checkpoint commit, with epoch and optional metadata |

Usage Examples

Create a Kafka Sink

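-- Creating a sink starts the sink write flow; requests pass through
-- SinkWriterStreamObserver only when the connector is served by the Java connector service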
CREATE SINK kafka_sink FROM user_event_counts
WITH (
    connector = 'kafka',
    topic = 'output_events',
    properties.bootstrap.server = 'broker:9092',
    type = 'append-only'
) FORMAT PLAIN ENCODE JSON;

Create a JDBC Sink

CREATE SINK pg_sink FROM user_event_counts
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/target_db',
    table.name = 'event_counts',
    type = 'upsert',
    primary_key = 'user_id,event_type'
);

Related Pages

Implements Principle

Requires Environment
