Implementation:Triton inference server Server L0 Long Running Stress

From Leeroopedia


L0 Long Running Stress

Source File: qa/L0_long_running_stress/stress.py
Language: Python (657 lines)
Domains: Testing, Stress_Testing

Purpose

This Python module is the main stress test runner for Triton Inference Server. It orchestrates multiple concurrent threads that each execute randomly weighted scenarios (sequence inference, timeout handling, crashing clients, image classification, and perf_analyzer load generation) over a configurable duration. The runner tracks per-scenario success/failure counts, generates a formatted report, and supports dedicated load threads for sustained GPU utilization.

Signature

# Constants:
CORRELATION_ID_BLOCK_SIZE = 1024 * 1024
BACKENDS = os.environ.get("BACKENDS", "onnx plan")
ALLOW_FAILURE_SCENARIO = [PerfAnalyzerScenario.__name__]
STOP_STRESS_THREAD = False

# Key functions:
def get_trials(is_sequence=True) -> tuple
def update_test_count(test_case_count, failed_test_case_count,
                      request_count, test_case_name, success=True, count=1)

class ScenarioSelector:
    """Weighted random scenario selection using cumulative probability."""
    def __init__(self, probs, rng)
    def get_scenario(self) -> Scenario

def stress_thread(name, seed, correlation_id_base,
                  test_case_count, failed_test_case_count, sequence_request_count)
def load_thread(name, seed, correlation_id_base,
                test_case_count, failed_test_case_count, sequence_request_count)
def format_content(content, max_line_length) -> str
def accumulate_count(dict_list, test_case_name) -> int
def generate_report(elapsed_time, _test_case_count,
                    _failed_test_case_count, _sequence_request_count)

Key Components

ScenarioSelector

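Implements weighted random selection of test scenarios. The weights are accumulated and normalized into a cumulative probability distribution that ends at 1.0. get_scenario draws a uniform random number and finds the matching bucket by binary search with bisect.bisect_left.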

import bisect

class ScenarioSelector:
    def __init__(self, probs, rng):
        self.rng_ = rng
        self.probs_range_ = []
        self.scenarios_ = []

        # probs is an iterable of (weight, scenario) pairs
        total_weight = 0
        for weight, scenario in probs:
            total_weight += weight
            self.scenarios_.append(scenario)
            self.probs_range_.append(float(total_weight))
        # Normalize the running totals so the last entry is 1.0
        for i in range(len(self.probs_range_)):
            self.probs_range_[i] /= total_weight

    def get_scenario(self):
        return self.scenarios_[bisect.bisect_left(self.probs_range_, self.rng_.rand())]
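
A minimal, self-contained sketch of the same cumulative-weight lookup. It uses the standard library's random.Random in place of the generator the module passes in, plain strings in place of scenario objects, and a hypothetical three-entry weight table. All of these are assumptions made to keep the example runnable on its own.

```python
import bisect
import random
from collections import Counter


def build_cumulative(weights):
    """Turn raw weights into a normalized cumulative distribution."""
    total = float(sum(weights))
    running = 0.0
    cumulative = []
    for w in weights:
        running += w
        cumulative.append(running / total)
    return cumulative


def pick(names, cumulative, u):
    """Return the name whose cumulative bucket contains u (0 <= u < 1)."""
    return names[bisect.bisect_left(cumulative, u)]


# Hypothetical three-entry table; the real runner registers nine scenarios.
names = ["timeout", "resnet", "sequence_valid"]
cumulative = build_cumulative([60, 80, 295])

rng = random.Random(0)  # stand-in for the seeded generator
counts = Counter(pick(names, cumulative, rng.random()) for _ in range(10000))
```

Because the last cumulative entry is exactly 1.0 and the random draw is strictly below 1.0, the index returned by bisect_left always falls inside the list.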

Stress Thread

Each stress thread creates multiple gRPC client contexts (2 common + 8 rare) with reserved correlation ID blocks and continuously runs randomly selected scenarios. The scenario weights sum to 1000, so each weight can be read as thousandths of the total:

Weight  Scenario                    Description
60      TimeoutScenario             Deadline exceeded testing
80      ResNetScenario              Image classification load
60      CrashingScenario            Client crash resilience
62      SequenceNoEndScenario       Sequences without end flag
68      SequenceValidNoEndScenario  Valid then no-end sequence pairs
68      SequenceValidValidScenario  Back-to-back valid sequences
7       SequenceNoStartScenario     Missing start flag (error case)
295     SequenceValidScenario       Standard valid sequences (highest-weight sequence scenario)
300     PerfAnalyzerScenario        perf_analyzer load generation (highest weight overall)

def stress_thread(name, seed, correlation_id_base, ...):
    # 10% chance of using a rare context (exercises idle sequence paths)
    if rng.rand() < 0.1:
        client_idx = common_cnt + rare_idx
    else:
        client_idx = common_idx

    try:
        res = scenario.run(client_metadata_list[client_idx])
        if res is not None:
            update_counter_fn(scenario.scenario_name(), count=res)
    except Exception as ex:
        update_counter_fn(scenario.scenario_name(), False)
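
The context handling described above can be sketched as follows. How the thread's correlation ID block is divided among its contexts, and how the common and rare indices are drawn, are both assumptions here. Only the 2 + 8 split, the block size constant, and the 10% rare-context probability come from the text.

```python
import random

CORRELATION_ID_BLOCK_SIZE = 1024 * 1024
COMMON_CNT = 2
RARE_CNT = 8


def context_id_bases(correlation_id_base, total_cnt=COMMON_CNT + RARE_CNT):
    """Split one thread's ID block into equal, non-overlapping sub-blocks."""
    sub_block = CORRELATION_ID_BLOCK_SIZE // total_cnt
    return [correlation_id_base + i * sub_block for i in range(total_cnt)]


def choose_context(rng, rare_probability=0.1):
    """Pick a context index: usually a common one, occasionally a rare one."""
    if rng.random() < rare_probability:
        return COMMON_CNT + rng.randrange(RARE_CNT)
    return rng.randrange(COMMON_CNT)


# Hypothetical base of 1 for the first thread's block.
bases = context_id_bases(correlation_id_base=1)
rng = random.Random(42)
picks = [choose_context(rng) for _ in range(5000)]
rare_share = sum(1 for p in picks if p >= COMMON_CNT) / len(picks)
```

Giving each context its own ID range keeps sequences from different contexts from colliding, and the rarely used contexts leave sequences idle long enough to exercise the server's idle-sequence handling.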

Load Thread

Dedicated load threads run only PerfAnalyzerScenario with the resnet_v1_50 model to maintain consistent GPU utilization, complementing the bursty stress threads.

Report Generation

Produces a formatted table using prettytable with columns for test case name, failure count, test count, request count, and description. Results are aggregated across all threads and written to stress_report.txt.

def generate_report(elapsed_time, ...):
    t = prettytable.PrettyTable(hrules=prettytable.ALL)
    t.field_names = [
        "Test Case", "Number of Failures", "Test Count",
        "Request Count", "Test Case Description"
    ]
    # ... aggregates counts across all threads
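
The two report helpers named in the signature list could look roughly like this. Both bodies are assumptions inferred from the function names and signatures: accumulate_count sums one test case's counter over the per-thread dictionaries, and format_content wraps long text to a maximum line length (here with textwrap, which the module may or may not use).

```python
import textwrap


def accumulate_count(dict_list, test_case_name):
    """Sum one test case's counter across every per-thread dictionary."""
    return sum(d.get(test_case_name, 0) for d in dict_list)


def format_content(content, max_line_length):
    """Wrap a long description so it fits in a table cell."""
    return "\n".join(textwrap.wrap(content, width=max_line_length))


# Hypothetical per-thread counters for two stress threads.
test_case_count = [
    {"SequenceValidScenario": 12, "TimeoutScenario": 3},
    {"SequenceValidScenario": 9},
]
total_valid = accumulate_count(test_case_count, "SequenceValidScenario")
total_timeout = accumulate_count(test_case_count, "TimeoutScenario")
wrapped = format_content("Standard valid sequences sent by one client", 20)
```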

Command-Line Interface

parser.add_argument("-v", "--verbose", ...)
parser.add_argument("-r", "--random-seed", type=int, ...)
parser.add_argument("-t", "--concurrency", type=int, default=8, ...)
parser.add_argument("--load-thread", type=int, default=0, ...)
parser.add_argument("-d", "--test-duration", type=int, default=25000,
    help="Duration in seconds (default ~7 hours)")
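
A runnable sketch of the parser above, for trying the flags in isolation. The option names, types, and the defaults of 8, 0, and 25000 come from the listing. The `action`, the remaining defaults, and all help strings are hypothetical fill-ins for the parts the listing elides.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="stress runner options (sketch)")
    # action/default/help below are assumptions; the listing elides them.
    parser.add_argument("-v", "--verbose", action="store_true", default=False,
                        help="Enable verbose output")
    parser.add_argument("-r", "--random-seed", type=int, default=None,
                        help="Seed for the random number generator")
    parser.add_argument("-t", "--concurrency", type=int, default=8,
                        help="Number of concurrent stress threads")
    parser.add_argument("--load-thread", type=int, default=0,
                        help="Number of dedicated load threads")
    parser.add_argument("-d", "--test-duration", type=int, default=25000,
                        help="Duration in seconds")
    return parser


flags = build_parser().parse_args(["-t", "4", "--load-thread", "1", "-r", "7"])
defaults = build_parser().parse_args([])
default_hours = defaults.test_duration / 3600.0  # just under 7 hours
```

argparse converts dashes in long option names to underscores, so the values are read as flags.random_seed, flags.load_thread, and flags.test_duration.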

Thread Management

The main loop monitors threads for early termination and enforces the test duration. On completion, STOP_STRESS_THREAD is set and threads are joined with a 300-second timeout to detect hangs. Each thread's gRPC clients are explicitly closed to prevent stream cleanup issues.

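while (time.time() - start_time) < FLAGS.test_duration:
    time.sleep(1)
    for t in threads:
        # A thread that has already exited means early termination
        if not t.is_alive():
            exit_code = 1
            break
    # Leave the monitoring loop too, not just the inner for loop
    if exit_code != 0:
        break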

STOP_STRESS_THREAD = True
for t in threads:
    t.join(timeout=300)
    if t.is_alive() and (exit_code == 0):
        exit_code = 1
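
The monitor-then-join pattern above can be tried in isolation with the sketch below. It swaps the module-level STOP_STRESS_THREAD flag for a threading.Event and uses a trivial worker with sub-second timings, all of which are assumptions made so the example finishes quickly.

```python
import threading
import time

stop_event = threading.Event()  # plays the role of STOP_STRESS_THREAD


def worker():
    # Keep "running scenarios" until asked to stop.
    while not stop_event.is_set():
        time.sleep(0.01)


threads = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
for t in threads:
    t.start()

exit_code = 0
test_duration = 0.2  # seconds; the real default is far longer
start_time = time.time()
while (time.time() - start_time) < test_duration:
    time.sleep(0.05)
    if any(not t.is_alive() for t in threads):
        exit_code = 1  # a thread died before the test was supposed to end
        break

stop_event.set()
for t in threads:
    t.join(timeout=5)
    if t.is_alive() and exit_code == 0:
        exit_code = 1  # thread failed to stop in time: treat as a hang
```

Thread.join returns silently when its timeout expires, so the is_alive check after the join is what actually detects a hung thread.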

Failure Handling

Thread exceptions are collected in a thread-safe list. Scenarios in ALLOW_FAILURE_SCENARIO (currently only PerfAnalyzerScenario) do not cause test failure, though their failures are still reported.

Test Flow

  1. Parse command-line arguments (seed, concurrency, duration, load threads)
  2. Initialize per-thread tracking dictionaries
  3. Launch stress threads with weighted scenario selection
  4. Launch optional load threads with PerfAnalyzer only
  5. Monitor for early thread termination during test duration
  6. Signal threads to stop and join with timeout
  7. Generate formatted report to stdout and file
  8. Report collected exceptions and exit with appropriate code

Dependencies

  • scenarios module - All scenario class definitions
  • tritonclient.grpc - gRPC Triton client
  • numpy - Random number generation
  • prettytable - Formatted report output
  • test_util (tu) - Model name utilities
