Coverage for repo_ctx / providers / local.py: 83%
133 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-25 17:42 +0100
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-25 17:42 +0100
1"""Local Git repository provider."""
3import asyncio
4import hashlib
5import json
6import re
7from pathlib import Path
8from typing import Optional, List
10from git import Repo, InvalidGitRepositoryError
12from .base import GitProvider, ProviderProject, ProviderFile
class LocalGitProvider(GitProvider):
    """Provider for local Git repositories.

    Indexes repositories from the local filesystem without network access.
    Provides faster indexing compared to remote providers.

    The public ``async`` methods delegate blocking GitPython work to a worker
    thread via ``asyncio.to_thread``; the ``*_sync`` methods hold the actual
    logic.
    """

    # README candidates, probed in this order, for the description fallback.
    _README_NAMES = ("README.md", "README.rst", "README.txt", "README")

    # Configuration file names, probed in this order, by ``read_config``.
    _CONFIG_NAMES = (
        ".repo-ctx.json",
        "git_context.json",
        ".git_context.json",
        "repo_context.json",
    )

    # Number of leading bytes inspected when sniffing for binary content.
    _BINARY_SAMPLE_SIZE = 8192

    def __init__(self, repo_path: str):
        """Initialize local Git provider.

        Args:
            repo_path: Path to Git repository (absolute, relative, or ~)

        Raises:
            FileNotFoundError: If path doesn't exist
            ValueError: If path is not a Git repository
        """
        self.repo_path = Path(repo_path).expanduser().resolve()

        if not self.repo_path.exists():
            raise FileNotFoundError(f"Repository path does not exist: {repo_path}")

        try:
            self.repo = Repo(str(self.repo_path))
        except InvalidGitRepositoryError as err:
            # Chain the original error so the GitPython detail is not lost.
            raise ValueError(f"Path is not a Git repository: {repo_path}") from err

    async def get_project(self, path: str) -> ProviderProject:
        """Get project metadata from local repository.

        Args:
            path: Repository path (same as __init__). It is not read here;
                the path given to the constructor is always used.

        Returns:
            ProviderProject with extracted metadata
        """
        # Run in thread pool to avoid blocking
        return await asyncio.to_thread(self._get_project_sync)

    def _get_project_sync(self) -> ProviderProject:
        """Synchronous implementation of get_project."""
        return ProviderProject(
            id=self._generate_project_id(),
            # Project name is the repository directory name.
            name=self.repo_path.name,
            path=str(self.repo_path),
            description=self._get_description(),
            # The currently checked-out branch is reported as the default.
            default_branch=self._get_current_branch(),
            web_url=self._get_remote_url(),
        )

    def _get_description(self) -> Optional[str]:
        """Extract repository description from git config or README.

        Returns:
            The ``gitweb.description`` config value if set; otherwise the
            first non-blank line of the first readable README candidate with
            leading ``#`` heading markers removed; otherwise None.
        """
        # Try git config first
        try:
            config = self.repo.config_reader()
            if config.has_option("gitweb", "description"):
                return config.get("gitweb", "description")
        except Exception:
            # Deliberate best effort: an unreadable or malformed git config
            # must not prevent indexing, so fall through to the README.
            pass

        # Fall back to the first meaningful line of a README
        for readme_name in self._README_NAMES:
            readme_path = self.repo_path / readme_name
            if not readme_path.is_file():
                continue
            try:
                # Decode explicitly: the platform default encoding may not be
                # UTF-8, and a stray byte should not discard the whole file.
                with open(readme_path, encoding="utf-8", errors="replace") as f:
                    for line in f:
                        # Remove markdown heading markers
                        text = line.strip().lstrip("#").strip()
                        if text:
                            return text
            except OSError:
                # Unreadable file: try the next candidate name.
                continue

        return None

    def _get_remote_url(self) -> Optional[str]:
        """Get the URL of the ``origin`` remote, or None if unavailable."""
        try:
            if 'origin' in self.repo.remotes:
                return self.repo.remotes.origin.url
        except Exception:
            # Best effort: a broken remote configuration means "no URL".
            pass
        return None

    def _get_current_branch(self) -> str:
        """Get current branch name, falling back to ``"main"``."""
        try:
            return self.repo.active_branch.name
        except (TypeError, AttributeError):
            # Detached HEAD or empty repo
            # Try to get default branch from HEAD ref
            try:
                head_ref = self.repo.head.ref
                return head_ref.name if hasattr(head_ref, 'name') else "main"
            except Exception:
                return "main"

    def _generate_project_id(self) -> str:
        """Generate stable project identifier.

        Returns:
            The last two path segments of the ``origin`` URL (for example
            ``owner/repo``, with any ``.git`` suffix removed) when a remote
            is configured and matches; otherwise ``local-`` followed by the
            first 12 hex digits of the SHA-256 of the resolved repo path.
        """
        # Use remote URL if available
        remote_url = self._get_remote_url()
        if remote_url:
            # Parse GitHub/GitLab style URLs, e.g.
            #   https://github.com/owner/repo.git  -> owner/repo
            #   git@github.com:owner/repo.git      -> owner/repo
            # The pattern is kept as-is so previously generated IDs stay stable.
            match = re.search(r'([^/:]+/[^/]+?)(\.git)?$', remote_url)
            if match:
                return match.group(1)

        # Fallback: use path hash
        path_hash = hashlib.sha256(str(self.repo_path).encode()).hexdigest()[:12]
        return f"local-{path_hash}"

    async def get_default_branch(self, project: ProviderProject) -> str:
        """Get default branch name as recorded on ``project``."""
        return project.default_branch

    async def get_file_tree(
        self,
        project: ProviderProject,
        ref: str,
        recursive: bool = True
    ) -> List[str]:
        """Get file tree at specific ref.

        Args:
            project: Project to get files from
            ref: Branch, tag, or commit SHA
            recursive: If True, include subdirectories

        Returns:
            List of file paths relative to repo root (binary files excluded)

        Raises:
            ValueError: If the tree for ``ref`` cannot be read
        """
        return await asyncio.to_thread(
            self._get_file_tree_sync, ref, recursive
        )

    def _get_file_tree_sync(self, ref: str, recursive: bool) -> List[str]:
        """Synchronous implementation of get_file_tree."""
        try:
            # Get tree object for ref
            tree = self.repo.commit(ref).tree

            if recursive:
                # Recursive traversal; keep blobs (files), skip binary ones.
                return [
                    item.path
                    for item in tree.traverse()
                    if item.type == 'blob' and not self._is_binary_file(item)
                ]

            # Only root level files
            return [
                item.name
                for item in tree
                if item.type == 'blob' and not self._is_binary_file(item)
            ]
        except Exception as e:
            raise ValueError(f"Failed to get file tree for ref '{ref}': {e}") from e

    def _is_binary_file(self, blob) -> bool:
        """Check if file is binary.

        Args:
            blob: Git blob object

        Returns:
            True if file appears to be binary; False if it does not or if
            the blob cannot be read
        """
        try:
            # Read only the sample instead of the whole blob: this runs once
            # per file in the tree, so loading full contents is wasteful.
            sample = blob.data_stream.read(self._BINARY_SAMPLE_SIZE)

            # Null bytes in the first 8KB are common in binary files
            return b'\x00' in sample
        except Exception:
            # Unreadable blobs are treated as text so they are not dropped.
            return False

    async def read_file(
        self,
        project: ProviderProject,
        path: str,
        ref: str
    ) -> ProviderFile:
        """Read file content at specific ref.

        Args:
            project: Project containing the file
            path: File path relative to repo root
            ref: Branch, tag, or commit SHA

        Returns:
            ProviderFile with content

        Raises:
            FileNotFoundError: If the file cannot be read at ``ref``
        """
        return await asyncio.to_thread(
            self._read_file_sync, path, ref
        )

    def _read_file_sync(self, path: str, ref: str) -> ProviderFile:
        """Synchronous implementation of read_file.

        Every failure is reported as FileNotFoundError because callers such
        as ``read_config`` rely on that exception type to probe for files.
        """
        try:
            commit = self.repo.commit(ref)
            blob = commit.tree / path

            # A path can resolve to a directory (tree) or submodule; those
            # are not files and must not be returned as file content.
            if blob.type != 'blob':
                raise FileNotFoundError(
                    f"File '{path}' not found at ref '{ref}': not a regular file"
                )

            # Undecodable bytes are replaced rather than raising.
            content = blob.data_stream.read().decode('utf-8', errors='replace')

            return ProviderFile(
                path=path,
                content=content,
                size=blob.size
            )
        except KeyError as err:
            raise FileNotFoundError(
                f"File '{path}' not found at ref '{ref}'"
            ) from err
        except FileNotFoundError:
            # Already the right type and message; do not re-wrap.
            raise
        except Exception as e:
            raise FileNotFoundError(
                f"File '{path}' not found at ref '{ref}': {e}"
            ) from e

    async def read_config(
        self,
        project: ProviderProject,
        ref: str
    ) -> Optional[dict]:
        """Read the repository's context configuration file.

        Probes, in order: ``.repo-ctx.json``, ``git_context.json``,
        ``.git_context.json`` and ``repo_context.json``. The first file that
        exists is parsed and returned.

        Args:
            project: Project to read config from
            ref: Branch or tag to read config from

        Returns:
            Parsed configuration, or None if no candidate file is found

        Raises:
            json.JSONDecodeError: If the first file found is not valid JSON
        """
        for config_name in self._CONFIG_NAMES:
            try:
                config_file = await self.read_file(project, config_name, ref)
            except FileNotFoundError:
                continue
            return json.loads(config_file.content)

        return None

    async def get_tags(
        self,
        project: ProviderProject,
        limit: int = 5
    ) -> List[str]:
        """Get repository tags sorted by the date of the tagged commit.

        Args:
            project: Project to get tags from
            limit: Maximum number of tags to return

        Returns:
            List of tag names, most recent first
        """
        return await asyncio.to_thread(self._get_tags_sync, limit)

    def _get_tags_sync(self, limit: int) -> List[str]:
        """Synchronous implementation of get_tags."""
        # A negative limit would otherwise slice off the oldest tags instead
        # of returning nothing.
        if limit is not None and limit <= 0:
            return []

        try:
            # Get all tags with their commit dates
            tags_with_dates = []
            for tag in self.repo.tags:
                try:
                    tags_with_dates.append(
                        (tag.name, tag.commit.committed_datetime)
                    )
                except Exception:
                    # Skip tags that can't be resolved
                    continue

            # Sort by date (newest first)
            tags_with_dates.sort(key=lambda x: x[1], reverse=True)

            # Return tag names only
            return [name for name, _ in tags_with_dates[:limit]]
        except Exception:
            # Best effort: a repository whose tags cannot be listed is
            # reported as having none.
            return []

    async def list_projects_in_group(
        self,
        group_path: str,
        include_subgroups: bool = True
    ) -> List[ProviderProject]:
        """List projects in a group.

        Not supported for local provider.

        Raises:
            NotImplementedError: Local provider doesn't support groups
        """
        raise NotImplementedError(
            "Local provider does not support listing projects in groups. "
            "Use a directory scanner instead."
        )