"""pyGIMLi Modelling Frameworks."""
import numpy as np
import pygimli as pg
DEFAULT_STYLES = {
'Default': {
'color': 'C0',
'lw': 1.5,
'linestyle': '-'
},
'Data': {
'color': 'C0', # blueish
'lw': 1.5,
'linestyle': ':',
'marker': 'o',
'ms': 6
},
'Response': {
'color': 'C0', # blueish
'lw': 2.0,
'linestyle': '-',
'marker': 'None',
'alpha': 0.4
},
'Error': {
'color': 'C3', # reddish
'lw': 0,
'linestyle': '-',
'elinewidth': 2,
'alpha': 0.5
},
}
[docs]
class Modelling(pg.core.ModellingBase):
"""Abstract Forward Operator.
Abstract Forward Operator, i.e. base class for deriving forward operators
that are or that use a Modelling instance.
Todo
----
* Doc:
- describe members (model transformation,
dictionary of region properties)
* think about splitting all mesh related into MeshModelling
* clarify difference: setData(array|DC), setDataContainer(DC),
setDataValues(array)
* clarify dataSpace(comp. ModelSpace):
The unique spatial or temporal origin of a datapoint
(time, coordinates, receiver/transmitter indices, counter)
- Every inversion needs, dataValues and dataSpace
- DataContainer contain, dataValues and dataSpace
- initialize both with initDataSpace(), initModelSpace
* createJacobian should also return J
"""
[docs]
def __init__(self, **kwargs):
"""Initialize.
Attributes
----------
fop : pg.frameworks.Modelling
data : pg.DataContainer
modelTrans : [pg.trans.TransLog()]
Keyword Arguments
-----------------
**kwargs:
fop: Modelling
"""
self._fop = None # pg.frameworks.Modelling .. not needed .. remove it
self._data = None # dataContainer
self._modelTrans = None
self.fop = kwargs.pop('fop', None)
# super(Modelling, self).__init__(**kwargs)
super().__init__(**kwargs)
self._regionProperties = {}
self._interRegionCouplings = []
self._regionsNeedUpdate = False
self._regionChanged = True
self._regionManagerInUse = False
self.modelTrans = pg.trans.TransLog() # Model transformation operator
def __hash__(self):
"""Create a hash for Method Manager."""
# ^ pg.utils.dirHash(self._regionProperties)
if self._data is not None:
return pg.utils.strHash(str(type(self))) ^ hash(self._data)
else:
return pg.utils.strHash(str(type(self)))
[docs]
def Sx(self, x):
"""Right-hand side multiplication of Jacobian J*x.
By default, uses self.jacobian().mult(x)
Overwrite for efficient use with gradient-type inversion.
"""
return self.jacobian().mult(x)
[docs]
def STy(self, y):
"""Right-hand side multiplication of Jacobian J.T*y.
By default, uses self.jacobian().transMult(x)
Overwrite for efficient use with gradient-type inversion.
"""
return self.jacobian().transMult(y)
def __call__(self, *args, **kwargs):
"""Call forward operator."""
return self.response(*args, **kwargs)
@property
def fop(self):
"""Forward operator."""
return self._fop
@fop.setter
def fop(self, fop):
"""Set forward operator."""
if fop is not None:
if not isinstance(fop, pg.frameworks.Modelling):
pg.critical('Forward operator needs to be an instance of '
'pg.modelling.Modelling but is of type:', fop)
self._fop = fop
@property
def data(self):
"""Return data."""
return self._data
@data.setter
def data(self, d):
"""Set data (short property setter)."""
self.setData(d)
[docs]
def setData(self, data):
"""Set data (actual version)."""
if isinstance(data, pg.DataContainer):
self.setDataContainer(data)
else:
self._data = data
[docs]
def setDataPost(self, data):
"""Call when the dataContainer has been set successfully."""
pass
[docs]
def setDataContainer(self, data):
"""Set Data container."""
if self.fop is not None:
pg.critical('in use?')
self.fop.setData(data)
else:
super().setData(data)
self._data = data
self.setDataPost(self.data)
@property
def modelTrans(self):
"""Return model transformation."""
self._applyRegionProperties()
if self.regionManager().haveLocalTrans():
return self.regionManager().transModel()
return self._modelTrans
@modelTrans.setter
def modelTrans(self, tm):
"""Set model transformation."""
if isinstance(tm, str):
tm = pg.core.trans.str2Trans(tm)
self._modelTrans = tm
[docs]
def regionManager(self):
"""Region manager."""
self._regionManagerInUse = True
# initialize RM if necessary
super().regionManager()
# set all local properties
self._applyRegionProperties()
return super().regionManager()
@property
def parameterCount(self):
"""Return parameter count."""
pC = self.regionManager().parameterCount()
if pC == 0:
pg.warn("Parameter count is 0")
return pC
[docs]
def ensureContent(self):
"""Whatever this is."""
pass
[docs]
def initModelSpace(self, **kwargs):
"""TODO."""
pass
[docs]
def createDefaultStartModel(self, dataVals):
"""Create the default startmodel as the median of the data values."""
pg.critical("'don't use me")
[docs]
def createStartModel(self, dataVals=None):
"""Create the default startmodel as the median of the data values.
Overwriting might be a good idea.
Its used by inversion to create a valid startmodel if there are
no starting values from the regions.
"""
if dataVals is not None:
mv = pg.math.median(dataVals)
pg.info(f"Use median(data values)={mv}")
sm = pg.Vector(self.parameterCount, mv)
else:
sm = self.regionManager().createStartModel()
return sm
[docs]
def clearRegionProperties(self):
"""Clear all region parameter."""
self._regionChanged = True
self._regionProperties = {}
[docs]
def regionProperties(self, regionNr=None):
"""Return dictionary of all properties for region number regionNr."""
if regionNr is None:
return self._regionProperties
try:
return self._regionProperties[regionNr]
except KeyError:
print(self._regionProperties)
pg.error("no region for region #:", regionNr)
[docs]
def setRegionProperties(self, regionNr, **kwargs):
"""Set region properties. regionNr can be '*' for all regions.
startModel=None, limits=None, trans=None,
cType=None, zWeight=None, modelControl=None,
background=None, fix=None, single=None,
correlationLengths=None, dip=None, strike=None
Parameters
----------
regionNr : int, [ints], '*'
Region number, list of numbers, or wildcard "*" for all.
startModel : float
starting model value
limits : [float, float]
lower and upper limit for value using a barrier transform
trans : str
transformation for model barrier: "log", "cot", "lin"
cType : int
constraint (regularization) type
zWeight : float
relative weight for vertical boundaries
background : bool
exclude region from inversion completely (prolongation)
fix : float
exclude region from inversion completely (fix to value)
single : bool
reduce region to one unknown
correlationLengths : [floats]
correlation lengths for geostatistical inversion (x', y', z')
dip : float [0]
angle between x and x' (first correlation length)
strike : float [0]
angle between y and y' (second correlation length)
"""
if regionNr == '*':
for regionNr in self.regionManager().regionIdxs():
self.setRegionProperties(regionNr, **kwargs)
return
elif isinstance(regionNr, (list, tuple)):
for r in regionNr:
self.setRegionProperties(r, **kwargs)
return
pg.verbose(f'Set property for region: {regionNr}: {kwargs}')
if regionNr not in self._regionProperties:
self._regionProperties[regionNr] = {'startModel': None,
'modelControl': 1.0,
'zWeight': 1.0,
'cType': None, # RM defaults
'limits': [0, 0],
'trans': 'Log', # RM defauts
'background': None,
'single': None,
'fix': None,
'correlationLengths': None,
'dip': None,
'strike': None,
}
for key in list(kwargs.keys()):
val = kwargs.pop(key)
if val is not None and self._regionProperties[regionNr][key] != val:
self._regionsNeedUpdate = True
self._regionProperties[regionNr][key] = val
if len(kwargs) > 0:
pg.warn('Unhandled region properties:', kwargs)
[docs]
def setInterRegionCoupling(self, region1, region2, weight=1.0):
"""Set the weighting for constraints across regions."""
region1 = self.regionManager().regionIdxs() if region1 == "*" else [region1]
region2 = self.regionManager().regionIdxs() if region2 == "*" else [region2]
for reg1 in region1:
for reg2 in region2:
if reg1 != reg2 and \
(not self._regionProperties[reg1]['background'] and
not self._regionProperties[reg2]['background']):
self._interRegionCouplings.append([reg1, reg2, weight])
self._regionsNeedUpdate = True
def _applyRegionProperties(self):
"""Apply the region properties from dictionary into the region man."""
if self._regionsNeedUpdate is False:
return
# call super class her because self.regionManager() calls allways
# __applyRegionProperies itself
rMgr = super().regionManager()
for rID, vals in self._regionProperties.items():
if vals['fix'] is not None and rMgr.region(rID).fixValue() != vals['fix']:
vals['background'] = True
rMgr.region(rID).setFixValue(vals['fix'])
self._regionChanged = True
if (vals['background'] is not None and
rMgr.region(rID).isBackground() != vals['background']):
rMgr.region(rID).setBackground(vals['background'])
self._regionChanged = True
if (vals['single'] is not None and
rMgr.region(rID).isSingle() != vals['single']):
rMgr.region(rID).setSingle(vals['single'])
self._regionChanged = True
if vals['startModel'] is not None:
rMgr.region(rID).setStartModel(vals['startModel'])
if vals['trans'] is not None:
rMgr.region(rID).setModelTransStr_(vals['trans'])
if (vals['cType'] is not None and
rMgr.region(rID).constraintType() != vals['cType']):
self.clearConstraints()
rMgr.region(rID).setConstraintType(vals['cType'])
if vals['zWeight'] is not None:
rMgr.region(rID).setZWeight(vals['zWeight'])
self.clearConstraints()
rMgr.region(rID).setModelControl(vals['modelControl'])
if vals['limits'][0] != 0:
rMgr.region(rID).setLowerBound(vals['limits'][0])
if vals['limits'][1] > vals['limits'][0]:
rMgr.region(rID).setUpperBound(vals['limits'][1])
if vals['correlationLengths'] is not None:
self.clearConstraints()
if vals['dip'] is not None:
self.clearConstraints()
if vals['strike'] is not None:
self.clearConstraints()
for r1, r2, w in self._interRegionCouplings:
rMgr.setInterRegionConstraint(r1, r2, w)
self._regionsNeedUpdate = False
[docs]
def setDataSpace(self, **kwargs):
"""Set data space, e.g., DataContainer, times, coordinates."""
if self.fop is not None:
pg.critical('in use?')
self.fop.setDataSpace(**kwargs)
else:
data = kwargs.pop('dataContainer', None)
if isinstance(data, pg.DataContainer):
self.setDataContainer(data)
else:
print(data)
pg.critical("nothing known to do? "
"Implement me in derived classes")
[docs]
def estimateError(self, data, **kwargs):
"""Create data error fallback when the data error is not known.
Should be implemented method-specific.
"""
raise NotImplementedError("Needed?? Implement me in derived classes")
# data = data * (pg.randn(len(data)) * errPerc / 100. + 1.)
# return data
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw a model into a given axis."""
if self.fop is not None:
pg.critical('in use?')
self.fop.drawModel(ax, model, **kwargs)
else:
print(kwargs)
raise NotImplementedError("No yet implemented")
[docs]
def drawData(self, ax, data, **kwargs):
"""Draw data into a given axis."""
if self.fop is not None:
self.fop.drawData(ax, data, **kwargs)
else:
print(kwargs)
raise NotImplementedError("No yet implemented")
[docs]
class LinearModelling(Modelling):
"""Modelling class for linearized problems with a given matrix."""
[docs]
def __init__(self, A):
"""Initialize by storing the (reference to the) matrix."""
super().__init__()
self.A = A
self.setJacobian(self.A)
[docs]
def response(self, model):
"""Linearized forward modelling by matrix-vector product."""
return self.A * model
[docs]
def createJacobian(self, model):
"""Do not compute a jacobian (linear)."""
pass
@property
def parameterCount(self):
"""Define the number of parameters from the matrix size."""
return self.A.cols()
[docs]
class Block1DModelling(Modelling):
"""General forward operator for 1D layered models.
Model space: [thickness_i, parameter_jk],
with i = 0 - nLayers-1, j = (0 .. nLayers), k=(0 .. nPara)
"""
[docs]
def __init__(self, nPara=1, nLayers=4, **kwargs):
"""Initialize by setting up model space from layers and parameters.
Parameters
----------
nLayers : int [4]
Number of layers.
nPara : int [1]
Number of parameters per layer
(e.g. nPara=2 for resistivity and phase)
"""
self._nLayers = 0
super().__init__(**kwargs)
self._withMultiThread = True
self._nPara = nPara # number of parameters per layer
self.initModelSpace(nLayers)
@property
def nPara(self):
"""Number of parameters."""
return self._nPara
@property
def nLayers(self):
"""Number of layers."""
return self._nLayers
@nLayers.setter
def nLayers(self, nLayers):
"""Set number of layers."""
return self.initModelSpace(nLayers)
[docs]
def initModelSpace(self, nLayers):
"""Set number of layers for the 1D block model."""
if nLayers == self._nLayers:
return
self._nLayers = nLayers
if nLayers < 2:
pg.critical("Number of layers need to be at least 2")
mesh = pg.meshtools.createMesh1DBlock(nLayers, self._nPara)
self.clearRegionProperties()
self.setMesh(mesh)
# setting region 0 (layers) and 1..nPara (values)
for i in range(1 + self._nPara):
self.setRegionProperties(i, trans='log')
if self._withMultiThread:
self.setMultiThreadJacobian(2*nLayers - 1)
# self._applyRegionProperties()
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw model into a given axis."""
pg.viewer.mpl.drawModel1D(ax=ax,
model=model,
plot='loglog',
xlabel=kwargs.pop('xlabel',
'Model parameter'),
**kwargs)
return ax
[docs]
def drawData(self, ax, data, err=None, label=None, **kwargs):
"""Draw the data by default method.
Probably ugly and you should overwrite it in your derived class.
Modelling creates the data and should know best how to draw them.
"""
nData = len(data)
yVals = range(1, nData+1)
ax.loglog(data, yVals,
label=label,
**DEFAULT_STYLES.get(label, DEFAULT_STYLES['Default'])
)
if err is not None:
ax.errorbar(data, yVals,
xerr=err*data,
label='Error',
**DEFAULT_STYLES.get('Error',
DEFAULT_STYLES['Default'])
)
ax.set_ylim(max(yVals), min(yVals))
ax.set_xlabel('Data')
ax.set_ylabel('Data Number')
return ax
[docs]
class MeshModelling(Modelling):
"""Modelling class with a mesh discretization."""
[docs]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._axs = None
self._meshNeedsUpdate = True
self._baseMesh = None ## keep a copy of the original mesh
# optional p2 refinement for forward task
self._refineP2 = False
self._refineH2 = True
self._pd = None
self._C = None # custom Constraints matrix
def __hash__(self):
"""Return unique hash for caching."""
return super().__hash__() ^ hash(self.mesh())
@property
def paraDomain(self):
"""Return parameter (inverse) mesh."""
# We need our own copy here because its possible that we want to use
# the mesh after the fop was deleted
if not self.mesh():
pg.critical('paraDomain needs a mesh')
self._pd = pg.Mesh(self.regionManager().paraDomain())
return self._pd
[docs]
def setCustomConstraints(self, C):
"""Set custom constraints matrix for lazy evaluation.
To remove them set it to 'None' again.
"""
self._C = C
[docs]
def createConstraints(self):
"""Create constraint matrix."""
# just ensure there is valid mesh
# self.mesh()
foundGeoStat = False
for reg, props in self.regionProperties().items():
if not props['background'] and \
props['correlationLengths'] is not None or \
props['dip'] is not None or props['strike'] is not None:
cL = props.get('correlationLengths') or 5
dip = props.get('dip') or 0
strike = props.get('strike') or 0
pg.info('Creating GeostatisticConstraintsMatrix for region' +
f' {reg} with: I={cL}, dip={dip}, strike={strike}')
if foundGeoStat is True:
pg.critical('Only one global GeostatisticConstraintsMatrix'
'possible at the moment.')
# keep a copy of C until refcounting in the core works
self._C = pg.matrix.GeostatisticConstraintsMatrix(
mesh=self.paraDomain, I=cL, dip=dip, strike=strike,
)
foundGeoStat = True
self.setConstraints(self._C)
if foundGeoStat is False:
super().createConstraints()
return self.constraints()
[docs]
def paraModel(self, model):
"""Return parameter model, i.e. model mapped back with cell markers."""
mod = model[self.paraDomain.cellMarkers()]
if isinstance(mod, np.ndarray):
mod = pg.Vector(mod)
# Else next line fails as np.array does not allow set attributes.
mod.isParaModel = True
return mod
[docs]
def ensureContent(self):
"""Ensure there is a valid initialized mesh.
Initialization means the cell marker are recounted and/or there was a
mesh refinement or boundary enlargement, all to fit the needs for the
method-depending forward problem.
"""
# Need to call this once to be sure the mesh is initialized when needed
self.mesh()
[docs]
def setMeshPost(self, mesh):
"""Set mesh, called when the mesh has been set successfully.
Might be overwritten by child classes.
"""
pass
[docs]
def createRefinedFwdMesh(self, mesh):
"""Refine the current mesh for higher accuracy.
This is called automatic when accessing self.mesh() so it ensures any
effect of changing region properties (background, single).
"""
m = pg.Mesh(mesh)
if self._refineH2:
pg.info("Creating refined mesh (H2) to solve forward task.")
m = m.createH2()
if self._refineP2:
pg.info("Creating refined mesh (P2) to solve forward task.")
m = m.createP2()
pg.info("Mesh for forward task:", m)
return m
[docs]
def createFwdMesh_(self):
"""Create forward mesh."""
pg.info("Creating forward mesh from region infos.")
m = pg.Mesh(self.regionManager().mesh())
regionIds = self.regionManager().regionIdxs()
for iId in regionIds:
r = self.regionManager().region(iId)
pg.verbose(f"\tRegion: {iId}, "
f"Parameter: {r.parameterCount()}, "
f"PD: {r.isInParaDomain()},"
f" Single: {r.isSingle()}, "
f"Background: {r.isBackground()}, "
f"Fixed: {r.fixValue()}")
m = self.createRefinedFwdMesh(m)
self.setMeshPost(m)
self._regionChanged = False
super().setMesh(m, ignoreRegionManager=True)
if self._C is not None:
pg.info('Set custom constraints matrix.')
self.setConstraints(self._C)
[docs]
def mesh(self):
"""Return currently used mesh."""
self._applyRegionProperties()
if self._regionManagerInUse and self._regionChanged is True:
self.createFwdMesh_()
return super().mesh()
[docs]
def setMesh(self, mesh, ignoreRegionManager=False):
"""Set mesh and specify whether region manager can be ignored."""
# pg._b('setMesh', id(mesh), mesh, ignoreRegionManager)
# keep a copy, just in case
self._baseMesh = mesh
if ignoreRegionManager is False:
self._regionManagerInUse = True
# Modelling without region manager
if ignoreRegionManager is True or not self._regionManagerInUse:
self._regionManagerInUse = False
super(Modelling, self).setMesh(mesh, ignoreRegionManager=True)
self.setMeshPost(mesh)
return
# copy the mesh to the region manager who renumber cell markers
self.clearRegionProperties()
self.regionManager().setMesh(mesh)
self.setDefaultBackground()
[docs]
def setDefaultBackground(self):
"""Set the lowest region to background if several exist."""
regionIds = self.regionManager().regionIdxs()
pg.info(f"Found {len(regionIds)} regions.")
if len(regionIds) > 1:
bk = pg.sort(regionIds)[0]
pg.info("Region with smallest marker set to background "
f"(marker={bk})")
self.setRegionProperties(bk, background=True)
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw the model as mesh-based distribution."""
mod = None
# TODO needs to be checked if mapping is always ok (region example)
# is (len(model) == self.paraDomain.cellCount() or \
if hasattr(model, "isParaModel") and model.isParaModel is False:
# pg._y(model.isParaModel)
mod = self.paraModel(model)
elif hasattr(model, "isParaModel") and model.isParaModel is True:
# pg._g(model.isParaModel)
mod = model
elif len(model) == self.paraDomain.nodeCount():
# why nodeCount? a field as model result?
# pg._b('node count')
mod = model
elif len(model) == self.paraDomain.cellCount():
# pg._b('cell count')
mod = model
else:
mod = self.paraModel(model)
if ax is None:
if self._axs is None:
self._axs, _ = pg.show()
ax = self._axs
if hasattr(ax, '__cBar__'):
# we assume the axes already holds a valid mappable and we only
# update the model data
cBar = ax.__cBar__
kwargs.pop('label', None)
kwargs.pop('cMap', None)
pg.viewer.mpl.setMappableData(cBar.mappable, mod, **kwargs)
else:
diam = kwargs.pop('diam', None)
ax, cBar = pg.show(mesh=self.paraDomain,
data=mod,
label=kwargs.pop('label', 'Model parameter'),
logScale=kwargs.pop('logScale', False),
ax=ax,
**kwargs
)
if diam is not None:
pg.viewer.mpl.drawSensors(ax, self.data.sensors(), diam=diam,
edgecolor='black', facecolor='white')
return ax, cBar
[docs]
class PetroModelling(MeshModelling):
"""Combine petrophysical relation with the modelling class f(p).
Combine petrophysical relation :math:`p(m)` with a modelling class
:math:`f(p)` to invert for the petrophysical model :math:`p` instead
of the geophysical model :math:`m`.
:math:`p` be the petrophysical model, e.g., porosity, saturation, ...
:math:`m` be the geophysical model, e.g., slowness, resistivity, ...
"""
[docs]
def __init__(self, fop, petro, **kwargs):
"""Save forward class and transformation, create Jacobian matrix."""
self._f = fop
# self._f createStartModel might be called and depends on the regionMgr
self._f.regionManager = self.regionManager
# self.createRefinedFwdMesh depends on refinement strategy of self._f
self.createRefinedFwdMesh = self._f.createRefinedFwdMesh
super().__init__(fop=None, **kwargs)
# petroTrans.fwd(): p(m), petroTrans.inv(): m(p)
self._petroTrans = petro # class defining p(m)
self._jac = pg.matrix.MultRightMatrix(self._f.jacobian())
self.setJacobian(self._jac)
@property
def petro(self):
"""Petrophysical model transformation."""
return self._petroTrans
[docs]
def setMeshPost(self, mesh):
"""Set mesh after init."""
self._f.setMesh(mesh, ignoreRegionManager=True)
[docs]
def setDataPost(self, data):
"""Set data after init."""
self._f.setData(data)
[docs]
def createStartModel(self, data):
"""Use inverse transformation to get m(p) for the starting model."""
sm = self._f.createStartModel(data)
pModel = self._petroTrans.inv(sm)
return pModel
[docs]
def response(self, model):
"""Use transformation to get p(m) and compute response f(p)."""
tModel = self._petroTrans.fwd(model)
ret = self._f.response(tModel)
return ret
[docs]
def createJacobian(self, model):
r"""Fill the individual jacobian matrices.
J = dF(m) / dm = dF(m) / dp * dp / dm
"""
tModel = self._petroTrans.fwd(model)
self._f.createJacobian(tModel)
self._jac.A = self._f.jacobian()
self._jac.r = self._petroTrans.deriv(model) # set inner derivative
# print(self._jac.A.rows(), self._jac.A.cols())
# print(self._jac.r)
# pg._r("create Jacobian", self, self._jac)
self.setJacobian(self._jac) # to be sure .. test if necessary
# 220817 to be changed later!!
# class JointModelling(Modelling):
[docs]
class JointModelling(MeshModelling):
"""Cumulative (joint) forward operator."""
[docs]
def __init__(self, fopList):
"""Initialize with lists of forward operators."""
super().__init__()
self.fops = fopList
self.jac = pg.matrix.BlockMatrix()
# self.modelTrans = self.fops[0].modelTrans
self.modelTrans = pg.trans.TransLogLU()
self.fops[0].regionManager()
self.setRegionManager(self.fops[0].regionManagerRef())
[docs]
def createStartModel(self, data):
"""Use inverse transformation to get m(p) for the starting model."""
sm = self._f.createStartModel(data)
pModel = self._petroTrans.inv(sm)
return pModel
[docs]
def response(self, model):
"""Concatenate responses for all fops."""
resp = []
for f in self.fops:
resp.extend(f.response(model))
return resp
[docs]
def createJacobian(self, model):
"""Fill the individual Jacobian matrices."""
self.initJacobian()
for f in self.fops:
f.createJacobian(model)
[docs]
def setData(self, data):
"""Distribute list of data to the forward operators."""
if len(data) != len(self.fops):
pg.critical("Please provide data for all forward operators")
self._data = data
nData = 0
for i, fi in enumerate(self.fops):
fi.setData(data[i])
self.jac.addMatrix(fi.jacobian(), nData, 0)
nData += data[i].size() # update total vector length
self.setJacobian(self.jac)
[docs]
def setMesh(self, mesh, **kwargs): # to be removed from here
"""Set the parameter mesh to all fops."""
for fi in self.fops:
fi.setMesh(mesh)
# 220817 to be implemented!!
# class JointMeshModelling(JointModelling):
# def __init__(self, fopList):
# super().__init__(self, fopList)
# self.setRegionManager(self.fops[0].regionManagerRef())
[docs]
class LCModelling(Modelling):
"""2D Laterally constrained (LC) modelling.
2D Laterally constrained (LC) modelling based on BlockMatrices.
"""
[docs]
def __init__(self, fop, **kwargs):
"""Parameters: fop class ."""
super().__init__()
self._singleRegion = False
self._fopTemplate = fop
self._fopKwargs = kwargs
self._fops1D = []
self._mesh = None
self._nSoundings = 0
self._parPerSounding = 0
self._jac = None
self.soundingPos = None
[docs]
def setDataBasis(self, **kwargs):
"""Set homogeneous data basis.
Set a common data basis to all forward operators.
If you want individual you need to set them manually.
"""
for f in self._fops1D:
f.setDataBasis(**kwargs)
[docs]
def initModelSpace(self, nLayers):
"""Initialize model space."""
for f in self._fops1D:
f.initModelSpace(nLayers)
[docs]
def createDefaultStartModel(self, models):
"""Create default starting model."""
sm = pg.Vector()
for i, f in enumerate(self._fops1D):
sm = pg.cat(sm, f.createDefaultStartModel(models[i]))
return sm
[docs]
def response(self, par):
"""Cut together forward responses of all soundings."""
mods = np.asarray(par).reshape(self._nSoundings, self._parPerSounding)
resp = pg.Vector(0)
for i in range(self._nSoundings):
r = self._fops1D[i].response(mods[i])
# print("i:", i, mods[i], r)
resp = pg.cat(resp, r)
return resp
[docs]
def createJacobian(self, par):
"""Create Jacobian matrix by creating individual Jacobians."""
mods = np.asarray(par).reshape(self._nSoundings, self._parPerSounding)
for i in range(self._nSoundings):
self._fops1D[i].createJacobian(mods[i])
[docs]
def createParametrization(self, nSoundings, nLayers=4, nPar=1):
"""Create LCI mesh and suitable constraints informations.
Parameters
----------
nLayers : int
Numbers of depth layers
nSoundings : int
Numbers of 1D measurements to laterally constrain
nPar : int
Numbers of independent parameter types,
e.g., nPar = 1 for VES (invert for resisitivies),
nPar = 2 for VESC (invert for resisitivies and phases)
"""
nCols = (nPar+1) * nLayers - 1 # fail for VES-C
self._parPerSounding = nCols
self._nSoundings = nSoundings
self._mesh = pg.meshtools.createMesh2D(range(nCols + 1),
range(nSoundings + 1))
self._mesh.rotate(pg.RVector3(0, 0, -np.pi/2))
cm = np.ones(nCols * nSoundings) * 1
if not self._singleRegion:
for i in range(nSoundings):
for j in range(nPar):
cm[i * self._parPerSounding + (j+1) * nLayers-1:
i * self._parPerSounding + (j+2) * nLayers-1] += (j+1)
self._mesh.setCellMarkers(cm)
self.setMesh(self._mesh)
pID = self.regionManager().paraDomain().cellMarkers()
cID = [c.id() for c in self._mesh.cells()]
# print(np.array(pID))
# print(np.array(cID))
# print(self.parameterCount
perm = [0]*self.parameterCount
for i in range(len(perm)):
perm[pID[i]] = cID[i]
# print(perm)
self.regionManager().permuteParameterMarker(perm)
# print(self.regionManager().paraDomain().cellMarkers())
[docs]
def initJacobian(self, dataVals, nLayers, nPar=None):
"""Initialize Jacobian matrix.
Parameters
----------
dataVals : ndarray | RMatrix | list
Data values of size (nSounding x Data per sounding).
All data per sounding need to be equal in length.
If they don't fit into a matrix use list of sounding data.
"""
nSoundings = len(dataVals)
if nPar is None:
# TODO get nPar Infos from fop._fopTemplate
nPar = 1
self.createParametrization(nSoundings, nLayers=nLayers, nPar=nPar)
if self._jac is not None:
self._jac.clear()
else:
self._jac = pg.matrix.BlockMatrix()
self.fops1D = []
nData = 0
for i in range(nSoundings):
kwargs = {}
for key, val in self._fopKwargs.items():
if hasattr(val, '__iter__'):
if len(val) == nSoundings:
kwargs[key] = val[i]
else:
kwargs[key] = val
f = None
if issubclass(self._fopTemplate, pg.frameworks.Modelling):
f = self._fopTemplate(**kwargs)
else:
f = type(self._fopTemplate)(self.verbose, **kwargs)
f.setMultiThreadJacobian(self._parPerSounding)
self._fops1D.append(f)
nID = self._jac.addMatrix(f.jacobian())
self._jac.addMatrixEntry(nID, nData, self._parPerSounding * i)
nData += len(dataVals[i])
self._jac.recalcMatrixSize()
# print("Jacobian size:", self.J.rows(), self.J.cols(), nData)
self.setJacobian(self._jac)
[docs]
def drawModel(self, ax, model, **kwargs):
"""Draw models as stitched 1D model section."""
mods = np.asarray(model).reshape(self._nSoundings,
self._parPerSounding)
pg.viewer.mpl.showStitchedModels(mods, ax=ax, useMesh=True,
x=self.soundingPos,
**kwargs)
[docs]
class ParameterModelling(Modelling):
"""Model with symbolic parameter names instead of numbers."""
[docs]
def __init__(self, funct=None, **kwargs):
"""Initialize, optionally with given function."""
self.function = None
self._params = {}
self.dataSpace = None # x, t freqs, or whatever
self.defaultModelTrans = 'lin'
super().__init__(**kwargs)
if funct is not None:
self._initFunction(funct)
@property
def params(self):
"""Return number of parameters."""
return self._params
def _initFunction(self, funct):
"""Init any function and interpret possible args and kwargs."""
self.function = funct
# the first varname is suposed to be f or freqs
self.dataSpaceName = funct.__code__.co_varnames[0]
pg.debug('data space:', self.dataSpaceName)
args = funct.__code__.co_varnames[1:funct.__code__.co_argcount]
for varname in args:
if varname != 'verbose':
pg.debug('add parameter:', varname)
self._params[varname] = 0.0
# nPara = len(self._params.keys()) # not used!
# for i, [k, p] in enumerate(self._params.items()):
for i, k in enumerate(self._params.keys()):
self.addParameter(k, id=i, cType=0,
single=True,
trans=self.defaultModelTrans,
startModel=1)
[docs]
def response(self, params):
"""Compute and return model response."""
if np.isnan([*params]).any():
print(params)
pg.critical('invalid params for response')
if self.dataSpace is None:
pg.critical('no data space given')
ret = self.function(self.dataSpace, *params)
return ret
[docs]
def setRegionProperties(self, k, **kwargs):
"""Set Region Properties by parameter name."""
if isinstance(k, int) or (k == '*'):
super().setRegionProperties(k, **kwargs)
else:
self.setRegionProperties(self._params[k], **kwargs)
[docs]
def addParameter(self, name, id=None, **kwargs):
"""Add a parameter."""
if id is None:
id = len(self._params)
self._params[name] = id
self.regionManager().addRegion(id)
self.setRegionProperties(name, **kwargs)
return id
[docs]
def drawModel(self, ax, model):
"""Draw model."""
label = ''
for k, p in self._params.items():
label += k + f"={pg.utils.prettyFloat(model[p])} "
pg.info("Model: ", label)
[docs]
class PriorModelling(MeshModelling):
"""Forward operator for grabbing values out of a mesh (prior data)."""
[docs]
def __init__(self, mesh=None, pos=None, **kwargs):
"""Init with mesh and some positions that are converted into ids."""
super().__init__(**kwargs)
self.pos = pos
if mesh is not None:
self.setMesh(mesh)
[docs]
def setMesh(self, mesh):
"""Set mesh, save index vector and compute Jacobian."""
super().setMesh(mesh)
self.ind = np.zeros(len(self.pos), dtype=np.int32)
for i, po in enumerate(self.pos):
cell = mesh.findCell(po)
if cell is None:
raise IndexError(f"Could not find cell at position {po}!")
else:
self.ind[i] = cell.id()
# self.ind = np.array([mesh.findCell(po).id() for po in self.pos])
self.J = pg.SparseMapMatrix()
self.J.resize(len(self.ind), mesh.cellCount())
for i, ii in enumerate(self.ind):
self.J.setVal(i, ii, 1.0)
self.setJacobian(self.J)
[docs]
def response(self, model):
"""Return values at the indexed cells."""
return model[self.ind]
[docs]
def createJacobian(self, model):
"""Do nothing (linear)."""
pass
[docs]
def createRefinedFwdMesh(self, mesh):
"""Create refined forward mesh: do nothing here to prevent this."""
return mesh