Coverage for frappe_manager / output_manager / rich_output.py: 59%

263 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1""" 

2Rich terminal output handler. 

3 

4This implementation provides Rich terminal formatting with spinner displays, 

5live output, and interactive prompts. All functionality is self-contained 

6within this handler, implementing the OutputHandler interface. 

7""" 

8 

9import re 

10import threading 

11import warnings 

12from collections import deque 

13from collections.abc import Iterable, Sequence 

14from typing import Any 

15 

16import typer 

17from rich.console import Group 

18from rich.live import Live 

19from rich.padding import Padding 

20from rich.spinner import Spinner 

21from rich.table import Table 

22from rich.text import Text 

23 

24from frappe_manager.output_manager.base import OutputHandler 

25from frappe_manager.output_manager.console_singleton import get_stderr_console, get_stdout_console 

26from frappe_manager.output_manager.flags import OutputRefactoringFlags 

27 

# Emoji constants for consistent output.
# Within this file only EMOJI_WORKING and EMOJI_WARNING are referenced; the
# remaining names are presumably consumed by importers — confirm before removing.
EMOJI_WORKING = "⚙️"
EMOJI_SUCCESS = "⚡"
EMOJI_ERROR = "⛔"
EMOJI_WARNING = "⚠️"
EMOJI_INFO = "ℹ️"

# Cache deprecation flag at module load (performance optimization).
# The value is read exactly once, at import time; start()/stop() consult this
# cached module-level name rather than calling use_context_managers() again.
_DEPRECATION_WARNINGS_ENABLED = False
try:
    _DEPRECATION_WARNINGS_ENABLED = OutputRefactoringFlags.use_context_managers()
except ImportError:
    # Best effort: if the flag lookup fails to import something, the
    # deprecation warnings simply stay disabled.
    pass

41 

42 

43class RichOutputHandler(OutputHandler): 

44 """ 

45 Output handler that uses Rich terminal formatting. 

46 

47 This handler provides full Rich functionality including spinners, live displays, 

48 interactive prompts, and formatted output. It respects both TTY detection and 

49 the --non-interactive flag for proper behavior in different environments. 

50 """ 

51 

def __init__(self, verbose: bool = False):
    """
    Set up consoles, spinner state and the live display.

    Args:
        verbose: Show info and debug level messages
    """
    super().__init__(verbose)

    # Shared console objects: data goes to stdout, status/spinner to stderr.
    self.stdout, self.stderr = get_stdout_console(), get_stderr_console()

    # Guards start()/stop(); re-entrant so nested calls on one thread are fine.
    self._lock = threading.RLock()

    # Nothing is running yet.
    self._spinner_active = False
    self._current_text = None
    self.current_head = self.previous_head = None

    # The live display is created once (transient, on stderr) and re-used.
    idle_spinner = Spinner(text=Text(""), name="dots2", speed=1)
    self.spinner = idle_spinner
    self.live = Live(idle_spinner, console=self.stderr, transient=True)

72 

@property
def _is_interactive(self) -> bool:
    """
    Tell whether interactive features (spinners, prompts) should be shown.

    An explicit ``self._interactive`` setting (anything other than ``None``)
    wins; otherwise the answer falls back to ``self._tty_available``.

    Returns:
        True if interactive features (spinners, prompts) should be shown
    """
    explicit = self._interactive
    return self._tty_available if explicit is None else explicit

84 

@property
def is_spinner_active(self) -> bool:
    """
    Check if spinner is currently active.

    Reflects the ``_spinner_active`` flag, which start() sets to True and
    stop() resets to False; it is a bookkeeping flag and is set by start()
    even when no live spinner is drawn (non-interactive mode).
    """
    return self._spinner_active

89 

def start(self, text: str) -> None:
    """
    Start a new operation with a status message.

    Runs entirely under ``self._lock``. In interactive mode the live display
    is started and shows a fresh spinner labelled with ``text``; otherwise a
    single status line is printed to stderr.

    Args:
        text: The initial status message to display

    Raises:
        RuntimeError: When strict mode is enabled (see the review note below
            for the exact condition).
    """
    with self._lock:
        # NOTE(review): the source this was recovered from had its indentation
        # flattened. The strict-mode check is placed inside the deprecation
        # branch here (so the cached flag short-circuits both); confirm
        # against the real file that it is not a sibling of this `if`.
        if _DEPRECATION_WARNINGS_ENABLED:
            warnings.warn(
                "Direct output.start() is deprecated. Use context managers instead:\n"
                " from frappe_manager.output_manager import spinner\n"
                " with spinner(output, 'text'): ...\n"
                "See .plans/output-migration-guide.md for migration guide.",
                DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller of start()
            )

            if OutputRefactoringFlags.strict_mode():
                raise RuntimeError(
                    "Direct output.start() is not allowed in strict mode. "
                    "Use context managers: with spinner(output, 'text'): ...",
                )

        super().start(text)

        # Both head slots point at the same Text object right after a start.
        self.current_head = self.previous_head = Text(text=text, style="bold blue")
        self.spinner = Spinner(text=self.current_head, name="dots2", speed=1)

        # Bookkeeping is updated in both interactive and non-interactive mode.
        self._spinner_active = True
        self._current_text = text

        if self._is_interactive:
            self.live.start(refresh=True)
            # Swap in the new spinner; the Live object itself is re-used.
            self.live.update(self.spinner, refresh=True)
        else:
            self.stderr.print(f"{EMOJI_WORKING} {text}")

127 

def change_head(self, text: str, style: str | None = "blue bold") -> None:
    """
    Update the current operation status message.

    In non-interactive mode the message is printed as a plain status line on
    stderr and no head state is changed. Otherwise the spinner label is
    replaced and the live display is refreshed so the change shows at once.

    Args:
        text: The new status message
        style: Optional Rich style string (e.g., "blue bold"). A falsy value
            (``None`` or ``""``) shows the text without an explicit style.
    """
    if not self._is_interactive:
        self.stderr.print(f"{EMOJI_WORKING} {text}")
        return

    self.previous_head = self.current_head
    self.current_head = text

    # Honour the caller-supplied style; previously "blue bold" was hard-coded
    # here, so any other value passed for `style` was silently ignored.
    label = Text(text, style=style) if style else text
    self.spinner.update(text=label)
    self.live.refresh()

147 

def update_head(self, text: str) -> None:
    """
    Update the head text and print the previous head.

    In non-interactive mode the new text is printed as a plain status line on
    stderr and no head state is changed. Otherwise the head that was current
    until now is printed above the live display (so it stays in the
    scrollback) and the spinner label switches to ``text``.

    Args:
        text: The new head text
    """
    if not self._is_interactive:
        self.stderr.print(f"{EMOJI_WORKING} {text}")
        return

    self.previous_head = self.current_head
    self.current_head = text

    # Before the first start()/change_head() there is no head to echo;
    # printing it unguarded would emit the literal text "None".
    if self.previous_head is not None:
        self.live.console.print(self.previous_head, style="blue")

    self.spinner.update(text=Text(self.current_head, style="blue bold"), style="bold blue")

163 

def stop(self) -> None:
    """
    Stop the current operation status display.

    Runs entirely under ``self._lock``. Resets the spinner bookkeeping and,
    in interactive mode, blanks and stops the live display.

    Raises:
        RuntimeError: When strict mode is enabled (see the review note below
            for the exact condition).
    """
    with self._lock:
        # NOTE(review): indentation was flattened in the recovered source.
        # The strict-mode check is placed inside the deprecation branch and
        # all three live-display calls inside the interactive branch; confirm
        # both against the real file.
        if _DEPRECATION_WARNINGS_ENABLED:
            warnings.warn(
                "Direct output.stop() is deprecated. Use context managers instead:\n"
                " from frappe_manager.output_manager import spinner\n"
                " with spinner(output, 'text'): ...\n"
                "See .plans/output-migration-guide.md for migration guide.",
                DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller of stop()
            )

            if OutputRefactoringFlags.strict_mode():
                raise RuntimeError(
                    "Direct output.stop() is not allowed in strict mode. "
                    "Use context managers: with spinner(output, 'text'): ...",
                )

        super().stop()

        self._spinner_active = False
        self._current_text = None

        if self._is_interactive:
            self.spinner.update()
            # Replace the renderable with empty text before stopping the
            # live display.
            self.live.update(Text("", end=""))
            self.live.stop()

192 

193 def print(self, text: str, emoji_code: str = ":zap:", prefix: str | None = None, **kwargs) -> None: 

194 """ 

195 Print a message with optional emoji and prefix. 

196 

197 Args: 

198 text: The message to print 

199 emoji_code: Emoji code to display (e.g., ":zap:") 

200 prefix: Optional prefix for the message 

201 **kwargs: Additional Rich print arguments 

202 """ 

203 if prefix: 

204 msg = f"{emoji_code} {prefix} {text}" 

205 else: 

206 msg = f"{emoji_code} {text}" 

207 

208 self.stderr.print(msg, **kwargs) 

209 

def debug(self, text: str, emoji_code: str = ":bug:", **kwargs) -> None:
    """
    Emit a debug line through print(); silent unless verbose mode is on.

    Args:
        text: Debug message
        emoji_code: Emoji code to display (e.g., ":bug:")
        **kwargs: Additional Rich print arguments
    """
    if not self.verbose:
        return
    self.print(text, emoji_code=emoji_code, **kwargs)

221 

def info(self, text: str, emoji_code: str = ":information:", **kwargs) -> None:
    """
    Emit an info line through print(); silent unless verbose mode is on.

    Args:
        text: Info message
        emoji_code: Emoji code to display (e.g., ":information:")
        **kwargs: Additional Rich print arguments
    """
    if not self.verbose:
        return
    self.print(text, emoji_code=emoji_code, **kwargs)

233 

def display_error(self, text: str, emoji_code: str = ":no_entry:") -> None:
    """
    Print an error line to stderr; never raises on its own.

    Args:
        text: The error message
        emoji_code: Emoji code to display (e.g., ":no_entry:")
    """
    line = f"{emoji_code} {text}"
    self.stderr.print(line)

243 

def error(self, text: str, exception: Exception, emoji_code: str = ":no_entry:") -> None:
    """
    Print an error line to stderr, then raise the supplied exception.

    Use display_error() to report an error without raising.

    Args:
        text: The error message
        exception: Exception to raise after displaying (required)
        emoji_code: Emoji code to display (e.g., ":no_entry:")

    Raises:
        Exception: The provided exception. A falsy ``exception`` (e.g.
            ``None``) is tolerated: the message is printed and the call
            returns normally.
    """
    line = f"{emoji_code} {text}"
    self.stderr.print(line)

    if not exception:
        return
    raise exception

263 

def warning(self, text: str, emoji_code: str = ":warning:") -> None:
    """
    Print a warning line to stderr.

    Args:
        text: The warning message
        emoji_code: Emoji code to display (e.g., ":warning:")
    """
    line = f"{emoji_code} {text}"
    self.stderr.print(line)

273 

274 def live_lines( 

275 self, 

276 data: Iterable[tuple[str, bytes]], 

277 stdout: bool = True, 

278 stderr: bool = True, 

279 lines: int = 4, 

280 padding: tuple[int, int, int, int] = (0, 0, 0, 2), 

281 stop_string: str | None = None, 

282 log_prefix: str = "=>", 

283 ) -> None: 

284 """ 

285 Display live streaming output from a process. 

286 

287 Args: 

288 data: Iterator yielding (source, line) tuples where source is "stdout" or "stderr" 

289 stdout: Whether to display stdout lines 

290 stderr: Whether to display stderr lines 

291 lines: Maximum number of lines to display 

292 padding: Padding around displayed lines (top, right, bottom, left) 

293 stop_string: String that stops display when found 

294 log_prefix: Prefix for each line 

295 """ 

