Coverage for structured_tutorials / runners / base.py: 100%

292 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 13:17 +0200

1# Copyright (c) 2025 Mathias Ertl 

2# Licensed under the MIT License. See LICENSE file for details. 

3 

4"""Base classes for runners.""" 

5 

6import abc 

7import io 

8import logging 

9import os 

10import shlex 

11import shutil 

12import socket 

13import subprocess 

14import sys 

15import tempfile 

16import time 

17from copy import deepcopy 

18from pathlib import Path 

19from subprocess import CompletedProcess 

20from typing import Any 

21 

22from jinja2 import Environment, TemplateSyntaxError 

23 

24from structured_tutorials.errors import ( 

25 CommandOutputTestError, 

26 CommandTestError, 

27 InvalidAlternativesSelectedError, 

28 PromptNotConfirmedError, 

29 RequiredExecutableNotFoundError, 

30) 

31from structured_tutorials.models import ( 

32 AlternativeModel, 

33 CommandsPartModel, 

34 FilePartModel, 

35 PromptModel, 

36 TutorialModel, 

37) 

38from structured_tutorials.models.base import CommandType 

39from structured_tutorials.models.parts import CleanupCommandModel, CommandModel 

40from structured_tutorials.models.tests import TestCommandModel, TestOutputModel, TestPortModel 

41from structured_tutorials.utils import chdir, check_count, cleanup, git_export 

42 

# General-purpose logger for messages emitted by this module.
log = logging.getLogger(__name__)
# Receives each command line right before the command is executed.
command_logger = logging.getLogger("command")
# Receives the name (or a generic "Running part ..." line) of each part as it starts.
part_log = logging.getLogger("part")

46 

47 

48class RunnerBase(abc.ABC): 

49 """Base class for runners to provide shared functionality.""" 

50 

def __init__(
    self,
    tutorial: TutorialModel,
    path: Path | None = None,
    alternatives: tuple[str, ...] = (),
    show_command_output: bool = True,
    interactive: bool = True,
    context: dict[str, Any] | None = None,
):
    """Prepare template context, process environment and alternatives for a run.

    Args:
        tutorial: The tutorial to run.
        path: Stored unchanged as ``self.path``.
        alternatives: Names of the selected alternatives.
        show_command_output: Runner-wide switch; if false, no command output is shown.
        interactive: Whether prompt parts are presented to the user.
        context: Extra context values that override those from the tutorial configuration.

    Raises:
        RequiredExecutableNotFoundError: If an executable required by the run configuration
            or by a selected alternative cannot be found.
    """
    run_config = tutorial.configuration.run

    self.tutorial = tutorial
    self.path = path
    self.orig_cwd = Path.cwd()

    # Jinja2 environment used for every template rendered by this runner.
    self.env = Environment(keep_trailing_newline=True)

    # Context precedence (lowest to highest): tutorial-wide, run-specific, caller-supplied.
    self.context = deepcopy(tutorial.configuration.context)
    self.context.update(deepcopy(run_config.context))
    if context:
        self.context.update(context)

    # Render string values one by one, so that later values can refer to already rendered ones.
    # Keys ending in "_template" are kept verbatim.
    string_items = [item for item in self.context.items() if isinstance(item[1], str)]
    for name, raw in string_items:
        if not name.endswith("_template"):
            self.context[name] = self.render(raw, _key=name)

    # Base environment for commands: either empty or a copy of the current process environment,
    # in both cases extended by the configured variables.
    if run_config.clear_environment:
        environment: dict[str, str | None] = {}
    else:
        environment = os.environ.copy()  # type: ignore[assignment]
    environment.update(run_config.environment)

    # Required executables are searched on the PATH that the commands will see.
    for executable in run_config.required_executables:
        if not shutil.which(executable, path=environment.get("PATH")):
            raise RequiredExecutableNotFoundError(f"{executable}: Executable not found.")

    # Selected alternatives may add environment variables, context and required executables.
    self.alternatives = alternatives
    for alternative in alternatives:
        alternative_config = run_config.alternatives.get(alternative)
        if not alternative_config:
            continue

        environment.update(alternative_config.environment)
        self.context.update(alternative_config.context)

        for executable in alternative_config.required_executables:
            if not shutil.which(executable, path=environment.get("PATH")):
                raise RequiredExecutableNotFoundError(f"{executable}: Executable not found.")

    self.cleanup: list[CleanupCommandModel] = []
    self.show_command_output = show_command_output
    self.interactive = interactive

    # Variables set to None are dropped; values that are not valid templates are kept as they are.
    rendered_environment = {}
    for name, raw in environment.items():
        if raw is not None:
            rendered_environment[name] = self.render(raw, ignore_errors=True)
    self.environment = rendered_environment

109 

def render(self, value: str, ignore_errors=False, **context: Any) -> str:
    """Render *value* as a template using the runner context.

    Args:
        value: The template source.
        ignore_errors: If true, return *value* unchanged when it is not a valid template
            instead of raising.
        **context: Additional variables; they take precedence over the runner context.

    Returns:
        The rendered string.
    """
    variables = {**self.context, **context}
    try:
        template = self.env.from_string(value)
        return template.render(variables)
    except TemplateSyntaxError:  # pragma: no cover
        if not ignore_errors:
            raise
        return value

117 

def render_command(self, command: CommandType, **context: Any) -> CommandType:
    """Render a command as a template.

    A string command is rendered as a whole; any other command is treated as a sequence of
    tokens, each of which is rendered on its own, and returned as a tuple.
    """
    if not isinstance(command, str):
        return tuple(self.render(token, **context) for token in command)

    return self.render(command, **context)

123 

def test_output(self, proc: subprocess.CompletedProcess[bytes], test: TestOutputModel) -> None:
    """Check a captured output stream of a finished process against *test*.

    The stream selected by ``test.stream`` (stderr, otherwise stdout) is optionally stripped and
    then checked against the configured regular expression, line count and character count.
    Named groups of a matching regular expression are decoded as UTF-8 and stored in the
    runner context.

    Args:
        proc: The finished process; its output must have been captured.
        test: The output test to apply.

    Raises:
        CommandOutputTestError: If the regular expression does not match or a count check fails.
    """
    value = proc.stderr if test.stream == "stderr" else proc.stdout
    if test.strip:
        value = value.strip()

    if test.regex:
        match = test.regex.search(value)
        if match is None:
            # Decode leniently: output that is not valid UTF-8 must not hide the actual failure
            # behind a UnicodeDecodeError.
            decoded = value.decode("utf-8", errors="replace")
            raise CommandOutputTestError(f"Process did not have the expected output: '{decoded}'")

        # Optional named groups that did not take part in the match are None and cannot be
        # decoded, so they are left out of the context update.
        self.context.update(
            {k: v.decode("utf-8") for k, v in match.groupdict().items() if v is not None}
        )

    # Test for line/character count of output (both operate on the bytes value)
    if test.line_count:
        try:
            check_count(value.splitlines(), test.line_count)
        except ValueError as ex:
            raise CommandOutputTestError(f"Line count error: {ex}") from ex
    if test.character_count:
        try:
            check_count(value, test.character_count)
        except ValueError as ex:
            raise CommandOutputTestError(f"Character count error: {ex}") from ex

150 

def validate_alternatives(self) -> None:
    """Check the selected alternatives against every alternative part of the tutorial.

    Raises:
        InvalidAlternativesSelectedError: If a required alternative part has no selected
            alternative, or if more than one alternative of a part was selected.
    """
    requested = set(self.alternatives)

    for part_no, part in enumerate(self.tutorial.parts, start=1):
        if not isinstance(part, AlternativeModel):
            continue

        selected = requested & set(part.alternatives)

        if part.required and not selected:
            raise InvalidAlternativesSelectedError(f"Part {part_no}: No alternative selected.")
        if len(selected) > 1:
            raise InvalidAlternativesSelectedError(
                f"Part {part_no}: More then one alternative selected: {selected}"
            )

