# Coverage for src/gitq/continuations.py: 90% (268 statements)
# Report generated by coverage.py v7.13.5, created at 2026-04-15 15:32 -0400
import sys
from abc import abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
from itertools import count
from typing import Optional, List, TypeVar, ContextManager, Generic, Iterator, NoReturn, Type

import yaml

from .output import Output
from .git import Git, UserError, GitFailed, contextGit, Commit
from .yaml import YAMLObject, BaseLoader
class Loader(BaseLoader):
    """YAML loader for .git/continuation.yaml.

    A dedicated subclass so that constructors registered through
    `Continuation.register` are attached to this loader class.
    """

    pass
class Dumper(yaml.Dumper):
    """YAML dumper for .git/continuation.yaml.

    A dedicated subclass so that representers registered through
    `Continuation.register` are attached to this dumper class.
    """

    pass
@dataclass
class Continuations(YAMLObject):
    """Root object for .git/continuation.yaml.

    Attributes:
        continuations: the suspended stack of continuations, in the order
            they are re-entered on resume.
        tool: name of the tool that suspended the operation.
        status: optional human-readable description of what is in progress.
    """

    yaml_loader = Loader
    yaml_dumper = Dumper
    # `Continuation` is defined further down in this module, so the reference
    # is quoted: an unquoted name fails with NameError wherever annotations
    # are evaluated eagerly at class-creation time.
    continuations: List["Continuation"]
    tool: str
    status: str | None = field(default=None)
# An empty path addresses the document root, so the top-level node of
# .git/continuation.yaml is resolved as `!Continuations` without an explicit
# tag in the file.
yaml.add_path_resolver("!Continuations", [], Loader=Loader, Dumper=Dumper)

# Type of the value a `Continuation` yields from its `with` statement.
T = TypeVar("T")
42class Suspend(BaseException):
43 "Suspend execution and save a stack of continuations in .git/continuation.yaml"
45 continuations: List["Continuation"]
46 status: Optional[str]
48 def __init__(self, *, status: str | None = None) -> None:
49 super().__init__()
50 self.status = status
51 self.continuations = list()
class Resume(BaseException):
    """Resume execution with some additional instruction from the user.

    Derives from `BaseException`, so handlers that need to see it must name
    it explicitly (e.g. `except (Exception, Resume)`).
    """
class Abort(Exception):
    """
    Raised into a resume stack by `--abort`.  This will abort the operation
    and restore git to its previous state.
    """
class Continuation(Generic[T], YAMLObject):
    """
    A continuation is a context manager that can be suspended,
    serialized out to yaml, and then resumed in a subsequent execution of
    this program.

    This is a very low-tech approach to serializable continuations, and it
    relies on suspendable code being written in a strange idiom to work.

    Anything that needs to happen after a resume needs to be expressed as a
    stack of `Continuation` instances, rather than ordinary function calls.

    A continuation class must:

      * be a dataclass with only serializable attributes

      * implement a context manager overriding `.impl()`

      * be prepared to reconstruct the execution state of `.impl()`, if it
        calls anything that might raise Suspend.   In other words, there is
        no magic here that somehow serializes the python execution state.
        Each `Continuation` instance is just going to be reanimated based
        on its serializable attributes, and resume again from the yield.
    """

    yaml_loader = Loader
    yaml_dumper = Dumper

    # The live context manager returned by `impl()`; marked so that it is
    # left out of the yaml representation.
    manager: ContextManager[T] = field(metadata={"yaml_exclude": True})

    def __enter__(self) -> T:
        """Create the context manager from `impl()` and enter it."""
        self.manager = self.impl()
        try:
            return self.manager.__enter__()
        except Suspend as exception:
            # Suspending is only supported from the `with` body or from the
            # code after the yield.
            raise Exception("continuations must not suspend before yield") from exception

    def __exit__(self, exception_type, exception, traceback) -> bool | None:
        """
        Leave the continuation.

        A `Suspend` passing through is not forwarded to `impl()`: this
        instance records itself on the exception and lets it propagate.  Any
        other outcome is forwarded to the underlying context manager; if that
        raises `Suspend` itself, this instance is recorded as well.
        """
        if exception is None and exception_type is not None:
            exception = exception_type()

        if isinstance(exception, Suspend):
            exception.continuations.append(self)
            return None

        try:
            return self.manager.__exit__(exception_type, exception, traceback)
        except Suspend as suspended:
            suspended.continuations.append(self)
            raise

    @abstractmethod
    def impl(self) -> ContextManager[T]:
        """Return the context manager implementing this continuation."""
        pass

    @property
    def git(self) -> Git:
        """The `Git` instance stored in `contextGit`."""
        return contextGit.get()

    @staticmethod
    def register(c: Type[YAMLObject]) -> None:
        "register a class for yaml serialization in continuations"
        Loader.add_constructor(c.yaml_tag, c.from_yaml)
        Dumper.add_representer(c, c.to_yaml)
