Python typing 模块,MutableMapping() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.MutableMapping()

项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> True:
        # This is a little weird. We have to handle the case of can_have("x").and_("y") here.
        # Since the only type of Node that can return False rather than just raising a
        # QueryValidationError is a KeyNode that isn't required, or an OrNode whose children
        # are all KeyNodes that aren't required, we can't short circuit on a False value. If
        # we have can_have("x").and_("y"), and only some are True, we want raise a
        # QueryValidationError. If all are True or all are False, everything's fine.
        all_true = True
        all_possible_false = True
        for child in self.children:
            is_true = child.evaluate(query, context)
            all_true = all_true and is_true
            if child.falsifiable:
                all_possible_false = all_possible_false and not is_true

        if all_possible_false or all_true:
            return True
        raise BoundKeyExistenceError("Query must have all or none of the elements joined with \"and\" in a \"can_have\" statement!")
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
        # We can't short circuit this because we want to raise any WrongValueTypeError or BoundKeyExistenceError that could occur.
        # We count failures because the only other node than can return False is a KeyNode that isn't required. An OrNode is only
        # allowed to return False when all its children are False KeyNodes. The AndNodes handle raising an error if a can_have
        # expression with ands and ors fails.
        errors = []
        failure_count = 0
        result = False
        for child in self.children:
            try:
                is_true = child.evaluate(query, context)
                if not is_true:
                    failure_count += 1
                result = is_true or result
            except (MissingKeyError, BoundKeyExistenceError) as error:
                errors.append(error)
                failure_count += 1
        if failure_count == len(self.children) and len(errors) > 0:
            raise errors[0]
        return result
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator":
        if self._current is None or self._current.child is not None:
            raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".")

        if self._current.required:
            raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".")

        if supplies_type:
            expected_type = supplies_type
        else:
            expected_type = type(value)

        default_node = _DefaultValueNode(self._current.key, value, supplies_type)
        result = self.as_(expected_type)
        result._current.child.child = default_node
        return result
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def explain(self) -> MutableMapping:
        """Returns an explain plan record for this cursor.

        .. mongodoc:: explain
        """

        c = self.clone()
        c.__explain = True

        # always use a hard limit for explains
        if c.__limit:
            c.__limit = -abs(self.__limit)

        async for expl in c:
            return expl

        return None
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def dereference(self, dbref: DBRef, **kwargs) -> MutableMapping:
        """Dereference a :class:`~bson.dbref.DBRef`, getting the
        document it points to.

        Raises :class:`TypeError` if `dbref` is not an instance of
        :class:`~bson.dbref.DBRef`. Returns a document, or ``None`` if
        the reference does not point to a valid document.  Raises
        :class:`ValueError` if `dbref` has a database specified that
        is different from the current database.

        :Parameters:
          - `dbref`: the reference
          - `**kwargs` (optional): any additional keyword arguments
            are the same as the arguments to
            :meth:`~aiomongo.collection.Collection.find`.
        """
        if not isinstance(dbref, DBRef):
            raise TypeError('cannot dereference a {}'.format(type(dbref)))
        if dbref.database is not None and dbref.database != self.name:
            raise ValueError('trying to dereference a DBRef that points to '
                             'another database ({} not {})'.format(dbref.database, self.name))
        return await self[dbref.collection].find_one({'_id': dbref.id}, **kwargs)
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def dehead_filetree(tree: ExtractFileTree) -> ExtractFileTree:
    """Remove the head of the given filetree while preserving the old head
    name.

    So a tree ``{1: [2: [3: [4: [f1, f2]]]}`` will be converted to
    ``{1: [f1, f2]}``.

    :param dict tree: The file tree as generated by :py:func:`extract`.
    :returns: The same tree but deheaded as described.
    :rtype: dict
    """
    assert len(tree) == 1
    head_node = list(tree.keys())[0]
    head = tree[head_node]

    while (
        isinstance(head, t.MutableSequence) and len(head) == 1 and
        isinstance(head[0], t.MutableMapping) and len(head[0]) == 1
    ):
        head = list(head[0].values())[0]

    tree[head_node] = head

    return tree
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def get_all_permissions(self) -> t.Mapping[str, bool]:
        """Get all course :class:`permissions` for this course role.

        :returns: A name boolean mapping where the name is the name of the
                  permission and the value indicates if this user has this
                  permission.
        """
        perms: t.Sequence[Permission] = (
            Permission.query.
            filter_by(  # type: ignore
                course_permission=True
            ).all()
        )
        result: t.MutableMapping[str, bool] = {}
        for perm in perms:
            if perm.name in self._permissions:
                result[perm.name] = not perm.default_value
            else:
                result[perm.name] = perm.default_value
        return result
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def get_all_permissions(self) -> t.Mapping[str, bool]:
        """Get all course permissions (:class:`Permission`) for this role.

        :returns: A name boolean mapping where the name is the name of the
                  permission and the value indicates if this user has this
                  permission.
        """
        perms: t.Sequence[Permission] = (
            Permission.query.
            filter_by(  # type: ignore
                course_permission=False
            ).all()
        )
        result: t.MutableMapping[str, bool] = {}
        for perm in perms:
            if perm.name in self._permissions:
                result[perm.name] = not perm.default_value
            else:
                result[perm.name] = perm.default_value
        return result
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.MutableMapping[str, t.Any]:
        """Creates a JSON serializable representation of a role.

        This object will look like this:

        .. code:: python

            {
                'id':    int, # The id of this role.
                'name':  str, # The name of this role.
            }

        :returns: An object as described above.
        """
        return {
            'name': self.name,
            'id': self.id,
        }
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
def __init__(self, bot, **kwargs):
        super().__init__(id=int(kwargs.get("id", 0)), cl=bot)

        #: The name of this guild.
        self.name = kwargs.get("name", "")

        #: A mapping of :class:`~.WidgetChannel` in this widget guild.
        self._channels = {}  # type: typing.MutableMapping[int, WidgetChannel]
        for channel in kwargs.get("channels", []):
            c = WidgetChannel(bot=self._bot, guild=self, **channel)
            self._channels[c.id] = c

        #: A mapping of :class:`~.WidgetMember` in this widget guild.
        self._members = {}
        for member in kwargs.get("members", []):
            m = WidgetMember(bot=self._bot, guild=self, kwargs=member)
            self._members[m.id] = m
项目:data-store    作者:HumanCellAtlas    | 项目源码 | 文件源码
def get_indexed_versions(self) -> typing.MutableMapping[str, str]:
        """
        Returns a dictionary mapping the name of each index containing this document to the
        version of this document in that index. Note that `version` denotes document version, not
        bundle version.
        """
        page_size = 64
        es_client = ElasticsearchClient.get(self.logger)
        alias_name = Config.get_es_alias_name(ESIndexType.docs, self.replica)
        response = es_client.search(index=alias_name, body={
            '_source': False,
            'stored_fields': [],
            'version': True,
            'from': 0,
            'size': page_size,
            'query': {
                'terms': {
                    '_id': [str(self.fqid)]
                }
            }
        })
        hits = response['hits']
        assert hits['total'] <= page_size, 'Document is in too many indices'
        indices = {hit['_index']: hit['_version'] for hit in hits['hits']}
        return indices
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def serialize(self, schema, data, **kwargs) -> typing.MutableMapping:
        """Composes the final relationships object."""
        document = OrderedDict()

        if data is None:
            document['data'] = data
        elif isinstance(data, Mapping):
            # JSON API resource linkage or JSON API relationships object
            if 'type' in data and 'id' in data:
                document['data'] = data
        else:
            # the related resource instance
            document['data'] = \
                schema.ctx.registry.ensure_identifier(data, asdict=True)

        links = kwargs.get('links')
        if links is not None:
            document['links'] = links

        return document
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def serialize(self, schema, data, links=None, pagination=None,
                  **kwargs) -> typing.MutableMapping:
        """Composes the final JSON API relationships object.

        :arg ~aiohttp_json_api.pagination.PaginationABC pagination:
            If not *None*, the links and meta members of the pagination
            helper are added to the final JSON API relationship object.
        """
        document = OrderedDict()

        if is_collection(data):
            document['data'] = [
                schema.ctx.registry.ensure_identifier(item, asdict=True)
                for item in data
            ]

        if links is not None:
            document['links'] = links

        if pagination is not None:
            document['links'].update(pagination.links())
            document.setdefault('meta', OrderedDict())
            document['meta'].update(pagination.meta())

        return document
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def links(self) -> MutableMapping:
        """
        Return pagination links.

        **Must be overridden.**

        A dictionary, which must be included in the top-level *links object*.
        It contains these keys:

        *   *self*
            The link to the current page

        *   *first*
            The link to the first page

        *   *last*
            The link to the last page

        *   *prev*
            The link to the previous page (only set, if a previous page exists)

        *   *next*
            The link to the next page (only set, if a next page exists)
        """
        raise NotImplementedError
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def meta(self) -> MutableMapping:
        """
        Return meta object of paginator.

        *   *total-resources*
            The total number of resources in the collection
        *   *page-limit*
            The number of resources on a page
        *   *page-offset*
            The offset of the current page
        """
        return {
            'total-resources': self.total_resources,
            'page-limit': self.limit,
            'page-offset': self.offset
        }
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def meta(self) -> MutableMapping:
        """
        Return meta object of pagination.

        *   *total-resources*
            The total number of resources in the collection
        *   *last-page*
            The index of the last page
        *   *page-number*
            The number of the current page
        *   *page-size*
            The (maximum) number of resources on a page
        """
        return {
            'total-resources': self.total_resources,
            'last-page': self.last_page,
            'page-number': self.number,
            'page-size': self.size
        }
