Python typing 模块,Iterable() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Iterable()

项目:greenstalk    作者:mayhewj    | 项目源码 | 文件源码
def __init__(self,
                 host: str = '127.0.0.1',
                 port: int = 11300,
                 encoding: Optional[str] = 'utf-8',
                 use: str = DEFAULT_TUBE,
                 watch: Union[str, Iterable[str]] = DEFAULT_TUBE) -> None:
        """Connect to ``host:port`` and select the tubes to use and watch.

        Args:
            host: Address handed to ``socket.create_connection``.
            port: Port handed to ``socket.create_connection``.
            encoding: Stored unchanged on the instance as ``self.encoding``.
            use: Tube passed to ``self.use`` unless it equals ``DEFAULT_TUBE``.
            watch: Either one tube name or an iterable of tube names. Every
                given tube is watched; ``DEFAULT_TUBE`` is ignored unless it
                is one of them.
        """
        self._sock = socket.create_connection((host, port))
        self._reader = self._sock.makefile('rb')  # type: BinaryIO
        self.encoding = encoding

        if use != DEFAULT_TUBE:
            self.use(use)

        if isinstance(watch, str):
            if watch != DEFAULT_TUBE:
                self.watch(watch)
                self.ignore(DEFAULT_TUBE)
        else:
            # Note whether the default tube was requested *while* iterating:
            # `watch` may be a one-shot iterator (e.g. a generator), and a
            # membership test after the loop would see it already exhausted
            # and wrongly ignore the default tube.
            watches_default = False
            for tube in watch:
                self.watch(tube)
                if tube == DEFAULT_TUBE:
                    watches_default = True
            if not watches_default:
                self.ignore(DEFAULT_TUBE)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Yield one scaled average-line-length difference per input pair.

    For every ``(filename, content)`` pair the average number of bytes per
    line is computed for the cached file and for the given content (0.0 when
    there are no lines); the absolute difference, multiplied by 10000 and
    truncated to an int, is yielded.
    """
    for filename1, content2 in diffargs:
        file_lines = get_num_lines(filename1)
        file_size = len(get_cached_file(filename1))
        file_avg = float(file_size) / file_lines if file_lines > 0 else 0.0

        content_lines = count_content_lines(content2)
        content_size = len(content2)
        content_avg = float(content_size) / content_lines if content_lines > 0 else 0.0

        scaled_gap = 10000.0 * (file_avg - content_avg)
        yield int(abs(scaled_gap))
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Compute a unified diff between the file `filename` and `content2`.

    When `content2` is None the second text is read from standard input and
    the diff is additionally written out via `write`, which is how the
    function is used from the command line:
        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
    We get this result:
    ---
    +++
    @@ -1 +1 @@
    -123
    +456

    Returns the `(exit_code, diff)` pair produced by `compute_unified_diff`.
    """
    use_stdin = content2 is None
    if use_stdin:
        # Read binary input stream
        stdin = rawstream(sys.stdin)
        econtent2 = bytestr(stdin.read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if use_stdin:
        # The diff may be a lazy one-shot iterator; materialize it so that
        # joining it for output does not leave the caller with an exhausted
        # (always empty) iterable.
        diff = list(diff)
        write('\n'.join(diff))
    return exit_code, diff
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    """Diff the cached content of `filename` against `content2`.

    Extra keyword arguments are forwarded to `difflib.unified_diff`; unless
    the caller passes `n`, zero context lines are requested.

    Returns `(OK, diff)` on success. If anything goes wrong while reading or
    diffing, `(ERROR, ())` is returned instead of raising.
    """
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    except Exception:
        # Deliberate best effort: failures are reported through the ERROR
        # exit code. Unlike a `return` inside `finally`, this lets
        # KeyboardInterrupt and SystemExit propagate.
        exit_code = ERROR
        diff = ()
    return exit_code, diff

# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def misses_to_frame(parsed_lexemes: Iterable,
                    terms: Dict[str, str]=None) -> pd.DataFrame:
    """Build a DataFrame of missed lexemes, indexed by the lower-cased miss.

    Each key produced by ``collect_misses(parsed_lexemes)`` becomes one row
    with the columns ``term`` (looked up in ``terms``, falling back to the
    lower-cased miss itself) and ``lexemes`` (the values stored for that key,
    joined by spaces).
    """
    lookup = terms if terms else {}
    miss_dict = collect_misses(parsed_lexemes)

    def to_record(miss):
        lowered = miss.lower()
        return OrderedDict([
            ('miss', lowered),
            ('term', lookup.get(lowered, lowered)),
            ('lexemes', ' '.join(miss_dict[miss])),
        ])

    records = [to_record(miss) for miss in miss_dict]
    return pd.DataFrame.from_records(
        records, index='miss', columns=['miss', 'term', 'lexemes'])
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def _list(cls, user_id: int,
              is_active: Optional[bool]=None,
              fields: Optional[Iterable[str]]=None):
        """Generator that requests the keypairs of a user over GraphQL.

        It yields a single ``Request`` for ``/admin/graphql`` and expects the
        response object to be sent back in; the ``keypairs`` entry of the
        response JSON is the generator's return value.

        Args:
            user_id: Value of the ``$user_id`` query variable.
            is_active: Value of the ``$is_active`` query variable.
            fields: Field names to select; a default set of four fields is
                used when None.
        """
        if fields is None:
            fields = (
                'access_key', 'secret_key',
                'is_active', 'is_admin',
            )
        selection = ' '.join(fields)
        query = (
            'query($user_id: Int!, $is_active: Boolean) {'
            '  keypairs(user_id: $user_id, is_active: $is_active) {'
            '    $fields'
            '  }'
            '}'
        ).replace('$fields', selection)
        payload = {
            'query': query,
            'variables': {
                'user_id': user_id,
                'is_active': is_active,
            },
        }
        resp = yield Request('POST', '/admin/graphql', payload)
        return resp.json()['keypairs']
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def _get_or_create(cls, lang: str,
                       client_token: Optional[str]=None,
                       mounts: Optional[Iterable[str]]=None,
                       envs: Optional[Mapping[str, str]]=None,
                       max_mem: int=0, exec_timeout: int=0) -> str:
        """Generator that asks the server to create (or reuse) a kernel.

        It yields one ``Request`` for ``/kernel/create`` and expects the
        response to be sent back in. The generator's return value is
        ``cls(kernelId)`` with its ``created`` attribute set from the
        response.

        A random token is generated when ``client_token`` is falsy; a given
        token is asserted to be longer than 8 characters. ``max_mem`` and
        ``exec_timeout`` are accepted but not sent (the limits section of the
        request is disabled).
        """
        if not client_token:
            client_token = uuid.uuid4().hex
        else:
            assert len(client_token) > 8
        config = {
            'mounts': mounts,
            'envs': envs,
        }
        body = {
            'lang': lang,
            'clientSessionToken': client_token,
            'config': config,
        }
        resp = yield Request('POST', '/kernel/create', body)
        data = resp.json()
        o = cls(data['kernelId'])  # type: ignore
        # Responses without a 'created' key are treated as created (legacy).
        o.created = data.get('created', True)
        return o
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
        """Map ``<xml name><microphone ending>`` ids to labels read from XML.

        Only files ending in ``.xml`` whose extension-less name matches
        ``self.id_filter_regex`` are considered. For each of them, and for
        each known microphone ending, an entry is created when the matching
        ``.wav`` file exists next to the XML file.
        """
        xml_ending = ".xml"

        microphone_endings = [
            "_Yamaha",
            "_Kinect-Beam",
            "_Kinect-RAW",
            "_Realtek",
            "_Samson",
            "_Microsoft-Kinect-Raw"
        ]

        xml_files = []
        for file in files:
            if file.name.endswith(xml_ending) and self.id_filter_regex.match(name_without_extension(file)):
                xml_files.append(file)

        labels = OrderedDict()
        for file in xml_files:
            for microphone_ending in microphone_endings:
                example_id = name_without_extension(file) + microphone_ending
                wav_file = Path(file.parent) / (example_id + ".wav")
                if wav_file.exists():
                    labels[example_id] = self._extract_label_from_xml(file)
        return labels
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def train(self,
              labeled_spectrogram_batches: Iterable[List[LabeledSpectrogram]],
              preview_labeled_spectrogram_batch: List[LabeledSpectrogram],
              tensor_board_log_directory: Path,
              net_directory: Path,
              batches_per_epoch: int):
        """Fit ``self.loss_net`` on the given batches.

        The preview batch is tested and logged once before fitting and is
        also handed to ``self.create_callbacks`` as the callback. Fitting
        starts at ``self.load_epoch`` when that is set, otherwise at epoch 0.
        """

        def print_preview_batch():
            return log(self.test_and_predict_batch(preview_labeled_spectrogram_batch))

        print_preview_batch()

        loss_inputs = self._loss_inputs_generator(labeled_spectrogram_batches)
        callbacks = self.create_callbacks(
            callback=print_preview_batch,
            tensor_board_log_directory=tensor_board_log_directory,
            net_directory=net_directory)
        first_epoch = 0 if self.load_epoch is None else self.load_epoch

        self.loss_net.fit_generator(loss_inputs, epochs=100000000,
                                    steps_per_epoch=batches_per_epoch,
                                    callbacks=callbacks,
                                    initial_epoch=first_epoch)
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
        """Pick the cheapest transform from ``source_type`` to any target type.

        Returns:
            ``(transform, target_type, cost)`` for the candidate whose cost is
            lowest and below ``_MAX_TRANSFORM_COST``.

        Raises:
            NoConversionError: if no candidate yields a usable transform.
        """
        # (transform, target type, cost) of the best candidate seen so far.
        chosen = (None, None, _MAX_TRANSFORM_COST)
        for candidate in target_types:
            try:
                transform, cost = self._transform(source_type, candidate)
                if cost < chosen[2]:
                    chosen = (transform, candidate, cost)
            except NoConversionError:
                continue
        if chosen[0] is None:
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
        return chosen
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def _best_transform_to(self, target_type: Type[T], source_types: Iterable[Type]) -> Tuple[Callable[[T], Any], Type, int]:
        """Pick the cheapest transform from any source type to ``target_type``.

        Returns:
            ``(transform, source_type, cost)`` for the candidate whose cost is
            lowest and below ``_MAX_TRANSFORM_COST``.

        Raises:
            NoConversionError: if no candidate yields a usable transform.
        """
        # (transform, source type, cost) of the best candidate seen so far.
        chosen = (None, None, _MAX_TRANSFORM_COST)
        for candidate in source_types:
            try:
                transform, cost = self._transform(candidate, target_type)
                if cost < chosen[2]:
                    chosen = (transform, candidate, cost)
            except NoConversionError:
                continue
        if chosen[0] is None:
            raise NoConversionError("Pipeline can't convert from any of \"{source_types}\" to \"{target_type}\"".format(source_types=source_types, target_type=target_type))
        return chosen
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def _create_sink_handlers_simultaneously(self, before: Type[T], transform: DataTransformer, after: Type[T], targets: Iterable[DataSink]):
        """Split sinks into those fed before and those fed after a transform.

        For every sink the cheapest conversion from ``before`` and from
        ``after`` into one of the sink's accepted types is looked up. The sink
        is attached on the strictly cheaper ``before`` side, otherwise on the
        ``after`` side; sinks reachable from neither side are left out.
        The ``transform`` argument is not used in this method.

        Returns:
            ``(before_transform_handlers, after_transform_handlers)`` as two
            sets of ``_SinkHandler``.
        """
        before_transform_handlers = set()
        after_transform_handlers = set()
        for sink in targets:
            try:
                before_transformer, before_to_type, before_cost = self._best_transform_from(before, sink.accepts)
            except NoConversionError:
                before_transformer = None
            try:
                after_transformer, after_to_type, after_cost = self._best_transform_from(after, sink.accepts)
            except NoConversionError:
                after_transformer = None

            can_before = before_transformer is not None
            can_after = after_transformer is not None
            if can_before and (not can_after or before_cost < after_cost):
                before_transform_handlers.add(_SinkHandler(sink, before_to_type, before_transformer))
            elif can_after:
                after_transform_handlers.add(_SinkHandler(sink, after_to_type, after_transformer))
        return before_transform_handlers, after_transform_handlers
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def put_many(self, type: Type[T], items: Iterable[T]) -> None:
        """Puts multiple objects of the same type into the data sink. The objects may be transformed into a new type for insertion if necessary.

        Args:
            type: The type of the objects being inserted; used as the key under which the sink handlers are looked up and cached.
            items: An iterable (e.g. list) of objects to be inserted into the data pipeline.
        """
        LOGGER.info("Getting SinkHandlers for \"{type}\"".format(type=type.__name__))
        try:
            handlers = self._put_types[type]
        except KeyError:
            try:
                LOGGER.info("Building new SinkHandlers for \"{type}\"".format(type=type.__name__))
                handlers = self._put_handlers(type)
            except NoConversionError:
                handlers = None
            # Cache in the same table the lookup above reads from. Writing to
            # `_get_types` (as before) never filled the put cache and stored
            # sink handlers in the table used for gets.
            self._put_types[type] = handlers

        LOGGER.info("Creating new PipelineContext")
        context = self._new_context()

        LOGGER.info("Sending items \"{items}\" to SourceHandlers".format(items=items))
        if handlers is not None:
            # Materialize once so every handler receives all items even when
            # `items` is a one-shot iterator.
            items = list(items)
            for handler in handlers:
                handler.put_many(items, context)
项目:python-netsgiro    作者:otovo    | 项目源码 | 文件源码
def _split_text_to_lines_and_columns(
            cls, text) -> Iterable[Tuple[int, int, str]]:
        """Yield ``(line_number, column_number, text)`` chunks for ``text``.

        Every line is emitted as two 40-character columns (characters 0-39
        and 40-79), each padded with spaces to a width of 40. Line numbers
        start at 1.

        Raises:
            ValueError: if there are more than ``cls._MAX_LINES`` lines or a
                line is longer than ``cls._MAX_LINE_LENGTH`` characters.
        """
        lines = text.splitlines()
        line_count = len(lines)

        if line_count > cls._MAX_LINES:
            raise ValueError(
                'Max {} specification lines allowed, got {}'
                .format(cls._MAX_LINES, line_count))

        for line_number, line_text in enumerate(lines, 1):
            if len(line_text) > cls._MAX_LINE_LENGTH:
                raise ValueError(
                    'Specification lines must be max {} chars long, '
                    'got {}: {!r}'
                    .format(cls._MAX_LINE_LENGTH, len(line_text), line_text))

            for column_number, start in ((1, 0), (2, 40)):
                chunk = line_text[start:start + 40]
                yield (line_number, column_number, '{:40}'.format(chunk))
项目:lHelper    作者:Julian24816    | 项目源码 | 文件源码
def run(cls, command: str, *args: Iterable[str]):
        """
        Invoke the command registered under ``command`` with ``args``.

        A ``TypeError`` from the call prints the command's usage notice, a
        ``KeyboardInterrupt`` prints an abort message, and any other exception
        is printed together with a request to contact support.
        :raises UnknownCommand: when ``cls.has_option(command)`` is false
        :param command: the registry key of the command to be invoked
        :param args: the arguments to be passed on to the command
        """
        if not cls.has_option(command):
            raise UnknownCommand(command)

        try:
            cls.__registry[command](*args)
        except TypeError:
            print(cls.__registry[command].usage_notice())
        except KeyboardInterrupt:
            print("Command aborted.")
        except Exception as error:
            print(error)
            print("Please contact support.")
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def coerce(cls, index, value):
        """Convert ``value`` into an instance of ``cls`` where possible.

        An existing ``cls`` instance is returned unchanged. Any other iterable
        is turned into a new ``cls`` whose items are each coerced through
        ``cls.__item_type__.coerce``. Everything else is delegated to the
        parent class.
        """
        if isinstance(value, cls):
            return value

        if not isinstance(value, Iterable):
            return super().coerce(index, value)

        result = cls()
        # noinspection PyTypeChecker
        for element in value:
            result.append(cls.__item_type__.coerce(index, element))
        return result
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def send_sms(recipients: Iterable[str], msg: str, username: str, api_key: str, sender: str) -> None:
    """Send ``msg`` to every recipient with one POST to ``CLICKSEND_URL``.

    This is a coroutine: the body uses ``async with``, which is only valid
    inside ``async def``.

    Args:
        recipients: Destination numbers, one message entry is built per item.
        msg: Message text; only the first 140 characters are sent.
        username: User name for HTTP basic auth.
        api_key: Password for HTTP basic auth.
        sender: Value of the ``from`` field of every message.

    A non-200 response status and ``aiohttp.ClientError`` are logged rather
    than raised.
    """
    data = {
        'messages': [],
    }  # type: Dict[str, List]
    for recipient in recipients:
        data['messages'].append({
            'source': 'python',
            'from': sender,
            'body': msg[:140],
            'to': recipient,
            'schedule': ''
        })
    try:
        async with aiohttp.ClientSession(headers={'Content-Type': 'application/json'},
                                         auth=aiohttp.BasicAuth(username, api_key)) as session:
            async with session.post(CLICKSEND_URL, data=json.dumps(data), timeout=30) as resp:
                if resp.status != 200:
                    # NOTE(review): this call logs under 'NOTIFICATION' while
                    # the one below uses 'NOTIFICATIONS' - confirm which
                    # category is intended.
                    log.msg('Error sending clicksend sms notification: http status %s' % (str(resp.status)),
                            'NOTIFICATION')
    except aiohttp.ClientError as e:
        log.msg('Error sending clicksend sms notification: %s' % (str(e)), 'NOTIFICATIONS')
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def _get_alerts(self, q: str, q_args: Iterable[Any]) -> List[Dict[str, Any]]:
        """Run the alert query ``q`` and return one dict per result row.

        This is a coroutine: the body awaits the database call, which is only
        valid inside ``async def``.

        Each row is expected to hold ``(id, monitor_id, start_ts, end_ts,
        alert_msg)``. ``monitor_description`` is filled in from the active
        monitor manager when the monitor is known there, otherwise it stays
        an empty string.
        """
        rows = await self.request.app['dbcon'].fetch_all(q, q_args)
        monitors = self.request.app['active_monitor_manager'].monitors
        ret = []
        # `alert_id` rather than `id` so the builtin is not shadowed.
        for alert_id, monitor_id, start_ts, end_ts, alert_msg in rows:
            alert = {
                'id': alert_id,
                'monitor_id': monitor_id,
                'start_ts': start_ts,
                'end_ts': end_ts,
                'alert_msg': alert_msg,
                'monitor_description': '',
            }
            monitor = monitors.get(monitor_id, None)  # type: ActiveMonitor
            if monitor:
                alert['monitor_description'] = monitor.get_description()
            ret.append(alert)
        return ret
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def get(self) -> web.Response:
        """Return contacts, with their metadata applied, as a JSON response.

        This is a coroutine: the body awaits database helpers, which is only
        valid inside ``async def``.

        Selection depends on the query string: ``id`` returns that single
        contact (or an empty list if it does not exist), ``meta_key`` (with
        ``meta_value``) returns the contacts matching that metadata, and
        otherwise all contacts are returned.
        """
        dbcon = self.request.app['dbcon']
        if 'id' in self.request.rel_url.query:
            contact_id = require_int(get_request_param(self.request, 'id'))
            c = await contact.get_contact(dbcon, contact_id)
            contact_list = []  # type: Iterable[object_models.Contact]
            if c:
                contact_list = [c]
            metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact', contact_id)
        elif 'meta_key' in self.request.rel_url.query:
            meta_key = require_str(get_request_param(self.request, 'meta_key'))
            meta_value = require_str(get_request_param(self.request, 'meta_value'))
            contact_list = await contact.get_contacts_for_metadata(dbcon, meta_key, meta_value)
            metadata_list = await metadata.get_metadata_for_object_metadata(
                dbcon, meta_key, meta_value, 'contact', 'contacts')
        else:
            contact_list = await contact.get_all_contacts(dbcon)
            metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact')
        return web.json_response(apply_metadata_to_model_list(contact_list, metadata_list))
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def get(self) -> web.Response:
        """Return monitor groups, with their metadata applied, as JSON.

        This is a coroutine: the body awaits database helpers, which is only
        valid inside ``async def``.

        Selection depends on the query string: ``id`` returns that single
        monitor group (or an empty list if it does not exist), ``meta_key``
        (with ``meta_value``) returns the groups matching that metadata, and
        otherwise all monitor groups are returned.
        """
        dbcon = self.request.app['dbcon']
        if 'id' in self.request.rel_url.query:
            monitor_group_id = require_int(get_request_param(self.request, 'id'))
            monitor_group_item = await monitor_group.get_monitor_group(dbcon, monitor_group_id)
            monitor_group_list = []  # type: Iterable[object_models.MonitorGroup]
            if monitor_group_item:
                monitor_group_list = [monitor_group_item]
            metadata_list = await metadata.get_metadata_for_object(dbcon, 'monitor_group', monitor_group_id)
        elif 'meta_key' in self.request.rel_url.query:
            meta_key = require_str(get_request_param(self.request, 'meta_key'))
            meta_value = require_str(get_request_param(self.request, 'meta_value'))
            monitor_group_list = await monitor_group.get_monitor_groups_for_metadata(dbcon, meta_key, meta_value)
            metadata_list = await metadata.get_metadata_for_object_metadata(
                dbcon, meta_key, meta_value, 'monitor_group', 'monitor_groups')
        else:
            monitor_group_list = await monitor_group.get_all_monitor_groups(dbcon)
            metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'monitor_group')
        return web.json_response(apply_metadata_to_model_list(monitor_group_list, metadata_list))
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def set_active_monitor_contacts(dbcon: DBConnection,
                                      contact_ids: Iterable[int], monitor_id: int) -> None:
    """(Re-)set contacts for an active monitor.

    Delete existing contacts for an active monitor and set the given new
    contacts. Both steps are handed to ``dbcon.transact`` as one callback.

    This is a coroutine: the body awaits database calls, which is only valid
    inside ``async def``.

    Raises:
        errors.InvalidArguments: if the monitor does not exist.
    """

    async def _run(cur: Cursor) -> None:
        q = """delete from active_monitor_contacts where active_monitor_id=%s"""
        await cur.execute(q, (monitor_id,))
        # The insert statement is the same for every contact; build it once.
        q = """insert into active_monitor_contacts (active_monitor_id, contact_id) values (%s, %s)"""
        for contact_id in contact_ids:
            q_args = (monitor_id, contact_id)
            await cur.execute(q, q_args)

    if not await active_monitor_exists(dbcon, monitor_id):
        raise errors.InvalidArguments('monitor does not exist')
    await dbcon.transact(_run)
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def set_active_monitor_contact_groups(dbcon: DBConnection,
                                            contact_group_ids: Iterable[int], monitor_id: int) -> None:
    """(Re-)set contact_groups for an active monitor.

    Delete existing contact groups for an active monitor and set the given new
    contact groups. Both steps are handed to ``dbcon.transact`` as one
    callback.

    This is a coroutine: the body awaits database calls, which is only valid
    inside ``async def``.

    Raises:
        errors.InvalidArguments: if the monitor does not exist.
    """

    async def _run(cur: Cursor) -> None:
        q = """delete from active_monitor_contact_groups where active_monitor_id=%s"""
        await cur.execute(q, (monitor_id,))
        # The insert statement is the same for every group; build it once.
        q = """insert into active_monitor_contact_groups (active_monitor_id, contact_group_id) values (%s, %s)"""
        for contact_group_id in contact_group_ids:
            q_args = (monitor_id, contact_group_id)
            await cur.execute(q, q_args)

    if not await active_monitor_exists(dbcon, monitor_id):
        raise errors.InvalidArguments('monitor does not exist')
    await dbcon.transact(_run)
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def set_contact_group_contacts(dbcon: DBConnection,
                                     contact_group_id: int, contact_ids: Iterable[int]) -> None:
    """(Re-)set contacts for a contact group.

    Delete existing contacts for a contact group and set the given new
    contacts. Both steps are handed to ``dbcon.transact`` as one callback.

    This is a coroutine: the body awaits database calls, which is only valid
    inside ``async def``.

    Raises:
        errors.InvalidArguments: if the contact group does not exist.
    """

    async def _run(cur: Cursor) -> None:
        q = """delete from contact_group_contacts where contact_group_id=%s"""
        await cur.execute(q, (contact_group_id,))
        # The insert statement is the same for every contact; build it once.
        q = """insert into contact_group_contacts (contact_group_id, contact_id) values (%s, %s)"""
        for contact_id in contact_ids:
            q_args = (contact_group_id, contact_id)
            await cur.execute(q, q_args)

    if not await contact_group_exists(dbcon, contact_group_id):
        raise errors.InvalidArguments('contact group does not exist')
    await dbcon.transact(_run)
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def delete_metadata(dbcon: DBConnection, object_type: str, object_id: int,
                          keys: Optional[Iterable[str]] = None) -> None:
    """Delete metadata for an object.

    If keys is given (and non-empty), only delete the specified keys,
    otherwise delete all metadata for the object. The deletes are handed to
    ``dbcon.transact`` as one callback.

    This is a coroutine: the body awaits ``dbcon.transact``, which is only
    valid inside ``async def``.
    """

    async def _run(cur: Cursor) -> None:
        if keys:
            # The statement is the same for every key; build it once.
            q = """delete from object_metadata where object_type=%s and object_id=%s and `key`=%s"""
            # noinspection PyTypeChecker
            for key in keys:
                q_args = (object_type, object_id, key)  # type: Tuple
                await cur.execute(q, q_args)
        else:
            q = """delete from object_metadata where object_type=%s and object_id=%s"""
            q_args = (object_type, object_id)
            await cur.execute(q, q_args)

    await dbcon.transact(_run)
项目:parsita    作者:drhagen    | 项目源码 | 文件源码
def splat(f: Callable[..., A]) -> Callable[[Iterable], A]:
    """Adapt a multi-argument function so it takes one iterable of arguments.

    Args:
        f: Any function

    Returns:
        A one-parameter function. The elements of the iterable it receives
        are passed, in order, as the positional arguments of ``f``.

    Example:
        $ def f(a, b, c):
        $     return a + b + c
        $
        $ f(1, 2, 3)  # 6
        $ g = splat(f)
        $ g([1, 2, 3])  # 6
    """

    def splatted(args):
        positional = tuple(args)
        return f(*positional)

    return splatted
项目:parsita    作者:drhagen    | 项目源码 | 文件源码
def unsplat(f: Callable[[Iterable], A]) -> Callable[..., A]:
    """Adapt a function of one iterable so it takes separate arguments.

    Args:
        f: Any function taking a single iterable argument

    Returns:
        A function accepting any number of positional arguments. They are
        collected into a tuple, and that tuple is the single argument given
        to ``f``.

    Example:
        $ def f(a):
        $     return a[0] + a[1] + a[2]
        $
        $ f([1, 2, 3])  # 6
        $ g = unsplat(f)
        $ g(1, 2, 3)  # 6
    """

    def unsplatted(*args):
        collected = args
        return f(collected)

    return unsplatted
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def config_per_platform(config: ConfigType,
                        domain: str) -> Iterable[Tuple[Any, Any]]:
    """Yield ``(platform, item)`` pairs for every config entry of a domain.

    The keys to inspect come from ``extract_domain_configs``. Falsy entries
    are skipped and a non-list entry is treated as a one-element list. The
    platform is ``item.get(CONF_PLATFORM)``, or None for items where that
    lookup raises ``AttributeError``.
    """
    for config_key in extract_domain_configs(config, domain):
        entries = config[config_key]

        if not entries:
            continue
        if not isinstance(entries, list):
            entries = [entries]

        for item in entries:
            try:
                platform = item.get(CONF_PLATFORM)
            except AttributeError:
                platform = None

            yield platform, item
项目:jack    作者:uclmr    | 项目源码 | 文件源码
def train(self, optimizer,
              training_set: Iterable[Tuple[QASetting, List[Answer]]],
              batch_size: int, max_epochs=10, hooks=tuple(),
              l2=0.0, clip=None, clip_op=tf.clip_by_value, summary_writer=None, **kwargs):
        """
        Train the reader: set up training, then run the training loop.

        Args:
            optimizer: TF optimizer
            training_set: the training instances.
            batch_size: size of training batches
            max_epochs: maximum number of epochs
            hooks: TrainingHook implementations that are called after epochs and batches
            l2: whether to use l2 regularization
            clip: whether to apply gradient clipping and at which value
            clip_op: operation to perform for clipping
            summary_writer: forwarded to both the setup and the training loop
            **kwargs: forwarded to both the setup and the training loop
        """
        setup = self._setup_training(
            batch_size, clip, optimizer, training_set, summary_writer, l2, clip_op, **kwargs)
        batches, loss, min_op, summaries = setup

        self._train_loop(
            min_op, loss, batches, hooks, max_epochs, summaries, summary_writer, **kwargs)
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def __init__(
        self,
        x: Union[int, float, 'Vec', Iterable[float]]=0.0,
        y: float=0.0,
        z: float=0.0,
    ) -> None:
        """Initialise the three axes of the vector as floats.

        ``x`` is either a number, in which case ``x``, ``y`` and ``z`` are
        used directly, or an iterable supplying the axes in order. An
        iterable that runs out early leaves x at 0.0 and y/z at the ``y`` and
        ``z`` arguments.
        """
        # A plain number behaves like a one-element iterable: it supplies x,
        # and y/z then fall back to the explicit arguments.
        if isinstance(x, (int, float)):
            components = iter((x,))
        else:
            components = iter(x)
        self.x = float(next(components, 0.0))
        self.y = float(next(components, y))
        self.z = float(next(components, z))
项目:seproxer    作者:Rastii    | 项目源码 | 文件源码
def test_urls(self, urls: t.Iterable[str]):
        """Collect driver and proxy results for each URL and hand them on.

        The proxy's flows are cleared once up front. For every URL the driver
        results are fetched first, then the proxy results, and both are
        wrapped in a ``SeproxerUrlResult`` that is passed to the result
        handler.
        """
        self._proxy.clear_flows()
        for url in urls:
            # TODO: Handle both of these failing
            from_driver = self._driver_controller.get_results(
                url=url,
                controller_wait=self._proxy_pending_requests_wait,
            )
            from_proxy = self._proxy.get_results()

            self._result_handler.handle(SeproxerUrlResult(
                url=url,
                driver_results=from_driver,
                proxy_results=from_proxy,
            ))
项目:brainiak    作者:brainiak    | 项目源码 | 文件源码
def multimask_images(images: Iterable[SpatialImage],
                     masks: Sequence[np.ndarray], image_type: type = None
                     ) -> Iterable[Sequence[np.ndarray]]:
    """Apply every mask to every image, one image at a time.

    Parameters
    ----------
    images:
        Images to mask.
    masks:
        Masks to apply.
    image_type:
        Type to cast images to.

    Yields
    ------
    Sequence[np.ndarray]
        For each image, a list holding one masked image per mask, in the
        order of ``masks``.
    """
    for current_image in images:
        masked_versions = [mask_image(current_image, current_mask, image_type)
                           for current_mask in masks]
        yield masked_versions
项目:brainiak    作者:brainiak    | 项目源码 | 文件源码
def mask_images(images: Iterable[SpatialImage], mask: np.ndarray,
                image_type: type = None) -> Iterable[np.ndarray]:
    """Apply a single mask to each image.

    Delegates to ``multimask_images`` with a one-element tuple of masks and
    unwraps the single result per image.

    Parameters
    ----------
    images:
        Images to mask.
    mask:
        Mask to apply.
    image_type:
        Type to cast images to.

    Yields
    ------
    np.ndarray
        Masked image.
    """
    single_mask = (mask,)
    for masked_group in multimask_images(images, single_mask, image_type):
        yield masked_group[0]
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def mean(data: Iterable[float]) -> float:
    'Arithmetic mean of the values, summed with fsum for accuracy'
    values = list(data)
    total = fsum(values)
    return total / len(values)
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def transpose(matrix: Iterable[Iterable]) -> Iterable[tuple]:
    'Turn the rows of a 2-D array into columns (returns a lazy zip)'
    rows = matrix
    return zip(*rows)
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def assign_data(centroids: Sequence[Centroid], data: Iterable[Point]) -> Dict[Centroid, Sequence[Point]]:
    'Group each point under the centroid that minimises dist(point, centroid)'
    groups: Dict[Centroid, List[Point]] = {}
    for point in data:
        distance_to = partial(dist, point)
        nearest: Centroid = min(centroids, key=distance_to)
        groups.setdefault(nearest, []).append(point)
    return groups
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def compute_centroids(groups: Iterable[Sequence[Point]]) -> List[Centroid]:
    'For each group, the tuple of per-axis means of its points'
    return [
        tuple(mean(axis_values) for axis_values in transpose(group))
        for group in groups
    ]
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def k_means(data: Iterable[Point], k:int=2, iterations:int=10) -> List[Point]:
    'Start from k sampled points, then alternate assignment and centroid update'
    points = list(data)
    centroids = sample(points, k)
    for _ in range(iterations):
        clusters = assign_data(centroids, points)
        centroids = compute_centroids(clusters.values())
    return centroids
项目:goodreads-api-client-python    作者:mdzhang    | 项目源码 | 文件源码
def id_to_work_id(self, ids: Iterable[str]):
        """Request ``book/id_to_work_id/<comma-separated ids>`` and return
        the ``work-ids`` entry of the response."""
        endpoint = 'book/id_to_work_id/{}'.format(','.join(ids))
        response = self._transport.req(endpoint=endpoint)
        return response['work-ids']
项目:goodreads-api-client-python    作者:mdzhang    | 项目源码 | 文件源码
def isbn_to_id(self, isbns: Iterable[str]):
        """Not available: always raises ``NotImplementedError``."""
        reason = 'API always 404s on this endpoint'
        raise NotImplementedError(reason)
项目:goodreads-api-client-python    作者:mdzhang    | 项目源码 | 文件源码
def review_counts(self, isbns: Iterable[str]):
        """Request review counts for the distinct ISBNs and return the
        ``books`` entry of the response."""
        distinct_isbns = set(isbns)
        # This endpoint 406s on non-json content type
        response = self._transport.req(
            endpoint='book/review_counts.json',
            params={'isbns': ','.join(distinct_isbns)},
            transform='json',
        )
        return response['books']
项目:pylongecity    作者:cyrbon    | 项目源码 | 文件源码
def render(posts: Iterable[Post], output_filepath: Optional[str] = None):
    ''' Renders the posts to html and passes the result, together with output_filepath, to write_and_open_in_browser; prints a notice when there are no posts '''
    collected = list(posts)
    if not collected:
        print('Nothing found')
        return
    html = render_to_html(collected)
    write_and_open_in_browser(html, output_filepath)
项目:pylongecity    作者:cyrbon    | 项目源码 | 文件源码
def has_links(posts: List[Post], *patterns, **kwargs) -> Iterable[Post]:
    '''
    Lazily filters posts down to those for which match_links finds at least one link.
    Args:
        post (List[Post]): Posts to filter
        *patterns (List[str]): Patterns which links should match
        not (List[str]) : Patterns which links should not match. E.g., not=['google', 'longecity', 'amazon']

    '''
    def contains_match(post):
        found = match_links(post, *patterns, **kwargs)
        return len(found) > 0

    return filter(contains_match, posts)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def grouper(n, iterable, padvalue=None):
    # type: (int, Iterable[Any], Any) -> Iterable[Iterable[Any]]
    """Split iterable into n-tuples, padding the last one with padvalue.

    grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')
    """
    # n references to the *same* iterator: each output tuple pulls n
    # consecutive items from it.
    shared = iter(iterable)
    slots = [shared] * n
    return izip_longest(*slots, fillvalue=padvalue)

