Skip to content
Snippets Groups Projects
Select Git revision
  • 38d60f6d8ff92f81fef6e44e3f50baa5897810dd
  • master default
  • object
  • develop protected
  • private_algos
  • cuisine
  • SMOTE
  • revert-76c4cca5
  • archive protected
  • no_graphviz
  • 0.0.1
11 results

analyzeResults.py

Blame
  • tf_fading.py 14.18 KiB
    # -*- coding: utf-8 -*-
    """
    
    .. moduleauthor:: Valentin Emiya
    """
    import numpy as np
    from time import perf_counter
    
    from ltfatpy import plotdgtreal
    from matplotlib import pyplot as plt
    from scipy.optimize import minimize_scalar, minimize
    
    from skpomade.range_approximation import \
        adaptive_randomized_range_finder, randomized_range_finder
    from skpomade.factorization_construction import evd_nystrom
    
    from tffpy.tf_tools import GaborMultiplier
    from tffpy.create_subregions import create_subregions
    from tffpy.utils import dgt, plot_spectrogram, db
    
    
    class GabMulTff:
        """
        Time-frequency fading using Gabor multipliers
    
        Main object to solve the TFF problem
    
        Parameters
        ----------
        x_mix : nd-array
            Mix signal
        mask : nd-array
            Time-frequency mask
        dgt_params : dict
            DGT parameters
        signal_params : dict
            Signal parameters
        tol_subregions : None or float
            If None, the mask is considered as a single region. If float,
            tolerance to split the mask into sub-regions using
            :py:func:`tffpy.create_subregions.create_subregions`.
        fig_dir : str or Path
            If not None, folder where figures are stored. If None, figures are
            not plotted.
        """
    
        def __init__(self, x_mix, mask, dgt_params, signal_params,
                     tol_subregions=None, fig_dir=None):
            self.x_mix = x_mix
            self.dgt_params = dgt_params
            self.signal_params = signal_params
            self.tol_subregions = tol_subregions
            self.t_subreg = None
            if tol_subregions is not None:
                if np.issubdtype(mask.dtype, np.bool_):
                    t0 = perf_counter()
                    mask = create_subregions(mask_bool=mask,
                                             dgt_params=dgt_params,
                                             signal_params=signal_params,
                                             tol=tol_subregions,
                                             fig_dir=fig_dir)
                    self.t_subreg = perf_counter() - t0
                n_areas = np.unique(mask).size - 1
                self.mask = mask
            else:
                n_areas = 1
                self.mask = mask > 0
            self.gabmul_list = [GaborMultiplier(mask=(mask == i + 1),
                                                dgt_params=dgt_params,
                                                signal_params=signal_params)
                                for i in range(n_areas)]
            self.s_vec_list = [None for i in range(n_areas)]
            self.u_mat_list = [None for i in range(n_areas)]
            self.uh_x_list = [None for i in range(n_areas)]
            self.t_arrf = [None for i in range(n_areas)]
            self.t_evdn = [None for i in range(n_areas)]
            self.t_uh_x = [None for i in range(n_areas)]
            self.fig_dir = fig_dir
            if fig_dir is not None:
                fig_dir.mkdir(parents=True, exist_ok=True)
    
        @property
        def n_areas(self):
            """
            Number of sub-regions
            """
            return len(self.u_mat_list)
    
        def compute_decomposition(self, tolerance_arrf, proba_arrf):
            """
            Decompose each Gabor multiplier using a random EVD
    
            The decomposition is obtained using
            :py:func:`skpomade.range_approximation.adaptive_randomized_range_finder`
            followed by :py:func:`skpomade.factorization_construction.evd_nystrom`.
            The rank of each decomposition is estimated using parameters
            `tolerance_arrf` and `proba_arrf`.
            Running times are stored in attributes `t_arrf`, `t_evdn` and `t_uh_x`.
    
            Parameters
            ----------
            tolerance_arrf : float
                Tolerance for
                :py:func:`skpomade.range_approximation.adaptive_randomized_range_finder`
            proba_arrf : float
                Probability of error for
                :py:func:`skpomade.range_approximation.adaptive_randomized_range_finder`
    
            """
            for i in range(self.n_areas):
                print('Random EVD of Gabor multiplier #{}'.format(i))
                print('#coefs in mask: {} ({:.1%} missing)'
                      .format(np.sum(self.gabmul_list[i].mask),
                              np.sum(self.gabmul_list[i].mask)
                              / self.gabmul_list[i].mask.size))
                t0 = perf_counter()
                q_mat = adaptive_randomized_range_finder(a=self.gabmul_list[i],
                                                         tolerance=tolerance_arrf,
                                                         proba=proba_arrf, r=None,
                                                         rand_state=0, n_cols_Q=32)
                self.t_arrf[i] = perf_counter() - t0
                print('Q shape:', q_mat.shape)
                t0 = perf_counter()
                self.s_vec_list[i], self.u_mat_list[i] = \
                    evd_nystrom(a=self.gabmul_list[i], q_mat=q_mat)
                self.t_evdn[i] = perf_counter() - t0
                print('Running times:')
                print('   - adaptive_randomized_range_finder: {} s'.format(
                    self.t_arrf[i]))
                print('   - evd_nystrom: {} s'.format(self.t_evdn[i]))
    
                t0 = perf_counter()
                self.uh_x_list[i] = self.u_mat_list[i].T.conj() @ self.x_mix
                self.t_uh_x[i] = perf_counter() - t0
    
        def compute_decomposition_fixed_rank(self, rank):
            """
            Decompose each Gabor multiplier using a random EVD with given rank
    
            The decomposition is obtained using
            :py:func:`skpomade.range_approximation.randomized_range_finder`
            followed by :py:func:`skpomade.factorization_construction.evd_nystrom`.
            Running times are stored in attributes `t_rrf`, `t_evdn` and `t_uh_x`.
    
            Parameters
            ----------
            rank : int
                Rank of the decompostion
            """
            t_rrf = [None for i in range(self.n_areas)]
            t_evdn = [None for i in range(self.n_areas)]
            t_uh_x = [None for i in range(self.n_areas)]
            for i in range(self.n_areas):
                print('Random EVD of Gabor multiplier #{}'.format(i))
                print('#coefs in mask: {} ({:.1%})'
                      .format(np.sum(self.gabmul_list[i].mask),
                              np.sum(self.gabmul_list[i].mask)
                              / self.gabmul_list[i].mask.size))
                t0 = perf_counter()
                q_mat = randomized_range_finder(a=self.gabmul_list[i],
                                                n_l=rank,
                                                rand_state=0)
                t_rrf[i] = perf_counter() - t0
                print('Q shape:', q_mat.shape)
                t0 = perf_counter()
                self.s_vec_list[i], self.u_mat_list[i] = \
                    evd_nystrom(a=self.gabmul_list[i], q_mat=q_mat)
                t_evdn[i] = perf_counter() - t0
                print('Running times:')
                print('   - randomized_range_finder: {} s'.format(t_rrf[i]))
                print('   - evd_nystrom: {} s'.format(t_evdn[i]))
    
                t0 = perf_counter()
                self.uh_x_list[i] = self.u_mat_list[i].T.conj() @ self.x_mix
                t_uh_x[i] = perf_counter() - t0
    
        def compute_estimate(self, lambda_coef):
            """
            Compute the signal estimate for a given hyperparameter
            :math:`\lambda_i` for each sub-region :math:`i`.
    
            Prior decomposition should have been performed using
            :meth:`compute_decomposition` or
            :meth:`compute_decomposition_fixed_rank`.
    
            Parameters
            ----------
            lambda_coef : nd-array or float
                If nd-array, hyperparameters :math:`\lambda_i` for each sub-region
                :math:`i`. If float, the same value :math:`\lambda` is used
                for each sub-region.
    
            Returns
            -------
            nd-array
                Reconstructed signal
            """
            if isinstance(lambda_coef, np.ndarray):
                assert lambda_coef.size == self.n_areas
            else:
                lambda_coef = np.full(self.n_areas, fill_value=lambda_coef)
            x = self.x_mix.copy()
            for i in range(self.n_areas):
                gamma_vec = lambda_coef[i] * self.s_vec_list[i] \
                            / (1 - (1 - lambda_coef[i]) * self.s_vec_list[i])
                x -= self.u_mat_list[i] @ (gamma_vec * self.uh_x_list[i])
            return x
    
        def compute_lambda(self, x_mix, e_target=None):
            """
            Estimate hyperparameters :math:`\lambda_i` from target energy in each
            sub-region :math:`i`.
    
            Parameters
            ----------
            x_mix : nd-array
                Mix signal
    
            e_target : nd-array or None
                Target energy for each sub-region/. If None, function
                :py:func:`estimate_energy_in_mask` is used to estimate the
                target energies.
    
            Returns
            -------
            lambda_est : nd-array
                Hyperparameters :math:`\lambda_i` for each sub-region :math:`i`.
            t_est : nd-array
                Running time to estimate each hyperparameter.
            """
            if e_target is None:
                e_target = estimate_energy_in_mask(
                    x_mix=x_mix, mask=self.mask,
                    dgt_params=self.dgt_params, signal_params=self.signal_params,
                    fig_dir=self.fig_dir)
            t_est = np.empty(self.n_areas)
            lambda_est = np.empty(self.n_areas)
            for i_area in range(self.n_areas):
                mask_i = self.mask == i_area + 1
    
                def obj_fun_est(lambda_coef):
                    x = self.compute_estimate(lambda_coef)
                    x_tf_masked = mask_i * self.gabmul_list[i_area].dgt(x)
                    e_mask = np.linalg.norm(x_tf_masked, 'fro') ** 2
                    return np.abs(e_target[i_area] - e_mask)
    
                t0 = perf_counter()
                sol_est = minimize_scalar(obj_fun_est, bracket=[0, 1],
                                          method='brent')
                t_est[i_area] = perf_counter() - t0
                lambda_est[i_area] = sol_est.x
            return lambda_est, t_est
    
    
    def reconstruction(x_mix, lambda_coef, u_mat, s_vec):
        return GabMulTff(x_mix=x_mix, u_mat=u_mat, s_vec=s_vec)(lambda_coef)
    
    
    def estimate_energy_in_mask(x_mix, mask, dgt_params, signal_params,
                                fig_dir=None, prefix=None):
        """
        Estimate energy in time-frequency mask
    
        Parameters
        ----------
        x_mix : nd-array
            Mix signal
        mask: nd-array
            Time-frequency mask for each sub-region
        dgt_params : dict
            DGT parameters
        signal_params : dict
            Signal parameters
        fig_dir : Path
            If not None, folder where figures are stored. If None, figures are
            not plotted.
        prefix : str
            If not None, this prefix is used when saving the figures.
    
        Returns
        -------
        nd-array
            Estimated energy in each sub-region.
        """
        x_tf_mat = dgt(sig=x_mix, dgt_params=dgt_params)
        x_tf_mat[mask > 0] = np.nan
        e_f_mean = np.nanmean(np.abs(x_tf_mat) ** 2, axis=1)
    
        mask = mask.astype(int)
        n_areas = np.unique(mask).size - 1
        estimated_energy = np.empty(n_areas)
        e_mat = e_f_mean[:, None] @ np.ones((1, x_tf_mat.shape[1]))
        e_mat[mask == 0] = 0
        for i_area in range(n_areas):
            mask_i = mask == i_area + 1
            estimated_energy[i_area] = np.sum(e_mat * mask_i)
    
        if fig_dir is not None:
            fig_dir.mkdir(parents=True, exist_ok=True)
            if prefix is None:
                prefix = ''
            else:
                prefix = prefix + '_'
            dynrange = 100
            c_max = np.nanmax(db(x_tf_mat))
            clim = c_max - dynrange, c_max
    
            fs = signal_params['fs']
            plt.figure()
            plot_spectrogram(x=x_mix, dgt_params=dgt_params, fs=fs, clim=clim)
            plt.title('Mix')
            plt.savefig(fig_dir / '{}mix.pdf'.format(prefix))
    
            plt.figure()
            plotdgtreal(coef=np.sqrt(e_mat), a=dgt_params['hop'],
                        M=dgt_params['n_bins'], fs=fs, clim=clim)
            plt.title('Mask filled with average energy (total: {})'
                      .format(estimated_energy))
            plt.savefig(fig_dir / '{}filled_mask.pdf'.format(prefix))
    
            x_tf_mat[mask > 0] = np.sqrt(e_mat[mask > 0])
            plt.figure()
            plotdgtreal(coef=x_tf_mat, a=dgt_params['hop'],
                        M=dgt_params['n_bins'], fs=fs, clim=clim)
            plt.title('Mix filled with average energy (total: {})'
                      .format(estimated_energy))
            plt.savefig(fig_dir / '{}filled_mask.pdf'.format(prefix))
    
            plt.figure()
            plt.plot(db(e_f_mean) / 2)
            plt.xlabel('Frequency index')
            plt.ylabel('Average energy')
            plt.title('Average energy per frequency bin in mix')
            plt.savefig(fig_dir / '{}average_energy.pdf'.format(prefix))
    
            e_f_mean_check = np.mean(np.abs(x_tf_mat) ** 2, axis=1)
            plt.figure()
            plt.plot(db(e_f_mean) / 2, label='Before filling')
            plt.plot(db(e_f_mean_check) / 2, '--', label='After filling')
            plt.xlabel('Frequency index')
            plt.ylabel('Average energy')
            plt.title('Average energy per frequency bin in mix')
            plt.legend()
            plt.savefig(fig_dir / '{}average_energy_check.pdf'.format(prefix))
    
        return estimated_energy
    
    
    def compute_lambda_oracle_sdr(gmtff, x_wb):
        """
        Compute oracle value for hyperparameter :math:`\lambda_i` from true
        solution.
    
        If only one region is considered, the Brent's algorithm is used (see
        :py:func:`scipy.optimize.minimize_scalar`). If multiple sub-regions are
        considered the BFGS
        algorithm is used (see :py:func:`scipy.optimize.minimize`).
    
        Parameters
        ----------
        gmtff : GabMulTff
        x_wb : nd-array
            True signal for the wideband source.
    
        Returns
        -------
        lambda_oracle : nd-array
            Oracle values for hyperparameters :math:`\lambda_i` for each
            sub-region :math:`i`.
        t_oracle : nd-array
            Running times for the computation of each hyperparameter
        """
        t0 = perf_counter()
    
        def obj_fun_oracle(lambda_coef):
            return np.linalg.norm(x_wb - gmtff.compute_estimate(lambda_coef))
    
        if gmtff.tol_subregions is None:
            sol_oracle = minimize_scalar(obj_fun_oracle,
                                         bracket=[0, 1], method='brent')
            lambda_oracle = np.array([sol_oracle.x])
        else:
            sol_oracle = minimize(obj_fun_oracle,
                                  np.ones(gmtff.n_areas),
                                  method='BFGS',
                                  options={'disp': True})
            lambda_oracle = sol_oracle.x
        t_oracle = perf_counter() - t0
        return lambda_oracle, t_oracle