Skip to content
Snippets Groups Projects
Commit 7bb5d8ab authored by Raphael's avatar Raphael
Browse files

minor

parent b94d5ab2
No related branches found
No related tags found
1 merge request!6Develop
__version__ = "0.1a"
import random
from hmmlearn.hmm import GMMHMM, GaussianHMM
from matplotlib import pyplot as plt
from numba import jit
from scipy import linalg
from sklearn.datasets import make_spd_matrix
import numpy as np
def split_trajectories(feature_seq, label_seq, n_classes):
if len(feature_seq) != len(label_seq):
raise ValueError
if len(feature_seq) == 0:
return {}
sequence_length = len(feature_seq)
current_label = label_seq[0]
result = {i: [] for i in range(n_classes)}
current_seq = []
sequence_list = []
for i in range(sequence_length):
if current_label != label_seq[i]:
result[current_label].append(current_seq)
sequence_list.append((current_label, current_seq))
current_label = label_seq[i]
current_seq = []
current_seq.append(feature_seq[i])
result[current_label].append(current_seq)
sequence_list.append((current_label, current_seq))
return result, sequence_list
class GMMHMMClassifier:
def __init__(self, nb_states):
self.n_features = 0
if type(nb_states) is not list:
self.nb_states = np.array([nb_states])
else:
self.nb_states = np.array(nb_states)
self.hmms = []
self.n_classes = len(self.nb_states)
for i, nb_state in enumerate(self.nb_states):
self.hmms.append(GaussianHMM(n_components=nb_state, covariance_type='full'))
self.hmm = GaussianHMM(n_components=sum(self.nb_states), covariance_type='full', init_params='', n_iter=100)
self.predict_dictionary = {}
self.predictor = []
count = 0
for i, ii in enumerate(self.nb_states):
self.predictor.append({})
for j in range(ii):
self.predictor[i][j] = count
self.predict_dictionary[count] = i
count += 1
def fit(self, x, y):
sequences = {i: [] for i in range(self.n_classes)}
self.n_features = x[0].shape[1]
for feature_seq, label_seq in zip(x, y):
split_seq, _ = split_trajectories(feature_seq, label_seq, self.n_classes)
for key in sequences.keys():
sequences[key] += split_seq[key]
for i, seqs in sequences.items():
self.hmms[i].n_features = self.n_features
if sum([np.array(s).size for s in seqs]) > sum(self.hmms[i]._get_n_fit_scalars_per_param().values()):
self.hmms[i].fit(np.concatenate(seqs), list(map(len, seqs)))
for j, value in enumerate(self.hmms[i].transmat_.sum(axis=1)):
if value == 0:
self.hmms[i].transmat_[j][j] = 1.0
self.hmms[i].covars_[j] = make_spd_matrix(self.hmms[i].n_features)
else:
self.hmms[i] = None
predict = []
for feature_seq, label_seq in zip(x, y):
_, sequences_list = split_trajectories(feature_seq, label_seq, self.n_classes)
pred = np.array([])
for label, seq in sequences_list:
if self.hmms[label] is not None:
_, state_sequence = self.hmms[label].decode(np.array(seq), [len(seq)])
pred = np.append(pred, [self.predictor[label][i] for i in state_sequence])
if len(pred) != 0:
predict.append(pred)
start = np.zeros(sum(self.nb_states))
T_mat = np.zeros((sum(self.nb_states), sum(self.nb_states)))
prior = -1
count = np.zeros(sum(self.nb_states))
for pred in predict:
start[int(pred[0])] += 1
for p in pred:
if prior != -1:
T_mat[prior][int(p)] += 1
count[prior] += 1
prior = int(p)
for i in range(sum(self.nb_states)):
for j in range(sum(self.nb_states)):
if T_mat[i][j] > 0:
T_mat[i][j] = T_mat[i][j] / count[i]
self.hmm.startprob_ = start / sum(start)
self.hmm.transmat_ = T_mat
for i, value in enumerate(self.hmm.transmat_.sum(axis=1)):
if value == 0:
self.hmm.transmat_[i][i] = 1.0
means = []
covars = []
for i, model in enumerate(self.hmms):
if self.hmms[i] is not None:
means.append(model.means_)
covars.append(model.covars_)
else:
means.append(np.zeros((self.nb_states[i], x[0].shape[1])))
covars.append(np.stack([make_spd_matrix(x[0].shape[1])
for _ in range(self.nb_states[i])], axis=0))
means = np.concatenate(means)
covars = np.concatenate(covars)
for n, cv in enumerate(covars):
if count[n] <= 3:
covars[n] = np.identity(cv.shape[0])
if not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0):
covars[n] += np.identity(cv.shape[0]) * 10 ** -15
self.hmm.means_ = means
self.hmm.covars_ = covars
def predict(self, X):
X_all = np.concatenate(X)
lenghts = [len(x) for x in X]
return np.array([self.predict_dictionary[i] for i in self.hmm.predict(X_all, lenghts)])
def predict_raw(self, X):
X_all = [x for i in X for x in i]
lenghts = [len(x) for x in X]
return self.hmm.predict(X_all, lenghts)
@jit(nopython=True)
def hmm_probabilities(predict, nb_states):
n_states = nb_states.sum()
start = np.zeros(n_states)
T_mat = np.zeros((n_states, n_states))
for pred in predict:
start[int(pred[0])] += 1
prior = int(pred[0])
for p in pred[1:]:
T_mat[prior][int(p)] += 1
prior = int(p)
mat_sum = T_mat.sum(axis=1)
for i in range(n_states):
if mat_sum[i] == 0:
T_mat[i][i] = 1.0
else:
T_mat[i] = T_mat[i] / mat_sum[i]
return start / start.sum(), T_mat
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment