Coverage for frappe_manager / logger / log.py: 80%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import gzip 

2import logging 

3import logging.handlers 

4import os 

5import re 

6import shutil 

7 

8from rich.logging import RichHandler 

9 

10from frappe_manager import CLI_LOG_DIRECTORY 

11from frappe_manager.exceptions import ConfigurationError 

12from frappe_manager.logger.live_aware_handler import LiveAwareRichHandler 

13 

14# Numeric value of the custom CLEANUP log level (stdlib INFO is 20, WARNING is 30) 

15CLEANUP = 25 

16 

17 

def namer(name):
    """Return the rotated log file name ``name`` with a ``.gz`` suffix appended."""
    gzip_suffix = ".gz"
    return "".join((name, gzip_suffix))

20 

21 

def rotator(source, dest):
    """
    Gzip-compress ``source`` into ``dest`` and then delete ``source``.

    Args:
        source: Path of the uncompressed log file to rotate out
        dest: Path the gzip-compressed copy is written to
    """
    # Open the source first so a missing source fails before dest is created.
    with open(source, "rb") as raw_log:
        with gzip.open(dest, "wb") as archive:
            shutil.copyfileobj(raw_log, archive)

    # Only reached once the compressed copy has been written and closed.
    os.remove(source)

26 

27 

28loggers: dict[str, logging.Logger] = {}  # cache of loggers built by get_logger, keyed by log_file_name 

29 

30# Register the CLEANUP level name with the logging module 

31logging.addLevelName(CLEANUP, "CLEANUP") 

32 

33 

class FMLOGGER(logging.Logger):
    """Logger subclass that adds a ``cleanup`` method for the custom CLEANUP level."""

    def cleanup(self, msg, *args, **kwargs):
        """
        Log ``msg`` at the CLEANUP level if that level is enabled for this logger.

        Args:
            msg: Message or %-style format string
            *args: Values merged into ``msg`` by the logging machinery
            **kwargs: Passed through unchanged to ``Logger._log``
        """
        if not self.isEnabledFor(CLEANUP):
            return
        self._log(CLEANUP, msg, args, **kwargs)

38 

39 

40class ConsoleLogFilter(logging.Filter): 

41 """ 

42 Filter to clean up log messages for console display. 

43 

44 This filter improves readability by: 

45 - Truncating long JSON outputs 

46 - Shortening repetitive separators 

47 - Simplifying docker commands to show only the key operation 

48 - Dimming less important debug information 

49 

50 Note: File logs remain unchanged with full details. 

51 """ 

52 

53 MAX_JSON_LENGTH = 120 

54 MAX_LINE_LENGTH = 150 

55 

56 def filter(self, record: logging.LogRecord) -> bool: 

57 msg = str(record.getMessage()) 

58 

59 msg = re.sub(r"\[corr=[^\]]+\]\s*", "", msg) 

60 

61 if msg.strip() == "- -- -- -- -- -- -- -- -- -- -": 

62 record.msg = "[dim]---[/dim]" 

63 record.args = () 

64 return True 

65 

66 if msg.startswith("COMMAND:"): 

67 simplified = self._simplify_command(msg) 

68 record.msg = f"[dim]{simplified}[/dim]" 

69 record.args = () 

70 return True 

71 

72 if msg.startswith("RETURN CODE:"): 

73 if "RETURN CODE: 0" in msg: 

74 return False 

75 record.msg = f"[yellow]{msg}[/yellow]" 

76 record.args = () 

77 return True 

78 

79 if msg.startswith("{") and len(msg) > self.MAX_JSON_LENGTH: 

80 if '"Name":' in msg or '"Image":' in msg: 

81 truncated = msg[: self.MAX_JSON_LENGTH] + "... [dim][see log file][/dim]" 

82 record.msg = truncated 

83 record.args = () 

84 else: 

85 truncated = msg[: self.MAX_JSON_LENGTH] + "... [dim][truncated][/dim]" 

86 record.msg = truncated 

87 record.args = () 

88 return True 

89 

90 if len(msg) > self.MAX_LINE_LENGTH and not msg.startswith("["): 

91 truncated = msg[: self.MAX_LINE_LENGTH] + "... [dim][truncated][/dim]" 

92 record.msg = truncated 

93 record.args = () 

94 return True 

95 

96 record.msg = msg 

97 record.args = () 

98 return True 

99 

100 def _simplify_command(self, cmd_line: str) -> str: 

101 """ 

102 Simplify docker compose command output to show only the essential operation. 

103 

104 Args: 

105 cmd_line: The full COMMAND: line from logger 

106 

107 Returns: 

108 Simplified command string 

109 """ 

110 # Remove "COMMAND: " prefix 

111 cmd = cmd_line.replace("COMMAND: ", "") 

112 

113 # Common patterns to simplify 

114 simplifications = [ 

115 # Docker compose exec commands - show only the actual command 

116 (r"docker compose -f [^\s]+ exec (?:--user \w+ )?(?:--workdir [^\s]+ )?(\w+) (.+)", r"[\1] \2"), 

117 # Docker compose up/down/ps - show operation and service 

118 (r"docker compose -f [^\s]+ (up|down|ps|start|stop|restart) (.+)", r"compose \1 \2"), 

119 # Docker commands - show just the operation 

120 (r"docker (\w+) (.+)", r"docker \1 ..."), 

121 ] 

122 

123 for pattern, replacement in simplifications: 

124 match = re.search(pattern, cmd) 

125 if match: 

126 try: 

127 simplified = re.sub(pattern, replacement, cmd) 

128 # Further trim if still too long 

129 if len(simplified) > 100: 

130 simplified = simplified[:97] + "..." 

131 return f"COMMAND: {simplified}" 

132 except Exception: 

133 # If regex replacement fails, continue to next pattern 

