Coverage for agentos/evolution/signals.py: 39%
191 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Behavior Signal Detection — User behavior signal collection and analysis.
4Signals collected:
5 - Tool usage frequency and patterns
6 - Explicit feedback (thumbs up/down, ratings)
7 - Implicit feedback (corrections, re-prompts, "no/stop/undo")
8 - Conversation context (topic shifts, depth, sentiment)
9 - Timing signals (response latency, session duration)
10 - Preference signals (format preferences, language, tone)
12These signals feed into the Learner to generate evolution proposals.
13"""
from __future__ import annotations

import json
import logging
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Optional
27# ── Signal Types ──
class SignalType(str, Enum):
    """Kinds of behavior signal that can be recorded.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    # A tool was invoked.
    TOOL_USAGE = "tool_usage"
    # The user gave an explicit rating.
    EXPLICIT_FEEDBACK = "explicit_feedback"
    # The user corrected the agent.
    CORRECTION = "correction"
    # The user re-asked the same thing.
    RE_PROMPT = "re_prompt"
    # The user undid an action.
    UNDO = "undo"
    # Session duration.
    SESSION_LENGTH = "session_length"
    # The user changed topic abruptly.
    TOPIC_SWITCH = "topic_switch"
    # Output format preference.
    FORMAT_PREFERENCE = "format_preference"
    # How fast the agent responded.
    RESPONSE_LATENCY = "response_latency"
    # An error occurred and the agent recovered.
    ERROR_RECOVERY = "error_recovery"
    # A repeated pattern was recognized.
    PATTERN_MATCH = "pattern_match"
class FeedbackPolarity(str, Enum):
    """Direction of a feedback signal: positive, negative, or neutral.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"
49# ── Signal Data ──
@dataclass
class BehaviorSignal:
    """One recorded observation of user or agent behavior.

    Every field has a default, so callers set only the fields relevant to
    the kind of signal being recorded.
    """

    # Identity: short random hex id, signal kind, creation time, and owner.
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    type_: SignalType = SignalType.TOOL_USAGE
    timestamp: float = field(default_factory=time.time)
    user_id: str = "default"
    session_id: str = ""

    # Tool payload
    tool_name: str = ""
    tool_args: dict = field(default_factory=dict)
    tool_success: bool = True
    tool_duration_ms: float = 0.0

    # Feedback payload; feedback_type examples: "thumbs_up", "thumbs_down", "rating:4"
    feedback_type: str = ""
    feedback_text: str = ""
    polarity: FeedbackPolarity = FeedbackPolarity.NEUTRAL

    # Surrounding context
    context_before: str = ""  # What happened before this signal
    context_after: str = ""  # Result after this signal

    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a plain-dict view of this signal.

        Enum fields are emitted as their string values, and both context
        strings are cut to their first 500 characters. ``tool_args`` and
        ``metadata`` are included by reference, not copied.
        """
        max_context = 500
        return dict(
            id=self.id,
            type=self.type_.value,
            timestamp=self.timestamp,
            user_id=self.user_id,
            session_id=self.session_id,
            tool_name=self.tool_name,
            tool_args=self.tool_args,
            tool_success=self.tool_success,
            tool_duration_ms=self.tool_duration_ms,
            feedback_type=self.feedback_type,
            feedback_text=self.feedback_text,
            polarity=self.polarity.value,
            context_before=self.context_before[:max_context],
            context_after=self.context_after[:max_context],
            metadata=self.metadata,
        )
