Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:MaterializeInc Materialize Feature Benchmark Framework

From Leeroopedia


Knowledge Sources
Domains Benchmarking, Performance_Testing, Statistical_Analysis
Last Updated 2026-02-08 00:00 GMT

Overview

A Python framework for running feature-level performance benchmarks against Materialize instances. It supports statistical termination conditions, outlier filtering, multiple aggregation strategies, memory tracking, regression detection via relative thresholds, and comparative reporting across different Materialize versions.

Description

The feature benchmark module is organized as a measurement pipeline with the following components:

  • Benchmark orchestrator (benchmark.py): The central Benchmark class drives the execution loop. For each scenario it runs lifecycle phases (shared(), init(), before(), then benchmark()), collects wallclock duration measurements by computing the difference between two timestamp markers, optionally collects memory measurements from Docker containers (materialized and clusterd), applies outlier filtering, feeds measurements into an aggregation, and checks termination conditions after each iteration.
  • Action types (action.py): Defines executable actions that scenarios use in their lifecycle phases. TdAction runs testdrive queries, LambdaAction executes arbitrary callables, Kgen runs the kgen Kafka data generator, and DummyAction is a no-op placeholder.
  • Scenario framework (scenario.py): The RootScenario base class defines a scenario lifecycle with four phases: shared() (run once across all Mz instances), init() (run once per Mz), before() (run before each measurement iteration), and benchmark() (the measured workload). Scenarios define a SCALE parameter (default 6, meaning 10^6 = 1,000,000 rows), configurable relative regression thresholds per measurement type, and helper methods for generating cross-join queries over a ten table. Three scenario tiers exist: Scenario (default/CI), ScenarioBig (explicit invocation), and ScenarioDisabled.
  • Measurement pipeline:
    • Source (measurement_source.py): Td parses testdrive output to extract timestamps by matching /* A */ and /* B */ markers and reading the at ts values. Lambda records time.time() after executing a callable.
    • Filter (filter.py): RemoveOutliers discards measurements exceeding 1 standard deviation above the mean (after at least 3 data points). FilterFirst discards the first measurement. NoFilter passes everything through.
    • Aggregation (aggregation.py): Multiple strategies including MinAggregation (minimum value), MeanAggregation (numpy mean), StdDevAggregation (mean minus N standard deviations), NormalDistributionAggregation (fits a statistics.NormalDist), and NoAggregation (returns first data point).
    • Termination (termination.py): NormalDistributionOverlap stops when successive normal distribution fits overlap above a threshold (requires >10 data points). ProbForMin stops when the probability of observing a new minimum drops below a threshold (requires >5 data points). RunAtMost stops after a fixed number of iterations.
  • Result evaluation (benchmark_result_evaluator.py): RelativeThresholdEvaluator computes the ratio between the current ("this") and baseline ("other") measurements. A regression is flagged when the ratio exceeds 1 + threshold (default 10% for wallclock, 20% for Mz memory, 50% for clusterd memory). A "strong regression" uses double the threshold. Results are formatted as human-readable strings like "worse: 15.2% slower" or "better: 2.3 times faster".
  • Report generation (report.py): The Report class collects BenchmarkScenarioResult objects per scenario and renders a tabular comparison with columns for name, measurement type, this/other values, unit, threshold, regression flag, and human-readable comparison. ReportMeasurement computes summary statistics (min, max, mean, variance) across multiple cycles.
  • Report selection (benchmark_result_selection.py): When multiple benchmark cycles are run, MedianBenchmarkResultSelector picks the report with the median wallclock value per scenario, and BestBenchmarkResultSelector picks the report with the minimum wallclock value, preferring reports without regressions.

Usage

This framework is used for feature-level performance regression detection in Materialize's CI pipeline and for manual benchmarking. Scenarios target specific database features (e.g., inserts, selects, joins, Kafka ingestion) and measure their wallclock execution time and memory consumption. By comparing measurements between two Materialize versions (typically a development build versus a baseline), regressions can be automatically detected and flagged.

Code Reference

Source Location

  • Repository: MaterializeInc_Materialize
  • Directory: misc/python/materialize/feature_benchmark/
    • benchmark.py (226 lines) - Main Benchmark orchestrator class
    • action.py (92 lines) - Action type definitions (TdAction, LambdaAction, Kgen, DummyAction)
    • aggregation.py (91 lines) - Statistical aggregation strategies (Min, Mean, StdDev, NormalDist, NoAggregation)
    • benchmark_result.py (109 lines) - Result data structures (BenchmarkScenarioResult, BenchmarkScenarioMetric)
    • benchmark_result_evaluator.py (118 lines) - Regression detection via relative thresholds
    • benchmark_result_selection.py (149 lines) - Report selection strategies (Median, Best)
    • executor.py (145 lines) - Execution abstraction (Docker, MzCloud)
    • filter.py (52 lines) - Outlier filtering (RemoveOutliers, FilterFirst, NoFilter)
    • measurement.py (63 lines) - Measurement data types (MeasurementType, MeasurementUnit, WallclockDuration)
    • measurement_source.py (110 lines) - Measurement sources and timestamp parsing (Td, Lambda)
    • report.py (137 lines) - Report generation and regression determination
    • scenario.py (143 lines) - Base scenario class with lifecycle phases
    • scenario_version.py (15 lines) - Scenario versioning
    • termination.py (77 lines) - Statistical termination conditions (NormalDistributionOverlap, ProbForMin, RunAtMost)

Signature

class Benchmark:
    def __init__(
        self,
        mz_id: int,
        mz_version: MzVersion,
        scenario_cls: type[Scenario],
        executor: Executor,
        filter: Filter,
        termination_conditions: list[TerminationCondition],
        aggregation_class: type[Aggregation],
        default_size: int,
        seed: int,
        scale: str | None = None,
        measure_memory: bool = True,
    ) -> None

    def create_scenario_instance(self) -> Scenario
    def run(self) -> list[Aggregation]

class RootScenario:
    SCALE: float = 6
    FIXED_SCALE: bool = False
    RELATIVE_THRESHOLD: dict[MeasurementType, float]

    def __init__(self, scale: float, mz_version: MzVersion, default_size: int, seed: int) -> None
    def shared(self) -> Action | list[Action] | None
    def init(self) -> Action | list[Action] | None
    def before(self) -> Action | list[Action] | None
    def benchmark(self) -> BenchmarkingSequence

class RelativeThresholdEvaluator(BenchmarkResultEvaluator[float | None]):
    def __init__(self, scenario_class: type[Scenario]) -> None
    def ratio(self, metric: BenchmarkScenarioMetric) -> float | None
    def is_regression(self, metric: BenchmarkScenarioMetric, threshold: float | None = None) -> bool
    def is_strong_regression(self, metric: BenchmarkScenarioMetric) -> bool
    def human_readable(self, metric: BenchmarkScenarioMetric, use_colors: bool) -> str

class Executor:
    def Lambda(self, _lambda: Callable[["Executor"], float]) -> float
    def Td(self, input: str) -> Any
    def Kgen(self, topic: str, args: list[str]) -> Any
    def DockerMemMz(self) -> int
    def DockerMemClusterd(self) -> int

class Report:
    def __init__(self, cycle_number: int) -> None
    def add_scenario_result(self, result: BenchmarkScenarioResult) -> None
    def as_string(self, use_colors: bool, limit_to_scenario: str | None = None) -> str
    def has_scenario_regression(self, scenario_name: str) -> bool

Import

from materialize.feature_benchmark.benchmark import Benchmark
from materialize.feature_benchmark.scenario import Scenario, ScenarioBig, ScenarioDisabled
from materialize.feature_benchmark.executor import Executor, Docker, MzCloud
from materialize.feature_benchmark.filter import Filter, RemoveOutliers, NoFilter, FilterFirst
from materialize.feature_benchmark.aggregation import (
    Aggregation, MinAggregation, MeanAggregation, StdDevAggregation, NormalDistributionAggregation,
)
from materialize.feature_benchmark.termination import (
    TerminationCondition, NormalDistributionOverlap, ProbForMin, RunAtMost,
)
from materialize.feature_benchmark.measurement import Measurement, MeasurementType, MeasurementUnit
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td, Lambda
from materialize.feature_benchmark.action import Action, TdAction, LambdaAction, Kgen, DummyAction
from materialize.feature_benchmark.report import Report, ReportMeasurement
from materialize.feature_benchmark.benchmark_result import BenchmarkScenarioResult, BenchmarkScenarioMetric
from materialize.feature_benchmark.benchmark_result_evaluator import RelativeThresholdEvaluator
from materialize.feature_benchmark.benchmark_result_selection import (
    MedianBenchmarkResultSelector, BestBenchmarkResultSelector,
)

I/O Contract

Benchmark Constructor Inputs

Parameter Type Description
mz_id int Identifier for the Materialize instance under test (0 = runs shared section)
mz_version MzVersion Version of the Materialize instance being benchmarked
scenario_cls type[Scenario] The scenario class to instantiate and run
executor Executor Execution backend (Docker or MzCloud)
filter Filter Outlier filter to apply to measurements
termination_conditions list[TerminationCondition] One or more conditions that stop the measurement loop
aggregation_class type[Aggregation] Strategy for aggregating multiple measurements
default_size int Default size parameter passed to scenarios
seed int Random seed for reproducibility
scale None Optional scale override; supports absolute values, or relative adjustments with "+" or "-" prefixes
measure_memory bool Whether to collect memory measurements (default: True)

Benchmark.run() Output

Return Type Description
list[Aggregation] A list of three Aggregation objects: [0] = wallclock performance aggregation, [1] = Mz memory aggregation, [2] = clusterd memory aggregation. Each contains aggregated measurement data accessible via .aggregate().

Measurement Types

MeasurementType Unit Description
WALLCLOCK Seconds or Nanoseconds Elapsed time between the A and B markers in testdrive output
MEMORY_MZ Megabytes Memory usage of the materialized Docker container
MEMORY_CLUSTERD Megabytes Memory usage of the clusterd Docker container

Default Regression Thresholds

MeasurementType Threshold Meaning
WALLCLOCK 10% Wallclock regression flagged if >10% slower
MEMORY_MZ 20% Mz memory regression flagged if >20% more
MEMORY_CLUSTERD 50% Clusterd memory regression flagged if >50% more

Usage Examples

Defining a Custom Scenario

from materialize.feature_benchmark.scenario import Scenario
from materialize.feature_benchmark.measurement_source import Td

class MySelectBenchmark(Scenario):
    SCALE = 4  # 10^4 = 10,000 rows

    def init(self):
        return TdAction("""
> CREATE TABLE t1 (f1 INTEGER);
> INSERT INTO t1 SELECT generate_series(1, 10000);
""")

    def benchmark(self):
        return Td("""
> /* A */ SELECT 1
> /* B */ SELECT count(*) FROM t1
10000
""")

Running a Benchmark

benchmark = Benchmark(
    mz_id=0,
    mz_version=mz_version,
    scenario_cls=MySelectBenchmark,
    executor=docker_executor,
    filter=RemoveOutliers(),
    termination_conditions=[
        NormalDistributionOverlap(threshold=0.99),
        RunAtMost(threshold=100),
    ],
    aggregation_class=MinAggregation,
    default_size=1000,
    seed=42,
)
aggregations = benchmark.run()
wallclock_result = aggregations[0].aggregate()

Evaluating Results and Generating Reports

report = Report(cycle_number=1)
report.add_scenario_result(scenario_result)
print(report.as_string(use_colors=True))
# Output:
# NAME                                | TYPE            |            THIS |           OTHER |  UNIT  | THRESHOLD  | Regression? | 'THIS' is
# -----------------------------------------------------------------------------------------------------------------------
# MySelectBenchmark                   | wallclock       |           0.542 |           0.498 |   s    |    10%     |      no     | worse:   8.8% slower

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment