Implementation:Apache Dolphinscheduler MasterClusters And WorkerClusters
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Systems, Cluster_Management |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
MasterClusters, WorkerClusters, MasterSlotManager, and ClusterManager together track master and worker node health through registry-based heartbeat subscriptions.
Description
MasterClusters extends AbstractClusterSubscribeListener<MasterServerMetadata> and implements IClusters<MasterServerMetadata>. It maintains a map of active master nodes and exposes getServers(), getNormalServers(), and registerListener(...) for change notifications. WorkerClusters extends AbstractClusterSubscribeListener<WorkerServerMetadata> and additionally tracks worker groups through getNormalWorkerServerAddressByGroup(String) and containsWorkerGroup(String). MasterSlotManager implements IMasterSlotReBalancer and provides getCurrentMasterSlot(), getTotalMasterSlots(), checkSlotValid(), and doReBalance(). ClusterManager initializes both cluster objects in start().
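The add/remove/update flow described above can be illustrated with a minimal, self-contained sketch. It does not reproduce the DolphinScheduler classes: `ServerInfo`, `ChangeListener`, and `ClusterSketch` are hypothetical stand-ins, and the `busy` flag is an assumed simplification of whatever status the real metadata carries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

// Hypothetical stand-in for MasterServerMetadata / WorkerServerMetadata.
final class ServerInfo {
    final String address;
    final boolean busy; // assumed simplification of the real server status

    ServerInfo(String address, boolean busy) {
        this.address = address;
        this.busy = busy;
    }
}

// Hypothetical listener, loosely modelled on IClustersChangeListener.
interface ChangeListener {
    void onChange(List<ServerInfo> currentServers);
}

// Sketch of a cluster view kept in sync by registry events.
final class ClusterSketch {
    private final Map<String, ServerInfo> servers = new ConcurrentHashMap<>();
    private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();

    void registerListener(ChangeListener listener) {
        listeners.add(listener);
    }

    // ADD and UPDATE both store the latest metadata under the server address.
    void onServerAdded(ServerInfo server) {
        servers.put(server.address, server);
        notifyListeners();
    }

    void onServerUpdate(ServerInfo server) {
        servers.put(server.address, server);
        notifyListeners();
    }

    void onServerRemove(ServerInfo server) {
        servers.remove(server.address);
        notifyListeners();
    }

    List<ServerInfo> getServers() {
        return new ArrayList<>(servers.values());
    }

    // "Normal" here means "not busy"; the real criterion is an assumption.
    List<ServerInfo> getNormalServers() {
        return servers.values().stream()
                .filter(s -> !s.busy)
                .collect(Collectors.toList());
    }

    // Every listener receives a snapshot, so later events cannot mutate it.
    private void notifyListeners() {
        List<ServerInfo> snapshot = getServers();
        listeners.forEach(l -> l.onChange(snapshot));
    }
}
```

Keying the map by address means a repeated heartbeat for the same node overwrites the earlier entry rather than creating a duplicate.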
Usage
These classes are managed automatically by the master server's Spring context. ClusterManager.start() is called during master bootstrap.
Code Reference
Source Location
- Repository: dolphinscheduler
- File: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java (L37-98)
- File: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java (L38-182)
- File: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java (L31-62)
- File: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java (L34-87)
Signature
public class MasterClusters extends AbstractClusterSubscribeListener<MasterServerMetadata>
        implements IClusters<MasterServerMetadata> {
    public List<MasterServerMetadata> getServers();
    public List<MasterServerMetadata> getNormalServers();
    public void registerListener(IClustersChangeListener<MasterServerMetadata> listener);
    @Override protected void onServerAdded(MasterServerMetadata server);
    @Override protected void onServerRemove(MasterServerMetadata server);
    @Override protected void onServerUpdate(MasterServerMetadata server);
}

public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServerMetadata>
        implements IClusters<WorkerServerMetadata> {
    public List<String> getNormalWorkerServerAddressByGroup(String workerGroup);
    public boolean containsWorkerGroup(String workerGroup);
    public void registerListener(IClustersChangeListener<WorkerServerMetadata> listener);
}

public class MasterSlotManager implements IMasterSlotReBalancer {
    public int getCurrentMasterSlot();
    public int getTotalMasterSlots();
    public boolean checkSlotValid();
    public void doReBalance(List<MasterServerMetadata> masters);
}

@Component
public class ClusterManager {
    public void start();
}
Import
import org.apache.dolphinscheduler.server.master.cluster.MasterClusters;
import org.apache.dolphinscheduler.server.master.cluster.WorkerClusters;
import org.apache.dolphinscheduler.server.master.cluster.MasterSlotManager;
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Registry heartbeats | JSON events | Yes | ADD/REMOVE/UPDATE events from service registry |
Outputs
| Name | Type | Description |
|---|---|---|
| Master topology | List<MasterServerMetadata> | Live list of active master nodes |
| Worker topology | Map<String, List<String>> | Worker group to address mapping |
| Slot assignment | int, int | Current slot index and total slot count, used for work distribution |
Usage Examples
Querying Worker Availability
// Check whether the worker group is known to the cluster
boolean available = workerClusters.containsWorkerGroup("data-team");
// Get the addresses of the workers in that group that are in a normal state
List<String> workers = workerClusters.getNormalWorkerServerAddressByGroup("data-team");
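A group-to-address mapping like the one queried above could be kept in a structure along the following lines. This is only a sketch under stated assumptions: `WorkerGroupIndexSketch` and its methods are hypothetical, and the choice to keep an empty group registered after its last worker leaves is an assumption, not confirmed behaviour of WorkerClusters.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical group-to-address index; not the WorkerClusters implementation.
final class WorkerGroupIndexSketch {
    private final Map<String, List<String>> addressesByGroup = new ConcurrentHashMap<>();

    // Registers a worker address under its group, creating the group on first use.
    void addWorker(String workerGroup, String address) {
        addressesByGroup
                .computeIfAbsent(workerGroup, g -> new CopyOnWriteArrayList<>())
                .add(address);
    }

    // Removes the address but keeps the (possibly empty) group entry.
    void removeWorker(String workerGroup, String address) {
        List<String> addresses = addressesByGroup.get(workerGroup);
        if (addresses != null) {
            addresses.remove(address);
        }
    }

    boolean containsWorkerGroup(String workerGroup) {
        return addressesByGroup.containsKey(workerGroup);
    }

    // Returns a read-only view; unknown groups yield an empty list, never null.
    List<String> getAddresses(String workerGroup) {
        return Collections.unmodifiableList(
                addressesByGroup.getOrDefault(workerGroup, Collections.emptyList()));
    }
}
```

Under these assumptions a group can exist while having no reachable workers, so a caller that needs a dispatch target would check that the returned list is non-empty rather than rely on the group lookup alone.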
Checking Slot Assignment
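// Check if this master should process a given workflow
int slot = masterSlotManager.getCurrentMasterSlot();
int total = masterSlotManager.getTotalMasterSlots();
// Guard against an empty slot table, and use floorMod so negative codes map to a valid slot
boolean shouldProcess = total > 0 && Math.floorMod(workflowCode, total) == slot;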
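The modulus check above can be worked through end to end with a small standalone sketch. It assumes, without confirmation from the source, that slots follow the sorted order of master addresses; `SlotSketch`, its `selfAddress` parameter, and `shouldProcess` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical slot calculator; not the MasterSlotManager implementation.
final class SlotSketch {
    private int currentSlot = -1;
    private int totalSlots = 0;

    // Assumption: slots follow the sorted order of master addresses.
    synchronized void doReBalance(List<String> masterAddresses, String selfAddress) {
        List<String> sorted = new ArrayList<>(masterAddresses);
        Collections.sort(sorted);
        totalSlots = sorted.size();
        currentSlot = sorted.indexOf(selfAddress); // -1 if this master is not listed
    }

    synchronized int getCurrentSlot() {
        return currentSlot;
    }

    synchronized int getTotalSlots() {
        return totalSlots;
    }

    synchronized boolean checkSlotValid() {
        return totalSlots > 0 && currentSlot >= 0 && currentSlot < totalSlots;
    }

    // floorMod keeps the result in [0, totalSlots) even for negative codes.
    synchronized boolean shouldProcess(long workflowCode) {
        return checkSlotValid()
                && Math.floorMod(workflowCode, (long) totalSlots) == currentSlot;
    }
}
```

With three masters and this node sorted second, the node holds slot 1 of 3 and would claim workflow codes 1, 4, 7, and so on. A master missing from the list gets an invalid slot and claims nothing until the next rebalance.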