

Implementation:Risingwavelabs Risingwave BinlogHistoryRecordComparator

From Leeroopedia


Knowledge Sources
Domains: CDC, Connectors, MySQL, Java
Last Updated: 2026-02-09 07:00 GMT

Overview

Patched Debezium history record comparator for binlog-based CDC sources, used to determine the ordering of schema history records during recovery.

Description

BinlogHistoryRecordComparator is an abstract class that extends Debezium's HistoryRecordComparator to provide binlog-specific position comparison logic. It is used during schema history recovery to determine which schema history records are at or before a desired position in the binlog stream.

The core method isPositionAtOrBefore(Document recorded, Document desired) implements a multi-level comparison strategy:

  1. GTID-based comparison: When both positions have GTID sets, the comparator creates GtidSet objects and checks whether the recorded set is contained within the desired set. If the two GTID sets are equal, it compares the events-to-skip counts instead. An optional gtidSourceFilter predicate can filter GTID entries by server UUID.
  2. GTID presence asymmetry: If only the desired position has a GTID set, the recorded position is assumed to be older (at or before the desired one), since GTID mode is typically enabled and rarely disabled. If only the recorded position has a GTID set, it is assumed to be newer.
  3. Server ID comparison: When neither position has GTIDs and the server IDs differ, the binlog coordinates of the two servers are unrelated, so timestamps are compared as a fallback.
  4. Binlog file comparison: When the server IDs match, the binlog filenames are compared using the inner BinlogFileName class, which parses names like mysql-bin.000003 into a base name and a numeric extension for proper ordering.
  5. Position comparison: If the filenames are equal, the byte positions within that binlog file are compared.
  6. Event count and row number: If the byte positions are also equal, the events-to-skip count and then the row-in-event number provide the finest granularity; fully identical coordinates count as "at or before".
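The cascade above can be condensed into a short, self-contained sketch. It is not the RisingWave or Debezium source: Pos is a hypothetical flattened stand-in for the offset Document, fileIndex stands in for the parsed binlog file extension, and each GTID set is simplified to "transactions 1..gtidMax from a single server UUID" so that containment reduces to a <= check. Real GTID sets can hold several UUIDs and interval lists. Records require Java 16 or later.

```java
/**
 * Simplified sketch of the multi-level isPositionAtOrBefore cascade.
 * Pos and its fields are hypothetical; they are not Debezium offset keys.
 */
class CascadeSketch {

    // gtidMax == null means "no GTID set recorded" for this position.
    // Otherwise it stands for the GTID set {1..gtidMax} of one server UUID.
    // fileIndex stands for the numeric extension of the binlog filename.
    record Pos(Long gtidMax, int eventsToSkip, int serverId, long timestamp,
               long fileIndex, long position, int rowInEvent) {}

    static boolean isPositionAtOrBefore(Pos recorded, Pos desired) {
        if (desired.gtidMax() != null) {
            if (recorded.gtidMax() != null) {
                if (recorded.gtidMax().equals(desired.gtidMax())) {
                    // Identical GTID sets: fall back to the events-to-skip counts.
                    return recorded.eventsToSkip() <= desired.eventsToSkip();
                }
                // Simplified containment: {1..r} is contained in {1..d} iff r <= d.
                return recorded.gtidMax() <= desired.gtidMax();
            }
            // Only desired has GTIDs: assume recorded predates GTID enablement.
            return true;
        }
        if (recorded.gtidMax() != null) {
            // Only recorded has GTIDs: assume it is newer.
            return false;
        }
        if (recorded.serverId() != desired.serverId()) {
            // Different servers: coordinates are unrelated, so compare timestamps.
            return recorded.timestamp() <= desired.timestamp();
        }
        // Same server: file index, then byte position, then events to skip.
        int diff = Long.compare(recorded.fileIndex(), desired.fileIndex());
        if (diff != 0) {
            return diff < 0;
        }
        diff = Long.compare(recorded.position(), desired.position());
        if (diff != 0) {
            return diff < 0;
        }
        diff = Integer.compare(recorded.eventsToSkip(), desired.eventsToSkip());
        if (diff != 0) {
            return diff < 0;
        }
        // Row number is the last tie-breaker; equal coordinates count as "at or before".
        return recorded.rowInEvent() <= desired.rowInEvent();
    }

    public static void main(String[] args) {
        Pos earlier = new Pos(null, 0, 1, 100L, 3, 1024, 0);
        Pos later = new Pos(null, 0, 1, 200L, 3, 2048, 0);
        System.out.println(isPositionAtOrBefore(earlier, later)); // true
        System.out.println(isPositionAtOrBefore(later, earlier)); // false
    }
}
```

Returning early at the first level that differs keeps each level from overriding a decision already made by a coarser one.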

The inner BinlogFileName class parses binlog filenames by splitting on the last dot into a base name and a numeric extension, and orders files by the numeric value of that extension rather than by plain string comparison.
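A minimal sketch of that parsing and ordering rule, assuming the behaviour described above. BinlogFileNameSketch is a hypothetical stand-in rather than the actual inner class, and rejecting filenames with different base names is an assumption about how mismatches are handled.

```java
/**
 * Hypothetical stand-in for the inner BinlogFileName class:
 * split on the last dot, then compare by numeric extension.
 */
final class BinlogFileNameSketch implements Comparable<BinlogFileNameSketch> {
    private final String baseName;
    private final long extension;

    private BinlogFileNameSketch(String baseName, long extension) {
        this.baseName = baseName;
        this.extension = extension;
    }

    static BinlogFileNameSketch of(String filename) {
        int dot = filename.lastIndexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("No extension in binlog filename: " + filename);
        }
        // "mysql-bin.000003" -> baseName "mysql-bin", extension 3
        return new BinlogFileNameSketch(filename.substring(0, dot),
                Long.parseLong(filename.substring(dot + 1)));
    }

    @Override
    public int compareTo(BinlogFileNameSketch other) {
        if (!baseName.equals(other.baseName)) {
            // Assumption: files from different base names are not comparable.
            throw new IllegalArgumentException("Different binlog base names: "
                    + baseName + " vs " + other.baseName);
        }
        return Long.compare(extension, other.extension);
    }

    public static void main(String[] args) {
        // Numeric comparison stays correct if the extension ever gains a digit,
        // whereas plain string comparison puts "999999" after "1000000".
        System.out.println(of("mysql-bin.999999").compareTo(of("mysql-bin.1000000")) < 0); // true
        System.out.println("mysql-bin.999999".compareTo("mysql-bin.1000000") < 0);         // false
    }
}
```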

Usage

This abstract class is extended by MySQL-specific and MariaDB-specific comparators. It is provided to the schema history implementation during configuration and used during schema recovery to replay only the relevant schema changes.
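As a rough illustration of how a recovery loop could use such a comparator, the sketch below replays only the history entries whose position is at or before the desired offset. HistoryEntry, recover, and the plain numeric positions are hypothetical simplifications; the real schema history works with Document positions and Debezium's own recovery code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

/**
 * Hypothetical recovery loop: keep the DDL of every history entry whose
 * position is at or before the desired offset, in recorded order.
 */
class RecoverySketch {

    record HistoryEntry(long position, String ddl) {}

    static List<String> recover(List<HistoryEntry> history, long desired,
                                BiPredicate<Long, Long> isPositionAtOrBefore) {
        List<String> replayed = new ArrayList<>();
        for (HistoryEntry entry : history) {
            if (isPositionAtOrBefore.test(entry.position(), desired)) {
                replayed.add(entry.ddl());
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<HistoryEntry> history = List.of(
                new HistoryEntry(100, "CREATE TABLE t (id INT)"),
                new HistoryEntry(200, "ALTER TABLE t ADD COLUMN name TEXT"),
                new HistoryEntry(300, "ALTER TABLE t DROP COLUMN name"));
        // Stand-in comparator: plain numeric ordering of positions.
        System.out.println(recover(history, 250, (r, d) -> r <= d));
        // [CREATE TABLE t (id INT), ALTER TABLE t ADD COLUMN name TEXT]
    }
}
```

Passing the comparison in as a BiPredicate mirrors the idea that the comparator is supplied to the schema history during configuration rather than hard-coded into the recovery logic.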

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/binlog/history/BinlogHistoryRecordComparator.java (L41-304)

Signature

public abstract class BinlogHistoryRecordComparator extends HistoryRecordComparator {

    public BinlogHistoryRecordComparator(
            Predicate<String> gtidSourceFilter, GtidSetFactory gtidSetFactory);

    @Override
    public boolean isPositionAtOrBefore(Document recorded, Document desired);

    protected String getGtidSet(Document document);
    protected int getServerId(Document document);
    protected boolean isSnapshot(Document document);
    protected long getTimestamp(Document document);
    protected BinlogFileName getBinlogFileName(Document document);
    protected int getBinlogPosition(Document document);
    protected int getEventsToSkip(Document document);
    protected int getBinlogRowInEvent(Document document);
}

Import

import io.debezium.connector.binlog.history.BinlogHistoryRecordComparator;
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.connector.binlog.BinlogSourceInfo;
import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.document.Document;
import io.debezium.relational.history.HistoryRecordComparator;

I/O Contract

Inputs

  • gtidSourceFilter (Predicate<String>, optional): Filter that selects the relevant GTID source UUIDs
  • gtidSetFactory (GtidSetFactory, required): Factory for creating GtidSet instances from their string representations
  • recorded (Document, required for isPositionAtOrBefore): The position from a recorded schema history entry
  • desired (Document, required for isPositionAtOrBefore): The target position to compare against
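The gtidSourceFilter input is an ordinary Predicate<String> over server UUIDs. The sketch below builds one from regular expressions (similar in spirit to the MySQL connector's gtid.source.includes option) and applies it to a GTID set string of the form uuid:intervals,uuid:intervals. Both helper names are hypothetical, and this is not how Debezium's GtidSet implements filtering.

```java
import java.util.Arrays;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Hypothetical helpers: build a UUID filter from regexes and keep only the
 * matching "uuid:intervals" entries of a GTID set string.
 */
class GtidFilterSketch {

    // Accept a UUID if it fully matches any of the given regular expressions.
    static Predicate<String> includeList(String... regexes) {
        Predicate<String> result = uuid -> false;
        for (String regex : regexes) {
            Pattern p = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
            result = result.or(uuid -> p.matcher(uuid).matches());
        }
        return result;
    }

    // Keep only the "uuid:intervals" entries whose UUID passes the filter.
    static String retainMatching(String gtidSet, Predicate<String> filter) {
        return Arrays.stream(gtidSet.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .filter(entry -> filter.test(entry.substring(0, entry.indexOf(':'))))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        String gtids = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5,"
                + "AAAAAAAA-0000-0000-0000-000000000001:1-3";
        Predicate<String> filter = includeList("3E11FA47-71CA-11E1-9E33-C80AA9429562");
        System.out.println(retainMatching(gtids, filter));
        // 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5
    }
}
```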

Outputs

  • isPositionAtOrBefore (boolean): Returns true if the recorded position is at or before the desired position in the binlog stream

Usage Examples

Position Comparison with GTID Sets

// Comparator created by the MySQL connector with a GTID filter and factory
// (gtidSourceFilter and gtidSetFactory are assumed to be in scope)
BinlogHistoryRecordComparator comparator =
        new MySqlHistoryRecordComparator(gtidSourceFilter, gtidSetFactory);

// Compare two positions during schema history recovery
Document recorded = Document.create();
recorded.set("gtids", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5");
recorded.set("file", "mysql-bin.000003");
recorded.set("pos", 1024);

Document desired = Document.create();
desired.set("gtids", "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-10");
desired.set("file", "mysql-bin.000003");
desired.set("pos", 2048);

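boolean result = comparator.isPositionAtOrBefore(recorded, desired);
// result == true: both positions carry GTID sets, the sets differ, and the recorded
// set (1-5) is contained within the desired set (1-10); file and pos are not consulted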
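The result above follows from interval containment: every transaction range recorded for a server UUID must fall inside a range of the desired set for the same UUID. The simplified check below illustrates that rule. It is not Debezium's GtidSet, it ignores newer GTID formats, and it assumes the desired intervals are already merged, as MySQL normally reports them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified GTID-set containment check (hypothetical helper, not GtidSet).
 */
class GtidContainmentSketch {

    // Parse "uuid:1-5:7,uuid2:1-3" into UUID -> list of closed intervals [start, end].
    static Map<String, List<long[]>> parse(String gtidSet) {
        Map<String, List<long[]>> result = new HashMap<>();
        for (String entry : gtidSet.split(",")) {
            String[] parts = entry.trim().split(":");
            List<long[]> intervals = new ArrayList<>();
            for (int i = 1; i < parts.length; i++) {
                String[] bounds = parts[i].split("-");
                long start = Long.parseLong(bounds[0]);
                long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
                intervals.add(new long[] {start, end});
            }
            result.put(parts[0].toUpperCase(), intervals);
        }
        return result;
    }

    // True if every recorded interval lies inside one desired interval of the same UUID.
    static boolean isContainedWithin(String recorded, String desired) {
        Map<String, List<long[]>> rec = parse(recorded);
        Map<String, List<long[]>> des = parse(desired);
        for (Map.Entry<String, List<long[]>> e : rec.entrySet()) {
            List<long[]> candidates = des.get(e.getKey());
            if (candidates == null) {
                return false; // UUID never seen by the desired position
            }
            for (long[] r : e.getValue()) {
                boolean covered = candidates.stream()
                        .anyMatch(d -> d[0] <= r[0] && r[1] <= d[1]);
                if (!covered) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String uuid = "3E11FA47-71CA-11E1-9E33-C80AA9429562";
        System.out.println(isContainedWithin(uuid + ":1-5", uuid + ":1-10")); // true
        System.out.println(isContainedWithin(uuid + ":1-10", uuid + ":1-5")); // false
    }
}
```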

BinlogFileName Parsing

// Parse and compare binlog filenames
BinlogFileName file1 = BinlogFileName.of("mysql-bin.000003");
BinlogFileName file2 = BinlogFileName.of("mysql-bin.000005");
int comparison = file1.compareTo(file2);
// comparison < 0 (file1 is before file2)

Related Pages

Implements Principle

Requires Environment
