Python oslo_serialization.jsonutils 模块,dumps() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_serialization.jsonutils.dumps()

项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def add(self):
        params = None
        try:
            params = utils.CNIParameters(flask.request.get_json())
            LOG.debug('Received addNetwork request. CNI Params: %s', params)
            vif = self.plugin.add(params)
            data = jsonutils.dumps(vif.obj_to_primitive())
        except exceptions.ResourceNotReady as e:
            LOG.error("Timed out waiting for requested pod to appear in "
                      "registry: %s.", e)
            return '', httplib.GATEWAY_TIMEOUT, self.headers
        except Exception:
            LOG.exception('Error when processing addNetwork request. CNI '
                          'Params: %s', params)
            return '', httplib.INTERNAL_SERVER_ERROR, self.headers
        return data, httplib.ACCEPTED, self.headers
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_POST_populate(self):
        method = 'create'
        path = "http://localhost/populatePool"
        trunk_ips = [u"10.0.0.6"]
        num_ports = 3
        body = jsonutils.dumps({"trunks": trunk_ips,
                                "num_ports": num_ports})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = False

        expected_resp = ('Ports pool at {} was populated with 3 ports.'
                         .format(trunk_ips)).encode()

        self._do_POST_helper(method, path, headers, body, expected_resp,
                             trigger_exception, trunk_ips, num_ports)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_POST_populate_exception(self):
        method = 'create'
        path = "http://localhost/populatePool"
        trunk_ips = [u"10.0.0.6"]
        num_ports = 3
        body = jsonutils.dumps({"trunks": trunk_ips,
                                "num_ports": num_ports})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = True

        expected_resp = ('Error while populating pool {0} with {1} ports.'
                         .format(trunk_ips, num_ports)).encode()

        self._do_POST_helper(method, path, headers, body, expected_resp,
                             trigger_exception, trunk_ips, num_ports)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_POST_populate_no_trunks(self):
        method = 'create'
        path = "http://localhost/populatePool"
        trunk_ips = []
        num_ports = 3
        body = jsonutils.dumps({"trunks": trunk_ips,
                                "num_ports": num_ports})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = False

        expected_resp = ('Trunk port IP(s) missing.'
                         .format(trunk_ips, num_ports)).encode()

        self._do_POST_helper(method, path, headers, body, expected_resp,
                             trigger_exception, trunk_ips, num_ports)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_GET_list(self):
        method = 'list'
        method_resp = ('["10.0.0.6", "9d2b45c4efaa478481c30340b49fd4d2", '
                       '["00efc78c-f11c-414a-bfcd-a82e16dc07d1", '
                       '"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]] '
                       'has 5 ports')

        path = "http://localhost/listPools"
        body = jsonutils.dumps({})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = False

        expected_resp = ('Pools:\n{0}'.format(method_resp)).encode()

        self._do_GET_helper(method, method_resp, path, headers, body,
                            expected_resp, trigger_exception)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_GET_list_exception(self):
        method = 'list'
        method_resp = ('["10.0.0.6", "9d2b45c4efaa478481c30340b49fd4d2", '
                       '["00efc78c-f11c-414a-bfcd-a82e16dc07d1", '
                       '"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]] '
                       'has 5 ports')

        path = "http://localhost/listPools"
        body = jsonutils.dumps({})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = True

        expected_resp = ('Error listing the pools.').encode()

        self._do_GET_helper(method, method_resp, path, headers, body,
                            expected_resp, trigger_exception)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_GET_show(self):
        method = 'show'
        method_resp = "251f748d-2a0d-4143-bce8-2e616f7a6a4a"
        path = "http://localhost/showPool"
        pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
                    [u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
                     u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
        body = jsonutils.dumps({"pool_key": pool_key})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = False
        formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
        expected_resp = ('Pool {0} ports are:\n{1}'
                         .format(formated_key, method_resp)).encode()

        self._do_GET_helper(method, method_resp, path, headers, body,
                            expected_resp, trigger_exception, pool_key)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_GET_show_exception(self):
        method = 'show'
        method_resp = "251f748d-2a0d-4143-bce8-2e616f7a6a4a"
        path = "http://localhost/showPool"
        pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
                    [u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
                     u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
        body = jsonutils.dumps({"pool_key": pool_key})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = True
        formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
        expected_resp = ('Error showing pool: {0}.'
                         .format(formated_key)).encode()

        self._do_GET_helper(method, method_resp, path, headers, body,
                            expected_resp, trigger_exception, pool_key)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_do_GET_show_empty(self):
        method = 'show'
        method_resp = "Empty pool"
        path = "http://localhost/showPool"
        pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
                    [u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
                     u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
        body = jsonutils.dumps({"pool_key": pool_key})
        headers = {'Content-Type': 'application/json', 'Connection': 'close'}
        headers['Content-Length'] = len(body)
        trigger_exception = False
        formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
        expected_resp = ('Pool {0} ports are:\n{1}'
                         .format(formated_key, method_resp)).encode()

        self._do_GET_helper(method, method_resp, path, headers, body,
                            expected_resp, trigger_exception, pool_key)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_annotate(self, m_patch, m_count):
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        resource_version = "123"
        ret = {'metadata': {'annotations': annotations,
                            "resourceVersion": resource_version}}
        data = jsonutils.dumps(ret, sort_keys=True)

        m_resp = mock.MagicMock()
        m_resp.ok = True
        m_resp.json.return_value = ret
        m_patch.return_value = m_resp

        self.assertEqual(annotations, self.client.annotate(
            path, annotations, resource_version=resource_version))
        m_patch.assert_called_once_with(self.base_url + path,
                                        data=data, headers=mock.ANY,
                                        cert=(None, None), verify=False)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_watch(self, m_get):
        path = '/test'
        data = [{'obj': 'obj%s' % i} for i in range(3)]
        lines = [jsonutils.dumps(i) for i in data]

        m_resp = mock.MagicMock()
        m_resp.ok = True
        m_resp.iter_lines.return_value = lines
        m_get.return_value = m_resp

        cycles = 3
        self.assertEqual(
            data * cycles,
            list(itertools.islice(self.client.watch(path),
                                  len(data) * cycles)))

        self.assertEqual(cycles, m_get.call_count)
        self.assertEqual(cycles, m_resp.close.call_count)
        m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
                                 params={'watch': 'true'}, cert=(None, None),
                                 verify=False)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _set_lbaas_spec(self, service, lbaas_spec):
        # TODO(ivc): extract annotation interactions
        if lbaas_spec is None:
            LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec)
            annotation = None
        else:
            lbaas_spec.obj_reset_changes(recursive=True)
            LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec)
            annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(),
                                         sort_keys=True)
        svc_link = service['metadata']['selfLink']
        ep_link = self._get_endpoints_link(service)
        k8s = clients.get_kubernetes_client()

        try:
            k8s.annotate(ep_link,
                         {k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation})
        except k_exc.K8sClientException:
            # REVISIT(ivc): only raise ResourceNotReady for NotFound
            raise k_exc.ResourceNotReady(ep_link)

        k8s.annotate(svc_link,
                     {k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation},
                     resource_version=service['metadata']['resourceVersion'])
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def run(self):
        """ Override the function run: run testcase and update database """
        update_data = {'task_id': self.args.get('task_id'),
                       'status': 'IN PROGRESS'}
        self.handler.insert(update_data)

        LOGGER.info('Starting running test case')

        try:
            data = self.target(self.args)
        except Exception as err:  # pylint: disable=broad-except
            LOGGER.exception('Task Failed')
            update_data = {'status': 'FAIL', 'error': str(err)}
            self.handler.update_attr(self.args.get('task_id'), update_data)
        else:
            LOGGER.info('Task Finished')
            LOGGER.debug('Result: %s', data)
            new_data = {'status': 'FINISHED',
                        'result': jsonutils.dumps(data.get('result', {}))}

            self.handler.update_attr(self.args.get('task_id'), new_data)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def _get_file_contents(from_data, files):
    if not isinstance(from_data, (dict, list)):
        return
    if isinstance(from_data, dict):
        recurse_data = six.itervalues(from_data)
        for key, value in six.iteritems(from_data):
            if _ignore_if(key, value):
                continue
            if not value.startswith(('http://', 'https://')):
                raise exceptions.GetFileError(value, 'get_file')
            if value not in files:
                file_content = heat_utils.read_url_content(value)
                if template_utils.is_template(file_content):
                    template = get_template_files(template_url=value)[1]
                    file_content = jsonutils.dumps(template)
                files[value] = file_content
    else:
        recurse_data = from_data
    for value in recurse_data:
        _get_file_contents(value, files)
项目:gluon    作者:openstack    | 项目源码 | 文件源码
def sendjson(self, method, urlpath, obj=None):
        """Send json to the OpenDaylight controller."""
        headers = {'Content-Type': 'application/json'}
        data = jsonutils.dumps(obj, indent=2) if obj else None
        url = '/'.join([self.url, urlpath])
        LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)" %
                  {'method': method, 'url': url, 'obj': obj})
        r = requests.request(method, url=url,
                             headers=headers, data=data,
                             auth=self.auth, timeout=self.timeout)
        try:
            r.raise_for_status()
        except Exception as ex:
            LOG.error("Error Sending METHOD (%(method)s) URL (%(url)s)"
                      "JSON (%(obj)s) return: %(r)s ex: %(ex)s rtext: "
                      "%(rtext)s" %
                      {'method': method, 'url': url, 'obj': obj, 'r': r,
                       'ex': ex, 'rtext': r.text})
            return r
        try:
            return json.loads(r.content)
        except Exception:
            LOG.debug("%s" % r)
            return
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def format_nested_dict(d, fields, column_names):
    if d is None:
        return ''
    pt = prettytable.PrettyTable(caching=False, print_empty=False,
                                 header=True, field_names=column_names)
    for n in column_names:
        pt.align[n] = 'l'

    keys = sorted(d.keys())
    for field in keys:
        value = d[field]
        if not isinstance(value, six.string_types):
            value = jsonutils.dumps(value, indent=2, ensure_ascii=False)
        pt.add_row([field, value.strip('"')])

    return pt.get_string()
项目:zun    作者:openstack    | 项目源码 | 文件源码
def update_compute_node(self, context, node_uuid, values):
        if 'uuid' in values:
            msg = _('Cannot overwrite UUID for an existing node.')
            raise exception.InvalidParameterValue(err=msg)

        try:
            target = self.client.read('/compute_nodes/' + node_uuid)
            target_value = json.loads(target.value)
            target_value.update(values)
            target.value = json.dumps(target_value)
            self.client.update(target)
        except etcd.EtcdKeyNotFound:
            raise exception.ComputeNodeNotFound(compute_node=node_uuid)
        except Exception as e:
            LOG.error(
                'Error occurred while updating compute node: %s',
                six.text_type(e))
            raise
        return translate_etcd_result(target, 'compute_node')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_detach_volume(self,
                           mock_cinder_api_cls,
                           mock_get_connector_prprts,
                           mock_get_volume_connector):
        volume = mock.MagicMock()
        volume.volume_id = self.fake_volume_id
        volume.connection_info = jsonutils.dumps(self.fake_conn_info)
        mock_cinder_api = mock.MagicMock()
        mock_cinder_api_cls.return_value = mock_cinder_api
        mock_connector = mock.MagicMock()
        mock_get_connector_prprts.return_value = self.fake_conn_prprts
        mock_get_volume_connector.return_value = mock_connector

        cinder = cinder_workflow.CinderWorkflow(self.context)
        cinder.detach_volume(volume)

        mock_cinder_api.begin_detaching.assert_called_once_with(
            self.fake_volume_id)
        mock_connector.disconnect_volume.assert_called_once_with(
            self.fake_conn_info['data'], None)
        mock_cinder_api.terminate_connection.assert_called_once_with(
            self.fake_volume_id, self.fake_conn_prprts)
        mock_cinder_api.detach.assert_called_once_with(
            self.fake_volume_id)
        mock_cinder_api.roll_detaching.assert_not_called()
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_pci_resources(self):
        addresses = []
        try:
            output, status = processutils.execute('lspci', '-D', '-nnmm')
            lines = output.split('\n')
            for line in lines:
                if not line:
                    continue
                columns = line.split()
                address = columns[0]
                addresses.append(address)
        except processutils.ProcessExecutionError:
            raise exception.CommandError(cmd='lspci')

        pci_info = []
        for addr in addresses:
            pci_info.append(self._get_pci_dev_info(addr))

        return jsonutils.dumps(pci_info)
项目:mistral-lib    作者:openstack    | 项目源码 | 文件源码
def serialize(self, entity):
        if entity is None:
            return None

        key = self._get_serialization_key(type(entity))

        # Primitive or not registered type.
        if not key:
            return jsonutils.dumps(
                jsonutils.to_primitive(entity, convert_instances=True)
            )

        serializer = self.serializers.get(key)

        if not serializer:
            raise RuntimeError(
                "Failed to find a serializer for the key: %s" % key
            )

        result = {
            '__serial_key': key,
            '__serial_data': serializer.serialize(entity)
        }

        return jsonutils.dumps(result)
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_clear_port_bindings(self):
        fake_port = copy.copy(test_constants.FAKE_PORT)
        fake_port['address_bindings'] = ['a', 'b']
        mocked_resource = self.get_mocked_resource()

        def get_fake_port(*args):
            return fake_port

        mocked_resource.get = get_fake_port
        mocked_resource.update(
            fake_port['id'], fake_port['id'], address_bindings=[])

        fake_port['address_bindings'] = []
        test_client.assert_json_call(
            'put', mocked_resource,
            'https://1.2.3.4/api/v1/logical-ports/%s' % fake_port['id'],
            data=jsonutils.dumps(fake_port, sort_keys=True),
            headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_create_logical_router(self):
        """Test creating a router returns the correct response and 201 status.

        """
        fake_router = test_constants.FAKE_ROUTER.copy()
        router = self.get_mocked_resource()
        tier0_router = True
        description = 'dummy'
        router.create(fake_router['display_name'], None, None, tier0_router,
                      description=description)

        data = {
            'display_name': fake_router['display_name'],
            'router_type': 'TIER0' if tier0_router else 'TIER1',
            'tags': None,
            'description': description
        }

        test_client.assert_json_call(
            'post', router,
            'https://1.2.3.4/api/v1/logical-routers',
            data=jsonutils.dumps(data, sort_keys=True),
            headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_update_advertisement(self):
        router = self.get_mocked_resource()
        router_id = test_constants.FAKE_ROUTER_UUID
        data = {'advertise_nat_routes': 'a',
                'advertise_nsx_connected_routes': 'b',
                'advertise_static_routes': False,
                'enabled': True,
                'advertise_lb_vip': False,
                'advertise_lb_snat_ip': False}
        with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                        return_value='2.1.0'), \
            mock.patch.object(router.client, 'get',
                              return_value={}):
            router.update_advertisement(
                router_id, **data)
            test_client.assert_json_call(
                'put', router,
                ('https://1.2.3.4/api/v1/logical-routers/%s/routing/'
                 'advertisement' % router_id),
                data=jsonutils.dumps(data, sort_keys=True),
                headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_update_advertisement_no_lb(self):
        router = self.get_mocked_resource()
        router_id = test_constants.FAKE_ROUTER_UUID
        data = {'advertise_nat_routes': 'a',
                'advertise_nsx_connected_routes': 'b',
                'advertise_static_routes': False,
                'enabled': True}
        with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                        return_value='1.1.0'), \
            mock.patch.object(router.client, 'get',
                              return_value={}):
            # lb args will be ignored on this nsx version
            router.update_advertisement(
                router_id,
                advertise_lb_vip=False,
                advertise_lb_snat_ip=False,
                **data)
            test_client.assert_json_call(
                'put', router,
                ('https://1.2.3.4/api/v1/logical-routers/%s/routing/'
                 'advertisement' % router_id),
                data=jsonutils.dumps(data, sort_keys=True),
                headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_update_logical_router_port(self):
        fake_router_port = test_constants.FAKE_ROUTER_PORT.copy()
        uuid = fake_router_port['id']
        fake_relay_uuid = uuidutils.generate_uuid()
        lrport = self.get_mocked_resource()
        with mock.patch.object(lrport, 'get', return_value=fake_router_port),\
            mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                       return_value='2.0.0'):
            lrport.update(uuid, relay_service_uuid=fake_relay_uuid)
            data = {
                'id': uuid,
                'display_name': fake_router_port['display_name'],
                'logical_router_id': fake_router_port['logical_router_id'],
                'resource_type': fake_router_port['resource_type'],
                "revision": 0,
                'service_bindings': [{'service_id': {
                    'target_type': 'LogicalService',
                    'target_id': fake_relay_uuid}}]
            }

            test_client.assert_json_call(
                'put', lrport,
                'https://1.2.3.4/api/v1/logical-router-ports/%s' % uuid,
                data=jsonutils.dumps(data, sort_keys=True),
                headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_create_ip_pool_no_ranges_with_gateway(self):
        pool = self.get_mocked_resource()
        cidr = '2.2.2.0/30'
        gateway_ip = '2.2.2.1'
        pool.create(cidr, allocation_ranges=None, gateway_ip=gateway_ip)
        exp_ranges = [{'start': '2.2.2.0', 'end': '2.2.2.0'},
                      {'start': '2.2.2.2', 'end': '2.2.2.3'}]

        data = {
            'subnets': [{
                'gateway_ip': gateway_ip,
                'allocation_ranges': exp_ranges,
                'cidr': cidr,
            }]
        }

        test_client.assert_json_call(
            'post', pool,
            'https://1.2.3.4/api/v1/pools/ip-pools',
            data=jsonutils.dumps(data, sort_keys=True),
            headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_create_ip_pool_no_ranges_no_gateway(self):
        pool = self.get_mocked_resource()
        cidr = '2.2.2.0/30'
        pool.create(cidr, allocation_ranges=None)
        exp_ranges = [{'start': '2.2.2.0', 'end': '2.2.2.3'}]

        data = {
            'subnets': [{
                'allocation_ranges': exp_ranges,
                'cidr': cidr,
            }]
        }

        test_client.assert_json_call(
            'post', pool,
            'https://1.2.3.4/api/v1/pools/ip-pools',
            data=jsonutils.dumps(data, sort_keys=True),
            headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_update_metadata_proxy(self):
        fake_md = test_constants.FAKE_MD.copy()
        md = self.get_mocked_resource()
        new_url = "http://2.2.2.20:3500/xyz"
        new_secret = 'abc'
        new_edge = uuidutils.generate_uuid()
        with mock.patch.object(md.client, 'url_get', return_value=fake_md):
            md.update(fake_md['id'], server_url=new_url, secret=new_secret,
                      edge_cluster_id=new_edge)
            fake_md.update({'metadata_server_url': new_url,
                            'secret': new_secret,
                            'edge_cluster_id': new_edge})
            test_client.assert_json_call(
                'put', md,
                'https://1.2.3.4/api/v1/md-proxies/%s' % fake_md['id'],
                data=jsonutils.dumps(fake_md, sort_keys=True),
                headers=self.default_headers())
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def _verify_backend_create(self, mocked_trust, cert_pem):
        """Verify API calls to create cert and identity on backend"""
        # verify API call to import cert on backend
        base_uri = 'https://1.2.3.4/api/v1/trust-management'
        uri = base_uri + '/certificates?action=import'
        expected_body = {'pem_encoded': cert_pem}
        test_client.assert_json_call('post', mocked_trust.client, uri,
                                     single_call=False,
                                     data=jsonutils.dumps(expected_body))

        # verify API call to bind cert to identity on backend
        uri = base_uri + '/principal-identities'
        expected_body = {'name': self.identity,
                         'node_id': self.node_id,
                         'permission_group': 'read_write_api_users',
                         'certificate_id': self.cert_id,
                         'is_protected': True}
        test_client.assert_json_call('post', mocked_trust.client, uri,
                                     single_call=False,
                                     data=jsonutils.dumps(expected_body,
                                                          sort_keys=True))
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_json_request(self):
        resp = mocks.MockRequestsResponse(
            200, jsonutils.dumps({'result': {'ok': 200}}))

        api = self.new_mocked_client(client.JSONRESTClient,
                                     session_response=resp,
                                     url_prefix='api/v2/nat')

        resp = api.create(body={'name': 'mgmt-egress'})

        assert_json_call(
            'post', api,
            'https://1.2.3.4/api/v2/nat',
            data=jsonutils.dumps({'name': 'mgmt-egress'}))

        self.assertEqual(resp, {'result': {'ok': 200}})
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_create(self):
        self.volume_providers_setup(['cinder'])
        fake_request = {
            u'Name': u'test-vol',
            u'Opts': {u'size': u'1'},
        }
        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock()
            provider.check_exist.return_value = False
            provider.create = mock.MagicMock()

        response = self.app.post('/VolumeDriver.Create',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Err': u''
        }

        self.assertEqual(200, response.status_code)
        self.assertEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_create_invalid_volume_provider(self):
        self.volume_providers_setup(['cinder'])
        fake_request = {
            u'Name': u'test-vol',
            u'Opts': {u'size': u'1',
                      u'volume_provider': u'provider'}}
        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock()
            provider.check_exist.return_value = False
            provider.create = mock.MagicMock()

        response = self.app.post('VolumeDriver.Create',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Err': u''
        }
        self.assertEqual(200, response.status_code)
        self.assertNotEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_remove(self):
        self.volume_providers_setup(['cinder'])
        fake_request = {
            u'Name': u'test-vol'
        }
        for provider in app.volume_providers.values():
            provider.delete = mock.MagicMock()
            provider.delete.return_value = True

        response = self.app.post('/VolumeDriver.Remove',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Err': u''
        }
        self.assertEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_mount(self):
        self.volume_providers_setup(['cinder'])
        fake_name = u'test-vol'
        fake_request = {
            u'Name': fake_name
        }

        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock()
            provider.check_exist.return_value = True
            provider.mount = mock.MagicMock()
            provider.mount.return_value = fake_mountpoint(fake_name)

        response = self.app.post('/VolumeDriver.Mount',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Mountpoint': fake_mountpoint(fake_name),
            u'Err': u''
        }
        self.assertEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_mount_with_volume_not_exist(self):
        self.volume_providers_setup(['cinder'])
        fake_name = u'test-vol'
        fake_request = {
            u'Name': fake_name,
        }
        for provider in app.volume_providers.values():
            provider.check_exit = mock.MagicMock()
            provider.check_exit.return_value = False
        response = self.app.post('/VolumeDriver.Mount',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Mountpoint': fake_mountpoint(fake_name),
            u'Err': u''
        }
        self.assertEqual(200, response.status_code)
        self.assertNotEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_path(self):
        self.volume_providers_setup(['cinder'])
        fake_name = u'test-vol'
        fake_request = {
            u'Name': fake_name
        }
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock()
            provider.show.return_value = fake_volume(fake_name)

        response = self.app.post('/VolumeDriver.Path',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Mountpoint': fake_mountpoint(fake_name),
            u'Err': u''
        }
        self.assertEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_path_with_volume_not_exist(self):
        self.volume_providers_setup(['cinder'])
        fake_docker_volume_name = u'test-vol'
        fake_request = {
            u'Name': fake_docker_volume_name
        }
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock(side_effect=exceptions.NotFound)

        response = self.app.post('/VolumeDriver.Path',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Err': u'Mountpoint Not Found'
        }
        self.assertEqual(200, response.status_code)
        self.assertEqual(fake_response, jsonutils.loads(response.data))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def test_volumedriver_get_with_volume_not_exist(self):
        self.volume_providers_setup(['cinder'])
        fake_docker_volume_name = u'test-vol'
        fake_request = {
            u'Name': fake_docker_volume_name
        }
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock(side_effect=exceptions.NotFound())

        response = self.app.post('/VolumeDriver.Get',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Err': u'Volume Not Found'
        }
        self.assertEqual(200, response.status_code)
        self.assertEqual(fake_response, jsonutils.loads(response.data))
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def test_ipam_driver_request_pool_with_default_v6pool(self,
            mock_list_subnetpools):
        fake_kuryr_subnetpool_id = uuidutils.generate_uuid()
        fake_name = 'kuryr6'
        kuryr_subnetpools = self._get_fake_v6_subnetpools(
            fake_kuryr_subnetpool_id, prefixes=['fe80::/64'])
        mock_list_subnetpools.return_value = {
            'subnetpools': kuryr_subnetpools['subnetpools']}

        fake_request = {
            'AddressSpace': '',
            'Pool': '',
            'SubPool': '',  # In the case --ip-range is not given
            'Options': {},
            'V6': True
        }
        response = self.app.post('/IpamDriver.RequestPool',
                                content_type='application/json',
                                data=jsonutils.dumps(fake_request))

        self.assertEqual(200, response.status_code)
        mock_list_subnetpools.assert_called_with(name=fake_name)
        decoded_json = jsonutils.loads(response.data)
        self.assertEqual(fake_kuryr_subnetpool_id, decoded_json['PoolID'])
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def test_network_driver_endpoint_operational_info_with_no_port(self):
        docker_network_id = lib_utils.get_hash()
        docker_endpoint_id = lib_utils.get_hash()
        fake_port_response = {"ports": []}

        with mock.patch.object(app.neutron, 'list_ports') as mock_list_ports:
            data = {
                'NetworkID': docker_network_id,
                'EndpointID': docker_endpoint_id,
            }

            mock_list_ports.return_value = fake_port_response
            response = self.app.post('/NetworkDriver.EndpointOperInfo',
                                     content_type='application/json',
                                     data=jsonutils.dumps(data))
            decoded_json = jsonutils.loads(response.data)
            self.assertEqual(200, response.status_code)

            port_name = utils.get_neutron_port_name(docker_endpoint_id)
            mock_list_ports.assert_called_once_with(name=port_name)

            self.assertEqual({}, decoded_json['Value'])
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def test_network_driver_allocate_network(self):
        docker_network_id = lib_utils.get_hash()
        allocate_network_request = {
            'NetworkID': docker_network_id,
            'IPv4Data': [{
                'AddressSpace': 'foo',
                'Pool': '192.168.42.0/24',
                'Gateway': '192.168.42.1/24',
            }],
            'IPv6Data': [],
            'Options': {}
        }

        response = self.app.post('/NetworkDriver.AllocateNetwork',
                                 content_type='application/json',
                                 data=jsonutils.dumps(
                                     allocate_network_request))
        self.assertEqual(200, response.status_code)
        decoded_json = jsonutils.loads(response.data)
        self.assertEqual({'Options': {}}, decoded_json)
项目:fuel-nailgun-extension-cluster-upgrade    作者:openstack    | 项目源码 | 文件源码
def test_node_reassign_handler_with_roles(self, mcast):
        cluster = self.env.create(
            cluster_kwargs={'api': False},
            nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
                           'roles': ['controller']}])
        node = cluster.nodes[0]
        seed_cluster = self.env.create_cluster(api=False)

        # NOTE(akscram): reprovision=True means that the node will be
        #                re-provisioned during the reassigning. This is
        #                a default behavior.
        data = {'nodes_ids': [node.id],
                'reprovision': True,
                'roles': ['compute']}
        resp = self.app.post(
            reverse('NodeReassignHandler',
                    kwargs={'cluster_id': seed_cluster.id}),
            jsonutils.dumps(data),
            headers=self.default_headers)
        self.assertEqual(202, resp.status_code)
        self.assertEqual(node.roles, [])
        self.assertEqual(node.pending_roles, ['compute'])
        self.assertTrue(mcast.called)
项目:fuel-nailgun-extension-cluster-upgrade    作者:openstack    | 项目源码 | 文件源码
def test_node_reassign_handler_without_reprovisioning(self, mcast):
        cluster = self.env.create(
            cluster_kwargs={'api': False},
            nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
                           'roles': ['controller']}])
        node = cluster.nodes[0]
        seed_cluster = self.env.create_cluster(api=False)

        data = {'nodes_ids': [node.id],
                'reprovision': False,
                'roles': ['compute']}
        resp = self.app.post(
            reverse('NodeReassignHandler',
                    kwargs={'cluster_id': seed_cluster.id}),
            jsonutils.dumps(data),
            headers=self.default_headers)
        self.assertEqual(200, resp.status_code)
        self.assertFalse(mcast.called)
        self.assertEqual(node.roles, ['compute'])
项目:ara    作者:openstack    | 项目源码 | 文件源码
def playbook_treeview(playbook):
    """
    Creates a fake filesystem with playbook files and uses generate_tree() to
    recurse and return a JSON structure suitable for bootstrap-treeview.
    """
    fs = fake_filesystem.FakeFilesystem()
    mock_os = fake_filesystem.FakeOsModule(fs)

    files = models.File.query.filter(models.File.playbook_id.in_([playbook]))

    paths = {}
    for file in files:
        fs.CreateFile(file.path)
        paths[file.path] = file.id

    return jsonutils.dumps(generate_tree('/', paths, mock_os),
                           sort_keys=True,
                           indent=2)
项目:nova-fusioncompute    作者:openstack    | 项目源码 | 文件源码
def _init_all_fc_dvs(self):
        """Send message to fc and get dvswitch info

        :return:
        """
        LOG.debug("loading dvs mapping ")
        dvs_map_temp = {}
        physnet_map_temp = {}
        data = self.get(self.site.dvswitchs_uri)
        if not data.get(constant.DVSWITCHS):
            raise fc_exc.DVSwitchNotFound()

        dvs = data.get(constant.DVSWITCHS)
        if dvs and len(dvs) > 0:
            for dvswitch in dvs:
                dvs_id = utils.get_id_from_urn(dvswitch.get('urn'))
                dvs_map_temp[dvswitch["name"]] = dvs_id
                self.update_physnet_map(dvs_id, physnet_map_temp)

        LOG.debug(
            "init all fc dvs dvs map is %s, physnet map is %s",
            jsonutils.dumps(dvs_map_temp),
            jsonutils.dumps(physnet_map_temp))
        self.dvs_mapping = dvs_map_temp
        self.physnet_mapping = physnet_map_temp
