Coverage for src / gitq / git.py: 88%

250 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:32 -0400

1import os 

2import subprocess 

3import re 

4from typing import List, Iterator, NamedTuple, Set, Iterable, Tuple 

5from pathlib import Path 

6from contextvars import ContextVar 

7 

8from .output import Output 

9 

10FNULL = open(os.devnull, "w") 

11 

12contextGit: ContextVar[Git] = ContextVar("git") 

13 

14 

15class GitFailed(Exception): 

16 

17 def __init__(self, message: str, *, rc: int): 

18 super().__init__(message) 

19 self.rc = rc 

20 

21 

22class MergeFound(Exception): 

23 pass 

24 

25 

26class UserError(Exception): 

27 pass 

28 

29 

30class AuthorDate(NamedTuple): 

31 name: str 

32 email: str 

33 date: str 

34 

35 

36def split_author(line: str) -> AuthorDate: 

37 m = re.match(r"\s*([^\<\>]+) <([^\<\>]+)> ([\d\-\+\s]+?)\s*$", line) 

38 assert m 

39 return AuthorDate(m.group(1), m.group(2), m.group(3)) 

40 

41 

42class DupRecord(NamedTuple): 

43 "a record output by `git cherry`" 

44 

45 is_new: bool 

46 sha: str 

47 

48 @property 

49 def is_duplicate(self): 

50 return not self.is_new 

51 

52 

53def coalesce(lines: Iterable[str]) -> Iterator[str]: 

54 cur = None 

55 for line in lines: 

56 if cur is None: 

57 cur = line 

58 continue 

59 if line.startswith(" "): 

60 cur += line 

61 continue 

62 yield cur 

63 cur = line 

64 if cur is not None: 64 ↛ exitline 64 didn't return from function 'coalesce' because the condition on line 64 was always true

65 yield cur 

66 

67 

68class Commit(object): 

69 

70 parents: List[str] 

71 

72 def __init__(self, *, log: str): 

73 self.parents = list() 

74 (headers, message) = log.split("\n\n", 1) 

75 for header in coalesce(headers.split("\n")): 

76 (key, value) = header.strip().split(" ", 1) 

77 if key == "commit": 

78 self.sha = value 

79 if key == "parent": 

80 self.parents.append(value) 

81 if key == "tree": 

82 self.tree = value 

83 if key == "author": 

84 self.author = value 

85 if key == "committer": 

86 self.committer = value 

87 if key == "gpgsig": 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 continue 

89 assert message.endswith("\n") 

90 lines = message[:-1].split("\n") 

91 assert all(x.startswith(" ") for x in lines) 

92 lines = [x[4:] for x in lines] 

93 self.message = "\n".join(lines) + "\n" 

94 

95 @property 

96 def summary(self) -> str: 

97 return f"{self.sha[:10]} {self.title}" 

98 

99 @property 

100 def is_merge(self) -> bool: 

101 return len(self.parents) > 1 

102 

103 @property 

104 def title(self) -> str: 

105 return self.message.split("\n", 1)[0] 

106 

107 def __str__(self) -> str: 

108 return self.sha[:10] 

109 

110 

111class Git: 

112 

113 gitdir: Path # .git 

114 directory: Path # toplevel, and where commands are run from 

115 fetched: Set[str] 

116 

117 def __init__(self, directory=None): 

118 self.fetched = set() 

119 self.directory = Path(directory or ".") 

120 try: 

121 top = self("rev-parse", "--show-toplevel", quiet=True).strip() 

122 except GitFailed as e: 

123 raise UserError("Error: not a git repository") from e 

124 if not top: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true

125 raise UserError("Error: cannot find working directory. bare repository?") 

126 self.directory = Path(top) 

127 self.gitdir = self.directory / self("rev-parse", "--git-dir", quiet=True).strip() 

128 

129 def cmd( 

130 self, cmd, *, quiet: bool = False, interactive: bool = False, comment: str = "", **kw 

131 ) -> str: 

132 if not quiet: 

133 Output.log_cmd(cmd, comment=comment) 

134 if interactive: 

135 kw["stderr"] = subprocess.PIPE 

136 else: 

137 kw["stdin"] = FNULL 

138 kw["stdout"] = subprocess.PIPE 

139 kw["stderr"] = subprocess.STDOUT 

140 proc = subprocess.Popen(cmd, cwd=self.directory, encoding="utf8", **kw) 

141 (out, err) = proc.communicate() 

142 if proc.wait() != 0: 

143 err = ((out or "") + (err or "")).strip() 

144 err, _ = re.subn(r"^", "\t", err, flags=re.MULTILINE) 

145 raise GitFailed(f"git failed:\n{err}", rc=proc.wait()) 

146 return out 

