Python mock 模块,ANY 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 mock.ANY。

项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_ensure_loadbalancer(self):
        """Check that ensure_loadbalancer hands a populated request to _ensure.

        The request passed to ``_ensure`` is matched with ``mock.ANY`` and is
        then pulled out of ``call_args`` so its fields can be verified.
        """
        namespace = 'TEST_NAMESPACE'
        name = 'TEST_NAME'
        endpoints = {'metadata': {'namespace': namespace, 'name': name}}
        project_id = 'TEST_PROJECT'
        subnet_id = 'D3FA400A-F543-4B91-9CD3-047AF0CE42D1'
        ip = '1.2.3.4'
        # TODO(ivc): handle security groups
        sg_ids = []

        driver_cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        expected_resp = mock.sentinel.expected_resp
        m_driver._ensure.return_value = expected_resp

        actual_resp = driver_cls.ensure_loadbalancer(
            m_driver, endpoints, project_id, subnet_id, ip, sg_ids)

        m_driver._ensure.assert_called_once_with(
            mock.ANY,
            m_driver._create_loadbalancer,
            m_driver._find_loadbalancer)

        # First positional argument of the recorded call is the request.
        positional_args = m_driver._ensure.call_args[0]
        request = positional_args[0]
        self.assertEqual("%s/%s" % (namespace, name), request.name)
        self.assertEqual(project_id, request.project_id)
        self.assertEqual(subnet_id, request.subnet_id)
        self.assertEqual(ip, str(request.ip))
        self.assertEqual(expected_resp, actual_resp)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_ensure_pool(self):
        """Check that ensure_pool builds its pool from the listener.

        The pool handed to ``_ensure_provisioned`` is matched with
        ``mock.ANY`` and then inspected through the recorded call arguments.
        """
        listener = obj_lbaas.LBaaSListener(
            id='A57B7771-6050-4CA8-A63C-443493EC98AB',
            name='TEST_LISTENER_NAME',
            protocol='TCP')
        loadbalancer = obj_lbaas.LBaaSLoadBalancer(
            id='00EE9E11-91C2-41CF-8FD4-7970579E5C4C',
            project_id='TEST_PROJECT')
        endpoints = mock.sentinel.endpoints
        expected_resp = mock.sentinel.expected_resp

        driver_cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        m_driver._ensure_provisioned.return_value = expected_resp

        actual_resp = driver_cls.ensure_pool(
            m_driver, endpoints, loadbalancer, listener)

        m_driver._ensure_provisioned.assert_called_once_with(
            loadbalancer, mock.ANY, m_driver._create_pool,
            m_driver._find_pool)

        # Second positional argument of the recorded call is the pool.
        positional_args = m_driver._ensure_provisioned.call_args[0]
        created_pool = positional_args[1]
        self.assertEqual(listener.name, created_pool.name)
        self.assertEqual(loadbalancer.project_id, created_pool.project_id)
        self.assertEqual(listener.id, created_pool.listener_id)
        self.assertEqual(listener.protocol, created_pool.protocol)
        self.assertEqual(expected_resp, actual_resp)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_annotate(self, m_patch, m_count):
        """annotate() issues one PATCH and returns the annotations.

        Headers are not pinned by this test, so they are matched with
        ``mock.ANY``.
        """
        m_count.return_value = list(range(1, 5))
        resource_version = "123"
        annotations = {'a1': 'v1', 'a2': 'v2'}
        path = '/test'
        k8s_object = {'metadata': {'annotations': annotations,
                                   "resourceVersion": resource_version}}
        expected_body = jsonutils.dumps(k8s_object, sort_keys=True)

        # Successful response whose JSON body echoes the patched object.
        m_resp = mock.MagicMock(ok=True)
        m_resp.json.return_value = k8s_object
        m_patch.return_value = m_resp

        returned = self.client.annotate(
            path, annotations, resource_version=resource_version)

        self.assertEqual(annotations, returned)
        m_patch.assert_called_once_with(
            self.base_url + path,
            data=expected_body,
            headers=mock.ANY,
            cert=(None, None),
            verify=False)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_execute_clean_step_no_success_log(
            self, log_mock, run_mock, utils_mock, parse_driver_info_mock):
        """A failing clean step logs exactly one error and no info message."""
        run_mock.side_effect = exception.InstanceDeployFailure('Boom')
        clean_step = {'priority': 10, 'interface': 'deploy',
                      'step': 'erase_devices', 'args': {'tags': ['clean']}}

        # Store an agent URL in the node's internal info before the step runs.
        internal_info = self.node.driver_internal_info
        internal_info['agent_url'] = 'http://127.0.0.1'
        self.node.driver_internal_info = internal_info
        self.node.save()

        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.driver.execute_clean_step(task, clean_step)

            # The log format string is irrelevant here; only its args are pinned.
            expected_log_args = {'node': task.node['uuid'],
                                 'step': 'erase_devices'}
            log_mock.error.assert_called_once_with(mock.ANY,
                                                   expected_log_args)
            utils_mock.assert_called_once_with(task, 'Boom')
            self.assertFalse(log_mock.info.called)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_send_magic_packets(self, mock_socket):
        """_send_magic_packets configures, uses and closes a single socket.

        The packet payload is not pinned by this test, so it is matched with
        ``mock.ANY``; only the destination address is checked.
        """
        fake_socket = mock.Mock(spec=socket, spec_set=True)
        mock_socket.return_value = fake_socket()
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)
        destination = ('255.255.255.255', 9)

        with task_manager.acquire(
                self.context, self.node.uuid, shared=True) as task:
            wol_power._send_magic_packets(task, destination[0],
                                          destination[1])

            sendto_call = mock.call().sendto(mock.ANY, destination)
            expected_calls = [
                # the fake_socket() invocation made while wiring the mock
                mock.call(),
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1),
                sendto_call,
                sendto_call,
                mock.call().close(),
            ]

            fake_socket.assert_has_calls(expected_calls)
            self.assertEqual(1, mock_socket.call_count)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__set_boot_device_order_fail(self, mock_aw, mock_client_pywsman):
        """_set_boot_device_order raises on a bad or missing wsman response.

        A response carrying ReturnValue '2' must raise AMTFailure; a ``None``
        response must raise AMTConnectFailure.
        """
        device = boot_devices.PXE
        namespace = resource_uris.CIM_BootConfigSetting
        failure_xml = test_utils.build_soap_xml([{'ReturnValue': '2'}],
                                                namespace)
        failure_root = test_utils.mock_wsman_root(failure_xml)
        wsman_client = mock_client_pywsman.Client.return_value
        wsman_client.invoke.return_value = failure_root

        with self.assertRaises(exception.AMTFailure):
            amt_mgmt._set_boot_device_order(self.node, device)
        wsman_client.invoke.assert_called_once_with(
            mock.ANY, namespace, 'ChangeBootOrder', mock.ANY)

        # Second scenario: the client returns no response at all.
        wsman_client = mock_client_pywsman.Client.return_value
        wsman_client.invoke.return_value = None

        with self.assertRaises(exception.AMTConnectFailure):
            amt_mgmt._set_boot_device_order(self.node, device)
        self.assertTrue(mock_aw.called)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__enable_boot_config_fail(self, mock_aw, mock_client_pywsman):
        """_enable_boot_config raises on a bad or missing wsman response.

        A response carrying ReturnValue '2' must raise AMTFailure; a ``None``
        response must raise AMTConnectFailure.
        """
        namespace = resource_uris.CIM_BootService
        failure_xml = test_utils.build_soap_xml([{'ReturnValue': '2'}],
                                                namespace)
        failure_root = test_utils.mock_wsman_root(failure_xml)
        wsman_client = mock_client_pywsman.Client.return_value
        wsman_client.invoke.return_value = failure_root

        with self.assertRaises(exception.AMTFailure):
            amt_mgmt._enable_boot_config(self.node)
        wsman_client.invoke.assert_called_once_with(
            mock.ANY, namespace, 'SetBootConfigRole', mock.ANY)

        # Second scenario: the client returns no response at all.
        wsman_client = mock_client_pywsman.Client.return_value
        wsman_client.invoke.return_value = None

        with self.assertRaises(exception.AMTConnectFailure):
            amt_mgmt._enable_boot_config(self.node)
        self.assertTrue(mock_aw.called)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__get_nm_address_raw_fail(self, parse_mock, dump_mock, raw_mock,
                                      unlink_mock):
        """_get_nm_address propagates IPMIFailure raised by the raw command.

        After the failure the node's internal info must hold ``False`` for
        both the NM address and channel, and the temp file must be unlinked.
        """
        raw_mock.side_effect = exception.IPMIFailure('raw error')
        parse_mock.return_value = ('0x0A', '0x0B')

        with task_manager.acquire(self.context, self.node.uuid,
                                  shared=False) as task:
            with self.assertRaises(exception.IPMIFailure):
                nm_vendor._get_nm_address(task)

            self.node.refresh()
            stored_info = self.node.driver_internal_info
            self.assertEqual(False, stored_info['intel_nm_address'])
            self.assertEqual(False, stored_info['intel_nm_channel'])

            temp_path = self.temp_filename
            parse_mock.assert_called_once_with(temp_path)
            dump_mock.assert_called_once_with(task, temp_path)
            unlink_mock.assert_called_once_with(temp_path)
            # The raw command payload is not pinned by this test.
            raw_mock.assert_called_once_with(task, mock.ANY)
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def test_rpc(
        self, container, entrypoint_tracker, rpc_proxy, wait_for_result,
        backoff_count
    ):
        """Check that an RPC entrypoint backs off and finally returns."""
        waiter = entrypoint_waiter(
            container, 'method', callback=wait_for_result
        )
        with waiter as result:
            res = rpc_proxy.service.method("arg")

        assert res == result.get() == "result"

        # The entrypoint fires backoff_count + 1 times: None for every
        # backed-off attempt, then the real result.
        expected_results = [None] * backoff_count + ["result"]
        assert entrypoint_tracker.get_results() == expected_results

        # `Backoff` is raised on every execution except the last one; the
        # exception value and traceback are matched with ANY.
        expected_exc_infos = [(Backoff, ANY, ANY)] * backoff_count + [None]
        assert entrypoint_tracker.get_exceptions() == expected_exc_infos
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def test_events(
        self, container, entrypoint_tracker, dispatch_event, wait_for_result,
        backoff_count
    ):
        """Check that an event handler backs off and finally returns."""
        waiter = entrypoint_waiter(
            container, 'method', callback=wait_for_result
        )
        with waiter as result:
            dispatch_event("src_service", "event_type", {})

        assert result.get() == "result"

        # None for every backed-off attempt, then the real result.
        expected_results = [None] * backoff_count + ["result"]
        assert entrypoint_tracker.get_results() == expected_results

        # `Backoff` on every execution except the last one.
        expected_exc_infos = [(Backoff, ANY, ANY)] * backoff_count + [None]
        assert entrypoint_tracker.get_exceptions() == expected_exc_infos
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def test_messaging(
        self, container, entrypoint_tracker, publish_message, exchange, queue,
        wait_for_result, backoff_count
    ):
        """Check that a message consumer backs off and finally returns."""
        waiter = entrypoint_waiter(
            container, 'method', callback=wait_for_result
        )
        with waiter as result:
            publish_message(exchange, "msg", routing_key=queue.routing_key)

        assert result.get() == "result"

        # None for every backed-off attempt, then the real result.
        expected_results = [None] * backoff_count + ["result"]
        assert entrypoint_tracker.get_results() == expected_results

        # `Backoff` on every execution except the last one.
        expected_exc_infos = [(Backoff, ANY, ANY)] * backoff_count + [None]
        assert entrypoint_tracker.get_exceptions() == expected_exc_infos
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def test_expiry(
        self, container, entrypoint_tracker, publish_message, exchange, queue,
        limited_backoff, wait_for_backoff_expired
    ):
        """Check that message consumption gives up with Backoff.Expired."""
        waiter = entrypoint_waiter(
            container, 'method', callback=wait_for_backoff_expired
        )
        with waiter as result:
            publish_message(exchange, "msg", routing_key=queue.routing_key)

        with pytest.raises(Backoff.Expired) as raised:
            result.get()
        expected_message = (
            "Backoff aborted after '{}' retries".format(limited_backoff)
        )
        assert expected_message in str(raised.value)

        # No execution produces a result, including the final expired one.
        expected_results = [None] * limited_backoff + [None]
        assert entrypoint_tracker.get_results() == expected_results

        # Every retry raises `Backoff`; the last raises `Backoff.Expired`.
        expected_exc_infos = (
            [(Backoff, ANY, ANY)] * limited_backoff +
            [(Backoff.Expired, ANY, ANY)]
        )
        assert entrypoint_tracker.get_exceptions() == expected_exc_infos
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_create_resource(self, options, config):
        """_create_resource returns backend data and sends a CREATE request."""
        driver = self._get_driver(options, config)
        res_type = 'network'
        res_data = {
            'id': 'network-123',
            'tenant_id': ATTR_NOT_SPECIFIED,
        }
        context = mock.MagicMock()
        # Stub the backend so that it answers OK with the resource itself.
        driver._request_backend = mock.MagicMock(
            return_value=(requests.codes.ok, res_data))

        response = driver._create_resource(
            res_type, context, {res_type: res_data})

        self.assertEqual(response, res_data)
        # The second positional argument is not pinned by this test.
        driver._request_backend.assert_called_with(
            context, mock.ANY, res_type, 'CREATE')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_get_resource(self, options, config):
        """_get_resource returns backend data and sends a READ request."""
        driver = self._get_driver(options, config)
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        fields = ['id']
        context = mock.MagicMock()
        # Stub the backend so that it answers OK with the resource itself.
        driver._request_backend = mock.MagicMock(
            return_value=(requests.codes.ok, res_data))

        response = driver._get_resource(
            res_type, context, res_data['id'], fields)

        self.assertEqual(response, res_data)
        # The second positional argument is not pinned by this test.
        driver._request_backend.assert_called_with(
            context, mock.ANY, res_type, 'READ')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_update_resource(self, options, config):
        """_update_resource returns backend data and sends an UPDATE request."""
        driver = self._get_driver(options, config)
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        context = mock.MagicMock()
        # Stub the backend so that it answers OK with the resource itself.
        driver._request_backend = mock.MagicMock(
            return_value=(requests.codes.ok, res_data))

        response = driver._update_resource(
            res_type, context, res_data['id'], {res_type: res_data})

        self.assertEqual(response, res_data)
        # The second positional argument is not pinned by this test.
        driver._request_backend.assert_called_with(
            context, mock.ANY, res_type, 'UPDATE')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_list_resource(self, options, config):
        """_list_resource returns the backend list and sends READALL."""
        driver = self._get_driver(options, config)
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        fields = ['id']
        context = mock.MagicMock()
        # Stub the backend so that it answers OK with a one-element list.
        driver._request_backend = mock.MagicMock(
            return_value=(requests.codes.ok, [res_data]))

        response = driver._list_resource(res_type, context, None, fields)

        self.assertEqual(response, [res_data])
        # The second positional argument is not pinned by this test.
        driver._request_backend.assert_called_with(
            context, mock.ANY, res_type, 'READALL')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_add_router_interface(self, options, config):
        """add_router_interface returns backend data and sends ADDINTERFACE."""
        driver = self._get_driver(options, config)
        res_type = 'router'
        res_data = {
            'id': 'router-123',
        }
        router_id = res_data['id']
        context = mock.MagicMock()
        # Stub the backend so that it answers OK with the router itself.
        driver._request_backend = mock.MagicMock(
            return_value=(requests.codes.ok, res_data))

        response = driver.add_router_interface(context, router_id, res_data)

        self.assertEqual(response, res_data)
        # The second positional argument is not pinned by this test.
        driver._request_backend.assert_called_with(
            context, mock.ANY, res_type, 'ADDINTERFACE')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_remove_router_interface(self, options, config):
        """remove_router_interface returns backend data and sends DELINTERFACE."""
        driver = self._get_driver(options, config)
        res_type = 'router'
        res_data = {
            'id': 'router-123',
        }
        router_id = res_data['id']
        context = mock.MagicMock()
        # Stub the backend so that it answers OK with the router itself.
        driver._request_backend = mock.MagicMock(
            return_value=(requests.codes.ok, res_data))

        response = driver.remove_router_interface(
            context, router_id, res_data)

        self.assertEqual(response, res_data)
        # The second positional argument is not pinned by this test.
        driver._request_backend.assert_called_with(
            context, mock.ANY, res_type, 'DELINTERFACE')
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_bad_protocol_version(self, *args):
        """A frame with protocol version 0x7f makes the connection defunct."""
        conn = self.make_connection()
        conn._requests = Mock()
        conn.defunct = Mock()

        # Feed the connection a SupportedMessage response with version 0x7f.
        frame_header = self.make_header_prefix(SupportedMessage, version=0x7f)
        options_body = self.make_options_body()
        raw_message = self.make_msg(frame_header, options_body)
        conn._iobuf = BytesIO()
        conn._iobuf.write(raw_message)
        conn.process_io_buffer()

        # defunct() must have been called once, with a ProtocolError.
        conn.defunct.assert_called_once_with(ANY)
        defunct_args, _defunct_kwargs = conn.defunct.call_args
        self.assertIsInstance(defunct_args[0], ProtocolError)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_negative_body_length(self, *args):
        """A frame declaring body length -13 makes the connection defunct."""
        conn = self.make_connection()
        conn._requests = Mock()
        conn.defunct = Mock()

        # Feed the connection a SupportedMessage header followed by a
        # negative packed body length.
        frame_header = self.make_header_prefix(SupportedMessage)
        raw_message = frame_header + int32_pack(-13)
        conn._iobuf = BytesIO()
        conn._iobuf.write(raw_message)
        conn.process_io_buffer()

        # defunct() must have been called once, with a ProtocolError.
        conn.defunct.assert_called_once_with(ANY)
        defunct_args, _defunct_kwargs = conn.defunct.call_args
        self.assertIsInstance(defunct_args[0], ProtocolError)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_unsupported_cql_version(self, *args):
        """A server CQL_VERSION mismatch makes the connection defunct.

        The connection is set to CQL version 3.0.3 while the SupportedMessage
        body only advertises 7.8.9.
        """
        conn = self.make_connection()
        options_handler = (
            conn._handle_options_response, ProtocolHandler.decode_message, [])
        conn._requests = {0: options_handler}
        conn.defunct = Mock()
        conn.cql_version = "3.0.3"

        # Build the header prefix for a SupportedMessage response.
        header = self.make_header_prefix(SupportedMessage)

        body_buf = BytesIO()
        write_stringmultimap(body_buf, {
            'CQL_VERSION': ['7.8.9'],
            'COMPRESSION': []
        })
        options = body_buf.getvalue()

        frame = _Frame(
            version=4,
            flags=0,
            stream=0,
            opcode=SupportedMessage.opcode,
            body_offset=9,
            end_pos=9 + len(options))
        conn.process_msg(frame, options)

        # defunct() must have been called once, with a ProtocolError.
        conn.defunct.assert_called_once_with(ANY)
        defunct_args, _defunct_kwargs = conn.defunct.call_args
        self.assertIsInstance(defunct_args[0], ProtocolError)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_handle_topology_change(self):
        """Each topology change type schedules the expected unique callback.

        The first argument to ``schedule_unique`` is not pinned by this test
        and is matched with ``ANY``.
        """
        # (change type, factory for the expected callback and its argument)
        scenarios = (
            ('NEW_NODE',
             lambda: (self.control_connection._refresh_nodes_if_not_up,
                      '1.2.3.4')),
            ('REMOVED_NODE',
             lambda: (self.cluster.remove_host, None)),
            ('MOVED_NODE',
             lambda: (self.control_connection._refresh_nodes_if_not_up,
                      '1.2.3.4')),
        )

        for change_type, expected_tail in scenarios:
            event = {
                'change_type': change_type,
                'address': ('1.2.3.4', 9000)
            }
            self.cluster.scheduler.reset_mock()
            self.control_connection._handle_topology_change(event)
            callback, callback_arg = expected_tail()
            self.cluster.scheduler.schedule_unique.assert_called_once_with(
                ANY, callback, callback_arg)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_handle_schema_change(self):
        """Every schema change type schedules refresh_schema with the event.

        Both a TABLE-targeted event and a KEYSPACE-targeted event (without the
        'table' key) are exercised for each change type.
        """
        public_attrs = [attr for attr in vars(SchemaChangeType)
                        if not attr.startswith('_')]
        change_types = [getattr(SchemaChangeType, attr)
                        for attr in public_attrs]

        for change_type in change_types:
            table_event = {
                'target_type': SchemaTargetType.TABLE,
                'change_type': change_type,
                'keyspace': 'ks1',
                'table': 'table1'
            }
            self.cluster.scheduler.reset_mock()
            self.control_connection._handle_schema_change(table_event)
            self.cluster.scheduler.schedule_unique.assert_called_once_with(
                ANY, self.control_connection.refresh_schema, **table_event)

            # Same event retargeted at the keyspace, with the table dropped.
            self.cluster.scheduler.reset_mock()
            keyspace_event = dict(table_event,
                                  target_type=SchemaTargetType.KEYSPACE)
            del keyspace_event['table']
            self.control_connection._handle_schema_change(keyspace_event)
            self.cluster.scheduler.schedule_unique.assert_called_once_with(
                ANY, self.control_connection.refresh_schema, **keyspace_event)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_result_message(self):
        """send_request uses the first host of the plan and result() returns rows.

        The borrow timeout and the send callback are not pinned by this test
        and are matched with ``ANY``.
        """
        session = self.make_basic_session()
        policy = session.cluster._default_load_balancing_policy
        policy.make_query_plan.return_value = ['ip1', 'ip2']
        pool = session._pools.get.return_value
        pool.is_shutdown = False

        connection = Mock(spec=Connection)
        pool.borrow_connection.return_value = (connection, 1)

        future = self.make_response_future(session)
        future.send_request()

        future.session._pools.get.assert_called_once_with('ip1')
        pool.borrow_connection.assert_called_once_with(timeout=ANY)

        connection.send_msg.assert_called_once_with(
            future.message,
            1,
            cb=ANY,
            encoder=ProtocolHandler.encode_message,
            decoder=ProtocolHandler.decode_message,
            result_metadata=[])

        response = self.make_mock_response([{'col': 'val'}])
        future._set_result(None, None, None, response)
        rows = future.result()
        self.assertEqual(rows, [{'col': 'val'}])
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_update_host(self, mock_host, mock_get_device_type):
        """PUT /v1/hosts/1 updates the host and links to its parent device."""
        mock_get_device_type.return_value = "network_devices"
        payload = {'name': 'Host_New', 'parent_id': 2}
        db_data = payload.copy()
        # The DB layer returns the stored host with the payload applied.
        mock_host.return_value = dict(fake_resources.HOST1.items(), **payload)

        resp = self.put('/v1/hosts/1', data=payload)

        body = resp.json
        self.assertEqual(body['name'], db_data['name'])
        self.assertEqual(body['parent_id'], db_data['parent_id'])
        self.assertEqual(200, resp.status_code)
        # First positional argument is not pinned by this test.
        mock_host.assert_called_once_with(mock.ANY, '1', db_data)
        mock_get_device_type.assert_called_once()

        expected_up_link = {
            "rel": "up",
            "href": "http://localhost/v1/network-devices/2"
        }
        self.assertIn(expected_up_link, resp.json["links"])
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_get_host_by_ip_address_filter(self, fake_hosts):
        """Listing hosts by region and IP forwards both filters to the DB call."""
        region_id = 1
        ip_address = '10.10.0.1'
        path_query = '/v1/hosts?region_id={}&ip_address={}'.format(
            region_id, ip_address
        )
        expected_filters = {
            'region_id': 1, 'ip_address': ip_address,
            'resolved-values': True,
        }
        expected_pagination = {'limit': 30, 'marker': None}
        fake_hosts.return_value = (fake_resources.HOSTS_LIST_R2, {})

        resp = self.get(path_query)

        expected_hosts = fake_resources.HOSTS_LIST_R2
        returned_hosts = resp.json['hosts']
        self.assertEqual(len(returned_hosts), 1)
        self.assertEqual(returned_hosts[0]["name"], expected_hosts[0].name)

        # First positional argument is not pinned by this test.
        fake_hosts.assert_called_once_with(
            mock.ANY, expected_filters, expected_pagination,
        )
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_projects_put_variables(self, mock_project):
        """PUT on a project's variables stores them and returns the merged set."""
        project = fake_resources.PROJECT1
        project_id = str(project.id)
        payload = {"a": "b"}
        db_data = payload.copy()

        # The DB layer returns a copy of the project with the new variable set.
        updated_project = copy.deepcopy(project)
        updated_project.variables["a"] = "b"
        mock_project.return_value = updated_project

        url = 'v1/projects/{}/variables'.format(project_id)
        resp = self.put(url, data=payload)

        self.assertEqual(resp.status_code, 200)
        # First positional argument is not pinned by this test.
        mock_project.assert_called_once_with(
            mock.ANY, "projects", project_id, db_data)
        expected_body = {
            "variables": {"key1": "value1", "key2": "value2", "a": "b"},
        }
        self.assertDictEqual(expected_body, resp.json)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_put_network_device(self, fake_device, mock_get_device_type):
        """PUT on a network device updates it and links to its parent device."""
        mock_get_device_type.return_value = "network_devices"
        payload = {"name": "NetDev_New1", "parent_id": 2}
        # The DB layer returns the stored device with the payload applied.
        stored_fields = fake_resources.NETWORK_DEVICE1.items()
        fake_device.return_value = dict(stored_fields, **payload)

        resp = self.put('v1/network-devices/1', data=payload)

        self.assertEqual(resp.status_code, 200)
        body = resp.json
        self.assertEqual(body['name'], "NetDev_New1")
        self.assertEqual(body['parent_id'], 2)
        # First positional argument is not pinned by this test.
        expected_db_data = {"name": "NetDev_New1", "parent_id": 2}
        fake_device.assert_called_once_with(mock.ANY, '1', expected_db_data)
        mock_get_device_type.assert_called_once()

        expected_up_link = {
            "rel": "up",
            "href": "http://localhost/v1/network-devices/2"
        }
        self.assertIn(expected_up_link, resp.json["links"])
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_get_network_devices_by_ip_address_filter(self, fake_devices):
        """Listing network devices by region and IP forwards both filters."""
        region_id = '1'
        ip_address = '10.10.0.1'
        path_query = '/v1/network-devices?region_id={}&ip_address={}'.format(
            region_id, ip_address
        )
        expected_filters = {'region_id': region_id, 'ip_address': ip_address,
                            'resolved-values': True}
        expected_pagination = {'limit': 30, 'marker': None}
        fake_devices.return_value = (fake_resources.NETWORK_DEVICE_LIST1, {})

        resp = self.get(path_query)

        expected_devices = fake_resources.NETWORK_DEVICE_LIST1
        returned_devices = resp.json['network_devices']
        self.assertEqual(len(returned_devices), 1)
        self.assertEqual(returned_devices[0]["ip_address"],
                         expected_devices[0].ip_address)

        # First positional argument is not pinned by this test.
        fake_devices.assert_called_once_with(
            mock.ANY, expected_filters, expected_pagination,
        )
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_get_netinterfaces_by_ip_address_filter(self, fake_interfaces):
        """Listing network interfaces by device and IP forwards both filters."""
        device_id = 1
        ip_address = '10.10.0.1'
        path_query = (
            '/v1/network-interfaces?device_id={}&ip_address={}'.format(
                device_id, ip_address
            )
        )
        expected_filters = {'device_id': device_id, 'ip_address': ip_address}
        expected_pagination = {'limit': 30, 'marker': None}
        fake_interfaces.return_value = (
            fake_resources.NETWORK_INTERFACE_LIST1, {})

        resp = self.get(path_query)

        expected_interfaces = fake_resources.NETWORK_INTERFACE_LIST1
        returned_interfaces = resp.json['network_interfaces']
        self.assertEqual(len(returned_interfaces), 1)
        self.assertEqual(returned_interfaces[0]["name"],
                         expected_interfaces[0].name)

        # First positional argument is not pinned by this test.
        fake_interfaces.assert_called_once_with(
            mock.ANY, expected_filters, expected_pagination,
        )
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_network_interfaces_update(self, fake_interfaces):
        """PUT on a network interface renames it and keeps its IP address."""
        payload = {'name': 'New'}
        db_data = payload.copy()
        # The DB layer returns the stored interface with the payload applied.
        stored_fields = fake_resources.NETWORK_INTERFACE1.items()
        fake_interfaces.return_value = dict(stored_fields, **payload)

        resp = self.put('/v1/network-interfaces/1', data=payload)

        self.assertEqual(resp.json['name'], db_data['name'])
        self.assertEqual(200, resp.status_code)
        original_ip = fake_resources.NETWORK_INTERFACE1.ip_address
        self.assertEqual(resp.json['ip_address'], original_ip)
        # First positional argument is not pinned by this test.
        fake_interfaces.assert_called_once_with(mock.ANY, '1', db_data)
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def test_send_mail(self):
        """
        send_mail should send mail and mark as sent if everything goes well.

        The second argument to ``send_draft`` is not pinned by this test and
        is matched with ``mock.ANY``.
        """
        job = self.create_job(state='queued')
        with mailman_mock() as mailman:
            job.send_mail('token')
            mailman.send_draft.assert_called_with(
                job.message_id_int, mock.ANY)
            mailman.mark_as_sent.assert_called_with(job.message_id_int,
                                                    job.sent_mail_rfc_id,
                                                    'testmail')
            mailman.quit.assert_called_with()
        # Re-read the job through its key so the stored state is what is checked.
        job = job.key.get()
        # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
        self.assertEqual(job.state, 'done')
        self.assertTrue(job.sent_mail_rfc_id)
