Source code for piscat.Localization.localization_filtering

import math

import numpy as np
import pandas as pd
from tqdm.autonotebook import tqdm

from piscat.InputOutput.cpu_configurations import CPUConfigurations


[docs]class SpatialFilter: def __init__(self): """We have a `SpatialFilter` class in PiSCAT that allows users to filter `outlier_frames` that have a strong vibration or a particle flying by, `dense_PSFs`, and non-symmetric PSFs that may not properly resemble the iPSF expected from the experimental setup. The threshold_min parameter in each of these filters determines the filter's sensitivity. """ self.cpu = CPUConfigurations() def list_frames(self, df_PSFs): list_frames = df_PSFs["frame"].tolist() list_frames = np.sort(np.unique(list_frames)) list_frames = list_frames.tolist() return list_frames
[docs] def outlier_frames(self, df_PSFs, threshold=20): """This function eliminates all detected PSFs in the frame that are greater than the threshold_min value. PSFs that were detected in unstable frames are reduced using this method. Parameters ---------- df_PSFs: pandas dataframe The data frame contains PSFs locations( x, y, frame, sigma) threshold: int Maximum number of PSFs in one frame. Returns ------- filter_df_PSFs: pandas dataframe The filter data frame contains PSFs locations( x, y, frame, sigma) """ if df_PSFs.shape[0] == 0 or df_PSFs is None: raise ValueError("---data frames is empty!---") print("\nstart removing crappy frames --->", end=" ") his_all_particles = df_PSFs["frame"].value_counts() temp = his_all_particles.where(his_all_particles <= threshold) select_particles = temp[~temp.isnull()] index_particles = select_particles.index filter_df_PSFs = df_PSFs.loc[df_PSFs["frame"].isin(index_particles)] print("Done!") return filter_df_PSFs
[docs] def dense_PSFs(self, df_PSFs, threshold=0): """ Remove PSFs from the dataframe that have an overlap greater than the specified portion. Parameters ---------- df_PSFs : pandas DataFrame The dataframe containing PSF locations (x, y, frame, sigma). threshold : float The threshold specifies the maximum allowable portion of overlap between two PSFs. It should be a value between 0 and 1. Returns ------- filter_df_PSFs : pandas DataFrame The filtered dataframe containing PSF locations (x, y, frame, sigma). Equation: --------- - The radius of PSF1 and PSF2 are calculated as sqrt(2) * sigma1 and sqrt(2) * sigma2, respectively. - The distance between two PSFs is calculated as d = sqrt((x1 - x2)^2 + (y1 - y2)^2). - The minimum acceptable distance (without overlap) is calculated as l = sqrt(2) * (sigma1 + sigma2). - PSFs are removed if d <= l * (1 - threshold). """ if df_PSFs.shape[0] == 0 or df_PSFs is None: print("---data frames is empty!---") return df_PSFs if type(df_PSFs) is pd.core.frame.DataFrame: df_PSFs = df_PSFs else: raise ValueError( "Input does not have correct type! This function needs panda data frames." ) list_frames = self.list_frames(df_PSFs) self.particles_after_closeFilter = df_PSFs self.index_particles_filter = self.particles_after_closeFilter.index num_particles = self.particles_after_closeFilter.shape[0] print("\n---Cleaning the df_PSFs that have drift without parallel loop---") self.point_1 = np.zeros((1, 2), dtype=np.float64) self.point_2 = np.zeros((1, 2), dtype=np.float64) self.remove_list_close = [] for frame_num in tqdm(list_frames): particle = self.particles_after_closeFilter.loc[ self.particles_after_closeFilter["frame"] == frame_num ] index_list = [index for index in particle.index] particle_X = particle["x"].tolist() particle_Y = particle["y"].tolist() particle_sigma = particle["sigma"].tolist() if len(index_list) != 1: for i_ in range(len(particle_X)): self.point_1[0, 0] = particle_X[i_] self.point_1[0, 1] = particle_Y[i_] sigma_1 = particle_sigma[i_] count_ = i_ + 1 while count_ <= (len(particle_X) - 1): self.point_2[0, 0] = particle_X[count_] self.point_2[0, 1] = particle_Y[count_] sigma_2 = particle_sigma[count_] distance = math.sqrt( ((self.point_1[0, 0] - self.point_2[0, 0]) ** 2) + ((self.point_1[0, 1] - self.point_2[0, 1]) ** 2) ) min_d = math.sqrt(2) * (sigma_1 + sigma_2) if distance <= (min_d*(1 - threshold)): self.remove_list_close.append(index_list[i_]) self.remove_list_close.append(index_list[count_]) count_ = count_ + 1 remove_list = list(set(self.remove_list_close)) self.particles_after_closeFilter = self.particles_after_closeFilter.drop( remove_list, axis=0, errors="ignore" ) print("\nNumber of PSFs before filters = {}".format(num_particles)) print( "\nNumber of PSFs after filters = {}".format( self.particles_after_closeFilter.shape[0] ) ) return self.particles_after_closeFilter.reset_index(drop=True)
[docs] def symmetric_PSFs(self, df_PSFs, threshold=0.7): """ Remove astigmatism-affected PSFs with this filter. Parameters ---------- df_PSFs: pandas dataframe The data frame contains PSFs locations( x, y, frame, sigma) threshold: float The smallest sigma ratio that is acceptable (sigma max/sigma min). Returns ------- df_PSF_thr: pandas dataframe The filter data frame contains PSFs locations( x, y, frame, sigma) """ if df_PSFs is not None and df_PSFs.shape[0] > 0: if "Sigma_ratio" in df_PSFs.keys(): idx_thr_sigma = df_PSFs.Sigma_ratio > threshold idx_nan_sigma = df_PSFs.Sigma_ratio.isnull() df_PSF_thr = df_PSFs[idx_thr_sigma.tolist() or idx_nan_sigma.tolist()] return df_PSF_thr else: raise RuntimeError("---Sigma ratio does not exist in data frames---") else: raise RuntimeError("---data frames is empty!---")
[docs] def remove_side_lobes_artifact(self, df_PSFs, threshold=0): """This filter removes false detections on side lobes of PSFs caused by the localization algorithm by comparing center intensity contrast. Parameters ---------- df_PSFs: pandas dataframe The data frame contains PSFs locations( x, y, frame, sigma, center_intensity) threshold: float It specifies the portion of the overlay that two PSFs must have to remove from the list. Returns ------- filter_df_PSFs: pandas dataframe The filter data frame contains PSFs locations( x, y, frame, sigma, center_intensity) """ if df_PSFs.shape[0] == 0 or df_PSFs is None: print("---data frames is empty!---") return df_PSFs if type(df_PSFs) is pd.core.frame.DataFrame: df_PSFs = df_PSFs else: raise ValueError( "Input does not have correct type! This function needs panda data frames." ) list_frames = self.list_frames(df_PSFs) self.particles_after_closeFilter = df_PSFs self.index_particles_filter = self.particles_after_closeFilter.index num_particles = self.particles_after_closeFilter.shape[0] print("\n---Cleaning the df_PSFs that have side lobs without parallel loop---") self.point_1 = np.zeros((1, 2), dtype=np.float64) self.point_2 = np.zeros((1, 2), dtype=np.float64) self.remove_list_close = [] for frame_num in tqdm(list_frames): particle = self.particles_after_closeFilter.loc[ self.particles_after_closeFilter["frame"] == frame_num ] index_list = [index for index in particle.index] particle_X = particle["x"].tolist() particle_Y = particle["y"].tolist() particle_sigma = particle["sigma"].tolist() particle_center_intensity = particle["center_intensity"].tolist() if len(index_list) != 1: for i_ in range(len(particle_X)): self.point_1[0, 0] = particle_X[i_] self.point_1[0, 1] = particle_Y[i_] sigma_1 = particle_sigma[i_] count_ = i_ + 1 while count_ <= (len(particle_X) - 1): self.point_2[0, 0] = particle_X[count_] self.point_2[0, 1] = particle_Y[count_] sigma_2 = particle_sigma[count_] distance = math.sqrt( ((self.point_1[0, 0] - self.point_2[0, 0]) ** 2) + ((self.point_1[0, 1] - self.point_2[0, 1]) ** 2) ) tmp = math.sqrt(2) * (sigma_1 + sigma_2) if distance <= ((math.sqrt(2) * (sigma_1 + sigma_2)) - (threshold * tmp)): intesity_1 = particle_center_intensity[i_] intesity_2 = particle_center_intensity[count_] if np.abs(intesity_1) == np.abs(intesity_2): self.remove_list_close.append(index_list[i_]) self.remove_list_close.append(index_list[count_]) if np.abs(intesity_1) > np.abs(intesity_2): self.remove_list_close.append(index_list[count_]) if intesity_1 < intesity_2: self.remove_list_close.append(index_list[i_]) count_ = count_ + 1 remove_list = list(set(self.remove_list_close)) self.particles_after_closeFilter = self.particles_after_closeFilter.drop( remove_list, axis=0, errors="ignore" ) print("\nNumber of PSFs before filters = {}".format(num_particles)) print( "\nNumber of PSFs after filters = {}".format( self.particles_after_closeFilter.shape[0] ) ) return self.particles_after_closeFilter.reset_index(drop=True)