Skip to content
Snippets Groups Projects
Commit 0050ce32 authored by Raphael Sturgis's avatar Raphael Sturgis
Browse files

split trajectory

parent a4210d0d
No related branches found
No related tags found
1 merge request!6Develop
...@@ -4,100 +4,102 @@ from numba import jit ...@@ -4,100 +4,102 @@ from numba import jit
from scipy.interpolate import interp1d from scipy.interpolate import interp1d
from skais.ais.ais_points import AISPoints from skais.ais.ais_points import AISPoints
from skais.process.geography import bearing
from skais.utils.geography import great_circle
from skais.utils.stats import calc_std_dev
# @jit(nopython=True)
# def compute_trajectory(times, time_gap, size_limit):
# n_samples = len(times)
#
# previous_date = times[0]
#
# i = 0
# for i in range(size_limit):
# if i >= n_samples:
# break
# if (times[i] - previous_date) / 60 > time_gap:
# break
# previous_date = times[i]
#
# return i
# @jit(nopython=True)
def compute_position_angle_std(dat, radius):
angles_stds = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
angles_sum = []
for j in range(n_samples):
p1 = (data[j][0], data[j][1])
alpha = bearing(p1, center) class NoTimeInformation(Exception):
angles_sum.append(alpha) pass
angles_stds[i] = calc_std_dev(angles_sum)
return angles_stds
@jit(nopython=True) @jit(nopython=True)
def compute_position_angle_mean(dat, radius): def compute_trajectory(times, time_gap):
angles_means = np.empty(dat.shape[0]) n_samples = len(times)
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
cos = sin = 0
for j in range(n_samples):
p1 = (data[j][0], data[j][1])
alpha = bearing(p1, center)
cos += np.cos(np.radians(alpha)) previous_date = times[0]
sin += np.sin(np.radians(alpha))
angles_means[i] = np.arctan2(sin, cos) i = 0
return angles_means while i < n_samples:
if (times[i] - previous_date) > time_gap:
break
def compute_position_dist_mean(dat, radius): previous_date = times[i]
dist_means = np.empty(dat.shape[0]) i += 1
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
dist_sum = 0 return i
for j in range(n_samples - 1):
p1 = (data[j][0], data[j][1])
dist_sum += great_circle(p1[0], center[0], p1[1], center[1])
dist_means[i] = dist_sum / (n_samples - 1)
return dist_means
def compute_position_dist_std(dat, radius):
dist_means = np.empty(dat.shape[0])
for i in range(radius, dat.shape[0] - radius):
data = dat[i - radius:i + radius]
n_samples = len(data)
center = (data[:, 0].mean(), data[:, 1].mean())
dist_sum = []
for j in range(n_samples - 1):
p1 = (data[j][0], data[j][1])
dist_sum.append(great_circle(p1[0], center[0], p1[1], center[1]))
dist_means[i] = np.std(dist_sum)
return dist_means
# @jit(nopython=True)
# def compute_position_angle_std(dat, radius):
# angles_stds = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# angles_sum = []
# for j in range(n_samples):
# p1 = (data[j][0], data[j][1])
#
# alpha = bearing(p1, center)
# angles_sum.append(alpha)
#
# angles_stds[i] = calc_std_dev(angles_sum)
# return angles_stds
#
#
# @jit(nopython=True)
# def compute_position_angle_mean(dat, radius):
# angles_means = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# cos = sin = 0
# for j in range(n_samples):
# p1 = (data[j][0], data[j][1])
#
# alpha = bearing(p1, center)
#
# cos += np.cos(np.radians(alpha))
# sin += np.sin(np.radians(alpha))
#
# angles_means[i] = np.arctan2(sin, cos)
# return angles_means
#
#
# def compute_position_dist_mean(dat, radius):
# dist_means = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# dist_sum = 0
# for j in range(n_samples - 1):
# p1 = (data[j][0], data[j][1])
#
# dist_sum += great_circle(p1[0], center[0], p1[1], center[1])
#
# dist_means[i] = dist_sum / (n_samples - 1)
# return dist_means
#
#
# def compute_position_dist_std(dat, radius):
# dist_means = np.empty(dat.shape[0])
# for i in range(radius, dat.shape[0] - radius):
# data = dat[i - radius:i + radius]
# n_samples = len(data)
# center = (data[:, 0].mean(), data[:, 1].mean())
#
# dist_sum = []
# for j in range(n_samples - 1):
# p1 = (data[j][0], data[j][1])
#
# dist_sum.append(great_circle(p1[0], center[0], p1[1], center[1]))
#
# dist_means[i] = np.std(dist_sum)
# return dist_means
#
#
# def l1_angle(dat, radius): # def l1_angle(dat, radius):
...@@ -243,7 +245,24 @@ class AISTrajectory(AISPoints): ...@@ -243,7 +245,24 @@ class AISTrajectory(AISPoints):
return {"type": "LineString", "coordinates": coordinates} return {"type": "LineString", "coordinates": coordinates}
# def split_trajectory(self, time_gap=600):
if 'ts_sec' not in self.df:
raise NoTimeInformation()
n_sample = len(self.df.index)
result = []
work_df = self.df.copy()
index = 0
while index < n_sample:
i = compute_trajectory(self.df['ts_sec'][index:].to_numpy(), time_gap)
trajectory = AISTrajectory(work_df[:i])
result.append(trajectory)
work_df = work_df[i:]
index += i
return result
# def compute_angle_l1(self, radius): # def compute_angle_l1(self, radius):
# dat = self.df['angles_diff'].to_numpy() # dat = self.df['angles_diff'].to_numpy()
# l1 = l1_angle(dat, radius) # l1 = l1_angle(dat, radius)
......
...@@ -182,6 +182,37 @@ class TestAISTrajectory(unittest.TestCase): ...@@ -182,6 +182,37 @@ class TestAISTrajectory(unittest.TestCase):
pd.testing.assert_frame_equal(trajectory.df, expected) pd.testing.assert_frame_equal(trajectory.df, expected)
def test_split_trajectory_simple(self):
trajectory = AISTrajectory(
pd.DataFrame(
{
"ts_sec": [i for i in range(0, 3001, 600)] + [i for i in range(4001, 7001, 600)],
"label": [0 for _ in range(0, 3001, 600)] + [1 for _ in range(4001, 7001, 600)]
}
)
)
expected = [
AISTrajectory(
pd.DataFrame(
{
"ts_sec": [i for i in range(0, 3001, 600)],
"label": [0 for _ in range(0, 3001, 600)]
}
)
),
AISTrajectory(
pd.DataFrame(
{
"ts_sec": [i for i in range(4001, 7001, 600)],
"label": [1 for i in range(4001, 7001, 600)]
}
)
)
]
for e, r in zip(expected, trajectory.split_trajectory(800)):
pd.testing.assert_frame_equal(e.df.reset_index(drop=True), r.df.reset_index(drop=True))
# def test_compute_angle_l1(self): # def test_compute_angle_l1(self):
# trajectory = AISTrajectory( # trajectory = AISTrajectory(
# pd.DataFrame( # pd.DataFrame(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment