Workflow: Apache ShardingSphere Cluster Mode Initialization
| Property | Value |
|---|---|
| Workflow Name | Cluster_Mode_Initialization |
| Project | Apache ShardingSphere |
| Repository | https://github.com/apache/shardingsphere |
| Documentation | https://shardingsphere.apache.org/document/current/en/user-manual/shardingsphere-jdbc/yaml-config/mode/ |
| Domains | Distributed_Database, Cluster_Management, Configuration |
| Trigger | Application startup with mode type set to Cluster in YAML or Java configuration |
| Entry Point | ClusterContextManagerBuilder.build() |
| Last Updated | 2026-02-10 12:00 GMT |
Overview
Description
The Cluster Mode Initialization workflow bootstraps Apache ShardingSphere in cluster mode, enabling distributed coordination across multiple compute nodes through either ZooKeeper or etcd as the underlying registry center. When a ShardingSphere instance (JDBC driver or Proxy) starts with its mode configured as Cluster, the SPI mechanism loads ClusterContextManagerBuilder, which orchestrates the full initialization sequence: establishing a connection to the coordination service, constructing the persistence layer, building metadata contexts from either local configuration or an existing registry center, registering the current compute node, and subscribing to cluster-wide data change events for dynamic governance.
This workflow is the primary entry path for any ShardingSphere deployment requiring multi-instance coordination, distributed locking, configuration synchronization, or centralized metadata management. It produces a fully initialized ContextManager that holds all runtime contexts needed by the ShardingSphere kernel.
Usage
Cluster mode is activated by specifying mode.type: Cluster in the YAML configuration along with a repository section identifying the coordination service. A typical ZooKeeper configuration looks like:
```yaml
mode:
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: governance_ds
      server-lists: localhost:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500
```
An equivalent etcd configuration replaces the repository type:
```yaml
mode:
  type: Cluster
  repository:
    type: etcd
    props:
      namespace: governance_ds
      server-lists: http://localhost:2379
      timeToLiveSeconds: 30
```
The ClusterContextManagerBuilder is loaded via the Java SPI mechanism. The SPI service file at META-INF/services/org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilder maps the type string "Cluster" to ClusterContextManagerBuilder.
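For illustration, such a service descriptor is a plain-text file listing the fully qualified class name of each implementation; given the package listed in Step 2, the cluster entry would look like:

```
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder
```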
Execution Steps
Step 1: Configuration Read and Parameter Construction
| Aspect | Detail |
|---|---|
| Source File | ContextManagerBuilderParameter.java (63 lines) |
| Package | org.apache.shardingsphere.mode.manager.builder |
| Purpose | Aggregate all inputs needed for building the ContextManager |
The initialization begins when the application's YAML configuration (or programmatic Java configuration) is parsed into a ContextManagerBuilderParameter instance. This parameter object is a @RequiredArgsConstructor final class that bundles together the ModeConfiguration (containing mode type and repository settings), the map of DatabaseConfiguration entries keyed by database name, global data sources, global rule configurations, system properties, instance labels, and the InstanceMetaData identifying this compute node.
The getModeConfiguration() method provides a safety default: if no ModeConfiguration was supplied, it returns a standalone-mode configuration. In the cluster path, the caller always provides an explicit ModeConfiguration with type "Cluster" and a ClusterPersistRepositoryConfiguration as its repository field.
The ClusterPersistRepositoryConfiguration (40 lines) carries four fields: type (e.g., "ZooKeeper" or "etcd"), namespace (the root path in the coordination service), serverLists (comma-separated endpoint addresses), and props (implementation-specific properties such as retry intervals and timeouts).
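The shape of this configuration object can be sketched as a minimal value class populated from the ZooKeeper YAML example above (a simplified stand-in, not the real class):

```java
import java.util.Properties;

// Simplified sketch of the four fields carried by
// ClusterPersistRepositoryConfiguration (illustrative stand-in only).
final class RepositoryConfigSketch {

    final String type;        // e.g. "ZooKeeper" or "etcd"
    final String namespace;   // root path in the coordination service
    final String serverLists; // comma-separated endpoint addresses
    final Properties props;   // implementation-specific tuning

    RepositoryConfigSketch(String type, String namespace, String serverLists, Properties props) {
        this.type = type;
        this.namespace = namespace;
        this.serverLists = serverLists;
        this.props = props;
    }

    // Mirrors the ZooKeeper YAML example from the Usage section.
    static RepositoryConfigSketch zookeeperExample() {
        Properties props = new Properties();
        props.setProperty("retryIntervalMilliseconds", "500");
        props.setProperty("timeToLiveSeconds", "60");
        props.setProperty("maxRetries", "3");
        props.setProperty("operationTimeoutMilliseconds", "500");
        return new RepositoryConfigSketch("ZooKeeper", "governance_ds", "localhost:2181", props);
    }
}
```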
Step 2: ClusterContextManagerBuilder Loaded via SPI
| Aspect | Detail |
|---|---|
| Source File | ClusterContextManagerBuilder.java (104 lines) |
| Package | org.apache.shardingsphere.mode.manager.cluster |
| SPI Interface | ContextManagerBuilder |
| SPI Type Key | "Cluster" |
ShardingSphere's SPI framework uses TypedSPILoader to locate the ContextManagerBuilder implementation whose getType() returns "Cluster". This resolves to ClusterContextManagerBuilder.
The build(ContextManagerBuilderParameter, EventBusContext) method is the top-level orchestration point. It performs the following sequence:
- Extracts the ModeConfiguration and casts the repository configuration to ClusterPersistRepositoryConfiguration.
- Creates a ComputeNodeInstanceContext wrapping a ComputeNodeInstance constructed from the instance metadata and labels.
- Delegates to getClusterPersistRepository() to initialize the cluster repository connection (Step 3).
- Initializes the ComputeNodeInstanceContext with a ClusterWorkerIdGenerator (Step 7 details).
- Creates an ExclusiveOperatorEngine backed by ClusterExclusiveOperatorContext for distributed exclusive operations.
- Calls MetaDataContextsFactory.create() to build the MetaDataContexts (Step 5).
- Constructs the ContextManager with the metadata, instance context, exclusive engine, and repository.
- Calls registerOnline() to register the compute node and set up event listeners (Steps 6 and 7).
- Registers DeliverEventSubscriber instances loaded via SPI to the event bus.
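The type-keyed resolution underpinning this step can be condensed into a small, self-contained sketch (hypothetical names; the real framework's TypedSPILoader builds on java.util.ServiceLoader discovery):

```java
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Collectors;

// Each SPI implementation declares the type key it serves.
interface TypedSpi {
    String getType();
}

// Hypothetical builders standing in for the mode-specific ContextManagerBuilder implementations.
final class StandaloneBuilder implements TypedSpi {
    public String getType() { return "Standalone"; }
}

final class ClusterBuilder implements TypedSpi {
    public String getType() { return "Cluster"; }
}

// Simplified analog of typed SPI loading: index discovered services by type
// key, then resolve by the string taken from the mode configuration.
final class TypedLoaderSketch {

    private final Map<String, TypedSpi> services;

    TypedLoaderSketch(List<TypedSpi> discovered) {
        // In the real framework the discovered list comes from ServiceLoader.
        services = discovered.stream()
            .collect(Collectors.toMap(TypedSpi::getType, Function.identity()));
    }

    TypedSpi getService(String type) {
        TypedSpi result = services.get(type);
        if (result == null) {
            throw new NoSuchElementException("No service for type: " + type);
        }
        return result;
    }
}
```

With mode.type set to Cluster, the same lookup that resolves "Standalone" in single-node deployments resolves the cluster builder instead.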
Step 3: Cluster Repository Initialized
| Aspect | Detail |
|---|---|
| Source Files | ZookeeperRepository.java (298 lines), EtcdRepository.java (211 lines) |
| SPI Interface | ClusterPersistRepository |
| ZooKeeper Type Key | "ZooKeeper" |
| etcd Type Key | "etcd" |
The getClusterPersistRepository() method in ClusterContextManagerBuilder first validates that the ClusterPersistRepositoryConfiguration is not null (throwing MissingRequiredClusterRepositoryConfigurationException otherwise), then uses TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps()) to load the appropriate implementation.
ZooKeeper path: ZookeeperRepository.init() creates a CuratorFramework client using CuratorFrameworkFactory.builder(). It configures the connection string from serverLists, sets the namespace, applies an ExponentialBackoffRetry policy with configurable retryIntervalMilliseconds and maxRetries, and optionally configures session timeout, connection timeout, and digest-based ACL authentication. The client is then started and blocks until connected (or throws OperationTimeoutException if the connection cannot be established within the retry window). A SessionConnectionReconnectListener is added to handle ZooKeeper session reconnection events.
etcd path: EtcdRepository.init() creates a jetcd Client from the provided endpoint list, configures the namespace as a byte-sequence prefix, and sets up lease-based time-to-live management for ephemeral keys. The etcd client uses gRPC for communication and supports TLS endpoints.
Both implementations provide the same ClusterPersistRepository contract: persist(), query(), delete(), getChildrenKeys(), watch(), persistEphemeral(), persistExclusiveEphemeral(), and getDistributedLock().
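The key behavioral distinction in that contract is between durable and ephemeral keys. A toy in-memory stand-in (illustrative only, covering a subset of the methods) shows why ephemeral keys matter for cluster membership:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy stand-in for part of the ClusterPersistRepository contract. Real
// implementations delegate to Curator (ZooKeeper) or jetcd (etcd); this one
// keeps a map and tracks which keys are ephemeral so a simulated session
// loss can wipe them, as the coordination service would.
final class InMemoryRepositorySketch {

    private final Map<String, String> data = new TreeMap<>();
    private final Set<String> ephemeralKeys = new HashSet<>();

    void persist(String key, String value) { data.put(key, value); }

    void persistEphemeral(String key, String value) {
        data.put(key, value);
        ephemeralKeys.add(key);
    }

    String query(String key) { return data.get(key); }

    // Direct children of a path, e.g. "a" and "b" for "/nodes/a", "/nodes/b".
    List<String> getChildrenKeys(String path) {
        List<String> result = new ArrayList<>();
        String prefix = path.endsWith("/") ? path : path + "/";
        for (String key : data.keySet()) {
            if (key.startsWith(prefix) && !key.substring(prefix.length()).contains("/")) {
                result.add(key.substring(prefix.length()));
            }
        }
        return result;
    }

    // Simulates losing the session: ephemeral keys vanish, durable ones stay.
    void dropSession() {
        data.keySet().removeAll(ephemeralKeys);
        ephemeralKeys.clear();
    }
}
```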
Step 4: Persistence Facade Constructed
| Aspect | Detail |
|---|---|
| Source Files | MetaDataPersistFacade.java (140 lines), PersistServiceFacade.java (62 lines), ClusterPersistServiceFacade.java (57 lines) |
| Packages | org.apache.shardingsphere.mode.metadata.persist, org.apache.shardingsphere.mode.persist, org.apache.shardingsphere.mode.manager.cluster.persist.facade |
The persistence layer is assembled in two stages:
Stage A -- MetaDataPersistFacade: During the MetaDataContextsFactory creation call, new MetaDataPersistFacade(repository) is constructed. This facade initializes a set of specialized persistence services backed by the cluster repository: VersionPersistService (version tracking), DataSourceUnitPersistService (data source pool configurations), DatabaseMetaDataPersistFacade (schema, table, and view metadata), DatabaseRulePersistService (per-database rule configurations), GlobalRulePersistService (global rules), PropertiesPersistService (system properties), and StatisticsPersistService (runtime statistics). All these services read from and write to the cluster coordination service (ZooKeeper or etcd) through the shared PersistRepository interface.
Stage B -- PersistServiceFacade and ClusterPersistServiceFacade: After the ContextManager is constructed, its constructor creates a PersistServiceFacade. This top-level facade wraps a new MetaDataPersistFacade, a StatePersistService, a QualifiedDataSourceStatePersistService, and a mode-specific ModePersistServiceFacade loaded via SPI (which resolves to ClusterPersistServiceFacade for cluster mode). The ClusterPersistServiceFacade holds ClusterMetaDataManagerPersistService (for coordinated metadata changes), ClusterComputeNodePersistService (for compute node registration), and ClusterProcessPersistService (for distributed process management). On shutdown, ClusterPersistServiceFacade.close() takes the compute node offline by removing its ephemeral registration.
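The layering in both stages is plain composition: every specialized service shares the single repository connection, and the facade only wires and exposes them. A reduced sketch with hypothetical simplified names:

```java
// Simplified composition sketch of the facade pattern used in Stages A and B:
// specialized services share one repository handle; the facade wires them.
final class PersistFacadeSketch {

    interface Repository {
        void persist(String key, String value);
        String query(String key);
    }

    // Stand-in for a specialized service such as PropertiesPersistService.
    static final class PropertiesService {
        private final Repository repo;
        PropertiesService(Repository repo) { this.repo = repo; }
        void persist(String props) { repo.persist("/props", props); }
        String load() { return repo.query("/props"); }
    }

    // Stand-in for a specialized service such as GlobalRulePersistService.
    static final class GlobalRuleService {
        private final Repository repo;
        GlobalRuleService(Repository repo) { this.repo = repo; }
        void persist(String rules) { repo.persist("/rules", rules); }
        String load() { return repo.query("/rules"); }
    }

    final PropertiesService props;
    final GlobalRuleService globalRules;

    PersistFacadeSketch(Repository repo) {
        // All services write through the same coordination-service connection.
        props = new PropertiesService(repo);
        globalRules = new GlobalRuleService(repo);
    }
}
```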
Step 5: MetaDataContexts Created
| Aspect | Detail |
|---|---|
| Source Files | MetaDataContextsFactory.java (217 lines), LocalConfigurationMetaDataContextsInitFactory.java (101 lines), RegisterCenterMetaDataContextsInitFactory.java (134 lines) |
| Package | org.apache.shardingsphere.mode.metadata.factory |
MetaDataContextsFactory.create(param) decides the initialization strategy based on whether the registry center already contains registered database metadata:
Decision logic: containsRegisteredDatabases() checks whether DatabaseMetaDataPersistService.loadAllDatabaseNames() returns a non-empty collection from the cluster repository. If databases are already registered, the registry center is treated as the source of truth; otherwise, local configuration is used as the initial bootstrap source.
Path A -- LocalConfigurationMetaDataContextsInitFactory: Used for fresh cluster deployments where no prior metadata exists in the coordination service. It reads database configurations, rule configurations, and properties from the ContextManagerBuilderParameter (which originates from the local YAML file). It creates ShardingSphereDatabase instances via ShardingSphereDatabasesFactory.create(), connects to actual data sources, introspects schemas, builds rules, and produces the MetaDataContexts. After creation, it persists the database configurations, global rule configurations, schema metadata, and statistics back into the cluster repository so that subsequent nodes can use Path B.
Path B -- RegisterCenterMetaDataContextsInitFactory: Used when the cluster repository already has registered databases from a prior node's initialization. It loads data source configurations, database rule configurations, properties, and schema metadata from the registry center. It creates DataSourceGeneratedDatabaseConfiguration instances from persisted DataSourceConfiguration entries, builds databases using the same factory methods, and merges any schema views loaded from the registry. If the PERSIST_SCHEMAS_TO_REPOSITORY_ENABLED property is set, full schema structures are loaded from the repository; otherwise, schemas are built from actual data source connections and registry views are merged in.
Both paths ultimately produce a MetaDataContexts containing the ShardingSphereMetaData (all databases, global resource metadata, global rule metadata, and configuration properties) and ShardingSphereStatistics.
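The decision between the two paths, and why only the first node in a cluster takes Path A, can be condensed to a few lines (a simplified sketch with hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Condensed sketch of the Step 5 decision: bootstrap from local YAML on a
// fresh cluster (then persist back), otherwise treat the registry as truth.
final class BootstrapSketch {

    // Stand-in for the cluster repository's registered-database store.
    static final class Registry {
        final Map<String, String> databases = new LinkedHashMap<>(); // name -> persisted config
        Collection<String> loadAllDatabaseNames() { return databases.keySet(); }
    }

    // Returns the database names that end up loaded on this node.
    static List<String> create(Registry registry, Map<String, String> localConfig) {
        List<String> loaded = new ArrayList<>();
        if (!registry.loadAllDatabaseNames().isEmpty()) {
            // Path B: the registry center is the source of truth.
            loaded.addAll(registry.loadAllDatabaseNames());
        } else {
            // Path A: fresh cluster, bootstrap from local configuration...
            loaded.addAll(localConfig.keySet());
            // ...then persist it back so subsequent nodes take Path B.
            registry.databases.putAll(localConfig);
        }
        return loaded;
    }
}
```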
Step 6: Event Listeners Registered
| Aspect | Detail |
|---|---|
| Source File | DataChangedEventListenerRegistry.java (63 lines) |
| Package | org.apache.shardingsphere.mode.manager.cluster.dispatch.listener |
| Purpose | Subscribe to cluster coordination events for dynamic governance |
After the ContextManager is constructed and the compute node is registered online, the registerOnline() method in ClusterContextManagerBuilder creates a DataChangedEventListenerRegistry and calls its register() method.
The DataChangedEventListenerRegistry obtains the ClusterPersistRepository from the ContextManager's PersistServiceFacade and sets up two categories of listeners:
Database listeners: For each database name (determined by instance type -- JDBC mode uses the local configuration's database names, Proxy mode loads all database names from the registry), a DatabaseMetaDataChangedListener is registered. This listener watches the DatabaseMetaDataNodePath for each database in the coordination service. Any changes to database-level metadata (data sources, rules, schemas, tables) trigger event dispatch through the ContextManager.
Global listeners: All GlobalDataChangedEventHandler implementations loaded via SPI are iterated. For each handler, a GlobalMetaDataChangedListener is registered on the handler's subscribed node path. This covers cluster-wide events such as global rule changes, property updates, compute node state transitions, and qualified data source state changes.
Additionally, the ClusterContextManagerBuilder.build() method separately registers DeliverEventSubscriber instances (loaded via SPI) to the EventBusContext. These subscribers handle event delivery across the cluster by writing events to the coordination service, where other nodes' watchers pick them up.
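The watch-and-dispatch mechanics behind both listener categories can be sketched as a small prefix-matching registry (illustrative only; the real listeners are driven by Curator and jetcd watch APIs):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Minimal sketch of watch registration and dispatch: listeners subscribe to a
// node-path prefix; a data change on any key under that prefix notifies them.
final class WatchRegistrySketch {

    record DataChangedEvent(String key, String value) {}

    private final Map<String, List<Consumer<DataChangedEvent>>> watchers = new LinkedHashMap<>();

    // Analogous to registering a DatabaseMetaDataChangedListener on a
    // database's metadata path, or a GlobalMetaDataChangedListener on a
    // handler's subscribed path.
    void watch(String pathPrefix, Consumer<DataChangedEvent> listener) {
        watchers.computeIfAbsent(pathPrefix, k -> new ArrayList<>()).add(listener);
    }

    // Called when the coordination service reports a change.
    void fire(String key, String value) {
        DataChangedEvent event = new DataChangedEvent(key, value);
        watchers.forEach((prefix, listeners) -> {
            if (key.startsWith(prefix)) {
                listeners.forEach(listener -> listener.accept(event));
            }
        });
    }
}
```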
Step 7: Compute Node Registered with Worker ID
| Aspect | Detail |
|---|---|
| Source Files | ClusterComputeNodePersistService.java (180 lines), ClusterWorkerIdGenerator.java (89 lines) |
| Packages | org.apache.shardingsphere.mode.manager.cluster.persist.service, org.apache.shardingsphere.mode.manager.cluster.workerid |
Compute node registration occurs in two phases:
Phase A -- Worker ID generation: Early in the build() method, ComputeNodeInstanceContext.init() is called with a ClusterWorkerIdGenerator. This generator first checks whether a worker ID is already persisted for the current instance ID. If found, it reuses the existing ID. If not, it enters a generation loop: it queries all currently assigned worker IDs from the cluster repository, computes the set of available IDs (from 0 to MAX_WORKER_ID), selects the smallest available ID via a priority queue, and attempts to reserve it through ReservationPersistService.reserveWorkerId(). The reservation uses an exclusive ephemeral node to prevent race conditions when multiple nodes start simultaneously. If the reservation fails (another node claimed the same ID), the loop retries with the next available ID. Upon success, the worker ID is persisted via ClusterComputeNodePersistService.persistWorkerId() as an ephemeral node. The worker ID is critical for ShardingSphere's distributed ID generation (e.g., Snowflake algorithm) to ensure uniqueness across cluster nodes.
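The select-and-reserve loop of Phase A can be sketched as follows (simplified and in-memory; in the real code the reservation is an exclusive ephemeral node in the coordination service):

```java
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;

// Sketch of the Phase A worker-ID loop: pick the smallest unassigned ID and
// try to reserve it; if a racing node wins that reservation, retry with the
// next candidate.
final class WorkerIdSketch {

    interface Reservation {
        // True if this node won the reservation (in the real flow, the
        // exclusive ephemeral node was created successfully).
        boolean tryReserve(long workerId);
    }

    static OptionalLong generate(Set<Long> assigned, long maxWorkerId, Reservation reservation) {
        // Smallest-first candidate queue over the unassigned range [0, max].
        PriorityQueue<Long> candidates = new PriorityQueue<>();
        for (long id = 0; id <= maxWorkerId; id++) {
            if (!assigned.contains(id)) {
                candidates.add(id);
            }
        }
        while (!candidates.isEmpty()) {
            long candidate = candidates.poll();
            if (reservation.tryReserve(candidate)) {
                return OptionalLong.of(candidate); // then persisted as an ephemeral node
            }
        }
        return OptionalLong.empty(); // ID space exhausted
    }
}
```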
Phase B -- Online registration: The registerOnline() method in ClusterContextManagerBuilder obtains the ClusterPersistServiceFacade from the ContextManager's PersistServiceFacade and calls computeNodeService.registerOnline(computeNodeInstance). This method persists an ephemeral node at the online path with the compute node's data (database name, attributes, version), updates the instance state (initially OK) as an ephemeral entry, and persists the instance labels as an ephemeral entry. Ephemeral nodes are used so that if the compute node loses its session with the coordination service, the registration is automatically cleaned up. After self-registration, the builder loads all existing cluster instances via computeNodeService.loadAllInstances() and populates the ClusterInstanceRegistry so the current node is aware of all peers.
Execution Diagram
Application Start (mode.type = Cluster)
|
v
[Step 1] Parse YAML / Java Config
|--- ModeConfiguration (type="Cluster", repository config)
|--- DatabaseConfiguration map
|--- Global rules, properties, labels, instance metadata
|
v
ContextManagerBuilderParameter constructed
|
v
[Step 2] SPI loads ClusterContextManagerBuilder (type="Cluster")
|
v
ClusterContextManagerBuilder.build()
|
+---> [Step 3] getClusterPersistRepository()
| |
| +---> TypedSPILoader resolves repository type
| | |
| | +---> "ZooKeeper" => ZookeeperRepository
| | | |--- CuratorFramework built and started
| | | |--- SessionConnectionReconnectListener added
| | | |--- blockUntilConnected()
| | |
| | +---> "etcd" => EtcdRepository
| | |--- jetcd Client created
| | |--- Namespace and lease configured
| |
| v
| ClusterPersistRepository ready
|
+---> ComputeNodeInstanceContext created
|
+---> ClusterWorkerIdGenerator initialized ----+
| |
+---> ExclusiveOperatorEngine created |
| |
+---> [Step 5] MetaDataContextsFactory.create() |
| | |
| +---> [Step 4] MetaDataPersistFacade |
| | |--- VersionPersistService |
| | |--- DataSourceUnitService |
| | |--- DatabaseMetaDataFacade |
| | |--- DatabaseRuleService |
| | |--- GlobalRuleService |
| | |--- PropertiesService |
| | |--- StatisticsService |
| | |
| +---> containsRegisteredDatabases()? |
| | | |
| | Yes | No |
| | | | | |
| | v | v |
| | Registry| Local Config |
| | Center | Init Factory |
| | Init | | |
| | Factory | +-> Build DBs from YAML |
| | | | +-> Persist to registry |
| | +-> Load configs from registry |
| | +-> Build DBs from persisted data |
| | | |
| v v |
| MetaDataContexts ready |
| |
+---> ContextManager constructed |
| |--- MetaDataContexts |
| |--- ComputeNodeInstanceContext |
| |--- ExclusiveOperatorEngine |
| |--- ExecutorEngine |
| |--- MetaDataContextManager |
| |--- PersistServiceFacade ----+ |
| |--- StateContext | |
| | |
| [Step 4 contd] | |
| PersistServiceFacade | |
| |--- MetaDataPersistFacade | |
| |--- StatePersistService | |
| |--- ModePersistServiceFacade |
| | (ClusterPersistServiceFacade)
| | |--- ClusterMetaDataManagerPersistService
| | |--- ClusterComputeNodePersistService
| | |--- ClusterProcessPersistService
| | |
+---> [Step 7] registerOnline() <-----------------+
| |
| +---> computeNodeService.registerOnline()
| | |--- persistEphemeral (online node)
| | |--- updateState (OK)
| | |--- persistLabels
| |
| +---> loadAllInstances() => populate ClusterInstanceRegistry
| |
| +---> [Step 6] DataChangedEventListenerRegistry.register()
| |
| +---> Per database: watch DatabaseMetaDataNodePath
| | => DatabaseMetaDataChangedListener
| |
| +---> Per GlobalDataChangedEventHandler (SPI):
| watch handler's subscribed path
| => GlobalMetaDataChangedListener
|
+---> DeliverEventSubscribers registered to EventBusContext
|
v
ContextManager returned (fully initialized)
|
v
ShardingSphere ready to serve in Cluster Mode