Coverage for /home/deng/Projects/ete4/hackathon/ete4/ete4/parser/phylip.py: 5%

126 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-03-21 09:19 +0100

1import os 

2import re 

3from sys import stderr as STDERR 

4 

5from ete4.core import seqgroup 

6 

7 

8def read_phylip(source, interleaved=True, obj=None, 

9 relaxed=False, fix_duplicates=True): 

10 if obj is None: 

11 SG = seqgroup.SeqGroup() 

12 else: 

13 SG = obj 

14 

15 # Prepares handle from which read sequences 

16 if os.path.isfile(source): 

17 if source.endswith('.gz'): 

18 import gzip 

19 _source = gzip.open(source) 

20 else: 

21 _source = open(source, "r") 

22 else: 

23 _source = iter(source.split("\n")) 

24 

25 nchar, ntax = None, None 

26 counter = 0 

27 id_counter = 0 

28 for line in _source: 

29 line = line.strip("\n") 

30 # Passes comments and blank lines 

31 if not line or line[0] == "#": 

32 continue 

33 # Reads head 

34 if not nchar or not ntax: 

35 m = re.match(r"^\s*(\d+)\s+(\d+)",line) 

36 if m: 

37 ntax = int (m.groups()[0]) 

38 nchar = int (m.groups()[1]) 

39 else: 

40 raise Exception("A first line with the alignment dimension is required") 

41 # Reads sequences 

42 else: 

43 if not interleaved: 

44 # Reads names and sequences 

45 if SG.id2name.get(id_counter, None) is None: 

46 if relaxed: 

47 m = re.match("^([^ ]+)(.+)", line) 

48 else: 

49 m = re.match("^(.{10})(.+)", line) 

50 if m: 

51 name = m.groups()[0].strip() 

52 if fix_duplicates and name in SG.name2id: 

53 tag = str(len([k for k in list(SG.name2id.keys()) \ 

54 if k.endswith(name)])) 

55 old_name = name 

56 # Tag is in the beginning to avoid being 

57 # cut it by the 10 chars limit 

58 name = tag+"_"+name 

59 print("Duplicated entry [%s] was renamed to [%s]" %\ 

60 (old_name, name), file=STDERR) 

61 SG.id2name[id_counter] = name 

62 SG.name2id[name] = id_counter 

63 SG.id2seq[id_counter] = "" 

64 line = m.groups()[1] 

65 else: 

66 raise Exception("Wrong phylip sequencial format.") 

67 SG.id2seq[id_counter] += re.sub(r"\s","", line) 

68 if len(SG.id2seq[id_counter]) == nchar: 

69 id_counter += 1 

70 name = None 

71 elif len(SG.id2seq[id_counter]) > nchar: 

72 raise Exception("Unexpected length of sequence [%s] [%s]." %(name,SG.id2seq[id_counter])) 

73 else: 

74 if len(SG)<ntax: 

75 if relaxed: 

76 m = re.match("^([^ ]+)(.+)", line) 

77 else: 

78 m = re.match("^(.{10})(.+)",line) 

79 if m: 

80 name = m.groups()[0].strip() 

81 

82 seq = re.sub(r"\s","",m.groups()[1]) 

83 SG.id2seq[id_counter] = seq 

84 SG.id2name[id_counter] = name 

85 if fix_duplicates and name in SG.name2id: 

86 tag = str(len([k for k in list(SG.name2id.keys()) \ 

87 if k.endswith(name)])) 

88 old_name = name 

89 name = tag+"_"+name 

90 print("Duplicated entry [%s] was renamed to [%s]" %\ 

91 (old_name, name), file=STDERR) 

92 SG.name2id[name] = id_counter 

93 id_counter += 1 

94 else: 

95 raise Exception("Unexpected number of sequences.") 

96 else: 

97 seq = re.sub(r"\s", "", line) 

98 if id_counter == len(SG): 

99 id_counter = 0 

100 SG.id2seq[id_counter] += seq 

101 id_counter += 1 

102 

103 if os.path.isfile(source): 

104 _source.close() 

105 

106 if len(SG) != ntax: 

107 raise Exception("Unexpected number of sequences.") 

108 

109 # Check lenght of all seqs 

110 for i in list(SG.id2seq.keys()): 

111 if len(SG.id2seq[i]) != nchar: 

112 raise Exception("Unexpected lenght of sequence [%s]" %SG.id2name[i]) 

113 

114 return SG 

115 

116def write_phylip(aln, outfile=None, interleaved=True, relaxed=False): 

117 width = 60 

118 seq_visited = set([]) 

119 

120 show_name_warning = False 

121 lenghts = set((len(seq) for seq in list(aln.id2seq.values()))) 

122 if len(lenghts) >1: 

123 raise Exception("Phylip format requires sequences of equal lenght.") 

124 seqlength = lenghts.pop() 

125 

126 if not relaxed: 

127 name_fix = 10 

128 else: 

129 name_fix = max([len(name) for name in list(aln.id2name.values())]) 

130 

131 alg_lines = [] 

132 alg_text = " %d %d" %(len(aln), seqlength) 

133 alg_lines.append(alg_text) 

134 if interleaved: 

135 visited = set([]) 

136 for i in range(0, seqlength, width): 

137 for j in aln.id2name.keys(): 

138 name = aln.id2name[j] 

139 if not relaxed and len(name)>name_fix: 

140 name = name[:name_fix] 

141 show_name_warning = True 

142 

143 seq = aln.id2seq[j][i:i+width] 

144 if j not in visited: 

145 name_str = "%s " %name.ljust(name_fix) 

146 visited.add(j) 

147 else: 

148 name_str = "".ljust(name_fix+3) 

149 

150 seq_str = ' '.join([seq[k:k+10] for k in range(0, len(seq), 10)]) 

151 line_str = "%s%s" %(name_str, seq_str) 

152 alg_lines.append(line_str) 

153 alg_lines.append("") 

154 else: 

155 for name, seq, comments in aln.iter_entries(): 

156 if not relaxed and len(name)>10: 

157 name = name[:name_fix] 

158 show_name_warning = True 

159 line_str = "%s %s\n%s" %\ 

160 (name.ljust(name_fix), seq[0:width-name_fix-3], '\n'.join([seq[k:k+width] \ 

161 for k in range(width-name_fix-3, len(seq), width)])) 

162 alg_lines.append(line_str) 

163 alg_lines.append("") 

164 

165 

166 if show_name_warning: 

167 print("Warning! Some sequence names were cut to 10 characters!!", file=STDERR) 

168 alg_text = '\n'.join(alg_lines) 

169 if outfile is not None: 

170 OUT = open(outfile, "w") 

171 OUT.write(alg_text) 

172 OUT.close() 

173 else: 

174 return alg_text