Source code for piscat.Trajectory.temporal_filtering

import numpy as np
from tqdm.autonotebook import tqdm


[docs]class TemporalFilter: def __init__(self, video, batchSize): """Filters to be applied to temporal features are included in this class. Parameters ---------- video: NDArray Input video. batchSize: int Batch size that is used for DRA. """ self.video = video self.batchSize = batchSize
[docs] def filter_tarj_base_length(self, df_PSFs, threshold_min, threshold_max): """This function removes the particle from data frames that have a temporal length smaller and bigger than threshold_min values. Parameters ---------- df_PSFs: pandas dataframe The data frame contains PSFs locations and ID (x, y, frame, sigma, particle, ...) threshold_min: int The minimum acceptable temporal length of one particle. threshold_max: int The maximum acceptable temporal length of one particle. Returns ------- particles: pandas dataframe The filter data frame (x, y, frame, sigma, particle, ...) his_all_particles: list List that shows the statistics of particles length. """ if df_PSFs.shape[0] == 0 or df_PSFs is None: raise ValueError("---data frames is empty!---") his_all_particles = df_PSFs["particle"].value_counts() temp_0 = his_all_particles.where(his_all_particles >= threshold_min) temp = his_all_particles.where(temp_0 <= threshold_max) select_particles = temp[~temp.isnull()] index_particles = select_particles.index particles = df_PSFs.loc[df_PSFs["particle"].isin(index_particles)] return particles, his_all_particles
[docs] def v_trajectory(self, df_PSFs, threshold_min, threshold_max): """This function extract v-shape of the particle that has a temporal length between threshold_min and threshold_min max values. Parameters ---------- df_PSFs: pandas dataframe The data frame contains PSFs locations and ID (x, y, frame, sigma, particle, ...). threshold_min: int The minimum acceptable temporal length of one particle. threshold_max: int The maximum acceptable temporal length of one particle. Returns ------- all_trajectories: List of list Returns list of extracted data (i.e [List of list]) | [intensity_horizontal, intensity_vertical, particle_center_intensity, particle_center_intensity_follow, particle_frame, particle_sigma, particle_X, particle_Y, particle_ID, optional(fit_intensity, fit_x, fit_y, fit_X_sigma, fit_Y_sigma, fit_Bias, fit_intensity_error, fit_x_error, fit_y_error, fit_X_sigma_error, fit_Y_sigma_error, fit_Bias_error)] particles: pandas dataframe The dataframe after using temporal filter (x, y, frame, sigma, particle, ...) his_all_particles: list List that shows the statistics of particles length. """ if df_PSFs.shape[0] == 0 or df_PSFs is None: raise ValueError("---data frames is empty!---") particles, his_all_particles = self.filter_tarj_base_length( df_PSFs=df_PSFs, threshold_min=threshold_min, threshold_max=threshold_max ) all_trajectories = self.v_profile(df_PSFs=particles, window_size=self.batchSize) return all_trajectories, particles, his_all_particles
[docs] def v_profile(self, df_PSFs, window_size=2000): """The V-Shape trajectories and extended version are calculated. Parameters ---------- window_size: int The maximum number of the frames that follow the V-Shape contrast from both sides. Returns ------- all_trajectories: List of list Returns array contains the following information for each particle. | [intensity_horizontal, intensity_vertical, particle_center_intensity, particle_center_intensity_follow, particle_frame, particle_sigma, particle_X, particle_Y, particle_ID, optional(fit_intensity, fit_x, fit_y, fit_X_sigma, fit_Y_sigma, fit_Bias, fit_intensity_error, fit_x_error, fit_y_error, fit_X_sigma_error, fit_Y_sigma_error, fit_Bias_error)] """ all_trajectories = [] intensity_trajectories_dic = dict() index_particles = df_PSFs["particle"].unique().tolist() print("\nstart V_trajectories without parallel loop--->", end=" ") window_size = 2 * window_size for j_ in tqdm(index_particles): particle = df_PSFs.loc[df_PSFs["particle"] == j_] particle_ID = particle["particle"].tolist() particle_X = particle["x"].tolist() particle_Y = particle["y"].tolist() particle_frame = particle["frame"].tolist() particle_center_intensity = particle["center_intensity"].tolist() particle_sigma = particle["sigma"].tolist() if "Fit_Amplitude" in particle.keys(): fit_intensity = particle["Fit_Amplitude"].tolist() fit_x = particle["Fit_X-Center"].tolist() fit_y = particle["Fit_Y-Center"].tolist() fit_X_sigma = particle["Fit_X-Sigma"].tolist() fit_Y_sigma = particle["Fit_Y-Sigma"].tolist() fit_Bias = particle["Fit_Bias"].tolist() fit_intensity_error = particle["Fit_errors_Amplitude"].tolist() fit_x_error = particle["Fit_errors_X-Center"].tolist() fit_y_error = particle["Fit_errors_Y-Center"].tolist() fit_X_sigma_error = particle["Fit_errors_X-Sigma"].tolist() fit_Y_sigma_error = particle["Fit_errors_Y-Sigma"].tolist() fit_Bias_error = particle["Fit_errors_Bias"].tolist() intensity_vertical = [] intensity_horizontal = [] for frame_num, y, x, sigma in zip( particle_frame, particle_Y, particle_X, particle_sigma ): len_profile = int(5 * sigma) min_v = np.max([y - len_profile, 0]) max_v = np.min([y + len_profile, self.video.shape[1]]) intensity_vertical.append( self.video[int(frame_num), int(min_v) : int(max_v), int(x)] ) min_h = np.max([x - len_profile, 0]) max_h = np.min([x + len_profile, self.video.shape[1]]) intensity_horizontal.append( self.video[int(frame_num), int(y), int(min_h) : int(max_h)] ) first_frame = int(particle_frame[0]) x_first_frame = int(particle_X[0]) y_first_frame = int(particle_Y[0]) last_frame = int(particle_frame[-1]) x_last_frame = int(particle_X[-1]) y_last_frame = int(particle_Y[-1]) start_frame = np.max([0, first_frame - window_size]) end_frame = np.min([self.video.shape[0], last_frame + window_size]) particle_center_intensity_follow_backward = self.video[ int(start_frame) : int(first_frame), y_first_frame, x_first_frame ] particle_center_intensity_follow_forward = self.video[ int(last_frame) : int(end_frame), y_last_frame, x_last_frame ] particle_center_intensity_follow = np.concatenate( ( particle_center_intensity_follow_backward, particle_center_intensity, particle_center_intensity_follow_forward, ), axis=0, ) trajectories = [] trajectories.append(intensity_horizontal) trajectories.append(intensity_vertical) trajectories.append(particle_center_intensity) trajectories.append(particle_center_intensity_follow) trajectories.append(particle_frame) trajectories.append(particle_sigma) trajectories.append(particle_X) trajectories.append(particle_Y) trajectories.append(particle_ID) if "Fit_Amplitude" in particle.keys(): trajectories.append(fit_intensity) trajectories.append(fit_x) trajectories.append(fit_y) trajectories.append(fit_X_sigma) trajectories.append(fit_Y_sigma) trajectories.append(fit_Bias) trajectories.append(fit_intensity_error) trajectories.append(fit_x_error) trajectories.append(fit_y_error) trajectories.append(fit_X_sigma_error) trajectories.append(fit_Y_sigma_error) trajectories.append(fit_Bias_error) all_trajectories.append(trajectories) print("Done") return all_trajectories