diff --git a/skais/__init__.py b/skais/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..834dc3a90c4fe62af22a6facecb79e16795e97fb 100644 --- a/skais/__init__.py +++ b/skais/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1a" diff --git a/skais/ais/__init__.py b/skais/ais/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/skais/learn/__init__.py b/skais/learn/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/skais/learn/hmm_gmm_classifier.py b/skais/learn/hmm_gmm_classifier.py new file mode 100644 index 0000000000000000000000000000000000000000..34507ddf98c80a7452ca45894d077a11b9ca1a61 --- /dev/null +++ b/skais/learn/hmm_gmm_classifier.py @@ -0,0 +1,170 @@ +import random + +from hmmlearn.hmm import GMMHMM, GaussianHMM +from matplotlib import pyplot as plt +from numba import jit +from scipy import linalg +from sklearn.datasets import make_spd_matrix +import numpy as np + +def split_trajectories(feature_seq, label_seq, n_classes): + if len(feature_seq) != len(label_seq): + raise ValueError + if len(feature_seq) == 0: + return {} + + sequence_length = len(feature_seq) + current_label = label_seq[0] + result = {i: [] for i in range(n_classes)} + current_seq = [] + sequence_list = [] + for i in range(sequence_length): + if current_label != label_seq[i]: + result[current_label].append(current_seq) + sequence_list.append((current_label, current_seq)) + current_label = label_seq[i] + current_seq = [] + current_seq.append(feature_seq[i]) + + result[current_label].append(current_seq) + sequence_list.append((current_label, current_seq)) + + return result, sequence_list + +class GMMHMMClassifier: + def __init__(self, nb_states): + self.n_features = 0 + + if type(nb_states) is not list: + self.nb_states = np.array([nb_states]) + else: + self.nb_states = np.array(nb_states) + + self.hmms = [] + self.n_classes = len(self.nb_states) + for i, nb_state in enumerate(self.nb_states): + self.hmms.append(GaussianHMM(n_components=nb_state, covariance_type='full')) + + self.hmm = GaussianHMM(n_components=sum(self.nb_states), covariance_type='full', init_params='', n_iter=100) + + self.predict_dictionary = {} + self.predictor = [] + count = 0 + for i, ii in enumerate(self.nb_states): + self.predictor.append({}) + for j in range(ii): + self.predictor[i][j] = count + self.predict_dictionary[count] = i + count += 1 + + def fit(self, x, y): + sequences = {i: [] for i in range(self.n_classes)} + self.n_features = x[0].shape[1] + for feature_seq, label_seq in zip(x, y): + split_seq, _ = split_trajectories(feature_seq, label_seq, self.n_classes) + + for key in sequences.keys(): + sequences[key] += split_seq[key] + + for i, seqs in sequences.items(): + self.hmms[i].n_features = self.n_features + if sum([np.array(s).size for s in seqs]) > sum(self.hmms[i]._get_n_fit_scalars_per_param().values()): + self.hmms[i].fit(np.concatenate(seqs), list(map(len, seqs))) + for j, value in enumerate(self.hmms[i].transmat_.sum(axis=1)): + if value == 0: + self.hmms[i].transmat_[j][j] = 1.0 + self.hmms[i].covars_[j] = make_spd_matrix(self.hmms[i].n_features) + else: + self.hmms[i] = None + + predict = [] + for feature_seq, label_seq in zip(x, y): + _, sequences_list = split_trajectories(feature_seq, label_seq, self.n_classes) + pred = np.array([]) + for label, seq in sequences_list: + if self.hmms[label] is not None: + _, state_sequence = self.hmms[label].decode(np.array(seq), [len(seq)]) + pred = np.append(pred, [self.predictor[label][i] for i in state_sequence]) + if len(pred) != 0: + predict.append(pred) + + start = np.zeros(sum(self.nb_states)) + T_mat = np.zeros((sum(self.nb_states), sum(self.nb_states))) + prior = -1 + count = np.zeros(sum(self.nb_states)) + for pred in predict: + start[int(pred[0])] += 1 + + for p in pred: + if prior != -1: + T_mat[prior][int(p)] += 1 + count[prior] += 1 + prior = int(p) + + for i in range(sum(self.nb_states)): + for j in range(sum(self.nb_states)): + if T_mat[i][j] > 0: + T_mat[i][j] = T_mat[i][j] / count[i] + + self.hmm.startprob_ = start / sum(start) + self.hmm.transmat_ = T_mat + + for i, value in enumerate(self.hmm.transmat_.sum(axis=1)): + if value == 0: + self.hmm.transmat_[i][i] = 1.0 + + means = [] + covars = [] + for i, model in enumerate(self.hmms): + if self.hmms[i] is not None: + means.append(model.means_) + covars.append(model.covars_) + else: + means.append(np.zeros((self.nb_states[i], x[0].shape[1]))) + covars.append(np.stack([make_spd_matrix(x[0].shape[1]) + for _ in range(self.nb_states[i])], axis=0)) + + means = np.concatenate(means) + covars = np.concatenate(covars) + for n, cv in enumerate(covars): + if count[n] <= 3: + covars[n] = np.identity(cv.shape[0]) + if not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0): + covars[n] += np.identity(cv.shape[0]) * 10 ** -15 + + self.hmm.means_ = means + self.hmm.covars_ = covars + + def predict(self, X): + X_all = np.concatenate(X) + lenghts = [len(x) for x in X] + + return np.array([self.predict_dictionary[i] for i in self.hmm.predict(X_all, lenghts)]) + + def predict_raw(self, X): + X_all = [x for i in X for x in i] + lenghts = [len(x) for x in X] + + return self.hmm.predict(X_all, lenghts) + +@jit(nopython=True) +def hmm_probabilities(predict, nb_states): + n_states = nb_states.sum() + start = np.zeros(n_states) + T_mat = np.zeros((n_states, n_states)) + for pred in predict: + start[int(pred[0])] += 1 + prior = int(pred[0]) + + for p in pred[1:]: + T_mat[prior][int(p)] += 1 + prior = int(p) + + mat_sum = T_mat.sum(axis=1) + for i in range(n_states): + if mat_sum[i] == 0: + T_mat[i][i] = 1.0 + else: + T_mat[i] = T_mat[i] / mat_sum[i] + + return start / start.sum(), T_mat