Python attr 模块,asdict() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用attr.asdict()

项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def json_ready_header_auth(header_auth):
    # type: (MessageHeaderAuthentication) -> Dict[str, Text]
    """Create a JSON-serializable representation of a
    :class:`aws_encryption_sdk.internal.structures.MessageHeaderAuthentication`.

    http://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/message-format.html#header-authentication

    :param header_auth: header auth for which to create a JSON-serializable representation
    :type header_auth: aws_encryption_sdk.internal.structures.MessageHeaderAuthentication
    :rtype: dict
    """
    dict_header_auth = attr.asdict(header_auth)

    for key, value in dict_header_auth.items():
        dict_header_auth[key] = unicode_b64_encode(value)

    return dict_header_auth
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
    """Test structuring non-nested attrs classes with default value."""
    cl, vals = cl_and_vals
    obj = cl(*vals)
    attrs_with_defaults = [a for a in fields(cl)
                           if a.default is not NOTHING]
    to_remove = data.draw(lists(elements=sampled_from(attrs_with_defaults),
                                unique=True))

    for a in to_remove:
        if isinstance(a.default, Factory):
            setattr(obj, a.name, a.default.factory())
        else:
            setattr(obj, a.name, a.default)

    dumped = asdict(obj)

    for a in to_remove:
        del dumped[a.name]

    assert obj == converter.structure(dumped, cl)
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_structure_union(converter, cl_and_vals_a, cl_and_vals_b):
    """Structuring of automatically-disambiguable unions works."""
    # type: (Converter, Any, Any) -> None
    cl_a, vals_a = cl_and_vals_a
    cl_b, vals_b = cl_and_vals_b
    a_field_names = {a.name for a in fields(cl_a)}
    b_field_names = {a.name for a in fields(cl_b)}
    assume(a_field_names)
    assume(b_field_names)

    common_names = a_field_names & b_field_names
    if len(a_field_names) > len(common_names):
        obj = cl_a(*vals_a)
        dumped = asdict(obj)
        res = converter.structure(dumped, Union[cl_a, cl_b])
        assert isinstance(res, cl_a)
        assert obj == res
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_fallback(cl_and_vals):
    """The fallback case works."""
    cl, vals = cl_and_vals

    assume(attr.fields(cl))  # At least one field.

    @attr.s
    class A(object):
        pass

    fn = create_uniq_field_dis_func(A, cl)

    assert fn({}) is A
    assert fn(attr.asdict(cl(*vals))) is cl

    attr_names = {a.name for a in attr.fields(cl)}

    if 'xyz' not in attr_names:
        fn({'xyz': 1}) is A  # Uses the fallback.
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def to_yaml(cls: Type['File'], instance: 'File') -> Mapping:
        """Represent the instance as YAML node.

        Keyword arguments:
            instance: The File to be represented.

        Returns:
            YAML representation of the instance.
        """

        # Dump mod part
        columns = (str(c).split('.')[-1] for c in Mod.__table__.columns)
        yml = {f: getattr(instance.mod, f) for f in columns}

        # Dump the file part
        yml['file'] = attr.asdict(instance)
        for field in ('mod',):
            del yml['file'][field]

        return yml
项目:game    作者:chrisnorman7    | 项目源码 | 文件源码
def dump_object(obj):
    """Get this object as a dict."""
    def should_dump(self, attr, value):
        """Decide if the attribute should be dumped or not."""
        column = self.__table__.c[attr.name]
        default = column.default
        if default is not None:
            return not value == default.arg
        elif self.__table__.c[attr.name].nullable and value is None:
            return False
        else:
            return True
    d = asdict(obj, filter=partial(should_dump, obj))
    if isinstance(
        obj,
        db.SpacialObject
    ) and 'type' in d:  # obj.type is not None
        d['type'] = d['type'].name
    return d
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def train_model(self, request: Dict) -> Dict:
        ws_id = request['workspace_id']
        try:
            pages = request['pages']
            pages = self._fetch_pages_html(pages)
            result = train_model(
                pages, model_cls=self.model_cls,
                progress_callback=partial(self.progress_callback, ws_id=ws_id),
                **self.model_kwargs)
        except Exception as e:
            logging.error('Failed to train a model', exc_info=e)
            result = ModelMeta(
                model=None,
                meta=Meta(advice=[AdviceItem(
                    ERROR,
                    'Unknown error while training a model: {}'.format(e))]))
        return {
            'workspace_id': ws_id,
            'quality': json.dumps(attr.asdict(result.meta)),
            'model': (encode_object(result.model) if result.model is not None
                      else None),
        }
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_single_domain():
    docs = [{'html': 'foo{} bar'.format(i % 4),
             'url': 'http://example.com/{}'.format(i),
             'relevant': i % 2 == 0}
            for i in range(10)]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert lst_as_dict(result.meta.advice)[:2] == [
        {'kind': 'Warning',
         'text': "Only 1 relevant domain in data means that it's impossible to do "
                 'cross-validation across domains, and will likely result in '
                 'model over-fitting.'},
        {'kind': 'Warning',
         'text': 'Number of human labeled documents is just 10, consider having '
                 'at least 100 labeled.'},
    ]
    assert lst_as_dict(result.meta.description)[:3] == [
        {'heading': 'Dataset',
         'text': '10 documents, 10 labeled across 1 domain.'},
        {'heading': 'Class balance',
         'text': '50% relevant, 50% not relevant.'},
        {'heading': 'Metrics', 'text': ''},
    ]
    assert result.model is None
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def json_ready_header(header):
    # type: (MessageHeader) -> Dict[str, Any]
    """Create a JSON-serializable representation of a :class:`aws_encryption_sdk.structures.MessageHeader`.

    http://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/message-format.html#header-structure

    :param header: header for which to create a JSON-serializable representation
    :type header: aws_encryption_sdk.structures.MessageHeader
    :rtype: dict
    """
    dict_header = attr.asdict(header)

    del dict_header['content_aad_length']
    dict_header['version'] = str(float(dict_header['version'].value))
    dict_header['algorithm'] = dict_header['algorithm'].name

    for key, value in dict_header.items():
        if isinstance(value, Enum):
            dict_header[key] = value.value

    dict_header['message_id'] = unicode_b64_encode(dict_header['message_id'])

    dict_header['encrypted_data_keys'] = sorted(
        list(dict_header['encrypted_data_keys']),
        key=lambda x: six.b(x['key_provider']['provider_id']) + x['key_provider']['key_info']
    )
    for data_key in dict_header['encrypted_data_keys']:
        data_key['key_provider']['provider_id'] = unicode_b64_encode(six.b(data_key['key_provider']['provider_id']))
        data_key['key_provider']['key_info'] = unicode_b64_encode(data_key['key_provider']['key_info'])
        data_key['encrypted_data_key'] = unicode_b64_encode(data_key['encrypted_data_key'])

    return dict_header
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def list_asdict(in_list: Iterable[Any]) -> List[Any]:
    """asdict'ify a list of objects.

    Useful when converting a list of objects to json.
    """
    return [asdict(obj) for obj in in_list]


# noinspection PyUnusedLocal
项目:claimchain-core    作者:gdanezis    | 项目源码 | 文件源码
def export(self):
        return asdict(self)
项目:claimchain-core    作者:gdanezis    | 项目源码 | 文件源码
def public_export(self):
        result = {}
        for name, attr in asdict(self, recurse=False).items():
            if isinstance(attr, Keypair):
                result[name + '_pk'] = pet2ascii(attr.pk)
        return result
项目:Codado    作者:corydodt    | 项目源码 | 文件源码
def _orderedCleanDict(attrsObj):
    """
    -> dict with false-values removed

    Also evaluates attr-instances for false-ness by looking at the values of their properties
    """
    def _filt(k, v):
        if attr.has(v):
            return not not any(attr.astuple(v))
        return not not v

    return attr.asdict(attrsObj,
        dict_factory=UnsortableOrderedDict,
        recurse=False,
        filter=_filt)
项目:django-postcode-lookup    作者:LabD    | 项目源码 | 文件源码
def json(self):
        return attr.asdict(self)
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def handle_next_request(self):
        # Get the next JobRequest
        try:
            request_id, meta, request_message = self.transport.receive_request_message()
        except MessageReceiveTimeout:
            # no new message, nothing to do
            return
        if meta.setdefault('__request_serialized__', True) is False:
            # The caller is a new client that did not double-serialize, so do not double-deserialize
            job_request = request_message
        else:
            # The caller is an old client that double-serialized, so be sure to double-deserialize
            # TODO: Remove this and the serializer in version >= 0.25.0
            job_request = self.serializer.blob_to_dict(request_message)
        self.job_logger.info('Job request: %s', job_request)

        # Process and run the Job
        job_response = self.process_job(job_request)

        # Send the JobResponse
        response_dict = {}
        try:
            response_dict = attr.asdict(job_response, dict_factory=UnicodeKeysDict)
            if meta['__request_serialized__'] is False:
                # Match the response serialization behavior to the request serialization behavior
                response_message = response_dict
            else:
                # TODO: Remove this and the serializer in version >= 0.25.0
                response_message = self.serializer.dict_to_blob(response_dict)
        except Exception as e:
            self.metrics.counter('server.error.serialization_failure').increment()
            job_response = self.handle_error(e, variables={'job_response': response_dict})
            response_dict = attr.asdict(job_response, dict_factory=UnicodeKeysDict)
            if meta['__request_serialized__'] is False:
                # Match the response serialization behavior to the request serialization behavior
                response_message = response_dict
            else:
                # TODO: Remove this and the serializer in version >= 0.25.0
                response_message = self.serializer.dict_to_blob(response_dict)
        self.transport.send_response_message(request_id, meta, response_message)
        self.job_logger.info('Job response: %s', response_dict)
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def _base_send_request(self, request_id, meta, job_request):
        with self.metrics.timer('client.send.excluding_middleware'):
            if isinstance(job_request, JobRequest):
                job_request = attr.asdict(job_request, dict_factory=UnicodeKeysDict)
            meta['__request_serialized__'] = True
            self.transport.send_request_message(
                request_id,
                meta,
                self.serializer.dict_to_blob(job_request),
            )
            # meta['__request_serialized__'] = False
            # self.transport.send_request_message(request_id, meta, job_request)
项目:pyventory    作者:lig    | 项目源码 | 文件源码
def ansible_inventory(hosts, out=sys.stdout, indent=None):
    inventory = Inventory(hosts)

    data = OrderedDict(
        (name, attr.asdict(group, dict_factory=OrderedDict))
        for name, group in inventory.groups.items())

    for group in data.values():
        for attr_name in ('hosts', 'vars', 'children',):
            if not group[attr_name]:
                del group[attr_name]

    data['_meta'] = {'hostvars': inventory.hosts.copy()}

    json.dump(data, out, indent=indent, default=list)
项目:beeswax-api    作者:hbmartin    | 项目源码 | 文件源码
def __str__(self):
        return json.dumps(attr.asdict(self, filter=lambda _attr, value: value is not None))
项目:beeswax-api    作者:hbmartin    | 项目源码 | 文件源码
def __str__(self):
        return json.dumps(attr.asdict(self, filter=lambda _attr, value: value is not None))
项目:beeswax-api    作者:hbmartin    | 项目源码 | 文件源码
def __str__(self):
        return json.dumps(attr.asdict(self, filter=lambda _attr, value: value is not None))
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_attrs_asdict_unstructure(converter, nested_class):
    # type: (Converter, Type) -> None
    """Our dumping should be identical to `attrs`."""
    instance = nested_class[0]()
    assert converter.unstructure(instance) == asdict(instance)
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_structure_simple_from_dict(converter, cl_and_vals):
    # type: (Converter, Any) -> None
    """Test structuring non-nested attrs classes dumped with asdict."""
    cl, vals = cl_and_vals
    obj = cl(*vals)

    dumped = asdict(obj)
    loaded = converter.structure(dumped, cl)

    assert obj == loaded
项目:osp-scraper    作者:opensyllabus    | 项目源码 | 文件源码
def asdict(self):
        """return a dict representation of the filter"""
        return attr.asdict(self)
项目:labgrid    作者:labgrid-project    | 项目源码 | 文件源码
def asdict(self):
        return {
            'cls': self.cls,
            'params': self.params,
            'acquired': self.acquired,
            'avail': self.avail,
        }
项目:labgrid    作者:labgrid-project    | 项目源码 | 文件源码
def asdict(self):
        result = attr.asdict(self)
        del result['name']  # the name is the key in the places dict
        return result
