
Implementation:ArroyoSystems Arroyo Debezium Extension

From Leeroopedia


Overview

Debezium Extensions implement Change Data Capture (CDC) support in Arroyo's planner through two custom logical plan extensions: DebeziumUnrollingExtension for consuming Debezium-formatted change events, and ToDebeziumExtension for producing Debezium-formatted output.

Description

The module defines two extensions:

  • DebeziumUnrollingExtension: Unrolls Debezium change events (with "before", "after", and "op" fields) into insert/update/delete operations. It:
    • Takes a source with Debezium-formatted data as input
    • Tracks primary keys for stateful update/delete handling
    • Produces an output schema with the data fields plus an _updating metadata field
    • Implements ArroyoExtension to create a DebeziumDecode operator node in the dataflow graph
  • ToDebeziumExtension: Converts regular updating streams back into Debezium format for output to CDC-compatible sinks. It:
    • Takes an updating stream as input
    • Constructs the Debezium envelope schema with "before" (nullable struct), "after" (nullable struct), and "op" (string) fields
    • Implements ArroyoExtension to create a DebeziumEncode operator node
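Below is a minimal std-only Rust sketch of the two directions. It is a simplified model and not Arroyo's implementation. Rows are plain Vec<String> values. The hypothetical Envelope and Change types stand in for the Debezium envelope and for an updating record whose _updating flag marks retractions. The op codes follow Debezium's convention: c (create), u (update), d (delete), and r (snapshot read).

```rust
/// Hypothetical, simplified Debezium envelope: before/after are nullable row images.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub before: Option<Vec<String>>,
    pub after: Option<Vec<String>>,
    pub op: String,
}

/// Hypothetical updating record: Retract stands in for a row whose
/// _updating metadata marks it as a retraction, Append for a plain insert.
#[derive(Debug, Clone, PartialEq)]
pub enum Change {
    Append(Vec<String>),
    Retract(Vec<String>),
}

/// Decode direction (cf. DebeziumUnrollingExtension): one envelope
/// becomes one or two updating records, or an error if malformed.
pub fn unroll(env: &Envelope) -> Result<Vec<Change>, String> {
    match (env.op.as_str(), &env.before, &env.after) {
        // create and snapshot read carry only an `after` image
        ("c" | "r", _, Some(after)) => Ok(vec![Change::Append(after.clone())]),
        // update: retract the old image, then append the new one
        ("u", Some(before), Some(after)) => Ok(vec![
            Change::Retract(before.clone()),
            Change::Append(after.clone()),
        ]),
        // delete carries only a `before` image
        ("d", Some(before), _) => Ok(vec![Change::Retract(before.clone())]),
        (op, _, _) => Err(format!("malformed or unsupported Debezium op: {op}")),
    }
}

/// Encode direction (cf. ToDebeziumExtension): each updating record becomes an envelope.
pub fn encode(change: &Change) -> Envelope {
    match change {
        Change::Append(row) => Envelope { before: None, after: Some(row.clone()), op: "c".to_string() },
        Change::Retract(row) => Envelope { before: Some(row.clone()), after: None, op: "d".to_string() },
    }
}
```

In this model an update becomes a retraction followed by an append. The encode side emits only c and d events. Merging a retraction and an append for the same key into a single u event is possible, but the sketch does not attempt it, and this page does not say whether DebeziumEncode does.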

The as_debezium_schema helper method constructs the Debezium envelope DFSchema by wrapping data fields in before/after structs and adding the op field.
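The envelope construction can be sketched in std-only Rust as follows. DataType and Field are hypothetical stand-ins for their Arrow counterparts. as_debezium_fields is an illustrative name, not Arroyo's API. Two details are assumptions:

  • The metadata columns (_timestamp and _updating) are left out of the before/after structs.
  • _timestamp is kept as a top-level field, matching the debezium schema listed under Outputs.

```rust
/// Hypothetical stand-in for the few Arrow data types this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Utf8,
    Struct(Vec<Field>),
}

/// Hypothetical stand-in for an Arrow field.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
}

impl Field {
    pub fn new(name: &str, data_type: DataType, nullable: bool) -> Self {
        Field { name: name.to_string(), data_type, nullable }
    }
}

// Metadata column names assumed by this sketch, taken from this page's wording.
const TIMESTAMP_FIELD: &str = "_timestamp";
const UPDATING_FIELD: &str = "_updating";

/// Builds the envelope: nullable before/after structs over the data fields,
/// an op string field, and _timestamp (if present) carried over at the top level.
pub fn as_debezium_fields(input: &[Field]) -> Vec<Field> {
    // Assumption: metadata columns are not part of the row image.
    let data: Vec<Field> = input
        .iter()
        .filter(|f| f.name != TIMESTAMP_FIELD && f.name != UPDATING_FIELD)
        .cloned()
        .collect();
    let row = DataType::Struct(data);

    let mut fields = vec![
        Field::new("before", row.clone(), true),
        Field::new("after", row, true),
        // Assumption: op is non-nullable here; the page only says it is a string.
        Field::new("op", DataType::Utf8, false),
    ];
    if let Some(ts) = input.iter().find(|f| f.name == TIMESTAMP_FIELD) {
        fields.push(ts.clone());
    }
    fields
}
```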

Usage

DebeziumUnrollingExtension is inserted during source rewriting for tables configured with Debezium format. ToDebeziumExtension is inserted during sink rewriting for sinks that require Debezium output format.

Code Reference

Source Location

crates/arroyo-planner/src/extension/debezium.rs

Signature

pub(crate) const DEBEZIUM_UNROLLING_EXTENSION_NAME: &str = "DebeziumUnrollingExtension";
pub(crate) const TO_DEBEZIUM_EXTENSION_NAME: &str = "ToDebeziumExtension";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DebeziumUnrollingExtension {
    input: LogicalPlan,                  // upstream plan producing Debezium envelopes
    schema: DFSchemaRef,                 // unrolled output schema
    pub primary_keys: Vec<usize>,        // primary-key columns, as indices
    primary_key_names: Arc<Vec<String>>, // primary-key columns, as names
}

impl DebeziumUnrollingExtension {
    // Signatures only; method bodies are elided.
    pub(crate) fn as_debezium_schema(
        input_schema: &DFSchemaRef,
        qualifier: Option<TableReference>,
    ) -> Result<DFSchemaRef> { /* ... */ }

    pub(crate) fn try_new(input: LogicalPlan, primary_keys: Vec<String>) -> Result<Self> { /* ... */ }
}
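The struct stores primary keys both as indices (primary_keys: Vec<usize>) and as names. try_new, however, accepts only names. This suggests that names are resolved to column indices when the node is constructed. The std-only sketch below shows that resolution under this assumption. resolve_primary_keys is a hypothetical helper, not part of Arroyo.

```rust
/// Hypothetical helper: maps each primary-key name to its column index,
/// failing if any key is missing from the schema's column list.
pub fn resolve_primary_keys(columns: &[&str], key_names: &[String]) -> Result<Vec<usize>, String> {
    key_names
        .iter()
        .map(|key| {
            columns
                .iter()
                .position(|c| *c == key.as_str())
                .ok_or_else(|| format!("primary key column '{key}' not found in schema"))
        })
        // Collecting into Result stops at the first missing key.
        .collect()
}
```

Key order follows the order of the names passed in, not the order of the schema's columns.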

Import

use crate::extension::debezium::{
    DebeziumUnrollingExtension, ToDebeziumExtension,
    DEBEZIUM_UNROLLING_EXTENSION_NAME, TO_DEBEZIUM_EXTENSION_NAME,
};

I/O Contract

Inputs

  • input (LogicalPlan): the source plan producing Debezium-formatted records (for unrolling) or updating records (for encoding)
  • primary_keys (Vec<String>): primary key column names used for CDC state management; passed to DebeziumUnrollingExtension::try_new

Outputs

  • unrolled schema (DFSchemaRef, from DebeziumUnrollingExtension): the data fields plus the _updating metadata field
  • debezium schema (DFSchemaRef, from ToDebeziumExtension): the Debezium envelope with before, after, op, and _timestamp fields
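A generic keyed-materialization sketch shows why update and delete handling needs primary keys. It is not Arroyo's state implementation. The Event type is hypothetical, each row image is reduced to a (primary key, value) pair, and the table is a plain HashMap keyed by primary key. The key is what lets an update or a delete find the row it replaces.

```rust
use std::collections::HashMap;

/// Hypothetical change event; each row image is a (primary key, value) pair.
pub struct Event {
    pub op: char, // 'c', 'u', 'd', or 'r', as in Debezium
    pub before: Option<(i64, String)>,
    pub after: Option<(i64, String)>,
}

/// Applies one event to a table indexed by primary key.
pub fn apply(table: &mut HashMap<i64, String>, ev: &Event) -> Result<(), String> {
    match (ev.op, &ev.before, &ev.after) {
        ('c' | 'r', _, Some((k, v))) => {
            table.insert(*k, v.clone());
            Ok(())
        }
        ('u', Some((old_k, _)), Some((k, v))) => {
            // The key may change in an update, so remove by the old key first.
            table.remove(old_k);
            table.insert(*k, v.clone());
            Ok(())
        }
        ('d', Some((k, _)), _) => {
            table.remove(k);
            Ok(())
        }
        (op, _, _) => Err(format!("unsupported or malformed op {op}")),
    }
}
```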

Usage Examples

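// Sketch: wrapped in a function so that `?` has a Result to propagate into.
// `source_plan` and `input_schema` stand in for values produced earlier in planning.
use datafusion::common::{DFSchemaRef, Result, TableReference};
use datafusion::logical_expr::LogicalPlan;

use crate::extension::debezium::DebeziumUnrollingExtension;

fn debezium_example(source_plan: LogicalPlan, input_schema: DFSchemaRef) -> Result<()> {
    // Creating a Debezium unrolling extension for a CDC source keyed on `id`
    let extension = DebeziumUnrollingExtension::try_new(source_plan, vec!["id".to_string()])?;

    // Getting the Debezium envelope schema, qualified by the table name
    let debezium_schema = DebeziumUnrollingExtension::as_debezium_schema(
        &input_schema,
        Some(TableReference::bare("my_table")),
    )?;

    let _ = (extension, debezium_schema);
    Ok(())
}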