项目:pyplot-hierarchical-pie    作者:klieret    | 项目源码 | 文件源码
def complete_pv(pathvalues: MutableMapping[Path, float]) -> MutableMapping[Path, float]:
    """ Consider a pathvalue dictionary of the form Dict[Path, float] e.g.
    {1.1.1: 12.0} (here: only one entry). This function will disect each path
    and assign its value to the truncated path: e.g. here 1, 1.1 and 1.1.1.
    Thus we get {1: 12.0, 1.1: 12.0, 1.1.1: 12.0}. For more items the values
    will be summed accordingly.
    Furthermore the total sum of the items of
    the topmost level will be assigned to the empty path. For this to make
    sense we require that no empy path is in the data beforehand""
    :param pathvalues: {path: value} dictionary
    :return: {path: value}
    dictionary
    """
    if Path(()) in pathvalues:
        raise ValueError("This function does not allow the empty path as item"
                         "in the data list.")
    completed = collections.defaultdict(float)
    for path, value in pathvalues.items():
        # len(path) +1 ensures that also the whole tag is considered
        # starting point 0: also add to empty path.
        for level in range(0, len(path) + 1):
            completed[path[:level]] += value
    return completed
项目:burp    作者:kudelskisecurity    | 项目源码 | 文件源码
def from_json(cls, json: MutableMapping[str, Any]) \
            -> 'RequestReturnedGetNone':
        with JsonParser(json):
            assert json.pop('messageType', None) == 'request'
            return RequestReturnedGetNone(
                in_scope=ensure(bool, json.pop('inScope')),
                http_version=parse_http_version(json.pop('httpVersion')),
                body=b64decode(json.pop('body').encode()),
                tool_flag=ensure(int, json.pop('toolFlag')),
                url=ensure(str, json.pop('url')),
                method=ensure(str, json.pop('method')),
                protocol=ensure(str, json.pop('protocol')),
                path=ensure(str, json.pop('path')),
                headers=tuple(cls.__pop_headers(json)),
                port=ensure(int, json.pop('port')),
                host=ensure(str, json.pop('host')),
                raw=b64decode(json.pop('raw').encode()),
                reference_id=ensure(int, json.pop('referenceID')),
                query=ensure((str, type(None)), json.pop('query', None)),
            )
