# Implementation: Heibaiying BigData Notes Storm Parallelism Config

## Overview
| Property | Value |
|---|---|
| Type | Wrapper Doc |
| Primary Class | org.apache.storm.Config |
| Source | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java:L14-32 |
## Description
This page documents the APIs used to configure parallelism in a Storm topology, as demonstrated in the ClusterWordCountApp class. Storm provides three levels of parallelism control -- workers, executors, and tasks -- each configured through a different API call. Proper configuration of these parameters is essential for achieving optimal throughput and resource utilization in a production deployment.
## Usage
Parallelism configuration is applied during topology construction, before submission to the cluster. The configuration is set on the Config object (for worker count) and on the TopologyBuilder declarations (for executor and task counts).
## Code Reference

### ClusterWordCountApp Source

```java
public class ClusterWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp",
                    new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }
}
```

Note that as written, this class relies on the defaults throughout (one worker, and one executor with one task per component); the sections below show how each parallelism level is configured explicitly.
## API Breakdown

### config.setNumWorkers(int)

```java
Config config = new Config();
config.setNumWorkers(2);
```
| Parameter | Type | Description |
|---|---|---|
| numWorkers | int | The number of worker JVM processes to allocate for this topology across the cluster |
Effect: Storm distributes the topology's executors across the specified number of worker processes. Each worker runs in its own JVM, potentially on different cluster nodes. The default is 1 worker.
Scope: Topology-wide. This setting applies to all components in the topology.
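To make the even spread of executors over workers concrete, here is a plain-Java sketch with no Storm dependency. The round-robin assignment below is only an illustration of the default scheduler's even distribution, not Storm's actual scheduler code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkerDistribution {

    // Spread executor IDs over worker slots round-robin, illustrating how the
    // default scheduler balances a topology's executors evenly across workers.
    static Map<Integer, List<String>> assign(List<String> executors, int numWorkers) {
        Map<Integer, List<String>> workers = new HashMap<>();
        for (int i = 0; i < executors.size(); i++) {
            workers.computeIfAbsent(i % numWorkers, w -> new ArrayList<>())
                   .add(executors.get(i));
        }
        return workers;
    }

    public static void main(String[] args) {
        List<String> executors = new ArrayList<>();
        for (int i = 0; i < 10; i++) executors.add("executor-" + i);

        // 10 executors over 2 workers -> 5 executor threads per worker JVM
        Map<Integer, List<String>> plan = assign(executors, 2);
        plan.forEach((w, ex) -> System.out.println("worker-" + w + ": " + ex.size() + " executors"));
    }
}
```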
### builder.setBolt(id, bolt, parallelism_hint)

```java
builder.setBolt("SplitBolt", new SplitBolt(), 4).shuffleGrouping("DataSourceSpout");
```
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique identifier for this Bolt component |
| bolt | IRichBolt | The Bolt instance to register |
| parallelism_hint | int | The initial number of executors (threads) to allocate for this Bolt |
Effect: Creates the specified number of executor threads for this Bolt. Each executor independently calls execute() on incoming tuples. The default parallelism hint is 1.
Note: The same overload exists for setSpout(id, spout, parallelism_hint).
### declarer.setNumTasks(int)

```java
builder.setBolt("CountBolt", new CountBolt(), 4)
        .setNumTasks(8)
        .shuffleGrouping("SplitBolt");
```
| Parameter | Type | Description |
|---|---|---|
| numTasks | int | The total number of logical task instances for this component |
Effect: Sets the number of tasks (logical instances) for this component. Tasks are distributed among the executors. If numTasks is 8 and there are 4 executors, each executor runs 2 tasks. The default is one task per executor.
Runtime rebalancing: The task count is fixed at submission time, but the executor count can be changed at runtime via storm rebalance. Setting numTasks higher than the initial executor count allows scaling up executors without restarting the topology.
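A fixed task count is what keeps a fields grouping stable across rebalances: conceptually, the grouping routes each tuple by hashing the grouping fields modulo the task count, so the key-to-task mapping depends only on the key and the (fixed) number of tasks, not on how many executors currently run them. A plain-Java sketch of that idea (illustrative only, not Storm's internal routing code):

```java
public class FieldsGroupingSketch {

    // Route a key to one of numTasks task instances by hashing, the way a
    // fields grouping keeps the same key arriving at the same task.
    static int taskFor(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 16; // fixed at submission time via setNumTasks(16)

        // The mapping depends only on the key and the task count, so a
        // rebalance that grows executors from 4 to 8 moves tasks between
        // threads but never moves a key to a different task.
        System.out.println("'storm'  -> task " + taskFor("storm", numTasks));
        System.out.println("'hadoop' -> task " + taskFor("hadoop", numTasks));
    }
}
```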
## I/O Contract
This configuration does not change the data flow or tuple schema. It only affects the physical execution plan:
| Configuration | Default | Effect |
|---|---|---|
setNumWorkers(n) |
1 | Number of JVM processes across the cluster |
parallelism_hint |
1 | Number of executor threads per component |
setNumTasks(n) |
(equals parallelism_hint) | Number of logical instances per component |
## Usage Examples

### Example 1: Basic Parallelism Configuration

```java
Config config = new Config();
config.setNumWorkers(2);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new DataSourceSpout(), 2);
builder.setBolt("split", new SplitBolt(), 4).shuffleGrouping("spout");
builder.setBolt("count", new CountBolt(), 4).fieldsGrouping("split", new Fields("word"));

StormSubmitter.submitTopology("WordCount", config, builder.createTopology());
```
This configures:
- 2 workers -- two JVM processes on the cluster.
- 2 executors for the Spout -- two threads producing tuples in parallel.
- 4 executors for SplitBolt -- four threads splitting lines concurrently.
- 4 executors for CountBolt -- four threads counting words (fields grouping ensures each word always reaches the same task).

In total the topology runs 2 + 4 + 4 = 10 executors, spread across the 2 worker JVMs -- on average 5 threads per worker.
### Example 2: Configuring Tasks for Future Rebalancing

```java
Config config = new Config();
config.setNumWorkers(3);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new DataSourceSpout(), 2).setNumTasks(4);
builder.setBolt("split", new SplitBolt(), 4).setNumTasks(8).shuffleGrouping("spout");
builder.setBolt("count", new CountBolt(), 4).setNumTasks(16).fieldsGrouping("split", new Fields("word"));

StormSubmitter.submitTopology("WordCount", config, builder.createTopology());
```
Later, the executor counts can be increased without restarting the topology. With `storm rebalance`, `-n` sets the new worker count and each `-e component=count` sets that component's executor count, which may not exceed its task count:

```shell
storm rebalance WordCount -n 4 -e spout=4 -e split=8 -e count=16
```
### Example 3: Viewing Parallelism in Storm UI
After submission, the Storm UI displays:
- Number of workers allocated
- Number of executors and tasks per component
- Capacity metric for each component (values near 1.0 indicate a bottleneck)
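The capacity figure can be reasoned about offline: it is commonly described as executed-tuple count times average execute latency, divided by the measurement window, i.e. the fraction of the window the executor spent inside execute(). A sketch of that arithmetic, under that assumption (the helper and the numbers below are illustrative, not Storm API):

```java
public class CapacitySketch {

    // Approximate the Storm UI "capacity" metric: the fraction of the
    // measurement window an executor spent executing tuples.
    static double capacity(long executed, double executeLatencyMs, long windowMs) {
        return (executed * executeLatencyMs) / windowMs;
    }

    public static void main(String[] args) {
        // Hypothetical reading: 500,000 tuples at 1.1 ms average execute
        // latency over a 10-minute (600,000 ms) window.
        double c = capacity(500_000, 1.1, 600_000);
        // A value above ~0.9 suggests the component is near saturation
        // and is a candidate for a higher parallelism hint.
        System.out.println("capacity = " + c);
    }
}
```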
## Related Pages
| Relationship | Page |
|---|---|
| implements | Heibaiying_BigData_Notes_Storm_Parallelism_Configuration |
| related | Heibaiying_BigData_Notes_TopologyBuilder_Usage |
| related | Heibaiying_BigData_Notes_Storm_Topology_Submission |