Python mock 模块,DEFAULT 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mock.DEFAULT

项目:Eskapade    作者:KaveIO    | 项目源码 | 文件源码
def test_init(self, mock_get_env_var, mock_get_dir_path):
        """Test initialization of config object"""

        # set return values of project utility functions
        mock_get_env_var.side_effect = lambda *a, **k: ':0.0' if a and a[0] == 'display' else mock.DEFAULT
        mock_get_dir_path.side_effect = lambda *a, **k: 'es_path' if a and a[0] == 'es_root' else mock.DEFAULT

        # create mock config object
        mock_config_object = mock.MagicMock(name='ConfigObject_instance')
        settings = {}
        mock_config_object.__getitem__ = lambda s, k: settings.__getitem__(k)
        mock_config_object.__setitem__ = lambda s, k, v: settings.__setitem__(k, v)
        mock_config_object.get = settings.get

        # call init method with mock variables
        CONFIG_VARS.update([('sec1', ['var1']), ('sec2', ['var2', 'var3'])])
        CONFIG_DEFAULTS.update(var1='foo', var3=42)
        ConfigObject.__init__(mock_config_object)

        # check values of settings variables
        exp_settings = dict(var1='foo', var2=None, var3=42, batchMode=False, esRoot='es_path',
                            resultsDir='es_path/results', dataDir='es_path/data', macrosDir='es_path/tutorials',
                            templatesDir='es_path/templates', configDir='es_path/config')
        self.assertDictEqual(settings, exp_settings, 'unexpected resulting settings dictionary')
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def test_sends_notification_if_resubmitted(self, run_check):
        # Include whimsical set to True to avoid error in the False code branch:
        config = {
            'duration': 1.0,
            'host': 'fakehost.herokuapp.com',
            'whimsical': False
        }
        fake_assignment = mock.Mock(AssignmentStatus='Submitted')
        mturk = mock.Mock(**{'get_assignment.return_value': [fake_assignment]})
        participants = [self.a.participant()]
        session = None
        # Move the clock forward so assignment is overdue:
        reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
        mock_messager = mock.Mock(spec=NullHITMessager)
        with mock.patch.multiple('dallinger.heroku.clock',
                                 requests=mock.DEFAULT,
                                 NullHITMessager=mock.DEFAULT) as mocks:
            mocks['NullHITMessager'].return_value = mock_messager
            run_check(config, mturk, participants, session, reference_time)
            mock_messager.send_resubmitted_msg.assert_called()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def test_no_assignement_on_mturk_sends_hit_cancelled_message(self, run_check):
        # Include whimsical set to True to avoid error in the False code branch:
        config = {
            'duration': 1.0,
            'host': 'fakehost.herokuapp.com',
            'whimsical': False
        }
        mturk = mock.Mock(**{'get_assignment.return_value': []})
        participants = [self.a.participant()]
        session = None
        # Move the clock forward so assignment is overdue:
        reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
        mock_messager = mock.Mock(spec=NullHITMessager)
        with mock.patch.multiple('dallinger.heroku.clock',
                                 requests=mock.DEFAULT,
                                 NullHITMessager=mock.DEFAULT) as mocks:
            mocks['NullHITMessager'].return_value = mock_messager
            run_check(config, mturk, participants, session, reference_time)
            mock_messager.send_hit_cancelled_msg.assert_called()
项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def test_retry(self, mock_exc_to_code, mock_time):
        mock_exc_to_code.side_effect = lambda e: e.code
        to_attempt = 3
        retry = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, 0, 0, 0, 1))

        # Succeeds on the to_attempt'th call, and never again afterward
        mock_call = mock.Mock()
        mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] *
                                 (to_attempt - 1) + [mock.DEFAULT])
        mock_call.return_value = 1729
        mock_time.return_value = 0
        settings = _CallSettings(timeout=0, retry=retry)
        my_callable = api_callable.create_api_call(mock_call, settings)
        self.assertEqual(my_callable(None), 1729)
        self.assertEqual(mock_call.call_count, to_attempt)
项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def test_retryable_without_timeout(self, mock_time, mock_exc_to_code):
        mock_time.return_value = 0
        mock_exc_to_code.side_effect = lambda e: e.code

        to_attempt = 3
        mock_call = mock.Mock()
        mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] *
                                 (to_attempt - 1) + [mock.DEFAULT])
        mock_call.return_value = 1729

        retry_options = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, None, None, None, None))

        my_callable = retry.retryable(mock_call, retry_options)

        self.assertEqual(my_callable(None), 1729)
        self.assertEqual(to_attempt, mock_call.call_count)
项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def test_retryable_with_timeout(self, mock_time, mock_exc_to_code):
        mock_time.return_value = 1
        mock_exc_to_code.side_effect = lambda e: e.code

        mock_call = mock.Mock()
        mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
                                 mock.DEFAULT]
        mock_call.return_value = 1729

        retry_options = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, 0, 0, 0, 0))

        my_callable = retry.retryable(mock_call, retry_options)

        self.assertRaises(errors.RetryError, my_callable)
        self.assertEqual(0, mock_call.call_count)
项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code):
        mock_time.return_value = 0
        mock_exc_to_code.side_effect = lambda e: e.code

        mock_call = mock.Mock()
        mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
                                 mock.DEFAULT]
        mock_call.return_value = 1729

        retry_options = RetryOptions(
            [],
            BackoffSettings(0, 0, 0, 0, 0, 0, 1))

        my_callable = retry.retryable(mock_call, retry_options)

        try:
            my_callable(None)
            self.fail('Should not have been reached')
        except errors.RetryError as exc:
            self.assertIsInstance(exc.cause, CustomException)

        self.assertEqual(1, mock_call.call_count)
项目:gax-python    作者:googleapis    | 项目源码 | 文件源码
def test_retryable_aborts_on_unexpected_exception(
            self, mock_time, mock_exc_to_code):
        mock_time.return_value = 0
        mock_exc_to_code.side_effect = lambda e: e.code

        mock_call = mock.Mock()
        mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_2),
                                 mock.DEFAULT]
        mock_call.return_value = 1729

        retry_options = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, 0, 0, 0, 1))

        my_callable = retry.retryable(mock_call, retry_options)

        try:
            my_callable(None)
            self.fail('Should not have been reached')
        except errors.RetryError as exc:
            self.assertIsInstance(exc.cause, CustomException)

        self.assertEqual(1, mock_call.call_count)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_nrpe_dependency_installed(self, mock_config):
        config = copy.deepcopy(CHARM_CONFIG)
        mock_config.side_effect = lambda key: config[key]
        with patch.multiple(ceph_hooks,
                            apt_install=DEFAULT,
                            rsync=DEFAULT,
                            log=DEFAULT,
                            write_file=DEFAULT,
                            nrpe=DEFAULT) as mocks:
            ceph_hooks.update_nrpe_config()
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_1(self):
        j = 2
        ret_train = np.zeros((6, 3, N_CLASSES))
        ret_dev = np.zeros((6, 3, N_CLASSES))
        func_ret = np.zeros((1, N_CLASSES))
        func_ret[0, j] = 1.
        with patch.multiple(self.wb,
                            _gs=True,
                            _generate_ts=lambda *x: (self.TRAIN_X, Y),
                            _extract_features=MagicMock(
                                return_value=FEATS),
                            _model=MOCK_DEFAULT):
            with patch("dsenser.wang.wangbase.GridSearchCV"):
                self.wb._model.decision_function = \
                    MagicMock(return_value=func_ret)
                self.wb._model.classes_ = CLASSES_
                self.wb.train(([(0, REL1)], [PARSE1]),
                              ([(0, REL1)], [PARSE1]),
                              1, 1, ret_train, ret_dev)
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(self):
        with patch.multiple(
                ceph_hooks,
                apt_install=DEFAULT,
                rsync=DEFAULT,
                log=DEFAULT,
                write_file=DEFAULT,
                nrpe=DEFAULT,
                emit_cephconf=DEFAULT,
                mon_relation_joined=DEFAULT,
                is_relation_made=DEFAULT) as mocks, patch(
                    "charmhelpers.contrib.hardening.harden.config"):
            mocks["is_relation_made"].return_value = True
            ceph_hooks.upgrade_charm()
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def test__allocate_sockets(self):
        """Test allocating sockets.
        """
        # access protected module _allocate_sockets
        # pylint: disable=w0212

        socket.socket.bind.side_effect = [
            socket.error(errno.EADDRINUSE, 'In use'),
            mock.DEFAULT,
            mock.DEFAULT,
            mock.DEFAULT
        ]

        sockets = treadmill.runtime._allocate_sockets(
            'prod', '0.0.0.0', socket.SOCK_STREAM, 3
        )

        self.assertEqual(3, len(sockets))
