Implementation:Online ml River Stream TwitterLiveStream
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Data_Streaming, Social_Media, Twitter |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Twitter API v2 client that streams live tweets in real time, selected by filtering rules.
Description
The TwitterLiveStream class provides access to Twitter's filtered stream API v2, allowing real-time collection of tweets matching specified rules. It handles authentication, rule management, and streaming logic. The client automatically sets up and tears down filtering rules, and yields tweet objects with metadata including author information and matching rules. Requires the requests library and a Twitter developer bearer token.
Usage
Use this for real-time sentiment analysis, trend detection, or online learning on live social media data. Ideal for monitoring specific topics, tracking user activity, or building adaptive models that respond to current events. Requires Twitter API v2 access.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/stream/tweet_stream.py
Signature
class TwitterLiveStream:
    def __init__(self, rules, bearer_token):
        ...
    def __iter__(self):
        ...
Import
from river import stream
I/O Contract
| Parameter | Type | Description |
|---|---|---|
| rules | list[str] | Filtering rules (e.g., ["from:BBCBreaking"]) |
| bearer_token | str | Twitter API v2 bearer token |
Returns (via iteration):
| Type | Description |
|---|---|
| dict | Tweet object with the keys 'data', 'includes', and 'matching_rules' |
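To make the shape of a yielded item concrete, the sketch below flattens one tweet dict into a small record. The helper name `flatten_tweet` is hypothetical and not part of River; it assumes only the three keys listed in the table, and it looks up the author in `includes.users` by `author_id` rather than by list position. The sample values are taken from the example tweet structure documented on this page.

```python
def flatten_tweet(tweet):
    """Flatten one streamed tweet dict into a flat record (hypothetical helper)."""
    data = tweet.get("data", {})
    users = tweet.get("includes", {}).get("users", [])
    # Index the included users by id so the author is matched explicitly.
    users_by_id = {user.get("id"): user for user in users}
    author = users_by_id.get(data.get("author_id"), {})
    return {
        "id": data.get("id"),
        "text": data.get("text"),
        "created_at": data.get("created_at"),
        "username": author.get("username"),
        "rule_tags": [rule.get("tag") for rule in tweet.get("matching_rules", [])],
    }


# Sample item with the same layout as the example tweet structure.
sample = {
    "data": {
        "author_id": "428333",
        "created_at": "2022-08-26T12:59:48.000Z",
        "id": "1563149212774445058",
        "text": "Ukraine's Zaporizhzhia nuclear power plant...",
    },
    "includes": {
        "users": [{
            "created_at": "2007-01-02T01:48:14.000Z",
            "id": "428333",
            "name": "CNN Breaking News",
            "username": "cnnbrk",
        }]
    },
    "matching_rules": [{"id": "1563148866333151233", "tag": "from:cnnbrk"}],
}

record = flatten_tweet(sample)
print(record["username"], record["rule_tags"])
```

Using `.get` throughout means a partial or empty item yields `None` fields instead of raising `KeyError`, which may be preferable inside a long-running stream loop.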
Usage Examples
from river import stream
import logging
import time
import requests  # needed for the exception type caught below
# Initialize with filtering rules and bearer token
tweets = stream.TwitterLiveStream(
    rules=["from:BBCBreaking", "from:cnnbrk"],
    bearer_token="<your_bearer_token_here>"
)
# Stream tweets in a loop with error handling
# (recommended for production use)
while True:
    try:
        for tweet in tweets:
            # Extract tweet information
            text = tweet['data']['text']
            author_id = tweet['data']['author_id']
            created_at = tweet['data']['created_at']
            # Get author info from includes
            author = tweet['includes']['users'][0]
            username = author['username']
            print(f"{username}: {text[:100]}...")
            print(f"Posted at: {created_at}")
            print()
    except requests.exceptions.RequestException as e:
        logging.warning("Connection error: %s", e)
        time.sleep(10)  # Wait before reconnecting
# Example tweet structure:
# {
#     'data': {
#         'author_id': '428333',
#         'created_at': '2022-08-26T12:59:48.000Z',
#         'id': '1563149212774445058',
#         'text': "Ukraine's Zaporizhzhia nuclear power plant..."
#     },
#     'includes': {
#         'users': [{
#             'created_at': '2007-01-02T01:48:14.000Z',
#             'id': '428333',
#             'name': 'CNN Breaking News',
#             'username': 'cnnbrk'
#         }]
#     },
#     'matching_rules': [{'id': '1563148866333151233', 'tag': 'from:cnnbrk'}]
# }
# Filtering rules can use various operators:
# - from:username - tweets from specific user
# - #hashtag - tweets with hashtag
# - keyword - tweets containing keyword
# - lang:en - tweets in English
# See Twitter API docs for full rule syntax
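The operators listed above can also be combined within a single rule string. In Twitter's filtered stream rule syntax, operators separated by a space are ANDed, `OR` joins alternatives, and parentheses group them. The sketch below builds such strings; `any_of` and `all_of` are hypothetical helper names, not part of River, and they perform no validation against the API.

```python
def any_of(*terms):
    """Join terms with OR and wrap them in parentheses (hypothetical helper)."""
    return "(" + " OR ".join(terms) + ")"


def all_of(*terms):
    """Join terms with spaces, which the rule syntax treats as AND (hypothetical helper)."""
    return " ".join(terms)


rules = [
    # English tweets from either account
    all_of(any_of("from:BBCBreaking", "from:cnnbrk"), "lang:en"),
    # English tweets containing the keyword and the hashtag
    all_of("earthquake", "#breaking", "lang:en"),
]

for rule in rules:
    print(rule)
```

The resulting list of strings has the same shape as the `rules` argument shown in the I/O contract, so it could be passed to the constructor unchanged.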