Implementation:Heibaiying BigData Notes TopologyBuilder Usage
Overview
| Property | Value |
|---|---|
| Type | Wrapper Doc |
| Primary Class | org.apache.storm.topology.TopologyBuilder |
| Source | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/LocalWordCountApp.java:L10-26 |
Description
This page documents the usage of TopologyBuilder as seen in the word-count application. The TopologyBuilder is Storm's primary API for constructing a topology DAG by registering Spouts and Bolts and defining the stream connections between them. Once all components are wired, the builder produces a StormTopology object that can be submitted to a cluster for execution.
Usage
The TopologyBuilder is used in the main() method of a Storm application to assemble the topology before submission. It is a builder-pattern API where components are added incrementally and the final topology is created with a single call.
Code Reference
Full Wiring Example from LocalWordCountApp
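import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// DataSourceSpout, SplitBolt and CountBolt are the application's own classes.
public class LocalWordCountApp {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        // SplitBolt consumes the stream emitted by DataSourceSpout.
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        // CountBolt consumes the stream emitted by SplitBolt.
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Run the topology in an in-process local cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountApp",
                new Config(), builder.createTopology());
    }
}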
API Breakdown
TopologyBuilder.setSpout(id, spout)
builder.setSpout("DataSourceSpout", new DataSourceSpout());
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique identifier for this Spout component within the topology |
| spout | IRichSpout | The Spout instance to register |
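Returns: SpoutDeclarer, which can be used to further configure the Spout (e.g., setting the number of tasks with setNumTasks). The parallelism hint itself is passed through the three-argument overload, setSpout(id, spout, parallelism_hint).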
TopologyBuilder.setBolt(id, bolt)
builder.setBolt("SplitBolt", new SplitBolt())
| Parameter | Type | Description |
|---|---|---|
| id | String | Unique identifier for this Bolt component within the topology |
| bolt | IRichBolt | The Bolt instance to register |
Returns: BoltDeclarer, which is used to specify input sources and stream groupings.
BoltDeclarer.shuffleGrouping(sourceId)
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
| Parameter | Type | Description |
|---|---|---|
| sourceId | String | The component ID of the upstream Spout or Bolt from which this Bolt receives tuples |
Configures shuffle grouping, which distributes tuples randomly and evenly across all tasks of this Bolt. Other grouping methods available on BoltDeclarer include:
- fieldsGrouping(sourceId, new Fields("fieldName")) -- Partition by field hash.
- allGrouping(sourceId) -- Broadcast to all tasks.
- globalGrouping(sourceId) -- Send all to single task with lowest ID.
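The routing rules of these groupings can be illustrated with a small plain-Java model. This is only a sketch: the class GroupingSketch and its methods are hypothetical names, task indices are assumed to run from 0 to numTasks - 1, and the hash-modulo rule is a simplification that is not claimed to match Storm's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of how each grouping picks target task indices; not Storm's actual code. */
final class GroupingSketch {

    /** fieldsGrouping: the same field value always maps to the same task. */
    static int fieldsTask(String fieldValue, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    /** allGrouping: every task receives a copy of the tuple. */
    static List<Integer> allTasks(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    /** globalGrouping: everything goes to the lowest task index (0 in this model). */
    static int globalTask() {
        return 0;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // The two fields lookups print the same task index.
        System.out.println("fields(storm) -> task " + fieldsTask("storm", numTasks));
        System.out.println("fields(storm) -> task " + fieldsTask("storm", numTasks));
        System.out.println("all           -> tasks " + allTasks(numTasks));
        System.out.println("global        -> task " + globalTask());
    }
}
```

The property that matters for word counting is the first one: repeated lookups of the same field value return the same task index, which shuffle grouping does not guarantee.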
TopologyBuilder.createTopology()
builder.createTopology()
Returns: StormTopology -- The fully constructed topology object, ready for submission to a cluster.
I/O Contract
| Component | Input Source | Grouping | Output |
|---|---|---|---|
| DataSourceSpout | (external data generation) | -- | Emits "line" field |
| SplitBolt | DataSourceSpout | shuffleGrouping | Emits "word" field |
| CountBolt | SplitBolt | shuffleGrouping | (terminal: prints to stdout) |
The resulting DAG is a simple linear chain:
DataSourceSpout --[shuffle]--> SplitBolt --[shuffle]--> CountBolt
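To make the data flow through this chain concrete, the following sketch reproduces the three stages in plain Java without any Storm classes. The class WordCountFlowSketch is hypothetical, and the whitespace delimiter is an assumption; the actual SplitBolt may split on a different separator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the spout -> split -> count chain; no Storm classes involved. */
final class WordCountFlowSketch {

    /** SplitBolt stage: one "line" value in, zero or more "word" values out. */
    static List<String> split(String line) {
        List<String> words = new ArrayList<>();
        // Assumption: words are whitespace-separated; the real delimiter may differ.
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    /** CountBolt stage: keeps a running total per word. */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {              // DataSourceSpout stage: one line at a time
            for (String word : split(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("storm spark", "storm flink storm");
        // Terminal stage: print the totals, as CountBolt does in the topology.
        System.out.println(count(lines));
    }
}
```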
Usage Examples
Example 1: Adding Fields Grouping for Accurate Counting
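With shuffleGrouping, occurrences of the same word can land on different CountBolt tasks once parallelism is greater than 1, so each task holds only a partial count. To ensure all tuples with the same word arrive at the same CountBolt task, use fieldsGrouping on the "word" field: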
// Requires: import org.apache.storm.tuple.Fields;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
// Route by the "word" field so each word is always counted by the same task.
builder.setBolt("CountBolt", new CountBolt()).fieldsGrouping("SplitBolt", new Fields("word"));
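The effect on the counts can be seen in a small simulation. This is a hedged sketch, not Storm code: PartitionedCountSketch is a hypothetical class, round-robin distribution stands in for shuffle grouping (which is random in Storm), and hash-modulo routing stands in for fields grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-task word counts; round-robin stands in for shuffle grouping. */
final class PartitionedCountSketch {

    /** Counts words in numTasks independent maps, one per simulated CountBolt task. */
    static List<Map<String, Integer>> countPerTask(List<String> words, int numTasks, boolean byField) {
        List<Map<String, Integer>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new HashMap<>());
        }
        int next = 0; // round-robin cursor, used only when byField is false
        for (String word : words) {
            int task = byField
                    ? Math.floorMod(word.hashCode(), numTasks) // same word -> same task
                    : next++ % numTasks;                       // spread regardless of word
            perTask.get(task).merge(word, 1, Integer::sum);
        }
        return perTask;
    }

    /** Number of tasks that hold a (possibly partial) count for the given word. */
    static long tasksHolding(List<Map<String, Integer>> perTask, String word) {
        return perTask.stream().filter(m -> m.containsKey(word)).count();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("storm", "storm", "storm", "storm");
        System.out.println("shuffle-like: " + countPerTask(words, 2, false));
        System.out.println("fields-like:  " + countPerTask(words, 2, true));
    }
}
```

In the shuffle-like run the four occurrences are split between the two simulated tasks, so neither task sees the full total. In the fields-like run a single task holds the complete count.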
Example 2: Fan-Out Topology
A single Spout can feed multiple Bolts:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("source", new DataSourceSpout());
builder.setBolt("processorA", new BoltA()).shuffleGrouping("source");
builder.setBolt("processorB", new BoltB()).shuffleGrouping("source");
Example 3: Fan-In Topology
A single Bolt can receive from multiple sources:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("source1", new SpoutA());
builder.setSpout("source2", new SpoutB());
builder.setBolt("merger", new MergerBolt())
        .shuffleGrouping("source1")
        .shuffleGrouping("source2");
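The fan-out and fan-in shapes can also be viewed as edges in a graph keyed by component ID. The sketch below is a toy adjacency model, not part of Storm: WiringSketch and its methods are hypothetical, and it only records which bolt subscribes to which source, ignoring the grouping type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy adjacency model of topology wiring; IDs mirror the examples above. */
final class WiringSketch {
    // bolt ID -> IDs of the upstream components it subscribes to
    private final Map<String, List<String>> inputs = new LinkedHashMap<>();

    /** Records that boltId consumes the stream of sourceId; returns this for chaining. */
    WiringSketch subscribe(String boltId, String sourceId) {
        inputs.computeIfAbsent(boltId, id -> new ArrayList<>()).add(sourceId);
        return this;
    }

    /** Fan-in view: all upstream components feeding the given bolt. */
    List<String> sourcesOf(String boltId) {
        return new ArrayList<>(inputs.getOrDefault(boltId, new ArrayList<>()));
    }

    /** Fan-out view: all bolts that subscribe to the given component. */
    List<String> consumersOf(String sourceId) {
        List<String> consumers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
            if (entry.getValue().contains(sourceId)) {
                consumers.add(entry.getKey());
            }
        }
        return consumers;
    }

    public static void main(String[] args) {
        WiringSketch wiring = new WiringSketch()
                .subscribe("processorA", "source")   // fan-out: two bolts, one source
                .subscribe("processorB", "source")
                .subscribe("merger", "source1")      // fan-in: one bolt, two sources
                .subscribe("merger", "source2");
        System.out.println("consumers of source: " + wiring.consumersOf("source"));
        System.out.println("sources of merger:   " + wiring.sourcesOf("merger"));
    }
}
```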
Related Pages
| Relationship | Page |
|---|---|
| implements | Heibaiying_BigData_Notes_Storm_Topology_Wiring |
| related | Heibaiying_BigData_Notes_DataSourceSpout_Implementation |
| related | Heibaiying_BigData_Notes_SplitBolt_and_CountBolt |
| related | Heibaiying_BigData_Notes_Storm_Parallelism_Config |