Python inspect 模块,iscoroutinefunction() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.iscoroutinefunction()

项目:uvio    作者:srossross    | 项目源码 | 文件源码
def next_tick(self, callback, *args, **kwargs):
        """
        Once the current event loop turn runs to completion,
        call the callback or coroutine function::

            loop.next_tick(callback)


        """
        if inspect.iscoroutinefunction(callback):
            raise Exception("Did you mean ot create a coroutine (got: {})".format(callback))

        if not inspect.iscoroutine(callback):
            callback = partial(callback, *args, **kwargs)

        self.ready[callback] = None
项目:calm    作者:bagrat    | 项目源码 | 文件源码
def _handle_request(self, handler_def, **kwargs):
        """A generic HTTP method handler."""
        if not handler_def:
            raise MethodNotAllowedError()

        handler = handler_def.handler
        kwargs.update(self._get_query_args(handler_def))
        self._cast_args(handler, kwargs)
        self._parse_and_update_body(handler_def)
        if inspect.iscoroutinefunction(handler):
            resp = await handler(self.request, **kwargs)
        else:
            self.log.warning("'%s' is not a coroutine!", handler_def.handler)
            resp = handler(self.request, **kwargs)

        if resp:
            self._write_response(resp, handler_def)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def create(cls, obj, body, evaldict, defaults=None,
               doc=None, module=None, addsource=True, **attrs):
        """
        Create a function from the strings name, signature and body.
        evaldict is the evaluation dictionary. If addsource is true an
        attribute __source__ is added to the result. The attributes attrs
        are added, if any.
        """
        if isinstance(obj, str):  # "name(signature)"
            name, rest = obj.strip().split('(', 1)
            signature = rest[:-1]  # strip a right parens
            func = None
        else:  # a function
            name = None
            signature = None
            func = obj
        self = cls(func, name, signature, defaults, doc, module)
        ibody = '\n'.join('    ' + line for line in body.splitlines())
        caller = evaldict.get('_call_')  # when called from `decorate`
        if caller and iscoroutinefunction(caller):
            body = ('async def %(name)s(%(signature)s):\n' + ibody).replace(
                'return', 'return await')
        else:
            body = 'def %(name)s(%(signature)s):\n' + ibody
        return self.make(body, evaldict, addsource, **attrs)
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
def add_event(self, func, name: str = None):
        """
        Add an event to the internal registry of events.

        :param name: The event name to register under.
        :param func: The function to add.
        """
        if not inspect.iscoroutinefunction(func):
            raise TypeError("Event must be a coroutine function")

        if name is None:
            evs = func.events
        else:
            evs = [name]

        for ev_name in evs:
            logger.debug("Registered event `{}` handling `{}`".format(func, ev_name))
            self.event_listeners.add(ev_name, func)
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def isawaitable(obj):
    ''' Return True if the object is an awaitable or is a function that
    returns an awaitable.

    This function is used internally by aiotesting.
    '''
    if PY35:
        result = inspect.iscoroutinefunction(obj) or inspect.isawaitable(obj)

    elif PY34:
        result = (isinstance(obj, asyncio.Future) or
                  asyncio.iscoroutine(obj) or
                  hasattr(obj, '__await__'))
    else:
        raise Exception(
            'isawaitable is not supported on Python {}'.format(
                sys.version_info))
    return result
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def _tearDown(self):
        ''' Destroy the event loop '''
        if asyncio.iscoroutinefunction(self.tearDown):
            self.loop.run_until_complete(self.tearDown())
        else:
            self.tearDown()

        if not isinstance(self.loop, asyncio.AbstractEventLoop):
            raise Exception('Invalid event loop: ', self.loop)
        if self.loop.is_running():
            self.loop.stop()
        self.loop.close()
        del self.loop
        asyncio.set_event_loop_policy(None)
        asyncio.set_event_loop(None)

        # By explicitly forcing a garbage collection here,
        # the event loop will report any remaining sockets
        # and coroutines left in the event loop which indicates
        # that further cleanup actions should be implemented
        # in the code under test.
        gc.collect()
项目:asyncpg    作者:MagicStack    | 项目源码 | 文件源码
def _iter_methods(bases, ns):
        for base in bases:
            for methname in dir(base):
                if not methname.startswith('test_'):
                    continue

                meth = getattr(base, methname)
                if not inspect.iscoroutinefunction(meth):
                    continue

                yield methname, meth

        for methname, meth in ns.items():
            if not methname.startswith('test_'):
                continue

            if not inspect.iscoroutinefunction(meth):
                continue

            yield methname, meth
项目:nettirely    作者:8Banana    | 项目源码 | 文件源码
def on_regexp(self, regexp):
        """
        Creates a decorator that registers a regexp command handler.

        The regexp command handler takes as arguments:
            1. The bot instance
            2. The command sender
            3. The command recipient, usually a channel
            4. The match object, for any groups you might wanna extract.

        The regexp is searched, not just matched.
        Your handler might get called multiple times per message,
        depending on the amount of matches.
        """

        regexp = re.compile(regexp)

        def _inner(func):
            if not inspect.iscoroutinefunction(func):
                raise ValueError("You can only register coroutines!")
            self._regexp_callbacks.setdefault(regexp, [])\
                                  .append(func)
            return func
        return _inner
项目:plumeria    作者:sk89q    | 项目源码 | 文件源码
def __getattr__(self, item):
        attr = getattr(self.delegate, item)

        if inspect.iscoroutinefunction(attr) or hasattr(attr,
                                                        "_is_coroutine") and attr._is_coroutine or inspect.iscoroutine(
            attr):
            async def wrapper(*args, **kwargs):
                return self._wrap(await attr(*args, **kwargs))

            return wrapper() if inspect.iscoroutine(attr) else wrapper
        elif inspect.isgeneratorfunction(attr) or inspect.isgenerator(attr):
            def wrapper(*args, **kwargs):
                for entry in attr(*args, **kwargs):
                    yield self._wrap(entry)

            return wrapper if inspect.isgeneratorfunction(attr) else wrapper()
        elif inspect.isfunction(attr):
            def wrapper(*args, **kwargs):
                return self._wrap(attr(*args, **kwargs))

            return wrapper
        else:
            return self._wrap(attr)
项目:uninhibited    作者:akatrevorjay    | 项目源码 | 文件源码
def _call_handler(self, *, handler, args, kwargs, loop=None, start=True, executor=None):
        if loop is None:
            loop = asyncio.get_event_loop()

        if (inspect.iscoroutinefunction(handler) or inspect.isgeneratorfunction(handler)):
            # Get a coro/future
            f = handler(*args, **kwargs)
        else:
            # run_in_executor doesn't support kwargs
            handler = functools.partial(handler, *args, **kwargs)

            # Get result/coro/future
            f = loop.run_in_executor(executor, handler)

        if start:
            # Wrap future in a task, schedule it for execution
            f = asyncio.ensure_future(f, loop=loop)

        # Return a coro that awaits our existing future
        return self._result_tuple(handler, f)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def create(cls, obj, body, evaldict, defaults=None,
               doc=None, module=None, addsource=True, **attrs):
        """
        Create a function from the strings name, signature and body.
        evaldict is the evaluation dictionary. If addsource is true an
        attribute __source__ is added to the result. The attributes attrs
        are added, if any.
        """
        if isinstance(obj, str):  # "name(signature)"
            name, rest = obj.strip().split('(', 1)
            signature = rest[:-1]  # strip a right parens
            func = None
        else:  # a function
            name = None
            signature = None
            func = obj
        self = cls(func, name, signature, defaults, doc, module)
        ibody = '\n'.join('    ' + line for line in body.splitlines())
        caller = evaldict.get('_call_')  # when called from `decorate`
        if caller and iscoroutinefunction(caller):
            body = ('async def %(name)s(%(signature)s):\n' + ibody).replace(
                'return', 'return await')
        else:
            body = 'def %(name)s(%(signature)s):\n' + ibody
        return self.make(body, evaldict, addsource, **attrs)
