diff --git a/Clicks/Detector-solorun.py b/Clicks/Detector-solorun.py
new file mode 100644
index 0000000000000000000000000000000000000000..fd362875c6ec8b62ba2f59daf12ad401da1d1bd1
--- /dev/null
+++ b/Clicks/Detector-solorun.py
@@ -0,0 +1,289 @@
+#%% Importations
+import os
+import librosa
+import argparse
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+from scipy.signal import find_peaks
+
+from ClickUtils import TeagerKaiser_operator, butter_pass_filter
+
+
+#%% Functions
+# Argparser
+def fetch_inputs():
+    """
+    A function to fetch inputs from the command line.
+    Run `$ python Detector-solorun.py -h` for more information.
+
+    ...
+
+    Parameters
+    ----------
+    None : Inputs are fetched from the command line as arguments.
+
+    Return
+    ------
+    See argparser's help.
+    """
+
+    # setting up arg parser
+    parser = argparse.ArgumentParser(
+        description=
+        ("This script detects clicks in an audio recording: the selected channel "
+         "is high-pass filtered, passed through the Teager-Kaiser operator, and "
+         "the detected peaks are saved to a .csv file.")
+        )
+
+    group1 = parser.add_argument_group('File information')
+    group1.add_argument(
+        '-f', '--audio_file',
+        type=str,
+        nargs='?',
+        required=True,
+        help=("Path to the audio file that the script will use. "
+              "Supports the audio formats handled by librosa. "
+              "The sampling rate is determined automatically.")
+        )
+    group1.add_argument(
+        '-out', '--output',
+        type=str,
+        nargs='?',
+        default=os.path.join(".", "outputs"),
+        required=False,
+        help=("Path where the detections will be saved. "
+              "Detections are saved in a .csv file, one per wavefile. "
+              "Default path is './outputs'.")
+        )
+    group1.add_argument(
+        '-v', '--verbose',
+        action='store_true',
+        required=False,
+        help=("If given, show messages about the progress of the script.")
+        )
+    group1.add_argument(
+        '-show', '--show_plot',
+        action='store_true',
+        required=False,
+        help=("If given, plots the results of the detection process.")
+        )
+
+    group2 = parser.add_argument_group('Detector settings')
+    group2.add_argument(
+        '-s', '--sound_threshold',
+        type=float,
+        default=0.001,
+        nargs='?',
+        required=False,
+        help=("Minimum amplitude, after the Teager-Kaiser filter, "
+              "for a point to be considered in the detection of peaks."
+              "\nDefault is 0.001.")
+        )
+    group2.add_argument(
+        '-c', '--channel',
+        type=int,
+        default=0,
+        nargs='?',
+        required=False,
+        help=("The index of the audio channel on which "
+              "the detection will be done. Should be in [0, nchannels-1]."
+              "\nDefault is 0 (first channel).")
+        )
+    group2.add_argument(
+        '-k', '--click_size_sec',
+        type=float,
+        default=0.001,
+        nargs='?',
+        required=False,
+        help=("Duration of a click in seconds. "
+              "For our data it is estimated to be ~1 ms."
+              "\nDefault is 0.001 sec.")
+        )
+    group2.add_argument(
+        '-freq', '--min_freq',
+        type=int,
+        default=50_000,
+        nargs='?',
+        required=False,
+        help=("Cutoff frequency of the high-pass filter. "
+              "Default is 50 kHz (quite a strict criterion).")
+        )
+
+    # fetching arguments
+    args = parser.parse_args()
+    audio_file = args.audio_file
+    output = args.output
+    sound_threshold = args.sound_threshold
+    channel = args.channel
+    click_size_sec = args.click_size_sec
+    min_freq = args.min_freq
+
+    # verifying arguments
+    try:
+        assert (os.path.exists(audio_file)), (
+            f"\nInputError [--audio_file]: Could not find file '{audio_file}'.")
+        assert (os.path.exists(output)), (
+            f"\nInputError [--output]: Outputs directory '{output}' does not exist."
" + "\nChange the path for the output folder or create corresponding folder.") + assert (sound_threshold > 0), ( + f"\nInputError [--sound_threshold]: sound_threshold must be positive.") + assert (channel >= 0), ( + f"\nInputError [--channel]: channel must be positive.") + assert (click_size_sec > 0), ( + f"\nInputError [--click_size_sec]: click_size_sec must be positive.") + assert (min_freq > 0), ( + f"\nInputError [--min_freq]: min_freq must be positive.") + except Exception as e: + print(e) + exit() + + return (audio_file, output, sound_threshold, channel, + click_size_sec, min_freq, args.verbose, args.show_plot) + +# Main functions +class BColors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + +class SimpleAutoClicksDetector: + """ + A class to detect clicks automatically in an audio recording. + + Parameters + ---------- + wavefile_path : string + Path to the wavefile in which clicks will be detected. + sound_thresh : float + Threshold for the detection of peaks in the audio signal + filtered (high-pass + teager-kaiser). + Default is 0.001. + chan : integer + In case the audio data is not mono-channel, + index of the channel on which the detection of clicks will be made. + Default is 0. + click_size_sec : float + Duration of a click in seconds. + Default is 0.001 sec. + highpass_freq : int + Cutoff frequency for the highpass filter. + Default is 50 kHz. + verbose : boolean + Wether to show prints about the process or not. + Default is False. + + Methods + ------- + create_peaks_map(): + A function to find peaks (supposedly clicks) in an audio recording. + Generates map_peaks, of the same shape that the audio recording + with 0 as a base and 1's showing the positions of detections. 
+ """ + def __init__( + self, + wavefile_path=None, + sound_thresh=0.001, + chan=0, + click_size_sec=0.001, + highpass_freq=50_000, + verbose=False): + # fetching arguments + if (isinstance(wavefile_path, str) and + isinstance(sound_thresh, float) and + isinstance(chan, int) and + isinstance(click_size_sec, float) and + isinstance(highpass_freq, int) and + isinstance(verbose, bool)): + self.wavefile_path = wavefile_path + self.sound_thresh = sound_thresh + self.chan = chan + self.click_size_sec = click_size_sec + self.highpass_freq = highpass_freq + self.verbose = verbose + + else: + print(f"{BColors.FAIL}Error in parameters.{BColors.ENDC}") + return None + + # actual click detection + if self.verbose: + print("Loading audio data...") + self.audio_data, self.sr = librosa.load( + self.wavefile_path, sr=None, mono=False) + if self.verbose: + print("Finding clicks in waveform...") + self.create_peaks_map() + if self.verbose: + print(f"\t{BColors.OKGREEN}Done.{BColors.FAIL}") + + def create_peaks_map(self): + # assign parameters + max_length = int(self.sr*self.click_size_sec) + mini_space = int(self.sr*self.click_size_sec*2) + + # check if mono or stereo+ + if len(self.audio_data.shape) == 1: + self.signal = np.copy(self.audio_data) + else: + self.signal = np.copy(self.audio_data[self.chan]) + # detect clicks + self.signal_high = butter_pass_filter(self.signal, self.highpass_freq, self.sr, mode='high') + self.tk_signal = TeagerKaiser_operator(self.signal_high) + self.signal_peaks = find_peaks(self.tk_signal, + distance=mini_space, + width=[0,max_length], + prominence=self.sound_thresh)[0] + map_peaks = np.zeros(len(self.tk_signal), dtype=int) + map_peaks[self.signal_peaks] = 1 + + self.map_peaks = map_peaks + + +#%% Parameters +# Audio parameters +(file_path, output_path, sound_thresh, channel, + click_size_sec, fmin, verbose, do_plot) = fetch_inputs() + + +#%% Main executions +Detector = SimpleAutoClicksDetector( + wavefile_path=file_path, + sound_thresh=sound_thresh, + chan=channel, + click_size_sec=click_size_sec, + highpass_freq=fmin, + verbose = verbose) + +# save peak locations +arg_peaks = np.nonzero(Detector.map_peaks)[0] +df = pd.DataFrame() +df["time"] = arg_peaks/Detector.sr +df["signal_amplitude"] = Detector.signal[arg_peaks] +df["TK_amplitude"] = Detector.signal[arg_peaks] +df.to_csv( + os.path.join(output_path, os.path.basename(file_path)[:-4]+"_clicks-detection.csv"), + index=False) +if verbose: + print(f"\n{BColors.OKCYAN}File saved in '{output_path}'.{BColors.ENDC}") + +if do_plot: + if verbose: + print("Showing plot.") + fig, ax = plt.subplots(1,1) + ax.plot( + np.arange(0, len(Detector.signal_high))/Detector.sr, + Detector.signal_high, + color='tab:blue', zorder=-1) + ax.scatter( + arg_peaks/Detector.sr, + Detector.signal_high[arg_peaks], + color='tab:orange', s=10, zorder=1) + plt.show(block=True) + + diff --git a/Clicks/README.md b/Clicks/README.md new file mode 100644 index 0000000000000000000000000000000000000000..3f2260b3ee6a3ccc02726053cd53fc9968017ba2 --- /dev/null +++ b/Clicks/README.md @@ -0,0 +1,16 @@ +# Whistles +This repository contains the files used to analyse click data obtained during the <a href="https://umr-marbec.fr/en/the-projects/dolphinfree/">DOLPHINFREE project</a>. + +## Description +Files '1-detection_of_clicks.py', '2-get_stats_of_clicks.py', '3-projection_3features.py' and '3-projection_10features.py' were used during the DOLPHINFREE project. + +File 'Detector-solorun.py' is a simplified version that can be more easily used by anyone. 
diff --git a/Clicks/Results.npy b/Clicks/Results/Results.npy
similarity index 100%
rename from Clicks/Results.npy
rename to Clicks/Results/Results.npy
diff --git a/Whistles/DECAV-solorun.py b/Whistles/DECAV-solorun.py
new file mode 100644
index 0000000000000000000000000000000000000000..42cbbc1c7e51ac6aba4bbe26fb07476e7f5b9b69
--- /dev/null
+++ b/Whistles/DECAV-solorun.py
@@ -0,0 +1,419 @@
+#%% Importations
+import os
+import json
+import argparse
+import numpy as np
+from librosa import load, stft, pcen, amplitude_to_db
+
+from scipy import interpolate
+from scipy.stats.mstats import gmean
+from scipy.signal import resample
+
+from matplotlib import cm
+import matplotlib.pyplot as plt
+from matplotlib.colors import ListedColormap
+
+from WhistleUtils import (get_local_maxima, get_trajectories,
+    select_trajectories, sparsity_ridoff, harmonize_trajectories)
+
+#%% Functions
+# Argparser
+def fetch_inputs():
+    """
+    A function to fetch inputs from the command line.
+    Run `$ python DECAV-solorun.py -h` for more information.
+
+    ...
+
+    Parameters
+    ----------
+    None : Inputs are fetched from the command line as arguments.
+
+    Return
+    ------
+    See argparser's help.
+    """
+
+    # setting up arg parser
+    parser = argparse.ArgumentParser(
+        description=
+        ("This script extracts whistle contours from an audio recording: "
+         "local maxima of the spectrogram are linked into trajectories, "
+         "which are then cleaned, de-harmonized and saved to .json files.")
+        )
+
+    group1 = parser.add_argument_group('File information')
+    group1.add_argument(
+        '-f', '--audio_file',
+        type=str,
+        nargs='?',
+        required=True,
+        help=("Path to the audio file that the script will use. "
+              "Supports the audio formats handled by librosa. "
+              "The sampling rate is determined automatically.")
+        )
+    group1.add_argument(
+        '-out', '--output',
+        type=str,
+        nargs='?',
+        default=os.path.join(".", "outputs"),
+        required=False,
+        help=("Path where the contours will be saved. "
+              "Contours will be saved in a .json file, different for each wavefile. "
+              "Default path is './outputs'.")
+        )
+    group1.add_argument(
+        '-new', '--resampling_rate',
+        type=int,
+        default=48000,
+        nargs='?',
+        required=False,
+        help=("Resampling rate for the wavefile. "
+              "Can be used to speed up spectrogram computation. "
+              "Default value is 48,000 Hz.")
+        )
+    group1.add_argument(
+        '-v', '--verbose',
+        action='store_true',
+        required=False,
+        help=("If given, show messages about the progress of the script.")
+        )
+    group1.add_argument(
+        '-show', '--show_plot',
+        action='store_true',
+        required=False,
+        help=("If given, plots the results of the selection process.")
+        )
+
+    group2 = parser.add_argument_group('Spectrogram settings')
+    group2.add_argument(
+        '-fft', '--n_fft',
+        type=int,
+        default=256,
+        nargs='?',
+        required=False,
+        help=("Size of the fft window in samples. "
+              "Default value is '256' samples.")
+        )
+    group2.add_argument(
+        '-over', '--n_overlap',
+        type=int,
+        default=0,
+        nargs='?',
+        required=False,
+        help=("Number of fft windows overlapping [hop_length = nfft//(noverlap+1)]. "
" + "Default value is '0' overlap.") + ) + group2.add_argument( + '-high', '--high_pass', + type=int, + default=4675, + nargs='?', + required=False, + help=("Frequency for the hig-pass filter. " + "Default value is '4,675' Hz.") + ) + group2.add_argument( + '-p', '--pcen', + type=bool, + default=False, + nargs='?', + required=False, + help=("If True, uses a PCEN instead of a classical spetrogram. " + "Default is False.") + ) + + group3 = parser.add_argument_group('DECAV parameters') + group3.add_argument( + '-dist_f', '--distance_frequency', + type=int, + default=2, + nargs='?', + required=False, + help=("Maximum distance between two pixels on the frequency-axis " + "to consider that they belong to the same whistle. " + "Default value is '2' pixels.") + ) + group3.add_argument( + '-dist_t', '--distance_time', + type=int, + default=5, + nargs='?', + required=False, + help=("Maximum distance between two pixels on the time-axis" + "to consider that they belong to the same whistle." + "Default value is '5' pixels.") + ) + group3.add_argument( + '-nrg_r', '--energy_ratio', + type=int, + default=6, + nargs='?', + required=False, + help=("SNR parameter to consider that a pixel has a higher value than its neighbours." + "Default value is '6'.") + ) + group3.add_argument( + '-min_s', '--min_size', + type=float, + default=53.3, + nargs='?', + required=False, + help=("Minimum size (in ms) to keep a whistle." + "Default value is '53.3' ms.") + ) + group3.add_argument( + '-min_a', '--min_acc', + type=float, + default=0.5, + nargs='?', + required=False, + help=("Minimum acceleration of whistle trajectory to be kept." + "Default value is '0.5' Hz.s-2.") + ) + group3.add_argument( + '-max_a', '--max_acc', + type=float, + default=3, + nargs='?', + required=False, + help=("Maximum acceleration of whistle trajectory to be kept." + "Default value is '3' Hz.s-2.") + ) + group3.add_argument( + '-cc', '--correlation_coef', + type=float, + default=0.5, + nargs='?', + required=False, + help=("Limit of correlation for two whistles overlapping in time." + "If correlation > cc then the highest frequency whistle is" + "considered to be a harmonic, and therefore discarded." + "Default value is '0.5'.") + ) + group3.add_argument( + '-s', '--sparsity_coef', + type=float, + default=0.5, + nargs='?', + required=False, + help=("Cleaning value. " + "Contours with more than 'sparsity_coef'%% missing pixels are discarded." + "Default value is '0.5'."), + ) + + # fetching arguments + args = parser.parse_args() + audio_file = args.audio_file + resampling_rate = args.resampling_rate + n_fft = args.n_fft + n_overlap = args.n_overlap + high_pass = args.high_pass + output = args.output + distance_frequency = args.distance_frequency + distance_time = args.distance_time + energy_ratio = args.energy_ratio + min_size = args.min_size + min_acc = args.min_acc + max_acc = args.max_acc + correlation_coef = args.correlation_coef + sparsity_coef = args.sparsity_coef + + # verifying arguments + try: + assert (os.path.exists(audio_file)), ( + f"\nInputError: Could not find file '{audio_file}'.") + assert (os.path.exists(output)), ( + f"\nInputError: Outputs directory '{output}' does not exist. 
" + "\nChange the path for the output folder or create corresponding folder.") + assert (n_overlap >= 0), ( + f"\nInputError: n_overlap can not be negative.") + assert (distance_frequency > 0), ( + f"\nInputError: distance_frequency can not be negative.") + assert (distance_time > 0), ( + f"\nInputError: distance_time can not be negative.") + assert (energy_ratio > 0), ( + f"\nInputError: energy_ratio can not be negative.") + assert (min_size > 0), ( + f"\nInputError: min_size can not be negative.") + assert (min_acc >= 0), ( + f"\nInputError: min_acc can not be negative.") + assert (max_acc > 0), ( + f"\nInputError: max_acc can not be negative.") + assert (min_size >= max_acc), ( + f"\nInputError: min_size must be inferior to max_acc.") + assert (correlation_coef >= 0) and (correlation_coef <= 1), ( + f"\nInputError: correlation_coef must be in [0, 1].") + assert (sparsity_coef >= 0) and (sparsity_coef <= 1), ( + f"\nInputError: sparsity_coef must be in [0, 1].") + assert (high_pass < (resampling_rate/2)), ( + f"\nInputError: high_pass frequency too high " + "(max is resampling_rate/2).") + except Exception as e: + print(e) + exit() + + return (audio_file, resampling_rate, n_fft, n_overlap, high_pass, output, + distance_frequency, distance_time, distance_frequency, min_size, + min_acc, max_acc, correlation_coef, sparsity_coef, args.pcen, + args.verbose, args.show_plot) + +# Main functions +class BColors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + +# Plotting function +def plot_spectrums(list_of_spectrums, cmaps=[], direction=-1, title="Whistles", + bins=1, titles=[], ylabels=[]): + n = len(list_of_spectrums) + fig, axs = plt.subplots(nrows=n, sharex=True, sharey=True, figsize=(15,15)) + + if len(cmaps) == 0: + cmaps = ['gray_r']*n + + for i in range(n): + axs[i].imshow(list_of_spectrums[i][::direction], aspect='auto', + interpolation='nearest', cmap=cmaps[i]) + axs[i].set_yticklabels([]) + if len(titles) > 0: + axs[i].set_title(titles[i], fontsize=15) + if len(ylabels) >0: + axs[i].set_ylabel(ylabels[i], fontsize=12) + axs[i].tick_params(axis='both', which='both', labelsize=10) + axs[n-1].set_xlabel(f"Time in bins (1 bin = 1/{bins} sec)", fontsize=12) + fig.suptitle(title) + fig.tight_layout() + return fig, axs + +def array_to_dict(spectrogram_pixels, fft, overlap, sr): + """ + Function to store pixels coordinates into a dict + + Args: + spectrogram_pixels (numpy array) + fft (int): + Size of the fft window in samples + overlap (int): + Number of fft window overlapping + sr (int): + Sampling rate of the audio file + + + Returns: + dict: + Dictionnary with a list of coordinates associeted to integers + Each integer is a different whistle + """ + values = np.unique(spectrogram_pixels)[1:] + dict_traj = {} + frequencies = np.arange(0, 1 + fft / 2) * sr / fft + frequencies = frequencies[-spectrogram_pixels.shape[0]:] + for key, value in enumerate(values): + coords = [np.where(spectrogram_pixels == value)[1].tolist(), + np.where(spectrogram_pixels == value)[0].tolist()] + # convert to freq + coords[0] = [(coord*(fft/sr))/(overlap+1) for coord in coords[0]] + # convert to time + coords[1] = [frequencies[coord] for coord in coords[1]] + + dict_traj[key+1] = coords + + return dict_traj + + +#%% Parameters +# Audio parameters +(file, new_sr, nfft, noverlap, fmin, output, dist_f, dist_t, nrg_r, + taille_min, min_acce, max_acce, min_r_coef, 
+
+
+#%% Main execution
+# get audio data
+if verbose:
+    print("Loading audio file...")
+signal, fe = load(file, sr=None)
+duration = len(signal)/fe
+# resample to decrease computation time
+signal_dec = resample(signal, int((duration*new_sr)))
+# extract spectral information
+Magnitude_audio = stft(signal_dec, n_fft=nfft, hop_length=hop_length)
+if do_pcen:
+    spectrum = pcen(np.abs(Magnitude_audio) * (2**31), bias=10)[f_min:, :]
+else:
+    spectrum = np.copy(np.abs(Magnitude_audio[f_min:, :]))
+if verbose:
+    print(f"{BColors.OKGREEN}\tDone.{BColors.ENDC}\n")
+
+# Selection algorithm
+if verbose:
+    print("Searching for local maxima...")
+max_loc_per_bin_check1 = get_local_maxima(spectrum, spectrum, nrg_r)[1]
+if verbose:
+    print("Finding contours...")
+trajectories = get_trajectories(max_loc_per_bin_check1, dist_f=dist_f, dist_t=dist_t)
+if verbose:
+    print("Cleaning contours...")
+final_traj = select_trajectories(trajectories, taille_traj_min, min_acce, max_acce, verbose=0)
+corrected_traj = sparsity_ridoff(final_traj, error_thresh=sparsity)
+if verbose:
+    print("Removing harmonics...")
+harmonized_traj = harmonize_trajectories(corrected_traj, min_r=min_r_coef,
+                                         min_common=taille_traj_min*2, delete=True)
+if verbose:
+    print(f"{BColors.OKGREEN}\tContours ready!{BColors.ENDC}\n")
+
+
+if plot:
+    if verbose:
+        print("Showing results...\n")
+
+    # generate bright colors to differentiate trajectories
+    prism = cm.get_cmap('prism', 256)
+    newcolors = prism(np.linspace(0, 1, np.unique(harmonized_traj).shape[0]))
+    black = np.array([0/256, 0/256, 0/256, 1])
+    newcolors[0, :] = black
+    newcmp = ListedColormap(newcolors)
+
+    # By default shows the whole audio
+    start = 0
+    stop = spectrum.shape[1]
+
+    # Create figure
+    fig, axs = plot_spectrums([amplitude_to_db(spectrum),
+                               np.copy(max_loc_per_bin_check1),
+                               np.copy(final_traj),
+                               np.copy(harmonized_traj)],
+                              ['gray_r', 'gray', newcmp, newcmp],
+                              titles=['Spectrogram (dB scale)', 'Local maxima selection',
+                                      'Extraction of continuous trajectories',
+                                      'Exclusion of harmonics'],
+                              ylabels=["Frequency"]*4,
+                              bins=375, title="")
+    # Update view
+    axs[3].set_xlim(start, stop)
+    plt.show(block=True)
+
+
+# Saving results
+if verbose:
+    print("Saving .json files...")
+dict1 = array_to_dict(final_traj, nfft, noverlap, new_sr)
+dict2 = array_to_dict(corrected_traj, nfft, noverlap, new_sr)
+dict3 = array_to_dict(harmonized_traj, nfft, noverlap, new_sr)
+names = ["DECAV-results", "DECAV-results-clean", "DECAV-results-deharmonized"]
+
+for i, dict_ in enumerate([dict1, dict2, dict3]):
+    with open(os.path.join(output, os.path.basename(file)[:-4]+f"_{names[i]}.json"), "w") as f:
+        json.dump(dict_, f, indent=4)
+if verbose:
+    print(f"{BColors.OKGREEN}It's done.{BColors.ENDC}")
\ No newline at end of file
diff --git a/Whistles/README.md b/Whistles/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..a571ca02366a7a7b5f156059abd51a96a8a807d4
--- /dev/null
+++ b/Whistles/README.md
@@ -0,0 +1,15 @@
+# Whistles
+This repository contains the files used to analyse whistle data obtained during the <a href="https://umr-marbec.fr/en/the-projects/dolphinfree/">DOLPHINFREE project</a>.
+
+## Description
+Files '1-Identification_of_whistles.py' and '2-get_stats_of_whistles.py' were used during the DOLPHINFREE project.
+
+File 'DECAV-solorun.py' is a simplified version that can be more easily used by anyone.
+
+## Get Started
+For classic use, only 'DECAV-solorun.py' and 'WhistleUtils.py' are needed. Copy them into a folder and run `python DECAV-solorun.py -h` to see how to use the script.
+
+## Results
+Results are saved into .json files. Each file contains a dictionary with:
+- a set of keys (integers) which are the IDs of the whistle contours.
+- a set of coordinates ([[x coords], [y coords]]) for the pixels associated with each contour.
\ No newline at end of file
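As an illustration of this format (the path below is a placeholder, not a file from this repository), the saved contours can be read back with the standard `json` module and displayed with matplotlib; x coordinates are times in seconds and y coordinates are frequencies in Hz, as computed by `array_to_dict` in 'DECAV-solorun.py'.

```python
import json
import matplotlib.pyplot as plt

# Placeholder name: DECAV-solorun.py writes '<wavefile>_DECAV-results-deharmonized.json'
# (plus the raw and cleaned variants) into the chosen output folder.
with open("./outputs/recording_DECAV-results-deharmonized.json") as f:
    contours = json.load(f)

for whistle_id, (times, freqs) in contours.items():
    plt.scatter(times, freqs, s=2)  # one point per contour pixel

plt.xlabel("Time (s)")
plt.ylabel("Frequency (Hz)")
plt.title(f"{len(contours)} whistle contours")
plt.show()
```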