Source code for piscat.Preproccessing.filtering

from __future__ import print_function

import cv2
import numpy as np
import scipy.fftpack
import scipy.ndimage
import scipy.signal
from joblib import Parallel, delayed
from PySide6.QtCore import QObject, QRunnable, Signal, Slot
from scipy.ndimage import median_filter
from skimage import filters
from tqdm.autonotebook import tqdm

from piscat.InputOutput.cpu_configurations import CPUConfigurations


class WorkerSignals(QObject):
    finished = Signal()
    error = Signal(tuple)
    result = Signal(object)
    progress = Signal(int)


class Filters:
    def __init__(self, video, inter_flag_parallel_active=True):
        """This class generates a list of video/image filters. To improve performance on
        large videos, some of them have a parallel implementation.

        Parameters
        ----------
        video: NDArray
            The video is 3D-numpy (number of frames, width, height).

        inter_flag_parallel_active: bool
            If general parallel tasks are enabled in the CPU configuration, this flag
            can still be used to enable or disable parallelization for this class.
        """
        self.cpu = CPUConfigurations()
        self.inter_flag_parallel_active = inter_flag_parallel_active
        self.video = video
        self.filtered_video = None
    def temporal_median(self):
        """Correct the background by dividing each pixel by its temporal median.

        Returns
        -------
        NDArray
            The background-corrected video as 3D-numpy.
        """
        video_med = np.median(self.video, axis=0)
        video_med_ = np.expand_dims(video_med, axis=0)
        return np.divide(self.video, video_med_) - 1
    def flat_field(self, sigma):
        """Correct the video background by creating a synthetic flat-field version of
        the background.

        Parameters
        ----------
        sigma: float
            Sigma of the Gaussian filter used to create the blurred video.

        Returns
        -------
        flat_field_video: NDArray
            The background-corrected video as 3D-numpy.
        """
        blur_video = self.gaussian(sigma)
        flat_field_video = np.divide(self.video, blur_video) - 1
        return flat_field_video
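    # Usage sketch (illustrative; not part of the original module). Assuming a raw
    # video as a 3D numpy array of shape (frames, width, height):
    #
    #     import numpy as np
    #     from piscat.Preproccessing.filtering import Filters
    #
    #     video = 10.0 + 0.1 * np.random.rand(200, 64, 64)  # hypothetical input video
    #     filt = Filters(video, inter_flag_parallel_active=False)
    #     dra_median = filt.temporal_median()   # divide by the per-pixel temporal median
    #     dra_flat = filt.flat_field(sigma=20)  # divide by a Gaussian-blurred copy
    #
    # Both results are contrast videos fluctuating around zero.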
    def median(self, size):
        """This function applies a 2D median filter on each frame.

        Parameters
        ----------
        size: int
            Kernel size of the median filter.

        Returns
        -------
        filtered_video: NDArray
            The filtered video as 3D-numpy.
        """
        if self.cpu.parallel_active is True and self.inter_flag_parallel_active is True:
            print("\n---start median filter with Parallel---")
            result = Parallel(
                n_jobs=self.cpu.n_jobs, backend=self.cpu.backend, verbose=self.cpu.verbose
            )(delayed(self.median_kernel)(x, size) for x in tqdm(range(self.video.shape[0])))
        else:
            print("\n---start median filter without Parallel---")
            result = [
                median_filter(self.video[i_, :, :], size) for i_ in range(self.video.shape[0])
            ]
        arry_result = np.asarray(result)
        self.filtered_video = np.reshape(
            arry_result, (len(result), self.video.shape[1], self.video.shape[2])
        )
        return self.filtered_video
    def median_kernel(self, i_, size):
        return median_filter(self.video[i_, :, :], size)
    def gaussian(self, sigma):
        """This function applies a 2D Gaussian filter on each frame.

        Parameters
        ----------
        sigma: float
            Sigma of the Gaussian filter used to create the blurred video.

        Returns
        -------
        filtered_video: NDArray
            The filtered video as 3D-numpy.
        """
        if self.cpu.parallel_active is True and self.inter_flag_parallel_active is True:
            print("\n---start gaussian filter with Parallel---")
            result = Parallel(
                n_jobs=self.cpu.n_jobs, backend=self.cpu.backend, verbose=self.cpu.verbose
            )(delayed(self.gaussian_kernel)(x, sigma) for x in tqdm(range(self.video.shape[0])))
        else:
            print("\n---start gaussian filter without Parallel---")
            # Use the same kernel as the parallel path so that
            # ``preserve_range=True`` is applied consistently.
            result = [self.gaussian_kernel(i_, sigma) for i_ in range(self.video.shape[0])]
        arry_result = np.asarray(result)
        self.filtered_video = np.reshape(
            arry_result, (len(result), self.video.shape[1], self.video.shape[2])
        )
        return self.filtered_video
    def gaussian_kernel(self, i_, sigma):
        return filters.gaussian(self.video[i_, :, :], sigma=sigma, preserve_range=True)
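# Usage sketch (illustrative; not part of the original module): per-frame spatial
# smoothing, reusing the hypothetical ``video`` array from the sketch above.
#
#     filt = Filters(video, inter_flag_parallel_active=False)
#     video_med = filt.median(size=3)      # 3x3 median filter on every frame
#     video_blur = filt.gaussian(sigma=2)  # Gaussian blur on every frame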
class FFT2D(QRunnable):
    def __init__(self, video):
        """This class computes the 2D Fourier spectrum of a video.

        Parameters
        ----------
        video: NDArray
            The video is 3D-numpy (number of frames, width, height).
        """
        super(FFT2D, self).__init__()
        self.signals = WorkerSignals()
        self.video = video
    @Slot()
    def run(self):
        result_ = self.fft2D()
        result = self.log2_scale(result_)
        self.signals.result.emit(result)
    def fft2D(self):
        im_fft = np.ndarray(self.video.shape, complex)
        for i in range(0, self.video.shape[0]):
            im_fft[i, :, :] = np.fft.fftshift(np.fft.fft2(self.video[i, :, :]))
        return im_fft

    def log2_scale(self, video):
        return np.log2(np.abs(video))
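# Usage sketch (illustrative; not part of the original module). ``FFT2D`` is a
# QRunnable meant for a Qt thread pool, but its methods can also be called
# directly:
#
#     fft_worker = FFT2D(video)
#     spectrum = fft_worker.log2_scale(fft_worker.fft2D())  # log2 power spectrum
#
# Inside a Qt application one would instead connect ``fft_worker.signals.result``
# to a slot and submit the worker with ``QThreadPool.globalInstance().start(fft_worker)``.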
class RadialVarianceTransform:
    def __init__(self, inter_flag_parallel_active=True):
        """Efficient Python implementation of the Radial Variance Transform.

        The main function is :func:`rvt`, which applies the transform to a single
        image (2D numpy array). Compared to a vanilla convolution implementation,
        there are two speed-ups:

        1) Kernel FFTs are pre-calculated and cached, so only one inverse FFT is
           calculated per convolution, and a single direct FFT of the image is
           reused for all convolutions.

        2) When finding MoV, ``np.mean(rsqmeans)`` is calculated in a single
           convolution by averaging all kernels first.

        Parameters
        ----------
        inter_flag_parallel_active: bool
            If general parallel tasks are enabled in the CPU configuration, this
            flag can still be used to enable or disable parallelization for this
            class.

        References
        ----------
        The Radial Variance Transform code has been adapted from the GitHub
        repository mentioned below.

        [1] Kashkanova, Anna D., et al. "Precision single-particle localization
        using radial variance transform." Optics Express 29.7 (2021): 11070-11083.

        [2] https://github.com/SandoghdarLab/rvt
        """
        self.cpu = CPUConfigurations()
        self._kernels_fft_cache = {}
        self.video = None
        self.inter_flag_parallel_active = inter_flag_parallel_active
    def gen_r_kernel(self, r, rmax):
        """Generate a ring kernel with radius `r` and size ``2*rmax+1``."""
        a = rmax * 2 + 1
        k = np.zeros((a, a))
        for i in range(a):
            for j in range(a):
                rij = ((i - rmax) ** 2 + (j - rmax) ** 2) ** 0.5
                if int(rij) == r:
                    k[i][j] = 1.0
        return k / np.sum(k)
    def generate_all_kernels(self, rmin, rmax, coarse_factor=1, coarse_mode="add"):
        """Generate a set of kernels with radii between `rmin` and `rmax` and sizes
        ``2*rmax+1``.

        ``coarse_factor`` and ``coarse_mode`` determine whether the number of those
        kernels is reduced by either skipping or adding them (see :func:`rvt` for a
        more detailed explanation).
        """
        kernels = [self.gen_r_kernel(r, rmax) for r in range(rmin, rmax + 1)]
        if coarse_factor > 1:
            if coarse_mode == "skip":
                kernels = kernels[::coarse_factor]
            else:
                kernels = [
                    np.sum(kernels[i : i + coarse_factor], axis=0) / coarse_factor
                    for i in range(0, len(kernels), coarse_factor)
                ]
        return kernels
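    # Quick sanity check (illustrative; not part of the original module): every
    # ring kernel is normalized to unit sum, and ``generate_all_kernels`` returns
    # one kernel per radius when ``coarse_factor == 1``.
    #
    #     rvt_obj = RadialVarianceTransform()
    #     kernels = rvt_obj.generate_all_kernels(rmin=2, rmax=6)
    #     assert len(kernels) == 5                              # radii 2, 3, 4, 5, 6
    #     assert all(k.shape == (13, 13) for k in kernels)      # 2*rmax+1
    #     assert np.allclose([k.sum() for k in kernels], 1.0)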
    def _check_core_args(self, rmin, rmax, kind, coarse_mode="add"):
        """Check the validity of the core algorithm arguments."""
        if rmin < 0 or rmax < 0:
            raise ValueError("radius should be non-negative")
        if rmin >= rmax:
            raise ValueError("rmax should be strictly greater than rmin")
        if kind not in {"basic", "normalized"}:
            raise ValueError(
                "unrecognized kind: {}; can be either 'basic' or 'normalized'".format(kind)
            )
        if coarse_mode not in {"add", "skip"}:
            raise ValueError(
                "unrecognized coarse mode: {}; can be either 'add' or 'skip'".format(coarse_mode)
            )

    def _check_args(self, rmin, rmax, kind, coarse_mode, highpass_size, upsample):
        """Check the validity of all the algorithm arguments."""
        self._check_core_args(rmin, rmax, kind, coarse_mode)
        if upsample < 1:
            raise ValueError("upsampling factor should be positive")
        if highpass_size is not None and highpass_size < 0.3:
            raise ValueError("high-pass filter size should be >= 0.3")

    ## Prepare auxiliary parameters
    def get_fshape(self, s1, s2, fast_mode=False):
        """Get the required shape of the transformed image given the shapes of the
        original image and the kernel."""
        shape = s1 if fast_mode else s1 + s2 - 1
        fshape = [scipy.fftpack.next_fast_len(int(d)) for d in shape]
        return tuple(fshape)
    ## Preparing FFTs of arrays
    def prepare_fft(self, inp, fshape, pad_mode="constant"):
        """Prepare the image for a convolution by taking its Fourier transform,
        applying padding if necessary."""
        if pad_mode == "fast":
            return np.fft.rfftn(inp, fshape)
        else:
            pad = [((td - d) // 2, (td - d + 1) // 2) for td, d in zip(fshape, inp.shape)]
            inp = np.pad(inp, pad, mode=pad_mode)
            return np.fft.rfftn(inp)
    ## Shortcut for SciPy's fftconvolve, which takes already-transformed arrays as input
    def convolve_fft(self, sp1, sp2, s1, s2, fshape, fast_mode=False):
        """Calculate the convolution from the Fourier transforms of the original
        image and the kernel, trimming the result if necessary."""
        ret = np.fft.irfftn(sp1 * sp2, fshape)
        if fast_mode:
            return np.roll(ret, (-(s2[0] // 2), -(s2[1] // 2)), (0, 1))[: s1[0], : s1[1]].copy()
        else:
            off = (fshape[0] - s1[0]) // 2 + s2[0] // 2, (fshape[1] - s1[1]) // 2 + s2[1] // 2
            return ret[off[0] : off[0] + s1[0], off[1] : off[1] + s1[1]].copy()
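    # Self-check sketch (illustrative; not part of the original module): with the
    # default zero ("constant") padding, the FFT helpers above should reproduce
    # ``scipy.signal.fftconvolve`` in ``mode="same"`` up to numerical precision.
    #
    #     import numpy as np
    #     import scipy.signal
    #
    #     rvt_obj = RadialVarianceTransform()
    #     img = np.random.rand(32, 32)
    #     kernel = rvt_obj.gen_r_kernel(r=3, rmax=5)            # (11, 11) ring kernel
    #     s1, s2 = np.array(img.shape), np.array(kernel.shape)
    #     fshape = rvt_obj.get_fshape(s1, s2)
    #     out = rvt_obj.convolve_fft(
    #         rvt_obj.prepare_fft(img, fshape),
    #         rvt_obj.prepare_fft(kernel, fshape, pad_mode="fast"),
    #         s1, s2, fshape,
    #     )
    #     ref = scipy.signal.fftconvolve(img, kernel, mode="same")
    #     assert np.allclose(out, ref)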
    def rvt_core(
        self,
        img,
        rmin,
        rmax,
        kind="basic",
        rweights=None,
        coarse_factor=1,
        coarse_mode="add",
        pad_mode="constant",
    ):
        """Perform the core part of the Radial Variance Transform (RVT) of an image.

        Parameters
        ----------
        img: NDArray
            Source image (2D numpy array).

        rmin: float
            Minimal radius (inclusive).

        rmax: float
            Maximal radius (inclusive).

        kind: str, ("basic", "normalized")
            Either ``"basic"`` (only VoM) or ``"normalized"`` (VoM/MoV); the
            normalized version increases subpixel bias, but it works better at
            lower SNR.

        rweights:
            Relative weights of the different radial kernels; must be a 1D array of
            length ``(rmax-rmin+1)//coarse_factor``.

        coarse_factor:
            The reduction factor for the number of ring kernels; can be used to
            speed up calculations at the expense of precision.

        coarse_mode: str, ("add", "skip")
            The reduction method; can be ``"add"`` (add ``coarse_factor`` rings in a
            row to get a thicker ring, which works better for smooth features) or
            ``"skip"`` (only keep one in every ``coarse_factor`` rings, which works
            better for very fine features).

        pad_mode: str, ("constant", "reflect", "edge", "fast")
            Edge padding mode for convolutions; can be either one of the modes
            accepted by ``np.pad`` (such as ``"constant"``, ``"reflect"``, or
            ``"edge"``), or ``"fast"``, which means faster no-padding (a combination
            of ``"wrap"`` and ``"constant"`` padding depending on the image size);
            ``"fast"`` mode works faster for smaller images and larger ``rmax``, but
            the border pixels (within ``rmax`` from the edge) are less reliable;
            note that the image mean is subtracted before processing, so
            ``pad_mode="constant"`` (default) is equivalent to padding with a
            constant value equal to the image mean.

        Returns
        -------
        NDArray
            The transformed image (VoM, or VoM/MoV for ``kind="normalized"``).
        """
        self._check_core_args(rmin, rmax, kind, coarse_mode)  # check arguments validity
        s1 = np.array(img.shape)
        s2 = np.array([rmax * 2 + 1, rmax * 2 + 1])
        fast_mode = pad_mode == "fast"
        # Calculate the padded image shape (add padding and get to the next "good" FFT size)
        fshape = self.get_fshape(s1, s2, fast_mode=fast_mode)
        cache_k = (rmin, rmax, coarse_factor, coarse_mode) + fshape
        # Generate the convolution kernels if they are not in the cache yet
        if cache_k not in self._kernels_fft_cache:
            kernels = self.generate_all_kernels(
                rmin, rmax, coarse_factor=coarse_factor, coarse_mode=coarse_mode
            )
            # pad_mode="fast" corresponds to the default zero-padding here
            self._kernels_fft_cache[cache_k] = [
                self.prepare_fft(k, fshape, pad_mode="fast") for k in kernels
            ]
        # Get the convolution kernels (either newly generated or cached)
        kernels_fft = self._kernels_fft_cache[cache_k]
        if rweights is not None:
            rweights = np.asarray(rweights)
            if len(rweights) != len(kernels_fft):
                raise ValueError(
                    "The number of weights {} differs from the number of kernels {}".format(
                        len(rweights), len(kernels_fft)
                    )
                )
            rweights = rweights / rweights.sum()  # normalize weights by their sum
        # Subtract the mean (makes the VoM calculation more stable and zero-padding more meaningful)
        img = img - img.mean()
        # Prepare the image FFT (only needs to be done once)
        img_fft = self.prepare_fft(img, fshape, pad_mode=pad_mode)
        # Calculate M_r for all radii
        rmeans = np.array(
            [
                self.convolve_fft(img_fft, k_fft, s1, s2, fshape, fast_mode=fast_mode)
                for k_fft in kernels_fft
            ]
        )
        if rweights is None:
            # Calculate VoM as a standard variance of M_r along the radius axis
            vom = np.var(rmeans, axis=0)
        else:
            # Calculate VoM as a weighted variance of M_r along the radius axis
            vom = (
                np.sum(rmeans**2 * rweights[:, None, None], axis=0)
                - np.sum(rmeans * rweights[:, None, None], axis=0) ** 2
            )
        if kind == "basic":
            return vom
        else:  # calculate MoV for normalization
            imgsq_fft = self.prepare_fft(img**2, fshape, pad_mode=pad_mode)  # prepare image FFT
            if rweights is None:
                # Find the combined kernel as a standard mean, then use it to find
                # MoV in a single convolution
                sumk_fft = np.mean(kernels_fft, axis=0)
                mov = self.convolve_fft(
                    imgsq_fft, sumk_fft, s1, s2, fshape, fast_mode=fast_mode
                ) - np.mean(rmeans**2, axis=0)
            else:
                # Find the combined kernel as a weighted mean, then use it to find
                # MoV in a single convolution
                sumk_fft = np.sum(kernels_fft * rweights[:, None, None], axis=0)
                mov = self.convolve_fft(
                    imgsq_fft, sumk_fft, s1, s2, fshape, fast_mode=fast_mode
                ) - np.sum(rmeans**2 * rweights[:, None, None], axis=0)
            return vom / mov
    def high_pass(self, img, size):
        """Perform a Gaussian high-pass filter on the image."""
        img = img.astype(float)
        return img - scipy.ndimage.gaussian_filter(img, size)
    def rvt(
        self,
        img,
        rmin,
        rmax,
        kind="basic",
        highpass_size=None,
        upsample=1,
        rweights=None,
        coarse_factor=1,
        coarse_mode="add",
        pad_mode="constant",
    ):
        """Perform the Radial Variance Transform (RVT) of an image.

        Parameters
        ----------
        img: NDArray
            Source image (2D numpy array).

        rmin:
            Minimal radius (inclusive).

        rmax:
            Maximal radius (inclusive).

        kind:
            Either ``"basic"`` (only VoM) or ``"normalized"`` (VoM/MoV); the
            normalized version increases subpixel bias, but it works better at
            lower SNR.

        highpass_size:
            Size of the high-pass filter; ``None`` means no filter (effectively,
            infinite size).

        upsample: int
            Integer image upsampling factor; `rmin` and `rmax` are adjusted
            automatically (i.e., they refer to the non-upsampled image); if
            ``upsample>1``, the resulting image size is multiplied by ``upsample``.

        rweights:
            Relative weights of the different radial kernels; must be a 1D array of
            length ``(rmax-rmin+1)//coarse_factor``.

        coarse_factor:
            The reduction factor for the number of ring kernels; can be used to
            speed up calculations at the expense of precision.

        coarse_mode:
            The reduction method; can be ``"add"`` (add ``coarse_factor`` rings in a
            row to get a thicker ring, which works better for smooth features) or
            ``"skip"`` (only keep one in every ``coarse_factor`` rings, which works
            better for very fine features).

        pad_mode:
            Edge padding mode for convolutions; can be either one of the modes
            accepted by ``np.pad`` (such as ``"constant"``, ``"reflect"``, or
            ``"edge"``), or ``"fast"``, which means faster no-padding (a combination
            of ``"wrap"`` and ``"constant"`` padding depending on the image size);
            ``"fast"`` mode works faster for smaller images and larger ``rmax``, but
            the border pixels (within ``rmax`` from the edge) are less reliable;
            note that the image mean is subtracted before processing, so
            ``pad_mode="constant"`` (default) is equivalent to padding with a
            constant value equal to the image mean.

        Returns
        -------
        NDArray
            The transformed source image.
        """
        upsample = int(upsample)
        self._check_args(rmin, rmax, kind, coarse_mode, highpass_size, upsample)
        if highpass_size is not None:
            img = self.high_pass(img, highpass_size)
        if upsample > 1:
            # Nearest-neighbor upsampling on both axes
            img = img.repeat(upsample, axis=-2).repeat(upsample, axis=-1)
            if rweights is not None:
                # Upsample the radial weights as well
                rweights = np.asarray(rweights).repeat(upsample)
            rmin *= upsample  # increase the minimal and maximal radii
            rmax *= upsample
        return self.rvt_core(
            img,
            rmin,
            rmax,
            kind=kind,
            rweights=rweights,
            coarse_factor=coarse_factor,
            coarse_mode=coarse_mode,
            pad_mode=pad_mode,
        )
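    # Usage sketch (illustrative; not part of the original module): the RVT
    # response peaks at the center of a radially symmetric feature.
    #
    #     x, y = np.meshgrid(np.arange(64), np.arange(64), indexing="ij")
    #     img = np.exp(-((x - 32) ** 2 + (y - 32) ** 2) / (2 * 3.0**2))  # spot at (32, 32)
    #     rvt_obj = RadialVarianceTransform()
    #     response = rvt_obj.rvt(img, rmin=2, rmax=10, kind="basic", highpass_size=5)
    #     peak = np.unravel_index(np.argmax(response), response.shape)  # near (32, 32)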
    def rvt_video(
        self,
        video,
        rmin,
        rmax,
        kind="basic",
        highpass_size=None,
        upsample=1,
        rweights=None,
        coarse_factor=1,
        coarse_mode="add",
        pad_mode="constant",
    ):
        """An RVT wrapper that applies the transform to a whole video, frame by
        frame and in parallel when enabled.

        Parameters
        ----------
        video: NDArray
            The video is 3D-numpy (number of frames, width, height).

        The remaining parameters are the same as in :func:`rvt`.

        Returns
        -------
        NDArray
            The transformed source video.
        """
        self.video = video
        if self.cpu.parallel_active is True and self.inter_flag_parallel_active is True:
            print("\n---start RVT with Parallel---")
            result = Parallel(
                n_jobs=self.cpu.n_jobs, backend=self.cpu.backend, verbose=self.cpu.verbose
            )(
                delayed(self.rvt_kernel)(
                    f_,
                    rmin=rmin,
                    rmax=rmax,
                    kind=kind,
                    highpass_size=highpass_size,
                    upsample=upsample,
                    rweights=rweights,
                    coarse_factor=coarse_factor,
                    coarse_mode=coarse_mode,
                    pad_mode=pad_mode,
                )
                for f_ in tqdm(range(video.shape[0]))
            )
        else:
            print("\n---start RVT without Parallel---")
            result = [
                self.rvt_kernel(
                    i_,
                    rmin=rmin,
                    rmax=rmax,
                    kind=kind,
                    highpass_size=highpass_size,
                    upsample=upsample,
                    rweights=rweights,
                    coarse_factor=coarse_factor,
                    coarse_mode=coarse_mode,
                    pad_mode=pad_mode,
                )
                for i_ in tqdm(range(self.video.shape[0]))
            ]
        # Stack the per-frame results. A dedicated attribute is used here, since
        # assigning to ``self.rvt_video`` would shadow this method; ``np.asarray``
        # also keeps the correct frame size when ``upsample > 1``.
        self.transformed_video = np.asarray(result)
        return self.transformed_video
    def rvt_kernel(
        self,
        frame_num,
        rmin,
        rmax,
        kind="basic",
        highpass_size=None,
        upsample=1,
        rweights=None,
        coarse_factor=1,
        coarse_mode="add",
        pad_mode="constant",
    ):
        """Perform the Radial Variance Transform (RVT) of a single video frame.

        Parameters
        ----------
        frame_num: int
            Frame number (index into ``self.video``).

        The remaining parameters are the same as in :func:`rvt`.

        Returns
        -------
        NDArray
            The transformed frame.
        """
        img = self.video[frame_num, :, :]
        # The per-frame processing is identical to ``rvt``, so delegate to it.
        return self.rvt(
            img,
            rmin,
            rmax,
            kind=kind,
            highpass_size=highpass_size,
            upsample=upsample,
            rweights=rweights,
            coarse_factor=coarse_factor,
            coarse_mode=coarse_mode,
            pad_mode=pad_mode,
        )
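# Usage sketch (illustrative; not part of the original module): applying the
# transform to every frame of a hypothetical video.
#
#     video = np.random.rand(50, 64, 64)
#     rvt_obj = RadialVarianceTransform(inter_flag_parallel_active=False)
#     rvt_stack = rvt_obj.rvt_video(video, rmin=2, rmax=10, kind="basic")
#     # rvt_stack has shape (50, 64, 64)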
class FastRadialSymmetryTransform:
    def __init__(self):
        """Implementation of the fast radial symmetry transform in pure Python using
        OpenCV and numpy.

        References
        ----------
        The fast radial symmetry transform code has been adapted from the GitHub
        repository mentioned below.

        [1] Loy, G., & Zelinsky, A. (2002). A fast radial symmetry transform for
        detecting points of interest. Computer Vision, ECCV 2002.

        [2] https://github.com/Xonxt/frst
        """
        pass

    def gradx(self, img):
        rows, cols = img.shape
        # Use hstack to add back the columns that were dropped as zeros
        return np.hstack(
            (np.zeros((rows, 1)), (img[:, 2:] - img[:, :-2]) / 2.0, np.zeros((rows, 1)))
        )

    def grady(self, img):
        rows, cols = img.shape
        # Use vstack to add back the rows that were dropped as zeros
        return np.vstack(
            (np.zeros((1, cols)), (img[2:, :] - img[:-2, :]) / 2.0, np.zeros((1, cols)))
        )

    def _frst(self, img, radii, alpha, beta, stdFactor, mode="BOTH"):
        """Perform the fast radial symmetry transform.

        Parameters
        ----------
        img: NDArray
            Input image, grayscale.

        radii: int
            Integer value for the radius size in pixels (n in the original paper);
            also used to size the Gaussian kernel.

        alpha: float
            Strictness of the symmetry transform (higher = more strict; 2 is a good
            place to start).

        beta: float
            Gradient threshold parameter, float in [0, 1].

        stdFactor: float
            Standard deviation factor for the Gaussian kernel.

        mode: str
            BRIGHT, DARK, or BOTH.
        """
        mode = mode.upper()
        assert mode in ["BRIGHT", "DARK", "BOTH"]
        dark = mode == "DARK" or mode == "BOTH"
        bright = mode == "BRIGHT" or mode == "BOTH"

        workingDims = tuple((e + 2 * radii) for e in img.shape)
        O_n = np.zeros(workingDims, np.float64)
        M_n = np.zeros(workingDims, np.float64)

        # Calculate gradients
        gx = self.gradx(img)
        gy = self.grady(img)

        # Find the gradient vector magnitude
        gnorms = np.sqrt(np.add(np.multiply(gx, gx), np.multiply(gy, gy)))

        # Use beta to set the threshold - speeds up the transform significantly
        gthresh = np.amax(gnorms) * beta

        # Find the x/y distances to the affected pixels
        gpx = (
            np.multiply(np.divide(gx, gnorms, out=np.zeros(gx.shape), where=gnorms != 0), radii)
            .round()
            .astype(int)
        )
        gpy = (
            np.multiply(np.divide(gy, gnorms, out=np.zeros(gy.shape), where=gnorms != 0), radii)
            .round()
            .astype(int)
        )

        # Iterate over all pixels (with gradient above the threshold)
        for coords, gnorm in np.ndenumerate(gnorms):
            if gnorm > gthresh:
                i, j = coords
                # Positively affected pixel
                if bright:
                    ppve = (i + gpx[i, j], j + gpy[i, j])
                    O_n[ppve] += 1
                    M_n[ppve] += gnorm
                # Negatively affected pixel
                if dark:
                    pnve = (i - gpx[i, j], j - gpy[i, j])
                    O_n[pnve] -= 1
                    M_n[pnve] -= gnorm

        # Take the absolute value and normalize the O matrix
        O_n = np.abs(O_n)
        O_n = O_n / float(np.amax(O_n))

        # Normalize the M matrix
        M_max = float(np.amax(np.abs(M_n)))
        M_n = M_n / M_max

        # Elementwise multiplication
        F_n = np.multiply(np.power(O_n, alpha), M_n)

        # Gaussian blur (kernel size must be odd)
        kSize = int(np.ceil(radii / 2))
        kSize = kSize + 1 if kSize % 2 == 0 else kSize

        S = cv2.GaussianBlur(F_n, (kSize, kSize), int(radii * stdFactor))
        S = np.fliplr(S)
        return S[radii:-radii, radii:-radii]
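# Usage sketch (illustrative; not part of the original module): detecting bright
# radially symmetric blobs with the (private) ``_frst`` method.
#
#     frst = FastRadialSymmetryTransform()
#     img = np.zeros((64, 64))
#     img[30:34, 30:34] = 1.0  # hypothetical bright blob
#     S = frst._frst(img, radii=4, alpha=2, beta=0.1, stdFactor=0.5, mode="BRIGHT")
#     # S has the same shape as img; extrema mark centers of radial symmetry.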
class GuidedFilter:
    def __init__(self, I, radius, eps):
        """A class which builds a guided filter according to the number of channels
        of the guide input. The guide input can be a gray image, a color image, or
        a multi-dimensional feature map.

        Parameters
        ----------
        I: NDArray
            Guide image or guide feature map.

        radius: int
            Radius of the filter.

        eps: float
            Value controlling sharpness.

        References
        ----------
        [1] K. He, J. Sun, and X. Tang. Guided Image Filtering. TPAMI'12.
        """
        if len(I.shape) == 2:
            self._Filter = GrayGuidedFilter(I, radius, eps)
        else:
            self._Filter = MultiDimGuidedFilter(I, radius, eps)
    def filter(self, p):
        """
        Parameters
        ----------
        p: NDArray
            Filtering input, which is 2D or 3D with format HW or HWC.

        Returns
        -------
        ret: NDArray
            Filtering output with the same shape as the input.
        """
        p = np.float32(p)
        if len(p.shape) == 2:
            return self._Filter.filter(p)
        elif len(p.shape) == 3:
            channels = p.shape[2]
            ret = np.zeros_like(p, dtype=np.float32)
            for c in range(channels):
                ret[:, :, c] = self._Filter.filter(p[:, :, c])
            return ret
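# Usage sketch (illustrative; not part of the original module): edge-preserving
# smoothing with the image itself as the guide.
#
#     img = np.random.rand(64, 64).astype(np.float32)  # hypothetical noisy image
#     gf = GuidedFilter(I=img, radius=4, eps=1e-2)
#     smoothed = gf.filter(img)  # same shape as img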
class GrayGuidedFilter:
    def __init__(self, I, radius, eps):
        """Specific guided filter for a gray guide image.

        Parameters
        ----------
        I: NDArray
            2D guide image.

        radius: int
            Radius of the filter.

        eps: float
            Value controlling sharpness.

        References
        ----------
        The guided filter for a gray guide image has been adapted from the GitHub
        repository mentioned below.

        [1] https://github.com/lisabug/guided-filter
        """
        self.I = np.float32(I)
        self.radius = radius
        self.eps = eps
    def filter(self, p):
        """
        Parameters
        ----------
        p: NDArray
            2D filtering input.

        Returns
        -------
        q: NDArray
            2D filtering output.
        """
        # ``cv2.boxFilter`` expects the kernel size as a (width, height) tuple
        ksize = (self.radius, self.radius)
        # step 1
        meanI = cv2.boxFilter(src=self.I, ddepth=-1, ksize=ksize)
        meanp = cv2.boxFilter(src=p, ddepth=-1, ksize=ksize)
        corrI = cv2.boxFilter(src=self.I * self.I, ddepth=-1, ksize=ksize)
        corrIp = cv2.boxFilter(src=self.I * p, ddepth=-1, ksize=ksize)
        # step 2
        varI = corrI - meanI * meanI
        covIp = corrIp - meanI * meanp
        # step 3
        a = covIp / (varI + self.eps)
        b = meanp - a * meanI
        # step 4
        meana = cv2.boxFilter(src=a, ddepth=-1, ksize=ksize)
        meanb = cv2.boxFilter(src=b, ddepth=-1, ksize=ksize)
        # step 5
        q = meana * self.I + meanb
        return q
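# The ``eps`` parameter trades smoothing strength against edge preservation: since
# a = covIp / (varI + eps), flat regions (varI << eps) approach the local mean of
# ``p``, while strong edges (varI >> eps) keep the guide's structure. A sketch
# (illustrative; not part of the original module):
#
#     step = np.zeros((64, 64), np.float32)
#     step[:, 32:] = 1.0
#     noisy = step + 0.1 * np.random.randn(64, 64).astype(np.float32)
#     mild = GrayGuidedFilter(noisy, radius=8, eps=1e-2).filter(noisy)   # keeps the step
#     strong = GrayGuidedFilter(noisy, radius=8, eps=1e0).filter(noisy)  # closer to a box blur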
class MultiDimGuidedFilter:
    def __init__(self, I, radius, eps):
        """Specific guided filter for a color guide image or a multi-dimensional
        feature map.

        Parameters
        ----------
        I: NDArray
            Guide image or feature map.

        radius: int
            Radius of the filter.

        eps: float
            Value controlling sharpness.

        References
        ----------
        The guided filter for a color guide image has been adapted from the GitHub
        repository mentioned below.

        [1] https://github.com/lisabug/guided-filter
        """
        self.I = np.float32(I)
        self.radius = radius
        self.eps = eps
        self.rows = self.I.shape[0]
        self.cols = self.I.shape[1]
        self.chs = self.I.shape[2]
    def filter(self, p):
        """
        Parameters
        ----------
        p: NDArray
            2D filtering input.

        Returns
        -------
        q: NDArray
            2D filtering output.
        """
        p_ = np.expand_dims(p, axis=2)

        # ``cv2.boxFilter`` requires an explicit output depth and a
        # (width, height) kernel-size tuple
        ksize = (self.radius, self.radius)

        meanI = cv2.boxFilter(src=self.I, ddepth=-1, ksize=ksize)  # (H, W, C)
        meanp = cv2.boxFilter(src=p_, ddepth=-1, ksize=ksize)
        if meanp.ndim == 2:
            # OpenCV squeezes single-channel outputs; restore the (H, W, 1) shape
            meanp = np.expand_dims(meanp, axis=2)

        I_ = self.I.reshape((self.rows * self.cols, self.chs, 1))  # (HW, C, 1)
        meanI_ = meanI.reshape((self.rows * self.cols, self.chs, 1))  # (HW, C, 1)

        corrI_ = np.matmul(I_, I_.transpose(0, 2, 1))  # (HW, C, C)
        corrI_ = corrI_.reshape((self.rows, self.cols, self.chs * self.chs))  # (H, W, CC)
        corrI_ = cv2.boxFilter(src=corrI_, ddepth=-1, ksize=ksize)
        corrI = corrI_.reshape((self.rows * self.cols, self.chs, self.chs))  # (HW, C, C)

        U = np.expand_dims(np.eye(self.chs, dtype=np.float32), axis=0)
        # U = np.tile(U, (self.rows*self.cols, 1, 1))  # (HW, C, C)

        left = np.linalg.inv(corrI + self.eps * U)  # (HW, C, C)

        corrIp = cv2.boxFilter(src=self.I * p_, ddepth=-1, ksize=ksize)  # (H, W, C)
        covIp = corrIp - meanI * meanp  # (H, W, C)
        right = covIp.reshape((self.rows * self.cols, self.chs, 1))  # (HW, C, 1)

        a = np.matmul(left, right)  # (HW, C, 1)
        axmeanI = np.matmul(a.transpose((0, 2, 1)), meanI_)  # (HW, 1, 1)
        axmeanI = axmeanI.reshape((self.rows, self.cols, 1))
        b = meanp - axmeanI  # (H, W, 1)
        a = a.reshape((self.rows, self.cols, self.chs))

        meana = cv2.boxFilter(src=a, ddepth=-1, ksize=ksize)
        meanb = cv2.boxFilter(src=b, ddepth=-1, ksize=ksize)

        meana = meana.reshape((self.rows * self.cols, 1, self.chs))
        meanb = meanb.reshape((self.rows * self.cols, 1, 1))

        I_ = self.I.reshape((self.rows * self.cols, self.chs, 1))

        q = np.matmul(meana, I_) + meanb
        q = q.reshape((self.rows, self.cols))
        return q