multitax.multitax

  1from multitax.utils import (
  2    join_check,
  3    check_no_file,
  4    filter_function,
  5    reverse_dict,
  6    check_file,
  7    close_files,
  8    download_files,
  9    open_files,
 10    check_dir,
 11)
 12from collections import Counter
 13from datetime import datetime
 14from pylca.pylca import LCA
 15
 16
class MultiTax(object):
    """
    Base taxonomy container.

    Keeps three main dictionaries keyed by node (node -> parent, node -> rank,
    node -> name) plus auxiliary structures that are built on demand.
    """

    # Version used when no version is requested and no files/urls are given
    _default_version = "current"

    # Versions accepted when no custom files/urls are provided
    _supported_versions = ["current"]
    # Looked up by version to find what to download; empty here
    _default_urls = {}
    # Identifier of the root node unless a different one is requested
    _default_root_node = "1"
    # Rank names, listed from the most general to the most specific
    _standard_ranks = [
        "domain",
        "phylum",
        "class",
        "order",
        "family",
        "genus",
        "species",
    ]
 32
 33    def __init__(
 34        self,
 35        version: str = None,
 36        files: list = None,
 37        urls: list = None,
 38        output_prefix: str = None,
 39        root_node: str = None,
 40        root_parent: str = "0",
 41        root_name: str = None,
 42        root_rank: str = None,
 43        undefined_node: str = None,
 44        undefined_name: str = None,
 45        undefined_rank: str = None,
 46        build_name_nodes: bool = False,
 47        build_node_children: bool = False,
 48        build_rank_nodes: bool = False,
 49        extended_names: bool = False,
 50        empty: bool = False,
 51    ):
 52        """
 53        Main constructor of MultiTax and sub-classes
 54
 55        Parameters:
 56        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
 57        * **files** *[str, list]*: One or more local files to parse.
 58        * **urls** *[str, list]*: One or more urls to download and parse.
 59        * **output_prefix** *[str]*: Directory to write downloaded files.
 60        * **root_node** *[str]*: Define an alternative root node.
 61        * **root_parent** *[str]*: Define the root parent node identifier.
 62        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
 63        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name.
 64        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
 65        * **undefined_name** *[str]*: Define a default return value for undefined names.
 66        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
 67        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
 68        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
 69        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
 70        * **extended_names** *[bool]*: Parse extended names if available.
 71        * **empty** *[bool]*: Create an empty instance.
 72
 73        Example:
 74
 75            tax_ncbi = NcbiTx()
 76            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
 77            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
 78            tax_ott = OttTx(root_node="844192")
 79            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
 80        """
 81        if files:
 82            if isinstance(files, str):
 83                files = [files]
 84            for file in files:
 85                check_file(file)
 86
 87        if output_prefix:
 88            check_dir(output_prefix)
 89
 90        # Main structures
 91        self._nodes = {}
 92        self._ranks = {}
 93        self._names = {}
 94
 95        # Aux. structures
 96        self._lineages = {}
 97        self._name_nodes = {}
 98        self._node_children = {}
 99        self._rank_nodes = {}
100        self._translated_nodes = {}
101        self._lca = None
102
103        # Properties
104        self.datetime = datetime.now()
105        self.version = None
106        self.undefined_node = undefined_node
107        self.undefined_name = undefined_name
108        self.undefined_rank = undefined_rank
109
110        # Set version
111        if files or urls:
112            self.version = version
113        else:
114            self.version = self._default_version if not version else version
115            if self.version not in self._supported_versions:
116                raise ValueError(
117                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
118                )
119
120        # Store source of tax files (url or file)
121        self.sources = []
122
123        if not empty:
124            # Open/Download/Write files
125            fhs = {}
126            if files:
127                fhs = open_files(files)
128            elif urls or self._default_urls.get(self.version):
129                fhs = download_files(
130                    urls=urls if urls else self._default_urls[self.version],
131                    output_prefix=output_prefix,
132                    retry_attempts=3,
133                )
134
135            if fhs:
136                # Parse taxonomy
137                self._nodes, self._ranks, self._names = self._parse(
138                    fhs, extended_names=extended_names
139                )
140                close_files(fhs)
141                # Save sources for stats (files or urls)
142                self.sources = list(fhs.keys())
143
144        # Set root values
145        self._set_root_node(
146            root=root_node if root_node else self._default_root_node,
147            parent=root_parent,
148            name=root_name,
149            rank=root_rank,
150        )
151
152        # build auxiliary structures
153        if build_node_children:
154            self._node_children = reverse_dict(self._nodes)
155        if build_name_nodes:
156            self._name_nodes = reverse_dict(self._names)
157        if build_rank_nodes:
158            self._rank_nodes = reverse_dict(self._ranks)
159
160        self.check_consistency()
161
162    def _exact_name(self, text: str, names: dict):
163        """
164        Returns list of nodes of a given exact name (case sensitive).
165        """
166        if text in names:
167            return names[text]
168        else:
169            return []
170
171    def _parse(self, fhs: dict):
172        """
173        main function to be overloaded
174        receives a dictionary with {"url/file": file handler}
175        return nodes, ranks and names dicts
176        """
177        return {}, {}, {}
178
179    def _partial_name(self, text: str, names: dict):
180        """
181        Searches names containing a certain text (case sensitive) and return their respective nodes.
182        """
183        matching_nodes = set()
184        for name in names:
185            if text in name:
186                matching_nodes.update(names[name])
187        return list(matching_nodes)
188
189    def _recurse_leaves(self, node: str):
190        """
191        Recursive function returning leaf nodes
192        """
193        children = self.children(node)
194        if not children:
195            return [node]
196        leaves = []
197        for child in children:
198            leaves.extend(self._recurse_leaves(child))
199        return leaves
200
201    def _remove(self, node: str):
202        """
203        Removes node from taxonomy, no checking, for internal use
204        """
205        del self._nodes[node]
206        if node in self._names:
207            del self._names[node]
208        if node in self._ranks:
209            del self._ranks[node]
210
211    def _reset_aux_data(self):
212        """
213        Reset aux. data structures
214        """
215        self._lineages = {}
216        self._name_nodes = {}
217        self._node_children = {}
218        self._rank_nodes = {}
219        self._translated_nodes = {}
220        self._lca = None
221
    def _set_root_node(self, root: str, parent: str, name: str, rank: str):
        """
        Set root node of the tree.
        The files are parsed based on the self._default_root_node for each class
        A user-defined root node can be:
        1) internal: will filter the tree accordingly and delete the default root_node
        2) external: will add node and link to the default

        Parameters:
        * **root** *[str]*: Node to be used as root.
        * **parent** *[str]*: Identifier stored as the parent of the root.
        * **name** *[str]*: Root name. If empty, keeps the existing name or falls back to "root".
        * **rank** *[str]*: Root rank. If empty, keeps the existing rank or falls back to "root".

        Sets self.root_parent, self.root_node, self.root_name and self.root_rank.
        """

        # Set parent/root with defaults
        self.root_parent = parent
        self.root_node = self._default_root_node
        self._nodes[self.root_node] = self.root_parent

        # Default root node is the top by definition
        if root != self._default_root_node:
            if root in self._nodes:
                # Not default but exists on tree, filter only descendants
                # (filter() relies on self.root_node still being the default here)
                self.filter(root, desc=True)
                # Remove entry for _default_root_node
                self._remove(self._default_root_node)
            else:
                # Not on tree, link default node with new root
                self._nodes[self._default_root_node] = root
            # Change root to user defined
            self.root_node = root
            # Set/Update new root node parent link
            self._nodes[self.root_node] = self.root_parent

        # User-defined rank/name.
        # If provided, insert manually,
        # If None, check if is in the tree (defined in the given tax)
        #    otherwise insert default "root"
        if name:
            self._names[self.root_node] = name
        elif self.root_node not in self._names:
            self._names[self.root_node] = "root"
        # Set static name
        self.root_name = self._names[self.root_node]

        if rank:
            self._ranks[self.root_node] = rank
        elif self.root_node not in self._ranks:
            self._ranks[self.root_node] = "root"
        # Set static rank
        self.root_rank = self._ranks[self.root_node]
