Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2# cardinal_pythonlib/file_io.py 

3 

4""" 

5=============================================================================== 

6 

7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com). 

8 

9 This file is part of cardinal_pythonlib. 

10 

11 Licensed under the Apache License, Version 2.0 (the "License"); 

12 you may not use this file except in compliance with the License. 

13 You may obtain a copy of the License at 

14 

15 https://www.apache.org/licenses/LICENSE-2.0 

16 

17 Unless required by applicable law or agreed to in writing, software 

18 distributed under the License is distributed on an "AS IS" BASIS, 

19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

20 See the License for the specific language governing permissions and 

21 limitations under the License. 

22 

23=============================================================================== 

24 

25**Support functions for file I/O.** 

26 

27""" 

28 

29from contextlib import contextmanager 

30import csv 

31import fnmatch 

32import gzip 

33from html import escape 

34import io 

35from operator import attrgetter 

36import os 

37import shutil 

38import subprocess 

39import sys 

40import tempfile 

41from typing import (Any, BinaryIO, Generator, Iterable, IO, List, TextIO, 

42 Tuple, Union) 

43import zipfile 

44 

45from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler 

46 

# Module-level logger used by the functions below.
# NOTE(review): presumably a brace-style ("{}") logger, judging by the
# helper's name and the "{}" placeholders in this module's log calls --
# confirm against cardinal_pythonlib.logs.
log = get_brace_style_log_with_null_handler(__name__)

# Default encoding name for the functions in this module that decode binary
# file-like objects into text.
UTF8 = "utf8"

50 

51 

52# ============================================================================= 

53# File opening 

54# ============================================================================= 

55 

@contextmanager
def smart_open(filename: str, mode: str = 'Ur', buffering: int = -1,
               encoding: str = None, errors: str = None, newline: str = None,
               closefd: bool = True) -> IO:
    """
    Context manager (for use with ``with``) that opens a filename and provides
    a :class:`IO` object. If the filename is ``'-'``, however, then
    ``sys.stdin`` is used for reading and ``sys.stdout`` is used for writing.

    Args:
        filename:
            Name of the file to open, or ``'-'`` for stdin/stdout.
        mode:
            Mode, as for :func:`open`. The legacy ``'U'`` (universal
            newlines) flag is accepted for backward compatibility but is
            removed before calling :func:`open`, because Python 3.11+ rejects
            it; it only ever meant "reading".
        buffering: passed to :func:`open`
        encoding: passed to :func:`open`
        errors: passed to :func:`open`
        newline: passed to :func:`open`
        closefd: passed to :func:`open`

    Yields:
        the open file object (or ``sys.stdin``/``sys.stdout``)
    """
    # https://stackoverflow.com/questions/17602878/how-to-handle-both-with-open-and-sys-stdout-nicely  # noqa
    # https://stackoverflow.com/questions/1744989/read-from-file-or-stdin/29824059#29824059  # noqa
    use_std_stream = filename == "-"
    if use_std_stream:
        # No mode, or any mode mentioning "r", means reading.
        fh = sys.stdin if (not mode or "r" in mode) else sys.stdout
    else:
        open_mode = mode
        if mode and "U" in mode:
            # 'U' was deprecated throughout Python 3 and removed in 3.11
            # (open() then raises ValueError). It implied read mode, so drop
            # it and make sure "r" is still present.
            open_mode = mode.replace("U", "")
            if "r" not in open_mode:
                open_mode = "r" + open_mode
        fh = open(filename, mode=open_mode,
                  buffering=buffering, encoding=encoding, errors=errors,
                  newline=newline, closefd=closefd)
    try:
        yield fh
    finally:
        if not use_std_stream:
            fh.close()
        # It does matter that you do NOT close sys.stdin or sys.stdout!
        # The close() calls will work, and after that, operations on
        # stdin/stdout will fail.

84 

85 

86# ============================================================================= 

87# File output 

88# ============================================================================= 

89 

def writeline_nl(fileobj: TextIO, line: str) -> None:
    """
    Writes ``line`` to the file, followed by a single newline character.

    Args:
        fileobj: text file-like object to write to
        line: the text to write (without its terminating newline)
    """
    terminated = "".join((line, "\n"))
    fileobj.write(terminated)

95 

96 

def writelines_nl(fileobj: TextIO, lines: Iterable[str]) -> None:
    """
    Writes lines, plus terminating newline characters, to the file.

    Each line gets exactly one newline appended; an empty iterable therefore
    writes nothing at all. Lines are streamed to the file one at a time, so
    ``lines`` may be a (large) generator.

    (Since :func:`fileobj.writelines` doesn't add newlines...
    https://stackoverflow.com/questions/13730107/writelines-writes-lines-without-newline-just-fills-the-file)

    Args:
        fileobj: text file-like object to write to
        lines: iterable of strings, none of which should already end in a
            newline
    """  # noqa
    fileobj.writelines(line + '\n' for line in lines)

105 

106 

def write_text(filename: str, text: str) -> None:
    """
    Writes text to a file, replacing any existing contents. A newline is
    written after the text.

    Args:
        filename: name of the file to (over)write
        text: the text to write
    """
    with open(filename, 'w') as handle:  # type: TextIO
        handle.write(str(text))
        handle.write("\n")

113 

114 

def write_gzipped_text(basefilename: str, text: str) -> None:
    """
    Writes text to a ``gzip``-compressed file (a ``.gz`` file).

    ``basefilename`` is recorded as the name of the "inner" file; the "outer"
    (compressed) file written to disk is ``basefilename`` plus ``.gz``.
    Maximum compression is used and the timestamp in the gzip header is set
    to zero.

    The reason for this function is that Lintian wants non-timestamped gzip
    files, or it complains:

    - https://lintian.debian.org/tags/package-contains-timestamped-gzip.html
    - See https://stackoverflow.com/questions/25728472/python-gzip-omit-the-original-filename-and-timestamp

    Args:
        basefilename: name of the inner file (``.gz`` is appended for the
            file actually written)
        text: text to compress
    """  # noqa
    outer_filename = basefilename + '.gz'
    with open(outer_filename, 'wb') as raw, \
            gzip.GzipFile(filename=basefilename, mode='wb', compresslevel=9,
                          fileobj=raw, mtime=0) as compressed, \
            io.TextIOWrapper(compressed) as writer:
        writer.write(text)

133 

134 

135# ============================================================================= 

136# File input 

137# ============================================================================= 

138 

def get_lines_without_comments(filename: str) -> List[str]:
    """
    Reads a file via :func:`gen_lines_without_comments` and collects
    everything that generator yields into a list.

    Args:
        filename: name of the file to read

    Returns:
        list of the lines yielded by :func:`gen_lines_without_comments`
    """
    collected = []  # type: List[str]
    collected.extend(gen_lines_without_comments(filename))
    return collected

144 

145 

146# ============================================================================= 

147# More file input: generic generators 

148# ============================================================================= 

149 

def gen_noncomment_lines(
        file: TextIO,
        comment_at_start_only: bool = False) -> Generator[str, None, None]:
    """
    From an open file, yields each line in turn, stripped of leading and
    trailing whitespace and with ``#`` comments removed. Lines that end up
    empty are not yielded.

    Args:
        file:
            The input file-like object.
        comment_at_start_only:
            If true, a line is a comment only when ``#`` is its first
            non-whitespace character; such lines are dropped and all others
            are yielded whole. If false (the default), everything from the
            first ``#`` onwards is removed from every line, so comments may
            also follow content. NOTE that the default does not cope well
            with quoted ``#`` symbols.

    Yields:
        each non-blank, non-comment line, stripped

    """
    for raw in file:
        if comment_at_start_only:
            text = raw.strip()
            if text.startswith("#"):
                continue  # whole-line comment
        else:
            before_hash = raw.partition('#')[0]
            text = before_hash.strip()
        if text:
            yield text

181 

182 

def gen_lines_without_comments(
        filename: str,
        comment_at_start_only: bool = False) -> Generator[str, None, None]:
    """
    Opens ``filename`` for reading and yields what
    :func:`gen_noncomment_lines` yields for it. The file is open only while
    the generator is being consumed.

    Args:
        filename: name of the file to read
        comment_at_start_only: passed to :func:`gen_noncomment_lines`

    Yields:
        the lines yielded by :func:`gen_noncomment_lines`
    """
    with open(filename) as textfile:
        yield from gen_noncomment_lines(
            textfile, comment_at_start_only=comment_at_start_only)

193 

194 

def gen_textfiles_from_filenames(
        filenames: Iterable[str]) -> Generator[TextIO, None, None]:
    """
    Opens each named file in turn (in the default text read mode) and yields
    the open file object. Each file is closed when the consumer moves on to
    the next one, or when the generator is closed.

    Args:
        filenames: iterable of filenames

    Yields:
        each file as a :class:`TextIO` object

    """
    for name in filenames:
        handle = open(name)
        try:
            yield handle
        finally:
            handle.close()

210 

211 

