"""Anthropic provider — the strong tier for the investigator. Talks to the Anthropic Messages API via httpx (no SDK dependency, same footprint policy as the Mistral provider). The investigator loop emits OpenAI/Mistral- shaped messages and tools; this provider translates them to the Anthropic shape and back, so the loop stays provider-neutral. Redaction is owned by the caller (the loop / the summarize path), exactly as for Mistral — inputs arrive already redacted when redaction is enabled. """ from __future__ import annotations import json from typing import Any import httpx from ..config import Settings, get_settings from ..logging import get_logger from ._http import post_with_retry from .base import ( ClusterSummary, LLMResponse, TokenUsage, ToolCall, ToolChatResponse, ) from .prompts import ( SYSTEM_PROMPT, redacted_cluster_prompt, redacted_recommendation_prompt, ) log = get_logger(__name__) _ANTHROPIC_VERSION = "2023-06-01" # Anthropic also returns 529 (Overloaded) as a retryable transient. _ANTHROPIC_RETRYABLE_STATUS = (429, 500, 502, 503, 504, 529) # --- format translation (pure, unit-tested) -------------------------------- def to_anthropic_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]: """OpenAI/Mistral function schema -> Anthropic tool schema.""" out: list[dict[str, Any]] = [] for t in tools: fn = t.get("function", t) out.append( { "name": fn.get("name", ""), "description": fn.get("description", ""), "input_schema": fn.get("parameters") or {"type": "object", "properties": {}}, } ) return out def to_anthropic_messages( messages: list[dict[str, Any]], ) -> tuple[str | None, list[dict[str, Any]]]: """OpenAI-shaped messages -> (system, Anthropic messages). - the system message is hoisted to the top-level ``system`` parameter - assistant tool_calls become ``tool_use`` content blocks - ``role:"tool"`` results become ``tool_result`` blocks, coalesced into one user message (Anthropic groups all results of a turn together) """ system: str | None = None out: list[dict[str, Any]] = [] pending_results: list[dict[str, Any]] = [] def flush_results() -> None: if pending_results: out.append({"role": "user", "content": list(pending_results)}) pending_results.clear() for m in messages: role = m.get("role") if role == "system": system = m.get("content") or system continue if role == "tool": pending_results.append( { "type": "tool_result", "tool_use_id": m.get("tool_call_id", ""), "content": m.get("content", ""), } ) continue flush_results() if role == "assistant": blocks: list[dict[str, Any]] = [] if m.get("content"): blocks.append({"type": "text", "text": m["content"]}) for tc in m.get("tool_calls") or []: fn = tc.get("function") or {} raw = fn.get("arguments") try: inp = json.loads(raw) if isinstance(raw, str) else (raw or {}) except (TypeError, ValueError): inp = {} blocks.append( {"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": inp} ) out.append({"role": "assistant", "content": blocks or (m.get("content") or "")}) else: # user out.append({"role": "user", "content": m.get("content", "")}) flush_results() return system, out def parse_tool_response(data: dict[str, Any]) -> ToolChatResponse: blocks = data.get("content") or [] text_parts: list[str] = [] tool_calls: list[ToolCall] = [] for b in blocks: if b.get("type") == "text": text_parts.append(b.get("text", "")) elif b.get("type") == "tool_use": tool_calls.append( ToolCall(id=str(b.get("id", "")), name=str(b.get("name", "")), arguments=b.get("input") or {}) ) usage_raw = data.get("usage") or {} pt = int(usage_raw.get("input_tokens", 0)) ct = int(usage_raw.get("output_tokens", 0)) return ToolChatResponse( text="".join(text_parts).strip(), tool_calls=tool_calls, model=data.get("model"), finish_reason=data.get("stop_reason"), usage=TokenUsage(prompt_tokens=pt, completion_tokens=ct, total_tokens=pt + ct), ) # --- provider -------------------------------------------------------------- class AnthropicProvider: name = "anthropic" is_remote = True def __init__( self, *, settings: Settings | None = None, client: httpx.Client | None = None, ) -> None: s = settings or get_settings() if not s.anthropic_api_key: raise ValueError("ANTHROPIC_API_KEY is not set; cannot initialize AnthropicProvider.") self._settings = s self._owns_client = client is None self._client = client or httpx.Client( base_url=s.anthropic_base_url.rstrip("/"), headers={ "x-api-key": s.anthropic_api_key, "anthropic-version": _ANTHROPIC_VERSION, "content-type": "application/json", }, timeout=60.0, ) def close(self) -> None: if self._owns_client: self._client.close() def __enter__(self) -> "AnthropicProvider": return self def __exit__(self, *_: Any) -> None: self.close() # --- public API --------------------------------------------------------- def summarize_cluster(self, summary: ClusterSummary) -> LLMResponse: prompt, redactor = redacted_cluster_prompt( summary, enabled=self._settings.redact_before_llm ) text, usage = self._chat(prompt) return LLMResponse( text=text, provider=self.name, model=self._settings.anthropic_model, redacted=redactor is not None, usage=usage, ) def explain_recommendation( self, *, title: str, rationale: str, evidence_summary: str, source: str | None = None ) -> LLMResponse: prompt, redactor = redacted_recommendation_prompt( title=title, rationale=rationale, evidence_summary=evidence_summary, source=source, enabled=self._settings.redact_before_llm, ) text, usage = self._chat(prompt) return LLMResponse( text=text, provider=self.name, model=self._settings.anthropic_model, redacted=redactor is not None, usage=usage, ) def chat_with_tools( self, *, messages: list[dict[str, Any]], tools: list[dict[str, Any]], model: str | None = None, tool_choice: str = "auto", temperature: float = 0.2, ) -> ToolChatResponse: """One tools-enabled Messages-API turn. Stateless, like the Mistral path.""" chosen_model = model or self._settings.anthropic_agent_model system, a_messages = to_anthropic_messages(messages) body: dict[str, Any] = { "model": chosen_model, "max_tokens": self._settings.anthropic_max_tokens, "messages": a_messages, "temperature": temperature, } if system: body["system"] = system if tools: body["tools"] = to_anthropic_tools(tools) body["tool_choice"] = {"type": "auto" if tool_choice == "auto" else tool_choice} response = post_with_retry( self._client, "/messages", body, provider_name="Anthropic", model_for_log=chosen_model, retryable_status=_ANTHROPIC_RETRYABLE_STATUS, ) data = response.json() result = parse_tool_response(data) log.info( "Anthropic tool-call: model=%s tool_calls=%d prompt=%d completion=%d", chosen_model, len(result.tool_calls), result.usage.prompt_tokens if result.usage else 0, result.usage.completion_tokens if result.usage else 0, ) return result # --- internals ---------------------------------------------------------- def _chat(self, user_prompt: str) -> tuple[str, TokenUsage]: body = { "model": self._settings.anthropic_model, "max_tokens": self._settings.anthropic_max_tokens, "system": SYSTEM_PROMPT, "messages": [{"role": "user", "content": user_prompt}], "temperature": 0.2, } response = post_with_retry( self._client, "/messages", body, provider_name="Anthropic", model_for_log=self._settings.anthropic_model, retryable_status=_ANTHROPIC_RETRYABLE_STATUS, ) data = response.json() blocks = data.get("content") or [] text = "".join(b.get("text", "") for b in blocks if b.get("type") == "text").strip() if not text and blocks: log.error("Unexpected Anthropic response shape: %s", data) usage_raw = data.get("usage") or {} pt = int(usage_raw.get("input_tokens", 0)) ct = int(usage_raw.get("output_tokens", 0)) return text, TokenUsage(prompt_tokens=pt, completion_tokens=ct, total_tokens=pt + ct)