Skip to content
Snippets Groups Projects
Commit a65fed84 authored by Raphael Sturgis's avatar Raphael Sturgis
Browse files

Merge branch 'develop' into 'main'


See merge request !1
parents 62d97a52 bf3d0348
No related branches found
No related tags found
2 merge requests!15Resolve "normalisation should raise exception when bad arguments given",!1Develop
with 1638 additions and 0 deletions
# run the test suite
stage: test
- main
- develop
- docker
- pip install -r requirements.txt
- pip install --no-deps .
- pytest --junitxml=report.xml
when: always
junit: report.xml
\ No newline at end of file
File moved
\ No newline at end of file
import pickle
from datetime import datetime
import pandas as pd
import numpy as np
from numba import jit
from scipy.stats import stats
from skais.ais.ais_trajectory import AISTrajectory
def compute_trajectories(df, time_gap, min_size=50, size_limit=500, interpolation_time=None):
n_sample = len(df.index)
result = []
work_df = df.copy()
index = 0
while index < n_sample:
i = compute_trajectory(df['ts_sec'][index:].to_numpy(), time_gap, size_limit)
trajectory = AISTrajectory(work_df[:i], interpolation_time=interpolation_time)
if len(trajectory.df.index) > min_size:
work_df = work_df[i:]
index += i
return result
def compute_trajectory(times, time_gap, size_limit):
n_samples = len(times)
previous_date = times[0]
i = 0
for i in range(size_limit):
if i >= n_samples or ((times[i] - previous_date) / 60 > time_gap):
return i
previous_date = times[i]
return i + 1
class AISPoints:
def __init__(self, df=None):
if 'ts' in df:
df['ts'] = pd.to_datetime(df.ts)
df = df.sort_values(['mmsi', 'ts'])
self.df = df
def load_from_csv(file_name):
df = pd.read_csv(file_name)
ais_points = AISPoints(df)
return ais_points
def remove_outliers(self, features, rank=4):
if rank <= 0:
raise ValueError(f"Rank is equal to {rank}, must be positive and superior to 0")
for feature in features:
self.df = self.df.drop(self.df[(np.abs(stats.zscore(self.df[feature])) > rank)].index)
def normalize(self, features, normalization_type="min-max"):
normalization_dict = {}
if normalization_type == "min-max":
for f in features:
minimum = self.df[f].min()
maximum = self.df[f].max()
diff = (maximum - minimum)
if diff == 0:
print("Warning: diff = %d", diff)
diff = 1
self.df[f] = (self.df[f] - minimum) / diff
normalization_dict[f"{f}_minimum"] = minimum
normalization_dict[f"{f}_maximum"] = maximum
elif normalization_type == "standardization":
normalisation_factors = ("standardization", {})
for f in features:
mean = self.df[f].mean()
std = self.df[f].std()
if std == 0:
print("Warning: std = %d", std)
std = 1
self.df[f] = (self.df[f] - mean) / std
normalization_dict[f"{f}_mean"] = mean
normalization_dict[f"{f}_std"] = std
raise ValueError(f"{normalization_type} not a valid normalization method. Must be on of [min-max, "
return normalization_type, normalization_dict
def histogram(self, features, bins=10, ranges=None, label=None, y_field='label'):
if label is not None:
tmp = self.df[self.df[y_field] == label]
tmp = self.df
dat = tmp[features]
h = np.histogramdd(dat.to_numpy(), bins, ranges)[0]
if h.sum() == 0:
return np.full(h.shape, 1 / h.size)
return h / h.sum()
def disjointed_histogram(self, features, bins, ranges, label=None, y_field='label'):
if label is not None:
tmp = self.df[self.df[y_field] == label]
tmp = self.df
if type(bins) == int:
bins = [bins for _ in features]
histograms = []
for feature, bin, f_range in zip(features, bins, ranges):
histograms.append(np.histogram(tmp[feature], bin, f_range))
return histograms
def compute_diff_heading_cog(self):
self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x['heading'] - x['cog']) - 180),
def clean_angles(self):
self.df = self.df[self.df["cog"] <= 360]
self.df = self.df[self.df["cog"] >= 0]
self.df = self.df[self.df["heading"] <= 360]
self.df = self.df[self.df["heading"] >= 0]
def histogram_joint_x_y(self, x_fields=["sog", "diff"], x_nb_bins=10, x_range=[[0, 30], [0, 180]]
, y_nb_bins=2, y_fields='label', y_range=[0, 1]):
return self.histogram(x_fields + [y_fields],
bins=[x_nb_bins for i in x_fields] + [y_nb_bins],
ranges=x_range + [y_range])
def histogram_x_knowing_y(self, x_fields=["sog", "diff"], x_nb_bins=10, x_range=[[0, 30], [0, 180]]
, y_nb_bins=2, y_field='label'):
result = []
for i in range(y_nb_bins):
layer = self.histogram(x_fields, bins=x_nb_bins, ranges=x_range, label=i, y_field=y_field)
return np.stack(result, axis=len(x_fields))
def disjointed_histogram_x_knowing_y(self, features, x_nb_bins=10, x_range=[[0, 1]]
, y_nb_bins=4, y_field='label'):
out = []
for feature, f_range in zip(features, x_range):
result = []
for i in range(y_nb_bins):
layer, _ = np.histogram(self.df[self.df[y_field] == i][feature].to_numpy(), bins=x_nb_bins,
if layer.sum() == 0:
layer = np.full(layer.shape, 1)
return out
def histogram_y_knowing_x(self, x_fields=["sog", "diff"], x_nb_bins=10, x_range=[[0, 30], [0, 180]]
, y_nb_bins=2, y_field='label', y_range=[0, 1]):
h_joint = self.histogram_joint_x_y(x_fields, x_nb_bins, x_range, y_nb_bins, y_field, y_range)
y_hist = self.histogram(features=y_field, bins=y_nb_bins, ranges=[y_range])
result = np.zeros(h_joint.shape)
for idx, x in np.ndenumerate(h_joint):
if h_joint[idx[:-1]].sum() == 0:
result[idx] = y_hist[idx[-1]]
result[idx] = x / h_joint[idx[:-1]].sum()
return result
def get_trajectories(self, time_gap=30, min_size=50, interpolation_time=None):
if 'ts' in self.df:
self.df['ts'] = pd.to_datetime(self.df['ts'], infer_datetime_format=True)
self.df['ts_sec'] = self.df['ts'].apply(lambda x: datetime.timestamp(x))
dat = self.df
raise ValueError
trajectories = []
for mmsi in dat.mmsi.unique():
trajectories += compute_trajectories(dat[dat['mmsi'] == mmsi], time_gap, min_size=min_size,
return trajectories
def describe(self):
stats = {"nb vessels": len(self.df.mmsi.unique()),
"nb points": len(self.df.index),
"average speed": self.df['sog'].mean(),
"average diff": self.df['diff'].mean()
for n in np.sort(self.df['label'].unique()):
stats[f"labeled {n}"] = len(self.df[self.df['label'] == n].index)
return stats
def fuse(*args):
if len(args) == 1:
if not isinstance(args[0], list):
return args[0]
table = args[0]
table = args
dfs = []
for aisPosition in table:
return AISPoints(pd.concat(dfs).reindex())
import math
import pandas as pd
import numpy as np
from numba import jit
from scipy.interpolate import interp1d
from skais.utils.geography import great_circle
from skais.utils.stats import calc_std_dev
PI = math.pi
def compute_trajectory(times, time_gap, size_limit):
n_samples = len(times)
previous_date = times[0]
i = 0
for i in range(size_limit):
if i >= n_samples:
if (times[i] - previous_date) / 60 > time_gap:
previous_date = times[i]
return i
def compute_std(dat, radius):
stds = np.empty(dat.shape[0])
dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
for i in range(radius, dat.shape[0] - radius):
stds[i - radius] = np.std(dat[i - radius:i + radius + 1])
return stds
def to_rad(degree):
return (degree / 360.0) * (2 * PI)
def to_deg(rad):
return (rad * 360.0) / (2 * PI)
def bearing(p1, p2):
long1 = to_rad(p1[0])
long2 = to_rad(p2[0])
lat1 = to_rad(p1[1])
lat2 = to_rad(p2[1])
y = np.sin(long2 - long1) * np.cos(lat2)
x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(long2 - long1)
return to_deg(np.arctan2(y, x))
# @jit(nopython=True)
def compute_position_angle_std(dat, radius):
angles_stds = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
angles_sum = []
for j in range(n_samples):
p1 = (data[j][0], data[j][1])
alpha = bearing(p1, center)
angles_stds[i] = calc_std_dev(angles_sum)
return angles_stds
def compute_position_angle_mean(dat, radius):
angles_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
cos = sin = 0
for j in range(n_samples):
p1 = (data[j][0], data[j][1])
alpha = bearing(p1, center)
cos += np.cos(np.radians(alpha))
sin += np.sin(np.radians(alpha))
angles_means[i] = np.arctan2(sin, cos)
return angles_means
def compute_position_dist_mean(dat, radius):
dist_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
dist_sum = 0
for j in range(n_samples - 1):
p1 = (data[j][0], data[j][1])
dist_sum += great_circle(p1[0], center[0], p1[1], center[1])
dist_means[i] = dist_sum / (n_samples - 1)
return dist_means
def compute_position_dist_std(dat, radius):
dist_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
dist_sum = []
for j in range(n_samples - 1):
p1 = (data[j][0], data[j][1])
dist_sum.append(great_circle(p1[0], center[0], p1[1], center[1]))
dist_means[i] = np.std(dist_sum)
return dist_means
def compute_point_angles(dat):
angles = np.zeros(dat.shape[0])
for i in range(1, dat.shape[0] - 1):
p1 = (dat[i - 1][0], dat[i - 1][1])
p2 = (dat[i][0], dat[i][1])
p3 = (dat[i + 1][0], dat[i + 1][1])
alpha = bearing(p2, p1)
beta = bearing(p2, p3)
angles[i] = 180 - abs(abs(alpha - beta) - 180)
return angles
def l1_angle(dat, radius):
l1 = np.zeros(dat.shape)
dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius+1]
l1[i - radius] = np.linalg.norm(data, ord=1)
return l1
def l2_angle(dat, radius):
l2 = np.zeros(dat.shape)
dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius+1]
l2[i - radius] = np.linalg.norm(data, ord=2)
return l2
class AISTrajectory:
def __init__(self, df, interpolation_time=None):
df = df.drop_duplicates(subset=['ts_sec'])
if interpolation_time and len(df.index) > 4:
float_columns = ['longitude', 'latitude', 'cog', 'heading', 'rot', 'sog', 'diff']
discrete_columns = ['navstatus', 'label']
new_df = pd.DataFrame()
t_raw = df['ts_sec'].to_numpy()
t_interp1d = np.arange(start=t_raw[0], stop=t_raw[-1]+1,
step=interpolation_time * 60)
new_df['ts_sec'] = t_interp1d
for column in float_columns:
if column in df.columns:
new_df[column] = interp1d(x=df['ts_sec'],
for column in discrete_columns:
if column in df.columns:
new_df[column] = interp1d(x=df['ts_sec'],
kind='nearest', axis=0)(t_interp1d).astype(int)
df = new_df
# self.df = df.dropna()
self.df = df
def compute_angle_l1(self, radius):
dat = self.df['angles_diff'].to_numpy()
l1 = l1_angle(dat, radius)
self.df[f"angle_l1"] = l1
def compute_angle_l2(self, radius):
dat = self.df['angles_diff'].to_numpy()
l2 = l2_angle(dat, radius)
self.df[f"angle_l2"] = l2
def normalize(self, features, normalization_type="min-max"):
normalization_dict = {}
if normalization_type == "min-max":
for f in features:
minimum = self.df[f].min()
maximum = self.df[f].max()
self.df[f] = (self.df[f] - minimum) / (maximum - minimum)
normalization_dict[f"{f}_minimum"] = minimum
normalization_dict[f"{f}_maximum"] = maximum
elif normalization_type == "standardization":
normalisation_factors = ("standardization", {})
for f in features:
mean = self.df[f].mean()
std = self.df[f].std()
if std == 0:
print("Warning: std = %d", std)
std = 1
self.df[f] = (self.df[f] - mean) / std
normalization_dict[f"{f}_mean"] = mean
normalization_dict[f"{f}_std"] = std
raise ValueError(f"{normalization_type} not a valid normalization method. Must be on of [min-max, "
return normalization_type, normalization_dict
def compute_derivative(self, field):
dt = self.df['ts_sec'].diff() / 60
dv = self.df[field].diff().div(dt, axis=0, )
self.df['d_' + field] = dv
def compute_diff(self, field1, field2):
self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x[field1] - x[field2]) - 180),
def compute_all_derivatives(self):
fields = ['cog', 'sog', 'rot', 'heading']
for field in fields:
def compute_all_stds(self, radius):
fields = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude']
for field in fields:
dat = self.df[field].to_numpy()
stds = compute_std(dat, radius)
stds[-radius:] = np.nan
stds[:radius] = np.nan
self.df[f"{field}_std"] = stds
def compute_position_features(self, radius):
dat = np.stack([self.df.longitude.to_numpy(), self.df.latitude.to_numpy()], axis=1)
std = compute_position_angle_std(dat, radius)
std[-radius:] = np.nan
std[:radius] = np.nan
self.df[f"angle_std"] = std
mean = compute_position_angle_mean(dat, radius)
mean[-radius:] = np.nan
mean[:radius] = np.nan
self.df[f"angle_mean"] = mean
mean_dist = compute_position_dist_mean(dat, radius)
mean_dist[-radius:] = np.nan
mean_dist[:radius] = np.nan
self.df[f"dist_mean"] = mean_dist
std_dist = compute_position_dist_std(dat, radius)
std_dist[-radius:] = np.nan
std_dist[:radius] = np.nan
self.df[f"dist_std"] = std_dist
angles = compute_point_angles(dat)
angles[0] = np.nan
angles[-1] = np.nan
self.df[f"angles_diff"] = angles
def compute_sqrt_sog(self):
sog = self.df['sog'].to_numpy()
sog[sog < 0] = 0
self.df["sog_sqrt"] = np.sqrt(sog)
def to_numpy(self, fields=None):
if fields:
df = self.df[fields]
df = self.df
return np.squeeze(df.to_numpy())
def sliding_window(self, size=10, offset=1, fields=None):
result = []
if len(self.df.index) >= size:
arr = self.to_numpy(fields)
prev_index = 0
while prev_index + size < len(self.df.index) + 1:
result.append(arr[prev_index:prev_index + size])
prev_index += offset
return result
def to_geojson(self):
coordinates = []
for index, row in self.df.iterrows():
coordinates.append([row['longitude'], row['latitude']])
return {"type": "LineString", "coordinates": coordinates}
def get_stopped_snippets(self, column, exclude_label=0, time_gap=5, size_limit=10000):
df = self.df.drop(self.df[self.df[column] == exclude_label].index)
work_df = df.copy()
n_sample = len(df.index)
result = []
index = 0
while index < n_sample:
i = compute_trajectory(df['ts_sec'][index:].to_numpy(), time_gap, size_limit)
trajectory = AISTrajectory(work_df[:i])
work_df = work_df[i:]
index += i
return result
import unittest
import pandas as pd
import numpy as np
from skais.ais.ais_points import AISPoints, compute_trajectory, compute_trajectories
class TestAISPositions(unittest.TestCase):
def setUp(self) -> None:
self.ais_points = AISPoints(pd.DataFrame(
"sog": [2, 3, 7, 15, 14, 12, 18, 25, 21, 12, 11, 16, 19, 2, 5, 15, 12, 7, 8, 9, 1],
"diff": [35, 45, 59, 12, 1, 2, 54, 5, 47, 86, 119, 68, 75, 54, 55, 12, 32, 62, 159, 157, 132],
"label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1],
"mmsi": [0 for i in range(21)]
self.ais_trajectories = AISPoints(
'lat': [0, 0, 1, 1, 2, 2, 3, 3, 4, 4] * 2,
'long': [0, 0, 0, 0, 0, 1, 2, 3, 4, 5] * 2,
'sog': [0, 0, 10, 10, 10, 20, 30, 40, 20, 10] * 2,
'diff': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] * 2,
'label': [1, 1, 0, 0, 0, 0, 0, 0, 0, 0] * 2,
'ts': ['2020-03-10 22:10:00', '2020-03-10 22:14:00', '2020-03-10 22:18:00', '2020-03-10 22:22:00',
'2020-03-10 22:30:00', '2020-03-10 22:32:00', '2020-03-10 22:35:00', '2020-03-10 22:40:00',
'2020-03-10 22:45:00', '2020-03-10 22:50:00'] +
['2020-03-10 22:10:00', '2020-03-10 22:14:00', '2020-03-10 22:18:00', '2020-03-10 22:20:00',
'2020-03-10 23:30:00', '2020-03-10 23:32:00', '2020-03-10 23:35:00', '2020-03-10 23:40:00',
'2020-03-10 23:45:00', '2020-03-10 23:50:00'],
'mmsi': [100 for i in range(10)] + [101 for i in range(10)]
def test_histogram_no_label_simple(self):
result = np.histogramdd(self.ais_points.df[["sog", "diff"]].to_numpy(), 3, [[0, 30], [0, 180]])[0]
result = result / result.sum()
self.assertTrue(np.array_equal(self.ais_points.histogram(["sog", "diff"], 3, [[0, 30], [0, 180]]),
def test_histogram_no_label_no_data(self):
ais_points = AISPoints(
"sog": [],
"diff": [],
"label": []
self.assertTrue(np.array_equal(ais_points.histogram(["sog", "diff"], 3, [[0, 30], [0, 180]]),
np.full((3, 3), 1 / 9)))
def test_histogram_label(self):
self.assertTrue(np.array_equal(self.ais_points.histogram(["sog", "diff"], 3, [[0, 30], [0, 180]], label=0),
np.array([[3, 0, 0], [4, 4, 0], [2, 0, 0]]) / 13))
def test_histogram_joint_x_y(self):
ground_truth = np.array([[[3, 2], [0, 1], [0, 3]],
[[4, 2], [4, 0], [0, 0]],
[[2, 0], [0, 0], [0, 0]]]) / 21
np.testing.assert_array_equal(ground_truth, self.ais_points.histogram_joint_x_y(x_nb_bins=3))
def test_histogram_x_knowing_y(self):
ground_truth = np.array([[[3 / 13, 2 / 8], [0, 1 / 8], [0, 3 / 8]],
[[4 / 13, 2 / 8], [4 / 13, 0], [0, 0]],
[[2 / 13, 0], [0, 0], [0, 0]]])
np.testing.assert_array_equal(ground_truth, self.ais_points.histogram_x_knowing_y(x_nb_bins=3))
def test_histogram_y_knowing_x(self):
ground_truth = np.array([[[3 / 5, 2 / 5], [0, 1], [0, 1]],
[[4 / 6, 2 / 6], [1, 0], [13 / 21, 8 / 21]],
[[1, 0], [13 / 21, 8 / 21], [13 / 21, 8 / 21]]])
np.testing.assert_array_equal(ground_truth, self.ais_points.histogram_y_knowing_x(x_nb_bins=3))
# def test_load_from_csv(self):
# ais_points = AISPoints.load_from_csv("test_load_from_csv.csv")
# pd.testing.assert_frame_equal(ais_points.df, self.ais_points.df)
def test_compute_diff_heading_cog(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)]
diff = ais_points.df['diff'].to_numpy()
np.testing.assert_array_equal(diff, np.array([180, 170, 160, 150, 140, 130, 120, 110, 100, 90, 80, 70, 60, 50,
40, 30, 20, 10, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110,
120, 130, 140, 150, 160, 170]))
def test_histogram_x(self):
ground_truth = np.array([[5, 1, 3],
[6, 4, 0],
[2, 0, 0]]) / 21
self.ais_points.histogram(features=["sog", "diff"], bins=3,
ranges=[[0, 30], [0, 180]]))
def test_describe(self):
'nb vessels': 1,
'nb points': 21,
'labeled 0': 13,
'labeled 1': 8,
'average speed': 234 / 21,
'average diff': 1271 / 21
def test_fuse_single(self):
pd.testing.assert_frame_equal(AISPoints.fuse(self.ais_points).df, self.ais_points.df)
def test_fuse_simple_list(self):
pd.testing.assert_frame_equal(AISPoints.fuse([self.ais_points]).df, self.ais_points.df)
def test_fuse_multiple(self):
value = pd.DataFrame(
"sog": [2, 3, 7, 15, 14, 12, 18, 25, 21, 12, 11, 16, 19, 2, 5, 15, 12, 7, 8, 9, 1, 2, 3, 7, 15, 14, 12,
18, 25, 21, 12, 11, 16, 19, 2, 5, 15, 12, 7, 8, 9, 1],
"diff": [35, 45, 59, 12, 1, 2, 54, 5, 47, 86, 119, 68, 75, 54, 55, 12, 32, 62, 159, 157, 132, 35, 45,
59, 12, 1, 2, 54, 5, 47, 86, 119, 68, 75, 54, 55, 12, 32, 62, 159, 157, 132],
"label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1],
"mmsi": [0 for i in range(42)]
pd.testing.assert_frame_equal(AISPoints.fuse(self.ais_points, self.ais_points).df.reset_index(drop=True),
def test_compute_trajectory_simple(self):
times = np.arange(0, 100, 4) * 60
result = compute_trajectory.py_func(times, 5, 1000)
expected = 25
self.assertEqual(result, expected)
def test_compute_trajectory_cut(self):
times = np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])
result = compute_trajectory.py_func(times, 5, 1000)
expected = 25
self.assertEqual(result, expected)
def test_compute_trajectory_limit(self):
times = np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])
result = compute_trajectory.py_func(times, 5, 10)
expected = 10
self.assertEqual(result, expected)
def test_compute_trajectories_simple_split(self):
df = pd.DataFrame({'ts_sec': np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])})
result = compute_trajectories(df, 5, min_size=0)
expected = [
pd.DataFrame({'ts_sec': np.arange(0, 100, 4) * 60}),
pd.DataFrame({'ts_sec': np.arange(120, 200, 4) * 60})
self.assertEqual(len(expected), len(result))
for r, e in zip(result, expected):
pd.testing.assert_frame_equal(e.reset_index(drop=True), r.df.reset_index(drop=True))
def test_compute_trajectories_split_limit(self):
a = np.arange(0, 100, 4)
b = np.arange(120, 200, 4)
df = pd.DataFrame({'ts_sec': np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])})
result = compute_trajectories(df, 5, min_size=0, size_limit=10)
expected = [
pd.DataFrame({'ts_sec': a[:10] * 60}),
pd.DataFrame({'ts_sec': a[10:20] * 60}),
pd.DataFrame({'ts_sec': a[20:] * 60}),
pd.DataFrame({'ts_sec': b[:10] * 60}),
pd.DataFrame({'ts_sec': b[10:] * 60})
self.assertEqual(len(expected), len(result))
for r, e in zip(result, expected):
pd.testing.assert_frame_equal(e.reset_index(drop=True), r.df.reset_index(drop=True))
def test_compute_trajectories_split_min_size(self):
a = np.arange(0, 100, 4)
b = np.arange(120, 200, 4)
df = pd.DataFrame({'ts_sec': np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])})
result = compute_trajectories(df, 5, min_size=23)
expected = [
pd.DataFrame({'ts_sec': a * 60})
self.assertEqual(len(expected), len(result))
for r, e in zip(result, expected):
pd.testing.assert_frame_equal(e.reset_index(drop=True), r.df.reset_index(drop=True))
def test_normalize_min_max(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)]
ais_points.normalize(['cog', 'heading'])
result = ais_points.df
expected = pd.DataFrame(
"cog": [i / 350.0 for i in range(0, 359, 10)],
"heading": [0.0 for i in range(0, 359, 10)]
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_normalize_standardization(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)]
ais_points.normalize(['cog', 'heading'], normalization_type="standardization")
result = ais_points.df
expected = pd.DataFrame(
"cog": [-1.68458833, -1.58832614, -1.49206395, -1.39580176, -1.29953957,
-1.20327738, -1.10701519, -1.010753, -0.91449081, -0.81822862,
-0.72196643, -0.62570424, -0.52944205, -0.43317986, -0.33691767,
-0.24065548, -0.14439329, -0.0481311, 0.0481311, 0.14439329,
0.24065548, 0.33691767, 0.43317986, 0.52944205, 0.62570424,
0.72196643, 0.81822862, 0.91449081, 1.010753, 1.10701519,
1.20327738, 1.29953957, 1.39580176, 1.49206395, 1.58832614,
"heading": [0.0 for i in range(0, 359, 10)]
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True),
check_exact=False, rtol=0.05)
def test_disjointed_histogram_label_none(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)]
features = ["cog", "heading"]
bins = [10, 3]
ranges = [[0, 360], [0, 360]]
result = ais_points.disjointed_histogram(features, bins, ranges)
expected = [
np.array([4, 4, 3, 4, 3, 4, 4, 3, 4, 3]),
np.array([0, 36, 0])
self.assertEqual(len(result), len(expected))
for r, e in zip(result, expected):
np.testing.assert_array_equal(e, r[0])
def test_disjointed_histogram_label_0(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)],
"label": [0 for _ in range(10)] + [1 for _ in range(26)]
features = ["cog", "heading"]
bins = [10, 3]
ranges = [[0, 360], [0, 360]]
result = ais_points.disjointed_histogram(features, bins, ranges, label=0)
expected = [
np.array([4, 4, 2, 0, 0, 0, 0, 0, 0, 0]),
np.array([0, 10, 0])
self.assertEqual(len(result), len(expected))
for r, e in zip(result, expected):
np.testing.assert_array_equal(e, r[0])
def test_disjointed_histogram_bins_int(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)],
"label": [0 for _ in range(10)] + [1 for _ in range(26)]
features = ["cog", "heading"]
bins = 10
ranges = [[0, 360], [0, 360]]
result = ais_points.disjointed_histogram(features, bins, ranges)
expected = [
np.array([4, 4, 3, 4, 3, 4, 4, 3, 4, 3]),
np.array([0, 0, 0, 0, 0, 36, 0, 0, 0, 0])
self.assertEqual(len(result), len(expected))
for r, e in zip(result, expected):
np.testing.assert_array_equal(e, r[0])
def test_clean_angles(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)] + [489, 456, -12] + [180, 180, 180],
"heading": [180 for i in range(0, 359, 10)] + [489, 180, 180] + [999, 666, -333],
expected = pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [180 for i in range(0, 359, 10)]
result = ais_points.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_remove_outliers_simple(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)] + [1000] + [666],
"heading": [0.0 for i in range(0, 359, 10)] + [0] + [0]}
expected = pd.DataFrame(
"cog": [i for i in range(0, 359, 10)] + [666],
"heading": [0.0 for i in range(0, 359, 10)] + [0]
ais_points.remove_outliers(["cog", "heading"])
pd.testing.assert_frame_equal(expected.reset_index(drop=True), ais_points.df.reset_index(drop=True))
def test_remove_outliers_rank(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i for i in range(0, 359, 10)] + [1000] + [666],
"heading": [0.0 for i in range(0, 359, 10)] + [0] + [0]}
expected = pd.DataFrame(
"cog": [i for i in range(0, 359, 10)],
"heading": [0.0 for i in range(0, 359, 10)]
ais_points.remove_outliers(["cog", "heading"], rank=2)
pd.testing.assert_frame_equal(expected.reset_index(drop=True), ais_points.df.reset_index(drop=True))
def test_remove_outliers_not_all_features(self):
ais_points = AISPoints(pd.DataFrame(
"cog": [i / 350.0 for i in range(0, 359, 10)] + [500] + [0],
"heading": [0.0 for i in range(0, 359, 10)] + [0] + [10000]}
expected = pd.DataFrame(
"cog": [i / 350.0 for i in range(0, 359, 10)] + [0],
"heading": [0.0 for i in range(0, 359, 10)] + [10000]
pd.testing.assert_frame_equal(expected.reset_index(drop=True), ais_points.df.reset_index(drop=True))
def test_remove_outliers_exception(self):
ais_points = AISPoints(
"cog": [i / 350.0 for i in range(0, 359, 10)] + [500] + [0],
"heading": [0.0 for i in range(0, 359, 10)] + [0] + [10000]
with self.assertRaises(ValueError):
ais_points.remove_outliers(["cog"], rank=0)
import math
import unittest
import pandas as pd
import numpy as np
from skais.ais.ais_trajectory import AISTrajectory, compute_std, to_rad, bearing, to_deg, compute_point_angles
class TestAISTrajectory(unittest.TestCase):
def test_get_stopped_snippets_simple(self):
ais_trajectory = AISTrajectory(pd.DataFrame(
"label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1],
"ts_sec": [i for i in range(21)]
expected = pd.DataFrame(
"label": [1, 1, 1, 1, 1, 1, 1, 1],
"ts_sec": [i for i in range(13, 21)]
snippets = ais_trajectory.get_stopped_snippets('label')
self.assertEqual(len(snippets), 1)
pd.testing.assert_frame_equal(expected, snippets[0].df.reset_index(drop=True))
def test_get_stopped_snippets_multi_snippets(self):
ais_trajectory = AISTrajectory(pd.DataFrame(
"label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0],
"ts_sec": [i * 60 for i in range(31)]
expected = [
"label": [1, 1, 1, 1, 1, 1, 1, 1],
"ts_sec": [i * 60 for i in range(13, 21)]
"label": [1, 1, 1, ],
"ts_sec": [i * 60 for i in range(25, 28)]
snippets = ais_trajectory.get_stopped_snippets('label', time_gap=1)
self.assertEqual(len(expected), len(snippets))
for e, s in zip(expected, snippets):
pd.testing.assert_frame_equal(e, s.df.reset_index(drop=True))
def test_to_geojson(self):
trajectory = AISTrajectory(
"latitude": [0, 90, 0, -90],
"longitude": [0, 90, 180, -90],
"ts_sec": [i for i in range(4)]
expected = {'type': 'LineString', 'coordinates': [[0, 0], [90, 90], [180, 0], [-90, -90]]}
self.assertDictEqual(trajectory.to_geojson(), expected)
def test_sliding_window(self):
trajectory = AISTrajectory(
"latitude": [0, 90, 0, -90],
"longitude": [0, 90, 180, -90],
"sog": [i for i in range(4)],
"ts_sec": [i for i in range(4)]
result = trajectory.sliding_window(size=3, offset=1, fields="sog")
expected = [
np.array([0, 1, 2]),
np.array([1, 2, 3])
self.assertEqual(len(result), len(expected))
for r, e in zip(result, expected):
np.testing.assert_array_equal(r, e)
def test_to_numpy_no_field(self):
trajectory = AISTrajectory(
"sog": [i for i in range(4)],
"ts_sec": [i for i in range(4)]
result = trajectory.to_numpy()
expected = np.stack([np.arange(4), np.arange(4)]).T
np.testing.assert_array_equal(result, expected)
def test_to_numpy_field(self):
trajectory = AISTrajectory(
"sog": [i for i in range(4)],
"ts_sec": [i for i in range(4)]
result = trajectory.to_numpy("sog")
expected = np.arange(4)
np.testing.assert_array_equal(result, expected)
def test_compute_sqrt_sog(self):
trajectory = AISTrajectory(
"sog": [i for i in range(4)],
"ts_sec": [i for i in range(4)]
result = trajectory.df["sog_sqrt"].to_numpy()
expected = np.array([math.sqrt(i) for i in range(4)])
np.testing.assert_array_equal(result, expected)
def test_interpolation(self):
trajectory = AISTrajectory(
"latitude": [0 for _ in range(0, 101, 10)],
"longitude": [i - 2 for i in range(0, 101, 10)],
"sog": [i * 2 for i in range(11)],
"ts_sec": [i for i in range(0, 6001, 600)]
expected = pd.DataFrame(
"ts_sec": [i for i in range(0, 6001, 300)],
"longitude": [float(i - 2) for i in range(0, 101, 5)],
"latitude": [0.0 for _ in range(0, 101, 5)],
"sog": [float(i) for i in range(21)]
pd.testing.assert_frame_equal(trajectory.df, expected)
def test_interpolation_discrete(self):
trajectory = AISTrajectory(
"ts_sec": [i for i in range(0, 6001, 600)],
"label": [0 for i in range(0, 3001, 600)] + [1 for i in range(3001, 6001, 600)]
expected = pd.DataFrame(
"ts_sec": [i for i in range(0, 6001, 300)],
"label": [0 for i in range(0, 3301, 300)] + [1 for i in range(3301, 6001, 300)]
pd.testing.assert_frame_equal(trajectory.df, expected)
def test_compute_angle_l1(self):
trajectory = AISTrajectory(
"ts_sec": [i for i in range(15)],
"angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]
result = trajectory.df["angle_l1"].to_numpy()
expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5])
np.testing.assert_array_equal(result, expected)
def test_compute_angle_l2(self):
trajectory = AISTrajectory(
"ts_sec": [i for i in range(15)],
"angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]
result = trajectory.df["angle_l2"].to_numpy()
expected = np.array(np.sqrt([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5]))
np.testing.assert_array_equal(result, expected)
def test_compute_std(self):
dat = np.array([0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)])
radius = 2
result = compute_std.py_func(dat, radius)
expected = np.array(
[0, 0, 0, 0.4, np.sqrt(30 / 125), np.sqrt(30 / 125), 0.4, 0, 0.8, np.sqrt(120 / 125), np.sqrt(120 / 125),
0.8, 0, 0, 0])
np.testing.assert_almost_equal(result, expected)
def test_to_rad_0(self):
result = to_rad.py_func(0)
expected = 0
self.assertAlmostEqual(result, expected)
def test_to_rad_90(self):
result = to_rad.py_func(90)
expected = np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_rad_180(self):
result = to_rad.py_func(180)
expected = np.pi
self.assertAlmostEqual(result, expected)
def test_to_rad_360(self):
result = to_rad.py_func(360)
expected = 2 * np.pi
self.assertAlmostEqual(result, expected)
def test_to_rad_m180(self):
result = to_rad.py_func(-180)
expected = -np.pi
self.assertAlmostEqual(result, expected)
def test_to_rad_m90(self):
result = to_rad.py_func(-90)
expected = -np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_rad_270(self):
result = to_rad.py_func(270)
expected = 3 * np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_rad_810(self):
result = to_rad.py_func(810)
expected = 9 * np.pi / 2
self.assertAlmostEqual(result, expected)
def test_to_deg_0(self):
result = to_deg.py_func(0)
expected = 0
self.assertAlmostEqual(result, expected)
def test_to_deg_90(self):
result = to_deg.py_func(np.pi / 2)
expected = 90
self.assertAlmostEqual(result, expected)
def test_to_deg_180(self):
result = to_deg.py_func(np.pi)
expected = 180
self.assertAlmostEqual(result, expected)
def test_to_deg_360(self):
result = to_deg.py_func(2 * np.pi)
expected = 360
self.assertAlmostEqual(result, expected)
def test_to_deg_m180(self):
result = to_deg.py_func(-np.pi)
expected = -180
self.assertAlmostEqual(result, expected)
def test_to_deg_m90(self):
result = to_deg.py_func(-np.pi / 2)
expected = -90
self.assertAlmostEqual(result, expected)
def test_to_deg_270(self):
result = to_deg.py_func(3 * np.pi / 2)
expected = 270
self.assertAlmostEqual(result, expected)
def test_to_deg_810(self):
result = to_deg.py_func(9 * np.pi / 2)
expected = 810
self.assertAlmostEqual(result, expected)
def test_bearing_rand(self):
paris = (2.3522, 48.8566)
marseille = (5.3698, 43.2965)
result = bearing.py_func(paris, marseille)
expected = 158.2694
self.assertAlmostEqual(result, expected, places=4)
def test_bearing_along_equator(self):
p1 = (0, 0)
p2 = (90, 0)
result = bearing.py_func(p1, p2)
expected = 90
self.assertAlmostEqual(result, expected)
def test_bearing_equator_to_north_pole(self):
p1 = (0, 0)
p2 = (0, 90)
result = bearing.py_func(p1, p2)
expected = 0
self.assertAlmostEqual(result, expected)
# def test_compute_position_angle_std(self):
# dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)]
# result = compute_position_angle_std(dat, 2)
# # hard to test
# def test_compute_position_angle_mean(self):
# dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)]
# result = compute_position_angle_mean(dat, 2)
# # hard to test
# def test_compute_position_dist_mean(self):
# dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)]
# result = compute_position_dist_mean(dat, 2)
# # hard to test
# def test_compute_position_dist_std(self):
# dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)]
# result = compute_position_dist_std(dat, 2)
# # hard to test
def test_compute_point_angles(self):
dat = np.array([(0, 0), (0, 10), (10, 10), (0, 10)])
result = compute_point_angles(dat)
expected = np.array([0, 90, 0, 0])
np.testing.assert_almost_equal(expected, result, decimal=0)
\ No newline at end of file
# -*- coding: utf-8 -*-
.. moduleauthor:: Valentin Emiya
from configparser import ConfigParser
import os
from pathlib import Path
import tempfile
import unittest
from unittest.mock import patch
from skais.utils.config import generate_config, get_data_path, get_config_file
class TestGetConfigFile(unittest.TestCase):
def test_get_config_file(self):
config_file = get_config_file()
self.assertEqual(, 'skais.conf')
self.assertEqual(, '.config')
class TestGenerateConfig(unittest.TestCase):
def test_generate_config(self):
with patch('skais.utils.config.get_config_file') as mock:
mock.return_value = Path(tempfile.mkdtemp()) / 'skais.conf'
config_file = mock.return_value
class TestGetDataPath(unittest.TestCase):
def test_get_data_path(self):
with patch('skais.utils.config.get_config_file') as mock:
mock.return_value = Path(tempfile.mkdtemp()) / 'skais.conf'
config_file = mock.return_value
with self.assertRaises(Exception):
with self.assertRaises(Exception):
config = ConfigParser()
true_data_path = Path(tempfile.mkdtemp()) / 'data'
print('Data path:', str(true_data_path))
config.set('DATA', 'data_path', str(true_data_path))
config.write(open(config_file, 'w'))
tested_data_path = get_data_path()
self.assertEqual(tested_data_path, true_data_path)
File added
File added
File added
# -*- coding: utf-8 -*-
.. moduleauthor:: Valentin Emiya
# TODO tests
import os
from pathlib import Path
from configparser import ConfigParser
def get_config_file():
User configuration file
return Path(os.path.expanduser('~')) / '.config' / 'skais.conf'
def generate_config():
Generate an empty configuration file.
config = ConfigParser(allow_no_value=True)
config.set('DATA', '# path to data')
config.set('DATA', 'data_path', '/to/be/completed/skais/data')
config_file = get_config_file()
config_file.parent.mkdir(exist_ok=True, parents=True)
with open(config_file, 'w') as file:
print('Configuration file created: {}. Please update it with your data '
def get_data_path():
Read data folder from user configuration file.
config_file = get_config_file()
if not config_file.exists():
raise Exception('Configuration file does not exists. To create it, '
'execute tffpy.utils.generate_config()')
config = ConfigParser()
data_path = Path(config['DATA']['data_path'])
if not data_path.exists():
raise Exception('Invalid data path: {}. Update configuration file {}'
.format(data_path, config_file))
return data_path
def get_db_config():
Read db config from user configuration file.
config_file = get_config_file()
if not config_file.exists():
raise Exception('Configuration file does not exists. To create it, '
'execute tffpy.utils.generate_config()')
config = ConfigParser()
dict = {}
dict['user'] = config['DB']['user']
dict['password'] = config['DB']['password']
dict['host'] = config['DB']['host']
dict['port'] = config['DB']['port']
dict['database'] = config['DB']['database']
if not dict:
raise Exception('Invalid data path: {}. Update configuration file {}'
.format(dict, config_file))
return dict
\ No newline at end of file
def make_feature_vectors(trajectories, features=None,
label_field='label', nb_classes=2, sliding_window_gap=10, length_list=15):
if features is None:
features = ['rot', 'sog', 'd_sog', 'd_cog', 'd_heading', 'd_rot']
x = []
y = []
features_trajectories = []
label_trajectories = []
zero = [0 for _ in range(nb_classes)]
for trajectory in trajectories:
if len(trajectory.df.index) > length_list:
trajectory.df['ts'] = trajectory.df.index
trajectory.compute_diff('heading', 'cog')
windows = trajectory.sliding_window(length_list, offset=sliding_window_gap,
fields=features + [label_field])
if length_list > 0:
for window in windows:
label = zero.copy()
label[int(window[length_list // 2, -1])] = 1
x.append(window[:, :-1])
return features_trajectories, label_trajectories, x, y
from sklearn.metrics.pairwise import haversine_distances
import numpy as np
from numba import jit
def great_circle(lat1, lat2, long1, long2):
x1 = np.radians(lat1)
y1 = np.radians(long1)
x2 = np.radians(lat2)
y2 = np.radians(long2)
R = 6371000
delta_x = x2 - x1
delta_y= y2 -y1
a = np.sin(delta_x / 2) * np.sin(delta_x / 2) + np.cos(x1) * np.cos(x2) * \
np.sin(delta_y / 2) * np.sin(delta_y / 2)
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
d = R * c
return d
def parseToken(current_token, head, isTokenNumber):
if isTokenNumber:
return ''
return current_token
def parseListOfList(string):
openBrackets = 0
stack = []
head = []
current_token = ''
isTokenNumber = True
for s in string:
if s == '[':
openBrackets +=1
head = []
elif s == ']':
current_token = parseToken(current_token, head, isTokenNumber)
openBrackets -=1
tmp = stack.pop()
head = tmp
isTokenNumber = False
elif s == ",":
current_token = parseToken(current_token, head, isTokenNumber)
isTokenNumber = True
current_token += s
return head[0]
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment