Coverage for projects/04-llm-adapter-shadow/src/llm_adapter/shadow.py: 93%
55 statements
coverage.py v7.10.7, created at 2025-09-24 01:32 +0000
1"""Shadow execution helpers."""
3from __future__ import annotations
5import threading
6import time
7from pathlib import Path
8from typing import Any
10from .metrics import log_event
11from .provider_spi import ProviderRequest, ProviderResponse, ProviderSPI
12from .utils import content_hash
14MetricsPath = str | Path | None
15DEFAULT_METRICS_PATH = "artifacts/runs-metrics.jsonl"


def _to_path_str(path: MetricsPath) -> str | None:
    if path is None:
        return None
    return str(Path(path))


def run_with_shadow(
    primary: ProviderSPI,
    shadow: ProviderSPI | None,
    req: ProviderRequest,
    metrics_path: MetricsPath = DEFAULT_METRICS_PATH,
) -> ProviderResponse:
    """Invoke ``primary`` while optionally mirroring the call on ``shadow``.

    The shadow execution runs on a background thread and *never* affects the
    primary result. Metrics about both executions are appended to a JSONL file
    so they can be analysed offline.
    """

    shadow_thread: threading.Thread | None = None
    shadow_payload: dict[str, Any] = {}
    shadow_name: str | None = None
    metrics_path_str = _to_path_str(metrics_path)

    if shadow is not None:
        shadow_name = shadow.name()
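
        # The worker below fills shadow_payload in place; the main thread only
        # reads it back after join(), so the dict serves as a one-way hand-off.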
        def _shadow_worker() -> None:
            ts0 = time.time()
            try:
                response = shadow.invoke(req)
            except Exception as exc:  # pragma: no cover - error branch tested via metrics
                shadow_payload.update(
                    {
                        "ok": False,
                        "error": type(exc).__name__,
                        "message": str(exc),
                        "provider": shadow_name,
                    }
                )
            else:
                shadow_payload.update(
                    {
                        "ok": True,
                        "provider": shadow_name,
                        "latency_ms": response.latency_ms,
                        "text_len": len(response.text),
                        "token_usage_total": response.token_usage.total,
                    }
                )
            finally:
                shadow_payload["duration_ms"] = int((time.time() - ts0) * 1000)
        shadow_thread = threading.Thread(target=_shadow_worker, daemon=True)
        shadow_thread.start()

    try:
        primary_res = primary.invoke(req)
    except Exception:
        if shadow_thread is not None:
            shadow_thread.join(timeout=0)
        raise
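
    # Give the shadow up to ten seconds to finish; if it is still running,
    # record a timeout marker rather than blocking the caller any longer.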
    if shadow_thread is not None:
        shadow_thread.join(timeout=10)
        if shadow_thread.is_alive():
            shadow_payload.setdefault("provider", shadow_name)
            shadow_payload.update({"ok": False, "error": "ShadowTimeout"})
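
    # Emit one JSONL record per call so primary and shadow behaviour can be
    # diffed offline.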
    if metrics_path_str:
        primary_text_len = len(primary_res.text)
        request_fingerprint = content_hash(
            "runner", req.prompt, req.options, req.max_tokens
        )
        record: dict[str, Any] = {
            "request_hash": content_hash(
                primary.name(), req.prompt, req.options, req.max_tokens
            ),
            "request_fingerprint": request_fingerprint,
            "primary_provider": primary.name(),
            "primary_latency_ms": primary_res.latency_ms,
            "primary_text_len": primary_text_len,
            "primary_token_usage_total": primary_res.token_usage.total,
            "shadow_provider": shadow_payload.get("provider", shadow_name),
            "shadow_ok": shadow_payload.get("ok"),
            "shadow_latency_ms": shadow_payload.get("latency_ms"),
            "shadow_duration_ms": shadow_payload.get("duration_ms"),
            "shadow_error": shadow_payload.get("error"),
        }

        if shadow_payload.get("latency_ms") is not None:
            record["latency_gap_ms"] = shadow_payload["latency_ms"] - primary_res.latency_ms

        if shadow_payload.get("text_len") is not None:
            record["shadow_text_len"] = shadow_payload["text_len"]

        if shadow_payload.get("token_usage_total") is not None:
            record["shadow_token_usage_total"] = shadow_payload["token_usage_total"]

        if shadow_payload.get("message"):
            record["shadow_error_message"] = shadow_payload["message"]

        log_event("shadow_diff", metrics_path_str, **record)

    return primary_res


__all__ = ["run_with_shadow", "DEFAULT_METRICS_PATH"]
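

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module). It drives
# run_with_shadow with duck-typed stand-ins: _EchoProvider and the
# SimpleNamespace request/response objects below are hypothetical and only
# mimic the attributes this module actually reads (name(), invoke(), prompt,
# options, max_tokens, text, latency_ms, token_usage.total); the real types
# live in provider_spi.
if __name__ == "__main__":
    from types import SimpleNamespace

    class _EchoProvider:
        """Minimal stand-in for a ProviderSPI implementation."""

        def __init__(self, label: str) -> None:
            self._label = label

        def name(self) -> str:
            return self._label

        def invoke(self, req: Any) -> Any:
            # Echo the prompt back with a fixed latency and token count.
            return SimpleNamespace(
                text=f"{self._label}: {req.prompt}",
                latency_ms=5,
                token_usage=SimpleNamespace(total=3),
            )

    demo_req = SimpleNamespace(prompt="ping", options={}, max_tokens=16)
    result = run_with_shadow(
        _EchoProvider("primary"), _EchoProvider("shadow"), demo_req
    )
    print(result.text)  # shadow metrics are appended to DEFAULT_METRICS_PATH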