from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Align import MultipleSeqAlignment
import numpy as np
import sklearn.metrics as sn
from .__params import lett2num
from joblib import Parallel, delayed
def _clean_msa(msa):
"""
This function compares the amino acid codes in the sequence alignment with
the ones in lett2num and removes unknown amino acids (such as 'X' or 'B')
when importing the multiple sequence alignment.
Parameters
----------
msa : bioalign object
"""
for index, record in enumerate(msa):
for char in record.seq:
if char not in lett2num.keys():
sequence = list(record.seq)
sequence[record.seq.index(char)] = '-'
sequence = "".join(sequence)
msa[index].seq = Seq(sequence)
return msa
[docs]
def filter_sequences(sequences, sequences_id,
gap_threshold=0.4, seq_threshold=0.2,
verbose=False):
"""
Filter sequences
Remove (1) overly gapped positions; (2) overly gapped sequences.
Parameters
----------
sequences : list of MSA sequences to filter
sequences_id : list of the MSA's sequence identifiers
gap_threshold : float,
maximum proportion of gaps tolerated per position (default=0.4)
seq_threshold : float,
maximum proportion of gaps tolerated per sequence (default=0.2)
Returns
-------
filtered_seqs : list of the remaining sequences (written as strings)
after applying the filters
filtered_seqs_id : list of sequence identifiers that were kept after
applying the filters
remaining_pos : numpy.ndarray
remaining positions after filtering
"""
updated_sequences, remaining_pos = _filter_gap_pos(
sequences, threshold=gap_threshold,
verbose=verbose)
filtered_seqs, filtered_seqs_id, = _filter_gap_seq(
updated_sequences, sequences_id,
threshold=seq_threshold, verbose=verbose)
return filtered_seqs, filtered_seqs_id, remaining_pos
def _filter_gap_pos(sequences, threshold=0.4, verbose=False):
"""Filter the sequences for overly gapped positions.
Parameters
----------
sequences : list of the MSA sequences to filter
threshold : max proportion of gaps tolerated (default=0.4)
Returns
-------
updated_seqs : updated_list of sequences with filtered gaps
remaining_pos : numpy.ndarray
remaining positions after filtering
"""
if verbose:
print("Filter MSA for overly gapped positions")
Nseq, Npos = len(sequences), len(sequences[0])
gaps = np.array([[int(sequences[seq][pos] == '-') for pos in range(Npos)]
for seq in range(Nseq)])
freq_gap_per_pos = np.sum(gaps, axis=0) / Nseq
remaining_pos = np.where(freq_gap_per_pos <= threshold)[0]
if verbose:
print("Keeping %i out of %i positions" % (len(remaining_pos), Npos))
updated_seqs = ["".join([sequences[seq][pos] for pos in remaining_pos])
for seq in range(Nseq)]
return updated_seqs, remaining_pos
def _filter_gap_seq(sequences, sequences_id, threshold=0.2, verbose=False):
"""
Remove sequences with a fraction of gaps greater than a specified
value.
Parameters
----------
sequences : list of MSA sequences
sequences_id : list of the MSA's sequence identifiers
threshold : maximum fraction of gaps per sequence (default 0.2)
Returns
-------
filt_seqs : filtered list of sequences
filt_seqs_id : corresponding list of sequence identifiers
"""
if verbose:
print('Filter MSA for overly gapped sequences')
Nseq, Npos = len(sequences), len(sequences[0])
freq_gap_per_seq = np.array([sequences[seq].count('-') / Npos
for seq in range(Nseq)])
filt_seqs_ix = np.where(freq_gap_per_seq <= threshold)[0]
if verbose:
print('Keeping %i sequences out of %i sequences' %
(len(filt_seqs_ix), Nseq))
filt_seqs = [sequences[seq] for seq in filt_seqs_ix]
filt_seqs_id = [sequences_id[seq] for seq in filt_seqs_ix]
return filt_seqs, filt_seqs_id
[docs]
def filter_ref_seq(sequences, sequences_id, delta=0.2, refseq_id=None,
verbose=False):
'''
Filter the alignment based on identity with a reference sequence
Remove sequences *r* with Sr < delta, where Sr is the fractional identity
between the sequence *r* and a specified reference sequence.
Parameters
----------
sequences : list of sequences in the MSA
sequences_id : list of sequence identifiers in the MSA
delta : identity threshold (default=0.2)
refseq_id : identifier of the reference sequence, if 'None', a reference
sequence is choosen as the sequence that has the mean pairwise
sequence identity closest to that of the entire sequence
alignment (default 'None')
Returns
-------
filt_seqs : filtered list of sequences
filt_seqs_id : corresponding list of sequence identifiers
'''
Nseq = len(sequences)
if refseq_id is None:
if verbose:
print('Choose a default reference sequence within the alignment')
refseq_idx = _choose_ref_seq(sequences)
else:
if verbose:
print('Reference sequence is: %i' % refseq_id)
refseq_idx = sequences_id.index(refseq_id)
sim_matrix = compute_seq_identity(sequences)
filt_seqs_ix = np.where(sim_matrix[refseq_idx] >= delta)[0]
filt_seqs = [sequences[seq] for seq in filt_seqs_ix]
filt_seqs_id = [sequences_id[seq] for seq in filt_seqs_ix]
if verbose:
print('Keeping %i out of %i sequences' % (len(filt_seqs), Nseq))
return filt_seqs, filt_seqs_id
def _choose_ref_seq(msa):
"""
Determine a reference sequence for the alignment
This function chooses a default reference sequence for the alignment by
taking the sequence which has the mean pairwise sequence identity closest
to that of the entire sequence alignment.
Parameters
----------
msa : the multiple sequence alignment as a list of sequences
Returns
-------
The index of the reference sequence in the given alignment
"""
sim_matrix = compute_seq_identity(msa)
mean_pairwise_seq_sim = np.mean(sim_matrix, axis=0)
ref_seq = np.argmin(mean_pairwise_seq_sim)
return ref_seq
[docs]
def filter_seq_id(sequences, sequences_id, list_id):
"""
Filter sequences based on list
Filter a multiple sequence alignment to keep only sequences whose
identifiers are in a user provided list.
Parameters
----------
sequences : list of MSA sequences
sequences_id : list of the MSA's sequence identifiers
list_id : list of sequence identifiers the user wants to keep. The
identifiers must be in the same format as in the input MSA
Returns
-------
new_msa : Bio.Align.MultipleSeqAlignment object,
filtered msa
id_list : list of sequence ID in the filtered MSA
seq_list : list of sequences of the filtered MSA
"""
new_msa = MultipleSeqAlignment([])
for ident in sequences_id:
if ident in list_id:
new_record = SeqRecord(Seq(sequences[sequences_id.index(ident)]),
id=ident)
new_msa.append(new_record)
seq_list = []
id_list = []
for record in new_msa:
seq_list.append(str(record.seq))
id_list.append(record.id)
seq_list = np.array(seq_list)
return [new_msa, id_list, seq_list]
[docs]
def map_to_pdb(pdb_seq, pdb_pos, sequences, sequences_id, pdb_seq_id):
"""
Mapping of the unfiltered MSA positions on a PDB structure.
Parameters
----------
pdb_seq: str,
amino acid sequence of the reference PDB file
pdb_pos: list,
Residue positions as found in the PDB file
sequences: list,
List of sequences of the unfiltered MSA
sequences_id: list,
List of sequence identifiers in the unfiltered MSA
pdb_seq_id: str,
identifier of the sequence the positions are mapped onto. Should be
included in sequences_id.
Returns
-------
mapping: numpy.ndarray of shape (3, len(pdb_seq)),
the first element is an array of the residues found in the PDB sequence
the second element is an array of the PDB position of each amino acid
the third element is an array of the positions of those same amino
acids in the unfiltered MSA
"""
msa_pos = []
pdb_seq_idx = sequences_id.index(pdb_seq_id)
for aa_index in range(len(sequences[pdb_seq_idx])):
if sequences[pdb_seq_idx][aa_index] != '-':
msa_pos.append(aa_index)
mapping = np.array((list(pdb_seq), pdb_pos, msa_pos))
return mapping
[docs]
def compute_seq_identity(sequences):
"""
Computes the identity between sequences in a MSA (as Hamming's pairwise
distance)
Parameters
----------
sequences : list of sequences
Returns
-------
sim_matrix : identity matrix of shape (Nseq, Nseq)
"""
separated_aa = np.array([[lett2num[char] for char in row]
for row in sequences])
sim_matrix = 1 - sn.DistanceMetric.get_metric(
"hamming").pairwise(separated_aa)
return sim_matrix
[docs]
def compute_seq_weights(sequences, threshold=0.8, verbose_every=0,
n_jobs=1, verbose_parallel=5):
"""
Compute sequence weights
Each sequence s is given a weight ws = 1/Ns where Ns is the number of
sequences with an identity to s above a specified threshold.
Parameters
----------
sequences : list of sequences
threshold : float, optional, default: 0.8
percentage identity above which the sequences are considered identical
(default=0.8)
verbose_every : int
if > 0, verbose every {verbose_every} sequences
n_jobs : int, default=1 (no parallelization)
the maximum number of concurrently running jobs
(see joblib doc)
verbose_parallel : int
verbosity level for parallelization
(see joblib doc)
Returns
-------
weights : np.array (nseq, ) of each sequence weight
m_eff : float
number of effective sequences
"""
if threshold < 0 or threshold > 1:
raise ValueError(
"The threshold needs to be between 0 and 1." +
f" Value provided {threshold}")
sequences_num = np.array([[lett2num[char] for char in row]
for row in sequences])
if n_jobs == 1:
seq_weights = []
for iseq, seq in enumerate(sequences_num):
if iseq % 100 == 0:
print('computing weight of seq %d/%d\t' %
(iseq+1, len(sequences_num)), end='\r')
sim = 1 - sn.DistanceMetric.get_metric(
"hamming").pairwise([seq], sequences_num)
seq_weights.append(1 / np.sum(sim >= threshold))
else:
def _weight_f(sequence, sequence_list):
return 1 / np.sum(1 - sn.DistanceMetric.get_metric(
"hamming").pairwise([sequence], sequence_list) >= threshold)
seq_weights = Parallel(n_jobs=n_jobs, verbose=verbose_parallel)(
delayed(_weight_f)(seq, sequences_num) for seq in sequences_num)
seq_weights = np.array(seq_weights)
m_eff = sum(seq_weights)
return seq_weights, m_eff
[docs]
def map_msa_positions(n_loaded_pos, remaining_pos):
"""
Maps positions between the original and the filtered MSA
Parameters
----------
n_loaded_pos : int,
Number of positions in the original unfiltered MSA
remaining_pos : np.ndarray,
array containing the indexes of positions that have been conserved
after filtering the MSA (output from cocoatree.msa.filter_sequences)
Returns
-------
original2filtered : dictionnary,
the keys are the positions in the original MSA and the values are the
corresponding positions in the filtered MSA. When the original position
has been filtered, the value is set to 'None'.
filtered2original : dictionnary,
the keys are the positions in the filtered MSA and the values are the
corresponding positions in the original MSA.
"""
mapping = [
int(val) if f else None
for f, val in zip(
np.isin(np.arange(n_loaded_pos), remaining_pos),
np.isin(np.arange(n_loaded_pos), remaining_pos).cumsum()-1)]
original2filtered = {
i: t for i, t
in enumerate(mapping)}
filtered2original = dict()
for pos in range(len(remaining_pos)):
filtered2original[pos] = int(remaining_pos[pos])
return original2filtered, filtered2original