Source code for qspec.models._base

# -*- coding: utf-8 -*-
"""
qspec.models._base

Created on 14.03.2022

@author: Patrick Mueller

Base classes for lineshape models.
"""

import numpy as np

from qspec.tools import merge_intervals

__all__ = ['Model', 'Empty', 'NPeak', 'Offset', 'Amplifier', 'Custom', 'YPars', 'Listed', 'Summed', 'Linked']


np_version = np.version.version.split('.')


def _is_unc(fix):
    if isinstance(fix, float):
        return True
    if not isinstance(fix, str):
        return False

    j, k = fix.find('('), fix.find(')')
    try:
        if j == -1 or k == -1:
            raise ValueError('Not a value with uncertainty.')
        _ = '{}({})'.format(float(fix[:j]), float(fix[j+1:k]))
    except ValueError:
        return False
    return True


def _val_fix_to_val(val, fix):
    if isinstance(fix, float):
        return val
    j = fix.find('(')
    return float(fix[:j])


def _fix_to_unc(fix):
    if isinstance(fix, float):
        return fix
    j, k = fix.find('('), fix.find(')')
    return float(fix[j+1:k])


def _args_ordered(args, order):
    return [args[i] for i in order]


def _poly(x, *args):
    return np.sum([args[n] * x ** n for n in range(len(args))], axis=0)


[docs]class Model: """ Base class for all models. """ def __init__(self, model=None): self.model = model self.type = 'Model' def __call__(self, x, *args, **kwargs): return self.evaluate(x, *self.update_args(args), **kwargs)
[docs] def evaluate(self, x, *args, **kwargs): # Reimplement this function in subclasses. pass
def _add_arg(self, name, val, fix, link): if name in self.names: raise ValueError('Parameter {} already exists.'.format(name)) self.names.append(name) self.vals.append(val) self.fixes.append(fix) self.links.append(link) self.expressions.append('args[{}]'.format(self._index)) self.p[name] = self._index self._index += 1 self._size += 1 @property def description(self): label = '' super_model = self while super_model is not None: if isinstance(super_model, Listed): label += super_model.type + '[0].' super_model = super_model.models[0] else: label += super_model.type + '.' super_model = super_model.model return label[:-1] @property def size(self): """ :returns: The number of parameters required by the model. """ return self._size @property def dx(self): return 0.1 if self.model is None else self.model.dx @property def error(self): return self._error @error.setter def error(self, value): self._error = value @property def model(self): return self._model @model.setter def model(self, value): self._model = value if self._model is None: self.names, self.vals, self.fixes, self.links = [], [], [], [] self.expressions = [] self.p = {} self._index = 0 self._size = 0 self._error = '' else: self.names, self.vals, self.fixes, self.links = \ self._model.names, self._model.vals, self._model.fixes, self._model.links self.expressions = self._model.expressions self.p = self._model.p self._index = len(self._model.names) self._size = len(self._model.names) self._error = self._model.error
[docs] def get_pars(self): return zip(self.names, self.vals, self.fixes, self.links)
[docs] def set_pars(self, pars, force=False): for i, p in enumerate(pars): self.set_val(i, p[0], force=force) self.set_fix(i, p[1], force=force) self.set_link(i, p[2], force=force)
[docs] def set_vals(self, vals, force=False): for i, val in enumerate(vals): self.set_val(i, val, force=force)
[docs] def set_fixes(self, fixes, force=False): for i, fix in enumerate(fixes): self.set_fix(i, fix, force=force)
[docs] def set_val(self, i, val, force=False): if force or isinstance(val, int) or isinstance(val, float): if self.model is None: self.vals[i] = val else: self.model.set_val(i, val, force=True) # Set val for all sub-models
# to ensure set_val is called for a ListedModel if one is part of the sub-models. # This is only needed for the vals since these may be predicted by the specific model. # Everything else is handled by the top-model.
[docs] def set_fix(self, i, fix, force=False): if force: self.fixes[i] = fix return if isinstance(fix, int) or isinstance(fix, float): if isinstance(fix, bool): fix = bool(fix) elif fix <= 0 or np.isinf(fix): fix = fix <= 0 else: fix = float(fix) expr = 'args[{}]'.format(i) elif isinstance(fix, str): j, k = fix.find('('), fix.find(')') try: if j == -1 or k == -1: raise ValueError('Not a value with uncertainty.') fix = '{}({})'.format(float(fix[:j]), float(fix[j+1:k])) expr = 'args[{}]'.format(i) except ValueError: _fix = fix for j, name in enumerate(self.names): _fix = _fix.replace(name, 'eval(self.expressions[{}])'.format(j)) expr = _fix try: self._eval_zero_division(self.vals, expr) except (ValueError, TypeError, SyntaxError, NameError) as e: print('Invalid expression for parameter \'{}\': {}. Got a {}.'.format(self.names[i], fix, repr(e))) return elif isinstance(fix, list): if len(fix) == 0: fix = [0, 1] elif len(fix) == 1: fix = [0, fix[0]] else: fix = fix[:2] expr = 'args[{}]'.format(i) else: return temp_expr = self.expressions[i] temp_fix = self.fixes[i] self.expressions[i] = compile(expr, '<string>', 'eval', optimize=2) # Compile beforehand to save time. self.fixes[i] = fix try: self.update_args(self.vals) except RecursionError as e: print('Expression for {} with fix {} form a loop. Got a {}.'.format(self.names[i], fix, repr(e))) self.expressions[i] = temp_expr self.fixes[i] = temp_fix
def _eval_zero_division(self, args, expr): try: return eval(expr, {}, {'self': self, 'args': args}) except ZeroDivisionError: return 0.
[docs] def update_args(self, args): return tuple(self._eval_zero_division(args, expr) for expr in self.expressions)
[docs] def update(self): self.set_vals(self.update_args(self.vals), force=False)
[docs] def min(self): return -1. if self.model is None else self.model.min()
[docs] def max(self): return 1. if self.model is None else self.model.max()
[docs] def intervals(self): return [[self.min(), self.max()]] if self.model is None else self.model.intervals()
[docs] def x(self): return np.concatenate([np.arange(i[0], i[1], self.dx, dtype=float) for i in self.intervals()], axis=0)
[docs] def fit_prepare(self): bounds = (-np.inf, np.inf) fixed = [fix for fix in self.fixes] b_lower = [] b_upper = [] _bounds = False for i, fix in enumerate(self.fixes): if isinstance(fix, bool): b_lower.append(-np.inf) b_upper.append(np.inf) elif _is_unc(fix): b_lower.append(-np.inf) b_upper.append(np.inf) fixed[i] = False elif isinstance(fix, list): _bounds = True b_lower.append(fix[0]) b_upper.append(fix[1]) fixed[i] = False elif isinstance(fix, str): b_lower.append(-np.inf) b_upper.append(np.inf) fixed[i] = True _fix = fix else: raise TypeError('The type {} of the element {} in self.fixes is not supported.' .format(type(fix), fix)) if _bounds: bounds = (b_lower, b_upper) return fixed, bounds
[docs]class Empty(Model): """ An empty model, returning zeros with the same shape as x. """ def __init__(self): super().__init__(model=None) self.type = 'Empty'
[docs] def evaluate(self, x, *args, **kwargs): return np.zeros_like(x)
[docs]class NPeak(Model): """ Evaluates the given 'model' at the positions x\ :sub:`i` with scalings p\ :sub:`i` where i < 'n_peaks'. """ def __init__(self, model, n_peaks=1): super().__init__(model=model) self.type = 'NPeak' self.n_peaks = int(n_peaks) for n in range(self.n_peaks): self._add_arg('x{}'.format(n), 0., n == 0, False) self._add_arg('p{}'.format(n), 1., n == 0, False)
[docs] def evaluate(self, x, *args, **kwargs): return np.sum([args[self.model.size + 2 * n + 1] * self.model.evaluate(x - args[self.model.size + 2 * n], *args[:self.model.size]) for n in range(self.n_peaks)], axis=0)
[docs] def min(self): min_center = min(self.vals[self.p['x{}'.format(n)]] for n in range(self.n_peaks)) return min_center + self.model.min()
[docs] def max(self): max_center = max(self.vals[self.p['x{}'.format(n)]] for n in range(self.n_peaks)) return max_center + self.model.max()
[docs] def intervals(self): return merge_intervals([[i[0] + self.vals[self.model.size + 2 * n], i[1] + self.vals[self.model.size + 2 * n]] for i in self.model.intervals() for n in range(self.n_peaks)])
[docs]class Offset(Model): """ Cuts the x-axis and adds y-axis offsets to every segment. """ def __init__(self, model=None, x_cuts=None, offsets=None): """ :param model: The model the offset will be added to. If None, the offset will be added to zero. :param x_cuts: x values where to cut the x-axis. :param offsets: A list of maximally considered polynomial orders for each slice. The list must have length len(x_cuts) + 1. """ super().__init__(model=model) self.type = 'Offset' if x_cuts is None: x_cuts = [] self.x_cuts = sorted(list(x_cuts)) self.offsets = offsets if self.offsets is None: self.offsets = [0] if len(self.offsets) != len(self.x_cuts) + 1: raise ValueError('The parameter offset must be a list of size \'len(x_cuts) + 1\'' ' and contain the maximally considered polynomial order for each slice.') self.offset_map = [] self.offset_masks = [] self.update_on_call = True self.gen_offset_map()
[docs] def evaluate(self, x, *args, **kwargs): if self._model is None: return self._offset(x, *args) return self.model.evaluate(x, *args[:self.model.size]) + self._offset(x, *args)
[docs] def set_x_cuts(self, x_cuts): x_cuts = list(x_cuts) if len(x_cuts) != len(self.x_cuts): raise ValueError('\'x_cuts\' must not change its size.') self.x_cuts = sorted(list(x_cuts))
def _offset(self, x, *args): if self.update_on_call: self.gen_offset_masks(x) ret = np.zeros_like(x) for i, mask in enumerate(self.offset_masks): ret[mask] = _poly(x[mask], *_args_ordered(args, self.offset_map[i])) return ret
[docs] def gen_offset_map(self): self.offset_map = [] for i, n in enumerate(self.offsets): self.offset_map.append([]) for k in range(n + 1): self.offset_map[-1].append(self._index) self._add_arg('off{}e{}'.format(i, k), 0., False, False)
""" Preprocessing """
[docs] def gen_offset_masks(self, x): self.offset_masks = [] for x0, x1 in zip([np.min(x) - 1., ] + self.x_cuts, self.x_cuts + [np.max(x) + 1., ]): x_mean = 0.5 * (x0 + x1) self.offset_masks.append(np.abs(x - x_mean) < x1 - x_mean)
[docs] def guess_offset(self, x, y): for i, mask in enumerate(self.offset_masks): self.vals[self.p['off{}e0'.format(i)]] = 0.5 * (y[mask][0] + y[mask][-1]) try: self.vals[self.p['off{}e1'.format(i)]] = (y[mask][-1] - y[mask][0]) / (x[mask][-1] - x[mask][0]) except KeyError: return
[docs]class Amplifier(Model): """ A polynomial of order 'order'. """ def __init__(self, order=None): super().__init__(model=None) self.type = 'Amplifier' if order is None: order = 1 self.order = order for n in range(order + 1): self._add_arg('a{}'.format(n), 1. if n == 1 else 0., False, False) self._min = -10 self._max = 10
[docs] def evaluate(self, x, *args, **kwargs): self._min = np.min(x) self._max = np.max(x) return _poly(x, *args)
@property def dx(self): return 1e-2
[docs] def min(self): return self._min
[docs] def max(self): return self._max
[docs]class Custom(Model): def __init__(self, model=None, parameters=None): super().__init__(model=model) self.type = 'Custom' if parameters is None: parameters = [] self.parameters = parameters for p in self.parameters: self._add_arg(p, 0., False, False)
[docs] def evaluate(self, x, *args, **kwargs): if self.model is None: return np.array(args, dtype=float) return self.model.evaluate(x, *args[:self.model.size], **kwargs)
[docs]class YPars(Model): def __init__(self, model): super().__init__(model=model) self.type = 'YPars' self.p_y = [i for i, fix in enumerate(self.model.fixes) if _is_unc(fix)]
[docs] def evaluate(self, x, *args, **kwargs): return np.concatenate([self.model.evaluate(x, *args, **kwargs), np.array([args[p_y] for p_y in self.p_y], dtype=float)], axis=0)
[docs]class Listed(Model): def __init__(self, models, labels=None): super().__init__(model=None) self.type = 'Listed' self.models = models # models is just a reference to a list defined somewhere else. self.labels = labels if self.labels is None: if len(self.models) == 1: self.labels = ['', ] else: self.labels = ['__{}'.format(i) for i in range(len(self.models))] self.slices = [] self.model_map = [] self.index_map = [] for i, (model, label) in enumerate(zip(self.models, self.labels)): self.slices.append(slice(self._index, self._index + model.size, 1)) for j, (name, val, fix, link) in enumerate(model.get_pars()): self.model_map.append(i) self.index_map.append(j) if isinstance(fix, str) and not _is_unc(fix): for _name in model.names: fix = fix.replace(_name, '{}{}'.format(_name, label)) self._add_arg('{}{}'.format(name, label), val, fix, link) self.set_fixes(list(self.fixes))
[docs] def set_val(self, i, val, force=False): super().set_val(i, val, force=force) if i < len(self.model_map): self.models[self.model_map[i]].set_val(self.index_map[i], self.vals[i], force=True)
[docs] def set_fix(self, i, fix, force=False): super().set_fix(i, fix, force=force) if i < len(self.model_map): self.models[self.model_map[i]].set_fix(self.index_map[i], self.fixes[i], force=True)
[docs] def inherit_vals(self, force=False): self.set_vals([val for model in self.models for val in model.vals], force=force)
[docs] def inherit_fixes(self, force=False): self.set_fixes([fix for model in self.models for fix in model.fixes], force=force)
[docs]class Summed(Listed): def __init__(self, models, labels=None): super().__init__(models, labels=labels) self.type = 'Summed' self.indices_add = [] for n, (model, label) in enumerate(zip(self.models, self.labels)): self.indices_add.append([self._index, self._index + 1]) self._add_arg('center{}'.format(label), 0., False, False) self._add_arg('int{}'.format(label), 1., False, False)
[docs] def evaluate(self, x, *args, **kwargs): return np.sum([args[i[1]] * model.evaluate(x - args[i[0]], *args[_slice], **kwargs) for model, _slice, i in zip(self.models, self.slices, self.indices_add)], axis=0)
@property def dx(self): return min(model.dx for model in self.models)
[docs] def min(self): return min(model.min() for model in self.models)
[docs] def max(self): return max(model.max() for model in self.models)
[docs] def intervals(self): return merge_intervals([[i[0] + self.vals[j[0]], i[1] + self.vals[j[0]]] for model, j in zip(self.models, self.indices_add) for i in model.intervals()])
[docs]class Linked(Listed): def __init__(self, models): super().__init__(models, labels=None) self.type = 'Linked' for i, (name, val, fix, link) in enumerate(self.get_pars()): if link and (not fix or isinstance(fix, list) or _is_unc(fix)): _name = name[:name.rfind('__')] for j, model in enumerate(self.models): if j < self.model_map[i] and _name in model.names: self.set_fix(i, '{}__{}'.format(_name, j)) break
[docs] def evaluate(self, x, *args, **kwargs): return np.concatenate(tuple(model.evaluate(_x, *args[_slice], **kwargs) for model, _slice, _x in zip(self.models, self.slices, x)), axis=0)
[docs] def set_fix(self, i, fix, force=False): super(Listed, self).set_fix(i, fix, force=force)