multitax.multitax

  1from multitax.utils import (
  2    join_check,
  3    check_no_file,
  4    filter_function,
  5    reverse_dict,
  6    check_file,
  7    close_files,
  8    download_files,
  9    open_files,
 10    check_dir,
 11)
 12from collections import Counter
 13from datetime import datetime
 14from pylca.pylca import LCA
 15
 16
 17class MultiTax(object):
 18    _default_version = "current"
 19    _supported_versions = ["current"]
 20    _default_urls = {}
 21    _default_root_node = "1"
 22
 23    def __init__(
 24        self,
 25        version: str = None,
 26        files: list = None,
 27        urls: list = None,
 28        output_prefix: str = None,
 29        root_node: str = None,
 30        root_parent: str = "0",
 31        root_name: str = None,
 32        root_rank: str = None,
 33        undefined_node: str = None,
 34        undefined_name: str = None,
 35        undefined_rank: str = None,
 36        build_name_nodes: bool = False,
 37        build_node_children: bool = False,
 38        build_rank_nodes: bool = False,
 39        extended_names: bool = False,
 40        empty: bool = False,
 41    ):
 42        """
 43        Main constructor of MultiTax and sub-classes
 44
 45        Parameters:
 46        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
 47        * **files** *[str, list]*: One or more local files to parse.
 48        * **urls** *[str, list]*: One or more urls to download and parse.
 49        * **output_prefix** *[str]*: Directory to write downloaded files.
 50        * **root_node** *[str]*: Define an alternative root node.
 51        * **root_parent** *[str]*: Define the root parent node identifier.
 52        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
 53        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name.
 54        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
 55        * **undefined_name** *[str]*: Define a default return value for undefined names.
 56        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
 57        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
 58        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
 59        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
 60        * **extended_names** *[bool]*: Parse extended names if available.
 61        * **empty** *[bool]*: Create an empty instance.
 62
 63        Example:
 64
 65            tax_ncbi = NcbiTx()
 66            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
 67            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
 68            tax_ott = OttTx(root_node="844192")
 69            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
 70        """
 71        if files:
 72            if isinstance(files, str):
 73                files = [files]
 74            for file in files:
 75                check_file(file)
 76
 77        if output_prefix:
 78            check_dir(output_prefix)
 79
 80        # Main structures
 81        self._nodes = {}
 82        self._ranks = {}
 83        self._names = {}
 84
 85        # Aux. structures
 86        self._lineages = {}
 87        self._name_nodes = {}
 88        self._node_children = {}
 89        self._rank_nodes = {}
 90        self._translated_nodes = {}
 91        self._lca = None
 92
 93        # Properties
 94        self.datetime = datetime.now()
 95        self.version = None
 96        self.undefined_node = undefined_node
 97        self.undefined_name = undefined_name
 98        self.undefined_rank = undefined_rank
 99
