Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2Greatly inspired/copied from: 

3https://github.com/kusterlab/prosit/blob/master/prosit 

4 

5And released under an Apache 2.0 license 

6""" 

7 

8import collections 

9from collections import OrderedDict, defaultdict 

10from typing import Callable, Dict, List, Tuple, Union, Iterator 

11import warnings 

12 

13import numpy 

14from numpy import bool_, float64, ndarray 

15 

16from elfragmentador import constants, encoding_decoding 

17 

18 

19def solve_alias(x): 

20 try: 

21 x = x if len(x) == 1 else x[:1] + f"[{constants.MOD_PEPTIDE_ALIASES[x]}]" 

22 except KeyError: 

23 x = ( 

24 x 

25 if len(x) == 1 

26 else x[:1] + f"[{constants.MOD_PEPTIDE_ALIASES[x.replace('[', '[+')]}]" 

27 ) 

28 

29 x = x if len(x) != 3 else x[:1] # Takes care of C[] 

30 

31 return x 

32 

33 

34def peptide_parser(p: str, solve_aliases=False) -> Iterator[str]: 

35 """ 

36 Parses maxquant formatted peptide strings 

37 

38 Examples 

39 ======== 

40 >>> list(peptide_parser("AAACC")) 

41 ['n', 'A', 'A', 'A', 'C', 'C', 'c'] 

42 >>> list(peptide_parser("AAAM(ox)CC")) 

43 ['n', 'A', 'A', 'A', 'M(ox)', 'C', 'C', 'c'] 

44 >>> list(peptide_parser("AAAM[+16]CC")) 

45 ['n', 'A', 'A', 'A', 'M[+16]', 'C', 'C', 'c'] 

46 """ 

47 

48 ANNOTATIONS = "[](){}" 

49 

50 # This fixes a bug where passing a list would yield the incorrect results 

51 p = "".join(p) 

52 

53 if p[0] in ANNOTATIONS: 

54 raise ValueError(f"sequence starts with '{p[0]}'") 

55 n = len(p) 

56 i = 0 

57 

58 # Yield n terminus if its not explicit in the sequence 

59 if p[0] != "n": 

60 yield "n" 

61 

62 while i < n: 

63 if p[i] == "_": 

64 i += 1 

65 continue 

66 elif i + 1 < n and p[i + 1] in ANNOTATIONS: 

67 p_ = p[i + 2 :] 

68 annots = [x for x in ANNOTATIONS if x in p_] 

69 nexts = [] 

70 for an in annots: 

71 nexts.append(p_.index(an)) 

72 j = min(nexts) 

73 offset = i + j + 3 

74 out = p[i:offset] 

75 yield_value = solve_alias(out) if solve_aliases else out 

76 i = offset 

77 else: 

78 yield_value = p[i] 

79 i += 1 

80 

81 yield yield_value 

82 

83 # Yield c terminus if its not explicit in the sequence 

84 if yield_value != "c": 

85 yield "c" 

86 

87 

88def mass_diff_encode_seq(seq): 

89 iter = peptide_parser(seq, solve_aliases=True) 

90 iter = encoding_decoding.clip_explicit_terminus(list(iter)) 

91 # For some reason skyline detects T[80] but not T[+80] ... 

92 # And does not detect T[181] as a valid mod ... 

93 out = "".join([constants.MASS_DIFF_ALIASES_I[x].replace("+", "") for x in iter]) 

94 return out 

95 

96 

97def canonicalize_seq(seq: str, robust: bool = False) -> str: 

98 """canonicalize_seq Solves all modification aliases in a sequence. 

99 

100 Given a sequence, converts al supported modification aliases to the 

101 "canonical" version of them and returns the new version. 

102 

103 Parameters 

104 ---------- 

105 seq : str 

106 Modified peptide sequence, for example "PEPTIDE[+23]TIDE") 

107 robust : bool, optional 

108 Wether you want error to be silent and return none when they happen, by default False 

109 

110 Returns 

111 ------- 

112 str 

113 Same sequence as input but with all mod aliases replaced for the primary 

114 one in this package 

115 

116 Raises 

117 ------ 

118 e 

119 [description] 

120 """ 

121 try: 

122 out = "".join(peptide_parser(seq, solve_aliases=True)) 

123 except KeyError as e: 

124 out = None 

125 if not robust: 

126 warnings.warn(f"Non-supported sequence found in {seq}") 

127 raise e 

128 

129 return out 

130 

131 

132def get_theoretical_mass(peptide: str): 

133 """ 

134 Calculates the theoretical mass of a peptide 

135 

136 Example 

137 ------- 

138 >>> get_theoretical_mass("MYPEPTIDE") 

139 1093.4637787 

140 """ 

141 aas = peptide_parser(peptide) 

142 out = sum([constants.MOD_AA_MASSES[a] for a in aas]) 

143 return out 

144 

145 

146def get_precursor_mz(peptide: str, charge: int): 

147 """ 

148 Calculates the theoretical mass/charge of a precursor peptide 

149 (assumes positive mode) 

150 

151 Example 

152 ------- 

153 >>> get_precursor_mz("MYPEPTIDE", 1) 

154 1094.471055167 

155 >>> get_precursor_mz("MYPEPTIDE", 2) 

156 547.739165817 

157 """ 

158 return get_mz(get_theoretical_mass(peptide), 0, charge) 

159 

160 

161def get_forward_backward(peptide: str) -> Tuple[ndarray, ndarray]: 

162 """ 

163 Calculates masses forward and backwards from aminoacid 

164 sequences 

165 

166 Examples 

167 ======== 

168 >>> get_forward_backward("AMC") 

169 (array([ 1.00782503, 72.04493903, 203.08542403, 363.11607276, 

170 380.11881242]), array([ 17.00273967, 177.03338839, 308.07387339, 379.11098739, 

171 380.11881242])) 

172 >>> get_forward_backward("AM[147]C") 

173 (array([ 1.00782503, 72.04493903, 219.08033403, 379.11098276, 

174 396.11372242]), array([ 17.00273967, 177.03338839, 324.06878339, 395.10589739, 

175 396.11372242])) 

176 >>> get_forward_backward("n[+42]AM[147]C") 

177 (array([ 43.01839004, 114.05550404, 261.09089904, 421.12154776, 

178 438.12428742]), array([ 17.00273967, 177.03338839, 324.06878339, 395.10589739, 

179 438.12428742])) 

180 """ 

181 amino_acids = peptide_parser(peptide) 

182 masses = [constants.MOD_AA_MASSES[a] for a in amino_acids] 

183 forward = numpy.cumsum(masses) 

184 backward = numpy.cumsum(masses[::-1]) 

185 return forward, backward 

186 

187 

188def get_mz(sum_: float64, ion_offset: float, charge: int) -> float64: 

189 return (sum_ + ion_offset + charge * constants.PROTON) / charge 

190 

191 

192def get_mzs(cumsum: ndarray, ion_type: str, z: int) -> List[float64]: 

193 # return (cumsum[:-1] + constants.ION_OFFSET[ion_type] + (z * constants.PROTON))/z 

194 # TODO this can be vectorized if needed 

195 return [get_mz(s, constants.ION_OFFSET[ion_type], z) for s in cumsum[:-2]][1:] 

196 

197 

198def get_annotation( 

199 forward: ndarray, backward: ndarray, charge: int, ion_types: str 

200) -> OrderedDict: 

201 """ 

202 Calculates the ion annotations based on the forward 

203 and backward cumulative masses 

204 

205 Example 

206 ======= 

207 >>> fw, bw = get_forward_backward("AMC") 

208 >>> fw 

209 array([ 1.00782503, 72.04493903, 203.08542403, 363.11607276, 380.11881242]) 

