Implementation:MaterializeInc Materialize Workload Replay Framework
| Knowledge Sources | |
|---|---|
| Domains | Testing, Benchmarking, Data Ingestion |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
The Workload Replay Framework captures, anonymizes, and replays production-like workloads against Materialize for regression testing and performance benchmarking.
Description
This module provides an end-to-end system for workload replay testing in Materialize. It captures real workload patterns (queries, data ingestion, object creation) from production environments, generates synthetic data matching the original schema and statistical distribution using long-tail random generators, and replays the captured queries concurrently against a test Materialize instance. The framework supports multiple data source types including Kafka (with Avro serialization), PostgreSQL CDC, MySQL CDC, SQL Server CDC, and webhooks, with configurable scaling factors for data volume, ingestion rate, and query concurrency.
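The Description says the synthetic data is produced with long-tail random generators. The exact distribution used by the module is not stated here. The following sketch assumes a Zipf-like weighting in which a few "hot" values dominate and a long tail of values appears rarely. The helper names long_tail_weights and long_tail_choice are hypothetical and are not the framework's own helpers.

```python
import random
from collections import Counter


def long_tail_weights(n: int, s: float = 1.2) -> list[float]:
    """Zipf-like weights: the item at rank i (0-based) gets weight 1 / (i + 1) ** s."""
    return [1.0 / (i + 1) ** s for i in range(n)]


def long_tail_choice(rng: random.Random, pool: list[str], k: int, s: float = 1.2) -> list[str]:
    """Draw k values from pool with replacement; earlier entries are drawn far more often."""
    return rng.choices(pool, weights=long_tail_weights(len(pool), s), k=k)


rng = random.Random(42)  # fixed seed, so the same workload yields the same data
customers = [f"customer_{i}" for i in range(1000)]
sample = long_tail_choice(rng, customers, k=10_000)
print(Counter(sample).most_common(3))  # a handful of "hot" keys dominate the sample
```

Seeding a dedicated random.Random instance, rather than the global random module, keeps data generation reproducible even when other code consumes randomness.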
Usage
Use this framework when performing regression testing, performance benchmarking, or validating Materialize behavior under production-like workload conditions. It is invoked through the mzcompose test infrastructure and configured via captured workload YAML files.
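To make the YAML-driven configuration more concrete, here is a sketch that summarizes an already-parsed workload dict and applies factor_initial_data to table row counts. The nesting used (databases, then schemas, then tables and sources, plus a top-level list of queries) and the "rows" key are assumptions for illustration only. The real captured YAML layout is not described in this page, and print_workload_stats() may report different figures.

```python
from typing import Any


def summarize_workload(workload: dict[str, Any], factor_initial_data: float = 1.0) -> dict[str, int]:
    """Count objects in a (hypothetically shaped) workload and scale table row counts."""
    summary = {"databases": 0, "schemas": 0, "tables": 0, "sources": 0, "queries": 0, "initial_rows": 0}
    for schemas in workload.get("databases", {}).values():
        summary["databases"] += 1
        for schema in schemas.values():
            summary["schemas"] += 1
            tables = schema.get("tables", {})
            summary["tables"] += len(tables)
            summary["sources"] += len(schema.get("sources", {}))
            # 1.0 keeps the captured size; 0.1 generates roughly 10% of the rows.
            summary["initial_rows"] += sum(
                round(t.get("rows", 0) * factor_initial_data) for t in tables.values()
            )
    summary["queries"] = len(workload.get("queries", []))
    return summary


# Hypothetical workload, shaped as assumed above.
workload: dict[str, Any] = {
    "databases": {
        "shop": {  # database
            "public": {  # schema
                "tables": {"orders": {"rows": 120_000}, "customers": {"rows": 5_000}},
                "sources": {"clicks": {"type": "kafka"}},
            },
        },
    },
    "queries": [
        {"sql": "SELECT count(*) FROM shop.public.orders"},
        {"sql": "SELECT * FROM shop.public.customers WHERE id = $1"},
    ],
}

print(summarize_workload(workload, factor_initial_data=0.1))
# {'databases': 1, 'schemas': 1, 'tables': 2, 'sources': 1, 'queries': 2, 'initial_rows': 12500}
```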
Code Reference
Source Location
- Repository: MaterializeInc_Materialize
- Module: misc/python/materialize/workload_replay/
Key Files
- config.py - Configuration constants: LOCATION, WORKLOAD_REPLAY_VERSION, SEED_RANGE, and the cluster_replica_sizes mapping
- column.py - Column class for type-aware synthetic data generation across 20+ SQL types
- data.py - Initial data creation functions: create_initial_data_requiring_mz(), create_ingestions(), create_initial_data_external()
- executor.py - Test orchestration: test() and benchmark() entry points coordinating objects, data, ingestion, and queries
- ingest.py - Data ingestion via Kafka (Avro), PostgreSQL, MySQL, SQL Server, and webhook sources
- objects.py - DDL execution: run_create_objects_part_1() (clusters, databases, schemas) and run_create_objects_part_2() (sources, views, sinks)
- stats.py - Statistics collection, Docker resource monitoring, matplotlib plotting, and version comparison
- util.py - Utility functions: print_workload_stats(), resolve_tag(), long-tail distribution helpers
- replay.py - Query replay: continuous_queries(), run_query(), parameter conversion from $N to %s format
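replay.py is listed as converting query parameters from $N to %s format. PostgreSQL-style $N placeholders are numbered, while %s (the paramstyle used by drivers such as psycopg) is positional by order of appearance. A faithful conversion therefore also has to reorder, and possibly repeat, the parameters, and it has to escape literal % signs. Below is a minimal sketch with a hypothetical dollar_to_pyformat helper. Whether run_query handles reordering and escaping this way is an assumption.

```python
import re
from typing import Any

PLACEHOLDER = re.compile(r"\$(\d+)")


def dollar_to_pyformat(sql: str, params: list[Any]) -> tuple[str, list[Any]]:
    """Rewrite $N placeholders as %s and reorder params to match.

    Literal '%' characters are doubled so the driver does not mistake them
    for placeholders. Limitation: a "$N" inside a string literal is rewritten too.
    """
    order: list[int] = []

    def repl(m: re.Match) -> str:
        order.append(int(m.group(1)) - 1)  # $1 refers to params[0]
        return "%s"

    converted = PLACEHOLDER.sub(repl, sql.replace("%", "%%"))
    return converted, [params[i] for i in order]


sql, args = dollar_to_pyformat(
    "SELECT * FROM t WHERE b = $2 AND a = $1 AND name LIKE 'x%'", [10, 20]
)
print(sql)   # SELECT * FROM t WHERE b = %s AND a = %s AND name LIKE 'x%%'
print(args)  # [20, 10]
```

Because the regex consumes all digits greedily, $10 maps to the tenth parameter rather than being read as $1 followed by a literal 0.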
Signature
```python
# Imports needed by the annotations below
import pathlib
import random
import threading
from typing import Any

from materialize.mzcompose.composition import Composition

# executor.py - Main test entry point
def test(
    c: Composition,
    workload: dict[str, Any],
    file: pathlib.Path,
    factor_initial_data: float,
    factor_ingestions: float,
    factor_queries: float,
    runtime: int,
    verbose: bool,
    create_objects: bool,
    initial_data: bool,
    early_initial_data: bool,
    run_ingestions: bool,
    run_queries: bool,
    max_concurrent_queries: int,
) -> dict[str, Any]: ...

# column.py - Data generation
class Column:
    def __init__(self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None): ...
    def avro_type(self) -> str | list[str]: ...
    def kafka_value(self, rng: random.Random) -> Any: ...
    def value(self, rng: random.Random, in_query: bool = True) -> Any: ...

# replay.py - Query execution
def continuous_queries(c: Composition, ...) -> None: ...
def run_query(c: Composition, query: dict[str, Any], stats: dict[str, Any], verbose: bool, stop_event: threading.Event) -> None: ...
```
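The Column signatures suggest that each column can describe its own Avro type and produce random values both for Kafka payloads (kafka_value) and for SQL statements (value). The sketch below is a simplified, hypothetical stand-in, SketchColumn, covering only five types. The type mapping, the null ratio, and the reading of in_query=True as "render as a SQL literal" are assumptions and are not taken from column.py. The list return type of avro_type is interpreted here as an Avro union with "null" for nullable columns.

```python
import random
from typing import Any, Union

# Hypothetical subset of SQL -> Avro primitive type mappings.
AVRO_TYPES = {
    "integer": "int",
    "bigint": "long",
    "boolean": "boolean",
    "double precision": "double",
    "text": "string",
}


class SketchColumn:
    """Simplified, hypothetical stand-in for workload_replay's Column."""

    def __init__(self, name: str, typ: str, nullable: bool, null_ratio: float = 0.1):
        self.name = name
        self.typ = typ
        self.nullable = nullable
        self.null_ratio = null_ratio  # assumed share of NULLs for nullable columns

    def avro_type(self) -> Union[str, list[str]]:
        base = AVRO_TYPES[self.typ]
        # In Avro, an optional field is a union that includes "null".
        return ["null", base] if self.nullable else base

    def kafka_value(self, rng: random.Random) -> Any:
        """Raw Python value, suitable for an Avro-encoded Kafka record."""
        if self.nullable and rng.random() < self.null_ratio:
            return None
        if self.typ in ("integer", "bigint"):
            return rng.randint(0, 1_000_000)
        if self.typ == "boolean":
            return rng.random() < 0.5
        if self.typ == "double precision":
            return rng.uniform(0.0, 1000.0)
        return f"{self.name}_{rng.randint(0, 9999)}"  # text

    def value(self, rng: random.Random, in_query: bool = True) -> Any:
        """Like kafka_value, but rendered as a SQL literal when in_query is True."""
        v = self.kafka_value(rng)
        if not in_query:
            return v
        if v is None:
            return "NULL"
        if isinstance(v, bool):  # check bool before str/number rendering
            return "TRUE" if v else "FALSE"
        if isinstance(v, str):
            return "'" + v.replace("'", "''") + "'"  # escape embedded quotes
        return repr(v)
```

Keeping the type dispatch inside the column object means the same instance can drive both Avro schema generation and INSERT statement generation, so the two paths cannot drift apart.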
Import
```python
from materialize.workload_replay.executor import test
from materialize.workload_replay.column import Column
from materialize.workload_replay.config import LOCATION, SEED_RANGE, cluster_replica_sizes
from materialize.workload_replay.data import create_initial_data_requiring_mz
from materialize.workload_replay.replay import continuous_queries
from materialize.workload_replay.stats import compare_table, docker_stats, plot_docker_stats_compare
from materialize.workload_replay.util import print_workload_stats, resolve_tag
```
I/O Contract
| Input | Type | Description |
|---|---|---|
| workload | dict[str, Any] | Parsed YAML workload definition containing databases, schemas, tables, sources, and queries |
| factor_initial_data | float | Scaling factor for initial data volume (1.0 = original size) |
| factor_ingestions | float | Scaling factor for the continuous ingestion rate |
| factor_queries | float | Scaling factor for query frequency |
| runtime | int | Duration of the replay phase, in seconds |

| Output | Type | Description |
|---|---|---|
| test result | dict[str, Any] | Statistics dict with query counts, latencies, errors, and Docker resource usage |
| plots | matplotlib files | Docker stats comparison plots saved to disk |
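stats.py is described as handling version comparison, and test() returns a statistics dict. As an illustration of comparing two runs, for example the same workload replayed against two Materialize versions, the sketch below computes percent changes for a few metrics. The stats keys (queries, errors, latencies) and the helper names are hypothetical. The real inputs and output of stats.compare_table are not shown on this page.

```python
import math
from typing import Any, Callable


def percentile(values: list[float], p: float) -> float:
    """Nearest-rank percentile, for p in (0, 100]."""
    ordered = sorted(values)
    rank = math.ceil(p / 100 * len(ordered))
    return ordered[max(rank, 1) - 1]


def pct_change(old: float, new: float) -> float:
    """Relative change in percent; infinite if the metric appears from zero."""
    if old == 0:
        return 0.0 if new == 0 else math.inf
    return (new - old) / old * 100


# Hypothetical stats layout: total queries, failed queries, per-query latencies in seconds.
METRICS: dict[str, Callable[[dict[str, Any]], float]] = {
    "queries": lambda s: float(s["queries"]),
    "errors": lambda s: float(s["errors"]),
    "p50 latency (s)": lambda s: percentile(s["latencies"], 50),
    "p99 latency (s)": lambda s: percentile(s["latencies"], 99),
}


def compare_runs(old: dict[str, Any], new: dict[str, Any]) -> list[tuple[str, float, float, float]]:
    """Return (metric, old value, new value, % change) rows for two replay runs."""
    rows = []
    for name, extract in METRICS.items():
        a, b = extract(old), extract(new)
        rows.append((name, a, b, pct_change(a, b)))
    return rows


old = {"queries": 1000, "errors": 2, "latencies": [0.01] * 90 + [0.5] * 10}
new = {"queries": 1100, "errors": 0, "latencies": [0.02] * 95 + [0.4] * 5}
for name, a, b, change in compare_runs(old, new):
    print(f"{name:>16}: {a:10.3f} -> {b:10.3f} ({change:+.1f}%)")
```

Reporting tail percentiles alongside the median matters for regression testing: in this made-up data the median latency doubles while p99 improves, which a single average would hide.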
Usage Examples
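```python
# Running a workload replay test via executor. Assumptions: this runs inside a hypothetical
# mzcompose workflow that receives the Composition as `c`, and PyYAML loads the workload file.
import pathlib

import yaml
from materialize.mzcompose.composition import Composition
from materialize.workload_replay.executor import test


def workflow_replay_example(c: Composition) -> None:
    file = pathlib.Path("workload.yml")
    with file.open() as f:
        workload = yaml.safe_load(f)

    result = test(  # returns the statistics dict described in the I/O contract
        c=c,
        workload=workload,
        file=file,
        factor_initial_data=0.1,  # generate 10% of the captured initial data
        factor_ingestions=0.5,  # ingest at half the captured rate
        factor_queries=1.0,  # replay queries at the captured frequency
        runtime=300,  # replay phase lasts 300 seconds
        verbose=True,
        create_objects=True,
        initial_data=True,
        early_initial_data=False,
        run_ingestions=True,
        run_queries=True,
        max_concurrent_queries=10,
    )
```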
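The example caps replay at max_concurrent_queries, and run_query receives a threading.Event as its stop signal. One way such a replay loop could be structured is sketched below with a hypothetical replay_for helper. It is not continuous_queries itself. A threading.Timer sets the stop event after runtime seconds, a bounded semaphore limits how many queries are in flight, and each query's outcome is counted under a lock. The execute callback stands in for whatever actually sends a query to Materialize.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

Query = dict[str, Any]


def replay_for(
    queries: list[Query],
    execute: Callable[[Query, threading.Event], None],
    runtime: float,
    max_concurrent_queries: int,
) -> dict[str, int]:
    """Replay queries round-robin for `runtime` seconds with bounded concurrency.

    Hypothetical helper: `execute` runs one query and raises on failure; it also
    receives the stop event so long-running work can bail out once time is up.
    """
    stop_event = threading.Event()
    slots = threading.BoundedSemaphore(max_concurrent_queries)
    lock = threading.Lock()
    stats = {"ok": 0, "failed": 0}

    def run_one(query: Query) -> None:
        try:
            execute(query, stop_event)
            key = "ok"
        except Exception:
            key = "failed"
        with lock:
            stats[key] += 1
        slots.release()  # let the scheduler submit the next query

    timer = threading.Timer(runtime, stop_event.set)  # signals the end of the replay phase
    timer.start()
    with ThreadPoolExecutor(max_workers=max_concurrent_queries) as pool:
        i = 0
        while not stop_event.is_set():
            # Wait for a free slot, waking up regularly to notice the stop event.
            if not slots.acquire(timeout=0.05):
                continue
            pool.submit(run_one, queries[i % len(queries)])
            i += 1
    # Leaving the with-block waits for in-flight queries, so stats are final here.
    timer.cancel()  # harmless if the timer has already fired
    return stats
```

Gating submission on a semaphore, rather than relying on the pool's internal queue alone, keeps the scheduler from piling up thousands of pending queries when the server slows down. Queued work would otherwise keep running long after the stop event fires.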