项目:nova-fusioncompute    作者:openstack    | 项目源码 | 文件源码
def create_vsp(self, dvs_id, pg_urn, vif):
        """send message to fusion compute to create a vsp

        :param dvs_id:
        :param pg_urn:
        :param vif:
        :return:
        """
        vsp_path = self.get_path_by_site(constant.VSP_URI,
                                         dvs_id=dvs_id)
        port_id = vif['id']

        body = {
            'name': port_id,
            'portGroupUrn': pg_urn,
            'tags': [{'tagKey': constant.VSP_TAG_KEY, 'tagValue': port_id}]
        }

        ret = self.post(vsp_path, data=jsonutils.dumps(body))

        return ret
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def create_subports(num_ports, trunk_ips, timeout=180):
    method = 'POST'
    body = jsonutils.dumps({"trunks": trunk_ips, "num_ports": num_ports})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_POPULATE)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def delete_subports(trunk_ips, timeout=180):
    method = 'POST'
    body = jsonutils.dumps({"trunks": trunk_ips})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_FREE)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def list_pools(timeout=180):
    method = 'GET'
    body = jsonutils.dumps({})
    headers = {'Context-Type': 'application/json', 'Connection': 'close'}
    headers['Context-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_LIST)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def show_pool(trunk_ip, project_id, sg, timeout=180):
    method = 'GET'
    body = jsonutils.dumps({"pool_key": [trunk_ip, project_id, sg]})
    headers = {'Context-Type': 'application/json', 'Connection': 'close'}
    headers['Context-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_SHOW)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())