Python aioredis 模块,Redis() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用aioredis.Redis()

项目:telegram-reddit-newsbot    作者:sashgorokhov    | 项目源码 | 文件源码
def get_redis(address=None, loop=None, recreate=False) -> aioredis.Redis:
    global _redis
    address = address or settings.CONFIG['redis']
    kwargs = utils.parse_redis_url(address)
    kwargs['address'] = kwargs.pop('host'), kwargs.pop('port')
    if not _redis or recreate:
        _redis = await aioredis.create_reconnecting_redis(loop=loop, **kwargs)
    return _redis
项目:lightbus    作者:adamcharnock    | 项目源码 | 文件源码
def set_redis_pool(self, redis_pool: Optional[Redis]):
        if redis_pool:
            if isinstance(redis_pool, (ConnectionsPool,)):
                # If they've passed a raw pool then wrap it up in a Redis object.
                # aioredis.create_redis_pool() normally does this for us.
                redis_pool = Redis(redis_pool)
            if not isinstance(redis_pool, (Redis,)):
                raise InvalidRedisPool(
                    'Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to '
                    'create your redis connection.'.format(redis_pool)
                )
            if not isinstance(redis_pool._pool_or_conn, (ConnectionsPool,)):
                raise InvalidRedisPool(
                    'The provided redis connection is backed by a single connection, rather than a '
                    'pool of connections. This will lead to lightbus deadlocks and is unsupported. '
                    'If unsure, use aioredis.create_redis_pool() to create your redis connection.'
                )

            self._redis_pool = redis_pool
项目:lightbus    作者:adamcharnock    | 项目源码 | 文件源码
def call_rpc(self, rpc_message: RpcMessage):
        stream = '{}:stream'.format(rpc_message.api_name)
        logger.debug(
            LBullets(
                L("Enqueuing message {} in Redis stream {}", Bold(rpc_message), Bold(stream)),
                items=rpc_message.to_dict()
            )
        )

        pool = await self.get_redis_pool()
        with await pool as redis:
            start_time = time.time()
            # TODO: MAXLEN
            await redis.xadd(stream=stream, fields=rpc_message.to_dict())

        logger.info(L(
            "Enqueued message {} in Redis in {} stream {}",
            Bold(rpc_message), human_time(time.time() - start_time), Bold(stream)
        ))
项目:lightbus    作者:adamcharnock    | 项目源码 | 文件源码
def send_result(self, rpc_message: RpcMessage, result_message: ResultMessage, return_path: str):
        logger.debug(L(
            "Sending result {} into Redis using return path {}",
            Bold(result_message), Bold(return_path)
        ))
        redis_key = self._parse_return_path(return_path)

        pool = await self.get_redis_pool()
        with await pool as redis:
            start_time = time.time()
            p = redis.pipeline()
            p.lpush(redis_key, redis_encode(result_message.result))
            # TODO: Make result expiry configurable
            p.expire(redis_key, timeout=60)
            await p.execute()

        logger.debug(L(
            "? Sent result {} into Redis in {} using return path {}",
            Bold(result_message), human_time(time.time() - start_time), Bold(return_path)
        ))
项目:lightbus    作者:adamcharnock    | 项目源码 | 文件源码
def receive_result(self, rpc_message: RpcMessage, return_path: str) -> ResultMessage:
        logger.info(L("? Awaiting Redis result for RPC message: {}", Bold(rpc_message)))
        redis_key = self._parse_return_path(return_path)

        pool = await self.get_redis_pool()
        with await pool as redis:
            start_time = time.time()
            # TODO: Make timeout configurable
            _, result = await redis.blpop(redis_key, timeout=5)
            result = redis_decode(result)

        logger.info(L(
            "? Received Redis result in {} for RPC message {}: {}",
            human_time(time.time() - start_time), rpc_message, Bold(result)
        ))
        return result
项目:arq    作者:samuelcolvin    | 项目源码 | 文件源码
def get_redis(self) -> Redis:
        """
        Get the redis pool, if a pool is already initialised it's returned, else one is crated.
        """
        async with self._create_pool_lock:
            if self.redis is None:
                self.redis = await self.create_redis_pool()
        return self.redis
项目:ns-bot    作者:eigenein    | 项目源码 | 文件源码
def __init__(self, connection: aioredis.Redis):
        self.connection = connection
项目:aiocache    作者:argaen    | 项目源码 | 文件源码
def conn(func):
    @functools.wraps(func)
    async def wrapper(self, *args, _conn=None, **kwargs):
        if _conn is None:

            pool = await self._get_pool()
            conn_context = await pool
            with conn_context as _conn:
                if not AIOREDIS_BEFORE_ONE:
                    _conn = aioredis.Redis(_conn)
                return await func(self, *args, _conn=_conn, **kwargs)

        return await func(self, *args, _conn=_conn, **kwargs)
    return wrapper
项目:aiocache    作者:argaen    | 项目源码 | 文件源码
def acquire_conn(self):
        await self._get_pool()
        conn = await self._pool.acquire()
        if not AIOREDIS_BEFORE_ONE:
            conn = aioredis.Redis(conn)
        return conn
项目:telegram-reddit-newsbot    作者:sashgorokhov    | 项目源码 | 文件源码
def __init__(self, queue=None):
        self.queue = queue or list()
        redis = mock.MagicMock(aioredis.Redis)
        redis.rpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.append(item))
        redis.rpop.side_effect = utils.make_coro(result=lambda key: self.queue.pop())
        redis.lpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.insert(0, item))
        redis.llen.side_effect = utils.make_coro(result=lambda key: len(self.queue))
        super(MockQueue, self).__init__(redis, None)
项目:telegram-reddit-newsbot    作者:sashgorokhov    | 项目源码 | 文件源码
def __init__(self, redis, key):
        """
        :param aioredis.Redis redis:
        """
        super(RedisQueue, self).__init__()
        self.redis = redis
        self.key = key
项目:arq    作者:samuelcolvin    | 项目源码 | 文件源码
def __init__(self, *,
                 redis: Redis,
                 max_concurrent_tasks: int=50,
                 shutdown_delay: float=6,
                 timeout_seconds: int=60,
                 burst_mode: bool=True,
                 raise_task_exception: bool=False,
                 semaphore_timeout: float=60) -> None:
        """
        :param redis: redis pool to get connection from to pop items from list, also used to optionally
            re-enqueue pending jobs on termination
        :param max_concurrent_tasks: maximum number of jobs which can be execute at the same time by the event loop
        :param shutdown_delay: number of seconds to wait for tasks to finish
        :param timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
        :param burst_mode: break the iter loop as soon as no more jobs are available by adding an sentinel quit queue
        :param raise_task_exception: whether or not to raise an exception which occurs in a processed task
        """
        self.redis = redis
        self.loop = redis._pool_or_conn._loop
        self.max_concurrent_tasks = max_concurrent_tasks
        self.task_semaphore = asyncio.Semaphore(value=max_concurrent_tasks, loop=self.loop)
        self.shutdown_delay = max(shutdown_delay, 0.1)
        self.timeout_seconds = timeout_seconds
        self.burst_mode = burst_mode
        self.raise_task_exception = raise_task_exception
        self.pending_tasks: Set[asyncio.futures.Future] = set()
        self.task_exception: Exception = None
        self.semaphore_timeout = semaphore_timeout

        self.jobs_complete, self.jobs_failed, self.jobs_timed_out = 0, 0, 0
        self.running = False
        self._finish_lock = asyncio.Lock(loop=self.loop)
项目:arq    作者:samuelcolvin    | 项目源码 | 文件源码
def create_pool_lenient(settings: RedisSettings, loop: asyncio.AbstractEventLoop, *,
                              _retry: int=0) -> Redis:
    """
    Create a new redis pool, retrying up to conn_retries times if the connection fails.
    :param settings: RedisSettings instance
    :param loop: event loop
    :param _retry: retry attempt, this is set when the method calls itself recursively
    """
    addr = settings.host, settings.port
    try:
        pool = await aioredis.create_redis_pool(
            addr, loop=loop, db=settings.database, password=settings.password,
            timeout=settings.conn_timeout
        )
    except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e:
        if _retry < settings.conn_retries:
            logger.warning('redis connection error %s %s, %d retries remaining...',
                           e.__class__.__name__, e, settings.conn_retries - _retry)
            await asyncio.sleep(settings.conn_retry_delay)
        else:
            raise
    else:
        if _retry > 0:
            logger.info('redis connection successful')
        return pool

    # recursively attempt to create the pool outside the except block to avoid
    # "During handling of the above exception..." madness
    return await create_pool_lenient(settings, loop, _retry=_retry + 1)
项目:arq    作者:samuelcolvin    | 项目源码 | 文件源码
def __init__(self, *,
                 loop: asyncio.AbstractEventLoop=None,
                 redis_settings: RedisSettings=None,
                 existing_redis: Redis=None) -> None:
        """
        :param loop: asyncio loop to use for the redis pool
        :param redis_settings: connection settings to use for the pool
        :param existing_redis: existing pool, if set no new pool is created, instead this one is used
        """
        # the "or getattr(...) or" seems odd but it allows the mixin to work with subclasses which initialise
        # loop or redis_settings before calling super().__init__ and don't pass those parameters through in kwargs.
        self.loop = loop or getattr(self, 'loop', None) or asyncio.get_event_loop()
        self.redis_settings = redis_settings or getattr(self, 'redis_settings', None) or RedisSettings()
        self.redis = existing_redis
        self._create_pool_lock = asyncio.Lock(loop=self.loop)
项目:lightbus    作者:adamcharnock    | 项目源码 | 文件源码
def get_redis_pool(self) -> Redis:
        if self._redis_pool is None:
            self._redis_pool = await aioredis.create_redis_pool(**self.connection_kwargs)
        return self._redis_pool