Coverage for agentos/tools/key_rotation.py: 0%

118 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 07:57 +0800

1""" 

2KeyRotation — automated secret key rotation with grace periods and scheduled callbacks. 

3 

4Supports: 

5 - Schedule-based rotation (interval in seconds) 

6 - Manual rotation trigger 

7 - Grace period (old key still valid for verification) 

8 - Current / pending / expired key states 

9 - Rotation hooks (pre_rotate, post_rotate) 

10 - Thread-safe 

11""" 

12 

13from __future__ import annotations 

14 

15import os 

16import secrets 

17import threading 

18import time 

19from dataclasses import dataclass, field 

20from enum import Enum 

21from typing import Any, Callable, Dict, List, Optional 

22 

23 

24# ============================================================================ 

25# Key State 

26# ============================================================================ 

27 

class KeyState(Enum):
    """Lifecycle state of a key tracked by the rotation manager."""

    # The key handed out for new signing/encryption work.
    CURRENT = "current"
    # The previously current key: superseded by a rotation but still accepted
    # for verification until its grace period runs out.
    PENDING = "pending"
    # No longer valid; entries in this state are dropped on cleanup.
    EXPIRED = "expired"

32 

33 

@dataclass
class KeyEntry:
    """A single key together with the metadata needed to manage its lifecycle."""

    # The key material itself.
    key: str
    # Where the key currently sits in its lifecycle.
    state: KeyState
    # time.time() timestamp taken when the key was generated.
    created_at: float
    # time.time() deadline after which the key is discarded; None means the
    # key has no deadline (it has not been superseded yet).
    expires_at: Optional[float] = None

40 

41 

42# ============================================================================ 

43# KeyRotation 

44# ============================================================================ 

45 

