Implementation:MaterializeInc Materialize Check Support Classes
| Knowledge Sources | |
|---|---|
| Domains | Testing, Upgrade_Validation, Platform_Checks |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Support classes and utilities that provide infrastructure for the platform checks framework, including backup/restore actions, Kubernetes cloud test actions, shared constants, feature flags, and backup/restore scenario definitions.
Description
The platform checks framework relies on several support modules beyond the individual Check subclasses:
- backup_actions.py: Defines Backup and Restore Action subclasses for backup/restore test orchestration
- cloudtest_actions.py: Defines Action subclasses for Kubernetes-based cloud testing (ReplaceEnvironmentdStatefulSet, SetupSshTunnels)
- common.py: Shared Kafka Avro schema constants used across multiple check files
- features.py: Features class for managing platform-specific feature flags (e.g., Azurite availability)
- scenarios_backup_restore.py: Concrete Scenario subclasses that orchestrate backup and restore test sequences
Code Reference
Source Locations
- Repository: MaterializeInc_Materialize
- Files:
  - misc/python/materialize/checks/backup_actions.py
  - misc/python/materialize/checks/cloudtest_actions.py
  - misc/python/materialize/checks/common.py
  - misc/python/materialize/checks/features.py
  - misc/python/materialize/checks/scenarios_backup_restore.py
Backup Actions (backup_actions.py)
Provides two Action subclasses that delegate to the mzcompose composition for backup and restore operations.
Backup
class Backup(Action):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        c.backup()

    def join(self, e: Executor) -> None:
        # Action is blocking
        pass
Triggers a backup of the Materialize state via the mzcompose composition. The action is synchronous (blocking), so join() is a no-op.
Restore
class Restore(Action):
    def __init__(self, restart_mz=True):
        self.restart_mz = restart_mz

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        c.restore(restart_mz=self.restart_mz)

    def join(self, e: Executor) -> None:
        # Action is blocking
        pass
Restores Materialize state from a previous backup. The restart_mz parameter controls whether Materialize is automatically restarted after restore (defaults to True). When set to False, the caller is responsible for starting Materialize separately (used in scenarios like BackupAndRestoreToPreviousState).
I/O Contract
| Class | Input | Output | Side Effect |
|---|---|---|---|
| Backup | Executor (provides mzcompose composition) | None | Creates a backup of Materialize state |
| Restore | Executor, restart_mz: bool | None | Restores Materialize state; optionally restarts Mz |
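The blocking contract above can be exercised without a real composition. The following is a minimal sketch, assuming hypothetical stand-ins (`FakeComposition`, `FakeExecutor`) that only record calls; they are not part of the Materialize code base, and the `Backup` and `Restore` classes here are simplified copies that do not inherit from `Action`.

```python
from dataclasses import dataclass, field


@dataclass
class FakeComposition:
    """Hypothetical stand-in for the mzcompose composition; it only records calls."""

    calls: list[str] = field(default_factory=list)

    def backup(self) -> None:
        self.calls.append("backup")

    def restore(self, restart_mz: bool = True) -> None:
        self.calls.append(f"restore(restart_mz={restart_mz})")


class FakeExecutor:
    """Hypothetical stand-in for Executor."""

    def __init__(self) -> None:
        self.composition = FakeComposition()

    def mzcompose_composition(self) -> FakeComposition:
        return self.composition


class Backup:
    def execute(self, e: FakeExecutor) -> None:
        e.mzcompose_composition().backup()

    def join(self, e: FakeExecutor) -> None:
        pass  # nothing to wait for: execute() returned only after the backup


class Restore:
    def __init__(self, restart_mz: bool = True) -> None:
        self.restart_mz = restart_mz

    def execute(self, e: FakeExecutor) -> None:
        e.mzcompose_composition().restore(restart_mz=self.restart_mz)

    def join(self, e: FakeExecutor) -> None:
        pass  # blocking as well


executor = FakeExecutor()
for action in (Backup(), Restore(restart_mz=False)):
    action.execute(executor)
    action.join(executor)  # no-op for blocking actions

print(executor.composition.calls)
```

Because both actions complete inside execute(), a driver can call execute() and join() back to back without tracking any handle.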
Cloud Test Actions (cloudtest_actions.py)
Provides Action subclasses for Kubernetes-based cloud testing environments.
ReplaceEnvironmentdStatefulSet
class ReplaceEnvironmentdStatefulSet(Action):
    """Change the image tag of the environmentd stateful set,
    re-create the definition and replace the existing one."""

    new_tag: str | None

    def __init__(self, new_tag: str | None = None) -> None:
        self.new_tag = new_tag

    def execute(self, e: Executor) -> None:
        new_version = (
            MzVersion.parse_mz(self.new_tag)
            if self.new_tag
            else MzVersion.parse_cargo()
        )
        mz = e.cloudtest_application()
        stateful_set = [
            resource for resource in mz.resources
            if type(resource) == EnvironmentdStatefulSet
        ]
        assert len(stateful_set) == 1
        stateful_set = stateful_set[0]
        stateful_set.tag = self.new_tag
        stateful_set.replace()
        e.current_mz_version = new_version
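Replaces the Kubernetes EnvironmentdStatefulSet with a new image tag to simulate version upgrades in cloud test environments. If new_tag is None (or empty), the version recorded in e.current_mz_version falls back to MzVersion.parse_cargo(), which reads the version from Cargo.toml; the stateful set's tag is assigned new_tag as given. e.current_mz_version is updated only after the replacement.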
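Two details of execute() are easy to miss: the filter compares exact types, so subclasses are skipped, and the version fallback is a truthiness test rather than a None check. The sketch below illustrates both with hypothetical stand-ins; `Resource`, `PatchedStatefulSet`, `select_single`, `resolve_version`, and the version strings are all assumptions made for illustration, and plain strings replace MzVersion.

```python
from typing import Optional


class Resource:
    """Hypothetical base class for cloudtest resources."""


class EnvironmentdStatefulSet(Resource):
    def __init__(self, tag: Optional[str] = None) -> None:
        self.tag = tag


class PatchedStatefulSet(EnvironmentdStatefulSet):
    """Hypothetical subclass, used to show what the exact-type filter skips."""


def select_single(resources: list, cls: type) -> Resource:
    # type(r) == cls matches only exact instances; isinstance() would also
    # match PatchedStatefulSet and make the assertion below fail.
    matches = [r for r in resources if type(r) == cls]
    assert len(matches) == 1, f"expected one {cls.__name__}, found {len(matches)}"
    return matches[0]


def resolve_version(new_tag: Optional[str], cargo_version: str) -> str:
    # Mirrors the conditional expression: None and "" both fall back.
    return new_tag if new_tag else cargo_version


resources = [
    Resource(),
    EnvironmentdStatefulSet("v0.1.0"),
    PatchedStatefulSet("v0.1.0"),
]
target = select_single(resources, EnvironmentdStatefulSet)
target.tag = "v0.2.0"

print(type(target).__name__, target.tag)
print(resolve_version(None, "v0.3.0"), resolve_version("v0.2.0", "v0.3.0"))
```

The single-match assertion makes the action fail early if the application ever defines zero or several environmentd stateful sets.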
SetupSshTunnels
class SetupSshTunnels(Action):
    """Prepare the SSH tunnels."""

    def __init__(self, mz: MaterializeApplication) -> None:
        self.handle: Any | None = None
        self.mz = mz

    def execute(self, e: Executor) -> None:
        connection_count = 4
        self.handle = e.testdrive(...)
        for i in range(connection_count):
            public_key = self.mz.environmentd.sql_query(...)
            self.mz.kubectl("exec", "svc/ssh-bastion-host", ...)

    def join(self, e: Executor) -> None:
        e.join(self.handle)
Creates 4 SSH tunnel connections in Materialize and registers their public keys with the SSH bastion host in Kubernetes (via kubectl exec against svc/ssh-bastion-host). Unlike Backup and Restore, this action is partially asynchronous: execute() stores the testdrive handle, and join() waits on it through e.join().
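The store-a-handle-then-join pattern can be sketched with the standard library. This is an illustration only: `FakeExecutor` and `SetupTunnelsSketch` are hypothetical, a `concurrent.futures.Future` stands in for whatever handle the real e.testdrive() returns, and the key names are placeholders.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional


class FakeExecutor:
    """Hypothetical executor whose testdrive() returns a handle immediately."""

    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=1)

    def testdrive(self, script: str) -> Future:
        # Pretend to run the script in the background.
        return self._pool.submit(lambda: f"ran {script!r}")

    def join(self, handle: Future) -> None:
        handle.result()  # blocks until the background work has finished

    def shutdown(self) -> None:
        self._pool.shutdown()


class SetupTunnelsSketch:
    def __init__(self) -> None:
        self.handle: Optional[Future] = None
        self.registered_keys: list = []

    def execute(self, e: FakeExecutor) -> None:
        connection_count = 4
        # Asynchronous part: keep the handle instead of waiting for it.
        self.handle = e.testdrive("create the connections")
        # Synchronous part: runs to completion inside execute().
        for i in range(connection_count):
            self.registered_keys.append(f"key-{i}")

    def join(self, e: FakeExecutor) -> None:
        assert self.handle is not None, "execute() must run before join()"
        e.join(self.handle)


executor = FakeExecutor()
action = SetupTunnelsSketch()
action.execute(executor)
action.join(executor)
executor.shutdown()

print(action.registered_keys)
```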
I/O Contract
| Class | Input | Output | Side Effect |
|---|---|---|---|
| ReplaceEnvironmentdStatefulSet | new_tag: str or None | None | Replaces K8s StatefulSet image, updates executor version |
| SetupSshTunnels | MaterializeApplication | None | Creates SSH tunnel connections and registers public keys |
Common Constants (common.py)
Provides shared Kafka Avro schema definitions used by multiple check files, particularly sink and source checks.
KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD = """
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key1", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"}
    ]
  }
"""
This constant defines a testdrive-compatible Avro schema pair (key + value) used for Kafka source and sink checks. Checks import it as:
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
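As a rough illustration of how the constant might be combined with a check's own testdrive fragment, the sketch below prepends it to a `$ kafka-ingest` command and parses the two schemas back out to confirm they are valid JSON. The constant is copied locally so the example is self-contained; the topic name `example-topic`, the ingested record, and the `extract_schema` helper are assumptions, not part of the source.

```python
import json
from textwrap import dedent

# Local copy of the constant so the sketch runs on its own.
KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD = """
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key1", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"}
    ]
  }
"""


def extract_schema(script: str, variable: str) -> dict:
    """Hypothetical helper: parse the JSON assigned by `$ set <variable>=`."""
    marker = f"$ set {variable}="
    start = script.index(marker) + len(marker)
    schema, _end = json.JSONDecoder().raw_decode(script, start)
    return schema


# The check-specific fragment refers to the variables defined by the constant.
script = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD + dedent(
    """
    $ kafka-ingest format=avro key-format=avro topic=example-topic key-schema=${keyschema} schema=${schema}
    {"key1": "A"} {"f1": "A"}
    """
)

key_schema = extract_schema(script, "keyschema")
value_schema = extract_schema(script, "schema")
print(key_schema["fields"], value_schema["fields"])
```

Sharing the schema pair through one constant keeps the key and value field names (key1 and f1) consistent across checks that ingest into or read from Kafka.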
Features (features.py)
Manages platform-specific feature flags that control which checks can run in a given test environment.
class Features:
    AZURITE = "azurite"

    def __init__(self, features):
        self.features = features

    def azurite_enabled(self) -> bool:
        return self.features and self.AZURITE in self.features
Currently supports a single feature flag:
- AZURITE: Controls whether Azure Blob Storage emulation (Azurite) is available. Some checks (e.g., copy_to_s3.py) may require this for S3-compatible storage tests.
The Features class is instantiated with a collection of feature strings and provides boolean accessor methods. This design accommodates platform-specific constraints such as architecture differences (e.g., SQL Server Docker image only available on x86_64).
I/O Contract
| Method | Input | Output | Description |
|---|---|---|---|
| __init__() | features: collection of strings | Features instance | Stores the feature set |
| azurite_enabled() | None | bool (truthy/falsy) | Returns True if "azurite" is in the feature set; if the feature set is empty or None, that falsy value itself is returned |
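Because azurite_enabled() uses a short-circuiting `and`, its result is only guaranteed to be a real bool when the feature collection is non-empty. The sketch below reproduces the class as listed and adds a hypothetical `StrictFeatures` variant (an assumption, not in the source) that always returns a bool.

```python
class Features:
    AZURITE = "azurite"

    def __init__(self, features):
        self.features = features

    def azurite_enabled(self) -> bool:
        # `and` returns its left operand when that operand is falsy.
        return self.features and self.AZURITE in self.features


class StrictFeatures(Features):
    """Hypothetical variant that coerces the result to a bool."""

    def azurite_enabled(self) -> bool:
        return bool(self.features) and self.AZURITE in self.features


print(Features(["azurite"]).azurite_enabled())   # True
print(Features(["other"]).azurite_enabled())     # False
print(Features([]).azurite_enabled())            # []
print(Features(None).azurite_enabled())          # None
print(StrictFeatures(None).azurite_enabled())    # False
```

Callers that only test the result in an `if` statement behave the same with either variant; the difference matters only when the value is compared with `is False` or serialized.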
Backup/Restore Scenarios (scenarios_backup_restore.py)
Defines four Scenario subclasses that orchestrate backup and restore operations interleaved with check lifecycle phases.
BackupAndRestoreAfterManipulate
class BackupAndRestoreAfterManipulate(Scenario):
    """Backup and Restore Materialize after manipulate(phase=2) has run.
    Only validate() is run post-restore."""

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Manipulate(self, phase=1),
            Manipulate(self, phase=2),
            Backup(),
            KillMz(),
            Restore(),
            Validate(self),
        ]
Runs the full initialize/manipulate lifecycle, then backs up, kills Materialize, restores, and validates. Tests that a complete state survives backup/restore.
BackupAndRestoreBeforeManipulate
class BackupAndRestoreBeforeManipulate(Scenario):
    """Backup and Restore Materialize before manipulate(phase=2) has run."""

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Manipulate(self, phase=1),
            Backup(),
            KillMz(),
            Restore(),
            Manipulate(self, phase=2),
            Validate(self),
        ]
Backs up after phase 1 manipulation only, restores, then continues with phase 2 manipulation and validation. Tests that partial state can be restored and continued.
BackupAndRestoreToPreviousState
class BackupAndRestoreToPreviousState(Scenario):
    """Backup, run more workloads, and then Restore to a previous state."""

    def requires_external_idempotence(self) -> bool:
        return True

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Manipulate(self, phase=1),
            Backup(),
            Manipulate(self, phase=2),  # Those updates will be lost here ..
            KillMz(),
            Restore(restart_mz=False),
            StartMz(self),
            Manipulate(self, phase=2),  # ... and redone here
            Validate(self),
        ]
Tests restoring to a previous state: phase 2 manipulation runs after the backup, Materialize is restored to the pre-phase-2 backup, and phase 2 is then re-run. The scenario requires external idempotence because phase 2 manipulations run twice (once before the restore and once after). Checks marked @externally_idempotent(False) are excluded from this scenario.
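A toy model helps show why the double run of phase 2 matters. In the sketch below, action names are plain strings, `mz_state` represents what a backup captures, and `external` represents effects on systems outside Materialize that a restore is assumed not to roll back. The `simulate` function and this split of state are illustrative assumptions, not the framework's implementation.

```python
from typing import Optional


def simulate(actions: list) -> tuple:
    mz_state: list = []                # state inside Materialize, covered by backups
    external: list = []                # effects on external systems, never rolled back
    snapshot: Optional[list] = None

    for action in actions:
        if action == "Backup":
            snapshot = list(mz_state)
        elif action == "Restore":
            assert snapshot is not None, "Restore requires an earlier Backup"
            mz_state = list(snapshot)  # later updates are lost here
        elif action.startswith(("Initialize", "Manipulate")):
            mz_state.append(action)
            external.append(action)
        # StartMz, KillMz, and Validate do not change state in this toy model.
    return mz_state, external


ACTIONS = [
    "StartMz",
    "Initialize",
    "Manipulate(1)",
    "Backup",
    "Manipulate(2)",   # lost by the restore
    "KillMz",
    "Restore",
    "StartMz",
    "Manipulate(2)",   # redone after the restore
    "Validate",
]

mz_state, external = simulate(ACTIONS)
print(mz_state)
print(external.count("Manipulate(2)"))
```

In this model the restored state contains each phase once, while the external side sees phase 2 twice, which is the situation a check must tolerate to run under this scenario.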
BackupAndRestoreMulti
class BackupAndRestoreMulti(Scenario):
    """Repeated Backup and Restore operations."""

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Backup(), KillMz(), Restore(),
            Manipulate(self, phase=1),
            Backup(), KillMz(), Restore(),
            Manipulate(self, phase=2),
            Backup(), KillMz(), Restore(),
            Validate(self),
            Backup(), KillMz(), Restore(),
            Validate(self),
        ]
Performs multiple backup/restore cycles throughout the entire lifecycle, including after validation. Tests that repeated backup/restore operations do not corrupt state.
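The shape of this action list is a backup/kill/restore cycle inserted after every lifecycle step, followed by a final validation. The sketch below rebuilds that shape from strings to make the pattern explicit; the `interleave_cycles` helper is hypothetical, and the source spells the list out by hand rather than generating it.

```python
CYCLE = ["Backup", "KillMz", "Restore"]
LIFECYCLE = ["Initialize", "Manipulate(1)", "Manipulate(2)", "Validate"]


def interleave_cycles(lifecycle: list, cycle: list) -> list:
    """Hypothetical helper: run one full cycle after each lifecycle step."""
    actions = ["StartMz"]
    for step in lifecycle:
        actions.append(step)
        actions.extend(cycle)
    actions.append("Validate")  # validate once more after the last restore
    return actions


actions = interleave_cycles(LIFECYCLE, CYCLE)
print(len(actions), actions.count("Backup"), actions.count("Validate"))
```

Writing the list out by hand, as the source does, keeps each scenario readable at a glance; a helper like this would only pay off if many scenarios shared the same cycle.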