Coverage for src / kemi / dedup.py: 100%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
1from datetime import datetime, timezone
3from kemi.models import MemoryObject
4from kemi.scoring import cosine_similarity
# Lowercase tokens whose presence marks a statement as negated.
# has_sentiment_flip() intersects this set with the lowercased tokens of each
# text, so every entry must be a single lowercase word.
NEGATION_WORDS = {
    # Plain negators.
    "no",
    "not",
    "never",
    "cannot",
    "anymore",
    # Negative contractions.
    "don't",
    "doesn't",
    "didn't",
    "won't",
    "can't",
    # Negative contractions that SENTIMENT_SHIFT_PAIRS already lists as the
    # negative half of a pair. Without them "I haven't tried sushi" counts as
    # un-negated while "I never tried sushi" counts as negated, so two
    # agreeing statements are reported as a polarity mismatch.
    "wasn't",
    "isn't",
    "haven't",
    "hadn't",
    # Verbs that express aversion or cessation.
    "hate",
    "dislike",
    "avoid",
    "stop",
    "stopped",
    "quit",
    "quitting",
    "ceased",
}
# (affirmative, negative) term pairs. One text containing the first term while
# the other contains the second is treated as a change of sentiment.
# NOTE: "am not" is a two-word phrase; a matcher that compares against single
# whitespace-separated tokens can never hit it.
SENTIMENT_SHIFT_PAIRS = [
    # Preference and frequency.
    ("love", "hate"),
    ("like", "dislike"),
    ("enjoy", "avoid"),
    ("always", "never"),
    # Auxiliaries and their negative contractions.
    ("do", "don't"),
    ("will", "won't"),
    ("can", "can't"),
    # Starting versus stopping.
    ("start", "stop"),
    ("begin", "quit"),
    # Forms of "be" and "have".
    ("am", "am not"),
    ("was", "wasn't"),
    ("is", "isn't"),
    ("have", "haven't"),
    ("had", "hadn't"),
]
45def _extract_nouns(text: str) -> set[str]:
46 words = text.lower().split()
47 result = set()
48 skip_next = False
49 for i, word in enumerate(words):
50 if skip_next:
51 skip_next = False
52 continue
53 clean = word.strip(",.!?;:'\"")
54 if len(clean) > 2:
55 result.add(clean)
56 if i + 1 < len(words) and words[i + 1] in ("at", "in", "on", "to", "for"):
57 skip_next = True
58 result.add(words[i + 1].strip(",.!?;:'\""))
59 return result
def _polarity_tokens(text: str) -> set[str]:
    """Return the lowercase tokens of *text* with edge punctuation removed."""
    # Typographic apostrophes are folded to ASCII so "don’t" equals "don't".
    normalized = text.lower().replace("\u2019", "'")
    trimmed = (token.strip(",.!?;:'\"") for token in normalized.split())
    return {token for token in trimmed if token}


def has_sentiment_flip(text_a: str, text_b: str) -> bool:
    """Return True when two texts share vocabulary but differ in polarity.

    A flip is reported only if the word sets produced by ``_extract_nouns``
    overlap, and then when either of these holds:

    * one text contains the affirmative term of a ``SENTIMENT_SHIFT_PAIRS``
      entry and the other contains its negative term, or
    * exactly one of the texts contains a word from ``NEGATION_WORDS``.

    Tokens are lowercased and stripped of surrounding punctuation before
    matching, so "never." and "hate!" are recognised. Multi-word pair entries
    such as "am not" cannot equal a single token; that contrast is picked up
    by the second rule through "not".

    Args:
        text_a: First text to compare.
        text_b: Second text to compare.

    Returns:
        True if a polarity change is detected, otherwise False.
    """
    tokens_a = _polarity_tokens(text_a)
    tokens_b = _polarity_tokens(text_b)

    negated_a = not tokens_a.isdisjoint(NEGATION_WORDS)
    negated_b = not tokens_b.isdisjoint(NEGATION_WORDS)

    # Both rules below require the texts to be about overlapping things.
    shared_terms = _extract_nouns(text_a) & _extract_nouns(text_b)

    # NOTE(review): the "no cover" pragmas are carried over from the previous
    # implementation. These branches are the core of the heuristic and
    # deserve real tests rather than exclusion.
    for affirmative, negative in SENTIMENT_SHIFT_PAIRS:  # pragma: no cover (edge case)
        forward = affirmative in tokens_a and negative in tokens_b
        backward = negative in tokens_a and affirmative in tokens_b
        if (forward or backward) and shared_terms:
            return True

    if negated_a != negated_b and shared_terms:  # pragma: no cover (edge case)
        return True

    return False
