diff --git a/skais/ais/ais_trajectory.py b/skais/ais/ais_trajectory.py index aa105a756e7e8c5660c02ec95a2c4cc51ee0f715..81a26b73d98b2d38a0f99468df7fe3bc97a2f831 100644 --- a/skais/ais/ais_trajectory.py +++ b/skais/ais/ais_trajectory.py @@ -1,68 +1,28 @@ -import math - import pandas as pd import numpy as np from numba import jit from scipy.interpolate import interp1d from skais.ais.ais_points import AISPoints -from skais.process.basic_features_operations import angular_dispersion +from skais.process.geography import bearing from skais.utils.geography import great_circle from skais.utils.stats import calc_std_dev -PI = math.pi - - -@jit(nopython=True) -def compute_trajectory(times, time_gap, size_limit): - n_samples = len(times) - - previous_date = times[0] - - i = 0 - for i in range(size_limit): - if i >= n_samples: - break - if (times[i] - previous_date) / 60 > time_gap: - break - previous_date = times[i] - - return i - - -@jit(nopython=True) -def compute_std(dat, radius): - stds = np.empty(dat.shape[0]) - - dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) - for i in range(radius, dat.shape[0] - radius): - stds[i - radius] = np.std(dat[i - radius:i + radius + 1]) - - return stds - - -@jit(nopython=True) -def to_rad(degree): - return (degree / 360.0) * (2 * PI) - - -@jit(nopython=True) -def to_deg(rad): - return (rad * 360.0) / (2 * PI) - - -@jit(nopython=True) -def bearing(p1, p2): - long1 = to_rad(p1[0]) - long2 = to_rad(p2[0]) - - lat1 = to_rad(p1[1]) - lat2 = to_rad(p2[1]) - - y = np.sin(long2 - long1) * np.cos(lat2) - x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(long2 - long1) - return to_deg(np.arctan2(y, x)) - +# @jit(nopython=True) +# def compute_trajectory(times, time_gap, size_limit): +# n_samples = len(times) +# +# previous_date = times[0] +# +# i = 0 +# for i in range(size_limit): +# if i >= n_samples: +# break +# if (times[i] - previous_date) / 60 > time_gap: +# break +# previous_date = times[i] +# +# return i # @jit(nopython=True) def compute_position_angle_std(dat, radius): @@ -138,62 +98,38 @@ def compute_position_dist_std(dat, radius): return dist_means -@jit(nopython=True) -def angle_between_three_points(p1, p2, p3): - alpha = bearing(p2, p1) - beta = bearing(p2, p3) - result = alpha - beta - if result > 180: - return 180 - result - else: - return result - - -def compute_point_angles(dat): - angles = np.zeros(dat.shape[0]) - - p1 = (dat[0][0], dat[0][1]) - p2 = (dat[1][0], dat[1][1]) - angles[0] = bearing(p1, p2) - for i in range(1, dat.shape[0] - 1): - p1 = (dat[i - 1][0], dat[i - 1][1]) - p2 = (dat[i][0], dat[i][1]) - p3 = (dat[i + 1][0], dat[i + 1][1]) - - angles[i] = angle_between_three_points(p1, p2, p3) - return angles - - -def l1_angle(dat, radius): - l1 = np.zeros(dat.shape) - - dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) - for i in range(radius, dat.shape[0] - radius): - data = dat[i - radius:i + radius + 1] - l1[i - radius] = np.linalg.norm(data, ord=1) - return l1 - -def l2_angle(dat, radius): - l2 = np.zeros(dat.shape) - - dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) - for i in range(radius, dat.shape[0] - radius): - data = dat[i - radius:i + radius + 1] - l2[i - radius] = np.linalg.norm(data, ord=2) - return l2 - - -def angle_dispersion(dat, radius): - disp = np.zeros(dat.shape) - - dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) - for i in range(radius, dat.shape[0] - radius): - data = dat[i - radius:i + radius + 1] - disp[i - radius] = angular_dispersion(np.radians(data)) - - return disp +# def l1_angle(dat, radius): +# l1 = np.zeros(dat.shape) +# +# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) +# for i in range(radius, dat.shape[0] - radius): +# data = dat[i - radius:i + radius + 1] +# l1[i - radius] = np.linalg.norm(data, ord=1) +# +# return l1 +# +# +# def l2_angle(dat, radius): +# l2 = np.zeros(dat.shape) +# +# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) +# for i in range(radius, dat.shape[0] - radius): +# data = dat[i - radius:i + radius + 1] +# l2[i - radius] = np.linalg.norm(data, ord=2) +# return l2 +# +# +# def angle_dispersion(dat, radius): +# disp = np.zeros(dat.shape) +# +# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) +# for i in range(radius, dat.shape[0] - radius): +# data = dat[i - radius:i + radius + 1] +# disp[i - radius] = angular_dispersion(np.radians(data)) +# +# return disp def apply_func_on_window(dat, func, radius, on_edge='copy'): @@ -210,7 +146,7 @@ def apply_time_sequence(dat, time, func): result = np.empty(dat.shape[0]) result[0] = func(dat[0], dat[1], time[0], time[1]) for i in range(1, dat.shape[0]): - result[i] = func(dat[i-1], dat[i], time[i-1], time[i]) + result[i] = func(dat[i - 1], dat[i], time[i - 1], time[i]) return result @@ -244,138 +180,11 @@ class AISTrajectory(AISPoints): df = new_df # self.df = df.dropna() - AISPoints.__init__(df) + AISPoints.__init__(self, df) def __eq__(self, other): return self.df.equals(other.df) - def compute_angle_l1(self, radius): - dat = self.df['angles_diff'].to_numpy() - l1 = l1_angle(dat, radius) - self.df[f"angle_l1"] = l1 - - def compute_angle_l2(self, radius): - dat = self.df['angles_diff'].to_numpy() - l2 = l2_angle(dat, radius) - self.df[f"angle_l2"] = l2 - - def normalize(self, features, normalization_type="min-max", dictionary=None): - normalization_dict = None - if dictionary is not None: - if dictionary["type"] == "min-max": - for f in features: - minimum = dictionary[f"{f}_minimum"] - maximum = dictionary[f"{f}_maximum"] - self.df[f] = (self.df[f] - minimum) / (maximum - minimum) - - elif dictionary["type"] == "standardization": - for f in features: - mean = dictionary[f"{f}_mean"] - std = dictionary[f"{f}_std"] - self.df[f] = (self.df[f] - mean) / std - else: - normalization_dict = {"type": normalization_type} - if normalization_type == "min-max": - for f in features: - minimum = self.df[f].min() - maximum = self.df[f].max() - self.df[f] = (self.df[f] - minimum) / (maximum - minimum) - normalization_dict[f"{f}_minimum"] = minimum - normalization_dict[f"{f}_maximum"] = maximum - - elif normalization_type == "standardization": - for f in features: - mean = self.df[f].mean() - std = self.df[f].std() - if std == 0: - print("Warning: std = %d", std) - std = 1 - self.df[f] = (self.df[f] - mean) / std - normalization_dict[f"{f}_mean"] = mean - normalization_dict[f"{f}_std"] = std - - else: - raise ValueError(f"{normalization_type} not a valid normalization method. Must be on of [min-max, " - f"standardization]") - - return normalization_dict - - def compute_derivative(self, field): - dt = self.df['ts_sec'].diff() / 60 - - dv = self.df[field].diff().div(dt, axis=0, ) - - self.df['d_' + field] = dv - - def compute_diff(self, field1, field2): - self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x[field1] - x[field2]) - 180), - axis=1) - - def compute_all_derivatives(self): - fields = ['cog', 'sog', 'rot', 'heading'] - - for field in fields: - self.compute_derivative(field) - - def compute_all_stds(self, radius): - fields = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude'] - - for field in fields: - dat = self.df[field].to_numpy() - stds = compute_std(dat, radius) - stds[-radius:] = np.nan - stds[:radius] = np.nan - self.df[f"{field}_std"] = stds - - def compute_all_dispersions(self, radius): - fields = ['cog', 'heading', 'angles_diff'] - for field in fields: - if field in self.df.columns: - dat = self.df[field].to_numpy() - disp = angle_dispersion(dat, radius) - self.df[f"{field}_disp"] = disp - - def compute_position_features(self, radius): - dat = np.stack([self.df.longitude.to_numpy(), self.df.latitude.to_numpy()], axis=1) - std = compute_position_angle_std(dat, radius) - std[-radius:] = np.nan - std[:radius] = np.nan - self.df[f"angle_std"] = std - - mean = compute_position_angle_mean(dat, radius) - mean[-radius:] = np.nan - mean[:radius] = np.nan - self.df[f"angle_mean"] = mean - - mean_dist = compute_position_dist_mean(dat, radius) - mean_dist[-radius:] = np.nan - mean_dist[:radius] = np.nan - self.df[f"dist_mean"] = mean_dist - - std_dist = compute_position_dist_std(dat, radius) - std_dist[-radius:] = np.nan - std_dist[:radius] = np.nan - self.df[f"dist_std"] = std_dist - - angles = compute_point_angles(dat) - angles[0] = np.nan - angles[-1] = np.nan - self.df[f"angles_diff"] = angles - - def compute_sqrt_sog(self): - sog = self.df['sog'].to_numpy() - sog[sog < 0] = 0 - self.df["sog_sqrt"] = np.sqrt(sog) - - def to_numpy(self, fields=None): - - if fields: - df = self.df[fields] - else: - df = self.df - - return np.squeeze(df.to_numpy()) - def sliding_window(self, size=10, offset=1, fields=None): result = [] @@ -388,30 +197,6 @@ class AISTrajectory(AISPoints): return result - def to_geojson(self): - coordinates = [] - for index, row in self.df.iterrows(): - coordinates.append([row['longitude'], row['latitude']]) - - return {"type": "LineString", "coordinates": coordinates} - - def get_stopped_snippets(self, column, exclude_label=0, time_gap=5, size_limit=10000): - df = self.df.drop(self.df[self.df[column] == exclude_label].index) - - work_df = df.copy() - n_sample = len(df.index) - result = [] - - index = 0 - while index < n_sample: - i = compute_trajectory(df['ts_sec'][index:].to_numpy(), time_gap, size_limit) - trajectory = AISTrajectory(work_df[:i]) - result.append(trajectory) - work_df = work_df[i:] - index += i - - return result - def apply_func_on_time_window(self, func, radius, column, new_column=None): dat = self.df[column].to_numpy() result = apply_func_on_window(dat, func, radius, on_edge='copy') @@ -431,3 +216,107 @@ class AISTrajectory(AISPoints): self.df[column] = result else: self.df[new_column] = result + + def apply_func_on_points(self, func, column, new_column=None): + dat = self.df[column].to_numpy() + + result = np.array(list(map(func, dat))) + + if new_column is None: + self.df[column] = result + else: + self.df[new_column] = result + + def to_numpy(self, fields=None): + + if fields: + df = self.df[fields] + else: + df = self.df + + return np.squeeze(df.to_numpy()) + + def to_geojson(self): + coordinates = [] + for index, row in self.df.iterrows(): + coordinates.append([row['longitude'], row['latitude']]) + + return {"type": "LineString", "coordinates": coordinates} + + # + # def compute_angle_l1(self, radius): + # dat = self.df['angles_diff'].to_numpy() + # l1 = l1_angle(dat, radius) + # self.df[f"angle_l1"] = l1 + # + # def compute_angle_l2(self, radius): + # dat = self.df['angles_diff'].to_numpy() + # l2 = l2_angle(dat, radius) + # self.df[f"angle_l2"] = l2 + + # def compute_derivative(self, field): + # dt = self.df['ts_sec'].diff() / 60 + # + # dv = self.df[field].diff().div(dt, axis=0, ) + # + # self.df['d_' + field] = dv + # + # def compute_diff(self, field1, field2): + # self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x[field1] - x[field2]) - 180), + # axis=1) + # + # def compute_all_derivatives(self): + # fields = ['cog', 'sog', 'rot', 'heading'] + # + # for field in fields: + # self.compute_derivative(field) + # + # def compute_all_stds(self, radius): + # fields = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude'] + # + # for field in fields: + # dat = self.df[field].to_numpy() + # stds = compute_std(dat, radius) + # stds[-radius:] = np.nan + # stds[:radius] = np.nan + # self.df[f"{field}_std"] = stds + # + # def compute_all_dispersions(self, radius): + # fields = ['cog', 'heading', 'angles_diff'] + # for field in fields: + # if field in self.df.columns: + # dat = self.df[field].to_numpy() + # disp = angle_dispersion(dat, radius) + # self.df[f"{field}_disp"] = disp + # + # def compute_position_features(self, radius): + # dat = np.stack([self.df.longitude.to_numpy(), self.df.latitude.to_numpy()], axis=1) + # std = compute_position_angle_std(dat, radius) + # std[-radius:] = np.nan + # std[:radius] = np.nan + # self.df[f"angle_std"] = std + # + # mean = compute_position_angle_mean(dat, radius) + # mean[-radius:] = np.nan + # mean[:radius] = np.nan + # self.df[f"angle_mean"] = mean + # + # mean_dist = compute_position_dist_mean(dat, radius) + # mean_dist[-radius:] = np.nan + # mean_dist[:radius] = np.nan + # self.df[f"dist_mean"] = mean_dist + # + # std_dist = compute_position_dist_std(dat, radius) + # std_dist[-radius:] = np.nan + # std_dist[:radius] = np.nan + # self.df[f"dist_std"] = std_dist + # + # angles = compute_point_angles(dat) + # angles[0] = np.nan + # angles[-1] = np.nan + # self.df[f"angles_diff"] = angles + # + # def compute_sqrt_sog(self): + # sog = self.df['sog'].to_numpy() + # sog[sog < 0] = 0 + # self.df["sog_sqrt"] = np.sqrt(sog) diff --git a/skais/process/geography.py b/skais/process/geography.py new file mode 100644 index 0000000000000000000000000000000000000000..3f19bb3a9eb4cc20937ba6b9e8f9e34e60ee96b7 --- /dev/null +++ b/skais/process/geography.py @@ -0,0 +1,55 @@ +import math + +import numpy as np +from numba import jit + +PI = math.pi + + +@jit(nopython=True) +def to_rad(degree): + return (degree / 360.0) * (2 * PI) + + +@jit(nopython=True) +def to_deg(rad): + return (rad * 360.0) / (2 * PI) + + +@jit(nopython=True) +def bearing(p1, p2): + long1 = to_rad(p1[0]) + long2 = to_rad(p2[0]) + + lat1 = to_rad(p1[1]) + lat2 = to_rad(p2[1]) + + y = np.sin(long2 - long1) * np.cos(lat2) + x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(long2 - long1) + return to_deg(np.arctan2(y, x)) + + +@jit(nopython=True) +def angle_between_three_points(p1, p2, p3): + alpha = bearing(p2, p1) + beta = bearing(p2, p3) + result = alpha - beta + if result > 180: + return 180 - result + else: + return result + + +def compute_point_angles(dat): + angles = np.zeros(dat.shape[0]) + + p1 = (dat[0][0], dat[0][1]) + p2 = (dat[1][0], dat[1][1]) + angles[0] = bearing(p1, p2) + for i in range(1, dat.shape[0] - 1): + p1 = (dat[i - 1][0], dat[i - 1][1]) + p2 = (dat[i][0], dat[i][1]) + p3 = (dat[i + 1][0], dat[i + 1][1]) + + angles[i] = angle_between_three_points(p1, p2, p3) + return angles diff --git a/skais/tests/ais/__init__.py b/skais/tests/ais/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/skais/tests/ais/test_ais_trajectory.py b/skais/tests/ais/test_ais_trajectory.py index e8100e31a9b1c6051d15c7e5260eb8bfdfbd6ca3..404244a96b01418cc37486b69eb89e51a09ff02d 100644 --- a/skais/tests/ais/test_ais_trajectory.py +++ b/skais/tests/ais/test_ais_trajectory.py @@ -1,63 +1,58 @@ -import math import unittest -import pandas as pd -import numpy as np - -from skais.ais.ais_trajectory import AISTrajectory, compute_std, to_rad, bearing, to_deg, compute_point_angles, \ - angle_between_three_points +from skais.ais.ais_trajectory import * class TestAISTrajectory(unittest.TestCase): - def test_get_stopped_snippets_simple(self): - ais_trajectory = AISTrajectory(pd.DataFrame( - { - "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1], - "ts_sec": [i for i in range(21)] - } - )) - - expected = pd.DataFrame( - { - "label": [1, 1, 1, 1, 1, 1, 1, 1], - "ts_sec": [i for i in range(13, 21)] - } - ) - - snippets = ais_trajectory.get_stopped_snippets('label') - - self.assertEqual(len(snippets), 1) - pd.testing.assert_frame_equal(expected, snippets[0].df.reset_index(drop=True)) - - def test_get_stopped_snippets_multi_snippets(self): - ais_trajectory = AISTrajectory(pd.DataFrame( - { - "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0], - "ts_sec": [i * 60 for i in range(31)] - } - )) - - expected = [ - pd.DataFrame( - { - "label": [1, 1, 1, 1, 1, 1, 1, 1], - "ts_sec": [i * 60 for i in range(13, 21)] - } - ), - pd.DataFrame( - { - "label": [1, 1, 1, ], - "ts_sec": [i * 60 for i in range(25, 28)] - } - ), - ] - - snippets = ais_trajectory.get_stopped_snippets('label', time_gap=1) - - self.assertEqual(len(expected), len(snippets)) - for e, s in zip(expected, snippets): - pd.testing.assert_frame_equal(e, s.df.reset_index(drop=True)) + # def test_get_stopped_snippets_simple(self): + # ais_trajectory = AISTrajectory(pd.DataFrame( + # { + # "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1], + # "ts_sec": [i for i in range(21)] + # } + # )) + # + # expected = pd.DataFrame( + # { + # "label": [1, 1, 1, 1, 1, 1, 1, 1], + # "ts_sec": [i for i in range(13, 21)] + # } + # ) + # + # snippets = ais_trajectory.get_stopped_snippets('label') + # + # self.assertEqual(len(snippets), 1) + # pd.testing.assert_frame_equal(expected, snippets[0].df.reset_index(drop=True)) + + # def test_get_stopped_snippets_multi_snippets(self): + # ais_trajectory = AISTrajectory(pd.DataFrame( + # { + # "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0], + # "ts_sec": [i * 60 for i in range(31)] + # } + # )) + # + # expected = [ + # pd.DataFrame( + # { + # "label": [1, 1, 1, 1, 1, 1, 1, 1], + # "ts_sec": [i * 60 for i in range(13, 21)] + # } + # ), + # pd.DataFrame( + # { + # "label": [1, 1, 1, ], + # "ts_sec": [i * 60 for i in range(25, 28)] + # } + # ), + # ] + # + # snippets = ais_trajectory.get_stopped_snippets('label', time_gap=1) + # + # self.assertEqual(len(expected), len(snippets)) + # for e, s in zip(expected, snippets): + # pd.testing.assert_frame_equal(e, s.df.reset_index(drop=True)) def test_to_geojson(self): trajectory = AISTrajectory( @@ -126,22 +121,22 @@ class TestAISTrajectory(unittest.TestCase): np.testing.assert_array_equal(result, expected) - def test_compute_sqrt_sog(self): - trajectory = AISTrajectory( - pd.DataFrame( - { - "sog": [i for i in range(4)], - "ts_sec": [i for i in range(4)] - } - ) - ) - - trajectory.compute_sqrt_sog() - - result = trajectory.df["sog_sqrt"].to_numpy() - expected = np.array([math.sqrt(i) for i in range(4)]) - - np.testing.assert_array_equal(result, expected) + # def test_compute_sqrt_sog(self): + # trajectory = AISTrajectory( + # pd.DataFrame( + # { + # "sog": [i for i in range(4)], + # "ts_sec": [i for i in range(4)] + # } + # ) + # ) + # + # trajectory.compute_sqrt_sog() + # + # result = trajectory.df["sog_sqrt"].to_numpy() + # expected = np.array([math.sqrt(i) for i in range(4)]) + # + # np.testing.assert_array_equal(result, expected) def test_interpolation(self): trajectory = AISTrajectory( @@ -187,202 +182,50 @@ class TestAISTrajectory(unittest.TestCase): pd.testing.assert_frame_equal(trajectory.df, expected) - def test_compute_angle_l1(self): - trajectory = AISTrajectory( - pd.DataFrame( - { - "ts_sec": [i for i in range(15)], - "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)] - } - ) - ) - - trajectory.compute_angle_l1(2) - - result = trajectory.df["angle_l1"].to_numpy() - expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5]) - - np.testing.assert_array_equal(result, expected) - - def test_compute_angle_l2(self): - trajectory = AISTrajectory( - pd.DataFrame( - { - "ts_sec": [i for i in range(15)], - "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)] - } - ) - ) - - trajectory.compute_angle_l2(2) - - result = trajectory.df["angle_l2"].to_numpy() - expected = np.array(np.sqrt([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5])) - - np.testing.assert_array_equal(result, expected) - - def test_compute_std(self): - dat = np.array([0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]) - radius = 2 - - result = compute_std.py_func(dat, radius) - expected = np.array( - [0, 0, 0, 0.4, np.sqrt(30 / 125), np.sqrt(30 / 125), 0.4, 0, 0.8, np.sqrt(120 / 125), np.sqrt(120 / 125), - 0.8, 0, 0, 0]) - - np.testing.assert_almost_equal(result, expected) - - def test_to_rad_0(self): - result = to_rad.py_func(0) - expected = 0 - - self.assertAlmostEqual(result, expected) - - def test_to_rad_90(self): - result = to_rad.py_func(90) - expected = np.pi / 2 - - self.assertAlmostEqual(result, expected) - - def test_to_rad_180(self): - result = to_rad.py_func(180) - expected = np.pi - - self.assertAlmostEqual(result, expected) - - def test_to_rad_360(self): - result = to_rad.py_func(360) - expected = 2 * np.pi - - self.assertAlmostEqual(result, expected) - - def test_to_rad_m180(self): - result = to_rad.py_func(-180) - expected = -np.pi - - self.assertAlmostEqual(result, expected) - - def test_to_rad_m90(self): - result = to_rad.py_func(-90) - expected = -np.pi / 2 - - self.assertAlmostEqual(result, expected) - - def test_to_rad_270(self): - result = to_rad.py_func(270) - expected = 3 * np.pi / 2 - - self.assertAlmostEqual(result, expected) - - def test_to_rad_810(self): - result = to_rad.py_func(810) - expected = 9 * np.pi / 2 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_0(self): - result = to_deg.py_func(0) - expected = 0 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_90(self): - result = to_deg.py_func(np.pi / 2) - expected = 90 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_180(self): - result = to_deg.py_func(np.pi) - expected = 180 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_360(self): - result = to_deg.py_func(2 * np.pi) - expected = 360 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_m180(self): - result = to_deg.py_func(-np.pi) - expected = -180 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_m90(self): - result = to_deg.py_func(-np.pi / 2) - expected = -90 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_270(self): - result = to_deg.py_func(3 * np.pi / 2) - expected = 270 - - self.assertAlmostEqual(result, expected) - - def test_to_deg_810(self): - result = to_deg.py_func(9 * np.pi / 2) - expected = 810 - - self.assertAlmostEqual(result, expected) - - def test_bearing_rand(self): - paris = (2.3522, 48.8566) - marseille = (5.3698, 43.2965) - - result = bearing.py_func(paris, marseille) - expected = 158.2694 - - self.assertAlmostEqual(result, expected, places=4) - - def test_bearing_along_equator(self): - p1 = (0, 0) - p2 = (90, 0) - - result = bearing.py_func(p1, p2) - expected = 90 - - self.assertAlmostEqual(result, expected) - - def test_bearing_equator_to_north_pole(self): - p1 = (0, 0) - p2 = (0, 90) - - result = bearing.py_func(p1, p2) - expected = 0 - - self.assertAlmostEqual(result, expected) - - def test_angle_between_three_points_1(self): - p1 = (0, -10) - p2 = (0, 0) - p3 = (10, 0) - - self.assertAlmostEqual(90, angle_between_three_points.py_func(p1, p2, p3), places=3) - - def test_angle_between_three_points_2(self): - p1 = (0, -10) - p2 = (0, 0) - p3 = (-10, 0) - - self.assertAlmostEqual(-90, angle_between_three_points.py_func(p1, p2, p3), places=3) - - def test_angle_between_three_points_3(self): - p1 = (0, -10) - p2 = (0, 0) - p3 = (10, 10) - - self.assertAlmostEqual(180 - 44.56139, angle_between_three_points.py_func(p1, p2, p3), places=3) - - def test_angle_between_three_points_4(self): - p1 = (0, 0) - p2 = (0, 10) - p3 = (0, 0) - - self.assertAlmostEqual(0, abs(angle_between_three_points.py_func(p1, p2, p3)), places=3) - + # def test_compute_angle_l1(self): + # trajectory = AISTrajectory( + # pd.DataFrame( + # { + # "ts_sec": [i for i in range(15)], + # "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)] + # } + # ) + # ) + # + # trajectory.compute_angle_l1(2) + # + # result = trajectory.df["angle_l1"].to_numpy() + # expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5]) + # + # np.testing.assert_array_equal(result, expected) + # + # def test_compute_angle_l2(self): + # trajectory = AISTrajectory( + # pd.DataFrame( + # { + # "ts_sec": [i for i in range(15)], + # "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)] + # } + # ) + # ) + # + # trajectory.compute_angle_l2(2) + # + # result = trajectory.df["angle_l2"].to_numpy() + # expected = np.array(np.sqrt([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5])) + # + # np.testing.assert_array_equal(result, expected) + # + # def test_compute_std(self): + # dat = np.array([0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]) + # radius = 2 + # + # result = compute_std.py_func(dat, radius) + # expected = np.array( + # [0, 0, 0, 0.4, np.sqrt(30 / 125), np.sqrt(30 / 125), 0.4, 0, 0.8, np.sqrt(120 / 125), np.sqrt(120 / 125), + # 0.8, 0, 0, 0]) + # + # np.testing.assert_almost_equal(result, expected) # def test_compute_position_angle_std(self): # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] @@ -406,12 +249,4 @@ class TestAISTrajectory(unittest.TestCase): # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] # result = compute_position_dist_std(dat, 2) # - # # hard to test - - def test_compute_point_angles(self): - dat = np.array([(0, 0), (0, 10), (10, 10), (0, 10)]) - result = compute_point_angles(dat) - - expected = np.array([0, 90, 0, 0]) - - np.testing.assert_almost_equal(expected, result, decimal=0) + # # hard to test \ No newline at end of file diff --git a/skais/tests/process/__init__.py b/skais/tests/process/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/skais/tests/process/test_geography.py b/skais/tests/process/test_geography.py new file mode 100644 index 0000000000000000000000000000000000000000..be1f844dfd0cfdddf9bb284ab75b6f5d1c9ab30e --- /dev/null +++ b/skais/tests/process/test_geography.py @@ -0,0 +1,168 @@ +import unittest +from skais.process.geography import * + + +class MyTestCase(unittest.TestCase): + def test_to_rad_0(self): + result = to_rad.py_func(0) + expected = 0 + + self.assertAlmostEqual(result, expected) + + def test_to_rad_90(self): + result = to_rad.py_func(90) + expected = np.pi / 2 + + self.assertAlmostEqual(result, expected) + + def test_to_rad_180(self): + result = to_rad.py_func(180) + expected = np.pi + + self.assertAlmostEqual(result, expected) + + def test_to_rad_360(self): + result = to_rad.py_func(360) + expected = 2 * np.pi + + self.assertAlmostEqual(result, expected) + + def test_to_rad_m180(self): + result = to_rad.py_func(-180) + expected = -np.pi + + self.assertAlmostEqual(result, expected) + + def test_to_rad_m90(self): + result = to_rad.py_func(-90) + expected = -np.pi / 2 + + self.assertAlmostEqual(result, expected) + + def test_to_rad_270(self): + result = to_rad.py_func(270) + expected = 3 * np.pi / 2 + + self.assertAlmostEqual(result, expected) + + def test_to_rad_810(self): + result = to_rad.py_func(810) + expected = 9 * np.pi / 2 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_0(self): + result = to_deg.py_func(0) + expected = 0 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_90(self): + result = to_deg.py_func(np.pi / 2) + expected = 90 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_180(self): + result = to_deg.py_func(np.pi) + expected = 180 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_360(self): + result = to_deg.py_func(2 * np.pi) + expected = 360 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_m180(self): + result = to_deg.py_func(-np.pi) + expected = -180 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_m90(self): + result = to_deg.py_func(-np.pi / 2) + expected = -90 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_270(self): + result = to_deg.py_func(3 * np.pi / 2) + expected = 270 + + self.assertAlmostEqual(result, expected) + + def test_to_deg_810(self): + result = to_deg.py_func(9 * np.pi / 2) + expected = 810 + + self.assertAlmostEqual(result, expected) + + def test_bearing_rand(self): + paris = (2.3522, 48.8566) + marseille = (5.3698, 43.2965) + + result = bearing.py_func(paris, marseille) + expected = 158.2694 + + self.assertAlmostEqual(result, expected, places=4) + + def test_bearing_along_equator(self): + p1 = (0, 0) + p2 = (90, 0) + + result = bearing.py_func(p1, p2) + expected = 90 + + self.assertAlmostEqual(result, expected) + + def test_bearing_equator_to_north_pole(self): + p1 = (0, 0) + p2 = (0, 90) + + result = bearing.py_func(p1, p2) + expected = 0 + + self.assertAlmostEqual(result, expected) + + def test_angle_between_three_points_1(self): + p1 = (0, -10) + p2 = (0, 0) + p3 = (10, 0) + + self.assertAlmostEqual(90, angle_between_three_points.py_func(p1, p2, p3), places=3) + + def test_angle_between_three_points_2(self): + p1 = (0, -10) + p2 = (0, 0) + p3 = (-10, 0) + + self.assertAlmostEqual(-90, angle_between_three_points.py_func(p1, p2, p3), places=3) + + def test_angle_between_three_points_3(self): + p1 = (0, -10) + p2 = (0, 0) + p3 = (10, 10) + + self.assertAlmostEqual(180 - 44.56139, angle_between_three_points.py_func(p1, p2, p3), places=3) + + def test_angle_between_three_points_4(self): + p1 = (0, 0) + p2 = (0, 10) + p3 = (0, 0) + + self.assertAlmostEqual(0, abs(angle_between_three_points.py_func(p1, p2, p3)), places=3) + + def test_compute_point_angles(self): + dat = np.array([(0, 0), (0, 10), (10, 10), (0, 10)]) + result = compute_point_angles(dat) + + expected = np.array([0, 90, 0, 0]) + + np.testing.assert_almost_equal(expected, result, decimal=0) + self.assertTrue(True) + + +if __name__ == '__main__': + unittest.main()