Coverage for agentos/evolution/signals.py: 39%

191 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Behavior Signal Detection — User behavior signal collection and analysis. 

3 

4Signals collected: 

5 - Tool usage frequency and patterns 

6 - Explicit feedback (thumbs up/down, ratings) 

7 - Implicit feedback (corrections, re-prompts, "no/stop/undo") 

8 - Conversation context (topic shifts, depth, sentiment) 

9 - Timing signals (response latency, session duration) 

10 - Preference signals (format preferences, language, tone) 

11 

12These signals feed into the Learner to generate evolution proposals. 

13""" 

14 

15from __future__ import annotations 

16 

17import json 

18import time 

19import uuid 

20from collections import defaultdict 

21from dataclasses import dataclass, field 

22from enum import Enum 

23from pathlib import Path 

24from typing import Any, Callable, Optional 

25 

26 

27# ── Signal Types ── 

28 

class SignalType(str, Enum):
    """Kinds of user-behavior signal that can be recorded.

    Members also subclass ``str``, so each one compares equal to its raw
    string value (the form written by ``BehaviorSignal.to_dict``).
    """

    # Tool was invoked
    TOOL_USAGE = "tool_usage"
    # User gave explicit rating
    EXPLICIT_FEEDBACK = "explicit_feedback"
    # User corrected agent
    CORRECTION = "correction"
    # User re-asked the same thing
    RE_PROMPT = "re_prompt"
    # User undid an action
    UNDO = "undo"
    # Session duration
    SESSION_LENGTH = "session_length"
    # User changed topic abruptly
    TOPIC_SWITCH = "topic_switch"
    # Output format preference
    FORMAT_PREFERENCE = "format_preference"
    # How fast agent responded
    RESPONSE_LATENCY = "response_latency"
    # Error occurred and agent recovered
    ERROR_RECOVERY = "error_recovery"
    # Recognized repeated pattern
    PATTERN_MATCH = "pattern_match"

41 

42 

class FeedbackPolarity(str, Enum):
    """Direction of the sentiment attached to a signal.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    # Signal reflects approval
    POSITIVE = "positive"
    # Signal reflects disapproval
    NEGATIVE = "negative"
    # No sentiment attached (the default for a signal)
    NEUTRAL = "neutral"

47 

48 

49# ── Signal Data ── 

50 

@dataclass
class BehaviorSignal:
    """One observed user-behavior event.

    Every field has a default, so a signal is built by naming only the
    fields that matter for its ``type_``.
    """

    # --- Identity / provenance ---
    # Short identifier: first 8 hex characters of a random UUID4.
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    type_: SignalType = SignalType.TOOL_USAGE
    # Creation time as returned by time.time().
    timestamp: float = field(default_factory=time.time)
    user_id: str = "default"
    session_id: str = ""

    # --- Tool payload ---
    tool_name: str = ""
    tool_args: dict = field(default_factory=dict)
    tool_success: bool = True
    tool_duration_ms: float = 0.0

    # --- Feedback payload ---
    # e.g. "thumbs_up", "thumbs_down", "rating:4"
    feedback_type: str = ""
    feedback_text: str = ""
    polarity: FeedbackPolarity = FeedbackPolarity.NEUTRAL

    # --- Surrounding context ---
    # What happened before this signal
    context_before: str = ""
    # Result after this signal
    context_after: str = ""

    # Free-form extra data.
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the signal as a plain dict.

        Enum fields are replaced by their string values and both context
        strings are cut to their first 500 characters; every other field
        is passed through unchanged.
        """
        context_limit = 500

        record: dict = {
            "id": self.id,
            "type": self.type_.value,
            "timestamp": self.timestamp,
            "user_id": self.user_id,
            "session_id": self.session_id,
        }
        record.update(
            tool_name=self.tool_name,
            tool_args=self.tool_args,
            tool_success=self.tool_success,
            tool_duration_ms=self.tool_duration_ms,
        )
        record.update(
            feedback_type=self.feedback_type,
            feedback_text=self.feedback_text,
            polarity=self.polarity.value,
        )
        record.update(
            context_before=self.context_before[:context_limit],
            context_after=self.context_after[:context_limit],
            metadata=self.metadata,
        )
        return record

94 

95 

@dataclass
class SignalSummary:
    """Aggregate counts describing the signals inside one time window."""

    # Window bounds as epoch seconds; the end defaults to creation time.
    window_start: float = 0.0
    window_end: float = field(default_factory=time.time)
    total_signals: int = 0

    # Tool usage: (tool_name, count) pairs and the overall success ratio.
    top_tools: list[tuple[str, int]] = field(default_factory=list)
    tool_success_rate: float = 0.0

    # Feedback counters.
    positive_feedback: int = 0
    negative_feedback: int = 0
    correction_count: int = 0
    undo_count: int = 0
    re_prompt_count: int = 0

    # Pattern labels detected in the window.
    detected_patterns: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return every field as a plain dict, keyed by field name.

        Values are passed through as-is (no copies are made).
        """
        ordered_keys = (
            "window_start",
            "window_end",
            "total_signals",
            "top_tools",
            "tool_success_rate",
            "positive_feedback",
            "negative_feedback",
            "correction_count",
            "undo_count",
            "re_prompt_count",
            "detected_patterns",
        )
        return {key: getattr(self, key) for key in ordered_keys}

