Implementation:MaterializeInc Materialize Scalability Benchmark Framework
Overview
The Scalability Benchmark Framework is a Python-based performance testing module within the Materialize repository. It measures transactions per second (TPS) across increasing concurrency levels for a variety of SQL workloads, compares results against a baseline endpoint, and detects performance regressions. The framework lives under misc/python/materialize/scalability/ and is organized into clearly separated sub-packages for execution, endpoints, operations, workloads, configuration, schema setup, DataFrame wrappers, result analysis, and plotting.
BenchmarkExecutor
Source: misc/python/materialize/scalability/executor/benchmark_executor.py
The BenchmarkExecutor class is the central orchestrator of benchmark runs. It accepts a BenchmarkConfiguration, a Schema, a baseline Endpoint (optional), a list of other endpoints to test, and a ResultAnalyzer.
Execution Flow
- The executor iterates over each workload class from the configuration.
- For each workload, it runs the workload against the baseline endpoint first (if one is provided), then against every other endpoint.
- After each non-baseline run, it invokes the ResultAnalyzer to compare TPS against the baseline.
- If a regression is detected, the executor retries the workload up to MAX_RETRIES_ON_REGRESSION = 2 additional times before accepting the result.
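The retry behaviour in the last step can be sketched as follows. This is a minimal illustration under stated assumptions, not the executor's actual code: `run_once` and `has_regression` are hypothetical callables standing in for a single workload run and for the ResultAnalyzer comparison against the baseline.

```python
from typing import Callable, TypeVar

R = TypeVar("R")

MAX_RETRIES_ON_REGRESSION = 2  # value taken from the description above


def run_with_retries(
    run_once: Callable[[], R],
    has_regression: Callable[[R], bool],
) -> tuple[R, int]:
    """Run a workload, retrying while a regression is reported.

    Returns the accepted result and the number of attempts made.
    Both callables are hypothetical stand-ins for illustration.
    """
    attempts = 0
    while True:
        result = run_once()
        attempts += 1
        retries_used = attempts - 1
        # Accept the result if it is clean or the retry budget is spent.
        if not has_regression(result) or retries_used >= MAX_RETRIES_ON_REGRESSION:
            return result, attempts
```

With a budget of two retries, a workload that keeps regressing is executed at most three times before its last result is accepted.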
Concurrency Scheduling
Concurrency levels are generated exponentially using the formula:
concurrencies = [round(exponent_base ** c) for c in range(0, range_end)]
The resulting values are deduplicated, sorted, and filtered to fall within [min_concurrency, max_concurrency]. For each concurrency level, the operation count is scaled by the formula floor(count * sqrt(concurrency)) as defined in BenchmarkConfiguration.
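A small sketch of the scheduling formula may help. The source does not say how `range_end` is chosen, so the bound below is an assumption: it is simply large enough for the powers to reach `max_concurrency`. The function name is hypothetical.

```python
import math


def generate_concurrencies(
    exponent_base: float, min_concurrency: int, max_concurrency: int
) -> list[int]:
    """Exponentially spaced concurrency levels within [min, max]."""
    if exponent_base <= 1:
        raise ValueError("exponent_base must be greater than 1")
    # Assumed bound: enough exponents to cover max_concurrency.
    range_end = math.ceil(math.log(max_concurrency, exponent_base)) + 1
    concurrencies = [round(exponent_base**c) for c in range(0, range_end)]
    # Rounding can yield duplicates for small bases, so deduplicate and sort.
    unique_sorted = sorted(set(concurrencies))
    return [c for c in unique_sorted if min_concurrency <= c <= max_concurrency]


print(generate_concurrencies(2.0, 1, 256))
# [1, 2, 4, 8, 16, 32, 64, 128, 256]
```

A base below 2 produces denser levels at the low end; rounding then yields repeated values, which is why the deduplication step matters.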
Thread Pool and Cursor Pool
For each concurrency level, the executor:
- Initializes the schema via Schema.init_sqls().
- Runs any workload-specific init_operations().
- Creates a cursor pool with one psycopg cursor per concurrent thread.
- Uses a ThreadPoolExecutor to dispatch operations across workers. Each worker receives a unique worker_id assigned during thread initialization via a lock-guarded counter.
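One way to realise the lock-guarded worker_id assignment is the `initializer` hook of `ThreadPoolExecutor` combined with thread-local storage. The sketch below is an assumption about the mechanism, not a copy of the executor: `WorkerIds`, `dispatch`, and the string placeholders used instead of real psycopg cursors are all hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class WorkerIds:
    """Hands out a unique worker_id to each pool thread (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._next_id = 0
        self._local = threading.local()

    def init_thread(self) -> None:
        # Runs once in every worker thread when the pool starts it.
        with self._lock:
            self._local.worker_id = self._next_id
            self._next_id += 1

    def current(self) -> int:
        return self._local.worker_id


def dispatch(concurrency: int, count: int) -> list[tuple[int, int]]:
    ids = WorkerIds()
    # Placeholders for real cursors: one per concurrent worker.
    cursor_pool = [f"cursor-{i}" for i in range(concurrency)]

    def run_operation(transaction_index: int) -> tuple[int, int]:
        worker_id = ids.current()
        _cursor = cursor_pool[worker_id]  # each worker uses its own cursor
        return worker_id, transaction_index

    with ThreadPoolExecutor(
        max_workers=concurrency, initializer=ids.init_thread
    ) as pool:
        return list(pool.map(run_operation, range(count)))
```

Because the pool never creates more than `max_workers` threads, the assigned ids stay within the bounds of the cursor pool.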
Measurement Collection
Each individual operation execution is timed via wall-clock measurement. The executor collects:
- Per-operation detail records: concurrency, wallclock duration, operation class name, workload name, and transaction index.
- Per-concurrency totals: concurrency, total wallclock, workload name, count, TPS, mean/median/min/max transaction duration.
These are stored as DfDetails and DfTotals DataFrames and written to CSV files under an endpoint-specific directory.
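The measurements above can be illustrated with a short sketch. The field names and the TPS definition (count divided by total wall-clock time) are assumptions made for illustration; the real DfDetails and DfTotals column names may differ.

```python
import statistics
import time
from typing import Callable


def time_operation(operation: Callable[[], None]) -> float:
    """Return the wall-clock duration of one operation in seconds."""
    start = time.perf_counter()
    operation()
    return time.perf_counter() - start


def summarize(
    workload: str, concurrency: int, durations: list[float], total_wallclock: float
) -> dict:
    """Build one per-concurrency totals record (field names are illustrative)."""
    count = len(durations)
    return {
        "concurrency": concurrency,
        "wallclock": total_wallclock,
        "workload": workload,
        "count": count,
        "tps": count / total_wallclock,  # assumed definition of TPS
        "mean_duration": statistics.mean(durations),
        "median_duration": statistics.median(durations),
        "min_duration": min(durations),
        "max_duration": max(durations),
    }
```

Note that with several workers running in parallel, the total wall-clock time is shorter than the sum of the individual durations, which is why both are recorded.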
class BenchmarkExecutor:
    def __init__(
        self,
        config: BenchmarkConfiguration,
        schema: Schema,
        baseline_endpoint: Endpoint | None,
        other_endpoints: list[Endpoint],
        result_analyzer: ResultAnalyzer,
    ):
        ...

    def run_workloads(self) -> BenchmarkResult:
        for workload_cls in self.config.workload_classes:
            self.run_workload_for_all_endpoints(workload_cls)
        return self.result
Endpoint Abstract Class and Implementations
Source: misc/python/materialize/scalability/endpoint/endpoint.py, misc/python/materialize/scalability/endpoint/endpoints.py
Endpoint Base Class
The Endpoint class is the abstract base for all benchmark targets. It defines the interface for connecting to a SQL database and querying its version.
class Endpoint:
    def sql_connection(self, quiet=False, kind=ConnectionKind.Plain):
        conn = psycopg.connect(self.url(kind))
        conn.autocommit = True
        return conn

    def url(self, kind=ConnectionKind.Plain) -> str:
        # port is resolved from kind via the matching port method (selection elided in this excerpt)
        return f"postgresql://{self.user()}:{self.password()}@{self.host()}:{port}/{self.database()}"
Key abstract methods that subclasses must implement: host(), user(), password(), database(), port(), and up().
The ConnectionKind enum supports Plain, Password, and Sasl connection types, each using a different port method.
The try_load_version() method queries SELECT mz_version() from the endpoint and caches the result. If the query fails, it returns "unknown".
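The cache-with-fallback behaviour can be sketched in isolation. `VersionCache` and the injected `query_version` callable are hypothetical; in the framework the query is `SELECT mz_version()` issued over a SQL connection. Whether a failed lookup is itself cached is not stated in the source, so this sketch assumes it is not.

```python
from typing import Callable, Optional


class VersionCache:
    """Caches a successfully loaded version string (hypothetical helper)."""

    def __init__(self, query_version: Callable[[], str]) -> None:
        self._query_version = query_version
        self._version: Optional[str] = None

    def try_load_version(self) -> str:
        if self._version is None:
            try:
                self._version = self._query_version()
            except Exception:
                # Assumption: failures are reported but not cached.
                return "unknown"
        return self._version
```

Caching keeps repeated version lookups from adding queries to an endpoint that is being benchmarked.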
Concrete Endpoint Implementations
| Class | Description |
|---|---|
| MaterializeRemote | Connects to a remote Materialize instance via a user-provided psql URL. The up() method is a no-op. |
| PostgresContainer | Starts a PostgreSQL container via the mzcompose composition framework. Returns "postgres" as its version name. |
| MaterializeNonRemote | Abstract base for locally-launched Materialize instances. Includes a lift_limits() method that sets max_connections, max_tables, and max_materialized_views to 65535 via an internal system connection. |
Operation Base Class and Concrete Operations
Source: misc/python/materialize/scalability/operation/scalability_operation.py, misc/python/materialize/scalability/operation/operations/operations.py
Operation Hierarchy
The Operation base class defines the interface for a single executable step within a workload. It uses a data-passing pattern via OperationData (a dictionary-like object carrying a cursor, worker ID, and arbitrary key-value pairs).
class Operation:
    def required_keys(self) -> set[str]: ...

    def produced_keys(self) -> set[str]: ...

    def execute(self, data: OperationData) -> OperationData:
        data.validate_requirements(self.required_keys(), ...)
        data = self._execute(data)
        data.validate_requirements(self.produced_keys(), ...)
        return data
The class hierarchy provides progressively specialized SQL operation types:
| Class | Description |
|---|---|
| SqlOperationWithInput | Executes a SQL statement derived from input data. Requires a cursor key. Calls fetchall() after execution. |
| SqlOperationWithSeed | Extends SqlOperationWithInput to accept a single seed key for parameterized SQL. |
| SqlOperationWithTwoSeeds | Accepts two seed keys for SQL statements requiring two parameters. |
| SimpleSqlOperation | Requires no input keys beyond the cursor; executes a fixed SQL string. |
| OperationChainWithDataExchange | Chains multiple Operation instances sequentially, passing OperationData between them. |
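The key-contract and chaining ideas can be shown with a self-contained sketch. Everything below is a simplified stand-in written for illustration: the `push`/`get` accessors, the error type, and the two example operations (`ProduceSeed`, `BuildCreateTableSql`) are hypothetical and do not mirror the real classes.

```python
class OperationData:
    """Dictionary-like carrier for values passed between operations."""

    def __init__(self, cursor, worker_id: int) -> None:
        self._values = {"cursor": cursor, "worker_id": worker_id}

    def push(self, key: str, value) -> None:
        self._values[key] = value

    def get(self, key: str):
        return self._values[key]

    def validate_requirements(self, keys: set[str], context: str) -> None:
        missing = keys - set(self._values)
        if missing:
            raise RuntimeError(f"Missing keys for {context}: {sorted(missing)}")


class Operation:
    def required_keys(self) -> set[str]:
        return set()

    def produced_keys(self) -> set[str]:
        return set()

    def execute(self, data: OperationData) -> OperationData:
        name = type(self).__name__
        data.validate_requirements(self.required_keys(), f"input of {name}")
        data = self._execute(data)
        data.validate_requirements(self.produced_keys(), f"output of {name}")
        return data

    def _execute(self, data: OperationData) -> OperationData:
        raise NotImplementedError


class ProduceSeed(Operation):  # hypothetical example operation
    def produced_keys(self) -> set[str]:
        return {"seed"}

    def _execute(self, data: OperationData) -> OperationData:
        data.push("seed", f"w{data.get('worker_id')}")
        return data


class BuildCreateTableSql(Operation):  # hypothetical example operation
    def required_keys(self) -> set[str]:
        return {"seed"}

    def produced_keys(self) -> set[str]:
        return {"sql"}

    def _execute(self, data: OperationData) -> OperationData:
        data.push("sql", f"CREATE TABLE x_{data.get('seed')} (f1 INT);")
        return data


class OperationChain(Operation):
    """Runs operations in order, handing each one the previous output."""

    def __init__(self, operations: list[Operation]) -> None:
        self.operations = operations

    def _execute(self, data: OperationData) -> OperationData:
        for operation in self.operations:
            data = operation.execute(data)
        return data
```

Declaring required and produced keys lets a misordered chain fail immediately with a clear message instead of surfacing later as a missing-key error inside SQL generation.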
Concrete SQL Operations
The operations/operations.py module defines specific SQL operations:
| Operation | SQL Statement |
|---|---|
| InsertDefaultValues | INSERT INTO t1 DEFAULT VALUES; |
| SelectOne | SELECT 1; |
| SelectStar | SELECT * FROM t1; |
| SelectLimit | SELECT * FROM t1 LIMIT 1; |
| SelectCount | SELECT COUNT(*) FROM t1; |
| SelectCountInMv | SELECT count FROM mv1; |
| SelectUnionAll | SELECT * FROM t1 UNION ALL SELECT * FROM t1; |
| Update | UPDATE t1 SET f1 = f1 + 1; |
| CreateTableX | CREATE TABLE x_{seed} (f1 INT, f2 INT, f3 INT, f4 INT, f5 INT); |
| CreateIndexOnTableX | CREATE INDEX i_x_{seed} ON x_{seed} (f1); |
| CreateMvOnTableX | CREATE MATERIALIZED VIEW mv_x_{seed} AS SELECT * FROM x_{seed}; |
| CreateViewXOnSeries | Creates a view (materialized or plain) over generate_series(1, 100). |
Workload Base Classes, Markers, and Concrete Workloads
Source: misc/python/materialize/scalability/workload/workload.py, misc/python/materialize/scalability/workload/workload_markers.py, misc/python/materialize/scalability/workload/workloads/
Workload and WorkloadWithContext
The Workload class is the abstract base for all benchmark workloads:
class Workload:
    def init_operations(self) -> list[Operation]:
        return []

    def operations(self) -> list[Operation]:
        raise NotImplementedError

    def execute_operation(self, operation, cursor, worker_id, transaction_index, verbose):
        data = OperationData(cursor, worker_id)
        self.amend_data_before_execution(data)
        operation.execute(data)

    def name(self) -> str:
        return self.__class__.__name__

    def version(self) -> WorkloadVersion:
        return WorkloadVersion.create(1, 0, 0)
WorkloadWithContext extends Workload to carry references to the current Endpoint and Schema, which are injected by the executor before the run.
Workload Markers
Workload markers are abstract grouping classes used to categorize workloads:
| Marker Class | Group Name |
|---|---|
| DmlDqlWorkload | DML & DQL |
| DdlWorkload | DDL |
| ConnectionWorkload | Connection |
| SelfTestWorkload | Self-Test |
DML/DQL Workloads
Source: misc/python/materialize/scalability/workload/workloads/dml_dql_workloads.py
These workloads extend DmlDqlWorkload and exercise data manipulation and query statements:
- InsertWorkload -- Repeatedly inserts default values into t1.
- SelectOneWorkload -- Repeatedly executes SELECT 1;.
- SelectStarWorkload -- Repeatedly runs SELECT * FROM t1;.
- SelectLimitWorkload -- Runs SELECT * FROM t1 LIMIT 1;.
- SelectCountWorkload -- Runs SELECT COUNT(*) FROM t1;.
- SelectUnionAllWorkload -- Runs a UNION ALL of t1 with itself.
- InsertAndSelectCountInMvWorkload -- Alternates between inserting and reading count from a materialized view.
- InsertAndSelectLimitWorkload -- Alternates between inserting and reading with LIMIT 1.
- UpdateWorkload -- Repeatedly runs UPDATE t1 SET f1 = f1 + 1;.
DDL Workloads
Source: misc/python/materialize/scalability/workload/workloads/ddl_workloads.py
These workloads extend DdlWorkload and exercise schema operations:
- CreateAndDropTableWorkload -- Creates a table, populates it, queries it, and drops it, all chained as an OperationChainWithDataExchange.
- CreateAndDropTableWithMvWorkload -- Same as above but also creates and queries a materialized view before dropping.
- CreateAndReplaceViewWorkload -- Creates five views on generate_series, creates a merged view across them, then drops all six views.
Connection Workloads
Source: misc/python/materialize/scalability/workload/workloads/connection_workloads.py
These workloads extend both WorkloadWithContext and ConnectionWorkload:
- EstablishConnectionWorkload -- Chains Connect(), SelectOne(), Disconnect().
- EstablishPasswordConnectionWorkload -- Same chain using password-authenticated connections.
- EstablishSaslConnectionWorkload -- Same chain using SASL-authenticated connections.
Each workload pushes the endpoint and schema into the OperationData and removes the pre-existing cursor, since the connection itself is being benchmarked.
Self-Test Workloads
Source: misc/python/materialize/scalability/workload/workloads/self_test_workloads.py
These workloads extend SelfTestWorkload and are used for framework validation:
- EmptyOperatorWorkload -- Executes an empty no-op operation.
- EmptySqlStatementWorkload -- Executes an empty SQL statement.
- Sleep10MsInEnvironmentdWorkload -- Sleeps 10ms in the environmentd process.
- Sleep10MsInClusterdWorkload -- Sleeps 10ms in the clusterd process.
- Sleep10MsInPythonWorkload -- Sleeps 10ms in the Python client.
BenchmarkConfiguration
Source: misc/python/materialize/scalability/config/benchmark_config.py
BenchmarkConfiguration is a dataclass holding all parameters for a benchmark run:
from dataclasses import dataclass
from math import floor, sqrt


@dataclass
class BenchmarkConfiguration:
    workload_classes: list[type[Workload]]
    exponent_base: float
    min_concurrency: int
    max_concurrency: int
    count: int
    verbose: bool

    def get_count_for_concurrency(self, concurrency: int) -> int:
        # Total operations grow with the square root of the concurrency level.
        return floor(self.count * sqrt(concurrency))
The get_count_for_concurrency method scales the number of operations proportionally to the square root of concurrency. This ensures that higher concurrency levels execute more total operations, producing stable TPS measurements even when individual operations complete quickly.
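A worked example makes the scaling concrete. The base count of 1000 is an arbitrary illustrative value, and `scaling_table` is a hypothetical helper that applies the same formula outside the dataclass.

```python
from math import floor, sqrt


def get_count_for_concurrency(count: int, concurrency: int) -> int:
    return floor(count * sqrt(concurrency))


def scaling_table(count: int, concurrencies: list[int]) -> list[tuple[int, int, float]]:
    """Rows of (concurrency, total operations, operations per worker)."""
    rows = []
    for concurrency in concurrencies:
        total = get_count_for_concurrency(count, concurrency)
        rows.append((concurrency, total, total / concurrency))
    return rows


for row in scaling_table(1000, [1, 4, 16, 64, 256]):
    print(row)
# (1, 1000, 1000.0)
# (4, 2000, 500.0)
# (16, 4000, 250.0)
# (64, 8000, 125.0)
# (256, 16000, 62.5)
```

The total grows with concurrency while the share per worker shrinks by the same square-root factor, so run time stays bounded at high concurrency without starving any level of samples.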
Schema Configuration
Source: misc/python/materialize/scalability/schema/schema.py
The Schema class manages database schema setup for benchmark runs. It supports configurable:
- Source type -- Currently only Source.TABLE.
- Schema name -- Defaults to "scalability".
- Index creation -- Optionally creates indexes on tables and materialized views and waits for them to become queryable.
- Transaction isolation -- Supports SERIALIZABLE and STRICT SERIALIZABLE.
- Cluster name -- Optionally targets a specific cluster.
- Object count -- Number of tables (and corresponding MVs and indexes) to create.
The init_sqls() method generates the full initialization sequence:
def init_sqls(self) -> list[str]:
    init_sqls = self.connect_sqls() + [
        f"DROP SCHEMA IF EXISTS {self.schema} CASCADE;",
        f"CREATE SCHEMA {self.schema};",
        "DROP TABLE IF EXISTS t1;",
    ]
    # Creates tables, inserts default values, creates MVs, and optionally indexes
    ...
The connect_sqls() method generates per-connection setup statements including SET SCHEMA, SET CLUSTER, and SET transaction_isolation.
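As a rough sketch of what such per-connection setup could look like: the function below is a hypothetical free-standing version, and the exact statement text emitted by the real Schema class is an assumption, including the choice to skip the cluster and isolation settings when they are not configured.

```python
from typing import Optional


def connect_sqls(
    schema: str = "scalability",
    cluster_name: Optional[str] = None,
    transaction_isolation: Optional[str] = None,
) -> list[str]:
    """Statements to run on every new connection (illustrative only)."""
    sqls = [f"SET SCHEMA = {schema};"]
    if cluster_name is not None:
        sqls.append(f"SET CLUSTER = {cluster_name};")
    if transaction_isolation is not None:
        sqls.append(f"SET transaction_isolation = '{transaction_isolation}';")
    return sqls
```

Because these are session settings, they have to be replayed on each cursor in the pool rather than once per benchmark run.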
DataFrame Wrappers for Results
Source: misc/python/materialize/scalability/df/df_totals.py
The framework uses typed pandas DataFrame wrappers to provide a structured interface over raw benchmark data.
DfTotals
Wraps the per-concurrency summary DataFrame with columns for concurrency, wallclock, workload name, count, TPS, and mean/median/min/max transaction duration.
Key methods:
- get_max_concurrency() -- Returns the highest concurrency level measured.
- get_concurrency_values() -- Returns all concurrency values as a list.
- get_tps_values() -- Returns all TPS values as a list.
- merge(other) -- Joins two DfTotals on count, concurrency, and workload columns, producing a DfTotalsMerged with TPS_BASELINE and TPS_OTHER columns for comparison.
DfTotalsMerged
An intermediate representation produced by merging two endpoint result DataFrames. It is further converted into a DfTotalsExtended (enriched with percentage differences) for use in regression detection.
concat_df_totals
A utility function that concatenates multiple DfTotals instances into one.
Result Analysis: Regression Detection and TPS Comparison
Source: misc/python/materialize/scalability/result/result_analyzers.py, misc/python/materialize/scalability/result/regression_assessment.py, misc/python/materialize/scalability/result/scalability_result.py
DefaultResultAnalyzer
The DefaultResultAnalyzer implements the ResultAnalyzer interface and performs TPS comparison between a baseline and a test endpoint:
class DefaultResultAnalyzer(ResultAnalyzer):
    def __init__(self, max_deviation_as_percent_decimal: float):
        self.max_deviation_as_percent_decimal = max_deviation_as_percent_decimal

    def perform_comparison_in_workload(self, workload_name, baseline_endpoint,
                                       other_endpoint, baseline_result, other_result):
        merged_data = baseline_result.df_totals.merge(other_result.df_totals)
        tps_per_endpoint_data = merged_data.to_enriched_result_frame(...)
        entries_worse = tps_per_endpoint_data.to_filtered_with_threshold(
            self.max_deviation_as_percent_decimal, match_results_better_than_baseline=False
        )
        entries_better = tps_per_endpoint_data.to_filtered_with_threshold(
            self.max_deviation_as_percent_decimal, match_results_better_than_baseline=True
        )
        ...
The comparison pipeline works as follows:
- The baseline and test DfTotals are merged on matching concurrency levels and workload names.
- The merged frame is enriched with percentage differences.
- Entries exceeding the max_deviation_as_percent_decimal threshold in the negative direction become Regression objects.
- Entries exceeding the threshold in the positive direction become ScalabilityImprovement objects.
- Both are wrapped in a ComparisonOutcome which tracks regressions, improvements, and the set of endpoints with regressions.
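The pipeline above can be approximated without pandas. This sketch rests on several assumptions: rows are matched with inner-join semantics, the percentage difference is taken relative to the baseline TPS, and `TotalsRow`, `Change`, and `compare` are hypothetical names rather than framework classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TotalsRow:
    workload: str
    concurrency: int
    count: int
    tps: float


@dataclass(frozen=True)
class Change:
    workload: str
    concurrency: int
    tps_baseline: float
    tps_other: float
    tps_diff_percent: float  # as a decimal, e.g. -0.25 for -25 %


def compare(
    baseline: list[TotalsRow],
    other: list[TotalsRow],
    max_deviation_as_percent_decimal: float,
) -> tuple[list[Change], list[Change]]:
    """Return (regressions, improvements) beyond the allowed deviation."""
    baseline_by_key = {(r.workload, r.concurrency, r.count): r for r in baseline}
    regressions: list[Change] = []
    improvements: list[Change] = []
    for row in other:
        base = baseline_by_key.get((row.workload, row.concurrency, row.count))
        if base is None:
            continue  # assumed inner join: unmatched rows are dropped
        diff = (row.tps - base.tps) / base.tps  # assumed: relative to baseline
        change = Change(row.workload, row.concurrency, base.tps, row.tps, diff)
        if diff < -max_deviation_as_percent_decimal:
            regressions.append(change)
        elif diff > max_deviation_as_percent_decimal:
            improvements.append(change)
    return regressions, improvements
```

Changes that stay within the threshold in either direction are treated as noise and produce neither a regression nor an improvement.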
ScalabilityChange, Regression, and ScalabilityImprovement
Source: misc/python/materialize/scalability/result/scalability_change.py
ScalabilityChange is the base class for detected performance changes. Each instance records:
- Workload name and concurrency level
- Operation count
- TPS of the test endpoint and the baseline
- Absolute and percentage TPS difference
- The endpoint where the change was detected
Regression validates that tps_diff < 0 (performance degradation). ScalabilityImprovement validates that tps_diff > 0.
ComparisonOutcome
Source: misc/python/materialize/scalability/result/comparison_outcome.py
Aggregates all regressions and improvements from a comparison. Supports merging outcomes across multiple workloads and filtering regressions by endpoint.
RegressionAssessment
Source: misc/python/materialize/scalability/result/regression_assessment.py
RegressionAssessment determines whether detected regressions are justified (i.e., known and accepted) or unjustified (new and unexpected). It does this by:
- Checking whether both the baseline and the test endpoint reference release versions.
- Looking up known regression commits between the two versions via get_commits_of_accepted_regressions_between_versions().
- If matching commits exist, the regression is marked as justified; otherwise it is unjustified.
Only unjustified regressions are reported as test failures via to_failure_details().
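The justification rule can be sketched as a single predicate. This is an illustration under assumptions: `is_regression_justified` is a hypothetical name, `None` models an endpoint that does not reference a release version, such endpoints are assumed to yield an unjustified regression, and the lookup is injected as a callable rather than calling the real helper.

```python
from typing import Callable, Optional


def is_regression_justified(
    baseline_version: Optional[str],
    other_version: Optional[str],
    accepted_commits_between: Callable[[str, str], list[str]],
) -> bool:
    """True if accepted regression commits lie between two release versions."""
    # Assumption: without two release versions nothing can be looked up.
    if baseline_version is None or other_version is None:
        return False
    commits = accepted_commits_between(baseline_version, other_version)
    return len(commits) > 0
```

Only regressions for which this predicate is false would then be surfaced as failures.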
BenchmarkResult
Source: misc/python/materialize/scalability/result/scalability_result.py
The BenchmarkResult dataclass is the top-level container for all benchmark data:
@dataclass
class BenchmarkResult:
    overall_comparison_outcome: ComparisonOutcome
    df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]]
    df_details_by_endpoint_name_and_workload: dict[str, dict[str, DfDetails]]
    workload_version_by_name: dict[str, WorkloadVersion]
    workload_group_by_name: dict[str, str]
It provides methods to:
- Add regression outcomes from individual workload comparisons.
- Record workload metadata (name, version, group).
- Append per-endpoint, per-workload results.
- Retrieve results grouped by endpoint or by workload.
Plotting Functions
Source: misc/python/materialize/scalability/plot/plot.py
The plotting module generates visualizations of benchmark results using matplotlib.
plot_tps_per_connections
Produces a scatter plot of TPS versus concurrent SQL connections for each endpoint:
def plot_tps_per_connections(
    workload_name, figure, df_totals_by_endpoint_name,
    baseline_version_name, include_zero_in_y_axis, ...
):
    for endpoint_version_name, df_totals in df_totals_by_endpoint_name.items():
        plot.scatter(
            df_totals.get_concurrency_values(),
            df_totals.get_tps_values(),
            marker=_get_plot_marker(endpoint_version_name, baseline_version_name),
        )
    plot.set_ylabel("Transactions Per Second (tps)")
    plot.set_xlabel("Concurrent SQL Connections")
The baseline endpoint uses a distinct marker style. The y-axis can optionally start at zero.
plot_duration_by_connections_for_workload
Plots the distribution of individual transaction durations per concurrency level. Supports both violin and box plot types via the DistributionPlotType enum (default: violin).
Module Structure Summary
| Sub-package | Key Classes | Responsibility |
|---|---|---|
| executor/ | BenchmarkExecutor | Orchestrates workload execution across endpoints and concurrency levels |
| endpoint/ | Endpoint, MaterializeRemote, PostgresContainer, MaterializeNonRemote | Abstracts database connection targets |
| operation/ | Operation, SimpleSqlOperation, SqlOperationWithSeed, OperationChainWithDataExchange | Defines individual SQL operations with data-passing contracts |
| workload/ | Workload, WorkloadWithContext, DmlDqlWorkload, DdlWorkload, ConnectionWorkload | Groups operations into named, categorized benchmark scenarios |
| config/ | BenchmarkConfiguration | Holds all tunable parameters for a benchmark run |
| schema/ | Schema | Manages database object initialization and per-connection setup |
| df/ | DfTotals, DfTotalsMerged, DfDetails | Typed pandas DataFrame wrappers for structured result access |
| result/ | DefaultResultAnalyzer, ComparisonOutcome, RegressionAssessment, BenchmarkResult | Regression detection, justification logic, and result aggregation |
| plot/ | plot_tps_per_connections, plot_duration_by_connections_for_workload | Visualization of TPS and duration distributions |