项目:CAL    作者:HPCC-Cloud-Computing    | 项目源码 | 文件源码
def test_create_without_instance_name(self):
        """create() without a name still calls create_instances exactly once.

        The instance profile name is not pinned by this test and is matched
        with ``mock.ANY``.
        """
        create_stub = mock.Mock(return_value=mock.Mock)
        self.mock_object(
            self.fake_driver.resource, 'create_instances', create_stub)

        self.fake_driver.create(
            'fake_image_id',
            'fake_flavor_id',
            'fake_net_id'
        )

        expected_profile = {
            'Arn': '',
            'Name': mock.ANY
        }
        create_instances = self.fake_driver.resource.create_instances
        create_instances.assert_called_once_with(
            ImageId='fake_image_id',
            MinCount=1,
            MaxCount=1,
            InstanceType='fake_flavor_id',
            SubnetId='fake_net_id',
            IamInstanceProfile=expected_profile
        )
项目:CAL    作者:HPCC-Cloud-Computing    | 项目源码 | 文件源码
def test_create_without_instance_name(self):
        """create() without a name still calls servers.create exactly once.

        The server name is not pinned by this test and is matched with
        ``mock.ANY``.
        """
        # NOTE: in fact: mock.Mock is novaclient.v2.servers.Server
        create_stub = mock.Mock(return_value=mock.Mock)
        self.mock_object(
            self.fake_driver.client.servers, 'create', create_stub)

        self.fake_driver.create(
            'fake_image_id',
            'fake_flavor_id',
            'fake_net_id'
        )

        servers_create = self.fake_driver.client.servers.create
        servers_create.assert_called_once_with(
            name=mock.ANY,
            image='fake_image_id',
            flavor='fake_flavor_id',
            nics=[{'net-id': 'fake_net_id'}]
        )
