Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 0 additions & 214 deletions scripts/trace_dashboard.py

This file was deleted.

67 changes: 7 additions & 60 deletions src/inference_endpoint/commands/benchmark/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import asyncio
import json
import logging
import os
import random
import shutil
import signal
Expand Down Expand Up @@ -97,7 +96,6 @@
SessionResult,
)
from inference_endpoint.metrics.report import Report
from inference_endpoint.utils import trace

transformers_logging.set_verbosity_error()

Expand Down Expand Up @@ -567,8 +565,6 @@ async def _run_benchmark_async(
config = ctx.config
session_id = f"cli_benchmark_{uuid.uuid4().hex[:8]}"

trace.start_lag_task(loop) # -vvv: main-proc loop-lag sampler

# Progress bar + response collector
pbar = tqdm(
desc=f"{config.model_params.name} (Streaming: {ctx.enable_streaming})",
Expand Down Expand Up @@ -616,47 +612,6 @@ async def _run_benchmark_async(
)
metrics_subscriber.start()

# -vvv: snapshot sidecar tap → dashboard's LOADGEN vs TRACE.
#
# The provider reads ``metrics_subscriber.latest``, which is only
# advanced by the subscriber's ``process()`` task scheduled off an
# ``add_reader`` callback on THIS loop. Under a saturated 30k+ req/s
# run the loop starves that callback, so ``latest`` freezes and the
# dashboard's live LOADGEN panel goes stale (the tap thread keeps
# re-writing the same frozen snapshot). The FINAL panel is NOT
# affected: trace.teardown() writes the authoritative
# ``state == "complete"`` snapshot to the same sidecar before it
# closes the FIFO, and the dashboard reader now prefers true FIFO
# EOF (all writers closed) before its final-snapshot poll — so the
# closing comparison always reflects ``final_snapshot.json``.
#
# TODO(trace-live-staleness): decouple the live feed from this loop
# with a trace-local second SUB, added purely here + as a helper in
# utils/trace.py (no edits to subscriber.py/publisher.py/APIs):
# * In a daemon thread, create an independent ``zmq.Context`` and a
# blocking SUB with ``CONFLATE=1`` + ``SUBSCRIBE b""`` connected
# to ``ipc://{zmq_ctx.socket_dir}/{metrics_socket_name}`` (the
# same endpoint the aggregator binds; a PUB fans out to all SUBs,
# so the existing in-process subscriber is unaffected).
# * Per recv: strip the ``TOPIC_FRAME_SIZE`` topic prefix and
# handle the ``BATCH_TOPIC`` batch frame (mirror
# ``ZmqMessageSubscriber.receive``), then
# ``MetricsSnapshotCodec().decode`` + ``snapshot_to_dict`` and
# hand the dict to the tap. A blocking recv in its own thread is
# immune to main-loop starvation.
# Deferred for now: it would duplicate the transport's private
# framing (TOPIC_FRAME_SIZE / BATCH_TOPIC / single-vs-batch layout)
# outside the transport layer, which is a hidden coupling to a
# non-public contract — out of scope for a surgical in-bounds fix.
trace.start_snapshot_tap(
loop,
lambda: (
snapshot_to_dict(metrics_subscriber.latest)
if metrics_subscriber.latest is not None
else None
),
)

# Launch service subprocesses
launcher = ServiceLauncher(zmq_ctx)
aggregator_args: list[str] = [
Expand Down Expand Up @@ -725,9 +680,6 @@ async def _run_benchmark_async(
api_key=config.endpoint_config.api_key,
event_logs_dir=ctx.report_dir,
cpu_affinity=ctx.affinity_plan,
trace_pipe_path=(
trace.fifo_path(os.getpid()) if trace.is_enabled() else None
),
)
http_client = await HTTPEndpointClient.create(http_config, loop)
issuer = HttpClientSampleIssuer(http_client)
Expand Down Expand Up @@ -812,15 +764,13 @@ def _on_global_timeout() -> None:

def _on_phase_start(phase: PhaseConfig) -> None:
nonlocal global_timeout_handle
if phase.phase_type == PhaseType.PERFORMANCE:
# -vvv: signal the dashboard that warmup is over so its
# LOADGEN vs TRACE accumulators reset to the same
# window the loadgen aggregator's tracked counters use.
trace.emit_trace(trace.Event.PERF_START, 0)
if max_duration_ms is not None:
global_timeout_handle = loop.call_later(
max_duration_ms / 1000.0, _on_global_timeout
)
if (
phase.phase_type == PhaseType.PERFORMANCE
and max_duration_ms is not None
):
global_timeout_handle = loop.call_later(
max_duration_ms / 1000.0, _on_global_timeout
)

loop.add_signal_handler(signal.SIGINT, session.stop)
try:
Expand Down Expand Up @@ -881,9 +831,6 @@ def _on_phase_start(phase: PhaseConfig) -> None:
except Exception as e: # noqa: BLE001 — best-effort report build.
logger.warning(f"Failed to build report from snapshot: {e}")

# Cancel trace tasks, flush emitter, unlink FIFO, write final
# snapshot. No-op when trace was never enabled.
await trace.teardown(final_snapshot=snap_dict)
metrics_subscriber.close()
pbar.close()

Expand Down
5 changes: 0 additions & 5 deletions src/inference_endpoint/endpoint_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,6 @@ class HTTPClientConfig(WithUpdatesMixin, BaseModel):
default=None, exclude=True
)

# Trace FIFO path, set by trace.bootstrap under -vvv.
trace_pipe_path: Annotated[str | None, cyclopts.Parameter(parse=False)] = Field(
default=None, exclude=True
)

# CPU affinity plan for worker processes (computed by caller, e.g. benchmark command).
# None = disabled (no worker pinning)
cpu_affinity: Annotated[AffinityPlan | None, cyclopts.Parameter(parse=False)] = (
Expand Down
5 changes: 2 additions & 3 deletions src/inference_endpoint/endpoint_client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,14 +791,13 @@ def build_request(

@dataclass(slots=True)
class InFlightRequest:
"""State for a single HTTP request through its lifecycle.
"""State for a single HTTP request through its lifecycle:

Attributes:
query_id: Correlates response back to original Query.
http_bytes: Serialized HTTP request for socket.write().
is_streaming: Whether this is a streaming (SSE) request or not.
connection: PooledConnection assigned to this request (set once
request is fired).
connection: PooledConnection assigned to this request (set once request is fired).
"""

query_id: str
Expand Down
Loading
Loading