Coverage for src / tracekit / inference / sequences.py: 97%
243 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Sequence pattern detection and request-response correlation.

- RE-SEQ-002: Sequence Pattern Detection
- RE-SEQ-003: Request-Response Correlation

This module provides tools for detecting sequential patterns in message
streams, identifying request-response pairs, and analyzing communication
flows.
"""
11from __future__ import annotations
13from collections import defaultdict
14from collections.abc import Callable, Sequence
15from dataclasses import dataclass, field
16from typing import Any
18import numpy as np
@dataclass
class SequencePattern:
    """Recurring run of message identifiers found in a stream (RE-SEQ-002).

    Attributes:
        pattern: Ordered identifiers that make up one occurrence.
        frequency: How many occurrences were counted.
        positions: Stream offset at which each occurrence starts.
        confidence: Heuristic detection score between 0 and 1.
        avg_gap: Mean time between the starts of consecutive occurrences
            (0.0 when no timestamps were available).
        gap_variance: Population variance of those start-to-start gaps.
    """

    pattern: list[Any]  # identifiers, in stream order
    frequency: int  # number of occurrences
    positions: list[int] = field(default_factory=list)  # fresh list per instance
    confidence: float = 0.0
    avg_gap: float = 0.0
    gap_variance: float = 0.0
@dataclass
class RequestResponsePair:
    """One request matched to one response (RE-SEQ-003).

    Attributes:
        request_index: Position of the request in the original stream.
        response_index: Position of the response in the original stream.
        request: The request message object.
        response: The response message object.
        latency: Response timestamp minus request timestamp.
        correlation_id: Correlation identifier used for the match, if any.
        confidence: Matching score between 0 and 1.
    """

    request_index: int
    response_index: int
    request: Any
    response: Any
    latency: float
    correlation_id: bytes | int | None = None  # None when matched without an ID
    confidence: float = 0.0
@dataclass
class CommunicationFlow:
    """A group of related messages and their request-response pairs (RE-SEQ-003).

    Attributes:
        flow_id: Identifier of the flow.
        messages: Messages that belong to the flow.
        pairs: Request-response pairs that belong to the flow.
        direction: Primary direction ('request_first' or 'response_first').
        participants: Identified participants of the flow.
        duration: Duration attributed to the flow.
    """

    flow_id: int
    messages: list[Any]
    pairs: list[RequestResponsePair]
    direction: str  # 'request_first' or 'response_first'
    participants: list[str]
    duration: float
class SequencePatternDetector:
    """Detect sequential patterns in message streams.

    Implements RE-SEQ-002: Sequence Pattern Detection.

    Identifies recurring patterns of message types or values in
    communication streams. Occurrences are counted with overlap, so in the
    stream ``A A A`` the pattern ``[A, A]`` is found at positions 0 and 1.

    Example:
        >>> detector = SequencePatternDetector()
        >>> patterns = detector.detect_patterns(messages, key=lambda m: m.type)
    """

    def __init__(
        self,
        min_pattern_length: int = 2,
        max_pattern_length: int = 10,
        min_frequency: int = 2,
        max_gap: float | None = None,
    ) -> None:
        """Initialize detector.

        Args:
            min_pattern_length: Minimum pattern length.
            max_pattern_length: Maximum pattern length.
            min_frequency: Minimum occurrences to consider.
            max_gap: Maximum allowed time between consecutive elements of a
                single occurrence. Only enforced when timestamps are supplied.
        """
        self.min_pattern_length = min_pattern_length
        self.max_pattern_length = max_pattern_length
        self.min_frequency = min_frequency
        self.max_gap = max_gap

    def detect_patterns(
        self,
        messages: Sequence[Any],
        key: Callable[[Any], Any] | None = None,
        timestamp_key: Callable[[Any], float] | None = None,
    ) -> list[SequencePattern]:
        """Detect sequential patterns in message stream.

        Implements RE-SEQ-002: Pattern detection workflow.

        Args:
            messages: Sequence of messages.
            key: Function to extract message identifier. When omitted, the
                messages themselves are used as identifiers.
            timestamp_key: Function to extract timestamp. Needed for gap
                statistics and for the ``max_gap`` constraint.

        Returns:
            Detected patterns, sorted by descending confidence and then by
            descending frequency.

        Example:
            >>> patterns = detector.detect_patterns(
            ...     messages,
            ...     key=lambda m: m.get('type'),
            ...     timestamp_key=lambda m: m.get('timestamp')
            ... )
        """
        if not messages:
            return []

        identifiers = self._extract_identifiers(messages, key)
        timestamps = None
        if timestamp_key is not None:
            timestamps = [timestamp_key(m) for m in messages]

        candidates = self._find_ngram_patterns(identifiers, timestamps)
        patterns = self._score_patterns(candidates, identifiers, timestamps)
        patterns.sort(key=lambda p: (-p.confidence, -p.frequency))
        return patterns

    def find_repeating_sequences(
        self,
        messages: Sequence[Any],
        key: Callable[[Any], Any] | None = None,
    ) -> list[tuple[list[Any], int, list[int]]]:
        """Find exactly repeating message sequences.

        Implements RE-SEQ-002: Exact sequence detection. The ``max_gap``
        constraint is not applied, because no timestamps are involved.

        Args:
            messages: Sequence of messages.
            key: Function to extract message identifier.

        Returns:
            List of (sequence, count, positions) tuples, sorted by descending
            count. Ties keep the order of increasing length, then first
            occurrence.
        """
        if not messages:
            return []

        identifiers = self._extract_identifiers(messages, key)
        # With timestamps=None the gap check is skipped, so every n-gram is counted.
        candidates = self._find_ngram_patterns(identifiers, None)

        results = [
            (list(ngram), len(positions), positions)
            for ngram, positions in candidates.items()
            if len(positions) >= self.min_frequency
        ]
        results.sort(key=lambda item: -item[1])  # stable: ties keep insertion order
        return results

    def detect_periodic_patterns(
        self,
        messages: Sequence[Any],
        key: Callable[[Any], Any] | None = None,
        timestamp_key: Callable[[Any], float] | None = None,
        max_cv: float = 0.2,
    ) -> list[SequencePattern]:
        """Detect patterns that occur at regular intervals.

        Implements RE-SEQ-002: Periodic pattern detection.

        A pattern qualifies when it occurs at least three times, has a
        positive average gap between occurrence starts, and the coefficient
        of variation (std / mean) of those gaps is below ``max_cv``. Without
        ``timestamp_key`` the average gap is 0, so nothing qualifies.

        Args:
            messages: Sequence of messages.
            key: Function to extract message identifier.
            timestamp_key: Function to extract timestamp.
            max_cv: Maximum coefficient of variation of the gaps. The default
                of 0.2 allows less than 20% variation.

        Returns:
            List of periodic patterns, in ``detect_patterns`` order.
        """
        periodic = []
        for pattern in self.detect_patterns(messages, key, timestamp_key):
            if pattern.frequency < 3:
                continue  # need at least two gaps to judge regularity
            cv = self._gap_cv(pattern.avg_gap, pattern.gap_variance)
            if cv is not None and cv < max_cv:
                periodic.append(pattern)
        return periodic

    @staticmethod
    def _extract_identifiers(
        messages: Sequence[Any],
        key: Callable[[Any], Any] | None,
    ) -> list[Any]:
        """Map messages to identifiers with ``key``, or use them unchanged."""
        if key is None:
            return list(messages)
        return [key(m) for m in messages]

    def _find_ngram_patterns(
        self,
        identifiers: list[Any],
        timestamps: list[float] | None,
    ) -> dict[tuple[Any, ...], list[int]]:
        """Find all n-gram patterns.

        Args:
            identifiers: Message identifiers.
            timestamps: Message timestamps, or None.

        Returns:
            Mapping from each n-gram (as a tuple) to its start positions. Keys
            are inserted by increasing length, then by first occurrence.
        """
        candidates: dict[tuple[Any, ...], list[int]] = defaultdict(list)

        check_gaps = self.max_gap is not None and timestamps is not None
        # Gap between message k and k+1, computed once instead of per n-gram.
        deltas = (
            [timestamps[k + 1] - timestamps[k] for k in range(len(timestamps) - 1)]
            if check_gaps
            else []
        )

        for length in range(self.min_pattern_length, self.max_pattern_length + 1):
            for i in range(len(identifiers) - length + 1):
                # The occurrence starting at i spans the gaps deltas[i .. i+length-2].
                if check_gaps and any(d > self.max_gap for d in deltas[i : i + length - 1]):
                    continue
                candidates[tuple(identifiers[i : i + length])].append(i)

        return candidates

    @staticmethod
    def _occurrence_gap_stats(
        positions: list[int],
        timestamps: list[float] | None,
    ) -> tuple[float, float]:
        """Return (mean, population variance) of start-to-start gaps between occurrences.

        Returns (0.0, 0.0) when there are no timestamps or fewer than two occurrences.
        """
        if timestamps is None or len(positions) < 2:
            return 0.0, 0.0
        gaps = [timestamps[b] - timestamps[a] for a, b in zip(positions, positions[1:])]
        avg_gap = sum(gaps) / len(gaps)
        gap_variance = sum((g - avg_gap) ** 2 for g in gaps) / len(gaps)
        return avg_gap, gap_variance

    @staticmethod
    def _gap_cv(avg_gap: float, gap_variance: float) -> float | None:
        """Return the coefficient of variation of the gaps, or None when avg_gap <= 0."""
        if avg_gap <= 0:
            return None
        return gap_variance**0.5 / avg_gap

    @classmethod
    def _consistency_score(cls, avg_gap: float, gap_variance: float) -> float:
        """Score gap regularity: 1.0 for perfectly regular gaps, lower as they vary.

        Returns a neutral 0.5 when no timing information is available.
        """
        cv = cls._gap_cv(avg_gap, gap_variance)
        if cv is None:
            return 0.5
        # Zero variance gives cv == 0 and therefore the maximum score of 1.0.
        return 1.0 / (1.0 + cv)

    def _score_patterns(
        self,
        candidates: dict[tuple[Any, ...], list[int]],
        identifiers: list[Any],
        timestamps: list[float] | None,
    ) -> list[SequencePattern]:
        """Score candidate patterns.

        Confidence is 0.4 * frequency_score + 0.3 * consistency_score +
        0.3 * length_score. Frequency saturates at 10 occurrences and length
        at 5 elements.

        Args:
            candidates: Pattern -> positions mapping.
            identifiers: Original identifiers (unused; kept for interface
                compatibility).
            timestamps: Message timestamps, or None.

        Returns:
            Scored patterns whose frequency reaches ``min_frequency``.
        """
        patterns = []

        for pattern_tuple, positions in candidates.items():
            if len(positions) < self.min_frequency:
                continue

            avg_gap, gap_variance = self._occurrence_gap_stats(positions, timestamps)

            frequency_score = min(1.0, len(positions) / 10)
            length_score = min(1.0, len(pattern_tuple) / 5)
            consistency_score = self._consistency_score(avg_gap, gap_variance)
            confidence = 0.4 * frequency_score + 0.3 * consistency_score + 0.3 * length_score

            patterns.append(
                SequencePattern(
                    pattern=list(pattern_tuple),
                    frequency=len(positions),
                    positions=positions,
                    confidence=confidence,
                    avg_gap=avg_gap,
                    gap_variance=gap_variance,
                )
            )

        return patterns
class RequestResponseCorrelator:
    """Correlate request and response messages.

    Implements RE-SEQ-003: Request-Response Correlation.

    Identifies matching request-response pairs in bidirectional
    communication streams. Matching is greedy: requests are processed in
    stream order, and each takes its best-scoring available response. A
    message is never paired with itself, and it takes part in at most one
    pair.

    Example:
        >>> correlator = RequestResponseCorrelator()
        >>> pairs = correlator.correlate(
        ...     messages,
        ...     request_filter=lambda m: m.type == 'REQ',
        ...     response_filter=lambda m: m.type == 'RSP'
        ... )
    """

    def __init__(
        self,
        max_latency: float = 10.0,
        correlation_key: Callable[[Any], Any] | None = None,
    ) -> None:
        """Initialize correlator.

        Args:
            max_latency: Maximum time between request and response (inclusive).
            correlation_key: Function to extract correlation ID.
        """
        self.max_latency = max_latency
        self.correlation_key = correlation_key

    def correlate(
        self,
        messages: Sequence[Any],
        request_filter: Callable[[Any], bool] | None = None,
        response_filter: Callable[[Any], bool] | None = None,
        timestamp_key: Callable[[Any], float] | None = None,
    ) -> list[RequestResponsePair]:
        """Correlate requests with responses.

        Implements RE-SEQ-003: Request-response correlation workflow.

        Args:
            messages: All messages in stream.
            request_filter: Function to identify requests. If None, every
                message is a request candidate.
            response_filter: Function to identify responses. If None, every
                message is a response candidate.
            timestamp_key: Function to extract timestamp. Defaults to the
                message's index in the stream.

        Returns:
            List of correlated pairs.

        Example:
            >>> pairs = correlator.correlate(
            ...     messages,
            ...     request_filter=lambda m: m['direction'] == 'out',
            ...     response_filter=lambda m: m['direction'] == 'in',
            ...     timestamp_key=lambda m: m['time']
            ... )
        """
        requests = []
        responses = []

        for i, msg in enumerate(messages):
            is_request = request_filter is None or request_filter(msg)
            is_response = response_filter is None or response_filter(msg)
            if not (is_request or is_response):
                continue

            ts = timestamp_key(msg) if timestamp_key is not None else float(i)
            entry = (i, msg, ts, self._extract_correlation_id(msg))
            if is_request:
                requests.append(entry)
            if is_response:
                responses.append(entry)

        return self._match_pairs(requests, responses)

    def correlate_by_content(
        self,
        messages: Sequence[Any],
        content_key: Callable[[Any], bytes],
        timestamp_key: Callable[[Any], float] | None = None,
        min_similarity: float = 0.3,
    ) -> list[RequestResponsePair]:
        """Correlate by analyzing message content similarity.

        Implements RE-SEQ-003: Content-based correlation. Each message, in
        stream order, is treated as a request. It is paired with the later
        message within ``max_latency`` whose content is most similar. A
        message already consumed as a response does not start a new pair.

        Args:
            messages: All messages.
            content_key: Function to extract message content.
            timestamp_key: Function to extract timestamp. Defaults to the
                message's index in the stream.
            min_similarity: Best similarity must exceed this to form a pair.

        Returns:
            List of correlated pairs.
        """
        return [
            RequestResponsePair(
                request_index=i,
                response_index=j,
                request=messages[i],
                response=messages[j],
                latency=latency,
                confidence=score,
            )
            for i, j, latency, score in self._best_content_matches(
                messages, content_key, timestamp_key, min_similarity
            )
        ]

    def extract_flows(
        self,
        pairs: Sequence[RequestResponsePair],
        messages: Sequence[Any],
        flow_key: Callable[[Any], str] | None = None,
        timestamp_key: Callable[[Any], float] | None = None,
    ) -> list[CommunicationFlow]:
        """Extract communication flows from pairs.

        Implements RE-SEQ-003: Flow extraction.

        Args:
            pairs: Correlated request-response pairs.
            messages: All messages.
            flow_key: Function applied to each pair's request to get its
                flow identifier. If None, everything forms one flow.
            timestamp_key: Optional timestamp extractor. When given, a flow's
                duration is the span between its earliest and latest message
                timestamps. Otherwise the legacy values are used: 0.0 for the
                single ungrouped flow, and the largest pair latency for keyed
                flows.

        Returns:
            List of communication flows.
        """
        if flow_key is None:
            all_messages = list(messages)
            duration = (
                self._timestamp_span(all_messages, timestamp_key)
                if timestamp_key is not None
                else 0.0
            )
            return [
                CommunicationFlow(
                    flow_id=0,
                    messages=all_messages,
                    pairs=list(pairs),
                    direction="request_first",
                    participants=[],
                    duration=duration,
                )
            ]

        flow_groups: dict[Any, list[RequestResponsePair]] = defaultdict(list)
        for pair in pairs:
            flow_groups[flow_key(pair.request)].append(pair)

        flows = []
        for flow_id, (key, group_pairs) in enumerate(flow_groups.items()):
            indices = sorted(
                {idx for p in group_pairs for idx in (p.request_index, p.response_index)}
            )
            flow_messages = [messages[j] for j in indices]

            if timestamp_key is not None:
                duration = self._timestamp_span(flow_messages, timestamp_key)
            else:
                # Only per-pair latencies are known here; report the largest.
                # Groups are never empty because they are created on append.
                duration = max(p.latency for p in group_pairs)

            flows.append(
                CommunicationFlow(
                    flow_id=flow_id,
                    messages=flow_messages,
                    pairs=group_pairs,
                    direction="request_first",
                    participants=[str(key)],
                    duration=duration,
                )
            )

        return flows

    def _extract_correlation_id(self, msg: Any) -> Any:
        """Return the message's correlation ID, or None if unavailable.

        Extraction is best effort. Messages whose ID lookup fails fall back
        to timing-based matching instead of aborting correlation.
        """
        if self.correlation_key is None:
            return None
        try:
            return self.correlation_key(msg)
        except (KeyError, IndexError, AttributeError, TypeError):
            return None

    def _pair_score(self, latency: float, req_id: Any, resp_id: Any) -> float:
        """Score a candidate pair that already passed the latency window check.

        If both IDs are known, the score is 1.0 when they are equal and 0.0
        otherwise. If either ID is missing, the score decreases linearly with
        latency.
        """
        if req_id is not None and resp_id is not None:
            return 1.0 if req_id == resp_id else 0.0
        if self.max_latency <= 0:
            # Only zero-latency candidates pass the window; avoid dividing by zero.
            return 1.0
        return 1.0 - (latency / self.max_latency)

    def _greedy_match(
        self,
        requests: list[tuple[int, Any, float, Any]],
        responses: list[tuple[int, Any, float, Any]],
    ) -> list[tuple[int, Any, int, Any, float, Any, float]]:
        """Greedily pick one response per request.

        Returns:
            (request_index, request, response_index, response, latency,
            correlation_id, score) tuples, in request order.
        """
        matches = []
        used: set[int] = set()  # indices already in a pair, in either role

        for req_idx, req_msg, req_ts, req_id in requests:
            if req_idx in used:
                continue  # already consumed as a response

            best = None
            best_score = 0.0
            for resp_idx, resp_msg, resp_ts, resp_id in responses:
                # A message may pass both filters; never pair it with itself.
                if resp_idx == req_idx or resp_idx in used:
                    continue
                latency = resp_ts - req_ts
                if latency < 0 or latency > self.max_latency:
                    continue
                score = self._pair_score(latency, req_id, resp_id)
                if score > best_score:
                    best_score = score
                    best = (resp_idx, resp_msg, latency, resp_id)

            if best is None:
                continue
            resp_idx, resp_msg, latency, resp_id = best
            used.add(req_idx)
            used.add(resp_idx)
            matches.append(
                (
                    req_idx,
                    req_msg,
                    resp_idx,
                    resp_msg,
                    latency,
                    req_id if req_id is not None else resp_id,
                    best_score,
                )
            )

        return matches

    def _match_pairs(
        self,
        requests: list[tuple[int, Any, float, Any]],
        responses: list[tuple[int, Any, float, Any]],
    ) -> list[RequestResponsePair]:
        """Match request and response messages.

        Args:
            requests: List of (index, message, timestamp, correlation_id).
            responses: List of (index, message, timestamp, correlation_id).

        Returns:
            List of matched pairs.
        """
        return [
            RequestResponsePair(
                request_index=req_idx,
                response_index=resp_idx,
                request=req_msg,
                response=resp_msg,
                latency=latency,
                correlation_id=corr_id,
                confidence=score,
            )
            for req_idx, req_msg, resp_idx, resp_msg, latency, corr_id, score in self._greedy_match(
                requests, responses
            )
        ]

    def _best_content_matches(
        self,
        messages: Sequence[Any],
        content_key: Callable[[Any], bytes],
        timestamp_key: Callable[[Any], float] | None,
        min_similarity: float,
    ) -> list[tuple[int, int, float, float]]:
        """Compute content-based matches as (request_index, response_index, latency, score).

        Contents and timestamps are extracted once per message, not once per
        candidate pair.
        """
        contents = [content_key(m) for m in messages]
        stamps = [
            timestamp_key(m) if timestamp_key is not None else float(i)
            for i, m in enumerate(messages)
        ]

        matches = []
        used: set[int] = set()  # indices consumed as responses

        for i in range(len(messages)):
            if i in used:
                continue  # already answers an earlier request

            best_j = None
            best_score = 0.0
            best_latency = 0.0
            for j in range(i + 1, len(messages)):
                if j in used:
                    continue
                latency = stamps[j] - stamps[i]
                if latency < 0 or latency > self.max_latency:
                    continue
                score = self._content_similarity(contents[i], contents[j])
                if score > best_score:
                    best_j, best_score, best_latency = j, score, latency

            if best_j is not None and best_score > min_similarity:
                used.add(best_j)
                matches.append((i, best_j, best_latency, best_score))

        return matches

    @staticmethod
    def _timestamp_span(
        flow_messages: Sequence[Any],
        timestamp_key: Callable[[Any], float],
    ) -> float:
        """Return latest minus earliest timestamp of the messages (0.0 if empty)."""
        stamps = [timestamp_key(m) for m in flow_messages]
        return (max(stamps) - min(stamps)) if stamps else 0.0

    def _content_similarity(self, content_a: bytes, content_b: bytes) -> float:
        """Calculate content similarity.

        Uses Jaccard similarity over the sets of distinct byte values.

        Args:
            content_a: First content.
            content_b: Second content.

        Returns:
            Similarity score (0-1). Returns 0.0 if either content is empty.
        """
        if not content_a or not content_b:
            return 0.0

        set_a = set(content_a)
        set_b = set(content_b)
        # Both sets are non-empty here, so the union is never empty.
        return len(set_a & set_b) / len(set_a | set_b)
656# =============================================================================
657# Convenience functions
658# =============================================================================
def detect_sequence_patterns(
    messages: Sequence[Any],
    key: Callable[[Any], Any] | None = None,
    min_length: int = 2,
    max_length: int = 10,
    min_frequency: int = 2,
    *,
    timestamp_key: Callable[[Any], float] | None = None,
    max_gap: float | None = None,
) -> list[SequencePattern]:
    """Detect sequential patterns in messages.

    Implements RE-SEQ-002: Sequence Pattern Detection. This is a thin wrapper
    around ``SequencePatternDetector.detect_patterns``.

    Args:
        messages: Sequence of messages.
        key: Function to extract message identifier.
        min_length: Minimum pattern length.
        max_length: Maximum pattern length.
        min_frequency: Minimum occurrences.
        timestamp_key: Optional function to extract timestamps. Without it,
            gap statistics stay at 0.0 and ``max_gap`` has no effect.
        max_gap: Optional maximum time between consecutive elements of a
            single occurrence.

    Returns:
        List of detected patterns.

    Example:
        >>> patterns = detect_sequence_patterns(
        ...     messages,
        ...     key=lambda m: m['type']
        ... )
    """
    detector = SequencePatternDetector(
        min_pattern_length=min_length,
        max_pattern_length=max_length,
        min_frequency=min_frequency,
        max_gap=max_gap,
    )
    return detector.detect_patterns(messages, key, timestamp_key)
def correlate_requests(
    messages: Sequence[Any],
    request_filter: Callable[[Any], bool],
    response_filter: Callable[[Any], bool],
    timestamp_key: Callable[[Any], float] | None = None,
    max_latency: float = 10.0,
) -> list[RequestResponsePair]:
    """Pair requests with responses using a one-off correlator.

    Implements RE-SEQ-003: Request-Response Correlation. The work is done by
    a ``RequestResponseCorrelator`` built with ``max_latency`` and no
    correlation key, so matching relies on timing alone.

    Args:
        messages: All messages.
        request_filter: Predicate selecting request messages.
        response_filter: Predicate selecting response messages.
        timestamp_key: Timestamp extractor; message indices are used if omitted.
        max_latency: Largest accepted request-to-response delay.

    Returns:
        The correlated request-response pairs.

    Example:
        >>> pairs = correlate_requests(
        ...     messages,
        ...     request_filter=lambda m: m['dir'] == 'out',
        ...     response_filter=lambda m: m['dir'] == 'in',
        ...     timestamp_key=lambda m: m['ts']
        ... )
    """
    return RequestResponseCorrelator(max_latency=max_latency).correlate(
        messages,
        request_filter=request_filter,
        response_filter=response_filter,
        timestamp_key=timestamp_key,
    )
def find_message_dependencies(
    messages: Sequence[Any],
    key: Callable[[Any], Any],
    timestamp_key: Callable[[Any], float] | None = None,
) -> dict[Any, list[Any]]:
    """Map each message type to the distinct types that directly follow it.

    Implements RE-SEQ-002: Dependency detection. Successors are listed once
    each, in the order they are first seen. Ordering comes from the position
    in ``messages``. ``timestamp_key`` is accepted for API compatibility but
    is not used.

    Args:
        messages: Sequence of messages.
        key: Function to extract message type.
        timestamp_key: Unused.

    Returns:
        Dictionary mapping message types to their observed immediate
        successors. A type that only ever appears as the last message has
        no entry.

    Example:
        >>> deps = find_message_dependencies(messages, key=lambda m: m['type'])
        >>> deps['REQ']  # ['RSP', 'ACK']
    """
    graph: dict[Any, list[Any]] = {}

    for idx in range(1, len(messages)):
        source = key(messages[idx - 1])
        target = key(messages[idx])
        followers = graph.setdefault(source, [])
        if target not in followers:
            followers.append(target)

    return graph
def calculate_latency_stats(
    pairs: Sequence[RequestResponsePair],
) -> dict[str, float]:
    """Summarize the latencies of request-response pairs.

    Implements RE-SEQ-003: Latency analysis. ``std`` is NumPy's default
    population standard deviation. An empty input yields all-zero statistics.

    Args:
        pairs: Request-response pairs; only their ``latency`` is read.

    Returns:
        Mapping with the keys ``min``, ``max``, ``mean``, ``median`` and
        ``std``, in that order, as plain Python floats.
    """
    stat_names = ("min", "max", "mean", "median", "std")
    if not pairs:
        return dict.fromkeys(stat_names, 0.0)

    values = np.array([pair.latency for pair in pairs])
    reducers = (np.min, np.max, np.mean, np.median, np.std)
    return {name: float(reduce(values)) for name, reduce in zip(stat_names, reducers)}
# Public API of this module, listed alphabetically.
__all__ = [
    "CommunicationFlow",  # data class
    "RequestResponseCorrelator",  # class
    "RequestResponsePair",  # data class
    "SequencePattern",  # data class
    "SequencePatternDetector",  # class
    "calculate_latency_stats",  # function
    "correlate_requests",  # function
    "detect_sequence_patterns",  # function
    "find_message_dependencies",  # function
]