

Implementation:MaterializeInc Materialize Workload Replay Framework



Domains: Testing, Benchmarking, Data Ingestion
Last Updated: 2026-02-08 00:00 GMT

Overview

The Workload Replay Framework captures, anonymizes, and replays production-like workloads against Materialize for regression testing and performance benchmarking.

Description

This module provides an end-to-end system for workload replay testing in Materialize. It captures real workload patterns (queries, data ingestion, object creation) from production environments, generates synthetic data matching the original schema and statistical distribution using long-tail random generators, and replays the captured queries concurrently against a test Materialize instance. The framework supports multiple data source types including Kafka (with Avro serialization), PostgreSQL CDC, MySQL CDC, SQL Server CDC, and webhooks, with configurable scaling factors for data volume, ingestion rate, and query concurrency.
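The description mentions long-tail random generators but does not show them. One way to make a long-tail pick is to draw a Pareto-distributed rank, so a handful of values dominate while rare values still appear. This is an illustration only: `long_tail_choice` and its `alpha` parameter are hypothetical and are not taken from util.py.

```python
import random
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


def long_tail_choice(rng: random.Random, values: Sequence[T], alpha: float = 1.5) -> T:
    """Pick from `values`, strongly favoring the first entries (hypothetical helper)."""
    # paretovariate(alpha) returns x >= 1.0 with a heavy right tail;
    # flooring x - 1 gives a 0-based rank where rank 0 is the most likely.
    rank = int(rng.paretovariate(alpha) - 1.0)
    # Fold rare, very large ranks back into range instead of raising IndexError.
    return values[rank % len(values)]
```

With `alpha=1.5`, rank 0 comes up roughly 65% of the time. Seeding the `random.Random` instance makes the generated data reproducible across runs. config.py exposes a `SEED_RANGE` constant, but this page does not describe how it is used.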

Usage

Use this framework when performing regression testing, performance benchmarking, or validating Materialize behavior under production-like workload conditions. It is invoked through the mzcompose test infrastructure and configured via captured workload YAML files.

Code Reference

Source Location

Key Files

  • config.py - Configuration constants: LOCATION, WORKLOAD_REPLAY_VERSION, SEED_RANGE, and cluster_replica_sizes mapping
  • column.py - Column class for type-aware synthetic data generation across 20+ SQL types
  • data.py - Initial data creation functions: create_initial_data_requiring_mz(), create_ingestions(), create_initial_data_external()
  • executor.py - Test orchestration: test() and benchmark() entry points coordinating objects, data, ingestion, and queries
  • ingest.py - Data ingestion via Kafka (Avro), PostgreSQL, MySQL, SQL Server, and webhook sources
  • objects.py - DDL execution: run_create_objects_part_1() (clusters, databases, schemas) and run_create_objects_part_2() (sources, views, sinks)
  • stats.py - Statistics collection, Docker resource monitoring, matplotlib plotting, and version comparison
  • util.py - Utility functions: print_workload_stats(), resolve_tag(), long-tail distribution helpers
  • replay.py - Query replay: continuous_queries(), run_query(), parameter conversion from $N to %s format
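According to the file list, replay.py converts `$N` parameters to `%s` format. Presumably this is because captured queries use PostgreSQL-style numbered placeholders, while Python drivers such as psycopg expect positional `%s`. The sketch below shows one way to do the conversion. It uses a hypothetical `convert_params` helper that rewrites each placeholder in order of appearance and reorders the parameter list to match. It also doubles literal `%` signs so the driver does not read them as placeholders. It does not skip `$N` sequences inside string literals or dollar-quoted bodies.

```python
import re
from typing import Any

# Matches $1, $2, ..., $10 (greedy, so $10 is not read as $1 followed by "0")
_PLACEHOLDER = re.compile(r"\$(\d+)")


def convert_params(sql: str, params: list[Any]) -> tuple[str, list[Any]]:
    """Rewrite $N placeholders as %s and reorder params to match (hypothetical helper)."""
    ordered: list[Any] = []

    def repl(m: re.Match[str]) -> str:
        # $N is 1-based; append the referenced parameter in textual order
        ordered.append(params[int(m.group(1)) - 1])
        return "%s"

    # Escape literal percent signs first so they survive %-style substitution
    escaped = sql.replace("%", "%%")
    return _PLACEHOLDER.sub(repl, escaped), ordered
```

A repeated placeholder such as `$1, $1` yields the same parameter twice. That is necessary because `%s` placeholders are purely positional.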

Signature

```python
import pathlib
import random
import threading
from typing import Any

from materialize.mzcompose.composition import Composition

# executor.py - Main test entry point
def test(
    c: Composition,
    workload: dict[str, Any],
    file: pathlib.Path,
    factor_initial_data: float,
    factor_ingestions: float,
    factor_queries: float,
    runtime: int,
    verbose: bool,
    create_objects: bool,
    initial_data: bool,
    early_initial_data: bool,
    run_ingestions: bool,
    run_queries: bool,
    max_concurrent_queries: int,
) -> dict[str, Any]: ...

# column.py - Data generation
class Column:
    def __init__(self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None): ...
    def avro_type(self) -> str | list[str]: ...
    def kafka_value(self, rng: random.Random) -> Any: ...
    def value(self, rng: random.Random, in_query: bool = True) -> Any: ...

# replay.py - Query execution (remaining continuous_queries parameters are elided in the source)
def continuous_queries(c: Composition, ...) -> None: ...
def run_query(c: Composition, query: dict[str, Any], stats: dict[str, Any], verbose: bool, stop_event: threading.Event) -> None: ...
```
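The `Column` signature pairs an Avro schema type with random value generation. The sketch below is a heavily simplified, hypothetical `MiniColumn` that shows the idea for four SQL types only, and it makes several assumptions. It reads `in_query=True` as "render as a SQL literal for a query string" and `in_query=False` as "return a plain Python value". It assumes nullable columns map to an Avro union with `"null"` and uses a 10% NULL rate. It ignores `default`, `data_shape`, and `kafka_value`.

```python
from __future__ import annotations

import random
from typing import Any

# Hypothetical subset of SQL type -> Avro primitive type mappings
AVRO_TYPES = {"integer": "int", "bigint": "long", "boolean": "boolean", "text": "string"}


class MiniColumn:
    """Hypothetical, heavily simplified stand-in for column.py's Column."""

    def __init__(self, name: str, typ: str, nullable: bool) -> None:
        self.name = name
        self.typ = typ
        self.nullable = nullable

    def avro_type(self) -> str | list[str]:
        base = AVRO_TYPES[self.typ]
        # Avro models a nullable field as a union that includes "null"
        return ["null", base] if self.nullable else base

    def value(self, rng: random.Random, in_query: bool = True) -> Any:
        """Return a random value: a SQL literal string if in_query, else a Python value."""
        if self.nullable and rng.random() < 0.1:  # assumed 10% NULL rate
            return "NULL" if in_query else None
        if self.typ in ("integer", "bigint"):
            n = rng.randint(0, 1000)
            return str(n) if in_query else n
        if self.typ == "boolean":
            b = rng.random() < 0.5
            return str(b).lower() if in_query else b
        text = "".join(rng.choice("abcdefgh") for _ in range(8))
        return f"'{text}'" if in_query else text
```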

Import

```python
from materialize.workload_replay.executor import test
from materialize.workload_replay.column import Column
from materialize.workload_replay.config import LOCATION, SEED_RANGE, cluster_replica_sizes
from materialize.workload_replay.data import create_initial_data_requiring_mz
from materialize.workload_replay.replay import continuous_queries
from materialize.workload_replay.stats import compare_table, docker_stats, plot_docker_stats_compare
from materialize.workload_replay.util import print_workload_stats, resolve_tag
```

I/O Contract

| Input | Type | Description |
|---|---|---|
| workload | dict[str, Any] | Parsed YAML workload definition containing databases, schemas, tables, sources, and queries |
| factor_initial_data | float | Scaling factor for initial data volume (1.0 = original size) |
| factor_ingestions | float | Scaling factor for the continuous ingestion rate |
| factor_queries | float | Scaling factor for query frequency |
| runtime | int | Duration of the replay phase, in seconds |

| Output | Type | Description |
|---|---|---|
| test result | dict[str, Any] | Statistics dict with query counts, latencies, errors, and Docker resource usage |
| plots | matplotlib image files | Docker stats comparison plots saved to disk |
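This page lists what the result dict contains (query counts, latencies, errors) but not its exact keys. The sketch below assumes per-query latencies are collected as a list of seconds. Under that assumption, it reduces a run to a few numbers that can be compared across runs, loosely in the spirit of stats.py's version comparison. `summarize_latencies` and its output keys are hypothetical.

```python
import statistics


def summarize_latencies(latencies_s: list[float], errors: int) -> dict[str, float]:
    """Reduce raw per-query latencies (seconds) to a few comparable numbers."""
    if len(latencies_s) < 2:
        raise ValueError("need at least two latency samples for percentiles")
    # n=100 yields 99 cut points: index 49 is the median, index 98 the 99th percentile
    cuts = statistics.quantiles(latencies_s, n=100, method="inclusive")
    total = len(latencies_s) + errors
    return {
        "queries": total,
        "errors": errors,
        "error_rate": errors / total,
        "p50_s": cuts[49],
        "p99_s": cuts[98],
        "max_s": max(latencies_s),
    }
```

Tail percentiles such as p99 usually reveal regressions between versions that the mean would hide.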

Usage Examples

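```python
# Running a workload replay test via executor (inside an mzcompose workflow)
import pathlib

import yaml

from materialize.mzcompose.composition import Composition
from materialize.workload_replay.executor import test


def workflow_default(c: Composition) -> None:
    # Load the captured workload definition from YAML
    file = pathlib.Path("workload.yml")
    workload = yaml.safe_load(file.read_text())

    result = test(
        c=c,
        workload=workload,
        file=file,
        factor_initial_data=0.1,  # 10% of the original initial data volume
        factor_ingestions=0.5,  # half the original ingestion rate
        factor_queries=1.0,  # original query frequency
        runtime=300,  # replay phase lasts 300 seconds
        verbose=True,
        create_objects=True,
        initial_data=True,
        early_initial_data=False,
        run_ingestions=True,
        run_queries=True,
        max_concurrent_queries=10,
    )
    # result: query counts, latencies, errors, Docker resource usage
```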
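The `runtime` and `max_concurrent_queries` arguments, together with the `stop_event` parameter of `run_query`, suggest a time-boxed replay loop with bounded concurrency. The sketch below is not the code in replay.py. It uses hypothetical names (`replay_for`, `run_one`) to show one standard pattern:

- A `threading.Semaphore` caps in-flight queries.
- A `threading.Event` is set by a `threading.Timer` once `runtime` elapses.
- A lock guards the shared stats dict.

```python
import threading
import time
from typing import Any, Callable


def replay_for(
    run_one: Callable[[], None], runtime: float, max_concurrent_queries: int
) -> dict[str, Any]:
    """Fire queries until `runtime` seconds pass, never exceeding the concurrency cap."""
    stats: dict[str, Any] = {"latencies": [], "errors": 0}
    lock = threading.Lock()
    slots = threading.Semaphore(max_concurrent_queries)
    stop_event = threading.Event()
    threading.Timer(runtime, stop_event.set).start()  # end the replay phase after runtime
    threads: list[threading.Thread] = []

    def worker() -> None:
        start = time.monotonic()
        try:
            run_one()
            with lock:
                stats["latencies"].append(time.monotonic() - start)
        except Exception:
            with lock:
                stats["errors"] += 1
        finally:
            slots.release()  # free the slot for the next query

    while not stop_event.is_set():
        # Wait for a free slot, waking periodically to re-check stop_event
        if not slots.acquire(timeout=0.1):
            continue
        if stop_event.is_set():
            slots.release()
            break
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()  # let in-flight queries finish before reporting
    return stats
```

In a real replay, `run_one` would execute one captured query on its own database connection, because connections are generally not safe to share across threads.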
