Source code for pygimli.frameworks.modelling

"""pyGIMLi Modelling Frameworks."""
import numpy as np
import pygimli as pg

DEFAULT_STYLES = {
    'Default': {
        'color': 'C0',
        'lw': 1.5,
        'linestyle': '-'
    },
    'Data': {
        'color': 'C0',  # blueish
        'lw': 1.5,
        'linestyle': ':',
        'marker': 'o',
        'ms': 6
    },
    'Response': {
        'color': 'C0',  # blueish
        'lw': 2.0,
        'linestyle': '-',
        'marker': 'None',
        'alpha': 0.4
    },
    'Error': {
        'color': 'C3',  # reddish
        'lw': 0,
        'linestyle': '-',
        'elinewidth': 2,
        'alpha': 0.5
    },
}


[docs] class Modelling(pg.core.ModellingBase): """Abstract Forward Operator. Abstract Forward Operator, i.e. base class for deriving forward operators that are or that use a Modelling instance. Todo ---- * Doc: - describe members (model transformation, dictionary of region properties) * think about splitting all mesh related into MeshModelling * clarify difference: setData(array|DC), setDataContainer(DC), setDataValues(array) * clarify dataSpace(comp. ModelSpace): The unique spatial or temporal origin of a datapoint (time, coordinates, receiver/transmitter indices, counter) - Every inversion needs, dataValues and dataSpace - DataContainer contain, dataValues and dataSpace - initialize both with initDataSpace(), initModelSpace * createJacobian should also return J """
[docs] def __init__(self, **kwargs): """Initialize. Attributes ---------- fop : pg.frameworks.Modelling data : pg.DataContainer modelTrans : [pg.trans.TransLog()] Keyword Arguments ----------------- **kwargs: fop: Modelling """ self._fop = None # pg.frameworks.Modelling .. not needed .. remove it self._data = None # dataContainer self._modelTrans = None self.fop = kwargs.pop('fop', None) # super(Modelling, self).__init__(**kwargs) super().__init__(**kwargs) self._regionProperties = {} self._interRegionCouplings = [] self._regionsNeedUpdate = False self._regionChanged = True self._regionManagerInUse = False self.modelTrans = pg.trans.TransLog() # Model transformation operator
def __hash__(self): """Create a hash for Method Manager.""" # ^ pg.utils.dirHash(self._regionProperties) if self._data is not None: return pg.utils.strHash(str(type(self))) ^ hash(self._data) else: return pg.utils.strHash(str(type(self)))
[docs] def Sx(self, x): """Right-hand side multiplication of Jacobian J*x. By default, uses self.jacobian().mult(x) Overwrite for efficient use with gradient-type inversion. """ return self.jacobian().mult(x)
[docs] def STy(self, y): """Right-hand side multiplication of Jacobian J.T*y. By default, uses self.jacobian().transMult(x) Overwrite for efficient use with gradient-type inversion. """ return self.jacobian().transMult(y)
def __call__(self, *args, **kwargs): """Call forward operator.""" return self.response(*args, **kwargs) @property def fop(self): """Forward operator.""" return self._fop @fop.setter def fop(self, fop): """Set forward operator.""" if fop is not None: if not isinstance(fop, pg.frameworks.Modelling): pg.critical('Forward operator needs to be an instance of ' 'pg.modelling.Modelling but is of type:', fop) self._fop = fop @property def data(self): """Return data.""" return self._data @data.setter def data(self, d): """Set data (short property setter).""" self.setData(d)
[docs] def setData(self, data): """Set data (actual version).""" if isinstance(data, pg.DataContainer): self.setDataContainer(data) else: self._data = data
[docs] def setDataPost(self, data): """Call when the dataContainer has been set successfully.""" pass
[docs] def setDataContainer(self, data): """Set Data container.""" if self.fop is not None: pg.critical('in use?') self.fop.setData(data) else: super().setData(data) self._data = data self.setDataPost(self.data)
@property def modelTrans(self): """Return model transformation.""" self._applyRegionProperties() if self.regionManager().haveLocalTrans(): return self.regionManager().transModel() return self._modelTrans @modelTrans.setter def modelTrans(self, tm): """Set model transformation.""" if isinstance(tm, str): tm = pg.core.trans.str2Trans(tm) self._modelTrans = tm
[docs] def regionManager(self): """Region manager.""" self._regionManagerInUse = True # initialize RM if necessary super().regionManager() # set all local properties self._applyRegionProperties() return super().regionManager()
@property def parameterCount(self): """Return parameter count.""" pC = self.regionManager().parameterCount() if pC == 0: pg.warn("Parameter count is 0") return pC
[docs] def ensureContent(self): """Whatever this is.""" pass
[docs] def initModelSpace(self, **kwargs): """TODO.""" pass
[docs] def createDefaultStartModel(self, dataVals): """Create the default startmodel as the median of the data values.""" pg.critical("'don't use me")
[docs] def createStartModel(self, dataVals=None): """Create the default startmodel as the median of the data values. Overwriting might be a good idea. Its used by inversion to create a valid startmodel if there are no starting values from the regions. """ if dataVals is not None: mv = pg.math.median(dataVals) pg.info(f"Use median(data values)={mv}") sm = pg.Vector(self.parameterCount, mv) else: sm = self.regionManager().createStartModel() return sm
[docs] def clearRegionProperties(self): """Clear all region parameter.""" self._regionChanged = True self._regionProperties = {}
[docs] def regionProperties(self, regionNr=None): """Return dictionary of all properties for region number regionNr.""" if regionNr is None: return self._regionProperties try: return self._regionProperties[regionNr] except KeyError: print(self._regionProperties) pg.error("no region for region #:", regionNr)
[docs] def setRegionProperties(self, regionNr, **kwargs): """Set region properties. regionNr can be '*' for all regions. startModel=None, limits=None, trans=None, cType=None, zWeight=None, modelControl=None, background=None, fix=None, single=None, correlationLengths=None, dip=None, strike=None Parameters ---------- regionNr : int, [ints], '*' Region number, list of numbers, or wildcard "*" for all. startModel : float starting model value limits : [float, float] lower and upper limit for value using a barrier transform trans : str transformation for model barrier: "log", "cot", "lin" cType : int constraint (regularization) type zWeight : float relative weight for vertical boundaries background : bool exclude region from inversion completely (prolongation) fix : float exclude region from inversion completely (fix to value) single : bool reduce region to one unknown correlationLengths : [floats] correlation lengths for geostatistical inversion (x', y', z') dip : float [0] angle between x and x' (first correlation length) strike : float [0] angle between y and y' (second correlation length) """ if regionNr == '*': for regionNr in self.regionManager().regionIdxs(): self.setRegionProperties(regionNr, **kwargs) return elif isinstance(regionNr, (list, tuple)): for r in regionNr: self.setRegionProperties(r, **kwargs) return pg.verbose(f'Set property for region: {regionNr}: {kwargs}') if regionNr not in self._regionProperties: self._regionProperties[regionNr] = {'startModel': None, 'modelControl': 1.0, 'zWeight': 1.0, 'cType': None, # RM defaults 'limits': [0, 0], 'trans': 'Log', # RM defauts 'background': None, 'single': None, 'fix': None, 'correlationLengths': None, 'dip': None, 'strike': None, } for key in list(kwargs.keys()): val = kwargs.pop(key) if val is not None and self._regionProperties[regionNr][key] != val: self._regionsNeedUpdate = True self._regionProperties[regionNr][key] = val if len(kwargs) > 0: pg.warn('Unhandled region properties:', kwargs)
[docs] def setInterRegionCoupling(self, region1, region2, weight=1.0): """Set the weighting for constraints across regions.""" region1 = self.regionManager().regionIdxs() if region1 == "*" else [region1] region2 = self.regionManager().regionIdxs() if region2 == "*" else [region2] for reg1 in region1: for reg2 in region2: if reg1 != reg2 and \ (not self._regionProperties[reg1]['background'] and not self._regionProperties[reg2]['background']): self._interRegionCouplings.append([reg1, reg2, weight]) self._regionsNeedUpdate = True
def _applyRegionProperties(self): """Apply the region properties from dictionary into the region man.""" if self._regionsNeedUpdate is False: return # call super class her because self.regionManager() calls allways # __applyRegionProperies itself rMgr = super().regionManager() for rID, vals in self._regionProperties.items(): if vals['fix'] is not None and rMgr.region(rID).fixValue() != vals['fix']: vals['background'] = True rMgr.region(rID).setFixValue(vals['fix']) self._regionChanged = True if (vals['background'] is not None and rMgr.region(rID).isBackground() != vals['background']): rMgr.region(rID).setBackground(vals['background']) self._regionChanged = True if (vals['single'] is not None and rMgr.region(rID).isSingle() != vals['single']): rMgr.region(rID).setSingle(vals['single']) self._regionChanged = True if vals['startModel'] is not None: rMgr.region(rID).setStartModel(vals['startModel']) if vals['trans'] is not None: rMgr.region(rID).setModelTransStr_(vals['trans']) if (vals['cType'] is not None and rMgr.region(rID).constraintType() != vals['cType']): self.clearConstraints() rMgr.region(rID).setConstraintType(vals['cType']) if vals['zWeight'] is not None: rMgr.region(rID).setZWeight(vals['zWeight']) self.clearConstraints() rMgr.region(rID).setModelControl(vals['modelControl']) if vals['limits'][0] != 0: rMgr.region(rID).setLowerBound(vals['limits'][0]) if vals['limits'][1] > vals['limits'][0]: rMgr.region(rID).setUpperBound(vals['limits'][1]) if vals['correlationLengths'] is not None: self.clearConstraints() if vals['dip'] is not None: self.clearConstraints() if vals['strike'] is not None: self.clearConstraints() for r1, r2, w in self._interRegionCouplings: rMgr.setInterRegionConstraint(r1, r2, w) self._regionsNeedUpdate = False
[docs] def setDataSpace(self, **kwargs): """Set data space, e.g., DataContainer, times, coordinates.""" if self.fop is not None: pg.critical('in use?') self.fop.setDataSpace(**kwargs) else: data = kwargs.pop('dataContainer', None) if isinstance(data, pg.DataContainer): self.setDataContainer(data) else: print(data) pg.critical("nothing known to do? " "Implement me in derived classes")
[docs] def estimateError(self, data, **kwargs): """Create data error fallback when the data error is not known. Should be implemented method-specific. """ raise NotImplementedError("Needed?? Implement me in derived classes")
# data = data * (pg.randn(len(data)) * errPerc / 100. + 1.) # return data
[docs] def drawModel(self, ax, model, **kwargs): """Draw a model into a given axis.""" if self.fop is not None: pg.critical('in use?') self.fop.drawModel(ax, model, **kwargs) else: print(kwargs) raise NotImplementedError("No yet implemented")
[docs] def drawData(self, ax, data, **kwargs): """Draw data into a given axis.""" if self.fop is not None: self.fop.drawData(ax, data, **kwargs) else: print(kwargs) raise NotImplementedError("No yet implemented")
[docs] class LinearModelling(Modelling): """Modelling class for linearized problems with a given matrix."""
[docs] def __init__(self, A): """Initialize by storing the (reference to the) matrix.""" super().__init__() self.A = A self.setJacobian(self.A)
[docs] def response(self, model): """Linearized forward modelling by matrix-vector product.""" return self.A * model
[docs] def createJacobian(self, model): """Do not compute a jacobian (linear).""" pass
@property def parameterCount(self): """Define the number of parameters from the matrix size.""" return self.A.cols()
[docs] class Block1DModelling(Modelling): """General forward operator for 1D layered models. Model space: [thickness_i, parameter_jk], with i = 0 - nLayers-1, j = (0 .. nLayers), k=(0 .. nPara) """
[docs] def __init__(self, nPara=1, nLayers=4, **kwargs): """Initialize by setting up model space from layers and parameters. Parameters ---------- nLayers : int [4] Number of layers. nPara : int [1] Number of parameters per layer (e.g. nPara=2 for resistivity and phase) """ self._nLayers = 0 super().__init__(**kwargs) self._withMultiThread = True self._nPara = nPara # number of parameters per layer self.initModelSpace(nLayers)
@property def nPara(self): """Number of parameters.""" return self._nPara @property def nLayers(self): """Number of layers.""" return self._nLayers @nLayers.setter def nLayers(self, nLayers): """Set number of layers.""" return self.initModelSpace(nLayers)
[docs] def initModelSpace(self, nLayers): """Set number of layers for the 1D block model.""" if nLayers == self._nLayers: return self._nLayers = nLayers if nLayers < 2: pg.critical("Number of layers need to be at least 2") mesh = pg.meshtools.createMesh1DBlock(nLayers, self._nPara) self.clearRegionProperties() self.setMesh(mesh) # setting region 0 (layers) and 1..nPara (values) for i in range(1 + self._nPara): self.setRegionProperties(i, trans='log') if self._withMultiThread: self.setMultiThreadJacobian(2*nLayers - 1)
# self._applyRegionProperties()
[docs] def drawModel(self, ax, model, **kwargs): """Draw model into a given axis.""" pg.viewer.mpl.drawModel1D(ax=ax, model=model, plot='loglog', xlabel=kwargs.pop('xlabel', 'Model parameter'), **kwargs) return ax
[docs] def drawData(self, ax, data, err=None, label=None, **kwargs): """Draw the data by default method. Probably ugly and you should overwrite it in your derived class. Modelling creates the data and should know best how to draw them. """ nData = len(data) yVals = range(1, nData+1) ax.loglog(data, yVals, label=label, **DEFAULT_STYLES.get(label, DEFAULT_STYLES['Default']) ) if err is not None: ax.errorbar(data, yVals, xerr=err*data, label='Error', **DEFAULT_STYLES.get('Error', DEFAULT_STYLES['Default']) ) ax.set_ylim(max(yVals), min(yVals)) ax.set_xlabel('Data') ax.set_ylabel('Data Number') return ax
[docs] class MeshModelling(Modelling): """Modelling class with a mesh discretization."""
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self._axs = None self._meshNeedsUpdate = True self._baseMesh = None ## keep a copy of the original mesh # optional p2 refinement for forward task self._refineP2 = False self._refineH2 = True self._pd = None self._C = None # custom Constraints matrix
def __hash__(self): """Return unique hash for caching.""" return super().__hash__() ^ hash(self.mesh()) @property def paraDomain(self): """Return parameter (inverse) mesh.""" # We need our own copy here because its possible that we want to use # the mesh after the fop was deleted if not self.mesh(): pg.critical('paraDomain needs a mesh') self._pd = pg.Mesh(self.regionManager().paraDomain()) return self._pd
[docs] def setCustomConstraints(self, C): """Set custom constraints matrix for lazy evaluation. To remove them set it to 'None' again. """ self._C = C
[docs] def createConstraints(self): """Create constraint matrix.""" # just ensure there is valid mesh # self.mesh() foundGeoStat = False for reg, props in self.regionProperties().items(): if not props['background'] and \ props['correlationLengths'] is not None or \ props['dip'] is not None or props['strike'] is not None: cL = props.get('correlationLengths') or 5 dip = props.get('dip') or 0 strike = props.get('strike') or 0 pg.info('Creating GeostatisticConstraintsMatrix for region' + f' {reg} with: I={cL}, dip={dip}, strike={strike}') if foundGeoStat is True: pg.critical('Only one global GeostatisticConstraintsMatrix' 'possible at the moment.') # keep a copy of C until refcounting in the core works self._C = pg.matrix.GeostatisticConstraintsMatrix( mesh=self.paraDomain, I=cL, dip=dip, strike=strike, ) foundGeoStat = True self.setConstraints(self._C) if foundGeoStat is False: super().createConstraints() return self.constraints()
[docs] def paraModel(self, model): """Return parameter model, i.e. model mapped back with cell markers.""" mod = model[self.paraDomain.cellMarkers()] if isinstance(mod, np.ndarray): mod = pg.Vector(mod) # Else next line fails as np.array does not allow set attributes. mod.isParaModel = True return mod
[docs] def ensureContent(self): """Ensure there is a valid initialized mesh. Initialization means the cell marker are recounted and/or there was a mesh refinement or boundary enlargement, all to fit the needs for the method-depending forward problem. """ # Need to call this once to be sure the mesh is initialized when needed self.mesh()
[docs] def setMeshPost(self, mesh): """Set mesh, called when the mesh has been set successfully. Might be overwritten by child classes. """ pass
[docs] def createRefinedFwdMesh(self, mesh): """Refine the current mesh for higher accuracy. This is called automatic when accessing self.mesh() so it ensures any effect of changing region properties (background, single). """ m = pg.Mesh(mesh) if self._refineH2: pg.info("Creating refined mesh (H2) to solve forward task.") m = m.createH2() if self._refineP2: pg.info("Creating refined mesh (P2) to solve forward task.") m = m.createP2() pg.info("Mesh for forward task:", m) return m
[docs] def createFwdMesh_(self): """Create forward mesh.""" pg.info("Creating forward mesh from region infos.") m = pg.Mesh(self.regionManager().mesh()) regionIds = self.regionManager().regionIdxs() for iId in regionIds: r = self.regionManager().region(iId) pg.verbose(f"\tRegion: {iId}, " f"Parameter: {r.parameterCount()}, " f"PD: {r.isInParaDomain()}," f" Single: {r.isSingle()}, " f"Background: {r.isBackground()}, " f"Fixed: {r.fixValue()}") m = self.createRefinedFwdMesh(m) self.setMeshPost(m) self._regionChanged = False super().setMesh(m, ignoreRegionManager=True) if self._C is not None: pg.info('Set custom constraints matrix.') self.setConstraints(self._C)
[docs] def mesh(self): """Return currently used mesh.""" self._applyRegionProperties() if self._regionManagerInUse and self._regionChanged is True: self.createFwdMesh_() return super().mesh()
[docs] def setMesh(self, mesh, ignoreRegionManager=False): """Set mesh and specify whether region manager can be ignored.""" # pg._b('setMesh', id(mesh), mesh, ignoreRegionManager) # keep a copy, just in case self._baseMesh = mesh if ignoreRegionManager is False: self._regionManagerInUse = True # Modelling without region manager if ignoreRegionManager is True or not self._regionManagerInUse: self._regionManagerInUse = False super(Modelling, self).setMesh(mesh, ignoreRegionManager=True) self.setMeshPost(mesh) return # copy the mesh to the region manager who renumber cell markers self.clearRegionProperties() self.regionManager().setMesh(mesh) self.setDefaultBackground()
[docs] def setDefaultBackground(self): """Set the lowest region to background if several exist.""" regionIds = self.regionManager().regionIdxs() pg.info(f"Found {len(regionIds)} regions.") if len(regionIds) > 1: bk = pg.sort(regionIds)[0] pg.info("Region with smallest marker set to background " f"(marker={bk})") self.setRegionProperties(bk, background=True)
[docs] def drawModel(self, ax, model, **kwargs): """Draw the model as mesh-based distribution.""" mod = None # TODO needs to be checked if mapping is always ok (region example) # is (len(model) == self.paraDomain.cellCount() or \ if hasattr(model, "isParaModel") and model.isParaModel is False: # pg._y(model.isParaModel) mod = self.paraModel(model) elif hasattr(model, "isParaModel") and model.isParaModel is True: # pg._g(model.isParaModel) mod = model elif len(model) == self.paraDomain.nodeCount(): # why nodeCount? a field as model result? # pg._b('node count') mod = model elif len(model) == self.paraDomain.cellCount(): # pg._b('cell count') mod = model else: mod = self.paraModel(model) if ax is None: if self._axs is None: self._axs, _ = pg.show() ax = self._axs if hasattr(ax, '__cBar__'): # we assume the axes already holds a valid mappable and we only # update the model data cBar = ax.__cBar__ kwargs.pop('label', None) kwargs.pop('cMap', None) pg.viewer.mpl.setMappableData(cBar.mappable, mod, **kwargs) else: diam = kwargs.pop('diam', None) ax, cBar = pg.show(mesh=self.paraDomain, data=mod, label=kwargs.pop('label', 'Model parameter'), logScale=kwargs.pop('logScale', False), ax=ax, **kwargs ) if diam is not None: pg.viewer.mpl.drawSensors(ax, self.data.sensors(), diam=diam, edgecolor='black', facecolor='white') return ax, cBar
[docs] class PetroModelling(MeshModelling): """Combine petrophysical relation with the modelling class f(p). Combine petrophysical relation :math:`p(m)` with a modelling class :math:`f(p)` to invert for the petrophysical model :math:`p` instead of the geophysical model :math:`m`. :math:`p` be the petrophysical model, e.g., porosity, saturation, ... :math:`m` be the geophysical model, e.g., slowness, resistivity, ... """
[docs] def __init__(self, fop, petro, **kwargs): """Save forward class and transformation, create Jacobian matrix.""" self._f = fop # self._f createStartModel might be called and depends on the regionMgr self._f.regionManager = self.regionManager # self.createRefinedFwdMesh depends on refinement strategy of self._f self.createRefinedFwdMesh = self._f.createRefinedFwdMesh super().__init__(fop=None, **kwargs) # petroTrans.fwd(): p(m), petroTrans.inv(): m(p) self._petroTrans = petro # class defining p(m) self._jac = pg.matrix.MultRightMatrix(self._f.jacobian()) self.setJacobian(self._jac)
@property def petro(self): """Petrophysical model transformation.""" return self._petroTrans
[docs] def setMeshPost(self, mesh): """Set mesh after init.""" self._f.setMesh(mesh, ignoreRegionManager=True)
[docs] def setDataPost(self, data): """Set data after init.""" self._f.setData(data)
[docs] def createStartModel(self, data): """Use inverse transformation to get m(p) for the starting model.""" sm = self._f.createStartModel(data) pModel = self._petroTrans.inv(sm) return pModel
[docs] def response(self, model): """Use transformation to get p(m) and compute response f(p).""" tModel = self._petroTrans.fwd(model) ret = self._f.response(tModel) return ret
[docs] def createJacobian(self, model): r"""Fill the individual jacobian matrices. J = dF(m) / dm = dF(m) / dp * dp / dm """ tModel = self._petroTrans.fwd(model) self._f.createJacobian(tModel) self._jac.A = self._f.jacobian() self._jac.r = self._petroTrans.deriv(model) # set inner derivative # print(self._jac.A.rows(), self._jac.A.cols()) # print(self._jac.r) # pg._r("create Jacobian", self, self._jac) self.setJacobian(self._jac) # to be sure .. test if necessary
# 220817 to be changed later!! # class JointModelling(Modelling):
[docs] class JointModelling(MeshModelling): """Cumulative (joint) forward operator."""
[docs] def __init__(self, fopList): """Initialize with lists of forward operators.""" super().__init__() self.fops = fopList self.jac = pg.matrix.BlockMatrix() # self.modelTrans = self.fops[0].modelTrans self.modelTrans = pg.trans.TransLogLU() self.fops[0].regionManager() self.setRegionManager(self.fops[0].regionManagerRef())
[docs] def createStartModel(self, data): """Use inverse transformation to get m(p) for the starting model.""" sm = self._f.createStartModel(data) pModel = self._petroTrans.inv(sm) return pModel
[docs] def response(self, model): """Concatenate responses for all fops.""" resp = [] for f in self.fops: resp.extend(f.response(model)) return resp
[docs] def createJacobian(self, model): """Fill the individual Jacobian matrices.""" self.initJacobian() for f in self.fops: f.createJacobian(model)
[docs] def setData(self, data): """Distribute list of data to the forward operators.""" if len(data) != len(self.fops): pg.critical("Please provide data for all forward operators") self._data = data nData = 0 for i, fi in enumerate(self.fops): fi.setData(data[i]) self.jac.addMatrix(fi.jacobian(), nData, 0) nData += data[i].size() # update total vector length self.setJacobian(self.jac)
[docs] def setMesh(self, mesh, **kwargs): # to be removed from here """Set the parameter mesh to all fops.""" for fi in self.fops: fi.setMesh(mesh)
# 220817 to be implemented!! # class JointMeshModelling(JointModelling): # def __init__(self, fopList): # super().__init__(self, fopList) # self.setRegionManager(self.fops[0].regionManagerRef())
[docs] class LCModelling(Modelling): """2D Laterally constrained (LC) modelling. 2D Laterally constrained (LC) modelling based on BlockMatrices. """
[docs] def __init__(self, fop, **kwargs): """Parameters: fop class .""" super().__init__() self._singleRegion = False self._fopTemplate = fop self._fopKwargs = kwargs self._fops1D = [] self._mesh = None self._nSoundings = 0 self._parPerSounding = 0 self._jac = None self.soundingPos = None
[docs] def setDataBasis(self, **kwargs): """Set homogeneous data basis. Set a common data basis to all forward operators. If you want individual you need to set them manually. """ for f in self._fops1D: f.setDataBasis(**kwargs)
[docs] def initModelSpace(self, nLayers): """Initialize model space.""" for f in self._fops1D: f.initModelSpace(nLayers)
[docs] def createDefaultStartModel(self, models): """Create default starting model.""" sm = pg.Vector() for i, f in enumerate(self._fops1D): sm = pg.cat(sm, f.createDefaultStartModel(models[i])) return sm
[docs] def response(self, par): """Cut together forward responses of all soundings.""" mods = np.asarray(par).reshape(self._nSoundings, self._parPerSounding) resp = pg.Vector(0) for i in range(self._nSoundings): r = self._fops1D[i].response(mods[i]) # print("i:", i, mods[i], r) resp = pg.cat(resp, r) return resp
[docs] def createJacobian(self, par): """Create Jacobian matrix by creating individual Jacobians.""" mods = np.asarray(par).reshape(self._nSoundings, self._parPerSounding) for i in range(self._nSoundings): self._fops1D[i].createJacobian(mods[i])
[docs] def createParametrization(self, nSoundings, nLayers=4, nPar=1): """Create LCI mesh and suitable constraints informations. Parameters ---------- nLayers : int Numbers of depth layers nSoundings : int Numbers of 1D measurements to laterally constrain nPar : int Numbers of independent parameter types, e.g., nPar = 1 for VES (invert for resisitivies), nPar = 2 for VESC (invert for resisitivies and phases) """ nCols = (nPar+1) * nLayers - 1 # fail for VES-C self._parPerSounding = nCols self._nSoundings = nSoundings self._mesh = pg.meshtools.createMesh2D(range(nCols + 1), range(nSoundings + 1)) self._mesh.rotate(pg.RVector3(0, 0, -np.pi/2)) cm = np.ones(nCols * nSoundings) * 1 if not self._singleRegion: for i in range(nSoundings): for j in range(nPar): cm[i * self._parPerSounding + (j+1) * nLayers-1: i * self._parPerSounding + (j+2) * nLayers-1] += (j+1) self._mesh.setCellMarkers(cm) self.setMesh(self._mesh) pID = self.regionManager().paraDomain().cellMarkers() cID = [c.id() for c in self._mesh.cells()] # print(np.array(pID)) # print(np.array(cID)) # print(self.parameterCount perm = [0]*self.parameterCount for i in range(len(perm)): perm[pID[i]] = cID[i] # print(perm) self.regionManager().permuteParameterMarker(perm)
# print(self.regionManager().paraDomain().cellMarkers())
[docs] def initJacobian(self, dataVals, nLayers, nPar=None): """Initialize Jacobian matrix. Parameters ---------- dataVals : ndarray | RMatrix | list Data values of size (nSounding x Data per sounding). All data per sounding need to be equal in length. If they don't fit into a matrix use list of sounding data. """ nSoundings = len(dataVals) if nPar is None: # TODO get nPar Infos from fop._fopTemplate nPar = 1 self.createParametrization(nSoundings, nLayers=nLayers, nPar=nPar) if self._jac is not None: self._jac.clear() else: self._jac = pg.matrix.BlockMatrix() self.fops1D = [] nData = 0 for i in range(nSoundings): kwargs = {} for key, val in self._fopKwargs.items(): if hasattr(val, '__iter__'): if len(val) == nSoundings: kwargs[key] = val[i] else: kwargs[key] = val f = None if issubclass(self._fopTemplate, pg.frameworks.Modelling): f = self._fopTemplate(**kwargs) else: f = type(self._fopTemplate)(self.verbose, **kwargs) f.setMultiThreadJacobian(self._parPerSounding) self._fops1D.append(f) nID = self._jac.addMatrix(f.jacobian()) self._jac.addMatrixEntry(nID, nData, self._parPerSounding * i) nData += len(dataVals[i]) self._jac.recalcMatrixSize() # print("Jacobian size:", self.J.rows(), self.J.cols(), nData) self.setJacobian(self._jac)
[docs] def drawModel(self, ax, model, **kwargs): """Draw models as stitched 1D model section.""" mods = np.asarray(model).reshape(self._nSoundings, self._parPerSounding) pg.viewer.mpl.showStitchedModels(mods, ax=ax, useMesh=True, x=self.soundingPos, **kwargs)
[docs] class ParameterModelling(Modelling): """Model with symbolic parameter names instead of numbers."""
[docs] def __init__(self, funct=None, **kwargs): """Initialize, optionally with given function.""" self.function = None self._params = {} self.dataSpace = None # x, t freqs, or whatever self.defaultModelTrans = 'lin' super().__init__(**kwargs) if funct is not None: self._initFunction(funct)
@property def params(self): """Return number of parameters.""" return self._params def _initFunction(self, funct): """Init any function and interpret possible args and kwargs.""" self.function = funct # the first varname is suposed to be f or freqs self.dataSpaceName = funct.__code__.co_varnames[0] pg.debug('data space:', self.dataSpaceName) args = funct.__code__.co_varnames[1:funct.__code__.co_argcount] for varname in args: if varname != 'verbose': pg.debug('add parameter:', varname) self._params[varname] = 0.0 # nPara = len(self._params.keys()) # not used! # for i, [k, p] in enumerate(self._params.items()): for i, k in enumerate(self._params.keys()): self.addParameter(k, id=i, cType=0, single=True, trans=self.defaultModelTrans, startModel=1)
[docs] def response(self, params): """Compute and return model response.""" if np.isnan([*params]).any(): print(params) pg.critical('invalid params for response') if self.dataSpace is None: pg.critical('no data space given') ret = self.function(self.dataSpace, *params) return ret
[docs] def setRegionProperties(self, k, **kwargs): """Set Region Properties by parameter name.""" if isinstance(k, int) or (k == '*'): super().setRegionProperties(k, **kwargs) else: self.setRegionProperties(self._params[k], **kwargs)
[docs] def addParameter(self, name, id=None, **kwargs): """Add a parameter.""" if id is None: id = len(self._params) self._params[name] = id self.regionManager().addRegion(id) self.setRegionProperties(name, **kwargs) return id
[docs] def drawModel(self, ax, model): """Draw model.""" label = '' for k, p in self._params.items(): label += k + f"={pg.utils.prettyFloat(model[p])} " pg.info("Model: ", label)
[docs] class PriorModelling(MeshModelling): """Forward operator for grabbing values out of a mesh (prior data)."""
[docs] def __init__(self, mesh=None, pos=None, **kwargs): """Init with mesh and some positions that are converted into ids.""" super().__init__(**kwargs) self.pos = pos if mesh is not None: self.setMesh(mesh)
[docs] def setMesh(self, mesh): """Set mesh, save index vector and compute Jacobian.""" super().setMesh(mesh) self.ind = np.zeros(len(self.pos), dtype=np.int32) for i, po in enumerate(self.pos): cell = mesh.findCell(po) if cell is None: raise IndexError(f"Could not find cell at position {po}!") else: self.ind[i] = cell.id() # self.ind = np.array([mesh.findCell(po).id() for po in self.pos]) self.J = pg.SparseMapMatrix() self.J.resize(len(self.ind), mesh.cellCount()) for i, ii in enumerate(self.ind): self.J.setVal(i, ii, 1.0) self.setJacobian(self.J)
[docs] def response(self, model): """Return values at the indexed cells.""" return model[self.ind]
[docs] def createJacobian(self, model): """Do nothing (linear).""" pass
[docs] def createRefinedFwdMesh(self, mesh): """Create refined forward mesh: do nothing here to prevent this.""" return mesh