100        # Set version
101        if files or urls:
102            self.version = version
103        else:
104            self.version = self._default_version if not version else version
105            if self.version not in self._supported_versions:
106                raise ValueError(
107                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
108                )
109
110        # Store source of tax files (url or file)
111        self.sources = []
112
113        if not empty:
114            # Open/Download/Write files
115            fhs = {}
116            if files:
117                fhs = open_files(files)
118            elif urls or self._default_urls.get(self.version):
119                fhs = download_files(
120                    urls=urls if urls else self._default_urls[self.version],
121                    output_prefix=output_prefix,
122                    retry_attempts=3,
123                )
124
125            if fhs:
126                # Parse taxonomy
127                self._nodes, self._ranks, self._names = self._parse(
128                    fhs, extended_names=extended_names
129                )
130                close_files(fhs)
131                # Save sources for stats (files or urls)
132                self.sources = list(fhs.keys())
133
134        # Set root values
135        self._set_root_node(
136            root=root_node if root_node else self._default_root_node,
137            parent=root_parent,
138            name=root_name,
139            rank=root_rank,
140        )
141
142        # build auxiliary structures
143        if build_node_children:
144            self._node_children = reverse_dict(self._nodes)
145        if build_name_nodes:
146            self._name_nodes = reverse_dict(self._names)
147        if build_rank_nodes:
148            self._rank_nodes = reverse_dict(self._ranks)
149
150        self.check_consistency()
151
152    def _exact_name(self, text: str, names: dict):
153        """
154        Returns list of nodes of a given exact name (case sensitive).
155        """
156        if text in names:
157            return names[text]
158        else:
159            return []
160
161    def _parse(self, fhs: dict):
162        """
163        main function to be overloaded
164        receives a dictionary with {"url/file": file handler}
165        return nodes, ranks and names dicts
166        """
167        return {}, {}, {}
168
169    def _partial_name(self, text: str, names: dict):
170        """
171        Searches names containing a certain text (case sensitive) and return their respective nodes.
172        """
173        matching_nodes = set()
174        for name in names:
175            if text in name:
176                matching_nodes.update(names[name])
177        return list(matching_nodes)
178
179    def _recurse_leaves(self, node: str):
180        """
181        Recursive function returning leaf nodes
182        """
183        children = self.children(node)
184        if not children:
185            return [node]
186        leaves = []
187        for child in children:
188            leaves.extend(self._recurse_leaves(child))
189        return leaves
190
191    def _remove(self, node: str):
192        """
193        Removes node from taxonomy, no checking, for internal use
194        """
195        del self._nodes[node]
196        if node in self._names:
197            del self._names[node]
198        if node in self._ranks:
199            del self._ranks[node]
200
201    def _reset_aux_data(self):
202        """
203        Reset aux. data structures
204        """
205        self._lineages = {}
206        self._name_nodes = {}
207        self._node_children = {}
208        self._rank_nodes = {}
209        self._translated_nodes = {}
210        self._lca = None
211
212    def _set_root_node(self, root: str, parent: str, name: str, rank: str):
213        """
214        Set root node of the tree.
215        The files are parsed based on the self._default_root_node for each class
216        A user-defined root node can be:
217        1) internal: will filter the tree acodingly and delete the default root_node
218        2) external: will add node and link to the default
219        """
220
221        # Set parent/root with defaults
222        self.root_parent = parent
223        self.root_node = self._default_root_node
224        self._nodes[self.root_node] = self.root_parent
225
226        # Default root node is the top by definition
227        if root != self._default_root_node:
228            if root in self._nodes:
229                # Not default but exists on tree, filter only descendants
230                self.filter(root, desc=True)
231                # Remove entry for _default_root_node
232                self._remove(self._default_root_node)
233            else:
234                # Not on tree, link default node with new root
235                self._nodes[self._default_root_node] = root
236            # Change root to user defined
237            self.root_node = root
238            # Set/Update new root node parent link
239            self._nodes[self.root_node] = self.root_parent
240
241        # User-defined rank/name.
242        # If provided, insert manually,
243        # If None, check if is in the tree (defined in the given tax)
244        #    otherwise insert default "root"
245        if name:
246            self._names[self.root_node] = name
247        elif self.root_node not in self._names:
248            self._names[self.root_node] = "root"
249        # Set static name
250        self.root_name = self._names[self.root_node]
251
252        if rank:
253            self._ranks[self.root_node] = rank
254        elif self.root_node not in self._ranks:
255            self._ranks[self.root_node] = "root"
256        # Set static rank
257        self.root_rank = self._ranks[self.root_node]
258
259    def add(self, node: str, parent: str, name: str = None, rank: str = None):
260        """
261        Adds node to taxonomy.
262        Deletes built lineages, translations and lca.
263        """
264        if parent not in self._nodes:
265            raise ValueError("Parent node [" + parent + "] not found.")
266        elif node in self._nodes:
267            raise ValueError("Node [" + node + "] already present.")
268
269        self._nodes[node] = parent
270        self._names[node] = name if name is not None else self.undefined_name
271        self._ranks[node] = rank if rank is not None else self.undefined_rank
272        self._reset_aux_data()
273
274    def build_lca(self):
275        """
276        Builds LCA structure based on pylca.
277        Optional function, LCA is built on first .lca() call.
278
279        Returns: None
280        """
281        self._lca = LCA(self._nodes)
282
283    def build_lineages(self, root_node: str = None, ranks: list = None):
284        """
285        Stores lineages in memory for faster access.
286        It is valid for lineage(), rank_lineage() and name_lineage().
287        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.
288
289        Returns: None
290        """
291        self.clear_lineages()
292        for node in self._nodes:
293            self._lineages[node] = self.lineage(
294                node=node, root_node=root_node, ranks=ranks
295            )
296
297    def build_translation(self, tax, file: str = None, url: str = None):
298        """
299        Create a translation of current taxonomy to another
300
301        Parameters:
302
303        * **tax** [MultiTax]: A target taxonomy to be translated to.
304        * **file** *[str]*: Local file to parse.
305        * **url** *[str]*: Url to download and parse.
306
307        Example:
308
309            from multitax import GtdbTx, NcbiTx
310            gtdb_tax = GtdbTx()
311            ncbi_tax = NcbiTx()
312
313            # Automatically download translation files
314            gtdb_tax.build_translation(ncbi_tax)
315            gtdb_tax.translate("g__Escherichia")
316                {'1301', '547', '561', '570', '590', '620'}
317
318            # Using local file
319            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
320            ncbi_tax.translate("620")
321                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
322        """
323        if file:
324            check_file(file)
325
326        self._translated_nodes = self._build_translation(tax, file, url)
327
328    def children(self, node: str):
329        """
330        Returns list of direct children nodes of a given node.
331        """
332        # Setup on first use
333        if not self._node_children:
334            self._node_children = reverse_dict(self._nodes)
335        if node in self._node_children:
336            return self._node_children[node]
337        else:
338            return []
339
340    def check_consistency(self):
341        """
342        Checks consistency of the tree
343
344        Returns: raise an Exception otherwise None
345        """
346        if self.root_node not in self._nodes:
347            raise ValueError("Root node [" + self.root_node + "] not found.")
348        if self.root_parent in self._nodes:
349            raise ValueError(
350                "Root parent ["
351                + self.root_parent
352                + "] found but should not be on tree."
353            )
354        if self.undefined_node in self._nodes:
355            raise ValueError(
356                "Undefined node ["
357                + self.undefined_node
358                + "] found but should not be on tree."
359            )
360
361        # Difference between values and keys should be only root_parent
362        lost_nodes = set(self._nodes.values()).difference(self._nodes)
363        if self.root_parent not in lost_nodes:
364            raise ValueError(
365                "Root parent [" + self.root_parent + "] not properly defined."
366            )
367        # Remove root_parent from lost nodes to report only missing
368        lost_nodes.remove(self.root_parent)
369        if len(lost_nodes) > 0:
370            raise ValueError("Parent nodes missing: " + ",".join(lost_nodes))
371
372        return None
373
374    def clear_lca(self):
375        """
376        Clear built LCA.
377
378        Returns: None
379        """
380        self._lca = None
381
382    def clear_lineages(self):
383        """
384        Clear built lineages.
385
386        Returns: None
387        """
388        self._lineages = {}
389
390    def closest_parent(self, node: str, ranks: str):
391        """
392        Returns the closest parent node based on a defined list of ranks
393        """
394        # Rank of node is already on the list
395        if self.rank(node) in ranks:
396            return node
397        else:
398            # check lineage from back to front until find a valid node
399            for n in self.lineage(node, ranks=ranks)[::-1]:
400                if n != self.undefined_node:
401                    return n
402        # nothing found
403        return self.undefined_node
404
405    def filter(self, nodes: list, desc: bool = False):
406        """
407        Filters taxonomy given a list of nodes.
408        By default keep all the ancestors of the given nodes.
409        If desc=True, keep all descendants instead.
410        Deletes built lineages, translations and lca.
411
412        Example:
413
414            from multitax import GtdbTx
415            tax = GtdbTx()
416
417            tax.lineage('s__Enterovibrio marina')
418            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
419            # Keep only ancestors of 'g__Enterovibrio'
420            tax.filter('g__Enterovibrio')
421
422            # Reload taxonomy
423            tax = GtdbTx()
424            # Keep only descendants of 'g__Enterovibrio'
425            tax.filter('g__Enterovibrio', desc=True)
426        """
427        if isinstance(nodes, str):
428            nodes = [nodes]
429
430        # Keep track of nodes to be filtered out
431        filtered_nodes = set(self._nodes)
432        # Always keep root
433        filtered_nodes.discard(self.root_node)
434
435        if desc:
436            # Keep descendants of the given nodes
437            for node in nodes:
438                # Check if node exists (skips root)
439                if node in filtered_nodes:
440                    # For each leaf of the selected nodes
441                    for leaf in self.leaves(node):
442                        # Build lineage of each leaf up-to node itself
443                        for n in self.lineage(leaf, root_node=node):
444                            # Discard nodes from set to be kept
445                            filtered_nodes.discard(n)
446                    # Link node to root
447                    self._nodes[node] = self.root_node
448        else:
449            # Keep ancestors of the given nodes (full lineage up-to root)
450            for node in nodes:
451                # ranks=[] in case build_lineages() was used with specific ranks
452                for n in self.lineage(node, ranks=[]):
453                    # Discard nodes from set to be kept
454                    filtered_nodes.discard(n)
455
456        # Delete filtered nodes
457        for node in filtered_nodes:
458            self._remove(node)
459
460        # Delete aux. data structures
461        self._reset_aux_data()
462        self.check_consistency()
463
464    @classmethod
465    def from_customtx(cls, ctx):
466        """
467        Initialize a Tx sub-class based on a CustomTx instance.
468
469        Example:
470
471            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
472            tax_ncbi = NcbiTx.from_customtx(tax_custom)
473        """
474        nc = cls(empty=True)
475        nc.version = ctx.version
476        nc.sources = ctx.sources
477        nc._nodes = ctx._nodes
478        nc._names = ctx._names
479        nc._ranks = ctx._ranks
480        return nc
481
482    def latest(self, node: str):
483        """
484        Returns latest/updated version of a given node.
485        If node is already the latests, returns itself.
486        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
487        """
488        if node in self._nodes:
489            return node
490        else:
491            return self.undefined_node
492
493    def leaves(self, node: str = None):
494        """
495        Returns a list of leaf nodes of a given node.
496        """
497        if node is None or node == self.root_node:
498            # Leaves are nodes not contained in _nodes.values() ("parents")
499            return list(set(self._nodes).difference(self._nodes.values()))
500        elif node in self._nodes:
501            return self._recurse_leaves(node)
502        else:
503            return []
504
505    def lca(self, nodes: list = None):
506        """
507        Returns the lowest common ancestor of two or more nodes.
508
509        Example:
510
511            from multitax import GtdbTx
512            tax = GtdbTx()
513            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
514        """
515        for node in nodes:
516            if node not in self._nodes:
517                raise ValueError("Node [" + node + "] not found.")
518
519        # Setup on first use
520        if not self._lca:
521            self.build_lca()
522
523        return self._lca(*nodes)
524
525    def lineage(self, node: str, root_node: str = None, ranks: list = None):
526        """
527        Returns a list with the lineage of a given node.
528        If ranks is provided, returns only nodes annotated with such ranks.
529        If root_node is provided, use it instead of default root of tree.
530        """
531        # If lineages were built with build_lineages() with matching params
532        if node in self._lineages and root_node is None and ranks is None:
533            return self._lineages[node]
534        else:
535            if not root_node:
536                root_node = self.root_node
537
538            n = node
539            if ranks:
540                # Fixed length lineage
541                lin = [self.undefined_node] * len(ranks)
542                # Loop until end of the tree (in case chosen root is not on lineage)
543                while n != self.undefined_node:
544                    r = self.rank(n)
545                    if r in ranks:
546                        lin[ranks.index(r)] = n
547                    # If node is root, break (after adding)
548                    if n == root_node:
549                        break
550                    n = self.parent(n)
551            else:
552                # Full lineage
553                lin = []
554                # Loop until end of the tree (in case chosen root is not on lineage)
555                while n != self.undefined_node:
556                    lin.append(n)
557                    # If node is root, break (after adding)
558                    if n == root_node:
559                        break
560                    n = self.parent(n)
561                # Reverse order
562                lin = lin[::-1]
563
564            # last iteration node (n) != root_node: didn't find the root, invalid lineage
565            if n != root_node:
566                return []
567            else:
568                return lin
569
570    def name(self, node: str):
571        """
572        Returns name of a given node.
573        """
574        if node in self._names:
575            return self._names[node]
576        else:
577            return self.undefined_name
578
579    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
580        """
581        Returns a list with the name lineage of a given node.
582        """
583        return list(
584            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
585        )
586
587    def nodes_rank(self, rank: str):
588        """
589        Returns list of nodes of a given rank.
590        """
591        # Setup on first use
592        if not self._rank_nodes:
593            self._rank_nodes = reverse_dict(self._ranks)
594        if rank in self._rank_nodes:
595            return self._rank_nodes[rank]
596        else:
597            return []
598
599    def parent(self, node: str):
600        """
601        Returns the direct parent node of a given node.
602        """
603        if node in self._nodes:
604            return self._nodes[node]
605        else:
606            return self.undefined_node
607
608    def parent_rank(self, node: str, rank: str):
609        """
610        Returns the parent node of a given rank in the specified rank.
611        """
612        parent = self.lineage(node=node, ranks=[rank])
613        return parent[0] if parent else self.undefined_node
614
615    def prune(self, nodes: list):
616        """
617        Prunes branches of the tree under the given nodes.
618        Deletes built lineages, translations and lca.
619        """
620
621        if isinstance(nodes, str):
622            nodes = [nodes]
623
624        del_nodes = set()
625        for node in nodes:
626            if node not in self._nodes:
627                raise ValueError("Node [" + node + "] not found.")
628            for leaf in self.leaves(node):
629                for n in self.lineage(leaf, root_node=node)[1:]:
630                    del_nodes.add(n)
631
632        for n in del_nodes:
633            self._remove(n)
634
635        self._reset_aux_data()
636
637    def rank(self, node: str):
638        """
639        Returns the rank of a given node.
640        """
641        if node in self._ranks:
642            return self._ranks[node]
643        else:
644            return self.undefined_rank
645
646    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
647        """
648        Returns a list with the rank lineage of a given node.
649        """
650        return list(
651            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
652        )
653
654    def remove(self, node: str, check_consistency: bool = False):
655        """
656        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
657        Running check consistency after removing a node is recommended.
658        Deletes built lineages, translations and lca.
659        """
660        if node not in self._nodes:
661            raise ValueError("Node [" + node + "] not found.")
662        self._remove(node)
663        self._reset_aux_data()
664        if check_consistency:
665            self.check_consistency()
666
667    def search_name(self, text: str, rank: str = None, exact: bool = True):
668        """
669        Search node by exact or partial name
670
671        Parameters:
672        * **text** *[str]*: Text to search.
673        * **rank** *[str]*: Filter results by rank.
674        * **exact** *[bool]*: Exact or partial name search (both case sensitive).
675
676        Returns: list of matching nodes
677        """
678        # Setup on first use
679        if not self._name_nodes:
680            self._name_nodes = reverse_dict(self._names)
681
682        if exact:
683            ret = self._exact_name(text, self._name_nodes)
684        else:
685            ret = self._partial_name(text, self._name_nodes)
686
687        # Only return nodes of chosen rank
688        if rank:
689            return filter_function(ret, self.rank, rank)
690        else:
691            return ret
692
693    def stats(self):
694        """
695        Returns a dict with general numbers of the taxonomic tree
696
697        Example:
698
699            from pprint import pprint
700            from multitax import GtdbTx
701            tax = GtdbTx()
702
703            pprint(tax.stats())
704            {'leaves': 30238,
705             'names': 42739,
706             'nodes': 42739,
707             'ranked_leaves': Counter({'species': 30238}),
708             'ranked_nodes': Counter({'species': 30238,
709                                      'genus': 8778,
710                                      'family': 2323,
711                                      'order': 930,
712                                      'class': 337,
713                                      'phylum': 131,
714                                      'domain': 1,
715                                      'root': 1}),
716             'ranks': 42739}
717        """
718        s = {}
719        s["nodes"] = len(self._nodes)
720        s["ranks"] = len(self._ranks)
721        s["names"] = len(self._names)
722        all_leaves = self.leaves(self.root_node)
723        s["leaves"] = len(all_leaves)
724        s["ranked_nodes"] = Counter(self._ranks.values())
725        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
726        return s
727
728    def translate(self, node: str):
729        """
730        Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.
731        """
732        if node in self._translated_nodes:
733            return self._translated_nodes[node]
734        else:
735            return []
736
737    def write(
738        self,
739        output_file: str,
740        cols: list = ["node", "parent", "rank", "name"],
741        sep: str = "\t",
742        sep_multi: str = "|",
743        ranks: list = None,
744        gz: bool = False,
745    ):
746        """
747        Writes loaded taxonomy to a file.
748
749        Parameters:
750        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
751        * **sep** *[str]*: Separator of fields
752        * **sep_multi** *[str]*: Separator of multi-valued fields
753        * **ranks** *[list]*: Ranks to report
754        * **gz** *[bool]*: Gzip output
755
756        Returns: None
757        """
758        import gzip
759
760        if gz:
761            output_file = (
762                output_file if output_file.endswith(".gz") else output_file + ".gz"
763            )
764            check_no_file(output_file)
765            outf = gzip.open(output_file, "wt")
766        else:
767            check_no_file(output_file)
768            outf = open(output_file, "w")
769
770        write_field = {
771            "node": lambda node: node,
772            "latest": self.latest,
773            "parent": self.parent,
774            "rank": self.rank,
775            "name": self.name,
776            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
777            "children": lambda node: join_check(self.children(node), sep_multi),
778            "lineage": lambda node: join_check(
779                self.lineage(node, ranks=ranks), sep_multi
780            ),
781            "rank_lineage": lambda node: join_check(
782                self.rank_lineage(node, ranks=ranks), sep_multi
783            ),
784            "name_lineage": lambda node: join_check(
785                self.name_lineage(node, ranks=ranks), sep_multi
786            ),
787        }
788
789        for c in cols:
790            if c not in write_field:
791                raise ValueError(
792                    "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
793                )
794
795        if ranks:
796            for rank in ranks:
797                for node in self.nodes_rank(rank):
798                    print(
799                        *[write_field[c](node) for c in cols],
800                        sep=sep,
801                        end="\n",
802                        file=outf,
803                    )
804        else:
805            for node in self._nodes:
806                print(
807                    *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf
808                )
809
810        outf.close()
class MultiTax:
 18class MultiTax(object):
 19    _default_version = "current"
 20    _supported_versions = ["current"]
 21    _default_urls = {}
 22    _default_root_node = "1"
 23
    def __init__(
        self,
        version: str = None,
        files: list = None,
        urls: list = None,
        output_prefix: str = None,
        root_node: str = None,
        root_parent: str = "0",
        root_name: str = None,
        root_rank: str = None,
        undefined_node: str = None,
        undefined_name: str = None,
        undefined_rank: str = None,
        build_name_nodes: bool = False,
        build_node_children: bool = False,
        build_rank_nodes: bool = False,
        extended_names: bool = False,
        empty: bool = False,
    ):
        """
        Main constructor of MultiTax and sub-classes

        Parameters:
        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
        * **files** *[str, list]*: One or more local files to parse.
        * **urls** *[str, list]*: One or more urls to download and parse.
        * **output_prefix** *[str]*: Directory to write downloaded files.
        * **root_node** *[str]*: Define an alternative root node.
        * **root_parent** *[str]*: Define the root parent node identifier.
        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original rank.
        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
        * **undefined_name** *[str]*: Define a default return value for undefined names.
        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
        * **extended_names** *[bool]*: Parse extended names if available.
        * **empty** *[bool]*: Create an empty instance.

        Raises: ValueError if no files/urls are given and the version is not
        one of the supported versions. The final check_consistency() call can
        also raise ValueError.

        Example:

            tax_ncbi = NcbiTx()
            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
            tax_ott = OttTx(root_node="844192")
            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
        """
        if files:
            # A single file path is accepted and wrapped in a list
            if isinstance(files, str):
                files = [files]
            for file in files:
                check_file(file)

        if output_prefix:
            check_dir(output_prefix)

        # Main structures
        self._nodes = {}
        self._ranks = {}
        self._names = {}

        # Aux. structures
        self._lineages = {}
        self._name_nodes = {}
        self._node_children = {}
        self._rank_nodes = {}
        self._translated_nodes = {}
        self._lca = None

        # Properties
        self.datetime = datetime.now()
        self.version = None
        self.undefined_node = undefined_node
        self.undefined_name = undefined_name
        self.undefined_rank = undefined_rank

        # Set version
        if files or urls:
            # Custom sources: version is stored as given, without validation
            self.version = version
        else:
            self.version = self._default_version if not version else version
            if self.version not in self._supported_versions:
                raise ValueError(
                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
                )

        # Store source of tax files (url or file)
        self.sources = []

        if not empty:
            # Open/Download/Write files
            fhs = {}
            # Local files take precedence over urls / default urls
            if files:
                fhs = open_files(files)
            elif urls or self._default_urls.get(self.version):
                fhs = download_files(
                    urls=urls if urls else self._default_urls[self.version],
                    output_prefix=output_prefix,
                    retry_attempts=3,
                )

            if fhs:
                # Parse taxonomy
                self._nodes, self._ranks, self._names = self._parse(
                    fhs, extended_names=extended_names
                )
                close_files(fhs)
                # Save sources for stats (files or urls)
                self.sources = list(fhs.keys())

        # Set root values
        self._set_root_node(
            root=root_node if root_node else self._default_root_node,
            parent=root_parent,
            name=root_name,
            rank=root_rank,
        )

        # build auxiliary structures
        if build_node_children:
            self._node_children = reverse_dict(self._nodes)
        if build_name_nodes:
            self._name_nodes = reverse_dict(self._names)
        if build_rank_nodes:
            self._rank_nodes = reverse_dict(self._ranks)

        self.check_consistency()
