Implementation:MaterializeInc Materialize Validate Action
| Knowledge Sources | misc/python/materialize/checks/actions.py, misc/python/materialize/checks/checks.py, misc/python/materialize/checks/executors.py |
|---|---|
| Domains | Upgrade Testing, Data Integrity Verification, Python API |
| Last Updated | 2026-02-08 |
Overview
Concrete validation action and executor infrastructure for post-upgrade data integrity checks, provided by the Validate class in materialize.checks.actions and the Executor hierarchy in materialize.checks.executors.
Description
The validation implementation is a collaboration between three components:
1. The Validate action class (in actions.py, lines 119–131):
Validate is a concrete Action subclass that orchestrates the validation phase across all checks in a scenario. Its constructor receives the scenario and stores its check_objects. When executed:
execute(e)iterates over all check objects, callingcheck.start_validate(e, self)on each. This triggers each check to create its validation action (TestdriveorPyAction) and begin executing it.join(e)iterates over all check objects, callingcheck.join_validate(e)on each. This waits for each validation to complete and raises on failure.
2. The Check.validate() lifecycle method (in checks.py, lines 60–62):
Each Check subclass implements validate() to return a Testdrive or PyAction containing the actual assertions. The start_validate/join_validate wrapper methods on Check handle guard logic (checking _can_run() and enabled) and executor delegation.
3. The Executor hierarchy (in executors.py):
Executors bridge the gap between the abstract action model and concrete execution backends:
MzcomposeExecutor— Runs testdrive commands synchronously against a Docker Compose-managed Materialize instance. Callscomposition.testdrive()for testdrive actions and opens a direct SQL connection for PyActions.MzcomposeExecutorParallel— Runs testdrive commands in separate threads for parallel check execution. Usesthreading.Threadand propagates exceptions onjoin().CloudtestExecutor— Runs testdrive commands against a Kubernetes-managed Materialize application via the cloudtest framework.
All executors share the current_mz_version attribute, which is updated by StartMz and read by checks that need version-aware assertions.
Usage
Use the Validate action in scenario action lists to trigger the validation phase. Use MzcomposeExecutor for local Docker-based testing and CloudtestExecutor for cloud-based testing. The parallel executor is used for performance-sensitive CI runs.
Code Reference
Source Location
misc/python/materialize/checks/actions.py, lines 119–131 (Validateclass).misc/python/materialize/checks/checks.py, lines 60–62 (Check.validate()), lines 90–98 (start_validate/join_validate).misc/python/materialize/checks/executors.py, lines 22–53 (Executorbase), lines 56–84 (MzcomposeExecutor), lines 86–147 (MzcomposeExecutorParallel).
Signature
Validate action:
class Validate(Action):
def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None: ...
def execute(self, e: Executor) -> None: ...
def join(self, e: Executor) -> None: ...
Check.validate():
class Check:
def validate(self) -> Testdrive | PyAction: ...
def start_validate(self, e: Executor, a: "Action") -> None: ...
def join_validate(self, e: Executor) -> None: ...
MzcomposeExecutor:
class MzcomposeExecutor(Executor):
def __init__(self, composition: Composition) -> None: ...
def testdrive(
self,
input: str,
caller: Traceback | None = None,
mz_service: str | None = None,
) -> None: ...
def run_pyaction(
self,
method: Callable,
mz_service: str | None = None,
) -> None: ...
Import
from materialize.checks.actions import Validate
from materialize.checks.executors import MzcomposeExecutor, CloudtestExecutor
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
scenario (on Validate) |
Scenario |
The parent scenario, providing the list of instantiated check objects. |
mz_service (on Validate) |
None | Optional service name override for multi-service upgrade scenarios (e.g., "mz_1" or "mz_2").
|
e (on execute/join) |
Executor |
The execution backend that runs testdrive or Python actions. |
Outputs
| Method | Side Effect | Description |
|---|---|---|
Validate.execute() |
Starts all check validations | Iterates over checks and calls start_validate() on each.
|
Validate.join() |
Waits for completions, raises on failure | Iterates over checks and calls join_validate() on each. Raises if any assertion fails.
|
Check.validate() |
Returns action | Creates and returns a Testdrive or PyAction containing the check's assertions.
|
Usage Examples
Adding validation to a scenario action list:
from materialize.checks.actions import Initialize, Manipulate, Validate
from materialize.checks.mzcompose_actions import StartMz, KillMz
class MyScenario(Scenario):
def actions(self) -> list[Action]:
return [
StartMz(self, tag=self.base_version()),
Initialize(self),
Manipulate(self, phase=1),
KillMz(capture_logs=True),
StartMz(self, tag=None),
Manipulate(self, phase=2),
Validate(self),
# Second validation after restart to catch idempotency issues
KillMz(),
StartMz(self, tag=None),
Validate(self),
]
Writing a validate() method in a Check subclass:
from materialize.checks.checks import Check
from materialize.checks.actions import Testdrive
class CheckTablePersistence(Check):
def initialize(self) -> Testdrive:
return Testdrive(
"""
> CREATE TABLE persist_test (id INT, val TEXT);
> INSERT INTO persist_test VALUES (1, 'before_upgrade');
"""
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(
"> INSERT INTO persist_test VALUES (2, 'during_upgrade_phase1');"
),
Testdrive(
"> INSERT INTO persist_test VALUES (3, 'during_upgrade_phase2');"
),
]
def validate(self) -> Testdrive:
return Testdrive(
"""
> SELECT count(*) FROM persist_test;
3
> SELECT val FROM persist_test ORDER BY id;
before_upgrade
during_upgrade_phase1
during_upgrade_phase2
"""
)
Using MzcomposeExecutor for validation:
from materialize.checks.executors import MzcomposeExecutor
executor = MzcomposeExecutor(composition)
validate_action = Validate(scenario)
validate_action.execute(executor)
validate_action.join(executor) # Raises if any check fails