Coverage for src / gitq / git_swap.py: 92%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:32 -0400

1#!/usr/bin/env python3 

2 

3import os 

4import sys 

5from contextlib import contextmanager 

6from typing import List, Optional, Iterator, TypeVar, NoReturn 

7import argparse 

8from textwrap import dedent 

9from dataclasses import dataclass, field 

10 

11from .continuations import ( 

12 Abort, 

13 CheckoutBaseline, 

14 cherry_pick, 

15 Continuation, 

16 EditBranch, 

17 PickCherries, 

18 Resume, 

19 Suspend, 

20) 

21from . import continuations 

22from .output import Output 

23from .git import Git, UserError, GitFailed, MergeFound, split_author, Commit 

24from .queue import Queue, NotAQueue 

25 

# Long-form help text; passed as the `description` of the argparse parser
# built in Main.main and rendered verbatim (RawDescriptionHelpFormatter).
description = """Swaps COMMIT with COMMIT^ (i.e. moves COMMIT one step earlier in history),
while holding the final content constant.

This tool re-orders commits like git rebase -i, but is easier to use. In
particular, conflicts must often be resolved twice when using rebase, but
not with git-swap.
"""


# Generic type variable (no use of it is visible in this part of the module).
T = TypeVar("T")

36 

37 

class SwapFailed(Exception):
    """Signals that a swap could not be carried out."""

40 

41 

class Stop(Resume):
    """
    Resume signal injected into the resume stack by `git swap --stop`.

    The most recent swap operation is abandoned and everything is pushed
    back onto the branch.
    """

47 

48 

class Squash(Resume):
    """
    Resume signal injected into the resume stack by `git swap --squash`.

    The most recent swap operation is replaced with a squash, after which
    everything is pushed back onto the branch.
    """

55 

56 

class Fixup(Resume):
    """
    Resume signal injected into the resume stack by `git swap --fixup`.

    The most recent swap operation is replaced with a fixup, after which
    everything is pushed back onto the branch.
    """

63 

64 

@dataclass
class PickCherryWithReference(Continuation):
    """
    Pick a cherry, settling its content from a reference commit.

    Swapping two commits must leave the final tree unchanged, so the user
    should only ever resolve conflicts once: when the commit that now comes
    first is applied. The second commit is then created directly from the
    reference tree instead of being re-applied.
    """

    cherry: str  # commit whose message is reused for the new commit
    reference: str  # commit whose tree is checked out before committing

    @contextmanager
    def impl(self) -> Iterator[None]:
        # The wrapped body runs first; the commit below is made afterwards.
        yield
        self.git.checkout_tree(self.reference)
        # --allow-empty: the commit is recorded even if it introduces no change.
        commit_args = ("commit", "--allow-empty", "--reuse-message", self.cherry)
        self.git(*commit_args)

82 

83 

@dataclass
class OrSquash(Continuation):
    """
    Handle the case when the user calls `git swap --squash`, `--fixup`, etc.

    Wraps a swap attempt. When a `Fixup` or `Squash` signal is raised into
    the wrapped body, the commit at `head` and its parent are combined into
    a single commit instead of being swapped.
    """

    head: str  # commit that was HEAD when the swap was attempted
    stop: bool  # if true, raise Stop once a squash/fixup has been committed

    def _checkout_combined_tree(self) -> "tuple[Commit, Commit]":
        """
        Check out the baseline below the pair, then the tree of `head`.

        Returns the `(A, B)` pair where A is the commit at `head` and B is its
        unique parent. After this call the content of A (i.e. both commits
        together) is checked out, ready to be committed as one.
        """
        A = self.git.commit(self.head)
        B = self.git.unique_parent(A)
        C = self.git.unique_parent_or_root(B)  # falsy when B has no parent
        with CheckoutBaseline(C.sha if C else None):
            self.git.checkout_tree(A.sha)
        return A, B

    @contextmanager
    def impl(self) -> Iterator[None]:
        try:
            yield
        except Fixup:
            # Fixup: keep B's message unchanged and drop A's.
            _, B = self._checkout_combined_tree()
            self.git.cmd(["git", "commit", "--allow-empty", "--reuse-message", B.sha])
            if self.stop:
                raise Stop
        except Squash:
            # Squash: keep B's authorship, let the user edit both messages.
            A, B = self._checkout_combined_tree()
            author = split_author(B.author)
            env = {
                **os.environ,
                "GIT_AUTHOR_NAME": author.name,
                "GIT_AUTHOR_EMAIL": author.email,
                "GIT_AUTHOR_DATE": author.date,
            }
            message = self.git.gitdir / "COMMIT_EDITMSG"
            # Explicit UTF-8: the locale default may be unable to encode
            # non-ASCII commit messages, and git assumes UTF-8 by default.
            with open(message, "w", encoding="utf-8") as f:
                f.write(B.message)
                f.write("\n\n")
                f.write(A.message)
            cmd = ["git", "commit", "--allow-empty", "--edit", "-F", message]
            self.git.cmd(cmd, env=env, interactive=True)
            if self.stop:
                raise Stop
        except Stop:
            raise  # handled by KeepGoing
        except Resume:
            # Any other resume signal is not supported at this point.
            raise NotImplementedError

132 

133 

@dataclass
class SwapCheckpoint(Continuation):
    """Put git back on `head` when the wrapped swap does not succeed."""

    head: str  # commit to force-checkout if the swap fails

    @contextmanager
    def impl(self) -> Iterator[None]:
        try:
            yield
        except (Exception, Resume):
            # Roll back first, then let the original error/signal propagate.
            Output.print("# reset back to before attempted swap")
            self.git.force_checkout(self.head)
            raise

148 

149 

@dataclass
class KeepGoing(Continuation):
    """
    Once ...AB has become ...BA, keep pushing B further down the history.

    Each commit that B has moved beneath is remembered in `cherries` and
    re-applied on top when no further swap is possible.
    """

    baselines: List[str]  # handed to swap_or_squash for every further swap
    cherries: List[str] = field(default_factory=list)  # shas to re-apply, oldest first
    edit: bool = field(default=False)  # forwarded to swap_or_squash

    @contextmanager
    def impl(self) -> Iterator[None]:
        try:
            yield  # the first swap is performed by the wrapped body

            while True:
                above = self.git.commit("HEAD")
                moving = self.git.unique_parent(above)
                # Record the commit we step under before moving HEAD, so
                # self.cherries is already up to date if the next swap stops.
                self.cherries = [above.sha, *self.cherries]
                self.git.checkout(moving.sha)
                swap_or_squash(git=self.git, edit=self.edit, baselines=self.baselines, stop=True)

        except (SwapFailed, MergeFound, Stop):
            # Cannot go any further: put the stepped-over commits back on top.
            for sha in self.cherries:
                cherry_pick(self.git.commit(sha))

174 

175 

@dataclass
class KeepGoingUp(Continuation):
    """
    Push the checked-out commit as far up the history as it will go.

    After the wrapped body has checked out the base commit, each entry of
    `cherries` is cherry-picked on top and immediately swapped with the
    commit below it. When a swap fails or is stopped, the cherries not yet
    consumed are cherry-picked as-is.
    """

    cherries: List[str]  # shas still to be applied, oldest first
    edit: bool = field(default=False)  # forwarded to swap_or_squash

    @contextmanager
    def impl(self) -> Iterator[None]:
        try:
            yield  # check out base commit
            while self.cherries:
                # Pop the cherry before applying it so that self.cherries only
                # ever holds commits that have not been applied yet.
                cherry, *self.cherries = self.cherries
                self.git.cmd(["git", "cherry-pick", "--quiet", "--allow-empty", cherry])
                swap_or_squash(git=self.git, edit=self.edit, baselines=[], stop=True)
        except (Stop, SwapFailed):
            # Deliberate: reaching the limit is the normal way out of the loop.
            pass
        for cherry in self.cherries:
            self.git.cmd(["git", "cherry-pick", "--quiet", "--allow-empty", cherry])

