Coverage for lmcat\lmcat.py: 73%

186 statements  

coverage.py v7.6.10, created at 2025-01-29 16:42 -0700

import argparse
import io
import json

# from dataclasses import dataclass, field
from pathlib import Path
import sys

from lmcat.processing_pipeline import ProcessingPipeline


# Handle Python 3.11+ vs older Python for TOML parsing
try:
    import tomllib
except ImportError:
    try:
        import tomli as tomllib  # type: ignore
    except ImportError:
        tomllib = None  # type: ignore[assignment]

import igittigitt  # noqa: E402

from muutils.json_serialize import (
    SerializableDataclass,
    serializable_dataclass,
    serializable_field,
)
from muutils.misc import shorten_numerical_to_str  # noqa: E402


from lmcat.file_stats import FileStats, TokenizerWrapper, TreeEntry, TOKENIZERS_PRESENT
from lmcat.processing_pipeline import OnMultipleProcessors


@serializable_dataclass(kw_only=True)
class LMCatConfig(SerializableDataclass):
37 """Configuration dataclass for lmcat 

38 

39 # Parameters: 

40 - `tree_divider: str` 

41 - `tree_indent: str` 

42 - `tree_file_divider: str` 

43 - `content_divider: str` 

44 - `include_gitignore: bool` (default True) 

45 - `tree_only: bool` (default False) 

46 """ 

    content_divider: str = serializable_field(default="``````")
    tree_only: bool = serializable_field(default=False)

    # ignoring
    ignore_patterns: list[str] = serializable_field(default_factory=list)
    ignore_patterns_files: list[Path] = serializable_field(
        default_factory=lambda: [Path(".gitignore"), Path(".lmignore")],
        serialization_fn=lambda x: [p.as_posix() for p in x],
        deserialize_fn=lambda x: [Path(p) for p in x],
    )

    # this file will be imported, and if the functions in it are decorated
    # with one of the `register_*` decorators, they will be added to the functions
    # which can be used in the processing pipeline
    # --allow-plugins is a command line only option and must be set to true for this to work
    plugins_file: Path | None = serializable_field(
        default=None,
        serialization_fn=lambda x: x.as_posix() if x else None,
        deserialize_fn=lambda x: Path(x) if x else None,
    )
    allow_plugins: bool = serializable_field(
        default=False,
        deserialize_fn=lambda x: False,  # this can only be overridden through the command line
    )
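    # Rough sketch of what a plugins file might contain. The decorator names below
    # are assumptions based on the `register_*` convention noted above; check
    # `lmcat.processing_pipeline` for the actual names before relying on this:
    #
    #     # my_plugins.py
    #     from lmcat.processing_pipeline import register_processor  # hypothetical import
    #
    #     @register_processor  # hypothetical decorator
    #     def strip_trailing_whitespace(path):
    #         return path.read_text().rstrip()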

    # processing pipeline
    glob_process: dict[str, str] = serializable_field(default_factory=dict)
    decider_process: dict[str, str] = serializable_field(default_factory=dict)
    on_multiple_processors: OnMultipleProcessors = serializable_field(
        default="except",
        assert_type=False,
    )

    # tokenization
    tokenizer: str = serializable_field(
        default="gpt2" if TOKENIZERS_PRESENT else "whitespace-split"
    )
85 "Tokenizer to use for tokenizing the output. `gpt2` by default. passed to `tokenizers.Tokenizer.from_pretrained()`. If specified and `tokenizers` not installed, will throw exception. fallback `whitespace-split` used to avoid exception when `tokenizers` not installed." 

    # tree formatting
    tree_divider: str = serializable_field(default="│ ")
    tree_file_divider: str = serializable_field(default="├── ")
    tree_indent: str = serializable_field(default=" ")

    def get_tokenizer_obj(self) -> TokenizerWrapper:
        """Get the tokenizer object"""
        return TokenizerWrapper(self.tokenizer)
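    # Usage sketch (not executed here): the wrapper returned above is what the rest
    # of this module uses for counting, e.g.
    #     tok = LMCatConfig().get_tokenizer_obj()
    #     n_tokens: int = tok.n_tokens("some text")  # same call `assemble_summary` makes below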

    def get_processing_pipeline(self) -> ProcessingPipeline:
        """Get the processing pipeline object"""
        plugins_file: Path | None = self.plugins_file if self.allow_plugins else None
        return ProcessingPipeline(
            plugins_file=plugins_file,
            decider_process_keys=self.decider_process,
            glob_process_keys=self.glob_process,
            on_multiple_processors=self.on_multiple_processors,
        )

    @classmethod
    def read(cls, root_dir: Path) -> "LMCatConfig":
        """Attempt to read config from pyproject.toml, lmcat.toml, or lmcat.json."""
        pyproject_path: Path = root_dir / "pyproject.toml"
        lmcat_toml_path: Path = root_dir / "lmcat.toml"
        lmcat_json_path: Path = root_dir / "lmcat.json"

        if (
            sum(
                int(p.is_file())
                for p in (pyproject_path, lmcat_toml_path, lmcat_json_path)
            )
            > 1
        ):
            raise ValueError(
                "Multiple configuration files found. Please only use one of pyproject.toml, lmcat.toml, or lmcat.json."
            )

        # Try pyproject.toml first
        if tomllib is not None and pyproject_path.is_file():
            with pyproject_path.open("rb") as f:
                pyproject_data = tomllib.load(f)
            if "tool" in pyproject_data and "lmcat" in pyproject_data["tool"]:
                return cls.load(pyproject_data["tool"]["lmcat"])

        # Then try lmcat.toml
        if tomllib is not None and lmcat_toml_path.is_file():
            with lmcat_toml_path.open("rb") as f:
                toml_data = tomllib.load(f)
            return cls.load(toml_data)

        # Finally try lmcat.json
        if lmcat_json_path.is_file():
            with lmcat_json_path.open("r", encoding="utf-8") as f:
                json_data = json.load(f)
            return cls.load(json_data)

        # Fallback to defaults
        return cls()
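    # Example of the configuration this method would pick up from `pyproject.toml`;
    # the key names mirror the fields defined above, the values are illustrative only:
    #
    #     [tool.lmcat]
    #     tree_only = false
    #     content_divider = "``````"
    #     ignore_patterns = ["*.lock"]
    #     tokenizer = "gpt2"
    #
    # The same table (without the `[tool.lmcat]` header) could live in `lmcat.toml`,
    # or as a JSON object in `lmcat.json`.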


class IgnoreHandler:
    """Handles all ignore pattern matching using igittigitt"""

    def __init__(self, root_dir: Path, config: LMCatConfig):
        self.root_dir: Path = root_dir
        self.config: LMCatConfig = config

        # set up parser
        self.parser: igittigitt.IgnoreParser = igittigitt.IgnoreParser()

        # first from the files
        for ignore_file in self.config.ignore_patterns_files:
            self.parser.parse_rule_files(self.root_dir, filename=ignore_file.name)

        # then from the config itself
        for pattern in self.config.ignore_patterns:
            self.parser.add_rule(pattern=pattern, base_path=self.root_dir)

    def is_ignored(self, path: Path) -> bool:
        """Check if a path should be ignored"""
        # the .gitignore/.lmignore files themselves are always treated as ignored
        if path.name in {".gitignore", ".lmignore"}:
            return True

        # Use igittigitt's matching
        return self.parser.match(path)
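    # Usage sketch (this mirrors how `walk_dir` below uses the class; the path is illustrative):
    #     handler = IgnoreHandler(root_dir=Path("."), config=LMCatConfig())
    #     handler.is_ignored(Path("build/out.txt"))  # True if matched by ignore files or config patterns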


def sorted_entries(directory: Path) -> list[Path]:
    """Return directory contents sorted: directories first, then files"""
    subdirs: list[Path] = sorted(
        [p for p in directory.iterdir() if p.is_dir()], key=lambda x: x.name
    )
    files: list[Path] = sorted(
        [p for p in directory.iterdir() if p.is_file()], key=lambda x: x.name
    )
    return subdirs + files


def walk_dir(
    directory: Path,
    ignore_handler: IgnoreHandler,
    config: LMCatConfig,
    tokenizer: TokenizerWrapper,
    prefix: str = "",
) -> tuple[list[TreeEntry], list[Path]]:
    """Recursively walk a directory, building tree lines and collecting file paths"""
    tree_output: list[TreeEntry] = []
    collected_files: list[Path] = []

    entries: list[Path] = sorted_entries(directory)
    for i, entry in enumerate(entries):
        if ignore_handler.is_ignored(entry):
            continue

        is_last: bool = i == len(entries) - 1
        connector: str = (
            config.tree_file_divider
            if not is_last
            else config.tree_file_divider.replace("├", "└")
        )

        if entry.is_dir():
            tree_output.append(TreeEntry(f"{prefix}{connector}{entry.name}", None))
            extension: str = config.tree_divider if not is_last else config.tree_indent
            sub_output: list[TreeEntry]
            sub_files: list[Path]
            sub_output, sub_files = walk_dir(
                directory=entry,
                ignore_handler=ignore_handler,
                config=config,
                tokenizer=tokenizer,
                prefix=prefix + extension,
            )
            tree_output.extend(sub_output)
            collected_files.extend(sub_files)
        else:
            stats: FileStats = FileStats.from_file(entry, tokenizer)
            tree_output.append(TreeEntry(f"{prefix}{connector}{entry.name}", stats))
            collected_files.append(entry)

    return tree_output, collected_files


def format_tree_with_stats(
    entries: list[TreeEntry], show_tokens: bool = False
) -> list[str]:
    """Format tree entries with aligned statistics

    # Parameters:
    - `entries : list[TreeEntry]`
        List of tree entries with optional stats
    - `show_tokens : bool`
        Whether to show token counts

    # Returns:
    - `list[str]`
        Formatted tree lines with aligned stats
    """
    # Find max widths for alignment
    max_line_len: int = max(len(entry.line) for entry in entries)
    max_lines: int = max(
        (len(f"{entry.stats.lines:,}") if entry.stats else 0) for entry in entries
    )
    max_chars: int = max(
        (len(f"{entry.stats.chars:,}") if entry.stats else 0) for entry in entries
    )
    max_tokens: int = (
        max(
            (
                len(f"{entry.stats.tokens:,}")
                if entry.stats and entry.stats.tokens
                else 0
            )
            for entry in entries
        )
        if show_tokens
        else 0
    )

    formatted: list[str] = []
    for entry in entries:
        line: str = entry.line.ljust(max_line_len + 2)
        if entry.stats:
            lines_str: str = f"{entry.stats.lines:,}L".rjust(max_lines + 1)
            chars_str: str = f"{entry.stats.chars:,}C".rjust(max_chars + 1)
            stats_str: str = f"[{lines_str} {chars_str}"
            if show_tokens and entry.stats.tokens is not None:
                tokens_str: str = f"{entry.stats.tokens:,}T".rjust(max_tokens + 1)
                stats_str += f" {tokens_str}"
            stats_str += "]"
            formatted.append(f"{line}{stats_str}")
        else:
            formatted.append(line)

    return formatted
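# Illustrative result of `format_tree_with_stats` (names and counts are made up;
# directory entries carry no stats, so only file lines get the bracketed column):
#     my_project
#     ├── lmcat.py     [ 463L  14,022C  3,500T]
#     └── README.md    [  40L     900C    210T]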


def walk_and_collect(
    root_dir: Path,
    config: LMCatConfig,
) -> tuple[list[str], list[Path]]:
    """Walk filesystem from root_dir and gather tree listing plus file paths"""
    if config is None:
        config = LMCatConfig()

    tokenizer: TokenizerWrapper = config.get_tokenizer_obj()

    ignore_handler = IgnoreHandler(root_dir, config)
    base_name = root_dir.resolve().name

    # Start with root directory name
    tree_output = [TreeEntry(base_name)]

    # Walk the directory tree
    sub_output, sub_files = walk_dir(
        directory=root_dir,
        ignore_handler=ignore_handler,
        config=config,
        tokenizer=tokenizer,
        prefix="",
    )
    tree_output.extend(sub_output)

    # Format tree with stats
    formatted_tree = format_tree_with_stats(
        tree_output, show_tokens=tokenizer is not None
    )

    return formatted_tree, sub_files


def assemble_summary(
    root_dir: Path,
    config: LMCatConfig,
) -> str:
    """Assemble the summary output and return"""

    processing_pipeline: ProcessingPipeline = config.get_processing_pipeline()

    tree_output: list[str]
    collected_files: list[Path]
    tree_output, collected_files = walk_and_collect(
        root_dir=root_dir,
        config=config,
    )

    output: list[str] = []
    output.append("# File Tree")
    output.append("\n```")
    output.extend(tree_output)
    output.append("```\n")

    # Add file contents if not suppressed
    if not config.tree_only:
        output.append("# File Contents")

        for fpath in collected_files:
            # get the path
            relpath_posix: str = fpath.relative_to(root_dir).as_posix()

            # process the contents
            f_contents: str
            p_name: str | None
            f_contents, p_name = processing_pipeline.process_file(fpath)
            processed_with: str = f'processed_with="{p_name}"' if p_name else ""

            # start of file marker
            pathspec_start: str = f'{{ path="{relpath_posix}" {processed_with} }}'
            pathspec_end: str = f'{{ end_of_file="{relpath_posix}" }}'
            output.append("")
            output.append(config.content_divider + pathspec_start)

            # process the actual contents of the file with the pipeline, and append
            output.append(f_contents)

            # add the end of file marker
            output.append(config.content_divider + pathspec_end)

    output_joined: str = "\n".join(output)

    stats_dict_ints: dict[str, int] = {
        "files": len(collected_files),
        "lines": len(output_joined.splitlines()),
        "chars": len(output_joined),
    }

    tokenizer: TokenizerWrapper = config.get_tokenizer_obj()

    n_tokens: int = tokenizer.n_tokens(output_joined)
    stats_dict_ints[f"`{tokenizer.name}` tokens"] = n_tokens

    stats_header: list[str] = ["# Stats"]
    for key, val in stats_dict_ints.items():
        val_str: str = str(val)
        val_short: str = shorten_numerical_to_str(val)
        if val_str != val_short:
            stats_header.append(f"- {val} ({val_short}) {key}")
        else:
            stats_header.append(f"- {val} {key}")

    output_complete: str = "\n".join(stats_header) + "\n\n" + output_joined

    return output_complete
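# Rough shape of the assembled output (paths and numbers below are illustrative only):
#
#     # Stats
#     - 2 files
#     - 120 lines
#     - 980 chars
#     - 250 `gpt2` tokens
#
#     # File Tree
#
#     ```
#     my_project
#     ├── example.py  [ 90L  800C  200T]
#     └── README.md   [ 30L  180C   50T]
#     ```
#
#     # File Contents
#
#     `````` { path="example.py"  }
#     ...file contents...
#     `````` { end_of_file="example.py" }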


def main() -> None:
    """Main entry point for the script"""
    arg_parser = argparse.ArgumentParser(
        description="lmcat - list tree and content, combining .gitignore + .lmignore",
        add_help=False,
    )
    arg_parser.add_argument(
        "-t",
        "--tree-only",
        action="store_true",
        default=False,
        help="Only print the tree, not the file contents.",
    )
    arg_parser.add_argument(
        "-o",
        "--output",
        action="store",
        default=None,
        help="Output file to write the tree and contents to.",
    )
    arg_parser.add_argument(
        "-h", "--help", action="help", help="Show this help message and exit."
    )
    arg_parser.add_argument(
        "--print-cfg",
        action="store_true",
        default=False,
        help="Print the configuration as json and exit.",
    )
    arg_parser.add_argument(
        "--allow-plugins",
        action="store_true",
        default=False,
        help="Allow plugins to be loaded from the plugins file. WARNING: this will execute arbitrary code found in the file pointed to by `config.plugins_file`, and **is a security risk**.",
    )

    args: argparse.Namespace = arg_parser.parse_known_args()[0]
    root_dir: Path = Path(".").resolve()
    config: LMCatConfig = LMCatConfig.read(root_dir)

    # CLI overrides
    config.tree_only = args.tree_only
    config.allow_plugins = args.allow_plugins

    # print cfg and exit if requested
    if args.print_cfg:
        print(json.dumps(config.serialize(), indent="\t"))
        return

    # assemble summary
    summary: str = assemble_summary(root_dir=root_dir, config=config)

    # Write output
    if args.output:
        output_path: Path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(summary, encoding="utf-8")
    else:
        if sys.platform == "win32":
            sys.stdout = io.TextIOWrapper(
                sys.stdout.buffer, encoding="utf-8", errors="replace"
            )
            sys.stderr = io.TextIOWrapper(
                sys.stderr.buffer, encoding="utf-8", errors="replace"
            )

        print(summary)


if __name__ == "__main__":
    main()
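
# Example invocations, assuming the package exposes this module as a console script
# named `lmcat` (otherwise run the file directly with `python path/to/lmcat.py`):
#
#     lmcat --tree-only        # print only the file tree
#     lmcat -o summary.md      # write the tree and file contents to summary.md
#     lmcat --print-cfg        # print the resolved configuration as JSON and exit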