Implementation:TA Lib Ta lib python Stream Validation Pattern
Appearance
| Knowledge Sources | |
|---|---|
| Domains | Testing, Technical_Analysis |
| Last Updated | 2026-02-09 22:00 GMT |
Overview
Concrete validation pattern for verifying streaming API correctness by comparing stream.FUNC(data) against talib.FUNC(data)[-1] as implemented in the test suite.
Description
This is a Pattern Doc documenting the validation approach used in tests/test_stream.py. The test iterates over indicators and asserts that streaming results match the last element of full computations.
Usage
Use this pattern to validate your streaming integration in automated tests.
Code Reference
Source Location
- Repository: ta-lib-python
- File: tests/test_stream.py
- Lines: L9-22 (test_streaming function)
Signature
# Validation pattern (from test_stream.py)
def validate_streaming_equivalence(data, func_name, **params):
stream_result = getattr(stream, func_name)(data, **params)
full_result = getattr(talib, func_name)(data, **params)
if isinstance(full_result, tuple):
for s, f in zip(stream_result, full_result):
assert s == f[-1] or (np.isnan(s) and np.isnan(f[-1]))
else:
last = full_result[-1]
assert stream_result == last or (np.isnan(stream_result) and np.isnan(last))
Import
import numpy as np
import talib
from talib import stream
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | np.ndarray | Yes | Same price buffer passed to both stream and full functions |
| params | kwargs | No | Same parameters for both calls |
Outputs
| Name | Type | Description |
|---|---|---|
| is_valid | bool | True if stream result matches last element of full computation |
Usage Examples
Validate SMA
import numpy as np
import talib
from talib import stream
data = np.random.random(100)
stream_val = stream.SMA(data, timeperiod=20)
full_val = talib.SMA(data, timeperiod=20)[-1]
assert stream_val == full_val, f"Mismatch: {stream_val} != {full_val}"
print("SMA streaming validation passed")
Validate All Indicators
import numpy as np
import talib
from talib import stream
data = np.random.random(100)
for name in ['SMA', 'EMA', 'RSI', 'MOM']:
s = getattr(stream, name)(data, timeperiod=14)
f = getattr(talib, name)(data, timeperiod=14)[-1]
if np.isnan(s) and np.isnan(f):
continue
assert s == f, f"{name} mismatch"
print("All streaming validations passed")
Related Pages
Implements Principle
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment