Coverage for src / invariant / store / disk.py: 92.00%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 09:24 +0000

1"""DiskStore: Bounded persistent artifact storage backed by diskcache.""" 

2 

3from pathlib import Path 

4from typing import Any 

5 

6from diskcache import Cache 

7 

8from invariant.cacheable import is_cacheable 

9from invariant.store.base import ArtifactStore 

10from invariant.store.codec import deserialize, serialize 

11 

12 

13class DiskStore(ArtifactStore): 

14 """Bounded persistent artifact store backed by diskcache. 

15 

16 The cache is stored on the local filesystem under `.invariant/cache/` 

17 by default, with metadata managed by SQLite via diskcache. Values are the 

18 serialized artifact bytes, so the store codec contract is unchanged. 

19 """ 

20 

21 def __init__( 

22 self, 

23 cache_dir: Path | str | None = None, 

24 *, 

25 size_limit_bytes: int = 2**30, 

26 eviction_policy: str = "least-frequently-used", 

27 cull_limit: int = 1, 

28 ) -> None: 

29 """Initialize DiskStore. 

30 

31 Args: 

32 cache_dir: Directory to store cache. Defaults to `.invariant/cache/` 

33 in the current working directory. 

34 size_limit_bytes: Approximate maximum on-disk size of the cache. 

35 eviction_policy: diskcache eviction policy. Defaults to LFU. 

36 cull_limit: Maximum entries diskcache will evict per cull cycle. 

37 """ 

38 if cache_dir is None: 

39 cache_dir = Path.cwd() / ".invariant" / "cache" 

40 elif isinstance(cache_dir, str): 

41 cache_dir = Path(cache_dir) 

42 

43 self.cache_dir = cache_dir 

44 self.cache_dir.mkdir(parents=True, exist_ok=True) 

45 self.size_limit_bytes = size_limit_bytes 

46 self.eviction_policy = eviction_policy 

47 self.cull_limit = cull_limit 

48 self._cache = Cache( 

49 directory=str(self.cache_dir), 

50 size_limit=size_limit_bytes, 

51 eviction_policy=eviction_policy, 

52 cull_limit=cull_limit, 

53 ) 

54 super().__init__() 

55 

56 def _make_key(self, op_name: str, digest: str) -> str: 

57 """Create a stable composite cache key from operation and digest. 

58 

59 Args: 

60 op_name: The name of the operation. 

61 digest: The SHA-256 hash (64 character hex string). 

62 

63 Returns: 

64 Composite key suitable for diskcache. 

65 """ 

66 if len(digest) != 64: 

67 raise ValueError(f"Invalid digest length: {len(digest)}, expected 64") 

68 

69 return f"{op_name}:{digest}" 

70 

71 def exists(self, op_name: str, digest: str) -> bool: 

72 """Check if an artifact exists.""" 

73 key = self._make_key(op_name, digest) 

74 exists = key in self._cache 

75 if exists: 

76 self.stats.hits += 1 

77 else: 

78 self.stats.misses += 1 

79 return exists 

80 

81 def get(self, op_name: str, digest: str) -> Any: 

82 """Retrieve an artifact by operation name and digest. 

83 

84 Raises: 

85 KeyError: If artifact does not exist. 

86 """ 

87 key = self._make_key(op_name, digest) 

88 

89 serialized = self._cache.get(key, default=None) 

90 if serialized is None: 

91 raise KeyError( 

92 f"Artifact with op_name '{op_name}' and digest '{digest}' not found" 

93 ) 

94 

95 return deserialize(serialized) 

96 

97 def put(self, op_name: str, digest: str, artifact: Any) -> None: 

98 """Store an artifact with the given operation name and digest.""" 

99 if not is_cacheable(artifact): 

100 raise TypeError( 

101 f"Artifact is not cacheable: {type(artifact)}. " 

102 f"Use is_cacheable() to check values before storing." 

103 ) 

104 

105 key = self._make_key(op_name, digest) 

106 serialized_data = serialize(artifact) 

107 self._cache.set(key, serialized_data) 

108 self.stats.puts += 1 

109 

110 def close(self) -> None: 

111 """Close the underlying diskcache handle.""" 

112 self._cache.close() 

113 

114 def __del__(self) -> None: 

115 """Best-effort cleanup for callers that don't explicitly close the store.""" 

116 try: 

117 self.close() 

118 except Exception: 

119 pass