Python mock.patch 模块,dict() 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用mock.patch.dict()

项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_get_jira_user_multi(self):
        """
        Tests the _get_jira_user method when JIRA returns more than one
        matching user.
        """
        self.mock_auth.search_users = Mock(return_value=[self.mock_jira_user_1,
                                                         self.mock_jira_user_2])
        with patch('ambassador.transport.Transport.get_key_cert',
                   return_value=self.mock_cert):
            with patch('platforms.jira.handlers.jira.JIRA',
                       return_value=self.mock_auth):
                with patch.dict('platforms.jira.handlers.settings.JIRA',
                                self.mock_settings):
                    handler_w_user = IssueAPI(endpoint=self.endpoint,
                                              user=self.user)
                    actual = handler_w_user._get_jira_user()
                    expected = None
                    self.assertEqual(actual, expected)
项目:py-lambda-packer    作者:digitalrounin    | 项目源码 | 文件源码
def test_trims_path(self, virtual_env):
        # pylint: disable=no-self-use
        venv_path = '/Y1w4sD/DELETE/THIS/ISUy0r'
        values = {
            'VIRTUAL_ENV': venv_path,
            'PATH': ':'.join(('/0ho8Ke/hL9DsW/ymH81W',
                              '/L51pua',
                              '/Vngp3V/G2m7Ih/05m7qW/LyrYkK/l5NuwA/oq1DPp',
                              venv_path,
                              '/Zpdhu4/bjvuqt',
                              '/STGtcb/FhAnWH/HwTvOr/gngGiB',
                              '/Zizj4D/szncsv/O5wO6X/joFHVT'))
        }
        with patch.dict('os.environ', values=values, clear=True):
            env = virtual_env.sanitized_env()

        assert venv_path not in env['PATH']
        assert env['PATH'] == (
            '/0ho8Ke/hL9DsW/ymH81W:/L51pua:/Vngp3V/G2m7Ih/05m7qW/LyrYkK/'
            'l5NuwA/oq1DPp:/Zpdhu4/bjvuqt:/STGtcb/FhAnWH/HwTvOr/gngGiB:'
            '/Zizj4D/szncsv/O5wO6X/joFHVT')
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_disqus_sso_payload_auth_user(self, mock_b64encode, mock_hmac):
        """Test Disqus SSO payload auth works."""
        user = UserFactory.create()

        DISQUS_PUBLIC_KEY = 'public'
        DISQUS_SECRET_KEY = 'secret'
        patch_dict = {'DISQUS_PUBLIC_KEY': DISQUS_PUBLIC_KEY,
                      'DISQUS_SECRET_KEY': DISQUS_SECRET_KEY}
        data = json.dumps({'id': user.id,
                           'username': user.name,
                           'email': user.email_addr})

        mock_b64encode.return_value = data

        with patch.dict(self.flask_app.config, patch_dict):
            message, timestamp, sig, pub_key = util.get_disqus_sso_payload(user)
            mock_b64encode.assert_called_with(data)
            mock_hmac.assert_called_with(DISQUS_SECRET_KEY, '%s %s' % (data, timestamp),
                                         hashlib.sha1)
            assert timestamp
            assert sig
            assert pub_key == DISQUS_PUBLIC_KEY
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_disqus_sso_payload_anon_user(self, mock_b64encode, mock_hmac):
        """Test Disqus SSO payload anon works."""

        DISQUS_PUBLIC_KEY = 'public'
        DISQUS_SECRET_KEY = 'secret'
        patch_dict = {'DISQUS_PUBLIC_KEY': DISQUS_PUBLIC_KEY,
                      'DISQUS_SECRET_KEY': DISQUS_SECRET_KEY}

        data = json.dumps({})

        mock_b64encode.return_value = data

        with patch.dict(self.flask_app.config, patch_dict):
            message, timestamp, sig, pub_key = util.get_disqus_sso_payload(None)
            mock_b64encode.assert_called_with(data)
            mock_hmac.assert_called_with(DISQUS_SECRET_KEY, '%s %s' % (data, timestamp),
                                         hashlib.sha1)
            assert timestamp
            assert sig
            assert pub_key == DISQUS_PUBLIC_KEY
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_handle_content_type_json_error(self, mocklast, mockjsonify,
                                            mockrender, mockrequest):
        fake_d = {'Content-Type': 'application/json'}
        mockrequest.headers.__getitem__.side_effect = fake_d.__getitem__
        mockrequest.headers.get.side_effect = fake_d.get
        mockrequest.headers.__iter__.side_effect = fake_d.__iter__
        mockjsonify.side_effect = myjsonify
        res, code = util.handle_content_type(
                                             dict(
                                                 template='example.html',
                                                 code=404,
                                                 description="Not found"))
        err_msg = "template key should exist"
        assert res.get('template') == 'example.html', err_msg
        err_msg = "jsonify should be called"
        assert mockjsonify.called, err_msg
        err_msg = "Error code should exist"
        assert res.get('code') == 404, err_msg
        assert code == 404, err_msg
        err_msg = "Error description should exist"
        assert res.get('description') is not None, err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_handle_content_type_json_form(self, mocklast, mockcsrf,
                                           mockjsonify, mockrender,
                                           mockrequest):
        fake_d = {'Content-Type': 'application/json'}
        mockrequest.headers.__getitem__.side_effect = fake_d.__getitem__
        mockrequest.headers.get.side_effect = fake_d.get
        mockrequest.headers.__iter__.side_effect = fake_d.__iter__
        mockjsonify.side_effect = myjsonify
        mockcsrf.return_value = "yourcsrf"
        form = MagicMock(spec=Form, data=dict(foo=1), errors=None)
        res = util.handle_content_type(dict(template='example.html',
                                            form=form))
        err_msg = "template key should exist"
        assert res.get('template') == 'example.html', err_msg
        err_msg = "jsonify should be called"
        assert mockjsonify.called, err_msg
        err_msg = "Form should exist"
        assert res.get('form'), err_msg
        err_msg = "Form should have a csrf key/value"
        assert res.get('form').get('csrf') == 'yourcsrf', err_msg
        err_msg = "There should be the keys of the form"
        keys = ['foo', 'errors', 'csrf']
        assert res.get('form').keys().sort() == keys.sort(), err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_handle_content_type_html(self, mockjsonify,
                                      mockrender, mockrequest):
        fake_d = {'Content-Type': 'text/html'}
        mockrequest.headers.__getitem__.side_effect = fake_d.__getitem__
        mockrequest.headers.get.side_effect = fake_d.get
        mockrequest.headers.__iter__.side_effect = fake_d.__iter__
        mockjsonify.side_effect = myjsonify
        mockrender.side_effect = myrender
        pagination = util.Pagination(page=1, per_page=5, total_count=10)
        template, data = util.handle_content_type(dict(template='example.html',
                                                       pagination=pagination))
        err_msg = "Template should be rendered"
        assert template == 'example.html', err_msg
        err_msg = "Template key should not exist"
        assert data.get('template') is None, err_msg
        err_msg = "jsonify should not be called"
        assert mockjsonify.called is False, err_msg
        err_msg = "render_template should be called"
        assert mockrender.called is True, err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_handle_content_type_html_error(self, mockjsonify,
                                            mockrender, mockrequest):
        fake_d = {'Content-Type': 'text/html'}
        mockrequest.headers.__getitem__.side_effect = fake_d.__getitem__
        mockrequest.headers.get.side_effect = fake_d.get
        mockrequest.headers.__iter__.side_effect = fake_d.__iter__
        mockjsonify.side_effect = myjsonify
        mockrender.side_effect = myrender
        template, code = util.handle_content_type(dict(template='example.html',
                                                       code=404))
        data = template[1]
        template = template[0]
        err_msg = "Template should be rendered"
        assert template == 'example.html', err_msg
        err_msg = "Template key should not exist"
        assert data.get('template') is None, err_msg
        err_msg = "jsonify should not be called"
        assert mockjsonify.called is False, err_msg
        err_msg = "render_template should be called"
        assert mockrender.called is True, err_msg
        err_msg = "There should be an error"
        assert code == 404, err_msg
        err_msg = "There should not be code key"
        assert data.get('code') is None, err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_UnicodeWriter(self):
        """Test UnicodeWriter class works."""
        tmp = tempfile.NamedTemporaryFile()
        uw = util.UnicodeWriter(tmp)
        fake_csv = ['one, two, three, {"i": 1}']
        for row in csv.reader(fake_csv):
            # change it for a dict
            row[3] = dict(i=1)
            uw.writerow(row)
        tmp.seek(0)
        err_msg = "It should be the same CSV content"
        with open(tmp.name, 'rb') as f:
            reader = csv.reader(f)
            for row in reader:
                for item in row:
                    assert item in fake_csv[0], err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_register_json_errors_get(self):
        """Test WEB register errors JSON works"""
        with patch.dict(self.flask_app.config, {'WTF_CSRF_ENABLED': True}):
            csrf = self.get_csrf('/account/register')

            userdict = {'fullname': 'a', 'name': 'name',
                        'email_addr': None, 'password': 'p'}

            res = self.app.post('/account/register', data=json.dumps(userdict),
                                content_type='application/json',
                                headers={'X-CSRFToken': csrf})
            # The output should have a mime-type: application/json
            errors = json.loads(res.data).get('form').get('errors')
            assert res.mimetype == 'application/json', res.data
            err_msg = "There should be an error with the email"
            assert errors.get('email_addr'), err_msg
            err_msg = "There should be an error with fullname"
            assert errors.get('fullname'), err_msg
            err_msg = "There should be an error with password"
            assert errors.get('password'), err_msg
            err_msg = "There should NOT be an error with name"
            assert errors.get('name') is None, err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_register_confirmation_validates_email(self, fake_signer):
        """Test WEB validates email"""
        self.register()
        user = db.session.query(User).get(1)
        user.valid_email = False
        user.confirmation_email_sent = True
        db.session.commit()

        fake_signer.loads.return_value = dict(fullname=user.fullname,
                                              name=user.name,
                                              email_addr=user.email_addr)
        self.app.get('/account/register/confirmation?key=valid-key')

        user = db.session.query(User).get(1)
        assert user is not None
        msg = "Email has not been validated"
        assert user.valid_email, msg
        msg = "Confirmation email flag has not been restored"
        assert user.confirmation_email_sent is False, msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_confirm_account_newsletter(self, fake_signer, url_for, newsletter):
        """Test WEB confirm email shows newsletter or home."""
        newsletter.ask_user_to_subscribe.return_value = True
        self.register()
        user = db.session.query(User).get(1)
        user.valid_email = False
        db.session.commit()
        fake_signer.loads.return_value = dict(fullname=user.fullname,
                                              name=user.name,
                                              email_addr=user.email_addr)
        self.app.get('/account/register/confirmation?key=valid-key')

        url_for.assert_called_with('account.newsletter_subscribe', next=None)

        newsletter.ask_user_to_subscribe.return_value = False
        self.app.get('/account/register/confirmation?key=valid-key')
        url_for.assert_called_with('home.home')
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_account_upload_avatar(self):
        """Test WEB Account upload avatar."""
        import io
        owner = UserFactory.create()
        url = '/account/%s/update?api_key=%s' % (owner.name,
                                                 owner.api_key)
        avatar = (io.BytesIO(b'test'), 'test_file.jpg')
        payload = dict(btn='Upload', avatar=avatar,
                       id=owner.id, x1=0, y1=0,
                       x2=100, y2=100)
        res = self.app.post(url, follow_redirects=True,
                            content_type="multipart/form-data", data=payload)
        assert res.status_code == 200
        u = user_repo.get(owner.id)
        assert u.info['avatar'] is not None
        assert u.info['container'] is not None
        avatar_url = '/uploads/%s/%s' % (u.info['container'], u.info['avatar'])
        assert u.info['avatar_url'] == avatar_url
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_update_project_json_as_user(self):
        """Test WEB JSON update project as user."""
        admin = UserFactory.create()
        owner = UserFactory.create()
        user = UserFactory.create()

        project = ProjectFactory.create(owner=owner)

        url = '/project/%s/update?api_key=%s' % (project.short_name, user.api_key)

        res = self.app_get_json(url)
        data = json.loads(res.data)

        assert data['code'] == 403, data

        old_data = dict()

        old_data['description'] = 'foobar'

        res = self.app_post_json(url, data=old_data)
        data = json.loads(res.data)

        assert data['code'] == 403, data
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_26_tutorial_signed_user(self):
        """Test WEB tutorials work as signed in user"""
        self.create()
        project1 = db.session.query(Project).get(1)
        project1.info = dict(tutorial="some help", task_presenter="presenter")
        db.session.commit()
        self.register()
        # First time accessing the project should redirect me to the tutorial
        res = self.app.get('/project/test-app/newtask', follow_redirects=True)
        err_msg = "There should be some tutorial for the project"
        assert "some help" in res.data, err_msg
        # Second time should give me a task, and not the tutorial
        res = self.app.get('/project/test-app/newtask', follow_redirects=True)
        assert "some help" not in res.data

        # Check if the tutorial can be accessed directly
        res = self.app.get('/project/test-app/tutorial', follow_redirects=True)
        err_msg = "There should be some tutorial for the project"
        assert "some help" in res.data, err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_26_tutorial_signed_user_json(self):
        """Test WEB tutorials work as signed in user"""
        self.create()
        project1 = db.session.query(Project).get(1)
        project1.info = dict(tutorial="some help", task_presenter="presenter")
        db.session.commit()
        self.register()
        # First time accessing the project should redirect me to the tutorial
        res = self.app.get('/project/test-app/newtask', follow_redirects=True)
        err_msg = "There should be some tutorial for the project"
        assert "some help" in res.data, err_msg
        # Second time should give me a task, and not the tutorial
        res = self.app.get('/project/test-app/newtask', follow_redirects=True)
        assert "some help" not in res.data

        # Check if the tutorial can be accessed directly
        res = self.app_get_json('/project/test-app/tutorial')
        data = json.loads(res.data)
        err_msg = 'key missing'
        assert 'owner' in data, err_msg
        assert 'project' in data, err_msg
        assert 'template' in data, err_msg
        assert 'title' in data, err_msg
        err_msg = 'project tutorial missing'
        assert 'My New Project' in data['title'], err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_27_tutorial_anonymous_user_json(self):
        """Test WEB tutorials work as an anonymous user"""
        self.create()
        project = db.session.query(Project).get(1)
        project.info = dict(tutorial="some help", task_presenter="presenter")
        db.session.commit()
        # First time accessing the project should redirect me to the tutorial
        res = self.app.get('/project/test-app/newtask', follow_redirects=True)
        err_msg = "There should be some tutorial for the project"
        assert "some help" in res.data, err_msg
        # Second time should give me a task, and not the tutorial
        res = self.app.get('/project/test-app/newtask', follow_redirects=True)
        assert "some help" not in res.data

        # Check if the tutorial can be accessed directly
        res = self.app_get_json('/project/test-app/tutorial')
        data = json.loads(res.data)
        err_msg = 'key missing'
        assert 'owner' in data, err_msg
        assert 'project' in data, err_msg
        assert 'template' in data, err_msg
        assert 'title' in data, err_msg
        err_msg = 'project tutorial missing'
        assert 'My New Project' in data['title'], err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_anon(self):
        """Test with user authenticated."""
        url = 'api/disqus/sso'

        DISQUS_PUBLIC_KEY = 'public'
        DISQUS_SECRET_KEY = 'secret'

        patch_dict = {'DISQUS_PUBLIC_KEY': DISQUS_PUBLIC_KEY,
                      'DISQUS_SECRET_KEY': DISQUS_SECRET_KEY}

        with patch.dict(self.flask_app.config, patch_dict):
            res = self.app.get(url)
            data = json.loads(res.data)
            assert res.status_code == 200, res.status_code
            assert data['remote_auth_s3'] is not None, data
            assert data['api_key'] is not None, data
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_get_jobs_only_on_sunday_variant(self, mock_datetime):
        """Test JOB get jobs for weekly stats works only on Sunday variant."""
        user = UserFactory.create(pro=True)
        pr = ProjectFactory(owner=user)
        task = TaskFactory.create(project=pr)
        TaskRunFactory.create(project=pr, task=task)
        mock_date = MagicMock()
        mock_date.strftime.return_value = 'Sunday'
        mock_datetime.today.return_value = mock_date

        jobs = get_weekly_stats_update_projects()
        for job in jobs:
            assert type(job) == dict, type(job)
            assert job['name'] == send_weekly_stats_project
            assert job['args'] == [pr.id]
            assert job['kwargs'] == {}
            assert job['timeout'] == self.flask_app.config.get('TIMEOUT')
            assert job['queue'] == 'low'
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_get_jobs_no_pro_feature_for_everyone(self, mock_datetime):
        """Test JOB get jobs for weekly stats works for non pros if feature is
        only for everyone."""
        user = UserFactory.create(pro=False)
        pr = ProjectFactory(owner=user)
        task = TaskFactory.create(project=pr)
        TaskRunFactory.create(project=pr, task=task)
        mock_date = MagicMock()
        mock_date.strftime.return_value = 'Sunday'
        mock_datetime.today.return_value = mock_date

        jobs = [job for job in get_weekly_stats_update_projects()]

        assert len(jobs) == 1
        for job in jobs:
            assert type(job) == dict, type(job)
            assert job['name'] == send_weekly_stats_project
            assert job['args'] == [pr.id]
            assert job['kwargs'] == {}
            assert job['timeout'] == self.flask_app.config.get('TIMEOUT')
            assert job['queue'] == 'low'
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_get_jobs_only_on_featured_variant(self, mock_datetime):
        """Test JOB get jobs for weekly stats works for featured."""
        user = UserFactory.create(pro=False)
        pr = ProjectFactory(owner=user, featured=True)
        task = TaskFactory.create(project=pr)
        TaskRunFactory.create(project=pr, task=task)
        mock_date = MagicMock()
        mock_date.strftime.return_value = 'Sunday'
        mock_datetime.today.return_value = mock_date

        jobs = get_weekly_stats_update_projects()
        for job in jobs:
            assert type(job) == dict, type(job)
            assert job['name'] == send_weekly_stats_project
            assert job['args'] == [pr.id]
            assert job['kwargs'] == {}
            assert job['timeout'] == self.flask_app.config.get('TIMEOUT')
            assert job['queue'] == 'low'
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_autoimport_jobs_with_autoimporter(self):
        """Test JOB autoimport jobs returns projects with autoimporter."""
        user = UserFactory.create(pro=True)
        project = ProjectFactory.create(owner=user,info=dict(autoimporter='foobar'))
        jobs_generator = get_autoimport_jobs()
        jobs = []
        for job in jobs_generator:
            jobs.append(job)

        msg = "There should be 1 job."
        assert len(jobs) == 1, msg
        job = jobs[0]
        msg = "It sould be the same project."
        assert job['args'][0] == project.id, msg
        msg = "It sould be created as an auto import job."
        assert job['args'][1] == True, msg
        msg = "There sould be the kwargs."
        assert job['kwargs'] == 'foobar', msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_autoimport_jobs_without_pro_when_for_everyone(self):
        """Test JOB autoimport jobs returns normal user owned projects
        if autoimporter is enabled for everyone."""
        project = ProjectFactory.create(info=dict(autoimporter='foobar'))
        jobs_generator = get_autoimport_jobs()
        jobs = []
        for job in jobs_generator:
            jobs.append(job)

        msg = "There should be 1 job."
        assert len(jobs) == 1, msg
        job = jobs[0]
        msg = "It sould be the same project."
        assert job['args'][0] == project.id, msg
        msg = "It sould be created as an auto import job."
        assert job['args'][1] == True, msg
        msg = "There sould be the kwargs."
        assert job['kwargs'] == 'foobar', msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_rackspace_uploader_init(self, Mock):
        """Test RACKSPACE UPLOADER init works."""
        new_extensions = ['pdf', 'doe']
        with patch('pybossa.uploader.rackspace.pyrax.cloudfiles',
                   return_value=cloudfiles_mock):
            with patch.dict(self.flask_app.config,
                            {'ALLOWED_EXTENSIONS': new_extensions}):

                with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
                    mycf.get_container.return_value = True
                    u = RackspaceUploader()
                    res = u.init_app(self.flask_app, cont_name='mycontainer')
                    err_msg = "It should return the container."
                    assert res is True, err_msg
                    err_msg = "The container name should be updated."
                    assert u.cont_name == 'mycontainer', err_msg
                    for ext in new_extensions:
                        err_msg = "The .%s extension should be allowed" % ext
                        assert ext in u.allowed_extensions, err_msg
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_local_uploader_init(self):
        """Test LOCAL UPLOADER init works."""
        u = LocalUploader()
        u.init_app(self.flask_app)
        new_extensions = ['pdf', 'doe']
        new_upload_folder = '/tmp/'
        new_config_ext = {'ALLOWED_EXTENSIONS': new_extensions}
        new_config_uf = {'UPLOAD_FOLDER': new_upload_folder}
        with patch.dict(self.flask_app.config, new_config_ext):
            with patch.dict(self.flask_app.config, new_config_uf):
                new_uploader = LocalUploader()
                new_uploader.init_app(self.flask_app)
                expected_extensions = set.union(u.allowed_extensions,
                                                new_extensions)
                err_msg = "The new uploader should support two extra extensions"
                assert expected_extensions == new_uploader.allowed_extensions, err_msg
                err_msg = "Upload folder /tmp should be existing"
                assert os.path.isdir(new_uploader.upload_folder) is True, err_msg
                err_msg = "Upload folder by default is /tmp/"
                assert new_uploader.upload_folder == '/tmp/', err_msg
项目:pygenie    作者:Netflix    | 项目源码 | 文件源码
def test_stdout_chunk(self, get_stdout, get_info_for_rj):
        """Test RunningJob() stdout_chunk"""

        get_info_for_rj.return_value = dict([('stdout_size', 10)])
        get_stdout.return_value = "line1\nline2\nline3\nline4\nline5\nline6"

        running_job = pygenie.jobs.RunningJob('1234-stdout-chunk',
                                              info={'status': 'SUCCEEDED'})

        running_job.stdout_chunk(10, offset=0)

        assert_equals(
            [
                call('1234-stdout-chunk', headers={'Range': 'bytes=0-10'})
            ],
            get_stdout.call_args_list
        )
项目:pygenie    作者:Netflix    | 项目源码 | 文件源码
def test_stdout_chunk_zero_size(self, get_stdout, get_info_for_rj):
        """Test RunningJob() stdout_chunk_zero_size"""

        get_info_for_rj.return_value = dict([('stdout_size', 0)])
        get_stdout.return_value = "line1\nline2\nline3\nline4\nline5\nline6"

        running_job = pygenie.jobs.RunningJob('1234-stdout-chunk',
                                              info={'status': 'SUCCEEDED'})

        chunk = running_job.stdout_chunk(10, offset=0)

        assert_equals(
            [],
            get_stdout.call_args_list
        )

        assert_equals(chunk, None)
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def invoke_3play_callback(self, state='complete'):
        """
        Make request to 3PlayMedia callback handler, this invokes
        callback with all the necessary parameters.

        Arguments:
            state(str): state of the callback
        """
        response = self.client.post(
            # `build_url` strips `/`, putting it back and add necessary query params.
            '/{}'.format(build_url(
                self.url, edx_video_id=self.video.studio_id,
                org=self.org, lang_code=self.video_source_language
            )),
            content_type='application/x-www-form-urlencoded',
            data=urllib.urlencode(dict(file_id=self.file_id, status=state))
        )
        return response
项目:pylogctx    作者:peopledoc    | 项目源码 | 文件源码
def test_adapter_mro(context):
    from pylogctx import log_adapter

    class Parent(object):
        pass

    class Child(Parent):
        pass

    @log_adapter(Parent)
    def parent_log_maker(instance):
        return dict(parent=id(instance))

    context.update(Child())

    data = context.as_dict()
    assert 'parent' in data
项目:pylogctx    作者:peopledoc    | 项目源码 | 文件源码
def test_adapter_manager(context):
    from pylogctx import log_adapter

    class Parent(object):
        pass

    class Child(Parent):
        pass

    @log_adapter(Parent)
    def parent_log_maker(instance):
        return dict(parent=id(instance))

    with context(Child()):
        data = context.as_dict()
        assert 'parent' in data
项目:dfkernel    作者:dataflownb    | 项目源码 | 文件源码
def setup():
    """setup temporary env for tests"""
    global tmp
    tmp = tempfile.mkdtemp()
    patchers[:] = [
        patch.dict(os.environ, {
            'HOME': tmp,
            # Let tests work with --user install when HOME is changed:
            'PYTHONPATH': os.pathsep.join(sys.path),
        }),
    ]
    for p in patchers:
        p.start()

    # install IPython in the temp home:
    install(user=True)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_format_descr_wo_notes(self):
        """
        Tests the _format_description method when the alert has no notes.
        """
        self.mock_settings['INCLUDE_FULL_DESCRIPTION'] = False
        with patch('ambassador.transport.Transport.get_key_cert',
                   return_value=self.mock_cert):
            with patch('platforms.jira.handlers.jira.JIRA',
                       return_value=self.mock_auth):
                with patch.dict('platforms.jira.handlers.settings.JIRA',
                                self.mock_settings):
                    handler_w_user = IssueAPI(endpoint=self.endpoint,
                                              user=self.user)
                    alert = Alert.objects.get(pk=2)
                    actual = handler_w_user._format_description(alert)
                    expected = ''
        self.assertEqual(actual, expected)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_allowed_file(self):
        """
        Tests the get_file_path function for an allowed file type.
        """
        self.msg.attach(self.image)
        attachment = get_first_attachment(self.msg)
        mock_uuid = Mock()
        mock_uuid.hex = self.uuid
        with patch('sifter.mailsifter.attachments.uuid.uuid4',
                   return_value=mock_uuid):
            with patch.dict('sifter.mailsifter.attachments.settings.MAILSIFTER',
                            self.mock_mailsifter_settings):
                actual = attachments.get_file_path(attachment, self.company)
                expected = '%s/test-attachments/%s.jpeg' \
                           % (self.company.uuid.hex, self.uuid)
                self.assertEqual(actual, expected)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_unallowed_file(self):
        """
        Tests the get_file_path function for an unallowed file type.
        """
        self.mock_mailsifter_settings['ALLOWED_EMAIL_ATTACHMENTS'] = ('application/java',)
        with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                        self.mock_mailsifter_settings):
            self.msg.attach(self.java)
            attachment = get_first_attachment(self.msg)
            with patch('sifter.mailsifter.attachments.settings',
                       return_value=self.mock_settings):
                with LogCapture() as log_capture:
                    actual = attachments.get_file_path(attachment)
                    expected = None
                    self.assertEqual(actual, expected)
                    msg = 'The attachment %s is not an allowed file type' \
                          % self.java_file
                    log_capture.check(
                        ('sifter.mailsifter.attachments', 'WARNING', msg),
                    )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_no_file_path(self):
        """
        Tests the save_attachment function.
        """
        mock_settings_1 = {
            'ALLOWED_EMAIL_ATTACHMENTS': ('application/java',)
        }
        with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                        mock_settings_1):
            self.msg.attach(self.java)
            attachment = get_first_attachment(self.msg)
            with patch('sifter.mailsifter.attachments.settings',
                       self.mock_settings):
                with LogCapture() as log_capture:
                    actual = attachments.save_attachment(attachment)
                    expected = None
                    self.assertEqual(actual, expected)
                    msg = 'The attachment %s is not an allowed file type' \
                          % self.java_file
                    log_capture.check(
                        ('sifter.mailsifter.attachments', 'WARNING', msg),
                    )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_match_with_default(self):
        """
        Tests the process_email receiver for an email that matches an
        existing MailChute.
        """
        doc_obj = self.doc_obj
        doc_obj.data['Subject'] = 'critical alert'
        mock_config = {
            'DEFAULT_MUNGER': 'default_mail',
            'DEFAULT_MUNGER_ENABLED': True
        }
        with patch('distilleries.models.Distillery.save_data',
                   return_value='id_123') as mock_save:
            with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                            mock_config):
                with patch('sifter.mailsifter.mailchutes.models.MailChuteManager._process_with_default') \
                        as mock_catch_email:
                    MailChute.objects.process(doc_obj)
                    self.assertIs(mock_save.called, True)
                    self.assertIs(mock_catch_email.called, False)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_no_match_with_default(self):
        """
        Tests the process_email receiver for an email that doesn't match
        an existing MailChute when a default MailMunger is enabled.
        """
        doc_obj = self.doc_obj
        doc_obj.data['Subject'] = 'nothing to see here'
        mock_config = {
            'DEFAULT_MUNGER': 'default_mail',
            'DEFAULT_MUNGER_ENABLED': True
        }
        with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                        mock_config):
            with patch('sifter.mailsifter.mailchutes.models.MailChuteManager._process_with_default') \
                    as mock_default_process:
                MailChute.objects.process(doc_obj)
                mock_default_process.assert_called_once_with(doc_obj)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_no_match_missing_munger(self):
        """
        Tests the process_email receiver for an email that doesn't match
        an existing MailChute when a default MailChute is enabled but
        the defaul MailMunger can't be found.
        """
        doc_obj = self.doc_obj
        doc_obj.data['Subject'] = 'nothing to see here'
        mock_config = {
            'DEFAULT_MUNGER': 'missing_munger',
            'DEFAULT_MUNGER_ENABLED': True
        }
        with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                        mock_config):
            with LogCapture() as log_capture:
                msg = 'Default MailMunger "missing_munger" is not configured.'
                MailChute.objects.process(doc_obj)
                log_capture.check(
                    ('sifter.chutes.models', 'ERROR', msg),
                )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_get_default_no_chute(self):
        """
        Tests the _default_munger function when the default LogMunger
        does not exist.
        """
        mock_config = {
            'DEFAULT_MUNGER': 'dummy_munger',
            'DEFAULT_MUNGER_ENABLED': True
        }
        with patch.dict('sifter.logsifter.logchutes.models.conf.LOGSIFTER',
                        mock_config):
            with LogCapture() as log_capture:
                actual = LogChute.objects._default_munger
                expected = None
                self.assertEqual(actual, expected)
                self.assertFalse(LogChute.objects._default_munger_enabled)
                log_capture.check(
                    ('sifter.chutes.models',
                     'ERROR',
                     'Default LogMunger "dummy_munger" is not configured.'),
                )
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_raise_no_supported_snmp_backend_found_raised_if_no_snmp_libraries_are_available(
            self):
        modules = {
            'pynetsnmp': None,
        }

        with patch.dict(sys.modules, modules):
            self._patch_cleaning(sys.modules)
            pytest.raises(ImportError, 'import pynetsnmp')

            pytest.raises(ImportError, 'from nav.Snmp.pynetsnmp import *')

            try:
                from nav.Snmp import Snmp
                pytest.fail("Should never happen")
            except ImportError as foo:
                assert str(foo) == 'No supported SNMP backend was found'
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def setUp(self):
        self.plugin_a = Mock(
            name='snmptrapd plugin a')
        self.plugin_a.strip.return_value = 'nav.snmptrapd.handlers.foo'

        self.plugin_b = Mock(
            name='snmptrapd plguin b')
        self.plugin_b.strip.return_value = 'nav.snmptrapd.handlers.bar'
        del self.plugin_b.initialize

        def raise_exception():
            raise Exception('boom')

        self.bad_plugin = Mock(name='snmptrapd plugin which is bad')
        self.bad_plugin.strip.return_value = 'nav.snmptrapd.handlers.bad_plugin'
        self.bad_plugin.initialize=raise_exception

        self.patcher = patch.dict(sys.modules, {
            'nav.snmptrapd.handlers.foo': self.plugin_a,
            'nav.snmptrapd.handlers.bar': self.plugin_b,
            'nav.snmptrapd.handlers.bad_plugin': self.bad_plugin,
            'nav.snmptrapd.handlers.non_existent': None
        })
        self.patcher.start()
项目:lambada    作者:Superpedestrian    | 项目源码 | 文件源码
def test_bouncer_env(self):
        """
        Validate the environment variable loader works as expected.
        """
        # Test it doesn't leak
        with patch.dict('lambada.os.environ', dict(BLAH='asdf'), clear=True):
            config = lambada.get_config_from_env()
            self.assertFalse(config)

        # Test it works with default
        with patch.dict(
            'lambada.os.environ', dict(BOUNCER_BLAH='asdf'), clear=True
        ):
            config = lambada.get_config_from_env()
            self.assertDictEqual(config, dict(blah='asdf'))

        # Test that we can override the prefix
        with patch.dict(
            'lambada.os.environ',
            dict(STUFF_BLAH='asdf', STUFFS_BLAH='jkl;'),
            clear=True
        ):
            config = lambada.get_config_from_env('STUFF_')
            self.assertDictEqual(config, dict(blah='asdf'))
项目:lambada    作者:Superpedestrian    | 项目源码 | 文件源码
def test_dancer_class(self):
        """
        Verify the dancer class constructor, props, and call.
        """

        dancer = lambada.Dancer(self.sample_dancer, 'foo', 'bar', memory=128)
        # Confirm __init__
        self.assertEqual(dancer.name, 'foo')
        self.assertEqual(dancer.description, 'bar')
        self.assertDictEqual(dancer.override_config, dict(memory=128))
        # Validate config merge property
        self.assertDictEqual(
            dancer.config,
            dict(name='foo', description='bar', memory=128)
        )
        self.assertEqual((1, 2), dancer(1, 2))
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def test_keeptemp_via_env_variable():

    if os.environ.get('NICEMAN_TESTS_KEEPTEMP'):
        raise SkipTest("We have env variable set to preserve tempfiles")

    files = []

    @with_tempfile()
    def check(f):
        open(f, 'w').write("LOAD")
        files.append(f)

    with patch.dict('os.environ', {}):
        check()

    with patch.dict('os.environ', {'NICEMAN_TESTS_KEEPTEMP': '1'}):
        check()

    eq_(len(files), 2)
    ok_(not exists(files[0]), msg="File %s still exists" % files[0])
    ok_(    exists(files[1]), msg="File %s not exists" % files[1])

    rmtemp(files[-1])
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def test_skip_if_no_network():
    cleaned_env = os.environ.copy()
    cleaned_env.pop('NICEMAN_TESTS_NONETWORK', None)
    # we need to run under cleaned env to make sure we actually test in both conditions
    with patch('os.environ', cleaned_env):
        @skip_if_no_network
        def somefunc(a1):
            return a1
        eq_(somefunc.tags, ['network'])
        with patch.dict('os.environ', {'NICEMAN_TESTS_NONETWORK': '1'}):
            assert_raises(SkipTest, somefunc, 1)
        with patch.dict('os.environ', {}):
            eq_(somefunc(1), 1)
        # and now if used as a function, not a decorator
        with patch.dict('os.environ', {'NICEMAN_TESTS_NONETWORK': '1'}):
            assert_raises(SkipTest, skip_if_no_network)
        with patch.dict('os.environ', {}):
            eq_(skip_if_no_network(), None)
项目:windflow    作者:hartym    | 项目源码 | 文件源码
def test_load_settings_from_env(tmpdir):
    with patch.dict('os.environ', {}) as e:
        load_settings_from_env(str(tmpdir))
        assert os.environ.get('FOO', None) is None

    p = tmpdir.join(".env")
    p.write("FOO = bar")

    with patch.dict('os.environ', {}) as e:
        load_settings_from_env(str(tmpdir))
        assert os.environ.get('FOO', None) == 'bar'
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_handle_content_type_json(self, mocklast, mockjsonify,
                                      mockrender, mockrequest):
        fake_d = {'Content-Type': 'application/json'}
        mockrequest.headers.__getitem__.side_effect = fake_d.__getitem__
        mockrequest.headers.get.side_effect = fake_d.get
        mockrequest.headers.__iter__.side_effect = fake_d.__iter__
        mockjsonify.side_effect = myjsonify
        res = util.handle_content_type(dict(template='example.html'))
        err_msg = "template key should exist"
        assert res.get('template') == 'example.html', err_msg
        err_msg = "jsonify should be called"
        assert mockjsonify.called, err_msg