Source code for fringes.fringes

import itertools as it
import json
import logging
import os
import sys
import time
from collections import namedtuple
from collections.abc import Sequence
from importlib.metadata import version
from typing import Literal

import numpy as np
import scipy as sp
import sympy
import yaml
from numba import get_num_threads, set_num_threads

from fringes import grid
from fringes.decoder import ftm, spu, temp_demod_numpy, temp_demod_numpy_unknown_frequencies
from fringes.decoder_numba import decode
from fringes.util import vshape

logger = logging.getLogger(__name__)

_2PI = 2 * np.pi
_Decoded = namedtuple("decoded", ("a", "b", "x"))
_Decoded_verbose = namedtuple("decoded_verbose", ("a", "b", "x", "p", "k", "r", "u"))


[docs] class Fringes: """Configure, encode and decode fringe patterns using phase shifting algorithms. Note ---- All parameters are implemented as properties (managed attributes) and are parsed when set. Note that some attributes have sub-dependencies, hence dependent attributes might change as well. Circular dependencies are resolved automatically. """ # note: the class docstring is continued at the end of the class in an automated manner # value limits _Dmax: int = 2 _amax: float = 2.0 _gmax: float = 4.0 # most screens have a gamma value of ~2.2 # _lminmin: float = 2.0 # l == 2 only if p0 != pi / 2 + 2pi*k, best if p0 == pi + 2pi*k with k being a positive integer # also l <= 2 yields errors in SPU: phase jumps = 2PI / lmin >= np.pi _lminmin: float = 3.0 # l >= 3 yields sufficient modulation theoretically # _lminmin: float = 8.0 # l >= 8 yields sufficient modulation practically [Liu2014] # allowed values; take care to only use immutable types! # todo: is that so? _choices = { "grid": { "image", # "Cartesian", # "polar", # "log-polar", }, # todo: ("Cartesian", "polar", "log-polar") "dtype": { # "bool", # results are too unprecise "uint8", "uint16", "uint32", # "uint64", # todo: integer overflow in pyqtgraph -> replace line 685 of ImageItem.py with: bins = self._xp.arange(mn, mx + 1.01 * step, step, dtype="uint64") # "float16", # numba doesn't handle float16, also most algorithms convert float16 to float32 anyway "float32", "float64", }, "mode": {"fast", "precise", "MRD", "sRGB"}, # todo: , "NumPy", "Numba"), "h": {"w", "r", "g", "b", "c", "m", "y"}, } # default values are defined here; take care to only use immutable types (numpy arrays ARE mutable)! def __init__( self, *args, # bundles all args, what follows are only kwargs X: int = 1920, Y: int = 1200, axes: int | Sequence[int] = (1, 0), K: int = 2, N: int | Sequence[int] = ((4, 4), (4, 4)), # l: float | Sequence[float] | str = ((1920 / 13., 1920 / 7.), (1920 / 13., 1920 / 7.)), # inferred from v v: float | Sequence[float] | str = ((13.0, 7.0), (13.0, 7.0)), f: float | Sequence[float] = ((1.0, 1.0), (1.0, 1.0)), h: Sequence[int] | str = ((255, 255, 255),), p0: float = np.pi, g: float = 1.0, # A: float = 255 / 2, # i.e. Imax / 2 @ uint8; inferred from Imax and E # B: float = 255 / 2, # i.e. Imax / 2 @ uint8; inferred from Imax and E and V E: float = 0.5, V: float = 1.0, a: float = 1.0, bits: int = 8, dtype: str | np.dtype = "uint8", # inferred from bits grid: str = "image", # angle: float = 0.0, SDM: bool = False, WDM: bool = False, FDM: bool = False, static: bool = False, lmin: float = 8.0, reverse: bool = False, mode: str = "fast", # **kwargs, # bundles all further (hence undefined) kwargs; # # todo: raise error if not empty: # # __init__() got an unexpected keyword argument ): # given values which are in defaults but are not identical to them given = { k: v for k, v in sorted(locals().items()) if k in self._defaults and not np.array_equal(v, self._defaults[k]) } # sorted() ensures setting _params in the right order # set default values to private attributes used by properties self._mtf = None # empty cache self._UMR = None # empty cache self._m = None # empty cache self._crt = None # empty cache for k, v in self._defaults.items(): setattr( self, f"_{k}", np.array(v) if isinstance(v, tuple) else v ) # define private variables from which the properties get their values from # set given values (check is run in '_params.setter') self._params = given # precompute self.UMR # compute and cache; logs warning if necessary self.crt # compute and cache (also computes _m) def __call__(self, *args, **kwargs) -> np.ndarray: # noqa: D102 return self.encode(*args, **kwargs) def __iter__(self): # noqa: D105 self._t: int = 0 return self def __next__(self) -> np.ndarray: # noqa: D105 if self._t < self.T: I = self.encode()[0] self._t += 1 return I else: del self._t raise StopIteration() def __len__(self) -> int: # noqa: D105 return self.T def __eq__(self, other) -> bool: # noqa: D105 return other.__class__ is self.__class__ and self._params == other._params def __contains__(self, item): # noqa: D105 return hasattr(self, item) def __str__(self) -> str: # noqa: D105 return self.__class__.__name__ def __repr__(self) -> str: # noqa: D105 return f"{self._params}"
[docs] def reset(self) -> None: """Reset parameters of the `Fringes` instance to default values.""" self._params = self._defaults logger.info("Reset parameters to defaults.")
[docs] def load(self, fname: str | None = None) -> None: """Load parameters from a config file to the `Fringes` instance. .. warning:: The parameters are only loaded if the config file provides the section `fringes`. Parameters ---------- fname : str, optional File name of the file to load. Supported file formats are: *.json, *.yaml. If `fname` is not provided, the file `.fringes.yaml` within the user home directory is loaded. Examples -------- >>> import os >>> fname = os.path.join(os.getcwd(), "config.yaml") >>> from fringes import Fringes >>> f = Fringes() >>> f.N = f.N + 1 >>> f.save(fname) >>> f_ = Fringes() >>> f_.load(fname) >>> f_ == f True """ pname = self.__class__.__name__.lower() if fname is None: fname = os.path.join(os.getcwd(), "config.yaml") if not os.path.isfile(fname): logger.error(f"File '{fname}' does not exist.") return with open(fname, "r") as f: ext = os.path.splitext(fname)[-1] if ext == ".json": p = json.load(f) elif ext == ".yaml": p = yaml.safe_load(f) else: logger.error(f"Unknown file type '{ext}'.") return if pname in p: params = p[pname] self._params = params logger.info(f"Loaded parameters from '{fname}'.") else: logger.error(f"No '{pname}' section in file '{fname}'.")
[docs] def save(self, fname: str | None = None) -> None: """Save the parameters of the `Fringes` instance to a config file. Within the file, the parameters are written to the section `fringes`. Parameters ---------- fname : str, optional File name of the file to save. Supported file formats are: *.json, *.yaml. If `fname` is not provided, the parameters are saved to the file `.fringes.yaml` within the user home directory. Examples -------- >>> import os >>> fname = os.path.join(os.getcwd(), "config.yaml") >>> from fringes import Fringes >>> f = Fringes() >>> f.save(fname) """ pname = self.__class__.__name__.lower() if fname is None: fname = os.path.join(os.getcwd(), "config.yaml") if not os.path.dirname(fname): fname = os.path.join(os.getcwd(), fname) if not os.path.isdir(os.path.dirname(fname)): logger.error("File directory does not exist.") return name, ext = os.path.splitext(fname) if not ext: name, ext = ext, name if ext not in {".json", ".yaml"}: # toml does not allow the type 'None' logger.error("File extension '{ext}' is unknown. Must be '.json' or '.yaml'.") return if os.path.isfile(fname): logger.warning(f"File '{fname}' already exists and will be overwritten.") with open(fname, "w") as f: # todo: only overwrite section "fringes" if ext == ".json": json.dump({f"{pname}": self._params}, f, indent=4) elif ext == ".yaml": yaml.safe_dump({f"{pname}": self._params}, f) logger.info(f"Saved parameters to {fname}.")
# def optimize(self, T: int | None = None, umax: float | None = None) -> None: # todo: optimize # """Optimize the parameters of the `Fringes` instance. # # Parameters # ---------- # T : int, optional # Number of frames. # If `T` is not provided, the number of frames from the `Fringes` instance is used. # Then, the `Fringes` instance's number of shifts `N` is distributed optimally over the directions and sets. # umax : float, optional # Maximum allowable uncertainty (in pixel units). # Must be greater than zero. # Standard deviation of maximum uncertainty for measurement to be valid. # # Notes # ----- # todo: update # # If `umax` is specified, the parameters are determined # that allow a maximal uncertainty of `umax` # with a minimum number of frames. # # Else, the parameters of the `Fringes` instance are optimized to yield the minimal uncertainty # using the given number of frames `T`. # """ # # todo: compute based on given ui -> upi -> ux # # if T is not None: # _T = int(max(1, T)) # # if _T == 1: # WDM + SDM # # todo: FTM? # if self.grid not in self._choices["grid"][:2]: # logger.error(f"Couldn't set 'T = 1': grid not in {self._choices["grid"][:2]}'.") # return # # if self.H > 1: # self.h = self.h[0] # self.K = 1 # self.FDM = False # reset FDM before setting N # self.N = 3 # set N before WDM # self.WDM = True # if self.D == 2: # self.SDM = True # self.v = 1 # elif _T == 2: # WDM # if self.grid not in self._choices["grid"][:2]: # logger.error(f"Couldn't set 'T = 1': grid not in {self._choices["grid"][:2]}'.") # return # # if self.H > 1: # self.h = self.h[0] # self.D = 2 # self.K = 1 # self.FDM = False # reset FDM before setting N # self.SDM = False # self.N = 3 # set N before WDM # self.WDM = True # self.v = 1 # else: # # set boundaries # self.FDM = False # self.SDM = False # self.WDM = False # # # todo: T == 4 -> no modulation # # T == 5 -> FDM if _T >= self._Nmin? # # # try D == 2 # todo: mux # if _T < 2 * self._Nmin: # self.D = 1 # else: # self.D = 2 # # # try to keep 'H' # Hmax = _T // (self.D * self._Nmin) # if self.H > Hmax: # while _T % Hmax != 0: # Hmax -= 1 # self.h = self.h[:Hmax] # _T //= self.H # # # try to keep 'K' # Kmin = _T // (self.D * self._Nmin) # if self.K > Kmin: # self.K = Kmin # # # ensure UMR >= L # if self._ambiguous: # imin = np.argmin(self._v, axis=0) # self._v[imin] = 1 # # # set 'N' # N = np.full((self.D, self.K), _T // (self.D * self.K)) # dT = _T - np.sum(N) # if dT > 0: # k = int(dT // self.D) # N[:, :k] += 1 # if dT % self.D != 0: # d = dT % self.D # N[:d, k] += 1 # self.N = N # elif umax is not None: # ... # else: # ... # # vopt = self.vmax # todo: optimal v # K = np.log(self.Lext) / np.log(self.Lext / vopt) # lopt ** K = Lext # K = np.ceil(max(2, K)) # self.K = K # self.v = "optimal" # # if umax is not None: # umax -> T # self.N = int(np.median(self.N)) # make N const. # a = self.u.max() / umax # N = self.N * a**2 # self.N = np.maximum(3, np.ceil(N)) # # if self.u > umax: # ... # todo: check if umax is reached # else: # T -> u # if T is None: # T = self.T # # # distribute frames optimally (evenly) on shifts # # logger.info("Optimized parameters.") def _modulate(self, x: np.ndarray | tuple[np.ndarray]) -> np.ndarray: """Encode base fringe patterns by spatio-temporal modulation. Parameters ---------- x : np.ndarray | tuple[np.ndarray] Coordinates. Might be generated by `numpy.indices <https://numpy.org/doc/stable/reference/generated/numpy.indices.html>`_. Must have three dimensions at most. The first dimension resp. len(`x`) must match `D`. The shape of each element must be (`Y`, `X`). Returns ------- I : np.ndarray Base fringe patterns. """ t0 = time.perf_counter() # frames if hasattr(self, "_t"): frames = np.array([self._t % np.sum(self._N)]) if self.FDM: # to every frame add D*K-1 frames in distance N N = self._N[0, 0] frames = np.array([[t + N * i for i in range(self.D * self.K)] for t in frames]).ravel() if self.WDM: # WDM before SDM # for every frame take next N consecutive numbers => triple for N=3 N = 3 frames = np.array([[t * N + i for i in range(N)] for t in frames]).ravel() if self.SDM or self.uwr == "FTM": # WDM before SDM # to every frame add 1 in distance N0sum => double N0sum = np.sum(self._N[0]) frames = np.array([[t, t + N0sum] for t in frames]).ravel() else: frames = np.arange(np.sum(self._N)) # allocate T = len(frames) is_mixed_color = np.any((self.h != 0) * (self.h != 255)) dtype = np.dtype("float64") if self.SDM or self.FDM or is_mixed_color or self.uwr == "FTM" else self.dtype I = np.empty((T, self.Y, self.X), dtype) # Ncum = np.cumsum(self._N) # for t in frames: # j = np.argmax(t < Ncum) # # j = np.searchsorted(Ncum, t, side="right") # d, i = np.unravel_index(j, shape=(self.D, self.K)) # n = t - (Ncum[j] - Ncum[0]) # # x_ = (x[d] + self.x0) / self.Lext # normalize x to be within interval [0, 1) # k = _2PI * self._v[d, i] # w = _2PI * self._f[d, i] # # if "MRD" in self.mode: # k *= self.Lext / self.L[d] # # if self.reverse: # w *= -1 # # t_ = n / 4 if self._N[d, i] == 2 else n / self._N[d, i] # p = k * x_ - w * t_ - self.p0 # # I_t = self.E * (1 + self.V * np.cos(p)) # # if "sRGB" in self.mode: # I_t = np.where( # I_t <= 0.0031308, # I_t * 12.92, # 1.055 * I_t ** (1 / 2.4) - 0.055 # ) # elif self.g != 1: # I_t **= self.g # # if self.Imax != 1: # I_t *= self.Imax # # if dtype.kind in "ui": # todo: and rint: # np.rint(I_t, out=I_t) # # elif dtype.kind in "b": # # val = val >= 0.5 # # I[t] = I_t t = 0 frame = 0 for d, ax in enumerate(self.axes): x_ = (x[ax] + self.x0) / self.Lext # normalize x to be within interval [0, 1) for i in range(self.K): k = _2PI * self._v[d, i] w = _2PI * self._f[d, i] if "MRD" in self.mode: k *= self.Lext / self.L[d] if self.reverse: w *= -1 for n in range(self._N[d, i]): if frame in frames: t_ = n / 4 if self._N[d, i] == 2 else n / self._N[d, i] I_t = self.E * (1 + self.V * np.cos(k * x_ - w * t_ - self.p0)) if "sRGB" in self.mode: I_t = np.where(I_t <= 0.0031308, I_t * 12.92, 1.055 * I_t ** (1 / 2.4) - 0.055) elif self.g != 1: I_t **= self.g if self.Imax != 1: I_t *= self.Imax if dtype.kind in "ui": np.rint(I_t, out=I_t) # elif dtype.kind in "b": # val = val >= 0.5 I[t] = I_t t += 1 frame += 1 logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return I.reshape(-1, self.Y, self.X, 1) def _demodulate( self, I: np.ndarray, bmin: float = 0.0, Vmin: float = 0.0, unwrap: bool = True, verbose: bool | str = False, threads: int | None = None, spu_func: str = "ski", ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Decode base fringe patterns by spatio-temporal demodulation. Parameters ---------- I : np.ndarray Fringe pattern sequence. Must be in `vshape` (frames `T`, height `Y`, width `X`, color channels `C`). bmin : float Minimum modulation for measurement to be valid. Can accelerate decoding. Vmin : float Minimum visibility for measurement to be valid. Can accelerate decoding. unwrap : bool, default=True Flag for temporal phase unwrapping (TPU). verbose : bool or str, default=False Flag for returning intermediate and verbose results. threads : int, optional Number of threads to use. Default is all threads. If negative, this many threads will not be used. spu_func : {'ski', 'cv2'}, default='ski' Unwrapping function to use. - 'ski': `Scikit-image <https://scikit-image.org/docs/stable/auto_examples/filters/plot_phase_unwrap.html>`_ [1]_ - 'cv2': `OpenCV <https://docs.opencv.org/4.7.0/df/d3a/group__phase__unwrapping.html>`_ [2]_ Returns ------- a : np.ndarray Brightness: average signal. b : np.ndarray Modulation: amplitude of the cosine signal. p : np.ndarray Local phase. k : np.ndarray Fringe order. x : np.ndarray Coordinate: decoded screen coordinates. r : np.ndarray Residuals from the optimization-based unwrapping process. References ---------- .. [1] `Herráez et al., "Fast two-dimensional phase-unwrapping algorithm based on sorting by reliability following a noncontinuous path", Applied Optics, 2002. <https://doi.org/10.1364/AO.41.007437>`_ .. [2] `Lei et al., “A novel algorithm based on histogram processing of reliability for two-dimensional phase unwrapping”, Optik - International Journal for Light and Electron Optics, 2015. <https://doi.org/10.1016/j.ijleo.2015.04.070>`_ """ t0 = time.perf_counter() if self.uwr == "FTM": a, b, x = ftm(I, self.D) # todo: p, k, r elif "NumPy" in self.mode: a, b, p = temp_demod_numpy(I, self._N, self._f, self.p0) a_, b_, p_ = temp_demod_numpy_unknown_frequencies(I, self._N, self.p0) # isa = np.allclose(a, a_) # isb = np.allclose(b, b_) # isp = np.allclose(p, p_) if self.uwr == "temporal": NotImplementedError # todo: call tpu() from decoder (as ufunc?) # todo: x, k, r x = np.empty_like(a) k = np.empty_like(a) r = np.empty_like(a) else: try: max_threads = get_num_threads() if threads: threads = min(max(1 - max_threads, threads), max_threads) if threads < 0: threads += max_threads set_num_threads(threads) a, b, p, k, x, r = decode( I, self._N, self._v * self.Lext / self.L[:, None] if "MRD" in self.mode else self._v, -self._f if self.reverse else self._f, self.L, self.UMR, self.crt, self.gcd, self.x0, self.p0, bmin, Vmin, unwrap, self.mode, verbose, ) finally: set_num_threads(max_threads) # spatial unwrapping for d in range(self.D): if self.UMR[d] < self.L[d] * self.a: logger.warning("Unwrapping is not spatially independent and only yields a relative phase map.") # todo: self.K > 1 # if "precise" in self.mode: # w0 = self.N[d] * self.v[d] ** 2 * b.reshape(self.D, self.K, -1)[d].mean(axis=-1)**2 # i0 = np.argmax(w0) # precise # else: # i0 = np.argmin(self.v[d]) # fast # todo: unwrap phases via TPU as far as possible in order to reduce the number of phase transitions x[d] = spu(x[d], verbose=verbose, spu_func=spu_func) if "precise" in self.mode: w0 = self._N[d] * self._v[d] ** 2 i0 = np.argmax(w0) # precise else: i0 = np.argmin(self._v[d]) # fast x[d] *= self._l[d, i0] / _2PI if verbose: r[d, :] = np.nan # todo: from spu() if possible logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return a, b, p, k, x, r def _multiplex(self, I: np.ndarray) -> np.ndarray: """Multiplex fringe patterns. Parameters ---------- I : np.ndarray Base fringe patterns. Returns ------- Imux : np.ndarray Multiplexed fringe patterns. """ t0 = time.perf_counter() Imux = I if self.WDM: assert not self.FDM assert self._monochrome assert np.all(self.N == 3) Imux = Imux.reshape((-1, 3, self.Y, self.X, 1)) # returns a view Imux = Imux.swapaxes(1, -1) # returns a view Imux = Imux.reshape((-1, self.Y, self.X, self.C)) # returns a view if self.SDM: assert not self.FDM assert self.grid in {"image", "Cartesian"} # self._choices["grid"][:2] assert self.D == 2 assert I.dtype.kind == "f" Imux = Imux.reshape((self.D, -1)) # returns a view Imux -= self.A Imux = np.sum(Imux, axis=0) Imux += self.A Imux = Imux.reshape((-1, self.Y, self.X, self.C if self.WDM else 1)) # returns a view if self.dtype.kind in "uib": Imux = Imux.astype(self.dtype, copy=False) # returns a view if self.FDM: assert not self.WDM assert not self.SDM assert len(np.unique(self.N)) == 1 assert I.dtype.kind == "f" if np.any(self._N < 2 * np.abs(self._f).max() + 1): # todo: fractional periods logger.warning("Decoding might be disturbed.") Imux = Imux.reshape((self.D * self.K, -1)) # returns a view Imux -= self.A Imux = np.sum(Imux, axis=0) Imux += self.A Imux = Imux.reshape((-1, self.Y, self.X, 1)) # returns a view if self.dtype.kind in "uib": Imux = Imux.astype(self.dtype, copy=False) # returns a view logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return Imux def _demultiplex(self, I: np.ndarray) -> np.ndarray: """Demultiplex fringe patterns. Parameters ---------- I : np.ndarray Multiplexed fringe patterns. Returns ------- I : np.ndarray Demultiplexed fringe patterns. """ t0 = time.perf_counter() T, Y, X, C = I.shape # extract Y, X, C from data as these parameters depend on used camera if self.SDM: assert not self.FDM assert self.grid in {"image", "Cartesian"} # self._choices["grid"][:2] assert self.D == 2 fx = np.fft.fftshift(np.fft.rfftfreq(X)) fy = np.fft.fftshift(np.fft.fftfreq(Y)) fxx, fyy = np.meshgrid(fx, fy) mx = np.abs(fxx) >= np.abs(fyy) my = np.abs(fxx) <= np.abs(fyy) J = I I = np.empty((2 * T, Y, X, C)) for t in range(T): for c in range(C): I_FFT = np.fft.fftshift(np.fft.rfft2(J[t, :, :, c])) I[t, :, :, c] = np.fft.irfft2(np.fft.ifftshift(I_FFT * mx), s=(Y, X)) I[T + t, :, :, c] = np.fft.irfft2(np.fft.ifftshift(I_FFT * my), s=(Y, X)) if self.WDM: assert not self.FDM assert C == 3 I = I.reshape((-1, 1, Y, X, C)) # returns a view I = I.swapaxes(-1, 1) # returns a view I = I.reshape((-1, Y, X, 1)) # returns a view if self.FDM: assert not self.WDM assert not self.SDM # todo: allow self.SDM? assert len(np.unique(self.N)) == 1 I = np.tile(I, (self.D * self.K, 1, 1, 1)) logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return I def _colorize(self, I: np.ndarray) -> np.ndarray: """Colorize fringe patterns. Parameters ---------- I : np.ndarray Base fringe patterns, possibly multiplexed. Returns ------- Icol : np.ndarray Colorized fringe patterns. """ t0 = time.perf_counter() if hasattr(self, "_t"): hues = np.array([self._t // np.sum(self._N)]) else: hues = np.arange(self.H) T = len(hues) * len(I) Icol = np.empty((T, self.Y, self.X, self.C), self.dtype) t = 0 for h in hues: for frame in I: for c in range(self.C): cb = c if self.WDM else 0 if self.h[h, c] == 0: # uibf -> uibf Icol[t, :, :, c] = 0 elif self.h[h, c] == 255 and Icol.dtype == self.dtype: # uibf -> uibf Icol[t, :, :, c] = frame[:, :, cb] elif self.dtype.kind in "uib": # f -> uib Icol[t, :, :, c] = np.rint(frame[:, :, cb] * (self.h[h, c] / 255)) else: # f -> f Icol[t, :, :, c] = frame[:, :, cb] * (self.h[h, c] / 255) t += 1 logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return Icol def _decolorize(self, I: np.ndarray) -> np.ndarray: """Decolorize fringe patterns by weighted averaging of hues. Parameters ---------- I : np.ndarray Colorized fringe patterns. Must be in `vshape` (frames `T`, height `Y`, width `X`, color channels `C`). Returns ------- I : np.ndarray Decolorized fringe patterns. """ t0 = time.perf_counter() T, Y, X, C = I.shape # extract Y, X, C from data as these parameters depend on used camera I = I.reshape((self.H, T // self.H, Y, X, C)) # returns a view solo = np.all(np.count_nonzero(self.h, axis=0) == 1) # each RGB component exists exactly once base = np.all(np.count_nonzero(self.h, axis=1) == 1) # each hue consists of only one RGB base color same = len(set(self.h[self.h != 0])) == 1 # all color channels have the same value # if solo and base: no averaging necessary # if same: all weights are the same if self.H == 3 and C in [1, 3] and solo and base and same: I = np.moveaxis(I, 0, -2) # returns a view # basic slicing returns a view idx = np.argmax(self.h, axis=1) if np.array_equal(idx, [0, 2, 1]): # RBG I = I[:, :, :, 0::-1, :] elif np.array_equal(idx, [1, 2, 0]): # GBR I = I[:, :, :, 1:1:, :] elif np.array_equal(idx, [1, 0, 2]): # GRB I = I[:, :, :, 1:1:-1, :] elif np.array_equal(idx, [2, 1, 0]): # BGR I = I[:, :, :, 2::-1, :] elif np.array_equal(idx, [2, 0, 1]): # BRG I = I[:, :, :, 2:2:-1, :] # elif np.array_equal(idx, [0, 1, 2]): # RGB # I = I[:, :, :, :, :] if C == 1: I = I[:, :, :, :, 0] # returns a view elif C == 3: I = np.diagonal(I, axis1=-2, axis2=-1) # returns a view elif self.H == 2 and C in [1, 3] and base and same: I = np.moveaxis(I, 0, -2) # returns a view # advanced indexing returns a copy, not a view if C == 1: idx = np.argmax(self.h, axis=0) I = I[:, :, :, idx, :] I = I[:, :, :, 0] # returns a view elif C == 3: idx = self.h != 0 I = I[:, :, :, idx] else: # fuse colors by weighted averaging, # this assumes constant camera exposure settings for each set of hues w = self.h / np.sum(self.h, axis=0) # normalized weights w[np.isnan(w)] = 0 if C == 1 and same: w = w[:, 0][:, None] # ensures that the result has only one color channel # if np.all((w == 0) | (w == 1)): # doesn't happen due to testing for 'solo' and 'base' # w = w.astype(bool, copy=False) # multiplying with bool preserves dtype # dtype = I.dtype # without this, np.sum chooses a dtype which can hold the theoretical maximal sum # else: dtype = float # without this, np.sum chooses a dtype which can hold the theoretical maximal sum, i.e. uint32 or uint64 with problems in pyqtgraph's I = np.sum(I * w[:, None, None, None, :], axis=0, dtype=dtype) logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return I
[docs] def encode(self) -> np.ndarray: """Encode fringe patterns. Returns ------- I : np.ndarray Fringe pattern sequence. Notes ----- To receive the frames iteratively (i.e. in a lazy manner), simply iterate over the Fringes instance. Examples -------- >>> from fringes import Fringes >>> f = Fringes() Encode the complete fringe pattern sequence. >>> I = f.encode() Call the instance to encode the complete fringe pattern sequence. >>> I = f() Create a generator to receive the frames iteratively, i.e. in a lazy manner. >>> I = (frame for frame in f) """ t0 = time.perf_counter() I = self._modulate(self._x) if self.SDM or self.WDM or self.FDM or self.uwr == "FTM": I = self._multiplex(I) # reduces number of frames if self.grid in ["polar", "log-polar"]: I *= grid.inner_circ(self.Y, self.X)[None, :, :, None] if self.H > 1 or np.any(self.h != 255): # can be used for extended averaging I = self._colorize(I) logger.info(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return I
[docs] def decode( self, I: np.ndarray, bmin: float = 0.0, Vmin: float = 0.0, unwrap: bool = True, verbose: bool | str = False, check_overexposure: bool = False, threads: int | None = None, spu_func: str = "ski", ) -> _Decoded | _Decoded_verbose: r"""Decode fringe patterns. Parameters ---------- I : np.ndarray Fringe pattern sequence. It is reshaped to `vshape` (frames `T`, height `Y`, width `X`, color channels `C`) before processing. .. note:: `I` must have been encoded with the same parameters set to the Fringes instance as the current one. bmin : float, default=0 Minimum modulation for measurement to be valid. Can accelerate decoding. Vmin : float, default=0 Minimum visibility for measurement to be valid. Can accelerate decoding. unwrap : bool, default=True Flag for unwrapping. verbose : bool or str, default=False Flag for returning intermediate and verbose results. check_overexposure: bool, default=False Flag for checking if 'I' is overexposed. Logs a warning message if so. threads : int, optional Number of threads to use. Default is all threads. If negative, this many threads will not be used. spu_func : {'ski', 'cv2'}, default='ski' Spatial unwrapping function to use. - 'ski': `Scikit-image <https://scikit-image.org/docs/stable/auto_examples/filters/plot_phase_unwrap.html>`_ [1]_ - 'cv2': `OpenCV <https://docs.opencv.org/4.7.0/df/d3a/group__phase__unwrapping.html>`_ [2]_ Returns ------- a : np.ndarray Brightness: average signal. b : np.ndarray Modulation: amplitude of the cosine signal. x : np.ndarray Registration: decoded screen coordinates. p : np.ndarray, optional Local phase. k : np.ndarray, optional Fringe order. r : np.ndarray, optional Residuals from the optimization-based unwrapping process. u : np.ndarray, optional Uncertainty of positional decoding in pixel units Raises ------ ValueError If the number of frames of `I` and the attribute `T` of the `Fringes` instance don't match. If `WDM` is active but `I` does not have 3 color channels. References ---------- .. [1] `Herráez et al., "Fast two-dimensional phase-unwrapping algorithm based on sorting by reliability following a noncontinuous path", Applied Optics, 2002. <https://doi.org/10.1364/AO.41.007437>`_ .. [2] `Lei et al., “A novel algorithm based on histogram processing of reliability for two-dimensional phase unwrapping”, Optik - International Journal for Light and Electron Optics, 2015. <https://doi.org/10.1016/j.ijleo.2015.04.070>`_ Examples -------- >>> from fringes import Fringes >>> f = Fringes() >>> I = f.encode() >>> Irec = I # todo: replace this line with the recorded data (cf. example in 'record.py') >>> a, b, x = f.decode(Irec) Also return intermediate and verbose results: >>> a, b, x, p, k, r, u = f.decode(Irec, verbose=True) """ t0 = time.perf_counter() # parse dtype 'object' if I.dtype == "O": I = np.array([frame for frame in I]) # creates a copy # todo: without a copy # dtype = I[0].dtype # dtype = float # I = I.astype(dtype) # I = I.astype(dtype, copy=False) # get and apply video-shape I = vshape(I) T, Y, X, C = I.shape # extract Y, X, C from data as these parameters depend on used camera # check for overexposure if check_overexposure: if I.dtype.kind in "ui": Imax = I.max() lessbits = True # todo: lessbits if lessbits and np.iinfo(I.dtype).bits > 8: # data may contain fewer bits of information bits = int(np.ceil(np.log2(Imax + 1))) # same or next power of two bits += -bits % 2 # same or next power of two which is divisible by two maxval = 2**bits - 1 if maxval < Imax: # if maxval = Imax else: maxval = np.iinfo(I.dtype).max if np.count_nonzero(I == maxval) > 0.1 * I.size: logger.warning("'I' is probably overexposed and decoding might yield unreliable results.") # check number of frames if len(I) != self.T: raise ValueError("Number of frames of data and parameters don't match.") # check color channels if self.WDM and C != 3: raise ValueError( f"'I' must have 3 color channels because 'WDM' is active, but has only {C} color channels." ) # decolorize (fuse hues/colors); for gray fringes, color fusion is not performed, but extended averaging is if self.H > 1 or not self._monochrome: I = self._decolorize(I) # demultiplex if self.SDM and 1 not in self.N or self.WDM or self.FDM: I = self._demultiplex(I) # demodulate bmin = float(max(0, bmin)) Vmin = float(min(max(0, Vmin), 1)) a, b, p, k, x, r = self._demodulate(I, bmin, Vmin, unwrap, verbose, threads, spu_func=spu_func) if verbose: u = self.uncertainty(b=b) # # blacken where color value of hue was black # if self.H > 1 and C == 3: # idx = np.sum(self.h, axis=0) == 0 # if np.any(idx): # blacken where color value of hue was black # a[:, :, :, idx] = 0 # b[:, :, :, idx] = 0 # x[:, :, :, idx] = np.nan # if verbose: # r[:, :, :, idx] = np.nan # u[:, :, :, idx] = np.nan # self.L / np.sqrt(12) # todo: circular distribution # p[:, :, :, idx] = np.nan # # coordinate retransformation # todo: test # if self.grid == "Cartesian": # for d, ax in enumerate(self.axes): # if ax == 0: # x[d] -= self.L[d] / 2 - .5 # elif ax == 1: # x[d] = np.where(x[d] < self.L[d] / 2, x[d] + self.L[d] / 2 - .5, x[d] - self.L[d] / 2 - .5) # if self.D == 2: # # todo: swapaxes # if self.grid == "Cartesian": # if self.X >= self.Y: # x[0] += self.X / 2 - 0.5 # x[0] %= self.X # x[1] *= -1 # x[1] += self.Y / 2 - 0.5 # x[1] %= self.X # else: # x[0] += self.X / 2 - 0.5 # x[0] %= self.Y # x[1] *= -1 # x[1] += self.Y / 2 - 0.5 # x[1] %= self.Y # # todo: polar, logpolar, spiral # # if self.angle != 0: # # t = np.deg2rad(-self.angle) # # # # if self.angle % 90 == 0: # # c = np.cos(t) # # s = np.sin(t) # # L = np.array([[c, -s], [s, c]]) # # # L = np.matrix([[c, -s], [s, c]]) # # ur = L[0, 0] * x[0] + L[0, 1] * x[1] # # vr = L[1, 0] * x[0] + L[1, 1] * x[2] # # # u = np.dot(uu, L) # todo: matrix multiplication # # # v = np.dot(vv, L) # # else: # # tan = np.tan(t) # # ur = x[0] - x[1] * np.tan(t) # # vr = x[0] + x[1] / np.tan(t) # # # # vv = (x[1] - x[0]) / (1 / tan - tan) # # uu = x[0] + vv * tan # # x = np.stack((uu, vv), axis=0) # # x = np.stack((ur, vr), axis=0) logger.info(f"{(time.perf_counter() - t0) * 1000:.0f}ms") # return namedtuple if verbose: return _Decoded_verbose(a, b, x, p, k, r, u) else: return _Decoded(a, b, x)
[docs] def uncertainty( self, ui: float | np.ndarray = np.sqrt(1 / 12), b: float | np.ndarray | None = None, a: float | np.ndarray | None = None, K: float | None = None, dark_noise: float | None = None, ) -> np.ndarray: # todo: :attr:`~fringes.fringes.Fringes.u` denotes the minimum possible uncertainty of the measurement in pixels. # It is based on the phase noise model from [Sur97]_ # and propagated through the unwrapping process and the phase fusion. # It is influenced by the parameters # # - :attr:`~fringes.fringes.Fringes.M`: number of averaged intensity samples, # - :attr:`~fringes.fringes.Fringes.N`: number of phase shifts, # - :attr:`~fringes.fringes.Fringes.l`: wavelengths, # - `\hat{B}`: measured modulation and # - `\hat{u_I}`: intensity noise (caused by the measurement hardware [EMV]_, [Bot08]_). # # .. - :attr:`~fringes.fringes.Fringes.quant`: quantization noise of the light source or camera, # - :attr:`~fringes.fringes.Fringes.dark`: dark noise of the used camera, # - :attr:`~fringes.fringes.Fringes.shot`: photon noise of light itself, # - :attr:`~fringes.fringes.Fringes.gain`: system gain of the used camera. """Uncertainty of positional decoding, in pixel units. Using inverse variance weighting in unwrapping and assuming no fringe order errors. Parameters ---------- ui: float | np.ndarray, default=np.sqrt(1 / 12) Standard deviation of intensity noise. The default assumes only quantization noise. If np.ndarray, it must have image shape (Y, X, C). b : float | np.ndarray, optional Modulation. If np.ndarray, it must have video shape (T, Y, X, C) with T = D * K. a : float | np.ndarray, optional Offset (mean number of gray values). If np.ndarray, it must have video shape (T, Y, X, C) with T = D. K : float, optional System gain of used camera, in units DN/electron. dark_noise : float, optional Dark noise of used camera, in units electrons. Returns ------- u : np.ndarray Uncertainty of positional decoding, in pixel units. Note ---- If `a`, `K` and `dark_noise` are given, `ui` is ignored and calculated from them. In any case, if either `ui`, `b` or `a` is a numpy.ndarray, they must be shape consistent and hence their shape must end in the same (Y, X, C) values (the function takes care of those who are in front). Examples -------- >>> from fringes import Fringes >>> f = Fringes() Assuming only quantization noise and using `f.B` as `b` (both by default): >>> u = f.uncertainty() Setting `ui` and `b` with floats: >>> u = f.uncertainty(ui=2.28, b=100) Using decoded values (numpy.ndarrays) and camera parameters: >>> I = f.encode() >>> Irec = I # todo: replace this line with recording data (cf. example in 'record.py') >>> a, b, x = f.decode(Irec) >>> u = f.uncertainty(b=b, a=a, K=0.038, dark_noise=13.7) """ t0 = time.perf_counter() if b is None: b = self.B elif isinstance(b, np.ndarray): # make b shape consistent Tb, Yb, Xb, Cb = vshape(b).shape assert Tb == self.D * self.K b = b.reshape(self.D, self.K, Yb, Xb, Cb) # check if b and (a or ui) are shape consistent if isinstance(a, np.ndarray): assert vshape(a).shape == (self.D, Yb, Xb, Cb) elif isinstance(ui, np.ndarray): assert vshape(ui).shape == (1, Yb, Xb, Cb) if a is not None and K is not None and dark_noise is not None: if isinstance(a, np.ndarray): # make a shape consistent Ta, Ya, Xa, Ca = vshape(a).shape assert Ta == self.D a = a.reshape(Ta, 1, Ya, Xa, Ca) # compute ui d = dark_noise * K q = np.sqrt(1 / 12) p = np.sqrt(a / K) * K ui = np.sqrt(d**2 + q**2 + p**2) elif isinstance(ui, np.ndarray): # make ui shape consistent Tui, Yui, Xui, Cui = vshape(ui).shape assert Tui == 1 ui = ui.reshape(1, 1, Yui, Xui, Cui) if isinstance(ui, np.ndarray) or isinstance(b, np.ndarray) or isinstance(a, np.ndarray): # make N and l shape consistent N = self._N[:, :, None, None, None] l = self._l[:, :, None, None, None] else: N = self._N l = self._l up = np.sqrt(2) / np.sqrt(self.M) / np.sqrt(N) * ui / b # phase uncertainties # todo: M ux = up / _2PI * l # position uncertainties u = np.sqrt(1 / np.sum(1 / ux**2, axis=1)) # position uncertainty logger.debug(f"{(time.perf_counter() - t0) * 1000:.0f}ms") return u.astype(np.float32, copy=False)
def _trim(self, a: np.ndarray) -> np.ndarray: """Change ndim of `a` to 2, i.e. its shape to ('D', 'K').""" if a.ndim == 0: b = np.full((self.D, self.K), a) elif a.ndim == 1: b = np.array([a[: self._Kmax] for d in range(self.D)]) elif a.ndim == 2: b = a[: self._Dmax, : self._Kmax] else: raise ValueError("Only 0 up to 2 dimensions are allowed.") return b def _set_mtf(self, b: np.ndarray): """Set the modulation transfer function (MTF). Parameters ---------- b : np.ndarray Measured modulation. Raises ------ ValueError If 'b' contains negative values. If length of `b` is not equal to `D` * `K`. Notes ----- The normalized modulation transfer function is a cubic spline interpolator. It is set to the private attribute '_mtf' and used when the method 'mtf()' is called. """ if b is None: self._mtf = None logger.debug("Reset modulation transfer function.") return if np.any(b < 0): raise ValueError("Negative values for 'b' are not allowed.") b = vshape(b) T, Y, X, C = b.shape # extract Y, X, C from data as these parameters depend on used camera if T != self.D * self.K: raise ValueError(f"Number of frames must be {self.D * self.K}.") b = b.reshape(self.D, self.K, Y, X, C) # filter if C > 1: b = np.nanmedian(b, axis=-1) # filter along color axis b = np.nanmedian(b, axis=(2, 3)) # filter along spatial axes # b = np.nanquantile(b, 0.9, axis=(2, 3)) # filter along spatial axes b[np.isnan(b)] = 0 i, v = np.unique(self._v, return_index=True, axis=1) b = b[i] self._mtf = [sp.interpolate.CubicSpline(v[d], b[d], extrapolate=True) for d in range(self.D)] logger.debug("Set modulation transfer function.")
[docs] def mtf(self, v: Sequence[float] | None = None, PSF: float = 0.0) -> np.ndarray: """Modulation Transfer Function. Normalized to values ∈ [0, 1]. Parameters ---------- v: array_like Spatial frequencies at which to determine the normalized modulation. PSF: float, default=0 Standard deviation of the point spread function. Returns ------- b : np.ndarray Relative modulation, at spatial frequencies 'v'. 'b' is in the same shape as 'v'. Raises ------ ValueError If 'v' contains negative numbers. Notes ----- - If the private attribute `_mtf` of the Fringes instance is not None, the MTF is interpolated from previous measurements.\n - Else, if `PSF` is larger than zero, the MTF is computed from the optical transfer function of the optical system, i.e. as the magnitude of the Fourier-transformed 'Point Spread Function' (PSF).\n - Else, an approximation is used [1]_. References ---------- .. [1] `Bothe, "Grundlegende Untersuchungen zur Formerfassung mit einem neuartigen Prinzip der Streifenprojektion und Realisierung in einer kompakten 3D-Kamera", Dissertation, ISBN 978-3-933762-24-5, BIAS Bremen, 2008. <https://www.amazon.de/Grundlegende-Untersuchungen-Formerfassung-Streifenprojektion-Strahltechnik/dp/3933762243/ref=sr_1_2?qid=1691575452&refinements=p_27%3AThorsten+B%C3%B6th&s=books&sr=1-2>`_ """ # todo: test mtf() if v is None: v = self.v v = np.array(v, float, copy=False, ndmin=2) if np.any(v < 0): raise ValueError("Negative numbers are not allowed.") D, K = v.shape if self._mtf is not None: # interpolate from (previous) measurement b = np.array([self._mtf[d](v[d]) for d in range(D)]).reshape(D, K) bmax = np.array([self._mtf[d](0) for d in range(D)]) b /= bmax np.clip(b, 0, 1, out=b) # todo: self._mtf = None when D changes or (D == 1 and axis changes) elif PSF > 0: # determine MTF from PSF b = self.B * np.exp(-2 * (np.pi * PSF / self.Lmax * v) ** 2) # todo: fix # todo: what is smaller: dl or lv? else: b = 1 - v / self.vmax # approximation of [Bothe2008] return b
@property def _Td(self) -> np.ndarray: """Number of frames per direction.""" if self.uwr == "FTM": return np.array([0.5, 0.5]) # todo: D = 1 ? Td = self.H * np.sum(self._N, axis=1, dtype=float) if self.FDM: # todo: fractional periods Td /= self.D * self.K if self.SDM: Td /= self.D if self.WDM: if np.all(self.N == 3): # WDM along shifts Td /= 3 # elif self.K > self.D: # WDM along sets todo # a = np.sum(self._N, axis=1) # b = np.max(a) # c = np.ceil(b / 3) # d = int(c) # # a2 = np.sum(self._N, axis=0) # b2 = np.max(a2) # c2 = np.ceil(b2 / 2) # d2 = int(c2) # # if d < d2: # T = int(np.ceil(np.max(np.sum(self._N, axis=1)) / 3)) # else: # use red and blue # T = int(np.ceil(np.max(np.sum(self._N, axis=0)) / 2)) # else: # WDM along directions (use red and blue) todo # a = np.sum(self._N, axis=0) # b = np.max(a) # c = np.ceil(b / 2) # d = int(c) # T = int(np.ceil(np.max(np.sum(self._N, axis=1)) / 2)) return Td @property def T(self) -> int: """Number of frames.""" return int(np.sum(self._Td)) # use int() to ensure type is "int" instead of "numpy.core.multiarray.scalar" @property def Y(self) -> int: """Height of fringe patterns (in pixel units).""" return self._Y @Y.setter def Y(self, Y: int): _Y = int(max(1, Y)) if self._Y != _Y: self._Y = _Y logger.debug(f"{self._Y=}") if self._Y >= 2**20: logger.warning(f"Height 'Y' exceeds 'OPENCV_IO_MAX_IMAGE_HEIGHT': {self._Y} > 2**20.") if self._Y * self._X >= 2**30: logger.warning( f"Frame size 'Y' * 'X' exceeds 'OPENCV_IO_MAX_IMAGE_PIXELS': {self._X * self._Y} > 2**30." ) self._UMR = None # empty cache self.UMR # compute and cache @property def X(self: int) -> int: """Width of fringe patterns (in pixel units).""" return self._X @X.setter def X(self, X: int): _X = int(max(1, X)) if self._X != _X: self._X = _X logger.debug(f"{self._X=}") if self._Y >= 2**20: logger.warning(f"Width 'X' exceeds 'OPENCV_IO_MAX_IMAGE_WIDTH': {self._X} > 2**20.") if self._Y * self._X >= 2**30: logger.warning( f"Frame size 'Y' * 'X' exceeds 'OPENCV_IO_MAX_IMAGE_PIXELS': {self._X * self._Y} > 2**30." ) self._UMR = None # empty cache self.UMR # compute and cache @property def C(self) -> int: """Number of color channels.""" return 3 if self.WDM or not self._monochrome else 1 @property def shape(self) -> tuple[int, int, int, int]: """Shape of fringe pattern sequence in video shape (frames, height, with, color channels).""" return self.T, self.Y, self.X, self.C @property def L(self) -> np.ndarray: """Coding lengths, i.e. lengths of fringe patterns for each direction (in pixel units).""" return np.array([self.Y, self.X])[self.axes] @property def Lmax(self) -> int: """Maximum coding length (in pixel units).""" return self.L.max() @property def a(self) -> float: """Factor for extending the coding length.""" # a = 1 + 2 * x0 / self.Lmax return self._a @a.setter def a(self, a: float): _a = float(min(max(1, a), self._amax)) if self._a != _a: self._a = _a logger.debug(f"{self._a=}") logger.debug(f"{self.x0=}") logger.debug(f"self._l = {str(self._l.round(3)).replace(chr(10), ',')}") self._UMR = None # empty cache self.UMR # compute and cache @property def Lext(self) -> float: """Extended coding length (in pixel units).""" return self.a * self.Lmax @property def grid(self) -> str: """Coordinate system of the fringe patterns.""" # The following values can be set:\n # - 'image': The top left corner pixel of the grid is the origin and positive directions are downwards and to the right.\n # - 'Cartesian': The center of grid is the origin and positive directions are right- and upwards.\n # - 'polar': The center of grid is the origin and positive directions are clockwise resp. outwards.\n # - 'log-polar': The center of grid is the origin and positive directions are clockwise resp. outwards. return self._grid @grid.setter def grid(self, grid: Literal["image"]): # todo: , "Cartesian", "polar", "log-polar"]): _grid = str(grid) if (self.SDM or self.uwr == "FTM") and self.grid not in {"image", "Cartesian"}: # self._choices["grid"][:2]: logger.error(f"Couldn't set 'grid': 'grid' not in {{{'image', 'Cartesian'}}}.") # self._choices["grid"][:2] return if self._grid != _grid and _grid in self._choices["grid"]: self._grid = _grid logger.debug(f"{self._grid=}") self.SDM = self.SDM # @property # def angle(self) -> float: # """Angle of the coordinate system's principal axis.""" # return self._angle # # @angle.setter # def angle(self, angle: float): # _angle = float(np.remainder(angle, 360)) # todo: +- 45 # # if self._angle != _angle: # self._angle = _angle # logger.debug(f"{self._angle=}") @property def axes(self) -> np.ndarray: """Axes along which to encode.""" # using len(self._axes) instead of self.D, else recursion when MRD in mode return np.array([0, 1])[: len(self._axes)] if "MRD" in self.mode else self._axes @axes.setter def axes(self, axes: int | Sequence[int]): _axes = np.atleast_1d(np.array(axes, int))[: self._Dmax] if not set(_axes) <= {0, 1}: # if not subset logger.error("Couldn't set 'axes': Must consist of only zeros or ones.") return if len(_axes) != len(set(_axes)): logger.error("Each element must only exist once.") return if not np.array_equal(self._axes, _axes): self._axes = _axes logger.debug(f"{self._axes=}") Dold = self._N.shape[0] if Dold > self.D: self.N = self._N[0, :] # triggers UMR self.v = self._v[0, :] # triggers UMR self.f = self._f[0, :] if Dold == self.K == 1: self.FDM = False self.SDM = False # adjusts V elif Dold < self.D: self.N = np.append(self._N, self._N, axis=0) # triggers UMR self.v = np.append(self._v, self._v, axis=0) # triggers UMR self.f = np.append(self._f, self._f, axis=0) self.V = self.V # checks clipping self._UMR = None # empty cache self.UMR # compute and cache @property def D(self) -> int: """Number of directions.""" return len(self.axes) @D.setter # @deprecated("No longer needed since 'axes' was introduced in version 2.0.1, but kept for backwards compatibility.") def D(self, D: int): _D = int(min(max(1, D), self._Dmax)) if self.D > _D: self.axes = self.axes[0] elif self.D < _D: self.axes = np.append(self.axes, 1 - self.axes, axis=0) @property def _x(self) -> tuple[np.ndarray] | np.ndarray: """Coordinate matrices of the coordinate system defined in `grid`. If possible, a sparse representation of the grid is returned. """ if self.grid == "image": # todo: and self.angle == 0: x = np.indices((self.Y, self.X), sparse=True) # ij-indexing else: if self.grid == "image": sys = "img" elif self.grid == "Cartesian": sys = "cart" elif self.grid == "polar": sys = "polar" else: sys = "logpol" # x = np.array(getattr(grid, sys)(self.Y, self.X, self.angle)) x = np.array(getattr(grid, sys)(self.Y, self.X, 0)) # xy-indexing # todo: spiral angle 45 if self.grid in ["polar", "log-polar"]: x *= self.Lext return x @property def x(self) -> np.ndarray: """Coordinate matrices of the coordinate system defined in `grid`. This is always a fleshed out representation. 'axes' defines the order in which the coordinate matrices appear. An additional axis is added for the color channel, which is useful when running tests so that 'x' is shape consistent/broadcastable to 'dec.x' (the latter has a color channel). """ _x = np.array(np.broadcast_arrays(*self._x)) if isinstance(self._x, tuple) else self._x return _x[self.axes, :, :, None].astype(float, copy=False) @property def x0(self) -> float: """Coordinate offset.""" # return self.Lmax * (self.a - 1) / 2 return (self.Lext - self.Lmax) / 2 @property def bits(self) -> int: """Number of bits.""" return self._bits @bits.setter def bits(self, bits: float): self._bits = min(max(0, bits), 64) # max uint64 logger.debug(f"{self.bits = }") logger.debug(f"{self.dtype = }") logger.debug(f"{self.Imax = }") @property def dtype(self) -> np.dtype: """Data type which can hold 2**`bits` of information.""" if self.bits == 0: dtype = np.dtype(float) else: bits = int(np.ceil(np.log2(2**self.bits - 1))) # next power of two bits += -bits % 8 # next power of two divisible by 8 dtype = np.dtype(f"uint{bits}") return dtype @dtype.setter def dtype( self, dtype: np.dtype | Literal[ "uint8", "uint16", "uint32", "float32", "float64", ], ): _dtype = np.dtype(dtype) if self._dtype != _dtype and str(_dtype) in self._choices["dtype"]: self._dtype = _dtype logger.debug(f"{self._dtype=}") self._bits = np.iinfo(_dtype).bits if _dtype.kind in "ui" else 0 logger.debug(f"{self._bits=}") logger.debug(f"{self.Imax=}") logger.debug(f"self.A = {self.A}") logger.debug(f"self.B = {self.B}") @property def Imax(self) -> int: """Maximum gray value.""" # return np.iinfo(self.dtype).max if self.dtype.kind in "ui" else 1 return 2**self.bits - 1 if self.dtype.kind in "ui" else 1 @property def _Amin(self): """Minimum offset.""" return self.B / self._Vmax @property def _Amax(self): """Maximum offset.""" return self.Imax - self._Amin @property def A(self) -> float: """Offset.""" return self.Imax * self.E @A.setter def A(self, A: float): _A = float(min(max(self._Amin, A), self._Amax)) if self.A != _A: self.E = _A / self.Imax @property def _Bmax(self): """Maximum amplitude.""" return min(self.A, self.Imax - self.A) * self._Vmax @property def B(self) -> float: """Amplitude.""" return self.A * self.V @B.setter def B(self, B: float): _B = float(min(max(0, B), self._Bmax)) if self.B != _B: # and _B != 0: self.V = _B / self.A @property def _Vmax(self): """Maximum visibility.""" if self.FDM: return 1 / (self.D * self.K) elif self.SDM: return 1 / self.D else: return 1 @property def V(self) -> float: """Fringe visibility (fringe contrast) ∈ [0, 1]. Screens are extremely nonlinear near the minimum and maximum values; we can avoid them by setting V to e.g. 0.95. """ return self._V @V.setter def V(self, V: float): _V = float(min(max(0, V), self._Vmax)) if self._V != _V: self._V = _V logger.debug(f"{self.V=}") logger.debug(f"{self.B=}") @property def _Emax(self): """Maximum relative offset (exposure).""" return 1 / (1 + self.V) @property def E(self) -> float: """Relative exposure, i.e. relative mean intensity ∈ [0, 1].""" return self._E @E.setter def E(self, E) -> float: _E = float(min(max(0, E), self._Emax)) if self._E != _E: self._E = _E logger.debug(f"{self.E=}") logger.debug(f"{self.A=}") @property def g(self) -> float: """Gamma correction factor used to compensate nonlinearities of the display response curve.""" return self._g @g.setter def g(self, g: float): _g = float(min(max(0, g), self._gmax)) if self._g != _g and _g != 0: self._g = _g logger.debug(f"{self._g=}") @property def _Kmax(self) -> int: """Maximum number of sets.""" return ( int(self.Imax / 2) if (self.SDM or self.FDM) and self.dtype != float else sys.maxsize ) # np.inf instead of sys.maxsize but then it's float @property def K(self) -> int: """Number of sets (number of fringe patterns with different spatial frequencies).""" return self._K @K.setter def K(self, K: int): _K = int(min(max(1, K), self._Kmax)) isVmax = self.V == self._Vmax if self._K > _K: # remove elements self._K = _K logger.debug(f"{self._K=}") self.N = self._N[:, : self.K] self.v = self._v[:, : self.K] self.f = self._f[:, : self.K] if self.D == self.K == 1: self.FDM = False if isVmax: self.V = self._Vmax # maximizes B elif self._K < _K: # add elements self._K = _K logger.debug(f"{self._K=}") self.N = np.append( self._N, np.tile(self._N[0, 0], (self.D, _K - self._N.shape[1])), axis=1 ) # don't append N from _defaults, this might be in conflict with WDM! v = self.Lext ** (1 / np.arange(self._v.shape[1] + 1, _K + 1)) self.v = np.append(self._v, np.tile(v, (self.D, 1)), axis=1) self.f = np.append( self._f, np.tile(np.array(self._defaults["f"]).ravel()[0], (self.D, _K - self._f.shape[1])), axis=1, ) self.V = self.V # checks clipping @property def _Nmin(self) -> int: """Minimum number of shifts to (uniformly) sample temporal frequencies. Per direction at least one set with N ≥ 3 is necessary to solve for the three unknowns brightness 'a', modulation 'b' and coordinate x. """ if self.FDM: Nmin = int(np.ceil(2 * self.f.max() + 1)) # sampling theorem # todo: 2 * D * K + 1 -> fractional periods if static else: Nmin = 3 # todo: 1 -> use old decoder return Nmin @property def N(self) -> np.ndarray: """Number of (equally distributed) phase shifts.""" if self.D == 1 or len(np.unique(self._N, axis=0)) == 1: # sets along directions are identical N = self._N[0] # 1D else: N = self._N # 2D return N @N.setter def N(self, N: int | Sequence[int]): if self.K == 1 and np.all(np.array(N) == 1): # FTM _N = np.array([[1], [1]], int) self.SDM = True else: _N = np.maximum(np.array(N, int), self._Nmin) # make array, cast to dtype, clip if np.all(self.N == 1): self.SDM = False # go back from FTM if not _N.size: # empty array return _N = self._trim(_N) if np.all(_N == 1) and _N.shape[1] == 1: # any pass # FTM elif np.any(_N <= 2): for d in range(self.D): if not any(_N[d] >= 3): i = np.argmax(_N[d]) # np.argmin(_N[d]) _N[d, i] = 3 if self.WDM and not np.all(_N == 3): # WDM locks N logger.error("Couldn't set 'N': At least one Shift != 3.") return if self.FDM and not np.all(_N == _N[0, 0]): _N = np.tile(_N[0, 0], _N.shape) # all N must be equal if not np.array_equal(self._N, _N): self._N = _N logger.debug(f"self._N = {str(self._N).replace(chr(10), ',')}") self.D, self.K = self._N.shape logger.debug(f"{self.T=}") self._UMR = None # FDM: N -> fmax -> f -> v # empty cache self.UMR # compute and cache @property def lmin(self) -> float: """Minimum resolvable wavelength (in pixel units).""" # don't use self._fmax, else circular loop if self.FDM and self.static: fmax = min((self._Nmin - 1) / 2, self.Lext / self._lmin) return min(self._lmin, self.Lext / fmax) else: # fmax = (self._Nmin - 1) / 2 return self._lmin @lmin.setter def lmin(self, lmin: float): _lmin = float(max(self._lminmin, lmin)) if self._lmin != _lmin: self._lmin = _lmin logger.debug(f"{self._lmin=}") logger.debug(f"{self.vmax=}") # computed upon call self.l = self.l # l triggers v if self._mtf is not None: self._mtf = None @property def l(self) -> np.ndarray: """Wavelengths of fringe periods (in pixel units). When Lext changes, 'v' is kept constant and only 'l' is changed. """ return self.Lext / self.v @l.setter def l(self, l: float | Sequence[float] | str): # | Literal["linear", "exponential", "optimal"]): if isinstance(l, str): if "," in l: l = np.fromstring(l, sep=",") # elif l == "optimal": # todo: optimal l # lmin = int(np.ceil(self.lmin)) # lmax = int( # np.ceil( # max( # self.Lext / lmin, # todo: only if b differ slightly # self.lmin, # min(self.Lext / self.vopt, self.Lext), # np.sqrt(self.Lext), # ) # ) # ) # # if lmin == lmax and lmax < self.Lext: # lmax += 1 # # if lmax < self.Lext and not sympy.isprime(lmax): # lmax = sympy.ntheory.generate.nextprime(lmax, 1) # ensures lcm(a, lmax) >= Lext for all a >= lmin # # n = lmax - lmin + 1 # # l_ = np.array([lmin]) # l_max = lmin + 1 # lcm = l_ # while lcm < self.Lext: # lcm_new = np.lcm(lcm, l_max) # if lcm_new > lcm: # l_ = np.append(l_, l_max) # lcm = lcm_new # l_max += 1 # K = min(len(l_), self.K) # # C = sp.special.comb(n, K, exact=True, repetition=True) # number of unique combinations # combos = it.combinations_with_replacement(range(lmin, lmax + 1), K) # # # B = int(np.ceil(np.log2(lmax - 1) / 8)) * 8 # number of bytes required to store integers up to lmax # # combos = np.fromiter(combos, np.dtype((f"uint{B}", K)), C) # # kroot = self.Lext ** (1 / K) # if self.lmin <= kroot: # lcombos = np.array( # [l for l in combos if np.any(np.array([l]) > kroot) and np.lcm.reduce(l) >= self.Lext] # ) # else: # lcombos = np.array([l for l in combos if np.lcm.reduce(l) >= self.Lext]) # # # lcombos = filter(lcombos, K, self.Lext, lmin) # # # idx = np.argmax(np.sum(1 / lcombos**2, axis=1)) # # l = lcombos[idx] # # v = self.Lext / lcombos # b = self.mtf(v) # var = 1 / self.M / self.N * lcombos**2 / b**2 # todo: D, M # idx = np.argmax(np.sum(1 / var, axis=1)) # # l = lcombos[idx] # # if K < self.K: # l = np.concatenate((np.full(self.K - K, lmin), l)) # # # while lmax < self.Lext and np.gcd(lmin, lmax) != 1: # # lmax += 1 # maximum number of iterations? = min(next prime after lmax - lmax, max(0, Lext - lmax, )) # # l = np.array([lmin] * (self.K - 1) + [lmax]) # # # vmax = int(max(1 if self.K == 1 else 2, self.vmax)) # todo: ripples from int() # # v = np.array([vmax] * (self.K - 1) + [vmax - 1]) # two consecutive numbers are always coprime # # lv = self.Lext / v # # lv = np.maximum(self._lmin, np.minimum(lv, self.Lext)) # # # # idx = np.argmax((np.sum(1 / (l ** 2), axis=0), np.sum(1 / (lv ** 2), axis=0))) # # l = l if idx == 0 else lv # # print("l" if idx == 0 else "v") elif l == "close": lmin = int(max(np.ceil(self.lmin), self.Lext ** (1 / self.K) - self.K)) l = lmin + np.arange(self.K) while np.lcm.reduce(l) < self.Lext: l += 1 elif l == "small": lmin = int(np.ceil(self.lmin)) lmax = int(np.ceil(self.Lext ** (1 / self.K))) # wavelengths are around kth root of self.Lext if self.K >= 2: lmax += 1 if self.K >= 3: lmax += 1 if lmax % 2 == 0: # kth root was even lmax += 1 if self.K > 3: ith = self.K - 3 lmax = sympy.ntheory.generate.nextprime(lmax, ith) if lmin > lmax or lmax - lmin + 1 <= self.K: l = lmin + np.arange(self.K) else: lmax = max( lmin, min(lmax, int(np.ceil(self.Lext))) ) # max in outer condition ensures lmax >= lmin even if Lext < lmin if lmin == lmax and lmax < self.Lext: lmax += 1 # ensures lmin and lmax differ so that lcm(l) >= Lext n = lmax - lmin + 1 K = min(self.K, n) # ensures K <= n # C = sp.special.comb(n, K, exact=True, repetition=False) # number of unique combinations combos = np.array( [ c for c in it.combinations(range(lmin, lmax + 1), K) if np.any(np.array([c]) > self.Lext ** (1 / self.K)) and np.lcm.reduce(c) >= self.Lext ] ) idx = np.argmax(np.sum(1 / combos**2, axis=1)) l = combos[idx] if K < self.K: l = np.concatenate((l, np.arange(l.max() + 1, l.max() + 1 + self.K - K))) elif l == "exponential": l = np.concatenate(([np.inf], np.geomspace(self.Lext, self.lmin, self.K - 1))) elif l == "linear": l = np.concatenate(([np.inf], np.linspace(self.Lext, self.lmin, self.K - 1))) else: return self.v = self.Lext / np.array(l, float) @property def _l(self) -> np.ndarray: # kept for backwards compatibility with fringes-GUI """Wavelengths of fringe periods (in pixel units). When `X` or `Y` and hence `Lext` changes, 'v' is kept constant and only 'l' is changed. """ return self.Lext / self._v @property def vmax(self) -> float: """Maximum resolvable spatial frequency.""" return self.Lext / self.lmin @property def v(self) -> np.ndarray: """Spatial frequencies (number of fringe periods across extended coding length `Lext`).""" if self.D == 1 or len(np.unique(self._v, axis=0)) == 1: # sets along directions are identical v = self._v[0] # 1D else: v = self._v # 2D return v # todo: round to number of digits? @v.setter def v(self, v: float | Sequence[float] | str): # | Literal["linear", "exponential", "optimal"]): if isinstance(v, str): if "," in v: v = np.fromstring(v, sep=",") # if v == "optimal": # todo: optimal v # def vopt(self) -> float: # """Optimal spatial frequency for minimal decoding uncertainty.""" # # todo: test # if self._mtf is not None: # interpolate from measurement # v = np.arange(1, self.vmax + 1) # b = self.mtf(v) # N = self._N.ravel()[ # np.argpartition(self._N.ravel(), int(self._N.size // 2))[int(self._N.size // 2)]] # var = 1 / self.M / N / (v ** 2) / b ** 2 # todo: D, M # idx = np.argmax(np.sum(1 / var, axis=1)) # vopt = v[idx] # elif self.PSF > 0: # determine from PSF # vopt_ = 1 / (_2PI * self.PSF) # todo # lopt = 1 / vopt_ # vopt = self.Lext / lopt # vopt = self.vmax / 2 # approximation [Bothe2008] # else: # vopt = int(self.vmax) # # return vopt # # |{v}| = 2 # vmax = int(max(1 if self.K == 1 else 2, self.vopt)) # v = np.array([vmax] * (self.K - 1) + [vmax - 1]) # two consecutive numbers are always coprime # # # # # |{v}| = K # # vmax = int(max(self.K, self.vopt)) # # v = vmax - np.arange(self.K) elif v == "exponential": # K = int(np.ceil(np.log2(self.vmax))) + 1 # + 1: 2 ** 0 = 1 v = np.concatenate(([0], np.geomspace(1, self.vmax, self.K - 1))) elif v == "linear": v = np.concatenate(([0], np.linspace(1, self.vmax, self.K - 1))) else: return _v = np.array(v, float).clip(0, self.vmax) # make array, cast to dtype, clip # todo: allow negative values (effect: inverting amplitude?) if not _v.size: # empty array return _v = self._trim(_v) if self.FDM: if self.static: if ( _v.size != self.D * self.K # todo or not np.all(_v % 1 == 0) or not np.lcm.reduce(_v.astype(int, copy=False).ravel()) == np.prod(_v) # todo: equal ggt = 1 ? ): # todo: allow coprimes?! n = min(10, self.vmax // 2) ith = self.D * self.K pmax = sympy.ntheory.generate.nextprime(n, ith + 1) p = np.array(list(sympy.ntheory.generate.primerange(n, pmax + 1)))[:ith] # primes p = [p[-i // 2] if i % 2 else p[i // 2] for i in range(len(p))] # resort primes _v = np.sort(np.array(p, float).reshape((self.D, self.K)), axis=1) # resort primes logger.warning( f"Periods are not coprime. Changing values to {str(_v.round(3)).replace(chr(10), ',')}." ) # else: # vmax = (self._Nmax - 1) / 2 > _v # _v = np.minimum(_v, vmax) if not np.array_equal(self._v, _v): self._v = _v logger.debug(f"self._v = {str(self._v.round(3)).replace(chr(10), ',')}") logger.debug(f"self._l = {str(self._l.round(3)).replace(chr(10), ',')}") self.D, self.K = self._v.shape self.f = self._f self._UMR = None # empty cache self.UMR # compute and cache self._m = None # empty cache self._crt = None # empty cache self.crt # compute and cache (also computes _m) @property def _fmax(self): """Maximum temporal frequency (maximum number of periods to shift over).""" return ( min((self._Nmin - 1) / 2, self.vmax) if self.FDM and self.static else np.inf # (self._Nmax - 1) / 2 ) # todo: Nmin vs. Nmax @property def f(self) -> np.ndarray: """Temporal frequency (number of periods to shift over).""" if self.D == 1 or len(np.unique(self._f, axis=0)) == 1: # sets along directions are identical f = self._f[0] # 1D else: f = self._f # 2D return f @f.setter def f(self, f: float | Sequence[float]): _f = np.array(f, float).clip(-self._fmax, self._fmax) # make array, cast to dtype, clip if not _f.size: # empty array return _f = self._trim(_f) # 'f' must not be divisible by 'N' # todo: test this D = min(_f.shape[0], self._N.shape[0]) K = min(_f.shape[1], self._N.shape[1]) if np.any(_f[:D, :K] % self._N[:D, :K] == 0): # _f = np.ones(_f.shape) _f[:D, :K][_f[:D, :K] % self._N[:D, :K] == 0] = 1 if self.FDM: if self.static: _f = self._v # periods to shift over = one full revolution else: if ( _f.shape != (self.D, self.K) or not np.all(i % 1 == 0 for i in _f) or len(np.unique(np.abs(_f))) < _f.size ): # assure _f are int and absolute values of _f differ _f = np.arange(1, self.D * self.K + 1, dtype=float).reshape((self.D, self.K)) if 0 not in _f and not np.array_equal(self._f, _f): self._f = _f logger.debug(f"self._f = {str((-self._f if self.reverse else self._f).round(3)).replace(chr(10), ',')}") self.D, self.K = self._f.shape self.N = self._N # todo: remove if fractional periods is implemented, log warning @property def reverse(self) -> bool: """Flag for shifting fringes in reverse direction.""" return self._reverse @reverse.setter def reverse(self, reverse: bool): _reverse = bool(reverse) if self._reverse != _reverse: self._reverse = _reverse logger.debug(f"{self._reverse=}") logger.debug(f"self._f = {str((-self._f if self.reverse else self._f).round(3)).replace(chr(10), ',')}") @property def p0(self) -> float: """Phase offset. It can be used to e.g. let the fringe patterns start (at the origin) with a gray value of zero. """ return 0 if "MRD" in self.mode else self._p0 @p0.setter def p0(self, p0: float): # _p0 = float(np.sign(p0) * np.abs(p0) % _2PI) _p0 = float(p0) if self._p0 != _p0: self._p0 = _p0 logger.debug(f"self._p0 = {self._p0 / np.pi} \u03c0") # π @property def H(self) -> int: """Number of hues.""" return self.h.shape[0] # @H.setter # def H(self, H: int): # _H = int(max(1, H)) # # if self.H != _H: # if self.WDM: # logger.error("Couldn't set 'H': WDM is active.") # return # self.h = "w" * _H # elif _H == 1: # self.h = "w" # elif _H == 2: # self.h = "rb" # else: # rgb = "rgb" * int(np.ceil(_H / 3)) # repeat "rgb" this many times # self.h = rgb[:_H] @property def h(self) -> np.ndarray: """Hues i.e. colors of fringe patterns. Possible values are any sequence of RGB color triples ∈ [0, 255]. However, black (0, 0, 0) is not allowed. The hue values can also be set by assigning any combination of the following characters as a string:\n - 'r': red \n - 'g': green\n - 'b': blue\n - 'c': cyan\n - 'm': magenta\n - 'y': yellow\n - 'w': white\n Before decoding, repeating hues will be fused by averaging. """ return self._h @h.setter def h( self, h: float | Sequence[float] | Sequence[Sequence[float, float, float]] | Literal["w", "r", "g", "b", "c", "m", "y"] | Sequence[Literal["w", "r", "g", "b", "c", "m", "y"]], ): if isinstance(h, str): LUT = { "r": (255, 0, 0), "g": (0, 255, 0), "b": (0, 0, 255), "c": (0, 255, 255), "m": (255, 0, 255), "y": (255, 255, 0), "w": (255, 255, 255), } if all(c in LUT.keys() for c in h.lower()): # set(h.lower()).issubset(LUT.keys()) # set(h.lower()) <= LUT.keys()): # subset # todo: notation since which python version? h = [LUT[c] for c in h.lower()] elif h == "default": h = self._defaults["h"] else: return # make array, clip first and then cast to dtype to avoid integer under-/overflow _h = np.array(h).clip(0, 255).astype(float, copy=False) if not _h.size: # empty array return # trim: change ndim of '_h' to 2, i.e. its shape to (H, 3) if _h.ndim == 0: _h = np.full((self.H, 3), _h) elif _h.ndim == 1: if len(_h) == 3: _h = _h.reshape(1, 3) else: _h = np.repeat(_h.reshape(3, 1), 3, axis=1) elif _h.ndim == 2: _h = _h[:, :3] else: raise ValueError("Only 0 up to 2 dimensions are allowed.") if _h.shape[1] != 3: # length of color-axis must equal 3 logger.error("Couldn't set 'h': 3 color channels must be provided.") return if np.any(np.max(_h, axis=1) == 0): logger.error("Didn't set 'h': Black color is not allowed.") return if self.WDM and not self._monochrome: logger.error("Couldn't set 'h': 'WDM' is active, but not all hues are monochromatic.") return if not np.array_equal(self._h, _h): Hold = self.H self._h = _h logger.debug(f"self._h = {str(self._h).replace(chr(10), ',')}") if Hold != _h.shape[0]: logger.debug(f"self.H = {_h.shape[0]}") # computed upon call logger.debug(f"{self.M=}") # computed upon call @property def M(self) -> int: # float | np.ndarray """Number of averaged intensity samples.""" # M = np.sum(self.h, axis=0) / 255 # todo: might be a fraction M = np.count_nonzero(self.h, axis=0) # todo: always is an integer # M = np.atleast_1d(M[0]) if self._monochrome else M M = M[0] if self._monochrome else M M = M.min() # todo: array of M return int(M) # use int() or float() to ensure type is "int" instead of "numpy.core.multiarray.scalar" # @M.setter # def M(self, M: int): # _M = int(max(1, M)) # self.h = "w" * _M # # todo: float @property def _monochrome(self) -> bool: """True if all hues are monochromatic, i.e. the RGB values are identical for each hue.""" return all(len(set(h)) == 1 for h in self.h) @property def SDM(self) -> bool: """Spatial division multiplexing. The directions 'D' are multiplexed, resulting in a crossed fringe pattern. The amplitude 'B' is halved. It can only be activated if we have two directions, i.e. 'D' ≡ 2. The number of frames 'T' is reduced by the factor 2. """ return self._SDM @SDM.setter def SDM(self, SDM: bool): _SDM = bool(SDM) if _SDM: if self.D != 2: _SDM = False logger.error("Didn't set 'SDM': Pointless as only one dimension exist.") if self.grid not in {"image", "Cartesian"}: _SDM = False logger.error(f"Couldn't set 'SDM': 'grid' not in {{{'image', 'Cartesian'}}}.") if self.FDM: _SDM = False logger.error("Couldn't set 'SDM': FDM is active.") if self._SDM != _SDM: self._SDM = _SDM logger.debug(f"{self._SDM=}") logger.debug(f"{self.T=}") # computed upon call if self.SDM: self.V /= self.D else: self.V *= self.D @property def WDM(self) -> bool: """Wavelength division multiplexing. The shifts are multiplexed into the color channel, resulting in an RGB fringe pattern. It can only be activated if all shifts equal 3, i.e. 'N' ≡ 3. The number of frames 'T' is reduced by the factor 3. """ return self._WDM @WDM.setter def WDM(self, WDM: bool): _WDM = bool(WDM) if _WDM: if not np.all(self.N == 3): _WDM = False logger.error("Couldn't set 'WDM': At least one Shift != 3.") if not self._monochrome: _WDM = False logger.error("Couldn't set 'WDM': Not all hues are monochromatic.") if self.FDM: # todo: remove this, already covered by N _WDM = False logger.error("Couldn't set 'WDM': FDM is active.") if self._WDM != _WDM: self._WDM = _WDM logger.debug(f"{self._WDM=}") logger.debug(f"{self.T=}") # computed upon call @property def FDM(self) -> bool: """Frequency division multiplexing. The directions 'D' and the sets K are multiplexed, resulting in a crossed fringe pattern if 'D' ≡ 2. It can only be activated if 'D' ∨ 'K' > 1 i.e. 'D' * 'K' > 1. The amplitude 'b' is reduced by the factor 'D' * 'K'. Usually 'f' equals 1 and is essentially only changed if frequency division multiplexing ('FDM') is activated: Each set per direction receives an individual temporal frequency 'f', which is used in temporal demodulation to distinguish the individual sets. A minimal number of shifts Nmin ≥ ⌈ 2 * fmax + 1 ⌉ is required to satisfy the sampling theorem and N is updated automatically if necessary. If one wants a static pattern, i.e. one that remains congruent when shifted, set 'static' to True. """ return self._FDM @FDM.setter def FDM(self, FDM: bool): _FDM = bool(FDM) if _FDM: if self.D == self.K == 1: _FDM = False logger.error("Didn't set 'FDM': Dimensions * Sets = 1, so nothing to multiplex.") if self.SDM: _FDM = False logger.error("Couldn't set 'FDM': SDM is active.") if self.WDM: # todo: remove, already covered by N _FDM = False logger.error("Couldn't set 'FDM': WDM is active.") if self._FDM != _FDM: self._FDM = _FDM logger.debug(f"{self._FDM=}") # self.K = self._K self.N = self._N self.v = self._v if self.FDM: self.f = self._f else: self.f = np.ones((self.D, self.K)) # keep maximum possible visibility constant if self.FDM: self.B /= self.D * self.K else: self.B *= self.D * self.K @property def static(self) -> bool: """Flag for creating static fringes (so they remain congruent when shifted).""" return self._static @static.setter def static(self, static: bool): _static = bool(static) if self._static != _static: self._static = _static logger.debug(f"{self._static=}") self.v = self._v self.f = self._f @property def _ambiguous(self) -> bool: """True if unambiguous measurement range is smaller than the coding range.""" return bool(np.any(self.UMR < self.L * self.a)) @property def uwr(self) -> str: """Phase unwrapping method. - 'temporal': Temporal phase unwrapping.\n - 'spatial': Spatial phase unwrapping.\n - 'ftm': Fourier-transform method.\n - 'none': No unwrapping required. """ if np.all(self.v <= 1): uwr = "none" # only averaging elif self.K == 1 and np.all(self._N == 1) and self.grid in {"image", "Cartesian"}: # todo: v >> 1, i.e. l ~ 8 uwr = "ftm" # Fourier-transform method elif self._ambiguous: uwr = "spatial" else: uwr = "temporal" return uwr @property def mode(self) -> str: """Mode for coding. The following values can be set:\n - 'fast'\n - 'precise' """ return self._mode @mode.setter def mode(self, mode: Literal["fast", "precise", "MRD", "sRGB"]): # todo: add "NumPy", "Numba" _mode = str(mode) if self._mode != _mode and any(m in _mode for m in self._choices["mode"]): # _mode in self._choices["mode"]: self._mode = _mode logger.debug(f"{self._mode=}") @property def UMR(self) -> np.ndarray: """Unambiguous measurement range (in pixel units). The coding is only unique within the periodic interval [0, UMR). The UMR is derived from 'l' and 'v':\n - If 'l' ∈ ℕ, UMR = lcm('l'), with lcm() being the least common multiple.\n - Else, if 'v' ∈ ℕ, UMR = 'Lext' / gcd('v'), with gcd() being the greatest common divisor.\n - Else, if 'l' ∨ 'v' ∈ ℚ, lcm() resp. gcd() are extended to rational numbers.\n - Else, if 'l' ∧ 'v' ∈ ℝ ∖ ℚ, UMR = prod('l'), with prod() being the product operator. """ # cache if self._UMR is not None: return self._UMR elif not self._N.shape == self._v.shape: return np.zeros(self.D) precision = np.finfo(float).precision - 2 # todo = 13 atol = 10**-precision UMR = np.empty(self.D, float) for d in range(self.D): l = self._l[d].copy() v = self._v[d].copy() # if 1 in self._N[d] and self.uwr != "FTM": # here, in TPU twice the combinations have to be tried # # todo: test if valid # l[self._N[d] == 1] /= 2 # v[self._N[d] == 1] *= 2 if 0 in v: # equivalently: np.inf in l l = l[v != 0] v = v[v != 0] if len(l) == 0 or len(v) == 0: UMR[d] = 1 # one since we can only control discrete pixels continue if np.all(l % 1 == 0): # all l are integers UMR[d] = np.lcm.reduce(l.astype(int, copy=False)) elif np.all(v % 1 == 0): # all v are integers UMR[d] = self.Lext / np.gcd.reduce(v.astype(int, copy=False)) elif np.allclose(l, np.rint(l), rtol=0, atol=atol): # all l are integers within tolerance UMR[d] = np.lcm.reduce(np.rint(l).astype(int, copy=False)) elif np.allclose(v, np.rint(v), rtol=0, atol=atol): # all v are integers within tolerance UMR[d] = self.Lext / np.gcd.reduce(np.rint(v).astype(int, copy=False)) else: # mutual divisibility test K = len(v) # zeros may have been filtered out above for i in range(K - 1): for j in range(i + 1, K): if 0 <= l[i] % l[j] < atol or 1 - atol < l[i] % l[j] < 1: l[j] = 1 elif 0 <= l[j] % l[i] < atol or 1 - atol < l[j] % l[i] < 1: l[i] = 1 mask = l != 1 v = v[mask] l = l[mask] # number of decimals Dl = max(str(i)[::-1].find(".") for i in l) Dv = max(str(i)[::-1].find(".") for i in v) # estimate whether elements are rational or irrational if Dl < precision or Dv < precision: # rational numbers without integers if Dl <= Dv: ls = l * 10**Dl # wavelengths scaled UMR[d] = np.lcm.reduce(ls.astype(int, copy=False)) / 10**Dl logger.debug("Extended 'lcm' to rational numbers.") else: vs = v * 10**Dv # spatial frequencies scaled UMR[d] = self.Lext / (np.gcd.reduce(vs.astype(int, copy=False)) / 10**Dv) logger.debug("Extended 'gcd' to rational numbers.") else: # irrational numbers or rational numbers with more digits than "precision" UMR[d] = np.prod(l) self._UMR = UMR logger.debug(f"self.UMR = {str(self._UMR)}") if self._ambiguous: logger.warning("UMR < L: unwrapping will not be spatially independent and only yield a relative phase map.") return self._UMR @property def m(self) -> np.ndarray: """Moduli for Chinese remainder theorem.""" # todo: """coprime moduli""" if self._m is not None: # cache return self._m precision = np.finfo(float).precision - 2 # todo: 13 atol = 10**-precision m = np.empty((self.D, self.K), int) # moduli, i.e. integer ratios for d in range(self.D): # if all(l.is_integer() for l in self._l[d]): # ... if np.all(self._l[d] % 1 == 0): # all l are integers m_ = self._l[d].astype(int) gcd = np.gcd.reduce(m_) m[d] = m_ / gcd # lcm = np.lcm.reduce(m[d]) elif np.all(self._v[d] % 1 == 0): # all v are integers m_ = self._v[d].astype(int) lcm = np.lcm.reduce(m_) m[d] = lcm / m_ # changes succession of m_ elif np.allclose(self._l[d], np.rint(self._l[d]), rtol=0, atol=atol): m_ = np.rint(self._l[d]).astype(int) gcd = np.gcd.reduce(m_) m[d] = m_ / gcd # lcm = np.lcm.reduce(m[d]) elif np.allclose(self._v[d], np.rint(self._v[d]), rtol=0, atol=atol): m_ = self._v[d].astype(int) lcm = np.lcm.reduce(m_) m[d] = lcm / m_ # changes succession of m_ else: # extend lcm/gcd to non (ir)rational numbers l = self._l[d] v = self._v[d] # # mutual divisibility test # for i in range(self.K - 1): # for j in range(i + 1, self.K): # if 0 <= self._l[d, i] % self._l[d, j] < atol or 1 - atol < self._l[d, i] % self._l[d, j] < 1: # l[j] = 1 # elif 0 <= self._l[d, j] % self._l[d, i] < atol or 1 - atol < self._l[d, j] % self._l[d, i] < 1: # l[i] = 1 # mask = l != 1 # todo: WAVG # l = l[mask] # v = v[mask] # number of decimals Dl = max(str(i)[::-1].find(".") for i in l) Dv = max(str(i)[::-1].find(".") for i in v) # estimate whether elements are rational or irrational if Dl < precision or Dv < precision: # rational numbers without integers if Dl <= Dv: m_ = (l * 10**Dl).astype(int) # wavelengths scaled gcd = np.gcd.reduce(m_) m[d] = m_ / gcd else: m_ = (v * 10**Dv).astype(int) # spatial frequencies scaled lcm = np.lcm.reduce(m_) m[d] = lcm / m_ else: # irrational numbers or rational numbers with more digits than 'precision' m[d] = 0 # lcm = np.prod(l) self._m = m logger.debug(f"self._m = {str(self._m)}") return m @property def gcd(self) -> np.ndarray: """Greatest common divisor of moduli. Must be smaller than noise. """ gcd = self._l / self.m return np.mean(gcd, axis=1) # each 'gcd' should be identical; we average to get more precision @property def crt(self) -> np.ndarray: """Coefficients for Chinese remainder theorem.""" if self._crt is not None: # cache return self._crt crt = np.empty((self.D, self.K), int) # Solution of multiple congruences. for d in range(self.D): lcm = np.lcm.reduce(self.m[d]) if lcm > 0: for i in range(self.K): # try: # M = int(lcm / self.m[d, i]) # except ZeroDivisionError: # crt[d, i] = 0 # no solution # continue M = int(lcm / self.m[d, i]) # if np.any(crt[d, :i] % self.m[d, i] == 1): # one element already equals 1 modulo m[i] # crt[d, i] = -1 # 0 # todo: WAVG them? # continue try: # M_ = pow(M, -1, self.m[d, i]) # only for Python 3.8+ M_ = sympy.core.numbers.mod_inverse(M, self.m[d, i]) except ValueError: crt[d, i] = 0 # no solution found else: crt[d, i] = M * M_ if np.any(crt[d, :i] % self.m[d, i] == 1): # one element already equals 1 modulo m[i] crt[d, i] *= -1 # todo: WAVG them? continue else: crt[d, :] = 0 # no solution found # coprimality test j0 = -1 for i in range(self.K): for j in range(i + 1, self.K): if np.gcd(self.m[d, i], self.m[d, j]) != 1: j0 = j if j0 > 0: crt[d, j0] = 0 self._crt = crt logger.debug(f"self._crt = {str(self._crt)}") return self._crt @property def uq(self): """Positional uncertainty (standard deviation) due to quantization noise.""" uq = 1 / np.sqrt(12) # quantization noise up = np.sqrt(2) / np.sqrt(self.M) / np.sqrt(self._N) * uq / self.B # phase uncertainties # todo: M ux = up / _2PI * self._l # position uncertainties u = np.sqrt(1 / np.sum(1 / ux**2, axis=1)) # position uncertainty return u @property def SQNR(self): """Signal to quantization noise ratio. It is a measure of how many points can be distinguished within the screen length [0, L). """ return self.L / self.uq @property def SQNRdB(self): """Signal to quantization noise ratio, in unit 'dB'.""" return 20 * np.log10(self.SQNR) @property def DRQ(self): """Dynamic range due to quantization noise. It is a measure of how many points can be distinguished within the unambiguous measurement range [0, UMR). """ return self.UMR / self.uq @property def DRQdB(self): """Dynamic range, in unit 'dB'.""" return 20 * np.log10(self.DRQ) @property def eta(self) -> float: """Spatial coding efficiency.""" eta = self.L / self.UMR eta[self.UMR < self.L] = 0 return eta @property def _params(self) -> dict: """Base parameters required for en- & decoding fringe patterns. This contains all property objects of the class which have a setter method, i.e. are (usually) not derived from others. """ params = {"__version__": version(__package__)} for k in sorted(self._setters): v = getattr(self, k) if isinstance(v, np.ndarray): params[k] = v.tolist() elif isinstance(v, np.dtype): params[k] = str(v) else: params[k] = v return params @_params.setter def _params(self, params: dict): params_old = self._params.copy() # copy current params; they become old params # set params for k in self._setters: # iterate over `_setters`, which are sorted! if k in params: v = params[k] setattr(self, k, v) # check params for k in self._setters: # iterate over `_setters`, which are sorted! if k in params: v = params[k] if k in "N l v f".split(): if isinstance(v, (int, float)): if not np.all(getattr(self, k) == v): break elif getattr(self, k).ndim != np.array(v).ndim: if not np.array_equal(getattr(self, k), v[0]): break else: if not np.array_equal(getattr(self, k), v): break else: if not np.array_equal(getattr(self, k), v): break else: # else clause executes only after the loop completes normally, i.e. did not encounter a break return logger.warning( f"Parameter '{k}' got overwritten by interdependencies. " f"Choose consistent initialization values. " f"Restoring old values." ) self._params = params_old # restore old params # note: class attributes are not available in comprehensions _glossary: dict = { k: v.__doc__.splitlines()[0] for k, v in sorted(vars().items()) if k[0] != "_" and isinstance(v, property) } # vars() is a workaround for class attributes, which are not accessible in comprehensions _setters: tuple = sorted( tuple( k for k, v in vars().items() if k[0] != "_" and isinstance(v, property) and v.fset is not None ) # attention: exclude `_params`! ) # note: sorted() ensures setting params in the same/right order, e.g. in __init__() and _params.setter # todo: does order matter? _help: dict = { k: v.__doc__.splitlines()[0] for k, v in vars().items() if k[0] != "_" and isinstance(v, property) and v.fset is not None } _defaults: dict = __init__.__kwdefaults__ # _types: dict = __init__.__annotations__ _types: dict = {k: type(v) for k, v in _defaults.items()} _types_str: dict = {k: v.__name__ for k, v in _types.items()} # restrict instance attributes to the ones listed here # (commend the next line out or add '__dict__' to prevent this) __slots__ = tuple("_" + k for k in _defaults.keys()) + ( "_mtf", "_UMR", "_crt", "_m", "_t", ) # automated continuation of the class docstring following the NumPy style guide: # https://numpydoc.readthedocs.io/en/latest/format.html#documenting-classes # https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_numpy.html # __doc__ += "\n" __doc__ += "\nParameters\n----------" # implemented as properties __doc__ += "\n*args : iterable\n Non-keyword arguments are explicitly discarded." __doc__ += "\n Only keyword arguments are considered." for __k, __v in _defaults.items(): # do this in for loop instead of a comprehension, # because for the latter, names in class scope are not accessible __doc__ += f"\n{__k} : {'array_like' if __k in 'N l v f h'.split() else _types_str[__k]}, default={__v}" __doc__ += f"\n {_help[__k]}" # __doc__ += "\n\nAttributes\n----------" # implemented as properties and derived from Parameters # for __k, __v in sorted(vars().items()): # # do this in for loop instead of a comprehension, # # because for the latter, names in class scope are not accessible # if __k not in _defaults and __k in _glossary: # # __doc__ += f"\n{__k} : {'array_like' if __k in "N l v f h".split() else _types_str[__k]}\n {_glossary[__k]}" # __doc__ += f"\n{__k}\n {_glossary[__k]}" __doc__ += "\n" __doc__ += "\nMethods\n-------" __doc__ += f"\nencode\n {encode.__doc__.splitlines()[0]}" __doc__ += "\n See `encode`." __doc__ += f"\ndecode\n {decode.__doc__.splitlines()[0]}" __doc__ += "\n See `decode`." # todo: add all public methods? which should be public? # for __k, __v in sorted(vars().items()): # # do this in for loop instead of a comprehension, # # because for the latter, names in class scope are not accessible # if __k not __k.startswith("_") and callable(__v): # __doc__ += f"\n{__k}\n : {__v.__doc__.splitlines()[0]}" del __k, __v