Implementation:MaterializeInc Materialize Scalability Benchmark Framework

From Leeroopedia
Revision as of 15:39, 16 February 2026 by Admin (auto-imported from implementations/MaterializeInc_Materialize_Scalability_Benchmark_Framework.md)

Overview

The Scalability Benchmark Framework is a Python-based performance testing module within the Materialize repository. It measures transactions per second (TPS) across increasing concurrency levels for a variety of SQL workloads, compares results against a baseline endpoint, and detects performance regressions. The framework lives under misc/python/materialize/scalability/ and is organized into clearly separated sub-packages for execution, endpoints, operations, workloads, configuration, schema setup, DataFrame wrappers, result analysis, and plotting.

BenchmarkExecutor

Source: misc/python/materialize/scalability/executor/benchmark_executor.py

The BenchmarkExecutor class is the central orchestrator of benchmark runs. It accepts a BenchmarkConfiguration, a Schema, an optional baseline Endpoint, a list of other endpoints to test, and a ResultAnalyzer.

Execution Flow

  1. The executor iterates over each workload class from the configuration.
  2. For each workload, it runs the workload against the baseline endpoint first (if one is provided), then against every other endpoint.
  3. After each non-baseline run, it invokes the ResultAnalyzer to compare TPS against the baseline.
  4. If a regression is detected, the executor reruns the workload up to MAX_RETRIES_ON_REGRESSION (2) additional times before accepting the result.
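
The retry behaviour in step 4 can be sketched as follows. This is a minimal illustration rather than the executor's actual code: run_once and has_regression are hypothetical callables standing in for a workload run and the ResultAnalyzer comparison. Only the constant MAX_RETRIES_ON_REGRESSION = 2 is taken from the description above.

```python
from typing import Callable

MAX_RETRIES_ON_REGRESSION = 2  # value quoted in the description above


def run_with_retries(
    run_once: Callable[[], float],
    has_regression: Callable[[float], bool],
) -> tuple[float, int]:
    """Run a workload, rerunning it while a regression is reported.

    Returns the last measured TPS and the number of runs performed.
    Both callables are hypothetical stand-ins used for illustration.
    """
    runs = 0
    while True:
        tps = run_once()
        runs += 1
        if not has_regression(tps):
            return tps, runs
        if runs > MAX_RETRIES_ON_REGRESSION:
            # Retries exhausted: accept the regressed result.
            return tps, runs


# Simulated measurements: two noisy runs followed by a healthy one.
measurements = iter([80.0, 85.0, 99.0])
tps, runs = run_with_retries(
    run_once=lambda: next(measurements),
    has_regression=lambda value: value < 90.0,
)
print(tps, runs)
```

With two retries allowed, a workload is executed at most three times against a given endpoint in this sketch.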

Concurrency Scheduling

Concurrency levels are generated exponentially using the formula:

concurrencies = [round(exponent_base ** c) for c in range(0, range_end)]

The resulting values are deduplicated, sorted, and filtered to fall within [min_concurrency, max_concurrency]. For each concurrency level, the number of operations to run is floor(count * sqrt(concurrency)), as computed by BenchmarkConfiguration.get_count_for_concurrency.
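
As a worked example of the schedule described above, the sketch below generates the levels for two exponent bases. How range_end is derived is not shown in this page, so the loop that picks it here is an assumption: it simply grows range_end until the rounded power exceeds max_concurrency.

```python
def concurrency_levels(
    exponent_base: float, min_concurrency: int, max_concurrency: int
) -> list[int]:
    """Exponentially spaced, deduplicated, sorted concurrency levels."""
    if exponent_base <= 1:
        raise ValueError("exponent_base must be greater than 1")

    # Assumption: range_end is just large enough to reach max_concurrency.
    range_end = 1
    while round(exponent_base ** range_end) <= max_concurrency:
        range_end += 1

    raw = [round(exponent_base ** c) for c in range(0, range_end)]
    unique_sorted = sorted(set(raw))  # rounding can yield duplicates
    return [c for c in unique_sorted if min_concurrency <= c <= max_concurrency]


print(concurrency_levels(2, 1, 256))   # [1, 2, 4, 8, 16, 32, 64, 128, 256]
print(concurrency_levels(1.5, 1, 10))  # [1, 2, 3, 5, 8]
```

The second call shows why deduplication is needed: with a base of 1.5, both 1.5 and 2.25 round to 2.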

Thread Pool and Cursor Pool

For each concurrency level, the executor:

  • Initializes the schema via Schema.init_sqls().
  • Runs any workload-specific init_operations().
  • Creates a cursor pool with one psycopg cursor per concurrent thread.
  • Uses a ThreadPoolExecutor to dispatch operations across workers. Each worker receives a unique worker_id assigned during thread initialization via a lock-guarded counter.
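
The lock-guarded worker ID assignment can be sketched with the standard library alone. The WorkerIds class, the string placeholders for cursors, and the idea of indexing the cursor pool by worker ID are assumptions made for illustration. The real executor uses psycopg cursors and may map them differently.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class WorkerIds:
    """Hands out consecutive IDs, one per pool thread, under a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._next_id = 0
        self._local = threading.local()

    def init_thread(self) -> None:
        # Called once in each new pool thread via the `initializer` hook.
        with self._lock:
            self._local.worker_id = self._next_id
            self._next_id += 1

    def current(self) -> int:
        return self._local.worker_id


concurrency = 4
worker_ids = WorkerIds()
# Placeholders for one database cursor per concurrent thread.
cursor_pool = [f"cursor-{i}" for i in range(concurrency)]


def run_operation(transaction_index: int) -> tuple[int, str]:
    worker_id = worker_ids.current()
    # Assumption: each worker only ever touches the cursor at its own index.
    return worker_id, cursor_pool[worker_id]


with ThreadPoolExecutor(
    max_workers=concurrency, initializer=worker_ids.init_thread
) as pool:
    results = list(pool.map(run_operation, range(100)))

used_ids = sorted({worker_id for worker_id, _ in results})
print(used_ids)
```

Because the pool starts threads lazily, fewer than four IDs may appear when operations finish very quickly. No two threads ever share an ID, though.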

Measurement Collection

Each individual operation execution is timed via wall-clock measurement. The executor collects:

  • Per-operation detail records: concurrency, wallclock duration, operation class name, workload name, and transaction index.
  • Per-concurrency totals: concurrency, total wallclock, workload name, count, TPS, mean/median/min/max transaction duration.

These are stored as DfDetails and DfTotals DataFrames and written to CSV files under an endpoint-specific directory.
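
The two record types can be illustrated with a small, single-threaded sketch that uses plain dictionaries instead of the DataFrame wrappers. The field names are illustrative. The TPS definition (operations divided by total wall-clock time) is an assumption, since this page does not spell out the formula.

```python
import statistics
import time
from typing import Callable


def measure(
    operation: Callable[[], None], count: int, concurrency: int, workload: str
) -> tuple[list[dict], dict]:
    """Time `count` sequential calls and summarize them."""
    details = []
    run_start = time.perf_counter()
    for transaction_index in range(count):
        op_start = time.perf_counter()
        operation()
        details.append({
            "concurrency": concurrency,
            "wallclock": time.perf_counter() - op_start,
            "operation": operation.__name__,
            "workload": workload,
            "transaction_index": transaction_index,
        })
    total_wallclock = time.perf_counter() - run_start

    durations = [row["wallclock"] for row in details]
    totals = {
        "concurrency": concurrency,
        "wallclock": total_wallclock,
        "workload": workload,
        "count": count,
        # Assumption: TPS = completed operations / total wall-clock seconds.
        "tps": count / total_wallclock,
        "mean": statistics.mean(durations),
        "median": statistics.median(durations),
        "min": min(durations),
        "max": max(durations),
    }
    return details, totals


def sleep_1ms() -> None:
    time.sleep(0.001)


details, totals = measure(sleep_1ms, count=20, concurrency=1, workload="SleepDemo")
print(f"{totals['tps']:.0f} tps over {totals['count']} operations")
```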

class BenchmarkExecutor:
    def __init__(
        self,
        config: BenchmarkConfiguration,
        schema: Schema,
        baseline_endpoint: Endpoint | None,
        other_endpoints: list[Endpoint],
        result_analyzer: ResultAnalyzer,
    ):
        ...

    def run_workloads(self) -> BenchmarkResult:
        for workload_cls in self.config.workload_classes:
            self.run_workload_for_all_endpoints(workload_cls)
        return self.result

Endpoint Abstract Class and Implementations

Source: misc/python/materialize/scalability/endpoint/endpoint.py, misc/python/materialize/scalability/endpoint/endpoints.py

Endpoint Base Class

The Endpoint class is the abstract base for all benchmark targets. It defines the interface for connecting to a SQL database and querying its version.

class Endpoint:
    def sql_connection(self, quiet=False, kind=ConnectionKind.Plain):
        conn = psycopg.connect(self.url(kind))
        conn.autocommit = True
        return conn

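    def url(self, kind=ConnectionKind.Plain) -> str:
        # Plain connections use port(); the Password and Sasl kinds select
        # their own port methods, which are omitted from this excerpt.
        port = self.port()
        return f"postgresql://{self.user()}:{self.password()}@{self.host()}:{port}/{self.database()}"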

Key abstract methods that subclasses must implement: host(), user(), password(), database(), port(), and up().

The ConnectionKind enum supports Plain, Password, and Sasl connection types, each using a different port method.

The try_load_version() method queries SELECT mz_version() from the endpoint and caches the result. If the query fails, it returns "unknown".
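
The caching and fallback behaviour can be sketched like this. VersionedEndpoint and its query_version callable are hypothetical stand-ins for an Endpoint and its SELECT mz_version() query. The choice not to cache a failed lookup is an assumption, not something this page confirms.

```python
from typing import Callable, Optional


class VersionedEndpoint:
    """Hypothetical stand-in for an Endpoint, showing only the version cache."""

    def __init__(self, query_version: Callable[[], str]) -> None:
        self._query_version = query_version  # may raise on connection errors
        self._version: Optional[str] = None

    def try_load_version(self) -> str:
        if self._version is None:
            try:
                self._version = self._query_version()
            except Exception:
                # Assumption: failures are not cached, so a later call retries.
                return "unknown"
        return self._version


calls = []


def fake_query() -> str:
    calls.append("query")
    return "v1.2.3"  # made-up version string for the demo


def failing_query() -> str:
    raise ConnectionError("endpoint not reachable")


endpoint = VersionedEndpoint(fake_query)
print(endpoint.try_load_version(), endpoint.try_load_version(), len(calls))
print(VersionedEndpoint(failing_query).try_load_version())
```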

Concrete Endpoint Implementations

Class | Description
MaterializeRemote | Connects to a remote Materialize instance via a user-provided psql URL. The up() method is a no-op.
PostgresContainer | Starts a PostgreSQL container via the mzcompose composition framework. Returns "postgres" as its version name.
MaterializeNonRemote | Abstract base for locally-launched Materialize instances. Includes a lift_limits() method that sets max_connections, max_tables, and max_materialized_views to 65535 via an internal system connection.

Operation Base Class and Concrete Operations

Source: misc/python/materialize/scalability/operation/scalability_operation.py, misc/python/materialize/scalability/operation/operations/operations.py

Operation Hierarchy

The Operation base class defines the interface for a single executable step within a workload. It uses a data-passing pattern via OperationData (a dictionary-like object carrying a cursor, worker ID, and arbitrary key-value pairs).

class Operation:
    def required_keys(self) -> set[str]: ...
    def produced_keys(self) -> set[str]: ...
    def execute(self, data: OperationData) -> OperationData:
        data.validate_requirements(self.required_keys(), ...)
        data = self._execute(data)
        data.validate_requirements(self.produced_keys(), ...)
        return data

The class hierarchy provides progressively specialized SQL operation types:

Class | Description
SqlOperationWithInput | Executes a SQL statement derived from input data. Requires a cursor key. Calls fetchall() after execution.
SqlOperationWithSeed | Extends SqlOperationWithInput to accept a single seed key for parameterized SQL.
SqlOperationWithTwoSeeds | Accepts two seed keys for SQL statements requiring two parameters.
SimpleSqlOperation | Requires no input keys beyond the cursor; executes a fixed SQL string.
OperationChainWithDataExchange | Chains multiple Operation instances sequentially, passing OperationData between them.
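
The data-passing contract and the chaining of operations can be sketched in miniature. Everything below is a simplified stand-in: OperationData is modelled as a plain dict subclass, and PickTableName, BuildCreateTable, and OperationChain are hypothetical classes that only build a SQL string instead of executing it.

```python
class OperationData(dict):
    """Hypothetical dict-backed stand-in for the framework's OperationData."""

    def validate_requirements(self, keys: set[str], context: str) -> None:
        missing = keys - set(self)
        if missing:
            raise RuntimeError(f"{context}: missing keys {sorted(missing)}")


class Operation:
    def required_keys(self) -> set[str]:
        return set()

    def produced_keys(self) -> set[str]:
        return set()

    def execute(self, data: OperationData) -> OperationData:
        name = type(self).__name__
        data.validate_requirements(self.required_keys(), f"{name} input")
        data = self._execute(data)
        data.validate_requirements(self.produced_keys(), f"{name} output")
        return data

    def _execute(self, data: OperationData) -> OperationData:
        raise NotImplementedError


class PickTableName(Operation):
    def required_keys(self) -> set[str]:
        return {"worker_id"}

    def produced_keys(self) -> set[str]:
        return {"table_name"}

    def _execute(self, data: OperationData) -> OperationData:
        data["table_name"] = f"x_{data['worker_id']}"
        return data


class BuildCreateTable(Operation):
    def required_keys(self) -> set[str]:
        return {"table_name"}

    def produced_keys(self) -> set[str]:
        return {"sql"}

    def _execute(self, data: OperationData) -> OperationData:
        data["sql"] = f"CREATE TABLE {data['table_name']} (f1 INT);"
        return data


class OperationChain(Operation):
    """Runs operations in order, handing each one the previous output."""

    def __init__(self, operations: list[Operation]) -> None:
        self.operations = operations

    def _execute(self, data: OperationData) -> OperationData:
        for operation in self.operations:
            data = operation.execute(data)
        return data


chain = OperationChain([PickTableName(), BuildCreateTable()])
result = chain.execute(OperationData(worker_id=3))
print(result["sql"])  # CREATE TABLE x_3 (f1 INT);
```

Running BuildCreateTable on its own, without a table_name key in the data, raises a RuntimeError before any work is done. That early failure is the purpose of declaring required keys.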

Concrete SQL Operations

The operations/operations.py module defines specific SQL operations:

Operation | SQL Statement
InsertDefaultValues | INSERT INTO t1 DEFAULT VALUES;
SelectOne | SELECT 1;
SelectStar | SELECT * FROM t1;
SelectLimit | SELECT * FROM t1 LIMIT 1;
SelectCount | SELECT COUNT(*) FROM t1;
SelectCountInMv | SELECT count FROM mv1;
SelectUnionAll | SELECT * FROM t1 UNION ALL SELECT * FROM t1;
Update | UPDATE t1 SET f1 = f1 + 1;
CreateTableX | CREATE TABLE x_{seed} (f1 INT, f2 INT, f3 INT, f4 INT, f5 INT);
CreateIndexOnTableX | CREATE INDEX i_x_{seed} ON x_{seed} (f1);
CreateMvOnTableX | CREATE MATERIALIZED VIEW mv_x_{seed} AS SELECT * FROM x_{seed};
CreateViewXOnSeries | Creates a view (materialized or plain) over generate_series(1, 100).

Workload Base Classes, Markers, and Concrete Workloads

Source: misc/python/materialize/scalability/workload/workload.py, misc/python/materialize/scalability/workload/workload_markers.py, misc/python/materialize/scalability/workload/workloads/

Workload and WorkloadWithContext

The Workload class is the abstract base for all benchmark workloads:

class Workload:
    def init_operations(self) -> list[Operation]:
        return []

    def operations(self) -> list[Operation]:
        raise NotImplementedError

    def execute_operation(self, operation, cursor, worker_id, transaction_index, verbose):
        data = OperationData(cursor, worker_id)
        self.amend_data_before_execution(data)
        operation.execute(data)

    def name(self) -> str:
        return self.__class__.__name__

    def version(self) -> WorkloadVersion:
        return WorkloadVersion.create(1, 0, 0)

WorkloadWithContext extends Workload to carry references to the current Endpoint and Schema, which are injected by the executor before the run.

Workload Markers

Workload markers are abstract grouping classes used to categorize workloads:

Marker Class | Group Name
DmlDqlWorkload | DML & DQL
DdlWorkload | DDL
ConnectionWorkload | Connection
SelfTestWorkload | Self-Test

DML/DQL Workloads

Source: misc/python/materialize/scalability/workload/workloads/dml_dql_workloads.py

These workloads extend DmlDqlWorkload and exercise data manipulation and query statements:

  • InsertWorkload -- Repeatedly inserts default values into t1.
  • SelectOneWorkload -- Repeatedly executes SELECT 1;.
  • SelectStarWorkload -- Repeatedly runs SELECT * FROM t1;.
  • SelectLimitWorkload -- Runs SELECT * FROM t1 LIMIT 1;.
  • SelectCountWorkload -- Runs SELECT COUNT(*) FROM t1;.
  • SelectUnionAllWorkload -- Runs a UNION ALL of t1 with itself.
  • InsertAndSelectCountInMvWorkload -- Alternates between inserting and reading count from a materialized view.
  • InsertAndSelectLimitWorkload -- Alternates between inserting and reading with LIMIT 1.
  • UpdateWorkload -- Repeatedly runs UPDATE t1 SET f1 = f1 + 1;.

DDL Workloads

Source: misc/python/materialize/scalability/workload/workloads/ddl_workloads.py

These workloads extend DdlWorkload and exercise schema operations:

  • CreateAndDropTableWorkload -- Creates a table, populates it, queries it, and drops it, all chained as an OperationChainWithDataExchange.
  • CreateAndDropTableWithMvWorkload -- Same as above but also creates and queries a materialized view before dropping.
  • CreateAndReplaceViewWorkload -- Creates five views on generate_series, creates a merged view across them, then drops all six views.

Connection Workloads

Source: misc/python/materialize/scalability/workload/workloads/connection_workloads.py

These workloads extend both WorkloadWithContext and ConnectionWorkload:

  • EstablishConnectionWorkload -- Chains Connect(), SelectOne(), Disconnect().
  • EstablishPasswordConnectionWorkload -- Same chain using password-authenticated connections.
  • EstablishSaslConnectionWorkload -- Same chain using SASL-authenticated connections.

Each workload pushes the endpoint and schema into the OperationData and removes the pre-existing cursor, since the connection itself is being benchmarked.

Self-Test Workloads

Source: misc/python/materialize/scalability/workload/workloads/self_test_workloads.py

These workloads extend SelfTestWorkload and are used for framework validation:

  • EmptyOperatorWorkload -- Executes a no-op operation.
  • EmptySqlStatementWorkload -- Executes an empty SQL statement.
  • Sleep10MsInEnvironmentdWorkload -- Sleeps 10ms in the environmentd process.
  • Sleep10MsInClusterdWorkload -- Sleeps 10ms in the clusterd process.
  • Sleep10MsInPythonWorkload -- Sleeps 10ms in the Python client.

BenchmarkConfiguration

Source: misc/python/materialize/scalability/config/benchmark_config.py

BenchmarkConfiguration is a dataclass holding all parameters for a benchmark run:

@dataclass
class BenchmarkConfiguration:
    workload_classes: list[type[Workload]]
    exponent_base: float
    min_concurrency: int
    max_concurrency: int
    count: int
    verbose: bool

    def get_count_for_concurrency(self, concurrency: int) -> int:
        return floor(self.count * sqrt(concurrency))

The get_count_for_concurrency method scales the total number of operations with the square root of the concurrency. Higher concurrency levels therefore execute more operations in total, while the share per worker shrinks. This is meant to keep TPS measurements stable even when individual operations complete quickly.
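
A quick calculation makes the scaling concrete. The base count of 256 is an arbitrary value chosen for this example, not a framework default.

```python
from math import floor, sqrt


def get_count_for_concurrency(count: int, concurrency: int) -> int:
    return floor(count * sqrt(concurrency))


base_count = 256  # arbitrary example value
totals = {c: get_count_for_concurrency(base_count, c) for c in (1, 2, 4, 16, 64)}
for concurrency, total in totals.items():
    # Total operations grow with sqrt(concurrency); the per-worker share shrinks.
    print(concurrency, total, total // concurrency)
```

For this base count the totals are 256, 362, 512, 1024, and 2048 operations, or 256, 181, 128, 64, and 32 operations per worker.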

Schema Configuration

Source: misc/python/materialize/scalability/schema/schema.py

The Schema class manages database schema setup for benchmark runs. It supports configurable:

  • Source type -- Currently only Source.TABLE.
  • Schema name -- Defaults to "scalability".
  • Index creation -- Optionally creates indexes on tables and materialized views and waits for them to become queryable.
  • Transaction isolation -- Supports SERIALIZABLE and STRICT SERIALIZABLE.
  • Cluster name -- Optionally targets a specific cluster.
  • Object count -- Number of tables (and corresponding MVs and indexes) to create.

The init_sqls() method generates the full initialization sequence:

def init_sqls(self) -> list[str]:
    init_sqls = self.connect_sqls() + [
        f"DROP SCHEMA IF EXISTS {self.schema} CASCADE;",
        f"CREATE SCHEMA {self.schema};",
        "DROP TABLE IF EXISTS t1;",
    ]
    # Creates tables, inserts default values, creates MVs, and optionally indexes
    ...

The connect_sqls() method generates per-connection setup statements including SET SCHEMA, SET CLUSTER, and SET transaction_isolation.
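
A rough sketch of how such per-connection statements could be assembled is shown below. The exact SQL text, the quoting, and the rule that unset options are skipped are assumptions. Only the three settings themselves come from the description above.

```python
from typing import Optional


def connect_sqls(
    schema: str,
    cluster_name: Optional[str] = None,
    transaction_isolation: Optional[str] = None,
) -> list[str]:
    """Build per-connection setup statements (illustrative SQL syntax)."""
    sqls = [f"SET SCHEMA = {schema};"]
    if cluster_name is not None:
        sqls.append(f"SET CLUSTER = '{cluster_name}';")
    if transaction_isolation is not None:
        sqls.append(f"SET transaction_isolation = '{transaction_isolation}';")
    return sqls


for statement in connect_sqls("scalability", "quickstart", "SERIALIZABLE"):
    print(statement)
```

The cluster name "quickstart" is only a placeholder value for the demo.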

DataFrame Wrappers for Results

Source: misc/python/materialize/scalability/df/df_totals.py

The framework uses typed pandas DataFrame wrappers to provide a structured interface over raw benchmark data.

DfTotals

Wraps the per-concurrency summary DataFrame with columns for concurrency, wallclock, workload name, count, TPS, and mean/median/min/max transaction duration.

Key methods:

  • get_max_concurrency() -- Returns the highest concurrency level measured.
  • get_concurrency_values() -- Returns all concurrency values as a list.
  • get_tps_values() -- Returns all TPS values as a list.
  • merge(other) -- Joins two DfTotals on count, concurrency, and workload columns, producing a DfTotalsMerged with TPS_BASELINE and TPS_OTHER columns for comparison.

DfTotalsMerged

An intermediate representation produced by merging two endpoint result DataFrames. It is further converted into a DfTotalsExtended (enriched with percentage differences) for use in regression detection.

concat_df_totals

A utility function that concatenates multiple DfTotals instances into one.

Result Analysis: Regression Detection and TPS Comparison

Source: misc/python/materialize/scalability/result/result_analyzers.py, misc/python/materialize/scalability/result/regression_assessment.py, misc/python/materialize/scalability/result/scalability_result.py

DefaultResultAnalyzer

The DefaultResultAnalyzer implements the ResultAnalyzer interface and performs TPS comparison between a baseline and a test endpoint:

class DefaultResultAnalyzer(ResultAnalyzer):
    def __init__(self, max_deviation_as_percent_decimal: float):
        self.max_deviation_as_percent_decimal = max_deviation_as_percent_decimal

    def perform_comparison_in_workload(self, workload_name, baseline_endpoint,
                                        other_endpoint, baseline_result, other_result):
        merged_data = baseline_result.df_totals.merge(other_result.df_totals)
        tps_per_endpoint_data = merged_data.to_enriched_result_frame(...)
        entries_worse = tps_per_endpoint_data.to_filtered_with_threshold(
            self.max_deviation_as_percent_decimal, match_results_better_than_baseline=False
        )
        entries_better = tps_per_endpoint_data.to_filtered_with_threshold(
            self.max_deviation_as_percent_decimal, match_results_better_than_baseline=True
        )
        ...

The comparison pipeline works as follows:

  1. The baseline and test DfTotals are merged on matching operation count, concurrency level, and workload name.
  2. The merged frame is enriched with percentage differences.
  3. Entries exceeding the max_deviation_as_percent_decimal threshold in the negative direction become Regression objects.
  4. Entries exceeding the threshold in the positive direction become ScalabilityImprovement objects.
  5. Both are wrapped in a ComparisonOutcome which tracks regressions, improvements, and the set of endpoints with regressions.
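
The pipeline above can be approximated without pandas. The sketch below uses plain dictionaries as rows. The relative difference is assumed to be (tps_other - tps_baseline) / tps_baseline, and the strict comparison against the threshold is likewise an assumption. The sample TPS values are invented for the demo.

```python
def compare(
    baseline_rows: list[dict], other_rows: list[dict], max_deviation: float
) -> tuple[list[dict], list[dict]]:
    """Join two result sets and split large deviations by direction."""
    baseline_tps = {
        (row["count"], row["concurrency"], row["workload"]): row["tps"]
        for row in baseline_rows
    }
    regressions, improvements = [], []
    for row in other_rows:
        key = (row["count"], row["concurrency"], row["workload"])
        if key not in baseline_tps:
            continue  # inner join: keep only rows present on both sides
        tps_diff = row["tps"] - baseline_tps[key]
        entry = {
            "workload": row["workload"],
            "concurrency": row["concurrency"],
            "tps_baseline": baseline_tps[key],
            "tps_other": row["tps"],
            "tps_diff": tps_diff,
            "tps_diff_percent": tps_diff / baseline_tps[key],
        }
        if entry["tps_diff_percent"] < -max_deviation:
            regressions.append(entry)
        elif entry["tps_diff_percent"] > max_deviation:
            improvements.append(entry)
    return regressions, improvements


def row(concurrency: int, tps: float) -> dict:
    return {"count": 256, "concurrency": concurrency,
            "workload": "SelectOneWorkload", "tps": tps}


baseline = [row(1, 1000.0), row(2, 1800.0), row(4, 3000.0)]
other = [row(1, 990.0), row(2, 1400.0), row(4, 3900.0)]
regressions, improvements = compare(baseline, other, max_deviation=0.2)
print([r["concurrency"] for r in regressions])   # [2]
print([i["concurrency"] for i in improvements])  # [4]
```

With a threshold of 0.2, the 1% drop at concurrency 1 is ignored, the roughly 22% drop at concurrency 2 counts as a regression, and the 30% gain at concurrency 4 counts as an improvement.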

ScalabilityChange, Regression, and ScalabilityImprovement

Source: misc/python/materialize/scalability/result/scalability_change.py

ScalabilityChange is the base class for detected performance changes. Each instance records:

  • Workload name and concurrency level
  • Operation count
  • TPS of the test endpoint and the baseline
  • Absolute and percentage TPS difference
  • The endpoint where the change was detected

Regression validates that tps_diff < 0 (performance degradation). ScalabilityImprovement validates that tps_diff > 0.
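
The sign checks can be sketched with dataclasses. The field names, the use of computed properties for the differences, and raising ValueError on a violation are assumptions made for illustration. The actual classes may store these values and validate them differently.

```python
from dataclasses import dataclass


@dataclass
class ScalabilityChange:
    workload_name: str
    concurrency: int
    count: int
    tps: float
    tps_baseline: float
    endpoint_name: str

    @property
    def tps_diff(self) -> float:
        return self.tps - self.tps_baseline

    @property
    def tps_diff_percent(self) -> float:
        # Expressed as a decimal fraction, e.g. -0.25 for a 25% drop.
        return self.tps_diff / self.tps_baseline


@dataclass
class Regression(ScalabilityChange):
    def __post_init__(self) -> None:
        if not self.tps_diff < 0:
            raise ValueError("a regression requires tps_diff < 0")


@dataclass
class ScalabilityImprovement(ScalabilityChange):
    def __post_init__(self) -> None:
        if not self.tps_diff > 0:
            raise ValueError("an improvement requires tps_diff > 0")


# Example values are invented for the demo.
regression = Regression("SelectOneWorkload", 2, 362, 1500.0, 2000.0, "endpoint-under-test")
print(regression.tps_diff, regression.tps_diff_percent)  # -500.0 -0.25
```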

ComparisonOutcome

Source: misc/python/materialize/scalability/result/comparison_outcome.py

Aggregates all regressions and improvements from a comparison. Supports merging outcomes across multiple workloads and filtering regressions by endpoint.

RegressionAssessment

Source: misc/python/materialize/scalability/result/regression_assessment.py

RegressionAssessment determines whether detected regressions are justified (i.e., known and accepted) or unjustified (new and unexpected). It does this by:

  1. Checking whether both the baseline and the test endpoint reference release versions.
  2. Looking up known regression commits between the two versions via get_commits_of_accepted_regressions_between_versions().
  3. If matching commits exist, the regression is marked as justified; otherwise it is unjustified.

Only unjustified regressions are reported as test failures via to_failure_details().

BenchmarkResult

Source: misc/python/materialize/scalability/result/scalability_result.py

The BenchmarkResult dataclass is the top-level container for all benchmark data:

@dataclass
class BenchmarkResult:
    overall_comparison_outcome: ComparisonOutcome
    df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]]
    df_details_by_endpoint_name_and_workload: dict[str, dict[str, DfDetails]]
    workload_version_by_name: dict[str, WorkloadVersion]
    workload_group_by_name: dict[str, str]

It provides methods to:

  • Add regression outcomes from individual workload comparisons.
  • Record workload metadata (name, version, group).
  • Append per-endpoint, per-workload results.
  • Retrieve results grouped by endpoint or by workload.

Plotting Functions

Source: misc/python/materialize/scalability/plot/plot.py

The plotting module generates visualizations of benchmark results using matplotlib.

plot_tps_per_connections

Produces a scatter plot of TPS versus concurrent SQL connections for each endpoint:

def plot_tps_per_connections(
    workload_name, figure, df_totals_by_endpoint_name,
    baseline_version_name, include_zero_in_y_axis, ...
):
    for endpoint_version_name, df_totals in df_totals_by_endpoint_name.items():
        plot.scatter(
            df_totals.get_concurrency_values(),
            df_totals.get_tps_values(),
            marker=_get_plot_marker(endpoint_version_name, baseline_version_name),
        )
    plot.set_ylabel("Transactions Per Second (tps)")
    plot.set_xlabel("Concurrent SQL Connections")

The baseline endpoint uses a distinct marker style. The y-axis can optionally start at zero.

plot_duration_by_connections_for_workload

Plots the distribution of individual transaction durations per concurrency level. Supports both violin and box plot types via the DistributionPlotType enum (default: violin).

Module Structure Summary

Sub-package | Key Classes | Responsibility
executor/ | BenchmarkExecutor | Orchestrates workload execution across endpoints and concurrency levels
endpoint/ | Endpoint, MaterializeRemote, PostgresContainer, MaterializeNonRemote | Abstracts database connection targets
operation/ | Operation, SimpleSqlOperation, SqlOperationWithSeed, OperationChainWithDataExchange | Defines individual SQL operations with data-passing contracts
workload/ | Workload, WorkloadWithContext, DmlDqlWorkload, DdlWorkload, ConnectionWorkload | Groups operations into named, categorized benchmark scenarios
config/ | BenchmarkConfiguration | Holds all tunable parameters for a benchmark run
schema/ | Schema | Manages database object initialization and per-connection setup
df/ | DfTotals, DfTotalsMerged, DfDetails | Typed pandas DataFrame wrappers for structured result access
result/ | DefaultResultAnalyzer, ComparisonOutcome, RegressionAssessment, BenchmarkResult | Regression detection, justification logic, and result aggregation
plot/ | plot_tps_per_connections, plot_duration_by_connections_for_workload | Visualization of TPS and duration distributions
