Coverage for repo_ctx / providers / local.py: 83%

133 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-25 17:42 +0100

1"""Local Git repository provider.""" 

2 

3import asyncio 

4import hashlib 

5import json 

6import re 

7from pathlib import Path 

8from typing import Optional, List 

9 

10from git import Repo, InvalidGitRepositoryError 

11 

12from .base import GitProvider, ProviderProject, ProviderFile 

13 

14 

class LocalGitProvider(GitProvider):
    """Provider for local Git repositories.

    Indexes repositories from the local filesystem without network access.
    Provides faster indexing compared to remote providers.

    The public coroutine methods delegate to synchronous ``_*_sync`` helpers
    through ``asyncio.to_thread`` so that Git access does not block the
    event loop.
    """

    # README file names probed, in order, when looking for a description.
    _README_NAMES = ("README.md", "README.rst", "README.txt", "README")

    # Configuration file names probed, in order, by read_config().
    _CONFIG_NAMES = (
        ".repo-ctx.json",
        "git_context.json",
        ".git_context.json",
        "repo_context.json",
    )

    # Number of leading bytes inspected when sniffing for binary content.
    _BINARY_SNIFF_BYTES = 8192

    def __init__(self, repo_path: str):
        """Initialize local Git provider.

        Args:
            repo_path: Path to Git repository (absolute, relative, or ~)

        Raises:
            FileNotFoundError: If path doesn't exist
            ValueError: If path is not a Git repository
        """
        self.repo_path = Path(repo_path).expanduser().resolve()

        if not self.repo_path.exists():
            raise FileNotFoundError(f"Repository path does not exist: {repo_path}")

        try:
            self.repo = Repo(str(self.repo_path))
        except InvalidGitRepositoryError as exc:
            # Chain the original error so the underlying cause stays visible.
            raise ValueError(f"Path is not a Git repository: {repo_path}") from exc

    async def get_project(self, path: str) -> ProviderProject:
        """Get project metadata from local repository.

        Args:
            path: Repository path (same as __init__). It is not read here;
                the repository opened in __init__ is always used.

        Returns:
            ProviderProject with extracted metadata
        """
        # Run in thread pool to avoid blocking
        return await asyncio.to_thread(self._get_project_sync)

    def _get_project_sync(self) -> ProviderProject:
        """Synchronous implementation of get_project.

        Returns:
            ProviderProject whose name is the repository directory name and
            whose default branch is the currently checked-out branch.
        """
        return ProviderProject(
            id=self._generate_project_id(),
            name=self.repo_path.name,
            path=str(self.repo_path),
            description=self._get_description(),
            default_branch=self._get_current_branch(),
            web_url=self._get_remote_url(),
        )

    def _get_description(self) -> Optional[str]:
        """Extract repository description from git config or README.

        The ``gitweb.description`` config option wins when present.
        Otherwise the first non-blank line of the first usable README is
        returned with leading ``#`` heading markers removed.

        Returns:
            The description text, or None if nothing usable was found.
        """
        # Try git config first
        try:
            config = self.repo.config_reader()
            if config.has_option("gitweb", "description"):
                return config.get("gitweb", "description")
        except Exception:
            # Best effort: an unreadable config must not prevent the
            # README fallback below.
            pass

        # Fall back to the first meaningful line of a README
        for readme_name in self._README_NAMES:
            readme_path = self.repo_path / readme_name
            if not readme_path.is_file():
                continue
            try:
                # utf-8-sig drops a leading BOM that would otherwise shield
                # the "#" markers from lstrip(); undecodable bytes are
                # replaced instead of aborting the read.
                with open(readme_path, encoding="utf-8-sig", errors="replace") as f:
                    for line in f:
                        text = line.strip().lstrip("#").strip()
                        if text:
                            return text
            except OSError:
                # Unreadable file: try the next candidate name.
                continue

        return None

    def _get_remote_url(self) -> Optional[str]:
        """Get remote URL if configured.

        Returns:
            URL of the remote named ``origin``, or None when there is no
            such remote or it cannot be read.
        """
        try:
            if 'origin' in self.repo.remotes:
                return self.repo.remotes.origin.url
        except Exception:
            # Best effort: a broken remote configuration means "no URL".
            pass
        return None

    def _get_current_branch(self) -> str:
        """Get current branch name.

        Returns:
            Name of the active branch. If it cannot be determined, the name
            of the HEAD reference is tried next, and ``"main"`` is the last
            resort.
        """
        try:
            return self.repo.active_branch.name
        except (TypeError, AttributeError):
            # Detached HEAD or empty repo
            # NOTE(review): HEAD.ref may fail the same way active_branch
            # does on a detached HEAD, in which case this always yields
            # "main" - confirm against GitPython.
            try:
                head_ref = self.repo.head.ref
                return getattr(head_ref, "name", "main")
            except Exception:
                return "main"

    def _generate_project_id(self) -> str:
        """Generate stable project identifier.

        When an origin URL exists, its last two path components are used
        without a ``.git`` suffix, e.g.
        ``https://github.com/owner/repo.git`` -> ``owner/repo``. The host
        name is not part of the identifier. Without a usable URL the
        identifier is ``local-`` plus the first 12 hex digits of the
        SHA-256 of the resolved repository path.

        Returns:
            The project identifier string.
        """
        remote_url = self._get_remote_url()
        if remote_url:
            # Ignore trailing slashes so ".../owner/repo/" is parsed too.
            match = re.search(r'([^/:]+/[^/]+?)(\.git)?$', remote_url.rstrip("/"))
            if match:
                return match.group(1)

        # Fallback: use path hash
        path_hash = hashlib.sha256(str(self.repo_path).encode()).hexdigest()[:12]
        return f"local-{path_hash}"

    async def get_default_branch(self, project: ProviderProject) -> str:
        """Get default branch name.

        Args:
            project: Project whose ``default_branch`` is returned.

        Returns:
            The value of ``project.default_branch``.
        """
        return project.default_branch

    async def get_file_tree(
        self,
        project: ProviderProject,
        ref: str,
        recursive: bool = True
    ) -> List[str]:
        """Get file tree at specific ref.

        Args:
            project: Project to get files from. It is not read here; the
                repository opened in __init__ is always used.
            ref: Branch, tag, or commit SHA
            recursive: If True, include subdirectories

        Returns:
            List of file paths relative to repo root

        Raises:
            ValueError: If the tree for ``ref`` cannot be read
        """
        return await asyncio.to_thread(
            self._get_file_tree_sync, ref, recursive
        )

    def _get_file_tree_sync(self, ref: str, recursive: bool) -> List[str]:
        """Synchronous implementation of get_file_tree.

        Only entries of type ``blob`` that are not detected as binary are
        returned.

        Args:
            ref: Branch, tag, or commit SHA
            recursive: If True, walk the whole tree; otherwise only the
                root level

        Returns:
            List of file paths (recursive) or file names (root level only)

        Raises:
            ValueError: If anything fails while resolving or walking the
                tree; the original error is chained as the cause
        """
        try:
            tree = self.repo.commit(ref).tree

            if recursive:
                return [
                    item.path
                    for item in tree.traverse()
                    if item.type == 'blob' and not self._is_binary_file(item)
                ]

            # Only root level files
            return [
                item.name
                for item in tree
                if item.type == 'blob' and not self._is_binary_file(item)
            ]
        except Exception as e:
            raise ValueError(f"Failed to get file tree for ref '{ref}': {e}") from e

    def _is_binary_file(self, blob) -> bool:
        """Check if file is binary.

        Args:
            blob: Git blob object

        Returns:
            True if a null byte occurs in the first 8KB of the content.
            False otherwise, including when the content cannot be read.
        """
        try:
            data = blob.data_stream.read()

            # Null bytes in the leading sample are common in binary files
            return b'\x00' in data[:self._BINARY_SNIFF_BYTES]
        except Exception:
            # Unreadable blobs are treated as text rather than dropped.
            return False

    async def read_file(
        self,
        project: ProviderProject,
        path: str,
        ref: str
    ) -> ProviderFile:
        """Read file content at specific ref.

        Args:
            project: Project containing the file. It is not read here; the
                repository opened in __init__ is always used.
            path: File path relative to repo root
            ref: Branch, tag, or commit SHA

        Returns:
            ProviderFile with content

        Raises:
            FileNotFoundError: If the file cannot be read at ``ref``
        """
        return await asyncio.to_thread(
            self._read_file_sync, path, ref
        )

    def _read_file_sync(self, path: str, ref: str) -> ProviderFile:
        """Synchronous implementation of read_file.

        Content is decoded as UTF-8 with undecodable bytes replaced.

        Args:
            path: File path relative to repo root
            ref: Branch, tag, or commit SHA

        Returns:
            ProviderFile with the decoded content and the blob size

        Raises:
            FileNotFoundError: For any failure, including a ref that cannot
                be resolved; the original error is chained as the cause
        """
        try:
            commit = self.repo.commit(ref)
            blob = commit.tree / path

            content = blob.data_stream.read().decode('utf-8', errors='replace')

            return ProviderFile(
                path=path,
                content=content,
                size=blob.size
            )
        except KeyError as e:
            raise FileNotFoundError(f"File '{path}' not found at ref '{ref}'") from e
        except Exception as e:
            raise FileNotFoundError(f"File '{path}' not found at ref '{ref}': {e}") from e

    async def read_config(
        self,
        project: ProviderProject,
        ref: str
    ) -> Optional[dict]:
        """Read .repo-ctx.json or git_context.json configuration.

        The candidate file names are tried in order and the first one that
        can be read is parsed. A JSON parsing error in that file is not
        caught here and propagates to the caller.

        Args:
            project: Project to read config from
            ref: Branch or tag to read config from

        Returns:
            Configuration dict or None if not found
        """
        for config_name in self._CONFIG_NAMES:
            try:
                file = await self.read_file(project, config_name, ref)
                return json.loads(file.content)
            except FileNotFoundError:
                continue

        return None

    async def get_tags(
        self,
        project: ProviderProject,
        limit: int = 5
    ) -> List[str]:
        """Get repository tags sorted by creation date.

        Args:
            project: Project to get tags from. It is not read here; the
                repository opened in __init__ is always used.
            limit: Maximum number of tags to return

        Returns:
            List of tag names, most recent first
        """
        return await asyncio.to_thread(self._get_tags_sync, limit)

    def _get_tags_sync(self, limit: int) -> List[str]:
        """Synchronous implementation of get_tags.

        Tags are ordered by the committed datetime of the commit they
        point to, newest first.

        Args:
            limit: Maximum number of tags to return

        Returns:
            List of tag names, or an empty list if the tags cannot be read
        """
        try:
            tags_with_dates = []
            for tag in self.repo.tags:
                try:
                    tags_with_dates.append((tag.name, tag.commit.committed_datetime))
                except Exception:
                    # Skip tags that can't be resolved
                    continue

            # Sort by date (newest first)
            tags_with_dates.sort(key=lambda x: x[1], reverse=True)

            return [name for name, _ in tags_with_dates[:limit]]
        except Exception:
            # Best effort: any other failure is reported as "no tags".
            return []

    async def list_projects_in_group(
        self,
        group_path: str,
        include_subgroups: bool = True
    ) -> List[ProviderProject]:
        """List projects in a group.

        Not supported for local provider.

        Raises:
            NotImplementedError: Local provider doesn't support groups
        """
        raise NotImplementedError(
            "Local provider does not support listing projects in groups. "
            "Use a directory scanner instead."
        )