Implementation:Apache Shardingsphere ClusterPersistRepository Init
| Knowledge Sources | |
|---|---|
| Domains | Cluster_Mode, Distributed_Coordination |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for establishing connections to distributed coordination services (ZooKeeper or etcd) provided by ShardingSphere's cluster repository provider modules.
Description
This page documents the init() method on both ZookeeperRepository and EtcdRepository, the two SPI implementations of the ClusterPersistRepository interface. Each provides a distinct initialization strategy tailored to its coordination backend.
ZookeeperRepository.init() (lines 73-78):
- Parses ZookeeperProperties from the configuration properties, extracting retry interval, max retries, session TTL, operation timeout, and optional digest authentication.
- Builds a CuratorFramework client with the parsed settings, configuring the connection string, namespace, retry policy (exponential backoff), and optional session/connection timeouts.
- Configures digest-based ACL authentication if a digest property is provided.
- Registers a SessionConnectionReconnectListener on the connection state listenable to handle automatic reconnection and compute node re-registration.
- Starts the client and blocks until the connection is established or the calculated timeout (
retryIntervalMilliseconds * maxRetries) elapses.
EtcdRepository.init() (lines 70-76):
- Parses EtcdProperties from the configuration properties.
- Builds an etcd Client by splitting the server list into endpoint URIs, setting the namespace as a
ByteSequence, and configuring the maximum inbound message size. - The etcd client connects lazily on first operation, so the
init()method returns immediately after client construction.
Usage
These init() methods are called internally by ClusterContextManagerBuilder.getClusterPersistRepository() during cluster-mode bootstrap. The specific implementation is selected via SPI based on the type field in ClusterPersistRepositoryConfiguration (e.g., "ZooKeeper" or "etcd").
Code Reference
Source Location -- ZookeeperRepository
- Repository: Apache ShardingSphere
- File:
mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java - Lines: 73-78 (init method), 80-112 (buildCuratorClient), 114-128 (initCuratorClient)
Source Location -- EtcdRepository
- Repository: Apache ShardingSphere
- File:
mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java - Lines: 70-76
Signature -- ZookeeperRepository
public final class ZookeeperRepository implements ClusterPersistRepository {
@Override
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps());
client = buildCuratorClient(config, zookeeperProps);
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
initCuratorClient(zookeeperProps);
}
}
Signature -- EtcdRepository
public final class EtcdRepository implements ClusterPersistRepository {
@Override
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
etcdProps = new EtcdProperties(config.getProps());
client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
.namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8))
.maxInboundMessageSize((int) 32e9)
.build();
}
}
Import
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.ZookeeperRepository;
import org.apache.shardingsphere.mode.repository.cluster.etcd.EtcdRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | ClusterPersistRepositoryConfiguration | Yes | Configuration containing server lists, namespace, type, and provider-specific properties |
| computeNodeInstanceContext | ComputeNodeInstanceContext | Yes | Context of the current compute node instance, used for session reconnect handling |
Outputs
| Name | Type | Description |
|---|---|---|
| (void) | void | The method initializes internal client state; no return value |
Side Effects
| Effect | Description |
|---|---|
| Network connection | Establishes a connection to the ZooKeeper ensemble or etcd cluster |
| Session listener | Registers a reconnection listener (ZooKeeper) to re-register the compute node on session restore |
| Blocking wait | ZooKeeper implementation blocks until connected or timeout; etcd connects lazily |
ZooKeeper Configuration Properties
| Property Key | Default | Description |
|---|---|---|
| retryIntervalMilliseconds | 500 | Base interval between connection retry attempts |
| maxRetries | 3 | Maximum number of connection retry attempts |
| timeToLiveSeconds | 0 | Session timeout in seconds (0 uses ZooKeeper default) |
| operationTimeoutMilliseconds | 0 | Connection timeout in milliseconds (0 uses default) |
| digest | (empty) | Digest authentication string for ACL-based access control |
Usage Examples
// ZooKeeper initialization (called internally by ClusterContextManagerBuilder)
ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration(
"ZooKeeper", "my-namespace", "zk1:2181,zk2:2181,zk3:2181", new Properties());
ClusterPersistRepository repository = TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps());
repository.init(config, computeNodeInstanceContext);
// etcd initialization
ClusterPersistRepositoryConfiguration etcdConfig = new ClusterPersistRepositoryConfiguration(
"etcd", "my-namespace", "http://etcd1:2379,http://etcd2:2379", new Properties());
ClusterPersistRepository etcdRepository = TypedSPILoader.getService(ClusterPersistRepository.class, etcdConfig.getType(), etcdConfig.getProps());
etcdRepository.init(etcdConfig, computeNodeInstanceContext);