项目:retwist    作者:trustyou    | 项目源码 | 文件源码
def render_GET(self, request):
        # type: (Request) -> int
        """
        Get JSON data from json_GET, and render for the client.

        Do not override in sub classes ...
        :param request: Twisted request
        """
        if iscoroutinefunction(self.json_GET):
            coroutine = self.json_GET(request)
            json_def = ensureDeferred(coroutine)  # type: Deferred
        else:
            json_def = maybeDeferred(self.json_GET, request)

        json_def.addCallback(self.send_json_response, request)
        json_def.addErrback(self.handle_failure, request)

        # handle connection failures
        request.notifyFinish().addErrback(self.on_connection_closed, json_def)

        return NOT_DONE_YET
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def create(cls, obj, body, evaldict, defaults=None,
               doc=None, module=None, addsource=True, **attrs):
        """
        Create a function from the strings name, signature and body.
        evaldict is the evaluation dictionary. If addsource is true an
        attribute __source__ is added to the result. The attributes attrs
        are added, if any.
        """
        if isinstance(obj, str):  # "name(signature)"
            name, rest = obj.strip().split('(', 1)
            signature = rest[:-1]  # strip a right parens
            func = None
        else:  # a function
            name = None
            signature = None
            func = obj
        self = cls(func, name, signature, defaults, doc, module)
        ibody = '\n'.join('    ' + line for line in body.splitlines())
        caller = evaldict.get('_call_')  # when called from `decorate`
        if caller and iscoroutinefunction(caller):
            body = ('async def %(name)s(%(signature)s):\n' + ibody).replace(
                'return', 'return await')
        else:
            body = 'def %(name)s(%(signature)s):\n' + ibody
        return self.make(body, evaldict, addsource, **attrs)
项目:yaz    作者:yaz    | 项目源码 | 文件源码
def __call__(self, **kwargs):
        """Prepare dependencies and call this Task"""
        start = datetime.datetime.now()
        logger.info("Start task %s", self)
        try:
            if self.plugin_class:
                result = self.func(get_plugin_instance(self.plugin_class), **kwargs)

            else:
                result = self.func(**kwargs)

            if inspect.iscoroutinefunction(self.func):
                assert inspect.iscoroutine(
                    result), "The task is defined as a coroutine function but does not return a coroutine"
                loop = asyncio.get_event_loop()
                if loop.is_closed():
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                result = loop.run_until_complete(result)
                loop.close()
        finally:
            stop = datetime.datetime.now()
            logger.info("End task %s after %s", self, stop - start)

        return result
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def handle(self, instance, player, argv):
        """
        Handle command parsing and execution.

        :param player: Player object.
        :param argv: Arguments in array
        :type player: pyplanet.apps.core.maniaplanet.models.player.Player
        """
        # Check permissions.
        if self.perms and len(self.perms) > 0:
            # All the given perms need to be matching!
            is_allowed = await asyncio.gather(*[
                instance.permission_manager.has_permission(player, perm) for perm in self.perms
            ])
            if not all(allowed is True for allowed in is_allowed):
                await instance.chat(
                    '$z$sYou are not authorized to use this command!',
                    player.login
                )
                return

        # Strip off the namespace and command.
        paramv = self.get_params(argv)

        # Parse, validate and show errors if any.
        self.parser.parse(paramv)
        if not self.parser.is_valid():
            await instance.gbx.multicall(
                instance.chat('$z$sCommand operation got invalid arguments: {}'.format(', '.join(self.parser.errors)), player),
                instance.chat('$z$s >> {}'.format(self.usage_text), player),
            )
            return

        # We are through. Call our target!
        if iscoroutinefunction(self.target):
            return await self.target(player=player, data=self.parser.data, raw=argv, command=self)
        return self.target(player=player, data=self.parser.data, raw=argv, command=self)
项目:CEX.IO-Client-Python3.5    作者:cexioltd    | 项目源码 | 文件源码
def is_awaitable(obj):
        # There is no single method which can answer in any case, should wait or not - so need to create one
        # for the suspected cases : func, coro, gen-coro, future,
        #                           class with sync __call__, class with async __call__,
        #                           sync method, async method
        if inspect.isawaitable(obj) or inspect.iscoroutinefunction(obj) or inspect.iscoroutine(obj):
            return True
        elif inspect.isgeneratorfunction(obj):
            return True
        elif CallChain.is_user_defined_class(obj):
            if hasattr(obj, '__call__'):
                return CallChain.is_awaitable(obj.__call__)
            return False
        else:
            return False
项目:bottery    作者:rougeth    | 项目源码 | 文件源码
def test_baseengine_not_implemented_calls(method_name):
    """Check if method calls from public API raise NotImplementedError"""
    engine = BaseEngine()
    with pytest.raises(NotImplementedError):
        method = getattr(engine, method_name)
        if inspect.iscoroutinefunction(method):
            await method()
        else:
            method()
