Implementation:Triton inference server Server L0 Opentelemetry Unittest

From Leeroopedia


L0 OpenTelemetry Unittest

Source File: qa/L0_trace/opentelemetry_unittest.py
Language: Python (1054 lines)
Domains: Testing, Tracing

Purpose

This Python test module provides comprehensive unit tests for OpenTelemetry (OTel) tracing integration in Triton Inference Server. It validates that traces exported via the OTLP protocol have correct span hierarchy, event content, resource attributes, and client context propagation across HTTP, gRPC, and streaming gRPC protocols. The tests cover simple models, ensemble models, BLS chains, custom backend tracing, non-decoupled models, request cancellation traces, SageMaker endpoint tracing, trace rate control, and Python backend trace context exposure.

Signature

# Module-level functions:
def callback(user_data, result, error)
def prepare_data(client, is_binary=True) -> list
def send_bls_request(model_name="simple", headers=None)

# Key classes:
class UserData:
    """Container for async request results."""

class OpenTelemetryTest(tu.TestResultCollector):
    """Main test class with 19 test methods for OTel tracing validation."""

    # Helper methods:
    def _parse_trace_log(self, trace_log) -> list
    def _check_events(self, span_name, events, is_cancelled)
    def _test_resource_attributes(self, attributes)
    def _verify_contents(self, spans, expected_counts, is_cancelled)
    def _verify_nesting(self, spans, expected_parent_span_dict)
    def _verify_headers_propagated_from_client_if_any(self, root_span, headers)
    def _test_trace(self, headers, expected_number_of_spans, expected_counts, expected_parent_span_dict)
    def _test_simple_trace(self, headers=None)
    def _test_custom_identity_trace(self, headers=None)
    def _test_non_decoupled_trace(self, headers=None)
    def _test_bls_trace(self, headers=None)
    def _test_ensemble_trace(self, headers=None)
    def _test_trace_cancel(self, is_queued)

    # Test methods (19 total):
    def test_http_trace_simple_model(self)
    def test_http_trace_simple_model_context_propagation(self)
    def test_grpc_trace_simple_model(self)
    def test_grpc_trace_all_input_required_model_cancel(self)
    def test_grpc_trace_model_cancel_in_queue(self)
    def test_non_decoupled(self)
    def test_grpc_trace_simple_model_context_propagation(self)
    def test_streaming_grpc_trace_simple_model(self)
    def test_streaming_grpc_trace_simple_model_context_propagation(self)
    def test_http_trace_bls_model(self)
    def test_http_trace_bls_model_context_propagation(self)
    def test_http_trace_ensemble_model(self)
    def test_http_trace_ensemble_model_context_propagation(self)
    def test_http_trace_triggered(self)
    def test_sagemaker_invocation_trace_simple_model_context_propagation(self)
    def test_sagemaker_invoke_trace_simple_model_context_propagation(self)
    def test_trace_context_exposed_to_pbe(self)
    def test_custom_backend_tracing(self)
    def test_custom_backend_tracing_context_propagation(self)

Key Components

OTel Collector Management

Each test's setUp starts an OpenTelemetry Collector subprocess and then sleeps for 5 seconds before the test body runs. On teardown, the collector is killed and the trace log is preserved under a test-specific filename.

def setUp(self):
    # Launch the OTel Collector with the test's collector configuration.
    self.collector_subprocess = subprocess.Popen(
        ["./otelcol", "--config", "./trace-config.yaml"]
    )
    time.sleep(5)
    self.filename = "collected_traces.json"
    # W3C traceparent header used by the context-propagation tests.
    self.client_headers = {
        "traceparent": "00-0af7651916cd43dd8448eb211c12666c-b7ad6b7169242424-01"
    }
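The teardown behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual tearDown: the helper name `stop_collector_and_keep_log` and the `<base>_<test_name><ext>` naming scheme are assumptions made for the example.

```python
import os
import shutil
import subprocess


def stop_collector_and_keep_log(proc, log_path, test_name):
    """Kill the collector process and move its log to a per-test filename.

    Hypothetical helper: the naming scheme is an assumption, not the one
    used by opentelemetry_unittest.py.
    """
    proc.kill()
    proc.wait()  # reap the process so the log file is no longer being written
    if not os.path.exists(log_path):
        return None
    base, ext = os.path.splitext(log_path)
    kept_path = f"{base}_{test_name}{ext}"
    shutil.move(log_path, kept_path)
    return kept_path
```

Moving the file aside means the next test starts from an empty log, so spans from earlier tests cannot leak into later assertions.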

Event Verification

The _check_events method validates that each span type contains exactly the expected events:

| Span Type | Expected Events |
|---|---|
| InferRequest (HTTP) | HTTP_RECV_START, HTTP_RECV_END, INFER_RESPONSE_COMPLETE, HTTP_SEND_START, HTTP_SEND_END |
| InferRequest (gRPC) | GRPC_WAITREAD_START, GRPC_WAITREAD_END, INFER_RESPONSE_COMPLETE, GRPC_SEND_START, GRPC_SEND_END |
| compute | COMPUTE_START, COMPUTE_INPUT_END, COMPUTE_OUTPUT_START, COMPUTE_END |
| Model request spans | REQUEST_START, QUEUE_START, REQUEST_END |
| custom_identity_int32 | REQUEST_START, QUEUE_START, REQUEST_END, CUSTOM_SINGLE_ACTIVITY |
| CUSTOM_ACTIVITY | CUSTOM_ACTIVITY_START, CUSTOM_ACTIVITY_END |
| Cancelled (HTTP) | HTTP_RECV_START, HTTP_RECV_END only |
| Cancelled (gRPC) | GRPC_WAITREAD_START, GRPC_WAITREAD_END only |
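An exact-match check of this kind could be sketched as below. The `EXPECTED_EVENTS` mapping and the `unexpected_and_missing` helper are hypothetical names for illustration, only two rows of the table are included, and the event shape (`{"name": ...}`) is assumed from the OTLP JSON encoding rather than copied from the test module.

```python
# Hypothetical subset of the table above, keyed by span name.
EXPECTED_EVENTS = {
    "compute": [
        "COMPUTE_START",
        "COMPUTE_INPUT_END",
        "COMPUTE_OUTPUT_START",
        "COMPUTE_END",
    ],
    "CUSTOM_ACTIVITY": ["CUSTOM_ACTIVITY_START", "CUSTOM_ACTIVITY_END"],
}


def unexpected_and_missing(span_name, events):
    """Return (unexpected, missing) event names for one span.

    Both lists are empty when the span contains exactly the expected events.
    """
    expected = set(EXPECTED_EVENTS[span_name])
    seen = {event["name"] for event in events}
    return sorted(seen - expected), sorted(expected - seen)
```

Reporting both directions makes a failure easier to read: an event leaking in from another span type shows up as unexpected, while a truncated span shows up as missing.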

Span Hierarchy Verification

The _verify_nesting method reconstructs the parent-child span tree and compares against expected structures:

# Simple model: 3 spans
expected_parent_span_dict = {
    "InferRequest": ["simple"],
    "simple": ["compute"]
}

# BLS model: 6 spans
expected_parent_span_dict = {
    "InferRequest": ["bls_simple"],
    "bls_simple": ["compute", "ensemble_add_sub_int32_int32_int32"],
    "ensemble_add_sub_int32_int32_int32": ["simple"],
    "simple": ["compute"]
}

# Custom identity: 10 spans (6 nested custom activities)
expected_parent_span_dict = {
    "InferRequest": ["custom_identity_int32"],
    "custom_identity_int32": ["CUSTOM_ACTIVITY", "CUSTOM_ACTIVITY0", "compute"],
    "CUSTOM_ACTIVITY0": ["CUSTOM_ACTIVITY1"],
    # ... chain continues through CUSTOM_ACTIVITY5
}
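As a rough illustration of the reconstruction step, the sketch below rebuilds a parent-to-children mapping from a flat span list. It assumes each span is a dict with `spanId`, `parentSpanId`, and `name` keys, as in OTLP JSON output; the function name `build_parent_span_dict` is hypothetical and the real `_verify_nesting` may be structured differently.

```python
from collections import defaultdict


def build_parent_span_dict(spans):
    """Map each parent span name to the sorted names of its direct children."""
    name_by_id = {span["spanId"]: span["name"] for span in spans}
    children = defaultdict(list)
    for span in spans:
        # Root spans have an empty or unknown parentSpanId and are skipped.
        parent_name = name_by_id.get(span.get("parentSpanId"))
        if parent_name is not None:
            children[parent_name].append(span["name"])
    return {parent: sorted(names) for parent, names in children.items()}


# Example: the three-span "simple" model trace.
spans = [
    {"spanId": "a1", "parentSpanId": "", "name": "InferRequest"},
    {"spanId": "b2", "parentSpanId": "a1", "name": "simple"},
    {"spanId": "c3", "parentSpanId": "b2", "name": "compute"},
]
simple_tree = build_parent_span_dict(spans)
```

Because the result is keyed by span name, two parents that share a name are merged into one entry; that is adequate for comparing against dictionaries like the ones above, but it would hide structure in traces where parent names repeat.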

Context Propagation

The context-propagation tests check that the W3C Trace Context traceparent header is correctly propagated from client to server. The trace ID and parent span ID taken from the header are compared with the root span's traceId and parentSpanId fields.

def _verify_headers_propagated_from_client_if_any(self, root_span, headers):
    # Only check propagation when the client actually sent headers.
    if headers is not None:
        # traceparent layout: version-traceid-parentid-flags
        parent_span_id = headers["traceparent"].split("-")[2]
        parent_trace_id = headers["traceparent"].split("-")[1]
        self.assertEqual(root_span["traceId"], parent_trace_id)
        self.assertEqual(root_span["parentSpanId"], parent_span_id)
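For reference, a traceparent value has four dash-separated hexadecimal fields: version, trace ID (32 characters), parent span ID (16 characters), and trace flags. The following sketch parses one with a strict pattern instead of positional `split` calls; `parse_traceparent` is a hypothetical helper and is not part of the test module.

```python
import re

# Four lowercase-hex fields: version, trace-id, parent-id, trace-flags.
TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def parse_traceparent(value):
    """Split a traceparent header value into its named fields."""
    match = TRACEPARENT_RE.match(value)
    if match is None:
        raise ValueError(f"malformed traceparent: {value!r}")
    version, trace_id, parent_id, flags = match.groups()
    return {
        "version": version,
        "trace_id": trace_id,
        "parent_id": parent_id,
        "flags": flags,
    }


# The header value used by the tests' client_headers.
fields = parse_traceparent(
    "00-0af7651916cd43dd8448eb211c12666c-b7ad6b7169242424-01"
)
```

Here `fields["trace_id"]` is the value expected in the root span's traceId, and `fields["parent_id"]` is the value expected in its parentSpanId.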

Resource Attribute Validation

The _test_resource_attributes method verifies that custom resource attributes set via --trace-config=opentelemetry,resource= appear in the collected traces:

def _test_resource_attributes(self, attributes):
    expected_service_name = {"key": "service.name", "value": {"stringValue": "test_triton"}}
    expected_test_key_value = {"key": "test.key", "value": {"stringValue": "test.value"}}
    self.assertIn(expected_service_name, attributes)
    self.assertIn(expected_test_key_value, attributes)
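The attributes arrive as a list of `{"key": ..., "value": {"stringValue": ...}}` entries, which is why the assertions compare whole dictionaries. When inspecting a trace by hand, it can be convenient to flatten that list first. The helper below is a hypothetical sketch that handles string-valued attributes only.

```python
def attributes_to_dict(attributes):
    """Flatten OTLP-style attributes into {key: string value}.

    Entries whose value is not a stringValue are ignored in this sketch.
    """
    return {
        attr["key"]: attr["value"]["stringValue"]
        for attr in attributes
        if "stringValue" in attr.get("value", {})
    }


# Example using the two attributes the test expects.
attributes = [
    {"key": "service.name", "value": {"stringValue": "test_triton"}},
    {"key": "test.key", "value": {"stringValue": "test.value"}},
]
flat = attributes_to_dict(attributes)
```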

Cancellation Tracing

Two cancellation tests validate trace behavior:

  • In-compute cancellation: Cancels during active computation; expects compute, request, and root spans with reduced events
  • In-queue cancellation: Cancels while queued in dynamic batcher; expects 0 compute spans

Trace Rate Control

The test_http_trace_triggered test modifies the trace rate to 5, sends 5 requests (expects 1 trace), then sends 5 requests with OTel headers (each should be traced), and finally restores the rate to 1.
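The expected counts in that scenario can be reproduced with a toy model. The `RateSampler` class below is purely illustrative and is not Triton's sampling implementation: it assumes that every `rate`-th request without a traceparent header is traced, and that any request carrying a traceparent header is always traced.

```python
class RateSampler:
    """Toy sampler: trace every `rate`-th plain request, and every request
    that carries a traceparent header (assumed behaviour, for illustration)."""

    def __init__(self, rate):
        self.rate = rate
        self._seen = 0

    def should_trace(self, headers=None):
        if headers and "traceparent" in headers:
            return True
        self._seen += 1
        return self._seen % self.rate == 0


sampler = RateSampler(rate=5)
# Five plain requests at rate 5.
plain_traces = sum(sampler.should_trace() for _ in range(5))
# Five requests that carry a client trace context.
headers = {"traceparent": "00-0af7651916cd43dd8448eb211c12666c-b7ad6b7169242424-01"}
header_traces = sum(sampler.should_trace(headers) for _ in range(5))
```

Under these assumptions `plain_traces` is 1 and `header_traces` is 5, matching the counts the test expects.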

SageMaker Endpoint Tracing

Two tests check trace propagation through SageMaker's /invocations and /models/{name}/invoke endpoints when the request carries OTel context headers.

Python Backend Trace Context

The test_trace_context_exposed_to_pbe test validates that the Python backend can access the trace context when Triton runs in OpenTelemetry mode, and that the context matches the W3C traceparent format.

def test_trace_context_exposed_to_pbe(self):
    # Excerpt: the inference request that produces `result` is omitted here.
    # OUTPUT0 holds the trace context as a JSON-encoded byte string.
    context = result.as_numpy("OUTPUT0")[()].decode("utf-8")
    context = json.loads(context)
    self.assertIn("traceparent", context.keys())
    # Pattern: version-traceid-parentid-flags
    context_pattern = re.compile(r"\d{2}-[0-9a-f]{32}-[0-9a-f]{16}-\d{2}")
    self.assertIsNotNone(re.match(context_pattern, context["traceparent"]))

Dependencies

  • tritonclient.http / tritonclient.grpc - Triton client libraries
  • numpy - Numerical operations
  • requests - HTTP requests for SageMaker endpoints
  • test_util (tu) - Test result collector
  • OpenTelemetry Collector binary (otelcol)
  • trace-config.yaml - OTel collector configuration
