Coverage for src/extratools_core/path.py: 48%

98 statements  

« prev     ^ index     » next       coverage.py v7.8.1, created at 2025-06-29 23:51 -0700

1from __future__ import annotations 

2 

3import shutil 

4from collections.abc import Callable, Iterable 

5from datetime import UTC, datetime, timedelta 

6from enum import IntEnum 

7from io import StringIO 

8from pathlib import Path 

9from stat import S_IFMT 

10 

11from .typing import PathLike 

12 

13 

14def clear_dir(curr_dir: PathLike) -> None: 

15 """ 

16 Based on example in https://docs.python.org/3/library/pathlib.html#pathlib.Path.walk 

17 """ 

18 

19 if not curr_dir.is_dir(): 

20 raise ValueError 

21 

22 for parent, dirs, files in curr_dir.walk(top_down=False): 

23 for filename in files: 

24 (parent / filename).unlink() 

25 for dirname in dirs: 

26 (parent / dirname).rmdir() 

27 

28 

def rm_with_empty_parents(
    curr: PathLike,
    *,
    stop: PathLike | None = None,
) -> None:
    """
    Delete file `curr`, then remove each ancestor directory left empty.

    Ancestors are visited from the nearest one upwards. The climb ends at
    `stop` (which is never removed) or at the first ancestor directory that
    still has entries, whichever comes first.

    Args:
        curr: File to delete.
        stop: Ancestor at which to stop; `None` means no explicit limit.
    """

    curr.unlink()

    for parent in curr.parents:
        if parent == stop:
            return

        if not parent.is_dir():
            continue

        # A non-empty directory keeps all of its own ancestors non-empty too,
        # so nothing further up can be removed; stop here instead of listing
        # every directory up to the root.
        if next(iter(parent.iterdir()), None) is not None:
            return

        parent.rmdir()

42 

43 

44def cleanup_dir_by_ttl( 

45 curr_dir: PathLike, 

46 ttl: timedelta | datetime, 

47 *, 

48 include_empty_parents: bool = True, 

49 return_before_delete: bool = False, 

50) -> Iterable[tuple[PathLike, datetime]]: 

51 if not curr_dir.is_dir(): 

52 raise ValueError 

53 

54 now: datetime = datetime.now(UTC) 

55 

56 for parent, _, files in curr_dir.walk(top_down=False): 

57 for filename in files: 

58 f: PathLike = (parent / filename) 

59 

60 last_modified_time: datetime = datetime.fromtimestamp(f.stat().st_mtime, UTC) 

61 if isinstance(ttl, timedelta): 

62 ttl = now - ttl 

63 

64 if last_modified_time < ttl: 

65 if return_before_delete: 

66 yield (f, last_modified_time) 

67 

68 if include_empty_parents: 

69 rm_with_empty_parents(f, stop=curr_dir) 

70 else: 

71 f.unlink() 

72 

73 if not return_before_delete: 

74 yield (f, last_modified_time) 

75 

76 

def read_text_by_pattern(
    *patterns: str,
    pwd: Path | str | None = None,
    seperator: str | None = None,
    add_newline: bool = True,
) -> str:
    """
    Concatenate the text of every file matching the given glob patterns.

    Args:
        *patterns: Glob patterns, evaluated relative to `pwd` in the given order.
        pwd: Base directory for the patterns; `~` is expanded. When empty or
            `None`, `Path()` (the current directory) is used.
        seperator: Optional text written after each file's content.
        add_newline: If true, append a newline to any non-empty file content
            that does not already end with one.

    Returns:
        The combined text; an empty string if nothing matched.
    """
    base_dir = Path(pwd).expanduser() if pwd else Path()

    with StringIO() as sio:
        for pattern in patterns:
            for path in base_dir.glob(pattern):
                # Patterns may match directories as well; only files are read.
                if not path.is_file():
                    continue

                file_content = path.read_text()
                sio.write(file_content)

                # `endswith` (unlike indexing with `[-1]`) is safe for empty files.
                if add_newline and file_content and not file_content.endswith("\n"):
                    sio.write("\n")

                if seperator:
                    sio.write(seperator)

        return sio.getvalue()

103 

104 