def gen_lines_from_textfiles(
        files: Iterable[TextIO]) -> Generator[str, None, None]:
    """
    Chains together the lines of several text file-like objects.

    Lines are yielded exactly as iterating each file produces them (so any
    line endings are kept).

    Args:
        files: iterable of :class:`TextIO` objects

    Yields:
        each line of all the files

    """
    for textfile in files:
        yield from textfile

227 

228 

def gen_lower(x: Iterable[str]) -> Generator[str, None, None]:
    """
    Lower-cases a stream of strings, lazily.

    Args:
        x: iterable of strings

    Yields:
        each string in lower case
    """
    yield from (item.lower() for item in x)

239 

240 

def gen_lines_from_binary_files(
        files: Iterable[BinaryIO],
        encoding: str = UTF8) -> Generator[str, None, None]:
    """
    Generates text lines from binary files.

    Each line is decoded and then stripped of leading and trailing
    whitespace, which removes the line ending.

    Args:
        files: iterable of :class:`BinaryIO` file-like objects
        encoding: encoding to use

    Yields:
        each line of all the files, decoded and stripped

    """
    for binfile in files:
        for raw in binfile:
            decoded = raw.decode(encoding)
            yield decoded.strip()

260 

261 

def gen_files_from_zipfiles(
        zipfilenames_or_files: Iterable[Union[str, BinaryIO]],
        filespec: str,
        on_disk: bool = False) -> Generator[BinaryIO, None, None]:
    """
    Opens each ``.zip`` archive in turn and yields a binary file-like object
    for every "inner" file whose name matches ``filespec``, in filename order
    within each archive.

    Each yielded file is closed (and, for ``on_disk``, its temporary
    directory deleted) when the consumer asks for the next one, so it must be
    fully used before advancing the generator.

    Args:
        zipfilenames_or_files: iterable of filenames or :class:`BinaryIO`
            file-like objects, giving the ``.zip`` files
        filespec: filespec to filter the "inner" files against (matched with
            :func:`fnmatch.fnmatch`)
        on_disk: if ``True``, extracts inner files to a temporary directory
            and yields file-like objects that access disk files (and are
            therefore seekable); if ``False``, yields file-like objects that
            read directly from the archive (which may not be seekable,
            depending on the Python version; e.g.
            https://stackoverflow.com/questions/12821961/)

    Yields:
        file-like object for each inner file matching ``filespec``; may be
        in memory or on disk, as per ``on_disk``

    """
    for zipfilename_or_file in zipfilenames_or_files:
        with zipfile.ZipFile(zipfilename_or_file) as zf:
            # sorted() builds a new list, so the list object handed back by
            # infolist() is not reordered.
            infolist = sorted(
                zf.infolist(),
                key=attrgetter('filename'))  # type: List[zipfile.ZipInfo]
            for zipinfo in infolist:
                if not fnmatch.fnmatch(zipinfo.filename, filespec):
                    continue
                log.debug("Reading subfile {}", zipinfo.filename)
                if on_disk:
                    with tempfile.TemporaryDirectory() as tmpdir:
                        # extract() returns the path it actually created.
                        # Member names are sanitized on extraction, so that
                        # path is not always join(tmpdir, zipinfo.filename).
                        diskfilename = zf.extract(zipinfo, tmpdir)
                        with open(diskfilename, 'rb') as subfile:
                            yield subfile
                else:
                    # May not be seekable; e.g.
                    # https://stackoverflow.com/questions/12821961/
                    with zf.open(zipinfo) as subfile:
                        yield subfile

302 

303 

def gen_part_from_line(lines: Iterable[str],
                       part_index: int,
                       splitter: str = None) -> Generator[str, None, None]:
    """
    For each line, splits it on ``splitter`` and yields the part found at
    position ``part_index``.

    Args:
        lines: iterable of strings
        part_index: index of part to yield
        splitter: string to split the lines on (``None``, the default, gives
            :meth:`str.split`'s default whitespace splitting)

    Yields:
        the specified part for each line

    """
    for text in lines:
        pieces = text.split(splitter)
        chosen = pieces[part_index]
        yield chosen

322 

323 

def gen_part_from_iterables(iterables: Iterable[Any],
                            part_index: int) -> Generator[Any, None, None]:
    r"""
    Yields the *n*\ th part of each thing in ``iterables``, by indexing each
    item with ``part_index``.

    Args:
        iterables: iterable of anything (each item must support indexing)
        part_index: part index

    Yields:
        ``item[part_index] for item in iterable``

    """
    # RST: make part of word bold/italic:
    # https://stackoverflow.com/questions/12771480/part-of-a-word-bold-in-restructuredtext  # noqa
    yield from (item[part_index] for item in iterables)

341 

342 