132 

133 

134# ── Signal Collector ── 

135 

class SignalCollector:
    """Collects and persists user behavior signals.

    Features:
        - In-memory ring buffer (last N signals)
        - Optional disk persistence (the most recent signals are rewritten
          to a JSON file after every recorded signal)
        - Signal hooks for real-time processing
        - Aggregation windows for analysis

    Usage:
        collector = SignalCollector(buffer_size=1000)

        # Record tool usage
        collector.record_tool_usage("web_search", {"query": "..."}, success=True)

        # Record explicit feedback
        collector.record_feedback("thumbs_up", "Great answer!")

        # Record correction
        collector.record_correction("No, use Python not JS")

        # Get summary
        summary = collector.summarize(hours=24)
    """

    # Number of most-recent signals written to, and read back from, disk.
    _PERSIST_LIMIT = 500
    # Optional prefix on numeric ratings, e.g. "rating:4".
    _RATING_PREFIX = "rating:"
    _POSITIVE_FEEDBACK = ("thumbs_up", "positive", "5", "4")
    _NEGATIVE_FEEDBACK = ("thumbs_down", "negative", "1", "2")

    def __init__(
        self,
        buffer_size: int = 2000,
        persist_path: Optional[str] = None,
    ):
        """Create a collector.

        Args:
            buffer_size: Maximum number of signals kept in memory; older
                signals are dropped first.
            persist_path: JSON file used for persistence. When given, its
                parent directory is created and any existing file is loaded.
        """
        self._buffer: list[BehaviorSignal] = []
        self._buffer_size = buffer_size
        self._persist_path = Path(persist_path) if persist_path else None
        self._hooks: list[Callable[[BehaviorSignal], None]] = []
        self._tool_counter: dict[str, int] = defaultdict(int)
        self._session_id: str = ""

        if self._persist_path:
            self._persist_path.parent.mkdir(parents=True, exist_ok=True)
            self._load_from_disk()

    def set_session(self, session_id: str) -> None:
        """Set the session id stamped on every signal recorded from now on."""
        self._session_id = session_id

    # ── Recording API ──

    def record_tool_usage(
        self,
        tool_name: str,
        tool_args: Optional[dict] = None,
        success: bool = True,
        duration_ms: float = 0.0,
    ) -> BehaviorSignal:
        """Record a tool invocation and bump that tool's usage counter."""
        signal = BehaviorSignal(
            type_=SignalType.TOOL_USAGE,
            tool_name=tool_name,
            tool_args=tool_args or {},
            tool_success=success,
            tool_duration_ms=duration_ms,
            session_id=self._session_id,
        )
        self._tool_counter[tool_name] += 1
        self._append(signal)
        return signal

    def record_feedback(
        self,
        feedback_type: str,
        feedback_text: str = "",
    ) -> BehaviorSignal:
        """Record explicit user feedback.

        ``feedback_type`` is stored unchanged. Its polarity is POSITIVE for
        "thumbs_up", "positive", "5" or "4", NEGATIVE for "thumbs_down",
        "negative", "1" or "2", and NEUTRAL otherwise. Numeric ratings may
        also be written with a "rating:" prefix (e.g. "rating:4").
        """
        rating_key = feedback_type
        if isinstance(rating_key, str) and rating_key.startswith(self._RATING_PREFIX):
            # "rating:4" is the documented spelling of a numeric rating.
            rating_key = rating_key[len(self._RATING_PREFIX):].strip()

        polarity = FeedbackPolarity.NEUTRAL
        if rating_key in self._POSITIVE_FEEDBACK:
            polarity = FeedbackPolarity.POSITIVE
        elif rating_key in self._NEGATIVE_FEEDBACK:
            polarity = FeedbackPolarity.NEGATIVE

        signal = BehaviorSignal(
            type_=SignalType.EXPLICIT_FEEDBACK,
            feedback_type=feedback_type,
            feedback_text=feedback_text,
            polarity=polarity,
            session_id=self._session_id,
        )
        self._append(signal)
        return signal

    def record_correction(self, correction_text: str, context: str = "") -> BehaviorSignal:
        """Record a user correction (always NEGATIVE polarity)."""
        signal = BehaviorSignal(
            type_=SignalType.CORRECTION,
            feedback_text=correction_text,
            context_before=context,
            polarity=FeedbackPolarity.NEGATIVE,
            session_id=self._session_id,
        )
        self._append(signal)
        return signal

    def record_undo(self, action: str = "") -> BehaviorSignal:
        """Record an undo action; ``action`` is stored in ``tool_name``."""
        signal = BehaviorSignal(
            type_=SignalType.UNDO,
            tool_name=action,
            polarity=FeedbackPolarity.NEGATIVE,
            session_id=self._session_id,
        )
        self._append(signal)
        return signal

    def record_re_prompt(self, original_query: str = "") -> BehaviorSignal:
        """Record a re-prompt (user asked again differently)."""
        signal = BehaviorSignal(
            type_=SignalType.RE_PROMPT,
            context_before=original_query,
            polarity=FeedbackPolarity.NEGATIVE,
            session_id=self._session_id,
        )
        self._append(signal)
        return signal

    def record_format_preference(self, format_type: str) -> BehaviorSignal:
        """Record output format preference (stored in ``feedback_type``)."""
        signal = BehaviorSignal(
            type_=SignalType.FORMAT_PREFERENCE,
            feedback_type=format_type,
            session_id=self._session_id,
        )
        self._append(signal)
        return signal

    def record_error_recovery(self, error: str, recovered: bool = True) -> BehaviorSignal:
        """Record an error; ``recovered`` is stored in ``tool_success``."""
        signal = BehaviorSignal(
            type_=SignalType.ERROR_RECOVERY,
            context_before=error,
            tool_success=recovered,
            session_id=self._session_id,
        )
        self._append(signal)
        return signal

    # ── Analysis ──

    def summarize(self, hours: float = 24) -> SignalSummary:
        """Generate a summary of buffered signals over the last N hours.

        Only signals still in the in-memory buffer are considered. The tool
        success rate is 0.0 when the window contains no tool usage.
        """
        now = time.time()
        cutoff = now - hours * 3600

        signals = [s for s in self._buffer if s.timestamp >= cutoff]

        summary = SignalSummary(
            window_start=cutoff,
            window_end=now,
            total_signals=len(signals),
        )

        tool_counts: dict[str, int] = defaultdict(int)
        total_tools = 0
        successful_tools = 0

        for s in signals:
            if s.type_ == SignalType.TOOL_USAGE:
                tool_counts[s.tool_name] += 1
                total_tools += 1
                if s.tool_success:
                    successful_tools += 1

            elif s.type_ == SignalType.EXPLICIT_FEEDBACK:
                if s.polarity == FeedbackPolarity.POSITIVE:
                    summary.positive_feedback += 1
                elif s.polarity == FeedbackPolarity.NEGATIVE:
                    summary.negative_feedback += 1

            elif s.type_ == SignalType.CORRECTION:
                summary.correction_count += 1

            elif s.type_ == SignalType.UNDO:
                summary.undo_count += 1

            elif s.type_ == SignalType.RE_PROMPT:
                summary.re_prompt_count += 1

        summary.top_tools = sorted(tool_counts.items(), key=lambda x: -x[1])[:10]
        # max(..., 1) avoids division by zero when no tools were used.
        summary.tool_success_rate = successful_tools / max(total_tools, 1)

        # Detect patterns
        summary.detected_patterns = self._detect_patterns(signals)

        return summary

    def get_tool_ranking(self, top_n: int = 10) -> list[tuple[str, int]]:
        """Return the ``top_n`` (tool_name, count) pairs, most used first.

        Counts come from the usage counter, which is not reduced when old
        signals fall out of the in-memory buffer.
        """
        return sorted(self._tool_counter.items(), key=lambda x: -x[1])[:top_n]

    def get_feedback_ratio(self, hours: float = 168) -> float:
        """Positive feedback ratio over time window.

        Returns 0.5 when the window holds no positive or negative explicit
        feedback.
        """
        summary = self.summarize(hours)
        total = summary.positive_feedback + summary.negative_feedback
        if total == 0:
            return 0.5
        return summary.positive_feedback / total

    # ── Hooks ──

    def on_signal(self, hook: Callable[[BehaviorSignal], None]) -> None:
        """Register a hook called on every new signal."""
        self._hooks.append(hook)

    # ── Internal ──

    @staticmethod
    def _warn(message: str, *args: Any, exc_info: bool = False) -> None:
        """Log a warning for a failure that must not interrupt recording."""
        # Local import keeps this class independent of the file's import block.
        import logging

        logging.getLogger(__name__).warning(message, *args, exc_info=exc_info)

    def _trim_buffer(self) -> None:
        """Drop the oldest signals so at most ``_buffer_size`` remain."""
        # Computed as an overflow count because a "[-size:]" slice keeps
        # everything when size is 0.
        overflow = len(self._buffer) - self._buffer_size
        if overflow > 0:
            del self._buffer[:overflow]

    def _append(self, signal: BehaviorSignal) -> None:
        """Buffer ``signal``, run the hooks, then persist if configured."""
        self._buffer.append(signal)
        self._trim_buffer()

        for hook in self._hooks:
            try:
                hook(signal)
            except Exception:
                # Best effort: a faulty hook must not block the other hooks
                # or the recording itself, but the failure is logged.
                self._warn("signal hook %r failed", hook, exc_info=True)

        if self._persist_path:
            self._save_to_disk()

    def _detect_patterns(self, signals: list[BehaviorSignal]) -> list[str]:
        """Detect behavioral patterns from signals.

        Returns labels of the form ``name:detail``. Each pattern is a plain
        count threshold over ``signals``.
        """
        patterns = []

        # Pattern: at least 3 corrections
        corrections = [s for s in signals if s.type_ == SignalType.CORRECTION]
        if len(corrections) >= 3:
            patterns.append(f"frequent_corrections:{len(corrections)}")

        # Pattern: at least 2 undos
        undos = [s for s in signals if s.type_ == SignalType.UNDO]
        if len(undos) >= 2:
            patterns.append(f"high_undo_rate:{len(undos)}")

        # Pattern: at least 3 failed tool calls
        failed_tools = [
            s for s in signals
            if s.type_ == SignalType.TOOL_USAGE and not s.tool_success
        ]
        if len(failed_tools) >= 3:
            # Sorted so the label is deterministic across runs.
            tools = sorted({s.tool_name for s in failed_tools})
            patterns.append(f"failing_tools:{','.join(tools)}")

        # Pattern: at least 5 positive-polarity signals (not necessarily consecutive)
        positive = [s for s in signals if s.polarity == FeedbackPolarity.POSITIVE]
        if len(positive) >= 5:
            patterns.append(f"positive_streak:{len(positive)}")

        return patterns

    def _save_to_disk(self) -> None:
        """Write the most recent buffered signals to the persistence file.

        Failures are logged and otherwise ignored.
        """
        if not self._persist_path:
            return
        try:
            data = [s.to_dict() for s in self._buffer[-self._PERSIST_LIMIT:]]
            # ensure_ascii=False emits raw non-ASCII text, so the encoding is
            # pinned instead of relying on the platform default.
            self._persist_path.write_text(
                json.dumps(data, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
        except Exception:
            self._warn(
                "could not persist signals to %s", self._persist_path, exc_info=True
            )

    def _read_persisted_text(self) -> str:
        """Read the persistence file as UTF-8, else the platform default."""
        try:
            return self._persist_path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # Files saved before the encoding was pinned used the platform
            # default encoding.
            return self._persist_path.read_text()

    def _load_from_disk(self) -> None:
        """Load previously persisted signals into the buffer.

        Malformed records are skipped rather than aborting the load. Only
        tool-usage signals feed the tool counter. Failures are logged and
        otherwise ignored.
        """
        if not self._persist_path or not self._persist_path.exists():
            return
        try:
            data = json.loads(self._read_persisted_text())
        except (OSError, ValueError):
            self._warn(
                "could not read persisted signals from %s",
                self._persist_path,
                exc_info=True,
            )
            return
        if not isinstance(data, list):
            self._warn(
                "ignoring %s: expected a JSON list of signals", self._persist_path
            )
            return

        skipped = 0
        for item in data[-self._PERSIST_LIMIT:]:
            try:
                signal = BehaviorSignal(
                    id=item.get("id", ""),
                    type_=SignalType(item.get("type", "tool_usage")),
                    timestamp=item.get("timestamp", 0),
                    user_id=item.get("user_id", "default"),
                    session_id=item.get("session_id", ""),
                    tool_name=item.get("tool_name", ""),
                    tool_args=item.get("tool_args") or {},
                    tool_success=item.get("tool_success", True),
                    tool_duration_ms=item.get("tool_duration_ms", 0.0),
                    feedback_type=item.get("feedback_type", ""),
                    feedback_text=item.get("feedback_text", ""),
                    polarity=FeedbackPolarity(item.get("polarity", "neutral")),
                    context_before=item.get("context_before", ""),
                    context_after=item.get("context_after", ""),
                    metadata=item.get("metadata", {}),
                )
            except (AttributeError, TypeError, ValueError):
                # Non-dict record or unknown enum value: skip this record only.
                skipped += 1
                continue
            self._buffer.append(signal)
            # Other signal types reuse tool_name (undo) or leave it empty,
            # so they must not be counted as tool usage.
            if signal.type_ == SignalType.TOOL_USAGE:
                self._tool_counter[signal.tool_name] += 1

        if skipped:
            self._warn(
                "skipped %d malformed signal record(s) in %s",
                skipped,
                self._persist_path,
            )
        self._trim_buffer()