@dataclass
class SignalSummary:
    """Aggregate statistics for the signals observed in one time window."""

    # Window bounds (timestamps) and the number of signals inside them.
    window_start: float = 0.0
    window_end: float = field(default_factory=time.time)
    total_signals: int = 0

    # Tool usage: (tool_name, count) pairs and the fraction of successful calls.
    top_tools: list[tuple[str, int]] = field(default_factory=list)
    tool_success_rate: float = 0.0

    # Feedback counters.
    positive_feedback: int = 0
    negative_feedback: int = 0
    correction_count: int = 0
    undo_count: int = 0
    re_prompt_count: int = 0

    # Pattern labels detected in the window.
    detected_patterns: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the summary as a plain dict keyed by field name.

        Values are the attribute objects themselves; the two list fields
        are not copied.
        """
        exported = (
            "window_start",
            "window_end",
            "total_signals",
            "top_tools",
            "tool_success_rate",
            "positive_feedback",
            "negative_feedback",
            "correction_count",
            "undo_count",
            "re_prompt_count",
            "detected_patterns",
        )
        return {key: getattr(self, key) for key in exported}
134# ── Signal Collector ──
class SignalCollector:
    """Collect, persist, and summarize user behavior signals.

    Features:
        - In-memory buffer holding the most recent ``buffer_size`` signals
        - Optional JSON persistence to disk (rewritten after every signal)
        - Signal hooks for real-time processing
        - Aggregation windows for analysis

    Usage:
        collector = SignalCollector(buffer_size=1000)

        # Record tool usage
        collector.record_tool_usage("web_search", {"query": "..."}, success=True)

        # Record explicit feedback
        collector.record_feedback("thumbs_up", "Great answer!")

        # Record correction
        collector.record_correction("No, use Python not JS")

        # Get summary
        summary = collector.summarize(hours=24)
    """

    # Maximum number of most-recent signals written to / read from disk.
    _PERSIST_LIMIT = 500

    # Prefix of the "rating:<n>" feedback form; the bare "<n>" form is also accepted.
    _RATING_PREFIX = "rating:"
    _POSITIVE_FEEDBACK = frozenset({"thumbs_up", "positive", "5", "4"})
    _NEGATIVE_FEEDBACK = frozenset({"thumbs_down", "negative", "1", "2"})

    _log = logging.getLogger(__name__)

    def __init__(
        self,
        buffer_size: int = 2000,
        persist_path: Optional[str] = None,
    ):
        """Create a collector.

        Args:
            buffer_size: Maximum number of signals kept in memory. Negative
                values are treated as 0.
            persist_path: File used for JSON persistence. When given, its
                parent directory is created and existing signals are loaded.
        """
        self._buffer: list[BehaviorSignal] = []
        self._buffer_size = max(0, buffer_size)
        self._persist_path = Path(persist_path) if persist_path else None
        self._hooks: list[Callable[[BehaviorSignal], None]] = []
        self._tool_counter: dict[str, int] = defaultdict(int)
        self._session_id: str = ""

        if self._persist_path is not None:
            self._persist_path.parent.mkdir(parents=True, exist_ok=True)
            self._load_from_disk()

    def set_session(self, session_id: str) -> None:
        """Set the session id stamped on every subsequently recorded signal."""
        self._session_id = session_id

    # ── Recording API ──

    def record_tool_usage(
        self,
        tool_name: str,
        tool_args: Optional[dict] = None,
        success: bool = True,
        duration_ms: float = 0.0,
    ) -> BehaviorSignal:
        """Record a tool invocation and bump the cumulative per-tool counter."""
        # Count first so hooks fired by the append already see the new total.
        self._tool_counter[tool_name] += 1
        return self._emit(
            SignalType.TOOL_USAGE,
            tool_name=tool_name,
            tool_args=tool_args or {},
            tool_success=success,
            tool_duration_ms=duration_ms,
        )

    def record_feedback(
        self,
        feedback_type: str,
        feedback_text: str = "",
    ) -> BehaviorSignal:
        """Record explicit user feedback.

        ``feedback_type`` is stored unchanged. Polarity is derived from it:
        "thumbs_up"/"positive"/"5"/"4" are positive, "thumbs_down"/
        "negative"/"1"/"2" are negative, anything else is neutral. Ratings
        may be given bare ("4") or as "rating:4".
        """
        label = feedback_type
        if label.startswith(self._RATING_PREFIX):
            label = label[len(self._RATING_PREFIX):]

        if label in self._POSITIVE_FEEDBACK:
            polarity = FeedbackPolarity.POSITIVE
        elif label in self._NEGATIVE_FEEDBACK:
            polarity = FeedbackPolarity.NEGATIVE
        else:
            polarity = FeedbackPolarity.NEUTRAL

        return self._emit(
            SignalType.EXPLICIT_FEEDBACK,
            feedback_type=feedback_type,
            feedback_text=feedback_text,
            polarity=polarity,
        )

    def record_correction(self, correction_text: str, context: str = "") -> BehaviorSignal:
        """Record a user correction (always negative polarity)."""
        return self._emit(
            SignalType.CORRECTION,
            feedback_text=correction_text,
            context_before=context,
            polarity=FeedbackPolarity.NEGATIVE,
        )

    def record_undo(self, action: str = "") -> BehaviorSignal:
        """Record an undo; the undone action is stored in ``tool_name``."""
        return self._emit(
            SignalType.UNDO,
            tool_name=action,
            polarity=FeedbackPolarity.NEGATIVE,
        )

    def record_re_prompt(self, original_query: str = "") -> BehaviorSignal:
        """Record a re-prompt (user asked again differently)."""
        return self._emit(
            SignalType.RE_PROMPT,
            context_before=original_query,
            polarity=FeedbackPolarity.NEGATIVE,
        )

    def record_format_preference(self, format_type: str) -> BehaviorSignal:
        """Record an output format preference, stored in ``feedback_type``."""
        return self._emit(
            SignalType.FORMAT_PREFERENCE,
            feedback_type=format_type,
        )

    def record_error_recovery(self, error: str, recovered: bool = True) -> BehaviorSignal:
        """Record an error; ``recovered`` is stored in ``tool_success``."""
        return self._emit(
            SignalType.ERROR_RECOVERY,
            context_before=error,
            tool_success=recovered,
        )

    # ── Analysis ──

    def summarize(self, hours: float = 24) -> SignalSummary:
        """Summarize buffered signals from the last ``hours`` hours.

        ``tool_success_rate`` is 0.0 when the window contains no tool-usage
        signals. ``top_tools`` holds at most the ten most used tools.
        """
        now = time.time()
        cutoff = now - hours * 3600

        signals = [s for s in self._buffer if s.timestamp >= cutoff]

        summary = SignalSummary(
            window_start=cutoff,
            window_end=now,
            total_signals=len(signals),
        )

        tool_counts: dict[str, int] = defaultdict(int)
        total_tools = 0
        successful_tools = 0

        for s in signals:
            if s.type_ == SignalType.TOOL_USAGE:
                tool_counts[s.tool_name] += 1
                total_tools += 1
                if s.tool_success:
                    successful_tools += 1
            elif s.type_ == SignalType.EXPLICIT_FEEDBACK:
                if s.polarity == FeedbackPolarity.POSITIVE:
                    summary.positive_feedback += 1
                elif s.polarity == FeedbackPolarity.NEGATIVE:
                    summary.negative_feedback += 1
            elif s.type_ == SignalType.CORRECTION:
                summary.correction_count += 1
            elif s.type_ == SignalType.UNDO:
                summary.undo_count += 1
            elif s.type_ == SignalType.RE_PROMPT:
                summary.re_prompt_count += 1

        summary.top_tools = sorted(tool_counts.items(), key=lambda x: -x[1])[:10]
        # max(..., 1) avoids dividing by zero when no tools were used.
        summary.tool_success_rate = successful_tools / max(total_tools, 1)
        summary.detected_patterns = self._detect_patterns(signals)
        return summary

    def get_tool_ranking(self, top_n: int = 10) -> list[tuple[str, int]]:
        """Return the ``top_n`` most used tools as (name, count), most used first.

        Counts come from the cumulative counter, not from the (bounded)
        in-memory buffer.
        """
        return sorted(self._tool_counter.items(), key=lambda x: -x[1])[:top_n]

    def get_feedback_ratio(self, hours: float = 168) -> float:
        """Return positive / (positive + negative) explicit feedback in the window.

        Returns 0.5 when the window holds no positive or negative explicit
        feedback.
        """
        summary = self.summarize(hours)
        total = summary.positive_feedback + summary.negative_feedback
        if total == 0:
            return 0.5
        return summary.positive_feedback / total

    # ── Hooks ──

    def on_signal(self, hook: Callable[[BehaviorSignal], None]) -> None:
        """Register a hook called on every new signal."""
        self._hooks.append(hook)

    # ── Internal ──

    def _emit(self, type_: SignalType, **payload: Any) -> BehaviorSignal:
        """Build a signal tagged with the current session, store it, and return it."""
        signal = BehaviorSignal(type_=type_, session_id=self._session_id, **payload)
        self._append(signal)
        return signal

    def _append(self, signal: BehaviorSignal) -> None:
        """Buffer ``signal``, drop the oldest overflow, run hooks, then persist."""
        self._buffer.append(signal)
        # Computing the overflow explicitly also handles a capacity of 0,
        # where a ``[-0:]`` slice would keep the whole list.
        overflow = len(self._buffer) - self._buffer_size
        if overflow > 0:
            del self._buffer[:overflow]

        for hook in self._hooks:
            try:
                hook(signal)
            except Exception:
                # A faulty hook must not block recording or the other hooks.
                self._log.warning("Signal hook %r failed", hook, exc_info=True)

        if self._persist_path is not None:
            self._save_to_disk()

    def _detect_patterns(self, signals: list[BehaviorSignal]) -> list[str]:
        """Return pattern labels derived from simple count thresholds.

        Labels have the form ``name:detail``. ``positive_streak`` counts
        every positive-polarity signal in ``signals``; they need not be
        consecutive.
        """
        patterns = []

        corrections = sum(1 for s in signals if s.type_ == SignalType.CORRECTION)
        if corrections >= 3:
            patterns.append(f"frequent_corrections:{corrections}")

        undos = sum(1 for s in signals if s.type_ == SignalType.UNDO)
        if undos >= 2:
            patterns.append(f"high_undo_rate:{undos}")

        failed_tools = [
            s for s in signals
            if s.type_ == SignalType.TOOL_USAGE and not s.tool_success
        ]
        if len(failed_tools) >= 3:
            # Sorted so the label is stable across runs (set order is not).
            tools = sorted({s.tool_name for s in failed_tools})
            patterns.append(f"failing_tools:{','.join(tools)}")

        positive = sum(1 for s in signals if s.polarity == FeedbackPolarity.POSITIVE)
        if positive >= 5:
            patterns.append(f"positive_streak:{positive}")

        return patterns

    def _save_to_disk(self) -> None:
        """Write the most recent buffered signals to ``persist_path`` as JSON.

        Best effort: failures are logged, never raised.
        """
        if self._persist_path is None:
            return
        try:
            data = [s.to_dict() for s in self._buffer[-self._PERSIST_LIMIT:]]
            # ensure_ascii=False emits raw non-ASCII text, so the file encoding
            # must be pinned rather than left to the platform default.
            self._persist_path.write_text(
                json.dumps(data, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
        except Exception:
            self._log.warning(
                "Could not persist signals to %s", self._persist_path, exc_info=True
            )

    def _load_from_disk(self) -> None:
        """Load previously persisted signals into the buffer.

        Best effort: an unreadable file is logged and ignored, and malformed
        records are skipped individually so one bad entry does not discard
        the rest. At most ``buffer_size`` of the most recent records are
        loaded.
        """
        path = self._persist_path
        if path is None or not path.exists():
            return
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            self._log.warning(
                "Could not read persisted signals from %s", path, exc_info=True
            )
            return
        if not isinstance(data, list):
            self._log.warning(
                "Ignoring persisted signals in %s: expected a JSON list", path
            )
            return

        limit = min(self._PERSIST_LIMIT, self._buffer_size)
        records = data[-limit:] if limit > 0 else []
        for item in records:
            try:
                signal = BehaviorSignal(
                    id=item.get("id", ""),
                    type_=SignalType(item.get("type", "tool_usage")),
                    timestamp=item.get("timestamp", 0),
                    user_id=item.get("user_id", "default"),
                    session_id=item.get("session_id", ""),
                    tool_name=item.get("tool_name", ""),
                    tool_args=item.get("tool_args") or {},
                    tool_success=item.get("tool_success", True),
                    tool_duration_ms=item.get("tool_duration_ms", 0.0),
                    feedback_type=item.get("feedback_type", ""),
                    feedback_text=item.get("feedback_text", ""),
                    polarity=FeedbackPolarity(item.get("polarity", "neutral")),
                    context_before=item.get("context_before", ""),
                    context_after=item.get("context_after", ""),
                    metadata=item.get("metadata", {}),
                )
            except (AttributeError, TypeError, ValueError):
                self._log.warning(
                    "Skipping malformed persisted signal in %s", path, exc_info=True
                )
                continue
            self._buffer.append(signal)
            # Mirror live recording: only tool invocations feed the ranking.
            if signal.type_ == SignalType.TOOL_USAGE:
                self._tool_counter[signal.tool_name] += 1