Coverage for /home/deng/Projects/ete4/hackathon/ete4/ete4/parser/fasta.py: 63%
59 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-03-21 09:19 +0100
« prev ^ index » next coverage.py v7.2.7, created at 2024-03-21 09:19 +0100
1import os
2import textwrap
3from sys import stderr as STDERR
5from ete4.core import seqgroup
8def read_fasta(source, obj=None, header_delimiter="\t", fix_duplicates=True):
9 """ Reads a collection of sequences econded in FASTA format."""
11 if obj is None:
12 SC = seqgroup.SeqGroup()
13 else:
14 SC = obj
16 names = set()
17 seq_id = -1
19 # Prepares handle from which read sequences
20 if os.path.isfile(source):
21 if source.endswith('.gz'):
22 import gzip
23 _source = gzip.open(source)
24 else:
25 _source = open(source, "r")
26 else:
27 _source = iter(source.split("\n"))
29 seq_name = None
30 for line in _source:
31 line = line.strip()
32 if line.startswith('#') or not line:
33 continue
34 # Reads seq number
35 elif line.startswith('>'):
36 # Checks if previous name had seq
37 if seq_id>-1 and SC.id2seq[seq_id] == "":
38 raise Exception("No sequence found for "+seq_name)
40 seq_id += 1
41 # Takes header info
42 seq_header_fields = [_f.strip() for _f in line[1:].split(header_delimiter)]
43 seq_name = seq_header_fields[0]
45 # Checks for duplicated seq names
46 if fix_duplicates and seq_name in names:
47 tag = str(len([k for k in list(SC.name2id.keys()) if k.endswith(seq_name)]))
48 old_name = seq_name
49 seq_name = tag+"_"+seq_name
50 print("Duplicated entry [%s] was renamed to [%s]" %(old_name, seq_name), file=STDERR)
52 # stores seq_name
53 SC.id2seq[seq_id] = ""
54 SC.id2name[seq_id] = seq_name
55 SC.name2id[seq_name] = seq_id
56 SC.id2comment[seq_id] = seq_header_fields[1:]
57 names.add(seq_name)
59 else:
60 if seq_name is None:
61 raise Exception("Error reading sequences: Wrong format.")
63 # removes all white spaces in line
64 s = line.strip().replace(" ","")
66 # append to seq_string
67 SC.id2seq[seq_id] += s
69 if os.path.isfile(source):
70 _source.close()
72 if seq_name and SC.id2seq[seq_id] == "":
73 print(seq_name,"has no sequence", file=STDERR)
74 return None
76 # Everything ok
77 return SC
79def write_fasta(sequences, outfile = None, seqwidth = 80):
80 """ Writes a SeqGroup python object using FASTA format. """
82 wrapper = textwrap.TextWrapper()
83 wrapper.break_on_hyphens = False
84 wrapper.replace_whitespace = False
85 wrapper.expand_tabs = False
86 wrapper.break_long_words = True
87 wrapper.width = 80
88 text = '\n'.join([">%s\n%s\n" %( "\t".join([name]+comment), wrapper.fill(seq)) for
89 name, seq, comment in sequences])
91 if outfile is not None:
92 with open(outfile, 'w') as fout:
93 fout.write(text)
94 else:
95 return text