210 >>> bw 

211 array([ 17.00273967, 177.03338839, 308.07387339, 379.11098739, 380.11881242]) 

212 >>> get_annotation(fw, bw, 3, "y") 

213 OrderedDict([('y1', 60.354347608199994), ...]) 

214 """ 

215 tmp = "{}{}" 

216 tmp_nl = "{}{}-{}" 

217 all_ = {} 

218 for ion_type in ion_types: 

219 if ion_type in constants.FORWARD: 

220 cummass = forward 

221 elif ion_type in constants.BACKWARD: 

222 cummass = backward 

223 else: 

224 raise ValueError("unknown ion_type: {}".format(ion_type)) 

225 masses = get_mzs(cummass, ion_type, charge) 

226 d = {tmp.format(ion_type, i + 1): m for i, m in enumerate(masses)} 

227 all_.update(d) 

228 """ 

229 for nl, offset in constants.NEUTRAL_LOSS.items(): 

230 nl_masses = get_mzs(cummass - offset, ion_type, charge) 

231 d = {tmp_nl.format(ion_type, i + 1, nl): m for i, m in enumerate(nl_masses)} 

232 all_.update(d) 

233 """ 

234 return collections.OrderedDict(sorted(all_.items(), key=lambda t: t[0])) 

235 

236 

237def get_peptide_ions(aa_seq: str) -> Dict[str, float64]: 

238 """Gets the theoretical masses of fragment ions 

239 

240 Args: 

241 aa_seq (str): Aminoacid sequence with modifications 

242 

243 Returns: 

244 Dict[str, float64]: Keys are ion names and values are the mass 

245 

246 Examples: 

247 >>> foo = get_peptide_ions("AA") 

248 >>> sorted(foo.keys()) 

249 ['z1b1', 'z1y1', 'z2b1', 'z2y1', 'z3b1', 'z3y1'] 

250 >>> foo['z1y1'] # ground truth from http://db.systemsbiology.net:8080/proteomicsToolkit/FragIonServlet.html 

251 90.054955167 

252 >>> foo['z1b1'] 

253 72.044390467 

254 """ 

255 out = _get_peptide_ions( 

256 aa_seq, 

257 charges=range(1, constants.MAX_FRAG_CHARGE + 1), 

258 ion_types=constants.ION_TYPES, 

259 ) 

260 return out 

261 

262 

263def _get_peptide_ions( 

264 aa_seq: str, 

265 charges: Union[List[int], range] = range(1, 5), 

266 ion_types: Union[str, List[str]] = "yb", 

267) -> Dict[str, float64]: 

268 """ 

269 

270 Examples 

271 ======== 

272 >>> foo = _get_peptide_ions("AA", [1,2]) 

273 >>> foo 

274 {'z1y1': 90.054955167, ...} 

275 """ 

276 fw, bw = get_forward_backward(aa_seq) 

277 out = {} 

278 

279 for charge in charges: 

280 for ion in ion_types: 

281 ion_dict = get_annotation(fw, bw, charge, ion) 

282 ion_dict = {"z" + str(charge) + k: v for k, v in ion_dict.items()} 

283 out.update(ion_dict) 

284 

285 return out 

286 

287 

288def get_tolerance( 

289 theoretical: float64, tolerance: int = 25, unit: str = "ppm" 

290) -> float64: 

291 if unit == "ppm": 

292 return theoretical * float(tolerance) / 10 ** 6 

293 elif unit == "da": 

294 return float(tolerance) 

295 else: 

296 raise ValueError("unit {} not implemented".format(unit)) 

297 

298 

299def is_in_tolerance( 

300 theoretical: float64, observed: float, tolerance: int = 25, unit: str = "ppm" 

301) -> bool_: 

302 mz_tolerance = get_tolerance(theoretical, tolerance, unit) 

303 lower = observed - mz_tolerance 

304 upper = observed + mz_tolerance 

305 return theoretical >= lower and theoretical <= upper 

306 

307 

308def annotate_peaks(theoretical_peaks, mzs, intensities, tolerance=25, unit="ppm"): 

309 annots = {} 

310 max_delta = tolerance if unit == "da" else max(mzs) * tolerance / 1e6 

311 

312 raise DeprecationWarning( 

313 "Use `annotate_peaks2`, this version of the" 

314 " function is deprecated and will be removed in the future" 

315 ) 

316 

317 for mz, inten in zip(mzs, intensities): 

318 matching = { 

319 k: inten 

320 for k, v in theoretical_peaks.items() 

321 if abs(mz - v) <= max_delta and is_in_tolerance(v, mz, tolerance, unit) 

322 } 

323 annots.update(matching) 

324 

325 max_int = max([v for v in annots.values()] + [0]) 

326 annots = {k: v / max_int for k, v in annots.items()} 

327 return annots 

328 

329 

330def is_sorted( 

331 lst: Union[ 

332 List[List[Union[int, float]]], 

333 List[List[float]], 

334 List[List[Union[str, float64]]], 

335 List[List[Union[float64, int]]], 

336 ], 

337 key: Callable = lambda x: x, 

338) -> bool: 

339 for i, el in enumerate(lst[1:]): 

340 if key(el) < key(lst[i]): # i is the index of the previous element 

341 return False 

342 return True 

343 

344 

345def sort_if_needed( 

346 lst: Union[ 

347 List[List[Union[int, float]]], 

348 List[List[float]], 

349 List[List[Union[str, float64]]], 

350 List[List[Union[float64, int]]], 

351 ], 

352 key: Callable = lambda x: x, 

353) -> None: 

354 if not is_sorted(lst, key): 

355 lst.sort(key=key) 

356 

357 

358def annotate_peaks2( 

359 theoretical_peaks: Dict[str, float64], 

360 mzs: Union[List[float64], List[float]], 

361 intensities: Union[List[float], List[int]], 

362 tolerance: int = 25, 

363 unit: str = "ppm", 

364) -> Dict[str, float]: 

365 max_delta = tolerance if unit == "da" else max(mzs) * tolerance / 1e6 

366 

367 mz_pairs = [[m, i] for m, i in zip(mzs, intensities)] 

368 theo_peaks = [[k, v] for k, v in theoretical_peaks.items()] 

369 

370 sort_if_needed(mz_pairs, key=lambda x: x[0]) 

371 sort_if_needed(theo_peaks, key=lambda x: x[1]) 

372 

373 theo_iter = iter(theo_peaks) 

374 curr_theo_key, curr_theo_val = next(theo_iter) 

375 

376 annots = defaultdict(lambda: 0) 

377 for mz, inten in mz_pairs: 

378 deltamass = mz - curr_theo_val 

379 try: 

380 while deltamass >= max_delta: 

381 curr_theo_key, curr_theo_val = next(theo_iter) 

382 deltamass = mz - curr_theo_val 

383 except StopIteration: 

384 pass 

385 

386 in_deltam = abs(deltamass) <= max_delta 

387 if in_deltam and abs(deltamass) <= get_tolerance( 

388 curr_theo_val, tolerance, unit 

389 ): 

390 annots[curr_theo_key] += inten 

391 else: 

392 try: 

393 while True: 

394 curr_theo_key, curr_theo_val = next(theo_iter) 

395 deltamass = mz - curr_theo_val 

396 if deltamass < -max_delta: 

397 break 

398 in_deltam = abs(deltamass) <= max_delta 

399 if in_deltam and abs(deltamass) <= get_tolerance( 

400 curr_theo_val, tolerance, unit 

401 ): 

402 annots[curr_theo_key] += inten 

403 except StopIteration: 

404 pass 

405 

406 max_int = max([v for v in annots.values()] + [0]) 

407 annots = {k: v / max_int for k, v in annots.items()} 

408 return annots