Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:TA Lib Ta lib python Stream Validation Pattern

From Leeroopedia
Revision as of 16:48, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/TA_Lib_Ta_lib_python_Stream_Validation_Pattern.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Knowledge Sources
Domains Testing, Technical_Analysis
Last Updated 2026-02-09 22:00 GMT

Overview

Concrete validation pattern for verifying streaming API correctness by comparing stream.FUNC(data) against talib.FUNC(data)[-1] as implemented in the test suite.

Description

This is a Pattern Doc documenting the validation approach used in tests/test_stream.py. The test iterates over indicators and asserts that streaming results match the last element of full computations.

Usage

Use this pattern to validate your streaming integration in automated tests.

Code Reference

Source Location

  • Repository: ta-lib-python
  • File: tests/test_stream.py
  • Lines: L9-22 (test_streaming function)

Signature

# Validation pattern (from test_stream.py)
def validate_streaming_equivalence(data, func_name, **params):
    stream_result = getattr(stream, func_name)(data, **params)
    full_result = getattr(talib, func_name)(data, **params)

    if isinstance(full_result, tuple):
        for s, f in zip(stream_result, full_result):
            assert s == f[-1] or (np.isnan(s) and np.isnan(f[-1]))
    else:
        last = full_result[-1]
        assert stream_result == last or (np.isnan(stream_result) and np.isnan(last))

Import

import numpy as np
import talib
from talib import stream

I/O Contract

Inputs

Name Type Required Description
data np.ndarray Yes Same price buffer passed to both stream and full functions
params kwargs No Same parameters for both calls

Outputs

Name Type Description
is_valid bool True if stream result matches last element of full computation

Usage Examples

Validate SMA

import numpy as np
import talib
from talib import stream

data = np.random.random(100)

stream_val = stream.SMA(data, timeperiod=20)
full_val = talib.SMA(data, timeperiod=20)[-1]

assert stream_val == full_val, f"Mismatch: {stream_val} != {full_val}"
print("SMA streaming validation passed")

Validate All Indicators

import numpy as np
import talib
from talib import stream

data = np.random.random(100)

for name in ['SMA', 'EMA', 'RSI', 'MOM']:
    s = getattr(stream, name)(data, timeperiod=14)
    f = getattr(talib, name)(data, timeperiod=14)[-1]
    if np.isnan(s) and np.isnan(f):
        continue
    assert s == f, f"{name} mismatch"
print("All streaming validations passed")

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment