From 52093178a43d4ee642673e09622419d78833c44b Mon Sep 17 00:00:00 2001 From: Viraat Chandra Date: Fri, 5 Jun 2026 17:00:00 -0700 Subject: [PATCH] Revert "feat(trace): add -vvv binary trace events + live dashboard" This reverts commit 28f5ac199b681ad478d53ed8bf48981023a1ad1d. The -vvv trace feature was merged to main before review; reverting so main stays clean. It will be re-introduced via the reviewed feat/trace-events PR (#334). Co-Authored-By: Claude Opus 4.8 --- scripts/trace_dashboard.py | 214 --- .../commands/benchmark/execute.py | 67 +- .../endpoint_client/config.py | 5 - .../endpoint_client/http.py | 5 +- .../endpoint_client/worker.py | 32 +- .../load_generator/multi_turn_strategy.py | 8 - .../load_generator/session.py | 17 +- src/inference_endpoint/main.py | 21 +- src/inference_endpoint/utils/logging.py | 11 +- src/inference_endpoint/utils/trace.py | 561 -------- .../utils/trace_dashboard.py | 1173 ---------------- tests/unit/commands/test_benchmark.py | 2 - tests/unit/utils/__init__.py | 2 - tests/unit/utils/test_trace.py | 280 ---- tests/unit/utils/test_trace_dashboard.py | 1187 ----------------- 15 files changed, 17 insertions(+), 3568 deletions(-) delete mode 100755 scripts/trace_dashboard.py delete mode 100644 src/inference_endpoint/utils/trace.py delete mode 100644 src/inference_endpoint/utils/trace_dashboard.py delete mode 100644 tests/unit/utils/__init__.py delete mode 100644 tests/unit/utils/test_trace.py delete mode 100644 tests/unit/utils/test_trace_dashboard.py diff --git a/scripts/trace_dashboard.py b/scripts/trace_dashboard.py deleted file mode 100755 index 5d40763a9..000000000 --- a/scripts/trace_dashboard.py +++ /dev/null @@ -1,214 +0,0 @@ -#!/usr/bin/env python3 -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""CLI entry point for the -vvv trace dashboard. - -Reads fixed-size binary frames from the FIFO opened by -:func:`inference_endpoint.utils.trace.bootstrap` and renders the -dashboard via ``rich.Live``. Dashboard aggregation lives in -:mod:`inference_endpoint.utils.trace_dashboard` so it can be unit -tested without standing up a TUI. - -Linux only: timestamps are compared across processes and rely on -``CLOCK_MONOTONIC`` being system-wide (per ``man 7 time``). -""" - -# ruff: noqa: I001 -# The pre-commit ruff hook is pinned to v0.3.3 (see -# .pre-commit-config.yaml's "TODO: sync rev with ruff version"), which -# does not auto-detect `inference_endpoint` as a first-party package -# and therefore disagrees with the project's local ruff (v0.15.8) on -# import order in this file. File-level noqa keeps both versions quiet -# until the rev is synced. -from __future__ import annotations - -import argparse -import fcntl -import json -import os -import sys -import threading -import time - -from inference_endpoint.utils.trace import FRAME_SIZE, snapshot_sidecar_path -from inference_endpoint.utils.trace_dashboard import ( - DASHBOARD_THEME, - READ_CHUNK, - REFRESH_HZ, - Dashboard, -) -from rich.console import Console -from rich.live import Live - - -def _try_load_snapshot(path: str) -> dict | None: - """Best-effort read of the loadgen snapshot sidecar. Returns None - if the file is missing or transiently mid-rename (atomic write may - briefly produce a half-rename window that json.load tolerates).""" - try: - with open(path) as f: - return json.load(f) - except (OSError, json.JSONDecodeError): - return None - - -# End-of-run exit policy for the reader thread. -# -# The authoritative final snapshot (state=="complete") is written by the -# parent's trace.teardown() *just before* it closes its FIFO write fd, and -# every worker closes its write fd at process exit. So true FIFO EOF (all -# writers closed) is the guaranteed signal that the complete snapshot is -# already on disk — preferring EOF makes the FINAL panel deterministic. -# -# But workers can take tens of seconds to drain their ZMQ queues before -# exiting, so we can't wait for EOF unconditionally or the dashboard hangs -# long past the end of the run. Compromise: once lifecycle frames go quiet, -# keep reading — letting true EOF trigger an immediate clean exit — until -# this generous cap bounds a wedged-worker hang. After the reader exits, -# main() polls the sidecar for the complete snapshot. -_IDLE_EXIT_CAP_S = 30.0 - - -class _FrameReader(threading.Thread): - """Blocking read loop; ingests whole frames into the Dashboard. - - Uses ``select`` with a short timeout so the thread can check for an - idle-exit condition — the FIFO may not reach EOF until all 24+ worker - processes have drained their ZMQ queues and exited, which can be tens - of seconds after the benchmark has finished. - """ - - def __init__(self, fd: int, dash: Dashboard) -> None: - super().__init__(daemon=True, name="trace-reader") - self._fd = fd - self._dash = dash - self._pending = bytearray() - self._eof = threading.Event() - - @property - def eof(self) -> bool: - return self._eof.is_set() - - def run(self) -> None: - import select - - try: - while True: - ready, _, _ = select.select([self._fd], [], [], 0.5) - if ready: - try: - chunk = os.read(self._fd, READ_CHUNK) - except OSError: - return - if not chunk: - return # true EOF — all writers closed - self._pending.extend(chunk) - whole = (len(self._pending) // FRAME_SIZE) * FRAME_SIZE - if whole: - self._dash.ingest_frames(bytes(self._pending[:whole])) - del self._pending[:whole] - # Check lifecycle idle time regardless of whether LOOP_LAG - # frames are still arriving — workers emit LOOP_LAG every - # 0.3 s even after the run ends, so frame-arrival time is - # not a reliable proxy for end-of-run. We keep reading so - # true EOF (the parent's post-teardown fd close) wins and - # guarantees the complete snapshot is on disk; this cap only - # bounds a wedged-worker hang where EOF never arrives. - d = self._dash - if (d.is_done or d.is_tail) and d.lifecycle_idle_s >= _IDLE_EXIT_CAP_S: - return - finally: - self._eof.set() - - -# Match the producers' write-fd request in trace.py. Capped by -# /proc/sys/fs/pipe-max-size; request fails above that → default kept. -_F_SETPIPE_SZ = getattr(fcntl, "F_SETPIPE_SZ", 1031) -_KERNEL_PIPE_BUF = 64 * 1024 * 1024 - - -def _open_trace_input(pipe_path: str | None) -> int: - if pipe_path: - fd = os.open(pipe_path, os.O_RDONLY) - try: - fcntl.fcntl(fd, _F_SETPIPE_SZ, _KERNEL_PIPE_BUF) - except OSError: - pass - return fd - return sys.stdin.fileno() - - -def main() -> int: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "--trace-pipe", - help="FIFO path to read binary trace frames from (default: stdin).", - ) - args = parser.parse_args() - - dash = Dashboard() - # Dashboard renders to stderr because the parent benchmark process - # has redirected its own stdout/stderr to a log file (see trace.bootstrap). - console = Console(file=sys.stderr, force_terminal=True, theme=DASHBOARD_THEME) - reader = _FrameReader(_open_trace_input(args.trace_pipe), dash) - reader.start() - - # Snapshot sidecar path is convention-named after the parent's pid - # (main proc that spawned us). The benchmark writes it periodically - # under -vvv so we can live-update the LOADGEN vs TRACE panel. - snap_path = snapshot_sidecar_path(os.getppid()) - - # screen=True uses the alternate-screen buffer so updates redraw - # cleanly without scrollback noise. When Live() exits the alt - # screen is torn down — to keep the final frame visible we capture - # it BEFORE leaving the context and print it to the normal buffer - # afterward. - final_frame = None - with Live( - dash.render(), - console=console, - refresh_per_second=REFRESH_HZ, - screen=True, - transient=False, - ) as live: - while not reader.eof: - snap = _try_load_snapshot(snap_path) - if snap is not None: - dash.attach_loadgen_snapshot(snap) - live.update(dash.render()) - time.sleep(1.0 / REFRESH_HZ) - # After reader exits (FIFO EOF or idle timeout), the main - # benchmark process may still be finalizing the metrics - # aggregator. Poll the sidecar for up to _FINAL_SNAP_WAIT_S - # seconds until we see a snapshot with state=="complete", - # then use it for the final comparison panel. - _FINAL_SNAP_WAIT_S = 20.0 - deadline = time.monotonic() + _FINAL_SNAP_WAIT_S - best_snap: dict | None = None - while time.monotonic() < deadline: - s = _try_load_snapshot(snap_path) - if s is not None: - best_snap = s - if s.get("state") in ("complete", "COMPLETE"): - break - time.sleep(0.2) - if best_snap is not None: - dash.attach_loadgen_snapshot(best_snap, force=True) - # Bypass the per-tick fold-defer window so the final render - # captures COMPLETE frames that landed within the last 300 ms - # — otherwise they sit queued and the closing frame shows - # stale stage histograms / verdict. - dash.flush_pending_folds() - final_frame = dash.render() - live.update(final_frame) - # Now we're back on the normal screen — print the last snapshot - # so the user can see the totals + verdict after the run ends. - if final_frame is not None: - console.print(final_frame) - console.print("[dim]── trace finished ──[/dim]") - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/src/inference_endpoint/commands/benchmark/execute.py b/src/inference_endpoint/commands/benchmark/execute.py index c59aa4346..7a86bb2de 100644 --- a/src/inference_endpoint/commands/benchmark/execute.py +++ b/src/inference_endpoint/commands/benchmark/execute.py @@ -26,7 +26,6 @@ import asyncio import json import logging -import os import random import shutil import signal @@ -97,7 +96,6 @@ SessionResult, ) from inference_endpoint.metrics.report import Report -from inference_endpoint.utils import trace transformers_logging.set_verbosity_error() @@ -567,8 +565,6 @@ async def _run_benchmark_async( config = ctx.config session_id = f"cli_benchmark_{uuid.uuid4().hex[:8]}" - trace.start_lag_task(loop) # -vvv: main-proc loop-lag sampler - # Progress bar + response collector pbar = tqdm( desc=f"{config.model_params.name} (Streaming: {ctx.enable_streaming})", @@ -616,47 +612,6 @@ async def _run_benchmark_async( ) metrics_subscriber.start() - # -vvv: snapshot sidecar tap → dashboard's LOADGEN vs TRACE. - # - # The provider reads ``metrics_subscriber.latest``, which is only - # advanced by the subscriber's ``process()`` task scheduled off an - # ``add_reader`` callback on THIS loop. Under a saturated 30k+ req/s - # run the loop starves that callback, so ``latest`` freezes and the - # dashboard's live LOADGEN panel goes stale (the tap thread keeps - # re-writing the same frozen snapshot). The FINAL panel is NOT - # affected: trace.teardown() writes the authoritative - # ``state == "complete"`` snapshot to the same sidecar before it - # closes the FIFO, and the dashboard reader now prefers true FIFO - # EOF (all writers closed) before its final-snapshot poll — so the - # closing comparison always reflects ``final_snapshot.json``. - # - # TODO(trace-live-staleness): decouple the live feed from this loop - # with a trace-local second SUB, added purely here + as a helper in - # utils/trace.py (no edits to subscriber.py/publisher.py/APIs): - # * In a daemon thread, create an independent ``zmq.Context`` and a - # blocking SUB with ``CONFLATE=1`` + ``SUBSCRIBE b""`` connected - # to ``ipc://{zmq_ctx.socket_dir}/{metrics_socket_name}`` (the - # same endpoint the aggregator binds; a PUB fans out to all SUBs, - # so the existing in-process subscriber is unaffected). - # * Per recv: strip the ``TOPIC_FRAME_SIZE`` topic prefix and - # handle the ``BATCH_TOPIC`` batch frame (mirror - # ``ZmqMessageSubscriber.receive``), then - # ``MetricsSnapshotCodec().decode`` + ``snapshot_to_dict`` and - # hand the dict to the tap. A blocking recv in its own thread is - # immune to main-loop starvation. - # Deferred for now: it would duplicate the transport's private - # framing (TOPIC_FRAME_SIZE / BATCH_TOPIC / single-vs-batch layout) - # outside the transport layer, which is a hidden coupling to a - # non-public contract — out of scope for a surgical in-bounds fix. - trace.start_snapshot_tap( - loop, - lambda: ( - snapshot_to_dict(metrics_subscriber.latest) - if metrics_subscriber.latest is not None - else None - ), - ) - # Launch service subprocesses launcher = ServiceLauncher(zmq_ctx) aggregator_args: list[str] = [ @@ -725,9 +680,6 @@ async def _run_benchmark_async( api_key=config.endpoint_config.api_key, event_logs_dir=ctx.report_dir, cpu_affinity=ctx.affinity_plan, - trace_pipe_path=( - trace.fifo_path(os.getpid()) if trace.is_enabled() else None - ), ) http_client = await HTTPEndpointClient.create(http_config, loop) issuer = HttpClientSampleIssuer(http_client) @@ -812,15 +764,13 @@ def _on_global_timeout() -> None: def _on_phase_start(phase: PhaseConfig) -> None: nonlocal global_timeout_handle - if phase.phase_type == PhaseType.PERFORMANCE: - # -vvv: signal the dashboard that warmup is over so its - # LOADGEN vs TRACE accumulators reset to the same - # window the loadgen aggregator's tracked counters use. - trace.emit_trace(trace.Event.PERF_START, 0) - if max_duration_ms is not None: - global_timeout_handle = loop.call_later( - max_duration_ms / 1000.0, _on_global_timeout - ) + if ( + phase.phase_type == PhaseType.PERFORMANCE + and max_duration_ms is not None + ): + global_timeout_handle = loop.call_later( + max_duration_ms / 1000.0, _on_global_timeout + ) loop.add_signal_handler(signal.SIGINT, session.stop) try: @@ -881,9 +831,6 @@ def _on_phase_start(phase: PhaseConfig) -> None: except Exception as e: # noqa: BLE001 — best-effort report build. logger.warning(f"Failed to build report from snapshot: {e}") - # Cancel trace tasks, flush emitter, unlink FIFO, write final - # snapshot. No-op when trace was never enabled. - await trace.teardown(final_snapshot=snap_dict) metrics_subscriber.close() pbar.close() diff --git a/src/inference_endpoint/endpoint_client/config.py b/src/inference_endpoint/endpoint_client/config.py index 0b57138dd..6f93aba9d 100644 --- a/src/inference_endpoint/endpoint_client/config.py +++ b/src/inference_endpoint/endpoint_client/config.py @@ -193,11 +193,6 @@ class HTTPClientConfig(WithUpdatesMixin, BaseModel): default=None, exclude=True ) - # Trace FIFO path, set by trace.bootstrap under -vvv. - trace_pipe_path: Annotated[str | None, cyclopts.Parameter(parse=False)] = Field( - default=None, exclude=True - ) - # CPU affinity plan for worker processes (computed by caller, e.g. benchmark command). # None = disabled (no worker pinning) cpu_affinity: Annotated[AffinityPlan | None, cyclopts.Parameter(parse=False)] = ( diff --git a/src/inference_endpoint/endpoint_client/http.py b/src/inference_endpoint/endpoint_client/http.py index 53df76420..210d3e192 100644 --- a/src/inference_endpoint/endpoint_client/http.py +++ b/src/inference_endpoint/endpoint_client/http.py @@ -791,14 +791,13 @@ def build_request( @dataclass(slots=True) class InFlightRequest: - """State for a single HTTP request through its lifecycle. + """State for a single HTTP request through its lifecycle: Attributes: query_id: Correlates response back to original Query. http_bytes: Serialized HTTP request for socket.write(). is_streaming: Whether this is a streaming (SSE) request or not. - connection: PooledConnection assigned to this request (set once - request is fired). + connection: PooledConnection assigned to this request (set once request is fired). """ query_id: str diff --git a/src/inference_endpoint/endpoint_client/worker.py b/src/inference_endpoint/endpoint_client/worker.py index a731c6fba..a87b6889f 100644 --- a/src/inference_endpoint/endpoint_client/worker.py +++ b/src/inference_endpoint/endpoint_client/worker.py @@ -46,9 +46,7 @@ PooledConnection, ) from inference_endpoint.profiling import profile -from inference_endpoint.utils import trace from inference_endpoint.utils.logging import setup_logging -from inference_endpoint.utils.trace import Event, emit_trace_id logger = logging.getLogger(__name__) @@ -83,15 +81,6 @@ def worker_main( worker_log_format = f"%(asctime)s - %(name)s[W{worker_id}/%(process)d] - %(funcName)s - %(levelname)s - %(message)s" setup_logging(level=http_config.log_level, format_string=worker_log_format) - # -vvv: worker attaches to the FIFO main proc created at bootstrap. - # Missing here means the caller set trace_pipe_path but skipped - # mkfifo — fail loud rather than silently lose trace events. - if http_config.trace_pipe_path: - if not os.path.exists(http_config.trace_pipe_path): - raise FileNotFoundError( - f"trace_pipe_path={http_config.trace_pipe_path} missing" - ) - trace.enable_tracing(http_config.trace_pipe_path) # Configure GC based on worker_gc_mode match http_config.worker_gc_mode: @@ -240,11 +229,6 @@ async def run(self) -> None: signal.signal(signal.SIGTERM, self.shutdown) signal.signal(signal.SIGINT, self.shutdown) - # Event-loop lag sampler also drains this worker's trace - # buffer on the loop thread — see `trace.emit_loop_lag`. - if trace.is_enabled(): - self._loop.create_task(trace.emit_loop_lag(self.worker_id)) - # Warmup connection pool if enabled warmup_cfg = self.http_config.warmup_connections if warmup_cfg != 0: @@ -322,8 +306,6 @@ async def _run_main_loop(self) -> None: if query is None: break - emit_trace_id(Event.WORKER_RECEIVED, query.id) - # Prepare and fire request req = self._prepare_request(query) if not await self._fire_request(req): @@ -357,7 +339,7 @@ def _prepare_request(self, query: Query) -> InFlightRequest: extra_headers=query.headers, ) - # Create request context. + # Create request context req = InFlightRequest( query_id=query.id, http_bytes=http_bytes, @@ -383,11 +365,8 @@ async def _fire_request(self, req: InFlightRequest) -> bool: # Acquire connection from pool conn = await self._pool.acquire() - emit_trace_id(Event.CONN_ACQUIRED, req.query_id) - # Write request bytes directly to transport conn.protocol.write(req.http_bytes) - emit_trace_id(Event.WRITTEN, req.query_id) # Store connection on req for response processing req.connection = conn @@ -407,7 +386,6 @@ async def _process_response(self, req: InFlightRequest) -> None: try: # Await headers and handle error status status_code, _ = await conn.protocol.read_headers() - emit_trace_id(Event.RESPONSE_HEADERS, req.query_id) if status_code != 200: error_body = await conn.protocol.read_body() self._pool.release(conn) @@ -445,11 +423,7 @@ async def _handle_streaming_body(self, req: InFlightRequest) -> None: accumulator = self._accumulator(query_id, self.http_config.stream_all_chunks) # Process SSE stream - yields batches of chunks - first_chunk = True async for chunk_batch in self._iter_sse_lines(conn): - if first_chunk: - emit_trace_id(Event.RESPONSE_BYTES, req.query_id) - first_chunk = False for delta in chunk_batch: if stream_chunk := accumulator.add_chunk(delta): self._responses.send(stream_chunk) @@ -457,9 +431,6 @@ async def _handle_streaming_body(self, req: InFlightRequest) -> None: # Release connection early - done with socket I/O (idempotent) self._pool.release(conn) - # Last chunk received — splits server token-gen from the client tail. - emit_trace_id(Event.RESPONSE_DONE, req.query_id) - # Send final complete back to main rank self._responses.send(accumulator.get_final_output()) @@ -471,7 +442,6 @@ async def _handle_non_streaming_body(self, req: InFlightRequest) -> None: # Read entire response body response_bytes = await conn.protocol.read_body() - emit_trace_id(Event.RESPONSE_BYTES, req.query_id) # Release connection early - done with socket I/O (idempotent) self._pool.release(conn) diff --git a/src/inference_endpoint/load_generator/multi_turn_strategy.py b/src/inference_endpoint/load_generator/multi_turn_strategy.py index ef9d1f470..4cecccd19 100644 --- a/src/inference_endpoint/load_generator/multi_turn_strategy.py +++ b/src/inference_endpoint/load_generator/multi_turn_strategy.py @@ -26,7 +26,6 @@ from ..core.types import ErrorData, QueryResult, TextModelOutput from ..dataset_manager.multi_turn_dataset import ConversationMetadata from ..exceptions import InputValidationError -from ..utils.trace import Event, emit_trace_id from .conversation_manager import ConversationManager, ConversationState from .strategy import PhaseIssuerProtocol @@ -373,13 +372,6 @@ def _handle_timeout(self, query_id: str, conv_id: str) -> None: error_type="TurnTimeout", error_message=f"turn timeout after {self._turn_timeout_s}s", ) - # Mirror the trace COMPLETE here — _handle_response is bypassed - # for the timed-out turn, so without this the dashboard's - # in-flight count stays permanently inflated. Only fire on the - # timeout path: the abort path below uses register_skipped(), - # which never emits trace ISSUED, so emitting COMPLETE there - # would unmatched-inflate the dashboard's complete counter. - emit_trace_id(Event.COMPLETE, query_id) dropped = self._abort_remaining_turns( conv_id, reason=f"prior turn timed out (query={query_id})" diff --git a/src/inference_endpoint/load_generator/session.py b/src/inference_endpoint/load_generator/session.py index b871207bd..3b80420b8 100644 --- a/src/inference_endpoint/load_generator/session.py +++ b/src/inference_endpoint/load_generator/session.py @@ -39,7 +39,6 @@ ) from ..core.types import PromptData, Query, QueryResult, StreamChunk from ..dataset_manager.dataset import Dataset -from ..utils.trace import Event, emit_trace_id from .sample_order import create_sample_order from .strategy import LoadStrategy, create_load_strategy @@ -76,7 +75,7 @@ def _extract_prompt_text(messages: list[Any]) -> str | None: # --------------------------------------------------------------------------- -class PhaseType(str, Enum): # noqa: UP042 +class PhaseType(str, Enum): """Phase types control tracking and reporting behavior.""" PERFORMANCE = "performance" @@ -271,7 +270,6 @@ def issue( data=prompt_data, ) ) - emit_trace_id(Event.ISSUED, query_id) self._issuer.issue(query) self.inflight += 1 self.issued_count += 1 @@ -518,16 +516,9 @@ def _handle_response(self, resp: QueryResult | StreamChunk) -> None: # a real response arriving after timeout double-publishes ERROR/COMPLETE # and double-decrements inflight (no per-request HTTP timeout # exists in endpoint_client; late arrivals are possible). - # - # The trace MAIN_RECEIVED emit is below the gate so a late - # arrival doesn't reopen a lifecycle whose synthetic COMPLETE - # has already been folded — that would record a negative - # `final chunk -> complete` duration in the dashboard. if phase_issuer is not None and query_id in phase_issuer.completed_uuids: return - emit_trace_id(Event.MAIN_RECEIVED, resp.id) - conv_id_str, turn_num = ("", None) if phase_issuer is not None: conv_id_str, turn_num = phase_issuer.uuid_to_conv_info.pop( @@ -570,10 +561,6 @@ def _handle_response(self, resp: QueryResult | StreamChunk) -> None: data=resp.response_output, ) ) - # Trace COMPLETE for every phase (incl. warmup): the dashboard - # tracks the full request lifecycle, and a warmup ISSUED with - # no matching COMPLETE would leak as "in-flight" forever. - emit_trace_id(Event.COMPLETE, query_id) if phase_issuer is not None and query_id in phase_issuer.uuid_to_index: phase_issuer.mark_inflight_complete() @@ -606,8 +593,6 @@ def _handle_response(self, resp: QueryResult | StreamChunk) -> None: turn=turn_num, ) ) - if is_first: - emit_trace_id(Event.RECV_FIRST, resp.id) def _make_stop_check( self, settings: RuntimeSettings, phase_start_ns: int diff --git a/src/inference_endpoint/main.py b/src/inference_endpoint/main.py index 2c0f3f503..abae50643 100644 --- a/src/inference_endpoint/main.py +++ b/src/inference_endpoint/main.py @@ -42,9 +42,7 @@ InputValidationError, SetupError, ) -from inference_endpoint.utils import trace from inference_endpoint.utils.logging import setup_logging -from inference_endpoint.utils.trace import bootstrap as _bootstrap_trace logger = logging.getLogger(__name__) @@ -66,26 +64,13 @@ def launcher( name="--verbose", alias="-v", count=True, - help="Verbosity level (-v info, -vv debug, -vvv trace)", + help="Verbosity level (-v info, -vv debug)", ), ] = 0, ): """Global options applied before any command.""" - # -vvv only spawns the trace pipeline (FIFO, dashboard subprocess, - # stdout/stderr redirect) for the `benchmark` subcommand. Other - # commands (info, init, probe, validate-yaml) cap the verbosity at - # DEBUG so the FIFO/dashboard/log-redirect path is skipped — - # running e.g. `inference-endpoint -vvv info` should print info, - # not hijack the terminal. - is_benchmark = bool(tokens) and tokens[0] == "benchmark" - setup_logging(level=_bootstrap_trace(verbose if is_benchmark else min(verbose, 2))) - try: - app(tokens) - finally: - # Guarantees FIFO/dir cleanup when the benchmark fails before - # reaching run_benchmark_async's own teardown (e.g. config / - # dataset / endpoint setup raises). Idempotent off-trace. - trace.cleanup() + setup_logging(level="DEBUG" if verbose >= 2 else "INFO") + app(tokens) # Benchmark subcommands — lazy-loaded from commands/benchmark/cli.py diff --git a/src/inference_endpoint/utils/logging.py b/src/inference_endpoint/utils/logging.py index fce30166b..aae4650e4 100644 --- a/src/inference_endpoint/utils/logging.py +++ b/src/inference_endpoint/utils/logging.py @@ -30,12 +30,6 @@ # Initialize colorama _colorama_init(autoreset=True) -# Custom TRACE level below DEBUG (10) for high-frequency hot-path events. -# Activated by -vvv on the CLI; the matching binary trace emitter lives in -# inference_endpoint.utils.trace and is engaged from the same bootstrap. -TRACE = 5 -logging.addLevelName(TRACE, "TRACE") - # Map levelname -> color _LEVEL_COLORS = { "INFO": Fore.GREEN, @@ -118,8 +112,9 @@ def setup_logging(level: str | None = None, format_string: str | None = None) -> handler = logging.StreamHandler(sys.stdout) handler.setFormatter(ColoredFormatter(fmt=format_string, use_color=use_color)) - level_value = TRACE if level.upper() == "TRACE" else getattr(logging, level.upper()) - logging.basicConfig(level=level_value, handlers=[handler], force=True) + logging.basicConfig( + level=getattr(logging, level.upper()), handlers=[handler], force=True + ) # Set specific logger levels logging.getLogger("asyncio").setLevel(logging.WARNING) diff --git a/src/inference_endpoint/utils/trace.py b/src/inference_endpoint/utils/trace.py deleted file mode 100644 index 803849d3e..000000000 --- a/src/inference_endpoint/utils/trace.py +++ /dev/null @@ -1,561 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Binary trace event channel for ``-vvv`` runs. - -Design ------- -Each process (main + each worker) packs 17-byte events into a 1 MiB -ring and flushes once per asyncio tick via :func:`emit_loop_lag`. -Producer and flusher share the loop thread, so the emit hot path is -lock-free pack-and-return. - -Transport: a POSIX named pipe (``mkfifo``) at -``/tmp/endpoints_trace_/fifo`` (per-pid 0o700 dir; -``/tmp`` is shared with snapshot.json and logs.txt — ``/dev/shm`` is -reserved for execute.py's bulk artifacts). The ``scripts/trace_dashboard.py`` -subprocess opens the read end; main + every worker open the write -end. FIFO over alternatives because: - -* PIPE_BUF (4096 B) write atomicity — concurrent writers from N - processes can't interleave inside an event since each ≤ 4080 B - chunk is atomic. Unix SOCK_STREAM gives no such guarantee. -* Filesystem path = cross-process discovery without env vars or a - listen/accept rendezvous; workers just open the path. -* Kernel-buffered (``F_SETPIPE_SZ`` to 1 MiB) so the dashboard can - block briefly without dropping data. -* ``O_NONBLOCK`` + ``EAGAIN`` is the backpressure signal; we - account dropped bytes and emit ``Event.TRACE_DROPS`` rather than - stalling the loop. -* Reader EOF on parent exit is a natural shutdown signal. - -Wire format (17 B / event): `` str: - """Convention path for the trace FIFO (per-pid 0o700 dir).""" - return f"/tmp/endpoints_trace_{pid}/fifo" - - -def snapshot_sidecar_path(pid: int) -> str: - """Convention path for the live MetricsSnapshot JSON sidecar.""" - return f"/tmp/endpoints_trace_{pid}/snapshot.json" - - -# Buffer budget ≈ 4 GiB cap: 64 MiB ring × (1 main + N workers) + 64 MiB -# FIFO. Stays under 4 GiB to ~62 processes; lower _BUF_CAPACITY beyond. -# SPSC ring (producer + emit_loop_lag share the loop, no lock). 64 MiB ≈ -# 3.9M frames/tick, so ring-overflow drops are effectively impossible. -_BUF_CAPACITY = 64 * 1024 * 1024 # 64 MiB ≈ 3.9M frames per 0.3 s tick -_WRITE_CHUNK = 4080 # FRAME-aligned ≤ PIPE_BUF so writes are atomic -# Best-effort from both ends. Capped by /proc/sys/fs/pipe-max-size -# (1 MiB unprivileged); request fails above that → kernel default kept. -_KERNEL_PIPE_BUF = 64 * 1024 * 1024 # 64 MiB best-effort F_SETPIPE_SZ -_F_SETPIPE_SZ = getattr(fcntl, "F_SETPIPE_SZ", 1031) # Linux-only -_DASHBOARD_READY_S = 0.5 # grace after spawn before opening FIFO - - -def _sid_from_uuid(req_id: str) -> int: - """Deterministic 64-bit sid from a UUID hex string. - - ``hash(str)`` is per-process randomised (PEP 456) and would break - cross-process correlation. - """ - return int(req_id[:16], 16) - - -class _TraceEmitter: - """Buffered binary trace emitter. SPSC; one per process.""" - - __slots__ = ("_buf", "_dead", "_dropped_bytes", "_fd", "_offset") - - def __init__(self, fd: int) -> None: - self._fd = fd - self._buf = bytearray(_BUF_CAPACITY) - self._offset = 0 - self._dead = False - self._dropped_bytes = 0 - - def emit(self, event: int, sid: int) -> None: - if self._dead: - return - o = self._offset - if o + FRAME_SIZE > _BUF_CAPACITY: - # Buffer full this cycle; account the drop. - self._dropped_bytes += FRAME_SIZE - return - PACKER.pack_into(self._buf, o, event, sid, time.monotonic_ns()) - self._offset = o + FRAME_SIZE - - def flush(self) -> None: - if self._dead: - return - end = self._offset - if end == 0: - return - self._offset = 0 - view = memoryview(self._buf) - pos = 0 - while pos < end: - n = min(_WRITE_CHUNK, end - pos) - try: - os.write(self._fd, view[pos : pos + n]) - except OSError as e: - if e.errno == errno.EAGAIN: - # Reader is behind; PIPE_BUF atomicity means none - # of this chunk landed. Drop the rest, don't block. - self._dropped_bytes += end - pos - return - self._die() - return - pos += n - - def dropped_bytes(self) -> int: - """Cumulative dropped bytes since process start (never reset). - - emit_loop_lag re-emits this running total every tick, so a - TRACE_DROPS frame that is itself dropped (ring/pipe full) is - corrected on the next tick — the count is never lost. - """ - return self._dropped_bytes - - def _die(self) -> None: - global emit_trace, _active_emitter - self._dead = True - emit_trace = _noop - _active_emitter = None # is_enabled() → False after pipe death - try: - os.close(self._fd) - except OSError: - pass # already closed (double-_die, BrokenPipe auto-close) - - -def _noop(event: int, sid: int) -> None: # noqa: ARG001 - pass - - -def is_enabled() -> bool: - """True after :func:`enable_tracing` has been called this process.""" - return _active_emitter is not None - - -# Default (disabled) bindings. Replaced by enable_tracing() under -vvv. -# Hot path is a single bound-method call: pack_into + offset bump, no -# branching past the dead check. -emit_trace = _noop - -# Live emitter reference used by emit_loop_lag to drive periodic flushes -# on the same loop thread that owns emit_trace. None when tracing is off. -_active_emitter: _TraceEmitter | None = None - - -def emit_trace_id(event: int, req_id: str) -> None: - """Emit by request UUID. No-op guard makes non-hex ids safe when - tracing is off (existing tests pass ids like ``"q-1"``).""" - if emit_trace is _noop: - return - emit_trace(event, _sid_from_uuid(req_id)) - - -async def emit_loop_lag(worker_id: int, period_s: float = 0.3) -> None: - """Per-process tick: emit loop-lag + drops, then drain the ring. - - Spawn once per process after :func:`enable_tracing`. Runs on the - same loop as every ``emit_trace`` call, which is what makes the - SPSC ring lock-free. - """ - sid_high = (worker_id & 0xFF) << 56 - target_ns = int(period_s * 1e9) - mask_low = (1 << 56) - 1 - while True: - try: - t0 = time.monotonic_ns() - await asyncio.sleep(period_s) - lag = (time.monotonic_ns() - t0) - target_ns - if lag < 0: - lag = 0 - emit_trace(Event.LOOP_LAG, sid_high | (lag & mask_low)) - if _active_emitter is not None: - # Cumulative total, re-emitted every tick — self-heals if - # this frame is dropped (see _TraceEmitter.dropped_bytes). - dropped = _active_emitter.dropped_bytes() - if dropped: - emit_trace(Event.TRACE_DROPS, sid_high | (dropped & mask_low)) - _active_emitter.flush() - except asyncio.CancelledError: - # Final tick: report drops accrued since the last loop, then - # flush, so the closing snapshot's drop count is complete. - if _active_emitter is not None: - dropped = _active_emitter.dropped_bytes() - if dropped: - emit_trace(Event.TRACE_DROPS, sid_high | (dropped & mask_low)) - _active_emitter.flush() - return - except Exception: - # Don't let a transient fd error kill the flush task. - logger.exception("emit_loop_lag tick failed; continuing") - - -def enable_tracing(pipe_path: str) -> None: - """Install the emitter; idempotent. No-op if the FIFO is gone - (dashboard exited between dispatch and open).""" - global emit_trace - if emit_trace is not _noop: - return - if not os.path.exists(pipe_path): - return - # Two-step open: blocking O_WRONLY first to synchronise with the - # reader (O_NONBLOCK would ENXIO if reader isn't up yet), then - # flip to non-blocking so subsequent writes never stall the loop. - try: - fd = os.open(pipe_path, os.O_WRONLY) - except OSError: - return # pipe gone (dashboard exited before we opened) - try: - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - except OSError: - pass # rare; non-blocking is best-effort - try: - fcntl.fcntl(fd, _F_SETPIPE_SZ, _KERNEL_PIPE_BUF) - except OSError: - # F_SETPIPE_SZ requires CAP_SYS_RESOURCE above the kernel - # default, or is unavailable on the running kernel — either - # way the smaller default pipe buffer is fine, the user-space - # ring buffer absorbs short spikes on its own. - pass - global _active_emitter - emitter = _TraceEmitter(fd) - emit_trace = emitter.emit - _active_emitter = emitter - - -def bootstrap(verbose: int) -> str: - """Resolve verbose count to a log-level. On ``-vvv`` create the - FIFO, spawn the dashboard, redirect this process's stdout/stderr - to ``logs.txt`` in the same per-pid dir, and install the emitter. - - Caller MUST invoke :func:`teardown` once the run is done — there - is no atexit hook. Leaving cleanup explicit keeps shutdown order - deterministic and avoids the "atexit fired during interpreter - teardown" failure mode where ``os.write``/``os.unlink`` race - finalised globals. - """ - if verbose < 2: - return "INFO" - if verbose == 2: - return "DEBUG" - - # PID reuse on long-lived hosts can leave a stale dir; Linux PIDs - # are unique at any instant so an existing dir at our own PID is - # always a dead previous occupant — wipe and recreate. - path = fifo_path(os.getpid()) - trace_dir = os.path.dirname(path) - try: - os.mkdir(trace_dir, 0o700) - except FileExistsError: - shutil.rmtree(trace_dir, ignore_errors=True) - try: - os.mkdir(trace_dir, 0o700) - except FileExistsError: - # rmtree + mkdir lost a race; refuse rather than spin. - raise RuntimeError( - f"trace dir {trace_dir} reappeared after cleanup" - ) from None - os.mkfifo(path, 0o600) - _state.fifo_path = path - - # Spawn dashboard BEFORE the stdout/stderr redirect so it inherits - # the original terminal fds. - dashboard_proc = _spawn_dashboard(path) - if dashboard_proc is None: - # Wheel install without scripts/. Don't enable_tracing — its - # blocking O_WRONLY would deadlock with no reader. - sys.stderr.write( - "trace: scripts/trace_dashboard.py not found — open the FIFO " - f"manually: 'python scripts/trace_dashboard.py --trace-pipe {path}'\n" - ) - return "TRACE" - - # Grace window so a dashboard import error surfaces here instead - # of deadlocking the blocking O_WRONLY below. - time.sleep(_DASHBOARD_READY_S) - if dashboard_proc.poll() is not None: - sys.stderr.write( - f"trace: dashboard exited rc={dashboard_proc.returncode}; " - f"trace events disabled this run\n" - ) - return "TRACE" - - # Log file lives inside the per-pid 0o700 dir; O_NOFOLLOW is - # belt-and-suspenders against a symlink attack on /tmp. - log_path = os.path.join(trace_dir, "logs.txt") - orig_stderr_fd = os.dup(2) - os.write( - orig_stderr_fd, - f"trace: dashboard active — logs piped to {log_path}\n".encode(), - ) - log_fd = os.open( - log_path, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_NOFOLLOW, - 0o600, - ) - os.dup2(log_fd, 1) - os.dup2(log_fd, 2) - os.close(log_fd) - _state.orig_stderr_fd = orig_stderr_fd - _state.log_path = log_path - - enable_tracing(path) - return "TRACE" - - -@dataclass -class _BootstrapState: - """Bookkeeping for :func:`teardown`. Populated by :func:`bootstrap`.""" - - fifo_path: str | None = None - orig_stderr_fd: int | None = None - log_path: str | None = None - tasks: list[asyncio.Task[None]] = field(default_factory=list) - # Daemon threads spawned by start_snapshot_tap. Not asyncio tasks; - # they exit when _tap_stop is set. - tap_threads: list[threading.Thread] = field(default_factory=list) - tap_stop: threading.Event = field(default_factory=threading.Event) - - -_state = _BootstrapState() - -SnapshotProvider = Callable[[], dict | None] - - -def start_lag_task(loop: asyncio.AbstractEventLoop) -> None: - """Spawn :func:`emit_loop_lag` for the main proc on ``loop``.""" - if not is_enabled(): - return - _state.tasks.append(loop.create_task(emit_loop_lag(MAIN_PROC_LOOP_ID))) - - -def start_snapshot_tap( - loop: asyncio.AbstractEventLoop, - provider: SnapshotProvider, - *, - period_s: float = 0.5, -) -> None: - """Spawn the snapshot sidecar tap in a daemon thread. - - Running in a thread (not a coroutine) means the sidecar updates on - a real-time clock even when the benchmark asyncio loop is saturated - at 30 k+ req/s and cannot schedule coroutines. ``provider`` must be - thread-safe; ``loop`` is accepted for API compat but unused. - """ - if not is_enabled(): - return - path = snapshot_sidecar_path(os.getpid()) - # Re-arm here (right before spawn), NOT in cleanup(): clearing in - # cleanup() would un-stop a tap thread that outlived teardown's join - # and let it overwrite the authoritative final snapshot. Any orphan - # from a prior session is already dead by the next start. - _state.tap_stop.clear() - stop = _state.tap_stop - - def _tap_thread() -> None: - while not stop.wait(timeout=period_s): - try: - snap = provider() - if snap is not None: - _atomic_write_json(path, snap) - except Exception: # noqa: BLE001 — telemetry, never crash - logger.debug("snapshot tap write failed", exc_info=True) - - t = threading.Thread(target=_tap_thread, daemon=True, name="snapshot-tap") - t.start() - _state.tap_threads.append(t) - - -def _atomic_write_json(path: str, payload: dict) -> None: - tmp = f"{path}.tmp" - try: - with open(tmp, "w") as f: - json.dump(payload, f) - os.replace(tmp, path) - except OSError: - logger.debug("snapshot sidecar write failed: %s", path, exc_info=True) - - -def cleanup() -> None: - """Sync portion of teardown: flush emitter, close fd, unlink FIFO, - print log-path reminder. Safe from any context (no loop required), - idempotent, and the only cleanup that runs when bootstrap fired - but the async event loop never started (e.g. CLI parse / config / - endpoint setup raised before run_benchmark_async).""" - global emit_trace, _active_emitter - emitter = _active_emitter - if emitter is not None: - emitter.flush() - try: - os.close(emitter._fd) - except OSError: - pass - emit_trace = _noop - _active_emitter = None - - if _state.fifo_path is not None: - try: - os.unlink(_state.fifo_path) - except OSError: - pass # dashboard or stale-dir wipe already removed it - _state.fifo_path = None - - if _state.orig_stderr_fd is not None and _state.log_path is not None: - try: - os.write( - _state.orig_stderr_fd, - f"\ntrace: full run log → {_state.log_path}\n".encode(), - ) - except OSError: - pass # terminal closed mid-run (nohup, detached) - _state.orig_stderr_fd = None - _state.log_path = None - - -async def teardown(*, final_snapshot: dict | None = None) -> None: - """Async teardown: stops the tap thread, cancels lag tasks, writes - the final snapshot, then delegates to :func:`cleanup`. Idempotent.""" - # Stop the tap thread first so we can write the authoritative final - # snapshot without a concurrent thread overwriting it. - _state.tap_stop.set() - for t in _state.tap_threads: - t.join(timeout=2.0) - _state.tap_threads.clear() - - # Write the final snapshot AFTER the tap thread is gone. - if final_snapshot is not None and _state.fifo_path is not None: - _atomic_write_json(snapshot_sidecar_path(os.getpid()), final_snapshot) - - for task in _state.tasks: - if not task.done(): - task.cancel() - try: - await task - except (asyncio.CancelledError, Exception): - pass # telemetry; let the bench keep tearing down - _state.tasks.clear() - cleanup() - - -def _spawn_dashboard(pipe_path: str) -> subprocess.Popen | None: - """Spawn ``scripts/trace_dashboard.py`` to read from the FIFO. - - Returns the live ``subprocess.Popen`` on success, or ``None`` if the - script isn't on disk (e.g. installed without the repo tree). The - dashboard inherits the parent's stdout/stderr at spawn time, so its - rich.Live render goes to the terminal — and the parent's stdout - and stderr remain untouched (trace bytes ride the FIFO). - """ - repo_root = Path(__file__).resolve().parents[3] - script = repo_root / "scripts" / "trace_dashboard.py" - if not script.exists(): - return None - return subprocess.Popen( - [sys.executable, str(script), "--trace-pipe", pipe_path], - # Inherit stdout / stderr / stdin from parent — dashboard renders - # to its own stderr (= terminal); trace bytes do NOT come through - # stdin, they come through the FIFO. - ) diff --git a/src/inference_endpoint/utils/trace_dashboard.py b/src/inference_endpoint/utils/trace_dashboard.py deleted file mode 100644 index 89307d909..000000000 --- a/src/inference_endpoint/utils/trace_dashboard.py +++ /dev/null @@ -1,1173 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""Dashboard logic for the -vvv trace stream. - -Pure aggregation + rendering — no I/O. The CLI entry point at -``scripts/trace_dashboard.py`` wires this up to the FIFO reader and -``rich.Live``. Tests target this module directly so the dashboard's -counts/lifecycle behaviour is verifiable in isolation. -""" - -from __future__ import annotations - -import re -import threading -import time -from collections import deque -from dataclasses import dataclass, field -from typing import NamedTuple - -from hdrh.histogram import HdrHistogram -from rich.text import Text -from rich.theme import Theme - -from inference_endpoint.utils.trace import ( - FRAME_SIZE, - MAIN_PROC_LOOP_ID, - PACKER, - Event, -) - -# Single source of truth for the dashboard palette. The CLI wrapper -# attaches this to its rich Console so style names below resolve at -# render time. Tones chosen to read cleanly on dark terminals without -# the bright-yellow / bold-cyan flash that earlier versions had. -DASHBOARD_THEME = Theme( - { - "rule": "grey39", - "label": "grey62", - "value": "white", - "section": "cyan", - "client_row": "grey78", - "server_row": "deep_sky_blue3", - "summary": "white", - "warn": "orange3", - "critical": "red3", - "muted": "grey50", - # ipc_2_worker / ipc_2_main tokens inside stage labels — call - # out the IPC boundaries so the eye lands on them. - "ipc_seg": "green3", - # issued (in) vs completed (out) — paired so the eye reads the - # backlog at a glance. - "issued": "deep_sky_blue1", - "completed": "green3", - } -) - -REFRESH_PERIOD_S = 0.3 -REFRESH_HZ = 1.0 / REFRESH_PERIOD_S -WIDTH = 129 -LABEL_W = 50 -# Sized to one enlarged-pipe drain (1 MiB, see trace._KERNEL_PIPE_BUF) so -# each os.read pulls a full pipe's worth in one syscall instead of 16× -# 64 KiB reads, keeping the single reader ahead of ~240k frames/s. -READ_CHUNK = 1 << 20 - -# Defer folding by this much after COMPLETE arrives, so worker frames -# flushed slightly later than main's COMPLETE for the same sid have a -# chance to land before we pop the lifecycle. -_FOLD_DEFER_NS = int(REFRESH_PERIOD_S * 1_000_000_000) - -# Evict partial lifecycles (no COMPLETE) older than this. Sized large -# enough to dwarf the worst-case request latency (long-streaming LLM -# completions, deep server queues) so legit in-flight requests are -# never collected before their COMPLETE arrives — losing the fold -# would silently zero out a stage row. -_LIFECYCLE_TTL_NS = 600_000_000_000 # 10 min - -# Tail indicator threshold: if no ISSUED event has arrived for this -# long while in-flight > 0, the run is considered to be in the tail -# (draining; no new work being scheduled). Two refresh ticks is the -# minimum that filters out idle gaps between ingest batches. -_TAIL_QUIET_NS = int(2 * REFRESH_PERIOD_S * 1e9) - -# End-of-run: no ISSUED or COMPLETE frame for this long ⇒ the run has -# stopped producing → freeze. Activity-based (not in-flight==0, which is -# unreliable when COMPLETE frames drop on the FIFO). Generous enough to -# never trip during a normal inter-completion gap at any real QPS. -_DONE_QUIET_NS = 3_000_000_000 # 3 s - -# HDR Histogram requires a fixed trackable range at construction time. -# 1 hour cap with 3 sig figs ≈ 14k buckets per metric, ~100 KB each. -# min/avg/max are tracked exactly outside the histogram (see _Metric) -# so values past the cap still show their true magnitude in those -# slots; only p50/p99 read from HDR and clamp at the cap. -HDR_LOW = 1 -HDR_HIGH = 3_600_000_000_000 # 1 hour in ns -HDR_SIG = 3 - - -# -- data model ----------------------------------------------------------- - - -class Stats(NamedTuple): - """Per-metric summary. All durations in nanoseconds.""" - - n: int - avg: float - min: float - p50: float - p99: float - max: float - - -@dataclass(slots=True) -class _Lifecycle: - """Per-request timing context keyed by sid. ``birth_ns`` is the - monotonic time the first frame for this sid landed — used by the - TTL eviction pass to drop partial lifecycles whose COMPLETE never - arrived. The fold defer is owned by ``Dashboard._fold_queue``. - """ - - birth_ns: int = 0 - stages: dict[int, int] = field(default_factory=dict) - - -@dataclass(slots=True) -class _Metric: - """Per-metric stats. ``min_ns`` / ``max_ns`` / ``sum_ns`` / ``total`` - are exact and uncapped — the dashboard's true min/avg/max always - reflect real data even past the HDR range. ``hist`` is HDR-bounded - (1 ns → 1 h) and only feeds the p50 / p99 columns; values past - HDR_HIGH are pinned at the cap there.""" - - total: int = 0 - sum_ns: float = 0.0 - min_ns: float = float("inf") - max_ns: float = float("-inf") - hist: HdrHistogram = field( - default_factory=lambda: HdrHistogram(HDR_LOW, HDR_HIGH, HDR_SIG) - ) - - def add(self, ns: float) -> None: - self.total += 1 - self.sum_ns += ns - if ns < self.min_ns: - self.min_ns = ns - if ns > self.max_ns: - self.max_ns = ns - iv = int(ns) - if iv < HDR_LOW: - iv = HDR_LOW - elif iv > HDR_HIGH: - iv = HDR_HIGH - self.hist.record_value(iv) - - -# -- stage definitions ---------------------------------------------------- - -_SIDE_CLIENT = "client" -_SIDE_SERVER = "server" - -# (key, start_event, end_event) — every per-stage delta the fold computes. -# Superset of both layouts below; the render picks + labels a subset by -# mode, so we fold once and show it streaming-vs-offline. server_resp is -# headers→1st-chunk (streaming) or headers→full-body (offline); stream_gen -# + tail_stream only fold when RESPONSE_DONE is present (streaming). -_STAGE_FOLDS: tuple[tuple[str, Event, Event], ...] = ( - ("backpressure", Event.ISSUED, Event.CONN_ACQUIRED), - ("socket_write", Event.CONN_ACQUIRED, Event.WRITTEN), - ("server_headers", Event.WRITTEN, Event.RESPONSE_HEADERS), - ("server_resp", Event.RESPONSE_HEADERS, Event.RESPONSE_BYTES), - ("stream_gen", Event.RESPONSE_BYTES, Event.RESPONSE_DONE), - ("tail_stream", Event.RESPONSE_DONE, Event.COMPLETE), - ("tail_offline", Event.RESPONSE_BYTES, Event.COMPLETE), -) - -# (side, label, key). Labels use ASCII '->' (some terminals render U+2192 -# as two cells, shifting every row a column versus the header). -_LAYOUT_STREAMING: tuple[tuple[str, str, str], ...] = ( - (_SIDE_CLIENT, "issue -> ipc_2_worker -> conn acquired", "backpressure"), - (_SIDE_CLIENT, "conn acquired -> payload written", "socket_write"), - (_SIDE_SERVER, "payload written -> headers recvd", "server_headers"), - (_SIDE_SERVER, "headers recvd -> 1st chunk", "server_resp"), - (_SIDE_SERVER, "1st chunk -> last chunk", "stream_gen"), - (_SIDE_CLIENT, "last chunk -> ipc_2_main -> complete", "tail_stream"), -) -_LAYOUT_OFFLINE: tuple[tuple[str, str, str], ...] = ( - (_SIDE_CLIENT, "issue -> ipc_2_worker -> conn acquired", "backpressure"), - (_SIDE_CLIENT, "conn acquired -> payload written", "socket_write"), - (_SIDE_SERVER, "payload written -> headers recvd", "server_headers"), - (_SIDE_SERVER, "headers recvd -> response", "server_resp"), - (_SIDE_CLIENT, "response -> ipc_2_main -> complete", "tail_offline"), -) - -# All metric keys tracked by Dashboard. Per-stage keys come from -# _STAGE_FOLDS; the rest are summary / aggregate buckets for the verdict: -# ipc_wait = ISSUED → WORKER_RECEIVED (worker-side pickup latency) -# client_pre = WORKER_RECEIVED → WRITTEN (loadgen send-side work) -# server_http = WRITTEN → last-body-byte (server response, incl. token-gen) -# client_post = last-body-byte → COMPLETE (loadgen receive-side work) -# where last-body-byte = RESPONSE_DONE (streaming) or RESPONSE_BYTES (offline). -_METRIC_KEYS: tuple[str, ...] = tuple({k for k, _, _ in _STAGE_FOLDS}) + ( - "e2e", - "ttft", - "ipc_wait", - "pool_wait", - "client_pre", - "server_http", - "client_post", -) - -# Backpressure thresholds: trigger the chip when either the worker-side -# pickup or the TCP-pool acquire takes ≥ this fraction of E2E. -_BACKPRESSURE_PCT = 0.20 - - -# -- stats -------------------------------------------------------------- - - -def _stats(m: _Metric) -> Stats: - if m.total == 0: - return Stats(0, 0.0, 0.0, 0.0, 0.0, 0.0) - h = m.hist - return Stats( - n=m.total, - avg=m.sum_ns / m.total, - min=m.min_ns, # exact, uncapped - p50=float(h.get_value_at_percentile(50.0)), - p99=float(h.get_value_at_percentile(99.0)), - max=m.max_ns, # exact, uncapped - ) - - -def _fmt_row(s: Stats, pct: float) -> str: - ms = 1e6 - # Columns are 12 / 11×5 / 9 chars — sized so 6-digit ms values - # (~hours of latency) still have a separator between cells. - return ( - f"{s.n:>12,}{s.avg / ms:>11.2f}{s.min / ms:>11.2f}{s.p50 / ms:>11.2f}" - f"{s.p99 / ms:>11.2f}{s.max / ms:>11.2f}{pct:>8.1f}%\n" - ) - - -_IPC_TOKEN_RE = re.compile(r"\bipc_\w+") - - -def _split_ipc_tokens(text: str) -> list[tuple[str, bool]]: - """Yield ``(chunk, is_ipc)`` runs so the renderer can color the - ``ipc_*`` tokens (e.g. ``ipc_2_worker``, ``ipc_2_main``) with - a different style from the surrounding label.""" - out: list[tuple[str, bool]] = [] - last = 0 - for m in _IPC_TOKEN_RE.finditer(text): - if m.start() > last: - out.append((text[last : m.start()], False)) - out.append((m.group(0), True)) - last = m.end() - if last < len(text): - out.append((text[last:], False)) - return out - - -# -- dashboard ---------------------------------------------------------- - - -class Dashboard: - """Aggregates trace frames; renders a rich :class:`Text`.""" - - def __init__( - self, - *, - fold_defer_ns: int = _FOLD_DEFER_NS, - lifecycle_ttl_ns: int = _LIFECYCLE_TTL_NS, - ) -> None: - self._fold_defer_ns = fold_defer_ns - self._lifecycle_ttl_ns = lifecycle_ttl_ns - self._lifecycles: dict[int, _Lifecycle] = {} - self._loop_lag: dict[int, _Metric] = {} - self._metrics: dict[str, _Metric] = {k: _Metric() for k in _METRIC_KEYS} - self._n_complete = 0 - self._start_ns = time.monotonic_ns() - # Guards mutation of every aggregator field (lifecycles, fold/birth - # queues, _metrics, _loop_lag, _dropped_bytes_by_proc, all _n_* - # counters). The FIFO reader thread enters via ingest_frames; the - # main thread enters via render(). Contention is bounded — render - # ticks at REFRESH_HZ (~3 Hz), the reader holds the lock for at - # most one frame batch at a time. - self._lock = threading.Lock() - # Per-process drop accounting: proc_id (worker_id or - # MAIN_PROC_LOOP_ID) → total dropped bytes reported so far. - self._dropped_bytes_by_proc: dict[int, int] = {} - # Lifecycle counters maintained at ingest time. The reader thread - # is the sole writer; the render thread does plain GIL-atomic - # reads. issued/complete are main-proc; written is worker-proc - # (WRITTEN = payload sent) and drives on-the-wire in-flight. - self._n_issued = 0 - self._n_written = 0 - self._n_complete_seen = 0 - # Monotonic-ns time when ISSUED / COMPLETE last incremented. - # _last_issued_ns drives the TAIL indicator; _last_complete_ns - # is used to freeze the rate denominator once completions stop - # arriving (prevents throughput from trending toward zero in tail). - # _last_lifecycle_ns is the max of the two — updated only on real - # request events (not LOOP_LAG/TRACE_DROPS) so the reader can - # detect idle end-of-run even when LOOP_LAG frames keep the FIFO active. - self._last_issued_ns = 0 - self._last_complete_ns = 0 - self._last_lifecycle_ns = 0 - # Fold queue: every COMPLETE event pushes (ts_seen, sid) here - # at ingest time. finalize_completed pops from the front with a - # time-based defer. This makes folding O(folds-per-render) - # instead of O(lifecycles) and removes the previous - # MAX_INFLIGHT-eviction footgun (where in-flight ISSUED entries - # were popped before their COMPLETE could land, starving the - # stage histograms). - self._fold_queue: deque[tuple[int, int]] = deque() - # Latest loadgen snapshot (parsed final_snapshot.json dict). - # Populated by attach_loadgen_snapshot when available; the - # comparison panel renders only if this is set. - self._loadgen_snapshot: dict | None = None - self._loadgen_snapshot_ts: int = 0 # monotonic_ns when data last changed - self._loadgen_snapshot_sig: int = 0 # hash of the last seen total_duration_ns - # Frozen Stats snapshot captured the first time is_done becomes - # True. Stage rows render from this once set so late-arriving - # straggler frames don't cause numbers to keep moving after the - # run is logically complete. - self._frozen_stats: dict[str, Stats] | None = None - - # ---- loadgen comparison hook --------------------------------------- - - def attach_loadgen_snapshot(self, snapshot: dict, *, force: bool = False) -> None: - """Store the latest parsed snapshot dict for the comparison panel. - - ``force=True`` bypasses the staleness gate and always refreshes - the timestamp — use for the end-of-run snapshot written by - ``teardown()``, which may share the same ``tracked_samples_completed`` - value as the last live snapshot if the subscriber was stale. - """ - self._loadgen_snapshot = snapshot - if force: - self._loadgen_snapshot_ts = time.monotonic_ns() - return - metrics = snapshot.get("metrics") or () - sig = next( - ( - int(m.get("value") or 0) - for m in metrics - if m.get("name") == "tracked_samples_completed" - and m.get("type") == "counter" - ), - 0, - ) - if sig != self._loadgen_snapshot_sig: - self._loadgen_snapshot_sig = sig - self._loadgen_snapshot_ts = time.monotonic_ns() - - # ---- observers (read-only; for tests & rendering) ------------------ - - @property - def n_issued(self) -> int: - return self._n_issued - - @property - def n_complete_seen(self) -> int: - return self._n_complete_seen - - @property - def elapsed_s(self) -> float: - """Wall-clock seconds since PERF_START (or dashboard start).""" - return max((time.monotonic_ns() - self._start_ns) / 1e9, 1e-9) - - @property - def _active_elapsed_s(self) -> float: - """Elapsed time capped at the last COMPLETE arrival. - - In the tail phase (no new completions) the wall clock grows - but completions don't. Capping the denominator here keeps - throughput rates from trending toward zero after the run drains. - """ - end_ns = self._last_complete_ns or time.monotonic_ns() - return max((end_ns - self._start_ns) / 1e9, 1e-9) - - @property - def issuance_rate(self) -> float: - """ISSUED events per second (main proc fire rate).""" - return self._n_issued / self.elapsed_s - - @property - def completion_rate(self) -> float: - """COMPLETE events per second (effective server throughput).""" - return self._n_complete_seen / self._active_elapsed_s - - @property - def n_complete_folded(self) -> int: - """Lifecycles that have been folded into the stage histograms.""" - return self._n_complete - - @property - def _issuance_quiet(self) -> bool: - """Main has stopped scheduling (no ISSUED for _TAIL_QUIET_NS).""" - if self._last_issued_ns == 0: - return False - return (time.monotonic_ns() - self._last_issued_ns) >= _TAIL_QUIET_NS - - @property - def is_tail(self) -> bool: - """Issuance has gone quiet but completions are still arriving — - the run is draining. Activity-based (independent of in-flight, - which is lossy under FIFO drops).""" - return self._issuance_quiet and not self.is_done - - @property - def lifecycle_idle_s(self) -> float: - """Seconds since the last ISSUED or COMPLETE frame. - - Zero until the first lifecycle event. Used by the reader to - trigger an idle exit even when LOOP_LAG frames keep arriving. - """ - if self._last_lifecycle_ns == 0: - return 0.0 - return (time.monotonic_ns() - self._last_lifecycle_ns) / 1e9 - - @property - def is_done(self) -> bool: - """Issuance quiet AND no ISSUED/COMPLETE for _DONE_QUIET_NS — the - run has stopped producing events, so freeze. Activity-based, so it - fires reliably even when dropped COMPLETE frames leave in-flight - permanently > 0 (the old in-flight==0 test never fired under drops, - which let straggler frames keep changing the display indefinitely). - """ - if not self._issuance_quiet: - return False - return (time.monotonic_ns() - self._last_lifecycle_ns) >= _DONE_QUIET_NS - - @property - def is_backpressured(self) -> bool: - """True when the first lifecycle stage (ISSUED → CONN_ACQUIRED) - takes ≥ _BACKPRESSURE_PCT of E2E — requests are backing up before - the socket write. Triggered off the stage's folded end-points, so - it survives intermediate-frame drops. Orthogonal to :attr:`is_tail`. - """ - e2e_avg = _stats(self._metrics["e2e"]).avg - if not e2e_avg: - return False - bp_avg = _stats(self._metrics["backpressure"]).avg - return bp_avg / e2e_avg >= _BACKPRESSURE_PCT - - @property - def in_flight(self) -> int: - """On-the-wire requests = WRITTEN (payload sent) − COMPLETE. - - Counts requests actually sent to the server and awaiting their - response — excludes the IPC backlog (issued but not yet written). - WRITTEN is worker-proc and COMPLETE main-proc, both lossy over the - FIFO, so the raw difference can momentarily exceed issued or go - negative under heavy frame drop; clamp to [0, issued − complete]. - """ - written = min(self._n_written, self._n_issued) - return max(0, written - self._n_complete_seen) - - @property - def dropped_frames(self) -> int: - return sum(self._dropped_bytes_by_proc.values()) // FRAME_SIZE - - def stage_n(self, key: str) -> int: - """N for a stage metric (e.g. ``ipc_dispatch``, ``server_headers``). - - Returns 0 if the key is unknown. - """ - m = self._metrics.get(key) - return 0 if m is None else m.total - - def loop_lag_n(self, proc_id: int) -> int: - m = self._loop_lag.get(proc_id) - return 0 if m is None else m.total - - def lifecycle_count(self) -> int: - """Number of sids still being tracked (pre-fold).""" - return len(self._lifecycles) - - # ---- ingest --------------------------------------------------------- - - def ingest_frames(self, buf: bytes) -> None: - n_whole = len(buf) // FRAME_SIZE - if n_whole == 0: - return - # Decode all frames in C via iter_unpack rather than a per-frame - # unpack_from loop — at ~240k frames/s across 24 worker pipes the - # Python-level loop is the reader's bottleneck and the backpressure - # that overflows the producers' pipes. iter_unpack requires an - # exact frame-multiple, so slice off any trailing partial first - # (the FIFO reader only hands us whole frames, but ingest is also - # called directly with partials in tests). - whole = buf if len(buf) == n_whole * FRAME_SIZE else buf[: n_whole * FRAME_SIZE] - frames = PACKER.iter_unpack(whole) - now_ns = time.monotonic_ns() - # Reader thread enters here; serialise against the render thread - # which may pop from the same queues / dicts inside render(). - with self._lock: - for eb, sid, ts in frames: - if eb == Event.LOOP_LAG: - self._record_loop_lag(sid) - continue - if eb == Event.TRACE_DROPS: - self._record_drop(sid) - continue - if eb == Event.PERF_START: - # Warmup done — drop everything seen so far so the - # LOADGEN vs TRACE comparison aligns with loadgen's - # tracked window. - self._reset_metrics(now_ns) - continue - lc = self._lifecycles.get(sid) - if lc is None: - lc = _Lifecycle(birth_ns=now_ns) - self._lifecycles[sid] = lc - if eb == Event.ISSUED: - self._n_issued += 1 - self._last_issued_ns = now_ns - self._last_lifecycle_ns = now_ns - elif eb == Event.WRITTEN: - self._n_written += 1 - elif eb == Event.COMPLETE: - # Gate on ISSUED present: a warmup request whose ISSUED - # was cleared at PERF_START but whose COMPLETE lands - # afterward must not bleed into the perf window. Safe - # because COMPLETE and ISSUED are both main-proc events - # (same emitter, FIFO order) — a genuine perf COMPLETE - # always has its ISSUED already seen. - if Event.ISSUED in lc.stages: - self._n_complete_seen += 1 - self._last_complete_ns = now_ns - self._last_lifecycle_ns = now_ns - # Enqueue for deferred fold; render thread will pop. - self._fold_queue.append((now_ns, sid)) - lc.stages[eb] = ts - - def _record_loop_lag(self, sid: int) -> None: - worker_id = (sid >> 56) & 0xFF - lag_ns = sid & ((1 << 56) - 1) - m = self._loop_lag.get(worker_id) - if m is None: - m = _Metric() - self._loop_lag[worker_id] = m - m.add(float(lag_ns)) - - def _record_drop(self, sid: int) -> None: - # Payload is the producer's CUMULATIVE drop total, re-sent every - # tick. Store the latest (max guards frame reorder) rather than - # summing, so a lost TRACE_DROPS frame self-heals on the next one. - proc_id = (sid >> 56) & 0xFF - dropped = sid & ((1 << 56) - 1) - prev = self._dropped_bytes_by_proc.get(proc_id, 0) - if dropped > prev: - self._dropped_bytes_by_proc[proc_id] = dropped - - def _reset_metrics(self, now_ns: int) -> None: - """Drop warmup-phase state on PERF_START. Per-worker loop_lag - and per-proc dropped-bytes counters are kept — they apply to - the worker process, not the request stream.""" - self._lifecycles.clear() - self._fold_queue.clear() - for m in self._metrics.values(): - m.total = 0 - m.sum_ns = 0.0 - m.min_ns = float("inf") - m.max_ns = float("-inf") - m.hist.reset() - self._n_issued = 0 - self._n_written = 0 - self._n_complete_seen = 0 - self._n_complete = 0 - self._last_issued_ns = 0 - self._last_complete_ns = 0 - self._last_lifecycle_ns = 0 - self._frozen_stats = None - self._start_ns = now_ns # uptime resets too — rate denominators - - # ---- finalize ------------------------------------------------------- - - def flush_pending_folds(self) -> None: - """Force-drain the fold queue ignoring the per-tick defer window. - - Called at FIFO EOF: any COMPLETE frames that arrived within the - last ``_fold_defer_ns`` would otherwise sit in the queue past - the final render and be lost. Acquires the same lock as - ingest/render to stay consistent with the rest of the API. - """ - with self._lock: - self._finalize_completed_impl(fold_defer_ns=0) - - def finalize_completed(self) -> None: - """Drain the fold queue (folds-since-last-tick) and the TTL queue - (partial lifecycles too old to keep). Both pops are O(work - done), not O(dict size). - """ - self._finalize_completed_impl(fold_defer_ns=self._fold_defer_ns) - - def _finalize_completed_impl(self, *, fold_defer_ns: int) -> None: - now_ns = time.monotonic_ns() - fold_deadline = now_ns - fold_defer_ns - while self._fold_queue and self._fold_queue[0][0] <= fold_deadline: - _ts, sid = self._fold_queue.popleft() - lc = self._lifecycles.pop(sid, None) - if lc is None: - # Already evicted by TTL or a previous duplicate COMPLETE. - continue - stages = lc.stages - if Event.COMPLETE not in stages or Event.ISSUED not in stages: - # Either ISSUED was never seen for this sid (e.g. its - # producer started after we missed its first flush) or - # COMPLETE was the only event for this sid. Skip — we - # can't time anything. - continue - self._fold(stages) - # TTL eviction: drop partial lifecycles (COMPLETE never landed) - # older than the TTL. Python preserves dict insertion order so - # we can scan from the oldest end and stop at the first entry - # still inside the TTL window — no separate birth queue needed, - # which is what kept this O(QPS × TTL) at high throughput. - evict_deadline = now_ns - self._lifecycle_ttl_ns - stale: list[int] = [] - for sid, lc in self._lifecycles.items(): - if lc.birth_ns > evict_deadline: - break - stale.append(sid) - for sid in stale: - del self._lifecycles[sid] - - def _fold(self, stages: dict[int, int]) -> None: - issued = stages[Event.ISSUED] - complete = stages[Event.COMPLETE] - self._metrics["e2e"].add(complete - issued) - recv_first = stages.get(Event.RECV_FIRST) - if recv_first is not None: - self._metrics["ttft"].add(recv_first - issued) - for key, start_ev, end_ev in _STAGE_FOLDS: - t0 = stages.get(start_ev) - t1 = stages.get(end_ev) - if t0 is not None and t1 is not None: - self._metrics[key].add(t1 - t0) - # Aggregate buckets for the verdict. client_pre measures - # WORKER_RECEIVED → WRITTEN (true loadgen send-side work), NOT - # ISSUED → WRITTEN — the latter folds in IPC queue wait, which is - # back-pressure from server saturation and misleads the verdict. - # body_done = last body byte: RESPONSE_DONE (streaming, last chunk) - # or RESPONSE_BYTES (offline, full body) — so server_http captures - # token-gen and client_post is the real client tail, both modes. - worker_recv = stages.get(Event.WORKER_RECEIVED) - conn_acq = stages.get(Event.CONN_ACQUIRED) - written = stages.get(Event.WRITTEN) - body_done = stages.get(Event.RESPONSE_DONE) - if body_done is None: - body_done = stages.get(Event.RESPONSE_BYTES) - if worker_recv is not None: - self._metrics["ipc_wait"].add(worker_recv - issued) - if conn_acq is not None: - self._metrics["pool_wait"].add(conn_acq - worker_recv) - if written is not None: - self._metrics["client_pre"].add(written - worker_recv) - if body_done is not None: - self._metrics["server_http"].add(body_done - written) - if body_done is not None: - self._metrics["client_post"].add(complete - body_done) - self._n_complete += 1 - - # ---- render --------------------------------------------------------- - - def render(self) -> Text: - # Held for the entire render so the reader thread can't mutate - # the dicts / queues / histograms we are walking. finalize_completed - # also mutates state (folds + evictions), so it must run inside - # the same critical section. - with self._lock: - self.finalize_completed() - # Freeze stage stats the first time is_done fires so that - # late-arriving straggler frames don't cause the lifecycle - # table to keep moving after the run is logically complete. - if self.is_done and self._frozen_stats is None: - self._frozen_stats = {k: _stats(v) for k, v in self._metrics.items()} - stats = self._frozen_stats or None - out = Text(no_wrap=True) - self._render_header(out) - out.append("\n") - self._render_lifecycle(out, frozen_stats=stats) - if self._loadgen_snapshot is not None: - out.append("\n") - self._render_loadgen_comparison(out) - out.append("\n") - self._render_loop_lag(out) - return out - - def _render_header(self, out: Text) -> None: - elapsed_s = self.elapsed_s - qps = self.completion_rate - dropped = self.dropped_frames - out.append("═" * WIDTH + "\n", style="section") - self._row( - out, - ( - ("uptime", f"{elapsed_s:>10.1f}s", ""), - ("complete", f"{self._n_complete_seen:>10,}", "completed"), - ("req/s", f"{qps:>10,.1f}", "completed"), - ), - ) - self._row( - out, - ( - ("issued", f"{self._n_issued:>10,}", "issued"), - ("in-flight", f"{self.in_flight:>10,}", ""), - ("", "", ""), - ), - ) - if dropped: - drop_style = "critical" if dropped > 100 else "warn" - self._row( - out, - ( - ("dropped frames", f"{dropped:>10,}", drop_style), - ("", "", ""), - ("", "", ""), - ), - ) - bp = self.is_backpressured - tail = self.is_tail - done = self.is_done - if done: - self._row( - out, - ( - ("status", f"{'DONE (report gen)':>24}", "warn"), - ("", "", ""), - ("", "", ""), - ), - ) - elif tail or bp: - if tail and bp: - chip, style = "TAIL + BACKPRESSURE", "critical" - elif tail: - chip, style = "TAIL", "warn" - else: - chip, style = "BACKPRESSURE", "critical" - status_fields: tuple[tuple[str, str, str], ...] = ( - ("status", f"{chip:>24}", style), - ("draining", f"{self.in_flight:>10,}", style) if tail else ("", "", ""), - ("", "", ""), - ) - self._row(out, status_fields) - out.append("═" * WIDTH + "\n", style="section") - - @staticmethod - def _row( - out: Text, - fields: tuple[tuple[str, str, str], ...], - *, - col_w: int = 40, - col_gap: int = 3, - ) -> None: - """Render a row of (label, value, value_style) fields in equal-width - columns. Labels are dim and left-aligned; values are right-aligned - within their column. Empty (label, value) pairs render as blank - space so column anchors stay consistent across rows. - """ - out.append(" ") - for i, (label, value, style) in enumerate(fields): - if i > 0: - out.append(" " * col_gap) - pad = max(1, col_w - len(label) - 1 - len(value)) - if label: - out.append(f"{label} ", style="rule") - out.append(" " * pad) - out.append(value, style=style) - else: - out.append(" " * col_w) - out.append("\n") - - def _render_lifecycle( - self, out: Text, frozen_stats: dict[str, Stats] | None = None - ) -> None: - def _get(key: str) -> Stats: - if frozen_stats is not None: - return frozen_stats.get(key, Stats(0, 0.0, 0.0, 0.0, 0.0, 0.0)) - return _stats(self._metrics[key]) - - e2e_avg = _get("e2e").avg or 1.0 - # Streaming if the 1st-chunk→last-chunk delta folded (RESPONSE_DONE - # present); otherwise offline (single body read). - streaming = _get("stream_gen").n > 0 - layout = _LAYOUT_STREAMING if streaming else _LAYOUT_OFFLINE - section = " REQUEST LIFECYCLE (ms)" - if frozen_stats is not None: - section += " [frozen]" - out.append(section + "\n", style="section") - out.append("─" * WIDTH + "\n", style="rule") - out.append( - f" {'stage':<{LABEL_W}}" - f"{'N':>12}{'avg':>11}{'min':>11}{'p50':>11}{'p99':>11}" - f"{'max':>11}{'%E2E':>9}\n", - style="label", - ) - for side, label, key in layout: - self._render_row_stats(out, side, label, _get(key), e2e_avg) - e2e_stats = _get("e2e") - self._render_summary_stats( - out, - "E2E TOTAL issue -> complete", - e2e_stats, - e2e_avg, - bold=True, - ) - out.append("\n") - self._render_verdict(out, e2e_avg) - - def _render_row( - self, out: Text, side: str, label: str, m: _Metric, e2e_avg: float - ) -> None: - self._render_row_stats(out, side, label, _stats(m), e2e_avg) - - def _render_row_stats( - self, out: Text, side: str, label: str, s: Stats, e2e_avg: float - ) -> None: - # Clamp at 100: a stage is a sub-interval of E2E so it cannot - # exceed it per request. The raw ratio can top 100% because the - # stage avg and the E2E avg are over different folded populations - # (different N when intermediate frames drop) — the slow-request - # subset that retained its frames biases the stage avg upward. - pct = min(100.0, 100.0 * s.avg / e2e_avg) if e2e_avg and s.avg else 0.0 - side_style = "client_row" if side == _SIDE_CLIENT else "server_row" - prefix = f"[{side}] {label}" - Dashboard._append_label(out, prefix[:LABEL_W], LABEL_W, side_style, " ") - out.append(_fmt_row(s, pct), style=side_style) - - @staticmethod - def _append_label( - out: Text, text: str, width: int, base_style: str, leading: str = "" - ) -> None: - """Append a stage label with ``ipc_<...>`` tokens highlighted. - Pads the whole label to ``width`` so columns stay aligned.""" - out.append(leading) - consumed = 0 - for chunk, is_ipc in _split_ipc_tokens(text): - out.append(chunk, style="ipc_seg" if is_ipc else base_style) - consumed += len(chunk) - pad = width - consumed - if pad > 0: - out.append(" " * pad, style=base_style) - - def _render_summary( - self, - out: Text, - label: str, - m: _Metric, - e2e_avg: float, - *, - bold: bool = False, - ) -> None: - self._render_summary_stats(out, label, _stats(m), e2e_avg, bold=bold) - - def _render_summary_stats( - self, - out: Text, - label: str, - s: Stats, - e2e_avg: float, - *, - bold: bool = False, - ) -> None: - pct = 100.0 * s.avg / e2e_avg if e2e_avg and s.avg else 0.0 - style = "summary" if bold else "" - out.append(f" {label[:LABEL_W]:<{LABEL_W}}", style=style) - out.append(_fmt_row(s, pct), style=style) - - def _render_verdict(self, out: Text, e2e_avg: float) -> None: - # Two plain rows; no section headers, no dividers, no footer. - # Same 3-column grid as the header so everything anchors. - client = ( - _stats(self._metrics["client_pre"]).avg - + _stats(self._metrics["client_post"]).avg - ) - server = _stats(self._metrics["server_http"]).avg - queue = _stats(self._metrics["ipc_wait"]).avg - c_pct = 100.0 * client / e2e_avg if e2e_avg else 0.0 - s_pct = 100.0 * server / e2e_avg if e2e_avg else 0.0 - q_pct = 100.0 * queue / e2e_avg if e2e_avg else 0.0 - c_style = "warn" if c_pct > 50 else "" - q_style = "warn" if q_pct > 50 else "" - - self._row( - out, - ( - ("client work", f"{c_pct:>9.1f}%", c_style), - ("server work", f"{s_pct:>9.1f}%", ""), - ("backpressure", f"{q_pct:>9.1f}%", q_style), - ), - ) - if self.elapsed_s < 2.0: - return - iss = self.issuance_rate - cmp = self.completion_rate - backlog = iss - cmp - bl_style = "warn" if backlog > max(cmp, 1) else "" - self._row( - out, - ( - ("issued", f"{iss:>10,.0f}/s", "issued"), - ("completed", f"{cmp:>10,.0f}/s", "completed"), - ("backlog", f"{backlog:>+10,.0f}/s", bl_style), - ), - ) - - def _render_loadgen_comparison(self, out: Text) -> None: - """Side-by-side trace-vs-loadgen panel. Two sub-tables: - counts/rates (with errors), and latency percentiles - (min/p50/p99/max + Δmax). Skipped when no snapshot is attached. - """ - snap = self._loadgen_snapshot - if snap is None: - return - metrics = snap.get("metrics") or () - counters = { - m.get("name"): m.get("value") for m in metrics if m.get("type") == "counter" - } - series = {m.get("name"): m for m in metrics if m.get("type") == "series"} - lg_completed = int(counters.get("total_samples_completed") or 0) or None - lg_failed = int(counters.get("tracked_samples_failed") or 0) or None - lg_tracked = int(counters.get("tracked_samples_completed") or 0) - # tracked_duration_ns may be 0 mid-run; fall back to total. - lg_dur_ns = int(counters.get("tracked_duration_ns") or 0) or int( - counters.get("total_duration_ns") or 0 - ) - lg_dur_s = lg_dur_ns / 1e9 if lg_dur_ns > 0 else 0.0 - lg_qps = (lg_tracked / lg_dur_s) if lg_dur_s and lg_tracked else None - lg_osl_total = float((series.get("osl") or {}).get("total") or 0.0) - lg_tps = (lg_osl_total / lg_dur_s) if lg_dur_s and lg_osl_total else None - lg_e2e = series.get("sample_latency_ns") or {} - lg_ttft = series.get("ttft_ns") or {} - lg_tpot = series.get("tpot_ns") or {} - - trace_e2e = _stats(self._metrics["e2e"]) - trace_ttft = _stats(self._metrics["ttft"]) - trace_qps = self.completion_rate or None - trace_completed = self._n_complete_seen or None - - snap_age_s = (time.monotonic_ns() - self._loadgen_snapshot_ts) / 1e9 - age_tag = f" (snapshot {snap_age_s:.0f}s old)" if snap_age_s > 2.0 else "" - out.append(f" LOADGEN vs TRACE{age_tag}\n", style="section") - out.append("─" * WIDTH + "\n", style="rule") - - # --- counts / rates table --- - out.append( - f" {'counts / rates':<{self._CMP_LABEL_W}}" - f"{'loadgen':>{self._CMP_VAL_W}}" - f"{'trace':>{self._CMP_VAL_W}}" - f"{'Δ':>{self._CMP_DELTA_W}}\n", - style="label", - ) - self._cmp_row( - out, "samples completed", lg_completed, trace_completed, fmt=",.0f" - ) - self._cmp_row(out, "errors", lg_failed, None, fmt=",.0f") - self._cmp_row(out, "throughput (req/s)", lg_qps, trace_qps, fmt=",.1f") - self._cmp_row(out, "throughput (tok/s)", lg_tps, None, fmt=",.1f") - - # --- latency percentiles table --- - out.append("\n") - out.append( - f" {'latency':<{self._LAT_LABEL_W}}" - f"{'src':<{self._LAT_SRC_W}}" - f"{'min':>{self._LAT_VAL_W}}" - f"{'p50':>{self._LAT_VAL_W}}" - f"{'p99':>{self._LAT_VAL_W}}" - f"{'max':>{self._LAT_VAL_W}}" - f"{'Δmax':>{self._LAT_DELTA_W}}\n", - style="label", - ) - # Label has no unit — _cmp_dist appends one auto-picked from - # the observed max so sub-ms values (e.g. tpot on small local - # models) don't all flatten to "0.00 ms". - self._cmp_dist(out, "e2e", lg_e2e, trace_e2e) - self._cmp_dist(out, "ttft", lg_ttft, trace_ttft) - self._cmp_dist(out, "tpot", lg_tpot, None) - - _CMP_LABEL_W = 40 - _CMP_VAL_W = 22 - _CMP_DELTA_W = 14 - _CMP_WARN_PCT = 5.0 - _LAT_LABEL_W = 18 - _LAT_SRC_W = 10 - _LAT_VAL_W = 14 - _LAT_DELTA_W = 12 - - @staticmethod - def _cmp_row( - out: Text, - label: str, - loadgen: float | None, - trace: float | None, - *, - fmt: str = ",.1f", - ) -> None: - """Single-value row: skipped when both sides are empty; em-dash - on the missing side; Δ em-dashed unless both sides populated.""" - if loadgen is None and trace is None: - return - val_w = Dashboard._CMP_VAL_W - out.append(f" {label:<{Dashboard._CMP_LABEL_W}}", style="label") - if loadgen is None: - out.append(f"{'—':>{val_w}}", style="muted") - else: - out.append(f"{loadgen:>{val_w}{fmt}}", style="") - if trace is None: - out.append(f"{'—':>{val_w}}", style="muted") - else: - out.append(f"{trace:>{val_w}{fmt}}", style="") - delta_s, delta_style = Dashboard._delta(loadgen, trace) - out.append(f"{delta_s:>{Dashboard._CMP_DELTA_W}}\n", style=delta_style) - - @staticmethod - def _cmp_dist( - out: Text, - label: str, - lg_series: dict, - trace_stats: Stats | None, - ) -> None: - """Two-line distribution block: loadgen + trace rows for one - metric. Auto-picks ns/µs/ms/s based on the observed max so - sub-ms values keep precision. Skipped if neither side has data.""" - lg_pcts = (lg_series or {}).get("percentiles") or {} - lg_count = (lg_series or {}).get("count") or 0 - has_lg = lg_count > 0 - has_tr = trace_stats is not None and trace_stats.n > 0 - if not has_lg and not has_tr: - return - - # Pick the largest max across both sources to size the unit. - lg_max_ns = float((lg_series or {}).get("max") or 0.0) if has_lg else 0.0 - tr_max_ns = ( - float(trace_stats.max) if has_tr and trace_stats is not None else 0.0 - ) - divisor, unit = Dashboard._pick_unit(max(lg_max_ns, tr_max_ns)) - - def cells( - min_: float | None, p50: float | None, p99: float | None, max_: float | None - ) -> tuple[str, ...]: - return tuple( - f"{v:>{Dashboard._LAT_VAL_W},.2f}" - if v is not None - else f"{'—':>{Dashboard._LAT_VAL_W}}" - for v in (min_, p50, p99, max_) - ) - - if has_lg: - lg_min = float(lg_series.get("min") or 0.0) / divisor - lg_max = lg_max_ns / divisor - lg_p50 = float(lg_pcts.get("50.0") or 0.0) / divisor - lg_p99 = float(lg_pcts.get("99.0") or 0.0) / divisor - else: - lg_min = lg_p50 = lg_p99 = lg_max = None # type: ignore[assignment] - out.append( - f" {label + ' (' + unit + ')':<{Dashboard._LAT_LABEL_W}}", style="label" - ) - out.append(f"{'loadgen':<{Dashboard._LAT_SRC_W}}", style="label") - for c in cells(lg_min, lg_p50, lg_p99, lg_max): - out.append(c, style="" if has_lg else "muted") - out.append("\n") - - if has_tr and trace_stats is not None: - tr_min = trace_stats.min / divisor - tr_max = tr_max_ns / divisor - tr_p50 = trace_stats.p50 / divisor - tr_p99 = trace_stats.p99 / divisor - else: - tr_min = tr_p50 = tr_p99 = tr_max = None # type: ignore[assignment] - out.append(f" {'':<{Dashboard._LAT_LABEL_W}}") - out.append(f"{'trace':<{Dashboard._LAT_SRC_W}}", style="label") - for c in cells(tr_min, tr_p50, tr_p99, tr_max): - out.append(c, style="" if has_tr else "muted") - delta_s, delta_style = ( - Dashboard._delta(lg_max, tr_max) if has_lg and has_tr else ("—", "muted") - ) - out.append(f"{delta_s:>{Dashboard._LAT_DELTA_W}}\n", style=delta_style) - - @staticmethod - def _pick_unit(max_ns: float) -> tuple[float, str]: - """Pick the most readable (divisor, suffix) for a metric whose - observed max is ``max_ns`` nanoseconds. ms is the fallback so a - block with no data still labels consistently.""" - if max_ns >= 1e9: - return 1e9, "s" - if max_ns >= 1e6: - return 1e6, "ms" - if max_ns >= 1e3: - return 1e3, "µs" - return 1e6, "ms" - - @staticmethod - def _delta(loadgen: float | None, trace: float | None) -> tuple[str, str]: - """Format the Δ cell: percentage change of trace vs loadgen. - Em-dash when either side is missing or loadgen is zero.""" - if loadgen is None or trace is None or loadgen == 0: - return ("—", "muted") - delta_pct = 100.0 * (trace - loadgen) / loadgen - style = "warn" if abs(delta_pct) > Dashboard._CMP_WARN_PCT else "" - return (f"{delta_pct:+.1f}%", style) - - # Maximum worker rows shown (excl. main which is always first). - _LAG_TOP_N = 16 - - def _render_loop_lag(self, out: Text) -> None: - out.append(" EVENT LOOP LAG (ms)\n", style="section") - out.append("─" * WIDTH + "\n", style="rule") - if not self._loop_lag: - out.append(" (no LOOP_LAG events yet)\n", style="muted italic") - return - - # Separate main from workers; sort workers by max lag descending, - # keep top _LAG_TOP_N worst offenders. - main_entry = self._loop_lag.get(MAIN_PROC_LOOP_ID) - all_worker_stats = [ - (wid, _stats(m)) - for wid, m in self._loop_lag.items() - if wid != MAIN_PROC_LOOP_ID - ] - all_worker_stats.sort(key=lambda t: t[1].max, reverse=True) - workers = all_worker_stats[: self._LAG_TOP_N] - - # Fleet summary: median p99 across all workers + hot-worker count. - # "Hot" = p99 > 5 ms (GIL or syscall stall territory). - _HOT_THRESH_NS = 5_000_000 # 5 ms - all_p99s = [s.p99 for _, s in all_worker_stats] - fleet_p99_ms = sorted(all_p99s)[len(all_p99s) // 2] / 1e6 if all_p99s else 0.0 - n_hot = sum(1 for p in all_p99s if p >= _HOT_THRESH_NS) - n_workers = len(all_worker_stats) - hot_style = "critical" if n_hot > n_workers // 2 else ("warn" if n_hot else "") - out.append( - f" fleet p99 {fleet_p99_ms:.2f} ms " f"hot workers (p99 ≥ 5 ms) ", - style="label", - ) - out.append(f"{n_hot}/{n_workers}\n", style=hot_style or "label") - - def _emit(label: str, s: Stats, *, highlight: bool = False) -> None: - mx_ms = s.max / 1e6 - p99_ms = s.p99 / 1e6 - mx_style = "critical" if mx_ms > 50 else ("warn" if mx_ms > 10 else "") - p99_style = "critical" if p99_ms > 10 else ("warn" if p99_ms > 1 else "") - row_style = "summary" if highlight else "" - out.append(f" {label:<10}", style=row_style) - out.append(f"{s.n:>10,}", style=row_style) - out.append(f"{s.min / 1e6:>10.2f}", style=row_style) - out.append(f"{s.p50 / 1e6:>10.2f}", style=row_style) - out.append(f"{p99_ms:>10.2f}", style=p99_style or row_style) - out.append(f"{mx_ms:>10.2f}\n", style=mx_style or row_style) - - out.append( - f" {'worker':<10}{'#samples':>10}{'min':>10}{'p50':>10}" - f"{'p99':>10}{'max':>10}\n", - style="label", - ) - - # main always first, always highlighted - if main_entry is not None: - _emit("main", _stats(main_entry), highlight=True) - - for wid, s in workers: - _emit(f"w{wid}", s) - - omitted = max(0, len(self._loop_lag) - 1 - self._LAG_TOP_N) - if omitted: - out.append( - f" … {omitted} worker(s) with lower max lag not shown\n", - style="muted", - ) diff --git a/tests/unit/commands/test_benchmark.py b/tests/unit/commands/test_benchmark.py index f6126ecfb..4a5e6ea23 100644 --- a/tests/unit/commands/test_benchmark.py +++ b/tests/unit/commands/test_benchmark.py @@ -620,7 +620,6 @@ async def _capture_launch(service_configs, *, timeout): patch( "inference_endpoint.commands.benchmark.execute.ServiceLauncher" ) as MockLauncher, - patch("inference_endpoint.commands.benchmark.execute.trace"), patch("inference_endpoint.commands.benchmark.execute.tqdm"), ): MockZMQ.scoped.return_value.__enter__ = MagicMock(return_value=mock_zmq) @@ -675,7 +674,6 @@ async def _capture_launch(service_configs, *, timeout): patch( "inference_endpoint.commands.benchmark.execute.ServiceLauncher" ) as MockLauncher, - patch("inference_endpoint.commands.benchmark.execute.trace"), patch("inference_endpoint.commands.benchmark.execute.tqdm"), ): MockZMQ.scoped.return_value.__enter__ = MagicMock(return_value=mock_zmq) diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py deleted file mode 100644 index 52a7a9daf..000000000 --- a/tests/unit/utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/unit/utils/test_trace.py b/tests/unit/utils/test_trace.py deleted file mode 100644 index 606968baa..000000000 --- a/tests/unit/utils/test_trace.py +++ /dev/null @@ -1,280 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""Coverage for ``inference_endpoint.utils.trace`` runtime helpers -(start_lag_task / start_snapshot_tap / teardown / path conventions / -emitter pipe-death). The dashboard aggregation lives in -``test_trace_dashboard.py``. -""" -# ruff: noqa: I001 - -from __future__ import annotations - -import asyncio -import json -import os -import shutil -import threading -import time - -import pytest - -from inference_endpoint.utils import trace - - -# --------------------------------------------------------------------------- -# Path conventions -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestPathConventions: - def test_fifo_path_is_per_pid_subdir(self) -> None: - assert trace.fifo_path(12345) == "/tmp/endpoints_trace_12345/fifo" - - def test_snapshot_path_in_same_subdir(self) -> None: - assert ( - trace.snapshot_sidecar_path(12345) - == "/tmp/endpoints_trace_12345/snapshot.json" - ) - - def test_paths_share_per_pid_dir(self) -> None: - # FIFO and snapshot must always be in the same per-pid dir so - # one mkdir / one cleanup covers both. - assert os.path.dirname(trace.fifo_path(7)) == os.path.dirname( - trace.snapshot_sidecar_path(7) - ) - - -# --------------------------------------------------------------------------- -# emit_trace_id no-op guard -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestNoOpGuard: - def test_non_hex_id_safe_when_disabled(self) -> None: - # Existing tests pass query ids like "q-1" / "q-stream"; the - # emit_trace_id no-op guard must short-circuit before the hex - # parse can raise ValueError. - assert trace.is_enabled() is False - trace.emit_trace_id(trace.Event.WRITTEN, "q-stream") - trace.emit_trace_id(trace.Event.WRITTEN, "not-a-hex-string") - - def test_dashed_uuid_safe_when_disabled(self) -> None: - trace.emit_trace_id(trace.Event.WRITTEN, "12345678-1234-1234-1234-123456789abc") - - -# --------------------------------------------------------------------------- -# enable_tracing / teardown -# --------------------------------------------------------------------------- - - -def _make_fifo_with_drain_thread() -> tuple[str, threading.Thread]: - """Set up the convention layout (per-pid 0o700 dir + FIFO inside) - that bootstrap() would normally create, plus a background reader - so enable_tracing's blocking O_WRONLY open returns immediately.""" - path = trace.fifo_path(os.getpid()) - trace_dir = os.path.dirname(path) - # Wipe any stale dir from a prior test in the same pid. - if os.path.isdir(trace_dir): - shutil.rmtree(trace_dir, ignore_errors=True) - os.mkdir(trace_dir, 0o700) - os.mkfifo(path, 0o600) - trace._state.fifo_path = path # so teardown unlinks like bootstrap does - - def _drain() -> None: - fd = os.open(path, os.O_RDONLY) - try: - while True: - if not os.read(fd, 4096): - return - finally: - os.close(fd) - - t = threading.Thread(target=_drain, daemon=True) - t.start() - time.sleep(0.05) - return path, t - - -@pytest.mark.unit -class TestEnableTracing: - def teardown_method(self) -> None: - # Coroutines run synchronously here; an asyncio.run drives teardown. - asyncio.run(trace.teardown()) - - def test_no_op_on_missing_fifo(self) -> None: - trace.enable_tracing("/tmp/this/does/not/exist") - assert trace.is_enabled() is False - - def test_enable_then_teardown_idempotent(self) -> None: - path, _ = _make_fifo_with_drain_thread() - trace.enable_tracing(path) - assert trace.is_enabled() is True - # Calling enable_tracing again is a no-op (idempotent). - trace.enable_tracing(path) - assert trace.is_enabled() is True - # First teardown disables; second teardown is harmless. - asyncio.run(trace.teardown()) - assert trace.is_enabled() is False - asyncio.run(trace.teardown()) - assert trace.is_enabled() is False - - -# --------------------------------------------------------------------------- -# Snapshot tap task -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestSnapshotTap: - def teardown_method(self) -> None: - asyncio.run(trace.teardown()) - - def test_tap_writes_atomic_json_then_teardown_cancels(self) -> None: - path, _ = _make_fifo_with_drain_thread() - trace.enable_tracing(path) - snap_path = trace.snapshot_sidecar_path(os.getpid()) - - async def _run() -> None: - loop = asyncio.get_running_loop() - provider_calls = {"n": 0} - - def provider() -> dict | None: - provider_calls["n"] += 1 - return {"hello": provider_calls["n"]} - - trace.start_snapshot_tap(loop, provider, period_s=0.05) - await asyncio.sleep(0.15) # ≥ 2 ticks - # File should exist with the latest provider payload. - assert os.path.exists(snap_path) - with open(snap_path) as f: - blob = json.load(f) - assert blob["hello"] >= 2 - assert provider_calls["n"] >= 2 - # teardown cancels the running task. - await trace.teardown() - - asyncio.run(_run()) - assert trace.is_enabled() is False - - def test_provider_returning_none_skips_write(self) -> None: - path, _ = _make_fifo_with_drain_thread() - trace.enable_tracing(path) - snap_path = trace.snapshot_sidecar_path(os.getpid()) - # Pre-remove any leftover sidecar. - try: - os.unlink(snap_path) - except FileNotFoundError: - pass - - async def _run() -> None: - loop = asyncio.get_running_loop() - trace.start_snapshot_tap(loop, lambda: None, period_s=0.05) - await asyncio.sleep(0.12) - assert not os.path.exists(snap_path) - await trace.teardown() - - asyncio.run(_run()) - - def test_start_when_disabled_is_no_op(self) -> None: - async def _run() -> None: - loop = asyncio.get_running_loop() - # Tracing not enabled → no task is spawned, no exception. - trace.start_snapshot_tap(loop, lambda: {"x": 1}) - await asyncio.sleep(0.01) - await trace.teardown() - - asyncio.run(_run()) - - -# --------------------------------------------------------------------------- -# Loop-lag task -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestLoopLagTask: - def teardown_method(self) -> None: - asyncio.run(trace.teardown()) - - def test_start_when_disabled_is_no_op(self) -> None: - async def _run() -> None: - loop = asyncio.get_running_loop() - trace.start_lag_task(loop) - await asyncio.sleep(0.01) - await trace.teardown() - - asyncio.run(_run()) - - def test_start_when_enabled_creates_task(self) -> None: - path, _ = _make_fifo_with_drain_thread() - trace.enable_tracing(path) - - async def _run() -> None: - loop = asyncio.get_running_loop() - trace.start_lag_task(loop) - # One task registered → teardown will cancel it. - assert len(trace._state.tasks) == 1 - await trace.teardown() - assert trace._state.tasks == [] - - asyncio.run(_run()) - - -# --------------------------------------------------------------------------- -# Teardown final-snapshot write -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestTeardownFinalSnapshot: - def teardown_method(self) -> None: - asyncio.run(trace.teardown()) - - def test_writes_final_snapshot_when_passed(self) -> None: - path, _ = _make_fifo_with_drain_thread() - trace.enable_tracing(path) - snap_path = trace.snapshot_sidecar_path(os.getpid()) - try: - os.unlink(snap_path) - except FileNotFoundError: - pass - - payload = {"final": True, "samples": 42} - asyncio.run(trace.teardown(final_snapshot=payload)) - - # File should reflect the passed dict, not whatever a tap - # would have produced. - with open(snap_path) as f: - assert json.load(f) == payload - - def test_no_op_final_snapshot_when_disabled(self) -> None: - # Without enable_tracing, fifo_path state isn't set; teardown - # silently no-ops on the final-write path. - asyncio.run(trace.teardown(final_snapshot={"ignored": True})) - assert trace.is_enabled() is False - - -@pytest.mark.unit -class TestSyncCleanup: - """``cleanup()`` runs from sync contexts (e.g. main.py's launcher - finally block when bootstrap fired but the loop never started).""" - - def teardown_method(self) -> None: - trace.cleanup() # idempotent reset - - def test_idempotent_when_never_enabled(self) -> None: - trace.cleanup() - trace.cleanup() - assert trace.is_enabled() is False - - def test_unlinks_fifo_and_disables_emitter(self) -> None: - path, _ = _make_fifo_with_drain_thread() - trace.enable_tracing(path) - assert trace.is_enabled() is True - assert os.path.exists(path) - trace.cleanup() - assert trace.is_enabled() is False - assert not os.path.exists(path) diff --git a/tests/unit/utils/test_trace_dashboard.py b/tests/unit/utils/test_trace_dashboard.py deleted file mode 100644 index 7d4263137..000000000 --- a/tests/unit/utils/test_trace_dashboard.py +++ /dev/null @@ -1,1187 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""TDD coverage for the trace dashboard's count + lifecycle logic. - -These tests target the symptoms the user has repeatedly hit: - * "in-flight count stuck / wrong" - * "N (stage count) stuck" - * "complete=0 even though events are flowing" - -The dashboard logic lives in ``inference_endpoint.utils.trace_dashboard`` -so we can drive it with synthetic frame buffers and assert invariants -without spinning up a real benchmark. -""" - -# ruff: noqa: I001 — see scripts/trace_dashboard.py for why this file is -# pinned: the pre-commit ruff (v0.3.3) and the local ruff (v0.15.8) -# disagree on `inference_endpoint` first-party detection. -from __future__ import annotations - -import struct -import time -import uuid - -import pytest -from inference_endpoint.utils.trace import ( - FRAME_SIZE, - MAIN_PROC_LOOP_ID, - PACKER, - Event, -) -from inference_endpoint.utils.trace_dashboard import Dashboard - -# All events arrive on a single ascending clock for these tests. -_t = [1000] - - -def _sid_from_uuid(req_id: str) -> int: - return int(req_id[:16], 16) - - -def _frame(event: Event, sid: int, ts: int | None = None) -> bytes: - if ts is None: - _t[0] += 1 - ts = _t[0] - return PACKER.pack(int(event), sid, ts) - - -def _loop_lag_sid(worker_id: int, lag_ns: int) -> int: - return ((worker_id & 0xFF) << 56) | (lag_ns & ((1 << 56) - 1)) - - -def _drop_sid(proc_id: int, dropped_bytes: int) -> int: - return ((proc_id & 0xFF) << 56) | (dropped_bytes & ((1 << 56) - 1)) - - -def _full_lifecycle(sid: int) -> bytes: - """Offline lifecycle: no RESPONSE_DONE (RESPONSE_BYTES is the full body).""" - return b"".join( - _frame(ev, sid) - for ev in ( - Event.ISSUED, - Event.WORKER_RECEIVED, - Event.CONN_ACQUIRED, - Event.WRITTEN, - Event.RESPONSE_HEADERS, - Event.RESPONSE_BYTES, - Event.MAIN_RECEIVED, - Event.COMPLETE, - ) - ) - - -def _inflight(sid: int) -> bytes: - """Issued + written (payload sent), not yet complete — counts as - on-the-wire in-flight.""" - return _frame(Event.ISSUED, sid) + _frame(Event.WRITTEN, sid) - - -def _full_streaming_lifecycle(sid: int) -> bytes: - """Streaming lifecycle: RESPONSE_BYTES = 1st chunk, RESPONSE_DONE = last.""" - return b"".join( - _frame(ev, sid) - for ev in ( - Event.ISSUED, - Event.WORKER_RECEIVED, - Event.CONN_ACQUIRED, - Event.WRITTEN, - Event.RESPONSE_HEADERS, - Event.RESPONSE_BYTES, - Event.RECV_FIRST, - Event.RESPONSE_DONE, - Event.MAIN_RECEIVED, - Event.COMPLETE, - ) - ) - - -def _new_sid() -> int: - return _sid_from_uuid(uuid.uuid4().hex) - - -def _dash() -> Dashboard: - """Test factory: zero fold defer so finalize_completed() folds - immediately without needing to advance the wall clock.""" - return Dashboard(fold_defer_ns=0) - - -# --------------------------------------------------------------------------- -# In-flight counter -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestInFlightCounter: - def test_starts_at_zero(self) -> None: - d = _dash() - assert d.in_flight == 0 - assert d.n_issued == 0 - assert d.n_complete_seen == 0 - - def test_written_increments_in_flight(self) -> None: - d = _dash() - sid = _new_sid() - d.ingest_frames(_inflight(sid)) # issued + written - assert d.in_flight == 1 - assert d.n_issued == 1 - assert d.n_complete_seen == 0 - - def test_issued_only_not_in_flight(self) -> None: - # Issued but not yet written (IPC backlog) is NOT on-the-wire - # in-flight — only written-but-not-complete counts. - d = _dash() - d.ingest_frames(_frame(Event.ISSUED, _new_sid())) - assert d.in_flight == 0 - assert d.n_issued == 1 - - def test_complete_brings_in_flight_to_zero(self) -> None: - d = _dash() - sid = _new_sid() - d.ingest_frames(_inflight(sid)) - d.ingest_frames(_frame(Event.COMPLETE, sid)) - assert d.in_flight == 0 - assert d.n_complete_seen == 1 - - def test_many_written_then_many_complete(self) -> None: - d = _dash() - sids = [_new_sid() for _ in range(500)] - for sid in sids: - d.ingest_frames(_inflight(sid)) - assert d.in_flight == 500 - for sid in sids: - d.ingest_frames(_frame(Event.COMPLETE, sid)) - assert d.in_flight == 0 - assert d.n_issued == 500 - assert d.n_complete_seen == 500 - - def test_in_flight_never_negative_and_excludes_orphan_complete(self) -> None: - # A COMPLETE with no preceding ISSUED (warmup bleed: ISSUED - # cleared at PERF_START) is not counted, so in_flight clamps at - # zero and never goes negative. - d = _dash() - d.ingest_frames(_frame(Event.COMPLETE, _new_sid())) - assert d.in_flight == 0 - assert d.n_complete_seen == 0 # orphan COMPLETE ignored - # A written-but-not-complete request is on-the-wire in-flight. - d.ingest_frames(_inflight(_new_sid())) - assert d.in_flight == 1 - - def test_in_flight_is_constant_time_at_scale(self) -> None: - # User's complaint: at 40k+ entries the in-flight count visibly - # lagged because we iterated the dict. After moving to direct - # counters this should be a plain int subtraction regardless of - # dict size. Sanity-check the timing. - d = _dash() - sids = [_new_sid() for _ in range(50_000)] - for sid in sids: - d.ingest_frames(_frame(Event.ISSUED, sid)) - assert d.lifecycle_count() == 50_000 - t0 = time.monotonic_ns() - for _ in range(1000): - _ = d.in_flight - elapsed_us = (time.monotonic_ns() - t0) / 1000 - # 1000 reads should take < 50 ms total (i.e. < 50 µs each) even - # at 50k lifecycle entries — generous bound that catches any - # O(N) regression. - assert ( - elapsed_us < 50_000 - ), f"in_flight read took {elapsed_us:.0f} µs / 1000 calls — O(N) regression" - - def test_in_flight_counts_written_minus_complete_at_scale(self) -> None: - # 21,578 fully-completed lifecycles + 958,187 written-but-not- - # complete. On-the-wire in-flight is the latter. - d = _dash() - complete_sids = [_new_sid() for _ in range(21_578)] - in_flight_sids = [_new_sid() for _ in range(958_187)] - d.ingest_frames(b"".join(_full_lifecycle(s) for s in complete_sids)) - d.ingest_frames(b"".join(_inflight(s) for s in in_flight_sids)) - assert d.n_issued == 979_765 - assert d.n_complete_seen == 21_578 - assert d.in_flight == 958_187 - - def test_rates_track_ingest(self) -> None: - d = _dash() - # Force a known elapsed window. - d._start_ns = time.monotonic_ns() - 4_000_000_000 # 4 s ago - sids = [_new_sid() for _ in range(2000)] - for s in sids: - d.ingest_frames(_frame(Event.ISSUED, s)) - # COMPLETE 500 of the issued sids (a COMPLETE only counts when - # its ISSUED was seen first). - for s in sids[:500]: - d.ingest_frames(_frame(Event.COMPLETE, s)) - # 2000/4 = 500 issue/s, 500/4 = 125 complete/s - assert 400 < d.issuance_rate < 600 - assert 100 < d.completion_rate < 150 - - def test_in_flight_is_written_minus_complete(self) -> None: - # On-the-wire in-flight = WRITTEN − COMPLETE. - d = _dash() - sids = [_new_sid() for _ in range(100)] - d.ingest_frames(b"".join(_inflight(s) for s in sids)) - assert d.in_flight == 100 # all written, none complete - for s in sids[:60]: - d.ingest_frames(_frame(Event.COMPLETE, s)) - assert d.in_flight == 40 - assert d.n_complete_seen == 60 - - def test_in_flight_clamped_when_written_exceeds_issued(self) -> None: - # Under FIFO drops, WRITTEN frames (worker-proc) can survive for - # requests whose ISSUED (main-proc) was dropped → n_written > - # n_issued. in_flight must never exceed issued − complete. - d = _dash() - for s in (_new_sid() for _ in range(50)): - d.ingest_frames(_frame(Event.ISSUED, s)) - # 80 WRITTEN frames, 30 of them for requests with no ISSUED seen. - for s in (_new_sid() for _ in range(80)): - d.ingest_frames(_frame(Event.WRITTEN, s)) - assert d.in_flight == 50 # min(80, 50) − 0, not 80 - - def test_in_flight_zero_when_all_completed(self) -> None: - # End-of-benchmark invariant: once every ISSUED has its - # COMPLETE, in_flight MUST be 0 — regardless of what stage - # folding or eviction did along the way. - d = _dash() - sids = [_new_sid() for _ in range(10_000)] - d.ingest_frames(b"".join(_inflight(s) for s in sids)) - assert d.in_flight == 10_000 - # Render a few times mid-flight (simulating the render thread - # interleaving with ingestion). The fold queue is empty so - # nothing folds, in-flight stays put. - for _ in range(5): - d.finalize_completed() - assert d.in_flight == 10_000 - # Now everything completes. - d.ingest_frames(b"".join(_frame(Event.COMPLETE, s) for s in sids)) - assert d.in_flight == 0 - # Further finalize ticks don't change it. - for _ in range(5): - d.finalize_completed() - assert d.in_flight == 0 - - def test_orphan_complete_without_issued_not_counted(self) -> None: - # Warmup bleed: a COMPLETE whose ISSUED was cleared at PERF_START - # must not inflate the perf-window counters. - d = _dash() - d.ingest_frames(_frame(Event.COMPLETE, _new_sid())) - assert d.n_complete_seen == 0 - - -# --------------------------------------------------------------------------- -# Stage N count via the folding path -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestStageN: - def test_no_fold_before_render_tick(self) -> None: - # Lifecycles sit in the dict until finalize_completed runs. - d = _dash() - sid = _new_sid() - d.ingest_frames(_full_lifecycle(sid)) - assert d.stage_n("backpressure") == 0 - assert d.n_complete_folded == 0 - - def test_fold_happens_on_first_finalize_with_zero_defer(self) -> None: - d = _dash() # fold_defer_ns=0 - sid = _new_sid() - d.ingest_frames(_full_lifecycle(sid)) - d.finalize_completed() - for key in ( - "backpressure", - "socket_write", - "server_headers", - "server_resp", - "tail_offline", - "e2e", - ): - assert d.stage_n(key) == 1, f"stage {key} did not fold" - assert d.n_complete_folded == 1 - - def test_streaming_lifecycle_folds_split_tail(self) -> None: - # RESPONSE_DONE present → token-gen (1st→last chunk) and the - # client tail (last chunk→complete) fold separately; the offline - # combined tail stays empty. - d = _dash() - d.ingest_frames(_full_streaming_lifecycle(_new_sid())) - d.finalize_completed() - assert d.stage_n("stream_gen") == 1 - assert d.stage_n("tail_stream") == 1 - assert d.stage_n("server_resp") == 1 # headers -> 1st chunk - assert d.stage_n("tail_offline") == 1 # also folds, just not shown - - def test_streaming_render_uses_split_labels(self) -> None: - d = _dash() - for _ in range(5): - d.ingest_frames(_full_streaming_lifecycle(_new_sid())) - text = d.render().plain - assert "1st chunk -> last chunk" in text - assert "last chunk -> ipc_2_main -> complete" in text - assert "headers -> response" not in text # offline-only label - - def test_offline_render_uses_response_labels(self) -> None: - d = _dash() - for _ in range(5): - d.ingest_frames(_full_lifecycle(_new_sid())) - text = d.render().plain - assert "headers recvd -> response" in text - assert "response -> ipc_2_main -> complete" in text - assert "1st chunk -> last chunk" not in text # streaming-only label - - def test_fold_defer_holds_completes_until_deadline(self) -> None: - # With a non-zero fold defer, a COMPLETE just observed should - # NOT be folded until the deadline has passed. This is what - # lets late worker frames catch up before we pop the lifecycle. - defer_ns = 50_000_000 # 50 ms - d = Dashboard(fold_defer_ns=defer_ns) - sid = _new_sid() - d.ingest_frames(_full_lifecycle(sid)) - d.finalize_completed() - assert d.n_complete_folded == 0 # not yet - time.sleep(0.075) # past the 50 ms defer - d.finalize_completed() - assert d.n_complete_folded == 1 - - def test_n_grows_monotonically_with_completions(self) -> None: - d = _dash() - for _ in range(100): - sid = _new_sid() - d.ingest_frames(_full_lifecycle(sid)) - # First render marks all 100 with complete_seen_at; second - # render folds them all. - d.finalize_completed() - d.finalize_completed() - assert d.stage_n("e2e") == 100 - assert d.n_complete_folded == 100 - - def test_partial_lifecycle_does_not_inflate_stage_n(self) -> None: - # Partial frames in: backpressure (issue -> tcp conn_acquired) - # folds only when COMPLETE finally lands AND CONN_ACQUIRED was - # seen. Invariant: N never exceeds n_complete_folded. - d = _dash() - sid = _new_sid() - d.ingest_frames(_frame(Event.ISSUED, sid)) - d.ingest_frames(_frame(Event.WORKER_RECEIVED, sid)) - d.ingest_frames(_frame(Event.CONN_ACQUIRED, sid)) - for _ in range(5): - d.finalize_completed() - assert d.stage_n("backpressure") == 0 - assert d.n_complete_folded == 0 - d.ingest_frames(_frame(Event.COMPLETE, sid)) - d.finalize_completed() - d.finalize_completed() - assert d.stage_n("backpressure") == 1 - assert d.n_complete_folded == 1 - - def test_stage_n_unchanged_by_extra_events_on_already_folded_sid(self) -> None: - # After fold, the sid is popped. A late event for that sid - # should create a new lifecycle (and inflate in-flight by 1 - # until it ages out), but must NOT double-count any stage. - d = _dash() - sid = _new_sid() - d.ingest_frames(_full_lifecycle(sid)) - d.finalize_completed() - d.finalize_completed() - assert d.stage_n("backpressure") == 1 - # Late stray frame - d.ingest_frames(_frame(Event.WORKER_RECEIVED, sid)) - d.finalize_completed() - d.finalize_completed() - assert d.stage_n("backpressure") == 1 - - -# --------------------------------------------------------------------------- -# Drop counter -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestDrops: - def test_no_drops_initially(self) -> None: - d = _dash() - assert d.dropped_frames == 0 - - def test_trace_drops_sums_across_procs(self) -> None: - d = _dash() - # 34 bytes dropped on main, 17 bytes on worker 5 - d.ingest_frames(_frame(Event.TRACE_DROPS, _drop_sid(MAIN_PROC_LOOP_ID, 34))) - d.ingest_frames(_frame(Event.TRACE_DROPS, _drop_sid(5, 17))) - assert d.dropped_frames == (34 + 17) // FRAME_SIZE # 3 - - def test_trace_drops_per_proc_is_cumulative_latest(self) -> None: - # The payload is a per-proc CUMULATIVE total re-sent each tick. - # Same proc reporting 34 then 510 → latest wins (not summed), - # so a lost frame self-heals on the next. - d = _dash() - d.ingest_frames(_frame(Event.TRACE_DROPS, _drop_sid(5, 34))) - d.ingest_frames(_frame(Event.TRACE_DROPS, _drop_sid(5, 510))) - assert d.dropped_frames == 510 // FRAME_SIZE # 30, not (34+510) - # A stale/reordered lower value does not regress the count. - d.ingest_frames(_frame(Event.TRACE_DROPS, _drop_sid(5, 100))) - assert d.dropped_frames == 510 // FRAME_SIZE - - def test_trace_drops_does_not_create_lifecycle(self) -> None: - d = _dash() - d.ingest_frames(_frame(Event.TRACE_DROPS, _drop_sid(MAIN_PROC_LOOP_ID, 34))) - assert d.lifecycle_count() == 0 - assert d.in_flight == 0 - - -# --------------------------------------------------------------------------- -# LOOP_LAG demux -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestLoopLag: - def test_demux_per_worker(self) -> None: - d = _dash() - d.ingest_frames(_frame(Event.LOOP_LAG, _loop_lag_sid(0, 1_000_000))) - d.ingest_frames(_frame(Event.LOOP_LAG, _loop_lag_sid(0, 2_000_000))) - d.ingest_frames(_frame(Event.LOOP_LAG, _loop_lag_sid(1, 500_000))) - d.ingest_frames( - _frame(Event.LOOP_LAG, _loop_lag_sid(MAIN_PROC_LOOP_ID, 3_000_000)) - ) - assert d.loop_lag_n(0) == 2 - assert d.loop_lag_n(1) == 1 - assert d.loop_lag_n(MAIN_PROC_LOOP_ID) == 1 - - def test_loop_lag_does_not_create_lifecycle(self) -> None: - d = _dash() - d.ingest_frames(_frame(Event.LOOP_LAG, _loop_lag_sid(0, 1_000_000))) - assert d.lifecycle_count() == 0 - assert d.in_flight == 0 - assert d.n_issued == 0 - - -# --------------------------------------------------------------------------- -# Frame parsing robustness -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestFrameParsing: - def test_ignores_trailing_partial_frame(self) -> None: - d = _dash() - sid = _new_sid() - whole = _frame(Event.ISSUED, sid) - partial = struct.pack(" None: - d = _dash() - d.ingest_frames(b"") - assert d.lifecycle_count() == 0 - assert d.in_flight == 0 - - -# --------------------------------------------------------------------------- -# Burst scenarios — mirror the user's offline-burst failure modes -# --------------------------------------------------------------------------- - - -@pytest.mark.unit -class TestBurst: - def test_burst_then_completion(self) -> None: - d = _dash() - sids = [_new_sid() for _ in range(1000)] - # Phase 1: all ISSUED arrive in a burst — issued but not written - # is IPC backlog, not on-the-wire in-flight. - d.ingest_frames(b"".join(_frame(Event.ISSUED, s) for s in sids)) - assert d.in_flight == 0 - # Render at this point — no folds yet. - d.finalize_completed() - assert d.n_complete_folded == 0 - # Phase 2: worker events arrive; WRITTEN puts them on the wire. - for ev in ( - Event.WORKER_RECEIVED, - Event.CONN_ACQUIRED, - Event.WRITTEN, - Event.RESPONSE_HEADERS, - Event.RESPONSE_BYTES, - ): - d.ingest_frames(b"".join(_frame(ev, s) for s in sids)) - assert d.in_flight == 1000 # written, not yet complete - # Phase 3: COMPLETE arrives - d.ingest_frames(b"".join(_frame(Event.MAIN_RECEIVED, s) for s in sids)) - d.ingest_frames(b"".join(_frame(Event.COMPLETE, s) for s in sids)) - assert d.in_flight == 0 - # Now fold (two cycles of defer logic) - d.finalize_completed() - d.finalize_completed() - assert d.n_complete_folded == 1000 - assert d.stage_n("e2e") == 1000 - assert d.stage_n("backpressure") == 1000 - - def test_render_header_shows_correct_counts(self) -> None: - # End-to-end: ingest a known set of events, render once, parse - # the header from the rich Text, assert the numbers the user - # actually sees on screen. 30 written-not-complete (in-flight), - # 70 fully complete → 100 issued. - d = _dash() - for _ in range(30): - d.ingest_frames(_inflight(_new_sid())) - for _ in range(70): - d.ingest_frames(_full_lifecycle(_new_sid())) - text = d.render().plain - # First header line: complete + req/s - complete_line = next(line for line in text.splitlines() if "complete" in line) - assert " 70" in complete_line, complete_line - # Second header line: issued / in-flight - inflight_line = next(line for line in text.splitlines() if "in-flight" in line) - assert " 30" in inflight_line, inflight_line # in-flight - assert " 100" in inflight_line, inflight_line # issued - assert "queued" not in text - assert "processing" not in text - - def test_time_line_attributes_queue_wait_correctly(self) -> None: - # Long IPC backpressure must NOT be billed as client overhead; - # it shows up in the "backpressure" column. - d = _dash() - sid = _new_sid() - ts = [ - (Event.ISSUED, 0), - (Event.WORKER_RECEIVED, 20_000_000_000), # 20 s backpressure - (Event.CONN_ACQUIRED, 20_000_005_000), - (Event.WRITTEN, 20_000_010_000), # 10 us client_pre - (Event.RESPONSE_HEADERS, 25_000_010_000), # 5 s server - (Event.RESPONSE_BYTES, 25_000_010_500), - (Event.MAIN_RECEIVED, 25_000_011_000), - (Event.COMPLETE, 25_000_012_000), - ] - d.ingest_frames(b"".join(_frame(ev, sid, t) for ev, t in ts)) - d.finalize_completed() - text = d.render().plain - time_line = next(line for line in text.splitlines() if "backpressure" in line) - assert "client work" in time_line - assert "server work" in time_line - assert "0.0%" in time_line # client work - assert "80.0%" in time_line # backpressure - assert "20.0%" in time_line # server work - - def test_rate_line_shows_issued_completed_backlog(self) -> None: - d = _dash() - d._start_ns = time.monotonic_ns() - 10_000_000_000 - for _ in range(70_000): - d.ingest_frames(_frame(Event.ISSUED, _new_sid())) - for _ in range(2_000): - d.ingest_frames(_full_lifecycle(_new_sid())) - text = d.render().plain - rate_line = next(line for line in text.splitlines() if "backlog " in line) - assert "issued " in rate_line - assert "completed " in rate_line - assert "+" in rate_line # positive backlog - - def test_rate_line_suppressed_during_warmup(self) -> None: - d = _dash() - for _ in range(50): - d.ingest_frames(_frame(Event.ISSUED, _new_sid())) - text = d.render().plain - assert not any( - "issued " in line and "backlog" in line for line in text.splitlines() - ) - - -@pytest.mark.unit -class TestLoadgenComparison: - """Final-frame comparison: trace-measured vs loadgen-recorded metrics.""" - - def _snapshot( - self, - *, - completed: int = 1000, - tracked: int = 1000, - tracked_duration_ns: int = 10_000_000_000, - e2e_p50_ns: float = 100_000_000, - e2e_p99_ns: float = 250_000_000, - ttft_p50_ns: float | None = None, - ttft_p99_ns: float | None = None, - ) -> dict: - # Mirror the on-wire shape produced by - # snapshot_to_dict: counters live in `metrics` with - # type="counter", not under a top-level dict. - metrics: list[dict] = [ - {"type": "counter", "name": "total_samples_completed", "value": completed}, - {"type": "counter", "name": "tracked_samples_completed", "value": tracked}, - { - "type": "counter", - "name": "tracked_duration_ns", - "value": tracked_duration_ns, - }, - { - "type": "counter", - "name": "total_duration_ns", - "value": tracked_duration_ns, - }, - { - "type": "series", - "name": "sample_latency_ns", - "count": tracked, - "total": e2e_p50_ns * tracked, - "min": 0.0, - "max": e2e_p99_ns, - "sum_sq": 0.0, - "percentiles": {"50.0": e2e_p50_ns, "99.0": e2e_p99_ns}, - "histogram": [], - }, - ] - if ttft_p50_ns is not None: - metrics.append( - { - "type": "series", - "name": "ttft_ns", - "count": tracked, - "total": ttft_p50_ns * tracked, - "min": 0.0, - "max": ttft_p99_ns or ttft_p50_ns, - "sum_sq": 0.0, - "percentiles": { - "50.0": ttft_p50_ns, - "99.0": ttft_p99_ns or ttft_p50_ns, - }, - "histogram": [], - } - ) - return { - "counter": 0, # snapshot frame number (int), not the per-metric counters - "timestamp_ns": 0, - "state": "complete", - "n_pending_tasks": 0, - "metrics": metrics, - } - - def test_no_comparison_section_until_attached(self) -> None: - d = _dash() - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - text = d.render().plain - assert "LOADGEN vs TRACE" not in text - - def test_comparison_section_appears_after_attach(self) -> None: - d = _dash() - for _ in range(1000): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - d.attach_loadgen_snapshot( - self._snapshot(completed=1000, tracked=1000, tracked_duration_ns=10**10) - ) - text = d.render().plain - assert "LOADGEN vs TRACE" in text - # Two-section layout: counts/rates + latency (with src column). - assert "counts / rates" in text - assert "loadgen" in text - assert "trace" in text - assert "Δ" in text - assert "throughput (req/s)" in text - assert "samples completed" in text - # Latency block exposes min/p50/p99/max + Δmax. - assert "latency" in text - assert "Δmax" in text - # Unit is auto-picked from the observed max so the label suffix - # varies (ms/µs/s); just confirm the metric name is present. - assert "e2e (" in text - for col in ("min", "p50", "p99", "max"): - assert col in text - - def test_comparison_values_match_when_in_agreement(self) -> None: - # Snapshot says 1000 completed; drive trace ingest with same - # counts so Δ on the samples row is ~0%. - d = _dash() - d._start_ns = time.monotonic_ns() - 10_000_000_000 # 10 s elapsed - for _ in range(1000): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - d.attach_loadgen_snapshot( - self._snapshot(completed=1000, tracked=1000, tracked_duration_ns=10**10) - ) - text = d.render().plain - # One row carries the label + both values + delta. - samples_line = next( - line for line in text.splitlines() if "samples completed" in line - ) - # Loadgen and trace each contribute one "1,000" cell on the - # same row; the third token is the Δ. - assert samples_line.count("1,000") == 2, samples_line - assert "+0.0%" in samples_line or "0.0%" in samples_line - - def test_comparison_throughput_falls_back_to_total_duration(self) -> None: - # Snapshot with tracked_duration_ns = 0 but total_duration_ns - # populated: throughput must NOT be reported as 0. - d = _dash() - for _ in range(100): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - snap = self._snapshot(completed=100, tracked=100, tracked_duration_ns=0) - # _snapshot() puts the same value in both counters. Force - # tracked_duration_ns back to 0 while keeping total_duration_ns - # non-zero to mimic the real-world "tracked block not yet - # closed" case. - for m in snap["metrics"]: - if m.get("name") == "tracked_duration_ns": - m["value"] = 0 - elif m.get("name") == "total_duration_ns": - m["value"] = 10_000_000_000 # 10 s - d.attach_loadgen_snapshot(snap) - text = d.render().plain - tput_line = next( - line for line in text.splitlines() if "throughput (req/s)" in line - ) - # 100 samples / 10 s = 10 req/s. Must not be 0.0 and the - # loadgen cell must not be an em-dash. - assert "10.0" in tput_line, tput_line - assert "—" not in tput_line.split("throughput (req/s)")[1].split()[0], tput_line - - def test_comparison_shows_tpot_and_tps_loadgen_only(self) -> None: - # tpot + tps come from loadgen only; the trace cell stays - # em-dashed because we don't measure per-token timings. - d = _dash() - for _ in range(100): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - snap = self._snapshot( - completed=100, tracked=100, tracked_duration_ns=10_000_000_000 - ) - # Add tpot and osl series so tps + tpot rows render. - snap["metrics"].extend( - [ - { - "type": "series", - "name": "tpot_ns", - "count": 100, - "total": 100.0, - "min": 0.0, - "max": 100.0, - "sum_sq": 0.0, - "percentiles": {"50.0": 5_000_000, "99.0": 12_000_000}, # 5 / 12 ms - "histogram": [], - }, - { - "type": "series", - "name": "osl", - "count": 100, - "total": 50_000.0, # 500 tokens × 100 samples - "min": 0.0, - "max": 1000.0, - "sum_sq": 0.0, - "percentiles": {"50.0": 500.0, "99.0": 1000.0}, - "histogram": [], - }, - ] - ) - d.attach_loadgen_snapshot(snap) - text = d.render().plain - lines = text.splitlines() - # tpot block: loadgen row carries p50 = 5 ms; trace row is em-dash - # all the way across (no per-token timings emitted as trace events). - assert "tpot (ms)" in text - tpot_block = [i for i, line in enumerate(lines) if "tpot (ms)" in line] - assert tpot_block, "expected a tpot (ms) row" - loadgen_line = lines[tpot_block[0]] - trace_line = lines[tpot_block[0] + 1] - assert "loadgen" in loadgen_line - assert "5.00" in loadgen_line # p50 = 5 ms - assert "trace" in trace_line - assert "—" in trace_line, trace_line - - tps_line = next(line for line in lines if "tok/s" in line) - # 50,000 tokens / 10 s = 5,000 tok/s - assert "5,000.0" in tps_line, tps_line - - def test_comparison_flags_drift(self) -> None: - # Loadgen says 1000 completed but trace only saw 500. Δ on - # the samples row should be -50%. - d = _dash() - for _ in range(500): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - d.attach_loadgen_snapshot( - self._snapshot(completed=1000, tracked=1000, tracked_duration_ns=10**10) - ) - text = d.render().plain - samples_line = next( - line for line in text.splitlines() if "samples completed" in line - ) - assert "1,000" in samples_line # loadgen value - assert "500" in samples_line # trace value - assert "-50" in samples_line or "−50" in samples_line, samples_line - - def test_comparison_skips_rows_with_no_data(self) -> None: - # Streaming-off offline run: snapshot has no ttft / tpot / osl - # series. The ttft / tpot / tok-throughput rows must not render. - d = _dash() - for _ in range(100): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - d.attach_loadgen_snapshot( - self._snapshot(completed=100, tracked=100, tracked_duration_ns=10**10) - ) - text = d.render().plain - assert "ttft (ms)" not in text - assert "tpot (ms)" not in text - assert "throughput (tok/s)" not in text - - def test_render_header_after_drops(self) -> None: - d = _dash() - d.ingest_frames(_frame(Event.ISSUED, _new_sid())) - # 17 bytes = 1 frame dropped - d.ingest_frames( - _frame(Event.TRACE_DROPS, _drop_sid(MAIN_PROC_LOOP_ID, FRAME_SIZE)) - ) - text = d.render().plain - assert "dropped" in text - dropped_line = next(line for line in text.splitlines() if "dropped" in line) - assert "1" in dropped_line - - def test_interleaved_burst(self) -> None: - # Realistic high-QPS pattern: ISSUED for new sids and COMPLETE - # for older sids interleaved. in-flight tracks the running - # difference accurately. - d = _dash() - live: list[int] = [] - for _ in range(100): - new = [_new_sid() for _ in range(10)] - live.extend(new) - d.ingest_frames(b"".join(_full_lifecycle(s) for s in new[:5])) - d.ingest_frames(b"".join(_inflight(s) for s in new[5:])) - d.finalize_completed() - d.finalize_completed() - # Five out of every ten are fully resolved (folded); the other - # five are written-but-not-complete and stay in-flight. - assert d.n_issued == 1000 - assert d.n_complete_seen == 500 - assert d.in_flight == 500 - assert d.n_complete_folded == 500 - assert d.stage_n("e2e") == 500 - - def test_complete_arriving_after_eviction_still_folds(self) -> None: - # USER'S BUG (real pattern): in offline burst the loadgen - # issues millions of requests in a tight loop while responses - # trickle in over many seconds. The render cycle fires while - # ingestion is still going — the lifecycle dict grows past - # MAX_INFLIGHT, eviction pops the still-in-flight ISSUED - # entries, *then* their COMPLETE eventually arrives, finds no - # ISSUED in stages, and the fold gate rejects them. - # Result: complete=N, stage N=tiny. - d = _dash() - # 1) Flood the dict with ISSUED-only entries until eviction - # kicks in. This mirrors the loadgen burst racing ahead of - # the server. - burst = [_new_sid() for _ in range(250_000)] - d.ingest_frames(b"".join(_frame(Event.ISSUED, s) for s in burst)) - # 2) Render — the old code would evict 150k partials here. - d.finalize_completed() - # 3) Now those same requests complete. With the broken - # behaviour, the lifecycle has no ISSUED, gets rejected, - # and stage N never grows. - for sid in burst[:500]: - d.ingest_frames( - b"".join( - _frame(ev, sid) - for ev in ( - Event.WORKER_RECEIVED, - Event.CONN_ACQUIRED, - Event.WRITTEN, - Event.RESPONSE_HEADERS, - Event.RESPONSE_BYTES, - Event.MAIN_RECEIVED, - Event.COMPLETE, - ) - ) - ) - d.finalize_completed() - d.finalize_completed() - assert d.n_complete_seen == 500 - assert ( - d.n_complete_folded == 500 - ), f"folded {d.n_complete_folded}/500 — eviction lost ISSUED context" - assert d.stage_n("e2e") == 500 - assert d.stage_n("backpressure") == 500 - - def test_huge_in_flight_does_not_starve_folds(self) -> None: - # USER'S BUG: at 980k in-flight + 21k complete, stage N is stuck - # at 2.5k. Reason was MAX_INFLIGHT-triggered eviction that ran - # *before* each request's COMPLETE arrived — so the ISSUED was - # popped from the dict, then COMPLETE arrived for a sid with - # no ISSUED in stages, missed the fold gate, and the request - # never made it into the stage histograms. - # - # Invariant: with N issued + M complete (M ≤ N), after - # ingesting + rendering, stage N must reach M (not get - # throttled by dict-size eviction). - d = _dash() - completed_sids = [_new_sid() for _ in range(500)] - outstanding_sids = [_new_sid() for _ in range(200_000)] - # Interleave issuance with completions to simulate the user's - # 980k-in-flight + 21k-complete scenario in miniature. - # Phase 1: issue all sids - d.ingest_frames(b"".join(_frame(Event.ISSUED, s) for s in completed_sids)) - d.ingest_frames(b"".join(_frame(Event.ISSUED, s) for s in outstanding_sids)) - # Phase 2: complete the first batch - for sid in completed_sids: - d.ingest_frames( - b"".join( - _frame(ev, sid) - for ev in ( - Event.WORKER_RECEIVED, - Event.CONN_ACQUIRED, - Event.WRITTEN, - Event.RESPONSE_HEADERS, - Event.RESPONSE_BYTES, - Event.MAIN_RECEIVED, - Event.COMPLETE, - ) - ) - ) - # Two render ticks should fold every completed lifecycle, no - # matter how many partial lifecycles are sitting alongside. - d.finalize_completed() - d.finalize_completed() - assert d.n_complete_seen == 500 - assert ( - d.n_complete_folded == 500 - ), f"folded only {d.n_complete_folded}/500 — eviction starved folds" - assert d.stage_n("e2e") == 500 - assert d.stage_n("backpressure") == 500 - - -@pytest.mark.unit -class TestTailIndicator: - """`is_tail` flips when ISSUED stops arriving and in_flight > 0.""" - - def test_false_before_any_issued(self) -> None: - d = _dash() - assert d.is_tail is False - - def test_false_while_issuance_active(self) -> None: - d = _dash() - d.ingest_frames(_frame(Event.ISSUED, _new_sid())) - # Same monotonic clock → quiet window is 0; not yet "tail". - assert d.is_tail is False - - def test_true_once_quiet_window_elapses(self) -> None: - from inference_endpoint.utils.trace_dashboard import _TAIL_QUIET_NS - - d = _dash() - sid = _new_sid() - d.ingest_frames(_inflight(sid)) - d._last_issued_ns = time.monotonic_ns() - _TAIL_QUIET_NS - 1 - assert d.in_flight == 1 - assert d.is_tail is True - - def test_done_once_activity_ceases(self) -> None: - # Activity-based: issuance quiet AND no ISSUED/COMPLETE for - # _DONE_QUIET_NS → is_done (and is_tail clears). Independent of - # in-flight, so it fires even when COMPLETE frames were dropped. - from inference_endpoint.utils.trace_dashboard import ( - _DONE_QUIET_NS, - _TAIL_QUIET_NS, - ) - - d = _dash() - d.ingest_frames(_inflight(_new_sid())) - now = time.monotonic_ns() - # Issuance quiet but a completion arrived recently → still draining. - d._last_issued_ns = now - _TAIL_QUIET_NS - 1 - assert d.is_tail is True - assert d.is_done is False - # No lifecycle event for the done window → run has stopped → done. - d._last_lifecycle_ns = now - _DONE_QUIET_NS - 1 - assert d.is_done is True - assert d.is_tail is False - - def test_header_shows_tail_chip(self) -> None: - from inference_endpoint.utils.trace_dashboard import _TAIL_QUIET_NS - - d = _dash() - d.ingest_frames(_inflight(_new_sid())) - d._last_issued_ns = time.monotonic_ns() - _TAIL_QUIET_NS - 1 - text = d.render().plain - assert "TAIL" in text - assert "draining" in text - - -@pytest.mark.unit -class TestBackpressure: - """``is_backpressured`` flips when the first stage (ISSUED → - CONN_ACQUIRED) takes ≥ _BACKPRESSURE_PCT of E2E.""" - - def _ingest(self, d: Dashboard, e2e_ns: int, first_stage_ns: int) -> None: - """Synthesise lifecycles with a given issue→conn_acquired gap.""" - for _ in range(200): - sid = _new_sid() - issued_ts = 1 - conn_ts = issued_ts + first_stage_ns - for ev, ts in ( - (Event.ISSUED, issued_ts), - (Event.WORKER_RECEIVED, issued_ts + first_stage_ns // 2), - (Event.CONN_ACQUIRED, conn_ts), - (Event.WRITTEN, conn_ts + 1), - (Event.RESPONSE_HEADERS, conn_ts + 2), - (Event.RESPONSE_BYTES, conn_ts + 3), - (Event.MAIN_RECEIVED, conn_ts + 4), - (Event.COMPLETE, issued_ts + e2e_ns), - ): - d.ingest_frames(_frame(ev, sid, ts)) - d.finalize_completed() - - def test_false_when_first_stage_below_threshold(self) -> None: - d = _dash() - # First stage = 2% of e2e — below 20%. - self._ingest(d, e2e_ns=1_000_000, first_stage_ns=20_000) - assert d.is_backpressured is False - - def test_true_when_first_stage_heavy(self) -> None: - d = _dash() - # First stage = 50% of e2e. - self._ingest(d, e2e_ns=1_000_000, first_stage_ns=500_000) - assert d.is_backpressured is True - - def test_true_survives_dropped_intermediate_frames(self) -> None: - # First stage folds from ISSUED + CONN_ACQUIRED endpoints only, - # so it triggers even when WORKER_RECEIVED frames are lost. - d = _dash() - for _ in range(50): - sid = _new_sid() - d.ingest_frames(_frame(Event.ISSUED, sid, 1)) - d.ingest_frames(_frame(Event.CONN_ACQUIRED, sid, 600_000)) - d.ingest_frames(_frame(Event.COMPLETE, sid, 1_000_000)) - d.finalize_completed() - assert d.is_backpressured is True - - def test_header_chip_shows_backpressure(self) -> None: - d = _dash() - self._ingest(d, e2e_ns=1_000_000, first_stage_ns=500_000) - text = d.render().plain - assert "BACKPRESSURE" in text - assert "(tcp)" not in text and "(worker)" not in text - - -@pytest.mark.unit -class TestPerfStartReset: - """PERF_START drops warmup state so LOADGEN vs TRACE aligns with - loadgen's tracked window.""" - - def test_metrics_and_counters_reset_on_perf_start(self) -> None: - d = _dash() - for _ in range(50): - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - assert d.n_issued == 50 - assert d.n_complete_seen == 50 - assert d.stage_n("e2e") == 50 - - # Phase boundary marker. sid=0; ts irrelevant. - d.ingest_frames(_frame(Event.PERF_START, 0)) - assert d.n_issued == 0 - assert d.n_complete_seen == 0 - assert d.stage_n("e2e") == 0 - assert d.in_flight == 0 - - def test_loop_lag_survives_reset(self) -> None: - # Loop lag is per-worker process health, not per-request — it - # must not get wiped by a phase boundary. - d = _dash() - d.ingest_frames(_frame(Event.LOOP_LAG, _loop_lag_sid(0, 1_000_000))) - d.ingest_frames(_frame(Event.PERF_START, 0)) - assert 0 in d._loop_lag - assert d._loop_lag[0].total == 1 - - def test_warmup_request_completing_after_perf_start_not_counted(self) -> None: - # A warmup request: ISSUED before PERF_START (cleared by reset), - # COMPLETE after. Its COMPLETE must not bleed into the perf - # window's counters or fold into the stage histograms. - d = _dash() - warmup_sid = _new_sid() - d.ingest_frames(_frame(Event.ISSUED, warmup_sid)) - d.ingest_frames(_frame(Event.PERF_START, 0)) # clears the ISSUED - # Perf-phase request, fully traced. - perf_sid = _new_sid() - d.ingest_frames(_full_lifecycle(perf_sid)) - # Warmup request's late COMPLETE lands now. - d.ingest_frames(_frame(Event.COMPLETE, warmup_sid)) - d.finalize_completed() - assert d.n_issued == 1 # only the perf request - assert d.n_complete_seen == 1 # warmup COMPLETE excluded - assert d.stage_n("e2e") == 1 - assert d.in_flight == 0 - - -@pytest.mark.unit -class TestErrorsCounter: - """``errors`` row in LOADGEN vs TRACE picks up tracked_samples_failed.""" - - def test_errors_row_renders_failure_count(self) -> None: - d = _dash() - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - d.attach_loadgen_snapshot( - { - "counter": 0, - "timestamp_ns": 0, - "state": "complete", - "n_pending_tasks": 0, - "metrics": [ - { - "type": "counter", - "name": "total_samples_completed", - "value": 100, - }, - { - "type": "counter", - "name": "tracked_samples_completed", - "value": 100, - }, - { - "type": "counter", - "name": "tracked_samples_failed", - "value": 7, - }, - { - "type": "counter", - "name": "tracked_duration_ns", - "value": 10**10, - }, - { - "type": "counter", - "name": "total_duration_ns", - "value": 10**10, - }, - ], - } - ) - text = d.render().plain - errors_line = next(line for line in text.splitlines() if "errors" in line) - assert "7" in errors_line - - def test_errors_row_skipped_when_zero(self) -> None: - d = _dash() - d.ingest_frames(_full_lifecycle(_new_sid())) - d.finalize_completed() - d.attach_loadgen_snapshot( - { - "counter": 0, - "timestamp_ns": 0, - "state": "complete", - "n_pending_tasks": 0, - "metrics": [ - { - "type": "counter", - "name": "tracked_samples_completed", - "value": 100, - }, - { - "type": "counter", - "name": "tracked_duration_ns", - "value": 10**10, - }, - ], - } - ) - text = d.render().plain - for line in text.splitlines(): - assert "errors" not in line, line