From 35d0e4a02efdf6ea37711c759890d93aa6389d36 Mon Sep 17 00:00:00 2001 From: Raphael <raphael.sturgis@gmail.com> Date: Fri, 9 Sep 2022 09:05:09 +0200 Subject: [PATCH] bug fix hmm --- skais/learn/hmm_gmm_classifier.py | 178 ++++++++++++++++++------------ 1 file changed, 107 insertions(+), 71 deletions(-) diff --git a/skais/learn/hmm_gmm_classifier.py b/skais/learn/hmm_gmm_classifier.py index 953041d..95e4a51 100644 --- a/skais/learn/hmm_gmm_classifier.py +++ b/skais/learn/hmm_gmm_classifier.py @@ -30,8 +30,98 @@ def split_trajectories(feature_seq, label_seq, n_classes): return result, sequence_list + +def get_sequences(x, y, n_classes): + sequences = {i: [] for i in range(n_classes)} + for feature_seq, label_seq in zip(x, y): + split_seq, _ = split_trajectories(feature_seq, label_seq, n_classes) + + for key in sequences.keys(): + sequences[key] += split_seq[key] + return sequences + + +def fit_hmmms(self, sequences): + for i, seqs in sequences.items(): + self.hmms[i].n_features = self.n_features + if sum([np.array(s).size for s in seqs]) > sum(self.hmms[i]._get_n_fit_scalars_per_param().values()): + self.hmms[i].fit(np.concatenate(seqs), list(map(len, seqs))) + for j, value in enumerate(self.hmms[i].transmat_.sum(axis=1)): + if value == 0: + self.hmms[i].transmat_[j][j] = 1.0 + self.hmms[i].covars_[j] = make_spd_matrix(self.hmms[i].n_features) + else: + self.hmms[i] = None + + +def get_predictions(x, y, hmms, predictor, n_classes): + predict = [] + for feature_seq, label_seq in zip(x, y): + _, sequences_list = split_trajectories(feature_seq, label_seq, n_classes) + pred = np.array([]) + for label, seq in sequences_list: + if hmms[label] is not None: + _, state_sequence = hmms[label].decode(np.array(seq), [len(seq)]) + pred = np.append(pred, [predictor[label][i] for i in state_sequence]) + if len(pred) != 0: + predict.append(pred) + return predict + + +def get_new_hmm_values(nb_states, predict): + start = np.zeros(sum(nb_states)) + T_mat = np.zeros((sum(nb_states), sum(nb_states))) + prior = -1 + count = np.zeros(sum(nb_states)) + for pred in predict: + start[int(pred[0])] += 1 + + for p in pred: + if prior != -1: + T_mat[prior][int(p)] += 1 + count[prior] += 1 + prior = int(p) + + for i in range(sum(nb_states)): + for j in range(sum(nb_states)): + if T_mat[i][j] > 0: + T_mat[i][j] = T_mat[i][j] / count[i] + + for i, value in enumerate(T_mat.sum(axis=1)): + if value == 0: + T_mat[i][i] = 1.0 + return start, T_mat, count + + +def get_means_and_covars(hmms, nb_states, nb_features, degens): + means = [] + covars = [] + for i, model in enumerate(hmms): + if hmms[i] is not None: + means.append(model.means_) + covars.append(model.covars_) + else: + means.append(np.zeros((nb_states[i], nb_features))) + covars.append(np.stack([make_spd_matrix(nb_features) + for _ in range(nb_states[i])], axis=0)) + + means = np.concatenate(means) + covars = np.concatenate(covars) + for n, cv in enumerate(covars): + if degens[n] and np.any(linalg.eigvalsh(cv) > 0): + covars[n] = np.identity(cv.shape[0]) + limit = 0 + while (not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0)): + covars[n] += np.identity(cv.shape[0]) * 10 ** -15 + if limit > 100: + covars[n] = np.identity(cv.shape[0]) + break + + return means, covars + + class GMMHMMClassifier: - def __init__(self, nb_states): + def __init__(self, nb_states, max_iter=100,verbose=False): self.n_features = 0 if type(nb_states) is not list: @@ -39,12 +129,13 @@ class GMMHMMClassifier: else: self.nb_states = np.array(nb_states) + self.degen_ = [False for _ in range(sum(self.nb_states))] self.hmms = [] self.n_classes = len(self.nb_states) for i, nb_state in enumerate(self.nb_states): - self.hmms.append(GaussianHMM(n_components=nb_state, covariance_type='full')) + self.hmms.append(GaussianHMM(n_components=nb_state, covariance_type='full', verbose=verbose, n_iter=max_iter)) - self.hmm = GaussianHMM(n_components=sum(self.nb_states), covariance_type='full', init_params='', n_iter=100) + self.hmm = GaussianHMM(n_components=sum(self.nb_states), covariance_type='full', init_params='', n_iter=max_iter) self.predict_dictionary = {} self.predictor = [] @@ -57,79 +148,19 @@ class GMMHMMClassifier: count += 1 def fit(self, x, y): - sequences = {i: [] for i in range(self.n_classes)} self.n_features = x[0].shape[1] - for feature_seq, label_seq in zip(x, y): - split_seq, _ = split_trajectories(feature_seq, label_seq, self.n_classes) - - for key in sequences.keys(): - sequences[key] += split_seq[key] - - for i, seqs in sequences.items(): - self.hmms[i].n_features = self.n_features - if sum([np.array(s).size for s in seqs]) > sum(self.hmms[i]._get_n_fit_scalars_per_param().values()): - self.hmms[i].fit(np.concatenate(seqs), list(map(len, seqs))) - for j, value in enumerate(self.hmms[i].transmat_.sum(axis=1)): - if value == 0: - self.hmms[i].transmat_[j][j] = 1.0 - self.hmms[i].covars_[j] = make_spd_matrix(self.hmms[i].n_features) - else: - self.hmms[i] = None - - predict = [] - for feature_seq, label_seq in zip(x, y): - _, sequences_list = split_trajectories(feature_seq, label_seq, self.n_classes) - pred = np.array([]) - for label, seq in sequences_list: - if self.hmms[label] is not None: - _, state_sequence = self.hmms[label].decode(np.array(seq), [len(seq)]) - pred = np.append(pred, [self.predictor[label][i] for i in state_sequence]) - if len(pred) != 0: - predict.append(pred) - - start = np.zeros(sum(self.nb_states)) - T_mat = np.zeros((sum(self.nb_states), sum(self.nb_states))) - prior = -1 - count = np.zeros(sum(self.nb_states)) - for pred in predict: - start[int(pred[0])] += 1 - - for p in pred: - if prior != -1: - T_mat[prior][int(p)] += 1 - count[prior] += 1 - prior = int(p) - - for i in range(sum(self.nb_states)): - for j in range(sum(self.nb_states)): - if T_mat[i][j] > 0: - T_mat[i][j] = T_mat[i][j] / count[i] + + sequences = get_sequences(x, y, self.n_classes) + fit_hmmms(self, sequences) + predict = get_predictions(x, y, self.hmms, self.predictor, self.n_classes) + start, T_mat, count = get_new_hmm_values(self.nb_states, predict) + + self.get_degens(count) self.hmm.startprob_ = start / sum(start) self.hmm.transmat_ = T_mat - for i, value in enumerate(self.hmm.transmat_.sum(axis=1)): - if value == 0: - self.hmm.transmat_[i][i] = 1.0 - - means = [] - covars = [] - for i, model in enumerate(self.hmms): - if self.hmms[i] is not None: - means.append(model.means_) - covars.append(model.covars_) - else: - means.append(np.zeros((self.nb_states[i], x[0].shape[1]))) - covars.append(np.stack([make_spd_matrix(x[0].shape[1]) - for _ in range(self.nb_states[i])], axis=0)) - - means = np.concatenate(means) - covars = np.concatenate(covars) - for n, cv in enumerate(covars): - if count[n] <= 3: - covars[n] = np.identity(cv.shape[0]) - if not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0): - covars[n] += np.identity(cv.shape[0]) * 10 ** -15 + means, covars = get_means_and_covars(self.hmms, self.nb_states, self.n_features, self.degen_) self.hmm.means_ = means self.hmm.covars_ = covars @@ -146,6 +177,11 @@ class GMMHMMClassifier: return self.hmm.predict(X_all, lenghts) + def get_degens(self, count): + for i, c in enumerate(count): + if c < self.n_features: + self.degen_[i] = True + @jit(nopython=True) def hmm_probabilities(predict, nb_states): n_states = nb_states.sum() -- GitLab