get_train_annot.py
    """Converts Raven format dataframe annotations to YOLO format"""
    
    import argparse
    import random
    import os
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import soundfile as sf
    
    import cv2
    import sys
    import subprocess
    import utils
    
    from p_tqdm import p_map
    from tqdm import tqdm
    
    
    def process(entry, arguments, species_list):
        """
        Precess the annotation to get the .jpg spectrogram and the .txt annotation file
        :param x (tuple): Enumerate number, [filename, group] per file
        :param arguments (args): Arguments
        :param species_list (list): List with corresponding species
        """
        _, (filename, grp) = entry
    
        try:
            info = sf.info(filename)  # Collect recording metadata
            file_duration, fs = info.duration, info.samplerate
        except Exception as error:
            print(f'`{filename}` cannot be opened... : {error}')
            return
    
        # Create the list of all possible offsets at which to compute spectrograms
        offset_list = np.arange(
            0, file_duration, arguments.duration - arguments.overlap)
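        # e.g. with duration=8 and overlap=0 on a 30 s file, the offsets are 0, 8, 16, 24
        # Split annotations longer than one window so each piece fits in a single
        # spectrogram (assumed behaviour of utils.split_annotations)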
        new_pos = utils.split_annotations(
            grp[['start', 'stop']], arguments.duration)
        grp = pd.merge(grp, new_pos)
        while len(grp) != 0:
            # Collect all the annotations that start before the end of the current window
            if len(offset_list) >= 1:
                table = grp[grp.start <= offset_list[0] + arguments.duration]
            else:
                break  # No offset left to process: avoid looping forever on leftover rows
    
            # create an empty dataframe
            annotation = pd.DataFrame(columns=['id', 'x', 'y', 'width', 'height'])
    
            # if no data for this period, go to the next one
            if len(table) == 0:
                offset_list = offset_list[1:]
                continue
    
            offset = offset_list[0]  # take initial time for offset
    
            name = str(grp.iloc[0].Path.replace(
                '/', '_').replace('.', '_') + '_' + str(offset))
    
            sig, fs = sf.read(filename, start=int(
                offset*fs), stop=int((offset+arguments.duration)*fs), always_2d=True)  # Load the signal
            sig = sig[:, 0]  # Only take channel 0
            if arguments.rf is None:
                arguments.rf = fs
            # Apply resample and low/high pass filter
            sig = utils.signal_processing(
                sig, rf=arguments.rf, fs=fs, high=arguments.high, low=arguments.low)
            fig = utils.create_spectrogram(
                sig, arguments.directory, names=None, 
                window_size=arguments.window,
                overlap=arguments.hop)
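
            # Convert each annotation into a YOLO box, "class_id x_center y_center
            # width height", with every value normalised to [0, 1]: the time axis
            # over the window duration, the frequency axis over rf / 2. For example
            # (values assumed for illustration), a call at 2-4 s and 1-2 kHz in an
            # 8 s window with rf=8000 gives x=0.375, width=0.25, y=0.625, height=0.25.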
    
            for _, row in table.iterrows():
                specie = row.species
                # Normalised x centre of the box within the window
                x_pxl = (row.midl - offset) / arguments.duration
                # Normalised box width (annotation length over the window duration)
                width_pxl = (row.stop - row.start) / arguments.duration

                # Normalised y centre (0 at the top of the spectrogram)
                y_pxl = 1 - (row.midl_y / (arguments.rf / 2))
                # Normalised box height (bandwidth over the Nyquist frequency)
                height_pxl = (row.max_freq - row.min_freq) / (arguments.rf / 2)
                if height_pxl > 1:
                    # Clip boxes taller than the whole spectrogram
                    height_pxl = 1
                elif height_pxl > y_pxl * 2:
                    # The box would cross the top edge: shift its centre down so
                    # the top of the box sits at y = 0
                    y_pxl = y_pxl + 0.5 * (height_pxl - y_pxl * 2)
                # Store the annotation in a DataFrame
                new_table = pd.DataFrame([[str(species_list[species_list.species == specie].index[0]),
                                           x_pxl, y_pxl, width_pxl, height_pxl]],
                                         columns=['id', 'x', 'y', 'width', 'height'])
    
                annotation = annotation.dropna(axis=1, how='all')
                new_table = new_table.dropna(axis=1, how='all')                       
                annotation = pd.concat([annotation, new_table])
    
            grp = grp.drop(table.index)
    
            name_file = os.path.join(arguments.directory,
                                     'labels', f'{name}.txt')
            # Create all the output folders
            for folder in ['images', 'labels', 'images/all', 'annotated_images']:
                utils.create_directory(os.path.join(
                    arguments.directory, folder))
            # One folder per class id, matching the ids used when saving below
            for specie_num in species_list.index:
                utils.create_directory(os.path.join(
                    arguments.directory, 'images', str(specie_num)))
            # Save the image and the annotation; the per-class copy goes into the
            # folder of the last species annotated on this spectrogram
            plt.savefig(os.path.join(arguments.directory,
                        'images', str(species_list[species_list.species ==
                                      specie].index[0]), f'{name}.jpg'))
            annotation.to_csv(name_file, sep=' ', header=False, index=False)
            plt.savefig(os.path.join(arguments.directory, 'images', 'all',
                                     f'{name}.jpg'))
            plt.close()
    
            # Add annotation to the images in another folder
            image = cv2.imread(
                os.path.join(arguments.directory, 'images', 'all', f'{name}.jpg'))
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            # For each annotation in a spectrogram
            for num, annot in annotation.iterrows():
                shp1, shape1, shp4, shape4 = utils.get_box_shape(
                    [annot, num], image)
    
                text_shape = shp1[0], shp1[1] - 5
                label = annot['id']
    
                # Draw the bounding box, coloured by this annotation's class
                cv2.rectangle(image, pt1=shape1, pt2=shape4,
                              color=colors[int(label)],
                              thickness=1)
                # Filled rectangle behind the label text
                cv2.rectangle(
                    image, pt1=shp1, pt2=shp4,
                    color=colors[int(label)],
                    thickness=-1)
                # Add the label associated
                cv2.putText(image, label, text_shape,
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            plt.imshow(image)
            plt.subplots_adjust(top=1, bottom=0, left=0,
                                right=1)  # Remove the border
            plt.savefig(
                os.path.join(arguments.directory, 'annotated_images', f'{name}.jpg'))
            plt.close()
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                         description='Create a .txt and a .jpg file for each '
                                         'annotation from a csv')
        parser.add_argument('filename_path', type=str,
                            help='Path/name of the folder/file containing the annotations. If a '
                            'file, use the Raven format and add a Path column with the path to '
                            'the .wav files')
        parser.add_argument('path_to_data', type=utils.arg_directory,
                            help='Path of the folder that contains the recordings')
        parser.add_argument('directory', type=utils.arg_directory,
                            help='Directory to which spectrograms and .txt files will be stored')
        parser.add_argument('--duration', type=int,
                            help='Duration in seconds of each spectrogram', default=8)
        parser.add_argument('--overlap', type=int,
                            help='Overlap in seconds between 2 spectrograms', default=0)
        parser.add_argument(
                            '--rf', type=int, help='Resampling frequency in Hz', default=None)
        parser.add_argument(
                            '--window', type=int, help='Window size for the Fourier Transform', default=1024)
        parser.add_argument(
                            '--hop', type=float, help='Hop ratio within the window: 50%% = 0.5', default=.5)
        parser.add_argument(
                            '--cpu', type=int, help='Number of CPUs to speed up the process (2 or more)', default=1)
        parser.add_argument('--high', type=int,
                            help='High Pass Filter value in Hz', default=10)
        parser.add_argument('--low', type=int,
                            help='Low Pass Filter value in Hz', default=None)
        parser.add_argument('--test', action='store_const', const=1,
                            help='Split into train/test/val: (1 - ratio) / 2 for test and '
                            'the same for validation', default=None)
        args = parser.parse_args()
    
        # Load the data and put it into a DataFrame
        df = utils.open_file(args.filename_path)
        suffix = input('Which suffix for your recording data? [wav, WAV, Wav, flac, mp3] : ')
        if len(df.columns) == 1:
            final = []
            for file, _ in df.groupby('Path'):
                new_df = utils.open_file(file)
                if len(new_df) >= 1:
                    new_df['Path'] = os.path.join(
                        args.path_to_data, str(file.split('.Table')[0] + f'.{suffix}'))
                    final.append(new_df)
            df = pd.concat(final)
        elif 'Path' not in df.columns:
            df['Path'] = os.path.join(args.path_to_data, args.filename_path.split('/')[-1].split('.Table')[0]+f'.{suffix}')
        df, species = utils.prepare_dataframe(df, args)
    
        colors = [(random.randint(0, 255), random.randint(0, 255),
                   random.randint(0, 255)) for _ in range(len(species))]
    
        species.to_csv(os.path.join(
            args.directory, 'species_list.csv'), index=False)
    
        groups = df.groupby('Path')
        if args.cpu == 1:
            for entry in tqdm(enumerate(groups), total=len(groups),
                              desc="Processing", ascii='░▒▓█'):
                process(entry, args, species)
        else:
            # p_map needs one copy of args/species per group; broadcast them
            p_map(process, enumerate(groups), [args] * len(groups),
                  [species] * len(groups), num_cpus=args.cpu, total=len(groups))
        print('saved to', args.directory)
    
        if not args.test:
            # Ask the user whether to split the data
            SPLIT = input(
                'Do you want to split your data into a random train/test/val ? [Y/N] : ')
        else:
            SPLIT = 'Y'
    
        if SPLIT == 'Y':
            print('The train set will be 70%, val set 15% and test set 15%')
            path = os.getcwd()
            # Get the current path to find the split script
            script = os.path.join(path, 'get_train_val.py')
            data_path = os.path.join(path, args.directory, 'labels')
            directory_path = os.path.join(path, args.directory, 'set')
            # Create the output directory if it does not exist
            utils.create_directory(directory_path)
            try:
                # Run the split script
                os.system(f'{sys.executable} {script} {data_path} {directory_path} -r 0.7 --test')

                print(f'Train saved in {directory_path}\n')
                print('To train your model, use the following command:\n')
    
                yolo_path = os.path.join(path, 'yolov5/train.py')
                data_path = os.path.join(directory_path, 'custom_data.yaml')
                weights_path = os.path.join(path, 'yolov5/weights/yolov5l.pt')
                hyp_path = os.path.join(path, 'custom_hyp.yaml')
    
                command = f'python {yolo_path} --data {data_path} --imgsz 640 --epochs 100 --weights {weights_path} --hyp {hyp_path} --cache'
                print(command,'\n')
                print('\u26A0\uFE0F   Be aware that it is recommended to have background images that',
                      'represent 10% of your dataset. To do so, please use the script "get_spectrogram.py"',
                      'with the --background argument. Compute on recordings that contain multiple types of noise...')
    
            except Exception as error:
                print(error)
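
    # For reference, the output layout produced under `directory` (derived from the
    # paths used above; `set/` only exists after the split step):
    #   species_list.csv     class id -> species mapping
    #   images/all/          one spectrogram .jpg per processed window
    #   images/<class_id>/   spectrograms sorted by annotated species
    #   labels/              YOLO .txt files, one "<id> <x> <y> <w> <h>" line per box
    #   annotated_images/    spectrograms with boxes drawn for visual checking
    #   set/                 train/val/test split created by get_train_val.py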