Python typing module: cast() code examples

We extracted the following 50 code examples from open-source Python projects to show how to use typing.cast().
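
Before the project examples, here is a minimal, self-contained sketch (illustrative names only) of what cast() actually does: at runtime it simply returns its second argument unchanged, and its only effect is to tell a static type checker such as mypy to treat that value as the given type.

from typing import List, cast

def shout_first(values: List[object]) -> str:
    # We happen to know the first element is a str, but its declared type is object.
    # cast() performs no conversion or runtime check; it only narrows the type
    # for the benefit of the type checker.
    text = cast(str, values[0])
    return text.upper()

print(shout_first(["hello", 1, 2]))  # prints HELLO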

Project: Hanabi-AI    Author: MeGotsThis
def discardSomeCard(self) -> bool:
        best_index: int = 0
        card: CardKnowledge
        bestCard: CardKnowledge
        i: int
        for i in range(len(self.hand)):
            card = cast(CardKnowledge, self.game.deck[self.hand[i]])
            bestCard = cast(CardKnowledge,
                            self.game.deck[self.hand[best_index]])
            if bestCard.maybeValue is None:
                best_index = i
            elif (card.maybeValue is not None
                    and card.maybeValue > bestCard.maybeValue):
                best_index = i
        self.discard_card(best_index)
        return True
Project: irisett    Author: beebyte
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any:
    """Make sure a value is a boolean.

    Used when dealing with http input data.
    """
    if value is None and allow_none:
        return value
    if type(value) != bool:
        if not convert:
            raise InvalidData()
        if value in [None, 0, '0', 'false', 'False']:
            value = False
        elif value in [1, '1', 'true', 'True']:
            value = True
        else:
            raise InvalidData('value was %s(%s), expected bool' % (type(value), value))
    return cast(bool, value)
Project: irisett    Author: beebyte
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None,
                 allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.
    """
    if value is None and allow_none:
        return value
    if type(value) != dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) != key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) != value_type:
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
Project: irisett    Author: beebyte
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
        include_metadata = require_bool(
            get_request_param(self.request, 'include_metadata', error_if_missing=False),
            convert=True) or False
        if not include_metadata:
            return None
        if 'id' in self.request.rel_url.query:
            metadata_models = await metadata.get_metadata_for_object(
                dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
        elif 'meta_key' in self.request.rel_url.query:
            meta_key = require_str(get_request_param(self.request, 'meta_key'))
            meta_value = require_str(get_request_param(self.request, 'meta_value'))
            metadata_models = await metadata.get_metadata_for_object_metadata(
                dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
        elif 'monitor_group_id' in self.request.rel_url.query:
            metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
                dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
        else:
            metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')
        metadata_dict = {}  # type: Dict[int, Dict[str, str]]
        for metadata_model in metadata_models:
            if metadata_model.object_id not in metadata_dict:
                metadata_dict[metadata_model.object_id] = {}
            metadata_dict[metadata_model.object_id][metadata_model.key] = metadata_model.value
        return metadata_dict
Project: allennlp    Author: allenai
def _sort_dataset_by_padding(dataset: Dataset,
                                 sorting_keys: List[Tuple[str, str]],  # pylint: disable=invalid-sequence-index
                                 padding_noise: float = 0.0) -> Dataset:
        """
        Sorts the ``Instances`` in this ``Dataset`` by their padding lengths, using the keys in
        ``sorting_keys`` (in the order in which they are provided).  ``sorting_keys`` is a list of
        ``(field_name, padding_key)`` tuples.
        """
        instances_with_lengths = []
        for instance in dataset.instances:
            padding_lengths = cast(Dict[str, Dict[str, float]], instance.get_padding_lengths())
            if padding_noise > 0.0:
                noisy_lengths = {}
                for field_name, field_lengths in padding_lengths.items():
                    noisy_lengths[field_name] = add_noise_to_dict_values(field_lengths, padding_noise)
                padding_lengths = noisy_lengths
            instance_with_lengths = ([padding_lengths[field_name][padding_key]
                                      for (field_name, padding_key) in sorting_keys],
                                     instance)
            instances_with_lengths.append(instance_with_lengths)
        instances_with_lengths.sort(key=lambda x: x[0])
        return Dataset([instance_with_lengths[-1] for instance_with_lengths in instances_with_lengths])
Project: suq    Author: MaxwellBo
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
    """
    Finds this weeks remaining common breaks between a group of users
    """
    # So, the Mypy type checker treats `List` as invariant, meaning we
    # can't give a `List[B]` to a function that expects a `List[A]` if
    # B is a subclass of A.
    # So we have to cast it in to the function...

    # FIXME: Get rid of these casts when Van Rossum figures out how to write a
    #        proper type system
    breaks = cast(List[Event_], get_shared_breaks(group_members))
    now = datetime.now(BRISBANE_TIME_ZONE)

    ### ... and out.
    return cast(List[Break], get_this_weeks_events(now, breaks))


# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
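
A minimal sketch (hypothetical Event/Break classes, checked with mypy) of the invariance issue described in the comments above: a List[Break] is rejected where a List[Event] is expected, even though Break subclasses Event, while the covariant Sequence type, or an explicit cast(), satisfies the checker.

from typing import List, Sequence, cast

class Event: ...
class Break(Event): ...

def count_list(events: List[Event]) -> int:
    # List is invariant: only exactly List[Event] is accepted here.
    return len(events)

def count_seq(events: Sequence[Event]) -> int:
    # Sequence is covariant: Sequence[Break] is accepted here.
    return len(events)

breaks: List[Break] = [Break(), Break()]
count_seq(breaks)                       # OK without any cast
count_list(cast(List[Event], breaks))   # cast() silences the invariance error
# count_list(breaks)                    # mypy: incompatible type "List[Break]"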
Project: gopythongo    Author: gopythongo
def create_file_in_config_folder(self, filename: str, mode: int=None) -> TextIO:
        """
        :param filename: the name of the file in the generated config folder
        :param mode: pass an ``int`` here if you want to modify the file's mode (will be umasked)
        :return: an open file descriptor (``TextIO``) object that the *caller must call `.close()` on*
        """
        if os.path.isfile(filename):
            raise InvalidArgumentException("Call create_file_in_config_folder with a filename, not a path")

        self.ensure_config_folder()
        f = cast(TextIO, io.open(os.path.join(self.configfolder, filename), mode="wt", encoding="utf-8"))

        if mode:
            os.chmod(os.path.join(self.configfolder, filename), get_umasked_mode(mode))

        return f
Project: gopythongo    Author: gopythongo
def validate_args(self, args: configargparse.Namespace) -> None:
        _aptly_args.validate_shared_args(args)

        from gopythongo.versioners import get_version_parsers
        debvp = cast(DebianVersionParser, get_version_parsers()["debian"])  # type: DebianVersionParser
        if args.version_action not in debvp.supported_actions:
            raise ErrorMessage("Version Action is set to '%s', but you chose the Aptly Store which relies on Debian "
                               "version strings. Unfortunately the Debian Versioner does not support the '%s' action. "
                               "It only supports: %s." %
                               (highlight(args.version_action), highlight(args.version_action),
                                highlight(", ".join(debvp.supported_actions))))

        if "-distribution" in args.aptly_publish_opts:
            print_warning("You are using %s in your Aptly Store options. You should use the %s GoPythonGo argument "
                          "instead, since using -distribution in the aptly command line is invalid when GoPythonGo "
                          "tries to update a published repo." %
                          (highlight("-distribution"), highlight("--aptly-distribution")))

        if args.use_aptly_wrapper:
            wrapper_cmd = create_script_path(the_context.gopythongo_path, "vaultwrapper")
            if not os.path.exists(wrapper_cmd) or not os.access(wrapper_cmd, os.X_OK):
                raise ErrorMessage("%s can either not be found or is not executable. The vault wrapper seems to "
                                   "be unavailable." % wrapper_cmd)
            self.aptly_wrapper_cmd = wrapper_cmd
Project: rcli    Author: contains-io
def _get_commands(dist  # type: setuptools.dist.Distribution
                  ):
    # type: (...) -> typing.Dict[str, typing.Set[str]]
    """Find all commands belonging to the given distribution.

    Args:
        dist: The Distribution to search for docopt-compatible docstrings that
            can be used to generate command entry points.

    Returns:
        A dictionary containing a mapping of primary commands to sets of
        subcommands.
    """
    py_files = (f for f in setuptools.findall()
                if os.path.splitext(f)[1].lower() == '.py')
    pkg_files = (f for f in py_files if _get_package_name(f) in dist.packages)
    commands = {}  # type: typing.Dict[str, typing.Set[str]]
    for file_name in pkg_files:
        with open(file_name) as py_file:
            module = typing.cast(ast.Module, ast.parse(py_file.read()))
        module_name = _get_module_name(file_name)
        _append_commands(commands, module_name, _get_module_commands(module))
        _append_commands(commands, module_name, _get_class_commands(module))
        _append_commands(commands, module_name, _get_function_commands(module))
    return commands
Project: CodeGra.de    Author: CodeGra-de
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]:
    """Get all objects of the specified model filtered by the specified
    criteria.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested objects.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    return t.cast(t.Sequence[Y], _filter_or_404(model, True, criteria))
Project: CodeGra.de    Author: CodeGra-de
def filter_single_or_404(model: t.Type[Y], *criteria: t.Any) -> Y:
    """Get a single object of the specified model by filtering or raise an
    exception.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested object.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    return t.cast(Y, _filter_or_404(model, False, criteria))
Project: graphscale    Author: schrockn
async def gen_update_pent_dynamic(
    context: PentContext,
    obj_id: UUID,
    pent_cls_name: str,
    data_cls_name: str,
    payload_cls_name: str,
    data: PentMutationData
) -> PentMutationPayload:

    data_cls = context.cls_from_name(data_cls_name)
    check.isinst(data, data_cls)

    pent_cls = context.cls_from_name(pent_cls_name)
    payload_cls = context.cls_from_name(payload_cls_name)

    pent = await update_pent(context, pent_cls, obj_id, data)
    return cast(PentMutationPayload, payload_cls(pent))
Project: graphscale    Author: schrockn
async def gen_operation(self, graphql_text: str, operation: str, *args: GraphQLArg) -> dict:
        arg_strings = []
        for name, arg_type, _value in args:
            arg_strings.append("${name}: {arg_type}".format(name=name, arg_type=arg_type))

        arg_list = ', '.join(arg_strings)

        full_query = (
            '{operation} ({arg_list}) '.format(arg_list=arg_list, operation=operation) + '{' +
            graphql_text + '}'
        )
        arg_dict = {arg.name: arg.value for arg in args}
        result = await (
            exec_in_mem_graphql(
                self.graphql_schema, self.context, full_query, self.root_value, arg_dict
            )
        )
        if result.errors:
            _process_error(result)

        return cast(dict, result.data)
Project: mazes-for-programmers-python-src    Author: Kartones
def render(self, grid: Grid, **kwargs: Any) -> None:
        horizontal_wall = "\u2501"
        vertical_wall = "\u2503"

        output = self.JUNCTIONS[12]
        for x in range(grid.columns - 1):
            output += (horizontal_wall * 3 + self.get_topmost_junction(cast(Cell, grid.cell_at(row=0, column=x))))
        output += horizontal_wall * 3 + self.JUNCTIONS[10] + "\n"

        for row in grid.each_row():
            top = vertical_wall
            bottom = self.get_leftmost_junction(row[0])
            for cell in row:
                body = grid.contents_of(cell)
                east_boundary = " " if cell.linked_to(cell.east) else vertical_wall
                top += body + east_boundary
                south_boundary = "   " if cell.linked_to(cell.south) else horizontal_wall * 3
                bottom += south_boundary + self.get_south_east_junction(cell)
            output += top + "\n"
            output += bottom + "\n"

        print(output)
Project: neuralmonkey    Author: ufal
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
        fd = {}  # type: FeedDict

        sentences = cast(Iterable[List[str]],
                         dataset.get_series(self.data_id, allow_none=True))

        fd[self.train_mode] = train

        if sentences is not None:
            vectors, paddings = self.vocabulary.sentences_to_tensor(
                list(sentences), pad_to_max_len=False, train_mode=train)

            fd[self.train_targets] = vectors.T
            fd[self.train_weights] = paddings.T

        return fd
Project: neuralmonkey    Author: ufal
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
        sentences = cast(Iterable[List[str]],
                         dataset.get_series(self.data_id, allow_none=True))

        sentences_list = list(sentences) if sentences is not None else None

        fd = {}  # type: FeedDict

        if sentences is not None:
            label_tensors, _ = self.vocabulary.sentences_to_tensor(
                sentences_list, self.max_output_len)

            # pylint: disable=unsubscriptable-object
            fd[self.gt_inputs[0]] = label_tensors[0]
            # pylint: enable=unsubscriptable-object

        fd[self.train_mode] = train

        return fd
Project: neuralmonkey    Author: ufal
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Get paths to series outputs from the dataset keyword argument specs.

    Output file for a series named 'xxx' is specified by parameter 's_xxx_out'

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
           specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if matcher:
            name = matcher.group(1)
            if not isinstance(value, str):
                raise ValueError(
                    "Output path for '{}' series must be a string, was {}.".
                    format(name, type(value)))
            outputs[name] = cast(str, value)
    return outputs
Project: neuralmonkey    Author: ufal
def _preprocessed_datasets(
        dataset: Dataset,
        series_config: SeriesConfig) -> None:
    """Apply dataset-level preprocessing."""
    keys = [key for key in series_config.keys()
            if PREPROCESSED_SERIES.match(key)]

    for key in keys:
        name = PREPROCESSED_SERIES.match(key).group(1)
        preprocessor = cast(DatasetPreprocess, series_config[key])

        if isinstance(dataset, Dataset):
            new_series = list(preprocessor(dataset))
            dataset.add_series(name, new_series)
        elif isinstance(dataset, LazyDataset):
            dataset.preprocess_series[name] = (None, preprocessor)
Project: neuralmonkey    Author: ufal
def __init__(self,
                 output_series: str,
                 encoder: Stateful,
                 used_session: int = 0) -> None:
        """Initialize the representation runner.

        Args:
            output_series: Name of the output series with vectors.
            encoder: Used encoder.
            used_session: Id of the TensorFlow session used in case of model
                ensembles.
        """
        check_argument_types()

        if not isinstance(encoder, ModelPart):
            raise TypeError("The encoder of the representation runner has to "
                            "be an instance of 'ModelPart'")

        BaseRunner.__init__(self, output_series, cast(ModelPart, encoder))

        self._used_session = used_session  # type: int
        self._encoded = encoder.output  # type: tf.Tensor

    # pylint: disable=unused-argument
Project: neuralmonkey    Author: ufal
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
        fd = {}  # type: FeedDict

        sentences = cast(Iterable[List[str]],
                         dataset.get_series(self.data_id, allow_none=True))

        fd[self.train_mode] = train

        if sentences is not None:
            vectors, paddings = self.vocabulary.sentences_to_tensor(
                list(sentences), pad_to_max_len=False, train_mode=train)

            fd[self.train_targets] = vectors.T
            fd[self.train_weights] = paddings.T

        return fd
Project: neuralmonkey    Author: ufal
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Get paths to series outputs from the dataset keyword argument specs.

    Output file for a series named 'xxx' is specified by parameter 's_xxx_out'

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
           specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if matcher:
            name = matcher.group(1)
            if not isinstance(value, str):
                raise ValueError(
                    "Output path for '{}' series must be a string, was {}.".
                    format(name, type(value)))
            outputs[name] = cast(str, value)
    return outputs
Project: neuralmonkey    Author: ufal
def _preprocessed_datasets(
        dataset: Dataset,
        series_config: SeriesConfig) -> None:
    """Apply dataset-level preprocessing."""
    keys = [key for key in series_config.keys()
            if PREPROCESSED_SERIES.match(key)]

    for key in keys:
        name = PREPROCESSED_SERIES.match(key).group(1)
        preprocessor = cast(DatasetPreprocess, series_config[key])

        if isinstance(dataset, Dataset):
            new_series = list(preprocessor(dataset))
            dataset.add_series(name, new_series)
        elif isinstance(dataset, LazyDataset):
            dataset.preprocess_series[name] = (None, preprocessor)
Project: neuralmonkey    Author: ufal
def __init__(self,
                 output_series: str,
                 encoder: Stateful,
                 used_session: int = 0) -> None:
        """Initialize the representation runner.

        Args:
            output_series: Name of the output series with vectors.
            encoder: Used encoder.
            used_session: Id of the TensorFlow session used in case of model
                ensembles.
        """
        check_argument_types()

        if not isinstance(encoder, ModelPart):
            raise TypeError("The encoder of the representation runner has to "
                            "be an instance of 'ModelPart'")

        BaseRunner.__init__(self, output_series, cast(ModelPart, encoder))

        self._used_session = used_session  # type: int
        self._encoded = encoder.output  # type: tf.Tensor

    # pylint: disable=unused-argument
Project: neuralmonkey    Author: ufal
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
        sentences = cast(Iterable[List[str]],
                         dataset.get_series(self.data_id, allow_none=True))

        sentences_list = list(sentences) if sentences is not None else None

        fd = {}  # type: FeedDict

        if sentences is not None:
            label_tensors, _ = self.vocabulary.sentences_to_tensor(
                sentences_list, self.max_output_len)

            # pylint: disable=unsubscriptable-object
            fd[self.gt_inputs[0]] = label_tensors[0]
            # pylint: enable=unsubscriptable-object

        fd[self.train_mode] = train

        return fd
Project: neuralmonkey    Author: ufal
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Get paths to series outputs from the dataset keyword argument specs.

    Output file for a series named 'xxx' is specified by parameter 's_xxx_out'

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
           specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if matcher:
            name = matcher.group(1)
            if not isinstance(value, str):
                raise ValueError(
                    "Output path for '{}' series must be a string, was {}.".
                    format(name, type(value)))
            outputs[name] = cast(str, value)
    return outputs
Project: neuralmonkey    Author: ufal
def __init__(self,
                 output_series: str,
                 encoder: Stateful,
                 used_session: int = 0) -> None:
        """Initialize the representation runner.

        Args:
            output_series: Name of the output series with vectors.
            encoder: Used encoder.
            used_session: Id of the TensorFlow session used in case of model
                ensembles.
        """
        check_argument_types()

        if not isinstance(encoder, ModelPart):
            raise TypeError("The encoder of the representation runner has to "
                            "be an instance of 'ModelPart'")

        BaseRunner.__init__(self, output_series, cast(ModelPart, encoder))

        self._used_session = used_session  # type: int
        self._encoded = encoder.output  # type: tf.Tensor

    # pylint: disable=unused-argument
Project: bit-torrent    Author: borzunov
def _handle_haves(self, message_id: MessageType, payload: memoryview):
        if message_id == MessageType.have:
            (index,) = struct.unpack('!I', cast(bytes, payload))
            self._mark_as_owner(index)
        elif message_id == MessageType.bitfield:
            piece_count = self._download_info.piece_count
            PeerTCPClient._check_payload_len(message_id, payload, int(ceil(piece_count / 8)))

            arr = bitarray(endian='big')
            arr.frombytes(payload.tobytes())
            for i in range(piece_count):
                if arr[i]:
                    self._mark_as_owner(i)
            for i in range(piece_count, len(arr)):
                if arr[i]:
                    raise ValueError('Spare bits in "bitfield" message must be zero')

        # if self._download_info.complete and self.is_seed():
        #     raise SeedError('A seed is disconnected because a download is complete')
Project: bit-torrent    Author: borzunov
async def _handle_requests(self, message_id: MessageType, payload: memoryview):
        piece_index, begin, length = struct.unpack('!3I', cast(bytes, payload))
        request = BlockRequest(piece_index, begin, length)
        self._check_position_range(request)

        if message_id == MessageType.request:
            if length > PeerTCPClient.MAX_REQUEST_LENGTH:
                raise ValueError('Requested {} bytes, but the current policy allows to accept requests '
                                 'of not more than {} bytes'.format(length, PeerTCPClient.MAX_REQUEST_LENGTH))
            if (self._am_choking or not self._peer_interested or
                    not self._download_info.pieces[piece_index].downloaded):
                # If peer isn't interested but requesting, their peer_interested flag wasn't considered
                # when selecting who to unchoke, so we may be not ready to upload to them.
                # If requested piece is not downloaded yet, we shouldn't disconnect because our piece_downloaded flag
                # could be removed because of file corruption.
                return

            await self._send_block(request)
            await self.drain()
        elif message_id == MessageType.cancel:
            # Now we answer to a request immediately or reject and forget it,
            # so there's no need to handle cancel messages
            pass
Project: aioprometheus    Author: claws
def add(self, labels: LabelsType, value: NumericValueType) -> None:
        ''' Add will add the given value to the counter.

        :raises: ValueError if the value is negative. Counters can only
          increase.
        '''
        value = cast(Union[float, int], value)  # typing check, no runtime behaviour.
        if value < 0:
            raise ValueError("Counters can't decrease")

        try:
            current = self.get_value(labels)
        except KeyError:
            current = 0

        current = cast(Union[float, int], current)  # typing check, no runtime behaviour.
        self.set_value(labels, current + value)
Project: aioprometheus    Author: claws
def add(self, labels: LabelsType, value: NumericValueType) -> None:
        ''' Add adds a single observation to the summary '''

        value = cast(Union[float, int], value)  # typing check, no runtime behaviour.
        if type(value) not in (float, int):
            raise TypeError("Summary only works with digits (int, float)")

        try:
            e = self.get_value(labels)
        except KeyError:
            # Initialize quantile estimator
            e = quantile.Estimator(*self.invariants)
            self.set_value(labels, e)

        e.observe(float(value))  # type: ignore

    # https://prometheus.io/docs/instrumenting/writing_clientlibs/#summary
    # A summary MUST have the ``observe`` methods
Project: aioprometheus    Author: claws
def get(self,
            labels: LabelsType) -> Dict[Union[float, str], NumericValueType]:
        '''
        Get gets a dict of values, containing the sum, count and percentiles,
        matching an arbitrary group of labels.

        :raises: KeyError if an item with matching labels is not present.
        '''
        return_data = {}  # type: Dict[Union[float, str], NumericValueType]

        e = self.get_value(labels)
        e = cast(Any, e)  # typing check, no runtime behaviour.

        # Set invariants data (default to 0.50, 0.90 and 0.99)
        for i in e._invariants:  # type: ignore
            q = i._quantile
            return_data[q] = e.query(q)  # type: ignore

        # Set sum and count
        return_data[self.SUM_KEY] = e._sum  # type: ignore
        return_data[self.COUNT_KEY] = e._observations  # type: ignore

        return return_data
Project: aioprometheus    Author: claws
def get(self,
            labels: LabelsType) -> Dict[Union[float, str], NumericValueType]:
        '''
        Get gets a dict of values, containing the sum, count and buckets,
        matching an arbitrary group of labels.

        :raises: KeyError if an item with matching labels is not present.
        '''
        return_data = {}  # type: Dict[Union[float, str], NumericValueType]

        h = self.get_value(labels)
        h = cast(histogram.Histogram, h)  # typing check, no runtime behaviour.

        for upper_bound, cumulative_count in h.buckets.items():
            return_data[upper_bound] = cumulative_count  # keys are floats

        # Set sum and count
        return_data[self.SUM_KEY] = h.sum
        return_data[self.COUNT_KEY] = h.observations

        return return_data
Project: cloak-server    Author: encryptme
def handle(self, config, out, force, wait, post_hook, **options):
        server_id, auth_token = self._require_credentials(config)

        server = Server.retrieve(server_id, auth_token)

        while wait and server.csr_pending:
            time.sleep(5)
            server = Server.retrieve(server_id, auth_token)

        tag = self._get_tag(config) if (not force) else None
        result = server.get_pki(tag)

        if result is not PKI.NOT_MODIFIED:
            pki = cast(PKI, result)

            if pki.entity is not None:
                self._handle_pki(result, config, out, post_hook)
                print("Certificates saved to {}.".format(out), file=self.stdout)
            else:
                print("No certificate available. Request one with req.", file=self.stdout)
        else:
            print("Not modified. Pass -f to download anyway.", file=self.stdout)
Project: PYELT    Author: NLHEALTHCARE
def __init__(self, source: Union['SourceTable', 'SourceQuery', 'File'], target: Union[str, Table], auto_map: bool = True, filter='', ignore_fields: List[str] = []) -> None:
        #todo transformations
        if isinstance(source, File):
            self.file_name = source.file_name
        elif isinstance(source, SourceTable):
            self.source_table = source
        elif isinstance(source, SourceQuery):
            self.source_table = source

        if isinstance(target, str):
            self.sor_table = str(target) #type: str
        else:
            target_tbl = cast(Table, target)
            self.sor_table = target_tbl.name
        super().__init__(source, target, filter)
        self.temp_table = self.sor_table.replace('_hstage', '') + '_temp'
        #todo keys
        self.keys = [] #type: List[str]
        ignore_fields = [s.lower() for s in ignore_fields]
        self.ignore_fields = ignore_fields
        # self.field_mappings = [] #type: List[FieldMapping]
        self.auto_map = auto_map
        if auto_map: self.create_auto_mappings(source, ignore_fields)
Project: python-driver    Author: bblfsh
def test_10_send_response_json(self) -> None:
        self._restart_data('json')
        processor = RequestProcessorJSON(self.recvbuffer)
        processor._send_response(cast(Response, self.data))
        res = self._loadResults('json')
        self.assertEqual(len(res), 1)
        self.assertDictEqual(self.data, res[0])

    # process request already tested with TestPythonDriverBase
Project: Hanabi-AI    Author: MeGotsThis
def updateEyesightCount(self) -> None:
        self.eyesightCount = {c: [0] * 6 for c in self.colors}
        p: Player
        c: int
        card: CardKnowledge
        for p in self.game.players:
            for c in p.hand:
                card = cast(CardKnowledge, self.game.deck[c])
                if card.suit is not None and card.rank is not None:
                    self.eyesightCount[card.suit][card.rank] += 1
                elif card.color is not None and card.value is not None:
                    self.eyesightCount[card.color][card.value] += 1
Project: Hanabi-AI    Author: MeGotsThis
def updateLocatedCount(self) -> bool:
        '''Returns True if played/discarded cards have changed'''
        newCount: Dict[Color, List[int]] = {c: [0] * 6 for c in self.colors}
        p: Player
        c: int
        for p in self.game.players:
            for c in p.hand:
                card = cast(CardKnowledge, self.game.deck[c])
                if card.color is not None and card.value is not None:
                    newCount[card.color][card.value] += 1

        if newCount != self.locatedCount:
            self.locatedCount = newCount
            return True
        return False
Project: Hanabi-AI    Author: MeGotsThis
def handState(self,
                  player: int,
                  showCritical: bool=True) -> List[HandState]:
        handState: List[HandState]
        handState = [HandState.Unclued] * len(self.game.players[player].hand)
        c: int
        h: int
        card: CardKnowledge
        for c, h in enumerate(self.game.players[player].hand):
            card = cast(CardKnowledge, self.game.deck[h])
            if card.worthless is True:
                handState[c] = HandState.Worthless
                continue
            if card.playWorthless is True:
                handState[c] = HandState.Worthless
                continue
            if card.playable is True:
                handState[c] = HandState.Playable
                continue
            if card.valuable is True:
                handState[c] = HandState.Saved
                continue
            if card.clued:
                handState[c] = HandState.SoonPlay
                continue
            if showCritical and player != self.position:
                if self.isValuable(card.suit, card.rank):
                    handState[c] = HandState.Critical
                elif card.rank == Value.V2 and self.is2Valuable(card.suit):
                    handState[c] = HandState.Critical2
        return handState
Project: Hanabi-AI    Author: MeGotsThis
def isCluedElsewhere(self, player: int, hand: int) -> bool:
        returnVal: bool = False
        cardIdx: int = self.game.players[player].hand[hand]
        handcard: CardKnowledge
        handcard = cast(CardKnowledge, self.game.deck[cardIdx])
        color: Color = handcard.suit
        value: Value = handcard.rank
        p: Player
        c: int
        card: CardKnowledge
        for p in self.game.players:
            for c in p.hand:
                card = cast(CardKnowledge, self.game.deck[c])
                if card.deckPosition == handcard.deckPosition:
                    continue
                if p is self:
                    if card.mustBeColor(color):
                        if card.mustBeValue(value):
                            return True
                        if card.cannotBeValue(value) and card.clued:
                            returnVal = None
                    elif card.mustBeValue(value):
                        if card.cannotBeColor(color) and card.clued:
                            returnVal = None
                else:
                    if (card.clued
                            and card.suit == color
                            and card.rank == value):
                        return True
                    elif card.color == color and card.value == value:
                        return True
        return returnVal
Project: Hanabi-AI    Author: MeGotsThis
def cluedCard(self,
                  color: Color,
                  value: Value,
                  player: Optional[int]=None,
                  strict: bool=False,
                  maybe: bool=False) -> Optional[int]:
        p: Player
        c: int
        card: CardKnowledge
        for p in self.game.players:
            if player == p.position:
                if strict:
                    continue
                # When it is the player, assume fully tagged cards as clued too
                for c in p.hand:
                    card = cast(CardKnowledge, self.game.deck[c])
                    if card.color == color and card.value == value:
                        return card.deckPosition
                    if p is self:
                        if (maybe and card.maybeColor == color
                                and card.maybeValue == value):
                            return card.deckPosition
            elif p is self:
                for c in p.hand:
                    card = cast(CardKnowledge, self.game.deck[c])
                    if card.color == color and card.value == value:
                        return card.deckPosition
                    if (maybe and card.maybeColor == color
                            and card.maybeValue == value):
                        return card.deckPosition
            else:
                for c in p.hand:
                    card = cast(CardKnowledge, self.game.deck[c])
                    if (card.clued
                        and card.suit == color
                        and card.rank == value):
                        return card.deckPosition
        return None
Project: Hanabi-AI    Author: MeGotsThis
def doesCardMatchHand(self, deckIdx: int) -> bool:
        deckCard: CardKnowledge
        deckCard = cast(CardKnowledge, self.game.deck[deckIdx])
        assert deckCard.suit is not None
        assert deckCard.rank is not None
        if self.colorComplete[deckCard.suit]:
            return False
        if deckCard.rank == Value.V5:
            return False
        h: int
        for h in self.hand:
            card: CardKnowledge
            card = cast(CardKnowledge, self.game.deck[h])
            if not card.clued:
                continue
            if card.worthless or card.playWorthless:
                continue
            if card.cantBe[deckCard.suit][deckCard.rank]:
                continue
            if card.color is not None and card.value is not None:
                continue
            if card.color == deckCard.suit:
                maybeValue: Optional[Value] = card.maybeValue
                if maybeValue is not None:
                    if maybeValue == deckCard.rank:
                        return True
                    continue
                if deckCard.rank in card.possibleValues:
                    return True
            if card.value == deckCard.rank:
                maybeColor: Optional[Color] = card.maybeColor
                if maybeColor is not None:
                    if maybeColor == deckCard.suit:
                        return True
                    continue
                if deckCard.suit in card.possibleColors:
                    return True
        return False
Project: Hanabi-AI    Author: MeGotsThis
def pleaseObserveBeforeDiscard(self,
                                   from_: int,
                                   card_index: int,
                                   deckIdx: int) -> None:
        card: CardKnowledge = cast(CardKnowledge, self.game.deck[deckIdx])
        card.state = CardState.Discard
        self.seePublicCard(card.suit, card.rank)
Project: proxenos    Author: darvid
def srand(seed=0):
    # type: (KeyType) -> typing.Generator[int, None, None]
    if isinstance(seed, six.string_types) or isinstance(seed, bytes):
        if isinstance(seed, six.text_type):
            seed = seed.encode('utf-8')
        seed_int = int(hashlib.sha512(seed).hexdigest(), 16)
        seed = typing.cast(int, seed_int)
    rng = random.Random(seed)
    while True:
        yield rng.randint(0, sys.maxsize)
Project: irisett    Author: beebyte
def configure_logging(logtype: str, logfilename: Optional[str]=None, debug_logging: bool=False,
                      rotate_length: int=1000000, max_rotated_files: int=250) -> None:
    global logger
    level = logging.INFO
    if debug_logging:
        level = logging.DEBUG
    if logtype not in ['stdout', 'syslog', 'file']:
        raise errors.IrisettError('invalid logtype name %s' % logtype)
    if rotate_length is None:
        rotate_length = 1000000
    if max_rotated_files is None:
        max_rotated_files = 250
    logger = logging.getLogger('irisett')
    logger.setLevel(level)

    if logtype == 'stdout':
        handler = logging.StreamHandler()  # type: Any
        handler.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    elif logtype == 'syslog':
        handler = logging.handlers.SysLogHandler(address='/dev/log')
        handler.setLevel(level)
        formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
    else:  # == file
        logfilename = cast(str, logfilename)
        logpath = os.path.split(logfilename)[0]
        if not os.path.exists(logpath):
            os.makedirs(logpath)
        handler = logging.handlers.RotatingFileHandler(logfilename, maxBytes=rotate_length,
                                                       backupCount=max_rotated_files)
        handler.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
Project: irisett    Author: beebyte
def require_list(value: Optional[List[Any]], item_type: Any=None, allow_none: bool=False) -> Any:
    """Make sure a value is a List[item_type].

    Used when dealing with http input data.
    """
    if value is None and allow_none:
        return value
    if type(value) != list:
        raise InvalidData('value was %s, expected list' % type(value))
    value = cast(List, value)
    if item_type:
        for item in value:
            if type(item) != item_type:
                raise InvalidData('list item was %s, expected %s' % (type(item), item_type))
    return value
Project: irisett    Author: beebyte
def require_int(value: Optional[Union[SupportsInt, str, bytes]], allow_none: bool=False) -> Any:
    """Make sure a value is an int.

    Used when dealing with http input data.
    """
    if value is None and allow_none:
        return value
    value = cast(Union[SupportsInt, str, bytes], value)
    try:
        value = int(value)
    except (ValueError, TypeError):
        raise InvalidData('value was %s(%s), expected int' % (type(value), value))
    return value
Project: irisett    Author: beebyte
async def update_monitor(self) -> web.Response:
        request_data = await self.request.json()
        monitor = self._get_request_monitor(self.request)
        if 'args' in request_data:
            args = cast(Dict[str, str], require_dict(request_data['args']))
            await monitor.update_args(args)
        if 'checks_enabled' in request_data:
            await monitor.set_checks_enabled_status(cast(bool, require_bool(request_data['checks_enabled'])))
        if 'alerts_enabled' in request_data:
            await monitor.set_alerts_enabled_status(cast(bool, require_bool(request_data['alerts_enabled'])))
        return web.json_response(True)
Project: irisett    Author: beebyte
def _get_request_monitor(self, request: web.Request) -> ActiveMonitor:
        monitor_id = require_int(cast(str, get_request_param(request, 'id')))
        monitor = request.app['active_monitor_manager'].monitors.get(monitor_id, None)
        if not monitor:
            raise errors.NotFound()
        return monitor
Project: irisett    Author: beebyte
async def get(self) -> web.Response:
        monitor_id = cast(int, require_int(get_request_param(self.request, 'monitor_id')))
        if 'include_all' in self.request.rel_url.query:
            contacts = await get_all_contacts_for_active_monitor(self.request.app['dbcon'], monitor_id)
        else:
            contacts = object_models.asdict(
                await get_contacts_for_active_monitor(self.request.app['dbcon'], monitor_id)
            )
        ret = object_models.list_asdict(contacts)
        return web.json_response(ret)
Project: irisett    Author: beebyte
async def post(self) -> web.Response:
        request_data = await self.request.json()
        await add_contact_to_active_monitor(
            self.request.app['dbcon'],
            cast(int, require_int(request_data.get('contact_id'))),
            cast(int, require_int(request_data.get('monitor_id'))))
        return web.json_response(True)