Coverage for session_buddy / memory / persistence.py: 91.78%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1from __future__ import annotations 

2 

3import hashlib 

4import json 

5import time 

6import typing as t 

7from dataclasses import dataclass 

8from datetime import UTC, datetime 

9from pathlib import Path 

10 

11import duckdb 

12from session_buddy.memory.entity_extractor import ( 

13 EntityRelationship, 

14 ExtractedEntity, 

15 ProcessedMemory, 

16) 

17from session_buddy.settings import get_settings as _get_settings 

18 

19 

def get_settings() -> t.Any:
    """Return the settings object from the imported settings getter.

    Exists as a module-level indirection so tests can monkeypatch this name.
    """
    settings = _get_settings()
    return settings

23 

24 

def _connect() -> duckdb.DuckDBPyConnection:
    """Open a DuckDB connection to the configured database file.

    The location is read from the ``database_path`` attribute of the settings
    object, falling back to ``~/.claude/data/reflection.duckdb`` when the
    attribute is absent. ``~`` is expanded and the parent directory is created
    if it does not exist yet.
    """
    configured = getattr(
        get_settings(), "database_path", "~/.claude/data/reflection.duckdb"
    )
    target = Path(str(configured)).expanduser()
    # Make sure the containing directory exists before DuckDB opens the file.
    target.parent.mkdir(parents=True, exist_ok=True)
    options = {"allow_unsigned_extensions": True}
    return duckdb.connect(str(target), config=options)

32 

33 

def _new_id(prefix: str = "mem") -> str:
    """Return a unique 32-character lowercase hexadecimal identifier.

    The identifier is the MD5 digest (used purely as a compact, non-security
    fingerprint) of the prefix, the current wall-clock time and a random UUID.

    Args:
        prefix: Short tag mixed into the hashed seed (e.g. ``"conv"``,
            ``"ent"``, ``"rel"``).

    Returns:
        A 32-character hex string.
    """
    # Local import keeps this fix self-contained within the function.
    import uuid

    # The timestamp alone is not unique: two calls with the same prefix inside
    # one clock tick would hash to the same id. The random UUID component
    # guarantees distinct ids even for back-to-back calls in a loop.
    seed = f"{prefix}_{time.time()}_{uuid.uuid4().hex}"
    return hashlib.md5(seed.encode(), usedforsecurity=False).hexdigest()

39 

40 

41@dataclass(slots=True) 

42class PersistResult: 

43 memory_id: str 

44 entity_ids: list[str] 

45 relationship_ids: list[str] 

46 

47 

def insert_processed_memory(
    pm: ProcessedMemory,
    content: str,
    *,
    project: str | None = None,
    namespace: str = "default",
    embedding: list[float] | None = None,
    session_id: str | None = None,
    user_id: str | None = None,
) -> PersistResult:
    """Insert a processed memory into the v2 tables as one atomic unit.

    Writes the conversation row, then one row per entity in ``pm.entities``,
    then one row per relationship in ``pm.relationships`` whose endpoints were
    both inserted as entities. All writes happen inside a single transaction so
    a failure part-way through leaves no partial data behind.

    Args:
        pm: Processed memory carrying classification fields, entities and
            relationships.
        content: Raw text stored in the conversation row.
        project: Optional project value stored with the conversation.
        namespace: Namespace value stored with the conversation.
        embedding: Optional embedding vector stored with the conversation.
        session_id: Optional session identifier stored with the conversation.
        user_id: Optional user identifier stored with the conversation.

    Returns:
        The ids of the memory and of the inserted entities/relationships.

    Raises:
        Exception: Any error raised by the inserts is re-raised after the
            transaction has been rolled back.
    """
    memory_id = _new_id("conv")
    # Filled by the entity insert and consumed by the relationship insert.
    value_to_id: dict[str, str] = {}

    with _connect() as conn:
        # Explicit transaction: without it every statement auto-commits and an
        # error in a later insert would orphan the rows written before it.
        conn.begin()
        try:
            _insert_conversation(
                conn,
                memory_id,
                content,
                embedding,
                pm,
                project,
                namespace,
                session_id,
                user_id,
            )
            entity_ids = _insert_entities(conn, pm.entities, memory_id, value_to_id)
            relationship_ids = _insert_relationships(
                conn, pm.relationships, value_to_id, memory_id
            )
        except Exception:
            conn.rollback()
            raise
        else:
            conn.commit()

    return PersistResult(
        memory_id=memory_id,
        entity_ids=entity_ids,
        relationship_ids=relationship_ids,
    )