165 

def run_shell_command(
    self,
    command: CommandType,
    show_output: bool,
    capture_output: bool = False,
    stdin: int | io.BufferedReader | None = None,
    input: bytes | None = None,
    environment: dict[str, Any] | None = None,
    clear_environment: bool = False,
    options: dict[str, Any] | None = None,
) -> CompletedProcess[bytes]:
    """Render and run a single command on the local machine.

    Args:
        command: The command; a string is run through the shell, a tuple is run directly.
        show_output: Whether to show the output (only honored if the runner shows output at all).
        capture_output: Whether to capture stdout/stderr in the returned process.
        stdin: Passed to ``subprocess.run()`` as ``stdin``.
        input: Passed to ``subprocess.run()`` as ``input``.
        environment: Extra environment variables; values are rendered as templates.
        clear_environment: If true, only *environment* is passed to the process.
        options: Not used by this implementation.

    Returns:
        The completed process.
    """
    # The runner-wide setting can only suppress output, never force it.
    show_output = show_output and self.show_command_output

    # Both streams always share the same target.
    stream_target: int | None
    if capture_output:
        stream_target = subprocess.PIPE
    elif show_output:
        stream_target = None
    else:
        stream_target = subprocess.DEVNULL

    # Explicitly passed variables are rendered as templates; a missing or empty mapping means
    # there are no overrides.
    overrides = {k: self.render(v) for k, v in environment.items()} if environment else {}
    if clear_environment:
        env = overrides
    elif overrides:
        env = {**self.environment, **overrides}
    else:
        env = self.environment

    # Render the command (args) as template
    command = self.render_command(command)

    # Tuples are run without a shell and logged in shell-quoted form.
    if isinstance(command, tuple):
        use_shell = False
        logged_command = shlex.join(command)
    else:
        use_shell = True
        logged_command = command

    command_logger.info(logged_command)
    proc = subprocess.run(
        command,
        shell=use_shell,
        stdin=stdin,
        input=input,
        stdout=stream_target,
        stderr=stream_target,
        env=env,
    )

    # Output that is captured and shown at the same time can only be shown after the process
    # has finished, one stream after the other.
    if capture_output and show_output:
        for header, data in (("--- stdout ---", proc.stdout), ("--- stderr ---", proc.stderr)):
            print(header)
            sys.stdout.buffer.write(data + b"\n")
            sys.stdout.buffer.flush()

    return proc

229 

def run_commands(self, part: CommandsPartModel, options: dict[str, Any] | None = None) -> None:
    """Run all commands of *part*, except those with ``run`` set to ``False``.

    Args:
        part: The commands part to run.
        options: Options inherited from the caller; options of the part take precedence.
    """
    assert part.run is not False  # Already checked by the caller
    merged_options = {**(options or {}), **part.run.options}

    for command_config in part.commands:
        if command_config.run is not False:
            self.run_command(command_config, merged_options)

241 

def run_prompt(self, part: PromptModel) -> None:
    """Show a prompt and wait for the user.

    With ``response == "enter"`` any input is accepted. Otherwise the user is asked until a
    valid yes/no answer (or an empty answer) is given.

    Raises:
        PromptNotConfirmedError: If the user declines, or gives an empty answer while
            ``part.default`` is false.
    """
    prompt = f"{self.render(part.prompt).strip()} "

    if part.response == "enter":
        input(prompt)
        return

    # response == confirm
    valid_inputs = ("n", "no", "yes", "y", "")
    while True:
        response = input(prompt).strip().lower()
        if response in valid_inputs:
            break
        print(f"Please enter a valid value ({'/'.join(valid_inputs)}).")

    if response in ("n", "no") or (not response and not part.default):
        raise PromptNotConfirmedError(self.render(part.error, response=response))

255 

