Coverage for src / kemi / dedup.py: 100%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1from datetime import datetime, timezone 

2 

3from kemi.models import MemoryObject 

4from kemi.scoring import cosine_similarity 

5 

# Lower-case tokens whose presence marks a statement as negative.
# Membership is tested against whole whitespace-separated tokens.
NEGATION_WORDS: set[str] = {
    # Plain negators
    "no",
    "not",
    "never",
    "cannot",
    "anymore",
    # Contracted auxiliaries
    "don't",
    "doesn't",
    "didn't",
    "won't",
    "can't",
    # Aversion verbs
    "hate",
    "dislike",
    "avoid",
    # Cessation verbs
    "stop",
    "stopped",
    "quit",
    "quitting",
    "ceased",
}

26 

# (positive, negative) term pairs. Finding one member in one text and its
# partner in the other text is treated as a polarity reversal.
SENTIMENT_SHIFT_PAIRS: list[tuple[str, str]] = [
    # Preference and habit
    ("love", "hate"),
    ("like", "dislike"),
    ("enjoy", "avoid"),
    ("always", "never"),
    # Auxiliaries and their contractions
    ("do", "don't"),
    ("will", "won't"),
    ("can", "can't"),
    # Starting versus stopping
    ("start", "stop"),
    ("begin", "quit"),
    # State-of-being and possession
    # NOTE(review): "am not" is a two-word phrase; a consumer that compares
    # against individual whitespace-split tokens will not match it.
    ("am", "am not"),
    ("was", "wasn't"),
    ("is", "isn't"),
    ("have", "haven't"),
    ("had", "hadn't"),
]

43 

44 

45def _extract_nouns(text: str) -> set[str]: 

46 words = text.lower().split() 

47 result = set() 

48 skip_next = False 

49 for i, word in enumerate(words): 

50 if skip_next: 

51 skip_next = False 

52 continue 

53 clean = word.strip(",.!?;:'\"") 

54 if len(clean) > 2: 

55 result.add(clean) 

56 if i + 1 < len(words) and words[i + 1] in ("at", "in", "on", "to", "for"): 

57 skip_next = True 

58 result.add(words[i + 1].strip(",.!?;:'\"")) 

59 return result 

60 

61 

def _polarity_tokens(text: str) -> list[str]:
    """Lower-case *text* and split it into tokens with edge punctuation removed."""
    # Only leading/trailing punctuation is stripped, so the apostrophe inside
    # contractions such as "don't" survives and still matches the word lists.
    stripped = (token.strip(",.!?;:'\"") for token in text.lower().split())
    return [token for token in stripped if token]


def has_sentiment_flip(text_a: str, text_b: str) -> bool:
    """Report whether two texts talk about the same thing with opposite polarity.

    A flip is reported only when the texts share at least one word according
    to ``_extract_nouns`` and one of the following holds:

    * one text contains the positive member of a ``SENTIMENT_SHIFT_PAIRS``
      entry while the other contains its negative member, or
    * exactly one of the texts contains a word from ``NEGATION_WORDS``.

    Tokens are compared after lower-casing and stripping surrounding
    punctuation, so "never." or "hate!" are recognised. Pair terms are
    matched as runs of whole tokens, which lets multi-word entries such as
    "am not" match.

    Args:
        text_a: First text to compare.
        text_b: Second text to compare.

    Returns:
        True if a polarity reversal is detected, otherwise False.
    """
    tokens_a = _polarity_tokens(text_a)
    tokens_b = _polarity_tokens(text_b)

    has_neg_a = bool(set(tokens_a) & NEGATION_WORDS)
    has_neg_b = bool(set(tokens_b) & NEGATION_WORDS)

    common_nouns = _extract_nouns(text_a) & _extract_nouns(text_b)

    # Space-padded, single-space-joined token streams: " term " is a substring
    # exactly when the term occurs as a run of whole tokens. For one-word terms
    # this is plain token membership; it also covers two-word phrases.
    padded_a = f" {' '.join(tokens_a)} "
    padded_b = f" {' '.join(tokens_b)} "

    for pos, neg in SENTIMENT_SHIFT_PAIRS:  # pragma: no cover (edge case)
        pos_term, neg_term = f" {pos} ", f" {neg} "
        if (pos_term in padded_a and neg_term in padded_b) or (
            neg_term in padded_a and pos_term in padded_b
        ):  # pragma: no cover (edge case)
            if common_nouns:  # pragma: no cover (edge case)
                return True  # pragma: no cover (edge case)

    if has_neg_a != has_neg_b and common_nouns:  # pragma: no cover (edge case)
        return True  # pragma: no cover (edge case)

    return False

84 

85 

