multitax.utils
"""Utility helpers for checking paths and downloading, opening and parsing files."""

import gzip
import io
import os
import tarfile
import urllib.error
import urllib.request
import zlib
import warnings
from collections import OrderedDict
from urllib.error import HTTPError


def check_dir(prefix: str):
    """
    Check that the directory that would contain **prefix** exists.

    Parameters:
    * **prefix** *[str]*: File path or file prefix

    Raises:
    * NotADirectoryError if the parent directory of the prefix is not an existing directory
    """
    abs_path = os.path.dirname(os.path.abspath(prefix))
    # isdir (not exists): a regular file in place of the directory is also an error
    if not os.path.isdir(abs_path):
        raise NotADirectoryError(abs_path)


def check_file(file: str):
    """
    Check that a file exists and is not empty.

    Parameters:
    * **file** *[str]*: File path

    Raises:
    * FileNotFoundError if the file does not exist or has size zero
    """
    if not os.path.isfile(file):
        raise FileNotFoundError(file + " file do not exist")
    if os.path.getsize(file) == 0:
        raise FileNotFoundError(file + " file is empty")


def check_no_file(file: str):
    """
    Check that a file does not exist yet.

    Parameters:
    * **file** *[str]*: File path

    Raises:
    * FileExistsError if the file already exists
    """
    if os.path.isfile(file):
        raise FileExistsError(file)


def close_files(fhs: dict):
    """
    Close every file handler of a dictionary.

    Parameters:
    * **fhs** *[dict]*: {file: file handler}

    Returns: Nothing
    """
    for fh in fhs.values():
        fh.close()


def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
    """
    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

    Parameters:
    * **urls** *[list]*: List of urls to download (text, ".gz", ".tar.gz", ".tgz"). A single str is also accepted
    * **output_prefix** *[str]*: Prefix prepended to each saved file name. If not provided, contents are streamed
    * **retry_attempts** *[int]*: Number of attempts before giving up

    Returns:
    * OrderedDict {file: file handler} (same order as input)

    Raises:
    * Exception if the files could not be obtained after all attempts
    """
    if isinstance(urls, str):
        urls = [urls]

    # Failures worth retrying: network errors (HTTPError is a URLError) and
    # corrupted/truncated payloads. gzip.BadGzipFile/EOFError are what the
    # peek() validity check raises for content that is not valid gzip.
    retryable = (
        urllib.error.URLError,
        zlib.error,
        tarfile.TarError,
        gzip.BadGzipFile,
        EOFError,
    )

    last_error = None
    for att in range(1, retry_attempts + 1):
        opened = []  # everything opened in this attempt, to close it on failure
        try:
            # If output is provided, save files and parse from disc
            if output_prefix:
                files = save_urls(urls, output_prefix)
                return open_files(files)

            # stream contents from url
            fhs = OrderedDict()
            for url in urls:
                if url.endswith((".tar.gz", ".tgz")):
                    # tar files have mixed headers and content
                    # whole file should be loaded in memory first and not streamed
                    fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
                    opened.append(fhs[url])
                elif url.endswith(".gz"):
                    response = urllib.request.urlopen(url)
                    opened.append(response)
                    fhs[url] = gzip.open(response, mode="rb")
                    opened.append(fhs[url])
                    fhs[url].peek(1)  # peek into file to check if is valid
                else:
                    fhs[url] = urllib.request.urlopen(url)
                    opened.append(fhs[url])
            return fhs
        except retryable as err:
            last_error = err
            # Do not leak the streams of the failed attempt
            for handle in reversed(opened):
                try:
                    handle.close()
                except OSError:
                    pass
            warnings.warn(
                "Download failed, trying again ("
                + str(att)
                + "/"
                + str(retry_attempts)
                + ")",
                UserWarning,
            )

    raise Exception(
        "One or more files could not be downloaded: " + ", ".join(urls)
    ) from last_error


def download_parse_data_gtdb(version, file, url):
    """
    Generator over the tab-separated fields of each line of a GTDB data file.

    Parameters:
    * **version**: Value interpolated into the default url (used only without file and url)
    * **file** *[str]*: Local file to parse. If provided, nothing is downloaded
    * **url** *[str]*: Url to download. If empty, a default url is built from the version

    Yields:
    * list of fields (str) for each line
    """
    if file:
        fhs = open_files(files=[file])
    else:
        if not url:
            url = f"https://github.com/pirovc/multitax/raw/refs/heads/main/data/gtdb/{version}_acc_rep_lin_ncbi.tsv.gz"
        fhs = download_files(urls=[url], retry_attempts=3)

    try:
        for fh in fhs.values():
            for line in fh:
                # streamed contents are bytes, files opened from disk are text
                if isinstance(line, bytes):
                    line = line.decode()
                yield line.rstrip().split("\t")
    finally:
        # also runs when the generator is closed before being exhausted
        close_files(fhs)


def filter_function(elements, function, value):
    """
    Select the elements for which **function(element)** equals **value**.

    Returns:
    * list of selected elements (input order)
    """
    return [element for element in elements if function(element) == value]


def format_repr(inst):
    """
    Build a repr string from the class name and the version, sources and datetime attributes of **inst**.
    """
    vals = [
        f"version={inst.version!r}",
        f"source={inst.sources!r}",
        f"datetime={inst.datetime!r}",
    ]
    return f"{inst.__class__.__name__}({', '.join(vals)})"


def join_check(elements, sep: str):
    """
    Join the str() of each element with **sep**. Returns an empty string if **elements** is empty or None.
    """
    if elements:
        return sep.join(map(str, elements))
    return ""


def load_url_mem(url: str):
    """
    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url
    """
    tmpfile = io.BytesIO()
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    # The context manager closes the stream even if a read fails
    with urllib.request.urlopen(url) as urlstream:
        for chunk in iter(lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""):
            tmpfile.write(chunk)
    tmpfile.seek(0)
    return tmpfile


def open_files(files: list):
    """
    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz"). A single str is also accepted

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    if isinstance(files, str):
        files = [files]

    fhs = OrderedDict()
    try:
        for file in files:
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except Exception:
        # Do not leak the handlers opened before the failure
        for fh in fhs.values():
            fh.close()
        raise
    return fhs


def reverse_dict(d: dict):
    """
    Invert a dictionary, grouping the keys by value.

    Returns:
    * dict {value: [keys]} (keys in input order)
    """
    rd = {}
    for k, v in d.items():
        rd.setdefault(v, []).append(k)
    return rd


def save_urls(urls: list, output_prefix: str):
    """
    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url to build the output file path

    Returns:
    * list of files saved

    Raises:
    * FileExistsError if an output file already exists (it is left untouched)
    """
    files = []
    try:
        for url in urls:
            outfile = output_prefix + os.path.basename(url)
            check_no_file(outfile)
            with urllib.request.urlopen(url) as urlstream:
                # registered before writing so a partial file is cleaned up on failure
                files.append(outfile)
                with open(outfile, "wb") as f:
                    for chunk in iter(
                        lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""
                    ):
                        f.write(chunk)
    except BaseException:
        # Remove only the files written by this call, so a later attempt
        # does not fail with FileExistsError on its own leftovers
        for saved in files:
            try:
                os.remove(saved)
            except OSError:
                pass
        raise
    return files


def warning_on_one_line(message, category, filename, lineno, file=None, line=None):
    """
    Format a warning as a single line: "filename:lineno: category: message".
    """
    return f"{filename}:{lineno}: {category.__name__}: {message}\n"


# Module-level hook: every warning is printed with the one-line format above
warnings.formatwarning = warning_on_one_line
def
check_dir(prefix: str):
def
check_file(file: str):
def
check_no_file(file: str):
def
close_files(fhs: dict):
def close_files(fhs: dict):
    """
    Close all file handlers stored as values of a dictionary.

    Parameters:
    * **fhs** *[dict]*: {file: file handler}

    Returns: Nothing
    """
    handlers = fhs.values()
    for handler in handlers:
        handler.close()
Parameters:
- fhs [dict]: {file: file handler}
Returns: Nothing
def
download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
    """
    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

    Parameters:
    * **urls** *[list]*: List of urls to download (text, ".gz", ".tar.gz", ".tgz"). A single str is also accepted
    * **output_prefix** *[str]*: Prefix prepended to each saved file name. If not provided, contents are streamed
    * **retry_attempts** *[int]*: Number of attempts before giving up

    Returns:
    * OrderedDict {file: file handler} (same order as input)

    Raises:
    * Exception if the files could not be obtained after all attempts
    """
    if isinstance(urls, str):
        urls = [urls]

    # Failures worth retrying: network errors (HTTPError is a URLError) and
    # corrupted/truncated payloads. gzip.BadGzipFile/EOFError are what the
    # peek() validity check raises for content that is not valid gzip.
    retryable = (
        urllib.error.URLError,
        zlib.error,
        tarfile.TarError,
        gzip.BadGzipFile,
        EOFError,
    )

    last_error = None
    for att in range(1, retry_attempts + 1):
        opened = []  # everything opened in this attempt, to close it on failure
        try:
            # If output is provided, save files and parse from disc
            if output_prefix:
                files = save_urls(urls, output_prefix)
                return open_files(files)

            # stream contents from url
            fhs = OrderedDict()
            for url in urls:
                if url.endswith((".tar.gz", ".tgz")):
                    # tar files have mixed headers and content
                    # whole file should be loaded in memory first and not streamed
                    fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
                    opened.append(fhs[url])
                elif url.endswith(".gz"):
                    response = urllib.request.urlopen(url)
                    opened.append(response)
                    fhs[url] = gzip.open(response, mode="rb")
                    opened.append(fhs[url])
                    fhs[url].peek(1)  # peek into file to check if is valid
                else:
                    fhs[url] = urllib.request.urlopen(url)
                    opened.append(fhs[url])
            return fhs
        except retryable as err:
            last_error = err
            # Do not leak the streams of the failed attempt
            for handle in reversed(opened):
                try:
                    handle.close()
                except OSError:
                    pass
            warnings.warn(
                "Download failed, trying again ("
                + str(att)
                + "/"
                + str(retry_attempts)
                + ")",
                UserWarning,
            )

    raise Exception(
        "One or more files could not be downloaded: " + ", ".join(urls)
    ) from last_error
Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)
Parameters:
- urls [list]: List of files to download (text, ".gz", ".tar.gz", ".tgz")
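- output_prefix [str]: Prefix prepended to each downloaded file name when saving to disk (end it with a path separator to save into a directory); if not provided, contents are streamed instead of saved
- retry_attempts [int]: Number of download attempts before giving up (default: 1)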
Returns:
- OrderedDict {file: file handler} (same order as input)
def
download_parse_data_gtdb(version, file, url):
def download_parse_data_gtdb(version, file, url):
    """
    Generator over the tab-separated fields of each line of a GTDB data file.

    Parameters:
    * **version**: Value interpolated into the default url (used only without file and url)
    * **file** *[str]*: Local file to parse. If provided, nothing is downloaded
    * **url** *[str]*: Url to download. If empty, a default url is built from the version

    Yields:
    * list of fields (str) for each line
    """
    if file:
        fhs = open_files(files=[file])
    else:
        if not url:
            url = f"https://github.com/pirovc/multitax/raw/refs/heads/main/data/gtdb/{version}_acc_rep_lin_ncbi.tsv.gz"
        fhs = download_files(urls=[url], retry_attempts=3)

    try:
        for fh in fhs.values():
            for line in fh:
                # streamed contents are bytes, files opened from disk are text
                if isinstance(line, bytes):
                    line = line.decode()
                yield line.rstrip().split("\t")
    finally:
        # also runs when the generator is closed before being exhausted
        close_files(fhs)
def
filter_function(elements, function, value):
def
format_repr(inst):
def
join_check(elements, sep: str):
def
load_url_mem(url: str):
def load_url_mem(url: str):
    """
    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url
    """
    tmpfile = io.BytesIO()
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    # The context manager closes the stream even if a read fails
    with urllib.request.urlopen(url) as urlstream:
        for chunk in iter(lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""):
            tmpfile.write(chunk)
    tmpfile.seek(0)
    return tmpfile
Parameters:
- url [str]: URL to load into memory
Returns:
- io.BytesIO of the requested url
def
open_files(files: list):
def open_files(files: list):
    """
    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz"). A single str is also accepted

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    # A bare str would otherwise be iterated character by character
    if isinstance(files, str):
        files = [files]

    fhs = OrderedDict()
    try:
        for file in files:
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except Exception:
        # Do not leak the handlers opened before the failure
        for fh in fhs.values():
            fh.close()
        raise
    return fhs
Parameters:
- files [list]: List of files to open (text, ".gz", ".tar.gz", ".tgz")
Returns:
- OrderedDict {file: file handler} (same order as input)
def
reverse_dict(d: dict):
def
save_urls(urls: list, output_prefix: str):
def save_urls(urls: list, output_prefix: str):
    """
    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url to build the output file path

    Returns:
    * list of files saved

    Raises:
    * FileExistsError if an output file already exists (it is left untouched)
    """
    files = []
    try:
        for url in urls:
            outfile = output_prefix + os.path.basename(url)
            check_no_file(outfile)
            with urllib.request.urlopen(url) as urlstream:
                # registered before writing so a partial file is cleaned up on failure
                files.append(outfile)
                with open(outfile, "wb") as f:
                    # copy in chunks instead of holding the whole payload in memory
                    for chunk in iter(
                        lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""
                    ):
                        f.write(chunk)
    except BaseException:
        # Remove only the files written by this call, so a later attempt
        # does not fail with FileExistsError on its own leftovers
        for saved in files:
            try:
                os.remove(saved)
            except OSError:
                pass
        raise
    return files
Parameters:
- urls [list]: List of urls to download
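- output_prefix [str]: Prefix prepended to the base name of each url to build the output file path (end it with a path separator to save into a directory)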
Returns:
- list of files saved
def
warning_on_one_line(message, category, filename, lineno, file=None, line=None):