Coverage for session_buddy / utils / file_utils.py: 74.44%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""File and directory utilities for session management. 

3 

4This module provides file system operations following crackerjack 

5architecture patterns with single responsibility principle. 

6""" 

7 

8from __future__ import annotations 

9 

10import os 

11import shutil 

12import subprocess # nosec B404 

13from contextlib import suppress 

14from datetime import UTC, datetime, timedelta 

15from pathlib import Path 

16from typing import Any 

17 

18 

19def _cleanup_session_logs() -> str: 

20 """Clean up old session log files, keeping recent ones.""" 

21 claude_dir = Path.home() / ".claude" / "logs" 

22 if not claude_dir.exists(): 22 ↛ 23line 22 didn't jump to line 23 because the condition on line 22 was never true

23 return "📝 No log directory found" 

24 

25 log_files = list(claude_dir.glob("session_management_*.log")) 

26 if not log_files: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true

27 return "📝 No session log files found" 

28 

29 # Keep logs from last 10 days 

30 cutoff_date = datetime.now(UTC) - timedelta(days=10) 

31 cleaned_count = 0 

32 

33 for log_file in log_files: 

34 try: 

35 # Extract date from filename: session_management_YYYYMMDD.log 

36 date_str = log_file.stem.split("_")[-1] # Gets the YYYYMMDD part 

37 if len(date_str) == 8 and date_str.isdigit(): 37 ↛ 33line 37 didn't jump to line 33 because the condition on line 37 was always true

38 log_date = datetime.strptime(date_str, "%Y%m%d").replace(tzinfo=UTC) 

39 if log_date < cutoff_date: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true

40 log_file.unlink() 

41 cleaned_count += 1 

42 except (ValueError, OSError): 

43 # Skip files with invalid names or permission issues 

44 continue 

45 

46 remaining_count = len(list(claude_dir.glob("session_management_*.log"))) 

47 return f"📝 Cleaned {cleaned_count} old log files, {remaining_count} retained" 

48 

49 

50def _get_cleanup_patterns() -> list[str]: 

51 """Get list of file patterns to clean up.""" 

52 return [ 

53 "**/.DS_Store", 

54 "**/__pycache__", 

55 "**/*.pyc", 

56 "**/*.pyo", 

57 "**/node_modules/.cache", 

58 "**/.pytest_cache", 

59 "**/coverage.xml", 

60 "**/.coverage", 

61 "**/htmlcov", 

62 "**/tmp_*", 

63 "**/.tmp", 

64 "**/temp_*", 

65 ] 

66 

67 

68def _calculate_item_size(item: Path) -> int: 

69 """Calculate size of file or directory in MB.""" 

70 size_mb = 0 

71 with suppress(OSError, PermissionError): 

72 if item.is_file(): 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 size_mb = int(item.stat().st_size / (1024 * 1024)) 

74 elif item.is_dir(): 74 ↛ 80line 74 didn't jump to line 80

75 # Calculate directory size 

76 with suppress(PermissionError, OSError): 

77 for subitem in item.rglob("*"): 

78 if subitem.is_file(): 

79 size_mb += int(subitem.stat().st_size / (1024 * 1024)) 

80 return size_mb 

81 

82 

83def _cleanup_item(item: Path) -> tuple[str, int]: 

84 """Clean up a single item and return its display name and size.""" 

85 with suppress(PermissionError, OSError): 

86 if item.is_file(): 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true

87 size_mb = _calculate_item_size(item) 

88 item.unlink() 

89 return f"🗑️ {item.name}", size_mb 

90 if item.is_dir(): 90 ↛ 94line 90 didn't jump to line 94

91 size_mb = _calculate_item_size(item) 

92 shutil.rmtree(item, ignore_errors=True) 

93 return f"📁 {item.name}/", size_mb 

94 return "", 0 

95 

96 

97def _cleanup_temp_files(current_dir: Path) -> str: 

98 """Clean up temporary files and caches.""" 

99 cleanup_patterns = _get_cleanup_patterns() 

100 cleaned_items: list[str] = [] 

101 

102 total_size_mb = _process_cleanup_patterns( 

103 current_dir, 

104 cleanup_patterns, 

105 cleaned_items, 

106 ) 

107 

108 if not cleaned_items: 

109 return "🧹 No temporary files found to clean" 

110 

111 return _format_cleanup_results(cleaned_items, total_size_mb) 

112 

113 

114def _process_cleanup_patterns( 

115 current_dir: Path, 

116 patterns: list[str], 

117 cleaned_items: list[str], 

118) -> float: 

119 """Process each cleanup pattern and collect results.""" 

120 total_size_mb = 0.0 

121 for pattern in patterns: 

122 total_size_mb += _process_single_pattern(current_dir, pattern, cleaned_items) 

123 return total_size_mb 

124 

125 

126def _process_single_pattern( 

127 current_dir: Path, 

128 pattern: str, 

129 cleaned_items: list[str], 

130) -> float: 

131 """Process a single cleanup pattern.""" 

132 pattern_size_mb = 0.0 

133 with suppress(PermissionError, OSError): 

134 for item in current_dir.glob(pattern): 

135 if item.exists(): 135 ↛ 134line 135 didn't jump to line 134 because the condition on line 135 was always true

136 display_name, size_mb = _cleanup_item(item) 

137 if display_name: 137 ↛ 134line 137 didn't jump to line 134 because the condition on line 137 was always true

138 cleaned_items.append(display_name) 

139 pattern_size_mb += size_mb 

140 return pattern_size_mb 

141 

142 

143def _format_cleanup_results(cleaned_items: list[str], total_size_mb: float) -> str: 

144 """Format cleanup results for display.""" 

145 display_items = cleaned_items[:10] 

146 if len(cleaned_items) > 10: 146 ↛ 149line 146 didn't jump to line 149 because the condition on line 146 was always true

147 display_items.append(f"... and {len(cleaned_items) - 10} more items") 

148 

149 return ( 

150 f"🧹 Cleaned {len(cleaned_items)} items ({total_size_mb:.1f} MB): " 

151 + ", ".join(display_items) 

152 ) 

153 

154 

155def _cleanup_uv_cache() -> str: 

156 """Clean up UV package manager cache to free space.""" 

157 try: 

158 # Run uv cache clean command with timeout to prevent hanging 

159 result = subprocess.run( 

160 ["uv", "cache", "clean"], 

161 capture_output=True, 

162 text=True, 

163 check=False, 

164 timeout=30, # 30 second timeout to prevent test hangs 

165 ) 

166 

167 if result.returncode == 0: 

168 # Parse output for size information 

169 output = result.stdout.strip() 

170 if "freed" in output.lower() or "removed" in output.lower(): 

171 return f"📦 UV cache cleaned: {output}" 

172 return "📦 UV cache cleaned successfully" 

173 return f"⚠️ UV cache clean failed: {result.stderr.strip()}" 

174 

175 except FileNotFoundError: 

176 return "⚠️ UV not found, skipping cache cleanup" 

177 except subprocess.TimeoutExpired: 

178 return "⚠️ UV cache cleanup timed out after 30 seconds" 

179 except Exception as e: 

180 return f"⚠️ UV cache cleanup error: {e}" 

181 

182 

183def validate_claude_directory() -> dict[str, Any]: 

184 """Validate and set up Claude directory structure.""" 

185 claude_dir = Path.home() / ".claude" 

186 results = _initialize_validation_results(claude_dir) 

187 

188 try: 

189 _setup_main_directory(claude_dir, results) 

190 _setup_subdirectories(claude_dir, results) 

191 _calculate_directory_size(claude_dir, results) 

192 _validate_permissions(claude_dir, results) 

193 except Exception as e: 

194 results["success"] = False 

195 results["error"] = str(e) 

196 

197 return results 

198 

199 

200def _initialize_validation_results(claude_dir: Path) -> dict[str, Any]: 

201 """Initialize validation results dictionary.""" 

202 return { 

203 "success": True, 

204 "directory": str(claude_dir), 

205 "created": False, 

206 "structure": {}, 

207 "permissions": "ok", 

208 "size_mb": 0.0, 

209 } 

210 

211 

212def _setup_main_directory(claude_dir: Path, results: dict[str, Any]) -> None: 

213 """Set up main Claude directory.""" 

214 if not claude_dir.exists(): 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true

215 claude_dir.mkdir(parents=True, exist_ok=True) 

216 results["created"] = True 

217 

218 

219def _setup_subdirectories(claude_dir: Path, results: dict[str, Any]) -> None: 

220 """Set up Claude subdirectories.""" 

221 subdirs = ["logs", "data", "temp", "backups"] 

222 for subdir in subdirs: 

223 subdir_path = claude_dir / subdir 

224 subdir_path.mkdir(exist_ok=True) 

225 results["structure"][subdir] = { 

226 "exists": True, 

227 "writable": os.access(subdir_path, os.W_OK), 

228 "files": len(list(subdir_path.iterdir())) if subdir_path.exists() else 0, 

229 } 

230 

231 

232def _calculate_directory_size(claude_dir: Path, results: dict[str, Any]) -> None: 

233 """Calculate total directory size.""" 

234 total_size = 0 

235 for item in claude_dir.rglob("*"): 

236 if item.is_file(): 

237 try: 

238 total_size += item.stat().st_size 

239 except (OSError, PermissionError): 

240 continue 

241 

242 results["size_mb"] = total_size / (1024 * 1024) 

243 

244 

245def _validate_permissions(claude_dir: Path, results: dict[str, Any]) -> None: 

246 """Validate directory permissions.""" 

247 if not os.access(claude_dir, os.W_OK): 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 results["permissions"] = "readonly" 

249 results["success"] = False