Coverage for utils.py : 18%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
2import random
3from pathlib import Path
4from typing import Union, Iterable
5import logging
7from pyteomics import mzml
8import pandas as pd
9from tqdm.auto import tqdm
11import elfragmentador
12import elfragmentador.constants as CONSTANTS
13from elfragmentador.spectra import Spectrum
14from elfragmentador.model import PepTransformerModel
16import torch
17from torch.nn.functional import cosine_similarity
19import warnings
22# TODO split addition of metadata and actual predictions to separate functions to
25def append_preds(
26 in_pin: Union[Path, str], out_pin: Union[Path, str], model: PepTransformerModel
27) -> pd.DataFrame:
28 """Append cosine similarity to prediction to a percolator input
30 Args:
31 in_pin (Union[Path, str]): Input Percolator file location
32 out_pin (Union[Path, str]): Output percolator file location
33 model (PepTransformerModel): Transformer model to use
35 Returns:
36 pd.DataFrame: Pandas data frame with the appended column
37 """
39 warnings.filterwarnings(
40 "ignore",
41 ".*peaks were annotated for spectra.*",
42 )
44 # Read pin
45 NUM_COLUMNS = 28
47 # The appendix is in the form of _SpecNum_Charge_ID
48 regex_file_appendix = re.compile("_\d+_\d+_\d+$")
49 appendix_charge_regex = re.compile("(?<=_)\d+(?=_)")
50 dot_re = re.compile("(?<=\.).*(?=\..*$)")
51 template_string = "controllerType=0 controllerNumber=1 scan={SCAN_NUMBER}"
52 in_pin_path = Path(in_pin)
54 df = pd.read_csv(
55 in_pin,
56 sep="\t",
57 index_col=False,
58 usecols=list(range(NUM_COLUMNS)),
59 )
60 # TODO fix so the last column remains unchanged, right now it keeps
61 # only the first protein because the field is not quoted in comet
62 # outputs
64 logging.info(df)
66 """
67 Col names should be:
68 SpecId
69 Label
70 ScanNr
71 ExpMass
72 CalcMass
73 Peptide
74 mokapot score
75 mokapot q-value
76 mokapot PEP
77 Proteins
78 """
80 # This would allow skipping several reads on the file (which is fairly quick)
81 # df = df.sort_values(by=['SpecId', 'ScanNr']).reset_index(drop=True).copy()
82 # This would allow skipping several predictions... which right now are fairly slow
83 df = df.sort_values(by=["Peptide", "CalcMass"]).reset_index(drop=True).copy()
84 df.insert(loc=NUM_COLUMNS - 2, column="SpecCorrelation", value=0)
86 mzml_readers = {}
87 scan_id = None
88 last_seq = None
89 last_charge = None
90 last_nce = None
91 outs = []
93 tqdm_postfix = {
94 "cached_reads": 0,
95 "cached_predictions": 0,
96 "predictions": 0,
97 }
99 tqdm_iter = tqdm(df.iterrows(), total=len(df))
101 # TODO check if batching would improve inference speed
102 for index, row in tqdm_iter:
103 row_rawfile = re.sub(regex_file_appendix, "", row.SpecId)
104 row_appendix = regex_file_appendix.search(row.SpecId)[0]
106 curr_charge = int(appendix_charge_regex.search(row_appendix, 2)[0])
107 peptide_sequence = dot_re.search(row.Peptide)[0]
109 rawfile_path = Path(in_pin_path.parent / (row_rawfile + ".mzML"))
110 assert rawfile_path.is_file(), f"{rawfile_path} does not exist"
112 if mzml_readers.get(str(rawfile_path), None) is None:
113 mzml_readers[str(rawfile_path)] = mzml.PreIndexedMzML(str(rawfile_path))
115 old_scan_id = scan_id
116 scan_id = template_string.format(SCAN_NUMBER=row.ScanNr)
118 if old_scan_id != scan_id:
119 # read_spectrum
120 curr_scan = mzml_readers[str(rawfile_path)].get_by_id(scan_id)
121 nce = curr_scan["precursorList"]["precursor"][0]["selectedIonList"][
122 "selectedIon"
123 ][0]["charge state"]
124 else:
126 tqdm_postfix["cached_reads"] += 1
127 tqdm_iter.set_postfix(tqdm_postfix)
129 # convert spectrum to model "output"
130 curr_spec_object = Spectrum(
131 sequence=peptide_sequence,
132 parent_mz=row.ExpMass,
133 charge=curr_charge,
134 mzs=curr_scan["m/z array"],
135 intensities=curr_scan["intensity array"],
136 nce=nce,
137 )
139 # predict spectrum
140 with torch.no_grad():
141 if (
142 last_seq != peptide_sequence
143 or last_charge != curr_charge
144 or last_nce != nce
145 ):
146 last_seq = peptide_sequence
147 last_charge = curr_charge
148 last_nce = nce
150 pred_irt, pred_spec = model.predict_from_seq(
151 seq=peptide_sequence,
152 charge=curr_charge,
153 nce=nce,
154 )
155 pred_spec = torch.stack([pred_spec])
156 tqdm_postfix["predictions"] += 1
157 tqdm_iter.set_postfix(tqdm_postfix)
158 else:
159 tqdm_postfix["cached_predictions"] += 1
160 tqdm_iter.set_postfix(tqdm_postfix)
162 # Get ground truth spectrum
163 try:
164 gt_spec = torch.stack([torch.Tensor(curr_spec_object.encode_spectra())])
166 except AssertionError as e:
167 if "No peaks were annotated in this spectrum" in str(e):
168 gt_spec = torch.zeros_like(pred_spec)
170 # compare spectra
171 distance = cosine_similarity(gt_spec, pred_spec)
173 # append to results
174 outs.append(float(distance))
176 df["SpecCorrelation"] = outs
177 df.to_csv(out_pin, index=False, sep="\t")
178 return df
181def predict_df(
182 df: pd.DataFrame, impute_collision_energy=False, model: PepTransformerModel = None
183) -> str:
184 """
185 Predicts the spectra from a precursor list as defined by Skyline
187 Args:
188 df (pd.DataFrame):
189 A data frame containing minimum 3 columns
190 modified_sequence ('Modified Sequence'),
191 collision_energy ('CE'),
192 precursor_charge ('Precursor Charge')
193 impute_collision_energy (Union[bool, float]):
194 Either False or a collision energy to use
195 predicting the spectra
196 """
197 OPTION_1_NAMES = ["Modified Sequence", "CE", "Precursor Charge"]
198 OPTION_2_NAMES = ["modified_sequence", "collision_energy", "precursor_charge"]
200 if OPTION_1_NAMES[0] in list(df):
201 names = OPTION_1_NAMES
202 elif OPTION_2_NAMES[0] in list(df):
203 names = OPTION_2_NAMES
204 else:
205 raise ValueError(
206 "Names in the data frame dont match any of the posible options"
207 )
209 if names[1] not in list(df):
210 if impute_collision_energy:
211 df[names[1]] = impute_collision_energy
212 else:
213 raise ValueError(
214 f"Didn't find a collision enery column with name {names[1]},"
215 " please provide one or a value for `impute_collision_energy`"
216 )
218 if model is None:
219 model = PepTransformerModel.load_from_checkpoint(
220 elfragmentador.DEFAULT_CHECKPOINT
221 )
223 my_iter = tqdm(zip(df[names[0]], df[names[1]], df[names[2]]), total=len(df))
224 out = []
226 for seq, nce, charge in my_iter:
227 pred_spec = model.predict_from_seq(
228 seq=seq, charge=int(charge), nce=nce, as_spectrum=True
229 )
230 out.append(pred_spec.to_sptxt())
232 return "\n".join(out)
235def get_random_peptide():
236 AAS = [x for x in CONSTANTS.ALPHABET if x.isupper()]
237 len_pep = random.randint(5, CONSTANTS.MAX_SEQUENCE)
238 out_pep = ""
240 for _ in range(len_pep):
241 out_pep += "".join(random.sample(AAS, 1))
243 return out_pep