Python typing 模块,Generator() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Generator()

项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def __init__(self,
                 indent_step=4,
                 indent_char=' ',
                 repr_strings=False,
                 simple_cutoff=10,
                 width=120,
                 yield_from_generators=True):
        self._indent_step = indent_step
        self._c = indent_char
        self._repr_strings = repr_strings
        self._repr_generators = not yield_from_generators
        self._simple_cutoff = simple_cutoff
        self._width = width
        self._type_lookup = [
            (dict, self._format_dict),
            (str, self._format_str),
            (bytes, self._format_bytes),
            (tuple, self._format_tuples),
            ((list, set, frozenset), self._format_list_like),
            (collections.Generator, self._format_generators),
        ]
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def get_meta(self) -> t.Generator[t.Dict, None, None]:
        all_fields = self.model._meta.get_fields(
            include_parents=self.include_parents, include_hidden=self.include_hidden
        )
        for f in all_fields:
            if f.name in self.exclude:
                continue
            if self.fields and f.name not in self.fields:
                continue

            if f.name not in self.fields:
                if f.concrete not in self.concrete_in:
                    continue
                if f.auto_created not in self.auto_created_in:
                    continue
                if f.editable not in self.editable_in:
                    continue

            yield self.get_field_meta(f)
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def enumerate(self, item: Any, reverse: bool = False) -> Generator[Tuple[int, Any], None, None]:
        items = self
        if reverse:
            max = len(items) - 1
            items = reversed(items)
        for index, x in enumerate(items):
            if x == item:
                yield max - index if reverse else index, x
                continue

            try:
                if item in x:
                    yield max - index if reverse else index, x
            except TypeError:
                # x doesn't define __contains__
                pass
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def enumerate(self, item: Any) -> Generator[Tuple[Any, Any], None, None]:
        for key, value in self.items():
            if key == item:
                yield key, value
                continue

            try:
                if item in key:
                    yield key, value
                    continue
            except TypeError:
                # key doesn't define __contains__
                pass

            if value == item:
                yield key, value
                continue

            try:
                if item in value:
                    yield key, value
                    continue
            except TypeError:
                # value doesn't define __contains__
                pass
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
def get_columns(self, table_name: str = None) \
            -> 'typing.Generator[md_column.Column, None, None]':
        """
        Yields a :class:`.md_column.Column` for each column in the specified table,
        or for each column in the schema if no table is specified.

        These columns don't point to a :class:`.md_table.Table` since there
        might not be one, but accessing __name__ and __tablename__ of the column's
        table will still work as expected.

        :param table_name: The table to get indexes from, or all tables if omitted
        """
        params = {"table_name": table_name}
        emitter = self.bind.emit_param

        sql = self.bind.dialect.get_column_sql(table_name, emitter=emitter)
        cur = await self.transaction.cursor(sql, params)
        records = await cur.flatten()
        await cur.close()

        return self.bind.dialect.transform_rows_to_columns(*records, table_name=table_name)
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
def get_indexes(self, table_name: str = None) \
            -> 'typing.Generator[md_index.Index, None, None]':
        """
        Yields a :class:`.md_index.Index` for each index in the specified table,
        or for each index in the schema if no table is specified.

        These indexes don't point to a :class:`.md_table.Table` since there
        might not be one, but they have a table_name attribute.

        :param table_name: The table to get indexes from, or all tables if omitted
        """
        params = {"table_name": table_name}
        emitter = self.bind.emit_param

        sql = self.bind.dialect.get_index_sql(table_name, emitter=emitter)
        cur = await self.transaction.cursor(sql, params)
        records = await cur.flatten()
        await cur.close()

        return self.bind.dialect.transform_rows_to_indexes(*records, table_name=table_name)
项目:rcli    作者:contains-io    | 项目源码 | 文件源码
def _get_module_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by the python module.

    Module commands consist of a docopt-style module docstring and a callable
    Command class.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    cls = next((n for n in module.body
                if isinstance(n, ast.ClassDef) and n.name == 'Command'), None)
    if not cls:
        return
    methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
    if '__call__' not in methods:
        return
    docstring = ast.get_docstring(module)
    for commands, _ in usage.parse_commands(docstring):
        yield _EntryPoint(commands[0], next(iter(commands[1:]), None), None)
项目:rcli    作者:contains-io    | 项目源码 | 文件源码
def _get_class_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by python classes in the module.

    Class commands are detected by inspecting all callable classes in the
    module for docopt-style docstrings.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    nodes = (n for n in module.body if isinstance(n, ast.ClassDef))
    for cls in nodes:
        methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
        if '__call__' in methods:
            docstring = ast.get_docstring(cls)
            for commands, _ in usage.parse_commands(docstring):
                yield _EntryPoint(commands[0], next(iter(commands[1:]), None),
                                  cls.name)
项目:rcli    作者:contains-io    | 项目源码 | 文件源码
def _get_function_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by python functions in the module.

    Function commands consist of all top-level functions that contain
    docopt-style docstrings.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    nodes = (n for n in module.body if isinstance(n, ast.FunctionDef))
    for func in nodes:
        docstring = ast.get_docstring(func)
        for commands, _ in usage.parse_commands(docstring):
            yield _EntryPoint(commands[0], next(iter(commands[1:]), None),
                              func.name)
项目:flake8-sql    作者:pgjones    | 项目源码 | 文件源码
def _check_query_words(
            self, query: ast.Str, parser: Parser,
    ) -> Generator[Tuple[int, int, str, type], Any, None]:
        for token in parser:
            word = token.value
            if token.is_keyword or token.is_function_name:
                if not word.isupper() and word.upper() not in self.excepted_names:
                    yield(
                        query.lineno, query.col_offset,
                        "Q440 keyword {} is not uppercase".format(word),
                        type(self),
                    )
                if word.upper() in ABBREVIATED_KEYWORDS:
                    yield(
                        query.lineno, query.col_offset,
                        "Q442 avoid abbreviated keywords, {}".format(word),
                        type(self),
                    )
            elif token.is_name and (not word.islower() or word.endswith('_')):
                yield(
                    query.lineno, query.col_offset,
                    "Q441 name {} is not valid, must be snake_case, and cannot "
                    "end with `_`".format(word),
                    type(self),
                )
项目:sockeye    作者:awslabs    | 项目源码 | 文件源码
def lexicon_iterator(path: str,
                     vocab_source: Dict[str, int],
                     vocab_target: Dict[str, int]) -> Generator[Tuple[int, int, float], None, None]:
    """
    Yields lines from a translation table of format: src, trg, logprob.

    :param path: Path to lexicon file.
    :param vocab_source: Source vocabulary.
    :param vocab_target: Target vocabulary.
    :return: Generator returning tuples (src_id, trg_id, prob).
    """
    assert C.UNK_SYMBOL in vocab_source
    assert C.UNK_SYMBOL in vocab_target
    src_unk_id = vocab_source[C.UNK_SYMBOL]
    trg_unk_id = vocab_target[C.UNK_SYMBOL]
    with smart_open(path) as fin:
        for line in fin:
            src, trg, logprob = line.rstrip("\n").split("\t")
            prob = np.exp(float(logprob))
            src_id = vocab_source.get(src, src_unk_id)
            trg_id = vocab_target.get(trg, trg_unk_id)
            yield src_id, trg_id, prob
项目:imap_tools    作者:ikvk    | 项目源码 | 文件源码
def _uid_str(uid_list: str or [str] or Generator) -> str:
        """
        Prepare list of uid for use in commands: delete/copy/move/seen
        uid_list can be: str, list, tuple, set, fetch generator
        """
        if not uid_list:
            raise MailBox.MailBoxUidParamError('uid_list should not be empty')
        if type(uid_list) is str:
            uid_list = uid_list.split(',')
        if inspect.isgenerator(uid_list):
            uid_list = [msg.uid for msg in uid_list if msg.uid]
        if type(uid_list) not in (list, tuple, set):
            raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
        for uid in uid_list:
            if type(uid) is not str:
                raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
            if not uid.strip().isdigit():
                raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
        return ','.join((i.strip() for i in uid_list))
项目:imap_tools    作者:ikvk    | 项目源码 | 文件源码
def get_attachments(self) -> Generator:
        """
        Attachments of the mail message (generator)
        :return: generator of tuple(filename: str, payload: bytes)
        """
        for part in self.obj.walk():
            # multipart/* are just containers
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                continue
            filename = part.get_filename()
            if not part.get_filename():
                continue  # this is what happens when Content-Disposition = inline
            filename = self._decode_value(*decode_header(filename)[0])
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            yield filename, payload
项目:iota.lib.py    作者:iotaledger    | 项目源码 | 文件源码
def create_iterator(self, start=0, step=1):
    # type: (int, int) -> Generator[Address]
    """
    Creates an iterator that can be used to progressively generate new
    addresses.

    :param start:
      Starting index.

      Warning: This method may take awhile to reset if ``start``
      is a large number!

    :param step:
      Number of indexes to advance after each address.

      Warning: The generator may take awhile to advance between
      iterations if ``step`` is a large number!
    """
    key_iterator = (
      KeyGenerator(self.seed)
        .create_iterator(start, step, self.security_level)
    )

    while True:
      yield self._generate_address(key_iterator)
项目:iota.lib.py    作者:iotaledger    | 项目源码 | 文件源码
def iter_used_addresses(adapter, seed, start):
  # type: (BaseAdapter, Seed, int) -> Generator[Tuple[Address, List[TransactionHash]]]
  """
  Scans the Tangle for used addresses.

  This is basically the opposite of invoking ``getNewAddresses`` with
  ``stop=None``.
  """
  ft_command = FindTransactionsCommand(adapter)

  for addy in AddressGenerator(seed).create_iterator(start):
    ft_response = ft_command(addresses=[addy])

    if ft_response['hashes']:
      yield addy, ft_response['hashes']
    else:
      break

    # Reset the command so that we can call it again.
    ft_command.reset()
项目:pypeerassets    作者:PeerAssets    | 项目源码 | 文件源码
def test_find_deck_spawns(prov):

    if prov == "holy":
        provider = Holy(network="peercoin-testnet")

    if prov == "mintr":
        provider = Mintr(network="peercoin")

    if prov == "cryptoid":
        provider = Cryptoid(network="peercoin")

    try:
        if prov == "rpc":
            provider = RpcNode(testnet=True)
    except:
        print("No RpcNode avaliable.")

    assert isinstance(find_deck_spawns(provider), Generator)
项目:django_stored_procedures    作者:derfenix    | 项目源码 | 文件源码
def _generate_conditions(self,
                             filters: Generator[Tuple[str, RawSQLFilter], None, None]) -> Generator[str, None, None]:
        """
        Returns generator, yields raw-sql conditions strings

        E.g. 'field_name >= %s`

        :param filters: Generator with filter's name and `RawSQLFilter` instance
        """
        for name, filter_ in filters:
            conds_and_values = self._request_filters.get(name)
            if conds_and_values:
                for condition, value in conds_and_values:
                    try:
                        sql = filter_.filter(name, condition, value)
                    except ValidationError as e:
                        raise ValidationError('Exception raised for {}: {}'.format(name, e))
                    yield sql
            elif filter_.default is not None:
                self.params = filter_.default
                yield "{} = %s".format(name)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
    """Context manager for manipulation of existing mod-pack.

    Keyword arguments:
        path: Path to the existing ModPack file, which should be provided.

    Yields:
        ModPack loaded from path. If no exception occurs, the provided modpack
        is written (with changes) back to the file on context exit.
    """

    with path.open(encoding='utf-8', mode='r') as istream:
        mp = ModPack.load(istream)

    yield mp

    with path.open(encoding='utf-8', mode='w') as ostream:
        mp.dump(ostream)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def filter_obsoletes(
        self: 'ModPack',
        files: Iterable[File]
    ) -> Generator[File, None, None]:
        """Filter obsolete files.

        Obsolete files are defined as being already installed, or being
        an older version of already installed files.

        Keyword arguments:
            files: Iterable of mod :class:`File`s to filter.

        Yields:
            Original files without the obsoletes.
        """

        for file in files:
            current = self.installed.get(file.mod.id, None)

            if current is None or current.date < file.date:
                yield file
            else:
                continue
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def orphans(self: 'ModPack', mods: Mapping[int, Mod]=None) -> Generator[File, None, None]:
        """Finds all no longer needed dependencies.

        Keyword arguments:
            mods: Optional mapping of installed mods [default: self.mods].
                The purpose of this parameter is to be able to override
                really installed mods without changing the property directly.

        Yields:
            Orphaned files.
        """

        if mods is None:
            mods = self.mods

        needed = {}
        for file in mods.values():
            needed.update(resolve(file, pool=self.installed))

        # Filter unneeded dependencies
        yield from (
            file for m_id, file in self.dependencies.items()
            if m_id not in needed
        )
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def populate(comments: Sequence['Comment'], authors: Sequence['People'],
                 count=100) -> Generator['Article', None, None]:
        import mimesis

        aid = mimesis.Numbers()
        article = mimesis.Text()
        answers = list(comments)

        def get_random_answers(max):
            counter = 0
            while answers and counter < max:
                yield answers.pop(random.randint(0, len(answers) - 1))
                counter += 1

        return (
            Article(
                id=aid.between(1, count),
                title=article.title(),
                author=random.choice(authors),
                comments=[c for c in get_random_answers(random.randint(1, 10))]
            )
            for _ in range(count)
        )
项目:ohneio    作者:acatton    | 项目源码 | 文件源码
def peek(nbytes=0) -> typing.Generator[_Action, Buffer, bytes]:
    """Read output without consuming it.

    Read but **does not** consume data from the protocol input.

    This is a *non-blocking* primitive, if less data than requested is available,
    less data is returned. It is meant to be used in combination with :func:`~ohneio.wait`, but
    edge cases and smart people can (and most likely will) prove me wrong.

    Args:
        nbytes (:obj:`int`, optional): amount of bytes to read *at most*. ``0`` meaning all bytes.

    Returns:
        bytes: data read from the buffer
    """
    input_ = yield _get_input
    return input_.peek(nbytes)
项目:libiocage    作者:iocage    | 项目源码 | 文件源码
def exec_iter(
    command: typing.List[str],
    logger: typing.Optional[iocage.lib.Logger.Logger]=None
) -> typing.Generator[str, None, None]:

    process = exec_raw(
        command,
        logger=logger,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True
    )

    for stdout_line in iter(process.stdout.readline, ""):
        yield stdout_line

    process.stdout.close()

    return_code = process.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, command)
项目:libiocage    作者:iocage    | 项目源码 | 文件源码
def __iter__(
        self
    ) -> typing.Generator[Resource, None, None]:

        for child_dataset in self.dataset.children:

            name = self._get_asset_name_from_dataset(child_dataset)
            if self._filters is not None and \
               self._filters.match_key("name", name) is not True:
                # Skip all jails that do not even match the name
                continue

            # ToDo: Do not load jail if filters do not require to
            resource = self._get_resource_from_dataset(child_dataset)
            if self._filters is not None:
                if self._filters.match_resource(resource):
                    yield resource
项目:libiocage    作者:iocage    | 项目源码 | 文件源码
def _print_list(
    resources: typing.Generator[
        iocage.lib.Jails.JailsGenerator,
        None,
        None
    ],
    columns: list,
    show_header: bool,
    separator: str=";"
) -> None:

    if show_header is True:
        print(separator.join(columns).upper())

    for resource in resources:
        print(separator.join(_lookup_resource_values(resource, columns)))
项目:libiocage    作者:iocage    | 项目源码 | 文件源码
def _print_json(
    resources: typing.Generator[
        iocage.lib.Jails.JailsGenerator,
        None,
        None
    ],
    columns: list,
    **json_dumps_args
):

    if "indent" not in json_dumps_args.keys():
        json_dumps_args["indent"] = 2

    if "sort_keys" not in json_dumps_args.keys():
        json_dumps_args["sort_keys"] = True

    output = []

    for resource in resources:
        output.append(dict(zip(
            columns,
            _lookup_resource_values(resource, columns)
        )))

    print(json.dumps(output, **json_dumps_args))
项目:binaryalert    作者:airbnb    | 项目源码 | 文件源码
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
    """Read a file in fixed-size chunks (to minimize memory usage for large files).

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Max size (in bytes) of each file chunk.

    Yields:
        File chunks, each of size at most chunk_size.
    """
    while True:
        chunk = file_object.read(chunk_size)
        if chunk:
            yield chunk
        else:
            return  # End of file.
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def get_output(self, limit: int = None) -> Generator:
        if limit is None:
            per_replica = None
        else:
            per_replica = round(limit / self.num_replicas)
            if per_replica == 0:
                logger.debug("{} forcibly setting replica "
                             "message limit to {}"
                             .format(self._node.name,
                                     per_replica))
                per_replica = 1
        for replica in self._replicas:
            num = 0
            while replica.outBox:
                yield replica.outBox.popleft()
                num += 1
                if per_replica and num >= per_replica:
                    break
项目:amino    作者:tek    | 项目源码 | 文件源码
def untyped_do(f: Callable[..., Generator[G, B, None]]) -> Callable[..., G]:
    @functools.wraps(f)
    def do_loop(*a: Any, **kw: Any) -> F[B]:
        itr = f(*a, **kw)
        if not isinstance(itr, GeneratorType):
            raise Exception(f'function `{f.__qualname__}` decorated with `do` does not produce a generator')
        init = itr.send(None)
        m = Monad.fatal_for(init)
        @functools.wraps(f)
        def loop(val: B) -> F[B]:
            try:
                return m.flat_map(itr.send(val), loop)
            except StopIteration:
                return m.pure(val)
        return m.flat_map(init, loop)
    return do_loop
项目:amino    作者:tek    | 项目源码 | 文件源码
def _drain_find(self, abort: Callable[[A], bool]) -> Maybe[A]:
        culprit = Empty()
        def gen() -> Generator:
            nonlocal culprit
            while True:
                try:
                    el = next(self.source)
                    yield el
                    if abort(el):
                        culprit = Just(el)
                        break
                except StopIteration:
                    break
        drained = List.wrap(list(gen()))
        self.strict = self.strict + drained
        return culprit
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def make_find_check_resolve_submit(finder: Finder, notSubmittedCheck: Checker, resolver: Resolver, submitter: Submitter) -> Processor:
    def inner(text: str) -> Generator:
        for found in finder(text):
            print(found)
            if notSubmittedCheck(found):
                resolved = resolver(found)
                yield submitter(found, resolved)
    return inner
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def _process_args(self, func_ast, code_lines, args, kwargs) -> Generator[DebugArgument, None, None]:  # noqa: C901
        arg_offsets = list(self._get_offsets(func_ast))
        for arg, ast_node, i in zip(args, func_ast.args, range(1000)):
            if isinstance(ast_node, ast.Name):
                yield self.output_class.arg_class(arg, name=ast_node.id)
            elif isinstance(ast_node, self.complex_nodes):
                # TODO replace this hack with astor when it get's round to a new release
                start_line, start_col = arg_offsets[i]

                if i + 1 < len(arg_offsets):
                    end_line, end_col = arg_offsets[i + 1]
                else:
                    end_line, end_col = len(code_lines) - 1, None

                name_lines = []
                for l_ in range(start_line, end_line + 1):
                    start_ = start_col if l_ == start_line else 0
                    end_ = end_col if l_ == end_line else None
                    name_lines.append(
                        code_lines[l_][start_:end_].strip(' ')
                    )
                yield self.output_class.arg_class(arg, name=' '.join(name_lines).strip(' ,'))
            else:
                yield self.output_class.arg_class(arg)

        kw_arg_names = {}
        for kw in func_ast.keywords:
            if isinstance(kw.value, ast.Name):
                kw_arg_names[kw.arg] = kw.value.id
        for name, value in kwargs.items():
            yield self.output_class.arg_class(value, name=name, variable=kw_arg_names.get(name))
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def _format(self, value: Any, indent_current: int, indent_first: bool):
        if indent_first:
            self._stream.write(indent_current * self._c)

        value_repr = repr(value)
        if len(value_repr) <= self._simple_cutoff and not isinstance(value, collections.Generator):
            self._stream.write(value_repr)
        else:
            indent_new = indent_current + self._indent_step
            for t, func in self._type_lookup:
                if isinstance(value, t):
                    func(value, value_repr, indent_current, indent_new)
                    return
            self._format_raw(value, value_repr, indent_current, indent_new)
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def _format_generators(self, value: Generator, value_repr: str, indent_current: int, indent_new: int):
        if self._repr_generators:
            self._stream.write(value_repr)
        else:
            self._stream.write('(\n')
            for v in value:
                self._format(v, indent_new, True)
                self._stream.write(',\n')
            self._stream.write(indent_current * self._c + ')')
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def get_meta(self) -> t.Generator[t.Dict, None, None]:
        """
        :return: generator
        list(metadata_obj.get_meta()) -> [abstract_field_obj1, abstract_field_obj2, ...]
        """
        fields_by_name = OrderedDict()
        for field_data in self.fields:
            fields_by_name[field_data['name']] = field_data

        for k, v_callable in self.__class__.__dict__.items():
            # method `get_field_<NAME>` used for updates later
            if k.startswith('get_field_'):  # get_field_ ????????? ???????????? ????
                continue
            if not k.startswith('get_'):
                continue
            # check dynamic get_%s fields
            # method get_%s must return {'name': '<NAME>'}, where <name> is a real field name
            res = v_callable(self, self.request)
            fields_by_name[res['name']] = res

        fields_order = self.order or fields_by_name.keys()

        for field_name in fields_order:
            field_value = fields_by_name[field_name]
            # method should update field with returned dict
            method = getattr(self, 'get_field_%s' % field_name, None)
            if callable(method):
                field_value.update(method(field_name, self.request))

            yield field_value

    # noinspection PyMethodMayBeStatic,PyUnusedLocal
项目:fauxmo-plugins    作者:n8henrie    | 项目源码 | 文件源码
def restapiplugin_target() -> Generator:
    """Simulate the endpoints triggered by RESTAPIPlugin."""
    fauxmo_device = Process(target=httpbin.core.app.run,
                            kwargs={"host": "127.0.0.1", "port": 8000},
                            daemon=True)

    fauxmo_device.start()
    time.sleep(1)

    yield

    fauxmo_device.terminate()
    fauxmo_device.join()
