メソッドキャッシュ

Django
2011-06-05 21:09 (13 years ago) ytyng
# -*- coding:utf-8 -*-

"""
Decorator to cache class method results
"""

import hashlib
import base64
from functools import wraps
import logging

from django.core.cache import cache as django_cache
from django.utils.encoding import smart_str

def _logging_verbose(message):
    #pass
    logging.debug('[METHOD_CACHE] %s' % message)


def _execute(method, obj, args, kwargs, cache_timeout=None, cache_backend=None):
    """
    Execute the method
    If the result is already cached, return it without executing
    """
    cache_key = _generate_cache_key(obj, method, args, kwargs)
    if cache_backend is None:
        cache_backend = django_cache

    result = cache_backend.get(cache_key, None)
    if result is None:
        # Execute the method!
        result = method(obj, *args, **kwargs)
        if cache_timeout is None:
            cache_backend.set(cache_key, result)
        else:
            cache_backend.set(cache_key, result, cache_timeout)
        _logging_verbose('cache NO hit. key=%s' % cache_key)
    else:
        _logging_verbose('cache hit. key=%s' % cache_key)
    return result


def _generate_cache_key(obj, method, args=[], kwargs={}):
    """
    Generate cache key
    Create by concatenating object name, method name, and arguments.
    If it exceeds 250 characters, hash the arguments
    @param
        (class|instance) obj: Class or instance
        (function|str) method: The method itself or the name of the method as a string
        (list) args: List of arguments
        (dict) kw: Dictionary of keyword arguments
    """
    KEY_LENGTH_LIMIT = 250
    o_name = obj.__name__ if hasattr(obj,'__name__') else obj.__class__.__name__
    if hasattr(method, '_original_method'):
        method = method._original_method
    f_name = method.func_name
    arg_names = method.func_code.co_varnames[1:method.func_code.co_argcount] 
    # ↑ List of argument names. The first one contains cls or self, so ignore it
    arg_nv_list = []
    for i in range(len(arg_names)):
        arg_name = arg_names[i]
        if len(args) > i:
            arg_nv_list.append((arg_name, args[i]))
        elif arg_name in kwargs:
            arg_nv_list.append((arg_name, kwargs[arg_name]))
        else:
            arg_nv_list.append((arg_name, ''))
    args_str = ','.join([ repr(arg_name) + ":" + _get_arg_value_unique_name(arg_value) for arg_name, arg_value in arg_nv_list ])
    approach_1 = "MC/%s/%s/%s" % (o_name, f_name, args_str)
    if len(approach_1) < KEY_LENGTH_LIMIT:
        return approach_1
    else:
        # Key is too long, so hash part of it
        hashed = hashlib.md5(args_str)
        hashed_key_part = base64.b64encode(hashed.digest())
        approach_2 = "MC/%s/%s/*%s" % (o_name, f_name, hashed_key_part)
        return approach_2[:KEY_LENGTH_LIMIT]


def _get_arg_value_unique_name(arg_value):
    """
    Create a string to uniquely identify arg_value
    For use as a cache key
    """
    if hasattr(arg_value, 'pk'):
        return str(arg_value.pk)
    else:
        return smart_str(arg_value, errors='ignore') #repr might be better?


def method_cache(*args, **kwargs):
    """
    Main decorator

    Mainly used for class methods.
    Can be used for instance methods, but since the cache key will be the same even if the instances are different,
    it might return unexpected results, so generally not recommended.
    Assumes the first argument is a class or instance, so cannot be used with staticmethods.

    Recommended to write at the bottom of the decorator. (Otherwise, the argument names of the target method might not be correctly retrieved. Probably.)

    from common.decorators.method_cache import method_cache, delete_method_cache

    class Hoge(object):

        @classmethod
        @method_cache
        def heavy_method(cls, arg1):
            ...
            ...

        def save(self):
            delete_method_cache(self, self.heavy_method, args=(self.arg1,))
            ...
            ...

    """
    cache_timeout = kwargs.pop('cache_timeout', None)
    cache_backend = kwargs.pop('cache_backend', None)
    assert not kwargs, "Keyword argument accepted is cache_backend or cache_timeout"
    if cache_backend is None and cache_timeout is None:
        method = args[0]
        @wraps(method)
        def decorate(obj, *args, **kwargs):
            return _execute(method, obj, args, kwargs)
        decorate._original_method = method # To reference method arguments even if decorated
        return decorate
    else:
        params = {}
        if not cache_timeout is None:
            params['cache_timeout'] = cache_timeout
        if not cache_backend is None:
            params['cache_backend'] = cache_backend
        def _internal_params(method):
            @wraps(method)
            def decorate(obj, *args, **kwargs):
                return _execute(method, obj, args, kwargs, **params)
            decorate._original_method = method # To reference method arguments even if decorated
            return decorate
        return _internal_params



def delete_method_cache(obj, method, args=[], kwargs={}, cache_backend=None):
    """
    Delete cache.
    Embed in each model's save() method.
    It is okay to receive parameters as unnamed or named arguments.
    """
    cache_key = _generate_cache_key(obj, method, args=args, kwargs=kwargs)
    if cache_backend is None:
        cache_backend = django_cache
    cache_backend.delete(cache_key)
    _logging_verbose('cache delete. key=%s' % cache_key)


# 
# Sample
# 
#class MethodCacheModel(models.Model):
#    """
#    When using method_cache decorator, you must always write a method to delete it.
#    Override the delete_cache method in the extension to handle it.
#    """
#    class Meta:
#        abstract = True
#
#    def save(self, *args, **kwargs):
#        super(MethodCacheModel, self).save(*args, **kwargs)
#        self.delete_cache()
#
#    def delete(self, *args, **kwargs):
#        self.delete_cache()
#        super(MethodCacheModel, self).delete(*args, **kwargs)
#
#    def delete_cache(self):
#        super(MethodCacheModel, self).delete_cache()
#        delete_method_cache(self, self.get, args=(self.pk, ))
#        delete_method_cache(self, self.get_all)
#
#    @classmethod
#    @method_cache
#    def get_all(cls):
#        return list(cls.objects.all())
#
#    @classmethod
#    @method_cache
#    def get(cls, pk):
#        return cls.objects.get(pk = pk)
#
Currently unrated

Comments

Archive

2024
2023
2022
2021
2020
2019
2018
2017
2016
2015
2014
2013
2012
2011