Python inspect 模块,isawaitable() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.isawaitable()

项目:pcbot    作者:pckv    | 项目源码 | 文件源码
async def eval_(message: discord.Message, python_code: Annotate.Code):
    """ Evaluate a python expression. Can be any python code on one
    line that returns something. Awaitable results will be awaited.

    The expression is evaluated against ``code_globals``, which is first
    refreshed with the message, client, author, server and channel of this
    invocation. The result (or the formatted error) is handed to
    ``send_result`` together with the elapsed time.
    """
    code_globals.update(dict(message=message, client=client,
                             author=message.author, server=message.server, channel=message.channel))

    before = datetime.now()
    try:
        # NOTE(review): eval() executes arbitrary code; presumably access to
        # this command is restricted elsewhere -- confirm.
        result = eval(python_code, code_globals)
        if inspect.isawaitable(result):
            result = await result
    except SyntaxError as e:
        result = utils.format_syntax_error(e)
    except Exception as e:
        result = utils.format_exception(e)

    await send_result(message.channel, result, datetime.now() - before)
项目:django-sanic-adaptor    作者:ashleysommer    | 项目源码 | 文件源码
async def async_legacy_get_response_dj_1_10(self, request):
        """
        Apply process_request() middleware and call the main _get_response(),
        if needed. Used only for legacy MIDDLEWARE_CLASSES.

        Each request middleware may return a plain value or an awaitable
        (which is awaited). The first truthy response stops the loop; if the
        response is still None afterwards, the inner handler is awaited.
        """
        response = None
        # Apply request middleware
        for middleware_method in self._request_middleware:
            response = middleware_method(request)
            if isawaitable(response):
                response = await response
            if response:
                break

        if response is None:
            response = await self._get_response_inner_dj_1_10(request)

        return response

    # This function is protected under the Django BSD 3-Clause licence
    # This function is reproduced under the terms of the Django Licence
    # See DJANGO_LICENCE in this source code repository
项目:youtube    作者:FishyFing    | 项目源码 | 文件源码
async def debug(self, ctx, *, code: str):
        """Evaluates code.

        Surrounding backticks and spaces are stripped from ``code`` before it
        is evaluated. An awaitable result is awaited. The result, or the
        exception type and message on failure, is sent back wrapped in a
        ``py`` code block.
        """
        code = code.strip('` ')
        python = '```py\n{}\n```'

        env = {
            'bot': self.bot,
            'ctx': ctx,
            'message': ctx.message,
            'guild': ctx.guild,
            'channel': ctx.channel,
            'author': ctx.author
        }

        # Module globals are merged last, so they win over the context names
        # above when a key clashes.
        env.update(globals())

        try:
            # NOTE(review): eval() executes arbitrary code; presumably the
            # command is owner-restricted elsewhere -- confirm.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            await ctx.send(python.format(type(e).__name__ + ': ' + str(e)))
            return

        await ctx.send(python.format(result))
项目:lagbot    作者:mikevb1    | 项目源码 | 文件源码
async def debug(self, ctx, *, code: cleanup_code):
        """Evaluates code.

        The code is evaluated in the environment returned by ``get_env(ctx)``,
        with ``__`` bound to ``self.last_eval``. An awaitable result is
        awaited. A ``discord.Embed`` result is sent as an embed; any other
        result, or the exception signature on failure, is sent as text.
        """
        result = None
        env = get_env(ctx)
        env['__'] = self.last_eval

        try:
            # NOTE(review): eval() executes arbitrary code; presumably the
            # command is owner-restricted elsewhere -- confirm.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
            if isinstance(result, discord.Embed):
                await ctx.send(embed=result)
                return
        except Exception:
            say = self.eval_output(exception_signature())
        else:
            say = self.eval_output(rep(result))
        if say is None:
            say = 'None'
        await ctx.send(say)
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
async def can_run(self, cmd) -> Tuple[bool, list]:
        """
        Checks if a command can be ran.

        Every callable in ``cmd.cmd_conditions`` (none if the attribute is
        missing) is called with this object; awaitable results are awaited.
        A condition fails when it returns a falsy value or raises an
        exception other than ``CommandsError``, which is re-raised.

        :return: If it can be ran, and a list of conditions that failed.
        """
        conditions = getattr(cmd, "cmd_conditions", [])
        failed = []
        for condition in conditions:
            try:
                success = condition(self)
                if inspect.isawaitable(success):
                    success = await success
            except CommandsError:
                raise
            except Exception:
                failed.append(condition)
            else:
                if not success:
                    # Record the condition itself, not its falsy result, so
                    # both failure paths report the same kind of object.
                    failed.append(condition)

        if failed:
            return False, failed

        return True, []