152
153    def _exact_name(self, text: str, names: dict):
154        """
155        Returns list of nodes of a given exact name (case sensitive).
156        """
157        if text in names:
158            return names[text]
159        else:
160            return []
161
162    def _parse(self, fhs: dict):
163        """
164        main function to be overloaded
165        receives a dictionary with {"url/file": file handler}
166        return nodes, ranks and names dicts
167        """
168        return {}, {}, {}
169
170    def _partial_name(self, text: str, names: dict):
171        """
172        Searches names containing a certain text (case sensitive) and return their respective nodes.
173        """
174        matching_nodes = set()
175        for name in names:
176            if text in name:
177                matching_nodes.update(names[name])
178        return list(matching_nodes)
179
180    def _recurse_leaves(self, node: str):
181        """
182        Recursive function returning leaf nodes
183        """
184        children = self.children(node)
185        if not children:
186            return [node]
187        leaves = []
188        for child in children:
189            leaves.extend(self._recurse_leaves(child))
190        return leaves
191
192    def _remove(self, node: str):
193        """
194        Removes node from taxonomy, no checking, for internal use
195        """
196        del self._nodes[node]
197        if node in self._names:
198            del self._names[node]
199        if node in self._ranks:
200            del self._ranks[node]
201
202    def _reset_aux_data(self):
203        """
204        Reset aux. data structures
205        """
206        self._lineages = {}
207        self._name_nodes = {}
208        self._node_children = {}
209        self._rank_nodes = {}
210        self._translated_nodes = {}
211        self._lca = None
212
    def _set_root_node(self, root: str, parent: str, name: str, rank: str):
        """
        Set root node of the tree.
        The files are parsed based on the self._default_root_node for each class
        A user-defined root node can be:
        1) internal: will filter the tree accordingly and delete the default root_node
        2) external: will add node and link to the default

        Parameters:
        * **root** *[str]*: Node to be used as root.
        * **parent** *[str]*: Identifier stored as parent of the root node.
        * **name** *[str]*: Root name. If None, keeps the name already on the tree or uses "root".
        * **rank** *[str]*: Root rank. If None, keeps the rank already on the tree or uses "root".

        Sets self.root_parent, self.root_node, self.root_name and self.root_rank.
        """

        # Set parent/root with defaults
        self.root_parent = parent
        self.root_node = self._default_root_node
        self._nodes[self.root_node] = self.root_parent

        # Default root node is the top by definition
        if root != self._default_root_node:
            if root in self._nodes:
                # Not default but exists on tree, filter only descendants
                # (self.root_node is still the default root at this point)
                self.filter(root, desc=True)
                # Remove entry for _default_root_node
                self._remove(self._default_root_node)
            else:
                # Not on tree, link default node with new root
                self._nodes[self._default_root_node] = root
            # Change root to user defined
            self.root_node = root
            # Set/Update new root node parent link
            self._nodes[self.root_node] = self.root_parent

        # User-defined rank/name.
        # If provided, insert manually,
        # If None, check if is in the tree (defined in the given tax)
        #    otherwise insert default "root"
        if name:
            self._names[self.root_node] = name
        elif self.root_node not in self._names:
            self._names[self.root_node] = "root"
        # Set static name
        self.root_name = self._names[self.root_node]

        if rank:
            self._ranks[self.root_node] = rank
        elif self.root_node not in self._ranks:
            self._ranks[self.root_node] = "root"
        # Set static rank
        self.root_rank = self._ranks[self.root_node]
259
260    def add(self, node: str, parent: str, name: str = None, rank: str = None):
261        """
262        Adds node to taxonomy.
263        Deletes built lineages, translations and lca.
264        """
265        if parent not in self._nodes:
266            raise ValueError("Parent node [" + parent + "] not found.")
267        elif node in self._nodes:
268            raise ValueError("Node [" + node + "] already present.")
269
270        self._nodes[node] = parent
271        self._names[node] = name if name is not None else self.undefined_name
272        self._ranks[node] = rank if rank is not None else self.undefined_rank
273        self._reset_aux_data()
274
    def build_lca(self):
        """
        Builds LCA structure based on pylca.
        Optional function, LCA is built on first .lca() call.
        The structure is built from the node -> parent dict (self._nodes)
        and stored in self._lca, replacing any previously built one.

        Returns: None
        """
        self._lca = LCA(self._nodes)
283
284    def build_lineages(self, root_node: str = None, ranks: list = None):
285        """
286        Stores lineages in memory for faster access.
287        It is valid for lineage(), rank_lineage() and name_lineage().
288        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.
289
290        Returns: None
291        """
292        self.clear_lineages()
293        for node in self._nodes:
294            self._lineages[node] = self.lineage(
295                node=node, root_node=root_node, ranks=ranks
296            )
297
298    def build_translation(self, tax, file: str = None, url: str = None):
299        """
300        Create a translation of current taxonomy to another
301
302        Parameters:
303
304        * **tax** [MultiTax]: A target taxonomy to be translated to.
305        * **file** *[str]*: Local file to parse.
306        * **url** *[str]*: Url to download and parse.
307
308        Example:
309
310            from multitax import GtdbTx, NcbiTx
311            gtdb_tax = GtdbTx()
312            ncbi_tax = NcbiTx()
313
314            # Automatically download translation files
315            gtdb_tax.build_translation(ncbi_tax)
316            gtdb_tax.translate("g__Escherichia")
317                {'1301', '547', '561', '570', '590', '620'}
318
319            # Using local file
320            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
321            ncbi_tax.translate("620")
322                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
323        """
324        if file:
325            check_file(file)
326
327        self._translated_nodes = self._build_translation(tax, file, url)
328
329    def children(self, node: str):
330        """
331        Returns list of direct children nodes of a given node.
332        """
333        # Setup on first use
334        if not self._node_children:
335            self._node_children = reverse_dict(self._nodes)
336        if node in self._node_children:
337            return self._node_children[node]
338        else:
339            return []
340
341    def check_consistency(self):
342        """
343        Checks consistency of the tree
344
345        Returns: raise an Exception otherwise None
346        """
347        if self.root_node not in self._nodes:
348            raise ValueError("Root node [" + self.root_node + "] not found.")
349        if self.root_parent in self._nodes:
350            raise ValueError(
351                "Root parent ["
352                + self.root_parent
353                + "] found but should not be on tree."
354            )
355        if self.undefined_node in self._nodes:
356            raise ValueError(
357                "Undefined node ["
358                + self.undefined_node
359                + "] found but should not be on tree."
360            )
361
362        # Difference between values and keys should be only root_parent
363        lost_nodes = set(self._nodes.values()).difference(self._nodes)
364        if self.root_parent not in lost_nodes:
365            raise ValueError(
366                "Root parent [" + self.root_parent + "] not properly defined."
367            )
368        # Remove root_parent from lost nodes to report only missing
369        lost_nodes.remove(self.root_parent)
370        if len(lost_nodes) > 0:
371            raise ValueError("Parent nodes missing: " + ",".join(lost_nodes))
372
373        return None
374
    def clear_lca(self):
        """
        Clear built LCA (self._lca is set back to None,
        so the next .lca() call builds it again).

        Returns: None
        """
        self._lca = None
382
    def clear_lineages(self):
        """
        Clear built lineages (lineages stored by build_lineages()).

        Returns: None
        """
        self._lineages = {}
390
391    def closest_parent(self, node: str, ranks: str):
392        """
393        Returns the closest parent node based on a defined list of ranks
394        """
395        # Rank of node is already on the list
396        if self.rank(node) in ranks:
397            return node
398        else:
399            # check lineage from back to front until find a valid node
400            for n in self.lineage(node, ranks=ranks)[::-1]:
401                if n != self.undefined_node:
402                    return n
403        # nothing found
404        return self.undefined_node
405
    def filter(self, nodes: list, desc: bool = False):
        """
        Filters taxonomy given a list of nodes.
        By default keep all the ancestors of the given nodes.
        If desc=True, keep all descendants instead.
        Deletes built lineages, translations and lca.

        Parameters:
        * **nodes** *[str, list]*: One or more nodes to keep.
        * **desc** *[bool]*: Keep descendants instead of ancestors. In this mode each
          given node is re-linked directly to the root node; nodes not on the tree
          (and the root itself) are skipped.

        The root node is always kept. check_consistency() is run at the end
        and may raise a ValueError.

        Example:

            from multitax import GtdbTx
            tax = GtdbTx()

            tax.lineage('s__Enterovibrio marina')
            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
            # Keep only ancestors of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio')

            # Reload taxonomy
            tax = GtdbTx()
            # Keep only descendants of 'g__Enterovibrio'
            tax.filter('g__Enterovibrio', desc=True)
        """
        if isinstance(nodes, str):
            nodes = [nodes]

        # Keep track of nodes to be filtered out
        filtered_nodes = set(self._nodes)
        # Always keep root
        filtered_nodes.discard(self.root_node)

        if desc:
            # Keep descendants of the given nodes
            for node in nodes:
                # Check if node exists (skips root)
                if node in filtered_nodes:
                    # For each leaf of the selected nodes
                    for leaf in self.leaves(node):
                        # Build lineage of each leaf up-to node itself
                        for n in self.lineage(leaf, root_node=node):
                            # Discard nodes from set to be kept
                            filtered_nodes.discard(n)
                    # Link node to root
                    self._nodes[node] = self.root_node
        else:
            # Keep ancestors of the given nodes (full lineage up-to root)
            for node in nodes:
                # ranks=[] in case build_lineages() was used with specific ranks
                for n in self.lineage(node, ranks=[]):
                    # Discard nodes from set to be kept
                    filtered_nodes.discard(n)

        # Delete filtered nodes
        for node in filtered_nodes:
            self._remove(node)

        # Delete aux. data structures
        self._reset_aux_data()
        self.check_consistency()
