Python grpc 模块,RpcError() 实例源码

我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用grpc.RpcError()

项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def test_wrap_value_error(self):
        from google.gax.errors import InvalidArgumentError

        invalid_attribute_exc = grpc.RpcError()
        invalid_attribute_exc.code = lambda: grpc.StatusCode.INVALID_ARGUMENT

        def value_error_func(*dummy_args, **dummy_kwargs):
            raise invalid_attribute_exc

        value_error_callable = api_callable.create_api_call(
            value_error_func, _CallSettings())

        with self.assertRaises(ValueError) as exc_info:
            value_error_callable(None)

        self.assertIsInstance(exc_info.exception, InvalidArgumentError)
        self.assertEqual(exc_info.exception.args, (u'RPC failed',))
        self.assertIs(exc_info.exception.cause, invalid_attribute_exc)
项目:python-etcd3    作者:kragniz    | 项目源码 | 文件源码
def _handle_errors(f):
    if inspect.isgeneratorfunction(f):
        def handler(*args, **kwargs):
            try:
                for data in f(*args, **kwargs):
                    yield data
            except grpc.RpcError as exc:
                _translate_exception(exc)
    else:
        def handler(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except grpc.RpcError as exc:
                _translate_exception(exc)

    return functools.wraps(f)(handler)
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def main():
    service = cloud_speech_pb2.SpeechStub(
        make_channel('speech.googleapis.com', 443))

    # For streaming audio from the microphone, there are three threads.
    # First, a thread that collects audio data as it comes in
    with record_audio(RATE, CHUNK) as buffered_audio_data:
        # Second, a thread that sends requests with that data
        requests = request_stream(buffered_audio_data, RATE)
        # Third, a thread that listens for transcription responses
        recognize_stream = service.StreamingRecognize(
            requests, DEADLINE_SECS)

        # Exit things cleanly on interrupt
        signal.signal(signal.SIGINT, lambda *_: recognize_stream.cancel())

        # Now, put the transcription responses to use.
        try:
            listen_print_loop(recognize_stream)

            recognize_stream.cancel()
        except grpc.RpcError:
            # This happens because of the interrupt handler
            pass
项目:PowerSpikeGG    作者:PowerSpikeGG    | 项目源码 | 文件源码
def test_bad_request(self):
        """Check if the server correctly raise error on bad request."""
        with self.assertRaises(grpc.RpcError):
            self.stub.Match(service_pb2.MatchRequest(id=4242))
        with self.assertRaises(grpc.RpcError):
            self.stub.Match(service_pb2.MatchRequest())
        with self.assertRaises(grpc.RpcError):
            self.stub.Match(service_pb2.MatchRequest(region=constants_pb2.EUW))
项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def exc_to_code(exc):
    """Retrieves the status code from an exception"""
    if not isinstance(exc, RpcError):
        return None
    else:
        try:
            return exc.code()
        except AttributeError:
            return None
项目:ok_google    作者:respeaker    | 项目源码 | 文件源码
def is_grpc_error_unavailable(e):
        is_grpc_error = isinstance(e, grpc.RpcError)
        if is_grpc_error and (e.code() == grpc.StatusCode.UNAVAILABLE):
            logging.error('grpc unavailable error: %s', e)
            return True
        return False
项目:aiyprojects-raspbian    作者:google    | 项目源码 | 文件源码
def do_request(self):
        """Establishes a connection and starts sending audio to the cloud
        endpoint. Responses are handled by the subclass until one returns a
        result.

        Returns:
            namedtuple with the following fields:
                transcript: string with transcript of user query
                response_audio: optionally, an audio response from the server

        Raises speech.Error on error.
        """
        try:
            service = self._make_service(self._channel_factory.make_channel())

            response_stream = self._create_response_stream(
                service, self._request_stream(), self.DEADLINE_SECS)

            if self._audio_logging_enabled:
                self._start_logging_request()

            return self._handle_response_stream(response_stream)
        except (
                google.auth.exceptions.GoogleAuthError,
                grpc.RpcError,
        ) as exc:
            raise Error('Exception in speech request') from exc
项目:aiyprojects-raspbian    作者:google    | 项目源码 | 文件源码
def do_request(self):
        """Establishes a connection and starts sending audio to the cloud
        endpoint. Responses are handled by the subclass until one returns a
        result.

        Returns:
            namedtuple with the following fields:
                transcript: string with transcript of user query
                response_audio: optionally, an audio response from the server

        Raises speech.Error on error.
        """
        try:
            service = self._make_service(self._channel_factory.make_channel())

            response_stream = self._create_response_stream(
                service, self._request_stream(), self.DEADLINE_SECS)

            if self._audio_logging_enabled:
                self._start_logging_request()

            return self._handle_response_stream(response_stream)
        except (
                google.auth.exceptions.GoogleAuthError,
                grpc.RpcError,
        ) as exc:
            raise Error('Exception in speech request') from exc
项目:relaax    作者:deeplearninc    | 项目源码 | 文件源码
def _init_ten_times(self):
        message = bridge_pb2.NullMessage()
        for _ in range(9):
            try:
                return self._stub.Init(message)
            except grpc.RpcError as e:
                pass
        return self._stub.Init(message)
项目:relaax    作者:deeplearninc    | 项目源码 | 文件源码
def send(self, method_name, message_factory):
        if self._stub is None:
            self._stub = bridge_pb2.BridgeStub(grpc.insecure_channel('%s:%d' % self._server))
            for _ in range(9):
                method = getattr(self._stub, method_name)
                message = message_factory()
                try:
                    return method(message)
                except grpc.RpcError as e:
                    pass
        return getattr(self._stub, method_name)(message_factory())
项目:python-etcd3    作者:kragniz    | 项目源码 | 文件源码
def run(self):
        try:
            for response in self._watch_response_iterator:
                if response.created:
                    self._watch_id_callbacks[response.watch_id] = \
                        self._callback
                    self._watch_id_queue.put(response.watch_id)

                callback = self._watch_id_callbacks.get(response.watch_id)
                if callback:
                    # The watcher can be safely reused, but adding a new event
                    # to indicate that the revision is already compacted
                    # requires api change which would break all users of this
                    # module. So, raising an exception if a watcher is still
                    # alive. The caller has to create a new client instance to
                    # recover would break all users of this module.
                    if response.compact_revision != 0:
                        callback(etcd3_exceptions.RevisionCompactedError(
                            response.compact_revision))
                        self.cancel(response.watch_id)
                        continue
                    for event in response.events:
                        callback(events.new_event(event))
        except grpc.RpcError as e:
            self.stop()
            if self._watch_id_callbacks:
                for callback in self._watch_id_callbacks.values():
                    callback(e)
项目:python-etcd3    作者:kragniz    | 项目源码 | 文件源码
def test_grpc_exception_on_unknown_code(self, etcd):
        exception = self.MockedException(grpc.StatusCode.DATA_LOSS)
        kv_mock = mock.MagicMock()
        kv_mock.Range.side_effect = exception
        etcd.kvstub = kv_mock

        with pytest.raises(grpc.RpcError):
            etcd.get("foo")
项目:python-etcd3    作者:kragniz    | 项目源码 | 文件源码
def test_compact(self, etcd):
        etcd.compact(3)
        with pytest.raises(grpc.RpcError):
            etcd.compact(3)
项目:assistant-sdk-python    作者:googlesamples    | 项目源码 | 文件源码
def is_grpc_error_unavailable(e):
        is_grpc_error = isinstance(e, grpc.RpcError)
        if is_grpc_error and (e.code() == grpc.StatusCode.UNAVAILABLE):
            logging.error('grpc unavailable error: %s', e)
            return True
        return False
项目:tfserving_predict_client    作者:epigramai    | 项目源码 | 文件源码
def predict(self, request_data, request_timeout=10):

        logger.info('Sending request to tfserving model')
        logger.info('Model name: ' + str(self.model_name))
        logger.info('Model version: ' + str(self.model_version))
        logger.info('Host: ' + str(self.host))

        tensor_shape = request_data.shape

        if self.model_name == 'incv4' or self.model_name == 'res152':
            features_tensor_proto = tf.contrib.util.make_tensor_proto(request_data, shape=tensor_shape)
        else:
            features_tensor_proto = tf.contrib.util.make_tensor_proto(request_data,
                                                                      dtype=tf.float32, shape=tensor_shape)

        # Create gRPC client and request
        channel = grpc.insecure_channel(self.host)
        stub = PredictionServiceStub(channel)
        request = PredictRequest()

        request.model_spec.name = self.model_name

        if self.model_version > 0:
            request.model_spec.version.value = self.model_version

        request.inputs['inputs'].CopyFrom(features_tensor_proto)

        try:
            result = stub.Predict(request, timeout=request_timeout)
            logger.info('Got scores with len: ' + str(len(list(result.outputs['scores'].float_val))))
            return list(result.outputs['scores'].float_val)
        except RpcError as e:
            logger.error(e)
            logger.error('Prediction failed!')
项目:tfserving_predict_client    作者:epigramai    | 项目源码 | 文件源码
def predict(self, request_data, request_timeout=10):

        logger.info('Sending request to tfserving model')
        logger.info('Model name: ' + str(self.model_name))
        logger.info('Model version: ' + str(self.model_version))
        logger.info('Host: ' + str(self.host))

        tensor_shape = request_data.shape

        if self.model_name == 'incv4' or self.model_name == 'res152':
            features_tensor_proto = tf.contrib.util.make_tensor_proto(request_data, shape=tensor_shape)
        else:
            features_tensor_proto = tf.contrib.util.make_tensor_proto(request_data,
                                                                      dtype=tf.float32, shape=tensor_shape)

        # Create gRPC client and request
        channel = grpc.insecure_channel(self.host)
        stub = PredictionServiceStub(channel)
        request = PredictRequest()

        request.model_spec.name = self.model_name

        if self.model_version > 0:
            request.model_spec.version.value = self.model_version

        request.inputs['inputs'].CopyFrom(features_tensor_proto)

        try:
            result = stub.Predict(request, timeout=request_timeout)
            logger.debug('Predicted scores with len: ' + str(len(list(result.outputs['scores'].float_val))))
            return list(result.outputs['scores'].float_val)
        except RpcError as e:
            logger.warning(e)
            logger.warning('Prediction failed. Mock client will return empty prediction of length: '
                           + str(self.num_scores))
            return [0] * self.num_scores
项目:aioetcd3    作者:gaopeiliang    | 项目源码 | 文件源码
def member_healthy(self, members=None):

        if not members:
            members = await self.member_list()
            members = [m.clientURLs for m in members]

        health_members = []
        unhealth_members = []
        for m in members:

            m = [u.rpartition("//")[2] for u in m]
            server_endpoint = ipv4_endpoints(m)

            if self._credentials:
                channel = aiogrpc.secure_channel(server_endpoint, self._credentials, options=self._options,
                                                 loop=self._loop, executor=self._executor,
                                                 standalone_pool_for_streaming=True)
            else:
                channel = aiogrpc.insecure_channel(server_endpoint, options=self._options, loop=self._loop,
                                                   executor=self._executor, standalone_pool_for_streaming=True)

            maintenance = Maintenance(channel=channel, timeout=2)
            try:
                await maintenance.status()
            except grpc.RpcError:
                unhealth_members.append(m)
            else:
                health_members.append(m)

        return health_members, unhealth_members
项目:yarntf    作者:tobiajo    | 项目源码 | 文件源码
def register_container(self, application_id, ip, port, job_name, task_index):
        container = csg.Container()
        container.applicationId = application_id
        container.ip = ip
        container.port = port
        container.jobName = job_name
        container.taskIndex = task_index
        request = csg.RegisterContainerRequest(container=container)
        try:
            self.stub.RegisterContainer(request)
        except grpc.RpcError:
            return False
        return True
项目:yarntf    作者:tobiajo    | 项目源码 | 文件源码
def get_cluster_spec(self, application_id):
        request = csg.GetClusterSpecRequest()
        request.applicationId = application_id
        try:
            reply = self.stub.GetClusterSpec(request)
        except grpc.RpcError:
            return None
        return reply.clusterSpec
项目:GassistPi    作者:shivasiddharth    | 项目源码 | 文件源码
def is_grpc_error_unavailable(e):
        is_grpc_error = isinstance(e, grpc.RpcError)
        if is_grpc_error and (e.code() == grpc.StatusCode.UNAVAILABLE):
            logging.error('grpc unavailable error: %s', e)
            return True
        return False
项目:python-speech-recog    作者:whittlbc    | 项目源码 | 文件源码
def listen(self):
        service = cloud_speech_pb2.SpeechStub(self.make_channel('speech.googleapis.com', 443))

        # For streaming audio from the microphone, there are three threads.
        # First, a thread that collects audio data as it comes in
        with self.record_audio(self.RATE, self.CHUNK) as buff:
            # Second, a thread that sends requests with that data
            overlap_buffer = collections.deque(maxlen=self.SECS_OVERLAP * self.RATE / self.CHUNK)
            requests = self.request_stream(self._audio_data_generator(buff, overlap_buffer), self.RATE)
            # Third, a thread that listens for transcription responses
            recognize_stream = service.StreamingRecognize(
                requests, self.DEADLINE_SECS)

            # Exit things cleanly on interrupt
            signal.signal(signal.SIGINT, lambda *_: recognize_stream.cancel())

            # Now, put the transcription responses to use.
            try:
                while True:
                    self.listen_print_loop(recognize_stream, buff)

                    # Discard this stream and create a new one.
                    # Note: calling .cancel() doesn't immediately raise an RpcError
                    # - it only raises when the iterator's next() is requested
                    recognize_stream.cancel()

                    requests = self.request_stream(self._audio_data_generator(
                        buff, overlap_buffer), self.RATE)
                    # Third, a thread that listens for transcription responses
                    recognize_stream = service.StreamingRecognize(
                        requests, self.DEADLINE_SECS)

            except grpc.RpcError:
                # This happens because of the interrupt handler
                pass
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testUnaryUnaryOpenTracing(self):
        multi_callable = self._service.unary_unary_multi_callable
        request = b'\x01'
        self.assertRaises(grpc.RpcError, multi_callable, request)

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testUnaryUnaryOpenTracingWithCall(self):
        multi_callable = self._service.unary_unary_multi_callable
        request = b'\x01'
        self.assertRaises(grpc.RpcError, multi_callable.with_call, request)

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testUnaryStreamOpenTracing(self):
        multi_callable = self._service.unary_stream_multi_callable
        request = b'\x01'
        response = multi_callable(request)
        self.assertRaises(grpc.RpcError, list, response)

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testStreamUnaryOpenTracing(self):
        multi_callable = self._service.stream_unary_multi_callable
        requests = [b'\x01', b'\x02']
        self.assertRaises(grpc.RpcError, multi_callable, iter(requests))

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testStreamUnaryOpenTracingWithCall(self):
        multi_callable = self._service.stream_unary_multi_callable
        requests = [b'\x01', b'\x02']
        self.assertRaises(grpc.RpcError, multi_callable.with_call,
                          iter(requests))

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testUnaryUnaryOpenTracing(self):
        multi_callable = self._service.unary_unary_multi_callable
        request = b'\x01'
        self.assertRaises(grpc.RpcError, multi_callable, request)

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testUnaryUnaryOpenTracingWithCall(self):
        multi_callable = self._service.unary_unary_multi_callable
        request = b'\x01'
        self.assertRaises(grpc.RpcError, multi_callable.with_call, request)

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testUnaryStreamOpenTracing(self):
        multi_callable = self._service.unary_stream_multi_callable
        request = b'\x01'
        response = multi_callable(request)
        self.assertRaises(grpc.RpcError, list, response)

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testStreamUnaryOpenTracing(self):
        multi_callable = self._service.stream_unary_multi_callable
        requests = [b'\x01', b'\x02']
        self.assertRaises(grpc.RpcError, multi_callable, iter(requests))

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:grpc-opentracing    作者:grpc-ecosystem    | 项目源码 | 文件源码
def testStreamUnaryOpenTracingWithCall(self):
        multi_callable = self._service.stream_unary_multi_callable
        requests = [b'\x01', b'\x02']
        self.assertRaises(grpc.RpcError, multi_callable.with_call,
                          iter(requests))

        span0 = self._tracer.get_span(0)
        self.assertIsNotNone(span0)
        self.assertTrue(span0.get_tag('error'))

        span1 = self._tracer.get_span(1)
        self.assertIsNotNone(span1)
        self.assertTrue(span1.get_tag('error'))
项目:hops-tensorflow    作者:hopshadoop    | 项目源码 | 文件源码
def register_container(self, application_id, ip, port, job_name, task_index, tb_port):
    container = csg.Container()
    container.applicationId = application_id
    container.ip = ip
    container.port = port
    container.jobName = job_name
    container.taskIndex = task_index
    container.tbPort = tb_port
    request = csg.RegisterContainerRequest(container=container)
    try:
      self.stub.RegisterContainer(request)
    except grpc.RpcError:
      return False
    return True
项目:hops-tensorflow    作者:hopshadoop    | 项目源码 | 文件源码
def get_cluster_spec(self, application_id):
    request = csg.GetClusterSpecRequest()
    request.applicationId = application_id
    try:
      reply = self.stub.GetClusterSpec(request)
    except grpc.RpcError:
      return None
    return reply.clusterSpec
项目:python-grpc-demo    作者:amitsaha    | 项目源码 | 文件源码
def _wrapped_call(self, *args, **kwargs):
        try:
            return getattr(args[0], args[1])(
                args[2], **kwargs, timeout=self.timeout
            )
        except grpc.RpcError as e:
            print('Call {0} failed with {1}'.format(
                args[1], e.code())
            )
            raise
项目:python-grpc-demo    作者:amitsaha    | 项目源码 | 文件源码
def run():
    # read in certificate
    with open('server.crt') as f:
        trusted_certs = f.read().encode()

    # create credentials
    credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
    channel = grpc.secure_channel('localhost:50051', credentials)
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')
    else:
        stub = users_service.UsersStub(channel)
        metadata = [('ip', '127.0.0.1')]

        try:
            response = stub.CreateUser(
                users_messages.CreateUserRequest(username='tom'),
                metadata=metadata,
            )
        except grpc.RpcError as e:
            print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))
        else:
            print("User created:", response.user.username)

        request = users_messages.GetUsersRequest(
            user=[users_messages.User(username="alexa", user_id=1),
                  users_messages.User(username="christie", user_id=1)]
        )
        response = stub.GetUsers(request)
        for resp in response:
            print(resp)
