# Source: jaraco.performance / jaraco / performance / cache.py

"""
Cache implementations based on Raymond Hettinger's implementations,
originating with
http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/
and now part of Python 3.2 stdlib.

This implementation uses a class-based approach, which provides a cleaner
interface, but may not perform as well as the function-based decorators.
"""

from __future__ import absolute_import

import collections
import inspect
import itertools
import functools
from itertools import ifilterfalse
from heapq import nsmallest
from operator import itemgetter
try:
	from thread import allocate_lock as Lock
except ImportError:
	try:
		from _thread import allocate_lock as Lock
	except:
		from _dummy_thread import allocate_lock as Lock

# use some forward-compatibility functions/objects
from py31compat.functools import wraps
from py26compat.collections import OrderedDict

class Counter(dict):
	'Mapping where default values are zero'
	def __missing__(self, key):
		return 0

class CacheMiss(object):
	"""
	An object like None, which resolves to boolean False, but which is
	distinguishable from None.
	"""
	def __bool__(self):
		return False

def get_frames():
	"""
	Get the stack of frames of the caller, including the
	caller's frame.
	"""
	# always do one f_back to get to caller of get_frames.
	starting_frame = inspect.currentframe().f_back
	def traverse(frame):
		while frame:
			yield frame
			frame = frame.f_back
	return traverse(starting_frame)

def _move_to_end(odict, key):
	'''
	Move an existing element to the end.

	Raises KeyError if the element does not exist.
	'''
	odict[key] = odict.pop(key)

class Cache(object):
	"""
	Base class for the cache decorators below.

	Provides key composition from call arguments and manual lookup
	helpers; subclasses supply the backing ``store`` mapping and the
	eviction policy.
	"""
	sentinel = object()
	"marker for looping around the queue"

	kwd_mark = object()
	"separate positional and keyword"

	def compose_key(self, args, kwargs):
		"""
		Compose a key from both positional and keyword arguments.
		Return a tuple
		"""
		key = args
		if kwargs:
			# kwd_mark separates the positional args from the sorted
			# (name, value) pairs, so positional and keyword portions
			# of the key cannot collide; sorting makes the key
			# independent of keyword-argument order.
			key += (self.kwd_mark,) + tuple(sorted(kwargs.items()))
		return key

	def lookup(self, *args, **kwargs):
		"""
		Do a manual lookup. Doesn't update hits/misses.
		Raises KeyError if a miss occurs.
		"""
		key = self.compose_key(args, kwargs)
		return self.store[key]

	def safe_lookup(self, *args, **kwargs):
		"""
		Like lookup, but return a CacheMiss if a miss occurs.
		"""
		key = self.compose_key(args, kwargs)
		return self.store.get(key, CacheMiss())

	def recheck(self):
		"""
		Use stack frame inspection to find this cache and re-query it
		for the same value. This is useful when the cache needs to be
		queried for the same value (such as after acquiring a lock).

		For example:

		@LRUCache
		def my_func(a, b, c):
			get_lock() # may block for a long time
			# since the previous call may have blocked for a long time,
			#  the cache contents may be very different, so revisit the
			#  cache.

			# note, we can't just call my_func.cache.lookup(a,b,c)
			#  because some of those parameters may have been passed
			#  as keyword arguments.
			cached = my_func.cache.recheck()
			if not isinstance(cached, CacheMiss): return cached
			# clever, expensive code here
			return expensive_call()
		"""
		# have to get the same args the caller used.
		frames = get_frames()
		# skip this frame
		self_frame = next(frames)
		for frame in frames:
			# search for any frame whose 'self' is this object
			if frame.f_locals.get('self') is self:
				# we found the wrapper function - so it must have an `args` and
				#  `kwds`
				# NOTE(review): assumes the wrapper's locals are named
				# exactly 'args' and 'kwds' and that 'self' is visible
				# in its f_locals (true for the wrappers defined in
				# this module; verify if other wrappers are added).
				args, kwds = frame.f_locals['args'], frame.f_locals['kwds']
				return self.safe_lookup(*args, **kwds)
		# we never found ourself
		return CacheMiss()

class LRUCache(Cache):
	'''
	Least-recently-used cache decorator.

	Arguments to the cached function must be hashable.
	Cache performance statistics stored in .hits and .misses.
	Clear the cache with .clear().
	Cache object can be accessed as f.cache, where f is the decorated
	 function.
	http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
	'''

	def __init__(self, maxsize=100):
		self.maxsize = maxsize
		# ordered least recent to most recent
		self.store = OrderedDict()
		# needed because OrderedDict isn't threadsafe
		self.lock = Lock()
		self.hits = self.misses = 0

	def decorate(self, user_function,
			len=len, iter=iter, tuple=tuple, sorted=sorted,
			KeyError=KeyError
			):
		"""
		Wrap ``user_function`` with LRU caching.

		The builtins bound as default parameters become fast locals
		inside the wrapper -- an intentional micro-optimization; they
		are not meant to be overridden by callers.
		"""
		# lookup optimizations (ugly but fast)
		kwd_mark = self.kwd_mark
		lock = self.lock
		store = self.store
		store_popitem = self.store.popitem
		# native OrderedDict.move_to_end where available; otherwise
		# fall back to the pop/re-insert helper
		store_renew = getattr(store, 'move_to_end',
			functools.partial(_move_to_end, store))

		@wraps(user_function)
		def wrapper(*args, **kwds):
			# inline self.compose_key for performance
			key = args
			if kwds:
				key += (kwd_mark,) + tuple(sorted(kwds.items()))

			with lock:
				# get cache entry or compute if not found
				try:
					result = store[key]
					# record recent use of this key
					store_renew(key)
					self.hits += 1
					return result
				except KeyError:
					pass

			# enact the function with the lock released, so a slow
			# user_function doesn't block other cache users
			result = user_function(*args, **kwds)

			with lock:
				store[key] = result
				self.misses += 1

				# purge least recently used cache entry
				# (popitem(0) == popitem(last=False): pop the oldest)
				if len(store) > self.maxsize:
					store_popitem(0)

			return result
		wrapper.cache = self
		return wrapper

	def clear(self):
		"""Discard all cached values and reset statistics."""
		# take the lock so a concurrent wrapper call cannot interleave
		# with the reset (the store is documented above as not
		# threadsafe on its own)
		with self.lock:
			self.store.clear()
			self.hits = self.misses = 0

	__call__ = decorate

class LFUCache(Cache):
	'''
	Least-frequently-used cache decorator.

	Arguments to the cached function must be hashable.
	Cache performance statistics stored in .hits and .misses.
	Clear the cache with .clear().
	Cache object can be accessed as f.cache, where f is the decorated
	 function.
	http://en.wikipedia.org/wiki/Least_Frequently_Used
	'''

	def __init__(self, maxsize=100):
		self.maxsize = maxsize
		# mapping of args to results
		self.store = dict()
		# times each key has been accessed
		self.use_count = Counter()
		self.hits = self.misses = 0

	def decorate(self, user_function):
		"""Wrap ``user_function`` with LFU caching."""
		kwd_mark = self.kwd_mark

		# lookup optimizations (ugly but fast)
		store = self.store
		use_count = self.use_count

		@wraps(user_function)
		def wrapper(*args, **kwds):
			# inline self.compose_key for performance
			key = args
			if kwds:
				key += (kwd_mark,) + tuple(sorted(kwds.items()))
			use_count[key] += 1

			# get cache entry or compute if not found
			try:
				result = store[key]
				self.hits += 1
			except KeyError:
				result = user_function(*args, **kwds)
				store[key] = result
				self.misses += 1

				# purge the least frequently used cache entries; evict
				# at least one entry so small maxsize values (< 10)
				# still make room instead of growing without bound
				if len(store) > self.maxsize:
					purge_count = max(1, self.maxsize // 10)
					# .items() (not the py2-only .iteritems()) keeps
					# this working on Python 3; materialize before
					# deleting from the dicts
					stale = nsmallest(purge_count,
						list(use_count.items()),
						key=itemgetter(1)
						)
					for stale_key, _ in stale:
						del store[stale_key], use_count[stale_key]

			return result
		wrapper.cache = self
		return wrapper

	__call__ = decorate

	def clear(self):
		"""Discard cached values, usage counts, and statistics."""
		self.store.clear()
		self.use_count.clear()
		self.hits = self.misses = 0