268
269    def add(self, node: str, parent: str, name: str = None, rank: str = None):
270        """
271        Adds node to taxonomy.
272        Deletes built lineages, translations and lca.
273        """
274        if parent not in self._nodes:
275            raise ValueError("Parent node [" + parent + "] not found.")
276        elif node in self._nodes:
277            raise ValueError("Node [" + node + "] already present.")
278
279        self._nodes[node] = parent
280        self._names[node] = name if name is not None else self.undefined_name
281        self._ranks[node] = rank if rank is not None else self.undefined_rank
282        self._reset_aux_data()
283
284    def build_lca(self):
285        """
286        Builds LCA structure based on pylca.
287        Optional function, LCA is built on first .lca() call.
288
289        Returns: None
290        """
291        self._lca = LCA(self._nodes)
292
293    def build_lineages(self, root_node: str = None, ranks: list = None):
294        """
295        Stores lineages in memory for faster access.
296        It is valid for lineage(), rank_lineage() and name_lineage().
297        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.
298
299        Returns: None
300        """
301        self.clear_lineages()
302        for node in self._nodes:
303            self._lineages[node] = self.lineage(
304                node=node, root_node=root_node, ranks=ranks
305            )
306
307    def build_translation(
308        self, tax, representatives: bool = False, file: str = None, url: str = None
309    ):
310        """
311        Create a translation of current taxonomy to another
312
313        Parameters:
314
315        * **tax** [MultiTax]: A target taxonomy to be translated to.
316        * **representatives** *[bool]*: Use only GTDB representative genomes to translate nodes.
317        * **file** *[str]*: Local file to parse.
318        * **url** *[str]*: Url to download and parse.
319
320        Example:
321
322            from multitax import GtdbTx, NcbiTx
323            gtdb_tax = GtdbTx()
324            ncbi_tax = NcbiTx()
325
326            # Automatically download translation files
327            gtdb_tax.build_translation(ncbi_tax)
328            gtdb_tax.translate("g__Escherichia")
329                ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']
330
331            # Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
332            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
333            ncbi_tax.translate("620")
334                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
335
336            # Translation based on GTDB representative genome only
337            gtdb_tax.build_translation(ncbi_tax, representatives=True)
338            gtdb_tax.translate("g__Escherichia")
339                ['561', '547']
340        """
341        if file:
342            check_file(file)
343
344        self._translated_nodes = self._build_translation(
345            tax, representatives, file, url
346        )
347
348    def children(self, node: str):
349        """
350        Returns list of direct children nodes of a given node.
351        """
352        # Setup on first use
353        if not self._node_children:
354            self._node_children = reverse_dict(self._nodes)
355        if node in self._node_children:
356            return self._node_children[node]
357        else:
358            return []
359
360    def check_consistency(self):
361        """
362        Checks consistency of the tree
363
364        Returns: raise an Exception otherwise None
365        """
366        if self.root_node not in self._nodes:
367            raise ValueError("Root node [" + self.root_node + "] not found.")
368        if self.root_parent in self._nodes:
369            raise ValueError(
370                "Root parent ["
371                + self.root_parent
372                + "] found but should not be on tree."
373            )
374        if self.undefined_node in self._nodes:
375            raise ValueError(
376                "Undefined node ["
377                + self.undefined_node
378                + "] found but should not be on tree."
379            )
380
381        # Difference between values and keys should be only root_parent
382        lost_nodes = set(self._nodes.values()).difference(self._nodes)
383        if self.root_parent not in lost_nodes:
384            raise ValueError(
385                "Root parent [" + self.root_parent + "] not properly defined."
386            )
387        # Remove root_parent from lost nodes to report only missing
388        lost_nodes.remove(self.root_parent)
389        if len(lost_nodes) > 0:
390            raise ValueError("Parent nodes missing: " + ",".join(lost_nodes))
391
392        return None
393
394    def clear_lca(self):
395        """
396        Clear built LCA.
397
398        Returns: None
399        """
400        self._lca = None
401
402    def clear_lineages(self):
403        """
404        Clear built lineages.
405
406        Returns: None
407        """
408        self._lineages = {}
409
410    def closest_parent(self, node: str, ranks: str):
411        """
412        Returns the closest parent node based on a defined list of ranks
413        """
414        # Rank of node is already on the list
415        if self.rank(node) in ranks:
416            return node
417        else:
418            # check lineage from back to front until find a valid node
419            for n in self.lineage(node, ranks=ranks)[::-1]:
420                if n != self.undefined_node:
421                    return n
422        # nothing found
423        return self.undefined_node
424
    def filter(self, nodes: list, desc: bool = False):
        """
        Filters taxonomy given a list of nodes.
        By default keep all the ancestors of the given nodes.
        If desc=True, keep all descendants instead.
        Deletes built lineages, translations and lca.

        Parameters:
        * **nodes** *[str, list]*: One or more nodes to keep (with ancestors or descendants).
        * **desc** *[bool]*: Keep descendants instead of ancestors.

        Returns: None
        Raises: ValueError (from check_consistency) if the filtered tree is not consistent.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()

            tax.lineage('s__Enterovibrio marina')
            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
            # Keep only ancestors of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio')

            # Reload taxonomy
            tax = GtdbTx()
            # Keep only descendants of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio', desc=True)
        """
        if isinstance(nodes, str):
            nodes = [nodes]

        # Keep track of nodes to be filtered out
        # (starts with every node; nodes to keep are discarded from it)
        filtered_nodes = set(self._nodes)
        # Always keep root
        filtered_nodes.discard(self.root_node)

        if desc:
            # Keep descendants of the given nodes
            for node in nodes:
                # Check if node exists (skips root)
                # Also skips nodes already kept as descendants of a previous node
                if node in filtered_nodes:
                    # For each leaf of the selected nodes
                    for leaf in self.leaves(node):
                        # Build lineage of each leaf up-to node itself
                        for n in self.lineage(leaf, root_node=node):
                            # Discard nodes from set to be kept
                            filtered_nodes.discard(n)
                    # Link node to root
                    self._nodes[node] = self.root_node
        else:
            # Keep ancestors of the given nodes (full lineage up-to root)
            for node in nodes:
                # ranks=[] in case build_lineages() was used with specific ranks
                # (a non-None ranks bypasses stored lineages, empty means full lineage)
                for n in self.lineage(node, ranks=[]):
                    # Discard nodes from set to be kept
                    filtered_nodes.discard(n)

        # Delete filtered nodes
        for node in filtered_nodes:
            self._remove(node)

        # Delete aux. data structures
        self._reset_aux_data()
        self.check_consistency()
