Python inspect 模块,signature() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用inspect.signature()

项目:function-pattern-matching    作者:rasguanabana    | 项目源码 | 文件源码
def __init__(self, test):
        """Wrap ``test`` as a guard predicate.

        ``test`` must be callable and, judging by the entries ``_getparams``
        yields for it, must expose exactly one parameter whose kind is
        positional-only or positional-or-keyword.  The stored ``self.test``
        always answers with a ``bool``.

        Raises:
            ValueError: if ``test`` is not callable or does not have exactly
                one positional (non-varying) parameter.
        """
        if not callable(test):
            raise ValueError("Guard test has to be callable")

        # Inspect the signature: count parameters that can be given positionally.
        params = _getparams(test)
        positional_kinds = (p_kind.POSITIONAL_ONLY, p_kind.POSITIONAL_OR_KEYWORD)
        positional_count = sum(1 for entry in params if entry[1] in positional_kinds)
        if positional_count != 1:
            raise ValueError("Guard test has to have only one positional (not varying) argument")

        def _safe_bool(value):
            # The guard has to produce a boolean, so convert immediately.
            try:
                return bool(test(value))
            except TypeError:
                # E.g. unorderable types were compared: no TypeError should
                # escape, the guard simply says "no".
                return False

        self.test = lambda inp: _safe_bool(inp)
项目:function-pattern-matching    作者:rasguanabana    | 项目源码 | 文件源码
def __init__(self, test):
        """Store ``test`` as a relational guard and record its argument names.

        The entries produced by ``_getparams(test)`` are read as
        ``(name, kind)``-style sequences: index 1 is checked against the
        forbidden parameter kinds, index 0 is collected into
        ``self.__argnames__``.

        Raises:
            ValueError: if ``test`` is not callable, or if it declares
                positional-only, ``*args`` or ``**kwargs`` parameters.
        """
        if not callable(test):
            raise ValueError("Relguard test has to be callable")

        # Simplified signature of the test callable.
        params = _getparams(test)

        # Varying arguments make no sense in relguards, so reject them.
        seen_kinds = set()
        for entry in params:
            seen_kinds.add(entry[1])
        banned_kinds = (p_kind.POSITIONAL_ONLY, p_kind.VAR_POSITIONAL, p_kind.VAR_KEYWORD)
        if any(kind in seen_kinds for kind in banned_kinds):
            raise ValueError("Relguard test must take only named not varying arguments")

        self.test = test
        self.__argnames__ = {entry[0] for entry in params}
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def getargspec(func):
        """Build a legacy ``(args, varargs, keywords, defaults)`` tuple.

        The tuple is derived from ``signature(func)``: ``*`` and ``**``
        parameters are reported by name, every other parameter name goes into
        ``args``, and ``defaults`` is a tuple of the declared default values
        (``None`` when there are none).
        """
        positional, star_name, kw_name, default_values = [], None, None, []
        for param in signature(func).parameters.values():
            kind = param.kind
            if kind == param.VAR_POSITIONAL:
                star_name = param.name
                continue
            if kind == param.VAR_KEYWORD:
                kw_name = param.name
                continue
            positional.append(param.name)
            if param.default is not param.empty:
                default_values.append(param.default)
        return (positional, star_name, kw_name, tuple(default_values) or None)
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def get_cookie(self, key, default=None, secret=None, digestmod=hashlib.sha256):
        """ Return the content of a cookie. To read a `Signed Cookie`, the
            `secret` must match the one used to create the cookie (see
            :meth:`BaseResponse.set_cookie`). Whenever something is off (the
            cookie is missing or its signature does not verify), the
            `default` value is returned instead. """
        value = self.cookies.get(key)
        if not secret:
            # Unsigned access: hand back the raw value, or the default.
            return value or default

        # Signed cookies look like '!<signature>?<payload>'; see
        # BaseResponse.set_cookie for details.
        if not (value and value.startswith('!') and '?' in value):
            return default
        sig, msg = map(tob, value[1:].split('?', 1))
        digest = hmac.new(tob(secret), msg, digestmod=digestmod).digest()
        if not _lscmp(sig, base64.b64encode(digest)):
            return default
        # NOTE(review): the payload is unpickled; this is only reached after
        # the HMAC check above has succeeded.
        dst = pickle.loads(base64.b64decode(msg))
        if dst and dst[0] == key:
            return dst[1]
        return default
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def yieldroutes(func):
    """ Generate the route paths implied by the signature of ``func``.

    The base path comes from the function name (``__`` becomes ``/``); one
    ``<name>`` segment is appended per required argument, and one further
    route is produced for every argument that has a default::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    arg_names, defaults = spec[0], spec[3]
    required = len(arg_names) - len(defaults or [])
    # All mandatory arguments belong to the very first route.
    route += ''.join('/<%s>' % name for name in arg_names[:required])
    yield route
    # Every optional argument extends the previous route by one segment.
    for optional in arg_names[required:]:
        route += '/<%s>' % optional
        yield route
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def __init__(self, strat):
        """Create the generator for ``strat`` with the keywords it accepts.

        Walks the parameters of ``strat.generate``: every parameter that also
        appears in ``strat._kws`` is forwarded by name; on reaching a
        ``**kwargs`` parameter all of ``strat._kws`` is merged in and the walk
        stops.  The generator is then built from ``strat._depth``,
        ``strat._args`` and the selected keywords.
        """
        self.log = logging.getLogger('strategy.StrategyIterator({})'.format(str(strat)))
        self.strategy = strat
        generate_params = inspect.signature(strat.generate).parameters
        self.log.debug('init')

        selected = {}
        for name, param in generate_params.items():
            if name in strat._kws:
                self.log.debug('add keyword {kw}'.format(kw=name))
                selected[name] = strat._kws[name]
                continue
            if param.kind == inspect.Parameter.VAR_KEYWORD:
                # A catch-all parameter accepts everything that was stored.
                self.log.debug('merge keywords on VAR_KEYWORD {kw}'.format(kw=name))
                selected.update(strat._kws)
                break

        self._generator = strat.generate(strat._depth, *strat._args, **selected)
项目:latexipy    作者:masasin    | 项目源码 | 文件源码
def test_parameters_passed_custom_kwargs(self):
        """Custom keyword arguments given to ``lp.figure`` reach ``save_figure``.

        ``lp.figure`` is used as a context manager with explicit values and
        the patched ``save_figure`` must be called exactly once with them,
        plus ``from_context_manager=True``.
        """
        # NOTE(review): ``params`` is not consulted by the assertions below.
        params = inspect.signature(lp.figure).parameters

        custom = dict(directory='directory', exts='exts', mkdir='mkdir')
        expected_call = dict(custom, filename='filename', from_context_manager=True)

        with patch('matplotlib.figure.Figure.set_size_inches'), \
                patch('latexipy._latexipy.save_figure') as mock_save_figure:
            with lp.figure('filename', **custom):
                pass

            mock_save_figure.assert_called_once_with(**expected_call)
项目:pytc-gui    作者:harmslab    | 项目源码 | 文件源码
def get_connector_param(self,avail_name):
        """
        Look up the parameters for initializing a connector.

        Returns a dict that maps each parameter name in the signature of the
        connector registered under ``avail_name`` to its default value, or to
        ``None`` when the parameter declares no default.

        Raises ValueError if ``avail_name`` is not in
        ``self._avail_connectors``.
        """

        try:
            connector = self._avail_connectors[avail_name]
        except KeyError:
            err = "connector {} not found\n".format(avail_name)
            raise ValueError(err)

        meta = {}
        for name, param in inspect.signature(connector).parameters.items():
            # The default lives on the individual Parameter, not on the
            # mapping.  Compare by identity against the public sentinel so a
            # default with an unusual ``__eq__`` cannot confuse the check.
            if param.default is inspect.Parameter.empty:
                meta[name] = None
            else:
                meta[name] = param.default

        return meta
项目:Seq2Seq-Tensorflow    作者:keon    | 项目源码 | 文件源码
def auto_assign(func):
    """Decorator that stores every call argument as an attribute on ``self``.

    For each parameter of ``func`` after the first one, the value supplied
    positionally, by keyword, or through the parameter's default is assigned
    to the instance under the parameter's name before ``func`` runs.  The
    wrapper returns whatever ``func`` returns.

    Raises:
        RuntimeError: at decoration time if ``func`` declares ``*args`` or
            ``**kwargs``.
        TypeError: at call time if the arguments do not fit the signature;
            this is detected before any attribute is assigned.
    """
    # Signature:
    sig = signature(func)
    for param in sig.parameters.values():
        if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
            raise RuntimeError('Unable to auto assign if *args or **kwargs in signature.')

    # Wrapper:
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Binding validates the call up front, so a missing required argument
        # never results in the ``Parameter.empty`` sentinel being stored.
        bound = sig.bind(self, *args, **kwargs)
        bound.apply_defaults()
        for position, (name, value) in enumerate(bound.arguments.items()):
            # Skip the first parameter (the instance itself):
            if position == 0:
                continue
            setattr(self, name, value)
        return func(self, *args, **kwargs)
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass``; unless the interface member is an ``abc.abstractproperty``,
    the two call signatures must also compare equal.

    Raises:
        InterfaceNotImplemented: if a member is missing or its signature
            differs from the one declared on the interface.
    """
    mismatch_template = (
        "{0}.{1}'s signature differs from the expected. Expected: "
        "{2!r}. Received: {3!r}"
    )
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )
        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Abstract properties cannot be properly verified yet.
            continue
        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            raise InterfaceNotImplemented(
                mismatch_template.format(klass, name, expected, found)
            )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Ensure ``klass`` implements the abstract API declared by ``iface``.

    Every entry of ``iface.__abstractmethods__`` has to be present on
    ``klass``.  Members that are not ``abc.abstractproperty`` instances are
    additionally compared by call signature.

    Raises:
        InterfaceNotImplemented: when a member is absent or its signature
            is not equal to the interface's.
    """
    for member in iface.__abstractmethods__:
        if not hasattr(klass, member):
            message = "{0} is missing a {1!r} method".format(klass, member)
            raise InterfaceNotImplemented(message)
        iface_attr = getattr(iface, member)
        # Abstract properties can't be properly verified yet, so only
        # callables get their signatures compared.
        if not isinstance(iface_attr, abc.abstractproperty):
            wanted = signature(iface_attr)
            got = signature(getattr(klass, member))
            if wanted != got:
                message = (
                    "{0}.{1}'s signature differs from the expected. Expected: "
                    "{2!r}. Received: {3!r}"
                ).format(klass, member, wanted, got)
                raise InterfaceNotImplemented(message)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def getargspec(func):
    """Return ``(args, varargs, varkw, defaults)`` for ``func``.

    Under ``six.PY2`` this defers to ``inspect.getargspec``.  Otherwise the
    tuple is rebuilt from ``inspect.signature``: ``args`` lists the
    positional-or-keyword parameter names, ``varargs`` / ``varkw`` name the
    ``*`` / ``**`` parameters (``None`` when absent) and ``defaults`` is the
    list of defaults of the positional-or-keyword parameters, or ``None``
    when there are none.
    """
    if six.PY2:
        return inspect.getargspec(func)

    kinds = inspect.Parameter
    args, defaults = [], []
    varargs = varkw = None
    # One pass over the signature instead of one comprehension per field.
    for param in inspect.signature(func).parameters.values():
        if param.kind == kinds.POSITIONAL_OR_KEYWORD:
            args.append(param.name)
            if param.default is not param.empty:
                defaults.append(param.default)
        elif param.kind == kinds.VAR_POSITIONAL and varargs is None:
            varargs = param.name
        elif param.kind == kinds.VAR_KEYWORD and varkw is None:
            varkw = param.name
    return args, varargs, varkw, defaults or None
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def func_accepts_kwargs(func):
    """Tell whether ``func`` accepts arbitrary keyword arguments.

    Outside ``six.PY2`` the answer is whether the signature contains a
    ``**kwargs`` parameter.  Under ``six.PY2`` an uninspectable callable is
    assumed to accept them.
    """
    if six.PY2:
        # getargspec cannot inspect every callable, so a couple of approaches
        # are tried; if all fail we assume kwargs are accepted, because valid
        # but unusual callables must not be barred from registration.
        try:
            argspec = inspect.getargspec(func)
        except TypeError:
            try:
                argspec = inspect.getargspec(func.__call__)
            except (TypeError, AttributeError):
                argspec = None
        if not argspec:
            return True
        return argspec[2] is not None

    for param in inspect.signature(func).parameters.values():
        if param.kind == param.VAR_KEYWORD:
            return True
    return False
项目:auger    作者:laffra    | 项目源码 | 文件源码
def _call_matcher(self, _call):
        """
        Turn a call (or a bare ``(args, kwargs)`` tuple) into a key that can
        be compared against other calls.

        When ``self._spec_signature`` is available the arguments are bound to
        it and ``(name, bound_arguments)`` is returned; if binding fails the
        ``TypeError`` instance itself is returned (traceback cleared).
        Without a signature the call is returned unchanged.
        """
        sig = self._spec_signature
        if sig is None:
            return _call

        if len(_call) == 2:
            # Anonymous call: only positional and keyword arguments.
            name, (args, kwargs) = '', _call
        else:
            name, args, kwargs = _call
        try:
            bound = sig.bind(*args, **kwargs)
        except TypeError as exc:
            exc.__traceback__ = None
            return exc
        return name, bound
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def get_kwarg_names(func):
    """
    Return the names of the parameters of func that have a default value
    """
    try:
        # Python 3.5
        sig = inspect.signature(func)
    except AttributeError:
        # Below Python 3.5: the defaults belong to the trailing arguments
        spec = inspect.getargspec(func)
        positional, defaults = spec[0], spec[3]
        return positional[-len(defaults):] if defaults else []
    else:
        return [name for name, param in sig.parameters.items()
                if param.default is not param.empty]
项目:api-retriever    作者:sbaltes    | 项目源码 | 文件源码
def _load_callback(callback_name):
        """
        Look up a callback function by name and validate its form: it must
        take exactly one parameter, called "entity".
        :param callback_name: Name of the callback function to load.
        :return: The callback function.
        """
        try:
            callback_function = getattr(callbacks, callback_name)
            callback_parameters = signature(callback_function).parameters
            # the only accepted form is a single parameter named "entity"
            has_expected_form = (len(callback_parameters) == 1
                                 and "entity" in callback_parameters)
            if not has_expected_form:
                raise IllegalArgumentError("Invalid callback: " + str(callback_name))
            return callback_function
        except AttributeError:
            raise IllegalConfigurationError("Parsing configuration file failed: Callback "
                                            + callback_name + " not found.")
项目:SP-admin-server    作者:VenomzGaming    | 项目源码 | 文件源码
def add_command(self, command, permission):
        """Return a decorator that registers a handler under ``command``.

        The registry entry stored at ``self[command]`` is the tuple
        ``(method, parameter_count, permission)``.  The decorated name is
        replaced by a thin wrapper that forwards its positional arguments to
        the handler and returns the handler's result.

        Raises:
            CommandException: if ``command`` is already registered.
        """
        if command in self:
            raise CommandException('Cannot re-assign command ({command_name})'.format(command_name=command))

        def decorator(method):
            ## Retrieves the functions argument count.
            ## Used to verify the client/say command is valid.
            self[command] = (
                method,
                len(signature(method).parameters),
                permission
            )
            def new(*args):
                # Hand the handler's result back to the caller instead of
                # silently dropping it.
                return method(*args)
            return new
        return decorator
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def register_handler(
                self,  # type: BaseSocket
                method  # type: Callable[..., Union[bool, None]]
        ):  # type: (...) -> None
            """Add ``method`` to the handlers consulted for incoming messages.

            Args:
                method: A function taking two arguments, in the form
                            ``handler(msg, handler)``: msg is a
                            :py:class:`py2p.base.Message` object and handler
                            is a :py:class:`py2p.base.BaseConnection` object.
                            Returning ``True`` signals that it performed an
                            action, which reduces the number of handlers
                            checked.

            Raises:
                ValueError: If the method signature doesn't parse correctly
            """
            params = inspect.signature(method).parameters
            # A parameter named ``self`` is allowed on top of the two real ones.
            expected_count = 3 if params.get('self') else 2
            if len(params) != expected_count:
                raise ValueError(
                    "This method must contain exactly two arguments "
                    "(or three if first is self)")
            self.__handlers.append(method)
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def register_handler(
                self,  # type: BaseSocket
                method  # type: Callable[..., Union[bool, None]]
        ):  # type: (...) -> None
            """Register a handler for incoming method.

            Args:
                method: A function with two given arguments. Its signature
                            should be of the form ``handler(msg, handler)``,
                            where msg is a :py:class:`py2p.base.Message`
                            object, and handler is a
                            :py:class:`py2p.base.BaseConnection` object. It
                            should return ``True`` if it performed an action,
                            to reduce the number of handlers checked.

            Raises:
                ValueError: If the method signature doesn't parse correctly
                            (wrong argument count, or any ``*args``,
                            ``**kwargs``, defaults or keyword-only arguments)
            """
            # ``inspect.getargspec`` does not exist on newer Python 3
            # releases; prefer ``getfullargspec`` whenever it is available.
            full_spec = getattr(inspect, 'getfullargspec', None)
            if full_spec is not None:
                spec = full_spec(method)
                names = spec.args
                plain = (spec.varargs is None and spec.varkw is None and
                         spec.defaults is None and not spec.kwonlyargs)
            else:
                spec = inspect.getargspec(method)
                names = spec[0]
                plain = tuple(spec[1:]) == (None, None, None)
            # Guard the ``names[0]`` lookup so a zero-argument callable is
            # rejected with ValueError rather than failing with IndexError.
            expected_count = 3 if names and names[0] == 'self' else 2
            if not plain or len(names) != expected_count:
                raise ValueError(
                    "This method must contain exactly two arguments "
                    "(or three if first is self)")
            self.__handlers.append(method)
项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def yieldroutes(func):
    """ Yield every route that fits the signature (name, args) of ``func``.

    Functions with optional arguments produce more than one route: first the
    route holding only the required arguments, then one more per optional
    argument.  Examples::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    base = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    # One '<name>' segment per argument; the trailing ones are optional.
    segments = ['/<%s>' % name for name in spec[0]]
    first_optional = len(segments) - len(spec[3] or [])
    path = base + ''.join(segments[:first_optional])
    yield path
    for segment in segments[first_optional:]:
        path += segment
        yield path
