Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import re 

2import random 

3from pathlib import Path 

4from typing import Union, Iterable 

5import logging 

6 

7from pyteomics import mzml 

8import pandas as pd 

9from tqdm.auto import tqdm 

10 

11import elfragmentador 

12import elfragmentador.constants as CONSTANTS 

13from elfragmentador.spectra import Spectrum 

14from elfragmentador.model import PepTransformerModel 

15 

16import torch 

17from torch.nn.functional import cosine_similarity 

18 

19import warnings 

20 

21 

22# TODO split addition of metadata and actual predictions to separate functions to 

23 

24 

25def append_preds( 

26 in_pin: Union[Path, str], out_pin: Union[Path, str], model: PepTransformerModel 

27) -> pd.DataFrame: 

28 """Append cosine similarity to prediction to a percolator input 

29 

30 Args: 

31 in_pin (Union[Path, str]): Input Percolator file location 

32 out_pin (Union[Path, str]): Output percolator file location 

33 model (PepTransformerModel): Transformer model to use 

34 

35 Returns: 

36 pd.DataFrame: Pandas data frame with the appended column 

37 """ 

38 

39 warnings.filterwarnings( 

40 "ignore", 

41 ".*peaks were annotated for spectra.*", 

42 ) 

43 

44 # Read pin 

45 NUM_COLUMNS = 28 

46 

47 # The appendix is in the form of _SpecNum_Charge_ID 

48 regex_file_appendix = re.compile("_\d+_\d+_\d+$") 

49 appendix_charge_regex = re.compile("(?<=_)\d+(?=_)") 

50 dot_re = re.compile("(?<=\.).*(?=\..*$)") 

51 template_string = "controllerType=0 controllerNumber=1 scan={SCAN_NUMBER}" 

52 in_pin_path = Path(in_pin) 

53 

54 df = pd.read_csv( 

55 in_pin, 

56 sep="\t", 

57 index_col=False, 

58 usecols=list(range(NUM_COLUMNS)), 

59 ) 

60 # TODO fix so the last column remains unchanged, right now it keeps 

61 # only the first protein because the field is not quoted in comet 

62 # outputs 

63 

64 logging.info(df) 

65 

66 """ 

67 Col names should be: 

68 SpecId 

69 Label 

70 ScanNr 

71 ExpMass 

72 CalcMass 

73 Peptide 

74 mokapot score 

75 mokapot q-value 

76 mokapot PEP 

77 Proteins 

78 """ 

79 

80 # This would allow skipping several reads on the file (which is fairly quick) 

81 # df = df.sort_values(by=['SpecId', 'ScanNr']).reset_index(drop=True).copy() 

82 # This would allow skipping several predictions... which right now are fairly slow 

83 df = df.sort_values(by=["Peptide", "CalcMass"]).reset_index(drop=True).copy() 

84 df.insert(loc=NUM_COLUMNS - 2, column="SpecCorrelation", value=0) 

85 

86 mzml_readers = {} 

87 scan_id = None 

88 last_seq = None 

89 last_charge = None 

90 last_nce = None 

91 outs = [] 

92 

93 tqdm_postfix = { 

94 "cached_reads": 0, 

95 "cached_predictions": 0, 

96 "predictions": 0, 

97 } 

98 

99 tqdm_iter = tqdm(df.iterrows(), total=len(df)) 

100 

101 # TODO check if batching would improve inference speed 

102 for index, row in tqdm_iter: 

103 row_rawfile = re.sub(regex_file_appendix, "", row.SpecId) 

104 row_appendix = regex_file_appendix.search(row.SpecId)[0] 

105 

106 curr_charge = int(appendix_charge_regex.search(row_appendix, 2)[0]) 

107 peptide_sequence = dot_re.search(row.Peptide)[0] 

108 

109 rawfile_path = Path(in_pin_path.parent / (row_rawfile + ".mzML")) 

110 assert rawfile_path.is_file(), f"{rawfile_path} does not exist" 

111 

112 if mzml_readers.get(str(rawfile_path), None) is None: 

113 mzml_readers[str(rawfile_path)] = mzml.PreIndexedMzML(str(rawfile_path)) 

114 

115 old_scan_id = scan_id 

116 scan_id = template_string.format(SCAN_NUMBER=row.ScanNr) 

117 

118 if old_scan_id != scan_id: 

119 # read_spectrum 

120 curr_scan = mzml_readers[str(rawfile_path)].get_by_id(scan_id) 

