#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Caching manager with function decorator.
Input supports python base types and all pg.core objects with .hash() method.
Output supports DataContainerERT, ...
TODO:
* Output types:
numpy.ndarray, pg.Mesh, pg.Vector, pg.Matrix
To use just add the decorator.
@pg.cache
def myLongRunningStuff(*args, **kwargs):
#...
return results
"""
import sys
import os
import inspect
import hashlib
import json
import time
import numpy as np
import pygimli as pg
# Module-wide switch, set through noCache(): while truthy, the cache decorator
# calls the wrapped function directly and neither reads nor writes a cache.
__NO_CACHE__ = False
[docs]
def noCache(c):
    """Set the module-wide caching switch.

    Parameters
    ----------
    c:
        Stored as-is in ``__NO_CACHE__``. While it is truthy, functions
        decorated with ``cache`` are called directly without using a cache.
    """
    global __NO_CACHE__
    __NO_CACHE__ = c
[docs]
def strHash(string):
    """Return a process-independent integer hash for a string.

    The value is built from the first 16 hex digits (64 bit) of the SHA-224
    digest of the encoded string, so it does not depend on Python's per-process
    string hash randomization.
    """
    hexDigest = hashlib.sha224(string.encode()).hexdigest()
    leading = hexDigest[:16]
    return int(leading, 16)
def valHash(a):
    """Create an integer hash for a single argument value.

    Supported inputs:

    * ``str`` -- hashed with ``strHash``.
    * ``int`` -- returned unchanged.
    * ``list`` / ``tuple`` -- item hashes are combined order-sensitively and
      seeded with the length, so ``[1, 2]``, ``[2, 1]`` and ``[1, 1]`` all
      differ (a plain XOR fold would make permutations collide and equal
      items cancel each other out).
    * ``numpy.ndarray`` -- the flattened data is hashed via ``pg.Vector``;
      for anything that is not 1-D the shape is mixed in as well, so arrays
      with the same data but different shapes get different hashes.
    * anything else -- falls back to the builtin ``hash``.

    NOTE: the fallback relies on the object's own ``__hash__``; for types such
    as ``bytes`` the builtin hash is salted per process and is therefore not
    suitable as a persistent cache key.

    Raises
    ------
    TypeError
        From the builtin ``hash`` if the fallback hits an unhashable object.
    """
    if isinstance(a, str):
        return strHash(a)
    if isinstance(a, int):
        return a
    if isinstance(a, (list, tuple)):
        return _mixHash(len(a), (valHash(item) for item in a))
    if isinstance(a, np.ndarray):
        if a.ndim == 1:
            return hash(pg.Vector(a))
        # Flatten first so the data can be converted to an RVector (memcopy),
        # then fold the shape in to keep differently shaped arrays apart.
        flatHash = hash(pg.Vector(a.reshape(-1)))
        return _mixHash(flatHash, a.shape)
    return hash(a)


def _mixHash(seed, parts):
    """Fold integer hashes into ``seed`` one after another, order-sensitively.

    Each step multiplies the running value, XORs the next part in and masks
    the result to a non-negative 64 bit integer.
    """
    hsh = seed
    for part in parts:
        hsh = ((hsh * 1000003) ^ part) & 0xFFFFFFFFFFFFFFFF
    return hsh
class Cache(object):
    """A single cache entry: a payload file plus a ``<name>.json`` info file.

    The entry is addressed by a hash value. On construction the file name is
    requested from ``CacheManager().cachingPath`` and an existing entry is
    restored immediately; ``value`` stays ``None`` if nothing could be
    restored.
    """

    def __init__(self, hashValue):
        self._value = None
        self._hash = hashValue
        self._name = CacheManager().cachingPath(str(self._hash))
        self._info = None
        self.restore()

    @property
    def info(self):
        """Metadata dictionary of this entry (created lazily with defaults)."""
        if self._info is None:
            self._info = {'type': '',
                          'file': '',
                          'date': 0,
                          'dur': 0.0,
                          'restored': 0,
                          'codeinfo': '',
                          'version': '',
                          'args': '',
                          'kwargs': {},
                          }
        return self._info

    @info.setter
    def info(self, i):
        self._info = i

    @property
    def value(self):
        """Cached object, or ``None`` if nothing is stored or restorable."""
        return self._value

    @value.setter
    def value(self, v):
        """Store ``v`` on disk and remember it as the cached value.

        The storage format is chosen from the type name of ``v``. The info
        file is written only after the payload was saved successfully, so a
        failing save does not leave an info file behind that points to a
        missing payload.
        """
        valueType = str(type(v).__name__)
        self.info['type'] = valueType
        self.info['file'] = self._name

        if valueType == 'Mesh':
            pg.info('Save Mesh binary v2')
            v.saveBinaryV2(self._name)
        elif valueType == 'RVector':
            pg.info('Save RVector binary')
            v.save(self._name, format=pg.core.Binary)
        elif valueType == 'ndarray':
            pg.info('Save ndarray')
            np.save(self._name, v, allow_pickle=True)
        elif hasattr(v, 'save') and hasattr(v, 'load'):
            v.save(self._name)
        else:
            # Generic fallback: numpy stores arbitrary objects pickled.
            np.save(self._name, v, allow_pickle=True)

        self.updateCacheInfo()
        self._value = v
        pg.info('Cache stored:', self._name)

    def updateCacheInfo(self):
        """Write the info dictionary to ``<name>.json``."""
        with open(self._name + '.json', 'w') as of:
            json.dump(self.info, of, sort_keys=False,
                      indent=4, separators=(',', ': '))

    def restore(self):
        """Read the info file and load the cached value it describes.

        Does nothing if no info file exists. Any failure while loading is
        reported (traceback on stdout plus ``pg.error``) instead of being
        raised, so the caller can fall back to recomputing the value.
        """
        if not os.path.exists(self._name + '.json'):
            return

        # matplotlib may reset the locale to the system default; with e.g. a
        # German locale ('decimal_point': ',') number parsing goes wrong.
        pg.checkAndFixLocaleDecimal_point(verbose=False)

        # Local stopwatch: the global pg.tic()/pg.dur() pair is shared with
        # the caller and would be reset here.
        sw = pg.Stopwatch(True)
        try:
            with open(self._name + '.json') as file:
                self.info = json.load(file)

            valueType = self.info['type']
            fileName = self.info['file']

            if valueType == 'DataContainerERT':
                self._value = pg.DataContainerERT(fileName,
                                                  removeInvalid=False)
            elif valueType == 'RVector':
                self._value = pg.Vector()
                self._value.load(fileName, format=pg.core.Binary)
            elif valueType == 'Mesh':
                self._value = pg.Mesh()
                self._value.loadBinaryV2(fileName + '.bms')
            elif valueType == 'Cm05Matrix':
                self._value = pg.matrix.Cm05Matrix(fileName)
            elif valueType == 'GeostatisticConstraintsMatrix':
                self._value = pg.matrix.GeostatisticConstraintsMatrix(
                    fileName)
            else:
                # 'ndarray' and every other type were stored with np.save.
                # NOTE(security): allow_pickle=True unpickles the file
                # content; only use cache directories you trust.
                self._value = np.load(fileName + '.npy',
                                      allow_pickle=True)

            if self.value is not None:
                self.info['restored'] = self.info['restored'] + 1
                self.updateCacheInfo()
                pg.info('Cache {3} restored ({1}s x {0}): {2}'.format(
                    self.info['restored'],
                    round(self.info['dur'], 1),
                    self._name, self.info['codeinfo']))
            else:
                pg.warn('Could not restore cache of type {0}.'.format(
                    self.info['type']))

            pg.debug("Restoring cache took:", sw.duration(), "s")
        except Exception:
            # Best effort: a broken cache entry must not break the caller.
            import traceback
            traceback.print_exc(file=sys.stdout)
            print(self.info)
            pg.error('Cache restoring failed.')
#@pg.singleton
class CacheManager(object):
    """Singleton that creates cache file names, cache keys and cache entries.

    Every ``CacheManager()`` call returns the same object.
    """

    __instance = None
    __has_init = False

    def __new__(cls):
        if cls.__instance is None:
            cls.__instance = object.__new__(cls)
        return cls.__instance

    def __init__(self):
        # __init__ runs on every CacheManager() call; initialize only once.
        if not self.__has_init:
            self._caches = {}
            self.__has_init = True

    @classmethod
    def instance(cls):
        """Return the shared manager instance."""
        return cls()

    def cachingPath(self, fName):
        """Create a path name for the cache.

        The directory is ``pg.getCachePath()`` if ``pg.rc["globalCache"]`` is
        set and ``.cache`` otherwise; it is created if it does not exist yet.
        """
        if pg.rc["globalCache"]:
            path = pg.getCachePath()
        else:
            path = ".cache"
        # makedirs also creates missing parents and does not fail if the
        # directory appears between a check and the creation.
        os.makedirs(path, exist_ok=True)
        return os.path.join(path, fName)

    def functInfo(self, funct):
        """Return unique info string about the called function."""
        return funct.__code__.co_filename + ":" + funct.__qualname__

    @staticmethod
    def _mix(seed, value):
        """Fold ``value`` into ``seed`` order-sensitively (64 bit, >= 0)."""
        return ((seed * 1000003) ^ value) & 0xFFFFFFFFFFFFFFFF

    def hash(self, funct, *args, **kwargs):
        """Create a hash value for a call of ``funct`` with the given arguments.

        The hash combines the function's file and qualified name, the pygimli
        version string, the function's source code and all arguments.

        Argument hashes are folded order-sensitively instead of XOR-ed, so
        that equal non-scalar arguments at two positions do not cancel each
        other and swapped positional arguments give a different key. Keyword
        arguments are processed sorted by name, which keeps the result
        independent of the order in which they were passed.

        NOTE: keys differ from the ones produced by a plain XOR combination,
        so cache files written with such keys are not found again.
        """
        # Local stopwatch: pg.tic() would reset the single global stopwatch.
        sw = pg.Stopwatch(True)

        funcHash = strHash(self.functInfo(funct))
        versionHash = strHash(pg.versionStr())
        try:
            source = inspect.getsource(funct)
        except (OSError, TypeError):
            # No source available (e.g. function defined interactively):
            # use the compiled bytecode as code fingerprint instead.
            source = funct.__code__.co_code.hex()
        codeHash = strHash(source)

        # Seed with the number of positional arguments so that f() and a call
        # with one argument hashing to 0 do not coincide.
        argHash = len(args)
        for i, a in enumerate(args):
            if pg.isScalar(a):
                part = valHash(str(i) + str(a))
            else:
                part = valHash(a)
            argHash = self._mix(argHash, part)

        for k in sorted(kwargs):
            v = kwargs[k]
            if pg.isScalar(v):
                part = valHash(k + str(v))
            else:
                part = self._mix(valHash(k), valHash(v))
            argHash = self._mix(argHash, part)

        pg.debug("Hashing took:", sw.duration(), "s")
        return funcHash ^ versionHash ^ codeHash ^ argHash

    def cache(self, funct, *args, **kwargs):
        """Create the cache entry that belongs to this call of ``funct``.

        Returns a ``Cache`` object keyed by ``self.hash(funct, *args,
        **kwargs)`` whose info is filled with the function info, the pygimli
        version and the string representations of the arguments.
        """
        hashVal = self.hash(funct, *args, **kwargs)

        cached = Cache(hashVal)
        cached.info['codeinfo'] = self.functInfo(funct)
        cached.info['version'] = pg.versionStr()
        cached.info['args'] = str(args)
        cached.info['kwargs'] = str(kwargs)

        return cached
[docs]
def cache(funct):
    """Cache decorator.

    The decorated function looks up a cache entry for its arguments and
    returns the restored value if there is one. Otherwise the function is
    executed, its result is stored together with the call time and duration,
    and the result is returned.

    Caching is bypassed (the function is simply called) if one of the
    following holds:

    * the call passes ``skipCache=True`` (the keyword is removed before the
      wrapped function is called),
    * ``--noCache`` or ``-N`` is in ``sys.argv``,
    * the module-wide ``__NO_CACHE__`` switch is set.

    A result that cannot be stored is still returned; the failure is only
    reported.
    """
    import functools

    # wraps keeps __name__, __doc__ etc. of the decorated function.
    @functools.wraps(funct)
    def wrapper(*args, **kwargs):
        nc = kwargs.pop('skipCache', False)

        if any(('--noCache' in sys.argv,
                '-N' in sys.argv, nc is True, __NO_CACHE__)):
            return funct(*args, **kwargs)

        cached = CacheManager().cache(funct, *args, **kwargs)
        if cached.value is not None:
            return cached.value

        # pg.tic will not work because there is only one global __swatch__
        sw = pg.Stopwatch(True)
        rv = funct(*args, **kwargs)
        cached.info['date'] = time.time()
        cached.info['dur'] = sw.duration()
        try:
            cached.value = rv
        except Exception as e:
            # Best effort: failing to store must not lose the result.
            print(e)
            pg.warn("Can't cache:", rv)

        return rv

    return wrapper