Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave SinkCoordinatorStreamObserver

From Leeroopedia


Knowledge Sources
Domains Streaming, Data_Delivery, Connectors, gRPC
Last Updated 2026-02-09 07:00 GMT

Overview

gRPC stream observer that handles the sink coordinator protocol for managing distributed sink writer coordination.

Description

The SinkCoordinatorStreamObserver class implements the gRPC StreamObserver interface to handle bidirectional streaming for sink coordinator requests. It processes two types of messages: Start (initialize the coordinator) and Commit (commit an epoch with metadata). On receiving a Start request, it resolves the sink connector factory via SinkUtils.getSinkFactory, constructs the TableSchema from the protobuf parameter, and creates a SinkCoordinator instance through the factory. On receiving a Commit request, it delegates the epoch commit with associated metadata to the coordinator. The observer manages the full lifecycle of the coordinator, including cleanup on error or stream completion by calling coordinator.drop().

Usage

This observer is invoked by the Rust streaming engine during distributed sink operations. When multiple sink writers need coordination (e.g., for transactional or exactly-once sinks), the Rust side establishes a gRPC coordinator stream. The coordinator manages commit ordering and metadata aggregation across writer instances.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkCoordinatorStreamObserver.java
  • Lines: 113

Signature

public class SinkCoordinatorStreamObserver
        implements StreamObserver<ConnectorServiceProto.SinkCoordinatorStreamRequest> {

    public SinkCoordinatorStreamObserver(
            StreamObserver<ConnectorServiceProto.SinkCoordinatorStreamResponse> responseObserver) { ... }

    @Override
    public void onNext(ConnectorServiceProto.SinkCoordinatorStreamRequest request) { ... }

    @Override
    public void onError(Throwable throwable) { ... }

    @Override
    public void onCompleted() { ... }

    void cleanUp() { ... }
}

Import

import com.risingwave.connector.SinkCoordinatorStreamObserver;

I/O Contract

Inputs

Name Type Required Description
request (Start) SinkCoordinatorStreamRequest Yes (first message) Contains SinkParam with connector name, table schema, and properties to initialize the coordinator
request (Commit) SinkCoordinatorStreamRequest Yes (commit phase) Contains epoch number and list of metadata entries from sink writers

Outputs

Name Type Description
StartResponse SinkCoordinatorStreamResponse Acknowledges successful coordinator initialization
CommitResponse SinkCoordinatorStreamResponse Confirms epoch commit, echoes back the committed epoch number

Error Handling

  • If Start is received when a coordinator is already initialized, throws INVALID_ARGUMENT status.
  • If Commit is received before the coordinator is initialized, throws INVALID_ARGUMENT status.
  • Unrecognized request types trigger INVALID_ARGUMENT status.
  • All exceptions during processing are caught and forwarded as INTERNAL errors to the response observer.

Usage Examples

Coordinated Sink with Iceberg

-- When creating a sink that requires coordination, the coordinator stream is used internally
CREATE SINK iceberg_sink FROM orders
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'order_id',
    warehouse.path = 's3://my-bucket/warehouse',
    catalog.type = 'storage'
);

Related Pages

Implements Principle

Requires Environment

Related Implementations

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment