Skip to content
Snippets Groups Projects
Commit af593660 authored by Raphael's avatar Raphael
Browse files

data augmentation

parent 97f26b28
No related branches found
No related tags found
2 merge requests!12version 0.2a,!10Resolve "Image creation bugs with 0 size windows"
......@@ -4,4 +4,5 @@ numpy~=1.19.5
numba~=0.53.1
scipy~=1.5.4
hmmlearn~=0.2.6
scikit-learn~=1.0.1
\ No newline at end of file
scikit-learn~=1.0.1
tqdm~=4.62.3
\ No newline at end of file
......@@ -49,9 +49,10 @@ def apply_time_sequence(dat, time, func):
class AISTrajectory(AISPoints):
def __init__(self, df, interpolation_time=None):
def __init__(self, df, mmsi=0, interpolation_time=None):
df = df.drop_duplicates(subset=['ts_sec'])
df = df.sort_values(by=['ts_sec'])
self.mmsi = mmsi
if interpolation_time and len(df.index) > 4:
float_columns = ['longitude', 'latitude', 'cog', 'heading', 'rot', 'sog', 'diff']
......
......@@ -9,6 +9,6 @@ from skais.ais.ais_trajectory import AISTrajectory
def get_trajectories(ais_points):
trajectories = []
for mmsi in ais_points.df.mmsi.unique():
trajectories.append(AISTrajectory(ais_points.df[ais_points.df['mmsi'] == mmsi].reset_index(drop=True)))
trajectories.append(AISTrajectory(ais_points.df[ais_points.df['mmsi'] == mmsi].reset_index(drop=True),
mmsi=mmsi))
return trajectories
import tqdm as tqdm
from skais.process.data_augmentation.flip import Flip
from skais.process.data_augmentation.pipeline import Pipeline
from skais.process.data_augmentation.translator import Translator
class AugmentationEngine:
def __init__(self, translation_values, flip_values):
self.pipelines = []
for tv_long, tv_lat in translation_values:
self.pipelines.append(Pipeline([Translator(tv_long, tv_lat)]))
for fv_meridian, fv_parallel in flip_values:
self.pipelines.append(Pipeline([Flip(fv_meridian, fv_parallel)]))
for tv_long, tv_lat in translation_values:
translator = Translator(tv_long, tv_lat)
for fv_meridian, fv_parallel in flip_values:
flip = Flip(fv_meridian, fv_parallel)
self.pipelines.append(Pipeline([translator, flip]))
def transform(self, x, verbose=0):
results = x.copy()
iterator = self.pipelines
if verbose > 0:
iterator = tqdm.tqdm(self.pipelines)
for p in iterator:
results += p.transform(x)
return results
class DataAugmentor:
class DataTransformer:
def transform(self, X):
pass
from skais.ais.ais_trajectory import AISTrajectory
from skais.process.data_augmentation.data_augmentor import DataAugmentor
from skais.process.data_augmentation.data_transformer import DataTransformer
class Flip(DataAugmentor):
class Flip(DataTransformer):
def __init__(self, meridian=None, parallel=None):
self.meridian = meridian
self.parallel = parallel
def transform(self, X):
def transform(self, x):
result = []
for trajectory in X:
df = trajectory.df
df['latitude'] = -trajectory.df['latitude']
result.append(AISTrajectory(df))
if self.parallel is not None:
for trajectory in x:
df = trajectory.df.copy()
df['latitude'] = -trajectory.df['latitude']
result.append(AISTrajectory(df))
else:
result += x.copy()
return result
from skais.process.data_augmentation.data_transformer import DataTransformer
class Pipeline(DataTransformer):
def __init__(self, sequence):
for s in sequence:
assert (isinstance(s, DataTransformer))
self.sequence = sequence
def transform(self, x):
result = x.copy()
for aug in self.sequence:
result = aug.transform(result)
return result
from skais.ais.ais_trajectory import AISTrajectory
from skais.process.data_augmentation.data_augmentor import DataAugmentor
from skais.process.data_augmentation.data_transformer import DataTransformer
class Translator(DataAugmentor):
class Translator(DataTransformer):
def __init__(self, longitude, latitude):
self.longitude = longitude
self.latitude = latitude
def transform(self, X):
def transform(self, x):
result = []
for trajectory in X:
df = trajectory.df
for trajectory in x:
df = trajectory.df.copy()
df['longitude'] = trajectory.df['longitude'] + self.longitude
result.append(AISTrajectory(df))
return result
\ No newline at end of file
return result
......@@ -29,7 +29,7 @@ class TestFlip(unittest.TestCase):
self.trajectories = [t1, t2]
def test_flip_equator(self):
aug = Flip(0, None)
aug = Flip(None, 0)
result = aug.transform(self.trajectories)
......@@ -51,11 +51,38 @@ class TestFlip(unittest.TestCase):
}
)
)
expected = [t1,t2]
expected = [t1, t2]
self.assertEqual(len(expected), len(result))
for t1, t2 in zip(result, expected):
pd.testing.assert_frame_equal(t1.df, t2.df)
def test_invariance(self):
t1 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [45 + i for i in range(10)],
'longitude': [12 + i for i in range(10)]
}
)
)
t2 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [-12 + i for i in range(10)],
'longitude': [12 + i for i in range(10)]
}
)
)
expected = [t1, t2]
aug = Flip(0, None)
_ = aug.transform(self.trajectories)
for t1, t2 in zip(self.trajectories, expected):
pd.testing.assert_frame_equal(t1.df, t2.df)
if __name__ == '__main__':
unittest.main()
import unittest
from skais.ais.ais_trajectory import AISTrajectory
import pandas as pd
from skais.process.data_augmentation.translator import Translator
class TestTranslator(unittest.TestCase):
def setUp(self):
t1 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [0 for _ in range(10)],
'longitude': [12 + i for i in range(10)]
}
)
)
t2 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [-12 + i for i in range(10)],
'longitude': [12 + i for i in range(10)]
}
)
)
self.trajectories = [t1, t2]
def test_transform_longitude(self):
aug = Translator(1, 0)
result = aug.transform(self.trajectories)
t1 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [0 for _ in range(10)],
'longitude': [13 + i for i in range(10)]
}
)
)
t2 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [-12 + i for i in range(10)],
'longitude': [13 + i for i in range(10)]
}
)
)
expected = [t1, t2]
for t1, t2 in zip(result, expected):
pd.testing.assert_frame_equal(t1.df, t2.df)
def test_invariance(self):
t1 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [0 for _ in range(10)],
'longitude': [12 + i for i in range(10)]
}
)
)
t2 = AISTrajectory(
pd.DataFrame(
{
'ts_sec': [i for i in range(10)],
'latitude': [-12 + i for i in range(10)],
'longitude': [12 + i for i in range(10)]
}
)
)
expected = [t1, t2]
for t1, t2 in zip(self.trajectories, expected):
pd.testing.assert_frame_equal(t1.df, t2.df)
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment