Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:MaterializeInc Materialize Data Ingest Framework

From Leeroopedia


Knowledge Sources
Domains Testing, Data_Ingestion, Multi_Backend
Last Updated 2026-02-08 00:00 GMT

Overview

Framework for generating and executing multi-backend data ingestion workloads against Materialize, supporting Kafka, PostgreSQL, MySQL, and SQL Server sources, plus Materialize tables round-tripped through a Kafka sink and source.

Description

The Data Ingest Framework is a Python-based test infrastructure module within the Materialize repository that enables automated, randomized data ingestion testing across multiple database backends. The framework decouples workload definitions (what data operations to perform) from executor implementations (how to perform them on a specific backend). This allows the same logical test scenario to be replayed against Kafka (via Avro-serialized topics), PostgreSQL (via CDC replication), MySQL (via CDC replication), SQL Server (via CDC replication), and Materialize tables (written directly, then read back through a Kafka sink and a Kafka source).

The module generates randomized schemas with typed fields, produces rows with random or deterministic values according to configurable record sizes (TINY, SMALL, MEDIUM, LARGE), and orchestrates INSERT, UPSERT, and DELETE operations in transactional batches. After execution, results from each backend's Materialize source are compared against a PostgreSQL ground truth to verify correctness.

Usage

Use this framework when writing or running integration tests that need to verify Materialize correctly ingests data from external systems. It is particularly useful for:

  • Consistency testing: Verifying that Materialize sources reflect the same final state as a reference PostgreSQL database after a series of mutations.
  • Disruption testing: Validating data correctness after Materialize restarts or zero-downtime deployments.
  • Multi-backend regression: Ensuring changes to Materialize do not break ingestion from any supported source system.
  • Type coverage: Exercising the full range of Materialize-supported data types across backends with boundary values and edge cases.

Code Reference

Source Location

  • Repository: MaterializeInc_Materialize
  • Path: misc/python/materialize/data_ingest/ (directory)
    • data_type.py (993 lines) - Data type definitions and backend-specific name mappings
    • executor.py (867 lines) - Backend executors (Kafka, PostgreSQL, MySQL, SQL Server, Kafka Roundtrip)
    • workload.py (353 lines) - Workload orchestration and predefined test scenarios
    • definition.py (161 lines) - Workload operation patterns (Insert, Upsert, Delete)
    • field.py (49 lines) - Field schema representation
    • row.py (33 lines) - Row data container with operation type
    • rowlist.py (22 lines) - Row collection container
    • transaction.py (23 lines) - Transaction wrapper over row lists
    • transaction_def.py (166 lines) - Transaction definitions, RestartMz, ZeroDowntimeDeploy

Signature

# API outline only: class and method bodies are elided, so this listing is a
# signature reference and is not executable Python as written.

# data_type.py - Enums and base class
class RecordSize(Enum):       # TINY=1, SMALL=2, MEDIUM=3, LARGE=4 (controls generated value magnitude)
class Backend(Enum):           # AVRO=1, JSON=2, POSTGRES=3, MYSQL=4, SQL_SERVER=5, MATERIALIZE=6
class DataType:
    # NOTE(review): these methods take no self/cls in this outline; presumably
    # they are @staticmethod in the source. Confirm.
    # random_value: in_query=True yields a SQL-cast literal (e.g. 42::int),
    # in_query=False yields a raw Python value.
    def random_value(rng, record_size, in_query) -> Any
    def numeric_value(num, in_query) -> Any
    # name: the type name in the dialect of the given Backend.
    def name(backend) -> str

# Concrete DataType subclasses:
# Boolean, SmallInt, Int, Long, UInt2, UInt4, UInt8, Float, Double,
# Numeric, Numeric383, Text, Bytea, UUID, Jsonb, TextTextMap,
# IntArray, IntList, Timestamp, MzTimestamp, Date, Time, Interval, Oid
# (24 subclasses in total)

# field.py
class Field:
    # is_key marks the field as part of the primary key.
    # NOTE(review): the role of value_fn is not described on this page. Confirm.
    def __init__(self, name: str, data_type: type[DataType], is_key: bool, value_fn=None)

# row.py
class Operation(Enum):        # INSERT=1, UPSERT=2, DELETE=3
class Row:
    # Couples the fields with their concrete values and the operation to apply.
    def __init__(self, fields: list[Field], values: list[Any], operation: Operation)

# rowlist.py
class RowList:
    # Groups the rows that belong to one logical operation step.
    def __init__(self, rows: list[Row])