464
465    @classmethod
466    def from_customtx(cls, ctx):
467        """
468        Initialize a Tx sub-class based on a CustomTx instance.
469
470        Example:
471
472            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
473            tax_ncbi = NcbiTx.from_customtx(tax_custom)
474        """
475        nc = cls(empty=True)
476        nc.version = ctx.version
477        nc.sources = ctx.sources
478        nc._nodes = ctx._nodes
479        nc._names = ctx._names
480        nc._ranks = ctx._ranks
481        return nc
482
483    def latest(self, node: str):
484        """
485        Returns latest/updated version of a given node.
486        If node is already the latests, returns itself.
487        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
488        """
489        if node in self._nodes:
490            return node
491        else:
492            return self.undefined_node
493
494    def leaves(self, node: str = None):
495        """
496        Returns a list of leaf nodes of a given node.
497        """
498        if node is None or node == self.root_node:
499            # Leaves are nodes not contained in _nodes.values() ("parents")
500            return list(set(self._nodes).difference(self._nodes.values()))
501        elif node in self._nodes:
502            return self._recurse_leaves(node)
503        else:
504            return []
505
506    def lca(self, nodes: list = None):
507        """
508        Returns the lowest common ancestor of two or more nodes.
509
510        Example:
511
512            from multitax import GtdbTx
513            tax = GtdbTx()
514            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
515        """
516        for node in nodes:
517            if node not in self._nodes:
518                raise ValueError("Node [" + node + "] not found.")
519
520        # Setup on first use
521        if not self._lca:
522            self.build_lca()
523
524        return self._lca(*nodes)
525
526    def lineage(self, node: str, root_node: str = None, ranks: list = None):
527        """
528        Returns a list with the lineage of a given node.
529        If ranks is provided, returns only nodes annotated with such ranks.
530        If root_node is provided, use it instead of default root of tree.
531        """
532        # If lineages were built with build_lineages() with matching params
533        if node in self._lineages and root_node is None and ranks is None:
534            return self._lineages[node]
535        else:
536            if not root_node:
537                root_node = self.root_node
538
539            n = node
540            if ranks:
541                # Fixed length lineage
542                lin = [self.undefined_node] * len(ranks)
543                # Loop until end of the tree (in case chosen root is not on lineage)
544                while n != self.undefined_node:
545                    r = self.rank(n)
546                    if r in ranks:
547                        lin[ranks.index(r)] = n
548                    # If node is root, break (after adding)
549                    if n == root_node:
550                        break
551                    n = self.parent(n)
552            else:
553                # Full lineage
554                lin = []
555                # Loop until end of the tree (in case chosen root is not on lineage)
556                while n != self.undefined_node:
557                    lin.append(n)
558                    # If node is root, break (after adding)
559                    if n == root_node:
560                        break
561                    n = self.parent(n)
562                # Reverse order
563                lin = lin[::-1]
564
565            # last iteration node (n) != root_node: didn't find the root, invalid lineage
566            if n != root_node:
567                return []
568            else:
569                return lin
570
571    def name(self, node: str):
572        """
573        Returns name of a given node.
574        """
575        if node in self._names:
576            return self._names[node]
577        else:
578            return self.undefined_name
579
580    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
581        """
582        Returns a list with the name lineage of a given node.
583        """
584        return list(
585            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
586        )
587
588    def nodes_rank(self, rank: str):
589        """
590        Returns list of nodes of a given rank.
591        """
592        # Setup on first use
593        if not self._rank_nodes:
594            self._rank_nodes = reverse_dict(self._ranks)
595        if rank in self._rank_nodes:
596            return self._rank_nodes[rank]
597        else:
598            return []
599
600    def parent(self, node: str):
601        """
602        Returns the direct parent node of a given node.
603        """
604        if node in self._nodes:
605            return self._nodes[node]
606        else:
607            return self.undefined_node
608
609    def parent_rank(self, node: str, rank: str):
610        """
611        Returns the parent node of a given rank in the specified rank.
612        """
613        parent = self.lineage(node=node, ranks=[rank])
614        return parent[0] if parent else self.undefined_node
615
616    def prune(self, nodes: list):
617        """
618        Prunes branches of the tree under the given nodes.
619        Deletes built lineages, translations and lca.
620        """
621
622        if isinstance(nodes, str):
623            nodes = [nodes]
624
625        del_nodes = set()
626        for node in nodes:
627            if node not in self._nodes:
628                raise ValueError("Node [" + node + "] not found.")
629            for leaf in self.leaves(node):
630                for n in self.lineage(leaf, root_node=node)[1:]:
631                    del_nodes.add(n)
632
633        for n in del_nodes:
634            self._remove(n)
635
636        self._reset_aux_data()
637
638    def rank(self, node: str):
639        """
640        Returns the rank of a given node.
641        """
642        if node in self._ranks:
643            return self._ranks[node]
644        else:
645            return self.undefined_rank
646
647    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
648        """
649        Returns a list with the rank lineage of a given node.
650        """
651        return list(
652            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
653        )
654
655    def remove(self, node: str, check_consistency: bool = False):
656        """
657        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
658        Running check consistency after removing a node is recommended.
659        Deletes built lineages, translations and lca.
660        """
661        if node not in self._nodes:
662            raise ValueError("Node [" + node + "] not found.")
663        self._remove(node)
664        self._reset_aux_data()
665        if check_consistency:
666            self.check_consistency()
667
668    def search_name(self, text: str, rank: str = None, exact: bool = True):
669        """
670        Search node by exact or partial name
671
672        Parameters:
673        * **text** *[str]*: Text to search.
674        * **rank** *[str]*: Filter results by rank.
675        * **exact** *[bool]*: Exact or partial name search (both case sensitive).
676
677        Returns: list of matching nodes
678        """
679        # Setup on first use
680        if not self._name_nodes:
681            self._name_nodes = reverse_dict(self._names)
682
683        if exact:
684            ret = self._exact_name(text, self._name_nodes)
685        else:
686            ret = self._partial_name(text, self._name_nodes)
687
688        # Only return nodes of chosen rank
689        if rank:
690            return filter_function(ret, self.rank, rank)
691        else:
692            return ret
693
694    def stats(self):
695        """
696        Returns a dict with general numbers of the taxonomic tree
697
698        Example:
699
700            from pprint import pprint
701            from multitax import GtdbTx
702            tax = GtdbTx()
703
704            pprint(tax.stats())
705            {'leaves': 30238,
706             'names': 42739,
707             'nodes': 42739,
708             'ranked_leaves': Counter({'species': 30238}),
709             'ranked_nodes': Counter({'species': 30238,
710                                      'genus': 8778,
711                                      'family': 2323,
712                                      'order': 930,
713                                      'class': 337,
714                                      'phylum': 131,
715                                      'domain': 1,
716                                      'root': 1}),
717             'ranks': 42739}
718        """
719        s = {}
720        s["nodes"] = len(self._nodes)
721        s["ranks"] = len(self._ranks)
722        s["names"] = len(self._names)
723        all_leaves = self.leaves(self.root_node)
724        s["leaves"] = len(all_leaves)
725        s["ranked_nodes"] = Counter(self._ranks.values())
726        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
727        return s
728
729    def translate(self, node: str):
730        """
731        Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.
732        """
733        if node in self._translated_nodes:
734            return self._translated_nodes[node]
735        else:
736            return []
737
738    def write(
739        self,
740        output_file: str,
741        cols: list = ["node", "parent", "rank", "name"],
742        sep: str = "\t",
743        sep_multi: str = "|",
744        ranks: list = None,
745        gz: bool = False,
746    ):
747        """
748        Writes loaded taxonomy to a file.
749
750        Parameters:
751        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
752        * **sep** *[str]*: Separator of fields
753        * **sep_multi** *[str]*: Separator of multi-valued fields
754        * **ranks** *[list]*: Ranks to report
755        * **gz** *[bool]*: Gzip output
756
757        Returns: None
758        """
759        import gzip
760
761        if gz:
762            output_file = (
763                output_file if output_file.endswith(".gz") else output_file + ".gz"
764            )
765            check_no_file(output_file)
766            outf = gzip.open(output_file, "wt")
767        else:
768            check_no_file(output_file)
769            outf = open(output_file, "w")
770
771        write_field = {
772            "node": lambda node: node,
773            "latest": self.latest,
774            "parent": self.parent,
775            "rank": self.rank,
776            "name": self.name,
777            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
778            "children": lambda node: join_check(self.children(node), sep_multi),
779            "lineage": lambda node: join_check(
780                self.lineage(node, ranks=ranks), sep_multi
781            ),
782            "rank_lineage": lambda node: join_check(
783                self.rank_lineage(node, ranks=ranks), sep_multi
784            ),
785            "name_lineage": lambda node: join_check(
786                self.name_lineage(node, ranks=ranks), sep_multi
787            ),
788        }
789
790        for c in cols:
791            if c not in write_field:
792                raise ValueError(
793                    "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
794                )
795
796        if ranks:
797            for rank in ranks:
798                for node in self.nodes_rank(rank):
799                    print(
800                        *[write_field[c](node) for c in cols],
801                        sep=sep,
802                        end="\n",
803                        file=outf,
804                    )
805        else:
806            for node in self._nodes:
807                print(
808                    *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf
809                )
810
811        outf.close()
MultiTax( version: str = None, files: list = None, urls: list = None, output_prefix: str = None, root_node: str = None, root_parent: str = '0', root_name: str = None, root_rank: str = None, undefined_node: str = None, undefined_name: str = None, undefined_rank: str = None, build_name_nodes: bool = False, build_node_children: bool = False, build_rank_nodes: bool = False, extended_names: bool = False, empty: bool = False)
 24    def __init__(
 25        self,
 26        version: str = None,
 27        files: list = None,
 28        urls: list = None,
 29        output_prefix: str = None,
 30        root_node: str = None,
 31        root_parent: str = "0",
 32        root_name: str = None,
 33        root_rank: str = None,
 34        undefined_node: str = None,
 35        undefined_name: str = None,
 36        undefined_rank: str = None,
 37        build_name_nodes: bool = False,
 38        build_node_children: bool = False,
 39        build_rank_nodes: bool = False,
 40        extended_names: bool = False,
 41        empty: bool = False,
 42    ):
 43        """
 44        Main constructor of MultiTax and sub-classes
 45
 46        Parameters:
 47        * **version** *[str]*: Version to download/parse or custom version name (with files/urls).
 48        * **files** *[str, list]*: One or more local files to parse.
 49        * **urls** *[str, list]*: One or more urls to download and parse.
 50        * **output_prefix** *[str]*: Directory to write downloaded files.
 51        * **root_node** *[str]*: Define an alternative root node.
 52        * **root_parent** *[str]*: Define the root parent node identifier.
 53        * **root_name** *[str]*: Define an alternative root name. Set to None to use original name.
 54        * **root_rank** *[str]*: Define an alternative root rank. Set to None to use original name.
 55        * **undefined_node** *[str]*: Define a default return value for undefined nodes.
 56        * **undefined_name** *[str]*: Define a default return value for undefined names.
 57        * **undefined_rank** *[str]*: Define a default return value for undefined ranks.
 58        * **build_node_children** *[bool]*: Build node,children dict (otherwise it will be created on first use).
 59        * **build_name_nodes** *[bool]*: Build name,nodes dict (otherwise it will be created on first use).
 60        * **build_rank_nodes** *[bool]*: Build rank,nodes dict (otherwise it will be created on first use).
 61        * **extended_names** *[bool]*: Parse extended names if available.
 62        * **empty** *[bool]*: Create an empty instance.
 63
 64        Example:
 65
 66            tax_ncbi = NcbiTx()
 67            tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
 68            tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
 69            tax_ott = OttTx(root_node="844192")
 70            tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
 71        """
 72        if files:
 73            if isinstance(files, str):
 74                files = [files]
 75            for file in files:
 76                check_file(file)
 77
 78        if output_prefix:
 79            check_dir(output_prefix)
 80
 81        # Main structures
 82        self._nodes = {}
 83        self._ranks = {}
 84        self._names = {}
 85
 86        # Aux. structures
 87        self._lineages = {}
 88        self._name_nodes = {}
 89        self._node_children = {}
 90        self._rank_nodes = {}
 91        self._translated_nodes = {}
 92        self._lca = None
 93
 94        # Properties
 95        self.datetime = datetime.now()
 96        self.version = None
 97        self.undefined_node = undefined_node
 98        self.undefined_name = undefined_name
 99        self.undefined_rank = undefined_rank
