Python unittest.mock.patch 模块,multiple() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用unittest.mock.patch.multiple()

项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_get():
    """Run the env step with only envGet present in the context.

    env_get, env_set and env_unset are all patched on pypyr.steps.env;
    the test expects env_get to be called exactly once and the other two
    not to be called at all.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envGet': {
            'key2': 'ARB_GET_ME1',
            'key4': 'ARB_GET_ME2',
        },
    }
    context = Context(step_input)

    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as env_mocks:
        pypyr.steps.env.run_step(context)

    env_mocks['env_get'].assert_called_once()
    for skipped in ('env_set', 'env_unset'):
        env_mocks[skipped].assert_not_called()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_set():
    """Run the env step with only envSet present in the context.

    With all three env helpers patched, only env_set should be called
    (exactly once); env_get and env_unset must stay untouched.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1',
        },
    }
    context = Context(step_input)

    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as env_mocks:
        pypyr.steps.env.run_step(context)

    env_mocks['env_get'].assert_not_called()
    env_mocks['env_set'].assert_called_once()
    env_mocks['env_unset'].assert_not_called()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_unset():
    """Run the env step with only envUnset present in the context.

    With all three env helpers patched, only env_unset should be called
    (exactly once); env_get and env_set must stay untouched.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2',
        ],
    }
    context = Context(step_input)

    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as env_mocks:
        pypyr.steps.env.run_step(context)

    for skipped in ('env_get', 'env_set'):
        env_mocks[skipped].assert_not_called()
    env_mocks['env_unset'].assert_called_once()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_env_only_calls_set_unset():
    """Run the env step with envSet and envUnset but no envGet.

    With all three env helpers patched, env_set and env_unset should each
    be called exactly once, while env_get must not be called.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2',
        ],
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1',
        },
    }
    context = Context(step_input)

    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as env_mocks:
        pypyr.steps.env.run_step(context)

    env_mocks['env_get'].assert_not_called()
    for expected in ('env_set', 'env_unset'):
        env_mocks[expected].assert_called_once()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_tar_only_calls_extract():
    """Run the tar step with only tarExtract in the context.

    tar_archive and tar_extract are both patched on pypyr.steps.tar;
    tar_extract should be called exactly once and tar_archive not at all.
    """
    extract_items = [
        {'in': 'key2', 'out': 'ARB_GET_ME1'},
        {'in': 'key4', 'out': 'ARB_GET_ME2'},
    ]
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'tarExtract': extract_items,
    })

    tar_patcher = patch.multiple(
        'pypyr.steps.tar',
        tar_archive=DEFAULT,
        tar_extract=DEFAULT,
    )
    with tar_patcher as tar_mocks:
        pypyr.steps.tar.run_step(context)

    tar_mocks['tar_extract'].assert_called_once()
    tar_mocks['tar_archive'].assert_not_called()
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_tar_calls_archive_and_extract():
    """Run the tar step with both tarArchive and tarExtract in the context.

    tar_archive and tar_extract are both patched on pypyr.steps.tar and
    each is expected to be called exactly once.
    """
    archive_items = [
        {'in': 'key2', 'out': 'ARB_GET_ME1'},
        {'in': 'key4', 'out': 'ARB_GET_ME2'},
    ]
    extract_items = [
        {'in': 'key2', 'out': 'ARB_GET_ME1'},
        {'in': 'key4', 'out': 'ARB_GET_ME2'},
    ]
    context = Context({
        'key2': 'value2',
        'key1': 'value1',
        'key3': 'value3',
        'tarArchive': archive_items,
        'tarExtract': extract_items,
    })

    tar_patcher = patch.multiple(
        'pypyr.steps.tar',
        tar_archive=DEFAULT,
        tar_extract=DEFAULT,
    )
    with tar_patcher as tar_mocks:
        pypyr.steps.tar.run_step(context)

    for expected in ('tar_extract', 'tar_archive'):
        tar_mocks[expected].assert_called_once()