296 if not self._is_interactive: 

297 while True: 

298 try: 

299 source, line = next(data) 

300 if isinstance(line, bytes): 

301 line = line.decode(errors="replace") 

302 

303 if "[==".lower() in line.lower() or "Updating files:".lower() in line.lower(): 

304 continue 

305 

306 if source == "stdout" and stdout: 

307 self.stdout.print(f"{log_prefix} {line.rstrip()}") 

308 elif source == "stderr" and stderr: 

309 self.stderr.print(f"{log_prefix} {line.rstrip()}") 

310 

311 if stop_string and stop_string.lower() in line.lower(): 

312 break 

313 

314 except KeyboardInterrupt: 

315 break 

316 except StopIteration: 

317 break 

318 return 

319 

320 max_height = lines 

321 displayed_lines = deque(maxlen=max_height) 

322 

323 while True: 

324 try: 

325 source, line = next(data) 

326 if isinstance(line, bytes): 

327 line = line.decode(errors="replace") 

328 

329 if "[==".lower() in line.lower() or "Updating files:".lower() in line.lower(): 

330 continue 

331 

332 if source == "stdout" and stdout: 

333 displayed_lines.append(line) 

334 

335 if source == "stderr" and stderr: 

336 displayed_lines.append(line) 

337 

338 if stop_string and stop_string.lower() in line.lower(): 

339 raise StopIteration 

340 

341 # Create fresh table for each update (prevents IndexError during Rich rendering) 

342 table = Table(show_header=False, box=None) 

343 table.add_column() 

344 

345 for linex in list(displayed_lines): 

346 prefix_text = Text(log_prefix + " ", no_wrap=True) 

347 table_line = Text.from_ansi(linex) 

348 prefix_text.append_text(table_line) 

349 table.add_row(prefix_text) 

350 

351 self.update_live(table, padding=padding) 

352 self.live.refresh() 

353 

354 except KeyboardInterrupt: 

355 self.live.refresh() 

356 

357 except StopIteration: 

358 self.update_live() 

359 break 

360 

def update_live(self, renderable: Any = None, padding: tuple[int, int, int, int] = (0, 0, 0, 0)) -> None:
    """
    Update the live display with new content.

    Does nothing in non-interactive mode. With a truthy ``renderable`` the
    live display shows the spinner followed by the (padded) renderable;
    otherwise it shows the spinner alone.

    Args:
        renderable: Rich renderable object to display; falsy values (``None``,
            and anything that evaluates false) fall back to the bare spinner
        padding: Padding around content (top, right, bottom, left)
    """
    if not self._is_interactive:
        return

    if renderable:
        # Any non-empty tuple is truthy, so the default (0, 0, 0, 0) still
        # wraps the renderable in Padding.
        if padding:
            renderable = Padding(renderable, padding)

        group = Group(self.spinner, renderable)
        self.live.update(group)
    else:
        self.live.update(self.spinner)
    # NOTE(review): indentation was flattened in the recovered source; this
    # refresh is placed at function level (runs for both branches). Confirm it
    # does not belong to the `else` branch only.
    self.live.refresh()

381 

def prompt_ask(
    self,
    prompt: str = "",
    choices: Sequence[str] | None = None,
    default: str | None = None,
    force_yes: bool = False,
    required_flag: str | None = None,
    **kwargs,
) -> str:
    """
    Ask the user for a value, optionally restricted to a list of choices.

    Resolution order:
      1. ``force_yes`` short-circuits to "yes" without prompting.
      2. If ``self.is_interactive()`` is false, ``default`` is returned, or
         NonInteractiveError is raised when a ``required_flag`` is named or
         no default exists.
      3. If ``self._is_interactive`` is true, an InquirerPy select/text prompt
         is shown; the live display is stopped for the prompt and
         ``self.start("Working")`` is called afterwards.
      4. Otherwise a plain ``input()`` prompt is used.

    Args:
        prompt: Question text; simple lowercase markup tags such as "[bold]"
            or "[/bold]" are stripped before display
        choices: Allowed answers, if any
        default: Value used for empty input or when prompting is impossible
        force_yes: Skip prompting and answer "yes"
        required_flag: CLI flag to suggest when prompting is impossible
        **kwargs: Accepted for interface compatibility; not used here

    Returns:
        The chosen or entered value.

    Raises:
        NonInteractiveError: Prompting is impossible and no default applies.
    """
    from frappe_manager.exceptions import NonInteractiveError

    if force_yes:
        return "yes"

    if not self.is_interactive():
        if required_flag:
            raise NonInteractiveError(
                f"Cannot prompt in non-interactive mode: {prompt}",
                suggestions=[f"Use: {required_flag}"],
            )
        if default is None:
            suggestions = []
            if choices:
                suggestions.append(f"Pass one of: {', '.join(choices)}")
            suggestions.append("Run without --non-interactive to enable prompts")
            raise NonInteractiveError(
                f"Cannot prompt in non-interactive mode: {prompt}",
                suggestions=suggestions if suggestions else None,
            )
        return default

    prompt_clean = re.sub(r"\[/?[a-z]+\]", "", prompt)

    if self._is_interactive:
        from InquirerPy import inquirer
        from InquirerPy.utils import InquirerPyStyle

        # The live display must not be drawing while the prompt owns the terminal.
        self.spinner.update()
        self.live.stop()

        custom_style = InquirerPyStyle(
            {
                "questionmark": "#e5c07b",
                "answered_question": "",
                "answer": "#61afef bold",
                "pointer": "#61afef bold",
                "highlighted": "#61afef bold",
                "selected": "#e5c07b",
            },
        )

        if choices:
            value = inquirer.select(
                message=prompt_clean,
                choices=choices,
                default=default,
                vi_mode=True,
                qmark="",
                amark="",
                style=custom_style,
            ).execute()
        else:
            value = inquirer.text(
                message=prompt_clean,
                default=default or "",
                vi_mode=True,
                qmark="",
                amark="",
                style=custom_style,
            ).execute()

        # NOTE(review): this goes through start(), so it is subject to the
        # same deprecation warning / strict-mode check as a direct call.
        self.start("Working")
        return value

    if choices:
        choices_str = "/".join(str(c) for c in choices)
        prompt_full = f"{prompt_clean} [{choices_str}]"
        if default:
            prompt_full += f" (default: {default})"
        prompt_full += ": "

        value = input(prompt_full).strip()
        if not value and default:
            return default

        if value not in choices:
            # Report the value that is actually returned. The message used to
            # interpolate `default`, which reads "None" when no default was
            # given even though the first choice is what gets used.
            fallback = default or choices[0]
            self.stderr.print(f"{EMOJI_WARNING} Invalid choice '{value}', using default: {fallback}")
            return fallback
        return value

    prompt_full = prompt_clean
    if default:
        prompt_full += f" (default: {default})"
    prompt_full += ": "

    value = input(prompt_full).strip()
    return value if value else (default or "")

477 

