Commit ab21a120 authored by Loïc Lehnhoff
Added scripts and READMEs to run clicks and whistles detectors independently.

parent fe429e4e
#%% Importations
import os
import librosa
import argparse
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from ClickUtils import TeagerKaiser_operator, butter_pass_filter
#%% Functions
# Argparser
def fetch_inputs():
"""
A function to fetch inputs from cmd terminal.
Run `$python Detector-solorun.py -h` for more information.
...
Parameters
----------
None : Inputs are fetched from cmd terminal as arguments.
Return
------
See argparser's help
"""
# setting up arg parser
parser = argparse.ArgumentParser(
description=
("Detects clicks in an audio file using a high-pass filter, the Teager-Kaiser "
"operator and peak picking, then saves detection times and amplitudes to a .csv file.")
)
group1 = parser.add_argument_group('File information')
group1.add_argument(
'-f', '--audio_file',
type=str,
nargs='?',
required=True,
help=("Path to the audio file that the script will use. "
"Supports any audio format readable by librosa; the sampling rate is determined automatically.")
)
group1.add_argument(
'-out', '--output',
type=str,
nargs='?',
default=os.path.join(".","outputs"),
required=False,
help=("Path where the contours will be saved. "
"Contours will be saved in a .json file, different for each wavefile. "
"Default path is './outputs'.")
)
group1.add_argument(
'-v', '--verbose',
action='store_true',
required=False,
help=("If given, show messages about the progress of the script.")
)
group1.add_argument(
'-show', '--show_plot',
action='store_true',
required=False,
help=("If given, plots the results of the selection process.")
)
group2 = parser.add_argument_group('Detector settings')
group2.add_argument(
'-s', '--sound_threshold',
type=float,
default=0.001,
nargs='?',
required=False,
help=("Value of amplitude used after the TK filter to set a minimum "
"value to consider a point in the detection of peaks."
"\nDefault is 0.001.")
)
group2.add_argument(
'-c', '--channel',
type=int,
default=0,
nargs='?',
required=False,
help=("The index of the audio channel on which "
"the detection will be done. Should be in [0, nchannels-1]."
"\nDefault is 0 (first channel).")
)
group2.add_argument(
'-k', '--click_size_sec',
type=float,
default=0.001,
nargs='?',
required=False,
help=("Duration of a click in seconds. "
"For our data it is estimated to be ~1 ms"
"\nDefault is 0.001 sec.")
)
group2.add_argument(
'-freq', '--min_freq',
type=int,
default=50_000,
nargs='?',
required=False,
help=("Cutoff frequency to apply with the high-pass filter."
"\nDefault is 50 kHz (quite a strict criterion).")
)
# fetching arguments
args = parser.parse_args()
audio_file = args.audio_file
output = args.output
sound_threshold = args.sound_threshold
channel = args.channel
click_size_sec = args.click_size_sec
min_freq = args.min_freq
# verifying arguments
try:
assert (os.path.exists(audio_file)), (
f"\nInputError [--audio_file]: Could not find file '{audio_file}'.")
assert (os.path.exists(output)), (
f"\nInputError [--output]: Outputs directory '{output}' does not exist. "
"\nChange the path for the output folder or create corresponding folder.")
assert (sound_threshold > 0), (
f"\nInputError [--sound_threshold]: sound_threshold must be positive.")
assert (channel >= 0), (
f"\nInputError [--channel]: channel must be non-negative.")
assert (click_size_sec > 0), (
f"\nInputError [--click_size_sec]: click_size_sec must be positive.")
assert (min_freq > 0), (
f"\nInputError [--min_freq]: min_freq must be positive.")
except Exception as e:
print(e)
exit()
return (audio_file, output, sound_threshold, channel,
click_size_sec, min_freq, args.verbose, args.show_plot)
# Main functions
class BColors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class SimpleAutoClicksDetector:
"""
A class to detect clicks automatically in an audio recording.
Parameters
----------
wavefile_path : string
Path to the wavefile in which clicks will be detected.
sound_thresh : float
Threshold for the detection of peaks in the filtered
audio signal (high-pass + Teager-Kaiser).
Default is 0.001.
chan : integer
In case the audio data is not mono-channel,
index of the channel on which the detection of clicks will be made.
Default is 0.
click_size_sec : float
Duration of a click in seconds.
Default is 0.001 sec.
highpass_freq : int
Cutoff frequency for the highpass filter.
Default is 50 kHz.
verbose : boolean
Whether to show prints about the process or not.
Default is False.
Methods
-------
create_peaks_map():
A function to find peaks (supposedly clicks) in an audio recording.
Generates map_peaks, an array of the same shape as the audio recording,
with zeros everywhere and ones at the positions of detections.
"""
def __init__(
self,
wavefile_path=None,
sound_thresh=0.001,
chan=0,
click_size_sec=0.001,
highpass_freq=50_000,
verbose=False):
# fetching arguments
if (isinstance(wavefile_path, str) and
isinstance(sound_thresh, float) and
isinstance(chan, int) and
isinstance(click_size_sec, float) and
isinstance(highpass_freq, int) and
isinstance(verbose, bool)):
self.wavefile_path = wavefile_path
self.sound_thresh = sound_thresh
self.chan = chan
self.click_size_sec = click_size_sec
self.highpass_freq = highpass_freq
self.verbose = verbose
else:
print(f"{BColors.FAIL}Error in parameters.{BColors.ENDC}")
return None
# actual click detection
if self.verbose:
print("Loading audio data...")
self.audio_data, self.sr = librosa.load(
self.wavefile_path, sr=None, mono=False)
if self.verbose:
print("Finding clicks in waveform...")
self.create_peaks_map()
if self.verbose:
print(f"\t{BColors.OKGREEN}Done.{BColors.ENDC}")
def create_peaks_map(self):
# assign parameters
max_length = int(self.sr*self.click_size_sec)
mini_space = int(self.sr*self.click_size_sec*2)
# check if mono or stereo+
if len(self.audio_data.shape) == 1:
self.signal = np.copy(self.audio_data)
else:
self.signal = np.copy(self.audio_data[self.chan])
# detect clicks
self.signal_high = butter_pass_filter(self.signal, self.highpass_freq, self.sr, mode='high')
self.tk_signal = TeagerKaiser_operator(self.signal_high)
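# keep peaks that are at least two click-lengths apart (avoids double detections),
# no wider than one click, and that stand out by at least sound_thresh (prominence)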
self.signal_peaks = find_peaks(self.tk_signal,
distance=mini_space,
width=[0,max_length],
prominence=self.sound_thresh)[0]
map_peaks = np.zeros(len(self.tk_signal), dtype=int)
map_peaks[self.signal_peaks] = 1
self.map_peaks = map_peaks
#%% Parameters
# Audio parameters
(file_path, output_path, sound_thresh, channel,
click_size_sec, fmin, verbose, do_plot) = fetch_inputs()
#%% Main executions
Detector = SimpleAutoClicksDetector(
wavefile_path=file_path,
sound_thresh=sound_thresh,
chan=channel,
click_size_sec=click_size_sec,
highpass_freq=fmin,
verbose = verbose)
# save peak locations
arg_peaks = np.nonzero(Detector.map_peaks)[0]
df = pd.DataFrame()
df["time"] = arg_peaks/Detector.sr
df["signal_amplitude"] = Detector.signal[arg_peaks]
df["TK_amplitude"] = Detector.tk_signal[arg_peaks]
df.to_csv(
os.path.join(output_path, os.path.basename(file_path)[:-4]+"_clicks-detection.csv"),
index=False)
if verbose:
print(f"\n{BColors.OKCYAN}File saved in '{output_path}'.{BColors.ENDC}")
if do_plot:
if verbose:
print("Showing plot.")
fig, ax = plt.subplots(1,1)
ax.plot(
np.arange(0, len(Detector.signal_high))/Detector.sr,
Detector.signal_high,
color='tab:blue', zorder=-1)
ax.scatter(
arg_peaks/Detector.sr,
Detector.signal_high[arg_peaks],
color='tab:orange', s=10, zorder=1)
plt.show(block=True)
# Clicks
This repository contains the files used to analyse click data obtained during the <a href="https://umr-marbec.fr/en/the-projects/dolphinfree/">DOLPHINFREE project</a>.
## Description
Files '1-detection_of_clicks.py', '2-get_stats_of_clicks.py', '3-projection_3features.py' and '3-projection_10features.py' were used during the DOLPHINFREE project.
File 'Detector-solorun.py' is a simplified version that can be more easily used by anyone.
## Get Started
For classic use, only 'Detector-solorun.py' and 'ClickUtils.py' are needed. Copy them into a folder and run `python Detector-solorun.py -h` to see how to use the script.
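For example, here is a minimal sketch of an invocation from Python (the audio path is a placeholder; the same flags can be passed directly on the command line):

```python
import subprocess

# Hypothetical example: detect clicks in "my_recording.wav" and write the results
# to "./outputs" (the folder must exist before running the script).
subprocess.run([
    "python", "Detector-solorun.py",
    "-f", "my_recording.wav",  # placeholder path to an audio file
    "-out", "outputs",         # existing output directory
    "-freq", "50000",          # high-pass cutoff frequency in Hz
    "-v",                      # print progress messages
])
```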
## Results
Results are saved into .csv files. Each file contains three columns:
- "time": the time (in seconds) at which a click is detected in the audio file.
- "signal_amplitude": the amplitude of the signal at that time.
- "TK_amplitude": the amplitude of the Teager-Kaiser filtered signal at that time.
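For instance, a detection file can be inspected with pandas (a minimal sketch; the file name is a placeholder following the script's naming pattern):

```python
import pandas as pd
import matplotlib.pyplot as plt

# Placeholder file name: "<audio basename>_clicks-detection.csv" in the output folder.
clicks = pd.read_csv("outputs/my_recording_clicks-detection.csv")

# One marker per detected click, at its time and Teager-Kaiser amplitude.
plt.scatter(clicks["time"], clicks["TK_amplitude"], s=10, color="tab:orange")
plt.xlabel("Time (s)")
plt.ylabel("Teager-Kaiser amplitude")
plt.show()
```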
#%% Importations
import os
import json
import argparse
import numpy as np
from librosa import load, stft, pcen, amplitude_to_db
from scipy import interpolate
from scipy.stats.mstats import gmean
from scipy.signal import resample
from matplotlib import cm
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from WhistleUtils import (get_local_maxima, get_trajectories,
select_trajectories, sparsity_ridoff, harmonize_trajectories)
#%% Functions
# Argparser
def fetch_inputs():
"""
A function to fetch inputs from cmd terminal.
Run `$python DECAV-solorun.py -h` for more information.
...
Parameters
----------
None : Inputs are fetched from cmd terminal as arguments.
Return
------
See argparser's help
"""
# setting up arg parser
parser = argparse.ArgumentParser(
description=
("Extracts whistle contours from an audio file by selecting local maxima in the "
"spectrogram and linking them into trajectories, then saves the contours to .json files.")
)
group1 = parser.add_argument_group('File information')
group1.add_argument(
'-f', '--audio_file',
type=str,
nargs='?',
required=True,
help=("Path to the audio file that the script will use. "
"Supports any audio format readable by librosa; the sampling rate is determined automatically.")
)
group1.add_argument(
'-out', '--output',
type=str,
nargs='?',
default=os.path.join(".","outputs"),
required=False,
help=("Path where the contours will be saved. "
"Contours will be saved in a .json file, different for each wavefile. "
"Default path is './outputs'.")
)
group1.add_argument(
'-new', '--resampling_rate',
type=int,
default=48000,
nargs='?',
required=False,
help=("Resampling rate for the Wavefile. "
"Can be used to speed up spectrogram visualisation. "
"Default value is '48,000' Hz.")
)
group1.add_argument(
'-v', '--verbose',
action='store_true',
required=False,
help=("If given, show messages about the progress of the script.")
)
group1.add_argument(
'-show', '--show_plot',
action='store_true',
required=False,
help=("If given, plots the results of the selection process.")
)
group2 = parser.add_argument_group('Spectrogram settings')
group2.add_argument(
'-fft', '--n_fft',
type=int,
default=256,
nargs='?',
required=False,
help=("Size of the fft window in samples. "
"Default value is '256' samples.")
)
group2.add_argument(
'-over', '--n_overlap',
type=int,
default=0,
nargs='?',
required=False,
help=("Number of fft windows overlapping [hop_length = nfft//(noverlap+1)]. "
"Default value is '0' overlap.")
)
group2.add_argument(
'-high', '--high_pass',
type=int,
default=4675,
nargs='?',
required=False,
help=("Frequency for the high-pass filter. "
"Default value is '4,675' Hz.")
)
group2.add_argument(
'-p', '--pcen',
action='store_true',
required=False,
help=("If given, uses a PCEN instead of a classical spectrogram. "
"Default is False.")
)
group3 = parser.add_argument_group('DECAV parameters')
group3.add_argument(
'-dist_f', '--distance_frequency',
type=int,
default=2,
nargs='?',
required=False,
help=("Maximum distance between two pixels on the frequency-axis "
"to consider that they belong to the same whistle. "
"Default value is '2' pixels.")
)
group3.add_argument(
'-dist_t', '--distance_time',
type=int,
default=5,
nargs='?',
required=False,
help=("Maximum distance between two pixels on the time-axis "
"to consider that they belong to the same whistle. "
"Default value is '5' pixels.")
)
group3.add_argument(
'-nrg_r', '--energy_ratio',
type=int,
default=6,
nargs='?',
required=False,
help=("SNR parameter to consider that a pixel has a higher value than its neighbours. "
"Default value is '6'.")
)
group3.add_argument(
'-min_s', '--min_size',
type=float,
default=53.3,
nargs='?',
required=False,
help=("Minimum duration (in ms) to keep a whistle. "
"Default value is '53.3' ms.")
)
group3.add_argument(
'-min_a', '--min_acc',
type=float,
default=0.5,
nargs='?',
required=False,
help=("Minimum acceleration of a whistle trajectory to be kept. "
"Default value is '0.5' Hz.s-2.")
)
group3.add_argument(
'-max_a', '--max_acc',
type=float,
default=3,
nargs='?',
required=False,
help=("Maximum acceleration of a whistle trajectory to be kept. "
"Default value is '3' Hz.s-2.")
)
group3.add_argument(
'-cc', '--correlation_coef',
type=float,
default=0.5,
nargs='?',
required=False,
help=("Limit of correlation for two whistles overlapping in time. "
"If correlation > cc, the highest-frequency whistle is "
"considered to be a harmonic, and therefore discarded. "
"Default value is '0.5'.")
)
group3.add_argument(
'-s', '--sparsity_coef',
type=float,
default=0.5,
nargs='?',
required=False,
help=("Cleaning value. "
"Contours with more than 'sparsity_coef'%% missing pixels are discarded. "
"Default value is '0.5'."),
)
# fetching arguments
args = parser.parse_args()
audio_file = args.audio_file
resampling_rate = args.resampling_rate
n_fft = args.n_fft
n_overlap = args.n_overlap
high_pass = args.high_pass
output = args.output
distance_frequency = args.distance_frequency
distance_time = args.distance_time
energy_ratio = args.energy_ratio
min_size = args.min_size
min_acc = args.min_acc
max_acc = args.max_acc
correlation_coef = args.correlation_coef
sparsity_coef = args.sparsity_coef
# verifying arguments
try:
assert (os.path.exists(audio_file)), (
f"\nInputError: Could not find file '{audio_file}'.")
assert (os.path.exists(output)), (
f"\nInputError: Outputs directory '{output}' does not exist. "
"\nChange the path for the output folder or create corresponding folder.")
assert (n_overlap >= 0), (
f"\nInputError: n_overlap can not be negative.")
assert (distance_frequency > 0), (
f"\nInputError: distance_frequency must be positive.")
assert (distance_time > 0), (
f"\nInputError: distance_time must be positive.")
assert (energy_ratio > 0), (
f"\nInputError: energy_ratio must be positive.")
assert (min_size > 0), (
f"\nInputError: min_size must be positive.")
assert (min_acc >= 0), (
f"\nInputError: min_acc can not be negative.")
assert (max_acc > 0), (
f"\nInputError: max_acc must be positive.")
assert (min_acc <= max_acc), (
f"\nInputError: min_acc must be inferior or equal to max_acc.")
assert (correlation_coef >= 0) and (correlation_coef <= 1), (
f"\nInputError: correlation_coef must be in [0, 1].")
assert (sparsity_coef >= 0) and (sparsity_coef <= 1), (
f"\nInputError: sparsity_coef must be in [0, 1].")
assert (high_pass < (resampling_rate/2)), (
f"\nInputError: high_pass frequency too high "
"(max is resampling_rate/2).")
except Exception as e:
print(e)
exit()
return (audio_file, resampling_rate, n_fft, n_overlap, high_pass, output,
distance_frequency, distance_time, energy_ratio, min_size,
min_acc, max_acc, correlation_coef, sparsity_coef, args.pcen,
args.verbose, args.show_plot)
# Main functions
class BColors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# Plotting function
def plot_spectrums(list_of_spectrums, cmaps=[], direction=-1, title="Whistles",
bins=1, titles=[], ylabels=[]):
n = len(list_of_spectrums)
fig, axs = plt.subplots(nrows=n, sharex=True, sharey=True, figsize=(15,15))
if len(cmaps) == 0:
cmaps = ['gray_r']*n
for i in range(n):
axs[i].imshow(list_of_spectrums[i][::direction], aspect='auto',
interpolation='nearest', cmap=cmaps[i])
axs[i].set_yticklabels([])
if len(titles) > 0:
axs[i].set_title(titles[i], fontsize=15)
if len(ylabels) >0:
axs[i].set_ylabel(ylabels[i], fontsize=12)
axs[i].tick_params(axis='both', which='both', labelsize=10)
axs[n-1].set_xlabel(f"Time in bins (1 bin = 1/{bins} sec)", fontsize=12)
fig.suptitle(title)
fig.tight_layout()
return fig, axs
def array_to_dict(spectrogram_pixels, fft, overlap, sr):
"""
Function to store pixels coordinates into a dict
Args:
spectrogram_pixels (numpy array)
fft (int):
Size of the fft window in samples
overlap (int):
Number of fft window overlapping
sr (int):
Sampling rate of the audio file
Returns:
dict:
Dictionary with a list of coordinates associated to integers.
Each integer is a different whistle.
"""
values = np.unique(spectrogram_pixels)[1:]
dict_traj = {}
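# center frequencies of the FFT bins; keep only the rows that remain in the
# high-pass-sliced spectrogram passed to this function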
frequencies = np.arange(0, 1 + fft / 2) * sr / fft
frequencies = frequencies[-spectrogram_pixels.shape[0]:]
for key, value in enumerate(values):
coords = [np.where(spectrogram_pixels == value)[1].tolist(),
np.where(spectrogram_pixels == value)[0].tolist()]
# convert column indices (time bins) to time in seconds
coords[0] = [(coord*(fft/sr))/(overlap+1) for coord in coords[0]]
# convert row indices (frequency bins) to frequency in Hz
coords[1] = [frequencies[coord] for coord in coords[1]]
dict_traj[key+1] = coords
return dict_traj
#%% Parameters
# Audio parameters
(file, new_sr, nfft, noverlap, fmin, output, dist_f, dist_t, nrg_r,
taille_min, min_acce, max_acce, min_r_coef, sparsity, do_pcen,
verbose, plot) = fetch_inputs()
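# Convert physical parameters to spectrogram bins. With the default settings
# (nfft=256, new_sr=48000 Hz, noverlap=0): one frequency bin = new_sr/nfft = 187.5 Hz,
# one time bin = hop_length/new_sr = 256/48000 s ~ 5.33 ms, so the default
# min_size of 53.3 ms corresponds to taille_traj_min = 10 time bins.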
f_min = round(fmin/(new_sr/nfft))
hop_length = nfft//(noverlap+1)
taille_traj_min = round(taille_min*(new_sr/hop_length)/1000)
#%% Main execution
# get audio data
if verbose:
print("Loading audio file...")
signal, fe = load(file, sr=None)
duration = len(signal)/fe
# resample to decrease computation time
signal_dec = resample(signal, int((duration*new_sr)))
# extract spectral informations
Magnitude_audio = stft(signal_dec, n_fft=nfft, hop_length=hop_length)
if do_pcen:
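# rescale float-loaded audio to an int32-like range before PCEN
# (a common librosa usage pattern), then keep only bins above f_min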
spectrum = pcen(np.abs(Magnitude_audio) * (2**31), bias=10)[f_min:,:]
else:
spectrum = np.copy(np.abs(Magnitude_audio[f_min:,:]))
if verbose:
print(f"{BColors.OKGREEN}\tDone.{BColors.ENDC}\n")
# Selection algorithm
if verbose:
print("Searching for local maxima...")
max_loc_per_bin_check1 = get_local_maxima(spectrum, spectrum, nrg_r)[1]
if verbose:
print("Finding contours...")
trajectories = get_trajectories(max_loc_per_bin_check1, dist_f=dist_f, dist_t=dist_t)
if verbose:
print("Cleaning contours...")
final_traj = select_trajectories(trajectories, taille_traj_min, min_acce, max_acce, verbose=0)
corrected_traj = sparsity_ridoff(final_traj, error_thresh=sparsity)
if verbose:
print("Removing harmonics...")
harmonized_traj = harmonize_trajectories(corrected_traj, min_r=min_r_coef,
min_common=taille_traj_min*2, delete=True)
if verbose:
print(f"{BColors.OKGREEN}\tContours ready!{BColors.ENDC}\n")
if plot:
if verbose:
print(f"Showing results...\n")
# generate bright colors to differentiate trajectories
prism = cm.get_cmap('prism', 256)
newcolors = prism(np.linspace(0, 1, np.unique(harmonized_traj).shape[0]))
black = np.array([0, 0, 0, 1])
newcolors[0, :] = black
newcmp = ListedColormap(newcolors)
# By default shows the whole audio
start = 0
stop = spectrum.shape[1]
# Create figure
fig, axs = plot_spectrums([amplitude_to_db(spectrum),
np.copy(max_loc_per_bin_check1),
np.copy(final_traj),
np.copy(harmonized_traj)],
['gray_r', 'gray', newcmp, newcmp],
titles=['Spectrogram (dB scale)', 'Local maxima selection', 'Extraction of continuous trajectories',
'Exclusion of harmonics'],
ylabels=["Frequency"]*4,
bins=round(new_sr/hop_length), title="")
# Update view
axs[3].set_xlim(start,stop)
plt.show(block=True)
# Saving results
if verbose:
print("Saving .json files...")
dict1 = array_to_dict(final_traj, nfft, noverlap, new_sr)
dict2 = array_to_dict(corrected_traj, nfft, noverlap, new_sr)
dict3 = array_to_dict(harmonized_traj, nfft, noverlap, new_sr)
names = ["DECAV-results","DECAV-results-clean","DECAV-results-deharmonized"]
for i, dict_ in enumerate([dict1, dict2, dict3]):
with open(os.path.join(output, os.path.basename(file)[:-4]+f"_{names[i]}.json"), "w") as f:
json.dump(dict_, f, indent=4)
if verbose:
print(f"{BColors.OKGREEN}It's done.{BColors.ENDC}")
# Whistles
This repository contains the files used to analyse whistle data obtained during the <a href="https://umr-marbec.fr/en/the-projects/dolphinfree/">DOLPHINFREE project</a>.
## Description
Files '1-Identification_of_whistles.py' and '2-get_stats_of_whistles.py' were used during the DOLPHINFREE project.
File 'DECAV-solorun.py' is a simplified version that can be more easily used by anyone.
## Get Started
For classic use, only 'DECAV-solorun.py' and 'WhistleUtils.py' are needed. Copy them into a folder and run `python DECAV-solorun.py -h` to see how to use the script.
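For example, here is a minimal sketch of an invocation from Python (paths are placeholders; the same flags can be passed directly on the command line):

```python
import subprocess

# Hypothetical example: extract whistle contours from "my_recording.wav" and
# save the .json results in "./outputs" (the folder must exist beforehand).
subprocess.run([
    "python", "DECAV-solorun.py",
    "-f", "my_recording.wav",  # placeholder path to an audio file
    "-out", "outputs",         # existing output directory
    "-new", "48000",           # resampling rate in Hz
    "-v",                      # print progress messages
    "-show",                   # plot the selection steps
])
```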
## Results
Results are saved into .json files. Each file contains a dictionary with:
- a set of keys (integers), one per detected whistle contour.
- for each key, a pair of coordinate lists ([[times in seconds], [frequencies in Hz]]) describing the pixels of that contour.
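For instance, the contours can be reloaded and plotted with a few lines of Python (a minimal sketch; the file name is a placeholder following the script's naming pattern, and `json.dump` stores the integer IDs as strings):

```python
import json
import matplotlib.pyplot as plt

# Placeholder file name: "<audio basename>_DECAV-results-deharmonized.json" in the output folder.
with open("outputs/my_recording_DECAV-results-deharmonized.json") as f:
    contours = json.load(f)  # {whistle ID (str): [[times in s], [frequencies in Hz]]}

for whistle_id, (times, freqs) in contours.items():
    plt.plot(times, freqs, label=f"whistle {whistle_id}")
plt.xlabel("Time (s)")
plt.ylabel("Frequency (Hz)")
plt.show()
```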