483
484    @classmethod
485    def from_customtx(cls, ctx):
486        """
487        Initialize a Tx sub-class based on a CustomTx instance.
488
489        Example:
490
491            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
492            tax_ncbi = NcbiTx.from_customtx(tax_custom)
493        """
494        nc = cls(empty=True)
495        nc.version = ctx.version
496        nc.sources = ctx.sources
497        nc._nodes = ctx._nodes
498        nc._names = ctx._names
499        nc._ranks = ctx._ranks
500        return nc
501
502    def latest(self, node: str):
503        """
504        Returns latest/updated version of a given node.
505        If node is already the latests, returns itself.
506        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
507        """
508        if node in self._nodes:
509            return node
510        else:
511            return self.undefined_node
512
513    def leaves(self, node: str = None):
514        """
515        Returns a list of leaf nodes of a given node.
516        """
517        if node is None or node == self.root_node:
518            # Leaves are nodes not contained in _nodes.values() ("parents")
519            return list(set(self._nodes).difference(self._nodes.values()))
520        elif node in self._nodes:
521            return self._recurse_leaves(node)
522        else:
523            return []
524
525    def lca(self, nodes: list = None):
526        """
527        Returns the lowest common ancestor of two or more nodes.
528
529        Example:
530
531            from multitax import GtdbTx
532            tax = GtdbTx()
533            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
534        """
535        for node in nodes:
536            if node not in self._nodes:
537                raise ValueError("Node [" + node + "] not found.")
538
539        # Setup on first use
540        if not self._lca:
541            self.build_lca()
542
543        return self._lca(*nodes)
544
545    def lineage(self, node: str, root_node: str = None, ranks: list = None):
546        """
547        Returns a list with the lineage of a given node.
548        If ranks is provided, returns only nodes annotated with such ranks.
549        If root_node is provided, use it instead of default root of tree.
550        """
551        # If lineages were built with build_lineages() with matching params
552        if node in self._lineages and root_node is None and ranks is None:
553            return self._lineages[node]
554        else:
555            if not root_node:
556                root_node = self.root_node
557
558            n = node
559            if ranks:
560                # Fixed length lineage
561                lin = [self.undefined_node] * len(ranks)
562                # Loop until end of the tree (in case chosen root is not on lineage)
563                while n != self.undefined_node:
564                    r = self.rank(n)
565                    if r in ranks:
566                        lin[ranks.index(r)] = n
567                    # If node is root, break (after adding)
568                    if n == root_node:
569                        break
570                    n = self.parent(n)
571            else:
572                # Full lineage
573                lin = []
574                # Loop until end of the tree (in case chosen root is not on lineage)
575                while n != self.undefined_node:
576                    lin.append(n)
577                    # If node is root, break (after adding)
578                    if n == root_node:
579                        break
580                    n = self.parent(n)
581                # Reverse order
582                lin = lin[::-1]
583
584            # last iteration node (n) != root_node: didn't find the root, invalid lineage
585            if n != root_node:
586                return []
587            else:
588                return lin
589
590    def name(self, node: str):
591        """
592        Returns name of a given node.
593        """
594        if node in self._names:
595            return self._names[node]
596        else:
597            return self.undefined_name
598
599    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
600        """
601        Returns a list with the name lineage of a given node.
602        """
603        return list(
604            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
605        )
606
607    def nodes_rank(self, rank: str):
608        """
609        Returns list of nodes of a given rank.
610        """
611        # Setup on first use
612        if not self._rank_nodes:
613            self._rank_nodes = reverse_dict(self._ranks)
614        if rank in self._rank_nodes:
615            return self._rank_nodes[rank]
616        else:
617            return []
618
619    def parent(self, node: str):
620        """
621        Returns the direct parent node of a given node.
622        """
623        if node in self._nodes:
624            return self._nodes[node]
625        else:
626            return self.undefined_node
627
628    def parent_rank(self, node: str, rank: str):
629        """
630        Returns the parent node of a given rank in the specified rank.
631        """
632        parent = self.lineage(node=node, ranks=[rank])
633        return parent[0] if parent else self.undefined_node
634
635    def prune(self, nodes: list):
636        """
637        Prunes branches of the tree under the given nodes.
638        Deletes built lineages, translations and lca.
639        """
640
641        if isinstance(nodes, str):
642            nodes = [nodes]
643
644        del_nodes = set()
645        for node in nodes:
646            if node not in self._nodes:
647                raise ValueError("Node [" + node + "] not found.")
648            for leaf in self.leaves(node):
649                for n in self.lineage(leaf, root_node=node)[1:]:
650                    del_nodes.add(n)
651
652        for n in del_nodes:
653            self._remove(n)
654
655        self._reset_aux_data()
656
657    def rank(self, node: str):
658        """
659        Returns the rank of a given node.
660        """
661        if node in self._ranks:
662            return self._ranks[node]
663        else:
664            return self.undefined_rank
665
666    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
667        """
668        Returns a list with the rank lineage of a given node.
669        """
670        return list(
671            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
672        )
673
674    def remove(self, node: str, check_consistency: bool = False):
675        """
676        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
677        Running check consistency after removing a node is recommended.
678        Deletes built lineages, translations and lca.
679        """
680        if node not in self._nodes:
681            raise ValueError("Node [" + node + "] not found.")
682        self._remove(node)
683        self._reset_aux_data()
684        if check_consistency:
685            self.check_consistency()
686
687    def search_name(self, text: str, rank: str = None, exact: bool = True):
688        """
689        Search node by exact or partial name
690
691        Parameters:
692        * **text** *[str]*: Text to search.
693        * **rank** *[str]*: Filter results by rank.
694        * **exact** *[bool]*: Exact or partial name search (both case sensitive).
695
696        Returns: list of matching nodes
697        """
698        # Setup on first use
699        if not self._name_nodes:
700            self._name_nodes = reverse_dict(self._names)
701
702        if exact:
703            ret = self._exact_name(text, self._name_nodes)
704        else:
705            ret = self._partial_name(text, self._name_nodes)
706
707        # Only return nodes of chosen rank
708        if rank:
709            return filter_function(ret, self.rank, rank)
710        else:
711            return ret
712
713    def stats(self):
714        """
715        Returns a dict with general numbers of the taxonomic tree
716
717        Example:
718
719            from pprint import pprint
720            from multitax import GtdbTx
721            tax = GtdbTx()
722
723            pprint(tax.stats())
724            {'leaves': 30238,
725             'names': 42739,
726             'nodes': 42739,
727             'ranked_leaves': Counter({'species': 30238}),
728             'ranked_nodes': Counter({'species': 30238,
729                                      'genus': 8778,
730                                      'family': 2323,
731                                      'order': 930,
732                                      'class': 337,
733                                      'phylum': 131,
734                                      'domain': 1,
735                                      'root': 1}),
736             'ranks': 42739}
737        """
738        s = {}
739        s["nodes"] = len(self._nodes)
740        s["ranks"] = len(self._ranks)
741        s["names"] = len(self._names)
742        all_leaves = self.leaves(self.root_node)
743        s["leaves"] = len(all_leaves)
744        s["ranked_nodes"] = Counter(self._ranks.values())
745        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
746        return s
747
748    def translate(self, node: str, top_perc: float | None = None, counts: bool = False):
749        """
750        Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes.
751        `counts` additionally outputs the number of entries/genomes used to translate each node.
752        The translation have to first be generated with the `build_translation` function.
753
754        Parameters:
755        * **node** *[str]*: Node to translate.
756        * **top_perc** *[float]*: Keep translations summing up to `top_perc` of the nodes based on counts.
757        * **counts** *[bool]*: Output a sorted list of tuples with the translated node and counts.
758
759        Returns: List of translated nodes (or list of tuples with counts)
760        """
761        if node in self._translated_nodes:
762            ret = Counter(self._translated_nodes[node])
763            i = None
764            if top_perc:
765                total = ret.total()
766                sm = 0
767                for i, (_, cnt) in enumerate(ret.most_common(), 1):
768                    sm += cnt
769                    if (sm / total) >= top_perc:
770                        break
771            return ret.most_common(i) if counts else [n[0] for n in ret.most_common(i)]
772
773        return []
774
775    def write(
776        self,
777        output_file: str,
778        cols: list = ["node", "parent", "rank", "name"],
779        sep: str = "\t",
780        sep_multi: str = "|",
781        ranks: list = None,
782        gz: bool = False,
783    ):
784        """
785        Writes loaded taxonomy to a file.
786
787        Parameters:
788        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
789        * **sep** *[str]*: Separator of fields
790        * **sep_multi** *[str]*: Separator of multi-valued fields
791        * **ranks** *[list]*: Ranks to report
792        * **gz** *[bool]*: Gzip output
793
794        Returns: None
795        """
796        import gzip
797
798        if gz:
799            output_file = (
800                output_file if output_file.endswith(".gz") else output_file + ".gz"
801            )
802            check_no_file(output_file)
803            outf = gzip.open(output_file, "wt")
804        else:
805            check_no_file(output_file)
806            outf = open(output_file, "w")
807
808        write_field = {
809            "node": lambda node: node,
810            "latest": self.latest,
811            "parent": self.parent,
812            "rank": self.rank,
813            "name": self.name,
814            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
815            "children": lambda node: join_check(self.children(node), sep_multi),
816            "lineage": lambda node: join_check(
817                self.lineage(node, ranks=ranks), sep_multi
818            ),
819            "rank_lineage": lambda node: join_check(
820                self.rank_lineage(node, ranks=ranks), sep_multi
821            ),
822            "name_lineage": lambda node: join_check(
823                self.name_lineage(node, ranks=ranks), sep_multi
824            ),
825        }
826
827        for c in cols:
828            if c not in write_field:
829                raise ValueError(
830                    "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
831                )
832
833        if ranks:
834            for rank in ranks:
835                for node in self.nodes_rank(rank):
836                    print(
837                        *[write_field[c](node) for c in cols],
838                        sep=sep,
839                        end="\n",
840                        file=outf,
841                    )
842        else:
843            for node in self._nodes:
844                print(
845                    *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf
846                )
847
848        outf.close()
class MultiTax:
 18class MultiTax(object):
    # Version used by the constructor when neither a version nor files/urls are given.
    _default_version = "current"

    # Versions accepted by the constructor when no files/urls are given.
    _supported_versions = ["current"]
    # Urls to download, looked up by version when no files/urls are given.
    _default_urls = {}
    # Identifier of the root node unless the caller provides root_node.
    _default_root_node = "1"
    # NOTE(review): presumably the canonical rank order shared with sub-classes;
    # it is not referenced by the code visible in this class - confirm usage.
    _standard_ranks = [
        "domain",
        "phylum",
        "class",
        "order",
        "family",
        "genus",
        "species",
    ]
 33
    def __init__(
        self,
        version: str = None,
        files: list = None,
        urls: list = None,
        output_prefix: str = None,
        root_node: str = None,
        root_parent: str = "0",
        root_name: str = None,
        root_rank: str = None,
        undefined_node: str = None,
        undefined_name: str = None,
        undefined_rank: str = None,
        build_name_nodes: bool = False,
        build_node_children: bool = False,
        build_rank_nodes: bool = False,
        extended_names: bool = False,
        empty: bool = False,
    ):
        """
        Main constructor of MultiTax and sub-classes

        Parameters:
        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
        * **files** *[str, list]*: One or more local files to parse.
        * **urls** *[str, list]*: One or more urls to download and parse.
        * **output_prefix** *[str]*: Directory to write downloaded files.
        * **root_node** *[str]*: Define an alternative root node.
        * **root_parent** *[str]*: Define the root parent node identifier.
        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original rank.
        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
        * **undefined_name** *[str]*: Define a default return value for undefined names.
        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
        * **extended_names** *[bool]*: Parse extended names if available.
        * **empty** *[bool]*: Create an empty instance.

        Raises: ValueError if no files/urls are given and the version is not supported,
        or if the resulting tree fails check_consistency().

        Example:

            tax_ncbi = NcbiTx()
            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
            tax_ott = OttTx(root_node="844192")
            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
        """
        # Accept a single file given as string and validate every file up-front
        if files:
            if isinstance(files, str):
                files = [files]
            for file in files:
                check_file(file)

        if output_prefix:
            check_dir(output_prefix)

        # Main structures
        self._nodes = {}
        self._ranks = {}
        self._names = {}

        # Aux. structures
        self._lineages = {}
        self._name_nodes = {}
        self._node_children = {}
        self._rank_nodes = {}
        self._translated_nodes = {}
        self._lca = None

        # Properties
        self.datetime = datetime.now()
        self.version = None
        self.undefined_node = undefined_node
        self.undefined_name = undefined_name
        self.undefined_rank = undefined_rank

        # Set version
        if files or urls:
            # Custom sources: the version is stored as given (may be None), no validation
            self.version = version
        else:
            self.version = self._default_version if not version else version
            if self.version not in self._supported_versions:
                raise ValueError(
                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
                )

        # Store source of tax files (url or file)
        self.sources = []

        if not empty:
            # Open/Download/Write files
            # Local files take precedence over urls; default urls are only used as last option
            fhs = {}
            if files:
                fhs = open_files(files)
            elif urls or self._default_urls.get(self.version):
                fhs = download_files(
                    urls=urls if urls else self._default_urls[self.version],
                    output_prefix=output_prefix,
                    retry_attempts=3,
                )

            if fhs:
                # Parse taxonomy
                self._nodes, self._ranks, self._names = self._parse(
                    fhs, extended_names=extended_names
                )
                close_files(fhs)
                # Save sources for stats (files or urls)
                self.sources = list(fhs.keys())

        # Set root values
        self._set_root_node(
            root=root_node if root_node else self._default_root_node,
            parent=root_parent,
            name=root_name,
            rank=root_rank,
        )

        # build auxiliary structures
        if build_node_children:
            self._node_children = reverse_dict(self._nodes)
        if build_name_nodes:
            self._name_nodes = reverse_dict(self._names)
        if build_rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)

        # Fail early if the loaded tree is broken
        self.check_consistency()