项目:quadriga    作者:joowani    | 项目源码 | 文件源码
def test_get_orders(requests_post, logger):
    """get_orders posts a signed request and rejects an invalid order book.

    The signature value is not pinned by this test and is matched with
    ``mock.ANY``.
    """
    client = build_client()

    assert client.get_orders() == test_body

    expected_payload = {
        'book': test_book,
        'key': test_key,
        'nonce': test_nonce,
        'signature': mock.ANY
    }
    requests_post.assert_called_with(
        url=build_url('/open_orders'),
        json=expected_payload
    )
    logger.debug.assert_called_with(
        "[client: test_client_id] get user's open orders for btc_usd")

    with pytest.raises(InvalidOrderBookError):
        client.get_orders(book='invalid_book')
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_proxy_info(http_mock, resp_mock):
    """Proxy settings from set_proxy_info reach the HTTP client constructor.

    ``ca_certs`` and ``proxy_info`` are matched with ``ANY``; the proxy_info
    object is then read back from the recorded keyword arguments.
    """
    http_client = Mock()
    http_client.request.return_value = (Mock(), Mock())
    http_mock.return_value = http_client

    Connection.set_proxy_info(
        'example.com',
        8080,
        proxy_type=PROXY_TYPE_SOCKS5,
    )
    make_request("GET", "http://httpbin.org/get")

    http_mock.assert_called_with(timeout=None, ca_certs=ANY, proxy_info=ANY)
    http_client.request.assert_called_with(
        "http://httpbin.org/get", "GET", body=None, headers=None)

    constructor_kwargs = http_mock.call_args[1]
    used_proxy = constructor_kwargs['proxy_info']
    assert_equal(used_proxy.proxy_host, 'example.com')
    assert_equal(used_proxy.proxy_port, 8080)
    assert_equal(used_proxy.proxy_type, PROXY_TYPE_SOCKS5)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def testPassThrough(self, mock_request):
        """request() forwards the call and returns (response, decoded JSON).

        Headers and auth are not pinned by this test and are matched with
        ``ANY``; the timeout must be the sentinel configured on ``self.r``.
        """
        mock_response = Mock()
        # Plain boolean: the original trailing comma made this the tuple
        # (True,), which is only truthy by accident.
        mock_response.ok = True
        mock_response.content = json.dumps({'key': 'value'})
        mock_request.return_value = mock_response

        assert_equal(self.r.timeout, sentinel.timeout)
        assert_equal((mock_response, {'key': 'value'}), self.r.request('GET', base_uri))

        mock_request.assert_called_once_with(
            'GET',
            base_uri + '.json',
            headers=ANY,
            timeout=sentinel.timeout,
            auth=ANY
        )
项目:virtualbmc    作者:openstack    | 项目源码 | 文件源码
def test_add(self, mock_check_conn, mock_makedirs, mock_configparser,
                 mock_open):
        """Check ``manager.add`` called with a copy of ``self.add_params``.

        Every entry of ``self.add_params`` must have been set in the
        'VirtualBMC' section of the config object, the config must have been
        written once, the connection check must have received the libvirt
        URI, domain name and SASL credentials, and the domain's directory
        under ``_CONFIG_PATH`` must have been created.
        """
        config = mock_configparser.return_value
        self.manager.add(**copy.copy(self.add_params))

        expected_calls = []
        for option, value in self.add_params.items():
            expected_calls.append(mock.call('VirtualBMC', option, value))
        # Both sides are sorted so the order of the set() calls is ignored.
        self.assertEqual(sorted(expected_calls),
                         sorted(config.set.call_args_list))

        config.add_section.assert_called_once_with('VirtualBMC')
        config.write.assert_called_once_with(mock.ANY)

        domain_name = self.add_params['domain_name']
        mock_check_conn.assert_called_once_with(
            self.add_params['libvirt_uri'],
            domain_name,
            sasl_username=self.add_params['libvirt_sasl_username'],
            sasl_password=self.add_params['libvirt_sasl_password'])
        domain_dir = os.path.join(_CONFIG_PATH, domain_name)
        mock_makedirs.assert_called_once_with(domain_dir)
        mock_configparser.assert_called_once_with()
