Coverage for agentos/tools/url_signer.py: 0%

50 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 08:06 +0800

1""" 

2URLSigner — HMAC-based signed URL generation and verification. 

3 

4Supports: 

5 - Sign URLs with HMAC-SHA256 

6 - Expiry-based signed URLs 

7 - Path-based signature 

8 - Verification of signed URLs 

9 - Multiple signing algorithms (HS256, HS384, HS512) 

10""" 

11 

12from __future__ import annotations 

13 

14import hashlib 

15import hmac 

16import time 

17import urllib.parse 

18from typing import Optional, Tuple 

19 

20 

21# ============================================================================ 

22# URLSigner 

23# ============================================================================ 

24 

class URLSigner:
    """HMAC-based URL signing for secure temporary links.

    The signature covers the URL path plus every query parameter (sorted by
    key). Scheme, host and fragment are not part of the signed payload.

    Usage:
        signer = URLSigner(secret="my-secret-key")

        # Generate a signed URL that expires in 1 hour
        signed = signer.sign("https://example.com/files/report.pdf", ttl=3600)
        # → https://example.com/files/report.pdf?exp=...&sig=...

        # Verify a signed URL
        ok, error = signer.verify(signed)
        if ok:
            serve(signed)
        else:
            reject(error)
    """

    def __init__(self, secret: str, algorithm: str = "HS256"):
        """Create a signer.

        Args:
            secret: Shared secret; encoded as UTF-8 and used as the HMAC key.
            algorithm: One of "HS256", "HS384" or "HS512".

        Raises:
            ValueError: If ``algorithm`` is not one of the supported names.
        """
        self._secret = secret.encode("utf-8")
        algorithms = {"HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512}
        if algorithm not in algorithms:
            raise ValueError(f"Unsupported algorithm: {algorithm}. Use HS256/HS384/HS512")
        self._hash_func = algorithms[algorithm]
        self._algorithm = algorithm

    def sign(
        self,
        url: str,
        ttl: Optional[float] = None,
        extra_params: Optional[dict] = None,
    ) -> str:
        """Sign a URL with optional TTL and extra params.

        Args:
            url: The URL to sign. Existing query parameters are kept and
                signed; an existing ``sig`` parameter is replaced.
            ttl: Time-to-live in seconds. None = no expiry.
            extra_params: Additional query params to include in the URL and
                in the signature. Values are converted with ``str()``.

        Returns:
            The URL with an ``exp`` parameter (when ``ttl`` is given) and a
            ``sig`` parameter appended to its query string.
        """
        parsed = urllib.parse.urlparse(url)
        # keep_blank_values: do not silently drop parameters such as "?flag=".
        params = dict(urllib.parse.parse_qsl(parsed.query, keep_blank_values=True))

        if extra_params:
            # Sorted so the emitted query order is stable across calls.
            for k, v in sorted(extra_params.items()):
                params[k] = str(v)

        if ttl is not None:
            params["exp"] = str(int(time.time() + ttl))

        # verify() strips "sig" before recomputing, so a stale signature (from
        # re-signing an already signed URL) must not enter the payload.
        params.pop("sig", None)

        params["sig"] = self._compute_signature(parsed.path, params)

        new_query = urllib.parse.urlencode(params)
        return urllib.parse.urlunparse(parsed._replace(query=new_query))

    def verify(self, url: str) -> Tuple[bool, Optional[str]]:
        """Verify a signed URL.

        The URL is treated as untrusted input: malformed values produce a
        ``(False, message)`` result instead of an exception.

        Returns:
            ``(True, None)`` when the signature matches and the URL has not
            expired, otherwise ``(False, error_message)``.
        """
        try:
            parsed = urllib.parse.urlparse(url)
            pairs = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True)
        except ValueError:
            # e.g. urlparse rejects an unbalanced IPv6 bracket in the host.
            return False, "Malformed URL"

        params = dict(pairs)
        has_duplicates = len(params) != len(pairs)

        sig = params.pop("sig", None)
        if not sig:
            return False, "Missing signature"

        # dict() keeps only the last value of a repeated key, while a consumer
        # may read the first one; URLs produced by sign() never repeat keys.
        if has_duplicates:
            return False, "Duplicate query parameter"

        # Check expiry (do not pop — must remain for signature recalculation)
        exp = params.get("exp")
        if exp:
            try:
                exp_val = int(exp)
            except ValueError:
                return False, "Invalid expiry"
            if time.time() > exp_val:
                try:
                    expired_at = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(exp_val))
                except (OverflowError, OSError, ValueError):
                    # Timestamp outside the range the platform can represent.
                    return False, "Invalid expiry"
                return False, f"URL expired at {expired_at}"

        expected = self._compute_signature(parsed.path, params)

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters.
        if not hmac.compare_digest(sig.encode("utf-8"), expected.encode("utf-8")):
            return False, "Invalid signature"

        return True, None

    def _compute_signature(self, path: str, params: dict) -> str:
        """Return the hex HMAC of ``path`` followed by ``|key=value`` pairs.

        Pairs are appended in sorted key order and the payload is encoded as
        UTF-8 before hashing with the configured algorithm.
        """
        # NOTE(review): keys and values are not escaped, so a value containing
        # "|" or "=" can yield the same payload as a different parameter set.
        # The format is kept unchanged so already-issued URLs still verify.
        payload = path + "".join(f"|{k}={params[k]}" for k in sorted(params))
        return hmac.new(self._secret, payload.encode("utf-8"), self._hash_func).hexdigest()

111 v = params[k] 

112 data += f"|{k}={v}".encode("utf-8") 

113 return hmac.new(self._secret, data, self._hash_func).hexdigest()