Python mock.patch 模块,multiple() 实例源码

我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用mock.patch.multiple()

项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_nrpe_dependency_installed(self, mock_config):
        config = copy.deepcopy(CHARM_CONFIG)
        mock_config.side_effect = lambda key: config[key]
        with patch.multiple(ceph_hooks,
                            apt_install=DEFAULT,
                            rsync=DEFAULT,
                            log=DEFAULT,
                            write_file=DEFAULT,
                            nrpe=DEFAULT) as mocks:
            ceph_hooks.update_nrpe_config()
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_1(self):
        j = 2
        ret_train = np.zeros((6, 3, N_CLASSES))
        ret_dev = np.zeros((6, 3, N_CLASSES))
        func_ret = np.zeros((1, N_CLASSES))
        func_ret[0, j] = 1.
        with patch.multiple(self.wb,
                            _gs=True,
                            _generate_ts=lambda *x: (self.TRAIN_X, Y),
                            _extract_features=MagicMock(
                                return_value=FEATS),
                            _model=MOCK_DEFAULT):
            with patch("dsenser.wang.wangbase.GridSearchCV"):
                self.wb._model.decision_function = \
                    MagicMock(return_value=func_ret)
                self.wb._model.classes_ = CLASSES_
                self.wb.train(([(0, REL1)], [PARSE1]),
                              ([(0, REL1)], [PARSE1]),
                              1, 1, ret_train, ret_dev)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_predict_1(self):
        x = np.array([[0, 1, 2, 3, 4], [[0, 1, 2, 3, 4]]])
        y = np.array([0, 0, 0, 0])

        def _predict_func_mock(*args, **kwargs):
            return np.array([[0, 0, 1, 0],
                             [0, 0, 0, 1],
                             [0, 0, 1, 0],
                             [1, 0, 0, 0],
                             ])

        with patch.multiple(self.nnbs,
                            _predict_func=_predict_func_mock,
                            get_test_w_emb_i=None,
                            _init_wemb_funcs=MagicMock(),
                            _rel2x=MagicMock(return_value=x)):
            ret = np.array([[0] * 4])
            self.nnbs.predict(None, (None, None), ret, 0)
            assert np.allclose(ret[0], y)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_init_wemb_funcs_0(self):
        word2vec = Mock()
        word2vec.load = MagicMock()
        word2vec.ndim = 2
        fmock = MagicMock()
        with patch.multiple(self.nnbs, _plain_w2v=True, ndim=8,
                            w2v=word2vec, _trained=True,
                            _predict_func_emb=fmock):
            self.nnbs._init_wemb_funcs()
            assert word2vec.load.called
            assert self.nnbs.ndim == word2vec.ndim
            assert self.nnbs.get_train_w_emb_i == \
                self.nnbs._get_train_w2v_emb_i
            assert self.nnbs.get_test_w_emb_i == \
                self.nnbs._get_test_w2v_emb_i
            assert self.nnbs._predict_func == fmock
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_init_wemb_funcs_2(self):
        word2vec = Mock()
        word2vec.load = MagicMock()
        word2vec.ndim = 2
        fmock = MagicMock()
        with patch.multiple(self.nnbs, _plain_w2v=False, lstsq=True,
                            ndim=8, w2v=None, _trained=True,
                            _predict_func_emb=fmock):
            with patch.object(dsenser.nnbase, "Word2Vec", word2vec):
                self.nnbs._init_wemb_funcs()
                assert self.nnbs.ndim == DFLT_VDIM
                assert word2vec.load.called
                assert self.nnbs.get_train_w_emb_i == \
                    self.nnbs._get_train_w2v_emb_i
                assert self.nnbs.get_test_w_emb_i == \
                    self.nnbs._get_test_w2v_lstsq_emb_i
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(self):
        with patch.multiple(
                ceph_hooks,
                apt_install=DEFAULT,
                rsync=DEFAULT,
                log=DEFAULT,
                write_file=DEFAULT,
                nrpe=DEFAULT,
                emit_cephconf=DEFAULT,
                mon_relation_joined=DEFAULT,
                is_relation_made=DEFAULT) as mocks, patch(
                    "charmhelpers.contrib.hardening.harden.config"):
            mocks["is_relation_made"].return_value = True
            ceph_hooks.upgrade_charm()
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def test_not_implemented_errors(self):
        # Construction of the base class should not be possible
        self.assertRaises(NotImplementedError, StateMachineDevice)

        mandatory_methods = {
            '_get_state_handlers': Mock(return_value={'test': MagicMock()}),
            '_get_initial_state': Mock(return_value='test'),
            '_get_transition_handlers': Mock(
                return_value={('test', 'test'): Mock(return_value=True)})
        }

        # If any of the mandatory methods is missing, a NotImplementedError must be raised
        for methods in itertools.combinations(mandatory_methods.items(), 2):
            with patch.multiple('lewis.devices.StateMachineDevice', **dict(methods)):
                self.assertRaises(NotImplementedError, StateMachineDevice)

        # If all are implemented, no exception should be raised
        with patch.multiple('lewis.devices.StateMachineDevice', **mandatory_methods):
            assertRaisesNothing(self, StateMachineDevice)
项目:pytest-salt-containers    作者:dincamihai    | 项目源码 | 文件源码
def mocks(request, testdir):
    testdir.makeini("""
        [pytest]
        IMAGE = myregistry/defaultimage
        MINION_IMAGE = myregistry/minion_image
    """)
    plugin_mocks = patch.multiple(
        'saltcontainers.plugin',
        Client=MagicMock(**{
            'return_value.inspect_container.return_value': {
                'NetworkSettings': {'IPAddress': 'fake-ip'}}
        }))
    salt_key_mock = patch(
        'saltcontainers.factories.MasterModel.salt_key',
        **{'return_value.__getitem__.return_value.__contains__.return_value': True})
    plugin_mocks.start()
    request.addfinalizer(plugin_mocks.stop)
    salt_key_mock.start()
    request.addfinalizer(salt_key_mock.stop)
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def test_authenticate_server(self):
        with patch.multiple(kerberos_module_name, authGSSClientStep=clientStep_complete):

            response_ok = requests.Response()
            response_ok.url = "http://www.example.org/"
            response_ok.status_code = 200
            response_ok.headers = {
                'www-authenticate': 'negotiate servertoken',
                'authorization': 'Negotiate GSSRESPONSE'}

            auth = requests_kerberos.HTTPKerberosAuth()
            auth.context = {"www.example.org": "CTX"}
            result = auth.authenticate_server(response_ok)

            self.assertTrue(result)
            clientStep_complete.assert_called_with("CTX", "servertoken")
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(
            self,
            mock_config):
        config = copy.deepcopy(CHARM_CONFIG)
        mock_config.side_effect = lambda key: config[key]
        with patch.multiple(
                ceph_hooks,
                apt_install=DEFAULT,
                rsync=DEFAULT,
                log=DEFAULT,
                write_file=DEFAULT,
                nrpe=DEFAULT,
                emit_cephconf=DEFAULT,
                mon_relation_joined=DEFAULT,
                is_relation_made=DEFAULT) as mocks, patch(
                    "charmhelpers.contrib.hardening.harden.config"):
            mocks["is_relation_made"].return_value = True
            ceph_hooks.upgrade_charm()
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def test_send_request(self):
        """ Test the execution of a deferred Supervisor request. """
        from supvisors.mainloop import SupvisorsMainLoop
        from supvisors.utils import DeferredRequestHeaders
        main_loop = SupvisorsMainLoop(self.supvisors)
        # patch main loop subscriber
        with patch.multiple(main_loop, check_address=DEFAULT,
            start_process=DEFAULT, stop_process=DEFAULT,
            restart=DEFAULT, shutdown=DEFAULT) as mocked_loop:
            # test check address
            self.check_call(main_loop, mocked_loop, 'check_address',
                            DeferredRequestHeaders.CHECK_ADDRESS,
                            ('10.0.0.2', ))
            # test start process
            self.check_call(main_loop, mocked_loop, 'start_process',
                            DeferredRequestHeaders.START_PROCESS,
                            ('10.0.0.2', 'dummy_process', 'extra args'))
            # test stop process
            self.check_call(main_loop, mocked_loop, 'stop_process',
                            DeferredRequestHeaders.STOP_PROCESS,
                            ('10.0.0.2', 'dummy_process'))
            # test restart
            self.check_call(main_loop, mocked_loop, 'restart',
                            DeferredRequestHeaders.RESTART,
                            ('10.0.0.2', ))
            # test shutdown
            self.check_call(main_loop, mocked_loop, 'shutdown',
                            DeferredRequestHeaders.SHUTDOWN,
                            ('10.0.0.2', ))
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def test_on_remote_event(self):
        """ Test the reception of a Supervisor remote comm event. """
        from supvisors.listener import SupervisorListener
        listener = SupervisorListener(self.supvisors)
        # add patches for what is tested just above
        with patch.multiple(listener, unstack_event=DEFAULT,
                unstack_info=DEFAULT, authorization=DEFAULT):
            # test unknown type
            event = Mock(type='unknown', data='')
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertFalse(listener.unstack_info.called)
            self.assertFalse(listener.authorization.called)
            # test event
            event = Mock(type='event', data={'state': 'RUNNING'})
            listener.on_remote_event(event)
            self.assertEqual([call({'state': 'RUNNING'})],
                listener.unstack_event.call_args_list)
            self.assertFalse(listener.unstack_info.called)
            self.assertFalse(listener.authorization.called)
            listener.unstack_event.reset_mock()
            # test info
            event = Mock(type='info', data={'name': 'dummy_process'})
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertEqual([call({'name': 'dummy_process'})],
                listener.unstack_info.call_args_list)
            self.assertFalse(listener.authorization.called)
            listener.unstack_info.reset_mock()
            # test authorization
            event = Mock(type='auth', data=('10.0.0.1', True))
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertFalse(listener.unstack_info.called)
            self.assertEqual([call(('10.0.0.1', True))],
                listener.authorization.call_args_list)
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run(self):
        self.cls.reactor = Mock(spec_set=reactor)
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as mod_mocks:
            with patch.multiple(
                pb,
                get_active_node=DEFAULT,
                run_reactor=DEFAULT,
                listentcp=DEFAULT,
                add_update_loop=DEFAULT,
                listentls=DEFAULT
            ) as cls_mocks:
                cls_mocks['get_active_node'].return_value = 'consul:1234'
                self.cls.run()
        assert self.cls.active_node_ip_port == 'consul:1234'
        assert mod_mocks['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
        assert mod_mocks['Site'].mock_calls == [
            call(mod_mocks['VaultRedirectorSite'].return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert mod_mocks['LoopingCall'].mock_calls == []
        assert cls_mocks['listentcp'].mock_calls == [
            call(mod_mocks['Site'].return_value)
        ]
        assert cls_mocks['add_update_loop'].mock_calls == [call()]
        assert cls_mocks['listentls'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_tls(self):
        self.cls.reactor = Mock(spec_set=reactor)
        self.cls.tls_factory = Mock()
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as mod_mocks:
            with patch.multiple(
                pb,
                get_active_node=DEFAULT,
                run_reactor=DEFAULT,
                listentcp=DEFAULT,
                add_update_loop=DEFAULT,
                listentls=DEFAULT
            ) as cls_mocks:
                cls_mocks['get_active_node'].return_value = 'consul:1234'
                self.cls.run()
        assert self.cls.active_node_ip_port == 'consul:1234'
        assert mod_mocks['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
        assert mod_mocks['Site'].mock_calls == [
            call(mod_mocks['VaultRedirectorSite'].return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert mod_mocks['LoopingCall'].mock_calls == []
        assert cls_mocks['listentls'].mock_calls == [
            call(mod_mocks['Site'].return_value)
        ]
        assert cls_mocks['add_update_loop'].mock_calls == [call()]
        assert cls_mocks['listentcp'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_error(self):
        self.cls.reactor = Mock(spec_set=reactor)
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as mod_mocks:
            with patch.multiple(
                pb,
                get_active_node=DEFAULT,
                run_reactor=DEFAULT,
                listentcp=DEFAULT,
                add_update_loop=DEFAULT
            ) as cls_mocks:
                cls_mocks['get_active_node'].return_value = None
                with pytest.raises(SystemExit) as excinfo:
                    self.cls.run()
        assert excinfo.value.code == 3
        assert mod_mocks['logger'].mock_calls == [
            call.critical("ERROR: Could not get active vault node from "
                          "Consul. Exiting.")
        ]
        assert mod_mocks['VaultRedirectorSite'].mock_calls == []
        assert mod_mocks['Site'].mock_calls == []
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == []
        assert mod_mocks['LoopingCall'].mock_calls == []
项目:http-prompt    作者:eliangcs    | 项目源码 | 文件源码
def run_and_exit(cli_args=None, prompt_commands=None):
    """Run http-prompt executable, execute some prompt commands, and exit."""
    if cli_args is None:
        cli_args = []

        # Make sure last command is 'exit'
    if prompt_commands is None:
        prompt_commands = ['exit']
    else:
        prompt_commands += ['exit']

    # Fool cli() so that it believes we're running from CLI instead of pytest.
    # We will restore it at the end of the function.
    orig_argv = sys.argv
    sys.argv = ['http-prompt'] + cli_args

    try:
        with patch.multiple('http_prompt.cli',
                            prompt=DEFAULT, execute=DEFAULT) as mocks:
            mocks['execute'].side_effect = execute

            # prompt() is mocked to return the command in 'prompt_commands' in
            # sequence, i.e., prompt() returns prompt_commands[i-1] when it is
            # called for the ith time
            mocks['prompt'].side_effect = prompt_commands

            result = CliRunner().invoke(cli, cli_args)
            context = mocks['execute'].call_args[0][1]

        return result, context
    finally:
        sys.argv = orig_argv
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train(self):
        with patch.multiple(self.ds, implicit=Mock(),
                            explicit=Mock()):
            self.ds.train(None)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_0(self):
        with patch.multiple(self.wb,
                            _extract_features=MOCK_DEFAULT,
                            _model=MOCK_DEFAULT):
            self.wb.train(([], []), None)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_0(self):
        with patch("dsenser.wang.wangbase.GridSearchCV"):
            with patch.multiple(self.wb,
                                _generate_ts=lambda *x: (self.TRAIN_X, Y),
                                _model=MOCK_DEFAULT):
                self.wb.train(([], []), None)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_1(self):
        rels = [REL1] * NFOLDS
        parses = [PARSE1] * NFOLDS
        with patch("dsenser.wang.wangbase.GridSearchCV"):
            with patch.multiple(self.wb,
                                _generate_ts=lambda *x: (self.TRAIN_X, Y),
                                _model=MOCK_DEFAULT):
                self.wb.train((rels, parses),
                              (rels, parses))
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_get_train_w_emb_i(self):
        with patch.multiple(self.nnbs,
                            w2emb_i={'1': 1, "hello": 2},
                            _w_stat={"world": 1, "z": 3},
                            w_i=3):
            with patch("dsenser.nnbase.UNK_PROB",
                       MagicMock(return_value=True)):
                assert self.nnbs._get_train_w_emb_i("1024") == 1
                assert self.nnbs._get_train_w_emb_i("HELLO") == 2
                assert self.nnbs._get_train_w_emb_i("world") == \
                    self.nnbs.unk_w_i
                assert self.nnbs._get_train_w_emb_i("Z") == 3
                assert self.nnbs.w_i == 4
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_get_train_w2v_emb_i(self):
        with patch.multiple(self.nnbs,
                            w2emb_i={'1': 1, "hello": 2},
                            w2v={"world": None},
                            w_i=3):
            assert self.nnbs._get_train_w2v_emb_i("1024") == 1
            assert self.nnbs._get_train_w2v_emb_i("HELLO") == 2
            assert self.nnbs._get_train_w2v_emb_i("world") == 3
            assert self.nnbs._get_train_w2v_emb_i("Z") == self.nnbs.unk_w_i
            assert self.nnbs.w_i == 4
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_get_test_w2v_emb_i(self):
        with patch.multiple(self.nnbs,
                            w2emb_i={'1': 1, "hello": 2},
                            w2v={"world": VEC3},
                            W_EMB=np.vstack([VEC0, VEC1, VEC2])):
            assert np.allclose(self.nnbs._get_test_w2v_emb_i("Z"), VEC0)
            assert np.allclose(self.nnbs._get_test_w2v_emb_i("256"), VEC1)
            assert np.allclose(self.nnbs._get_test_w2v_emb_i("HELLO"), VEC2)
            assert np.allclose(self.nnbs._get_test_w2v_emb_i("world"), VEC3)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_get_train_c_emb_i_1(self):
        conn2emb = {}
        CONN = "USUALLY WHEN"
        with patch.multiple(self.nnbs,
                            c_i=2,
                            c2emb_i=conn2emb):
            ret = self.nnbs.get_train_c_emb_i(CONN)
            assert ret == 2
            assert self.nnbs.c_i == 3
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_init_w_emb(self):
        with patch.multiple(self.nnbs, _params=[], ndim=3):
            self.nnbs._init_w_emb()
            assert len(self.nnbs._params) == 1
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_init_wemb_funcs_1(self):
        word2vec = Mock()
        with patch.multiple(self.nnbs, _plain_w2v=True, w2v=None,
                            _trained=False):
            with patch.object(dsenser.nnbase, "Word2Vec", word2vec):
                self.nnbs._init_wemb_funcs()
                assert self.nnbs.get_test_w_emb_i == \
                    self.nnbs._get_train_w2v_emb_i
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_init_funcs_0(self):
        rmsmock = MagicMock(return_value=(1, 2, 3))
        theano_mock = Mock()
        theano_mock.function = MagicMock()
        with patch.multiple(dsenser.nnbase, rmsprop=rmsmock,
                            theano=theano_mock):
                self.nnbs._init_funcs(True)
                assert rmsmock.called
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def test_nrpe_dependency_installed(self):
        with patch.multiple(ceph_hooks,
                            apt_install=DEFAULT,
                            rsync=DEFAULT,
                            log=DEFAULT,
                            write_file=DEFAULT,
                            nrpe=DEFAULT) as mocks:
            ceph_hooks.update_nrpe_config()
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_cached(self):
        mock_t = Mock()
        mock_std = Mock()
        mock_stpp = Mock()
        mock_stm = Mock()
        mock_mct = Mock()
        mock_mbs = Mock()
        mock_mos = Mock()
        with patch.multiple(
            pb,
            autospec=True,
            _transactions=DEFAULT,
            _scheduled_transactions_date=DEFAULT,
            _scheduled_transactions_per_period=DEFAULT,
            _scheduled_transactions_monthly=DEFAULT,
            _make_combined_transactions=DEFAULT,
            _make_budget_sums=DEFAULT,
            _make_overall_sums=DEFAULT
        ) as mocks:
            mocks['_transactions'].return_value.all.return_value = mock_t
            mocks['_scheduled_transactions_date'
                  ''].return_value.all.return_value = mock_std
            mocks['_scheduled_transactions_per_period'
                  ''].return_value.all.return_value = mock_stpp
            mocks['_scheduled_transactions_monthly'
                  ''].return_value.all.return_value = mock_stm
            mocks['_make_combined_transactions'].return_value = mock_mct
            mocks['_make_budget_sums'].return_value = mock_mbs
            mocks['_make_overall_sums'].return_value = mock_mos
            self.cls._data_cache = {'foo': 'bar'}
            res = self.cls._data
        assert res == {'foo': 'bar'}
        assert mocks['_transactions'].mock_calls == []
        assert mocks['_scheduled_transactions_date'].mock_calls == []
        assert mocks['_scheduled_transactions_per_period'].mock_calls == []
        assert mocks['_scheduled_transactions_monthly'].mock_calls == []
        assert mocks['_make_combined_transactions'].mock_calls == []
        assert mocks['_make_budget_sums'].mock_calls == []
        assert mocks['_make_overall_sums'].mock_calls == []
项目:django-elasticsearch-dsl    作者:sabricot    | 项目源码 | 文件源码
def test_rebuild_indices(self):

        with patch.multiple(
            Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT
        ) as handles:
            handles['_delete'].return_value = True
            call_command('search_index', stdout=self.out, action='rebuild')
            handles['_delete'].assert_called()
            handles['_create'].assert_called()
            handles['_populate'].assert_called()
项目:django-elasticsearch-dsl    作者:sabricot    | 项目源码 | 文件源码
def test_rebuild_indices_aborted(self):

        with patch.multiple(
            Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT
        ) as handles:
            handles['_delete'].return_value = False
            call_command('search_index', stdout=self.out, action='rebuild')
            handles['_delete'].assert_called()
            handles['_create'].assert_not_called()
            handles['_populate'].assert_not_called()
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def test_force_preemptive(self):
        with patch.multiple(kerberos_module_name,
                            authGSSClientInit=clientInit_complete,
                            authGSSClientResponse=clientResponse,
                            authGSSClientStep=clientStep_continue):
            auth = requests_kerberos.HTTPKerberosAuth(force_preemptive=True)

            request = requests.Request(url="http://www.example.org")

            auth.__call__(request)

            self.assertTrue('Authorization' in request.headers)
            self.assertEqual(request.headers.get('Authorization'), 'Negotiate GSSRESPONSE')
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def test_no_force_preemptive(self):
        with patch.multiple(kerberos_module_name,
                            authGSSClientInit=clientInit_complete,
                            authGSSClientResponse=clientResponse,
                            authGSSClientStep=clientStep_continue):
            auth = requests_kerberos.HTTPKerberosAuth()

            request = requests.Request(url="http://www.example.org")

            auth.__call__(request)

            self.assertTrue('Authorization' not in request.headers)
项目:python-wiremock    作者:platinummonkey    | 项目源码 | 文件源码
def test_with_statement(self):
        with patch.multiple(WireMockServer, start=DEFAULT, stop=DEFAULT) as mocks:

            with WireMockServer() as wm:
                self.assertIsInstance(wm, WireMockServer)
                mocks['start'].assert_called_once_with()

            mocks['stop'].assert_called_once_with()
项目:tracboat    作者:nazavode    | 项目源码 | 文件源码
def test_connectionbase():
        # Temporarily disable ABC checks
        with patch.multiple(ConnectionBase, __abstractmethods__=set()):
            with pytest.raises(TypeError):
                ConnectionBase()
            c = ConnectionBase('project')
            assert c.project_name == 'project'
            assert not c.project_namespace
            assert c.project_qualname == 'project'
            c = ConnectionBase('ns/project')
            assert c.project_name == 'project'
            assert c.project_namespace == 'ns'
            assert c.project_qualname == 'ns/project'
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_acceptance(self):
        logger.debug('starting acceptance test')
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            update_active_node=DEFAULT
        ) as cls_mocks:
            # setup some return values
            cls_mocks['run_reactor'].side_effect = self.se_run_reactor
            cls_mocks['get_active_node'].return_value = 'foo:1234'
            cls_mocks['update_active_node'].side_effect = self.se_update_active
            # instantiate class
            self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
            self.cls.log_enabled = False
            # make sure active is None (starting state)
            assert self.cls.active_node_ip_port is None
            self.cls.bind_port = self.get_open_port()
            self.cls.log_enabled = True
            # setup an async task to make the HTTP request
            self.cls.reactor.callLater(2.0, self.se_requester)
            # do this in case the callLater for self.stop_reactor fails...
            signal.signal(signal.SIGALRM, self.stop_reactor)
            signal.alarm(20)  # send SIGALRM in 20 seconds, to stop runaway loop
            self.cls.run()
            signal.alarm(0)  # disable SIGALRM
        assert self.cls.active_node_ip_port == 'bar:5678'  # from update_active
        assert self.update_active_called is True
        assert cls_mocks['update_active_node'].mock_calls[0] == call()
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert cls_mocks['get_active_node'].mock_calls == [call()]
        # HTTP response checks
        resp = json.loads(self.response)
        # /bar/baz
        assert resp['/bar/baz']['headers'][
                   'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
            _VERSION, twisted_version.short()
        )
        assert resp['/bar/baz']['headers'][
                   'Location'] == 'http://bar:5678/bar/baz'
        assert resp['/bar/baz']['status_code'] == 307
        # /vault-redirector-health
        assert resp['/vault-redirector-health']['status_code'] == 200
        health_info = json.loads(resp['/vault-redirector-health']['text'])
        assert resp['/vault-redirector-health']['headers'][
            'Content-Type'] == 'application/json'
        assert health_info['healthy'] is True
        assert health_info['application'] == 'vault-redirector'
        assert health_info['version'] == _VERSION
        assert health_info['source'] == _PROJECT_URL
        assert health_info['consul_host_port'] == 'consul:1234'
        assert health_info['active_vault'] == 'bar:5678'
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_initial(self):
        mock_t = Mock()
        mock_std = Mock()
        mock_stpp = Mock()
        mock_stm = Mock()
        mock_mct = Mock()
        mock_mbs = Mock()
        mock_mos = Mock()
        with patch.multiple(
            pb,
            autospec=True,
            _transactions=DEFAULT,
            _scheduled_transactions_date=DEFAULT,
            _scheduled_transactions_per_period=DEFAULT,
            _scheduled_transactions_monthly=DEFAULT,
            _make_combined_transactions=DEFAULT,
            _make_budget_sums=DEFAULT,
            _make_overall_sums=DEFAULT
        ) as mocks:
            mocks['_transactions'].return_value.all.return_value = mock_t
            mocks['_scheduled_transactions_date'
                  ''].return_value.all.return_value = mock_std
            mocks['_scheduled_transactions_per_period'
                  ''].return_value.all.return_value = mock_stpp
            mocks['_scheduled_transactions_monthly'
                  ''].return_value.all.return_value = mock_stm
            mocks['_make_combined_transactions'].return_value = mock_mct
            mocks['_make_budget_sums'].return_value = mock_mbs
            mocks['_make_overall_sums'].return_value = mock_mos
            res = self.cls._data
        assert res == {
            'transactions': mock_t,
            'st_date': mock_std,
            'st_per_period': mock_stpp,
            'st_monthly': mock_stm,
            'all_trans_list': mock_mct,
            'budget_sums': mock_mbs,
            'overall_sums': mock_mos
        }
        assert mocks['_transactions'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_scheduled_transactions_date'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_scheduled_transactions_per_period'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_scheduled_transactions_monthly'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_make_combined_transactions'].mock_calls == [
            call(self.cls)
        ]
        assert mocks['_make_budget_sums'].mock_calls == [
            call(self.cls)
        ]
        assert mocks['_make_overall_sums'].mock_calls == [
            call(self.cls)
        ]
项目:stackforce-ansible    作者:devjatkin    | 项目源码 | 文件源码
def test_base(self):
        class MockGroup(MagicMock):
            def __init__(self, name):
                super(MockGroup, self).__init__()
                self.name = name

            hosts = []
            vars = {}

            def __repr__(self):
                return str(self.name)

        group_all = MockGroup("all")
        compute = MockGroup("compute")
        controller = MockGroup("controller")
        ungrouped = MockGroup("ungrouped")
        group_all.hosts = ['localhost']
        compute.hosts = ['localhost']
        compute.vars = {'localhost': {
            "ansible_connection": "local",
            "neutron_physical_interface_mappings": "vlan:enp0s8,"
                                                   "external:enp0s3",
            "cinder_disk": "/dev/sdc"}}
        controller.hosts = ['localhost']
        controller.vars = {'localhost': {
            "ansible_connection": "local"
        }}
        mocked_inv = MagicMock()
        mocked_inv.groups = {'all': group_all,
                             'compute': compute,
                             'controller': controller,
                             'ungrouped': ungrouped}
        mocked_dl = MagicMock()
        inventory_path = "NonePath"
        with patch.multiple('inventory.dynlxc', InventoryParser=mocked_inv,
                            DataLoader=mocked_dl):
            res = dynlxc.read_inventory_file(inventory_path)
            mocked_dl.assert_called_once_with()
            assert "all" in res
            assert "_meta" in res
            assert "hostvars" in res["_meta"]
            assert "groupvars" in res["_meta"]
            assert "localhost" in res["all"]
            assert "compute" in res
            assert "localhost" in res["compute"]["hosts"]