项目:kubeque    作者:broadinstitute    | 项目源码 | 文件源码
def dumpjob_cmd(jq, io, args):
    import attr
    tasks_as_dicts = []
    jobid = _resolve_jobid(jq, args.jobid)
    job = jq.get_job(jobid)
    job = attr.asdict(job)
    tasks = jq.get_tasks(jobid)
    for task in tasks:
        t = attr.asdict(task)

        task_args = io.get_as_str(task.args)
        t['args_url'] = t['args']
        t['args'] = json.loads(task_args)
        tasks_as_dicts.append(t)
    print(json.dumps(dict(job=job, tasks=tasks_as_dicts), indent=2, sort_keys=True))
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_auth_loading(dummy_auth):
    """Is the authentication properly loaded from file?"""

    correct = StringIO(yaml.dump(attr.asdict(dummy_auth)))
    empty = StringIO()

    assert proxy.Authorization.load(correct) == dummy_auth

    with pytest.raises(exceptions.InvalidStream):
        proxy.Authorization.load(empty)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_auth_store(dummy_auth):
    """Is the authentication properly stored into a file?"""

    buffer = StringIO()
    dummy_auth.dump(buffer)

    data = yaml.load(buffer.getvalue())

    assert data == attr.asdict(dummy_auth)


# Function tests

# # Dependency resolution tests
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def __call__(self, req: requests.Request) -> requests.Request:
        """Make the request authenticated."""

        header_fmt = 'Token {user_id}:{token}'
        req.headers['Authorization'] = header_fmt.format_map(attr.asdict(self))

        return req
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def dump(self, file: TextIO) -> None:
        """Store credentials for future use.

        Keyword arguments:
            file: Open YAML text stream to write to.
        """

        yaml.dump(attr.asdict(self), file)
项目:k8s-snapshots    作者:miracle2k    | 项目源码 | 文件源码
def to_dict(self) -> Dict[str, Any]:
        return attr.asdict(self)
项目:k8s-snapshots    作者:miracle2k    | 项目源码 | 文件源码
def __structlog__(self):
        if attr.has(self.__class__):
            return attr.asdict(self)

        if hasattr(self, 'to_dict') and callable(self.to_dict):
            return self.to_dict()

        return self
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def test_conda_manager_identify_distributions(get_conda_test_dir):
    # Skip if network is not available (skip_if_no_network fails with fixtures)
    test_dir = get_conda_test_dir
    files = [os.path.join(test_dir, "miniconda/bin/sqlite3"),
             os.path.join(test_dir, "miniconda/envs/mytest/bin/xz"),
             os.path.join(test_dir, "miniconda/envs/mytest/lib/python2.7/site-packages/pip/index.py"),
             os.path.join(test_dir, "miniconda/envs/mytest/lib/python2.7/site-packages/rpaths.py"),
             "/sbin/iptables"]
    tracer = CondaTracer()
    dists = list(tracer.identify_distributions(files))

    assert len(dists) == 1, "Exactly one Conda distribution expected."

    (distributions, unknown_files) = dists[0]

    assert unknown_files == ["/sbin/iptables"], \
        "Exactly one file (/sbin/iptables) should not be discovered."

    assert len(distributions.environments) == 2, \
        "Two conda environments are expected."

    out = {'environments': [{'packages': [{'files': ['bin/sqlite3'],
                                           'name': 'sqlite'}]},
                            {'packages': [{'files': ['bin/xz'],
                                           'name': 'xz'},
                                          {'files': ['lib/python2.7/site-packages/pip/index.py'],
                                           'name': 'pip'},
                                          {'files': ['lib/python2.7/site-packages/rpaths.py'],
                                           'installer': 'pip',
                                           'name': 'rpaths'}
                                          ]
                             }
                            ]
           }
    assert_is_subset_recur(out, attr.asdict(distributions), [dict, list])
    NicemanProvenance.write(sys.stdout, distributions)
    print(json.dumps(unknown_files, indent=4))
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def yaml_representer(dumper, data):

        ordered_items = filter(
            lambda i: bool(i[1]),  # so only non empty/None
            attr.asdict(
                data, dict_factory=collections.OrderedDict).items())
        return dumper.represent_mapping('tag:yaml.org,2002:map', ordered_items)
项目:synpurge    作者:aperezdc    | 项目源码 | 文件源码
def asdict(self):
        d = attr.asdict(self)
        d["aliases"] = list(d["aliases"])
        return d
项目:synpurge    作者:aperezdc    | 项目源码 | 文件源码
def open(db_conf):
    from postgresql import driver
    from .pglib import category
    conn_params = attr.asdict(db_conf)
    for key in ("clean_interval", "clean_full", "reindex_interval", "reindex_full"):
        del conn_params[key]
    return Database(driver.connect(category=category, **conn_params),
                    db_conf.database or db_conf.user)
项目:clpa    作者:clpn    | 项目源码 | 文件源码
def keys(self):
        return attr.asdict(self).keys()
项目:clpa    作者:clpn    | 项目源码 | 文件源码
def values(self):
        return attr.asdict(self).values()
项目:clpa    作者:clpn    | 项目源码 | 文件源码
def items(self):
        return attr.asdict(self).items()
项目:maas    作者:maas    | 项目源码 | 文件源码
def asdict(self):
        """Convert to a dictionary."""
        return attr.asdict(self)
项目:maas    作者:maas    | 项目源码 | 文件源码
def asdict(self):
        """Convert to a dictionary."""
        return attr.asdict(self)
项目:aws-encryption-sdk-python    作者:awslabs    | 项目源码 | 文件源码
def __repr__(self):
        """Builds the proper repr string."""
        return '{name}({kwargs})'.format(
            name=self.__class__.__name__,
            kwargs=', '.join(
                '{key}={value}'.format(
                    key=key,
                    value=value
                ) for key, value in sorted(
                    attr.asdict(self.config, recurse=True).items(),
                    key=lambda x: x[0]
                )
            )
        )
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def lst_as_dict(lst):
    return [attr.asdict(x) for x in lst]
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_train_model():
    data = fetch_20newsgroups(
        random_state=42,
        categories=['sci.crypt', 'sci.electronics', 'sci.med', 'sci.space'])
    limit = 200
    if limit is not None:
        data['target'] = data['target'][:limit]
        data['data'] = data['data'][:limit]
    n_domains = int(len(data['target']) / 5)
    docs = [
        {
            'html': '\n'.join('<p>{}</p>'.format(t) for t in text.split('\n')),
            'url': 'http://example-{}.com/{}'.format(n % n_domains, n),
            'relevant': {'sci.space': True, 'sci.med': None}.get(
                data['target_names'][target], False),
        }
        for n, (text, target) in enumerate(zip(data['data'], data['target']))]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert lst_as_dict(result.meta.advice) == [
        {'kind': 'Notice',
         'text': "The quality of the classifier is very good, ROC AUC is 0.96. "
                 "You can label more pages if you want to improve quality, "
                 "but it's better to start crawling "
                 "and check the quality of crawled pages.",
         },
        ]
    assert lst_as_dict(result.meta.description) == [
        {'heading': 'Dataset',
         'text': '200 documents, 159 labeled across 40 domains.'},
        {'heading': 'Class balance',
         'text': '33% relevant, 67% not relevant.'},
        {'heading': 'Metrics', 'text': ''},
        {'heading': 'Accuracy', 'text': '0.881 ± 0.122'},
        {'heading': 'ROC AUC', 'text': '0.964 ± 0.081'}]
    assert len(result.meta.weights['pos']) > 0
    assert len(result.meta.weights['neg']) > 0
    assert isinstance(result.model, BaseModel)
    assert hasattr(result.model, 'predict_proba')
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_empty():
    result = train_model([])
    pprint(attr.asdict(result.meta))
    assert result.meta == Meta(
        advice=[AdviceItem('Error', 'Can not train a model, no pages given.')])
    assert result.model is None
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_unlabeled():
    docs = [{'html': 'foo',
             'url': 'http://example{}.com'.format(i),
             'relevant': None}
            for i in range(10)]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert result.meta == Meta(
        advice=[AdviceItem(
            'Error', 'Can not train a model, no labeled pages given.')])
    assert result.model is None
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_two_domains():
    docs = [{'html': 'foo{}'.format(i % 3),
             'url': 'http://example{}.com/{}'.format(i % 2, i),
             'relevant': i % 3 == 0}
            for i in range(10)]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert lst_as_dict(result.meta.advice) == [
        {'kind': 'Warning',
         'text': 'Low number of relevant domains (just 2) might result in model '
                 'over-fitting.'},
        {'kind': 'Warning',
         'text': 'Number of human labeled documents is just 10, consider having '
                 'at least 100 labeled.'},
        {'kind': 'Notice',
         'text': 'The quality of the classifier is very good, ROC AUC is '
                 '1.00. Still, consider fixing warnings shown above.'}]
    assert lst_as_dict(result.meta.description) == [
        {'heading': 'Dataset',
         'text': '10 documents, 10 labeled across 2 domains.'},
        {'heading': 'Class balance',
         'text': '40% relevant, 60% not relevant.'},
        {'heading': 'Metrics', 'text': ''},
        {'heading': 'Accuracy', 'text': '1.000 ± 0.000'},
        {'heading': 'ROC AUC', 'text': '1.000 ± 0.000'}]
    assert result.model is not None
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_default_clf(model_cls):
    docs = [{'html': 'foo{} bar'.format(i % 4),
             'url': 'http://example{}.com'.format(i),
             'relevant': i % 2 == 0}
            for i in range(100)]
    result = default_train_model(docs, model_cls=model_cls)
    assert result.model is not None
    meta = attr.asdict(result.meta)
    pprint(meta)
    json.dumps(meta)
