Commit 0bd0034b authored by ferrari
initial commit

# cython: infer_types=True
import numpy as np
cimport cython

# Fused types for the cross-correlation values (any real sample type) ...
ctypedef fused npType1:
    short
    int
    long
    long long
    float
    double

# ... and for the integer delay indices.
ctypedef fused npType2:
    short
    int
    long
    long long

@cython.boundscheck(False)
@cython.wraparound(False)
def c_corr(npType1[:, ::1] cc, npType2[::1, :] taus):
    """
    Searches the delay combination with the largest cross-correlation product.

    `cc` holds one circular cross-correlation per channel pair (one row per
    pair); `taus` holds, column by column, one candidate delay index per pair.
    Returns the log10 of the best product and the index of the winning column
    of `taus`.
    """
    total = taus.shape[1]
    n_pair = taus.shape[0]
    cdef double max_corr_val = 0.0
    cdef int index = 0
    cdef double corr_val = 0.0
    for i in range(total):
        corr_val = 1.0
        for p in range(n_pair):
            corr_val *= cc[p, taus[p, i]]
        if corr_val > max_corr_val:
            max_corr_val = corr_val
            index = i
    return np.log10(max_corr_val), index
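That is the whole of c_corr.pyx (the file compiled by the setup.py at the end of this commit). For orientation, here is a minimal sketch of how the kernel is meant to be called, with array shapes and dtypes taken from the `corr` function further down; the sizes and random data are made up for illustration only:

import numpy as np
from c_corr import c_corr  # the extension built from c_corr.pyx

n_pairs, dw_size, n_candidates = 3, 1024, 500   # toy sizes, not from the project
# one circular cross-correlation per channel pair, C-contiguous float32
cc = np.abs(np.random.randn(n_pairs, dw_size)).astype(np.float32)
# one candidate delay index per pair and per hypothesis, Fortran-ordered int16
taus = np.asfortranarray(
    np.random.randint(0, dw_size, (n_pairs, n_candidates)).astype(np.int16))
log10_peak, best_column = c_corr(cc, taus)

The script that follows is the main command-line tool; it calls this kernel once per analysis frame.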
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Computes TDOA estimates from a multi-channel recording.
For usage information, call with --help.
Authors: Maxence Ferrari, Marion Poupard, Jan Schlüter
"""
from __future__ import division
import os
import sys
import itertools
from argparse import ArgumentParser
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
import numpy as np
from numpy.fft import rfft, irfft
import scipy.signal
import soundfile as sf
from c_corr import c_corr
from scipy.signal.windows import tukey
try:
    from tqdm import trange
except ImportError:
    trange = range
# colors used for plotting
CLR_ENERGY = '#C46362'
CLR_TDOA = '#1F77B4'
CLR_DERIVED_TDOA = '#94B9D3'
def opts_parser():
    usage =\
"""Computes TDOA estimates from a multi-channel recording.
"""
    parser = ArgumentParser(description=usage)
    parser.add_argument('infile',
            type=str,
            help='The sound file to process.')
    parser.add_argument('outfile',
            type=str,
            help='The text or npy file to write results to, or "display" or '
                 'a pdf or png file to plot results instead. For text and npy '
                 'files, each row gives the position (in samples), '
                 'cross-correlation product, the independent TDOAs (in '
                 'samples), and TDOAs derived from the independent ones. For '
                 'plots, the panes give the cross-correlation product, '
                 'independent TDOAs and derived TDOAs from top to bottom.')
    parser.add_argument('-c', '--channels',
            type=intlist, default=None,
            help='The channels to cross-correlate. Accepts two or more, '
                 'but beware of high memory use. To be given as a '
                 'comma-separated list of numbers, with 0 referring to the '
                 'first channel (default: all channels).')
    parser.add_argument('--start', metavar='SECONDS',
            type=float, default=None,
            help='If given, only analyze from the given position.')
    parser.add_argument('--end', metavar='SECONDS',
            type=float, default=None,
            help='If given, only analyze up to the given position.')
    parser.add_argument('-f', '--frame-size',
            type=float, default=0.2,
            help='The size of the cross-correlation frames in seconds '
                 '(default: %(default)s)')
    parser.add_argument('-s', '--hop-size',
            type=str, default='0.01',
            help='The step between the beginnings of sequential frames '
                 'in seconds (default: %(default)s), or the path of a CSV '
                 'file listing the positions to analyze, in seconds.')
    parser.add_argument('-m', '--max-tdoa',
            type=float, default=0.0011,
            help='The maximum TDOA in seconds (default: %(default)s). '
                 'Compute as maximum distance between two microphones '
                 '(or hydrophones) divided by the minimum speed of '
                 'sound (330 m/s in air, 1450 m/s in water).')
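    # For instance, two hydrophones at most 1.6 m apart in sea water give
    # 1.6 m / 1450 m/s ≈ 0.0011 s, which is the default above.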
    parser.add_argument('-l', '--lower-freq',
            type=float, default=0,
            help='Cutoff frequency of a high-pass filter to apply on the '
                 'samples before correlating (default: %(default)s). Set '
                 'to 0 to disable.')
    parser.add_argument('-u', '--upper-freq',
            type=float, default=0,
            help='Cutoff frequency of a low-pass filter to apply on the '
                 'samples before correlating (default: %(default)s). Set '
                 'to 0 to disable.')
    parser.add_argument('-d', '--decimate',
            type=int, default=1,
            help='Reduce temporal resolution by the given factor when '
                 'computing TDOAs. Make sure to pass a low enough '
                 '--upper-freq to avoid artifacts (default: %(default)s).')
    parser.add_argument('--decimate-in-time',
            action='store_true',
            help='If given, any decimation will be applied in the time '
                 'domain instead of the spectral domain.')
    parser.add_argument('-t', '--result-threshold',
            type=str, default=None,
            help='Remove all points below a threshold in cross-correlation '
                 'product. "abs:N" (e.g., abs:12.5) applies an absolute threshold, '
                 '"rel" applies a relative threshold, "rel:M,N" allows to tune its '
                 'parameters (e.g., rel:90,2.5). The relative threshold is set to '
                 'the mean + N * stddev of all cross-correlation product values '
                 'below the M-percentile. Defaults are M=95, N=1.')
    parser.add_argument('-i', '--inverse',
            type=intlist, default=None,
            help='Invert the sign of the given channels. To be given as a '
                 'comma-separated list of numbers, with 0 referring to the '
                 'first channel after --channels has been applied.')
    parser.add_argument('-e', action='store_false',
            help='Overwrite the output file if it already exists '
                 '(by default, existing output files are skipped).')
    return parser
def intlist(s):
    return list(map(int, s.split(',')))
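# For example, intlist("0,2,3") returns [0, 2, 3].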
def slicer(down, up, ndim, n):
    # Partition the ndim-dimensional delay range [down, up) into n blocks per
    # axis and return the n**ndim blocks, one row of `ndim` slices per block.
    index = np.mgrid[ndim * [slice(0, n)]]
    bounds = np.linspace(down, up, n + 1).astype(int)
    slices = np.asarray([slice(a, b)
                         for a, b in zip(bounds[:-1], bounds[1:])])
    return slices[index].reshape(ndim, -1).T
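# Illustrative example (not from the original code): slicer(-2, 2, 2, 2) yields
# four rows, each a pair of slices splitting the range [-2, 2) along both axes:
#   [[slice(-2, 0), slice(-2, 0)],
#    [slice(-2, 0), slice(0, 2)],
#    [slice(0, 2), slice(-2, 0)],
#    [slice(0, 2), slice(0, 2)]]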
def corr(data, sr, pos, w_size, max_tdoa, decimate=1):
    num_channels = data.shape[1]
    num_channel_pairs = num_channels * (num_channels - 1) // 2
    # apply zero-padding
    data = np.pad(data, (((w_size - 1) // 2, (w_size - 1) // 2), (0, 0)),
                  'constant')
    # prepare window
    win = np.expand_dims(tukey(w_size, 0.2), -1)
    # prepare index matrices: v1/v2 hold the channel indices of each pair, and
    # mat maps the num_channels - 1 independent TDOAs to all pairwise TDOAs
    v1 = np.empty(num_channel_pairs, np.int8)
    v2 = np.empty(num_channel_pairs, np.int8)
    mat = np.zeros((num_channel_pairs, num_channels - 1), np.int8)
    for k, (i, j) in enumerate(itertools.combinations(range(num_channels), 2)):
        if i > 0:
            mat[k, i-1] = -1
        mat[k, j-1] = 1
        v1[k] = i
        v2[k] = j
    cc_size = min(w_size, int(2 * max_tdoa // decimate))
    slices = slicer(-(cc_size // 2), cc_size // 2, (num_channels - 1), 16)
    tausf = []
    for j in range(len(slices)):
        taus = np.mgrid[slices[j]].reshape(num_channels - 1, -1).astype(np.int16)
        taus2 = np.matmul(mat, taus)
        tausf += [taus2[:, np.abs(taus2).max(0) <= cc_size // 2]]
    tausf = np.hstack(tausf)
    dw_size = w_size // decimate
    tausf %= dw_size
    # run the TDOA search
    tdoas = np.zeros((len(pos), num_channels), np.float32)
    tdoas2 = np.zeros((len(pos), num_channels - 1), np.float32)
    cc = np.empty((num_channel_pairs, dw_size), np.float32)
    poly = PolynomialFeatures(2)
    lin = LinearRegression()
    pipe = Pipeline([('poly', poly), ('lin', lin)])
    ind = np.triu_indices(num_channels - 1)
    for i in trange(len(pos)):
        fft = rfft(win * data[pos[i]:w_size + pos[i]], axis=0)
        if decimate > 1:
            fft = fft[:(len(fft) - 1) // decimate + 1]
        fft = np.asarray(fft, dtype=np.complex64)
        cc[:] = irfft(fft[:, v2] * np.conj(fft[:, v1]), axis=0).T
        cc -= cc.min(-1, keepdims=True)
        val, index = c_corr(cc, tausf)
        tdoas[i, 0] = val
        tdoas[i, 1:] = ((tausf[:(num_channels - 1), index] + dw_size//2) % dw_size) - dw_size//2
        # hyper resolution: fit a second-order polynomial to the cross-correlation
        # product around the discrete maximum and solve for its stationary point
        # (this refinement step appears to assume four channels, i.e. three
        # independent TDOAs)
        taus = tdoas[i, 1:num_channels] + np.stack(np.meshgrid(*(num_channels-1) * (np.arange(-2, 3),)), 0).reshape(num_channels-1, -1).T
        taus = np.matmul(mat, taus.T)
        taus = taus[:, np.abs(taus).max(0) <= cc_size // 2]
        mean = taus.mean(-1)[:3]
        coef = pipe.fit((taus).T[:, :3] - mean,
                        cc[np.expand_dims(np.arange(num_channel_pairs), 1), taus.astype(int)].prod(0)
                        ).named_steps['lin'].coef_
        der = np.zeros((num_channels - 1, num_channels - 1))
        der[ind] = coef[4:]
        tdoas2[i] = np.linalg.lstsq(der + der.T, -coef[1:4], rcond=None)[0] + mean
    return np.hstack((np.expand_dims(pos, -1), tdoas)), np.hstack((np.expand_dims(pos, -1), tdoas2))
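# An aside for readers (not used by this script): the refinement above is the
# N-dimensional analogue of classic 1-D parabolic peak interpolation, sketched
# below under the assumption of a strict local maximum at integer index k.
def _refine_peak_1d(y, k):
    """Refine integer peak index k of curve y by fitting a parabola to its
    neighbours and returning the stationary point (illustrative only)."""
    a = (y[k - 1] - 2 * y[k] + y[k + 1]) / 2.0  # quadratic coefficient
    b = (y[k + 1] - y[k - 1]) / 2.0             # linear coefficient
    return k - b / (2 * a) if a != 0 else float(k)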
def to_float(data):
    """
    Converts integer samples to float samples, dividing by the datatype's
    maximum on the way. If the data is in floating point already, just
    converts to float32 if needed.
    """
    data = np.asanyarray(data)
    if np.issubdtype(data.dtype, np.floating):
        return np.asarray(data, dtype=np.float32)
    else:
        return np.divide(data, np.iinfo(data.dtype).max, dtype=np.float32)
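# For example, to_float(np.array([16384], np.int16)) gives roughly [0.5],
# since int16 samples are divided by 32767.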
def main():
    # parse command line
    parser = opts_parser()
    options = parser.parse_args()
    if options.e and os.path.isfile(options.outfile):
        # by default, do not recompute results for an existing output file
        return None
    if options.result_threshold is None:
        pass
    elif options.result_threshold.startswith('abs:'):
        options.result_threshold = ('abs', float(options.result_threshold[4:]))
    elif options.result_threshold == 'rel':
        options.result_threshold = ('rel', (95, 1))
    elif options.result_threshold.startswith('rel:'):
        options.result_threshold = ('rel', tuple(
                float(v) for v in options.result_threshold[4:].split(',', 1)))
    else:
        parser.error('invalid --result-threshold: %s' % options.result_threshold)
    # load audio file
    print("Loading %s..." % options.infile)
    samples, sr = sf.read(options.infile)
    if options.start is not None or options.end is not None:
        samples = samples[
                options.start and int(options.start * sr) or None:
                options.end and int(options.end * sr) or None]
    # keep only required channels and convert to float
    if options.channels is not None:
        samples_ = np.empty((len(samples), len(options.channels)), dtype=np.float32)
        for s, c in zip(samples_.T, options.channels):
            s[:] = to_float(samples[:, c])
        samples = samples_
    else:
        samples = to_float(samples)
        options.channels = range(samples.shape[1])
    if options.inverse is not None:
        for c in options.inverse:
            samples[:, c] *= -1
    # apply temporal lowpass and/or highpass, if needed
    if options.lower_freq or options.upper_freq:
        print("Filtering...")
        nyquist = sr / 2
        if options.lower_freq and not options.upper_freq:
            sos = scipy.signal.butter(8, options.lower_freq / nyquist,
                                      output='sos', btype='highpass')
        elif options.lower_freq and options.upper_freq:
            sos = scipy.signal.butter(8, (options.lower_freq / nyquist,
                                          options.upper_freq / nyquist),
                                      output='sos', btype='bandpass')
        elif not options.lower_freq and options.upper_freq:
            sos = scipy.signal.butter(8, options.upper_freq / nyquist,
                                      output='sos', btype='lowpass')
        samples = scipy.signal.sosfilt(sos, samples, axis=0)
    # decimate in time domain, if requested
    if options.decimate and options.decimate_in_time:
        samples = samples[::options.decimate]
        sr /= options.decimate
    # prepare list of positions to compute TDOAs for
    if os.path.isfile(options.hop_size):
        pos = (sr * np.loadtxt(options.hop_size, delimiter=',')).astype(int).ravel()
    else:
        try:
            pos = np.arange(0, len(samples), int(sr * float(options.hop_size)))
        except ValueError:
            raise ValueError(f'Error: hop size {options.hop_size} is neither an existing file nor a float')
    # hand it over to the algorithm
    print("Computing TDOAs...")
    result1, result2 = corr(samples, sr, pos, int(sr * options.frame_size),
                            max_tdoa=int(np.ceil(sr * options.max_tdoa)),
                            decimate=(options.decimate
                                      if not options.decimate_in_time else 1))
    # compute additional non-independent TDOAs
    additional1 = [(b - a) for a, b in itertools.combinations(result1.T[2:], 2)]
    result1 = np.hstack((result1,) + tuple(a[:, np.newaxis] for a in additional1))
    additional2 = [(b - a) for a, b in itertools.combinations(result2.T[1:], 2)]
    result2 = np.hstack((result2,) + tuple(a[:, np.newaxis] for a in additional2))
    # filter the background noise
    if options.result_threshold is not None:
        method, args = options.result_threshold
        values = result1[:, 1]
        if method == 'abs':
            result1 = result1[values > args]
        elif method == 'rel':
            percentile, stddev = args
            below_percentile = values[values < np.percentile(values, percentile)]
            result1 = result1[values > np.mean(below_percentile) +
                              stddev * np.std(below_percentile)]
    # save or plot results
    if options.outfile.endswith('.npy'):
        np.save(options.outfile, result1)
        np.save(options.outfile[:-4] + '2.npy', result2)
    elif (options.outfile == 'display' or
          options.outfile[-3:] in ('pdf', 'png')):
        import matplotlib as mpl
        import matplotlib.pyplot as plt
        fig, axes = plt.subplots(result1.shape[1] - 1, 1, sharex=True)
        num_channels = len(options.channels)
        num_tdoas = num_channels - 1
        num_derived_tdoas = num_tdoas * (num_tdoas - 1) // 2
        colors = ([CLR_ENERGY] +
                  [CLR_TDOA] * num_tdoas +
                  [CLR_DERIVED_TDOA] * num_derived_tdoas)
        times = result1[:, 0] / sr
        if options.start is not None:
            times += options.start
        for ax, r, c in zip(axes, result1.T[1:], colors):
            ax.plot(times, r, '.', markersize=3, color=c)
        axes[-1].xaxis.set_major_formatter(mpl.ticker.FuncFormatter(
                lambda v, _: ('%d:%02.1f' % (v // 60, v % 60))))
        fig.suptitle(os.path.basename(options.infile))
        if options.outfile == 'display':
            plt.show()
        else:
            plt.savefig(options.outfile)
    else:
        np.savetxt(options.outfile, result1, delimiter=',')
        np.savetxt(options.outfile + '2', result2, delimiter=',')
    print("Done.")


if __name__ == "__main__":
    sys.exit(main())
from distutils.core import setup
from Cython.Build import cythonize
setup(
    ext_modules=cythonize("c_corr.pyx", annotate=True)
)
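To build the extension in place, the standard Cython workflow is to run `python setup.py build_ext --inplace` from the directory containing c_corr.pyx; this produces the compiled `c_corr` module that the main script imports. On Python 3.12 and later, where distutils has been removed, importing `setup` from setuptools instead of distutils.core is the usual drop-in replacement.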