Coverage for projects/04-llm-adapter-shadow/src/llm_adapter/shadow.py: 93%

55 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-24 01:32 +0000

1"""Shadow execution helpers.""" 

2 

3from __future__ import annotations 

4 

5import threading 

6import time 

7from pathlib import Path 

8from typing import Any 

9 

10from .metrics import log_event 

11from .provider_spi import ProviderRequest, ProviderResponse, ProviderSPI 

12from .utils import content_hash 

13 

# Accepted metrics destinations: a filesystem path (str or Path), or None to
# disable metrics logging entirely.
MetricsPath = str | Path | None

# Default JSONL file that shadow-comparison records are appended to.
DEFAULT_METRICS_PATH = "artifacts/runs-metrics.jsonl"

17 

18def _to_path_str(path: MetricsPath) -> str | None: 

19 if path is None: 

20 return None 

21 return str(Path(path)) 

22 

23 

def run_with_shadow(
    primary: ProviderSPI,
    shadow: ProviderSPI | None,
    req: ProviderRequest,
    metrics_path: MetricsPath = DEFAULT_METRICS_PATH,
) -> ProviderResponse:
    """Invoke ``primary`` while optionally mirroring the call on ``shadow``.

    The shadow execution runs on a background thread and *never* affects the
    primary result. Metrics about both executions are appended to a JSONL file
    so they can be analysed offline.

    Parameters
    ----------
    primary:
        Provider whose response is returned to the caller; its exceptions
        propagate unchanged.
    shadow:
        Optional provider invoked concurrently for comparison only. ``None``
        skips shadowing entirely.
    req:
        Request forwarded verbatim to both providers.
    metrics_path:
        JSONL destination for the ``shadow_diff`` record; ``None`` (or an
        empty string) disables metrics emission.

    Returns
    -------
    ProviderResponse
        The primary provider's response, always — shadow outcomes are
        reported only via metrics.
    """

    shadow_thread: threading.Thread | None = None
    # Mutated by the worker thread; read by this thread only after join().
    shadow_payload: dict[str, Any] = {}
    shadow_name: str | None = None
    metrics_path_str = _to_path_str(metrics_path)

    if shadow is not None:
        # Resolve the name up front so it is available even if invoke() raises.
        shadow_name = shadow.name()

        def _shadow_worker() -> None:
            # Wall-clock duration of the whole shadow attempt, success or not.
            ts0 = time.time()
            try:
                response = shadow.invoke(req)
            except Exception as exc:  # pragma: no cover - error branch tested via metrics
                # Record the failure shape instead of letting it escape the
                # daemon thread — shadow errors must never surface to callers.
                shadow_payload.update(
                    {
                        "ok": False,
                        "error": type(exc).__name__,
                        "message": str(exc),
                        "provider": shadow_name,
                    }
                )
            else:
                shadow_payload.update(
                    {
                        "ok": True,
                        "provider": shadow_name,
                        "latency_ms": response.latency_ms,
                        "text_len": len(response.text),
                        "token_usage_total": response.token_usage.total,
                    }
                )
            finally:
                shadow_payload["duration_ms"] = int((time.time() - ts0) * 1000)

        # Daemon thread: an interpreter shutdown will not wait on a hung shadow.
        shadow_thread = threading.Thread(target=_shadow_worker, daemon=True)
        shadow_thread.start()

    try:
        primary_res = primary.invoke(req)
    except Exception:
        # Primary failed: give the shadow thread a zero-wait join (a no-op if
        # it is still running) and re-raise — no metrics are written here.
        if shadow_thread is not None:
            shadow_thread.join(timeout=0)
        raise

    if shadow_thread is not None:
        # Cap the extra wait added on top of the primary call at 10 seconds.
        shadow_thread.join(timeout=10)
        if shadow_thread.is_alive():
            # NOTE(review): the worker may still be running and can overwrite
            # these keys after we write them — payload reads below may mix
            # timeout and late-completion values. Confirm this best-effort
            # race is acceptable for offline analysis.
            shadow_payload.setdefault("provider", shadow_name)
            shadow_payload.update({"ok": False, "error": "ShadowTimeout"})

    if metrics_path_str:
        primary_text_len = len(primary_res.text)
        # Provider-independent fingerprint ("runner" namespace) lets records
        # for the same request be correlated across different primaries.
        request_fingerprint = content_hash(
            "runner", req.prompt, req.options, req.max_tokens
        )
        record: dict[str, Any] = {
            "request_hash": content_hash(
                primary.name(), req.prompt, req.options, req.max_tokens
            ),
            "request_fingerprint": request_fingerprint,
            "primary_provider": primary.name(),
            "primary_latency_ms": primary_res.latency_ms,
            "primary_text_len": primary_text_len,
            "primary_token_usage_total": primary_res.token_usage.total,
            # Shadow fields default to None when no shadow ran or it failed
            # before populating the payload.
            "shadow_provider": shadow_payload.get("provider", shadow_name),
            "shadow_ok": shadow_payload.get("ok"),
            "shadow_latency_ms": shadow_payload.get("latency_ms"),
            "shadow_duration_ms": shadow_payload.get("duration_ms"),
            "shadow_error": shadow_payload.get("error"),
        }

        # Success-only extras: present iff the shadow populated them.
        if shadow_payload.get("latency_ms") is not None:
            record["latency_gap_ms"] = shadow_payload["latency_ms"] - primary_res.latency_ms

        if shadow_payload.get("text_len") is not None:
            record["shadow_text_len"] = shadow_payload["text_len"]

        if shadow_payload.get("token_usage_total") is not None:
            record["shadow_token_usage_total"] = shadow_payload["token_usage_total"]

        if shadow_payload.get("message"):
            record["shadow_error_message"] = shadow_payload["message"]

        log_event("shadow_diff", metrics_path_str, **record)

    return primary_res

123 

124 

125__all__ = ["run_with_shadow", "DEFAULT_METRICS_PATH"]