diff --git a/examples/python/anthropic_quickstart.py b/examples/python/anthropic_quickstart.py new file mode 100644 index 0000000..7af2590 --- /dev/null +++ b/examples/python/anthropic_quickstart.py @@ -0,0 +1,84 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 SecureAgentics +# +# Licensed under the Apache Licence, Version 2.0 (the "Licence"). +# You may not use this file except in compliance with the Licence. +# A copy of the Licence is included at LICENSE in the repository root. +"""Minimal quickstart: monitor Anthropic API calls with Adrian. + +Run:: + + export ANTHROPIC_API_KEY="sk-ant-..." + export ADRIAN_API_KEY="..." # optional -- omit to collect locally only + python examples/python/anthropic_quickstart.py +""" + +from __future__ import annotations + +import asyncio +import os + +import anthropic +import adrian + +# ------------------------------------------------------------------ +# 1. Initialise Adrian. This auto-instruments Anthropic by default. +# ------------------------------------------------------------------ +adrian.init( + api_key=os.environ.get("ADRIAN_API_KEY", ""), + session_id="anthropic-quickstart-session", +) + +# ------------------------------------------------------------------ +# 2. Create an Anthropic client as normal. +# ------------------------------------------------------------------ +client = anthropic.AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"]) + + +async def main() -> None: + print("Sending first request...") + + # ------------------------------------------------------------------ + # 3. Wrap related calls in an invocation context so Adrian groups them. + # ------------------------------------------------------------------ + async with adrian.anthropic_invocation(): + response = await client.messages.create( + model="claude-haiku-4-5-20251001", + max_tokens=256, + system="You are a concise assistant.", + messages=[{"role": "user", "content": "What is 2 + 2? Answer in one sentence."}], + ) + + text = next( + (block.text for block in response.content if hasattr(block, "text")), + "", + ) + print(f"Model says: {text}") + + # A second call in the same invocation -- same invocation_id in Adrian. + follow_up = await client.messages.create( + model="claude-haiku-4-5-20251001", + max_tokens=256, + system="You are a concise assistant.", + messages=[ + {"role": "user", "content": "What is 2 + 2? Answer in one sentence."}, + {"role": "assistant", "content": text}, + {"role": "user", "content": "Now multiply that result by 10."}, + ], + ) + + follow_text = next( + (block.text for block in follow_up.content if hasattr(block, "text")), + "", + ) + print(f"Follow-up: {follow_text}") + + # ------------------------------------------------------------------ + # 4. Always shut down Adrian cleanly to flush any pending events. + # ------------------------------------------------------------------ + await adrian.shutdown() + print("Done. Check your Adrian dashboard for the captured events.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sdk/python/adrian/__init__.py b/sdk/python/adrian/__init__.py index 3c3f72a..6b06a28 100644 --- a/sdk/python/adrian/__init__.py +++ b/sdk/python/adrian/__init__.py @@ -35,6 +35,7 @@ from langchain_core.runnables.base import Runnable from langchain_core.runnables.config import ensure_config +from adrian.anthropic_handler import anthropic_invocation, anthropic_invocation_sync from adrian.config import ( AdrianConfig, OnAuditCallback, @@ -101,6 +102,9 @@ "__version__", "mcp_servers", "redact_text", + "patch_anthropic", + "anthropic_invocation", + "anthropic_invocation_sync", ] logger = logging.getLogger("adrian") @@ -341,6 +345,7 @@ def init( if auto_instrument: _auto_instrument_langchain() + _auto_instrument_anthropic() # MCP server tracking is independent of LangChain auto-instrumentation, # it observes a different library (langchain-mcp-adapters) and is the @@ -374,6 +379,26 @@ def shutdown() -> None: set_config(None) +def patch_anthropic() -> None: + """Apply Anthropic SDK instrumentation. + + Monkey-patches ``anthropic.Anthropic`` and ``anthropic.AsyncAnthropic`` so + that every ``messages.create`` call is captured as an Adrian ``PairedEvent``. + Called automatically by :func:`init` when ``auto_instrument=True``. + + Call explicitly only when ``auto_instrument=False``:: + + adrian.init(api_key="...", auto_instrument=False) + adrian.patch_anthropic() + """ + from adrian.anthropic_handler import patch_anthropic as _patch + + _patch( + hooks_getter=lambda: _hooks, + config_getter=lambda: get_config() if is_initialized() else None, + ) + + def get_handler() -> AdrianCallbackHandler | None: """Return the SDK's callback handler, or ``None`` if uninitialised. @@ -509,6 +534,15 @@ def _inject_callbacks(config: Any) -> Any: # noqa: ANN401 # ------------------------------------------------------------------ +def _auto_instrument_anthropic() -> None: + """Apply Anthropic SDK monkey-patches if the package is installed.""" + try: + patch_anthropic() + logger.debug("Anthropic auto-instrumentation applied") + except Exception: + logger.exception("Anthropic auto-instrumentation failed") + + def _auto_instrument_langchain() -> None: """Apply all monkey-patches to LangChain / LangGraph.""" try: diff --git a/sdk/python/adrian/anthropic_handler.py b/sdk/python/adrian/anthropic_handler.py new file mode 100644 index 0000000..f068f0f --- /dev/null +++ b/sdk/python/adrian/anthropic_handler.py @@ -0,0 +1,559 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 SecureAgentics +# +# Licensed under the Apache Licence, Version 2.0 (the "Licence"). +# You may not use this file except in compliance with the Licence. +# A copy of the Licence is included at LICENSE in the repository root. +"""Anthropic SDK instrumentation for Adrian. + +Patches ``anthropic.Anthropic`` and ``anthropic.AsyncAnthropic`` so that every +``messages.create`` call is captured as an Adrian ``PairedEvent`` and emitted +through the hook registry. The patch is idempotent; calling +:func:`patch_anthropic` again after a shutdown / re-init only updates the +internal getters, it does not re-wrap the already-patched method. + +Usage without auto-instrumentation:: + + import anthropic + import adrian + + adrian.init(api_key="...", auto_instrument=False) + adrian.patch_anthropic() + + client = anthropic.AsyncAnthropic() + response = await client.messages.create(model="...", ...) + +To group multi-turn calls under a single invocation ID:: + + async with adrian.anthropic_invocation(): + r1 = await client.messages.create(...) + r2 = await client.messages.create(...) # same invocation_id as r1 +""" + +# pyright: reportUnknownVariableType=false +# pyright: reportUnknownMemberType=false +# pyright: reportUnknownArgumentType=false +# pyright: reportUnknownLambdaType=false + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Callable +from contextlib import asynccontextmanager, contextmanager +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Any +from uuid import uuid4 + +from adrian.config import AdrianConfig +from adrian.context import get_invocation_id, set_invocation_id +from adrian.format.types import AgentContext, LlmPairData, PairedEvent +from adrian.hooks import HookRegistry +from adrian.types import ChatMessage, EventData, TokenUsage, ToolCallRecord + +if TYPE_CHECKING: + from contextvars import Token + +logger = logging.getLogger("adrian.anthropic") + +# Set once by patch_anthropic(); read at call time so shutdown + re-init works. +_hooks_getter: Callable[[], HookRegistry | None] | None = None +_config_getter: Callable[[], AdrianConfig | None] | None = None + + +# ------------------------------------------------------------------ +# Message format conversion +# ------------------------------------------------------------------ + + +def _flatten_content(content: Any) -> str: # noqa: ANN401 + """Flatten Anthropic message content to a plain string. + + Anthropic messages carry either a plain string or a list of content + blocks (``TextBlockParam``, ``ToolUseBlockParam``, ``ToolResultBlockParam``, + and so on). Both forms are normalised to a plain string for + ``ChatMessage.content``. + + Args: + content: Anthropic message content -- a string or a block list. + + Returns: + Plain string representation. + """ + if isinstance(content, str): + return content + + if not isinstance(content, list): + return str(content) + + parts: list[str] = [] + + for block in content: + if hasattr(block, "type"): + # SDK typed objects (TextBlock, ToolUseBlock, ToolResultBlock, …) + btype = block.type + + if btype == "text": + parts.append(getattr(block, "text", "")) + elif btype == "tool_use": + name = getattr(block, "name", "unknown") + args = getattr(block, "input", {}) + parts.append(f"[tool_use: {name} args={args}]") + elif btype == "tool_result": + inner = getattr(block, "content", "") + parts.append(_flatten_content(inner)) + elif isinstance(block, dict): + btype = block.get("type", "") + + if btype == "text": + parts.append(str(block.get("text", ""))) + elif btype == "tool_use": + name = block.get("name", "unknown") + args = block.get("input", {}) + parts.append(f"[tool_use: {name} args={args}]") + elif btype == "tool_result": + inner = block.get("content", "") + parts.append(_flatten_content(inner)) + + return "\n".join(p for p in parts if p) + + +def _flatten_anthropic_messages( + messages: list[dict[str, Any]], + system: str | list[Any] | None, +) -> list[ChatMessage]: + """Convert Anthropic message params to a flat ``ChatMessage`` list. + + Prepends the system prompt (if any) as a ``"system"`` role entry, + then converts each user / assistant turn in order. + + Args: + messages: Anthropic ``messages`` parameter -- a list of dicts with + ``role`` and ``content`` keys. + system: Anthropic ``system`` parameter -- a string, a block list, or + ``None``. + + Returns: + Flat list of ``ChatMessage`` dicts compatible with the Adrian format. + """ + result: list[ChatMessage] = [] + + if system is not None: + result.append(ChatMessage(role="system", content=_flatten_content(system))) + + for msg in messages: + role = str(msg.get("role", "unknown")) + content = _flatten_content(msg.get("content", "")) + result.append(ChatMessage(role=role, content=content)) + + return result + + +def _extract_anthropic_tool_calls(content: list[Any]) -> list[ToolCallRecord]: + """Extract tool call records from an Anthropic response content list. + + Scans for ``ToolUseBlock`` SDK objects or ``tool_use`` dicts and converts + each to a ``ToolCallRecord``. + + Args: + content: ``Message.content`` from the Anthropic response. + + Returns: + List of ``ToolCallRecord`` dicts, empty when no tool calls are present. + """ + records: list[ToolCallRecord] = [] + + for block in content: + if hasattr(block, "type") and block.type == "tool_use": + args = getattr(block, "input", {}) + + if not isinstance(args, dict): + try: + args = dict(args) + except (TypeError, ValueError): + args = {} + + records.append( + ToolCallRecord( + id=str(getattr(block, "id", "")), + name=str(getattr(block, "name", "unknown")), + args=args, + ) + ) + elif isinstance(block, dict) and block.get("type") == "tool_use": + args = block.get("input", {}) + + if not isinstance(args, dict): + args = {} + + records.append( + ToolCallRecord( + id=str(block.get("id", "")), + name=str(block.get("name", "unknown")), + args=args, + ) + ) + + return records + + +def _extract_anthropic_usage(response: Any) -> TokenUsage | None: + """Extract token usage from an Anthropic ``Message`` response object. + + Args: + response: ``anthropic.types.Message`` or any object with a ``usage`` + attribute carrying ``input_tokens`` and ``output_tokens``. + + Returns: + ``TokenUsage`` TypedDict, or ``None`` if usage data is absent. + """ + usage = getattr(response, "usage", None) + + if usage is None: + return None + + input_tokens: int = getattr(usage, "input_tokens", 0) or 0 + output_tokens: int = getattr(usage, "output_tokens", 0) or 0 + + return TokenUsage( + prompt_tokens=input_tokens, + completion_tokens=output_tokens, + total_tokens=input_tokens + output_tokens, + ) + + +def _extract_response_text(content: list[Any]) -> str: + """Extract plain text output from an Anthropic response content list. + + Args: + content: ``Message.content`` from the Anthropic response. + + Returns: + Concatenated text from all ``TextBlock`` entries, joined by newlines. + """ + parts: list[str] = [] + + for block in content: + if hasattr(block, "type") and block.type == "text": + parts.append(getattr(block, "text", "")) + elif isinstance(block, dict) and block.get("type") == "text": + parts.append(str(block.get("text", ""))) + + return "\n".join(p for p in parts if p) + + +def _derive_agent_id(messages: list[ChatMessage]) -> str: + """Derive a stable agent identity from the system prompt. + + Without LangGraph checkpoint metadata, the system prompt is the primary + signal for agent identity. Returns ``"default"`` when no system message + is present. + + Args: + messages: Flattened message list, may contain a ``"system"`` entry. + + Returns: + Agent identifier string (at most 64 characters). + """ + for msg in messages: + if msg.get("role") == "system": + content = msg["content"].strip() + + if content: + return content[:64].replace("\n", " ") + + return "default" + + +# ------------------------------------------------------------------ +# PairedEvent assembly +# ------------------------------------------------------------------ + + +def build_anthropic_llm_pair( + *, + flat_messages: list[ChatMessage], + response: Any, + model: str, + session_id: str, + invocation_id: str, + run_id: str, +) -> PairedEvent: + """Assemble a ``PairedEvent`` from an Anthropic ``messages.create`` call. + + Args: + flat_messages: Converted input messages (includes system prompt at index 0 + when present). + response: Raw ``anthropic.types.Message`` response object. + model: Model identifier from the request parameters. + session_id: Adrian session identifier. + invocation_id: Invocation correlation ID. + run_id: Per-call unique identifier generated by the patch. + + Returns: + Assembled ``PairedEvent`` with ``pair_type="llm"``. + """ + system_prompt = "" + user_instruction = "" + + for msg in flat_messages: + if msg.get("role") == "system" and not system_prompt: + system_prompt = msg["content"] + + for msg in reversed(flat_messages): + if msg.get("role") == "user": + user_instruction = msg["content"] + break + + content: list[Any] = getattr(response, "content", []) + output_text = _extract_response_text(content) + tool_calls = _extract_anthropic_tool_calls(content) + usage = _extract_anthropic_usage(response) + + # Prefer the model identifier echoed by the server; fall back to the request param. + response_model: str = getattr(response, "model", "") or model + + return PairedEvent( + event_id=str(uuid4()), + invocation_id=invocation_id, + session_id=session_id, + run_id=run_id, + timestamp=datetime.now(UTC).isoformat(), + pair_type="llm", + agent=AgentContext( + agent_id=_derive_agent_id(flat_messages), + system_prompt=system_prompt, + user_instruction=user_instruction, + ), + parent=None, + data=LlmPairData( + model=response_model, + messages=flat_messages, + output=output_text, + tool_calls=tool_calls, + usage=usage, + ), + ) + + +# ------------------------------------------------------------------ +# Emission helpers +# ------------------------------------------------------------------ + + +async def _emit_pair(response: Any, kwargs: dict[str, Any]) -> None: + """Assemble and emit a ``PairedEvent`` for a completed ``messages.create`` call. + + Reads hooks and config at call time so the correct state is used even if + :func:`~adrian.shutdown` and :func:`~adrian.init` have been called since + the patch was applied. + + Args: + response: Anthropic ``Message`` response object. + kwargs: Original ``messages.create`` keyword arguments. + """ + if _hooks_getter is None or _config_getter is None: + return + + hooks = _hooks_getter() + config = _config_getter() + + if hooks is None or config is None: + return + + try: + session_id = config.session_id + messages_param: list[dict[str, Any]] = list(kwargs.get("messages") or []) + system_param: str | list[Any] | None = kwargs.get("system") + model_param: str = str(kwargs.get("model", "unknown")) + + flat_messages = _flatten_anthropic_messages(messages_param, system_param) + invocation_id = get_invocation_id() or "no_invocation" + run_id = str(uuid4()) + + pair = build_anthropic_llm_pair( + flat_messages=flat_messages, + response=response, + model=model_param, + session_id=session_id, + invocation_id=invocation_id, + run_id=run_id, + ) + + await hooks.emit(pair) + + if config.on_event is not None: + from typing import cast + + result = config.on_event( + pair.pair_type, + cast(EventData, pair.data), + pair.run_id, + None, + pair.event_id, + ) + + if asyncio.iscoroutine(result): + await result + + except Exception: + logger.exception("Failed to emit Anthropic paired event") + + +def _schedule_emit(response: Any, kwargs: dict[str, Any]) -> None: + """Schedule event emission from a synchronous call site. + + When inside a running event loop, schedules a fire-and-forget task so + the sync caller is not blocked. When no loop is running, blocks until + emission completes so the event is not silently dropped. + + Args: + response: Anthropic ``Message`` response object. + kwargs: Original ``messages.create`` keyword arguments. + """ + coro = _emit_pair(response, kwargs) + + try: + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + try: + asyncio.run(coro) + except Exception: + logger.exception("Failed to emit Anthropic event (sync path)") + + +# ------------------------------------------------------------------ +# SDK patching +# ------------------------------------------------------------------ + + +def patch_anthropic( + hooks_getter: Callable[[], HookRegistry | None], + config_getter: Callable[[], AdrianConfig | None], +) -> None: + """Monkey-patch ``anthropic.Anthropic`` and ``anthropic.AsyncAnthropic``. + + Wraps ``messages.create`` on both the sync and async Anthropic resource + classes so every API call is captured as an Adrian ``PairedEvent``. + + The patch is idempotent: subsequent calls update the internal getters but + do not re-wrap the already-patched method. If the ``anthropic`` package is + not installed the call is a silent no-op. + + This function is called automatically by :func:`~adrian.init` when + ``auto_instrument=True`` (the default). + + Args: + hooks_getter: Zero-arg callable returning the current ``HookRegistry``, + or ``None`` when the SDK is not initialised. + config_getter: Zero-arg callable returning the current ``AdrianConfig``, + or ``None`` when the SDK is not initialised. + """ + global _hooks_getter, _config_getter # noqa: PLW0603 + + _hooks_getter = hooks_getter + _config_getter = config_getter + + try: + from anthropic.resources.messages import AsyncMessages, Messages + except ImportError: + logger.debug("anthropic package not installed; skipping Anthropic patching") + return + + # ---- sync Messages.create ---- + try: + sync_cls = Messages + + if not getattr(sync_cls, "_adrian_patched", False): + _original_sync = sync_cls.create + + def _patched_sync_create( + self: Any, + *args: Any, + **kwargs: Any, # noqa: ANN401 + ) -> Any: # noqa: ANN401 + response = _original_sync(self, *args, **kwargs) + _schedule_emit(response, kwargs) + return response + + sync_cls.create = _patched_sync_create # type: ignore[method-assign] + sync_cls._adrian_patched = True # type: ignore[attr-defined] + logger.debug("Patched anthropic.resources.Messages.create") + except AttributeError: + logger.warning( + "Could not patch anthropic.resources.Messages; " + "the SDK structure may have changed" + ) + + # ---- async AsyncMessages.create ---- + try: + async_cls = AsyncMessages + + if not getattr(async_cls, "_adrian_patched", False): + _original_async = async_cls.create + + async def _patched_async_create( + self: Any, + *args: Any, + **kwargs: Any, # noqa: ANN401 + ) -> Any: # noqa: ANN401 + response = await _original_async(self, *args, **kwargs) + await _emit_pair(response, kwargs) + return response + + async_cls.create = _patched_async_create # type: ignore[method-assign] + async_cls._adrian_patched = True # type: ignore[attr-defined] + logger.debug("Patched anthropic.resources.AsyncMessages.create") + except AttributeError: + logger.warning( + "Could not patch anthropic.resources.AsyncMessages; " + "the SDK structure may have changed" + ) + + +# ------------------------------------------------------------------ +# Invocation context managers +# ------------------------------------------------------------------ + + +@asynccontextmanager +async def anthropic_invocation(): # type: ignore[return] + """Group async Anthropic API calls under a single invocation ID. + + Sets the ``invocation_id`` context variable so all ``messages.create`` + calls within the block share the same ID, enabling multi-turn agent + conversations to be correlated in the Adrian dashboard. + + Usage:: + + async with adrian.anthropic_invocation(): + r1 = await client.messages.create(...) + r2 = await client.messages.create(...) # same invocation_id as r1 + """ + token: Token[str | None] = set_invocation_id(str(uuid4())) + + try: + yield + finally: + token.var.reset(token) + + +@contextmanager +def anthropic_invocation_sync(): # type: ignore[return] + """Group synchronous Anthropic API calls under a single invocation ID. + + The sync counterpart to :func:`anthropic_invocation`. + + Usage:: + + with adrian.anthropic_invocation_sync(): + r1 = client.messages.create(...) + r2 = client.messages.create(...) # same invocation_id as r1 + """ + token: Token[str | None] = set_invocation_id(str(uuid4())) + + try: + yield + finally: + token.var.reset(token) diff --git a/sdk/python/pyproject.toml b/sdk/python/pyproject.toml index 77d4caf..1505d25 100644 --- a/sdk/python/pyproject.toml +++ b/sdk/python/pyproject.toml @@ -41,6 +41,9 @@ dependencies = [ ] [project.optional-dependencies] +anthropic = [ + "anthropic>=0.40.0", +] dev = [ "pytest>=8.0.0", "pytest-asyncio>=0.24.0", @@ -51,6 +54,7 @@ dev = [ "langgraph==1.1.2", "langgraph-prebuilt==1.0.8", "langchain-mcp-adapters>=0.2.2", + "anthropic>=0.40.0", ] [project.urls] diff --git a/sdk/python/tests/test_anthropic_handler.py b/sdk/python/tests/test_anthropic_handler.py new file mode 100644 index 0000000..e2dc1ba --- /dev/null +++ b/sdk/python/tests/test_anthropic_handler.py @@ -0,0 +1,738 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 SecureAgentics +# +# Licensed under the Apache Licence, Version 2.0 (the "Licence"). +# You may not use this file except in compliance with the Licence. +# A copy of the Licence is included at LICENSE in the repository root. +"""Tests for the Anthropic SDK instrumentation (adrian.anthropic_handler).""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import MagicMock + +import adrian.anthropic_handler as _ah +import pytest +from adrian.anthropic_handler import ( + _derive_agent_id, + _emit_pair, + _extract_anthropic_tool_calls, + _extract_anthropic_usage, + _extract_response_text, + _flatten_anthropic_messages, + _flatten_content, + anthropic_invocation, + anthropic_invocation_sync, + build_anthropic_llm_pair, + patch_anthropic, +) +from adrian.config import AdrianConfig +from adrian.context import get_invocation_id, set_invocation_id +from adrian.format.types import LlmPairData, PairedEvent +from adrian.hooks import HookRegistry +from adrian.types import ChatMessage + +# ------------------------------------------------------------------ +# Shared helpers +# ------------------------------------------------------------------ + + +class _Collector: + """Minimal EventHandler that accumulates paired events.""" + + def __init__(self) -> None: + self.events: list[PairedEvent] = [] + + async def on_paired_event(self, event: PairedEvent) -> None: + self.events.append(event) + + async def close(self) -> None: + return None + + +def _make_text_response( + *, + model: str = "claude-opus-4-6", + text: str = "Hello!", + input_tokens: int = 10, + output_tokens: int = 5, +) -> MagicMock: + """Build a minimal mock Anthropic Message response.""" + text_block = MagicMock() + text_block.type = "text" + text_block.text = text + + usage = MagicMock() + usage.input_tokens = input_tokens + usage.output_tokens = output_tokens + + response = MagicMock() + response.model = model + response.content = [text_block] + response.usage = usage + + return response + + +def _wired_hooks(config: AdrianConfig) -> tuple[HookRegistry, _Collector]: + """Return a HookRegistry + Collector pair and wire them into the handler.""" + collector = _Collector() + hooks = HookRegistry() + hooks.register(collector) + _ah._hooks_getter = lambda: hooks + _ah._config_getter = lambda: config + return hooks, collector + + +# ------------------------------------------------------------------ +# _flatten_content +# ------------------------------------------------------------------ + + +class TestFlattenContent: + def test_plain_string_passthrough(self) -> None: + assert _flatten_content("hello world") == "hello world" + + def test_non_list_non_str_coerced(self) -> None: + assert _flatten_content(42) == "42" # type: ignore[arg-type] + + def test_text_block_dict(self) -> None: + result = _flatten_content([{"type": "text", "text": "hi"}]) + assert result == "hi" + + def test_tool_use_dict(self) -> None: + blocks = [{"type": "tool_use", "name": "search", "input": {"q": "test"}}] + result = _flatten_content(blocks) + assert "tool_use: search" in result + + def test_tool_result_string_dict(self) -> None: + blocks = [{"type": "tool_result", "content": "42"}] + assert _flatten_content(blocks) == "42" + + def test_sdk_text_object(self) -> None: + block = MagicMock() + block.type = "text" + block.text = "SDK text" + assert _flatten_content([block]) == "SDK text" + + def test_sdk_tool_use_object(self) -> None: + block = MagicMock() + block.type = "tool_use" + block.name = "my_tool" + block.input = {"x": 1} + result = _flatten_content([block]) + assert "tool_use: my_tool" in result + + def test_sdk_tool_result_delegates_recursively(self) -> None: + inner = MagicMock() + inner.type = "text" + inner.text = "inner text" + outer = MagicMock() + outer.type = "tool_result" + outer.content = [inner] + result = _flatten_content([outer]) + assert "inner text" in result + + def test_mixed_blocks_joined_by_newline(self) -> None: + blocks = [ + {"type": "text", "text": "first"}, + {"type": "text", "text": "second"}, + ] + result = _flatten_content(blocks) + assert result == "first\nsecond" + + def test_empty_list(self) -> None: + assert _flatten_content([]) == "" + + +# ------------------------------------------------------------------ +# _flatten_anthropic_messages +# ------------------------------------------------------------------ + + +class TestFlattenAnthropicMessages: + def test_no_system(self) -> None: + msgs = [{"role": "user", "content": "hi"}] + result = _flatten_anthropic_messages(msgs, None) + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == "hi" + + def test_system_prepended_as_first_entry(self) -> None: + msgs = [{"role": "user", "content": "hello"}] + result = _flatten_anthropic_messages(msgs, "You are helpful.") + assert result[0]["role"] == "system" + assert result[0]["content"] == "You are helpful." + assert result[1]["role"] == "user" + + def test_system_as_block_list(self) -> None: + system = [{"type": "text", "text": "block system"}] + result = _flatten_anthropic_messages([], system) + assert result[0]["role"] == "system" + assert result[0]["content"] == "block system" + + def test_assistant_role_preserved(self) -> None: + msgs = [ + {"role": "user", "content": "q"}, + {"role": "assistant", "content": "a"}, + ] + result = _flatten_anthropic_messages(msgs, None) + assert result[-1]["role"] == "assistant" + + def test_multi_turn_order(self) -> None: + msgs = [ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": "second"}, + {"role": "user", "content": "third"}, + ] + result = _flatten_anthropic_messages(msgs, "sys") + assert len(result) == 4 + assert result[0]["role"] == "system" + assert result[1]["content"] == "first" + assert result[3]["content"] == "third" + + def test_empty_messages_with_system(self) -> None: + result = _flatten_anthropic_messages([], "only system") + assert len(result) == 1 + assert result[0]["role"] == "system" + + +# ------------------------------------------------------------------ +# _extract_anthropic_tool_calls +# ------------------------------------------------------------------ + + +class TestExtractAnthropicToolCalls: + def test_empty_content(self) -> None: + assert _extract_anthropic_tool_calls([]) == [] + + def test_text_block_ignored(self) -> None: + block = MagicMock() + block.type = "text" + block.text = "hello" + assert _extract_anthropic_tool_calls([block]) == [] + + def test_sdk_tool_use_object(self) -> None: + block = MagicMock() + block.type = "tool_use" + block.id = "call_abc" + block.name = "get_weather" + block.input = {"city": "London"} + result = _extract_anthropic_tool_calls([block]) + assert len(result) == 1 + assert result[0]["id"] == "call_abc" + assert result[0]["name"] == "get_weather" + assert result[0]["args"] == {"city": "London"} + + def test_dict_tool_use(self) -> None: + block = {"type": "tool_use", "id": "c1", "name": "search", "input": {"q": "x"}} + result = _extract_anthropic_tool_calls([block]) + assert len(result) == 1 + assert result[0]["name"] == "search" + assert result[0]["id"] == "c1" + + def test_multiple_tool_calls(self) -> None: + def _make(name: str, id_: str) -> MagicMock: + b = MagicMock() + b.type = "tool_use" + b.id = id_ + b.name = name + b.input = {} + return b + + result = _extract_anthropic_tool_calls( + [_make("tool_a", "c1"), _make("tool_b", "c2")] + ) + assert len(result) == 2 + assert {r["name"] for r in result} == {"tool_a", "tool_b"} + + def test_non_dict_input_coerced(self) -> None: + block = MagicMock() + block.type = "tool_use" + block.id = "c1" + block.name = "t" + block.input = [("key", "val")] + result = _extract_anthropic_tool_calls([block]) + assert isinstance(result[0]["args"], dict) + + +# ------------------------------------------------------------------ +# _extract_anthropic_usage +# ------------------------------------------------------------------ + + +class TestExtractAnthropicUsage: + def test_none_when_usage_attribute_missing(self) -> None: + assert _extract_anthropic_usage(object()) is None + + def test_none_when_usage_is_none(self) -> None: + response = MagicMock() + response.usage = None + assert _extract_anthropic_usage(response) is None + + def test_extracts_tokens_correctly(self) -> None: + usage = MagicMock() + usage.input_tokens = 150 + usage.output_tokens = 30 + response = MagicMock() + response.usage = usage + result = _extract_anthropic_usage(response) + assert result is not None + assert result["prompt_tokens"] == 150 + assert result["completion_tokens"] == 30 + assert result["total_tokens"] == 180 + + def test_zero_tokens_handled(self) -> None: + usage = MagicMock() + usage.input_tokens = 0 + usage.output_tokens = 0 + response = MagicMock() + response.usage = usage + result = _extract_anthropic_usage(response) + assert result is not None + assert result["total_tokens"] == 0 + + +# ------------------------------------------------------------------ +# _extract_response_text +# ------------------------------------------------------------------ + + +class TestExtractResponseText: + def test_single_text_block(self) -> None: + block = MagicMock() + block.type = "text" + block.text = "The answer." + assert _extract_response_text([block]) == "The answer." + + def test_multiple_text_blocks_joined(self) -> None: + def _tb(text: str) -> MagicMock: + b = MagicMock() + b.type = "text" + b.text = text + return b + + result = _extract_response_text([_tb("line1"), _tb("line2")]) + assert result == "line1\nline2" + + def test_non_text_blocks_skipped(self) -> None: + tool = MagicMock() + tool.type = "tool_use" + text = MagicMock() + text.type = "text" + text.text = "answer" + assert _extract_response_text([tool, text]) == "answer" + + def test_empty_content(self) -> None: + assert _extract_response_text([]) == "" + + def test_dict_text_block(self) -> None: + assert ( + _extract_response_text([{"type": "text", "text": "dict text"}]) + == "dict text" + ) + + +# ------------------------------------------------------------------ +# _derive_agent_id +# ------------------------------------------------------------------ + + +class TestDeriveAgentId: + def test_default_when_no_system(self) -> None: + msgs: list[ChatMessage] = [ChatMessage(role="user", content="hi")] + assert _derive_agent_id(msgs) == "default" + + def test_uses_system_prompt(self) -> None: + msgs: list[ChatMessage] = [ + ChatMessage(role="system", content="You are a code assistant."), + ChatMessage(role="user", content="Help me."), + ] + assert _derive_agent_id(msgs) == "You are a code assistant." + + def test_truncates_at_64_chars(self) -> None: + msgs: list[ChatMessage] = [ChatMessage(role="system", content="x" * 100)] + assert len(_derive_agent_id(msgs)) == 64 + + def test_newlines_replaced_with_spaces(self) -> None: + msgs: list[ChatMessage] = [ChatMessage(role="system", content="line1\nline2")] + result = _derive_agent_id(msgs) + assert "\n" not in result + + def test_empty_system_prompt_falls_back(self) -> None: + msgs: list[ChatMessage] = [ + ChatMessage(role="system", content=" "), + ChatMessage(role="user", content="hi"), + ] + assert _derive_agent_id(msgs) == "default" + + +# ------------------------------------------------------------------ +# build_anthropic_llm_pair +# ------------------------------------------------------------------ + + +class TestBuildAnthropicLlmPair: + def test_pair_type_is_llm(self) -> None: + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert pair.pair_type == "llm" + + def test_ids_propagated(self) -> None: + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(), + model="m", + session_id="sess-abc", + invocation_id="inv-xyz", + run_id="run-1", + ) + assert pair.session_id == "sess-abc" + assert pair.invocation_id == "inv-xyz" + assert pair.run_id == "run-1" + + def test_model_from_response_preferred_over_request(self) -> None: + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(model="claude-haiku-4-5"), + model="request-model", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert isinstance(pair.data, LlmPairData) + assert pair.data.model == "claude-haiku-4-5" + + def test_fallback_to_request_model_when_response_empty(self) -> None: + resp = _make_text_response() + resp.model = "" + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=resp, + model="fallback-model", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert isinstance(pair.data, LlmPairData) + assert pair.data.model == "fallback-model" + + def test_system_prompt_extracted(self) -> None: + flat_msgs: list[ChatMessage] = [ + ChatMessage(role="system", content="You are a triage agent."), + ChatMessage(role="user", content="Help."), + ] + pair = build_anthropic_llm_pair( + flat_messages=flat_msgs, + response=_make_text_response(), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert pair.agent.system_prompt == "You are a triage agent." + + def test_last_user_message_is_user_instruction(self) -> None: + flat_msgs: list[ChatMessage] = [ + ChatMessage(role="system", content="sys"), + ChatMessage(role="user", content="first question"), + ChatMessage(role="assistant", content="answer"), + ChatMessage(role="user", content="follow-up"), + ] + pair = build_anthropic_llm_pair( + flat_messages=flat_msgs, + response=_make_text_response(), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert pair.agent.user_instruction == "follow-up" + + def test_output_text_captured(self) -> None: + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(text="The answer is 42."), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert isinstance(pair.data, LlmPairData) + assert pair.data.output == "The answer is 42." + + def test_token_usage_populated(self) -> None: + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(input_tokens=200, output_tokens=50), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert isinstance(pair.data, LlmPairData) + assert pair.data.usage is not None + assert pair.data.usage["prompt_tokens"] == 200 + assert pair.data.usage["completion_tokens"] == 50 + assert pair.data.usage["total_tokens"] == 250 + + def test_tool_calls_in_data(self) -> None: + tool_block = MagicMock() + tool_block.type = "tool_use" + tool_block.id = "c1" + tool_block.name = "search" + tool_block.input = {"query": "test"} + + usage = MagicMock() + usage.input_tokens = 10 + usage.output_tokens = 5 + + resp = MagicMock() + resp.model = "claude-opus-4-6" + resp.content = [tool_block] + resp.usage = usage + + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="find it")], + response=resp, + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert isinstance(pair.data, LlmPairData) + assert len(pair.data.tool_calls) == 1 + assert pair.data.tool_calls[0]["name"] == "search" + + def test_event_id_is_unique_per_call(self) -> None: + kwargs: Any = dict( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert ( + build_anthropic_llm_pair(**kwargs).event_id + != build_anthropic_llm_pair(**kwargs).event_id + ) + + def test_parent_is_none(self) -> None: + pair = build_anthropic_llm_pair( + flat_messages=[ChatMessage(role="user", content="hi")], + response=_make_text_response(), + model="m", + session_id="s", + invocation_id="i", + run_id="r", + ) + assert pair.parent is None + + +# ------------------------------------------------------------------ +# _emit_pair +# ------------------------------------------------------------------ + + +class TestEmitPair: + async def test_emits_event_to_hooks(self) -> None: + config = AdrianConfig(session_id="sess-emit") + _, collector = _wired_hooks(config) + + await _emit_pair( + _make_text_response(text="Reply"), + { + "model": "claude-opus-4-6", + "messages": [{"role": "user", "content": "Question"}], + "system": "You are helpful.", + }, + ) + + assert len(collector.events) == 1 + event = collector.events[0] + assert event.pair_type == "llm" + assert event.session_id == "sess-emit" + assert isinstance(event.data, LlmPairData) + assert event.data.output == "Reply" + assert event.agent.system_prompt == "You are helpful." + + async def test_skips_silently_when_hooks_none(self) -> None: + _ah._hooks_getter = lambda: None + _ah._config_getter = lambda: None + await _emit_pair(_make_text_response(), {"model": "m", "messages": []}) + + async def test_skips_silently_when_getters_not_set(self) -> None: + _ah._hooks_getter = None + _ah._config_getter = None + await _emit_pair(_make_text_response(), {"model": "m", "messages": []}) + + async def test_fires_on_event_callback(self) -> None: + fired: list[str] = [] + + def on_event( + event_type: str, + data: Any, + run_id: str, + parent_run_id: str | None, + event_id: str | None, + ) -> None: + fired.append(event_type) + + config = AdrianConfig(session_id="s", on_event=on_event) + _wired_hooks(config) + + await _emit_pair( + _make_text_response(), + {"model": "m", "messages": [{"role": "user", "content": "q"}]}, + ) + + assert fired == ["llm"] + + async def test_uses_invocation_id_from_context(self) -> None: + config = AdrianConfig(session_id="s") + _, collector = _wired_hooks(config) + + token = set_invocation_id("fixed-inv-id") + + try: + await _emit_pair( + _make_text_response(), + {"model": "m", "messages": [{"role": "user", "content": "q"}]}, + ) + finally: + token.var.reset(token) + + assert collector.events[0].invocation_id == "fixed-inv-id" + + async def test_defaults_to_no_invocation_outside_context(self) -> None: + config = AdrianConfig(session_id="s") + _, collector = _wired_hooks(config) + + await _emit_pair( + _make_text_response(), + {"model": "m", "messages": [{"role": "user", "content": "q"}]}, + ) + + assert collector.events[0].invocation_id == "no_invocation" + + async def test_token_usage_in_emitted_event(self) -> None: + config = AdrianConfig(session_id="s") + _, collector = _wired_hooks(config) + + await _emit_pair( + _make_text_response(input_tokens=100, output_tokens=40), + {"model": "m", "messages": [{"role": "user", "content": "q"}]}, + ) + + data = collector.events[0].data + assert isinstance(data, LlmPairData) + assert data.usage is not None + assert data.usage["total_tokens"] == 140 + + +# ------------------------------------------------------------------ +# patch_anthropic +# ------------------------------------------------------------------ + + +class TestPatchAnthropicGetters: + def test_getters_updated_on_each_call(self) -> None: + hooks_a: list[HookRegistry] = [HookRegistry()] + config_a: list[AdrianConfig] = [AdrianConfig()] + + patch_anthropic( + hooks_getter=lambda: hooks_a[0], config_getter=lambda: config_a[0] + ) + + assert _ah._hooks_getter is not None + assert _ah._config_getter is not None + assert _ah._hooks_getter() is hooks_a[0] + assert _ah._config_getter() is config_a[0] + + hooks_b = HookRegistry() + config_b = AdrianConfig() + + patch_anthropic(hooks_getter=lambda: hooks_b, config_getter=lambda: config_b) + + assert _ah._hooks_getter() is hooks_b + assert _ah._config_getter() is config_b + + def test_no_op_when_anthropic_not_installed( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + import sys + + saved = sys.modules.pop("anthropic", None) + monkeypatch.setitem(sys.modules, "anthropic", None) # type: ignore[arg-type] + + try: + patch_anthropic(hooks_getter=lambda: None, config_getter=lambda: None) + finally: + if saved is not None: + sys.modules["anthropic"] = saved + else: + sys.modules.pop("anthropic", None) + + +# ------------------------------------------------------------------ +# anthropic_invocation / anthropic_invocation_sync +# ------------------------------------------------------------------ + + +class TestAnthropicInvocationContext: + async def test_async_sets_invocation_id(self) -> None: + assert get_invocation_id() is None + + async with anthropic_invocation(): + inv_id = get_invocation_id() + assert inv_id is not None + assert len(inv_id) > 0 + + assert get_invocation_id() is None + + async def test_async_id_is_uuid_format(self) -> None: + import re + + uuid_re = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" + ) + + async with anthropic_invocation(): + assert uuid_re.match(get_invocation_id() or "") is not None + + async def test_async_resets_on_exit(self) -> None: + outer_token = set_invocation_id("outer") + + async with anthropic_invocation(): + inner_id = get_invocation_id() + assert inner_id != "outer" + + assert get_invocation_id() == "outer" + outer_token.var.reset(outer_token) + + def test_sync_sets_invocation_id(self) -> None: + assert get_invocation_id() is None + + with anthropic_invocation_sync(): + inv_id = get_invocation_id() + assert inv_id is not None + + assert get_invocation_id() is None + + async def test_two_consecutive_invocations_have_different_ids(self) -> None: + ids: list[str] = [] + + async with anthropic_invocation(): + ids.append(get_invocation_id() or "") + + async with anthropic_invocation(): + ids.append(get_invocation_id() or "") + + assert ids[0] != ids[1]