Python collections.abc 模块,Mapping() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 collections.abc.Mapping。

项目:yarl    作者:aio-libs    | 项目源码 | 文件源码
def with_query(self, *args, **kwargs):
        """Return a new URL with the query part replaced.

        All positional and keyword arguments are forwarded unchanged to
        ``self._get_str_query``, which produces the new query string.
        According to the original documentation this accepts any Mapping
        (e.g. dict, multidict.MultiDict instances) or str, auto-encodes the
        argument if needed, and clears the query when None is passed.

        The current path is carried over as-is and the result is built with
        ``encoded=True``.
        """
        # N.B. doesn't cleanup query/fragment
        query_string = self._get_str_query(*args, **kwargs)
        current = self._val
        updated = current._replace(path=current.path, query=query_string)
        return URL(updated, encoded=True)
项目:phredutils    作者:doctaphred    | 项目源码 | 文件源码
def diffs(*mappings, missing=MISSING):
    """Yield ``(key, values)`` for each key whose values are not all equal.

    At least one mapping must be given and every argument must be a
    ``Mapping``; both conditions are checked with ``assert`` when the
    generator is first advanced.

    NOTE(review): ``missing`` is not referenced in this body; presumably
    the ``values`` helper handles absent keys -- confirm.
    """
    assert mappings
    assert all(isinstance(candidate, Mapping) for candidate in mappings)

    # Defer to __eq__(), even if it contradicts the algorithm below
    if all_eq(mappings):
        return

    every_key = chain.from_iterable(m.keys() for m in mappings)
    for key in unique(every_key):
        per_mapping = tuple(values(mappings, key))
        if all_eq(per_mapping):
            continue
        yield key, per_mapping
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def __getitem__(self, key: str) -> Any:
        """Look up a dotted ``key`` (e.g. ``"a.b.c"``) in ``self.mapping``.

        Each dot-separated part descends one level into the nested mapping.

        Raises:
            KeyError: if an intermediate element is not a mapping, if a
                part is empty, or if a part is missing.
        """
        current = self.mapping
        parts = key.split(".")

        for depth, part in enumerate(parts):
            # Dotted path consumed so far, used in the error messages.
            walked = '.'.join(parts[:depth])
            if not isinstance(current, c_abc.Mapping):
                raise KeyError(f"Element {walked!r} is not a mapping")
            if not part:
                raise KeyError(f"Empty sub-key after {walked!r}")
            if part not in current:
                raise KeyError(f"Cannot find '{key}'")
            current = current[part]

        return current
项目:multidict    作者:aio-libs    | 项目源码 | 文件源码
def __eq__(self, other):
        """Compare this multidict with another mapping.

        Returns ``NotImplemented`` for non-``Mapping`` operands. Against
        another ``_Base`` instance the internal item lists are compared
        position by position on identity and value. Against any other
        mapping, the lengths must match and every item of ``self`` must
        equal ``other.get(key, _marker)``.
        """
        if not isinstance(other, abc.Mapping):
            return NotImplemented

        own_items = self._impl._items
        if isinstance(other, _Base):
            their_items = other._impl._items
            if len(own_items) != len(their_items):
                return False
            mismatch = any(
                ident1 != ident2 or value1 != value2
                for (ident1, _key1, value1), (ident2, _key2, value2)
                in zip(own_items, their_items))
            return not mismatch

        if len(own_items) != len(other):
            return False
        for key, value in self.items():
            if value != other.get(key, _marker):
                return False
        return True
项目:pep    作者:pepkit    | 项目源码 | 文件源码
def basic_instance_data(request, instance_raw_data):
    """
    Transform the raw data for a basic model instance to comply with its ctor.

    The class name is read from the ``class_name`` fixture and selects the
    transformation; an unknown name raises ``KeyError``.

    :param pytest._pytest.fixtures.SubRequest request: test case requesting
        the basic instance data
    :param Mapping instance_raw_data: the raw data needed to create a
        model instance
    :return object: basic instance data in a form accepted by its constructor
    """

    def _unchanged(data):
        return data

    def _as_interface_file(data):
        # Cleanup is free with _write_config, using request's temp folder.
        return _write_config(data, request, "pipeline_interface.yaml")

    def _as_series(data):
        return pd.Series(data)

    transformation_by_class = {
        "AttributeDict": _unchanged,
        "PipelineInterface": _as_interface_file,
        "ProtocolInterface": _as_interface_file,
        "ProtocolMapper": _unchanged,
        "Sample": _as_series,
    }
    which_class = request.getfixturevalue("class_name")
    transform = transformation_by_class[which_class]
    return transform(instance_raw_data)
项目:pep    作者:pepkit    | 项目源码 | 文件源码
def _write_config(data, request, filename):
    """
    Dump configuration data as YAML into the requesting test's temp folder.

    :param str Sequence | Mapping data: data to write to file, YAML compliant
    :param pytest._pytest.fixtures.SubRequest request: test case that
        requested a fixture from which this function was called
    :param str filename: name for the file to write
    :return str: full path to the file written
    """
    # We get cleanup for free by writing to file in requests temp folder.
    tmp_folder = request.getfixturevalue("tmpdir").strpath
    target = os.path.join(tmp_folder, filename)
    with open(target, 'w') as handle:
        yaml.safe_dump(data, handle)
    return target
项目:deb-python-json-pointer    作者:openstack    | 项目源码 | 文件源码
def get_part(self, doc, part):
        """Return ``part`` converted to the type needed to index ``doc``.

        Mappings and duck-typed ``__getitem__`` targets take the part
        unchanged. Sequences accept ``'-'`` unchanged and otherwise need a
        part matching ``RE_ARRAY_INDEX``, which is returned as ``int``.

        Raises:
            JsonPointerException: for an invalid sequence index, or when
                ``doc`` cannot be indexed at all.
        """
        if isinstance(doc, Mapping):
            return part

        if isinstance(doc, Sequence):
            if part == '-':
                return part
            if RE_ARRAY_INDEX.match(str(part)):
                return int(part)
            raise JsonPointerException("'%s' is not a valid list index" % (part, ))

        if hasattr(doc, '__getitem__'):
            # Allow indexing via ducktyping if the target has defined __getitem__
            return part

        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be dict/list or support __getitem__" % type(doc))
项目:linearmodels    作者:bashtage    | 项目源码 | 文件源码
def test_against_direct_model(data):
    """Check SUR OLS/GLS parameters against the direct ``simple_sur`` result.

    The test is skipped (returns early) when the first entry of ``data`` is
    not a mapping or carries a ``'weights'`` key. Only the first three
    equations are used.
    """
    keys = list(data.keys())
    first_entry = data[keys[0]]
    if not isinstance(first_entry, Mapping):
        return
    if 'weights' in first_entry:
        return

    data_copy = OrderedDict()
    y, x = [], []
    for key in keys[:3]:
        equation = data[key]
        data_copy[key] = equation
        y.append(equation['dependent'])
        x.append(equation['exog'])

    direct = simple_sur(y, x)
    mod = SUR(data_copy)

    ols_result = mod.fit(method='ols')
    assert_allclose(ols_result.params.values[:, None], direct.beta0)

    gls_result = mod.fit(method='gls')
    assert_allclose(gls_result.params.values[:, None], direct.beta1)
项目:signac-project-template    作者:glotzerlab    | 项目源码 | 文件源码
def cast_json(json_dict):
    """Convert an arbitrary JSON source into MongoDB
    compatible format.

    Strings get ``'.'`` replaced by ``'_'`` and ``'$'`` replaced by the
    full-width dollar sign. Mappings and other iterables are converted
    recursively (keys included); anything else is returned unchanged.
    """
    DOT = '_'
    DOLLAR = '\uff04'

    def _escape(text):
        return text.replace('.', DOT).replace('$', DOLLAR)

    if isinstance(json_dict, str):
        return _escape(json_dict)
    if six.PY2 and isinstance(json_dict, unicode):  # noqa
        return _escape(json_dict)
    if isinstance(json_dict, Mapping):
        return {cast_json(name): cast_json(item)
                for name, item in json_dict.items()}
    if isinstance(json_dict, Iterable):
        return [cast_json(element) for element in json_dict]
    return json_dict
项目:signac-project-template    作者:glotzerlab    | 项目源码 | 文件源码
def cast_json(json_dict):
    """Convert an arbitrary JSON source into MongoDB
    compatible format.

    Strings get ``'.'`` replaced by ``'_'`` and ``'$'`` replaced by the
    full-width dollar sign. Mappings and other iterables are converted
    recursively (keys included); anything else is returned unchanged.
    """
    DOT = '_'
    DOLLAR = '\uff04'

    def _escape(text):
        return text.replace('.', DOT).replace('$', DOLLAR)

    if isinstance(json_dict, str):
        return _escape(json_dict)
    if six.PY2 and isinstance(json_dict, unicode):  # noqa
        return _escape(json_dict)
    if isinstance(json_dict, Mapping):
        return {cast_json(name): cast_json(item)
                for name, item in json_dict.items()}
    if isinstance(json_dict, Iterable):
        return [cast_json(element) for element in json_dict]
    return json_dict
项目:signac-project-template    作者:glotzerlab    | 项目源码 | 文件源码
def cast_json(json_dict):
    """Convert an arbitrary JSON source into MongoDB
    compatible format.

    Strings get ``'.'`` replaced by ``'_'`` and ``'$'`` replaced by the
    full-width dollar sign. Mappings and other iterables are converted
    recursively (keys included); anything else is returned unchanged.
    """
    DOT = '_'
    DOLLAR = '\uff04'

    def _escape(text):
        return text.replace('.', DOT).replace('$', DOLLAR)

    if isinstance(json_dict, str):
        return _escape(json_dict)
    if six.PY2 and isinstance(json_dict, unicode):  # noqa
        return _escape(json_dict)
    if isinstance(json_dict, Mapping):
        return {cast_json(name): cast_json(item)
                for name, item in json_dict.items()}
    if isinstance(json_dict, Iterable):
        return [cast_json(element) for element in json_dict]
    return json_dict
项目:signac-project-template    作者:glotzerlab    | 项目源码 | 文件源码
def cast_json(json_dict):
    """Convert an arbitrary JSON source into MongoDB
    compatible format.

    Strings get ``'.'`` replaced by ``'_'`` and ``'$'`` replaced by the
    full-width dollar sign. Mappings and other iterables are converted
    recursively (keys included); anything else is returned unchanged.
    """
    DOT = '_'
    DOLLAR = '\uff04'

    def _escape(text):
        return text.replace('.', DOT).replace('$', DOLLAR)

    if isinstance(json_dict, str):
        return _escape(json_dict)
    if six.PY2 and isinstance(json_dict, unicode):  # noqa
        return _escape(json_dict)
    if isinstance(json_dict, Mapping):
        return {cast_json(name): cast_json(item)
                for name, item in json_dict.items()}
    if isinstance(json_dict, Iterable):
        return [cast_json(element) for element in json_dict]
    return json_dict
项目:grako    作者:apalala    | 项目源码 | 文件源码
def asjson(obj, seen=None):
    """Recursively convert ``obj`` into a JSON-friendly structure.

    Objects exposing ``__json__`` (other than classes themselves) are
    converted by calling it. Mappings become ``collections.OrderedDict``
    with converted keys and values, other iterables (as judged by
    ``isiter``) become lists, and anything else is returned unchanged.

    :param obj: the object to convert
    :param seen: set of ``id()`` values of containers already visited;
        callers normally leave this as ``None``
    :return: the converted structure, or the string ``'__RECURSIVE__'``
        when a container whose id is already in ``seen`` is met again
    :raises TypeError: re-raised (after a debug message) when a converted
        key cannot be used as a dictionary key
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. Fall back for interpreters without that module.
    try:
        from collections.abc import Mapping
    except ImportError:  # pragma: no cover - Python 2
        from collections import Mapping

    is_mapping = isinstance(obj, Mapping)
    if is_mapping or isiter(obj):
        # prevent traversal of recursive structures
        if seen is None:
            seen = set()
        elif id(obj) in seen:
            return '__RECURSIVE__'
        seen.add(id(obj))

    if hasattr(obj, '__json__') and type(obj) is not type:
        return obj.__json__()
    elif is_mapping:
        result = collections.OrderedDict()
        for k, v in obj.items():
            try:
                result[asjson(k, seen)] = asjson(v, seen)
            except TypeError:
                debug('Unhashable key?', type(k), str(k))
                raise
        return result
    elif isiter(obj):
        return [asjson(e, seen) for e in obj]
    else:
        return obj
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def get_part(self, doc, part):
        """Return ``part`` converted to the type needed to index ``doc``.

        Mappings and duck-typed ``__getitem__`` targets take the part
        unchanged. Sequences accept ``'-'`` unchanged and otherwise need a
        part matching ``self._RE_ARRAY_INDEX``, which is returned as ``int``.

        Raises:
            JsonPointerException: for an invalid sequence index, or when
                ``doc`` cannot be indexed at all.
        """
        if isinstance(doc, Mapping):
            return part

        if isinstance(doc, Sequence):
            if part == '-':
                return part
            if self._RE_ARRAY_INDEX.match(str(part)):
                return int(part)
            raise JsonPointerException("'%s' is not a valid sequence index" % part)

        if hasattr(doc, '__getitem__'):
            # Allow indexing via ducktyping
            # if the target has defined __getitem__
            return part

        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be mapping/sequence or support __getitem__" % type(doc))
项目:python-pool-performance    作者:JohnStarich    | 项目源码 | 文件源码
def run_test(work_type: str, job_sets: Sequence, trials: int,
             pool_class: type, worker_count: int) -> Sequence:
    """Run one pool implementation over every job set and summarize it.

    :param work_type: ``'compute'`` or ``'network'``; selects which test
        method of the pool is run
    :param job_sets: job-count configurations; each element is passed to
        the pool's test method
    :param trials: number of trials, forwarded to the pool's test method
    :param pool_class: class instantiated with ``worker_count``
    :param worker_count: number of workers handed to ``pool_class``
    :return: list with one ``summarize_test`` result per job set
    :raises ValueError: if ``work_type`` is not a known work type
    """
    pool = pool_class(worker_count)
    # The pool is torn down even when the work type is invalid or a test
    # run raises, so a failure no longer leaks it.
    try:
        if work_type == 'compute':
            test_func = pool.run_compute_test
        elif work_type == 'network':
            test_func = pool.run_network_test
        else:
            raise ValueError("Invalid work type: {}".format(work_type))
        results = (
            test_func(jobs, trials, show_progress=True)
            for jobs in tqdm(job_sets, desc=pool_class.__name__)
        )
        return [summarize_test(result) for result in results]
    finally:
        pool.destroy_pool()
项目:phredutils    作者:doctaphred    | 项目源码 | 文件源码
def frozen(struct):
    """Return an immutable, hashable version of the given data structure.

    Mappings become a frozenset of ``(key, frozen(value))`` pairs, sets
    become frozensets and every other iterable becomes a tuple, each
    converted recursively. Strings are returned unchanged.

    Iterators (including generators) are hashable but mutable, so they
    are evaluated and returned as tuples---if they are infinite, this
    function will not exit.

    Raises:
        TypeError: if a non-iterable leaf is not hashable.
    """
    if isinstance(struct, str):
        # A one-character string iterates to itself, so treating strings
        # as generic iterables would recurse forever. They are already
        # immutable and hashable.
        return struct
    if isinstance(struct, Mapping):
        return frozenset((k, frozen(v)) for k, v in struct.items())
    if isinstance(struct, Set):
        return frozenset(frozen(item) for item in struct)
    if isinstance(struct, Iterable):  # Includes iterators and generators
        return tuple(frozen(item) for item in struct)
    hash(struct)  # Raise TypeError for unhashable objects
    return struct
项目:phredutils    作者:doctaphred    | 项目源码 | 文件源码
def hashified(struct, use_none=False):
    """Return a hashable version of the given data structure.

    Already-hashable objects are returned unchanged. Otherwise mappings
    become a frozenset of ``(key, hashified(value))`` pairs, sets become
    frozensets and other iterables become tuples, each converted
    recursively.

    If use_none is True, returns None instead of raising TypeError for
    unhashable types: this will serve as a bad but sometimes passable
    hash. The flag is forwarded to nested elements as well.

    See also functools._make_key, which might be a better choice.

    Raises:
        TypeError: if an unhashable, non-iterable object is found and
            ``use_none`` is false.
    """
    try:
        hash(struct)
    except TypeError:
        pass
    else:
        # Return the original object if it's already hashable
        return struct

    # ``use_none`` is passed down so nested unhashable leaves honour it
    # too, instead of raising regardless of the caller's choice.
    if isinstance(struct, Mapping):
        return frozenset(
            (k, hashified(v, use_none)) for k, v in struct.items())
    if isinstance(struct, Set):
        return frozenset(hashified(item, use_none) for item in struct)
    if isinstance(struct, Iterable):
        return tuple(hashified(item, use_none) for item in struct)

    if use_none:
        return None

    raise TypeError('unhashable type: {.__name__!r}'.format(type(struct)))
项目:mfnf-pdf-export    作者:Lodifice    | 项目源码 | 文件源码
def __call__(self, obj):
        """Transform the JSON object `obj`.

        Strings and unrecognised objects are returned unchanged; sequences
        are passed to ``self.act_on_list`` and mappings to
        ``self.act_on_dict``.
        """
        # str is checked first because it is itself a Sequence.
        if isinstance(obj, str):
            return obj
        if isinstance(obj, Sequence):
            return self.act_on_list(obj)
        if isinstance(obj, Mapping):
            return self.act_on_dict(obj)
        return obj
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def build(cls, obj):
        """Wrap ``obj`` according to its type.

        Mappings are wrapped in ``cls``; mutable sequences become a list
        with ``cls.build`` applied to each item; anything else is returned
        unchanged.
        """
        if isinstance(obj, abc.Mapping):
            return cls(obj)
        if isinstance(obj, abc.MutableSequence):
            built = []
            for element in obj:
                built.append(cls.build(element))
            return built
        return obj  # <8>
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def build(cls, obj):  # <5>
        """Wrap ``obj`` according to its type.

        Mappings are wrapped in ``cls``; mutable sequences become a list
        with ``cls.build`` applied to each item; anything else is returned
        unchanged.
        """
        if isinstance(obj, abc.Mapping):  # <6>
            return cls(obj)
        if isinstance(obj, abc.MutableSequence):  # <7>
            built = []
            for element in obj:
                built.append(cls.build(element))
            return built
        return obj  # <8>
# END EXPLORE0
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def __new__(cls, arg):  # <1>
        """Create an instance for mappings and convert other inputs.

        A mapping yields a new, bare instance of ``cls``. A mutable
        sequence yields a list with ``cls`` applied to each item. Any
        other argument is returned as-is.
        """
        if isinstance(arg, abc.Mapping):
            return super().__new__(cls)  # <2>
        if isinstance(arg, abc.MutableSequence):  # <3>
            converted = []
            for element in arg:
                converted.append(cls(element))
            return converted
        return arg
项目:pymzn    作者:paolodragone    | 项目源码 | 文件源码
def _is_list(obj):
    """Return True for sized iterables that are neither sets nor mappings."""
    if isinstance(obj, (Set, Mapping)):
        return False
    return isinstance(obj, Sized) and isinstance(obj, Iterable)
项目:pymzn    作者:paolodragone    | 项目源码 | 文件源码
def _is_dict(obj):
    """Return True when ``obj`` is an instance of ``Mapping``."""
    return isinstance(obj, Mapping)
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def __init__(self, mapping: Mapping = None) -> None:
        """Store ``mapping`` on the instance, defaulting to an empty dict.

        Raises:
            ValueError: if ``mapping`` is given but is not a mapping.
        """
        source = {} if mapping is None else mapping
        if not isinstance(source, c_abc.Mapping):
            raise ValueError("Must be a mapping")
        self.mapping = source
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def __init__(self, mapping: Mapping, *fallback_configs: Configuration) -> None:
        """Initialise from ``mapping`` and remember the fallback configurations.

        ``mapping`` is handed to the parent initialiser; the remaining
        positional arguments are stored, as a tuple, on
        ``self.fallback_configs``.
        """
        super().__init__(mapping)
        # NOTE: the type check of the fallbacks below is currently disabled.
        # for fb_c in fallback_configs:
        #     if not isinstance(fb_c, Configuration):
        #         raise ValueError(f"{fb_c!r} is not an instance of {Configuration}")
        self.fallback_configs = fallback_configs
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def __init__(self, name: str, mapping: Mapping, *fallback_configs: Configuration) -> None:
        """Initialise a named configuration from ``mapping``.

        After delegating to the parent initialiser, the name is stored, the
        keys ``nick``, ``user`` and ``realname`` are passed to
        ``self._require_keys``, the channel entries of ``mapping`` are
        normalised via ``self._fix_channels``, and ``self.servers`` is set
        from ``self._parse_servers(mapping)``.
        """
        super().__init__(mapping, *fallback_configs)
        self.name = name

        self._require_keys({'nick', 'user', 'realname'})
        self._fix_channels(mapping)

        self.servers = self._parse_servers(mapping)
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def _fix_channels(mapping: Mapping) -> None:
        """Normalise the ``'channels'`` section of ``mapping`` in place.

        A channel whose configuration is ``None`` gets an empty dict, and a
        channel name that does not start with one of ``#&+!`` is re-keyed
        with a leading ``#``. Nothing happens when ``mapping`` has no
        ``'channels'`` entry.

        :param mapping: configuration mapping; its ``'channels'`` value is
            mutated
        """
        # TODO move to channels core plugin
        channels = mapping.get('channels', {})
        # Iterate over a snapshot: the loop deletes and inserts keys, and
        # mutating a dict while iterating its live view may skip entries or
        # raise RuntimeError.
        for channel, channel_conf in list(channels.items()):
            if channel_conf is None:
                channels[channel] = channel_conf = {}

            # replace channel names 'foobar' with '#foobar'
            if not channel.startswith(tuple('#&+!')):
                del channels[channel]
                channels[f'#{channel}'] = channel_conf
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def __init__(self, mapping: Mapping) -> None:
        """Initialise from ``mapping`` and parse its network configurations.

        ``self.networks`` is set to a list built from
        ``self._parse_networks(mapping)``.
        """
        super().__init__(mapping)
        self.networks = list(self._parse_networks(mapping))
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def _parse_networks(self, root: Mapping) -> List[NetworkConfiguration]:
        """Build one ``NetworkConfiguration`` per entry under ``'networks'``.

        Each configuration receives the network name, its mapping and this
        object as the fallback.

        Raises:
            ConfigurationError: if ``root`` has no ``'networks'`` entry (or
                it is ``None``).
        """
        networks = root.get('networks')
        if networks is None:
            raise ConfigurationError("No networks found")

        configurations = []
        for name, mapping in networks.items():
            configurations.append(NetworkConfiguration(name, mapping, self))
        return configurations
项目:Excalibot    作者:endreman0    | 项目源码 | 文件源码
def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in ``self._values``.

        A mapping value is first converted with ``Config(**value)``.
        """
        if isinstance(value, Mapping):
            value = Config(**value)
        self._values[key] = value
项目:Excalibot    作者:endreman0    | 项目源码 | 文件源码
def populate(self, values):
        """Overwrite known settings with the entries of ``values``.

        Keys that are not already present in ``self._values`` are ignored.
        When the current value is a ``Config`` and the incoming value is a
        mapping, the nested config is populated recursively; otherwise the
        incoming value replaces the current one.
        """
        for key, incoming in values.items():
            if key not in self._values:
                # Automatically delete removed settings
                continue
            current = self._values[key]
            nested = isinstance(current, Config) and isinstance(incoming, Mapping)
            if nested:
                current.populate(incoming)
            else:
                self._values[key] = incoming
项目:pionic    作者:tlocke    | 项目源码 | 文件源码
def _dump(obj, indent):
    """Serialise ``obj`` to its textual form, nesting below ``indent``.

    Mappings (sorted by item) and lists are rendered recursively with two
    extra spaces of indentation per level. Booleans, numbers, ``None``,
    strings, ``bytearray``, ``bytes`` and datetimes each have a dedicated
    rendering.

    Raises:
        PionException: if the type of ``obj`` is not supported.
    """
    if isinstance(obj, Mapping):
        inner = indent + '  '
        entries = [
            '\n' + inner + "'" + key + "': " + _dump(value, inner)
            for key, value in sorted(obj.items())]
        return '{' + ','.join(entries) + '}'

    # bool must be handled before the numeric branch: bool is an int subclass.
    if isinstance(obj, bool):
        if obj:
            return 'true'
        return 'false'

    if isinstance(obj, list):
        inner = indent + '  '
        elements = ['\n' + inner + _dump(item, inner) for item in obj]
        return '[' + ','.join(elements) + ']'

    if isinstance(obj, (int, float, Decimal)):
        return str(obj)

    if obj is None:
        return 'null'

    if isinstance(obj, str):
        # Strings containing a line break use the long quote.
        if '\n' in obj or '\r' in obj:
            quote = LONG_QUOTE
        else:
            quote = SHORT_QUOTE
        return quote + obj + quote

    if isinstance(obj, bytearray):
        return '{{ ' + b64encode(obj).decode() + ' }}'

    if isinstance(obj, bytes):
        return "{{ '" + obj.decode() + "' }}"

    if isinstance(obj, Datetime):
        naive_or_utc = obj.tzinfo is None or obj.tzinfo == Timezone.utc
        return obj.strftime(UTC_FORMAT if naive_or_utc else TZ_FORMAT)

    raise PionException("Type " + str(type(obj)) + " not recognised.")
项目:hyperlink    作者:python-hyper    | 项目源码 | 文件源码
def iter_pairs(iterable):
    """
    Return an iterator over the (key, value) pairs in ``iterable``.

    Mappings are iterated through their ``items()``; any other iterable
    is assumed to yield (key, value) pairs already. This behaviour is
    similar to what Python's ``dict()`` constructor does.
    """
    source = iterable.items() if isinstance(iterable, Mapping) else iterable
    return iter(source)
项目:aiohttp-three-template    作者:RobertoPrevato    | 项目源码 | 文件源码
def __new__(cls, arg):
        """Create an instance for mappings and convert other inputs.

        A mapping yields a new, bare instance of ``cls``. A mutable
        sequence yields a list with ``cls`` applied to each item. Any
        other argument is returned as-is.
        """
        if isinstance(arg, abc.Mapping):
            return super().__new__(cls)
        if isinstance(arg, abc.MutableSequence):
            converted = []
            for element in arg:
                converted.append(cls(element))
            return converted
        return arg
项目:pudzu    作者:Udzu    | 项目源码 | 文件源码
def make_mapping(v, key_fn=identity):
    """Return a mapping from an object, using a function to generate keys if needed.
    Mappings are left as is, iterables are split into elements, everything else is
    wrapped in a singleton map. ``None`` gives an empty dict.

    For iterables ``key_fn`` is called with ``(index, element)``; for a single
    value it is called with ``(None, value)``.
    """
    if v is None:
        return {}
    if isinstance(v, Mapping):
        return v
    if non_string_iterable(v):
        return {ignoring_extra_args(key_fn)(index, element): element
                for (index, element) in enumerate(v)}
    return {ignoring_extra_args(key_fn)(None, v): v}
