Python functools 模块,lru_cache() 实例源码


def gl_init(self):
        self.gl_vertex_shader_factory = functools.lru_cache(maxsize=None)(functools.partial(gl.Shader,GL_VERTEX_SHADER))
        self.gl_fragment_shader_factory = functools.lru_cache(maxsize=None)(functools.partial(gl.Shader,GL_FRAGMENT_SHADER))
        self.gl_program_factory = functools.lru_cache(maxsize=None)(GLProgram)
        self.gl_texture_factory = functools.lru_cache(maxsize=None)(gx.texture.GLTexture)

        array_table = {gx.VA_PTNMTXIDX:GLMatrixIndexArray()}
        array_table.update((attribute,array.gl_convert()) for attribute,array in self.array_table.items())

        for shape in self.shapes:

        for material in self.materials:

        for texture in self.textures:

        self.gl_joints = [copy.copy(joint) for joint in self.joints]
        self.gl_joint_matrices = numpy.empty((len(self.joints),3,4),numpy.float32)
        self.gl_matrix_table = gl.TextureBuffer(GL_DYNAMIC_DRAW,GL_RGBA32F,(len(self.matrix_descriptors),3,4),numpy.float32)

        self.gl_draw_objects = list(self.gl_generate_draw_objects(self.scene_graph))
        self.gl_draw_objects.sort(key=lambda draw_object: draw_object.material.unknown0)
def indexed_cache(func):

    func = functools.lru_cache()(func)

    @utils.catch(IndexError, return_value=lex.generics.index_error)
    def inner(inp, *, index, **kwargs):
        results = func(**kwargs)
        if isinstance(results, list):
            tools.save_results(inp, range(len(results)), results.__getitem__)
            return results[index - 1 if index else 0]
            return results

    return inner

def reuse(func=None, *, cache=lru_cache()):
    """Cache and reuse a generator function across multiple calls."""
    # Allow this decorator to work with or without being called
    if func is None:
        return partial(reuse, cache=cache)

    # Either initialize an empty history and start a new generator, or
    # retrieve an existing history and the already-started generator
    # that produced it
    def resume(*args, **kwargs):
        return [], func(*args, **kwargs)

    def reuser(*args, **kwargs):
        history, gen = resume(*args, **kwargs)
        yield from history
        record = history.append  # Avoid inner-loop name lookup
        for x in gen:
            yield x

    return reuser
def __init__(self,

        self.bot_user = bot_user
        self.osu_client = osu_client
        self.model_cache_dir = pathlib.Path(model_cache_dir)
        self.token_secret = Fernet(token_secret)
        self.upload_url = upload_url

        self.get_model = lru_cache(model_cache_size)(self._get_model)
        self._user_stats = ExpiringCache()

        self._candidates = LockedIterator(self._gen_candidates())
def stop_containers(self, instances):
        Stops all the specified containers in parallel, still respecting links
        current_formation = self.introspector.introspect()

        # Inner function that we can pass to dependency_sort
        def get_incoming_links(instance):
            result = set()
            for potential_linker in current_formation:
                links_to = potential_linker.links.values()
                if instance in links_to:
            return result

        # Resolve container list to include descendency
        instances = dependency_sort(instances, get_incoming_links)
        # Parallel-stop things
            lambda instance, done: all((linker in done) for linker in get_incoming_links(instance)),
def test_need_for_rlock(self):
        # This will deadlock on an LRU cache that uses a regular lock

        def test_func(x):
            'Used to demonstrate a reentrant lru_cache call within a single thread'
            return x

        class DoubleEq:
            'Demonstrate a reentrant lru_cache call within a single thread'
            def __init__(self, x):
                self.x = x
            def __hash__(self):
                return self.x
            def __eq__(self, other):
                if self.x == 2:
                return self.x == other.x

        test_func(DoubleEq(1))                      # Load the cache
        test_func(DoubleEq(2))                      # Load the cache
        self.assertEqual(test_func(DoubleEq(2)),    # Trigger a re-entrant __eq__ call
                         DoubleEq(2))               # Verify the correct return value
def __init__(self,
        self.path = path = pathlib.Path(path)

        self._read_beatmap = lru_cache(cache)(self._raw_read_beatmap)
        self._db = db = sqlite3.connect(str(path / '.slider.db'))
        with db:
                CREATE TABLE IF NOT EXISTS beatmaps (
                    md5 BLOB PRIMARY KEY,
                    id INT,
                    path TEXT UNIQUE NOT NULL
        self._download_url = download_url
def locking_lru_cache(maxsize=128, typed=False):  # can't implement ignored_keywords because we use python's lru_cache...
    "An lru cache with a lock, to prevent concurrent invocations and allow reusing from cache"

    def deco(func):
        caching_func = lru_cache(maxsize, typed)(func)
        func._main_lock = RLock()
        func._keyed_locks = defaultdict(RLock)

        def inner(*args, **kwargs):
            key = _make_key(args, kwargs, typed=typed)
            with func._main_lock:
                key_lock = func._keyed_locks[key]
            with key_lock:
                return caching_func(*args, **kwargs)

        def clear():
            with func._main_lock:
                return caching_func.cache_clear()

        inner.cache_clear = clear
        return inner

    return deco
def testSetRowFactorySize(self):
            from functools import lru_cache
        except ImportError:  # Python < 3.2
            lru_cache = None
        queries = ['select 1 as a, 2 as b, 3 as c', 'select 123 as abc']
        query = self.c.query
        for maxsize in (None, 0, 1, 2, 3, 10, 1024):
            for i in range(3):
                for q in queries:
                    r = query(q).namedresult()[0]
                    if q.endswith('abc'):
                        self.assertEqual(r, (123,))
                        self.assertEqual(r._fields, ('abc',))
                        self.assertEqual(r, (1, 2, 3))
                        self.assertEqual(r._fields, ('a', 'b', 'c'))
            if lru_cache:
                info = pg._row_factory.cache_info()
                self.assertEqual(info.maxsize, maxsize)
                self.assertEqual(info.hits + info.misses, 6)
                    0 if maxsize is not None and maxsize < 2 else 4)
def get_block(self, block_list):
        block_ = QA_fetch_stock_block_adv()
        _data = []

            for item in block_list:

            return np.unique(_data).tolist()
        except Exception as e:
            raise e

def QA_backtest_sell_available(self, __code):
            return self.account.sell_available[__code]
            return 0
   # @lru_cache()
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def QA_backtest_get_block(self, block_list):
        block_ = QA_fetch_stock_block_adv()
        _data = []

            for item in block_list:

            return np.unique(_data).tolist()
        except Exception as e:
            raise e

def QA_backtest_sell_available(self, __code):
            return self.account.sell_available[__code]
            return 0
   # @lru_cache()
def clear_path_caches():
    """Clear the caches of all path-related methods in this module that use an lru_cache."""
项目:quackalike    作者:gumblex    | 项目源码 | 文件源码
def _get_indexword(model):
    def indexword(word):
            return model.voc.index(word)
        except ValueError:
            return None
    return indexword
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def lru_cache(*args, **kwargs):
        def decorator(func):
            return func
        return decorator
def callable(obj):
        return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)

# --- stdlib additions

# py 3.2 functools.lru_cache
# Taken from:
# Credit: Raymond Hettinger
def callable(obj):
        return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)

# --- stdlib additions

# py 3.2 functools.lru_cache
# Taken from:
# Credit: Raymond Hettinger
def __init__(self, codes, separator='@@'):
        self.encode = functools.lru_cache(maxsize=65536)(self.encode)
        self.bpe_codes = [tuple(item.split()) for item in codes]
        # some hacking to deal with duplicates (only consider first instance)
        self.bpe_codes = dict([(code,i) for (i,code) in reversed(list(enumerate(self.bpe_codes)))])

        self.separator = separator
