Python typing 模块,MutableSequence() 实例源码


项目    作者:CodeGra-de    | 项目源码 | 文件源码
def dehead_filetree(tree: ExtractFileTree) -> ExtractFileTree:
    """Remove the head of the given filetree while preserving the old head

    So a tree ``{1: [2: [3: [4: [f1, f2]]]}`` will be converted to
    ``{1: [f1, f2]}``.

    :param dict tree: The file tree as generated by :py:func:`extract`.
    :returns: The same tree but deheaded as described.
    :rtype: dict
    assert len(tree) == 1
    head_node = list(tree.keys())[0]
    head = tree[head_node]

    while (
        isinstance(head, t.MutableSequence) and len(head) == 1 and
        isinstance(head[0], t.MutableMapping) and len(head[0]) == 1
        head = list(head[0].values())[0]

    tree[head_node] = head

    return tree
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def get_grade_history(submission_id: int
                      ) -> JSONResponse[t.Sequence[models.GradeHistory]]:
    """Get the grade history for the given submission.

    .. :quickref: Submission; Get the grade history for the given submission.

    :returns: A list of :class:`.models.GradeHistory` object serialized to
        json for the given assignment.
    :raises PermissionException: If the current user has no permission to see
        the grade history. (INCORRECT_PERMISSION)
    work = helpers.get_or_404(models.Work, submission_id)

    auth.ensure_permission('can_see_grade_history', work.assignment.course_id)

    hist: t.MutableSequence[models.GradeHistory]
    hist = db.session.query(
        models.GradeHistory.changed_at.desc(),  # type: ignore

    return jsonify(hist)
项目    作者:iotaledger    | 项目源码 | 文件源码
def address_from_digest(digest):
    # type: (Digest) -> Address
    Generates an address from a private key digest.
    address_trits = [0] * (Address.LEN * TRITS_PER_TRYTE) # type: MutableSequence[int]

    sponge = Kerl()

    return Address.from_trits(
      trits = address_trits,

      key_index       = digest.key_index,
      security_level  = digest.security_level,
项目:puppeter    作者:coi-gov-pl    | 项目源码 | 文件源码
def __produce_commands(self, install):
        # type: (After4xCollectionInstaller) -> Sequence[str]
        args = []  # type: MutableSequence[str]
        mem = install.puppetserver_jvm_memory()
        jvmargs = install.puppetserver_jvm_args()
        if mem.is_set():
            if mem.heap_minimum() is not None:
            if mem.heap_maximum() is not None:
            java_version = Facter.get(JavaVersion)
            metaspace_arg = self.__get_metaspace_arg(java_version)
            if java_version.has_permgen_space() and mem.metaspace_maximum() is None:
            elif mem.metaspace_maximum() is not None:
        if jvmargs.are_set():
        args_as_str = ' '.join(args)
        collector = self._collector()
        mapping = dict(jvmargs=args_as_str)
        collector.collect_from_template('Configuring PuppetServer JVM Args',
        return collector.lines()
项目:puppeter    作者:coi-gov-pl    | 项目源码 | 文件源码
def segments(self):
        # type: () -> MutableSequence[int|str]
        return list(self.__get_segments())
项目:lymph-schema    作者:deliveryhero    | 项目源码 | 文件源码
def _to_jsonschema(type_):
    if isinstance(type_, marshmallow.Schema):
        return _jsonschema.dump_schema(type_)
    elif type_ in six.integer_types:
        return {'type': 'number', 'format': 'integer'}
    elif type_ == float:
        return {'type': 'number', 'format': 'float'}
    elif type_ == decimal.Decimal:
        return {'type': 'string', 'format': 'decimal'}
    elif type_ == uuid.UUID:
        return {'type': 'string', 'format': 'uuid'}
    elif type_ == datetime.datetime:
        return {'type': 'string', 'format': 'date-time'}
    elif type_ ==
        return {'type': 'string', 'format': 'date'}
    elif type_ == datetime.time:
        return {'type': 'string', 'format': 'time'}
    elif type_ == dict:
        return {'type': 'object'}
    elif type_ == six.text_type or type_ == six.binary_type:
        return {'type': 'string'}
    elif type_ is None:
        return {'type': 'null'}
    elif type_ == list:
        return {'type': 'array'}
    elif type_ == bool:
        return {'type': 'boolean'}
    elif issubclass(type_, typing.MutableSequence[typing.T]):
        items_type = type_.__parameters__[0]
        if issubclass(items_type, marshmallow.Schema):
            items_type = items_type()
        return {
            'type': 'array',
            'items': _to_jsonschema(items_type),
        raise ValueError('unsupported return type: %s' % type_)
项目:TopChef    作者:TopChef    | 项目源码 | 文件源码
def __init__(self, random_jobs: MutableSequence[JobInterface]):
        self._jobs = { job for job in random_jobs}
项目    作者:iotaledger    | 项目源码 | 文件源码
def squeeze(self, trits):
    # type: (MutableSequence[int]) -> None
    Squeeze trits from the sponge.

    :param trits:
      Sequence that the squeezed trits will be copied to.
      Note: this object will be modified!
    # Squeeze is kind of like the opposite of absorb; it copies trits
    # from internal state to the ``trits`` parameter, one hash at a
    # time, and transforming internal state in between hashes.
    # However, only the first hash of the state is "public", so we
    # can simplify the implementation somewhat.

    # Ensure that ``trits`` can hold at least one hash worth of trits.
    trits.extend([0] * max(0, HASH_LENGTH - len(trits)))

    # Copy exactly one hash.
    trits[0:HASH_LENGTH] = self._state[0:HASH_LENGTH]

    # One hash worth of trits copied; now transform.
项目    作者:iotaledger    | 项目源码 | 文件源码
def __next__(self):
    # type: () -> TryteString
    Returns the next signature fragment.
    key_trytes = next(self._key_chunks) # type: TryteString
    self._iteration += 1

    # If the key is long enough, loop back around to the start.
    normalized_chunk =\
      self._normalized_hash[self._iteration % len(self._normalized_hash)]

    signature_fragment = key_trytes.as_trits()

    # Build the signature, one hash at a time.
    for i in range(key_trytes.count_chunks(Hash.LEN)):
      hash_start  = i * HASH_LENGTH
      hash_end    = hash_start + HASH_LENGTH

      buffer = signature_fragment[hash_start:hash_end] # type: MutableSequence[int]

      for _ in range(13 - normalized_chunk[i]):

      signature_fragment[hash_start:hash_end] = buffer

    return TryteString.from_trits(signature_fragment)
项目:data-store    作者:HumanCellAtlas    | 项目源码 | 文件源码
def __init__(
            template_map: typing.MutableSequence[typing.Tuple[str, int]]=None,
    ) -> None:
        self.template_map = list() if template_map is None else template_map
项目:wuye.vim    作者:zhaoyingnan911    | 项目源码 | 文件源码
def we_can_has_sequence(p, q, r, s, t, u):
    :type p: typing.Sequence[int]
    :type q: typing.Sequence[B]
    :type r: typing.Sequence[int]
    :type s: typing.Sequence["int"]
    :type t: typing.MutableSequence[dict]
    :type u: typing.List[float]
    #? ["count"]
    #? int()
    #? ["count"]
    #? B()
    #? ["count"]
    #? int()
    #? ["count"]
    #? int()
    #? []
    #? ["append"]
    #? dict()
    #? ["append"]
    #? float()
项目:puckfetcher    作者:andrewmichaud    | 项目源码 | 文件源码
def __init__(self, feedstate_dict: Mapping[str, Any]=None) -> None:
        if feedstate_dict is not None:
            LOG.debug("Successfully loaded feed state dict.")

            self.feed = feedstate_dict.get("feed", {})
            self.entries = feedstate_dict.get("entries", [])
            self.entries_state_dict = feedstate_dict.get("entries_state_dict", {})
            self.queue = collections.deque(feedstate_dict.get("queue", []))

            # Store the most recent SUMMARY_LIMIT items we've downloaded.
            temp_list = feedstate_dict.get("summary_queue", [])
            self.summary_queue: MutableSequence[Dict[str, Any]] = collections.deque(

            # When we load from the cache file, mark all of the items in the summary queue as not
            # being from the current session.
            for elem in temp_list:
                elem["is_this_session"] = False

            last_modified = feedstate_dict.get("last_modified", None)

            self.etag = feedstate_dict.get("etag", None)
            self.latest_entry_number = feedstate_dict.get("latest_entry_number", None)

            LOG.debug("Did not successfully load feed state dict.")
            LOG.debug("Creating blank dict.")

            self.feed = {}
            self.entries = []
            self.entries_state_dict = {}
            self.queue = collections.deque([])
            self.summary_queue = collections.deque([], SUMMARY_LIMIT)
            self.last_modified: Any = None
            self.etag: str = None
            self.latest_entry_number = None
项目    作者:iotaledger    | 项目源码 | 文件源码
def absorb(self, trits, offset=0, length=None):
    # type: (MutableSequence[int], int, Optional[int]) -> None
    Absorb trits into the sponge from a buffer.

    :param trits:
      Buffer that contains the trits to absorb.

    :param offset:
      Starting offset in ``trits``.

    :param length:
      Number of trits to absorb.  Defaults to ``len(trits)``.
    # Pad input if necessary, so that it can be divided evenly into
    # hashes.
    # Note that this operation creates a COPY of ``trits``; the
    # incoming buffer is not modified!
    pad = ((len(trits) % TRIT_HASH_LENGTH) or TRIT_HASH_LENGTH)
    trits += [0] * (TRIT_HASH_LENGTH - pad)

    if length is None:
      length = len(trits)

    if length < 1:
      raise with_context(
        exc = ValueError('Invalid length passed to ``absorb``.'),

        context = {
          'trits': trits,
          'offset': offset,
          'length': length,

    while offset < length:
      stop = min(offset + TRIT_HASH_LENGTH, length)

      # If we're copying over a full chunk, zero last trit
      if stop - offset == TRIT_HASH_LENGTH:
        trits[stop - 1] = 0

      signed_nums = conv.convertToBytes(trits[offset:stop])

      # Convert signed bytes into their equivalent unsigned representation
      # In order to use Python's built-in bytes type
      unsigned_bytes = bytearray(conv.convert_sign(b) for b in signed_nums)


      offset += TRIT_HASH_LENGTH
项目    作者:iotaledger    | 项目源码 | 文件源码
def squeeze(self, trits, offset=0, length=None):
    # type: (MutableSequence[int], int, Optional[int]) -> None
    Squeeze trits from the sponge into a buffer.

    :param trits:
      Buffer that will hold the squeezed trits.

      IMPORTANT:  If ``trits`` is too small, it will be extended!

    :param offset:
      Starting offset in ``trits``.

    :param length:
      Number of trits to squeeze from the sponge.

      If not specified, defaults to :py:data:`TRIT_HASH_LENGTH` (i.e.,
      by default, we will try to squeeze exactly 1 hash).
    # Pad input if necessary, so that it can be divided evenly into
    # hashes.
    pad = ((len(trits) % TRIT_HASH_LENGTH) or TRIT_HASH_LENGTH)
    trits += [0] * (TRIT_HASH_LENGTH - pad)

    if length is None:
      # By default, we will try to squeeze one hash.
      # Note that this is different than ``absorb``.
      length = len(trits) or TRIT_HASH_LENGTH

    if length < 1:
      raise with_context(
        exc = ValueError('Invalid length passed to ``squeeze``.'),

        context = {
          'trits': trits,
          'offset': offset,
          'length': length,

    while offset < length:
      unsigned_hash = self.k.digest()

      if PY2:
        unsigned_hash = map(ord, unsigned_hash) # type: ignore

      signed_hash = [conv.convert_sign(b) for b in unsigned_hash]

      trits_from_hash = conv.convertToTrits(signed_hash)
      trits_from_hash[TRIT_HASH_LENGTH - 1] = 0

      stop = min(TRIT_HASH_LENGTH, length-offset)
      trits[offset:offset+stop] = trits_from_hash[0:stop]

      flipped_bytes = bytearray(conv.convert_sign(~b) for b in unsigned_hash)

      # Reset internal state before feeding back in

      offset += TRIT_HASH_LENGTH
项目    作者:iotaledger    | 项目源码 | 文件源码
def get_digest(self):
    # type: () -> Digest
    Generates the digest used to do the actual signing.

    Signing keys can have variable length and tend to be quite long,
    which makes them not-well-suited for use in crypto algorithms.

    The digest is essentially the result of running the signing key
    through a PBKDF, yielding a constant-length hash that can be used
    for crypto.
    hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN

    key_fragments = self.iter_chunks(FRAGMENT_LENGTH)

    # The digest will contain one hash per key fragment.
    digest = [0] * HASH_LENGTH * len(key_fragments)

    # Iterate over each fragment in the key.
    for (i, fragment) in enumerate(key_fragments): # type: Tuple[int, TryteString]
      fragment_trits = fragment.as_trits()

      key_fragment  = [0] * FRAGMENT_LENGTH
      hash_trits    = []

      # Within each fragment, iterate over one hash at a time.
      for j in range(hashes_per_fragment):
        hash_start  = j * HASH_LENGTH
        hash_end    = hash_start + HASH_LENGTH
        hash_trits  = fragment_trits[hash_start:hash_end] # type: MutableSequence[int]

        for k in range(26):
          sponge = Kerl()

        key_fragment[hash_start:hash_end] = hash_trits

      # After processing all of the hashes in the fragment, generate a
      # final hash and append it to the digest.
      # Note that we will do this once per fragment in the key, so the
      # longer the key is, the longer the digest will be.
      sponge = Kerl()

      fragment_start  = i * FRAGMENT_LENGTH
      fragment_end    = fragment_start + FRAGMENT_LENGTH

      digest[fragment_start:fragment_end] = hash_trits

    return Digest(TryteString.from_trits(digest), self.key_index)