Implementation:TA Lib Ta lib python Stream Wrapper Integration

From Leeroopedia


Knowledge Sources
Domains Real_Time_Processing, Technical_Analysis
Last Updated 2026-02-09 22:00 GMT

Overview

Concrete tool that wraps the streaming functions with the _wrapper decorator so that they accept pandas/polars input while still returning scalar outputs. The wrapping is performed by a loop in the talib/__init__.py module.

Description

At import time, the talib/__init__.py module wraps all streaming functions with the _wrapper decorator in a loop:

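# Build the package-level names: stream_SMA, stream_RSI, ...
stream_func_names = ['stream_%s' % fname for fname in __TA_FUNCTION_NAMES__]
# Relative import (level=1) of the sibling stream module
stream = __import__("stream", globals(), locals(), stream_func_names, level=1)
# Wrap each streaming function and expose it under its stream_-prefixed name
for func_name, stream_func_name in zip(__TA_FUNCTION_NAMES__, stream_func_names):
    wrapped_func = _wrapper(getattr(stream, func_name))
    globals()[stream_func_name] = wrapped_func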

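The _wrapper converts pandas/polars input to numpy arrays before calling the underlying function. When the result is a scalar, as it is for streaming functions, _wrapper detects this and returns the value as-is rather than converting it back into a Series.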
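The conversion and pass-through behaviour described above can be pictured with a small stand-in. This is a minimal sketch, not the library's code: `FakeSeries`, `sketch_wrapper`, `stream_mean`, and `batch_double` are hypothetical names, plain lists stand in for numpy arrays, and detecting a scalar by the absence of `__len__` is an assumption about how such a check could be written.

```python
from functools import wraps


class FakeSeries:
    """Hypothetical stand-in for pd.Series / pl.Series (values plus an index)."""

    def __init__(self, values, index=None):
        self.values = list(values)
        self.index = list(index) if index is not None else list(range(len(self.values)))

    def to_numpy(self):
        # A plain list stands in for the numpy array a real Series would return.
        return list(self.values)


def sketch_wrapper(func):
    """Convert Series-like inputs, then pass scalar (streaming) results through."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        series_args = [a for a in args if isinstance(a, FakeSeries)]
        converted = [a.to_numpy() if isinstance(a, FakeSeries) else a for a in args]
        result = func(*converted, **kwargs)

        # Assumption: a streaming result is a scalar (or a tuple of scalars),
        # so its first element has no length.
        first = result[0] if isinstance(result, tuple) else result
        if not series_args or not hasattr(first, "__len__"):
            return result

        # Batch-style result: re-wrap it using the first input's index.
        index = series_args[0].index
        if isinstance(result, tuple):
            return tuple(FakeSeries(r, index) for r in result)
        return FakeSeries(result, index)

    return wrapper


@sketch_wrapper
def stream_mean(values, timeperiod=3):
    """Hypothetical streaming function: mean of the last `timeperiod` values."""
    window = values[-timeperiod:]
    return sum(window) / len(window)


@sketch_wrapper
def batch_double(values):
    """Hypothetical batch function: one output per input value."""
    return [2 * v for v in values]


prices = FakeSeries([1.0, 2.0, 3.0, 4.0], index=["a", "b", "c", "d"])
latest = stream_mean(prices, timeperiod=2)  # scalar, returned as-is
doubled = batch_double(prices)              # sequence, re-wrapped as FakeSeries
```

The same decorator serves both kinds of function; only the shape of the result decides whether it is re-wrapped.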

Usage

Call streaming functions via talib.stream_SMA() (with _wrapper support) or directly via talib.stream.SMA() (without _wrapper).
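As a rough picture of the two call paths, the sketch below mirrors the wrapping loop as quoted on this page, with a `SimpleNamespace` in place of the `talib.stream` module and a dict in place of the package globals. Every name in it (`fake_stream`, `package_globals`, `tagging_wrapper`, `toy_sma`, `toy_rsi`) is hypothetical; it only illustrates how a wrapped `stream_`-prefixed name and the original module attribute can coexist.

```python
from functools import wraps
from types import SimpleNamespace

FUNCTION_NAMES = ["SMA", "RSI"]  # stand-in for __TA_FUNCTION_NAMES__


def tagging_wrapper(func):
    """Hypothetical decorator that marks a function so wrapping is observable."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    wrapper.is_wrapped = True
    return wrapper


def toy_sma(real, timeperiod=30):
    """Toy body: mean of the last `timeperiod` values."""
    window = real[-timeperiod:]
    return sum(window) / len(window)


def toy_rsi(real, timeperiod=14):
    """Toy body: placeholder value, not a real RSI calculation."""
    return 50.0


fake_stream = SimpleNamespace(SMA=toy_sma, RSI=toy_rsi)  # stand-in module
package_globals = {}                                     # stand-in for globals()

stream_func_names = ["stream_%s" % name for name in FUNCTION_NAMES]
for func_name, stream_func_name in zip(FUNCTION_NAMES, stream_func_names):
    package_globals[stream_func_name] = tagging_wrapper(getattr(fake_stream, func_name))

# Both paths compute the same value; only the prefixed name went through the wrapper.
via_prefix = package_globals["stream_SMA"]([1.0, 2.0, 3.0], timeperiod=2)
via_module = fake_stream.SMA([1.0, 2.0, 3.0], timeperiod=2)
```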

Code Reference

Source Location

  • Repository: ta-lib-python
  • File: talib/__init__.py
  • Lines: L128-133 (stream function wrapping loop), L82-86 (streaming result detection in _wrapper)
  • Also: talib/stream.py:L1-6 (module globals population)

Signature

# Wrapped streaming functions available as:
talib.stream_SMA(real, timeperiod=30)   # Accepts pd.Series/pl.Series
talib.stream_RSI(real, timeperiod=14)
talib.stream_BBANDS(real, timeperiod=5, nbdevup=2.0, nbdevdn=2.0, matype=0)
# ... all 161 streaming functions

Import

import talib
# talib.stream_SMA, talib.stream_RSI, etc.

I/O Contract

Inputs

Name | Type | Required | Description
real | np.ndarray / pd.Series / pl.Series | Yes | Price data buffer (auto-converted by _wrapper)
timeperiod | int | No | Lookback period

Outputs

Name | Type | Description
scalar_result | float | Latest indicator value (always a scalar, never a Series)
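One way to use this contract with a live feed is to keep a bounded buffer and recompute the latest value on each tick. The sketch below uses only the standard library: `latest_sma` is a hypothetical stand-in for a streaming function, and `on_tick` and `BUFFER_SIZE` are illustrative names. With the real library, the buffer would presumably be converted to a float64 array first (for example with `numpy.asarray(buffer, dtype=float)`).

```python
from collections import deque

BUFFER_SIZE = 5  # illustrative: should be at least the lookback period


def latest_sma(buffer, timeperiod):
    """Hypothetical streaming stand-in: mean of the last `timeperiod` values."""
    if len(buffer) < timeperiod:
        return float("nan")  # not enough history yet
    window = list(buffer)[-timeperiod:]
    return sum(window) / timeperiod


buffer = deque(maxlen=BUFFER_SIZE)  # oldest prices fall off automatically


def on_tick(price, timeperiod=3):
    """Append the new price and return the latest indicator value."""
    buffer.append(price)
    return latest_sma(buffer, timeperiod)


# Feed four prices; the first two ticks lack enough history for a 3-period mean.
results = [on_tick(p) for p in [10.0, 11.0, 12.0, 13.0]]
```

Bounding the buffer keeps each call's cost fixed regardless of how long the feed runs.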

Usage Examples

With Pandas Series

import pandas as pd
import talib

close = pd.Series([44.0, 44.3, 44.1, 43.6, 44.3, 44.8, 45.1, 44.9, 45.2, 45.0])
latest = talib.stream_SMA(close, timeperiod=5)
print(type(latest))  # <class 'float'> (a scalar, not a pd.Series)
print(latest)        # approximately 45.0, the mean of the last five values
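The value to expect from this example can be checked by hand, since a simple moving average is the arithmetic mean of the last `timeperiod` values. The snippet below does that check in plain Python, independent of TA-Lib; `manual_sma` is a name introduced here for illustration.

```python
close = [44.0, 44.3, 44.1, 43.6, 44.3, 44.8, 45.1, 44.9, 45.2, 45.0]
timeperiod = 5

window = close[-timeperiod:]           # the last five closes
manual_sma = sum(window) / timeperiod  # close to 45.0, up to float rounding
```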

Direct Stream Module

import numpy as np
from talib import stream

data = np.random.random(100)
latest_rsi = stream.RSI(data, timeperiod=14)
print(f"Latest RSI: {latest_rsi:.2f}")

Related Pages

Implements Principle