162
163    def _exact_name(self, text: str, names: dict):
164        """
165        Returns list of nodes of a given exact name (case sensitive).
166        """
167        if text in names:
168            return names[text]
169        else:
170            return []
171
172    def _parse(self, fhs: dict):
173        """
174        main function to be overloaded
175        receives a dictionary with {"url/file": file handler}
176        return nodes, ranks and names dicts
177        """
178        return {}, {}, {}
179
180    def _partial_name(self, text: str, names: dict):
181        """
182        Searches names containing a certain text (case sensitive) and return their respective nodes.
183        """
184        matching_nodes = set()
185        for name in names:
186            if text in name:
187                matching_nodes.update(names[name])
188        return list(matching_nodes)
189
190    def _recurse_leaves(self, node: str):
191        """
192        Recursive function returning leaf nodes
193        """
194        children = self.children(node)
195        if not children:
196            return [node]
197        leaves = []
198        for child in children:
199            leaves.extend(self._recurse_leaves(child))
200        return leaves
201
202    def _remove(self, node: str):
203        """
204        Removes node from taxonomy, no checking, for internal use
205        """
206        del self._nodes[node]
207        if node in self._names:
208            del self._names[node]
209        if node in self._ranks:
210            del self._ranks[node]
211
212    def _reset_aux_data(self):
213        """
214        Reset aux. data structures
215        """
216        self._lineages = {}
217        self._name_nodes = {}
218        self._node_children = {}
219        self._rank_nodes = {}
220        self._translated_nodes = {}
221        self._lca = None
222
    def _set_root_node(self, root: str, parent: str, name: str, rank: str):
        """
        Set root node of the tree.
        The files are parsed based on the self._default_root_node for each class
        A user-defined root node can be:
        1) internal: will filter the tree accordingly and delete the default root_node
        2) external: will add node and link to the default

        Parameters:
        * **root** *[str]*: Node to be used as root.
        * **parent** *[str]*: Identifier stored as parent of the root node.
        * **name** *[str]*: Root name. If not provided, the name already on the tree is kept or "root" is used.
        * **rank** *[str]*: Root rank. If not provided, the rank already on the tree is kept or "root" is used.

        Sets self.root_parent, self.root_node, self.root_name and self.root_rank.
        """

        # Set parent/root with defaults
        self.root_parent = parent
        self.root_node = self._default_root_node
        self._nodes[self.root_node] = self.root_parent

        # Default root node is the top by definition
        if root != self._default_root_node:
            if root in self._nodes:
                # Not default but exists on tree, filter only descendants
                self.filter(root, desc=True)
                # Remove entry for _default_root_node
                self._remove(self._default_root_node)
            else:
                # Not on tree, link default node with new root
                self._nodes[self._default_root_node] = root
            # Change root to user defined
            self.root_node = root
            # Set/Update new root node parent link
            self._nodes[self.root_node] = self.root_parent

        # User-defined rank/name.
        # If provided, insert manually,
        # If None, check if is in the tree (defined in the given tax)
        #    otherwise insert default "root"
        if name:
            self._names[self.root_node] = name
        elif self.root_node not in self._names:
            self._names[self.root_node] = "root"
        # Set static name
        self.root_name = self._names[self.root_node]

        if rank:
            self._ranks[self.root_node] = rank
        elif self.root_node not in self._ranks:
            self._ranks[self.root_node] = "root"
        # Set static rank
        self.root_rank = self._ranks[self.root_node]
269
270    def add(self, node: str, parent: str, name: str = None, rank: str = None):
271        """
272        Adds node to taxonomy.
273        Deletes built lineages, translations and lca.
274        """
275        if parent not in self._nodes:
276            raise ValueError("Parent node [" + parent + "] not found.")
277        elif node in self._nodes:
278            raise ValueError("Node [" + node + "] already present.")
279
280        self._nodes[node] = parent
281        self._names[node] = name if name is not None else self.undefined_name
282        self._ranks[node] = rank if rank is not None else self.undefined_rank
283        self._reset_aux_data()
284
285    def build_lca(self):
286        """
287        Builds LCA structure based on pylca.
288        Optional function, LCA is built on first .lca() call.
289
290        Returns: None
291        """
292        self._lca = LCA(self._nodes)
293
294    def build_lineages(self, root_node: str = None, ranks: list = None):
295        """
296        Stores lineages in memory for faster access.
297        It is valid for lineage(), rank_lineage() and name_lineage().
298        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.
299
300        Returns: None
301        """
302        self.clear_lineages()
303        for node in self._nodes:
304            self._lineages[node] = self.lineage(
305                node=node, root_node=root_node, ranks=ranks
306            )
307
308    def build_translation(
309        self, tax, representatives: bool = False, file: str = None, url: str = None
310    ):
311        """
312        Create a translation of current taxonomy to another
313
314        Parameters:
315
316        * **tax** [MultiTax]: A target taxonomy to be translated to.
317        * **representatives** *[bool]*: Use only GTDB representative genomes to translate nodes.
318        * **file** *[str]*: Local file to parse.
319        * **url** *[str]*: Url to download and parse.
320
321        Example:
322
323            from multitax import GtdbTx, NcbiTx
324            gtdb_tax = GtdbTx()
325            ncbi_tax = NcbiTx()
326
327            # Automatically download translation files
328            gtdb_tax.build_translation(ncbi_tax)
329            gtdb_tax.translate("g__Escherichia")
330                ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']
331
332            # Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
333            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
334            ncbi_tax.translate("620")
335                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
336
337            # Translation based on GTDB representative genome only
338            gtdb_tax.build_translation(ncbi_tax, representatives=True)
339            gtdb_tax.translate("g__Escherichia")
340                ['561', '547']
341        """
342        if file:
343            check_file(file)
344
345        self._translated_nodes = self._build_translation(
346            tax, representatives, file, url
347        )
348
349    def children(self, node: str):
350        """
351        Returns list of direct children nodes of a given node.
352        """
353        # Setup on first use
354        if not self._node_children:
355            self._node_children = reverse_dict(self._nodes)
356        if node in self._node_children:
357            return self._node_children[node]
358        else:
359            return []
360
361    def check_consistency(self):
362        """
363        Checks consistency of the tree
364
365        Returns: raise an Exception otherwise None
366        """
367        if self.root_node not in self._nodes:
368            raise ValueError("Root node [" + self.root_node + "] not found.")
369        if self.root_parent in self._nodes:
370            raise ValueError(
371                "Root parent ["
372                + self.root_parent
373                + "] found but should not be on tree."
374            )
375        if self.undefined_node in self._nodes:
376            raise ValueError(
377                "Undefined node ["
378                + self.undefined_node
379                + "] found but should not be on tree."
380            )
381
382        # Difference between values and keys should be only root_parent
383        lost_nodes = set(self._nodes.values()).difference(self._nodes)
384        if self.root_parent not in lost_nodes:
385            raise ValueError(
386                "Root parent [" + self.root_parent + "] not properly defined."
387            )
388        # Remove root_parent from lost nodes to report only missing
389        lost_nodes.remove(self.root_parent)
390        if len(lost_nodes) > 0:
391            raise ValueError("Parent nodes missing: " + ",".join(lost_nodes))
392
393        return None
394
395    def clear_lca(self):
396        """
397        Clear built LCA.
398
399        Returns: None
400        """
401        self._lca = None
402
403    def clear_lineages(self):
404        """
405        Clear built lineages.
406
407        Returns: None
408        """
409        self._lineages = {}
410
411    def closest_parent(self, node: str, ranks: str):
412        """
413        Returns the closest parent node based on a defined list of ranks
414        """
415        # Rank of node is already on the list
416        if self.rank(node) in ranks:
417            return node
418        else:
419            # check lineage from back to front until find a valid node
420            for n in self.lineage(node, ranks=ranks)[::-1]:
421                if n != self.undefined_node:
422                    return n
423        # nothing found
424        return self.undefined_node
425
    def filter(self, nodes: list, desc: bool = False):
        """
        Filters taxonomy given a list of nodes.
        By default keep all the ancestors of the given nodes.
        If desc=True, keep all descendants instead.
        Deletes built lineages, translations and lca.

        Parameters:
        * **nodes** *[str, list]*: One or more nodes to filter by.
        * **desc** *[bool]*: Keep descendants (instead of ancestors) of the given nodes. Kept nodes are linked directly to the root. Nodes not on the tree (and the root itself) are skipped.

        The root node is always kept. Consistency of the tree is checked at the end.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()

            tax.lineage('s__Enterovibrio marina')
            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
            # Keep only ancestors of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio')

            # Reload taxonomy
            tax = GtdbTx()
            # Keep only descendants of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio', desc=True)
        """
        if isinstance(nodes, str):
            nodes = [nodes]

        # Keep track of nodes to be filtered out
        filtered_nodes = set(self._nodes)
        # Always keep root
        filtered_nodes.discard(self.root_node)

        if desc:
            # Keep descendants of the given nodes
            for node in nodes:
                # Check if node exists (skips root)
                if node in filtered_nodes:
                    # For each leaf of the selected nodes
                    for leaf in self.leaves(node):
                        # Build lineage of each leaf up-to node itself
                        for n in self.lineage(leaf, root_node=node):
                            # Discard nodes from set to be kept
                            filtered_nodes.discard(n)
                    # Link node to root
                    self._nodes[node] = self.root_node
        else:
            # Keep ancestors of the given nodes (full lineage up-to root)
            for node in nodes:
                # ranks=[] in case build_lineages() was used with specific ranks
                for n in self.lineage(node, ranks=[]):
                    # Discard nodes from set to be kept
                    filtered_nodes.discard(n)

        # Delete filtered nodes
        for node in filtered_nodes:
            self._remove(node)

        # Delete aux. data structures
        self._reset_aux_data()
        self.check_consistency()
484
485    @classmethod
486    def from_customtx(cls, ctx):
487        """
488        Initialize a Tx sub-class based on a CustomTx instance.
489
490        Example:
491
492            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
493            tax_ncbi = NcbiTx.from_customtx(tax_custom)
494        """
495        nc = cls(empty=True)
496        nc.version = ctx.version
497        nc.sources = ctx.sources
498        nc._nodes = ctx._nodes
499        nc._names = ctx._names
500        nc._ranks = ctx._ranks
501        return nc
502
503    def latest(self, node: str):
504        """
505        Returns latest/updated version of a given node.
506        If node is already the latests, returns itself.
507        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
508        """
509        if node in self._nodes:
510            return node
511        else:
512            return self.undefined_node
513
514    def leaves(self, node: str = None):
515        """
516        Returns a list of leaf nodes of a given node.
517        """
518        if node is None or node == self.root_node:
519            # Leaves are nodes not contained in _nodes.values() ("parents")
520            return list(set(self._nodes).difference(self._nodes.values()))
521        elif node in self._nodes:
522            return self._recurse_leaves(node)
523        else:
524            return []
525
526    def lca(self, nodes: list = None):
527        """
528        Returns the lowest common ancestor of two or more nodes.
529
530        Example:
531
532            from multitax import GtdbTx
533            tax = GtdbTx()
534            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
535        """
536        for node in nodes:
537            if node not in self._nodes:
538                raise ValueError("Node [" + node + "] not found.")
539
540        # Setup on first use
541        if not self._lca:
542            self.build_lca()
543
544        return self._lca(*nodes)
545
546    def lineage(self, node: str, root_node: str = None, ranks: list = None):
547        """
548        Returns a list with the lineage of a given node.
549        If ranks is provided, returns only nodes annotated with such ranks.
550        If root_node is provided, use it instead of default root of tree.
551        """
552        # If lineages were built with build_lineages() with matching params
553        if node in self._lineages and root_node is None and ranks is None:
554            return self._lineages[node]
555        else:
556            if not root_node:
557                root_node = self.root_node
558
559            n = node
560            if ranks:
561                # Fixed length lineage
562                lin = [self.undefined_node] * len(ranks)
563                # Loop until end of the tree (in case chosen root is not on lineage)
564                while n != self.undefined_node:
565                    r = self.rank(n)
566                    if r in ranks:
567                        lin[ranks.index(r)] = n
568                    # If node is root, break (after adding)
569                    if n == root_node:
570                        break
571                    n = self.parent(n)
572            else:
573                # Full lineage
574                lin = []
575                # Loop until end of the tree (in case chosen root is not on lineage)
576                while n != self.undefined_node:
577                    lin.append(n)
578                    # If node is root, break (after adding)
579                    if n == root_node:
580                        break
581                    n = self.parent(n)
582                # Reverse order
583                lin = lin[::-1]
584
585            # last iteration node (n) != root_node: didn't find the root, invalid lineage
586            if n != root_node:
587                return []
588            else:
589                return lin
590
591    def name(self, node: str):
592        """
593        Returns name of a given node.
594        """
595        if node in self._names:
596            return self._names[node]
597        else:
598            return self.undefined_name
599
600    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
601        """
602        Returns a list with the name lineage of a given node.
603        """
604        return list(
605            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
606        )
607
608    def nodes_rank(self, rank: str):
609        """
610        Returns list of nodes of a given rank.
611        """
612        # Setup on first use
613        if not self._rank_nodes:
614            self._rank_nodes = reverse_dict(self._ranks)
615        if rank in self._rank_nodes:
616            return self._rank_nodes[rank]
617        else:
618            return []
619
620    def parent(self, node: str):
621        """
622        Returns the direct parent node of a given node.
623        """
624        if node in self._nodes:
625            return self._nodes[node]
626        else:
627            return self.undefined_node
628
629    def parent_rank(self, node: str, rank: str):
630        """
631        Returns the parent node of a given rank in the specified rank.
632        """
633        parent = self.lineage(node=node, ranks=[rank])
634        return parent[0] if parent else self.undefined_node
635
636    def prune(self, nodes: list):
637        """
638        Prunes branches of the tree under the given nodes.
639        Deletes built lineages, translations and lca.
640        """
641
642        if isinstance(nodes, str):
643            nodes = [nodes]
644
645        del_nodes = set()
646        for node in nodes:
647            if node not in self._nodes:
648                raise ValueError("Node [" + node + "] not found.")
649            for leaf in self.leaves(node):
650                for n in self.lineage(leaf, root_node=node)[1:]:
651                    del_nodes.add(n)
652
653        for n in del_nodes:
654            self._remove(n)
655
656        self._reset_aux_data()
657
658    def rank(self, node: str):
659        """
660        Returns the rank of a given node.
661        """
662        if node in self._ranks:
663            return self._ranks[node]
664        else:
665            return self.undefined_rank
666
667    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
668        """
669        Returns a list with the rank lineage of a given node.
670        """
671        return list(
672            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
673        )
674
675    def remove(self, node: str, check_consistency: bool = False):
676        """
677        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
678        Running check consistency after removing a node is recommended.
679        Deletes built lineages, translations and lca.
680        """
681        if node not in self._nodes:
682            raise ValueError("Node [" + node + "] not found.")
683        self._remove(node)
684        self._reset_aux_data()
685        if check_consistency:
686            self.check_consistency()
687
688    def search_name(self, text: str, rank: str = None, exact: bool = True):
689        """
690        Search node by exact or partial name
691
692        Parameters:
693        * **text** *[str]*: Text to search.
694        * **rank** *[str]*: Filter results by rank.
695        * **exact** *[bool]*: Exact or partial name search (both case sensitive).
696
697        Returns: list of matching nodes
698        """
699        # Setup on first use
700        if not self._name_nodes:
701            self._name_nodes = reverse_dict(self._names)
702
703        if exact:
704            ret = self._exact_name(text, self._name_nodes)
705        else:
706            ret = self._partial_name(text, self._name_nodes)
707
708        # Only return nodes of chosen rank
709        if rank:
710            return filter_function(ret, self.rank, rank)
711        else:
712            return ret
713
714    def stats(self):
715        """
716        Returns a dict with general numbers of the taxonomic tree
717
718        Example:
719
720            from pprint import pprint
721            from multitax import GtdbTx
722            tax = GtdbTx()
723
724            pprint(tax.stats())
725            {'leaves': 30238,
726             'names': 42739,
727             'nodes': 42739,
728             'ranked_leaves': Counter({'species': 30238}),
729             'ranked_nodes': Counter({'species': 30238,
730                                      'genus': 8778,
731                                      'family': 2323,
732                                      'order': 930,
733                                      'class': 337,
734                                      'phylum': 131,
735                                      'domain': 1,
736                                      'root': 1}),
737             'ranks': 42739}
738        """
739        s = {}
740        s["nodes"] = len(self._nodes)
741        s["ranks"] = len(self._ranks)
742        s["names"] = len(self._names)
743        all_leaves = self.leaves(self.root_node)
744        s["leaves"] = len(all_leaves)
745        s["ranked_nodes"] = Counter(self._ranks.values())
746        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
747        return s
748
749    def translate(self, node: str, top_perc: float | None = None, counts: bool = False):
750        """
751        Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes.
752        `counts` additionally outputs the number of entries/genomes used to translate each node.
753        The translation have to first be generated with the `build_translation` function.
754
755        Parameters:
756        * **node** *[str]*: Node to translate.
757        * **top_perc** *[float]*: Keep translations summing up to `top_perc` of the nodes based on counts.
758        * **counts** *[bool]*: Output a sorted list of tuples with the translated node and counts.
759
760        Returns: List of translated nodes (or list of tuples with counts)
761        """
762        if node in self._translated_nodes:
763            ret = Counter(self._translated_nodes[node])
764            i = None
765            if top_perc:
766                total = ret.total()
767                sm = 0
768                for i, (_, cnt) in enumerate(ret.most_common(), 1):
769                    sm += cnt
770                    if (sm / total) >= top_perc:
771                        break
772            return ret.most_common(i) if counts else [n[0] for n in ret.most_common(i)]
773
774        return []
775
776    def write(
777        self,
778        output_file: str,
779        cols: list = ["node", "parent", "rank", "name"],
780        sep: str = "\t",
781        sep_multi: str = "|",
782        ranks: list = None,
783        gz: bool = False,
784    ):
785        """
786        Writes loaded taxonomy to a file.
787
788        Parameters:
789        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
790        * **sep** *[str]*: Separator of fields
791        * **sep_multi** *[str]*: Separator of multi-valued fields
792        * **ranks** *[list]*: Ranks to report
793        * **gz** *[bool]*: Gzip output
794
795        Returns: None
796        """
797        import gzip
798
799        if gz:
800            output_file = (
801                output_file if output_file.endswith(".gz") else output_file + ".gz"
802            )
803            check_no_file(output_file)
804            outf = gzip.open(output_file, "wt")
805        else:
806            check_no_file(output_file)
807            outf = open(output_file, "w")
808
809        write_field = {
810            "node": lambda node: node,
811            "latest": self.latest,
812            "parent": self.parent,
813            "rank": self.rank,
814            "name": self.name,
815            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
816            "children": lambda node: join_check(self.children(node), sep_multi),
817            "lineage": lambda node: join_check(
818                self.lineage(node, ranks=ranks), sep_multi
819            ),
820            "rank_lineage": lambda node: join_check(
821                self.rank_lineage(node, ranks=ranks), sep_multi
822            ),
823            "name_lineage": lambda node: join_check(
824                self.name_lineage(node, ranks=ranks), sep_multi
825            ),
826        }
827
828        for c in cols:
829            if c not in write_field:
830                raise ValueError(
831                    "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
832                )
833
834        if ranks:
835            for rank in ranks:
836                for node in self.nodes_rank(rank):
837                    print(
838                        *[write_field[c](node) for c in cols],
839                        sep=sep,
840                        end="\n",
841                        file=outf,
842                    )
843        else:
844            for node in self._nodes:
845                print(
846                    *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf
847                )
848
849        outf.close()
MultiTax( version: str = None, files: list = None, urls: list = None, output_prefix: str = None, root_node: str = None, root_parent: str = '0', root_name: str = None, root_rank: str = None, undefined_node: str = None, undefined_name: str = None, undefined_rank: str = None, build_name_nodes: bool = False, build_node_children: bool = False, build_rank_nodes: bool = False, extended_names: bool = False, empty: bool = False)
 34    def __init__(
 35        self,
 36        version: str = None,
 37        files: list = None,
 38        urls: list = None,
 39        output_prefix: str = None,
 40        root_node: str = None,
 41        root_parent: str = "0",
 42        root_name: str = None,
 43        root_rank: str = None,
 44        undefined_node: str = None,
 45        undefined_name: str = None,
 46        undefined_rank: str = None,
 47        build_name_nodes: bool = False,
 48        build_node_children: bool = False,
 49        build_rank_nodes: bool = False,
 50        extended_names: bool = False,
 51        empty: bool = False,
 52    ):
 53        """
 54        Main constructor of MultiTax and sub-classes
 55
 56        Parameters:
 57        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
 58        * **files** *[str, list]*: One or more local files to parse.
 59        * **urls** *[str, list]*: One or more urls to download and parse.
 60        * **output_prefix** *[str]*: Directory to write downloaded files.
 61        * **root_node** *[str]*: Define an alternative root node.
 62        * **root_parent** *[str]*: Define the root parent node identifier.
 63        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
 64        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name.
 65        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
 66        * **undefined_name** *[str]*: Define a default return value for undefined names.
 67        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
 68        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
 69        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
 70        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
 71        * **extended_names** *[bool]*: Parse extended names if available.
 72        * **empty** *[bool]*: Create an empty instance.
 73
 74        Example:
 75
 76            tax_ncbi = NcbiTx()
 77            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
 78            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
 79            tax_ott = OttTx(root_node="844192")
 80            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
 81        """
 82        if files:
 83            if isinstance(files, str):
 84                files = [files]
 85            for file in files:
 86                check_file(file)
 87
 88        if output_prefix:
 89            check_dir(output_prefix)
 90
 91        # Main structures
 92        self._nodes = {}
 93        self._ranks = {}
 94        self._names = {}
 95
 96        # Aux. structures
 97        self._lineages = {}
 98        self._name_nodes = {}
 99        self._node_children = {}
