Skip to content
Snippets Groups Projects
Commit dc4b8781 authored by Raphael Sturgis's avatar Raphael Sturgis
Browse files

100% converage AISTrajectory

parent 48166c93
No related branches found
No related tags found
No related merge requests found
......@@ -154,7 +154,7 @@ def apply_time_sequence(dat, time, func):
class AISTrajectory(AISPoints):
def __init__(self, df, interpolation_time=None):
df = df.drop_duplicates(subset=['ts_sec'])
# df = df.drop_duplicates(subset=['ts_sec'])
if interpolation_time and len(df.index) > 4:
......@@ -184,9 +184,6 @@ class AISTrajectory(AISPoints):
# self.df = df.dropna()
AISPoints.__init__(self, df)
def __eq__(self, other):
return self.df.equals(other.df)
def sliding_window(self, size=10, offset=1, fields=None):
result = []
......@@ -208,7 +205,8 @@ class AISTrajectory(AISPoints):
else:
self.df[new_column] = result
def apply_time_sequence_func(self, func, column, new_column=None):
# TODO rename function/simplify
def apply_func_on_time_sequence(self, func, column, new_column=None):
dat = self.df[column].to_numpy()
time = self.df['ts_sec'].to_numpy()
......
import unittest
import numpy as np
from skais.ais.ais_trajectory import *
......@@ -28,7 +30,8 @@ class TestAISTrajectory(unittest.TestCase):
# def test_get_stopped_snippets_multi_snippets(self):
# ais_trajectory = AISTrajectory(pd.DataFrame(
# {
# "label": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0],
# "label":
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0],
# "ts_sec": [i * 60 for i in range(31)]
# }
# ))
......@@ -167,7 +170,7 @@ class TestAISTrajectory(unittest.TestCase):
pd.DataFrame(
{
"ts_sec": [i for i in range(0, 6001, 600)],
"label": [0 for i in range(0, 3001, 600)] + [1 for i in range(3001, 6001, 600)]
"label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(3001, 6001, 600)]
}
),
interpolation_time=5
......@@ -176,7 +179,7 @@ class TestAISTrajectory(unittest.TestCase):
expected = pd.DataFrame(
{
"ts_sec": [i for i in range(0, 6001, 300)],
"label": [0 for i in range(0, 3301, 300)] + [1 for i in range(3301, 6001, 300)]
"label": [0 for _ in range(0, 3301, 300)] + [1 for _ in range(3301, 6001, 300)]
}
)
......@@ -205,7 +208,7 @@ class TestAISTrajectory(unittest.TestCase):
pd.DataFrame(
{
"ts_sec": [i for i in range(4001, 7001, 600)],
"label": [1 for i in range(4001, 7001, 600)]
"label": [1 for _ in range(4001, 7001, 600)]
}
)
)
......@@ -213,6 +216,161 @@ class TestAISTrajectory(unittest.TestCase):
for e, r in zip(expected, trajectory.split_trajectory(800)):
pd.testing.assert_frame_equal(e.df.reset_index(drop=True), r.df.reset_index(drop=True))
def test_split_trajectory_raise(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(4001, 7001, 600)]
}
)
)
self.assertRaises(NoTimeInformation, trajectory.split_trajectory)
def test_apply_func_on_points_no_column(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i for i in range(100)]
}
)
)
expected = pd.DataFrame(
{
"sog": [np.sqrt(i) for i in range(100)]
}
)
trajectory.apply_func_on_points(np.sqrt, 'sog')
result = trajectory.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_apply_func_on_points(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i for i in range(100)]
}
)
)
expected = pd.DataFrame(
{
"sog": [i for i in range(100)],
"sqrt_sog": [np.sqrt(i) for i in range(100)]
}
)
trajectory.apply_func_on_points(np.sqrt, 'sog', 'sqrt_sog')
result = trajectory.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_apply_func_on_time_sequence_no_column(self):
def sog_div_ts(p1, _, t1, __):
return p1 / t1
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i + 1 for i in range(100)],
"ts_sec": [i + 1 for i in range(100)]
}
)
)
expected = pd.DataFrame(
{
"sog": [1.0 for _ in range(100)],
"ts_sec": [i + 1 for i in range(100)]
}
)
trajectory.apply_func_on_time_sequence(sog_div_ts, 'sog')
result = trajectory.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_apply_func_on_time_sequence(self):
def sog_div_ts(p1, _, t1, __):
return p1 / t1
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i + 1 for i in range(100)],
"ts_sec": [i + 1 for i in range(100)]
}
)
)
expected = pd.DataFrame(
{
"sog": [i + 1 for i in range(100)],
"ts_sec": [i + 1 for i in range(100)],
"div": [1.0 for _ in range(100)]
}
)
trajectory.apply_func_on_time_sequence(sog_div_ts, 'sog', 'div')
result = trajectory.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_apply_func_on_time_window_no_column(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i for i in range(100)]
}
)
)
expected = pd.DataFrame(
{
"sog": [0.6, 1.2] +
[np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] +
[97.8, 98.4]
}
)
trajectory.apply_func_on_time_window(np.average, 2, 'sog', )
result = trajectory.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_apply_func_on_time_window(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"sog": [i for i in range(100)]
}
)
)
expected = pd.DataFrame(
{
"sog": [i for i in range(100)],
"mean": [0.6, 1.2] +
[np.average(np.array([i - 2, i - 1, i, i + 1, i + 2])) for i in range(2, 98)] +
[97.8, 98.4]
}
)
trajectory.apply_func_on_time_window(np.average, 2, 'sog', 'mean')
result = trajectory.df
pd.testing.assert_frame_equal(expected.reset_index(drop=True), result.reset_index(drop=True))
def test_compute_trajectory(self):
times = np.array([i for i in range(0, 3001, 600)] + [i for i in range(4001, 7001, 600)])
self.assertEqual(6, compute_trajectory.py_func(times, 800))
# def test_compute_angle_l1(self):
# trajectory = AISTrajectory(
# pd.DataFrame(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment