# WhistleUtils.py (15.38 KiB) — Benoit Favre authored
# NOTE: the lines above were Git web-interface header residue
# ("Select Git revision", author line); converted to comments so the file parses.
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 26 13:12:14 2021
@author: Loïc
"""
#%% Packages importations
import csv
import os
import pandas as pd
import numpy as np
from scipy.signal import butter
from scipy.stats.mstats import gmean
from scipy.spatial import distance
from scipy import interpolate
#%% Functions process
def import_csv(csv_name, folder, useless=True, slash="/"):
    """
    Import a CSV file as a list of rows.

    Parameters
    ----------
    csv_name : STRING
        Name of the csv file that needs to be imported.
    folder : STRING
        Path to the folder containing the csv file.
    useless : BOOLEAN, optional
        Does the file contain useless infos after the table ?
        If True, remove those trailing lines. The default is True.
    slash : STRING, optional
        Separator between folder and file name. The default is "/".

    Returns
    -------
    data : LIST
        List containing all rows of the csv file.
        When `useless` is True, trailing rows after the last row whose
        first column ends with ".wav" are removed.

    Raises
    ------
    Exception
        If `useless` is True and no row's first column ends with ".wav".
    """
    # read the whole csv; the with-statement closes the file
    # (the original also called csv_file.close() redundantly)
    with open(folder + slash + csv_name, newline="") as csv_file:
        data = list(csv.reader(csv_file, delimiter=','))
    if useless:
        # scan from the bottom for the last row that references a .wav file
        for n in range(len(data) - 1, -1, -1):
            if data[n][0].endswith(".wav"):
                # keep everything up to AND including that row
                # (fixes an off-by-one: the original sliced data[0:-n],
                # which also dropped the last valid .wav row)
                return data[:n + 1]
        # no audio file name found: wrong use of the function
        raise Exception("First column contains no audio file name")
    return data
def get_csv(csv_folder, slash="\\"):
    """
    Load every .csv file of a folder into one table and collect
    the audio file names referenced in its first column.

    Parameters
    ----------
    csv_folder : STRING
        Path to a folder containing ONLY csv files.
    slash : STRING, optional
        Separator for folders. The default is "\\".

    Returns
    -------
    data_frame : DATAFRAME
        Contains the information inside the .CSVs.
    sorted_names : NUMPY ARRAY
        Names corresponding to audio files in the first column of the
        .CSVs, grouped by year and sorted by date inside each year.
    """
    csv_names = [entry for entry in os.listdir(csv_folder) if entry.endswith('.csv')]
    # merge every table, keeping only the first file's header row
    rows = import_csv(csv_names[0], csv_folder, slash=slash)
    for other in csv_names[1:]:
        rows += import_csv(other, csv_folder, slash=slash)[1:]
    data_frame = pd.DataFrame(data=rows[1:], columns=rows[0])
    # columns 3 to 16 hold numeric annotations stored as text: cast to int
    for col in range(3, 17):
        data_frame.iloc[:, col] = data_frame.iloc[:, col].astype(int)
    # keep the entries that point to an audio file, with OS-style separators
    audio_names = np.copy([])
    for entry in data_frame["Fichier Audio"]:
        if entry.endswith(".wav"):
            audio_names = np.append(audio_names, entry.replace('/', slash))
    # group by year (characters 4:8 of the path), sorting by date inside
    # each year; each year block is prepended, so later years come first
    file_years = np.array([name[4:8] for name in audio_names])
    sorted_names = np.array([], dtype='<U49')
    for year in np.unique(file_years):
        in_year = np.copy(audio_names[np.where(file_years == year)[0]])
        in_year.sort()
        sorted_names = np.append(in_year, sorted_names)
    return data_frame, sorted_names
#%% Trajectories algorithms
def get_local_maxima(spectrum, spectrum2, hardness, threshold=10e-5):
    """
    Find local maxima along the frequency axis of a spectrogram.

    Parameters
    ----------
    spectrum : NUMPY ARRAY
        Spectrogram (float values) used for the amplitude / geometric-mean
        tests.
    spectrum2 : NUMPY ARRAY
        Spectrogram (float values) in which the local maxima are searched;
        assumed to have the same shape as `spectrum` — TODO confirm.
    hardness : INT or FLOAT
        Number of times a value has to be above the geometric mean in order
        to be kept.
    threshold : FLOAT, optional
        Minimal amplitude for a local maximum to be kept in `local_max2`.
        The default is 10e-5 (i.e. 1e-4).

    Returns
    -------
    local_max1 : NUMPY ARRAY
        Spectrogram with 1 & 0 indicating local maxima.
    local_max2 : NUMPY ARRAY
        Spectrogram with 1 & 0 indicating local maxima above
        hardness*geom_mean (per time bin and per frequency band).
    """
    local_max1 = np.zeros(spectrum.shape, dtype=int)
    local_max2 = np.zeros(spectrum.shape, dtype=int)
    # geometric mean of each time bin (axis 0) and each frequency band (axis 1)
    geom_mean0 = gmean(spectrum, axis=0)
    geom_mean1 = gmean(spectrum, axis=1)
    for each_bin in range(spectrum.shape[1]):
        for freq in range(1, spectrum.shape[0]-1):
            # local maximum over the two frequency neighbours
            # (fixes a NameError: the original referenced an undefined
            # `spectrum1` instead of `spectrum2` here)
            if (spectrum2[freq, each_bin] > spectrum2[freq-1, each_bin]) and \
               (spectrum2[freq, each_bin] > spectrum2[freq+1, each_bin]):
                local_max1[freq, each_bin] = 1
                # keep only maxima that are loud enough overall and
                # relative to both geometric means
                if (spectrum[freq, each_bin] > (threshold)):
                    if (spectrum[freq, each_bin] > (geom_mean0[each_bin]*hardness)):
                        if (spectrum[freq, each_bin] > (geom_mean1[freq]*hardness)):
                            local_max2[freq, each_bin] = 1
    return local_max1, local_max2
def get_trajectories(spectro_local_max, dist_f, dist_t):
    """
    Group neighbouring local maxima into trajectories (whistle traces).

    Parameters
    ----------
    spectro_local_max : NUMPY ARRAY
        Matrix of 1 & zeros indicating the presence of local maxima in a
        spectrum (axis 0 = frequency, axis 1 = time).
    dist_f : INT
        Number of bins : Tolerance for frequency jumps in a trajectory.
    dist_t : INT
        Number of bins : Tolerance for time jumps in a trajectory.

    Returns
    -------
    traj : NUMPY ARRAY
        Array of int where each value (except 0) is a different trajectory.
    """
    traj = np.zeros(spectro_local_max.shape, dtype=int)
    # initialisation: every maximum of the first time bin opens a trajectory
    tr = np.where(spectro_local_max[:,0] == 1)[0]
    traj[tr,0] = np.arange(1, len(tr)+1)
    traj_max = len(tr)  # highest trajectory id used so far
    # nb_traj[id-1] = number of points already assigned to trajectory `id`
    nb_traj = np.ones(len(tr), dtype=int)
    for j in range(1,spectro_local_max.shape[1]):
        tr = np.nonzero(spectro_local_max[:,j])[0]
        for k in range(len(tr)):
            # search window: +-dist_f frequency bins, up to dist_t bins back
            coord1_dwn = max(0, tr[k]-dist_f)
            # fixed: tr[k] indexes the frequency axis, so the clamp must use
            # shape[0] (the original clamped against shape[1], the time axis)
            coord1_up = min(tr[k]+dist_f, spectro_local_max.shape[0])
            coord2_dwn = max(0, j-dist_t)
            coord2_up = j-1
            traj_p = traj[coord1_dwn:(coord1_up+1), coord2_dwn:(coord2_up+1)]
            traj_p_list = traj_p[traj_p!=0]
            if len(traj_p_list) == 0:
                # no trajectory nearby: open a new one
                traj_max = traj_max+1
                traj[tr[k],j] = traj_max
                nb_traj = np.append(nb_traj, 1)
            else:
                # attach the point to the most populated nearby trajectory
                # TODO: prefer the closest (not the most populated) trajectory
                pos = np.argmax(nb_traj[traj_p_list-1])
                traj[tr[k],j] = traj_p_list[pos]
                nb_traj[traj_p_list[pos]-1] = nb_traj[traj_p_list[pos]-1]+1
        # resolve duplicates: two maxima of the same time bin got the same id
        so = np.sort(traj[tr,j])
        doublon = so[np.where(so[1:]==so[:-1])[0]]
        if len(doublon) > 0:
            for k in range(len(doublon)):
                pos_doublon = np.where(traj[:,j]==doublon[k])[0]
                pos_ex = np.where(traj[:, max(0, j-dist_t):j]==doublon[k])[0]
                # the duplicate furthest (in frequency) from the previous
                # occurrence is moved to a brand-new trajectory
                pos = np.argsort(np.abs(pos_doublon-pos_ex[-1]))
                traj[pos_doublon[pos[-1]],j] = traj_max+1
                nb_traj = np.append(nb_traj, 1)
                traj_max = traj_max+1
                # NOTE(review): counts are indexed with id-1 everywhere else;
                # `nb_traj[doublon[k]]` looks like it should be
                # `nb_traj[doublon[k]-1]` — kept as is, to be confirmed
                nb_traj[doublon[k]] = nb_traj[doublon[k]]+1
    return traj
def select_trajectories(traj_matrix, min_len_traj, min_acceleration, max_acceleration, verbose=1):
    """
    Keep only the trajectories that are long and smooth enough.

    Parameters
    ----------
    traj_matrix : NUMPY ARRAY
        Array of int where each value (except 0) is a different trajectory.
    min_len_traj : INT
        Minimal length (number of points) of a trajectory to be kept.
    min_acceleration : FLOAT
        Threshold: the mean acceleration over the trajectory must stay
        below this value.
    max_acceleration : FLOAT
        Threshold: the max acceleration over the trajectory must stay
        below this value.
    verbose : [0, 1], optional
        If 1, prints progress. The default is 1.

    Returns
    -------
    sel_traj : NUMPY ARRAY
        Array of int where each value (except 0) is a selected trajectory,
        keeping its original id.
    """
    sel_traj = np.zeros(traj_matrix.shape, dtype=int)
    # first selection based on length (for optimization)
    values, counts = np.unique(traj_matrix, return_counts=True)
    keep_values = values[counts > int(min_len_traj*0.5)]
    # 0 is the background, not a trajectory: skip it (the original iterated
    # over it too, computing accelerations over the whole matrix for nothing)
    keep_values = keep_values[keep_values != 0]
    for i in keep_values:
        if verbose:
            print(f"\tChecking trace traj {i+1}/{max(keep_values)}", end='\r')
        x, y = np.where(traj_matrix==i)
        # length verification
        if len(x) > min_len_traj:
            # speed then acceleration of the trace in the (freq, time) plane
            vitesse = np.diff(np.array([x,y]))[0]/np.diff(np.array([x,y]))[1]
            acce = np.abs(np.diff(vitesse)/(y[2:]-y[:-2]))
            if (np.mean(acce) < min_acceleration) and (np.max(acce) < max_acceleration):
                sel_traj[x, y] = i
    return sel_traj
def sparsity_ridoff(trajectories, error_thresh=0.5):
    """
    Remove trajectories that are too discontinuous in time or frequency.

    Parameters
    ----------
    trajectories : NUMPY ARRAY
        Array of int where each value (except 0) is a different trajectory.
    error_thresh : FLOAT, optional
        Maximal proportion of the data allowed to be discontinuous.
        The default is 0.5.

    Returns
    -------
    corr_traj : NUMPY ARRAY
        Array of int where each value (except 0) is a trajectory with
        errors < error_thresh, relabelled with consecutive-enumeration ids
        starting at 1.
    """
    corr_traj = np.zeros(trajectories.shape, dtype=int)
    for k, i in enumerate(np.unique(trajectories)[1:]):
        x, y = np.where(trajectories == i)
        # order the points of the trajectory by time
        order = np.argsort(y)
        x = x[order]
        y = y[order]
        # number of skipped time bins, and of frequency jumps larger than 1 bin
        time_errors = np.sum(y[1:]-y[:-1]-1)
        frequency_change = np.abs(x[1:]-x[:-1])
        frequency_errors = np.sum(frequency_change[frequency_change !=0]-1)
        if (time_errors/len(np.unique(y)) < error_thresh) and (frequency_errors/len(np.unique(y)) < error_thresh):
            # relabel with k+1: the original wrote `k`, which starts at 0 and
            # silently erased the first kept trajectory into the background
            corr_traj[np.where(trajectories == i)] = k + 1
    return corr_traj
def harmonize_trajectories(trajectories, min_r, min_common=18, delete=False):
    """
    Group (or remove) trajectories that look like harmonics of each other.

    Two trajectories overlapping in time are compared by linearly
    interpolating both over their common time span and computing the
    Pearson correlation of the interpolated frequency curves; a high
    correlation marks them as harmonics of the same whistle.

    Parameters
    ----------
    trajectories : NUMPY ARRAY
        Array of int where each value (except 0) is a different trajectory.
    min_r : FLOAT
        Minimal R correlation coefficient of regression to consider two
        trajectories as correlated (harmonics).
    min_common : INT, optional
        Minimal number of common time bins that 2 trajectories must share
        to be tested for correlation. The default is 18.
    delete : BOOLEAN, optional
        If True, returns the trajectory array without detected harmonics
        (only the longest member of each harmonic group is kept).
        If False, returns the trajectory array with detected harmonics
        grouped. The default is False.

    Returns
    -------
    harmonized_trajectories : NUMPY ARRAY
        Array of float where trajectories of the same harmonic group share
        the same integer part; the fractional part (random) keeps members
        distinguishable. If `delete` is True, harmonics are zeroed instead.

    NOTE(review): uses np.random without a seed, so labels (fractional
    parts) are not reproducible between runs — to be confirmed as intended.
    """
    values = np.unique(trajectories)[1:]
    # time span (first/last time bin) of each trajectory
    starts = np.zeros(len(values), dtype=int)
    stops = np.zeros(len(values), dtype=int)
    for i,trace in enumerate(values):
        y = np.where(trajectories == trace)[1]
        starts[i] = min(y)
        stops[i] = max(y)
    # overlaps[i] = indices of all trajectories whose time span intersects
    # trajectory i's span (including i itself)
    overlaps = [0]*len(starts)
    for i in range(len(starts)):
        overlaps_before = np.where(starts < stops[i])[0]
        overlaps_after = np.where(stops > starts[i])[0]
        overlaps[i] = np.intersect1d(overlaps_before, overlaps_after)
    # whistles[i] = group label of trajectory i; starts as 1..n
    whistles = np.arange(1,len(starts)+1, dtype=float)
    for overlap in overlaps:
        if len(overlap) > 1:
            # NOTE(review): only the first member of each overlap group is
            # compared against the others (no all-pairs test) — to confirm
            tr1 = overlap[0]
            for tr2 in overlap[1:]:
                trace1 = np.where(trajectories == values[tr1])
                trace2 = np.where(trajectories == values[tr2])
                common_time = np.intersect1d(trace1[1], trace2[1])
                if len(common_time) > min_common:
                    # interpolate both traces over the common time span
                    common_interpolation = np.arange(min(common_time), max(common_time)+1)
                    trace1_sort = np.argsort(trace1[1])
                    trace10, trace11 = trace1[0][trace1_sort], trace1[1][trace1_sort]
                    trace2_sort = np.argsort(trace2[1])
                    trace20, trace21 = trace2[0][trace2_sort], trace2[1][trace2_sort]
                    f1 = interpolate.interp1d(trace11, trace10)
                    f2 = interpolate.interp1d(trace21, trace20)
                    new_trace1 = f1(common_interpolation)
                    new_trace2 = f2(common_interpolation)
                    r_coef = np.corrcoef(new_trace1, new_trace2)[0,1]
                else:
                    r_coef = 0
                if r_coef > min_r:
                    # same integer part as tr1, random fractional part < 0.9
                    # so harmonics stay distinguishable within the group
                    whistles[tr2] = whistles[tr1]+(np.random.random()*0.9)
    # write the group labels back into the matrix
    harmonized_trajectories = np.copy(trajectories).astype(float)
    for i, value in enumerate(values):
        harmonized_trajectories[np.where(trajectories == value)] = whistles[i]
    if delete: # will need optimization
        # keep only the longest trajectory of each harmonic group
        deharmonized_trajectories = np.copy(harmonized_trajectories)
        values = np.unique(harmonized_trajectories)[1:]
        repeated_values = np.unique(values.astype(int))
        for value in repeated_values:
            # all labels sharing this integer part belong to one group
            harmonics = np.where(values.astype(int)==value)[0]
            name_trajs = values[harmonics]
            lengths = np.zeros(name_trajs.shape)
            for n,name in enumerate(name_trajs):
                lengths[n] = len(np.where(harmonized_trajectories == name)[0])
            # drop the longest member from the deletion list, zero the rest
            name_trajs = np.delete(name_trajs, np.argmax(lengths))
            for name in name_trajs:
                deharmonized_trajectories[np.where(harmonized_trajectories==name)]=0
        return deharmonized_trajectories
    return harmonized_trajectories
#%% Functions plots
def get_category(samples, names, data_frame, category='acoustic'):
    """
    Fetch the annotation category associated with each sample.

    Parameters
    ----------
    samples : LIST OF STRINGS
        Origin of each click,
        should be like ['SCW1807_20200711_083600_extract0.npy'].
    names : LIST OF STRINGS
        Names of each recording,
        (should be like ['11072020/SCW1807_20200711_082400.wav']).
    data_frame : PANDAS DATAFRAME
        Dataframe containing all csv infos.
    category : STRING, optional
        Category to extract from dataframe. The default is 'acoustic'.
        Should be 'acoustic', 'fishing_net', 'behavior', 'beacon', 'date',
        'number', 'net' or 'none'.

    Raises
    ------
    ValueError
        If `category` is not one of the supported values (the original
        crashed later with an UnboundLocalError instead).

    Returns
    -------
    NUMPY ARRAY
        One label per sample, extracted from `data_frame`.
    """
    known_categories = ('acoustic', 'fishing_net', 'behavior', 'beacon',
                        'date', 'number', 'net', 'none')
    if category not in known_categories:
        raise ValueError(
            f"category should be one of {known_categories}, got '{category}'")
    # match samples to recordings on the 23-char 'SCWxxxx_yyyymmdd_hhmmss' stem
    use_samples = np.array([file[:23] for file in samples])
    use_names = np.array([name.split('/')[-1][-27:-4] for name in names])
    cat_list = np.zeros(use_samples.shape, dtype="object")
    if category == 'none':
        return np.array(['none']*len(use_samples))
    # one-hot annotation column ranges for the aggregate categories
    if category == 'acoustic':
        start, stop = 3, 10
    elif category == 'fishing_net':
        start, stop = 10, 13
    elif category == 'behavior':
        start, stop = 13, 16
    for i, file in enumerate(use_samples):
        idx_ind = np.where(file == use_names)[0][0]
        if category == 'beacon':
            cat_list[i] = data_frame.iloc[idx_ind, 17]
        elif category == 'date':
            cat_list[i] = data_frame.iloc[idx_ind, 1]
        elif category == 'number':
            cat_list[i] = data_frame.iloc[idx_ind, 18]
        elif category == 'net':
            cat_list[i] = data_frame.iloc[idx_ind,19]
        else:
            # take the name of the strongest column in the category's range
            column = np.argmax(data_frame.iloc[idx_ind, start:stop])+start
            cat_list[i] = data_frame.columns[column]
    if category == 'beacon':
        # samples recorded under the 'AV' acoustic condition get 'NC'
        NC = np.where(get_category(samples, names, data_frame, category='acoustic')
                      == 'AV')[0]
        cat_list[NC] = 'NC'
    return cat_list