Implementation: Triton Inference Server L0 Long Running Stress
L0 Long Running Stress
Source File: qa/L0_long_running_stress/stress.py
Language: Python (657 lines)
Domains: Testing, Stress_Testing
Purpose
This Python module is the main stress test runner for Triton Inference Server. It orchestrates multiple concurrent threads that each execute randomly weighted scenarios (sequence inference, timeout handling, crashing clients, image classification, and perf_analyzer load generation) over a configurable duration. The runner tracks per-scenario success/failure counts, generates a formatted report, and supports dedicated load threads for sustained GPU utilization.
Signature
# Constants:
CORRELATION_ID_BLOCK_SIZE = 1024 * 1024
BACKENDS = os.environ.get("BACKENDS", "onnx plan")
ALLOW_FAILURE_SCENARIO = [PerfAnalyzerScenario.__name__]
STOP_STRESS_THREAD = False
# Key functions:
def get_trials(is_sequence=True) -> tuple
def update_test_count(test_case_count, failed_test_case_count,
                      request_count, test_case_name, success=True, count=1)
class ScenarioSelector:
    """Weighted random scenario selection using cumulative probability."""
    def __init__(self, probs, rng)
    def get_scenario(self) -> Scenario
def stress_thread(name, seed, correlation_id_base,
                  test_case_count, failed_test_case_count, sequence_request_count)
def load_thread(name, seed, correlation_id_base,
                test_case_count, failed_test_case_count, sequence_request_count)
def format_content(content, max_line_length) -> str
def accumulate_count(dict_list, test_case_name) -> int
def generate_report(elapsed_time, _test_case_count,
                    _failed_test_case_count, _sequence_request_count)
Key Components
ScenarioSelector
Implements weighted random selection of test scenarios. Weights are normalized into a cumulative probability distribution and selection uses binary search via bisect.
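import bisect

class ScenarioSelector:
    def __init__(self, probs, rng):
        self.rng_ = rng
        self.probs_range_ = []
        self.scenarios_ = []
        # probs is a sequence of (weight, scenario) pairs
        total_weight = 0
        for weight, scenario in probs:
            total_weight += weight
            self.scenarios_.append(scenario)
            # Running total: the upper bound of this scenario's range
            self.probs_range_.append(float(total_weight))
        # Normalize the bounds so the last one is 1.0
        for i in range(len(self.probs_range_)):
            self.probs_range_[i] /= total_weight

    def get_scenario(self):
        # rng_.rand() is uniform on [0, 1); pick the first bound >= the draw
        return self.scenarios_[bisect.bisect_left(self.probs_range_, self.rng_.rand())]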
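The selection step can be tried in isolation. The following is a minimal sketch, not the module's code: `WeightedSelector` and its `pick_for` helper are hypothetical names, plain strings stand in for scenario objects, and the standard library's `random.Random` replaces the NumPy generator that `stress.py` passes in as `rng`.

```python
import bisect
import random


class WeightedSelector:
    """Hypothetical stand-in for ScenarioSelector that needs only the stdlib."""

    def __init__(self, weighted_items, rng):
        self.rng = rng
        self.items = []
        self.cumulative = []
        total = 0
        for weight, item in weighted_items:
            total += weight
            self.items.append(item)
            self.cumulative.append(float(total))
        # Normalize so the last threshold is exactly 1.0.
        self.cumulative = [c / total for c in self.cumulative]

    def pick_for(self, draw):
        # Index of the first cumulative threshold that is >= draw.
        return self.items[bisect.bisect_left(self.cumulative, draw)]

    def pick(self):
        # random() returns a float in [0.0, 1.0), so the index is always valid.
        return self.pick_for(self.rng.random())


selector = WeightedSelector([(1, "a"), (1, "b"), (2, "c")], random.Random(0))
print(selector.cumulative)  # [0.25, 0.5, 1.0]
print(selector.pick_for(0.10), selector.pick_for(0.40), selector.pick_for(0.99))  # a b c
```

Because the thresholds are sorted, each lookup is a binary search, and an item's chance of being picked equals the width of its interval between consecutive thresholds.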
Stress Thread
Each stress thread creates multiple gRPC client contexts (2 common + 8 rare) with reserved correlation ID blocks and continuously runs randomly selected scenarios. The scenario weights sum to 1000, so each weight is that scenario's selection probability in thousandths:
| Weight | Scenario | Description |
|---|---|---|
| 60 | TimeoutScenario | Deadline exceeded testing |
| 80 | ResNetScenario | Image classification load |
| 60 | CrashingScenario | Client crash resilience |
| 62 | SequenceNoEndScenario | Sequences without end flag |
| 68 | SequenceValidNoEndScenario | Valid then no-end sequence pairs |
| 68 | SequenceValidValidScenario | Back-to-back valid sequences |
| 7 | SequenceNoStartScenario | Missing start flag (error case) |
| 295 | SequenceValidScenario | Standard valid sequences (highest-weighted sequence scenario) |
| 300 | PerfAnalyzerScenario | perf_analyzer load generation (highest weight) |
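As a quick check of the table, the weights can be converted to selection shares. This sketch only does arithmetic on the numbers listed above. The `WEIGHTS` dictionary is an illustrative construct, not a structure from `stress.py`.

```python
# Weights copied from the table above.
WEIGHTS = {
    "TimeoutScenario": 60,
    "ResNetScenario": 80,
    "CrashingScenario": 60,
    "SequenceNoEndScenario": 62,
    "SequenceValidNoEndScenario": 68,
    "SequenceValidValidScenario": 68,
    "SequenceNoStartScenario": 7,
    "SequenceValidScenario": 295,
    "PerfAnalyzerScenario": 300,
}

total = sum(WEIGHTS.values())
shares = {name: weight / total for name, weight in WEIGHTS.items()}
# Combined share of every scenario whose name starts with "Sequence".
sequence_share = sum(
    share for name, share in shares.items() if name.startswith("Sequence")
)

for name, share in shares.items():
    print(f"{name:28s} {share:6.1%}")
print(f"all sequence scenarios: {sequence_share:.1%}")
```

The five sequence scenarios together account for half of all selections, and PerfAnalyzerScenario alone for 30%.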
def stress_thread(name, seed, correlation_id_base, ...):
    # 10% chance of using a rare context (exercises idle sequence paths)
    if rng.rand() < 0.1:
        client_idx = common_cnt + rare_idx
    else:
        client_idx = common_idx
    try:
        res = scenario.run(client_metadata_list[client_idx])
        if res is not None:
            update_counter_fn(scenario.scenario_name(), count=res)
    except Exception as ex:
        update_counter_fn(scenario.scenario_name(), False)
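The effect of the 10% branch on context usage can be simulated without a server. This is a sketch under stated assumptions: `pick_contexts` is a hypothetical helper, `random.Random` stands in for the NumPy generator, and the round-robin advance of `common_idx` and `rare_idx` is assumed because the excerpt above does not show how those indices change.

```python
import random

COMMON_CNT = 2  # contexts used on most iterations
RARE_CNT = 8    # contexts used occasionally, so their sequences sit idle


def pick_contexts(iterations, seed, rare_probability=0.1):
    """Return the context index chosen on each iteration (hypothetical helper)."""
    rng = random.Random(seed)
    common_idx = 0
    rare_idx = 0
    chosen = []
    for _ in range(iterations):
        if rng.random() < rare_probability:
            # Rare contexts occupy the indices after the common ones.
            chosen.append(COMMON_CNT + rare_idx)
            rare_idx = (rare_idx + 1) % RARE_CNT  # assumed round-robin
        else:
            chosen.append(common_idx)
            common_idx = (common_idx + 1) % COMMON_CNT  # assumed round-robin
    return chosen


indices = pick_contexts(iterations=10_000, seed=0)
rare_fraction = sum(i >= COMMON_CNT for i in indices) / len(indices)
print(f"rare contexts used on {rare_fraction:.1%} of iterations")
```

With eight rare contexts sharing roughly one iteration in ten, any single rare context is touched far less often than a common one, which is what leaves its sequences idle between uses.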
Load Thread
Dedicated load threads run only PerfAnalyzerScenario with the resnet_v1_50 model to maintain consistent GPU utilization, complementing the bursty stress threads.
Report Generation
Produces a formatted table using prettytable with columns for test case name, failure count, test count, request count, and description. Results are aggregated across all threads and written to stress_report.txt.
def generate_report(elapsed_time, ...):
    t = prettytable.PrettyTable(hrules=prettytable.ALL)
    t.field_names = [
        "Test Case", "Number of Failures", "Test Count",
        "Request Count", "Test Case Description"
    ]
    # ... aggregates counts across all threads
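The aggregation step can be pictured as a sum over per-thread dictionaries. The sketch below matches the `accumulate_count(dict_list, test_case_name)` signature listed earlier, but the body is an assumption about its behavior and the sample counters are invented for illustration.

```python
def accumulate_count(dict_list, test_case_name):
    """Sum one test case's count across per-thread dictionaries."""
    # A thread that never ran the scenario has no entry, so default to 0.
    return sum(d.get(test_case_name, 0) for d in dict_list)


# Hypothetical per-thread counters for two stress threads.
test_case_count = [
    {"SequenceValidScenario": 40, "TimeoutScenario": 5},
    {"SequenceValidScenario": 35},
]
print(accumulate_count(test_case_count, "SequenceValidScenario"))  # 75
print(accumulate_count(test_case_count, "TimeoutScenario"))        # 5
```

Keeping one dictionary per thread means each thread updates only its own counters while running, and the totals are computed once at report time.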
Command-Line Interface
parser.add_argument("-v", "--verbose", ...)
parser.add_argument("-r", "--random-seed", type=int, ...)
parser.add_argument("-t", "--concurrency", type=int, default=8, ...)
parser.add_argument("--load-thread", type=int, default=0, ...)
parser.add_argument("-d", "--test-duration", type=int, default=25000,
                    help="Duration in seconds (default ~7 hours)")
Thread Management
The main loop monitors threads for early termination and enforces the test duration. On completion, STOP_STRESS_THREAD is set and threads are joined with a 300-second timeout to detect hangs. Each thread's gRPC clients are explicitly closed to prevent stream cleanup issues.
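while (time.time() - start_time) < FLAGS.test_duration:
    time.sleep(1)
    for t in threads:
        # A thread that has already exited indicates early termination
        if not t.is_alive():
            exit_code = 1
            break
    # Leave the monitoring loop as soon as early termination is detected
    if exit_code != 0:
        break
STOP_STRESS_THREAD = True
for t in threads:
    # A thread still alive after the timeout is treated as hung
    t.join(timeout=300)
    if t.is_alive() and (exit_code == 0):
        exit_code = 1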
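The monitor, stop, and join pattern can be exercised on its own with short timings. This is a sketch, not the module's code: `run_workers` is a hypothetical function, a `threading.Event` is swapped in for the module-level `STOP_STRESS_THREAD` flag, and the worker body is a placeholder for running a scenario.

```python
import threading
import time


def run_workers(worker_count, duration_s, join_timeout_s):
    """Run workers for duration_s, then stop them; return 0 on a clean run."""
    stop = threading.Event()  # stands in for the module-level stop flag

    def worker():
        while not stop.is_set():
            time.sleep(0.01)  # placeholder for running one scenario

    threads = [
        threading.Thread(target=worker, daemon=True) for _ in range(worker_count)
    ]
    for t in threads:
        t.start()

    exit_code = 0
    deadline = time.monotonic() + duration_s
    while time.monotonic() < deadline and exit_code == 0:
        time.sleep(0.05)
        if any(not t.is_alive() for t in threads):
            exit_code = 1  # a worker ended before the test was over

    stop.set()
    for t in threads:
        t.join(timeout=join_timeout_s)
        if t.is_alive():
            exit_code = 1  # the worker did not stop in time: treat as a hang
    return exit_code


print(run_workers(worker_count=3, duration_s=0.2, join_timeout_s=1.0))
```

A bounded join is what turns a hung thread into a reported failure instead of a test run that never finishes.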
Failure Handling
Thread exceptions are collected in a thread-safe list. Scenarios in ALLOW_FAILURE_SCENARIO (currently only PerfAnalyzerScenario) do not cause test failure, though their failures are still reported.
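One way to express the allow-list rule is a predicate over aggregated failure counts. The sketch below is an assumption about how such a check could look. `has_blocking_failure` and the sample counts are hypothetical, and the scenario name is written as a string literal where the module uses `PerfAnalyzerScenario.__name__`.

```python
ALLOW_FAILURE_SCENARIO = ["PerfAnalyzerScenario"]


def has_blocking_failure(failed_counts, allowed=ALLOW_FAILURE_SCENARIO):
    """Return True if any scenario outside the allow-list recorded a failure."""
    return any(
        count > 0 and name not in allowed
        for name, count in failed_counts.items()
    )


# Hypothetical aggregated failure counts.
print(has_blocking_failure({"PerfAnalyzerScenario": 3, "TimeoutScenario": 0}))  # False
print(has_blocking_failure({"PerfAnalyzerScenario": 3, "TimeoutScenario": 1}))  # True
```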
Test Flow
- Parse command-line arguments (seed, concurrency, duration, load threads)
- Initialize per-thread tracking dictionaries
- Launch stress threads with weighted scenario selection
- Launch optional load threads with PerfAnalyzer only
- Monitor for early thread termination during test duration
- Signal threads to stop and join with timeout
- Generate formatted report to stdout and file
- Report collected exceptions and exit with appropriate code
Dependencies
- scenarios module - All scenario class definitions
- tritonclient.grpc - gRPC Triton client
- numpy - Random number generation
- prettytable - Formatted report output
- test_util (tu) - Model name utilities