Python typing module: Type() code examples

The 50 code examples below, extracted from open-source Python projects, show how typing.Type is used in practice.
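
Before the project excerpts, here is a minimal, self-contained sketch (not taken from any project below) of what a Type[...] annotation expresses: the parameter receives the class object itself rather than an instance, so the function can construct or inspect instances of it. The User and AdminUser names are purely illustrative.

from typing import Type, TypeVar

class User:
    def __init__(self, name: str) -> None:
        self.name = name

class AdminUser(User):
    pass

U = TypeVar('U', bound=User)

def new_user(user_class: Type[U], name: str) -> U:
    # user_class is the class itself; calling it creates an instance,
    # and the return type follows whichever class was passed in.
    return user_class(name)

regular = new_user(User, 'alice')    # inferred as User
admin = new_user(AdminUser, 'bob')   # inferred as AdminUser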

Project: Hanabi-AI | Author: MeGotsThis
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 numPlayers: int,
                 variant: Variant,
                 spectators: bool,
                 gameName: str,
                 *args,
                 **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.username: str = username
        self.password: str = password
        module = importlib.import_module(botModule + '.bot')
        self.botCls: Type[Bot] = module.Bot  # type: ignore
        self.botconfig: Mapping = botconfig
        self.numPlayers: int = numPlayers
        self.variant: Variant = variant
        self.spectators: bool = spectators
        self.gameName: str = gameName
        self.conn: socketIO_client.SocketIO
        self.tablePlayers: List[str] = []
        self.readyToStart: bool = False
        self.game: Optional[Game] = None
Project: Lyra | Author: caterinaurban
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
                 arguments: Dict[Type, Dict[str, Any]] = defaultdict(lambda: dict())):
        """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

        :param variables: list of program variables
        :param lattices: dictionary from variable types to the corresponding lattice types
        :param arguments: dictionary from variable types to arguments of the corresponding lattices
        """
        super().__init__()
        self._variables = variables
        self._lattices = lattices
        self._arguments = arguments
        try:
            self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
        except KeyError as key:
            error = f"Missing lattice for variable type {repr(key.args[0])}!"
            raise ValueError(error)
Project: datapipelines-python | Author: meraki-analytics
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        dispatcher = singledispatch(method)
        provides = set()

        def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
            call = dispatcher.dispatch(type)
            try:
                return call(self, query, context=context)
            except TypeError:
                raise DataSource.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
            provides.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._provides = provides
        update_wrapper(wrapper, method)
        return wrapper
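
A side note on the dispatch pattern above: it calls functools.singledispatch's dispatch() method directly with the requested class, so handler lookup is keyed on a Type[T] argument rather than on an instance's runtime type. Below is a minimal standalone sketch of the same trick; fetch, _fetch_int and the query shape are illustrative names, not part of datapipelines.

from functools import singledispatch
from typing import Any, Mapping, Type, TypeVar

T = TypeVar('T')

def _unsupported(query: Mapping[str, Any]) -> Any:
    raise TypeError('unsupported type requested')

_dispatcher = singledispatch(_unsupported)

def fetch(requested: Type[T], query: Mapping[str, Any]) -> T:
    # dispatch() returns the handler registered for the given class,
    # so the requested class itself drives the dispatch.
    handler = _dispatcher.dispatch(requested)
    return handler(query)

@_dispatcher.register(int)
def _fetch_int(query: Mapping[str, Any]) -> int:
    return int(query['value'])

print(fetch(int, {'value': '42'}))  # -> 42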
Project: datapipelines-python | Author: meraki-analytics
def dispatch(method: Callable[[Any, Type[T], Any, PipelineContext], None]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
        dispatcher = singledispatch(method)
        accepts = set()

        def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
            call = dispatcher.dispatch(type)
            try:
                return call(self, items, context=context)
            except TypeError:
                raise DataSink.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
            accepts.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._accepts = accepts
        update_wrapper(wrapper, method)
        return wrapper
Project: datapipelines-python | Author: meraki-analytics
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
        try:
            LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
            path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
            LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        except (KeyError, NetworkXNoPath):
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))

        LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        chain = []
        cost = 0
        for source, target in _pairwise(path):
            transformer = self._type_graph.adj[source][target][_TRANSFORMER]
            chain.append((transformer, target))
            cost += transformer.cost
        LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))

        if not chain:
            return _identity, 0

        return partial(_transform, transformer_chain=chain), cost
Project: datapipelines-python | Author: meraki-analytics
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
        best = None
        best_cost = _MAX_TRANSFORM_COST
        to_type = None
        for target_type in target_types:
            try:
                transform, cost = self._transform(source_type, target_type)
                if cost < best_cost:
                    best = transform
                    best_cost = cost
                    to_type = target_type
            except NoConversionError:
                pass
        if best is None:
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
        return best, to_type, best_cost
Project: datapipelines-python | Author: meraki-analytics
def _best_transform_to(self, target_type: Type[T], source_types: Iterable[Type]) -> Tuple[Callable[[T], Any], Type, int]:
        best = None
        best_cost = _MAX_TRANSFORM_COST
        from_type = None
        for source_type in source_types:
            try:
                transform, cost = self._transform(source_type, target_type)
                if cost < best_cost:
                    best = transform
                    best_cost = cost
                    from_type = source_type
            except NoConversionError:
                pass
        if best is None:
            raise NoConversionError("Pipeline can't convert from any of \"{source_types}\" to \"{target_type}\"".format(source_types=source_types, target_type=target_type))
        return best, from_type, best_cost
Project: datapipelines-python | Author: meraki-analytics
def _create_source_handlers(self, type: Type[T]) -> List[_SourceHandler]:
        source_handlers = []
        for source, targets in self._sources:
            if TYPE_WILDCARD in source.provides or type in source.provides:
                sink_handlers = self._create_sink_handlers(type, targets)
                source_handlers.append(_SourceHandler(source, type, _identity, {sink_handler: False for sink_handler in sink_handlers}))
            else:
                try:
                    transform, source_type, cost = self._best_transform_to(type, source.provides)
                    # If we got past the above function call, then there is a transformer from `source_type` to `type`
                    pre_handlers, post_handlers = self._create_sink_handlers_simultaneously(source_type, transform, type, targets)
                    sink_handlers = {sink_handler: False for sink_handler in pre_handlers}
                    sink_handlers.update({sink_handler: True for sink_handler in post_handlers})
                    source_handlers.append(_SourceHandler(source, source_type, transform, sink_handlers))
                except NoConversionError:
                    pass

        return source_handlers
Project: datapipelines-python | Author: meraki-analytics
def put(self, type: Type[T], item: T) -> None:
        """Puts an objects into the data pipeline. The object may be transformed into a new type for insertion if necessary.

        Args:
            item: The object to be inserted into the data pipeline.
        """
        LOGGER.info("Getting SinkHandlers for \"{type}\"".format(type=type.__name__))
        try:
            handlers = self._put_types[type]
        except KeyError:
            try:
                LOGGER.info("Building new SinkHandlers for \"{type}\"".format(type=type.__name__))
                handlers = self._put_handlers(type)
            except NoConversionError:
                handlers = None
            self._put_types[type] = handlers

        LOGGER.info("Creating new PipelineContext")
        context = self._new_context()

        LOGGER.info("Sending item \"{item}\" to SourceHandlers".format(item=item))
        if handlers is not None:
            for handler in handlers:
                handler.put(item, context)
Project: datapipelines-python | Author: meraki-analytics
def put_many(self, type: Type[T], items: Iterable[T]) -> None:
        """Puts multiple objects of the same type into the data sink. The objects may be transformed into a new type for insertion if necessary.

        Args:
            items: An iterable (e.g. list) of objects to be inserted into the data pipeline.
        """
        LOGGER.info("Getting SinkHandlers for \"{type}\"".format(type=type.__name__))
        try:
            handlers = self._put_types[type]
        except KeyError:
            try:
                LOGGER.info("Building new SinkHandlers for \"{type}\"".format(type=type.__name__))
                handlers = self._put_handlers(type)
            except NoConversionError:
                handlers = None
            self._put_types[type] = handlers

        LOGGER.info("Creating new PipelineContext")
        context = self._new_context()

        LOGGER.info("Sending items \"{items}\" to SourceHandlers".format(items=items))
        if handlers is not None:
            items = list(items)
            for handler in handlers:
                handler.put_many(items, context)
Project: datapipelines-python | Author: meraki-analytics
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator":
        if self._current is None or self._current.child is not None:
            raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".")

        if self._current.required:
            raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".")

        if supplies_type:
            expected_type = supplies_type
        else:
            expected_type = type(value)

        default_node = _DefaultValueNode(self._current.key, value, supplies_type)
        result = self.as_(expected_type)
        result._current.child.child = default_node
        return result
Project: datapipelines-python | Author: meraki-analytics
def dispatch(method: Callable[[Any, Type[T], F, PipelineContext], T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
        dispatcher = singledispatch(method)
        transforms = {}

        def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T:
            call = dispatcher.dispatch(TypePair[value.__class__, target_type])
            try:
                return call(self, value, context=context)
            except TypeError:
                raise DataTransformer.unsupported(target_type, value)

        def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
            try:
                target_types = transforms[from_type]
            except KeyError:
                target_types = set()
                transforms[from_type] = target_types
            target_types.add(to_type)

            return dispatcher.register(TypePair[from_type, to_type])

        wrapper.register = register
        wrapper._transforms = transforms
        update_wrapper(wrapper, method)
        return wrapper
Project: allennlp | Author: allenai
def _initializer_wrapper(init_function: Callable[..., None]) -> Type[Initializer]:
    class Init(Initializer):
        def __init__(self, **kwargs):
            self._init_function = init_function
            self._kwargs = kwargs
        def __call__(self, tensor: torch.autograd.Variable) -> None:
            self._init_function(tensor, **self._kwargs)
        def __repr__(self):
            return 'Init: %s, with params: %s' % (self._init_function, self._kwargs)
        @classmethod
        def from_params(cls, params: Params):
            return cls(**params.as_dict())
    return Init


# There are no classes to decorate, so we hack these into Registrable._registry
Project: brainiak | Author: brainiak
def multimask_images(images: Iterable[SpatialImage],
                     masks: Sequence[np.ndarray], image_type: type = None
                     ) -> Iterable[Sequence[np.ndarray]]:
    """Mask images with multiple masks.

    Parameters
    ----------
    images:
        Images to mask.
    masks:
        Masks to apply.
    image_type:
        Type to cast images to.

    Yields
    ------
    Sequence[np.ndarray]
        For each mask, a masked image.
    """
    for image in images:
        yield [mask_image(image, mask, image_type) for mask in masks]
Project: brainiak | Author: brainiak
def mask_images(images: Iterable[SpatialImage], mask: np.ndarray,
                image_type: type = None) -> Iterable[np.ndarray]:
    """Mask images.

    Parameters
    ----------
    images:
        Images to mask.
    mask:
        Mask to apply.
    image_type:
        Type to cast images to.

    Yields
    ------
    np.ndarray
        Masked image.
    """
    for images in multimask_images(images, (mask,), image_type):
        yield images[0]
Project: pandachaika | Author: pandabuilder
def get_parsers_classes(self, filter_name: str=None) -> List[Type['BaseParser']]:
        parsers_list = list()
        for parser in self.parsers:
            parser_name = getattr(parser, 'name')
            if filter_name:
                if filter_name in parser_name:
                    if parser_name == 'generic':
                        parsers_list.append(parser)
                    else:
                        parsers_list.insert(0, parser)
            else:
                if parser_name == 'generic':
                    parsers_list.append(parser)
                else:
                    parsers_list.insert(0, parser)

        return parsers_list
Project: asyncqlio | Author: SunDwarf
def get_table(self, table_name: str) -> 'typing.Type[Table]':
        """
        Gets a table from the current metadata.

        :param table_name: The name of the table to get.
        :return: The :class:`.Table` object, or ``None`` if no matching table exists.
        """
        try:
            return self.tables[table_name]
        except KeyError:
            # we can load this from the name instead
            for table in self.tables.values():
                if table.__name__ == table_name:
                    return table
            else:
                return None
Project: asyncqlio | Author: SunDwarf
def run_update_query(self, query: 'md_query.BaseQuery'):
        """
        Executes an update query.

        :param query: The :class:`.RowUpdateQuery` or :class:`.BulkUpdateQuery` to execute.
        """
        if isinstance(query, md_query.RowUpdateQuery):
            for row, (sql, params) in zip(query.rows_to_update, query.generate_sql()):
                if md_inspection._get_mangled(row, "deleted"):
                    raise RuntimeError("Row '{}' is marked as deleted".format(row))

                if sql is None and params is None:
                    continue

                await self.execute(sql, params)
                # copy the history of the row
                row._previous_values = row._values
        elif isinstance(query, md_query.BulkUpdateQuery):
            sql, params = query.generate_sql()
            await self.execute(sql, params)
        else:
            raise TypeError("Type {0.__class__.__name__} is not an update query".format(query))

        return query
Project: asyncqlio | Author: SunDwarf
def run_delete_query(self, query: 'md_query.RowDeleteQuery'):
        """
        Executes a delete query.

        :param query: The :class:`.RowDeleteQuery` or :class:`.BulkDeleteQuery` to execute.
        """
        if isinstance(query, md_query.RowDeleteQuery):
            for row, (sql, params) in zip(query.rows_to_delete, query.generate_sql()):
                if md_inspection._get_mangled(row, "deleted"):
                    raise RuntimeError("Row '{}' is already marked as deleted".format(row))

                if sql is None and params is None:
                    continue

                await self.execute(sql, params)
                md_inspection._set_mangled(row, "deleted", True)
        elif isinstance(query, md_query.BulkDeleteQuery):
            sql, params = query.generate_sql()
            await self.execute(sql, params)
        else:
            raise TypeError("Type {0.__class__.__name__} is not a delete query".format(query))

        return query
Project: pyimc | Author: oysstu
def __init__(self, lsf_path: str, types: List[Type[pyimc.Message]] = None, make_index=True):
        """
        Reads an LSF file.
        :param lsf_path: The path to the LSF file.
        :param types: The message types to return. List of pyimc message classes.
        :param make_index: If true, an index that speeds up subsequent reads is created.
        """
        self.fpath = lsf_path
        self.f = None  # type: io.BufferedIOBase
        self.header = IMCHeader()  # Preallocate header buffer
        self.parser = pyimc.Parser()
        self.idx = {}  # type: Dict[Union[int, str], List[int]]
        self.make_index = make_index

        if types:
            self.msg_types = [pyimc.Factory.id_from_abbrev(x.__name__) for x in types]
        else:
            self.msg_types = None
Project: CodeGra.de | Author: CodeGra-de
def get_all_subclasses(cls: t.Type[T]) -> t.Iterable[t.Type['T']]:
    """Returns all subclasses of the given class.

    Stolen from:
    https://stackoverflow.com/questions/3862310/how-can-i-find-all-subclasses-of-a-class-given-its-name

    :param cls: The parent class
    :returns: A list of all subclasses
    """
    all_subclasses = []

    for subclass in cls.__subclasses__():
        all_subclasses.append(subclass)
        all_subclasses.extend(get_all_subclasses(subclass))

    return all_subclasses
Project: CodeGra.de | Author: CodeGra-de
def _filter_or_404(model: t.Type[Y], get_all: bool,
                   criteria: t.Tuple) -> t.Union[Y, t.Sequence[Y]]:
    """Get the specified object by filtering or raise an exception.

    :param get_all: Get all objects if ``True`` else get a single one.
    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested object.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    crit_str = ' AND '.join(str(crit) for crit in criteria)
    query = model.query.filter(*criteria)  # type: ignore
    obj = query.all() if get_all else query.one_or_none()
    if not obj:
        raise psef.errors.APIException(
            f'The requested {model.__name__.lower()} was not found',
            f'There is no "{model.__name__}" when filtering with {crit_str}',
            psef.errors.APICodes.OBJECT_ID_NOT_FOUND, 404
        )
    return obj
Project: CodeGra.de | Author: CodeGra-de
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]:
    """Get all objects of the specified model filtered by the specified
    criteria.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested objects.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    return t.cast(t.Sequence[Y], _filter_or_404(model, True, criteria))
Project: CodeGra.de | Author: CodeGra-de
def filter_single_or_404(model: t.Type[Y], *criteria: t.Any) -> Y:
    """Get a single object of the specified model by filtering or raise an
    exception.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested object.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    return t.cast(Y, _filter_or_404(model, False, criteria))
Project: CodeGra.de | Author: CodeGra-de
def get_or_404(model: t.Type[Y], object_id: t.Any) -> Y:
    """Get the specified object by primary key or raise an exception.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param object_id: The primary key identifier for the given object.
    :returns: The requested object.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    obj: t.Optional[Y] = model.query.get(object_id)
    if obj is None:
        raise psef.errors.APIException(
            f'The requested "{model.__name__}" was not found',
            f'There is no "{model.__name__}" with primary key {object_id}',
            psef.errors.APICodes.OBJECT_ID_NOT_FOUND, 404
        )
    return obj
Project: CodeGra.de | Author: CodeGra-de
def create_from_request(cls: t.Type['LTI'], req: flask.Request) -> 'LTI':
        params = req.form.copy()

        lti_provider = models.LTIProvider.query.filter_by(
            key=params['oauth_consumer_key']
        ).first()
        if lti_provider is None:
            lti_provider = models.LTIProvider(key=params['oauth_consumer_key'])
            db.session.add(lti_provider)
            db.session.commit()

        params['lti_provider_id'] = lti_provider.id

        # This is semi-sensitive information, so it should not end up in the JWT
        # token.
        launch_params = {}
        for key, value in params.items():
            if not key.startswith('oauth'):
                launch_params[key] = value

        self = cls(launch_params, lti_provider)

        auth.ensure_valid_oauth(self.key, self.secret, req)

        return self
Project: CodeGra.de | Author: CodeGra-de
def ensure_valid_oauth(
    key: str,
    secret: str,
    request: t.Any,
    parser_cls: t.Type = _FlaskOAuthValidator
) -> None:
    """Make sure the given oauth key and secret is valid for the given request.

    :param str key: The oauth key to be used for validating.
    :param str secret: The oauth secret to be used for validating.
    :param object request: The request that should be validated.
    :param RequestValidatorMixin parser_cls: The class used to parse the given
        ``request``. It should subclass :py:class:`RequestValidatorMixin` and
        should at least override the
        :func:`RequestValidatorMixin.parse_request` method.
    :returns: Nothing
    """
    validator = parser_cls(key, secret)
    if not validator.is_valid_request(request):
        raise PermissionException(
            'No valid oauth request could be found.',
            'The given request is not a valid oauth request.',
            APICodes.INVALID_OAUTH_REQUEST, 400
        )
Project: ribosome | Author: tek
def setup_plugin(cls: Type[NvimPlugin], name: str, prefix: str, debug: bool) -> None:
    help = Helpers(cls, name, prefix)
    cls.name = name
    cls.prefix = prefix
    cls.debug = debug
    help.msg_cmd('show_log_info', ShowLogInfo)
    help.short_handler('log_level', command, cls.set_log_level)
    help.msg_fun('mapping', Mapping)
    help.name_handler('stage_1', command, cls.stage_1, sync=True)
    help.name_handler('stage_2', command, cls.stage_2, sync=True)
    help.name_handler('stage_3', command, cls.stage_3, sync=True)
    help.name_handler('stage_4', command, cls.stage_4, sync=True)
    help.name_handler('quit', command, cls.quit, sync=True)
    help.name_handler('rpc_handlers', function, cls.rpc_handlers, sync=True)
    help.name_handler('append_python_path', function, cls.append_python_path)
    help.name_handler('show_python_path', function, cls.show_python_path)
    help.name_handler('send', function, cls.send_message)
Project: ribosome | Author: tek
def __init__(
            self,
            name: str,
            desc: str,
            help: str,
            prefix: bool,
            tpe: Type[A],
            ctor: Callable[[A], B],
            default: Either[str, B],
    ) -> None:
        self.name = name
        self.desc = desc
        self.help = help
        self.prefix = prefix
        self.tpe = tpe
        self.ctor = ctor
        self.default = default
Project: ribosome | Author: tek
def __init__(
            self,
            name: str,
            prefix: Optional[str]=None,
            components: Map[str, Union[str, type]]=Map(),
            state_type: Optional[Type[S]]=None,
            state_ctor: Optional[Callable[['Config', NvimFacade], S]]=None,
            settings: Optional[Settings]=None,
            request_handlers: List[RequestHandler]=Nil,
            core_components: List[str]=Nil,
            default_components: List[str]=Nil
    ) -> None:
        self.name = name
        self.prefix = prefix or name
        self.components = components
        self.state_type = state_type or AutoData
        self.state_ctor = state_ctor or (lambda c, v: self.state_type(config=c, vim_facade=Just(v)))
        self.settings = settings or PluginSettings(name=name)
        self.request_handlers = RequestHandlers.cons(*request_handlers)
        self.core_components = core_components
        self.default_components = default_components
Project: typesentry | Author: h2oai
def test_Type():
    from typing import Type, Any

    class A(object): pass

    class B(A): pass

    class C(B): pass

    class D(A): pass

    assert is_type(A, type)
    assert is_type(A, Type)
    assert is_type(A, Type[Any])
    assert is_type(A, Type[object])
    assert is_type(A, Type[A])
    assert is_type(B, Type[A])
    assert is_type(C, Type[A])
    assert is_type(C, Type[B])
    assert is_type(D, Type[A])
    assert not is_type(A, Type[B])
    assert not is_type(D, Type[B])
    assert not is_type("str", Type)
    assert not is_type(None, Type[A])
Project: typesentry | Author: h2oai
def test_typing():
    from typing import Any, List, Set, Dict, Type, Tuple
    assert name_type(Any) == "Any"
    assert name_type(List) == "List"
    assert name_type(List[Any]) == "List"
    assert name_type(List[str]) == "List[str]"
    assert name_type(List[int]) == "List[int]"
    assert name_type(Set) == "Set"
    assert name_type(Set[Any]) == "Set"
    assert name_type(Set[List]) == "Set[List]"
    assert name_type(Dict) == "Dict"
    assert name_type(Dict[Any, Any]) == "Dict"
    assert name_type(Dict[str, int]) == "Dict[str, int]"
    assert name_type(Type) == "Type"
    assert name_type(Type[int]) == "Type[int]"
    assert name_type(Type[MagicType]) == "Type[MagicType]"
    assert name_type(Tuple) == "Tuple"
    assert name_type(Tuple[int]) == "Tuple[int]"
    assert name_type(Tuple[int, str, List]) == "Tuple[int, str, List]"
    assert name_type(Tuple[int, Ellipsis]) == "Tuple[int, ...]"
    assert name_type(Tuple[str, Ellipsis]) == "Tuple[str, ...]"
Project: BAG_framework | Author: ucb-art
def create_master_instance(self, gen_cls, lib_name, params, used_cell_names, **kwargs):
        # type: (Type[MasterType], str, Dict[str, Any], Set[str], **kwargs) -> MasterType
        """Create a new non-finalized master instance.

        This instance is used to determine if we created this instance before.

        Parameters
        ----------
        gen_cls : Type[MasterType]
            the generator Python class.
        lib_name : str
            generated instance library name.
        params : Dict[str, Any]
            instance parameters dictionary.
        used_cell_names : Set[str]
            a set of all used cell names.
        **kwargs
            optional arguments for the generator.

        Returns
        -------
        master : MasterType
            the non-finalized generated instance.
        """
        raise NotImplementedError('not implemented')
Project: BAG_framework | Author: ucb-art
def _import_class_from_str(class_str):
    # type: (str) -> Type
    """Given a Python class string, convert it to the Python class.

    Parameters
    ----------
    class_str : str
        a Python class string.

    Returns
    -------
    py_class : class
        a Python class.
    """
    sections = class_str.split('.')

    module_str = '.'.join(sections[:-1])
    class_str = sections[-1]
    modul = importlib.import_module(module_str)
    return getattr(modul, class_str)
Project: curious | Author: SunDwarf
def make_user(self, user_data: dict, *,
                  user_klass: typing.Type[UserType] = User,
                  override_cache: bool = False) -> UserType:
        """
        Creates a new user and caches it.

        :param user_data: The user data to use to create.
        :param user_klass: The type of user to create.
        :param override_cache: Should the cache be overridden?
        :return: A new :class:`~.User` (hopefully).
        """
        id = int(user_data.get("id", 0))
        if id in self._users and not override_cache:
            return self._users[id]

        user = user_klass(self.client, **user_data)
        self._users[user.id] = user

        return user
Project: curious | Author: SunDwarf
def load_plugin(self, klass: typing.Type[Plugin], *args,
                          module: str = None):
        """
        Loads a plugin.

        .. note::

            The client instance will automatically be provided to the Plugin's ``__init__``.

        :param klass: The plugin class to load.
        :param args: Any args to provide to the plugin.
        :param module: The module name provided with this plugin. Only used internally.
        """
        # get the name and create the plugin object
        plugin_name = getattr(klass, "plugin_name", klass.__name__)
        instance = klass(self.client, *args)

        # call load, of course
        await instance.load()

        self.plugins[plugin_name] = instance
        if module is not None:
            self._module_plugins[module].append(instance)

        return instance
Project: python-devtools | Author: samuelcolvin
def _warn(self, msg, category: Type[Warning]=RuntimeWarning):
        if self._show_warnings:
            warnings.warn(msg, category)
Project: swagger-codegen-example-python | Author: cnadiminti
def from_dict(cls: Type[T], dikt) -> T:
        """
        Returns the dict as a model
        """
        return deserialize_model(dikt, cls)
Project: Hanabi-AI | Author: MeGotsThis
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 *args,
                 **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.username: str = username
        self.password: str = password
        module = importlib.import_module(botModule + '.bot')
        self.botCls: Type[Bot] = module.Bot  # type: ignore
        self.botconfig: Mapping = botconfig
        self.conn: socketIO_client.SocketIO
        self.game: Optional[Game] = None
Project: Hanabi-AI | Author: MeGotsThis
def __copy__(self) -> 'CardKnowledge':
        cls: Type[CardKnowledge] = self.__class__
        result: CardKnowledge = cls.__new__(cls)
        result.__dict__.update(self.__dict__)
        result.cantBe = {c: self.cantBe[c][:] for c in self.bot.colors}
        return result
Project: Hanabi-AI | Author: MeGotsThis
def __init__(self,
                 connection: Any,
                 variant: Variant,
                 names: List[str],
                 botPosition: int,
                 botCls: Type['bot.Bot'],
                 **kwargs) -> None:
        self.connection: Any = connection
        self.variant: Variant = variant
        self.numPlayers: int = len(names)
        self.botPosition: int = botPosition
        self.bot: bot.Bot
        self.bot = botCls(self, botPosition, names[botPosition], **kwargs)
        self.players: List[Player] = [self.bot.create_player(p, names[p])
                                      for p in range(self.numPlayers)]
        self.turnCount: int = -1
        self.deckCount: int = -1
        self.scoreCount: int = 0
        self.clueCount: int = 8
        self.strikeCount: int = 0
        self.currentPlayer: int = -1
        self.deck: Dict[int, Card] = {}
        self.discards: List[int] = []
        self.playedCards: Dict[Color, List[Card]]
        self.playedCards = {c: [] for c in variant.pile_colors}
        self.actionLog: List[str] = []
        self._lastAction: Optional[Action] = None
        self._cardMoved: Optional[int] = None
        self._cardPosition: Optional[int] = None
        self._striked: bool = False
Project: fauxmo-plugins | Author: n8henrie
def __exit__(self, exc_type: Optional[Type[BaseException]], exc_value:
                 Optional[Exception], traceback: Optional[TracebackType]) \
            -> None:
        """Terminate the server and join the thread on exit."""
        self.server.terminate()
        self.server.join()
Project: Lyra | Author: caterinaurban
def __init__(self, lattice: Type, arguments: Dict[str, Any]):
        """Create a stack of elements of a lattice.

        :param lattice: type of the lattice
        """
        super().__init__()
        self._stack = [lattice(**arguments)]
Project: datapipelines-python | Author: meraki-analytics
def unsupported(type: Type[T]) -> UnsupportedError:
        return UnsupportedError("The type \"{type}\" is not supported by this DataSource!".format(type=type.__name__))
Project: datapipelines-python | Author: meraki-analytics
def provides(self):  # type: Union[Iterable[Type[T]], Type[Any]]
        """The types of objects the data store provides."""
        types = set()
        any_dispatch = False
        try:
            types.update(getattr(self.__class__, "get")._provides)
            any_dispatch = True
        except AttributeError:
            pass
        try:
            types.update(getattr(self.__class__, "get_many")._provides)
            any_dispatch = True
        except AttributeError:
            pass
        return types if any_dispatch else TYPE_WILDCARD
Project: datapipelines-python | Author: meraki-analytics
def get(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> T:
        """Gets a query from the data source.

        Args:
            query: The query being requested.
            context: The context for the extraction (mutable).

        Returns:
            The requested object.
        """
        pass
Project: datapipelines-python | Author: meraki-analytics
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Iterable[T]:
        """Gets a query from the data source, which contains a request for multiple objects.

        Args:
            query: The query being requested (contains a request for multiple objects).
            context: The context for the extraction (mutable).

        Returns:
            The requested objects.
        """
        pass
Project: datapipelines-python | Author: meraki-analytics
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Iterable[T]:
        try:
            sources = self._sources[type]
        except KeyError as error:
            raise DataSource.unsupported(type) from error

        for source in sources:
            try:
                return source.get_many(type, deepcopy(query), context)
            except NotFoundError:
                continue
        raise NotFoundError()
Project: datapipelines-python | Author: meraki-analytics
def get(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> T:
        try:
            sources = self._sources[type]
        except KeyError as error:
            raise DataSource.unsupported(type) from error

        for source in sources:
            try:
                return source.get(type, deepcopy(query), context)
            except NotFoundError:
                continue
        raise NotFoundError()
Project: datapipelines-python | Author: meraki-analytics
def unsupported(type: Type[T]) -> UnsupportedError:
        return UnsupportedError("The type \"{type}\" is not supported by this DataSink!".format(type=type.__name__))