Implementation:MaterializeInc Materialize Composition Up Override
| Knowledge Sources | misc/python/materialize/mzcompose/composition.py |
|---|---|
| Domains | Integration Testing, Service Orchestration, Docker Compose |
| Last Updated | 2026-02-08 |
Overview
Concrete API for starting services and dynamically overriding service definitions within integration test workflows, provided by the Composition class in materialize.mzcompose.composition.
Description
The Composition.up() method builds, (re)creates, and starts named services by delegating to docker compose up. It supports detached mode, health check waiting, idle service startup, and configurable retry behavior. Services can be specified either as string names (referring to already-defined services) or as Service objects.
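As a rough illustration of the delegation described above, the sketch below maps the `detach` and `wait` options onto `docker compose up` flags. The helper `build_up_command` is hypothetical and is not part of composition.py; the real method also handles builds, idle services, and retries, none of which are modeled here.

```python
def build_up_command(
    *services: str, detach: bool = True, wait: bool = True
) -> list[str]:
    """Assemble an illustrative `docker compose up` argument list."""
    cmd = ["docker", "compose", "up"]
    if wait:
        # `--wait` blocks until services are running or healthy
        # and implies detached mode, so `--detach` is not added.
        cmd.append("--wait")
    elif detach:
        cmd.append("--detach")
    cmd.extend(services)
    return cmd


print(build_up_command("materialized", "kafka"))
```

With both options set to False the sketch emits a plain foreground `docker compose up`.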
The Composition.override() method is a context manager that temporarily replaces one or more service definitions in the composition. It deep-copies the current composition state, applies the new service definitions, pulls images if needed, and restores the original state when the context exits. The override performs a wholesale replacement of the service configuration, not a deep merge. It also handles a special case for the sanity_restart label on the materialized service, preserving the disabled state if the override removed the label.
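The snapshot, replace, and restore behavior can be sketched with a toy class. `ToyComposition` and its dictionary layout are assumptions made for illustration; the sketch passes services as `(name, config)` tuples instead of service objects and omits image pulling and the sanity_restart handling.

```python
import copy
from contextlib import contextmanager
from typing import Iterator


class ToyComposition:
    """Hypothetical stand-in for Composition; holds service configs in a dict."""

    def __init__(self, services: dict[str, dict]) -> None:
        self.compose = {"services": services}

    @contextmanager
    def override(
        self, *services: tuple[str, dict], fail_on_new_service: bool = True
    ) -> Iterator[None]:
        # Snapshot the full state so it can be restored on exit.
        original = copy.deepcopy(self.compose)
        try:
            for name, config in services:
                if fail_on_new_service:
                    assert name in self.compose["services"], f"unknown service {name}"
                # Wholesale replacement: keys missing from `config` are dropped, not merged.
                self.compose["services"][name] = config
            yield
        finally:
            self.compose = original


c = ToyComposition({"materialized": {"image": "mz:new", "environment": ["A=1"]}})
with c.override(("materialized", {"image": "mz:old"})):
    inside = copy.deepcopy(c.compose["services"]["materialized"])
after = c.compose["services"]["materialized"]
```

Inside the `with` block the `environment` key is gone because the definition was replaced rather than merged; after the block the original definition is back.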
Usage
Use these methods when:
- Starting services at the beginning of a workflow or at specific test phases.
- Testing version upgrade scenarios by overriding the Materialized image.
- Changing environment variables or command flags for a specific test step.
- Starting idle containers for testdrive or other exec-based interactions.
Code Reference
Source Location
| Method | File | Lines |
|---|---|---|
| Composition.up() | misc/python/materialize/mzcompose/composition.py | L1020-1071 |
| Composition.override() | misc/python/materialize/mzcompose/composition.py | L578-640 |
Signature
Composition.up():
def up(
    self,
    *services: str | Service,
    detach: bool = True,
    wait: bool = True,
    max_tries: int = 5,
) -> None
Composition.override():
@contextmanager
def override(
    self, *services: "MzComposeService", fail_on_new_service: bool = True
) -> Iterator[None]
Import
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| *services (up) | str \| Service | Variadic list of service names or Service objects to start. Service objects with idle=True have their entrypoint replaced with sleep infinity. |
| detach | bool | Run containers in the background; defaults to True |
| wait | bool | Wait for health checks to complete before returning; implies detach; defaults to True |
| max_tries | int | Number of retry attempts on failure; defaults to 5 (increased to 300 in CI when waiting for builds) |
| *services (override) | MzComposeService | Variadic list of service objects whose definitions will temporarily replace the existing ones |
| fail_on_new_service | bool | If True, raises an assertion error if a service name does not already exist in the composition; defaults to True |
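The idle behavior described for `*services (up)` can be pictured as a small rewrite of the service configuration before startup. The names `ServiceRequest` and `apply_idle` below are hypothetical, and the dictionary shape is an assumption; this is a sketch of the idea, not the code in composition.py.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class ServiceRequest:
    """Hypothetical request type: a service name plus an idle flag."""

    name: str
    idle: bool = False


def apply_idle(
    configs: dict[str, dict], *requests: Union[str, ServiceRequest]
) -> list[str]:
    """Return the names to start, rewriting the entrypoint of idle services in `configs`."""
    names = []
    for request in requests:
        if isinstance(request, str):
            # Plain names refer to services that are started as defined.
            names.append(request)
            continue
        if request.idle:
            # Keep the container alive without running its normal entrypoint.
            configs[request.name]["entrypoint"] = ["sleep", "infinity"]
        names.append(request.name)
    return names


configs = {
    "materialized": {"image": "mz"},
    "testdrive": {"image": "td", "entrypoint": ["testdrive"]},
}
names = apply_idle(configs, "materialized", ServiceRequest("testdrive", idle=True))
```

An idle container stays up so that later commands can be executed inside it.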
Outputs
| Method | Return Type | Description |
|---|---|---|
| up() | None | Starts services and blocks until healthy (when wait=True); raises on failure after exhausting retries |
| override() | Context manager yielding None | Yields control to the with block with modified service definitions; restores original definitions on exit |
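The "raises on failure after exhausting retries" contract of up() follows a common retry pattern, sketched below. `run_with_retries` and `flaky_up` are hypothetical names; which exception types the real method retries on, and whether it sleeps between attempts, are not stated here, so the sketch simply retries on any exception.

```python
import time
from typing import Callable


def run_with_retries(
    action: Callable[[], None], max_tries: int = 5, delay_s: float = 0.0
) -> int:
    """Call `action` until it succeeds; re-raise the last error after `max_tries` attempts.

    Returns the number of attempts used.
    """
    for attempt in range(1, max_tries + 1):
        try:
            action()
            return attempt
        except Exception:
            if attempt == max_tries:
                raise
            time.sleep(delay_s)
    raise ValueError("max_tries must be at least 1")


attempts = []


def flaky_up() -> None:
    """Simulated startup that fails twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("not healthy yet")


used = run_with_retries(flaky_up, max_tries=5)
```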
Usage Examples
Starting services in a workflow:
def workflow_default(c: Composition):
    c.up("materialized", "zookeeper", "kafka", "schema-registry")
    # All services are now running and healthy
    c.sql("SELECT 1")
Overriding Materialized for an upgrade test:
def workflow_upgrade(c: Composition):
    # Start with the old version
    old_mz = Materialized(image="materialize/materialized:v0.80.0")
    with c.override(old_mz):
        c.up("materialized")
        c.sql("CREATE TABLE t (a INT)")
        c.sql("INSERT INTO t VALUES (1)")
    # Restart with the new (current) version -- override is now restored
    c.up("materialized")
    c.sql("SELECT * FROM t")
Overriding with custom environment variables:
def workflow_custom_config(c: Composition):
    custom_mz = Materialized(
        environment_extra=["MZ_LOG_FILTER=debug"],
        additional_system_parameter_defaults={
            "enable_table_keys": "true",
        },
    )
    with c.override(custom_mz):
        c.up("materialized")
        c.sql("SHOW cluster")
Starting an idle service for testdrive:
from materialize.mzcompose.composition import Service
def workflow_default(c: Composition):
    c.up("materialized")
    # Start testdrive container in idle mode (sleep infinity)
    c.up(Service("testdrive", idle=True))
    # Now execute testdrive scripts via docker compose exec
    c.testdrive(
        """
> SELECT 1
1
"""
    )