Python toolz 模块,compose() 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用toolz.compose()

项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def doctable(ctx):
    """Build a Word document containing a picture and a formatted table.

    Reads ``./docs/flight-options.csv`` into a DataFrame, opens
    ``./docs/style-reference.docx``, appends ``./docs/diagrams_002.png``
    scaled to the width between the first section's margins, appends the
    DataFrame through ``add_table`` with per-column formatters, and saves
    the result as ``./docs/test.docx``.

    Parameters
    ----------
    ctx
        Not referenced by the body.
        NOTE(review): presumably a task-runner context -- confirm with the
        caller.
    """
    flights = pd.read_csv('./docs/flight-options.csv')

    # Work on top of an existing document rather than a blank one.
    document = docx.Document('./docs/style-reference.docx')

    format_int = partial(format_decimal, format='#')
    format_usd = partial(format_currency, currency='USD')

    # Usable width = page width minus both horizontal margins.
    first_section = document.sections[0]
    usable_width = (
        first_section.page_width
        - first_section.left_margin
        - first_section.right_margin
    )
    document.add_picture('./docs/diagrams_002.png', width=usable_width)

    yes_no_labels = {0: 'No', 1: 'Yes'}
    column_formatters = {
        'ticket_price': format_usd,
        'total_hours': format_int,
        'trip': format_int,
        'airline': partial(shorten_long_name, width=20),
        # compose runs right-to-left: int() first, then the label lookup
        # (values other than 0/1 map to None).
        'selected': compose(yes_no_labels.get, int),
    }
    add_table(
        flights,
        document,
        table_style='Plain Table 3',
        formatters=column_formatters,
    )

    # Persist the assembled document.
    document.save('./docs/test.docx')
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def matches(self, pattern):
    """
    Apply a regex match to every element.

    Parameters
    ----------
    pattern : str or compiled regex
        NOTE(review): the body calls ``pattern.match`` directly, so a
        plain ``str`` has to be compiled before it gets here -- presumably
        by a decorator that is not visible in this snippet; confirm.

    Returns
    -------
    matches : np.ndarray[bool]
        An array with the same shape as self indicating whether each
        element of self was matched by ``pattern``.
    """
    # ``pattern.match`` returns a match object or None; ``bool`` turns
    # that into a plain True/False for each element.
    element_matches = compose(bool, pattern.match)
    return self.map_predicate(element_matches)

    # These types all implement an O(N) __contains__, so pre-emptively
    # coerce to `set`.
项目:gpu_monitor    作者:msalvaris    | 项目源码 | 文件源码
def create_influxdb_writer(influxdb_client, series_name="gpu_measurements", **tags):
    """ Build the callable that pushes one line of input to influxdb

    The callable is a right-to-left composition:
    ``parse_line`` -> ``_to_json_dict`` -> (``_add_tags(tags)`` when tags
    were given) -> the writer returned by ``_influxdb_writer_for``.
    It is handed to ``_call_when`` together with a predicate that accepts
    only inputs that are not None and do not contain ``'#'``.

    Parameters
    ----------
    influxdb_client:
        Passed through to ``_influxdb_writer_for``.
    series_name: (str)
        Passed through to ``_influxdb_writer_for``.
    tags: Extra tags to be added to the measurements

    Returns
    -------
    Whatever ``_call_when`` returns for the composed writer and predicate.
    """
    to_influxdb = _influxdb_writer_for(influxdb_client, series_name)

    if not tags:
        logger.debug('Creating writer')
        pipeline = compose(to_influxdb, _to_json_dict, parse_line)
    else:
        logger.debug('Creating writer with tags')
        pipeline = compose(
            to_influxdb,
            _add_tags(tags),
            _to_json_dict,
            parse_line,
        )

    def _is_writable(line):
        # Accept only real input that carries no '#' character.
        return line is not None and '#' not in line

    return _call_when(pipeline, _is_writable)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_quantiles_uneven_buckets(self):
    """
    Check masked ``quantiles`` output for 3 and 7 bins on 5-wide rows.

    Every expected row carries a single -1 on the (pre-shuffle) diagonal,
    presumably the element excluded by ``eye_mask`` -- confirm. The other
    four values per row are what gets split into 3 and 7 bins.
    """
    shape = (5, 5)
    shuffle_rows = partial(permute_rows, 5)

    factor_values = shuffle_rows(
        log1p(arange(25, dtype=float).reshape(shape))
    )
    mask_values = shuffle_rows(self.eye_mask(shape=shape))

    f = F()
    m = Mask()

    # Expected labels are int64 arrays pushed through the same row shuffle
    # as the inputs (compose runs right-to-left: array(), then shuffle).
    shuffled_labels = compose(
        shuffle_rows,
        partial(array, dtype=int64_dtype),
    )

    terms = {
        '3_masked': f.quantiles(bins=3, mask=m),
        '7_masked': f.quantiles(bins=7, mask=m),
    }
    initial_workspace = {
        f: factor_values,
        m: mask_values,
    }
    expected = {
        '3_masked': shuffled_labels([
            [-1, 0, 0, 1, 2],
            [0, -1, 0, 1, 2],
            [0, 0, -1, 1, 2],
            [0, 0, 1, -1, 2],
            [0, 0, 1, 2, -1],
        ]),
        '7_masked': shuffled_labels([
            [-1, 0, 2, 4, 6],
            [0, -1, 2, 4, 6],
            [0, 2, -1, 4, 6],
            [0, 2, 4, -1, 6],
            [0, 2, 4, 6, -1],
        ]),
    }

    self.check_terms(
        terms=terms,
        initial_workspace=initial_workspace,
        expected=expected,
        mask=self.build_mask(self.ones_mask(shape=shape)),
    )
项目:web3utils.py    作者:carver    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
    """Call the wrapped contract factory and sugar-wrap what it returns.

    ``self.contract`` is invoked with the given arguments. If it hands
    back a class (a contract that still has to be instantiated), the
    return value is a composed callable that instantiates the class and
    then wraps the instance in ``ContractSugar``. Otherwise the returned
    object is wrapped in ``ContractSugar`` immediately.

    ``isinstance(contract, type)`` is used instead of comparing
    ``type(contract) == type`` so that classes created by a custom
    metaclass are also recognised as classes.
    """
    contract = self.contract(*args, **kwargs)
    if isinstance(contract, type):
        # contract still needs to be initialized, wrap afterwards
        return compose(ContractSugar, contract)
    return ContractSugar(contract)
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def test_quantiles_uneven_buckets(self):
    """
    Check masked ``quantiles`` output for 3 and 7 bins on 5-wide rows.

    Every expected row carries a single -1 on the (pre-shuffle) diagonal,
    presumably the element excluded by ``eye_mask`` -- confirm. The other
    four values per row are what gets split into 3 and 7 bins.
    """
    shape = (5, 5)
    shuffle_rows = partial(permute_rows, 5)

    factor_values = shuffle_rows(
        log1p(arange(25, dtype=float).reshape(shape))
    )
    mask_values = shuffle_rows(self.eye_mask(shape=shape))

    f = F()
    m = Mask()

    # Expected labels are int64 arrays pushed through the same row shuffle
    # as the inputs (compose runs right-to-left: array(), then shuffle).
    shuffled_labels = compose(
        shuffle_rows,
        partial(array, dtype=int64_dtype),
    )

    terms = {
        '3_masked': f.quantiles(bins=3, mask=m),
        '7_masked': f.quantiles(bins=7, mask=m),
    }
    initial_workspace = {
        f: factor_values,
        m: mask_values,
    }
    expected = {
        '3_masked': shuffled_labels([
            [-1, 0, 0, 1, 2],
            [0, -1, 0, 1, 2],
            [0, 0, -1, 1, 2],
            [0, 0, 1, -1, 2],
            [0, 0, 1, 2, -1],
        ]),
        '7_masked': shuffled_labels([
            [-1, 0, 2, 4, 6],
            [0, -1, 2, 4, 6],
            [0, 2, -1, 4, 6],
            [0, 2, 4, -1, 6],
            [0, 2, 4, 6, -1],
        ]),
    }

    self.check_terms(
        terms=terms,
        initial_workspace=initial_workspace,
        expected=expected,
        mask=self.build_mask(self.ones_mask(shape=shape)),
    )
项目:databrewer    作者:rmax    | 项目源码 | 文件源码
def format_results(terminal_width, key_list, separator, text_list,
                   left_align=True, min_factor=3, **kwargs):
    """Yield formatted ``key<separator>text`` lines laid out in two columns.

    Parameters
    ----------
    terminal_width : int or falsy
        Total width available. When falsy, the key column is as wide as
        the longest key and the text is never wrapped.
    key_list : sequence of str
        Left-column entries.
    separator : str
        Text placed between the two columns.
    text_list : iterable of str
        Right-column entries, paired with ``key_list`` via ``zip`` (so any
        surplus items on the longer side are ignored).
    left_align : bool
        Left-justify keys in their column when true, right-justify
        otherwise.
    min_factor : number
        Text is wrapped only if ``text_width * min_factor`` exceeds
        ``terminal_width``.
    **kwargs
        Forwarded to ``textwrap.wrap``.

    Yields
    ------
    str
        One line per pair, or two lines when the key is wider than the
        key column: the key with the separator, then the text on its own
        line behind a blank key column.
    """
    # default=0 lets an empty ``key_list`` produce an empty result instead
    # of raising ValueError from max() on an empty sequence.
    key_width = max(map(len, key_list), default=0)
    separator_length = len(separator)
    desc_wrap = toolz.identity
    if terminal_width:
        # Cap the key column once it would take more than half the width.
        if key_width / terminal_width > .5:
            key_width = terminal_width // 2 - 3
        text_width = terminal_width - key_width - separator_length
        if text_width * min_factor > terminal_width:
            # Continuation lines are indented to line up under the text
            # column.
            continuation_indent = ' ' * (key_width + separator_length)
            desc_wrap = toolz.compose(
                ('\n' + continuation_indent).join,
                toolz.partial(textwrap.wrap, width=text_width, **kwargs),
            )

    fmt = '%-*s%s%s' if left_align else '%*s%s%s'

    for key, text in zip(key_list, text_list):
        text = desc_wrap(text)
        if len(key) > key_width:
            # Key overflows its column: give it a line of its own.
            yield fmt % (key_width, key, separator, '')
            yield fmt % (key_width, '', ' ' * separator_length, text)
        else:
            yield fmt % (key_width, key, separator, text)
