Python unittest.mock 模块,DEFAULT 实例源码

我们从 Python 开源项目中提取了以下 24 个代码示例,用于说明如何使用 unittest.mock.DEFAULT。

项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_get():
    """With only envGet in context, run_step calls env_get and nothing else."""
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envGet': {
            'key2': 'ARB_GET_ME1',
            'key4': 'ARB_GET_ME2'
        }
    }
    context = Context(step_input)

    # DEFAULT asks patch.multiple to create the replacement mocks itself and
    # hand them back as a dict keyed by attribute name.
    env_patches = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patches as patched:
        pypyr.steps.env.run_step(context)

    patched['env_get'].assert_called_once()
    for untouched in ('env_set', 'env_unset'):
        patched[untouched].assert_not_called()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_set():
    """With only envSet in context, run_step calls env_set and nothing else."""
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1'
        }
    }
    context = Context(step_input)

    # Every env helper is swapped for a mock created by patch.multiple.
    env_patches = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patches as patched:
        pypyr.steps.env.run_step(context)

    get_mock, set_mock, unset_mock = (
        patched[name] for name in ('env_get', 'env_set', 'env_unset'))
    get_mock.assert_not_called()
    set_mock.assert_called_once()
    unset_mock.assert_not_called()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_unset():
    """With only envUnset in context, run_step calls env_unset and nothing else."""
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2'
        ]
    }
    context = Context(step_input)

    # Every env helper is swapped for a mock created by patch.multiple.
    env_patches = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patches as patched:
        pypyr.steps.env.run_step(context)

    for untouched in ('env_get', 'env_set'):
        patched[untouched].assert_not_called()
    patched['env_unset'].assert_called_once()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_set_unset():
    """With envSet and envUnset in context, run_step calls both but not env_get."""
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2'
        ],
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1'
        }
    }
    context = Context(step_input)

    # Every env helper is swapped for a mock created by patch.multiple.
    env_patches = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patches as patched:
        pypyr.steps.env.run_step(context)

    patched['env_get'].assert_not_called()
    for invoked in ('env_set', 'env_unset'):
        patched[invoked].assert_called_once()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_tar_only_calls_extract():
    """With only tarExtract in context, run_step extracts and never archives."""
    extract_specs = [
        {'in': 'key2',
         'out': 'ARB_GET_ME1'},
        {'in': 'key4',
         'out': 'ARB_GET_ME2'}
    ]
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'tarExtract': extract_specs
    })

    # Both tar helpers are replaced by mocks that patch.multiple creates.
    tar_patches = patch.multiple('pypyr.steps.tar',
                                 tar_archive=DEFAULT,
                                 tar_extract=DEFAULT)
    with tar_patches as patched:
        pypyr.steps.tar.run_step(context)

    extract_mock = patched['tar_extract']
    archive_mock = patched['tar_archive']
    extract_mock.assert_called_once()
    archive_mock.assert_not_called()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_tar_calls_archive_and_extract():
    """With tarArchive and tarExtract in context, run_step calls each once."""
    archive_specs = [
        {'in': 'key2',
         'out': 'ARB_GET_ME1'},
        {'in': 'key4',
         'out': 'ARB_GET_ME2'}
    ]
    extract_specs = [
        {'in': 'key2',
         'out': 'ARB_GET_ME1'},
        {'in': 'key4',
         'out': 'ARB_GET_ME2'}
    ]
    context = Context({
        'key2': 'value2',
        'key1': 'value1',
        'key3': 'value3',
        'tarArchive': archive_specs,
        'tarExtract': extract_specs
    })

    # Both tar helpers are replaced by mocks that patch.multiple creates.
    tar_patches = patch.multiple('pypyr.steps.tar',
                                 tar_archive=DEFAULT,
                                 tar_extract=DEFAULT)
    with tar_patches as patched:
        pypyr.steps.tar.run_step(context)

    for invoked in ('tar_extract', 'tar_archive'):
        patched[invoked].assert_called_once()


# ------------------------- tar base   ---------------------------------------#
#
# ------------------------- tar extract---------------------------------------#
项目:asynq    作者:quora    | 项目源码 | 文件源码
def patch(
        target, new=mock.DEFAULT, spec=None, create=False,
        mocksignature=False, spec_set=None, autospec=False,
        new_callable=None, **kwargs
):
    """Patch an async function.

    Intended as a drop-in replacement for mock.patch that takes care of async handling
    automatically. The .asynq attribute is created automatically and should not be used
    when accessing data on the mock.

    The target is resolved with _get_target and every argument, including the extra
    keyword arguments as a single dict, is forwarded to _make_patch_async.

    """
    target_getter, attribute_name = _get_target(target)
    forwarded = (
        target_getter, attribute_name, new, spec, create, mocksignature,
        spec_set, autospec, new_callable, kwargs,
    )
    return _make_patch_async(*forwarded)
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_run(self, mCreateSession, mSample):
        """Run the activity once and check the calls seen by the client and the handler."""
        mSample.return_value = 'XXX'
        session = MockSession()
        mCreateSession.return_value = (session, '123456')

        sfn_client = session.client('stepfunctions')
        sfn_client.list_activities.return_value = {
            'activities': [{'name': 'name', 'activityArn': 'XXX'}]
        }
        sfn_client.get_activity_task.return_value = {'taskToken': 'YYY', 'input': '{}'}

        handler = mock.MagicMock()
        activity = ActivityMixin(handle_task=handler)

        def halt_polling(*args, **kwargs):
            # Clear the polling flag on the first handled task; returning DEFAULT
            # lets the mock fall back to its normal return value.
            activity.polling = False
            return mock.DEFAULT
        handler.side_effect = halt_polling

        activity.run('name')

        expected_client_calls = [
            mock.call.list_activities(),
            mock.call.get_activity_task(activityArn='XXX',
                                        workerName='name-XXX')
        ]
        self.assertEqual(sfn_client.mock_calls, expected_client_calls)

        expected_handler_calls = [
            mock.call('YYY', {}),
            mock.call().start()
        ]
        self.assertEqual(handler.mock_calls, expected_handler_calls)
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run(self):
        """run() with active node 'consul:1234': listentcp is used, listentls is not."""
        self.cls.reactor = Mock(spec_set=reactor)
        # Both patch.multiple contexts are entered in the same order as a nested "with".
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as module_patches, patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT
        ) as method_patches:
            method_patches['get_active_node'].return_value = 'consul:1234'
            self.cls.run()

        site_factory = module_patches['Site']
        redirector_site = module_patches['VaultRedirectorSite']

        assert self.cls.active_node_ip_port == 'consul:1234'
        assert module_patches['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert redirector_site.mock_calls == [call(self.cls)]
        assert site_factory.mock_calls == [
            call(redirector_site.return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert method_patches['run_reactor'].mock_calls == [call()]
        assert module_patches['LoopingCall'].mock_calls == []
        assert method_patches['listentcp'].mock_calls == [
            call(site_factory.return_value)
        ]
        assert method_patches['add_update_loop'].mock_calls == [call()]
        assert method_patches['listentls'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_tls(self):
        """run() with a tls_factory set: listentls is used, listentcp is not."""
        self.cls.reactor = Mock(spec_set=reactor)
        self.cls.tls_factory = Mock()
        # Both patch.multiple contexts are entered in the same order as a nested "with".
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as module_patches, patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT
        ) as method_patches:
            method_patches['get_active_node'].return_value = 'consul:1234'
            self.cls.run()

        site_factory = module_patches['Site']
        redirector_site = module_patches['VaultRedirectorSite']

        assert self.cls.active_node_ip_port == 'consul:1234'
        assert module_patches['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert redirector_site.mock_calls == [call(self.cls)]
        assert site_factory.mock_calls == [
            call(redirector_site.return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert method_patches['run_reactor'].mock_calls == [call()]
        assert module_patches['LoopingCall'].mock_calls == []
        assert method_patches['listentls'].mock_calls == [
            call(site_factory.return_value)
        ]
        assert method_patches['add_update_loop'].mock_calls == [call()]
        assert method_patches['listentcp'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_error(self):
        """run() exits with code 3 and logs critically when get_active_node returns None."""
        self.cls.reactor = Mock(spec_set=reactor)
        # Both patch.multiple contexts are entered in the same order as a nested "with".
        with patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT
        ) as module_patches, patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT
        ) as method_patches:
            method_patches['get_active_node'].return_value = None
            with pytest.raises(SystemExit) as excinfo:
                self.cls.run()

        assert excinfo.value.code == 3
        assert module_patches['logger'].mock_calls == [
            call.critical("ERROR: Could not get active vault node from "
                          "Consul. Exiting.")
        ]
        # Nothing else may have been touched once startup was aborted.
        assert module_patches['VaultRedirectorSite'].mock_calls == []
        assert module_patches['Site'].mock_calls == []
        assert self.cls.reactor.mock_calls == []
        assert method_patches['run_reactor'].mock_calls == []
        assert module_patches['LoopingCall'].mock_calls == []
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testDEFAULT(self):
        """DEFAULT must be the identical object to sentinel.DEFAULT."""
        # assertIs reports both operands on failure, unlike assertTrue(a is b),
        # which can only say "False is not true".
        self.assertIs(DEFAULT, sentinel.DEFAULT)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_cached(self):
        """When _data_cache is already set, _data returns it and no helper is called."""
        # Helpers whose result is read through ``.all()``.
        query_results = {
            '_transactions': Mock(),
            '_scheduled_transactions_date': Mock(),
            '_scheduled_transactions_per_period': Mock(),
            '_scheduled_transactions_monthly': Mock(),
        }
        # Helpers whose return value is used directly.
        direct_results = {
            '_make_combined_transactions': Mock(),
            '_make_budget_sums': Mock(),
            '_make_overall_sums': Mock(),
        }
        with patch.multiple(
            pb,
            autospec=True,
            _transactions=DEFAULT,
            _scheduled_transactions_date=DEFAULT,
            _scheduled_transactions_per_period=DEFAULT,
            _scheduled_transactions_monthly=DEFAULT,
            _make_combined_transactions=DEFAULT,
            _make_budget_sums=DEFAULT,
            _make_overall_sums=DEFAULT
        ) as mocks:
            for helper, result in query_results.items():
                mocks[helper].return_value.all.return_value = result
            for helper, result in direct_results.items():
                mocks[helper].return_value = result
            self.cls._data_cache = {'foo': 'bar'}
            res = self.cls._data
        assert res == {'foo': 'bar'}
        for helper in (
            '_transactions',
            '_scheduled_transactions_date',
            '_scheduled_transactions_per_period',
            '_scheduled_transactions_monthly',
            '_make_combined_transactions',
            '_make_budget_sums',
            '_make_overall_sums',
        ):
            assert mocks[helper].mock_calls == []
项目:journald-2-cloudwatch    作者:lincheney    | 项目源码 | 文件源码
def test_upload_journal_logs(self):
        ''' upload_journal_logs() creates the reader and uploads the grouped messages '''

        tskey = '__REALTIME_TIMESTAMP'

        # A real Reader is built before patching; the patched Reader returns it.
        real_reader = systemd.journal.Reader(path=os.getcwd())
        # NOTE(review): autospec=True is passed to MagicMock here, where it only
        # sets an attribute named ``autospec`` -- confirm whether
        # patch(..., autospec=True) was intended.
        with patch('systemd.journal.Reader', return_value=real_reader) as journal, \
                patch('main.JournaldClient', MagicMock(autospec=True)) as reader:
            reader.return_value.__iter__.return_value = [
                sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4,
            ]

            with patch.multiple(self.client, retain_message=DEFAULT, group_messages=DEFAULT):
                log_group1 = Mock()
                log_group2 = Mock()
                # msg2 is the only message reported as not retained
                self.client.retain_message.side_effect = [True, False, True, True]
                self.client.group_messages.return_value = [
                    ((log_group1, 'stream1'), [sentinel.msg1]),
                    ((log_group2, 'stream2'), [sentinel.msg3, sentinel.msg4]),
                ]

                self.client.upload_journal_logs(os.getcwd())

        # creates reader
        reader.assert_called_once_with(journal.return_value, self.CURSOR_CONTENT)
        # uploads log messages
        log_group1.log_messages.assert_called_once_with('stream1', [sentinel.msg1])
        log_group2.log_messages.assert_called_once_with('stream2', [sentinel.msg3, sentinel.msg4])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testDEFAULT(self):
        """DEFAULT is the very same object as sentinel.DEFAULT."""
        expected = sentinel.DEFAULT
        self.assertIs(DEFAULT, expected)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testDEFAULT(self):
        """DEFAULT is the very same object as sentinel.DEFAULT."""
        expected = sentinel.DEFAULT
        self.assertIs(DEFAULT, expected)
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_ptr_r(self, ioctl_mock):
        """ioctl_fn_ptr_r(32, c_int): calling with fd 12 returns the value written via the pointer."""
        def fake_ioctl(fd, request, int_ptr):
            assert fd == 12
            assert request == 32
            assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
            # store the value the caller is expected to get back
            int_ptr.contents.value = 42
            return mock.DEFAULT
        ioctl_mock.side_effect = fake_ioctl

        read_int = ioctl.ioctl_fn_ptr_r(32, ctypes.c_int)
        result = read_int(12)
        assert result == 42
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_ptr_w(self, ioctl_mock):
        """ioctl_fn_ptr_w(32, c_int): the written value 42 reaches ioctl through a pointer."""
        def fake_ioctl(fd, request, int_ptr):
            assert fd == 12
            assert request == 32
            assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
            # the pointed-to int must already hold the value being written
            assert int_ptr.contents.value == 42
            return mock.DEFAULT
        ioctl_mock.side_effect = fake_ioctl

        write_int = ioctl.ioctl_fn_ptr_w(32, ctypes.c_int)
        write_int(12, 42)
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_ptr_wr(self, ioctl_mock):
        """ioctl_fn_ptr_wr(32, c_int): 24 is passed in via the pointer and 42 is read back."""
        def fake_ioctl(fd, request, int_ptr):
            assert fd == 12
            assert request == 32
            assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
            # check the input value, then overwrite it with the output value
            assert int_ptr.contents.value == 24
            int_ptr.contents.value = 42
            return mock.DEFAULT
        ioctl_mock.side_effect = fake_ioctl

        exchange_int = ioctl.ioctl_fn_ptr_wr(32, ctypes.c_int)
        result = exchange_int(12, 24)
        assert result == 42
项目:python-ioctl    作者:olavmrk    | 项目源码 | 文件源码
def test_ioctl_fn_w(self, ioctl_mock):
        """ioctl_fn_w(32, c_int): the written value 42 reaches ioctl as a c_int."""
        def fake_ioctl(fd, request, int_val):
            assert fd == 12
            assert request == 32
            # the value arrives as a c_int object, not through a pointer
            assert type(int_val) == ctypes.c_int
            assert int_val.value == 42
            return mock.DEFAULT
        ioctl_mock.side_effect = fake_ioctl

        write_int = ioctl.ioctl_fn_w(32, ctypes.c_int)
        write_int(12, 42)
项目:asynq    作者:quora    | 项目源码 | 文件源码
def _patch_object(
        target, attribute, new=mock.DEFAULT, spec=None,
        create=False, mocksignature=False, spec_set=None, autospec=False,
        new_callable=None, **kwargs
):
    """Build an async patch for `attribute` on an already-resolved `target` object.

    The target is wrapped in a zero-argument getter, and all arguments, including
    the extra keyword arguments as a single dict, are forwarded to _make_patch_async.

    """
    def getter():
        return target

    forwarded = (
        getter, attribute, new, spec, create, mocksignature,
        spec_set, autospec, new_callable, kwargs,
    )
    return _make_patch_async(*forwarded)
项目:asynq    作者:quora    | 项目源码 | 文件源码
def _maybe_wrap_new(new):
    """Return a patch replacement that supports attribute assignment.

    mock.DEFAULT and non-callable replacements are returned unchanged. Plain functions,
    classmethods and staticmethods are passed through the asynq() decorator. Any other
    callable is probed by setting and deleting a throwaway attribute: if that works the
    object is returned as is, otherwise it is wrapped in a small callable object that
    forwards every call to it.

    The wrapping is what makes patch(..., x.method) work when x.method is an instancemethod
    object, because instancemethods do not support attribute assignment.

    """
    if new is mock.DEFAULT:
        return new

    is_plain_function = inspect.isfunction(new)
    if is_plain_function or isinstance(new, (classmethod, staticmethod)):
        return asynq(sync_fn=new)(new)
    if not callable(new):
        return new

    try:
        new._maybe_wrap_new_test_attribute = None
        del new._maybe_wrap_new_test_attribute
    except (AttributeError, TypeError):
        # AttributeError: setting something on a bound method;
        # TypeError: setting something on a Cythonized class.
        pass
    else:
        # attribute assignment works, so no wrapper is needed
        return new

    # A plain lambda is not enough here: it would override __get__ and create bound
    # methods we don't want, hence a wrapper class that overrides __call__ instead.
    class Wrapper(object):
        def __call__(self, *args, **kwargs):
            return new(*args, **kwargs)
    return Wrapper()
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_acceptance(self):
        """Run a VaultRedirector with patched node lookups and check its HTTP responses.

        get_active_node, run_reactor and update_active_node are patched on ``pb``;
        run_reactor and update_active_node delegate to side-effect helpers on this
        test class. After run() returns, the recorded calls and the JSON stored in
        ``self.response`` are verified.
        """
        logger.debug('starting acceptance test')
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            update_active_node=DEFAULT
        ) as cls_mocks:
            # setup some return values
            cls_mocks['run_reactor'].side_effect = self.se_run_reactor
            cls_mocks['get_active_node'].return_value = 'foo:1234'
            cls_mocks['update_active_node'].side_effect = self.se_update_active
            # instantiate class
            self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
            self.cls.log_enabled = False
            # make sure active is None (starting state)
            assert self.cls.active_node_ip_port is None
            self.cls.bind_port = self.get_open_port()
            self.cls.log_enabled = True
            # setup an async task to make the HTTP request
            self.cls.reactor.callLater(2.0, self.se_requester)
            # do this in case the callLater for self.stop_reactor fails...
            signal.signal(signal.SIGALRM, self.stop_reactor)
            signal.alarm(20)  # send SIGALRM in 20 seconds, to stop runaway loop
            try:
                self.cls.run()
            finally:
                # Always cancel the pending alarm, even if run() raises, so a
                # stray SIGALRM cannot fire after this test has finished.
                signal.alarm(0)  # disable SIGALRM
        assert self.cls.active_node_ip_port == 'bar:5678'  # from update_active
        assert self.update_active_called is True
        assert cls_mocks['update_active_node'].mock_calls[0] == call()
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert cls_mocks['get_active_node'].mock_calls == [call()]
        # HTTP response checks
        resp = json.loads(self.response)
        # /bar/baz
        assert resp['/bar/baz']['headers'][
                   'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
            _VERSION, twisted_version.short()
        )
        assert resp['/bar/baz']['headers'][
                   'Location'] == 'http://bar:5678/bar/baz'
        assert resp['/bar/baz']['status_code'] == 307
        # /vault-redirector-health
        assert resp['/vault-redirector-health']['status_code'] == 200
        health_info = json.loads(resp['/vault-redirector-health']['text'])
        assert resp['/vault-redirector-health']['headers'][
            'Content-Type'] == 'application/json'
        assert health_info['healthy'] is True
        assert health_info['application'] == 'vault-redirector'
        assert health_info['version'] == _VERSION
        assert health_info['source'] == _PROJECT_URL
        assert health_info['consul_host_port'] == 'consul:1234'
        assert health_info['active_vault'] == 'bar:5678'
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_initial(self):
        """Reading _data assembles the result dict from every patched helper, each called once."""
        mock_t, mock_std, mock_stpp, mock_stm = Mock(), Mock(), Mock(), Mock()
        mock_mct, mock_mbs, mock_mos = Mock(), Mock(), Mock()
        # Helpers whose result is read through ``.all()``.
        query_results = {
            '_transactions': mock_t,
            '_scheduled_transactions_date': mock_std,
            '_scheduled_transactions_per_period': mock_stpp,
            '_scheduled_transactions_monthly': mock_stm,
        }
        # Helpers whose return value is used directly.
        direct_results = {
            '_make_combined_transactions': mock_mct,
            '_make_budget_sums': mock_mbs,
            '_make_overall_sums': mock_mos,
        }
        with patch.multiple(
            pb,
            autospec=True,
            _transactions=DEFAULT,
            _scheduled_transactions_date=DEFAULT,
            _scheduled_transactions_per_period=DEFAULT,
            _scheduled_transactions_monthly=DEFAULT,
            _make_combined_transactions=DEFAULT,
            _make_budget_sums=DEFAULT,
            _make_overall_sums=DEFAULT
        ) as mocks:
            for helper, result in query_results.items():
                mocks[helper].return_value.all.return_value = result
            for helper, result in direct_results.items():
                mocks[helper].return_value = result
            res = self.cls._data
        assert res == {
            'transactions': mock_t,
            'st_date': mock_std,
            'st_per_period': mock_stpp,
            'st_monthly': mock_stm,
            'all_trans_list': mock_mct,
            'budget_sums': mock_mbs,
            'overall_sums': mock_mos
        }
        # Query-style helpers are called with the instance and then ``.all()``.
        for helper in (
            '_transactions',
            '_scheduled_transactions_date',
            '_scheduled_transactions_per_period',
            '_scheduled_transactions_monthly',
        ):
            assert mocks[helper].mock_calls == [
                call(self.cls), call().all()
            ]
        # The remaining helpers are only called with the instance.
        for helper in (
            '_make_combined_transactions',
            '_make_budget_sums',
            '_make_overall_sums',
        ):
            assert mocks[helper].mock_calls == [
                call(self.cls)
            ]