Implementation:Risingwavelabs Risingwave ConfigurableOffsetBackingStore



Component: CDC Source - Debezium Internal
Language: Java
Lines: 208
License: Apache 2.0
Repository: risingwavelabs/risingwave

Overview

ConfigurableOffsetBackingStore is a custom implementation of the Kafka Connect OffsetBackingStore interface that enables the Debezium engine to be initialized with a pre-existing binlog offset for failover recovery. When RisingWave restores a CDC source after a failure, it provides the last known offset state (serialized as JSON) through the worker configuration. This class deserializes that offset and writes it into an in-memory store so that Debezium can resume capturing changes from the exact position where it left off.

The implementation was originally ported from the Flink CDC Connectors project and adapted for use within RisingWave's connector node architecture.

The in-memory offset store is a plain HashMap<ByteBuffer, ByteBuffer>. Reads and writes are submitted to a single-threaded executor, which serializes access to the map and keeps it thread-safe.
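The pattern described above can be sketched with the JDK alone. This is a minimal illustration, not the actual class: InMemoryOffsetSketch and its bytes() helper are hypothetical names, the class does not implement OffsetBackingStore, and the callback parameter of set() is omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration only; not the RisingWave class.
class InMemoryOffsetSketch {
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
    // One worker thread: every task touching `data` runs sequentially.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Looks up each requested key on the worker thread.
    Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
        return executor.submit(() -> {
            Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
            for (ByteBuffer key : keys) {
                result.put(key, data.get(key));
            }
            return result;
        });
    }

    // Stores all entries on the worker thread.
    Future<Void> set(Map<ByteBuffer, ByteBuffer> values) {
        return executor.submit(() -> {
            data.putAll(values);
            return null;
        });
    }

    void stop() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        InMemoryOffsetSketch store = new InMemoryOffsetSketch();
        store.set(Map.of(bytes("partition"), bytes("offset-1"))).get();
        Map<ByteBuffer, ByteBuffer> found = store.get(List.of(bytes("partition"))).get();
        ByteBuffer value = found.get(bytes("partition"));
        // duplicate() keeps the stored buffer's position untouched while decoding
        System.out.println(StandardCharsets.UTF_8.decode(value.duplicate()).toString());
        store.stop();
    }
}
```

Because only one thread ever touches the map, no locks or concurrent collections are needed; callers wait on the returned Future when they need the result.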

Code Reference

Source Location

java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/ConfigurableOffsetBackingStore.java


Signature

public class ConfigurableOffsetBackingStore implements OffsetBackingStore

Key Methods

// Configures the store, deserializing saved offset state if present, and flushing it to the in-memory store
@Override
public void configure(WorkerConfig config)

// Starts the single-threaded executor service
@Override
public void start()

// Shuts down the executor service with a 30-second graceful timeout
@Override
public void stop()

// Retrieves offsets for the given keys from the in-memory map
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys)

// Stores offset key-value pairs into the in-memory map
@Override
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback)

// Returns null (connector partitions not tracked)
@Override
public Set<Map<String, Object>> connectorPartitions(String connectorName)
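The shutdown behaviour noted for stop() follows a common JDK idiom: stop accepting new tasks, then wait a bounded time for queued ones to finish. The sketch below shows that idiom in isolation. GracefulStopSketch is a hypothetical name, and returning false on timeout is an assumption, since this page does not say what the real method does when the 30 seconds elapse.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only.
class GracefulStopSketch {
    // Returns true if every queued task finished within the timeout.
    static boolean stop(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // reject new tasks; already queued tasks still run
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("pending offset write"));
        boolean drained = stop(executor, 30);
        System.out.println("drained=" + drained);
    }
}
```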

Constants

public static final String OFFSET_STATE_VALUE = "offset.storage.risingwave.state.value";
public static final int FLUSH_TIMEOUT_SECONDS = 10;

Imports

import io.debezium.config.Instantiator;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.engine.DebeziumEngine;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;

I/O Contract

Input

  • WorkerConfig: Contains optional key offset.storage.risingwave.state.value with a JSON-serialized DebeziumOffset (source partition and source offset). If this key is absent, the store starts from a clean state.
  • ByteBuffer keys/values: Standard Kafka Connect offset storage key-value pairs passed via get() and set().
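ByteBuffer works as a map key because its equals() and hashCode() compare the remaining bytes rather than object identity. The sketch below illustrates this with made-up key and value strings; ByteBufferKeySketch, utf8() and lookup() are hypothetical names, and the JSON content is not the real Debezium offset format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class for illustration only.
class ByteBufferKeySketch {
    static ByteBuffer utf8(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Returns the decoded value for a key, or null if the key is absent.
    static String lookup(Map<ByteBuffer, ByteBuffer> data, String key) {
        ByteBuffer value = data.get(utf8(key)); // a fresh buffer with equal bytes matches
        if (value == null) {
            return null;
        }
        // Decode a duplicate so the stored buffer's position stays unchanged.
        return StandardCharsets.UTF_8.decode(value.duplicate()).toString();
    }

    public static void main(String[] args) {
        Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
        // Made-up payloads; real entries are produced by Kafka Connect converters.
        data.put(utf8("{\"server\":\"demo\"}"), utf8("{\"pos\":4}"));
        System.out.println(lookup(data, "{\"server\":\"demo\"}"));
        System.out.println(lookup(data, "{\"server\":\"other\"}"));
    }
}
```

One caveat: equality depends on the buffer's current position and limit, so reading a stored buffer directly would change what it compares equal to. Decoding a duplicate() avoids that.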

Output

  • get(): Returns a Future<Map<ByteBuffer, ByteBuffer>> containing the requested offset entries from the in-memory map.
  • set(): Returns a Future<Void> that completes after storing the provided entries and invoking the callback.

Side Effects

  • Executor lifecycle: start() creates a single-threaded executor that serializes access to the offset data; stop() shuts it down.
  • Logging: Logs offset flush results, warnings on empty state, and errors on deserialization or timeout failures.

Usage Examples

Configuring Debezium Engine with Offset Restore

// When setting up the Debezium engine for recovery
Properties props = new Properties();
props.put("offset.storage", ConfigurableOffsetBackingStore.class.getName());
// Provide the saved offset state from RisingWave's state store
props.put(ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, savedOffsetJson);

Normal Startup (No Recovery)

// When no prior offset exists, the store starts clean
Properties props = new Properties();
props.put("offset.storage", ConfigurableOffsetBackingStore.class.getName());
// OFFSET_STATE_VALUE is not set; store initializes empty
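
The two cases above can be folded into one helper that sets the state key only when a saved offset exists. This is a sketch under assumptions: OffsetPropsSketch and offsetProps() are hypothetical names, the constants are copied as string literals so the example compiles without the RisingWave jar, the fully qualified class name is inferred from the source path listed on this page, and treating an empty string like a missing offset is an illustrative choice.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
class OffsetPropsSketch {
    // In real code, prefer ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE.
    static final String OFFSET_STATE_VALUE = "offset.storage.risingwave.state.value";
    // Assumed from the source path; in real code use ConfigurableOffsetBackingStore.class.getName().
    static final String STORE_CLASS =
            "com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore";

    // Builds the offset-related properties; savedOffsetJson may be null on a fresh start.
    static Properties offsetProps(String savedOffsetJson) {
        Properties props = new Properties();
        props.put("offset.storage", STORE_CLASS);
        if (savedOffsetJson != null && !savedOffsetJson.isEmpty()) {
            props.put(OFFSET_STATE_VALUE, savedOffsetJson); // recovery path
        }
        return props; // without the state key, the store initializes empty
    }

    public static void main(String[] args) {
        // Placeholder JSON, not a real serialized DebeziumOffset.
        System.out.println(offsetProps("{\"placeholder\":true}").containsKey(OFFSET_STATE_VALUE));
        System.out.println(offsetProps(null).containsKey(OFFSET_STATE_VALUE));
    }
}
```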
