Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/cardinal_pythonlib/fileops.py : 23%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# cardinal_pythonlib/fileops.py
4"""
5===============================================================================
7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com).
9 This file is part of cardinal_pythonlib.
11 Licensed under the Apache License, Version 2.0 (the "License");
12 you may not use this file except in compliance with the License.
13 You may obtain a copy of the License at
15 https://www.apache.org/licenses/LICENSE-2.0
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
23===============================================================================
25**File operations.**
27"""
29from contextlib import contextmanager
30import fnmatch
31import glob
32import os
33import shutil
34import stat
35from types import TracebackType
36from typing import Any, Callable, Dict, Generator, List, Optional, Tuple
38from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler
40log = get_brace_style_log_with_null_handler(__name__)
43# =============================================================================
44# Find or require executables
45# =============================================================================
def which_with_envpath(executable: str, env: Dict[str, str]) -> Optional[str]:
    """
    Performs a :func:`shutil.which` lookup using the PATH from the specified
    environment.

    Reason: when you use ``run([executable, ...], env)`` and therefore
    ``subprocess.run([executable, ...], env=env)``, the PATH that's searched
    for ``executable`` is the parent's, not the new child's -- so you have to
    find the executable manually.

    The search path is passed directly to :func:`shutil.which`, so this
    process's own ``os.environ`` is never modified (not even temporarily).

    Args:
        executable: executable to find
        env: environment to fetch the PATH variable from; if it contains no
            ``PATH`` entry, ``os.defpath`` is searched instead

    Returns:
        the path to the executable as reported by :func:`shutil.which`, or
        ``None`` if it was not found
    """
    search_path = env.get("PATH", os.defpath)
    return shutil.which(executable, path=search_path)
# Prefix of the error message used when a required command is absent.
_MISSING_COMMAND = "Missing command (must be on the PATH): "


def require_executable(executable: str) -> None:
    """
    Checks that ``executable`` can be located by :func:`shutil.which`.

    Args:
        executable: name (or path) of the command that must be available

    Raises:
        FileNotFoundError: if the command cannot be found; the same message
            is logged at "critical" level before raising
    """
    if not shutil.which(executable):
        errmsg = _MISSING_COMMAND + executable
        log.critical(errmsg)
        raise FileNotFoundError(errmsg)
def which_and_require(executable: str, fullpath: bool = False) -> str:
    """
    Confirms that ``executable`` can be found by :func:`shutil.which` and
    hands it back to the caller.

    Args:
        executable: name (or path) of the command to look for
        fullpath: if true, return the location reported by
            :func:`shutil.which`; otherwise return ``executable`` unchanged

    Raises:
        FileNotFoundError: if the command cannot be found; the same message
            is logged at "critical" level before raising
    """
    located = shutil.which(executable)
    if not located:
        errmsg = _MISSING_COMMAND + executable
        log.critical(errmsg)
        raise FileNotFoundError(errmsg)
    if fullpath:
        return located
    return executable
96# =============================================================================
97# Create directories
98# =============================================================================
def mkdir_p(path: str) -> None:
    """
    Creates the directory ``path`` together with any missing parent
    directories; it is not an error if the directory already exists.

    Equivalent to the UNIX ``mkdir -p DIRECTORY`` command, but implemented
    with :func:`os.makedirs` for portability.

    Args:
        path: directory to create
    """
    message = "mkdir -p " + path
    log.debug(message)
    os.makedirs(path, exist_ok=True)
111# =============================================================================
112# Change directories
113# =============================================================================
@contextmanager
def pushd(directory: str) -> Generator[None, None, None]:
    """
    Context manager: changes directory and restores the original on exit.

    The original working directory is restored even if the body of the
    ``with`` block raises an exception.

    Example:

    .. code-block:: python

        with pushd(new_directory):
            # do things

    Args:
        directory: directory to change into for the duration of the block
    """
    previous_dir = os.getcwd()
    os.chdir(directory)
    try:
        yield
    finally:
        # Runs on normal exit and when the with-body raises.
        os.chdir(previous_dir)
def preserve_cwd(func: Callable) -> Callable:
    """
    Decorator to preserve the current working directory in calls to the
    decorated function.

    The working directory is restored whether the decorated function returns
    normally or raises. The wrapper keeps the decorated function's metadata
    (name, docstring) via :func:`functools.wraps`.

    Example:

    .. code-block:: python

        @preserve_cwd
        def myfunc():
            os.chdir("/faraway")

        os.chdir("/home")
        myfunc()
        assert os.getcwd() == "/home"

    Args:
        func: function to wrap

    Returns:
        the wrapped function
    """
    # https://stackoverflow.com/questions/169070/python-how-do-i-write-a-decorator-that-restores-the-cwd  # noqa
    from functools import wraps  # local import keeps this block self-contained

    @wraps(func)
    def decorator(*args_, **kwargs) -> Any:
        cwd = os.getcwd()
        try:
            return func(*args_, **kwargs)
        finally:
            # Restore even if func() raised.
            os.chdir(cwd)

    return decorator
def root_path() -> str:
    """
    Returns the absolute form of the path separator, i.e. the root directory
    of the filesystem (of the current drive, on Windows).
    """
    # https://stackoverflow.com/questions/12041525
    separator = os.sep
    return os.path.abspath(separator)
167# =============================================================================
168# Copy or move things
169# =============================================================================
def copyglob(src: str, dest: str, allow_nothing: bool = False,
             allow_nonfiles: bool = False) -> None:
    """
    Copies everything matching the glob ``src`` into the directory ``dest``,
    using :func:`shutil.copy`.

    Args:
        src: source glob (e.g. ``/somewhere/*.txt``)
        dest: destination directory
        allow_nothing: don't raise an exception if nothing was copied
        allow_nonfiles: also attempt to copy matches that
            :func:`os.path.isfile` does not consider to be files

    Raises:
        ValueError: if nothing was copied and ``allow_nothing`` is not set
    """
    copied = 0
    for candidate in glob.glob(src):
        if not allow_nonfiles and not os.path.isfile(candidate):
            continue  # skip directories etc.
        shutil.copy(candidate, dest)
        copied += 1
    if copied == 0 and not allow_nothing:
        raise ValueError(f"No files found matching: {src}")
def moveglob(src: str, dest: str, allow_nothing: bool = False,
             allow_nonfiles: bool = False) -> None:
    """
    Moves everything matching the glob ``src`` into ``dest``, using
    :func:`shutil.move`. Counterpart of :func:`copyglob`.

    Args:
        src: source glob (e.g. ``/somewhere/*.txt``)
        dest: destination directory
        allow_nothing: don't raise an exception if nothing was moved
        allow_nonfiles: also move matches that :func:`os.path.isfile` does
            not consider to be files

    Raises:
        ValueError: if nothing was moved and ``allow_nothing`` is not set
    """
    moved = 0
    for candidate in glob.glob(src):
        if not allow_nonfiles and not os.path.isfile(candidate):
            continue  # skip directories etc.
        shutil.move(candidate, dest)
        moved += 1
    if moved == 0 and not allow_nothing:
        raise ValueError(f"No files found matching: {src}")
def copy_tree_root(src_dir: str, dest_parent: str) -> None:
    """
    Copies the directory ``src_dir`` (itself, not just its contents) so that
    it becomes a subdirectory of ``dest_parent``.

    For example, given

    .. code-block:: none

        /source/thing/a.txt
        /source/thing/b.txt
        /source/thing/somedir/c.txt

    the call

    .. code-block:: python

        copy_tree_root("/source/thing", "/dest")

    produces

    .. code-block:: none

        /dest/thing/a.txt
        /dest/thing/b.txt
        /dest/thing/somedir/c.txt

    Args:
        src_dir: directory to copy
        dest_parent: directory under which the copy is created
    """
    # normpath() strips any trailing separator so basename() is non-empty.
    leaf = os.path.basename(os.path.normpath(src_dir))
    shutil.copytree(src_dir, os.path.join(dest_parent, leaf))
def copy_tree_contents(srcdir: str, destdir: str,
                       destroy: bool = False) -> None:
    """
    Recursively copies the *contents* of ``srcdir`` so that they appear
    directly inside ``destdir`` (contrast :func:`copy_tree_root`).

    For example, given

    .. code-block:: none

        /source/thing/a.txt
        /source/thing/b.txt
        /source/thing/somedir/c.txt

    the call

    .. code-block:: python

        copy_tree_contents("/source/thing", "/dest")

    produces

    .. code-block:: none

        /dest/a.txt
        /dest/b.txt
        /dest/somedir/c.txt

    Args:
        srcdir: directory whose contents are copied
        destdir: directory to create and fill
        destroy: if ``destdir`` already exists, delete it first (otherwise an
            existing destination is an error)

    Raises:
        ValueError: if ``destdir`` exists and ``destroy`` is not set, or if
            it exists but is not a directory
    """
    log.info("Copying directory {} -> {}", srcdir, destdir)
    dest_present = os.path.exists(destdir)
    if dest_present and not destroy:
        raise ValueError("Destination exists!")
    if dest_present and not os.path.isdir(destdir):
        raise ValueError("Destination exists but isn't a directory!")
    if dest_present:
        log.debug("... removing old contents")
        rmtree(destdir)
    log.debug("... now copying")
    shutil.copytree(srcdir, destdir)
