Python redis module: WatchError() example source code

We extracted the following 17 code examples from open-source Python projects to illustrate how to use redis.WatchError().
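redis.WatchError is raised by pipeline.execute() when a key registered with WATCH is modified by another client before the MULTI/EXEC block runs. Almost every example below follows the same optimistic-locking retry loop; here is a minimal sketch of that pattern (the key name and increment logic are illustrative only):

import redis

r = redis.Redis()  # assumes a reachable Redis server

def atomic_increment(key):
    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(key)                    # immediate mode: reads run right away
                current = int(pipe.get(key) or 0)
                pipe.multi()                       # switch to buffered transaction mode
                pipe.set(key, current + 1)
                pipe.execute()                     # raises WatchError if `key` changed
                return current + 1
            except redis.WatchError:
                continue                           # lost the race; re-read and retry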

Project: aws-lambda-fsm-workflows    Author: Workiva
def test_aquire_lease_redis_available_lose_secondary(self,
                                                         mock_settings,
                                                         mock_time,
                                                         mock_get_secondary_cache_source,
                                                         mock_get_connection):
        mock_settings.ENDPOINTS = {}
        mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
        mock_time.time.return_value = 999.
        mock_get_secondary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
        mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
        mock_pipe.get.return_value = None
        mock_pipe.execute.side_effect = redis.WatchError
        ret = acquire_lease('a', 1, 1, primary=False)
        self.assertFalse(ret)
        mock_pipe.watch.assert_called_with('lease-a')
        mock_pipe.get.assert_called_with('lease-a')
        mock_pipe.multi.assert_called_with()
        mock_pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:1')
        mock_pipe.execute.assert_called_with()
Project: aws-lambda-fsm-workflows    Author: Workiva
def test_aquire_lease_redis_available_lose(self,
                                               mock_settings,
                                               mock_time,
                                               mock_get_primary_cache_source,
                                               mock_get_connection):
        mock_settings.ENDPOINTS = {}
        mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
        mock_time.time.return_value = 999.
        mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
        mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
        mock_pipe.get.return_value = None
        mock_pipe.execute.side_effect = redis.WatchError
        ret = acquire_lease('a', 1, 1)
        self.assertFalse(ret)
        mock_pipe.watch.assert_called_with('lease-a')
        mock_pipe.get.assert_called_with('lease-a')
        mock_pipe.multi.assert_called_with()
        mock_pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:1')
        mock_pipe.execute.assert_called_with()
Project: aws-lambda-fsm-workflows    Author: Workiva
def test_aquire_lease_redis_leased_expired_lose(self,
                                                    mock_settings,
                                                    mock_time,
                                                    mock_get_primary_cache_source,
                                                    mock_get_connection):
        mock_settings.ENDPOINTS = {}
        mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
        mock_time.time.return_value = 999.
        mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
        mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
        mock_pipe.get.return_value = '99:99:0:99'
        mock_pipe.execute.side_effect = redis.WatchError
        ret = acquire_lease('a', 1, 1)
        self.assertFalse(ret)
        mock_pipe.watch.assert_called_with('lease-a')
        mock_pipe.get.assert_called_with('lease-a')
        mock_pipe.multi.assert_called_with()
        mock_pipe.setex.assert_called_with('lease-a', 86400, '1:1:1299:100')
        mock_pipe.execute.assert_called_with()
Project: aws-lambda-fsm-workflows    Author: Workiva
def test_release_lease_redis_owned_self_loses(self,
                                                  mock_settings,
                                                  mock_get_primary_cache_source,
                                                  mock_get_connection):
        mock_settings.ENDPOINTS = {}
        mock_settings.ELASTICACHE_ENDPOINTS = ELASTICACHE_ENDPOINTS_REDIS
        mock_get_primary_cache_source.return_value = _get_test_arn(AWS.ELASTICACHE)
        mock_pipe = mock_get_connection.return_value.pipeline.return_value.__enter__.return_value
        mock_pipe.get.return_value = '99:99:99:99'
        mock_pipe.execute.side_effect = redis.WatchError
        ret = release_lease('a', 99, 99, 99)
        self.assertFalse(ret)
        mock_pipe.watch.assert_called_with('lease-a')
        mock_pipe.get.assert_called_with('lease-a')
        mock_pipe.multi.assert_called_with()
        mock_pipe.setex.assert_called_with('lease-a', 86400, '-1:-1:0:99')
        mock_pipe.execute.assert_called_with()
Project: BIGSI    Author: Phelimb
def _batch_insert_prob_redis(conn, names, all_hashes, colour, count=0):
    r = conn
    with r.pipeline() as pipe:
        try:
            pipe.watch(*names)  # WATCH takes key names as varargs
            vals = get_vals(r, names, all_hashes)
            pipe.multi()
            for name, values, hs in zip(names, vals, all_hashes):
                for val, h in zip(values, hs):
                    ba = BitArray()
                    if val is None:
                        val = b''
                    ba.frombytes(val)
                    ba.setbit(colour, 1)
                    pipe.hset(name, h, ba.tobytes())
            pipe.execute()
        except redis.WatchError:
            logger.warning("Retrying %s %s " % (r, names))
            if count < 5:
                # retry the whole batch with an incremented attempt counter
                _batch_insert_prob_redis(conn, names, all_hashes, colour, count=count + 1)
            else:
                logger.warning(
                    "Failed %s %s. Too many retries. Continuing regardless." % (r, names))
Project: evolution-strategies-starter    Author: openai
def get_current_task(self):
        with self.local_redis.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(TASK_ID_KEY)
                    task_id = int(retry_get(pipe, TASK_ID_KEY))
                    if task_id == self.cached_task_id:
                        logger.debug('[worker] Returning cached task {}'.format(task_id))
                        break
                    pipe.multi()
                    pipe.get(TASK_DATA_KEY)
                    logger.info('[worker] Getting new task {}. Cached task was {}'.format(task_id, self.cached_task_id))
                    self.cached_task_id, self.cached_task_data = task_id, deserialize(pipe.execute()[0])
                    break
                except redis.WatchError:
                    continue
        return self.cached_task_id, self.cached_task_data
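retry_get is another helper from this project that is not shown here. A hedged sketch, assuming it simply polls until the key exists (the signature, retry count, and delay are guesses):

import time

def retry_get(pipe, key, tries=300, base_delay=4.0):
    # Hypothetical helper: the master may not have published a task id yet,
    # so poll until the key appears instead of failing immediately.
    for _ in range(tries):
        value = pipe.get(key)
        if value is not None:
            return value
        time.sleep(base_delay)
    raise RuntimeError('{} still unset after {} tries'.format(key, tries))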
Project: dramatiq    Author: Bogdanp
def incr(self, key, amount, maximum, ttl):
        with self.client.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(key)
                    value = int(pipe.get(key) or b"0")
                    value += amount
                    if value > maximum:
                        return False

                    pipe.multi()
                    pipe.set(key, value, px=ttl)
                    pipe.execute()
                    return True
                except redis.WatchError:
                    continue
Project: dramatiq    Author: Bogdanp
def decr(self, key, amount, minimum, ttl):
        with self.client.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(key)
                    value = int(pipe.get(key) or b"0")
                    value -= amount
                    if value < minimum:
                        return False

                    pipe.multi()
                    pipe.set(key, value, px=ttl)
                    pipe.execute()
                    return True
                except redis.WatchError:
                    continue
Project: dramatiq    Author: Bogdanp
def incr_and_sum(self, key, keys, amount, maximum, ttl):
        with self.client.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(key, *keys)
                    value = int(pipe.get(key) or b"0")
                    value += amount
                    if value > maximum:
                        return False

                    values = pipe.mget(keys)
                    total = amount + sum(int(n) for n in values if n)
                    if total > maximum:
                        return False

                    pipe.multi()
                    pipe.set(key, value, px=ttl)
                    pipe.execute()
                    return True
                except redis.WatchError:
                    continue
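Together, incr and decr give an atomic check-and-set counter suitable for building distributed rate limiters. A hedged usage sketch (backend and do_work are placeholders, not dramatiq's public API; note that ttl is in milliseconds, since the code passes it to set(..., px=ttl)):

# Allow at most 10 concurrent holders of the "concurrent-tasks" counter.
if backend.incr("concurrent-tasks", 1, 10, 30000):
    try:
        do_work()
    finally:
        backend.decr("concurrent-tasks", 1, 0, 30000)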
Project: zing    Author: evernote
def save_enqueued(self, pipe):
        """
        Prepare the job for enqueueing. Works via a pipeline.
        Nothing is done if a WatchError is raised by the next `pipeline.execute()`.
        """
        job = self.create_job(status=JobStatus.QUEUED)
        self.set_job_params(pipeline=pipe)
        job.origin = self.origin
        job.enqueued_at = utcnow()
        if job.timeout is None:
            job.timeout = self.timeout
        job.save(pipeline=pipe)
        self.job = job
Project: zing    Author: evernote
def save_deferred(self, depends_on, pipe):
        """
        Prepare the job for deferral (add it as a dependent). Works via a pipeline.
        Nothing is done if a WatchError is raised by the next `pipeline.execute()`.
        """
        job = self.create_job(depends_on=depends_on, status=JobStatus.DEFERRED)
        self.set_job_params(pipeline=pipe)
        job.register_dependency(pipeline=pipe)
        job.save(pipeline=pipe)

        return job
Project: 0ops.exed    Author: whisperaven
def create(self, redis):
        """ 
        Create the job by writing its context into redis. Each job creates len(hosts) meta keys,
            and conflicts are detected by checking whether those job meta keys already exist.

        If there is no conflict, the job context (the `meta_keys`) is created via a redis pipeline
            to avoid conflicting operations.
        """
        pipeline = redis.pipeline()
        try:
            pipeline.watch(*self.meta_keys)
            for key in self.meta_keys:
                if pipeline.exists(key):
                    raise JobConflictError("operate conflict, job already exists on some host(s)")

            LOG.info("going to create job meta data <{0}>".format(';'.join(self.meta_keys)))
            pipeline.multi()
            for key in self.meta_keys:
                pipeline.hmset(key, dict(startat=self._startat))
            pipeline.execute()
            LOG.info("job meta data create finished, <{0}>".format(';'.join(self.meta_keys)))

        except WatchError:
            LOG.info("conflict detected on job meta data create <{0}>".format(';'.join(self.meta_keys)))
            raise JobConflictError("operate conflict, try again later")
        finally:
            pipeline.reset()
Project: spdb    Author: jhuapl-boss
def get_delayed_writes(self, delayed_write_key):
        """
        Method to get all delayed write-cuboid keys for a single delayed_write_key

        Args:
            delayed_write_key (str): The delayed-write key whose queued write-cuboid keys are drained

        Returns:
            list(str): List of delayed write-cuboid keys
        """
        write_cuboid_key_list = []
        with self.status_client.pipeline() as pipe:
            try:
                # Get all items in the list and cleanup, in a transaction so other procs can't add anything
                pipe.watch(delayed_write_key)
                pipe.multi()

                # Get all items in the list
                pipe.lrange(delayed_write_key, 0, -1)

                # Delete the delayed-write-key as it should be empty now
                pipe.delete(delayed_write_key)

                # Delete its associated resource-delayed-write key that stores the resource string
                pipe.delete("RESOURCE-{}".format(delayed_write_key))

                # Execute.
                write_cuboid_key_list = pipe.execute()

                # If you got here things worked OK. Clean up the result. First entry in list is the LRANGE result
                write_cuboid_key_list = write_cuboid_key_list[0]

                # Keys are encoded
                write_cuboid_key_list = [x.decode() for x in write_cuboid_key_list]

            except redis.WatchError:
                # Watch error occurred. Just bail out and let the daemon pick this up later.
                return []

            except Exception as e:
                raise SpdbError("An error occurred while attempting to retrieve delay-write keys: \n {}".format(e),
                                ErrorCodes.REDIS_ERROR)

        return write_cuboid_key_list
Project: ray    Author: ray-project
def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client):
    """Release the GPUs that a given worker was using.

    Note that this does not affect the local scheduler's bookkeeping. It only
    affects the GPU allocations which are recorded in the primary Redis shard,
    which are redundant with the local scheduler bookkeeping.

    Args:
        driver_id: The ID of the driver that is releasing some GPUs.
        local_scheduler_id: The ID of the local scheduler that owns the GPUs
            being released.
        gpu_ids: The IDs of the GPUs being released.
        redis_client: A client for the primary Redis shard.
    """
    # Attempt to release GPU IDs atomically.
    with redis_client.pipeline() as pipe:
        while True:
            try:
                # If this key is changed before the transaction below (the
                # multi/exec block), then the transaction will not take place.
                pipe.watch(local_scheduler_id)

                # Figure out which GPUs are currently in use.
                result = redis_client.hget(local_scheduler_id, "gpus_in_use")
                gpus_in_use = dict() if result is None else json.loads(
                    result.decode("ascii"))

                assert driver_id in gpus_in_use
                assert gpus_in_use[driver_id] >= len(gpu_ids)

                gpus_in_use[driver_id] -= len(gpu_ids)

                pipe.multi()

                pipe.hset(local_scheduler_id, "gpus_in_use",
                          json.dumps(gpus_in_use))

                pipe.execute()
                # If a WatchError is not raised, then the operations should
                # have gone through atomically.
                break
            except redis.WatchError:
                # Another client must have changed the watched key between the
                # time we started WATCHing it and the pipeline's execution. We
                # should just retry.
                continue
Project: aws-lambda-fsm-workflows    Author: Workiva
def _release_lease_redis(cache_arn, correlation_id, steps, retries, fence_token):
    """
    Releases a lease from redis.
    """
    import redis

    redis_conn = get_connection(cache_arn)
    if not redis_conn:
        return  # pragma: no cover

    with redis_conn.pipeline() as pipe:

        try:

            # get the current value of the lease (within a watch)
            redis_key = LEASE_DATA.LEASE_KEY_PREFIX + correlation_id
            pipe.watch(redis_key)
            current_lease_value = pipe.get(redis_key)
            pipe.multi()

            # if there is already a lease holder, then we have a few options
            if current_lease_value:

                # split the current lease apart
                current_steps, current_retries, current_time, current_fence_token = \
                    _deserialize_lease_value(current_lease_value)

                # release it by:
                # 1. setting the lease value to "unowned" (steps/retries = -1)
                # 2. marking it expired (expires = 0) inside the serialized value
                # 3. setting the fence token to the current value so it can be incremented later
                if (current_steps, current_retries, current_fence_token) == (steps, retries, fence_token):
                    new_fence_token = fence_token
                    new_lease_value = _serialize_lease_value(-1, -1, 0, new_fence_token)
                    pipe.setex(redis_key, LEASE_DATA.LEASE_CLEANUP_TIMEOUT, new_lease_value)

                # otherwise, something else owns the lease, so we can't release it
                else:
                    return False

            else:

                # the lease is no longer owned by anyone
                return False

            # execute the transaction
            pipe.execute()

            # if we make it this far, we have released the lease
            return True

        except redis.WatchError:
            return False

        except redis.exceptions.ConnectionError:
            logger.exception('')
            return 0
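_serialize_lease_value and _deserialize_lease_value are defined elsewhere in the project. Judging from the values asserted in the tests above ('1:1:1299:1', '-1:-1:0:99'), the lease value is a colon-delimited steps:retries:expires:fence_token string; a hedged sketch:

def _serialize_lease_value(steps, retries, expires, fence_token):
    # Assumed wire format: 'steps:retries:expires:fence_token'
    return '{}:{}:{}:{}'.format(steps, retries, expires, fence_token)

def _deserialize_lease_value(value):
    # redis-py returns bytes by default; decode before splitting
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    steps, retries, expires, fence_token = (int(v) for v in value.split(':'))
    return steps, retries, expires, fence_token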
Project: kinto-redis    Author: Kinto
def _bump_timestamp(self, collection_id, parent_id, record=None,
                        modified_field=None, last_modified=None):

        key = '{0}.{1}.timestamp'.format(collection_id, parent_id)
        while 1:
            with self._client.pipeline() as pipe:
                try:
                    pipe.watch(key)
                    previous = pipe.get(key)
                    pipe.multi()
                    # XXX factorize code from memory and redis backends.
                    is_specified = (record is not None and
                                    modified_field in record or
                                    last_modified is not None)
                    if is_specified:
                        # If there is a timestamp in the new record,
                        # try to use it.
                        if last_modified is not None:
                            current = last_modified
                        else:
                            current = record[modified_field]
                    else:
                        current = utils.msec_time()

                    if previous and int(previous) >= current:
                        collection_timestamp = int(previous) + 1
                    else:
                        collection_timestamp = current

                    # Return the newly generated timestamp as the current one
                    # only if nothing else was specified.
                    is_equal = previous and int(previous) == current
                    if not is_specified or is_equal:
                        current = collection_timestamp

                    pipe.set(key, collection_timestamp)
                    pipe.execute()
                    return current
                except redis.WatchError:  # pragma: no cover
                    # Our timestamp has been modified by someone else, let's
                    # retry.
                    # XXX: untested.
                    continue
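To make the bumping concrete: if the stored value is 1500 and utils.msec_time() also returns 1500, the branch above stores and returns 1501, so two writes landing in the same millisecond still receive distinct, strictly increasing timestamps.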
Project: spdb    Author: jhuapl-boss
def add_to_page_out(self, temp_page_out_key, lookup_key, resolution, morton, time_sample):
        """
        Method to add a key to the page-out tracking set

        Args:
            temp_page_out_key (str): Temporary set key used to diff against the page-out set
            lookup_key (str): Lookup key for a channel
            resolution (int): level in the resolution hierarchy
            morton (int): morton id for the cuboid
            time_sample (int): time sample for cuboid

        Returns:
            (bool): True if the key was already in page out, False if this call added it
        """
        page_out_key = "PAGE-OUT&{}&{}".format(lookup_key, resolution)
        in_page_out = True
        cnt = 0
        with self.status_client.pipeline() as pipe:
            while 1:
                try:
                    # Create temp set
                    pipe.watch(page_out_key)
                    pipe.multi()
                    pipe.sadd(temp_page_out_key, "{}&{}".format(time_sample, morton))
                    pipe.expire(temp_page_out_key, 15)
                    pipe.sdiff(temp_page_out_key, page_out_key)
                    pipe.sadd(page_out_key, "{}&{}".format(time_sample, morton))
                    result = pipe.execute()

                    if len(result[2]) > 0:
                        in_page_out = False
                    else:
                        in_page_out = True

                    break
                except redis.WatchError as e:
                    # Watch error occurred, try again!
                    cnt += 1

                    if cnt > 200:
                        raise SpdbError("Failed to add to page out due to timeout. {}".format(e),
                                        ErrorCodes.REDIS_ERROR)
                    continue

                except Exception as e:
                    raise SpdbError("Failed to check page-out set. {}".format(e),
                                    ErrorCodes.REDIS_ERROR)

        return in_page_out
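The SDIFF between the one-element temporary set and the page-out set is the heart of this method: a non-empty diff means the cuboid was not yet being paged out, so this call's SADD claimed it. A hedged usage sketch (the caller names are illustrative):

# Hypothetical caller: only page the cuboid out if this call claimed it.
already_out = cache.add_to_page_out(temp_key, lookup_key, resolution, morton, time_sample)
if not already_out:
    page_out_cuboid(morton, time_sample)  # placeholder for the actual write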