147 

148 def __call__(self, *args, quiet: bool = False, comment: str = "") -> str: 

149 return self.cmd(["git", *args], quiet=quiet, comment=comment) 

150 

151 def cmd_test(self, args, **kw) -> bool: 

152 proc = subprocess.Popen( 

153 args, cwd=self.directory, stdin=FNULL, stdout=FNULL, stderr=FNULL, **kw 

154 ) 

155 if proc.wait() not in [0, 1]: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 raise GitFailed("git failed", rc=proc.wait()) 

157 return not proc.wait() 

158 

159 def rev_parse(self, commit: str) -> str: 

160 return self.cmd(["git", "rev-parse", commit], quiet=True).strip() 

161 

162 def symbolic_full_name(self, commit: str) -> str | None: 

163 name = self.cmd(["git", "rev-parse", "--symbolic-full-name", commit], quiet=True).strip() 

164 return name or None 

165 

166 def detach(self) -> None: 

167 self.cmd(["git", "checkout", self.rev_parse("HEAD")], stderr=FNULL, comment="detach") 

168 

169 def upstream(self, branch: str) -> str | None: 

170 "return the sha of the branch's upstream, or None" 

171 try: 

172 return self.rev_parse(branch + "@{upstream}") 

173 except GitFailed as e: 

174 if "no upstream configured for branch" in str(e): 174 ↛ 176line 174 didn't jump to line 176 because the condition on line 174 was always true

175 return None 

176 raise 

177 

178 def head(self) -> str: 

179 try: 

180 return self.cmd(["git", "symbolic-ref", "HEAD"], quiet=True, stderr=FNULL).strip() 

181 except GitFailed: 

182 return self.rev_parse("HEAD") 

183 

184 def force_checkout(self, branch: str, comment: str = "") -> None: 

185 self.cmd(["git", "checkout", "-f", branch], stderr=FNULL, comment=comment) 

186 

187 def commit(self, ref: str) -> Commit: 

188 log = self.cmd("git log -n1 --no-notes --pretty=raw".split() + [ref, "--"], quiet=True) 

189 return Commit(log=log) 

190 

191 def commits(self, *refs: str, reverse: bool = False) -> List[Commit]: 

192 cmd = ["git", "log", "--topo-order", "-z", "--no-notes", "--pretty=raw"] 

193 if reverse: 

194 cmd.append("--reverse") 

195 cmd.extend(refs) 

196 cmd.append("--") 

197 logs = self.cmd(cmd, quiet=True) 

198 return [Commit(log=log) for log in logs.split("\x00") if log] 

199 

200 def checkout(self, branch: str, *, comment: str = "") -> None: 

201 self.cmd(["git", "checkout", branch], stderr=FNULL, comment=comment) 

202 

203 @property 

204 def continuation(self) -> Path: 

205 return self.gitdir / "continuation.yaml" 

206 

207 def is_clean(self) -> bool: 

208 if self("diff-files", "--name-only", quiet=True): 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 return False 

210 if self.on_orphan_branch(): 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 return True 

212 return not self("diff-index", "--cached", "--name-only", "HEAD", quiet=True) 

213 

214 @property 

215 def cherry_pick_in_progress(self) -> bool: 

216 return (self.gitdir / "CHERRY_PICK_HEAD").exists() 

217 

218 @property 

219 def merge_in_progress(self) -> bool: 

220 return (self.gitdir / "MERGE_HEAD").exists() 

221 

222 def unique_parent(self, commit: Commit) -> Commit: 

223 if len(commit.parents) != 1: 

224 raise MergeFound(f"{commit} is a merge") 

225 return self.commit(commit.parents[0]) 

226 

227 def unique_parent_or_root(self, commit: Commit) -> Commit | None: 

228 if len(commit.parents) == 0: 

229 return None 

230 else: 

231 return self.unique_parent(commit) 

232 

233 def branches(self) -> Iterator[str]: 

234 for line in self.cmd(["git", "for-each-ref", "refs/heads"], quiet=True).splitlines(): 

235 m = re.search(r"\trefs/heads/(.*?)\s*$", line) 

236 assert m 

237 yield m.group(1) 

238 

239 def ref_exists(self, ref: str) -> bool: 

240 return self.cmd_test(["git", "rev-parse", "--verify", "--quiet", ref, "--"]) 

241 

242 def branch_exists(self, branch: str) -> bool: 

243 return self.ref_exists(f"refs/heads/{branch}") 

244 

245 def ls_files(self, *args) -> Iterator[str]: 

246 for line in self.cmd(["git", "ls-files", *args], quiet=True).splitlines(): 

247 yield line.rstrip() 

248 

249 def on_orphan_branch(self) -> bool: 