项目:burp    作者:kudelskisecurity    | 项目源码 | 文件源码
def from_json(cls, json: MutableMapping[str, Any]) -> 'RequestReturned':
        with JsonParser(json):
            json.pop('messageType')
            json.pop('query', None)  # TODO sometimes it's there
            json.pop('comment', None)  # TODO sometimes it's there
            return RequestReturned(
                http_version=parse_http_version(json.pop('httpVersion')),
                headers=tuple(
                    (ensure(str, k), ensure(str, v))
                    for k, v in pop_all(json.pop('headers')).items()
                ),
                body=b64decode(json.pop('body').encode()),
                **dict(chain(
                    _common_returned(json).items(),
                    pop_all(ensure_values(str, json)).items(),
                ))
            )
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
        pass
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> True:
        # This always returns true because it'll raise a QueryValidationError if the query wasn't
        # valid. We also don't want normal AND behavior at this level so we just evaluate the
        # children and return True
        for child in self.children:
            child.evaluate(query, context)
        return True
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def __init__(self, key: str, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> None:
        self.key = key
        self.value = value
        self.supplies_type = supplies_type
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> True:
        try:
            value = query[self.key]
            for type in self.types:
                if issubclass(type, Enum) and isinstance(value, str):
                    value = type(value)
                if isinstance(value, type):
                    query[self.key] = value
                    return True
            raise WrongValueTypeError("{key} must be of type {type} in query!".format(key=self.key, type=self))
        except KeyError:
            if self.child:
                self.child.evaluate(query, context)
        return True
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
        has_key = self.key in query
        if self.required and not has_key:
            raise MissingKeyError("{key} must be in query!".format(key=self.key))
        if self.child:
            self.child.evaluate(query, context)
        return has_key
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def __call__(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> True:
        return self._root.evaluate(query, context)
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def validate_query(validator: QueryValidator, *pre_transforms: Callable[[MutableMapping], None]) -> Callable[[Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]], Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]]:
    def wrapper(method: Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]) -> Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]:
        @wraps(method)
        def wrapped(self: Any, query: MutableMapping[str, Any], context: PipelineContext = None):
            for transform in pre_transforms:
                transform(query)

            validator(query)
            return method(self, query, context)

        return wrapped
    return wrapper
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def __anext__(self) -> MutableMapping:

        if len(self.__data):
            return self.__data.popleft()

        is_refereshed = await self._refresh()

        if not is_refereshed:
            raise StopAsyncIteration

        return self.__data.popleft()
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def to_list(self) -> List[MutableMapping]:
        """ Fetches all data from cursor to in-memory list.
        """
        items = []
        async with self:
            async for item in self:
                items.append(item)
        return items
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def _command(self, connection: 'aiomongo.Connection', command: Union[str, dict], value: Any=1,
                       check: bool = True, allowable_errors: Optional[List[str]] = None,
                       read_preference: Union[_ALL_READ_PREFERENCES] = ReadPreference.PRIMARY,
                       codec_options: CodecOptions = DEFAULT_CODEC_OPTIONS, **kwargs) -> MutableMapping:
        """Internal command helper."""
        if isinstance(command, str):
            command = SON([(command, value)])
        command.update(kwargs)

        return await connection.command(
            self.name, command, read_preference, codec_options, check=check,
            allowable_errors=allowable_errors
        )
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def insert_one(self, document: MutableMapping, bypass_document_validation: bool = False,
                         check_keys: bool = True) -> InsertOneResult:

        common.validate_is_document_type('document', document)

        if '_id' not in document and not isinstance(document, RawBSONDocument):
            document['_id'] = ObjectId()

        write_concern = self.write_concern.document
        acknowledged = write_concern.get('w') != 0

        connection = await self.database.client.get_connection()

        if acknowledged:
            command = SON([('insert', self.name),
                           ('ordered', True),
                           ('documents', [document])])

            if bypass_document_validation and connection.max_wire_version >= 4:
                command['bypassDocumentValidation'] = True

            result = await connection.command(
                self.database.name, command, ReadPreference.PRIMARY, self.__write_response_codec_options,
                check_keys=check_keys
            )

            helpers._check_write_command_response([(0, result)])
        else:
            if bypass_document_validation and connection.max_wire_version >= 4:
                raise OperationFailure('Cannot set bypass_document_validation with',
                                       ' unacknowledged write concern')

            _, msg, _ = message.insert(
                str(self), [document], check_keys,
                acknowledged, write_concern, False, self.__write_response_codec_options
            )
            connection.send_message(msg)

        document_id = document['_id'] if not isinstance(document, RawBSONDocument) else None

        return InsertOneResult(document_id, acknowledged)
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def __find_and_modify(self, filter: dict, projection: Optional[Union[list, dict]],
                                sort: Optional[List[tuple]], upsert: Optional[bool] = None,
                                return_document: bool = ReturnDocument.BEFORE, **kwargs) -> MutableMapping:
        """Internal findAndModify helper."""
        common.validate_is_mapping('filter', filter)
        if not isinstance(return_document, bool):
            raise ValueError('return_document must be ReturnDocument.BEFORE or ReturnDocument.AFTER')
        cmd = SON([('findAndModify', self.name),
                   ('query', filter),
                   ('new', return_document)])
        collation = validate_collation_or_none(kwargs.pop('collation', None))
        cmd.update(kwargs)
        if projection is not None:
            cmd['fields'] = helpers._fields_list_to_dict(projection, 'projection')
        if sort is not None:
            cmd['sort'] = helpers._index_document(sort)
        if upsert is not None:
            common.validate_boolean('upsert', upsert)
            cmd['upsert'] = upsert

        connection = await self.database.client.get_connection()
        if connection.max_wire_version >= 4 and 'writeConcern' not in cmd:
            wc_doc = self.write_concern.document
            if wc_doc:
                cmd['writeConcern'] = wc_doc

        out = await connection.command(
            self.database.name, cmd, ReadPreference.PRIMARY, self.codec_options,
            allowable_errors=[_NO_OBJ_ERROR], collation=collation
        )
        helpers._check_write_command_response([(0, out)])
        return out.get('value')
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.Mapping[t.Any, t.Any]:
        """Creates a JSON serializable representation of this object.

        :returns: This APIException instance as a dictionary.
        """
        ret = dict(self.rest)  # type: t.MutableMapping[t.Any, t.Any]
        ret['message'] = self.message
        ret['description'] = self.description
        ret['code'] = self.api_code.name
        return ret
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def extract(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager = None,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> t.Optional[ExtractFileTree]:
    """Extracts all files in archive with random name to uploads folder.

    :param werkzeug.datastructures.FileStorage file: The file to extract.
    :param ignore_filter: What files should be ignored in the given archive.
        This can only be None when ``handle_ignore`` is
        ``IgnoreHandling.keep``.
    :param handle_ignore: How should ignored file be handled.
    :returns: A file tree as generated by
        :py:func:`rename_directory_structure`.
    """
    if handle_ignore == IgnoreHandling.keep and ignore_filter is None:
        ignore_filter = IgnoreFilterManager([])
    elif ignore_filter is None:  # pragma: no cover
        raise ValueError

    tmpdir = extract_to_temp(
        file,
        ignore_filter,
        handle_ignore,
    )
    rootdir = tmpdir.rstrip(os.sep)
    start = rootdir.rfind(os.sep) + 1
    try:
        res = rename_directory_structure(tmpdir)[tmpdir[start:]]
        filename: str = file.filename.split('.')[0]
        if not res:
            return None
        elif len(res) > 1:
            return {filename: res if isinstance(res, list) else [res]}
        elif not isinstance(res[0], t.MutableMapping):
            return {filename: res}
        else:
            return res[0]
    finally:
        shutil.rmtree(tmpdir)
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def __init__(self, name: str) -> None:
            self.conf: t.MutableMapping[t.Any, t.Any] = {}
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def is_valid_request(
        self,
        request: t.Any,
        parameters: t.MutableMapping[str, str] = {},
        fake_method: t.Any = None,
        handle_error: bool = True
    ) -> bool:
        '''
        Validates an OAuth request using the python-oauth2 library:
            https://github.com/simplegeo/python-oauth2
        '''

        def handle(e: oauth2.Error) -> bool:
            if handle_error:
                return False
            else:
                raise e

        try:
            method, url, headers, parameters = self.parse_request(
                request, parameters, fake_method
            )

            oauth_request = oauth2.Request.from_request(
                method, url, headers=headers, parameters=parameters
            )

            oauth2.Token
            self.oauth_server.verify_request(
                oauth_request, self.oauth_consumer, {}
            )

        except oauth2.Error as e:
            return handle(e)
        except ValueError as e:
            return handle(e)
        # Signature was valid
        return True
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def parse_request(
        self,
        req: 'flask.Request',
        parameters: t.MutableMapping[str, str] = None,
        fake_method: t.Any = None
    ) -> t.Tuple[str, str, t.MutableMapping[str, str], t.MutableMapping[str,
                                                                        str]]:
        '''
        Parse Flask request
        '''
        return (req.method, req.url, dict(req.headers), req.form.copy())
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.MutableMapping[str, t.Any]:
        """Creates a JSON serializable representation of this object.
        """
        return {
            'name': self.name,
            'course': self.course,
            'id': self.id,
        }
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def set_bool(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: bool
) -> None:
    val = parser.getboolean(item)
    out[item] = bool(default if val is None else val)
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def set_float(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: float
) -> None:
    val = parser.getfloat(item)
    out[item] = float(default if val is None else val)
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def set_int(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: int
) -> None:
    val = parser.getint(item)
    out[item] = int(default if val is None else val)
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def set_str(
    out: t.MutableMapping[str, t.Any], parser: t.Any, item: str, default: str
) -> None:
    val = parser.get(item)
    out[item] = str(default if val is None else val)
项目:btle-sniffer    作者:scipag    | 项目源码 | 文件源码
def __init__(self, uuid: str, value: Optional[Sequence[int]],
                 flags: Sequence[str]):
        self.uuid = uuid
        self.value = value
        self.flags = flags
        self.descriptors: MutableMapping[str, GATTDescriptor] = dict()
项目:btle-sniffer    作者:scipag    | 项目源码 | 文件源码
def __init__(self, uuid: str, primary: bool):
        self.uuid = uuid
        self.primary = primary
        self.characteristics: MutableMapping[str, GATTCharacteristic] = dict()
项目:btle-sniffer    作者:scipag    | 项目源码 | 文件源码
def __init__(self,
                 path: str, address: str,
                 paired: bool, connected: bool, services_resolved: bool,
                 name: Optional[str] = None, device_class: Optional[int] = None,
                 appearance: Optional[int] = None, uuids: Sequence[str] = None,
                 rssi: int = None, tx_power: int = None,
                 manufacturer_data: Dict[int, Sequence[int]] = None,
                 service_data: Dict[str, Sequence[int]] = None) -> None:
        self.active = True
        self.path = path
        self.address = address
        self.paired = paired
        self.connected = connected
        self.services_resolved = services_resolved
        self.name = name
        self.device_class = device_class
        self.appearance = appearance
        self.uuids = set(uuids) if uuids is not None else set()
        self.rssis = [rssi] if rssi is not None else list()
        self.tx_power = tx_power
        self.first_seen = datetime.datetime.now()
        self.last_seen = datetime.datetime.now()
        self.services: MutableMapping[str, GATTService] = dict()

        self.manufacturer_data = dict()
        if manufacturer_data is not None:
            for k, v in manufacturer_data.items():
                self.manufacturer_data[k] = [v]

        self.service_data = dict()
        if service_data is not None:
            self.uuids = self.uuids.union(service_data.keys())
            for k, v in service_data.items():
                self.service_data[k] = [v]
项目:kubernetes-ec2-autoscaler    作者:openai    | 项目源码 | 文件源码
def list_scale_sets(self, resource_group_name: str) -> List[AzureScaleSet]:
        fifteen_minutes_ago = datetime.now(pytz.utc) - TIMEOUT_PERIOD
        filter_clause = "eventTimestamp ge '{}' and resourceGroupName eq '{}'".format(fifteen_minutes_ago, resource_group_name)
        select_clause = "authorization,status,subStatus,properties,resourceId,eventTimestamp"

        failures_by_scale_set: MutableMapping[str, List[EventData]] = {}
        for log in self._monitor_client.activity_logs.list(filter=filter_clause, select=select_clause):
            if (log.status and log.status.value == 'Failed') or (log.properties and log.properties.get('statusCode') == 'Conflict'):
                if log.authorization and log.authorization.action and 'delete' in log.authorization.action:
                    continue
                failures_by_scale_set.setdefault(log.resource_id, []).append(log)

        result = []
        for scale_set in self._compute_client.virtual_machine_scale_sets.list(resource_group_name):
            failures = sorted(failures_by_scale_set.get(scale_set.id, []), key=lambda x: x.event_timestamp, reverse=True)
            timeout_until = None
            timeout_reason = None
            for failure in failures:
                status_message = json.loads(failure.properties.get('statusMessage', "{}")) if failure.properties else {}
                error_details = status_message.get('error', {})
                if 'message' in error_details:
                    timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                    timeout_reason = error_details['message']
                    # Stop if we found a message with details
                    break
                if timeout_until is None:
                    timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                    timeout_reason = failure.sub_status.localized_value

            priority = int(scale_set.tags[PRIORITY_TAG]) if PRIORITY_TAG in scale_set.tags else None
            no_schedule_taints = json.loads(scale_set.tags.get(NO_SCHEDULE_TAINTS_TAG, '{}'))

            result.append(AzureScaleSet(scale_set.location, resource_group_name, scale_set.name, scale_set.sku.name,
                                        scale_set.sku.capacity, scale_set.provisioning_state, timeout_until=timeout_until,
                                        timeout_reason=timeout_reason, priority=priority, no_schedule_taints=no_schedule_taints))
        return result
项目:kubernetes-ec2-autoscaler    作者:openai    | 项目源码 | 文件源码
def __init__(self, delegate: AzureApi) -> None:
        self._delegate = delegate
        self._lock = RLock()
        self._instance_cache: MutableMapping[Tuple[str, str], List[AzureScaleSetInstance]] = {}
        self._scale_set_cache: MutableMapping[str, List[AzureScaleSet]] = {}
        self._remaining_instances_cache: MutableMapping[str, MutableMapping[str, int]] = {}
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
def __init__(self, guild: 'Guild',
                 channels: 'typing.MutableMapping[int, channel.Channel]'):
        """
        :param guild: The :class:`~.Guild` object that owns this wrapper.
        :param channels: The dictionary of channels that this wrapper contains.
        """
        self._guild = guild
        self._channels = channels
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
def __init__(self, guild: 'Guild',
                 roles: 'typing.MutableMapping[int, role.Role]'):
        """
        :param guild: The :class:`~.Guild` object that owns this wrapper.
        :param roles: The dictionary of roles that this wrapper contains.
        """
        self._guild = guild
        self._roles = roles
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
def __init__(self, guild: 'Guild',
                 emojis: 'typing.MutableMapping[int, dt_emoji.Emoji]'):
        """
        :param guild: The :class:`.Guild` object that owns this wrapper.
        :param emojis: The dictionary of emojis that this wrapper contains.
        """
        self._guild = guild
        self._emojis = emojis
项目:data-store    作者:HumanCellAtlas    | 项目源码 | 文件源码
def remove_versions(self, versions: typing.MutableMapping[str, str]):
        """
        Remove this document from each given index provided that it contains the given version of this document.
        """
        es_client = ElasticsearchClient.get(self.logger)
        num_ok, errors = bulk(es_client, raise_on_error=False, actions=[{
            '_op_type': 'delete',
            '_index': index_name,
            '_type': ESDocType.doc.name,
            '_version': version,
            '_id': str(self.fqid),
        } for index_name, version in versions.items()])
        for item in errors:
            self.logger.warning(f"Document deletion failed: {json.dumps(item)}")