# transaction.py
class Transaction:
    # Groups RowLists into one atomic unit of work.
    def __init__(self, row_lists: list[RowList])

# definition.py
class Records(Enum):           # ALL=0, ONE=1, HUNDRED=100, SOME=1000, MANY=1000000
class Keyspace(Enum):          # SINGLE_VALUE=1, LARGE=2, EXISTING=3
class Definition:
    # Streams RowLists for the given field schema.
    def generate(self, fields: list[Field]) -> Iterator[RowList]
class Insert(Definition):
    def __init__(self, count: Records, record_size: RecordSize)
class Upsert(Definition):
    def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize)
class Delete(Definition):
    # num is required when number_of_records is Records.ALL.
    def __init__(self, number_of_records: Records, record_size: RecordSize, num: int | None)

# transaction_def.py
class TransactionSize(Enum):   # SINGLE_OPERATION=1, HUGE=1_000_000_000
class TransactionDef:
    # Batches the RowLists produced by `operations` into Transactions of `size`.
    # NOTE(review): the custom-workload example on this page constructs a
    # TransactionDef without `size`, so a default presumably exists. Confirm.
    def __init__(self, operations: list[Definition], size: TransactionSize)
    # Yields None (instead of a Transaction) for disruption steps such as RestartMz.
    def generate(self, fields: list[Field]) -> Iterator[Transaction | None]
# Disruption injectors: restart Materialize / perform a zero-downtime deploy.
# `probability` is presumably the chance of triggering the disruption per step. Confirm.
class RestartMz(TransactionDef):
    def __init__(self, composition, probability, workload, azurite)
class ZeroDowntimeDeploy(TransactionDef):
    def __init__(self, composition, probability, workload, azurite)

# executor.py
class Executor:
    def __init__(self, ports, fields, database, schema, cluster, mz_service, composition)
    # Creates backend-specific tables, sources, connections, topics and schemas.
    def create(self, logging_exe=None) -> None
    # Applies the transaction's INSERT/UPSERT/DELETE rows to the backend.
    def run(self, transaction: Transaction, logging_exe=None) -> None
class PrintExecutor(Executor)  # prints transactions to stdout (debugging/verbose mode)
class KafkaExecutor(Executor)
class PgExecutor(Executor)
class MySqlExecutor(Executor)
class SqlServerExecutor(Executor)
class KafkaRoundtripExecutor(Executor)

# workload.py
class Workload:
    def __init__(self, azurite, composition, mz_service, deploy_generation)
    # Cycles endlessly through the workload's cycle of TransactionDefs.
    def generate(self, fields: list[Field]) -> Iterator[Transaction]
class SingleSensorUpdating(Workload)
class SingleSensorUpdatingDisruptions(Workload)
class SingleSensorUpdating0dtDeploy(Workload)
class DeleteDataAtEndOfDay(Workload)
class DeleteDataAtEndOfDayDisruptions(Workload)
class DeleteDataAtEndOfDay0dtDeploys(Workload)

Import

from materialize.data_ingest.data_type import DataType, RecordSize, Backend
from materialize.data_ingest.executor import KafkaExecutor, PgExecutor, MySqlExecutor, SqlServerExecutor, KafkaRoundtripExecutor
from materialize.data_ingest.workload import Workload
from materialize.data_ingest.definition import Insert, Upsert, Delete, Records, Keyspace
from materialize.data_ingest.field import Field
from materialize.data_ingest.row import Row, Operation
from materialize.data_ingest.rowlist import RowList
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.transaction_def import TransactionDef, RestartMz, ZeroDowntimeDeploy

I/O Contract

Inputs

