Implementation:MaterializeInc Materialize Parallel Workload Framework

From Leeroopedia
Revision as of 15:39, 16 February 2026 by Admin (auto-imported from implementations/MaterializeInc_Materialize_Parallel_Workload_Framework.md)


Overview

The parallel workload framework is a multi-threaded stress testing system located in misc/python/materialize/parallel_workload/. It drives concurrent database operations against a running Materialize instance by spawning multiple worker threads, each executing randomized actions drawn from weighted action lists. The framework is designed to uncover concurrency bugs, race conditions, and error-handling regressions across DDL, DML, and read workloads.

The module is composed of eight source files:

File                  Lines  Purpose
action.py             3183   Action classes representing all database operations (SELECT, INSERT, CREATE, DROP, etc.)
database.py           1162   Database schema model and configuration management (tables, views, sources, sinks, clusters)
parallel_workload.py  652    Main orchestrator that initializes the database, spawns workers, and reports statistics
expression.py         303    Random SQL expression generation from a type-indexed function/operator registry
executor.py           268    SQL execution layer wrapping psycopg cursors and WebSocket connections
column.py             175    Column representations for tables and source-specific column variants
worker.py             146    Worker thread loop with error handling and reconnection logic
settings.py           49     Configuration enums for complexity levels and testing scenarios

Architecture

Orchestration

The entry point is the run() function in parallel_workload.py. Its parameters include the host, ports, seed, runtime, complexity, scenario, and thread count, along with flags such as naughty_identifiers, replicas, azurite, and sanity_restart. It performs the following steps:

  1. Initializes a Database object representing the schema state.
  2. Opens a system connection to configure Materialize parameters (max schemas, tables, roles, clusters, etc.).
  3. Creates the initial database objects (schemas, tables, views, indexes, sources, sinks, clusters, roles).
  4. Spawns a configurable number of worker threads, each assigned an action list based on the Complexity setting.
  5. Periodically reports execution statistics via a StatisticsAction.

```python
def run(
    host: str,
    ports: dict[str, int],
    seed: str,
    runtime: int,
    complexity: Complexity,
    scenario: Scenario,
    num_threads: int | None,
    naughty_identifiers: bool,
    replicas: int,
    composition: Composition | None,
    azurite: bool,
    sanity_restart: bool,
) -> None:
    num_threads = num_threads or os.cpu_count() or 10
    rng = random.Random(random.randrange(SEED_RANGE))
    ...
    database = Database(
        rng, seed, host, ports, complexity, scenario, naughty_identifiers
    )
```
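The orchestration steps above can be sketched with plain threads. This is a minimal sketch under stated assumptions, not the framework's code. SketchWorker and run_sketch are hypothetical names, database setup and statistics reporting are omitted, and each worker only counts no-op iterations. It preserves two behaviors described on this page: the `num_threads or os.cpu_count() or 10` fallback, and re-raising an exception recorded by a worker so that the run fails.

```python
from __future__ import annotations

import os
import threading
import time
from collections import Counter


class SketchWorker:
    """Hypothetical stand-in for worker.Worker: loops until end_time."""

    def __init__(self, end_time: float) -> None:
        self.end_time = end_time
        self.num_queries: Counter[str] = Counter()
        self.occurred_exception: Exception | None = None

    def run(self) -> None:
        try:
            while time.time() < self.end_time:
                # A real worker would choose and execute an action here.
                self.num_queries["noop"] += 1
                time.sleep(0.001)
        except Exception as e:
            # Stored so the main thread can re-raise it after join().
            self.occurred_exception = e


def run_sketch(runtime: float, num_threads: int | None = None) -> Counter[str]:
    # Same fallback chain as in run(): explicit value, CPU count, then 10.
    num_threads = num_threads or os.cpu_count() or 10
    end_time = time.time() + runtime
    workers = [SketchWorker(end_time) for _ in range(num_threads)]
    threads = [
        threading.Thread(target=w.run, name=f"worker_{i}")
        for i, w in enumerate(workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    total: Counter[str] = Counter()
    for w in workers:
        if w.occurred_exception is not None:
            raise w.occurred_exception
        total.update(w.num_queries)
    return total
```

For example, run_sketch(runtime=0.1, num_threads=4) returns a Counter whose "noop" entry is the combined iteration count of all four threads. Every thread is joined before occurred_exception is inspected, so the check cannot race with a running worker.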

Settings and Configuration

The settings.py module defines two core enums and a dictionary of additional system parameter overrides.

Complexity controls which action lists are activated for worker threads:

Value     Description
Read      Read-only queries (SELECT, SUBSCRIBE, COPY TO S3)
DML       Read + write operations (INSERT, UPDATE, DELETE)
DDL       Read + write + schema modifications (CREATE, DROP, ALTER, RENAME)
DDLOnly   Only schema modification operations

Scenario controls special behavioral modes:

Value               Description
Regression          Standard regression testing
Cancel              Injects query cancellation requests
Kill                Kills and restarts the Materialize process
Rename              Exercises schema and object renaming paths
BackupRestore       Tests backup and restore workflows
ZeroDowntimeDeploy  Tests zero-downtime deployment

Both enums support a "random" value that selects a random member at construction time.

The ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS dictionary disables memory-intensive or unstable features during stress tests (e.g., memory_limiter_interval set to "0").
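One way to implement the "random" member selection is a small resolver on the enum. In this sketch the member values and the resolve() classmethod are assumptions made for illustration; only the member names come from settings.py. Passing a seeded random.Random keeps the choice reproducible for a given seed.

```python
from __future__ import annotations

import random
from enum import Enum


class Complexity(Enum):
    # Member values are hypothetical; only the member names come from settings.py.
    Read = "read"
    DML = "dml"
    DDL = "ddl"
    DDLOnly = "ddl-only"

    @classmethod
    def resolve(cls, value: str, rng: random.Random) -> Complexity:
        # "random" selects any member; other strings are looked up by value.
        if value == "random":
            return rng.choice(list(cls))
        return cls(value)


rng = random.Random(0)
print(Complexity.resolve("dml", rng))     # Complexity.DML
print(Complexity.resolve("random", rng))  # some member, fixed by the seed
```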

Action System

Base Action Class

Every action inherits from the Action base class in action.py. The base class defines:

  • run(exe: Executor) -> bool -- Executes the action using the provided executor. Returns True if a query was actually executed.
  • errors_to_ignore(exe: Executor) -> list[str] -- Returns a list of error message substrings that are expected given the current complexity and scenario. This method provides context-sensitive error suppression. For instance, DDL and DDLOnly complexity add "query could not complete" and "unknown catalog item", while the Kill scenario adds connection-related errors.
  • generate_select_query(exe, expr_kind) -- Builds a randomized SELECT query with optional JOINs, WHERE clauses, UNION ALL, window functions, and aggregations.

```python
class Action:
    rng: random.Random
    composition: Composition | None

    def run(self, exe: Executor) -> bool:
        raise NotImplementedError

    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = [
            "permission denied for",
            "must be owner of",
            "numeric field overflow",
            "division by zero",
            ...
        ]
        if exe.db.complexity in (Complexity.DDL, Complexity.DDLOnly):
            result.extend([
                "query could not complete",
                "unknown catalog item",
                ...
            ])
        return result
```
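The layering of errors_to_ignore() can be illustrated with a simplified class hierarchy. This sketch makes several assumptions that the page does not state. The method receives the complexity directly rather than an Executor, SketchAction and SketchInsertAction are hypothetical classes, and "value too long" is an invented action-specific pattern. What the sketch demonstrates is a subclass extending, rather than replacing, the inherited list, combined with substring matching against an error message.

```python
from __future__ import annotations

from enum import Enum


class Complexity(Enum):
    Read = "read"
    DML = "dml"
    DDL = "ddl"
    DDLOnly = "ddl-only"


class SketchAction:
    # Simplified: takes the complexity directly instead of an Executor.
    def errors_to_ignore(self, complexity: Complexity) -> list[str]:
        result = ["permission denied for", "must be owner of", "division by zero"]
        if complexity in (Complexity.DDL, Complexity.DDLOnly):
            result.extend(["query could not complete", "unknown catalog item"])
        return result


class SketchInsertAction(SketchAction):
    # Hypothetical subclass: adds its own pattern and keeps the inherited ones.
    def errors_to_ignore(self, complexity: Complexity) -> list[str]:
        return ["value too long"] + super().errors_to_ignore(complexity)


def is_ignored(message: str, patterns: list[str]) -> bool:
    # Patterns are substrings, so a match anywhere in the message counts.
    return any(p in message for p in patterns)
```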

Action Lists

Actions are grouped into five weighted ActionList instances. Each list pairs action classes with relative weights controlling how frequently they are selected:

read_action_list (autocommit=False):

Action                Weight
SelectAction          100
SelectOneAction       1
CopyToS3Action        100
SetClusterAction      1
CommitRollbackAction  30
ReconnectAction       1
FlipFlagsAction       2

fetch_action_list (autocommit=False): SUBSCRIBE/FETCH operations with cursor management.

write_action_list (autocommit=False): INSERT, COPY FROM STDIN, HttpPost, and SourceInsert operations.

dml_nontrans_action_list (autocommit=True): DELETE, UPDATE, INSERT RETURNING, and COMMENT operations. These run in autocommit mode because deletes cannot execute inside transactions.

ddl_action_list (autocommit=True): Full schema management including CREATE/DROP for tables, views, indexes, roles, clusters, replicas, sources (Webhook, Kafka, Postgres, MySQL, SQL Server), sinks (Kafka, Iceberg), RENAME, SWAP, ALTER, and privilege management (GRANT/REVOKE).
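Weighted selection from an action list can be sketched with random.choices, which treats weights as relative values. The weights below are the read_action_list values from the table above. Representing actions as plain strings, and the pick() helper itself, are assumptions made for illustration.

```python
import random
from collections import Counter

# Weights copied from the read_action_list table; actions are plain names here.
READ_ACTIONS: dict[str, int] = {
    "SelectAction": 100,
    "SelectOneAction": 1,
    "CopyToS3Action": 100,
    "SetClusterAction": 1,
    "CommitRollbackAction": 30,
    "ReconnectAction": 1,
    "FlipFlagsAction": 2,
}


def pick(rng: random.Random, actions: dict[str, int]) -> str:
    # random.choices treats the weights as relative, so they need not sum to 1.
    return rng.choices(list(actions), weights=list(actions.values()))[0]


rng = random.Random(42)
counts = Counter(pick(rng, READ_ACTIONS) for _ in range(10_000))
print(counts.most_common(3))
```

The weights sum to 235. SelectAction and CopyToS3Action are therefore each expected on about 42.6% of draws and CommitRollbackAction on about 12.8%. ReconnectAction, with weight 1, appears only about 0.4% of the time.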

Key Action Classes

The framework defines over 50 action classes. Notable categories include:

Read Actions:

  • SelectAction -- Generates randomized SELECT queries with expressions, JOINs, window functions, and aggregates.
  • FetchAction -- Declares cursors for SUBSCRIBE operations and fetches results with configurable timeouts.
  • SQLsmithAction -- Delegates to the SQLsmith fuzzer for query generation.
  • CopyToS3Action -- Executes COPY ... TO S3 operations.

Write Actions:

  • InsertAction -- Inserts randomized rows with generated expressions, respecting transaction boundaries.
  • CopyFromStdinAction -- Uses COPY FROM STDIN for bulk inserts.
  • InsertReturningAction -- INSERT ... RETURNING queries.
  • UpdateAction, DeleteAction -- Randomized UPDATE and DELETE with WHERE clauses.
  • SourceInsertAction -- Inserts data through external source connectors (Kafka, MySQL, Postgres).
  • HttpPostAction -- Sends data via HTTP POST to webhook sources.

DDL Actions:

  • CreateTableAction, DropTableAction, RenameTableAction -- Table lifecycle management.
  • CreateViewAction, DropViewAction, ReplaceMaterializedViewAction -- View management including materialized view replacement.
  • CreateClusterAction, DropClusterAction, SwapClusterAction -- Cluster operations.
  • CreateKafkaSourceAction, CreatePostgresSourceAction, CreateMySqlSourceAction -- Source creation for various connectors.
  • CreateKafkaSinkAction, CreateIcebergSinkAction -- Sink creation.
  • GrantPrivilegesAction, RevokePrivilegesAction -- RBAC operations.
  • RenameSchemaAction, SwapSchemaAction -- Schema rename and swap operations.

Lifecycle Actions:

  • CancelAction -- Cancels in-flight queries on other workers.
  • KillAction -- Kills and restarts the Materialized service.
  • ZeroDowntimeDeployAction -- Executes zero-downtime deployment sequences.
  • BackupRestoreAction -- Performs backup and restore operations.
  • ReconnectAction -- Reconnects the executor with optional role switching.

Database Model

The database.py module maintains a thread-safe in-memory representation of all database objects. Each object type (Table, View, Schema, etc.) has a threading.Lock for concurrent access.

Resource Limits

The framework enforces maximum counts for each resource type to prevent unbounded growth:

MAX_COLUMNS = 5
MAX_ROWS = 50
MAX_CLUSTERS = 4
MAX_CLUSTER_REPLICAS = 2
MAX_DBS = 5
MAX_SCHEMAS = 5
MAX_TABLES = 5
MAX_VIEWS = 15
MAX_INDEXES = 15
MAX_ROLES = 15
MAX_WEBHOOK_SOURCES = 5
MAX_KAFKA_SOURCES = 5
MAX_MYSQL_SOURCES = 5
MAX_SQL_SERVER_SOURCES = 5
MAX_POSTGRES_SOURCES = 5
MAX_KAFKA_SINKS = 5
MAX_ICEBERG_SINKS = 5
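Under concurrency, a limit like MAX_TABLES only holds if the count check and the creation happen atomically. The following sketch assumes that a single lock guards a shared list. SketchDatabase and try_create_table() are hypothetical names, not the framework's API.

```python
from __future__ import annotations

import threading

MAX_TABLES = 5  # value from the list above


class SketchDatabase:
    """Hypothetical model: a shared table list guarded by one lock."""

    def __init__(self) -> None:
        self.tables: list[str] = []
        self.lock = threading.Lock()
        self.next_id = 0

    def try_create_table(self) -> str | None:
        # Check and append under the lock so two workers cannot both pass
        # the limit check and push the count past MAX_TABLES.
        with self.lock:
            if len(self.tables) >= MAX_TABLES:
                return None
            name = f"t-{self.next_id}"
            self.next_id += 1
            self.tables.append(name)
            return name
```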

Object Hierarchy

The Database class tracks all database objects and provides accessor methods such as db_objects() and db_objects_without_views(). On initialization, it creates randomized starting sets of schemas, tables, views, roles, clusters, and webhook sources, then incrementally creates and destroys objects during the test run.

Key model classes include:

  • DB -- Represents a Materialize database with a seed-based name (e.g., db-pw-{seed}-{id}).
  • Schema -- Resides within a DB; supports rename tracking.
  • Table -- Contains randomized columns (2 to MAX_COLUMNS) with random data types. Supports temporary tables.
  • View -- Built from one or two base objects with random expressions. Can be materialized with configurable refresh strategies (ON COMMIT, EVERY ... seconds, AT mz_now()).
  • Index -- Tracks which columns of a DBObject are indexed.
  • Cluster -- Managed or unmanaged clusters with configurable size and replication factor.
  • Role -- Named roles for RBAC testing.
  • WebhookSource, KafkaSource, MySqlSource, PostgresSource, SqlServerSource -- Source connectors with source-specific column types.
  • KafkaSink, IcebergSink -- Sink connectors for Kafka and Iceberg.

Column Representations

The column.py module defines the base Column class and source-specific variants:

  • Column -- Base class with column_id, data_type, nullable, default, and a reference to the parent DBObject. The column name encodes the ID and type (e.g., c-0-int4).
  • WebhookColumn, KafkaColumn, MySqlColumn, PostgresColumn, SqlServerColumn -- Simplified column classes for external sources that accept a plain name and data type.

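The module also supports naughty identifiers via the naughtify() function. It appends a string from a naughty-strings corpus, truncated to at most 16 UTF-8 bytes, to an identifier in order to test character-encoding edge cases. The corpus entry is chosen deterministically from the identifier's bytes, so the same input always produces the same output. This behavior is toggled globally via set_naughty_identifiers().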

```python
def naughtify(name: str) -> str:
    """Makes a string into a naughty identifier, always returns the same
    identifier when called with the same input."""
    if not NAUGHTY_IDENTIFIERS:
        return name
    strings = naughty_strings()
    index = sum([10**i * c for i, c in enumerate(name.encode())]) % len(strings)
    return f"{name}_{strings[index].encode('utf-8')[:16].decode('utf-8', 'ignore')}"
```
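Because naughtify() depends on module state and on a corpus file, the sketch below substitutes a tiny in-memory corpus so that the function can run on its own. NAUGHTY_IDENTIFIERS and naughty_strings() here are hypothetical stand-ins. The body of naughtify() follows the excerpt above.

```python
from __future__ import annotations

# Hypothetical stand-ins: the real module loads a naughty-strings corpus and
# toggles NAUGHTY_IDENTIFIERS via set_naughty_identifiers().
NAUGHTY_IDENTIFIERS = True
_CORPUS = ["'; DROP TABLE x; --", "Ω≈ç√∫˜µ≤≥÷", "田中さんにあげて下さい", "\u200b"]


def naughty_strings() -> list[str]:
    return _CORPUS


def naughtify(name: str) -> str:
    if not NAUGHTY_IDENTIFIERS:
        return name
    strings = naughty_strings()
    # Weighting each UTF-8 byte by its position gives a stable index per name.
    index = sum(10**i * c for i, c in enumerate(name.encode())) % len(strings)
    # Cut to 16 bytes; "ignore" drops a multi-byte character split by the cut.
    suffix = strings[index].encode("utf-8")[:16].decode("utf-8", "ignore")
    return f"{name}_{suffix}"
```

Calling naughtify("t-1") twice returns the same identifier. When the 16-byte cut splits a multi-byte character, the "ignore" error handler silently drops the partial bytes, so the suffix can be shorter than 16 bytes.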

Expression Generation

The expression.py module builds random SQL expressions using a type-indexed registry of function and operator templates (FUNC_OPS).

ExprKind Enum

Value           Description
ALL             All expressions allowed
WRITE           Excludes functions unsuitable for write contexts (e.g., mz_now())
MATERIALIZABLE  Only expressions valid in materialized views (excludes session-dependent functions like current_user())
NONE            No expressions

FuncOp Registry

Each FuncOp stores a format template, parameter types, and an optional unsupported kind. The FUNC_OPS dictionary maps data types to lists of applicable operations. Coverage includes:

  • Arithmetic: +, -, *, /, mod for integer, unsigned integer, and floating-point types.
  • Bitwise: &, |, #, ~ for integer types.
  • Mathematical: exp, ceil, floor, cbrt, cos, sin, tan, radians, degrees for Double.
  • String: lower, upper, md5, reverse, concat, btrim, ||.
  • Boolean: AND, OR, NOT, comparisons (>, <, =, etc.), IS NULL, containment (@>, <@).
  • Temporal: Date/timestamp/interval arithmetic, now(), mz_now().
  • List/Map: list_append, list_prepend, list_cat, map_length.
  • Casts: Type conversions between numeric, temporal, and text types.
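The registry idea can be sketched with a frozen dataclass and a dictionary keyed by type. This miniature is an illustration built on assumptions. Types are plain strings rather than DataType classes, and only a handful of operations are included. The filtering rule is also a guess at how the unsupported field could be used: an op is skipped when its unsupported kind equals the requested ExprKind, and NONE allows nothing.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class ExprKind(Enum):
    ALL = 1
    WRITE = 2
    MATERIALIZABLE = 3
    NONE = 4


@dataclass(frozen=True)
class FuncOp:
    text: str                            # format template
    params: tuple[str, ...]              # parameter type names
    unsupported: ExprKind | None = None  # context where the op is disallowed


# Hypothetical miniature of FUNC_OPS, keyed by type name instead of DataType classes.
FUNC_OPS: dict[str, list[FuncOp]] = {
    "int4": [
        FuncOp("({} + {})", ("int4", "int4")),
        FuncOp("({} - {})", ("int4", "int4")),
    ],
    "text": [
        FuncOp("lower({})", ("text",)),
        FuncOp("({} || {})", ("text", "text")),
        FuncOp("current_user()", (), ExprKind.MATERIALIZABLE),
    ],
    "mz_timestamp": [FuncOp("mz_now()", (), ExprKind.WRITE)],
}


def usable_ops(data_type: str, kind: ExprKind) -> list[FuncOp]:
    # Assumed rule: NONE allows nothing; otherwise drop ops whose
    # unsupported kind equals the requested kind.
    if kind is ExprKind.NONE:
        return []
    return [op for op in FUNC_OPS.get(data_type, []) if op.unsupported is not kind]
```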

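The expression() function builds expressions recursively, randomly choosing between function applications, column references, and literal values. Recursion stops at depth 60. Beyond that depth, or when neither a function nor a matching column is chosen, the function returns a random literal of the requested type: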

```python
def expression(
    data_type: type[DataType],
    columns: list[Column],
    rng: random.Random,
    kind: ExprKind = ExprKind.ALL,
    level: int = 0,
) -> str:
    if level < 60:
        if FUNC_OPS[data_type] and rng.random() < 0.5:
            fnop = rng.choice(FUNC_OPS[data_type])
            ...
            return fnop.text.format(*exprs)
        if rng.random() < 0.9:
            for col in rng.sample(columns, len(columns)):
                if col.data_type == data_type:
                    return str(col)
    return str(data_type.random_value(rng, record_size=record_size, in_query=True))
```
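A self-contained version of the same recursion is sketched below. It is not the framework's function. OPS is a hypothetical two-type registry of (template, parameter types) pairs, columns are (quoted name, type name) tuples, and literal() is a simplified stand-in for DataType.random_value(). The 0.5 and 0.9 probabilities and the depth cap of 60 follow the excerpt.

```python
from __future__ import annotations

import random

# Hypothetical mini registry: type name -> list of (template, parameter types).
OPS: dict[str, list[tuple[str, tuple[str, ...]]]] = {
    "int4": [("({} + {})", ("int4", "int4")), ("abs({})", ("int4",))],
    "text": [("upper({})", ("text",)), ("({} || {})", ("text", "text"))],
}
MAX_DEPTH = 60  # same cap as the excerpt above


def literal(data_type: str, rng: random.Random) -> str:
    if data_type == "int4":
        return str(rng.randint(-100, 100))
    return "'" + rng.choice(["a", "b", "c"]) + "'"


def expression(
    data_type: str,
    columns: list[tuple[str, str]],  # (quoted name, type name)
    rng: random.Random,
    level: int = 0,
) -> str:
    if level < MAX_DEPTH:
        ops = OPS.get(data_type, [])
        # Half the time, apply a function/operator and recurse into its arguments.
        if ops and rng.random() < 0.5:
            text, params = rng.choice(ops)
            args = [expression(p, columns, rng, level + 1) for p in params]
            return text.format(*args)
        # Otherwise, usually reference a column of the requested type.
        if rng.random() < 0.9:
            for name, col_type in rng.sample(columns, len(columns)):
                if col_type == data_type:
                    return name
    # Fallback: a literal of the requested type.
    return literal(data_type, rng)
```

Each template's parameters name the type to recurse into, so a text column can never appear inside an int4 expression.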

Executor

The executor.py module provides the Executor class, which wraps a psycopg cursor and an optional WebSocket connection for executing SQL against Materialize.

Key responsibilities:

  • Connection management: Tracks reconnect_next and rollback_next flags to handle connection recovery after failures.
  • Transaction control: commit() and rollback() methods with support for both SQL-level and connection-level rollback.
  • Protocol selection: Randomly chooses between the PostgreSQL wire protocol (psycopg) and the WebSocket experimental API based on use_ws.
  • Logging: Thread-safe query logging to parallel-workload-queries.log using a global lock.
  • Insert tracking: The insert_table field prevents writing to different tables within the same transaction.

```python
class Executor:
    rng: random.Random
    cur: psycopg.Cursor
    ws: websocket.WebSocket | None
    pg_pid: int
    insert_table: int | None
    db: "Database"
    reconnect_next: bool
    rollback_next: bool
```

The Http enum controls HTTP usage: NO (never), YES (always), or RANDOM (coin flip).
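The Http setting and the insert tracking can be sketched together. Everything here is hypothetical. use_http() is an assumed helper for resolving the enum, and SketchExecutor simply refuses to write to a second table inside one transaction. The framework's own InsertAction may handle that conflict differently.

```python
from __future__ import annotations

import random
from enum import Enum


class Http(Enum):
    NO = 0
    YES = 1
    RANDOM = 2


def use_http(http: Http, rng: random.Random) -> bool:
    if http is Http.RANDOM:
        return rng.random() < 0.5  # coin flip
    return http is Http.YES


class SketchExecutor:
    """Hypothetical executor tracking which table the open transaction writes."""

    def __init__(self) -> None:
        self.insert_table: int | None = None
        self.log: list[str] = []

    def insert(self, table_id: int, sql: str) -> bool:
        # Refuse a write to a second table inside the same transaction.
        if self.insert_table is not None and self.insert_table != table_id:
            return False
        self.insert_table = table_id
        self.log.append(sql)
        return True

    def commit(self) -> None:
        self.log.append("COMMIT")
        self.insert_table = None  # the next transaction may target any table

    def rollback(self) -> None:
        self.log.append("ROLLBACK")
        self.insert_table = None
```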

Worker Thread Management

The worker.py module defines the Worker class, which runs the main action loop on a dedicated thread.

Each worker:

  1. Establishes its own PostgreSQL connection and WebSocket connection.
  2. Sets SERIALIZABLE isolation level and disables auto_route_catalog_queries.
  3. Enters a time-bounded loop (while time.time() < self.end_time).
  4. Selects an action via weighted random choice from its action list.
  5. Handles connection recovery: if rollback_next is set, rolls back; if reconnect_next is set, reconnects.
  6. Tracks per-action query counts and categorized ignored errors.
  7. Propagates unexpected exceptions to the main thread via occurred_exception.

```python
class Worker:
    rng: random.Random
    actions: list[Action]
    weights: list[float]
    end_time: float
    num_queries: Counter[type[Action]]
    ignored_errors: defaultdict[str, Counter[type[Action]]]

    def run(self, host, pg_port, http_port, user, database):
        ...
        while time.time() < self.end_time:
            action = self.rng.choices(self.actions, self.weights)[0]
            try:
                ...
                if action.run(self.exe):
                    self.num_queries[type(action)] += 1
            except QueryError as e:
                ...
```
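The worker loop can be sketched without a database connection. In this sketch, actions are hypothetical zero-argument callables keyed by name, and QueryError is a local stand-in for the framework's exception. The ignore list is also passed in, rather than obtained from each action's errors_to_ignore(). The structure of the excerpt is preserved: a time-bounded loop, weighted choice, counting only actions that executed a query, and bucketing ignored errors by pattern and action.

```python
from __future__ import annotations

import random
import time
from collections import Counter, defaultdict
from collections.abc import Callable


class QueryError(Exception):
    """Hypothetical stand-in for the framework's QueryError."""

    def __init__(self, msg: str) -> None:
        super().__init__(msg)
        self.msg = msg


def run_worker(
    actions: dict[str, Callable[[], bool]],
    weights: list[float],
    errors_to_ignore: list[str],
    end_time: float,
    rng: random.Random,
) -> tuple[Counter[str], defaultdict[str, Counter[str]]]:
    num_queries: Counter[str] = Counter()
    ignored_errors: defaultdict[str, Counter[str]] = defaultdict(Counter)
    names = list(actions)
    while time.time() < end_time:
        name = rng.choices(names, weights)[0]
        try:
            # An action returns True only if it actually ran a query.
            if actions[name]():
                num_queries[name] += 1
        except QueryError as e:
            for pattern in errors_to_ignore:
                if pattern in e.msg:
                    ignored_errors[pattern][name] += 1
                    break
            else:
                # No pattern matched: surface the error so the run fails.
                raise
    return num_queries, ignored_errors
```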

Error Handling Strategy

The framework implements a layered error handling approach:

  1. Action-level error lists: Each action class overrides errors_to_ignore() to declare expected errors for its operation type.
  2. Context-sensitive suppression: The base Action class adds scenario-specific and complexity-specific errors (e.g., connection errors for the Kill scenario, catalog errors for DDL complexity).
  3. Worker-level recovery: Workers match caught QueryError exceptions against the ignore list. Connection-level errors trigger reconnection; transaction-level errors trigger rollback.
  4. Failure propagation: Unmatched errors are stored in occurred_exception and re-raised, causing the test to fail.
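The recovery decision in step 3 can be sketched as a small classifier. The connection-error markers and the Flags dataclass are assumptions made for illustration. The framework keeps its own error lists and stores reconnect_next and rollback_next on the Executor. Raising RuntimeError here stands in for storing the error in occurred_exception.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Recovery(Enum):
    RECONNECT = "reconnect"
    ROLLBACK = "rollback"
    FAIL = "fail"


# Hypothetical markers for connection-level failures.
CONNECTION_ERRORS = [
    "server closed the connection unexpectedly",
    "Connection refused",
]


@dataclass
class Flags:
    # Mirrors the Executor's reconnect_next / rollback_next flags.
    reconnect_next: bool = False
    rollback_next: bool = False


def classify(message: str, errors_to_ignore: list[str]) -> Recovery:
    if not any(p in message for p in errors_to_ignore):
        return Recovery.FAIL
    if any(p in message for p in CONNECTION_ERRORS):
        return Recovery.RECONNECT
    return Recovery.ROLLBACK


def handle(message: str, errors_to_ignore: list[str], flags: Flags) -> None:
    recovery = classify(message, errors_to_ignore)
    if recovery is Recovery.FAIL:
        raise RuntimeError(f"unexpected error: {message}")
    if recovery is Recovery.RECONNECT:
        flags.reconnect_next = True
    else:
        flags.rollback_next = True
```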

File Locations

File                  Path (relative to repository root)
action.py             misc/python/materialize/parallel_workload/action.py
database.py           misc/python/materialize/parallel_workload/database.py
expression.py         misc/python/materialize/parallel_workload/expression.py
parallel_workload.py  misc/python/materialize/parallel_workload/parallel_workload.py
column.py             misc/python/materialize/parallel_workload/column.py
executor.py           misc/python/materialize/parallel_workload/executor.py
settings.py           misc/python/materialize/parallel_workload/settings.py
worker.py             misc/python/materialize/parallel_workload/worker.py
