Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave MySqlOffsetContext

From Leeroopedia
Revision as of 16:32, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Risingwavelabs_Risingwave_MySqlOffsetContext.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Property Value
Component CDC Source - Debezium MySQL Connector
Language Java
Lines 111
License Apache 2.0 (RisingWave Labs + Debezium Authors)
Repository risingwavelabs/risingwave

Overview

MySqlOffsetContext is a patched version of the Debezium MySqlOffsetContext class that extends BinlogOffsetContext<SourceInfo> to manage MySQL CDC offset state within RisingWave. This class tracks the binlog file name, position, GTID set, server ID, and other metadata needed to resume MySQL CDC streaming from a specific point.

The class provides two key capabilities:

  1. Static factory method (initial): Creates a fresh offset context starting from the beginning of the binlog, used for new CDC source creation.
  2. Loader inner class: Deserializes a previously saved offset map back into a fully reconstructed MySqlOffsetContext, used during failover recovery.

This file is placed in the io.debezium.connector.mysql package to override the upstream Debezium class, allowing RisingWave-specific customizations to MySQL offset handling.

Code Reference

Source Location

java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java

View on GitHub

Signature

public class MySqlOffsetContext extends BinlogOffsetContext<SourceInfo>

Key Methods

// Constructor accepting snapshot state, transaction context, incremental snapshot context, and source info
public MySqlOffsetContext(
        SnapshotType snapshot,
        boolean snapshotCompleted,
        TransactionContext transactionContext,
        IncrementalSnapshotContext<TableId> incrementalSnapshotContext,
        SourceInfo sourceInfo)

// Creates a fresh offset context starting from the beginning of the binlog
public static MySqlOffsetContext initial(MySqlConnectorConfig config)

Inner Class: Loader

public static class Loader extends BinlogOffsetContext.Loader<MySqlOffsetContext> {
    public Loader(MySqlConnectorConfig connectorConfig)

    // Reconstructs a MySqlOffsetContext from a saved offset map
    @Override
    public MySqlOffsetContext load(Map<String, ?> offset)
}

Imports

import static io.debezium.connector.common.OffsetUtils.longOffsetValue;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;

I/O Contract

Input

  • MySqlConnectorConfig: The MySQL connector configuration, used to determine read-only connection mode and to construct SourceInfo.
  • Offset map (for Loader): A Map<String, ?> containing previously persisted offset keys:
    • SourceInfo.BINLOG_FILENAME_OFFSET_KEY - The binlog filename (required)
    • SourceInfo.BINLOG_POSITION_OFFSET_KEY - The binlog position
    • EVENTS_TO_SKIP_OFFSET_KEY - Number of events to skip
    • SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY - Row position within the event
    • GTID_SET_KEY - The completed GTID set (optional)
    • SourceInfo.SERVER_ID_KEY - The server ID
    • TIMESTAMP_KEY - The offset timestamp

Output

  • initial(): Returns a new MySqlOffsetContext with binlog start point set to ("", 0L).
  • Loader.load(): Returns a fully populated MySqlOffsetContext reconstructed from the saved offset map.

Exceptions

  • ConnectException: Thrown by the Loader if the binlog filename is missing from the offset map.

Usage Examples

Creating an Initial Offset Context

MySqlConnectorConfig config = new MySqlConnectorConfig(connectorProperties);
MySqlOffsetContext offset = MySqlOffsetContext.initial(config);
// Offset starts from the beginning of the binlog ("", position 0)

Loading a Saved Offset for Recovery

MySqlConnectorConfig config = new MySqlConnectorConfig(connectorProperties);
MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(config);

Map<String, Object> savedOffset = // ... retrieved from offset backing store
MySqlOffsetContext offsetContext = loader.load(savedOffset);
// offsetContext now points to the exact binlog position for resuming CDC

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment