#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Time Domain Electromagnetics (TDEM) functions and class"""
import sys
from math import pi
import numpy as np
#import matplotlib.pyplot as plt
import pygimli as pg
from pygimli.frameworks.modelling import MeshModelling
from . vmd import VMDTimeDomainModelling
[docs]
def rhoafromU(U, t, Tx, current=1.0, Rx=None):
r"""Apparent resistivity curve from classical TEM (U or dB/dt)
rhoafromU(U/I, t, TXarea[, RXarea])
.. math::
\rho_a = ( A_{Rx} *A_{Tx} * \mu_0 / 20 / (U/I) )^2/3*t^{-5/3}*4e-7
"""
UbyI = U / current
if Rx is None:
Rx = Tx # assume single/coincident loop
mu0 = 4e-7 * pi
rhoa = (Rx * Tx * mu0 / 20. / UbyI)**(2. / 3.) * \
t**(-5. / 3.) * mu0 / pi
return rhoa
[docs]
def rhoafromB(B, t, Tx, current=1):
r"""Apparent resistivity from B-field TEM
.. math::
\rho_a = ( (A_{Tx}*I*\mu_0 ) / (30B) )^2/3 * 4e-7 / t
"""
mu0 = 4e-7 * pi
rhoa = (current * Tx * mu0 / 30. / B)**(2. / 3.) * mu0 / pi / t
return rhoa
# TODO: better derive a class TEMsounding from dict and put functions in there
def TxArea(snd):
""" return effective transmitter area """
if isinstance(snd['LOOP_SIZE'], str):
Tx = np.prod([float(a) for a in snd['LOOP_SIZE'].split()])
else:
Tx = snd['LOOP_SIZE']
return Tx
def RxArea(snd):
"""Return effective receiver area."""
Rx = 0 # just in case of error
if 'COIL_SIZE' in snd:
Rx = snd['COIL_SIZE']
if Rx == 700.:
Rx = 100. # hack for wrong turns in NMR noise loop
else: # no coil size given ==> COI or SIN ==> take loop size
Rx = TxArea(snd)
return Rx
def get_rhoa(snd, cal=260e-9, corrramp=False, verbose=False):
"""Compute apparent resistivity from sounding (usf) dict."""
Tx = TxArea(snd)
Rx = RxArea(snd)
if 'COIL_SIZE' in snd:
Rx = snd['COIL_SIZE']
else:
Rx = Tx
if verbose:
print("Tx/Rx", Tx, Rx)
v = snd['VOLTAGE']
istart, istop = 0, len(v) # default: take all
mav = np.arange(len(v))[v == max(v)]
if len(mav) > 1: # several equal big ones: start after
istart = max(mav) + 1
if min(v) < 0.0: # negative values: stop at first
istop = np.argmax(v[20:] < 0.0) + 20
if verbose:
print(istart, istop)
v = v[istart:istop]
if 'ST_DEV' in snd:
dv = snd['ST_DEV'][istart:istop] # / snd['CURRENT']
else:
dv = v * 0.01
t = snd['TIME'][istart:istop] * 1.0
if corrramp and 'RAMP_TIME' in snd:
t = t - snd['RAMP_TIME'] / 2
if Rx == 1: # apparently B-field not dB/dt
rhoa = rhoafromB(B=v*cal, t=t, Tx=Tx)
else:
if verbose:
print("Using rhoafromU:", v, t, Tx, Rx)
rhoa = rhoafromU(U=v, t=t, Tx=Tx, Rx=Rx)
if verbose:
print(rhoa[0], rhoa[10], rhoa[-1])
rhoaerr = dv / v * (2. / 3.)
return rhoa, t, rhoaerr
def readusffile(filename, stripnoise=True):
"""Read data from single USF (universal sounding file) file
Examples
--------
DATA = readusffile(filename)
DATA = readusffile(filename, DATA) will append to DATA
"""
DATA = []
columns = []
nr = 0
station = {}
sounding = {}
sounding['FILENAME'] = filename
isdata = False
fid = open(filename)
for line in fid:
zeile = line.rstrip('\n').replace(',', ' ') # commas useless here
if zeile: # anything at all
if zeile[0] == '/': # comment-like
if zeile[1:4] == 'END': # end of a sounding
if isdata: # already read some data
sounding['data'] = columns
for i, cn in enumerate(sounding['column_names']):
sounding[cn] = columns[:, i]
sounding['FILENAME'] = filename
if 'INSTRUMENT' in sounding and 'ST_DEV' in sounding:
if 'terraTEM' in sounding['INSTRUMENT']:
sounding['ST_DEV'] *= 0.01
print('taking default stdev')
sounding.update(station)
if not(stripnoise and 'SWEEP_IS_NOISE' in sounding and
sounding['SWEEP_IS_NOISE'] == 1):
DATA.append(sounding)
sounding = {}
isdata = not isdata # turn off data mode
elif zeile.find(':') > 0: # key-value pair
key, value = zeile[1:].split(':')
try:
val = float(value)
sounding[key] = val
except:
sounding[key] = value
if 'SWEEP' in key and len(station) == 0: # first sweep
station = sounding.copy() # save global settings
else:
if isdata:
values = zeile.split()
try:
for i, v in enumerate(values):
columns[nr, i] = float(v)
nr += 1
except:
sounding['column_names'] = values
columns = np.zeros((int(sounding['POINTS']),
len(values)))
nr = 0
fid.close()
return DATA
def readusffiles(filenames):
"""Read all soundings data from a list of usf files
Example
-------
DATA = readusffiles(filenames)
"""
from glob import glob
if isinstance(filenames, str):
if filenames.find('*') >= 0:
filenames = glob(filenames)
else:
filenames = [filenames]
DATA = []
for onefile in filenames:
DATA.extend(readusffile(onefile))
return DATA
def readTEMfastFile(temfile):
"""ReadTEMfastFile(filename) reads TEM-fast file into usf sounding."""
snd = {}
snd['FILENAME'] = temfile
fid = open(temfile)
for i in range(4):
zeile = fid.readline()
snd['STACK_SIZE'] = int(zeile.split()[3])
snd['RAMP_TIME'] = float(zeile.split()[5])*1e-6
snd['CURRENT'] = float(zeile.split()[7][2:])
zeile = fid.readline()
fid.close()
snd['LOOP_SIZE'] = float(zeile.split()[2])**2
snd['COIL_SIZE'] = float(zeile.split()[5])**2
t, v, e, r = np.loadtxt(temfile, skiprows=8, usecols=(1, 2, 3, 4),
unpack=True)
ind = np.nonzero((r > 0) * (v > 0) * (t > snd['RAMP_TIME']*1.2e6)) # us
snd['TIME'] = t[ind] * 1e-6 # us
snd['VOLTAGE'] = v[ind]
snd['ST_DEV'] = e[ind]
snd['RHOA'] = r[ind]
return snd
def readUniKTEMData(filename):
"""Read TEM data format of University of Cologne."""
if '*' in filename:
from glob import glob
allfiles = glob(filename)
else:
allfiles = [filename]
DATA = []
for filename in allfiles:
snd = {}
snd['FILENAME'] = filename
A = np.loadtxt(filename)
snd['TIME'] = A[:, 1]
snd['VOLTAGE'] = A[:, 2]
snd['ST_DEV'] = A[:, 4] / 100 * A[:, 2]
DATA.append(snd)
return DATA
def readSiroTEMData(fname):
"""Read TEM data from siroTEM instrument dump.
Example
-------
DATA = readSiroTEMData(filename)
.. list of soundings with USF and siro-specific keys
"""
Time_ST = np.array([487., 887., 1287., 1687., 2087., 2687., 3487., 4287.,
5087., 5887., 7087., 8687., 10287., 11887., 13487.,
15887., 19087., 22287., 25487., 28687., 33487., 39887.,
46287., 52687., 59087., 68687., 81487., 94287.,
107090., 119890., 139090., 164690., 190290., 215890.,
241490., 279890., 331090., 382290., 433490., 484690.,
561490., 663890., 766290., 868690., 971090., 1124700.,
1329500., 1534300., 1739100., 1943900.])
Time_ET = np.array([0.05, 0.1, 0.15, 0.25, 0.325, 0.425, 0.525, 0.625,
0.725, 0.875, 1.075, 1.275, 1.475, 1.675, 1.975,
2.375, 2.775, 3.175, 3.575, 4.175, 4.975, 5.775,
6.575, 7.375, 8.575, 10.175, 11.775, 13.375, 14.975,
17.375, 20.575, 23.775, 26.975, 30.175, 34.975, 41.375,
47.775, 54.175, 60.574, 70.175, 82.975, 95.775,
108.575, 121.375, 140.575, 166.175, 191.775, 217.375,
242.975, 281.375, 332.575])
fid = open(fname)
# read in file header until : sign
line = 'a'
while len(line) > 0 and line[0] != ':':
line = fid.readline()
DATA = []
line = fid.readline()
while line[0] != ';':
header = line[1:-6].split(',')
snd = {} # dictionary, uppercase corresponds to USF format keys
snd['INSTRUMENT'] = 'siroTEM'
snd['dtype'] = int(header[3])
dstring = header[1]
snd['DATE'] = int('20' + dstring[6:8] + dstring[3:4] + dstring[0:1])
snd['win0'], snd['win1'], ngain, snd['conf'], snd['nch'] = \
[int(h) for h in header[5:10]]
snd['SOUNDING_NUMBER'] = int(header[10])
snd['GAIN_FACTOR'] = [0.1, 1.0, 10.0, 100.0][ngain] # predefined gains
snd['STACK_SIZE'] = int(header[14])
snd['ttype'] = int(header[20])
# 1=composite, 2=earlytime, 3=standard, 4=highresolution
snd['CURRENT'] = float(header[17])
snd['RAMP_TIME'] = float(header[18]) * 1e-6
snd['TIME_DELAY'] = float(header[19])
snd['LOOP_SIZE'] = float(header[21])
snd['COIL_SIZE'] = float(header[22])
fid.readline()
data = []
line = fid.readline()[:-1] # trim CR+LF newline
while len(line) > 0:
while line[-1] == '/':
line = line[:-1] + fid.readline()[:-1].replace('\t', '')
# aline = line
nums = [float(el[-7:-2]) * 10**(float(el[-2:])) for el in
line[1:-5].split(',')[1:]]
data.append(np.array(nums))
line = fid.readline().rstrip('\n').rstrip('\r')
snd['VOLTAGE'] = data[0]
if snd['ttype'] == 2: # early time
snd['TIME'] = Time_ET[snd['win0'] - 1:snd['win1']] * 1e-3
if snd['ttype'] == 3: # standard time
snd['TIME'] = Time_ST[snd['win0'] - 1:snd['win1']] * 1e-6
snd['ST_DEV'] = data[1]
if snd['dtype'] > 0: # normal measurement
DATA.append(snd)
line = fid.readline()
fid.close()
# DATA['FILENAME'] = fname # makes no sense as DATA is an array->snd?
return DATA
def getname(snd):
"""Generate label name from filename entry."""
fname = snd['FILENAME']
name = fname[fname.rfind('\\')+1:-4]
if 'STACK_SIZE' in snd:
name += '-' + str(int(snd['STACK_SIZE']))
return name
[docs]
class TDEM():
"""TEM class mainly for holding data etc."""
basename = "new"
[docs]
def __init__(self, filename=None):
"""Initialize class and (optionally) load data"""
self.DATA = []
self.names = []
if filename:
self.load(filename)
[docs]
def load(self, filename):
"""Road data from usf, txt (siroTEM), tem (TEMfast) or UniK file."""
if filename.lower().endswith('.usf'):
self.DATA.extend(readusffiles(filename))
elif filename.lower().endswith('.txt'):
self.DATA = readSiroTEMData(filename)
elif filename.lower().endswith('.tem'):
self.DATA.append(readTEMfastFile(filename))
elif filename.lower().endswith('.dat'): # dangerous
self.DATA = readUniKTEMData(filename)
self.basename = filename # .rstrip()
def __repr__(self):
return "<TDEMdata: %d soundings>" % (len(self.DATA))
[docs]
def showInfos(self): # only for old scripts using it
print(self.__repr__)
[docs]
def gather(self, token):
"""Collect item from all soundings."""
if token == "LOOP_SIZE":
mylist = [np.array(snd["LOOP_SIZE"].split(), dtype=float)
for snd in self.DATA]
else:
mylist = [snd[token] for snd in self.DATA]
return np.array(mylist)
[docs]
def filterSoundings(self, token, value):
"""Filter all values matching a certain token."""
vals = self.gather(token)
ind = np.nonzero(vals == value)[0]
# bind = vals == value
if np.any(ind):
self.DATA = [self.DATA[i] for i in ind]
else:
print("No data matching the filter criteria")
[docs]
def filterData(self, token, vmin=0, vmax=9e99):
"""Filter all sounding data according to criterion."""
for snd in self.DATA:
vals = snd[token]
bind = (vals >= vmin) & (vals <= vmax)
for col in snd["column_names"]:
snd[col] = snd[col][bind]
[docs]
def plotTransients(self, ax=None, **kwargs):
"""Plot all transients into one window"""
if ax is None:
fig, ax = plt.subplots()
else:
fig = ax.get_figure()
kwargs.setdefault('marker', '.')
plotlegend = kwargs.pop('legend', True)
cols = 'rgbmcyk'
pl = []
for i, data in enumerate(self.DATA):
t = data['TIME']
u = data['VOLTAGE'] / RxArea(data)
col = cols[i % len(cols)]
pl.append(ax.loglog(t, u, label=getname(data),
color=col, **kwargs))
if 'ST_DEV' in data:
err = data['ST_DEV'] / RxArea(data)
ax.errorbar(t, u, yerr=err, color=col)
# uU = u + err
# uL = u - err
# ax.errorbar(t, u, yerr=[uL, uU], color=col)
if 'RAMP_TIME' in data:
ax.vlines(data['RAMP_TIME'], min(u), max(u), colors=col)
ax.set_xlabel('t [s]')
ax.set_ylabel('U/I [V/A]')
if plotlegend:
ax.legend(loc='best')
# xlim = [10e-6, 2e-3]
ax.grid(True)
return fig, ax
[docs]
def plotRhoa(self, ax=None, ploterror=False, corrramp=False, **kwargs):
"""Plot all apparent resistivity curves into one window."""
if ax is None:
fig, ax = plt.subplots()
kwargs.setdefault('marker', '.')
plotLegend = kwargs.pop('legend', True)
for i, data in enumerate(self.DATA):
rhoa, t, err = get_rhoa(data, corrramp=corrramp)
err[err > .99] = .99
col = 'C'+str(i % 10)
ax.loglog(rhoa, t, label=getname(data), # color=col,
color=col, **kwargs)
if ploterror:
ax.errorbar(rhoa, t, xerr=rhoa * err, color=col)
ax.set_ylabel('t [s]')
ax.set_xlabel(r'$\rho_a$ [$\Omega$m]')
if plotLegend:
ax.legend(loc='best')
ax.grid(True)
ax.set_ylim(ax.get_ylim()[::-1])
return ax
def __call__(self, i=0):
"""Return a single sounding."""
return self.DATA[i]
def __getitem__(self, i=0):
"""Return a single sounding."""
return self.DATA[i]
[docs]
def invert(self, nr=0, nlay=4, thickness=None, errorFloor=0.05):
"""Do inversion."""
snd = self.DATA[nr]
self.fop = VMDTimeDomainModelling(snd['TIME'], TxArea(snd), 1,
nLayers=nlay)
rhoa, t, err = get_rhoa(snd)
self.fop.t = t
model = self.fop.createStartModel(rhoa, nlay, thickness=None)
self.INV = pg.frameworks.MarquardtInversion(fop=self.fop)
errorVals = np.maximum(snd.pop('ST_DEV', 0)/snd['VOLTAGE'], errorFloor)
self.model = self.INV.run(dataVals=rhoa, errorVals=errorVals,
startModel=model)
return self.model
[docs]
def stackAll(self, tmin=0, tmax=100):
"""Stack all measurements yielding a new TDEM class instance."""
t = self.DATA[0]['TIME']
v = np.zeros_like(t)
V = np.zeros((len(v), len(self.DATA)))
ST = np.zeros(len(self.DATA))
sumstacks = 0
for i, snd in enumerate(self.DATA):
if np.allclose(snd['TIME'], t):
stacks = snd.pop('STACK_SIZE', 1)
ST[i] = stacks
v += snd['VOLTAGE'] * stacks
sumstacks += stacks
V[:, i] = snd['VOLTAGE']
else:
print("sounding {} does not have the same time!".format(i))
v /= sumstacks
VM = np.ma.masked_less_equal(V, 0)
# err = np.std(VM, axis=1).data
err = np.std(np.log(VM), axis=1).data
snd = self.DATA[0].copy()
fi = np.nonzero((t >= tmin) & (t <= tmax))[0]
snd['TIME'] = t[fi]
snd['VOLTAGE'] = v[fi]
snd['ST_DEV'] = err[fi] * v[fi]
del snd['data']
tem = TDEM()
tem.DATA = [snd]
return tem
[docs]
class TDEMSmoothModelling(MeshModelling):
"""Occam-style (smooth) inversion."""
[docs]
def __init__(self, thk, **kwargs):
super().__init__()
self.thk_ = thk
self.nlay_ = len(thk)+1
self.core = VMDTimeDomainModelling(**kwargs, nLayers=self.nlay_)
self.mesh_ = pg.meshtools.createMesh1D(self.nlay_)
self.setMesh(self.mesh_)
[docs]
def response(self, par):
"""Model response (forward modelling)."""
return self.core.response(pg.cat(self.thk_, par))
if __name__ == '__main__':
print("do some tests here")
tem = TDEM(sys.argv[1])
print(tem)
tem.plotTransients()
tem.plotRhoa()