Source code for pygimli.physics.ert.timelapse

"""Timelapse ERT manager class."""
import os.path
from glob import glob
from datetime import datetime, timedelta
import numpy as np
import pygimli as pg
from pygimli.physics import ert
from .processing import combineMultipleData


# move general timelapse stuff to method-independent class
# class Timelapse():
#     mask
#     chooseTime
# class TimelapseERT(Timelapse)


[docs] class TimelapseERT(): """Class for crosshole ERT data manipulation. Note that this class is to be split into a hierarchy of classes for general timelapse data management, timelapse ERT and crosshole ERT. You can load data, filter them data in the temporal or measuring axis, plot data, run inversion and export data and result files. """
[docs] def __init__(self, filename=None, **kwargs): """Initialize class and possibly load data. Parameters ---------- filename : str filename to load data, times, RHOA and ERR from data : DataContainerERT The data with quadrupoles for all times : np.array of datetime objects measuring times DATA : 2d np.array (data.size(), len(times)) all apparent resistivities ERR : 2d np.array (data.size(), len(times)) all apparent relative errors bhmap : array map electrode numbers to borehole numbers mesh : array mesh for inversion """ self.data = kwargs.pop("data", None) self.DATA = kwargs.pop("DATA", []) self.ERR = kwargs.pop("ERR", []) self.times = kwargs.pop("times", []) self.mesh = kwargs.pop("mesh", None) self.pd = None self.name = "new" self.models = [] self.responses = [] self.chi2s = [] self.model = None self.mgr = ert.ERTManager() if self.mesh is not None: self.mgr.setMesh(self.mesh) if filename is not None: if isinstance(filename, str): results = kwargs.pop("results", None) self.load(filename, **kwargs) if results is not None: self.loadResults(results) else: self.DATA = filename if np.any(self.DATA): if isinstance(self.DATA[0], pg.DataContainerERT): self.data, self.DATA, self.ERR = combineMultipleData(self.DATA) self.mask() if "name" in kwargs: self.name = kwargs["name"] nt = 0 if np.any(self.DATA): nt = self.DATA.shape[1] if len(self.times) != nt: # default: days from now self.times = datetime.now() + np.arange(nt) * timedelta(days=1)
def __repr__(self): # for print function """Return string representation of the class.""" out = ['Timelapse ERT data:', self.data.__str__()] if np.any(self.DATA): out.append("{} time steps".format(self.DATA.shape[1])) if np.any(self.times): out[-1] += " from " + self.times[0].isoformat(" ", "minutes") out[-1] += " to " + self.times[-1].isoformat(" ", "minutes") return "\n".join(out)
[docs] def load(self, filename, **kwargs): """Load or import data (or data files using *).""" if os.path.isfile(filename): self.data = ert.load(filename) if os.path.isfile(filename[:-4]+".rhoa"): self.DATA = np.loadtxt(filename[:-4]+".rhoa") if os.path.isfile(filename[:-4]+".err"): self.ERR = np.loadtxt(filename[:-4]+".err") if os.path.isfile(filename[:-4]+".times"): timestr = np.loadtxt(filename[:-4]+".times", dtype=str) self.times = np.array( [datetime.fromisoformat(s) for s in timestr]) elif "*" in filename: DATA = [ert.load(fname) for fname in glob(filename)] self.data, self.DATA, self.ERR = combineMultipleData(DATA) self.name = filename[:-4].replace("*", "All")
[docs] def saveData(self, filename=None, masknan=True): """Save all data as datacontainer, times, rhoa and error arrays.""" filename = filename or self.name if filename.endswith(".shm"): filename = filename[:-4] self.data.save(filename+".shm", "a b m n k") DATA = self.DATA.copy() if masknan: DATA[DATA.mask] = np.nan np.savetxt(filename+".rhoa", DATA, fmt="%6.2f") if np.any(self.ERR): np.savetxt(filename+".err", self.ERR, fmt="%6.2f") with open(filename+".times", "w", encoding="utf-8") as fid: for d in self.times: fid.write(d.isoformat()+"\n") self.name = filename
[docs] def timeIndex(self, t): # """Return index of closest timestep in times to t. Parameters ---------- t : int|str|datetime datetime object or string """ if isinstance(t, str): # convert into datetime t = datetime.fromisoformat(t) # check others if isinstance(t, datetime): # detect closest point return np.argmin(np.abs(self.times-t)) elif isinstance(t, (int, np.int32, np.int64)): return t elif hasattr(t, "__iter__"): return np.array([self.timeIndex(ti) for ti in t], dtype=int) else: raise TypeError("Unknown type", type(t))
[docs] def filter(self, tmin=0, tmax=None, t=None, select=None, kmax=None): """Filter data set temporally or data-wise. Parameters ---------- tmin, tmax : int|str|datetime minimum and maximum times to keep t : int|str|datetime time to remove select : list[int] times to select kmax : float maximum geometric factor to allow """ if np.any(self.DATA): if select is not None: ind = self.timeIndex(select) else: tmin = self.timeIndex(tmin) # converts dt/str to int if tmax is None: tmax = self.DATA.shape[1] else: tmax = self.timeIndex(tmax) ind = np.arange(tmin, tmax) if t is not None: ind = np.setxor1d(ind, self.timeIndex(t)) self.DATA = self.DATA[:, ind] if np.any(self.ERR): self.ERR = self.ERR[:, ind] if np.any(self.times): self.times = self.times[ind] if kmax is not None: ind = np.nonzero(np.abs(self.data["k"]) < kmax)[0] self.data["valid"] = 0 self.data.markValid(ind) self.data.removeInvalid() if np.any(self.DATA): self.DATA = self.DATA[ind, :] if np.any(self.ERR): self.ERR = self.ERR[ind, :]
[docs] def mask(self, rmin=0.1, rmax=1e6, emax=None): """Mask data (i.e. remove them from inversion). Parameters ---------- rmin, rmax : float minimum and maximum apparent resistivity emax : float maximum error """ self.DATA = np.ma.masked_invalid(self.DATA) self.DATA = np.ma.masked_outside(self.DATA, rmin, rmax) if emax is not None and np.any(self.ERR): self.DATA.mask = np.bitwise_or(self.DATA.mask, self.ERR > emax)
[docs] def automask(self, dmax=0.3, nc=5): """Automatic outlier masking using dist to smoothed curve.""" from pygimli.frameworks import harmfit tt = np.array([ti.toordinal() for ti in self.times]) for data in self.DATA: ddata = data[~data.mask].data if len(ddata) > 3: hf = harmfit(ddata, tt[~data.mask], verbose=False, nc=nc, robustData=True, resample=tt)[0] misfit = np.abs(data/hf - 1) data.mask[misfit > dmax] = True
[docs] def showData(self, v="rhoa", t=None, **kwargs): """Show data as pseudosections (single-hole) or cross-plot (crosshole) Parameters ---------- v : str|array ["rhoa] array or field to plot t : int|str|datetime time to choose (can also be first argument) x, y : str|array ["a", "m"] values to use for x and y axes crossplot : bool [x and y given] force AB-MN crossplot kwargs : dict forwarded to ert.show or showDataContainerAsMatrix """ if isinstance(v, (int, str)) and t is None: # obviously t meant t = v v = "rhoa" kwargs.setdefault("cMap", "Spectral_r") if t is not None: t = self.timeIndex(t) rhoa = self.DATA[:, t].copy() v = rhoa.data v[rhoa.mask] = np.nan if kwargs.pop("crossplot", "x" in kwargs and "y" in kwargs): x = kwargs.pop("x", ["a", "b"]) y = kwargs.pop("y", ["m", "n"]) return pg.viewer.mpl.showDataContainerAsMatrix( self.data, x, y, v, **kwargs) else: kwargs.setdefault("x", "a") kwargs.setdefault("y", "m") return self.data.show(v, **kwargs)
[docs] def showTimeline(self, ax=None, **kwargs): """Show data timeline. Parameters ---------- ax : mpl.Axes|None matplotlib axes to plot (otherwise new) a, b, m, n : int tokens to extract data from """ if ax is None: _, ax = pg.plt.subplots(figsize=[8, 5]) good = np.ones(self.data.size(), dtype=bool) lab = kwargs.pop("label", "ABMN") + ": " for k, v in kwargs.items(): good = np.bitwise_and(good, self.data[k] == v) abmn = [self.data[tok] for tok in "abmn"] for i in np.nonzero(good)[0]: lab1 = lab + " ".join([str(tt[i]) for tt in abmn]) ax.semilogy(self.times, self.DATA[i, :], "x-", label=lab1) ax.grid(True) ax.legend() ax.set_xlabel("time") ax.set_ylabel("resistivity (Ohmm)") return ax
[docs] def fitReciprocalErrorModel(self, **kwargs): """Fit all data by analysing normal/reciprocal data. Parameters ---------- show : bool show temporal behaviour of absolute & relative errors kwargs passed on to ert.fitReciprocalErrorModel) nBins : int number of bins to subdivide data (4 < data.size()//30 < 30) rel : bool [False] fit relative instead of absolute errors Returns ------- p, a : array relative (p) and absolute (a) errors for every time step """ data = self.data.copy() p = np.zeros(self.DATA.shape[1]) a = np.zeros_like(p) show = kwargs.pop("show", False) # avoid show single fits rel = kwargs.get("rel", True) for i, rhoa in enumerate(self.DATA.T): if isinstance(rhoa, np.ma.MaskedArray): rhoa = rhoa.data data['rhoa'] = rhoa try: pp, aa = ert.fitReciprocalErrorModel(data, **kwargs) if not rel: pp, aa = aa, pp p[i], a[i] = pp, aa except: print("Could not get reciprocal model for timestep", i) if show: _, ax = pg.plt.subplots(nrows=2, sharex=True) ax[0].plot(self.times, p*100) ax[1].plot(self.times, a) ax[0].set_ylabel("relative error (%)") ax[1].set_ylabel("absolute error (Ohm)") ax[0].grid() ax[1].grid() return p, a
[docs] def generateDataPDF(self, **kwargs): """Generate a pdf with all data as timesteps in individual pages. Iterates through times and calls showData into multi-page pdf """ kwargs.setdefault("verbose", False) from matplotlib.backends.backend_pdf import PdfPages with PdfPages(self.name+'-data.pdf') as pdf: fig = pg.plt.figure(figsize=kwargs.pop("figsize", [5, 5])) for i in range(self.DATA.shape[1]): ax = fig.subplots() self.showData(t=i, ax=ax, **kwargs) ax.set_title(str(i)+": "+ self.times[i].isoformat(" ", "minutes")) fig.savefig(pdf, format='pdf') fig.clf()
[docs] def generateTimelinePDF(self, key="a", filename=None, **kwargs): """Generate multipage PDF with timeline data. Parameters ---------- key : str ['a'] data key to sort measurements after filename : str [name+'time'+key+'.pdf'] output pdf filename kwargs : dict passed to showTimeline """ from matplotlib.backends.backend_pdf import PdfPages if filename is None: filename = self.name+'-time-'+key+'.pdf' with PdfPages(filename) as pdf: fig = pg.plt.figure() for a in np.unique(self.data[key]): ax = fig.subplots() self.showTimeline(ax=ax, a=a) fig.savefig(pdf, format='pdf') fig.clf()
[docs] def chooseTime(self, t=None): """Return data for specific time. Parameters ---------- t : int|str|datetime """ if not isinstance(t, (int, np.int32, np.int64)): t = self.timeIndex(t) rhoa = self.DATA[:, t].copy() arhoa = np.abs(rhoa.data) arhoa[rhoa.mask] = np.nanmedian(arhoa) data = self.data.copy() data["rhoa"] = arhoa data.estimateError() data["err"][rhoa.mask] = 1e8 self.data = data return data
[docs] def createMesh(self, **kwargs): """Generate inversion mesh.""" self.mesh = ert.createInversionMesh(self.data, **kwargs) self.mgr.setMesh(mesh=self.mesh) if kwargs.pop("show", False): print(self.mesh) pg.show(self.mesh, markers=True, showMesh=True)
[docs] def invert(self, t=None, reg=None, regTL=None, **kwargs): """Run inversion for a specific timestep or all subsequently. Parameter --------- t : int|datetime|str|array time index, string or datetime object, or array of any of these reg : dict regularization options (setRegularization) for all inversions regTL : dict regularization options for timesteps inversion only **kwargs : dict keyword arguments passed to ERTManager.invert """ if t is not None: t = self.timeIndex(t) if self.mesh is None: self.createMesh() self.mgr.fop.setVerbose(False) if isinstance(reg, dict): self.mgr.inv.setRegularization(**reg) if t is None: # all t = np.arange(len(self.times)) t = np.atleast_1d(t) models = [] responses = [] self.chi2s = [] startModel = kwargs.pop("startModel", 100) creep = kwargs.pop("creep", False) kwargs.setdefault("isReference", True) for i, ti in enumerate(np.atleast_1d(t)): self.mgr.setData(self.chooseTime(ti)) self.model = self.mgr.invert(startModel=startModel, **kwargs) if i == 0 or creep: startModel = self.model.copy() models.append(self.model) responses.append(self.mgr.inv.response) self.chi2s.append(self.mgr.inv.chi2()) if i == 0 and isinstance(regTL, dict): kwargs.update(regTL) # self.mgr.inv.setRegularization(**regTL) if len(t) == 1: self.mgr.showResult() self.models = np.array(models) self.responses = np.array(responses) self.pd = self.mgr.paraDomain
[docs] def fullInversion(self, scalef=1.0, **kwargs): """Full (4D) inversion.""" DATA = [self.chooseTime(ti) for ti in range(len(self.times))] fop = pg.frameworks.MultiFrameModelling(ert.ERTModelling, scalef=scalef) fop.setData(DATA) if self.mesh is None: self.createMesh() fop.setMesh(self.mesh) print(fop.mesh()) # important to call mesh() function once! dataVec = np.concatenate([data["rhoa"] for data in DATA]) errorVec = np.concatenate([data["err"] for data in DATA]) startModel = fop.createStartModel(dataVec) inv = pg.Inversion(fop=fop, startModel=startModel, verbose=True) fop.createConstraints(C=kwargs.pop("C", None)) kwargs.setdefault("maxIter", 10) kwargs.setdefault("verbose", True) kwargs.setdefault("startModel", startModel) model = inv.run(dataVec, errorVec, **kwargs) self.models = np.reshape(model, [len(DATA), -1]) self.responses = np.reshape(inv.response, [DATA[0].size(), -1]) self.pd = fop.paraDomain return model
[docs] def showFit(self, **kwargs): """Show data, model response and misfit.""" _, ax = pg.plt.subplots(nrows=3, figsize=(10, 6), sharex=True, sharey=True) kwargs.setdefault("verbose", False) _, cb = self.showData(ax=ax[0], **kwargs) self.showData(self.mgr.inv.response, ax=ax[1], cMin=cb.vmin, cMax=cb.vmax, **kwargs) misfit = self.mgr.inv.response / self.data["rhoa"] * 100 - 100 self.showData(misfit, ax=ax[2], cMin=-10, cMax=10, cMap="bwr", **kwargs) return ax
[docs] def showAllModels(self, ncols=2, **kwargs): """Show all models as subplots. Parameters ---------- ncols : int [2] number of columns **kwargs : dict keyword arguments passed to pg.show """ nT = self.DATA.shape[1] showRatio = kwargs.pop("ratio", False) nrows = int(np.ceil(nT/ncols)) _, ax = pg.plt.subplots(nrows=nrows, ncols=ncols, figsize=kwargs.pop("figsize", [8, 5])) ratiokw = kwargs.copy() kwargs.setdefault("cMin", np.min(self.models)) kwargs.setdefault("cMax", np.max(self.models)) kwargs.setdefault("cMap", "Spectral_r") kwargs.setdefault("logScale", True) models = self.models.copy() if showRatio: models = self.models / self.models[0] rmax = np.maximum(np.max(models), 1/np.min(models)) ratiokw["cMax"] = kwargs.pop("rMax", rmax) ratiokw["cMin"] = 1 / ratiokw["cMax"] ratiokw["cMap"] = "bwr" ratiokw["logScale"] = True for i, model in enumerate(models): if i == 0 or not showRatio: pg.show(self.pd, self.models[i], ax=ax.flat[i], **kwargs) else: pg.show(self.pd, model, ax=ax.flat[i], **ratiokw) return ax
[docs] def generateModelPDF(self, **kwargs): """Generate a multi-page pdf with the model results. Parameters ---------- **kwargs : keyword arguments passed to pg.show() """ kwargs.setdefault('label', pg.unit('res')) kwargs.setdefault('cMap', pg.utils.cMap('res')) kwargs.setdefault('logScale', True) from matplotlib.backends.backend_pdf import PdfPages with PdfPages(self.name+'-model.pdf') as pdf: fig = pg.plt.figure(figsize=kwargs.pop("figsize", [8, 5])) for i, model in enumerate(self.models): ax = fig.subplots() pg.show(self.pd, model, ax=ax, **kwargs) ax.set_title(str(i)+": " + self.times[i].isoformat(" ", "minutes")) fig.savefig(pdf, format='pdf') fig.clf()
[docs] def generateRatioPDF(self, **kwargs): """Generate a multi-page pdf with the model results. Parameters ---------- creep : bool [False] Use preceding time step as reference (default is baseline) cMax : float [2] maximum of color scale cMax : float [1/cMax] minimum of color scale logScale : bool [True] logarithmic color scale cMap : str ['bwr'] colormap """ kwargs.setdefault('label', 'ratio') kwargs.setdefault('cMap', 'bwr') kwargs.setdefault('logScale', True) kwargs.setdefault("cMax", 2.0) kwargs.setdefault("cMin", 1/kwargs["cMax"]) basemodel = self.models[0] creep = kwargs.pop("creep", False) from matplotlib.backends.backend_pdf import PdfPages with PdfPages(self.name+'-ratio.pdf') as pdf: fig = pg.plt.figure(figsize=kwargs.pop("figsize", [8, 5])) for i, model in enumerate(self.models[1:]): ax = fig.subplots() pg.show(self.pd, model/basemodel, ax=ax, **kwargs) ax.set_title(str(i)+": " + self.times[i+1].isoformat(" ", "minutes") + "/" + self.times[i].isoformat(" ", "minutes")) fig.savefig(pdf, format='pdf') fig.clf() if creep: basemodel = model
[docs] def exportVTK(self, name=None, oneforall=False): """Generate output vtk(s) for postprocessing.""" name = name or self.name if name.endswith(".vtk"): name = name[:-4] vtk = self.pd.copy() if oneforall: for i, model in enumerate(self.models): vtk[f"model{i}"] = model vtk.exportVTK(name+"_results.vtk") else: for i, model in enumerate(self.models): vtk["resistivity"] = model vtk.exportVTK(name+f"_result{i}.vtk")
[docs] def saveResults(self, basename=None): """Save inversion results.""" if basename is None: basename = self.name self.pd.save(basename+".bms") np.savetxt(basename+".result", self.models, fmt="%.1f")
[docs] def loadResults(self, basename=None): """Load inversion results.""" if basename is None or basename is True: basename = self.name self.pd = pg.load(basename+".bms") self.models = np.loadtxt(basename+".result")
if __name__ == "__main__": pass