def __find_sibling(
    curr: PathLike,
    *,
    match_type: bool = True,
    match_dot_prefix: bool = True,
    cmp_op: Callable[[str, str], bool],
    min_max_func: Callable[..., PathLike],
) -> PathLike | None:
    """
    Pick one entry from the directory containing `curr`, compared by name.

    Args:
        curr: Path whose siblings are examined.
        match_type: If true, skip siblings whose `PathType` differs from `curr`'s.
        match_dot_prefix: If true, skip siblings whose name does not agree with
            `curr`'s name on starting with a dot.
        cmp_op: Called as `cmp_op(curr.name, sibling.name)`; siblings for which
            it returns false are skipped.
        min_max_func: `min`-like or `max`-like callable (accepting `key=`) used
            to choose between two remaining candidates by name.

    Returns:
        The selected sibling, or `None` if no sibling qualifies.
    """
    # Must use absolute path to be able to get name and parent correctly
    curr = curr.absolute()
    parent: PathLike = curr.parent

    curr_name: str = curr.name
    curr_is_dotted: bool = curr_name.startswith(".")
    # Looked up lazily (at most once) so that `curr` is only inspected
    # when there is actually a sibling to compare against.
    curr_type: PathType | None = None

    result: PathLike | None = None
    for sibling in parent.iterdir():
        if match_type:
            if curr_type is None:
                curr_type = PathType.get(curr)
            if curr_type != PathType.get(sibling):
                continue
        if match_dot_prefix and curr_is_dotted != sibling.name.startswith("."):
            continue
        if not cmp_op(curr_name, sibling.name):
            continue

        # Compare against `None` explicitly rather than relying on the
        # truthiness of a path object.
        result = (
            sibling if result is None
            else min_max_func(sibling, result, key=lambda x: x.name)
        )

    return result

137 

138 

def find_next_sibling(
    curr: PathLike,
    *,
    match_type: bool = True,
    match_dot_prefix: bool = True,
) -> PathLike | None:
    """
    Return the sibling of `curr` with the smallest name greater than
    `curr`'s name, or `None` if there is no such sibling.

    `match_type` and `match_dot_prefix` are forwarded unchanged to
    `__find_sibling`.
    """

    def comes_after(curr_name: str, sibling_name: str) -> bool:
        return sibling_name > curr_name

    return __find_sibling(
        curr,
        cmp_op=comes_after,
        min_max_func=min,
        match_dot_prefix=match_dot_prefix,
        match_type=match_type,
    )

152 

153 

def find_previous_sibling(
    curr: PathLike,
    *,
    match_type: bool = True,
    match_dot_prefix: bool = True,
) -> PathLike | None:
    """
    Return the sibling of `curr` with the greatest name smaller than
    `curr`'s name, or `None` if there is no such sibling.

    `match_type` and `match_dot_prefix` are forwarded unchanged to
    `__find_sibling`.
    """

    def comes_before(curr_name: str, sibling_name: str) -> bool:
        return sibling_name < curr_name

    return __find_sibling(
        curr,
        cmp_op=comes_before,
        min_max_func=max,
        match_dot_prefix=match_dot_prefix,
        match_type=match_type,
    )

167 

168 

class PathType(IntEnum):
    """
    Kind of file system entry a path refers to.

    `UNKNOWN` is what `get` returns for a non-`Path` object that reports
    being neither a directory nor a file.
    """

    # Defined in `stat` in standard library
    DIR = 0o040000
    CHAR_DEVICE = 0o020000
    BLOCK_DEVICE = 0o060000
    FILE = 0o100000
    FIFO = 0o010000
    SYM_LINK = 0o120000
    SOCKET = 0o140000
    UNKNOWN = 0

    @staticmethod
    def get(path: PathLike) -> PathType:
        """
        Determine the `PathType` of `path`.

        `pathlib.Path` objects are classified from the file type bits of
        `path.stat().st_mode`. Any other object is classified only through
        its `is_dir()` and `is_file()` methods.
        """
        if not isinstance(path, Path):
            if path.is_dir():
                return PathType.DIR
            return PathType.FILE if path.is_file() else PathType.UNKNOWN

        file_type_bits = S_IFMT(path.stat().st_mode)
        return PathType(file_type_bits)

190 

191 

192class LocalPath(Path): 

193 def rmtree(self) -> None: 

194 shutil.rmtree(self) 

195 

196 def path_type(self) -> PathType: 

197 return PathType.get(self)