Coverage for src/blob_dict/dict/path.py: 0%

119 statements  

« prev     ^ index     » next       coverage.py v7.8.1, created at 2025-06-22 06:35 -0700

1import shutil 

2from abc import abstractmethod 

3from collections.abc import Iterator 

4from datetime import UTC, datetime, timedelta 

5from mimetypes import guess_type 

6from pathlib import Path 

7from typing import Any, Literal, Protocol, cast, override 

8 

9from extratools_core.path import rm_with_empty_parents 

10from extratools_core.typing import PathLike, SearchableMapping 

11 

12from ..blob import BytesBlob, StrBlob 

13from ..blob.json import JsonDictBlob, YamlDictBlob 

14from . import MutableBlobDictBase 

15 

16 

17class LocalPath(Path): 

18 def rmtree(self) -> None: 

19 shutil.rmtree(self) 

20 

21 

class ExtraPathLike(PathLike, Protocol):
    """A path-like object that additionally supports recursive removal.

    Structural protocol: any `PathLike` implementation that also provides
    `rmtree()` (e.g. `LocalPath`, or a cloud path class) satisfies it.
    """

    @abstractmethod
    def rmtree(self) -> None:
        """Recursively delete this path and all of its contents."""

26 

27 

28class PathBlobDict(MutableBlobDictBase, SearchableMapping[str, BytesBlob]): 

29 def __init__( 

30 self, 

31 path: ExtraPathLike | None = None, 

32 *, 

33 compression: bool = False, 

34 ttl: timedelta | None = None, 

35 blob_class: type[BytesBlob] = BytesBlob, 

36 blob_class_args: dict[str, Any] | None = None, 

37 ) -> None: 

38 super().__init__() 

39 

40 if path is None: 

41 path = LocalPath(".") 

42 

43 if isinstance(path, Path): 

44 path = path.expanduser() 

45 

46 self.__path: ExtraPathLike = path 

47 

48 # The concept of relative path does not exist for `CloudPath`, 

49 # and each walked path is always absolute for `CloudPath`. 

50 # Therefore, we extract each key by removing the path prefix. 

51 # In this way, the same logic works for both absolute and relative path. 

52 self.__prefix_len: int = ( 

53 len(str(self.__path.absolute())) 

54 # Extra 1 is for separator `/` between prefix and filename 

55 + 1 

56 ) 

57 

58 self.__compression: bool = compression 

59 

60 # Note that we do not automatically cleanup by TTL for reasons below: 

61 # - It is tricky to do so for local path without CRON job or daemon process 

62 # - Multiple objects could actually use same directory with different TTLs 

63 # Thus, it is best to depend on native solution for cleanup by TTL, 

64 # like S3's object lifecycle management. 

65 self.__ttl: timedelta | None = ttl 

66 

67 self.__blob_class: type[BytesBlob] = blob_class 

68 self.__blob_class_args: dict[str, Any] = blob_class_args or {} 

69 

70 def create(self) -> None: 

71 self.__path.mkdir( 

72 parents=True, 

73 exist_ok=True, 

74 ) 

75 

76 def delete(self) -> None: 

77 self.__path.rmtree() 

78 

79 def __is_expired(self, key_path: PathLike) -> bool: 

80 return ( 

81 datetime.now(UTC) 

82 - datetime.fromtimestamp(key_path.stat().st_mtime, UTC) 

83 > cast("timedelta", self.__ttl) 

84 ) 

85 

86 @override 

87 def __contains__(self, key: object) -> bool: 

88 key_path: PathLike = self.__path / str(key) 

89 

90 return ( 

91 key_path.is_file() 

92 and ( 

93 not self.__ttl 

94 or not self.__is_expired(key_path) 

95 ) 

96 ) 

97 

98 def __get_blob_class(self, key: str) -> type[BytesBlob]: # noqa: PLR0911 

99 mime_type: str | None 

100 mime_type, _ = guess_type(self.__path / key) 

101 

102 match mime_type: 

103 case "application/json": 

104 return JsonDictBlob 

