Source code for giddy.sequence

"""
Sequence analysis methods.
"""

__author__ = "Wei Kang <weikang9009@gmail.com>, Sergio J. Rey <sjsrey@gmail.com>"

__all__ = ["Sequence"]

import itertools

import numpy as np
import scipy.spatial.distance as d

from .markov import Markov


[docs] class Sequence: """ Pairwise sequence analysis. Dynamic programming if optimal matching. Parameters ---------- y : array one row per sequence of neighborhood types for each spatial unit. Sequences could be of varying lengths. subs_mat : array (k,k), substitution cost matrix. Should be hollow ( 0 cost between the same type), symmetric and non-negative. dist_type : string "hamming": hamming distance (substitution only and its cost is constant 1) from sklearn.metrics; "markov": utilize empirical transition probabilities to define substitution costs; "interval": differences between states are used to define substitution costs, and indel=k-1; "arbitrary": arbitrary distance if there is not a strong theory guidance: substitution=0.5, indel=1. "tran": transition-oriented optimal matching. Sequence of transitions. Based on :cite:`Biemann:2011`. indel : float insertion/deletion cost. cluster_type : string cluster algorithm (specification) used to generate neighborhood types, such as "ward", "kmeans", etc. Attributes ---------- seq_dis_mat : array (n,n), distance/dissimilarity matrix for each pair of sequences classes : array (k, ), unique classes k : int number of unique classes label_dict : dict dictionary - {input label: int value between 0 and k-1 (k is the number of unique classes for the pooled data)} Examples -------- >>> import numpy as np 1. Testing on unequal string sequences 1.1 substitution cost matrix and indel cost are not given, and will be generated based on the distance type "interval" >>> seq1 = 'ACGGTAG' >>> seq2 = 'CCTAAG' >>> seq3 = 'CCTAAGC' >>> seqAna = Sequence([seq1,seq2,seq3],dist_type="interval") >>> seqAna.k 4 >>> seqAna.classes array(['A', 'C', 'G', 'T'], dtype='<U1') >>> seqAna.subs_mat array([[0., 1., 2., 3.], [1., 0., 1., 2.], [2., 1., 0., 1.], [3., 2., 1., 0.]]) >>> seqAna.seq_dis_mat array([[ 0., 7., 10.], [ 7., 0., 3.], [10., 3., 0.]]) 1.2 User-defined substitution cost matrix and indel cost >>> subs_mat = np.array( ... [ ... [0, 0.76, 0.29, 0.05], ... [0.30, 0, 0.40, 0.60], ... [0.16, 0.61, 0, 0.26], ... [0.38, 0.20, 0.12, 0] ... ] ... ) >>> indel = subs_mat.max() >>> seqAna = Sequence([seq1,seq2,seq3], subs_mat=subs_mat, indel=indel) >>> seqAna.seq_dis_mat array([[0. , 1.94, 2.46], [1.94, 0. , 0.76], [2.46, 0.76, 0. ]]) 1.3 Calculating "hamming" distance will fail on unequal sequences >>> seqAna = Sequence([seq1,seq2,seq3], dist_type="hamming") Traceback (most recent call last): ValueError: hamming distance cannot be calculated for sequences of unequal lengths! 2. Testing on equal string sequences >>> seq1 = 'ACGGTAG' >>> seq2 = 'CCTAAGA' >>> seq3 = 'CCTAAGC' 2.1 Calculating "hamming" distance >>> seqAna = Sequence([seq1,seq2,seq3], dist_type="hamming") >>> seqAna.seq_dis_mat array([[0., 6., 6.], [6., 0., 1.], [6., 1., 0.]]) 2.2 User-defined substitution cost matrix and indel cost (distance between different types is always 1 and indel cost is 2) - give the same sequence distance matrix as "hamming" distance >>> subs_mat = np.array( ... [ ... [0., 1., 1., 1.], ... [1., 0., 1., 1.], ... [1., 1., 0., 1.], ... [1., 1., 1., 0.] ... ] ... ) >>> indel = 2 >>> seqAna = Sequence([seq1,seq2,seq3], subs_mat=subs_mat, indel=indel) >>> seqAna.seq_dis_mat array([[0., 6., 6.], [6., 0., 1.], [6., 1., 0.]]) 2.3 User-defined substitution cost matrix and indel cost (distance between different types is always 1 and indel cost is 1) - give a slightly different sequence distance matrix from "hamming" distance since insertion and deletion is happening >>> subs_mat = np.array( ... [ ... [0., 1., 1., 1.], ... [1., 0., 1., 1.], ... [1., 1., 0., 1.], ... [1., 1., 1., 0.] ... ] ... ) >>> indel = 1 >>> seqAna = Sequence([seq1,seq2,seq3], subs_mat=subs_mat, indel=indel) >>> seqAna.seq_dis_mat array([[0., 5., 5.], [5., 0., 1.], [5., 1., 0.]]) 3. Not passing proper parameters will raise an error >>> seqAna = Sequence([seq1,seq2,seq3]) Traceback (most recent call last): ValueError: Please specify a proper `dist_type` or `subs_mat` and `indel` to proceed! >>> seqAna = Sequence([seq1,seq2,seq3], subs_mat=subs_mat) Traceback (most recent call last): ValueError: Please specify a proper `dist_type` or `subs_mat` and `indel` to proceed! >>> seqAna = Sequence([seq1,seq2,seq3], indel=indel) Traceback (most recent call last): ValueError: Please specify a proper `dist_type` or `subs_mat` and `indel` to proceed! """ # noqa E501
[docs] def __init__(self, y, subs_mat=None, dist_type=None, indel=None, cluster_type=None): y = np.asarray(y) merged = list(itertools.chain.from_iterable(y)) self.classes = np.unique(merged) self.k = len(self.classes) self.n = len(y) self.indel = indel self.subs_mat = subs_mat self.cluster_type = cluster_type self.label_dict = dict(zip(self.classes, range(self.k))) y_int = [] for yi in y: y_int.append(list(map(self.label_dict.get, yi))) y_int = np.array(y_int, dtype=object) if subs_mat is None or indel is None: if dist_type is None: raise ValueError( "Please specify a proper `dist_type` or " "`subs_mat` and `indel` to proceed!" ) else: if dist_type.lower() == "interval": self.indel = self.k - 1 self.subs_mat = np.zeros((self.k, self.k)) for i in range(0, self.k - 1): for j in range(i + 1, self.k): self.subs_mat[i, j] = j - i self.subs_mat[j, i] = j - i self._om_dist(y_int) elif dist_type.lower() == "hamming": if len(y_int.shape) != 2: raise ValueError( "hamming distance cannot be calculated for " "sequences of unequal lengths!" ) hamming_dist = ( d.pdist(y_int.astype(int), metric="hamming") * y_int.shape[1] ) self.seq_dis_mat = d.squareform(hamming_dist) elif dist_type.lower() == "arbitrary": self.indel = 1 mat = np.ones((self.k, self.k)) * 0.5 np.fill_diagonal(mat, 0) self.subs_mat = mat self._om_dist(y_int) elif dist_type.lower() == "markov": p = Markov(y_int).p self.indel = 1 mat = (2 - (p + p.T)) / 2 np.fill_diagonal(mat, 0) self.subs_mat = mat self._om_dist(y_int) elif dist_type.lower() == "tran": # sequence of transitions self.indel = 2 y_uni = np.unique(y_int) dict_trans_state = {} trans_list = [] for i, tran in enumerate(itertools.product([-1], y_uni)): trans_list.append(tran) dict_trans_state[tran] = i for i, tran in enumerate(itertools.product(y_uni, y_uni)): trans_list.append(tran) dict_trans_state[tran] = i + len(y_uni) subs_mat = np.ones((self.k * (self.k + 1), self.k * (self.k + 1))) np.fill_diagonal(subs_mat, 0) for row in range(self.k**2): row_index = row + self.k row_tran = trans_list[row_index] for col in range(self.k**2): col_index = col + self.k col_tran = trans_list[col_index] subs_mat[row_index, col_index] = abs( int(row_tran[0] == row_tran[1]) - int(col_tran[0] == col_tran[1]) ) self.dict_trans_state = dict_trans_state self.subs_mat = subs_mat # Transform sequences of states into sequences of transitions. y_int_ext = np.insert(y_int, 0, -1, axis=1) y_tran_index = np.zeros_like(y_int) y_tran = [] for i in range(y_int.shape[1]): y_tran.append(list(zip(y_int_ext[:, i], y_int_ext[:, i + 1]))) for i in range(y_int.shape[0]): for j in range(y_int.shape[1]): y_tran_index[i, j] = dict_trans_state[y_tran[j][i]] self._om_dist(y_tran_index) else: self._om_dist(y_int)
def _om_pair_dist(self, seq1, seq2): """ Method for calculating the optimal matching distance between a pair of sequences given a substitution cost matrix and an indel cost. Arguments --------- seq1 : array (t1, ), the first sequence seq2 : array (t2, ), the second sequence Returns ------- D : array (t2+1, t1+1), score matrix: D[i+1,j+1] is the best score for aligning the substring, seq1[0:j] and seq2[0:i], and D[t2+1, t1+1] (or D[-1,-1]) is the global optimal score. """ t1 = len(seq1) t2 = len(seq2) D = np.zeros((t2 + 1, t1 + 1)) for j in range(1, t1 + 1): D[0, j] = self.indel * j for i in range(1, t2 + 1): D[i, 0] = self.indel * i for i in range(1, t2 + 1): for j in range(1, t1 + 1): gaps = D[i, j - 1] + self.indel gapt = D[i - 1, j] + self.indel match = D[i - 1, j - 1] + self.subs_mat[seq1[j - 1], seq2[i - 1]] D[i, j] = min(match, gaps, gapt) return D def _om_dist(self, y_int): """ Method for calculating optimal matching distances between all sequence pairs. Arguments --------- y_int : array Encoded longitudinal data ready for optimal matching. Note ---- This method is optimized to calculate the distance between unique sequences only in order to save computation time. """ y_str = [] for i in y_int: y_str.append("".join(list(map(str, i)))) moves_str, unique_indices = np.unique(y_str, axis=0, return_index=True) moves_int = y_int[unique_indices] uni_num = len(moves_str) dict_move_index = dict(zip(map(tuple, moves_int), range(uni_num))) # dict_move_index = dict(zip(map(tuple, moves_str), range(uni_num))) # y_uni = moves_int uni_seq_dis_mat = np.zeros((uni_num, uni_num)) for pair in itertools.combinations(range(uni_num), 2): seq1 = moves_int[pair[0]] seq2 = moves_int[pair[1]] uni_seq_dis_mat[pair[0], pair[1]] = self._om_pair_dist(seq1, seq2)[-1, -1] uni_seq_dis_mat = uni_seq_dis_mat + uni_seq_dis_mat.transpose() seq_dis_mat = np.zeros((self.n, self.n)) for pair in itertools.combinations(range(self.n), 2): seq1 = y_int[pair[0]] seq2 = y_int[pair[1]] seq_dis_mat[pair[0], pair[1]] = uni_seq_dis_mat[ dict_move_index[tuple(seq1)], dict_move_index[tuple(seq2)] ] self.seq_dis_mat = seq_dis_mat + seq_dis_mat.transpose()