"""Tests exercising a shelve-backed memoization decorator against read-only files."""
import gc
import os
import shelve
import shutil
import tempfile
import time
from functools import wraps
from unittest import TestCase

import six

# Temporary test folders are created next to this file.
CURRENT_FILE_FOLDER = os.path.dirname(os.path.abspath(__file__))


class MyMemoization:
    """Decorator factory that caches function results in a ``shelve`` file.

    An instance is used as a decorator: ``memoize = MyMemoization(path)``
    followed by ``cached = memoize(fn)``.  It can also be used as a context
    manager so that the underlying shelf is released deterministically.

    Attributes:
        storage: the open ``shelve`` object holding the cached results.
        readonly: when true, cache misses are computed but never stored.
    """

    def __init__(self, filename, readonly=False):
        """Open the shelf at *filename*.

        Args:
            filename: path handed to ``shelve.open``.
            readonly: open with flag ``"r"`` instead of ``"c"`` and never
                write results back.

        Raises:
            Whatever ``shelve.open`` raises when the file cannot be opened in
            the requested mode.
        """
        self.storage = shelve.open(filename, "r" if readonly else "c")
        self.readonly = readonly
        self._closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the ``with`` body.
        return False

    def close(self):
        """Close the underlying shelf; calling it more than once is a no-op."""
        if not self._closed:
            self.storage.close()
            self._closed = True

    @staticmethod
    def get_key(fn, args, kwargs):
        """Build the cache key for one call.

        NOTE: *fn* is not part of the key, so two functions memoized in the
        same shelf share entries for equal arguments, and keyword arguments
        passed in a different order produce a different key.  The format is
        kept as-is so existing shelf files stay readable.
        """
        return str(args) + str(kwargs)

    def __call__(self, fn):
        """Return *fn* wrapped so that its results are looked up in the shelf."""
        @wraps(fn)
        def wrapper(*args, **kwargs):
            key = self.get_key(fn, args, kwargs)
            if key in self.storage:
                return self.storage[key]
            out = fn(*args, **kwargs)
            if not self.readonly:
                self.storage[key] = out
                # Flush right away so the result is on disk after this call.
                self.storage.sync()
            return out

        return wrapper


class TestPythonShelveIssue(TestCase):
    """Checks memoization behaviour when the shelf files are made read-only."""

    def setUp(self):
        self.temp_folder = tempfile.mkdtemp(prefix="test_memoize_tmp_",
                                            dir=CURRENT_FILE_FOLDER)
        self.file_name = os.path.join(self.temp_folder, "memoize_test_file.dat")
        # Every memoizer opened by a test, so tearDown can close them all.
        self._memoizations = []

    def tearDown(self):
        # Restore write permissions first: closing a shelf and deleting the
        # folder may both need them.
        do_to_all_files(self.temp_folder, lambda path: os.chmod(path, 0o777))
        try:
            # Close explicitly rather than relying on garbage collection to
            # finalize the shelves.
            for memoization in self._memoizations:
                memoization.close()
        finally:
            gc.collect()
            shutil.rmtree(self.temp_folder)

    def _open_memoization(self, readonly):
        """Open a memoizer on the test file and register it for cleanup."""
        memoization = MyMemoization(self.file_name, readonly=readonly)
        self._memoizations.append(memoization)
        return memoization

    def test_1(self):
        """A value cached by a writer is served by a later read-only memoizer."""
        args1 = (1, 2, 3)
        args2 = (4, 5, 6)

        memoize = self._open_memoization(readonly=False)
        decorator = memoize(volatile_function)
        res = decorator(*args1)

        # NOTE(review): the original author noted that closing the writer at
        # this point (``memoize.close()``) is a possible fix.  It is left open
        # here, so the read-only shelf below is opened while the writer handle
        # still exists.
        do_to_all_files(self.temp_folder, lambda path: os.chmod(path, 0o444))

        memoize = self._open_memoization(readonly=True)
        decorator = memoize(volatile_function)
        self.assertEqual(res, decorator(*args1))

        # A miss in read-only mode is recomputed on every call, never stored.
        res = decorator(*args2)
        self.assertNotEqual(res, decorator(*args2))

    def test_2(self):
        """Writing through a memoizer backed by read-only files must fail."""
        s = shelve.open(self.file_name, "c")
        s.close()
        do_to_all_files(self.temp_folder, lambda path: os.chmod(path, 0o444))

        # Permission bits are not enforced for privileged users (e.g. root),
        # in which case the expected failure cannot occur.
        writable = []
        do_to_all_files(self.temp_folder,
                        lambda path: writable.append(os.access(path, os.W_OK)))
        if any(writable):
            self.skipTest("file permissions are not enforced for this user")

        def write_through_readonly_files():
            memoize = self._open_memoization(readonly=False)
            decorator = memoize(volatile_function)
            decorator(1, 2, 3)

        self.assertRaises(PermissionError, write_through_readonly_files)


def volatile_function(*args):
    """Return ``(timestamp, args)``; the timestamp differs from call to call.

    Used as a function whose result changes on every invocation, so a cached
    value can be told apart from a freshly computed one.
    """
    if six.PY2:
        return time.clock(), args
    # Default branch, so the function can never fall through and return None.
    return time.perf_counter_ns(), args


def do_to_all_files(path, callback):
    """Call ``callback(file_path)`` for every file found under *path*, recursively."""
    for dirpath, _dirnames, filenames in os.walk(path):
        for filename in filenames:
            callback(os.path.join(dirpath, filename))