Implementation:Heibaiying BigData Notes Storm Topology Submission
Overview
| Property | Value |
|---|---|
| Type | Wrapper Doc |
| Primary Classes | org.apache.storm.LocalCluster, org.apache.storm.StormSubmitter
|
| Source (local) | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/LocalWordCountApp.java:L21-23
|
| Source (cluster) | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java:L25-29
|
Description
This page documents the two topology submission APIs demonstrated in the word-count application: LocalCluster for development testing and StormSubmitter for production cluster deployment. Both APIs accept the same three parameters (topology name, configuration, and topology object), making it straightforward to switch between local and remote deployment.
Usage
The submission call is typically the last step in a Storm application's main() method, after the topology has been constructed with TopologyBuilder.
Code Reference
Local Submission (LocalWordCountApp)
// Create a local in-process cluster for testing
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountApp",
new Config(), builder.createTopology());
new LocalCluster()
| Aspect | Description |
|---|---|
| Constructor | Creates a simulated Storm cluster running entirely within the current JVM process |
| No arguments | Uses default configuration; no external Storm installation required |
| Threading | Spouts and Bolts execute in separate threads within the same JVM |
LocalCluster.submitTopology(name, conf, topology)
| Parameter | Type | Description |
|---|---|---|
name |
String | A human-readable name for this topology instance (used in logging and UI) |
conf |
Map (typically Config) | Storm configuration (worker count, debug flags, etc.) |
topology |
StormTopology | The topology object produced by builder.createTopology()
|
Behavior: Starts the topology immediately within the current JVM. The method returns after the topology begins executing. The topology continues running in background threads until the JVM exits or cluster.shutdown() is called.
Remote/Cluster Submission (ClusterWordCountApp)
try {
StormSubmitter.submitTopology("ClusterWordCountApp",
new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
StormSubmitter.submitTopology(name, conf, topology)
| Parameter | Type | Description |
|---|---|---|
name |
String | Unique name for this topology on the cluster (must not conflict with existing topologies) |
conf |
Map (typically Config) | Storm configuration including worker count and any custom settings |
topology |
StormTopology | The topology object produced by builder.createTopology()
|
Behavior: Uploads the topology JAR to the Nimbus daemon, which distributes it to Supervisor nodes. Nimbus then assigns tasks to workers based on the parallelism configuration. The topology runs indefinitely until explicitly killed.
Checked Exceptions:
| Exception | Cause |
|---|---|
AlreadyAliveException |
A topology with the same name is already running on the cluster |
InvalidTopologyException |
The topology DAG is malformed or contains invalid configuration |
AuthorizationException |
The submitting user lacks permission to submit topologies |
Command-Line Submission
For cluster deployment, the topology is submitted from the command line using the storm jar command:
storm jar /path/to/topology.jar com.heibaiying.wordcount.ClusterWordCountApp
This command:
- Adds the JAR to the classpath.
- Invokes the specified main class.
- The main class calls
StormSubmitter.submitTopology()to upload the topology to Nimbus.
I/O Contract
The submission APIs do not alter the topology's data flow. They control where and how the topology executes:
| API | Execution Environment | Persistence | Use Case |
|---|---|---|---|
LocalCluster.submitTopology() |
In-process (single JVM) | Terminates with JVM | Development and testing |
StormSubmitter.submitTopology() |
Distributed cluster | Runs indefinitely | Production deployment |
Usage Examples
Example 1: Local Mode with Shutdown
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TestTopology", new Config(), builder.createTopology());
// Let it run for 30 seconds, then shut down
Thread.sleep(30000);
cluster.killTopology("TestTopology");
cluster.shutdown();
Example 2: Cluster Mode with Custom Configuration
Config config = new Config();
config.setNumWorkers(4);
config.setDebug(false);
config.setMaxSpoutPending(5000);
try {
StormSubmitter.submitTopology("ProductionWordCount", config, builder.createTopology());
} catch (AlreadyAliveException e) {
System.err.println("Topology already running. Kill it first: storm kill ProductionWordCount");
} catch (InvalidTopologyException e) {
System.err.println("Invalid topology: " + e.getMessage());
} catch (AuthorizationException e) {
System.err.println("Not authorized to submit topology");
}
Example 3: Conditional Local/Cluster Submission
if (args != null && args.length > 0) {
// Cluster mode: topology name passed as argument
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
// Local mode: no arguments
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalTest", config, builder.createTopology());
}
Related Pages
| Relationship | Page |
|---|---|
| implements | Heibaiying_BigData_Notes_Storm_Topology_Deployment |
| related | Heibaiying_BigData_Notes_TopologyBuilder_Usage |
| related | Heibaiying_BigData_Notes_Storm_Parallelism_Config |
| related | Heibaiying_BigData_Notes_Maven_Packaging_for_Storm |