Coverage for src / kemi / operations / _ops_versioning.py: 93%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1"""Versioning operations: configure_versioning, get_history, diff_versions, rollback_memory. 

2 

3These free functions are called by the corresponding ``Memory`` methods. 

4Public API is unchanged — the ``Memory`` class still exposes 

5``memory.configure_versioning()``, ``memory.get_history()``, etc. 

6""" 

7 

8from __future__ import annotations 

9 

10import logging 

11import sqlite3 

12from typing import TYPE_CHECKING, Any 

13 

14from kemi.versions import DiffResult, MemoryVersionStore, RollbackResult, VersionSnapshot 

15 

16if TYPE_CHECKING: 

17 from kemi._memory_impl import Memory 

18 

19logger = logging.getLogger(__name__) 

20 

21 

def configure(
    memory: "Memory",
    db_path: str | None,
    max_versions_per_memory: int,
    auto_prune_versions: bool,
) -> None:
    """Enable version-history tracking on ``memory``.

    Args:
        memory: Object that receives the version store and its settings.
        db_path: Location handed to ``MemoryVersionStore``. When ``None``,
            the path is read from ``memory._store._db_path`` instead.
        max_versions_per_memory: Saved on ``memory`` as
            ``_max_versions_per_memory``.
        auto_prune_versions: Saved on ``memory`` as ``_auto_prune_versions``.

    Nothing is raised for the handled failure modes: if no path can be
    determined, or the store raises ``OSError`` / ``sqlite3.DatabaseError``
    while being set up, a warning is logged and the function returns.
    """
    store_path = db_path
    if store_path is None:
        # No explicit path given: reuse the one the primary store exposes.
        try:
            store_path = memory._store._db_path  # type: ignore[attr-defined]
        except AttributeError:
            logger.warning("Cannot determine database path for version store")
            return

    try:
        version_store = MemoryVersionStore(db_path=store_path)
        memory._version_store = version_store
        memory._max_versions_per_memory = max_versions_per_memory
        memory._auto_prune_versions = auto_prune_versions
        logger.info(
            "Memory versioning enabled (max %d versions per memory)",
            max_versions_per_memory,
        )
    except (OSError, sqlite3.DatabaseError) as exc:
        logger.warning("Failed to initialise version store: %s", exc)

46 

47 

def get_store(memory: "Memory") -> MemoryVersionStore:
    """Return the version store of ``memory``, creating it on first use.

    If ``memory._version_store`` is already set it is returned as is.
    Otherwise a ``MemoryVersionStore`` is built from
    ``memory._store._db_path``, cached on ``memory`` and returned.
    """
    if memory._version_store is not None:
        return memory._version_store

    location: str | None
    try:
        location = memory._store._db_path  # type: ignore[attr-defined]
    except AttributeError:
        # The primary store exposes no path, so fall back to an in-memory
        # database; in-process operations keep working that way.
        location = ":memory:"

    memory._version_store = MemoryVersionStore(db_path=location)
    return memory._version_store

59 

60 

def get_history(
    memory: "Memory",
    memory_id: str,
    limit: int = 100,
) -> list[VersionSnapshot]:
    """Return version history for a memory, newest first.

    Args:
        memory: Owner of the version store; the store is obtained through
            ``get_store`` and is therefore created lazily if missing.
        memory_id: Identifier of the memory whose snapshots are listed.
        limit: Maximum number of snapshots to return. A negative value is
            treated as ``0`` and yields an empty list. ``None`` is passed
            to the slice unchanged, which returns every snapshot.

    Returns:
        The leading ``limit`` entries of ``list_versions(memory_id)``, or an
        empty list when the lookup raises ``OSError``,
        ``sqlite3.DatabaseError`` or ``AttributeError`` (logged at debug
        level with the traceback).
    """
    try:
        vs = get_store(memory)
        snapshots = vs.list_versions(memory_id)
        # A negative bound would slice from the end (``snapshots[:-k]``) and
        # silently drop the last ``k`` entries instead of capping the result,
        # so clamp it to zero. ``None`` keeps its "no cap" slice meaning.
        bound = limit if limit is None or limit >= 0 else 0
        return snapshots[:bound]
    except (OSError, sqlite3.DatabaseError, AttributeError):
        logger.debug("get_history failed for %s", memory_id, exc_info=True)
        return []

