Python toolz 模块,concatv() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用toolz.concatv()

项目:calculette-impots-m-language-parser    作者:openfisca    | 项目源码 | 文件源码
def visit_enumeration(self, node, children):
        """Build the list of JSON AST nodes describing an enumeration.

        The ``value`` of every ``integer`` child, followed by every ``symbol``
        child, is gathered into a single ``enumeration_values`` node; that node
        is emitted only when at least one value was found. Any ``interval``
        children reported by ``find_many_or_none`` are appended after it.

        ``node`` is not used by this visitor. ``children`` must be a list.
        """
        assert isinstance(children, list), children

        collected = []

        scalar_children = concatv(
            find(children, type='integer'),
            find(children, type='symbol'),
        )
        scalar_values = list(pluck('value', scalar_children))
        if scalar_values:
            collected.append(
                make_json_ast_node(
                    type='enumeration_values',
                    values=scalar_values,
                )
            )

        interval_nodes = find_many_or_none(children, type='interval')
        if interval_nodes is not None:
            collected.extend(interval_nodes)

        return collected
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_id_macro_dataset(self):
        """Check the identity pipeline on the macro dataset.

        Each row of the input is expected to appear once per asset in the
        output.

        input (self.macro_df)
           asof_date  timestamp  value
        0 2014-01-01 2014-01-01      0
        3 2014-01-02 2014-01-02      1
        6 2014-01-03 2014-01-03      2

        output (expected):
                                   value
        2014-01-01 Equity(65 [A])      0
                   Equity(66 [B])      0
                   Equity(67 [C])      0
        2014-01-02 Equity(65 [A])      1
                   Equity(66 [B])      1
                   Equity(67 [C])      1
        2014-01-03 Equity(65 [A])      2
                   Equity(66 [B])      2
                   Equity(67 [C])      2
        """
        asset_info = asset_infos[0][0]
        asset_count = len(asset_info)
        value_columns = ('value',)

        with tmp_asset_finder() as finder:
            # One run of identical values per timestamp, one entry per asset.
            per_asset_values = list(concatv(
                [0] * asset_count,
                [1] * asset_count,
                [2] * asset_count,
            ))
            row_index = pd.MultiIndex.from_product((
                self.macro_df.timestamp,
                finder.retrieve_all(asset_info.index),
            ))
            expected = pd.DataFrame(
                per_asset_values,
                index=row_index,
                columns=value_columns,
            )
            self._test_id(
                self.macro_df,
                self.macro_dshape,
                expected,
                finder,
                value_columns,
            )
项目:adt    作者:llllllllll    | 项目源码 | 文件源码
def __new__(mcls, name, bases, dict_):
        """Create the class and validate it when it directly subclasses ``ADT``.

        For a class whose first base is ``ADT``, the type variables and
        constructors recorded on ``dict_`` are copied onto the new class, and
        every constructor argument (positional, then keyword) is checked:

        * a ``RecursiveType`` argument must carry the same name as the class;
        * an argument may not itself be one of the class's constructors.

        Returns ``adt(cls, ())`` when the class declares no type variables,
        otherwise the class itself. Classes that do not directly subclass
        ``ADT`` are returned untouched.

        Raises ``TypeError`` when either check above fails.
        """
        new_type = super().__new__(mcls, name, bases, dict_)
        if not len(bases) or bases[0] is not ADT:
            return new_type

        new_type._typevars = dict_._typevars
        new_type._constructors = tuple(dict_._constructors.values())

        known_constructors = set(new_type._constructors)
        for constructor in known_constructors:
            argument_types = concatv(
                constructor._args,
                constructor._kwargs.values(),
            )
            for arg_type in argument_types:
                if isinstance(arg_type, RecursiveType) and arg_type._name != name:
                    raise TypeError(
                        'recursive type name must be the same as the type'
                        ' name, %r != %r' % (
                            arg_type._name,
                            name,
                        ),
                    )
                if arg_type in known_constructors:
                    raise TypeError(
                        'constructor %r has arguments that are other'
                        ' constructors' % constructor,
                    )

        if new_type._typevars:
            return new_type
        return adt(new_type, ())