项目:python-grpc-demo    作者:amitsaha    | 项目源码 | 文件源码
def _wrapped_call(self, *args, **kwargs):
        try:
            return getattr(args[0], args[1])(
                args[2], **kwargs, timeout=self.timeout
            )
        except grpc.RpcError as e:
            print('Call {0} failed with {1}'.format(
                args[1], e.code())
            )
            raise
项目:QRL    作者:theQRL    | 项目源码 | 文件源码
def tx_prepare(ctx, src, dst, amount, fee, pk, otsidx):
    """
    Request a tx blob (unsigned) to transfer from src to dst (uses local wallet)
    """
    try:
        address_src, src_xmss = _select_wallet(ctx, src)
        if src_xmss:
            address_src_pk = src_xmss.pk()
            address_src_otsidx = src_xmss.get_index()
        else:
            address_src_pk = pk.encode()
            address_src_otsidx = int(otsidx)

        address_dst = dst.encode()
        amount_shor = int(amount * 1.e8)
        fee_shor = int(fee * 1.e8)
    except Exception as e:
        click.echo("Error validating arguments")
        quit(1)

    channel = grpc.insecure_channel(ctx.obj.node_public_address)
    stub = qrl_pb2_grpc.PublicAPIStub(channel)
    # FIXME: This could be problematic. Check
    transferCoinsReq = qrl_pb2.TransferCoinsReq(address_from=address_src,
                                                address_to=address_dst,
                                                amount=amount_shor,
                                                fee=fee_shor,
                                                xmss_pk=address_src_pk,
                                                xmss_ots_index=address_src_otsidx)

    try:
        transferCoinsResp = stub.TransferCoins(transferCoinsReq, timeout=5)
    except grpc.RpcError as e:
        click.echo(e.details())
        quit(1)
    except Exception as e:
        click.echo("Unhandled error: {}".format(str(e)))
        quit(1)

    txblob = bin2hstr(transferCoinsResp.transaction_unsigned.SerializeToString())
    print(txblob)
项目:aioetcd3    作者:gaopeiliang    | 项目源码 | 文件源码
def test_watch_exception(self):
        f1 = asyncio.get_event_loop().create_future()
        f2 = asyncio.get_event_loop().create_future()
        async def watch_1():
            i = 0
            async with self.client.watch_scope('/foo') as response:
                f1.set_result(None)
                with self.assertRaises(RpcError):
                    async for event in response:
                        i = i + 1
                        if i == 1:
                            self.assertEqual(event.type, EVENT_TYPE_CREATE)
                            self.assertEqual(event.key, b'/foo')
                            self.assertEqual(event.value, b'foo')
                            f2.set_result(None)
                        elif i == 2:
                            raise ValueError("Not raised")
        f3 = asyncio.get_event_loop().create_future()
        f4 = asyncio.get_event_loop().create_future()
        async def watch_2():
            i = 0
            async with self.client.watch_scope('/foo', always_reconnect=True) as response:
                f3.set_result(None)
                async for event in response:
                    i = i + 1
                    if i == 1:
                        self.assertEqual(event.type, EVENT_TYPE_CREATE)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo')
                        f4.set_result(None)
                    elif i == 2:
                        self.assertEqual(event.type, EVENT_TYPE_MODIFY)
                        self.assertEqual(event.key, b'/foo')
                        self.assertEqual(event.value, b'foo1')
                    elif i == 3:
                        self.assertEqual(event.type, EVENT_TYPE_DELETE)
                        self.assertEqual(event.key, b'/foo')
                        # delete event has no value
                        # self.assertEqual(event.value, b'foo1')
                        break
        t1 = asyncio.ensure_future(watch_1())
        t2 = asyncio.ensure_future(watch_2())
        await f1
        await f3
        await self.client.put('/foo', 'foo')
        await f2
        await f4
        fake_endpoints = 'ipv4:///127.0.0.1:49999'
        self.client.update_server_list(fake_endpoints)
        await asyncio.sleep(2)
        self.client.update_server_list(self.endpoints)
        await self.client.put('/foo', 'foo1')
        await self.client.delete('/foo')
        await t1
        await t2