Coverage for /home/deng/Projects/ete4/hackathon/ete4/ete4/parser/fasta.py: 63%

59 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-03-21 09:19 +0100

1import os 

2import textwrap 

3from sys import stderr as STDERR 

4 

5from ete4.core import seqgroup 

6 

7 

8def read_fasta(source, obj=None, header_delimiter="\t", fix_duplicates=True): 

9 """ Reads a collection of sequences econded in FASTA format.""" 

10 

11 if obj is None: 

12 SC = seqgroup.SeqGroup() 

13 else: 

14 SC = obj 

15 

16 names = set() 

17 seq_id = -1 

18 

19 # Prepares handle from which read sequences 

20 if os.path.isfile(source): 

21 if source.endswith('.gz'): 

22 import gzip 

23 _source = gzip.open(source) 

24 else: 

25 _source = open(source, "r") 

26 else: 

27 _source = iter(source.split("\n")) 

28 

29 seq_name = None 

30 for line in _source: 

31 line = line.strip() 

32 if line.startswith('#') or not line: 

33 continue 

34 # Reads seq number 

35 elif line.startswith('>'): 

36 # Checks if previous name had seq 

37 if seq_id>-1 and SC.id2seq[seq_id] == "": 

38 raise Exception("No sequence found for "+seq_name) 

39 

40 seq_id += 1 

41 # Takes header info 

42 seq_header_fields = [_f.strip() for _f in line[1:].split(header_delimiter)] 

43 seq_name = seq_header_fields[0] 

44 

45 # Checks for duplicated seq names 

46 if fix_duplicates and seq_name in names: 

47 tag = str(len([k for k in list(SC.name2id.keys()) if k.endswith(seq_name)])) 

48 old_name = seq_name 

49 seq_name = tag+"_"+seq_name 

50 print("Duplicated entry [%s] was renamed to [%s]" %(old_name, seq_name), file=STDERR) 

51 

52 # stores seq_name 

53 SC.id2seq[seq_id] = "" 

54 SC.id2name[seq_id] = seq_name 

55 SC.name2id[seq_name] = seq_id 

56 SC.id2comment[seq_id] = seq_header_fields[1:] 

57 names.add(seq_name) 

58 

59 else: 

60 if seq_name is None: 

61 raise Exception("Error reading sequences: Wrong format.") 

62 

63 # removes all white spaces in line 

64 s = line.strip().replace(" ","") 

65 

66 # append to seq_string 

67 SC.id2seq[seq_id] += s 

68 

69 if os.path.isfile(source): 

70 _source.close() 

71 

72 if seq_name and SC.id2seq[seq_id] == "": 

73 print(seq_name,"has no sequence", file=STDERR) 

74 return None 

75 

76 # Everything ok 

77 return SC 

78 

79def write_fasta(sequences, outfile = None, seqwidth = 80): 

80 """ Writes a SeqGroup python object using FASTA format. """ 

81 

82 wrapper = textwrap.TextWrapper() 

83 wrapper.break_on_hyphens = False 

84 wrapper.replace_whitespace = False 

85 wrapper.expand_tabs = False 

86 wrapper.break_long_words = True 

87 wrapper.width = 80 

88 text = '\n'.join([">%s\n%s\n" %( "\t".join([name]+comment), wrapper.fill(seq)) for 

89 name, seq, comment in sequences]) 

90 

91 if outfile is not None: 

92 with open(outfile, 'w') as fout: 

93 fout.write(text) 

94 else: 

95 return text