#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""pyGIMLi - Inversion Frameworks.
These are basic modelling proxies.
"""
import numpy as np
import pygimli as pg
DEFAULT_STYLES = {
'Default': {
'color': 'C0',
'lw': 1.5,
'linestyle': '-'
},
'Data': {
'color': 'C0', # blueish
'lw': 1.5,
'linestyle': ':',
'marker': 'o',
'ms': 6
},
'Response': {
'color': 'C0', # blueish
'lw': 2.0,
'linestyle': '-',
'marker': 'None',
'alpha': 0.4
},
'Error': {
'color': 'C3', # reddish
'lw': 0,
'linestyle': '-',
'elinewidth': 2,
'alpha': 0.5
},
}
[docs]
class Modelling(pg.core.ModellingBase):
"""Abstract Forward Operator.
Abstract Forward Operator that is or can use a Modelling instance.
Can be seen as some kind of proxy Forward Operator.
Todo
----
* Docu:
- describe members (model transformation,
dictionary of region properties)
* think about splitting all mesh related into MeshModelling
* clarify difference: setData(array|DC), setDataContainer(DC),
setDataValues(array)
* clarify dataSpace(comp. ModelSpace):
The unique spatial or temporal origin of a datapoint
(time, coordinates, receiver/transmitter indices, counter)
- Every inversion needs, dataValues and dataSpace
- DataContainer contain, dataValues and dataSpace
- initialize both with initDataSpace(), initModelSpace
* createJacobian should also return J
"""
[docs]
def __init__(self, **kwargs):
"""Initialize.
Attributes
----------
fop : pg.frameworks.Modelling
data : pg.DataContainer
modelTrans : [pg.trans.TransLog()]
Parameters
----------
**kwargs :
fop : Modelling
"""
self._fop = None # pg.frameworks.Modelling .. not needed .. remove it
self._data = None # dataContainer
self._modelTrans = None
self.fop = kwargs.pop('fop', None)
# super(Modelling, self).__init__(**kwargs)
super().__init__(**kwargs)
self._regionProperties = {}
self._interRegionCouplings = []
self._regionsNeedUpdate = False
self._regionChanged = True
self._regionManagerInUse = False
self.modelTrans = pg.trans.TransLog() # Model transformation operator
def __hash__(self):
"""Create a hash for Method Manager."""
# ^ pg.utils.dirHash(self._regionProperties)
if self._data is not None:
return pg.utils.strHash(str(type(self))) ^ hash(self._data)
else:
return pg.utils.strHash(str(type(self)))
[docs]
def Sx(self, x):
"""Right-hand side multiplication of Jacobian J*x.
By default, uses self.jacobian().mult(x)
Overwrite for efficient use with gradient-type inversion.
"""
return self.jacobian().mult(x)
[docs]
def STy(self, y):
"""Right-hand side multiplication of Jacobian J.T*y.
By default, uses self.jacobian().transMult(x)
Overwrite for efficient use with gradient-type inversion.
"""
return self.jacobian().transMult(y)
def __call__(self, *args, **kwargs):
"""Call forward operator."""
return self.response(*args, **kwargs)
@property
def fop(self):
"""Forward operator."""
return self._fop
@fop.setter
def fop(self, fop):
"""Set forward operator."""
if fop is not None:
if not isinstance(fop, pg.frameworks.Modelling):
pg.critical('Forward operator needs to be an instance of '
'pg.modelling.Modelling but is of type:', fop)
self._fop = fop
@property
def data(self):
"""Return data."""
return self._data
@data.setter
def data(self, d):
"""Set data (short property setter)."""
self.setData(d)
[docs]
def setData(self, data):
"""Set data (actual version)."""
if isinstance(data, pg.DataContainer):
self.setDataContainer(data)
else:
self._data = data
[docs]
def setDataPost(self, data):
"""Called when the dataContainer has been set sucessfully."""
pass
[docs]
def setDataContainer(self, data):
"""Set Data container."""
if self.fop is not None:
pg.critical('in use?')
self.fop.setData(data)
else:
super().setData(data)
self._data = data
self.setDataPost(self.data)
@property
def modelTrans(self):
"""Return model transformation."""
self._applyRegionProperties()
if self.regionManager().haveLocalTrans():
return self.regionManager().transModel()
return self._modelTrans
@modelTrans.setter
def modelTrans(self, tm):
"""Set model transformation."""
if isinstance(tm, str):
if tm.lower() == "log":
tm = pg.trans.TransLog()
elif tm.lower() == "linear" or tm.lower() == "lin":
tm = pg.trans.Trans()
else: # something like "10-1000"
raise Exception("Could not use transformation" + tm)
self._modelTrans = tm
[docs]
def regionManager(self):
"""Region manager."""
self._regionManagerInUse = True
# initialize RM if necessary
super().regionManager()
# set all local properties
self._applyRegionProperties()
return super().regionManager()
@property
def parameterCount(self):
"""Return parameter count."""
pC = self.regionManager().parameterCount()
if pC == 0:
pg.warn("Parameter count is 0")
return pC
[docs]
def ensureContent(self):
"""Whatever this is."""
pass
[docs]
def initModelSpace(self, **kwargs):
"""TODO."""
pass
[docs]
def createDefaultStartModel(self, dataVals):
"""Create the default startmodel as the median of the data values."""
pg.critical("'don't use me")
[docs]
def createStartModel(self, dataVals=None):
"""Create the default startmodel as the median of the data values.
Overwriting might be a good idea.
Its used by inversion to create a valid startmodel if there are
no starting values from the regions.
"""
if dataVals is not None:
mv = pg.math.median(dataVals)
pg.info("Use median(data values)={0}".format(mv))
sm = pg.Vector(self.parameterCount, mv)
else:
sm = self.regionManager().createStartModel()
return sm
[docs]
def clearRegionProperties(self):
"""Clear all region parameter."""
self._regionChanged = True
self._regionProperties = {}
[docs]
def regionProperties(self, regionNr=None):
"""Return dictionary of all properties for region number regionNr."""
if regionNr is None:
return self._regionProperties
try:
return self._regionProperties[regionNr]
except KeyError:
print(self._regionProperties)
pg.error("no region for region #:", regionNr)
[docs]
def setRegionProperties(self, regionNr, **kwargs):
"""Set region properties. regionNr can be '*' for all regions.
startModel=None, limits=None, trans=None,
cType=None, zWeight=None, modelControl=None,
background=None, fix=None, single=None,
correlationLengths=None, dip=None, strike=None
Parameters
----------
regionNr : int, [ints], '*'
Region number, list of numbers, or wildcard "*" for all.
startModel : float
starting model value
limits : [float, float]
lower and upper limit for value using a barrier transform
trans : str
transformation for model barrier: "log", "cot", "lin"
cType : int
constraint (regularization) type
zWeight : float
relative weight for vertical boundaries
background : bool
exclude region from inversion completely (prolongation)
fix : float
exclude region from inversion completely (fix to value)
single : bool
reduce region to one unknown
correlationLengths : [floats]
correlation lengths for geostatistical inversion (x', y', z')
dip : float [0]
angle between x and x' (first correlation length)
strike : float [0]
angle between y and y' (second correlation length)
"""
if regionNr == '*':
for regionNr in self.regionManager().regionIdxs():
self.setRegionProperties(regionNr, **kwargs)
return
elif isinstance(regionNr, (list, tuple)):
for r in regionNr:
self.setRegionProperties(r, **kwargs)
return
pg.verbose(f'Set property for region: {regionNr}: {kwargs}')
if regionNr not in self._regionProperties:
self._regionProperties[regionNr] = {'startModel': None,
'modelControl': 1.0,
'zWeight': 1.0,
'cType': None, # RM defaults
'limits': [0, 0],
'trans': 'Log', # RM defauts
'background': None,
'single': None,
'fix': None,
'correlationLengths': None,
'dip': None,
'strike': None,
}
for key in list(kwargs.keys()):
val = kwargs.pop(key)
if val is not None:
if self._regionProperties[regionNr][key] != val:
self._regionsNeedUpdate = True
self._regionProperties[regionNr][key] = val
if len(kwargs) > 0:
pg.warn('Unhandled region properties:', kwargs)
[docs]
def setInterRegionCoupling(self, region1, region2, weight=1.0):
"""Set the weighting for constraints across regions."""
if region1 == "*":
region1 = self.regionManager().regionIdxs()
else:
region1 = [region1]
if region2 == "*":
region2 = self.regionManager().regionIdxs()
else:
region2 = [region2]
for reg1 in region1:
for reg2 in region2:
if reg1 != reg2 and \
(not self._regionProperties[reg1]['background'] and
not self._regionProperties[reg2]['background']):
self._interRegionCouplings.append([reg1, reg2, weight])
self._regionsNeedUpdate = True
def _applyRegionProperties(self):
"""Apply the region properties from dictionary into the region man."""
if self._regionsNeedUpdate is False:
return
# call super class her because self.regionManager() calls allways
# __applyRegionProperies itself
rMgr = super().regionManager()
for rID, vals in self._regionProperties.items():
if vals['fix'] is not None:
if rMgr.region(rID).fixValue() != vals['fix']:
vals['background'] = True
rMgr.region(rID).setFixValue(vals['fix'])
self._regionChanged = True
if vals['background'] is not None:
if rMgr.region(rID).isBackground() != vals['background']:
rMgr.region(rID).setBackground(vals['background'])
self._regionChanged = True
if vals['single'] is not None:
if rMgr.region(rID).isSingle() != vals['single']:
rMgr.region(rID).setSingle(vals['single'])
self._regionChanged = True
if vals['startModel'] is not None:
rMgr.region(rID).setStartModel(vals['startModel'])
if vals['trans'] is not None:
rMgr.region(rID).setModelTransStr_(vals['trans'])
if vals['cType'] is not None:
if rMgr.region(rID).constraintType() != vals['cType']:
self.clearConstraints()
rMgr.region(rID).setConstraintType(vals['cType'])
if vals['zWeight'] is not None:
rMgr.region(rID).setZWeight(vals['zWeight'])
self.clearConstraints()
rMgr.region(rID).setModelControl(vals['modelControl'])
if vals['limits'][0] != 0:
rMgr.region(rID).setLowerBound(vals['limits'][0])
if vals['limits'][1] > vals['limits'][0]:
rMgr.region(rID).setUpperBound(vals['limits'][1])
if vals['correlationLengths'] is not None:
self.clearConstraints()
if vals['dip'] is not None:
self.clearConstraints()
if vals['strike'] is not None:
self.clearConstraints()
for r1, r2, w in self._interRegionCouplings:
rMgr.setInterRegionConstraint(r1, r2, w)
self._regionsNeedUpdate = False
[docs]
def setDataSpace(self, **kwargs):
"""Set data space, e.g., DataContainer, times, coordinates."""
if self.fop is not None:
pg.critical('in use?')
self.fop.setDataSpace(**kwargs)
else:
data = kwargs.pop('dataContainer', None)
if isinstance(data, pg.DataContainer):
self.setDataContainer(data)
else:
print(data)
pg.critical("nothing known to do? "
"Implement me in derived classes")
[docs]
def estimateError(self, data, **kwargs):
"""Create data error fallback when the data error is not known.
Should be implemented method-specific.
"""
raise Exception("Needed?? Implement me in derived classes")
# data = data * (pg.randn(len(data)) * errPerc / 100. + 1.)
# return data
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw a model into a given axis."""
if self.fop is not None:
pg.critical('in use?')
self.fop.drawModel(ax, model, **kwargs)
else:
print(kwargs)
raise Exception("No yet implemented")
[docs]
def drawData(self, ax, data, **kwargs):
"""Draw data into a given axis."""
if self.fop is not None:
self.fop.drawData(ax, data, **kwargs)
else:
print(kwargs)
raise Exception("No yet implemented")
[docs]
class LinearModelling(Modelling):
"""Modelling class for linearized problems with a given matrix."""
[docs]
def __init__(self, A):
"""Initialize by storing the (reference to the) matrix."""
super().__init__()
self.A = A
self.setJacobian(self.A)
[docs]
def response(self, model):
"""Linearized forward modelling by matrix-vector product."""
return self.A * model
[docs]
def createJacobian(self, model):
"""Do not compute a jacobian (linear)."""
pass
@property
def parameterCount(self):
"""Define the number of parameters from the matrix size."""
return self.A.cols()
[docs]
class Block1DModelling(Modelling):
"""General forward operator for 1D layered models.
Model space: [thickness_i, parameter_jk],
with i = 0 - nLayers-1, j = (0 .. nLayers), k=(0 .. nPara)
"""
[docs]
def __init__(self, nPara=1, nLayers=4, **kwargs):
"""Constructor.
Parameters
----------
nLayers : int [4]
Number of layers.
nPara : int [1]
Number of parameters per layer
(e.g. nPara=2 for resistivity and phase)
"""
self._nLayers = 0
super(Block1DModelling, self).__init__(**kwargs)
self._withMultiThread = True
self._nPara = nPara # number of parameters per layer
self.initModelSpace(nLayers)
@property
def nPara(self):
"""Number of parameters."""
return self._nPara
@property
def nLayers(self):
"""Number of layers."""
return self._nLayers
@nLayers.setter
def nLayers(self, nLayers):
"""Set number of layers."""
return self.initModelSpace(nLayers)
[docs]
def initModelSpace(self, nLayers):
"""Set number of layers for the 1D block model."""
if nLayers == self._nLayers:
return
self._nLayers = nLayers
if nLayers < 2:
pg.critical("Number of layers need to be at least 2")
mesh = pg.meshtools.createMesh1DBlock(nLayers, self._nPara)
self.clearRegionProperties()
self.setMesh(mesh)
# setting region 0 (layers) and 1..nPara (values)
for i in range(1 + self._nPara):
self.setRegionProperties(i, trans='log')
if self._withMultiThread:
self.setMultiThreadJacobian(2*nLayers - 1)
# self._applyRegionProperties()
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw model into a given axis."""
pg.viewer.mpl.drawModel1D(ax=ax,
model=model,
plot='loglog',
xlabel=kwargs.pop('xlabel',
'Model parameter'),
**kwargs)
return ax
[docs]
def drawData(self, ax, data, err=None, label=None, **kwargs):
"""Default data view.
Modelling creates the data and should know best how to draw them.
Probably ugly and you should overwrite it in your derived forward
operator.
"""
nData = len(data)
yVals = range(1, nData+1)
ax.loglog(data, yVals,
label=label,
**DEFAULT_STYLES.get(label, DEFAULT_STYLES['Default'])
)
if err is not None:
ax.errorbar(data, yVals,
xerr=err*data,
label='Error',
**DEFAULT_STYLES.get('Error',
DEFAULT_STYLES['Default'])
)
ax.set_ylim(max(yVals), min(yVals))
ax.set_xlabel('Data')
ax.set_ylabel('Data Number')
return ax
[docs]
class MeshModelling(Modelling):
"""Modelling class with a mesh discretization."""
[docs]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._axs = None
self._meshNeedsUpdate = True
self._baseMesh = None
# optional p2 refinement for forward task
self._refineP2 = False
self._refineH2 = True
self._pd = None
self._C = None # custom Constraints matrix
def __hash__(self):
"""Unique hash for caching."""
return super().__hash__() ^ hash(self.mesh())
@property
def mesh(self):
"""Return mesh."""
pg._r("inuse ?")
if self._fop is not None:
pg._r("inuse ?")
return self._fop.mesh
else:
return self.mesh()
@property
def paraDomain(self):
"""Return parameter (inverse) mesh."""
# We need our own copy here because its possible that we want to use
# the mesh after the fop was deleted
if not self.mesh():
pg.critical('paraDomain needs a mesh')
self._pd = pg.Mesh(self.regionManager().paraDomain())
return self._pd
[docs]
def setCustomConstraints(self, C):
""" Set custom constraints matrix for lazy evaluation.
To remove them set it to 'None' again.
"""
self._C = C
[docs]
def createConstraints(self):
"""Create constraint matrix."""
# just ensure there is valid mesh
# self.mesh()
foundGeoStat = False
for reg, props in self.regionProperties().items():
if not props['background'] and \
props['correlationLengths'] is not None or \
props['dip'] is not None or props['strike'] is not None:
cL = props.get('correlationLengths') or 5
dip = props.get('dip') or 0
strike = props.get('strike') or 0
pg.info('Creating GeostatisticConstraintsMatrix for region' +
f' {reg} with: I={cL}, dip={dip}, strike={strike}')
if foundGeoStat is True:
pg.critical('Only one global GeostatisticConstraintsMatrix'
'possible at the moment.')
# keep a copy of C until refcounting in the core works
self._C = pg.matrix.GeostatisticConstraintsMatrix(
mesh=self.paraDomain, I=cL, dip=dip, strike=strike,
)
foundGeoStat = True
self.setConstraints(self._C)
if foundGeoStat is False:
super().createConstraints()
return self.constraints()
[docs]
def paraModel(self, model):
"""Return parameter model, i.e. model mapped back with cell markers."""
mod = model[self.paraDomain.cellMarkers()]
if isinstance(mod, np.ndarray):
mod = pg.Vector(mod)
# Else next line fails as np.array does not allow set attributes.
mod.isParaModel = True
return mod
[docs]
def ensureContent(self):
"""Internal function to ensure there is a valid initialized mesh.
Initialization means the cell marker are recounted and/or there was a
mesh refinement or boundary enlargement, all to fit the needs for the
method-depending forward problem.
"""
# Need to call this once to be sure the mesh is initialized when needed
self.mesh()
[docs]
def setMeshPost(self, mesh):
"""Interface to be called when the mesh has been set successfully.
Might be overwritten by child classes.
"""
pass
[docs]
def createRefinedFwdMesh(self, mesh):
"""Refine the current mesh for higher accuracy.
This is called automatic when accessing self.mesh() so it ensures any
effect of changing region properties (background, single).
"""
m = pg.Mesh(mesh)
if self._refineH2:
pg.info("Creating refined mesh (H2) to solve forward task.")
m = m.createH2()
if self._refineP2:
pg.info("Creating refined mesh (P2) to solve forward task.")
m = m.createP2()
pg.info("Mesh for forward task:", m)
return m
[docs]
def createFwdMesh_(self):
"""Create forward mesh."""
pg.info("Creating forward mesh from region infos.")
m = pg.Mesh(self.regionManager().mesh())
regionIds = self.regionManager().regionIdxs()
for iId in regionIds:
pg.verbose("\tRegion: {0}, Parameter: {1}, PD: {2},"
" Single: {3}, Background: {4}, Fixed: {5}".format(
iId,
self.regionManager().region(iId).parameterCount(),
self.regionManager().region(iId).isInParaDomain(),
self.regionManager().region(iId).isSingle(),
self.regionManager().region(iId).isBackground(),
self.regionManager().region(iId).fixValue()))
m = self.createRefinedFwdMesh(m)
self.setMeshPost(m)
self._regionChanged = False
super().setMesh(m, ignoreRegionManager=True)
if self._C is not None:
pg.info('Set custom constraints matrix.')
self.setConstraints(self._C)
[docs]
def mesh(self):
"""Returns the currently used mesh."""
self._applyRegionProperties()
if self._regionManagerInUse and self._regionChanged is True:
self.createFwdMesh_()
return super().mesh()
[docs]
def setMesh(self, mesh, ignoreRegionManager=False):
"""Set mesh and specify whether region manager can be ignored."""
# keep a copy, just in case
self._baseMesh = mesh
if ignoreRegionManager is False:
self._regionManagerInUse = True
# Modelling without region manager
if ignoreRegionManager is True or not self._regionManagerInUse:
self._regionManagerInUse = False
if self.fop is not None:
pg.critical('in use?')
self.fop.setMesh(mesh, ignoreRegionManager=True)
else:
super(Modelling, self).setMesh(mesh, ignoreRegionManager=True)
pass
self.setMeshPost(mesh)
return
# copy the mesh to the region manager who renumber cell markers
self.clearRegionProperties()
self.regionManager().setMesh(mesh)
self.setDefaultBackground()
[docs]
def setDefaultBackground(self):
"""Set the lowest region to background if several exist."""
regionIds = self.regionManager().regionIdxs()
pg.info("Found {} regions.".format(len(regionIds)))
if len(regionIds) > 1:
bk = pg.sort(regionIds)[0]
pg.info("Region with smallest marker set to background "
"(marker={0})".format(bk))
self.setRegionProperties(bk, background=True)
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw the model as mesh-based distribution."""
mod = None
# TODO needs to be checked if mapping is always ok (region example)
# is (len(model) == self.paraDomain.cellCount() or \
if hasattr(model, "isParaModel") and model.isParaModel is False:
# pg._y(model.isParaModel)
mod = self.paraModel(model)
elif hasattr(model, "isParaModel") and model.isParaModel is True:
# pg._g(model.isParaModel)
mod = model
elif len(model) == self.paraDomain.nodeCount():
# why nodeCount? a field as model result?
# pg._b('node count')
mod = model
elif len(model) == self.paraDomain.cellCount():
# pg._b('cell count')
mod = model
else:
mod = self.paraModel(model)
if ax is None:
if self._axs is None:
self._axs, _ = pg.show()
ax = self._axs
if hasattr(ax, '__cBar__'):
# we assume the axes already holds a valid mappable and we only
# update the model data
cBar = ax.__cBar__
kwargs.pop('label', None)
kwargs.pop('cMap', None)
pg.viewer.mpl.setMappableData(cBar.mappable, mod, **kwargs)
else:
diam = kwargs.pop('diam', None)
ax, cBar = pg.show(mesh=self.paraDomain,
data=mod,
label=kwargs.pop('label', 'Model parameter'),
logScale=kwargs.pop('logScale', False),
ax=ax,
**kwargs
)
if diam is not None:
pg.viewer.mpl.drawSensors(ax, self.data.sensors(), diam=diam,
edgecolor='black', facecolor='white')
return ax, cBar
[docs]
class PetroModelling(MeshModelling):
"""Combine petrophysical relation with the modelling class f(p).
Combine petrophysical relation :math:`p(m)` with a modelling class
:math:`f(p)` to invert for the petrophysical model :math:`p` instead
of the geophysical model :math:`m`.
:math:`p` be the petrophysical model, e.g., porosity, saturation, ...
:math:`m` be the geophysical model, e.g., slowness, resistivity, ...
"""
[docs]
def __init__(self, fop, petro, **kwargs):
"""Save forward class and transformation, create Jacobian matrix."""
self._f = fop
# self._f createStartModel might be called and depends on the regionMgr
self._f.regionManager = self.regionManager
# self.createRefinedFwdMesh depends on refinement strategy of self._f
self.createRefinedFwdMesh = self._f.createRefinedFwdMesh
super(PetroModelling, self).__init__(fop=None, **kwargs)
# petroTrans.fwd(): p(m), petroTrans.inv(): m(p)
self._petroTrans = petro # class defining p(m)
self._jac = pg.matrix.MultRightMatrix(self._f.jacobian())
self.setJacobian(self._jac)
@property
def petro(self):
"""Petrophysical model transformation."""
return self._petroTrans
[docs]
def setMeshPost(self, mesh):
"""Set mesh after init."""
self._f.setMesh(mesh, ignoreRegionManager=True)
[docs]
def setDataPost(self, data):
"""Set data after init."""
self._f.setData(data)
[docs]
def createStartModel(self, data):
"""Use inverse transformation to get m(p) for the starting model."""
sm = self._f.createStartModel(data)
pModel = self._petroTrans.inv(sm)
return pModel
[docs]
def response(self, model):
"""Use transformation to get p(m) and compute response f(p)."""
tModel = self._petroTrans.fwd(model)
ret = self._f.response(tModel)
return ret
[docs]
def createJacobian(self, model):
r"""Fill the individual jacobian matrices.
J = dF(m) / dm = dF(m) / dp * dp / dm
"""
tModel = self._petroTrans.fwd(model)
self._f.createJacobian(tModel)
self._jac.A = self._f.jacobian()
self._jac.r = self._petroTrans.deriv(model) # set inner derivative
# print(self._jac.A.rows(), self._jac.A.cols())
# print(self._jac.r)
# pg._r("create Jacobian", self, self._jac)
self.setJacobian(self._jac) # to be sure .. test if necessary
# 220817 to be changed later!!
# class JointModelling(Modelling):
[docs]
class JointModelling(MeshModelling):
"""Cumulative (joint) forward operator."""
[docs]
def __init__(self, fopList):
"""Initialize with lists of forward operators."""
super().__init__()
self.fops = fopList
self.jac = pg.matrix.BlockMatrix()
# self.modelTrans = self.fops[0].modelTrans
self.modelTrans = pg.trans.TransLogLU()
self.fops[0].regionManager()
self.setRegionManager(self.fops[0].regionManagerRef())
[docs]
def createStartModel(self, data):
"""Use inverse transformation to get m(p) for the starting model."""
sm = self._f.createStartModel(data)
pModel = self._petroTrans.inv(sm)
return pModel
[docs]
def response(self, model):
"""Concatenate responses for all fops."""
resp = []
for f in self.fops:
resp.extend(f.response(model))
return resp
[docs]
def createJacobian(self, model):
"""Fill the individual Jacobian matrices."""
self.initJacobian()
for f in self.fops:
f.createJacobian(model)
[docs]
def setData(self, data):
"""Distribute list of data to the forward operators."""
if len(data) != len(self.fops):
pg.critical("Please provide data for all forward operators")
self._data = data
nData = 0
for i, fi in enumerate(self.fops):
fi.setData(data[i])
self.jac.addMatrix(fi.jacobian(), nData, 0)
nData += data[i].size() # update total vector length
self.setJacobian(self.jac)
[docs]
def setMesh(self, mesh, **kwargs): # to be removed from here
"""Set the parameter mesh to all fops."""
for fi in self.fops:
fi.setMesh(mesh)
# 220817 to be implemented!!
# class JointMeshModelling(JointModelling):
# def __init__(self, fopList):
# super().__init__(self, fopList)
# self.setRegionManager(self.fops[0].regionManagerRef())
[docs]
class LCModelling(Modelling):
"""2D Laterally constrained (LC) modelling.
2D Laterally constrained (LC) modelling based on BlockMatrices.
"""
[docs]
def __init__(self, fop, **kwargs):
"""Parameters: fop class ."""
super(LCModelling, self).__init__()
self._singleRegion = False
self._fopTemplate = fop
self._fopKwargs = kwargs
self._fops1D = []
self._mesh = None
self._nSoundings = 0
self._parPerSounding = 0
self._jac = None
self.soundingPos = None
[docs]
def setDataBasis(self, **kwargs):
"""Set homogeneous data basis.
Set a common data basis to all forward operators.
If you want individual you need to set them manually.
"""
for f in self._fops1D:
f.setDataBasis(**kwargs)
[docs]
def initModelSpace(self, nLayers):
"""Initialize model space."""
for i, f in enumerate(self._fops1D):
f.initModelSpace(nLayers)
[docs]
def createDefaultStartModel(self, models):
"""Create default starting model."""
sm = pg.Vector()
for i, f in enumerate(self._fops1D):
sm = pg.cat(sm, f.createDefaultStartModel(models[i]))
return sm
[docs]
def response(self, par):
"""Cut together forward responses of all soundings."""
mods = np.asarray(par).reshape(self._nSoundings, self._parPerSounding)
resp = pg.Vector(0)
for i in range(self._nSoundings):
r = self._fops1D[i].response(mods[i])
# print("i:", i, mods[i], r)
resp = pg.cat(resp, r)
return resp
[docs]
def createJacobian(self, par):
"""Create Jacobian matrix by creating individual Jacobians."""
mods = np.asarray(par).reshape(self._nSoundings, self._parPerSounding)
for i in range(self._nSoundings):
self._fops1D[i].createJacobian(mods[i])
[docs]
def createParametrization(self, nSoundings, nLayers=4, nPar=1):
"""Create LCI mesh and suitable constraints informations.
Parameters
----------
nLayers : int
Numbers of depth layers
nSoundings : int
Numbers of 1D measurements to laterally constrain
nPar : int
Numbers of independent parameter types,
e.g., nPar = 1 for VES (invert for resisitivies),
nPar = 2 for VESC (invert for resisitivies and phases)
"""
nCols = (nPar+1) * nLayers - 1 # fail for VES-C
self._parPerSounding = nCols
self._nSoundings = nSoundings
self._mesh = pg.meshtools.createMesh2D(range(nCols + 1),
range(nSoundings + 1))
self._mesh.rotate(pg.RVector3(0, 0, -np.pi/2))
cm = np.ones(nCols * nSoundings) * 1
if not self._singleRegion:
for i in range(nSoundings):
for j in range(nPar):
cm[i * self._parPerSounding + (j+1) * nLayers-1:
i * self._parPerSounding + (j+2) * nLayers-1] += (j+1)
self._mesh.setCellMarkers(cm)
self.setMesh(self._mesh)
pID = self.regionManager().paraDomain().cellMarkers()
cID = [c.id() for c in self._mesh.cells()]
# print(np.array(pID))
# print(np.array(cID))
# print(self.parameterCount
perm = [0]*self.parameterCount
for i in range(len(perm)):
perm[pID[i]] = cID[i]
# print(perm)
self.regionManager().permuteParameterMarker(perm)
# print(self.regionManager().paraDomain().cellMarkers())
[docs]
def initJacobian(self, dataVals, nLayers, nPar=None):
"""Initialize Jacobian matrix.
Parameters
----------
dataVals : ndarray | RMatrix | list
Data values of size (nSounding x Data per sounding).
All data per sounding need to be equal in length.
If they don't fit into a matrix use list of sounding data.
"""
nSoundings = len(dataVals)
if nPar is None:
# TODO get nPar Infos from fop._fopTemplate
nPar = 1
self.createParametrization(nSoundings, nLayers=nLayers, nPar=nPar)
if self._jac is not None:
self._jac.clear()
else:
self._jac = pg.matrix.BlockMatrix()
self.fops1D = []
nData = 0
for i in range(nSoundings):
kwargs = {}
for key, val in self._fopKwargs.items():
if hasattr(val, '__iter__'):
if len(val) == nSoundings:
kwargs[key] = val[i]
else:
kwargs[key] = val
f = None
if issubclass(self._fopTemplate, pg.frameworks.Modelling):
f = self._fopTemplate(**kwargs)
else:
f = type(self._fopTemplate)(self.verbose, **kwargs)
f.setMultiThreadJacobian(self._parPerSounding)
self._fops1D.append(f)
nID = self._jac.addMatrix(f.jacobian())
self._jac.addMatrixEntry(nID, nData, self._parPerSounding * i)
nData += len(dataVals[i])
self._jac.recalcMatrixSize()
# print("Jacobian size:", self.J.rows(), self.J.cols(), nData)
self.setJacobian(self._jac)
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw models as stitched 1D model section."""
mods = np.asarray(model).reshape(self._nSoundings,
self._parPerSounding)
pg.viewer.mpl.showStitchedModels(mods, ax=ax, useMesh=True,
x=self.soundingPos,
**kwargs)
[docs]
class ParameterModelling(Modelling):
"""Model with symbolic parameter names instead of numbers."""
[docs]
def __init__(self, funct=None, **kwargs):
"""Initialize, optionally with given function."""
self.function = None
self._params = {}
self.dataSpace = None # x, t freqs, or whatever
self.defaultModelTrans = 'lin'
super(ParameterModelling, self).__init__(**kwargs)
if funct is not None:
self._initFunction(funct)
@property
def params(self):
"""Return number of parameters."""
return self._params
def _initFunction(self, funct):
"""Init any function and interpret possible args and kwargs."""
self.function = funct
# the first varname is suposed to be f or freqs
self.dataSpaceName = funct.__code__.co_varnames[0]
pg.debug('data space:', self.dataSpaceName)
args = funct.__code__.co_varnames[1:funct.__code__.co_argcount]
for varname in args:
if varname != 'verbose':
pg.debug('add parameter:', varname)
self._params[varname] = 0.0
# nPara = len(self._params.keys()) # not used!
for i, [k, p] in enumerate(self._params.items()):
self.addParameter(k, id=i, cType=0,
single=True,
trans=self.defaultModelTrans,
startModel=1)
[docs]
def response(self, params):
"""Compute and return model response."""
if np.isnan([*params]).any():
print(params)
pg.critical('invalid params for response')
if self.dataSpace is None:
pg.critical('no data space given')
ret = self.function(self.dataSpace, *params)
return ret
[docs]
def setRegionProperties(self, k, **kwargs):
"""Set Region Properties by parameter name."""
if isinstance(k, int) or (k == '*'):
super(ParameterModelling, self).setRegionProperties(k, **kwargs)
else:
self.setRegionProperties(self._params[k], **kwargs)
[docs]
def addParameter(self, name, id=None, **kwargs):
"""Add a parameter."""
if id is None:
id = len(self._params)
self._params[name] = id
self.regionManager().addRegion(id)
self.setRegionProperties(name, **kwargs)
return id
[docs]
def drawModel(self, ax, model):
"""Draw model."""
label = ''
for k, p in self._params.items():
label += k + "={0} ".format(pg.utils.prettyFloat(model[p]))
pg.info("Model: ", label)
[docs]
class PriorModelling(MeshModelling):
"""Forward operator for grabbing values out of a mesh (prior data)."""
[docs]
def __init__(self, mesh=None, pos=None, **kwargs):
"""Init with mesh and some positions that are converted into ids."""
super().__init__(**kwargs)
self.pos = pos
if mesh is not None:
self.setMesh(mesh)
[docs]
def setMesh(self, mesh):
"""Set mesh, save index vector and compute Jacobian."""
super().setMesh(mesh)
self.ind = np.zeros(len(self.pos), dtype=np.int32)
for i, po in enumerate(self.pos):
cell = mesh.findCell(po)
if cell is None:
raise IndexError(f"Could not find cell at position {po}!")
else:
self.ind[i] = cell.id()
# self.ind = np.array([mesh.findCell(po).id() for po in self.pos])
self.J = pg.SparseMapMatrix()
self.J.resize(len(self.ind), mesh.cellCount())
for i, ii in enumerate(self.ind):
self.J.setVal(i, ii, 1.0)
self.setJacobian(self.J)
[docs]
def response(self, model):
"""Return values at the indexed cells."""
return model[self.ind]
[docs]
def createJacobian(self, model):
"""Do nothing (linear)."""
pass
[docs]
def createRefinedFwdMesh(self, mesh):
"""Create refined forward mesh: do nothing here to prevent this."""
return mesh