Skip to content
Snippets Groups Projects
Select Git revision
  • 95d9ba69a6a00a3e820a92a304d3466f8a86f343
  • main default protected
2 results

devs-dclitest.zip

Blame
  • ais_trajectory.py 9.12 KiB
    import random
    
    import pandas as pd
    import numpy as np
    from numba import jit
    from scipy.interpolate import interp1d
    
    from skais.utils.geography import great_circle, position_from_distance
    from skais.ais.ais_points import AISPoints
    
    
    @jit(nopython=True)
    def compute_trajectory(times, time_gap):
        """
        Return the length of the leading run of ``times`` without a large gap.

        The samples are scanned from the start. The scan stops at the first
        sample whose difference to its predecessor is not ``<= time_gap`` and
        the index of that sample is returned.

        :param times: indexable sequence of timestamps
        :param time_gap: largest accepted difference between two consecutive
            timestamps
        :return: 0 for an empty input, ``len(times)`` when no gap is found,
            otherwise the index of the first sample that follows a gap
        """
        total = len(times)
        if total == 0:
            return 0

        last_seen = times[0]
        for position in range(total):
            # Negated "<=" (rather than ">") so that the stop condition stays
            # identical for values that do not compare, such as NaN.
            if not (times[position] - last_seen <= time_gap):
                return position
            last_seen = times[position]

        return total
    
    
    def apply_func_on_window(dat, func, radius, on_edge='copy'):
        """
        Apply ``func`` to a centred window sliding along the first axis of ``dat``.

        For every index ``i`` the window holds the samples ``i - radius`` to
        ``i + radius`` (inclusive) and ``func(window)`` is stored at position
        ``i`` of the result.

        :param dat: numpy array; the window slides along axis 0
        :param func: callable receiving the window (an array) and returning a
            value that can be stored in a float array
        :param radius: number of samples taken on each side of the centre
        :param on_edge: ``'copy'`` pads the data by repeating the first and last
            samples so that every window has ``2 * radius + 1`` samples;
            ``'ignore'`` truncates the windows that overlap the borders
        :return: 1-D float array with ``dat.shape[0]`` entries
        :raises ValueError: if ``on_edge`` is neither ``'copy'`` nor ``'ignore'``
        """
        # Validate the mode up front so a bad argument fails before any work is done.
        if on_edge not in ('copy', 'ignore'):
            raise ValueError(
                "unknown on_edge mode {!r}; expected 'copy' or 'ignore'".format(on_edge))

        n_samples = dat.shape[0]
        result = np.zeros(n_samples)
        if n_samples == 0:
            # Nothing to pad or to slide over: there is no edge sample to replicate.
            return result

        if on_edge == 'copy':
            # Replicate the edge samples along axis 0 only; other axes are left untouched.
            pad_width = [(radius, radius)] + [(0, 0)] * (dat.ndim - 1)
            padded = np.pad(dat, pad_width, mode='edge')
            window_size = 2 * radius + 1
            for i in range(n_samples):
                # padded[i + radius] is dat[i], so this window is centred on dat[i].
                result[i] = func(padded[i:i + window_size])
        else:
            for i in range(n_samples):
                lower_bound = max(0, i - radius)
                upper_bound = min(n_samples, i + radius + 1)
                result[i] = func(dat[lower_bound:upper_bound])

        return result
    
    
    def apply_time_sequence(dat, time, func):
        """
        Evaluate ``func`` on every pair of consecutive samples.

        ``func`` is called as ``func(previous_value, current_value,
        previous_time, current_time)``. The first sample has no predecessor, so
        its entry is computed from the pair (sample 0, sample 1); it therefore
        receives the same arguments as the entry of sample 1.

        At least two samples are required: shorter inputs fail when the first
        pair is indexed.

        :param dat: numpy array of values, indexed along axis 0
        :param time: timestamps aligned with ``dat``
        :param func: callable taking ``(value_a, value_b, time_a, time_b)``
        :return: 1-D float array with ``dat.shape[0]`` entries
        """
        n_points = dat.shape[0]
        output = np.empty(n_points)

        def on_pair(first, second):
            # Evaluate func on the samples found at the two given positions.
            return func(dat[first], dat[second], time[first], time[second])

        # Entry 0 borrows the forward pair since nothing precedes it.
        output[0] = on_pair(0, 1)
        for idx in range(1, n_points):
            output[idx] = on_pair(idx - 1, idx)

        return output
    
    
    class AISTrajectory(AISPoints):
        """
        Sequence of AIS points ordered by their ``ts_sec`` timestamp.

        The points are held in ``self.df`` (a pandas DataFrame handed to
        ``AISPoints``) and ``self.mmsi`` stores the identifier given at
        construction.
        """

        def __init__(self, df, mmsi=0, interpolation_time=None):
            """
            Build a trajectory from a DataFrame of points.

            Rows with a duplicated ``ts_sec`` are dropped and the remaining rows
            are sorted by ``ts_sec``. Negative ``sog`` values are replaced by 0.

            :param df: DataFrame with at least a ``ts_sec`` column
            :param mmsi: identifier stored on the instance
            :param interpolation_time: when truthy and the frame has more than 4
                rows, the trajectory is resampled every ``interpolation_time``
                time units: float columns use cubic interpolation, discrete
                columns use the nearest sample and are cast to ``int``. Columns
                outside these two lists are dropped by the resampling.
            """
            df = df.drop_duplicates(subset=['ts_sec'])
            df = df.sort_values(by=['ts_sec'])
            self.mmsi = mmsi
            if interpolation_time and len(df.index) > 4:
                float_columns = ['longitude', 'latitude', 'cog', 'heading', 'rot', 'sog', 'diff']
                discrete_columns = ['navstatus', 'label']
                new_df = pd.DataFrame()
                t_raw = df['ts_sec'].to_numpy()
                t_interp1d = np.arange(start=t_raw[0], stop=t_raw[-1] + 1,
                                       step=interpolation_time)
                # With non-integer timestamps the grid can step past the last
                # sample; interp1d rejects such points, so they are discarded.
                t_interp1d = t_interp1d[t_interp1d <= t_raw[-1]]

                new_df['ts_sec'] = t_interp1d

                for column in float_columns:
                    if column in df.columns:
                        new_df[column] = interp1d(x=t_raw,
                                                  y=df[column].to_numpy(),
                                                  kind='cubic')(t_interp1d)

                for column in discrete_columns:
                    if column in df.columns:
                        new_df[column] = interp1d(x=t_raw,
                                                  y=df[column].to_numpy(),
                                                  kind='nearest', axis=0)(t_interp1d).astype(int)

                df = new_df
            if 'sog' in df.columns:
                # Cubic interpolation can undershoot below zero.
                df.loc[df['sog'] < 0, 'sog'] = 0
            super().__init__(df)

        def sliding_window(self, size=10, offset=1, fields=None):
            """
            Cut the trajectory into overlapping windows of ``size`` points.

            :param size: number of points per window
            :param offset: number of points between the starts of two
                consecutive windows
            :param fields: columns to keep (all columns when falsy)
            :return: list of arrays sliced from ``self.to_numpy(fields)``; empty
                when the trajectory has fewer than ``size`` points
            :raises ValueError: if ``offset`` is smaller than 1
            """
            if offset < 1:
                # A non-positive offset would never advance the window.
                raise ValueError("offset must be a positive integer")

            n_points = len(self.df.index)
            if n_points < size:
                return []

            arr = self.to_numpy(fields)
            return [arr[start:start + size]
                    for start in range(0, n_points - size + 1, offset)]

        def apply_func_on_time_window(self, func, radius, column, new_column=None, on_edge='copy'):
            """
            Apply ``func`` on a sliding window over ``column``.

            The work is delegated to ``apply_func_on_window``. The result is
            written to ``new_column``, or back into ``column`` when
            ``new_column`` is None.
            """
            dat = self.df[column].to_numpy()
            result = apply_func_on_window(dat, func, radius, on_edge)

            target = column if new_column is None else new_column
            self.df[target] = result

        # TODO rename function/simplify
        def apply_func_on_time_sequence(self, func, column, new_column=None):
            """
            Apply ``func`` on consecutive samples of ``column`` and their ``ts_sec``.

            The work is delegated to ``apply_time_sequence``. The result is
            written to ``new_column``, or back into ``column`` when
            ``new_column`` is None.
            """
            dat = self.df[column].to_numpy()
            time = self.df['ts_sec'].to_numpy()

            result = apply_time_sequence(dat, time, func)

            target = column if new_column is None else new_column
            self.df[target] = result

        def apply_func_on_points(self, func, column, new_column=None):
            """
            Apply ``func`` independently to every value of ``column``.

            The result is written to ``new_column``, or back into ``column``
            when ``new_column`` is None.
            """
            dat = self.df[column].to_numpy()

            result = np.array(list(map(func, dat)))

            target = column if new_column is None else new_column
            self.df[target] = result

        def to_numpy(self, fields=None):
            """
            Return the points as a numpy array.

            :param fields: columns to export (all columns when falsy)
            :return: ``np.squeeze`` of the selected frame. NOTE: squeeze removes
                every axis of length 1, so a single-column or a single-row
                selection loses that axis.
            """
            if fields:
                df = self.df[fields]
            else:
                df = self.df

            return np.squeeze(df.to_numpy())

        def to_geojson(self):
            """
            Return the positions as a GeoJSON-style ``LineString`` dictionary.

            Every coordinate is a ``[longitude, latitude]`` pair, in row order.
            """
            coordinates = self.df[['longitude', 'latitude']].to_numpy().tolist()

            return {"type": "LineString", "coordinates": coordinates}

        def split_trajectory(self, time_gap=600, interpolation=None):
            """
            Split the trajectory wherever two consecutive points are too far apart in time.

            :param time_gap: largest ``ts_sec`` difference tolerated inside one
                sub-trajectory
            :param interpolation: forwarded as ``interpolation_time`` to every
                sub-trajectory
            :return: list of ``AISTrajectory`` carrying the ``mmsi`` of this
                trajectory, in chronological row order
            """
            # Extracted once: rebuilding the array at every iteration is quadratic.
            times = self.df['ts_sec'].to_numpy()
            n_sample = len(times)
            result = []

            index = 0
            while index < n_sample:
                # compute_trajectory returns 0 when even the first sample is
                # rejected (negative time_gap, NaN timestamp); take at least one
                # point so that the loop always progresses.
                length = max(1, compute_trajectory(times[index:], time_gap))
                chunk = self.df.iloc[index:index + length]
                result.append(AISTrajectory(chunk, mmsi=self.mmsi, interpolation_time=interpolation))
                index += length

            return result

        def shift_trajectory_to_coordinates(self, target_coordinate=(0, 0), point_index=None, in_place=False):
            """
            Translate the trajectory so that one of its points lands on ``target_coordinate``.

            The chosen point is moved to the target. Every other point is then
            rebuilt step by step from its already-placed neighbour, using the
            signed latitude and longitude distances that separate the two
            original points.

            :param target_coordinate: ``(latitude, longitude)`` of the anchor
            :param point_index: position of the anchor point; drawn at random
                when None
            :param in_place: when True ``self.df`` is replaced and ``self`` is
                returned, otherwise a new ``AISTrajectory`` with the same
                ``mmsi`` is returned
            :return: the shifted trajectory
            """
            n_points = len(self.df.index)
            if point_index is None:
                point_index = random.randint(0, n_points - 1)

            # Original positions are only read; the shifted ones are written in
            # separate arrays and stored as whole columns. Writing through
            # ``frame[col].iat[...]`` is chained assignment and may leave the
            # frame unchanged.
            latitudes = self.df['latitude'].to_numpy()
            longitudes = self.df['longitude'].to_numpy()
            new_latitudes = latitudes.astype(float)
            new_longitudes = longitudes.astype(float)

            new_latitudes[point_index] = target_coordinate[0]
            new_longitudes[point_index] = target_coordinate[1]

            # First walk from the anchor towards the start, then towards the end.
            walks = ((-1, range(point_index, 0, -1)),
                     (1, range(point_index, n_points - 1)))
            for step, indices in walks:
                new_point = target_coordinate
                for i in indices:
                    neighbour = i + step
                    # Distance along each axis, obtained by varying one coordinate at a time.
                    lat_dist = great_circle(latitudes[i], latitudes[neighbour], longitudes[i], longitudes[i])
                    long_dist = great_circle(latitudes[i], latitudes[i], longitudes[i], longitudes[neighbour])

                    # The distances are made negative when the neighbour has the smaller coordinate.
                    if latitudes[i] > latitudes[neighbour]:
                        lat_dist *= -1

                    if longitudes[i] > longitudes[neighbour]:
                        long_dist *= -1

                    new_point = position_from_distance(new_point, (lat_dist, long_dist))

                    new_latitudes[neighbour] = new_point[0]
                    new_longitudes[neighbour] = new_point[1]

            new_df = self.df.copy()
            new_df['latitude'] = new_latitudes
            new_df['longitude'] = new_longitudes

            if in_place:
                self.df = new_df
                return self
            else:
                return AISTrajectory(new_df, mmsi=self.mmsi)

        def get_time_per_label_shift(self, label_column='label'):
            """
            List the timestamps at which the label changes.

            :param label_column: name of the column holding the labels
            :return: list of ``(ts_sec, label)`` tuples: one for the first row,
                then one for every row whose label differs from the previous row
            """
            result = []
            started = False
            current_label = None
            timestamps = self.df['ts_sec'].to_numpy()
            labels = self.df[label_column].to_numpy()
            for timestamp, label in zip(timestamps, labels):
                # An explicit flag (instead of a magic initial label) guarantees
                # the first row is reported whatever its label is.
                if not started or current_label != label:
                    started = True
                    current_label = label
                    result.append((timestamp, current_label))
            return result

        def generate_array_from_positions(self, height=256, width=256, link=True, bounding_box='fit', features=None, node_size=0):
            """
            Rasterise the positions into a ``(height, width, channels)`` uint8 array.

            The bounding box of the positions is stretched over the whole image.
            Rows follow latitude (largest latitude towards row 0), columns follow
            longitude. Every position sets a square of half-width ``node_size``
            around its pixel to 1 in all channels. A degenerate axis (all
            positions sharing one latitude or one longitude) is mapped like the
            minimum of a regular axis.

            :param height: number of rows
            :param width: number of columns
            :param link: must be False; linking points is not implemented and the
                default value raises
            :param bounding_box: only ``'fit'`` is implemented
            :param features: only its length is used, as the number of channels
                (1 channel when None)
            :param node_size: number of extra pixels drawn on each side of a position
            :return: the image; all zeros when the trajectory is empty
            :raises ValueError: if ``link`` is truthy or ``bounding_box`` is not ``'fit'``
            """
            nb_channels = 1 if features is None else len(features)
            data = np.zeros((height, width, nb_channels), dtype=np.uint8)
            if link:
                raise ValueError("feature not implemented")
            if bounding_box != 'fit':
                raise ValueError("feature not implemented")

            positions = self.df[['longitude', 'latitude']].to_numpy()
            if len(positions) == 0:
                return data

            min_longitude, min_latitude = positions.min(axis=0)
            max_longitude, max_latitude = positions.max(axis=0)
            span_longitude = max_longitude - min_longitude
            span_latitude = max_latitude - min_latitude

            for longitude, latitude in positions:
                # A zero span would divide by zero; such an axis collapses on offset 0.
                if span_latitude:
                    row_offset = int(height * (latitude - min_latitude) / span_latitude)
                else:
                    row_offset = 0
                if span_longitude:
                    col_offset = int((width - 1) * (longitude - min_longitude) / span_longitude)
                else:
                    col_offset = 0

                x_coord = max(min(height - row_offset - 1, height - 1), 0)
                y_coord = max(min(col_offset, width - 1), 0)

                x_lower_bound = max(0, x_coord - node_size)
                x_upper_bound = min(height - 1, x_coord + node_size)

                y_lower_bound = max(0, y_coord - node_size)
                y_upper_bound = min(width - 1, y_coord + node_size)

                data[x_lower_bound:x_upper_bound + 1, y_lower_bound:y_upper_bound + 1] = 1
            return data