Heibaiying BigData Notes: DataSourceSpout Implementation
Overview
| Property | Value |
|---|---|
| Type | API Doc |
| Class | com.heibaiying.wordcount.component.DataSourceSpout |
| Extends | org.apache.storm.topology.base.BaseRichSpout |
| Source | code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/DataSourceSpout.java:L14-49 |
Description
DataSourceSpout is a concrete Spout implementation that serves as the data source for the word-count topology. Rather than connecting to an external system, it generates synthetic data by randomly selecting and combining words from a predefined list. This makes it ideal for development, testing, and demonstration of Storm topology mechanics without requiring external infrastructure.
The class extends BaseRichSpout, which provides default no-op implementations for optional lifecycle methods (such as ack(), fail(), and close()), allowing the developer to focus on the three essential methods: open(), nextTuple(), and declareOutputFields().
Usage
This Spout is wired into a topology using TopologyBuilder.setSpout():
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
```
Once the topology is submitted, Storm will repeatedly call nextTuple() on this Spout, which emits tab-separated word lines at a rate of approximately one tuple per second.
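That call-in-a-loop contract can be approximated in plain Java. The sketch below mirrors it without any Storm dependency; `ExecutorLoopSketch`, `Collector`, and `SimpleSpout` are illustrative names, not Storm API:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of how Storm drives a spout: the executor calls
// nextTuple() in a loop, and each call may emit one tuple to a collector.
// None of these names are Storm API; they only mirror the contract.
public class ExecutorLoopSketch {

    // Stand-in for SpoutOutputCollector: simply records emitted lines.
    static class Collector {
        final List<String> emitted = new ArrayList<>();
        void emit(String line) { emitted.add(line); }
    }

    // Stand-in for the spout: emits one fixed line per call, then throttles.
    static class SimpleSpout {
        final Collector collector;
        SimpleSpout(Collector collector) { this.collector = collector; }

        void nextTuple(long sleepMillis) {
            collector.emit("Spark\tHadoop");   // one tuple per invocation
            try {
                Thread.sleep(sleepMillis);     // mirrors Utils.sleep(1000)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Run the "executor loop" a fixed number of times instead of forever.
    static List<String> runFor(int calls, long sleepMillis) {
        Collector collector = new Collector();
        SimpleSpout spout = new SimpleSpout(collector);
        for (int i = 0; i < calls; i++) {
            spout.nextTuple(sleepMillis);
        }
        return collector.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runFor(3, 10)); // 3 calls -> 3 emitted lines
    }
}
```

The sleep inside `nextTuple()` is what caps the rate at roughly one tuple per second in the real Spout; with no sleep, Storm would call the method as fast as the executor thread allows.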
Code Reference
Class Declaration
```java
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
    private SpoutOutputCollector spoutOutputCollector;
```
The class maintains a `list` field holding six big-data framework names and a reference to the `SpoutOutputCollector` used to emit tuples.
open()
```java
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    this.spoutOutputCollector = spoutOutputCollector;
}
```
Called once when the Spout is initialized. Stores the SpoutOutputCollector reference for later use in nextTuple(). The Map parameter contains the Storm configuration, and TopologyContext provides metadata about the topology (task ID, component ID, etc.).
nextTuple()
```java
@Override
public void nextTuple() {
    String lineData = productData();
    spoutOutputCollector.emit(new Values(lineData));
    Utils.sleep(1000);
}
```
Called repeatedly by the executor thread. Each invocation:
- Generates a random line of tab-separated words via the `productData()` helper method.
- Emits the line as a single-field tuple using `new Values(lineData)`.
- Sleeps for 1000 milliseconds to throttle the emission rate.
productData()
```java
private String productData() {
    Collections.shuffle(list);
    Random random = new Random();
    int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
    return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
```
A private helper method that:
- Shuffles the word list randomly.
- Picks a random end index between 1 and the list size. (The `% (list.size())` is redundant, since `nextInt(n)` already returns a value in `[0, n)`.)
- Joins the first `endIndex` words with a tab (`\t`) separator using Apache Commons `StringUtils.join()`.
This produces output such as "Flink\tStorm\tHadoop" or "HBase".
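The same selection logic can be exercised outside Storm. The sketch below reproduces the steps of `productData()` in a standalone class (`LineGenerator` is an illustrative name, not repository code), dropping the redundant modulo:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Standalone sketch of the productData() logic: shuffle the word list,
// pick a random prefix length between 1 and the list size, and join that
// prefix with tabs. LineGenerator is an illustrative name, not repo code.
public class LineGenerator {

    // Mutable copy so Collections.shuffle can reorder it in place.
    private static final List<String> WORDS =
            new ArrayList<>(Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"));
    private static final Random RANDOM = new Random();

    static String nextLine() {
        Collections.shuffle(WORDS);
        int endIndex = RANDOM.nextInt(WORDS.size()) + 1; // 1..6 words
        return String.join("\t", WORDS.subList(0, endIndex));
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println(nextLine());
        }
    }
}
```

Running `main` prints five random lines in the same shape the Spout emits, e.g. `Flink	Storm	Hadoop` or a single word like `HBase`.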
declareOutputFields()
```java
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("line"));
}
```
Declares that this Spout emits tuples with a single field named "line". Downstream Bolts read this field using input.getStringByField("line").
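On the consuming side, a bolt tokenizes the `"line"` field before emitting one tuple per word. The plain-Java sketch below shows just that splitting step, detached from the Storm API (`LineSplitter` is an illustrative name; in the real topology the value would come from `input.getStringByField("line")` inside `execute()`):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the downstream tokenization step: what a SplitBolt-style
// component does with the "line" field once it has read it from the tuple.
// LineSplitter is an illustrative name, not part of the repository.
public class LineSplitter {

    // Split a tab-delimited line into individual words.
    static List<String> split(String line) {
        return Arrays.asList(line.split("\t"));
    }

    public static void main(String[] args) {
        System.out.println(split("Spark\tHadoop\tHBase")); // [Spark, Hadoop, HBase]
    }
}
```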
I/O Contract
| Direction | Field Name | Type | Description |
|---|---|---|---|
| Output | line | String | Tab-separated string of randomly selected words (e.g., "Spark\tHadoop\tHBase") |
Input: None (this is a source component).
Output schema: A single field "line" containing a tab-delimited string of 1 to 6 words drawn from the set {Spark, Hadoop, HBase, Storm, Flink, Hive}.
Emission rate: Approximately one tuple per second (controlled by Utils.sleep(1000)).
Usage Examples
Example 1: Local Topology with DataSourceSpout
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", new Config(), builder.createTopology());
```
Example 2: Sample Output
When the topology runs, the Spout produces tuples like:
```
Flink	Storm	Hadoop
HBase	Spark
Hive	Flink	Storm	HBase	Hadoop	Spark
Storm
```
Each line is a single tuple emitted with the field name "line".
Related Pages
| Relationship | Page |
|---|---|
| implements | Heibaiying_BigData_Notes_Storm_Spout_Implementation |
| related | Heibaiying_BigData_Notes_SplitBolt_and_CountBolt |
| related | Heibaiying_BigData_Notes_TopologyBuilder_Usage |