Python asyncio 模块,shield() 实例源码

我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用asyncio.shield()

项目:rci    作者:seecloud    | 项目源码 | 文件源码
async def run(self):
        """Boot the cluster and run every configured script on its VM.

        Scripts run sequentially, VM by VM, in the order given by
        ``self.config["scripts"]``.  Execution stops at the first script
        whose ``run_script()`` call reports an error.

        Returns:
            The truthy error value returned by the first failing
            ``vm.run_script()`` call, or ``None`` when every script succeeds.
        """
        # presumably 1/2 identify the stdout/stderr streams -- confirm
        # against ``_console_cb``.
        _out_cb = functools.partial(self._console_cb, 1)
        _err_cb = functools.partial(self._console_cb, 2)
        self._update_status("boot")
        # Shielded so that cancelling this coroutine does not cancel the
        # cluster boot that is already in flight.
        await asyncio.shield(self._get_cluster())
        for vm, scripts in self.config["scripts"].items():
            vm = self.cluster.vms[vm]
            for script_name in scripts:
                script = self.root.config.get_script(script_name)
                self.root.log.debug("%s: running script: %s", self, script)
                self._update_status("running " + script_name)
                error = await vm.run_script(self.root.loop, script, self.env,
                                            _out_cb, _err_cb)
                if error:
                    self.root.log.debug("%s error in script %s", self, script)
                    self._update_status("failure")
                    self.console_callbacks = set()
                    return error
        self.console_callbacks = set()
        self._update_status("success")
        self.root.log.debug("%s all scripts success", self)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_shield_effect(self):
        """Check that cancelling the outer coroutine leaves the shielded inner one running."""
        # Cancelling outer() does not affect inner().
        proof = 0
        waiter = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        @asyncio.coroutine
        def outer():
            nonlocal proof
            yield from asyncio.shield(inner(), loop=self.loop)
            proof += 100

        # ``asyncio.async`` cannot be parsed on Python 3.7+ because ``async``
        # is a reserved word; ``ensure_future`` is the equivalent spelling.
        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        # Only inner() finished (+1); outer() was cancelled before its +100.
        self.assertEqual(proof, 1)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_gather_shield(self):
        """Check that cancelling a gather() of shielded futures cancels only the shields."""
        child1 = asyncio.Future(loop=self.loop)
        child2 = asyncio.Future(loop=self.loop)
        inner1 = asyncio.shield(child1, loop=self.loop)
        inner2 = asyncio.shield(child2, loop=self.loop)
        parent = asyncio.gather(inner1, inner2, loop=self.loop)
        test_utils.run_briefly(self.loop)
        parent.cancel()
        # This should cancel inner1 and inner2 but not child1 and child2.
        test_utils.run_briefly(self.loop)
        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
        self.assertTrue(inner1.cancelled())
        self.assertTrue(inner2.cancelled())
        # The children must still be pending here: set_result() on a
        # cancelled future would raise.
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)
项目:jd4    作者:vijos    | 项目源码 | 文件源码
async def wait_cgroup(sock, execute_task, time_limit_ns, memory_limit_bytes, process_limit):
    """Wait for ``execute_task`` under a fresh cgroup, enforcing a time limit.

    A new ``CGroup`` is given the memory and process limits and accepts
    ``sock``.  The coroutine then repeatedly waits for ``execute_task``
    with whatever time budget is left, until the task finishes or the
    budget is used up.

    Args:
        sock: Passed to ``cgroup.accept()``.
        execute_task: Awaitable to wait for; it is shielded, so a timeout
            of the wait does not cancel it.
        time_limit_ns: Time budget in nanoseconds.
        memory_limit_bytes: Value assigned to ``cgroup.memory_limit_bytes``.
        process_limit: Value assigned to ``cgroup.pids_max``.

    Returns:
        A ``(time_usage_ns, memory_usage_bytes)`` tuple.  When the task
        finishes in time the first item is the cgroup's CPU usage; when the
        budget runs out it is the larger of CPU usage and the idle-based
        estimate.
    """
    cgroup = CGroup()
    try:
        cgroup.memory_limit_bytes = memory_limit_bytes
        cgroup.pids_max = process_limit
        await cgroup.accept(sock)
        start_idle = _get_idle()

        while True:
            cpu_usage_ns = cgroup.cpu_usage_ns
            # presumably _get_idle() is system-wide idle time in seconds
            # summed over all CPUs -- confirm against its definition.
            idle_usage_ns = int((_get_idle() - start_idle) / cpu_count() * 1e9)
            time_usage_ns = max(cpu_usage_ns, idle_usage_ns)
            time_remain_ns = time_limit_ns - time_usage_ns
            if time_remain_ns <= 0:
                return time_usage_ns, cgroup.memory_usage_bytes
            try:
                await wait_for(shield(execute_task), (time_remain_ns + WAIT_JITTER_NS) / 1e9)
                return cgroup.cpu_usage_ns, cgroup.memory_usage_bytes
            except TimeoutError:
                # NOTE(review): this must be asyncio's TimeoutError (it
                # differs from the builtin before Python 3.11) -- confirm
                # the file's import.  On timeout, re-measure and loop.
                pass
    finally:
        # Keep killing until kill() reports nothing left, then release
        # the cgroup.
        while cgroup.kill():
            await sleep(.001)
        cgroup.close()
项目:urban-journey    作者:urbanjourney    | 项目源码 | 文件源码
async def trigger(self, data, *args, **kwargs):
        """
        Triggers all activities connected to this port.

        Any exception raised while waiting (including a timeout after
        ``self.time_out`` seconds) is reported through
        ``self.parent_object.root.handle_exception`` instead of propagating.

        :param data: The data being transmitted.
        :param args: Extra positional arguments forwarded to each activity.
        :param kwargs: Extra keyword arguments forwarded to each activity.
        """
        # Only transmit the data if there are activities connected to this port.
        if not self._activities:
            return

        futures = [
            activity.trigger([self], {self.attribute_name: data}, self.parent_object, *args, **kwargs)
            for activity in self._activities
        ]

        try:
            # TODO: This will stop calling modules as soon as one raises an exception. Figure out a way to handle
            #       exceptions individually for each future.
            # shield() keeps the combined wait alive when wait_for() times out.
            await wait_for(shield(wait(futures)), self.time_out)
        except Exception:
            print(self.channel_name, self.time_out)
            self.parent_object.root.handle_exception(sys.exc_info())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_shield_effect(self):
        """Check that cancelling the outer coroutine leaves the shielded inner one running."""
        # Cancelling outer() does not affect inner().
        proof = 0
        waiter = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        @asyncio.coroutine
        def outer():
            nonlocal proof
            yield from asyncio.shield(inner(), loop=self.loop)
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        # Only inner() finished (+1); outer() was cancelled before its +100.
        self.assertEqual(proof, 1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_gather_shield(self):
        """Check that cancelling a gather() of shielded futures cancels only the shields."""
        child1 = asyncio.Future(loop=self.loop)
        child2 = asyncio.Future(loop=self.loop)
        inner1 = asyncio.shield(child1, loop=self.loop)
        inner2 = asyncio.shield(child2, loop=self.loop)
        parent = asyncio.gather(inner1, inner2, loop=self.loop)
        test_utils.run_briefly(self.loop)
        parent.cancel()
        # This should cancel inner1 and inner2 but not child1 and child2.
        test_utils.run_briefly(self.loop)
        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
        self.assertTrue(inner1.cancelled())
        self.assertTrue(inner2.cancelled())
        # The children must still be pending here: set_result() on a
        # cancelled future would raise.
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_shield_effect(self):
        """Check that cancelling the outer coroutine leaves the shielded inner one running."""
        # Cancelling outer() does not affect inner().
        proof = 0
        waiter = asyncio.Future(loop=self.loop)

        @asyncio.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        @asyncio.coroutine
        def outer():
            nonlocal proof
            yield from asyncio.shield(inner(), loop=self.loop)
            proof += 100

        # ``asyncio.async`` cannot be parsed on Python 3.7+ because ``async``
        # is a reserved word; ``ensure_future`` is the equivalent spelling.
        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        # Only inner() finished (+1); outer() was cancelled before its +100.
        self.assertEqual(proof, 1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_gather_shield(self):
        """Check that cancelling a gather() of shielded futures cancels only the shields."""
        child1 = asyncio.Future(loop=self.loop)
        child2 = asyncio.Future(loop=self.loop)
        inner1 = asyncio.shield(child1, loop=self.loop)
        inner2 = asyncio.shield(child2, loop=self.loop)
        parent = asyncio.gather(inner1, inner2, loop=self.loop)
        test_utils.run_briefly(self.loop)
        parent.cancel()
        # This should cancel inner1 and inner2 but not child1 and child2.
        test_utils.run_briefly(self.loop)
        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
        self.assertTrue(inner1.cancelled())
        self.assertTrue(inner2.cancelled())
        # The children must still be pending here: set_result() on a
        # cancelled future would raise.
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
async def render(self, url: str, format: str = 'html') -> str:
        """Render ``url`` on an idle Chrome page and return the result.

        Args:
            url: Address passed to ``page.render()``.
            format: Output format passed to ``page.render()``.

        Returns:
            Whatever ``page.render(url, format)`` produces.

        Raises:
            RuntimeError: If ``self._pages`` is empty, or for runtime errors
                other than the uvloop "unable to perform operation" case.
            TemporaryBrowserFailure: If no page becomes idle within 10
                seconds, attaching times out, the handshake is invalid, the
                connection is closed, or the uvloop error above occurs.
            asyncio.TimeoutError: If rendering exceeds ``PRERENDER_TIMEOUT``.
        """
        if not self._pages:
            raise RuntimeError('No browser available')

        try:
            page = await asyncio.wait_for(self._idle_pages.get(), timeout=10)
        except asyncio.TimeoutError:
            raise TemporaryBrowserFailure('No Chrome page available in 10s')

        # Set whenever the failure suggests the page itself is unusable;
        # handed to _manage_page() in the ``finally`` clause.
        reopen = False
        try:
            try:
                await page.attach()
            except asyncio.TimeoutError:
                logger.error('Attach to Chrome page %s timed out, page is likely closed', page.id)
                reopen = True
                raise TemporaryBrowserFailure('Attach to Chrome page timed out')
            data = await asyncio.wait_for(page.render(url, format), timeout=PRERENDER_TIMEOUT)
            return data
        except InvalidHandshake:
            logger.error('Chrome invalid handshake for page %s', page.id)
            reopen = True
            raise TemporaryBrowserFailure('Invalid handshake')
        except ConnectionClosed:
            logger.error('Chrome remote connection closed for page %s', page.id)
            reopen = True
            raise TemporaryBrowserFailure('Chrome remote debugging connection closed')
        except RuntimeError as e:
            # https://github.com/MagicStack/uvloop/issues/68
            if 'unable to perform operation' in str(e):
                logger.error('RuntimeError: %s', str(e))
                reopen = True
                raise TemporaryBrowserFailure(str(e))
            else:
                raise
        finally:
            # Shielded so page bookkeeping completes even if this request
            # is cancelled while cleaning up.
            await asyncio.shield(self._manage_page(page, reopen))
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_shield_result(self):
        """Check that the inner future's result is delivered through shield()."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        inner.set_result(42)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_shield_exception(self):
        """Check that an exception set on the inner future shows up on the outer one."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        exc = RuntimeError('expected')
        inner.set_exception(exc)
        test_utils.run_briefly(self.loop)
        # The very same exception object must be propagated.
        self.assertIs(outer.exception(), exc)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_shield_cancel(self):
        """Check that cancelling the inner future also cancels the outer one."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        inner.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_shield_shortcut(self):
        """Check that shield() of an already-finished future yields its result."""
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(42)
        res = self.loop.run_until_complete(asyncio.shield(fut))
        self.assertEqual(res, 42)
项目:aiotask-context    作者:Skyscanner    | 项目源码 | 文件源码
async def my_coro_2():
    """Store a fresh ``request_id`` in the context, then run four ``my_coro`` calls.

    The calls are started through ``ensure_future``, ``wait_for``, a
    shielded ``wait_for`` and a bare coroutine respectively, and awaited
    together with ``gather``.
    """
    context.set("request_id", str(uuid.uuid4()))
    await asyncio.gather(
        asyncio.ensure_future(my_coro(0)),
        asyncio.wait_for(my_coro(1), 1),
        asyncio.shield(asyncio.wait_for(my_coro(2), 1)),
        my_coro(3))
项目:mockaioredis    作者:kblin    | 项目源码 | 文件源码
async def wait_closed(self):
        """Wait until pool is closed.

        ``self._close_waiter`` is shielded, so cancelling this wait does not
        cancel the close operation itself.
        """
        # The ``loop`` argument of shield() was removed in Python 3.10; the
        # awaitable is scheduled on the running loop instead.
        await asyncio.shield(self._close_waiter)
项目:aiossdb    作者:Microndgt    | 项目源码 | 文件源码
def wait_closed(self):
        """Wait for ``self._close_waiter`` to finish.

        The waiter is wrapped in ``asyncio.shield()`` so that cancelling
        this wait does not cancel the waiter itself.  This is a
        generator-based coroutine (``yield from``); presumably decorated
        with ``@asyncio.coroutine`` upstream -- not visible here.
        """
        yield from asyncio.shield(self._close_waiter, loop=self._loop)
项目:aiossdb    作者:Microndgt    | 项目源码 | 文件源码
def wait_closed(self):
        """Wait for ``self._waiter`` to finish.

        The waiter is wrapped in ``asyncio.shield()`` so that cancelling
        this wait does not cancel the waiter itself.  This is a
        generator-based coroutine (``yield from``); presumably decorated
        with ``@asyncio.coroutine`` upstream -- not visible here.
        """
        yield from asyncio.shield(self._waiter, loop=self._loop)
项目:jd4    作者:vijos    | 项目源码 | 文件源码
async def build(self):
        """Compile ``self.code`` and return the resulting package.

        Progress is reported through ``self.next()``: first the compiling
        status, then the compiler output.

        Returns:
            The package produced by ``pool_build``.

        Raises:
            CompileError: If ``pool_build`` yields a falsy package; carries
                the compiler message.
        """
        self.next(status=STATUS_COMPILING)
        # Shielded so an in-flight build is not cancelled with this task.
        package, message, _, _ = await shield(pool_build(self.lang, self.code))
        self.next(compiler_text=message)
        if not package:
            logger.debug('Compile error: %s', message)
            raise CompileError(message)
        return package
项目:jd4    作者:vijos    | 项目源码 | 文件源码
async def judge(self, cases_file, package):
        """Judge ``package`` against every case and report the totals.

        All cases are scheduled up front as tasks; their results are then
        consumed in case order, with one ``self.next()`` progress report per
        case and a final ``self.end()`` carrying the aggregate.

        Args:
            cases_file: Source handed to ``read_cases``.
            package: Built package handed to ``pool_judge`` for each case.
        """
        loop = get_event_loop()
        self.next(status=STATUS_JUDGING, progress=0)
        cases = list(read_cases(cases_file))
        total_status = STATUS_ACCEPTED
        total_score = 0
        total_time_usage_ns = 0
        total_memory_usage_bytes = 0
        judge_tasks = [loop.create_task(pool_judge(package, case))
                       for case in cases]
        for index, judge_task in enumerate(judge_tasks):
            # Shielded so a running judge task is not cancelled with us.
            status, score, time_usage_ns, memory_usage_bytes, stderr = await shield(judge_task)
            # NOTE(review): meaning of type 1 is not visible here; only that
            # type gets the case's stderr as judge text.
            if self.type == 1:
                judge_text = stderr.decode(encoding='utf-8', errors='replace')
            else:
                judge_text = ''
            self.next(status=STATUS_JUDGING,
                      case={'status': status,
                            'score': score,
                            'time_ms': time_usage_ns // 1000000,
                            'memory_kb': memory_usage_bytes // 1024,
                            'judge_text': judge_text},
                      progress=(index + 1) * 100 // len(cases))
            # Status takes the maximum seen, time is summed, memory is the peak.
            total_status = max(total_status, status)
            total_score += score
            total_time_usage_ns += time_usage_ns
            total_memory_usage_bytes = max(total_memory_usage_bytes, memory_usage_bytes)
        self.end(status=total_status,
                 score=total_score,
                 time_ms=total_time_usage_ns // 1000000,
                 memory_kb=total_memory_usage_bytes // 1024)
项目:guillotina    作者:plone    | 项目源码 | 文件源码
async def commit(self, request=None, txn=None):
        """Run ``self._commit`` shielded from cancellation and return its result.

        Args:
            request: Forwarded to ``_commit``.
            txn: Forwarded to ``_commit``.
        """
        return await shield(self._commit(request=request, txn=txn))
项目:guillotina    作者:plone    | 项目源码 | 文件源码
async def abort(self, request=None, txn=None):
        """Run ``self._abort`` shielded from cancellation and return its result.

        Args:
            request: Forwarded to ``_abort``.
            txn: Forwarded to ``_abort``.
        """
        return await shield(self._abort(request=request, txn=txn))
项目:guillotina    作者:plone    | 项目源码 | 文件源码
async def finalize(self):
        """Shut the storage down.

        Finalizes the vacuum helper, cancels its task, releases the read
        connection back to the pool and closes the pool, in that order.
        """
        await self._vacuum.finalize()
        self._vacuum_task.cancel()
        # Shielded so the release completes even if we are cancelled.
        await shield(self._pool.release(self._read_conn))
        await self._pool.close()
项目:guillotina    作者:plone    | 项目源码 | 文件源码
async def close(self, con):
        """Release ``con`` back to the pool on a best-effort basis.

        Cancellation, an already-missing connection and ``RuntimeError``
        are deliberately ignored.

        Args:
            con: Connection to hand to ``self._pool.release()``.
        """
        try:
            # Shielded so the release itself is not cancelled with us.
            await shield(self._pool.release(con))
        except (asyncio.CancelledError, asyncpg.exceptions.ConnectionDoesNotExistError,
                RuntimeError):
            pass
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def _open_inproc_connection(self, path):
        """Wait for a live in-process server at ``path`` and open a channel on it.

        Args:
            path: Key into ``self._inproc_servers``; also passed to the
                server's ``create_channel()``.

        Returns:
            The result of ``server.create_channel(path)``.
        """
        server_future = self._inproc_servers.get(path)
        server = None

        # Loop until the registered future yields a server that is not closing.
        while not server or server.closing:
            if not server_future or server_future.cancelled():
                # Register a fresh future under ``path`` and wait on it.
                server_future = self._inproc_servers[path] = \
                    asyncio.Future(loop=self.loop)

            # Shielded so cancelling this wait does not cancel the shared
            # future.
            await asyncio.shield(server_future)
            server = server_future.result()
            server_future = None

        return await server.create_channel(path)
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def wait_closed(self):
        """
        Wait for the instance to be closed.

        :returns: The result stored in ``self._closed_future``.
        """
        # Prevents the future from being cancelled in case the wait itself gets
        # cancelled.
        await asyncio.shield(self._closed_future)
        return self._closed_future.result()
项目:BAG_framework    作者:ucb-art    | 项目源码 | 文件源码
async def _kill_subprocess(self, proc):
        # type: (Optional[Process]) -> None
        """Helper method; send SIGTERM/SIGKILL to a subprocess.

        This method first sends SIGTERM to the subprocess.  If the process hasn't terminated
        after a given timeout, it sends SIGKILL.

        Parameter
        ---------
        proc : Optional[Process]
            the process to attempt to terminate.  If None, this method does nothing.
        """
        # Nothing to do without a process, or for one that already exited.
        if proc is None or proc.returncode is not None:
            return

        try:
            proc.terminate()
            try:
                await asyncio.shield(asyncio.wait_for(proc.wait(), self._cancel_timeout))
            except (CancelledError, asyncio.TimeoutError):
                # A timeout must fall through to the SIGKILL step below;
                # letting it propagate would skip the escalation entirely.
                pass

            if proc.returncode is None:
                proc.kill()
                try:
                    await asyncio.shield(asyncio.wait_for(proc.wait(), self._cancel_timeout))
                except (CancelledError, asyncio.TimeoutError):
                    pass
        except ProcessLookupError:
            # The process is already gone.
            pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_shield_result(self):
        """Check that the inner future's result is delivered through shield()."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        inner.set_result(42)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_shield_exception(self):
        """Check that an exception set on the inner future shows up on the outer one."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        exc = RuntimeError('expected')
        inner.set_exception(exc)
        test_utils.run_briefly(self.loop)
        # The very same exception object must be propagated.
        self.assertIs(outer.exception(), exc)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_shield_cancel(self):
        """Check that cancelling the inner future also cancels the outer one."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        inner.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_shield_shortcut(self):
        """Check that shield() of an already-finished future yields its result."""
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(42)
        res = self.loop.run_until_complete(asyncio.shield(fut))
        self.assertEqual(res, 42)
项目:asyncio-coroutine-patterns    作者:yeraydiazdiaz    | 项目源码 | 文件源码
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Retrieve top stories in HN and print each one's comment count.

    Args:
        loop: Event loop forwarded to ``post_number_of_comments``.
        session: Session object forwarded to the fetcher calls.
        limit: Maximum number of top stories to process.
        iteration: Label included in each printed line.

    Returns:
        The fetcher's ``fetch_counter`` after processing, or ``None`` when
        the top-stories request itself fails.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    try:
        response = await fetcher.fetch(session, TOP_STORIES_URL)
    except BoomException as e:
        log.error("Error retrieving top stories: {}".format(e))
        # return instead of re-raising as it will go unnoticed
        return
    except Exception as e:  # catch generic exceptions
        log.error("Unexpected exception: {}".format(e))
        return

    # Map each scheduled task back to the post id it is working on.
    tasks = {
        asyncio.ensure_future(
            post_number_of_comments(loop, session, fetcher, post_id)
        ): post_id for post_id in response[:limit]}

    # return on first exception to cancel any pending tasks
    done, pending = await asyncio.shield(asyncio.wait(
        tasks.keys(), return_when=FIRST_EXCEPTION))

    # if there are pending tasks is because there was an exception
    # cancel any pending tasks
    for pending_task in pending:
        pending_task.cancel()

    # process the done tasks
    for done_task in done:
        # if an exception is raised one of the Tasks will raise
        try:
            print("Post {} has {} comments ({})".format(
                tasks[done_task], done_task.result(), iteration))
        except BoomException as e:
            print("Error retrieving comments for top stories: {}".format(e))

    return fetcher.fetch_counter
项目:aiothrift    作者:ryanwang520    | 项目源码 | 文件源码
def wait_closed(self):
        """Wait for every task in ``self._release_tasks`` to finish, one after another.

        Each task is wrapped in ``asyncio.shield()`` so that cancelling this
        wait does not cancel the task itself.  This is a generator-based
        coroutine (``yield from``); presumably decorated with
        ``@asyncio.coroutine`` upstream -- not visible here.
        """
        for task in self._release_tasks:
            yield from asyncio.shield(task, loop=self._loop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_shield_result(self):
        """Check that the inner future's result is delivered through shield()."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        inner.set_result(42)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_shield_exception(self):
        """Check that an exception set on the inner future shows up on the outer one."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        exc = RuntimeError('expected')
        inner.set_exception(exc)
        test_utils.run_briefly(self.loop)
        # The very same exception object must be propagated.
        self.assertIs(outer.exception(), exc)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_shield_cancel(self):
        """Check that cancelling the inner future also cancels the outer one."""
        inner = asyncio.Future(loop=self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        inner.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_shield_shortcut(self):
        """Check that shield() of an already-finished future yields its result."""
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(42)
        res = self.loop.run_until_complete(asyncio.shield(fut))
        self.assertEqual(res, 42)
项目:waspy    作者:wasp    | 项目源码 | 文件源码
async def handle_incoming_request(self, reader, writer):
        """Serve HTTP requests arriving on one connection until it should close.

        Each cycle reads a request, passes it to ``self._handler`` and sends
        the response.  The loop ends when the connection must close or the
        server is shutting down.  Connection errors, cancellation and
        timeouts end the connection silently; the writer is always closed.

        Args:
            reader: Stream reader for the connection.
            writer: Stream writer for the connection.
        """
        if self._shutting_down:
            self._count += 1
        self._set_tcp_nodelay(writer)
        protocol = _HTTPServerProtocol(reader=reader, writer=writer,
                                       prefix=self.prefix)
        try:
            while True:
                inner = asyncio.Task(protocol.get_request())
                # Shield the read so cancelling an idle connection does not
                # abort a request that is already arriving.
                coro = asyncio.shield(inner)
                self._sleeping_connections.add(coro)
                try:
                    request = await coro
                except asyncio.CancelledError:
                    # Give the request a chance to send a request if it has one
                    if inner.done():
                        request = inner.result()
                    else:
                        request = await asyncio.wait_for(inner, 1)
                self._sleeping_connections.remove(coro)
                response = await self._handler(request)
                await protocol.send_response(response)
                if protocol.conn.our_state is h11.MUST_CLOSE:
                    break
                if self._shutting_down:
                    break
                protocol.conn.start_next_cycle()
        except (ClosedError, ConnectionResetError,
                asyncio.CancelledError, asyncio.TimeoutError):
            pass
        finally:
            writer.close()
项目:jussi    作者:steemit    | 项目源码 | 文件源码
async def terminate_connection(self, conn):
        """Force-close ``conn``, record it as terminated and release it.

        Any exception is logged rather than raised; in that case ``conn``
        is removed from ``self._terminated`` again.

        Args:
            conn: Connection object to terminate.
        """
        try:
            logger.debug(f'terminating connection:{id(conn)}')
            # Shielded so the forced close is not cancelled with us.
            await asyncio.shield(conn.close_connection(force=True))
            conn.worker_task.cancel()
            conn.close()
            self._terminated.add(conn)
            self.release(conn)
        except Exception as e:
            logger.exception(e)
            # NOTE(review): if the failure happened before ``add`` above,
            # this ``remove`` may itself raise -- confirm the container type.
            self._terminated.remove(conn)
项目:aiogrpc    作者:hubo1016    | 项目源码 | 文件源码
async def __anext__(self):
        """Return the next item, fetched by ``self._next`` in the stream executor.

        A pending fetch is kept in ``self._next_future`` and reused by the
        next call, so a cancelled wait does not lose the item in flight.

        Raises:
            StopAsyncIteration: If no fetch is pending and ``self._iterator``
                is ``None``.
        """
        if self._next_future is None:
            if self._iterator is None:
                raise StopAsyncIteration
            self._next_future = self._loop.run_in_executor(self._stream_executor, self._next)
        try:
            # shield() lost its ``loop`` argument in Python 3.10; the future
            # created above is already bound to ``self._loop``.
            return await asyncio.shield(self._next_future)
        finally:
            # Forget the future only once it has finished; an unfinished one
            # is picked up again by the next call.
            if self._next_future and self._next_future.done():
                self._next_future = None
项目:rauc-hawkbit    作者:rauc    | 项目源码 | 文件源码
async def process_deployment(self, base):
        """
        Check for deployments, download them, verify checksum and trigger
        RAUC install operation.

        Does nothing (beyond logging) when ``self.action_id`` is already
        set, i.e. a deployment is in progress.

        :param base: Response dict whose
            ``['_links']['deploymentBase']['href']`` holds the deployment URL.
        :raises APIError: If the deployment has no chunks or artifacts, or
            if the install raises ``GLib.Error``.  Negative feedback is sent
            to HawkBit first.
        """
        if self.action_id is not None:
            self.logger.info('Deployment is already in progress')
            return

        async def _send_failure(messages):
            # send negative feedback to HawkBit
            await self.ddi.deploymentBase[action_id].feedback(
                    DeploymentStatusExecution.closed,
                    DeploymentStatusResult.failure, messages)

        # retrieve action id and resource parameter from URL
        deployment = base['_links']['deploymentBase']['href']
        # Raw string: ``\?`` is an invalid escape in a normal string literal.
        # NOTE(review): a URL that does not match makes ``match`` None and
        # the next line raises AttributeError -- confirm this is intended.
        match = re.search(r'/deploymentBase/(.+)\?c=(.+)$', deployment)
        action_id, resource = match.groups()
        self.logger.info('Deployment found for this target')
        # fetch deployment information
        deploy_info = await self.ddi.deploymentBase[action_id](resource)
        try:
            chunk = deploy_info['deployment']['chunks'][0]
        except IndexError:
            msg = 'Deployment without chunks found. Ignoring'
            await _send_failure([msg])
            raise APIError(msg)

        try:
            artifact = chunk['artifacts'][0]
        except IndexError:
            msg = 'Deployment without artifacts found. Ignoring'
            await _send_failure([msg])
            raise APIError(msg)
        # download artifact, check md5 and report feedback
        download_url = artifact['_links']['download-http']['href']
        md5_hash = artifact['hashes']['md5']
        self.logger.info('Starting bundle download')
        await self.download_artifact(action_id, download_url, md5_hash)

        # download successful, start install
        self.logger.info('Starting installation')
        try:
            self.action_id = action_id
            # do not interrupt install call
            await asyncio.shield(self.install())
        except GLib.Error as e:
            await _send_failure([str(e)])
            raise APIError(str(e))
项目:aioamqp_consumer    作者:wikibusiness    | 项目源码 | 文件源码
async def _worker(self):
        """Consume messages from ``self._queue`` and run a task for each, until cancelled.

        For every message a task coroutine is obtained from
        ``self._run_task`` and run as a shielded task, so cancelling the
        worker does not abort a task in progress.

        On cancellation the worker always exits.  If a task was started it
        is awaited to completion first.  Otherwise, unless ``self._down`` is
        set, a message already taken from the queue is rejected with
        ``requeue=True``.
        """
        while True:
            try:
                finalizer = delivery_tag = None
                payload, envelope, properties = await self._queue.get()
                delivery_tag = envelope.delivery_tag

                try:
                    coro = await self._run_task(
                        payload,
                        properties,
                        delivery_tag,
                    )

                    finalizer = self.loop.create_task(coro)

                    # shield() lost its ``loop`` argument in Python 3.10;
                    # the task is already bound to ``self.loop``.
                    await asyncio.shield(finalizer)
                finally:
                    self._queue.task_done()
            except asyncio.CancelledError:
                msg = 'Worker (queue: %(queue)s) is cancelled'
                context = {'queue': self.queue_name}
                logger.debug(msg, context)

                if finalizer is not None:
                    if not finalizer.done():
                        msg = 'Worker (queue: %(queue)s) doing ' \
                              'finalization while cancellation'
                        context = {'queue': self.queue_name}
                        logger.debug(msg, context)
                        await finalizer

                    break

                if self._down.is_set():
                    break

                if delivery_tag is not None:
                    msg = 'Worker (queue: %(queue)s) doing ' \
                          'basic reject due cancellation'
                    context = {'queue': self.queue_name}
                    # This message was built but never emitted before.
                    logger.debug(msg, context)
                    await self._basic_reject(delivery_tag, requeue=True)

                break
项目:asyncpg    作者:MagicStack    | 项目源码 | 文件源码
async def release(self, connection, *, timeout=None):
        """Release a database connection back to the pool.

        :param Connection connection:
            A :class:`~asyncpg.connection.Connection` object to release.
        :param float timeout:
            A timeout for releasing the connection.  If not specified, defaults
            to the timeout provided in the corresponding call to the
            :meth:`Pool.acquire() <asyncpg.pool.Pool.acquire>` method.

        :raises InterfaceError:
            If *connection* is not a proxy belonging to this pool.

        .. versionchanged:: 0.14.0
            Added the *timeout* parameter.
        """
        async def _release_impl(ch: PoolConnectionHolder, timeout: float):
            try:
                await ch.release(timeout)
            finally:
                # Always hand the holder back to the queue, even when the
                # release itself fails.
                self._queue.put_nowait(ch)

        self._check_init()

        if (type(connection) is not PoolConnectionProxy or
                connection._holder._pool is not self):
            raise exceptions.InterfaceError(
                'Pool.release() received invalid connection: '
                '{connection!r} is not a member of this pool'.format(
                    connection=connection))

        if connection._con is None:
            # Already released, do nothing.
            return

        con = connection._detach()
        con._on_release()

        if timeout is None:
            timeout = connection._holder._timeout

        # Use asyncio.shield() to guarantee that task cancellation
        # does not prevent the connection from being returned to the
        # pool properly.  The ``loop`` argument of shield() was removed in
        # Python 3.10, so the coroutine is scheduled on the running loop.
        return await asyncio.shield(
            _release_impl(connection._holder, timeout))