Coverage for agentos/tools/totp.py: 0%

80 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 08:12 +0800

1""" 

2TOTP — Time-based One-Time Password (RFC 6238). 

3 

4Supports: 

5 - TOTP generation (SHA1/SHA256/SHA512) 

6 - Configurable digits (6/8) and period (30s default) 

7 - Key URI generation for QR codes (otpauth://) 

8 - Verification with drift tolerance 

9 - HOTP (counter-based) support (RFC 4226) 

10""" 

11 

12from __future__ import annotations 

13 

14import base64 

15import hashlib 

16import hmac 

17import struct 

18import time 

19from typing import Optional, Tuple 

20from urllib.parse import quote, urlencode 

21 

22 

23# ============================================================================ 

24# TOTP / HOTP 

25# ============================================================================ 

26 

27class TOTP: 

28 """Time-based One-Time Password generator (RFC 6238). 

29 

30 Usage: 

31 totp = TOTP(secret="JBSWY3DPEHPK3PXP") 

32 code = totp.now() # 6-digit code 

33 ok = totp.verify(code) # True/False 

34 uri = totp.to_uri("user@example.com", "MyApp") # QR code URI 

35 """ 

36 

37 def __init__( 

38 self, 

39 secret: str, 

40 digits: int = 6, 

41 period: int = 30, 

42 algorithm: str = "SHA1", 

43 ): 

44 self._secret = secret.upper().replace(" ", "") 

45 self._digits = digits 

46 self._period = period 

47 algorithms = {"SHA1": hashlib.sha1, "SHA256": hashlib.sha256, "SHA512": hashlib.sha512} 

48 if algorithm not in algorithms: 

49 raise ValueError(f"Unsupported algorithm: {algorithm}") 

50 self._hash_func = algorithms[algorithm] 

51 self._algorithm = algorithm 

52 

53 @property 

54 def secret(self) -> str: 

55 return self._secret 

56 

57 @property 

58 def digits(self) -> int: 

59 return self._digits 

60 

61 @property 

62 def period(self) -> int: 

63 return self._period 

64 

65 @property 

66 def algorithm(self) -> str: 

67 return self._algorithm 

68 

69 # ---------- Generation ---------- 

70 

71 def now(self) -> str: 

72 """Generate the current TOTP code.""" 

73 return self.at(int(time.time())) 

74 

75 def at(self, timestamp: int) -> str: 

76 """Generate TOTP code for a specific Unix timestamp.""" 

77 counter = timestamp // self._period 

78 return self._generate(counter) 

79 

80 # ---------- Verification ---------- 

81 

82 def verify( 

83 self, 

84 code: str, 

85 drift: int = 1, 

86 timestamp: Optional[int] = None, 

87 ) -> bool: 

88 """Verify a TOTP code with optional drift tolerance. 

89 

90 Args: 

91 code: The code to verify 

92 drift: Number of periods before/after to check (default 1 = +/-30s) 

93 timestamp: Reference timestamp, defaults to now 

94 """ 

95 ts = timestamp or int(time.time()) 

96 for offset in range(-drift, drift + 1): 

97 if self.at(ts + offset * self._period) == code: 

98 return True 

99 return False 

100 

101 # ---------- URI ---------- 

102 

103 def to_uri(self, account: str, issuer: Optional[str] = None) -> str: 

104 """Generate otpauth:// URI for QR code. 

105 

106 Args: 

107 account: User account (e.g., email) 

108 issuer: Service name 

109 """ 

110 label = account 

111 if issuer: 

112 label = f"{issuer}:{account}" 

113 

114 params = { 

115 "secret": self._secret, 

116 "digits": str(self._digits), 

117 "period": str(self._period), 

118 "algorithm": self._algorithm, 

119 } 

120 if issuer: 

121 params["issuer"] = issuer 

122 

123 query = urlencode(params) 

124 return f"otpauth://totp/{quote(label)}?{query}" 

125 

126 # ---------- Internal ---------- 

127 

128 def _generate(self, counter: int) -> str: 

129 """Generate HOTP code for a given counter.""" 

130 key = base64.b64decode(self._pad_base64(self._secret)) 

131 msg = struct.pack(">Q", counter) 

132 h = hmac.new(key, msg, self._hash_func).digest() 

133 offset = h[-1] & 0x0F 

134 binary = struct.unpack(">I", h[offset:offset + 4])[0] & 0x7FFFFFFF 

135 mod = 10 ** self._digits 

136 return str(binary % mod).zfill(self._digits) 

137 

138 @staticmethod 

139 def _pad_base64(s: str) -> str: 

140 """Pad base32 string for base64 decoding (base32 → base64).""" 

141 # Convert base32 to bytes, then encode as base64 

142 # Standard base32 alphabet: A-Z 2-7, padding = 

143 missing_padding = len(s) % 8 

144 if missing_padding: 

145 s += "=" * (8 - missing_padding) 

146 raw = base64.b32decode(s) 

147 return base64.b64encode(raw).decode("ascii") 

148 

149 @classmethod 

150 def generate_secret(cls, length: int = 32) -> str: 

151 """Generate a random base32 secret.""" 

152 import secrets 

153 raw = secrets.token_bytes(length) 

154 return base64.b32encode(raw).decode("ascii").rstrip("=") 

155 

156 

157class HOTP(TOTP): 

158 """HMAC-based One-Time Password (RFC 4226). 

159 

160 Usage: 

161 hotp = HOTP(secret="JBSWY3DPEHPK3PXP") 

162 code = hotp.at(0) # Generate for counter 0 

163 ok = hotp.verify(code, counter=0) 

164 """ 

165 

166 def __init__( 

167 self, 

168 secret: str, 

169 digits: int = 6, 

170 algorithm: str = "SHA1", 

171 ): 

172 super().__init__(secret=secret, digits=digits, period=1, algorithm=algorithm) 

173 

174 def at(self, counter: int) -> str: 

175 return self._generate(counter) 

176 

177 def verify( 

178 self, 

179 code: str, 

180 counter: int, 

181 look_ahead: int = 10, 

182 ) -> Tuple[bool, Optional[int]]: 

183 """Verify HOTP code, returns (is_valid, matched_counter).""" 

184 for c in range(counter, counter + look_ahead + 1): 

185 if self.at(c) == code: 

186 return True, c 

187 return False, None