Coverage for agentos/tools/url_signer.py: 0%
50 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:06 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:06 +0800
1"""
2URLSigner — HMAC-based signed URL generation and verification.
4Supports:
5 - Sign URLs with HMAC-SHA256
6 - Expiry-based signed URLs
7 - Path-based signature
8 - Verification of signed URLs
9 - Multiple signing algorithms (HS256, HS384, HS512)
10"""
12from __future__ import annotations
14import hashlib
15import hmac
16import time
17import urllib.parse
18from typing import Optional, Tuple
21# ============================================================================
22# URLSigner
23# ============================================================================
25class URLSigner:
26 """HMAC-based URL signing for secure temporary links.
28 Usage:
29 signer = URLSigner(secret="my-secret-key")
31 # Generate a signed URL that expires in 1 hour
32 signed = signer.sign("https://example.com/files/report.pdf", ttl=3600)
33 # → https://example.com/files/report.pdf?sig=...&exp=...
35 # Verify a signed URL
36 ok, path = signer.verify(signed)
37 if ok:
38 serve(path)
39 """
41 def __init__(self, secret: str, algorithm: str = "HS256"):
42 self._secret = secret.encode("utf-8")
43 algorithms = {"HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512}
44 if algorithm not in algorithms:
45 raise ValueError(f"Unsupported algorithm: {algorithm}. Use HS256/HS384/HS512")
46 self._hash_func = algorithms[algorithm]
47 self._algorithm = algorithm
49 def sign(
50 self,
51 url: str,
52 ttl: Optional[float] = None,
53 extra_params: Optional[dict] = None,
54 ) -> str:
55 """Sign a URL with optional TTL and extra params.
57 Args:
58 url: The URL to sign
59 ttl: Time-to-live in seconds. None = no expiry
60 extra_params: Additional query params to include in signature
61 """
62 parsed = urllib.parse.urlparse(url)
63 params = dict(urllib.parse.parse_qsl(parsed.query))
65 # Build signature payload
66 path = parsed.path
67 if extra_params:
68 for k, v in sorted(extra_params.items()):
69 params[k] = str(v)
71 if ttl is not None:
72 exp = int(time.time() + ttl)
73 params["exp"] = str(exp)
75 # Generate signature over path + sorted params
76 sig = self._compute_signature(path, params)
78 params["sig"] = sig
80 new_query = urllib.parse.urlencode(params)
81 return urllib.parse.urlunparse(parsed._replace(query=new_query))
83 def verify(self, url: str) -> Tuple[bool, Optional[str]]:
84 """Verify a signed URL. Returns (is_valid, error_message)."""
85 parsed = urllib.parse.urlparse(url)
86 params = dict(urllib.parse.parse_qsl(parsed.query))
88 sig = params.pop("sig", None)
89 if not sig:
90 return False, "Missing signature"
92 # Check expiry (do not pop — must remain for signature recalculation)
93 exp = params.get("exp")
94 if exp:
95 exp_val = int(exp)
96 if time.time() > exp_val:
97 return False, f"URL expired at {time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(exp_val))}"
99 # Recompute
100 path = parsed.path
101 expected = self._compute_signature(path, params)
103 if not hmac.compare_digest(sig, expected):
104 return False, "Invalid signature"
106 return True, None
108 def _compute_signature(self, path: str, params: dict) -> str:
109 data = path.encode("utf-8")
110 for k in sorted(params.keys()):
111 v = params[k]
112 data += f"|{k}={v}".encode("utf-8")
113 return hmac.new(self._secret, data, self._hash_func).hexdigest()