
Implementation:MaterializeInc Materialize Parallel Benchmark Framework

From Leeroopedia


Overview

The parallel benchmark framework is a performance measurement system located in misc/python/materialize/parallel_benchmark/. It executes multi-threaded benchmark scenarios against Materialize, measuring query latency and throughput under concurrent load. Unlike the parallel workload framework (which focuses on correctness through randomized actions), this framework focuses on performance regression detection through structured, repeatable scenarios with defined quality-of-service guarantees.

The module consists of two source files:

File | Lines | Purpose
framework.py | 517 | Core benchmark infrastructure: measurement storage, action types, load distributions, phase orchestration, and the scenario base class
scenarios.py | 1208 | Concrete benchmark scenario definitions (Kafka, Postgres, MySQL replication, read workloads, CQRS, operational data patterns)

Framework Architecture

Measurement Storage

The framework provides two measurement storage backends via the MeasurementsStore abstract class:

MemoryStore -- Holds measurements in a defaultdict[str, list[Measurement]] for lightweight in-process use.

SQLiteStore -- Persists measurements to a SQLite database file (parallel-benchmark.db) using WAL journaling, synchronous writes disabled, exclusive locking mode, and a cache of roughly 64 MB. A single connection is shared across threads, access to it is serialized by an explicit threading.Lock, and the store requires the sqlite3 module's thread-safety level 3 (serialized).

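import sqlite3
import threading

DB_FILE = "parallel-benchmark.db"


class SQLiteStore(MeasurementsStore):  # MeasurementsStore: abstract base in framework.py
    def __init__(self, scenario: str):
        self.lock = threading.Lock()  # serializes use of the shared connection
        # check_same_thread=False lets worker threads share this connection;
        # isolation_level=None puts the sqlite3 module in autocommit mode.
        self.conn = sqlite3.connect(
            DB_FILE, check_same_thread=False, isolation_level=None
        )
        cursor = self.conn.cursor()
        cursor.execute("PRAGMA journal_mode=WAL;")
        cursor.execute("PRAGMA synchronous=OFF;")  # no fsync: faster, less durable
        cursor.execute("PRAGMA cache_size=-64000;")  # negative value = KiB, ~64 MB
        cursor.execute("PRAGMA locking_mode=EXCLUSIVE;")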

Each Measurement records a duration (in seconds) and a timestamp (the action's start time, in Unix epoch seconds). The SQLite schema stores each measurement as a (scenario, action, duration, timestamp) row.
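The two storage backends can be illustrated with a small, self-contained sketch. This is a hypothetical re-creation, not the code in framework.py: the class names MemoryStoreSketch and SQLiteStoreSketch, the add()/get() methods, and the table name measurements are assumptions. Only the defaultdict layout, the shared-connection-plus-lock design, and the (scenario, action, duration, timestamp) columns come from the description above. It uses an in-memory database so it can run anywhere.

```python
from __future__ import annotations

import sqlite3
import threading
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Measurement:
    duration: float   # seconds
    timestamp: float  # epoch seconds when the action started


class MemoryStoreSketch:
    """Hypothetical in-process store: action name -> list of measurements."""

    def __init__(self) -> None:
        self.data: defaultdict[str, list[Measurement]] = defaultdict(list)

    def add(self, action: str, m: Measurement) -> None:
        self.data[action].append(m)

    def get(self, action: str) -> list[Measurement]:
        return list(self.data.get(action, []))


class SQLiteStoreSketch:
    """Hypothetical SQLite store using a (scenario, action, duration, timestamp) table."""

    def __init__(self, scenario: str, path: str = ":memory:") -> None:
        self.scenario = scenario
        self.lock = threading.Lock()
        # Autocommit connection shared by all threads; the lock serializes access.
        self.conn = sqlite3.connect(path, check_same_thread=False, isolation_level=None)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS measurements "
            "(scenario TEXT, action TEXT, duration REAL, timestamp REAL)"
        )

    def add(self, action: str, m: Measurement) -> None:
        with self.lock:
            self.conn.execute(
                "INSERT INTO measurements VALUES (?, ?, ?, ?)",
                (self.scenario, action, m.duration, m.timestamp),
            )

    def get(self, action: str) -> list[Measurement]:
        with self.lock:
            rows = self.conn.execute(
                "SELECT duration, timestamp FROM measurements "
                "WHERE scenario = ? AND action = ? ORDER BY timestamp",
                (self.scenario, action),
            ).fetchall()
        return [Measurement(d, t) for d, t in rows]
```

Keying rows by scenario lets several scenarios share one database file while each run reads back only its own measurements.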

Action Types

The framework defines four action types, all inheriting from the base Action class. Three of them differ only in how they obtain a database connection; the fourth wraps testdrive:

Class | Behavior | Connection model
StandaloneQuery | Opens a new connection per query, executes the query, and closes the connection | Fresh connection each time
ReuseConnQuery | Keeps one connection open across queries | Single long-lived connection
PooledQuery | Draws a connection from a shared queue.Queue pool | Connection pool
TdAction | Delegates to the testdrive framework for Kafka/source operations | Composition-based
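The three connection models can be contrasted with a short sketch. FakeConn is a hypothetical stand-in for a real database connection that only counts how many connections were opened, and the functions below are illustrations, not the framework's StandaloneQuery, ReuseConnQuery, or PooledQuery classes.

```python
from __future__ import annotations

import queue


class FakeConn:
    """Hypothetical stand-in for a database connection; counts connections opened."""

    opened = 0

    def __init__(self) -> None:
        FakeConn.opened += 1
        self.closed = False

    def execute(self, sql: str) -> str:
        return f"ran: {sql}"

    def close(self) -> None:
        self.closed = True


def standalone(sql: str) -> str:
    # Fresh connection per query: connect, execute, close.
    conn = FakeConn()
    try:
        return conn.execute(sql)
    finally:
        conn.close()


class ReuseConn:
    # One long-lived connection, opened lazily and kept across queries.
    def __init__(self) -> None:
        self.conn: FakeConn | None = None

    def run(self, sql: str) -> str:
        if self.conn is None:
            self.conn = FakeConn()
        return self.conn.execute(sql)


def pooled(sql: str, pool: queue.Queue) -> str:
    # Borrow from the shared pool (blocks while it is empty) and always return it.
    conn = pool.get()
    try:
        return conn.execute(sql)
    finally:
        pool.put(conn)
```

Only the standalone model pays connection setup on every query, which is why a scenario such as ConnectRead uses it to measure connect-plus-query latency.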

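After _run() completes, Action.run() records a Measurement. The duration is measured from the start_time passed in rather than from when _run() began, so any delay before execution counts toward the recorded latency: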

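import queue
import time


class Action:  # State and Measurement are defined elsewhere in framework.py
    def run(self, start_time: float, conns: queue.Queue, state: State):
        self._run(conns)  # subclass hook that performs the actual work
        duration = time.time() - start_time
        state.measurements.add(str(self), Measurement(duration, start_time))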
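The effect of measuring from start_time can be shown with a runnable toy version of this template-method pattern. SketchAction, SleepAction, and RecordingState are hypothetical names; the only behavior taken from the excerpt is that run() calls _run() and then records time.time() - start_time under str(self).

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class Measurement:
    duration: float
    timestamp: float


@dataclass
class RecordingState:
    # Hypothetical stand-in for State.measurements.
    measurements: dict[str, list[Measurement]] = field(default_factory=dict)

    def add(self, name: str, m: Measurement) -> None:
        self.measurements.setdefault(name, []).append(m)


class SketchAction:
    def run(self, start_time: float, state: RecordingState) -> None:
        self._run()
        # Measured from start_time, not from when _run() began,
        # so any wait before execution counts toward the latency.
        state.add(str(self), Measurement(time.time() - start_time, start_time))

    def _run(self) -> None:
        raise NotImplementedError


class SleepAction(SketchAction):
    def __init__(self, seconds: float) -> None:
        self.seconds = seconds

    def _run(self) -> None:
        time.sleep(self.seconds)

    def __str__(self) -> str:
        return f"sleep {self.seconds}"
```

If a job is scheduled, then waits 50 ms in a queue before a 10 ms _run(), the recorded duration is about 60 ms rather than 10 ms.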

The StandaloneQuery and ReuseConnQuery classes accept a strict_serializable parameter that selects the transaction isolation level: STRICT SERIALIZABLE when it is true (the default) and SERIALIZABLE when it is false.

The execute_query() helper function retries queries that fail because of a detected deadlock and tolerates timeouts from real-time recency (RTR) queries.
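A retry-and-tolerate policy of this kind can be sketched as follows. This is not execute_query() itself: DeadlockError and RtrTimeoutError are hypothetical exception types standing in for however the real driver reports these conditions, and the bounded retry count, linear backoff, and "tolerate means return None" behavior are all assumptions chosen for illustration.

```python
import time


class DeadlockError(Exception):
    """Hypothetical: stands in for the error a driver raises on deadlock."""


class RtrTimeoutError(Exception):
    """Hypothetical: stands in for a real-time recency timeout."""


def execute_query_sketch(run, max_retries: int = 5, backoff: float = 0.01):
    """Call run(); retry on deadlock, treat an RTR timeout as non-fatal (returns None)."""
    for attempt in range(max_retries + 1):
        try:
            return run()
        except DeadlockError:
            if attempt == max_retries:
                raise  # give up after the last allowed attempt
            time.sleep(backoff * (attempt + 1))  # linear backoff between retries
        except RtrTimeoutError:
            return None  # tolerated: the benchmark keeps running
```

Separating the two cases keeps transient contention (deadlocks) from aborting a run while still surfacing any other error immediately.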

Load Distributions

The framework supports two load-generation patterns through the Distribution base class:

Periodic -- Generates timestamps at a fixed rate of per_second invocations per second, spread evenly within each second. The rate can be overridden at runtime via State.periodic_dists.

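import time


class Periodic(Distribution):  # Distribution and sleep_until are defined in framework.py
    def generate(self, duration, action_name, state):
        # A runtime override in state.periodic_dists takes precedence.
        per_second = state.periodic_dists.get(action_name) or self.per_second
        next_time = time.time()
        for _ in range(int(duration * per_second)):
            yield next_time
            next_time += 1 / per_second  # advance from the previous target, not "now"
            sleep_until(next_time)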
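The schedule Periodic produces can be computed without any sleeping, which makes its shape easy to inspect. periodic_schedule() is a hypothetical helper, not part of the framework; it reproduces only the timestamp arithmetic of generate().

```python
from __future__ import annotations


def periodic_schedule(start: float, duration: float, per_second: float) -> list[float]:
    """Timestamps a Periodic distribution would yield, computed without sleeping."""
    times: list[float] = []
    next_time = start
    for _ in range(int(duration * per_second)):
        times.append(next_time)
        # Each target is derived from the previous target, not from the current clock.
        next_time += 1 / per_second
    return times
```

For example, periodic_schedule(100.0, 2, 4) yields eight timestamps 0.25 s apart. Because each target is computed from the previous target, a tick that runs late does not shift the later ones, assuming sleep_until() returns immediately when its target has already passed.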

Gaussian -- Generates inter-arrival times drawn from a Gaussian distribution with configurable mean and stddev, running until the duration expires.
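A Gaussian arrival process can be sketched in the same sleep-free style. gaussian_schedule() is hypothetical. Clamping negative gaps to zero, emitting the first arrival after one gap rather than immediately, and rejecting a non-positive mean are assumptions, not confirmed details of the framework's Gaussian class.

```python
from __future__ import annotations

import random


def gaussian_schedule(
    start: float,
    duration: float,
    mean: float,
    stddev: float,
    rng: random.Random | None = None,
) -> list[float]:
    """Arrival times with Gaussian inter-arrival gaps until the duration expires."""
    if mean <= 0:
        raise ValueError("mean inter-arrival time must be positive")
    rng = rng or random.Random()
    times: list[float] = []
    t = start
    while True:
        # Clamp: a negative gap would move time backwards (assumption).
        t += max(0.0, rng.gauss(mean, stddev))
        if t >= start + duration:
            return times
        times.append(t)
```

With mean=0.1 and a small stddev, a 10-second window yields roughly 100 arrivals at irregular spacing, unlike the evenly spaced Periodic schedule.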

Phase Actions

Load generation is wrapped in two PhaseAction strategies:

OpenLoop -- Pairs an Action with a Distribution. Submits jobs to a thread pool queue at the distribution-defined rate, regardless of whether previous jobs have completed. This models external load that does not back off.

ClosedLoop -- Executes an action in a tight loop on its own thread, starting the next invocation only after the previous one completes. This models clients that wait for responses.

Both support a report_regressions flag to control whether their measurements contribute to regression analysis.
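Why the two strategies can report very different latencies is easiest to see in a toy simulation. It assumes a hypothetical system that serves one request at a time with a fixed service time; the real benchmark runs against Materialize with many concurrent threads, so this shows only the qualitative difference.

```python
from __future__ import annotations


def open_loop_latencies(n: int, interval: float, service: float) -> list[float]:
    """Requests arrive every `interval` s regardless of backlog (single FIFO server)."""
    latencies: list[float] = []
    free_at = 0.0
    for i in range(n):
        arrival = i * interval
        start = max(arrival, free_at)  # wait while the server is still busy
        free_at = start + service
        latencies.append(free_at - arrival)  # includes queueing delay
    return latencies


def closed_loop_latencies(n: int, service: float) -> list[float]:
    """One client sends the next request only after the previous reply arrives."""
    return [service] * n
```

When arrivals come faster than the system can serve them (interval 0.5 s, service 1.0 s), open-loop latency grows with every request (1.0, 1.5, 2.0, 2.5, ...), while the closed-loop client simply slows down and keeps seeing 1.0 s. Open loop therefore exposes overload that closed loop hides.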

Phases

A scenario is composed of an ordered list of Phase objects:

TdPhase -- Executes testdrive commands for setup (creating connections, sources, tables, materialized views). This is typically the first phase in any scenario.

LoadPhase -- Runs a set of PhaseAction instances concurrently for a specified duration in seconds, which state.load_phase_duration can override at runtime. Each phase action runs on its own thread, and the phase waits for all threads to join before proceeding.

import threading


class LoadPhase(Phase):
    def run(self, c, jobs, conns, state):
        # A runtime override in state.load_phase_duration takes precedence.
        duration = state.load_phase_duration or self.duration
        # One thread per phase action; start them all, then wait for all to finish.
        threads = [
            threading.Thread(
                target=phase_action.run,
                args=(duration, jobs, conns, state),
            )
            for phase_action in self.phase_actions
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

Scenario Base Class

The Scenario class orchestrates the full benchmark lifecycle:

Method | Purpose
init() | Configures phases, thread pool size (default 5000), connection pool size, QoS guarantees, and regression thresholds
setup() | Creates the thread pool and connection pool
run() | Executes all phases in order
teardown() | Drains the connection pool, signals threads to stop, and joins all threads

Each scenario defines guarantees -- a dictionary mapping action names to minimum QPS and maximum p99 latency thresholds:

self.init(
    phases=[...],
    guarantees={
        "SELECT * FROM kafka_mv (standalone)": {"qps": 15, "p99": 400},
    },
)
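Checking measurements against such a guarantee can be sketched as below. This is an illustration, not the framework's regression logic: the nearest-rank p99 definition, the assumption that "p99" is in milliseconds while measured durations are in seconds, and the helper names p99() and check_guarantee() are all assumptions.

```python
from __future__ import annotations


def p99(values: list[float]) -> float:
    """Nearest-rank 99th percentile (one common definition; the framework's may differ)."""
    if not values:
        raise ValueError("no values")
    ordered = sorted(values)
    rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n) in integer arithmetic, 1-based
    return ordered[rank - 1]


def check_guarantee(durations_s: list[float], window_s: float, guarantee: dict) -> list[str]:
    """Return the violated guarantee keys, assuming 'p99' is given in milliseconds."""
    violations = []
    qps = len(durations_s) / window_s
    if qps < guarantee["qps"]:
        violations.append("qps")
    if durations_s and p99([d * 1000 for d in durations_s]) > guarantee["p99"]:
        violations.append("p99")
    return violations
```

With the guarantee {"qps": 15, "p99": 400}, 100 queries in a 5-second window where only one takes 2 s pass both checks, because a single outlier in 100 samples does not move the nearest-rank p99.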

The disabled() decorator marks a scenario class as not enabled and records an ignore_reason string explaining why.
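A class decorator of this shape can be sketched in a few lines. This is a hypothetical version: the attribute names enabled and ignore_reason on the class, the ScenarioBase stand-in, and the runnable() filter are assumptions used to show how a runner might skip disabled scenarios.

```python
from __future__ import annotations


def disabled(ignore_reason: str):
    """Hypothetical sketch: mark a scenario class as not enabled."""

    def decorator(cls: type) -> type:
        cls.enabled = False
        cls.ignore_reason = ignore_reason
        return cls

    return decorator


class ScenarioBase:
    enabled = True
    ignore_reason: str | None = None


@disabled("hypothetical reason")
class Example(ScenarioBase):
    pass


def runnable(classes: list[type]) -> list[type]:
    # A runner could skip disabled scenarios like this.
    return [c for c in classes if c.enabled]
```

Setting the attributes on the decorated subclass leaves the base class and other scenarios unaffected.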

Benchmark Scenarios

The scenarios.py module defines the following concrete scenarios:

Scenario | Description | Key workload
Kafka | Kafka source ingestion with concurrent reads | 1 msg/s ingestion + 10 closed-loop SELECT threads
CreateKafkaSink | Kafka sink creation under load | Continuous CREATE SINK in a closed loop over 900 s
PgReadReplica | Postgres CDC replication with reads | 100 inserts/s into Postgres + 10 SELECT threads on a materialized view
PgReadReplicaRTR | Postgres replication with real-time recency queries | Similar to PgReadReplica but with strict serializable reads
MySQLReadReplica | MySQL CDC replication with reads | 100 inserts/s into MySQL + 10 SELECT threads
OpenIndexedSelects | Indexed SELECT performance under open-loop load | 500 standalone SELECT/s against indexed views
ConnectRead | Connection establishment overhead | Standalone queries measuring connect + query latency
FlagUpdate | Impact of system flag updates | Periodic ALTER SYSTEM SET during concurrent reads
Read | Pure read throughput | 10 closed-loop standalone SELECT threads
PoolRead | Connection-pooled read throughput | 10 closed-loop pooled SELECT threads
StatementLogging | Impact of statement logging on performance | Enables statement logging, then runs a read workload
InsertWhereNotExists | INSERT ... WHERE NOT EXISTS pattern | Closed-loop INSERT ... WHERE NOT EXISTS queries
InsertsSelects | Mixed read/write workload | Concurrent INSERT and SELECT operations
CommandQueryResponsibilitySegregation | CQRS pattern benchmark | Writes to one cluster, reads from materialized views on another
OperationalDataStore | Operational data store pattern | Postgres CDC + Kafka ingestion + concurrent analytical queries
OperationalDataMesh | Data mesh pattern | Multiple Postgres sources feeding materialized views with concurrent reads
ReadReplicaBenchmark | Read replica pattern at scale | Postgres replication with large-scale concurrent reads
StagingBench | Staging environment benchmark | Performance testing for staging deployments

Scenario Structure Pattern

Most scenarios follow a common structure:

  1. A TdPhase that creates connections, sources, tables, and materialized views using testdrive syntax.
  2. A LoadPhase combining OpenLoop actions for data ingestion with ClosedLoop actions for query workload.
  3. A guarantees dictionary specifying minimum QPS and maximum p99 latency for key queries.

Abridged example of the Kafka scenario ("..." marks elided testdrive arguments):

class Kafka(Scenario):
    def __init__(self, c, conn_infos):
        self.init(
            [
                TdPhase("""
                    $ kafka-create-topic topic=kafka
                    $ kafka-ingest format=avro topic=kafka ...
                    > CREATE SOURCE kafka FROM KAFKA CONNECTION kafka_conn ...
                    > CREATE TABLE kafka_tbl FROM SOURCE kafka ...
                    > CREATE MATERIALIZED VIEW kafka_mv AS SELECT * FROM kafka_tbl;
                    > CREATE DEFAULT INDEX ON kafka_mv;
                """),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=TdAction("$ kafka-ingest ...", c),
                            dist=Periodic(per_second=1),
                        )
                    ] + [
                        ClosedLoop(
                            action=StandaloneQuery(
                                "SELECT * FROM kafka_mv",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        )
                        for i in range(10)
                    ],
                ),
            ],
            guarantees={
                "SELECT * FROM kafka_mv (standalone)": {"qps": 15, "p99": 400},
            },
        )

Thread Pool Architecture

The framework uses a job-queue-based thread pool. During setup(), the configured number of worker threads (5000 by default) is spawned, each running the run_job() function, which pulls callables from a shared queue.Queue:

import queue


def run_job(jobs: queue.Queue) -> None:
    while True:
        job = jobs.get()
        try:
            if not job:
                return  # None sentinel: this worker exits
            job()
        finally:
            jobs.task_done()  # lets jobs.join() track completion

OpenLoop actions submit their work to this queue, while ClosedLoop actions run directly on their own threads. During teardown, one None sentinel is pushed per worker thread, and each worker exits when it receives one.
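The full pool lifecycle (spawn workers, submit callables, wait, then shut down with one sentinel per worker) fits in a small runnable sketch. run_job() matches the excerpt; demo() and its parameters are hypothetical and use a handful of threads instead of 5000.

```python
import queue
import threading


def run_job(jobs: queue.Queue) -> None:
    while True:
        job = jobs.get()
        try:
            if not job:
                return  # None sentinel: this worker exits
            job()
        finally:
            jobs.task_done()


def demo(num_threads: int = 4, num_jobs: int = 20) -> list[int]:
    jobs: queue.Queue = queue.Queue()
    results: list[int] = []
    lock = threading.Lock()

    # setup(): spawn the worker threads.
    threads = [threading.Thread(target=run_job, args=(jobs,)) for _ in range(num_threads)]
    for t in threads:
        t.start()

    def make_job(i: int):
        def job() -> None:
            with lock:
                results.append(i)
        return job

    # What an OpenLoop action does at each scheduled time: enqueue a callable.
    for i in range(num_jobs):
        jobs.put(make_job(i))
    jobs.join()  # block until task_done() has been called for every job

    # teardown(): one sentinel per worker, then join them all.
    for _ in threads:
        jobs.put(None)
    for t in threads:
        t.join()
    return sorted(results)
```

Because every get() is paired with task_done() in a finally block, jobs.join() returns even if a job raises, and the sentinels are counted as well.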

File Locations

File | Path (relative to the repository root)
framework.py | misc/python/materialize/parallel_benchmark/framework.py
scenarios.py | misc/python/materialize/parallel_benchmark/scenarios.py