class KeyRotation:
    """Automated secret key rotation with grace periods.

    A random key is generated at construction time and replaced on every
    rotation. Rotations happen on a timer once ``start()`` has been called,
    or on demand via ``rotate()``. The key that was current before a rotation
    stays valid for ``grace_period`` seconds so that material produced with
    it can still be verified.

    All key state is guarded by a re-entrant lock. Rotation hooks are invoked
    while that lock is held, so a hook may call back into this object, but it
    should not wait on another thread that is using the same instance.

    Usage:
        kr = KeyRotation(rotation_interval=3600, grace_period=300, key_length=32)
        kr.start()

        current = kr.current_key      # Use this for signing/encryption
        active_keys = kr.active_keys  # All currently valid keys

        # When a new key is rotated in:
        #   - current becomes the new key
        #   - old key stays in active_keys during grace period (for validation)
        #   - after grace period expires, old key is removed
    """

    def __init__(
        self,
        rotation_interval: float = 3600.0,
        grace_period: float = 300.0,
        key_length: int = 32,
    ):
        """Create the manager and generate the initial key.

        Args:
            rotation_interval: Seconds between scheduled rotations.
            grace_period: Seconds a superseded key remains valid; 0 disables
                the grace period entirely.
            key_length: Number of random bytes per key. Keys are hex strings,
                so each key is ``2 * key_length`` characters long.

        Raises:
            ValueError: If ``rotation_interval`` or ``key_length`` is not
                positive, or ``grace_period`` is negative.
        """
        if rotation_interval <= 0:
            raise ValueError("rotation_interval must be positive")
        if grace_period < 0:
            raise ValueError("grace_period must be non-negative")
        # token_hex(0) yields an empty string, i.e. a key with no entropy.
        if key_length <= 0:
            raise ValueError("key_length must be positive")
        self._interval = rotation_interval
        self._grace_period = grace_period
        self._key_length = key_length
        self._keys: List[KeyEntry] = []  # newest first
        self._lock = threading.RLock()
        self._timer: Optional[threading.Timer] = None
        self._running = False

        # Hooks
        self._pre_rotate: List[Callable[[], None]] = []
        # Called as (old_key, new_key).
        self._post_rotate: List[Callable[[Optional[str], str], None]] = []

        # Seed with initial key
        self._rotate_now()

    # ---------- Lifecycle ----------

    def start(self) -> None:
        """Begin scheduled rotation. Does nothing if already running."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._schedule_next()

    def stop(self) -> None:
        """Stop scheduled rotation and cancel the pending timer, if any.

        Existing keys are left untouched.
        """
        with self._lock:
            self._running = False
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def _schedule_next(self) -> None:
        """Arm a daemon timer for the next rotation while running."""
        with self._lock:
            if not self._running:
                return
            self._timer = threading.Timer(self._interval, self._on_timer)
            self._timer.daemon = True
            self._timer.start()

    def _on_timer(self) -> None:
        """Timer callback: rotate once, then arm the next timer."""
        with self._lock:
            # cancel() cannot stop a timer that has already fired, so re-check
            # here to avoid rotating after stop() has returned.
            if not self._running:
                return
            try:
                self._rotate_now()
            finally:
                # Keep the schedule alive even if this rotation failed.
                self._schedule_next()

    # ---------- Rotation ----------

    def rotate(self) -> str:
        """Manually trigger a rotation. Returns the new key."""
        return self._rotate_now()

    def _rotate_now(self) -> str:
        """Generate a new current key and demote the previous one.

        The previous current key becomes PENDING with a deadline of
        ``grace_period`` seconds from now, or is dropped immediately when the
        grace period is 0. Pre-rotation hooks run before the change and
        post-rotation hooks after it, both while the lock is held.

        Returns:
            The newly generated key.
        """
        new_key = secrets.token_hex(self._key_length)

        with self._lock:
            now = time.time()
            # None only for the very first key created by __init__.
            old_key = next(
                (e.key for e in self._keys if e.state is KeyState.CURRENT),
                None,
            )

            self._notify_pre_rotate()

            # Move current -> pending (if grace > 0), otherwise expired.
            for entry in self._keys:
                if entry.state is KeyState.CURRENT:
                    if self._grace_period > 0:
                        entry.state = KeyState.PENDING
                        entry.expires_at = now + self._grace_period
                    else:
                        entry.state = KeyState.EXPIRED

            self._keys.insert(
                0, KeyEntry(key=new_key, state=KeyState.CURRENT, created_at=now)
            )

            # Drop EXPIRED entries and PENDING entries whose grace period has
            # lapsed, so the list cannot grow when nobody reads active_keys.
            self._cleanup_expired()

            self._notify_post_rotate(old_key, new_key)

        return new_key

    # ---------- Key Access ----------

    @property
    def current_key(self) -> Optional[str]:
        """The key to use for new signing/encryption, or None if there is none."""
        with self._lock:
            for entry in self._keys:
                if entry.state is KeyState.CURRENT:
                    return entry.key
            return None

    @property
    def active_keys(self) -> List[str]:
        """All currently valid keys (CURRENT + PENDING), newest first."""
        with self._lock:
            self._cleanup_expired()
            return [
                e.key
                for e in self._keys
                if e.state in (KeyState.CURRENT, KeyState.PENDING)
            ]

    @property
    def pending_keys(self) -> List[str]:
        """Keys in grace period only."""
        with self._lock:
            self._cleanup_expired()
            return [e.key for e in self._keys if e.state is KeyState.PENDING]

    def is_valid(self, key: str) -> bool:
        """Check if a key is currently valid (CURRENT or PENDING).

        The comparison is constant-time per key and does not stop at the
        first match, so timing does not reveal how much of a guess was right
        or which active key it matched. Non-string input is never valid.
        """
        if not isinstance(key, str):
            return False
        # compare_digest only accepts ASCII str, so compare encoded bytes;
        # surrogatepass keeps arbitrary (even malformed) text from raising.
        candidate = key.encode("utf-8", "surrogatepass")
        matched = False
        for active in self.active_keys:
            if secrets.compare_digest(
                active.encode("utf-8", "surrogatepass"), candidate
            ):
                matched = True
        return matched

    def _cleanup_expired(self) -> None:
        """Drop EXPIRED entries and entries past their deadline.

        Does not take the lock itself; callers are expected to hold it.
        """
        now = time.time()
        self._keys = [
            e
            for e in self._keys
            if e.state is not KeyState.EXPIRED
            and (e.expires_at is None or e.expires_at > now)
        ]

    # ---------- Hooks ----------

    def on_pre_rotate(self, callback: Callable[[], None]) -> None:
        """Register a callback invoked with no arguments before each rotation."""
        self._pre_rotate.append(callback)

    def on_post_rotate(self, callback: Callable[[Optional[str], str], None]) -> None:
        """Register a callback invoked as ``callback(old_key, new_key)`` after each rotation."""
        self._post_rotate.append(callback)

    def _notify_pre_rotate(self) -> None:
        """Run the pre-rotation hooks; a failing hook never aborts the rotation."""
        for cb in self._pre_rotate:
            try:
                cb()
            except Exception:
                self._log_hook_failure("pre_rotate", cb)

    def _notify_post_rotate(self, old_key: Optional[str], new_key: str) -> None:
        """Run the post-rotation hooks; a failing hook never aborts the rotation."""
        for cb in self._post_rotate:
            try:
                cb(old_key, new_key)
            except Exception:
                self._log_hook_failure("post_rotate", cb)

    @staticmethod
    def _log_hook_failure(stage: str, callback: Any) -> None:
        """Log the exception currently being handled for a failed hook.

        Key material is deliberately kept out of the log message.
        """
        import logging  # local import: the block does not rely on the file's import section

        logging.getLogger(__name__).exception(
            "KeyRotation %s hook %r failed", stage, callback
        )

    # ---------- Info ----------

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of key counts and configuration (no key material)."""
        with self._lock:
            self._cleanup_expired()
            return {
                "total_keys": len(self._keys),
                "current": 1 if any(e.state is KeyState.CURRENT for e in self._keys) else 0,
                "pending": sum(1 for e in self._keys if e.state is KeyState.PENDING),
                "rotation_interval": self._interval,
                "grace_period": self._grace_period,
                "key_length": self._key_length,
                "running": self._running,
            }