项目:adventurelib    作者:lordmauve    | 项目源码 | 文件源码
def _register(command, func, kwargs=None):
    """Register func as a handler for the given command.

    The parameter names of ``func`` must be exactly the argument names of
    the command pattern plus the keys of ``kwargs``.  On success the triple
    ``(pattern, func, kwargs)`` is appended to ``commands``.

    Raises:
        InvalidCommand: if the parameter names of ``func`` do not match.
    """
    # A fresh dict per registration: the mapping is stored in ``commands``,
    # so a shared mutable default would be aliased by every handler that was
    # registered without explicit kwargs.
    if kwargs is None:
        kwargs = {}
    pattern = Pattern(command)
    sig = inspect.signature(func)
    func_argnames = set(sig.parameters)
    when_argnames = set(pattern.argnames) | set(kwargs.keys())
    if func_argnames != when_argnames:
        raise InvalidCommand(
            'The function %s%s has the wrong signature for @when(%r)' % (
                func.__name__, sig, command
            ) + '\n\nThe function arguments should be (%s)' % (
                ', '.join(pattern.argnames + list(kwargs.keys()))
            )
        )

    commands.append((pattern, func, kwargs))
项目:deep_stack    作者:lyn716    | 项目源码 | 文件源码
def get_func_standard_and_necessary_keys(func):
    """Return the parameter names of ``func`` and the subset lacking defaults.

    :param func: callable to inspect
    :return: standard_keys: set of every parameter name in the signature
             necessary_keys: set of the parameter names that declare no
                 default value (``*args`` / ``**kwargs`` style parameters
                 land here too, since they never carry a default)
    """
    parameters = inspect.signature(func).parameters
    # Decide on the Parameter's ``default`` attribute rather than searching
    # its string form for "=", which also matches "=" inside annotations.
    necessary_keys = {
        name for name, param in parameters.items()
        if param.default is inspect.Parameter.empty
    }
    return set(parameters), necessary_keys
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass``; unless the interface member is an ``abc.abstractproperty``,
    the two call signatures must also compare equal.

    Raises:
        InterfaceNotImplemented: if a member is missing or its signature
            differs from the one declared on the interface.
    """
    mismatch_template = (
        "{0}.{1}'s signature differs from the expected. Expected: "
        "{2!r}. Received: {3!r}"
    )
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )
        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Abstract properties cannot be properly verified yet.
            continue
        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            raise InterfaceNotImplemented(
                mismatch_template.format(klass, name, expected, found)
            )
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def _call_matcher(self, _call):
        """
        Build a comparison key for a call (or a plain (args, kwargs) tuple).

        With ``self._spec_signature`` set, the call's arguments are bound to
        that signature and ``(name, bound_arguments)`` is returned; a failed
        binding yields the ``TypeError`` object itself with its traceback
        removed.  With no signature the call is handed back untouched.
        """
        spec_sig = self._spec_signature
        if spec_sig is None:
            return _call

        if len(_call) == 2:
            # Two items mean the call carries no name.
            name, (args, kwargs) = '', _call
        else:
            name, args, kwargs = _call
        try:
            bound_args = spec_sig.bind(*args, **kwargs)
        except TypeError as error:
            error.__traceback__ = None
            return error
        return name, bound_args
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Ensure ``klass`` implements the abstract API declared by ``iface``.

    Every entry of ``iface.__abstractmethods__`` has to be present on
    ``klass``.  Members that are not ``abc.abstractproperty`` instances are
    additionally compared by call signature.

    Raises:
        InterfaceNotImplemented: when a member is absent or its signature
            is not equal to the interface's.
    """
    for member in iface.__abstractmethods__:
        if not hasattr(klass, member):
            message = "{0} is missing a {1!r} method".format(klass, member)
            raise InterfaceNotImplemented(message)
        iface_attr = getattr(iface, member)
        # Abstract properties can't be properly verified yet, so only
        # callables get their signatures compared.
        if not isinstance(iface_attr, abc.abstractproperty):
            wanted = signature(iface_attr)
            got = signature(getattr(klass, member))
            if wanted != got:
                message = (
                    "{0}.{1}'s signature differs from the expected. Expected: "
                    "{2!r}. Received: {3!r}"
                ).format(klass, member, wanted, got)
                raise InterfaceNotImplemented(message)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass``; unless the interface member is an ``abc.abstractproperty``,
    the two call signatures must also compare equal.

    Raises:
        InterfaceNotImplemented: if a member is missing or its signature
            differs from the one declared on the interface.
    """
    mismatch_template = (
        "{0}.{1}'s signature differs from the expected. Expected: "
        "{2!r}. Received: {3!r}"
    )
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )
        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Abstract properties cannot be properly verified yet.
            continue
        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            raise InterfaceNotImplemented(
                mismatch_template.format(klass, name, expected, found)
            )
项目:NebulaSolarDash    作者:toddlerya    | 项目源码 | 文件源码
def yieldroutes(func):
    """ Generate the route paths implied by the signature of ``func``.

    The base path comes from the function name (``__`` becomes ``/``); one
    ``<name>`` segment is appended per required argument, and one further
    route is produced for every argument that has a default::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    arg_names, defaults = spec[0], spec[3]
    required = len(arg_names) - len(defaults or [])
    # All mandatory arguments belong to the very first route.
    route += ''.join('/<%s>' % name for name in arg_names[:required])
    yield route
    # Every optional argument extends the previous route by one segment.
    for optional in arg_names[required:]:
        route += '/<%s>' % optional
        yield route
项目:bottle_beginner    作者:denzow    | 项目源码 | 文件源码
def yieldroutes(func):
    """ Yield every route that fits the signature (name, args) of ``func``.

    Functions with optional arguments produce more than one route: first the
    route holding only the required arguments, then one more per optional
    argument.  Examples::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    base = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    # One '<name>' segment per argument; the trailing ones are optional.
    segments = ['/<%s>' % name for name in spec[0]]
    first_optional = len(segments) - len(spec[3] or [])
    path = base + ''.join(segments[:first_optional])
    yield path
    for segment in segments[first_optional:]:
        path += segment
        yield path
项目:calm    作者:bagrat    | 项目源码 | 文件源码
def __init__(self, uri, uri_regex, handler):
        """Capture a handler together with its URI and signature details.

        Stores the URI, its regex and the handler, keeps the handler's
        signature, and records every parameter except the first one in
        ``self._params``.  Optional metadata (``consumes``, ``produces``,
        ``errors``, ``deprecated``) is read off the handler with fallbacks.
        Finally the arguments are extracted and the operation definition is
        generated.
        """
        super(HandlerDef, self).__init__()

        self.uri = uri
        self.uri_regex = uri_regex
        self.handler = handler

        self._signature = inspect.signature(handler)
        # Drop the leading parameter, keep the rest in declaration order.
        declared = list(self._signature.parameters.items())
        self._params = dict(declared[1:])

        self.path_args = []
        self.query_args = []

        # Metadata attached to the handler, if any.
        self.consumes = getattr(handler, 'consumes', None)
        self.produces = getattr(handler, 'produces', None)
        self.errors = getattr(handler, 'errors', [])
        self.deprecated = getattr(handler, 'deprecated', False)

        self._extract_arguments()
        self.operation_definition = self._generate_operation_definition()
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Ensure ``klass`` implements the abstract API declared by ``iface``.

    Every entry of ``iface.__abstractmethods__`` has to be present on
    ``klass``.  Members that are not ``abc.abstractproperty`` instances are
    additionally compared by call signature.

    Raises:
        InterfaceNotImplemented: when a member is absent or its signature
            is not equal to the interface's.
    """
    for member in iface.__abstractmethods__:
        if not hasattr(klass, member):
            message = "{0} is missing a {1!r} method".format(klass, member)
            raise InterfaceNotImplemented(message)
        iface_attr = getattr(iface, member)
        # Abstract properties can't be properly verified yet, so only
        # callables get their signatures compared.
        if not isinstance(iface_attr, abc.abstractproperty):
            wanted = signature(iface_attr)
            got = signature(getattr(klass, member))
            if wanted != got:
                message = (
                    "{0}.{1}'s signature differs from the expected. Expected: "
                    "{2!r}. Received: {3!r}"
                ).format(klass, member, wanted, got)
                raise InterfaceNotImplemented(message)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass``; unless the interface member is an ``abc.abstractproperty``,
    the two call signatures must also compare equal.

    Raises:
        InterfaceNotImplemented: if a member is missing or its signature
            differs from the one declared on the interface.
    """
    mismatch_template = (
        "{0}.{1}'s signature differs from the expected. Expected: "
        "{2!r}. Received: {3!r}"
    )
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )
        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Abstract properties cannot be properly verified yet.
            continue
        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            raise InterfaceNotImplemented(
                mismatch_template.format(klass, name, expected, found)
            )
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def yieldroutes(func):
    """ Generate the route paths implied by the signature of ``func``.

    The base path comes from the function name (``__`` becomes ``/``); one
    ``<name>`` segment is appended per required argument, and one further
    route is produced for every argument that has a default::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    arg_names, defaults = spec[0], spec[3]
    required = len(arg_names) - len(defaults or [])
    # All mandatory arguments belong to the very first route.
    route += ''.join('/<%s>' % name for name in arg_names[:required])
    yield route
    # Every optional argument extends the previous route by one segment.
    for optional in arg_names[required:]:
        route += '/<%s>' % optional
        yield route
项目:newton-sketch    作者:huisaddison    | 项目源码 | 文件源码
def auto_assign(func):
    """Decorator that copies call arguments onto the instance.

    The first positional argument of each call is taken as the instance.
    Every parameter of ``func`` not named ``self`` is assigned as an
    attribute on it: the bound value when one was supplied, otherwise the
    parameter's default when it has one.  ``func`` is then called and its
    result returned.

    The wrapper keeps the metadata of ``func`` (name, docstring, ...) and
    reports the signature of ``func``.
    """
    # Imported locally so the block does not depend on the file's imports.
    import functools

    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        instance = args[0]
        bound = signature.bind(*args, **kwargs)
        for param in signature.parameters.values():
            if param.name == 'self':
                continue
            if param.name in bound.arguments:
                setattr(instance, param.name, bound.arguments[param.name])
            elif param.default is not param.empty:
                # Not supplied by the caller: fall back to the default.
                setattr(instance, param.name, param.default)
        return func(*args, **kwargs)

    wrapper.__signature__ = signature # Restore the signature

    return wrapper
项目:mockito-python    作者:kaste    | 项目源码 | 文件源码
def get_signature(obj, method_name):
    """Return the signature of ``obj.<method_name>``, or ``None``.

    When the attribute is looked up on a class and is not already bound, a
    placeholder is bound in place of the first argument so the reported
    signature does not include it.  ``None`` is returned when the signature
    cannot be determined.
    """
    method = getattr(obj, method_name)

    # Eat self for unbound methods bc signature doesn't do it
    if PY3:
        if (inspect.isclass(obj) and
                not inspect.ismethod(method) and
                not isinstance(
                    obj.__dict__.get(method_name),
                    staticmethod)):
            method = functools.partial(method, None)
    else:
        if (isinstance(method, types.UnboundMethodType) and
                method.__self__ is None):
            method = functools.partial(method, None)

    try:
        return signature(method)
    except Exception:
        # Best effort: signature lookup failures are reported as ``None``.
        # ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
项目:guillotina    作者:plone    | 项目源码 | 文件源码
async def run(self, arguments, settings, app):
        """Load the script at ``arguments.script`` and execute its ``run``.

        The request is stored in the task context, the script file is
        imported as a module, and then:

        * if the module defines no ``run`` attribute, a warning is logged
          and nothing else happens;
        * if ``run`` declares a ``container`` parameter, it is awaited once
          per container yielded by ``get_containers`` and the transaction is
          committed after each call;
        * otherwise ``run`` is invoked through ``lazy_apply`` with ``app``.

        This has to be a coroutine function: the body uses ``async for`` and
        ``await``, which are a SyntaxError inside a plain ``def``.
        """
        aiotask_context.set('request', self.request)
        script = os.path.abspath(arguments.script)
        spec = importlib.util.spec_from_file_location("module.name", script)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        if not hasattr(module, 'run'):
            logger.warn(f'Not `async def run()` function found in file {script}')
            return
        sig = inspect.signature(module.run)
        if 'container' in sig.parameters:
            async for txn, tm, container in get_containers(self.request):
                await module.run(container)
                await tm.commit(txn=txn)
        else:
            await lazy_apply(module.run, app)
项目:graphene-mongo    作者:joaovitorsilvestre    | 项目源码 | 文件源码
def test_gen_mutation(mock_person):
    """``gen_mutation`` builds a graphene ``Mutation`` for the model.

    Checks the generated class: its type, its ``mutate`` attribute, its meta
    name and fields, and that the default of the ``args`` parameter of its
    ``Field`` matches the schema's mutation operators.
    """
    import inspect
    import graphene
    from graphene.utils.str_converters import to_snake_case

    from graphene_mongo.mutation import gen_mutation
    from graphene_mongo.model import ModelSchema

    schema_info = ModelSchema(mock_person, mock_person._fields, None, None)

    mutation_cls = gen_mutation(
        mock_person,
        schema_info.schema,
        schema_info.operators_mutation,
        schema_info.fields_mutation,
        None,
        None,
    )

    assert issubclass(mutation_cls, graphene.Mutation)
    assert hasattr(mutation_cls, 'mutate')

    model_name = mock_person.__name__
    field_name = to_snake_case(model_name)
    meta = mutation_cls._meta

    assert meta.name == 'Create' + model_name
    assert meta.local_fields[field_name]
    assert meta.fields[field_name]

    field_params = inspect.signature(mutation_cls.__dict__['Field']).parameters
    operators_mutation = field_params['args'].default

    assert operators_mutation == schema_info.operators_mutation
项目:python-cookbook-3rd    作者:tuanavu    | 项目源码 | 文件源码
def __init__(self, clsname, bases, clsdict):
        """Warn when a public callable changes the signature it had before.

        Every callable in ``clsdict`` whose name does not start with an
        underscore is compared with the definition found through ``super``;
        a differing signature is reported via ``logging.warning``.
        """
        super().__init__(clsname, bases, clsdict)
        parent = super(self, self)
        for name, value in clsdict.items():
            is_public_callable = callable(value) and not name.startswith('_')
            if not is_public_callable:
                continue
            # Look up the earlier definition, if there is one.
            prev_dfn = getattr(parent, name, None)
            if not prev_dfn:
                continue
            prev_sig, val_sig = signature(prev_dfn), signature(value)
            if prev_sig != val_sig:
                logging.warning('Signature mismatch in %s. %s != %s',
                            value.__qualname__, str(prev_sig), str(val_sig))

# Example
项目:python-cookbook-3rd    作者:tuanavu    | 项目源码 | 文件源码
def register(self, meth):
        '''
        Add a method to this multimethod, keyed by its annotated types.

        Every parameter other than ``self`` must be annotated with a type.
        When a parameter has a default, the method is also registered under
        the shorter type tuple that stops just before that parameter.

        Raises TypeError if an annotation is missing or is not a type.
        '''
        # The type-signature is accumulated from the parameter annotations
        type_sig = []
        for name, parm in inspect.signature(meth).parameters.items():
            if name == 'self':
                continue
            annotation = parm.annotation
            if annotation is inspect.Parameter.empty:
                raise TypeError(
                    'Argument {} must be annotated with a type'.format(name)
                    )
            if not isinstance(annotation, type):
                raise TypeError(
                    'Argument {} annotation must be a type'.format(name)
                    )
            if parm.default is not inspect.Parameter.empty:
                # Callers may omit this argument: register the prefix as well.
                self._methods[tuple(type_sig)] = meth
            type_sig.append(annotation)

        self._methods[tuple(type_sig)] = meth
项目:python-cookbook-3rd    作者:tuanavu    | 项目源码 | 文件源码
def typeassert(*ty_args, **ty_kwargs):
    """Decorator factory that enforces argument types at call time.

    The given types are matched to the parameters of the decorated function
    positionally and by keyword; parameters without a type are not checked.
    A call that passes a value of the wrong type raises ``TypeError``.  When
    ``__debug__`` is false the function is returned undecorated.
    """
    def decorate(func):
        # Optimized mode switches the type checks off entirely
        if not __debug__:
            return func

        # Parameter name -> expected type
        sig = signature(func)
        expected = sig.bind_partial(*ty_args, **ty_kwargs).arguments

        @wraps(func)
        def wrapper(*args, **kwargs):
            supplied = sig.bind(*args, **kwargs).arguments
            # Check each supplied argument that has a type on record
            for name, value in supplied.items():
                if name in expected and not isinstance(value, expected[name]):
                    raise TypeError(
                        'Argument {} must be {}'.format(name, expected[name])
                        )
            return func(*args, **kwargs)
        return wrapper
    return decorate

# Examples
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def pdef(self, obj, oname=''):
        """Print the call signature of a callable object.

        When the object is a class, the output is preceded by a header
        announcing constructor information.  A non-callable object only
        produces a short notice."""

        if not callable(obj):
            print('Object is not callable.')
            return

        # The header is built before the definition lookup, and only for classes.
        header = (self.__head('Class constructor information:\n')
                  if inspect.isclass(obj) else '')

        output = self._getdef(obj,oname)
        if output is not None:
            print(header,self.format(output), end=' ')
            return

        # No definition could be obtained for this object.
        self.noinfo('definition header',oname)

    # In Python 3, all classes are new-style, so they all have __init__.
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def _default_arguments_from_docstring(self, doc):
        """Parse the first line of docstring for call signature.

        Docstring should be of the form 'min(iterable[, key=func])\n'.
        It can also parse cython docstring of the form
        'Minuit.migrad(self, int ncall=10000, resume=True, int nsplit=1)'.
        """
        if doc is None:
            return []

        #care only the firstline
        line = doc.lstrip().splitlines()[0]

        #p = re.compile(r'^[\w|\s.]+\(([^)]*)\).*')
        #'min(iterable[, key=func])\n' -> 'iterable[, key=func]'
        sig = self.docstring_sig_re.search(line)
        if sig is None:
            return []
        # iterable[, key=func]' -> ['iterable[' ,' key=func]']
        sig = sig.groups()[0].split(',')
        ret = []
        for s in sig:
            #re.compile(r'[\s|\[]*(\w+)(?:\s*=\s*.*)')
            ret += self.docstring_kwd_re.findall(s)
        return ret