项目:allure-python    作者:allure-framework    | 项目源码 | 文件源码
def _report_item(self, item):
        indent = INDENT if os.environ.get("ALLURE_INDENT_OUTPUT") else None
        filename = item.file_pattern.format(prefix=uuid.uuid4())
        data = asdict(item, filter=lambda attr, value: not (type(value) != bool and not bool(value)))
        with io.open(os.path.join(self._report_dir, filename), 'w', encoding='utf8') as json_file:
            if sys.version_info.major < 3:
                json_file.write(unicode(json.dumps(data, indent=indent, ensure_ascii=False, encoding='utf8')))
            else:
                json.dump(data, json_file, indent=indent, ensure_ascii=False)
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def test_multiple_requests(self):
        """
        Sending multiple requests with StubClient.send_request for different actions and then
        calling get_all_responses returns responses for all the actions that were called.
        """
        responses = {
            'action_1': {'body': {'foo': 'bar'}, 'errors': []},
            'action_2': {'body': {'baz': 42}, 'errors': []},
            'action_3': {
                'body': {},
                'errors': [
                    {
                        'code': ERROR_CODE_INVALID,
                        'message': 'Invalid input',
                        'field': 'quas.wex',
                        'traceback': None,
                        'variables': None,
                    },
                ],
            },
        }
        self.client.stub_action(SERVICE_NAME, 'action_1', **responses['action_1'])
        self.client.stub_action(SERVICE_NAME, 'action_2', **responses['action_2'])
        self.client.stub_action(SERVICE_NAME, 'action_3', **responses['action_3'])

        control = self.client._make_control_header()
        context = self.client._make_context_header()
        request_1 = dict(control_extra=control, context=context, actions=[
            {'action': 'action_1'},
            {'action': 'action_2'},
        ])
        request_2 = dict(control_extra=control, context=context, actions=[
            {'action': 'action_2'},
            {'action': 'action_1'},
        ])
        request_3 = dict(control_extra=control, context=context, actions=[{'action': 'action_3'}])

        # Store requests by request ID for later verification, because order is not guaranteed
        requests_by_id = {}
        for request in (request_1, request_2, request_3):
            request_id = self.client.send_request(SERVICE_NAME, **request)
            requests_by_id[request_id] = request

        for response_id, response in self.client.get_all_responses(SERVICE_NAME):
            # The client returned the same number of actions as were requested
            self.assertEqual(len(response.actions), len(requests_by_id[response_id]['actions']))
            for i in range(len(response.actions)):
                action_response = response.actions[i]
                # The action name returned matches the action name in the request
                self.assertEqual(action_response.action, requests_by_id[response_id]['actions'][i]['action'])
                # The action response matches the expected response
                # Errors are returned as the Error type, so convert them to dict first
                self.assertEqual(action_response.body, responses[action_response.action]['body'])
                self.assertEqual(
                    [attr.asdict(e, dict_factory=UnicodeKeysDict) for e in action_response.errors],
                    responses[action_response.action]['errors'],
                )