Implementation:Risingwavelabs Risingwave NoDataRecoverySnapshotter
| Property | Value |
|---|---|
| Component | CDC Source - Debezium Internal |
| Language | Java |
| Lines | 54 |
| License | Apache 2.0 |
| Repository | risingwavelabs/risingwave |
Overview
NoDataRecoverySnapshotter is a custom Debezium snapshotter that extends NoDataSnapshotter to recover schema metadata without performing a full data snapshot. Debezium's stock `no_data` snapshot mode never snapshots table data. RisingWave also needs to rebuild schema history metadata when it is missing (for example, for MySQL sources, whose schemas are historized) without re-reading every data row.
This snapshotter overrides `shouldSnapshotSchema` so that, for historized schemas, it returns `true` when no offset has been established, when a snapshot is in progress, or when the schema history does not yet exist. This lets Debezium rebuild its internal schema representation. It also overrides `shouldSnapshotOnSchemaError` to always return `true`, so that a schema error triggers a recovery snapshot instead of a failure.
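The decision rules above can be captured in a small, self-contained Java model. This is a sketch, not the RisingWave source: the class `NoDataRecoveryModel` and its two constructor flags are hypothetical stand-ins for what the real snapshotter reads from the `DatabaseSchema` bean. No Debezium types are used, so the example compiles on its own.

```java
/**
 * Self-contained model of the NoDataRecoverySnapshotter decision rules.
 * The constructor flags are hypothetical stand-ins for what the real class
 * reads from Debezium's DatabaseSchema bean; no Debezium types are used.
 */
public class NoDataRecoveryModel {
    private final boolean historized;    // e.g. true for a MySQL source
    private final boolean historyExists; // does schema history storage exist?

    public NoDataRecoveryModel(boolean historized, boolean historyExists) {
        this.historized = historized;
        this.historyExists = historyExists;
    }

    public String name() {
        return "no_data_recovery";
    }

    // "No data" behaviour: table rows are never snapshotted.
    public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
        return false;
    }

    public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
        if (!historized) {
            return false; // no schema history to recover
        }
        if (!offsetExists || snapshotInProgress) {
            return true; // fresh start, or a snapshot that did not finish
        }
        return !historyExists; // offset present: snapshot only to rebuild missing history
    }

    // A schema error triggers a recovery snapshot instead of a failure.
    public boolean shouldSnapshotOnSchemaError() {
        return true;
    }

    public static void main(String[] args) {
        NoDataRecoveryModel intact = new NoDataRecoveryModel(true, true);
        NoDataRecoveryModel lost = new NoDataRecoveryModel(true, false);
        System.out.println(intact.shouldSnapshotSchema(true, false)); // false
        System.out.println(lost.shouldSnapshotSchema(true, false));   // true
    }
}
```

Modeling the bean state as plain constructor arguments keeps the branching visible. The real class obtains the same two facts from the bean registry at call time.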
Code Reference
Source Location
java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/NoDataRecoverySnapshotter.java
Signature
```java
public class NoDataRecoverySnapshotter extends NoDataSnapshotter
```
Key Methods
```java
// Returns the snapshotter name identifier: "no_data_recovery"
@Override
public String name()

// Determines whether schema should be snapshotted based on offset existence and schema history
@Override
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress)

// Always returns true to recover from schema errors
@Override
public boolean shouldSnapshotOnSchemaError()
```
Imports
```java
import io.debezium.bean.StandardBeanNames;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.snapshot.mode.NoDataSnapshotter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
```
I/O Contract
Input
- offsetExists (boolean): Whether a previously stored offset exists for this CDC source.
- snapshotInProgress (boolean): Whether a snapshot is currently in progress.
- beanRegistry: Inherited from the parent class; provides access to the `DatabaseSchema` bean, which is used to check whether the schema is historized and whether schema history exists.
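Reading these facts involves two steps: a name-based bean lookup, then a downcast to the historized schema type, because only that type can report whether history exists. The sketch below models the pattern with hypothetical types (`Schema`, `HistorizedSchema`, `Registry`) and a hypothetical bean name `"schema"`. They only mirror the roles of `DatabaseSchema`, `HistorizedDatabaseSchema`, and the inherited bean registry. They are not Debezium APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class BeanLookupSketch {
    // Hypothetical stand-in for DatabaseSchema.
    interface Schema {
        boolean isHistorized();
    }

    // Hypothetical stand-in for HistorizedDatabaseSchema.
    interface HistorizedSchema extends Schema {
        boolean historyExists();
    }

    // Hypothetical name-keyed registry; not Debezium's bean registry.
    static final class Registry {
        private final Map<String, Object> beans = new HashMap<>();

        void add(String name, Object bean) {
            beans.put(name, bean);
        }

        // Typed lookup: returns null if absent, throws ClassCastException on a type mismatch.
        <T> T lookupByName(String name, Class<T> type) {
            return type.cast(beans.get(name));
        }
    }

    static final String SCHEMA_BEAN = "schema"; // hypothetical bean name

    // True only for a historized schema whose history storage is missing.
    static boolean historyNeedsRebuild(Registry registry) {
        Schema schema = registry.lookupByName(SCHEMA_BEAN, Schema.class);
        if (schema == null || !schema.isHistorized()) {
            return false; // nothing registered, or nothing to rebuild
        }
        // Only historized schemas expose historyExists(), hence the downcast.
        return !((HistorizedSchema) schema).historyExists();
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.add(SCHEMA_BEAN, new HistorizedSchema() {
            public boolean isHistorized() { return true; }
            public boolean historyExists() { return false; }
        });
        System.out.println(historyNeedsRebuild(registry)); // true
    }
}
```

Checking `isHistorized()` before the cast guards the downcast: non-historized schemas short-circuit to `false` and never reach it.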
Output
- shouldSnapshotSchema:
  - Returns `true` if the database schema is historized AND either no offset exists or a snapshot is in progress.
  - Returns `true` if the database schema is historized AND the schema history storage does not yet exist.
  - Returns `false` if the database schema is not historized, or if it is historized, an offset exists, no snapshot is in progress, and the schema history already exists.
- shouldSnapshotOnSchemaError: Always returns `true`.
- name(): Returns the string `"no_data_recovery"`.
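The `shouldSnapshotSchema` conditions above reduce to a single boolean expression. The symbols are introduced here for illustration: H means "the schema is historized", O is `offsetExists`, S is `snapshotInProgress`, and E means "schema history exists".

```latex
\mathrm{shouldSnapshotSchema}(O, S) = H \land \left( \lnot O \lor S \lor \lnot E \right)
```

For a historized schema, the only combination that yields `false` is O = true, S = false, E = true: an offset is present, no snapshot is running, and the history is intact.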
Side Effects
- Logging: Logs the result of `shouldSnapshotSchema` at DEBUG level when it checks whether schema history exists.
Usage Examples
Registering the Snapshotter in Debezium Configuration
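```java
// In the Debezium connector configuration (props is a java.util.Properties)
props.put("snapshot.mode", "custom");                       // use a custom snapshotter
props.put("snapshot.mode.custom.name", "no_data_recovery"); // must match name()
// The snapshotter class is discovered via Debezium's SPI mechanism
```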
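Conceptually, the configured custom name is matched against each available snapshotter's `name()`. The sketch below models only that selection step. It uses a hypothetical `NamedSnapshotter` interface, a hypothetical `named` helper, and a hard-coded candidate list. In Debezium the candidates come from its SPI mechanism, and the real snapshotter interface has many more methods. Treat this as an illustration, not Debezium's own selection code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Properties;

public class SnapshotterSelectionSketch {
    // Hypothetical, heavily reduced interface: only the name matters here.
    interface NamedSnapshotter {
        String name();
    }

    // Hypothetical helper that builds a candidate with a fixed name.
    static NamedSnapshotter named(String name) {
        return () -> name;
    }

    // Returns the candidate whose name() matches snapshot.mode.custom.name.
    static Optional<NamedSnapshotter> select(Properties props, List<NamedSnapshotter> candidates) {
        if (!"custom".equals(props.getProperty("snapshot.mode"))) {
            return Optional.empty(); // only the custom-mode path is modeled
        }
        String wanted = props.getProperty("snapshot.mode.custom.name");
        return candidates.stream().filter(s -> s.name().equals(wanted)).findFirst();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("snapshot.mode", "custom");
        props.setProperty("snapshot.mode.custom.name", "no_data_recovery");
        // Hard-coded here; Debezium discovers candidates through its SPI mechanism.
        List<NamedSnapshotter> candidates = List.of(named("no_data"), named("no_data_recovery"));
        System.out.println(select(props, candidates).map(NamedSnapshotter::name).orElse("none"));
    }
}
```

This is why the string returned by `name()` and the configured `snapshot.mode.custom.name` must match exactly: a typo in either leaves no candidate selected.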
Schema Recovery Decision Logic
```java
// Expected results assume a historized schema (e.g., MySQL);
// for a non-historized schema, shouldSnapshotSchema always returns false.
// Fresh start: no offset exists, so a schema snapshot is taken
snapshotter.shouldSnapshotSchema(false, false); // -> true
// Offset exists and schema history exists: no schema snapshot needed
snapshotter.shouldSnapshotSchema(true, false);  // -> false (history already present)
// Offset exists but schema history is missing (recovery scenario): same arguments
// as above; the result differs because it depends on the schema history state
snapshotter.shouldSnapshotSchema(true, false);  // -> true (history needs rebuilding)
```
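Because the result depends on the state of the schema history storage, the same call can return different values across restarts. The hypothetical simulation below assumes a historized schema and models a restart in which the offset survives but the history is lost. The class `RecoveryScenario` and its boolean history flag are illustrative only. This is a sketch, not RisingWave or Debezium code.

```java
/**
 * Hypothetical simulation of the recovery scenario for a historized schema:
 * the offset survives a restart but the schema history storage is lost.
 * The boolean field stands in for the history storage; it is not a Debezium API.
 */
public class RecoveryScenario {
    private boolean historyExists;

    public RecoveryScenario(boolean historyExists) {
        this.historyExists = historyExists;
    }

    public void loseHistory() {
        historyExists = false;
    }

    // Decision rule from the I/O contract, assuming a historized schema.
    public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
        return !offsetExists || snapshotInProgress || !historyExists;
    }

    // Models one connector start; returns whether a schema snapshot was taken.
    public boolean start(boolean offsetExists) {
        boolean snapshot = shouldSnapshotSchema(offsetExists, false);
        if (snapshot) {
            historyExists = true; // the schema snapshot repopulates the history
        }
        return snapshot;
    }

    public static void main(String[] args) {
        RecoveryScenario connector = new RecoveryScenario(true);
        System.out.println(connector.start(true)); // false: offset and history present
        connector.loseHistory();
        System.out.println(connector.start(true)); // true: same argument, history rebuilt
        System.out.println(connector.start(true)); // false: history exists again
    }
}
```

The recovery snapshot is self-limiting: once it repopulates the history, later starts with the same offset skip the schema snapshot again.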
Related Pages
- ConfigurableOffsetBackingStore - Companion offset store used during CDC recovery
- DbzCdcEngineRunner Start - CDC engine runner that configures snapshot modes
- BinlogOffsetContext - Offset context related to MySQL binlog-based CDC