# Select Git revision
# multiview_platform.tests.test_metrics.doctree
# ais_trajectory.py 9.12 KiB
import random
import pandas as pd
import numpy as np
from numba import jit
from scipy.interpolate import interp1d
from skais.utils.geography import great_circle, position_from_distance
from skais.ais.ais_points import AISPoints
@jit(nopython=True)
def compute_trajectory(times, time_gap):
    """Return the length of the leading run of ``times`` without a large gap.

    The samples are scanned from the start; the scan stops at the first
    sample whose difference with the sample right before it is not
    ``<= time_gap``.

    :param times: indexable sequence of numeric timestamps.
    :param time_gap: largest difference tolerated between two consecutive
        samples of the same run.
    :return: index of the first sample that breaks the run, ``len(times)``
        when no sample does, and ``0`` when ``times`` is empty.
    """
    total = len(times)
    if total == 0:
        return 0

    last_seen = times[0]
    for position in range(total):
        # Spelled as "not <=" so that an incomparable difference (NaN) also
        # ends the run, exactly like a gap that is too large.
        if not (times[position] - last_seen <= time_gap):
            return position
        last_seen = times[position]
    return total
def apply_func_on_window(dat, func, radius, on_edge='copy'):
    """Apply ``func`` to a sliding window centred on every sample of ``dat``.

    The window around sample ``i`` covers ``dat[i - radius:i + radius + 1]``
    along the first axis.

    :param dat: numpy array; windows are taken along axis 0.
    :param func: callable receiving one window (a slice of ``dat``) and
        returning a scalar.
    :param radius: number of samples taken on each side of the centre.
    :param on_edge: ``'copy'`` pads both ends by repeating the first and the
        last sample, so every window holds ``2 * radius + 1`` samples;
        ``'ignore'`` shortens the windows that would run past either end.
    :return: 1-D float array holding one value per sample of ``dat``; empty
        when ``dat`` is empty.
    :raises ValueError: if ``on_edge`` is neither ``'copy'`` nor ``'ignore'``.
    """
    if on_edge not in ('copy', 'ignore'):
        raise ValueError("on_edge must be 'copy' or 'ignore', got %r" % (on_edge,))

    n_samples = dat.shape[0]
    result = np.zeros(n_samples)
    if n_samples == 0:
        # Nothing to pad from and nothing to compute.
        return result

    if on_edge == 'copy':
        # Pad only the first axis; mode='edge' repeats the first/last sample.
        pad_width = [(radius, radius)] + [(0, 0)] * (dat.ndim - 1)
        padded = np.pad(dat, pad_width, mode='edge')
        for i in range(n_samples):
            # Sample i of dat sits at index i + radius of the padded array.
            result[i] = func(padded[i:i + 2 * radius + 1])
    else:
        for i in range(n_samples):
            lower_bound = max(0, i - radius)
            upper_bound = min(n_samples, i + radius + 1)
            result[i] = func(dat[lower_bound:upper_bound])
    return result
def apply_time_sequence(dat, time, func):
    """Apply ``func`` to every pair of consecutive samples and their times.

    For ``i >= 1`` the output is ``func(dat[i - 1], dat[i], time[i - 1],
    time[i])``. The first sample has no predecessor, so its value is computed
    from the first pair as well (same arguments as for ``i == 1``).

    :param dat: numpy array of samples, iterated along axis 0.
    :param time: sequence of timestamps aligned with ``dat``.
    :param func: callable ``func(previous, current, previous_time,
        current_time)`` returning a scalar.
    :return: 1-D float array with one value per sample; empty when ``dat``
        is empty.
    :raises IndexError: if ``dat`` holds exactly one sample (no pair exists).
    """
    n_samples = dat.shape[0]
    result = np.empty(n_samples)
    if n_samples == 0:
        return result

    result[0] = func(dat[0], dat[1], time[0], time[1])
    for i in range(1, n_samples):
        result[i] = func(dat[i - 1], dat[i], time[i - 1], time[i])
    return result