项目:calculette-impots-python    作者:openfisca    | 项目源码 | 文件源码
def visit_infix_expression(node, operators={}):
    """Render an infix expression node as a parenthesised, space-joined string.

    Operands and operators are interleaved (operand, operator, operand, ...).
    Each operand is rendered with ``visit_node``; each operator is looked up in
    ``operators`` and left unchanged when it has no entry there.

    For a ``product_expression`` the rendered expression is prefixed with all
    operands joined by ``and`` so that evaluation short-circuits before the
    product itself is computed.
    """
    # Fill marker for whichever of operands/operators is the shorter sequence.
    absent = UnboundLocalError

    tokens = []
    paired = itertools.zip_longest(
        node['operands'],
        node['operators'],
        fillvalue=absent,
    )
    for pair in paired:
        for position, item in enumerate(pair):
            if item != absent:
                if position == 0:
                    # Position 0 is the operand slot.
                    tokens.append(visit_node(item))
                else:
                    tokens.append(operators.get(item, item))

    # Transform product expressions into a lazy "and" expression in order to
    # prevent a division by 0:
    if node['type'] == 'product_expression':
        guard = interpose(
            el='and',
            seq=map(visit_node, node['operands']),
        )
        tokens = concatv(guard, ['and'], tokens)

    rendered = ' '.join(map(str, tokens))
    return '({})'.format(rendered)


# Main visitor
项目:dask-patternsearch    作者:eriknw    | 项目源码 | 文件源码
def generate_stencil_points(self):
        """Return ``_stencil_points`` followed by ``_stencil_iter`` as one iterable."""
        head = self._stencil_points
        tail = self._stencil_iter
        return concatv(head, tail)
项目:call_map    作者:nccgroup    | 项目源码 | 文件源码
def update_usage_search_locations(self, platform: str):
        '''Update the places where usages are found

        Call this whenever you load new modules or scripts.

        Only platforms whose name starts with "python" (case-insensitive) are
        handled; for any other platform this does nothing. The module contexts
        of all module and script nodes for ``platform`` that have a code
        element path are stored on
        ``jedi_dump.JediCodeElementNode.usage_resolution_modules``.

        '''
        if not platform.lower().startswith('python'):
            return

        from . import jedi_dump

        known_nodes = tz.concatv(
            self.module_nodes[platform].values(),
            self.script_nodes[platform].values(),
        )
        # Nodes without a path are left out of the search locations.
        search_contexts = frozenset(
            node.module_context
            for node in known_nodes
            if node.code_element.path
        )
        jedi_dump.JediCodeElementNode.usage_resolution_modules = search_contexts
项目:call_map    作者:nccgroup    | 项目源码 | 文件源码
def keyPressEvent(self, event):
        """Handle Left/Right arrow keys to move focus between call lists.

        The base class handler runs first; the arrow keys are only acted upon
        when it left the event unaccepted.

        * Right: focus the next call list when it exists and has items, but
          only once every future in ``populate_futures`` and
          ``add_next_futures`` is done. Otherwise the first item of that list
          counts the key press and changes its text on the third one.
        * Left: focus the previous call list when there is one.

        In both cases the event is accepted whether or not focus moved.
        """
        super().keyPressEvent(event) #ll.setFocus()

        if not event.isAccepted():
            if event.key() == Qt.Qt.Key_Right:
                _next = self.next_call_list()
                if _next and _next.count() > 0:
                    # Only move on once all pending futures have completed.
                    if all(future.done() for future in
                           tz.concatv(self.populate_futures, self.add_next_futures)):
                        _next.setFocus()
                        self.map_widget.ensureWidgetVisible(_next, 0, 0)
                        # Select the first row if nothing is selected yet.
                        if self.next_call_list().currentItem() is None:
                            self.next_call_list().setCurrentRow(0)
                    else:
                        # Still loading: count the press on the first item,
                        # presumably a placeholder entry (TODO confirm).
                        wait_item = _next.item(0)
                        wait_item.poked += 1
                        if wait_item.poked == 3:
                            wait_item.setText('QUIT POKING ME')
                # Accepted even when there was no next list to move to.
                event.accept()

            if event.key() == Qt.Qt.Key_Left:
                _prev = self.prev_call_list()
                if _prev:
                    _prev.setFocus()
                    (self.map_widget.ensureWidgetVisible(_prev, 0, 0))
                # Accepted even when there was no previous list to move to.
                event.accept()

        # Space is recognised but currently does nothing.
        if event.key() == Qt.Qt.Key_Space:
            pass
项目:amino    作者:tek    | 项目源码 | 文件源码
def append(self, other: 'LazyList[A]') -> 'LazyList[A]':
        """Return a copy of this list followed by the contents of ``other``.

        The copy receives two callables: the first chains its argument with
        ``self.strict``, ``other.source`` and ``other.strict`` (in that order);
        the second ignores its argument and returns an empty ``List``.
        Presumably these rebuild the lazy source and the strict part
        respectively - confirm against ``copy``.
        """
        def chained_source(s):
            # Attributes are read when the callable runs, not when it is built.
            return concatv(s, self.strict, other.source, other.strict)

        def emptied_strict(s):
            return List()

        return self.copy(chained_source, emptied_strict)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_deltas_macro(self):
        """Run the pipeline on macro data with deltas applied.

        The deltas are built from every row of ``self.macro_df`` except the
        last, with ``value`` raised by 10 and ``timestamp`` pushed forward one
        day. The pipeline runs with a window length of 2 and ``np.nanmax`` as
        the compute function, so 10 and 11 are expected on the two output
        dates for every asset.
        """
        asset_info = asset_infos[0][0]
        expr = bz.data(self.macro_df, name='expr', dshape=self.macro_dshape)

        raw_deltas = bz.data(
            self.macro_df.iloc[:-1],
            name='deltas',
            dshape=self.macro_dshape,
        )
        deltas = bz.transform(
            raw_deltas,
            value=raw_deltas.value + 10,
            timestamp=raw_deltas.timestamp + timedelta(days=1),
        )

        nassets = len(asset_info)
        views_by_day = {
            '2014-01-02': repeat_last_axis(np.array([10.0, 1.0]), nassets),
            '2014-01-03': repeat_last_axis(np.array([11.0, 2.0]), nassets),
        }
        expected_views = keymap(pd.Timestamp, views_by_day)

        with tmp_asset_finder(equities=asset_info) as finder:
            per_asset_values = list(concatv([10] * nassets, [11] * nassets))
            row_index = pd.MultiIndex.from_product((
                sorted(expected_views.keys()),
                finder.retrieve_all(asset_info.index),
            ))
            expected_output = pd.DataFrame(
                per_asset_values,
                index=row_index,
                columns=('value',),
            )

            dates = self.dates
            self._run_pipeline(
                expr,
                deltas,
                expected_views,
                expected_output,
                finder,
                calendar=dates,
                start=dates[1],
                end=dates[-1],
                window_length=2,
                compute_fn=np.nanmax,
            )
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_novel_deltas_macro(self):
        """Run the pipeline when deltas fall on dates absent from the baseline.

        The baseline has rows on 2014-01-01 and 2014-01-04. The deltas are the
        same rows with ``value`` raised by 10 and ``timestamp`` pushed forward
        one day. The calendar skips the 4th and 5th, and the pipeline runs from
        the third calendar date with a window length of 3, taking the last
        entry of each window.
        """
        asset_info = asset_infos[0][0]

        base_dates = pd.DatetimeIndex([
            pd.Timestamp(day) for day in ('2014-01-01', '2014-01-04')
        ])
        baseline = pd.DataFrame({
            'value': (0, 1),
            'asof_date': base_dates,
            'timestamp': base_dates,
        })
        expr = bz.data(baseline, name='expr', dshape=self.macro_dshape)
        raw_deltas = bz.data(baseline, name='deltas', dshape=self.macro_dshape)
        deltas = bz.transform(
            raw_deltas,
            value=raw_deltas.value + 10,
            timestamp=raw_deltas.timestamp + timedelta(days=1),
        )

        nassets = len(asset_info)
        views_by_day = {
            '2014-01-03': repeat_last_axis(
                np.array([10.0, 10.0, 10.0]),
                nassets,
            ),
            '2014-01-06': repeat_last_axis(
                np.array([10.0, 10.0, 11.0]),
                nassets,
            ),
        }
        expected_views = keymap(pd.Timestamp, views_by_day)

        # The 4th and 5th are omitted to simulate a weekend.
        cal = pd.DatetimeIndex([
            pd.Timestamp(day)
            for day in ('2014-01-01', '2014-01-02', '2014-01-03', '2014-01-06')
        ])

        with tmp_asset_finder(equities=asset_info) as finder:
            per_asset_values = list(concatv([10] * nassets, [11] * nassets))
            row_index = pd.MultiIndex.from_product((
                sorted(expected_views.keys()),
                finder.retrieve_all(asset_info.index),
            ))
            expected_output = pd.DataFrame(
                per_asset_values,
                index=row_index,
                columns=('value',),
            )
            self._run_pipeline(
                expr,
                deltas,
                expected_views,
                expected_output,
                finder,
                calendar=cal,
                start=cal[2],
                end=cal[-1],
                window_length=3,
                compute_fn=op.itemgetter(-1),
            )
