
Implementation:Heibaiying BigData Notes Storm Parallelism Config

From Leeroopedia


Overview

Property        Value
Type            Wrapper Doc
Primary Class   org.apache.storm.Config
Source          code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java:L14-32

Description

This page documents the APIs used to configure parallelism in a Storm topology, as demonstrated in the ClusterWordCountApp class. Storm exposes three levels of parallelism control -- workers (JVM processes), executors (threads), and tasks (logical instances) -- each set through a different API call. Configuring these correctly is essential for good throughput and resource utilization in a production deployment.

Usage

Parallelism configuration is applied during topology construction, before submission to the cluster. The configuration is set on the Config object (for worker count) and on the TopologyBuilder declarations (for executor and task counts).

Code Reference

ClusterWordCountApp Source

public class ClusterWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        try {
            StormSubmitter.submitTopology("ClusterWordCountApp",
                    new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }
}

API Breakdown

config.setNumWorkers(int)

Config config = new Config();
config.setNumWorkers(2);
Parameter Type Description
numWorkers int The number of worker JVM processes to allocate for this topology across the cluster

Effect: Storm distributes the topology's executors across the specified number of worker processes. Each worker runs in its own JVM, potentially on different cluster nodes. The default is 1 worker.

Scope: Topology-wide. This setting applies to all components in the topology.

builder.setBolt(id, bolt, parallelism_hint)

builder.setBolt("SplitBolt", new SplitBolt(), 4).shuffleGrouping("DataSourceSpout");
Parameter Type Description
id String Unique identifier for this Bolt component
bolt IRichBolt The Bolt instance to register
parallelism_hint int The initial number of executors (threads) to allocate for this Bolt

Effect: Creates the specified number of executor threads for this Bolt. Each executor independently calls execute() on incoming tuples. The default parallelism hint is 1.

Note: The same overload exists for setSpout(id, spout, parallelism_hint).

declarer.setNumTasks(int)

builder.setBolt("CountBolt", new CountBolt(), 4)
       .setNumTasks(8)
       .shuffleGrouping("SplitBolt");
Parameter Type Description
numTasks int The total number of logical task instances for this component

Effect: Sets the number of tasks (logical instances) for this component. Tasks are distributed among the executors. If numTasks is 8 and there are 4 executors, each executor runs 2 tasks. The default is one task per executor.

Runtime rebalancing: The task count is fixed at submission time, but the executor count can be changed at runtime via storm rebalance. Setting numTasks higher than the initial executor count allows scaling up executors without restarting the topology.
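
The task-to-executor arithmetic above can be sketched in plain Java (no Storm dependency). The even round-robin split below is an illustration of how tasks spread over executors, not Storm's internal scheduling code:

```java
// Illustrative sketch: 8 tasks over 4 executors gives 2 tasks each;
// after rebalancing to 8 executors, each executor runs 1 task.
public class TaskDistributionSketch {

    /** Assigns tasks to executors round-robin; remainder tasks land on the first executors. */
    static int[] distribute(int numTasks, int numExecutors) {
        int[] perExecutor = new int[numExecutors];
        for (int task = 0; task < numTasks; task++) {
            perExecutor[task % numExecutors]++; // round-robin assignment
        }
        return perExecutor;
    }

    public static void main(String[] args) {
        // Initial submission: setNumTasks(8) with parallelism hint 4.
        System.out.println(java.util.Arrays.toString(distribute(8, 4))); // [2, 2, 2, 2]
        // After `storm rebalance` raises the executor count to 8.
        System.out.println(java.util.Arrays.toString(distribute(8, 8))); // [1, 1, 1, 1, 1, 1, 1, 1]
    }
}
```

This is why setting numTasks above the initial parallelism hint leaves headroom: the fixed pool of tasks can simply be redealt over more executors later.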

I/O Contract

This configuration does not change the data flow or tuple schema. It only affects the physical execution plan:

Configuration Default Effect
setNumWorkers(n) 1 Number of JVM processes across the cluster
parallelism_hint 1 Number of executor threads per component
setNumTasks(n) (equals parallelism_hint) Number of logical instances per component

Usage Examples

Example 1: Basic Parallelism Configuration

Config config = new Config();
config.setNumWorkers(2);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new DataSourceSpout(), 2);
builder.setBolt("split", new SplitBolt(), 4).shuffleGrouping("spout");
builder.setBolt("count", new CountBolt(), 4).fieldsGrouping("split", new Fields("word"));

StormSubmitter.submitTopology("WordCount", config, builder.createTopology());

This configures:

  • 2 workers -- Two JVM processes on the cluster.
  • 2 executors for the Spout -- Two threads producing tuples in parallel.
  • 4 executors for SplitBolt -- Four threads splitting lines concurrently.
  • 4 executors for CountBolt -- Four threads counting words (fields grouping ensures each word always reaches the same task).
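
As a back-of-envelope check on Example 1 (plain Java, no Storm dependency): the topology declares 2 + 4 + 4 = 10 executors, and with 2 workers Storm spreads them evenly, so each worker JVM hosts about 5 executor threads:

```java
// Back-of-envelope check for Example 1: total executors and executors per worker.
public class ParallelismMath {

    static int totalExecutors(int... parallelismHints) {
        int total = 0;
        for (int hint : parallelismHints) {
            total += hint;
        }
        return total;
    }

    public static void main(String[] args) {
        int workers = 2;                      // config.setNumWorkers(2)
        int total = totalExecutors(2, 4, 4);  // spout + split + count
        System.out.println(total);            // 10
        System.out.println(total / workers);  // 5 executors per worker JVM
    }
}
```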

Example 2: Configuring Tasks for Future Rebalancing

Config config = new Config();
config.setNumWorkers(3);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new DataSourceSpout(), 2).setNumTasks(4);
builder.setBolt("split", new SplitBolt(), 4).setNumTasks(8).shuffleGrouping("spout");
builder.setBolt("count", new CountBolt(), 4).setNumTasks(16).fieldsGrouping("split", new Fields("word"));

StormSubmitter.submitTopology("WordCount", config, builder.createTopology());

Later, the executor count can be increased without restart:

storm rebalance WordCount -n 4 -e spout=4 -e split=8 -e count=16

Example 3: Viewing Parallelism in Storm UI

After submission, the Storm UI displays:

  • Number of workers allocated
  • Number of executors and tasks per component
  • Capacity metric for each component (values near 1.0 indicate a bottleneck)
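
The capacity metric can be approximated as executed tuple count times average execute latency, divided by the measurement window. The sketch below is a hedged illustration of that commonly cited formula, not Storm's implementation; consult the Storm UI documentation for the authoritative definition:

```java
// Sketch of the Storm UI "capacity" metric: roughly the fraction of the
// measurement window a bolt's executors spent inside execute().
// Values near 1.0 suggest the component is saturated and needs more executors.
public class CapacitySketch {

    static double capacity(long executedCount, double avgExecuteLatencyMs, long windowMs) {
        return (executedCount * avgExecuteLatencyMs) / windowMs;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 500,000 tuples at 1 ms each over a 10-minute window.
        System.out.println(capacity(500_000, 1.0, 600_000)); // ~0.83, approaching a bottleneck
    }
}
```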

Related Pages

Relationship Page
implements Heibaiying_BigData_Notes_Storm_Parallelism_Configuration
related Heibaiying_BigData_Notes_TopologyBuilder_Usage
related Heibaiying_BigData_Notes_Storm_Topology_Submission