项目:Expert-Python-Programming_Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def ensure_interface(function):
    """Decorator checking arguments against abstract-base-class annotations.

    On every call the arguments are bound to the signature of ``function``.
    For each bound argument whose parameter annotation is an ``ABCMeta``
    class, the value must be an instance of that class; parameters with any
    other annotation are not checked.

    Returns the wrapping function, which passes the call through and returns
    whatever ``function`` returns.

    Raises TypeError (from the wrapper) when a value does not implement the
    annotated interface, or when the arguments do not fit the signature.
    """
    signature = inspect.signature(function)
    parameters = signature.parameters

    @wraps(function)
    def wrapped(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = parameters[name].annotation

            # Only annotations that are abstract base classes are enforced.
            if not isinstance(annotation, ABCMeta):
                continue

            if not isinstance(value, annotation):
                raise TypeError(
                    "{} does not implement {} interface"
                    "".format(value, annotation)
                )

        # Propagate the result; the wrapper must not swallow it.
        return function(*args, **kwargs)

    return wrapped
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def _get_arg_lengths(func):
    """Returns a two-tuple containing the number of positional arguments
    as the first item and the number of variable positional arguments as
    the second item.
    """
    try:
        funcsig = inspect.signature(func)
        params_dict = funcsig.parameters
        parameters = params_dict.values()
        args_type = (inspect._POSITIONAL_OR_KEYWORD, inspect._POSITIONAL_ONLY)
        args = [x for x in parameters if x.kind in args_type]
        vararg = [x for x in parameters if x.kind == inspect._VAR_POSITIONAL]
        vararg = vararg.pop() if vararg else None
    except AttributeError:
        try:
            try:  # For Python 3.2 and earlier.
                args, vararg = inspect.getfullargspec(func)[:2]
            except AttributeError:  # For Python 2.7 and earlier.
                args, vararg = inspect.getargspec(func)[:2]
        except TypeError:     # In 3.2 and earlier, raises TypeError
            raise ValueError  # but 3.3 and later raise a ValueError.
    return (len(args), (1 if vararg else 0))
项目:caproto    作者:NSLS-II    | 项目源码 | 文件源码
def test_extended_headers_smoke(header_name):
    """Smoke-test the header factory named ``header_name`` on the ``ca`` module.

    The factory is called twice with every parameter set to the same value:
    0 first, then 2 ** 32.  The first result must be a ``ca.MessageHeader``
    and the second a ``ca.ExtendedMessageHeader``.
    """
    header = getattr(ca, header_name)
    sig = inspect.signature(header)
    names = list(sig.parameters)

    # Same value for every parameter: small for the regular header...
    small = sig.bind(**dict.fromkeys(names, 0))
    reg_hdr = header(*small.args, **small.kwargs)

    # ...and 2 ** 32 for the extended one.
    large = sig.bind(**dict.fromkeys(names, 2 ** 32))
    ext_hdr = header(*large.args, **large.kwargs)

    print(reg_hdr)
    assert isinstance(reg_hdr, ca.MessageHeader)
    print(ext_hdr)
    assert isinstance(ext_hdr, ca.ExtendedMessageHeader)
项目:daisy    作者:llllllllll    | 项目源码 | 文件源码
def __init__(self, func):
        """Wrap ``func``, recording its signature and metadata on this object.

        ``func`` is the callable being wrapped; it must be accepted by
        ``signature``.
        """
        # Keep a reference to the wrapped callable.
        self._func = func
        # Store the callable's signature under ``__signature__``.
        self.__signature__ = signature(func)
        # Runs last.  Presumably functools.update_wrapper, which copies
        # func's metadata onto self -- confirm against the file's imports.
        update_wrapper(self, func)
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def next(self, f):
        """Return a new ``Command`` built from this object's ``fdo``, ``fpre``,
        ``fpost`` and ``name``, with ``f`` passed as the fourth argument."""
        pieces = (self.fdo, self.fpre, self.fpost, f, self.name)
        return Command(*pieces)

    # The following helpers expose the type signature of the `fdo` function.
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def signature(self):
        """Return the ``inspect.Signature`` of this object's ``fdo`` callable."""
        return inspect.signature(self.fdo)
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def return_annotation(self):
        """Return the return annotation found on ``self.signature``.

        Returns None when the signature has no return annotation.
        """
        annotation = self.signature.return_annotation
        # Signature.empty is the documented "no annotation" marker; use it
        # instead of the private inspect._empty alias.
        if annotation is inspect.Signature.empty:
            return None
        return annotation
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def parameters(self):
        """Return the ``parameters`` mapping of ``self.signature``."""
        return self.signature.parameters