282# =============================================================================
283# Delete things
284# =============================================================================
def rmglob(pattern: str) -> None:
    """
    Removes, via :func:`os.remove`, every path returned by
    :func:`glob.glob` for ``pattern``.

    Args:
        pattern: glob describing the files to delete
    """
    matches = glob.glob(pattern)
    for victim in matches:
        os.remove(victim)
def purge(path: str, pattern: str) -> None:
    """
    Deletes every file returned by :func:`find` for ``pattern`` under
    ``path`` (matching is by :func:`fnmatch.fnmatch`), logging each deletion.

    Args:
        path: directory to search
        pattern: filename pattern to match
    """
    doomed = find(pattern, path)
    for filename in doomed:
        log.info("Deleting {}", filename)
        os.remove(filename)
def delete_files_within_dir(directory: str, filenames: List[str]) -> None:
    """
    Walks ``directory`` recursively and deletes every file whose name
    (without its directory part) is *exactly* equal to one of ``filenames``.

    Args:
        directory: directory to walk down
        filenames: bare filenames to delete wherever they are found
    """
    for parent, _subdirs, names in os.walk(directory):
        for name in names:
            if name not in filenames:
                continue
            fullpath = os.path.join(parent, name)
            log.debug("Deleting {!r}", fullpath)
            os.remove(fullpath)
# Type of the triple returned by sys.exc_info():
# https://docs.python.org/3/library/sys.html#sys.exc_info
EXC_INFO_TYPE = Tuple[
    Optional[Any],  # Type[BaseException]], but that's not in Python 3.5
    Optional[BaseException],
    Optional[TracebackType],  # it's a traceback object
]


def shutil_rmtree_onerror(func: Callable[[str], None],
                          path: str,
                          exc_info: EXC_INFO_TYPE) -> None:
    """
    Error handler for ``shutil.rmtree``.

    If ``path`` is not writable (e.g. a read-only file), its mode is set to
    ``stat.S_IWUSR`` and the failed operation ``func(path)`` is retried.

    If ``path`` is writable, the problem lies elsewhere and the original
    exception is re-raised.

    Usage: ``shutil.rmtree(path, onerror=shutil_rmtree_onerror)``

    See
    https://stackoverflow.com/questions/2656322/shutil-rmtree-fails-on-windows-with-access-is-denied

    Args:
        func: the function that failed; called again with ``path``
        path: the path that ``func`` was operating on
        exc_info: exception information for the failure, as returned by
            :func:`sys.exc_info`
    """  # noqa
    if os.access(path, os.W_OK):
        # Not a write-permission problem: propagate the original exception.
        raise exc_info[1]
    os.chmod(path, stat.S_IWUSR)
    func(path)
def rmtree(directory: str) -> None:
    """
    Removes ``directory`` and everything beneath it using
    :func:`shutil.rmtree`, with :func:`shutil_rmtree_onerror` installed as
    the error handler.

    Args:
        directory: directory tree to delete
    """
    log.debug("Deleting directory {!r}", directory)
    error_handler = shutil_rmtree_onerror
    shutil.rmtree(directory, onerror=error_handler)
359# =============================================================================
360# Change ownership or permissions
361# =============================================================================
def chown_r(path: str, user: str, group: str) -> None:
    """
    Performs a recursive ``chown``.

    Ownership is changed on ``path`` itself and then on every directory and
    file found beneath it by :func:`os.walk`.

    Args:
        path: path to walk down
        user: user name or ID
        group: group name or ID

    As per https://stackoverflow.com/questions/2853723
    """
    # Include the starting path itself, as chmod_r() in this module does.
    shutil.chown(path, user, group)
    for root, dirs, files in os.walk(path):
        for name in dirs + files:
            shutil.chown(os.path.join(root, name), user, group)
def chmod_r(root: str, permission: int) -> None:
    """
    Recursive ``chmod``.

    Applies ``permission`` to every directory and file beneath ``root`` and
    finally to ``root`` itself.

    Args:
        root: directory to walk down
        permission: e.g. ``stat.S_IWUSR``
    """
    # Walk bottom-up and change ``root`` last. If a directory were changed
    # before being listed, a mode without read/execute bits (such as
    # ``stat.S_IWUSR``) would stop os.walk() from descending into it, and
    # os.walk() ignores such errors by default, so nothing beneath it would
    # be changed.
    for dirpath, dirnames, filenames in os.walk(root, topdown=False):
        for name in dirnames + filenames:
            os.chmod(os.path.join(dirpath, name), permission)
    os.chmod(root, permission)