def prompt_fuzzy(
    self,
    prompt: str,
    choices: list[str],
    default: str | None = None,
    required_flag: str | None = None,
    **kwargs,
) -> str:
    """
    Ask the user to pick one of ``choices`` with a fuzzy-search prompt.

    Args:
        prompt: Question text shown to the user
        choices: Candidate values to search through
        default: Returned without prompting when ``self.is_interactive()`` is
            false; it is not passed to the interactive prompt
        required_flag: CLI flag to suggest when prompting is impossible
        **kwargs: Extra options for ``inquirer.fuzzy``; ``qmark``, ``amark``,
            ``vi_mode`` and ``mandatory`` get defaults when absent

    Returns:
        The selected value, or ``default`` when prompting is disabled.

    Raises:
        NonInteractiveError: Prompting is impossible and no default applies.
    """
    from frappe_manager.exceptions import NonInteractiveError

    cannot_prompt = f"Cannot prompt in non-interactive mode: {prompt}"
    rerun_hint = ["Run without --non-interactive to enable prompts"]

    if not self.is_interactive():
        if required_flag:
            raise NonInteractiveError(cannot_prompt, suggestions=[f"Provide: {required_flag}"])
        if default is None:
            raise NonInteractiveError(cannot_prompt, suggestions=rerun_hint)
        return default

    # Prompting is allowed in principle but the terminal side is unavailable.
    if not self._is_interactive:
        raise NonInteractiveError(cannot_prompt, suggestions=rerun_hint)

    from InquirerPy import inquirer

    # The live display must not be drawing while the prompt owns the terminal.
    self.spinner.update()
    self.live.stop()

    # Pull out the options that get defaults; the rest pass straight through.
    fuzzy_options = {
        "qmark": kwargs.pop("qmark", "🤔"),
        "amark": kwargs.pop("amark", "🤔"),
        "vi_mode": kwargs.pop("vi_mode", True),
        "mandatory": kwargs.pop("mandatory", True),
    }

    answer = inquirer.fuzzy(message=prompt, choices=choices, **fuzzy_options, **kwargs).execute()

    self.start("Working")
    return answer

528 

@property
def should_stream_docker(self) -> bool:
    """
    Whether all three conditions hold: interactive mode is on, the spinner
    is marked active, and verbose mode is off.

    NOTE(review): presumably consulted by callers deciding whether to route
    docker output through the live display — confirm against callers.
    """
    return self._is_interactive and self.is_spinner_active and not self.verbose

532 

def print_data(self, data: Any, **kwargs) -> None:
    """
    Print a data payload (as opposed to a status message).

    In "legacy" stream-separation mode everything goes to stderr: Rich tables
    as-is, anything else via ``str()``. In every other mode the payload goes
    to stdout: Rich tables as-is, dicts and lists as indented JSON (values
    JSON cannot encode are stringified), anything else via ``str()``.

    Args:
        data: The payload to print
        **kwargs: Accepted for interface compatibility; not used here
    """
    import json

    from rich.table import Table as RichTable

    legacy = OutputRefactoringFlags.stream_separation_mode() == "legacy"
    is_table = isinstance(data, RichTable)

    if legacy:
        self.stderr.print(data if is_table else str(data))
        return

    if is_table:
        payload = data
    elif isinstance(data, (dict, list)):
        payload = json.dumps(data, indent=2, default=str)
    else:
        payload = str(data)

    self.stdout.print(payload)

552 

def print_status(self, text: str, emoji_code: str = ":zap:", **kwargs) -> None:
    """
    Print a status line (emoji followed by the text) to stderr.

    Args:
        text: The status message
        emoji_code: Emoji code to display (e.g., ":zap:")
        **kwargs: Additional Rich print arguments, forwarded unchanged
    """
    line = f"{emoji_code} {text}"
    self.stderr.print(line, **kwargs)

555 

def exit(self, text: str, emoji_code: str = ":no_entry:", os_exit=False, error_msg=None):
    """
    Exit with error message.

    Stops the status display, prints the message to stderr and terminates
    the command with status 1. This method never returns normally.

    Args:
        text: The text to be printed
        emoji_code: The emoji code to be displayed before the text (default: ":no_entry:")
        os_exit: If True, the program will exit with status code 1 via
            ``sys.exit`` instead of ``typer.Exit`` (default: False)
        error_msg: The error message to be displayed after the text (default: None)

    Raises:
        SystemExit: When ``os_exit`` is true.
        typer.Exit: Otherwise.
    """
    # Local import, matching the other function-scope imports in this file.
    import sys

    self.stop()

    to_print = f"{emoji_code} {text}"
    if error_msg:
        to_print += f"\n Error : {error_msg}"

    self.stderr.print(to_print)

    if os_exit:
        # sys.exit is always available; the builtin exit() is injected by the
        # `site` module and is missing under `python -S` or in frozen builds.
        sys.exit(1)

    raise typer.Exit(1)