Coverage for agentos/tools/id_generator.py: 0%

77 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 08:34 +0800

1""" 

2IDGenerator — multi-format unique identifier generation. 

3 

4Supports: 

5 - UUID4 (random) 

6 - UUID7 (time-ordered, sortable) 

7 - ULID (26-char Crockford base32, time-sortable) 

8 - Nano ID (custom alphabet & length) 

9 - Snowflake-like (timestamp + worker + sequence) 

10 - KSUID (K-Sortable Unique IDentifier) 

11 - XID (12-byte globally unique ID) 

12 - Short ID (URL-safe, configurable length) 

13""" 

14 

15from __future__ import annotations 

16 

17import os 

18import secrets 

19import struct 

20import time 

21import uuid 

22from typing import Optional 

23 

24 

25# ============================================================================ 

26# UUID7 (time-ordered UUID, RFC 9562) 

27# ============================================================================ 

28 

def uuid7() -> str:
    """Return a time-ordered UUID version 7 in canonical string form.

    Layout (128 bits):
        48-bit Unix timestamp in milliseconds | 4-bit version | 12 random bits
        | 2-bit variant | 62 random bits

    Returns:
        The 36-character hyphenated UUID string.
    """
    unix_ms = int(time.time() * 1000)
    entropy = secrets.token_bytes(10)

    # Pack as an unsigned 64-bit big-endian integer and keep the low six
    # bytes, i.e. the 48-bit timestamp field.
    raw = bytearray(struct.pack(">Q", unix_ms)[2:])
    raw += entropy

    # High nibble of byte 6 carries the version number (7).
    raw[6] = 0x70 | (raw[6] & 0x0F)
    # Top two bits of byte 8 carry the variant marker (binary 10).
    raw[8] = 0x80 | (raw[8] & 0x3F)

    return str(uuid.UUID(bytes=bytes(raw)))

46 

47 

48# ============================================================================ 

49# ULID 

50# ============================================================================ 

51 

# Crockford base32 symbol set (the letters I, L, O and U are absent).
_CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def ulid() -> str:
    """Return a 26-character ULID encoded in Crockford base32.

    The first 10 characters encode the current Unix time in milliseconds
    and the remaining 16 characters encode 80 random bits. Both parts are
    written most-significant symbol first.
    """
    millis = int(time.time() * 1000)
    entropy = int.from_bytes(secrets.token_bytes(10), "big")

    # Ten 5-bit groups of the timestamp, highest group first.
    time_symbols = [
        _CROCKFORD[(millis >> shift) & 0x1F] for shift in range(45, -1, -5)
    ]
    # Sixteen 5-bit groups (80 bits) of randomness, highest group first.
    random_symbols = [
        _CROCKFORD[(entropy >> shift) & 0x1F] for shift in range(75, -1, -5)
    ]

    return "".join(time_symbols) + "".join(random_symbols)

73 

74 

75# ============================================================================ 

76# Nano ID 

77# ============================================================================ 

78 

