#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Magnetic resonance sounding module."""
# general modules to import according to standards
import time
import numpy as np
#import matplotlib.pyplot as plt
import pygimli as pg
from pygimli.utils import iterateBounds
from pygimli.utils.base import gmat2numpy
from pygimli.viewer.mpl import drawModel1D
# local functions in package
from pygimli.physics.sNMR.modelling import MRS1dBlockQTModelling
from pygimli.physics.sNMR.plotting import showErrorBars, showWC, showT2
[docs]
class MRS():
"""Magnetic resonance sounding (MRS) manager class.
Attributes
----------
t, q : ndarray - time and pulse moment vectors
data, error : 2d ndarray - data and error cubes
K, z : ndarray - (complex) kernel and its vertical discretization
model, modelL, modelU : vectors - model vector and lower/upper bound to it
Methods
-------
loadMRSI - load MRSI (MRSmatlab format) data
showCube - show any data/error/misfit as data cube (over q and t)
showDataAndError - show data and error cubes
showKernel - show Kernel matrix
createFOP - create forward operator
createInv - create pygimli Inversion instance
run - run block-mono (alternatively smooth-mono) inversion (with bootstrap)
calcMCM - compute model covariance matrix and thus uncertainties
splitModel - return thickness, water content and T2* time from vector
showResult/showResultAndFit - show inversion result (with fit)
runEA - run evolutionary algorithm (GA, PSO etc.) using inspyred
plotPopulation - plot final population of an EA run
"""
[docs]
def __init__(self, name=None, verbose=True, **kwargs):
"""MRS init with optional data load from mrsi file
Parameters
----------
name : string
Filename with load data and kernel (*.mrsi) or just data (*.mrsd)
verbose : bool
be verbose
kwargs - see :func:`MRS.loadMRSI`.
"""
self.verbose = verbose
self.t, self.q, self.z = None, None, None
self.data, self.error = None, None
self.K, self.fop, self.INV = None, None, None
self.dcube, self.ecube = None, None
self.lLB, self.lUB = None, None
self.nlay = 0
self.model, self.modelL, self.modelU = None, None, None
self.lowerBound = [1.0, 0.0, 0.02] # d, theta, T2*
self.upperBound = [30., 0.45, 1.00] # d, theta, T2*
self.startval = [10., 0.30, 0.20] # d, theta, T2*
self.logpar = False
self.basename = 'new'
self.figs = {}
if name is not None: # load data and kernel
# check for mrsi/d/k
if name[-5:-1].lower() == '.mrs': # mrsi or mrsd
self.loadMRSI(name, **kwargs)
self.basename = name.rstrip('.mrsi')
# elif name[-5:].lower() == '.mrsd':
# self.loadMRSD(name, **kwargs)
elif name.lower().endswith('npz'):
self.loadDataNPZ(name, **kwargs)
else:
self.loadDir(name)
def __repr__(self): # for print function
"""String representation."""
out = ""
if len(self.t) > 0 and len(self.q) > 0:
out = "<MRSdata: %d qs, %d times" % \
(len(self.q), len(self.t))
if hasattr(self.z, '__iter__') and len(self.z) > 0:
out += ", %d layers" % len(self.z)
return out + ">"
[docs]
def loadDataNPZ(self, filename, **kwargs):
"""Load data and kernel from numpy gzip packed file.
The npz file contains the fields: q, t, D, (E), z, K
"""
self.basename = filename.rstrip('.npz')
DATA = np.load(filename)
self.q = DATA['q']
self.t = DATA['t']
self.z = np.absolute(DATA['z'])
self.K = DATA['K']
self.dcube = DATA['D']
ndcubet = len(self.dcube[0])
if len(self.dcube) == len(self.q) and ndcubet == len(self.t):
if kwargs.pop('usereal', False):
self.data = np.real(self.dcube.flat)
else:
self.data = np.abs(self.dcube.flat)
if 'E' in DATA:
self.ecube = DATA['E']
else:
self.ecube = np.zeros_like(self.dcube)
self.checkData(**kwargs)
[docs]
def loadKernelNPZ(self, filename, **kwargs):
"""Load data and kernel from numpy gzip packed file.
The npz file contains the fields: q, t, D, (E), z, K
"""
self.basename = filename.rstrip('.npz')
DATA = np.load(filename)
self.q = DATA['pulseMoments']
self.z = np.absolute(DATA['zVector'])
self.K = DATA['kernel']
[docs]
def loadMRSI(self, filename, **kwargs):
"""Load data, error and kernel from mrsi or mrsd file
Parameters
----------
usereal : bool [False]
use real parts (after data rotation) instead of amplitudes
mint/maxt : float [0.0/2.0]
minimum/maximum time to restrict time series
"""
from scipy.io import loadmat # loading Matlab mat files
if filename[-5:].lower() == '.mrsd':
idata = None
pl = loadmat(filename, struct_as_record=False,
squeeze_me=True)['proclog']
self.q = np.array([q.q for q in pl.Q])
self.t = pl.Q[0].rx.sig[0].t + pl.Q[0].timing.tau_dead1
nq = len(pl.Q)
nt = len(self.t)
self.dcube = np.zeros((nq, nt))
self.ecube = np.zeros((nq, nt))
# self.ecube = np.ones((nq, nt))*20e-9
for i in range(nq):
self.dcube[i, :] = pl.Q[i].rx.sig[1].V
self.ecube[i, :] = np.real(pl.Q[i].rx.sig[1].E)
else:
idata = loadmat(filename, struct_as_record=False,
squeeze_me=True)['idata']
self.t = idata.data.t + idata.data.effDead
self.q = idata.data.q
self.K = idata.kernel.K
self.z = np.hstack((0., idata.kernel.z))
self.dcube = idata.data.dcube
self.ecube = idata.data.ecube
defaultNoise = kwargs.get("defaultNoise", 100e-9)
if self.ecube[0][0] == 0:
self.ecube = np.ones_like(self.dcube) * defaultNoise
if self.verbose:
print("no errors in file, assuming", defaultNoise*1e9, "nV")
self.ecube = np.ones((len(self.q), len(self.t))) * defaultNoise
if idata is not None:
self.ecube /= np.sqrt(idata.data.gateL)
self.checkData(**kwargs)
# load model from matlab file (result of MRSQTInversion)
if filename[-5:].lower() == '.mrsi' and hasattr(idata, 'inv1Dqt'):
if hasattr(idata.inv1Dqt, 'blockMono'):
sol = idata.inv1Dqt.blockMono.solution[0]
self.model = np.hstack((sol.thk, sol.w, sol.T2))
self.nlay = len(sol.w)
if self.verbose:
print("loaded file: " + filename)
[docs]
def checkData(self, **kwargs):
"""Check data and retrieve data and error vector."""
mint = kwargs.pop('mint', 0)
maxt = kwargs.pop('maxt', 1000)
good = (self.t <= maxt) & (self.t >= mint)
self.t = self.t[good]
self.dcube = self.dcube[:, good]
self.ecube = self.ecube[:, good]
ndcubet = len(self.dcube[0])
if len(self.dcube) == len(self.q) and ndcubet == len(self.t):
if kwargs.pop('usereal', False):
self.data = np.real(self.dcube.flat)
else:
self.data = np.abs(self.dcube.flat)
else:
print('Dimensions do not match!')
necubet = len(self.dcube[0])
if len(self.ecube) == len(self.q) and necubet == len(self.t):
self.error = self.ecube.ravel()
if min(self.error) <= 0.:
print("Warning: negative errors present! Taking absolute value")
self.error = np.absolute(self.error)
defaultNoise = kwargs.pop("defaultNoise", 100e-9)
if min(self.error) == 0.:
if self.verbose:
print("Warning: zero error, assuming", defaultNoise)
self.error[self.error == 0.] = defaultNoise
# clip data if desired (using vmin and vmax keywords)
if "vmax" in kwargs:
vmax = kwargs['vmax']
self.error[self.data > vmax] = max(self.error)*3
self.data[self.data > vmax] = vmax
if "vmin" in kwargs:
vmin = kwargs['vmin']
self.error[self.data < vmin] = max(self.error)*3
self.data[self.data < vmin] = vmin
if self.verbose:
print(self)
[docs]
def loadMRSD(self, filename, usereal=False, mint=0., maxt=2.0):
"""Load mrsd (MRS data) file: not really used as in MRSD."""
from scipy.io import loadmat # loading Matlab mat files
print("Currently not using mint/maxt & usereal:", mint, maxt, usereal)
pl = loadmat(filename, struct_as_record=False,
squeeze_me=True)['proclog']
self.q = np.array([q.q for q in pl.Q])
self.t = pl.Q[0].rx.sig[0].t + pl.Q[0].timing.tau_dead1
nq = len(pl.Q)
nt = len(self.t)
self.dcube = np.zeros((nq, nt))
for i in range(nq):
self.dcube[i, :] = np.abs(pl.Q[i].rx.sig[1].V)
self.ecube = np.ones((nq, nt))*20e-9
[docs]
def loadDataCube(self, filename='datacube.dat'):
"""Load data cube from single ascii file (old stuff)"""
A = np.loadtxt(filename).T
self.q = A[1:, 0]
self.t = A[0, 1:]
self.data = A[1:, 1:].ravel()
[docs]
def loadErrorCube(self, filename='errorcube.dat'):
"""Load error cube from a single ascii file (old stuff)."""
A = np.loadtxt(filename).T
if len(A) == len(self.q) and len(A[0]) == len(self.t):
self.error = A.ravel()
elif len(A) == len(self.q) + 1 and len(A[0]) == len(self.t) + 1:
self.error = A[1:, 1:].ravel()
else:
self.error = np.ones(len(self.q) * len(self.t)) * 100e-9
[docs]
def loadKernel(self, name=''):
"""Load kernel matrix from mrsk or two bmat files."""
from scipy.io import loadmat # loading Matlab mat files
if name[-5:].lower() == '.mrsk':
kdata = loadmat(name, struct_as_record=False,
squeeze_me=True)['kdata']
self.K = kdata.K
self.z = np.hstack((0., kdata.model.z))
else: # try load real/imag parts (backward compat.)
KR = pg.Matrix(name + 'KR.bmat')
KI = pg.Matrix(name + 'KI.bmat')
self.K = np.zeros((KR.rows(), KR.cols()), dtype='complex')
for i in range(KR.rows()):
self.K[i] = np.array(KR[i]) + np.array(KI[i]) * 1j
[docs]
def loadZVector(self, filename='zkernel.vec'):
"""Load the kernel vertical discretisation (z) vector."""
self.z = pg.Vector(filename)
[docs]
def loadDir(self, dirname):
"""Load several standard files from dir (old Borkum stage)."""
if not dirname[-1] == '/':
dirname += '/'
self.loadDataCube(dirname + 'datacube.dat')
self.loadErrorCube(dirname + 'errorcube.dat')
self.loadKernel(dirname)
self.loadZVector(dirname + 'zkernel.vec')
self.dirname = dirname # to save results etc.
[docs]
def showCube(self, ax=None, vec=None, islog=None, clim=None, clab=None):
"""Plot any data (or response, error, misfit) cube nicely."""
if vec is None:
vec = np.array(self.data).flat
print(len(vec))
mul = 1.0
if max(vec) < 1e-3: # Volts
mul = 1e9
if ax is None:
_, ax = plt.subplots(1, 1)
if islog is None:
print(len(vec))
islog = (min(vec) > 0.)
negative = (min(vec) < 0)
if islog:
vec = np.log10(np.abs(vec))
if clim is None:
if negative:
cmax = max(max(vec), -min(vec))
clim = (-cmax, cmax)
else:
cmax = max(vec)
if islog:
cmin = cmax - 1.5
else:
cmin = 0.
clim = (cmin, cmax)
xt = range(0, len(self.t), 10)
xtl = [str(ti) for ti in np.round(self.t[xt] * 1000.)]
qt = range(0, len(self.q), 5)
qtl = [str(qi) for qi in np.round(np.asarray(self.q)[qt] * 10.) / 10.]
mat = np.array(vec).reshape((len(self.q), len(self.t)))*mul
im = ax.imshow(mat, interpolation='nearest', aspect='auto')
im.set_clim(clim)
ax.set_xticks(xt)
ax.set_xticklabels(xtl)
ax.set_yticks(qt)
ax.set_yticklabels(qtl)
ax.set_xlabel('$t$ [ms]')
ax.set_ylabel('$q$ [As]')
cb = plt.colorbar(im, ax=ax, orientation='horizontal')
if clab is not None:
cb.ax.set_title(clab)
return clim
[docs]
def showDataAndError(self, figsize=(10, 8), show=False):
"""Show data cube along with error cube."""
fig, ax = plt.subplots(1, 2, figsize=figsize)
self.showCube(ax[0], self.data * 1e9, islog=False)
self.showCube(ax[1], self.error * 1e9, islog=True)
if show:
plt.show()
self.figs['data+error'] = fig
return fig, ax
[docs]
def showKernel(self, ax=None):
"""Show the kernel as matrix (Q over z)."""
if ax is None:
fig, ax = plt.subplots()
self.figs['kernel'] = fig
# ax.imshow(self.K.T, interpolation='nearest', aspect='auto')
ax.matshow(self.K.T, aspect='auto')
yt = ax.get_yticks()
maxzi = self.K.shape[1]
yt = yt[(yt >= 0) & (yt < maxzi)]
if yt[-1] < maxzi-2:
yt = np.hstack((yt, maxzi))
ytl = [str(self.z[int(yti)]) for yti in yt]
zl = self.z[[int(yti) for yti in yt]]
ytl = [str(zi) for zi in np.round(zl, 1)]
ax.set_yticks(yt)
ax.set_yticklabels(ytl)
xt = ax.get_xticks()
maxqi = self.K.shape[0]
xt = xt[(xt >= 0) & (xt < maxqi)]
xtl = [np.round(self.q[iq], 2) for iq in xt]
ax.set_xticks(xt)
ax.set_xticklabels(xtl)
return fig, ax
[docs]
@staticmethod
def createFOP(nlay, K, z, t): # , verbose=True, **kwargs):
"""Create forward operator instance."""
fop = MRS1dBlockQTModelling(nlay, K, z, t)
return fop
[docs]
def setBoundaries(self):
"""Set parameter boundaries for inversion."""
for i in range(3):
self.fop.region(i).setParameters(self.startval[i],
self.lowerBound[i],
self.upperBound[i], "log")
[docs]
def createInv(self, nlay=3, lam=100., verbose=True, **kwargs):
"""Create inversion instance (and fop if necessary with nlay)."""
self.fop = MRS.createFOP(nlay, self.K, self.z, self.t)
self.setBoundaries()
self.INV = pg.Inversion(self.data, self.fop, verbose)
self.INV.setLambda(lam)
self.INV.setMarquardtScheme(kwargs.pop('lambdaFactor', 0.8))
self.INV.stopAtChi1(False) # now in MarquardtScheme
self.INV.setDeltaPhiAbortPercent(0.5)
self.INV.setAbsoluteError(np.abs(self.error))
self.INV.setRobustData(kwargs.pop('robust', False))
return self.INV
[docs]
@staticmethod
def simulate(model, K, z, t):
"""Do synthetic modelling."""
nlay = int(len(model) / 3) + 1
fop = MRS.createFOP(nlay, K, z, t)
return fop.response(model)
[docs]
def invert(self, nlay=3, lam=100., startvec=None,
verbose=True, uncertainty=False, **kwargs):
"""Easiest variant doing all (create fop and inv) in one call."""
if self.INV is None or self.nlay != nlay:
self.INV = self.createInv(nlay, lam, verbose, **kwargs)
self.INV.setVerbose(verbose)
if startvec is not None:
self.INV.setModel(startvec)
if verbose:
print("Doing inversion...")
self.model = np.array(self.INV.run())
return self.model
[docs]
def run(self, verbose=True, uncertainty=False, **kwargs):
"""Easiest variant doing all (create fop and inv) in one call."""
self.invert(verbose=verbose, **kwargs)
if uncertainty:
if verbose:
print("Computing uncertainty...")
self.modelL, self.modelU = iterateBounds(
self.INV, dchi2=self.INV.chi2() / 2, change=1.2)
if verbose:
print("ready")
[docs]
def splitModel(self, model=None):
"""Split model vector into d, theta and T2*."""
if model is None:
model = self.model
nl = int(len(self.model)/3) + 1 # self.nlay
thk = model[:nl - 1]
wc = model[nl - 1:2 * nl - 1]
t2 = model[2 * nl - 1:3 * nl - 1]
return thk, wc, t2
[docs]
def result(self):
"""Return block model results (thk, wc and T2 vectors)."""
return self.splitModel()
[docs]
def showResult(self, figsize=(10, 8), save='', fig=None, ax=None):
"""Show theta(z) and T2*(z) (+uncertainties if there)."""
if ax is None:
fig, ax = plt.subplots(1, 2, sharey=True, figsize=figsize)
self.figs['result'] = fig
thk, wc, t2 = self.splitModel()
showWC(ax[0], thk, wc)
showT2(ax[1], thk, t2)
if self.modelL is not None and self.modelU is not None:
thkL, wcL, t2L = self.splitModel(self.modelL)
thkU, wcU, t2U = self.splitModel(self.modelU)
showErrorBars(ax[0], thk, wc, thkL, thkU, wcL, wcU)
showErrorBars(ax[1], thk, t2*1e3, thkL, thkU, t2L*1e3, t2U*1e3)
if fig is not None:
if save:
fig.savefig(save, bbox_inches='tight')
return fig, ax
[docs]
def showResultAndFit(self, figsize=(12, 10), save='', plotmisfit=False,
maxdep=0, clim=None):
"""Show ec(z), T2*(z), data and model response."""
fig, ax = plt.subplots(2, 2 + plotmisfit, figsize=figsize)
self.figs['result+fit'] = fig
thk, wc, t2 = self.splitModel()
showWC(ax[0, 0], thk, wc, maxdep=maxdep)
showT2(ax[0, 1], thk, t2, maxdep=maxdep)
ax[0, 0].set_title(r'MRS water content $\theta$')
ax[0, 1].set_title(r'MRS decay time $T_2^*$')
ax[0, 0].set_ylabel('$z$ [m]')
ax[0, 1].set_ylabel('$z$ [m]')
if self.modelL is not None and self.modelU is not None:
thkL, wcL, t2L = self.splitModel(self.modelL)
thkU, wcU, t2U = self.splitModel(self.modelU)
showErrorBars(ax[0, 0], thk, wc, thkL, thkU, wcL, wcU)
showErrorBars(ax[0, 1], thk, t2*1e3, thkL, thkU, t2L*1e3, t2U*1e3)
if maxdep > 0.:
ax[0, 0].set_ylim([maxdep, 0.])
ax[0, 1].set_ylim([maxdep, 0.])
clim = self.showCube(ax[1, 0], self.data * 1e9, islog=False, clim=clim)
ax[1, 0].set_title('measured data [nV]') # log10
self.showCube(
ax[1, 1], self.INV.response() * 1e9, clim=clim, islog=False)
ax[1, 1].set_title('simulated data [nV]') # log10
if plotmisfit:
self.showCube(ax[0, 2], (self.data - self.INV.response()) * 1e9,
islog=False)
ax[0, 2].set_title('misfit [nV]') # log10
ewmisfit = (self.data - self.INV.response()) / self.error
self.showCube(ax[1, 2], ewmisfit, islog=False)
ax[1, 2].set_title('error-weighted misfit')
if save:
if not isinstance(save, str):
save = self.basename
fig.savefig(save, bbox_inches='tight')
return fig, ax
[docs]
def saveResult(self, filename):
"""Save inversion result to column text file for later use."""
thk, wc, t2 = self.splitModel()
z = np.hstack((0., np.cumsum(thk)))
ALL = np.column_stack((z, wc, t2))
if self.modelL is not None and self.modelU is not None:
thkL, wcL, t2L = self.splitModel(self.modelL)
thkU, wcU, t2U = self.splitModel(self.modelU)
zL = z.copy()
zL[1:] += (thkL - thk)
zU = z.copy()
zU[1:] += (thkU - thk)
ALL = np.column_stack((z, wc, t2, zL, zU, wcL, wcU, t2L, t2U))
np.savetxt(filename, ALL, fmt='%.3f')
[docs]
def loadResult(self, filename):
"""Load inversion result from column file."""
A = np.loadtxt(filename)
z, wc, t2 = A[:, 0], A[:, 1], A[:, 2]
thk = np.diff(z)
self.nlay = len(wc)
self.model = np.hstack((thk, wc, t2))
if len(A[0]) > 8:
zL, wcL, t2L = A[:, 3], A[:, 5], A[:, 7]
zU, wcU, t2U = A[:, 4], A[:, 6], A[:, 8]
thkL = thk + zL[1:] - z[1:]
thkU = thk + zU[1:] - z[1:]
t2L[t2L < 0.01] = 0.01
self.modelL = np.hstack((thkL, wcL, t2L))
t2U[t2U > 1.0] = 1.0
self.modelU = np.hstack((thkU, wcU, t2U))
[docs]
def calcMCM(self):
"""Compute linear model covariance matrix."""
J = gmat2numpy(self.fop.jacobian()) # (linear) jacobian matrix
D = np.diag(1 / self.error)
DJ = D.dot(J)
JTJ = DJ.T.dot(DJ)
MCM = np.linalg.inv(JTJ) # model covariance matrix
var = np.sqrt(np.diag(MCM)) # standard deviations from main diagonal
di = (1. / var) # variances as column vector
# scaled model covariance (=correlation) matrix
MCMs = di.reshape(len(di), 1) * MCM * di
return var, MCMs
[docs]
def calcMCMbounds(self):
"""Compute model bounds using covariance matrix diagonals."""
mcm = self.calcMCM()[0]
self.modelL = self.model - mcm
self.modelU = self.model + mcm
[docs]
def genMod(self, individual):
"""Generate (GA) model from random vector (0-1) using model bounds."""
model = np.asarray(individual) * (self.lUB - self.lLB) + self.lLB
if self.logpar:
return pg.exp(model)
else:
return model
[docs]
def runEA(self, nlay=None, eatype='GA', pop_size=100, num_gen=100,
runs=1, mp_num_cpus=8, **kwargs):
"""Run evolutionary algorithm using the inspyred library
Parameters
----------
nlay : int [taken from classic fop if not given]
number of layers
pop_size : int [100]
population size
num_gen : int [100]
number of generations
runs : int [pop_size*num_gen]
number of independent runs (with random population)
eatype : string ['GA']
algorithm, choose among:
'GA' - Genetic Algorithm [default]
'SA' - Simulated Annealing
'DEA' - Discrete Evolutionary Algorithm
'PSO' - Particle Swarm Optimization
'ACS' - Ant Colony Strategy
'ES' - Evolutionary Strategy
"""
import inspyred
import random
def mygenerate(random, args):
"""generate a random vector of model size"""
return [random.random() for i in range(nlay * 3 - 1)]
def my_observer(population, num_generations, num_evaluations, args):
""" print fitness over generation number """
best = min(population)
print('{0:6} -- {1}'.format(num_generations, best.fitness))
@inspyred.ec.evaluators.evaluator
def datafit(individual, args):
""" error-weighted data misfit as basis for evaluating fitness """
misfit = (self.data -
self.fop.response(self.genMod(individual))) / self.error
return np.mean(misfit**2)
# prepare forward operator
if self.fop is None or (nlay is not None and nlay is not self.nlay):
self.fop = MRS.createFOP(nlay)
lowerBound = pg.cat(pg.cat(pg.Vector(self.nlay - 1,
self.lowerBound[0]),
pg.Vector(self.nlay, self.lowerBound[1])),
pg.Vector(self.nlay, self.lowerBound[2]))
upperBound = pg.cat(pg.cat(pg.Vector(self.nlay - 1,
self.upperBound[0]),
pg.Vector(self.nlay, self.upperBound[1])),
pg.Vector(self.nlay, self.upperBound[2]))
if self.logpar:
self.lLB, self.lUB = pg.log(lowerBound), pg.log(
upperBound) # ready mapping functions
else:
self.lLB, self.lUB = lowerBound, upperBound
# self.f = MRS1dBlockQTModelling(nlay, self.K, self.z, self.t)
# setup random generator
rand = random.Random()
# choose among different evolution algorithms
if eatype == 'GA':
ea = inspyred.ec.GA(rand)
ea.variator = [
inspyred.ec.variators.blend_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.selector = inspyred.ec.selectors.tournament_selection
ea.replacer = inspyred.ec.replacers.generational_replacement
if eatype == 'SA':
ea = inspyred.ec.SA(rand)
if eatype == 'DEA':
ea = inspyred.ec.DEA(rand)
if eatype == 'PSO':
ea = inspyred.swarm.PSO(rand)
if eatype == 'ACS':
ea = inspyred.swarm.ACS(rand, [])
if eatype == 'ES':
ea = inspyred.ec.ES(rand)
ea.terminator = [inspyred.ec.terminators.evaluation_termination,
inspyred.ec.terminators.diversity_termination]
else:
ea.terminator = inspyred.ec.terminators.evaluation_termination
# ea.observer = my_observer
ea.observer = [
inspyred.ec.observers.stats_observer,
inspyred.ec.observers.file_observer]
tstr = '{0}'.format(time.strftime('%y%m%d-%H%M%S'))
self.EAstatfile = self.basename + '-' + eatype + 'stat' + tstr + '.csv'
with open(self.EAstatfile, 'w') as fid:
self.pop = []
for i in range(runs):
rand.seed(int(time.time()))
self.pop.extend(ea.evolve(
evaluator=datafit, generator=mygenerate, maximize=False,
pop_size=pop_size, max_evaluations=pop_size*num_gen,
bounder=inspyred.ec.Bounder(0., 1.), num_elites=1,
statistics_file=fid, **kwargs))
# self.pop.extend(ea.evolve(
# generator=mygenerate, maximize=False,
# evaluator=inspyred.ec.evaluators.parallel_evaluation_mp,
# mp_evaluator=datafit, mp_num_cpus=mp_num_cpus,
# pop_size=pop_size, max_evaluations=pop_size*num_gen,
# bounder=inspyred.ec.Bounder(0., 1.), num_elites=1,
# statistics_file=fid, **kwargs))
self.pop.sort(reverse=True)
self.fits = [ind.fitness for ind in self.pop]
print('minimum fitness of ' + str(min(self.fits)))
[docs]
def plotPopulation(self, maxfitness=None, fitratio=1.05, savefile=True):
"""Plot fittest individuals (fitness<maxfitness) as 1d models
Parameters
----------
maxfitness : float
maximum fitness value (absolute) OR
fitratio : float [1.05]
maximum ratio to minimum fitness
"""
if maxfitness is None:
maxfitness = min(self.fits) * fitratio
fig, ax = plt.subplots(1, 2, sharey=True)
self.figs['population'] = fig
maxz = 0
for ind in self.pop:
if ind.fitness < maxfitness:
model = np.asarray(self.genMod(ind.candidate))
thk = model[:self.nlay - 1]
wc = model[self.nlay - 1:self.nlay * 2 - 1]
t2 = model[self.nlay * 2 - 1:]
drawModel1D(ax[0], thk, wc * 100, color='grey')
drawModel1D(ax[1], thk, t2 * 1000, color='grey')
maxz = max(maxz, sum(thk))
model = np.asarray(self.genMod(self.pop[0].candidate))
thk = model[:self.nlay - 1]
wc = model[self.nlay - 1:self.nlay * 2 - 1]
t2 = model[self.nlay * 2 - 1:]
drawModel1D(ax[0], thk, wc * 100, color='black', linewidth=3)
drawModel1D(ax[1], thk, t2 * 1000, color='black', linewidth=3,
plotfunction='semilogx')
ax[0].set_xlim(self.lowerBound[1] * 100, self.upperBound[1] * 100)
ax[0].set_ylim((maxz * 1.2, 0))
ax[1].set_xlim(self.lowerBound[2] * 1000, self.upperBound[2] * 1000)
ax[1].set_ylim((maxz * 1.2, 0))
xt = [10, 20, 50, 100, 200, 500, 1000]
ax[1].set_xticks(xt)
ax[1].set_xticklabels([str(xti) for xti in xt])
if savefile:
fig.savefig(self.EAstatfile.replace('.csv', '.pdf'),
bbox_inches='tight')
plt.show()
[docs]
def plotEAstatistics(self, fname=None):
"""Plot EA statistics (best, worst, ...) over time."""
if fname is None:
fname = self.EAstatfile
gen, psize, worst, best, med, avg, std = np.genfromtxt(
fname, unpack=True, usecols=range(7), delimiter=',')
stderr = std / np.sqrt(psize)
data = [avg, med, best, worst]
colors = ['black', 'blue', 'green', 'red']
labels = ['average', 'median', 'best', 'worst']
fig, ax = plt.subplots()
self.figs['statistics'] = fig
ax.errorbar(gen, avg, stderr, color=colors[0], label=labels[0])
ax.set_yscale('log')
for d, col, lab in zip(data[1:], colors[1:], labels[1:]):
ax.plot(gen, d, color=col, label=lab)
ax.fill_between(gen, data[2], data[3], color='#e6f2e6')
ax.grid(True)
ymin = min([min(d) for d in data])
ymax = max([max(d) for d in data])
yrange = ymax - ymin
ax.set_ylim((ymin - 0.1*yrange, ymax + 0.1*yrange))
ax.legend(loc='upper left') # , prop=prop)
ax.set_xlabel('Generation')
ax.set_ylabel('Fitness')
[docs]
def saveFigs(self, basename=None, extension="pdf"):
"""Save all figures to (pdf) files."""
if basename is None:
basename = self.basename
for key in self.figs:
self.figs[key].savefig(basename+"-"+key+"."+extension,
bbox_inches='tight')
if __name__ == "__main__":
datafile = 'example.mrsi'
numlayers = 4
mrs = MRS(datafile)
mrs.run(numlayers, uncertainty=True)
outThk, outWC, outT2 = mrs.result()
mrs.saveResult(mrs.basename+'.result')
mrs.showResultAndFit(save=mrs.basename+'.pdf')
plt.show()