
Implementation:Heibaiying BigData Notes DataSourceSpout Implementation

From Leeroopedia


Overview

Type: API Doc
Class: com.heibaiying.wordcount.component.DataSourceSpout
Extends: org.apache.storm.topology.base.BaseRichSpout
Source: code/Storm/storm-word-count/src/main/java/com/heibaiying/wordcount/component/DataSourceSpout.java:L14-49

Description

DataSourceSpout is a concrete Spout implementation that serves as the data source for the word-count topology. Rather than connecting to an external system, it generates synthetic data by randomly selecting and combining words from a predefined list. This makes it ideal for development, testing, and demonstration of Storm topology mechanics without requiring external infrastructure.

The class extends BaseRichSpout, which provides default no-op implementations for optional lifecycle methods (such as ack(), fail(), and close()), allowing the developer to focus on the three essential methods: open(), nextTuple(), and declareOutputFields().

Usage

This Spout is wired into a topology using TopologyBuilder.setSpout():

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());

Once the topology is submitted, Storm will repeatedly call nextTuple() on this Spout, which emits tab-separated word lines at a rate of approximately one tuple per second.

Code Reference

Class Declaration

public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

The class maintains a list of six big-data framework names and a reference to the SpoutOutputCollector for emitting tuples.

open()

@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    this.spoutOutputCollector = spoutOutputCollector;
}

Called once when the Spout is initialized. Stores the SpoutOutputCollector reference for later use in nextTuple(). The Map parameter contains the Storm configuration, and TopologyContext provides metadata about the topology (task ID, component ID, etc.).

nextTuple()

@Override
public void nextTuple() {
    String lineData = productData();
    spoutOutputCollector.emit(new Values(lineData));
    Utils.sleep(1000);
}

Called repeatedly by the executor thread. Each invocation:

  1. Generates a random line of tab-separated words via the productData() helper method.
  2. Emits the line as a single-field tuple using new Values(lineData).
  3. Sleeps for 1000 milliseconds to throttle the emission rate.

productData()

private String productData() {
    Collections.shuffle(list);
    Random random = new Random();
    int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
    return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}

A private helper method that:

  1. Shuffles the word list randomly.
  2. Picks a random end index between 1 and the list size (inclusive). Note that the % (list.size()) is redundant: random.nextInt(n) already returns a value in the range 0 to n-1, so the modulo never changes the result.
  3. Joins the first endIndex words with a tab (\t) separator using Apache Commons StringUtils.join().

This produces output such as "Flink\tStorm\tHadoop" or "HBase".
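The same logic can be reproduced with only the Java standard library. The sketch below mirrors productData() but substitutes java.lang.String.join for Apache Commons StringUtils.join, and drops the redundant modulo; the class name ProductDataSketch is hypothetical, not part of the repository:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ProductDataSketch {
    // Mutable copy of the word list so Collections.shuffle can reorder it.
    private static final List<String> WORDS =
            new ArrayList<>(Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"));

    // Mirrors productData(): shuffle, pick a random prefix length 1..6,
    // then join that prefix with tabs.
    static String productData() {
        Collections.shuffle(WORDS);
        Random random = new Random();
        int endIndex = random.nextInt(WORDS.size()) + 1; // 1..6
        return String.join("\t", WORDS.subList(0, endIndex));
    }

    public static void main(String[] args) {
        String line = productData();
        System.out.println(line + " (" + line.split("\t").length + " words)");
    }
}
```

Running main prints a different random line each time, e.g. a single word or up to all six joined by tabs.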

declareOutputFields()

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("line"));
}

Declares that this Spout emits tuples with a single field named "line". Downstream Bolts read this field using input.getStringByField("line").
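To illustrate what a downstream Bolt does with this field, the sketch below shows just the string handling (splitting the tab-delimited line back into words); the Storm API calls are omitted, and the class name LineSplitSketch is hypothetical:

```java
public class LineSplitSketch {
    // Splits one emitted "line" tuple value into individual words,
    // as a downstream bolt would before counting them.
    static String[] splitLine(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        String line = "Flink\tStorm\tHadoop"; // sample tuple value
        for (String word : splitLine(line)) {
            System.out.println(word);
        }
    }
}
```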

I/O Contract

Direction: Output
Field Name: line
Type: String
Description: Tab-separated string of randomly selected words (e.g., "Spark\tHadoop\tHBase")

Input: None (this is a source component).

Output schema: A single field "line" containing a tab-delimited string of 1 to 6 words drawn from the set {Spark, Hadoop, HBase, Storm, Flink, Hive}.

Emission rate: Approximately one tuple per second (controlled by Utils.sleep(1000)).

Usage Examples

Example 1: Local Topology with DataSourceSpout

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", new Config(), builder.createTopology());

Example 2: Sample Output

When the topology runs, the Spout produces tuples like:

Flink	Storm	Hadoop
HBase	Spark
Hive	Flink	Storm	HBase	Hadoop	Spark
Storm

Each line is a single tuple emitted with the field name "line".

Related Pages

implements: Heibaiying_BigData_Notes_Storm_Spout_Implementation
related: Heibaiying_BigData_Notes_SplitBolt_and_CountBolt
related: Heibaiying_BigData_Notes_TopologyBuilder_Usage