250 """ 

251 Returns true if HEAD points to a branch name which does not yet 

252 exist. This generally only happens after `git init`, or `git 

253 checkout --orphan`. 

254 """ 

255 try: 

256 head = self.cmd(["git", "symbolic-ref", "HEAD"], quiet=True).strip() 

257 except GitFailed: 

258 return False 

259 return not self.ref_exists(head) 

260 

261 def delete_index_and_files(self): 

262 Output.log_cmd("git ls-files -z | xargs -0 rm") 

263 for file in self.ls_files(): 

264 path = self.directory / file 

265 if os.path.exists(path): 

266 os.unlink(path) 

267 self.cmd(["git", "read-tree", "--empty"]) 

268 

269 def cherry_pick_abort(self) -> None: 

270 if self.cherry_pick_in_progress: 270 ↛ exitline 270 didn't return from function 'cherry_pick_abort' because the condition on line 270 was always true

271 if self.on_orphan_branch(): 

272 Output.log_cmd(["rm", self.gitdir / "CHERRY_PICK_HEAD"]) 

273 (self.gitdir / "CHERRY_PICK_HEAD").unlink() 

274 self.delete_index_and_files() 

275 else: 

276 self.cmd(["git", "cherry-pick", "--abort"]) 

277 

278 def has_unmerged_files(self) -> bool: 

279 return bool(self.cmd(["git", "ls-files", "--unmerged"], quiet=True).strip()) 

280 

281 def unmerged_files(self) -> Set[str]: 

282 lines = self.cmd(["git", "ls-files", "--unmerged"], quiet=True).splitlines() 

283 return {line.strip().split("\t", 1)[1] for line in lines} 

284 

285 def find_remote(self, url: str) -> str | None: 

286 for line in self.cmd(["git", "remote", "-v"], quiet=True).splitlines(): 

287 name, urlpart = line.rstrip().split("\t") 

288 if urlpart == f"{url} (fetch)": 

289 return name 

290 return None 

291 

292 def fetch(self, remote: str): 

293 if remote in self.fetched: 

294 return 

295 self.cmd(["git", "fetch", remote]) 

296 self.fetched.add(remote) 

297 

298 def is_conflicted(self, commit: Commit) -> bool: 

299 if len(commit.parents) < 2: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true

300 return False 

301 if len(commit.parents) > 2: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 raise NotImplementedError # FIXME merge-tree can only take two arguments! 

303 try: 

304 tree = self("merge-tree", "--name-only", *commit.parents, quiet=True).strip() 

305 except GitFailed as e: 

306 if e.rc == 1: 306 ↛ 308line 306 didn't jump to line 308 because the condition on line 306 was always true

307 return True 

308 raise 

309 return not self.cmd_test(["git", "diff", "--quiet", commit.sha, tree, "--"]) 

310 

311 def merge_tree(self, a: str, b: str) -> Tuple[str, Set[str]]: 

312 cmd = ["git", "merge-tree", "--name-only", "--no-messages", "-z", a, b] 

313 p = subprocess.run(cmd, cwd=self.directory, capture_output=True, text=True) 

314 if p.returncode not in [0, 1]: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 raise GitFailed(f"git failed:\n{p.stderr}", rc=p.returncode) 

316 tree, *conflicts = p.stdout.rstrip("\x00").split("\x00") 

317 assert (p.returncode == 0) == (not conflicts) 

318 return tree, set(conflicts) 

319 

320 def checkout_tree(self, tree: str) -> None: 

321 "replace index and working files with the specified tree" 

322 deleted = self("diff", "--diff-filter=A", "--name-only", tree).splitlines() 

323 self("read-tree", tree) 

324 self("checkout", "--", ".") 

325 for rel in deleted: 

326 (self.directory / rel).unlink() 

327 

328 def find_duplicates(self, base: str | None, branch: str, onto: str) -> Iterator[DupRecord]: 

329 "Determine which commits in base..branch are cherry-picked in onto" 

330 if base is None: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true

331 output = self("cherry", onto, branch, quiet=True) 

332 else: 

333 output = self("cherry", onto, branch, base, quiet=True) 

334 for line in output.strip().splitlines(): 

335 sign, sha = line.split(" ", 1) 

336 assert sign in "-+" 

337 yield DupRecord(sign == "+", sha) 

338 

339 def is_ancestor(self, ancestor: str, of: str = "HEAD") -> bool: 

340 "Return True if ancestor is reachable from descendant" 

341 return self.cmd_test(["git", "merge-base", "--is-ancestor", ancestor, of]) 

342 

343 def abbrev(self, ref: str) -> str: 

344 "Return abbreviated sha for ref" 

345 return self.cmd(["git", "rev-parse", "--short", ref], quiet=True).strip()