Python mock 模块,call() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mock.call()

项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def test_load_ok(self):
        """Load a minimal config and compare every call made on ``open``.

        ``set_configs.open`` is replaced by a ``mock_open`` whose read data is
        the JSON config; the recorded calls must show the config file being
        read and the command being written to ``/run_command``.
        """
        payload = json.dumps({'command': '/bin/true',
                              'config_files': {}})
        opener = mock.mock_open(read_data=payload)

        # One read of the config file, then one write of the command file,
        # each wrapped in a context manager enter/exit pair.
        expected_calls = [
            mock.call('/var/lib/kolla/config_files/config.json'),
            mock.call().__enter__(),
            mock.call().read(),
            mock.call().__exit__(None, None, None),
            mock.call('/run_command', 'w+'),
            mock.call().__enter__(),
            mock.call().write(u'/bin/true'),
            mock.call().__exit__(None, None, None),
        ]

        with mock.patch.object(set_configs, 'open', opener):
            loaded = set_configs.load_config()
            set_configs.copy_config(loaded)
            self.assertEqual(expected_calls, opener.mock_calls)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_create_ovs_vif_port(self):
        """Check the ovs-vsctl invocation made by create_ovs_vif_port.

        ``utils.execute`` is patched, so only the arguments it receives are
        verified.
        """
        ovs_args = ('ovs-vsctl', '--', '--if-exists',
                    'del-port', 'fake-dev', '--', 'add-port',
                    'fake-bridge', 'fake-dev',
                    '--', 'set', 'Interface', 'fake-dev',
                    'external-ids:iface-id=fake-iface-id',
                    'external-ids:iface-status=active',
                    'external-ids:attached-mac=fake-mac',
                    'external-ids:vm-uuid=fake-instance-uuid')
        expected = [mock.call(*ovs_args, run_as_root=True)]

        execute_patch = mock.patch.object(utils, 'execute',
                                          return_value=('', ''))
        with execute_patch as m_execute:
            linux_net.create_ovs_vif_port('fake-bridge', 'fake-dev',
                                          'fake-iface-id', 'fake-mac',
                                          'fake-instance-uuid')
            m_execute.assert_has_calls(expected)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_request(self):
        """Exercise _ensure_provisioned when the first _ensure attempt fails.

        ``_ensure`` is mocked to raise StateInvalidClient on its first call
        and to return the expected result on the second. The test checks that
        each timer value was passed to ``_wait_for_provisioning`` and that
        the second result is returned.
        """
        driver_cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        marks = mock.sentinel
        loadbalancer, obj = marks.loadbalancer, marks.obj
        create, find = marks.create, marks.find
        expected_result = marks.expected_result
        timer = [marks.t0, marks.t1]
        m_driver._provisioning_timer.return_value = timer
        m_driver._ensure.side_effect = [n_exc.StateInvalidClient,
                                        expected_result]

        ret = driver_cls._ensure_provisioned(m_driver, loadbalancer, obj,
                                             create, find)

        wait_calls = [mock.call(loadbalancer, tick) for tick in timer]
        ensure_calls = [mock.call(obj, create, find)] * len(timer)
        m_driver._wait_for_provisioning.assert_has_calls(wait_calls)
        m_driver._ensure.assert_has_calls(ensure_calls)
        self.assertEqual(expected_result, ret)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_ensure_not_ready(self):
        """Expect ResourceNotReady when _ensure returns None on every call.

        ``_ensure`` is mocked to always return None; the test also checks
        that it was attempted once per timer value.
        """
        driver_cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        marks = mock.sentinel
        loadbalancer, obj = marks.loadbalancer, marks.obj
        create, find = marks.create, marks.find
        timer = [marks.t0, marks.t1]
        m_driver._provisioning_timer.return_value = timer
        m_driver._ensure.return_value = None

        with self.assertRaises(k_exc.ResourceNotReady):
            driver_cls._ensure_provisioned(m_driver, loadbalancer, obj,
                                           create, find)

        wait_calls = [mock.call(loadbalancer, tick) for tick in timer]
        ensure_calls = [mock.call(obj, create, find)] * len(timer)
        m_driver._wait_for_provisioning.assert_has_calls(wait_calls)
        m_driver._ensure.assert_has_calls(ensure_calls)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_register(self, m_init, m_wrap_consumer):
        """Registering a consumer registers its wrapped handler per key.

        The consumer is wrapped exactly once, and the dispatcher must receive
        one ``register`` call for every (key_fn, key) pair in ``consumes``,
        in any order.
        """
        m_init.return_value = None
        handler = mock.sentinel.handler
        m_wrap_consumer.return_value = handler

        consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
                    mock.sentinel.key_fn2: mock.sentinel.key2,
                    mock.sentinel.key_fn3: mock.sentinel.key3}
        m_consumer = mock.Mock()
        m_consumer.consumes = consumes
        m_dispatcher = mock.Mock()

        pipeline = _TestEventPipeline()
        pipeline._dispatcher = m_dispatcher

        pipeline.register(m_consumer)

        m_wrap_consumer.assert_called_once_with(m_consumer)
        expected_registrations = [
            mock.call(key_fn, key, mock.sentinel.handler)
            for key_fn, key in consumes.items()
        ]
        m_dispatcher.register.assert_has_calls(expected_registrations,
                                               any_order=True)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_call_retry(self, m_sleep, m_count):
        """Retry calls the handler again after each failure.

        The handler fails ``attempts - 1`` times and then succeeds. It must
        be called ``attempts`` times, and the sleep mock must be called once
        per failure with the deadline, the attempt number and the exception.
        """
        attempts = 3
        timeout = 10
        deadline = self.now + timeout
        failures = [_EX1()] * (attempts - 1)
        event = mock.sentinel.event

        m_handler = mock.Mock()
        m_handler.side_effect = failures + [None]
        m_sleep.return_value = 1
        m_count.return_value = list(range(1, 5))
        retry = h_retry.Retry(m_handler, timeout=timeout, exceptions=_EX1)

        retry(event)

        m_handler.assert_has_calls([mock.call(event)] * attempts)
        # Attempt numbers start at 1.
        expected_sleeps = [
            mock.call(deadline, attempt, failure)
            for attempt, failure in enumerate(failures, 1)
        ]
        m_sleep.assert_has_calls(expected_sleeps)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_send_magic_packets(self, mock_socket):
        """Verify the socket calls made by wol_power._send_magic_packets.

        A test port is created for the node; the fake socket must then see
        the broadcast option being set, two ``sendto`` calls to the target
        address and a final ``close``.
        """
        fake_socket = mock.Mock(spec=socket, spec_set=True)
        mock_socket.return_value = fake_socket()
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)
        broadcast_host, broadcast_port = '255.255.255.255', 9
        send_call = mock.call().sendto(mock.ANY,
                                       (broadcast_host, broadcast_port))

        acquired = task_manager.acquire(
            self.context, self.node.uuid, shared=True)
        with acquired as task:
            wol_power._send_magic_packets(task, broadcast_host,
                                          broadcast_port)

            expected_calls = [
                mock.call(),
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1),
                send_call,
                send_call,
                mock.call().close(),
            ]

            fake_socket.assert_has_calls(expected_calls)
            self.assertEqual(1, mock_socket.call_count)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_get_swift_hash_env(self, mock_config, mock_service_name):
        """Check get_swift_hash against the content of SWIFT_HASH_FILE.

        ``config`` returns None and ``os.environ.get`` is mocked to return a
        random UUID string. The test verifies which environment keys were
        looked up and that the returned hash equals what ends up in the hash
        file.
        """
        import os
        import shutil

        mock_config.return_value = None
        mock_service_name.return_value = "testsvc"
        # tempfile.mktemp() is deprecated and race-prone, and the file it
        # named was never removed. Use a private directory with a path inside
        # it that does not exist yet, and delete the directory afterwards.
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)
        tmpfile = os.path.join(tmpdir, 'swift-hash')
        swift_context.SWIFT_HASH_FILE = tmpfile
        with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
            mock_env_get.return_value = str(uuid.uuid4())
            hash_ = swift_context.get_swift_hash()
            mock_env_get.assert_has_calls([
                mock.call('JUJU_MODEL_UUID'),
                mock.call('JUJU_ENV_UUID',
                          mock_env_get.return_value)
            ])

        with open(tmpfile, 'r') as fd:
            self.assertEqual(hash_, fd.read())

        self.assertTrue(mock_config.called)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_failure(self):
        """Ensure that action_fail is called on failure."""
        self.CalledProcessError = ValueError
        self.determine_api_port.return_value = 8070
        self.action_get.return_value = "test"
        self.config.return_value = "swauth"

        # Make the mocked check_call raise CalledProcessError.
        self.check_call.side_effect = subprocess.CalledProcessError(
            0, "hi", "no")

        actions.add_user.add_user()

        self.leader_get.assert_called_with("swauth-admin-key")
        expected_gets = [
            call(key) for key in ("account", "username", "password")
        ]
        self.action_get.assert_has_calls(expected_gets)
        self.action_set.assert_not_called()

        failure_message = (
            'Adding user test failed with: "Command \'hi\' returned non-zero '
            'exit status 0"')
        self.action_fail.assert_called_once_with(failure_message)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_final_action(
        self,
        refresh_batch,
        temp_name,
        write_session,
        mock_execute,
        database_name
    ):
        """final_action must select the database, then drop the temp table."""
        refresh_batch.final_action()

        statements = (
            'USE {0}'.format(database_name),
            'DROP TABLE IF EXISTS {0}'.format(temp_name),
        )
        expected = [mock.call(write_session, sql) for sql in statements]
        mock_execute.assert_has_calls(expected)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_create_table_from_src_table(
        self,
        refresh_batch,
        fake_original_table,
        fake_new_table,
        show_table_query,
        write_session
    ):
        """Both the show-table and new-table queries must be executed.

        ``_execute_query`` is autospec-patched and its ``fetchone`` result is
        stubbed with the database name and the original table; call order is
        not checked.
        """
        execute_patch = mock.patch.object(
            refresh_batch,
            '_execute_query',
            autospec=True
        )
        with execute_patch as mock_execute:
            fetchone = mock_execute.return_value.fetchone
            fetchone.return_value = ['test_db', fake_original_table]

            refresh_batch.create_table_from_src_table(write_session)

            expected = [
                mock.call(write_session, query)
                for query in (show_table_query, fake_new_table)
            ]
            mock_execute.assert_has_calls(expected, any_order=True)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_bootstrap_files_calls_register_file_for_each_file(
            self,
            containers
    ):
        """bootstrap_files registers every path and reports a result for each.

        ``register_file`` is stubbed to return None, so each
        ``bootstrap_schema_result`` call is expected to receive None.
        """
        file_paths = {'a.test', 'b.test'}
        bootstrapper = FileBootstrapperBase(
            schema_ref=None,
            file_paths=file_paths,
            override_metadata=True,
            file_extension='test'
        )
        register_mock = mock.Mock(return_value=None)
        result_mock = mock.Mock()
        bootstrapper.register_file = register_mock
        bootstrapper.bootstrap_schema_result = result_mock

        bootstrapper.bootstrap_files()

        # The expected order follows the iteration order of the same set.
        expected_registrations = [mock.call(path) for path in file_paths]
        expected_results = [mock.call(None)] * len(file_paths)
        assert bootstrapper.register_file.mock_calls == expected_registrations
        assert bootstrapper.bootstrap_schema_result.mock_calls == \
            expected_results
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_run_converts_existing_if_overwrite_true(
            self,
            sql_file_path,
            avsc_file_path,
            globs,
            mock_get_file_paths_from_glob_patterns,
            mock_os_path_exists,
            mock_batch
    ):
        """With overwrite enabled, run() must convert the SQL file to avsc."""
        mock_batch.options.overwrite = True

        mock_batch.run()

        expected_glob_calls = [mock.call(glob_patterns=globs)]
        expected_exists_calls = [mock.call(avsc_file_path)]
        expected_convert_calls = [
            mock.call(
                avsc_file_path=avsc_file_path,
                sql_file_path=sql_file_path
            )
        ]
        assert mock_get_file_paths_from_glob_patterns.mock_calls == \
            expected_glob_calls
        assert mock_os_path_exists.mock_calls == expected_exists_calls
        assert mock_batch.convert_sql_to_avsc.mock_calls == \
            expected_convert_calls
项目:nameko-slack    作者:iky    | 项目源码 | 文件源码
def test_client_manager_setup_with_multiple_bot_tokens(mocked_slack_client):
    """setup() creates one client per bot listed under SLACK/BOTS."""
    bots = {
        'spam': 'abc-123',
        'ham': 'def-456',
    }
    config = {'SLACK': {'BOTS': bots}}

    client_manager = rtm.SlackRTMClientManager()
    client_manager.container = Mock(config=config)

    client_manager.setup()

    bot_names = ('spam', 'ham')
    for bot_name in bot_names:
        assert bot_name in client_manager.clients
    for bot_name in bot_names:
        assert (
            client_manager.clients[bot_name] ==
            mocked_slack_client.return_value)

    # The client factory must have been called with each token.
    for token in ('abc-123', 'def-456'):
        assert call(token) in mocked_slack_client.call_args_list
项目:nameko-slack    作者:iky    | 项目源码 | 文件源码
def test_handle_event_by_type(self, events, service_runner, tracker):
        """Only events of the subscribed type reach the handler."""

        class Service:

            name = 'sample'

            @rtm.handle_event('presence_change')
            def handle_event(self, event):
                tracker.handle_event(event)

        service_runner(Service, events)

        # The handler should have seen exactly the 'presence_change' events,
        # in their original order.
        presence_changes = [
            event for event in events
            if event.get('type') == 'presence_change'
        ]
        expected_calls = [call(event) for event in presence_changes]
        assert tracker.handle_event.call_args_list == expected_calls
项目:nameko-slack    作者:iky    | 项目源码 | 文件源码
def test_handle_any_message(self, events, service_runner, tracker):
        """Every 'message' event reaches the handler along with its text."""

        class Service:

            name = 'sample'

            @rtm.handle_message
            def handle_event(self, event, message):
                tracker.handle_event(event, message)

        service_runner(Service, events)

        # Each message event is expected together with its 'text' field.
        messages = [
            event for event in events
            if event.get('type') == 'message'
        ]
        expected_calls = [
            call(event, event.get('text')) for event in messages
        ]
        assert tracker.handle_event.call_args_list == expected_calls
项目:nameko-slack    作者:iky    | 项目源码 | 文件源码
def test_replies_on_handle_message(events, service_runner):
        """The value returned by a message handler is sent back as a reply."""

        class Service:

            name = 'sample'

            @rtm.handle_message
            def handle_message(self, event, message):
                return 'sure, {}'.format(message)

        reply_calls = service_runner(Service, events)

        expected_replies = [
            call('D11', 'sure, spam ham'),
            call('D11', 'sure, ham spam'),
            call('D11', 'sure, spam egg'),
        ]
        assert reply_calls == expected_replies
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_make_without_build_container_tag_with_context(self, skipper_runner_run_mock):
        """'make' with loaded defaults: build the image with the configured
        context, then run make in it.

        The last two global parameters are dropped before invoking the CLI.
        """
        global_params = self.global_params[:-2]
        make_params = ['-f', 'Makefile', 'all']

        self._invoke_cli(
            defaults=config.load_defaults(),
            global_params=global_params,
            subcmd='make',
            subcmd_params=make_params
        )

        build_call = mock.call(
            ['docker', 'build', '-t', 'build-container-image', '-f', 'Dockerfile.build-container-image',
             SKIPPER_CONF_CONTAINER_CONTEXT])
        make_call = mock.call(
            ['make'] + make_params, fqdn_image='build-container-image', environment=[],
            interactive=False, name=None, net='host', volumes=None, workdir=None, use_cache=False)
        skipper_runner_run_mock.assert_has_calls([build_call, make_call])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push(self, skipper_runner_run_mock, requests_get_mock):
        """'push' is expected to issue docker tag, push and rmi, in order.

        The mocked registry response lists only the tags 'latest', 'aaaaaaa'
        and 'bbbbbbb'.
        """
        skipper_runner_run_mock.side_effect = [0, 0]
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            response = response_cls.return_value
            response.json.return_value = registry_listing
            requests_get_mock.return_value = response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        remote_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
            mock.call(['docker', 'push', remote_image]),
            mock.call(['docker', 'rmi', remote_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
        """When the tag is already listed, only docker tag and rmi are expected.

        The mocked registry response includes the tag '1234567'.
        """
        skipper_runner_run_mock.side_effect = [0, 0]
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            response = response_cls.return_value
            response.json.return_value = registry_listing
            requests_get_mock.return_value = response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        remote_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
            mock.call(['docker', 'rmi', remote_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
        """A non-zero result from the second runner call makes the CLI exit 1.

        The runner mock returns 0 and then 1; only docker tag and push are
        expected.
        """
        skipper_runner_run_mock.side_effect = [0, 1]
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            response = response_cls.return_value
            response.json.return_value = registry_listing
            requests_get_mock.return_value = response

        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        self.assertEqual(result.exit_code, 1)
        remote_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
            mock.call(['docker', 'push', remote_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
        """A non-zero result from the third runner call still exits with 0.

        The runner mock returns 0, 0 and then 1; docker tag, push and rmi are
        all expected.
        """
        skipper_runner_run_mock.side_effect = [0, 0, 1]
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            response = response_cls.return_value
            response.json.return_value = registry_listing
            requests_get_mock.return_value = response

        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        self.assertEqual(result.exit_code, 0)
        remote_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
            mock.call(['docker', 'push', remote_image]),
            mock.call(['docker', 'rmi', remote_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_to_namespace(self, skipper_runner_run_mock, requests_get_mock):
        """With --namespace, the namespace appears in the remote image name."""
        skipper_runner_run_mock.side_effect = [0, 0]
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            response = response_cls.return_value
            response.json.return_value = registry_listing
            requests_get_mock.return_value = response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['--namespace', 'my_namespace', 'my_image']
        )

        remote_image = 'registry.io:5000/my_namespace/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
            mock.call(['docker', 'push', remote_image]),
            mock.call(['docker', 'rmi', remote_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
        """'push' driven by loaded defaults instead of explicit global params.

        The same docker tag, push and rmi commands are expected as in the
        explicit-parameter case.
        """
        skipper_runner_run_mock.side_effect = [0, 0]
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            response = response_cls.return_value
            response.json.return_value = registry_listing
            requests_get_mock.return_value = response

        self._invoke_cli(
            defaults=config.load_defaults(),
            subcmd='push',
            subcmd_params=['my_image']
        )

        remote_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
            mock.call(['docker', 'push', remote_image]),
            mock.call(['docker', 'rmi', remote_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_make_without_build_container_tag(self, skipper_runner_run_mock):
        """'make' without loaded defaults: build the image from '.', then run
        make in it.

        The last two global parameters are dropped before invoking the CLI.
        """
        global_params = self.global_params[:-2]
        make_params = ['-f', 'Makefile', 'all']

        self._invoke_cli(
            global_params=global_params,
            subcmd='make',
            subcmd_params=make_params
        )

        build_call = mock.call(
            ['docker', 'build', '-t', 'build-container-image', '-f', 'Dockerfile.build-container-image', '.'])
        make_call = mock.call(
            ['make'] + make_params, fqdn_image='build-container-image', environment=[],
            interactive=False, name=None, net='host', volumes=None, workdir=None, use_cache=False)
        skipper_runner_run_mock.assert_has_calls([build_call, make_call])
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_resume_compute_exception_wait_slave_available(self, mock_sleep):
        """_resume_compute retries 'xe vm-start' after a 'wait for slave' error.

        ``VM.start`` raises the fake XenAPI failure. ``_run_command`` raises a
        PluginError on its first call and returns None on the second. The test
        expects two identical ``xe vm-start`` commands and exactly one sleep.
        """
        side_effect_xenapi_failure = FakeXenAPIException
        side_effect_plugin_error = [self.pluginlib.PluginError(
                                    "Wait for the slave to become available"),
                                    None]
        self.mock_patch_object(self.session.xenapi.VM,
                               'start')
        self.session.xenapi.VM.start.side_effect = \
            side_effect_xenapi_failure
        self.host._run_command.side_effect = side_effect_plugin_error
        self.host.XenAPI.Failure = FakeXenAPIException
        vm_start_cmd = ["xe", "vm-start", "uuid=%s" % 'fake_compute_uuid']
        expected = [call(vm_start_cmd), call(vm_start_cmd)]

        self.host._resume_compute(self.session,
                                  'fake_compute_ref',
                                  'fake_compute_uuid')
        self.session.xenapi.VM.start.assert_called_with(
            'fake_compute_ref', False, True)
        self.assertEqual(expected, self.host._run_command.call_args_list)
        # Check call_count explicitly. On mock releases that predate
        # assert_called_once(), that name is auto-created as a child mock and
        # the "assertion" passes without checking anything.
        self.assertEqual(1, mock_sleep.call_count)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_power_action_input_cmd_result_not_empty(self):
        """_power_action raises PluginError if the final command returns output.

        ``_run_command`` returns None for the first two commands and a
        non-empty string for the third.
        """
        host_uuid = 'fake_host_uuid'
        temp_arg_dict = {'host_uuid': host_uuid}
        self.host._run_command.side_effect = [None, None, 'not_empty']
        cmds = {"reboot": "host-reboot",
                "startup": "host-power-on",
                "shutdown": "host-shutdown"}
        # Only 'reboot' is exercised; 'startup' and 'shutdown' should behave
        # the same way.
        fake_action = 'reboot'
        expected_cmd_arg_list = [
            call(["xe", "host-disable", "uuid=%s" % host_uuid]),
            call(["xe", "vm-shutdown", "--multiple",
                  "resident-on=%s" % host_uuid]),
            call(["xe", cmds[fake_action], "uuid=%s" % host_uuid]),
        ]

        with self.assertRaises(self.pluginlib.PluginError):
            self.host._power_action(fake_action, temp_arg_dict)
        self.assertEqual(self.host._run_command.call_args_list,
                         expected_cmd_arg_list)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_power_action(self):
        """_power_action runs disable, vm-shutdown and the power command.

        With every command returning None, the result is a dict naming the
        requested action.
        """
        host_uuid = 'fake_host_uuid'
        temp_arg_dict = {'host_uuid': host_uuid}
        self.host._run_command.return_value = None
        cmds = {"reboot": "host-reboot",
                "startup": "host-power-on",
                "shutdown": "host-shutdown"}
        # Only 'reboot' is exercised; 'startup' and 'shutdown' should behave
        # the same way.
        fake_action = 'reboot'
        expected_cmd_arg_list = [
            call(["xe", "host-disable", "uuid=%s" % host_uuid]),
            call(["xe", "vm-shutdown", "--multiple",
                  "resident-on=%s" % host_uuid]),
            call(["xe", cmds[fake_action], "uuid=%s" % host_uuid]),
        ]
        expected_result = {"power_action": fake_action}

        action_result = self.host._power_action(fake_action, temp_arg_dict)

        self.assertEqual(self.host._run_command.call_args_list,
                         expected_cmd_arg_list)
        self.assertEqual(action_result, expected_result)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_ovs_add_patch_port(self):
        """_ovs_add_patch_port builds the ovs-vsctl patch-port command.

        ``pluginlib.exists`` is mocked to return the bridge, port and peer
        port names in that order.
        """
        bridge = 'fake_brige_name'
        port = 'fake_port_name'
        peer_port = 'fake_peer_port_name'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [bridge, port, peer_port]
        expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
                             port, '--', 'add-port', bridge,
                             port, '--', 'set', 'interface',
                             port, 'type=patch',
                             'options:peer=%s' % peer_port]
        expected_lookups = [
            call('fake_args', arg_name)
            for arg_name in ('bridge_name', 'port_name', 'peer_port_name')
        ]

        self.host._ovs_add_patch_port('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_lookups)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_ovs_del_port(self):
        """_ovs_del_port builds the ovs-vsctl del-port command.

        ``pluginlib.exists`` is mocked to return the bridge and port names in
        that order.
        """
        bridge, port = 'fake_brige_name', 'fake_port_name'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [bridge, port]
        expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
                             bridge, port]
        expected_lookups = [
            call('fake_args', arg_name)
            for arg_name in ('bridge_name', 'port_name')
        ]

        self.host._ovs_del_port('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_lookups)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_ovs_set_if_external_id(self):
        """_ovs_set_if_external_id builds the ovs-vsctl 'set Interface' command.

        ``pluginlib.exists`` is mocked to return the interface, the external
        id name and its value in that order.
        """
        interface = 'fake_interface'
        external_id = 'fake_extneral_id'
        value = 'fake_value'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [interface, external_id, value]
        expected_cmd_args = ['ovs-vsctl', 'set', 'Interface', interface,
                             'external-ids:%s=%s' % (external_id, value)]
        # The argument keys are reproduced exactly as the test looks them up.
        expected_lookups = [
            call('fake_args', arg_name)
            for arg_name in ('interface', 'extneral_id', 'value')
        ]

        self.host._ovs_set_if_external_id('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_lookups)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_ip_link_add_veth_pair(self):
        """_ip_link_add_veth_pair builds the 'ip link add ... type veth' command.

        ``pluginlib.exists`` is mocked to return the two device names in
        order.
        """
        first_dev, second_dev = 'fake_brige_name', 'fake_port_name'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [first_dev, second_dev]
        expected_cmd_args = ['ip', 'link', 'add', first_dev, 'type',
                             'veth', 'peer', 'name', second_dev]
        expected_lookups = [
            call('fake_args', arg_name)
            for arg_name in ('dev1_name', 'dev2_name')
        ]

        self.host._ip_link_add_veth_pair('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_lookups)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_ip_link_set_promisc(self):
        """_ip_link_set_promisc builds the 'ip link set ... promisc' command.

        ``pluginlib.exists`` is mocked to return the device name and the
        option in that order.
        """
        device, promisc_option = 'fake_device_name', 'fake_option'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [device, promisc_option]
        expected_cmd_args = ['ip', 'link', 'set', device, 'promisc',
                             promisc_option]
        expected_lookups = [
            call('fake_args', arg_name)
            for arg_name in ('device_name', 'option')
        ]

        self.host._ip_link_set_promisc('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_lookups)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_no_req_ids(self, *args):
        """Run the heartbeat against a connection whose in_flight count is
        one above its max_request_id.

        The test expects that no message is sent, that ``in_flight`` is left
        unchanged, and that the connection is defuncted and returned to its
        holder once per ``get_holders`` call.
        """
        in_flight = 3

        get_holders = self.make_get_holders(1)
        max_connection = Mock(spec=Connection, host='localhost',
                              lock=Lock(),
                              max_request_id=in_flight - 1, in_flight=in_flight,
                              is_idle=True, is_defunct=False, is_closed=False)
        holder = get_holders.return_value[0]
        holder.get_connections.return_value.append(max_connection)

        self.run_heartbeat(get_holders)

        holder.get_connections.assert_has_calls([call()] * get_holders.call_count)
        self.assertEqual(max_connection.in_flight, in_flight)
        # The original repeated this assertion verbatim; once is enough.
        self.assertEqual(max_connection.send_msg.call_count, 0)
        max_connection.defunct.assert_has_calls([call(ANY)] * get_holders.call_count)
        holder.return_connection.assert_has_calls(
            [call(max_connection)] * get_holders.call_count)
项目:sbdspider    作者:onecer    | 项目源码 | 文件源码
def test_scheduler_persistent(self):
        """With persist=True, queued requests survive a close/open cycle.

        After switching ``persist`` off, closing the scheduler leaves it
        empty.
        """
        # TODO: Improve this test to avoid the need to check for log messages.
        self.spider.log = mock.Mock(spec=self.spider.log)

        self.scheduler.persist = True
        self.scheduler.open(self.spider)

        # Nothing should have been logged on the first open.
        self.assertEqual(self.spider.log.call_count, 0)

        for url in ('http://example.com/page1', 'http://example.com/page2'):
            self.scheduler.enqueue_request(Request(url))

        self.assertTrue(self.scheduler.has_pending_requests())
        self.scheduler.close('finish')

        # Reopening must report the two requests left over from before.
        self.scheduler.open(self.spider)
        resume_message = "Resuming crawl (2 requests scheduled)"
        self.spider.log.assert_has_calls([mock.call(resume_message)])
        self.assertEqual(len(self.scheduler), 2)

        self.scheduler.persist = False
        self.scheduler.close('finish')

        self.assertEqual(len(self.scheduler), 0)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_register_configs_apache(self, resource_map, exists, renderer):
        """Check which config files ``utils.register_configs`` registers.

        ``exists`` is stubbed to return False and the release to 'havana'.
        The renderer must be built with the havana release and the templates
        directory, and ``register`` must be called for keystone.conf and for
        both apache frontend site files, in that order.
        """
        self.os_release.return_value = 'havana'
        exists.return_value = False
        resource_map.return_value = self.rsc_map

        fake_renderer = MagicMock()
        fake_renderer.register = MagicMock()
        renderer.return_value = fake_renderer

        utils.register_configs()

        renderer.assert_called_with(
            templates_dir='templates/', openstack_release='havana')

        expected_paths = (
            '/etc/keystone/keystone.conf',
            '/etc/apache2/sites-available/openstack_https_frontend',
            '/etc/apache2/sites-available/openstack_https_frontend.conf',
        )
        ex_reg = [call(path, [self.ctxt]) for path in expected_paths]
        self.assertEqual(fake_renderer.register.call_args_list, ex_reg)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_create_user_credentials_no_roles(self, mock_create_user,
                                              mock_create_role,
                                              mock_grant_role,
                                              mock_user_exists,
                                              get_callback, set_callback):
        """Create credentials without passing ``grants`` or ``new_roles``.

        The user-creation mock must receive the expected call, while the
        role-creation and role-grant mocks must not be called at all.
        """
        mock_user_exists.return_value = False
        get_callback.return_value = 'passA'
        utils.create_user_credentials('userA',
                                      get_callback,
                                      set_callback,
                                      tenant='tenantA')
        mock_create_user.assert_has_calls([call('userA', 'passA',
                                                domain=None,
                                                tenant='tenantA')])
        # assert_has_calls([]) passes whatever the mock received, so it could
        # never catch an unexpected role call; check ``called`` explicitly.
        self.assertFalse(mock_create_role.called)
        self.assertFalse(mock_grant_role.called)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_create_user_credentials(self, mock_create_user, mock_create_role,
                                     mock_grant_role, mock_user_exists,
                                     get_callback, set_callback):
        """Create credentials with one granted role and one new role.

        Checks that the user-creation, role-creation and role-grant mocks
        each received the expected call.
        """
        get_callback.return_value = 'passA'
        mock_user_exists.return_value = False

        utils.create_user_credentials(
            'userA', get_callback, set_callback,
            tenant='tenantA', grants=['roleA'], new_roles=['roleB'])

        expected_user = call('userA', 'passA', tenant='tenantA', domain=None)
        expected_role = call('roleB', user='userA', tenant='tenantA',
                             domain=None)
        expected_grant = call('userA', 'roleA', tenant='tenantA',
                              user_domain=None, project_domain=None)

        mock_create_user.assert_has_calls([expected_user])
        mock_create_role.assert_has_calls([expected_role])
        mock_grant_role.assert_has_calls([expected_grant])
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_git_pre_install(self, adduser, add_group, add_user_to_group,
                             write_file, mkdir):
        """Check the user, group, directories and log file set up by
        ``utils.git_pre_install``.

        Permission modes use the ``0o`` octal prefix, which is valid on both
        Python 2.6+ and Python 3; the bare ``0755`` form is a syntax error on
        Python 3.
        """
        utils.git_pre_install()
        adduser.assert_called_with('keystone', shell='/bin/bash',
                                   system_user=True,
                                   home_dir='/var/lib/keystone')
        add_group.assert_called_with('keystone', system_group=True)
        add_user_to_group.assert_called_with('keystone', 'keystone')
        expected = [
            call('/var/lib/keystone', owner='keystone',
                 group='keystone', perms=0o755, force=False),
            call('/var/lib/keystone/cache', owner='keystone',
                 group='keystone', perms=0o755, force=False),
            call('/var/log/keystone', owner='keystone',
                 group='keystone', perms=0o755, force=False),
        ]
        # call_args_list equality pins both the set and the order of mkdir calls.
        self.assertEqual(mkdir.call_args_list, expected)
        write_file.assert_called_with('/var/log/keystone/keystone.log',
                                      '', owner='keystone', group='keystone',
                                      perms=0o600)
项目:pyhpecw7    作者:HPENetworking    | 项目源码 | 文件源码
def test_ports_build(self, mock_iface_updown):
        """Build IRF ports with only new interfaces supplied.

        Checks the staged requests (port config, save, activate) and that the
        interface up/down mock is called with the expected interface lists,
        'down' first and then 'up'.
        """
        expected_call_list = [
            [self.read_config_xml('irf_port_build'), 'edit_config'],
            ['startup.cfg', 'save'],
            [self.read_action_xml('irf_port_build_activate'), 'action'],
        ]

        self.irf_port.build(
            '1',
            [],
            [],
            ['FortyGigE1/0/1', 'FortyGigE1/0/2'],
            ['FortyGigE1/0/3', 'FortyGigE1/0/4'],
        )

        self.assert_stage_requests_multiple(expected_call_list)

        down_ifaces = ['FortyGigE1/0/2', 'FortyGigE1/0/1',
                       'FortyGigE1/0/3', 'FortyGigE1/0/4']
        up_ifaces = ['FortyGigE1/0/1', 'FortyGigE1/0/2',
                     'FortyGigE1/0/3', 'FortyGigE1/0/4']
        expected_updown = [
            mock.call(down_ifaces, 'down'),
            mock.call(up_ifaces, 'up'),
        ]
        mock_iface_updown.assert_has_calls(expected_updown)
项目:pyhpecw7    作者:HPENetworking    | 项目源码 | 文件源码
def test_ports_build_remove(self, mock_iface_updown):
        """Build IRF ports with only old interfaces supplied.

        Checks the staged requests (port config, save, activate) and that the
        interface up/down mock is called with the four interfaces for 'down'
        and an empty list for 'up'.
        """
        expected_call_list = [
            [self.read_config_xml('irf_port_build_remove'), 'edit_config'],
            ['startup.cfg', 'save'],
            [self.read_action_xml('irf_port_build_activate'), 'action'],
        ]

        self.irf_port.build(
            '1',
            ['FortyGigE1/0/1', 'FortyGigE1/0/2'],
            ['FortyGigE1/0/3', 'FortyGigE1/0/4'],
            [],
            [],
        )

        self.assert_stage_requests_multiple(expected_call_list)

        down_ifaces = ['FortyGigE1/0/2', 'FortyGigE1/0/1',
                       'FortyGigE1/0/3', 'FortyGigE1/0/4']
        up_ifaces = []
        expected_updown = [
            mock.call(down_ifaces, 'down'),
            mock.call(up_ifaces, 'up'),
        ]
        mock_iface_updown.assert_has_calls(expected_updown)
项目:pyhpecw7    作者:HPENetworking    | 项目源码 | 文件源码
def test_init(self, mock_ping):
        """Check the attributes of ``Ping`` objects built with and without v6.

        Both instances must have an empty ``vrf``, the target as ``host`` and
        ``detail`` False; ``v6`` must match what was requested. ``mock_ping``
        must have been called with no arguments for each instance.
        """
        # (extra constructor kwargs, expected value of .v6)
        cases = (
            ({}, False),
            ({'v6': True}, True),
        )
        for extra_kwargs, expected_v6 in cases:
            ping = Ping(self.device, TARGET, **extra_kwargs)

            self.assertEqual(ping.vrf, '')
            self.assertEqual(ping.host, TARGET)
            self.assertEqual(ping.v6, expected_v6)
            self.assertEqual(ping.detail, False)

        mock_ping.assert_has_calls([mock.call()] * len(cases))
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_migrate_nova_databases_ocata(self, cellv2_ready, get_cell_uuid,
                                          check_output):
        """Migrate database with nova-manage in a clustered env

        With the release stubbed to 'ocata' and one cluster relation present,
        checks the sequence of nova-manage invocations, that the dbsync state
        is stored as complete, and that services are enabled and started.
        """
        cell_uuid = 'c83121db-f1c7-464a-b657-38c28fac84c6'
        get_cell_uuid.return_value = cell_uuid
        self.relation_ids.return_value = ['cluster:1']
        self.os_release.return_value = 'ocata'

        utils.migrate_nova_databases()

        # Argument lists following the 'nova-manage' executable, in order.
        nova_manage_args = [
            ['api_db', 'sync'],
            ['cell_v2', 'map_cell0'],
            ['cell_v2', 'create_cell', '--name', 'cell1', '--verbose'],
            ['db', 'sync'],
            ['db', 'online_data_migrations'],
            ['cell_v2', 'discover_hosts', '--cell_uuid', cell_uuid,
             '--verbose'],
        ]
        check_output.assert_has_calls(
            [call(['nova-manage'] + args) for args in nova_manage_args])

        self.peer_store.assert_called_with('dbsync_state', 'complete')
        self.assertTrue(self.enable_services.called)
        self.cmd_all_services.assert_called_with('start')
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_icehouse_juno(self, determine_packages,
                                   migrate_nova_databases,
                                   get_step_upgrade_source):
        """Simulate a call to do_openstack_upgrade() for icehouse->juno

        Checks the apt update/upgrade/install calls, that configs are
        registered for 'juno', and that the database migration runs exactly
        once.
        """
        self.test_config.set('openstack-origin', 'cloud:trusty-juno')
        get_step_upgrade_source.return_value = None
        self.os_release.return_value = 'icehouse'
        self.get_os_codename_install_source.return_value = 'juno'
        self.is_leader.return_value = True
        self.relation_ids.return_value = []
        utils.do_openstack_upgrade(self.register_configs())
        self.apt_update.assert_called_with(fatal=True)
        self.apt_upgrade.assert_called_with(options=DPKG_OPTS, fatal=True,
                                            dist=True)
        self.apt_install.assert_called_with(determine_packages(), fatal=True)
        self.register_configs.assert_called_with(release='juno')
        # assertTrue(count, 1) would treat 1 as the failure message and pass
        # for any non-zero count; assertEqual pins the exact call count.
        self.assertEqual(migrate_nova_databases.call_count, 1)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_kilo_liberty(self, determine_packages,
                                  migrate_nova_databases,
                                  migrate_nova_flavors,
                                  get_step_upgrade_source):
        """Simulate a call to do_openstack_upgrade() for kilo->liberty

        Checks the apt update/upgrade/install calls, that configs are
        registered for 'liberty', and that the flavor and database migrations
        each run exactly once.
        """
        self.test_config.set('openstack-origin', 'cloud:trusty-liberty')
        get_step_upgrade_source.return_value = None
        self.os_release.return_value = 'kilo'
        self.get_os_codename_install_source.return_value = 'liberty'
        self.is_leader.return_value = True
        self.relation_ids.return_value = []
        utils.do_openstack_upgrade(self.register_configs())
        self.apt_update.assert_called_with(fatal=True)
        self.apt_upgrade.assert_called_with(options=DPKG_OPTS, fatal=True,
                                            dist=True)
        self.apt_install.assert_called_with(determine_packages(), fatal=True)
        self.register_configs.assert_called_with(release='liberty')
        # assertTrue(count, 1) would treat 1 as the failure message and pass
        # for any non-zero count; assertEqual pins the exact call counts.
        self.assertEqual(migrate_nova_flavors.call_count, 1)
        self.assertEqual(migrate_nova_databases.call_count, 1)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_liberty_mitaka(self, determine_packages,
                                    migrate_nova_databases,
                                    get_step_upgrade_source,
                                    database_setup):
        """Simulate a call to do_openstack_upgrade() for liberty->mitaka

        ``database_setup`` is stubbed to return False. Checks the apt calls,
        that configs are registered for 'mitaka', that the database migration
        is not run, and that ``database_setup`` is called with the 'novaapi'
        prefix.
        """
        # NOTE(review): the origin is 'cloud:trusty-kilo' although this test
        # targets mitaka - confirm whether that is intentional.
        self.test_config.set('openstack-origin', 'cloud:trusty-kilo')

        # Stub the environment the upgrade code inspects.
        self.os_release.return_value = 'liberty'
        self.get_os_codename_install_source.return_value = 'mitaka'
        self.relation_ids.return_value = []
        self.is_leader.return_value = True
        get_step_upgrade_source.return_value = None
        database_setup.return_value = False

        configs = self.register_configs()
        utils.do_openstack_upgrade(configs)

        self.apt_update.assert_called_with(fatal=True)
        self.apt_upgrade.assert_called_with(
            options=DPKG_OPTS, fatal=True, dist=True)
        self.apt_install.assert_called_with(determine_packages(), fatal=True)
        self.register_configs.assert_called_with(release='mitaka')

        self.assertFalse(migrate_nova_databases.called)
        database_setup.assert_called_with(prefix='novaapi')
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_config_changed_region_change(self, mock_compute_changed,
                                          mock_config_https,
                                          mock_filter_packages,
                                          mock_service_resume,
                                          mock_is_db_initialised,
                                          mock_update_nova_consoleauth_config,
                                          mock_update_aws_compat_services):
        """Run the config-changed hook with a changed config value.

        Only the 'cloud-compute' relation has an id, with one related unit.
        Checks that the compute-changed mock receives that relation id and
        unit, and that the consoleauth and AWS-compat update mocks are called.
        """
        def fake_relation_ids(x):
            # Only the cloud-compute relation is populated in this scenario.
            if x == 'cloud-compute':
                return ['generic_rid']
            return []

        self.relation_ids.side_effect = fake_relation_ids
        self.related_units.return_value = ['unit/0']
        self.config_value_changed.return_value = True
        self.openstack_upgrade_available.return_value = False
        self.git_install_requested.return_value = False
        self.os_release.return_value = 'diablo'
        mock_is_db_initialised.return_value = False

        hooks.config_changed()

        expected_compute = call('generic_rid', 'unit/0')
        mock_compute_changed.assert_has_calls([expected_compute])
        self.assertTrue(mock_update_nova_consoleauth_config.called)
        self.assertTrue(mock_update_aws_compat_services.called)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_db_joined_mitaka(self):
        """Run the db-joined hook with the release stubbed to 'mitaka'.

        Checks that relation data is set for both the nova and the nova_api
        databases using the stubbed relation IP, and that the IP is looked up
        for the 'shared-db' relation.
        """
        relation_ip = '10.10.10.10'
        self.is_relation_made.return_value = False
        self.os_release.return_value = 'mitaka'
        self.get_relation_ip.return_value = relation_ip

        hooks.db_joined()

        nova_settings = call(relation_id=None,
                             nova_database='nova',
                             nova_username='nova',
                             nova_hostname=relation_ip)
        novaapi_settings = call(relation_id=None,
                                novaapi_database='nova_api',
                                novaapi_username='nova',
                                novaapi_hostname=relation_ip)
        self.relation_set.assert_has_calls([nova_settings, novaapi_settings])

        self.get_relation_ip.assert_called_with('shared-db',
                                                cidr_network=None)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def test_amqp_changed_api_rel(self, configs, cell_joined, api_joined,
                                  quantum_joined, mock_is_db_initialised,
                                  update_db_allowed, init_db_allowed):
        """Run the amqp-changed hook with the amqp context complete.

        ``relation_ids`` yields one id per successive call. Checks that only
        nova.conf is written and that the cell, api and quantum joined mocks
        are each called with their relation id.
        """
        cell_rid = 'nova-cell-api/0'
        api_rid = 'nova-api/0'
        quantum_rid = 'quantum-service/0'
        # One single-element list per successive relation_ids() call.
        self.relation_ids.side_effect = [
            [rid] for rid in (cell_rid, api_rid, quantum_rid)
        ]
        self.is_relation_made.return_value = True
        self.os_release.return_value = 'diablo'
        mock_is_db_initialised.return_value = False
        configs.complete_contexts = MagicMock(return_value=['amqp'])
        configs.write = MagicMock()

        hooks.amqp_changed()

        expected_writes = [call('/etc/nova/nova.conf')]
        self.assertEqual(configs.write.call_args_list, expected_writes)
        cell_joined.assert_called_with(rid=cell_rid)
        api_joined.assert_called_with(rid=api_rid)
        quantum_joined.assert_called_with(rid=quantum_rid,
                                          remote_restart=True)
项目:Eskapade    作者:KaveIO    | 项目源码 | 文件源码
def test_execute(self, mock_finalize, mock_execute, mock_initialize):
        c1 = Chain('c1')
        l1 = Link('l1')
        l2 = Link('l2')
        l3 = Link('l3')

        # test happy flow
        c1.links = [l1, l2, l3]
        mock_initialize.return_value = StatusCode.Success
        mock_execute.return_value = StatusCode.Success
        mock_finalize.return_value = StatusCode.Success
        mock_parent = mock.MagicMock(autospec=True)
        mock_parent.attach_mock(mock_initialize, 'initialize')
        mock_parent.attach_mock(mock_execute, 'execute')
        mock_parent.attach_mock(mock_finalize, 'finalize')
        status = c1.initialize()
        self.assertEqual(status, StatusCode.Success)
        status = c1.execute()
        self.assertEqual(status, StatusCode.Success)
        status = c1.finalize()
        self.assertEqual(status, StatusCode.Success)
        calls = [mock.call.initialize()]*3 + [mock.call.execute()]*3 + [mock.call.finalize()]*3
        mock_parent.assert_has_calls(calls, any_order=False)