def find_duplicates(
    new_memory: MemoryObject,
    existing_memories: list[MemoryObject],
    threshold: float = 0.85,
) -> list[MemoryObject]:
    """Return the existing memories that duplicate ``new_memory``.

    The value returned by ``cosine_similarity`` is rescaled with
    ``(s + 1) / 2`` (so a raw value in [-1, 1] lands in [0, 1]) and it is this
    rescaled score that is compared with ``threshold``. A memory qualifies
    when its score is strictly greater than ``threshold``; a score exactly
    equal to it does not qualify.

    Candidates without an embedding are ignored, as are candidates for which
    ``has_sentiment_flip`` reports a polarity change relative to
    ``new_memory.content``.

    Args:
        new_memory: The incoming memory.
        existing_memories: Stored memories to compare against.
        threshold: Lower bound (exclusive) on the rescaled similarity.

    Returns:
        Matching memories in their original order; an empty list when
        ``new_memory`` has no embedding or there is nothing to compare.
    """
    query = new_memory.embedding
    if query is None or not existing_memories:
        return []

    matches = []
    for candidate in existing_memories:
        vector = candidate.embedding
        if vector is None:
            continue

        scaled = (cosine_similarity(query, vector) + 1.0) / 2.0
        # Near-identical wording with opposite sentiment is not a duplicate.
        if scaled > threshold and not has_sentiment_flip(new_memory.content, candidate.content):
            matches.append(candidate)

    return matches
def find_conflicts(
    new_memory: MemoryObject,
    existing_memories: list[MemoryObject],
    conflict_threshold: float = 0.65,
    dedup_threshold: float = 0.85,
) -> list[MemoryObject]:
    """Return the existing memories that may conflict with ``new_memory``.

    The value returned by ``cosine_similarity`` is rescaled with
    ``(s + 1) / 2`` and a memory is reported when that score lies strictly
    between ``conflict_threshold`` and ``dedup_threshold`` (both bounds
    excluded). Scores above the band belong to duplicates, scores below it to
    unrelated memories. Candidates without an embedding are ignored.

    NOTE(review): a candidate scoring above ``dedup_threshold`` that
    ``find_duplicates`` skips because of a sentiment flip is not returned here
    either - confirm that callers handle that case.

    Args:
        new_memory: The incoming memory.
        existing_memories: Stored memories to compare against.
        conflict_threshold: Lower bound (exclusive) of the conflict band.
        dedup_threshold: Upper bound (exclusive) of the conflict band.

    Returns:
        Memories inside the band, in their original order; an empty list when
        ``new_memory`` has no embedding or there is nothing to compare.
    """
    query = new_memory.embedding
    if query is None or not existing_memories:
        return []

    return [
        candidate
        for candidate in existing_memories
        if candidate.embedding is not None
        and conflict_threshold
        < (cosine_similarity(query, candidate.embedding) + 1.0) / 2.0
        < dedup_threshold
    ]
def resolve_duplicate(
    new_memory: MemoryObject,
    existing: MemoryObject,
) -> MemoryObject:
    """Merge a duplicate pair into a fresh memory (LATEST_WINS strategy).

    Identity and provenance (``memory_id``, ``user_id``, ``created_at``,
    ``source``, ``importance``, ``lifecycle_state``, a copy of ``metadata``)
    come from ``existing``. ``content``, ``confidence`` and ``memory_type``
    come from ``new_memory``. The remaining fields prefer ``new_memory`` and
    fall back to ``existing`` when the new value is missing or falsy.
    ``score`` is reset to 0.0, ``last_accessed_at`` is set to the current UTC
    time and ``version`` is incremented by one.

    Neither input is modified; a new ``MemoryObject`` is returned.
    """
    # Fields where the newer memory wins only if it actually carries a value.
    # NOTE(review): embedding and embedding_dim fall back independently, so
    # they can come from different memories - confirm this is acceptable.
    embedding = existing.embedding if new_memory.embedding is None else new_memory.embedding
    embedding_dim = new_memory.embedding_dim or existing.embedding_dim
    # NOTE(review): tags are passed through by reference while metadata is
    # copied - confirm the result may share its tags object with an input.
    tags = new_memory.tags or existing.tags
    session_id = new_memory.session_id or existing.session_id
    namespace = new_memory.namespace or existing.namespace
    expires_at = new_memory.expires_at or existing.expires_at

    # Shallow copy so the result does not share its metadata dict.
    metadata = existing.metadata.copy() if existing.metadata else {}

    return MemoryObject(
        # Carried over from the stored memory.
        memory_id=existing.memory_id,
        user_id=existing.user_id,
        created_at=existing.created_at,
        source=existing.source,
        importance=existing.importance,
        lifecycle_state=existing.lifecycle_state,
        metadata=metadata,
        # Taken from the incoming memory.
        content=new_memory.content,
        confidence=new_memory.confidence,
        memory_type=new_memory.memory_type,
        # Incoming value with fallback to the stored one.
        embedding=embedding,
        embedding_dim=embedding_dim,
        tags=tags,
        session_id=session_id,
        namespace=namespace,
        expires_at=expires_at,
        # Reset or recomputed for the merged record.
        score=0.0,
        last_accessed_at=datetime.now(timezone.utc),
        version=existing.version + 1,
    )