Coverage for spectra.py : 45%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Contains utilities to represent spectra as well as functions to read them in bulk from
3.sptxt files
4"""
6from __future__ import annotations
7import logging
9try:
10 import matplotlib.pyplot as plt
11except ImportError:
12 plt = None
14import warnings
15from typing import Iterator, Dict, Optional, List, Sequence, Union
16from pathlib import Path
18from elfragmentador import constants, annotate, encoding_decoding, scoring
19import elfragmentador
20from elfragmentador.encoding_decoding import get_fragment_encoding_labels, SequencePair
22from pandas.core.frame import DataFrame
23import numpy as np
24from tqdm.auto import tqdm
27class Spectrum:
28 """Represents Spectra and bundles methods to annotate peaks."""
30 __SPTXT_TEMPLATE = (
31 "Name: {name}\n"
32 # "LibID: {lib_id}\n"
33 "MW: {mw}\n"
34 "PrecursorMZ: {precursor_mz}\n"
35 "FullName: {full_name}\n"
36 "Comment: {comment}\n"
37 "Num Peaks: {num_peaks}\n"
38 "{peaks}\n\n"
39 )
41 def __init__(
42 self,
43 sequence: str,
44 charge: int,
45 parent_mz: Union[float, int],
46 mzs: Union[List[float], List[int]],
47 intensities: Union[List[float], List[int]],
48 nce: Optional[float],
49 modifications: Optional[str] = None,
50 instrument: None = None,
51 analyzer: str = "FTMS",
52 rt: Optional[float] = None,
53 irt: Optional[float] = None,
54 raw_spectra: Optional[str] = None,
55 nreps: Optional[int] = None,
56 ) -> None:
57 """
58 Representation of spectra with methods to convert from and to encodings.
60 This class provides a way to represent spectra and its associated peptide sequence,
61 as well as multiple methods to convert these information to the encoding required
62 for training the main model
64 Parameters
65 ----------
66 sequence : str
67 String representing the aminoacid sequence with or without modifications
68 charge : int
69 Charge of the precursor associated with the spectrum
70 parent_mz : float
71 Observed m/z of the precursor
72 mzs : List[float]
73 Iterable of the observed masses of the fragment ions
74 intensities : List[float]
75 Iterable of the observed intensities of the fragment ions, should
76 match in length with the length of `mzs`
77 nce : float
78 Collision Energy used during the fragmentation.
79 modifications : str, optional
80 Currently unused string describing the modifications, by default None
81 instrument : str, optional
82 Currently unused instrument where the spectra was collected, by default None
83 analyzer : str, optional
84 Either of 'FTMS', 'ITMS', 'TripleTOF', despite the annotation working on all
85 of them, the model currently supports only FTMS data, by default FTMS
86 rt : float, optional
87 Retention time of the spectra, by default None
88 raw_spectra : str, optional
89 String describing the file where the spectra originated, by default None
90 nreps : int, optional
91 Integer describing how many spectra were used to generate this concensus spectrum
92 """
93 tolerance, tolerance_unit = constants.TOLERANCE[analyzer]
94 parsed_peptide = list(annotate.peptide_parser(sequence, solve_aliases=True))
96 # Makes sure all elements in the sequence are aminoacids
97 assert set(parsed_peptide) <= constants.AMINO_ACID_SET.union(
98 constants.MOD_PEPTIDE_ALIASES
99 ), f"Assertion of supported modifications failed for {sequence}: {parsed_peptide}"
100 sequence = "".join(parsed_peptide)
101 self.sequence = "".join([x[:1] for x in parsed_peptide])
102 self.mod_sequence = sequence
103 self.length = len(encoding_decoding.clip_explicit_terminus(parsed_peptide))
104 self.charge = charge
105 self.parent_mz = parent_mz
106 self.modifications = modifications
108 amino_acids = list(annotate.peptide_parser(sequence))
110 # TODO consider if this masses should be calculated in a lazy manner
111 # TODO redefine these with the functions inside annotate
112 self.theoretical_mass = annotate.get_theoretical_mass(amino_acids)
113 self.theoretical_mz = (
114 self.theoretical_mass + (charge * constants.PROTON)
115 ) / charge
117 assert len(mzs) == len(intensities)
118 self.mzs = mzs
119 self.intensities = intensities
121 self.delta_m = abs(self.parent_mz - self.theoretical_mz)
122 self.delta_ppm = 1e6 * abs(self.delta_m) / self.theoretical_mz
124 # Annotation related section
125 self.tolerance = tolerance
126 self.tolerance_unit = tolerance_unit
128 # Dict with ION_NAME: ION MASS
129 self._theoretical_peaks = annotate.get_peptide_ions(self.mod_sequence)
131 self._annotated_peaks = None
132 self._delta_ascore = None
133 self.nce = nce
134 self.instrument = instrument
135 self.analyzer = analyzer
136 self.rt = rt
137 self.irt = irt
138 self.raw_spectra = raw_spectra
139 self.nreps = nreps
141 @classmethod
142 def theoretical_spectrum(
143 cls,
144 seq: str,
145 charge: int,
146 ) -> Spectrum:
147 """theoretical_spectrum Generates theoretical spectra from sequences.
149 Parameters
150 ----------
151 seq : str
152 Peptide sequence
153 charge : int
154 Precursor charge to use
156 Returns
157 -------
158 Spectrum
159 A spectrum object with 1 as the theoretical intensities
161 Examples
162 --------
163 >>> spec = Spectrum.theoretical_spectrum("MYPEPTIDE", 2)
164 >>> spec
165 Spectrum:
166 Sequence: MYPEPTIDE len:9
167 Mod.Sequence: MYPEPTIDE
168 Charge: 2
169 MZs: [132.047761467, 148.060434167, 263.087377167]...16
170 Ints: [1.0, 1.0, 1.0]...16
171 Instrument: None
172 Analyzer: FTMS
173 NCE: None
174 RT: None
175 OriginalSpectra: Predicted
176 Annotations: {'z2y2': 1.0, 'z2b2': 1.0, ...
177 """
178 ions = annotate.get_peptide_ions(seq)
179 ions = {k: v for k, v in ions.items() if int(k[1]) < charge}
180 parent_mz = annotate.get_precursor_mz(seq, charge)
181 mzs = list(ions.values())
182 mzs = sorted(mzs)
183 intensities = [1.0 for _ in mzs]
185 spec = cls(
186 sequence=seq,
187 charge=charge,
188 parent_mz=parent_mz,
189 mzs=mzs,
190 intensities=intensities,
191 nce=None,
192 raw_spectra="Predicted",
193 )
195 spec.annotated_peaks
197 return spec
199 @classmethod
200 def from_tensors(
201 cls,
202 sequence_tensor: List[int],
203 fragment_tensor: List[int],
204 mod_tensor: None = None,
205 charge: int = 2,
206 nce: float = 27.0,
207 parent_mz: int = 0,
208 *args,
209 **kwargs,
210 ) -> Spectrum:
211 """
212 from_tensors Encodes iterables into a Spectrum object.
214 This method is an utility function to create Spectrum objects from the encoded
215 iterables or tensors. The encoding entail two iterables of integers,
216 the sequence and the fragment (optionally the modifications).
218 For the values of the encoding please visit the constants submodule
220 Parameters
221 ----------
222 sequence_tensor : List[int]
223 [description]
224 fragment_tensor : List[float]
225 [description]
226 mod_tensor : List[int], optional
227 [description], by default None
228 charge: int
229 nce: float
230 Normalized collision energy
232 Returns
233 -------
234 Spectrum
235 A spectrum object decoding the sequences
237 Examples
238 --------
239 >>> Spectrum.from_tensors([1, 1, 2, 3, 0, 0, 0, 0, 0, 0], [0]*constants.NUM_FRAG_EMBEDINGS)
240 Spectrum:
241 Sequence: AACD len:4
242 Mod.Sequence: AACD
243 Charge: 2
244 MZs: [72.044390467, 134.044784167, 143.081504467]...18
245 Ints: [0.0, 0.0, 0.0]...18
246 Instrument: None
247 Analyzer: FTMS
248 NCE: 27.0
249 RT: None
250 OriginalSpectra: None
251 """
252 mod_sequence = encoding_decoding.decode_mod_seq(sequence_tensor, mod_tensor)
253 fragment_df = encoding_decoding.decode_fragment_tensor(
254 mod_sequence, fragment_tensor
255 )
256 self = cls(
257 mod_sequence,
258 mzs=[float(x) for x in fragment_df["Mass"]],
259 intensities=[float(x) for x in fragment_df["Intensity"]],
260 charge=charge,
261 parent_mz=parent_mz,
262 nce=nce,
263 *args,
264 **kwargs,
265 )
266 return self
268 def precursor_error(self, error_type: str = "ppm") -> float:
269 """
270 precursor_error Calculates the mass error of the precursor.
272 Calculates the mass error of the precursor, knowing the sequence,
273 and modifications, in addition to the observed mass
275 Parameters
276 ----------
277 error_type : ppm or da, optional
278 The type of mass error that will be calculated, by default "ppm"
280 Returns
281 -------
282 float
283 the mass error...
285 Raises
286 ------
287 NotImplementedError
288 Raises this error if any error type other than ppm or da is provided
290 Examples
291 --------
292 >>> myspec = Spectrum("AA", charge=1, parent_mz=161.0920, mzs=[101.0713], intensities=[299.0], nce = 27.0, )
293 >>> myspec.precursor_error("ppm")
294 0.42936316076909053
295 >>> myspec = Spectrum("AAAT[181]PAKKTVT[181]PAK", charge=3, parent_mz=505.5842, mzs=[101.0713, 143.0816, 147.1129], intensities=[299.0, 5772.5, 2537.1], nce = 27.0, )
296 >>> myspec.precursor_error("ppm")
297 0.06981956539363246
298 >>> myspec.precursor_error("da")
299 3.529966664927997e-05
300 """
301 if error_type == "ppm":
302 return self.delta_ppm
303 elif error_type == "da":
304 return self.delta_m
305 else:
306 raise NotImplementedError(
307 "Not a know error type, select either of 'ppm' or 'da'"
308 )
310 def _annotate_peaks(self) -> None:
311 annots = annotate.annotate_peaks2(
312 self._theoretical_peaks,
313 self.mzs,
314 self.intensities,
315 self.tolerance,
316 self.tolerance_unit,
317 )
318 assert (
319 len(annots) > 0
320 ), f"No peaks were annotated in this spectrum {self.sequence}"
321 if len(annots) < 3:
322 warnings.warn(
323 f"Less than 3 ({len(annots)}) peaks were"
324 f" annotated for spectra {self.sequence}"
325 )
327 self._annotated_peaks = annots
329 def _calculate_delta_ascore(self) -> None:
330 self._delta_ascore = scoring.calc_delta_ascore(
331 seq=encoding_decoding.clip_explicit_terminus(self.mod_sequence),
332 mod=list(constants.VARIABLE_MODS.keys()),
333 aas=list(constants.VARIABLE_MODS.values()),
334 mzs=self.mzs,
335 ints=self.intensities,
336 )
338 @property
339 def delta_ascore(self) -> float:
340 if self._delta_ascore is None:
341 self._calculate_delta_ascore()
343 return self._delta_ascore
345 def encode_spectra(
346 self, dry: bool = False
347 ) -> Union[List[Union[int, float]], List[str]]:
348 """
349 encode_spectra Produce encoded sequences from your spectrum object.
351 It produces a list of integers that represents the spectrum, the labels correspond
352 to the ones in constants.FRAG_EMBEDING_LABELS, but can also be acquired using the
353 argument dry=True
355 Parameters
356 ----------
357 dry : bool, optional
358 wether to actually compute the encoding or only return the labels, by default False
360 Returns
361 -------
362 List[int] or List[str]
363 The list of intensities matching the ions (normalized to the highest) or
364 the labels for such ions, depending on wether the dry argument was passed or not
367 Examples
368 --------
369 >>> myspec = Spectrum("AAAT[181]PAKKTVT[181]PAK", charge=3, parent_mz=505.5842, mzs=[101.0713, 143.0816, 147.1129], intensities=[299.0, 5772.5, 2537.1], nce = 27.0)
370 >>> myspec.encode_spectra()
371 [0, 0.4395149415331312, 1.0, 0, 0, 0, 0, ..., 0]
372 >>> len(myspec.encode_spectra())
373 174
374 >>> myspec.encode_spectra(dry=True)
375 ['z1b1', 'z1y1', 'z1b2', 'z1y2', 'z1b3',..., 'z3b29', 'z3y29']
376 """
377 if self._annotated_peaks is None and not dry:
378 self.annotated_peaks
379 self.num_matching_peaks = sum(
380 [1 for x in self.annotated_peaks.values() if x > 0]
381 )
383 if dry:
384 peak_annot = None
385 else:
386 peak_annot = self.annotated_peaks
388 return get_fragment_encoding_labels(annotated_peaks=peak_annot)
390 def encode_sequence(self) -> SequencePair:
391 """
392 encode_sequence returns the encoded sequence of the aminoacids/modifications.
394 It returns two lists representing the aminoacid and modification sequences, the
395 length of the sequence will correspond to constants.MAX_TENSOR_SEQUENCE.
397 The meaning of each corresponding index comes from constants.ALPHABET and
398 constants.MOD_INDICES. Some aliases for modifications are supported, check them
399 at constans.MOD_PEPTIDE_ALIASES
401 Returns
402 -------
403 Tuple[List[int], List[int]]
404 A named tuple with `aas` and `mods` as names, containging respectively the
405 encoding of aminoacids and modifications.
407 Examples
408 --------
409 >>> myspec = Spectrum("AAAT[181]PAKKTVT[181]PAK", charge=3, parent_mz=505.5842, mzs=[101.0713, 143.0816, 147.1129], intensities=[299.0, 5772.5, 2537.1], nce = 27.0)
410 >>> myspec.encode_sequence()
411 SequencePair(aas=[23, 1, 1, 1, 17, 13, 1, 9, 9, 17, 19, ..., 0], mods=[0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, ..., 0])
412 """
413 return encoding_decoding.encode_mod_seq(self.mod_sequence)
415 @property
416 def annotated_peaks(self) -> Dict[str, float]:
417 """
418 annotated_peaks Peaks in the spectra annotated as ions.
420 Contains the annotated ions and its corresponding intensity, normalized
421 to the highest one in the spectra
423 Returns
424 -------
425 dict
426 Keys are the charge and ion types (z1y1) and the values
427 are the normalized intensity of the ion.
429 Examples
430 --------
431 >>> myspec = Spectrum("AAAT[181]PAKKTVT[181]PAK", charge=3, parent_mz=505.5842, mzs=[101.0713, 143.0816, 147.1129], intensities=[299.0, 5772.5, 2537.1], nce = 27.0)
432 >>> myspec.annotated_peaks
433 {'z1b2': 1.0, 'z1y1': 0.4395149415331312}
434 """
435 if self._annotated_peaks is None:
436 self._annotate_peaks()
438 return self._annotated_peaks
440 def __repr__(self) -> str:
441 """
442 __repr__ Represents the summary of the spectrum for the console.
444 it is implicitly called by print but allows nice printing of the Spectrum
445 objects in the console for debugging purposes mainly.
447 Returns
448 -------
449 str
450 String representation of the object
452 Examples
453 --------
454 >>> myspec = Spectrum("MYPEPT[181]IDEK", 2, 200, [100, 200], [1e8, 1e7], nce=27.0)
455 >>> myspec
456 Spectrum:
457 Sequence: MYPEPTIDEK len:10
458 Mod.Sequence: MYPEPT[PHOSPHO]IDEK
459 Charge: 2
460 MZs: [100, 200]...2
461 Ints: [100000000.0, 10000000.0]...2
462 Instrument: None
463 Analyzer: FTMS
464 NCE: 27.0
465 RT: None
466 OriginalSpectra: None
467 """
468 out = (
469 "Spectrum:\n"
470 f"\tSequence: {encoding_decoding.clip_explicit_terminus(self.sequence)}"
471 f" len:{self.length}\n"
472 f"\tMod.Sequence: {encoding_decoding.clip_explicit_terminus(self.mod_sequence)}\n"
473 f"\tCharge: {self.charge}\n"
474 f"\tMZs: {self.mzs[:3]}...{len(self.mzs)}\n"
475 f"\tInts: {self.intensities[:3]}...{len(self.intensities)}\n"
476 f"\tInstrument: {self.instrument}\n"
477 f"\tAnalyzer: {self.analyzer}\n"
478 f"\tNCE: {self.nce}\n"
479 f"\tRT: {self.rt}\n"
480 f"\tOriginalSpectra: {self.raw_spectra}\n"
481 )
483 if self._annotated_peaks is not None:
484 out += f"\tAnnotations: {self._annotated_peaks}\n"
486 return out
488 def to_sptxt(self) -> str:
489 """
490 to_sptxt Represents the spectrum for an sptxt file
492 Returns
493 -------
494 str
495 String representation of the object
497 Examples
498 --------
499 >>> myspec = Spectrum("MYPEPT[181]IDEK", 2, 200, [100, 200], [1e8, 1e7], nce=27.0)
500 >>> myspec
501 Spectrum:
502 Sequence: MYPEPTIDEK len:10
503 Mod.Sequence: MYPEPT[PHOSPHO]IDEK
504 Charge: 2
505 MZs: [100, 200]...2
506 Ints: [100000000.0, 10000000.0]...2
507 Instrument: None
508 Analyzer: FTMS
509 NCE: 27.0
510 RT: None
511 OriginalSpectra: None
512 >>> print(myspec.to_sptxt())
513 Name: MYPEPT[80]IDEK/2
514 MW: 1301.5250727
515 PrecursorMZ: 651.769812817
516 FullName: MYPEPT[80]IDEK/2 (HCD)
517 Comment: CollisionEnergy=27.0 ...
518 Num Peaks: 2
519 100\t100000000.0\t"?"
520 200\t10000000.0\t"?"
523 """
524 mod_seq = annotate.mass_diff_encode_seq(self.mod_sequence)
525 name = f"{mod_seq}/{self.charge}"
526 mw = self.theoretical_mass
527 precursor_mz = self.theoretical_mz
528 full_name = name + " (HCD)"
529 comment = {
530 "CollisionEnergy": self.nce,
531 "Origin": f"ElFragmentador_v{elfragmentador.__version__}",
532 }
534 if self.rt is not None:
535 comment.update({"RetentionTime": self.rt})
537 if self.irt is not None:
538 comment.update({"iRT": self.irt})
540 comment = " ".join([f"{k}={v}" for k, v in comment.items()])
541 peak_list = [
542 f'{x}\t{y}\t"?"' for x, y in zip(self.mzs, self.intensities) if y > 0.001
543 ]
545 peaks = "\n".join(peak_list)
547 out = self.__SPTXT_TEMPLATE.format(
548 name=name,
549 mw=mw,
550 precursor_mz=precursor_mz,
551 full_name=full_name,
552 comment=comment,
553 num_peaks=len(peak_list),
554 peaks=peaks,
555 )
556 return out
558 def plot(self, mirror: Spectrum = None, ax=None):
559 try:
560 plt
561 except UnboundLocalError:
562 import matplotlib.pyplot as plt
564 if ax is not None:
565 plt = ax
567 display_sequence = encoding_decoding.clip_explicit_terminus(self.mod_sequence)
568 display_sequence = f"{display_sequence}/{self.charge}+ NCE={self.nce}"
569 try:
570 plt.title(display_sequence)
571 except TypeError:
572 plt.set_title(display_sequence)
573 plt.set_xlabel("m/z")
574 plt.set_ylabel("Relative Intensity")
576 plt.vlines(self.mzs, 0, self.intensities, color="blue")
577 plt.axhline(0, color="black")
579 for ion_name, intensity in self.annotated_peaks.items():
580 if intensity < np.max(self.intensities) * 0.01:
581 continue
583 plt.annotate(
584 ion_name,
585 xy=(self._theoretical_peaks[ion_name], intensity),
586 xytext=(1, 1),
587 textcoords="offset points",
588 horizontalalignment="center",
589 verticalalignment="bottom",
590 )
592 if mirror:
593 plt.vlines(mirror.mz, 0, mirror.intensities, color="red")
594 plt.vlines(0, -1, 1, color="gray")
595 else:
596 plt.vlines(0, 0, 1, color="gray")
597 # plt.show()
600def encode_sptxt(
601 filepath: Union[str, Path],
602 max_spec: float = 1e9,
603 min_peaks: int = 3,
604 min_delta_ascore: int = 20,
605 irt_fun: None = None,
606 *args,
607 **kwargs,
608) -> DataFrame:
609 """
610 encode_sptxt Convert an sptxt file to a dataframe containing encodings.
612 Converts the spectra contained in a .sptxt file to a pandas DataFrame that
613 contains the relevant fields required to train the main model.
615 Parameters
616 ----------
617 filepath : Path or str
618 Path of the .sptxt file to read
619 max_spec : int, optional
620 Maximum number of spectra to read, by default 1e9
621 min_peaks : int
622 Minimum number of annotated peaks for a spectrum to be added
623 irt_fun : [type], optional
624 Not yet implemented but would take a callable that converts
625 the retention times to iRTs, by default None
627 Returns
628 -------
629 DataFrame
630 DataFrame containing the data required to train the model
631 and would be taken by the PeptideDataset
632 # TODO specify the columns
634 Raises
635 ------
636 NotImplementedError
637 Raises this error when an iRT converter function is passed
638 because I have not implemented it....
639 """
640 iter = read_sptxt(filepath, *args, **kwargs)
642 sequences = []
643 mod_sequences = []
644 seq_encodings = []
645 mod_encodings = []
646 spectra_encodings = []
647 charges = []
648 rts = []
649 nces = []
650 orig = []
651 d_ascores = []
652 nreps = []
654 i = 0
655 skipped_spec = 0
656 for spec in tqdm(iter):
657 i += 1
658 if i >= max_spec:
659 break
661 # TODO add offset to skip the first x sequences and a way to make the selection random
662 seq_encode, mod_encode = spec.encode_sequence()
663 seq_encode, mod_encode = str(seq_encode), str(mod_encode)
665 try:
666 spec_encode = spec.encode_spectra()
667 spec_encode = [round(x, 5) for x in spec_encode]
668 spec_encode = str(spec_encode)
669 except AssertionError as e:
670 warnings.warn(f"Skipping because of error: {e}")
671 skipped_spec += 1
672 continue
674 if min_peaks is not None and spec.num_matching_peaks < min_peaks:
675 warnings.warn(
676 f"Skipping peptide due few peaks being annotated {spec.mod_sequence}"
677 )
678 skipped_spec += 1
679 continue
681 if min_delta_ascore is not None and spec.delta_ascore < min_delta_ascore:
682 warnings.warn(
683 f"Skipping peptide due low ascore '{spec.delta_ascore}' {spec.mod_sequence}"
684 )
685 skipped_spec += 1
686 continue
688 spectra_encodings.append(spec_encode)
689 seq_encodings.append(seq_encode)
690 mod_encodings.append(mod_encode)
691 charges.append(spec.charge)
692 sequences.append(spec.sequence)
693 mod_sequences.append(spec.mod_sequence)
694 rts.append(spec.rt)
695 nces.append(spec.nce)
696 orig.append(spec.raw_spectra)
697 d_ascores.append(spec.delta_ascore)
698 nreps.append(spec.nreps)
700 ret = DataFrame(
701 {
702 "Sequences": sequences,
703 "ModSequences": mod_sequences,
704 "Charges": charges,
705 "NCEs": nces,
706 "RTs": rts,
707 "SpectraEncodings": spectra_encodings,
708 "ModEncodings": mod_encodings,
709 "SeqEncodings": seq_encodings,
710 "OrigSpectra": orig,
711 "DeltaAscore": d_ascores,
712 "Nreps": nreps,
713 }
714 )
716 if irt_fun is not None:
717 raise NotImplementedError
718 else:
719 """
720 warnings.warn(
721 "No calculation function passed for iRT,"
722 " will replace the column with missing"
723 )
724 """
725 ret["iRT"] = np.nan
727 if skipped_spec >= 1:
728 warnings.warn(f"{skipped_spec}/{i} Spectra were skipped")
730 logging.info(list(ret))
731 logging.info(ret)
733 return ret
736def sptxt_to_csv(filepath, output_path, filter_irt_peptides=True, *args, **kwargs):
737 df = encode_sptxt(filepath=filepath, *args, **kwargs)
738 if filter_irt_peptides:
739 df = df[[x not in constants.IRT_PEPTIDES for x in df["Sequences"]]]
740 df.to_csv(output_path, index=False)
743# TODO consider if moving this parser to just use another dependency ... pyteomics ??
744def read_sptxt(filepath: str, *args, **kwargs) -> Iterator[Spectrum]:
745 """
746 read_sptxt reads a spectra library file.
748 reads a spectral library file into a list of spectra objects
750 Parameters
751 ----------
752 filepath : Path or str
753 The path to the spectral library, extension .sptxt
754 *args
755 Passed onto Spectrum
756 **kwargs
757 Passed onto Spectrum
759 Yields
760 ------
761 Spectrum objects
762 """
763 with open(filepath, "r") as f:
764 spectrum_section = []
765 for line in f:
766 if line.startswith("#"):
767 continue
768 stripped_line = line.strip()
769 if len(stripped_line) == 0:
770 if len(spectrum_section) > 0:
771 try:
772 yield _parse_spectra_sptxt(spectrum_section, *args, **kwargs)
773 except AssertionError as e:
774 warnings.warn(f"Skipping spectra with assertion error: {e}")
775 pass
776 spectrum_section = []
777 else:
778 spectrum_section.append(stripped_line)
780 if len(spectrum_section) > 0:
781 try:
782 yield _parse_spectra_sptxt(spectrum_section, *args, **kwargs)
783 except AssertionError as e:
784 warnings.warn(f"Skipping spectra with assertion error: {e}")
787def _parse_spectra_sptxt(
788 x: List[str], instrument: None = None, analyzer: str = "FTMS", *args, **kwargs
789) -> Spectrum:
790 """
791 Parse a single spectra into an object.
793 Meant for internal use
794 """
795 digits = [str(v) for v in range(10)]
797 # Header Handling
798 named_params = [v for v in x if v[:1].isalpha() and ":" in v]
799 named_params_dict = {}
800 for v in named_params:
801 tmp = v.split(":")
802 named_params_dict[tmp[0].strip()] = ":".join(tmp[1:])
804 fragmentation = named_params_dict.get("FullName", None)
805 if fragmentation is not None:
806 fragmentation = fragmentation[fragmentation.index("(") + 1 : -1]
808 comment_sec = [
809 v.split("=") for v in named_params_dict["Comment"].strip().split(" ")
810 ]
811 comment_dict = {v[0]: v[1] for v in comment_sec}
812 sequence, charge = named_params_dict["Name"].split("/")
814 nce = comment_dict.get("CollisionEnergy", None)
815 if nce is not None:
816 nce = float(nce)
818 rt = comment_dict.get("RetentionTime", None)
819 if rt is not None:
820 rt = float(rt.split(",")[0])
822 if comment_dict.get("iRT", None) is not None:
823 warnings.warn(
824 (
825 "Noticed the comment dict has iRT values,"
826 " We have not implemented reading them but will do in the future"
827 ),
828 FutureWarning,
829 )
831 nreps = comment_dict.get("Nreps", None)
832 if nreps is not None:
833 nreps = int(nreps.split("/")[0])
835 raw_spectra = comment_dict.get("RawSpectrum", None) or comment_dict.get(
836 "BestRawSpectrum", None
837 )
839 # Peaks Handling
840 peaks_sec = [v for v in x if v[0] in digits and ("\t" in v or " " in v)]
841 peaks_sec = [l.strip().split() for l in peaks_sec if "." in l]
842 mz = [float(l[0]) for l in peaks_sec]
843 intensity = [float(l[1]) for l in peaks_sec]
845 out_spec = Spectrum(
846 sequence=sequence.strip(),
847 charge=int(charge),
848 parent_mz=float(comment_dict["Parent"]),
849 intensities=intensity,
850 mzs=mz,
851 modifications=comment_dict["Mods"],
852 analyzer=analyzer,
853 instrument=instrument,
854 nce=nce,
855 rt=rt,
856 raw_spectra=raw_spectra,
857 nreps=nreps,
858 *args,
859 **kwargs,
860 )
862 return out_spec