项目:pudzu    作者:Udzu    | 项目源码 | 文件源码
def __init__(self, d=None, normalize=str.lower, base_factory=dict):
        """Initialise the container, optionally filling it from ``d``.

        :param d: initial content: a mapping, or an iterable of
            ``(key, value)`` pairs. ``None`` (the default) or any
            non-iterable leaves the container empty.
        :param normalize: callable stored on ``self.normalize``
        :param base_factory: zero-argument callable producing the
            underlying store kept in ``self._d``
        """
        # ``None`` replaces the former shared mutable default ``{}``; both
        # leave the container empty, so callers see no difference.
        self.normalize = normalize
        self._d = base_factory()
        self._k = {}
        if isinstance(d, abc.Mapping):
            for k, v in d.items():
                self[k] = v
        elif isinstance(d, abc.Iterable):
            for (k, v) in d:
                self[k] = v
项目:stig    作者:rndusr    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug representation of the command.

        The format is ``<Command [provides] name(args) state>``. Sequence
        arguments are shown as comma-separated reprs and mapping arguments
        as ``key=value`` pairs. The state is ``success=...`` once the
        command has finished and ``running`` otherwise.
        """
        args = self._args
        if isinstance(args, abc.Sequence):
            argstr = ', '.join(repr(arg) for arg in args)
        elif isinstance(args, abc.Mapping):
            argstr = ', '.join('%s=%r' % (k, v) for k, v in args.items())
        else:
            # Any other argument container used to leave ``argstr`` unbound
            # and made repr() raise UnboundLocalError.
            argstr = repr(args)
        provides = '/'.join(self.provides)
        string = '<Command [{}] {}({})'.format(provides, self.name, argstr)
        if self.finished:
            string += ' success={}'.format(self.success)
        else:
            string += ' running'
        return string + '>'
项目:multidict    作者:aio-libs    | 项目源码 | 文件源码
def getone(self, key, default=_marker):
        """Get first value matching the key.

        The key is normalised with ``self._title`` and compared with the
        identity stored for each item. ``default`` is returned when nothing
        matches; without a default a ``KeyError`` is raised.
        """
        wanted = self._title(key)
        for ident, _key, value in self._impl._items:
            if ident == wanted:
                return value
        if default is _marker:
            raise KeyError('Key not found: %r' % key)
        return default

    # Mapping interface #
项目:multidict    作者:aio-libs    | 项目源码 | 文件源码
def clear(self):
        """Remove all items from MultiDict."""
        self._impl._items.clear()
        # Bump the version counter of the underlying implementation after
        # the change.
        self._impl.incr_version()

    # Mapping interface #
项目:multidict    作者:aio-libs    | 项目源码 | 文件源码
def test_abc_inheritance():
    """Check how the multidict ABCs relate to ``Mapping`` / ``MutableMapping``."""
    # The read-only interface is a Mapping but not a MutableMapping.
    assert issubclass(MultiMapping, Mapping)
    assert not issubclass(MultiMapping, MutableMapping)
    # The mutable interface is both.
    assert issubclass(MutableMultiMapping, Mapping)
    assert issubclass(MutableMultiMapping, MutableMapping)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_Mapping(self):
        """Check ``Mapping`` registration, abstract methods and comparisons."""
        # dict must be recognised as a Mapping, both as instance and as class.
        for sample in [dict]:
            self.assertIsInstance(sample(), Mapping)
            self.assertTrue(issubclass(sample, Mapping))
        self.validate_abstract_methods(Mapping, '__contains__', '__iter__', '__len__',
            '__getitem__')
        # Minimal concrete Mapping: always empty.
        class MyMapping(Mapping):
            def __len__(self):
                return 0
            def __getitem__(self, i):
                raise IndexError
            def __iter__(self):
                return iter(())
        self.validate_comparison(MyMapping())
项目:pep    作者:pepkit    | 项目源码 | 文件源码
def atacseq_iface_without_resources():
    """
    Provide the ATAC-Seq pipeline interface as a fixture, without resources.

    Note that this represents the configuration data for the interface for a
    single pipeline. In order to use this in the form that a PipelineInterface
    expects, this needs to be the value to which a key is mapped within a
    larger Mapping.

    :return Mapping: all of the pipeline interface configuration data for
        ATAC-Seq, minus the resources section
    """
    paired_reads = ("read1", "read2")

    required_arguments = {
        "--sample-name": "sample_name",
        "--genome": "genome",
        "--input": "read1",
        "--input2": "read2",
        "--single-or-paired": "read_type",
    }
    optional_arguments = {
        "--frip-ref-peaks": "FRIP_ref",
        "--prealignments": "prealignments",
        "--genome-size": "macs_genome_size",
    }

    iface = {"name": "ATACseq", "looper_args": True}
    # Each section gets its own list so the sections stay independent.
    for section in ("required_input_files", "all_input_files",
                    "ngs_input_files"):
        iface[section] = list(paired_reads)
    iface["arguments"] = required_arguments
    iface["optional_arguments"] = optional_arguments
    return iface
项目:pep    作者:pepkit    | 项目源码 | 文件源码
def atacseq_iface_with_resources(
        atacseq_iface_without_resources, resources):
    """
    Combine the base ATAC-Seq interface data with a resources section.

    Both inputs are deep-copied, so neither argument is modified.

    :param dict atacseq_iface_without_resources: PipelineInterface config
        data, minus a resources section
    :param Mapping resources: resources section of PipelineInterface
        configuration data
    :return Mapping: pipeline interface data for ATAC-Seq pipeline, with all
        of the base sections plus resources section
    """
    resources_copy_key = "resources"
    combined = copy.deepcopy(atacseq_iface_without_resources)
    combined[resources_copy_key] = copy.deepcopy(resources)
    return combined
项目:pep    作者:pepkit    | 项目源码 | 文件源码
def piface_config_bundles(request, resources):
    """
    Add a resources specification to every pipeline interface config bundle.

    The bundles come from the ``config_bundles`` fixture, either as a
    mapping (its values are the bundles) or as another iterable of bundles.
    Each bundle is updated in place, and the fixture value itself is
    returned.

    :param pytest._pytest.fixtures.SubRequest request: hook into test case
        requesting this fixture, which is queried for a resources value with
        which to override the default if it's present.
    :param Mapping resources: pipeline interface resource specification
    :return Iterable[Mapping]: collection of bundles of pipeline interface
        configuration bundles
    :raise TypeError: if the fixture value is neither a mapping nor iterable
    """
    bundles_fixture = request.getfixturevalue("config_bundles")
    if isinstance(bundles_fixture, Mapping):
        bundles = bundles_fixture.values()
    elif isinstance(bundles_fixture, Iterable):
        bundles = bundles_fixture
    else:
        raise TypeError("Expected mapping or list collection of "
                        "PipelineInterface data: {} ({})".format(
                bundles_fixture, type(bundles_fixture)))

    # A "resources" fixture on the requesting test overrides the default.
    if "resources" in request.fixturenames:
        specification = request.getfixturevalue("resources")
    else:
        specification = resources

    for bundle in bundles:
        bundle.update(specification)
    return bundles_fixture
项目:chat    作者:cambridgeltl    | 项目源码 | 文件源码
def extend(self, iterable):
        """ Concatenate two lists by adding a list of extra items to the end
        of this list. Each item added must be capable of being unpacked into a
        key-value pair. A mapping contributes its ``items()``.

        The extension is all-or-nothing: if any item cannot be unpacked
        into a pair, the list is left unchanged.

        >>> kvl = KeyValueList([('one', 'eins'), ('two', 'zwei')])
        >>> kvl.extend([('three', 'drei'), ('four', 'vier')])
        >>> for item in kvl:
        ...     print(item)
        ('one', 'eins')
        ('two', 'zwei')
        ('three', 'drei')
        ('four', 'vier')
        >>> kvl.extend(['five', 'six'])
        Traceback (most recent call last):
          File "<stdin>", line 1, in ?
        ValueError: KeyValueList items must be pairs

        """
        if isinstance(iterable, Mapping):
            list.extend(self, iterable.items())
            return
        # Validate every item before touching the list. Feeding a lazy
        # generator to list.extend would append the leading pairs and then
        # fail half-way on the first bad item.
        try:
            pairs = [(k, v) for k, v in iterable]
        except ValueError:
            raise ValueError("KeyValueList items must be pairs")
        list.extend(self, pairs)
项目:chat    作者:cambridgeltl    | 项目源码 | 文件源码
def extend(self, iterable):
        """ Concatenate two lists by adding a list of extra items to the end
        of this list. Each item added must be capable of being unpacked into a
        key-value pair. A mapping contributes its ``items()``.

        The extension is all-or-nothing: if any item cannot be unpacked
        into a pair, the list is left unchanged.

        >>> kvl = KeyValueList([('one', 'eins'), ('two', 'zwei')])
        >>> kvl.extend([('three', 'drei'), ('four', 'vier')])
        >>> for item in kvl:
        ...     print(item)
        ('one', 'eins')
        ('two', 'zwei')
        ('three', 'drei')
        ('four', 'vier')
        >>> kvl.extend(['five', 'six'])
        Traceback (most recent call last):
          File "<stdin>", line 1, in ?
        ValueError: KeyValueList items must be pairs

        """
        if isinstance(iterable, Mapping):
            list.extend(self, iterable.items())
            return
        # Validate every item before touching the list. Feeding a lazy
        # generator to list.extend would append the leading pairs and then
        # fail half-way on the first bad item.
        try:
            pairs = [(k, v) for k, v in iterable]
        except ValueError:
            raise ValueError("KeyValueList items must be pairs")
        list.extend(self, pairs)
项目:aiomeasures    作者:cookkkie    | 项目源码 | 文件源码
def format_tags(obj, defaults=None):
    """Merge ``obj`` and ``defaults`` into a sorted list of unique tags.

    Each source may be a mapping (rendered as ``key:value`` tags), a list
    of tags or a single string; any other value is ignored.
    """
    tags = set()
    for source in (obj, defaults):
        if isinstance(source, Mapping):
            for name, value in source.items():
                tags.add('%s:%s' % (name, value))
        elif isinstance(source, list):
            tags.update(source)
        elif isinstance(source, str):
            tags.add(source)
    return sorted(tags)
项目:aiomeasures    作者:cookkkie    | 项目源码 | 文件源码
def format_tags(obj, defaults=None):
    """Merge ``obj`` and ``defaults`` into a sorted list of unique tags.

    Each source may be a mapping (rendered as ``key:value`` tags), a list
    of tags or a single string; any other value is ignored.
    """
    tags = set()
    for source in (obj, defaults):
        if isinstance(source, Mapping):
            for name, value in source.items():
                tags.add('%s:%s' % (name, value))
        elif isinstance(source, list):
            tags.update(source)
        elif isinstance(source, str):
            tags.add(source)
    return sorted(tags)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_Mapping(self):
        """Check ``Mapping`` registration, abstract methods and comparisons."""
        # dict must be recognised as a Mapping, both as instance and as class.
        for sample in [dict]:
            self.assertIsInstance(sample(), Mapping)
            self.assertTrue(issubclass(sample, Mapping))
        self.validate_abstract_methods(Mapping, '__contains__', '__iter__', '__len__',
            '__getitem__')
        # Minimal concrete Mapping: always empty.
        class MyMapping(Mapping):
            def __len__(self):
                return 0
            def __getitem__(self, i):
                raise IndexError
            def __iter__(self):
                return iter(())
        self.validate_comparison(MyMapping())
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _merge_envs(self, merge_envs, re_env):
        """Merge several environments, in order, into one new ``Env``.

        Each entry of ``merge_envs`` is either the string ``'replay'``
        (use ``re_env``), the string ``'native'`` (use
        ``builtins.__xonsh_env__``) or a mapping used directly. Later
        entries override earlier ones.

        Raises:
            TypeError: if an entry is none of the above.
        """
        merged = {}
        for env in merge_envs:
            if env == 'replay':
                source = re_env
            elif env == 'native':
                source = builtins.__xonsh_env__
            elif isinstance(env, Mapping):
                source = env
            else:
                raise TypeError('Type of env not understood: {0!r}'.format(env))
            merged.update(source)
        return Env(**merged)