Skip to content
Snippets Groups Projects
Select Git revision
  • c2ae09409f7966c334e35498f66a42cac17ce1ba
  • master default protected
2 results

fstclosure

Blame
  • WhistleUtils.py 15.38 KiB
    # -*- coding: utf-8 -*-
    """
    Created on Sun Dec 26 13:12:14 2021
    
    @author: Loïc
    """
    
    #%% Packages importations
    import csv
    import os
    
    import pandas as pd
    import numpy as np
    
    from scipy.signal import butter
    from scipy.stats.mstats import gmean
    from scipy.spatial import distance
    from scipy import interpolate
    
    #%% Functions process
    def import_csv(csv_name, folder, useless=True, slash="/"):
        """
        Parameters
        ----------
        csv_name : STRING
            Name of the csv file that needs to be imported
        folder : STRING, optional
            Path to the folder containing csv file. 
            The default is data_folder (see Parameters section).
        useless : BOOLEAN, optional
            Does the file contains useless infos after the table ? 
            If True, remove those infos
    
        Returns
        -------
        data : LIST
            List containing all rows of the csv file. 
            Exclusion of last lines if they don't end with .wav.
        """
        data = []
        
        # read data csv
        with open(folder + slash + csv_name, newline="") as csv_file:
            lines = csv.reader(csv_file, delimiter=',')
            for row in lines:
                data = data + [row]
        csv_file.close()
        
        # clean csv (last lines contain useless info)
        n = 0
        while useless:
            # in case wrong use of the function
            if n == len(data):
                useless = False
                n = 0
                raise Exception("First column contains no audio file name")
            # exit the loop when audio is found
            if data[-n][0].endswith(".wav"):
                useless = False 
            # search audio in next line
            else:
                n += 1
        
        data = data[0:-n]
        return data
    
    def get_csv(csv_folder, slash="\\"):
        """
        Parameters
        ----------
        csv_folder : STRING
            Path to a folder containing ONLY csv file.
        slash : STRING
            Separator for folders.
    
        Returns
        -------
        data_20_21 : DATAFRAME
            Contains the inforamtion inside the .CSVs.
        sorted_names : LIST
            Names corresponding to audio files in the first column of the .CSVs.
        """
        csv_names = [a for a in os.listdir(csv_folder) if a.endswith('.csv')]
    
        # import data
        data = import_csv(csv_names[0], csv_folder, slash=slash)
        for i in range(1,len(csv_names)):
            data = data + import_csv(csv_names[i], csv_folder, slash=slash)[1:]
        data_frame = pd.DataFrame(data=data[:][1:], columns=data[:][0])
    
        # change dtype for selected columns
        for i in range(3,17):
            data_frame.iloc[:,i] = data_frame.iloc[:,i].astype(int) 
    
        # fetch audio files names
        audio_names = np.copy([])
        for filename in data_frame["Fichier Audio"]:
            if filename.endswith(".wav"):
                audio_names = np.append(audio_names, filename.replace('/', slash))
    
        # sort audio files names
        # We want to sort it by year
        years = np.unique([path[4:8] for path in audio_names])
        sorted_names = np.array([], dtype='<U49')
        # and inside each year, sort it by date.
        for year in years:
            idx_to_sort = np.where(np.array([path[4:8] for path in audio_names]) == year)[0]
            temp_path = np.copy(audio_names[idx_to_sort])
            temp_path.sort()
            sorted_names = np.append(temp_path, sorted_names)
                
        return data_frame, sorted_names
    
    #%% Trajectories algorithms
    def get_local_maxima(spectrum, spectrum2, hardness, threshold=10e-5):
        """
        Parameters
        ----------
        spectrum : NUMPY ARRAY 
            Spectrogram (float values) of an audio signal. 
        hardness : INT or FLOAT
            Number of times a value has to be above the geometric mean in order to be kept.
    
        Returns
        -------
        local_max1 : NUMPY ARRAY
            Spectrogram with 1 & 0 indicating local maxima
        local_max2 : NUMPY ARRAY
            Spectrogram with 1 & 0 indicating local maxima above hardness*geom_mean
        """
        local_max1 = np.zeros(spectrum.shape, dtype=int)
        local_max2 = np.zeros(spectrum.shape, dtype=int)
        geom_mean = gmean(gmean(spectrum, axis=1))
        geom_mean0 = gmean(spectrum, axis=0)
        geom_mean1 = gmean(spectrum, axis=1)
        # geom_mean0, geom_mean1 = gmean(spectrum), gmean(spectrum)
    
        for each_bin in range(spectrum.shape[1]):
            for freq in range(1, spectrum.shape[0]-1):
                if (spectrum1[freq, each_bin] > spectrum2[freq-1, each_bin]) and \
                (spectrum2[freq, each_bin] > spectrum2[freq+1, each_bin]):
                    local_max1[freq, each_bin] = 1
                    if (spectrum[freq, each_bin] > (threshold)):
                        if (spectrum[freq, each_bin] > (geom_mean0[each_bin]*hardness)):
                            if (spectrum[freq, each_bin] > (geom_mean1[freq]*hardness)):
                                local_max2[freq, each_bin] = 1
    
        return local_max1, local_max2
    
    def get_trajectories(spectro_local_max, dist_f, dist_t):
        """
            Parameters
        ----------
        spectro_local_max : NUMPY ARRAY 
            Matrix of 1 & zeros indicating the presence of local maxima in a spectrum.
        dist_f : INT 
            Number of bins : Tolerance for frequency jumps in a trajectory.
        dist_t : INT
            Number of bins : Tolerance for time jumps in a trajectory.
    
        Returns
        -------
        traj : NUMPY ARRAY
            Array of int where each value (execpt 0) is a different trajectory. 
        """
        traj = np.zeros(spectro_local_max.shape, dtype=int)
        # initialisation boucle
        tr = np.where(spectro_local_max[:,0] == 1)[0]
        traj[tr,0] = np.arange(1, len(tr)+1)
        traj_max = len(tr)
        nb_traj = np.ones(len(tr), dtype=int)
    
        for j in range(1,spectro_local_max.shape[1]):
            tr = np.nonzero(spectro_local_max[:,j])[0]
    
            for k in range(len(tr)):
                coord1_dwn = max(0, tr[k]-dist_f)
                coord1_up = min(tr[k]+dist_f, spectro_local_max.shape[1])
                coord2_dwn = max(0, j-dist_t) 
                coord2_up = j-1
                traj_p = traj[coord1_dwn:(coord1_up+1), coord2_dwn:(coord2_up+1)]
                traj_p_list = traj_p[traj_p!=0]
    
                if len(traj_p_list) == 0:
                    traj_max = traj_max+1
                    traj[tr[k],j] = traj_max
                    nb_traj = np.append(nb_traj, 1)
                else:
                    # add: look for closest (and not highest valued) trajectory
                    # if len(np.unique(traj_p_list)) <= 1:
                    #     pos = np.argmax(nb_traj[traj_p_list-1])
                    # else:
                    #     coordx, coordy = np.where(traj_p!=0)
                    #     coords = np.array([coordx, coordy]).T
                    #     distances = distance.cdist(coords, np.array([traj_p.shape])/2)
                    #     closests = np.where(distances==min(distances))[0]
                    #     pos = np.argmax(nb_traj[traj_p[coordx[closests],coordy[closests]]-1])
                    pos = np.argmax(nb_traj[traj_p_list-1])
                    traj[tr[k],j] = traj_p_list[pos]
                    nb_traj[traj_p_list[pos]-1] = nb_traj[traj_p_list[pos]-1]+1
    
            so = np.sort(traj[tr,j])
            doublon = so[np.where(so[1:]==so[:-1])[0]]
            if len(doublon) > 0:
                for k in range(len(doublon)):
                    pos_doublon = np.where(traj[:,j]==doublon[k])[0]
                    pos_ex = np.where(traj[:, max(0, j-dist_t):j]==doublon[k])[0]
                    pos = np.argsort(np.abs(pos_doublon-pos_ex[-1]))
                    traj[pos_doublon[pos[-1]],j] = traj_max+1
                    nb_traj = np.append(nb_traj, 1)
                    traj_max = traj_max+1
                    nb_traj[doublon[k]] = nb_traj[doublon[k]]+1
        return traj
    
    def select_trajectories(traj_matrix, min_len_traj, min_acceleration, max_acceleration, verbose=1):
        """
            Parameters
        ----------
        traj_matrix : NUMPY ARRAY
            Array of int where each value (except 0) is a different trajectory.
        min_len_traj : INT 
            Minimal length of a trajectory to be kept.
        min_acceleration : FLOAT
            Minimal acceleration for mean_acceleration over the trajectory.
        max_acceleration : FLOAT
            Minimal acceleration for max_acceleration over the trajectory.
        verbose : [0, 1]
            TMTC
    
            Returns
        -------
        sel_traj : NUMPY ARRAY
            Array of int where each value (execpt 0) is a selected trajectory. 
        """
        sel_traj = np.zeros(traj_matrix.shape, dtype=int)
    
        # first selection based on length (for optimization)
        values, counts = np.unique(traj_matrix, return_counts=True)
        keep_values = values[counts > int(min_len_traj*0.5)]
    
        for i in keep_values:
            if verbose:
                print(f"\tChecking trace traj {i+1}/{max(keep_values)}", end='\r')
            x, y = np.where(traj_matrix==i)
            # length verification
            taille_traj = len(x)
    
            if taille_traj > min_len_traj:
                vitesse = np.diff(np.array([x,y]))[0]/np.diff(np.array([x,y]))[1]
                acce = np.abs(np.diff(vitesse)/(y[2:]-y[:-2]))
                acce_avg = np.mean(acce)
                acce_max = np.max(acce)
    
                if (acce_avg < min_acceleration) and (acce_max < max_acceleration):
                    sel_traj[np.where(traj_matrix==i)] = i
        return sel_traj
    
    def sparsity_ridoff(trajectories, error_thresh=0.5):
        """
            Parameters
        ----------
        trajectories : NUMPY ARRAY
            Array of int where each value (except 0) is a different trajectory.
        error_tresh : FLOAT
            Proportion of the data that has to be continuous
    
            Returns
        -------
        corr_traj : NUMPY ARRAY
            Array of int where each value (execpt 0) is a trajectory with errors < error_thresh. 
        """
        corr_traj = np.zeros(trajectories.shape, dtype=int)
        for k,i in enumerate(np.unique(trajectories)[1:]):
            x, y = np.where(trajectories==i)
            x = x[np.argsort(y)]
            y = y[np.argsort(y)]
            time_errors = np.sum(y[1:]-y[:-1]-1)
            frequency_change = np.abs(x[1:]-x[:-1])
            frequency_errors = np.sum(frequency_change[frequency_change !=0]-1)
            if (time_errors/len(np.unique(y)) < error_thresh) and (frequency_errors/len(np.unique(y)) < error_thresh):
                corr_traj[np.where(trajectories==i)] = k
        return corr_traj
    
    def harmonize_trajectories(trajectories, min_r, min_common=18, delete=False):
        """
            Parameters
        ----------
        trajectories : NUMPY ARRAY
            Array of int where each value (except 0) is a different trajectory.
        min_r : FLOAT
            Minimal R squared coef of regression to consider two trajectory as correlated
        min_common : INT, optional
            Minimal length that 2 trajectories must have in common to be correlated
        delete : BOOLEAN, optional
            If True, returns trajectory array without detected harmonics. 
            If False, return trajectory array with detected harmonics grouped. 
    
            Returns
        -------
        harmonized_trajectories : NUMPY ARRAY
            Array of int where each value (execpt 0) is a trajectory grouped with its harmonics/ without its harmonics 
        """
        values = np.unique(trajectories)[1:]
        starts = np.zeros(len(values), dtype=int)
        stops = np.zeros(len(values), dtype=int)
        for i,trace in enumerate(values):        
            y = np.where(trajectories == trace)[1]
            starts[i] = min(y)
            stops[i] = max(y)
    
        overlaps = [0]*len(starts)
        for i in range(len(starts)):
            overlaps_before = np.where(starts < stops[i])[0]
            overlaps_after = np.where(stops > starts[i])[0]
            overlaps[i] = np.intersect1d(overlaps_before, overlaps_after)
    
        whistles = np.arange(1,len(starts)+1, dtype=float)
        for overlap in overlaps:
            if len(overlap) > 1:
                tr1 = overlap[0]
                for tr2 in overlap[1:]:
                    trace1 = np.where(trajectories == values[tr1])
                    trace2 = np.where(trajectories == values[tr2])
    
                    common_time = np.intersect1d(trace1[1], trace2[1])
                    if len(common_time) > min_common:
                        common_interpolation = np.arange(min(common_time), max(common_time)+1)
    
                        trace1_sort = np.argsort(trace1[1])
                        trace10, trace11 = trace1[0][trace1_sort], trace1[1][trace1_sort]
                        trace2_sort = np.argsort(trace2[1])
                        trace20, trace21 = trace2[0][trace2_sort], trace2[1][trace2_sort]
    
                        f1 = interpolate.interp1d(trace11, trace10)
                        f2 = interpolate.interp1d(trace21, trace20)
                        new_trace1 = f1(common_interpolation)
                        new_trace2 = f2(common_interpolation)
    
                        r_coef = np.corrcoef(new_trace1, new_trace2)[0,1]
                    
                    else:
                        r_coef = 0
    
                    if r_coef > min_r:
                        whistles[tr2] = whistles[tr1]+(np.random.random()*0.9)
    
        harmonized_trajectories = np.copy(trajectories).astype(float)
        for i, value in enumerate(values):
            harmonized_trajectories[np.where(trajectories == value)] = whistles[i]
    
        if delete: # will need optimization
            deharmonized_trajectories = np.copy(harmonized_trajectories)
            values = np.unique(harmonized_trajectories)[1:]
            repeated_values = np.unique(values.astype(int))
            for value in repeated_values:
                harmonics = np.where(values.astype(int)==value)[0]
                name_trajs = values[harmonics]
                lengths = np.zeros(name_trajs.shape)
                for n,name in enumerate(name_trajs):
                    lengths[n] = len(np.where(harmonized_trajectories == name)[0])
                name_trajs = np.delete(name_trajs, np.argmax(lengths))
                for name in name_trajs:
                    deharmonized_trajectories[np.where(harmonized_trajectories==name)]=0
            return deharmonized_trajectories
    
        return harmonized_trajectories
    
    #%% Functions plots
    def get_category(samples, names, data_frame, category='acoustic'):
        """
        Parameters
        ----------
        samples : LIST OF STRINGS.
            Origin of each click,
            should be like ['SCW1807_20200711_083600_extract0.npy']).
        names : LIST OF STRINGS
            Names of each recording,
            (should be like ['11072020/SCW1807_20200711_082400.wav']).
        data_frame : PANDAS DATAFRAME
            Dataframe containing all csv infos.
        category : TYPE, optional
            Category to extratc from dataframe. The default is 'acoustic'.
            Should be 'acoustic', 'fishing_net', 'behavior', 'beacon', 'date',
            'number', 'net' or 'none'
            
        Returns
        -------
        TYPE
            DESCRIPTION.
    
        """
        use_samples = np.array([file[:23] for file in samples])
        use_names = np.array([name.split('/')[-1][-27:-4] for name in names])
        
        cat_list = np.zeros(use_samples.shape, dtype="object")
        
        if category == 'acoustic':
            start, stop = 3, 10
        
        elif category == 'fishing_net':
            start, stop = 10, 13
        
        elif category == 'behavior':
            start, stop = 13, 16
        
        elif category == 'none':
            return np.array(['none']*len(use_samples))
        
        for i, file in enumerate(use_samples):
            idx_ind = np.where(file == use_names)[0][0]
            if category == 'beacon':
                cat_list[i] = data_frame.iloc[idx_ind, 17]
            elif category == 'date':
                cat_list[i] = data_frame.iloc[idx_ind, 1]
            elif category == 'number':
                cat_list[i] = data_frame.iloc[idx_ind, 18]
            elif category == 'net':
                cat_list[i] = data_frame.iloc[idx_ind,19]
            else: 
                column = np.argmax(data_frame.iloc[idx_ind, start:stop])+start
                cat_list[i] = data_frame.columns[column]
        
        if category == 'beacon':
            NC = np.where(get_category(samples, names, data_frame, category='acoustic')
                          == 'AV')[0]
            cat_list[NC] = 'NC'
        
        return cat_list