Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave JniSinkWriterStreamRequest

From Leeroopedia


Property Value
Component Java Binding
Language Java
Source File java/java-binding/src/main/java/com/risingwave/java/binding/JniSinkWriterStreamRequest.java
Lines 81
Package com.risingwave.java.binding
License Apache 2.0

Overview

JniSinkWriterStreamRequest is a wrapper class for sink writer stream requests that bridges the JNI channel between the Rust sink coordinator and Java sink writers. It supports two distinct construction modes:

  1. Protobuf mode: Created from a serialized protobuf payload (SinkWriterStreamRequest), used for control messages like start/barrier/commit.
  2. StreamChunk mode: Created from a native StreamChunk pointer with epoch and batch ID metadata, enabling zero-copy passing of streaming row batches from the Rust engine to Java sink writers.

Both modes can be converted to a SinkWriterStreamRequest protobuf message via the asPbRequest() method. In StreamChunk mode, the conversion builds a WriteBatch message that carries the native chunk pointer as a reference, avoiding data serialization overhead. The class implements AutoCloseable to ensure the owned StreamChunk is properly released.

Code Reference

Source Location

java/java-binding/src/main/java/com/risingwave/java/binding/JniSinkWriterStreamRequest.java

Signature

public class JniSinkWriterStreamRequest implements AutoCloseable {
    private final ConnectorServiceProto.SinkWriterStreamRequest pbRequest;
    private final StreamChunk chunk;
    private final long epoch;
    private final long batchId;
    private final boolean isPb;

    JniSinkWriterStreamRequest(ConnectorServiceProto.SinkWriterStreamRequest pbRequest);
    JniSinkWriterStreamRequest(StreamChunk chunk, long epoch, long batchId);

    public static JniSinkWriterStreamRequest fromSerializedPayload(byte[] payload);
    public static JniSinkWriterStreamRequest fromStreamChunkOwnedPointer(
            long pointer, long epoch, long batchId);

    public ConnectorServiceProto.SinkWriterStreamRequest asPbRequest();

    @Override
    public void close() throws Exception;
}

Import

import com.risingwave.java.binding.JniSinkWriterStreamRequest;
// Dependencies:
import com.risingwave.proto.ConnectorServiceProto;
import com.google.protobuf.InvalidProtocolBufferException;

I/O Contract

Inputs

  • Protobuf mode:
    • payload (byte[]): A serialized ConnectorServiceProto.SinkWriterStreamRequest protobuf message. This is deserialized via parseFrom in fromSerializedPayload.
  • StreamChunk mode:
    • pointer (long): A native pointer to an owned Rust StreamChunk. Ownership is transferred to this object.
    • epoch (long): The epoch number for the write batch.
    • batchId (long): The batch identifier for the write batch.

Outputs

  • asPbRequest() returns ConnectorServiceProto.SinkWriterStreamRequest:
    • In protobuf mode, returns the deserialized protobuf request directly.
    • In StreamChunk mode, constructs a SinkWriterStreamRequest with a WriteBatch that contains the epoch, batch ID, and the native chunk pointer as a streamChunkRefPointer.

Side Effects

  • Protobuf deserialization: fromSerializedPayload may throw a RuntimeException wrapping InvalidProtocolBufferException if the payload is malformed.
  • Native resource management: close() releases the owned StreamChunk in non-protobuf mode by calling chunk.close().
  • Zero-copy pointer passing: In StreamChunk mode, asPbRequest() embeds the native pointer value in the protobuf message, allowing the sink writer to access the chunk data without serialization.

Usage Examples

Receiving a request from the JNI channel

// Receive a request from the Rust side via JNI channel
try (JniSinkWriterStreamRequest request =
        Binding.recvSinkWriterRequestFromChannel(channelPtr)) {

    ConnectorServiceProto.SinkWriterStreamRequest pbRequest = request.asPbRequest();

    if (pbRequest.hasWriteBatch()) {
        // Process the write batch
        ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch batch =
                pbRequest.getWriteBatch();
        long epoch = batch.getEpoch();
        long batchId = batch.getBatchId();
        // Access stream chunk data via the reference pointer...
    }
}

Creating from a serialized payload

byte[] serializedRequest = receiveFromNetwork();
try (JniSinkWriterStreamRequest request =
        JniSinkWriterStreamRequest.fromSerializedPayload(serializedRequest)) {

    ConnectorServiceProto.SinkWriterStreamRequest pbRequest = request.asPbRequest();
    // Handle the request...
}

Related Pages

  • Binding - Declares recvSinkWriterRequestFromChannel that returns JniSinkWriterStreamRequest instances
  • StreamChunk - Native StreamChunk wrapper held by JniSinkWriterStreamRequest in non-protobuf mode
  • SinkWriterStreamObserver OnNext - gRPC observer that processes sink writer stream requests
  • SinkFactory CreateWriter - Factory that creates sink writers consuming these requests

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment