#!/usr/bin/env python
"""Frequency Domain Electromagnetics (FDEM) class."""
from pathlib import Path
import numpy as np
import pygimli as pg
from pygimli.viewer.mpl import show1dmodel, drawModel1D
from .hemmodelling import HEMmodelling
from .fdemmodelling import FDEM2dFOP
from .tools import xfplot
[docs]
class FDEM:
"""Managing Frequency Domain EM data and their inversions."""
[docs]
def __init__(self, x=None, freqs=None,
coilSpacing=None, inphase=None, outphase=None,
filename=None, scaleFreeAir=False):
r"""Initialize data class and load data. Provide filename or data.
If filename is given, data is loaded, overwriting settings.
Parameters
----------
x: array
Array of measurement positions
freq: array
Measured frequencies
coilSpacing : float
Distance between 2 two coils
inphase : array
real part of :math:`|amplitude| * \exp^{i phase}`
outphase : array
imaginary part of :math:`|amplitude| * \exp^{i phase}`
filename : str
Filename to read from. Supported: .xyz (MaxMin), .txt (Emsys)
scaleFreeAir : bool
Scale inphase and outphase data by free air (primary) solution
"""
if isinstance(x, str) and freqs is None: # string/file init
filename = x
self.x = x
self.frequencies = freqs
self.coilSpacing = coilSpacing
self.scaling = "ppm"
self.IP = inphase
self.OP = outphase
self.ERR = None
self.height = 1.0 # standard height for MaxMin/Promys devices
self.fop = None # better apply MethodManger base interface
self.transData, self.transRes, self.transThk = None, None, None
if filename:
# check if filename extension is TXT or CSV
fl = filename.lower()
if fl.endswith('.txt') or fl.endswith('.csv'):
try:
self.importMaxMinData(filename)
except Exception:
self.importEmsysAsciiData(filename)
else:
self.importMaxMinData(filename)
if np.any(self.frequencies):
self.isActiveFreq = self.frequencies > 0.0
self.activeFreq = np.nonzero(self.isActiveFreq)[0]
if scaleFreeAir:
freeAirSolution = self.FOP().freeAirSolution()
self.IP /= freeAirSolution
self.OP /= freeAirSolution
def __repr__(self):
"""Create string representation."""
if self.x is None:
part1 = "<FDEMdata: sounding with" + \
f" {len(self.frequencies):d} frequencies"
else:
part1 = f"<FDEMdata: {len(self.x):d} soundings" + \
"with {len(self.frequencies):d} frequencies"
if self.coilSpacing is not None:
cs = self.coilSpacing
if hasattr(cs, '__iter__'):
if len(cs) > 1:
part2 = f"coil spacing is {min(cs):f}-{max(cs):f} m>"
else:
part2 = f"coil spacing is {cs:f} m"
return part1 + ' , ' + part2
else:
return part1
[docs]
def importEmsysAsciiData(self, filename):
"""Import data from emsys text export file.
columns: no, pos(1-3), separation(4), frequency(6), error(8),
inphase (9-11), outphase (12-14),
reads: positions, data, frequencies, error and geometry
"""
cols = (1, 4, 6, 8, 9, 12, 15, 16)
xx, sep, f, pf, ip, op, hmod, q = np.loadtxt(
filename, skiprows=1, usecols=cols, unpack=True)
err = q / pf * 100. # percentage of primary field
if len(np.unique(sep)) > 1:
print("Warning! Several coil spacings present in file!")
self.coilSpacing = np.median(sep)
f = np.round_(f)
self.frequencies, mf, nf = np.unique(f, True, True)
x, mx, nx = np.unique(xx, True, True)
self.IP = np.ones((len(x), len(f))) * np.nan
self.OP = np.ones((len(x), len(f))) * np.nan
self.ERR = np.ones((len(x), len(f))) * np.nan
for i in range(len(f)):
self.IP[nx[i], nf[i]] = ip[i]
self.OP[nx[i], nf[i]] = op[i]
self.ERR[nx[i], nf[i]] = err[i]
[docs]
def readHEMData(self, filename, takeevery=1, choosevcp=True):
"""Read RESOLVE type airborne EM data from .XYZ file."""
self.header = {}
keyword = ''
i = 0
with Path(filename).open() as fid:
for line in fid:
if line[0] == '/':
line = line[1:].strip('\n').replace(
',', '').replace('AND', '')
try:
result = [float(co) for co in line.split()]
except ValueError:
result = line.split()
if len(result) == 1:
result = result[0]
if keyword:
if isinstance(keyword, list):
for kw, res in zip(keyword, result, strict=False):
self.header[kw] = res
else:
self.header[keyword] = result
keyword = ''
else:
keyword = result
else:
break
line = fid.readline()
print(line)
tmp = np.genfromtxt(
fname=filename, autostrip=True, comments='/',
skip_header=i+1, dtype=float, names=True, case_sensitive='lower',
missing_values='*', filling_values=-9999, skip_footer=1)
# read properties from header
if choosevcp:
ivcp = np.nonzero(np.array(self.header['COILGEOMETRY']) == 1)[0]
else:
ivcp = range(len(self.header['FREQUENCY']))
self.frequencies = np.array(self.header['FREQUENCY'])[ivcp]
self.coilSpacing = np.array(self.header['COILSEPERATION'])[ivcp]
# read properties from data block
names = tmp.dtype.names
if 'lon' in names and 'lat' in names:
utm = pg.utils.getUTMProjection(zone=32)
x, y = utm(tmp['lon'], tmp['lat'])
else:
x, y = tmp['x'], tmp['y']
self.pos = np.column_stack((x, y))[::takeevery]
dx = np.sqrt(np.diff(self.pos[:, 0])**2 + np.diff(self.pos[:, 1])**2)
self.x = np.hstack((0., np.cumsum(dx)))
self.z = tmp['h_laser'][::takeevery]
self.topo = tmp['topo'][::takeevery]
IP = np.column_stack([tmp['real_'+str(i+1)] for i in ivcp])
OP = np.column_stack([tmp['quad_'+str(i+1)] for i in ivcp])
# better do a decimation or running average here
self.IP = IP[::takeevery, :]
self.OP = OP[::takeevery, :]
self.isActiveFreq = self.frequencies > 0.0
self.activeFreq = np.nonzero(self.isActiveFreq)[0]
[docs]
def importIPXData(self, filename, verbose=False):
"""Import MaxMin IPX format with pos, data, frequencies & geometry."""
delim = None
with Path(filename).open() as fid:
aline = ''
i = 0 # just in case there is no header
for aline in fid:
if aline.split()[0][0].isdigit(): # number found
break
elif aline.find('COIL') > 0: # [:6] == '/ COIL':
self.coilSpacing = float(aline.replace(':', ': ').split()[-2])
elif aline.find('FREQ') > 0: # [:6] == '/ FREQ':
mya = aline[aline.find(':') + 1:].replace(',', ' ').split()
myf = [float(aa) for aa in mya if aa[0].isdigit()]
self.frequencies = np.array(myf)
if verbose:
print("CS=", self.coilSpacing, "F=", self.frequencies)
if aline.find(',') > 0:
delim = ','
nf = len(self.frequencies)
if verbose:
print("delim=", delim, "nf=", nf)
A = np.loadtxt(filename, skiprows=i, delimiter=delim, comments='/').T
x, y, self.IP, self.OP = A[0], A[1], A[
2:nf * 2 + 2:2].T, A[3:nf * 2 + 2:2].T
if max(x) == min(x):
self.x = y
else:
self.x = x
[docs]
def importMaxMinData(self, filename, verbose=False):
"""Import MaxMin ASCII export (.txt) data."""
self.coilSpacing = 99.9
f, re, im, err, cond = [], [], [], [], []
x, RE, IM, ERR, COND = [], [], [], [], []
with Path(filename).open() as fid:
lines = fid.readlines()
for line in lines:
stline = line.split()
if line.startswith("Coil Sep"):
self.coilSpacing = float(stline[-1])
if len(stline) > 3 and stline[3].find("Stn") >= 0:
x.append(float(stline[4]))
if len(re) > 0:
RE.append(np.array(re))
IM.append(np.array(im))
ERR.append(np.array(err))
COND.append(np.array(cond))
f, re, im, err, cond = [], [], [], [], []
if len(stline) > 0 and stline[0] == "MAX1": # data
f.append(float(stline[1]))
re.append(float(stline[3]))
im.append(float(stline[5]))
err.append(float(stline[7]))
cond.append(float(stline[9]))
if len(re) > 0:
RE.append(np.array(re))
IM.append(np.array(im))
ERR.append(np.array(err))
COND.append(np.array(cond))
self.x = np.array(x)
self.frequencies = np.array(f)
self.IP = np.array(RE)
self.OP = np.array(IM)
self.ERR = np.array(ERR)
[docs]
def deactivate(self, fr):
"""Deactivate a single frequency."""
fi = np.nonzero(np.absolute(self.frequencies / fr - 1.) < 0.1)
self.isActiveFreq[fi] = False
self.activeFreq = np.nonzero(self.isActiveFreq)[0]
[docs]
def freq(self):
"""Return active (i.e., non-deactivated) frequencies."""
return self.frequencies[self.activeFreq]
[docs]
def FOP(self, nlay=2, useHEM=1): # createFOP deciding upon block or smooth
"""Forward modelling operator using a block discretization.
Parameters
----------
nlay : int
Number of blocks
"""
if useHEM:
return HEMmodelling(nlay, self.height, f=self.freq(),
r=self.coilSpacing, scaling=self.scaling)
else:
return pg.core.FDEM1dModelling(nlay, self.freq(), self.coilSpacing,
-self.height)
[docs]
def FOPsmooth(self, zvec):
"""Forward modelling operator using fixed layers (smooth inversion).
Parameters
----------
zvec : array
"""
return pg.FDEM1dRhoModelling(zvec, self.freq(), self.coilSpacing,
-self.height)
[docs]
def selectData(self, xpos=0):
"""Select sounding at a specific position or by number.
Retrieve inphase, outphase and error(if exist) vector from index
or near given position
Parameters
----------
xpos : int | float
index (int) or position (float) along profile to choose
Returns
-------
IP : array
OP : array
ERR : array or None (if no error is specified)
"""
# check for index
if isinstance(xpos, int) and (xpos < len(self.x)) and (xpos >= 0):
n = xpos
else:
n = np.argmin(np.absolute(self.x - xpos))
self.height = self.z[n]
ip = self.IP[n, self.activeFreq]
op = self.OP[n, self.activeFreq]
err = None
if self.ERR is not None:
err = self.ERR[n, self.activeFreq]
return ip, op, err
[docs]
def error(self, xpos=0):
"""Return error as vector."""
_, _, err = self.selectData(xpos)
return err
[docs]
def datavec(self, xpos=0):
"""Extract data vector (stack in and out phase) for given pos/no."""
ip, op, _ = self.selectData(xpos)
return np.hstack((ip, op))
[docs]
def errorvec(self, xpos=0, minvalue=0.0):
"""Extract error vector for a give position or sounding number."""
_, _, err = self.selectData(xpos)
return np.tile(np.maximum(err * 0.7071, minvalue), 2)
# return pg.asvector(np.tile(np.maximum(err * 0.7071, minvalue), 2))
[docs]
def invBlock(self, xpos=0, nlay=2, noise=1.0, show=True, stmod=30.,
lam=1000., lBound=0., uBound=0., verbose=False, **kwargs):
"""Create and return Gimli inversion instance for block inversion.
Parameters
----------
xpos : array
position vector
nLay : int
Number of layers of the model to be determined OR
vector of layer numbers OR forward operator
noise : float
Absolute data err in percent
stmod : float or pg.Vector
Starting model
lam : float
Global regularization parameter lambda.
lBound : float
Lower boundary for the model
uBound : float
Upper boundary for the model. 0 means no upper booundary
verbose : bool
Be verbose
"""
# self.transThk = pg.trans.TransLog()
# self.transRes = pg.trans.TransLogLU(lBound, uBound)
# self.transData = pg.trans.Trans()
self.transData = pg.trans.TransSymLog(tol=0.1)
self.transLog = pg.trans.TransLog()
useHEM = kwargs.pop("useHEM", True)
# EM forward operator
if isinstance(nlay, pg.core.FDEM1dModelling):
self.fop = nlay
else:
self.fop = self.FOP(nlay, useHEM=useHEM)
dataVec = self.datavec(xpos)
# self.fop.region(0).setTransModel(self.transThk)
# self.fop.region(1).setTransModel(self.transRes)
if isinstance(noise, float):
errorVec = pg.Vector(len(dataVec), noise)
else:
errorVec = pg.asvector(noise)
# independent EM inversion
if isinstance(stmod, float): # real model given
model = pg.Vector(nlay * 2 - 1, stmod)
model[0] = 2.
else:
model = stmod if len(stmod)==nlay*2-1 else \
pg.Vector(nlay*2-1, 30.0)
pg.verbose("Model", model)
from pygimli.frameworks import MarquardtInversion
self.inv = MarquardtInversion(fop=self.fop, verbose=verbose,
debug=True)
self.inv.dataTrans = self.transData
self.inv.modelTrans = self.transLog
# self.dataTrans = self.transData
self.model1d = self.inv.run(dataVec, np.abs(errorVec/dataVec),
lam=lam, startModel=model, **kwargs)
response = self.inv.response
if show:
self.plotData(xpos=xpos, response=response)
return self.model1d
[docs]
def plotData(self, xpos=0, response=None, error=None, ax=None,
marker='bo-', rmarker='rx-', clf=True, addlabel='', nv=2):
"""Plot data as curves at given position."""
ip, op, err = self.selectData(xpos)
if error is not None and err is not None:
error = err
fr = self.freq()
if ax is None:
_, ax = pg.plt.subplots(nrows=1, ncols=nv)
ipax = ax[-2]
else:
ipax = ax[0]
markersize = 4
if error is not None:
markersize = 2
ipax.semilogy(ip, fr, marker, label='obs' + addlabel,
markersize=markersize)
if error is not None and len(error) == len(ip):
ipax.errorbar(ip, fr, xerr=error)
# ipax.set_axis('tight')
if error is not None:
ipax.ylim((min(fr) * .98, max(fr) * 1.02))
ipax.grid(True)
ipax.set_xlabel('inphase [ppm]')
ipax.set_ylabel('f [Hz]')
if response is not None:
rip = np.asarray(response)[:len(ip)]
ipax.semilogy(rip, fr, rmarker, label='syn' + addlabel)
ipax.legend(loc='best')
opax = None
opax = pg.plt.subplot(1, nv, nv) if ax is None else ax[-1]
opax.semilogy(op, fr, marker, label='obs' + addlabel,
markersize=markersize)
if error is not None and len(error) == len(ip):
opax.errorbar(op, fr, xerr=error)
if response is not None:
rop = np.asarray(response)[len(ip):]
opax.semilogy(rop, fr, rmarker, label='syn' + addlabel)
if error is not None:
opax.ylim((min(fr) * .98, max(fr) * 1.02))
opax.grid(True)
opax.set_xlabel('outphase [ppm]')
opax.set_ylabel('f [Hz]')
opax.legend(loc='best')
return ax
[docs]
def showModelAndData(self, model, xpos=0, response=None, figsize=(8, 6)):
"""Show both model and data with response in subfigures."""
fig, ax = pg.plt.subplots(1, 3, figsize=figsize)
model = np.asarray(model)
nlay = int((len(model) + 1) / 2)
thk = model[:nlay - 1]
res = model[nlay - 1: 2 * nlay - 1]
drawModel1D(ax[0], thk, res, plotfunction='semilogx')
self.plotData(xpos, response, ax=ax[1:3], clf=False)
return fig, ax
[docs]
def plotAllData(self, orientation='horizontal', aspect=1000,
outname=None, show=False, figsize=(11, 8), everyx=None):
"""Plot data along a profile as image plots for IP and OP."""
if self.x is None:
raise ValueError("No measurement position array x given")
freq = self.freq()
nr = 2
if self.ERR is not None:
nr = 3
if everyx is None:
everyx = len(self.x) // 10
_, ax = pg.plt.subplots(ncols=1, nrows=nr, figsize=figsize)
xfplot(ax[0], self.IP[:, self.activeFreq], self.x, freq,
orientation=orientation, aspect=aspect, everyx=everyx,
label='inphase percent')
# ax[0].set_title('inphase percent')
xfplot(ax[1], self.OP[:, self.activeFreq], self.x, freq,
orientation=orientation, aspect=aspect, everyx=everyx,
label='outphase percent')
# ax[1].set_title('outphase percent')
if self.ERR is not None:
xfplot(ax[2], self.ERR[:, self.activeFreq], self.x, freq,
orientation=orientation)
ax[2].set_title('error percent')
if outname is not None:
pg.plt.savefig(outname)
if show:
pg.plt.show()
return ax
[docs]
def plotModelAndData(self, model, xpos, response,
modelL=None, modelU=None):
"""Plot both model and data in subfigures."""
self.plotData(xpos, response, nv=3)
show1dmodel(model, color='blue')
if modelL is not None and modelU is not None:
pass
# draw1dmodelErr(model, modelL, modelU) # !!!!
return
[docs]
def FOP2d(self, nlay):
"""2d forward modelling operator."""
return FDEM2dFOP(self, nlay)
[docs]
def inv2D(self, nlay, lam=100., resL=1., resU=1000., thkL=1.,
thkU=100., minErr=1.0):
"""2d LCI inversion class."""
if isinstance(nlay, int):
modVec = pg.Vector(nlay * 2 - 1, 30.)
cType = 0 # no reference model
else:
modVec = nlay
cType = 10 # use this as referencemodel
nlay = (len(modVec) + 1) / 2
# init forward operator
self.f2d = self.FOP2d(nlay)
# transformations
self.transData = pg.trans.Trans()
self.transThk = pg.trans.TransLogLU(thkL, thkU)
self.transRes = pg.trans.TransLogLU(resL, resU)
for i in range(nlay - 1):
self.f2d.region(i).setTransModel(self.transThk)
for i in range(nlay - 1, nlay * 2 - 1):
self.f2d.region(i).setTransModel(self.transRes)
# set constraints
self.f2d.region(0).setConstraintType(cType)
self.f2d.region(1).setConstraintType(cType)
# collect data vector
datvec = pg.Vector(0)
for i in range(len(self.x)):
datvec = pg.cat(datvec, self.datavec(i))
# collect error vector
if self.ERR is None:
error = 1.0
else:
error = []
for i in range(len(self.x)):
err = np.maximum(self.ERR[i][self.activeFreq] * 0.701, minErr)
error.extend(err)
# generate starting model by repetition
model = np.repeat(modVec, len(self.x))
self.inv2d = pg.Inversion(fop=self.f2d)
self.inv2d.dataTrans = self.transData
model = self.inv2d.run(datvec, absoluteError=error,
startModel=model, isReference=True,
lam=lam, verbose=True)
return model
if __name__ == "__main__":
import argparse # better get an argparser from method manager
parser = argparse.ArgumentParser(description="usage: %prog [options] fdem")
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
help="be verbose", default=False)
parser.add_argument("-n", "--nLayers", dest="nlay",
help="number of layers", type=int, default="4")
parser.add_argument("-x", "--xPos", dest="xpos", help="position/no to use",
type=float, default="0")
parser.add_argument("-l", "--lambda", dest="lam",
help="init regularization", type=float, default="30")
parser.add_argument("-e", "--error", dest="err", help="error estimate",
type=float, default="1")
parser.add_argument("datafile")
options = parser.parse_args()
if options.verbose:
__verbose__ = True
print(options)
fdem = FDEM(options.datafile)
print(fdem)
datafile = options.datafile
numlay = options.nlay
xvector = options.xpos
name = datafile.lower().rstrip('.xyz')
fdem = FDEM(datafile)
print(fdem)
fdem.deactivate(56320.) # do not use highest frequency
fdem.plotAllData(outname=name + '-alldata.pdf')
# that's bullshit (that's what we have the class for)
inv = fdem.invBlock(xpos=xvector, lam=options.lam, nlay=options.nlay,
noise=options.err, verbose=False)
mymodel = np.asarray(inv.run())
inv.echoStatus()
print("thk = ", mymodel[:numlay - 1])
print("res = ", mymodel[numlay - 1:])
figure, axes = fdem.showModelAndData(mymodel, xvector, inv.response())
model = fdem.inv2D(options.nlay)
# fig.savefig(name+str(xpos)+'-result.pdf', bbox_inches='tight')
pg.plt.show()