Coverage for frappe_manager / output_manager / rich_output.py: 59%
263 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1"""
2Rich terminal output handler.
4This implementation provides Rich terminal formatting with spinner displays,
5live output, and interactive prompts. All functionality is self-contained
6within this handler, implementing the OutputHandler interface.
7"""
9import re
10import threading
11import warnings
12from collections import deque
13from collections.abc import Iterable, Sequence
14from typing import Any
16import typer
17from rich.console import Group
18from rich.live import Live
19from rich.padding import Padding
20from rich.spinner import Spinner
21from rich.table import Table
22from rich.text import Text
24from frappe_manager.output_manager.base import OutputHandler
25from frappe_manager.output_manager.console_singleton import get_stderr_console, get_stdout_console
26from frappe_manager.output_manager.flags import OutputRefactoringFlags
28# Emoji constants for consistent output
29EMOJI_WORKING = "⚙️"
30EMOJI_SUCCESS = "⚡"
31EMOJI_ERROR = "⛔"
32EMOJI_WARNING = "⚠️"
33EMOJI_INFO = "ℹ️"
35# Cache deprecation flag at module load (performance optimization)
36_DEPRECATION_WARNINGS_ENABLED = False
37try:
38 _DEPRECATION_WARNINGS_ENABLED = OutputRefactoringFlags.use_context_managers()
39except ImportError:
40 pass
43class RichOutputHandler(OutputHandler):
44 """
45 Output handler that uses Rich terminal formatting.
47 This handler provides full Rich functionality including spinners, live displays,
48 interactive prompts, and formatted output. It respects both TTY detection and
49 the --non-interactive flag for proper behavior in different environments.
50 """
52 def __init__(self, verbose: bool = False):
53 """
54 Initialize the Rich output handler.
56 Args:
57 verbose: Show info and debug level messages
58 """
59 super().__init__(verbose)
61 self.stdout = get_stdout_console()
62 self.stderr = get_stderr_console()
63 self.previous_head = None
64 self.current_head = None
66 self.spinner = Spinner(text=Text(""), name="dots2", speed=1)
67 self.live = Live(self.spinner, console=self.stderr, transient=True)
69 self._spinner_active = False
70 self._current_text = None
71 self._lock = threading.RLock()
73 @property
74 def _is_interactive(self) -> bool:
75 """
76 Check if interactive mode is enabled (considers both TTY and --non-interactive flag).
78 Returns:
79 True if interactive features (spinners, prompts) should be shown
80 """
81 if self._interactive is not None:
82 return self._interactive
83 return self._tty_available
85 @property
86 def is_spinner_active(self) -> bool:
87 """Check if spinner is currently active."""
88 return self._spinner_active
90 def start(self, text: str) -> None:
91 """
92 Start a new operation with a status message.
94 Args:
95 text: The initial status message to display
96 """
97 with self._lock:
98 if _DEPRECATION_WARNINGS_ENABLED:
99 warnings.warn(
100 "Direct output.start() is deprecated. Use context managers instead:\n"
101 " from frappe_manager.output_manager import spinner\n"
102 " with spinner(output, 'text'): ...\n"
103 "See .plans/output-migration-guide.md for migration guide.",
104 DeprecationWarning,
105 stacklevel=2,
106 )
108 if OutputRefactoringFlags.strict_mode():
109 raise RuntimeError(
110 "Direct output.start() is not allowed in strict mode. "
111 "Use context managers: with spinner(output, 'text'): ...",
112 )
114 super().start(text)
116 self.current_head = self.previous_head = Text(text=text, style="bold blue")
117 self.spinner = Spinner(text=self.current_head, name="dots2", speed=1)
119 self._spinner_active = True
120 self._current_text = text
122 if self._is_interactive:
123 self.live.start(refresh=True)
124 self.live.update(self.spinner, refresh=True)
125 else:
126 self.stderr.print(f"{EMOJI_WORKING} {text}")
128 def change_head(self, text: str, style: str | None = "blue bold") -> None:
129 """
130 Update the current operation status message.
132 Args:
133 text: The new status message
134 style: Optional Rich style string (e.g., "blue bold")
135 """
136 if not self._is_interactive:
137 self.stderr.print(f"{EMOJI_WORKING} {text}")
138 return
140 self.previous_head = self.current_head
141 self.current_head = text
142 if style:
143 self.spinner.update(text=Text(self.current_head, style="blue bold"))
144 else:
145 self.spinner.update(text=self.current_head)
146 self.live.refresh()
148 def update_head(self, text: str) -> None:
149 """
150 Update the head text and print the previous head.
152 Args:
153 text: The new head text
154 """
155 if not self._is_interactive:
156 self.stderr.print(f"{EMOJI_WORKING} {text}")
157 return
159 self.previous_head = self.current_head
160 self.current_head = text
161 self.live.console.print(self.previous_head, style="blue")
162 self.spinner.update(text=Text(self.current_head, style="blue bold"), style="bold blue")
164 def stop(self) -> None:
165 """Stop the current operation status display."""
166 with self._lock:
167 if _DEPRECATION_WARNINGS_ENABLED:
168 warnings.warn(
169 "Direct output.stop() is deprecated. Use context managers instead:\n"
170 " from frappe_manager.output_manager import spinner\n"
171 " with spinner(output, 'text'): ...\n"
172 "See .plans/output-migration-guide.md for migration guide.",
173 DeprecationWarning,
174 stacklevel=2,
175 )
177 if OutputRefactoringFlags.strict_mode():
178 raise RuntimeError(
179 "Direct output.stop() is not allowed in strict mode. "
180 "Use context managers: with spinner(output, 'text'): ...",
181 )
183 super().stop()
185 self._spinner_active = False
186 self._current_text = None
188 if self._is_interactive:
189 self.spinner.update()
190 self.live.update(Text("", end=""))
191 self.live.stop()
193 def print(self, text: str, emoji_code: str = ":zap:", prefix: str | None = None, **kwargs) -> None:
194 """
195 Print a message with optional emoji and prefix.
197 Args:
198 text: The message to print
199 emoji_code: Emoji code to display (e.g., ":zap:")
200 prefix: Optional prefix for the message
201 **kwargs: Additional Rich print arguments
202 """
203 if prefix:
204 msg = f"{emoji_code} {prefix} {text}"
205 else:
206 msg = f"{emoji_code} {text}"
208 self.stderr.print(msg, **kwargs)
210 def debug(self, text: str, emoji_code: str = ":bug:", **kwargs) -> None:
211 """
212 Display debug message if verbose mode is enabled.
214 Args:
215 text: Debug message
216 emoji_code: Emoji code to display (e.g., ":bug:")
217 **kwargs: Additional Rich print arguments
218 """
219 if self.verbose:
220 self.print(text, emoji_code=emoji_code, **kwargs)
222 def info(self, text: str, emoji_code: str = ":information:", **kwargs) -> None:
223 """
224 Display info message if verbose mode is enabled.
226 Args:
227 text: Info message
228 emoji_code: Emoji code to display (e.g., ":information:")
229 **kwargs: Additional Rich print arguments
230 """
231 if self.verbose:
232 self.print(text, emoji_code=emoji_code, **kwargs)
234 def display_error(self, text: str, emoji_code: str = ":no_entry:") -> None:
235 """
236 Display error message without raising exception.
238 Args:
239 text: The error message
240 emoji_code: Emoji code to display (e.g., ":no_entry:")
241 """
242 self.stderr.print(f"{emoji_code} {text}")
244 def error(self, text: str, exception: Exception, emoji_code: str = ":no_entry:") -> None:
245 """
246 Display an error message and raise the exception.
248 This method always raises the provided exception after displaying the error message.
249 Use display_error() if you want to display an error without raising an exception.
251 Args:
252 text: The error message
253 exception: Exception to raise after displaying (required)
254 emoji_code: Emoji code to display (e.g., ":no_entry:")
256 Raises:
257 Exception: Always raises the provided exception
258 """
259 self.stderr.print(f"{emoji_code} {text}")
261 if exception:
262 raise exception
264 def warning(self, text: str, emoji_code: str = ":warning:") -> None:
265 """
266 Display a warning message.
268 Args:
269 text: The warning message
270 emoji_code: Emoji code to display (e.g., ":warning:")
271 """
272 self.stderr.print(f"{emoji_code} {text}")
274 def live_lines(
275 self,
276 data: Iterable[tuple[str, bytes]],
277 stdout: bool = True,
278 stderr: bool = True,
279 lines: int = 4,
280 padding: tuple[int, int, int, int] = (0, 0, 0, 2),
281 stop_string: str | None = None,
282 log_prefix: str = "=>",
283 ) -> None:
284 """
285 Display live streaming output from a process.
287 Args:
288 data: Iterator yielding (source, line) tuples where source is "stdout" or "stderr"
289 stdout: Whether to display stdout lines
290 stderr: Whether to display stderr lines
291 lines: Maximum number of lines to display
292 padding: Padding around displayed lines (top, right, bottom, left)
293 stop_string: String that stops display when found
294 log_prefix: Prefix for each line
295 """
296 if not self._is_interactive:
297 while True:
298 try:
299 source, line = next(data)
300 if isinstance(line, bytes):
301 line = line.decode(errors="replace")
303 if "[==".lower() in line.lower() or "Updating files:".lower() in line.lower():
304 continue
306 if source == "stdout" and stdout:
307 self.stdout.print(f"{log_prefix} {line.rstrip()}")
308 elif source == "stderr" and stderr:
309 self.stderr.print(f"{log_prefix} {line.rstrip()}")
311 if stop_string and stop_string.lower() in line.lower():
312 break
314 except KeyboardInterrupt:
315 break
316 except StopIteration:
317 break
318 return
320 max_height = lines
321 displayed_lines = deque(maxlen=max_height)
323 while True:
324 try:
325 source, line = next(data)
326 if isinstance(line, bytes):
327 line = line.decode(errors="replace")
329 if "[==".lower() in line.lower() or "Updating files:".lower() in line.lower():
330 continue
332 if source == "stdout" and stdout:
333 displayed_lines.append(line)
335 if source == "stderr" and stderr:
336 displayed_lines.append(line)
338 if stop_string and stop_string.lower() in line.lower():
339 raise StopIteration
341 # Create fresh table for each update (prevents IndexError during Rich rendering)
342 table = Table(show_header=False, box=None)
343 table.add_column()
345 for linex in list(displayed_lines):
346 prefix_text = Text(log_prefix + " ", no_wrap=True)
347 table_line = Text.from_ansi(linex)
348 prefix_text.append_text(table_line)
349 table.add_row(prefix_text)
351 self.update_live(table, padding=padding)
352 self.live.refresh()
354 except KeyboardInterrupt:
355 self.live.refresh()
357 except StopIteration:
358 self.update_live()
359 break
361 def update_live(self, renderable: Any = None, padding: tuple[int, int, int, int] = (0, 0, 0, 0)) -> None:
362 """
363 Update the live display with new content.
365 Args:
366 renderable: Rich renderable object to display
367 padding: Padding around content (top, right, bottom, left)
368 """
369 if not self._is_interactive:
370 return
372 if renderable:
373 if padding:
374 renderable = Padding(renderable, padding)
376 group = Group(self.spinner, renderable)
377 self.live.update(group)
378 else:
379 self.live.update(self.spinner)
380 self.live.refresh()
382 def prompt_ask(
383 self,
384 prompt: str = "",
385 choices: Sequence[str] | None = None,
386 default: str | None = None,
387 force_yes: bool = False,
388 required_flag: str | None = None,
389 **kwargs,
390 ) -> str:
391 from frappe_manager.exceptions import NonInteractiveError
393 if force_yes:
394 return "yes"
396 if not self.is_interactive():
397 if required_flag:
398 raise NonInteractiveError(
399 f"Cannot prompt in non-interactive mode: {prompt}",
400 suggestions=[f"Use: {required_flag}"],
401 )
402 if default is None:
403 suggestions = []
404 if choices:
405 suggestions.append(f"Pass one of: {', '.join(choices)}")
406 suggestions.append("Run without --non-interactive to enable prompts")
407 raise NonInteractiveError(
408 f"Cannot prompt in non-interactive mode: {prompt}",
409 suggestions=suggestions if suggestions else None,
410 )
411 return default
413 prompt_clean = re.sub(r"\[/?[a-z]+\]", "", prompt)
415 if self._is_interactive:
416 from InquirerPy import inquirer
417 from InquirerPy.utils import InquirerPyStyle
419 self.spinner.update()
420 self.live.stop()
422 custom_style = InquirerPyStyle(
423 {
424 "questionmark": "#e5c07b",
425 "answered_question": "",
426 "answer": "#61afef bold",
427 "pointer": "#61afef bold",
428 "highlighted": "#61afef bold",
429 "selected": "#e5c07b",
430 },
431 )
433 if choices:
434 value = inquirer.select(
435 message=prompt_clean,
436 choices=choices,
437 default=default,
438 vi_mode=True,
439 qmark="",
440 amark="",
441 style=custom_style,
442 ).execute()
443 else:
444 value = inquirer.text(
445 message=prompt_clean,
446 default=default or "",
447 vi_mode=True,
448 qmark="",
449 amark="",
450 style=custom_style,
451 ).execute()
453 self.start("Working")
454 return value
455 if choices:
456 choices_str = "/".join(str(c) for c in choices)
457 prompt_full = f"{prompt_clean} [{choices_str}]"
458 if default:
459 prompt_full += f" (default: {default})"
460 prompt_full += ": "
462 value = input(prompt_full).strip()
463 if not value and default:
464 return default
466 if value not in choices:
467 self.stderr.print(f"{EMOJI_WARNING} Invalid choice '{value}', using default: {default}")
468 return default or choices[0]
469 return value
470 prompt_full = prompt_clean
471 if default:
472 prompt_full += f" (default: {default})"
473 prompt_full += ": "
475 value = input(prompt_full).strip()
476 return value if value else (default or "")
478 def prompt_fuzzy(
479 self,
480 prompt: str,
481 choices: list[str],
482 default: str | None = None,
483 required_flag: str | None = None,
484 **kwargs,
485 ) -> str:
486 from frappe_manager.exceptions import NonInteractiveError
488 if not self.is_interactive():
489 if required_flag:
490 raise NonInteractiveError(
491 f"Cannot prompt in non-interactive mode: {prompt}",
492 suggestions=[f"Provide: {required_flag}"],
493 )
494 if default is None:
495 raise NonInteractiveError(
496 f"Cannot prompt in non-interactive mode: {prompt}",
497 suggestions=["Run without --non-interactive to enable prompts"],
498 )
499 return default
501 if self._is_interactive:
502 from InquirerPy import inquirer
504 self.spinner.update()
505 self.live.stop()
507 qmark = kwargs.pop("qmark", "🤔")
508 amark = kwargs.pop("amark", "🤔")
509 vi_mode = kwargs.pop("vi_mode", True)
510 mandatory = kwargs.pop("mandatory", True)
512 value = inquirer.fuzzy(
513 message=prompt,
514 choices=choices,
515 vi_mode=vi_mode,
516 mandatory=mandatory,
517 qmark=qmark,
518 amark=amark,
519 **kwargs,
520 ).execute()
522 self.start("Working")
523 return value
524 raise NonInteractiveError(
525 f"Cannot prompt in non-interactive mode: {prompt}",
526 suggestions=["Run without --non-interactive to enable prompts"],
527 )
529 @property
530 def should_stream_docker(self) -> bool:
531 return self._is_interactive and self.is_spinner_active and not self.verbose
533 def print_data(self, data: Any, **kwargs) -> None:
534 import json
536 from rich.table import Table as RichTable
538 mode = OutputRefactoringFlags.stream_separation_mode()
540 if mode == "legacy":
541 if isinstance(data, RichTable):
542 self.stderr.print(data)
543 else:
544 self.stderr.print(str(data))
545 elif isinstance(data, RichTable):
546 self.stdout.print(data)
547 elif isinstance(data, (dict, list)):
548 json_str = json.dumps(data, indent=2, default=str)
549 self.stdout.print(json_str)
550 else:
551 self.stdout.print(str(data))
553 def print_status(self, text: str, emoji_code: str = ":zap:", **kwargs) -> None:
554 self.stderr.print(f"{emoji_code} {text}", **kwargs)
556 def exit(self, text: str, emoji_code: str = ":no_entry:", os_exit=False, error_msg=None):
557 """
558 Exit with error message.
560 Args:
561 text: The text to be printed
562 emoji_code: The emoji code to be displayed before the text (default: ":no_entry:")
563 os_exit: If True, the program will exit with status code 1 (default: False)
564 error_msg: The error message to be displayed after the text (default: None)
565 """
566 self.stop()
568 to_print = f"{emoji_code} {text}"
569 if error_msg:
570 to_print = f"{emoji_code} {text}\n Error : {error_msg}"
572 self.stderr.print(to_print)
574 if os_exit:
575 exit(1)
577 raise typer.Exit(1)