74 

75 

def diff(
    memory: "Memory",
    memory_id: str,
    from_version: int,
    to_version: int,
) -> DiffResult | None:
    """Compare two stored versions of a memory.

    Args:
        memory: Owner of the version store, obtained through ``get_store``.
        memory_id: Identifier of the memory to compare.
        from_version: Version number forwarded as the comparison base.
        to_version: Version number forwarded as the comparison target.

    Returns:
        Whatever the store's ``diff`` method returns, or ``None`` when the
        call raises ``OSError``, ``sqlite3.DatabaseError`` or
        ``AttributeError`` (logged at debug level with the traceback).
    """
    outcome: DiffResult | None
    try:
        store = get_store(memory)
        outcome = store.diff(memory_id, from_version, to_version)
    except (OSError, sqlite3.DatabaseError, AttributeError):
        logger.debug(
            "diff_versions failed for %s (%d -> %d)",
            memory_id,
            from_version,
            to_version,
            exc_info=True,
        )
        outcome = None
    return outcome

95 

96 

def rollback(
    memory: "Memory",
    memory_id: str,
    target_version: int,
) -> RollbackResult | None:
    """Restore a memory to one of its earlier versions.

    Args:
        memory: Owner of the version store; its ``_store`` is handed to the
            version store's ``rollback`` call.
        memory_id: Identifier of the memory to roll back.
        target_version: Version number to roll back to.

    Returns:
        Whatever the store's ``rollback`` method returns, or ``None`` when
        the call raises ``OSError``, ``sqlite3.DatabaseError``,
        ``AttributeError`` or ``ValueError`` (logged at debug level with the
        traceback).
    """
    outcome: RollbackResult | None
    try:
        version_store = get_store(memory)
        outcome = version_store.rollback(
            memory_id=memory_id,
            target_version=target_version,
            store=memory._store,
        )
    except (OSError, sqlite3.DatabaseError, AttributeError, ValueError):
        logger.debug(
            "rollback_memory failed for %s to v%d",
            memory_id,
            target_version,
            exc_info=True,
        )
        outcome = None
    return outcome

118 

119 

def auto_prune(memory: "Memory", memory_id: str) -> None:
    """Delete surplus versions of a memory when auto-pruning is enabled.

    Does nothing unless ``memory._auto_prune_versions`` is truthy and a
    version store has already been set on ``memory``. Otherwise every
    snapshot past the first ``memory._max_versions_per_memory`` entries of
    ``list_versions(memory_id)`` is removed from the ``memory_versions``
    table.

    ``OSError``, ``sqlite3.DatabaseError`` and ``AttributeError`` are
    swallowed and logged at debug level with the traceback.
    """
    if not (memory._auto_prune_versions and memory._version_store is not None):
        return

    try:
        store = get_store(memory)
        history = store.list_versions(memory_id)
        keep = memory._max_versions_per_memory
        if len(history) <= keep:
            return

        # Everything past the retained window is dropped.
        # NOTE(review): presumably list_versions is newest-first, making
        # these the oldest entries - confirm against MemoryVersionStore.
        stale_versions = [snap.version for snap in history[keep:]]

        connection = store._get_connection()
        try:
            for stale in stale_versions:
                connection.execute(
                    "DELETE FROM memory_versions "
                    "WHERE memory_id = ? AND version = ?",
                    (memory_id, stale),
                )
            connection.commit()
        finally:
            connection.close()

        logger.info(
            "Pruned %d old versions for memory %s",
            len(stale_versions),
            memory_id,
        )
    except (OSError, sqlite3.DatabaseError, AttributeError):
        logger.debug("Failed to prune versions for %s", memory_id, exc_info=True)