Coverage for src / tracekit / analyzers / packet / metrics.py: 100%
121 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Packet metrics for stream analysis.
3This module provides throughput, jitter, and loss rate metrics
4for packet stream analysis.
7Example:
8 >>> from tracekit.analyzers.packet.metrics import throughput, jitter, loss_rate
9 >>> rate = throughput(packets)
10 >>> jitter_stats = jitter(packets)
12References:
13 RFC 3550 for jitter calculation
14"""
16from __future__ import annotations
18from dataclasses import dataclass
19from typing import TYPE_CHECKING, Any
21import numpy as np
23if TYPE_CHECKING:
24 from collections.abc import Iterator, Sequence
26 from numpy.typing import NDArray
29@dataclass
30class PacketInfo:
31 """Packet information for metrics calculation.
33 Attributes:
34 timestamp: Packet arrival timestamp in seconds.
35 size: Packet size in bytes.
36 sequence: Optional sequence number for loss detection.
37 """
39 timestamp: float
40 size: int
41 sequence: int | None = None
44@dataclass
45class ThroughputResult:
46 """Throughput measurement result.
48 Attributes:
49 bytes_per_second: Data rate in bytes/second.
50 bits_per_second: Data rate in bits/second.
51 packets_per_second: Packet rate.
52 total_bytes: Total bytes in measurement period.
53 total_packets: Total packets in measurement period.
54 duration: Measurement duration in seconds.
55 """
57 bytes_per_second: float
58 bits_per_second: float
59 packets_per_second: float
60 total_bytes: int
61 total_packets: int
62 duration: float
65@dataclass
66class JitterResult:
67 """Jitter measurement result.
69 Attributes:
70 mean: Mean inter-arrival time.
71 std: Standard deviation of inter-arrival time.
72 min: Minimum inter-arrival time.
73 max: Maximum inter-arrival time.
74 jitter_rfc3550: RFC 3550 jitter estimate.
75 """
77 mean: float
78 std: float
79 min: float
80 max: float
81 jitter_rfc3550: float
84@dataclass
85class LossResult:
86 """Packet loss measurement result.
88 Attributes:
89 loss_rate: Loss rate as fraction (0-1).
90 loss_percentage: Loss rate as percentage.
91 packets_lost: Estimated number of lost packets.
92 packets_received: Number of received packets.
93 gaps: List of (start_seq, end_seq) gap ranges.
94 """
96 loss_rate: float
97 loss_percentage: float
98 packets_lost: int
99 packets_received: int
100 gaps: list[tuple[int, int]]
103@dataclass
104class LatencyResult:
105 """Request-response latency result.
107 Attributes:
108 mean: Mean latency in seconds.
109 std: Standard deviation.
110 min: Minimum latency.
111 max: Maximum latency.
112 p50: Median latency.
113 p95: 95th percentile latency.
114 p99: 99th percentile latency.
115 samples: Number of samples.
116 """
118 mean: float
119 std: float
120 min: float
121 max: float
122 p50: float
123 p95: float
124 p99: float
125 samples: int
128def throughput(
129 packets: Sequence[PacketInfo] | Iterator[PacketInfo],
130 *,
131 window_size: float | None = None,
132) -> ThroughputResult:
133 """Calculate throughput and packet rate.
135 Args:
136 packets: Sequence or iterator of packets.
137 window_size: If provided, use sliding window of this duration.
138 If None, calculate over entire sequence.
140 Returns:
141 ThroughputResult with throughput metrics.
143 Example:
144 >>> packets = [PacketInfo(t, sz) for t, sz in data]
145 >>> result = throughput(packets)
146 >>> print(f"Throughput: {result.bits_per_second / 1e6:.2f} Mbps")
147 """
148 packet_list = list(packets)
150 if len(packet_list) < 2:
151 return ThroughputResult(
152 bytes_per_second=0.0,
153 bits_per_second=0.0,
154 packets_per_second=0.0,
155 total_bytes=sum(p.size for p in packet_list),
156 total_packets=len(packet_list),
157 duration=0.0,
158 )
160 total_bytes = sum(p.size for p in packet_list)
161 total_packets = len(packet_list)
163 # Sort by timestamp
164 sorted_packets = sorted(packet_list, key=lambda p: p.timestamp)
165 duration = sorted_packets[-1].timestamp - sorted_packets[0].timestamp
167 if duration <= 0:
168 duration = 1e-9 # Avoid division by zero
170 bytes_per_second = total_bytes / duration
171 packets_per_second = total_packets / duration
173 return ThroughputResult(
174 bytes_per_second=bytes_per_second,
175 bits_per_second=bytes_per_second * 8,
176 packets_per_second=packets_per_second,
177 total_bytes=total_bytes,
178 total_packets=total_packets,
179 duration=duration,
180 )
183def jitter(
184 packets: Sequence[PacketInfo] | Iterator[PacketInfo],
185) -> JitterResult:
186 """Calculate inter-arrival time jitter.
188 Computes jitter statistics including RFC 3550 jitter estimate.
190 Args:
191 packets: Sequence or iterator of packets with timestamps.
193 Returns:
194 JitterResult with jitter metrics.
196 Example:
197 >>> result = jitter(packets)
198 >>> print(f"Jitter: {result.std * 1000:.3f} ms")
200 References:
201 RFC 3550 Section A.8 for jitter calculation
202 """
203 packet_list = list(packets)
205 if len(packet_list) < 2:
206 return JitterResult(
207 mean=0.0,
208 std=0.0,
209 min=0.0,
210 max=0.0,
211 jitter_rfc3550=0.0,
212 )
214 # Sort by timestamp
215 sorted_packets = sorted(packet_list, key=lambda p: p.timestamp)
217 # Calculate inter-arrival times
218 timestamps = np.array([p.timestamp for p in sorted_packets])
219 iat = np.diff(timestamps)
221 # RFC 3550 jitter estimate (smoothed absolute deviation)
222 # J(i) = J(i-1) + (|D(i-1,i)| - J(i-1))/16
223 # where D(i-1,i) is the difference in inter-arrival times
224 if len(iat) > 1:
225 d = np.diff(iat) # Deviation from expected IAT
226 jitter_rfc = 0.0
227 for deviation in np.abs(d):
228 jitter_rfc = jitter_rfc + (deviation - jitter_rfc) / 16
229 else:
230 jitter_rfc = 0.0
232 return JitterResult(
233 mean=float(np.mean(iat)),
234 std=float(np.std(iat)),
235 min=float(np.min(iat)),
236 max=float(np.max(iat)),
237 jitter_rfc3550=float(jitter_rfc),
238 )
241def loss_rate(
242 packets: Sequence[PacketInfo] | Iterator[PacketInfo],
243) -> LossResult:
244 """Detect and report packet loss from sequence numbers.
246 Args:
247 packets: Sequence or iterator of packets with sequence numbers.
249 Returns:
250 LossResult with loss metrics.
252 Example:
253 >>> result = loss_rate(packets)
254 >>> print(f"Loss rate: {result.loss_percentage:.2f}%")
255 >>> for start, end in result.gaps:
256 ... print(f"Gap: {start} to {end}")
257 """
258 packet_list = list(packets)
260 # Filter packets with sequence numbers
261 with_seq = [(p.sequence, p.timestamp) for p in packet_list if p.sequence is not None]
263 if len(with_seq) < 2:
264 return LossResult(
265 loss_rate=0.0,
266 loss_percentage=0.0,
267 packets_lost=0,
268 packets_received=len(packet_list),
269 gaps=[],
270 )
272 # Sort by sequence number
273 sorted_seqs = sorted(with_seq, key=lambda x: x[0])
274 sequences = [s[0] for s in sorted_seqs]
276 # Find gaps in sequence
277 gaps: list[tuple[int, int]] = []
278 packets_lost = 0
280 for i in range(1, len(sequences)):
281 expected = sequences[i - 1] + 1
282 actual = sequences[i]
284 if actual > expected:
285 # Gap detected
286 gaps.append((expected, actual - 1))
287 packets_lost += actual - expected
289 # Calculate loss rate
290 total_expected = sequences[-1] - sequences[0] + 1
291 packets_received = len(sequences)
293 loss_frac = packets_lost / total_expected if total_expected > 0 else 0.0
295 return LossResult(
296 loss_rate=loss_frac,
297 loss_percentage=loss_frac * 100,
298 packets_lost=packets_lost,
299 packets_received=packets_received,
300 gaps=gaps,
301 )
304def latency(
305 request_times: Sequence[float] | NDArray[np.floating[Any]],
306 response_times: Sequence[float] | NDArray[np.floating[Any]],
307) -> LatencyResult:
308 """Calculate request-response latency statistics.
310 Args:
311 request_times: Array of request timestamps.
312 response_times: Array of corresponding response timestamps.
314 Returns:
315 LatencyResult with latency statistics.
317 Raises:
318 ValueError: If request and response arrays have different lengths.
320 Example:
321 >>> result = latency(request_times, response_times)
322 >>> print(f"Mean latency: {result.mean * 1000:.2f} ms")
323 >>> print(f"P99 latency: {result.p99 * 1000:.2f} ms")
324 """
325 req = np.asarray(request_times)
326 resp = np.asarray(response_times)
328 if len(req) != len(resp):
329 raise ValueError("Request and response arrays must have same length")
331 if len(req) == 0:
332 return LatencyResult(
333 mean=0.0,
334 std=0.0,
335 min=0.0,
336 max=0.0,
337 p50=0.0,
338 p95=0.0,
339 p99=0.0,
340 samples=0,
341 )
343 latencies = resp - req
345 # Filter out negative latencies (invalid pairings)
346 valid = latencies >= 0
347 latencies = latencies[valid]
349 if len(latencies) == 0:
350 return LatencyResult(
351 mean=0.0,
352 std=0.0,
353 min=0.0,
354 max=0.0,
355 p50=0.0,
356 p95=0.0,
357 p99=0.0,
358 samples=0,
359 )
361 return LatencyResult(
362 mean=float(np.mean(latencies)),
363 std=float(np.std(latencies)),
364 min=float(np.min(latencies)),
365 max=float(np.max(latencies)),
366 p50=float(np.percentile(latencies, 50)),
367 p95=float(np.percentile(latencies, 95)),
368 p99=float(np.percentile(latencies, 99)),
369 samples=len(latencies),
370 )
373def windowed_throughput(
374 packets: Sequence[PacketInfo],
375 window_size: float,
376 step_size: float | None = None,
377) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
378 """Calculate throughput over sliding windows.
380 Args:
381 packets: Sequence of packets.
382 window_size: Window size in seconds.
383 step_size: Step size in seconds (default: window_size / 2).
385 Returns:
386 (times, throughputs) - Center times and throughput values.
388 Example:
389 >>> times, rates = windowed_throughput(packets, window_size=1.0)
390 >>> plt.plot(times, rates / 1e6)
391 >>> plt.ylabel("Throughput (Mbps)")
392 """
393 if step_size is None:
394 step_size = window_size / 2
396 packet_list = sorted(packets, key=lambda p: p.timestamp)
398 if len(packet_list) < 2:
399 return np.array([]), np.array([])
401 start_time = packet_list[0].timestamp
402 end_time = packet_list[-1].timestamp
404 times = []
405 throughputs = []
407 window_start = start_time
409 while window_start + window_size <= end_time:
410 window_end = window_start + window_size
412 # Count bytes in window
413 window_bytes = sum(p.size for p in packet_list if window_start <= p.timestamp < window_end)
415 center_time = window_start + window_size / 2
416 rate = window_bytes / window_size * 8 # bits/second
418 times.append(center_time)
419 throughputs.append(rate)
421 window_start += step_size
423 return np.array(times), np.array(throughputs)
426__all__ = [
427 "JitterResult",
428 "LatencyResult",
429 "LossResult",
430 "PacketInfo",
431 "ThroughputResult",
432 "jitter",
433 "latency",
434 "loss_rate",
435 "throughput",
436 "windowed_throughput",
437]