tf_fading.py 14.5 KB
Newer Older
valentin.emiya's avatar
valentin.emiya committed
1
2
# -*- coding: utf-8 -*-
"""
valentin.emiya's avatar
valentin.emiya committed
3
Class :class:`GabMulTff` is the main object to solve a time-frequency fading
valentin.emiya's avatar
valentin.emiya committed
4
problem.
valentin.emiya's avatar
valentin.emiya committed
5
6
7
8

.. moduleauthor:: Valentin Emiya
"""
from time import perf_counter
valentin.emiya's avatar
valentin.emiya committed
9
from pathlib import Path
valentin.emiya's avatar
valentin.emiya committed
10

valentin.emiya's avatar
valentin.emiya committed
11
import numpy as np
valentin.emiya's avatar
valentin.emiya committed
12
from scipy.optimize import minimize_scalar, minimize
valentin.emiya's avatar
valentin.emiya committed
13
14
from matplotlib import pyplot as plt
from ltfatpy import plotdgtreal
valentin.emiya's avatar
valentin.emiya committed
15
16
17
18
19
20
21
22
23
24
25

from skpomade.range_approximation import \
    adaptive_randomized_range_finder, randomized_range_finder
from skpomade.factorization_construction import evd_nystrom

from tffpy.tf_tools import GaborMultiplier
from tffpy.create_subregions import create_subregions
from tffpy.utils import dgt, plot_spectrogram, db


class GabMulTff:
valentin.emiya's avatar
valentin.emiya committed
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    """
    Time-frequency fading using Gabor multipliers

    Main object to solve the TFF problem

    Parameters
    ----------
    x_mix : nd-array
        Mix signal
    mask : nd-array
        Time-frequency mask
    dgt_params : dict
        DGT parameters
    signal_params : dict
        Signal parameters
    tol_subregions : None or float
        If None, the mask is considered as a single region. If float,
        tolerance to split the mask into sub-regions using
valentin.emiya's avatar
valentin.emiya committed
44
        :py:func:`~tffpy.create_subregions.create_subregions`.
valentin.emiya's avatar
valentin.emiya committed
45
46
47
48
49
    fig_dir : str or Path
        If not None, folder where figures are stored. If None, figures are
        not plotted.
    """

valentin.emiya's avatar
valentin.emiya committed
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
    def __init__(self, x_mix, mask, dgt_params, signal_params,
                 tol_subregions=None, fig_dir=None):
        self.x_mix = x_mix
        self.dgt_params = dgt_params
        self.signal_params = signal_params
        self.tol_subregions = tol_subregions
        self.t_subreg = None
        if tol_subregions is not None:
            if np.issubdtype(mask.dtype, np.bool_):
                t0 = perf_counter()
                mask = create_subregions(mask_bool=mask,
                                         dgt_params=dgt_params,
                                         signal_params=signal_params,
                                         tol=tol_subregions,
                                         fig_dir=fig_dir)
                self.t_subreg = perf_counter() - t0
            n_areas = np.unique(mask).size - 1
            self.mask = mask
        else:
            n_areas = 1
            self.mask = mask > 0
        self.gabmul_list = [GaborMultiplier(mask=(mask == i + 1),
                                            dgt_params=dgt_params,
                                            signal_params=signal_params)
                            for i in range(n_areas)]
        self.s_vec_list = [None for i in range(n_areas)]
        self.u_mat_list = [None for i in range(n_areas)]
        self.uh_x_list = [None for i in range(n_areas)]
        self.t_arrf = [None for i in range(n_areas)]
        self.t_evdn = [None for i in range(n_areas)]
        self.t_uh_x = [None for i in range(n_areas)]
        self.fig_dir = fig_dir
        if fig_dir is not None:
valentin.emiya's avatar
valentin.emiya committed
83
            fig_dir = Path(fig_dir)
valentin.emiya's avatar
valentin.emiya committed
84
85
86
87
            fig_dir.mkdir(parents=True, exist_ok=True)

    @property
    def n_areas(self):
valentin.emiya's avatar
valentin.emiya committed
88
89
90
        """
        Number of sub-regions
        """
valentin.emiya's avatar
valentin.emiya committed
91
92
93
        return len(self.u_mat_list)

    def compute_decomposition(self, tolerance_arrf, proba_arrf):
