Coverage for frappe_manager / logger / log.py: 80%
124 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import gzip
2import logging
3import logging.handlers
4import os
5import re
6import shutil
8from rich.logging import RichHandler
10from frappe_manager import CLI_LOG_DIRECTORY
11from frappe_manager.exceptions import ConfigurationError
12from frappe_manager.logger.live_aware_handler import LiveAwareRichHandler
14# Define the custom CLEANUP log level (numerically between INFO=20 and WARNING=30)
15CLEANUP = 25
def namer(name: str) -> str:
    """Return the rotated log file name with a gzip extension appended.

    Args:
        name: File name proposed for the rotated log.

    Returns:
        ``name`` followed by ``.gz``.
    """
    gzip_suffix = ".gz"
    return name + gzip_suffix
def rotator(source, dest):
    """Gzip-compress ``source`` into ``dest`` and then delete ``source``.

    Args:
        source: Path of the log file being rotated out.
        dest: Path the compressed copy is written to.
    """
    # Open the source first so a missing source fails before dest is created.
    with open(source, "rb") as plain_log:
        with gzip.open(dest, "wb") as packed_log:
            shutil.copyfileobj(plain_log, packed_log)
    # Only reached when the copy succeeded.
    os.remove(source)
28loggers: dict[str, logging.Logger] = {}
30# Register the name of the new logging level
31logging.addLevelName(CLEANUP, "CLEANUP")
class FMLOGGER(logging.Logger):
    """Logger subclass that adds a ``cleanup`` method for the CLEANUP level."""

    def cleanup(self, msg, *args, **kwargs):
        """Log ``msg`` at the CLEANUP level if that level is enabled.

        Args:
            msg: Message or format string.
            *args: Arguments merged into ``msg`` by the logging machinery.
            **kwargs: Extra keyword arguments forwarded to ``Logger._log``.
        """
        if not self.isEnabledFor(CLEANUP):
            return
        self._log(CLEANUP, msg, args, **kwargs)
class ConsoleLogFilter(logging.Filter):
    """
    Filter to clean up log messages for console display.

    This filter improves readability by:
    - Truncating long JSON outputs
    - Shortening repetitive separators
    - Simplifying docker commands to show only the key operation
    - Dimming less important debug information

    The filter rewrites ``record.msg`` / ``record.args`` in place, so the
    cleaned-up text is what any handler processing the record afterwards sees.
    NOTE(review): file logs keep full details only as long as the file handler
    emits the record before the handler carrying this filter - confirm the
    handler order where the filter is attached.
    """

    MAX_JSON_LENGTH = 120
    MAX_LINE_LENGTH = 150
    # Length caps used by _simplify_command.
    MAX_SIMPLIFIED_COMMAND_LENGTH = 100
    MAX_RAW_COMMAND_LENGTH = 80

    _COMMAND_PREFIX = "COMMAND:"
    _SEPARATOR = "- -- -- -- -- -- -- -- -- -- -"

    # Correlation tags such as "[corr=abc123] " are stripped from console output.
    _CORRELATION_TAG = re.compile(r"\[corr=[^\]]+\]\s*")

    # (pattern, replacement) pairs tried in order; the first one that matches wins.
    # NOTE(review): the console output is rendered with Rich markup elsewhere in
    # this module, so bracketed text such as "[service]" may be parsed as a
    # markup tag - confirm how it is displayed.
    _COMMAND_SIMPLIFICATIONS = (
        # Docker compose exec commands - show only the actual command
        (
            re.compile(r"docker compose -f [^\s]+ exec (?:--user \w+ )?(?:--workdir [^\s]+ )?(\w+) (.+)"),
            r"[\1] \2",
        ),
        # Docker compose up/down/ps - show operation and service
        (
            re.compile(r"docker compose -f [^\s]+ (up|down|ps|start|stop|restart) (.+)"),
            r"compose \1 \2",
        ),
        # Docker commands - show just the operation
        (re.compile(r"docker (\w+) (.+)"), r"docker \1 ..."),
    )

    def filter(self, record: logging.LogRecord) -> bool:
        """
        Rewrite the record's message for console display.

        Args:
            record: The log record about to be emitted.

        Returns:
            False for "RETURN CODE: 0" messages (they are dropped), True otherwise.
        """
        msg = self._CORRELATION_TAG.sub("", str(record.getMessage()))

        if msg.strip() == self._SEPARATOR:
            rendered = "[dim]---[/dim]"
        elif msg.startswith(self._COMMAND_PREFIX):
            rendered = f"[dim]{self._simplify_command(msg)}[/dim]"
        elif msg.startswith("RETURN CODE:"):
            if "RETURN CODE: 0" in msg:
                # Successful commands are not worth a console line.
                return False
            rendered = f"[yellow]{msg}[/yellow]"
        elif msg.startswith("{") and len(msg) > self.MAX_JSON_LENGTH:
            # JSON carrying "Name"/"Image" keys points the reader at the log file.
            hint = "see log file" if ('"Name":' in msg or '"Image":' in msg) else "truncated"
            rendered = f"{msg[: self.MAX_JSON_LENGTH]}... [dim][{hint}][/dim]"
        elif len(msg) > self.MAX_LINE_LENGTH and not msg.startswith("["):
            # Messages starting with "[" are left alone so markup is not cut in half.
            rendered = f"{msg[: self.MAX_LINE_LENGTH]}... [dim][truncated][/dim]"
        else:
            rendered = msg

        # The message is already fully formatted, so the args must be cleared.
        record.msg = rendered
        record.args = ()
        return True

    def _simplify_command(self, cmd_line: str) -> str:
        """
        Simplify docker compose command output to show only the essential operation.

        Args:
            cmd_line: The full COMMAND: line from logger

        Returns:
            Simplified command string, always prefixed with "COMMAND: "
        """
        # Strip only the leading marker; a later "COMMAND: " inside the command
        # itself is part of the command and must survive.
        prefix = self._COMMAND_PREFIX
        cmd = cmd_line[len(prefix):].lstrip() if cmd_line.startswith(prefix) else cmd_line

        for pattern, replacement in self._COMMAND_SIMPLIFICATIONS:
            simplified, substitutions = pattern.subn(replacement, cmd)
            if not substitutions:
                continue
            # Further trim if still too long
            limit = self.MAX_SIMPLIFIED_COMMAND_LENGTH
            if len(simplified) > limit:
                simplified = simplified[: limit - 3] + "..."
            return f"COMMAND: {simplified}"

        # Fallback: just truncate long commands
        limit = self.MAX_RAW_COMMAND_LENGTH
        if len(cmd) > limit:
            return f"COMMAND: {cmd[: limit - 3]}..."

        return f"COMMAND: {cmd}"
def _add_console_handler(logger: logging.Logger, console_level: str) -> None:
    """
    Replace the logger's console handler with one logging at ``console_level``.

    When the global output handler is (or delegates to) a RichOutputHandler, a
    LiveAwareRichHandler is attached and given that handler's stderr console,
    live display and lock. Otherwise a plain RichHandler is attached.

    Args:
        logger: The logger instance to add the handler to
        console_level: The logging level name (DEBUG, INFO, WARNING, ERROR),
            matched case-insensitively

    Raises:
        AttributeError: If ``console_level`` does not name a level constant of
            the ``logging`` module.
    """
    # Imported inside the function, as in the original code.
    # NOTE(review): presumably this avoids a circular import - confirm.
    from frappe_manager.output_manager import get_global_output_handler, has_global_output_handler
    from frappe_manager.output_manager.logging_output import LoggingOutputHandler
    from frappe_manager.output_manager.rich_output import RichOutputHandler

    # Resolve the level before touching the logger so an invalid name does not
    # leave it without any console handler. Upper-casing matters: lower-case
    # names such as "debug" would resolve to the logging.debug function.
    level = getattr(logging, console_level.upper())

    for handler in logger.handlers[:]:
        if isinstance(handler, (RichHandler, LiveAwareRichHandler)):
            logger.removeHandler(handler)

    # Options shared by every console handler variant.
    rich_options = {
        "level": level,
        "rich_tracebacks": True,
        "tracebacks_show_locals": True,
        "show_time": False,
        "show_path": False,
        "show_level": True,
        "markup": True,
    }

    output = get_global_output_handler() if has_global_output_handler() else None
    if isinstance(output, LoggingOutputHandler):
        # Look through the logging wrapper to the handler it delegates to.
        output = output.delegate

    if isinstance(output, RichOutputHandler):
        console_handler = LiveAwareRichHandler(
            **rich_options,
            console=output.stderr,
            live_display=output.live,
            output_lock=output._lock,
        )
    else:
        console_handler = RichHandler(**rich_options)

    console_handler.setFormatter(logging.Formatter("%(message)s"))
    console_handler.addFilter(ConsoleLogFilter())
    logger.addHandler(console_handler)
def _update_console_handler(logger: logging.Logger, console_level: str | None) -> None:
    """
    Install a console handler at ``console_level``, or drop console handlers.

    Args:
        logger: The logger instance to update
        console_level: The logging level name (DEBUG, INFO, WARNING, ERROR);
            a falsy value removes every RichHandler / LiveAwareRichHandler
    """
    if not console_level:
        # Collect first, then remove, so the handler list is not mutated mid-scan.
        console_handlers = [
            attached for attached in logger.handlers if isinstance(attached, (RichHandler, LiveAwareRichHandler))
        ]
        for attached in console_handlers:
            logger.removeHandler(attached)
        return

    _add_console_handler(logger, console_level)
def get_logger(
    log_dir=CLI_LOG_DIRECTORY,
    log_file_name="fm",
    console_level: str | None = None,
    file_level: str = "DEBUG",
) -> FMLOGGER:
    """
    Creates a Log File and returns Logger object.

    Loggers are cached by ``log_file_name``. The rotating file handler is only
    created on the first call for a given name, so ``log_dir`` and
    ``file_level`` have no effect on an already-cached logger.

    Args:
        log_dir: Directory to store log files (default: CLI_LOG_DIRECTORY)
        log_file_name: Name of the log file without extension (default: 'fm')
        console_level: If specified, enables console logging at this level (DEBUG, INFO, WARNING, ERROR)
        file_level: Log level for file handler (default: DEBUG)

    Returns:
        FMLOGGER instance configured with file handler and optional console handler

    Raises:
        ConfigurationError: If the log directory cannot be created because of
            missing permissions.
    """
    # Build Log File Full Path
    log_path = log_dir / f"{log_file_name}.log"

    try:
        log_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError as e:
        # Use print since logger hasn't been initialized yet
        print(f"FATAL: Logging not working. {e}")
        raise ConfigurationError(f"Logging not working: {e}", details={"log_dir": str(log_dir)}) from e

    logger = loggers.get(log_file_name)
    logger_exists = logger is not None

    if logger is None:
        # setLoggerClass applies to every logger instantiated after this call.
        logging.setLoggerClass(FMLOGGER)
        logger = logging.getLogger(log_file_name)
        logger.setLevel(logging.DEBUG)

        # Configured to rotate after 10 MB, keeping 3 backups
        handler = logging.handlers.RotatingFileHandler(log_path, "a+", maxBytes=10 * 1024 * 1024, backupCount=3)
        handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s"))
        handler.setLevel(getattr(logging, file_level.upper()))
        # Backups are gzip-compressed by rotator, so install namer as well to
        # give them a matching ".gz" suffix.
        handler.rotator = rotator
        handler.namer = namer
        logger.addHandler(handler)

        # save logger to dict loggers
        loggers[log_file_name] = logger

    # Add or update console handler only if:
    # 1. Logger is being created for the first time (not logger_exists), OR
    # 2. console_level is explicitly provided (not None)
    # This prevents removing the console handler when business logic calls get_logger() without parameters
    if not logger_exists or console_level is not None:
        _update_console_handler(logger, console_level)

    return logger