multitax.utils

  1import gzip
  2import io
  3import os
  4import tarfile
  5import urllib.request
  6import zlib
  7import warnings
  8from collections import OrderedDict
  9from urllib.error import HTTPError
 10
 11
 12def check_dir(prefix: str):
 13    abs_path = os.path.dirname(os.path.abspath(prefix))
 14    if not os.path.exists(abs_path):
 15        raise NotADirectoryError(abs_path)
 16
 17
 18def check_file(file: str):
 19    if not os.path.isfile(file):
 20        raise FileNotFoundError(file + " file do not exist")
 21    if os.path.getsize(file) == 0:
 22        raise FileNotFoundError(file + " file is empty")
 23
 24
 25def check_no_file(file: str):
 26    if os.path.isfile(file):
 27        raise FileExistsError(file)
 28
 29
 30def close_files(fhs: dict):
 31    """
 32    Parameters:
 33    * **fhs** *[dict]*: {file: file handler}
 34
 35    Returns: Nothing
 36    """
 37    for fh in fhs.values():
 38        fh.close()
 39
 40
 41def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
 42    """
 43    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)
 44
 45    Parameters:
 46    * **urls** *[list]*: List of files to download (text, ".gz", ".tar.gz", ".tgz")
 47    * **output_prefix** *[str]*: Output directory to save files
 48
 49    Returns:
 50    * OrderedDict {file: file handler} (same order as input)
 51    """
 52    if isinstance(urls, str):
 53        urls = [urls]
 54
 55    att = 0
 56    while att < retry_attempts:
 57        att += 1
 58        try:
 59            # If output is provided, save files and parse from disc
 60            if output_prefix:
 61                files = save_urls(urls, output_prefix)
 62                return open_files(files)
 63            else:
 64                # stream contents from url
 65                fhs = OrderedDict()
 66                for url in urls:
 67                    if url.endswith(".tar.gz") or url.endswith(".tgz"):
 68                        # tar files have mixed headers and content
 69                        # whole file should be loaded in memory first and not streamed
 70                        fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
 71                    elif url.endswith(".gz"):
 72                        fhs[url] = gzip.open(urllib.request.urlopen(url), mode="rb")
 73                        fhs[url].peek(1)  # peek into file to check if is valid
 74                    else:
 75                        fhs[url] = urllib.request.urlopen(url)
 76
 77                return fhs
 78        except (HTTPError, zlib.error, tarfile.TarError):
 79            warnings.warn(
 80                "Download failed, trying again ("
 81                + str(att)
 82                + "/"
 83                + str(retry_attempts)
 84                + ")",
 85                UserWarning,
 86            )
 87
 88    raise Exception("One or more files could not be downloaded: " + ", ".join(urls))
 89
 90
 91def download_parse_data_gtdb(version, file, url):
 92    if file:
 93        fhs = open_files(files=[file])
 94    else:
 95        if not url:
 96            url = f"https://github.com/pirovc/multitax/raw/refs/heads/main/data/gtdb/{version}_acc_rep_lin_ncbi.tsv.gz"
 97        fhs = download_files(urls=[url], retry_attempts=3)
 98
 99    for fh in fhs.values():
100        for line in fh:
101            try:
102                yield line.rstrip().split("\t")
103            except TypeError:
104                yield line.decode().rstrip().split("\t")
105
106
def filter_function(elements, function, value):
    """
    Select the elements for which a function returns a given value

    Parameters:
    * **elements**: Iterable of elements to filter (does not need to be indexable)
    * **function**: Callable applied to each element
    * **value**: Value the function result is compared against (==)

    Returns:
    * list of matching elements, in iteration order
    """
    return [element for element in elements if function(element) == value]
109
110
def format_repr(inst):
    """
    Build a one-line representation of an instance

    Parameters:
    * **inst**: Object with the attributes version, sources and datetime

    Returns:
    * str in the form ClassName(version=..., source=..., datetime=...)
    """
    labelled = (
        ("version", inst.version),
        ("source", inst.sources),
        ("datetime", inst.datetime),
    )
    rendered = ", ".join(f"{label}={content!r}" for label, content in labelled)
    return f"{inst.__class__.__name__}({rendered})"
118
119
def join_check(elements, sep: str):
    """
    Join the string form of the elements, tolerating empty input

    Parameters:
    * **elements**: Elements to join (each one is converted with str)
    * **sep** *[str]*: Separator placed between elements

    Returns:
    * joined str, or "" if elements is falsy
    """
    return sep.join(str(element) for element in elements) if elements else ""
125
126
def load_url_mem(url: str):
    """
    Load the full contents of a url into memory

    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url (positioned at the start)
    """
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    tmpfile = io.BytesIO()
    # The with block closes the url stream even if a read fails
    with urllib.request.urlopen(url) as urlstream:
        while True:
            s = urlstream.read(io.DEFAULT_BUFFER_SIZE)
            if not s:
                break
            tmpfile.write(s)
    tmpfile.seek(0)
    return tmpfile
146
147
def open_files(files: list):
    """
    Open local files for reading according to their extension

    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz"). A single str is treated as a one-element list

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    # Same convenience as download_files: a bare str would otherwise be
    # iterated character by character
    if isinstance(files, str):
        files = [files]

    fhs = OrderedDict()
    try:
        for file in files:
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except BaseException:
        # Do not leak the handlers opened before the failure
        for fh in fhs.values():
            fh.close()
        raise
    return fhs
166
167
def reverse_dict(d: dict):
    """
    Invert a dictionary, grouping the keys that share the same value

    Parameters:
    * **d** *[dict]*: {key: value} (values must be hashable)

    Returns:
    * dict {value: [keys]} (keys in the iteration order of the input)
    """
    inverted = {}
    for key, val in d.items():
        inverted.setdefault(val, []).append(key)
    return inverted
175
176
def save_urls(urls: list, output_prefix: str):
    """
    Download urls and write each one to disk

    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url to build the output file name

    Returns:
    * list of files saved

    Raises:
    * FileExistsError if an output file already exists (multitax.utils.check_no_file)
    """
    files = []
    try:
        for url in urls:
            outfile = output_prefix + os.path.basename(url)
            check_no_file(outfile)
            # The with blocks close the url stream and the file even on errors
            with urllib.request.urlopen(url) as urlstream:
                # Register before writing so a partial file is cleaned up on failure
                files.append(outfile)
                with open(outfile, "wb") as f:
                    # Copy in chunks instead of holding the whole download in memory
                    while True:
                        chunk = urlstream.read(io.DEFAULT_BUFFER_SIZE)
                        if not chunk:
                            break
                        f.write(chunk)
    except BaseException:
        # Remove what this call wrote, so that a later call for the same urls
        # (e.g. a retry) is not rejected by check_no_file
        for saved in files:
            if os.path.isfile(saved):
                os.remove(saved)
        raise
    return files
196
197
def warning_on_one_line(message, category, filename, lineno, file=None, line=None):
    """
    Format a warning as a single line: "filename:lineno: category: message"

    Parameters follow the signature of warnings.formatwarning; **file** and **line** are accepted but not used.

    Returns:
    * str with the formatted warning, ending with a newline
    """
    return f"{filename!s}:{lineno!s}: {category.__name__!s}: {message!s}\n"


# Use the one-line format for every warning issued through the warnings module
warnings.formatwarning = warning_on_one_line
def check_dir(prefix: str):
    """
    Check that the directory part of the absolute path of a prefix is an existing directory

    Parameters:
    * **prefix** *[str]*: Path prefix (directory plus optional file name prefix)

    Returns: Nothing

    Raises:
    * NotADirectoryError if the directory part is missing or is not a directory
    """
    abs_path = os.path.dirname(os.path.abspath(prefix))
    # isdir (instead of exists) also rejects a parent path that is a regular file
    if not os.path.isdir(abs_path):
        raise NotADirectoryError(abs_path)
def check_file(file: str):
    """
    Check that a file exists and is not empty

    Parameters:
    * **file** *[str]*: Path of the file to check

    Returns: Nothing

    Raises:
    * FileNotFoundError if the path is not a regular file or its size is 0
    """
    if os.path.isfile(file):
        size_in_bytes = os.path.getsize(file)
        if size_in_bytes == 0:
            raise FileNotFoundError(file + " file is empty")
    else:
        raise FileNotFoundError(file + " file do not exist")
def check_no_file(file: str):
    """
    Check that no regular file exists at the given path

    Parameters:
    * **file** *[str]*: Path that must not point to an existing file

    Returns: Nothing

    Raises:
    * FileExistsError if a regular file already exists at the path
    """
    already_present = os.path.isfile(file)
    if already_present:
        raise FileExistsError(file)
def close_files(fhs: dict):
    """
    Close every file handler stored in a dictionary

    Parameters:
    * **fhs** *[dict]*: {file: file handler}

    Returns: Nothing
    """
    for name in fhs:
        handler = fhs[name]
        handler.close()

Parameters:

  • fhs [dict]: {file: file handler}

Returns: Nothing

def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
    """
    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

    Parameters:
    * **urls** *[list]*: List of files to download (text, ".gz", ".tar.gz", ".tgz"). A single str is treated as a one-element list
    * **output_prefix** *[str]*: Prefix prepended to each file name to save files to disk. If empty/None, contents are streamed or loaded in memory
    * **retry_attempts** *[int]*: Maximum number of download attempts

    Returns:
    * OrderedDict {file: file handler} (same order as input)

    Raises:
    * Exception if no attempt succeeded (each one failed with HTTPError, zlib.error or tarfile.TarError, or retry_attempts < 1)
    """
    if isinstance(urls, str):
        urls = [urls]

    for att in range(1, retry_attempts + 1):
        # Every stream opened in this attempt, so a failed attempt can release them
        opened = []
        completed = False
        try:
            # If output is provided, save files and parse from disk
            if output_prefix:
                files = save_urls(urls, output_prefix)
                fhs = open_files(files)
            else:
                # stream contents from url
                fhs = OrderedDict()
                for url in urls:
                    if url.endswith((".tar.gz", ".tgz")):
                        # tar files have mixed headers and content
                        # whole file should be loaded in memory first and not streamed
                        fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
                        opened.append(fhs[url])
                    elif url.endswith(".gz"):
                        # Track the raw response as well: closing the gzip wrapper
                        # does not close a file object that was passed in
                        response = urllib.request.urlopen(url)
                        opened.append(response)
                        fhs[url] = gzip.open(response, mode="rb")
                        opened.append(fhs[url])
                        fhs[url].peek(1)  # peek into file to check if is valid
                    else:
                        fhs[url] = urllib.request.urlopen(url)
                        opened.append(fhs[url])
            completed = True
            return fhs
        except (HTTPError, zlib.error, tarfile.TarError):
            warnings.warn(
                "Download failed, trying again ("
                + str(att)
                + "/"
                + str(retry_attempts)
                + ")",
                UserWarning,
            )
        finally:
            if not completed:
                # Close wrappers before the streams they wrap
                for stream in reversed(opened):
                    stream.close()

    raise Exception("One or more files could not be downloaded: " + ", ".join(urls))

Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

Parameters:

  • urls [list]: List of files to download (text, ".gz", ".tar.gz", ".tgz")
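  • output_prefix [str]: Prefix prepended to each file name to save files to disk; if not given, contents are streamed or loaded in memory
  • retry_attempts [int]: Maximum number of download attempts (default: 1)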

Returns:

  • OrderedDict {file: file handler} (same order as input)
def download_parse_data_gtdb(version, file, url):
    """
    Yield the tab-separated fields of every line of a local or downloaded file

    Parameters:
    * **version**: Value inserted in the default url (only used when neither file nor url is given)
    * **file**: Local file to parse. If set, nothing is downloaded
    * **url**: Url to download when no file is given. If empty, a default url is built from the version

    Returns:
    * generator of lists of str (one list of fields per line)
    """
    if file:
        fhs = open_files(files=[file])
    else:
        if not url:
            url = f"https://github.com/pirovc/multitax/raw/refs/heads/main/data/gtdb/{version}_acc_rep_lin_ncbi.tsv.gz"
        fhs = download_files(urls=[url], retry_attempts=3)

    try:
        for fh in fhs.values():
            for line in fh:
                # Some handlers yield bytes, others str
                if isinstance(line, bytes):
                    line = line.decode()
                yield line.rstrip().split("\t")
    finally:
        # Release the handlers once the generator is exhausted or closed
        for fh in fhs.values():
            fh.close()
def filter_function(elements, function, value):
    """
    Select the elements for which a function returns a given value

    Parameters:
    * **elements**: Iterable of elements to filter (does not need to be indexable)
    * **function**: Callable applied to each element
    * **value**: Value the function result is compared against (==)

    Returns:
    * list of matching elements, in iteration order
    """
    return [element for element in elements if function(element) == value]
def format_repr(inst):
    """
    Build a one-line representation of an instance

    Parameters:
    * **inst**: Object with the attributes version, sources and datetime

    Returns:
    * str in the form ClassName(version=..., source=..., datetime=...)
    """
    labelled = (
        ("version", inst.version),
        ("source", inst.sources),
        ("datetime", inst.datetime),
    )
    rendered = ", ".join(f"{label}={content!r}" for label, content in labelled)
    return f"{inst.__class__.__name__}({rendered})"
def join_check(elements, sep: str):
    """
    Join the string form of the elements, tolerating empty input

    Parameters:
    * **elements**: Elements to join (each one is converted with str)
    * **sep** *[str]*: Separator placed between elements

    Returns:
    * joined str, or "" if elements is falsy
    """
    return sep.join(str(element) for element in elements) if elements else ""
def load_url_mem(url: str):
    """
    Load the full contents of a url into memory

    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url (positioned at the start)
    """
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    tmpfile = io.BytesIO()
    # The with block closes the url stream even if a read fails
    with urllib.request.urlopen(url) as urlstream:
        while True:
            s = urlstream.read(io.DEFAULT_BUFFER_SIZE)
            if not s:
                break
            tmpfile.write(s)
    tmpfile.seek(0)
    return tmpfile

Parameters:

  • url [str]: URL to load into memory

Returns:

  • io.BytesIO of the requested url
def open_files(files: list):
    """
    Open local files for reading according to their extension

    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz"). A single str is treated as a one-element list

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    # Same convenience as download_files: a bare str would otherwise be
    # iterated character by character
    if isinstance(files, str):
        files = [files]

    fhs = OrderedDict()
    try:
        for file in files:
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except BaseException:
        # Do not leak the handlers opened before the failure
        for fh in fhs.values():
            fh.close()
        raise
    return fhs

Parameters:

  • files [list]: List of files to open (text, ".gz", ".tar.gz", ".tgz")

Returns:

  • OrderedDict {file: file handler} (same order as input)
def reverse_dict(d: dict):
    """
    Invert a dictionary, grouping the keys that share the same value

    Parameters:
    * **d** *[dict]*: {key: value} (values must be hashable)

    Returns:
    * dict {value: [keys]} (keys in the iteration order of the input)
    """
    inverted = {}
    for key, val in d.items():
        inverted.setdefault(val, []).append(key)
    return inverted
def save_urls(urls: list, output_prefix: str):
    """
    Download urls and write each one to disk

    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url to build the output file name

    Returns:
    * list of files saved

    Raises:
    * FileExistsError if an output file already exists (multitax.utils.check_no_file)
    """
    files = []
    try:
        for url in urls:
            outfile = output_prefix + os.path.basename(url)
            check_no_file(outfile)
            # The with blocks close the url stream and the file even on errors
            with urllib.request.urlopen(url) as urlstream:
                # Register before writing so a partial file is cleaned up on failure
                files.append(outfile)
                with open(outfile, "wb") as f:
                    # Copy in chunks instead of holding the whole download in memory
                    while True:
                        chunk = urlstream.read(io.DEFAULT_BUFFER_SIZE)
                        if not chunk:
                            break
                        f.write(chunk)
    except BaseException:
        # Remove what this call wrote, so that a later call for the same urls
        # (e.g. a retry) is not rejected by check_no_file
        for saved in files:
            if os.path.isfile(saved):
                os.remove(saved)
        raise
    return files

Parameters:

  • urls [list]: List of urls to download
  • output_prefix [str]: Prefix prepended to the base name of each url to build the output file name

Returns:

  • list of files saved
def warning_on_one_line(message, category, filename, lineno, file=None, line=None):
    """
    Format a warning as a single line: "filename:lineno: category: message"

    Parameters follow the signature of warnings.formatwarning; **file** and **line** are accepted but not used.

    Returns:
    * str with the formatted warning, ending with a newline
    """
    return f"{filename!s}:{lineno!s}: {category.__name__!s}: {message!s}\n"