"""Import routines several ERT file formats."""
import re
import numpy as np
import pygimli as pg
def load(fileName, verbose=False, **kwargs):
"""Shortcut to load ERT data.
Import Data and try to assume the file format.
Additionally to unified data format we support the wide-spread res2dinv
format as well as ASCII column files generated by the processing software
of various instruments (ABEM LS, Syscal Pro, Resecs, ?)
If this fails, install pybert and use its auto importer pybert.importData.
Parameters
----------
fileName: str
Returns
-------
data: pg.DataContainer
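
    Examples
    --------
    A minimal usage sketch; the file name is hypothetical::

        import pygimli.physics.ert as ert
        data = ert.load('profile1.dat')
        print(data)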
"""
data = pg.load(fileName)
if isinstance(data, pg.DataContainerERT):
return data
try:
pg.info("could not read unified data format for ERT ... try res2dinv")
data = importRes2dInv(fileName)
return data
except Exception:
pg.info("could not read res2dinv ... try Ascii columns")
try:
data = importAsciiColumns(fileName)
return data
except Exception as e:
pg.info("Failed importing Ascii column file. Consider using pybert.")
pg.info(e)
    if verbose:
        pg.info("Trying to import using pybert ... if available")
    pb = pg.optImport('pybert')
    if pb is None:
        pg.critical("Can't import ERT data file (pybert not found):",
                    fileName)
    data = pb.importData(fileName)
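    # optionally compute geometric factor k and apparent resistivity rhoa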
if kwargs.pop('ensureKRhoa', False):
if not data.haveData('k'):
data.createGeometricFactors()
if data.haveData('r'):
data['rhoa'] = data['r'] * data['k']
elif data.haveData('u') and data.haveData('i'):
data['rhoa'] = data['u'] / data['i'] * data['k']
if isinstance(data, pg.DataContainerERT):
return data
pg.critical("Can't import ERT data file.", fileName)
def importRes2dInv(filename, verbose=False, return_header=False):
"""Read res2dinv format file.
    Parameters
    ----------
    filename : str
        Name of the Res2DInv data file.
    verbose : bool [False]
        Give some more output.
    return_header : bool [False]
        Additionally return the file header as a dictionary.

    Returns
    -------
    data : pg.DataContainerERT
    header : dict
        Only if return_header is True.
Format
------
    str - title
    float - unit spacing [m]
    int - array type (1-Wenner, 2-Pole-pole, 3-Dipole-dipole, 4-Wenner-beta,
        5-Wenner-gamma, 6-Pole-dipole, 7-Schlumberger, 11/12/13-general)
    int - number of data points
    float - x-location type (0: given for first electrode,
        1: mid-point location given)
    int - 0 for no IP, 1 if IP is present
    str - phase angle (if IP present)
    str - mrad (if IP present)
    0,90.0 - (if IP present)
    dataBody
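
    Examples
    --------
    A minimal Wenner-type file with purely hypothetical values, following
    the field order above (trailing zeros mark absent topography)::

        Test profile
        1.0
        1
        2
        1
        0
        1.5 1.0 100.0
        2.5 1.0 98.0
        0
        0
        0
        0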
"""
    def getNonEmptyRow(i, comment='#'):
        """Return the next row that does not start with the comment char."""
        s = next(i)
        while s[0] == comment:
            s = next(i)
        return s.split('\r\n')[0]
    # def getNonEmptyRow(...)
with open(filename, 'r') as fi:
content = fi.readlines()
it = iter(content)
header = {}
header['name'] = getNonEmptyRow(it, comment=';')
header['spacing'] = float(getNonEmptyRow(it, comment=';'))
typrow = getNonEmptyRow(it, comment=';')
typ = int(typrow.rstrip('\n').rstrip('R').rstrip('L'))
if typ == 11:
# independent electrode positions
header['subtype'] = int(getNonEmptyRow(it, comment=';'))
header['dummy'] = getNonEmptyRow(it, comment=';')
isR = int(getNonEmptyRow(it, comment=';'))
nData = int(getNonEmptyRow(it, comment=';'))
xLoc = float(getNonEmptyRow(it, comment=';'))
hasIP = int(getNonEmptyRow(it, comment=';'))
if hasIP:
header['ipQuantity'] = getNonEmptyRow(it, comment=';')
header['ipUnit'] = getNonEmptyRow(it, comment=';')
header['ipData'] = getNonEmptyRow(it, comment=';')
ipline = header['ipData'].rstrip('\n').rstrip('\r').split(' ')
if len(ipline) > 2: # obviously spectral data?
header['ipNumGates'] = int(ipline[0])
header['ipDelay'] = float(ipline[1])
header['onTime'] = float(ipline[-2])
header['offTime'] = float(ipline[-1])
header['ipDT'] = np.array(ipline[2:-2], dtype=float)
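            # gate times: delay time followed by the cumulated gate lengths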
header['ipGateT'] = np.cumsum(np.hstack((header['ipDelay'],
header['ipDT'])))
data = pg.DataContainerERT()
data.resize(nData)
    if typ == 9 or typ == 10:
        raise Exception("Don't know how to read data of type " + str(typ))
if typ in [11, 12, 13]: # mixed array
res = pg.Vector(nData, 0.0)
ip = pg.Vector(nData, 0.0)
specIP = []
for i in range(nData):
vals = getNonEmptyRow(it, comment=';').replace(',', ' ').split()
# row starts with 4
if int(vals[0]) == 4:
eaID = data.createSensor(pg.Pos(float(vals[1]),
float(vals[2])))
ebID = data.createSensor(pg.Pos(float(vals[3]),
float(vals[4])))
emID = data.createSensor(pg.Pos(float(vals[5]),
float(vals[6])))
enID = data.createSensor(pg.Pos(float(vals[7]),
float(vals[8])))
elif int(vals[0]) == 3:
eaID = data.createSensor(pg.Pos(float(vals[1]),
float(vals[2])))
ebID = -1
emID = data.createSensor(pg.Pos(float(vals[3]),
float(vals[4])))
enID = data.createSensor(pg.Pos(float(vals[5]),
float(vals[6])))
elif int(vals[0]) == 2:
eaID = data.createSensor(pg.Pos(float(vals[1]),
float(vals[2])))
ebID = -1
emID = data.createSensor(pg.Pos(float(vals[3]),
float(vals[4])))
enID = -1
else:
                raise Exception("Don't know how to handle row " +
                                str(vals[0]))
res[i] = float(vals[int(vals[0])*2+1])
if hasIP:
# ip[i] = float(vals[int(vals[0])*2+2])
ipCol = int(vals[0])*2+2
ip[i] = float(vals[ipCol])
if 'ipNumGates' in header:
specIP.append(vals[ipCol:])
data.createFourPointData(i, eaID, ebID, emID, enID)
if isR:
data.set('r', res)
else:
data.set('rhoa', res)
if hasIP:
data.set('ip', ip)
if 'ipNumGates' in header:
A = np.array(specIP, dtype=float)
A[A > 1000] = -999
A[A < -1000] = -999
for i in range(header['ipNumGates']):
data.set('ip'+str(i+1), A[:, i])
else: # not type 11-13
        # number of data columns per array type (list index = typ)
        nntyp = [0, 3, 3, 4, 3, 3, 4, 4, 3, 0, 0, 8, 10]
        nn = nntyp[typ] + hasIP  # total number of columns including IP
dataBody = np.zeros((nn, nData))
for i in range(nData):
vals = getNonEmptyRow(it, comment=';').replace(',', ' ').split()
dataBody[:, i] = np.array(vals, dtype=float)
XX = dataBody[0]
EL = dataBody[1]
SP = pg.Vector(nData, 1.0)
if nn - hasIP == 4:
SP = dataBody[2]
AA = None
BB = None
NN = None
MM = None
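        # Reconstruct all four electrode positions from the given
        # x-location XX, the electrode spacing EL and, for four-column
        # formats, the separation factor SP. xLoc selects whether XX
        # refers to the first electrode (0) or the array mid-point (1).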
if typ == 1: # Wenner
AA = XX - xLoc * EL * 1.5
MM = AA + EL
NN = MM + EL
BB = NN + EL
elif typ == 2: # Pole-Pole
AA = XX - xLoc * EL * 0.5
MM = AA + EL
elif typ == 3: # Dipole-Dipole
AA = XX - xLoc * EL * (SP / 2. + 1.)
BB = AA + EL
MM = BB + SP * EL
NN = MM + EL
elif typ == 4: # WENNER-BETA
AA = XX - xLoc * EL * 1.5
BB = AA + EL
MM = BB + EL
NN = MM + EL
elif typ == 5: # WENNER-GAMMA
AA = XX - xLoc * EL * 1.5
MM = AA + EL
BB = MM + EL
NN = BB + EL
elif typ == 6: # POLE-DIPOLE
AA = XX - xLoc * SP * EL - (SP - 1.) * (SP < 0.) * EL
MM = AA + SP * EL
NN = MM + np.sign(SP) * EL
elif typ == 7: # SCHLUMBERGER
AA = XX - xLoc * EL * (SP + 0.5)
MM = AA + SP * EL
NN = MM + EL
BB = NN + SP * EL
else:
            raise Exception('Datatype ' + str(typ) + ' not yet supported')
for i in range(len(AA)):
if AA is not None:
eaID = data.createSensor(pg.Pos(AA[i], 0.0))
else:
eaID = -1
if BB is not None:
ebID = data.createSensor(pg.Pos(BB[i], 0.0))
else:
ebID = -1
if MM is not None:
emID = data.createSensor(pg.Pos(MM[i], 0.0))
else:
emID = -1
if NN is not None:
enID = data.createSensor(pg.Pos(NN[i], 0.0))
else:
enID = -1
data.createFourPointData(i, eaID, ebID, emID, enID)
data.set('rhoa', dataBody[nn - hasIP - 1])
if hasIP:
data.set('ip', dataBody[nn - 1])
row = getNonEmptyRow(it, comment=';')
if row.lower().startswith('topography'):
row = getNonEmptyRow(it, comment=';')
istopo = int(row)
if istopo:
ntopo = int(getNonEmptyRow(it, comment=';'))
ap = data.additionalPoints()
for i in range(ntopo):
strs = getNonEmptyRow(it, comment=';').replace(',', ' ').split()
ap.push_back(pg.Pos([float(s) for s in strs]))
data.setAdditionalPoints(ap)
data.sortSensorsX()
data.sortSensorsIndex()
if return_header:
return data, header
else:
return data
# def importRes2dInv(...)
def importAsciiColumns(filename, verbose=False, return_header=False):
"""Import any ERT data file organized in columns with column header.
Input can be:
* Terrameter LS or SAS Ascii Export format, e.g.
Time MeasID DPID Channel A(x) A(y) A(z) B(x) B(y) B(z) M(x) M(y) M(z)
N(x) N(y) N(z) F(x) F(y) F(z) Note I(mA) Uout(V) U(V) SP(V) R(O)
Var(%) Rhoa Cycles Pint Pext(V) T(°C) Lat Long
2016-09-14 07:01:56 73 7 1 8 1 1 20 1 1 12 1 1
16 1 1 14 1 2.076 99.8757 107.892 0.0920761 0 0.921907
0.196302 23.17 1 12.1679 12.425 42.1962 0 0
* Resecs Output format
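
    Examples
    --------
    A minimal usage sketch; the file name is hypothetical::

        data = importAsciiColumns('terrameter_export.txt')
        print(data['rhoa'])  # apparent resistivity, if contained in the file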
"""
data = pg.DataContainerERT()
header = {}
with open(filename, 'r', encoding='iso-8859-15') as fi:
content = fi.readlines()
if content[0].startswith('Injection'): # Resecs lead-in
for n in range(20):
if len(content[n]) < 2:
break
content = content[n+1:]
if content[0].startswith('Filename'): # ABEM lead-in
for n, line in enumerate(content):
if line.find("MeasID") > 0:
break
for i in range(n):
sp = content[i].split(":")
if len(sp) > 1:
tok = sp[0].lstrip("\t").lstrip("- ")
header[tok] = sp[1].rstrip("\n").rstrip("\r")
        for last in range(len(content)-1, -1, -1):
            if content[last].find("---") == 0:  # trailing separator found
                pg.debug(content[last])
                last -= 1
                pg.debug(content[last])
                while len(content[last]) < 3:  # skip empty trailing lines
                    last -= 1
                last += 1
                break
if last <= 1:
last = len(content)
content = content[n:last]
d = readAsDictionary(content, sep='\t')
if len(d) < 2:
d = readAsDictionary(content)
nData = len(next(iter(d.values())))
data.resize(nData)
if 'Spa.1' in d: # Syscal Pro
abmn = ['Spa.1', 'Spa.2', 'Spa.3', 'Spa.4']
        if verbose:
            pg.debug("detected Syscal file format")
elif 'A(x)' in d: # ABEM Terrameter
abmn = ['A', 'B', 'M', 'N']
if verbose:
pg.debug("detected ABEM file format")
elif 'xA' in d: # Workbench TX2 processed data
abmn = ['xA', 'xB', 'xM', 'xN']
if verbose:
pg.debug("detected Workbench file format")
elif 'C1(x)' in d or 'C1(xm)' in d: # Resecs
abmn = ['C1', 'C2', 'P1', 'P2']
if verbose:
pg.debug("detected RESECS file format")
else:
pg.debug("no electrode positions found!")
pg.debug("Keys are:", d.keys())
raise Exception("No electrode positions found!")
for i in range(nData):
if abmn[0]+'(z)' in d:
eID = [data.createSensor([d[se+'(x)'][i], d[se+'(y)'][i],
d[se+'(z)'][i]]) for se in abmn]
elif abmn[0]+'(zm)' in d:
eID = [data.createSensor([d[se+'(xm)'][i], d[se+'(ym)'][i],
d[se+'(zm)'][i]]) for se in abmn]
elif abmn[0]+'(y)' in d:
eID = [data.createSensor([d[se+'(x)'][i], d[se+'(y)'][i],
0.]) for se in abmn]
elif abmn[0]+'(ym)' in d:
eID = [data.createSensor([d[se+'(xm)'][i], d[se+'(ym)'][i],
0.]) for se in abmn]
elif abmn[0]+'(x)' in d:
eID = [data.createSensor([d[se+'(x)'][i], 0.,
0.]) for se in abmn]
elif abmn[0]+'(xm)' in d:
eID = [data.createSensor([d[se+'(xm)'][i], 0.,
0.]) for se in abmn]
else:
eID = [data.createSensor([d[se][i], 0., 0.]) for se in abmn]
data.createFourPointData(i, *eID)
# data.save('tmp.shm', 'a b m n')
tokenmap = {'I(mA)': 'i', 'I': 'i', 'In': 'i', 'Vp': 'u',
'VoltageV': 'u', 'U': 'u', 'U(V)': 'u', 'UV': 'u',
'R(Ohm)': 'r', 'RO': 'r', 'R(O)': 'r', 'Res': 'r',
'Rho': 'rhoa', 'AppROhmm': 'rhoa', 'Rho-a(Ohm-m)': 'rhoa',
'Rho-a(Om)': 'rhoa',
'Var(%)': 'err', 'D': 'err', 'Dev.': 'err', 'Dev': 'err',
'M': 'ma', 'P': 'ip', 'IP sum window': 'ip',
'Time': 't'}
# Unit conversions (mA,mV,%), partly automatically assumed
unitmap = {'I(mA)': 1e-3, 'Var(%)': 0.01, # ABEM
'U': 1e-3, 'I': 1e-3, 'D': 0.01, # Resecs
'Dev.': 0.01, 'In': 1e-3, 'Vp': 1e-3} # Syscal
abmn = ['a', 'b', 'm', 'n']
if 'Cycles' in d:
d['stacks'] = d['Cycles']
for key in d.keys():
vals = np.asarray(d[key])
if key.startswith('IP sum window'): # there is a trailing number
key = 'IP sum window' # apparently not working
        if (np.issubdtype(vals.dtype, np.floating) or
                np.issubdtype(vals.dtype, np.signedinteger)):
if key in tokenmap: # use the standard (i, u, rhoa) key
if key not in abmn:
if verbose:
pg.debug("Setting", tokenmap[key], "from", key)
data.set(tokenmap[key],
vals * unitmap.get(key, 1.0))
else: # use the original key if not XX(x) etc.
if not re.search('([x-z])', key) and key not in abmn:
data.set(key.replace(' ', '_'), d[key])
    if data.haveData('u') and data.haveData('i'):
        r = data['u'] / data['i']
        if 'R(O)' in d:  # cross-check against the resistance column
            if np.linalg.norm(np.asarray(r) - np.asarray(d['R(O)'])) < 1e4:
                data.set('r', r)
            else:
                pg.debug("Warning! File inconsistent")
data.sortSensorsX()
if return_header:
return data, header
else:
return data
# def importAsciiColumns(...)
def readAsDictionary(content, token=None, sep=None):  # obsolete due to numpy?
"""Read list of strings from a file as column separated dictionary.
e.g.
token1 token2 token3 token4
va1 va2 val3 val4
va1 va2 val3 val4
va1 va2 val3 val4
Parameters
----------
content: [string]
List of strings read from file:
e.g.
with open(filename, 'r') as fi:
content = fi.readlines()
fi.close()
token: [string]
If given the tokens will be the keys of the resulting dictionary.
When token is None, tokens will be the first row values.
When token is a empty list, the tokens will be autonamed to
'col' + str(ColNumber)
ret: dictionary
Dictionary of all data
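
    Examples
    --------
    A minimal round trip with purely illustrative data:

    >>> content = ["A B C", "1 2 3", "4 5 6"]
    >>> d = readAsDictionary(content)
    >>> d['B']
    [2.0, 5.0]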
"""
data = dict()
    if token is None:
        header = content[0].splitlines()[0].split(sep)
        token = [tok.lstrip() for tok in header]
for i, row in enumerate(content[1:]):
vals = row.splitlines()[0].split(sep)
for j, v in enumerate(vals):
v = v.replace(',', '.')
if len(token) < j+1:
token.append('col' + str(j))
if token[j] not in data:
data[token[j]] = [None] * (len(content)-1)
try:
data[token[j]][i] = float(v)
except Exception:
if len(v) == 1 and v[0] == '-':
v = 0.0
data[token[j]][i] = v
return data