134 continue 

135 

136 # Fallback: just truncate long commands 

137 if len(cmd) > 80: 

138 return f"COMMAND: {cmd[:77]}..." 

139 

140 return f"COMMAND: {cmd}" 

141 

142 

def _add_console_handler(logger: logging.Logger, console_level: str) -> None:
    """
    Replace any Rich console handler on ``logger`` with a freshly configured one.

    When a global output handler is registered and it is (or delegates to) a
    RichOutputHandler, a LiveAwareRichHandler bound to that handler's stderr
    console, live display and lock is used, so log output is coordinated with the
    Live spinner display. Otherwise a plain RichHandler is installed.

    Args:
        logger: The logger instance to add the handler to
        console_level: The logging level name (DEBUG, INFO, WARNING, ERROR), case-insensitive

    Raises:
        AttributeError: If ``console_level`` does not name an attribute of the logging module
    """
    # Resolve the level before touching the logger so that an invalid name does not
    # leave it without a console handler. Upper-casing matches how get_logger
    # resolves file_level.
    level = getattr(logging, console_level.upper())

    for existing in logger.handlers[:]:
        if isinstance(existing, (RichHandler, LiveAwareRichHandler)):
            logger.removeHandler(existing)

    # Imported at call time rather than module level
    # (presumably to avoid a circular import - TODO confirm).
    from frappe_manager.output_manager import get_global_output_handler, has_global_output_handler
    from frappe_manager.output_manager.logging_output import LoggingOutputHandler
    from frappe_manager.output_manager.rich_output import RichOutputHandler

    # Options shared by both handler flavours.
    rich_options = {
        "level": level,
        "rich_tracebacks": True,
        "tracebacks_show_locals": True,
        "show_time": False,
        "show_path": False,
        "show_level": True,
        "markup": True,
    }

    output = get_global_output_handler() if has_global_output_handler() else None
    if isinstance(output, LoggingOutputHandler):
        # Unwrap the logging decorator to reach the real output handler.
        output = output.delegate

    if isinstance(output, RichOutputHandler):
        console_handler = LiveAwareRichHandler(
            **rich_options,
            console=output.stderr,
            live_display=output.live,
            output_lock=output._lock,
        )
    else:
        console_handler = RichHandler(**rich_options)

    console_handler.setFormatter(logging.Formatter("%(message)s"))
    console_handler.addFilter(ConsoleLogFilter())
    logger.addHandler(console_handler)

207 

208 

def _update_console_handler(logger: logging.Logger, console_level: str | None) -> None:
    """
    Install a console handler at ``console_level``, or strip console handlers when it is falsy.

    Args:
        logger: The logger instance to update
        console_level: The logging level name (DEBUG, INFO, WARNING, ERROR) or None to remove
    """
    if not console_level:
        # Snapshot the matches first so removal does not disturb the iteration.
        console_handlers = [
            attached for attached in logger.handlers if isinstance(attached, (RichHandler, LiveAwareRichHandler))
        ]
        for attached in console_handlers:
            logger.removeHandler(attached)
        return

    _add_console_handler(logger, console_level)

223 

224 

def get_logger(
    log_dir=CLI_LOG_DIRECTORY,
    log_file_name="fm",
    console_level: str | None = None,
    file_level: str = "DEBUG",
) -> FMLOGGER:
    """
    Creates a Log File and returns Logger object.

    Loggers are cached per ``log_file_name``. The rotating file handler (and
    therefore ``file_level``) is only configured the first time a given name is
    requested; later calls return the cached logger.

    Args:
        log_dir: Directory to store log files (default: CLI_LOG_DIRECTORY); must support
            ``/`` joining and ``mkdir`` like ``pathlib.Path``
        log_file_name: Name of the log file without extension (default: 'fm')
        console_level: If specified, enables console logging at this level (DEBUG, INFO, WARNING, ERROR)
        file_level: Log level for file handler (default: DEBUG)

    Returns:
        FMLOGGER instance configured with file handler and optional console handler

    Raises:
        ConfigurationError: If the log directory cannot be created due to permissions
    """
    # Build Log File Full Path
    log_path = log_dir / f"{log_file_name}.log"

    try:
        log_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError as e:
        # Use print since logger hasn't been initialized yet
        print(f"FATAL: Logging not working. {e}")
        raise ConfigurationError(f"Logging not working: {e}", details={"log_dir": str(log_dir)}) from e

    logger = loggers.get(log_file_name)
    logger_exists = logger is not None

    if not logger_exists:
        # Note: setLoggerClass is process-wide; loggers created afterwards are FMLOGGER too.
        logging.setLoggerClass(FMLOGGER)
        logger = logging.getLogger(log_file_name)
        logger.setLevel(logging.DEBUG)

        # configured to rotate after 10 MB, keeping 3 backups
        handler = logging.handlers.RotatingFileHandler(log_path, "a+", maxBytes=10485760, backupCount=3)
        handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s"))
        handler.setLevel(getattr(logging, file_level.upper()))
        # rotator gzips each rotated file; namer gives it the matching ".gz" suffix.
        # Without namer the compressed data is written to plain "<name>.log.N" files.
        # NOTE(review): backups written earlier without the ".gz" suffix are no longer
        # rotated out automatically - confirm whether they need a one-time cleanup.
        handler.rotator = rotator
        handler.namer = namer
        logger.addHandler(handler)

        # save logger to dict loggers
        loggers[log_file_name] = logger

    # Add or update console handler only if:
    # 1. Logger is being created for the first time (not logger_exists), OR
    # 2. console_level is explicitly provided (not None)
    # This prevents removing the console handler when business logic calls get_logger() without parameters
    if not logger_exists or console_level is not None:
        _update_console_handler(logger, console_level)

    return logger