class Main:
    """
    Base class for a command-line entry point that can suspend and resume.

    Subclasses set `tool` and implement `main()`.  Calling the instance
    installs a `Git` object in `contextGit`, runs `main()`, and maps the
    outcome to a process exit status: 1 for `UserError`, 0 for `Abort`,
    2 when the operation is suspended, 0 after a completed resume.
    """

    tool: str
    suspend_message = "Suspended!"

    @abstractmethod
    def main(self) -> None:
        """Run the tool; implemented by subclasses."""
        pass

    def __call__(self) -> NoReturn:
        """Set up git context and run `main()`, translating errors to exit codes."""
        # NOTE(review): annotated NoReturn, which holds only if `main()`
        # always ends in `sys.exit` or an exception — confirm in subclasses.
        self.git = Git()
        contextGit.set(self.git)
        try:
            self.main()
        except UserError as e:
            Output.print(e)
            sys.exit(1)
        except Abort:
            Output.print("Cancelled. Previous state restored.")
            sys.exit(0)

    @contextmanager
    def setup(self) -> Iterator:
        """
        Context manager wrapping a fresh (non-resumed) operation.

        Raises `UserError` if the repo is not clean or a continuation file
        already exists.  A `Suspend` escaping the body is written out via
        `suspend()`; a `Resume` escaping the body is an internal error.
        """
        if not self.git.is_clean():
            raise UserError("Error: repo not clean")
        if self.git.continuation.exists():
            with open(self.git.continuation, "r") as f:
                j: Continuations = yaml.load(f, Loader)
            raise UserError(f"{j.tool} operation is already in progress.")
        try:
            yield
        except Suspend as e:
            self.suspend(e)
        except Resume as e:
            raise Exception("Internal error. Uncaught Resume") from e

    def suspend(self, e: Suspend) -> NoReturn:
        """Write the suspended continuation stack to disk and exit with status 2."""
        if e.status:
            Output.print(e.status)
        with open(self.git.continuation, "w") as f:
            # Continuations were appended innermost-first while unwinding;
            # store them outermost-first, the order `reanimate` enters them.
            continuations = list(reversed(e.continuations))
            j = Continuations(continuations, self.tool, e.status)
            yaml.dump(j, f, Dumper=Dumper)
        Output.print(self.suspend_message)
        sys.exit(2)

    def reanimate(self, continuations: List[Continuation], *, throw: BaseException | None) -> None:
        """
        Re-enter `continuations` as nested `with` blocks, outermost first.

        Once the innermost one has been entered, raise `throw` inside it if
        given; otherwise just return so each continuation runs its exit code.
        """
        if not continuations:
            if throw is not None:
                raise throw
            else:
                return
        continuation, *continuations = continuations
        with continuation:
            self.reanimate(continuations, throw=throw)

    def resume(self, throw: BaseException | None = None) -> NoReturn:
        """
        Resume the operation saved in the continuation file.

        Raises `UserError` when nothing is in progress, or when another
        tool's operation is in progress and `throw` is something other than
        `Abort`.  Exits with status 0 on completion; suspends again (exit
        status 2) if the resumed stack raises `Suspend`.
        """
        if not self.git.continuation.exists():
            raise UserError(f"Error: no {self.tool} operation is in progress")

        with open(self.git.continuation, "r") as f:
            j: Continuations = yaml.load(f, Loader)

        # All the commands support continue and abort, so its fine if the
        # user calls them from the wrong tool.
        if j.tool != self.tool and throw is not None and not isinstance(throw, Abort):
            raise UserError(f"A {j.tool} operation is currently in progress")

        # The file is removed before re-entering the stack; `suspend()`
        # writes a new one if the operation suspends again.
        self.git.continuation.unlink()

        try:
            self.reanimate(j.continuations, throw=throw)
        except Suspend as e:
            self.suspend(e)
        except Resume as e:
            raise Exception("Internal error. Uncaught Resume") from e

        sys.exit(0)

    def status(self) -> None:
        """Print a description of the operation in progress, if any."""
        if not self.git.continuation.exists():
            Output.print("no operation in progress")
            return
        with open(self.git.continuation, "r") as f:
            j: Continuations = yaml.load(f, Loader)
        if j.tool != self.tool:
            Output.print(f"{j.tool} operation is in progress.")
        # NOTE(review): when the tool differs and no status was stored, the
        # "in progress" message is printed twice — confirm this is intended.
        Output.print(j.status or f"{j.tool} operation is in progress")
class Finally(Continuation):
    """
    This should be used instead of try/finally for continuation classes.

    `cleanup()` runs when the `with` body finishes normally or raises an
    `Exception` or `Resume`.  A `Suspend` raised in the body never reaches
    this code (`Continuation.__exit__` handles it first), so cleanup is
    deferred until the operation is resumed.
    """

    @abstractmethod
    def cleanup(self) -> None:
        """Undo whatever this continuation set up."""
        pass

    @contextmanager
    def impl(self) -> Iterator[None]:
        try:
            yield
        except GeneratorExit:
            # The generator is being closed, not unwound by an error.
            raise
        except (Exception, Resume):
            self.cleanup()
            raise
        except BaseException as e:
            self.cleanup()
            # Chain explicitly so the original exception is reported as the
            # cause, as the other wrapping raises in this module do.
            raise Exception(f"Unexpected BaseException: {repr(e)}") from e
        else:
            self.cleanup()
@dataclass
class DeleteTempBranch(Finally):
    """
    Leave and delete a temporary branch when the enclosed work is done.

    Attributes:
        branch: name of the temporary branch.
        previous_head: what HEAD was before the temporary branch was created.
    """

    branch: str
    previous_head: str

    def cleanup(self) -> None:
        if self.git.on_orphan_branch():
            Output.print(f"# reset back to before creating {self.branch} branch")
            self.git.force_checkout(self.previous_head)
        else:
            self.git.detach()
        # Checked in both cases, so the delete only runs when there is
        # actually a branch to remove.
        if self.git.branch_exists(self.branch):
            self.git.cmd(["git", "branch", "-qD", self.branch])
@contextmanager
def TempBranch() -> Iterator[str]:
    """
    Create a temporary branch with no content and no parents.

    Yields the name of the branch, the first `temp-<n>` not already in use.
    The branch is cleaned up by `DeleteTempBranch` when the block exits.
    """
    git = contextGit.get()
    branches = set(git.branches())
    # count() is unbounded, so an unused name is always found.
    candidates = (f"temp-{n}" for n in count())
    branch = next(name for name in candidates if name not in branches)

    with DeleteTempBranch(branch=branch, previous_head=git.head()):
        git.cmd(["git", "checkout", "-q", "--orphan", branch])
        git.delete_index_and_files()
        yield branch
@contextmanager
def CheckoutBaseline(sha: str | None) -> Iterator[None]:
    """
    Checkout a baseline commit, or if argument is None, create a temporary
    branch with no history and check that out.
    """
    git = contextGit.get()
    if sha is None:
        with TempBranch():
            yield
    else:
        git.checkout(sha)
        yield
@dataclass
class EditBranch(Continuation[str]):
    """
    Detach from the current branch, so it can be edited without polluting
    the reflog with a bunch of intermediate steps.   At the end, update the
    branch using message, and check it back out again.

    Attributes:
        message: reflog message passed to `git update-ref -m`.
        head: what `git.head()` returned on first entry; None until then.
    """

    message: str
    head: str | None = field(default=None)

    @property
    def branch(self) -> Optional[str]:
        """Short branch name if `head` is under refs/heads/, otherwise None."""
        if self.head and self.head.startswith("refs/heads/"):
            return self.head.removeprefix("refs/heads/") or None
        return None

    @contextmanager
    def impl(self) -> Iterator[str]:
        # `head` is None only on first entry.  After a resume it is restored
        # from yaml and the detach is not repeated.
        if self.head is None:
            self.head = self.git.head()
            self.git.detach()
        try:
            yield self.head
        except (Exception, Resume):
            Output.print("# Failed.  Resetting to original HEAD")
            self.git.force_checkout(self.branch or self.head)
            raise
        else:
            if self.branch:
                self.git.cmd(["git", "update-ref", "-m", self.message, self.head, "HEAD"])
                self.git.checkout(self.branch, comment="done editing branch")
@dataclass
class CheckoutBranch(Finally):
    """
    Temporarily checkout ref, then restore to previous HEAD

    Attributes:
        branch: full ref name of the branch to check out (refs/heads/...).
        old_branch: where to return to afterwards; None until first entry.
    """

    branch: str
    old_branch: str | None = field(default=None)

    def cleanup(self) -> None:
        if self.old_branch is not None:
            self.git.force_checkout(self.old_branch, comment="restore previous HEAD")

    @contextmanager
    def impl(self) -> Iterator[None]:
        assert self.branch.startswith("refs/heads/")
        # `old_branch` is None only on first entry.  After a resume it is
        # restored from yaml and the checkout is not repeated.
        if self.old_branch is None:
            self.old_branch = self.git.head()
            if self.old_branch.startswith("refs/heads/"):
                self.old_branch = self.old_branch.removeprefix("refs/heads/")
            self.git.checkout(self.branch.removeprefix("refs/heads/"), comment="checkout branch")
        with super().impl():
            yield
@dataclass
class PickCherries(Continuation):
    """
    Yield, then cherry-pick specified commits.

    Attributes:
        cherries: commits still to be picked, in order.
        edit: passed through to `cherry_pick`.
    """

    cherries: List[str]
    edit: bool = field(default=False)

    @contextmanager
    def impl(self) -> Iterator[None]:
        yield
        while self.cherries:
            # Drop the commit from the list before picking it, so it is no
            # longer in `cherries` if `cherry_pick` suspends.
            cherry, *self.cherries = self.cherries
            cherry_pick(self.git.commit(cherry), edit=self.edit)
@dataclass
class CherryPickContinue(Continuation):
    """
    When resuming, check if the user ran `git cherry-pick --continue`, and
    do it for them if they haven't.

    Attributes:
        ref: the commit being cherry-picked, used in the status message.
    """

    ref: str

    @contextmanager
    def impl(self) -> Iterator[None]:
        try:
            yield
        except (Exception, Resume):
            self.git.cherry_pick_abort()
            raise
        # NOTE(review): `cherry_pick_in_progress` is read without a call;
        # presumably a property on Git — confirm, since a bound method
        # would always be truthy.
        if self.git.cherry_pick_in_progress:
            if self.git.has_unmerged_files():
                Output.print("The index still has unmerged files.")
                raise Suspend(status=f"cherry-picking {self.ref}")
            self.git.cmd(["git", "cherry-pick", "--continue"])
def cherry_pick(cherry: Commit, *, edit: bool = False) -> None:
    """
    Cherry-pick a single commit.  If it fails, suspend so the user can resolve conflicts.

    Suspension only happens when `edit` is true and git reports a cherry-pick
    in progress; otherwise the cherry-pick is aborted and `GitFailed`
    propagates to the caller.
    """
    git = contextGit.get()
    abbrev = git.abbrev(cherry.sha)
    try:
        git.cmd(["git", "cherry-pick", "--quiet", "--allow-empty", abbrev], comment=cherry.title)
    except GitFailed:
        if edit and git.cherry_pick_in_progress:
            # Raising inside the `with` records CherryPickContinue on the
            # Suspend, so the pick is finished when the operation resumes.
            with CherryPickContinue(ref=cherry.sha):
                raise Suspend(status=f"cherry-picking {abbrev} {cherry.title}")
        else:
            git.cherry_pick_abort()
            raise
class Step(YAMLObject):
    """
    A unit of work with a `run()` method, executed in sequence by `Then`.

    Uses the same yaml loader and dumper as continuations.
    """

    yaml_loader = Loader
    yaml_dumper = Dumper

    @abstractmethod
    def run(self):
        """Perform the step; implemented by subclasses."""
        pass

    @property
    def git(self) -> Git:
        """The `Git` instance stored in `contextGit`."""
        return contextGit.get()
@dataclass
class Then(Continuation):
    """
    Perform a list of steps in order

    Attributes:
        steps: the steps still to be run.
    """

    steps: List[Step]

    @contextmanager
    def impl(self) -> Iterator[None]:
        yield
        while self.steps:
            # Remove the step before running it, so it is no longer in
            # `steps` if `run()` suspends.
            self.steps.pop(0).run()
def progn(*steps: Step) -> None:
    """Run `steps` in order, inside a `Then` continuation."""
    with Then(steps=list(steps)):
        # Nothing to do in the body: `Then` runs the steps on exit.
        pass