项目:asynq    作者:quora    | 项目源码 | 文件源码
def patch(
        target, new=mock.DEFAULT, spec=None, create=False,
        mocksignature=False, spec_set=None, autospec=False,
        new_callable=None, **kwargs
):
    """Mocks an async function.

    Should be a drop-in replacement for mock.patch that handles async automatically. The .asynq
    attribute is automatically created and shouldn't be used when accessing data on
    the mock.

    """
    getter, attribute = _get_target(target)
    return _make_patch_async(
        getter, attribute, new, spec, create, mocksignature, spec_set, autospec, new_callable,
        kwargs
    )
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_attach_volume_fail(self, mock_execute):
        self.encryptor._get_key = mock.MagicMock()
        self.encryptor._get_key.return_value = \
                test_cryptsetup.fake__get_key(None)

        mock_execute.side_effect = [
                processutils.ProcessExecutionError(exit_code=1),  # luksOpen
                mock.DEFAULT,  # isLuks
        ]

        self.assertRaises(processutils.ProcessExecutionError,
                          self.encryptor.attach_volume, None)

        mock_execute.assert_has_calls([
            mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
                      self.dev_name, process_input='0' * 32,
                      run_as_root=True, check_exit_code=True),
            mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
                      run_as_root=True, check_exit_code=True),
        ], any_order=False)
        self.assertEqual(2, mock_execute.call_count)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_deploy_no_callback(self, power_mock, get_ip_mock):
        self.config(group='ansible', use_ramdisk_callback=False)
        with mock.patch.multiple(self.driver,
                                 _ansible_deploy=mock.DEFAULT,
                                 reboot_to_instance=mock.DEFAULT) as moks:
            with task_manager.acquire(
                    self.context, self.node['uuid'], shared=False) as task:
                driver_return = self.driver.deploy(task)
                self.assertEqual(driver_return, states.DEPLOYDONE)
                power_mock.assert_called_once_with(task, states.REBOOT)
                get_ip_mock.assert_called_once_with(task)
                moks['_ansible_deploy'].assert_called_once_with(task,
                                                                '127.0.0.1')
                moks['reboot_to_instance'].assert_called_once_with(task)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_continue_deploy(self, getip_mock):
        self.node.provision_state = states.DEPLOYWAIT
        self.node.target_provision_state = states.ACTIVE
        self.node.save()
        with task_manager.acquire(self.context, self.node.uuid) as task:
            with mock.patch.multiple(self.driver, autospec=True,
                                     _ansible_deploy=mock.DEFAULT,
                                     reboot_to_instance=mock.DEFAULT):
                self.driver.continue_deploy(task)
                getip_mock.assert_called_once_with(task)
                self.driver._ansible_deploy.assert_called_once_with(
                    task, '1.2.3.4')
                self.driver.reboot_to_instance.assert_called_once_with(task)
            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
            self.assertEqual(states.DEPLOYING, task.node.provision_state)
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_run(self, mCreateSession, mSample):
        mSample.return_value = 'XXX'
        iSession = MockSession()
        mCreateSession.return_value = (iSession, '123456')
        client = iSession.client('stepfunctions')
        client.list_activities.return_value = {
            'activities':[{
                'name': 'name',
                'activityArn': 'XXX'
            }]
        }
        client.get_activity_task.return_value = {
            'taskToken': 'YYY',
            'input': '{}'
        }

        target = mock.MagicMock()

        activity = ActivityMixin(handle_task = target)

        def stop_loop(*args, **kwargs):
            activity.polling = False
            return mock.DEFAULT
        target.side_effect = stop_loop

        activity.run('name')

        calls = [
            mock.call.list_activities(),
            mock.call.get_activity_task(activityArn = 'XXX',
                                        workerName = 'name-XXX')
        ]
        self.assertEqual(client.mock_calls, calls)

        calls = [
            mock.call('YYY', {}),
            mock.call().start()
        ]
        self.assertEqual(target.mock_calls, calls)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def patchobject(self, target, attribute, new=mock.DEFAULT, autospec=True):
        """Convenient wrapper around `mock.patch.object`

        Returns a started mock that will be automatically stopped after the
        test ran.
        """

        p = mock.patch.object(target, attribute, new, autospec=autospec)
        m = p.start()
        self.addCleanup(p.stop)
        return m
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def test_check_db_for_missing_notifications_assembles_resources(self, run_check):
        # Can't import until after config is loaded:
        from dallinger.heroku.clock import check_db_for_missing_notifications
        with mock.patch.multiple('dallinger.heroku.clock',
                                 run_check=mock.DEFAULT,
                                 MTurkConnection=mock.DEFAULT) as mocks:
            mocks['MTurkConnection'].return_value = 'fake connection'
            check_db_for_missing_notifications()

            mocks['run_check'].assert_called()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def recruiter(self):
        from dallinger.recruiters import BotRecruiter
        with mock.patch.multiple('dallinger.recruiters',
                                 _get_queue=mock.DEFAULT,
                                 get_base_url=mock.DEFAULT) as mocks:
            mocks['get_base_url'].return_value = 'fake_base_url'
            r = BotRecruiter()
            r._get_bot_factory = mock.Mock()
            yield r
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def recruiter(self, active_config):
        from dallinger.mturk import MTurkService
        from dallinger.recruiters import MTurkRecruiter
        with mock.patch.multiple('dallinger.recruiters',
                                 os=mock.DEFAULT,
                                 get_base_url=mock.DEFAULT) as mocks:
            mocks['get_base_url'].return_value = 'http://fake-domain'
            mocks['os'].getenv.return_value = 'fake-host-domain'
            mockservice = mock.create_autospec(MTurkService)
            active_config.extend({'mode': u'sandbox'})
            r = MTurkRecruiter()
            r.mturkservice = mockservice('fake key', 'fake secret')
            r.mturkservice.check_credentials.return_value = True
            r.mturkservice.create_hit.return_value = {'type_id': 'fake type id'}
            return r
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def faster(tempdir):
    with mock.patch.multiple('dallinger.command_line',
                             time=mock.DEFAULT,
                             setup_experiment=mock.DEFAULT) as mocks:
        mocks['setup_experiment'].return_value = ('fake-uid', tempdir)

        yield mocks
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def testDEFAULT(self):
        self.assertIs(DEFAULT, sentinel.DEFAULT)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(
            self,
            mock_config):
        config = copy.deepcopy(CHARM_CONFIG)
        mock_config.side_effect = lambda key: config[key]
        with patch.multiple(
                ceph_hooks,
                apt_install=DEFAULT,
                rsync=DEFAULT,
                log=DEFAULT,
                write_file=DEFAULT,
                nrpe=DEFAULT,
                emit_cephconf=DEFAULT,
                mon_relation_joined=DEFAULT,
                is_relation_made=DEFAULT) as mocks, patch(
                    "charmhelpers.contrib.hardening.harden.config"):
            mocks["is_relation_made"].return_value = True
            ceph_hooks.upgrade_charm()
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def test_send_request(self):
        """ Test the execution of a deferred Supervisor request. """
        from supvisors.mainloop import SupvisorsMainLoop
        from supvisors.utils import DeferredRequestHeaders
        main_loop = SupvisorsMainLoop(self.supvisors)
        # patch main loop subscriber
        with patch.multiple(main_loop, check_address=DEFAULT,
            start_process=DEFAULT, stop_process=DEFAULT,
            restart=DEFAULT, shutdown=DEFAULT) as mocked_loop:
            # test check address
            self.check_call(main_loop, mocked_loop, 'check_address',
                            DeferredRequestHeaders.CHECK_ADDRESS,
                            ('10.0.0.2', ))
            # test start process
            self.check_call(main_loop, mocked_loop, 'start_process',
                            DeferredRequestHeaders.START_PROCESS,
                            ('10.0.0.2', 'dummy_process', 'extra args'))
            # test stop process
            self.check_call(main_loop, mocked_loop, 'stop_process',
                            DeferredRequestHeaders.STOP_PROCESS,
                            ('10.0.0.2', 'dummy_process'))
            # test restart
            self.check_call(main_loop, mocked_loop, 'restart',
                            DeferredRequestHeaders.RESTART,
                            ('10.0.0.2', ))
            # test shutdown
            self.check_call(main_loop, mocked_loop, 'shutdown',
                            DeferredRequestHeaders.SHUTDOWN,
                            ('10.0.0.2', ))
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def test_on_remote_event(self):
        """ Test the reception of a Supervisor remote comm event. """
        from supvisors.listener import SupervisorListener
        listener = SupervisorListener(self.supvisors)
        # add patches for what is tested just above
        with patch.multiple(listener, unstack_event=DEFAULT,
                unstack_info=DEFAULT, authorization=DEFAULT):
            # test unknown type
            event = Mock(type='unknown', data='')
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertFalse(listener.unstack_info.called)
            self.assertFalse(listener.authorization.called)
            # test event
            event = Mock(type='event', data={'state': 'RUNNING'})
            listener.on_remote_event(event)
            self.assertEqual([call({'state': 'RUNNING'})],
                listener.unstack_event.call_args_list)
            self.assertFalse(listener.unstack_info.called)
            self.assertFalse(listener.authorization.called)
            listener.unstack_event.reset_mock()
            # test info
            event = Mock(type='info', data={'name': 'dummy_process'})
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertEqual([call({'name': 'dummy_process'})],
                listener.unstack_info.call_args_list)
            self.assertFalse(listener.authorization.called)
            listener.unstack_info.reset_mock()
            # test authorization
            event = Mock(type='auth', data=('10.0.0.1', True))
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertFalse(listener.unstack_info.called)
            self.assertEqual([call(('10.0.0.1', True))],
                listener.authorization.call_args_list)
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run(self):
        self.cls.reactor = Mock(spec_set=reactor)
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as mod_mocks:
            with patch.multiple(
                pb,
                get_active_node=DEFAULT,
                run_reactor=DEFAULT,
                listentcp=DEFAULT,
                add_update_loop=DEFAULT,
                listentls=DEFAULT
            ) as cls_mocks:
                cls_mocks['get_active_node'].return_value = 'consul:1234'
                self.cls.run()
        assert self.cls.active_node_ip_port == 'consul:1234'
        assert mod_mocks['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
        assert mod_mocks['Site'].mock_calls == [
            call(mod_mocks['VaultRedirectorSite'].return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert mod_mocks['LoopingCall'].mock_calls == []
        assert cls_mocks['listentcp'].mock_calls == [
            call(mod_mocks['Site'].return_value)
        ]
        assert cls_mocks['add_update_loop'].mock_calls == [call()]
        assert cls_mocks['listentls'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_tls(self):
        self.cls.reactor = Mock(spec_set=reactor)
        self.cls.tls_factory = Mock()
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as mod_mocks:
            with patch.multiple(
                pb,
                get_active_node=DEFAULT,
                run_reactor=DEFAULT,
                listentcp=DEFAULT,
                add_update_loop=DEFAULT,
                listentls=DEFAULT
            ) as cls_mocks:
                cls_mocks['get_active_node'].return_value = 'consul:1234'
                self.cls.run()
        assert self.cls.active_node_ip_port == 'consul:1234'
        assert mod_mocks['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
        assert mod_mocks['Site'].mock_calls == [
            call(mod_mocks['VaultRedirectorSite'].return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert mod_mocks['LoopingCall'].mock_calls == []
        assert cls_mocks['listentls'].mock_calls == [
            call(mod_mocks['Site'].return_value)
        ]
        assert cls_mocks['add_update_loop'].mock_calls == [call()]
        assert cls_mocks['listentcp'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_error(self):
        self.cls.reactor = Mock(spec_set=reactor)
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as mod_mocks:
            with patch.multiple(
                pb,
                get_active_node=DEFAULT,
                run_reactor=DEFAULT,
                listentcp=DEFAULT,
                add_update_loop=DEFAULT
            ) as cls_mocks:
                cls_mocks['get_active_node'].return_value = None
                with pytest.raises(SystemExit) as excinfo:
                    self.cls.run()
        assert excinfo.value.code == 3
        assert mod_mocks['logger'].mock_calls == [
            call.critical("ERROR: Could not get active vault node from "
                          "Consul. Exiting.")
        ]
        assert mod_mocks['VaultRedirectorSite'].mock_calls == []
        assert mod_mocks['Site'].mock_calls == []
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == []
        assert mod_mocks['LoopingCall'].mock_calls == []
项目:http-prompt    作者:eliangcs    | 项目源码 | 文件源码
def run_and_exit(cli_args=None, prompt_commands=None):
    """Run http-prompt executable, execute some prompt commands, and exit."""
    if cli_args is None:
        cli_args = []

        # Make sure last command is 'exit'
    if prompt_commands is None:
        prompt_commands = ['exit']
    else:
        prompt_commands += ['exit']

    # Fool cli() so that it believes we're running from CLI instead of pytest.
    # We will restore it at the end of the function.
    orig_argv = sys.argv
    sys.argv = ['http-prompt'] + cli_args

    try:
        with patch.multiple('http_prompt.cli',
                            prompt=DEFAULT, execute=DEFAULT) as mocks:
            mocks['execute'].side_effect = execute

            # prompt() is mocked to return the command in 'prompt_commands' in
            # sequence, i.e., prompt() returns prompt_commands[i-1] when it is
            # called for the ith time
            mocks['prompt'].side_effect = prompt_commands

            result = CliRunner().invoke(cli, cli_args)
            context = mocks['execute'].call_args[0][1]

        return result, context
    finally:
        sys.argv = orig_argv
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_0(self):
        with patch.multiple(self.wb,
                            _extract_features=MOCK_DEFAULT,
                            _model=MOCK_DEFAULT):
            self.wb.train(([], []), None)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_0(self):
        with patch("dsenser.wang.wangbase.GridSearchCV"):
            with patch.multiple(self.wb,
                                _generate_ts=lambda *x: (self.TRAIN_X, Y),
                                _model=MOCK_DEFAULT):
                self.wb.train(([], []), None)
项目:DiscourseSenser    作者:WladimirSidorenko    | 项目源码 | 文件源码
def test_train_1(self):
        rels = [REL1] * NFOLDS
        parses = [PARSE1] * NFOLDS
        with patch("dsenser.wang.wangbase.GridSearchCV"):
            with patch.multiple(self.wb,
                                _generate_ts=lambda *x: (self.TRAIN_X, Y),
                                _model=MOCK_DEFAULT):
                self.wb.train((rels, parses),
                              (rels, parses))
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def test_nrpe_dependency_installed(self):
        with patch.multiple(ceph_hooks,
                            apt_install=DEFAULT,
                            rsync=DEFAULT,
                            log=DEFAULT,
                            write_file=DEFAULT,
                            nrpe=DEFAULT) as mocks:
            ceph_hooks.update_nrpe_config()
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_cached(self):
        mock_t = Mock()
        mock_std = Mock()
        mock_stpp = Mock()
        mock_stm = Mock()
        mock_mct = Mock()
        mock_mbs = Mock()
        mock_mos = Mock()
        with patch.multiple(
            pb,
            autospec=True,
            _transactions=DEFAULT,
            _scheduled_transactions_date=DEFAULT,
            _scheduled_transactions_per_period=DEFAULT,
            _scheduled_transactions_monthly=DEFAULT,
            _make_combined_transactions=DEFAULT,
            _make_budget_sums=DEFAULT,
            _make_overall_sums=DEFAULT
        ) as mocks:
            mocks['_transactions'].return_value.all.return_value = mock_t
            mocks['_scheduled_transactions_date'
                  ''].return_value.all.return_value = mock_std
            mocks['_scheduled_transactions_per_period'
                  ''].return_value.all.return_value = mock_stpp
            mocks['_scheduled_transactions_monthly'
                  ''].return_value.all.return_value = mock_stm
            mocks['_make_combined_transactions'].return_value = mock_mct
            mocks['_make_budget_sums'].return_value = mock_mbs
            mocks['_make_overall_sums'].return_value = mock_mos
            self.cls._data_cache = {'foo': 'bar'}
            res = self.cls._data
        assert res == {'foo': 'bar'}
        assert mocks['_transactions'].mock_calls == []
        assert mocks['_scheduled_transactions_date'].mock_calls == []
        assert mocks['_scheduled_transactions_per_period'].mock_calls == []
        assert mocks['_scheduled_transactions_monthly'].mock_calls == []
        assert mocks['_make_combined_transactions'].mock_calls == []
        assert mocks['_make_budget_sums'].mock_calls == []
        assert mocks['_make_overall_sums'].mock_calls == []
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def test_insert_violations_with_error(self):
        """Test insert_violations handles errors during insert.

        Setup:
            * Create mocks:
                * self.dao.conn
                * self.dao.get_latest_snapshot_timestamp
                * self.dao.create_snapshot_table
            * Create side effect for one violation to raise an error.

        Expect:
            * Log MySQLError when table insert error occurs and return list
              of errors.
            * Return a tuple of (num_violations-1, [violation])
        """
        resource_name = 'policy_violations'
        self.dao.get_latest_snapshot_timestamp = mock.MagicMock(
            return_value=self.fake_snapshot_timestamp)
        self.dao.create_snapshot_table = mock.MagicMock(
            return_value=self.fake_table_name)
        violation_dao.LOGGER = mock.MagicMock()

        def insert_violation_side_effect(*args, **kwargs):
            if args[2] == self.expected_fake_violations[1]:
                raise MySQLdb.DataError(
                    self.resource_name, mock.MagicMock())
            else:
                return mock.DEFAULT

        self.dao.execute_sql_with_commit = mock.MagicMock(
            side_effect=insert_violation_side_effect)

        actual = self.dao.insert_violations(
            self.fake_flattened_violations,
            self.resource_name)

        expected = (2, [self.expected_fake_violations[1]])

        self.assertEqual(expected, actual)
        self.assertEquals(1, violation_dao.LOGGER.error.call_count)
项目:django-elasticsearch-dsl    作者:sabricot    | 项目源码 | 文件源码
def test_rebuild_indices(self):

        with patch.multiple(
            Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT
        ) as handles:
            handles['_delete'].return_value = True
            call_command('search_index', stdout=self.out, action='rebuild')
            handles['_delete'].assert_called()
            handles['_create'].assert_called()
            handles['_populate'].assert_called()
项目:django-elasticsearch-dsl    作者:sabricot    | 项目源码 | 文件源码
def test_rebuild_indices_aborted(self):

        with patch.multiple(
            Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT
        ) as handles:
            handles['_delete'].return_value = False
            call_command('search_index', stdout=self.out, action='rebuild')
            handles['_delete'].assert_called()
            handles['_create'].assert_not_called()
            handles['_populate'].assert_not_called()
项目:python-wiremock    作者:platinummonkey    | 项目源码 | 文件源码
def test_with_statement(self):
        with patch.multiple(WireMockServer, start=DEFAULT, stop=DEFAULT) as mocks:

            with WireMockServer() as wm:
                self.assertIsInstance(wm, WireMockServer)
                mocks['start'].assert_called_once_with()

            mocks['stop'].assert_called_once_with()
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_ptr_r(self, ioctl_mock):
        def _handle_ioctl(fd, request, int_ptr):
            assert fd == 12
            assert request == 32
            assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
            int_ptr.contents.value = 42
            return mock.DEFAULT
        ioctl_mock.side_effect = _handle_ioctl

        fn = ioctl.ioctl_fn_ptr_r(32, ctypes.c_int)
        res = fn(12)
        assert res == 42
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_ptr_w(self, ioctl_mock):
        def _handle_ioctl(fd, request, int_ptr):
            assert fd == 12
            assert request == 32
            assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
            assert int_ptr.contents.value == 42
            return mock.DEFAULT
        ioctl_mock.side_effect = _handle_ioctl

        fn = ioctl.ioctl_fn_ptr_w(32, ctypes.c_int)
        fn(12, 42)
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_ptr_wr(self, ioctl_mock):
        def _handle_ioctl(fd, request, int_ptr):
            assert fd == 12
            assert request == 32
            assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
            assert int_ptr.contents.value == 24
            int_ptr.contents.value = 42
            return mock.DEFAULT
        ioctl_mock.side_effect = _handle_ioctl

        fn = ioctl.ioctl_fn_ptr_wr(32, ctypes.c_int)
        res = fn(12, 24)
        assert res == 42
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_w(self, ioctl_mock):
        def _handle_ioctl(fd, request, int_val):
            assert fd == 12
            assert request == 32
            assert type(int_val) == ctypes.c_int
            assert int_val.value == 42
            return mock.DEFAULT
        ioctl_mock.side_effect = _handle_ioctl

        fn = ioctl.ioctl_fn_w(32, ctypes.c_int)
        fn(12, 42)
项目:asynq    作者:quora    | 项目源码 | 文件源码
def _patch_object(
        target, attribute, new=mock.DEFAULT, spec=None,
        create=False, mocksignature=False, spec_set=None, autospec=False,
        new_callable=None, **kwargs
):
    getter = lambda: target
    return _make_patch_async(
        getter, attribute, new, spec, create, mocksignature, spec_set, autospec, new_callable,
        kwargs
    )
项目:asynq    作者:quora    | 项目源码 | 文件源码
def _maybe_wrap_new(new):
    """If the mock replacement cannot have attributes set on it, wraps it in a function.

    Also, if the replacement object is a method, applies the async() decorator.

    This is needed so that we support patch(..., x.method) where x.method is an instancemethod
    object, because instancemethods do not support attribute assignment.

    """
    if new is mock.DEFAULT:
        return new

    if inspect.isfunction(new) or isinstance(new, (classmethod, staticmethod)):
        return asynq(sync_fn=new)(new)
    elif not callable(new):
        return new

    try:
        new._maybe_wrap_new_test_attribute = None
        del new._maybe_wrap_new_test_attribute
    except (AttributeError, TypeError):
        # setting something on a bound method raises AttributeError, setting something on a
        # Cythonized class raises TypeError
        should_wrap = True
    else:
        should_wrap = False

    if should_wrap:
        # we can't just use a lambda because that overrides __get__ and creates bound methods we
        # don't want, so we make a wrapper class that overrides __call__
        class Wrapper(object):
            def __call__(self, *args, **kwargs):
                return new(*args, **kwargs)
        return Wrapper()
    else:
        return new
项目:LIS-Tempest    作者:LIS    | 项目源码 | 文件源码
def __init__(self, obj, attr, new=mock.DEFAULT, **kwargs):
        self.obj = obj
        self.attr = attr
        self.kwargs = kwargs
        self.new = new
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_attach_volume_not_formatted(self, mock_execute):
        self.encryptor._get_key = mock.MagicMock()
        self.encryptor._get_key.return_value = \
                test_cryptsetup.fake__get_key(None)

        mock_execute.side_effect = [
                processutils.ProcessExecutionError(exit_code=1),  # luksOpen
                processutils.ProcessExecutionError(exit_code=1),  # isLuks
                mock.DEFAULT,  # luksFormat
                mock.DEFAULT,  # luksOpen
                mock.DEFAULT,  # ln
        ]

        self.encryptor.attach_volume(None)

        mock_execute.assert_has_calls([
            mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
                      self.dev_name, process_input='0' * 32,
                      run_as_root=True, check_exit_code=True),
            mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
                      run_as_root=True, check_exit_code=True),
            mock.call('cryptsetup', '--batch-mode', 'luksFormat',
                      '--key-file=-', self.dev_path, process_input='0' * 32,
                      run_as_root=True, check_exit_code=True, attempts=3),
            mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
                      self.dev_name, process_input='0' * 32,
                      run_as_root=True, check_exit_code=True),
            mock.call('ln', '--symbolic', '--force',
                      '/dev/mapper/%s' % self.dev_name, self.symlink_path,
                      run_as_root=True, check_exit_code=True),
        ], any_order=False)
        self.assertEqual(5, mock_execute.call_count)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_rdp_console(self):
        mock_get_host_ip = self.rdpconsoleops._hostops.get_host_ip_addr
        mock_get_rdp_port = (
            self.rdpconsoleops._rdpconsoleutils.get_rdp_console_port)
        mock_get_vm_id = self.rdpconsoleops._vmutils.get_vm_id

        connect_info = self.rdpconsoleops.get_rdp_console(mock.DEFAULT)

        self.assertEqual(mock_get_host_ip.return_value, connect_info.host)
        self.assertEqual(mock_get_rdp_port.return_value, connect_info.port)
        self.assertEqual(mock_get_vm_id.return_value,
                         connect_info.internal_access_path)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_spawn_no_admin_permissions(self):
        self._vmops._vmutils.check_admin_permissions.side_effect = (
            os_win_exc.HyperVException)
        self.assertRaises(os_win_exc.HyperVException,
                          self._vmops.spawn,
                          self.context, mock.DEFAULT, mock.DEFAULT,
                          [mock.sentinel.FILE], mock.sentinel.PASSWORD,
                          mock.sentinel.INFO, mock.sentinel.DEV_INFO)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_volume_connector(self):
        mock_instance = mock.DEFAULT
        initiator = self._volumeops._volutils.get_iscsi_initiator.return_value
        expected = {'ip': CONF.my_ip,
                    'host': CONF.host,
                    'initiator': initiator}

        response = self._volumeops.get_volume_connector(instance=mock_instance)

        self._volumeops._volutils.get_iscsi_initiator.assert_called_once_with()
        self.assertEqual(expected, response)