def run_alternative(self, part: AlternativeModel) -> None:
    """Run the selected alternative of *part* (if any) and apply its ``chdir`` setting.

    Raises:
        RuntimeError: If the selected alternative is neither a commands part nor a file part.
    """
    assert part.run is not False  # Already checked by the caller
    options = part.run.options
    selected = set(self.alternatives) & set(part.alternatives)

    # Note: The CLI agent already verifies this - just assert this to be sure.
    assert len(selected) <= 1, "More then one part selected."

    if selected:
        selected_part = part.alternatives[next(iter(selected))]
        if isinstance(selected_part, CommandsPartModel):
            self.run_commands(selected_part, options)
        elif isinstance(selected_part, FilePartModel):
            self.write_file(selected_part, options)
        else:  # pragma: no cover
            raise RuntimeError(f"{selected_part} is not supported as alternative.")

    # A truthy value names the new working directory (as template); False switches back to the
    # original working directory; anything else leaves the working directory alone.
    target = part.chdir
    if target:
        rendered_chdir = self.render(str(target))
        log.info("Changing working directory to %s.", rendered_chdir)
        self.chdir(rendered_chdir, options)
    elif target is False:
        log.info("Changing working directory to %s.", self.orig_cwd)
        self.chdir(str(self.orig_cwd), options)

280 

def run_parts(self) -> None:
    """Run all parts of the tutorial in order.

    Raises:
        RuntimeError: If a part is of an unsupported type.
    """
    for part in self.tutorial.parts:
        # Prompts are only shown in interactive mode and need no further processing.
        if isinstance(part, PromptModel):
            if self.interactive:
                self.run_prompt(part)
            continue

        if part.run is False:
            continue

        heading = part.name if part.name else f"Running part {part.id}..."
        part_log.info(heading)

        if isinstance(part, CommandsPartModel):
            self.run_commands(part)
        elif isinstance(part, FilePartModel):
            self.write_file(part)
        elif isinstance(part, AlternativeModel):
            self.run_alternative(part)
        else:  # pragma: no cover
            raise RuntimeError(f"{part} is not a tutorial part")

        # Context updates of a part become visible to all following parts.
        self.context.update(part.run.update_context)

305 

def run(self) -> None:
    """Run the tutorial in the working directory requested by the run configuration.

    Depending on the configuration, parts run in a fresh temporary directory, in a git export
    created inside a temporary directory, or in the current working directory. In all cases the
    parts run inside the ``cleanup()`` context manager.
    """
    run_config = self.tutorial.configuration.run

    if run_config.temporary_directory:
        with tempfile.TemporaryDirectory() as tmpdir_name:
            log.info("Switching to temporary directory: %s", tmpdir_name)
            temp_path = Path(tmpdir_name)
            self.context["cwd"] = temp_path
            self.context["temp_dir"] = temp_path
            self.context["orig_cwd"] = Path.cwd()

            with chdir(tmpdir_name), cleanup(self):
                # From now on, "original" refers to the directory the parts start in.
                self.orig_cwd = Path.cwd()
                self.run_parts()
        return

    if run_config.git_export:
        with tempfile.TemporaryDirectory() as tmpdir_name:
            work_dir = git_export(tmpdir_name)
            log.info("Creating git export at: %s", work_dir)
            self.context["cwd"] = work_dir
            self.context["export_dir"] = work_dir
            self.context["orig_cwd"] = Path.cwd()

            with chdir(work_dir), cleanup(self):
                self.orig_cwd = Path.cwd()
                self.run_parts()
        return

    with cleanup(self):
        self.run_parts()

329 

def prepare_tutorial(self) -> None:
    """Hook invoked before running the tutorial; does nothing in the base class."""
    return None

333 

def cleanup_tutorial(self) -> None:
    """Hook invoked after running the tutorial; does nothing in the base class."""
    return None

337 

def update_environment_variable(self, key: str, value: str, options: dict[str, Any]) -> None:
    """Set the environment variable *key* to *value* for subsequent commands.

    *options* is not used by this implementation.
    """
    self.environment.update({key: value})

341 

