diff --git a/skais/ais/ais_trajectory.py b/skais/ais/ais_trajectory.py index f3199b35dd09a9f7f7c294ac769e158d4ad8edcf..c8b3ed0a234d9ed4186bb5c1cb234c0c327edef7 100644 --- a/skais/ais/ais_trajectory.py +++ b/skais/ais/ais_trajectory.py @@ -6,134 +6,23 @@ from scipy.interpolate import interp1d from skais.ais.ais_points import AISPoints -class NoTimeInformation(Exception): - pass - - @jit(nopython=True) def compute_trajectory(times, time_gap): n_samples = len(times) + if n_samples == 0: + return 0 + previous_date = times[0] i = 0 - while i < n_samples: - if (times[i] - previous_date) > time_gap: - break + while i < n_samples and (times[i] - previous_date) <= time_gap: previous_date = times[i] i += 1 return i -# @jit(nopython=True) -# def compute_position_angle_std(dat, radius): -# angles_stds = np.empty(dat.shape[0]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius] -# n_samples = len(data) -# center = (data[:, 0].mean(), data[:, 1].mean()) -# -# angles_sum = [] -# for j in range(n_samples): -# p1 = (data[j][0], data[j][1]) -# -# alpha = bearing(p1, center) -# angles_sum.append(alpha) -# -# angles_stds[i] = calc_std_dev(angles_sum) -# return angles_stds -# -# -# @jit(nopython=True) -# def compute_position_angle_mean(dat, radius): -# angles_means = np.empty(dat.shape[0]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius] -# n_samples = len(data) -# center = (data[:, 0].mean(), data[:, 1].mean()) -# -# cos = sin = 0 -# for j in range(n_samples): -# p1 = (data[j][0], data[j][1]) -# -# alpha = bearing(p1, center) -# -# cos += np.cos(np.radians(alpha)) -# sin += np.sin(np.radians(alpha)) -# -# angles_means[i] = np.arctan2(sin, cos) -# return angles_means -# -# -# def compute_position_dist_mean(dat, radius): -# dist_means = np.empty(dat.shape[0]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius] -# n_samples = len(data) -# center = (data[:, 0].mean(), data[:, 1].mean()) -# -# dist_sum = 0 -# for j in range(n_samples - 1): -# p1 = (data[j][0], data[j][1]) -# -# dist_sum += great_circle(p1[0], center[0], p1[1], center[1]) -# -# dist_means[i] = dist_sum / (n_samples - 1) -# return dist_means -# -# -# def compute_position_dist_std(dat, radius): -# dist_means = np.empty(dat.shape[0]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius] -# n_samples = len(data) -# center = (data[:, 0].mean(), data[:, 1].mean()) -# -# dist_sum = [] -# for j in range(n_samples - 1): -# p1 = (data[j][0], data[j][1]) -# -# dist_sum.append(great_circle(p1[0], center[0], p1[1], center[1])) -# -# dist_means[i] = np.std(dist_sum) -# return dist_means -# -# - - -# def l1_angle(dat, radius): -# l1 = np.zeros(dat.shape) -# -# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius + 1] -# l1[i - radius] = np.linalg.norm(data, ord=1) -# -# return l1 -# -# -# def l2_angle(dat, radius): -# l2 = np.zeros(dat.shape) -# -# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius + 1] -# l2[i - radius] = np.linalg.norm(data, ord=2) -# return l2 -# -# -# def angle_dispersion(dat, radius): -# disp = np.zeros(dat.shape) -# -# dat = np.concatenate([np.full(radius, dat[0]), dat, np.full(radius, dat[-1])]) -# for i in range(radius, dat.shape[0] - radius): -# data = dat[i - radius:i + radius + 1] -# disp[i - radius] = angular_dispersion(np.radians(data)) -# -# return disp - - def apply_func_on_window(dat, func, radius, on_edge='copy'): result = np.zeros(dat.shape) if on_edge == 'copy': @@ -142,6 +31,8 @@ def apply_func_on_window(dat, func, radius, on_edge='copy'): data = dat[i - radius:i + radius + 1] result[i - radius] = func(data) return result + else: + raise ValueError def apply_time_sequence(dat, time, func): @@ -244,9 +135,6 @@ class AISTrajectory(AISPoints): return {"type": "LineString", "coordinates": coordinates} def split_trajectory(self, time_gap=600, interpolation=None): - if 'ts_sec' not in self.df: - raise NoTimeInformation() - n_sample = len(self.df.index) result = [] work_df = self.df.copy() @@ -259,81 +147,4 @@ class AISTrajectory(AISPoints): work_df = work_df[i:] index += i - return result - - # def compute_angle_l1(self, radius): - # dat = self.df['angles_diff'].to_numpy() - # l1 = l1_angle(dat, radius) - # self.df[f"angle_l1"] = l1 - # - # def compute_angle_l2(self, radius): - # dat = self.df['angles_diff'].to_numpy() - # l2 = l2_angle(dat, radius) - # self.df[f"angle_l2"] = l2 - - # def compute_derivative(self, field): - # dt = self.df['ts_sec'].diff() / 60 - # - # dv = self.df[field].diff().div(dt, axis=0, ) - # - # self.df['d_' + field] = dv - # - # def compute_diff(self, field1, field2): - # self.df["diff"] = self.df.apply(lambda x: 180 - abs(abs(x[field1] - x[field2]) - 180), - # axis=1) - # - # def compute_all_derivatives(self): - # fields = ['cog', 'sog', 'rot', 'heading'] - # - # for field in fields: - # self.compute_derivative(field) - # - # def compute_all_stds(self, radius): - # fields = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude'] - # - # for field in fields: - # dat = self.df[field].to_numpy() - # stds = compute_std(dat, radius) - # stds[-radius:] = np.nan - # stds[:radius] = np.nan - # self.df[f"{field}_std"] = stds - # - # def compute_all_dispersions(self, radius): - # fields = ['cog', 'heading', 'angles_diff'] - # for field in fields: - # if field in self.df.columns: - # dat = self.df[field].to_numpy() - # disp = angle_dispersion(dat, radius) - # self.df[f"{field}_disp"] = disp - # - # def compute_position_features(self, radius): - # dat = np.stack([self.df.longitude.to_numpy(), self.df.latitude.to_numpy()], axis=1) - # std = compute_position_angle_std(dat, radius) - # std[-radius:] = np.nan - # std[:radius] = np.nan - # self.df[f"angle_std"] = std - # - # mean = compute_position_angle_mean(dat, radius) - # mean[-radius:] = np.nan - # mean[:radius] = np.nan - # self.df[f"angle_mean"] = mean - # - # mean_dist = compute_position_dist_mean(dat, radius) - # mean_dist[-radius:] = np.nan - # mean_dist[:radius] = np.nan - # self.df[f"dist_mean"] = mean_dist - # - # std_dist = compute_position_dist_std(dat, radius) - # std_dist[-radius:] = np.nan - # std_dist[:radius] = np.nan - # self.df[f"dist_std"] = std_dist - # - # angles = compute_point_angles(dat) - # angles[0] = np.nan - # angles[-1] = np.nan - # self.df[f"angles_diff"] = angles - # - # def compute_sqrt_sog(self): - # sog = self.df['sog'].to_numpy() - # sog[sog < 0] = 0 - # self.df["sog_sqrt"] = np.sqrt(sog) + return result \ No newline at end of file diff --git a/skais/tests/ais/test_ais_points.py b/skais/tests/ais/test_ais_points.py index 40fe6b0f1e04ba0493a80dbfead063f377bb98d3..e4a0310ecd079aac64105174fcf495cca162e651 100644 --- a/skais/tests/ais/test_ais_points.py +++ b/skais/tests/ais/test_ais_points.py @@ -1,10 +1,9 @@ import unittest -import pandas as pd import numpy as np +import pandas as pd from skais.ais.ais_points import AISPoints -from skais.ais.ais_trajectory import AISTrajectory class TestAISPositions(unittest.TestCase): @@ -209,53 +208,6 @@ class TestAISPositions(unittest.TestCase): 40, 30, 20, 10, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170])) - # def test_histogram_no_label_simple(self): - # result = np.histogramdd(self.ais_points.df[["sog", "diff"]].to_numpy(), 3, [[0, 30], [0, 180]])[0] - # - # result = result / result.sum() - # - # self.assertTrue(np.array_equal(self.ais_points.histogram(["sog", "diff"], 3, [[0, 30], [0, 180]]), - # result)) - # - # def test_histogram_no_label_no_data(self): - # ais_points = AISPoints( - # pd.DataFrame( - # { - # "sog": [], - # "diff": [], - # "label": [] - # } - # ) - # ) - # - # self.assertTrue(np.array_equal(ais_points.histogram(["sog", "diff"], 3, [[0, 30], [0, 180]]), - # np.full((3, 3), 1 / 9))) - # - # def test_histogram_label(self): - # self.assertTrue(np.array_equal(self.ais_points.histogram(["sog", "diff"], 3, [[0, 30], [0, 180]], label=0), - # np.array([[3, 0, 0], [4, 4, 0], [2, 0, 0]]) / 13)) - # - # def test_histogram_joint_x_y(self): - # ground_truth = np.array([[[3, 2], [0, 1], [0, 3]], - # [[4, 2], [4, 0], [0, 0]], - # [[2, 0], [0, 0], [0, 0]]]) / 21 - # - # np.testing.assert_array_equal(ground_truth, self.ais_points.histogram_joint_x_y(x_nb_bins=3)) - # - # def test_histogram_x_knowing_y(self): - # ground_truth = np.array([[[3 / 13, 2 / 8], [0, 1 / 8], [0, 3 / 8]], - # [[4 / 13, 2 / 8], [4 / 13, 0], [0, 0]], - # [[2 / 13, 0], [0, 0], [0, 0]]]) - # - # np.testing.assert_array_equal(ground_truth, self.ais_points.histogram_x_knowing_y(x_nb_bins=3)) - # - # def test_histogram_y_knowing_x(self): - # ground_truth = np.array([[[3 / 5, 2 / 5], [0, 1], [0, 1]], - # [[4 / 6, 2 / 6], [1, 0], [13 / 21, 8 / 21]], - # [[1, 0], [13 / 21, 8 / 21], [13 / 21, 8 / 21]]]) - # - # np.testing.assert_array_equal(ground_truth, self.ais_points.histogram_y_knowing_x(x_nb_bins=3)) - def test_load_from_csv(self): result = AISPoints.load_from_csv("skais/tests/ais/test_load_from_csv.csv") @@ -271,15 +223,6 @@ class TestAISPositions(unittest.TestCase): pd.testing.assert_frame_equal(result.df.reset_index(drop=True), ais_points.df.reset_index(drop=True)) - # def test_histogram_x(self): - # ground_truth = np.array([[5, 1, 3], - # [6, 4, 0], - # [2, 0, 0]]) / 21 - # - # np.testing.assert_array_equal(ground_truth, - # self.ais_points.histogram(features=["sog", "diff"], bins=3, - # ranges=[[0, 30], [0, 180]])) - def test_fuse_single(self): ais_points = AISPoints(pd.DataFrame( { @@ -333,139 +276,4 @@ class TestAISPositions(unittest.TestCase): value['ts'] = pd.to_datetime(value.ts) pd.testing.assert_frame_equal(AISPoints.fuse(ais_points, ais_points).df.reset_index(drop=True), - value.reset_index(drop=True)) - # - # def test_compute_trajectory_simple(self): - # times = np.arange(0, 100, 4) * 60 - # result = compute_trajectory.py_func(times, 5, 1000) - # expected = 25 - # - # self.assertEqual(result, expected) - # - # def test_compute_trajectory_cut(self): - # times = np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60]) - # result = compute_trajectory.py_func(times, 5, 1000) - # expected = 25 - # - # self.assertEqual(result, expected) - # - # def test_compute_trajectory_limit(self): - # times = np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60]) - # result = compute_trajectory.py_func(times, 5, 10) - # expected = 10 - # - # self.assertEqual(result, expected) - # - # def test_compute_trajectories_simple_split(self): - # df = pd.DataFrame({'ts_sec': np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])}) - # result = compute_trajectories(df, 5, min_size=0) - # expected = [ - # pd.DataFrame({'ts_sec': np.arange(0, 100, 4) * 60}), - # pd.DataFrame({'ts_sec': np.arange(120, 200, 4) * 60}) - # ] - # - # self.assertEqual(len(expected), len(result)) - # for r, e in zip(result, expected): - # pd.testing.assert_frame_equal(e.reset_index(drop=True), r.df.reset_index(drop=True)) - # - # def test_compute_trajectories_split_limit(self): - # a = np.arange(0, 100, 4) - # b = np.arange(120, 200, 4) - # df = pd.DataFrame({'ts_sec': np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])}) - # result = compute_trajectories(df, 5, min_size=0, size_limit=10) - # expected = [ - # pd.DataFrame({'ts_sec': a[:10] * 60}), - # pd.DataFrame({'ts_sec': a[10:20] * 60}), - # pd.DataFrame({'ts_sec': a[20:] * 60}), - # pd.DataFrame({'ts_sec': b[:10] * 60}), - # pd.DataFrame({'ts_sec': b[10:] * 60}) - # ] - # - # self.assertEqual(len(expected), len(result)) - # for r, e in zip(result, expected): - # pd.testing.assert_frame_equal(e.reset_index(drop=True), r.df.reset_index(drop=True)) - # - # def test_compute_trajectories_split_min_size(self): - # a = np.arange(0, 100, 4) - # b = np.arange(120, 200, 4) - # print(len(b)) - # df = pd.DataFrame({'ts_sec': np.concatenate([np.arange(0, 100, 4) * 60, np.arange(120, 200, 4) * 60])}) - # result = compute_trajectories(df, 5, min_size=23) - # expected = [ - # pd.DataFrame({'ts_sec': a * 60}) - # ] - # - # self.assertEqual(len(expected), len(result)) - # for r, e in zip(result, expected): - # pd.testing.assert_frame_equal(e.reset_index(drop=True), r.df.reset_index(drop=True)) - - # def test_disjointed_histogram_label_none(self): - # ais_points = AISPoints(pd.DataFrame( - # { - # "cog": [i for i in range(0, 359, 10)], - # "heading": [180 for i in range(0, 359, 10)] - # } - # ) - # ) - # features = ["cog", "heading"] - # bins = [10, 3] - # ranges = [[0, 360], [0, 360]] - # - # result = ais_points.disjointed_histogram(features, bins, ranges) - # expected = [ - # np.array([4, 4, 3, 4, 3, 4, 4, 3, 4, 3]), - # np.array([0, 36, 0]) - # ] - # - # self.assertEqual(len(result), len(expected)) - # - # for r, e in zip(result, expected): - # np.testing.assert_array_equal(e, r[0]) - # - # def test_disjointed_histogram_label_0(self): - # ais_points = AISPoints(pd.DataFrame( - # { - # "cog": [i for i in range(0, 359, 10)], - # "heading": [180 for i in range(0, 359, 10)], - # "label": [0 for _ in range(10)] + [1 for _ in range(26)] - # } - # ) - # ) - # features = ["cog", "heading"] - # bins = [10, 3] - # ranges = [[0, 360], [0, 360]] - # - # result = ais_points.disjointed_histogram(features, bins, ranges, label=0) - # expected = [ - # np.array([4, 4, 2, 0, 0, 0, 0, 0, 0, 0]), - # np.array([0, 10, 0]) - # ] - # - # self.assertEqual(len(result), len(expected)) - # - # for r, e in zip(result, expected): - # np.testing.assert_array_equal(e, r[0]) - # - # def test_disjointed_histogram_bins_int(self): - # ais_points = AISPoints(pd.DataFrame( - # { - # "cog": [i for i in range(0, 359, 10)], - # "heading": [180 for i in range(0, 359, 10)], - # "label": [0 for _ in range(10)] + [1 for _ in range(26)] - # } - # ) - # ) - # features = ["cog", "heading"] - # bins = 10 - # ranges = [[0, 360], [0, 360]] - # - # result = ais_points.disjointed_histogram(features, bins, ranges) - # expected = [ - # np.array([4, 4, 3, 4, 3, 4, 4, 3, 4, 3]), - # np.array([0, 0, 0, 0, 0, 36, 0, 0, 0, 0]) - # ] - # - # self.assertEqual(len(result), len(expected)) - # - # for r, e in zip(result, expected): - # np.testing.assert_array_equal(e, r[0]) + value.reset_index(drop=True)) \ No newline at end of file diff --git a/skais/tests/ais/test_ais_trajectory.py b/skais/tests/ais/test_ais_trajectory.py index 0e5ded1d6854814e1ab7cb8737b764c8c3b94435..799825c93553514f88370f638d6bfca57e2592dd 100644 --- a/skais/tests/ais/test_ais_trajectory.py +++ b/skais/tests/ais/test_ais_trajectory.py @@ -1,62 +1,9 @@ import unittest -import numpy as np - from skais.ais.ais_trajectory import * class TestAISTrajectory(unittest.TestCase): - - # def test_get_stopped_snippets_simple(self): - # ais_trajectory = AISTrajectory(pd.DataFrame( - # { - # "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1], - # "ts_sec": [i for i in range(21)] - # } - # )) - # - # expected = pd.DataFrame( - # { - # "label": [1, 1, 1, 1, 1, 1, 1, 1], - # "ts_sec": [i for i in range(13, 21)] - # } - # ) - # - # snippets = ais_trajectory.get_stopped_snippets('label') - # - # self.assertEqual(len(snippets), 1) - # pd.testing.assert_frame_equal(expected, snippets[0].df.reset_index(drop=True)) - - # def test_get_stopped_snippets_multi_snippets(self): - # ais_trajectory = AISTrajectory(pd.DataFrame( - # { - # "label": - # [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0], - # "ts_sec": [i * 60 for i in range(31)] - # } - # )) - # - # expected = [ - # pd.DataFrame( - # { - # "label": [1, 1, 1, 1, 1, 1, 1, 1], - # "ts_sec": [i * 60 for i in range(13, 21)] - # } - # ), - # pd.DataFrame( - # { - # "label": [1, 1, 1, ], - # "ts_sec": [i * 60 for i in range(25, 28)] - # } - # ), - # ] - # - # snippets = ais_trajectory.get_stopped_snippets('label', time_gap=1) - # - # self.assertEqual(len(expected), len(snippets)) - # for e, s in zip(expected, snippets): - # pd.testing.assert_frame_equal(e, s.df.reset_index(drop=True)) - def test_to_geojson(self): trajectory = AISTrajectory( pd.DataFrame( @@ -93,6 +40,23 @@ class TestAISTrajectory(unittest.TestCase): for r, e in zip(result, expected): np.testing.assert_array_equal(r, e) + + def test_sliding_window_too_short(self): + trajectory = AISTrajectory( + pd.DataFrame( + { + "latitude": [0, 90, 0, -90], + "longitude": [0, 90, 180, -90], + "sog": [i for i in range(4)], + "ts_sec": [i for i in range(4)] + } + ) + ) + result = trajectory.sliding_window(size=5, offset=1, fields="sog") + expected = [] + + self.assertEqual(len(result), len(expected)) + self.assertListEqual(result, expected) def test_to_numpy_no_field(self): trajectory = AISTrajectory( @@ -124,23 +88,6 @@ class TestAISTrajectory(unittest.TestCase): np.testing.assert_array_equal(result, expected) - # def test_compute_sqrt_sog(self): - # trajectory = AISTrajectory( - # pd.DataFrame( - # { - # "sog": [i for i in range(4)], - # "ts_sec": [i for i in range(4)] - # } - # ) - # ) - # - # trajectory.compute_sqrt_sog() - # - # result = trajectory.df["sog_sqrt"].to_numpy() - # expected = np.array([math.sqrt(i) for i in range(4)]) - # - # np.testing.assert_array_equal(result, expected) - def test_interpolation(self): trajectory = AISTrajectory( pd.DataFrame( @@ -151,7 +98,7 @@ class TestAISTrajectory(unittest.TestCase): "ts_sec": [i for i in range(0, 6001, 600)] } ), - interpolation_time=5 + interpolation_time=300 ) expected = pd.DataFrame( @@ -173,7 +120,7 @@ class TestAISTrajectory(unittest.TestCase): "label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(3001, 6001, 600)] } ), - interpolation_time=5 + interpolation_time=300 ) expected = pd.DataFrame( @@ -217,27 +164,19 @@ class TestAISTrajectory(unittest.TestCase): for e, r in zip(expected, trajectory.split_trajectory(800)): pd.testing.assert_frame_equal(e.df.reset_index(drop=True), r.df.reset_index(drop=True)) - def test_split_trajectory_raise(self): - trajectory = AISTrajectory( - pd.DataFrame( - { - "label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(4001, 7001, 600)] - } - ) - ) - self.assertRaises(NoTimeInformation, trajectory.split_trajectory) - def test_apply_func_on_points_no_column(self): trajectory = AISTrajectory( pd.DataFrame( { - "sog": [i for i in range(100)] + "sog": [i for i in range(100)], + "ts_sec": [i for i in range(100)] } ) ) expected = pd.DataFrame( { - "sog": [np.sqrt(i) for i in range(100)] + "sog": [np.sqrt(i) for i in range(100)], + "ts_sec": [i for i in range(100)] } ) @@ -247,18 +186,19 @@ class TestAISTrajectory(unittest.TestCase): pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) - def test_apply_func_on_points(self): trajectory = AISTrajectory( pd.DataFrame( { - "sog": [i for i in range(100)] + "sog": [i for i in range(100)], + "ts_sec": [i for i in range(100)] } ) ) expected = pd.DataFrame( { "sog": [i for i in range(100)], + "ts_sec": [i for i in range(100)], "sqrt_sog": [np.sqrt(i) for i in range(100)] } ) @@ -269,7 +209,6 @@ class TestAISTrajectory(unittest.TestCase): pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) - def test_apply_func_on_time_sequence_no_column(self): def sog_div_ts(p1, _, t1, __): return p1 / t1 @@ -325,7 +264,8 @@ class TestAISTrajectory(unittest.TestCase): trajectory = AISTrajectory( pd.DataFrame( { - "sog": [i for i in range(100)] + "sog": [i for i in range(100)], + "ts_sec": [i for i in range(100)] } ) ) @@ -333,7 +273,8 @@ class TestAISTrajectory(unittest.TestCase): { "sog": [0.6, 1.2] + [np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] + - [97.8, 98.4] + [97.8, 98.4], + "ts_sec": [i for i in range(100)] } ) @@ -347,16 +288,17 @@ class TestAISTrajectory(unittest.TestCase): trajectory = AISTrajectory( pd.DataFrame( { - "sog": [i for i in range(100)] + "sog": [i for i in range(100)], + "ts_sec": [i for i in range(100)] } ) ) expected = pd.DataFrame( { "sog": [i for i in range(100)], - "mean": [0.6, 1.2] + - [np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] + - [97.8, 98.4] + "ts_sec": [i for i in range(100)], + "mean": [0.6, 1.2] + [np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] + [ + 97.8, 98.4] } ) @@ -371,71 +313,10 @@ class TestAISTrajectory(unittest.TestCase): self.assertEqual(6, compute_trajectory.py_func(times, 800)) - # def test_compute_angle_l1(self): - # trajectory = AISTrajectory( - # pd.DataFrame( - # { - # "ts_sec": [i for i in range(15)], - # "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)] - # } - # ) - # ) - # - # trajectory.compute_angle_l1(2) - # - # result = trajectory.df["angle_l1"].to_numpy() - # expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5]) - # - # np.testing.assert_array_equal(result, expected) - # - # def test_compute_angle_l2(self): - # trajectory = AISTrajectory( - # pd.DataFrame( - # { - # "ts_sec": [i for i in range(15)], - # "angles_diff": [0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)] - # } - # ) - # ) - # - # trajectory.compute_angle_l2(2) - # - # result = trajectory.df["angle_l2"].to_numpy() - # expected = np.array(np.sqrt([0, 0, 0, 1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5])) - # - # np.testing.assert_array_equal(result, expected) - # - # def test_compute_std(self): - # dat = np.array([0 for _ in range(5)] + [1 for _ in range(5)] + [-1 for i in range(5)]) - # radius = 2 - # - # result = compute_std.py_func(dat, radius) - # expected = np.array( - # [0, 0, 0, 0.4, np.sqrt(30 / 125), np.sqrt(30 / 125), 0.4, 0, 0.8, np.sqrt(120 / 125), np.sqrt(120 / 125), - # 0.8, 0, 0, 0]) - # - # np.testing.assert_almost_equal(result, expected) - - # def test_compute_position_angle_std(self): - # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] - # result = compute_position_angle_std(dat, 2) - # - # # hard to test - # - # def test_compute_position_angle_mean(self): - # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] - # result = compute_position_angle_mean(dat, 2) - # - # # hard to test - # - # def test_compute_position_dist_mean(self): - # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] - # result = compute_position_dist_mean(dat, 2) - # - # # hard to test - # - # def test_compute_position_dist_std(self): - # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] - # result = compute_position_dist_std(dat, 2) - # - # # hard to test + def test_compute_trajectory_empty(self): + times = np.array([]) + + self.assertEqual(0, compute_trajectory.py_func(times, 800)) + + def test_apply_func_on_window(self): + self.assertRaises(ValueError, apply_func_on_window,np.arange(10), 0, 0, 'not valid string') \ No newline at end of file