194 

195 

def collect_cherries(commit: Optional[Commit], *, git: Git) -> List[str]:
    """
    List the shas between `commit` (exclusive) and HEAD (inclusive), oldest first.

    Walks from HEAD through unique parents until `commit` is reached. Returns
    an empty list when `commit` is falsy or is HEAD itself.

    Raises:
        UserError: if a merge commit is encountered on the way down.
    """
    if not commit:
        return []
    newest_first: List[str] = []
    cursor = git.commit("HEAD")
    while cursor.sha != commit.sha:
        newest_first.append(cursor.sha)
        try:
            cursor = git.unique_parent(cursor)
        except MergeFound as e:
            raise UserError(f"Error: {e}") from e
    return newest_first[::-1]

209 

210 

@contextmanager
def edit_commit(commit: Optional[Commit], *, git: Git, edit: bool = False):
    """
    Run the with-body at `commit`, then re-apply everything that was above it.

    HEAD is moved to `commit` and the commits that were above it are handed
    to PickCherries. With a falsy `commit` the body runs where HEAD already is.
    """
    if commit:
        # Collect the commits above the target before HEAD moves away.
        cherries = collect_cherries(commit, git=git)
        git.checkout(commit.sha)
        with PickCherries(cherries=cherries, edit=edit):
            yield
    else:
        yield

221 

222 

def swap(*, git: Git, edit: bool = False, baselines: List[str]) -> None:
    """
    Swap HEAD with HEAD^.

    Raises:
        SwapFailed: if a merge commit is in the way, if HEAD^ is one of
            `baselines`, or if the cherry-pick fails with GitFailed.
    """
    top = git.commit("HEAD")
    try:
        below = git.unique_parent(top)
        base = git.unique_parent_or_root(below)  # falsy when `below` has no parent
    except MergeFound as e:
        raise SwapFailed(f"Swap failed: {e}") from e
    if below.sha in baselines:
        raise SwapFailed("hit baseline")

    base_sha = base.sha if base else None
    with SwapCheckpoint(head=top.sha):
        with CheckoutBaseline(base_sha):
            # Once `top` has been picked onto the base, `below` is committed
            # from the tree of `top`, so the final content is unchanged.
            with PickCherryWithReference(cherry=below.sha, reference=top.sha):
                try:
                    cherry_pick(top, edit=edit)
                except GitFailed as e:
                    raise SwapFailed(f"Swap failed: {e}") from e
                except Suspend as e:
                    # NOTE(review): the relative indentation of the lines in
                    # this template could not be recovered from the report
                    # this code was read from -- confirm against the original.
                    e.status = dedent(
                        f"""
                        Attempting to swap:
                        {top.summary}
                        {below.summary}
                        """
                    )
                    raise

249 

250 

def swap_or_squash(*, edit: bool = False, git: Git, baselines: List[str], stop: bool) -> None:
    """Swap HEAD with HEAD^, or squash them together if the user resumes with `--squash`."""
    starting_sha = git.commit("HEAD").sha
    # OrSquash turns a Squash/Fixup resume signal into a combined commit.
    with OrSquash(head=starting_sha, stop=stop):
        swap(git=git, edit=edit, baselines=baselines)

256 

257 

class Main(continuations.Main):
    """Command-line driver for `git swap`."""

    tool = "git-swap"
    suspend_message = "Suspended! Resolve conflicts and resume with `git swap --continue`"

    def __call__(self) -> NoReturn:
        """Run the tool; print a SwapFailed message and exit with status 1."""
        try:
            super().__call__()
            # NOTE(review): reached only if the parent __call__ returns;
            # presumably it exits by itself on success -- confirm.
            sys.exit(1)
        except SwapFailed as e:
            Output.print(e)
            sys.exit(1)

    def main(self) -> None:
        """Parse the command line and dispatch to status, resume or a new swap."""
        parser = argparse.ArgumentParser(
            description=description, formatter_class=argparse.RawDescriptionHelpFormatter
        )
        parser.add_argument(
            "--keep-going",
            "-k",
            action="store_true",
            help="push COMMIT as far down (or up) the stack as it will go",
        )
        parser.add_argument(
            "--continue",
            "-c",
            action="store_true",
            dest="resume",
            help="resume after conflicts have been resolved",
        )
        parser.add_argument(
            "--up", action="store_true", help="swap the given commit with the one above it"
        )
        parser.add_argument(
            "--abort", action="store_true", help="give up and restore git to original state"
        )
        parser.add_argument(
            "--stop", action="store_true", help="abandon the latest swap operation, and continue"
        )
        parser.add_argument(
            "--squash", action="store_true", help="squash instead of completing this swap"
        )
        parser.add_argument(
            "--fixup", action="store_true", help="fixup instead of completing this swap"
        )
        parser.add_argument(
            "--edit",
            "-e",
            action="store_true",
            help="if conflicts arise, suspend so the user can resolve them",
        )
        parser.add_argument("--status", action="store_true", help="print status")
        parser.add_argument(
            "commit",
            nargs="?",
            metavar="COMMIT",
            help="swap COMMIT with COMMIT^. defaults to HEAD",
        )
        args = parser.parse_args()

        # These flags each select a mode of operation and are mutually exclusive.
        mode_args = (args.resume, args.abort, args.stop, args.squash, args.fixup, args.status)
        if sum(bool(x) for x in mode_args) > 1:
            parser.error(
                "use only one of --continue, --abort, --stop, --squash, --fixup, or --status"
            )

        if args.status:
            self.status()
            return

        if args.resume or args.abort or args.stop or args.squash or args.fixup:
            # A plain --continue resumes with no signal (None).
            resume: BaseException | None = None
            if args.abort:
                resume = Abort()
            elif args.stop:
                resume = Stop()
            elif args.squash:
                resume = Squash()
            elif args.fixup:
                resume = Fixup()
            self.resume(resume)
            return

        with self.setup():
            upstream = self.git.upstream("HEAD")
            with EditBranch(message="git-swap"):
                if args.up:
                    self.swap_up(args)
                else:
                    # Default: with --keep-going, stop at the upstream commit.
                    baselines = [upstream] if args.keep_going and upstream else []
                    try:
                        # When a queue is in use, its baselines replace the default.
                        baselines = list(Queue(self.git).baselines_for_swap())
                    except NotAQueue:
                        pass
                    self.swap_down(args, baselines)

    def swap_down(self, args, baselines: List[str]) -> None:
        """Swap the chosen commit (HEAD when none is given) with its parent."""
        commit = self.git.commit(args.commit) if args.commit else None
        with edit_commit(commit, git=self.git):
            if args.keep_going:
                with KeepGoing(edit=args.edit, baselines=baselines):
                    swap_or_squash(edit=args.edit, git=self.git, baselines=baselines, stop=True)
            else:
                swap_or_squash(edit=args.edit, git=self.git, baselines=baselines, stop=False)

    def swap_up(self, args) -> None:
        """
        Swap the given commit with the one above it.

        Raises:
            UserError: if no commit was given, or the commit is already at HEAD.
        """
        if not args.commit:
            raise UserError("specify a commit")
        commit = self.git.commit(args.commit)
        cherries = collect_cherries(commit, git=self.git)
        if not cherries:
            raise UserError("commit is already at HEAD")
        if args.keep_going:
            with KeepGoingUp(edit=args.edit, cherries=cherries):
                self.git.checkout(commit.sha)
        else:
            # Moving COMMIT up one step is the same as moving the commit
            # directly above it (cherries[0]) down one step.
            with edit_commit(self.git.commit(cherries[0]), git=self.git):
                swap_or_squash(edit=args.edit, git=self.git, baselines=[], stop=False)

375 

376 

# Module-level callable entry point for the tool.
main = Main()


if __name__ == "__main__":
    main()