Implementation: Heibaiying BigData Notes SplitBolt and CountBolt
Overview
| Property | Value |
|---|---|
| Type | API Doc |
| Classes | com.heibaiying.wordcount.component.SplitBolt, com.heibaiying.wordcount.component.CountBolt |
| Extends | org.apache.storm.topology.base.BaseRichBolt |
| Source (SplitBolt) | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/SplitBolt.java:L13-35 |
| Source (CountBolt) | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/CountBolt.java:L12-40 |
Description
SplitBolt and CountBolt are two complementary Bolt implementations that form the processing pipeline in the word-count topology. Together with the DataSourceSpout, they demonstrate the classic MapReduce-style pattern in a streaming context:
- SplitBolt acts as the mapper: it receives tab-separated word lines from the Spout and splits them into individual words, emitting one tuple per word.
- CountBolt acts as the reducer: it receives individual words and maintains a running count for each word in an in-memory HashMap, printing the accumulated results.
Both classes extend BaseRichBolt and implement the three required lifecycle methods: prepare(), execute(), and declareOutputFields().
Usage
These Bolts are wired into the topology using TopologyBuilder.setBolt() with appropriate stream groupings:
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
```
The data flow is: DataSourceSpout (emits "line") -> SplitBolt (emits "word") -> CountBolt (terminal, prints counts).
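Outside a running cluster, this map/reduce-style flow can be simulated in plain Java with no Storm dependency. The class and method names below are illustrative, not part of the source project:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the SplitBolt -> CountBolt pipeline: split each
// tab-separated line, then accumulate a running count per word.
public class PipelineSketch {

    public static Map<String, Integer> process(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\t")) {   // SplitBolt step
                counts.merge(word, 1, Integer::sum); // CountBolt step
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(process("Spark\tHadoop\tStorm", "Spark\tFlink"));
    }
}
```

In the real topology these two steps run in separate Bolt tasks connected by a stream grouping; the sketch only mirrors the per-tuple logic.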
Code Reference
SplitBolt
Class Declaration
```java
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;
```
prepare()
```java
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}
```
Stores the OutputCollector reference for emitting tuples downstream.
execute()
```java
@Override
public void execute(Tuple input) {
    String line = input.getStringByField("line");
    String[] words = line.split("\t");
    for (String word : words) {
        collector.emit(new Values(word));
    }
}
```
Processing logic:
- Reads the "line" field from the input tuple (a tab-separated string of words).
- Splits the line on the tab character (`\t`).
- Iterates over the resulting words and emits each as an individual tuple with a single field.
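The split step is ordinary `String.split` behavior, which can be checked outside Storm. This standalone demo (class name is illustrative) applies the same delimiter as `execute()`:

```java
public class SplitDemo {

    // Same splitting rule as SplitBolt.execute(): one token per tab-separated word.
    static String[] splitLine(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        for (String word : splitLine("Spark\tHadoop\tStorm")) {
            System.out.println(word); // one line per would-be output tuple
        }
    }
}
```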
declareOutputFields()
```java
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
```
Declares that SplitBolt emits tuples with a single field named "word".
CountBolt
Class Declaration
```java
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();
```
Maintains an in-memory HashMap to track the running count for each word.
prepare()
```java
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    // No initialization needed for this terminal bolt
}
```
The OutputCollector is not stored because CountBolt is a terminal Bolt that does not emit downstream tuples.
execute()
```java
@Override
public void execute(Tuple input) {
    String word = input.getStringByField("word");
    Integer count = counts.get(word);
    if (count == null) {
        count = 0;
    }
    count++;
    counts.put(word, count);
    System.out.print("Real-time analysis results : ");
    counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
    System.out.println();
}
```
Processing logic:
- Reads the "word" field from the input tuple.
- Looks up the current count in the HashMap (defaults to 0 if not found).
- Increments the count and stores it back.
- Prints the complete set of word counts to standard output.
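The null check and increment above can be condensed into a single `Map.merge` call. This standalone sketch (not from the source project; class and method names are illustrative) shows the equivalent idiom:

```java
import java.util.HashMap;
import java.util.Map;

public class CountDemo {

    // Equivalent of CountBolt's get / default-to-0 / increment / put sequence.
    static Map<String, Integer> count(String... words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum); // insert 1, or add 1 to the existing count
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("Spark", "Hadoop", "Spark")); // Spark is counted twice
    }
}
```

Either form works; the source keeps the explicit null check, which may be easier to follow in a tutorial setting.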
declareOutputFields()
```java
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Terminal bolt: no output fields declared
}
```
Empty implementation because CountBolt is a terminal Bolt (sink) that does not emit tuples.
I/O Contract
SplitBolt
| Direction | Field Name | Type | Description |
|---|---|---|---|
| Input | line | String | Tab-separated string of words from DataSourceSpout |
| Output | word | String | Individual word extracted from the input line |
Cardinality: One input tuple produces N output tuples, where N is the number of tab-separated tokens in the input line (1 to 6).
CountBolt
| Direction | Field Name | Type | Description |
|---|---|---|---|
| Input | word | String | Individual word from SplitBolt |
| Output | (none) | -- | Terminal bolt; output is printed to stdout |
Side effect: Prints accumulated word counts to standard output after each tuple is processed.
Usage Examples
Example: Observing the Processing Pipeline
Given the Spout emits the line "Spark\tHadoop\tStorm":
- SplitBolt receives the tuple {line: "Spark\tHadoop\tStorm"}.
- SplitBolt emits three tuples: {word: "Spark"}, {word: "Hadoop"}, {word: "Storm"}.
- CountBolt processes each word tuple and prints running totals:

```
Real-time analysis results : Spark:1;
Real-time analysis results : Spark:1; Hadoop:1;
Real-time analysis results : Spark:1; Hadoop:1; Storm:1;
```
Example: Full Topology Wiring
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", new Config(), builder.createTopology());
```
Related Pages
| Relationship | Page |
|---|---|
| implements | Heibaiying_BigData_Notes_Storm_Bolt_Implementation |
| related | Heibaiying_BigData_Notes_DataSourceSpout_Implementation |
| related | Heibaiying_BigData_Notes_TopologyBuilder_Usage |