Coverage for agentos/tools/id_generator.py: 0%
77 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:34 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:34 +0800
1"""
2IDGenerator — multi-format unique identifier generation.
4Supports:
5 - UUID4 (random)
6 - UUID7 (time-ordered, sortable)
7 - ULID (26-char Crockford base32, time-sortable)
8 - Nano ID (custom alphabet & length)
9 - Snowflake-like (timestamp + worker + sequence)
10 - KSUID (K-Sortable Unique IDentifier)
11 - XID (12-byte globally unique ID)
12 - Short ID (URL-safe, configurable length)
13"""
15from __future__ import annotations
17import os
18import secrets
19import struct
20import time
21import uuid
22from typing import Optional
25# ============================================================================
26# UUID7 (time-ordered UUID, RFC 9562 draft)
27# ============================================================================
29def uuid7() -> str:
30 """Generate a time-ordered UUIDv7 string."""
31 timestamp_ms = int(time.time() * 1000)
32 rand_bytes = secrets.token_bytes(10)
34 # UUID7 layout: 48-bit unix_ts_ms | 4-bit ver | 12-bit rand_a | 2-bit var | 62-bit rand_b
35 ts_bytes = struct.pack(">Q", timestamp_ms)[2:] # 6 bytes
36 b = bytearray(ts_bytes + rand_bytes)
38 # Set version to 7
39 b[6] = (b[6] & 0x0F) | 0x70
40 # Set variant to 10xx (RFC 4122)
41 b[8] = (b[8] & 0x3F) | 0x80
43 # Format as UUID
44 u = uuid.UUID(bytes=bytes(b))
45 return str(u)
48# ============================================================================
49# ULID
50# ============================================================================
52_CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
54def ulid() -> str:
55 """Generate a ULID (26-character Crockford base32)."""
56 ts = int(time.time() * 1000)
57 rand = secrets.token_bytes(10)
59 # Timestamp: 48 bits = 10 base32 chars
60 ts_part = ""
61 for _ in range(10):
62 ts_part = _CROCKFORD[ts & 0x1F] + ts_part
63 ts >>= 5
65 # Random: 80 bits = 16 base32 chars
66 rand_part = ""
67 r = int.from_bytes(rand, "big")
68 for _ in range(16):
69 rand_part = _CROCKFORD[r & 0x1F] + rand_part
70 r >>= 5
72 return ts_part + rand_part
75# ============================================================================
76# Nano ID
77# ============================================================================
79def nanoid(size: int = 21, alphabet: Optional[str] = None) -> str:
80 """Generate a Nano ID string.
82 Args:
83 size: Length of the ID (default 21)
84 alphabet: Custom alphabet (default URL-safe alphanumeric)
85 """
86 if alphabet is None:
87 alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-"
89 mask = (1 << ((len(alphabet) - 1).bit_length())) - 1
90 step = max(1, int(1.6 * mask * size / len(alphabet)))
92 result = []
93 while len(result) < size:
94 for byte in secrets.token_bytes(step):
95 idx = byte & mask
96 if idx < len(alphabet):
97 result.append(alphabet[idx])
98 if len(result) == size:
99 break
101 return "".join(result)
104# ============================================================================
105# Snowflake
106# ============================================================================
108class Snowflake:
109 """Snowflake-like distributed ID generator.
111 Layout (64 bits): timestamp(42) | worker(10) | sequence(12)
112 Custom epoch: 2024-01-01T00:00:00Z
113 """
115 CUSTOM_EPOCH = 1704067200000 # 2024-01-01T00:00:00Z in ms
117 def __init__(self, worker_id: int = 0):
118 if not (0 <= worker_id < 1024):
119 raise ValueError("worker_id must be 0-1023")
120 self._worker_id = worker_id
121 self._sequence = 0
122 self._last_ms = -1
124 def generate(self) -> int:
125 """Generate next snowflake ID."""
126 now = int(time.time() * 1000)
128 if now < self._last_ms:
129 # Clock moved backwards — wait
130 now = self._last_ms
132 if now == self._last_ms:
133 self._sequence = (self._sequence + 1) & 0xFFF
134 if self._sequence == 0:
135 # Sequence exhausted, wait for next millisecond
136 while now <= self._last_ms:
137 now = int(time.time() * 1000)
138 else:
139 self._sequence = 0
141 self._last_ms = now
142 ts = now - self.CUSTOM_EPOCH
144 return (ts << 22) | (self._worker_id << 12) | self._sequence
146 def generate_str(self) -> str:
147 """Generate a snowflake ID as string."""
148 return str(self.generate())
151# ============================================================================
152# Short ID
153# ============================================================================
155_SHORT_ALPHABET = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
157def short_id(length: int = 8) -> str:
158 """Generate a short URL-safe random ID."""
159 return "".join(secrets.choice(_SHORT_ALPHABET) for _ in range(length))
162# ============================================================================
163# Convenience
164# ============================================================================
166def uuid4() -> str:
167 """Standard random UUIDv4."""
168 return str(uuid.uuid4())
170def generate(style: str = "uuid4") -> str:
171 """Generate an ID in the requested style.
173 Supported: uuid4, uuid7, ulid, nanoid, short
174 """
175 generators = {
176 "uuid4": uuid4,
177 "uuid7": uuid7,
178 "ulid": ulid,
179 "nanoid": lambda: nanoid(),
180 "short": lambda: short_id(),
181 }
182 if style not in generators:
183 raise ValueError(f"Unknown style: {style}. Choose from {list(generators.keys())}")
184 return generators[style]()