# ------------------------- tar base   ---------------------------------------#
#
# ------------------------- tar extract---------------------------------------#
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run(self):
        """Check run() when the TCP listener path is taken.

        Names on ``pbm`` and ``pb`` are replaced via patch.multiple and
        get_active_node is stubbed to return 'consul:1234'. The test then
        expects: the active node stored on the instance, two warning log
        calls, VaultRedirectorSite built from the instance and wrapped in
        Site, listentcp called with that Site, the update loop added once,
        the reactor run once, and listentls never called.
        """
        self.cls.reactor = Mock(spec_set=reactor)
        module_patcher = patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT,
        )
        class_patcher = patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT,
        )
        # Enter the module-level patches first, then the class-level ones.
        with module_patcher as module_mocks, class_patcher as class_mocks:
            class_mocks['get_active_node'].return_value = 'consul:1234'
            self.cls.run()

        site_mock = module_mocks['Site']
        redirector_site_mock = module_mocks['VaultRedirectorSite']

        assert self.cls.active_node_ip_port == 'consul:1234'
        expected_log = [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)'),
        ]
        assert module_mocks['logger'].mock_calls == expected_log
        assert redirector_site_mock.mock_calls == [call(self.cls)]
        assert site_mock.mock_calls == [
            call(redirector_site_mock.return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert class_mocks['run_reactor'].mock_calls == [call()]
        assert module_mocks['LoopingCall'].mock_calls == []
        assert class_mocks['listentcp'].mock_calls == [
            call(site_mock.return_value)
        ]
        assert class_mocks['add_update_loop'].mock_calls == [call()]
        assert class_mocks['listentls'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_tls(self):
        """Check run() when a tls_factory is set on the instance.

        Same setup as the non-TLS run test, except that tls_factory is
        assigned a Mock first. The expectations mirror that test with the
        listeners swapped: listentls is called with the Site and listentcp
        is never called.
        """
        self.cls.reactor = Mock(spec_set=reactor)
        self.cls.tls_factory = Mock()
        module_patcher = patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT,
        )
        class_patcher = patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT,
        )
        # Enter the module-level patches first, then the class-level ones.
        with module_patcher as module_mocks, class_patcher as class_mocks:
            class_mocks['get_active_node'].return_value = 'consul:1234'
            self.cls.run()

        site_mock = module_mocks['Site']
        redirector_site_mock = module_mocks['VaultRedirectorSite']

        assert self.cls.active_node_ip_port == 'consul:1234'
        expected_log = [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)'),
        ]
        assert module_mocks['logger'].mock_calls == expected_log
        assert redirector_site_mock.mock_calls == [call(self.cls)]
        assert site_mock.mock_calls == [
            call(redirector_site_mock.return_value)
        ]
        assert self.cls.reactor.mock_calls == []
        assert class_mocks['run_reactor'].mock_calls == [call()]
        assert module_mocks['LoopingCall'].mock_calls == []
        assert class_mocks['listentls'].mock_calls == [
            call(site_mock.return_value)
        ]
        assert class_mocks['add_update_loop'].mock_calls == [call()]
        assert class_mocks['listentcp'].mock_calls == []
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_run_error(self):
        """Check run() exits with code 3 when no active node is found.

        get_active_node is stubbed to return None. The test expects a
        SystemExit with code 3, one critical log message, and no calls on
        the site classes, the reactor, run_reactor or LoopingCall.
        """
        self.cls.reactor = Mock(spec_set=reactor)
        module_patcher = patch.multiple(
            pbm,
            logger=DEFAULT,
            Site=DEFAULT,
            LoopingCall=DEFAULT,
            VaultRedirectorSite=DEFAULT,
        )
        class_patcher = patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
        )
        # Enter the module-level patches first, then the class-level ones.
        with module_patcher as module_mocks, class_patcher as class_mocks:
            class_mocks['get_active_node'].return_value = None
            with pytest.raises(SystemExit) as exit_info:
                self.cls.run()

        assert exit_info.value.code == 3
        expected_log = [
            call.critical("ERROR: Could not get active vault node from "
                          "Consul. Exiting.")
        ]
        assert module_mocks['logger'].mock_calls == expected_log
        assert module_mocks['VaultRedirectorSite'].mock_calls == []
        assert module_mocks['Site'].mock_calls == []
        assert self.cls.reactor.mock_calls == []
        assert class_mocks['run_reactor'].mock_calls == []
        assert module_mocks['LoopingCall'].mock_calls == []
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_cached(self):
        """Check that _data returns an existing _data_cache untouched.

        Seven helpers on ``pb`` are autospec-patched and given return
        values, then _data_cache is pre-populated. Reading _data must
        return the cached dict and none of the patched helpers may have
        been called.
        """
        # Helpers whose result is followed by .all() when stubbed.
        query_names = (
            '_transactions',
            '_scheduled_transactions_date',
            '_scheduled_transactions_per_period',
            '_scheduled_transactions_monthly',
        )
        # Helpers whose return value is stubbed directly.
        builder_names = (
            '_make_combined_transactions',
            '_make_budget_sums',
            '_make_overall_sums',
        )
        patched_names = query_names + builder_names
        to_patch = {name: DEFAULT for name in patched_names}

        with patch.multiple(pb, autospec=True, **to_patch) as mocks:
            for name in query_names:
                mocks[name].return_value.all.return_value = Mock()
            for name in builder_names:
                mocks[name].return_value = Mock()
            self.cls._data_cache = {'foo': 'bar'}
            res = self.cls._data

        assert res == {'foo': 'bar'}
        for name in patched_names:
            assert mocks[name].mock_calls == []
项目:journald-2-cloudwatch    作者:lincheney    | 项目源码 | 文件源码
def test_upload_journal_logs(self):
        """Test upload_journal_logs().

        systemd.journal.Reader is patched to hand back a reader created
        from the current directory, and main.JournaldClient is patched so
        that iterating its instance yields four sentinel messages.
        retain_message keeps messages 1, 3 and 4, and group_messages maps
        them onto two mock log groups. The test expects JournaldClient to
        be built from the reader plus CURSOR_CONTENT, and each log group to
        receive its own stream name and messages.
        """
        real_reader = systemd.journal.Reader(path=os.getcwd())
        messages = [
            sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4,
        ]

        with patch('systemd.journal.Reader',
                   return_value=real_reader) as reader_cls:
            # patch() already substitutes a MagicMock by default. Passing
            # MagicMock(autospec=True) would not autospec anything; it only
            # sets an attribute called ``autospec`` on the mock.
            with patch('main.JournaldClient') as client_cls:
                client_cls.return_value.__iter__.return_value = messages

                with patch.multiple(self.client,
                                    retain_message=DEFAULT,
                                    group_messages=DEFAULT):
                    log_group1 = Mock()
                    log_group2 = Mock()
                    # msg2 is the only message filtered out.
                    self.client.retain_message.side_effect = [
                        True, False, True, True,
                    ]
                    self.client.group_messages.return_value = [
                        ((log_group1, 'stream1'), [sentinel.msg1]),
                        ((log_group2, 'stream2'),
                         [sentinel.msg3, sentinel.msg4]),
                    ]

                    self.client.upload_journal_logs(os.getcwd())

        # creates reader
        client_cls.assert_called_once_with(
            reader_cls.return_value, self.CURSOR_CONTENT)
        # uploads log messages
        log_group1.log_messages.assert_called_once_with(
            'stream1', [sentinel.msg1])
        log_group2.log_messages.assert_called_once_with(
            'stream2', [sentinel.msg3, sentinel.msg4])
项目:tracboat    作者:nazavode    | 项目源码 | 文件源码
def test_connectionbase():
        """Check ConnectionBase construction and its project name fields.

        Calling it without arguments must raise TypeError. 'project' gives
        a name and qualified name of 'project' with a falsy namespace;
        'ns/project' gives name 'project', namespace 'ns' and qualified
        name 'ns/project'.
        """
        # Emptying __abstractmethods__ temporarily disables the ABC checks
        # so the base class can be instantiated directly.
        no_abstract = patch.multiple(ConnectionBase, __abstractmethods__=set())
        with no_abstract:
            with pytest.raises(TypeError):
                ConnectionBase()

            plain = ConnectionBase('project')
            assert plain.project_name == 'project'
            assert not plain.project_namespace
            assert plain.project_qualname == 'project'

            namespaced = ConnectionBase('ns/project')
            assert namespaced.project_name == 'project'
            assert namespaced.project_namespace == 'ns'
            assert namespaced.project_qualname == 'ns/project'
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def test_acceptance(self):
        """Run a VaultRedirector end to end and check its HTTP responses.

        get_active_node, run_reactor and update_active_node are patched on
        ``pb``; the side effects come from helper methods on this test
        class (se_run_reactor, se_update_active, se_requester), which are
        defined outside this block. A SIGALRM watchdog guards against a
        runaway event loop. It is always disarmed and the previous handler
        restored, even if run() raises, so a failure here cannot fire an
        alarm during a later test.
        """
        logger.debug('starting acceptance test')
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            update_active_node=DEFAULT
        ) as cls_mocks:
            # setup some return values
            cls_mocks['run_reactor'].side_effect = self.se_run_reactor
            cls_mocks['get_active_node'].return_value = 'foo:1234'
            cls_mocks['update_active_node'].side_effect = self.se_update_active
            # instantiate class
            self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
            self.cls.log_enabled = False
            # make sure active is None (starting state)
            assert self.cls.active_node_ip_port is None
            self.cls.bind_port = self.get_open_port()
            self.cls.log_enabled = True
            # setup an async task to make the HTTP request
            self.cls.reactor.callLater(2.0, self.se_requester)
            # do this in case the callLater for self.stop_reactor fails...
            previous_handler = signal.signal(signal.SIGALRM, self.stop_reactor)
            signal.alarm(20)  # send SIGALRM in 20 seconds, to stop runaway loop
            try:
                self.cls.run()
            finally:
                signal.alarm(0)  # disable SIGALRM
                # signal.signal() returns None when the old handler was not
                # installed from Python; that value cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)
        assert self.cls.active_node_ip_port == 'bar:5678'  # from update_active
        assert self.update_active_called is True
        assert cls_mocks['update_active_node'].mock_calls[0] == call()
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert cls_mocks['get_active_node'].mock_calls == [call()]
        # HTTP response checks
        resp = json.loads(self.response)
        # /bar/baz
        assert resp['/bar/baz']['headers'][
                   'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
            _VERSION, twisted_version.short()
        )
        assert resp['/bar/baz']['headers'][
                   'Location'] == 'http://bar:5678/bar/baz'
        assert resp['/bar/baz']['status_code'] == 307
        # /vault-redirector-health
        assert resp['/vault-redirector-health']['status_code'] == 200
        health_info = json.loads(resp['/vault-redirector-health']['text'])
        assert resp['/vault-redirector-health']['headers'][
            'Content-Type'] == 'application/json'
        assert health_info['healthy'] is True
        assert health_info['application'] == 'vault-redirector'
        assert health_info['version'] == _VERSION
        assert health_info['source'] == _PROJECT_URL
        assert health_info['consul_host_port'] == 'consul:1234'
        assert health_info['active_vault'] == 'bar:5678'
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_initial(self):
        """Check that _data builds its result from the patched helpers.

        Seven helpers on ``pb`` are autospec-patched and given distinct
        Mock return values. Reading _data must return a dict mapping each
        result key to the matching stub. The four query helpers must each
        have been called with the instance and then had .all() called on
        the result; the three builders must each have been called once
        with the instance.
        """
        # Helpers whose result is followed by .all().
        query_names = (
            '_transactions',
            '_scheduled_transactions_date',
            '_scheduled_transactions_per_period',
            '_scheduled_transactions_monthly',
        )
        # Helpers whose return value is used directly.
        builder_names = (
            '_make_combined_transactions',
            '_make_budget_sums',
            '_make_overall_sums',
        )
        patched_names = query_names + builder_names
        stubs = {name: Mock() for name in patched_names}
        to_patch = {name: DEFAULT for name in patched_names}

        with patch.multiple(pb, autospec=True, **to_patch) as mocks:
            for name in query_names:
                mocks[name].return_value.all.return_value = stubs[name]
            for name in builder_names:
                mocks[name].return_value = stubs[name]
            res = self.cls._data

        assert res == {
            'transactions': stubs['_transactions'],
            'st_date': stubs['_scheduled_transactions_date'],
            'st_per_period': stubs['_scheduled_transactions_per_period'],
            'st_monthly': stubs['_scheduled_transactions_monthly'],
            'all_trans_list': stubs['_make_combined_transactions'],
            'budget_sums': stubs['_make_budget_sums'],
            'overall_sums': stubs['_make_overall_sums']
        }
        for name in query_names:
            assert mocks[name].mock_calls == [
                call(self.cls), call().all()
            ]
        for name in builder_names:
            assert mocks[name].mock_calls == [
                call(self.cls)
            ]
