Skip to content
Snippets Groups Projects
Commit 0891d733 authored by Raphael Sturgis's avatar Raphael Sturgis
Browse files

Merge branch 'develop' into 'main'

Develop

See merge request !6
parents 041a6ff6 2bf2710c
No related branches found
No related tags found
2 merge requests!15Resolve "normalisation should raise exception when bad arguments given",!6Develop
Showing
with 1116 additions and 930 deletions
.idea/*
build/
skais.egg-info/
*.coverage
*__pycache__*
......@@ -10,7 +10,7 @@ tests:
script:
- pip install -r requirements.txt
- pip install --no-deps .
- pytest --junitxml=report.xml
- pytest --junitxml=report.xml --cov=src -vvv
artifacts:
when: always
reports:
......
skais
=====
[![coverage report](https://gitlab.lis-lab.fr/raphael.sturgis/skais/badges/develop/coverage.svg)](https://gitlab.lis-lab.fr/raphael.sturgis/skais/-/commits/develop)
[![pipeline status](https://gitlab.lis-lab.fr/raphael.sturgis/skais/badges/develop/pipeline.svg)](https://gitlab.lis-lab.fr/raphael.sturgis/skais/-/commits/develop)
``skais`` is a Python package for Raphael Sturgis' PhD at LIS and Searoutes.
Install
......
......@@ -3,4 +3,5 @@ setuptools~=57.0.0
numpy~=1.19.5
numba~=0.53.1
scipy~=1.5.4
POT~=0.7.0
\ No newline at end of file
hmmlearn~=0.2.6
scikit-learn~=1.0.1
\ No newline at end of file
......@@ -69,7 +69,7 @@ def setup_package():
VERSION = get_version()
here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
with open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()
mod_dir = NAME
......
__version__ = "0.1a"
import pickle
from datetime import datetime
import pandas as pd
import numpy as np
from numba import jit
import pandas as pd
from scipy.stats import stats
from skais.ais.ais_trajectory import AISTrajectory
def compute_trajectories(df, time_gap, min_size=50, size_limit=500, interpolation_time=None):
n_sample = len(df.index)
result = []
work_df = df.copy()
index = 0
while index < n_sample:
i = compute_trajectory(df['ts_sec'][index:].to_numpy(), time_gap, size_limit)
trajectory = AISTrajectory(work_df[:i], interpolation_time=interpolation_time)
if len(trajectory.df.index) > min_size:
result.append(trajectory)
work_df = work_df[i:]
index += i
return result
@jit(nopython=True)
def compute_trajectory(times, time_gap, size_limit):
n_samples = len(times)
previous_date = times[0]
i = 0
for i in range(size_limit):
if i >= n_samples or ((times[i] - previous_date) / 60 > time_gap):
return i
previous_date = times[i]
return i + 1
# def compute_trajectories(df, time_gap, min_size=50, size_limit=500, interpolation_time=None):
# n_sample = len(df.index)
# result = []
# work_df = df.copy()
#
# index = 0
# while index < n_sample:
# i = compute_trajectory(df['ts_sec'][index:].to_numpy(), time_gap, size_limit)
# trajectory = AISTrajectory(work_df[:i], interpolation_time=interpolation_time)
# if len(trajectory.df.index) > min_size:
# result.append(trajectory)
# work_df = work_df[i:]
# index += i
#
# return result
#
#
# @jit(nopython=True)
# def compute_trajectory(times, time_gap, size_limit):
# n_samples = len(times)
#
# previous_date = times[0]
#
# i = 0
# for i in range(size_limit):
# if i >= n_samples or ((times[i] - previous_date) / 60 > time_gap):
# return i
# previous_date = times[i]
#
# return i + 1
class AISPoints:
......@@ -49,18 +43,33 @@ class AISPoints:
self.df = df
@staticmethod
def load_from_csv(file_name):
df = pd.read_csv(file_name)
ais_points = AISPoints(df)
return ais_points
def describe(self):
description = {
"nb vessels": len(self.df.mmsi.unique()),
"nb points": len(self.df.index),
"average speed": self.df['sog'].mean(),
"average diff": self.df['diff'].mean()
}
for n in np.sort(self.df['label'].unique()):
description[f"labeled {n}"] = len(self.df[self.df['label'] == n].index)
return description
# cleaning functions
def remove_outliers(self, features, rank=4):
if rank <= 0:
raise ValueError(f"Rank is equal to {rank}, must be positive and superior to 0")
for feature in features:
self.df = self.df.drop(self.df[(np.abs(stats.zscore(self.df[feature])) > rank)].index)
def clean_angles(self):
self.df = self.df[self.df["cog"] <= 360]
self.df = self.df[self.df["cog"] >= 0]
self.df = self.df[self.df["heading"] <= 360]
self.df = self.df[self.df["heading"] >= 0]
def normalize(self, features, normalization_type="min-max"):
normalization_dict = {}
if normalization_type == "min-max":
......@@ -92,115 +101,12 @@ class AISPoints:
f"standardization]")
return normalization_type, normalization_dict
def histogram(self, features, bins=10, ranges=None, label=None, y_field='label'):
if label is not None:
tmp = self.df[self.df[y_field] == label]
else:
tmp = self.df
dat = tmp[features]
h = np.histogramdd(dat.to_numpy(), bins, ranges)[0]
if h.sum() == 0:
return np.full(h.shape, 1 / h.size)
else:
return h / h.sum()
def disjointed_histogram(self, features, bins, ranges, label=None, y_field='label'):
if label is not None:
tmp = self.df[self.df[y_field] == label]
else:
tmp = self.df
if type(bins) == int:
bins = [bins for _ in features]
histograms = []
for feature, bin, f_range in zip(features, bins, ranges):
histograms.append(np.histogram(tmp[feature], bin, f_range))
return histograms
def compute_diff_heading_cog(self):
self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x['heading'] - x['cog']) - 180),
# New features
def compute_drift(self):
self.df["drift"] = self.df.apply(lambda x: 180 - abs(abs(x['heading'] - x['cog']) - 180),
axis=1)
def clean_angles(self):
self.df = self.df[self.df["cog"] <= 360]
self.df = self.df[self.df["cog"] >= 0]
self.df = self.df[self.df["heading"] <= 360]
self.df = self.df[self.df["heading"] >= 0]
def histogram_joint_x_y(self, x_fields=["sog", "diff"], x_nb_bins=10, x_range=[[0, 30], [0, 180]]
, y_nb_bins=2, y_fields='label', y_range=[0, 1]):
return self.histogram(x_fields + [y_fields],
bins=[x_nb_bins for i in x_fields] + [y_nb_bins],
ranges=x_range + [y_range])
def histogram_x_knowing_y(self, x_fields=["sog", "diff"], x_nb_bins=10, x_range=[[0, 30], [0, 180]]
, y_nb_bins=2, y_field='label'):
result = []
for i in range(y_nb_bins):
layer = self.histogram(x_fields, bins=x_nb_bins, ranges=x_range, label=i, y_field=y_field)
result.append(layer)
return np.stack(result, axis=len(x_fields))
def disjointed_histogram_x_knowing_y(self, features, x_nb_bins=10, x_range=[[0, 1]]
, y_nb_bins=4, y_field='label'):
out = []
for feature, f_range in zip(features, x_range):
result = []
for i in range(y_nb_bins):
layer, _ = np.histogram(self.df[self.df[y_field] == i][feature].to_numpy(), bins=x_nb_bins,
range=f_range)
if layer.sum() == 0:
layer = np.full(layer.shape, 1)
result.append(layer)
out.append(np.stack(result))
return out
def histogram_y_knowing_x(self, x_fields=["sog", "diff"], x_nb_bins=10, x_range=[[0, 30], [0, 180]]
, y_nb_bins=2, y_field='label', y_range=[0, 1]):
h_joint = self.histogram_joint_x_y(x_fields, x_nb_bins, x_range, y_nb_bins, y_field, y_range)
y_hist = self.histogram(features=y_field, bins=y_nb_bins, ranges=[y_range])
result = np.zeros(h_joint.shape)
for idx, x in np.ndenumerate(h_joint):
if h_joint[idx[:-1]].sum() == 0:
result[idx] = y_hist[idx[-1]]
else:
result[idx] = x / h_joint[idx[:-1]].sum()
return result
def get_trajectories(self, time_gap=30, min_size=50, interpolation_time=None):
if 'ts' in self.df:
self.df['ts'] = pd.to_datetime(self.df['ts'], infer_datetime_format=True)
self.df['ts_sec'] = self.df['ts'].apply(lambda x: datetime.timestamp(x))
dat = self.df
else:
raise ValueError
trajectories = []
for mmsi in dat.mmsi.unique():
trajectories += compute_trajectories(dat[dat['mmsi'] == mmsi], time_gap, min_size=min_size,
interpolation_time=interpolation_time)
return trajectories
def describe(self):
stats = {"nb vessels": len(self.df.mmsi.unique()),
"nb points": len(self.df.index),
"average speed": self.df['sog'].mean(),
"average diff": self.df['diff'].mean()
}
for n in np.sort(self.df['label'].unique()):
stats[f"labeled {n}"] = len(self.df[self.df['label'] == n].index)
return stats
# Static methods
@staticmethod
def fuse(*args):
if len(args) == 1:
......@@ -215,3 +121,9 @@ class AISPoints:
dfs.append(aisPosition.df)
return AISPoints(pd.concat(dfs).reindex())
@staticmethod
def load_from_csv(file_name):
df = pd.read_csv(file_name)
ais_points = AISPoints(df)
return ais_points
import math
import pandas as pd
import numpy as np
from numba import jit
from scipy.interpolate import interp1d
from skais.utils.geography import great_circle
from skais.utils.stats import calc_std_dev
from skais.ais.ais_points import AISPoints
PI = math.pi
class NoTimeInformation(Exception):
pass
@jit(nopython=True)
def compute_trajectory(times, time_gap, size_limit):
def compute_trajectory(times, time_gap):
n_samples = len(times)
previous_date = times[0]
i = 0
for i in range(size_limit):
if i >= n_samples:
break
if (times[i] - previous_date) / 60 > time_gap:
while i < n_samples:
if (times[i] - previous_date) > time_gap:
break
previous_date = times[i]
i += 1
return i
@jit(nopython=True)
def compute_std(dat, radius):
stds = np.empty(dat.shape[0])
dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
for i in range(radius, dat.shape[0] - radius):
stds[i - radius] = np.std(dat[i - radius:i + radius + 1])
return stds
@jit(nopython=True)
def to_rad(degree):
return (degree / 360.0) * (2 * PI)
@jit(nopython=True)
def to_deg(rad):
return (rad * 360.0) / (2 * PI)
@jit(nopython=True)
def bearing(p1, p2):
long1 = to_rad(p1[0])
long2 = to_rad(p2[0])
lat1 = to_rad(p1[1])
lat2 = to_rad(p2[1])
y = np.sin(long2 - long1) * np.cos(lat2)
x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(long2 - long1)
return to_deg(np.arctan2(y, x))
# @jit(nopython=True)
def compute_position_angle_std(dat, radius):
angles_stds = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
angles_sum = []
for j in range(n_samples):
p1 = (data[j][0], data[j][1])
alpha = bearing(p1, center)
angles_sum.append(alpha)
angles_stds[i] = calc_std_dev(angles_sum)
return angles_stds
@jit(nopython=True)
def compute_position_angle_mean(dat, radius):
angles_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
cos = sin = 0
for j in range(n_samples):
p1 = (data[j][0], data[j][1])
alpha = bearing(p1, center)
cos += np.cos(np.radians(alpha))
sin += np.sin(np.radians(alpha))
angles_means[i] = np.arctan2(sin, cos)
return angles_means
def compute_position_dist_mean(dat, radius):
dist_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
dist_sum = 0
for j in range(n_samples - 1):
p1 = (data[j][0], data[j][1])
dist_sum += great_circle(p1[0], center[0], p1[1], center[1])
dist_means[i] = dist_sum / (n_samples - 1)
return dist_means
def compute_position_dist_std(dat, radius):
dist_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
dist_sum = []
for j in range(n_samples - 1):
p1 = (data[j][0], data[j][1])
dist_sum.append(great_circle(p1[0], center[0], p1[1], center[1]))
dist_means[i] = np.std(dist_sum)
return dist_means
def compute_point_angles(dat):
angles = np.zeros(dat.shape[0])
for i in range(1, dat.shape[0] - 1):
p1 = (dat[i - 1][0], dat[i - 1][1])
p2 = (dat[i][0], dat[i][1])
p3 = (dat[i + 1][0], dat[i + 1][1])
alpha = bearing(p2, p1)
beta = bearing(p2, p3)
angles[i] = 180 - abs(abs(alpha - beta) - 180)
return angles
def l1_angle(dat, radius):
l1 = np.zeros(dat.shape)
# def compute_position_angle_std(dat, radius):
# angles_stds = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# angles_sum = []
# for j in range(n_samples):
# p1 = (data[j][0], data[j][1])
#
# alpha = bearing(p1, center)
# angles_sum.append(alpha)
#
# angles_stds[i] = calc_std_dev(angles_sum)
# return angles_stds
#
#
# @jit(nopython=True)
# def compute_position_angle_mean(dat, radius):
# angles_means = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# cos = sin = 0
# for j in range(n_samples):
# p1 = (data[j][0], data[j][1])
#
# alpha = bearing(p1, center)
#
# cos += np.cos(np.radians(alpha))
# sin += np.sin(np.radians(alpha))
#
# angles_means[i] = np.arctan2(sin, cos)
# return angles_means
#
#
# def compute_position_dist_mean(dat, radius):
# dist_means = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# dist_sum = 0
# for j in range(n_samples - 1):
# p1 = (data[j][0], data[j][1])
#
# dist_sum += great_circle(p1[0], center[0], p1[1], center[1])
#
# dist_means[i] = dist_sum / (n_samples - 1)
# return dist_means
#
#
# def compute_position_dist_std(dat, radius):
# dist_means = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# dist_sum = []
# for j in range(n_samples - 1):
# p1 = (data[j][0], data[j][1])
#
# dist_sum.append(great_circle(p1[0], center[0], p1[1], center[1]))
#
# dist_means[i] = np.std(dist_sum)
# return dist_means
#
#
# def l1_angle(dat, radius):
# l1 = np.zeros(dat.shape)
#
# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius + 1]
# l1[i - radius] = np.linalg.norm(data, ord=1)
#
# return l1
#
#
# def l2_angle(dat, radius):
# l2 = np.zeros(dat.shape)
#
# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius + 1]
# l2[i - radius] = np.linalg.norm(data, ord=2)
# return l2
#
#
# def angle_dispersion(dat, radius):
# disp = np.zeros(dat.shape)
#
# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius + 1]
# disp[i - radius] = angular_dispersion(np.radians(data))
#
# return disp
def apply_func_on_window(dat, func, radius, on_edge='copy'):
result = np.zeros(dat.shape)
if on_edge == 'copy':
dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius + 1]
l1[i - radius] = np.linalg.norm(data, ord=1)
return l1
result[i - radius] = func(data)
return result
def l2_angle(dat, radius):
l2 = np.zeros(dat.shape)
dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius+1]
l2[i - radius] = np.linalg.norm(data, ord=2)
return l2
def apply_time_sequence(dat, time, func):
result = np.empty(dat.shape[0])
result[0] = func(dat[0], dat[1], time[0], time[1])
for i in range(1, dat.shape[0]):
result[i] = func(dat[i - 1], dat[i], time[i - 1], time[i])
return result
class AISTrajectory:
class AISTrajectory(AISPoints):
def __init__(self, df, interpolation_time=None):
df = df.drop_duplicates(subset=['ts_sec'])
......@@ -201,103 +182,52 @@ class AISTrajectory:
df = new_df
# self.df = df.dropna()
self.df = df
def compute_angle_l1(self, radius):
dat = self.df['angles_diff'].to_numpy()
l1 = l1_angle(dat, radius)
self.df[f"angle_l1"] = l1
def compute_angle_l2(self, radius):
dat = self.df['angles_diff'].to_numpy()
l2 = l2_angle(dat, radius)
self.df[f"angle_l2"] = l2
def normalize(self, features, normalization_type="min-max"):
normalization_dict = {}
if normalization_type == "min-max":
for f in features:
minimum = self.df[f].min()
maximum = self.df[f].max()
self.df[f] = (self.df[f] - minimum) / (maximum - minimum)
normalization_dict[f"{f}_minimum"] = minimum
normalization_dict[f"{f}_maximum"] = maximum
elif normalization_type == "standardization":
normalisation_factors = ("standardization", {})
for f in features:
mean = self.df[f].mean()
std = self.df[f].std()
if std == 0:
print("Warning: std = %d", std)
std = 1
self.df[f] = (self.df[f] - mean) / std
normalization_dict[f"{f}_mean"] = mean
normalization_dict[f"{f}_std"] = std
else:
raise ValueError(f"{normalization_type} not a valid normalization method. Must be on of [min-max, "
f"standardization]")
return normalization_type, normalization_dict
AISPoints.__init__(self, df)
def compute_derivative(self, field):
dt = self.df['ts_sec'].diff() / 60
def __eq__(self, other):
return self.df.equals(other.df)
dv = self.df[field].diff().div(dt, axis=0, )
self.df['d_' + field] = dv
def compute_diff(self, field1, field2):
self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x[field1] - x[field2]) - 180),
axis=1)
def sliding_window(self, size=10, offset=1, fields=None):
result = []
def compute_all_derivatives(self):
fields = ['cog', 'sog', 'rot', 'heading']
if len(self.df.index) >= size:
arr = self.to_numpy(fields)
prev_index = 0
while prev_index + size < len(self.df.index) + 1:
result.append(arr[prev_index:prev_index + size])
prev_index += offset
for field in fields:
self.compute_derivative(field)
return result
def compute_all_stds(self, radius):
fields = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude']
def apply_func_on_time_window(self, func, radius, column, new_column=None):
dat = self.df[column].to_numpy()
result = apply_func_on_window(dat, func, radius, on_edge='copy')
for field in fields:
dat = self.df[field].to_numpy()
stds = compute_std(dat, radius)
stds[-radius:] = np.nan
stds[:radius] = np.nan
self.df[f"{field}_std"] = stds
if new_column is None:
self.df[column] = result
else:
self.df[new_column] = result
def compute_position_features(self, radius):
dat = np.stack([self.df.longitude.to_numpy(), self.df.latitude.to_numpy()], axis=1)
std = compute_position_angle_std(dat, radius)
std[-radius:] = np.nan
std[:radius] = np.nan
self.df[f"angle_std"] = std
def apply_time_sequence_func(self, func, column, new_column=None):
dat = self.df[column].to_numpy()
time = self.df['ts_sec'].to_numpy()
mean = compute_position_angle_mean(dat, radius)
mean[-radius:] = np.nan
mean[:radius] = np.nan
self.df[f"angle_mean"] = mean
result = apply_time_sequence(dat, time, func)
mean_dist = compute_position_dist_mean(dat, radius)
mean_dist[-radius:] = np.nan
mean_dist[:radius] = np.nan
self.df[f"dist_mean"] = mean_dist
if new_column is None:
self.df[column] = result
else:
self.df[new_column] = result
std_dist = compute_position_dist_std(dat, radius)
std_dist[-radius:] = np.nan
std_dist[:radius] = np.nan
self.df[f"dist_std"] = std_dist
def apply_func_on_points(self, func, column, new_column=None):
dat = self.df[column].to_numpy()
angles = compute_point_angles(dat)
angles[0] = np.nan
angles[-1] = np.nan
self.df[f"angles_diff"] = angles
result = np.array(list(map(func, dat)))
def compute_sqrt_sog(self):
sog = self.df['sog'].to_numpy()
sog[sog < 0] = 0
self.df["sog_sqrt"] = np.sqrt(sog)
if new_column is None:
self.df[column] = result
else:
self.df[new_column] = result
def to_numpy(self, fields=None):
......@@ -308,18 +238,6 @@ class AISTrajectory:
return np.squeeze(df.to_numpy())
def sliding_window(self, size=10, offset=1, fields=None):
result = []
if len(self.df.index) >= size:
arr = self.to_numpy(fields)
prev_index = 0
while prev_index + size < len(self.df.index) + 1:
result.append(arr[prev_index:prev_index + size])
prev_index += offset
return result
def to_geojson(self):
coordinates = []
for index, row in self.df.iterrows():
......@@ -327,19 +245,97 @@ class AISTrajectory:
return {"type": "LineString", "coordinates": coordinates}
def get_stopped_snippets(self, column, exclude_label=0, time_gap=5, size_limit=10000):
df = self.df.drop(self.df[self.df[column] == exclude_label].index)
def split_trajectory(self, time_gap=600):
if 'ts_sec' not in self.df:
raise NoTimeInformation()
work_df = df.copy()
n_sample = len(df.index)
n_sample = len(self.df.index)
result = []
work_df = self.df.copy()
index = 0
while index < n_sample:
i = compute_trajectory(df['ts_sec'][index:].to_numpy(), time_gap, size_limit)
i = compute_trajectory(self.df['ts_sec'][index:].to_numpy(), time_gap)
trajectory = AISTrajectory(work_df[:i])
result.append(trajectory)
work_df = work_df[i:]
index += i
return result
# def compute_angle_l1(self, radius):
# dat = self.df['angles_diff'].to_numpy()
# l1 = l1_angle(dat, radius)
# self.df[f"angle_l1"] = l1
#
# def compute_angle_l2(self, radius):
# dat = self.df['angles_diff'].to_numpy()
# l2 = l2_angle(dat, radius)
# self.df[f"angle_l2"] = l2
# def compute_derivative(self, field):
# dt = self.df['ts_sec'].diff() / 60
#
# dv = self.df[field].diff().div(dt, axis=0, )
#
# self.df['d_' + field] = dv
#
# def compute_diff(self, field1, field2):
# self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x[field1] - x[field2]) - 180),
# axis=1)
#
# def compute_all_derivatives(self):
# fields = ['cog', 'sog', 'rot', 'heading']
#
# for field in fields:
# self.compute_derivative(field)
#
# def compute_all_stds(self, radius):
# fields = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude']
#
# for field in fields:
# dat = self.df[field].to_numpy()
# stds = compute_std(dat, radius)
# stds[-radius:] = np.nan
# stds[:radius] = np.nan
# self.df[f"{field}_std"] = stds
#
# def compute_all_dispersions(self, radius):
# fields = ['cog', 'heading', 'angles_diff']
# for field in fields:
# if field in self.df.columns:
# dat = self.df[field].to_numpy()
# disp = angle_dispersion(dat, radius)
# self.df[f"{field}_disp"] = disp
#
# def compute_position_features(self, radius):
# dat = np.stack([self.df.longitude.to_numpy(), self.df.latitude.to_numpy()], axis=1)
# std = compute_position_angle_std(dat, radius)
# std[-radius:] = np.nan
# std[:radius] = np.nan
# self.df[f"angle_std"] = std
#
# mean = compute_position_angle_mean(dat, radius)
# mean[-radius:] = np.nan
# mean[:radius] = np.nan
# self.df[f"angle_mean"] = mean
#
# mean_dist = compute_position_dist_mean(dat, radius)
# mean_dist[-radius:] = np.nan
# mean_dist[:radius] = np.nan
# self.df[f"dist_mean"] = mean_dist
#
# std_dist = compute_position_dist_std(dat, radius)
# std_dist[-radius:] = np.nan
# std_dist[:radius] = np.nan
# self.df[f"dist_std"] = std_dist
#
# angles = compute_point_angles(dat)
# angles[0] = np.nan
# angles[-1] = np.nan
# self.df[f"angles_diff"] = angles
#
# def compute_sqrt_sog(self):
# sog = self.df['sog'].to_numpy()
# sog[sog < 0] = 0
# self.df["sog_sqrt"] = np.sqrt(sog)
import random
from hmmlearn.hmm import GMMHMM, GaussianHMM
from numba import jit
from scipy import linalg
from sklearn.datasets import make_spd_matrix
import numpy as np
def split_trajectories(feature_seq, label_seq, n_classes):
if len(feature_seq) != len(label_seq):
raise ValueError
if len(feature_seq) == 0:
return {}
sequence_length = len(feature_seq)
current_label = label_seq[0]
result = {i: [] for i in range(n_classes)}
current_seq = []
sequence_list = []
for i in range(sequence_length):
if current_label != label_seq[i]:
result[current_label].append(current_seq)
sequence_list.append((current_label, current_seq))
current_label = label_seq[i]
current_seq = []
current_seq.append(feature_seq[i])
result[current_label].append(current_seq)
sequence_list.append((current_label, current_seq))
return result, sequence_list
class GMMHMMClassifier:
def __init__(self, nb_states):
self.n_features = 0
if type(nb_states) is not list:
self.nb_states = np.array([nb_states])
else:
self.nb_states = np.array(nb_states)
self.hmms = []
self.n_classes = len(self.nb_states)
for i, nb_state in enumerate(self.nb_states):
self.hmms.append(GaussianHMM(n_components=nb_state, covariance_type='full'))
self.hmm = GaussianHMM(n_components=sum(self.nb_states), covariance_type='full', init_params='', n_iter=100)
self.predict_dictionary = {}
self.predictor = []
count = 0
for i, ii in enumerate(self.nb_states):
self.predictor.append({})
for j in range(ii):
self.predictor[i][j] = count
self.predict_dictionary[count] = i
count += 1
def fit(self, x, y):
sequences = {i: [] for i in range(self.n_classes)}
self.n_features = x[0].shape[1]
for feature_seq, label_seq in zip(x, y):
split_seq, _ = split_trajectories(feature_seq, label_seq, self.n_classes)
for key in sequences.keys():
sequences[key] += split_seq[key]
for i, seqs in sequences.items():
self.hmms[i].n_features = self.n_features
if sum([np.array(s).size for s in seqs]) > sum(self.hmms[i]._get_n_fit_scalars_per_param().values()):
self.hmms[i].fit(np.concatenate(seqs), list(map(len, seqs)))
for j, value in enumerate(self.hmms[i].transmat_.sum(axis=1)):
if value == 0:
self.hmms[i].transmat_[j][j] = 1.0
self.hmms[i].covars_[j] = make_spd_matrix(self.hmms[i].n_features)
else:
self.hmms[i] = None
predict = []
for feature_seq, label_seq in zip(x, y):
_, sequences_list = split_trajectories(feature_seq, label_seq, self.n_classes)
pred = np.array([])
for label, seq in sequences_list:
if self.hmms[label] is not None:
_, state_sequence = self.hmms[label].decode(np.array(seq), [len(seq)])
pred = np.append(pred, [self.predictor[label][i] for i in state_sequence])
if len(pred) != 0:
predict.append(pred)
start = np.zeros(sum(self.nb_states))
T_mat = np.zeros((sum(self.nb_states), sum(self.nb_states)))
prior = -1
count = np.zeros(sum(self.nb_states))
for pred in predict:
start[int(pred[0])] += 1
for p in pred:
if prior != -1:
T_mat[prior][int(p)] += 1
count[prior] += 1
prior = int(p)
for i in range(sum(self.nb_states)):
for j in range(sum(self.nb_states)):
if T_mat[i][j] > 0:
T_mat[i][j] = T_mat[i][j] / count[i]
self.hmm.startprob_ = start / sum(start)
self.hmm.transmat_ = T_mat
for i, value in enumerate(self.hmm.transmat_.sum(axis=1)):
if value == 0:
self.hmm.transmat_[i][i] = 1.0
means = []
covars = []
for i, model in enumerate(self.hmms):
if self.hmms[i] is not None:
means.append(model.means_)
covars.append(model.covars_)
else:
means.append(np.zeros((self.nb_states[i], x[0].shape[1])))
covars.append(np.stack([make_spd_matrix(x[0].shape[1])
for _ in range(self.nb_states[i])], axis=0))
means = np.concatenate(means)
covars = np.concatenate(covars)
for n, cv in enumerate(covars):
if count[n] <= 3:
covars[n] = np.identity(cv.shape[0])
if not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0):
covars[n] += np.identity(cv.shape[0]) * 10 ** -15
self.hmm.means_ = means
self.hmm.covars_ = covars
def predict(self, X):
X_all = np.concatenate(X)
lenghts = [len(x) for x in X]
return np.array([self.predict_dictionary[i] for i in self.hmm.predict(X_all, lenghts)])
def predict_raw(self, X):
X_all = [x for i in X for x in i]
lenghts = [len(x) for x in X]
return self.hmm.predict(X_all, lenghts)
@jit(nopython=True)
def hmm_probabilities(predict, nb_states):
n_states = nb_states.sum()
start = np.zeros(n_states)
T_mat = np.zeros((n_states, n_states))
for pred in predict:
start[int(pred[0])] += 1
prior = int(pred[0])
for p in pred[1:]:
T_mat[prior][int(p)] += 1
prior = int(p)
mat_sum = T_mat.sum(axis=1)
for i in range(n_states):
if mat_sum[i] == 0:
T_mat[i][i] = 1.0
else:
T_mat[i] = T_mat[i] / mat_sum[i]
return start / start.sum(), T_mat
import numpy as np
def histogram(ais_points, features, bins, ranges=None, label=None, y_field='label'):
if label is not None:
tmp = ais_points.df[ais_points.df[y_field] == label]
else:
tmp = ais_points.df
dat = tmp[features]
h = np.histogramdd(dat.to_numpy(), bins, ranges)[0]
if h.sum() == 0:
return np.full(h.shape, 1 / h.size)
else:
return h / h.sum()
def disjointed_histogram(ais_points, features, bins, ranges, label=None, y_field='label'):
if label is not None:
tmp = ais_points.df[ais_points.df[y_field] == label]
else:
tmp = ais_points.df
if type(bins) == int:
bins = [bins for _ in features]
histograms = []
for feature, h_bin, f_range in zip(features, bins, ranges):
histograms.append(np.histogram(tmp[feature], h_bin, f_range))
return histograms
def histogram_joint_x_y(ais_points, x_fields, x_nb_bins, x_range, y_fields, y_nb_bins, y_range):
return histogram(ais_points, x_fields + [y_fields],
bins=[x_nb_bins for _ in x_fields] + [y_nb_bins],
ranges=x_range + [y_range])
def histogram_x_knowing_y(ais_points, x_fields, x_nb_bins, x_range, y_nb_bins, y_field):
result = []
for i in range(y_nb_bins):
layer = histogram(ais_points, x_fields, bins=x_nb_bins, ranges=x_range, label=i, y_field=y_field)
result.append(layer)
return np.stack(result, axis=len(x_fields))
def disjointed_histogram_x_knowing_y(ais_points, features, x_nb_bins, x_range, y_nb_bins, y_field):
out = []
for feature, f_range in zip(features, x_range):
result = []
for i in range(y_nb_bins):
layer, _ = np.histogram(ais_points.df[ais_points.df[y_field] == i][feature].to_numpy(), bins=x_nb_bins,
range=f_range)
if layer.sum() == 0:
layer = np.full(layer.shape, 1)
result.append(layer)
out.append(np.stack(result))
return out
def histogram_y_knowing_x(ais_points, x_fields, x_nb_bins, x_range, y_nb_bins, y_field, y_range):
h_joint = histogram_joint_x_y(ais_points, x_fields, x_nb_bins, x_range, y_nb_bins, y_field, y_range)
y_hist = histogram(ais_points, features=y_field, bins=y_nb_bins, ranges=[y_range])
result = np.zeros(h_joint.shape)
for idx, x in np.ndenumerate(h_joint):
if h_joint[idx[:-1]].sum() == 0:
result[idx] = y_hist[idx[-1]]
else:
result[idx] = x / h_joint[idx[:-1]].sum()
return result
from skais.ais.ais_trajectory import AISTrajectory
# Trajectories
"""
Separates AISPoints into individual trajectories
"""
def get_trajectories(ais_points):
trajectories = []
for mmsi in ais_points.df.mmsi.unique():
trajectories.append(AISTrajectory(ais_points.df[ais_points.df['mmsi'] == mmsi].reset_index(drop=True)))
return trajectories
import cmath
import numpy as np
def angular_average_vector(angles):
n = len(angles)
x = np.sum(np.cos(angles)) / n
y = np.sum(np.sin(angles)) / n
return np.array([x, y])
def angular_dispersion(angles):
x, y = angular_average_vector(angles)
return np.sqrt(x ** 2 + y ** 2)
def angular_mean(angles):
x, y = angular_average_vector(angles)
theta = abs(np.arctan2(x, y))
if y > 0 and x > 0: # first Q
return theta
if y > 0 >= x: # Second Q
return np.pi / 2 + theta
if y <= 0 < x: # Fourth Q
return np.pi / 2 - theta
else: # Third Q
return - theta
def angular_std(angles):
return 1 - angular_dispersion(angles)
def angular_time_derivative(angle1, angle2, ts1, ts2):
z1 = np.cos(angle1) + np.sin(angle1) * 1j
z2 = np.cos(angle2) + np.sin(angle2) * 1j
z = z2 / z1
return cmath.polar(z)[1] / (ts2 - ts1)
def time_derivative(f1, f2, ts1, ts2):
return (f2 - f1) / (ts2 - ts1)
import math
import numpy as np
from numba import jit
PI = math.pi
@jit(nopython=True)
def to_rad(degree):
return (degree / 360.0) * (2 * PI)
@jit(nopython=True)
def to_deg(rad):
return (rad * 360.0) / (2 * PI)
@jit(nopython=True)
def bearing(p1, p2):
long1 = to_rad(p1[0])
long2 = to_rad(p2[0])
lat1 = to_rad(p1[1])
lat2 = to_rad(p2[1])
y = np.sin(long2 - long1) * np.cos(lat2)
x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(long2 - long1)
return to_deg(np.arctan2(y, x))
@jit(nopython=True)
def angle_between_three_points(p1, p2, p3):
alpha = bearing(p2, p1)
beta = bearing(p2, p3)
result = alpha - beta
if result > 180:
return 180 - result
else:
return result
def compute_point_angles(dat):
angles = np.zeros(dat.shape[0])
p1 = (dat[0][0], dat[0][1])
p2 = (dat[1][0], dat[1][1])
angles[0] = bearing(p1, p2)
for i in range(1, dat.shape[0] - 1):
p1 = (dat[i - 1][0], dat[i - 1][1])
p2 = (dat[i][0], dat[i][1])
p3 = (dat[i + 1][0], dat[i + 1][1])
angles[i] = angle_between_three_points(p1, p2, p3)
return angles
This diff is collapsed.
import math
import unittest
import pandas as pd
import numpy as np
from skais.ais.ais_trajectory import AISTrajectory, compute_std, to_rad, bearing, to_deg, compute_point_angles
from skais.ais.ais_trajectory import *
class TestAISTrajectory(unittest.TestCase):
def test_get_stopped_snippets_simple(self):
ais_trajectory = AISTrajectory(pd.DataFrame(
{
"label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1],
"ts_sec": [i for i in range(21)]
}
))
expected = pd.DataFrame(
{
"label": [1, 1, 1, 1, 1, 1, 1, 1],
"ts_sec": [i for i in range(13, 21)]
}
)
snippets = ais_trajectory.get_stopped_snippets('label')
self.assertEqual(len(snippets), 1)
pd.testing.assert_frame_equal(expected, snippets[0].df.reset_index(drop=True))
def test_get_stopped_snippets_multi_snippets(self):
ais_trajectory = AISTrajectory(pd.DataFrame(
{
"label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0],
"ts_sec": [i * 60 for i in range(31)]
}
))
expected = [
pd.DataFrame(
{
"label": [1, 1, 1, 1, 1, 1, 1, 1],
"ts_sec": [i * 60 for i in range(13, 21)]
}
),
pd.DataFrame(
{
"label": [1, 1, 1, ],
"ts_sec": [i * 60 for i in range(25, 28)]
}
),
]
snippets = ais_trajectory.get_stopped_snippets('label', time_gap=1)
self.assertEqual(len(expected), len(snippets))
for e, s in zip(expected, snippets):
pd.testing.assert_frame_equal(e, s.df.reset_index(drop=True))
# def test_get_stopped_snippets_simple(self):
# ais_trajectory = AISTrajectory(pd.DataFrame(
# {
# "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1],
# "ts_sec": [i for i in range(21)]
# }
# ))
#
# expected = pd.DataFrame(
# {
# "label": [1, 1, 1, 1, 1, 1, 1, 1],
# "ts_sec": [i for i in range(13, 21)]
# }
# )
#
# snippets = ais_trajectory.get_stopped_snippets('label')
#
# self.assertEqual(len(snippets), 1)
# pd.testing.assert_frame_equal(expected, snippets[0].df.reset_index(drop=True))
# def test_get_stopped_snippets_multi_snippets(self):
# ais_trajectory = AISTrajectory(pd.DataFrame(
# {
# "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0],
# "ts_sec": [i * 60 for i in range(31)]
# }
# ))
#
# expected = [
# pd.DataFrame(
# {
# "label": [1, 1, 1, 1, 1, 1, 1, 1],
# "ts_sec": [i * 60 for i in range(13, 21)]
# }
# ),
# pd.DataFrame(
# {
# "label": [1, 1, 1, ],
# "ts_sec": [i * 60 for i in range(25, 28)]
# }
# ),
# ]
#
# snippets = ais_trajectory.get_stopped_snippets('label', time_gap=1)
#
# self.assertEqual(len(expected), len(snippets))
# for e, s in zip(expected, snippets):
# pd.testing.assert_frame_equal(e, s.df.reset_index(drop=True))
def test_to_geojson(self):
trajectory = AISTrajectory(
......@@ -125,22 +121,22 @@ class TestAISTrajectory(unittest.TestCase):
np.testing.assert_array_equal(result, expected)
def test_compute_sqrt_sog(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i for i in range(4)],
"ts_sec": [i for i in range(4)]
}
)
)
trajectory.compute_sqrt_sog()
result = trajectory.df["sog_sqrt"].to_numpy()
expected = np.array([math.sqrt(i) for i in range(4)])
np.testing.assert_array_equal(result, expected)
# def test_compute_sqrt_sog(self):
# trajectory = AISTrajectory(
# pd.DataFrame(
# {
# "sog": [i for i in range(4)],
# "ts_sec": [i for i in range(4)]
# }
# )
# )
#
# trajectory.compute_sqrt_sog()
#
# result = trajectory.df["sog_sqrt"].to_numpy()
# expected = np.array([math.sqrt(i) for i in range(4)])
#
# np.testing.assert_array_equal(result, expected)
def test_interpolation(self):
trajectory = AISTrajectory(
......@@ -186,173 +182,81 @@ class TestAISTrajectory(unittest.TestCase):
pd.testing.assert_frame_equal(trajectory.df, expected)
def test_compute_angle_l1(self):
def test_split_trajectory_simple(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"ts_sec": [i for i in range(15)],
"angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]
"ts_sec": [i for i in range(0, 3001, 600)] + [i for i in range(4001, 7001, 600)],
"label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(4001, 7001, 600)]
}
)
)
trajectory.compute_angle_l1(2)
result = trajectory.df["angle_l1"].to_numpy()
expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5])
np.testing.assert_array_equal(result, expected)
def test_compute_angle_l2(self):
trajectory = AISTrajectory(
expected = [
AISTrajectory(
pd.DataFrame(
{
"ts_sec": [i for i in range(15)],
"angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]
"ts_sec": [i for i in range(0, 3001, 600)],
"label": [0 for _ in range(0, 3001, 600)]
}
)
),
AISTrajectory(
pd.DataFrame(
{
"ts_sec": [i for i in range(4001, 7001, 600)],
"label": [1 for i in range(4001, 7001, 600)]
}
)
)
]
trajectory.compute_angle_l2(2)
result = trajectory.df["angle_l2"].to_numpy()
expected = np.array(np.sqrt([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5]))
np.testing.assert_array_equal(result, expected)
def test_compute_std(self):
dat = np.array([0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)])
radius = 2
result = compute_std.py_func(dat, radius)
expected = np.array(
[0, 0, 0, 0.4, np.sqrt(30 / 125), np.sqrt(30 / 125), 0.4, 0, 0.8, np.sqrt(120 / 125), np.sqrt(120 / 125),
0.8, 0, 0, 0])
np.testing.assert_almost_equal(result, expected)
def test_to_rad_0(self):
result = to_rad.py_func(0)
expected = 0
self.assertAlmostEqual(result, expected)
def test_to_rad_90(self):
result = to_rad.py_func(90)
expected = np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_rad_180(self):
result = to_rad.py_func(180)
expected = np.pi
self.assertAlmostEqual(result, expected)
def test_to_rad_360(self):
result = to_rad.py_func(360)
expected = 2 * np.pi
self.assertAlmostEqual(result, expected)
def test_to_rad_m180(self):
result = to_rad.py_func(-180)
expected = -np.pi
self.assertAlmostEqual(result, expected)
def test_to_rad_m90(self):
result = to_rad.py_func(-90)
expected = -np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_rad_270(self):
result = to_rad.py_func(270)
expected = 3 * np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_rad_810(self):
result = to_rad.py_func(810)
expected = 9 * np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_deg_0(self):
result = to_deg.py_func(0)
expected = 0
self.assertAlmostEqual(result, expected)
def test_to_deg_90(self):
result = to_deg.py_func(np.pi / 2)
expected = 90
self.assertAlmostEqual(result, expected)
def test_to_deg_180(self):
result = to_deg.py_func(np.pi)
expected = 180
self.assertAlmostEqual(result, expected)
def test_to_deg_360(self):
result = to_deg.py_func(2 * np.pi)
expected = 360
self.assertAlmostEqual(result, expected)
def test_to_deg_m180(self):
result = to_deg.py_func(-np.pi)
expected = -180
self.assertAlmostEqual(result, expected)
def test_to_deg_m90(self):
result = to_deg.py_func(-np.pi / 2)
expected = -90
self.assertAlmostEqual(result, expected)
def test_to_deg_270(self):
result = to_deg.py_func(3 * np.pi / 2)
expected = 270
self.assertAlmostEqual(result, expected)
def test_to_deg_810(self):
result = to_deg.py_func(9 * np.pi / 2)
expected = 810
self.assertAlmostEqual(result, expected)
def test_bearing_rand(self):
paris = (2.3522, 48.8566)
marseille = (5.3698, 43.2965)
result = bearing.py_func(paris, marseille)
expected = 158.2694
self.assertAlmostEqual(result, expected, places=4)
def test_bearing_along_equator(self):
p1 = (0, 0)
p2 = (90, 0)
result = bearing.py_func(p1, p2)
expected = 90
self.assertAlmostEqual(result, expected)
def test_bearing_equator_to_north_pole(self):
p1 = (0, 0)
p2 = (0, 90)
result = bearing.py_func(p1, p2)
expected = 0
self.assertAlmostEqual(result, expected)
for e, r in zip(expected, trajectory.split_trajectory(800)):
pd.testing.assert_frame_equal(e.df.reset_index(drop=True), r.df.reset_index(drop=True))
# def test_compute_angle_l1(self):
# trajectory = AISTrajectory(
# pd.DataFrame(
# {
# "ts_sec": [i for i in range(15)],
# "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]
# }
# )
# )
#
# trajectory.compute_angle_l1(2)
#
# result = trajectory.df["angle_l1"].to_numpy()
# expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5])
#
# np.testing.assert_array_equal(result, expected)
#
# def test_compute_angle_l2(self):
# trajectory = AISTrajectory(
# pd.DataFrame(
# {
# "ts_sec": [i for i in range(15)],
# "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]
# }
# )
# )
#
# trajectory.compute_angle_l2(2)
#
# result = trajectory.df["angle_l2"].to_numpy()
# expected = np.array(np.sqrt([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5]))
#
# np.testing.assert_array_equal(result, expected)
#
# def test_compute_std(self):
# dat = np.array([0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)])
# radius = 2
#
# result = compute_std.py_func(dat, radius)
# expected = np.array(
# [0, 0, 0, 0.4, np.sqrt(30 / 125), np.sqrt(30 / 125), 0.4, 0, 0.8, np.sqrt(120 / 125), np.sqrt(120 / 125),
# 0.8, 0, 0, 0])
#
# np.testing.assert_almost_equal(result, expected)
# def test_compute_position_angle_std(self):
# dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)]
......@@ -377,11 +281,3 @@ class TestAISTrajectory(unittest.TestCase):
# result = compute_position_dist_std(dat, 2)
#
# # hard to test
\ No newline at end of file
def test_compute_point_angles(self):
dat = np.array([(0, 0), (0, 10), (10, 10), (0, 10)])
result = compute_point_angles(dat)
expected = np.array([0, 90, 0, 0])
np.testing.assert_almost_equal(expected, result, decimal=0)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment