Source code for pygimli.physics.traveltime.TravelTimeManager

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Class for managing first arrival travel time inversions."""
import os
import numpy as np

import pygimli as pg
from pygimli.frameworks import MeshMethodManager

from pygimli.utils import getSavePath
from . modelling import TravelTimeDijkstraModelling, FatrayDijkstraModelling
from . plotting import drawFirstPicks


[docs] class TravelTimeManager(MeshMethodManager): """Manager for refraction seismics (traveltime tomography). TODO Document main members and use default MethodManager interface e.g., self.inv, self.fop, self.paraDomain, self.mesh, self.data """
[docs] def __init__(self, data=None, **kwargs): """Create an instance of the Traveltime manager. Parameters ---------- data: :gimliapi:`GIMLI::DataContainer` | str You can initialize the Manager with data or give them a dataset when calling the inversion. """ self.useFatray = kwargs.pop("fatray", False) self.frequency = kwargs.pop("frequency", 100.) self.secNodes = kwargs.pop("secNodes", 2) super().__init__(data=data, **kwargs) self.inv.dataTrans = pg.trans.Trans()
@property def velocity(self): """Return velocity vector (the inversion model).""" # we can check here if there was an inversion run return self.fw.model # shouldn't it be the inverse?
[docs] def createForwardOperator(self, **kwargs): """Create default forward operator for Traveltime modelling. Your want your Manager use a special forward operator you can add them here on default Dijkstra is used. """ if self.useFatray: fop = FatrayDijkstraModelling(frequency=self.frequency, **kwargs) else: fop = TravelTimeDijkstraModelling(verbose=self.verbose) return fop
[docs] def load(self, fileName): """Load any supported data file.""" self.data = pg.physics.traveltime.load(fileName) return self.data
[docs] def createMeshMovedToMeshManager(self, data=None, **kwargs): """Create default inversion mesh. Inversion mesh for traveltime inversion does not need boundary region. Args ---- data: DataContainer Data container to read sensors from. Keyword Args ------------ Forwarded to `:py:func:pygimli.meshtools.createParaMesh` """ data = data or self.data if not hasattr(data, 'sensors'): pg.critical('Please provide a data container for mesh generation') return pg.meshtools.createParaMesh(data.sensors(), paraBoundary=0, boundary=0, **kwargs)
[docs] def checkData(self, data): """Return data from container.""" if isinstance(data, pg.DataContainer): if not data.haveData('t'): pg.critical('DataContainer has no "t" values.') return data['t'] return data
[docs] def checkError(self, err, dataVals): """Return relative error.""" if isinstance(err, pg.DataContainer): if not err.haveData('err'): pg.error('DataContainer has no "err" values. Fallback to 3%') return np.ones(err.size()) * 0.03 return err['err'] / dataVals return err
[docs] def applyMesh(self, mesh, secNodes=None, ignoreRegionManager=False): """Apply mesh, i.e. set mesh in the forward operator class.""" if secNodes is None: secNodes = self.secNodes self.fop._refineSecNodes = secNodes if secNodes > 0: if ignoreRegionManager: mesh = self.fop.createRefinedFwdMesh(mesh) self.fop.setMesh(mesh, ignoreRegionManager=ignoreRegionManager) self.fop.jacobian().clear()
[docs] def simulate(self, mesh=None, scheme=None, slowness=None, vel=None, seed=None, secNodes=2, noiseLevel=0.0, noiseAbs=0.0, **kwargs): """Simulate traveltime measurements. Perform the forward task for a given mesh, a slowness distribution (per cell) and return data (traveltime) for a measurement scheme. Parameters ---------- mesh : :gimliapi:`GIMLI::Mesh` Mesh to calculate for or use the last known mesh. scheme: :gimliapi:`GIMLI::DataContainer` Data measurement scheme needs 's' for shot and 'g' for geophone data token. slowness : array(mesh.cellCount()) | array(N, mesh.cellCount()) Slowness distribution for the given mesh cells can be: * a single array of len mesh.cellCount() * a matrix of N slowness distributions of len mesh.cellCount() * a slowness map [[marker0, slow0], [marker1, slow1], ...] vel : array(mesh.cellCount()) | array(N, mesh.cellCount()) Velocity distribution for the mesh cells (overwrites slowness!). secNodes: int [2] Number of refinement nodes to increase accuracy of the forward calculation. noiseLevel: float [0.0] Add relative noise to the simulated data. noiseLevel*100 in % noiseAbs: float [0.0] Add absolute noise to the simulated data in ms. seed: int [None] Seed the random generator for the noise. Keyword Arguments ----------------- returnArray: [False] Return only the calculated times. verbose: [self.verbose] Overwrite verbose level. **kwargs Additional kwargs ... Returns ------- t : array(N, data.size()) | DataContainer The resulting simulated travel time values (returnArray=True) or DataContainer containing them in t field (returnArray=False). Either one column array or matrix in case of slowness matrix. """ verbose = kwargs.pop('verbose', self.verbose) fop = self.fop scheme = scheme or self.data fop.data = scheme fop.verbose = verbose if mesh is not None: self.applyMesh(mesh, secNodes=secNodes, ignoreRegionManager=True) if vel is not None: slowness = 1/vel if slowness is None: pg.critical("Need some slowness or velocity distribution for" " simulation.") if len(slowness) == self.fop.mesh().cellCount(): t = fop.response(slowness) if verbose: print('min/max t:', min(t), max(t)) else: print(self.fop.mesh()) print("slowness: ", slowness) pg.critical("Simulate called with wrong slowness array.") ret = pg.DataContainer(scheme) if noiseLevel > 0 or noiseAbs > 0: if not ret.allNonZero('err'): err = noiseAbs + t * noiseLevel ret['err'] = err pg.verbose("Absolute error estimates (min:max) {0}:{1}".format( min(ret['err']), max(ret['err']))) t += pg.randn(ret.size(), seed=seed) * ret('err') if kwargs.pop('returnArray', False) is True: return t ret['t'] = t return ret
[docs] def invert(self, data=None, useGradient=True, vTop=500, vBottom=5000, secNodes=None, **kwargs): """Invert data. Parameters ---------- data : pg.DataContainer() Data container with at least SensorIndices 's g' (shot/geophone) & data values 't' (traveltime in s) and 'err' (absolute error in s) useGradient: bool [True] Use gradient starting model typical for refraction cases. For crosshole tomography geometry you should set this to False for a non-gradient (e.g. homogeneous) starting model. vTop: float Top velocity for gradient starting model. vBottom: float Bottom velocity for gradient starting model. secNodes: int [2] Number of secondary nodes for accuracy of forward computation. Returns ------- model: Mapped (for paradomain) velocity model. Keyword Arguments ----------------- ** kwargs: Inversion related arguments: See :py:mod:`pygimli.frameworks.MeshMethodManager.invert` """ mesh = kwargs.pop('mesh', None) if secNodes is not None: self.secNodes = secNodes if 'limits' in kwargs: if kwargs['limits'][0] > 1: tmp = kwargs['limits'][0] kwargs['limits'][0] = 1.0 / kwargs['limits'][1] kwargs['limits'][1] = 1.0 / tmp pg.verbose('Switching velocity limits to slowness limits.', kwargs['limits']) if useGradient: self.fop._useGradient = [vTop, vBottom] else: self.fop._useGradient = None ### invert return mapped models slowness = super().invert(data, mesh, **kwargs) velocity = 1.0 / slowness velocity.isParaModel = slowness.isParaModel self.fw.model = 1.0 / self.fw.model #C42 self.fw only hold non-mapped model # that needs to be compatible to self.fw.mesh return velocity
[docs] def showFit(self, axs=None, firstPicks=True, **kwargs): """Show data fit as first-break picks or apparent velocity.""" if firstPicks: kwargs.setdefault("linestyle", "None") ax, _ = self.showData(firstPicks=True, **kwargs) drawFirstPicks(ax, self.fop.data, self.inv.response, marker=None) else: super().showFit(axs=axs, **kwargs)
[docs] def getRayPaths(self, model=None): """Compute ray paths. If model is not specified, the last calculated Jacobian is used. Parameters ---------- model : array Velocity model for which to calculate and visualize ray paths (the default is model for last Jacobian calculation in self.velocity). Returns ------- list of two-column array holding x and y positions """ if model is not None: # if self.fop.jacobian().size() == 0 or model != self.model: self.fop.createJacobian(1/model) shots = self.fop.data.id("s") recei = self.fop.data.id("g") segs = [] nodes = self.fop._core.mesh().positions(withSecNodes=True) for s, g in zip(shots, recei): wi = self.fop.way(s, g) points = nodes[wi] segs.append(np.column_stack((pg.x(points), pg.y(points)))) return segs
[docs] def drawRayPaths(self, ax, model=None, rayPaths=None, **kwargs): """Draw the the ray paths for model or last model. If model is not specified, the last calculated Jacobian is used. Parameters ---------- rayPaths : list of np.array x/y column array with ray point positions model : array Velocity model for which to calculate and visualize ray paths (the default is model for last Jacobian calculation in self.velocity). ax : matplotlib.axes object To draw the model and the path into. **kwargs : type Additional arguments passed to LineCollection (alpha, linewidths, color, linestyles). Returns ------- lc : matplotlib.LineCollection """ rayPaths = rayPaths or self.getRayPaths(model=model) _ = kwargs.setdefault("color", "w") _ = kwargs.setdefault("alpha", 0.5) _ = kwargs.setdefault("linewidths", 0.8) from matplotlib.collections import LineCollection lc = LineCollection(rayPaths, **kwargs) ax.add_collection(lc) return lc
[docs] def showRayPaths(self, model=None, ax=None, **kwargs): """Show the model with ray paths for given model. If not model specified, the last calculated Jacobian is taken. Parameters ---------- model : array Velocity model for which to calculate and visualize ray paths (the default is model for last Jacobian calculation in self.velocity). ax : matplotlib.axes object To draw the model and the path into. **kwargs : type forward to drawRayPaths Returns ------- ax : matplotlib.axes object cb : matplotlib.colorbar object (only if model is provided) Examples -------- >>> import pygimli as pg >>> from pygimli.physics import traveltime as tt >>> >>> x, y = 8, 6 >>> mesh = pg.createGrid(x, y) >>> data = tt.createRAData([(0, 0)] + [(x, i) for i in range(y)], ... shotDistance=y+1) >>> data["t"] = 1.0 >>> mgr = tt.Manager(data) >>> mgr.applyMesh(mesh, secNodes=10) >>> ax, cb = mgr.showRayPaths(showMesh=True, diam=0.1) """ if model is None: if self.fop.jacobian().size() == 0: self.fop.mesh() # initialize any meshs .. just to be sure is 1 model = pg.Vector(self.fop.regionManager().parameterCount(), 1.0) else: model = self.model ax, cbar = self.showModel(ax=ax, model=model, showMesh=kwargs.pop('showMesh', None), diam=kwargs.pop('diam', None)) self.drawRayPaths(ax, model=model, **kwargs) return ax, cbar
[docs] def rayCoverage(self): """Ray coverage, i.e. summed raypath lengths.""" J = self.fop.jacobian() return J.transMult(np.ones(J.rows()))
[docs] def createTraveltimefield(self, v=None, startPos=None, withSec=False): """Compute a single traveltime field.""" startPos = startPos or self.data.sensor(0) fop = self.fop mesh = fop.mesh() Di = fop.dijkstra slowPerCell = fop.createMappedModel(1/v, 1e16) Di.setGraph(fop._core.createGraph(slowPerCell)) Di.setStartNode(mesh.findNearestNode(startPos)) dist = Di.distances() if withSec: return dist else: return dist[:mesh.nodeCount()]
[docs] def standardizedCoverage(self): """Standardized coverage vector (0|1) using neighbor info.""" coverage = self.rayCoverage() C = self.fop.constraintsRef() return np.sign(np.absolute(C.transMult(C * coverage)))
[docs] def showCoverage(self, ax=None, name='coverage', **kwargs): """Show the ray coverage in log-scale.""" if ax is None: fig, ax = pg.plt.subplots() cov = self.rayCoverage() return pg.show(self.fop.paraDomain, pg.log10(cov+min(cov[cov > 0])*.5), ax=ax, coverage=self.standardizedCoverage(), **kwargs)
[docs] def saveResult(self, folder=None, size=(16, 10), verbose=False, **kwargs): """Save the results in a specified (or date-time derived) folder. Saved items are: * Resulting inversion model * Velocity vector * Coverage vector * Standardized coverage vector * Mesh (bms and vtk with results) Args ---- path: str[None] Path to save into. If not set the name is automatically created size: (float, float) (16,10) Figure size. Keyword Args ------------ Will be forwarded to showResults Returns ------- str: Name of the result path. """ subfolder = self.__class__.__name__ path = getSavePath(folder, subfolder) if verbose: pg.info('Saving refraction data to: {}'.format(path)) np.savetxt(os.path.join(path, 'velocity.vector'), self.velocity) np.savetxt(os.path.join(path, 'velocity-cov.vector'), self.rayCoverage()) np.savetxt(os.path.join(path, 'velocity-scov.vector'), self.standardizedCoverage()) m = pg.Mesh(self.paraDomain) m['Velocity'] = self.paraModel(self.velocity) m['Coverage'] = self.rayCoverage() m['S_Coverage'] = self.standardizedCoverage() m.exportVTK(os.path.join(path, 'velocity')) m.saveBinaryV2(os.path.join(path, 'velocity-pd')) self.fop.mesh().save(os.path.join(path, 'velocity-mesh')) np.savetxt(os.path.join(path, 'chi.txt'), self.inv.chi2History) fig, ax = pg.plt.subplots() self.showResult(ax=ax, cov=self.standardizedCoverage(), **kwargs) fig.set_size_inches(size) fig.savefig(os.path.join(path, 'velocity.pdf'), bbox_inches='tight') pg.plt.close(fig) return path