项目:qcore    作者:quora    | 项目源码 | 文件源码
def lru_cache(maxsize=128, key_fn=None):
    """Decorator that adds an LRU cache of size maxsize to the decorated function.

    maxsize is the number of different keys cache can accomodate.
    key_fn is the function that builds key from args. The default key function
    creates a tuple out of args and kwargs. If you use the default, there is no reason
    not to use functools.lru_cache directly.

    Possible use cases:
    - Your cache key is very large, so you don't want to keep the whole key in memory.
    - The function takes some arguments that don't affect the result.


    def decorator(fn):
        cache = LRUCache(maxsize)
        argspec = inspect2.getfullargspec(fn)
        arg_names = argspec.args[1:] + argspec.kwonlyargs  # remove self
        kwargs_defaults = get_kwargs_defaults(argspec)

        cache_key = key_fn
        if cache_key is None:

            def cache_key(args, kwargs):
                return get_args_tuple(args, kwargs, arg_names, kwargs_defaults)

        def wrapper(*args, **kwargs):
            key = cache_key(args, kwargs)
                return cache[key]
            except KeyError:
                value = fn(*args, **kwargs)
                cache[key] = value
                return value

        return wrapper
    return decorator
def isinstance(val, types):
    if types is int:
        types = (int, long)
    elif type(types) is tuple and int in types:
        types += (long,)
    return _builtin_isinstance(val, types)

# functools.lru_cache is Python 3.2+ only.
# /@functools.lru_cache()/d

# int().to_bytes is Python 3.2+ only.
# s/\(\w+\)\.to_bytes(/_int_to_bytes(\1, /
def callable(obj):
        return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)

# --- stdlib additions

# py 3.2 functools.lru_cache
# Taken from:
# Credit: Raymond Hettinger
def callable(obj):
        return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)

# --- stdlib additions

# py 3.2 functools.lru_cache
# Taken from:
# Credit: Raymond Hettinger
def test_lru_with_maxsize_none(self):
        def fib(n):
            if n < 2:
                return n
            return fib(n-1) + fib(n-2)
        self.assertEqual([fib(n) for n in range(16)],
            [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610])
            functools._CacheInfo(hits=28, misses=16, maxsize=None, currsize=16))
            functools._CacheInfo(hits=0, misses=0, maxsize=None, currsize=0))
def lru_cache(maxsize=128, typed=False):
    """Decorator to wrap a function with a memoizing callable that saves
    up to `maxsize` results based on a Least Recently Used (LRU)

    return _cache(LRUCache(maxsize), typed)
def callable(obj):
        return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)

# --- stdlib additions

# py 3.2 functools.lru_cache
# Taken from:
# Credit: Raymond Hettinger
def simple_cache(func):
    Save results for the :meth:'path.using_module' classmethod.
    When Python 3.2 is available, use functools.lru_cache instead.
    saved_results = {}

    def wrapper(cls, module):
        if module in saved_results:
            return saved_results[module]
        saved_results[module] = func(cls, module)
        return saved_results[module]
    return wrapper
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
        "Return true if numbers a and b are close to each other."
        return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)

# ______________________________________________________________________________
# Misc Functions

# TODO: Use functools.lru_cache memoization decorator
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
        "Return true if numbers a and b are close to each other."
        return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)

# ______________________________________________________________________________
# Misc Functions

项目:MachineLearningProject    作者:ymynem    | 项目源码 | 文件源码
def subsequence_kernel_primed_lru_wrapped(s, t):
    def subsequence_kernel_primed(s_counter, jtot, l, i):  # where (i = 1, … , n-1)
        In order to deal with non-contiguous substrings, it is necessary to
        introduce a decay factor ? ? (0, 1) that can be used to weight the presence of a certain feature in a text
        :param s: string 1
        :param t: string 2
        :param l: lambda represents the weight?
        :param i: length of subsequence
        if i == 0:
            return 1
        elif min(s_counter, jtot) < i:  #
            return 0
            s_counter_minus_one = s_counter - 1
            x = s[s_counter_minus_one]  # last character. sx means the hole string, when they write only s they mean exclude last char
            the_sum = 0
            i_minus_one = i -1
            for j in range(jtot):
                if x == t[j]:
                    the_sum += subsequence_kernel_primed(s_counter_minus_one, j, l, i_minus_one) * l ** (jtot - j + 2)
        res = l * subsequence_kernel_primed(s_counter_minus_one, jtot, l, i) + the_sum
        return res

    return subsequence_kernel_primed
项目:MachineLearningProject    作者:ymynem    | 项目源码 | 文件源码
def kh(s, t, n, l):
    def kmm(n, si, ti):
         if n == 0:
             return 1
         if min(si, ti) < n:
             return 0
         if s[si-1] == t[ti-1]:
             return l*(kmm(n, si, ti-1) + l*km(n-1, si-1, ti-1))
             return l * kmm(n, si, ti-1)
#         return sum(km(n-1, si-1, j) * l**(ti-(j+1)+2) for j in range(ti) if t[j] == s[si-1])

    def km(n, si, ti):
        if n == 0:
            return 1
        if min(si, ti) < n:
            return 0
        return l*km(n, si-1, ti) + kmm(n, si, ti)
#        return l*km(n, si-1, ti) + sum(km(n-1, si-1, j) * l**(ti-(j+1)+2) for j in range(ti) if t[j] == s[si-1])

    def k(n, si, ti):
        if min(si, ti) < n:
            return 0
        return k(n, si-1, ti) + sum(km(n-1, si-1, j) for j in range(ti) if t[j] == s[si-1]) * l**2

    return k(n, len(s), len(t))
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def lru_cache():
        def dec(f):
            def _(*args, **kws):
                return f(*args, **kws)

            return _

        return dec
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def lru_cache(func, maxsize=128, typed=False):
    Can be used with or without parenthesis. See `meta_wrap`'s effect. 
    return functools.lru_cache(maxsize, typed)(func)

# ======================== Type conversion ========================
项目:wiki-sem-500    作者:belph    | 项目源码 | 文件源码
def lru_cache(*args, **kwargs):
        return lambda x: x
项目:orizonhub    作者:gumblex    | 项目源码 | 文件源码
def _get_indexword(model):
    def indexword(word):
            return model.voc.index(word)
        except ValueError:
            return None
    return indexword
项目:hexchat-scripts    作者:dewiniaid    | 项目源码 | 文件源码
def __init__(self, name):
        self.word = True
        self.regex = None

        self.bold = False
        self.italic = False
        self.underline = False
        self.reverse = False
        self.color = None
        self.linecolor = None

        self._sound = None
        self.abs_sound = None

        self.wrap_line = None
        self.format_line = ""
        self.wrap_match = None
        self.format_match = ""
        self.replacement = None

        self.enabled = True
        self.mute = False

        self.notify = False
        self.focus = False
        self.flash = False
        self.copy = False

        self._name = name
        self.strip = 0
        self.pattern = name
        self._parent = self._prev = self._next = None

        # Nickname and Channel filters:
        # Lists of (bool, filter) tuples, where the bool is True for allow, False for deny.
        self.filters = {'nick': [], 'channel': []}
        self.check_filter = functools.lru_cache(maxsize=128)(self._check_filter)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_lru_with_maxsize_none(self):
        def fib(n):
            if n < 2:
                return n
            return fib(n-1) + fib(n-2)
        self.assertEqual([fib(n) for n in range(16)],
            [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610])
            functools._CacheInfo(hits=28, misses=16, maxsize=None, currsize=16))
            functools._CacheInfo(hits=0, misses=0, maxsize=None, currsize=0))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_lru_with_exceptions(self):
        # Verify that user_function exceptions get passed through without
        # creating a hard-to-read chained exception.
        for maxsize in (None, 100):
            def func(i):
                return 'abc'[i]
            self.assertEqual(func(0), 'a')
            with self.assertRaises(IndexError) as cm:
            # Verify that the previous exception did not result in a cached entry
            with self.assertRaises(IndexError):
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_lru_with_types(self):
        for maxsize in (None, 100):
            @functools.lru_cache(maxsize=maxsize, typed=True)
            def square(x):
                return x * x
            self.assertEqual(square(3), 9)
            self.assertEqual(type(square(3)), type(9))
            self.assertEqual(square(3.0), 9.0)
            self.assertEqual(type(square(3.0)), type(9.0))
            self.assertEqual(square(x=3), 9)
            self.assertEqual(type(square(x=3)), type(9))
            self.assertEqual(square(x=3.0), 9.0)
            self.assertEqual(type(square(x=3.0)), type(9.0))
            self.assertEqual(square.cache_info().hits, 4)
            self.assertEqual(square.cache_info().misses, 4)
项目:ieml    作者:IEMLdev    | 项目源码 | 文件源码
def __init__(self):

        self.lexer = get_script_lexer()
        self.parser = yacc.yacc(module=self, errorlog=logging, start='term',
                                debug=False, optimize=True, picklefile=os.path.join(parser_folder, "script_parser.pickle"))
        # rename the parsing method (can't name it directly parse with lru_cache due to ply checking)
        self.parse = self.t_parse
项目:ieml    作者:IEMLdev    | 项目源码 | 文件源码
def __init__(self):

        # Build the lexer and parser
        self.lexer = get_lexer()
        self.parser = yacc.yacc(module=self, errorlog=logging, start='path',
                                debug=False, optimize=True, picklefile="parser/path_parser.pickle")
        # rename the parsing method (can't name it directly parse with lru_cache due to ply checking)
        self.parse = self.t_parse
项目:click-configfile    作者:jenisys    | 项目源码 | 文件源码
def simple_cache(func):
    Save results for the :meth:'path.using_module' classmethod.
    When Python 3.2 is available, use functools.lru_cache instead.
    saved_results = {}

    def wrapper(cls, module):
        if module in saved_results:
            return saved_results[module]
        saved_results[module] = func(cls, module)
        return saved_results[module]
    return wrapper
项目:sublimeTextConfig    作者:luoye-fe    | 项目源码 | 文件源码
def clear_path_caches():
    """Clear the caches of all path-related methods in this module that use an lru_cache."""
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def _prebuilt_pkg(cls, pkg_type, fallback):
        """act as lru_cache"""
        if pkg_type not in cls._cache_prebuilt_pkg:
            pkg = fallback(force_rebuilt=True)
            cls._cache_prebuilt_pkg[pkg_type] = pkg"_prebuilt_pkg,id:{}".format(id(cls._cache_prebuilt_pkg)))
        return cls._cache_prebuilt_pkg[pkg_type]
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def pbuild_hs_m2s(cls, force_rebuilt=False):
        """pkg build: Handshake Master to Slaver"""
        # because py27 do not have functools.lru_cache, so we must write our own
        if force_rebuilt:
            return CtrlPkg(
            return cls._prebuilt_pkg(cls.PTYPE_HS_M2S, cls.pbuild_hs_m2s)
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def _prebuilt_pkg(cls, pkg_type, fallback):
        """act as lru_cache"""
        if pkg_type not in cls._cache_prebuilt_pkg:
            pkg = fallback(force_rebuilt=True)
            cls._cache_prebuilt_pkg[pkg_type] = pkg

        return cls._cache_prebuilt_pkg[pkg_type]
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def pbuild_hs_m2s(cls, force_rebuilt=False):
        """pkg build: Handshake Master to Slaver"""
        # because py27 do not have functools.lru_cache, so we must write our own
        if force_rebuilt:
            return cls(
            return cls._prebuilt_pkg(cls.PTYPE_HS_M2S, cls.pbuild_hs_m2s)
项目:odin    作者:imito    | 项目源码 | 文件源码
def lru_cache(maxsize=128):
    def tmp_func(func):
      return func
    return tmp_func
项目:AIND-Planning    作者:udacity    | 项目源码 | 文件源码
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
        "Return true if numbers a and b are close to each other."
        return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)

# ______________________________________________________________________________
# Misc Functions

# TODO: Use functools.lru_cache memoization decorator
项目:pipenv    作者:pypa    | 项目源码 | 文件源码
def callable(obj):
        return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)

# --- stdlib additions

# py 3.2 functools.lru_cache
# Taken from:
# Credit: Raymond Hettinger