# ----------------------------------------------------------------------
# Functions to find an executable in the PATH, from:
# http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def option_make(optionname,      # type: AnyStr
                optiontype,      # type: AnyStr
                configs,         # type: Iterable[OptionValue]
                nestedopts=None  # type: Optional[StyleDef]
                ):
    # type: (...) -> Tuple[str, str, List[OptionValue], Optional[StyleDef]]
    """Build an option 4-tuple: (name, type, converted configs, nested options).

    Every config value is passed through typeconv; name and type are passed
    through unistr.
    """
    converted = list(map(typeconv, configs))
    name = unistr(optionname)
    kind = unistr(optiontype)
    return name, kind, converted, nestedopts
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def styledef_make(options=None):
    # type: (Union[dict, Iterable[Option], None]) -> StyleDef
    """Create a StyleDef from a dict, an iterable of options, or nothing.

    - None: an empty StyleDef.
    - Iterable of options: a StyleDef built from (option_name(o), o) pairs.
    - dict: an empty StyleDef to which every value is added via
      styledef_add_option in sorted item order; values that are themselves
      dicts are first converted recursively.
    """
    if not isinstance(options, dict):
        entries = [] if options is None else options
        return StyleDef((option_name(entry), entry) for entry in entries)

    styledef = styledef_make()
    for _key, value in sorted(options.items()):
        nested = styledef_make(value) if isinstance(value, dict) else value
        styledef_add_option(nested, styledef)
    return styledef
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def inclusiverange(start, stop):
    # type: (int, int) -> Iterable[int]
    """Return the integers from start up to and including stop."""
    exclusive_end = stop + 1
    return range(start, exclusive_end)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def stylevariants(optionname, values):
    # type: (str, Iterable[OptionValue]) -> List[Style]
    """Return one stylevariant(optionname, value) per entry of values, in order."""
    variants = []
    for value in values:
        variants.append(stylevariant(optionname, value))
    return variants
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def variants_for(self, option):
        # type: (Option) -> List[Style]
        """Return the style variants to try for the given option.

        Explicit configs of the option take precedence. Otherwise 'bool'
        options yield True/False and a few named 'int' options yield a fixed
        set of candidates; everything else yields an empty list.
        """
        name = option_name(option)
        kind = option_type(option)
        explicit = option_configs(option)

        if explicit:
            return stylevariants(name, explicit)
        if kind == 'bool':
            return stylevariants(name, [True, False])
        if kind != 'int':
            return []

        if name == 'column_limit':
            # Here we can get weird results, for example
            # in bottle_sqlalchemy.py is a constructor with
            # 8 arguments which are already split between two lines.
            # We find an optimum column limit of 126 because this
            # has less diff lines than putting each argument on a new
            # line. Maybe we should use a different diff metric.
            return stylevariants(name, self.column_limit_candidates)
        if name == 'indent_width':
            return stylevariants(name, [2, 4, 8])
        if name == 'spaces_before_comment':
            return stylevariants(name, inclusiverange(1, 4))
        # Other integer options (including the 'split_penalty*' ones, whose
        # large values have no clearly understood purpose for the moment)
        # are deliberately left unchanged.
        return []

# ----------------------------------------------------------------------
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def variants_for(self, option):
        # type: (Option) -> List[Style]
        """Return the style variants to try for the given option.

        Explicit configs of the option win; otherwise a few option names and
        then the option types 'AutoBool' and 'Boolean' select a fixed value
        set. Options matching none of these yield an empty list.
        """
        name = option_name(option)
        kind = option_type(option)
        explicit = option_configs(option)

        # Pick the candidate values first, then build the variants once.
        if explicit:
            values = explicit
        elif name == 'indent':
            values = ['yes']
        elif name == 'wrap':
            values = [0]
        elif name == 'indent-spaces':
            values = inclusiverange(0, 8)
        elif kind == 'AutoBool':
            values = ['yes', 'no', 'auto']
        elif kind == 'Boolean':
            values = ['yes', 'no']
        else:
            return []
        return stylevariants(name, values)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def variants_for(self, option):
        # type: (Option) -> List[Style]
        """Return the style variants to try for the given option.

        The column limit option uses self.column_limit_candidates restricted
        to the range 50..200. For other options, every text config containing
        '#' is expanded into one config per candidate number, with '#'
        replaced by that number; the 'mode' option and options without
        configs yield an empty list.
        """

        def expand_placeholders(configs, numvalues):
            # type: (List[OptionValue], Iterable[int]) -> List[OptionValue]
            # Configs without a '#' placeholder (or non-text configs) are
            # kept as they are.
            expanded = []  # type: List[OptionValue]
            for cfg in configs:
                if isinstance(cfg, text_type) and '#' in cfg:
                    expanded.extend(cfg.replace('#', str(n)) for n in numvalues)
                else:
                    expanded.append(cfg)
            return expanded

        name = option_name(option)
        configs = option_configs(option)

        if name == self.columnlimitname:
            limits = [c for c in self.column_limit_candidates if 50 <= c <= 200]
            return stylevariants(name, limits)

        # Choose the numbers substituted for '#' based on the option name.
        if name == 'indent':
            numbers = [2, 4, 8]
        elif name == 'min-conditional-indent':
            numbers = [0, 1, 2, 3]
        elif name == 'max-instatement-indent':
            numbers = inclusiverange(40, 120)
        elif name == 'mode':
            return []
        elif configs:
            numbers = [1, 2, 4, 8]
        else:
            return []
        return stylevariants(name, expand_placeholders(configs, numbers))