Coverage for agentos/tools/totp.py: 0%
80 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:12 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:12 +0800
1"""
2TOTP — Time-based One-Time Password (RFC 6238).
4Supports:
5 - TOTP generation (SHA1/SHA256/SHA512)
6 - Configurable digits (6/8) and period (30s default)
7 - Key URI generation for QR codes (otpauth://)
8 - Verification with drift tolerance
9 - HOTP (counter-based) support (RFC 4226)
10"""
12from __future__ import annotations
14import base64
15import hashlib
16import hmac
17import struct
18import time
19from typing import Optional, Tuple
20from urllib.parse import quote, urlencode
23# ============================================================================
24# TOTP / HOTP
25# ============================================================================
class TOTP:
    """Time-based One-Time Password generator (RFC 6238).

    The secret is a base32 string; it is upper-cased and stripped of spaces
    on construction, and decoded lazily each time a code is generated.

    Usage:
        totp = TOTP(secret="JBSWY3DPEHPK3PXP")
        code = totp.now()                               # 6-digit code
        ok = totp.verify(code)                          # True/False
        uri = totp.to_uri("user@example.com", "MyApp")  # QR code URI
    """

    # Supported HMAC digests, keyed by the name written into otpauth:// URIs.
    _ALGORITHMS = {
        "SHA1": hashlib.sha1,
        "SHA256": hashlib.sha256,
        "SHA512": hashlib.sha512,
    }

    def __init__(
        self,
        secret: str,
        digits: int = 6,
        period: int = 30,
        algorithm: str = "SHA1",
    ):
        """Create a generator.

        Args:
            secret: Base32-encoded shared secret (case and spaces ignored).
            digits: Number of digits in each code; must be at least 1.
            period: Length of one time step in seconds; must be at least 1.
            algorithm: One of "SHA1", "SHA256", "SHA512".

        Raises:
            ValueError: If the algorithm is unsupported, or digits/period
                is smaller than 1.
        """
        self._secret = secret.upper().replace(" ", "")
        if algorithm not in self._ALGORITHMS:
            raise ValueError(f"Unsupported algorithm: {algorithm}")
        # Fail at construction instead of with ZeroDivisionError/struct.error
        # (or a meaningless code) on first use.
        if digits < 1:
            raise ValueError(f"digits must be >= 1, got {digits}")
        if period < 1:
            raise ValueError(f"period must be >= 1, got {period}")
        self._digits = digits
        self._period = period
        self._hash_func = self._ALGORITHMS[algorithm]
        self._algorithm = algorithm

    @property
    def secret(self) -> str:
        """Normalized (upper-case, space-free) base32 secret."""
        return self._secret

    @property
    def digits(self) -> int:
        """Number of digits in generated codes."""
        return self._digits

    @property
    def period(self) -> int:
        """Time-step length in seconds."""
        return self._period

    @property
    def algorithm(self) -> str:
        """Name of the HMAC digest in use."""
        return self._algorithm

    # ---------- Generation ----------

    def now(self) -> str:
        """Generate the code for the current system time."""
        return self.at(int(time.time()))

    def at(self, timestamp: int) -> str:
        """Generate the code for a specific Unix timestamp.

        The timestamp is truncated to an integer before being divided into
        time steps, so float timestamps (e.g. ``time.time()``) are accepted.
        """
        return self._generate(int(timestamp) // self._period)

    # ---------- Verification ----------

    def verify(
        self,
        code: str,
        drift: int = 1,
        timestamp: Optional[int] = None,
    ) -> bool:
        """Verify a code, tolerating clock drift.

        Args:
            code: The code to verify. Non-string values never match.
            drift: Number of periods before/after the reference time to
                accept (default 1 = one period either side).
            timestamp: Reference Unix timestamp; defaults to the current
                time. ``0`` is honored as a real timestamp.

        Returns:
            True if the code matches any time step in the window.
        """
        # `is None`, not truthiness: timestamp=0 is a legitimate reference.
        reference = int(time.time()) if timestamp is None else timestamp
        for step in range(-drift, drift + 1):
            candidate_ts = reference + step * self._period
            if candidate_ts < 0:
                # Time steps before the epoch have no valid (unsigned) counter.
                continue
            if self._codes_match(self.at(candidate_ts), code):
                return True
        return False

    # ---------- URI ----------

    def to_uri(self, account: str, issuer: Optional[str] = None) -> str:
        """Build the otpauth:// provisioning URI (typically shown as a QR code).

        Args:
            account: User account (e.g., email).
            issuer: Service name; when given it prefixes the label and is
                also added as the ``issuer`` query parameter.
        """
        label = f"{issuer}:{account}" if issuer else account
        params = {
            "secret": self._secret,
            "digits": str(self._digits),
            "period": str(self._period),
            "algorithm": self._algorithm,
        }
        if issuer:
            params["issuer"] = issuer
        # quote_via=quote percent-encodes spaces as %20; the default
        # (quote_plus) would emit '+', which is form encoding, not URI encoding.
        query = urlencode(params, quote_via=quote)
        return f"otpauth://totp/{quote(label)}?{query}"

    # ---------- Internal ----------

    def _generate(self, counter: int) -> str:
        """Generate the HOTP code for a given counter (dynamic truncation)."""
        digest = hmac.new(
            self._decode_secret(), struct.pack(">Q", counter), self._hash_func
        ).digest()
        # The low nibble of the last byte selects a 4-byte window in the digest.
        start = digest[-1] & 0x0F
        # Mask the top bit so the value is a non-negative 31-bit integer.
        truncated = struct.unpack(">I", digest[start:start + 4])[0] & 0x7FFFFFFF
        return str(truncated % (10 ** self._digits)).zfill(self._digits)

    def _decode_secret(self) -> bytes:
        """Decode the base32 secret to raw key bytes, restoring '=' padding."""
        padded = self._secret + "=" * (-len(self._secret) % 8)
        return base64.b32decode(padded)

    @staticmethod
    def _codes_match(expected: str, candidate: object) -> bool:
        """Compare a generated code with user input in constant time."""
        if not isinstance(candidate, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            expected.encode("ascii"),
            candidate.encode("utf-8", "surrogatepass"),
        )

    @staticmethod
    def _pad_base64(s: str) -> str:
        """Re-encode a (possibly unpadded) base32 string as base64 text.

        No longer used by code generation; kept so existing callers of this
        helper keep working.
        """
        missing_padding = len(s) % 8
        if missing_padding:
            s += "=" * (8 - missing_padding)
        raw = base64.b32decode(s)
        return base64.b64encode(raw).decode("ascii")

    @classmethod
    def generate_secret(cls, length: int = 32) -> str:
        """Generate a random base32 secret from ``length`` random bytes.

        The trailing '=' padding is stripped from the returned string.
        """
        import secrets

        raw = secrets.token_bytes(length)
        return base64.b32encode(raw).decode("ascii").rstrip("=")
class HOTP(TOTP):
    """HMAC-based One-Time Password (RFC 4226).

    Codes are derived from an explicit counter rather than from the clock.
    The inherited ``now()`` feeds the current Unix time to ``at()`` as if it
    were a counter, so it is not meaningful for HOTP; use ``at(counter)``.

    Usage:
        hotp = HOTP(secret="JBSWY3DPEHPK3PXP")
        code = hotp.at(0)                       # Generate for counter 0
        ok, matched = hotp.verify(code, counter=0)
    """

    def __init__(
        self,
        secret: str,
        digits: int = 6,
        algorithm: str = "SHA1",
    ):
        """Create a generator; the period is fixed at 1 because it is unused."""
        super().__init__(secret=secret, digits=digits, period=1, algorithm=algorithm)

    def at(self, counter: int) -> str:
        """Generate the code for the given counter value."""
        return self._generate(counter)

    def verify(
        self,
        code: str,
        counter: int,
        look_ahead: int = 10,
    ) -> Tuple[bool, Optional[int]]:
        """Verify an HOTP code against a window of counters.

        Args:
            code: The code to verify. Non-string values never match.
            counter: First counter value to try.
            look_ahead: How many counters past ``counter`` to also try.

        Returns:
            ``(True, matched_counter)`` on success, ``(False, None)`` otherwise.
        """
        if not isinstance(code, str):
            return False, None
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        supplied = code.encode("utf-8", "surrogatepass")
        for candidate in range(counter, counter + look_ahead + 1):
            expected = self.at(candidate).encode("ascii")
            # Constant-time comparison avoids leaking how many digits matched.
            if hmac.compare_digest(expected, supplied):
                return True, candidate
        return False, None

    def to_uri(
        self,
        account: str,
        issuer: Optional[str] = None,
        counter: int = 0,
    ) -> str:
        """Build the otpauth://hotp/ provisioning URI.

        Overrides the inherited builder, which emits a ``totp`` URI carrying
        a ``period`` and no ``counter``.

        Args:
            account: User account (e.g., email).
            issuer: Service name; when given it prefixes the label and is
                also added as the ``issuer`` query parameter.
            counter: Counter value written into the URI.
        """
        label = f"{issuer}:{account}" if issuer else account
        params = {
            "secret": self._secret,
            "digits": str(self._digits),
            "algorithm": self._algorithm,
            "counter": str(counter),
        }
        if issuer:
            params["issuer"] = issuer
        # quote_via=quote percent-encodes spaces as %20 instead of '+'.
        query = urlencode(params, quote_via=quote)
        return f"otpauth://hotp/{quote(label)}?{query}"