
Implementation:Heibaiying BigData Notes SplitBolt and CountBolt

From Leeroopedia


Overview

Property            Value
Type                API Doc
Classes             com.heibaiying.wordcount.component.SplitBolt, com.heibaiying.wordcount.component.CountBolt
Extends             org.apache.storm.topology.base.BaseRichBolt
Source (SplitBolt)  code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/SplitBolt.java:L13-35
Source (CountBolt)  code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/CountBolt.java:L12-40

Description

SplitBolt and CountBolt are two complementary Bolt implementations that form the processing pipeline in the word-count topology. Together with the DataSourceSpout, they demonstrate the classic MapReduce-style pattern in a streaming context:

  • SplitBolt acts as the mapper: it receives tab-separated word lines from the Spout and splits them into individual words, emitting one tuple per word.
  • CountBolt acts as the reducer: it receives individual words and maintains a running count for each word in an in-memory HashMap, printing the accumulated results.

Both classes extend BaseRichBolt and implement the three required lifecycle methods: prepare(), execute(), and declareOutputFields().

Usage

These Bolts are wired into the topology using TopologyBuilder.setBolt() with appropriate stream groupings:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

The data flow is: DataSourceSpout (emits "line") -> SplitBolt (emits "word") -> CountBolt (terminal, prints counts).
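
This wiring relies on the default parallelism of one task per Bolt. If CountBolt were run with multiple tasks, shuffleGrouping would scatter occurrences of the same word across tasks and the per-task counts would diverge; a fields grouping on "word" keeps every tuple for a given word on the same task. A hedged sketch of that scaled variant (the parallelism hint of 2 is illustrative and not part of the original topology):

```java
// Hypothetical scaled variant: parallelism hint 2, tuples routed by the "word" field
builder.setBolt("CountBolt", new CountBolt(), 2)
       .fieldsGrouping("SplitBolt", new Fields("word"));
```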

Code Reference

SplitBolt

Class Declaration

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

prepare()

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}

Stores the OutputCollector reference for emitting tuples downstream.

execute()

@Override
public void execute(Tuple input) {
    String line = input.getStringByField("line");
    String[] words = line.split("\t");
    for (String word : words) {
        collector.emit(new Values(word));
    }
}

Processing logic:

  1. Reads the "line" field from the input tuple (a tab-separated string of words).
  2. Splits the line by the tab character (\t).
  3. Iterates over the resulting words and emits each as an individual tuple with a single field.
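
The tokenization step can be exercised in isolation, outside Storm. A minimal plain-Java sketch (the class and method names are illustrative, not part of the repository):

```java
import java.util.Arrays;
import java.util.List;

public class SplitSketch {
    // Mirrors SplitBolt's tokenization: split a tab-separated line into words.
    static List<String> splitLine(String line) {
        return Arrays.asList(line.split("\t"));
    }

    public static void main(String[] args) {
        System.out.println(splitLine("Spark\tHadoop\tStorm")); // [Spark, Hadoop, Storm]
    }
}
```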

declareOutputFields()

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

Declares that SplitBolt emits tuples with a single field named "word".

CountBolt

Class Declaration

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

Maintains an in-memory HashMap to track the running count for each word.

prepare()

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    // No initialization needed for this terminal bolt
}

The OutputCollector is not stored because CountBolt is a terminal Bolt that does not emit downstream tuples.

execute()

@Override
public void execute(Tuple input) {
    String word = input.getStringByField("word");
    Integer count = counts.get(word);
    if (count == null) {
        count = 0;
    }
    count++;
    counts.put(word, count);
    System.out.print("Real-time analysis results : ");
    counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
    System.out.println();
}

Processing logic:

  1. Reads the "word" field from the input tuple.
  2. Looks up the current count in the HashMap (defaults to 0 if not found).
  3. Increments the count and stores it back.
  4. Prints the complete set of word counts to standard output.
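
The null-check-then-increment in steps 2 and 3 is equivalent to a single Map.merge call. A standalone sketch of the counting logic (class name illustrative, no Storm dependency):

```java
import java.util.HashMap;
import java.util.Map;

public class CountSketch {
    // Equivalent to CountBolt's null-check + increment, written with Map.merge.
    static Map<String, Integer> count(String[] words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum); // insert 1, or add 1 to the existing count
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(new String[]{"Spark", "Hadoop", "Spark"}).get("Spark")); // 2
    }
}
```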

declareOutputFields()

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Terminal bolt: no output fields declared
}

Empty implementation because CountBolt is a terminal Bolt (sink) that does not emit tuples.

I/O Contract

SplitBolt

Direction   Field Name   Type     Description
Input       line         String   Tab-separated string of words from DataSourceSpout
Output      word         String   Individual word extracted from the input line

Cardinality: One input tuple produces N output tuples, where N is the number of tab-separated tokens in the input line (1 to 6 for lines generated by DataSourceSpout).

CountBolt

Direction   Field Name   Type     Description
Input       word         String   Individual word from SplitBolt
Output      (none)       --       Terminal bolt; output is printed to stdout

Side effect: Prints accumulated word counts to standard output after each tuple is processed.

Usage Examples

Example: Observing the Processing Pipeline

Given the Spout emits the line "Spark\tHadoop\tStorm":

  1. SplitBolt receives the tuple {line: "Spark\tHadoop\tStorm"}.
  2. SplitBolt emits three tuples: {word: "Spark"}, {word: "Hadoop"}, {word: "Storm"}.
  3. CountBolt processes each word tuple and prints running totals:
Real-time analysis results : Spark:1;
Real-time analysis results : Spark:1; Hadoop:1;
Real-time analysis results : Spark:1; Hadoop:1; Storm:1;

Example: Full Topology Wiring

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", new Config(), builder.createTopology());
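
For a bounded local test run, the cluster is typically left running briefly and then shut down. A hedged continuation of the snippet above (the 10-second window is arbitrary):

```java
// Let the local topology run for a while, then tear it down (duration is illustrative)
Thread.sleep(10 * 1000);
cluster.shutdown();
```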

Related Pages

Relationship  Page
implements    Heibaiying_BigData_Notes_Storm_Bolt_Implementation
related       Heibaiying_BigData_Notes_DataSourceSpout_Implementation
related       Heibaiying_BigData_Notes_TopologyBuilder_Usage
