Implementation:MaterializeInc Materialize Service Init
| Knowledge Sources | misc/python/materialize/mzcompose/service.py, misc/python/materialize/mzcompose/services/materialized.py, misc/python/materialize/mzcompose/services/kafka.py, misc/python/materialize/mzcompose/services/schema_registry.py |
|---|---|
| Domains | Integration Testing, Service Composition, Docker Compose |
| Last Updated | 2026-02-08 |
Overview
Concrete API for defining Docker Compose services in Materialize integration tests, provided by the materialize.mzcompose.service and materialize.mzcompose.services packages.
Description
The service initialization API consists of a base Service class and specialized subclasses that encapsulate Docker Compose service definitions as typed Python objects. The base Service class accepts a name string and a ServiceConfig typed dictionary. Subclasses like Materialized, Kafka, and SchemaRegistry provide rich constructors with domain-specific parameters that build the appropriate ServiceConfig internally.
The Materialized class is the most complex service definition, with over 30 constructor parameters controlling image selection, environment variables, command-line flags, resource limits, metadata store connectivity, blob storage, system parameter defaults, health checks, and more. It automatically wires up dependencies on metadata stores and blob stores based on the configuration provided.
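As an illustration of the system parameter handling mentioned above, the sketch below flattens a dictionary of defaults into a single environment entry. The helper name `encode_system_parameter_defaults` is hypothetical, and the semicolon-separated `key=value` encoding is an assumption made for this example; materialized.py is the authority on the actual format.

```python
def encode_system_parameter_defaults(defaults: dict[str, str]) -> str:
    """Hypothetical helper: flatten defaults into one environment entry.

    Assumes a semicolon-separated list of key=value pairs; the real
    Materialized class may encode the value differently.
    """
    joined = ";".join(f"{key}={value}" for key, value in defaults.items())
    return f"MZ_SYSTEM_PARAMETER_DEFAULT={joined}"


# Example values only; any string-to-string mapping works the same way.
entry = encode_system_parameter_defaults(
    {"enable_table_keys": "true", "max_tables": "100"}
)
print(entry)
```

An entry of this shape would sit alongside the other strings in the service's environment list.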
The Kafka class wraps the Confluent Platform Kafka image with configurable broker settings, advertised listeners, and a health-checked dependency on ZooKeeper.
The SchemaRegistry class wraps the Confluent Schema Registry image with configurable Kafka bootstrap servers and a health-checked dependency on those Kafka brokers.
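To make the bootstrap server configuration concrete, the sketch below joins `(host, port)` tuples of the kind accepted by `kafka_servers` into one connection string. The helper name `bootstrap_servers` is hypothetical, and the `PLAINTEXT://` prefix is an assumption; the actual construction in schema_registry.py may differ.

```python
def bootstrap_servers(kafka_servers: list[tuple[str, str]]) -> str:
    """Hypothetical helper: join (host, port) tuples into one string.

    Assumes PLAINTEXT listeners; other listener protocols are not handled.
    """
    return ",".join(f"PLAINTEXT://{host}:{port}" for host, port in kafka_servers)


servers = bootstrap_servers([("kafka", "9092"), ("kafka2", "9092")])
# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS is the Confluent image's
# environment variable for the kafkastore.bootstrap.servers setting.
environment = [f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={servers}"]
print(environment[0])
```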
Usage
Use these classes when:
- Defining the SERVICES list in an mzcompose.py file.
- Creating custom service configurations for specific test scenarios.
- Overriding service definitions at runtime via Composition.override().
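The runtime override case can be pictured as a context manager that swaps a service definition in for the duration of a block and restores the original afterwards. The `ToyService` and `ToyComposition` classes below are hypothetical stand-ins written for this page, not the real Composition implementation, which also manages the running containers; the image names are placeholders.

```python
from contextlib import contextmanager
from typing import Any, Iterator


class ToyService:
    """Stand-in for Service: just a name and a config dictionary."""

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        self.name = name
        self.config = config


class ToyComposition:
    """Hypothetical stand-in that only tracks service configs by name."""

    def __init__(self, services: list[ToyService]) -> None:
        self.services = {s.name: s.config for s in services}

    @contextmanager
    def override(self, *services: ToyService) -> Iterator[None]:
        saved = dict(self.services)  # shallow copy of current definitions
        for service in services:
            self.services[service.name] = service.config
        try:
            yield
        finally:
            self.services = saved  # restore the original definitions


c = ToyComposition([ToyService("materialized", {"image": "example/image:a"})])
with c.override(ToyService("materialized", {"image": "example/image:b"})):
    inside = c.services["materialized"]["image"]
after = c.services["materialized"]["image"]
print(inside, after)
```

Restoring in a `finally` clause means the original definition comes back even if the test body raises.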
Code Reference
Source Location
| Class | File | Lines |
|---|---|---|
| Service | misc/python/materialize/mzcompose/service.py | L185-195 |
| ServiceConfig | misc/python/materialize/mzcompose/service.py | L56-183 |
| Materialized | misc/python/materialize/mzcompose/services/materialized.py | L62-395 |
| Kafka | misc/python/materialize/mzcompose/services/kafka.py | L20-79 |
| SchemaRegistry | misc/python/materialize/mzcompose/services/schema_registry.py | L15-73 |
Signature
Service (base class):

    class Service:
        """A Docker Compose service in a Composition."""

        def __init__(self, name: str, config: ServiceConfig) -> None:
            self.name = name
            self.config = config

Materialized:

    class Materialized(Service):
        def __init__(
            self,
            name: str | None = None,
            image: str | None = None,
            environment_extra: list[str] = [],
            volumes_extra: list[str] = [],
            depends_on: list[str] = [],
            memory: str | None = None,
            cpu: str | None = None,
            options: list[str] = [],
            persist_blob_url: str | None = None,
            default_size: int | str = Size.DEFAULT_SIZE,
            environment_id: str | None = None,
            propagate_crashes: bool = True,
            external_metadata_store: str | bool = EXTERNAL_METADATA_STORE_ADDRESS,
            external_blob_store: str | bool = False,
            blob_store_is_azure: bool = False,
            unsafe_mode: bool = True,
            restart: str | None = None,
            use_default_volumes: bool = True,
            ports: list[str] | list[int] | list[str | int] | None = None,
            system_parameter_defaults: dict[str, str] | None = None,
            additional_system_parameter_defaults: dict[str, str] | None = None,
            system_parameter_version: MzVersion | None = None,
            soft_assertions: bool = True,
            sanity_restart: bool = True,
            platform: str | None = None,
            healthcheck: list[str] | None = None,
            deploy_generation: int | None = None,
            force_migrations: str | None = None,
            publish: bool | None = None,
            stop_grace_period: str = "120s",
            metadata_store: str = METADATA_STORE,
            cluster_replica_size: dict[str, dict[str, Any]] | None = None,
            bootstrap_replica_size: str | None = None,
            default_replication_factor: int = 1,
            listeners_config_path: str = ...,
            config_sync_file_path: str | None = None,
            support_external_clusterd: bool = False,
            networks: dict[...] | None = None,
        ) -> None

Kafka:

    class Kafka(Service):
        def __init__(
            self,
            name: str = "kafka",
            image: str = "confluentinc/cp-kafka",
            tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
            ports: list[str | int] | None = None,
            allow_host_ports: bool = False,
            auto_create_topics: bool = False,
            broker_id: int = 1,
            offsets_topic_replication_factor: int = 1,
            advertised_listeners: list[str] = [],
            environment: list[str] = [...],
            environment_extra: list[str] = [],
            depends_on_extra: list[str] = [],
            volumes: list[str] = [],
            platform: str | None = None,
        ) -> None

SchemaRegistry:

    class SchemaRegistry(Service):
        def __init__(
            self,
            name: str = "schema-registry",
            aliases: list[str] = [],
            image: str = "confluentinc/cp-schema-registry",
            tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
            port: int = 8081,
            kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
            environment_extra: list[str] = [],
            depends_on_extra: list[str] = [],
            volumes: list[str] = [],
            platform: str | None = None,
        ) -> None

Import
from materialize.mzcompose.service import Service, ServiceConfig
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.schema_registry import SchemaRegistry
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| name | str | The Docker Compose service name (used as the hostname and service identifier) |
| config | ServiceConfig | A typed dictionary with Docker Compose service fields: image, mzbuild, ports, environment, depends_on, healthcheck, volumes, command, etc. |
| image (Materialized) | str \| None | Docker image to use; if None, uses mzbuild: materialized to build from source |
| environment_extra | list[str] | Additional environment variables appended to the default set |
| depends_on | list[str] | Additional service dependencies (added with service_started condition) |
| external_metadata_store | str \| bool | Controls metadata store connectivity; defaults to EXTERNAL_METADATA_STORE_ADDRESS |
| system_parameter_defaults | dict[str, str] \| None | System parameter defaults passed via MZ_SYSTEM_PARAMETER_DEFAULT |
| kafka_servers (SchemaRegistry) | list[tuple[str, str]] | List of (host, port) tuples for Kafka bootstrap servers |
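The `depends_on` row above refers to Docker Compose dependency conditions. As a rough sketch of that mapping, the hypothetical helper `build_depends_on` below turns lists of service names into the dictionary form Compose expects, using the `service_started` and `service_healthy` conditions. It is an illustration only and is not taken from the mzcompose sources; the service names are placeholders.

```python
from typing import Optional


def build_depends_on(
    started: list[str], healthy: Optional[list[str]] = None
) -> dict[str, dict[str, str]]:
    """Hypothetical helper: map service names to Compose dependency conditions."""
    # Plain dependencies only wait for the container to start.
    depends_on = {name: {"condition": "service_started"} for name in started}
    # Health-checked dependencies wait for the service's healthcheck to pass.
    for name in healthy or []:
        depends_on[name] = {"condition": "service_healthy"}
    return depends_on


deps = build_depends_on(started=["minio"], healthy=["kafka"])
print(deps)
```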
Outputs
| Attribute | Type | Description |
|---|---|---|
| self.name | str | The service name |
| self.config | ServiceConfig | The complete Docker Compose service configuration dictionary, ready for serialization to YAML |
| self.default_storage_size (Materialized) | str | Computed default storage size string (e.g., "scale=4,workers=1") |
| self.default_replica_size (Materialized) | str | Computed default replica size string (e.g., "scale=4,workers=4") |
Usage Examples
Defining a basic service list in mzcompose.py:

    from materialize.mzcompose.services.materialized import Materialized
    from materialize.mzcompose.services.kafka import Kafka
    from materialize.mzcompose.services.schema_registry import SchemaRegistry
    from materialize.mzcompose.services.zookeeper import Zookeeper

    SERVICES = [
        Zookeeper(),
        Kafka(),
        SchemaRegistry(),
        Materialized(),
    ]

Customizing Materialized with specific options:

    from materialize.mzcompose.services.materialized import Materialized

    SERVICES = [
        Materialized(
            default_size=1,
            soft_assertions=True,
            external_blob_store=True,
            additional_system_parameter_defaults={
                "enable_table_keys": "true",
            },
            options=["--log-filter=info"],
        ),
    ]

Creating a custom service from the base class:

    from materialize.mzcompose.service import Service, ServiceConfig

    class MyCustomService(Service):
        def __init__(self, name: str = "my-service") -> None:
            config: ServiceConfig = {
                "image": "my-org/my-image:latest",
                "ports": [8080],
                "healthcheck": {
                    "test": ["CMD", "curl", "-f", "localhost:8080/health"],
                    "interval": "1s",
                    "start_period": "30s",
                },
            }
            super().__init__(name=name, config=config)