Coverage for structured_tutorials / runners / base.py: 100%
292 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 13:17 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 13:17 +0200
1# Copyright (c) 2025 Mathias Ertl
2# Licensed under the MIT License. See LICENSE file for details.
4"""Base classes for runners."""
6import abc
7import io
8import logging
9import os
10import shlex
11import shutil
12import socket
13import subprocess
14import sys
15import tempfile
16import time
17from copy import deepcopy
18from pathlib import Path
19from subprocess import CompletedProcess
20from typing import Any
22from jinja2 import Environment, TemplateSyntaxError
24from structured_tutorials.errors import (
25 CommandOutputTestError,
26 CommandTestError,
27 InvalidAlternativesSelectedError,
28 PromptNotConfirmedError,
29 RequiredExecutableNotFoundError,
30)
31from structured_tutorials.models import (
32 AlternativeModel,
33 CommandsPartModel,
34 FilePartModel,
35 PromptModel,
36 TutorialModel,
37)
38from structured_tutorials.models.base import CommandType
39from structured_tutorials.models.parts import CleanupCommandModel, CommandModel
40from structured_tutorials.models.tests import TestCommandModel, TestOutputModel, TestPortModel
41from structured_tutorials.utils import chdir, check_count, cleanup, git_export
# General-purpose logger of this module (directory changes, failed tests, ...).
log = logging.getLogger(__name__)
# Receives every shell command right before it is executed.
command_logger = logging.getLogger("command")
# Announces each tutorial part when it starts running.
part_log = logging.getLogger("part")
48class RunnerBase(abc.ABC):
49 """Base class for runners to provide shared functionality."""
def __init__(
    self,
    tutorial: TutorialModel,
    path: Path | None = None,
    alternatives: tuple[str, ...] = (),
    show_command_output: bool = True,
    interactive: bool = True,
    context: dict[str, Any] | None = None,
):
    """Prepare template context and process environment for running ``tutorial``.

    The context is assembled from the tutorial configuration, its run configuration and the optional
    ``context`` argument (later sources win), after which string values are rendered as templates.
    The process environment starts from ``os.environ`` (or empty, if the run configuration clears it)
    and is extended by the run configuration and every selected alternative.

    Raises:
        RequiredExecutableNotFoundError: If a required executable is not found in the configured PATH.
    """
    self.path = path
    self.tutorial = tutorial
    self.orig_cwd = Path.cwd()

    # Jinja2 environment used for all template rendering
    self.env = Environment(keep_trailing_newline=True)

    # Layer the context: tutorial-wide values, run-specific values, caller overrides
    self.context = deepcopy(tutorial.configuration.context)
    run_config = tutorial.configuration.run
    self.context.update(deepcopy(run_config.context))
    if context:
        self.context.update(context)

    # Render string values one by one, so later values see already rendered earlier ones.
    # Keys ending in "_template" are kept as raw templates.
    renderable = [
        (name, raw)
        for name, raw in self.context.items()
        if isinstance(raw, str) and not name.endswith("_template")
    ]
    for name, raw in renderable:
        self.context[name] = self.render(raw, _key=name)

    # Base environment for all commands
    if not run_config.clear_environment:
        environment: dict[str, str | None] = os.environ.copy()  # type: ignore[assignment]
    else:
        environment = {}
    environment.update(run_config.environment)

    def require_executables(names: Any) -> None:
        # Looks up each name in the PATH of the environment as configured at call time
        for name in names:
            if not shutil.which(name, path=environment.get("PATH")):
                raise RequiredExecutableNotFoundError(f"{name}: Executable not found.")

    require_executables(run_config.required_executables)

    # Apply environment, context and executable requirements of selected alternatives
    self.alternatives = alternatives
    for alternative in alternatives:
        config = run_config.alternatives.get(alternative)
        if not config:
            continue
        environment.update(config.environment)
        self.context.update(config.context)
        require_executables(config.required_executables)

    self.cleanup: list[CleanupCommandModel] = []
    self.show_command_output = show_command_output
    self.interactive = interactive

    # Variables set to None are dropped; values with template syntax errors are kept verbatim
    rendered_environment = {}
    for name, raw in environment.items():
        if raw is not None:
            rendered_environment[name] = self.render(raw, ignore_errors=True)
    self.environment = rendered_environment
