Python unittest.mock 模块,Mock() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用unittest.mock.Mock()

项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_get_cached_content_ids__calls(self, mock_get, mock_queryset):
        """Verify which Redis calls ``get_cached_content_ids`` issues.

        Without ``last_id`` only ``zrevrange`` is expected, starting at index 0.
        With ``last_id`` set, ``zrevrank`` is expected to be queried and
        ``zrevrange`` to start one past the rank it returned.
        """
        stream = self.stream
        redis_mock = Mock()
        redis_mock.zrevrange = Mock(return_value=[])
        mock_get.return_value = redis_mock
        stream.stream_type = StreamType.PUBLIC

        # First pass: no last_id, so the rank lookup must be skipped.
        stream.get_cached_content_ids()
        self.assertFalse(redis_mock.zrevrank.called)
        redis_mock.zrevrange.assert_called_once_with(stream.key, 0, stream.paginate_by)

        # Second pass: clear the call history, then paginate after last_id.
        redis_mock.reset_mock()
        rank = 3
        stream.last_id = self.content2.id
        redis_mock.zrevrank.return_value = rank
        stream.get_cached_content_ids()
        redis_mock.zrevrank.assert_called_once_with(stream.key, self.content2.id)
        redis_mock.zrevrange.assert_called_once_with(
            stream.key, rank + 1, rank + 1 + stream.paginate_by,
        )
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_get_cached_range(self, mock_get, mock_queryset):
        """Check the ids/throughs returned by ``get_cached_range`` and its Redis calls.

        The mocked ``zrevrange`` and ``hmget`` both return the two content ids as
        strings; the test expects integer ids back and the range to span
        ``index`` to ``index + paginate_by``.
        """
        stream = self.stream
        stream.stream_type = StreamType.PUBLIC
        second_id, first_id = self.content2.id, self.content1.id
        zrevrange_mock = Mock(return_value=[str(second_id), str(first_id)])
        hmget_mock = Mock(return_value=[str(second_id), str(first_id)])
        mock_get.return_value = Mock(zrevrange=zrevrange_mock, hmget=hmget_mock)

        ids, throughs = stream.get_cached_range(0)

        self.assertEqual(ids, [second_id, first_id])
        self.assertEqual(throughs, {second_id: second_id, first_id: first_id})
        zrevrange_mock.assert_called_once_with(stream.key, 0, 0 + stream.paginate_by)
        hmget_mock.assert_called_once_with(
            BaseStream.get_throughs_key(stream.key), keys=[second_id, first_id],
        )

        # Non-zero index
        start = 5
        zrevrange_mock.reset_mock()
        stream.get_cached_range(start)
        zrevrange_mock.assert_called_once_with(stream.key, start, start + stream.paginate_by)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_event_registration(self):
        """Check ``add_event_callback`` results for valid, missing and invalid events."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # A known event group is accepted.
        accepted = controller.add_event_callback(TIMELINE.ALARM_GROUP, handler)
        self.assertTrue(accepted)

        # Passing no event group yields a falsy result.
        rejected = controller.add_event_callback(None, handler)
        self.assertFalse(rejected)

        # An unknown event name must raise.
        with self.assertRaises(abodepy.AbodeException):
            controller.add_event_callback("lol", handler)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_timeline_registration(self):
        """Check ``add_timeline_callback`` results for valid, missing and invalid events."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # A known timeline event is accepted.
        accepted = controller.add_timeline_callback(TIMELINE.CAPTURE_IMAGE, handler)
        self.assertTrue(accepted)

        # Passing no timeline event yields a falsy result.
        rejected = controller.add_timeline_callback(None, handler)
        self.assertFalse(rejected)

        # Both an unknown string and an unknown dict must raise.
        for invalid_event in ("lol", {"lol": "lol"}):
            with self.assertRaises(abodepy.AbodeException):
                controller.add_timeline_callback(invalid_event, handler)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_multi_events_callback(self):
        """Register one callback for two event groups and expect it to receive a timeline update."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # One registration covering both groups.
        groups = [TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP]
        self.assertTrue(controller.add_event_callback(groups, handler))

        # Feed the IR camera timeline event straight into the update hook.
        # pylint: disable=protected-access
        payload = json.loads(IRCAMERA.timeline_event())
        controller._on_timeline_update(payload)

        # The callback must have been invoked with that payload.
        handler.assert_called_with(payload)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_multi_timeline_callback(self):
        """Register one callback for two timeline events and expect it to receive an update."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # One registration covering both timeline events.
        timeline_events = [TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED]
        self.assertTrue(controller.add_timeline_callback(timeline_events, handler))

        # Feed the IR camera timeline event straight into the update hook.
        # pylint: disable=protected-access
        payload = json.loads(IRCAMERA.timeline_event())
        controller._on_timeline_update(payload)

        # The callback must have been invoked with that payload.
        handler.assert_called_with(payload)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_automations_callback(self):
        """Expect a callback on the automation-edit group to receive automation updates."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # Register for the automation edit group.
        registered = controller.add_event_callback(
            TIMELINE.AUTOMATION_EDIT_GROUP, handler)
        self.assertTrue(registered)

        # Push an (empty JSON) automation update through the update hook.
        # pylint: disable=protected-access
        update = '{}'
        controller._on_automation_update(update)

        # The callback must have been invoked with the same update.
        handler.assert_called_with(update)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_no_modifications(self):
        """Check when ``write_all_blocks`` calls ``_write_attr_annotations``.

        The writer is replaced by a mock. It must not be called on the first
        write or after the stored hashes are set to ``None``; after every hash
        is set to ``"_"`` it must be called once per stored hash.
        """
        io = self.io
        io._write_attr_annotations = mock.Mock()

        def overwrite_hashes(value):
            # Replace every stored hash in place, keeping the same keys.
            for key in io._object_hashes.keys():
                io._object_hashes[key] = value

        io.write_all_blocks(self.neo_blocks)
        io._write_attr_annotations.assert_not_called()
        self.compare_blocks(self.neo_blocks, io.nix_file.blocks)

        # clearing hashes and checking again
        overwrite_hashes(None)
        io.write_all_blocks(self.neo_blocks)
        io._write_attr_annotations.assert_not_called()

        # changing hashes to force rewrite
        overwrite_hashes("_")
        io.write_all_blocks(self.neo_blocks)
        self.assertEqual(io._write_attr_annotations.call_count, len(io._object_hashes))
        self.compare_blocks(self.neo_blocks, io.nix_file.blocks)
项目:azure-python-devtools    作者:Azure    | 项目源码 | 文件源码
def test_subscription_recording_processor_for_request(self):
        """Expect ``process_request`` to swap the subscription id in both URI and body."""
        replacement_id = str(uuid.uuid4())
        processor = SubscriptionRecordingProcessor(replacement_id)

        management_template = (
            'https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
            'checkNameAvailability?api-version=2017-03-01'
        )
        graph_template = 'https://graph.windows.net/{}/applications?api-version=1.6'

        for template in (management_template, graph_template):
            # Each request starts with its own random id that must be replaced.
            original_id = str(uuid.uuid4())
            request = mock.Mock()
            request.uri = template.format(original_id)
            request.body = self._mock_subscription_request_body(original_id)

            processor.process_request(request)

            expected_uri = template.format(replacement_id)
            expected_body = self._mock_subscription_request_body(replacement_id)
            self.assertEqual(request.uri, expected_uri)
            self.assertEqual(request.body, expected_body)
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_getattr_with_under(self, *args, **kwrags):
        """Expect an underscore-prefixed attribute of a collection to come from its delegate."""
        name = 'blabla'
        delegate = MagicMock()
        delegate.name = name
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        connection.delegate = delegate

        # Build asyncio-flavoured database and collection classes, then instantiate them.
        db_class = create_class_with_framework(
            core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
        db = db_class(connection, name)
        coll_class = create_class_with_framework(
            core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
        coll = coll_class(db, name)

        self.assertEqual(coll._get_write_mode, coll.delegate._get_write_mode)
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_getitem(self, *args, **kwargs):
        """Expect indexing a collection to return an object of the same collection class."""
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        delegate = MagicMock()
        delegate.name = 'blabla'
        connection.delegate = delegate
        name = 'blabla'
        db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
                                         asyncio_framework,
                                         self.__module__)(connection, name)

        coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
                                           asyncio_framework,
                                           self.__module__)
        coll = coll(db, name)

        other_coll = coll['other']
        # assertIsInstance reports the actual type on failure, unlike
        # assertTrue(isinstance(...)) which only says "False is not true".
        self.assertIsInstance(other_coll, type(coll))
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_find(self, *args, **kwargs):
        """Expect ``find()`` on a collection to return an instance of the cursor class."""
        name = 'blabla'
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        delegate = MagicMock()
        delegate.name = name
        connection.delegate = delegate

        # Build asyncio-flavoured database, collection and cursor classes.
        db_class = create_class_with_framework(
            core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
        db = db_class(connection, name)
        coll_class = create_class_with_framework(
            core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
        coll = coll_class(db, name)
        cursor_class = create_class_with_framework(
            core.SanicMongoAgnosticCursor, asyncio_framework, self.__module__)

        self.assertIsInstance(coll.find(), cursor_class)
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_get_item(self, *args, **kwargs):
        """Expect indexing a database to return an instance of the collection class."""
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        delegate = MagicMock()
        connection.delegate = delegate
        name = 'blabla'
        db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
                                         asyncio_framework,
                                         self.__module__)(connection, name)

        comp_coll = create_class_with_framework(
            core.SanicMongoAgnosticCollection,
            asyncio_framework,
            self.__module__)

        coll = db['bla']
        # assertIsInstance reports the actual type on failure, unlike
        # assertTrue(isinstance(...)) which only says "False is not true".
        self.assertIsInstance(coll, comp_coll)
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_asynchornize(self):
        """Expect an ``asynchronize``-decorated method to return a Future and run its body.

        This is a generator-based coroutine (``yield from``); the decorator that
        drives it is not visible in this snippet.
        """
        test_mock = Mock()

        class TestClass:

            @classmethod
            def _get_db(cls):
                db = Mock()
                db._framework = asyncio_framework
                return db

            @metaprogramming.asynchronize
            def sync(self):
                test_mock()

        testobj = TestClass()
        # assertIsInstance reports the actual type on failure, unlike
        # assertTrue(isinstance(...)) which only says "False is not true".
        self.assertIsInstance(testobj.sync(), Future)
        yield from testobj.sync()
        self.assertTrue(test_mock.called)
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_asynchornize_with_exception(self):
        """Expect an exception raised inside an ``asynchronize``-decorated method to propagate."""

        class Failing:

            @classmethod
            def _get_db(cls):
                fake_db = Mock()
                fake_db._framework = asyncio_framework
                return fake_db

            @metaprogramming.asynchronize
            def sync(self):
                raise Exception

        failing = Failing()
        with self.assertRaises(Exception):
            yield from failing.sync()
项目:SanicMongo    作者:beepaste    | 项目源码 | 文件源码
def test_create_attribute_with_attribute_error(self):
        """Expect ``create_attribute`` to raise AttributeError for a name the base class lacks."""
        test_mock = Mock()

        class Base:

            @classmethod
            def _get_db(cls):
                fake_db = Mock()
                fake_db._framework = asyncio_framework
                return fake_db

            def some_method(self):
                test_mock()

        class Derived(Base):

            some_method = metaprogramming.Async()

        # 'some_other_method' is not defined on Base, only 'some_method' is.
        with self.assertRaises(AttributeError):
            Derived.some_method = Derived.some_method.create_attribute(
                Derived, 'some_other_method')
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_errors_reset():
    """A 'reset-errors' instruction must set ``error_reset``; ``run()`` must then post no comment."""
    from jenkins_epo.bot import Error, Instruction
    from jenkins_epo.utils import parse_datetime
    from jenkins_epo.extensions.core import ErrorExtension

    extension = ErrorExtension('error', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.denied_instructions = []
    state.errors = [Error('message', parse_datetime('2016-08-03T15:58:47Z'))]
    state.error_reset = None

    reset = Instruction(name='reset-errors', author='bot', date=datetime.utcnow())
    extension.process_instruction(reset)
    assert extension.current.error_reset

    yield from extension.run()

    assert not extension.current.head.comment.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_error_denied():
    """Check comment behaviour around denied instructions.

    'reset-denied' must empty ``denied_instructions`` and ``run()`` must then
    post no comment; once a denied instruction is present again, ``run()``
    must post a comment.
    """
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import ErrorExtension

    extension = ErrorExtension('error', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.denied_instructions = [Mock()]
    state.errors = []
    state.error_reset = None

    extension.process_instruction(Instruction(name='reset-denied', author='bot'))
    assert not extension.current.denied_instructions

    yield from extension.run()
    assert not extension.current.head.comment.mock_calls

    # A fresh denied instruction must now trigger a comment.
    extension.current.denied_instructions = [Mock(author='toto')]
    yield from extension.run()
    assert extension.current.head.comment.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_yml_found(mocker, SETTINGS):
    """Expect ``run()`` to fetch the YAML file and keep 'job' but not 'skip' in ``job_specs``."""
    # Patch before importing the extension, as the original test does.
    GITHUB = mocker.patch('jenkins_epo.extensions.core.GITHUB')
    Job = mocker.patch('jenkins_epo.extensions.core.Job')
    from jenkins_epo.extensions.core import YamlExtension

    Job.jobs_filter = ['*', '-skip']
    SETTINGS.update(YamlExtension.SETTINGS)

    extension = YamlExtension('ext', Mock())
    extension.current = extension.bot.current
    extension.current.yaml = {'job': {}}

    GITHUB.fetch_file_contents = CoroutineMock(return_value="job: command\nskip: command")

    repository = extension.current.head.repository
    repository.url = 'https://github.com/owner/repo.git'
    repository.jobs = {}

    yield from extension.run()

    assert GITHUB.fetch_file_contents.mock_calls
    job_specs = extension.current.job_specs
    assert 'job' in job_specs
    assert 'skip' not in job_specs
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_yml_comment_dict():
    """Expect 'yaml' and 'params' instructions to both end up in the job's parameters."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import YamlExtension

    extension = YamlExtension('ext', Mock())
    extension.current = extension.bot.current
    extension.current.yaml = {}

    yaml_instruction = Instruction(
        author='a', name='yaml', args={'job': {'parameters': {'PARAM1': 1}}},
    )
    params_instruction = Instruction(
        author='a', name='params', args={'job': {'PARAM2': 1}},
    )
    for instruction in (yaml_instruction, params_instruction):
        extension.process_instruction(instruction)

    assert 'PARAM1' in extension.current.yaml['job']['parameters']
    assert 'PARAM2' in extension.current.yaml['job']['parameters']
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_job_update():
    """Expect ``process_job_specs`` to yield ``job.update`` when the job lacks the spec."""
    from jenkins_epo.extensions.jenkins import CreateJobsExtension

    extension = CreateJobsExtension('createjob', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.refresh_jobs = None

    spec = Mock(config={})
    spec.name = 'new_job'  # ``name`` cannot be passed to the Mock constructor as an attribute
    state.job_specs = {'new_job': spec}

    job = Mock()
    job.spec.contains.return_value = False
    state.jobs = {'new_job': job}

    results = list(extension.process_job_specs())
    assert results

    action, _ = results[0]
    assert action == job.update
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_jenkins_fails_existing(mocker):
    """Expect a failing ``job.update`` to be called and recorded in ``current.errors``."""
    # Patch before importing the extension, as the original test does.
    process_job_specs = mocker.patch(
        'jenkins_epo.extensions.jenkins.CreateJobsExtension.process_job_specs'
    )

    from jenkins_yml.job import Job as JobSpec
    from jenkins_epo.extensions.jenkins import CreateJobsExtension

    extension = CreateJobsExtension('createjob', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.errors = []
    state.head.sha = 'cafed0d0'
    state.head.repository.jobs = {'job': Mock()}
    state.job_specs = {'job': JobSpec.factory('job', 'toto')}

    job = Mock()
    state.jobs = {'job': job}
    job.update = CoroutineMock(side_effect=Exception('POUET'))

    # The patched generator hands back the failing update action.
    process_job_specs.return_value = [(job.update, Mock())]

    yield from extension.run()

    assert extension.current.errors
    assert job.update.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_second_stage():
    """With stages build/test and specs 'test' and 'missing', expect the current stage to be 'test'."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    extension = StagesExtension('stages', Mock())
    extension.current = Mock()
    state = extension.current
    state.head.ref = 'pr'
    state.SETTINGS.STAGES = ['build', 'test']

    specs = {}
    for spec_name in ('test', 'missing'):
        spec = Mock(config={})
        spec.name = spec_name
        specs[spec_name] = spec
    state.job_specs = specs

    test_job = Mock()
    test_job.list_contexts.return_value = ['test']
    state.jobs = {'test': test_job}

    state.statuses = {}

    yield from extension.run()

    assert extension.current.current_stage.name == 'test'
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_no_test_stage():
    """With stages build/deploy and only a 'build' spec, expect stage 'build' and the spec kept."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    extension = StagesExtension('stages', Mock())
    extension.current = Mock()
    state = extension.current
    state.head.ref = 'pr'
    state.SETTINGS.STAGES = ['build', 'deploy']

    build_spec = Mock(config={})
    build_spec.name = 'build'
    state.job_specs = {'build': build_spec}

    build_job = Mock()
    build_job.list_contexts.return_value = ['build']
    state.jobs = {'build': build_job}

    state.statuses = {}

    yield from extension.run()

    assert 'build' == extension.current.current_stage.name
    assert 'build' in extension.current.job_specs
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_branches_limit():
    """A spec limited to branch 'master' must be dropped from ``job_specs`` when head ref is 'pr'."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    extension = StagesExtension('stages', Mock())
    extension.current = Mock()
    state = extension.current
    state.head.ref = 'pr'
    state.SETTINGS.STAGES = ['test']

    limited_spec = Mock(config={'branches': 'master'})
    limited_spec.name = 'job'
    state.job_specs = {'job': limited_spec}

    job = Mock()
    job.list_contexts.return_value = ['job']
    state.jobs = {'job': job}

    state.statuses = {}

    yield from extension.run()

    assert extension.current.current_stage.name == 'test'
    assert 'job' not in extension.current.job_specs
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_comment_deny():
    """Expect ``run()`` to post a comment when a denied 'opm' is newer than the last commit."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = []
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = {}
    denied = Instruction(
        author='noncollaborator', name='opm',
        date=commit_date + timedelta(hours=1),
    )
    state.opm_denied = [denied]

    yield from extension.run()

    assert extension.current.head.comment.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_deny_outdated_opm():
    """An 'opm' dated one hour before the last commit must leave ``current.opm`` empty."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = ['collaborator']
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = {}
    state.opm_denied = []

    outdated = Instruction(
        author='collaborator', name='opm',
        date=commit_date - timedelta(hours=1),
    )
    extension.process_instruction(outdated)

    assert not extension.current.opm
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_deny_non_collaborator_processed():
    """An 'opm-processed' instruction must leave both ``opm`` and ``opm_denied`` empty."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = []
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = None
    state.opm_denied = [Instruction(author='noncollaborator', name='opm')]

    processed = Instruction(
        author='bot', name='opm-processed',
        date=commit_date + timedelta(hours=2),
    )
    extension.process_instruction(processed)

    assert not extension.current.opm
    assert not extension.current.opm_denied
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_accept_lgtm():
    """An 'opm' from a collaborator, newer than the last commit, must be stored in ``current.opm``."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = ['collaborator']
    commit_date = datetime.now()
    state.last_commit.date = commit_date
    state.opm = None
    state.opm_denied = []

    approval = Instruction(
        author='collaborator', name='opm',
        date=commit_date + timedelta(hours=1),
    )
    extension.process_instruction(approval)

    assert 'collaborator' == extension.current.opm.author
    assert not extension.current.opm_denied
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_merge_wip():
    """With ``wip`` set, ``run()`` must comment mentioning the opm author and must not merge."""
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = ['collaborator']
    state.last_commit.date = datetime.now()
    state.opm.author = 'collaborator'
    state.opm.date = datetime.now()
    state.opm_denied = []
    state.opm_processed = None
    state.wip = True

    yield from extension.run()

    head = extension.current.head
    assert head.comment.mock_calls
    # The comment body is passed as a keyword argument.
    comment_kwargs = head.comment.call_args[1]
    assert '@collaborator' in comment_kwargs['body']
    assert not head.merge.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_merge_wip_skip_outdated():
    """With ``opm_processed`` newer than the opm, ``run()`` must neither comment nor merge."""
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = ['collaborator']
    state.last_commit.date = datetime.now()
    state.opm.author = 'collaborator'
    opm_date = datetime.now()
    state.opm.date = opm_date
    state.opm_denied = []
    state.wip = True
    # Mark the opm as processed five minutes after it was issued.
    state.opm_processed = Mock(date=opm_date + timedelta(minutes=5))

    yield from extension.run()

    head = extension.current.head
    assert not head.comment.mock_calls
    assert not head.merge.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_merge_fail():
    """When ``head.merge`` raises ``ApiError``, the branch must not be deleted."""
    from jenkins_epo.extensions.core import MergerExtension, ApiError

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.SETTINGS.COLLABORATORS = ['collaborator']
    state.last_commit.date = datetime.now()
    state.opm.author = 'collaborator'
    state.opm_denied = []
    state.wip = None
    state.last_commit.fetch_combined_status = CoroutineMock(
        return_value={'state': 'success'}
    )
    merge_error = ApiError('url', {}, {'code': 405, 'json': {'message': "error"}})
    state.head.merge.side_effect = merge_error

    yield from extension.run()

    assert not extension.current.head.delete_branch.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_merge_success():
    """With a successful combined status, ``run()`` must merge, delete the branch and not comment."""
    from jenkins_epo.extensions.core import MergerExtension

    extension = MergerExtension('merger', Mock())
    extension.current = Mock()
    state = extension.current
    state.opm = Mock(author='author')
    state.opm_denied = []
    state.last_merge_error = None
    state.last_commit.fetch_combined_status = CoroutineMock(
        return_value={'state': 'success'}
    )
    state.wip = None

    yield from extension.run()

    head = extension.current.head
    assert head.merge.mock_calls
    assert not head.comment.mock_calls
    assert head.delete_branch.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_process_allow():
    """An 'allow' from a collaborator must add the head author and clear denied instructions."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import SecurityExtension

    extension = SecurityExtension('sec', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.head.author = 'contributor'
    state.security_feedback_processed = None
    state.SETTINGS.COLLABORATORS = ['owner']
    state.denied_instructions = [Instruction(author='contributor', name='skip')]

    allow = Instruction(author='owner', name='allow')
    extension.process_instruction(allow)

    assert 'contributor' in extension.current.SETTINGS.COLLABORATORS
    assert not extension.current.denied_instructions
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_skip_outdated():
    """An outdated build must not be checked for running state nor queued for cancellation."""
    from jenkins_epo.extensions.jenkins import PollExtension

    extension = PollExtension('test', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.head.sha = 'cafed0d0'
    state.cancel_queue = []

    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    build = Mock()
    build.is_outdated = True
    builds = [build]
    job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = builds
    state.jobs = {'job': job}

    yield from extension.run()

    assert not builds[0].is_running.mock_calls
    assert 0 == len(extension.current.cancel_queue)
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_skip_build_not_running():
    """A build that is neither outdated nor running must not be queued for cancellation."""
    from jenkins_epo.extensions.jenkins import PollExtension

    extension = PollExtension('test', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.head.sha = 'cafed0d0'
    state.cancel_queue = []

    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    build = Mock()
    build.is_outdated = False
    build.is_running = False
    job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = [build]
    state.jobs = {'job': job}

    yield from extension.run()

    assert 0 == len(extension.current.cancel_queue)
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_skip_current_sha(mocker):
    """A running build on the head's own ref and sha must not be queued for cancellation."""
    from jenkins_epo.extensions.jenkins import PollExtension

    extension = PollExtension('test', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.cancel_queue = []
    state.head.ref = 'branch'
    state.head.sha = 'bab1'

    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    job = Mock()
    job.list_contexts.return_value = []
    job.fetch_builds = CoroutineMock()
    state.jobs = {'job': job}

    # The build matches the head exactly: same ref, same sha.
    build = Mock()
    build.is_outdated = False
    build.is_running = True
    build.ref = 'branch'
    build.sha = state.head.sha
    job.process_builds.return_value = [build]

    yield from extension.run()

    assert 0 == len(extension.current.cancel_queue)
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_cancel(mocker):
    """A running build on the head's ref but another sha must be queued for cancellation."""
    from jenkins_epo.extensions.jenkins import PollExtension

    extension = PollExtension('test', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.cancel_queue = []
    state.head.ref = 'branch'
    state.head.sha = 'bab1'

    spec = Mock()
    spec.name = 'job'
    state.job_specs = {'job': spec}

    job = Mock()
    job.fetch_builds = CoroutineMock()
    state.jobs = {'job': job}

    # Same ref as the head, but a different sha.
    build = Mock()
    build.is_outdated = False
    build.is_running = True
    build.ref = 'branch'
    build.sha = '01d'
    build.url = 'url://'
    build.commit_status = {}
    job.process_builds.return_value = [build]

    yield from extension.run()

    assert 1 == len(extension.current.cancel_queue)
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_cancel_ignore_other(mocker):
    """When ``Build.from_url`` raises ``NotOnJenkins``, the commit status must not be updated."""
    # Patch before importing the extension, as the original test does.
    Build = mocker.patch('jenkins_epo.extensions.jenkins.Build')
    from jenkins_epo.extensions.jenkins import (
        CancellerExtension, CommitStatus, NotOnJenkins
    )

    Build.from_url = CoroutineMock(side_effect=NotOnJenkins())

    commit = Mock()

    extension = CancellerExtension('test', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.head.sha = 'cafed0d0'
    state.poll_queue = []
    foreign_status = CommitStatus(
        context='ci/...', target_url='circleci://1', state='pending',
    )
    state.cancel_queue = [(commit, foreign_status)]
    state.last_commit.fetch_statuses.return_value = []
    state.last_commit.maybe_update_status = CoroutineMock()

    yield from extension.run()

    assert not commit.maybe_update_status.mock_calls
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_poll_lost_build(mocker):
    """A queued status with a Jenkins URL must be looked up via ``Build.from_url`` and updated."""
    # Patch before importing the extension, as the original test does.
    JENKINS = mocker.patch('jenkins_epo.extensions.jenkins.JENKINS')
    Build = mocker.patch('jenkins_epo.extensions.jenkins.Build')
    from jenkins_epo.extensions.jenkins import CancellerExtension, CommitStatus

    commit = Mock()
    commit.maybe_update_status = CoroutineMock()

    extension = CancellerExtension('test', Mock())
    extension.current = extension.bot.current
    state = extension.current
    state.head.sha = 'cafed0d0'
    state.poll_queue = []
    jenkins_status = CommitStatus(context='job', target_url='jenkins://job/1')
    state.cancel_queue = [(commit, jenkins_status)]
    state.last_commit.fetch_statuses.return_value = []

    JENKINS.baseurl = 'jenkins://'
    Build.from_url = CoroutineMock()

    yield from extension.run()

    assert Build.from_url.mock_calls
    assert commit.maybe_update_status.mock_calls
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def test_get_network_data(self, time_mock, sleep_mock):
        """Check the four values ``get_network_data`` returns for two mocked counter samples."""
        time_mock.side_effect = [1, 2]

        field_names = ['bytes_sent', 'bytes_recv', 'packets_sent', 'packets_recv']
        Counter = namedtuple('Counter', field_names)
        first_sample = Counter(
            bytes_sent=54000, bytes_recv=12000, packets_sent=50, packets_recv=100)
        second_sample = Counter(
            bytes_sent=108000, bytes_recv=36000, packets_sent=75, packets_recv=150)

        # Successive calls to net_io_counters return the two samples in order.
        counters = mock.Mock()
        counters.side_effect = [{'eth0': first_sample}, {'eth0': second_sample}]
        self.network.psutil.net_io_counters = counters

        result = self.network.get_network_data(interface='eth0', delay=1)
        kb_ul, kb_dl, p_ul, p_dl = result

        self.assertEqual(kb_ul, 54000)
        self.assertEqual(kb_dl, 24000)
        self.assertEqual(p_ul, 25)
        self.assertEqual(p_dl, 50)
项目:trellio    作者:artificilabs    | 项目源码 | 文件源码
def test_xsubscribe(service_a1, service_d1, registry):
    """After ``_xsubscribe`` for service_d1, ``get_subscribers`` must send it back as a subscriber."""
    for service in (service_a1, service_d1):
        registry.register_service(
            packet={'params': service}, registry_protocol=mock.Mock())
    registry._xsubscribe(packet={'params': service_d1})

    protocol = mock.Mock()
    # Query by service_a1's identity and the endpoint of service_d1's first event.
    params = {
        'name': service_a1['name'],
        'version': service_a1['version'],
        'endpoint': service_d1['events'][0]['endpoint'],
    }
    request = {'params': params, 'request_id': str(uuid.uuid4())}
    registry.get_subscribers(packet=request, protocol=protocol)

    # First positional argument of the first ``protocol.send`` call.
    first_sent = protocol.send.call_args_list[0][0][0]
    assert subscriber_returned_successfully(first_sent, service_d1)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_non_post_entity_types_are_skipped(self, mock_logger):
        """A retraction with entity_type "foo" must only log that it is ignored."""
        retraction = Mock(entity_type="foo")
        profile = Mock()
        process_entity_retraction(retraction, profile)
        mock_logger.assert_called_with("Ignoring retraction of entity_type %s", "foo")
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_does_nothing_if_content_doesnt_exist(self, mock_logger):
        """A Post retraction for guid "bar" must log that the content cannot be found."""
        retraction = Mock(entity_type="Post", target_guid="bar")
        profile = Mock()
        process_entity_retraction(retraction, profile)
        mock_logger.assert_called_with("Retracted remote content %s cannot be found", "bar")
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_does_nothing_if_content_is_not_local(self, mock_logger):
        """A Post retraction targeting ``self.local_content`` must log "cannot be found"."""
        retraction = Mock(entity_type="Post", target_guid=self.local_content.guid)
        profile = Mock()
        process_entity_retraction(retraction, profile)
        expected_message = "Retracted remote content %s cannot be found"
        mock_logger.assert_called_with(expected_message, self.local_content.guid)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_does_nothing_if_content_author_is_not_same_as_remote_profile(self, mock_logger):
        """A retraction from an unrelated profile must log the ownership mismatch."""
        remote_profile = Mock()
        retraction = Mock(entity_type="Post", target_guid=self.content.guid)
        process_entity_retraction(retraction, remote_profile)
        expected_message = "Content %s is not owned by remote retraction profile %s"
        mock_logger.assert_called_with(expected_message, self.content, remote_profile)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_deletes_content(self):
        """A retraction carrying the author's handle must make the content unretrievable."""
        content = self.content
        retraction = Mock(
            entity_type="Post", target_guid=content.guid, handle=content.author.handle,
        )
        process_entity_retraction(retraction, content.author)
        with self.assertRaises(Content.DoesNotExist):
            Content.objects.get(id=content.id)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_returns_none_on_exception(self, mock_post):
        """``make_federable_retraction`` must return None for the patched setup (patch not shown here)."""
        first_arg, second_arg = Mock(), Mock()
        result = make_federable_retraction(first_arg, second_arg)
        self.assertIsNone(result)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_remote_profile_public_key_is_returned(self, mock_from_remote, mock_retrieve):
        """``sender_key_fetcher`` must return the ``rsa_public_key`` of the retrieved profile."""
        remote_profile = Mock(rsa_public_key="foo")
        # Both patched callables hand back the same profile object.
        mock_from_remote.return_value = remote_profile
        mock_retrieve.return_value = remote_profile

        fetched_key = sender_key_fetcher("bar")

        self.assertEqual(fetched_key, "foo")
        mock_retrieve.assert_called_once_with("bar")
        mock_from_remote.assert_called_once_with(remote_profile)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_returns_legacy_xml_payload(self):
        """With an "xml" key in POST, the payload must be that value, not the request body."""
        request = Mock(body="foobar", POST={"xml": "barfoo"})
        payload = DiasporaReceiveViewMixin.get_payload_from_request(request)
        self.assertEqual(payload, "barfoo")