项目:proxenos    作者:darvid    | 项目源码 | 文件源码
def srand(seed=0):
    # type: (KeyType) -> typing.Generator[int, None, None]
    if isinstance(seed, six.string_types) or isinstance(seed, bytes):
        if isinstance(seed, six.text_type):
            seed = seed.encode('utf-8')
        seed_int = int(hashlib.sha512(seed).hexdigest(), 16)
        seed = typing.cast(int, seed_int)
    rng = random.Random(seed)
    while True:
        yield rng.randint(0, sys.maxsize)
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def _get_many_generator(self, result: Iterable[S], context: PipelineContext = None) -> Generator[T, None, None]:
        for item in result:
            LOGGER.info("Sending item \"{item}\" to sinks before converting".format(item=item))
            for sink in self._before_transform:
                sink.put(item, context)

            LOGGER.info("Converting item \"{item}\" to request type".format(item=item))
            item = self._transform(data=item, context=context)

            LOGGER.info("Sending item \"{item}\" to sinks after converting".format(item=item))
            for sink in self._after_transform:
                sink.put(item, context)

            yield item
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
        value = query.get(VALUE_KEY)
        count = query.get(COUNT_KEY)

        try:
            value = int(value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"int\"")

        return (value for _ in range(count))
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def get_many_float(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[float, None, None]:
        value = query.get(VALUE_KEY)
        count = query.get(COUNT_KEY)

        try:
            value = float(value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"float\"")

        if value not in self.items:
            raise NotFoundError("Query value wasn't in store!")

        return (value for _ in range(count))
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Generator[T, None, None]:
        value = query.get(VALUE_KEY)
        count = query.get(COUNT_KEY)

        try:
            # noinspection PyCallingNonCallable
            value = type(value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"{type}\"".format(type=type))

        return (value for _ in range(count))
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
        value = query.get(VALUE_KEY)
        count = query.get(COUNT_KEY)

        try:
            value = int(value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"int\"")

        return (value for _ in range(count))
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def get_many_str(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[str, None, None]:
        value = query.get(VALUE_KEY)
        count = query.get(COUNT_KEY)

        try:
            value = str(value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"str\"")

        return (value for _ in range(count))


########################
# Unsupported Function #
########################
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def __call__(self,
                 dataset: Dataset,
                 num_epochs: int = None,
                 shuffle: bool = True,
                 cuda_device: int = -1,
                 for_training: bool = True) -> Generator[Dict[str, Union[numpy.ndarray,
                                                                         Dict[str, numpy.ndarray]]],
                                                         None, None]:
        """
        Returns a generator that yields batches over the given dataset, forever.

        Parameters
        ----------
        dataset : ``Dataset``
        num_epochs : ``int``, optional (default=``None``)
            How times should we iterate over this dataset?  If ``None``, we will iterate over it
            forever.
        shuffle : ``bool``, optional (default=``True``)
            If ``True``, we will shuffle the instances in ``dataset`` before constructing batches
            and iterating over the data.
        cuda_device : ``int``
            If cuda_device >= 0, GPUs are available and Pytorch was compiled with CUDA support, the
            tensor will be copied to the cuda_device specified.
        for_training : ``bool``, optional (default=``True``)
            If ``False``, we will pass the ``volatile=True`` flag when constructing variables,
            which disables gradient computations in the graph.  This makes inference more efficient
            (particularly in memory usage), but is incompatible with training models.
        """
        if num_epochs is None:
            while True:
                yield from self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)
        else:
            for _ in range(num_epochs):
                yield from self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def get_clients_groups(self) -> typing.Generator['FlyingUnit', None, None]:
        for group in self.groups:
            assert isinstance(group, Group)
            if group.group_is_client_group:
                yield group
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def farps(self) -> typing.Generator['Static', None, None]:
        for coa in [self.blue_coa, self.red_coa]:
            for farp in coa.farps:
                yield farp


# noinspection PyProtectedMember
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def countries(self) -> typing.Generator['Country', None, None]:
        for k in self._section_country:
            if k not in self._countries.keys():
                country = Country(self.d, self.l10n, self.coa_color, k)
                self._countries[k] = country
                self._countries_by_id[country.country_id] = country
                self._countries_by_name[country.country_name] = country
            yield self._countries[k]
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def groups(self) -> typing.Generator['Group', None, None]:
        for country in self.countries:
            assert isinstance(country, Country)
            for group in country.groups:
                assert isinstance(group, Group)
                yield group
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def statics(self) -> typing.Generator['Static', None, None]:
        for country in self.countries:
            assert isinstance(country, Country)
            for static in country.statics:
                assert isinstance(static, Static)
                yield static
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def get_groups_from_category(self, category) -> typing.Generator['Group', None, None]:
        Mission.validator_group_category.validate(category, 'get_groups_from_category')
        for group in self.groups:
            assert isinstance(group, Group)
            if group.group_category == category:
                yield group