def render(self, value: str, ignore_errors: bool = False, **context: Any) -> str:
    """Render ``value`` as a Jinja2 template and return the result.

    The template sees the runner context, overlaid with any keyword arguments passed as ``context``.

    If ``ignore_errors`` is true, a value that is not a syntactically valid template is returned
    unchanged instead of raising ``TemplateSyntaxError``.
    """
    try:
        return self.env.from_string(value).render({**self.context, **context})
    except TemplateSyntaxError:  # pragma: no cover
        if ignore_errors:
            return value
        raise
def render_command(self, command: CommandType, **context: Any) -> CommandType:
    """Render a command as template(s).

    A string command is rendered as a whole; any other command is rendered token by token and
    returned as a tuple. ``context`` is passed on to ``render()``.
    """
    if not isinstance(command, str):
        return tuple(self.render(part, **context) for part in command)
    return self.render(command, **context)
124 def test_output(self, proc: subprocess.CompletedProcess[bytes], test: TestOutputModel) -> None:
125 if test.stream == "stderr":
126 value = proc.stderr
127 else:
128 value = proc.stdout
129 if test.strip:
130 value = value.strip()
132 if test.regex:
133 if (match := test.regex.search(value)) is not None:
134 self.context.update({k: v.decode("utf-8") for k, v in match.groupdict().items()})
135 else:
136 decoded = value.decode("utf-8")
137 raise CommandOutputTestError(f"Process did not have the expected output: '{decoded}'")
139 # Test for line/character count of output
140 if test.line_count:
141 try:
142 check_count(value.splitlines(), test.line_count)
143 except ValueError as ex:
144 raise CommandOutputTestError(f"Line count error: {ex}") from ex
145 if test.character_count:
146 try:
147 check_count(value, test.character_count)
148 except ValueError as ex:
149 raise CommandOutputTestError(f"Character count error: {ex}") from ex
def validate_alternatives(self) -> None:
    """Ensure the selected alternatives are valid for every alternative part.

    Raises:
        InvalidAlternativesSelectedError: If a required alternative part has no selected alternative,
            or if more than one alternative is selected for the same part.
    """
    requested = set(self.alternatives)

    for number, part in enumerate(self.tutorial.parts, start=1):
        if not isinstance(part, AlternativeModel):
            continue

        selected = requested & set(part.alternatives)
        if part.required and not selected:
            raise InvalidAlternativesSelectedError(f"Part {number}: No alternative selected.")
        if len(selected) > 1:
            raise InvalidAlternativesSelectedError(
                f"Part {number}: More then one alternative selected: {selected}"
            )
def run_shell_command(
    self,
    command: CommandType,
    show_output: bool,
    capture_output: bool = False,
    stdin: int | io.BufferedReader | None = None,
    input: bytes | None = None,
    environment: dict[str, Any] | None = None,
    clear_environment: bool = False,
    options: dict[str, Any] | None = None,
) -> CompletedProcess[bytes]:
    """Render and run a single command, returning the completed process.

    A string command is run through the shell, a tuple is executed directly. Values in ``environment``
    are rendered as templates and added to the runner environment (or replace it entirely if
    ``clear_environment`` is set). ``options`` is not used by this implementation.
    """
    # Output is only ever shown if the runner itself is configured to show it
    show_output = show_output and self.show_command_output

    # Both streams are always configured the same way
    stream: int | None
    if capture_output:
        stream = subprocess.PIPE
    elif not show_output:
        stream = subprocess.DEVNULL
    else:
        stream = None

    # Environment variables passed for this command are rendered as templates
    overrides = {name: self.render(raw) for name, raw in environment.items()} if environment else {}
    if clear_environment:
        process_env = overrides
    elif overrides:
        process_env = {**self.environment, **overrides}
    else:
        process_env = self.environment

    # Render the command; tuples bypass the shell and are logged in shell-quoted form
    rendered = self.render_command(command)
    use_shell = not isinstance(rendered, tuple)
    logged_command = rendered if use_shell else shlex.join(rendered)

    command_logger.info(logged_command)
    proc = subprocess.run(
        rendered,
        shell=use_shell,
        stdin=stdin,
        input=input,
        stdout=stream,
        stderr=stream,
        env=process_env,
    )

    # Captured output cannot be shown while the process runs, so both streams are printed
    # separately once it has finished.
    if capture_output and show_output:
        for header, data in (("--- stdout ---", proc.stdout), ("--- stderr ---", proc.stderr)):
            print(header)
            sys.stdout.buffer.write(data + b"\n")
            sys.stdout.buffer.flush()

    return proc
def run_commands(self, part: CommandsPartModel, options: dict[str, Any] | None = None) -> None:
    """Run all commands of a commands part, skipping commands with ``run`` set to ``False``.

    The options of the part override the passed ``options``; the merged result is handed to every
    command.
    """
    assert part.run is not False  # Already checked by the caller
    merged_options = {**(options or {}), **part.run.options}

    for command_config in part.commands:
        if command_config.run is not False:
            self.run_command(command_config, merged_options)
def run_prompt(self, part: PromptModel) -> None:
    """Show the prompt of ``part`` and wait for the user.

    For an "enter" prompt any input continues. Otherwise the user is asked until a valid yes/no
    answer is given; an empty answer counts as ``part.default``.

    Raises:
        PromptNotConfirmedError: If the user declines the prompt.
    """
    prompt = self.render(part.prompt).strip() + " "

    if part.response == "enter":
        input(prompt)
        return

    # type == confirm
    valid_inputs = ("n", "no", "yes", "y", "")
    while True:
        response = input(prompt).strip().lower()
        if response in valid_inputs:
            break
        print(f"Please enter a valid value ({'/'.join(valid_inputs)}).")

    declined = response in ("n", "no") or (response == "" and not part.default)
    if declined:
        raise PromptNotConfirmedError(self.render(part.error, response=response))
def run_alternative(self, part: AlternativeModel) -> None:
    """Run the selected alternative of ``part`` (if any) and apply its ``chdir`` setting.

    Raises:
        RuntimeError: If the selected alternative is neither a commands part nor a file part.
    """
    assert part.run is not False  # Already checked by the caller
    matching = set(self.alternatives) & set(part.alternatives)

    # Note: The CLI agent already verifies this - just assert this to be sure.
    assert len(matching) <= 1, "More then one part selected."
    options = part.run.options

    if matching:
        name = next(iter(matching))
        selected_part = part.alternatives[name]
        if isinstance(selected_part, CommandsPartModel):
            self.run_commands(selected_part, options)
        elif isinstance(selected_part, FilePartModel):
            self.write_file(selected_part, options)
        else:  # pragma: no cover
            raise RuntimeError(f"{selected_part} is not supported as alternative.")

    # A truthy chdir is a (templated) target directory, False switches back to the original directory
    if part.chdir:
        target = self.render(str(part.chdir))
        log.info("Changing working directory to %s.", target)
        self.chdir(target, options)
    elif part.chdir is False:
        log.info("Changing working directory to %s.", self.orig_cwd)
        self.chdir(str(self.orig_cwd), options)
def run_parts(self) -> None:
    """Run all parts of the tutorial in order.

    Prompts are only shown in interactive mode, parts with ``run`` set to ``False`` are skipped. After
    a part has run, the context is updated with the ``update_context`` of its run configuration.

    Raises:
        RuntimeError: If a part is of an unknown type.
    """
    for part in self.tutorial.parts:
        # Prompts have no run configuration and are handled separately
        if isinstance(part, PromptModel):
            if not self.interactive:
                continue
            self.run_prompt(part)
            continue

        if part.run is False:
            continue

        part_log.info(part.name if part.name else f"Running part {part.id}...")

        if isinstance(part, CommandsPartModel):
            self.run_commands(part)
        elif isinstance(part, FilePartModel):
            self.write_file(part)
        elif isinstance(part, AlternativeModel):
            self.run_alternative(part)
        else:  # pragma: no cover
            raise RuntimeError(f"{part} is not a tutorial part")

        self.context.update(part.run.update_context)
def run(self) -> None:
    """Run the tutorial, optionally inside a temporary directory or a git export.

    When a temporary directory or git export is configured, the working directory is switched to it
    while the parts run, and ``cwd``, ``orig_cwd`` and ``temp_dir``/``export_dir`` are added to the
    context. In every case the parts run inside the ``cleanup()`` context manager.
    """
    run_config = self.tutorial.configuration.run

    if run_config.temporary_directory:
        with tempfile.TemporaryDirectory() as scratch_dir:
            log.info("Switching to temporary directory: %s", scratch_dir)
            scratch_path = Path(scratch_dir)
            self.context["cwd"] = scratch_path
            self.context["temp_dir"] = scratch_path
            self.context["orig_cwd"] = Path.cwd()

            with chdir(scratch_dir):
                with cleanup(self):
                    self.orig_cwd = Path.cwd()
                    self.run_parts()
        return

    if run_config.git_export:
        with tempfile.TemporaryDirectory() as scratch_dir:
            export_path = git_export(scratch_dir)
            log.info("Creating git export at: %s", export_path)
            self.context["cwd"] = export_path
            self.context["export_dir"] = export_path
            self.context["orig_cwd"] = Path.cwd()

            with chdir(export_path):
                with cleanup(self):
                    self.orig_cwd = Path.cwd()
                    self.run_parts()
        return

    with cleanup(self):
        self.run_parts()