105 case "application/octet-stream": 

106 return BytesBlob 

107 case "application/yaml": 

108 return YamlDictBlob 

109 case "audo/mpeg": 

110 # Import here as it has optional dependency 

111 from ..blob.audio import AudioBlob # noqa: PLC0415 

112 

113 return AudioBlob 

114 case "image/png": 

115 # Import here as it has optional dependency 

116 from ..blob.image import ImageBlob # noqa: PLC0415 

117 

118 return ImageBlob 

119 case ( 

120 "text/css" 

121 | "text/csv" 

122 | "text/html" 

123 | "text/javascript" 

124 | "text/markdown" 

125 | "text/plain" 

126 | "text/xml" 

127 ): 

128 return StrBlob 

129 case "video/mp4": 

130 # Import here as it has optional dependency 

131 from ..blob.video import VideoBlob # noqa: PLC0415 

132 

133 return VideoBlob 

134 case _: 

135 return self.__blob_class 

136 

137 def _get(self, key: str, blob_bytes: bytes) -> BytesBlob: 

138 blob: BytesBlob = BytesBlob.from_bytes(blob_bytes, compression=self.__compression) 

139 return blob.as_blob( 

140 self.__get_blob_class(key), 

141 self.__blob_class_args, 

142 ) 

143 

144 @override 

145 def __getitem__(self, key: str, /) -> BytesBlob: 

146 if key not in self: 

147 raise KeyError 

148 

149 return self._get(key, (self.__path / key).read_bytes()) 

150 

151 def __path_to_str(self, path: ExtraPathLike) -> str: 

152 return str(path.absolute())[self.__prefix_len:] 

153 

154 @override 

155 def __iter__(self) -> Iterator[str]: 

156 for parent, _, files in self.__path.walk(): 

157 for filename in files: 

158 key_path: PathLike = parent / filename 

159 if self.__ttl and self.__is_expired(key_path): 

160 continue 

161 

162 yield self.__path_to_str(key_path) 

163 

164 @override 

165 def search(self, filter_body: str | None = None) -> Iterator[str]: 

166 for key_path in self.__path.glob(filter_body or "**"): 

167 if key_path.is_file(): 

168 yield self.__path_to_str(key_path) 

169 

170 @override 

171 def clear(self) -> None: 

172 for parent, dirs, files in self.__path.walk(top_down=False): 

173 for filename in files: 

174 (parent / filename).unlink() 

175 for dirname in dirs: 

176 (parent / dirname).rmdir() 

177 

178 def __cleanup(self, key: str) -> None: 

179 rm_with_empty_parents(self.__path / key, stop=self.__path) 

180 

181 @override 

182 def pop[T: Any]( 

183 self, 

184 key: str, 

185 /, 

186 default: BytesBlob | T | Literal["__DEFAULT"] = "__DEFAULT", 

187 ) -> BytesBlob | T: 

188 blob: BytesBlob | None = self.get(key) 

189 if blob: 

190 self.__cleanup(key) 

191 

192 if blob is not None: 

193 return blob 

194 

195 if default == "__DEFAULT": 

196 raise KeyError 

197 

198 return default 

199 

200 @override 

201 def __delitem__(self, key: str, /) -> None: 

202 if key not in self: 

203 raise KeyError 

204 

205 self.__cleanup(key) 

206 

207 __BAD_BLOB_CLASS_ERROR_MESSAGE: str = "Must specify blob that is instance of {blob_class}" 

208 

209 @override 

210 def __setitem__(self, key: str, blob: BytesBlob, /) -> None: 

211 if not isinstance(blob, self.__blob_class): 

212 raise TypeError(PathBlobDict.__BAD_BLOB_CLASS_ERROR_MESSAGE.format( 

213 blob_class=self.__blob_class, 

214 )) 

215 

216 (self.__path / key).parent.mkdir( 

217 parents=True, 

218 exist_ok=True, 

219 ) 

220 

221 blob_bytes: bytes = blob.as_bytes(compression=self.__compression) 

222 (self.__path / key).write_bytes(blob_bytes)