我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用 django.core.cache.caches。
def get_cache_key(request, key_prefix=None, method='GET', cache=None):
    """Return a cache key derived from the request URL and query string.

    Safe to call during the request phase: the list of headers to take into
    account is pulled from the global URL registry stored in the cache and
    used to build the key.  Returns ``None`` when no header list has been
    stored yet, meaning the page must be rebuilt first.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    header_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    headers = cache.get(header_key)
    # No stored header list -> no key can be computed yet.
    if headers is None:
        return None
    return _generate_cache_key(request, method, headers, key_prefix)
def get_cache(cache=None):
    """Return ``cache`` or the 'default' cache if ``cache`` is not specified
    or ``cache`` is not configured.

    :param cache: The name of the requested cache.
    """
    try:
        # Check for proper Redis persistent backends
        # FIXME: this logic needs to be a system sanity check
        # Persistent stores must be Redis-backed AND have TIMEOUT explicitly
        # set to None; .get('TIMEOUT', '') uses '' as sentinel so a *missing*
        # TIMEOUT key also fails the check (it is "not None").
        if (not settings.DEBUG and cache in PERSISTENT_STORES and
                (cache not in settings.CACHES or
                 'RedisCache' not in settings.CACHES[cache]['BACKEND'] or
                 settings.CACHES[cache].get('TIMEOUT', '') is not None)):
            raise ImproperlyConfigured(
                'Pootle requires a Redis-backed caching backend for %r '
                'with `TIMEOUT: None`. Please review your settings.' % cache
            )
        return caches[cache]
    except InvalidCacheBackendError:
        # Unknown alias: fall back to the default cache.
        return default_cache
def get_downtime_intervals():
    """Return a dict of IntervalSets of downtime intervals per telescope resource.

    Results are cached for 15 minutes; on refresh failure the last
    successfully fetched (non-expiring) copy is used instead.
    """
    intervals = caches['locmem'].get('downtime_intervals', [])
    if intervals:
        return intervals
    # Cache expired (or empty): try to refresh from the downtime DB.
    try:
        raw = DowntimeDB._get_downtime_data()
        intervals = DowntimeDB._order_downtime_by_resource(raw)
        caches['locmem'].set('downtime_intervals', intervals, 900)
        # Keep a never-expiring fallback copy for when refresh fails.
        caches['locmem'].set('downtime_intervals.no_expire', intervals)
    except DowntimeDBException as exc:
        intervals = caches['locmem'].get('downtime_intervals.no_expire', [])
        logger.warning(repr(exc))
    return intervals
def _get_configdb_data(self, resource):
    """Fetch a resource listing from configdb, caching it for 15 minutes.

    :param resource: name of the configdb resource (URL path segment)
    :return: list of dictionaries of site data
    :raises ConfigDBException: on network failure or malformed response
    """
    payload = caches['locmem'].get(resource)
    if payload:
        return payload
    try:
        response = requests.get(settings.CONFIGDB_URL + '/{}/'.format(resource))
        response.raise_for_status()
    except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e:
        msg = "{}: {}".format(e.__class__.__name__, CONFIGDB_ERROR_MSG)
        raise ConfigDBException(msg)
    try:
        payload = response.json()['results']
    except KeyError:
        # Response body did not contain the expected 'results' key.
        raise ConfigDBException(CONFIGDB_ERROR_MSG)
    # cache the results for 15 minutes
    caches['locmem'].set(resource, payload, 900)
    return payload
def _get_lock(lock_name, expiration):
    """Create a new redis LuaLock.

    Args:
        lock_name (str): The name of the lock
        expiration (datetime.datetime): The expiration datetime

    Returns:
        redis.lock.LuaLock: a redis lua-based lock
    """
    remaining = int((expiration - now_in_utc()).total_seconds())
    # LuaLock installs server-side scripts, which requires the raw
    # StrictRedis client rather than the Django cache wrapper.
    client = caches['redis'].client.get_client()
    # Non-blocking acquire: if the lock is held, the task retries later.
    return LuaLock(client, lock_name, timeout=remaining,
                   blocking=False, thread_local=False)
def release_lock(lock_name, token):
    """Release a lock.

    Args:
        lock_name (str): The lock key in redis
        token (bytes): The unique id used

    Returns:
        bool: True if the lock was successfully released
    """
    # LuaLock installs server-side scripts, which requires the raw
    # StrictRedis client rather than the Django cache wrapper.
    client = caches['redis'].client.get_client()
    lock = LuaLock(client, lock_name)
    try:
        lock.do_release(token)
    except LockError:
        # An already-expired lock is fine; swallow the error.
        pass
def test_iter_keys(self):
    """Exercise iter_keys: plain, with itersize, and as a generator."""
    cache = caches["default"]
    params = cache._params
    # ShardClient does not support iter_keys; skip silently.
    if params["OPTIONS"]["CLIENT_CLASS"] == "django_redis.client.ShardClient":
        return
    for name in ("foo1", "foo2", "foo3"):
        cache.set(name, 1)
    # Test simple result
    self.assertEqual(set(cache.iter_keys("foo*")), set(["foo1", "foo2", "foo3"]))
    # Test limited result
    self.assertEqual(len(list(cache.iter_keys("foo*", itersize=2))), 3)
    # Test generator object
    keys = cache.iter_keys("foo*")
    self.assertNotEqual(next(keys), None)
def lookup(self, instance=None, content_type=None, selectors=None):
    """Return the list of schemas matching the given content type and
    selector sequence, consulting the 'jsonattrs' cache first.

    Returns None when any selector in the sequence is None (i.e. the
    selector chain could not be fully resolved).
    """
    # Derive content type from the instance when not supplied.
    if instance is not None and content_type is None:
        content_type = ContentType.objects.get_for_model(instance)
    if selectors is None and instance is not None:
        selectors = self._get_selectors(instance, content_type)
    selectors = tuple(selectors)
    if any(s is None for s in selectors):
        return None
    # Look for schema list in cache, keyed by content type and
    # selector list.
    key = schema_cache_key(content_type, selectors)
    cached = caches['jsonattrs'].get(key)
    if cached is not None:
        return cached
    # Not in cache: build schema list using increasing selector
    # sequences.
    base_schemas = self.filter(content_type=content_type)
    schemas = []
    # Match the empty prefix first, then each longer selector prefix.
    for i in range(len(selectors) + 1):
        schemas += list(base_schemas.filter(selectors=selectors[:i]))
    caches['jsonattrs'].set(key, schemas)
    return schemas
def test_wrapper_incr_safety(self):
    """Check that incr() on a missing key fails with Django's own
    ValueError (not an error from the tracing wrapper) and that the
    failure is still recorded as an error span."""
    # get the default cache
    cache = caches['default']
    # it should fail not because of our wrapper
    with assert_raises(ValueError) as ex:
        cache.incr('missing_key')
    # the error is not caused by our tracer
    eq_(ex.exception.args[0], "Key 'missing_key' not found")
    # an error trace must be sent
    spans = self.tracer.writer.pop()
    eq_(len(spans), 2)
    span = spans[0]
    eq_(span.resource, 'incr')
    eq_(span.name, 'django.cache')
    eq_(span.span_type, 'cache')
    # error flag must be set on the span for the failed incr
    eq_(span.error, 1)
def setup_attributes(self):
    """Initialise query/model metadata and optional cache configuration
    from this connection's settings dict."""
    self.setup_query()
    self.model = self.query.model
    self.pk_fieldname = self.model._meta.pk.attname
    self.fieldnames = [(self.pk_fieldname,)]
    self.db_config = self.connection.settings_dict
    self.max_relation_depth = self.db_config.get('MAX_RELATION_DEPTH', 10)
    # Model-level override wins; otherwise fall back to the DB setting
    # (unbounded by default).
    self.chunk_size = (getattr(self.model, 'max_per_request', None)
                       or self.db_config.get('CHUNK_SIZE', float('inf')))
    cache_conf = self.db_config.get('CACHE', None)
    if cache_conf:
        alias = cache_conf['NAME']
        self.cache = caches[alias]
        self.cache_timeout = cache_conf.get('TIMEOUT', 60)
        self.cache_count_all_timeout = cache_conf.get(
            'COUNT_ALL_TIMEOUT', self.cache_timeout)
        logger.debug('Using cache: %s' % alias)
    else:
        self.cache = None
        logger.debug('Caching not enabled')
def __init__(self, default_timeout=None):
    """Create the manager.

    :param default_timeout: timeout used when none is given per call;
        defaults to ``settings.MEMOISE_CACHE_TIMEOUT``.
    """
    self.cache = caches['default']
    try:
        self.version = get_application_id()
    except AttributeError:
        # get_application_id fails on local dev; use an empty version.
        self.version = ''
    if default_timeout is None:
        self.default_timeout = settings.MEMOISE_CACHE_TIMEOUT
    else:
        self.default_timeout = default_timeout
def get_or_set(self, fn, *args, **kwargs):
    """Wrap `fn` and cache the response.

    The cache key varies by all *args and **kwargs.  ``timeout`` and
    ``message_type`` are consumed here and not forwarded to `fn`.
    """
    timeout = kwargs.pop("timeout", None)
    message_type = kwargs.pop("message_type", None)
    key = self.create_key(fn, args, kwargs)
    cached = self.get_by_key(key, message_type=message_type)
    if cached is not None:
        return cached
    result = fn(*args, **kwargs)
    self.add_by_key(key, result, timeout=timeout, message_type=message_type)
    return result
def get_cache(cache_name):
    """Return the named cache backend.

    Supports both the legacy callable form of ``caches`` and the modern
    mapping form.
    """
    legacy_callable = hasattr(caches, '__call__')
    if legacy_callable:
        return caches(cache_name)
    return caches[cache_name]
def main_cache():
    """Return the 'main_cache' backend, cleared of any existing entries."""
    backend = caches['main_cache']
    backend.clear()
    return backend
def fast_cache():
    """Return the 'fast_cache' backend, cleared of any existing entries."""
    backend = caches['fast_cache']
    backend.clear()
    return backend
def fast_cache(self):
    """Return the faster cache backend.

    :return: BaseCache
    """
    alias = self.fast_cache_alias
    return caches[alias]
def cache(self):
    """Return the slower cache backend.

    :return: BaseCache
    """
    alias = self.cache_alias
    return caches[alias]
def souvenez(user, when=None, ratelimit=True, check_duplicate=False): """ Save a Souvenir to the DB, rate-limited by default to once per hour. Returns a string: "added", "rate-limited" or "duplicated". """ # user can be a User object or PK (for backfill script) user_id = getattr(user, 'id', user) username = getattr(user, 'username', user) # just for logging if when is None: when = timezone.now() if ratelimit is True: ratelimit = getattr(settings, 'SOUVENIRS_RATELIMIT_SECONDS', 3600) if ratelimit: name = getattr(settings, 'SOUVENIRS_CACHE_NAME', 'default') prefix = getattr(settings, 'SOUVENIRS_CACHE_PREFIX', 'souvenir.') key = '{}.{}'.format(prefix, user_id) cache = caches[name] value = cache.get(key) if value and when < value + timedelta(seconds=ratelimit): logger.debug("rate-limited %s (last seen %s)", username, value) return 'rate-limited' cache.set(key, when) if check_duplicate: if Souvenir.objects.filter(user_id=user_id, when=when).exists(): logger.debug("ignoring duplicate souvenir for %s (%s)", username, when) return 'duplicated' Souvenir(user_id=user_id, when=when).save() logger.debug("saved souvenir for %s (%s)", username, when) return 'added'
def handle(self, *tablenames, **options):
    """Create cache tables: either for the explicitly named tables
    (legacy behavior) or for every configured database cache."""
    db = options.get('database')
    self.verbosity = int(options.get('verbosity'))
    dry_run = options.get('dry_run')
    if len(tablenames):
        # Legacy behavior, tablename specified as argument
        for name in tablenames:
            self.create_table(db, name, dry_run)
        return
    for alias in settings.CACHES:
        backend = caches[alias]
        if isinstance(backend, BaseDatabaseCache):
            self.create_table(db, backend._table, dry_run)
def __init__(self, session_key=None):
    """Initialise the cache-backed session store using the configured
    session cache alias."""
    alias = settings.SESSION_CACHE_ALIAS
    self._cache = caches[alias]
    super(SessionStore, self).__init__(session_key)
def __init__(self):
    """Read the cache middleware settings and bind the configured cache."""
    self.cache_timeout = settings.CACHE_MIDDLEWARE_SECONDS
    self.key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    self.cache_alias = settings.CACHE_MIDDLEWARE_ALIAS
    self.cache = caches[self.cache_alias]
def __init__(self):
    """Bind the cache middleware key prefix and configured cache backend."""
    self.key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    self.cache_alias = settings.CACHE_MIDDLEWARE_ALIAS
    self.cache = caches[self.cache_alias]
def learn_cache_key(request, response, cache_timeout=None, key_prefix=None, cache=None):
    """
    Learns what headers to take into account for some request URL from the
    response object. It stores those headers in a global URL registry so that
    later access to that URL will know what headers to take into account
    without building the response object itself. The headers are named in the
    Vary header of the response, but we want to prevent response generation.

    The list of headers to use for cache key generation is stored in the same
    cache as the pages themselves. If the cache ages some data out of the
    cache, this just means that we have to build the response once to get at
    the Vary header and so at the list of headers to use for the cache key.
    """
    if key_prefix is None:
        key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
    if cache_timeout is None:
        cache_timeout = settings.CACHE_MIDDLEWARE_SECONDS
    cache_key = _generate_cache_header_key(key_prefix, request)
    if cache is None:
        cache = caches[settings.CACHE_MIDDLEWARE_ALIAS]
    if response.has_header('Vary'):
        is_accept_language_redundant = settings.USE_I18N or settings.USE_L10N
        # If i18n or l10n are used, the generated cache key will be suffixed
        # with the current locale. Adding the raw value of Accept-Language is
        # redundant in that case and would result in storing the same content
        # under multiple keys in the cache. See #18191 for details.
        headerlist = []
        for header in cc_delim_re.split(response['Vary']):
            # Normalize to the WSGI META form, e.g. 'Accept-Language'
            # becomes 'HTTP_ACCEPT_LANGUAGE'.
            header = header.upper().replace('-', '_')
            if header == 'ACCEPT_LANGUAGE' and is_accept_language_redundant:
                continue
            headerlist.append('HTTP_' + header)
        # Sort so key generation is independent of Vary header order.
        headerlist.sort()
        cache.set(cache_key, headerlist, cache_timeout)
        return _generate_cache_key(request, request.method, headerlist, key_prefix)
    else:
        # if there is no Vary header, we still need a cache key
        # for the request.build_absolute_uri()
        cache.set(cache_key, [], cache_timeout)
        return _generate_cache_key(request, request.method, [], key_prefix)
def render(self, context):
    """Render the cached template fragment, storing/retrieving it from the
    selected fragment cache keyed by the fragment name and vary-on values."""
    try:
        expire_time = self.expire_time_var.resolve(context)
    except VariableDoesNotExist:
        raise TemplateSyntaxError('"cache" tag got an unknown variable: %r' % self.expire_time_var.var)
    try:
        expire_time = int(expire_time)
    except (ValueError, TypeError):
        raise TemplateSyntaxError('"cache" tag got a non-integer timeout value: %r' % expire_time)
    if self.cache_name:
        # An explicit cache name was given; resolve it and look it up,
        # surfacing both resolution and lookup failures as template errors.
        try:
            cache_name = self.cache_name.resolve(context)
        except VariableDoesNotExist:
            raise TemplateSyntaxError('"cache" tag got an unknown variable: %r' % self.cache_name.var)
        try:
            fragment_cache = caches[cache_name]
        except InvalidCacheBackendError:
            raise TemplateSyntaxError('Invalid cache name specified for cache tag: %r' % cache_name)
    else:
        # No explicit name: prefer 'template_fragments', fall back to default.
        try:
            fragment_cache = caches['template_fragments']
        except InvalidCacheBackendError:
            fragment_cache = caches['default']
    vary_on = [var.resolve(context) for var in self.vary_on]
    cache_key = make_template_fragment_key(self.fragment_name, vary_on)
    value = fragment_cache.get(cache_key)
    if value is None:
        # Cache miss: render the enclosed nodelist and store the result.
        value = self.nodelist.render(context)
        fragment_cache.set(cache_key, value, expire_time)
    return value
def handle(self, *tablenames, **options):
    """Create cache tables: either for the explicitly named tables
    (legacy behavior) or for every configured database cache."""
    db = options['database']
    self.verbosity = options['verbosity']
    dry_run = options['dry_run']
    if len(tablenames):
        # Legacy behavior, tablename specified as argument
        for name in tablenames:
            self.create_table(db, name, dry_run)
        return
    for alias in settings.CACHES:
        backend = caches[alias]
        if isinstance(backend, BaseDatabaseCache):
            self.create_table(db, backend._table, dry_run)
def prepare(name):
    """Populate the named Django cache with RANGE keys for benchmarking.

    Each key in ``range(RANGE)`` is stored as its own UTF-8 encoded value.
    The backend is closed best-effort afterwards.

    :param name: alias of the cache backend to populate.
    """
    setup()
    from django.core.cache import caches
    backend = caches[name]
    for i in range(RANGE):
        token = str(i).encode('utf-8')
        backend.set(token, token)
    try:
        backend.close()
    except Exception:
        # Bug fix: the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit. Close is still best-effort:
        # some backends have no usable close() or raise from it.
        pass
def get_component():
    """Return a WeChatComponent API client configured from settings.

    The 'wechat' cache backend is used as the component's session/token
    store.  (Original docstring was mojibake; reconstructed from the code —
    TODO confirm intent against the project.)
    """
    component = WeChatComponent(
        settings.COMPONENT_APP_ID,
        settings.COMPONENT_APP_SECRET,
        settings.COMPONENT_APP_TOKEN,
        settings.COMPONENT_ENCODINGAESKEY,
        session=caches['wechat']
    )
    return component
def get_cached_choices(self):
    """Return the cached choices for this field path, or None when the
    choice cache is disabled (or the key is absent)."""
    if not self.cache_config['enabled']:
        return None
    backend = caches(self.cache_config['cache'])
    cache_key = self.cache_config['key'] % self.field_path
    return backend.get(cache_key)
def set_cached_choices(self, choices):
    """Store ``choices`` in the configured cache for this field path;
    no-op when the choice cache is disabled."""
    if not self.cache_config['enabled']:
        return
    backend = caches(self.cache_config['cache'])
    cache_key = self.cache_config['key'] % self.field_path
    return backend.set(cache_key, choices)
def tiered_django_cache(cache_name_timeout_pairs):
    """Build a TieredCache from (cache_name, timeout) pairs.

    Pairs with a falsy timeout are skipped entirely.
    """
    layers = []
    for name, timeout in cache_name_timeout_pairs:
        if timeout:
            layers.append(CacheWithTimeout(caches[name], timeout))
    return TieredCache(layers)
def cache_backend(self):
    """Return the Django cache named by the app's ``cache_backend``
    setting, or the default cache when the setting is unset."""
    alias = self.app.conf.cache_backend
    if alias:
        return caches[alias]
    return default_cache
def setUp(self):
    """Install test key functions on the default cache settings and bind a
    cleared default cache for the tests."""
    # Remember the original key functions so tearDown can restore them.
    self.old_kf = settings.CACHES['default'].get('KEY_FUNCTION')
    self.old_rkf = settings.CACHES['default'].get('REVERSE_KEY_FUNCTION')
    # Must be set before the cache is looked up so the backend picks them up.
    settings.CACHES['default']['KEY_FUNCTION'] = 'redis_backend_testapp.tests.make_key'
    settings.CACHES['default']['REVERSE_KEY_FUNCTION'] = 'redis_backend_testapp.tests.reverse_key'
    self.cache = caches['default']
    try:
        self.cache.clear()
    except Exception:
        # Best-effort clear: a missing/unreachable backend should not
        # abort test setup here.
        pass