100
101        # Set version
102        if files or urls:
103            self.version = version
104        else:
105            self.version = self._default_version if not version else version
106            if self.version not in self._supported_versions:
107                raise ValueError(
108                    f"Version [{self.version}] not supported (possible versions: {', '.join(self._supported_versions)}). To set a custom version, use files or urls."
109                )
110
111        # Store source of tax files (url or file)
112        self.sources = []
113
114        if not empty:
115            # Open/Download/Write files
116            fhs = {}
117            if files:
118                fhs = open_files(files)
119            elif urls or self._default_urls.get(self.version):
120                fhs = download_files(
121                    urls=urls if urls else self._default_urls[self.version],
122                    output_prefix=output_prefix,
123                    retry_attempts=3,
124                )
125
126            if fhs:
127                # Parse taxonomy
128                self._nodes, self._ranks, self._names = self._parse(
129                    fhs, extended_names=extended_names
130                )
131                close_files(fhs)
132                # Save sources for stats (files or urls)
133                self.sources = list(fhs.keys())
134
135        # Set root values
136        self._set_root_node(
137            root=root_node if root_node else self._default_root_node,
138            parent=root_parent,
139            name=root_name,
140            rank=root_rank,
141        )
142
143        # build auxiliary structures
144        if build_node_children:
145            self._node_children = reverse_dict(self._nodes)
146        if build_name_nodes:
147            self._name_nodes = reverse_dict(self._names)
148        if build_rank_nodes:
149            self._rank_nodes = reverse_dict(self._ranks)
150
151        self.check_consistency()

Main constructor of MultiTax and sub-classes

Parameters:

  • version [str]: Version to download/parse or custom version name (with files/urls).
  • files [str, list]: One or more local files to parse.
  • urls [str, list]: One or more urls to download and parse.
  • output_prefix [str]: Directory to write downloaded files.
  • root_node [str]: Define an alternative root node.
  • root_parent [str]: Define the root parent node identifier.
  • root_name [str]: Define an alternative root name. Set to None to use original name.
  • root_rank [str]: Define an alternative root rank. Set to None to use original rank.
  • undefined_node [str]: Define a default return value for undefined nodes.
  • undefined_name [str]: Define a default return value for undefined names.
  • undefined_rank [str]: Define a default return value for undefined ranks.
  • build_node_children [bool]: Build node,children dict (otherwise it will be created on first use).
  • build_name_nodes [bool]: Build name,nodes dict (otherwise it will be created on first use).
  • build_rank_nodes [bool]: Build rank,nodes dict (otherwise it will be created on first use).
  • extended_names [bool]: Parse extended names if available.
  • empty [bool]: Create an empty instance.

Example:

tax_ncbi = NcbiTx()
tax_gtdb = GtdbTx(files=["file1.gz", "file2.txt"])
tax_silva = SilvaTx(urls=["https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/tax_slv_lsu_138.1.txt.gz"])
tax_ott = OttTx(root_node="844192")
tax_gg = GreengenesTx(output_prefix="save/to/prefix_")
datetime
version
undefined_node
undefined_name
undefined_rank
sources
def add(self, node: str, parent: str, name: str = None, rank: str = None):
260    def add(self, node: str, parent: str, name: str = None, rank: str = None):
261        """
262        Adds node to taxonomy.
263        Deletes built lineages, translations and lca.
264        """
265        if parent not in self._nodes:
266            raise ValueError("Parent node [" + parent + "] not found.")
267        elif node in self._nodes:
268            raise ValueError("Node [" + node + "] already present.")
269
270        self._nodes[node] = parent
271        self._names[node] = name if name is not None else self.undefined_name
272        self._ranks[node] = rank if rank is not None else self.undefined_rank
273        self._reset_aux_data()

Adds node to taxonomy. Deletes built lineages, translations and lca.

def build_lca(self):
275    def build_lca(self):
276        """
277        Builds LCA structure based on pylca.
278        Optional function, LCA is built on first .lca() call.
279
280        Returns: None
281        """
282        self._lca = LCA(self._nodes)

Builds LCA structure based on pylca. Optional function, LCA is built on first .lca() call.

Returns: None

def build_lineages(self, root_node: str = None, ranks: list = None):
284    def build_lineages(self, root_node: str = None, ranks: list = None):
285        """
286        Stores lineages in memory for faster access.
287        It is valid for lineage(), rank_lineage() and name_lineage().
288        If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.
289
290        Returns: None
291        """
292        self.clear_lineages()
293        for node in self._nodes:
294            self._lineages[node] = self.lineage(
295                node=node, root_node=root_node, ranks=ranks
296            )

Stores lineages in memory for faster access. It is valid for lineage(), rank_lineage() and name_lineage(). If keyword arguments (root_node, ranks) are used in those functions stored lineages are not used.

Returns: None

def build_translation(self, tax, file: str = None, url: str = None):
298    def build_translation(self, tax, file: str = None, url: str = None):
299        """
300        Create a translation of current taxonomy to another
301
302        Parameters:
303
304        * **tax** [MultiTax]: A target taxonomy to be translated to.
305        * **file** *[str]*: Local file to parse.
306        * **url** *[str]*: Url to download and parse.
307
308        Example:
309
310            from multitax import GtdbTx, NcbiTx
311            gtdb_tax = GtdbTx()
312            ncbi_tax = NcbiTx()
313
314            # Automatically download translation files
315            gtdb_tax.build_translation(ncbi_tax)
316            gtdb_tax.translate("g__Escherichia")
317                {'1301', '547', '561', '570', '590', '620'}
318
319            # Using local file
320            ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
321            ncbi_tax.translate("620")
322                {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
323        """
324        if file:
325            check_file(file)
326
327        self._translated_nodes = self._build_translation(tax, file, url)

Create a translation of current taxonomy to another

Parameters:

  • tax [MultiTax]: A target taxonomy to be translated to.
  • file [str]: Local file to parse.
  • url [str]: Url to download and parse.

Example:

from multitax import GtdbTx, NcbiTx
gtdb_tax = GtdbTx()
ncbi_tax = NcbiTx()

# Automatically download translation files
gtdb_tax.build_translation(ncbi_tax)
gtdb_tax.translate("g__Escherichia")
    {'1301', '547', '561', '570', '590', '620'}

# Using local file
ncbi_tax.build_translation(gtdb_tax, file="226_acc_rep_lin_ncbi.tsv.gz")
ncbi_tax.translate("620")
    {'g__Escherichia', 'g__Proteus', 'g__Serratia'}
def children(self, node: str):
329    def children(self, node: str):
330        """
331        Returns list of direct children nodes of a given node.
332        """
333        # Setup on first use
334        if not self._node_children:
335            self._node_children = reverse_dict(self._nodes)
336        if node in self._node_children:
337            return self._node_children[node]
338        else:
339            return []

Returns list of direct children nodes of a given node.

def check_consistency(self):
341    def check_consistency(self):
342        """
343        Checks consistency of the tree
344
345        Returns: raise an Exception otherwise None
346        """
347        if self.root_node not in self._nodes:
348            raise ValueError("Root node [" + self.root_node + "] not found.")
349        if self.root_parent in self._nodes:
350            raise ValueError(
351                "Root parent ["
352                + self.root_parent
353                + "] found but should not be on tree."
354            )
355        if self.undefined_node in self._nodes:
356            raise ValueError(
357                "Undefined node ["
358                + self.undefined_node
359                + "] found but should not be on tree."
360            )
361
362        # Difference between values and keys should be only root_parent
363        lost_nodes = set(self._nodes.values()).difference(self._nodes)
364        if self.root_parent not in lost_nodes:
365            raise ValueError(
366                "Root parent [" + self.root_parent + "] not properly defined."
367            )
368        # Remove root_parent from lost nodes to report only missing
369        lost_nodes.remove(self.root_parent)
370        if len(lost_nodes) > 0:
371            raise ValueError("Parent nodes missing: " + ",".join(lost_nodes))
372
373        return None

Checks consistency of the tree

Returns: None if the tree is consistent; raises a ValueError otherwise.

def clear_lca(self):
375    def clear_lca(self):
376        """
377        Clear built LCA.
378
379        Returns: None
380        """
381        self._lca = None

Clear built LCA.

Returns: None

def clear_lineages(self):
383    def clear_lineages(self):
384        """
385        Clear built lineages.
386
387        Returns: None
388        """
389        self._lineages = {}

Clear built lineages.

Returns: None

def closest_parent(self, node: str, ranks: str):
391    def closest_parent(self, node: str, ranks: str):
392        """
393        Returns the closest parent node based on a defined list of ranks
394        """
395        # Rank of node is already on the list
396        if self.rank(node) in ranks:
397            return node
398        else:
399            # check lineage from back to front until find a valid node
400            for n in self.lineage(node, ranks=ranks)[::-1]:
401                if n != self.undefined_node:
402                    return n
403        # nothing found
404        return self.undefined_node

Returns the closest parent node based on a defined list of ranks

def filter(self, nodes: list, desc: bool = False):
406    def filter(self, nodes: list, desc: bool = False):
407        """
408        Filters taxonomy given a list of nodes.
409        By default keep all the ancestors of the given nodes.
410        If desc=True, keep all descendants instead.
411        Deletes built lineages, translations and lca.
412
413        Example:
414
415            from multitax import GtdbTx
416            tax = GtdbTx()
417
418            tax.lineage('s__Enterovibrio marina')
419            # ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
420            # Keep only ancestors of 'g__Enterovibrio'
421            tax.filter('g__Enterovibrio')
422
423            # Reload taxonomy
424            tax = GtdbTx()
425            # Keep only descendants of 'g__Enterovibrio'
426            tax.filter('g__Enterovibrio', desc=True)
427        """
428        if isinstance(nodes, str):
429            nodes = [nodes]
430
431        # Keep track of nodes to be filtered out
432        filtered_nodes = set(self._nodes)
433        # Always keep root
434        filtered_nodes.discard(self.root_node)
435
436        if desc:
437            # Keep descendants of the given nodes
438            for node in nodes:
439                # Check if node exists (skips root)
440                if node in filtered_nodes:
441                    # For each leaf of the selected nodes
442                    for leaf in self.leaves(node):
443                        # Build lineage of each leaf up-to node itself
444                        for n in self.lineage(leaf, root_node=node):
445                            # Discard nodes from set to be kept
446                            filtered_nodes.discard(n)
447                    # Link node to root
448                    self._nodes[node] = self.root_node
449        else:
450            # Keep ancestors of the given nodes (full lineage up-to root)
451            for node in nodes:
452                # ranks=[] in case build_lineages() was used with specific ranks
453                for n in self.lineage(node, ranks=[]):
454                    # Discard nodes from set to be kept
455                    filtered_nodes.discard(n)
456
457        # Delete filtered nodes
458        for node in filtered_nodes:
459            self._remove(node)
460
461        # Delete aux. data structures
462        self._reset_aux_data()
463        self.check_consistency()

Filters taxonomy given a list of nodes. By default keep all the ancestors of the given nodes. If desc=True, keep all descendants instead. Deletes built lineages, translations and lca.

Example:

from multitax import GtdbTx
tax = GtdbTx()

tax.lineage('s__Enterovibrio marina')
# ['1', 'd__Bacteria', 'p__Proteobacteria', 'c__Gammaproteobacteria', 'o__Enterobacterales', 'f__Vibrionaceae', 'g__Enterovibrio', 's__Enterovibrio marina']
# Keep only ancestors of 'g__Enterovibrio'
tax.filter('g__Enterovibrio')

# Reload taxonomy
tax = GtdbTx()
# Keep only descendants of 'g__Enterovibrio'
tax.filter('g__Enterovibrio', desc=True)
@classmethod
def from_customtx(cls, ctx):
465    @classmethod
466    def from_customtx(cls, ctx):
467        """
468        Initialize a Tx sub-class based on a CustomTx instance.
469
470        Example:
471
472            tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
473            tax_ncbi = NcbiTx.from_customtx(tax_custom)
474        """
475        nc = cls(empty=True)
476        nc.version = ctx.version
477        nc.sources = ctx.sources
478        nc._nodes = ctx._nodes
479        nc._names = ctx._names
480        nc._ranks = ctx._ranks
481        return nc

Initialize a Tx sub-class based on a CustomTx instance.

Example:

tax_custom = CustomTx(version="custom_ncbi_files", files="my_custom_tax.tsv", cols=["node","parent","rank"])
tax_ncbi = NcbiTx.from_customtx(tax_custom)
def latest(self, node: str):
483    def latest(self, node: str):
484        """
485        Returns latest/updated version of a given node.
486        If node is already the latests, returns itself.
487        Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv)
488        """
489        if node in self._nodes:
490            return node
491        else:
492            return self.undefined_node

Returns the latest/updated version of a given node. If the node is already the latest, returns itself. Mainly used for NCBI (merged.dmp) and OTT (forwards.tsv).

def leaves(self, node: str = None):
494    def leaves(self, node: str = None):
495        """
496        Returns a list of leaf nodes of a given node.
497        """
498        if node is None or node == self.root_node:
499            # Leaves are nodes not contained in _nodes.values() ("parents")
500            return list(set(self._nodes).difference(self._nodes.values()))
501        elif node in self._nodes:
502            return self._recurse_leaves(node)
503        else:
504            return []

Returns a list of leaf nodes of a given node.

def lca(self, nodes: list = None):
506    def lca(self, nodes: list = None):
507        """
508        Returns the lowest common ancestor of two or more nodes.
509
510        Example:
511
512            from multitax import GtdbTx
513            tax = GtdbTx()
514            tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
515        """
516        for node in nodes:
517            if node not in self._nodes:
518                raise ValueError("Node [" + node + "] not found.")
519
520        # Setup on first use
521        if not self._lca:
522            self.build_lca()
523
524        return self._lca(*nodes)

Returns the lowest common ancestor of two or more nodes.

Example:

from multitax import GtdbTx
tax = GtdbTx()
tax.lca(["s__Escherichia coli", "s__Escherichia fergusonii"])
def lineage(self, node: str, root_node: str = None, ranks: list = None):
526    def lineage(self, node: str, root_node: str = None, ranks: list = None):
527        """
528        Returns a list with the lineage of a given node.
529        If ranks is provided, returns only nodes annotated with such ranks.
530        If root_node is provided, use it instead of default root of tree.
531        """
532        # If lineages were built with build_lineages() with matching params
533        if node in self._lineages and root_node is None and ranks is None:
534            return self._lineages[node]
535        else:
536            if not root_node:
537                root_node = self.root_node
538
539            n = node
540            if ranks:
541                # Fixed length lineage
542                lin = [self.undefined_node] * len(ranks)
543                # Loop until end of the tree (in case chosen root is not on lineage)
544                while n != self.undefined_node:
545                    r = self.rank(n)
546                    if r in ranks:
547                        lin[ranks.index(r)] = n
548                    # If node is root, break (after adding)
549                    if n == root_node:
550                        break
551                    n = self.parent(n)
552            else:
553                # Full lineage
554                lin = []
555                # Loop until end of the tree (in case chosen root is not on lineage)
556                while n != self.undefined_node:
557                    lin.append(n)
558                    # If node is root, break (after adding)
559                    if n == root_node:
560                        break
561                    n = self.parent(n)
562                # Reverse order
563                lin = lin[::-1]
564
565            # last iteration node (n) != root_node: didn't find the root, invalid lineage
566            if n != root_node:
567                return []
568            else:
569                return lin

Returns a list with the lineage of a given node. If ranks is provided, returns only nodes annotated with such ranks. If root_node is provided, use it instead of default root of tree.