class AISTrajectory(AISPoints):
    """Time-ordered set of AIS points stored in a pandas DataFrame.

    The constructor removes rows with a duplicated ``ts_sec``, sorts the rows
    by ``ts_sec`` and optionally resamples them on a regular time grid before
    handing the frame over to :class:`AISPoints`.
    """

    def __init__(self, df, mmsi=0, interpolation_time=None):
        """Build a trajectory from ``df``.

        :param df: DataFrame with at least a ``ts_sec`` column.
        :param mmsi: identifier stored as ``self.mmsi``.
        :param interpolation_time: step of the regular ``ts_sec`` grid to
            resample on. A falsy value disables resampling, which is also
            skipped unless more than four distinct timestamps are available.
        """
        df = df.drop_duplicates(subset=['ts_sec'])
        df = df.sort_values(by=['ts_sec'])
        self.mmsi = mmsi

        if interpolation_time and len(df.index) > 4:
            float_columns = ['longitude', 'latitude', 'cog', 'heading', 'rot', 'sog', 'diff']
            discrete_columns = ['navstatus', 'label']

            t_raw = df['ts_sec'].to_numpy()
            # "+ 1" keeps the last raw timestamp inside the half-open arange.
            t_interp1d = np.arange(start=t_raw[0], stop=t_raw[-1] + 1,
                                   step=interpolation_time)

            new_df = pd.DataFrame()
            new_df['ts_sec'] = t_interp1d
            for column in float_columns:
                if column in df.columns:
                    new_df[column] = interp1d(x=t_raw,
                                              y=df[column].to_numpy(),
                                              kind='cubic')(t_interp1d)
            for column in discrete_columns:
                if column in df.columns:
                    # Discrete values must not be blended: take the nearest one.
                    new_df[column] = interp1d(x=t_raw,
                                              y=df[column].to_numpy(),
                                              kind='nearest', axis=0)(t_interp1d).astype(int)
            df = new_df

        if 'sog' in df.columns:
            # Negative 'sog' values (cubic interpolation can overshoot below
            # zero) are clamped to 0.
            df.loc[df['sog'] < 0, 'sog'] = 0
        AISPoints.__init__(self, df)

    def _store_column(self, column, new_column, values):
        """Write ``values`` to ``new_column``, or over ``column`` when it is None."""
        target = column if new_column is None else new_column
        self.df[target] = values

    def sliding_window(self, size=10, offset=1, fields=None):
        """Cut the trajectory into windows of ``size`` consecutive rows.

        :param size: number of rows in every window.
        :param offset: number of rows between the starts of two successive
            windows; must be strictly positive.
        :param fields: columns to keep (see :meth:`to_numpy`); all when falsy.
        :return: list of array slices, empty when the trajectory has fewer
            than ``size`` rows.
        :raises ValueError: if ``offset`` is not strictly positive (the
            window start would never advance).
        """
        n_rows = len(self.df.index)
        if n_rows < size:
            return []
        if offset <= 0:
            raise ValueError("offset must be a positive integer")

        # NOTE: to_numpy() squeezes length-1 axes, so a single selected column
        # yields 1-D windows.
        arr = self.to_numpy(fields)
        return [arr[start:start + size]
                for start in range(0, n_rows - size + 1, offset)]

    def apply_func_on_time_window(self, func, radius, column, new_column=None, on_edge='copy'):
        """Apply ``func`` on a sliding window over ``column``.

        :param func: callable receiving one window and returning a scalar.
        :param radius: number of rows on each side of the window centre.
        :param column: name of the column to read.
        :param new_column: name of the column to write; ``column`` is
            overwritten when None.
        :param on_edge: edge policy forwarded to ``apply_func_on_window``.
        """
        dat = self.df[column].to_numpy()
        result = apply_func_on_window(dat, func, radius, on_edge)
        self._store_column(column, new_column, result)

    # TODO rename function/simplify
    def apply_func_on_time_sequence(self, func, column, new_column=None):
        """Apply ``func`` on consecutive rows of ``column`` and their ``ts_sec``.

        :param func: callable ``func(previous, current, previous_time,
            current_time)`` returning a scalar.
        :param column: name of the column to read.
        :param new_column: name of the column to write; ``column`` is
            overwritten when None.
        """
        dat = self.df[column].to_numpy()
        time = self.df['ts_sec'].to_numpy()
        result = apply_time_sequence(dat, time, func)
        self._store_column(column, new_column, result)

    def apply_func_on_points(self, func, column, new_column=None):
        """Apply ``func`` independently to every value of ``column``.

        :param func: callable taking one value of the column.
        :param column: name of the column to read.
        :param new_column: name of the column to write; ``column`` is
            overwritten when None.
        """
        dat = self.df[column].to_numpy()
        result = np.array([func(value) for value in dat])
        self._store_column(column, new_column, result)

    def to_numpy(self, fields=None):
        """Return the trajectory as a numpy array with length-1 axes removed.

        :param fields: columns to keep; every column when falsy.
        """
        df = self.df[fields] if fields else self.df
        return np.squeeze(df.to_numpy())

    def to_geojson(self):
        """Return the positions as a GeoJSON-style ``LineString`` dict.

        Coordinates are ``[longitude, latitude]`` pairs in row order.
        """
        coordinates = self.df[['longitude', 'latitude']].to_numpy().tolist()
        return {"type": "LineString", "coordinates": coordinates}

    def split_trajectory(self, time_gap=600, interpolation=None):
        """Split the trajectory wherever two consecutive rows are too far apart in time.

        :param time_gap: largest ``ts_sec`` difference allowed between two
            consecutive rows of the same sub-trajectory.
        :param interpolation: ``interpolation_time`` given to every
            sub-trajectory.
        :return: list of :class:`AISTrajectory`, in row order, each carrying
            this trajectory's ``mmsi``.
        """
        timestamps = self.df['ts_sec'].to_numpy()
        n_sample = len(timestamps)
        result = []
        start = 0
        while start < n_sample:
            length = compute_trajectory(timestamps[start:], time_gap)
            # compute_trajectory returns 0 when even the first sample fails
            # its own test (negative gap, NaN timestamp); always consume at
            # least one row so the loop is guaranteed to terminate.
            length = max(int(length), 1)
            # Keyword arguments matter here: the second positional parameter
            # of the constructor is mmsi, not the interpolation step.
            result.append(AISTrajectory(self.df.iloc[start:start + length],
                                        mmsi=self.mmsi,
                                        interpolation_time=interpolation))
            start += length
        return result

    @staticmethod
    def _signed_offsets(lat_from, lon_from, lat_to, lon_to):
        """Return the signed (latitude, longitude) distances between two points.

        Each component is a ``great_circle`` distance measured along one
        coordinate only, keeping the other coordinate of the starting point
        fixed. A component is negated when the starting value is greater than
        the destination value.
        """
        lat_dist = great_circle(lat_from, lat_to, lon_from, lon_from)
        long_dist = great_circle(lat_from, lat_from, lon_from, lon_to)
        if lat_from > lat_to:
            lat_dist = -lat_dist
        if lon_from > lon_to:
            long_dist = -long_dist
        return lat_dist, long_dist

    def shift_trajectory_to_coordinates(self, target_coordinate=(0, 0), point_index=None, in_place=False):
        """Move the trajectory so that one of its points lands on ``target_coordinate``.

        The selected point is placed on the target; every other point is then
        rebuilt step by step from its neighbour, using the signed distances
        between the two original positions and ``position_from_distance``.

        :param target_coordinate: ``(latitude, longitude)`` of the anchor.
        :param point_index: positional index of the row placed on the target;
            drawn at random when None.
        :param in_place: when True, replace ``self.df`` and return ``self``;
            otherwise return a new :class:`AISTrajectory` with the same mmsi.
        :return: the shifted trajectory.
        """
        n_points = len(self.df.index)
        if point_index is None:
            point_index = random.randint(0, n_points - 1)

        latitudes = self.df['latitude'].to_numpy()
        longitudes = self.df['longitude'].to_numpy()
        # Work on float copies and assign whole columns at the end: writing
        # through df['col'].iat[...] is chained assignment, which does not
        # reach the frame under pandas Copy-on-Write.
        new_latitudes = np.array(latitudes, dtype=float)
        new_longitudes = np.array(longitudes, dtype=float)
        new_latitudes[point_index] = target_coordinate[0]
        new_longitudes[point_index] = target_coordinate[1]

        # Walk away from the anchor, first backwards then forwards.
        walks = (
            (-1, range(point_index - 1, -1, -1)),
            (1, range(point_index + 1, n_points)),
        )
        for step, neighbours in walks:
            new_point = target_coordinate
            for neighbour in neighbours:
                current = neighbour - step
                offsets = self._signed_offsets(latitudes[current], longitudes[current],
                                               latitudes[neighbour], longitudes[neighbour])
                new_point = position_from_distance(new_point, offsets)
                new_latitudes[neighbour] = new_point[0]
                new_longitudes[neighbour] = new_point[1]

        new_df = self.df.copy()
        new_df['latitude'] = new_latitudes
        new_df['longitude'] = new_longitudes

        if in_place:
            self.df = new_df
            return self
        return AISTrajectory(new_df, mmsi=self.mmsi)

    def get_time_per_label_shift(self, label_column='label'):
        """List the timestamps at which the label changes.

        :param label_column: name of the column holding the labels.
        :return: list of ``(ts_sec, label)`` tuples, one for every row whose
            label differs from the previous row's. The scan starts from the
            label ``-1``, so a leading run of ``-1`` labels is not reported.
        """
        timestamps = self.df['ts_sec'].to_numpy()
        labels = self.df[label_column].to_numpy()

        current_label = -1
        result = []
        for timestamp, label in zip(timestamps, labels):
            if current_label != label:
                current_label = label
                result.append((timestamp, current_label))
        return result

    def generate_array_from_positions(self, height=256, width=256, link=True, bounding_box='fit', features=None, node_size=0):
        """Rasterise the positions into a ``(height, width, channels)`` uint8 array.

        The bounding box of the positions is stretched over the whole array,
        with the highest latitude on row 0. Every position sets a square of
        side ``2 * node_size + 1`` pixels (clipped to the array) to 1 on all
        channels. When all positions share the same latitude (or longitude)
        that axis has no extent and the points are placed on its minimum
        edge. An empty trajectory yields an all-zero array.

        :param height: number of rows of the array.
        :param width: number of columns of the array.
        :param link: must be False; the default True is not implemented.
        :param bounding_box: must be ``'fit'``.
        :param features: only its length is used, as the number of channels
            (one channel when None).
        :param node_size: half-width, in pixels, of the square drawn around
            every position.
        :return: the ``numpy.uint8`` array.
        :raises ValueError: if ``link`` is truthy or ``bounding_box`` is not
            ``'fit'``.
        """
        nb_channels = 1 if features is None else len(features)
        data = np.zeros((height, width, nb_channels), dtype=np.uint8)

        if link:
            raise ValueError("feature not implemented")
        if bounding_box != 'fit':
            raise ValueError("feature not implemented")

        positions = self.df[['longitude', 'latitude']].to_numpy()
        if len(positions) == 0:
            return data

        lon_min, lat_min = positions.min(axis=0)
        lon_max, lat_max = positions.max(axis=0)
        lat_span = lat_max - lat_min
        lon_span = lon_max - lon_min

        for longitude, latitude in positions:
            # A zero span would divide by zero; pin that axis to its minimum.
            row_offset = int(height * (latitude - lat_min) / lat_span) if lat_span else 0
            col_offset = int((width - 1) * (longitude - lon_min) / lon_span) if lon_span else 0

            # Rows grow downwards while latitude grows upwards, hence the flip.
            x_coord = max(min(height - row_offset - 1, height - 1), 0)
            y_coord = max(min(col_offset, width - 1), 0)

            x_lower_bound = max(0, x_coord - node_size)
            x_upper_bound = min(height - 1, x_coord + node_size)
            y_lower_bound = max(0, y_coord - node_size)
            y_upper_bound = min(width - 1, y_coord + node_size)
            data[x_lower_bound:x_upper_bound + 1, y_lower_bound:y_upper_bound + 1] = 1
        return data