multitax.utils

  1import gzip
  2import io
  3import os
  4import tarfile
  5import urllib.request
  6import zlib
  7import warnings
  8from collections import OrderedDict
  9from urllib.error import HTTPError
 10
 11
 12def check_dir(prefix: str):
 13    abs_path = os.path.dirname(os.path.abspath(prefix))
 14    if not os.path.exists(abs_path):
 15        raise NotADirectoryError(abs_path)
 16
 17
 18def check_file(file: str):
 19    if not os.path.isfile(file):
 20        raise FileNotFoundError(file + " file do not exist")
 21    if os.path.getsize(file) == 0:
 22        raise FileNotFoundError(file + " file is empty")
 23
 24
 25def check_no_file(file: str):
 26    if os.path.isfile(file):
 27        raise FileExistsError(file)
 28
 29
 30def close_files(fhs: dict):
 31    """
 32    Parameters:
 33    * **fhs** *[dict]*: {file: file handler}
 34
 35    Returns: Nothing
 36    """
 37    for fh in fhs.values():
 38        fh.close()
 39
 40
 41def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
 42    """
 43    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)
 44
 45    Parameters:
 46    * **urls** *[list]*: List of files to download (text, ".gz", ".tar.gz", ".tgz")
 47    * **output_prefix** *[str]*: Output directory to save files
 48
 49    Returns:
 50    * OrderedDict {file: file handler} (same order as input)
 51    """
 52    if isinstance(urls, str):
 53        urls = [urls]
 54
 55    att = 0
 56    while att < retry_attempts:
 57        att += 1
 58        try:
 59            # If output is provided, save files and parse from disc
 60            if output_prefix:
 61                files = save_urls(urls, output_prefix)
 62                return open_files(files)
 63            else:
 64                # stream contents from url
 65                fhs = OrderedDict()
 66                for url in urls:
 67                    if url.endswith(".tar.gz") or url.endswith(".tgz"):
 68                        # tar files have mixed headers and content
 69                        # whole file should be loaded in memory first and not streamed
 70                        fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
 71                    elif url.endswith(".gz"):
 72                        fhs[url] = gzip.open(urllib.request.urlopen(url), mode="rb")
 73                        fhs[url].peek(1)  # peek into file to check if is valid
 74                    else:
 75                        fhs[url] = urllib.request.urlopen(url)
 76
 77                return fhs
 78        except (HTTPError, zlib.error, tarfile.TarError):
 79            warnings.warn(
 80                "Download failed, trying again ("
 81                + str(att)
 82                + "/"
 83                + str(retry_attempts)
 84                + ")",
 85                UserWarning,
 86            )
 87
 88    raise Exception("One or more files could not be downloaded: " + ", ".join(urls))
 89
 90
 91def filter_function(elements, function, value):
 92    return [elements[i] for i, v in enumerate(map(function, elements)) if v == value]
 93
 94
 95def format_repr(inst):
 96    vals = [
 97        f"version={repr(inst.version)}",
 98        f"source={repr(inst.sources)}",
 99        f"datetime={repr(inst.datetime)}",
100    ]
101    return f"{inst.__class__.__name__}({', '.join(vals)})"
102
103
def join_check(elements, sep: str):
    """
    Join elements into one string, returning "" for empty/None input.

    Parameters:
    * **elements**: Collection of values; each one is converted with str()
    * **sep** *[str]*: Separator placed between the values

    Returns:
    * str with the joined values, or an empty string if elements is falsy
    """
    if not elements:
        return ""
    return sep.join(str(element) for element in elements)
109
110
def load_url_mem(url: str):
    """
    Download the full contents of a url into an in-memory buffer.

    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url (positioned at the start)
    """
    # A seekable in-memory copy is needed because the url stream cannot seek backwards
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    tmpfile = io.BytesIO()
    # The context manager closes the connection even when a read fails midway
    with urllib.request.urlopen(url) as urlstream:
        # Copy in buffer-sized chunks until read() returns empty bytes (end of stream)
        for chunk in iter(lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""):
            tmpfile.write(chunk)
    tmpfile.seek(0)
    return tmpfile
130
131
def open_files(files: list):
    """
    Open local files, choosing the reader from the file extension.

    ".tar.gz"/".tgz" files are opened with tarfile, other ".gz" files with
    gzip in text mode, and everything else as plain text.

    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz")

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    fhs = OrderedDict()
    try:
        for file in files:
            # The tar check must come first: ".tar.gz" also ends with ".gz"
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except Exception:
        # Do not leak the handles opened before the failing file
        for fh in fhs.values():
            fh.close()
        raise
    return fhs
150
151
def reverse_dict(d: dict):
    """
    Invert a mapping, grouping the keys that share the same value.

    Parameters:
    * **d** *[dict]*: {key: value} with hashable values

    Returns:
    * dict {value: [keys]} with keys listed in the iteration order of d
    """
    grouped = {}
    for key, val in d.items():
        grouped.setdefault(val, []).append(key)
    return grouped
159
160
def save_urls(urls: list, output_prefix: str):
    """
    Download urls and write each one to disk.

    The output path of a url is output_prefix + base name of the url.

    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to each file name (e.g. an
      output directory ending with a path separator)

    Returns:
    * list of files saved

    Raises:
    * FileExistsError (from check_no_file) if an output file already exists
    """
    files = []
    for url in urls:
        outfile = output_prefix + os.path.basename(url)
        check_no_file(outfile)
        # The context manager closes the connection even if writing fails
        with urllib.request.urlopen(url) as urlstream:
            try:
                with open(outfile, "wb") as f:
                    # Copy in chunks instead of holding the whole payload in memory
                    for chunk in iter(
                        lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""
                    ):
                        f.write(chunk)
            except BaseException:
                # Remove the partial file (check_no_file guaranteed it was not
                # there before) so a later retry is not blocked by it
                if os.path.isfile(outfile):
                    os.remove(outfile)
                raise
        files.append(outfile)
    return files
180
181
def warning_on_one_line(message, category, filename, lineno, file=None, line=None):
    """
    Format a warning as a single "filename:lineno: Category: message" line.

    The signature matches warnings.formatwarning; file and line are accepted
    but not used.

    Returns:
    * str with the formatted warning, terminated by a newline
    """
    fields = (filename, lineno, category.__name__, message)
    return "%s:%s: %s: %s\n" % fields


# Module-level hook: every warning formatted after import uses the one-line layout
warnings.formatwarning = warning_on_one_line
def check_dir(prefix: str):
    """
    Check that the directory part of an output prefix is an existing directory.

    Parameters:
    * **prefix** *[str]*: Output prefix (directory and/or file name prefix)

    Returns: Nothing

    Raises:
    * NotADirectoryError if the directory part of the prefix does not exist
      or is not a directory
    """
    # Take dirname BEFORE abspath: abspath() strips a trailing separator, which
    # would make a prefix such as "outdir/" resolve to the parent of "outdir".
    # dirname("file_prefix") is "", which abspath() maps to the current directory.
    abs_path = os.path.abspath(os.path.dirname(prefix))
    # isdir (not exists) also rejects a regular file in place of the directory
    if not os.path.isdir(abs_path):
        raise NotADirectoryError(abs_path)
def check_file(file: str):
    """
    Validate that a path points to an existing, non-empty regular file.

    Parameters:
    * **file** *[str]*: Path of the file to check

    Returns: Nothing

    Raises:
    * FileNotFoundError if the path is not a regular file or the file has size 0
    """
    is_regular_file = os.path.isfile(file)
    if not is_regular_file:
        raise FileNotFoundError(file + " file do not exist")

    size_in_bytes = os.path.getsize(file)
    if not size_in_bytes:
        raise FileNotFoundError(file + " file is empty")
def check_no_file(file: str):
    """
    Ensure that no regular file exists at the given path.

    Parameters:
    * **file** *[str]*: Path that must not point to an existing file

    Returns: Nothing

    Raises:
    * FileExistsError (carrying the path) if a regular file is already there
    """
    already_there = os.path.isfile(file)
    if already_there:
        raise FileExistsError(file)
def close_files(fhs: dict):
    """
    Close every file handle stored in a {file: handle} mapping.

    Parameters:
    * **fhs** *[dict]*: {file: file handler}

    Returns: Nothing
    """
    for name in fhs:
        handle = fhs[name]
        handle.close()

Parameters:

  • fhs [dict]: {file: file handler}

Returns: Nothing

def download_files(urls: list, output_prefix: str = None, retry_attempts: int = 1):
    """
    Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

    Parameters:
    * **urls** *[list]*: List of files to download (text, ".gz", ".tar.gz", ".tgz").
      A single url given as a string is also accepted.
    * **output_prefix** *[str]*: Prefix prepended to the base name of each url to
      build the saved file path; if empty/None, contents are streamed or loaded
      in memory instead of being written to disk
    * **retry_attempts** *[int]*: Total number of download attempts before giving up

    Returns:
    * OrderedDict {file: file handler} (same order as input)

    Raises:
    * Exception if no attempt succeeded (chained to the last download error)
    """
    if isinstance(urls, str):
        urls = [urls]

    # Errors treated as a failed download attempt. URLError is the parent of
    # HTTPError, so connection-level failures are retried too; BadGzipFile and
    # EOFError cover invalid or truncated gzip payloads found by peek().
    retryable = (
        urllib.error.URLError,
        zlib.error,
        tarfile.TarError,
        gzip.BadGzipFile,
        EOFError,
    )

    last_error = None
    att = 0
    while att < retry_attempts:
        att += 1
        # Handles opened during this attempt, closed again if the attempt fails
        fhs = OrderedDict()
        try:
            # If output is provided, save files and parse from disc
            if output_prefix:
                files = save_urls(urls, output_prefix)
                return open_files(files)

            # Otherwise stream contents from url
            for url in urls:
                if url.endswith((".tar.gz", ".tgz")):
                    # tar files have mixed headers and content
                    # whole file should be loaded in memory first and not streamed
                    fhs[url] = tarfile.open(fileobj=load_url_mem(url), mode="r:gz")
                elif url.endswith(".gz"):
                    fhs[url] = gzip.open(urllib.request.urlopen(url), mode="rb")
                    fhs[url].peek(1)  # peek into file to check if is valid
                else:
                    fhs[url] = urllib.request.urlopen(url)
            return fhs
        except retryable as err:
            last_error = err
            # Do not leak the streams that were opened before the failure
            for fh in fhs.values():
                fh.close()
            warnings.warn(
                "Download failed, trying again ("
                + str(att)
                + "/"
                + str(retry_attempts)
                + ")",
                UserWarning,
            )

    raise Exception(
        "One or more files could not be downloaded: " + ", ".join(urls)
    ) from last_error

Download and open files (memory/stream) or write to disk (multitax.utils.save_urls)

Parameters:

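  • urls [list]: List of files to download (text, ".gz", ".tar.gz", ".tgz"); a single URL string is also accepted
  • output_prefix [str]: Prefix prepended to each downloaded file name when saving to disk (e.g. an output directory ending with a path separator); if omitted, files are streamed or loaded in memory
  • retry_attempts [int]: Total number of download attempts before giving up (default: 1)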

Returns:

  • OrderedDict {file: file handler} (same order as input)
def filter_function(elements, function, value):
    """
    Select the elements whose function result equals a given value.

    Parameters:
    * **elements**: Iterable of elements to filter (any iterable, not only
      indexable sequences)
    * **function**: Callable applied to each element
    * **value**: Value compared (==) against the result of function(element)

    Returns:
    * list of the matching elements, in iteration order
    """
    # Iterate the elements directly; no positional lookup into the input is needed
    return [element for element in elements if function(element) == value]
def format_repr(inst):
    """
    Build a repr-style string "ClassName(version=..., source=..., datetime=...)".

    Parameters:
    * **inst**: Object exposing the attributes version, sources and datetime

    Returns:
    * str with the class name of inst and the repr() of the three attributes
    """
    fields = ", ".join(
        (
            f"version={inst.version!r}",
            f"source={inst.sources!r}",
            f"datetime={inst.datetime!r}",
        )
    )
    class_name = inst.__class__.__name__
    return f"{class_name}({fields})"
def join_check(elements, sep: str):
    """
    Join elements into one string, returning "" for empty/None input.

    Parameters:
    * **elements**: Collection of values; each one is converted with str()
    * **sep** *[str]*: Separator placed between the values

    Returns:
    * str with the joined values, or an empty string if elements is falsy
    """
    if not elements:
        return ""
    return sep.join(str(element) for element in elements)
def load_url_mem(url: str):
    """
    Download the full contents of a url into an in-memory buffer.

    Parameters:
    * **url** *[str]*: URL to load into memory

    Returns:
    * io.BytesIO of the requested url (positioned at the start)
    """
    # A seekable in-memory copy is needed because the url stream cannot seek backwards
    # From https://stackoverflow.com/questions/18623842/read-contents-tarfile-into-python-seeking-backwards-is-not-allowed
    tmpfile = io.BytesIO()
    # The context manager closes the connection even when a read fails midway
    with urllib.request.urlopen(url) as urlstream:
        # Copy in buffer-sized chunks until read() returns empty bytes (end of stream)
        for chunk in iter(lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""):
            tmpfile.write(chunk)
    tmpfile.seek(0)
    return tmpfile

Parameters:

  • url [str]: URL to load into memory

Returns:

  • io.BytesIO of the requested url
def open_files(files: list):
    """
    Open local files, choosing the reader from the file extension.

    ".tar.gz"/".tgz" files are opened with tarfile, other ".gz" files with
    gzip in text mode, and everything else as plain text.

    Parameters:
    * **files** *[list]*: List of files to open (text, ".gz", ".tar.gz", ".tgz")

    Returns:
    * OrderedDict {file: file handler} (same order as input)
    """
    fhs = OrderedDict()
    try:
        for file in files:
            # The tar check must come first: ".tar.gz" also ends with ".gz"
            if file.endswith((".tar.gz", ".tgz")):
                fhs[file] = tarfile.open(file, mode="r:gz")
            elif file.endswith(".gz"):
                fhs[file] = gzip.open(file, "rt")
            else:
                fhs[file] = open(file, "r")
    except Exception:
        # Do not leak the handles opened before the failing file
        for fh in fhs.values():
            fh.close()
        raise
    return fhs

Parameters:

  • files [list]: List of files to open (text, ".gz", ".tar.gz", ".tgz")

Returns:

  • OrderedDict {file: file handler} (same order as input)
def reverse_dict(d: dict):
    """
    Invert a mapping, grouping the keys that share the same value.

    Parameters:
    * **d** *[dict]*: {key: value} with hashable values

    Returns:
    * dict {value: [keys]} with keys listed in the iteration order of d
    """
    grouped = {}
    for key, val in d.items():
        grouped.setdefault(val, []).append(key)
    return grouped
def save_urls(urls: list, output_prefix: str):
    """
    Download urls and write each one to disk.

    The output path of a url is output_prefix + base name of the url.

    Parameters:
    * **urls** *[list]*: List of urls to download
    * **output_prefix** *[str]*: Prefix prepended to each file name (e.g. an
      output directory ending with a path separator)

    Returns:
    * list of files saved

    Raises:
    * FileExistsError (from check_no_file) if an output file already exists
    """
    files = []
    for url in urls:
        outfile = output_prefix + os.path.basename(url)
        check_no_file(outfile)
        # The context manager closes the connection even if writing fails
        with urllib.request.urlopen(url) as urlstream:
            try:
                with open(outfile, "wb") as f:
                    # Copy in chunks instead of holding the whole payload in memory
                    for chunk in iter(
                        lambda: urlstream.read(io.DEFAULT_BUFFER_SIZE), b""
                    ):
                        f.write(chunk)
            except BaseException:
                # Remove the partial file (check_no_file guaranteed it was not
                # there before) so a later retry is not blocked by it
                if os.path.isfile(outfile):
                    os.remove(outfile)
                raise
        files.append(outfile)
    return files

Parameters:

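  • urls [list]: List of URLs to download
  • output_prefix [str]: Prefix prepended to the base name of each URL to build the saved file path (e.g. an output directory ending with a path separator)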

Returns:

  • list of files saved
def warning_on_one_line(message, category, filename, lineno, file=None, line=None):
    """
    Format a warning as a single "filename:lineno: Category: message" line.

    The signature matches warnings.formatwarning; file and line are accepted
    but not used.

    Returns:
    * str with the formatted warning, terminated by a newline
    """
    fields = (filename, lineno, category.__name__, message)
    return "%s:%s: %s: %s\n" % fields