项目:bottery    作者:rougeth    | 项目源码 | 文件源码
def get_response(self, view, message):
        """
        Get response running the view with await syntax if it is a
        coroutine function, otherwise just run it the normal way.
        """

        if inspect.iscoroutinefunction(view):
            return await view(message)

        return view(message)
项目:poloniex-api    作者:absortium    | 项目源码 | 文件源码
def command_operator(func):
    if iscoroutinefunction(func):
        async def async_decorator(self, *args, **kwargs):
            kwargs = apply_defaults(func, *args, **kwargs)
            method, params = self.get_params(func.__name__, **kwargs)

            if method == "post":
                response = await self.api_call(data=params)
            elif method == "get":
                response = await self.api_call(params=params)
            else:
                raise PoloniexError("Not available method '{}'".format(method))

            return self.response_handler(response, command=func.__name__)

        return async_decorator
    else:
        def decorator(self, *args, **kwargs):
            kwargs = apply_defaults(func, *args, **kwargs)
            method, params = self.get_params(func.__name__, **kwargs)

            if method == "post":
                response = self.api_call(data=params)
            elif method == "get":
                response = self.api_call(params=params)
            else:
                raise PoloniexError("Not available method '{}'".format(method))

            return self.response_handler(response, command=func.__name__)

        return decorator
项目:toapy    作者:endreman0    | 项目源码 | 文件源码
def __get__(desc, obj, cls):
        if obj is None: return desc # no classmethod-style replacement
        attrs = vars(obj)
        if desc.fn.__name__ in attrs:
            return attrs[desc.fn.__name__]

        try:
            fetcher = obj._get
        except AttributeError:
            raise TypeError('TOA object %r must declare _get' % obj) from None
        meth = desc.fn.__get__(obj)
        if inspect.iscoroutinefunction(fetcher):
            @functools.wraps(desc.fn)
            async def wrapper(self, *args, **kwargs):
                url = desc.get_url(args, kwargs)
                json = await self._get(url)
                return meth(json)
        else:
            @functools.wraps(desc.fn)
            def wrapper(self, *args, **kwargs):
                url = desc.get_url(args, kwargs)
                json = self._get(url)
                return meth(json)
        ret = wrapper.__get__(obj)
        attrs[desc.fn.__name__] = ret
        return ret
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def pytest_pyfunc_call(pyfuncitem):
    if inspect.iscoroutinefunction(pyfuncitem.obj):
        pyfuncitem.obj = trio_test(pyfuncitem.obj)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def pytest_pyfunc_call(pyfuncitem):
    if inspect.iscoroutinefunction(pyfuncitem.obj):
        pyfuncitem.obj = trio_test(pyfuncitem.obj)
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
def __init__(cls, name, bases, methods):
        coros = {}
        for base in reversed(cls.__mro__):
            coros.update((name, val) for name, val in vars(base).items()
                         if inspect.iscoroutinefunction(val))

        for name, val in vars(cls).items():
            if name in coros and not inspect.iscoroutinefunction(val):
                raise TypeError('Must use async def %s%s' % (name, inspect.signature(val)))
        super().__init__(name, bases, methods)
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
def __new__(meta, clsname, bases, attributes):
        if '__init__' in attributes and not inspect.iscoroutinefunction(attributes['__init__']):
            raise TypeError('__init__ must be a coroutine')
        return super().__new__(meta, clsname, bases, attributes)
项目:sphinxcontrib-trio    作者:python-trio    | 项目源码 | 文件源码
def sniff_options(obj):
    options = set()
    # We walk the __wrapped__ chain to collect properties.
    while True:
        if getattr(obj, "__isabstractmethod__", False):
            options.add("abstractmethod")
        if isinstance(obj, classmethod):
            options.add("classmethod")
        if isinstance(obj, staticmethod):
            options.add("staticmethod")
        # if isinstance(obj, property):
        #     options.add("property")
        # Only check for these if we haven't seen any of them yet:
        if not (options & EXCLUSIVE_OPTIONS):
            if inspect.iscoroutinefunction(obj):
                options.add("async")
            # in some versions of Python, isgeneratorfunction returns true for
            # coroutines, so we use elif
            elif inspect.isgeneratorfunction(obj):
                options.add("for")
            if isasyncgenfunction(obj):
                options.add("async-for")
            # Some heuristics to detect when something is a context manager
            if getattr(obj, "__code__", None) in CM_CODES:
                options.add("with")
            if getattr(obj, "__returns_contextmanager__", False):
                options.add("with")
            if getattr(obj, "__returns_acontextmanager__", False):
                options.add("async-with")
        if hasattr(obj, "__wrapped__"):
            obj = obj.__wrapped__
        elif hasattr(obj, "__func__"):  # for staticmethod & classmethod
            obj = obj.__func__
        else:
            break

    return options
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def iscoroutinefunction(f):
        return False

# getargspec has been deprecated in Python 3.5
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
def coerce_to_synchronous(func):
    '''
    Given a function that might be async, wrap it in an explicit loop so it can
    be run in a synchronous context.
    '''
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            try:
                loop.run_until_complete(func(*args, **kwargs))
            finally:
                loop.close()
        return sync_wrapper
    return func
项目:trafaret    作者:Deepwalker    | 项目源码 | 文件源码
def async_transform(self, value, context=None):
        if not inspect.iscoroutinefunction(self.fn):
            return self.transform(value, context=context)
        if self.supports_context:
            res = await self.fn(value, context=context)
        else:
            res = await self.fn(value)
        if isinstance(res, DataError):
            raise res
        else:
            return res
项目:cozmo-python-sdk    作者:anki    | 项目源码 | 文件源码
def connect(f, conn_factory=conn.CozmoConnection, connector=None):
    '''Connects to the Cozmo Engine on the mobile device and supplies the connection to a function.

    Accepts a function, f, that is given a :class:`cozmo.conn.CozmoConnection` object as
    a parameter.

    The supplied function may be either an asynchronous coroutine function
    (normally defined using ``async def``) or a regular synchronous function.

    If an asynchronous function is supplied it will be run on the same thread
    as the Cozmo event loop and must use the ``await`` keyword to yield control
    back to the loop.

    If a synchronous function is supplied then it will run on the main thread
    and Cozmo's event loop will run on a separate thread.  Calls to
    asynchronous methods returned from CozmoConnection will automatically
    be translated to synchronous ones.

    The connect function will return once the supplied function has completed,
    as which time it will terminate the connection to the robot.

    Args:
        f (callable): The function to execute
        conn_factory (callable): Override the factory function to generate a
            :class:`cozmo.conn.CozmoConnection` (or subclass) instance.
        connector (:class:`DeviceConnector`): Optional instance of a DeviceConnector
            subclass that handles opening the USB connection to a device.
            By default it will connect to the first Android or iOS device that
            has the Cozmo app running in SDK mode.
    '''
    if asyncio.iscoroutinefunction(f):
        return _connect_async(f, conn_factory, connector)
    return _connect_sync(f, conn_factory, connector)
项目:cozmo-python-sdk    作者:anki    | 项目源码 | 文件源码
def _connect_viewer(f, conn_factory, connector, viewer):
    # Run the viewer in the main thread, with the SDK running on a new background thread.
    loop = asyncio.new_event_loop()
    abort_future = concurrent.futures.Future()

    async def view_connector(coz_conn):
        try:
            await viewer.connect(coz_conn)

            if inspect.iscoroutinefunction(f):
                await f(coz_conn)
            else:
                await coz_conn._loop.run_in_executor(None, f, base._SyncProxy(coz_conn))
        finally:
            viewer.disconnect()

    try:
        if not inspect.iscoroutinefunction(f):
            conn_factory = functools.partial(conn_factory, _sync_abort_future=abort_future)
        lt = _LoopThread(loop, f=view_connector, conn_factory=conn_factory, connector=connector)
        lt.start()
        viewer.mainloop()
    except BaseException as e:
        abort_future.set_exception(exceptions.SDKShutdown(repr(e)))
        raise
    finally:
        lt.stop()
项目:mercury    作者:jr0d    | 项目源码 | 文件源码
def async_endpoint(name):
    def add(f):
        log.debug('Adding async runtime endpoint {} ({})'.format(f.__name__, name))
        if not inspect.iscoroutinefunction(f):
            log.error('{} is not a coroutine'.format(f.__name__))
        elif name in async_endpoints:
            log.error('{} already exists in table'.format(name))
        else:
            async_endpoints[name] = f
        return f
    return add
项目:napper    作者:epsy    | 项目源码 | 文件源码
def __new__(cls, name, bases, members):
        for key, value in dict(members).items():
            if key.startswith('test_') and inspect.iscoroutinefunction(value):
                members['async_' + key] = value
                members[key] = _make_asyncwrapper(value)
        return type.__new__(cls, name, bases, members)
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    return (getattr(func, '_is_coroutine', None) is _is_coroutine or
            _inspect_iscoroutinefunction(func))
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def __init__(cls, name, bases, dct):
        super(MethodCollection, cls).__init__(name, bases, dct)

        # Build the list of remote methods.
        remote_methods = {}
        for name, obj in dct.items():
            if is_remote_method(obj):
                if not inspect.iscoroutinefunction(obj):
                    raise RuntimeError("Remote method {} is not a coroutine"
                                       .format(obj))
                remote_methods[name] = obj
        cls.REMOTE_METHODS = remote_methods
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    return (getattr(func, '_is_coroutine', False) or
            _inspect_iscoroutinefunction(func))
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def setup_custom_handlers(custom_handlers):
    """Set up default and custom handlers for JSON API application."""
    from . import handlers as default_handlers
    from .common import logger

    handlers = {
        name: handler
        for name, handler in inspect.getmembers(default_handlers,
                                                inspect.iscoroutinefunction)
        if name in default_handlers.__all__
    }
    if custom_handlers is not None:
        if isinstance(custom_handlers, MutableMapping):
            custom_handlers_iter = custom_handlers.items()
        elif isinstance(custom_handlers, Sequence):
            custom_handlers_iter = ((c.__name__, c) for c in custom_handlers)
        else:
            raise TypeError('Wrong type of "custom_handlers" parameter. '
                            'Mapping or Sequence is expected.')

        for name, custom_handler in custom_handlers_iter:
            handler_name = custom_handler.__name__
            if name not in handlers:
                logger.warning('Custom handler %s is ignored.', name)
                continue
            if not inspect.iscoroutinefunction(custom_handler):
                logger.error('"%s" is not a co-routine function (ignored).',
                             handler_name)
                continue

            handlers[name] = custom_handler
            logger.debug('Default handler "%s" is replaced '
                         'with co-routine "%s" (%s)',
                         name, handler_name, inspect.getmodule(custom_handler))
    return handlers
项目:portkey    作者:red8012    | 项目源码 | 文件源码
def on(selector: str, event: str, filter_selector: str=None, throttle=False, stop_propagation=False):
    """Register a coroutine function as a handler for the given event.
    Corresponding to `$(selector).on(event, filter_selector, ...)` in jQuery

    Args:
        selector: A jQuery selector expression
        event: One or more space-separated event types and optional namespaces, such as "click".
        filter_selector: a selector string to filter the descendants of the selected elements that trigger the event.
            If this argument is omitted, the event is always triggered when it reaches the selected element.
        throttle: Whether to discard events when Portkey is busy.
            Should set to true if the event is going to fire at very high frequency.
            If set to false, all events will be queued and handled.
        stop_propagation: Prevents the event from bubbling up the DOM tree, 
            preventing any parent handlers from being notified of the event.

    Returns: A decorator that returns the original function.

    """

    def decorator(func):
        assert inspect.iscoroutinefunction(func), \
            'Only a coroutine function (a function defined with an async def syntax)' \
            ' can be registered as an event handler.'
        handler_info = get_handler_info(func)
        # assert handler_info.handler is None, \
        #     'You should only register one event on a handler function.'
        handler_info._handler = func
        handler_info.handler = id(func)
        handler_info.selector = selector
        handler_info.event = event
        handler_info.filter_selector = filter_selector
        handler_info.throttle = throttle
        handler_info.stop_propagation = stop_propagation
        return func
    return decorator
项目:portkey    作者:red8012    | 项目源码 | 文件源码
def preprocess_jquery_arguments(x):
    if isinstance(x, html_tag):
        return str(x)
    if inspect.iscoroutinefunction(x):
        on(None, None)(x)
        return id2handlerInfo[id(x)]
    return x
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def addCleanup(self, function, *args, **kwargs):
        '''
        Add a function, with arguments, to be called when the test is
        completed. If function is a coroutine function, it will be run by the
        event loop before it is destroyed.
        '''
        if asyncio.iscoroutinefunction(function):
            return super().addCleanup(self.loop.run_until_complete,
                                      function(*args, **kwargs))

        return super().addCleanup(function, *args, **kwargs)
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def _setUp(self):
        ''' Create a new loop for each test case '''
        asyncio.set_event_loop_policy(self.loop_policy)
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        if asyncio.iscoroutinefunction(self.setUp):
            self.loop.run_until_complete(self.setUp())
        else:
            self.setUp()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(cls, name, bases, methods):
        coros = {}
        for base in reversed(cls.__mro__):
            coros.update((name, val) for name, val in vars(base).items()
                         if inspect.iscoroutinefunction(val))

        for name, val in vars(cls).items():
            if name in coros and not inspect.iscoroutinefunction(val):
                raise TypeError('Must use async def %s%s' % (name, inspect.signature(val)))
        super().__init__(name, bases, methods)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __new__(meta, clsname, bases, attributes):
        if '__init__' in attributes and not inspect.iscoroutinefunction(attributes['__init__']):
            raise TypeError('__init__ must be a coroutine')
        return super().__new__(meta, clsname, bases, attributes)
