

Implementation:Risingwavelabs Risingwave StreamChunkDeserializer

From Leeroopedia
Revision as of 16:32, 16 February 2026 by Admin (Auto-imported from implementations/Risingwavelabs_Risingwave_StreamChunkDeserializer.md)


Knowledge Sources
Domains: Connectors, Sinks, Serialization, Data Types
Language: Java
Lines: 364
Last Updated: 2026-02-09 07:00 GMT

Overview

Deserializes binary-encoded StreamChunk protobuf messages into iterable SinkRow collections, converting RisingWave internal column data formats to Java objects.

Description

StreamChunkDeserializer implements the Deserializer interface and is the primary mechanism for converting binary StreamChunk data received over gRPC into Java-consumable row data for sink connectors. It operates in two phases:

  1. ValueGetter construction -- At initialization, the class inspects the TableSchema and builds an array of type-specific ValueGetter lambda functions. Each getter handles null checks and delegates to the appropriate typed accessor on StreamChunkRow (e.g., getShort, getInt, getLong, getFloat, getDouble, getBoolean, getString, getTimestamp, getTimestamptz, getTime, getDecimal, getDate, getInterval, getJsonb, getBytea). Array (LIST) types are also supported for INT16, INT32, INT64, FLOAT, DOUBLE, and VARCHAR element types.
  2. Deserialization -- The deserialize method accepts a WriteBatch and creates a StreamChunkIterable from either a StreamChunkPayload (binary data) or a StreamChunkRefPointer (direct pointer).
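The getter-construction phase can be sketched roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not the connector's code: SketchType, SketchRow, SketchValueGetter, SketchGetters and ArraySketchRow are hypothetical stand-ins for the column types of TableSchema, for StreamChunkRow and for ValueGetter. Only a handful of types are covered, LIST<INT32> is read through an invented getIntArray accessor, and IllegalArgumentException takes the place of the StatusRuntimeException (INVALID_ARGUMENT) that the real class raises for unsupported types.

```java
import java.util.List;

// Hypothetical column types standing in for those read from TableSchema.
enum SketchType { INT32, INT64, BOOLEAN, VARCHAR, LIST_INT32, STRUCT }

// Hypothetical row accessor, loosely modelled on StreamChunkRow.
interface SketchRow {
    boolean isNull(int index);
    int getInt(int index);
    long getLong(int index);
    boolean getBoolean(int index);
    String getString(int index);
    int[] getIntArray(int index); // invented accessor for LIST<INT32> cells
}

// Same shape as the ValueGetter interface in the Signature section.
interface SketchValueGetter {
    Object get(SketchRow row);
}

final class SketchGetters {
    private SketchGetters() {}

    // Resolves each column's type once; every getter null-checks, then reads.
    static SketchValueGetter[] build(List<SketchType> columnTypes) {
        SketchValueGetter[] getters = new SketchValueGetter[columnTypes.size()];
        for (int i = 0; i < getters.length; i++) {
            final int index = i; // each lambda captures its own column index
            SketchType type = columnTypes.get(i);
            switch (type) {
                case INT32:
                    getters[i] = row -> row.isNull(index) ? null : row.getInt(index);
                    break;
                case INT64:
                    getters[i] = row -> row.isNull(index) ? null : row.getLong(index);
                    break;
                case BOOLEAN:
                    getters[i] = row -> row.isNull(index) ? null : row.getBoolean(index);
                    break;
                case VARCHAR:
                    getters[i] = row -> row.isNull(index) ? null : row.getString(index);
                    break;
                case LIST_INT32:
                    getters[i] = row -> row.isNull(index) ? null : row.getIntArray(index);
                    break;
                default:
                    // Stand-in for StatusRuntimeException (INVALID_ARGUMENT).
                    throw new IllegalArgumentException("unsupported data type: " + type);
            }
        }
        return getters;
    }
}

// Array-backed row used only to exercise the getters in this sketch.
final class ArraySketchRow implements SketchRow {
    private final Object[] values;

    ArraySketchRow(Object... values) { this.values = values; }

    public boolean isNull(int index) { return values[index] == null; }
    public int getInt(int index) { return (Integer) values[index]; }
    public long getLong(int index) { return (Long) values[index]; }
    public boolean getBoolean(int index) { return (Boolean) values[index]; }
    public String getString(int index) { return (String) values[index]; }
    public int[] getIntArray(int index) { return (int[]) values[index]; }
}
```

Resolving the type once per column keeps the switch out of the per-row path: in this sketch each row costs one lambda call per column, and a NULL cell comes back as a Java null because the check lives inside the getter.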

The class defines three static nested helper classes:

  • StreamChunkRowWrapper -- Adapts a StreamChunkRow to the SinkRow interface using the ValueGetters.
  • StreamChunkIteratorWrapper -- Wraps a StreamChunkIterator as a Java Iterator<SinkRow> with AutoCloseable support.
  • StreamChunkIterable -- Implements CloseableIterable<SinkRow> to manage the lifecycle of the StreamChunk and its iterator.
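How the three helpers cooperate can be sketched as follows. This is a hedged illustration, not the real classes: SketchSinkRow, SketchCloseableIterable, SketchRawRow, SketchChunk, SketchRowWrapper, SketchIteratorWrapper and SketchChunkIterable are hypothetical stand-ins. The op is a plain String rather than Data.Op, getters are java.util.function.Function instances rather than ValueGetter, "releasing" the chunk only flips a flag, and the one-live-iterator policy is an assumption of the sketch.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical stand-in for SinkRow (the real getOp() returns a Data.Op value).
interface SketchSinkRow {
    Object get(int index);
    String getOp();
}

// Hypothetical stand-in for CloseableIterable<SinkRow>; close() is narrowed
// so that callers need no checked-exception handling.
interface SketchCloseableIterable<T> extends Iterable<T>, AutoCloseable {
    @Override
    void close();
}

// Hypothetical raw row inside a decoded chunk: an op label plus raw cells.
final class SketchRawRow {
    final String op;
    final Object[] cells;
    SketchRawRow(String op, Object... cells) { this.op = op; this.cells = cells; }
}

// Hypothetical chunk; release() stands in for freeing the real StreamChunk.
final class SketchChunk {
    final List<SketchRawRow> rows;
    boolean released;
    SketchChunk(List<SketchRawRow> rows) { this.rows = rows; }
    void release() { released = true; }
}

// Role of StreamChunkRowWrapper: adapt a raw row via the per-column getters.
final class SketchRowWrapper implements SketchSinkRow {
    private final SketchRawRow raw;
    private final List<Function<SketchRawRow, Object>> getters;
    SketchRowWrapper(SketchRawRow raw, List<Function<SketchRawRow, Object>> getters) {
        this.raw = raw;
        this.getters = getters;
    }
    @Override public Object get(int index) { return getters.get(index).apply(raw); }
    @Override public String getOp() { return raw.op; }
}

// Role of StreamChunkIteratorWrapper: expose rows as a closeable Iterator.
final class SketchIteratorWrapper implements Iterator<SketchSinkRow>, AutoCloseable {
    private final Iterator<SketchRawRow> inner;
    private final List<Function<SketchRawRow, Object>> getters;
    private boolean closed;
    SketchIteratorWrapper(Iterator<SketchRawRow> inner, List<Function<SketchRawRow, Object>> getters) {
        this.inner = inner;
        this.getters = getters;
    }
    @Override public boolean hasNext() { return !closed && inner.hasNext(); }
    @Override public SketchSinkRow next() {
        if (!hasNext()) throw new NoSuchElementException();
        return new SketchRowWrapper(inner.next(), getters);
    }
    @Override public void close() { closed = true; }
}

// Role of StreamChunkIterable: own the chunk and its iterator, release both on close().
final class SketchChunkIterable implements SketchCloseableIterable<SketchSinkRow> {
    private final SketchChunk chunk;
    private final List<Function<SketchRawRow, Object>> getters;
    private SketchIteratorWrapper current; // assumption: one live iterator at a time
    SketchChunkIterable(SketchChunk chunk, List<Function<SketchRawRow, Object>> getters) {
        this.chunk = chunk;
        this.getters = getters;
    }
    @Override public Iterator<SketchSinkRow> iterator() {
        if (current != null) current.close();
        current = new SketchIteratorWrapper(chunk.rows.iterator(), getters);
        return current;
    }
    @Override public void close() {
        if (current != null) current.close();
        chunk.release();
    }
}
```

Because the iterable is AutoCloseable, wrapping it in try-with-resources releases the chunk even if row processing throws, which is the pattern the usage example on this page follows.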

Usage

Instantiated by the sink framework when the StreamChunk binary format is used to transfer data between the Rust engine and the Java sink connectors.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java
  • Lines: 1-364

Signature

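public class StreamChunkDeserializer implements Deserializer {
    // Member signatures only; method and nested-class bodies are omitted.

    // Extracts one column's value (or null) from a StreamChunkRow.
    interface ValueGetter {
        Object get(StreamChunkRow row);
    }

    // Builds the per-column ValueGetters from the schema at initialization.
    public StreamChunkDeserializer(TableSchema tableSchema);

    // Maps each column's data type to a type-specific getter; unsupported
    // types raise StatusRuntimeException (INVALID_ARGUMENT).
    static ValueGetter[] buildValueGetter(TableSchema tableSchema);

    // Wraps the batch's StreamChunkPayload or StreamChunkRefPointer as SinkRows.
    @Override
    public CloseableIterable<SinkRow> deserialize(
            ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch writeBatch);

    // Adapts a StreamChunkRow to SinkRow using the ValueGetters.
    static class StreamChunkRowWrapper implements SinkRow { ... }
    // Exposes a StreamChunkIterator as a closeable Iterator<SinkRow>.
    static class StreamChunkIteratorWrapper implements Iterator<SinkRow>, AutoCloseable { ... }
    // Manages the lifecycle of the StreamChunk and its iterator.
    static class StreamChunkIterable implements CloseableIterable<SinkRow> { ... }
}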

Import

import com.risingwave.connector.deserializer.StreamChunkDeserializer;

I/O Contract

Inputs

  • tableSchema (TableSchema, required; constructor argument) -- Schema describing column names and data types, used to build the type-specific ValueGetters.
  • writeBatch (SinkWriterStreamRequest.WriteBatch, required; argument to deserialize) -- Protobuf message containing either a StreamChunkPayload (binary data) or a StreamChunkRefPointer.

Outputs

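  • CloseableIterable<SinkRow> (return value of deserialize) -- An iterable of SinkRow objects. Each row exposes its column values via get(index), as Java objects whose runtime type depends on the column's data type, and its operation type via getOp(). The iterable manages the lifecycle of the underlying StreamChunk, so close it after use.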

Exceptions

  • StatusRuntimeException (INVALID_ARGUMENT) -- An unsupported data type is encountered in the table schema.
  • StatusRuntimeException (INVALID_ARGUMENT) -- A LIST column has an unsupported array element type.
  • StatusRuntimeException (INVALID_ARGUMENT) -- The WriteBatch contains neither a StreamChunkPayload nor a StreamChunkRefPointer.
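The last condition comes from the dispatch in deserialize between the two payload kinds, sketched roughly below. The sketch assumes the two alternatives are modelled as a protobuf oneof; SketchPayloadCase imitates the case enum that protobuf-java generates for a oneof, but the actual oneof and field names of WriteBatch are not given on this page. SketchWriteBatch, SketchChunkSource and SketchInvalidArgumentException are hypothetical, the last one standing in for StatusRuntimeException (INVALID_ARGUMENT). Where the real class creates a StreamChunkIterable, the sketch only returns a label naming the branch taken.

```java
// Hypothetical imitation of the case enum protobuf-java generates for a oneof;
// the real WriteBatch oneof and field names may differ.
enum SketchPayloadCase { STREAM_CHUNK_PAYLOAD, STREAM_CHUNK_REF_POINTER, PAYLOAD_NOT_SET }

// Hypothetical WriteBatch carrying either serialized bytes or a raw pointer.
final class SketchWriteBatch {
    private final SketchPayloadCase payloadCase;
    private final byte[] binaryData;
    private final long pointer;

    private SketchWriteBatch(SketchPayloadCase payloadCase, byte[] binaryData, long pointer) {
        this.payloadCase = payloadCase;
        this.binaryData = binaryData;
        this.pointer = pointer;
    }

    static SketchWriteBatch ofPayload(byte[] data) {
        return new SketchWriteBatch(SketchPayloadCase.STREAM_CHUNK_PAYLOAD, data, 0L);
    }

    static SketchWriteBatch ofPointer(long pointer) {
        return new SketchWriteBatch(SketchPayloadCase.STREAM_CHUNK_REF_POINTER, null, pointer);
    }

    static SketchWriteBatch empty() {
        return new SketchWriteBatch(SketchPayloadCase.PAYLOAD_NOT_SET, null, 0L);
    }

    SketchPayloadCase getPayloadCase() { return payloadCase; }
    byte[] getBinaryData() { return binaryData; }
    long getPointer() { return pointer; }
}

// Stand-in for a gRPC StatusRuntimeException carrying INVALID_ARGUMENT.
final class SketchInvalidArgumentException extends RuntimeException {
    SketchInvalidArgumentException(String message) { super(message); }
}

final class SketchChunkSource {
    private SketchChunkSource() {}

    // Chooses where the chunk comes from; returns a label instead of an iterable.
    static String open(SketchWriteBatch batch) {
        switch (batch.getPayloadCase()) {
            case STREAM_CHUNK_PAYLOAD:
                // Binary data: the chunk would be decoded from these bytes.
                return "decode " + batch.getBinaryData().length + " bytes";
            case STREAM_CHUNK_REF_POINTER:
                // Direct pointer: refers to a chunk that already exists in memory.
                return "wrap pointer " + batch.getPointer();
            default:
                throw new SketchInvalidArgumentException(
                        "WriteBatch has neither a StreamChunkPayload nor a StreamChunkRefPointer");
        }
    }
}
```

Switching on the case enum makes the "neither field set" situation an explicit default branch, so a malformed batch fails fast with an INVALID_ARGUMENT-style error instead of yielding an empty iterable.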

Usage Examples

Deserializing a WriteBatch

// tableSchemaProto and writeBatch are assumed to be in scope.
TableSchema schema = TableSchema.fromProto(tableSchemaProto);
StreamChunkDeserializer deserializer = new StreamChunkDeserializer(schema);

// Inside the sink writer handling loop. try-with-resources closes the iterable,
// which manages the lifecycle of the underlying StreamChunk.
try (CloseableIterable<SinkRow> rows = deserializer.deserialize(writeBatch)) {
    for (SinkRow row : rows) {
        Data.Op op = row.getOp();        // INSERT, DELETE, UPDATE_INSERT, etc.
        Object col0 = row.get(0);         // First column; runtime type depends on its data type
        Object col1 = row.get(1);         // Second column, same convention
        // Process the row...
    }
}

Related Pages

Implements Principle

Requires Environment
