

Implementation:Risingwavelabs Risingwave BinlogOffsetContext

From Leeroopedia


Knowledge Sources
Domains: CDC, Connectors, MySQL, Java
Last Updated: 2026-02-09 07:00 GMT

Overview

Patched Debezium BinlogOffsetContext that tracks MySQL/MariaDB binlog position, GTID sets, and transaction state for CDC offset management.

Description

BinlogOffsetContext is a generic subclass (T extends BinlogSourceInfo) of Debezium's CommonOffsetContext, used by binlog-based connectors (MySQL and MariaDB). It holds the state needed to track the current position within a binary log stream, including:

  • Binlog file and position: The restart binlog filename and byte position for resuming after restarts.
  • GTID sets: Both the current GTID set (including in-progress transactions) and the restart GTID set (only completed transactions) for Global Transaction ID-based replication.
  • Transaction tracking: Whether the connector is currently inside a transaction, the transaction ID (either GTID or file/position based), and counts of events and rows to skip upon restart.
  • Snapshot state: Whether a snapshot is in progress or completed, managed via the parent class.
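The reason for keeping two GTID sets can be shown with a small model. The sketch below uses a hypothetical GtidTracker class (not part of Debezium or RisingWave) and assumes, for illustration, that the current set advances as soon as a transaction's GTID is seen, while the restart set only catches up when that transaction commits.

```java
// Hypothetical model, not the Debezium/RisingWave class: shows why two GTID sets are tracked.
final class GtidTracker {
    private String currentGtidSet; // completed transactions plus the one in progress
    private String restartGtidSet; // completed transactions only

    GtidTracker(String completedGtidSet) {
        this.currentGtidSet = completedGtidSet;
        this.restartGtidSet = completedGtidSet;
    }

    // A new transaction's GTID was seen; gtidSetIncludingIt already contains that GTID.
    void startTransaction(String gtidSetIncludingIt) {
        this.currentGtidSet = gtidSetIncludingIt;
    }

    // Only after commit is it safe for a restart to skip this transaction.
    void commit() {
        this.restartGtidSet = this.currentGtidSet;
    }

    String current() { return currentGtidSet; }
    String restart() { return restartGtidSet; }

    public static void main(String[] args) {
        GtidTracker t = new GtidTracker("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");
        t.startTransaction("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-6");
        // A crash here would resume from restart() (":1-5"), so transaction 6 is replayed.
        System.out.println("current=" + t.current() + " restart=" + t.restart());
        t.commit();
        System.out.println("after commit restart=" + t.restart());
    }
}
```

Resuming from the restart set rather than the current set means an interrupted transaction is re-read in full instead of being silently skipped.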

The class provides methods for the binlog reader to update position state as events are processed: setBinlogStartPoint() sets the initial reading position, setCompletedGtidSet() and startGtid() update the GTID sets, startNextTransaction() and commitTransaction() manage transaction boundaries, setEventPosition() tracks individual event positions, completeEvent() counts events that have been fully processed, and setRowNumber() tracks progress through the rows of multi-row events.
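The restart-point bookkeeping these methods perform can be sketched as a small state machine. RestartPointModel below is a hypothetical, simplified model, not the Debezium or RisingWave implementation; its method names mirror the real ones only for readability. It assumes that inside a transaction the restart point stays at the transaction's first event (with completed events and rows counted so they can be skipped on resume), and that outside a transaction or on commit the restart point moves just past the last event.

```java
// Hypothetical, simplified model of the restart-point bookkeeping described above.
// Not the Debezium/RisingWave implementation; method names mirror the real ones for readability.
final class RestartPointModel {
    String restartFile;
    long restartPosition;
    long eventsToSkip;      // events of the open transaction already emitted
    long rowsToSkip;        // rows of the current multi-row event already emitted
    boolean inTransaction;

    private long currentPosition;
    private long currentEventSize;

    void setBinlogStartPoint(String binlogFilename, long positionOfFirstEvent) {
        restartFile = binlogFilename;
        restartPosition = positionOfFirstEvent;
        currentPosition = positionOfFirstEvent;
        currentEventSize = 0;
        eventsToSkip = 0;
        rowsToSkip = 0;
    }

    void startNextTransaction() {
        // A restart inside this transaction re-reads it from its first event.
        restartPosition = currentPosition;
        eventsToSkip = 0;
        rowsToSkip = 0;
        inTransaction = true;
    }

    void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes) {
        currentPosition = positionOfCurrentEvent;
        currentEventSize = eventSizeInBytes;
        if (!inTransaction) {
            // Outside a transaction, a restart may begin right after this event.
            restartPosition = positionOfCurrentEvent + eventSizeInBytes;
            eventsToSkip = 0;
            rowsToSkip = 0;
        }
    }

    void setRowNumber(int eventRowNumber, int totalNumberOfRows) {
        if (eventRowNumber < 0 || eventRowNumber >= totalNumberOfRows) {
            throw new IllegalArgumentException("row " + eventRowNumber + " of " + totalNumberOfRows);
        }
        // Rows 0..eventRowNumber of the current event have now been emitted.
        rowsToSkip = eventRowNumber + 1;
    }

    void completeEvent() {
        eventsToSkip++;
        rowsToSkip = 0;
    }

    void commitTransaction() {
        // After commit, resume just past the transaction's last event.
        restartPosition = currentPosition + currentEventSize;
        eventsToSkip = 0;
        rowsToSkip = 0;
        inTransaction = false;
    }
}
```

Running the call sequence from the Usage Examples section through this model ends with restartPosition = 1280 (1024 + 256), matching the pos value shown there; stopping just before commitTransaction() would instead leave the restart point at 154 with one event to skip.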

The getOffset() method produces the Kafka Connect-compatible offset map, which includes the server ID (server_id), GTID set (gtids), binlog filename (file), position (pos), number of events to skip (event), and timestamp (ts_sec). This map is the state used for checkpointing and recovery.
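As an illustration of the map's shape, the hypothetical helper below (OffsetMapSketch, not a Debezium or RisingWave API) assembles an offset map from restart-point values. The "gtids", "event" and "ts_sec" keys come from the class constants listed under Signature; "server_id", "file" and "pos" follow the usage example. Leaving out absent GTID sets, zero skip counts and missing timestamps is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds an offset map shaped like the one getOffset() is described to return.
// Omitting null/zero entries is an assumption made for illustration.
final class OffsetMapSketch {
    static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
    static final String TIMESTAMP_KEY = "ts_sec";
    static final String GTID_SET_KEY = "gtids";

    static Map<String, Object> toOffset(long serverId, String restartGtidSet, String restartFile,
                                        long restartPosition, long eventsToSkip, Long timestampSec) {
        Map<String, Object> offset = new LinkedHashMap<>(); // keeps insertion order for readability
        offset.put("server_id", serverId);
        if (restartGtidSet != null) {
            offset.put(GTID_SET_KEY, restartGtidSet);
        }
        offset.put("file", restartFile);
        offset.put("pos", restartPosition);
        if (eventsToSkip != 0) {
            offset.put(EVENTS_TO_SKIP_OFFSET_KEY, eventsToSkip);
        }
        if (timestampSec != null) {
            offset.put(TIMESTAMP_KEY, timestampSec);
        }
        return offset;
    }

    public static void main(String[] args) {
        System.out.println(toOffset(1L, "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5",
                "mysql-bin.000003", 1280L, 0L, null));
    }
}
```

Recording events to skip alongside file and pos is what allows a resume that starts at a transaction boundary to avoid re-emitting events that were already delivered.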

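This is a RisingWave-patched copy of the upstream Debezium class. It keeps the upstream package name (io.debezium.connector.binlog) so that it overrides the original class, enabling RisingWave-specific binlog offset tracking behavior.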

Usage

This class is used internally by the MySQL/MariaDB CDC connector to track binlog positions during streaming replication. It is instantiated by the connector's offset context loader and updated by the binlog event reader as events are processed.
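On recovery, the stored offset map is turned back into a position to reopen the binlog at, plus the number of already-emitted events to skip. The record below is a hypothetical reader-side sketch (ResumePoint is not the Debezium offset loader) that assumes the key names described above and tolerates Integer or Long values, since deserialized offsets may not preserve the original numeric type.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical reader-side sketch: turns a stored offset map back into a resume point.
// Not the Debezium loader; key names follow the offset map described on this page.
record ResumePoint(String file, long position, long eventsToSkip, Optional<String> gtidSet) {

    static ResumePoint fromOffset(Map<String, ?> offset) {
        Object file = offset.get("file");
        Object pos = offset.get("pos");
        if (!(file instanceof String) || !(pos instanceof Number)) {
            throw new IllegalArgumentException("offset must contain 'file' and 'pos': " + offset);
        }
        // Go through Number so both Integer and Long values are accepted.
        Object events = offset.get("event");
        long eventsToSkip = events instanceof Number n ? n.longValue() : 0L; // absent means nothing to skip
        Object gtids = offset.get("gtids");
        return new ResumePoint((String) file, ((Number) pos).longValue(), eventsToSkip,
                Optional.ofNullable(gtids).map(Object::toString));
    }
}
```

A reader built on this would reopen the binlog at file/pos and discard the first eventsToSkip events before emitting anything new.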

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/binlog/BinlogOffsetContext.java (L51-430)

Signature

public class BinlogOffsetContext<T extends BinlogSourceInfo> extends CommonOffsetContext<T> {

    public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
    public static final String TIMESTAMP_KEY = "ts_sec";
    public static final String GTID_SET_KEY = "gtids";
    public static final String NON_GTID_TRANSACTION_ID_FORMAT = "file=%s,pos=%s";

    public BinlogOffsetContext(SnapshotType snapshot, boolean snapshotCompleted,
            TransactionContext transactionContext,
            IncrementalSnapshotContext<TableId> incrementalSnapshotContext, T sourceInfo);

    @Override
    public Map<String, ?> getOffset();

    public void setBinlogStartPoint(String binlogFilename, long positionOfFirstEvent);
    public void setCompletedGtidSet(String gtidSet);
    public void startGtid(String gtid, String gtidSet);
    public void startNextTransaction();
    public void commitTransaction();
    public void completeEvent();
    public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes);
    public void setRowNumber(int eventRowNumber, int totalNumberOfRows);
    public String gtidSet();
    public T getSource();
}

Import

import java.util.Map;

import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.connector.binlog.BinlogSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.relational.TableId;

I/O Contract

Inputs

  • snapshot (SnapshotType, optional): the snapshot type, if a snapshot is in progress.
  • snapshotCompleted (boolean, required): whether the initial snapshot has been completed.
  • transactionContext (TransactionContext, required): transaction metadata context for tracking transaction boundaries.
  • incrementalSnapshotContext (IncrementalSnapshotContext<TableId>, required): context for incremental snapshot state.
  • sourceInfo (T extends BinlogSourceInfo, required): source information, including binlog file, position, and server ID.

Outputs

  • offset (Map<String, ?>): Kafka Connect offset map containing server_id, gtids, binlog filename, position, events to skip, and timestamp.
  • sourceInfoSchema (Schema): Kafka Connect schema for the source information struct.
  • gtidSet (String): the current GTID set as a string.
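For MySQL, the gtidSet string uses MySQL's GTID set notation: a source UUID followed by one or more colon-separated transaction-number intervals, with several sources separated by commas (MariaDB uses a different domain-server-sequence format). The hypothetical helper below (GtidSetStats, not part of Debezium or RisingWave) counts the transactions covered per source UUID, assuming untagged MySQL GTIDs only.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for MySQL-style GTID sets such as "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5:7".
// Not part of Debezium or RisingWave; MariaDB GTIDs and tagged MySQL GTIDs are not handled.
final class GtidSetStats {
    /** Returns, per source UUID, how many transaction numbers the set covers. */
    static Map<String, Long> transactionCounts(String gtidSet) {
        Map<String, Long> counts = new LinkedHashMap<>();
        if (gtidSet == null || gtidSet.isBlank()) {
            return counts;
        }
        // GTID set strings may contain line breaks after commas; drop all whitespace first.
        for (String uuidSet : gtidSet.replaceAll("\\s", "").split(",")) {
            String[] parts = uuidSet.split(":"); // parts[0] is the UUID, the rest are intervals
            long total = 0;
            for (int i = 1; i < parts.length; i++) {
                String[] bounds = parts[i].split("-");
                long start = Long.parseLong(bounds[0]);
                long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
                total += end - start + 1; // intervals are inclusive
            }
            counts.merge(parts[0].toLowerCase(Locale.ROOT), total, Long::sum);
        }
        return counts;
    }
}
```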

Usage Examples

Tracking Binlog Position During Streaming

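import java.util.Map;

// offsetContext is a BinlogOffsetContext<?> owned by the connector; it is not constructed here.

// Set the starting position
offsetContext.setBinlogStartPoint("mysql-bin.000003", 154);
offsetContext.setCompletedGtidSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");

// Process a transaction containing one 256-byte row event at position 1024 that carries 3 rows
offsetContext.startNextTransaction();
offsetContext.setEventPosition(1024, 256);
for (int row = 0; row < 3; row++) {
    offsetContext.setRowNumber(row, 3); // record progress through the event's rows
}
offsetContext.completeEvent();
offsetContext.commitTransaction();

// Get the offset for checkpointing
Map<String, ?> offset = offsetContext.getOffset();
// Illustrative contents: {server_id=1, gtids="...", file="mysql-bin.000003", pos=1280, ts_sec=...}
// pos = 1024 + 256, the end of the last event processed in the committed transaction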

