Implementation:MaterializeInc Materialize Data Ingest Framework
| Knowledge Sources | |
|---|---|
| Domains | Testing, Data_Ingestion, Multi_Backend |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Framework for generating and executing multi-backend data ingestion workloads against Materialize, supporting Kafka, PostgreSQL, MySQL, SQL Server, and Materialize tables written directly via SQL.
Description
The Data Ingest Framework is a Python-based test infrastructure module within the Materialize repository that enables automated, randomized data ingestion testing across multiple database backends. The framework decouples workload definitions (what data operations to perform) from executor implementations (how to perform them on a specific backend). This allows the same logical test scenario to be replayed against Kafka (via Avro-serialized topics), PostgreSQL (via CDC replication), MySQL (via CDC replication), SQL Server (via CDC replication), and Materialize tables (written directly via SQL and read back through a Kafka sink-and-source roundtrip).
The module generates randomized schemas with typed fields, produces rows with random or deterministic values according to configurable record sizes (TINY, SMALL, MEDIUM, LARGE), and orchestrates INSERT, UPSERT, and DELETE operations in transactional batches. After execution, results from each backend's Materialize source are compared against a PostgreSQL ground truth to verify correctness.
Usage
Use this framework when writing or running integration tests that need to verify Materialize correctly ingests data from external systems. It is particularly useful for:
- Consistency testing: Verifying that Materialize sources reflect the same final state as a reference PostgreSQL database after a series of mutations.
- Disruption testing: Validating data correctness after Materialize restarts or zero-downtime deployments.
- Multi-backend regression: Ensuring changes to Materialize do not break ingestion from any supported source system.
- Type coverage: Exercising the full range of Materialize-supported data types across backends with boundary values and edge cases.
Code Reference
Source Location
- Repository: MaterializeInc_Materialize
- File: misc/python/materialize/data_ingest/ (directory)
- data_type.py (993 lines) - Data type definitions and backend-specific name mappings
- executor.py (867 lines) - Backend executors (Kafka, PostgreSQL, MySQL, SQL Server, Kafka Roundtrip)
- workload.py (353 lines) - Workload orchestration and predefined test scenarios
- definition.py (161 lines) - Workload operation patterns (Insert, Upsert, Delete)
- field.py (49 lines) - Field schema representation
- row.py (33 lines) - Row data container with operation type
- rowlist.py (22 lines) - Row collection container
- transaction.py (23 lines) - Transaction wrapper over row lists
- transaction_def.py (166 lines) - Transaction definitions, RestartMz, ZeroDowntimeDeploy
Signature
# data_type.py - Enums and base class
class RecordSize(Enum): # TINY=1, SMALL=2, MEDIUM=3, LARGE=4
class Backend(Enum): # AVRO=1, JSON=2, POSTGRES=3, MYSQL=4, SQL_SERVER=5, MATERIALIZE=6
class DataType:
def random_value(rng, record_size, in_query) -> Any
def numeric_value(num, in_query) -> Any
def name(backend) -> str
# Concrete DataType subclasses:
# Boolean, SmallInt, Int, Long, UInt2, UInt4, UInt8, Float, Double,
# Numeric, Numeric383, Text, Bytea, UUID, Jsonb, TextTextMap,
# IntArray, IntList, Timestamp, MzTimestamp, Date, Time, Interval, Oid
# field.py
class Field:
def __init__(self, name: str, data_type: type[DataType], is_key: bool, value_fn=None)
# row.py
class Operation(Enum): # INSERT=1, UPSERT=2, DELETE=3
class Row:
def __init__(self, fields: list[Field], values: list[Any], operation: Operation)
# rowlist.py
class RowList:
def __init__(self, rows: list[Row])
# transaction.py
class Transaction:
def __init__(self, row_lists: list[RowList])
# definition.py
class Records(Enum): # ALL=0, ONE=1, HUNDRED=100, SOME=1000, MANY=1000000
class Keyspace(Enum): # SINGLE_VALUE=1, LARGE=2, EXISTING=3
class Definition:
def generate(self, fields: list[Field]) -> Iterator[RowList]
class Insert(Definition):
def __init__(self, count: Records, record_size: RecordSize)
class Upsert(Definition):
def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize)
class Delete(Definition):
def __init__(self, number_of_records: Records, record_size: RecordSize, num: int | None)
# transaction_def.py
class TransactionSize(Enum): # SINGLE_OPERATION=1, HUGE=1_000_000_000
class TransactionDef:
def __init__(self, operations: list[Definition], size: TransactionSize)
def generate(self, fields: list[Field]) -> Iterator[Transaction | None]
class RestartMz(TransactionDef):
def __init__(self, composition, probability, workload, azurite)
class ZeroDowntimeDeploy(TransactionDef):
def __init__(self, composition, probability, workload, azurite)
# executor.py
class Executor:
def __init__(self, ports, fields, database, schema, cluster, mz_service, composition)
def create(self, logging_exe=None) -> None
def run(self, transaction: Transaction, logging_exe=None) -> None
class PrintExecutor(Executor)
class KafkaExecutor(Executor)
class PgExecutor(Executor)
class MySqlExecutor(Executor)
class SqlServerExecutor(Executor)
class KafkaRoundtripExecutor(Executor)
# workload.py
class Workload:
def __init__(self, azurite, composition, mz_service, deploy_generation)
def generate(self, fields: list[Field]) -> Iterator[Transaction]
class SingleSensorUpdating(Workload)
class SingleSensorUpdatingDisruptions(Workload)
class SingleSensorUpdating0dtDeploy(Workload)
class DeleteDataAtEndOfDay(Workload)
class DeleteDataAtEndOfDayDisruptions(Workload)
class DeleteDataAtEndOfDay0dtDeploys(Workload)
Import
from materialize.data_ingest.data_type import DataType, RecordSize, Backend
from materialize.data_ingest.executor import KafkaExecutor, PgExecutor, MySqlExecutor, SqlServerExecutor, KafkaRoundtripExecutor
from materialize.data_ingest.workload import Workload
from materialize.data_ingest.definition import Insert, Upsert, Delete, Records, Keyspace
from materialize.data_ingest.field import Field
from materialize.data_ingest.row import Row, Operation
from materialize.data_ingest.rowlist import RowList
from materialize.data_ingest.transaction import Transaction
from materialize.data_ingest.transaction_def import TransactionDef, RestartMz, ZeroDowntimeDeploy
I/O Contract
Inputs
| Component | Input | Type | Description |
|---|---|---|---|
| Field | name | str | Column name for the schema field |
| Field | data_type | type[DataType] | One of the 24 DataType subclasses (Boolean, SmallInt, Int, Long, etc.) |
| Field | is_key | bool | Whether this field is part of the primary key |
| DataType.random_value | rng | random.Random | Random number generator for reproducible runs |
| DataType.random_value | record_size | RecordSize | Controls value magnitude: TINY, SMALL, MEDIUM, LARGE |
| DataType.random_value | in_query | bool | When True, returns SQL-cast literals (e.g., `42::int`); when False, returns raw Python values |
| DataType.name | backend | Backend | Returns the type name appropriate for the target backend (AVRO, JSON, POSTGRES, MYSQL, SQL_SERVER, MATERIALIZE) |
| Insert | count | Records | Number of rows to insert: ONE (1), HUNDRED (100), SOME (1,000), MANY (1,000,000) |
| Insert | record_size | RecordSize | Size category for generated values |
| Upsert | keyspace | Keyspace | SINGLE_VALUE (always key 0), LARGE, or EXISTING |
| Delete | number_of_records | Records | How many records to delete; `Records.ALL` requires the `num` parameter |
| TransactionDef | operations | list[Definition] | Ordered list of Insert/Upsert/Delete definitions to compose into transactions |
| TransactionDef | size | TransactionSize | SINGLE_OPERATION (one RowList per Transaction) or HUGE (up to 1 billion RowLists batched) |
| Executor | ports | dict[str, int] | Service-name-to-port mapping (e.g., `{"materialized": 6875, "kafka": 9092}`) |
| Executor | fields | list[Field] | Schema fields for table creation and data generation |
| Workload | azurite | bool | Whether to use Azure blob storage emulator |
| execute_workload | runtime | int | Maximum seconds to run the workload before stopping |
Outputs
| Component | Output | Type | Description |
|---|---|---|---|
| Definition.generate | row lists | Iterator[RowList] | Stream of RowList objects, each containing one or more Row instances with field values and operation type |
| TransactionDef.generate | transactions | `Iterator[Transaction \| None]` | Stream of Transaction objects (batched RowLists) or None (for disruption operations like RestartMz) |
| Workload.generate | transactions | Iterator[Transaction] | Unbounded stream of transactions produced by repeatedly iterating over the workload's `cycle` of TransactionDefs |
| Executor.create | side effects | None | Creates backend-specific tables, sources, connections, topics, and schemas |
| Executor.run | side effects | None | Executes INSERT/UPSERT/DELETE operations against the backend |
| execute_workload | assertion | None | Raises ValueError if Materialize query results differ from PostgreSQL ground truth |
Data Type Backend Mappings
| DataType | Materialize | Avro | JSON | PostgreSQL | MySQL | SQL Server |
|---|---|---|---|---|---|---|
| SmallInt | smallint | int | integer | smallint | smallint | smallint |
| Int | int | int | integer | int | int | int |
| Long | bigint | long | integer | bigint | bigint | bigint |
| UInt2 | uint2 | int | integer | numeric | smallint unsigned | uint2 |
| UInt4 | uint4 | int | integer | numeric | int unsigned | uint4 |
| UInt8 | uint8 | long | integer | numeric | bigint unsigned | uint8 |
| Float | float4 | float | number | float4 | float4 | real |
| Double | float8 | double | number | float8 | float8 | float |
| Text | text | string | string | text | text | varchar(1024) |
| Timestamp | timestamp | unsupported | unsupported | timestamp | timestamp | datetime2 |
Architecture
Layer Structure
The framework is organized into four layers:
1. Type Layer (data_type.py, field.py): Defines 24 DataType subclasses, each providing random_value(), numeric_value(), and name() methods. The name() method returns backend-specific type names. Pre-built filter sets (DATA_TYPES_FOR_AVRO, DATA_TYPES_FOR_MYSQL, DATA_TYPES_FOR_SQL_SERVER, DATA_TYPES_FOR_KEY) exclude unsupported types per backend. Field binds a name, data type, and key/value role together.
2. Data Layer (row.py, rowlist.py, transaction.py): A Row couples a list of fields with their concrete values and an Operation enum (INSERT, UPSERT, DELETE). RowList groups rows that belong to the same logical operation step. Transaction groups RowLists into an atomic unit of work.
3. Definition Layer (definition.py, transaction_def.py): Definition subclasses (Insert, Upsert, Delete) generate streams of RowLists from field schemas. TransactionDef batches Definition outputs into Transactions of configurable size. The special TransactionDef subclasses RestartMz and ZeroDowntimeDeploy inject infrastructure disruptions between data operations: killing and restarting Materialize instances, or performing zero-downtime deployments.
4. Execution Layer (executor.py, workload.py): Executor subclasses translate Transactions into backend-specific DDL and DML. Workload subclasses define reusable test scenarios as cycles of TransactionDefs. The execute_workload() function wires everything together: it generates random schemas, creates executors, runs the workload for a specified duration, then compares Materialize results against a PostgreSQL ground truth.
Executor Details
| Executor | Backend | Ingestion Path | Key Behavior |
|---|---|---|---|
| KafkaExecutor | Kafka (Avro) | Produces Avro-serialized key/value messages to a Kafka topic; Materialize reads via `CREATE SOURCE ... FROM KAFKA ... ENVELOPE UPSERT` | Deletes are sent as messages with a null value for the key; uses Schema Registry for Avro schemas |
| PgExecutor | PostgreSQL | Writes to a Postgres table; Materialize reads via `CREATE SOURCE ... FROM POSTGRES CONNECTION ... (PUBLICATION ...)` | Creates a publication and sets `REPLICA IDENTITY FULL`; uses `ON CONFLICT ... DO UPDATE` for upserts |
| MySqlExecutor | MySQL | Writes to a MySQL table; Materialize reads via `CREATE SOURCE ... FROM MYSQL CONNECTION ...` | Uses `ON DUPLICATE KEY UPDATE` for upserts; connects via pymysql |
| SqlServerExecutor | SQL Server | Writes to a SQL Server table with CDC enabled; Materialize reads via `CREATE SOURCE ... FROM SQL SERVER CONNECTION ...` | Uses `MERGE ... WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT` for upserts; uses testdrive for SQL execution |
| KafkaRoundtripExecutor | Materialize (via Kafka sink/source) | Writes to a Materialize table, sinks it to Kafka with `ENVELOPE DEBEZIUM`, then reads it back via a new Kafka source | Tests the full roundtrip through Materialize sinks and sources |
| PrintExecutor | stdout | Prints transactions to console | Used for debugging when verbose mode is enabled |
Predefined Workloads
| Workload | Description | Cycle |
|---|---|---|
| SingleSensorUpdating | Simulates a single sensor continuously updating its value | Upsert with SINGLE_VALUE keyspace, one record per transaction |
| SingleSensorUpdatingDisruptions | Same as above with Mz restart disruptions (10% probability) | Upsert + RestartMz |
| SingleSensorUpdating0dtDeploy | Same as above with zero-downtime deployment disruptions (10% probability) | Upsert + ZeroDowntimeDeploy |
| DeleteDataAtEndOfDay | Inserts many records, then deletes all of them in bulk | Insert phase (HUGE transactions) followed by Delete ALL phase |
| DeleteDataAtEndOfDayDisruptions | Same as above with Mz restart disruptions | Insert + Delete + RestartMz |
| DeleteDataAtEndOfDay0dtDeploys | Same as above with zero-downtime deployment disruptions | Insert + Delete + ZeroDowntimeDeploy |
Usage Examples
Running a Workload Programmatically
from materialize.data_ingest.workload import execute_workload, SingleSensorUpdating
from materialize.data_ingest.executor import KafkaExecutor, MySqlExecutor
# NOTE(review): `composition` is not defined in this snippet; it is assumed to
# be supplied by the surrounding test harness (e.g. an mzcompose workflow).

