Implementation:Risingwavelabs Risingwave OpendalSchemaHistory

From Leeroopedia
Revision as of 16:32, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Risingwavelabs_Risingwave_OpendalSchemaHistory.md)


Knowledge Sources
Domains CDC, Connectors, Storage, Java
Last Updated 2026-02-09 07:00 GMT

Overview

OpendalSchemaHistory is a Debezium schema history implementation backed by OpenDAL object storage. It persists schema change records so that schema tracking survives CDC source restarts.

Description

OpendalSchemaHistory extends Debezium's AbstractFileBasedSchemaHistory to store and retrieve schema history records using RisingWave's JNI-based object store interface (putObject, getObject, listObject from the Binding class). This enables durable schema history storage for MySQL CDC sources using RisingWave's cloud-native object storage layer (such as S3, HDFS, or local filesystem via Hummock).

Schema history records are organized into numbered files following the pattern schema_history_{sequence}.dat under the directory rw-cdc-schema-history/source-{sourceId}. Each file can hold up to maxRecordsPerFile records (default 2048) before a new file is created with an incremented sequence number.
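The naming scheme above can be sketched as a small path helper. This is a minimal illustration, not the RisingWave code: the class name SchemaHistoryPaths and its methods are hypothetical, and the source ID is assumed to be passed as a string, as in the configuration property.

```java
/** Hypothetical helper illustrating the layout described above; not the RisingWave implementation. */
final class SchemaHistoryPaths {
    static final String ROOT_DIR = "rw-cdc-schema-history";
    static final int DEFAULT_MAX_RECORDS_PER_FILE = 2048;

    /** Directory that holds all history files of one CDC source. */
    static String sourceDir(String sourceId) {
        return ROOT_DIR + "/source-" + sourceId;
    }

    /** Full object key of the history file with the given sequence number. */
    static String filePath(String sourceId, long sequence) {
        return sourceDir(sourceId) + "/schema_history_" + sequence + ".dat";
    }

    /** Number of files needed for recordCount records (ceiling division). */
    static long filesNeeded(long recordCount, int maxRecordsPerFile) {
        if (recordCount < 0 || maxRecordsPerFile <= 0) {
            throw new IllegalArgumentException("recordCount must be >= 0 and maxRecordsPerFile > 0");
        }
        return (recordCount + maxRecordsPerFile - 1) / maxRecordsPerFile;
    }
}
```

With the default limit of 2048, a source holding 2049 records would span two files under this scheme: the first full and the second containing a single record.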

The class employs a caching strategy to minimize object store operations: the latest file path and its deserialized records are cached in memory. When appending new records, if the cached file has capacity, the record is added to the cached list and the entire file is rewritten. When the file is full, a new file is created with the next sequence number.
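The append path described above might look roughly like the following sketch. It makes several assumptions that are not confirmed by the source: an in-memory map stands in for the object store, records are plain strings rather than HistoryRecord instances, sequence numbers start at 1, and the class name CachedAppendSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the cached append strategy; an in-memory map replaces the object store. */
final class CachedAppendSketch {
    private final Map<String, List<String>> objectStore = new LinkedHashMap<>();
    private final String dir;
    private final int maxRecordsPerFile;
    private long latestSequence = 0;                        // assumption: 0 means no file written yet
    private List<String> cachedRecords = new ArrayList<>(); // records of the latest file

    CachedAppendSketch(String dir, int maxRecordsPerFile) {
        if (maxRecordsPerFile <= 0) {
            throw new IllegalArgumentException("maxRecordsPerFile must be positive");
        }
        this.dir = dir;
        this.maxRecordsPerFile = maxRecordsPerFile;
    }

    void append(String record) {
        boolean noFileYet = latestSequence == 0;
        boolean fileFull = cachedRecords.size() >= maxRecordsPerFile;
        if (noFileYet || fileFull) {
            // Roll over: start a new file with the next sequence number.
            latestSequence++;
            cachedRecords = new ArrayList<>();
        }
        cachedRecords.add(record);
        // Rewrite the whole latest file from the cache; no read from the store is needed.
        objectStore.put(latestPath(), new ArrayList<>(cachedRecords));
    }

    String latestPath() {
        return dir + "/schema_history_" + latestSequence + ".dat";
    }

    Map<String, List<String>> storedObjects() {
        return objectStore;
    }
}
```

In this sketch every append costs one write and zero reads, which is the benefit of caching the latest file. The trade-off is that each write re-sends the full contents of that file, so the per-file record limit bounds the size of a single write.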

On startup (doStart()), all existing history files are listed, sorted by sequence number, and loaded sequentially. The cache is initialized from the last file to avoid redundant reads during subsequent writes.
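The startup sequence can be illustrated with the sketch below. It is not the actual doStart() body: the class StartupLoadSketch, the regular expression, and the map standing in for the object store are all assumptions made for illustration. The point it shows is that files need to be ordered by their numeric sequence, since a plain string sort would place schema_history_10.dat before schema_history_2.dat.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of listing, sorting, and loading history files on startup. */
final class StartupLoadSketch {
    private static final Pattern FILE_NAME = Pattern.compile("schema_history_(\\d+)\\.dat$");

    /** Extracts the numeric sequence from a history file path. */
    static long sequenceOf(String path) {
        Matcher m = FILE_NAME.matcher(path);
        if (!m.find()) {
            throw new IllegalArgumentException("not a schema history file: " + path);
        }
        return Long.parseLong(m.group(1));
    }

    /** Returns a copy of the paths ordered by numeric sequence, not by string comparison. */
    static List<String> sortBySequence(List<String> paths) {
        List<String> sorted = new ArrayList<>(paths);
        sorted.sort(Comparator.comparingLong(StartupLoadSketch::sequenceOf));
        return sorted;
    }

    /** Loads every file in sequence order and concatenates the records. */
    static List<String> loadAll(Map<String, List<String>> store) {
        List<String> all = new ArrayList<>();
        for (String path : sortBySequence(new ArrayList<>(store.keySet()))) {
            all.addAll(store.get(path));
        }
        return all;
    }
}
```

After such a load, the records of the last path in the sorted list would be the natural candidates for the in-memory cache, since that file is the one subsequent appends target.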

Usage

This class is configured as the schema history implementation for MySQL CDC sources in RisingWave. It is referenced via Debezium configuration properties and is automatically used when RisingWave creates a CDC source for MySQL or MariaDB databases.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/OpendalSchemaHistory.java (L47-354)

Signature

public class OpendalSchemaHistory extends AbstractFileBasedSchemaHistory {

    public static final String SOURCE_ID = "schema.history.internal.source.id";
    public static final String MAX_RECORDS_PER_FILE_CONFIG =
            "schema.history.internal.max.records.per.file";

    @Override
    public void configure(Configuration config, HistoryRecordComparator comparator,
                          SchemaHistoryListener listener, boolean useCatalogBeforeSchema);

    @Override
    protected void doStart();

    @Override
    protected void doStoreRecord(HistoryRecord record);

    @Override
    public boolean storageExists();

    @Override
    public void initializeStorage();
}

Import

import com.risingwave.connector.cdc.debezium.internal.OpendalSchemaHistory;
import io.debezium.relational.history.AbstractFileBasedSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistoryException;

I/O Contract

Inputs

Name | Type | Required | Description
config | Configuration | Yes | Debezium configuration containing schema.history.internal.source.id and optional schema.history.internal.max.records.per.file
comparator | HistoryRecordComparator | Yes | Comparator for ordering history records during recovery
record | HistoryRecord | Yes (for doStoreRecord) | A schema change history record to persist

Outputs

Name | Type | Description
records | List<HistoryRecord> | All loaded history records available after doStart() for schema recovery
storageExists | boolean | Returns true if at least one schema history file exists in object storage
Side effect | Object store writes | Schema history files are written to rw-cdc-schema-history/source-{id}/schema_history_{seq}.dat

Usage Examples

Debezium Configuration for OpenDAL Schema History

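import java.util.Properties;

// Configuration properties set by RisingWave CDC engine
Properties props = new Properties();
// Fully qualified class name of the schema history implementation
props.put("schema.history.internal", "com.risingwave.connector.cdc.debezium.internal.OpendalSchemaHistory");
// Source ID; history files are stored under rw-cdc-schema-history/source-12345
props.put("schema.history.internal.source.id", "12345");
// Optional: maximum number of records per history file (default 2048)
props.put("schema.history.internal.max.records.per.file", "2048");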

Object Store File Layout

rw-cdc-schema-history/
  source-12345/
    schema_history_1.dat   -- up to 2048 records
    schema_history_2.dat   -- next batch of records
    schema_history_3.dat   -- latest file (cached in memory)

Related Pages

Implements Principle

Requires Environment
