Coverage for src / gitq / queue.py: 90%

346 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 11:23 -0400

1from dataclasses import dataclass, field 

2from typing import List, Iterator 

3from io import StringIO 

4from pathlib import Path 

5from contextlib import contextmanager 

6from functools import cached_property 

7 

8import yaml 

9 

10from .output import Output 

11from .git import Git, Commit, GitFailed, UserError, contextGit 

12from .continuations import ( 

13 EditBranch, 

14 PickCherries, 

15 Step, 

16 Then, 

17 CheckoutBranch, 

18 progn, 

19 Continuation, 

20 Suspend, 

21 Resume, 

22) 

23from .yaml import YAMLObject, BaseLoader 

24 

25 

class Loader(BaseLoader):
    "YAML loader used for queue files; this module registers its path resolvers on it."

28 

29 

class Dumper(yaml.Dumper):
    "YAML dumper used for queue files; this module registers its path resolvers on it."

32 

33 

@dataclass
class Baseline(YAMLObject):
    "One baseline of a queue: a commit, optionally with the ref and remote it came from."

    yaml_loader = Loader
    yaml_dumper = Dumper

    # Commit the baseline currently points at.
    sha: str
    # Ref the sha is resolved from; when None, refresh_baseline() leaves the baseline as is.
    ref: str | None = field(default=None)
    # Remote that ``ref`` is fetched from by refresh_baseline(), if any.
    remote: str | None = field(default=None)

43 

44 

# Tag nodes by their position in the document so the queue file needs no
# explicit YAML tags: the document root is a QueueFile, and every entry of the
# top-level "baselines" sequence is a Baseline.
yaml.add_path_resolver("!QueueFile", [], Loader=Loader, Dumper=Dumper)
yaml.add_path_resolver("!Baseline", ["baselines", None], Loader=Loader, Dumper=Dumper)

47 

48 

@dataclass
class QueueFile(YAMLObject):
    "Contents of the queue file: optional title and description plus the list of baselines."

    yaml_loader = Loader
    yaml_dumper = Dumper

    # Appended to the subject of commits created by this tool (see message()).
    title: str | None = field(default=None)
    description: str | None = field(default=None)
    # Baselines the queue is built on; a fresh list per instance.
    baselines: List[Baseline] = field(default_factory=list)

58 

59 

# These can appear in continuation files (e.g. as RebaseOne.onto), so register there too.
Continuation.register(Baseline)
Continuation.register(QueueFile)

63 

64 

65def message(m: str, title: str | None): 

66 trailers = "Tool: gitq" 

67 if title: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true

68 return f"{m}: {title}\n\n{trailers}" 

69 else: 

70 return f"{m}\n\n{trailers}" 

71 

72 

# FIXME this was used improperly as a test for baseline commits. Check
# existing callers to figure out what they really mean by it and make it
# more specific.
def from_this_tool(c: Commit) -> bool:
    """
    Return True if the message of `c` ends with a "Tool: gitq" trailer line.

    Trailing whitespace is ignored. The trailer must be preceded by a newline,
    so a message consisting of the trailer alone does not match.
    """
    return c.message.rstrip().endswith("\nTool: gitq")

80 

81 

def is_merged_baseline(c: Commit) -> bool:
    """
    Return True if `c` is a "baseline" or "merged baselines" commit made by this tool.

    That is: the stripped message ends with the "Tool: gitq" trailer line and
    starts with one of those two subjects.
    """
    m = c.message.strip()
    return m.endswith("\nTool: gitq") and m.startswith(("baseline", "merged baselines"))

87 

88 

class NotAQueue(UserError):
    "Raised by Queue() when the working directory has no queue file."

91 

92 

class Queue:
    "A branch managed as a patch queue, described by the queue file in its working directory."

    git: Git
    qf: QueueFile

    queuefile_name = ".git-queue"

    @property
    def queuefile_path(self) -> Path:
        "Path of the queue file under `self.git.directory`."
        return self.git.directory / self.queuefile_name

    def __init__(self, git: Git, *, qf: QueueFile | None = None):
        """
        Wrap `git` as a queue.

        When `qf` is given it is used as is; otherwise the queue file is read
        from disk. Raises NotAQueue if that file does not exist.
        """
        self.git = git
        # Compare against None so an explicitly passed QueueFile is never
        # ignored because of its truthiness.
        if qf is not None:
            self.qf = qf
            return
        if not self.queuefile_path.exists():
            raise NotAQueue("This branch is not a queue.")
        with open(self.queuefile_path, "r") as f:
            # NOTE(review): the queue file is repository content; confirm that
            # the project's BaseLoader only constructs safe types.
            self.qf = yaml.load(f, Loader=Loader)

    def save_queuefile(
        self, *, amend: bool = False, commit_message: str = "", stage: bool | None = None
    ):
        """
        Write the queue file, then record it in git in exactly one of three ways.

        * amend=True: stage it and amend HEAD, reusing HEAD's message.
        * commit_message=...: stage it and create a new commit with that message.
        * stage=True / stage=False: stage it, or only write it, without committing.
        """
        assert amend + (stage is not None) + bool(commit_message) == 1
        with open(self.queuefile_path, "w") as f:
            yaml.dump(self.qf, f, Dumper=Dumper)
        if stage or amend or commit_message:
            self.git("add", self.queuefile_path.relative_to(self.git.directory))
        if amend:
            self.git("commit", "--amend", "--allow-empty", "-C", "HEAD")
        elif commit_message:
            self.git("commit", "--allow-empty", "-m", commit_message)

    def init(self):
        "Create the 'initialized queue' commit and amend the queue file into it."
        self.git("commit", "--allow-empty", "-m", message("initialized queue", self.qf.title))
        self.save_queuefile(amend=True)

    def init_new_branch(self, branch: str):
        "Detach HEAD, commit the queue file, merge the baselines and create `branch` there."
        self.git.detach()
        self.save_queuefile(commit_message="new queue branch")
        progn(MergeBaselines(self.qf), NewBranch(branch))

    @staticmethod
    def find_user_merges(commits: List[Commit]) -> Iterator[Commit]:
        """
        Find user merges as any merge commit below the "merged baseline" commits
        Takes list of commits as output by:

            git log --topo-order queue ^baseline...
        """
        # Walk children-before-parents: every "merged baseline" commit, and
        # everything reachable from one through parent links, is collected.
        baseline_shas: set[str] = set()
        for c in commits:
            if is_merged_baseline(c):
                baseline_shas.add(c.sha)
            if c.sha in baseline_shas:
                baseline_shas.update(c.parents)
        for c in commits:
            if c.is_merge and c.sha in baseline_shas and not is_merged_baseline(c):
                yield c

    def find_patches(self, ref: str, baselines: List[Baseline], new_base: str) -> Iterator[Commit]:
        """
        Yield the commits of `ref` (excluding `baselines`) that still need to be applied.

        Skipped are: user merges, commits made by this tool, clean merges,
        commits `git.find_duplicates` does not report as new relative to
        `new_base`, and commits that touch only the queue file. Raises
        UserError for a conflicted merge.
        """
        if self.git.on_orphan_branch():
            return
        commits = self.git.commits(*(f"^{b.sha}" for b in baselines), ref, reverse=True)
        if not commits:
            # Nothing between the baselines and ref, so nothing to pick.
            return
        user_merges = {c.sha for c in self.find_user_merges(list(reversed(commits)))}
        base = self.find_git_cherry_limit(commits)
        # We use the + side instead of the - side of the `git cherry`
        # output to detect duplicates, because if we used the - side, then
        # it would only filter out distinct (different sha) commits that
        # are duplicated, but it does not filter out commits that are
        # literally present (same sha) in both branch and new_base.
        new = {r.sha for r in self.git.find_duplicates(base, ref, new_base) if r.is_new}
        for commit in commits:
            if commit.sha in user_merges:
                continue
            if from_this_tool(commit):
                continue
            if commit.is_merge:
                if self.git.is_conflicted(commit):
                    raise UserError(f"rebasing merges is not implemented yet: {commit.summary}")
                continue
            if commit.sha not in new:
                continue
            changed = self.git("show", "--name-only", "--pretty=", commit.sha, quiet=True).strip()
            if changed == self.queuefile_name:
                continue
            yield commit

    def baselines_for_swap(self) -> Iterator[str]:
        "yield the shas that git-swap should not proceed past"
        for b in self.qf.baselines:
            yield b.sha
        commits = self.git.commits(*(f"^{b.sha}" for b in self.qf.baselines), "HEAD", reverse=True)
        for commit in commits:
            if from_this_tool(commit):
                yield commit.sha

    def find_git_cherry_limit(self, commits: List[Commit]) -> str | None:
        "Find the 'baseline' or 'merged baselines' commit in the queue"
        if not commits:
            # No commits at all: there is no limit to offer.
            return None
        merges = [c.sha for c in commits if is_merged_baseline(c)]
        if not merges:
            # See below, just pick some limit. Can't return them all
            return commits[0].parents[0] if commits[0].parents else None
        bases = self.git("merge-base", "--independent", *merges, quiet=True).strip().splitlines()
        # This is only used to provide a limit to `git cherry`. If there
        # are multiple baselines, then `git cherry` may produce additional
        # output for baseline commits that should have been excluded. But
        # that does not actually matter much, because `git cherry` is only
        # used to filter out commits from the list produced by `git log`,
        # and `git log` can take multiple limits.
        #
        # Alternatively, we could create a throwaway merge here and use
        # that as the limit.
        return bases[0]

    def rebase(self, onto: List[Baseline] | None = None) -> None:
        "Run a recursive Rebase, optionally onto the given baselines."
        with Output.heading("rebasing"):
            Rebase(onto).run()

    # TODO if baseline is a remote branch, but there is a local branch
    # tracking it, detect that.

    @classmethod
    def needs_rebase(cls, ref: str | None) -> bool:
        "Return True if the local queue branch at ref has baselines that have been updated."
        if ref is None or not ref.startswith("refs/heads/"):
            return False
        git = contextGit.get()
        try:
            content = git("show", f"{ref}:{cls.queuefile_name}", quiet=True)
        except GitFailed:
            # git could not show the queue file at ref; treat it as not a queue.
            return False
        qf = yaml.load(StringIO(content), Loader=Loader)
        return any(refresh_baseline(b, git=git).sha != b.sha for b in qf.baselines)

233 

234 

@dataclass
class RebaseBranch(Step):
    "Temporarily check out the specified branch and rebase it."

    # Branch to check out for the duration of the rebase.
    ref: str

    def run(self):
        with Output.heading(f"rebasing branch {self.ref}"), CheckoutBranch(self.ref):
            Rebase().run()

244 

245 

@dataclass
class RebaseOne(Step):
    "Rebase a single branch (not recursive)."

    # Baselines to rebase onto; None means the queue's current baselines.
    onto: List[Baseline] | None

    def run(self):
        q = Queue(self.git)

        # Remember the baselines the existing patches were built on. They
        # bound the patch search after the queue file has been updated.
        old_baselines = q.qf.baselines
        if self.onto is None:
            self.onto = q.qf.baselines

        # FIXME these should not be re-refreshed every time this resumes
        q.qf.baselines = [refresh_baseline(b, git=self.git) for b in self.onto]
        with EditBranch(message="git-queue rebase") as branch:
            progn(MergeBaselines(q.qf), FindAndPickCherries(branch, old_baselines))

263 

264 

@dataclass
class FindAndPickCherries(Step):
    "Find patches in branch, and then apply them to HEAD"

    # Branch whose patches are collected.
    branch: str
    # Baselines that branch was built on; commits reachable from them are excluded.
    old_baselines: List[Baseline]

    def run(self) -> None:
        q = Queue(self.git)
        patches = list(q.find_patches(self.branch, self.old_baselines, "HEAD"))
        with PickCherries(cherries=[b.sha for b in patches], edit=True):
            pass

277 

278 

@dataclass
class NewBranch(Step):
    "Create a branch called `name` at HEAD and check it out."

    name: str

    def run(self) -> None:
        self.git("branch", self.name, "HEAD")
        self.git.checkout(self.name)

286 

287 

@dataclass
class Rebase(Step):
    """
    Recursively rebase a queue branch
      * First, rebase any baselines which are also queue branches
      * Then rebase the current branch
    """

    # Baselines to rebase the current branch onto; None keeps its own baselines.
    onto: None | List[Baseline] = field(default=None)

    def run(self) -> None:
        steps: List[Step] = list()

        q = Queue(self.git)
        for b in q.qf.baselines:
            if q.needs_rebase(b.ref):
                # needs_rebase() returns False for a None ref; this narrows the type.
                assert b.ref
                steps.append(RebaseBranch(b.ref))

        # The current branch goes last, after its baselines are up to date.
        steps.append(RebaseOne(onto=self.onto))

        with Then(steps=steps):
            pass

311 

312 

@dataclass
class MergeContinue(Continuation):
    """
    When resuming, check if the user ran `git commit`, and do it for them
    if they haven't.
    """

    @contextmanager
    def impl(self) -> Iterator:
        try:
            yield
        except (Exception, Resume):
            # The body did not complete normally: drop the pending merge and
            # let the exception propagate.
            self.git("merge", "--abort")
            raise
        if self.git.merge_in_progress:
            if self.git.has_unmerged_files():
                Output.print("The index still has unmerged files.")
                raise Suspend(status="resolve conflicts and continue")
            # Conflicts are resolved but the merge is not committed yet.
            self.git("commit", "--no-edit")

332 

333 

@dataclass
class MergeBaselines(Step, Continuation):
    """
    Check out the first baseline of `qf`, merge the remaining baselines into
    it and commit the queue file.

    Earlier user merges found in HEAD are reused to resolve conflicts. When
    that is not enough, the step suspends so the user can resolve the conflict.
    """

    qf: QueueFile
    # shas of user merges that may be reused to resolve conflicts
    user_merges: List[str] = field(default_factory=list)
    # True until check_user_merges() has scanned HEAD once
    find_user_merges: bool = True
    # True until the first baseline has been checked out
    needs_checkout: bool = True
    # commit to return to once the user has resolved a conflict
    suspended_at: str | None = None

    def run(self) -> None:
        with self:
            pass

    @contextmanager
    def impl(self) -> Iterator:
        self.check_user_merges()
        yield
        if self.suspended_at:
            # If continued after asking the user to make a merge, pick it
            # up and add it to the list of user merges, and go back to the
            # commit we were at before.
            self.user_merges.append(self.git.rev_parse("HEAD"))
            self.git.checkout(self.suspended_at)
            self.suspended_at = None
        with Output.heading("merge baselines"):
            self.merge_baselines()

    @cached_property
    def q(self):
        "Queue built from the NEW queue file (`self.qf`), not the one on disk."
        return Queue(self.git, qf=self.qf)

    def still_needed(self) -> Iterator[Baseline]:
        "yield the baselines that have not yet been merged"
        for baseline in self.qf.baselines:
            if not self.git.is_ancestor(baseline.sha):
                yield baseline

    @cached_property
    def m(self) -> str:
        "Commit message used for every merge made by this step."
        return message("merged baselines", self.qf.title)

    def check_user_merges(self):
        """
        Find user merges in HEAD. Ensure that all user merges are clean,
        that is they do not introduce any new commits outside of the
        baselines.
        """
        if not self.find_user_merges:
            return
        self.find_user_merges = False

        # Find user merges in HEAD
        q = Queue(self.git)  # This is the OLD queue, before baselines have been updated
        commits = self.git.commits("HEAD", *(f"^{b.sha}" for b in q.qf.baselines))
        self.user_merges.extend(c.sha for c in Queue.find_user_merges(commits))

        # Check that they're clean, using the NEW queue
        known = set(self.user_merges)
        clean = list()
        for u in self.user_merges:
            ancestors = self.git.commits(
                u,
                *(f"^{b.sha}" for b in self.qf.baselines),
            )
            if {a.sha for a in ancestors} <= known:
                clean.append(u)
            else:
                Output.print(f"user merge {u} is not clean, can't use it")
        self.user_merges = clean

    def _try_merge(self, shas: List[str]) -> bool:
        """
        Attempt a single merge of `shas` into HEAD.

        Returns True when the merge ended up committed together with the
        queue file. Returns False when it failed; a merge left in progress
        is aborted first.
        """
        q = self.q
        try:
            self.git("merge", "--no-ff", *shas, "-m", self.m)
        except GitFailed:
            if (self.git.gitdir / "MERGE_HEAD").exists():
                if self.git.unmerged_files() == {q.queuefile_name}:
                    # Only the queue file conflicts: overwrite it and commit.
                    q.save_queuefile(commit_message=self.m)
                    return True
                self.git("merge", "--abort")
            return False
        q.save_queuefile(amend=True)
        return True

    def merge_baselines(self) -> None:
        "Merge all baselines that are still needed, escalating from octopus to one at a time."
        q = self.q

        if len(self.user_merges) > 1:
            self.user_merges = (
                self.git("merge-base", "--independent", *self.user_merges, quiet=True)
                .strip()
                .splitlines()
            )

        # First, check out one of the baselines so there's something to
        # merge into
        if self.needs_checkout:
            self.git.checkout(self.qf.baselines[0].sha, comment="baseline")
            self.needs_checkout = False

        needed = list(self.still_needed())
        if not needed:
            q.save_queuefile(commit_message=message("baseline", q.qf.title))
            return

        shas = [b.sha for b in needed]

        # try octopus merge first
        if self._try_merge(shas):
            return

        # try octopus with user merges; without any, this would only repeat
        # the attempt that just failed
        if self.user_merges and self._try_merge(shas + list(self.user_merges)):
            return

        # merge one at a time
        while needed:
            baseline = needed.pop(0)
            try:
                self.git("merge", "--no-ff", baseline.sha, "-m", self.m)
            except GitFailed:
                if not self.git.merge_in_progress:
                    raise
                if self.git.unmerged_files() == {q.queuefile_name}:
                    q.save_queuefile(commit_message=self.m)
                    continue
                self.git("merge", "--abort")
            else:
                q.save_queuefile(amend=True)
                continue
            # Oh, no! A conflict!
            self.resolve_conflicts(baseline)
            # Resolving may have merged more than this baseline; recompute.
            needed = list(self.still_needed())

    def would_conflict(self, a: str, b: str) -> bool:
        "Return True if merging a and b conflicts in any file other than the queue file."
        _, conflicts = self.git.merge_tree(a, b)
        return not (conflicts <= {Queue.queuefile_name})

    def resolve_conflicts(self, baseline: Baseline):
        """
        Resolve the conflict between HEAD and `baseline`.

        First try each user merge. If none resolves it, check out a commit
        from HEAD that conflicts, start the merge there and raise Suspend so
        the user can finish it.
        """
        head = self.git.rev_parse("HEAD")
        to_merge = baseline.sha

        # Try to find a user merge that can resolve the conflict
        for u in self.user_merges:
            contains_baseline = self.git.is_ancestor(baseline.sha, of=u)
            if self.git.rev_parse("HEAD") != head:
                self.git.checkout(head)

            # try to merge u
            try:
                self.git("merge", "--no-ff", u, "-m", self.m)
            except GitFailed:
                if not self.git.merge_in_progress:
                    raise
                if not (self.git.unmerged_files() <= {Queue.queuefile_name}):
                    self.git("merge", "--abort")
                    if contains_baseline:
                        # It looks like the user has already resolved one conflict with
                        # baseline, but there is still a conflict. Instead of asking
                        # the user to merge baseline, ask them to merge their previous
                        # commit incorporating baseline, so we keep making progress.
                        to_merge = u
                    continue
                self.q.save_queuefile(commit_message=self.m)
            else:
                self.q.save_queuefile(amend=True)

            # u contains baseline, and u is merged. Conflict is resolved.
            if contains_baseline:
                return

            # See if baseline will merge now
            try:
                self.git("merge", "--no-ff", baseline.sha, "-m", self.m)
            except GitFailed:
                if not self.git.merge_in_progress:
                    raise
                self.git("merge", "--abort")
                continue
            else:
                self.q.save_queuefile(amend=True)
                return

        if self.git.rev_parse("HEAD") != head:
            self.git.checkout(head)

        # to_merge conflicts with HEAD. Find an appropriate (not a "merged
        # baselines") commit in HEAD which it conflicts with, and ask the
        # user to resolve the conflict.
        for commit in self.git.commits("HEAD", *(f"^{b.sha}^@" for b in self.qf.baselines)):
            if is_merged_baseline(commit):
                continue
            if self.would_conflict(to_merge, commit.sha):
                break
        else:
            raise Exception(f"could not find a commit in HEAD that conflicts with {to_merge}")

        self.suspended_at = head
        self.git.checkout(commit.sha, comment="baseline")
        try:
            self.git("merge", "-m", "resolved conflicts", to_merge)
            raise Exception("merge succeeded, but expected failure")
        except GitFailed:
            if not self.git.merge_in_progress:
                raise
            if Queue.queuefile_name in self.git.unmerged_files():
                # The queue file is rewritten later; keep it out of the user's way.
                self.git("rm", "-f", Queue.queuefile_name)

        # suspend to allow the user to resolve the conflict
        with MergeContinue():
            raise Suspend(status="resolve conflicts and continue")

547 

548 

def refresh_baseline(baseline: Baseline, *, git: Git) -> Baseline:
    """
    Return `baseline` re-resolved to the commit its ref currently points at.

    * No ref: the baseline is returned unchanged (same object).
    * Ref with a remote: the ref is fetched first, then resolved.
    * Ref without a remote: the ref is resolved locally.
    """
    if baseline.ref is None:
        return baseline

    if not baseline.remote:
        return Baseline(git.commit(baseline.ref).sha, baseline.ref, None)

    # presumably find_remote() maps baseline.remote to a configured remote
    # name, or a falsy value when there is none -- verify against Git.
    if baseline.ref.startswith("refs/heads/") and (remote := git.find_remote(baseline.remote)):
        git.fetch(remote)
        branch = baseline.ref.removeprefix("refs/heads/")
        fetched = f"refs/remotes/{remote}/{branch}"
    else:
        # Fetch the single ref directly and read it back through FETCH_HEAD.
        git.cmd(["git", "fetch", baseline.remote, baseline.ref])
        fetched = "FETCH_HEAD"
    return Baseline(git.commit(fetched).sha, baseline.ref, baseline.remote)