项目:old-sovrin    作者:sovrin-foundation    | 项目源码 | 文件源码
def __call__(self, function):

        if not self.shouldCheck:
            return function

        hints = get_type_hints(function)

        def precheck(*args, **kwargs):

            all_args = kwargs.copy()
            all_args.update(dict(zip(function.__code__.co_varnames, args)))

            for argument, argument_type in ((i, type(j)) for i, j in all_args.items()):
                if argument in hints:
                    if not issubclass(argument_type, hints[argument]):
                        raise TypeError('Type of {} is {} and not {}'.
                                        format(argument,
                                               argument_type,
                                               hints[argument]))

        def postcheck(result):
            if 'return' in hints:
                if not isinstance(result, hints['return']):
                    raise TypeError('Type of result is {} and not {}'.
                                    format(type(result), hints['return']))
            return result

        if inspect.iscoroutinefunction(function):
            async def type_checker(*args, **kwargs):
                precheck(*args, **kwargs)
                result = await function(*args, **kwargs)
                return postcheck(result)
        else:
            def type_checker(*args, **kwargs):
                precheck(*args, **kwargs)
                result = function(*args, **kwargs)
                return postcheck(result)

        return type_checker
项目:pytest-curio    作者:johnnoone    | 项目源码 | 文件源码
def pytest_pycollect_makeitem(collector, name, obj):
    if collector.funcnamefilter(name) and inspect.iscoroutinefunction(obj):
        item = pytest.Function(name, parent=collector)
        if 'curio' in item.keywords:
            return list(collector._genfunctions(name, obj))
项目:nettirely    作者:8Banana    | 项目源码 | 文件源码
def _create_callback_registration(key):
    def _inner(self, func):
        if not inspect.iscoroutinefunction(func):
            raise ValueError("You can only register coroutines!")
        self._message_callbacks.setdefault(key, []).append(func)
        return func
    return _inner
项目:nettirely    作者:8Banana    | 项目源码 | 文件源码
def on_connect(self, func):
        if not inspect.iscoroutinefunction(func):
            raise ValueError("You can only register coroutines!")
        self._connection_callbacks.append(func)
项目:nettirely    作者:8Banana    | 项目源码 | 文件源码
def on_disconnect(self, func):
        # Registers a coroutine to be ran right before exit.
        # This is so you can modify your state to be JSON-compliant.
        if inspect.iscoroutinefunction(func):
            raise ValueError("You can only register non-coroutines!")
        self._disconnection_callbacks.append(func)
项目:nettirely    作者:8Banana    | 项目源码 | 文件源码
def on_command(self, command, arg_amount=ANY_ARGUMENTS):
        """
        Creates a decorator that registers a command handler.

        The argument command must include the prefix.

        The command handler takes as arguments:
            1. The bot instance
            2. The command sender.
            3. The command recipient, usually a channel.
            4. Any arguments that came with the command,
               split depending on the arg_amount argument.

        As an example, to register a command that looks like this:
            !slap nickname

        You'd write something like this:
            @bot.on_command("!slap", arg_amount=1)
            def slap(self, sender, recipient, slappee):
                ...
        """

        def _inner(func):
            if not inspect.iscoroutinefunction(func):
                raise ValueError("You can only register coroutines!")
            self._command_callbacks.setdefault(command, [])\
                                   .append((func, arg_amount))
            return func
        return _inner
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def iscoroutinefunction(f):
        return False

# getargspec has been deprecated in Python 3.5
项目:retwist    作者:trustyou    | 项目源码 | 文件源码
def iscoroutinefunction(func):
        # type: (Callable) -> bool
        return False