multitax.utils
import gzip
import io
import os
import tarfile
import urllib.request
import zlib
import warnings
from collections import OrderedDict
from urllib.error import HTTPError


def check_dir(prefix: str):
    """
    Check that the directory part of an output prefix exists.

    Parameters:
    * **prefix** *[str]*: Output prefix (its parent directory is checked)

    Returns: Nothing

    Raises:
    * NotADirectoryError if the parent of the prefix is not an existing directory
    """
    abs_path = os.path.dirname(os.path.abspath(prefix))
    # isdir (not exists): a regular file sitting at the parent path is not usable either
    if not os.path.isdir(abs_path):
        raise NotADirectoryError(abs_path)


def check_file(file: str):
    """
    Check that a file exists and is not empty.

    Parameters:
    * **file** *[str]*: Path of the file to check

    Returns: Nothing

    Raises:
    * FileNotFoundError if the file is missing or has size zero
    """
    if not os.path.isfile(file):
        raise FileNotFoundError(file + " file do not exist")
    if os.path.getsize(file) == 0:
        raise FileNotFoundError(file + " file is empty")


def check_no_file(file: str):
    """
    Check that a file does not exist yet.

    Parameters:
    * **file** *[str]*: Path of the file to check

    Returns: Nothing

    Raises:
    * FileExistsError if the file already exists
    """
    if os.path.isfile(file):
        raise FileExistsError(file)


def close_files(fhs: dict):
    """
    Close every file handler of a mapping.

    Parameters:
    * **fhs** *[dict]*: {file: file handler}

    Returns: Nothing
    """
    for fh in fhs.values():
        fh.close()


def _close_all(handles):
    """Best-effort close of every handle, most recently opened first."""
    for handle in reversed(handles):
        try:
            handle.close()
        except Exception:
            # Already on an error path: a failing close must not mask the original error
            pass


def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
    """
    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

    Parameters:
    * **urls** *[list]*: List of files to download (text, ".gz", ".tar.gz", ".tgz"), a single str is accepted
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url; if set, files are saved to disk and opened from there
    * **retry_attempts** *[int]*: Maximum number of download attempts

    Returns:
    * OrderedDict {file: file handler} (same order as input)

    Raises:
    * Exception if no attempt succeeded
    """
    if isinstance(urls, str):
        urls = [urls]

    att = 0
    while att < retry_attempts:
        att += 1
        # Everything opened in this attempt, so that a failed attempt can release it
        opened = []
        try:
            # If output is provided, save files and parse from disc
            if output_prefix:
                files = save_urls(urls, output_prefix)
                return open_files(files)

            # stream contents from url
            fhs = OrderedDict()
            for url in urls:
                if url.endswith((".tar.gz", ".tgz")):
                    # tar files have mixed headers and content
                    # whole file should be loaded in memory first and not streamed
                    fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
                    opened.append(fhs[url])
                elif url.endswith(".gz"):
                    # Tracked separately: closing a GzipFile does not close the
                    # file object it was given
                    stream = urllib.request.urlopen(url)
                    opened.append(stream)
                    fhs[url] = gzip.open(stream, mode="rb")
                    opened.append(fhs[url])
                    fhs[url].peek(1)  # peek into file to check if is valid
                else:
                    fhs[url] = urllib.request.urlopen(url)
                    opened.append(fhs[url])
            return fhs
        except (HTTPError, zlib.error, tarfile.TarError):
            # NOTE(review): a bad gzip header raises gzip.BadGzipFile (an OSError),
            # which is not retried here - confirm whether that is intended
            _close_all(opened)
            warnings.warn(
                f"Download failed, trying again ({att}/{retry_attempts})",
                UserWarning,
            )
        except BaseException:
            # Not a retryable error: release what was opened and let it propagate
            _close_all(opened)
            raise

    raise Exception("One or more files could not be downloaded: " + ", ".join(urls))


def filter_function(elements, function, value):
    """
    Keep the elements for which function(element) equals value.

    Parameters:
    * **elements**: Elements to filter
    * **function**: Callable applied to each element
    * **value**: Value the result of the callable is compared to

    Returns:
    * list of the matching elements (same order as input)
    """
    return [element for element in elements if function(element) == value]


def format_repr(inst):
    """
    Build a repr string from the version, sources and datetime attributes of an instance.

    Parameters:
    * **inst**: Object with attributes version, sources and datetime

    Returns:
    * str "ClassName(version=..., source=..., datetime=...)"
    """
    vals = [
        f"version={repr(inst.version)}",
        f"source={repr(inst.sources)}",
        f"datetime={repr(inst.datetime)}",
    ]
    return f"{inst.__class__.__name__}({', '.join(vals)})"


def join_check(elements, sep: str):
    """
    Join the str() of each element with a separator.

    Parameters:
    * **elements**: Elements to join (may be empty or None)
    * **sep** *[str]*: Separator

    Returns:
    * str with the joined elements, "" if elements is empty/None
    """
    return sep.join(map(str, elements)) if elements else ""


def load_url_mem(url: str):
    """
    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url
    """
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    tmpfile = io.BytesIO()
    # with: the connection is closed even if a read fails halfway
    with urllib.request.urlopen(url) as urlstream:
        while True:
            s = urlstream.read(io.DEFAULT_BUFFER_SIZE)
            if not s:
                break
            tmpfile.write(s)
    tmpfile.seek(0)
    return tmpfile


def open_files(files: list):
    """
    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz")

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    fhs = OrderedDict()
    try:
        for file in files:
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except BaseException:
        # The caller never receives fhs on failure, so release what was opened
        _close_all(list(fhs.values()))
        raise
    return fhs


def reverse_dict(d: dict):
    """
    Invert a mapping, grouping the keys that share a value.

    Parameters:
    * **d** *[dict]*: {key: value}

    Returns:
    * dict {value: [keys]}
    """
    rd = {}
    for k, v in d.items():
        rd.setdefault(v, []).append(k)
    return rd


def save_urls(urls: list, output_prefix: str):
    """
    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url

    Returns:
    * list of files saved

    Raises:
    * FileExistsError if an output file already exists
    """
    files = []
    for url in urls:
        outfile = output_prefix + os.path.basename(url)
        check_no_file(outfile)
        with urllib.request.urlopen(url) as urlstream:
            try:
                with open(outfile, "wb") as f:
                    # Copy in chunks instead of holding the whole payload in memory
                    while True:
                        chunk = urlstream.read(io.DEFAULT_BUFFER_SIZE)
                        if not chunk:
                            break
                        f.write(chunk)
            except BaseException:
                # Do not leave a truncated file behind: it would look like a valid
                # download and make the next call fail in check_no_file
                if os.path.isfile(outfile):
                    os.remove(outfile)
                raise
        files.append(outfile)
    return files


def warning_on_one_line(message, category, filename, lineno, file=None, line=None):
    """
    Format a warning as a single line "filename:lineno: Category: message".
    The file and line arguments are accepted but not used.
    """
    return f"{filename}:{lineno}: {category.__name__}: {message}\n"


# Module-level side effect: every warning is printed with the one-line format
warnings.formatwarning = warning_on_one_line
def
check_dir(prefix: str):
def
check_file(file: str):
def
check_no_file(file: str):
def
close_files(fhs: dict):
def close_files(fhs: dict):
    """
    Close all file handlers stored in a mapping.

    Parameters:
    * **fhs** *[dict]*: {file: file handler}

    Returns: Nothing
    """
    for name in fhs:
        fhs[name].close()
Parameters:
- fhs [dict]: {file: file handler}
Returns: Nothing
def
download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
def _close_all(handles):
    """Best-effort close of every handle, most recently opened first."""
    for handle in reversed(handles):
        try:
            handle.close()
        except Exception:
            # Already on an error path: a failing close must not mask the original error
            pass


def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
    """
    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

    Parameters:
    * **urls** *[list]*: List of files to download (text, ".gz", ".tar.gz", ".tgz"), a single str is accepted
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url; if set, files are saved to disk and opened from there
    * **retry_attempts** *[int]*: Maximum number of download attempts

    Returns:
    * OrderedDict {file: file handler} (same order as input)

    Raises:
    * Exception if no attempt succeeded
    """
    if isinstance(urls, str):
        urls = [urls]

    att = 0
    while att < retry_attempts:
        att += 1
        # Everything opened in this attempt, so that a failed attempt can release it
        opened = []
        try:
            # If output is provided, save files and parse from disc
            if output_prefix:
                files = save_urls(urls, output_prefix)
                return open_files(files)

            # stream contents from url
            fhs = OrderedDict()
            for url in urls:
                if url.endswith((".tar.gz", ".tgz")):
                    # tar files have mixed headers and content
                    # whole file should be loaded in memory first and not streamed
                    fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
                    opened.append(fhs[url])
                elif url.endswith(".gz"):
                    # Tracked separately: closing a GzipFile does not close the
                    # file object it was given
                    stream = urllib.request.urlopen(url)
                    opened.append(stream)
                    fhs[url] = gzip.open(stream, mode="rb")
                    opened.append(fhs[url])
                    fhs[url].peek(1)  # peek into file to check if is valid
                else:
                    fhs[url] = urllib.request.urlopen(url)
                    opened.append(fhs[url])
            return fhs
        except (HTTPError, zlib.error, tarfile.TarError):
            # NOTE(review): a bad gzip header raises gzip.BadGzipFile (an OSError),
            # which is not retried here - confirm whether that is intended
            _close_all(opened)
            warnings.warn(
                f"Download failed, trying again ({att}/{retry_attempts})",
                UserWarning,
            )
        except BaseException:
            # Not a retryable error: release what was opened and let it propagate
            _close_all(opened)
            raise

    raise Exception("One or more files could not be downloaded: " + ", ".join(urls))
Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)
Parameters:
- urls [list]: List of files to download (text, ".gz", ".tar.gz", ".tgz")
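- output_prefix [str]: Path prefix prepended to the name of each downloaded file (end it with a path separator to save into a directory); if omitted, files are streamed instead of saved
- retry_attempts [int]: Maximum number of download attempts (default 1)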
Returns:
- OrderedDict {file: file handler} (same order as input)
def
filter_function(elements, function, value):
def
format_repr(inst):
def
join_check(elements, sep: str):
def
load_url_mem(url: str):
def load_url_mem(url: str):
    """
    Read the whole content of a URL into an in-memory buffer.

    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url (positioned at the start)
    """
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    tmpfile = io.BytesIO()
    # with: the connection is closed even if a read fails halfway
    with urllib.request.urlopen(url) as urlstream:
        while True:
            s = urlstream.read(io.DEFAULT_BUFFER_SIZE)
            if not s:
                break
            tmpfile.write(s)
    tmpfile.seek(0)
    return tmpfile
Parameters:
- url [str]: URL to load into memory
Returns:
- io.BytesIO of the requested url
def
open_files(files: list):
def open_files(files: list):
    """
    Open local files, picking the reader from the file extension.

    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz")

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    fhs = OrderedDict()
    try:
        for file in files:
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except BaseException:
        # The caller never receives fhs on failure, so release what was opened
        for fh in fhs.values():
            fh.close()
        raise
    return fhs
Parameters:
- files [list]: List of files to open (text, ".gz", ".tar.gz", ".tgz")
Returns:
- OrderedDict {file: file handler} (same order as input)
def
reverse_dict(d: dict):
def
save_urls(urls: list, output_prefix: str):
def save_urls(urls: list, output_prefix: str):
    """
    Download each url to disk as output_prefix + base name of the url.

    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url

    Returns:
    * list of files saved

    Raises:
    * FileExistsError if an output file already exists
    """
    files = []
    for url in urls:
        outfile = output_prefix + os.path.basename(url)
        check_no_file(outfile)
        with urllib.request.urlopen(url) as urlstream:
            try:
                with open(outfile, "wb") as f:
                    # Copy in chunks instead of holding the whole payload in memory
                    while True:
                        chunk = urlstream.read(io.DEFAULT_BUFFER_SIZE)
                        if not chunk:
                            break
                        f.write(chunk)
            except BaseException:
                # Do not leave a truncated file behind: it would look like a valid
                # download and make the next call fail in check_no_file
                if os.path.isfile(outfile):
                    os.remove(outfile)
                raise
        files.append(outfile)
    return files
Parameters:
- urls [list]: List of urls to download
- output_prefix [str]: Path prefix prepended to the name of each downloaded file (end it with a path separator to save into a directory)
Returns:
- list of files saved
def
warning_on_one_line(message, category, filename, lineno, file=None, line=None):