Component Input Type Description
Field name str Column name for the schema field
Field data_type type[DataType] One of the 24 DataType subclasses (Boolean, SmallInt, Int, Long, etc.)
Field is_key bool Whether this field is part of the primary key
DataType.random_value rng random.Random Random number generator for reproducible runs
DataType.random_value record_size RecordSize Controls value magnitude: TINY, SMALL, MEDIUM, LARGE
DataType.random_value in_query bool When True, returns SQL-cast literals (e.g., 42::int); when False, returns raw Python values
DataType.name backend Backend Returns the type name appropriate for the target backend (AVRO, JSON, POSTGRES, MYSQL, SQL_SERVER, MATERIALIZE)
Insert count Records Number of rows to insert: ONE (1), HUNDRED (100), SOME (1,000), MANY (1,000,000)
Insert record_size RecordSize Size category for generated values
Upsert keyspace Keyspace SINGLE_VALUE (always key 0), LARGE, or EXISTING
Delete number_of_records Records How many records to delete; ALL requires a num parameter
TransactionDef operations list[Definition] Ordered list of Insert/Upsert/Delete definitions to compose into transactions
TransactionDef size TransactionSize SINGLE_OPERATION (one RowList per Transaction) or HUGE (up to 1 billion RowLists batched)
Executor ports dict[str, int] Service-name-to-port mapping (e.g., {"materialized": 6875, "kafka": 9092})
Executor fields list[Field] Schema fields for table creation and data generation
Workload azurite bool Whether to use Azure blob storage emulator
execute_workload runtime int Maximum seconds to run the workload before stopping

Outputs

Component Output Type Description
Definition.generate row lists Iterator[RowList] Stream of RowList objects, each containing one or more Row instances with field values and operation type
TransactionDef.generate transactions Iterator[Transaction | None] Stream of Transaction objects (batched RowLists) or None (for disruption operations like RestartMz)
Workload.generate transactions Iterator[Transaction] Infinite cycle of transactions from the workload's cycle of TransactionDefs
Executor.create side effects None Creates backend-specific tables, sources, connections, topics, and schemas
Executor.run side effects None Executes INSERT/UPSERT/DELETE operations against the backend
execute_workload assertion None Raises ValueError if Materialize query results differ from PostgreSQL ground truth

Data Type Backend Mappings

DataType Materialize Avro JSON PostgreSQL MySQL SQL Server
SmallInt smallint int integer smallint smallint smallint
Int int int integer int int int
Long bigint long integer bigint bigint bigint
UInt2 uint2 int integer numeric smallint unsigned uint2
UInt4 uint4 int integer numeric int unsigned uint4
UInt8 uint8 long integer numeric bigint unsigned uint8
Float float4 float number float4 float4 real
Double float8 double number float8 float8 float
Text text string string text text varchar(1024)
Timestamp timestamp unsupported unsupported timestamp timestamp datetime2

Architecture

Layer Structure

The framework is organized into four layers:

1. Type Layer (data_type.py, field.py): Defines 24 DataType subclasses, each providing random_value(), numeric_value(), and name() methods. The name() method returns backend-specific type names. Pre-built filter sets (DATA_TYPES_FOR_AVRO, DATA_TYPES_FOR_MYSQL, DATA_TYPES_FOR_SQL_SERVER, DATA_TYPES_FOR_KEY) exclude unsupported types per backend. Field binds a name, data type, and key/value role together.

2. Data Layer (row.py, rowlist.py, transaction.py): A Row couples a list of fields with their concrete values and an Operation enum (INSERT, UPSERT, DELETE). RowList groups rows that belong to the same logical operation step. Transaction groups RowLists into an atomic unit of work.

3. Definition Layer (definition.py, transaction_def.py): Definition subclasses (Insert, Upsert, Delete) generate streams of RowLists from field schemas. TransactionDef batches Definition outputs into Transactions of configurable size. Special TransactionDef subclasses RestartMz and ZeroDowntimeDeploy inject infrastructure disruptions (killing/restarting Materialize instances, performing rolling deployments) between data operations.

4. Execution Layer (executor.py, workload.py): Executor subclasses translate Transactions into backend-specific DDL and DML. Workload subclasses define reusable test scenarios as cycles of TransactionDefs. The execute_workload() function wires everything together: it generates random schemas, creates executors, runs the workload for a specified duration, then compares Materialize results against a PostgreSQL ground truth.

Executor Details

Executor Backend Ingestion Path Key Behavior
KafkaExecutor Kafka (Avro) Produces Avro-serialized key/value messages to a Kafka topic; Materialize reads via CREATE SOURCE ... FROM KAFKA ... ENVELOPE UPSERT Deletes are produced as messages carrying the key with a null value; uses Schema Registry for Avro schemas
PgExecutor PostgreSQL Writes to a Postgres table; Materialize reads via CREATE SOURCE ... FROM POSTGRES CONNECTION ... (PUBLICATION ...) Creates publication, sets REPLICA IDENTITY FULL; uses ON CONFLICT ... DO UPDATE for upserts
MySqlExecutor MySQL Writes to a MySQL table; Materialize reads via CREATE SOURCE ... FROM MYSQL CONNECTION ... Uses ON DUPLICATE KEY UPDATE for upserts; connects via pymysql
SqlServerExecutor SQL Server Writes to a SQL Server table with CDC enabled; Materialize reads via CREATE SOURCE ... FROM SQL SERVER CONNECTION ... Uses MERGE ... WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT for upserts; uses testdrive for SQL execution
KafkaRoundtripExecutor Materialize (via Kafka sink/source) Writes to a Materialize table, sinks to Kafka via ENVELOPE DEBEZIUM, then reads back via a new Kafka source Tests full roundtrip through Materialize sinks and sources
PrintExecutor stdout Prints transactions to console Used for debugging when verbose mode is enabled

