Implementation:MaterializeInc Materialize Parallel Benchmark Framework
Overview
The parallel benchmark framework is a performance measurement system located in misc/python/materialize/parallel_benchmark/. It executes multi-threaded benchmark scenarios against Materialize, measuring query latency and throughput under concurrent load. Unlike the parallel workload framework (which focuses on correctness through randomized actions), this framework focuses on performance regression detection through structured, repeatable scenarios with defined quality-of-service guarantees.
The module consists of two source files:
| File | Lines | Purpose |
|---|---|---|
| framework.py | 517 | Core benchmark infrastructure: measurement storage, action types, load distributions, phase orchestration, and scenario base class |
| scenarios.py | 1208 | Concrete benchmark scenario definitions (Kafka, Postgres, MySQL replication, read workloads, CQRS, operational data patterns) |
Framework Architecture
Measurement Storage
The framework provides two measurement storage backends via the MeasurementsStore abstract class:
MemoryStore -- Holds measurements in a defaultdict[str, list[Measurement]] for lightweight in-process use.
SQLiteStore -- Persists measurements to a SQLite database file (parallel-benchmark.db) with WAL journaling, exclusive locking, and a 64 MB cache. This store is thread-safe via an explicit threading.Lock and requires SQLite thread-safety level 3 (serialized).
    class SQLiteStore(MeasurementsStore):
        def __init__(self, scenario: str):
            self.lock = threading.Lock()
            self.conn = sqlite3.connect(
                DB_FILE, check_same_thread=False, isolation_level=None
            )
            cursor = self.conn.cursor()
            cursor.execute("PRAGMA journal_mode=WAL;")
            cursor.execute("PRAGMA synchronous=OFF;")
            cursor.execute("PRAGMA cache_size=-64000;")  # 64 MB
            cursor.execute("PRAGMA locking_mode=EXCLUSIVE;")
Each Measurement records a duration (seconds) and a timestamp (epoch). The SQLite schema stores measurements as (scenario, action, duration, timestamp).
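The four-column schema can be illustrated with a minimal sketch. It assumes a hypothetical table name (`measurements`), a hypothetical class name (`SketchSQLiteStore`), and an in-memory database instead of parallel-benchmark.db; the PRAGMA tuning shown above is omitted, and the real SQLiteStore may differ in its details.

```python
import sqlite3
import threading
from dataclasses import dataclass


@dataclass
class Measurement:
    duration: float   # seconds
    timestamp: float  # epoch seconds


class SketchSQLiteStore:
    """Hypothetical stand-in for SQLiteStore; table and method names are assumptions."""

    def __init__(self, scenario: str, db_path: str = ":memory:"):
        self.scenario = scenario
        self.lock = threading.Lock()
        # check_same_thread=False lets worker threads share one connection;
        # the lock serializes access to it.
        self.conn = sqlite3.connect(
            db_path, check_same_thread=False, isolation_level=None
        )
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS measurements "
            "(scenario TEXT, action TEXT, duration REAL, timestamp REAL)"
        )

    def add(self, action: str, measurement: Measurement) -> None:
        with self.lock:
            self.conn.execute(
                "INSERT INTO measurements VALUES (?, ?, ?, ?)",
                (self.scenario, action, measurement.duration, measurement.timestamp),
            )

    def durations(self, action: str) -> list[float]:
        with self.lock:
            rows = self.conn.execute(
                "SELECT duration FROM measurements "
                "WHERE scenario = ? AND action = ? ORDER BY timestamp",
                (self.scenario, action),
            ).fetchall()
        return [row[0] for row in rows]


# Four threads record 25 measurements each through the shared store.
store = SketchSQLiteStore("Kafka")


def record(worker: int) -> None:
    for i in range(25):
        store.add("SELECT 1", Measurement(0.01, float(worker * 100 + i)))


threads = [threading.Thread(target=record, args=(w,)) for w in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

recorded = store.durations("SELECT 1")
```

Sharing one connection behind a lock keeps the writes ordered without opening a connection per thread, which matches the locking approach described above.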
Action Types
The framework defines four action types, all inheriting from the base Action class. Three of them execute SQL queries with different connection models; the fourth delegates to testdrive:
| Class | Behavior | Connection Model |
|---|---|---|
| StandaloneQuery | Opens a new connection per query, executes, and closes | Fresh connection each time |
| ReuseConnQuery | Maintains a persistent connection across queries | Single long-lived connection |
| PooledQuery | Draws a connection from a shared queue.Queue pool | Connection pool |
| TdAction | Delegates to the testdrive framework for Kafka/source operations | Composition-based |
Each action automatically records its execution duration after completing:
    class Action:
        def run(self, start_time: float, conns: queue.Queue, state: State):
            self._run(conns)
            duration = time.time() - start_time
            state.measurements.add(str(self), Measurement(duration, start_time))
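Because the duration is computed from the start_time passed in by the caller, any delay between that time and the moment the action actually begins is counted in the measurement. The following sketch illustrates this with simplified, hypothetical classes (`SketchMemoryStore`, `SketchAction`, `SleepAction`); the connection queue and State object of the real framework are left out.

```python
import time
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Measurement:
    duration: float   # seconds
    timestamp: float  # epoch seconds


class SketchMemoryStore:
    """Hypothetical in-memory store keyed by action name."""

    def __init__(self) -> None:
        self.data: defaultdict[str, list[Measurement]] = defaultdict(list)

    def add(self, action: str, measurement: Measurement) -> None:
        self.data[action].append(measurement)


class SketchAction:
    """Simplified base class: run() times the subclass-specific _run()."""

    def run(self, start_time: float, store: SketchMemoryStore) -> None:
        self._run()
        # Measured from the caller-supplied start time, not from when _run() began.
        duration = time.time() - start_time
        store.add(str(self), Measurement(duration, start_time))

    def _run(self) -> None:
        raise NotImplementedError


class SleepAction(SketchAction):
    """Hypothetical action that stands in for a query by sleeping."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds

    def _run(self) -> None:
        time.sleep(self.seconds)

    def __str__(self) -> str:
        return f"sleep {self.seconds}s"


store = SketchMemoryStore()
action = SleepAction(0.01)

# Pretend the action was scheduled 50 ms ago and only now gets to run.
scheduled = time.time() - 0.05
action.run(scheduled, store)

recorded = store.data[str(action)][0]
```

In this sketch the recorded duration is at least 50 ms even though the action itself takes about 10 ms, since the simulated wait before execution is included.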
The StandaloneQuery and ReuseConnQuery classes accept a strict_serializable parameter that selects the transaction isolation level: STRICT SERIALIZABLE by default, or SERIALIZABLE when the parameter is set to False.
The execute_query() helper function handles deadlock detection with automatic retry and real-time recency (RTR) timeout tolerance.
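The retry behavior can be sketched in isolation. This is not the body of execute_query(): the exception class `DeadlockDetected`, the function `execute_with_retry`, and the `max_attempts` and `backoff_s` parameters are all hypothetical, and the RTR timeout handling is not modeled because this page does not describe it in enough detail.

```python
import time


class DeadlockDetected(Exception):
    """Hypothetical stand-in for the database error raised on a deadlock."""


def execute_with_retry(run_query, max_attempts: int = 5, backoff_s: float = 0.0):
    """Call run_query(), retrying when it reports a deadlock.

    The attempt limit and backoff are assumptions made for this sketch.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return run_query()
        except DeadlockDetected:
            if attempt == max_attempts:
                raise  # give up and surface the last deadlock
            time.sleep(backoff_s)


# A fake query that deadlocks twice before succeeding.
attempts = {"count": 0}


def flaky_query():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise DeadlockDetected("deadlock detected")
    return [(1,)]


rows = execute_with_retry(flaky_query)
```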
Load Distributions
The framework supports two load-generation patterns through the Distribution base class:
Periodic -- Generates timestamps at a fixed rate of per_second invocations per second, spread evenly within each second. The rate can be overridden at runtime via State.periodic_dists.
    class Periodic(Distribution):
        def generate(self, duration, action_name, state):
            per_second = state.periodic_dists.get(action_name) or self.per_second
            next_time = time.time()
            for i in range(int(duration * per_second)):
                yield next_time
                next_time += 1 / per_second
                sleep_until(next_time)
Gaussian -- Generates inter-arrival times drawn from a Gaussian distribution with configurable mean and stddev, running until the duration expires.
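A Gaussian arrival schedule can be sketched as a generator. The version below is an illustration only: it computes the timestamps without sleeping, takes an explicit start time and random number generator so the result is reproducible, and clamps negative samples to zero. The function name `gaussian_schedule` and the clamping are assumptions, not confirmed details of the framework's Gaussian class.

```python
import random
from typing import Iterator


def gaussian_schedule(
    start: float, duration: float, mean: float, stddev: float, rng: random.Random
) -> Iterator[float]:
    """Yield invocation timestamps whose gaps follow a Gaussian distribution.

    Stops once the next timestamp would fall at or after start + duration.
    """
    if mean <= 0:
        raise ValueError("mean must be positive so the schedule makes progress")
    end = start + duration
    next_time = start
    while next_time < end:
        yield next_time
        # Assumption: a negative sample is treated as "fire immediately".
        next_time += max(0.0, rng.gauss(mean, stddev))


# Roughly 10 invocations per second for 10 seconds, with some jitter.
times = list(gaussian_schedule(0.0, 10.0, mean=0.1, stddev=0.02, rng=random.Random(0)))
gaps = [later - earlier for earlier, later in zip(times, times[1:])]
```

Unlike Periodic, the number of invocations is not fixed in advance; it depends on the sampled gaps.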
Phase Actions
Load generation is wrapped in two PhaseAction strategies:
OpenLoop -- Pairs an Action with a Distribution. Submits jobs to a thread pool queue at the distribution-defined rate, regardless of whether previous jobs have completed. This models external load that does not back off.
ClosedLoop -- Executes an action in a tight loop on its own thread, starting the next invocation only after the previous one completes. This models clients that wait for responses.
Both support a report_regressions flag to control whether their measurements contribute to regression analysis.
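The difference between the two strategies can be sketched with a slow action. The example is an assumption-laden simplification: it uses concurrent.futures.ThreadPoolExecutor as a stand-in for the framework's own job queue, and the helper names `open_loop`, `closed_loop`, and `slow_action` are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_action() -> None:
    """Stand-in for a query that takes about 50 ms."""
    time.sleep(0.05)


def open_loop(action, per_second: float, duration: float, pool: ThreadPoolExecutor):
    """Submit the action at a fixed rate, never waiting for earlier calls."""
    futures = []
    next_time = time.time()
    for _ in range(int(duration * per_second)):
        futures.append(pool.submit(action))
        next_time += 1 / per_second
        time.sleep(max(0.0, next_time - time.time()))
    return futures


def closed_loop(action, duration: float) -> int:
    """Run the action back to back; each call starts after the previous one ends."""
    completed = 0
    end = time.time() + duration
    while time.time() < end:
        action()
        completed += 1
    return completed


# Open loop: 100 submissions per second for 0.2 s, independent of the 50 ms latency.
with ThreadPoolExecutor(max_workers=32) as pool:
    submitted = len(open_loop(slow_action, per_second=100, duration=0.2, pool=pool))

# Closed loop: throughput is bounded by latency, so only a handful complete in 0.2 s.
completed = closed_loop(slow_action, duration=0.2)
```

The open loop issues all 20 requests whether or not the system keeps up, while the closed loop's request count shrinks as latency grows. This is why open-loop load is suited to modeling external traffic and closed-loop load to modeling clients that wait for each response.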
Phases
A scenario is composed of an ordered list of Phase objects:
TdPhase -- Executes testdrive commands for setup (creating connections, sources, tables, materialized views). This is typically the first phase in any scenario.
LoadPhase -- Runs a set of PhaseAction instances concurrently for a specified duration (in seconds). Each phase action runs on its own thread, and the phase waits for all threads to join before proceeding.
    class LoadPhase(Phase):
        def run(self, c, jobs, conns, state):
            duration = state.load_phase_duration or self.duration
            threads = [
                threading.Thread(
                    target=phase_action.run,
                    args=(duration, jobs, conns, state),
                )
                for phase_action in self.phase_actions
            ]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
Scenario Base Class
The Scenario class orchestrates the full benchmark lifecycle:
| Method | Purpose |
|---|---|
| init() | Configures phases, thread pool size (default 5000), connection pool size, QoS guarantees, and regression thresholds |
| setup() | Creates the thread pool and connection pool |
| run() | Executes all phases in order |
| teardown() | Drains connection pool, signals threads to stop, and joins all threads |
Each scenario defines guarantees -- a dictionary mapping action names to minimum QPS and maximum p99 latency thresholds:
    self.init(
        phases=[...],
        guarantees={
            "SELECT * FROM kafka_mv (standalone)": {"qps": 15, "p99": 400},
        },
    )
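How such a guarantee might be evaluated can be sketched as follows. The sketch makes several assumptions that this page does not confirm: durations are recorded in seconds, the p99 threshold is in milliseconds, QPS is the number of measurements divided by the elapsed time, and the percentile uses the nearest-rank method. The function names `p99` and `check_guarantee` are hypothetical.

```python
def p99(durations: list[float]) -> float:
    """Nearest-rank 99th percentile of a non-empty list."""
    ordered = sorted(durations)
    rank = (99 * len(ordered) + 99) // 100  # integer form of ceil(0.99 * n)
    return ordered[rank - 1]


def check_guarantee(
    durations_s: list[float], elapsed_s: float, guarantee: dict[str, float]
) -> list[str]:
    """Return a list of human-readable violations; empty means the guarantee holds."""
    failures = []
    qps = len(durations_s) / elapsed_s
    p99_ms = p99(durations_s) * 1000  # assumption: thresholds are in milliseconds
    if "qps" in guarantee and qps < guarantee["qps"]:
        failures.append(f"qps {qps:.1f} is below the minimum {guarantee['qps']}")
    if "p99" in guarantee and p99_ms > guarantee["p99"]:
        failures.append(f"p99 {p99_ms:.0f} ms exceeds the maximum {guarantee['p99']} ms")
    return failures


guarantee = {"qps": 15, "p99": 400}

# 2000 queries of 50 ms each over a 120 s load phase: both thresholds hold.
healthy = check_guarantee([0.05] * 2000, 120.0, guarantee)

# Same throughput, but half of the queries take 500 ms: the p99 threshold fails.
slow_tail = check_guarantee([0.05] * 1000 + [0.5] * 1000, 120.0, guarantee)
```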
The disabled() decorator can mark a scenario class as not enabled, with an ignore_reason string.
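A class decorator of this kind can be sketched in a few lines. The attribute names `enabled` and `ignore_reason` follow the wording above, but how the real decorator stores them and how the runner filters scenarios are assumptions; the `Scenario` class here is a bare stand-in.

```python
class Scenario:
    """Bare stand-in for the framework's Scenario base class."""

    enabled = True
    ignore_reason = None


def disabled(ignore_reason: str):
    """Return a class decorator that marks a scenario as not enabled."""

    def decorator(cls):
        cls.enabled = False
        cls.ignore_reason = ignore_reason
        return cls

    return decorator


@disabled("hypothetical reason: results are too noisy")
class NoisyScenario(Scenario):
    pass


class StableScenario(Scenario):
    pass


# A runner could then skip disabled scenarios when collecting what to execute.
runnable = [cls.__name__ for cls in (NoisyScenario, StableScenario) if cls.enabled]
```

Setting the attributes on the decorated class rather than on the base class leaves other scenarios unaffected.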
Benchmark Scenarios
The scenarios.py module defines the following concrete scenarios:
| Scenario | Description | Key Workload |
|---|---|---|
| Kafka | Kafka source ingestion with concurrent reads | 1 msg/s ingestion + 10 closed-loop SELECT threads |
| CreateKafkaSink | Kafka sink creation under load | Continuous CREATE SINK in a closed loop over 900s |
| PgReadReplica | Postgres CDC replication with reads | 100 inserts/s into Postgres + 10 SELECT threads on materialized view |
| PgReadReplicaRTR | Postgres replication with real-time recency queries | Similar to PgReadReplica but with strict serializable reads |
| MySQLReadReplica | MySQL CDC replication with reads | 100 inserts/s into MySQL + 10 SELECT threads |
| OpenIndexedSelects | Indexed SELECT performance under open-loop load | 500 standalone SELECT/s against indexed views |
| ConnectRead | Connection establishment overhead | Standalone queries measuring connect + query latency |
| FlagUpdate | System flag update impact | Periodic ALTER SYSTEM SET during concurrent reads |
| Read | Pure read throughput | 10 closed-loop standalone SELECT threads |
| PoolRead | Connection-pooled read throughput | 10 closed-loop pooled SELECT threads |
| StatementLogging | Impact of statement logging on performance | Enables statement logging then runs read workload |
| InsertWhereNotExists | INSERT WHERE NOT EXISTS pattern | Closed-loop INSERT ... WHERE NOT EXISTS queries |
| InsertsSelects | Mixed read/write workload | Concurrent INSERT and SELECT operations |
| CommandQueryResponsibilitySegregation | CQRS pattern benchmark | Writes to one cluster, reads from materialized views on another |
| OperationalDataStore | Operational data store pattern | Postgres CDC + Kafka ingestion + concurrent analytical queries |
| OperationalDataMesh | Data mesh pattern | Multiple Postgres sources feeding materialized views with concurrent reads |
| ReadReplicaBenchmark | Read replica pattern at scale | Postgres replication with large-scale concurrent reads |
| StagingBench | Staging environment benchmark | Performance testing for staging deployments |
Scenario Structure Pattern
Most scenarios follow a common structure:
- A TdPhase that creates connections, sources, tables, and materialized views using testdrive syntax.
- A LoadPhase combining OpenLoop actions for data ingestion with ClosedLoop actions for query workload.
- A guarantees dictionary specifying minimum QPS and maximum p99 latency for key queries.
Example scenario structure (Kafka):
    class Kafka(Scenario):
        def __init__(self, c, conn_infos):
            self.init(
                [
                    TdPhase("""
                        $ kafka-create-topic topic=kafka
                        $ kafka-ingest format=avro topic=kafka ...
                        > CREATE SOURCE kafka FROM KAFKA CONNECTION kafka_conn ...
                        > CREATE TABLE kafka_tbl FROM SOURCE kafka ...
                        > CREATE MATERIALIZED VIEW kafka_mv AS SELECT * FROM kafka_tbl;
                        > CREATE DEFAULT INDEX ON kafka_mv;
                    """),
                    LoadPhase(
                        duration=120,
                        actions=[
                            OpenLoop(
                                action=TdAction("$ kafka-ingest ...", c),
                                dist=Periodic(per_second=1),
                            )
                        ] + [
                            ClosedLoop(
                                action=StandaloneQuery(
                                    "SELECT * FROM kafka_mv",
                                    conn_infos["materialized"],
                                    strict_serializable=False,
                                ),
                            )
                            for i in range(10)
                        ],
                    ),
                ],
                guarantees={
                    "SELECT * FROM kafka_mv (standalone)": {"qps": 15, "p99": 400},
                },
            )
Thread Pool Architecture
The framework uses a job-queue-based thread pool. During setup(), up to 5000 worker threads are spawned, each running the run_job() function that pulls callables from a shared queue.Queue:
    def run_job(jobs: queue.Queue) -> None:
        while True:
            job = jobs.get()
            try:
                if not job:
                    return
                job()
            finally:
                jobs.task_done()
OpenLoop actions submit their work to this queue. ClosedLoop actions run directly on their own thread. During teardown, None sentinels are pushed for each thread to signal termination.
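The full lifecycle of such a pool, from spawning workers to sentinel-based shutdown, can be sketched as below. The pool size of 4, the squaring jobs, and the helper `make_job` are illustrative choices for this sketch; only run_job() mirrors the function shown above.

```python
import queue
import threading


def run_job(jobs: queue.Queue) -> None:
    """Worker loop: execute callables until a None sentinel arrives."""
    while True:
        job = jobs.get()
        try:
            if not job:
                return
            job()
        finally:
            jobs.task_done()


# Setup: spawn a small pool (the framework's default is far larger).
jobs: queue.Queue = queue.Queue()
workers = [threading.Thread(target=run_job, args=(jobs,)) for _ in range(4)]
for worker in workers:
    worker.start()

results: list[int] = []
results_lock = threading.Lock()


def make_job(i: int):
    """Build a callable that records i squared."""

    def job() -> None:
        with results_lock:
            results.append(i * i)

    return job


# Submit work the way an open-loop action would: enqueue and move on.
for i in range(10):
    jobs.put(make_job(i))

# Wait for the queue to drain, then send one sentinel per worker and join.
jobs.join()
for _ in workers:
    jobs.put(None)
for worker in workers:
    worker.join()
```

One sentinel per worker is needed because each worker consumes exactly one None before returning.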
File Locations
| File | Path (relative to repository root) |
|---|---|
| framework.py | misc/python/materialize/parallel_benchmark/framework.py |
| scenarios.py | misc/python/materialize/parallel_benchmark/scenarios.py |