Implementation:Risingwavelabs Risingwave BinlogOffsetContext
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, MySQL, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Patched Debezium BinlogOffsetContext that tracks MySQL/MariaDB binlog position, GTID sets, and transaction state for CDC offset management.
Description
BinlogOffsetContext is a concrete implementation of Debezium's CommonOffsetContext for binlog-based connectors (MySQL and MariaDB). It manages the state needed to track the current position within a MySQL or MariaDB binary log stream, including:
- Binlog file and position: The binlog filename and byte position from which reading resumes after a restart.
- GTID sets: Both the current GTID set (including in-progress transactions) and the restart GTID set (only completed transactions) for Global Transaction ID-based replication.
- Transaction tracking: Whether the connector is currently inside a transaction, the transaction ID (either GTID or file/position based), and counts of events and rows to skip upon restart.
- Snapshot state: Whether a snapshot is in progress or completed, managed via the parent class.
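The split between the current and restart GTID sets can be illustrated with a small, self-contained model. GtidTracker is a hypothetical class, not the Debezium API. It assumes, following upstream Debezium, that startGtid() receives the GTID set already extended with the new transaction's GTID, and that committing copies the current set into the restart set.

```java
// Hypothetical model of the two GTID sets kept by BinlogOffsetContext.
// Assumption: mirrors upstream Debezium behaviour; the RisingWave patch may differ.
final class GtidTracker {
    private String currentGtidSet; // includes the transaction in progress
    private String restartGtidSet; // completed transactions only, safe to resume from

    // Seed both sets from a GTID set whose transactions are all committed.
    void setCompletedGtidSet(String gtidSet) {
        currentGtidSet = gtidSet;
        restartGtidSet = gtidSet;
    }

    // A new transaction starts: the current set already contains its GTID,
    // while the restart set keeps the last fully committed value.
    void startGtid(String gtidSetWithNewTransaction) {
        restartGtidSet = (currentGtidSet != null) ? currentGtidSet : gtidSetWithNewTransaction;
        currentGtidSet = gtidSetWithNewTransaction;
    }

    // The transaction is done, so it may now be recorded as completed.
    void commitTransaction() {
        restartGtidSet = currentGtidSet;
    }

    String currentGtidSet() { return currentGtidSet; }
    String restartGtidSet() { return restartGtidSet; }
}
```

Under GTID-based resumption, resuming from the restart set means the server resends the whole in-progress transaction instead of skipping part of it.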
The binlog reader calls the following methods to update the position state as it processes events: setBinlogStartPoint() sets the initial reading position; setCompletedGtidSet() and startGtid() maintain the GTID sets; startNextTransaction() and commitTransaction() mark transaction boundaries; setEventPosition() records the position and size of each event; completeEvent() counts events finished within the current transaction; and setRowNumber() tracks progress through multi-row events.
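A stripped-down model of how these calls move the restart position follows. BinlogPositionTracker is hypothetical and keeps the position of the current event in its own fields (upstream Debezium keeps it in the source info). Its update rules are assumed to follow upstream Debezium: inside a transaction the restart point stays at the event where the transaction began, and commit moves it just past the last positioned event.

```java
// Hypothetical model of the restart bookkeeping; not the Debezium class.
final class BinlogPositionTracker {
    private String currentFile;       // file of the event being processed
    private long currentPos;          // start offset of that event
    private long currentEventSize;    // its length in bytes
    private String restartFile;       // where to resume after a restart...
    private long restartPos;
    private long restartEventsToSkip; // ...and how much to skip once there
    private int restartRowsToSkip;
    private boolean inTransaction;

    void setBinlogStartPoint(String file, long positionOfFirstEvent) {
        currentFile = file;
        currentPos = positionOfFirstEvent;
        restartFile = file;
        restartPos = positionOfFirstEvent;
        restartEventsToSkip = 0;
        restartRowsToSkip = 0;
    }

    void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes) {
        currentPos = positionOfCurrentEvent;
        currentEventSize = eventSizeInBytes;
        if (!inTransaction) {
            // Outside a transaction each event stands alone: resume just after it.
            restartPos = positionOfCurrentEvent + eventSizeInBytes;
            restartEventsToSkip = 0;
            restartRowsToSkip = 0;
        }
    }

    void startNextTransaction() {
        // A restart inside the transaction replays it from this (BEGIN) event.
        restartFile = currentFile;
        restartPos = currentPos;
        restartEventsToSkip = 0;
        restartRowsToSkip = 0;
        inTransaction = true;
    }

    void setRowNumber(int eventRowNumber, int totalNumberOfRows) {
        // Rows 0..eventRowNumber of the current event have been handled.
        restartRowsToSkip = Math.min(eventRowNumber + 1, totalNumberOfRows);
    }

    void completeEvent() {
        restartEventsToSkip++; // one more event inside the transaction is done
    }

    void commitTransaction() {
        // Everything is done: resume right after the last positioned event.
        restartFile = currentFile;
        restartPos = currentPos + currentEventSize;
        restartEventsToSkip = 0;
        restartRowsToSkip = 0;
        inTransaction = false;
    }

    String restartFile() { return restartFile; }
    long restartPos() { return restartPos; }
    long restartEventsToSkip() { return restartEventsToSkip; }
    int restartRowsToSkip() { return restartRowsToSkip; }
}
```

For example, with a BEGIN event at 154 (80 bytes), a 3-row event at 234, and a commit event at 734 (31 bytes), the restart position stays at 154 throughout the transaction and becomes 765 after commit.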
The getOffset() method produces the Kafka Connect-compatible offset map, which includes the server ID, GTID set, binlog filename, position, events to skip, and timestamp. The connector uses this map for checkpointing and recovery.
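The shape of that map can be sketched with a hypothetical helper, OffsetMapSketch.buildOffset. The gtids, event and ts_sec keys come from the constants under Signature, and file, pos and server_id from the usage example. The row key and the rule of omitting unset optional entries are assumptions based on upstream Debezium.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles a binlog offset map; not the Debezium API.
final class OffsetMapSketch {
    static Map<String, Object> buildOffset(long serverId, String restartGtidSet,
                                           String restartFile, long restartPos,
                                           long eventsToSkip, int rowsToSkip,
                                           Long timestampSeconds) {
        Map<String, Object> offset = new HashMap<>();
        if (serverId != 0) {
            offset.put("server_id", serverId);
        }
        if (restartGtidSet != null) {
            offset.put("gtids", restartGtidSet);    // GTID_SET_KEY
        }
        offset.put("file", restartFile);
        offset.put("pos", restartPos);
        if (eventsToSkip != 0) {
            offset.put("event", eventsToSkip);      // EVENTS_TO_SKIP_OFFSET_KEY
        }
        if (rowsToSkip != 0) {
            offset.put("row", rowsToSkip);          // assumed key name
        }
        if (timestampSeconds != null) {
            offset.put("ts_sec", timestampSeconds); // TIMESTAMP_KEY
        }
        return offset;
    }
}
```

Omitting zero or unset entries keeps checkpoints small; a reader of the map must then treat a missing entry as zero or "not present".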
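This is a RisingWave-patched copy of the upstream Debezium class. It is declared in the upstream package (io.debezium.connector.binlog) so that it takes the place of the original class and gives RisingWave control over binlog offset tracking.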
Usage
This class is used internally by the MySQL/MariaDB CDC connector to track binlog positions during streaming replication. It is instantiated by the connector's offset context loader and updated by the binlog event reader as events are processed.
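Going the other way, a loader has to turn a stored offset map back into restart state. StoredBinlogOffset is a hypothetical sketch of that step using the key names on this page (the row key is an assumption). Accepting Integer, Long or String values is a defensive assumption about how the map may be deserialized, not documented RisingWave behaviour.

```java
import java.util.Map;

// Hypothetical reader for a stored binlog offset map; not the Debezium loader.
final class StoredBinlogOffset {
    final String file;
    final long pos;
    final String gtids;      // null when the connector runs without GTIDs
    final long eventsToSkip;
    final int rowsToSkip;

    private StoredBinlogOffset(String file, long pos, String gtids, long eventsToSkip, int rowsToSkip) {
        this.file = file;
        this.pos = pos;
        this.gtids = gtids;
        this.eventsToSkip = eventsToSkip;
        this.rowsToSkip = rowsToSkip;
    }

    static StoredBinlogOffset fromMap(Map<String, ?> offset) {
        Object file = offset.get("file");
        if (file == null) {
            throw new IllegalArgumentException("offset has no binlog filename");
        }
        Object gtids = offset.get("gtids");
        return new StoredBinlogOffset(
                file.toString(),
                longValue(offset.get("pos")),
                gtids == null ? null : gtids.toString(),
                longValue(offset.get("event")),
                (int) longValue(offset.get("row"))); // "row" key is an assumption
    }

    // Missing entries default to 0; numbers may arrive as Integer, Long or String.
    private static long longValue(Object value) {
        if (value == null) {
            return 0L;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.parseLong(value.toString());
    }
}
```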
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/binlog/BinlogOffsetContext.java (L51-430)
Signature
```java
public class BinlogOffsetContext<T extends BinlogSourceInfo> extends CommonOffsetContext<T> {
    // Signatures only; method bodies omitted.

    // Offset-map keys and the transaction-ID format used when no GTID is available
    public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
    public static final String TIMESTAMP_KEY = "ts_sec";
    public static final String GTID_SET_KEY = "gtids";
    public static final String NON_GTID_TRANSACTION_ID_FORMAT = "file=%s,pos=%s";

    public BinlogOffsetContext(SnapshotType snapshot, boolean snapshotCompleted,
                               TransactionContext transactionContext,
                               IncrementalSnapshotContext<TableId> incrementalSnapshotContext,
                               T sourceInfo);

    // Kafka Connect offset map used for checkpointing and recovery
    @Override
    public Map<String, ?> getOffset();

    // Position and GTID bookkeeping, called by the binlog reader as events arrive
    public void setBinlogStartPoint(String binlogFilename, long positionOfFirstEvent);
    public void setCompletedGtidSet(String gtidSet);
    public void startGtid(String gtid, String gtidSet);
    public void startNextTransaction();
    public void commitTransaction();
    public void completeEvent();
    public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes);
    public void setRowNumber(int eventRowNumber, int totalNumberOfRows);

    // Accessors
    public String gtidSet();
    public T getSource();
}
```
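NON_GTID_TRANSACTION_ID_FORMAT supplies the file/position-based transaction ID mentioned in the transaction-tracking bullet. TransactionIdSketch is hypothetical; it assumes, as in upstream Debezium, that the ID is the current GTID when one is set, that it otherwise falls back to the restart filename and position formatted with this constant, and that no ID exists outside a transaction.

```java
// Hypothetical sketch of deriving a transaction ID; not the Debezium implementation.
final class TransactionIdSketch {
    static final String NON_GTID_TRANSACTION_ID_FORMAT = "file=%s,pos=%s";

    static String transactionId(boolean inTransaction, String currentGtid,
                                String restartFile, long restartPos) {
        if (!inTransaction) {
            return null; // no open transaction, so no ID
        }
        if (currentGtid != null && !currentGtid.isEmpty()) {
            return currentGtid; // GTID mode: the GTID identifies the transaction
        }
        // Non-GTID mode (assumed): identify the transaction by where it starts
        return String.format(NON_GTID_TRANSACTION_ID_FORMAT, restartFile, restartPos);
    }
}
```

A transaction whose BEGIN event sits at position 154 of mysql-bin.000003 would get the ID file=mysql-bin.000003,pos=154 in this model.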
Import
```java
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.connector.binlog.BinlogSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.relational.TableId;
import java.util.Map;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| snapshot | SnapshotType | No | The snapshot type if a snapshot is in progress |
| snapshotCompleted | boolean | Yes | Whether the initial snapshot has been completed |
| transactionContext | TransactionContext | Yes | Transaction metadata context for tracking transaction boundaries |
| incrementalSnapshotContext | IncrementalSnapshotContext<TableId> | Yes | Context for incremental snapshot state |
| sourceInfo | T (extends BinlogSourceInfo) | Yes | Source information including binlog file, position, server ID |
Outputs
| Name | Type | Description |
|---|---|---|
| offset | Map<String, ?> | Kafka Connect offset map containing server_id, gtids (GTID set), file (binlog filename), pos (binlog position), event (events to skip), and ts_sec (timestamp in seconds) |
| sourceInfoSchema | Schema | Kafka Connect schema for the source information struct |
| gtidSet | String | The current GTID set as a string representation |
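GTID sets are exchanged as strings in MySQL's textual form: source_uuid:interval[:interval...], with one comma-separated entry per source server. For example, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5 covers transactions 1 to 5 from that server. The hypothetical GtidSetSketch.contains() checks membership in such a string. It handles untagged MySQL GTIDs only (not MariaDB's domain-server-sequence GTIDs), and stripping line breaks is a precaution, since multi-server sets are often printed with a newline after each comma.

```java
// Hypothetical membership check for a MySQL GTID set string; not a Debezium API.
// Handles untagged GTIDs (uuid:interval[:interval...]) only.
final class GtidSetSketch {
    static boolean contains(String gtidSet, String sourceUuid, long transactionNumber) {
        // Drop line breaks that may follow the commas in multi-server sets.
        String normalized = gtidSet.replace("\n", "").replace("\r", "");
        for (String entry : normalized.split(",")) {
            String[] parts = entry.trim().split(":");
            if (!parts[0].equalsIgnoreCase(sourceUuid)) {
                continue; // a different source server
            }
            for (int i = 1; i < parts.length; i++) {
                String[] bounds = parts[i].split("-");
                long low = Long.parseLong(bounds[0]);
                long high = bounds.length > 1 ? Long.parseLong(bounds[1]) : low;
                if (transactionNumber >= low && transactionNumber <= high) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Checking transaction 6 against 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5 returns false, which is why a GTID-based resume from that set would fetch transaction 6 again.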
Usage Examples
Tracking Binlog Position During Streaming
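```java
// Assumes offsetContext is a BinlogOffsetContext created by the connector
// and that java.util.Map is imported.

// Set the starting position and the GTID set already executed on the server
offsetContext.setBinlogStartPoint("mysql-bin.000003", 154);
offsetContext.setCompletedGtidSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");

// Process a transaction with one 3-row event that starts at byte 1024 and is 256 bytes long
offsetContext.startNextTransaction();
offsetContext.setEventPosition(1024, 256);
for (int row = 0; row < 3; row++) {
    offsetContext.setRowNumber(row, 3); // report each row of the multi-row event
}
offsetContext.completeEvent();
offsetContext.commitTransaction();

// Get the offset for checkpointing
Map<String, ?> offset = offsetContext.getOffset();
// Under upstream Debezium semantics, pos points just past the last positioned event
// (1024 + 256 = 1280); server_id and ts_sec appear only when the source info has them:
// {server_id=..., gtids=3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5, file=mysql-bin.000003, pos=1280, ts_sec=...}
```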
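On restart the stored offset is used in reverse: reading resumes at file/pos, and the event and row counters say how much of the replayed data was already emitted. RestartSkipSketch.rowsToEmit() is a hypothetical model of that skipping. It assumes, as in upstream Debezium, that the event counter covers events read after the resume position and that the row counter applies to the first event that is not skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay model: which rows are emitted again after resuming at the stored position.
final class RestartSkipSketch {
    static List<String> rowsToEmit(int[] rowsPerEvent, long eventsToSkip, int rowsToSkip) {
        List<String> emitted = new ArrayList<>();
        for (int event = 0; event < rowsPerEvent.length; event++) {
            if (event < eventsToSkip) {
                continue; // fully handled before the restart
            }
            // Only the first non-skipped event can be partially handled.
            int firstRow = (event == eventsToSkip) ? rowsToSkip : 0;
            for (int row = firstRow; row < rowsPerEvent[event]; row++) {
                emitted.add(event + ":" + row); // "eventIndex:rowIndex"
            }
        }
        return emitted;
    }
}
```

For events with 2, 3 and 1 rows and a stored offset of event=1, row=2, only the third row of the second event and the single row of the third event are emitted again.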