Coverage for agentos/monitoring/alerts.py: 58%
92 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""AgentOS monitoring — alert rules and webhook notification dispatcher."""
3from __future__ import annotations
5import json
6import time
7import urllib.request
8from dataclasses import dataclass, field
9from enum import Enum
10from typing import Callable, Optional
class AlertSeverity(str, Enum):
    """Severity level of an alert.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (for example ``AlertSeverity.CRITICAL == "critical"``).
    """

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"
class AlertState(str, Enum):
    """State of an alert: currently firing, or resolved.

    Members are also ``str`` instances whose value is the lowercase name.
    """

    FIRING = "firing"
    RESOLVED = "resolved"
@dataclass
class AlertRule:
    """Alert rule: a named condition with a severity and a re-fire cooldown.

    Attributes:
        name: Identifier of the rule.
        description: Human-readable description of the rule.
        severity: Severity of the rule; defaults to ``WARNING``.
        condition: Zero-argument callable; a truthy result means the rule
            fires. When ``None`` the rule never fires.
        cooldown_seconds: Minimum number of seconds that must pass after a
            firing before the condition is checked again.
        _last_fired: ``time.time()`` timestamp of the most recent firing
            (``0.0`` until the rule fires for the first time).
    """

    name: str
    description: str
    severity: AlertSeverity = AlertSeverity.WARNING
    condition: Optional[Callable[[], bool]] = None
    cooldown_seconds: int = 300
    _last_fired: float = field(default=0.0, repr=False)

    def evaluate(self) -> bool:
        """Check the rule once and report whether it fires now.

        Returns:
            ``True`` when the condition is truthy and the rule is not in its
            cooldown window; ``False`` otherwise. While the cooldown is
            active the condition callable is not invoked at all.
        """
        # Test for None explicitly: a callable object may itself be falsy
        # (e.g. it defines __len__ or __bool__) and must still be evaluated.
        if self.condition is None:
            return False
        now = time.time()
        if now - self._last_fired < self.cooldown_seconds:
            return False
        # Coerce so the declared ``-> bool`` contract holds even when the
        # condition returns some other truthy/falsy object.
        fired = bool(self.condition())
        if fired:
            self._last_fired = now
        return fired
@dataclass
class Alert:
    """A single alert instance.

    Attributes:
        rule_name: Name of the rule this alert belongs to.
        severity: Severity of the alert.
        message: Alert message text.
        state: Alert state; defaults to ``FIRING``.
        timestamp: Creation time, taken from ``time.time()`` by default.
        labels: Free-form labels; a fresh empty dict by default.
    """

    rule_name: str
    severity: AlertSeverity
    message: str
    state: AlertState = AlertState.FIRING
    timestamp: float = field(default_factory=time.time)
    labels: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the alert as a plain dict, with enums replaced by their values."""
        payload = dict(
            rule_name=self.rule_name,
            severity=self.severity.value,
            message=self.message,
            state=self.state.value,
            timestamp=self.timestamp,
            labels=self.labels,
        )
        return payload

    def to_json(self) -> str:
        """Return the ``to_dict()`` representation encoded as a JSON string."""
        as_mapping = self.to_dict()
        return json.dumps(as_mapping)
@dataclass
class MonitoringConfig:
    """Monitoring configuration."""

    # When False, AlertEvaluator.evaluate() returns an empty list.
    enabled: bool = True
    # Not read anywhere in the visible code; presumably the number of seconds
    # between evaluation runs -- TODO confirm against the scheduler that uses it.
    evaluation_interval: int = 60
    # Upper bound on the alerts produced by one AlertEvaluator.evaluate() call.
    max_alerts_per_interval: int = 10
@dataclass
class WebhookConfig:
    """Webhook configuration."""

    # Target URL; when empty, WebhookDispatcher.send() returns False without sending.
    url: str = ""
    # HTTP method used for the request.
    method: str = "POST"
    # HTTP headers sent with the request.
    headers: dict = field(default_factory=dict)
    # Timeout passed to urllib.request.urlopen() for each attempt.
    timeout: float = 5.0
    # Number of retries after the first attempt (retry_count + 1 attempts in total).
    retry_count: int = 3
class WebhookDispatcher:
    """Dispatches Alerts to the configured webhook endpoint over HTTP."""

    def __init__(self, config: Optional[WebhookConfig] = None):
        """Store *config*, falling back to a default ``WebhookConfig``."""
        self.config = config if config is not None else WebhookConfig()

    def send(self, alert: Alert) -> bool:
        """Send *alert* as a JSON body to the configured URL.

        Args:
            alert: The alert to deliver; its ``to_dict()`` output is
                serialized as UTF-8 encoded JSON.

        Returns:
            ``True`` when a response with a status below 400 is received.
            ``False`` when no URL is configured, the URL is malformed, or
            every attempt fails.
        """
        cfg = self.config
        if not cfg.url:
            return False

        payload = json.dumps(alert.to_dict()).encode("utf-8")

        # Work on a copy so the caller's config dict is never mutated, and
        # label the body as JSON unless the caller chose a content type.
        # Without this urllib labels the body as form-urlencoded data.
        headers = dict(cfg.headers)
        if not any(str(key).lower() == "content-type" for key in headers):
            headers["Content-Type"] = "application/json"

        try:
            req = urllib.request.Request(
                cfg.url,
                data=payload,
                headers=headers,
                method=cfg.method,
            )
        except ValueError:
            # A malformed URL can never succeed; fail fast instead of
            # sleeping through the whole retry schedule.
            return False

        # A negative retry_count still gets one delivery attempt.
        retries = max(cfg.retry_count, 0)
        for attempt in range(retries + 1):
            try:
                with urllib.request.urlopen(req, timeout=cfg.timeout) as resp:
                    return resp.status < 400
            except Exception:
                # Delivery is deliberately best-effort: any failure
                # (including HTTP error responses raised by urlopen)
                # counts as a failed attempt rather than propagating.
                if attempt == retries:
                    return False
                # Linear back-off: 1s, 2s, 3s, ... between attempts.
                time.sleep(1.0 * (attempt + 1))
        return False
class AlertEvaluator:
    """Runs the registered AlertRules and collects an Alert for each one that fires."""

    def __init__(self, config: Optional[MonitoringConfig] = None):
        """Store *config* (or a default ``MonitoringConfig``) and start with no rules."""
        self.config = config if config else MonitoringConfig()
        self.rules: list[AlertRule] = []

    def add_rule(self, rule: AlertRule):
        """Register *rule*; rules are evaluated in the order they were added."""
        self.rules.append(rule)

    def evaluate(self) -> list[Alert]:
        """Evaluate the rules in order and return the alerts that fired.

        Returns an empty list when monitoring is disabled. Evaluation stops
        as soon as ``max_alerts_per_interval`` alerts have been collected,
        so the remaining rules are not evaluated in that pass.
        """
        fired: list[Alert] = []
        if self.config.enabled:
            for rule in self.rules:
                # The list length doubles as the per-pass alert counter.
                if len(fired) >= self.config.max_alerts_per_interval:
                    break
                if not rule.evaluate():
                    continue
                fired.append(
                    Alert(
                        rule_name=rule.name,
                        severity=rule.severity,
                        message=f"Alert: {rule.description}",
                    )
                )
        return fired