Coverage for src / tracekit / analyzers / packet / metrics.py: 100%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Packet metrics for stream analysis. 

2 

3This module provides throughput, jitter, and loss rate metrics 

4for packet stream analysis. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.packet.metrics import throughput, jitter, loss_rate 

9 >>> rate = throughput(packets) 

10 >>> jitter_stats = jitter(packets) 

11 

12References: 

13 RFC 3550 for jitter calculation 

14""" 

15 

16from __future__ import annotations 

17 

18from dataclasses import dataclass 

19from typing import TYPE_CHECKING, Any 

20 

21import numpy as np 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Iterator, Sequence 

25 

26 from numpy.typing import NDArray 

27 

28 

@dataclass
class PacketInfo:
    """One observed packet, as consumed by the metric functions.

    Attributes:
        timestamp: Arrival time of the packet, in seconds.
        size: Length of the packet, in bytes.
        sequence: Sequence number used for loss detection, or None when
            the packet carries no sequence number.
    """

    timestamp: float
    size: int
    sequence: int | None = None

42 

43 

@dataclass
class ThroughputResult:
    """Outcome of a throughput measurement.

    Attributes:
        bytes_per_second: Data rate expressed in bytes/second.
        bits_per_second: Data rate expressed in bits/second.
        packets_per_second: Packet rate.
        total_bytes: Sum of packet sizes over the measurement period.
        total_packets: Number of packets in the measurement period.
        duration: Length of the measurement period, in seconds.
    """

    bytes_per_second: float
    bits_per_second: float
    packets_per_second: float
    total_bytes: int
    total_packets: int
    duration: float

63 

64 

@dataclass
class JitterResult:
    """Outcome of a jitter measurement.

    Attributes:
        mean: Average inter-arrival time.
        std: Standard deviation of the inter-arrival times.
        min: Smallest inter-arrival time.
        max: Largest inter-arrival time.
        jitter_rfc3550: Jitter estimate following RFC 3550.
    """

    mean: float
    std: float
    min: float
    max: float
    jitter_rfc3550: float

82 

83 

@dataclass
class LossResult:
    """Outcome of a packet loss measurement.

    Attributes:
        loss_rate: Share of packets lost, as a fraction between 0 and 1.
        loss_percentage: The same share expressed as a percentage.
        packets_lost: Estimated count of lost packets.
        packets_received: Count of packets that were received.
        gaps: Missing sequence ranges as (start_seq, end_seq) tuples.
    """

    loss_rate: float
    loss_percentage: float
    packets_lost: int
    packets_received: int
    gaps: list[tuple[int, int]]

101 

102 

@dataclass
class LatencyResult:
    """Outcome of a request-response latency measurement.

    Attributes:
        mean: Average latency, in seconds.
        std: Standard deviation of the latency.
        min: Smallest latency.
        max: Largest latency.
        p50: Median (50th percentile) latency.
        p95: 95th percentile latency.
        p99: 99th percentile latency.
        samples: Number of samples the statistics are based on.
    """

    mean: float
    std: float
    min: float
    max: float
    p50: float
    p95: float
    p99: float
    samples: int

126 

127 

def throughput(
    packets: Sequence[PacketInfo] | Iterator[PacketInfo],
    *,
    window_size: float | None = None,
) -> ThroughputResult:
    """Summarise the data rate and packet rate of a packet stream.

    Rates are averaged over the span between the earliest and the latest
    packet timestamp.

    Args:
        packets: Sequence or iterator of packets; it is consumed once.
        window_size: Accepted as a keyword argument, but this function does
            not read it; the result always covers the entire sequence.
            See ``windowed_throughput`` for per-window rates.

    Returns:
        ThroughputResult with the rates and totals. With fewer than two
        packets the rates and the duration are reported as 0.0.

    Example:
        >>> packets = [PacketInfo(t, sz) for t, sz in data]
        >>> result = throughput(packets)
        >>> print(f"Throughput: {result.bits_per_second / 1e6:.2f} Mbps")
    """
    captured = list(packets)
    total_packets = len(captured)
    total_bytes = sum(pkt.size for pkt in captured)

    if total_packets < 2:
        # Fewer than two timestamps span no time, so every rate is zero.
        return ThroughputResult(
            bytes_per_second=0.0,
            bits_per_second=0.0,
            packets_per_second=0.0,
            total_bytes=total_bytes,
            total_packets=total_packets,
            duration=0.0,
        )

    # Only the first and last arrival matter for the measurement span.
    stamps = sorted(pkt.timestamp for pkt in captured)
    span = stamps[-1] - stamps[0]

    # Avoid division by zero when all packets share one timestamp.
    duration = 1e-9 if span <= 0 else span

    byte_rate = total_bytes / duration
    return ThroughputResult(
        bytes_per_second=byte_rate,
        bits_per_second=byte_rate * 8,
        packets_per_second=total_packets / duration,
        total_bytes=total_bytes,
        total_packets=total_packets,
        duration=duration,
    )

181 

182 

def jitter(
    packets: Sequence[PacketInfo] | Iterator[PacketInfo],
) -> JitterResult:
    """Measure inter-arrival time statistics of a packet stream.

    Packets are ordered by timestamp, the gaps between consecutive
    arrivals are computed, and summary statistics plus an RFC 3550 style
    smoothed jitter estimate are reported.

    Args:
        packets: Sequence or iterator of packets with timestamps.

    Returns:
        JitterResult with the jitter metrics. All fields are 0.0 when
        fewer than two packets are supplied.

    Example:
        >>> result = jitter(packets)
        >>> print(f"Jitter: {result.std * 1000:.3f} ms")

    References:
        RFC 3550 Section A.8 for jitter calculation
    """
    captured = list(packets)

    if len(captured) < 2:
        return JitterResult(mean=0.0, std=0.0, min=0.0, max=0.0, jitter_rfc3550=0.0)

    # Inter-arrival times between consecutive packets in time order.
    arrivals = np.array(sorted(pkt.timestamp for pkt in captured))
    gaps = np.diff(arrivals)

    # RFC 3550 jitter estimate (smoothed absolute deviation):
    #   J(i) = J(i-1) + (|D(i-1,i)| - J(i-1)) / 16
    # with D(i-1,i) taken as the change between consecutive inter-arrival
    # times. A single gap yields no deviations, leaving the estimate at 0.
    smoothed = 0.0
    for change in np.abs(np.diff(gaps)):
        smoothed += (change - smoothed) / 16

    return JitterResult(
        mean=float(gaps.mean()),
        std=float(gaps.std()),
        min=float(gaps.min()),
        max=float(gaps.max()),
        jitter_rfc3550=float(smoothed),
    )

239 

240 

def loss_rate(
    packets: Sequence[PacketInfo] | Iterator[PacketInfo],
) -> LossResult:
    """Estimate packet loss from holes in the sequence numbers.

    Packets without a sequence number are ignored. The remaining sequence
    numbers are sorted, and every jump larger than one between neighbours
    is recorded as a gap of lost packets.

    Args:
        packets: Sequence or iterator of packets with sequence numbers.

    Returns:
        LossResult with the loss metrics. When fewer than two packets
        carry a sequence number, no loss is reported and
        ``packets_received`` counts every supplied packet.

    Example:
        >>> result = loss_rate(packets)
        >>> print(f"Loss rate: {result.loss_percentage:.2f}%")
        >>> for start, end in result.gaps:
        ...     print(f"Gap: {start} to {end}")
    """
    captured = list(packets)
    sequences = sorted(pkt.sequence for pkt in captured if pkt.sequence is not None)

    if len(sequences) < 2:
        return LossResult(
            loss_rate=0.0,
            loss_percentage=0.0,
            packets_lost=0,
            packets_received=len(captured),
            gaps=[],
        )

    # Each pair of neighbours more than one apart brackets a missing range.
    gaps: list[tuple[int, int]] = [
        (before + 1, after - 1)
        for before, after in zip(sequences, sequences[1:])
        if after > before + 1
    ]
    packets_lost = sum(last - first + 1 for first, last in gaps)

    # Loss is relative to the full range the observed numbers span.
    total_expected = sequences[-1] - sequences[0] + 1
    if total_expected > 0:
        fraction = packets_lost / total_expected
    else:
        fraction = 0.0

    return LossResult(
        loss_rate=fraction,
        loss_percentage=fraction * 100,
        packets_lost=packets_lost,
        packets_received=len(sequences),
        gaps=gaps,
    )

302 

303 

def latency(
    request_times: Sequence[float] | NDArray[np.floating[Any]],
    response_times: Sequence[float] | NDArray[np.floating[Any]],
) -> LatencyResult:
    """Compute latency statistics from paired request/response times.

    The latency of each pair is ``response - request``. Pairs with a
    negative latency are treated as invalid pairings and excluded.

    Args:
        request_times: Array of request timestamps.
        response_times: Array of corresponding response timestamps.

    Returns:
        LatencyResult with the latency statistics. Every field is zero
        when the inputs are empty or no pair has a non-negative latency.

    Raises:
        ValueError: If request and response arrays have different lengths.

    Example:
        >>> result = latency(request_times, response_times)
        >>> print(f"Mean latency: {result.mean * 1000:.2f} ms")
        >>> print(f"P99 latency: {result.p99 * 1000:.2f} ms")
    """
    sent = np.asarray(request_times)
    received = np.asarray(response_times)

    if len(sent) != len(received):
        raise ValueError("Request and response arrays must have same length")

    delays = sent  # Stays empty unless there are pairs to evaluate.
    if len(sent) > 0:
        elapsed = received - sent
        # Filter out negative latencies (invalid pairings)
        delays = elapsed[elapsed >= 0]

    if len(delays) == 0:
        return LatencyResult(
            mean=0.0, std=0.0, min=0.0, max=0.0, p50=0.0, p95=0.0, p99=0.0, samples=0
        )

    p50, p95, p99 = (float(np.percentile(delays, q)) for q in (50, 95, 99))
    return LatencyResult(
        mean=float(delays.mean()),
        std=float(delays.std()),
        min=float(delays.min()),
        max=float(delays.max()),
        p50=p50,
        p95=p95,
        p99=p99,
        samples=len(delays),
    )

371 

372 

def windowed_throughput(
    packets: Sequence[PacketInfo],
    window_size: float,
    step_size: float | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Calculate throughput over sliding windows.

    Each window is the half-open interval ``[start, start + window_size)``.
    The first window starts at the earliest packet timestamp and windows
    advance by ``step_size`` for as long as the window end does not exceed
    the latest packet timestamp.

    Args:
        packets: Sequence of packets.
        window_size: Window size in seconds. Must be positive.
        step_size: Step size in seconds (default: window_size / 2).
            Must be positive.

    Returns:
        (times, throughputs) - Center time of each window and the
        throughput of that window in bits/second. Both arrays are empty
        when fewer than two packets are given or no full window fits.

    Raises:
        ValueError: If window_size or step_size is not positive.

    Example:
        >>> times, rates = windowed_throughput(packets, window_size=1.0)
        >>> plt.plot(times, rates / 1e6)
        >>> plt.ylabel("Throughput (Mbps)")
    """
    if step_size is None:
        step_size = window_size / 2

    # A non-positive step never advances the window (endless loop), and a
    # non-positive window has no meaningful rate. `not x > 0` also rejects NaN.
    if not window_size > 0:
        raise ValueError("window_size must be positive")
    if not step_size > 0:
        raise ValueError("step_size must be positive")

    packet_list = sorted(packets, key=lambda p: p.timestamp)

    if len(packet_list) < 2:
        return np.array([]), np.array([])

    start_time = packet_list[0].timestamp
    end_time = packet_list[-1].timestamp

    # Collect the start of every window that fits completely.
    starts = []
    window_start = start_time
    while window_start + window_size <= end_time:
        starts.append(window_start)
        window_start += step_size
    window_starts = np.array(starts, dtype=np.float64)

    timestamps = np.array([p.timestamp for p in packet_list], dtype=np.float64)
    # prefix[i] holds the byte total of the first i packets, so the bytes in
    # any window are the difference of two entries instead of a full rescan.
    prefix = np.concatenate(
        ([0.0], np.cumsum([p.size for p in packet_list], dtype=np.float64))
    )

    # side="left" gives the first index with timestamp >= bound, which
    # selects exactly the packets with start <= timestamp < end.
    first = np.searchsorted(timestamps, window_starts, side="left")
    last = np.searchsorted(timestamps, window_starts + window_size, side="left")
    window_bytes = prefix[last] - prefix[first]

    center_times = window_starts + window_size / 2
    rates = window_bytes / window_size * 8  # bits/second
    return center_times, rates

424 

425 

# Names exported by `from ... import *`: the result/packet dataclasses
# followed by the metric functions.
__all__ = [
    "JitterResult",
    "LatencyResult",
    "LossResult",
    "PacketInfo",
    "ThroughputResult",
    "jitter",
    "latency",
    "loss_rate",
    "throughput",
    "windowed_throughput",
]