100        self._rank_nodes = {}
101        self._translated_nodes = {}
102        self._lca = None
103
104        # Properties
105        self.datetime = datetime.now()
106        self.version = None
107        self.undefined_node = undefined_node
108        self.undefined_name = undefined_name
109        self.undefined_rank = undefined_rank
110
111        # Set version
112        if files or urls:
113            self.version = version
114        else:
115            self.version = self._default_version if not version else version
116            if self.version not in self._supported_versions:
117                raise ValueError(
118                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
119                )
120
121        # Store source of tax files (url or file)
122        self.sources = []
123
124        if not empty:
125            # Open/Download/Write files
126            fhs = {}
127            if files:
128                fhs = open_files(files)
129            elif urls or self._default_urls.get(self.version):
130                fhs = download_files(
131                    urls=urls if urls else self._default_urls[self.version],
132                    output_prefix=output_prefix,
133                    retry_attempts=3,
134                )
135
136            if fhs:
137                # Parse taxonomy
138                self._nodes, self._ranks, self._names = self._parse(
139                    fhs, extended_names=extended_names
140                )
141                close_files(fhs)
142                # Save sources for stats (files or urls)
143                self.sources = list(fhs.keys())
144
145        # Set root values
146        self._set_root_node(
147            root=root_node if root_node else self._default_root_node,
148            parent=root_parent,
149            name=root_name,
150            rank=root_rank,
151        )
152
153        # build auxiliary structures
154        if build_node_children:
155            self._node_children = reverse_dict(self._nodes)
156        if build_name_nodes:
157            self._name_nodes = reverse_dict(self._names)
158        if build_rank_nodes:
159            self._rank_nodes = reverse_dict(self._ranks)
160
161        self.check_consistency()

Main constructor of MultiTax and sub-classes

Parameters:

  • version [str]: Version to download/parse or custom version name (with files/urls).
  • files [str, list]: One or more local files to parse.
  • urls [str, list]: One or more urls to download and parse.
  • output_prefix [str]: Directory to write downloaded files.
  • root_node [str]: Define an alternative root node.
  • root_parent [str]: Define the root parent node identifier.
  • root_name [str]: Define an alternative root name. Set to None to use original name.
  • root_rank [str]: Define an alternative root rank. Set to None to use original rank.
  • undefined_node [str]: Define a default return value for undefined nodes.
  • undefined_name [str]: Define a default return value for undefined names.
  • undefined_rank [str]: Define a default return value for undefined ranks.
  • build_node_children [bool]: Build node,children dict (otherwise it will be created on first use).
  • build_name_nodes [bool]: Build name,nodes dict (otherwise it will be created on first use).
  • build_rank_nodes [bool]: Build rank,nodes dict (otherwise it will be created on first use).
  • extended_names [bool]: Parse extended names if available.
  • empty [bool]: Create an empty instance.

Example:

tax_ncbi = NcbiTx()
tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
tax_ott = OttTx(root_node="844192")
tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
datetime
version
undefined_node
undefined_name
undefined_rank
sources
def add(self, node: str, parent: str, name: str = None, rank: str = None):
270    def add(self, node: str, parent: str, name: str = None, rank: str = None):
271        """
272        Adds node to taxonomy.
273        Deletes built lineages, translations and lca.
274        """
275        if parent not in self._nodes:
276            raise ValueError("Parent node [" + parent + "] not found.")
277        elif node in self._nodes:
278            raise ValueError("Node [" + node + "] already present.")
279
280        self._nodes[node] = parent
281        self._names[node] = name if name is not None else self.undefined_name
282        self._ranks[node] = rank if rank is not None else self.undefined_rank
283        self._reset_aux_data()

Adds node to taxonomy. Deletes built lineages, translations and lca.

def build_lca(self):
285    def build_lca(self):
286        """
287        Builds LCA structure based on pylca.
288        Optional function, LCA is built on first .lca() call.
289
290        Returns: None
291        """
292        self._lca = LCA(self._nodes)

Builds LCA structure based on pylca. Optional function, LCA is built on first .lca() call.

Returns: None

def build_lineages(self, root_node: str = None, ranks: list = None):
294    def build_lineages(self, root_node: str = None, ranks: list = None):
295        """
296        Stores lineages in memory for faster access.
297        It is valid for lineage(), rank_lineage() and name_lineage().
298        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.
299
300        Returns: None
301        """
302        self.clear_lineages()
303        for node in self._nodes:
304            self._lineages[node] = self.lineage(
305                node=node, root_node=root_node, ranks=ranks
306            )

Stores lineages in memory for faster access. It is valid for lineage(), rank_lineage() and name_lineage(). If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.

Returns: None

def build_translation( self, tax, representatives: bool = False, file: str = None, url: str = None):
308    def build_translation(
309        self, tax, representatives: bool = False, file: str = None, url: str = None
310    ):
311        """
312        Create a translation of current taxonomy to another
313
314        Parameters:
315
316        * **tax** [MultiTax]: A target taxonomy to be translated to.
317        * **representatives** *[bool]*: Use only GTDB representative genomes to translate nodes.
318        * **file** *[str]*: Local file to parse.
319        * **url** *[str]*: Url to download and parse.
320
321        Example:
322
323            from multitax import GtdbTx, NcbiTx
324            gtdb_tax = GtdbTx()
325            ncbi_tax = NcbiTx()
326
327            # Automatically download translation files
328            gtdb_tax.build_translation(ncbi_tax)
329            gtdb_tax.translate("g__Escherichia")
330                ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']
331
332            # Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
333            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
334            ncbi_tax.translate("620")
335                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
336
337            # Translation based on GTDB representative genome only
338            gtdb_tax.build_translation(ncbi_tax, representatives=True)
339            gtdb_tax.translate("g__Escherichia")
340                ['561', '547']
341        """
342        if file:
343            check_file(file)
344
345        self._translated_nodes = self._build_translation(
346            tax, representatives, file, url
347        )

Create a translation of current taxonomy to another

Parameters:

  • tax [MultiTax]: A target taxonomy to be translated to.
  • representatives [bool]: Use only GTDB representative genomes to translate nodes.
  • file [str]: Local file to parse.
  • url [str]: Url to download and parse.

Example:

from multitax import GtdbTx, NcbiTx
gtdb_tax = GtdbTx()
ncbi_tax = NcbiTx()