def name(self, node: str):
571    def name(self, node: str):
572        """
573        Returns name of a given node.
574        """
575        if node in self._names:
576            return self._names[node]
577        else:
578            return self.undefined_name

Returns name of a given node.

def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
580    def name_lineage(self, node: str, root_node: str = None, ranks: list = None):
581        """
582        Returns a list with the name lineage of a given node.
583        """
584        return list(
585            map(self.name, self.lineage(node=node, root_node=root_node, ranks=ranks))
586        )

Returns a list with the name lineage of a given node.

def nodes_rank(self, rank: str):
588    def nodes_rank(self, rank: str):
589        """
590        Returns list of nodes of a given rank.
591        """
592        # Setup on first use
593        if not self._rank_nodes:
594            self._rank_nodes = reverse_dict(self._ranks)
595        if rank in self._rank_nodes:
596            return self._rank_nodes[rank]
597        else:
598            return []

Returns list of nodes of a given rank.

def parent(self, node: str):
600    def parent(self, node: str):
601        """
602        Returns the direct parent node of a given node.
603        """
604        if node in self._nodes:
605            return self._nodes[node]
606        else:
607            return self.undefined_node

Returns the direct parent node of a given node.

def parent_rank(self, node: str, rank: str):
609    def parent_rank(self, node: str, rank: str):
610        """
611        Returns the parent node of a given rank in the specified rank.
612        """
613        parent = self.lineage(node=node, ranks=[rank])
614        return parent[0] if parent else self.undefined_node

Returns the parent node of a given node at the specified rank.

def prune(self, nodes: list):
616    def prune(self, nodes: list):
617        """
618        Prunes branches of the tree under the given nodes.
619        Deletes built lineages, translations and lca.
620        """
621
622        if isinstance(nodes, str):
623            nodes = [nodes]
624
625        del_nodes = set()
626        for node in nodes:
627            if node not in self._nodes:
628                raise ValueError("Node [" + node + "] not found.")
629            for leaf in self.leaves(node):
630                for n in self.lineage(leaf, root_node=node)[1:]:
631                    del_nodes.add(n)
632
633        for n in del_nodes:
634            self._remove(n)
635
636        self._reset_aux_data()

Prunes branches of the tree under the given nodes. Deletes built lineages, translations and lca.

def rank(self, node: str):
638    def rank(self, node: str):
639        """
640        Returns the rank of a given node.
641        """
642        if node in self._ranks:
643            return self._ranks[node]
644        else:
645            return self.undefined_rank

Returns the rank of a given node.

def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
647    def rank_lineage(self, node: str, root_node: str = None, ranks: list = None):
648        """
649        Returns a list with the rank lineage of a given node.
650        """
651        return list(
652            map(self.rank, self.lineage(node=node, root_node=root_node, ranks=ranks))
653        )

Returns a list with the rank lineage of a given node.

def remove(self, node: str, check_consistency: bool = False):
655    def remove(self, node: str, check_consistency: bool = False):
656        """
657        Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune.
658        Running check consistency after removing a node is recommended.
659        Deletes built lineages, translations and lca.
660        """
661        if node not in self._nodes:
662            raise ValueError("Node [" + node + "] not found.")
663        self._remove(node)
664        self._reset_aux_data()
665        if check_consistency:
666            self.check_consistency()

Removes node from taxonomy. Can break the tree if a parent node is removed. To remove a certain branch, use prune. Running check consistency after removing a node is recommended. Deletes built lineages, translations and lca.

def search_name(self, text: str, rank: str = None, exact: bool = True):
668    def search_name(self, text: str, rank: str = None, exact: bool = True):
669        """
670        Search node by exact or partial name
671
672        Parameters:
673        * **text** *[str]*: Text to search.
674        * **rank** *[str]*: Filter results by rank.
675        * **exact** *[bool]*: Exact or partial name search (both case sensitive).
676
677        Returns: list of matching nodes
678        """
679        # Setup on first use
680        if not self._name_nodes:
681            self._name_nodes = reverse_dict(self._names)
682
683        if exact:
684            ret = self._exact_name(text, self._name_nodes)
685        else:
686            ret = self._partial_name(text, self._name_nodes)
687
688        # Only return nodes of chosen rank
689        if rank:
690            return filter_function(ret, self.rank, rank)
691        else:
692            return ret

Search node by exact or partial name

Parameters:

  • text [str]: Text to search.
  • rank [str]: Filter results by rank.
  • exact [bool]: Exact or partial name search (both case sensitive).

Returns: list of matching nodes

def stats(self):
694    def stats(self):
695        """
696        Returns a dict with general numbers of the taxonomic tree
697
698        Example:
699
700            from pprint import pprint
701            from multitax import GtdbTx
702            tax = GtdbTx()
703
704            pprint(tax.stats())
705            {'leaves': 30238,
706             'names': 42739,
707             'nodes': 42739,
708             'ranked_leaves': Counter({'species': 30238}),
709             'ranked_nodes': Counter({'species': 30238,
710                                      'genus': 8778,
711                                      'family': 2323,
712                                      'order': 930,
713                                      'class': 337,
714                                      'phylum': 131,
715                                      'domain': 1,
716                                      'root': 1}),
717             'ranks': 42739}
718        """
719        s = {}
720        s["nodes"] = len(self._nodes)
721        s["ranks"] = len(self._ranks)
722        s["names"] = len(self._names)
723        all_leaves = self.leaves(self.root_node)
724        s["leaves"] = len(all_leaves)
725        s["ranked_nodes"] = Counter(self._ranks.values())
726        s["ranked_leaves"] = Counter(map(self.rank, all_leaves))
727        return s

Returns a dict with general numbers of the taxonomic tree

Example:

from pprint import pprint
from multitax import GtdbTx
tax = GtdbTx()

pprint(tax.stats())
{'leaves': 30238,
 'names': 42739,
 'nodes': 42739,
 'ranked_leaves': Counter({'species': 30238}),
 'ranked_nodes': Counter({'species': 30238,
                          'genus': 8778,
                          'family': 2323,
                          'order': 930,
                          'class': 337,
                          'phylum': 131,
                          'domain': 1,
                          'root': 1}),
 'ranks': 42739}
def translate(self, node: str):
729    def translate(self, node: str):
730        """
731        Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.
732        """
733        if node in self._translated_nodes:
734            return self._translated_nodes[node]
735        else:
736            return []

Returns the translated node from another taxonomy. Translated nodes are generated with the build_translation function.

def write( self, output_file: str, cols: list = ['node', 'parent', 'rank', 'name'], sep: str = '\t', sep_multi: str = '|', ranks: list = None, gz: bool = False):
738    def write(
739        self,
740        output_file: str,
741        cols: list = ["node", "parent", "rank", "name"],
742        sep: str = "\t",
743        sep_multi: str = "|",
744        ranks: list = None,
745        gz: bool = False,
746    ):
747        """
748        Writes loaded taxonomy to a file.
749
750        Parameters:
751        * **cols** *[list]*: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
752        * **sep** *[str]*: Separator of fields
753        * **sep_multi** *[str]*: Separator of multi-valued fields
754        * **ranks** *[list]*: Ranks to report
755        * **gz** *[bool]*: Gzip output
756
757        Returns: None
758        """
759        import gzip
760
761        if gz:
762            output_file = (
763                output_file if output_file.endswith(".gz") else output_file + ".gz"
764            )
765            check_no_file(output_file)
766            outf = gzip.open(output_file, "wt")
767        else:
768            check_no_file(output_file)
769            outf = open(output_file, "w")
770
771        write_field = {
772            "node": lambda node: node,
773            "latest": self.latest,
774            "parent": self.parent,
775            "rank": self.rank,
776            "name": self.name,
777            "leaves": lambda node: join_check(self.leaves(node), sep_multi),
778            "children": lambda node: join_check(self.children(node), sep_multi),
779            "lineage": lambda node: join_check(
780                self.lineage(node, ranks=ranks), sep_multi
781            ),
782            "rank_lineage": lambda node: join_check(
783                self.rank_lineage(node, ranks=ranks), sep_multi
784            ),
785            "name_lineage": lambda node: join_check(
786                self.name_lineage(node, ranks=ranks), sep_multi
787            ),
788        }
789
790        for c in cols:
791            if c not in write_field:
792                raise ValueError(
793                    "Field [" + c + "] is not valid. Options: " + ",".join(write_field)
794                )
795
796        if ranks:
797            for rank in ranks:
798                for node in self.nodes_rank(rank):
799                    print(
800                        *[write_field[c](node) for c in cols],
801                        sep=sep,
802                        end="\n",
803                        file=outf,
804                    )
805        else:
806            for node in self._nodes:
807                print(
808                    *[write_field[c](node) for c in cols], sep=sep, end="\n", file=outf
809                )
810
811        outf.close()

Writes loaded taxonomy to a file.

Parameters:

  • cols [list]: Options: "node", "latest", "parent", "rank", "name", "leaves", "children", "lineage", "rank_lineage", "name_lineage"
  • sep [str]: Separator of fields
  • sep_multi [str]: Separator of multi-valued fields
  • ranks [list]: Ranks to report
  • gz [bool]: Gzip output

Returns: None