valentin.emiya's avatar
valentin.emiya committed
94
95
96
97
98
99
100
101
        """
        Decompose each Gabor multiplier using a random EVD

        The decomposition is obtained using
        :py:func:`skpomade.range_approximation.adaptive_randomized_range_finder`
        followed by :py:func:`skpomade.factorization_construction.evd_nystrom`.
        The rank of each decomposition is estimated using parameters
        `tolerance_arrf` and `proba_arrf`.
valentin.emiya's avatar
valentin.emiya committed
102
103
104
105
        Running times to compute the range approximation, the
        EVD itself and the additional matrix products for subsequent
        computations are stored in attributes `t_arrf`, `t_evdn` and
        `t_uh_x`, respectively.
valentin.emiya's avatar
valentin.emiya committed
106
107
108
109
110

        Parameters
        ----------
        tolerance_arrf : float
            Tolerance for
valentin.emiya's avatar
valentin.emiya committed
111
            :py:func:`~skpomade.range_approximation.adaptive_randomized_range_finder`
valentin.emiya's avatar
valentin.emiya committed
112
113
        proba_arrf : float
            Probability of error for
valentin.emiya's avatar
valentin.emiya committed
114
            :py:func:`~skpomade.range_approximation.adaptive_randomized_range_finder`
valentin.emiya's avatar
valentin.emiya committed
115
116

        """
valentin.emiya's avatar
valentin.emiya committed
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
        for i in range(self.n_areas):
            print('Random EVD of Gabor multiplier #{}'.format(i))
            print('#coefs in mask: {} ({:.1%} missing)'
                  .format(np.sum(self.gabmul_list[i].mask),
                          np.sum(self.gabmul_list[i].mask)
                          / self.gabmul_list[i].mask.size))
            t0 = perf_counter()
            q_mat = adaptive_randomized_range_finder(a=self.gabmul_list[i],
                                                     tolerance=tolerance_arrf,
                                                     proba=proba_arrf, r=None,
                                                     rand_state=0, n_cols_Q=32)
            self.t_arrf[i] = perf_counter() - t0
            print('Q shape:', q_mat.shape)
            t0 = perf_counter()
            self.s_vec_list[i], self.u_mat_list[i] = \
                evd_nystrom(a=self.gabmul_list[i], q_mat=q_mat)
            self.t_evdn[i] = perf_counter() - t0
            print('Running times:')
            print('   - adaptive_randomized_range_finder: {} s'.format(
                self.t_arrf[i]))
            print('   - evd_nystrom: {} s'.format(self.t_evdn[i]))

            t0 = perf_counter()
            self.uh_x_list[i] = self.u_mat_list[i].T.conj() @ self.x_mix
            self.t_uh_x[i] = perf_counter() - t0

    def compute_decomposition_fixed_rank(self, rank):
valentin.emiya's avatar
valentin.emiya committed
144
145
146
147
148
149
150
151
152
153
154
155
156
        """
        Decompose each Gabor multiplier using a random EVD with given rank

        The decomposition is obtained using
        :py:func:`skpomade.range_approximation.randomized_range_finder`
        followed by :py:func:`skpomade.factorization_construction.evd_nystrom`.
        Running times are stored in attributes `t_rrf`, `t_evdn` and `t_uh_x`.

        Parameters
        ----------
        rank : int
            Rank of the decompostion
        """
valentin.emiya's avatar
valentin.emiya committed
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
        t_rrf = [None for i in range(self.n_areas)]
        t_evdn = [None for i in range(self.n_areas)]
        t_uh_x = [None for i in range(self.n_areas)]
        for i in range(self.n_areas):
            print('Random EVD of Gabor multiplier #{}'.format(i))
            print('#coefs in mask: {} ({:.1%})'
                  .format(np.sum(self.gabmul_list[i].mask),
                          np.sum(self.gabmul_list[i].mask)
                          / self.gabmul_list[i].mask.size))
            t0 = perf_counter()
            q_mat = randomized_range_finder(a=self.gabmul_list[i],
                                            n_l=rank,
                                            rand_state=0)
            t_rrf[i] = perf_counter() - t0
            print('Q shape:', q_mat.shape)
            t0 = perf_counter()
            self.s_vec_list[i], self.u_mat_list[i] = \
                evd_nystrom(a=self.gabmul_list[i], q_mat=q_mat)
            t_evdn[i] = perf_counter() - t0
            print('Running times:')
            print('   - randomized_range_finder: {} s'.format(t_rrf[i]))
            print('   - evd_nystrom: {} s'.format(t_evdn[i]))

            t0 = perf_counter()
            self.uh_x_list[i] = self.u_mat_list[i].T.conj() @ self.x_mix
            t_uh_x[i] = perf_counter() - t0

    def compute_estimate(self, lambda_coef):
