Implementation:Heibaiying BigData Notes Storm Topology Submission

From Leeroopedia


Overview

| Property | Value |
| --- | --- |
| Type | Wrapper Doc |
| Primary Classes | org.apache.storm.LocalCluster, org.apache.storm.StormSubmitter |
| Source (local) | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/LocalWordCountApp.java:L21-23 |
| Source (cluster) | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/ClusterWordCountApp.java:L25-29 |

Description

This page documents the two topology submission APIs demonstrated in the word-count application: LocalCluster for development testing and StormSubmitter for production cluster deployment. Both APIs accept the same three parameters (topology name, configuration, and topology object), making it straightforward to switch between local and remote deployment.

Usage

The submission call is typically the last step in a Storm application's main() method, after the topology has been constructed with TopologyBuilder.

Code Reference

Local Submission (LocalWordCountApp)

// Create a local in-process cluster for testing
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountApp",
        new Config(), builder.createTopology());

new LocalCluster()

| Aspect | Description |
| --- | --- |
| Constructor | Creates a simulated Storm cluster running entirely within the current JVM process |
| No arguments | Uses default configuration; no external Storm installation required |
| Threading | Spouts and Bolts execute in separate threads within the same JVM |

LocalCluster.submitTopology(name, conf, topology)

| Parameter | Type | Description |
| --- | --- | --- |
| name | String | A human-readable name for this topology instance (used in logging and UI) |
| conf | Map (typically Config) | Storm configuration (worker count, debug flags, etc.) |
| topology | StormTopology | The topology object produced by builder.createTopology() |

Behavior: Starts the topology immediately within the current JVM. The method returns after the topology begins executing. The topology continues running in background threads until the JVM exits or cluster.shutdown() is called.

Remote/Cluster Submission (ClusterWordCountApp)

try {
    StormSubmitter.submitTopology("ClusterWordCountApp",
            new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
    e.printStackTrace();
}

StormSubmitter.submitTopology(name, conf, topology)

| Parameter | Type | Description |
| --- | --- | --- |
| name | String | Unique name for this topology on the cluster (must not conflict with existing topologies) |
| conf | Map (typically Config) | Storm configuration including worker count and any custom settings |
| topology | StormTopology | The topology object produced by builder.createTopology() |

Behavior: Uploads the topology JAR to the Nimbus daemon, which distributes it to Supervisor nodes. Nimbus then assigns tasks to workers based on the parallelism configuration. The topology runs indefinitely until it is explicitly killed, for example with storm kill followed by the topology name.

Checked Exceptions:

| Exception | Cause |
| --- | --- |
| AlreadyAliveException | A topology with the same name is already running on the cluster |
| InvalidTopologyException | The topology DAG is malformed or contains invalid configuration |
| AuthorizationException | The submitting user lacks permission to submit topologies |

Command-Line Submission

For cluster deployment, the topology is submitted from the command line using the storm jar command:

storm jar /path/to/topology.jar com.heibaiying.wordcount.ClusterWordCountApp

This command performs three steps:

  1. Adds the JAR to the classpath.
  2. Invokes the main() method of the specified class.
  3. Runs the StormSubmitter.submitTopology() call inside that main() method, which uploads the topology to Nimbus.
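
The command line above can also be assembled programmatically, for instance from a deployment script. The following is a minimal sketch, not part of Storm or of the word-count project: StormJarCommand and its build() method are hypothetical names, and it assumes the storm client script is on the PATH. It only builds and prints the argument list, so it runs without a Storm installation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Storm): assembles the argument list for `storm jar`.
final class StormJarCommand {
    private StormJarCommand() {}

    static List<String> build(String jarPath, String mainClass, String... topologyArgs) {
        List<String> command = new ArrayList<>();
        command.add("storm");   // assumption: the storm client script is on PATH
        command.add("jar");
        command.add(jarPath);
        command.add(mainClass);
        for (String arg : topologyArgs) {
            command.add(arg);   // extra arguments are passed to the main class as args[]
        }
        return command;
    }

    public static void main(String[] args) {
        List<String> command = build("/path/to/topology.jar",
                "com.heibaiying.wordcount.ClusterWordCountApp");
        // Print instead of executing, so the sketch runs without Storm installed.
        System.out.println(String.join(" ", command));
        // To launch for real: new ProcessBuilder(command).inheritIO().start();
    }
}
```

Any arguments placed after the main class name reach the topology's main() method as args, which is what a conditional local/cluster main() relies on to receive a topology name.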

I/O Contract

The submission APIs do not alter the topology's data flow. They control where and how the topology executes:

| API | Execution Environment | Persistence | Use Case |
| --- | --- | --- | --- |
| LocalCluster.submitTopology() | In-process (single JVM) | Terminates with JVM | Development and testing |
| StormSubmitter.submitTopology() | Distributed cluster | Runs indefinitely | Production deployment |

Usage Examples

Example 1: Local Mode with Shutdown

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TestTopology", new Config(), builder.createTopology());

// Let it run for 30 seconds, then shut down
Thread.sleep(30000);
cluster.killTopology("TestTopology");
cluster.shutdown();
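
In Example 1 the cleanup calls are skipped if Thread.sleep() is interrupted. One way to guarantee cleanup is a try/finally wrapper. The sketch below is an illustration only: BoundedRun and sleepThenClose() are hypothetical names, not Storm APIs, and the demo uses a printing stand-in for the cluster so it runs without Storm.

```java
// Hypothetical helper (not part of Storm): waits for a fixed time, then always cleans up.
final class BoundedRun {
    private BoundedRun() {}

    static void sleepThenClose(long millis, AutoCloseable cleanup) throws Exception {
        try {
            Thread.sleep(millis);   // give the topology time to process tuples
        } finally {
            cleanup.close();        // runs even if the sleep is interrupted
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real cleanup so the sketch runs without Storm.
        sleepThenClose(100, () -> System.out.println("cluster shut down"));
        // With the variables from Example 1, the call would look like:
        // sleepThenClose(30000, () -> {
        //     cluster.killTopology("TestTopology");
        //     cluster.shutdown();
        // });
    }
}
```

Taking an AutoCloseable keeps the helper independent of the Storm version, since any cleanup action can be passed as a lambda.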

Example 2: Cluster Mode with Custom Configuration

Config config = new Config();
config.setNumWorkers(4);
config.setDebug(false);
config.setMaxSpoutPending(5000);

try {
    StormSubmitter.submitTopology("ProductionWordCount", config, builder.createTopology());
} catch (AlreadyAliveException e) {
    System.err.println("Topology already running. Kill it first: storm kill ProductionWordCount");
} catch (InvalidTopologyException e) {
    System.err.println("Invalid topology: " + e.getMessage());
} catch (AuthorizationException e) {
    System.err.println("Not authorized to submit topology");
}
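
Because the conf parameter is a Map, the three setters in Example 2 can be read as plain key-value entries. The sketch below shows that view with only the standard library. The key strings are the values I understand Config.TOPOLOGY_WORKERS, Config.TOPOLOGY_DEBUG, and Config.TOPOLOGY_MAX_SPOUT_PENDING to resolve to; treat them as an assumption and prefer the Config setters or constants in real code. PlainConfSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: the Example 2 settings expressed as plain map entries.
final class PlainConfSketch {
    private PlainConfSketch() {}

    static Map<String, Object> productionConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.workers", 4);               // config.setNumWorkers(4)
        conf.put("topology.debug", false);             // config.setDebug(false)
        conf.put("topology.max.spout.pending", 5000);  // config.setMaxSpoutPending(5000)
        return conf;
    }

    public static void main(String[] args) {
        // TreeMap only to print the entries in a stable, sorted order.
        new TreeMap<>(productionConf())
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```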

Example 3: Conditional Local/Cluster Submission

// Assumes the enclosing main() declares "throws Exception"; otherwise wrap the
// StormSubmitter call in the try/catch shown in Example 2.
if (args != null && args.length > 0) {
    // Cluster mode: topology name passed as argument
    StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
    // Local mode: no arguments
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("LocalTest", config, builder.createTopology());
}
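
The branch in Example 3 can be factored into a small pure function so the mode decision is unit-testable without starting Storm. This is a sketch under that assumption: SubmissionPlan, Mode, and fromArgs() are hypothetical names introduced here, and the fallback name "LocalTest" is taken from Example 3.

```java
// Hypothetical helper (not part of Storm): decides where and under which name to submit.
final class SubmissionPlan {
    enum Mode { LOCAL, CLUSTER }

    final Mode mode;
    final String topologyName;

    private SubmissionPlan(Mode mode, String topologyName) {
        this.mode = mode;
        this.topologyName = topologyName;
    }

    // Same rule as Example 3: a first argument means cluster mode and supplies the name.
    static SubmissionPlan fromArgs(String[] args) {
        if (args != null && args.length > 0) {
            return new SubmissionPlan(Mode.CLUSTER, args[0]);
        }
        return new SubmissionPlan(Mode.LOCAL, "LocalTest");
    }

    public static void main(String[] args) {
        SubmissionPlan plan = fromArgs(args);
        System.out.println(plan.mode + " submission as " + plan.topologyName);
        // A real main() would now call StormSubmitter.submitTopology(...) for CLUSTER
        // or LocalCluster.submitTopology(...) for LOCAL, using plan.topologyName.
    }
}
```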

Related Pages

| Relationship | Page |
| --- | --- |
| implements | Heibaiying_BigData_Notes_Storm_Topology_Deployment |
| related | Heibaiying_BigData_Notes_TopologyBuilder_Usage |
| related | Heibaiying_BigData_Notes_Storm_Parallelism_Config |
| related | Heibaiying_BigData_Notes_Maven_Packaging_for_Storm |