121 nce = curr_scan["precursorList"]["precursor"][0]["selectedIonList"][ 

122 "selectedIon" 

123 ][0]["charge state"] 

124 else: 

125 

126 tqdm_postfix["cached_reads"] += 1 

127 tqdm_iter.set_postfix(tqdm_postfix) 

128 

129 # convert spectrum to model "output" 

130 curr_spec_object = Spectrum( 

131 sequence=peptide_sequence, 

132 parent_mz=row.ExpMass, 

133 charge=curr_charge, 

134 mzs=curr_scan["m/z array"], 

135 intensities=curr_scan["intensity array"], 

136 nce=nce, 

137 ) 

138 

139 # predict spectrum 

140 with torch.no_grad(): 

141 if ( 

142 last_seq != peptide_sequence 

143 or last_charge != curr_charge 

144 or last_nce != nce 

145 ): 

146 last_seq = peptide_sequence 

147 last_charge = curr_charge 

148 last_nce = nce 

149 

150 pred_irt, pred_spec = model.predict_from_seq( 

151 seq=peptide_sequence, 

152 charge=curr_charge, 

153 nce=nce, 

154 ) 

155 pred_spec = torch.stack([pred_spec]) 

156 tqdm_postfix["predictions"] += 1 

157 tqdm_iter.set_postfix(tqdm_postfix) 

158 else: 

159 tqdm_postfix["cached_predictions"] += 1 

160 tqdm_iter.set_postfix(tqdm_postfix) 

161 

162 # Get ground truth spectrum 

163 try: 

164 gt_spec = torch.stack([torch.Tensor(curr_spec_object.encode_spectra())]) 

165 

166 except AssertionError as e: 

167 if "No peaks were annotated in this spectrum" in str(e): 

168 gt_spec = torch.zeros_like(pred_spec) 

169 

170 # compare spectra 

171 distance = cosine_similarity(gt_spec, pred_spec) 

172 

173 # append to results 

174 outs.append(float(distance)) 

175 

176 df["SpecCorrelation"] = outs 

177 df.to_csv(out_pin, index=False, sep="\t") 

178 return df 

179 

180 

181def predict_df( 

182 df: pd.DataFrame, impute_collision_energy=False, model: PepTransformerModel = None 

183) -> str: 

184 """ 

185 Predicts the spectra from a precursor list as defined by Skyline 

186 

187 Args: 

188 df (pd.DataFrame): 

189 A data frame containing minimum 3 columns 

190 modified_sequence ('Modified Sequence'), 

191 collision_energy ('CE'), 

192 precursor_charge ('Precursor Charge') 

193 impute_collision_energy (Union[bool, float]): 

194 Either False or a collision energy to use 

195 predicting the spectra 

196 """ 

197 OPTION_1_NAMES = ["Modified Sequence", "CE", "Precursor Charge"] 

198 OPTION_2_NAMES = ["modified_sequence", "collision_energy", "precursor_charge"] 

199 

200 if OPTION_1_NAMES[0] in list(df): 

201 names = OPTION_1_NAMES 

202 elif OPTION_2_NAMES[0] in list(df): 

203 names = OPTION_2_NAMES 

204 else: 

205 raise ValueError( 

206 "Names in the data frame dont match any of the posible options" 

207 ) 

208 

209 if names[1] not in list(df): 

210 if impute_collision_energy: 

211 df[names[1]] = impute_collision_energy 

212 else: 

213 raise ValueError( 

214 f"Didn't find a collision enery column with name {names[1]}," 

215 " please provide one or a value for `impute_collision_energy`" 

216 ) 

217 

218 if model is None: 

219 model = PepTransformerModel.load_from_checkpoint( 

220 elfragmentador.DEFAULT_CHECKPOINT 

221 ) 

222 

223 my_iter = tqdm(zip(df[names[0]], df[names[1]], df[names[2]]), total=len(df)) 

224 out = [] 

225 

226 for seq, nce, charge in my_iter: 

227 pred_spec = model.predict_from_seq( 

228 seq=seq, charge=int(charge), nce=nce, as_spectrum=True 

229 ) 

230 out.append(pred_spec.to_sptxt()) 

231 

232 return "\n".join(out) 

233 

234 

235def get_random_peptide(): 

236 AAS = [x for x in CONSTANTS.ALPHABET if x.isupper()] 

237 len_pep = random.randint(5, CONSTANTS.MAX_SEQUENCE) 

238 out_pep = "" 

239 

240 for _ in range(len_pep): 

241 out_pep += "".join(random.sample(AAS, 1)) 

242 

243 return out_pep