Implementation:Risingwavelabs Risingwave JniSinkWriterStreamRequest
| Property | Value |
|---|---|
| Component | Java Binding |
| Language | Java |
| Source File | java/java-binding/src/main/java/com/risingwave/java/binding/JniSinkWriterStreamRequest.java |
| Lines | 81 |
| Package | com.risingwave.java.binding |
| License | Apache 2.0 |
Overview
JniSinkWriterStreamRequest is a wrapper class for sink writer stream requests that bridges the JNI channel between the Rust sink coordinator and Java sink writers. It supports two distinct construction modes:
- Protobuf mode: Created from a serialized protobuf payload (SinkWriterStreamRequest), used for control messages like start/barrier/commit.
- StreamChunk mode: Created from a native StreamChunk pointer with epoch and batch ID metadata, enabling zero-copy passing of streaming row batches from the Rust engine to Java sink writers.
Both modes can be converted to a SinkWriterStreamRequest protobuf message via the asPbRequest() method. In StreamChunk mode, the conversion builds a WriteBatch message that carries the native chunk pointer as a reference, avoiding data serialization overhead. The class implements AutoCloseable to ensure the owned StreamChunk is properly released.
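The two construction modes and the conversion step can be illustrated with a simplified, dependency-free model. This is only a sketch: ChunkHandle, RequestMessage, and DualModeRequest are hypothetical stand-ins for StreamChunk, the protobuf SinkWriterStreamRequest, and JniSinkWriterStreamRequest, and none of them is the actual RisingWave code.

```java
// Hypothetical stand-in for the native StreamChunk wrapper (not the real class).
final class ChunkHandle implements AutoCloseable {
    private final long pointer;
    private boolean closed = false;

    ChunkHandle(long pointer) {
        this.pointer = pointer;
    }

    long getPointer() {
        return pointer;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // A real wrapper would release native memory here; this stub only records the call.
        closed = true;
    }
}

// Hypothetical stand-in for the protobuf message: a control message or a write batch.
final class RequestMessage {
    final String control;       // non-null only for control messages
    final long epoch;
    final long batchId;
    final long chunkRefPointer; // address only; the chunk contents are not copied

    private RequestMessage(String control, long epoch, long batchId, long chunkRefPointer) {
        this.control = control;
        this.epoch = epoch;
        this.batchId = batchId;
        this.chunkRefPointer = chunkRefPointer;
    }

    static RequestMessage control(String name) {
        return new RequestMessage(name, 0L, 0L, 0L);
    }

    static RequestMessage writeBatch(long epoch, long batchId, long chunkRefPointer) {
        return new RequestMessage(null, epoch, batchId, chunkRefPointer);
    }

    boolean hasWriteBatch() {
        return control == null;
    }
}

// Simplified model of the two-mode wrapper (assumed shape, for illustration only).
final class DualModeRequest implements AutoCloseable {
    private final RequestMessage message; // set in message mode
    private final ChunkHandle chunk;      // set in chunk mode
    private final long epoch;
    private final long batchId;
    private final boolean isMessage;

    DualModeRequest(RequestMessage message) {
        this.message = message;
        this.chunk = null;
        this.epoch = 0L;
        this.batchId = 0L;
        this.isMessage = true;
    }

    DualModeRequest(ChunkHandle chunk, long epoch, long batchId) {
        this.message = null;
        this.chunk = chunk;
        this.epoch = epoch;
        this.batchId = batchId;
        this.isMessage = false;
    }

    // Message mode returns the stored message; chunk mode wraps the pointer by reference.
    RequestMessage asMessage() {
        if (isMessage) {
            return message;
        }
        return RequestMessage.writeBatch(epoch, batchId, chunk.getPointer());
    }

    @Override
    public void close() {
        // Only chunk mode owns a resource that needs releasing.
        if (!isMessage && chunk != null) {
            chunk.close();
        }
    }
}
```

In this model, a request built from a ChunkHandle yields a write batch that carries only the pointer value, and closing the request closes the handle. A request built from a control message returns that message unchanged and has nothing to release.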
Code Reference
Source Location
java/java-binding/src/main/java/com/risingwave/java/binding/JniSinkWriterStreamRequest.java
Signature
public class JniSinkWriterStreamRequest implements AutoCloseable {
    private final ConnectorServiceProto.SinkWriterStreamRequest pbRequest;
    private final StreamChunk chunk;
    private final long epoch;
    private final long batchId;
    private final boolean isPb;

    JniSinkWriterStreamRequest(ConnectorServiceProto.SinkWriterStreamRequest pbRequest);
    JniSinkWriterStreamRequest(StreamChunk chunk, long epoch, long batchId);

    public static JniSinkWriterStreamRequest fromSerializedPayload(byte[] payload);
    public static JniSinkWriterStreamRequest fromStreamChunkOwnedPointer(
            long pointer, long epoch, long batchId);

    public ConnectorServiceProto.SinkWriterStreamRequest asPbRequest();

    @Override
    public void close() throws Exception;
}
Import
import com.risingwave.java.binding.JniSinkWriterStreamRequest;
// Dependencies:
import com.risingwave.proto.ConnectorServiceProto;
import com.google.protobuf.InvalidProtocolBufferException;
I/O Contract
Inputs
- Protobuf mode:
  - payload (byte[]): A serialized ConnectorServiceProto.SinkWriterStreamRequest protobuf message. This is deserialized via parseFrom in fromSerializedPayload.
- StreamChunk mode:
  - pointer (long): A native pointer to an owned Rust StreamChunk. Ownership is transferred to this object.
  - epoch (long): The epoch number for the write batch.
  - batchId (long): The batch identifier for the write batch.
Outputs
- asPbRequest() returns ConnectorServiceProto.SinkWriterStreamRequest:
  - In protobuf mode, returns the deserialized protobuf request directly.
  - In StreamChunk mode, constructs a SinkWriterStreamRequest with a WriteBatch that contains the epoch, batch ID, and the native chunk pointer as a streamChunkRefPointer.
Side Effects
- Protobuf deserialization: fromSerializedPayload may throw a RuntimeException wrapping InvalidProtocolBufferException if the payload is malformed.
- Native resource management: close() releases the owned StreamChunk in non-protobuf mode by calling chunk.close().
- Zero-copy pointer passing: In StreamChunk mode, asPbRequest() embeds the native pointer value in the protobuf message, allowing the sink writer to access the chunk data without serialization.
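From the caller's side, the first two side effects mean handling an unchecked exception from the factory and a checked Exception declared by close(). The sketch below shows one way to structure that, assuming hypothetical stand-ins: StubParser, StubRequest, and RequestHandling are illustrative names, and the stub parser throws java.io.IOException (the superclass of InvalidProtocolBufferException) so the example runs without protobuf on the classpath.

```java
// Hypothetical parser standing in for the protobuf parseFrom call.
final class StubParser {
    static String parseFrom(byte[] payload) throws java.io.IOException {
        if (payload == null || payload.length == 0) {
            throw new java.io.IOException("empty payload");
        }
        return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }
}

// Hypothetical request type mirroring the documented contract of the factory and close().
final class StubRequest implements AutoCloseable {
    private final String body;

    private StubRequest(String body) {
        this.body = body;
    }

    // Checked parse failures are rethrown unchecked, keeping the original as the cause.
    static StubRequest fromSerializedPayload(byte[] payload) {
        try {
            return new StubRequest(StubParser.parseFrom(payload));
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }

    String body() {
        return body;
    }

    @Override
    public void close() throws Exception {
        // Nothing to release in payload mode; the signature still declares Exception.
    }
}

final class RequestHandling {
    static String handle(byte[] payload) {
        try (StubRequest request = StubRequest.fromSerializedPayload(payload)) {
            return "handled: " + request.body();
        } catch (RuntimeException e) {
            // Malformed payload: report the wrapped parse error when one is present.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return "rejected: " + cause.getMessage();
        } catch (Exception e) {
            // Required because close() declares a checked Exception.
            return "close failed: " + e.getMessage();
        }
    }
}
```

Catching RuntimeException before Exception keeps parse failures distinguishable from failures while releasing the resource. Whether a real caller should recover or propagate either error depends on the surrounding sink writer logic.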
Usage Examples
Receiving a request from the JNI channel
// Receive a request from the Rust side via JNI channel
try (JniSinkWriterStreamRequest request =
        Binding.recvSinkWriterRequestFromChannel(channelPtr)) {
    ConnectorServiceProto.SinkWriterStreamRequest pbRequest = request.asPbRequest();
    if (pbRequest.hasWriteBatch()) {
        // Process the write batch
        ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch batch =
                pbRequest.getWriteBatch();
        long epoch = batch.getEpoch();
        long batchId = batch.getBatchId();
        // Access stream chunk data via the reference pointer...
    }
}
Creating from a serialized payload
byte[] serializedRequest = receiveFromNetwork();
try (JniSinkWriterStreamRequest request =
        JniSinkWriterStreamRequest.fromSerializedPayload(serializedRequest)) {
    ConnectorServiceProto.SinkWriterStreamRequest pbRequest = request.asPbRequest();
    // Handle the request...
}
Related Pages
- Binding - Declares recvSinkWriterRequestFromChannel that returns JniSinkWriterStreamRequest instances
- StreamChunk - Native StreamChunk wrapper held by JniSinkWriterStreamRequest in non-protobuf mode
- SinkWriterStreamObserver OnNext - gRPC observer that processes sink writer stream requests
- SinkFactory CreateWriter - Factory that creates sink writers consuming these requests