Coverage for agentos/tools/key_rotation.py: 0%
118 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:57 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:57 +0800
"""
KeyRotation — automated secret key rotation with grace periods and scheduled callbacks.

Supports:
  - Schedule-based rotation (interval in seconds)
  - Manual rotation trigger
  - Grace period (old key still valid for verification)
  - Current / pending / expired key states
  - Rotation hooks (pre_rotate, post_rotate)
  - Thread-safe
"""
from __future__ import annotations

import hmac
import logging
import os
import secrets
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
# ============================================================================
# Key State
# ============================================================================
class KeyState(Enum):
    """Lifecycle state of a key managed by the rotation machinery."""

    CURRENT = "current"  # the key handed out for new signing/encryption
    PENDING = "pending"  # rotated out, still accepted until its grace period ends
    EXPIRED = "expired"  # no longer valid; removed on the next cleanup
@dataclass
class KeyEntry:
    """One key together with its rotation bookkeeping."""

    key: str  # the key material itself
    state: KeyState  # where the key is in its lifecycle
    created_at: float  # timestamp (seconds) recorded when the key was generated
    expires_at: Optional[float] = None  # end of the grace period; None = no expiry set
# ============================================================================
# KeyRotation
# ============================================================================
class KeyRotation:
    """Automated secret key rotation with grace periods.

    A random key is generated at construction time and again on every
    rotation. The key that was current before a rotation is kept as PENDING
    for ``grace_period`` seconds so it can still be used for verification,
    and is dropped afterwards.

    Usage:
        kr = KeyRotation(rotation_interval=3600, grace_period=300, key_length=32)
        kr.start()

        current = kr.current_key      # Use this for signing/encryption
        active_keys = kr.active_keys  # All currently valid keys

        # When a new key is rotated in:
        #   - current becomes the new key
        #   - old key stays in active_keys during grace period (for validation)
        #   - after grace period expires, old key is removed

    Hooks run on the rotating thread while the internal lock is held. An
    exception raised by a hook is logged and does not abort the rotation.
    """

    def __init__(
        self,
        rotation_interval: float = 3600.0,
        grace_period: float = 300.0,
        key_length: int = 32,
    ):
        """Validate the settings and seed the instance with an initial key.

        Args:
            rotation_interval: Seconds between scheduled rotations.
            grace_period: Seconds a rotated-out key stays valid; 0 disables
                the grace period.
            key_length: Number of random bytes per key (keys are hex strings,
                so twice this many characters).

        Raises:
            ValueError: If ``rotation_interval`` or ``key_length`` is not
                positive, or ``grace_period`` is negative.
        """
        if rotation_interval <= 0:
            raise ValueError("rotation_interval must be positive")
        if grace_period < 0:
            raise ValueError("grace_period must be non-negative")
        # token_hex(0) yields "", i.e. an empty secret -- reject it up front.
        if key_length < 1:
            raise ValueError("key_length must be positive")
        self._interval = rotation_interval
        self._grace_period = grace_period
        self._key_length = key_length
        self._keys: List[KeyEntry] = []
        # Re-entrant so hooks and internal helpers may call back into the API.
        self._lock = threading.RLock()
        self._timer: Optional[threading.Timer] = None
        self._running = False

        # Hooks
        self._pre_rotate: List[Callable[[], None]] = []
        self._post_rotate: List[Callable[[str, str], None]] = []  # (old_key, new_key)

        # Seed with initial key
        self._rotate_now()

    # ---------- Lifecycle ----------

    def start(self) -> None:
        """Begin scheduled rotation; a no-op if already running."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._schedule_next()

    def stop(self) -> None:
        """Stop scheduled rotation and cancel the outstanding timer, if any."""
        with self._lock:
            self._running = False
            if self._timer:
                self._timer.cancel()
                self._timer = None

    def _schedule_next(self) -> None:
        """Arm a daemon timer for the next rotation while running."""
        with self._lock:
            if not self._running:
                return
            self._timer = threading.Timer(self._interval, self._on_timer)
            self._timer.daemon = True
            self._timer.start()

    def _on_timer(self) -> None:
        """Timer callback: rotate and re-arm, unless stopped in the meantime."""
        with self._lock:
            # cancel() cannot stop a timer that has already fired, so re-check
            # here to avoid rotating after stop() has returned.
            if not self._running:
                return
            self._rotate_now()
            self._schedule_next()

    # ---------- Rotation ----------

    def rotate(self) -> str:
        """Manually trigger a rotation. Returns the new key."""
        return self._rotate_now()

    def _rotate_now(self) -> str:
        """Generate a new current key and demote the previous one.

        Returns:
            The newly generated key.
        """
        new_key = secrets.token_hex(self._key_length)
        now = time.time()

        with self._lock:
            # The current key is always inserted at the front of the list.
            old_key = self._keys[0].key if self._keys else None

            # Notify pre-rotation
            self._notify_pre_rotate()

            # Move current -> pending (if grace > 0), otherwise expired
            for entry in self._keys:
                if entry.state == KeyState.CURRENT:
                    if self._grace_period > 0:
                        entry.state = KeyState.PENDING
                        entry.expires_at = now + self._grace_period
                    else:
                        entry.state = KeyState.EXPIRED

            # Add new current key
            self._keys.insert(
                0, KeyEntry(key=new_key, state=KeyState.CURRENT, created_at=now)
            )

            # Drop EXPIRED entries and PENDING entries whose grace period has
            # lapsed, so the list stays bounded even if nobody reads the keys.
            self._cleanup_expired()

            # Notify post-rotation
            self._notify_post_rotate(old_key, new_key)

        return new_key

    # ---------- Key Access ----------

    @property
    def current_key(self) -> Optional[str]:
        """The key in CURRENT state, or None if there is none."""
        with self._lock:
            for entry in self._keys:
                if entry.state == KeyState.CURRENT:
                    return entry.key
            return None

    @property
    def active_keys(self) -> List[str]:
        """All currently valid keys (CURRENT + PENDING)."""
        with self._lock:
            self._cleanup_expired()
            return [
                e.key
                for e in self._keys
                if e.state in (KeyState.CURRENT, KeyState.PENDING)
            ]

    @property
    def pending_keys(self) -> List[str]:
        """Keys in grace period only."""
        with self._lock:
            self._cleanup_expired()
            return [e.key for e in self._keys if e.state == KeyState.PENDING]

    def is_valid(self, key: str) -> bool:
        """Check if a key is currently valid (CURRENT or PENDING).

        Candidates are compared with ``hmac.compare_digest`` and every active
        key is checked, so timing does not reveal how much of a key matched.
        Non-string input is never valid.
        """
        if not isinstance(key, str):
            return False
        # surrogatepass: any str can be encoded, so odd input returns False
        # instead of raising.
        candidate = key.encode("utf-8", "surrogatepass")
        matched = False
        for active in self.active_keys:
            if hmac.compare_digest(candidate, active.encode("utf-8", "surrogatepass")):
                matched = True
        return matched

    def _cleanup_expired(self) -> None:
        """Drop EXPIRED entries and entries whose ``expires_at`` has passed.

        Callers are expected to hold ``self._lock``.
        """
        now = time.time()
        self._keys = [
            e for e in self._keys
            if e.state != KeyState.EXPIRED
            and (e.expires_at is None or e.expires_at > now)
        ]

    # ---------- Hooks ----------

    def on_pre_rotate(self, callback: Callable[[], None]) -> None:
        """Register a callback invoked just before each rotation."""
        with self._lock:
            self._pre_rotate.append(callback)

    def on_post_rotate(self, callback: Callable[[str, str], None]) -> None:
        """Register a callback invoked as ``callback(old_key, new_key)`` after each rotation."""
        with self._lock:
            self._post_rotate.append(callback)

    def _notify_pre_rotate(self) -> None:
        """Run the pre-rotation hooks; failures are logged, not raised."""
        # Iterate a snapshot so a hook may register further hooks safely.
        for cb in list(self._pre_rotate):
            try:
                cb()
            except Exception:
                logging.getLogger(__name__).exception("pre_rotate hook failed")

    def _notify_post_rotate(self, old_key: Optional[str], new_key: str) -> None:
        """Run the post-rotation hooks; failures are logged, not raised."""
        for cb in list(self._post_rotate):
            try:
                cb(old_key, new_key)
            except Exception:
                # Keys are deliberately not included in the log record.
                logging.getLogger(__name__).exception("post_rotate hook failed")

    # ---------- Info ----------

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of key counts and configuration."""
        with self._lock:
            self._cleanup_expired()
            return {
                "total_keys": len(self._keys),
                "current": 1 if any(e.state == KeyState.CURRENT for e in self._keys) else 0,
                "pending": sum(1 for e in self._keys if e.state == KeyState.PENDING),
                "rotation_interval": self._interval,
                "grace_period": self._grace_period,
                "key_length": self._key_length,
                "running": self._running,
            }