Python django.core.cache 模块,get_cache() 实例源码

我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用django.core.cache.get_cache()

项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def get_cache(cache_name):
    """Return the cache registered under ``cache_name``.

    The module-level ``caches`` object is called with the name when it
    exposes ``__call__``; otherwise it is indexed by the name.
    """
    supports_call = hasattr(caches, '__call__')
    return caches(cache_name) if supports_call else caches[cache_name]
项目:YouPBX    作者:JoneXiong    | 项目源码 | 文件源码
def get_cache(k):
        """Return the entry stored under key ``k`` in ``django.core.cache.caches``."""
        # Imported inside the function, as in the original, so Django's cache
        # module is only loaded when a cache is actually requested.
        from django.core import cache as cache_module
        return cache_module.caches[k]
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def __init__(self, session_key=None):
        """Bind the session cache, then run the parent initializer.

        The cache is looked up by the alias in ``settings.SESSION_CACHE_ALIAS``
        and stored on ``self._cache`` before ``session_key`` is handed to the
        parent class.
        """
        cache_alias = settings.SESSION_CACHE_ALIAS
        self._cache = get_cache(cache_alias)
        super(SessionStore, self).__init__(session_key)
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def test_default_cache(self):
        """After saving, the 'default' cache holds a value for the session's cache key."""
        self.session.save()
        stored = get_cache('default').get(self.session.cache_key)
        self.assertNotEqual(stored, None)
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def test_non_default_cache(self):
        """After saving, the session is in the 'sessions' cache and not in 'default'."""
        # Re-initialize the session backend to make use of overridden settings.
        self.session = self.backend()

        self.session.save()

        # Each lookup is performed right before its own assertion so that a
        # failing first check stops the test before the second cache is read.
        in_default = get_cache('default').get(self.session.cache_key)
        self.assertEqual(in_default, None)
        in_sessions = get_cache('sessions').get(self.session.cache_key)
        self.assertNotEqual(in_sessions, None)
项目:DCRM    作者:82Flex    | 项目源码 | 文件源码
def get_redis_connection(config, use_strict_redis=False):
    """
    Return a redis connection built from a connection config.

    The config mapping is inspected in this order and the first match wins:

    1. ``URL`` -- connect with ``from_url``, passing the optional ``DB``.
    2. ``USE_REDIS_CACHE`` -- reuse the connection held by the Django cache
       registered under that alias.
    3. ``UNIX_SOCKET_PATH`` -- connect over a unix socket (needs ``DB``).
    4. Otherwise connect using ``HOST``, ``PORT`` and ``DB``.

    ``PASSWORD`` is optional and is forwarded in both case 3 and case 4.

    :param config: mapping with the connection settings described above.
    :param use_strict_redis: when true, instantiate ``redis.StrictRedis``
        instead of ``redis.Redis``.
    :returns: a client instance, or the connection object exposed by the
        configured cache backend.
    :raises KeyError: when ``config`` is a dict and a key required by the
        selected branch is missing.
    """
    redis_cls = redis.StrictRedis if use_strict_redis else redis.Redis

    if 'URL' in config:
        return redis_cls.from_url(config['URL'], db=config.get('DB'))

    if 'USE_REDIS_CACHE' in config:

        try:
            from django.core.cache import caches
            cache = caches[config['USE_REDIS_CACHE']]
        except ImportError:
            # `caches` could not be imported (presumably an older Django);
            # fall back to the get_cache() function.
            from django.core.cache import get_cache
            cache = get_cache(config['USE_REDIS_CACHE'])

        if hasattr(cache, 'client'):
            # We're using django-redis. The cache's `client` attribute
            # is a pluggable backend that returns its Redis connection as
            # its `client`
            try:
                # To get Redis connection on django-redis >= 3.4.0
                # we need to use cache.client.get_client() instead of
                # cache.client.client used in older versions
                try:
                    return cache.client.get_client()
                except AttributeError:
                    return cache.client.client
            except NotImplementedError:
                # Deliberate best effort: this backend cannot hand out a
                # connection, so fall through to the UNIX_SOCKET_PATH /
                # HOST based construction below.
                pass
        else:
            # We're using django-redis-cache
            try:
                return cache._client
            except AttributeError:
                # For django-redis-cache > 0.13.1
                return cache.get_master_client()

    if 'UNIX_SOCKET_PATH' in config:
        # Forward the optional password here as well; previously a configured
        # PASSWORD was silently ignored for unix socket connections.
        return redis_cls(
            unix_socket_path=config['UNIX_SOCKET_PATH'],
            db=config['DB'],
            password=config.get('PASSWORD'),
        )

    return redis_cls(
        host=config['HOST'],
        port=config['PORT'],
        db=config['DB'],
        password=config.get('PASSWORD'),
    )
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def create_test_db(self, verbosity=1, autoclobber=False):
        """
        Create the test database and return its name.

        Creation itself is delegated to ``_create_test_db``, which receives
        ``verbosity`` and ``autoclobber``. Afterwards the connection is
        re-pointed at the new database, the SpatiaLite initialization SQL is
        loaded, and the ``syncdb``, ``flush`` and ``createcachetable``
        management commands are run against it.
        """
        # Imported lazily so django.core.management is only loaded when needed.
        from django.core.management import call_command

        test_database_name = self._get_test_db_name()

        if verbosity >= 1:
            # The database name is only shown at verbosity 2 and above.
            name_suffix = (" ('%s')" % test_database_name) if verbosity >= 2 else ''
            print("Creating test database for alias '%s'%s..." % (self.connection.alias, name_suffix))

        self._create_test_db(verbosity, autoclobber)

        # Close the current connection and point it at the test database.
        self.connection.close()
        self.connection.settings_dict["NAME"] = test_database_name

        # The SpatiaLite initialization SQL has to be loaded before `syncdb`.
        self.load_spatialite_sql()

        # Management commands report one level quieter than requested, so
        # test runs are not flooded with messages unless explicitly asked for.
        command_verbosity = max(verbosity - 1, 0)
        call_command(
            'syncdb',
            verbosity=command_verbosity,
            interactive=False,
            database=self.connection.alias,
            load_initial_data=False,
        )

        # Flush afterwards so that data installed by custom SQL is removed;
        # the only test data should come from test fixtures or be generated
        # by post_syncdb triggers. As a side effect this loads the initial
        # data that was intentionally skipped in the syncdb call above.
        call_command(
            'flush',
            verbosity=command_verbosity,
            interactive=False,
            database=self.connection.alias,
        )

        from django.core.cache import get_cache
        from django.core.cache.backends.db import BaseDatabaseCache
        # Create the backing table for every database-backed cache.
        for alias in settings.CACHES:
            backend = get_cache(alias)
            if not isinstance(backend, BaseDatabaseCache):
                continue
            call_command('createcachetable', backend._table, database=self.connection.alias)

        # Get a cursor (even though it is not used here). This has the side
        # effect of initializing the test database.
        cursor = self.connection.cursor()

        return test_database_name