项目:virtualbmc    作者:openstack    | 项目源码 | 文件源码
def test_add_with_port_as_int(self, mock_check_conn, mock_makedirs,
                                  mock_configparser, mock_open):
        """Check ``manager.add`` when ``port`` is passed as an ``int``.

        The arguments are a copy of ``self.add_params`` with ``port``
        converted via ``int()``.  The expected ``config.set`` calls are still
        built from the unmodified ``self.add_params`` values.
        """
        config = mock_configparser.return_value
        call_kwargs = copy.copy(self.add_params)
        call_kwargs['port'] = int(call_kwargs['port'])
        self.manager.add(**call_kwargs)

        expected_calls = []
        for option, value in self.add_params.items():
            expected_calls.append(mock.call('VirtualBMC', option, value))
        # Both sides are sorted so the order of the set() calls is ignored.
        self.assertEqual(sorted(expected_calls),
                         sorted(config.set.call_args_list))

        config.add_section.assert_called_once_with('VirtualBMC')
        config.write.assert_called_once_with(mock.ANY)

        domain_name = self.add_params['domain_name']
        mock_check_conn.assert_called_once_with(
            self.add_params['libvirt_uri'],
            domain_name,
            sasl_username=self.add_params['libvirt_sasl_username'],
            sasl_password=self.add_params['libvirt_sasl_password'])
        domain_dir = os.path.join(_CONFIG_PATH, domain_name)
        mock_makedirs.assert_called_once_with(domain_dir)
        mock_configparser.assert_called_once_with()
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_capsules(self, mock_container_get_by_uuid,
                              mock_capsule_list,
                              mock_container_show):
        """Exercise ``GET /capsules/`` with one capsule returned by the mock.

        Asserts the capsule list mock was called once with
        ``(ANY, 1000, None, 'id', 'asc', filters=None)``, that its first
        positional argument has ``all_tenants`` set to False, and that the
        200 response lists exactly one capsule with the fixture's uuid.
        """
        container_fields = utils.get_test_container()
        container_obj = objects.Container(self.context, **container_fields)
        mock_container_get_by_uuid.return_value = container_obj

        capsule_fields = utils.get_test_capsule()
        capsule_obj = objects.Capsule(self.context, **capsule_fields)
        mock_capsule_list.return_value = [capsule_obj]
        mock_container_show.return_value = container_obj

        response = self.app.get('/capsules/')

        mock_capsule_list.assert_called_once_with(
            mock.ANY, 1000, None, 'id', 'asc', filters=None)
        # The first positional argument of the list call is the context.
        request_context = mock_capsule_list.call_args[0][0]
        self.assertIs(False, request_context.all_tenants)
        self.assertEqual(200, response.status_int)

        capsules = response.json['capsules']
        self.assertEqual(1, len(capsules))
        self.assertEqual(capsule_fields['uuid'], capsules[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_hosts(self, mock_host_list, mock_policy):
        """Exercise ``GET /v1/hosts`` with one compute node from the mock.

        Asserts the host list mock was called once with
        ``(ANY, 1000, None, 'hostname', 'asc', filters=None)`` and that the
        200 response lists exactly one host with the fixture's uuid.
        """
        mock_policy.return_value = True

        host_fields = utils.get_test_compute_node()
        # Replace the dict form of the topology with a NUMATopology object
        # before building the ComputeNode.
        host_fields['numa_topology'] = numa.NUMATopology._from_dict(
            host_fields['numa_topology'])
        mock_host_list.return_value = [
            objects.ComputeNode(self.context, **host_fields)]

        response = self.get('/v1/hosts')

        mock_host_list.assert_called_once_with(
            mock.ANY, 1000, None, 'hostname', 'asc', filters=None)
        self.assertEqual(200, response.status_int)

        hosts_in_response = response.json['hosts']
        self.assertEqual(1, len(hosts_in_response))
        self.assertEqual(host_fields['uuid'],
                         hosts_in_response[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_containers(self, mock_container_list,
                                mock_container_show):
        """Exercise ``GET /v1/containers/`` without the all_tenants flag.

        Asserts the container list mock was called once with
        ``(ANY, 1000, None, 'id', 'asc', filters=None)``, that its first
        positional argument has ``all_tenants`` set to False, and that the
        200 response lists exactly one container with the fixture's uuid.
        """
        container_fields = utils.get_test_container()
        container_obj = objects.Container(self.context, **container_fields)
        mock_container_list.return_value = [container_obj]
        mock_container_show.return_value = container_obj

        response = self.get('/v1/containers/')

        mock_container_list.assert_called_once_with(
            mock.ANY, 1000, None, 'id', 'asc', filters=None)
        # The first positional argument of the list call is the context.
        request_context = mock_container_list.call_args[0][0]
        self.assertIs(False, request_context.all_tenants)
        self.assertEqual(200, response.status_int)

        listed = response.json['containers']
        self.assertEqual(1, len(listed))
        self.assertEqual(container_fields['uuid'], listed[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_containers_all_tenants(self, mock_container_list,
                                            mock_container_show, mock_policy):
        """Exercise ``GET /v1/containers/?all_tenants=1``.

        Asserts the container list mock was called once with
        ``(ANY, 1000, None, 'id', 'asc', filters=None)``, that its first
        positional argument has ``all_tenants`` set to True, and that the
        200 response lists exactly one container with the fixture's uuid.
        """
        mock_policy.return_value = True

        container_fields = utils.get_test_container()
        container_obj = objects.Container(self.context, **container_fields)
        mock_container_list.return_value = [container_obj]
        mock_container_show.return_value = container_obj

        response = self.get('/v1/containers/?all_tenants=1')

        mock_container_list.assert_called_once_with(
            mock.ANY, 1000, None, 'id', 'asc', filters=None)
        # The first positional argument of the list call is the context.
        request_context = mock_container_list.call_args[0][0]
        self.assertIs(True, request_context.all_tenants)
        self.assertEqual(200, response.status_int)

        listed = response.json['containers']
        self.assertEqual(1, len(listed))
        self.assertEqual(container_fields['uuid'], listed[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_one_by_uuid_all_tenants(self, mock_container_get_by_uuid,
                                         mock_container_show, mock_policy):
        """Exercise ``GET /v1/containers/<uuid>/?all_tenants=1``.

        Asserts the get-by-uuid mock was called once with ``(ANY, uuid)``,
        that its first positional argument has ``all_tenants`` set to True,
        and that the 200 response carries the fixture's uuid.
        """
        mock_policy.return_value = True

        container_fields = utils.get_test_container()
        container_obj = objects.Container(self.context, **container_fields)
        mock_container_get_by_uuid.return_value = container_obj
        mock_container_show.return_value = container_obj

        url = '/v1/containers/%s/?all_tenants=1' % container_fields['uuid']
        response = self.get(url)

        mock_container_get_by_uuid.assert_called_once_with(
            mock.ANY, container_fields['uuid'])
        # The first positional argument of the lookup call is the context.
        request_context = mock_container_get_by_uuid.call_args[0][0]
        self.assertIs(True, request_context.all_tenants)
        self.assertEqual(200, response.status_int)
        self.assertEqual(container_fields['uuid'], response.json['uuid'])
项目:zun    作者:openstack    | 项目源码 | 文件源码
def _action_test(self, container, action, ident_field,
                     mock_container_action, status_code, query_param=''):
        """POST a container action and verify the mocked action call.

        ``container`` is a dict of container fields; ``ident_field`` names
        the field used both in the URL and to pick the
        ``zun.objects.Container.get_by_<ident_field>`` lookup to patch.
        The POST must answer with ``status_code`` and a GET on the same
        action URL must raise ``AppError``.  When ``query_param`` is given,
        the text between its first and second ``=`` is expected as an extra
        positional argument of the action call.
        """
        container_obj = objects.Container(self.context, **container)
        ident = container.get(ident_field)
        lookup_path = 'zun.objects.Container.get_by_%s' % ident_field

        with patch(lookup_path) as mock_get_by_ident:
            mock_get_by_ident.return_value = container_obj
            post_url = '/v1/containers/%s/%s/?%s' % (ident, action,
                                                     query_param)
            response = self.post(post_url)
            self.assertEqual(status_code, response.status_int)

            # A GET on the same action URL is expected to raise AppError.
            get_url = '/v1/containers/%s/%s/' % (ident, action)
            self.assertRaises(AppError, self.get, get_url)

        expected_args = [mock.ANY, container_obj]
        if query_param:
            expected_args.append(query_param.split('=')[1])
        mock_container_action.assert_called_once_with(*expected_args)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_delete_container_by_uuid_all_tenants(self, mock_get_by_uuid,
                                                  mock_container_delete,
                                                  mock_validate, mock_policy):
        """Exercise ``DELETE /v1/containers/<uuid>/?all_tenants=1``.

        Asserts a 204 response, that the delete mock was called once with
        ``(ANY, container, False)``, and that its first positional argument
        has ``all_tenants`` set to True.
        """
        mock_policy.return_value = True

        container_fields = utils.get_test_container()
        container_obj = objects.Container(self.context, **container_fields)
        mock_get_by_uuid.return_value = container_obj

        url = ('/v1/containers/%s/?all_tenants=1' %
               container_fields.get('uuid'))
        response = self.delete(url)

        self.assertEqual(204, response.status_int)
        mock_container_delete.assert_called_once_with(
            mock.ANY, container_obj, False)
        # The first positional argument of the delete call is the context.
        request_context = mock_container_delete.call_args[0][0]
        self.assertIs(True, request_context.all_tenants)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_kill_container_by_uuid(self,
                                    mock_get_by_uuid, mock_container_kill,
                                    mock_validate):
        """Exercise ``POST /v1/containers/<uuid>/kill/`` with signal '9'.

        Asserts a 202 response and that the kill mock was called once with
        ``(ANY, container, '9')``, where ``container`` is the object returned
        by the get-by-uuid mock.
        """
        # Two separate Container objects are built from the fixture: one is
        # returned by the kill mock, the other by the uuid lookup mock.
        kill_result = objects.Container(self.context,
                                        **utils.get_test_container())
        mock_container_kill.return_value = kill_result

        container_fields = utils.get_test_container()
        looked_up = objects.Container(self.context, **container_fields)
        mock_get_by_uuid.return_value = looked_up

        url = '/v1/containers/%s/%s/' % (container_fields.get('uuid'), 'kill')
        cmd = {'signal': '9'}
        response = self.post(url, cmd)

        self.assertEqual(202, response.status_int)
        mock_container_kill.assert_called_once_with(
            mock.ANY, looked_up, cmd['signal'])
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_resize_container_by_uuid(self,
                                      mock_get_by_uuid,
                                      mock_container_resize,
                                      mock_validate):
        """Exercise ``POST /v1/containers/<name>/resize/`` with h and w.

        The URL is built from the fixture's ``name`` field.  Asserts a 200
        response and that the resize mock was called once with
        ``(ANY, container, '100', '100')``, where ``container`` is the object
        returned by the get-by-uuid mock.
        """
        # Two separate Container objects are built from the fixture: one is
        # returned by the resize mock, the other by the uuid lookup mock.
        resize_result = objects.Container(self.context,
                                          **utils.get_test_container())
        mock_container_resize.return_value = resize_result

        container_fields = utils.get_test_container()
        looked_up = objects.Container(self.context, **container_fields)
        mock_get_by_uuid.return_value = looked_up

        url = '/v1/containers/%s/%s/' % (container_fields.get('name'),
                                         'resize')
        cmd = {'h': '100', 'w': '100'}
        response = self.post(url, cmd)

        self.assertEqual(200, response.status_int)
        mock_container_resize.assert_called_once_with(
            mock.ANY, looked_up, cmd['h'], cmd['w'])
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_archive_by_uuid(self,
                                 mock_get_by_uuid,
                                 container_get_archive,
                                 mock_validate):
        """Exercise ``GET /v1/containers/<uuid>/get_archive/`` with a path.

        The archive mock returns a pair of empty strings.  Asserts a 200
        response and that the archive mock was called once with
        ``(ANY, container, '/home/1.txt')``.
        """
        container_get_archive.return_value = ("", "")

        container_fields = utils.get_test_container()
        container_obj = objects.Container(self.context, **container_fields)
        mock_get_by_uuid.return_value = container_obj

        url = '/v1/containers/%s/%s/' % (container_fields.get('uuid'),
                                         'get_archive')
        cmd = {'path': '/home/1.txt'}
        response = self.get(url, cmd)

        self.assertEqual(200, response.status_int)
        container_get_archive.assert_called_once_with(
            mock.ANY, container_obj, cmd['path'])