# Automatically download translation files
gtdb_tax.build_translation(ncbi_tax)
gtdb_tax.translate("g__Escherichia")
    ['561', '620', '590', '1224', '194', '543', '547', '570', '186803', '2005523', '841', '2', '1485', '2159', '216572', '1301', '128827', '815', '239759', '2791015', '1263', '1472649', '816', '171549', '2005473', '33024']

# Using local file from https://github.com/pirovc/multitax/tree/main/data/gtdb
ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
ncbi_tax.translate("620")
    {'g__Escherichia', 'g__Proteus', 'g__Serratia'}

# Translation based on GTDB representative genome only
gtdb_tax.build_translation(ncbi_tax, representatives=True)
gtdb_tax.translate("g__Escherichia")
    ['561', '547']
def children(self, node: str):
349    def children(self, node: str):
350        """
351        Returns list of direct children nodes of a given node.
352        """
353        # Setup on first use
354        if not self._node_children:
355            self._node_children = reverse_dict(self._nodes)
356        if node in self._node_children:
357            return self._node_children[node]
358        else:
359            return []

Returns list of direct children nodes of a given node.

def check_consistency(self):
361    def check_consistency(self):
362        """
363        Checks consistency of the tree
364
365        Returns: raise an Exception otherwise None
366        """
367        if self.root_node not in self._nodes:
368            raise ValueError("Root node [" + self.root_node + "] not found.")
369        if self.root_parent in self._nodes:
370            raise ValueError(
371                "Root parent ["
372                + self.root_parent
373                + "] found but should not be on tree."
374            )
375        if self.undefined_node in self._nodes:
376            raise ValueError(
377                "Undefined node ["
378                + self.undefined_node
379                + "] found but should not be on tree."
380            )
381
382        # Difference between values and keys should be only root_parent
383        lost_nodes = set(self._nodes.values()).difference(self._nodes)
384        if self.root_parent not in lost_nodes:
385            raise ValueError(
386                "Root parent [" + self.root_parent + "] not properly defined."
387            )
388        # Remove root_parent from lost nodes to report only missing
389        lost_nodes.remove(self.root_parent)
390        if len(lost_nodes) > 0:
391            raise ValueError("Parent nodes missing: " + ",".join(lost_nodes))
392
393        return None

Checks consistency of the tree

Returns: raise an Exception otherwise None

def clear_lca(self):
395    def clear_lca(self):
396        """
397        Clear built LCA.
398
399        Returns: None
400        """
401        self._lca = None

Clear built LCA.

Returns: None

def clear_lineages(self):
403    def clear_lineages(self):
404        """
405        Clear built lineages.
406
407        Returns: None
408        """
409        self._lineages = {}

Clear built lineages.

Returns: None

def closest_parent(self, node: str, ranks: str):
411    def closest_parent(self, node: str, ranks: str):
412        """
413        Returns the closest parent node based on a defined list of ranks
414        """
415        # Rank of node is already on the list
416        if self.rank(node) in ranks:
417            return node
418        else:
419            # check lineage from back to front until find a valid node
420            for n in self.lineage(node, ranks=ranks)[::-1]:
421                if n != self.undefined_node:
422                    return n
423        # nothing found
424        return self.undefined_node

Returns the closest parent node based on a defined list of ranks

def filter(self, nodes: list, desc: bool = False):
426    def filter(self, nodes: list, desc: bool = False):
427        """
428        Filters taxonomy given a list of nodes.
429        By default keep all the ancestors of the given nodes.
430        If desc=True, keep all descendants instead.
431        Deletes built lineages, translations and lca.
432
433        Example:
434
435            from multitax import GtdbTx
436            tax = GtdbTx()
437
438            tax.lineage('s__Enterovibrio marina')
439            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
440            # Keep only ancestors of 'g__Enterovibrio'
441            tax.filter('g__Enterovibrio')
442
443            # Reload taxonomy
444            tax = GtdbTx()
445            # Keep only descendants of 'g__Enterovibrio'
446            tax.filter('g__Enterovibrio', desc=True)
447        """
448        if isinstance(nodes, str):
449            nodes = [nodes]
450
451        # Keep track of nodes to be filtered out
452        filtered_nodes = set(self._nodes)
453        # Always keep root
454        filtered_nodes.discard(self.root_node)
455
456        if desc:
457            # Keep descendants of the given nodes
458            for node in nodes:
459                # Check if node exists (skips root)
460                if node in filtered_nodes:
461                    # For each leaf of the selected nodes
462                    for leaf in self.leaves(node):
463                        # Build lineage of each leaf up-to node itself
464                        for n in self.lineage(leaf, root_node=node):
465                            # Discard nodes from set to be kept
466                            filtered_nodes.discard(n)
467                    # Link node to root
468                    self._nodes[node] = self.root_node
469        else:
470            # Keep ancestors of the given nodes (full lineage up-to root)
471            for node in nodes:
472                # ranks=[] in case build_lineages() was used with specific ranks
473                for n in self.lineage(node, ranks=[]):
474                    # Discard nodes from set to be kept
475                    filtered_nodes.discard(n)
476
477        # Delete filtered nodes
478        for node in filtered_nodes:
479            self._remove(node)
480
481        # Delete aux. data structures
482        self._reset_aux_data()
483        self.check_consistency()

Filters taxonomy given a list of nodes. By default keep all the ancestors of the given nodes. If desc=True, keep all descendants instead. Deletes built lineages, translations and lca.

Example:

from multitax import GtdbTx
tax = GtdbTx()

tax.lineage('s__Enterovibrio marina')
# ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
# Keep only ancestors of 'g__Enterovibrio'
tax.filter('g__Enterovibrio')

# Reload taxonomy
tax = GtdbTx()
# Keep only descendants of 'g__Enterovibrio'
tax.filter('g__Enterovibrio', desc=True)
@classmethod
def from_customtx(cls, ctx):
485    @classmethod
486    def from_customtx(cls, ctx):
487        """
488        Initialize a Tx sub-class based on a CustomTx instance.
489
490        Example:
491
492            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
493            tax_ncbi = NcbiTx.from_customtx(tax_custom)
494        """
495        nc = cls(empty=True)
496        nc.version = ctx.version
497        nc.sources = ctx.sources
498        nc._nodes = ctx._nodes
499        nc._names = ctx._names
500        nc._ranks = ctx._ranks
501        return nc

Initialize a Tx sub-class based on a CustomTx instance.

Example:

tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
tax_ncbi = NcbiTx.from_customtx(tax_custom)
def latest(self, node: str):
503    def latest(self, node: str):
504        """
505        Returns latest/updated version of a given node.
506        If node is already the latests, returns itself.
507        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
508        """
509        if node in self._nodes:
510            return node
511        else:
512            return self.undefined_node

Returns the latest/updated version of a given node. If the node is already the latest, returns itself. Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)

def leaves(self, node: str = None):
514    def leaves(self, node: str = None):
515        """
516        Returns a list of leaf nodes of a given node.
517        """
518        if node is None or node == self.root_node:
519            # Leaves are nodes not contained in _nodes.values() ("parents")
520            return list(set(self._nodes).difference(self._nodes.values()))
521        elif node in self._nodes:
522            return self._recurse_leaves(node)
523        else:
524            return []

Returns a list of leaf nodes of a given node.

def lca(self, nodes: list = None):
526    def lca(self, nodes: list = None):
527        """
528        Returns the lowest common ancestor of two or more nodes.
529
530        Example:
531
532            from multitax import GtdbTx
533            tax = GtdbTx()
534            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
535        """
536        for node in nodes:
537            if node not in self._nodes:
538                raise ValueError("Node [" + node + "] not found.")
539
540        # Setup on first use
541        if not self._lca:
542            self.build_lca()
543
544        return self._lca(*nodes)

Returns the lowest common ancestor of two or more nodes.

Example:

from multitax import GtdbTx
tax = GtdbTx()
tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
def lineage(self, node: str, root_node: str = None, ranks: list = None):
546    def lineage(self, node: str, root_node: str = None, ranks: list = None):
547        """
548        Returns a list with the lineage of a given node.
549        If ranks is provided, returns only nodes annotated with such ranks.
550        If root_node is provided, use it instead of default root of tree.
551        """
552        # If lineages were built with build_lineages() with matching params
553        if node in self._lineages and root_node is None and ranks is None:
554            return self._lineages[node]
555        else:
556            if not root_node:
557                root_node = self.root_node
558
559            n = node
560            if ranks:
561                # Fixed length lineage
562                lin = [self.undefined_node] * len(ranks)
563                # Loop until end of the tree (in case chosen root is not on lineage)
564                while n != self.undefined_node:
565                    r = self.rank(n)
566                    if r in ranks:
567                        lin[ranks.index(r)] = n
568                    # If node is root, break (after adding)
569                    if n == root_node:
570                        break
571                    n = self.parent(n)
572            else:
573                # Full lineage
574                lin = []
575                # Loop until end of the tree (in case chosen root is not on lineage)
576                while n != self.undefined_node:
577                    lin.append(n)
578                    # If node is root, break (after adding)
579                    if n == root_node:
580                        break
581                    n = self.parent(n)
582                # Reverse order
583                lin = lin[::-1]
584
585            # last iteration node (n) != root_node: didn't find the root, invalid lineage
586            if n != root_node:
587                return []
588            else:
589                return lin

Returns a list with the lineage of a given node. If ranks is provided, returns only nodes annotated with such ranks. If root_node is provided, use it instead of default root of tree.