项目:airship-convert    作者:airship-convert    | 项目源码 | 文件源码
def transform_paragraphs(notebook, transformations=None):
    """Run every paragraph of ``notebook`` through ``transformations``.

    The transformations are applied in the order given: ``compose`` runs
    its arguments right-to-left, so the sequence is reversed before being
    composed. ``notebook["paragraphs"]`` is replaced in place with the
    transformed list (an empty list if the key was missing). When
    ``transformations`` is empty or None the notebook is left untouched.

    Returns the same ``notebook`` object.
    """
    if not transformations:
        return notebook

    apply_all = compose(*reversed(transformations))
    paragraphs = notebook.get("paragraphs", [])
    notebook["paragraphs"] = list(map(apply_all, paragraphs))
    return notebook
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_quantiles_unmasked(self, seed):
    """
    Check ``quantiles`` for 2, 3 and 6 bins on 6-wide rows without a mask.

    Inputs and expectations go through the same seeded row shuffle.
    """
    shape = (6, 6)
    shuffle_rows = partial(permute_rows, seed)

    # Rows are shuffled so the result cannot depend on input order; the
    # log transform keeps it from depending on linear scaling or on the
    # inputs being integral.
    factor_values = shuffle_rows(
        log1p(arange(36, dtype=float).reshape(shape))
    )

    f = self.f

    # Shuffling the expectations exactly like the inputs keeps the
    # expected/input correspondence obvious while still covering a range
    # of orderings.
    shuffled_labels = compose(
        shuffle_rows,
        partial(array, dtype=int64_dtype),
    )

    terms = {
        '2': f.quantiles(bins=2),
        '3': f.quantiles(bins=3),
        '6': f.quantiles(bins=6),
    }
    initial_workspace = {
        f: factor_values,
    }
    expected = {
        # Input values increase along each row, so the first half of a
        # row lands in the bottom bucket and the second half in the top.
        '2': shuffled_labels([[0, 0, 0, 1, 1, 1]] * 6),
        # Same idea with three buckets.
        '3': shuffled_labels([[0, 0, 1, 1, 2, 2]] * 6),
        # Limiting case: each column gets its own bucket.
        '6': shuffled_labels([[0, 1, 2, 3, 4, 5]] * 6),
    }

    self.check_terms(
        terms=terms,
        initial_workspace=initial_workspace,
        expected=expected,
        mask=self.build_mask(self.ones_mask(shape=shape)),
    )
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def expect_types(__funcname=_qualified_name, **named):
    """
    Preprocessing decorator that checks argument types at call time.

    Each keyword maps an argument name to a type or a tuple of types; a
    call whose argument is not an instance of the expected type(s) fails
    with TypeError.

    Examples
    --------
    >>> @expect_types(x=int, y=str)
    ... def foo(x, y):
    ...    return x, y
    ...
    >>> foo(2, '3')
    (2, '3')
    >>> foo(2.0, '3')  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    Traceback (most recent call last):
       ...
    TypeError: ...foo() expected a value of type int for argument 'x',
    but got float instead.

    Notes
    -----
    A special argument, __funcname, can be provided as a string to override the
    function name shown in error messages.  This is most often used on __init__
    or __new__ methods to make errors refer to the class name instead of the
    function name.
    """
    # Reject a malformed spec up front, when the decorator is built.
    for name, type_ in iteritems(named):
        if isinstance(type_, (type, tuple)):
            continue
        raise TypeError(
            "expect_types() expected a type or tuple of types for "
            "argument '{name}', but got {type_} instead.".format(
                name=name, type_=type_,
            )
        )

    def _build_check(type_):
        # A tuple of types is rendered as "A or B"; a single type by its
        # qualified name.
        if isinstance(type_, tuple):
            type_names = ' or '.join(map(_qualified_name, type_))
        else:
            type_names = _qualified_name(type_)

        # Only {type_or_types} is filled in here; the %-style fields are
        # left in the template handed to make_check.
        template = (
            "%(funcname)s() expected a value of type {type_or_types} "
            "for argument '%(argname)s', but got %(actual)s instead."
        ).format(type_or_types=type_names)

        return make_check(
            exc_type=TypeError,
            template=template,
            pred=lambda v: not isinstance(v, type_),
            actual=compose(_qualified_name, type),
            funcname=__funcname,
        )

    return preprocess(**valmap(_build_check, named))
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def test_quantiles_unmasked(self, seed):
    """
    Check ``quantiles`` for 2, 3 and 6 bins on 6-wide rows without a mask.

    Inputs and expectations go through the same seeded row shuffle.
    """
    shape = (6, 6)
    shuffle_rows = partial(permute_rows, seed)

    # Rows are shuffled so the result cannot depend on input order; the
    # log transform keeps it from depending on linear scaling or on the
    # inputs being integral.
    factor_values = shuffle_rows(
        log1p(arange(36, dtype=float).reshape(shape))
    )

    f = self.f

    # Shuffling the expectations exactly like the inputs keeps the
    # expected/input correspondence obvious while still covering a range
    # of orderings.
    shuffled_labels = compose(
        shuffle_rows,
        partial(array, dtype=int64_dtype),
    )

    terms = {
        '2': f.quantiles(bins=2),
        '3': f.quantiles(bins=3),
        '6': f.quantiles(bins=6),
    }
    initial_workspace = {
        f: factor_values,
    }
    expected = {
        # Input values increase along each row, so the first half of a
        # row lands in the bottom bucket and the second half in the top.
        '2': shuffled_labels([[0, 0, 0, 1, 1, 1]] * 6),
        # Same idea with three buckets.
        '3': shuffled_labels([[0, 0, 1, 1, 2, 2]] * 6),
        # Limiting case: each column gets its own bucket.
        '6': shuffled_labels([[0, 1, 2, 3, 4, 5]] * 6),
    }

    self.check_terms(
        terms=terms,
        initial_workspace=initial_workspace,
        expected=expected,
        mask=self.build_mask(self.ones_mask(shape=shape)),
    )
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def test_top_and_bottom_with_groupby_and_mask(self, dtype, seed):
    """
    Check ``top(2)`` / ``bottom(2)`` grouped by a classifier under a mask.

    Factor values increase left to right within each (pre-shuffle) row,
    and the mask is a shuffled ``rot90`` of ``eye_mask``, so the expected
    zeros along the anti-diagonal are presumably the masked-out cells --
    confirm against ``eye_mask``.
    """
    shuffle_rows = partial(permute_rows, seed)
    # compose runs right-to-left: build the array, then shuffle its rows.
    # A ``dtype=`` keyword at the call site overrides the int64 default.
    shuffled_array = compose(
        shuffle_rows,
        partial(array, dtype=int64_dtype),
    )

    shape = (8, 8)

    # Rows are shuffled to show the top values are picked independently of
    # input order.
    factor_values = shuffle_rows(arange(0, 64, dtype=dtype).reshape(shape))
    classifier_values = shuffled_array([
        [0, 0, 1, 1, 2, 2, 0, 0],
        [0, 0, 1, 1, 2, 2, 0, 0],
        [0, 1, 2, 3, 0, 1, 2, 3],
        [0, 1, 2, 3, 0, 1, 2, 3],
        [0, 0, 0, 0, 1, 1, 1, 1],
        [0, 0, 0, 0, 1, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0],
    ])

    f = self.f
    c = self.c

    terms = {
        'top2': f.top(2, groupby=c),
        'bottom2': f.bottom(2, groupby=c),
    }
    initial_workspace = {
        f: factor_values,
        c: classifier_values,
    }
    expected = {
        # The rightmost two entries of each classifier group, skipping
        # the anti-diagonal.
        'top2': shuffled_array([
            [0, 1, 1, 1, 1, 1, 1, 0],
            [0, 1, 1, 1, 1, 1, 0, 1],
            [1, 1, 1, 1, 1, 0, 1, 1],
            [1, 1, 1, 1, 0, 1, 1, 1],
            [0, 1, 1, 0, 0, 0, 1, 1],
            [0, 1, 0, 1, 0, 0, 1, 1],
            [0, 0, 0, 0, 0, 0, 1, 1],
            [0, 0, 0, 0, 0, 0, 1, 1],
        ], dtype=bool),
        # The leftmost two entries of each classifier group, skipping
        # the anti-diagonal.
        'bottom2': shuffled_array([
            [1, 1, 1, 1, 1, 1, 0, 0],
            [1, 1, 1, 1, 1, 1, 0, 0],
            [1, 1, 1, 1, 1, 0, 1, 1],
            [1, 1, 1, 1, 0, 1, 1, 1],
            [1, 1, 0, 0, 1, 1, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0],
            [1, 0, 1, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 0, 0, 0, 0],
        ], dtype=bool),
    }

    self.check_terms(
        terms=terms,
        initial_workspace=initial_workspace,
        expected=expected,
        mask=self.build_mask(
            shuffle_rows(rot90(self.eye_mask(shape=shape)))
        ),
    )