def gen_rows_from_csv_binfiles(
        csv_files: Iterable[BinaryIO],
        encoding: str = UTF8,
        skip_header: bool = False,
        **csv_reader_kwargs) -> Generator[Iterable[str], None, None]:
    """
    Iterate through binary file-like objects that are CSV files in a specified
    encoding. Yield each row.

    If no ``dialect`` is supplied in ``csv_reader_kwargs``, the dialect is
    sniffed separately for each file from its first 1024 characters; in that
    case the file must be seekable, because it is rewound after sniffing.

    Args:
        csv_files: iterable of :class:`BinaryIO` objects
        encoding: encoding to use
        skip_header: skip the header (first) row of each file?
        csv_reader_kwargs: arguments to pass to :func:`csv.reader`

    Yields:
        rows from the files

    """
    dialect = csv_reader_kwargs.pop('dialect', None)
    for csv_file_bin in csv_files:
        # newline="" leaves line endings untranslated, as the csv module
        # requires; otherwise "\r\n" embedded in quoted fields would be
        # silently rewritten to "\n".
        # noinspection PyTypeChecker
        csv_file = io.TextIOWrapper(csv_file_bin, encoding=encoding,
                                    newline="")
        thisfile_dialect = dialect
        if thisfile_dialect is None:
            thisfile_dialect = csv.Sniffer().sniff(csv_file.read(1024))
            csv_file.seek(0)
        reader = csv.reader(csv_file, dialect=thisfile_dialect,
                            **csv_reader_kwargs)
        if skip_header:
            next(reader, None)  # discard first row; tolerate an empty file
        yield from reader

379 

380 

381# ============================================================================= 

382# File transformations 

383# ============================================================================= 

384 

def webify_file(srcfilename: str, destfilename: str) -> None:
    """
    Copies the text file ``srcfilename`` to ``destfilename`` line by line,
    HTML-escaping each line with :func:`html.escape` on the way.

    Args:
        srcfilename: file to read
        destfilename: file to write (overwritten if it exists)
    """
    with open(srcfilename) as source:
        with open(destfilename, 'w') as target:
            target.writelines(escape(raw) for raw in source)

393 

394 

def remove_gzip_timestamp(filename: str,
                          gunzip_executable: str = "gunzip",
                          gzip_executable: str = "gzip",
                          gzip_args: List[str] = None) -> None:
    """
    Uses external ``gunzip``/``gzip`` tools to remove a ``gzip`` timestamp.
    Necessary for Lintian.

    The file is decompressed and recompressed through a pipe into a temporary
    file, which is then copied over the original. The original is only
    overwritten if both tools exit successfully.

    Args:
        filename: the ``.gz`` file to rewrite (in place)
        gunzip_executable: name/path of the ``gunzip`` tool
        gzip_executable: name/path of the ``gzip`` tool
        gzip_args: arguments for ``gzip``; defaults to ``["-9", "-n"]``

    Raises:
        subprocess.CalledProcessError: if either tool exits with a non-zero
            status (in which case ``filename`` is left untouched)
    """
    gzip_args = gzip_args or [
        "-9",  # maximum compression (or Lintian moans)
        "-n",
    ]
    gunzip_cmd = [gunzip_executable, "-c", filename]
    gzip_cmd = [gzip_executable] + gzip_args
    # gzip/gunzip operate on SINGLE files
    with tempfile.TemporaryDirectory() as dir_:
        basezipfilename = os.path.basename(filename)
        newzip = os.path.join(dir_, basezipfilename)
        with open(newzip, 'wb') as z:
            log.info(
                "Removing gzip timestamp: "
                "{} -> gunzip -c -> gzip -n -> {}",
                basezipfilename, newzip)
            p1 = subprocess.Popen(gunzip_cmd, stdout=subprocess.PIPE)
            try:
                p2 = subprocess.Popen(gzip_cmd, stdin=p1.stdout, stdout=z)
                # Close our copy of the pipe so that gunzip receives SIGPIPE
                # if gzip exits early (standard shell-pipeline idiom).
                p1.stdout.close()
                p2.communicate()
            finally:
                p1.stdout.close()  # harmless if already closed
                p1.wait()  # reap gunzip and obtain its return code
        # Don't replace the original with a truncated/empty result.
        for cmd, proc in ((gunzip_cmd, p1), (gzip_cmd, p2)):
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmd)
        shutil.copyfile(newzip, filename)  # copy back

422 

423 

424# ============================================================================= 

425# File modifications 

426# ============================================================================= 

427 