def name(self, node: str):
591    def name(self, node: str):
592        """
593        Returns name of a given node.
594        """
595        if node in self._names:
596            return self._names[node]
597        else:
598            return self.undefined_name

Returns name of a given node.

def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
600    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
601        """
602        Returns a list with the name lineage of a given node.
603        """
604        return list(
605            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
606        )

Returns a list with the name lineage of a given node.

def nodes_rank(self, rank: str):
608    def nodes_rank(self, rank: str):
609        """
610        Returns list of nodes of a given rank.
611        """
612        # Setup on first use
613        if not self._rank_nodes:
614            self._rank_nodes = reverse_dict(self._ranks)
615        if rank in self._rank_nodes:
616            return self._rank_nodes[rank]
617        else:
618            return []

Returns list of nodes of a given rank.

def parent(self, node: str):
620    def parent(self, node: str):
621        """
622        Returns the direct parent node of a given node.
623        """
624        if node in self._nodes:
625            return self._nodes[node]
626        else:
627            return self.undefined_node

Returns the direct parent node of a given node.

def parent_rank(self, node: str, rank: str):
629    def parent_rank(self, node: str, rank: str):
630        """
631        Returns the parent node of a given rank in the specified rank.
632        """
633        parent = self.lineage(node=node, ranks=[rank])
634        return parent[0] if parent else self.undefined_node

Returns the node with the specified rank in the lineage of a given node (the undefined node if there is none).

def prune(self, nodes: list):
636    def prune(self, nodes: list):
637        """
638        Prunes branches of the tree under the given nodes.
639        Deletes built lineages, translations and lca.
640        """
641
642        if isinstance(nodes, str):
643            nodes = [nodes]
644
645        del_nodes = set()
646        for node in nodes:
647            if node not in self._nodes:
648                raise ValueError("Node [" + node + "] not found.")
649            for leaf in self.leaves(node):
650                for n in self.lineage(leaf, root_node=node)[1:]:
651                    del_nodes.add(n)
652
653        for n in del_nodes:
654            self._remove(n)
655
656        self._reset_aux_data()

Prunes branches of the tree under the given nodes. Deletes built lineages, translations and lca.

def rank(self, node: str):
658    def rank(self, node: str):
659        """
660        Returns the rank of a given node.
661        """
662        if node in self._ranks:
663            return self._ranks[node]
664        else:
665            return self.undefined_rank

Returns the rank of a given node.

def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
667    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
668        """
669        Returns a list with the rank lineage of a given node.
670        """
671        return list(
672            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
673        )

Returns a list with the rank lineage of a given node.

def remove(self, node: str, check_consistency: bool = False):
675    def remove(self, node: str, check_consistency: bool = False):
676        """
677        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
678        Running check consistency after removing a node is recommended.
679        Deletes built lineages, translations and lca.
680        """
681        if node not in self._nodes:
682            raise ValueError("Node [" + node + "] not found.")
683        self._remove(node)
684        self._reset_aux_data()
685        if check_consistency:
686            self.check_consistency()

Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune. Running check consistency after removing a node is recommended. Deletes built lineages, translations and lca.

def search_name(self, text: str, rank: str = None, exact: bool = True):
688    def search_name(self, text: str, rank: str = None, exact: bool = True):
689        """
690        Search node by exact or partial name
691
692        Parameters:
693        * **text** *[str]*: Text to search.
694        * **rank** *[str]*: Filter results by rank.
695        * **exact** *[bool]*: Exact or partial name search (both case sensitive).
696
697        Returns: list of matching nodes
698        """
699        # Setup on first use
700        if not self._name_nodes:
701            self._name_nodes = reverse_dict(self._names)
702
703        if exact:
704            ret = self._exact_name(text, self._name_nodes)
705        else:
706            ret = self._partial_name(text, self._name_nodes)
707
708        # Only return nodes of chosen rank
709        if rank:
710            return filter_function(ret, self.rank, rank)
711        else:
712            return ret

Search node by exact or partial name

Parameters:

  • text [str]: Text to search.
  • rank [str]: Filter results by rank.
  • exact [bool]: Exact or partial name search (both case sensitive).

Returns: list of matching nodes

def stats(self):
714    def stats(self):
715        """
716        Returns a dict with general numbers of the taxonomic tree
717
718        Example:
719
720            from pprint import pprint
721            from multitax import GtdbTx
722            tax = GtdbTx()
723
724            pprint(tax.stats())
725            {'leaves': 30238,
726             'names': 42739,
727             'nodes': 42739,
728             'ranked_leaves': Counter({'species': 30238}),
729             'ranked_nodes': Counter({'species': 30238,
730                                      'genus': 8778,
731                                      'family': 2323,
732                                      'order': 930,
733                                      'class': 337,
734                                      'phylum': 131,
735                                      'domain': 1,
736                                      'root': 1}),
737             'ranks': 42739}
738        """
739        s = {}
740        s["nodes"] = len(self._nodes)
741        s["ranks"] = len(self._ranks)
742        s["names"] = len(self._names)
743        all_leaves = self.leaves(self.root_node)
744        s["leaves"] = len(all_leaves)
745        s["ranked_nodes"] = Counter(self._ranks.values())
746        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
747        return s

Returns a dict with general numbers of the taxonomic tree

Example:

from pprint import pprint
from multitax import GtdbTx
tax = GtdbTx()

pprint(tax.stats())
{'leaves': 30238,
 'names': 42739,
 'nodes': 42739,
 'ranked_leaves': Counter({'species': 30238}),
 'ranked_nodes': Counter({'species': 30238,
                          'genus': 8778,
                          'family': 2323,
                          'order': 930,
                          'class': 337,
                          'phylum': 131,
                          'domain': 1,
                          'root': 1}),
 'ranks': 42739}
def translate(self, node: str, top_perc: float | None = None, counts: bool = False):
749    def translate(self, node: str, top_perc: float | None = None, counts: bool = False):
750        """
751        Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes.
752        `counts` additionally outputs the number of entries/genomes used to translate each node.
753        The translation have to first be generated with the `build_translation` function.
754
755        Parameters:
756        * **node** *[str]*: Node to translate.
757        * **top_perc** *[float]*: Keep translations summing up to `top_perc` of the nodes based on counts.
758        * **counts** *[bool]*: Output a sorted list of tuples with the translated node and counts.
759
760        Returns: List of translated nodes (or list of tuples with counts)
761        """
762        if node in self._translated_nodes:
763            ret = Counter(self._translated_nodes[node])
764            i = None
765            if top_perc:
766                total = ret.total()
767                sm = 0
768                for i, (_, cnt) in enumerate(ret.most_common(), 1):
769                    sm += cnt
770                    if (sm / total) >= top_perc:
771                        break
772            return ret.most_common(i) if counts else [n[0] for n in ret.most_common(i)]
773
774        return []

Returns the translated node(s) from another taxonomy. One node may translate to none, one or several nodes. counts additionally outputs the number of entries/genomes used to translate each node. The translation has to be generated first with the build_translation function.

Parameters:

  • node [str]: Node to translate.
  • top_perc [float]: Keep translations summing up to top_perc of the nodes based on counts.
  • counts [bool]: Output a sorted list of tuples with the translated node and counts.

Returns: List of translated nodes (or list of tuples with counts)

def write( self, output_file: str, cols: list = ['node', 'parent', 'rank', 'name'], sep: str = '\t', sep_multi: str = '|', ranks: list = None, gz: bool = False):
776    def write(
777        self,
778        output_file: str,
779        cols: list = ["node", "parent", "rank", "name"],
780        sep: str = "\t",
781        sep_multi: str = "|",
782        ranks: list = None,
783        gz: bool = False,
784    ):
785        """
786        Writes loaded taxonomy to a file.
787
788        Parameters:
789        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
790        * **sep** *[str]*: Separator of fields
791        * **sep_multi** *[str]*: Separator of multi-valued fields
792        * **ranks** *[list]*: Ranks to report
793        * **gz** *[bool]*: Gzip output
794
795        Returns: None
796        """
797        import gzip
798
799        if gz:
800            output_file = (
801                output_file if output_file.endswith(".gz") else output_file + ".gz"
802            )
803            check_no_file(output_file)
804            outf = gzip.open(output_file, "wt")
805        else:
806            check_no_file(output_file)
807            outf = open(output_file, "w")
808
809        write_field = {
810            "node": lambda node: node,
811            "latest": self.latest,
812            "parent": self.parent,
813            "rank": self.rank,
814            "name": self.name,
815            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
816            "children": lambda node: join_check(self.children(node), sep_multi),
817            "lineage": lambda node: join_check(
818                self.lineage(node, ranks=ranks), sep_multi
819            ),
820            "rank_lineage": lambda node: join_check(
821                self.rank_lineage(node, ranks=ranks), sep_multi
822            ),
823            "name_lineage": lambda node: join_check(
824                self.name_lineage(node, ranks=ranks), sep_multi
825            ),
826        }
827
828        for c in cols:
829            if c not in write_field:
830                raise ValueError(
831                    "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
832                )
833
834        if ranks:
835            for rank in ranks:
836                for node in self.nodes_rank(rank):
837                    print(
838                        *[write_field[c](node) for c in cols],
839                        sep=sep,
840                        end="\n",
841                        file=outf,
842                    )
843        else:
844            for node in self._nodes:
845                print(
846                    *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf
847                )
848
849        outf.close()

Writes loaded taxonomy to a file.

Parameters:

  • cols [list]: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
  • sep [str]: Separator of fields
  • sep_multi [str]: Separator of multi-valued fields
  • ranks [list]: Ranks to report
  • gz [bool]: Gzip output

Returns: None