valentin.emiya's avatar
valentin.emiya committed
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
        """
        Compute the signal estimate for a given hyperparameter
        :math:`\lambda_i` for each sub-region :math:`i`.

        Prior decomposition should have been performed using
        :meth:`compute_decomposition` or
        :meth:`compute_decomposition_fixed_rank`.

        Parameters
        ----------
        lambda_coef : nd-array or float
            If nd-array, hyperparameters :math:`\lambda_i` for each sub-region
            :math:`i`. If float, the same value :math:`\lambda` is used
            for each sub-region.

        Returns
        -------
        nd-array
            Reconstructed signal
        """
valentin.emiya's avatar
valentin.emiya committed
205
206
207
208
209
210
211
212
213
214
215
216
        if isinstance(lambda_coef, np.ndarray):
            assert lambda_coef.size == self.n_areas
        else:
            lambda_coef = np.full(self.n_areas, fill_value=lambda_coef)
        x = self.x_mix.copy()
        for i in range(self.n_areas):
            gamma_vec = lambda_coef[i] * self.s_vec_list[i] \
                        / (1 - (1 - lambda_coef[i]) * self.s_vec_list[i])
            x -= self.u_mat_list[i] @ (gamma_vec * self.uh_x_list[i])
        return x

    def compute_lambda(self, x_mix, e_target=None):
valentin.emiya's avatar
valentin.emiya committed
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
        """
        Estimate hyperparameters :math:`\lambda_i` from target energy in each
        sub-region :math:`i`.

        Parameters
        ----------
        x_mix : nd-array
            Mix signal

        e_target : nd-array or None
            Target energy for each sub-region/. If None, function
            :py:func:`estimate_energy_in_mask` is used to estimate the
            target energies.

        Returns
        -------
        lambda_est : nd-array
            Hyperparameters :math:`\lambda_i` for each sub-region :math:`i`.
        t_est : nd-array
            Running time to estimate each hyperparameter.
        """
valentin.emiya's avatar
valentin.emiya committed
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
        if e_target is None:
            e_target = estimate_energy_in_mask(
                x_mix=x_mix, mask=self.mask,
                dgt_params=self.dgt_params, signal_params=self.signal_params,
                fig_dir=self.fig_dir)
        t_est = np.empty(self.n_areas)
        lambda_est = np.empty(self.n_areas)
        for i_area in range(self.n_areas):
            mask_i = self.mask == i_area + 1

            def obj_fun_est(lambda_coef):
                x = self.compute_estimate(lambda_coef)
                x_tf_masked = mask_i * self.gabmul_list[i_area].dgt(x)
                e_mask = np.linalg.norm(x_tf_masked, 'fro') ** 2
                return np.abs(e_target[i_area] - e_mask)

            t0 = perf_counter()
            sol_est = minimize_scalar(obj_fun_est, bracket=[0, 1],
                                      method='brent')
            t_est[i_area] = perf_counter() - t0
            lambda_est[i_area] = sol_est.x
        return lambda_est, t_est


def reconstruction(x_mix, lambda_coef, u_mat, s_vec):
    return GabMulTff(x_mix=x_mix, u_mat=u_mat, s_vec=s_vec)(lambda_coef)


def estimate_energy_in_mask(x_mix, mask, dgt_params, signal_params,
                            fig_dir=None, prefix=None):
valentin.emiya's avatar
valentin.emiya committed
268
269
270
271
272
273
274
275
276
277
278
279
280
    """
    Estimate energy in time-frequency mask

    Parameters
    ----------
    x_mix : nd-array
        Mix signal
    mask: nd-array
        Time-frequency mask for each sub-region
    dgt_params : dict
        DGT parameters
    signal_params : dict
        Signal parameters
valentin.emiya's avatar
valentin.emiya committed
281
    fig_dir : str or Path
valentin.emiya's avatar
valentin.emiya committed
282
283
284
285
286
287
288
289
290
291
        If not None, folder where figures are stored. If None, figures are
        not plotted.
    prefix : str
        If not None, this prefix is used when saving the figures.

    Returns
    -------
    nd-array
        Estimated energy in each sub-region.
    """
