Implementation:Risingwavelabs Risingwave MySqlOffsetContext
| Property | Value |
|---|---|
| Component | CDC Source - Debezium MySQL Connector |
| Language | Java |
| Lines | 111 |
| License | Apache 2.0 (RisingWave Labs + Debezium Authors) |
| Repository | risingwavelabs/risingwave |
Overview
MySqlOffsetContext is a patched version of the Debezium MySqlOffsetContext class that extends BinlogOffsetContext<SourceInfo> to manage MySQL CDC offset state within RisingWave. This class tracks the binlog file name, position, GTID set, server ID, and other metadata needed to resume MySQL CDC streaming from a specific point.
The class provides two key capabilities:
- Static factory method (initial): Creates a fresh offset context starting from the beginning of the binlog, used for new CDC source creation.
- Loader inner class: Deserializes a previously saved offset map back into a fully reconstructed MySqlOffsetContext, used during failover recovery.
This file is placed in the io.debezium.connector.mysql package to override the upstream Debezium class, allowing RisingWave-specific customizations to MySQL offset handling.
Code Reference
Source Location
java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java
Signature
public class MySqlOffsetContext extends BinlogOffsetContext<SourceInfo>
Key Methods
// Constructor accepting snapshot state, transaction context, incremental snapshot context, and source info
public MySqlOffsetContext(
        SnapshotType snapshot,
        boolean snapshotCompleted,
        TransactionContext transactionContext,
        IncrementalSnapshotContext<TableId> incrementalSnapshotContext,
        SourceInfo sourceInfo)
// Creates a fresh offset context starting from the beginning of the binlog
public static MySqlOffsetContext initial(MySqlConnectorConfig config)
Inner Class: Loader
public static class Loader extends BinlogOffsetContext.Loader<MySqlOffsetContext> {
    public Loader(MySqlConnectorConfig connectorConfig)

    // Reconstructs a MySqlOffsetContext from a saved offset map
    @Override
    public MySqlOffsetContext load(Map<String, ?> offset)
}
Imports
import static io.debezium.connector.common.OffsetUtils.longOffsetValue;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
I/O Contract
Input
- MySqlConnectorConfig: The MySQL connector configuration, used to determine read-only connection mode and to construct SourceInfo.
- Offset map (for Loader): A Map<String, ?> containing previously persisted offset keys:
  - SourceInfo.BINLOG_FILENAME_OFFSET_KEY - The binlog filename (required)
  - SourceInfo.BINLOG_POSITION_OFFSET_KEY - The binlog position
  - EVENTS_TO_SKIP_OFFSET_KEY - Number of events to skip
  - SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY - Row position within the event
  - GTID_SET_KEY - The completed GTID set (optional)
  - SourceInfo.SERVER_ID_KEY - The server ID
  - TIMESTAMP_KEY - The offset timestamp
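The numeric entries in the offset map are read through the statically imported longOffsetValue helper. As a rough illustration of that kind of lookup, the plain-Java sketch below reads a long from a saved map while tolerating values stored either as numbers or as strings. The class name, the key strings ("file", "pos", "row", "event"), and the choice to treat a missing entry as 0 are assumptions made for this example; the real class uses the SourceInfo constants and Debezium's own helper.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for reading numeric fields from a persisted offset map.
// This is an illustrative sketch, not the Debezium implementation.
final class OffsetValueSketch {

    // Hypothetical key strings; the real code refers to SourceInfo constants.
    static final String FILE_KEY = "file";
    static final String POS_KEY = "pos";
    static final String ROW_KEY = "row";

    // Returns the value stored under `key` as a long.
    // Assumption for this sketch: a missing entry is treated as 0.
    static long longValue(Map<String, ?> offset, String key) {
        Object raw = offset.get(key);
        if (raw == null) {
            return 0L;
        }
        if (raw instanceof Number) {
            return ((Number) raw).longValue();
        }
        try {
            return Long.parseLong(raw.toString());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Offset field '" + key + "' is not a valid long: " + raw, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> saved = new HashMap<>();
        saved.put(FILE_KEY, "mysql-bin.000003");
        saved.put(POS_KEY, 154L);   // stored as a Long
        saved.put(ROW_KEY, "2");    // stored as a String, e.g. after serialization

        System.out.println(longValue(saved, POS_KEY));  // prints 154
        System.out.println(longValue(saved, ROW_KEY));  // prints 2
        System.out.println(longValue(saved, "event"));  // absent key, prints 0
    }
}
```

Accepting both Number and String values keeps the lookup robust when the offset map has passed through a serialization layer that does not preserve Java types.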
Output
- initial(): Returns a new MySqlOffsetContext with binlog start point set to ("", 0L).
- Loader.load(): Returns a fully populated MySqlOffsetContext reconstructed from the saved offset map.
Exceptions
- ConnectException: Thrown by the Loader if the binlog filename is missing from the offset map.
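The contract described above (a fresh start at ("", 0L), a full restore from a saved map, and a failure when the binlog filename is absent) can be modelled without any Debezium dependency. The sketch below is a simplified stand-in: OffsetLoadSketch, BinlogPosition, and the key strings "file" and "pos" are hypothetical names for illustration, and IllegalStateException takes the place of Kafka Connect's ConnectException so the example compiles on the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the initial()/load() contract; not the real class.
final class OffsetLoadSketch {

    // Hypothetical key strings standing in for the SourceInfo constants.
    static final String FILE_KEY = "file";
    static final String POS_KEY = "pos";

    // Minimal value holder for a binlog coordinate.
    static final class BinlogPosition {
        final String file;
        final long pos;

        BinlogPosition(String file, long pos) {
            this.file = file;
            this.pos = pos;
        }
    }

    // Fresh start: empty filename and position 0, mirroring initial().
    static BinlogPosition initial() {
        return new BinlogPosition("", 0L);
    }

    // Restores a position from a saved map; the filename is mandatory.
    static BinlogPosition load(Map<String, ?> offset) {
        Object file = offset.get(FILE_KEY);
        if (file == null) {
            // The real Loader throws ConnectException at this point.
            throw new IllegalStateException(
                    "Saved offset is missing the binlog filename ('" + FILE_KEY + "')");
        }
        Object pos = offset.get(POS_KEY);
        long position = (pos == null) ? 0L : Long.parseLong(pos.toString());
        return new BinlogPosition(file.toString(), position);
    }

    public static void main(String[] args) {
        Map<String, Object> saved = new HashMap<>();
        saved.put(FILE_KEY, "mysql-bin.000003");
        saved.put(POS_KEY, 154L);

        BinlogPosition restored = load(saved);
        System.out.println(restored.file + ":" + restored.pos); // prints mysql-bin.000003:154

        saved.remove(FILE_KEY);
        try {
            load(saved);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast on a missing filename avoids silently restarting from the beginning of the binlog when a saved offset is incomplete.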
Usage Examples
Creating an Initial Offset Context
MySqlConnectorConfig config = new MySqlConnectorConfig(connectorProperties);
MySqlOffsetContext offset = MySqlOffsetContext.initial(config);
// Offset starts from the beginning of the binlog ("", position 0)
Loading a Saved Offset for Recovery
MySqlConnectorConfig config = new MySqlConnectorConfig(connectorProperties);
MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(config);
Map<String, Object> savedOffset = ...; // retrieved from the offset backing store
MySqlOffsetContext offsetContext = loader.load(savedOffset);
// offsetContext now points to the exact binlog position for resuming CDC
Related Pages
- BinlogOffsetContext - Parent class providing base binlog offset functionality
- ConfigurableOffsetBackingStore - Offset backing store that persists and restores offset state
- DbzCdcEngineRunner Start - CDC engine runner that uses offset contexts for MySQL sources
- JniDbzSourceHandler RunJniDbzSourceThread - JNI bridge for Debezium source threads