Source code for mvpure_py.localizer.localizer_utils

""" Functions used during sources localization. """

# Author: Julia Jurkowska

import numpy as np
from scipy.linalg import sqrtm
from tqdm import tqdm

from ..viz import plot_RN_eigenvalues
from ..utils import algebra


[docs] def suggest_n_sources_and_rank( R: np.ndarray, N: np.ndarray, show_plot: bool = True, subject: str = None, n_sources_threshold: float = 1, rank_threshold: float = 1.5, **kwargs ) -> tuple[int, int]: """ Automatically propose number of sources to localize and rank based on Proposition 3 in [1]_. Parameters ---------- R : array-like Data covariance matrix N : array-like Noise covariance matrix show_plot : bool Whether to display a graph of the eigenvalues of the :math:`RN^{-1}` matrix. Default to True. subject : str Subject name the analysis is performed for. Optional. n_sources_threshold : float Number of eigenvalues of the :math:`RN^{-1}` matrix below this threshold corresponds to the suggested number of sources to localize. Default to 1.0. For more details see Observation 1 in [1]_. rank_threshold : float Number of eigenvalues of the :math:`RN^{-1}` matrix below this threshold corresponds to the suggested rank optimization parameter. Default to 1.5. For more details see Proposition 3 in [1]_. Returns ------- n_sources : int Suggested number of sources to localize. rank : int Suggested rank optimization parameter. References ---------- """ if show_plot: _, eigvals = plot_RN_eigenvalues(R=R, N=N, subject=subject, return_eigvals=True, **kwargs) else: eigvals = algebra.get_pinv_RN_eigenvals(R=R, N=N) # Suggesting number of sources n_sources_temp = np.where(eigvals > n_sources_threshold)[0] if n_sources_temp.size == 0: raise ValueError(f"All eigenvalues of $\\mathrm{{RN}}^{{-1}}$ are smaller than {n_sources_threshold}.") else: n_sources = n_sources_temp[-1] + 1 # Suggesting rank rank_temp = np.where(eigvals > rank_threshold)[0] if rank_temp.size == 0: raise ValueError(f"All eigenvalues of $\\mathrm{{RN}}^{{-1}}$ are smaller than {rank_threshold}.") else: rank = rank_temp[-1] + 1 print(f"Suggested number of sources to localize: {n_sources}") print(f"Suggested rank is: {rank}") return int(n_sources), int(rank)
[docs] def get_activity_index(localizer_to_use: str, H: np.ndarray, R: np.ndarray, N: np.ndarray, n_sources_to_localize: int, r: int) -> tuple[np.ndarray, np.ndarray, int, np.ndarray]: """ Calculate activity index specified in ``localizer_to_use``. Parameters ----------- localizer_to_use: str activity index to use. Options: "mai", "mpz", "mai_mvp", "mpz_mvp". H : array-like, shape (n_channels, n_sources) leadfield matrix R : array-like, shape (n_channels, n_channels) data covariance matrix N : array-like, shape (n_channels, n_channels) noise covariance matrix n_sources_to_localize: int number of sources to localize r : int optimization parameter Returns ----------- index_max: array-like, shape (n_sources_to_localize,) indices of localized dipoles act_index_values: array-like, shape (n_sources_to_localize,) activity index value r : int optimization parameter used H_res: array-like, shape (n_channels, n_sources_to_localize) subset of leadfield matrix for localized dipoles """ # placeholder for localizer activity index index_max = np.zeros(n_sources_to_localize) act_index_values = np.zeros(n_sources_to_localize) # leadfield of dipoles corresponding to max index H_res = None # iterate through number of sources to localize for n in range(1, n_sources_to_localize + 1): # placeholder for activity index for each source act = np.zeros(H.shape[1]) # iterate through all sources for l in tqdm(range(H.shape[1]), total=H.shape[1]): # current dipole leadfield H_curr = H[:, l] if H_res is not None: if len(H_res.shape) == 1: H_res = H_res.reshape(-1, 1) # if some source was selected in previous loop(s) [ n > 1 ] # stack current leadfield with selected dipole(s) leadfield(s) H_curr = np.hstack((H_res, H_curr.reshape(-1, 1))) # choose activity index func = _choose_activity_index(localizer_to_use) S_curr = algebra._get_S(H_curr, R) # MAI family if "mai" in localizer_to_use: G_curr = algebra._get_G(H_curr, N) act[l] = func(G_curr, S_curr, current_iter=n, r=r) # MPZ family elif "mpz" in localizer_to_use: T_curr = algebra._get_T(H_curr, R, N) if localizer_to_use == "mpz_mvp": G_curr = algebra._get_G(H_curr, N) Q_curr = algebra._get_Q(S_curr, G_curr) act[l] = func(S_curr, T_curr, Q_curr, r=r, current_iter=n) else: act[l] = func(S_curr, T_curr, current_iter=n, r=r) # find for which dipole index is max index_max[n-1] = int(np.argmax(act)) act_index_values[n-1] = np.max(act) # create H_res with leadfields of selected dipoles if H_res is None: H_res = H[:, int(index_max[n-1])] else: H_res = np.hstack((H_res, H[:, int(index_max[n-1])].reshape(-1, 1))) if H_res.ndim == 1: H_res = H_res[:, np.newaxis] index_max = [int(i) for i in index_max] print(f"Leadfield indices corresponding to localized sources: {index_max}") return index_max, act_index_values, r, H_res
def _choose_activity_index( localizer_to_use: str ): """ Specify activity index to use. Parameters: ----------- localizer_to_use: str name of the localizer to use Returns: ----------- func: function _get_xxx_xxx specific for given activity index """ if localizer_to_use == "mai": func = _get_simple_mai elif localizer_to_use == "mpz": func = _get_simple_mpz elif localizer_to_use == "mai_mvp": func = _get_mai_mvp elif localizer_to_use == "mpz_mvp": func = _get_mpz_mvp else: raise ValueError("Only possible localizers to specify are: 'mai', 'mpz', " "'mai_mvp', 'mpz_mvp'") return func def _get_simple_mai(G: np.ndarray, S: np.ndarray, current_iter: int, r: int = None) -> float: """ Using eq. 39 from [1]_. MAI = tr{GS^(-1)} - l where G = H.T @ N^(-1) @ H [eq. 8] S = H.T @ R^(-1) @ H [eq. 5] Parameters: ----------- G : array-like, shape (n_sources, n_sources) matrix defined in eq. 8, calculated in function _get_G() S : array-like, shape (n_sources, n_sources) matrix defined in eq. 5, calculated in function _get_S() current_iter: int Number of dipole being localized in current iteration. Returns: ----------- mai: float MAI activity index as defined in [2]_. References ---------- .. [1] Piotrowski, T., Nikadon, J., & Moiseev, A. (2021). Localization of brain activity from EEG/MEG using MV-PURE framework. *Biomedical Signal Processing and control, 64*, 102243. .. [2] Moiseev, A., Gaspar, J. M., Schneider, J. A., & Herdman, A. T. (2011). Application of multi-source minimum variance beamformers for reconstruction of correlated neural activity. *NeuroImage, 58*(2), 481-496. """ if isinstance(G, float) and isinstance(S, float): mai = G / S - current_iter else: # Following implementation from source article we should useL mai = np.trace(np.matmul(G, np.linalg.pinv(S))) - current_iter return mai def _get_simple_mpz(S: np.ndarray, T: np.ndarray, current_iter: int, r: int = None) -> float: """ Using eq. 42 from [1]_. MPZ = tr{S@T^(-1)} - l where S = H.T @ R^(-1) @ H [eq. 5] T = H.T @ R^(-1) @ N @ R^(-1) @ H [eq. 12] Parameters: ----------- S : array-like, shape (n_sources, n_sources) matrix defined in eq. 5, calculated in function _get_S() T : array-like, shape (n_sources, n_sources) matrix defined in eq. 12, calculated in function _get_T() current_iter: int number of dipole being localized Returns: ----------- mpz: float MPZ activity index as defined in [2]_. References ---------- .. [1] Piotrowski, T., Nikadon, J., & Moiseev, A. (2021). Localization of brain activity from EEG/MEG using MV-PURE framework. *Biomedical Signal Processing and control, 64*, 102243. .. [2] Moiseev, A., Gaspar, J. M., Schneider, J. A., & Herdman, A. T. (2011). Application of multi-source minimum variance beamformers for reconstruction of correlated neural activity. *NeuroImage, 58*(2), 481-496. """ if isinstance(S, float) and isinstance(T, float): mpz = S / T - current_iter else: # Following implementation from source article we should useL mpz = np.trace(np.matmul(S, np.linalg.pinv(T))) - current_iter return mpz def _get_mai_mvp(G: np.ndarray, S: np.ndarray, r: int, current_iter: int) -> float: """ MAI_MVP = sum_to_l(eigenval(G @ S^(-1)) - l for 1 <= l <= r = sum_to_r(eigenval(G @ S^(-1)) - r for l > r where G = H.T @ N^(-1) @ H [eq. 8] S = H.T @ R^(-1) @ H [eq. 5] and H - leadfield matrix, N - noise covariance, and R - data covariance. Note: Using full-rank MAI_MVP localizer (r = n_sources_to_localize) replicates results for MAI (_get_simple_mai). Parameters ---------- G : array-like Matrix defined in eq. 8, computed in function _get_G() S : array-like Matrix defined in eq. 5, computed in function _get_S() r : int Optimization parameter current_iter : int Number of dipole being localized in current iteration. Returns ------- mai_mvp : float MAI_MVP activity index """ # same as mai_ext -- maybe just refer to _get_mai_extension? # if current_iter [l] <= r - use simple MAI from _get_simple_mai() if current_iter <= r: mai_mvp = _get_simple_mai(G, S, current_iter) else: s, u = np.linalg.eig(np.matmul(G, np.linalg.pinv(S))) sorted_idx = np.argsort(s)[::-1] s = s[sorted_idx] mai_mvp = np.sum(s[:r]) - r return mai_mvp def _get_mpz_mvp(S: np.ndarray, T: np.ndarray, Q: np.ndarray, r: int, current_iter: int) -> float: """ MPZ_MVP = trace{S@T^(-1)} - r for r <= l = trace(S @ T^(-1) @ P_SQ_r) - r for l > r where S = H.T @ R^(-1) @ H [eq. 5] T = H.T @ R^(-1) @ N @ R^(-1) @ H [eq. 12] and P_SQ_r is the oblique projection matrix onto the subspace of S @ Q, H - leadfield matrix, N - noise covariance, and R - data covariance. Note: Using full-rank MPZ_MVP localizer (r = n_sources_to_localize) replicates results for MPZ (_get_simple_mpz). Parameters ---------- S : array-like Matrix defined in eq. 5, calculated in function _get_S() T : array-like Matrix defined in eq. 12, calculated in function _get_T() Q : array-like Matrix Q calculated as Q = S^(-1) - G^(-1) in function _get_Q(). r : int Optimization parameter current_iter : int Number of dipole being localized in current iteration. Returns ------- mpz_mvp : float MPZ_MVP activity index """ # if current_iter [l] <= r - use simple MPZ from _get_simple_mpz() if current_iter <= r: mpz_mvp = _get_simple_mpz(S, T, current_iter) else: s, u = np.linalg.eig(np.matmul(S, Q)) sorted_idx = np.argsort(s)[::-1] u = u[:, sorted_idx] proj_matrix = u @ np.block( [[np.eye(r), np.zeros((r, current_iter - r))], [np.zeros((current_iter - r, current_iter))]] ) @ np.linalg.pinv(u) mpz_mvp = np.trace(S @ np.linalg.pinv(T) @ proj_matrix) - r return mpz_mvp