def update_environment(self, env: dict[str, str], options: dict[str, Any]) -> None:
    """Render the values of *env* as templates and set each as environment variable.

    Entries with a value of ``None`` are skipped. All values are rendered before the first
    variable is set.
    """
    rendered = {}
    for name, raw in env.items():
        if raw is None:
            continue
        rendered[name] = self.render(raw)

    for name in rendered:
        self.update_environment_variable(name, rendered[name], options)

346 

def write_file(self, part: FilePartModel, options: dict[str, Any] | None = None) -> None:
    """Create the file described by *part*.

    The destination is rendered as a template. If it ends with a path separator, it is treated
    as a directory (created if missing) and the file is named after the source. A source that is
    not a template is copied via ``copy_file()``; everything else is written via
    ``write_file_from_string()``, rendered first if ``part.template`` is set.

    Args:
        part: The file part to write.
        options: Options inherited from the caller; options of the part take precedence.

    Raises:
        RuntimeError: If the destination is a directory but no source is set, or if a
            non-directory destination already exists.
    """
    assert part.run is not False  # Already checked by the caller
    options = {**(options or {}), **part.run.options}

    raw_destination = self.render(part.destination)
    destination = Path(raw_destination)
    is_directory = raw_destination.endswith(os.path.sep)

    if is_directory:
        # Model validation already validates that the destination does not look like a directory, if no
        # source is set, but this could be tricked if the destination is a template.
        if not part.source:
            raise RuntimeError(
                f"{raw_destination}: Destination is directory, but no source given to derive filename."
            )

        destination.mkdir(parents=True, exist_ok=True)
        destination = destination / part.source.name
    elif destination.exists():
        raise RuntimeError(f"{destination}: Destination already exists.")

    # A plain (non-template) source file is copied as is, without ever reading it.
    if part.source and not part.template:
        self.copy_file(self.tutorial.root / part.source, destination, options)
        return

    if part.source:
        with open(self.tutorial.root / part.source) as source_stream:
            template = source_stream.read()
    else:
        assert isinstance(part.contents, str)  # assured by model validation
        template = part.contents

    contents = self.render(template) if part.template else template
    self.write_file_from_string(contents, destination, options)

388 

def run_test(
    self,
    test: TestCommandModel | TestPortModel | TestOutputModel,
    proc: subprocess.CompletedProcess[bytes],
    options: dict[str, Any],
) -> None:
    """Run a single test for a command that has already finished.

    Output tests are checked immediately. Command and port tests are attempted up to
    ``test.retry + 1`` times, after an optional initial delay and with an exponentially growing
    pause (``backoff_factor * 2 ** (attempt - 1)``) between attempts.

    Args:
        test: The test to run.
        proc: The finished process of the command under test (used for output tests).
        options: Options passed on to commands and environment updates.

    Raises:
        CommandOutputTestError: If an output test fails (raised by ``test_output()``).
        CommandTestError: If a command or port test does not pass within the allowed attempts.
    """
    # If the test is for an output stream, we can run it right away (the process has already finished).
    if isinstance(test, TestOutputModel):
        self.test_output(proc, test)
        return

    # If an initial delay is configured, wait that long
    if test.delay > 0:
        time.sleep(test.delay)

    tries = 0
    while tries <= test.retry:
        tries += 1

        if isinstance(test, TestCommandModel):
            test_proc = self.run_shell_command(
                test.command,
                show_output=test.show_output,
                environment=test.environment,
                clear_environment=test.clear_environment,
                options=options,
            )

            # Update environment regardless of success of command
            self.update_environment(test.update_environment, options)

            if test.status_code == test_proc.returncode:
                return

            # Shell commands are plain strings; shlex.join() would treat such a string as a
            # sequence of single-character arguments and garble the message.
            args = test_proc.args
            logged_command = args if isinstance(args, str) else shlex.join(args)
            log.error("%s: Test command failed.", logged_command)
        else:
            # The context manager closes the socket after every attempt, whether the
            # connection succeeded or not.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                try:
                    s.connect((test.host, test.port))
                except Exception:
                    log.error("%s:%s: failed to connect.", test.host, test.port)
                else:
                    return

        # Only wait if there is another attempt.
        wait = test.backoff_factor * (2 ** (tries - 1))
        if wait > 0 and tries <= test.retry:
            time.sleep(wait)

    raise CommandTestError("Test did not pass")