def prepare_tutorial(self) -> None:
    """Function invoked before running the tutorial.

    This implementation does nothing.
    """
def cleanup_tutorial(self) -> None:
    """Function invoked after running the tutorial.

    This implementation does nothing.
    """
def update_environment_variable(self, key: str, value: str, options: dict[str, Any]) -> None:
    """Set environment variable ``key`` to ``value`` in the runner environment.

    ``options`` is not used by this implementation.
    """
    self.environment.update({key: value})
def update_environment(self, env: dict[str, str], options: dict[str, Any]) -> None:
    """Render the values of ``env`` as templates and set them as environment variables.

    Variables with a value of ``None`` are skipped. All values are rendered before the first variable
    is set via ``update_environment_variable()``.
    """
    rendered = [(name, self.render(raw)) for name, raw in env.items() if raw is not None]
    for name, text in rendered:
        self.update_environment_variable(name, text, options)
def write_file(self, part: FilePartModel, options: dict[str, Any] | None = None) -> None:
    """Write the file described by ``part``.

    The destination is rendered as a template. A destination ending in a path separator is treated as
    a directory: it is created and the file is named after the source. A source with
    ``template=False`` is copied via ``copy_file()`` without being read; otherwise the source (read as
    UTF-8) or the inline contents are passed to ``write_file_from_string()``, rendered as template
    if ``part.template`` is set.

    The options of the part override the passed ``options``.

    Raises:
        RuntimeError: If the destination is a directory but no source is given, or if the destination
            already exists.
    """
    assert part.run is not False  # Already checked by the caller
    if options is None:
        options = {}
    options = {**options, **part.run.options}

    raw_destination = self.render(part.destination)
    destination = Path(raw_destination)

    if raw_destination.endswith(os.path.sep):
        # Model validation already validates that the destination does not look like a directory, if no
        # source is set, but this could be tricked if the destination is a template.
        if not part.source:
            raise RuntimeError(
                f"{raw_destination}: Destination is directory, but no source given to derive filename."
            )

        destination.mkdir(parents=True, exist_ok=True)
        destination = destination / part.source.name
    elif destination.exists():
        raise RuntimeError(f"{destination}: Destination already exists.")

    # If template=False and source is set, we just copy the file as is, without ever reading it
    if not part.template and part.source:
        self.copy_file(self.tutorial.root / part.source, destination, options)
        return

    if part.source:
        # Read with an explicit encoding, so the result does not depend on the locale of the host.
        with open(self.tutorial.root / part.source, encoding="utf-8") as source_stream:
            template = source_stream.read()
    else:
        assert isinstance(part.contents, str)  # assured by model validation
        template = part.contents

    contents = self.render(template) if part.template else template
    self.write_file_from_string(contents, destination, options)
def run_test(
    self,
    test: TestCommandModel | TestPortModel | TestOutputModel,
    proc: subprocess.CompletedProcess[bytes],
    options: dict[str, Any],
) -> None:
    """Run a single test for a command that has already finished.

    Output tests are checked immediately against ``proc``. Command and port tests are attempted up to
    ``test.retry + 1`` times, after an optional initial delay and with an exponentially growing wait
    (``test.backoff_factor * 2 ** (tries - 1)``) between attempts.

    Raises:
        CommandTestError: If a command or port test did not pass in any attempt.
    """
    # If the test is for an output stream, we can run it right away (the process has already finished).
    if isinstance(test, TestOutputModel):
        self.test_output(proc, test)
        return

    # If an initial delay is configured, wait that long
    if test.delay > 0:
        time.sleep(test.delay)

    tries = 0
    while tries <= test.retry:
        tries += 1

        if isinstance(test, TestCommandModel):
            test_proc = self.run_shell_command(
                test.command,
                show_output=test.show_output,
                environment=test.environment,
                clear_environment=test.clear_environment,
                options=options,
            )

            # Update environment regardless of success of command
            self.update_environment(test.update_environment, options)

            if test.status_code == test_proc.returncode:
                return

            # Shell commands are plain strings: shlex.join() would split them into single characters.
            args = test_proc.args
            failed_command = args if isinstance(args, str) else shlex.join(args)
            log.error("%s: Test command failed.", failed_command)
        else:
            # The context manager closes the socket on both success and failure.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                try:
                    s.connect((test.host, test.port))
                except Exception:
                    log.error("%s:%s: failed to connect.", test.host, test.port)
                else:
                    return

        # Wait before the next attempt, but not after the last one
        wait = test.backoff_factor * (2 ** (tries - 1))
        if wait > 0 and tries <= test.retry:
            time.sleep(wait)

    raise CommandTestError("Test did not pass")
def run_command(self, config: CommandModel, options: dict[str, Any]) -> None:
    """Run a single command of a tutorial part and process its result.

    Standard input is either streamed from a source file as is, or passed as rendered UTF-8 bytes
    (inline contents or a source file read as UTF-8 template). After the command has run, its cleanup
    commands are registered, the status code is verified, the context is updated, the working
    directory is changed if configured, the tests are run and finally the environment is updated.

    The options of the command override the passed ``options``.

    Raises:
        RuntimeError: If the command does not return the expected status code.
    """
    assert config.run is not False  # Already checked by the caller
    # Capture output if any test is for the output.
    capture_output = any(isinstance(test, TestOutputModel) for test in config.run.test)
    options = {**options, **config.run.options}

    # Run the command and check status code
    if config.run.stdin and config.run.stdin.source and not config.run.stdin.template:
        with open(self.tutorial.root / config.run.stdin.source, "rb") as stdin:
            proc = self.run_shell_command(
                config.command,
                show_output=config.run.show_output,
                capture_output=capture_output,
                stdin=stdin,
                environment=config.run.environment,
                clear_environment=config.run.clear_environment,
                options=options,
            )
    else:
        # Configure stdin
        proc_input = None
        if stdin_config := config.run.stdin:
            if stdin_config.contents:
                proc_input = self.render(stdin_config.contents).encode("utf-8")
            elif stdin_config.template:  # source path, but template=True
                assert stdin_config.source is not None
                # Read as UTF-8 to match the encoding used for the rendered input below, instead of
                # depending on the locale of the host.
                with open(self.tutorial.root / stdin_config.source, encoding="utf-8") as stream:
                    stdin_template = stream.read()
                proc_input = self.render(stdin_template).encode("utf-8")
            else:  # pragma: no cover
                raise RuntimeError("Invalid configuration.")

        proc = self.run_shell_command(
            config.command,
            show_output=config.run.show_output,
            capture_output=capture_output,
            input=proc_input,
            environment=config.run.environment,
            clear_environment=config.run.clear_environment,
            options=options,
        )

    # Update list of cleanup commands (registered even if the command turns out to have failed)
    self.cleanup = list(config.run.cleanup) + self.cleanup

    # Handle errors in commands
    if proc.returncode != config.run.status_code:
        raise RuntimeError(
            f"{config.command} failed with return code {proc.returncode} "
            f"(expected: {config.run.status_code})."
        )

    # Update the context from update_context
    self.context.update(config.run.update_context)

    # A truthy chdir is a (templated) target directory, False switches back to the original directory
    if config.chdir:
        rendered_chdir = self.render(str(config.chdir))
        log.info("Changing working directory to %s.", rendered_chdir)
        self.chdir(rendered_chdir, options)
    elif config.chdir is False:
        log.info("Changing working directory to %s.", self.orig_cwd)
        self.chdir(str(self.orig_cwd), options)

    # Run test commands
    for test_command_config in config.run.test:
        self.run_test(test_command_config, proc, options)

    # Update environment (after test commands - they may update the context)
    self.update_environment(config.run.update_environment, options)
@abc.abstractmethod
def chdir(self, path: str, options: dict[str, Any]) -> None:
    """Change the working directory to ``path``.

    Must be implemented by concrete runners. ``options`` are the merged run options that callers in
    this class pass along.
    """
@abc.abstractmethod
def copy_file(self, source: Path, destination: Path, options: dict[str, Any]) -> None:
    """Copy the file at ``source`` to ``destination`` as is.

    Must be implemented by concrete runners. ``options`` are the merged run options that callers in
    this class pass along.
    """
@abc.abstractmethod
def write_file_from_string(self, contents: str, destination: Path, options: dict[str, Any]) -> None:
    """Write ``contents`` to the file at ``destination``.

    Must be implemented by concrete runners. ``options`` are the merged run options that callers in
    this class pass along.
    """