Coverage for agentos/monitoring/alerts.py: 58%

92 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""AgentOS monitoring — alert rules and webhook notification dispatcher.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import time 

7import urllib.request 

8from dataclasses import dataclass, field 

9from enum import Enum 

10from typing import Callable, Optional 

11 

12 

class AlertSeverity(str, Enum):
    """Severity level of an alert.

    Mixes in ``str`` so every member compares equal to its plain string
    value (e.g. ``AlertSeverity.WARNING == "warning"``).
    """

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"

21 

22 

class AlertState(str, Enum):
    """State of an alert.

    Mixes in ``str`` so every member compares equal to its plain string value.
    """

    # Member values are the strings emitted when an alert is serialized.
    FIRING = "firing"
    RESOLVED = "resolved"

29 

30 

@dataclass
class AlertRule:
    """Alert rule: a named condition with a severity and a firing cooldown."""

    name: str
    description: str
    severity: AlertSeverity = AlertSeverity.WARNING
    # Zero-argument callable; a truthy result means the rule fires.
    # A rule without a condition never fires.
    condition: Optional[Callable[[], bool]] = None
    # Minimum number of seconds between two firings of this rule.
    cooldown_seconds: int = 300
    # Wall-clock timestamp (time.time()) of the most recent firing; 0.0 = never.
    _last_fired: float = field(default=0.0, repr=False)

    def evaluate(self) -> bool:
        """Check the rule once and report whether it fires now.

        Returns:
            ``True`` when a condition is set, the cooldown since the last
            firing has elapsed, and the condition returns a truthy value;
            ``False`` otherwise. The result is always a real ``bool``, even
            if the condition callable returns some other truthy/falsy object.

        Any exception raised by the condition callable propagates to the
        caller.
        """
        if not self.condition:
            return False
        now = time.time()
        # Still inside the cooldown window: skip without calling the condition.
        if now - self._last_fired < self.cooldown_seconds:
            return False
        # Coerce so the declared ``-> bool`` contract actually holds.
        fired = bool(self.condition())
        if fired:
            # Only a firing restarts the cooldown; a quiet check does not.
            self._last_fired = now
        return fired

51 

52 

@dataclass
class Alert:
    """Alert instance produced when a rule fires."""

    rule_name: str
    severity: AlertSeverity
    message: str
    state: AlertState = AlertState.FIRING
    # Creation time as a time.time() timestamp.
    timestamp: float = field(default_factory=time.time)
    labels: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the alert as a plain dict.

        Enum fields are replaced by their string values. ``labels`` is a
        shallow copy, so changing the returned dict's top-level label keys
        does not modify this alert.
        """
        return {
            "rule_name": self.rule_name,
            "severity": self.severity.value,
            "message": self.message,
            "state": self.state.value,
            "timestamp": self.timestamp,
            "labels": dict(self.labels),
        }

    def to_json(self) -> str:
        """Return the alert serialized as a JSON string (see ``to_dict``)."""
        return json.dumps(self.to_dict())

74 

75 

@dataclass
class MonitoringConfig:
    """Monitoring configuration."""

    # Master switch for alert evaluation.
    enabled: bool = True
    # NOTE(review): presumably the number of seconds between evaluation
    # passes; nothing in this file reads it -- confirm against the scheduler.
    evaluation_interval: int = 60
    # Upper bound on the number of alerts produced per evaluation pass.
    max_alerts_per_interval: int = 10

82 

83 

@dataclass
class WebhookConfig:
    """Webhook configuration."""

    # Target endpoint; the empty default means no webhook is configured.
    url: str = ""
    # HTTP method used for the request.
    method: str = "POST"
    # Extra HTTP headers; every instance gets its own dict.
    headers: dict = field(default_factory=dict)
    # Per-request timeout in seconds.
    timeout: float = 5.0
    # Number of retries after the first failed attempt.
    retry_count: int = 3

92 

93 

class WebhookDispatcher:
    """Dispatches Alerts to the configured webhook endpoint."""

    def __init__(self, config: Optional[WebhookConfig] = None):
        """Store *config*, falling back to a default ``WebhookConfig``."""
        self.config = config or WebhookConfig()

    def send(self, alert: Alert) -> bool:
        """Send *alert* to the webhook as a JSON request body.

        The request is attempted up to ``retry_count + 1`` times, sleeping
        1s, 2s, 3s, ... between failed attempts.

        Returns:
            ``True`` if a response with status below 400 was received;
            ``False`` if no URL is configured or every attempt failed.

        Delivery is deliberately best-effort: request and network errors are
        reported through the return value rather than raised. Errors while
        serializing the alert itself still propagate.
        """
        cfg = self.config
        if not cfg.url:
            return False
        payload = json.dumps(alert.to_dict()).encode("utf-8")
        for attempt in range(cfg.retry_count + 1):
            try:
                # Without an explicit Content-Type urllib labels the body as
                # application/x-www-form-urlencoded, which is wrong for JSON.
                # Configured headers are applied last so they can override it.
                headers = {"Content-Type": "application/json"}
                headers.update(cfg.headers)
                # A fresh Request per attempt: urlopen mutates the request
                # object it is given (e.g. when routing through a proxy).
                req = urllib.request.Request(
                    cfg.url,
                    data=payload,
                    headers=headers,
                    method=cfg.method,
                )
                with urllib.request.urlopen(req, timeout=cfg.timeout) as resp:
                    return resp.status < 400
            except Exception:
                # Best-effort delivery: any failure counts as a failed attempt.
                if attempt == cfg.retry_count:
                    return False
                # Linear backoff before the next attempt.
                time.sleep(1.0 * (attempt + 1))
        # Reached only when retry_count is negative (no attempts were made).
        return False

119 

120 

class AlertEvaluator:
    """Evaluates AlertRules and generates Alerts."""

    def __init__(self, config: Optional[MonitoringConfig] = None):
        """Start with no rules, using *config* or a default MonitoringConfig."""
        self.config = config if config else MonitoringConfig()
        self.rules: list[AlertRule] = []

    def add_rule(self, rule: AlertRule):
        """Register *rule*; rules are evaluated in the order they were added."""
        self.rules.append(rule)

    def evaluate(self) -> list[Alert]:
        """Run one evaluation pass and return the alerts that fired.

        Returns an empty list when monitoring is disabled. Once
        ``max_alerts_per_interval`` alerts have been collected the pass stops,
        and the remaining rules are not evaluated at all.
        """
        if not self.config.enabled:
            return []
        fired: list[Alert] = []
        for rule in self.rules:
            # Check the cap before touching the rule so a rule beyond the
            # limit is never evaluated.
            if len(fired) >= self.config.max_alerts_per_interval:
                break
            if not rule.evaluate():
                continue
            fired.append(
                Alert(
                    rule_name=rule.name,
                    severity=rule.severity,
                    message=f"Alert: {rule.description}",
                )
            )
        return fired