438 

def run_command(self, config: CommandModel, options: dict[str, Any]) -> None:
    """Run a single configured command, including its follow-up actions.

    After the command has run, its cleanup commands are registered, the status code is checked,
    the context is updated, the working directory is changed if configured, the tests are run
    and finally the environment is updated.

    Args:
        config: The command to run.
        options: Options inherited from the caller; options of the command take precedence.

    Raises:
        RuntimeError: If the command exits with an unexpected status code.
    """
    assert config.run is not False  # Already checked by the caller
    run_config = config.run

    # Output only needs to be captured if a test inspects it.
    capture_output = any(isinstance(test, TestOutputModel) for test in run_config.test)
    options = {**options, **run_config.options}

    # Arguments shared by both ways of feeding stdin.
    shell_kwargs = {
        "show_output": run_config.show_output,
        "capture_output": capture_output,
        "environment": run_config.environment,
        "clear_environment": run_config.clear_environment,
        "options": options,
    }

    stdin_config = run_config.stdin
    if stdin_config and stdin_config.source and not stdin_config.template:
        # A plain source file is passed to the process directly as its stdin.
        with open(self.tutorial.root / stdin_config.source, "rb") as stdin:
            proc = self.run_shell_command(config.command, stdin=stdin, **shell_kwargs)
    else:
        # Otherwise stdin (if any) is rendered in memory and passed as input.
        proc_input = None
        if stdin_config:
            if stdin_config.contents:
                proc_input = self.render(stdin_config.contents).encode("utf-8")
            elif stdin_config.template:  # source path, but template=True
                assert stdin_config.source is not None
                with open(self.tutorial.root / stdin_config.source) as stream:
                    stdin_template = stream.read()
                proc_input = self.render(stdin_template).encode("utf-8")
            else:  # pragma: no cover
                raise RuntimeError("Invalid configuration.")

        proc = self.run_shell_command(config.command, input=proc_input, **shell_kwargs)

    # Cleanup commands of later commands run first; they are registered even if the command failed.
    self.cleanup = [*run_config.cleanup, *self.cleanup]

    # Handle errors in commands
    if proc.returncode != run_config.status_code:
        raise RuntimeError(
            f"{config.command} failed with return code {proc.returncode} "
            f"(expected: {run_config.status_code})."
        )

    # Update the context from update_context
    self.context.update(run_config.update_context)

    # A truthy value names the new working directory (as template); False switches back to the
    # original working directory.
    target = config.chdir
    if target:
        rendered_chdir = self.render(str(target))
        log.info("Changing working directory to %s.", rendered_chdir)
        self.chdir(rendered_chdir, options)
    elif target is False:
        log.info("Changing working directory to %s.", self.orig_cwd)
        self.chdir(str(self.orig_cwd), options)

    # Run test commands
    for test_command_config in run_config.test:
        self.run_test(test_command_config, proc, options)

    # Update environment (after test commands - they may update the context)
    self.update_environment(run_config.update_environment, options)

509 

# Change the working directory to `path`. Callers in this class pass either a rendered `chdir`
# setting or the original working directory as string, plus the options in effect.
@abc.abstractmethod
def chdir(self, path: str, options: dict[str, Any]) -> None: ...

512 

# Copy `source` to `destination` without modification. Called by write_file() for file parts
# that have a source and are not templates.
@abc.abstractmethod
def copy_file(self, source: Path, destination: Path, options: dict[str, Any]) -> None: ...

515 

# Write `contents` to `destination`. Called by write_file() with the final file contents
# (already rendered if the file part is a template).
@abc.abstractmethod
def write_file_from_string(self, contents: str, destination: Path, options: dict[str, Any]) -> None: ...