Implementation:MaterializeInc Materialize Parallel Workload Framework
Overview
The parallel workload framework is a multi-threaded stress testing system located in misc/python/materialize/parallel_workload/. It drives concurrent database operations against a running Materialize instance by spawning multiple worker threads, each executing randomized actions drawn from weighted action lists. The framework is designed to uncover concurrency bugs, race conditions, and error-handling regressions across DDL, DML, and read workloads.
The module is composed of eight source files:
| File | Lines | Purpose |
|---|---|---|
| action.py | 3183 | Action classes representing all database operations (SELECT, INSERT, CREATE, DROP, etc.) |
| database.py | 1162 | Database schema model and configuration management (tables, views, sources, sinks, clusters) |
| parallel_workload.py | 652 | Main orchestrator that initializes the database, spawns workers, and reports statistics |
| expression.py | 303 | Random SQL expression generation from a type-indexed function/operator registry |
| executor.py | 268 | SQL execution layer wrapping psycopg cursors and WebSocket connections |
| column.py | 175 | Column representations for tables and source-specific column variants |
| worker.py | 146 | Worker thread loop with error handling and reconnection logic |
| settings.py | 49 | Configuration enums for complexity levels and testing scenarios |
Architecture
Orchestration
The entry point is the run() function in parallel_workload.py, which accepts host, ports, seed, runtime, complexity, scenario, and thread count parameters. It performs the following steps:
- Initializes a Database object representing the schema state.
- Opens a system connection to configure Materialize parameters (max schemas, tables, roles, clusters, etc.).
- Creates the initial database objects (schemas, tables, views, indexes, sources, sinks, clusters, roles).
- Spawns a configurable number of worker threads, each assigned an action list based on the Complexity setting.
- Periodically reports execution statistics via a StatisticsAction.
def run(
    host: str,
    ports: dict[str, int],
    seed: str,
    runtime: int,
    complexity: Complexity,
    scenario: Scenario,
    num_threads: int | None,
    naughty_identifiers: bool,
    replicas: int,
    composition: Composition | None,
    azurite: bool,
    sanity_restart: bool,
) -> None:
    num_threads = num_threads or os.cpu_count() or 10
    rng = random.Random(random.randrange(SEED_RANGE))
    ...
    database = Database(
        rng, seed, host, ports, complexity, scenario, naughty_identifiers
    )
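The spawn-and-wait step can be sketched as follows. This is a minimal, self-contained illustration rather than the framework's code: run_workers, its parameters, and the per-worker seeding scheme are hypothetical, and the loop body is only a stand-in for executing one randomized action.

```python
import random
import threading
import time


def run_workers(num_threads: int, runtime_s: float, seed: str) -> list[int]:
    """Run workers until a shared deadline; return per-worker iteration counts.

    Hypothetical helper for illustration only.
    """
    end_time = time.time() + runtime_s
    counts = [0] * num_threads

    def work(idx: int) -> None:
        # Assumption: each worker derives its own RNG from the seed and its
        # index so that threads do not share random state.
        rng = random.Random(f"{seed}-{idx}")
        while time.time() < end_time:
            rng.random()  # stand-in for running one randomized action
            counts[idx] += 1  # each thread writes only its own slot

    threads = [threading.Thread(target=work, args=(i,)) for i in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return counts
```

Every worker checks the same deadline, so all threads stop at roughly the same time without needing an explicit stop signal.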
Settings and Configuration
The settings.py module defines two core enums and a dictionary of additional system parameter overrides.
Complexity controls which action lists are activated for worker threads:
| Value | Description |
|---|---|
| Read | Read-only queries (SELECT, SUBSCRIBE, COPY TO S3) |
| DML | Read + write operations (INSERT, UPDATE, DELETE) |
| DDL | Read + write + schema modifications (CREATE, DROP, ALTER, RENAME) |
| DDLOnly | Only schema modification operations |
Scenario controls special behavioral modes:
| Value | Description |
|---|---|
| Regression | Standard regression testing |
| Cancel | Injects query cancellation requests |
| Kill | Kills and restarts the Materialize process |
| Rename | Exercises schema and object renaming paths |
| BackupRestore | Tests backup and restore workflows |
| ZeroDowntimeDeploy | Tests zero-downtime deployment |
Both enums support a "random" value that selects a random member at construction time.
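One way such a "random" value could work is through the standard Enum._missing_ hook, sketched below. The string values of the members and the use of _missing_ are assumptions made for illustration; the page does not say how settings.py implements the lookup.

```python
import random
from enum import Enum


class Complexity(Enum):
    # Member names follow the table above; the string values are assumed.
    Read = "read"
    DML = "dml"
    DDL = "ddl"
    DDLOnly = "ddl-only"

    @classmethod
    def _missing_(cls, value: object) -> "Complexity | None":
        # Enum calls this hook when the value matches no member.
        if value == "random":
            return random.choice(list(cls))
        return None  # any other unknown value still raises ValueError
```

With this sketch, Complexity("ddl") returns Complexity.DDL, while Complexity("random") returns one of the four members chosen at call time.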
The ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS dictionary disables memory-intensive or unstable features during stress tests (e.g., memory_limiter_interval set to "0").
Action System
Base Action Class
Every action inherits from the Action base class in action.py. The base class defines:
- run(exe: Executor) -> bool -- Executes the action using the provided executor. Returns True if a query was actually executed.
- errors_to_ignore(exe: Executor) -> list[str] -- Returns a list of error message substrings that are expected given the current complexity and scenario. This method provides context-sensitive error suppression; for instance, DDL complexity adds "query could not complete" and "unknown catalog item", while the Kill scenario adds connection-related errors.
- generate_select_query(exe, expr_kind) -- Builds a randomized SELECT query with optional JOINs, WHERE clauses, UNION ALL, window functions, and aggregations.
class Action:
    rng: random.Random
    composition: Composition | None
    def run(self, exe: Executor) -> bool:
        raise NotImplementedError
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = [
            "permission denied for",
            "must be owner of",
            "numeric field overflow",
            "division by zero",
            ...
        ]
        if exe.db.complexity in (Complexity.DDL, Complexity.DDLOnly):
            result.extend([
                "query could not complete",
                "unknown catalog item",
                ...
            ])
        return result
Action Lists
Actions are grouped into five weighted ActionList instances. Each list pairs action classes with relative weights controlling how frequently they are selected:
read_action_list (autocommit=False):
| Action | Weight |
|---|---|
| SelectAction | 100 |
| SelectOneAction | 1 |
| CopyToS3Action | 100 |
| SetClusterAction | 1 |
| CommitRollbackAction | 30 |
| ReconnectAction | 1 |
| FlipFlagsAction | 2 |
fetch_action_list (autocommit=False): SUBSCRIBE/FETCH operations with cursor management.
write_action_list (autocommit=False): INSERT, COPY FROM STDIN, HttpPost, and SourceInsert operations.
dml_nontrans_action_list (autocommit=True): DELETE, UPDATE, INSERT RETURNING, and COMMENT operations. These run in autocommit mode because deletes cannot execute inside transactions.
ddl_action_list (autocommit=True): Full schema management including CREATE/DROP for tables, views, indexes, roles, clusters, replicas, sources (Webhook, Kafka, Postgres, MySQL, SQL Server), sinks (Kafka, Iceberg), RENAME, SWAP, ALTER, and privilege management (GRANT/REVOKE).
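To make the weighting concrete, the sketch below models an action list as name/weight pairs and draws from it with random.choices. The ActionList shape shown here (a dataclass with a pick method, holding action names instead of classes) is a simplification assumed for illustration; only the weights are taken from the read_action_list table.

```python
import random
from collections import Counter
from dataclasses import dataclass


@dataclass
class ActionList:
    """Simplified stand-in: (action name, relative weight) pairs plus autocommit mode."""

    entries: list[tuple[str, float]]
    autocommit: bool

    def pick(self, rng: random.Random) -> str:
        names = [name for name, _ in self.entries]
        weights = [weight for _, weight in self.entries]
        # random.choices treats the weights as relative, not as percentages.
        return rng.choices(names, weights)[0]


read_action_list = ActionList(
    [
        ("SelectAction", 100),
        ("SelectOneAction", 1),
        ("CopyToS3Action", 100),
        ("SetClusterAction", 1),
        ("CommitRollbackAction", 30),
        ("ReconnectAction", 1),
        ("FlipFlagsAction", 2),
    ],
    autocommit=False,
)

rng = random.Random(0)
picks = Counter(read_action_list.pick(rng) for _ in range(2000))
```

Because the weights sum to 235, SelectAction and CopyToS3Action each account for roughly 100/235 of the draws, while the weight-1 actions appear only occasionally.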
Key Action Classes
The framework defines over 50 action classes. Notable categories include:
Read Actions:
- SelectAction -- Generates randomized SELECT queries with expressions, JOINs, window functions, and aggregates.
- FetchAction -- Declares cursors for SUBSCRIBE operations and fetches results with configurable timeouts.
- SQLsmithAction -- Delegates to the SQLsmith fuzzer for query generation.
- CopyToS3Action -- Executes COPY ... TO S3 operations.
Write Actions:
- InsertAction -- Inserts randomized rows with generated expressions, respecting transaction boundaries.
- CopyFromStdinAction -- Uses COPY FROM STDIN for bulk inserts.
- InsertReturningAction -- INSERT ... RETURNING queries.
- UpdateAction, DeleteAction -- Randomized UPDATE and DELETE with WHERE clauses.
- SourceInsertAction -- Inserts data through external source connectors (Kafka, MySQL, Postgres).
- HttpPostAction -- Sends data via HTTP POST to webhook sources.
DDL Actions:
- CreateTableAction, DropTableAction, RenameTableAction -- Table lifecycle management.
- CreateViewAction, DropViewAction, ReplaceMaterializedViewAction -- View management including materialized view replacement.
- CreateClusterAction, DropClusterAction, SwapClusterAction -- Cluster operations.
- CreateKafkaSourceAction, CreatePostgresSourceAction, CreateMySqlSourceAction -- Source creation for various connectors.
- CreateKafkaSinkAction, CreateIcebergSinkAction -- Sink creation.
- GrantPrivilegesAction, RevokePrivilegesAction -- RBAC operations.
- RenameSchemaAction, SwapSchemaAction -- Schema rename and swap operations.
Lifecycle Actions:
- CancelAction -- Cancels in-flight queries on other workers.
- KillAction -- Kills and restarts the Materialized service.
- ZeroDowntimeDeployAction -- Executes zero-downtime deployment sequences.
- BackupRestoreAction -- Performs backup and restore operations.
- ReconnectAction -- Reconnects the executor with optional role switching.
Database Model
The database.py module maintains a thread-safe in-memory representation of all database objects. Each object type (Table, View, Schema, etc.) has a threading.Lock for concurrent access.
Resource Limits
The framework enforces maximum counts for each resource type to prevent unbounded growth:
MAX_COLUMNS = 5
MAX_ROWS = 50
MAX_CLUSTERS = 4
MAX_CLUSTER_REPLICAS = 2
MAX_DBS = 5
MAX_SCHEMAS = 5
MAX_TABLES = 5
MAX_VIEWS = 15
MAX_INDEXES = 15
MAX_ROLES = 15
MAX_WEBHOOK_SOURCES = 5
MAX_KAFKA_SOURCES = 5
MAX_MYSQL_SOURCES = 5
MAX_SQL_SERVER_SOURCES = 5
MAX_POSTGRES_SOURCES = 5
MAX_KAFKA_SINKS = 5
MAX_ICEBERG_SINKS = 5
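A limit like these is only effective if the count check and the creation happen under the same lock. The sketch below shows that pattern for tables. TableRegistry and try_create are hypothetical names, and the real model in database.py is organized differently.

```python
import threading
from typing import Optional

MAX_TABLES = 5  # same value as in the list above


class TableRegistry:
    """Hypothetical registry that never holds more than MAX_TABLES names."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.tables: list[str] = []
        self.next_id = 0

    def try_create(self) -> Optional[str]:
        # Check and append under one lock so two threads cannot both pass
        # the limit check and overshoot MAX_TABLES.
        with self.lock:
            if len(self.tables) >= MAX_TABLES:
                return None
            name = f"t-{self.next_id}"
            self.next_id += 1
            self.tables.append(name)
            return name
```

A caller that receives None simply skips the action, which keeps the object count bounded no matter how many workers attempt creation concurrently.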
Object Hierarchy
The Database class tracks all database objects and provides accessor methods such as db_objects() and db_objects_without_views(). On initialization, it creates randomized starting sets of schemas, tables, views, roles, clusters, and webhook sources, then incrementally creates and destroys objects during the test run.
Key model classes include:
- DB -- Represents a Materialize database with a seed-based name (e.g., db-pw-{seed}-{id}).
- Schema -- Resides within a DB; supports rename tracking.
- Table -- Contains randomized columns (2 to MAX_COLUMNS) with random data types. Supports temporary tables.
- View -- Built from one or two base objects with random expressions. Can be materialized with configurable refresh strategies (ON COMMIT, EVERY ... seconds, AT mz_now()).
- Index -- Tracks which columns of a DBObject are indexed.
- Cluster -- Managed or unmanaged clusters with configurable size and replication factor.
- Role -- Named roles for RBAC testing.
- WebhookSource, KafkaSource, MySqlSource, PostgresSource, SqlServerSource -- Source connectors with source-specific column types.
- KafkaSink, IcebergSink -- Sink connectors for Kafka and Iceberg.
Column Representations
The column.py module defines the base Column class and source-specific variants:
- Column -- Base class with column_id, data_type, nullable, default, and a reference to the parent DBObject. The column name encodes the ID and type (e.g., c-0-int4).
- WebhookColumn, KafkaColumn, MySqlColumn, PostgresColumn, SqlServerColumn -- Simplified column classes for external sources that accept a plain name and data type.
The module also supports naughty identifiers via the naughtify() function, which appends truncated strings from a naughty-strings corpus to identifiers for testing character encoding edge cases. This is toggled globally via set_naughty_identifiers().
def naughtify(name: str) -> str:
    """Makes a string into a naughty identifier, always returns the same
    identifier when called with the same input."""
    if not NAUGHTY_IDENTIFIERS:
        return name
    strings = naughty_strings()
    index = sum([10**i * c for i, c in enumerate(name.encode())]) % len(strings)
    return f"{name}_{strings[index].encode('utf-8')[:16].decode('utf-8', 'ignore')}"
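Two details of this function are easy to miss: the index depends only on the bytes of the name, and the 16-byte cut can land in the middle of a multi-byte character, which decoding with "ignore" then drops. The sketch below isolates both behaviors. CORPUS is a made-up stand-in for the naughty-strings corpus, truncate_utf8 is a hypothetical helper, and the global toggle is left out.

```python
def truncate_utf8(text: str, max_bytes: int = 16) -> str:
    """Cut text to at most max_bytes of UTF-8, dropping a split trailing character."""
    return text.encode("utf-8")[:max_bytes].decode("utf-8", "ignore")


def naughtify(name: str, strings: list[str]) -> str:
    # Same positional byte sum as in the excerpt above: the chosen index is a
    # pure function of the name's bytes and the corpus length.
    index = sum(10**i * c for i, c in enumerate(name.encode())) % len(strings)
    return f"{name}_{truncate_utf8(strings[index])}"


# Hypothetical corpus; the second entry repeats three 3-byte characters.
CORPUS = [
    "' OR 1=1 --",
    "\u65e5\u672c\u8a9e" * 4,
    "<script>alert(1)</script>",
]
```

For the second corpus entry, 16 bytes cover five whole characters plus one byte of the sixth, so the result keeps five characters and remains valid UTF-8.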
Expression Generation
The expression.py module builds random SQL expressions using a type-indexed registry of function and operator templates (FUNC_OPS).
ExprKind Enum
| Value | Description |
|---|---|
| ALL | All expressions allowed |
| WRITE | Excludes functions unsuitable for write contexts (e.g., mz_now()) |
| MATERIALIZABLE | Only expressions valid in materialized views (excludes session-dependent functions like current_user()) |
| NONE | No expressions |
FuncOp Registry
Each FuncOp stores a format template, parameter types, and an optional unsupported kind. The FUNC_OPS dictionary maps data types to lists of applicable operations. Coverage includes:
- Arithmetic: +, -, *, /, mod for integer, unsigned integer, and floating-point types.
- Bitwise: &, |, #, ~ for integer types.
- Mathematical: exp, ceil, floor, cbrt, cos, sin, tan, radians, degrees for Double.
- String: lower, upper, md5, reverse, concat, btrim, ||.
- Boolean: AND, OR, NOT, comparisons (>, <, =, etc.), IS NULL, containment (@>, <@).
- Temporal: Date/timestamp/interval arithmetic, now(), mz_now().
- List/Map: list_append, list_prepend, list_cat, map_length.
- Casts: Type conversions between numeric, temporal, and text types.
The expression() function recursively builds expressions up to depth 60, randomly choosing between function applications, column references, and literal values:
def expression(
    data_type: type[DataType],
    columns: list[Column],
    rng: random.Random,
    kind: ExprKind = ExprKind.ALL,
    level: int = 0,
) -> str:
    if level < 60:
        if FUNC_OPS[data_type] and rng.random() < 0.5:
            fnop = rng.choice(FUNC_OPS[data_type])
            ...
            return fnop.text.format(*exprs)
    if rng.random() < 0.9:
        for col in rng.sample(columns, len(columns)):
            if col.data_type == data_type:
                return str(col)
    return str(data_type.random_value(rng, record_size=record_size, in_query=True))
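The recursion is easier to follow on a toy registry. The sketch below keeps the same three-way choice (function application, column reference, literal) but uses plain strings for types and a depth limit of 4. The registry contents, column names, literals, and MAX_LEVEL are all invented for illustration and are not the framework's FUNC_OPS.

```python
import random

# Toy registry: result type -> list of (format template, parameter types).
FUNC_OPS = {
    "int": [("({} + {})", ["int", "int"]), ("abs({})", ["int"])],
    "bool": [("({} > {})", ["int", "int"]), ("(NOT {})", ["bool"])],
}
COLUMNS = {"c_int": "int", "c_bool": "bool"}  # column name -> type
LITERALS = {"int": ["0", "1", "42"], "bool": ["TRUE", "FALSE"]}
MAX_LEVEL = 4  # the framework's excerpt uses 60


def expression(data_type: str, rng: random.Random, level: int = 0) -> str:
    # 1. Below the depth limit, apply a function or operator half the time,
    #    building each argument recursively with the parameter's type.
    if level < MAX_LEVEL and rng.random() < 0.5:
        template, params = rng.choice(FUNC_OPS[data_type])
        args = [expression(param, rng, level + 1) for param in params]
        return template.format(*args)
    # 2. Otherwise prefer a column of the requested type.
    if rng.random() < 0.9:
        matching = [name for name, typ in COLUMNS.items() if typ == data_type]
        if matching:
            return rng.choice(matching)
    # 3. Fall back to a literal of the requested type.
    return rng.choice(LITERALS[data_type])
```

Because every random decision goes through the rng argument, two calls with identically seeded generators produce the same expression text, which is what makes a failing query reproducible from its seed.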
Executor
The executor.py module provides the Executor class, which wraps a psycopg cursor and an optional WebSocket connection for executing SQL against Materialize.
Key responsibilities:
- Connection management: Tracks reconnect_next and rollback_next flags to handle connection recovery after failures.
- Transaction control: commit() and rollback() methods with support for both SQL-level and connection-level rollback.
- Protocol selection: Randomly chooses between the PostgreSQL wire protocol (psycopg) and the WebSocket experimental API based on use_ws.
- Logging: Thread-safe query logging to parallel-workload-queries.log using a global lock.
- Insert tracking: The insert_table field prevents writing to different tables within the same transaction.
class Executor:
    rng: random.Random
    cur: psycopg.Cursor
    ws: websocket.WebSocket | None
    pg_pid: int
    insert_table: int | None
    db: "Database"
    reconnect_next: bool
    rollback_next: bool
The Http enum controls HTTP usage: NO (never), YES (always), or RANDOM (coin flip).
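The insert-tracking rule listed among the executor's responsibilities can be illustrated in isolation. InsertGuard and its methods are hypothetical; the sketch only mirrors the idea that insert_table remembers the one table a transaction has written to and is cleared when the transaction ends.

```python
from typing import Optional


class InsertGuard:
    """Hypothetical helper tracking the single table a transaction inserts into."""

    def __init__(self) -> None:
        self.insert_table: Optional[int] = None  # None: no insert yet

    def try_insert(self, table_id: int) -> bool:
        # Refuse a write to a second table inside the same transaction.
        if self.insert_table is not None and self.insert_table != table_id:
            return False
        self.insert_table = table_id
        return True

    def end_transaction(self) -> None:
        # After commit or rollback, any table may be written again.
        self.insert_table = None
```

An action that receives False can either skip the insert or end the transaction first and retry.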
Worker Thread Management
The worker.py module defines the Worker class, which runs the main action loop on a dedicated thread.
Each worker:
- Establishes its own PostgreSQL connection and WebSocket connection.
- Sets SERIALIZABLE isolation level and disables auto_route_catalog_queries.
- Enters a time-bounded loop (while time.time() < self.end_time).
- Selects an action via weighted random choice from its action list.
- Handles connection recovery: if rollback_next is set, rolls back; if reconnect_next is set, reconnects.
- Tracks per-action query counts and categorized ignored errors.
- Propagates unexpected exceptions to the main thread via occurred_exception.
class Worker:
    rng: random.Random
    actions: list[Action]
    weights: list[float]
    end_time: float
    num_queries: Counter[type[Action]]
    ignored_errors: defaultdict[str, Counter[type[Action]]]
    def run(self, host, pg_port, http_port, user, database):
        ...
        while time.time() < self.end_time:
            action = self.rng.choices(self.actions, self.weights)[0]
            try:
                ...
                if action.run(self.exe):
                    self.num_queries[type(action)] += 1
            except QueryError as e:
                ...
Error Handling Strategy
The framework implements a layered error handling approach:
- Action-level error lists: Each action class overrides errors_to_ignore() to declare expected errors for its operation type.
- Context-sensitive suppression: The base Action class adds scenario-specific and complexity-specific errors (e.g., connection errors for the Kill scenario, catalog errors for DDL complexity).
- Worker-level recovery: Workers match caught QueryError exceptions against the ignore list. Connection-level errors trigger reconnection; transaction-level errors trigger rollback.
- Failure propagation: Unmatched errors are stored in occurred_exception and re-raised, causing the test to fail.
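The layers above amount to a small decision procedure over the error message. The sketch below is one possible reading of it, not the worker's actual code: the Outcome enum, the classify function, the marker lists, and the existence of a separate "ignore only" outcome are assumptions. The only detail taken from the page is that matching is done on substrings.

```python
from enum import Enum


class Outcome(Enum):
    IGNORE = "ignore"        # expected error, keep going
    ROLLBACK = "rollback"    # expected error, roll back the transaction first
    RECONNECT = "reconnect"  # expected error, reopen the connection first
    FAIL = "fail"            # unexpected error, fail the test


def classify(
    message: str,
    errors_to_ignore: list[str],
    connection_markers: list[str],
    transaction_markers: list[str],
) -> Outcome:
    # An error that matches no expected substring fails the run.
    if not any(expected in message for expected in errors_to_ignore):
        return Outcome.FAIL
    # Assumed marker lists decide which recovery step an expected error needs.
    if any(marker in message for marker in connection_markers):
        return Outcome.RECONNECT
    if any(marker in message for marker in transaction_markers):
        return Outcome.ROLLBACK
    return Outcome.IGNORE
```

Checking the ignore list first means an error is never silently recovered from unless some action or scenario has declared it as expected.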
File Locations
| File | Path (relative to repository root) |
|---|---|
| action.py | misc/python/materialize/parallel_workload/action.py |
| database.py | misc/python/materialize/parallel_workload/database.py |
| expression.py | misc/python/materialize/parallel_workload/expression.py |
| parallel_workload.py | misc/python/materialize/parallel_workload/parallel_workload.py |
| column.py | misc/python/materialize/parallel_workload/column.py |
| executor.py | misc/python/materialize/parallel_workload/executor.py |
| settings.py | misc/python/materialize/parallel_workload/settings.py |
| worker.py | misc/python/materialize/parallel_workload/worker.py |