import importlib
import logging
import os
import time
import cv2
import numba as nb
import numpy as np
import skimage as ski
import scipy as sp
import toml
# import sympy.ntheory.generate
logger = logging.getLogger(__name__)
def _version():
"""Version of package.
Use version string in 'pyproject.toml' as the single source of truth."""
try:
# in order not to confuse an installed version of a package with a local one,
# first try the local one (not being installed)
meta = toml.load(os.path.join(os.path.dirname(__file__), "..", "..", "pyproject.toml"))
version = meta["project"]["version"] # Python Packaging User Guide expects version here
except KeyError:
version = meta["tool"]["poetry"]["version"] # Poetry expects version here
except FileNotFoundError:
version = importlib.metadata.version("fringes") # installed version
return version
[docs]
def vshape(data: np.ndarray, channels: list | tuple = (1, 3, 4)) -> np.ndarray:
"""Standardizes the input data shape.
Transforms video data into the standardized shape (T, Y, X, C), where
T is number of frames, Y is height, X is width, and C is number of color channels.
Inspired from `scikit-video <http://www.scikit-video.org/stable/modules/generated/skvideo.utils.vshape.html>`_.
Parameters
----------
data : ndarray
Input data of arbitrary shape.
channels : list | tuple
Allowed number of color channels.
Returns
-------
videodata : ndarray
Standardized version of data, in shape (T, Y, X, C), where
T is number of frames, Y is height, X is width, and C is number of color channels.
Notes
-----
Ensures that the array becomes 4-dimensional
and that the length of the last dimension is in 'channels').
To do this, leading dimensions may be flattened.
Examples
--------
>>> from fringes import vshape
>>> data = np.ones(100)
>>> videodata = vshape(data)
>>> videodata.shape
(100, 1, 1, 1)
>>> data = np.ones((1200, 1920))
>>> videodata = vshape(data)
>>> videodata.shape
(1, 1200, 1920, 1)
>>> data = np.ones((1200, 1920, 3))
>>> videodata = vshape(data)
>>> videodata.shape
(1, 1200, 1920, 3)
>>> data = np.ones((100, 1200, 1920))
>>> videodata = vshape(data)
>>> videodata.shape
(100, 1200, 1920, 1)
>>> data = np.ones((100, 1200, 1920, 3))
>>> videodata = vshape(data)
>>> videodata.shape
(100, 1200, 1920, 3)
>>> data = np.ones((2, 3, 4, 1200, 1920))
>>> videodata = vshape(data)
>>> videodata.shape
(24, 1200, 1920, 1)
"""
if data.ndim == 0:
data = data.reshape(1) # returns a view
if data.ndim > 4:
if data.shape[-1] in channels:
T = np.prod(data.shape[:-3])
Y, X, C = data.shape[-3:]
else:
T = np.prod(data.shape[:-2])
Y, X = data.shape[-2:]
C = 1
elif data.ndim == 4:
if data.shape[-1] in channels:
T, Y, X, C = data.shape
else:
T = np.prod(data.shape[:2])
X, Y = data.shape[2:]
C = 1
elif data.ndim == 3:
if data.shape[-1] in channels:
T = 1
Y, X, C = data.shape
else:
T, Y, X = data.shape
C = 1
elif data.ndim == 2:
T = 1
Y, X = data.shape
C = 1
elif data.ndim == 1:
T = data.shape[0]
Y = 1
X = 1
C = 1
return data.reshape(T, Y, X, C) # returns a view
[docs]
def simulate(
I: np.ndarray,
# M: float = 1,
PSF: float = 0,
system_gain: float = 0.038,
dark_current: float = 3.64 / 0.038, # [electrons] # some cameras feature a dark current compensation
dark_noise: float = 13.7, # [electrons]
seed: int = 268664434431581513926327163960690138719, # secrets.randbits(128)
) -> np.ndarray:
"""Simulate the recording, i.e. the transmission channel.
This includes the modulation transfer function (computed from the imaging system's point spread function)
and intensity noise added by the camera.
Parameters
----------
I : np.ndarray
Fringe pattern sequence.
PSF : float, optional
Standard deviation of the Point Spread Function, in pixel units.
The default is 0.
system_gain : float, optional
System gain of the digital camera.
The default is 0.038.
dark_current : float, optional
Dark current of the digital camera, in unit electrons.
The default is ~100.
dark_noise : float, optional
Dark noise of the digital camera, in units electrons.
The default is 13.7.
seed : int, optional
A seed to initialize the Random Number Generator.
It makes the random numbers predictable.
See `Seeding and Entropy <https://numpy.org/doc/stable/reference/random/bit_generators/index.html#seeding-and-entropy>`_ for more information about seeding.
Returns
-------
I : np.ndarray
Simulated fringe pattern sequence.
"""
# M : float
# Optical magnification of the imaging system.
t0 = time.perf_counter()
I.shape = vshape(I).shape
dtype = I.dtype
I = I.astype(float, copy=False)
# # magnification
# if magnification != 1: # attention: magnification must be an integer
# I = sp.ndimage.uniform_filter(I, size=magnification, mode="reflect", axes=(1, 2))
# # magnification
# if M != 1: # todo: float magnification
# I = sp.ndimage.uniform_filter(I, size=M, mode="nearest", axes=(1, 2))
# PSF (e.g. defocus)
if PSF != 0:
I = sp.ndimage.gaussian_filter(I, sigma=PSF, order=0, mode="nearest", axes=(1, 2))
if system_gain > 0:
# random number generator
rng = np.random.default_rng(seed)
# add shot noise
shot = (rng.poisson(I) - I) * np.sqrt(system_gain)
shot_new = rng.poisson(I / system_gain) * system_gain - I
I_shot = rng.poisson(I / system_gain) * system_gain
I += shot
# todo: int
# s_ = np.std(shot)
if dark_current > 0 or dark_noise > 0:
# add dark signal and dark noise
dark_current_y = dark_current * system_gain
dark_noise_y = dark_noise * system_gain
dark = rng.normal(dark_current_y, dark_noise_y, I.shape)
I += dark
# d_ = np.std(dark)
DSNU = 0 # todo: spatial non-uniformity
if DSNU:
# add spatial non-uniformity
I += DSNU
# clip values
if system_gain > 0 or np.any(DSNU > 0):
Imax = np.iinfo(dtype).max if dtype.kind in "ui" else 1
np.clip(I, 0, Imax, out=I)
# quantization noise is added by converting to integer
I = I.astype(dtype, copy=False)
logger.info(f"{1000 * (time.perf_counter() - t0)}ms")
return I
[docs]
def gamma_auto_correct(I: np.ndarray) -> np.ndarray:
"""Automatically estimate and apply the gamma correction factor
to linearize the display/camera response curve.
Parameters
----------
I : np.ndarray
Recorded data.
Returns
-------
I_lin : np.ndarray
Linearized data.
"""
# normalize to [0, 1]
Imax = np.iinfo(I.dtype).max if I.dtype.kind in "ui" else 1 if I.max() < 1 else I.max()
I = I / Imax
# estimate gamma correction factor
med = np.nanmedian(I) # Median is a robust estimator for the mean.
gamma = np.log(med) / np.log(0.5)
inv_gamma = 1 / gamma
# apply inverse gamma
# table = np.array([((g / self.Imax) ** invGamma) * self.Imax for g in range(self.Imax + 1)], self.dtype)
# I = cv2.LUT(I, table)
I **= inv_gamma
I *= Imax
return I
# def degamma(I): # todo degamma
# """Gamma correction.
#
# Assumes equally distributed histogram."""
# NotImplemented
[docs]
def circular_distance(a: float | np.ndarray, b: float | np.ndarray, c: float) -> np.ndarray:
"""Circular distance.
Parameters
----------
a : float or np.ndarray
Start points.
b : float or np.ndarray
End points.
c : float
Circumference (distance) after which wrapping occurs.
Returns
-------
d : float or np.ndarray
Circular distance from a to b.
Notes
-----
For more details, see https://ieeexplore.ieee.org/document/9771407.
"""
# dmax = c / 2
# d = b - a
# if d > dmax:
# d -= c
# elif d < -dmax:
# d += c
# cd = np.minimum(np.abs(a - b), c - np.abs(a - b))
cd = c / 2 - np.abs(c / 2 - np.abs(a - b))
return cd
# @nb.jit(cache=True, nopython=True, nogil=True, parallel=True, fastmath=True)
# def circ_dist(a, b, c) -> float:
# d = b - a
# dmax = c / 2
#
# if d > dmax:
# d -= c
# elif d < -dmax:
# d += c
# return d
[docs]
def bilateral(I: np.ndarray, k: int = 7) -> np.ndarray: # todo: test
"""Bilateral filter.
Edge-preserving, denoising filter.
Parameters
----------
I : np.ndarray
Input data.
It is reshaped to video-shape (frames 'T', height 'Y', width 'X', color channels 'C') before processing.
k : int, optional
Size of the filter kernel.
Default is 7.
Returns
-------
out : np.ndarray
Filtered data.
"""
t0 = time.perf_counter()
T, Y, X, C = vshape(I).shape
I = I.reshape(T, Y, X, C)
out = np.empty_like(I)
for t in range(T):
if C in [1, 3]:
rv = ski.restoration.denoise_bilateral(I[t], win_size=k, sigma_spatial=1, channel_axis=-1)
out[t] = rv if C == 3 else rv[..., None]
# out[t] = cv2.bilateralFilter(I[t], d=k, sigmaColor=np.std(I[t]), sigmaSpace=1)
else:
for c in range(C):
out[t, :, :, c] = ski.restoration.denoise_bilateral(I[t, :, :, c], win_size=k, sigma_spatial=1)
# out[t, :, :, c] = cv2.bilateralFilter(I[t], d=k, sigmaColor=np.std(I[t, :, :, c]), sigmaSpace=1)
logging.debug(f"{1000 * (time.perf_counter() - t0)}ms")
return out
# # @nb.jit(cache=True, nopython=True, nogil=True, parallel=True, fastmath=True) # todo
# def filter(combos, K, L, lmin):
# kroot = L ** (1 / K)
# if lmin <= kroot:
# lcombos = np.array([l for l in combos if np.any(l > kroot) and np.lcm.reduce(l) >= L])
# else:
# lcombos = np.array([l for l in combos if np.lcm.reduce(l) >= L])
#
# return lcombos
#
#
# def coprime(n: list[int] | tuple[int] | np.ndarray) -> bool: # n: iterable # todo: extend to rational numbers
# """Test whether numbers are pairwise co-prime.
#
# Parameters
# ----------
# n : list, tuple, np.ndarray
# Integer numbers.
#
# Returns
# -------
# iscoprime : bool
# True if numbers are pairwise co-prime, else False.
# """
#
# # convert to array and flatten
# n = np.array(n).ravel() # returns a view
#
# # check whether iterable has entries
# if n.size == 0:
# return False
#
# # check whether numbers are integers
# if not np.all([i % 1 == 0 for i in n]):
# return False
#
# # ensure numbers are integers
# if n.dtype != int:
# n = n.astype(int, copy=False)
#
# # check pairwise for coprimality, i.e. gcd(a, b) == 1
# for i in range(n.size): # each combination; number of combinations = n.size * (n.size - 1) / 2
# for j in range(i + 1, n.size):
# if np.gcd(n[i], n[j]) != 1:
# return False
#
# # alternatively: np.lcm.reduce(n) == np.prod(n)
#
# return True
#
#
# def extgcd(a, b):
# """Erweiterter euklidischer Algorithmus.
#
# https://hwlang.de/krypto/algo/euklid-erweitert.htm
#
# Parameters
# ----------
# a : int
# Ganzzahl.
#
# b : int
# Ganzzahl.
#
# Returns
# -------
# a : int
# Größter gemeinsamer Teiler von 'a' und 'b'.
#
# u : int
# Koeffizienten 'u' und 'v' einer Darstellung von 'a' als ganzzahlige Linearmbination.
#
# v : int
# Koeffizienten 'u' und 'v' einer Darstellung von 'a' als ganzzahlige Linearmbination.
# """
#
# u, v, s, t = 1, 0, 0, 1
# while b != 0:
# q = a // b
# a, b = b, a - q * b
# u, s = s, u - q * s
# v, t = t, v - q * t
# return a, u, v
#
#
# def modinverse(a, n):
# """Berechnet das multiplikativ inverse Element von a modulo n.
#
# https://hwlang.de/krypto/grund/inverses-element.htm
#
# Parameters
# ----------
# a : int
# Ganzzahl.
#
# n : int
# Modul.
#
# Returns
# -------
# mi : int
# Multiplikativ inverse Element von 'a' modulo 'n'.
# """
#
# g, u, v = extgcd(a, n)
# return u % n
#
#
# def chineseRemainder(nn, rr):
# """Chinesischer-Restsatz-Algorithmus.
#
# Der Vorteil dieser Implementierung nach dem Divide-and-Conquer-Prinzip besteht darin,
# dass in den unteren Rekursionsebenen viele Berechnungen mit kleinen Zahlen stattfinden
# und erst in den oberen Rekursionsebenen wenige Berechnungen mit großen Zahlen.
#
# https://hwlang.de/krypto/algo/chinese-remainder.htm
#
# Parameters
# ----------
# nn : np.ndarray, list
# Liste paarweise teilerfremder Moduln.
#
# rr : np.ndarray, list
# Liste der Reste.
#
# Returns
# -------
# mn : int
# Produkt der Moduln.
#
# x : int
# Zahl x nach dem chinesischen Restsatz.
# """
#
# if len(nn) == 1:
# return nn[0], rr[0]
# else:
# k = len(nn) // 2
# m, a = chineseRemainder(nn[:k], rr[:k])
# n, b = chineseRemainder(nn[k:], rr[k:])
# g, u, v = extgcd(m, n)
# x = (b - a) * u % n * m + a
# return m * n, x
#
#
# def modinv(x, p):
# """Modular multiplicative inverse.
#
# y = invmod(x, p) such that x*y == 1 (mod p)
#
# https://bugs.python.org/issue36027
#
# Parameters
# ----------
#
# x : int
# Integer.
#
# p : int
# Modul.
#
# Returns
# -------
# y : Modular multiplicative inverse.
# """
# return pow(x, -1, p)