Coverage for src / gitq / continuations.py: 90%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 15:32 -0400

1import sys 

2import yaml 

3from typing import Optional, List, TypeVar, ContextManager, Generic, Iterator, NoReturn, Type 

4from contextlib import contextmanager 

5from itertools import count 

6from abc import abstractmethod 

7from dataclasses import dataclass, field 

8 

9from .output import Output 

10from .git import Git, UserError, GitFailed, contextGit, Commit 

11from .yaml import YAMLObject, BaseLoader 

12 

13 

14class Loader(BaseLoader): 

15 "YAML loader for .git/continuation.yaml" 

16 

17 pass 

18 

19 

20class Dumper(yaml.Dumper): 

21 "YAML dumper for .git/continuation.yaml" 

22 

23 pass 

24 

25 

26@dataclass 

27class Continuations(YAMLObject): 

28 "Root object for .git/continuation.yaml" 

29 

30 yaml_loader = Loader 

31 yaml_dumper = Dumper 

32 continuations: List[Continuation] 

33 tool: str 

34 status: str | None = field(default=None) 

35 

36 

37yaml.add_path_resolver("!Continuations", [], Loader=Loader, Dumper=Dumper) 

38 

39T = TypeVar("T") 

40 

41 

42class Suspend(BaseException): 

43 "Suspend execution and save a stack of continuations in .git/continuation.yaml" 

44 

45 continuations: List["Continuation"] 

46 status: Optional[str] 

47 

48 def __init__(self, *, status: str | None = None) -> None: 

49 super().__init__() 

50 self.status = status 

51 self.continuations = list() 

52 

53 

54class Resume(BaseException): 

55 "Resume execution with some additional instruction from the user." 

56 

57 

58class Abort(Exception): 

59 """ 

60 Raised into a resume stack by `--abort`. This will abort the operation 

61 and restore git to its previous state. 

62 """ 

63 

64 

65class Continuation(Generic[T], YAMLObject): 

66 """ 

67 A continuation is is a context manager that can be suspended, 

68 serialized out to yaml, and then resumed in a subsequent execution of 

69 this program. 

70 

71 This is a very low-tech approach to serializeable continuations, and it 

72 relies on suspendable code being written in a strange idiom to work. 

73 

74 Anything that needs to happen after a resume needs to be expressed as a 

75 stack of `Continuation` instances, rather than ordinary function calls. 

76 

77 A continuation class must: 

78 

79 * be a dataclass with only serializable attributes 

80 

81 * implement a context manager overriding `.impl()` 

82 

83 * be prepared to reconstruct the execution state of `.impl()`, if it 

84 calls anything that might raise Suspend. In other words, there is 

85 no magic here that somehow serializes the python execution state. 

86 Each `Continuation` instance is just going to be reanimated based 

87 on its serializeable attributes, and resume again from the yield. 

88 """ 

89 

90 yaml_loader = Loader 

91 yaml_dumper = Dumper 

92 

93 manager: ContextManager[T] = field(metadata={"yaml_exclude": True}) 

94 

95 def __enter__(self) -> T: 

96 self.manager = self.impl() 

97 try: 

98 return self.manager.__enter__() 

99 except Suspend as exception: 

100 raise Exception("continuations must not suspend before yield") from exception 

101 

102 def __exit__(self, exception_type, exception, traceback) -> bool | None: 

103 if exception is None and exception_type is not None: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 exception = exception_type() 

105 

106 if isinstance(exception, Suspend): 

107 exception.continuations.append(self) 

108 return None 

109 

110 try: 

111 return self.manager.__exit__(exception_type, exception, traceback) 

112 except Suspend as exception: 

113 exception.continuations.append(self) 

114 raise 

115 

116 @abstractmethod 

117 def impl(self) -> ContextManager[T]: 

118 pass 

119 

120 @property 

121 def git(sef) -> Git: 

122 return contextGit.get() 

123 

124 @staticmethod 

125 def register(c: Type[YAMLObject]) -> None: 

126 "register a class for yaml serialization in continuations" 

127 Loader.add_constructor(c.yaml_tag, c.from_yaml) 

128 Dumper.add_representer(c, c.to_yaml) 

129 

130 

131class Main: 

132 

133 tool: str 

134 suspend_message = "Suspended!" 

135 

136 @abstractmethod 

137 def main(self) -> None: 

138 pass 

139 

140 def __call__(self) -> NoReturn: 

141 self.git = Git() 

142 contextGit.set(self.git) 

143 try: 

144 self.main() 

145 except UserError as e: 

146 Output.print(e) 

147 sys.exit(1) 

148 except Abort: 

149 Output.print("Cancelled. Previous state restored.") 

150 sys.exit(0) 

151 

152 @contextmanager 

153 def setup(self) -> Iterator: 

154 if not self.git.is_clean(): 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true

155 raise UserError("Error: repo not clean") 

156 if self.git.continuation.exists(): 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true

157 with open(self.git.continuation, "r") as f: 

158 j: Continuations = yaml.load(f, Loader) 

159 raise UserError(f"{j.tool} operation is already in progress.") 

160 try: 

161 yield 

162 except Suspend as e: 

163 self.suspend(e) 

164 except Resume as e: 

165 raise Exception("Internal error. Uncaught Resume") from e 

166 

167 def suspend(self, e: Suspend) -> NoReturn: 

168 if e.status: 

169 Output.print(e.status) 

170 with open(self.git.continuation, "w") as f: 

171 continuations = list(reversed(e.continuations)) 

172 j = Continuations(continuations, self.tool, e.status) 

173 yaml.dump(j, f, Dumper=Dumper) 

174 Output.print(self.suspend_message) 

175 sys.exit(2) 

176 

177 def reanimate(self, continuations: List[Continuation], *, throw: BaseException | None) -> None: 

178 if not len(continuations): 

179 if throw is not None: 

180 raise throw 

181 else: 

182 return 

183 continuation, *continuations = continuations 

184 with continuation: 

185 self.reanimate(continuations, throw=throw) 

186 

187 def resume(self, throw: BaseException | None = None) -> NoReturn: 

188 

189 if not self.git.continuation.exists(): 

190 raise UserError(f"Error: no {self.tool} operation is in progress") 

191 

192 with open(self.git.continuation, "r") as f: 

193 j: Continuations = yaml.load(f, Loader) 

194 

195 # All the commands support continue and abort, so its fine if the 

196 # user calls them from the wrong tool. 

197 if j.tool != self.tool and throw is not None and not isinstance(throw, Abort): 197 ↛ 198line 197 didn't jump to line 198 because the condition on line 197 was never true

198 raise UserError(f"A {j.tool} operation is currently in progress") 

199 

200 self.git.continuation.unlink() 

201 

202 try: 

203 self.reanimate(j.continuations, throw=throw) 

204 except Suspend as e: 

205 self.suspend(e) 

206 except Resume as e: 

207 raise Exception("Internal error. Uncaught Resume") from e 

208 

209 sys.exit(0) 

210 

211 def status(self) -> None: 

212 if not self.git.continuation.exists(): 

213 Output.print("no operation in progress") 

214 return 

215 with open(self.git.continuation, "r") as f: 

216 j: Continuations = yaml.load(f, Loader) 

217 if j.tool != self.tool: 

218 Output.print(f"{j.tool} operation is in progress.") 

219 Output.print(j.status or f"{j.tool} operation is in progress") 

220 

221 

222class Finally(Continuation): 

223 "This should be used instead of try/finally for continuation classes." 

224 

225 @abstractmethod 

226 def cleanup(self) -> None: 

227 pass 

228 

229 @contextmanager 

230 def impl(self) -> Iterator[None]: 

231 try: 

232 yield 

233 except GeneratorExit: 

234 raise 

235 except (Exception, Resume): 

236 self.cleanup() 

237 raise 

238 except BaseException as e: 

239 self.cleanup() 

240 raise Exception(f"Unexpected BaseException: {repr(e)}") 

241 else: 

242 self.cleanup() 

243 

244 

245@dataclass 

246class DeleteTempBranch(Finally): 

247 

248 branch: str 

249 previous_head: str 

250 

251 def cleanup(self) -> None: 

252 if self.git.on_orphan_branch(): 

253 Output.print(f"# reset back to before creating {self.branch} branch") 

254 self.git.force_checkout(self.previous_head) 

255 else: 

256 self.git.detach() 

257 if self.git.branch_exists(self.branch): 

258 self.git.cmd(["git", "branch", "-qD", self.branch]) 

259 

260 

261@contextmanager 

262def TempBranch() -> Iterator[str]: 

263 """ 

264 Create a temporary branch with no content and no parents. 

265 """ 

266 git = contextGit.get() 

267 branches = set(git.branches()) 

268 for n in count(): 268 ↛ 273line 268 didn't jump to line 273 because the loop on line 268 didn't complete

269 branch = f"temp-{n}" 

270 if branch not in branches: 270 ↛ 268line 270 didn't jump to line 268 because the condition on line 270 was always true

271 break 

272 else: 

273 raise AssertionError 

274 

275 with DeleteTempBranch(branch=branch, previous_head=git.head()): 

276 git.cmd(["git", "checkout", "-q", "--orphan", branch]) 

277 git.delete_index_and_files() 

278 yield branch 

279 

280 

281@contextmanager 

282def CheckoutBaseline(sha: str | None): 

283 """ 

284 Checkout a baseline commit, or if argument is None, create a temporary 

285 branch with no history and check that out. 

286 """ 

287 git = contextGit.get() 

288 if sha is None: 

289 with TempBranch(): 

290 yield 

291 else: 

292 git.checkout(sha) 

293 yield 

294 

295 

296@dataclass 

297class EditBranch(Continuation[str]): 

298 """ 

299 Detach from the current branch, so it can be edited without polluting 

300 the reflog with a bunch of intermediate steps. At the end, update the 

301 branch using message, and check it back out again. 

302 """ 

303 

304 message: str 

305 head: str | None = field(default=None) 

306 

307 @property 

308 def branch(self) -> Optional[str]: 

309 if self.head and self.head.startswith("refs/heads/"): 309 ↛ 311line 309 didn't jump to line 311 because the condition on line 309 was always true

310 return self.head.removeprefix("refs/heads/") or None 

311 return None 

312 

313 @contextmanager 

314 def impl(self) -> Iterator[str]: 

315 if self.head is None: 

316 self.head = self.git.head() 

317 self.git.detach() 

318 try: 

319 yield self.head 

320 except (Exception, Resume): 

321 Output.print("# Failed. Resetting to original HEAD") 

322 self.git.force_checkout(self.branch or self.head) 

323 raise 

324 else: 

325 if self.branch: 325 ↛ exitline 325 didn't return from function 'impl' because the condition on line 325 was always true

326 self.git.cmd(["git", "update-ref", "-m", self.message, self.head, "HEAD"]) 

327 self.git.checkout(self.branch, comment="done editing branch") 

328 

329 

330@dataclass 

331class CheckoutBranch(Finally): 

332 "Temporarily checkout ref, then restore to previous HEAD" 

333 

334 branch: str 

335 old_branch: str | None = field(default=None) 

336 

337 def cleanup(self): 

338 if self.old_branch is not None: 338 ↛ exitline 338 didn't return from function 'cleanup' because the condition on line 338 was always true

339 self.git.force_checkout(self.old_branch, comment="restore previous HEAD") 

340 

341 @contextmanager 

342 def impl(self) -> Iterator: 

343 assert self.branch.startswith("refs/heads/") 

344 if self.old_branch is None: 

345 self.old_branch = self.git.head() 

346 if self.old_branch.startswith("refs/heads/"): 346 ↛ 348line 346 didn't jump to line 348 because the condition on line 346 was always true

347 self.old_branch = self.old_branch.removeprefix("refs/heads/") 

348 self.git.checkout(self.branch.removeprefix("refs/heads/"), comment="checkout branch") 

349 with super().impl(): 

350 yield 

351 

352 

353@dataclass 

354class PickCherries(Continuation): 

355 "Yield, then cherry-pick specified commits." 

356 

357 cherries: List[str] 

358 edit: bool = field(default=False) 

359 

360 @contextmanager 

361 def impl(self) -> Iterator[None]: 

362 yield 

363 while self.cherries: 

364 cherry, *self.cherries = self.cherries 

365 cherry_pick(self.git.commit(cherry), edit=self.edit) 

366 

367 

368@dataclass 

369class CherryPickContinue(Continuation): 

370 """ 

371 When resuming, check if the user ran `git cherry-pick --continue`, and 

372 do it for them if they have't. 

373 """ 

374 

375 ref: str 

376 

377 @contextmanager 

378 def impl(self) -> Iterator[None]: 

379 try: 

380 yield 

381 except (Exception, Resume): 

382 self.git.cherry_pick_abort() 

383 raise 

384 if self.git.cherry_pick_in_progress: 384 ↛ exitline 384 didn't return from function 'impl' because the condition on line 384 was always true

385 if self.git.has_unmerged_files(): 

386 Output.print("The index still has unmerged files.") 

387 raise Suspend(status=f"cherry-picking {self.ref}") 

388 self.git.cmd(["git", "cherry-pick", "--continue"]) 

389 

390 

391def cherry_pick(cherry: Commit, *, edit: bool = False) -> None: 

392 "Cherry-pick a single commit. If it fails, suspend so the user can resolve conflicts." 

393 git = contextGit.get() 

394 abbrev = git.abbrev(cherry.sha) 

395 try: 

396 git.cmd(["git", "cherry-pick", "--quiet", "--allow-empty", abbrev], comment=cherry.title) 

397 except GitFailed: 

398 if edit and git.cherry_pick_in_progress: 

399 with CherryPickContinue(ref=cherry.sha): 

400 raise Suspend(status=f"cherry-picking {abbrev} {cherry.title}") 

401 else: 

402 git.cherry_pick_abort() 

403 raise 

404 

405 

406class Step(YAMLObject): 

407 

408 yaml_loader = Loader 

409 yaml_dumper = Dumper 

410 

411 @abstractmethod 

412 def run(self): 

413 pass 

414 

415 @property 

416 def git(self) -> Git: 

417 return contextGit.get() 

418 

419 

420@dataclass 

421class Then(Continuation): 

422 "Perform a list of steps in order" 

423 

424 steps: List[Step] 

425 

426 @contextmanager 

427 def impl(self) -> Iterator[None]: 

428 yield 

429 while self.steps: 

430 self.steps.pop(0).run() 

431 

432 

433def progn(*steps: Step): 

434 with Then(steps=list(steps)): 

435 pass