Python typing 模块,Generic() 实例源码

我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用 typing.Generic(它是一个用于继承或下标参数化的泛型基类,而不是可直接调用的函数)。

项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def _from_typing36(t):
    '''Build a Typeable description of ``t`` (3.6-era typing module layout)

    Three cases are distinguished by ``t.__origin__``:

    * the attribute is missing  -> not a typing generic, arity 0
    * the attribute is falsy    -> a base (unparameterised) type, arity 1
    * otherwise                 -> a parameterised type; its origin and each
      entry of ``t.__args__`` are converted with ``from_type`` and the arity
      is computed by ``get_arity``
    '''
    # A private sentinel is required because a present-but-None ``__origin__``
    # must stay distinguishable from a missing attribute.
    absent = object()
    base = getattr(t, '__origin__', absent)

    if base is absent:
        # not typing.Generic
        return Typeable(typ=t, origin=None, args=[], arity=0)

    if not base:
        # is a base type
        return Typeable(typ=t, origin=None, args=[], arity=1)

    converted = [from_type(param) for param in t.__args__]
    return Typeable(
        typ=t,
        origin=from_type(base),
        args=converted,
        arity=get_arity(base, converted),
    )
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def get_arity(origin, args=None):
    '''Gets the arity of some typing type

    Returns 1 when ``origin`` is an instance of ``typing.Generic`` and no
    (or empty) ``args`` were supplied; returns 0 in every other case.

    ``args`` defaults to ``None`` rather than a shared mutable list; only its
    truthiness is inspected, so ``None`` and ``[]`` behave the same.
    '''
    # NOTE(review): isinstance() matches *instances* of Generic subclasses;
    # typing constructs passed as ``origin`` may themselves be classes or
    # aliases rather than instances - confirm this is the intended check.
    if isinstance(origin, typing.Generic):
        return 0 if args else 1

    return 0
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
async def main_loop(cfg: Config,
                    logger: Logger,
                    transport_cls: Generic[T],
                    continue_fn: callable,
                    loop: BaseEventLoop):
    """Run the polling loop until ``continue_fn`` returns a falsy value.

    This must be a coroutine (``async def``): the body awaits ``step``,
    ``processor.flush`` and ``asyncio.sleep``, which is a syntax error inside
    a plain ``def``.

    Args:
        cfg: Configuration object; this function reads ``riemann``, ``agents``,
            ``augments``, ``executor_class``, ``executors_count`` and
            ``interval`` from it.
        logger: Logger passed on to the helpers and used for the stop message.
        transport_cls: Callable invoked as ``transport_cls(host, port)`` to
            build the transport handed to ``processor.QClient``.
        continue_fn: Zero-argument callable checked after every iteration;
            a falsy result ends the loop.
        loop: Event loop that receives the executor and is forwarded to
            ``step`` and ``asyncio.sleep``.
    """
    riemann = cfg.riemann
    transport = transport_cls(riemann.host, riemann.port)
    client = processor.QClient(transport)
    agents = create_agents(cfg.agents)
    register_augments(client, cfg.augments, logger)
    executor = cfg.executor_class(max_workers=cfg.executors_count)
    loop.set_default_executor(executor)

    init(agents)

    while True:
        ts = time()
        (done, pending) = await step(client,
                                     agents,
                                     timeout=cfg.interval * 1.5,
                                     loop=loop)

        te = time()
        td = te - ts  # wall-clock duration of this step
        instrumentation(client,
                        logger,
                        cfg.interval,
                        td,
                        len(client.queue.events),
                        len(pending))

        await processor.flush(client, transport, logger)
        if continue_fn():
            # Sleep for the rest of the interval, minus the whole seconds
            # already spent in this iteration.
            # NOTE(review): the ``loop=`` argument of asyncio.sleep was
            # removed in Python 3.10 - drop it if newer interpreters are
            # targeted.
            await asyncio.sleep(cfg.interval - int(td), loop=loop)
        else:
            logger.info("Stopping Oshino")
            break

    client.on_stop()
项目:pytypes    作者:Stewori    | 项目源码 | 文件源码
def _detect_issue351():
    """Report whether github.com/python/typing/issues/351 affects the
    installed typing version.

    A throwaway generic class that is deliberately named ``Tuple`` is
    parameterised with ``str`` and compared against ``typing.Tuple[str]``.
    The result of that ``==`` comparison is returned.
    """
    # The class must be called ``Tuple``: the check is about a user-defined
    # generic sharing its name with the typing construct.
    class Tuple(typing.Generic[typing.T]):
        pass

    local_param = Tuple[str]
    typing_param = typing.Tuple[str]
    return local_param == typing_param
项目:pytypes    作者:Stewori    | 项目源码 | 文件源码
def __Generic__new__(cls, *args, **kwds):
    """Replacement for ``Generic.__new__`` that records the creating class.

    Modelled on ``Generic.__new__`` from typing-3.5.2.2. The new object is
    allocated through ``cls.__next_in_mro__.__new__`` and always gets
    ``__orig_class__`` set to ``cls``. When ``cls.__origin__`` is not None
    the object is allocated for ``typing._gorg(cls)`` instead of ``cls`` and
    ``__init__`` is invoked explicitly with the given arguments.
    """
    parameterised = cls.__origin__ is not None
    alloc_cls = typing._gorg(cls) if parameterised else cls

    instance = cls.__next_in_mro__.__new__(alloc_cls)
    # Set before __init__ runs so the initialiser can already see it.
    instance.__orig_class__ = cls
    if parameterised:
        instance.__init__(*args, **kwds)
    return instance