Implementation:Risingwavelabs Risingwave OpendalSchemaHistory
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, Storage, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Debezium schema history implementation backed by OpenDAL object storage, enabling persistent schema change tracking across CDC source restarts.
Description
OpendalSchemaHistory extends Debezium's AbstractFileBasedSchemaHistory and stores and retrieves schema history records through RisingWave's JNI-based object store interface (the putObject, getObject, and listObject methods of the Binding class). This gives MySQL CDC sources durable schema history storage on the object storage layer used by Hummock, RisingWave's cloud-native storage engine (for example S3, HDFS, or the local filesystem).
Schema history records are stored in numbered files named schema_history_{sequence}.dat under the directory rw-cdc-schema-history/source-{sourceId}. Each file holds at most maxRecordsPerFile records (default 2048, configurable via schema.history.internal.max.records.per.file). Once a file is full, the next record goes into a new file whose sequence number is incremented by one.
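The naming scheme can be sketched as a small path helper. This is a minimal sketch, not the class's actual code: `SchemaHistoryPaths` and its methods are hypothetical names, and only the directory and file-name pattern come from the description above. Sorting compares parsed sequence numbers rather than strings, because a plain string sort would place `schema_history_10.dat` before `schema_history_2.dat`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper illustrating the documented naming scheme:
//   rw-cdc-schema-history/source-{sourceId}/schema_history_{sequence}.dat
final class SchemaHistoryPaths {
    static final String ROOT = "rw-cdc-schema-history";
    static final String PREFIX = "schema_history_";
    static final String SUFFIX = ".dat";

    private SchemaHistoryPaths() {}

    /** Directory holding all history files for one CDC source. */
    static String sourceDir(long sourceId) {
        return ROOT + "/source-" + sourceId;
    }

    /** Full object path of the file with the given sequence number. */
    static String filePath(long sourceId, long sequence) {
        return sourceDir(sourceId) + "/" + PREFIX + sequence + SUFFIX;
    }

    /** Extracts the sequence number from a path, or -1 if the name does not match the pattern. */
    static long parseSequence(String path) {
        String name = path.substring(path.lastIndexOf('/') + 1);
        if (!name.startsWith(PREFIX) || !name.endsWith(SUFFIX)) {
            return -1;
        }
        String digits = name.substring(PREFIX.length(), name.length() - SUFFIX.length());
        try {
            return Long.parseLong(digits);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Drops non-matching paths and sorts the rest numerically by sequence number. */
    static List<String> sortBySequence(List<String> paths) {
        List<String> sorted = new ArrayList<>(paths);
        sorted.removeIf(p -> parseSequence(p) < 0);
        sorted.sort(Comparator.comparingLong(SchemaHistoryPaths::parseSequence));
        return sorted;
    }
}
```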
The class uses a caching strategy to minimize object store operations: the path of the latest file and its deserialized records are kept in memory. When a new record is appended and the cached file still has capacity, the record is added to the cached list and the entire file is rewritten from that list, so the file never has to be read back. When the cached file is full, the record is written to a new file with the next sequence number, which then becomes the cached file.
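A minimal sketch of the append path follows. It assumes a hypothetical `ObjectStore` interface in place of the JNI `Binding` calls, plain string records, and newline-joined file contents; the real class persists Debezium `HistoryRecord` documents. `CachedAppender` and `InMemoryStore` are likewise illustrative names. The sketch shows why the cache saves reads: appending only ever calls `put`, rewriting the whole latest file from the in-memory list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the JNI object store calls.
interface ObjectStore {
    void put(String path, String content);
    String get(String path);
}

// In-memory store that also counts writes, for illustration.
final class InMemoryStore implements ObjectStore {
    final Map<String, String> objects = new HashMap<>();
    int puts = 0;

    public void put(String path, String content) {
        objects.put(path, content);
        puts++;
    }

    public String get(String path) {
        return objects.get(path);
    }
}

final class CachedAppender {
    private final ObjectStore store;
    private final String dir;
    private final int maxRecordsPerFile;
    private long cachedSequence = 0;                          // 0 means no file written yet
    private final List<String> cachedRecords = new ArrayList<>(); // contents of the latest file

    CachedAppender(ObjectStore store, String dir, int maxRecordsPerFile) {
        this.store = store;
        this.dir = dir;
        this.maxRecordsPerFile = maxRecordsPerFile;
    }

    void append(String record) {
        if (cachedSequence == 0 || cachedRecords.size() >= maxRecordsPerFile) {
            // No file yet, or the cached file is full: start the next numbered file.
            cachedSequence++;
            cachedRecords.clear();
        }
        cachedRecords.add(record);
        // Rewrite the whole latest file from the cache; nothing is read from the store.
        store.put(path(cachedSequence), String.join("\n", cachedRecords));
    }

    String path(long sequence) {
        return dir + "/schema_history_" + sequence + ".dat";
    }
}
```

Rewriting the full file on every append trades extra write volume for zero reads, which stays bounded because a file never exceeds maxRecordsPerFile records.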
On startup (doStart()), all existing history files are listed, sorted by sequence number, and loaded in order. The cache is then initialized from the last file, so subsequent writes do not need to read it again.
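The startup sequence can be sketched the same way. Here a `Map` stands in for the object store (the real class lists and reads objects through `listObject` and `getObject`), `HistoryRecovery` is a hypothetical name, and each file is assumed to hold newline-separated string records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the documented doStart() flow: list, sort, load, cache the last file.
final class HistoryRecovery {
    final List<String> allRecords = new ArrayList<>();    // every record, in file order
    long cachedSequence = 0;                              // 0 means no history files exist yet
    final List<String> cachedRecords = new ArrayList<>(); // contents of the latest file

    void start(Map<String, String> store, String dir) {
        // "List": keep only this source's history files.
        List<String> paths = new ArrayList<>();
        for (String key : store.keySet()) {
            if (key.startsWith(dir + "/schema_history_") && key.endsWith(".dat")) {
                paths.add(key);
            }
        }
        // "Sort": numerically by sequence number, not lexicographically.
        paths.sort(Comparator.comparingLong(HistoryRecovery::sequenceOf));
        // "Load": replay files in order; the cache ends up holding the last file.
        for (String path : paths) {
            String content = store.get(path);
            List<String> records = new ArrayList<>();
            if (!content.isEmpty()) {
                records.addAll(Arrays.asList(content.split("\n")));
            }
            allRecords.addAll(records);
            cachedSequence = sequenceOf(path);
            cachedRecords.clear();
            cachedRecords.addAll(records);
        }
    }

    static long sequenceOf(String path) {
        String name = path.substring(path.lastIndexOf('/') + 1);
        return Long.parseLong(name.substring("schema_history_".length(), name.length() - ".dat".length()));
    }
}
```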
Usage
This class is the schema history implementation for MySQL CDC sources in RisingWave. It is selected through Debezium configuration properties and is used automatically when RisingWave creates a CDC source for a MySQL or MariaDB database.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/OpendalSchemaHistory.java (L47-354)
Signature
```java
public class OpendalSchemaHistory extends AbstractFileBasedSchemaHistory {
    public static final String SOURCE_ID = "schema.history.internal.source.id";
    public static final String MAX_RECORDS_PER_FILE_CONFIG =
            "schema.history.internal.max.records.per.file";

    // Method bodies omitted; signatures only.
    @Override
    public void configure(Configuration config, HistoryRecordComparator comparator,
            SchemaHistoryListener listener, boolean useCatalogBeforeSchema);

    @Override
    protected void doStart();

    @Override
    protected void doStoreRecord(HistoryRecord record);

    @Override
    public boolean storageExists();

    @Override
    public void initializeStorage();
}
```
Import
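```java
import com.risingwave.connector.cdc.debezium.internal.OpendalSchemaHistory;
import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractFileBasedSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
```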
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | Configuration | Yes | Debezium configuration containing schema.history.internal.source.id and optional schema.history.internal.max.records.per.file |
| comparator | HistoryRecordComparator | Yes | Comparator for ordering history records during recovery |
| record | HistoryRecord | Yes (for doStoreRecord) | A schema change history record to persist |
Outputs
| Name | Type | Description |
|---|---|---|
| records | List<HistoryRecord> | All loaded history records available after doStart() for schema recovery |
| storageExists | boolean | Returns true if at least one schema history file exists in object storage |
| Side effect | Object store writes | Schema history files are written to rw-cdc-schema-history/source-{id}/schema_history_{seq}.dat |
Usage Examples
Debezium Configuration for OpenDAL Schema History
```java
import java.util.Properties;

// Configuration properties set by the RisingWave CDC engine
Properties props = new Properties();
props.setProperty("schema.history.internal", "com.risingwave.connector.cdc.debezium.internal.OpendalSchemaHistory");
props.setProperty("schema.history.internal.source.id", "12345");
props.setProperty("schema.history.internal.max.records.per.file", "2048");
```
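How the two keys might be read can be sketched with `java.util.Properties` standing in for Debezium's `Configuration`. `SchemaHistorySettings` is a hypothetical class, and its validation rules (source ID required, limit must be positive) are assumptions; only the key names and the 2048 default come from this page.

```java
import java.util.Properties;

// Hypothetical settings holder; the real class reads these keys from a Debezium Configuration.
final class SchemaHistorySettings {
    static final String SOURCE_ID = "schema.history.internal.source.id";
    static final String MAX_RECORDS_PER_FILE_CONFIG = "schema.history.internal.max.records.per.file";
    static final int DEFAULT_MAX_RECORDS_PER_FILE = 2048;

    final String sourceId;
    final int maxRecordsPerFile;

    SchemaHistorySettings(Properties props) {
        String id = props.getProperty(SOURCE_ID);
        if (id == null || id.trim().isEmpty()) {
            // Assumed: the source ID is mandatory because it names the storage directory.
            throw new IllegalArgumentException(SOURCE_ID + " is required");
        }
        this.sourceId = id.trim();

        // Optional key: fall back to the documented default when absent.
        String max = props.getProperty(MAX_RECORDS_PER_FILE_CONFIG);
        this.maxRecordsPerFile = max == null ? DEFAULT_MAX_RECORDS_PER_FILE : Integer.parseInt(max.trim());
        if (maxRecordsPerFile <= 0) {
            throw new IllegalArgumentException(MAX_RECORDS_PER_FILE_CONFIG + " must be positive");
        }
    }

    /** Directory where this source's history files live. */
    String directory() {
        return "rw-cdc-schema-history/source-" + sourceId;
    }
}
```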
Object Store File Layout
```
rw-cdc-schema-history/
  source-12345/
    schema_history_1.dat   -- up to 2048 records
    schema_history_2.dat   -- next batch of records
    schema_history_3.dat   -- latest file (cached in memory)
```
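The layout above follows from simple arithmetic on the per-file limit. This is a minimal sketch, assuming files are filled strictly in order, so that file k is created only once file k-1 holds maxRecordsPerFile records; `LayoutMath` is a hypothetical helper name.

```java
// Hypothetical helper: which file a record lands in, and how full the latest file is.
final class LayoutMath {
    /** Number of files needed for totalRecords records (0 records -> 0 files). */
    static long fileCount(long totalRecords, int maxRecordsPerFile) {
        return (totalRecords + maxRecordsPerFile - 1) / maxRecordsPerFile; // ceiling division
    }

    /** 1-based sequence number of the file holding the 0-based record index. */
    static long sequenceFor(long recordIndex, int maxRecordsPerFile) {
        return recordIndex / maxRecordsPerFile + 1;
    }

    /** Records held by the latest file, i.e. the one kept in the in-memory cache. */
    static long recordsInLastFile(long totalRecords, int maxRecordsPerFile) {
        long files = fileCount(totalRecords, maxRecordsPerFile);
        return files == 0 ? 0 : totalRecords - (files - 1) * maxRecordsPerFile;
    }

    public static void main(String[] args) {
        // 5000 records at the default limit of 2048: files 1 and 2 are full, file 3 holds the rest.
        System.out.println(fileCount(5000, 2048));         // 3
        System.out.println(sequenceFor(4096, 2048));       // 3
        System.out.println(recordsInLastFile(5000, 2048)); // 904
    }
}
```

With exactly 4096 records, only two files exist and both are full; the third file appears on the 4097th append.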