def replace_in_file(filename: str, text_from: str, text_to: str,
                    backup_filename: str = None) -> None:
    """
    Replaces text in a file.

    The file is rewritten only if the replacement changes its contents. In
    that case, if ``backup_filename`` is given, the original contents are
    first saved there.

    Args:
        filename: filename to process (modifying it in place)
        text_from: original text to replace
        text_to: replacement text
        backup_filename: backup filename to write to, if modifications made
    """
    log.info("Amending {}: {} -> {}",
             filename, repr(text_from), repr(text_to))
    with open(filename) as infile:
        original = infile.read()
    modified = original.replace(text_from, text_to)
    if modified == original:
        return  # nothing to do; leave the file (and any backup) alone
    if backup_filename:
        # Write the backup to the backup file -- not over the file that is
        # about to be modified.
        with open(backup_filename, 'w') as outfile:
            outfile.write(original)
    with open(filename, 'w') as outfile:
        outfile.write(modified)

450 

451 

def replace_multiple_in_file(filename: str,
                             replacements: List[Tuple[str, str]],
                             backup_filename: str = None) -> None:
    """
    Replaces multiple from/to string pairs within a single file.

    Replacements are applied in the order given, each to the result of the
    previous one. The file is rewritten only if the final text differs from
    the original. In that case, if ``backup_filename`` is given, the original
    contents are first saved there.

    Args:
        filename: filename to process (modifying it in place)
        replacements: list of ``(from_text, to_text)`` tuples
        backup_filename: backup filename to write to, if modifications made
    """
    with open(filename) as infile:
        original = infile.read()
    modified = original
    for text_from, text_to in replacements:
        log.info("Amending {}: {} -> {}",
                 filename, repr(text_from), repr(text_to))
        modified = modified.replace(text_from, text_to)
    if modified == original:
        return  # nothing to do; leave the file (and any backup) alone
    if backup_filename:
        # Write the backup to the backup file -- not over the file that is
        # about to be modified.
        with open(backup_filename, 'w') as outfile:
            outfile.write(original)
    with open(filename, 'w') as outfile:
        outfile.write(modified)

476 

477 

def convert_line_endings(filename: str, to_unix: bool = False,
                         to_windows: bool = False) -> None:
    """
    Converts a file (in place) from UNIX to Windows line endings, or the
    reverse. Exactly one of ``to_unix`` and ``to_windows`` must be set.

    When converting to Windows, a file that already contains at least one
    CR LF is assumed to have been converted before and is left untouched.

    Args:
        filename: filename to modify (in place)
        to_unix: convert Windows (CR LF) to UNIX (LF)
        to_windows: convert UNIX (LF) to Windows (CR LF)
    """
    assert to_unix != to_windows
    crlf = b"\r\n"  # Windows: CR LF
    lf = b"\n"  # UNIX: LF
    with open(filename, "rb") as infile:
        data = infile.read()
    if to_unix:
        log.info("Converting from Windows to UNIX line endings: {!r}",
                 filename)
        converted = data.replace(crlf, lf)
    else:  # to_windows
        log.info("Converting from UNIX to Windows line endings: {!r}",
                 filename)
        if crlf in data:
            # Converting again would turn CR LF into CR CR LF.
            log.info("... already contains at least one Windows line ending; "
                     "probably converted before; skipping")
            return
        converted = data.replace(lf, crlf)
    with open(filename, "wb") as outfile:
        outfile.write(converted)

511 

512 

def is_line_in_file(filename: str, line: str) -> bool:
    """
    Detects whether a line is present within a file.

    Args:
        filename: file to check
        line: line to search for (as an exact match of a whole line,
            excluding its line ending); must not contain a newline

    Returns:
        whether any line of the file equals ``line``
    """
    assert "\n" not in line
    with open(filename, "r") as file:
        # Lines read from a file keep their trailing "\n", whereas ``line``
        # has none, so remove it before comparing.
        return any(fileline.rstrip("\n") == line for fileline in file)

527 

528 

def add_line_if_absent(filename: str, line: str) -> None:
    """
    Adds a line (at the end) if it's not already in the file somewhere.

    The appended line is always terminated with a newline. If the existing
    file does not end with a newline, one is inserted first, so the new line
    never runs on from the previous one.

    Args:
        filename: filename to modify (in place); must already exist
        line: line to append (which must not have a newline in)
    """
    assert "\n" not in line
    needs_leading_newline = False
    with open(filename, "r") as file:
        for existing in file:
            # Compare without the line ending that file iteration keeps.
            if existing.rstrip("\n") == line:
                return  # already present
            # After the loop this reflects the file's final line.
            needs_leading_newline = not existing.endswith("\n")
    log.info("Appending line {!r} to file {!r}", line, filename)
    with open(filename, "a") as file:
        if needs_leading_newline:
            file.write("\n")
        file.write(line + "\n")