项目:lain    作者:llllllllll    | 项目源码 | 文件源码
def summary(feature_names, features, **labels):
    """Describe the training data as a block of text.

    Parameters
    ----------
    feature_names : iterable[str]
        The names of the features in the ``features`` array.
    features : np.ndarray
        The 3d feature array; feature ``ix`` is read from ``features[..., ix]``.
    **labels
        The named label arrays.

    Returns
    -------
    summary : str
        The mean, standard deviation, minimum and maximum of every label
        (sorted by name) followed by those of every feature (in the order of
        ``feature_names``).
    """
    attribute_template = dedent(
        """\
        {name}:
          mean: {mean}
          std:  {std}
          min:  {min}
          max:  {max}""",
    )

    def format_attribute(name, values):
        rendered = attribute_template.format(
            name=name,
            mean=values.mean(),
            std=values.std(),
            min=values.min(),
            max=values.max(),
        )
        # Indent every line of the rendered attribute by four spaces.
        return '    ' + '\n    '.join(rendered.splitlines())

    lines = ['summary:', '  labels:']
    for label_name, label_values in sorted(labels.items(), key=first):
        lines.append(format_attribute(label_name, label_values))

    lines.append('features:')
    for ix, feature_name in enumerate(feature_names):
        lines.append(format_attribute(feature_name, features[..., ix]))

    return '\n'.join(lines)
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def load_extensions(default, extensions, strict, environ, reload=False):
    """Load all of the given extensions. This should be called by run_algo
    or the cli.

    Parameters
    ----------
    default : bool
        Load the default extension (~/.catalyst/extension.py)?
    extensions : iterable[str]
        The paths to the extensions to load. If the path ends in ``.py`` it is
        treated as a script and executed. If it does not end in ``.py`` it is
        treated as a module to be imported.
    strict : bool
        Should failure to load an extension raise. If this is false it will
        still warn.
    environ : mapping
        The environment to use to find the default extension path.
    reload : bool, optional
        Reload any extensions that have already been loaded.
    """
    if default:
        default_extension_path = pth.default_extension(environ=environ)
        pth.ensure_file(default_extension_path)
        # The default extension goes first so that other extensions can rely
        # on it having been loaded before them.
        extensions = concatv([default_extension_path], extensions)

    for ext in extensions:
        already_loaded = ext in _loaded_extensions
        if already_loaded and not reload:
            continue

        try:
            # Scripts are executed; anything else is imported as a module.
            if ext.endswith('.py'):
                run_path(ext, run_name='<extension>')
            else:
                __import__(ext)
        except Exception as e:
            if not strict:
                # Without `strict` the failure is only reported.
                warnings.warn(
                    'Failed to load extension: %r\n%s' % (ext, e),
                    stacklevel=2
                )
                continue
            # With `strict` the original exception propagates.
            raise
        else:
            _loaded_extensions.add(ext)
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def test_deltas_macro(self):
        """Run the pipeline on macro data with deltas and no checkpoints.

        The deltas are built from every row of ``self.macro_df`` except the
        last, with ``value`` raised by 10 and ``timestamp`` pushed forward one
        day. ``None`` is passed in the position following the deltas. The
        pipeline runs with a window length of 2 and ``np.nanmax`` as the
        compute function.
        """
        expr = bz.data(self.macro_df, name='expr', dshape=self.macro_dshape)

        raw_deltas = bz.data(
            self.macro_df.iloc[:-1],
            name='deltas',
            dshape=self.macro_dshape,
        )
        deltas = bz.transform(
            raw_deltas,
            value=raw_deltas.value + 10,
            timestamp=raw_deltas.timestamp + timedelta(days=1),
        )

        nassets = len(simple_asset_info)
        views_by_day = {
            '2014-01-02': np.array([[10.0],
                                    [1.0]]),
            '2014-01-03': np.array([[11.0],
                                    [2.0]]),
        }
        expected_views = keymap(pd.Timestamp, views_by_day)

        with tmp_asset_finder(equities=simple_asset_info) as finder:
            per_asset_values = list(concatv([10] * nassets, [11] * nassets))
            row_index = pd.MultiIndex.from_product((
                sorted(expected_views.keys()),
                finder.retrieve_all(simple_asset_info.index),
            ))
            expected_output = pd.DataFrame(
                per_asset_values,
                index=row_index,
                columns=('value',),
            )

            dates = self.dates
            self._run_pipeline(
                expr,
                deltas,
                None,
                expected_views,
                expected_output,
                finder,
                calendar=dates,
                start=dates[1],
                end=dates[-1],
                window_length=2,
                compute_fn=np.nanmax,
            )
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def _test_checkpoints_macro(self, checkpoints, ffilled_value=-1.0):
        """Simple checkpoints test that accepts a checkpoints dataframe and
        the expected value for 2014-01-03 for macro datasets.

        The underlying data has value -1.0 on 2014-01-01 and 1.0 on 2014-01-04.

        Parameters
        ----------
        checkpoints : pd.DataFrame
            The checkpoints data.
        ffilled_value : float, optional
            The value to be read on the third, if not provided, it will be the
            value in the base data that will be naturally ffilled there.
        """
        first_day = pd.Timestamp('2014-01-01')
        last_day = pd.Timestamp('2014-01-04')
        dates = first_day, last_day
        baseline = pd.DataFrame({
            'value': [-1.0, 1.0],
            'asof_date': dates,
            'timestamp': dates,
        })

        nassets = len(simple_asset_info)
        views_by_day = {
            '2014-01-03': np.array([[ffilled_value]]),
            '2014-01-04': np.array([[1.0]]),
        }
        expected_views = keymap(pd.Timestamp, views_by_day)

        with tmp_asset_finder(equities=simple_asset_info) as finder:
            per_asset_values = list(concatv(
                [ffilled_value] * nassets,
                [1.0] * nassets,
            ))
            row_index = pd.MultiIndex.from_product((
                sorted(expected_views.keys()),
                finder.retrieve_all(simple_asset_info.index),
            ))
            expected_output = pd.DataFrame(
                per_asset_values,
                index=row_index,
                columns=('value',),
            )

            expr = bz.data(baseline, name='expr', dshape=self.macro_dshape)
            checkpoints_expr = bz.data(
                checkpoints,
                name='expr_checkpoints',
                dshape=self.macro_dshape,
            )
            # No deltas are supplied for this test, only checkpoints.
            self._run_pipeline(
                expr,
                None,
                checkpoints_expr,
                expected_views,
                expected_output,
                finder,
                calendar=pd.date_range('2014-01-01', '2014-01-04'),
                start=pd.Timestamp('2014-01-03'),
                end=last_day,
                window_length=1,
                compute_fn=op.itemgetter(-1),
            )