diff --git a/skais/ais/ais_trajectory.py b/skais/ais/ais_trajectory.py index b719d3de5350b6ba15da13c857dd69462ac711c0..359705c1cc71a2db79d7b5c952fe5bc02c64b727 100644 --- a/skais/ais/ais_trajectory.py +++ b/skais/ais/ais_trajectory.py @@ -154,7 +154,7 @@ def apply_time_sequence(dat, time, func): class AISTrajectory(AISPoints): def __init__(self, df, interpolation_time=None): - df = df.drop_duplicates(subset=['ts_sec']) + # df = df.drop_duplicates(subset=['ts_sec']) if interpolation_time and len(df.index) > 4: @@ -184,9 +184,6 @@ class AISTrajectory(AISPoints): # self.df = df.dropna() AISPoints.__init__(self, df) - def __eq__(self, other): - return self.df.equals(other.df) - def sliding_window(self, size=10, offset=1, fields=None): result = [] @@ -208,7 +205,8 @@ class AISTrajectory(AISPoints): else: self.df[new_column] = result - def apply_time_sequence_func(self, func, column, new_column=None): + # TODO rename function/simplify + def apply_func_on_time_sequence(self, func, column, new_column=None): dat = self.df[column].to_numpy() time = self.df['ts_sec'].to_numpy() diff --git a/skais/tests/ais/test_ais_trajectory.py b/skais/tests/ais/test_ais_trajectory.py index 78fff0bb0b83fc724d6671a4d2ad9582d00b00c8..0e5ded1d6854814e1ab7cb8737b764c8c3b94435 100644 --- a/skais/tests/ais/test_ais_trajectory.py +++ b/skais/tests/ais/test_ais_trajectory.py @@ -1,5 +1,7 @@ import unittest +import numpy as np + from skais.ais.ais_trajectory import * @@ -28,7 +30,8 @@ class TestAISTrajectory(unittest.TestCase): # def test_get_stopped_snippets_multi_snippets(self): # ais_trajectory = AISTrajectory(pd.DataFrame( # { - # "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0], + # "label": + # [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0], # "ts_sec": [i * 60 for i in range(31)] # } # )) @@ -167,7 +170,7 @@ class TestAISTrajectory(unittest.TestCase): pd.DataFrame( { "ts_sec": [i for i in range(0, 6001, 600)], - "label": [0 for i in range(0, 3001, 600)] + [1 for i in range(3001, 6001, 600)] + "label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(3001, 6001, 600)] } ), interpolation_time=5 @@ -176,7 +179,7 @@ class TestAISTrajectory(unittest.TestCase): expected = pd.DataFrame( { "ts_sec": [i for i in range(0, 6001, 300)], - "label": [0 for i in range(0, 3301, 300)] + [1 for i in range(3301, 6001, 300)] + "label": [0 for _ in range(0, 3301, 300)] + [1 for _ in range(3301, 6001, 300)] } ) @@ -205,7 +208,7 @@ class TestAISTrajectory(unittest.TestCase): pd.DataFrame( { "ts_sec": [i for i in range(4001, 7001, 600)], - "label": [1 for i in range(4001, 7001, 600)] + "label": [1 for _ in range(4001, 7001, 600)] } ) ) @@ -213,6 +216,161 @@ class TestAISTrajectory(unittest.TestCase): for e, r in zip(expected, trajectory.split_trajectory(800)): pd.testing.assert_frame_equal(e.df.reset_index(drop=True), r.df.reset_index(drop=True)) + + def test_split_trajectory_raise(self): + trajectory = AISTrajectory( + pd.DataFrame( + { + "label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(4001, 7001, 600)] + } + ) + ) + self.assertRaises(NoTimeInformation, trajectory.split_trajectory) + + def test_apply_func_on_points_no_column(self): + trajectory = AISTrajectory( + pd.DataFrame( + { + "sog": [i for i in range(100)] + } + ) + ) + expected = pd.DataFrame( + { + "sog": [np.sqrt(i) for i in range(100)] + } + ) + + trajectory.apply_func_on_points(np.sqrt, 'sog') + + result = trajectory.df + + pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) + + + def test_apply_func_on_points(self): + trajectory = AISTrajectory( + pd.DataFrame( + { + "sog": [i for i in range(100)] + } + ) + ) + expected = pd.DataFrame( + { + "sog": [i for i in range(100)], + "sqrt_sog": [np.sqrt(i) for i in range(100)] + } + ) + + trajectory.apply_func_on_points(np.sqrt, 'sog', 'sqrt_sog') + + result = trajectory.df + + pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) + + + def test_apply_func_on_time_sequence_no_column(self): + def sog_div_ts(p1, _, t1, __): + return p1 / t1 + + trajectory = AISTrajectory( + pd.DataFrame( + { + "sog": [i + 1 for i in range(100)], + "ts_sec": [i + 1 for i in range(100)] + } + ) + ) + expected = pd.DataFrame( + { + "sog": [1.0 for _ in range(100)], + "ts_sec": [i + 1 for i in range(100)] + } + ) + + trajectory.apply_func_on_time_sequence(sog_div_ts, 'sog') + + result = trajectory.df + + pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) + + def test_apply_func_on_time_sequence(self): + def sog_div_ts(p1, _, t1, __): + return p1 / t1 + + trajectory = AISTrajectory( + pd.DataFrame( + { + "sog": [i + 1 for i in range(100)], + "ts_sec": [i + 1 for i in range(100)] + } + ) + ) + expected = pd.DataFrame( + { + "sog": [i + 1 for i in range(100)], + "ts_sec": [i + 1 for i in range(100)], + "div": [1.0 for _ in range(100)] + } + ) + + trajectory.apply_func_on_time_sequence(sog_div_ts, 'sog', 'div') + + result = trajectory.df + + pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) + + def test_apply_func_on_time_window_no_column(self): + trajectory = AISTrajectory( + pd.DataFrame( + { + "sog": [i for i in range(100)] + } + ) + ) + expected = pd.DataFrame( + { + "sog": [0.6, 1.2] + + [np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] + + [97.8, 98.4] + } + ) + + trajectory.apply_func_on_time_window(np.average, 2, 'sog', ) + + result = trajectory.df + + pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) + + def test_apply_func_on_time_window(self): + trajectory = AISTrajectory( + pd.DataFrame( + { + "sog": [i for i in range(100)] + } + ) + ) + expected = pd.DataFrame( + { + "sog": [i for i in range(100)], + "mean": [0.6, 1.2] + + [np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] + + [97.8, 98.4] + } + ) + + trajectory.apply_func_on_time_window(np.average, 2, 'sog', 'mean') + + result = trajectory.df + + pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True)) + + def test_compute_trajectory(self): + times = np.array([i for i in range(0, 3001, 600)] + [i for i in range(4001, 7001, 600)]) + + self.assertEqual(6, compute_trajectory.py_func(times, 800)) + # def test_compute_angle_l1(self): # trajectory = AISTrajectory( # pd.DataFrame( @@ -280,4 +438,4 @@ class TestAISTrajectory(unittest.TestCase): # dat = [(0, i * 10) for i in range(5)] + [(0, 50 - i * 10) for i in range(5)] # result = compute_position_dist_std(dat, 2) # - # # hard to test \ No newline at end of file + # # hard to test