Coverage for src / gitq / git.py: 88%
250 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:32 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 15:32 -0400
1import os
2import subprocess
3import re
4from typing import List, Iterator, NamedTuple, Set, Iterable, Tuple
5from pathlib import Path
6from contextvars import ContextVar
8from .output import Output
10FNULL = open(os.devnull, "w")
12contextGit: ContextVar[Git] = ContextVar("git")
15class GitFailed(Exception):
17 def __init__(self, message: str, *, rc: int):
18 super().__init__(message)
19 self.rc = rc
22class MergeFound(Exception):
23 pass
26class UserError(Exception):
27 pass
30class AuthorDate(NamedTuple):
31 name: str
32 email: str
33 date: str
36def split_author(line: str) -> AuthorDate:
37 m = re.match(r"\s*([^\<\>]+) <([^\<\>]+)> ([\d\-\+\s]+?)\s*$", line)
38 assert m
39 return AuthorDate(m.group(1), m.group(2), m.group(3))
42class DupRecord(NamedTuple):
43 "a record output by `git cherry`"
45 is_new: bool
46 sha: str
48 @property
49 def is_duplicate(self):
50 return not self.is_new
53def coalesce(lines: Iterable[str]) -> Iterator[str]:
54 cur = None
55 for line in lines:
56 if cur is None:
57 cur = line
58 continue
59 if line.startswith(" "):
60 cur += line
61 continue
62 yield cur
63 cur = line
64 if cur is not None: 64 ↛ exitline 64 didn't return from function 'coalesce' because the condition on line 64 was always true
65 yield cur
68class Commit(object):
70 parents: List[str]
72 def __init__(self, *, log: str):
73 self.parents = list()
74 (headers, message) = log.split("\n\n", 1)
75 for header in coalesce(headers.split("\n")):
76 (key, value) = header.strip().split(" ", 1)
77 if key == "commit":
78 self.sha = value
79 if key == "parent":
80 self.parents.append(value)
81 if key == "tree":
82 self.tree = value
83 if key == "author":
84 self.author = value
85 if key == "committer":
86 self.committer = value
87 if key == "gpgsig": 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true
88 continue
89 assert message.endswith("\n")
90 lines = message[:-1].split("\n")
91 assert all(x.startswith(" ") for x in lines)
92 lines = [x[4:] for x in lines]
93 self.message = "\n".join(lines) + "\n"
95 @property
96 def summary(self) -> str:
97 return f"{self.sha[:10]} {self.title}"
99 @property
100 def is_merge(self) -> bool:
101 return len(self.parents) > 1
103 @property
104 def title(self) -> str:
105 return self.message.split("\n", 1)[0]
107 def __str__(self) -> str:
108 return self.sha[:10]
111class Git:
113 gitdir: Path # .git
114 directory: Path # toplevel, and where commands are run from
115 fetched: Set[str]
117 def __init__(self, directory=None):
118 self.fetched = set()
119 self.directory = Path(directory or ".")
120 try:
121 top = self("rev-parse", "--show-toplevel", quiet=True).strip()
122 except GitFailed as e:
123 raise UserError("Error: not a git repository") from e
124 if not top: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true
125 raise UserError("Error: cannot find working directory. bare repository?")
126 self.directory = Path(top)
127 self.gitdir = self.directory / self("rev-parse", "--git-dir", quiet=True).strip()
129 def cmd(
130 self, cmd, *, quiet: bool = False, interactive: bool = False, comment: str = "", **kw
131 ) -> str:
132 if not quiet:
133 Output.log_cmd(cmd, comment=comment)
134 if interactive:
135 kw["stderr"] = subprocess.PIPE
136 else:
137 kw["stdin"] = FNULL
138 kw["stdout"] = subprocess.PIPE
139 kw["stderr"] = subprocess.STDOUT
140 proc = subprocess.Popen(cmd, cwd=self.directory, encoding="utf8", **kw)
141 (out, err) = proc.communicate()
142 if proc.wait() != 0:
143 err = ((out or "") + (err or "")).strip()
144 err, _ = re.subn(r"^", "\t", err, flags=re.MULTILINE)
145 raise GitFailed(f"git failed:\n{err}", rc=proc.wait())
146 return out
148 def __call__(self, *args, quiet: bool = False, comment: str = "") -> str:
149 return self.cmd(["git", *args], quiet=quiet, comment=comment)
151 def cmd_test(self, args, **kw) -> bool:
152 proc = subprocess.Popen(
153 args, cwd=self.directory, stdin=FNULL, stdout=FNULL, stderr=FNULL, **kw
154 )
155 if proc.wait() not in [0, 1]: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true
156 raise GitFailed("git failed", rc=proc.wait())
157 return not proc.wait()
159 def rev_parse(self, commit: str) -> str:
160 return self.cmd(["git", "rev-parse", commit], quiet=True).strip()
162 def symbolic_full_name(self, commit: str) -> str | None:
163 name = self.cmd(["git", "rev-parse", "--symbolic-full-name", commit], quiet=True).strip()
164 return name or None
166 def detach(self) -> None:
167 self.cmd(["git", "checkout", self.rev_parse("HEAD")], stderr=FNULL, comment="detach")
169 def upstream(self, branch: str) -> str | None:
170 "return the sha of the branch's upstream, or None"
171 try:
172 return self.rev_parse(branch + "@{upstream}")
173 except GitFailed as e:
174 if "no upstream configured for branch" in str(e): 174 ↛ 176line 174 didn't jump to line 176 because the condition on line 174 was always true
175 return None
176 raise
178 def head(self) -> str:
179 try:
180 return self.cmd(["git", "symbolic-ref", "HEAD"], quiet=True, stderr=FNULL).strip()
181 except GitFailed:
182 return self.rev_parse("HEAD")
184 def force_checkout(self, branch: str, comment: str = "") -> None:
185 self.cmd(["git", "checkout", "-f", branch], stderr=FNULL, comment=comment)
187 def commit(self, ref: str) -> Commit:
188 log = self.cmd("git log -n1 --no-notes --pretty=raw".split() + [ref, "--"], quiet=True)
189 return Commit(log=log)
191 def commits(self, *refs: str, reverse: bool = False) -> List[Commit]:
192 cmd = ["git", "log", "--topo-order", "-z", "--no-notes", "--pretty=raw"]
193 if reverse:
194 cmd.append("--reverse")
195 cmd.extend(refs)
196 cmd.append("--")
197 logs = self.cmd(cmd, quiet=True)
198 return [Commit(log=log) for log in logs.split("\x00") if log]
200 def checkout(self, branch: str, *, comment: str = "") -> None:
201 self.cmd(["git", "checkout", branch], stderr=FNULL, comment=comment)
203 @property
204 def continuation(self) -> Path:
205 return self.gitdir / "continuation.yaml"
207 def is_clean(self) -> bool:
208 if self("diff-files", "--name-only", quiet=True): 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 return False
210 if self.on_orphan_branch(): 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true
211 return True
212 return not self("diff-index", "--cached", "--name-only", "HEAD", quiet=True)
214 @property
215 def cherry_pick_in_progress(self) -> bool:
216 return (self.gitdir / "CHERRY_PICK_HEAD").exists()
218 @property
219 def merge_in_progress(self) -> bool:
220 return (self.gitdir / "MERGE_HEAD").exists()
222 def unique_parent(self, commit: Commit) -> Commit:
223 if len(commit.parents) != 1:
224 raise MergeFound(f"{commit} is a merge")
225 return self.commit(commit.parents[0])
227 def unique_parent_or_root(self, commit: Commit) -> Commit | None:
228 if len(commit.parents) == 0:
229 return None
230 else:
231 return self.unique_parent(commit)
233 def branches(self) -> Iterator[str]:
234 for line in self.cmd(["git", "for-each-ref", "refs/heads"], quiet=True).splitlines():
235 m = re.search(r"\trefs/heads/(.*?)\s*$", line)
236 assert m
237 yield m.group(1)
239 def ref_exists(self, ref: str) -> bool:
240 return self.cmd_test(["git", "rev-parse", "--verify", "--quiet", ref, "--"])
242 def branch_exists(self, branch: str) -> bool:
243 return self.ref_exists(f"refs/heads/{branch}")
245 def ls_files(self, *args) -> Iterator[str]:
246 for line in self.cmd(["git", "ls-files", *args], quiet=True).splitlines():
247 yield line.rstrip()
249 def on_orphan_branch(self) -> bool:
250 """
251 Returns true if HEAD points to a branch name which does not yet
252 exist. This generally only happens after `git init`, or `git
253 checkout --orphan`.
254 """
255 try:
256 head = self.cmd(["git", "symbolic-ref", "HEAD"], quiet=True).strip()
257 except GitFailed:
258 return False
259 return not self.ref_exists(head)
261 def delete_index_and_files(self):
262 Output.log_cmd("git ls-files -z | xargs -0 rm")
263 for file in self.ls_files():
264 path = self.directory / file
265 if os.path.exists(path):
266 os.unlink(path)
267 self.cmd(["git", "read-tree", "--empty"])
269 def cherry_pick_abort(self) -> None:
270 if self.cherry_pick_in_progress: 270 ↛ exitline 270 didn't return from function 'cherry_pick_abort' because the condition on line 270 was always true
271 if self.on_orphan_branch():
272 Output.log_cmd(["rm", self.gitdir / "CHERRY_PICK_HEAD"])
273 (self.gitdir / "CHERRY_PICK_HEAD").unlink()
274 self.delete_index_and_files()
275 else:
276 self.cmd(["git", "cherry-pick", "--abort"])
278 def has_unmerged_files(self) -> bool:
279 return bool(self.cmd(["git", "ls-files", "--unmerged"], quiet=True).strip())
281 def unmerged_files(self) -> Set[str]:
282 lines = self.cmd(["git", "ls-files", "--unmerged"], quiet=True).splitlines()
283 return {line.strip().split("\t", 1)[1] for line in lines}
285 def find_remote(self, url: str) -> str | None:
286 for line in self.cmd(["git", "remote", "-v"], quiet=True).splitlines():
287 name, urlpart = line.rstrip().split("\t")
288 if urlpart == f"{url} (fetch)":
289 return name
290 return None
292 def fetch(self, remote: str):
293 if remote in self.fetched:
294 return
295 self.cmd(["git", "fetch", remote])
296 self.fetched.add(remote)
298 def is_conflicted(self, commit: Commit) -> bool:
299 if len(commit.parents) < 2: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true
300 return False
301 if len(commit.parents) > 2: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 raise NotImplementedError # FIXME merge-tree can only take two arguments!
303 try:
304 tree = self("merge-tree", "--name-only", *commit.parents, quiet=True).strip()
305 except GitFailed as e:
306 if e.rc == 1: 306 ↛ 308line 306 didn't jump to line 308 because the condition on line 306 was always true
307 return True
308 raise
309 return not self.cmd_test(["git", "diff", "--quiet", commit.sha, tree, "--"])
311 def merge_tree(self, a: str, b: str) -> Tuple[str, Set[str]]:
312 cmd = ["git", "merge-tree", "--name-only", "--no-messages", "-z", a, b]
313 p = subprocess.run(cmd, cwd=self.directory, capture_output=True, text=True)
314 if p.returncode not in [0, 1]: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true
315 raise GitFailed(f"git failed:\n{p.stderr}", rc=p.returncode)
316 tree, *conflicts = p.stdout.rstrip("\x00").split("\x00")
317 assert (p.returncode == 0) == (not conflicts)
318 return tree, set(conflicts)
320 def checkout_tree(self, tree: str) -> None:
321 "replace index and working files with the specified tree"
322 deleted = self("diff", "--diff-filter=A", "--name-only", tree).splitlines()
323 self("read-tree", tree)
324 self("checkout", "--", ".")
325 for rel in deleted:
326 (self.directory / rel).unlink()
328 def find_duplicates(self, base: str | None, branch: str, onto: str) -> Iterator[DupRecord]:
329 "Determine which commits in base..branch are cherry-picked in onto"
330 if base is None: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true
331 output = self("cherry", onto, branch, quiet=True)
332 else:
333 output = self("cherry", onto, branch, base, quiet=True)
334 for line in output.strip().splitlines():
335 sign, sha = line.split(" ", 1)
336 assert sign in "-+"
337 yield DupRecord(sign == "+", sha)
339 def is_ancestor(self, ancestor: str, of: str = "HEAD") -> bool:
340 "Return True if ancestor is reachable from descendant"
341 return self.cmd_test(["git", "merge-base", "--is-ancestor", ancestor, of])
343 def abbrev(self, ref: str) -> str:
344 "Return abbreviated sha for ref"
345 return self.cmd(["git", "rev-parse", "--short", ref], quiet=True).strip()