Overview
A Python framework for running feature-level performance benchmarks against Materialize instances. It supports statistical termination conditions, outlier filtering, multiple aggregation strategies, memory tracking, regression detection via relative thresholds, and comparative reporting across different Materialize versions.
Description
The feature benchmark module is organized as a measurement pipeline with the following components:
- Benchmark orchestrator (benchmark.py): The central Benchmark class drives the execution loop. For each scenario it runs the lifecycle phases (shared(), init(), before(), then benchmark()), collects wallclock duration measurements by computing the difference between two timestamp markers, optionally collects memory measurements from the Docker containers (materialized and clusterd), applies outlier filtering, feeds measurements into an aggregation, and checks the termination conditions after each iteration.
- Action types (action.py): Defines executable actions that scenarios use in their lifecycle phases. TdAction runs testdrive queries, LambdaAction executes arbitrary callables, Kgen runs the kgen Kafka data generator, and DummyAction is a no-op placeholder.
- Scenario framework (scenario.py): The RootScenario base class defines a scenario lifecycle with four phases: shared() (run once across all Mz instances), init() (run once per Mz), before() (run before each measurement iteration), and benchmark() (the measured workload). Scenarios define a SCALE parameter (default 6, meaning 10^6 = 1,000,000 rows), configurable relative regression thresholds per measurement type, and helper methods for generating cross-join queries over a ten-row table named ten. Three scenario tiers exist: Scenario (default/CI), ScenarioBig (explicit invocation), and ScenarioDisabled.
- Measurement pipeline:
  - Source (measurement_source.py): Td parses testdrive output to extract timestamps by matching the /* A */ and /* B */ markers and reading the "at ts" values. Lambda records time.time() after executing a callable.
  - Filter (filter.py): RemoveOutliers discards measurements exceeding 1 standard deviation above the mean (after at least 3 data points). FilterFirst discards the first measurement. NoFilter passes everything through.
  - Aggregation (aggregation.py): Multiple strategies, including MinAggregation (minimum value), MeanAggregation (numpy mean), StdDevAggregation (mean minus N standard deviations), NormalDistributionAggregation (fits a statistics.NormalDist), and NoAggregation (returns the first data point).
  - Termination (termination.py): NormalDistributionOverlap stops when successive normal distribution fits overlap above a threshold (requires >10 data points). ProbForMin stops when the probability of observing a new minimum drops below a threshold (requires >5 data points). RunAtMost stops after a fixed number of iterations.
- Result evaluation (benchmark_result_evaluator.py): RelativeThresholdEvaluator computes the ratio between the current ("this") and baseline ("other") measurements. A regression is flagged when the ratio exceeds 1 + threshold (default 10% for wallclock, 20% for Mz memory, 50% for clusterd memory). A "strong regression" uses double the threshold. Results are formatted as human-readable strings like "worse: 15.2% slower" or "better: 2.3 times faster".
- Report generation (report.py): The Report class collects BenchmarkScenarioResult objects per scenario and renders a tabular comparison with columns for name, measurement type, this/other values, unit, threshold, regression flag, and human-readable comparison. ReportMeasurement computes summary statistics (min, max, mean, variance) across multiple cycles.
- Report selection (benchmark_result_selection.py): When multiple benchmark cycles are run, MedianBenchmarkResultSelector picks the report with the median wallclock value per scenario, and BestBenchmarkResultSelector picks the report with the minimum wallclock value, preferring reports without regressions.
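The filter, aggregation, and termination stages described above can be pictured with a small standalone sketch. It does not use the framework's classes: RemoveOutliersSketch, should_discard, and run_measurement_loop are hypothetical names, and whether the real filter also records discarded values in its history is an assumption here.

```python
import statistics
from collections.abc import Callable


class RemoveOutliersSketch:
    """Hypothetical stand-in for an outlier filter (not the framework's class)."""

    def __init__(self) -> None:
        self._seen: list[float] = []

    def should_discard(self, value: float) -> bool:
        # Record every value, then compare against mean + 1 standard deviation
        # once at least 3 data points are available.
        self._seen.append(value)
        if len(self._seen) < 3:
            return False
        cutoff = statistics.mean(self._seen) + statistics.stdev(self._seen)
        return value > cutoff


def run_measurement_loop(
    measure: Callable[[], float], max_iterations: int
) -> tuple[list[float], float]:
    """Measure repeatedly, filter outliers, and aggregate with the minimum."""
    outlier_filter = RemoveOutliersSketch()
    kept: list[float] = []
    # RunAtMost-style termination: stop after a fixed number of iterations.
    for _ in range(max_iterations):
        value = measure()
        if not outlier_filter.should_discard(value):
            kept.append(value)
    # MinAggregation-style result: the smallest surviving measurement.
    return kept, min(kept)


# Five simulated wallclock samples; 5.0 is well above mean + 1 stddev.
samples = iter([1.0, 1.1, 0.9, 5.0, 1.0])
kept, best = run_measurement_loop(lambda: next(samples), max_iterations=5)
```

In this run the 5.0 sample is dropped and the aggregate is the minimum of the remaining four values.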
Usage
This framework is used for feature-level performance regression detection in Materialize's CI pipeline and for manual benchmarking. Scenarios target specific database features (e.g., inserts, selects, joins, Kafka ingestion) and measure their wallclock execution time and memory consumption. By comparing measurements between two Materialize versions (typically a development build versus a baseline), regressions can be automatically detected and flagged.
Code Reference
Source Location
- Repository: MaterializeInc_Materialize
- Directory: misc/python/materialize/feature_benchmark/
  - benchmark.py (226 lines) - Main Benchmark orchestrator class
  - action.py (92 lines) - Action type definitions (TdAction, LambdaAction, Kgen, DummyAction)
  - aggregation.py (91 lines) - Statistical aggregation strategies (Min, Mean, StdDev, NormalDist, NoAggregation)
  - benchmark_result.py (109 lines) - Result data structures (BenchmarkScenarioResult, BenchmarkScenarioMetric)
  - benchmark_result_evaluator.py (118 lines) - Regression detection via relative thresholds
  - benchmark_result_selection.py (149 lines) - Report selection strategies (Median, Best)
  - executor.py (145 lines) - Execution abstraction (Docker, MzCloud)
  - filter.py (52 lines) - Outlier filtering (RemoveOutliers, FilterFirst, NoFilter)
  - measurement.py (63 lines) - Measurement data types (MeasurementType, MeasurementUnit, WallclockDuration)
  - measurement_source.py (110 lines) - Measurement sources and timestamp parsing (Td, Lambda)
  - report.py (137 lines) - Report generation and regression determination
  - scenario.py (143 lines) - Base scenario class with lifecycle phases
  - scenario_version.py (15 lines) - Scenario versioning
  - termination.py (77 lines) - Statistical termination conditions (NormalDistributionOverlap, ProbForMin, RunAtMost)
Signature
class Benchmark:
    def __init__(
        self,
        mz_id: int,
        mz_version: MzVersion,
        scenario_cls: type[Scenario],
        executor: Executor,
        filter: Filter,
        termination_conditions: list[TerminationCondition],
        aggregation_class: type[Aggregation],
        default_size: int,
        seed: int,
        scale: str | None = None,
        measure_memory: bool = True,
    ) -> None
    def create_scenario_instance(self) -> Scenario
    def run(self) -> list[Aggregation]

class RootScenario:
    SCALE: float = 6
    FIXED_SCALE: bool = False
    RELATIVE_THRESHOLD: dict[MeasurementType, float]
    def __init__(self, scale: float, mz_version: MzVersion, default_size: int, seed: int) -> None
    def shared(self) -> Action | list[Action] | None
    def init(self) -> Action | list[Action] | None
    def before(self) -> Action | list[Action] | None
    def benchmark(self) -> BenchmarkingSequence

class RelativeThresholdEvaluator(BenchmarkResultEvaluator[float | None]):
    def __init__(self, scenario_class: type[Scenario]) -> None
    def ratio(self, metric: BenchmarkScenarioMetric) -> float | None
    def is_regression(self, metric: BenchmarkScenarioMetric, threshold: float | None = None) -> bool
    def is_strong_regression(self, metric: BenchmarkScenarioMetric) -> bool
    def human_readable(self, metric: BenchmarkScenarioMetric, use_colors: bool) -> str

class Executor:
    def Lambda(self, _lambda: Callable[["Executor"], float]) -> float
    def Td(self, input: str) -> Any
    def Kgen(self, topic: str, args: list[str]) -> Any
    def DockerMemMz(self) -> int
    def DockerMemClusterd(self) -> int

class Report:
    def __init__(self, cycle_number: int) -> None
    def add_scenario_result(self, result: BenchmarkScenarioResult) -> None
    def as_string(self, use_colors: bool, limit_to_scenario: str | None = None) -> str
    def has_scenario_regression(self, scenario_name: str) -> bool
Import
from materialize.feature_benchmark.benchmark import Benchmark
from materialize.feature_benchmark.scenario import Scenario, ScenarioBig, ScenarioDisabled
from materialize.feature_benchmark.executor import Executor, Docker, MzCloud
from materialize.feature_benchmark.filter import Filter, RemoveOutliers, NoFilter, FilterFirst
from materialize.feature_benchmark.aggregation import (
    Aggregation, MinAggregation, MeanAggregation, StdDevAggregation, NormalDistributionAggregation,
)
from materialize.feature_benchmark.termination import (
    TerminationCondition, NormalDistributionOverlap, ProbForMin, RunAtMost,
)
from materialize.feature_benchmark.measurement import Measurement, MeasurementType, MeasurementUnit
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td, Lambda
from materialize.feature_benchmark.action import Action, TdAction, LambdaAction, Kgen, DummyAction
from materialize.feature_benchmark.report import Report, ReportMeasurement
from materialize.feature_benchmark.benchmark_result import BenchmarkScenarioResult, BenchmarkScenarioMetric
from materialize.feature_benchmark.benchmark_result_evaluator import RelativeThresholdEvaluator
from materialize.feature_benchmark.benchmark_result_selection import (
    MedianBenchmarkResultSelector, BestBenchmarkResultSelector,
)
I/O Contract
Benchmark Constructor Inputs
| Parameter | Type | Description |
|---|---|---|
| mz_id | int | Identifier for the Materialize instance under test (0 = runs shared section) |
| mz_version | MzVersion | Version of the Materialize instance being benchmarked |
| scenario_cls | type[Scenario] | The scenario class to instantiate and run |
| executor | Executor | Execution backend (Docker or MzCloud) |
| filter | Filter | Outlier filter to apply to measurements |
| termination_conditions | list[TerminationCondition] | One or more conditions that stop the measurement loop |
| aggregation_class | type[Aggregation] | Strategy for aggregating multiple measurements |
| default_size | int | Default size parameter passed to scenarios |
| seed | int | Random seed for reproducibility |
| scale | str or None | Optional scale override (default: None); supports absolute values, or relative adjustments with "+" or "-" prefixes |
| measure_memory | bool | Whether to collect memory measurements (default: True) |
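One plausible reading of the scale override is sketched below. The function resolve_scale is hypothetical, and the assumption that a scenario with FIXED_SCALE set ignores the override is an inference from the attribute name rather than something stated here.

```python
from __future__ import annotations


def resolve_scale(
    default_scale: float, override: str | None, fixed_scale: bool = False
) -> float:
    """Hypothetical helper: combine a scenario's SCALE with a scale override."""
    # No override, or a scenario that pins its scale (assumed behavior).
    if override is None or fixed_scale:
        return default_scale
    # "+N" and "-N" adjust the scenario's default scale relatively.
    if override.startswith("+"):
        return default_scale + float(override[1:])
    if override.startswith("-"):
        return default_scale - float(override[1:])
    # Anything else is treated as an absolute scale.
    return float(override)


# SCALE is an exponent: a scale of 4 corresponds to 10^4 rows.
rows_at_scale_4 = 10 ** resolve_scale(6, "-2")
```

Because SCALE is an exponent, a relative override of "+1" multiplies the row count by ten rather than adding one row.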
Benchmark.run() Output
| Return Type | Description |
|---|---|
| list[Aggregation] | A list of three Aggregation objects: [0] = wallclock performance aggregation, [1] = Mz memory aggregation, [2] = clusterd memory aggregation. Each contains aggregated measurement data accessible via .aggregate(). |
Measurement Types
| MeasurementType | Unit | Description |
|---|---|---|
| WALLCLOCK | Seconds or Nanoseconds | Elapsed time between the A and B markers in testdrive output |
| MEMORY_MZ | Megabytes | Memory usage of the materialized Docker container |
| MEMORY_CLUSTERD | Megabytes | Memory usage of the clusterd Docker container |
Default Regression Thresholds
| MeasurementType | Threshold | Meaning |
|---|---|---|
| WALLCLOCK | 10% | Wallclock regression flagged if >10% slower |
| MEMORY_MZ | 20% | Mz memory regression flagged if >20% more |
| MEMORY_CLUSTERD | 50% | Clusterd memory regression flagged if >50% more |
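The threshold rule can be written out as plain arithmetic. The sketch below works on bare floats instead of BenchmarkScenarioMetric objects, and its function names and None handling are illustrative assumptions rather than the evaluator's actual code.

```python
from __future__ import annotations


def ratio(this: float | None, other: float | None) -> float | None:
    """Ratio of the current ("this") to the baseline ("other") measurement."""
    if this is None or other is None or other == 0:
        return None  # nothing to compare against
    return this / other


def is_regression(this: float | None, other: float | None, threshold: float) -> bool:
    """Flag a regression when the ratio exceeds 1 + threshold."""
    r = ratio(this, other)
    return r is not None and r > 1 + threshold


def is_strong_regression(
    this: float | None, other: float | None, threshold: float
) -> bool:
    """A strong regression applies double the threshold."""
    return is_regression(this, other, 2 * threshold)


WALLCLOCK_THRESHOLD = 0.10  # default 10% for wallclock

# 0.542 s vs. 0.498 s is about 8.8% slower, which stays under the 10% threshold.
slightly_slower = is_regression(0.542, 0.498, WALLCLOCK_THRESHOLD)
# 0.58 s vs. 0.50 s is 16% slower: a regression, but not a strong one.
clearly_slower = is_regression(0.58, 0.50, WALLCLOCK_THRESHOLD)
strongly_slower = is_strong_regression(0.58, 0.50, WALLCLOCK_THRESHOLD)
```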
Usage Examples
Defining a Custom Scenario
from materialize.feature_benchmark.action import TdAction
from materialize.feature_benchmark.measurement_source import Td
from materialize.feature_benchmark.scenario import Scenario


class MySelectBenchmark(Scenario):
    SCALE = 4  # 10^4 = 10,000 rows

    def init(self):
        # Runs once per Materialize instance: create and populate the table.
        return TdAction("""
> CREATE TABLE t1 (f1 INTEGER);
> INSERT INTO t1 SELECT generate_series(1, 10000);
""")

    def benchmark(self):
        # Wallclock time is measured between the /* A */ and /* B */ markers.
        return Td("""
> /* A */ SELECT 1
1
> /* B */ SELECT count(*) FROM t1
10000
""")
Running a Benchmark
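from materialize.feature_benchmark.aggregation import MinAggregation
from materialize.feature_benchmark.benchmark import Benchmark
from materialize.feature_benchmark.filter import RemoveOutliers
from materialize.feature_benchmark.termination import NormalDistributionOverlap, RunAtMost

# mz_version (MzVersion) and docker_executor (Executor) are assumed to be set up elsewhere.
benchmark = Benchmark(
    mz_id=0,
    mz_version=mz_version,
    scenario_cls=MySelectBenchmark,
    executor=docker_executor,
    filter=RemoveOutliers(),
    termination_conditions=[
        NormalDistributionOverlap(threshold=0.99),
        RunAtMost(threshold=100),
    ],
    aggregation_class=MinAggregation,
    default_size=1000,
    seed=42,
)
aggregations = benchmark.run()
# [0] = wallclock, [1] = Mz memory, [2] = clusterd memory
wallclock_result = aggregations[0].aggregate()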
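The NormalDistributionOverlap condition used above can be approximated with the standard library alone. This is a sketch under assumptions, not the framework's code: the class name, the terminate method, and the choice to compare each fit against the immediately preceding one are illustrative.

```python
from __future__ import annotations

from statistics import NormalDist


class NormalDistributionOverlapSketch:
    """Hypothetical stand-in: stop once successive normal fits barely change."""

    def __init__(self, threshold: float) -> None:
        self._threshold = threshold
        self._data: list[float] = []
        self._last_fit: NormalDist | None = None

    def terminate(self, value: float) -> bool:
        self._data.append(value)
        # Require more than 10 data points before fitting anything.
        if len(self._data) <= 10:
            return False
        fit = NormalDist.from_samples(self._data)
        previous, self._last_fit = self._last_fit, fit
        if previous is None:
            return False  # first fit, nothing to compare with yet
        # overlap() returns the shared area of the two densities, in [0, 1].
        return previous.overlap(fit) >= self._threshold


# Simulated measurements alternating between 1.0 and 1.1 seconds.
condition = NormalDistributionOverlapSketch(threshold=0.9)
decisions = [condition.terminate(1.0 if i % 2 == 0 else 1.1) for i in range(12)]
```

Pairing a statistical condition like this with RunAtMost, as in the example above, bounds the run time when the measurements never settle. Note that NormalDist.overlap raises StatisticsError if either fit has zero standard deviation, so the sketch assumes the samples are not all identical.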
Evaluating Results and Generating Reports
from materialize.feature_benchmark.report import Report

# scenario_result is assumed to be a BenchmarkScenarioResult built from the aggregations.
report = Report(cycle_number=1)
report.add_scenario_result(scenario_result)
print(report.as_string(use_colors=True))
# Output:
# NAME              | TYPE      | THIS  | OTHER | UNIT | THRESHOLD | Regression? | 'THIS' is
# ---------------------------------------------------------------------------------------------------
# MySelectBenchmark | wallclock | 0.542 | 0.498 | s    | 10%       | no          | worse: 8.8% slower
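When several cycles each produce a report, the median and best selection strategies described earlier reduce to simple choices over per-cycle wallclock values. The sketch below uses plain dictionaries keyed by cycle number; both function names are hypothetical, and using the lower median for an even number of cycles is an assumption.

```python
from __future__ import annotations

import statistics


def select_median_cycle(wallclock_by_cycle: dict[int, float]) -> int:
    """Return the cycle whose wallclock value is the (lower) median."""
    # median_low always returns a value that was actually observed.
    median_value = statistics.median_low(wallclock_by_cycle.values())
    return next(c for c, v in wallclock_by_cycle.items() if v == median_value)


def select_best_cycle(
    wallclock_by_cycle: dict[int, float], regressed_cycles: set[int]
) -> int:
    """Return the fastest cycle, preferring cycles without regressions."""
    clean = {c: v for c, v in wallclock_by_cycle.items() if c not in regressed_cycles}
    # Fall back to all cycles if every one of them reported a regression.
    candidates = clean or wallclock_by_cycle
    return min(candidates, key=lambda c: candidates[c])


wallclock = {1: 0.54, 2: 0.50, 3: 0.61}
median_cycle = select_median_cycle(wallclock)
best_clean_cycle = select_best_cycle(wallclock, regressed_cycles={2})
```

Here cycle 1 holds the median value, and it is also the best choice once cycle 2 is excluded for having a regression.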
Related Pages