Implementation:Risingwavelabs Risingwave StreamChunkDeserializer
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Sinks, Serialization, Data Types |
| Language | Java |
| Lines | 364 |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Deserializes binary-encoded StreamChunk protobuf messages into iterable SinkRow collections, converting RisingWave internal column data formats to Java objects.
Description
StreamChunkDeserializer implements the Deserializer interface and is the primary mechanism for converting binary StreamChunk data received over gRPC into Java-consumable row data for sink connectors. It operates in two phases:
- ValueGetter construction -- At initialization, the class inspects the TableSchema and builds an array of type-specific ValueGetter lambda functions. Each getter handles null checks and delegates to the appropriate typed accessor on StreamChunkRow (e.g., getShort, getInt, getLong, getFloat, getDouble, getBoolean, getString, getTimestamp, getTimestamptz, getTime, getDecimal, getDate, getInterval, getJsonb, getBytea). Array (LIST) types are also supported for INT16, INT32, INT64, FLOAT, DOUBLE, and VARCHAR element types.
- Deserialization -- The deserialize method accepts a WriteBatch and creates a StreamChunkIterable from either a StreamChunkPayload (binary data) or a StreamChunkRefPointer (direct pointer).
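The ValueGetter construction phase can be pictured with a minimal, self-contained sketch. It assumes simplified stand-ins: `ColType` and `Row` are hypothetical replacements for the real schema types and `StreamChunkRow`, and `IllegalArgumentException` stands in for the `StatusRuntimeException` listed under Exceptions. It illustrates the pattern only and is not the actual implementation.

```java
final class ValueGetterSketch {
    // Hypothetical stand-in for the column types described by TableSchema.
    enum ColType { INT32, INT64, VARCHAR, UNSUPPORTED_EXAMPLE }

    // Hypothetical stand-in for StreamChunkRow: a null check plus typed accessors.
    static final class Row {
        private final Object[] cells;
        Row(Object... cells) { this.cells = cells; }
        boolean isNull(int i) { return cells[i] == null; }
        int getInt(int i) { return (Integer) cells[i]; }
        long getLong(int i) { return (Long) cells[i]; }
        String getString(int i) { return (String) cells[i]; }
    }

    // Same shape as the ValueGetter interface shown in the signature.
    interface ValueGetter {
        Object get(Row row);
    }

    // Builds one getter per column, once, so per-row access needs no type dispatch.
    static ValueGetter[] buildValueGetters(ColType[] schema) {
        ValueGetter[] getters = new ValueGetter[schema.length];
        for (int i = 0; i < schema.length; i++) {
            final int index = i; // captured by the lambda below
            switch (schema[i]) {
                case INT32:
                    getters[i] = row -> {
                        if (row.isNull(index)) return null;
                        return row.getInt(index);
                    };
                    break;
                case INT64:
                    getters[i] = row -> {
                        if (row.isNull(index)) return null;
                        return row.getLong(index);
                    };
                    break;
                case VARCHAR:
                    getters[i] = row -> {
                        if (row.isNull(index)) return null;
                        return row.getString(index);
                    };
                    break;
                default:
                    // Assumption: the real class reports this as INVALID_ARGUMENT.
                    throw new IllegalArgumentException("unsupported type: " + schema[i]);
            }
        }
        return getters;
    }

    public static void main(String[] args) {
        ColType[] schema = {ColType.INT32, ColType.VARCHAR};
        ValueGetter[] getters = buildValueGetters(schema);
        Row row = new Row(42, null);
        System.out.println(getters[0].get(row));
        System.out.println(getters[1].get(row));
    }
}
```

Resolving the type switch once at construction keeps the per-row path to a single array lookup and a lambda call, which is the apparent motivation for building the getters up front.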
The class defines three inner helper classes:
- StreamChunkRowWrapper -- Adapts a StreamChunkRow to the SinkRow interface using the ValueGetters.
- StreamChunkIteratorWrapper -- Wraps a StreamChunkIterator as a Java Iterator<SinkRow> with AutoCloseable support.
- StreamChunkIterable -- Implements CloseableIterable<SinkRow> to manage the lifecycle of the StreamChunk and its iterator.
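The lifecycle role of the iterable wrapper can be sketched as follows. This is a simplified illustration under stated assumptions: `Chunk` is a hypothetical stand-in for a StreamChunk that holds a resource needing release, the local `CloseableIterable` interface only imitates the real one, and rows are plain strings rather than SinkRow objects.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class CloseableIterableSketch {
    // Hypothetical stand-in for CloseableIterable<SinkRow>.
    interface CloseableIterable<T> extends Iterable<T>, AutoCloseable {
        @Override
        void close(); // narrowed so this sketch needs no checked-exception handling
    }

    // Hypothetical stand-in for a chunk that owns a resource to be released.
    static final class Chunk {
        final List<String> rows;
        boolean released = false;
        Chunk(String... rows) { this.rows = Arrays.asList(rows); }
        void release() { released = true; }
    }

    // Owns the chunk and its iterator; closing releases both.
    static final class ChunkIterable implements CloseableIterable<String> {
        private final Chunk chunk;
        private Iterator<String> iter; // created on first use

        ChunkIterable(Chunk chunk) { this.chunk = chunk; }

        @Override
        public Iterator<String> iterator() {
            if (iter == null) {
                iter = chunk.rows.iterator();
            }
            return iter;
        }

        @Override
        public void close() {
            iter = null;
            chunk.release();
        }
    }

    // try-with-resources guarantees release even if row processing throws.
    static int countRows(Chunk chunk) {
        int n = 0;
        try (ChunkIterable rows = new ChunkIterable(chunk)) {
            for (String row : rows) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        Chunk chunk = new Chunk("a", "b", "c");
        System.out.println(countRows(chunk));
        System.out.println(chunk.released);
    }
}
```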
Usage
The sink framework instantiates this class when the StreamChunk binary format is used to transfer data between the Rust engine and Java sink connectors.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java
- Lines: 1-364
Signature
public class StreamChunkDeserializer implements Deserializer {
    interface ValueGetter {
        Object get(StreamChunkRow row);
    }

    public StreamChunkDeserializer(TableSchema tableSchema);

    static ValueGetter[] buildValueGetter(TableSchema tableSchema);

    @Override
    public CloseableIterable<SinkRow> deserialize(
            ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch writeBatch);

    static class StreamChunkRowWrapper implements SinkRow { ... }
    static class StreamChunkIteratorWrapper implements Iterator<SinkRow>, AutoCloseable { ... }
    static class StreamChunkIterable implements CloseableIterable<SinkRow> { ... }
}
Import
import com.risingwave.connector.deserializer.StreamChunkDeserializer;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| tableSchema | TableSchema | Yes | Schema describing column names and data types used to build type-specific ValueGetters |
| writeBatch | SinkWriterStreamRequest.WriteBatch | Yes | Protobuf message containing either a StreamChunkPayload (binary data) or a StreamChunkRefPointer |
Outputs
| Name | Type | Description |
|---|---|---|
| (return value) | CloseableIterable<SinkRow> | An iterable of SinkRow objects, each providing typed column access via get(index) and operation type via getOp() |
Exceptions
| Exception | Condition |
|---|---|
| StatusRuntimeException (INVALID_ARGUMENT) | Unsupported data type encountered in the table schema |
| StatusRuntimeException (INVALID_ARGUMENT) | Unsupported array element type in LIST columns |
| StatusRuntimeException (INVALID_ARGUMENT) | WriteBatch does not contain a StreamChunkPayload or StreamChunkRefPointer |
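The third condition amounts to a dispatch over two mutually exclusive inputs with a rejecting fallback. The sketch below is hypothetical: `Batch` is a simplified stand-in for WriteBatch, the field names are invented for illustration, and `IllegalArgumentException` replaces the gRPC `StatusRuntimeException` so that the example needs no external library.

```java
final class WriteBatchDispatchSketch {
    // Hypothetical stand-in for WriteBatch: at most one field is expected to be set.
    static final class Batch {
        final byte[] payload;  // binary chunk data, or null when absent
        final Long refPointer; // direct pointer value, or null when absent

        Batch(byte[] payload, Long refPointer) {
            this.payload = payload;
            this.refPointer = refPointer;
        }
    }

    // Chooses the chunk source, rejecting batches that carry neither variant.
    static String describeSource(Batch batch) {
        if (batch.payload != null) {
            return "payload:" + batch.payload.length;
        }
        if (batch.refPointer != null) {
            return "pointer:" + batch.refPointer;
        }
        // Assumption: the real class reports this case as INVALID_ARGUMENT.
        throw new IllegalArgumentException(
                "expected StreamChunkPayload or StreamChunkRefPointer");
    }

    public static void main(String[] args) {
        System.out.println(describeSource(new Batch(new byte[] {1, 2}, null)));
        System.out.println(describeSource(new Batch(null, 99L)));
    }
}
```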
Usage Examples
Deserializing a WriteBatch
TableSchema schema = TableSchema.fromProto(tableSchemaProto);
StreamChunkDeserializer deserializer = new StreamChunkDeserializer(schema);

// Inside the sink writer handling loop:
try (CloseableIterable<SinkRow> rows = deserializer.deserialize(writeBatch)) {
    for (SinkRow row : rows) {
        Data.Op op = row.getOp();  // INSERT, DELETE, UPDATE_INSERT, etc.
        Object col0 = row.get(0);  // First column value (typed)
        Object col1 = row.get(1);  // Second column value (typed)
        // Process the row...
    }
}