Predefined Workloads

Workload Description Cycle
SingleSensorUpdating Simulates a single sensor continuously updating its value Upsert with SINGLE_VALUE keyspace, one record per transaction
SingleSensorUpdatingDisruptions Same as above with Mz restart disruptions (10% probability) Upsert + RestartMz
SingleSensorUpdating0dtDeploy Same as above with zero-downtime deployment disruptions (10% probability) Upsert + ZeroDowntimeDeploy
DeleteDataAtEndOfDay Inserts many records, then deletes all of them in bulk Insert phase (HUGE transactions) followed by Delete ALL phase
DeleteDataAtEndOfDayDisruptions Same as above with Mz restart disruptions Insert + Delete + RestartMz
DeleteDataAtEndOfDay0dtDeploys Same as above with zero-downtime deployment disruptions Insert + Delete + ZeroDowntimeDeploy

Usage Examples

Running a Workload Programmatically

from materialize.data_ingest.workload import SingleSensorUpdating, execute_workload
from materialize.data_ingest.executor import KafkaExecutor, MySqlExecutor

# NOTE: `composition` is assumed to be supplied by the surrounding test harness;
# this snippet does not define it.

# Service name -> port for every service the chosen executors touch
# (postgres is included because it serves as the ground truth).
service_ports = {
    "materialized": 6875,
    "kafka": 9092,
    "schema-registry": 8081,
    "postgres": 5432,
    "mysql": 3306,
}

workload = SingleSensorUpdating(azurite=False, composition=composition)

execute_workload(
    executor_classes=[KafkaExecutor, MySqlExecutor],
    workload=workload,
    num=1,
    ports=service_ports,
    runtime=60,  # stop after 60 seconds
    verbose=False,
    composition=composition,
)

Defining a Custom Workload

from materialize.data_ingest.workload import Workload
# RecordSize is defined in data_type.py; import it from there rather than
# relying on definition.py re-exporting it.
from materialize.data_ingest.data_type import RecordSize
from materialize.data_ingest.definition import Insert, Upsert, Delete, Records, Keyspace
from materialize.data_ingest.transaction_def import TransactionDef, TransactionSize


class MyCustomWorkload(Workload):
    """Insert a hundred medium-sized records, then delete all of them.

    The two TransactionDefs in ``self.cycle`` are replayed in order by
    ``Workload.generate``.
    """

    def __init__(self, azurite, composition=None, mz_service="materialized", deploy_generation=0):
        super().__init__(azurite, composition, mz_service, deploy_generation)
        insert = Insert(count=Records.HUNDRED, record_size=RecordSize.MEDIUM)
        self.cycle = [
            TransactionDef(operations=[insert], size=TransactionSize.SINGLE_OPERATION),
            # No size given: this relies on TransactionDef's default size.
            TransactionDef(operations=[
                # Delete with Records.ALL requires `num`; insert.max_key() is
                # presumably the largest key the insert phase generates. Confirm.
                Delete(number_of_records=Records.ALL, record_size=RecordSize.MEDIUM, num=insert.max_key())
            ]),
        ]

Generating Random Typed Fields

import random
from materialize.data_ingest.data_type import DATA_TYPES_FOR_AVRO, DATA_TYPES_FOR_KEY
from materialize.data_ingest.field import Field

# Key columns: 1-10 of them, typed from the key-capable subset.
key_fields = [
    Field(f"key{idx}", random.choice(DATA_TYPES_FOR_KEY), True)
    for idx in range(random.randint(1, 10))
]
# Value columns: 0-20 of them, typed from the Avro-compatible subset.
value_fields = [
    Field(f"value{idx}", random.choice(DATA_TYPES_FOR_AVRO), False)
    for idx in range(random.randint(0, 20))
]
fields = key_fields + value_fields

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment