Python django.core.cache 模块,caches() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.cache.caches()

项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def get_cache_key(request, key_prefix=None, method='GET', cache=None):
    """
    Returns a cache key based on the request URL and query. It can be used
    in the request phase because it pulls the list of headers to take into
    account from the global URL registry and uses those to build a cache key
    to check against.

    If there is no headerlist stored, the page needs to be rebuilt, so this
    function returns None.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    cache_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    headerlist = cache.get(cache_key)
    if headerlist is not None:
        return _generate_cache_key(request, method, headerlist, key_prefix)
    else:
        return None
项目:zing    作者:evernote    | 项目源码 | 文件源码
def get_cache(cache=None):
    """Return ``cache`` or the 'default' cache if ``cache`` is not specified or
    ``cache`` is not configured.

    :param cache: The name of the requested cache.
    """
    try:
        # Check for proper Redis persistent backends
        # FIXME: this logic needs to be a system sanity check
        if (not settings.DEBUG and cache in PERSISTENT_STORES and
                (cache not in settings.CACHES or 'RedisCache' not in
                 settings.CACHES[cache]['BACKEND'] or
                 settings.CACHES[cache].get('TIMEOUT', '') is not None)):
            raise ImproperlyConfigured(
                'Pootle requires a Redis-backed caching backend for %r '
                'with `TIMEOUT: None`. Please review your settings.' % cache
            )

        return caches[cache]
    except InvalidCacheBackendError:
        return default_cache
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def get_downtime_intervals():
        ''' Returns dictionary of IntervalSets of downtime intervals per telescope resource. Caches the data and will
            attempt to update the cache every 15 minutes, but fallback on using previous downtime list otherwise.
        '''
        downtime_intervals = caches['locmem'].get('downtime_intervals', [])
        if not downtime_intervals:
            # If the cache has expired, attempt to update the downtime intervals
            try:
                data = DowntimeDB._get_downtime_data()
                downtime_intervals = DowntimeDB._order_downtime_by_resource(data)
                caches['locmem'].set('downtime_intervals', downtime_intervals, 900)
                caches['locmem'].set('downtime_intervals.no_expire', downtime_intervals)
            except DowntimeDBException as e:
                downtime_intervals = caches['locmem'].get('downtime_intervals.no_expire', [])
                logger.warning(repr(e))

        return downtime_intervals
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def _get_configdb_data(self, resource):
        ''' Gets all the data from configdb (the sites structure with everything in it)
        :return: list of dictionaries of site data
        '''

        data = caches['locmem'].get(resource)
        if not data:
            try:
                r = requests.get(settings.CONFIGDB_URL + '/{}/'.format(resource))
                r.raise_for_status()
            except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e:
                msg = "{}: {}".format(e.__class__.__name__, CONFIGDB_ERROR_MSG)
                raise ConfigDBException(msg)
            try:
                data = r.json()['results']
            except KeyError:
                raise ConfigDBException(CONFIGDB_ERROR_MSG)
            # cache the results for 15 minutes
            caches['locmem'].set(resource, data, 900)

        return data
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def _get_lock(lock_name, expiration):
    """
    Creates a new redis LuaLock

    Args:
        lock_name (str): The name of the lock
        expiration (datetime.datetime): The expiration datetime

    Returns:
        redis.lock.LuaLock: a redis lua-based lock
    """
    timeout = int((expiration - now_in_utc()).total_seconds())

    # this is a StrictRedis instance, we need this for the script installation that LuaLock uses
    redis = caches['redis'].client.get_client()
    # don't block acquiring the lock, the task will need to try again later
    return LuaLock(redis, lock_name, timeout=timeout, blocking=False, thread_local=False)
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def release_lock(lock_name, token):
    """
    Release a lock

    Args:
        lock_name (str): The lock key in redis
        token (bytes): The unique id used

    Returns:
        bool: True if the lock was successfully released
    """
    # this is a StrictRedis instance, we need this for the script installation that LuaLock uses
    redis = caches['redis'].client.get_client()
    lock = LuaLock(redis, lock_name)
    try:
        lock.do_release(token)
    except LockError:
        # If the lock is expired we don't want to raise an error
        pass
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_iter_keys(self):
        cache = caches["default"]
        _params = cache._params
        _is_shard = (_params["OPTIONS"]["CLIENT_CLASS"] ==
                     "django_redis.client.ShardClient")

        if _is_shard:
            return

        cache.set("foo1", 1)
        cache.set("foo2", 1)
        cache.set("foo3", 1)

        # Test simple result
        result = set(cache.iter_keys("foo*"))
        self.assertEqual(result, set(["foo1", "foo2", "foo3"]))

        # Test limited result
        result = list(cache.iter_keys("foo*", itersize=2))
        self.assertEqual(len(result), 3)

        # Test generator object
        result = cache.iter_keys("foo*")
        self.assertNotEqual(next(result), None)
项目:django-jsonattrs    作者:Cadasta    | 项目源码 | 文件源码
def lookup(self, instance=None, content_type=None, selectors=None):
        if instance is not None and content_type is None:
            content_type = ContentType.objects.get_for_model(instance)
        if selectors is None and instance is not None:
            selectors = self._get_selectors(instance, content_type)
        selectors = tuple(selectors)
        if any(s is None for s in selectors):
            return None

        # Look for schema list in cache, keyed by content type and
        # selector list.
        key = schema_cache_key(content_type, selectors)
        cached = caches['jsonattrs'].get(key)
        if cached is not None:
            return cached

        # Not in cache: build schema list using increasing selector
        # sequences.
        base_schemas = self.filter(content_type=content_type)
        schemas = []
        for i in range(len(selectors) + 1):
            schemas += list(base_schemas.filter(selectors=selectors[:i]))
        caches['jsonattrs'].set(key, schemas)
        return schemas
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def test_wrapper_incr_safety(self):
        # get the default cache
        cache = caches['default']

        # it should fail not because of our wrapper
        with assert_raises(ValueError) as ex:
            cache.incr('missing_key')

        # the error is not caused by our tracer
        eq_(ex.exception.args[0], "Key 'missing_key' not found")
        # an error trace must be sent
        spans = self.tracer.writer.pop()
        eq_(len(spans), 2)
        span = spans[0]
        eq_(span.resource, 'incr')
        eq_(span.name, 'django.cache')
        eq_(span.span_type, 'cache')
        eq_(span.error, 1)
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def get_cache_key(request, key_prefix=None, method='GET', cache=None):
    """
    Returns a cache key based on the request URL and query. It can be used
    in the request phase because it pulls the list of headers to take into
    account from the global URL registry and uses those to build a cache key
    to check against.

    If there is no headerlist stored, the page needs to be rebuilt, so this
    function returns None.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    cache_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    headerlist = cache.get(cache_key)
    if headerlist is not None:
        return _generate_cache_key(request, method, headerlist, key_prefix)
    else:
        return None
项目:django_any_backend    作者:primal100    | 项目源码 | 文件源码
def setup_attributes(self):
        self.setup_query()
        self.model = self.query.model
        self.pk_fieldname = self.model._meta.pk.attname
        self.fieldnames = [(self.pk_fieldname,)]
        self.db_config = self.connection.settings_dict
        self.max_relation_depth = self.db_config.get('MAX_RELATION_DEPTH', 10)
        self.chunk_size = getattr(self.model, 'max_per_request', None) or self.db_config.get('CHUNK_SIZE', float('inf'))
        cache = self.db_config.get('CACHE', None)
        if cache:
            cache_name = cache['NAME']
            self.cache = caches[cache_name]
            self.cache_timeout = cache.get('TIMEOUT', 60)
            self.cache_count_all_timeout = cache.get('COUNT_ALL_TIMEOUT', self.cache_timeout)
            logger.debug('Using cache: %s' % cache_name)
        else:
            self.cache = None
            logger.debug('Caching not enabled')
项目:montage    作者:storyful    | 项目源码 | 文件源码
def __init__(self, default_timeout=None):
        """
            Creates the manager
        """
        self.cache = caches['default']

        try:
            self.version = get_application_id()
        except AttributeError:
            # this fails on local dev
            self.version = ''

        self.default_timeout = (
            settings.MEMOISE_CACHE_TIMEOUT
            if default_timeout is None
            else default_timeout
        )
项目:montage    作者:storyful    | 项目源码 | 文件源码
def get_or_set(self, fn, *args, **kwargs):
        """
            Wraps `fn` and caches the response

            Cache key varys by all *args and **kwargs
        """
        timeout = kwargs.pop("timeout", None)
        message_type = kwargs.pop("message_type", None)
        key = self.create_key(fn, args, kwargs)

        obj = self.get_by_key(key, message_type=message_type)

        if obj is None:
            obj = fn(*args, **kwargs)
            self.add_by_key(
                key, obj, timeout=timeout, message_type=message_type)

        return obj
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def get_cache_key(request, key_prefix=None, method='GET', cache=None):
    """
    Returns a cache key based on the request URL and query. It can be used
    in the request phase because it pulls the list of headers to take into
    account from the global URL registry and uses those to build a cache key
    to check against.

    If there is no headerlist stored, the page needs to be rebuilt, so this
    function returns None.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    cache_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    headerlist = cache.get(cache_key)
    if headerlist is not None:
        return _generate_cache_key(request, method, headerlist, key_prefix)
    else:
        return None
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def get_cache_key(request, key_prefix=None, method='GET', cache=None):
    """
    Returns a cache key based on the request URL and query. It can be used
    in the request phase because it pulls the list of headers to take into
    account from the global URL registry and uses those to build a cache key
    to check against.

    If there is no headerlist stored, the page needs to be rebuilt, so this
    function returns None.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    cache_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    headerlist = cache.get(cache_key)
    if headerlist is not None:
        return _generate_cache_key(request, method, headerlist, key_prefix)
    else:
        return None
项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def get_cache(cache_name):
    if hasattr(caches, '__call__'):
        return caches(cache_name)
    return caches[cache_name]
项目:django-in-request-cache    作者:mojeto    | 项目源码 | 文件源码
def main_cache():
    cache = caches['main_cache']
    cache.clear()
    return cache
项目:django-in-request-cache    作者:mojeto    | 项目源码 | 文件源码
def fast_cache():
    cache = caches['fast_cache']
    cache.clear()
    return cache
项目:django-in-request-cache    作者:mojeto    | 项目源码 | 文件源码
def fast_cache(self):
        """
        It get faster cache backend
        :return: BaseCache
        """
        return caches[self.fast_cache_alias]
项目:django-in-request-cache    作者:mojeto    | 项目源码 | 文件源码
def cache(self):
        """
        It get slower cache backend
        :return: BaseCache
        """
        return caches[self.cache_alias]
项目:django-souvenirs    作者:appsembler    | 项目源码 | 文件源码
def souvenez(user, when=None, ratelimit=True, check_duplicate=False):
    """
    Save a Souvenir to the DB, rate-limited by default to once per hour.
    Returns a string: "added", "rate-limited" or "duplicated".
    """
    # user can be a User object or PK (for backfill script)
    user_id = getattr(user, 'id', user)
    username = getattr(user, 'username', user)  # just for logging

    if when is None:
        when = timezone.now()

    if ratelimit is True:
        ratelimit = getattr(settings, 'SOUVENIRS_RATELIMIT_SECONDS', 3600)

    if ratelimit:
        name = getattr(settings, 'SOUVENIRS_CACHE_NAME', 'default')
        prefix = getattr(settings, 'SOUVENIRS_CACHE_PREFIX', 'souvenir.')
        key = '{}.{}'.format(prefix, user_id)
        cache = caches[name]
        value = cache.get(key)
        if value and when < value + timedelta(seconds=ratelimit):
            logger.debug("rate-limited %s (last seen %s)", username, value)
            return 'rate-limited'
        cache.set(key, when)

    if check_duplicate:
        if Souvenir.objects.filter(user_id=user_id, when=when).exists():
            logger.debug("ignoring duplicate souvenir for %s (%s)", username, when)
            return 'duplicated'

    Souvenir(user_id=user_id, when=when).save()
    logger.debug("saved souvenir for %s (%s)", username, when)
    return 'added'
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def handle(self, *tablenames, **options):
        db = options.get('database')
        self.verbosity = int(options.get('verbosity'))
        dry_run = options.get('dry_run')
        if len(tablenames):
            # Legacy behavior, tablename specified as argument
            for tablename in tablenames:
                self.create_table(db, tablename, dry_run)
        else:
            for cache_alias in settings.CACHES:
                cache = caches[cache_alias]
                if isinstance(cache, BaseDatabaseCache):
                    self.create_table(db, cache._table, dry_run)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self):
        self.cache_timeout = settings.CACHE_MIDDLEWARE_SECONDS
        self.key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
        self.cache_alias = settings.CACHE_MIDDLEWARE_ALIAS
        self.cache = caches[self.cache_alias]
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self):
        self.key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
        self.cache_alias = settings.CACHE_MIDDLEWARE_ALIAS
        self.cache = caches[self.cache_alias]
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def learn_cache_key(request, response, cache_timeout=None, key_prefix=None, cache=None):
    """
    Learns what headers to take into account for some request URL from the
    response object. It stores those headers in a global URL registry so that
    later access to that URL will know what headers to take into account
    without building the response object itself. The headers are named in the
    Vary header of the response, but we want to prevent response generation.

    The list of headers to use for cache key generation is stored in the same
    cache as the pages themselves. If the cache ages some data out of the
    cache, this just means that we have to build the response once to get at
    the Vary header and so at the list of headers to use for the cache key.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    if cache_timeout is None:
        cache_timeout = settings.CACHE_MIDDLEWARE_SECONDS
    cache_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    if response.has_header('Vary'):
        is_accept_language_redundant = settings.USE_I18N or settings.USE_L10N
        # If i18n or l10n are used, the generated cache key will be suffixed
        # with the current locale. Adding the raw value of Accept-Language is
        # redundant in that case and would result in storing the same content
        # under multiple keys in the cache. See #18191 for details.
        headerlist = []
        for header in cc_delim_re.split(response['Vary']):
            header = header.upper().replace('-', '_')
            if header == 'ACCEPT_LANGUAGE' and is_accept_language_redundant:
                continue
            headerlist.append('HTTP_' + header)
        headerlist.sort()
        cache.set(cache_key, headerlist, cache_timeout)
        return _generate_cache_key(request, request.method, headerlist, key_prefix)
    else:
        # if there is no Vary header, we still need a cache key
        # for the request.build_absolute_uri()
        cache.set(cache_key, [], cache_timeout)
        return _generate_cache_key(request, request.method, [], key_prefix)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def render(self, context):
        try:
            expire_time = self.expire_time_var.resolve(context)
        except VariableDoesNotExist:
            raise TemplateSyntaxError('"cache" tag got an unknown variable: %r' % self.expire_time_var.var)
        try:
            expire_time = int(expire_time)
        except (ValueError, TypeError):
            raise TemplateSyntaxError('"cache" tag got a non-integer timeout value: %r' % expire_time)
        if self.cache_name:
            try:
                cache_name = self.cache_name.resolve(context)
            except VariableDoesNotExist:
                raise TemplateSyntaxError('"cache" tag got an unknown variable: %r' % self.cache_name.var)
            try:
                fragment_cache = caches[cache_name]
            except InvalidCacheBackendError:
                raise TemplateSyntaxError('Invalid cache name specified for cache tag: %r' % cache_name)
        else:
            try:
                fragment_cache = caches['template_fragments']
            except InvalidCacheBackendError:
                fragment_cache = caches['default']

        vary_on = [var.resolve(context) for var in self.vary_on]
        cache_key = make_template_fragment_key(self.fragment_name, vary_on)
        value = fragment_cache.get(cache_key)
        if value is None:
            value = self.nodelist.render(context)
            fragment_cache.set(cache_key, value, expire_time)
        return value
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def handle(self, *tablenames, **options):
        db = options['database']
        self.verbosity = options['verbosity']
        dry_run = options['dry_run']
        if len(tablenames):
            # Legacy behavior, tablename specified as argument
            for tablename in tablenames:
                self.create_table(db, tablename, dry_run)
        else:
            for cache_alias in settings.CACHES:
                cache = caches[cache_alias]
                if isinstance(cache, BaseDatabaseCache):
                    self.create_table(db, cache._table, dry_run)
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def prepare(name):
    setup()

    from django.core.cache import caches

    obj = caches[name]

    for key in range(RANGE):
        key = str(key).encode('utf-8')
        obj.set(key, key)

    try:
        obj.close()
    except:
        pass
项目:django-wechat-example    作者:wechatpy    | 项目源码 | 文件源码
def get_component():
    """
    ??????API??
    """
    component = WeChatComponent(
        settings.COMPONENT_APP_ID,
        settings.COMPONENT_APP_SECRET,
        settings.COMPONENT_APP_TOKEN,
        settings.COMPONENT_ENCODINGAESKEY,
        session=caches['wechat']
    )
    return component
项目:blog_django    作者:chnpmy    | 项目源码 | 文件源码
def get_cached_choices(self):
        if not self.cache_config['enabled']:
            return None
        c = caches(self.cache_config['cache'])
        return c.get(self.cache_config['key']%self.field_path)
项目:blog_django    作者:chnpmy    | 项目源码 | 文件源码
def set_cached_choices(self,choices):
        if not self.cache_config['enabled']:
            return
        c = caches(self.cache_config['cache'])
        return c.set(self.cache_config['key']%self.field_path,choices)
项目:quickcache    作者:dimagi    | 项目源码 | 文件源码
def tiered_django_cache(cache_name_timeout_pairs):
    return TieredCache([
        CacheWithTimeout(caches[cache_name], timeout)
        for cache_name, timeout in cache_name_timeout_pairs
        if timeout
    ])
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def handle(self, *tablenames, **options):
        db = options['database']
        self.verbosity = options['verbosity']
        dry_run = options['dry_run']
        if len(tablenames):
            # Legacy behavior, tablename specified as argument
            for tablename in tablenames:
                self.create_table(db, tablename, dry_run)
        else:
            for cache_alias in settings.CACHES:
                cache = caches[cache_alias]
                if isinstance(cache, BaseDatabaseCache):
                    self.create_table(db, cache._table, dry_run)
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:dream_blog    作者:fanlion    | 项目源码 | 文件源码
def get_cached_choices(self):
        if not self.cache_config['enabled']:
            return None
        c = caches(self.cache_config['cache'])
        return c.get(self.cache_config['key']%self.field_path)
项目:dream_blog    作者:fanlion    | 项目源码 | 文件源码
def set_cached_choices(self,choices):
        if not self.cache_config['enabled']:
            return
        c = caches(self.cache_config['cache'])
        return c.set(self.cache_config['key']%self.field_path,choices)
项目:MxOnline    作者:myTeemo    | 项目源码 | 文件源码
def get_cached_choices(self):
        if not self.cache_config['enabled']:
            return None
        c = caches(self.cache_config['cache'])
        return c.get(self.cache_config['key']%self.field_path)
项目:MxOnline    作者:myTeemo    | 项目源码 | 文件源码
def set_cached_choices(self,choices):
        if not self.cache_config['enabled']:
            return
        c = caches(self.cache_config['cache'])
        return c.set(self.cache_config['key']%self.field_path,choices)
项目:django-celery-results    作者:celery    | 项目源码 | 文件源码
def cache_backend(self):
        backend = self.app.conf.cache_backend
        return caches[backend] if backend else default_cache
项目:djangoblog    作者:liuhuipy    | 项目源码 | 文件源码
def get_cached_choices(self):
        if not self.cache_config['enabled']:
            return None
        c = caches(self.cache_config['cache'])
        return c.get(self.cache_config['key']%self.field_path)
项目:djangoblog    作者:liuhuipy    | 项目源码 | 文件源码
def set_cached_choices(self,choices):
        if not self.cache_config['enabled']:
            return
        c = caches(self.cache_config['cache'])
        return c.set(self.cache_config['key']%self.field_path,choices)
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def handle(self, *tablenames, **options):
        db = options['database']
        self.verbosity = options['verbosity']
        dry_run = options['dry_run']
        if len(tablenames):
            # Legacy behavior, tablename specified as argument
            for tablename in tablenames:
                self.create_table(db, tablename, dry_run)
        else:
            for cache_alias in settings.CACHES:
                cache = caches[cache_alias]
                if isinstance(cache, BaseDatabaseCache):
                    self.create_table(db, cache._table, dry_run)
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        self._cache = caches[settings.SESSION_CACHE_ALIAS]
        super(SessionStore, self).__init__(session_key)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def setUp(self):
        self.old_kf = settings.CACHES['default'].get('KEY_FUNCTION')
        self.old_rkf = settings.CACHES['default'].get('REVERSE_KEY_FUNCTION')
        settings.CACHES['default']['KEY_FUNCTION'] = 'redis_backend_testapp.tests.make_key'
        settings.CACHES['default']['REVERSE_KEY_FUNCTION'] = 'redis_backend_testapp.tests.reverse_key'

        self.cache = caches['default']
        try:
            self.cache.clear()
        except Exception:
            pass