397# =============================================================================
398# Find files
399# =============================================================================
def find(pattern: str, path: str) -> List[str]:
    """
    Walks ``path`` recursively and collects every file whose bare filename
    matches ``pattern`` according to :func:`fnmatch.fnmatch`.

    Args:
        pattern: filename pattern, e.g. ``*.txt``
        path: directory to walk down

    Returns:
        list of matching filenames, each joined to the directory in which it
        was found
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
        if fnmatch.fnmatch(name, pattern)
    ]
def find_first(pattern: str, path: str) -> str:
    """
    Returns the first result of :func:`find` for ``pattern`` under ``path``.

    Args:
        pattern: filename pattern (matched via :func:`fnmatch.fnmatch`)
        path: directory to search

    Raises:
        IndexError: if nothing matches; the failure is logged at "critical"
            level before the exception propagates
    """
    try:
        matches = find(pattern, path)
        return matches[0]
    except IndexError:
        log.critical('''Couldn't find "{}" in "{}"''', pattern, path)
        raise
def gen_filenames(starting_filenames: List[str],
                  recursive: bool) -> Generator[str, None, None]:
    """
    Generates absolute filenames from a starting list of files and/or
    directories.

    Files in the list are yielded directly. Directories are walked (with
    :func:`os.walk`) only if ``recursive`` is set; otherwise they are
    skipped. Entries that are neither files nor directories are skipped.

    Args:
        starting_filenames: files and/or directories
        recursive: walk down any directories in the starting list,
            recursively?

    Yields:
        each filename, as an absolute path
    """
    for entry in starting_filenames:
        if os.path.isfile(entry):
            yield os.path.abspath(entry)
            continue
        if not (recursive and os.path.isdir(entry)):
            continue
        for parent, _subdirs, names in os.walk(entry):
            for name in names:
                yield os.path.abspath(os.path.join(parent, name))
450# =============================================================================
451# Check lock status
452# =============================================================================
def exists_locked(filepath: str) -> Tuple[bool, bool]:
    """
    Reports whether a file exists and, if so, whether it appears to be
    locked. The lock test is an attempt to open the file in append mode: if
    that raises :exc:`IOError`, the file is treated as locked.

    Args:
        filepath: file to check

    Returns:
        tuple: ``(exists, locked)``; ``locked`` is ``None`` when the file
        does not exist

    See https://www.calazan.com/how-to-check-if-a-file-is-locked-in-python/.
    """
    if not os.path.exists(filepath):
        return False, None
    buffer_size = 8
    try:
        # Append mode, so the existing contents are left untouched.
        handle = open(filepath, 'a', buffer_size)
    except IOError:
        return True, True  # exists, but could not be opened
    handle.close()
    return True, False  # exists and not locked
487# =============================================================================
488# Filename/path processing
489# =============================================================================
def relative_filename_within_dir(filename: str, directory: str) -> str:
    """
    Starting with a (typically absolute) ``filename``, returns the part of the
    filename that is relative to the directory ``directory``.
    If the file is *not* within the directory, returns an empty string.

    Both arguments are made absolute (via :func:`os.path.abspath`) before
    they are compared.

    Args:
        filename: filename to examine
        directory: directory that the result should be relative to

    Returns:
        the relative filename, or ``""`` if ``filename`` is not within
        ``directory``
    """
    filename = os.path.abspath(filename)
    directory = os.path.abspath(directory)
    try:
        common = os.path.commonpath([directory, filename])
    except ValueError:
        # commonpath() raises for paths on different drives (Windows), in
        # which case the file cannot be within the directory.
        return ""
    if common != directory:
        # Filename is not within directory
        return ""
    return os.path.relpath(filename, start=directory)
505# =============================================================================
506# Disk space
507# =============================================================================
def get_directory_contents_size(directory: str = ".") -> int:
    """
    Adds up the sizes of all files found by walking ``directory``
    recursively. Symbolic links are not counted.

    See
    https://stackoverflow.com/questions/1392413/calculating-a-directorys-size-using-python.

    Args:
        directory: directory to check

    Returns:
        int: size in bytes
    """  # noqa
    total = 0
    for parent, _subdirs, names in os.walk(directory):
        candidates = (os.path.join(parent, name) for name in names)
        total += sum(
            os.path.getsize(candidate)
            for candidate in candidates
            if not os.path.islink(candidate)  # skip symbolic links
        )
    return total