def nanoid(size: int = 21, alphabet: Optional[str] = None) -> str:
    """Generate a Nano ID string.

    Symbols are chosen by rejection sampling: random bits are masked down
    to the smallest power-of-two range covering the alphabet, and any
    candidate index that falls outside the alphabet is discarded. This
    keeps every symbol equally likely.

    Args:
        size: Length of the ID (default 21). A size of zero or less yields
            an empty string.
        alphabet: Custom alphabet (default: the 64-symbol set of digits,
            ASCII letters, ``_`` and ``-``). Alphabets of any non-zero
            length are supported.

    Returns:
        A string of ``size`` symbols drawn from ``alphabet``.

    Raises:
        ValueError: If ``alphabet`` is empty.
    """
    if alphabet is None:
        alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-"

    alphabet_len = len(alphabet)
    if alphabet_len == 0:
        # Previously surfaced as an incidental ZeroDivisionError.
        raise ValueError("alphabet must not be empty")
    if size <= 0:
        return ""

    # Smallest (2**k - 1) mask that covers every valid index.
    mask = (1 << (alphabet_len - 1).bit_length()) - 1
    # Bytes needed per candidate index. One byte is enough up to 256
    # symbols; larger alphabets need more, otherwise symbols at index 256
    # and above could never be selected.
    width = max(1, (mask.bit_length() + 7) // 8)
    # Number of candidates to draw per batch, over-provisioned to absorb
    # rejected candidates.
    step = max(1, int(1.6 * mask * size / alphabet_len))

    result = []
    while True:
        pool = secrets.token_bytes(step * width)
        for offset in range(0, len(pool), width):
            idx = int.from_bytes(pool[offset:offset + width], "big") & mask
            if idx < alphabet_len:
                result.append(alphabet[idx])
                if len(result) >= size:
                    return "".join(result)

102 

103 

104# ============================================================================ 

105# Snowflake 

106# ============================================================================ 

107 

class Snowflake:
    """Snowflake-style 64-bit ID generator.

    Bit layout, most significant first:
        timestamp (42 bits) | worker (10 bits) | sequence (12 bits)

    Timestamps are milliseconds elapsed since ``CUSTOM_EPOCH``
    (2024-01-01T00:00:00Z). Each instance keeps its own sequence counter
    and last-used timestamp; no locking is performed in this class.
    """

    CUSTOM_EPOCH = 1704067200000  # 2024-01-01T00:00:00Z in ms

    def __init__(self, worker_id: int = 0):
        """Create a generator bound to ``worker_id``.

        Args:
            worker_id: Value placed in the 10-bit worker field (0-1023).

        Raises:
            ValueError: If ``worker_id`` is outside 0-1023.
        """
        fits_worker_field = 0 <= worker_id < 1024
        if not fits_worker_field:
            raise ValueError("worker_id must be 0-1023")
        self._last_ms = -1
        self._sequence = 0
        self._worker_id = worker_id

    def generate(self) -> int:
        """Return the next ID as an integer."""
        # If the clock reads earlier than the last issued timestamp, reuse
        # the last timestamp instead of going backwards.
        current_ms = max(int(time.time() * 1000), self._last_ms)

        if current_ms != self._last_ms:
            # New millisecond: restart the per-millisecond counter.
            self._sequence = 0
        else:
            self._sequence = (self._sequence + 1) & 0xFFF
            if not self._sequence:
                # All 4096 sequence values are used up for this
                # millisecond; spin until the clock moves past it.
                while current_ms <= self._last_ms:
                    current_ms = int(time.time() * 1000)

        self._last_ms = current_ms
        elapsed = current_ms - self.CUSTOM_EPOCH

        return (elapsed << 22) | (self._worker_id << 12) | self._sequence

    def generate_str(self) -> str:
        """Return the next ID rendered as a decimal string."""
        next_id = self.generate()
        return str(next_id)

149 

150 

151# ============================================================================ 

152# Short ID 

153# ============================================================================ 

154 

# Symbol set for short IDs: digits 2-9 plus ASCII letters, leaving out
# 0, 1, I, O and lowercase l.
_SHORT_ALPHABET = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def short_id(length: int = 8) -> str:
    """Return a random ID of ``length`` symbols drawn from ``_SHORT_ALPHABET``.

    Args:
        length: Number of symbols to produce (default 8).
    """
    pick = secrets.choice
    symbols = [pick(_SHORT_ALPHABET) for _ in range(length)]
    return "".join(symbols)

160 

161 

162# ============================================================================ 

163# Convenience 

164# ============================================================================ 

165 

def uuid4() -> str:
    """Return a random (version 4) UUID in canonical string form."""
    random_uuid = uuid.uuid4()
    return str(random_uuid)

169 

def generate(style: str = "uuid4") -> str:
    """Produce one ID using the generator registered under ``style``.

    Supported styles: uuid4, uuid7, ulid, nanoid, short. Each generator is
    invoked with its default arguments.

    Args:
        style: Name of the ID format to produce (default "uuid4").

    Returns:
        The generated ID string.

    Raises:
        ValueError: If ``style`` is not one of the supported names.
    """
    generators = {
        "uuid4": uuid4,
        "uuid7": uuid7,
        "ulid": ulid,
        "nanoid": nanoid,
        "short": short_id,
    }
    factory = generators.get(style)
    if factory is None:
        raise ValueError(f"Unknown style: {style}. Choose from {list(generators.keys())}")
    return factory()