valentin.emiya's avatar
valentin.emiya committed
292
293
294
295
296
297
298
299
300
301
302
303
304
305
    x_tf_mat = dgt(sig=x_mix, dgt_params=dgt_params)
    x_tf_mat[mask > 0] = np.nan
    e_f_mean = np.nanmean(np.abs(x_tf_mat) ** 2, axis=1)

    mask = mask.astype(int)
    n_areas = np.unique(mask).size - 1
    estimated_energy = np.empty(n_areas)
    e_mat = e_f_mean[:, None] @ np.ones((1, x_tf_mat.shape[1]))
    e_mat[mask == 0] = 0
    for i_area in range(n_areas):
        mask_i = mask == i_area + 1
        estimated_energy[i_area] = np.sum(e_mat * mask_i)

    if fig_dir is not None:
valentin.emiya's avatar
valentin.emiya committed
306
        fig_dir = Path(fig_dir)
valentin.emiya's avatar
valentin.emiya committed
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
        fig_dir.mkdir(parents=True, exist_ok=True)
        if prefix is None:
            prefix = ''
        else:
            prefix = prefix + '_'
        dynrange = 100
        c_max = np.nanmax(db(x_tf_mat))
        clim = c_max - dynrange, c_max

        fs = signal_params['fs']
        plt.figure()
        plot_spectrogram(x=x_mix, dgt_params=dgt_params, fs=fs, clim=clim)
        plt.title('Mix')
        plt.savefig(fig_dir / '{}mix.pdf'.format(prefix))

        plt.figure()
        plotdgtreal(coef=np.sqrt(e_mat), a=dgt_params['hop'],
                    M=dgt_params['n_bins'], fs=fs, clim=clim)
        plt.title('Mask filled with average energy (total: {})'
                  .format(estimated_energy))
        plt.savefig(fig_dir / '{}filled_mask.pdf'.format(prefix))

        x_tf_mat[mask > 0] = np.sqrt(e_mat[mask > 0])
        plt.figure()
        plotdgtreal(coef=x_tf_mat, a=dgt_params['hop'],
                    M=dgt_params['n_bins'], fs=fs, clim=clim)
        plt.title('Mix filled with average energy (total: {})'
                  .format(estimated_energy))
        plt.savefig(fig_dir / '{}filled_mask.pdf'.format(prefix))

        plt.figure()
        plt.plot(db(e_f_mean) / 2)
        plt.xlabel('Frequency index')
        plt.ylabel('Average energy')
        plt.title('Average energy per frequency bin in mix')
        plt.savefig(fig_dir / '{}average_energy.pdf'.format(prefix))

        e_f_mean_check = np.mean(np.abs(x_tf_mat) ** 2, axis=1)
        plt.figure()
        plt.plot(db(e_f_mean) / 2, label='Before filling')
        plt.plot(db(e_f_mean_check) / 2, '--', label='After filling')
        plt.xlabel('Frequency index')
        plt.ylabel('Average energy')
        plt.title('Average energy per frequency bin in mix')
        plt.legend()
        plt.savefig(fig_dir / '{}average_energy_check.pdf'.format(prefix))

    return estimated_energy


def compute_lambda_oracle_sdr(gmtff, x_wb):
valentin.emiya's avatar
valentin.emiya committed
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
    """
    Compute oracle value for hyperparameter :math:`\lambda_i` from true
    solution.

    If only one region is considered, the Brent's algorithm is used (see
    :py:func:`scipy.optimize.minimize_scalar`). If multiple sub-regions are
    considered the BFGS
    algorithm is used (see :py:func:`scipy.optimize.minimize`).

    Parameters
    ----------
    gmtff : GabMulTff
    x_wb : nd-array
        True signal for the wideband source.

    Returns
    -------
    lambda_oracle : nd-array
        Oracle values for hyperparameters :math:`\lambda_i` for each
        sub-region :math:`i`.
    t_oracle : nd-array
        Running times for the computation of each hyperparameter
    """
valentin.emiya's avatar
valentin.emiya committed
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
    t0 = perf_counter()

    def obj_fun_oracle(lambda_coef):
        return np.linalg.norm(x_wb - gmtff.compute_estimate(lambda_coef))

    if gmtff.tol_subregions is None:
        sol_oracle = minimize_scalar(obj_fun_oracle,
                                     bracket=[0, 1], method='brent')
        lambda_oracle = np.array([sol_oracle.x])
    else:
        sol_oracle = minimize(obj_fun_oracle,
                              np.ones(gmtff.n_areas),
                              method='BFGS',
                              options={'disp': True})
        lambda_oracle = sol_oracle.x
    t_oracle = perf_counter() - t0
    return lambda_oracle, t_oracle