Implementation:MaterializeInc Materialize Service Init

From Leeroopedia


Knowledge Sources: misc/python/materialize/mzcompose/service.py, misc/python/materialize/mzcompose/services/materialized.py, misc/python/materialize/mzcompose/services/kafka.py, misc/python/materialize/mzcompose/services/schema_registry.py
Domains: Integration Testing, Service Composition, Docker Compose
Last Updated: 2026-02-08

Overview

A concrete API for defining Docker Compose services in Materialize integration tests, provided by the materialize.mzcompose.service module and the materialize.mzcompose.services package.

Description

The service initialization API consists of a base Service class and specialized subclasses that encapsulate Docker Compose service definitions as typed Python objects. The base Service class accepts a name string and a ServiceConfig typed dictionary. Subclasses like Materialized, Kafka, and SchemaRegistry provide rich constructors with domain-specific parameters that build the appropriate ServiceConfig internally.

The Materialized class is the most complex service definition. Its constructor takes over 30 parameters that control image selection, environment variables, command-line flags, resource limits, metadata store connectivity, blob storage, system parameter defaults, health checks, restart and shutdown behavior, and networking. It automatically wires up dependencies on metadata stores and blob stores based on the configuration provided.

The Kafka class wraps the Confluent Platform Kafka image with configurable broker settings, advertised listeners, and a health-checked dependency on ZooKeeper.

The SchemaRegistry class wraps the Confluent Schema Registry image with configurable Kafka bootstrap servers and a health-checked dependency on those Kafka brokers.
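
The pattern described above, where a subclass turns domain-specific parameters into a service configuration, can be sketched without the Materialize packages. The classes below are simplified stand-ins rather than the real implementations: SketchServiceConfig, SketchService, and SketchSchemaRegistry are hypothetical names, the "latest" tag is a placeholder, and the environment variable and the service_healthy dependency shape are assumptions based on common Confluent and Docker Compose conventions.

```python
from __future__ import annotations

from typing import TypedDict


class SketchServiceConfig(TypedDict, total=False):
    """Hypothetical subset of the fields a ServiceConfig might carry."""

    image: str
    ports: list[int]
    environment: list[str]
    depends_on: dict[str, dict[str, str]]


class SketchService:
    """Stand-in for the base Service class: a name plus a config dict."""

    def __init__(self, name: str, config: SketchServiceConfig) -> None:
        self.name = name
        self.config = config


class SketchSchemaRegistry(SketchService):
    """Stand-in subclass that builds its config from typed parameters."""

    def __init__(
        self,
        name: str = "schema-registry",
        image: str = "confluentinc/cp-schema-registry",
        tag: str = "latest",  # placeholder, not the real default
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] | None = None,
    ) -> None:
        # Avoid a shared mutable default by resolving None here.
        servers = kafka_servers if kafka_servers is not None else [("kafka", "9092")]
        bootstrap = ",".join(f"PLAINTEXT://{host}:{p}" for host, p in servers)
        config: SketchServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": [port],
            "environment": [
                f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap}",
            ],
            # Assumed shape: wait for each broker to report healthy.
            "depends_on": {
                host: {"condition": "service_healthy"} for host, _ in servers
            },
        }
        super().__init__(name=name, config=config)


registry = SketchSchemaRegistry(kafka_servers=[("kafka1", "9092"), ("kafka2", "9092")])
print(registry.config["environment"][0])
```

Callers only see typed keyword arguments, while the Compose-level details stay inside the constructor.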

Usage

Use these classes when:

  • Defining the SERVICES list in an mzcompose.py file.
  • Creating custom service configurations for specific test scenarios.
  • Overriding service definitions at runtime via Composition.override().
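
The runtime override case can be pictured as a context manager that swaps in service definitions by name and restores the originals on exit. The sketch below is a self-contained model of that idea, not the real Composition class: SketchService and SketchComposition are hypothetical stand-ins, and the restore-on-exit behavior is an assumption about how such an override would work.

```python
from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any


class SketchService:
    """Stand-in for Service: a name plus a plain config dictionary."""

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        self.name = name
        self.config = config


class SketchComposition:
    """Hypothetical model of a composition holding services keyed by name."""

    def __init__(self, services: list[SketchService]) -> None:
        self.services = {s.name: s.config for s in services}

    @contextmanager
    def override(self, *services: SketchService) -> Iterator[None]:
        # Remember the current definitions so they can be restored later.
        saved = dict(self.services)
        for service in services:
            self.services[service.name] = service.config
        try:
            yield
        finally:
            # Restore the original definitions, even if the body raised.
            self.services = saved


c = SketchComposition([SketchService("materialized", {"image": "default"})])
with c.override(SketchService("materialized", {"image": "custom"})):
    inside = c.services["materialized"]["image"]
after = c.services["materialized"]["image"]
print(inside, after)
```

Matching on the service name is what lets a test replace one definition from the SERVICES list while leaving the others untouched.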

Code Reference

Source Location

Class           File                                                           Lines
Service         misc/python/materialize/mzcompose/service.py                   L185-195
ServiceConfig   misc/python/materialize/mzcompose/service.py                   L56-183
Materialized    misc/python/materialize/mzcompose/services/materialized.py     L62-395
Kafka           misc/python/materialize/mzcompose/services/kafka.py            L20-79
SchemaRegistry  misc/python/materialize/mzcompose/services/schema_registry.py  L15-73

Signature

Service (base class):

class Service:
    """A Docker Compose service in a Composition."""
    def __init__(self, name: str, config: ServiceConfig) -> None:
        self.name = name
        self.config = config

Materialized:

class Materialized(Service):
    def __init__(
        self,
        name: str | None = None,
        image: str | None = None,
        environment_extra: list[str] = [],
        volumes_extra: list[str] = [],
        depends_on: list[str] = [],
        memory: str | None = None,
        cpu: str | None = None,
        options: list[str] = [],
        persist_blob_url: str | None = None,
        default_size: int | str = Size.DEFAULT_SIZE,
        environment_id: str | None = None,
        propagate_crashes: bool = True,
        external_metadata_store: str | bool = EXTERNAL_METADATA_STORE_ADDRESS,
        external_blob_store: str | bool = False,
        blob_store_is_azure: bool = False,
        unsafe_mode: bool = True,
        restart: str | None = None,
        use_default_volumes: bool = True,
        ports: list[str] | list[int] | list[str | int] | None = None,
        system_parameter_defaults: dict[str, str] | None = None,
        additional_system_parameter_defaults: dict[str, str] | None = None,
        system_parameter_version: MzVersion | None = None,
        soft_assertions: bool = True,
        sanity_restart: bool = True,
        platform: str | None = None,
        healthcheck: list[str] | None = None,
        deploy_generation: int | None = None,
        force_migrations: str | None = None,
        publish: bool | None = None,
        stop_grace_period: str = "120s",
        metadata_store: str = METADATA_STORE,
        cluster_replica_size: dict[str, dict[str, Any]] | None = None,
        bootstrap_replica_size: str | None = None,
        default_replication_factor: int = 1,
        listeners_config_path: str = ...,
        config_sync_file_path: str | None = None,
        support_external_clusterd: bool = False,
        networks: dict[...] | None = None,
    ) -> None

Kafka:

class Kafka(Service):
    def __init__(
        self,
        name: str = "kafka",
        image: str = "confluentinc/cp-kafka",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        ports: list[str | int] | None = None,
        allow_host_ports: bool = False,
        auto_create_topics: bool = False,
        broker_id: int = 1,
        offsets_topic_replication_factor: int = 1,
        advertised_listeners: list[str] = [],
        environment: list[str] = [...],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None

SchemaRegistry:

class SchemaRegistry(Service):
    def __init__(
        self,
        name: str = "schema-registry",
        aliases: list[str] = [],
        image: str = "confluentinc/cp-schema-registry",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None

Import

from materialize.mzcompose.service import Service, ServiceConfig
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.schema_registry import SchemaRegistry

I/O Contract

Inputs

Parameter                       Type                   Description
name                            str                    The Docker Compose service name (used as the hostname and service identifier)
config                          ServiceConfig          A typed dictionary with Docker Compose service fields: image, mzbuild, ports, environment, depends_on, healthcheck, volumes, command, etc.
image (Materialized)            str | None             Docker image to use; if None, uses mzbuild: materialized to build from source
environment_extra               list[str]              Additional environment variables appended to the default set
depends_on                      list[str]              Additional service dependencies (added with service_started condition)
external_metadata_store         str | bool             Controls metadata store connectivity; defaults to EXTERNAL_METADATA_STORE_ADDRESS
system_parameter_defaults       dict[str, str] | None  System parameter defaults passed via MZ_SYSTEM_PARAMETER_DEFAULT
kafka_servers (SchemaRegistry)  list[tuple[str, str]]  List of (host, port) tuples for Kafka bootstrap servers
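
To make the system_parameter_defaults row more concrete, the sketch below shows one way a dictionary of defaults could be flattened into a single MZ_SYSTEM_PARAMETER_DEFAULT environment entry. The helper name sketch_system_parameter_env is hypothetical, the semicolon-separated key=value format is an assumption rather than something this page confirms, and the max_tables entry is only an illustrative value.

```python
from __future__ import annotations


def sketch_system_parameter_env(defaults: dict[str, str]) -> str:
    """Flatten parameter defaults into one environment entry (assumed format)."""
    joined = ";".join(f"{key}={value}" for key, value in defaults.items())
    return f"MZ_SYSTEM_PARAMETER_DEFAULT={joined}"


base = {"enable_table_keys": "true"}
extra = {"max_tables": "100"}  # illustrative additional default

# Merge the two dictionaries; later keys win if both set the same parameter.
entry = sketch_system_parameter_env({**base, **extra})
print(entry)
```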

Outputs

Attribute                                 Type           Description
self.name                                 str            The service name
self.config                               ServiceConfig  The complete Docker Compose service configuration dictionary, ready for serialization to YAML
self.default_storage_size (Materialized)  str            Computed default storage size string (e.g., "scale=4,workers=1")
self.default_replica_size (Materialized)  str            Computed default replica size string (e.g., "scale=4,workers=4")
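
Because each service exposes a name and a configuration dictionary, a list of services maps naturally onto the services section of a Compose file. The sketch below illustrates that mapping under stated assumptions and is not the mzcompose serializer: sketch_compose_document is a hypothetical helper, the service entries are illustrative, and JSON is used for output only because it is also valid YAML and needs no third-party package.

```python
from __future__ import annotations

import json
from typing import Any


def sketch_compose_document(
    services: list[tuple[str, dict[str, Any]]],
) -> dict[str, Any]:
    """Build a Compose-style document from (name, config) pairs."""
    return {"services": {name: config for name, config in services}}


document = sketch_compose_document(
    [
        ("zookeeper", {"image": "confluentinc/cp-zookeeper"}),
        ("kafka", {"image": "confluentinc/cp-kafka", "depends_on": ["zookeeper"]}),
    ]
)

# JSON output is accepted by YAML parsers, so this is a loadable Compose file.
print(json.dumps(document, indent=2))
```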

Usage Examples

Defining a basic service list in mzcompose.py:

from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper

SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Materialized(),
]

Customizing Materialized with specific options:

from materialize.mzcompose.services.materialized import Materialized

SERVICES = [
    Materialized(
        default_size=1,
        soft_assertions=True,
        external_blob_store=True,
        additional_system_parameter_defaults={
            "enable_table_keys": "true",
        },
        options=["--log-filter=info"],
    ),
]

Creating a custom service from the base class:

from materialize.mzcompose.service import Service, ServiceConfig

class MyCustomService(Service):
    def __init__(self, name: str = "my-service") -> None:
        config: ServiceConfig = {
            "image": "my-org/my-image:latest",
            "ports": [8080],
            "healthcheck": {
                "test": ["CMD", "curl", "-f", "localhost:8080/health"],
                "interval": "1s",
                "start_period": "30s",
            },
        }
        super().__init__(name=name, config=config)