94 

95 

def _insert_conversation(
    conn: duckdb.DuckDBPyConnection,
    memory_id: str,
    content: str,
    embedding: list[float] | None,
    pm: ProcessedMemory,
    project: str | None,
    namespace: str,
    session_id: str | None,
    user_id: str | None,
) -> None:
    """Write the main conversation row into ``conversations_v2``.

    ``access_count`` is stored as 0, ``last_accessed`` as NULL and
    ``timestamp`` as the database's CURRENT_TIMESTAMP; every other column is
    bound from the arguments or from attributes of ``pm``.
    """
    statement = """
        INSERT INTO conversations_v2 (
            id, content, embedding, category, subcategory, importance_score,
            memory_tier, access_count, last_accessed, project, namespace,
            timestamp, session_id, user_id, searchable_content, reasoning
        ) VALUES (?, ?, ?, ?, ?, ?, ?, 0, NULL, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?, ?)
        """
    # Grouped to mirror the column order of the placeholders above.
    classification = [
        pm.category,
        pm.subcategory,
        pm.importance_score,
        pm.suggested_tier,
    ]
    scope = [project, namespace, session_id, user_id]
    bound_values = [
        memory_id,
        content,
        embedding,
        *classification,
        *scope,
        pm.searchable_content,
        pm.reasoning,
    ]
    conn.execute(statement, bound_values)

132 

133 

def _insert_entities(
    conn: duckdb.DuckDBPyConnection,
    entities: list[ExtractedEntity],
    memory_id: str,
    value_to_id: dict[str, str],
) -> list[str]:
    """Insert one ``memory_entities`` row per entity and return the new ids.

    ``value_to_id`` is updated in place: each entity value is mapped to the id
    of the first row inserted for it, and existing mappings are left untouched.
    """
    statement = """
        INSERT INTO memory_entities (
            id, memory_id, entity_type, entity_value, confidence
        ) VALUES (?, ?, ?, ?, ?)
        """
    created: list[str] = []
    for raw in entities:
        # Items are normally already model instances; coerce anything else.
        if isinstance(raw, ExtractedEntity):
            entity = raw
        else:
            entity = ExtractedEntity.model_validate(raw)
        new_id = _new_id("ent")
        row = [
            new_id,
            memory_id,
            entity.entity_type,
            entity.entity_value,
            entity.confidence,
        ]
        conn.execute(statement, row)
        created.append(new_id)
        # First occurrence of a value wins; later duplicates keep the mapping.
        if entity.entity_value not in value_to_id:
            value_to_id[entity.entity_value] = new_id
    return created

158 

159 

def _insert_relationships(
    conn: duckdb.DuckDBPyConnection,
    relationships: list[EntityRelationship],
    value_to_id: dict[str, str],
    memory_id: str,
) -> list[str]:
    """Insert one ``memory_relationships`` row per resolvable relationship.

    A relationship is resolvable when both its ``from_entity`` and
    ``to_entity`` values have an id in ``value_to_id``; others are skipped.
    Returns the ids of the rows that were inserted.
    """
    statement = """
        INSERT INTO memory_relationships (
            id, from_entity_id, to_entity_id, relationship_type, strength
        ) VALUES (?, ?, ?, ?, ?)
        """
    created: list[str] = []
    for raw in relationships:
        # Items are normally already model instances; coerce anything else.
        if isinstance(raw, EntityRelationship):
            relation = raw
        else:
            relation = EntityRelationship.model_validate(raw)
        source_id = value_to_id.get(relation.from_entity)
        target_id = value_to_id.get(relation.to_entity)
        if not source_id or not target_id:
            # An endpoint was never inserted as an entity; nothing to link.
            continue
        new_id = _new_id("rel")
        row = [
            new_id,
            source_id,
            target_id,
            relation.relationship_type,
            relation.strength,
        ]
        conn.execute(statement, row)
        created.append(new_id)
    return created

187 

188 

def log_memory_access(memory_id: str, access_type: str = "search") -> None:
    """Record one access to a memory in ``memory_access_log``.

    The row gets a fresh id, the given ``memory_id`` and ``access_type``, and
    the database's CURRENT_TIMESTAMP. Entries are intended for later analysis
    by the Conscious Agent.
    """
    statement = (
        "INSERT INTO memory_access_log (id, memory_id, access_type, timestamp) "
        "VALUES (?, ?, ?, CURRENT_TIMESTAMP)"
    )
    with _connect() as conn:
        row = [_new_id("acc"), memory_id, access_type]
        conn.execute(statement, row)