Implementation:ArroyoSystems Arroyo Debezium Extension
Overview
Debezium Extensions implement Change Data Capture (CDC) support in Arroyo's planner through two custom logical plan extensions: DebeziumUnrollingExtension for consuming Debezium-formatted change events, and ToDebeziumExtension for producing Debezium-formatted output.
Description
The module defines two extensions:
DebeziumUnrollingExtension: Unrolls Debezium change events (with "before", "after", and "op" fields) into insert/update/delete operations. It:
- Takes a source with Debezium-formatted data as input
- Tracks primary keys for stateful update/delete handling
- Produces an output schema with the data fields plus an _updating metadata field
- Implements ArroyoExtension to create a DebeziumDecode operator node in the dataflow graph
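As a rough illustration of the unrolling semantics, the following self-contained sketch models rows as vectors of strings and keeps the last emitted row per primary key, so that updates and deletes retract exactly what was emitted earlier. The types Row, DebeziumEvent, UpdatingRow, and Unroller are hypothetical and do not mirror Arroyo's DebeziumDecode operator. The op codes "c", "r", "u", and "d" are standard Debezium; representing an update as a retraction plus an append is an assumed model of what the _updating metadata expresses.

```rust
use std::collections::HashMap;

/// Hypothetical simplified row: column values rendered as strings.
pub type Row = Vec<String>;

/// Simplified Debezium envelope: "before"/"after" images plus the "op" code.
#[derive(Debug, Clone)]
pub struct DebeziumEvent {
    pub before: Option<Row>,
    pub after: Option<Row>,
    pub op: String,
}

/// Hypothetical stand-in for a row tagged with updating metadata: an append
/// (`is_retract == false`) or a retraction of a previously emitted row.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdatingRow {
    pub is_retract: bool,
    pub row: Row,
}

/// Keeps the last emitted row per primary key so that updates and deletes
/// retract exactly what was previously emitted.
pub struct Unroller {
    primary_keys: Vec<usize>,
    state: HashMap<Vec<String>, Row>,
}

impl Unroller {
    pub fn new(primary_keys: Vec<usize>) -> Self {
        Unroller { primary_keys, state: HashMap::new() }
    }

    /// Extract the primary-key values of a row.
    fn key_of(&self, row: &Row) -> Vec<String> {
        self.primary_keys.iter().map(|&i| row[i].clone()).collect()
    }

    pub fn unroll(&mut self, event: &DebeziumEvent) -> Result<Vec<UpdatingRow>, String> {
        let mut out = Vec::new();
        match event.op.as_str() {
            // Create, snapshot read, and update all carry an "after" image.
            "c" | "r" | "u" => {
                let after = event.after.clone().ok_or("missing \"after\" image")?;
                let key = self.key_of(&after);
                // Retract the previous row for this key, if one was emitted.
                if let Some(old) = self.state.insert(key, after.clone()) {
                    out.push(UpdatingRow { is_retract: true, row: old });
                }
                out.push(UpdatingRow { is_retract: false, row: after });
            }
            // Deletes carry a "before" image; retract what was emitted for that key.
            "d" => {
                let before = event.before.as_ref().ok_or("missing \"before\" image")?;
                let key = self.key_of(before);
                if let Some(old) = self.state.remove(&key) {
                    out.push(UpdatingRow { is_retract: true, row: old });
                }
            }
            other => return Err(format!("unknown Debezium op {other:?}")),
        }
        Ok(out)
    }
}
```

Keying the state by primary-key values is what lets a delete or update find the row to retract even when downstream operators only see the unrolled stream.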
ToDebeziumExtension: Converts regular updating streams back into Debezium format for output to CDC-compatible sinks. It:
- Takes an updating stream as input
- Constructs the Debezium envelope schema with "before" (nullable struct), "after" (nullable struct), and "op" (string) fields
- Implements ArroyoExtension to create a DebeziumEncode operator node
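The reverse direction can be sketched the same way. The hypothetical encode function below assumes that an update arrives in the updating stream as a retraction immediately followed by an append for the same key, and folds such a pair into a single "u" envelope; lone appends become "c" and lone retractions become "d". It illustrates the envelope mapping only and is not Arroyo's DebeziumEncode implementation.

```rust
/// Hypothetical simplified row: column values rendered as strings.
pub type Row = Vec<String>;

/// Hypothetical updating-stream row: append or retraction, plus its key values.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdatingRow {
    pub is_retract: bool,
    pub key: Vec<String>,
    pub row: Row,
}

/// Simplified Debezium envelope produced for the sink.
#[derive(Debug, Clone, PartialEq)]
pub struct DebeziumEvent {
    pub before: Option<Row>,
    pub after: Option<Row>,
    pub op: &'static str,
}

/// Encode updating rows as Debezium envelopes, pairing an adjacent
/// retraction + append on the same key into one update event.
pub fn encode(rows: &[UpdatingRow]) -> Vec<DebeziumEvent> {
    let mut out = Vec::new();
    let mut i = 0;
    while i < rows.len() {
        let r = &rows[i];
        match rows.get(i + 1) {
            // A retraction directly followed by an append of the same key is an update.
            Some(next) if r.is_retract && !next.is_retract && next.key == r.key => {
                out.push(DebeziumEvent {
                    before: Some(r.row.clone()),
                    after: Some(next.row.clone()),
                    op: "u",
                });
                i += 2;
                continue;
            }
            _ => {}
        }
        // Otherwise a retraction is a delete and an append is a create.
        out.push(if r.is_retract {
            DebeziumEvent { before: Some(r.row.clone()), after: None, op: "d" }
        } else {
            DebeziumEvent { before: None, after: Some(r.row.clone()), op: "c" }
        });
        i += 1;
    }
    out
}
```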
The as_debezium_schema helper, an associated function on DebeziumUnrollingExtension, constructs the Debezium envelope DFSchema by wrapping the data fields in nullable before/after structs and adding the op field.
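A minimal model of the envelope construction, using plain Rust types in place of Arrow fields and DFSchema, might look like this. The column names _timestamp and _updating_meta, the carrying-through of the timestamp column (suggested by the Outputs table), and the non-nullable op field are assumptions; the sketch also ignores the qualifier argument.

```rust
/// Simplified stand-in for Arrow data types.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Utf8,
    Timestamp,
    Struct(Vec<Field>),
}

/// Simplified stand-in for an Arrow field.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
}

impl Field {
    pub fn new(name: &str, data_type: DataType, nullable: bool) -> Self {
        Field { name: name.to_string(), data_type, nullable }
    }
}

// Assumed names of the internal columns excluded from the before/after structs.
const TIMESTAMP_FIELD: &str = "_timestamp";
const UPDATING_META_FIELD: &str = "_updating_meta";

/// Build the envelope: data fields wrapped in nullable "before"/"after"
/// structs, an "op" string, and the timestamp column carried through if present.
pub fn as_debezium_schema(input: &[Field]) -> Vec<Field> {
    let data: Vec<Field> = input
        .iter()
        .filter(|f| f.name != TIMESTAMP_FIELD && f.name != UPDATING_META_FIELD)
        .cloned()
        .collect();
    let mut fields = vec![
        Field::new("before", DataType::Struct(data.clone()), true),
        Field::new("after", DataType::Struct(data), true),
        Field::new("op", DataType::Utf8, false), // nullability assumed
    ];
    if let Some(ts) = input.iter().find(|f| f.name == TIMESTAMP_FIELD) {
        fields.push(ts.clone());
    }
    fields
}
```

Both before and after are nullable because a create has no "before" image and a delete has no "after" image.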
Usage
DebeziumUnrollingExtension is inserted during source rewriting for tables configured with Debezium format. ToDebeziumExtension is inserted during sink rewriting for sinks that require Debezium output format.
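Where each node lands can be pictured with a toy plan type. Plan, Format, rewrite_source, and rewrite_sink below are hypothetical stand-ins for Arroyo's planner structures and connector configuration; the sketch only shows that the unrolling node sits directly above a Debezium-formatted scan and the encoding node sits directly below a Debezium sink.

```rust
/// Hypothetical connector formats; only the Debezium distinction matters here.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Format {
    Json,
    DebeziumJson,
}

/// Toy logical plan containing the two Debezium extension nodes.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    TableScan { table: String },
    DebeziumUnrolling { input: Box<Plan>, primary_keys: Vec<String> },
    ToDebezium { input: Box<Plan> },
    Sink { input: Box<Plan>, table: String },
}

/// Source rewriting: Debezium-formatted tables get an unrolling node on top of the scan.
pub fn rewrite_source(table: &str, format: Format, primary_keys: &[&str]) -> Plan {
    let scan = Plan::TableScan { table: table.to_string() };
    match format {
        Format::DebeziumJson => Plan::DebeziumUnrolling {
            input: Box::new(scan),
            primary_keys: primary_keys.iter().map(|k| k.to_string()).collect(),
        },
        Format::Json => scan,
    }
}

/// Sink rewriting: sinks that require Debezium output get an encoding node first.
pub fn rewrite_sink(input: Plan, table: &str, format: Format) -> Plan {
    let input = match format {
        Format::DebeziumJson => Plan::ToDebezium { input: Box::new(input) },
        Format::Json => input,
    };
    Plan::Sink { input: Box::new(input), table: table.to_string() }
}
```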
Code Reference
Source Location
crates/arroyo-planner/src/extension/debezium.rs
Signature
```rust
pub(crate) const DEBEZIUM_UNROLLING_EXTENSION_NAME: &str = "DebeziumUnrollingExtension";
pub(crate) const TO_DEBEZIUM_EXTENSION_NAME: &str = "ToDebeziumExtension";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DebeziumUnrollingExtension {
    input: LogicalPlan,
    schema: DFSchemaRef,
    pub primary_keys: Vec<usize>,
    primary_key_names: Arc<Vec<String>>,
}

impl DebeziumUnrollingExtension {
    pub(crate) fn as_debezium_schema(
        input_schema: &DFSchemaRef,
        qualifier: Option<TableReference>,
    ) -> Result<DFSchemaRef> { /* ... */ }

    pub(crate) fn try_new(input: LogicalPlan, primary_keys: Vec<String>) -> Result<Self> { /* ... */ }
}
```
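try_new accepts primary-key column names, while the struct stores primary_keys as Vec<usize> alongside primary_key_names, which suggests the names are resolved to column indices against the input schema. A hypothetical helper performing that resolution might look like the following; the function name and error message are illustrative and not taken from Arroyo.

```rust
/// Hypothetical: resolve primary-key column names to column indices,
/// failing if any name is missing from the input schema.
pub fn resolve_primary_keys(
    field_names: &[&str],
    primary_keys: &[String],
) -> Result<Vec<usize>, String> {
    primary_keys
        .iter()
        .map(|pk| {
            field_names
                .iter()
                .position(|name| *name == pk.as_str())
                .ok_or_else(|| format!("primary key column {pk:?} not found in input schema"))
        })
        // Collecting into Result stops at the first unresolved name.
        .collect()
}
```

Storing indices lets the operator read key columns positionally, while keeping the names around preserves them for display and plan comparison.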
Import
```rust
use crate::extension::debezium::{
    DebeziumUnrollingExtension, ToDebeziumExtension,
    DEBEZIUM_UNROLLING_EXTENSION_NAME, TO_DEBEZIUM_EXTENSION_NAME,
};
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| input | LogicalPlan | Source plan producing Debezium-formatted records (for unrolling) or updating records (for encoding) |
| primary_keys | Vec<String> | Primary key column names for CDC state management (DebeziumUnrollingExtension::try_new only) |
Outputs
| Name | Type | Description |
|---|---|---|
| unrolled schema | DFSchemaRef | Data fields with _updating metadata (from DebeziumUnrollingExtension) |
| debezium schema | DFSchemaRef | Debezium envelope with before, after, op, and _timestamp fields (from ToDebeziumExtension) |
Usage Examples
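```rust
use datafusion::common::{DFSchemaRef, Result, TableReference};
use datafusion::logical_expr::LogicalPlan;

use crate::extension::debezium::DebeziumUnrollingExtension;

// Hypothetical wrapper so the `?` operator has a `Result` to return into
fn debezium_examples(source_plan: LogicalPlan, input_schema: DFSchemaRef) -> Result<()> {
    // Creating a Debezium unrolling extension for a CDC source keyed on `id`
    let extension = DebeziumUnrollingExtension::try_new(source_plan, vec!["id".to_string()])?;

    // Getting the Debezium envelope schema, qualified with a table name
    let debezium_schema = DebeziumUnrollingExtension::as_debezium_schema(
        &input_schema,
        Some(TableReference::bare("my_table")),
    )?;
    Ok(())
}
```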
Related Pages
- ArroyoSystems_Arroyo_Planner_Extensions - The ArroyoExtension trait implemented by both extensions
- ArroyoSystems_Arroyo_Table_Catalog - ConnectorTable that determines when Debezium is needed
- ArroyoSystems_Arroyo_Physical_Planner - DebeziumUnrollingExec and ToDebeziumExec physical plans