项目:apex-sigma    作者:lu-ci    | 项目源码 | 文件源码
async def evaluate(cmd, message, args):
    """Evaluate ``args`` (joined with spaces) as a Python expression.

    Only authors whose id is in ``permitted_id`` may run it; everyone else
    gets an "Insufficient Permissions" embed. With no arguments the command
    help is sent. An awaitable result is awaited. A truthy result is shown
    in a 'Results' field; a failure is logged and reported in an error embed.
    """
    if message.author.id not in permitted_id:
        status = discord.Embed(type='rich', color=0xDB0000,
                               title='? Insufficient Permissions. Bot Owner or Server Admin Only.')
        await message.channel.send(None, embed=status)
        return

    if not args:
        await message.channel.send(cmd.help())
        return

    try:
        execution = " ".join(args)
        # NOTE(review): eval() executes arbitrary code; the permitted_id
        # check above is the only guard visible here.
        output = eval(execution)
        if inspect.isawaitable(output):
            output = await output
        status = discord.Embed(title='? Executed', color=0x66CC66)
        if output:
            try:
                status.add_field(name='Results', value='\n```\n' + str(output) + '\n```')
            except Exception as field_error:
                # Best effort: still report the execution as successful, but
                # log why the result could not be attached.
                cmd.log.error(field_error)
    except Exception as e:
        cmd.log.error(e)
        status = discord.Embed(color=0xDB0000, title='? Error')
        status.add_field(name='Execution Failed', value=str(e))
    await message.channel.send(None, embed=status)
项目:nougat    作者:NougatWeb    | 项目源码 | 文件源码
def is_middleware(func):
    """
    Check that ``func`` has the signature expected of a middleware:
    exactly three parameters named req, res and next, in that order.

    :return: True when the signature matches
    :raises UnknownMiddlewareException: on a wrong parameter count or name
    """
    param_names = [name for name, _ in inspect.signature(func).parameters.items()]

    # An awaitability check on ``func`` is deliberately not performed here.

    if len(param_names) != 3:
        raise UnknownMiddlewareException("middleware {} should has 3 params named req, res and next".format(func.__name__))

    expectations = (
        ('req', "the first param's name of middleware {} should be req"),
        ('res', "the second param's name of middleware {} should be res"),
        ('next', "the third param's name of middleware {} should be next"),
    )
    for actual, (wanted, complaint) in zip(param_names, expectations):
        if actual != wanted:
            raise UnknownMiddlewareException(complaint.format(func.__name__))

    return True
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for a coroutine, an awaitable or an existing future.

    A Future argument is handed back unchanged, provided ``loop`` (when
    given) is the loop the future belongs to. A coroutine is scheduled as a
    task on ``loop`` or on the current event loop. Any other awaitable is
    wrapped and passed through this function again.

    Raises ValueError on a loop mismatch and TypeError for unsupported input.
    """
    if futures.isfuture(coro_or_future):
        loop_mismatch = loop is not None and loop is not coro_or_future._loop
        if loop_mismatch:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        new_task = target_loop.create_task(coro_or_future)
        if new_task._source_traceback:
            # Trim the most recent entry from the recorded creation traceback.
            del new_task._source_traceback[-1]
        return new_task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)

    raise TypeError('A Future, a coroutine or an awaitable is required')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def ensure_future(coro_or_future, *, loop=None):
    """Turn a coroutine or awaitable into a future; pass futures through.

    A ``futures.Future`` instance is returned as-is once ``loop`` (if given)
    is confirmed to be its own loop. A coroutine becomes a task on ``loop``
    or on the current event loop. Any other awaitable is wrapped and fed
    back into this function.

    Raises ValueError on a loop mismatch and TypeError for unsupported input.
    """
    if isinstance(coro_or_future, futures.Future):
        if loop is not None and loop is not coro_or_future._loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        chosen_loop = loop
        if chosen_loop is None:
            chosen_loop = events.get_event_loop()
        scheduled = chosen_loop.create_task(coro_or_future)
        if scheduled._source_traceback:
            # Trim the most recent entry from the recorded creation traceback.
            del scheduled._source_traceback[-1]
        return scheduled

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        wrapped = _wrap_awaitable(coro_or_future)
        return ensure_future(wrapped, loop=loop)

    raise TypeError('A Future, a coroutine or an awaitable is required')
项目:sanic-cors    作者:ashleysommer    | 项目源码 | 文件源码
async def route_wrapper(self, route, req, context, request_args, request_kw,
                            *decorator_args, **decorator_kw):
        """Call ``route`` and set CORS headers on its response.

        The ``with_context`` decorator keyword is discarded; the remaining
        decorator keywords are the per-route CORS options. When
        ``automatic_options`` is enabled and the request method is OPTIONS,
        an empty ``HTTPResponse`` is used instead of calling the route.
        Otherwise the route result is awaited repeatedly until it is no
        longer awaitable. A non-None response gets CORS headers and the
        request context is marked as evaluated.
        """
        _ = decorator_kw.pop('with_context')  # ignore this.
        _options = decorator_kw
        options = get_cors_options(context.app, _options)
        if options.get('automatic_options') and req.method == 'OPTIONS':
            resp = response.HTTPResponse()
        else:
            resp = route(req, *request_args, **request_kw)
            # A handler may hand back nested awaitables; unwrap them all.
            while isawaitable(resp):
                resp = await resp
        if resp is not None:
            request_context = context.request[id(req)]
            set_cors_headers(req, resp, context, options)
            request_context[SANIC_CORS_EVALUATED] = "1"
        return resp
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def isawaitable(obj):
    ''' Tell whether ``obj`` is awaitable or is a coroutine function.

    When ``PY35`` is set the check combines ``inspect.iscoroutinefunction``
    and ``inspect.isawaitable``. When only ``PY34`` is set it accepts asyncio
    futures, coroutine objects and anything with ``__await__``. Otherwise an
    Exception is raised.

    This function is used internally by aiotesting.
    '''
    if PY35:
        return inspect.iscoroutinefunction(obj) or inspect.isawaitable(obj)

    if PY34:
        return (isinstance(obj, asyncio.Future) or
                asyncio.iscoroutine(obj) or
                hasattr(obj, '__await__'))

    unsupported = 'isawaitable is not supported on Python {}'.format(
        sys.version_info)
    raise Exception(unsupported)
项目:aiorethink    作者:lars-tiede    | 项目源码 | 文件源码
def _run_query(query, conn = None):
    """`run()`s query if caller hasn't already done so, then awaits and returns
    its result.

    If run() has already been called, then the query (strictly speaking, the
    awaitable) is just awaited. This gives the caller the opportunity to
    customize the run() call.

    If run() has not been called, then the query is run on the given connection
    (or the default connection). This is more convenient for the caller than
    the other version.
    """
    # run() it if caller didn't do that already
    if not inspect.isawaitable(query):
        if not isinstance(query, r.RqlQuery):
            raise TypeError("query is neither awaitable nor a RqlQuery")
        cn = conn or await db_conn
        query = query.run(cn)

    return await query
项目:asyncpg    作者:MagicStack    | 项目源码 | 文件源码
async def test_cursor_iterable_02(self):
        """Cursors cannot be created or iterated outside of a transaction."""
        # Test that it's not possible to create a cursor without hold
        # outside of a transaction
        s = await self.con.prepare(
            'DECLARE t BINARY CURSOR WITHOUT HOLD FOR SELECT 1')
        with self.assertRaises(asyncpg.NoActiveSQLTransactionError):
            await s.fetch()

        # Now test that statement.cursor() does not let you
        # iterate over it outside of a transaction
        st = await self.con.prepare('SELECT generate_series(0, 20)')

        it = st.cursor(prefetch=5).__aiter__()
        # __aiter__() may hand back an awaitable; resolve it to the iterator.
        if inspect.isawaitable(it):
            it = await it

        with self.assertRaisesRegex(asyncpg.NoActiveSQLTransactionError,
                                    'cursor cannot be created.*transaction'):
            await it.__anext__()
项目:Turbo    作者:jaydenkieran    | 项目源码 | 文件源码
async def c_eval(self, message, server, channel, author, stmt, args):
        """
        Evaluates Python code

        {prefix}eval <code>

        If the result is a coroutine, it will be awaited
        """
        stmt = ' '.join([stmt, *args])
        try:
            # NOTE(review): eval() executes arbitrary code; presumably the
            # command is permission-gated elsewhere -- confirm.
            result = eval(stmt)
            if inspect.isawaitable(result):
                result = await result
        except Exception:
            # Report only the last traceback line ("ErrorType: message").
            exc = traceback.format_exc().splitlines()
            result = exc[-1]
        log.debug("Evaluated: {} - Result was: {}".format(stmt, result))
        return Response("```xl\n--- In ---\n{}\n--- Out ---\n{}\n```".format(stmt, result))
项目:CEX.IO-Client-Python3.5    作者:cexioltd    | 项目源码 | 文件源码
def is_awaitable(obj):
        """Decide whether ``obj`` is treated as something to wait on.

        No single inspect helper covers every case, so several are combined:
        awaitables, coroutine functions, coroutine objects and generator
        functions all count. For a user-defined class (as judged by
        ``CallChain.is_user_defined_class``) the answer is that of its
        ``__call__``. Everything else does not count.
        """
        direct_checks = (
            inspect.isawaitable,
            inspect.iscoroutinefunction,
            inspect.iscoroutine,
            inspect.isgeneratorfunction,
        )
        if any(check(obj) for check in direct_checks):
            return True

        if not CallChain.is_user_defined_class(obj):
            return False

        if hasattr(obj, '__call__'):
            return CallChain.is_awaitable(obj.__call__)
        return False
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
async def eval(self, ctx, *, code : str):
        """Evaluate ``code`` and reply with the result in a code block.

        Backticks are stripped from ``code`` first. An awaitable result is
        awaited. Any exception, including one raised while replying, is
        reported as "ErrorType: message".
        """
        code = code.strip('`')
        try:
            # NOTE(review): eval() executes arbitrary code; presumably the
            # command is owner-restricted elsewhere -- confirm.
            result = eval(code)
            if inspect.isawaitable(result):
                result = await result
            await self.bot.reply(clients.py_code_block.format(result))
        except Exception as e:
            await self.bot.reply(clients.py_code_block.format("{}: {}".format(type(e).__name__, e)))
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
async def repl(self, ctx):
        """Run an interactive read-eval-print loop in the invoking channel.

        Messages from the invoking author that start with a backtick are
        treated as code. Single-line input is first compiled as an
        expression; if that fails, or the input spans several lines, it is
        compiled and run as statements. Expression results are replied and
        stored in the ``last`` variable. ``exit``/``quit`` ends the session.
        """
        variables = {"self" : self, "ctx" : ctx, "last" : None}
        await self.bot.embed_reply("Enter code to execute or evaluate\n`exit` or `quit` to exit")
        while True:
            message = await self.bot.wait_for_message(author = ctx.message.author, channel = ctx.message.channel, check = lambda m: m.content.startswith('`'))
            if message.content.startswith("```py") and message.content.endswith("```"):
                code = message.content[5:-3].strip(" \n")
            else:
                code = message.content.strip("` \n")
            if code in ("quit", "exit", "quit()", "exit()"):
                await self.bot.embed_reply('Exiting repl')
                return
            function = exec
            if '\n' not in code:
                try:
                    code = compile(code, "<repl>", "eval")
                except SyntaxError:
                    # Not an expression; fall back to exec mode below.
                    pass
                else:
                    function = eval
            if function is exec:
                try:
                    code = compile(code, "<repl>", "exec")
                except SyntaxError as e:
                    await self.bot.reply(clients.py_code_block.format("{0.text}{1:>{0.offset}}\n{2}: {0}".format(e, '^', type(e).__name__)))
                    continue
            try:
                # NOTE(review): runs arbitrary user code; presumably the
                # command is owner-restricted elsewhere -- confirm.
                result = function(code, variables)
                if inspect.isawaitable(result):
                    result = await result
            except (Exception, SystemExit):
                # SystemExit is reported rather than allowed to stop the
                # process; cancellation and KeyboardInterrupt still propagate.
                await self.bot.reply(clients.py_code_block.format("\n".join(traceback.format_exc().splitlines()[-2:]).strip()))
            else:
                if function is eval:
                    try:
                        await self.bot.reply(clients.py_code_block.format(result))
                    except Exception as e:
                        await self.bot.reply(clients.py_code_block.format("{}: {}".format(type(e).__name__, e)))
                variables["last"] = result
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def isawaitable(obj):
        """Return True when ``obj`` is an instance of ``Awaitable``."""
        matches_abc = isinstance(obj, Awaitable)
        return matches_abc


###
#  allow patching the stdlib
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def patch(patch_inspect=True):
    """
    Install this module's ``Generator``, ``Coroutine`` and ``Awaitable``
    into ``collections.abc`` and record each replacement in ``PATCHED``.

    Unless ``patch_inspect`` is false, ``inspect.isawaitable`` is also
    replaced with this module's ``isawaitable`` and recorded.
    """
    PATCHED['collections.abc.Generator'] = Generator
    _collections_abc.Generator = Generator

    PATCHED['collections.abc.Coroutine'] = Coroutine
    _collections_abc.Coroutine = Coroutine

    PATCHED['collections.abc.Awaitable'] = Awaitable
    _collections_abc.Awaitable = Awaitable

    if not patch_inspect:
        return

    import inspect
    PATCHED['inspect.isawaitable'] = isawaitable
    inspect.isawaitable = isawaitable
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def isawaitable(obj):
        """Report whether ``obj`` passes an ``isinstance`` check against ``Awaitable``."""
        is_match = isinstance(obj, Awaitable)
        return is_match


###
#  allow patching the stdlib
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def patch(patch_inspect=True):
    """
    Patch the ``collections.abc`` module (and, unless ``patch_inspect`` is
    false, ``inspect.isawaitable``) with this module's implementations.

    Every replacement is also recorded in ``PATCHED`` under its dotted name.
    """
    def _install(abc_name, replacement):
        # Record the replacement first, then set it on the module.
        PATCHED['collections.abc.' + abc_name] = replacement
        setattr(_collections_abc, abc_name, replacement)

    _install('Generator', Generator)
    _install('Coroutine', Coroutine)
    _install('Awaitable', Awaitable)

    if patch_inspect:
        import inspect
        PATCHED['inspect.isawaitable'] = isawaitable
        inspect.isawaitable = isawaitable
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def isawaitable(obj):
        """Check ``obj`` against the ``Awaitable`` abstract base class."""
        verdict = isinstance(obj, Awaitable)
        return verdict


###
#  allow patching the stdlib
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def _handle_response(self, obj: Dict) -> None:
        req_id = obj.get('id')
        if req_id is not None:
            future = self._futures.get(req_id)
            if future and not future.cancelled():
                future.set_result(obj)
        method = obj.get('method')
        if method is not None:
            callback = self._callbacks.get(method)
            if callback is not None:
                ret = callback(obj)
                if inspect.isawaitable(ret):
                    await ret
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:Inkxbot    作者:InkxtheSquid    | 项目源码 | 文件源码
async def debug(self, ctx, *, code : str):
        """Evaluates code.

        Surrounding backticks and spaces are stripped from ``code`` before it
        is evaluated. An awaitable result is awaited. The result, or the
        exception type and message on failure, is said back wrapped in a
        ``py`` code block.
        """
        code = code.strip('` ')
        python = '```py\n{}\n```'
        result = None

        env = {
            'bot': self.bot,
            'ctx': ctx,
            'message': ctx.message,
            'server': ctx.message.server,
            'channel': ctx.message.channel,
            'author': ctx.message.author
        }

        # Module globals are merged last, so they win over the context names
        # above when a key clashes.
        env.update(globals())

        try:
            # NOTE(review): eval() executes arbitrary code; presumably the
            # command is owner-restricted elsewhere -- confirm.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            await self.bot.say(python.format(type(e).__name__ + ': ' + str(e)))
            return

        await self.bot.say(python.format(result))
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:monk    作者:IcyCC    | 项目源码 | 文件源码
async def handle_request(self, request, write_response):
        """Serve one request and pass the response to ``write_response``.

        Static requests (as decided by ``self.is_static``) are answered with
        the file contents and a content type looked up from the file
        extension. Other requests go through the ``after_request`` hooks,
        the routed handler and the ``before_response`` hooks; each of those
        may return a plain value or an awaitable. A request with no matching
        handler gets the 404 response.
        """
        if self.is_static(request) is True:
            url_path = str(request.url.path, encoding="utf-8")
            # NOTE(review): the URL path is appended to static_path without
            # checking that the result stays inside it ("../" traversal).
            path = os.path.abspath(self.config.get('static_path') + url_path)
            log.info("Static file read path {} ".format(path))
            context = await read_file(path)
            file_type = url_path.split('.')[-1]
            resp = Response(body=context, version='1.1',
                            content_type=TYPE_MAP.get(file_type, "text/plain"))
            write_response(resp)
            # The static file is the complete answer; without this return the
            # request would also be routed and a second response written.
            return

        for hooker in self.hook.after_request:
            request = hooker(request)
            if isawaitable(request):
                request = await request

        handle = self.router.get(request=request)
        if handle is None:
            write_response(self.abort_404("Not found method"))
            return

        response = handle(request)

        if isawaitable(response):
            response = await response

        for hooker in self.hook.before_response:
            response = hooker(response)
            if isawaitable(response):
                response = await response

        write_response(response)
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
def __anext__(self):
        try:
            result = self.__next__()
            if inspect.isawaitable(result):
                result = await result
                self.qrw._result_cache[-1] = result
            return result
        except StopAsyncIteration:
            self.qrw._result_cache.pop()
            self.qrw._ct -= 1
            self._idx -= 1
            raise
        except StopIteration:
            raise StopAsyncIteration
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
def python_value(self, value):
        """Convert ``value``, deferring to the async converter for awaitables.

        An awaitable is handed to ``async_python_value``; anything else is
        converted by the parent class.
        """
        if not inspect.isawaitable(value):
            return super().python_value(value)
        return self.async_python_value(value)
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
def _map(callback, *iterables, loop=None):
    if any(map(inspect.isawaitable, iterables)):
        return map_async(callback, *iterables, loop=loop)
    return map(callback, *iterables)
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
def _zip(*iterables, loop=None):
    """Zip ``iterables`` after passing them through ``ensure_iterables``.

    Asserts that the iterables are not all awaitable.
    """
    every_one_awaitable = all(inspect.isawaitable(candidate) for candidate in iterables)
    assert not every_one_awaitable
    prepared = ensure_iterables(*iterables, loop=loop)
    return zip(*prepared)
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
async def execute(self):
        """Run the parent ``execute`` with async-aware helpers patched in.

        For the duration of the call, ``self._execute`` is wrapped in a
        ``_QueryExecutor`` and ``peewee.map`` / ``peewee.zip`` are replaced
        by ``_map`` / ``_zip`` bound to the database loop. An awaitable
        result is awaited; a list result is gathered; anything else is
        returned unchanged.
        """
        loop = self.database.loop
        with contextlib.ExitStack() as exit_stack:
            exit_stack.enter_context(patch(self, '_execute', _QueryExecutor(self._execute, loop=loop)))
            exit_stack.enter_context(patch(peewee, 'map', functools.partial(_map, loop=loop), map))
            exit_stack.enter_context(patch(peewee, 'zip', functools.partial(_zip, loop=loop), zip))
            result = super().execute()
            if inspect.isawaitable(result):
                return await result
            elif isinstance(result, list):
                # NOTE(review): the ``loop`` argument of asyncio.gather was
                # removed in Python 3.10; revisit when targeting newer versions.
                return await asyncio.gather(*result, loop=loop)
            return result
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
def ensure_iterables(*iterables, loop=None):
    """Yield each argument, replacing awaitables with future iterators.

    An awaitable is scheduled with ``asyncio.ensure_future`` and wrapped by
    ``future_iterator``; any other argument is yielded unchanged.
    """
    for candidate in iterables:
        if not inspect.isawaitable(candidate):
            yield candidate
            continue
        pending = asyncio.ensure_future(candidate, loop=loop)
        yield future_iterator(pending)
项目:ormageddon    作者:renskiy    | 项目源码 | 文件源码
def force_future(entity, loop=None):
    """Return ``entity`` if awaitable, else a Future already resolved to it."""
    if not inspect.isawaitable(entity):
        resolved = asyncio.Future(loop=loop)
        resolved.set_result(entity)
        return resolved
    return entity
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:SESTREN    作者:SirThane    | 项目源码 | 文件源码
async def _eval(self, ctx, *, code: str):
        """Run eval() on an input.

        Backticks and spaces are stripped from ``code``, which is then
        evaluated in ``self.env(ctx)``. An awaitable result is awaited. The
        reply is an embed showing the code plus either the result (cut to
        1014 characters) in green or the exception in red.
        """

        code = code.strip('` ')
        python = '```py\n{0}\n```'
        env = self.env(ctx)

        try:
            # NOTE(review): eval() executes arbitrary code; presumably the
            # command is owner-restricted elsewhere -- confirm.
            result = eval(code, env)
            if inspect.isawaitable(result):
                result = await result
            # NOTE(review): 1014 presumably keeps the wrapped value within
            # an embed field size limit -- confirm.
            result = str(result)[:1014]
            color = 0x00FF00
            field = {
                'inline': False,
                'name': 'Yielded result:',
                'value': python.format(result)
            }
        except Exception as e:
            color = 0xFF0000
            field = {
                'inline': False,
                'name': 'Yielded exception "{0.__name__}":'.format(type(e)),
                'value': '{0} '.format(e)
            }

        embed = discord.Embed(title="Eval on:", description=python.format(code), color=color)
        embed.add_field(**field)

        await ctx.send(embed=embed)
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:flask_system    作者:prashasy    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:pcbot    作者:pckv    | 项目源码 | 文件源码
async def call_reload(name: str):
    """ Initiates reload of plugin.

    If the plugin defines a callable ``on_reload``, it is called with the
    plugin name and its result awaited when awaitable. If the plugin has no
    ``on_reload`` attribute at all, the default ``on_reload`` is awaited.
    A non-callable ``on_reload`` attribute results in no action.
    """
    # See if the plugin has an on_reload() function, and call that
    if hasattr(plugins[name], "on_reload"):
        if callable(plugins[name].on_reload):
            result = plugins[name].on_reload(name)
            if inspect.isawaitable(result):
                await result
    else:
        await on_reload(name)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:goldmine    作者:Armored-Dragon    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        print('called fake')
        res = self.attr(*args, **kwargs)
        print('calling regular' + str(args))
        print(self.attr)
        if inspect.isawaitable(res):
            self.iface.loop.create_task(res)
项目:syncer    作者:miyakogi    | 项目源码 | 文件源码
def _is_awaitable(co: Generator[Any, None, Any]) -> bool:
    """Tell whether ``co`` can be awaited.

    When ``PY35`` is set this defers to ``inspect.isawaitable``; otherwise
    generator objects and asyncio futures are accepted.
    """
    if not PY35:
        return isinstance(co, (types.GeneratorType, asyncio.Future))
    return inspect.isawaitable(co)
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
async def handle_commands(self, ctx: EventContext, message: Message):
        """
        Handles commands for a message.

        The message is dropped early when a ``bot_type`` flag rules it out.
        Otherwise ``message_check`` (sync or async) decides whether the
        message is a command; a ``None`` match ends handling. A match is
        unpacked into the command word and tokens, a command ``Context`` is
        built from them and invoked.
        """
        # check bot type
        # NOTE(review): the bit values 8/64/32/16 are bot_type flags whose
        # meaning is defined outside this block; the comments below describe
        # only what each test does.
        # flag 8: ignore messages written by bot users
        if message.author.user.bot and self.client.bot_type & 8:
            return

        # flag 64: ignore messages not written by this client's own user
        if message.author.user != self.client.user and self.client.bot_type & 64:
            return

        # flag 32: ignore messages that have a guild id
        if message.guild_id is not None and self.client.bot_type & 32:
            return

        # flag 16: ignore messages that have no guild id
        if message.guild_id is None and self.client.bot_type & 16:
            return

        # step 1, match the messages
        matched = self.message_check(self.client, message)
        if inspect.isawaitable(matched):
            matched = await matched

        if matched is None:
            return None

        # deconstruct the tuple returned into more useful variables than a single tuple
        command_word, tokens = matched

        # step 2, create the new commands context
        ctx = Context(event_context=ctx, message=message)
        ctx.command_name = command_word
        ctx.tokens = tokens
        ctx.manager = self

        # step 3, invoke the context to try and match the command and run it
        await ctx.try_invoke()
项目:metapensiero.reactive    作者:azazel75    | 项目源码 | 文件源码
def _exec_possible_awaitable(self, func, *args, **kwargs):
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
async def auto_await(value):
    """Return ``value``, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def maybe_awaitable(func, *args, **kwargs):
    """Call ``func`` with the given arguments and return its final value.

    An awaitable return value is awaited before being returned.
    """
    maybe = func(*args, **kwargs)
    return await maybe if inspect.isawaitable(maybe) else maybe