def find_duplicates(
    new_memory: MemoryObject,
    existing_memories: list[MemoryObject],
    threshold: float = 0.85,
) -> list[MemoryObject]:
    """Return the existing memories that are near-duplicates of new_memory.

    The value returned by ``cosine_similarity`` is rescaled with
    ``(s + 1) / 2`` before it is compared. A memory is reported only when the
    rescaled similarity is strictly greater than ``threshold`` (a value equal
    to the threshold does not qualify) and ``has_sentiment_flip`` finds no
    polarity reversal between the two contents.

    Memories without an embedding are skipped. An empty list is returned when
    new_memory has no embedding or there is nothing to compare against.
    Default threshold is 0.85.
    """
    query_vector = new_memory.embedding
    if query_vector is None or not existing_memories:
        return []

    matches = []
    for candidate in existing_memories:
        candidate_vector = candidate.embedding
        if candidate_vector is None:
            continue

        rescaled = (cosine_similarity(query_vector, candidate_vector) + 1.0) / 2.0

        # The flip check runs only for candidates that already clear the
        # threshold; a contradicting statement is not treated as a duplicate.
        if rescaled > threshold and not has_sentiment_flip(
            new_memory.content, candidate.content
        ):
            matches.append(candidate)

    return matches

115 

116 

def find_conflicts(
    new_memory: MemoryObject,
    existing_memories: list[MemoryObject],
    conflict_threshold: float = 0.65,
    dedup_threshold: float = 0.85,
) -> list[MemoryObject]:
    """Return the existing memories that may conflict with new_memory.

    The value returned by ``cosine_similarity`` is rescaled with
    ``(s + 1) / 2``. A memory is reported when the rescaled similarity lies
    strictly between ``conflict_threshold`` and ``dedup_threshold``; both
    bounds are exclusive. Values at or above ``dedup_threshold`` and values at
    or below ``conflict_threshold`` are left out.
    Default: 0.65 < similarity < 0.85.

    Memories without an embedding are skipped. An empty list is returned when
    new_memory has no embedding or there is nothing to compare against.
    """
    query_vector = new_memory.embedding
    if query_vector is None or not existing_memories:
        return []

    def rescaled_similarity(candidate: MemoryObject) -> float:
        return (cosine_similarity(query_vector, candidate.embedding) + 1.0) / 2.0

    return [
        candidate
        for candidate in existing_memories
        if candidate.embedding is not None
        and conflict_threshold < rescaled_similarity(candidate) < dedup_threshold
    ]

144 

145 

def resolve_duplicate(
    new_memory: MemoryObject,
    existing: MemoryObject,
) -> MemoryObject:
    """Merge a duplicate pair into one record using the LATEST_WINS strategy.

    A new MemoryObject is built; neither argument is assigned to.

    * Kept from ``existing``: memory_id, user_id, created_at, source,
      importance, lifecycle_state and a shallow copy of metadata.
    * Taken from ``new_memory``: content, confidence and memory_type.
    * Taken from ``new_memory`` with a fallback to ``existing``: embedding
      (when it is None), and embedding_dim, tags, session_id, namespace and
      expires_at (when they are falsy).
    * Reset or advanced: score becomes 0.0, last_accessed_at becomes the
      current UTC time, and version is existing.version + 1.
    """
    touched_at = datetime.now(timezone.utc)

    # Fall back to the stored vector only when the incoming memory has none.
    merged_embedding = (
        existing.embedding if new_memory.embedding is None else new_memory.embedding
    )
    # Shallow copy; an empty or missing metadata value becomes a fresh dict.
    merged_metadata = existing.metadata.copy() if existing.metadata else {}
    # NOTE(review): tags are passed through by reference, not copied.
    merged_tags = new_memory.tags or existing.tags

    return MemoryObject(
        # Identity and provenance stay with the stored record.
        memory_id=existing.memory_id,
        user_id=existing.user_id,
        created_at=existing.created_at,
        source=existing.source,
        importance=existing.importance,
        lifecycle_state=existing.lifecycle_state,
        metadata=merged_metadata,
        # The latest write wins for the payload.
        content=new_memory.content,
        confidence=new_memory.confidence,
        memory_type=new_memory.memory_type,
        # Latest value when present, otherwise the stored one.
        embedding=merged_embedding,
        embedding_dim=new_memory.embedding_dim or existing.embedding_dim,
        tags=merged_tags,
        session_id=new_memory.session_id or existing.session_id,
        namespace=new_memory.namespace or existing.namespace,
        expires_at=new_memory.expires_at or existing.expires_at,
        # Bookkeeping for the merged record.
        score=0.0,
        last_accessed_at=touched_at,
        version=existing.version + 1,
    )