# Service-name-to-port mapping: Materialize itself, Kafka plus Schema Registry
# (used by KafkaExecutor), MySQL (used by MySqlExecutor), and PostgreSQL
# (the ground truth that Materialize results are compared against).
service_ports = {
    "materialized": 6875,
    "kafka": 9092,
    "schema-registry": 8081,
    "postgres": 5432,
    "mysql": 3306,
}

workload = SingleSensorUpdating(azurite=False, composition=composition)
execute_workload(
    executor_classes=[KafkaExecutor, MySqlExecutor],
    workload=workload,
    num=1,
    ports=service_ports,
    runtime=60,  # stop the workload after 60 seconds
    verbose=False,
    composition=composition,
)
Defining a Custom Workload
from materialize.data_ingest.data_type import RecordSize
from materialize.data_ingest.definition import Delete, Insert, Keyspace, Records, Upsert
from materialize.data_ingest.transaction_def import TransactionDef, TransactionSize
from materialize.data_ingest.workload import Workload


class MyCustomWorkload(Workload):
    """Insert a batch of records, then delete all of them, in a repeating cycle.

    The cycle holds two transaction definitions:

    1. Insert ``Records.HUNDRED`` rows with ``RecordSize.MEDIUM`` values.
    2. Delete ``Records.ALL`` rows, bounded by ``insert.max_key()``.

    Both definitions use ``TransactionSize.SINGLE_OPERATION``.
    """

    def __init__(
        self,
        azurite: bool,
        composition=None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ):
        super().__init__(azurite, composition, mz_service, deploy_generation)
        insert = Insert(count=Records.HUNDRED, record_size=RecordSize.MEDIUM)
        self.cycle = [
            TransactionDef(
                operations=[insert],
                size=TransactionSize.SINGLE_OPERATION,
            ),
            TransactionDef(
                operations=[
                    # A Delete of Records.ALL requires `num`; it is taken from
                    # the insert definition, presumably its largest generated
                    # key (TODO confirm against Insert.max_key).
                    Delete(
                        number_of_records=Records.ALL,
                        record_size=RecordSize.MEDIUM,
                        num=insert.max_key(),
                    )
                ],
                # Passed explicitly rather than relying on a default.
                size=TransactionSize.SINGLE_OPERATION,
            ),
        ]
Generating Random Typed Fields
import random
from materialize.data_ingest.data_type import DATA_TYPES_FOR_KEY, DATA_TYPES_FOR_AVRO
from materialize.data_ingest.field import Field
# Build a random schema: 1-10 key columns drawn from the key-capable types,
# followed by 0-20 value columns drawn from the Avro-compatible types.
fields = [
    Field(f"key{idx}", random.choice(DATA_TYPES_FOR_KEY), True)
    for idx in range(random.randint(1, 10))
]
fields.extend(
    Field(f"value{idx}", random.choice(DATA_TYPES_FOR_AVRO), False)
    for idx in range(random.randint(0, 20))
)