项目:jupyter_contrib_core    作者:Jupyter-contrib    | 项目源码 | 文件源码
def patch_jupyter_dirs():
    """
    Build patches that point jupyter paths at temporary directories.

    Nothing is started here: the caller must start and stop the returned
    patches, and call the returned cleanup function to delete the
    temporary tree when appropriate.

    Returns a 3-tuple ``(jupyter_patches, jupyter_dirs, remove_jupyter_dirs)``:
    the list of un-started patch objects, the dict of created directories
    (with the temp root under ``'root'``), and a function removing the root.
    """
    test_dir = tempfile.mkdtemp(prefix='jupyter_')

    jupyter_dirs = {}
    for name in ('user_home', 'env_vars', 'system', 'sys_prefix', 'custom',
                 'server'):
        jupyter_dirs[name] = make_dirs(test_dir, name)
    jupyter_dirs['root'] = test_dir

    # The server entry additionally gets notebook and runtime directories.
    server_dirs = jupyter_dirs['server']
    for name in ('notebook', 'runtime'):
        server_path = os.path.join(test_dir, 'server', name)
        server_dirs[name] = server_path
        if not os.path.exists(server_path):
            os.makedirs(server_path)

    # patch relevant environment variables
    env_overrides = stringify_env({
        'HOME': jupyter_dirs['user_home']['root'],
        'JUPYTER_CONFIG_DIR': jupyter_dirs['env_vars']['conf'],
        'JUPYTER_DATA_DIR': jupyter_dirs['env_vars']['data'],
        'JUPYTER_RUNTIME_DIR': server_dirs['runtime'],
    })
    jupyter_patches = [patch.dict('os.environ', env_overrides)]

    # patch jupyter path variables in various modules
    # find the appropriate modules to patch according to compat.
    # Should include either
    # notebook.nbextensions
    # or
    # jupyter_contrib_core.notebook_compat._compat.nbextensions
    modules_to_patch = {
        jupyter_core.paths,
        sys.modules[nbextensions._get_config_dir.__module__],
        sys.modules[nbextensions._get_nbextension_dir.__module__],
    }
    path_patches = {
        'SYSTEM_CONFIG_PATH': [jupyter_dirs['system']['conf']],
        'ENV_CONFIG_PATH': [jupyter_dirs['sys_prefix']['conf']],
        'SYSTEM_JUPYTER_PATH': [jupyter_dirs['system']['data']],
        'ENV_JUPYTER_PATH': [jupyter_dirs['sys_prefix']['data']],
    }
    for mod in modules_to_patch:
        # Only patch the attributes this particular module defines.
        applicable_patches = {}
        for attrname, newval in path_patches.items():
            if hasattr(mod, attrname):
                applicable_patches[attrname] = newval
        jupyter_patches.append(patch.multiple(mod, **applicable_patches))

    def remove_jupyter_dirs():
        """Delete the temporary root directory and everything below it."""
        shutil.rmtree(test_dir)

    return jupyter_patches, jupyter_dirs, remove_jupyter_dirs