Implementation:Risingwavelabs Risingwave ConfigurableOffsetBackingStore
| Property | Value |
|---|---|
| Component | CDC Source - Debezium Internal |
| Language | Java |
| Lines | 208 |
| License | Apache 2.0 |
| Repository | risingwavelabs/risingwave |
Overview
ConfigurableOffsetBackingStore is a custom implementation of the Kafka Connect OffsetBackingStore interface that enables the Debezium engine to be initialized with a pre-existing binlog offset for failover recovery. When RisingWave restores a CDC source after a failure, it provides the last known offset state (serialized as JSON) through the worker configuration. This class deserializes that offset and writes it into an in-memory store so that Debezium can resume capturing changes from the exact position where it left off.
The implementation was originally ported from the Flink CDC Connectors project and adapted for use within RisingWave's connector node architecture.
The in-memory offset store is a plain HashMap<ByteBuffer, ByteBuffer>. Every read and write is submitted to a single-threaded executor, which serializes access so the map can be used safely without explicit locking.
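A minimal sketch of this pattern, using only the JDK. The class name InMemoryOffsetStoreSketch is hypothetical. CompletableFuture stands in for the plain Future returned by the real methods, and a BiConsumer<Throwable, Void> stands in for Kafka Connect's Callback<Void>. This illustrates the technique and is not the RisingWave source.

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Hypothetical sketch: only the executor thread touches the map, so a plain HashMap needs no locks. */
final class InMemoryOffsetStoreSketch {
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Looks up the requested keys; a key with no stored offset maps to null. */
    CompletableFuture<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
        return CompletableFuture.supplyAsync(() -> {
            Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
            for (ByteBuffer key : keys) {
                result.put(key, data.get(key));
            }
            return result;
        }, executor);
    }

    /** Stores all entries, then reports completion to the optional (error, result) callback. */
    CompletableFuture<Void> set(Map<ByteBuffer, ByteBuffer> values, BiConsumer<Throwable, Void> callback) {
        return CompletableFuture.runAsync(() -> {
            data.putAll(values);
            if (callback != null) {
                callback.accept(null, null);
            }
        }, executor);
    }

    /** Lets the worker thread exit once queued tasks finish. */
    void close() {
        executor.shutdown();
    }
}
```

Because ByteBuffer.equals compares the remaining bytes, keys wrapped from equal byte arrays match. The trade-off of the single thread is that each get() and set() queues behind the previous call, which is usually acceptable for offset storage.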
Code Reference
Source Location
java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/ConfigurableOffsetBackingStore.java
Signature
public class ConfigurableOffsetBackingStore implements OffsetBackingStore
Key Methods
// Configures the store, deserializing saved offset state if present, and flushing it to the in-memory store
@Override
public void configure(WorkerConfig config)
// Starts the single-threaded executor service
@Override
public void start()
// Shuts down the executor service with a 30-second graceful timeout
@Override
public void stop()
// Retrieves offsets for the given keys from the in-memory map
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys)
// Stores offset key-value pairs into the in-memory map
@Override
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback)
// Returns null (connector partitions not tracked)
@Override
public Set<Map<String, Object>> connectorPartitions(String connectorName)
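The stop() behavior listed above matches the common JDK executor shutdown idiom. The sketch below illustrates that idiom and is not the actual method body. ExecutorLifecycleSketch and its boolean return value are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a start()/stop() lifecycle with a 30-second grace period. */
final class ExecutorLifecycleSketch {
    static final long STOP_TIMEOUT_SECONDS = 30;
    private ExecutorService executor;

    void start() {
        // One worker thread, so all reads and writes of the offset map are serialized.
        executor = Executors.newSingleThreadExecutor();
    }

    /** Returns true if the executor terminated within the grace period. */
    boolean stop() {
        if (executor == null) {
            return true; // never started or already stopped
        }
        executor.shutdown(); // reject new tasks, let queued ones finish
        try {
            if (!executor.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // grace period expired: interrupt running tasks
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } finally {
            executor = null;
        }
    }

    boolean isRunning() {
        return executor != null && !executor.isShutdown();
    }
}
```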
Constants
public static final String OFFSET_STATE_VALUE = "offset.storage.risingwave.state.value";
public static final int FLUSH_TIMEOUT_SECONDS = 10;
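FLUSH_TIMEOUT_SECONDS suggests that configure() waits a bounded time for the restored offset to be flushed. The hypothetical FlushWaitSketch below shows one way to express such a bounded wait with Future.get(timeout, unit) and to separate the failure modes that are logged. The Outcome enum and the cancel-on-timeout choice are assumptions, not taken from the source.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: a bounded wait on an offset flush. */
final class FlushWaitSketch {
    static final int FLUSH_TIMEOUT_SECONDS = 10;

    enum Outcome { FLUSHED, FAILED, TIMED_OUT, INTERRUPTED }

    static Outcome awaitFlush(Future<Void> flush, long timeout, TimeUnit unit) {
        try {
            flush.get(timeout, unit);
            return Outcome.FLUSHED;
        } catch (ExecutionException e) {
            // The flush task itself threw.
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            // Stop waiting and abandon the pending flush.
            flush.cancel(true);
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    /** Convenience overload using the 10-second default. */
    static Outcome awaitFlush(Future<Void> flush) {
        return awaitFlush(flush, FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
}
```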
Imports
import io.debezium.config.Instantiator;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.engine.DebeziumEngine;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
I/O Contract
Input
- WorkerConfig: Contains the optional key offset.storage.risingwave.state.value with a JSON-serialized DebeziumOffset (source partition and source offset). If this key is absent, the store starts from a clean state.
- ByteBuffer keys/values: Standard Kafka Connect offset storage key-value pairs passed via get() and set().
Output
- get(): Returns a Future<Map<ByteBuffer, ByteBuffer>> containing the requested offset entries from the in-memory map.
- set(): Returns a Future<Void> that completes after storing the provided entries and invoking the callback.
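The Input contract reduces to one lookup in the worker properties. This minimal sketch assumes the properties are available as a Map<String, String>. It also assumes a blank value is treated like an absent one, which is inferred from the "warnings on empty state" note rather than confirmed. OffsetStateLookupSketch is hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: restore when the state key is set, otherwise start clean. */
final class OffsetStateLookupSketch {
    static final String OFFSET_STATE_VALUE = "offset.storage.risingwave.state.value";

    /** Returns the saved JSON offset, or empty if the store should start from a clean state. */
    static Optional<String> savedOffsetState(Map<String, String> workerProps) {
        String state = workerProps.get(OFFSET_STATE_VALUE);
        if (state == null || state.trim().isEmpty()) {
            // Assumption: blank is handled like absent; a warning would be logged here.
            return Optional.empty();
        }
        return Optional.of(state);
    }
}
```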
Side Effects
- Executor lifecycle: Creates and manages a single-threaded executor pool for serialized access to the offset data.
- Logging: Logs offset flush results, warnings on empty state, and errors on deserialization or timeout failures.
Usage Examples
Configuring Debezium Engine with Offset Restore
// When setting up the Debezium engine for recovery
Properties props = new Properties();
props.put("offset.storage", ConfigurableOffsetBackingStore.class.getName());
// Provide the saved offset state (JSON) from RisingWave's state store; savedOffsetJson must be non-null, because Properties rejects null values
props.put(ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, savedOffsetJson);
Normal Startup (No Recovery)
// When no prior offset exists, the store starts clean
Properties props = new Properties();
props.put("offset.storage", ConfigurableOffsetBackingStore.class.getName());
// OFFSET_STATE_VALUE is not set; store initializes empty
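The two examples can be folded into one hypothetical helper that sets OFFSET_STATE_VALUE only when a saved offset exists. The fully qualified class name is derived from the source location above. Any other properties the Debezium engine requires are deliberately omitted, so this is a partial configuration sketch.

```java
import java.util.Properties;

/** Hypothetical helper combining recovery and normal startup. */
final class OffsetStoreProps {
    // Mirrors ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE; duplicated so the sketch compiles alone.
    static final String OFFSET_STATE_VALUE = "offset.storage.risingwave.state.value";
    static final String STORE_CLASS =
            "com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore";

    /** savedOffsetJson may be null or empty for a normal (non-recovery) startup. */
    static Properties offsetStorageProps(String savedOffsetJson) {
        Properties props = new Properties();
        props.setProperty("offset.storage", STORE_CLASS);
        if (savedOffsetJson != null && !savedOffsetJson.isEmpty()) {
            props.setProperty(OFFSET_STATE_VALUE, savedOffsetJson);
        }
        return props;
    }
}
```

The null guard matters because java.util.Properties, like Hashtable, throws NullPointerException when given a null value.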
Related Pages
- DbzCdcEngineRunner Start - CDC engine runner that creates and configures the Debezium engine with this offset store
- BinlogOffsetContext - Debezium binlog offset context used for MySQL CDC
- DatetimeTypeConverter - Custom type converter also used in the CDC pipeline
- JniDbzSourceHandler RunJniDbzSourceThread - JNI bridge for Debezium source threads