Select Git revision
c_corr.pyx 3.71 KiB
# cython: infer_types=True
import numpy as np
cimport cython
ctypedef fused npType1:
short
int
long
float
double
ctypedef fused npType2:
short
int
long
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.embedsignature(True)
def c_corr_at(npType1[:,::1] cc, npType2[::1, :] taus):
total = taus.shape[1]
n_pair = taus.shape[0]
cdef double max_corr_val = 0.0
cdef int index = 0
cdef double corr_val = 0.0
for i in range(total):
corr_val = 1.0
for p in range(n_pair):
corr_val *= cc[p, taus[p, i]]
if corr_val > max_corr_val:
max_corr_val = corr_val
index = i
return np.log10(max_corr_val), index
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.embedsignature(True)
cpdef c_corr_all(cc, npType2 max_tdoa, npType2 base_tdoa):
cdef float[:,::1] cc_mem = cc.astype(np.float32)
cdef double corr_val = 0.0
cdef long[::1] taus = np.zeros(cc.shape[0], int)
cdef long[::1] out_taus = np.zeros(cc.shape[0], int)
corr_val = _c_corr_core(cc_mem, taus, out_taus, corr_val, <long> base_tdoa-1, <long> base_tdoa,
<long> max_tdoa, <long> cc.shape[1], <long> cc.shape[0])
return np.log10(corr_val), out_taus
cdef long moduloNeg(long a, long b):
if a < 0:
return a + b
else:
return a
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.profile(True)
cdef double _c_corr_core(float[:,::1] cc, long[::1] taus, long[::1] out_taus, double val,
long curr_tdoa, long ind_tdoa, long max_tdoa, long win_size, long n_pair):
cdef int i0 = 1
cdef int j0 = 0
cdef double tmp_val = 0.0
if curr_tdoa < 0:
for i in range(ind_tdoa, n_pair):
taus[i] = taus[i0] - taus[j0]
i0 += 1
if i0 >= ind_tdoa:
j0 += 1
i0 = j0 + 1
tmp_val = cc[0, moduloNeg(taus[0], win_size)]
for i in range(1, n_pair):
tmp_val *= cc[i, moduloNeg(taus[i], win_size)]
if val < tmp_val:
val = tmp_val
out_taus[:] = taus
return val
cdef int low = -max_tdoa
cdef int high = max_tdoa
for i in range(curr_tdoa+1, ind_tdoa):
if low < taus[i] - max_tdoa:
low = -max_tdoa + taus[i]
if high > max_tdoa + taus[i]:
high = max_tdoa + taus[i]
for t in range(low, high+1):
taus[curr_tdoa] = t
val = _c_corr_core(cc, taus, out_taus, val, curr_tdoa-1, ind_tdoa, max_tdoa, win_size, n_pair)
return val
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.profile(True)
def _c_corr_core2(npType1[:,::1] cc, npType2[::1] taus, npType2[::1] out_taus, double val,
npType2 curr_tdoa, npType2 ind_tdoa, npType2 max_tdoa, npType2 win_size, npType2 n_pair):
cdef int i0 = 1
cdef int j0 = 0
cdef double tmp_val = 0.0
for i in range(ind_tdoa):
taus[i] = - max_tdoa - 1
cdef int level = 0
while level >= 0:
taus[level] += 1
if taus[level] > min(taus[:level]) + max_tdoa:
taus[level] = max(taus[:level]) - max_tdoa - 1
level -= 1
continue
level +=1
if level == ind_tdoa:
level -= 1
for i in range(ind_tdoa, n_pair):
taus[i] = taus[i0] - taus[j0]
i0 += 1
if i0 >= ind_tdoa:
j0 += 1
i0 = j0 + 1
tmp_val = cc[0, moduloNeg(taus[0], win_size)]
for i in range(1, n_pair):
tmp_val *= cc[i, moduloNeg(taus[i], win_size)]
if val < tmp_val:
val = tmp_val
out_taus[:] = taus
return val