Python unittest.mock 模块,call() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 unittest.mock.call()。

项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def test_update_details(debug, service):
    """Check ``service.update()`` when the tracker reports a newer project version.

    The service starts at project version 1 while the stubbed project
    endpoint answers with the version header '2'. The test expects both
    debug messages to be logged and the stubbed velocity and project name
    to show up in the returned mapping.
    """
    service.current_iteration = 1
    service.project_version = 1
    service._cached = {'foo': 'bar'}
    project_name = 'foo'

    # Stub for the project endpoint (carries the version header).
    responses.add(
        responses.GET,
        'https://www.pivotaltracker.com/services/v5/projects/123',
        json={'current_iteration_number': 1, 'name': project_name},
        adding_headers={'X-Tracker-Project-Version': '2'},
    )
    # Stub for the iteration-details endpoint.
    responses.add(
        responses.GET,
        'https://www.pivotaltracker.com/services/v5/projects/123/iterations'
        '/1?fields=%3Adefault%2Cvelocity%2Cstories',
        json={'velocity': 10, 'stories': []}
    )

    outcome = service.update()

    expected_messages = (
        'fetching Tracker project data',
        'project updated, fetching iteration details',
    )
    debug.assert_has_calls([mock.call(message) for message in expected_messages])
    assert outcome == {'velocity': 10, 'stories': {}, 'name': project_name}
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def test_iam_role_policy(resource_action, get_template, get_properties, get_details, construct_policy, session,
                         attach_profile_to_role):
    """IAM Role Policy should match deployment type.

    With an 'ec2' deployment type, the EC2 template must be requested and a
    'create_role' action must be issued whose assume-role policy document is
    the value returned by ``get_template``.
    """
    get_properties.return_value = {'type': 'ec2'}
    iam_details = {'group': 1, 'policy': 2, 'profile': 3, 'role': 4, 'user': 5}
    get_details.return_value.iam.return_value = iam_details

    created = create_iam_resources()
    assert created

    get_template.assert_called_with(EC2_TEMPLATE_NAME)
    expected_create_role = mock.call(
        mock.ANY,
        action='create_role',
        log_format=mock.ANY,
        RoleName=mock.ANY,
        AssumeRolePolicyDocument=get_template.return_value,
    )
    resource_action.assert_has_calls([expected_create_role])
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def test_create_s3_event_multiple_buckets(mock_get_properties, mock_create_s3_event, mock_remove_perms, mock_boto3, mock_arn, mock_perms):
    """Try to create a lambda with two S3 triggers from different buckets.

    One ``create_s3_event`` call is expected per bucket, in any order.
    """
    all_triggers = TRIGGERS_BUCKET_A + TRIGGERS_BUCKET_B
    mock_get_properties.return_value = get_properties_with_triggers(all_triggers)
    mock_arn.return_value = 'fake_arn'

    lambda_event = LambdaEvent(app='test_app', env='test_env', region='us-east-1', prop_path='other')
    lambda_event.create_lambda_events()

    # Keyword arguments shared by both expected calls.
    shared_kwargs = {'app_name': 'test_app', 'env': 'test_env', 'region': 'us-east-1'}
    expected_calls = [
        mock.call(bucket='my.shared.bucket', triggers=TRIGGERS_BUCKET_A, **shared_kwargs),
        mock.call(bucket='my.other.shared.bucket', triggers=TRIGGERS_BUCKET_B, **shared_kwargs),
    ]

    mock_create_s3_event.assert_has_calls(expected_calls, any_order=True)
项目:rosie    作者:datasciencebr    | 项目源码 | 文件源码
def test_call(self, mocked_predict, mocked_load):
    """Calling a ``Core`` instance should load and predict once per classifier.

    Also checks that the suspicions are written to
    ``tmp/test/suspicions.xz`` with xz compression.
    """
    mocked_load.return_value = 'model'
    # Mock keyword arguments configure attributes on the created mock.
    settings = MagicMock(
        UNIQUE_IDS=['number'],
        CLASSIFIERS={'answer': 42, 'another': 13},
    )
    core = Core(settings, self.adapter)
    core.suspicions = MagicMock()
    core()

    # Every classifier value must have been loaded...
    expected_loads = [call(42), call(13)]
    mocked_load.assert_has_calls(expected_loads, any_order=True)
    # ...and every classifier name predicted with the loaded model.
    expected_predictions = [call('model', name) for name in ('answer', 'another')]
    mocked_predict.assert_has_calls(expected_predictions, any_order=True)

    # The suspicions must have been exported exactly once.
    target_path = os.path.join('tmp', 'test', 'suspicions.xz')
    core.suspicions.to_csv.assert_called_once_with(
        target_path, compression='xz', encoding='utf-8', index=False
    )
项目:mistletoe    作者:miyuchina    | 项目源码 | 文件源码
def test_interactive(self, mock_print, mock_markdown,
                     mock_print_heading, mock_import_readline):
    """Drive ``cli.interactive`` with a scripted replacement for ``input``.

    The fake ``input`` returns each scripted line in turn, then raises
    ``EOFError`` once, and ``KeyboardInterrupt`` on any later call.
    """
    def make_fake_input(lines):
        calls_so_far = 0

        def fake_input(prompt=''):
            nonlocal calls_so_far
            position = calls_so_far
            calls_so_far += 1
            if position > len(lines):
                raise KeyboardInterrupt
            if position == len(lines):
                raise EOFError
            return lines[position]
        return fake_input

    typed_lines = ['foo', 'bar', 'baz']
    with patch('builtins.input', make_fake_input(typed_lines)):
        cli.interactive(sentinel.RendererCls)

    mock_import_readline.assert_called_with()
    mock_print_heading.assert_called_with(sentinel.RendererCls)
    # Each typed line is expected to reach markdown with a trailing newline.
    mock_markdown.assert_called_with(
        [line + '\n' for line in typed_lines], sentinel.RendererCls)
    mock_print.assert_has_calls([
        call('\nrendered text', end=''),
        call('\nExiting.'),
    ])
项目:pyconnectome    作者:neurospin    | 项目源码 | 文件源码
def test_normal_execution(self, mock_isfile, mock_glob):
    """Test the normal behaviour of the function.

    Two process launches are expected on ``self.mock_popen``: a ``which``
    lookup followed by the ``fslreorient2std`` command itself.
    """
    # Configure the values returned by the mocked functions.
    mock_glob.return_value = ["/my/path/mock_output"]
    mock_isfile.side_effect = [True]

    # Run the function under test.
    fslreorient2std(**self.kwargs)

    lookup_call = mock.call(["which", "fslreorient2std"],
                            env={}, stderr=-1, stdout=-1)
    command = ["fslreorient2std",
               self.kwargs["input_image"],
               self.kwargs["output_image"]]
    command_call = mock.call(command, cwd=None, env={}, stderr=-1, stdout=-1)
    self.assertEqual([lookup_call, command_call],
                     self.mock_popen.call_args_list)
    self.assertEqual(len(self.mock_env.call_args_list), 1)
项目:aioviber    作者:nonamenix    | 项目源码 | 文件源码
async def test_get_account_info(api):
    """``get_account_info`` should issue a single 'get_account_info' request.

    Declared ``async`` because the body awaits the API coroutine; ``await``
    inside a plain ``def`` is a SyntaxError.
    """
    await api.get_account_info()
    assert api._make_request.call_args == call('get_account_info')

#
# class CoroutineContextManager(CoroutineMock):
#     async def __aexit__(self, *args, **kwargs):
#         pass
#
#     async def __aenter__(self, *args, **kwargs):
#         return Mock()
#
#
# async def test_make_request(bot_configuration, loop):
#     api = Api(bot_configuration, CoroutineContextManager(), loop)
#     api._logger = Mock()
#     await api._make_request(api.endpoints.SEND_MESSAGE)
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_credentials(self, mSession):
    """``create_session`` should accept its settings as a ``credentials`` dict.

    Checks the returned session and account id, the region set on the
    session, and that the session class was called with the key pair only.
    """
    fake_session = MockSession()
    mSession.return_value = fake_session
    access, secret = 'XXX', 'YYY'
    account, region = '123456', 'east'
    credentials = dict(
        aws_access_key=access,
        aws_secret_key=secret,
        aws_account_id=account,
        aws_region=region,
    )

    session, account_id = utils.create_session(credentials=credentials)

    self.assertEqual(session, fake_session)
    self.assertEqual(account_id, account)
    self.assertEqual(fake_session.region, region)

    expected_calls = [
        mock.call(aws_access_key_id=access, aws_secret_access_key=secret),
    ]
    self.assertEqual(mSession.mock_calls, expected_calls)
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_keys(self, mSession):
    """``create_session`` should accept its settings as keyword arguments.

    Mirrors ``test_credentials`` except that the settings are unpacked into
    keyword arguments instead of being passed as one dict.
    """
    fake_session = MockSession()
    mSession.return_value = fake_session
    access, secret = 'XXX', 'YYY'
    account, region = '123456', 'east'
    credentials = dict(
        aws_access_key=access,
        aws_secret_key=secret,
        aws_account_id=account,
        aws_region=region,
    )

    # The settings are unpacked here rather than passed as a single argument.
    session, account_id = utils.create_session(**credentials)

    self.assertEqual(session, fake_session)
    self.assertEqual(account_id, account)
    self.assertEqual(fake_session.region, region)

    expected_calls = [
        mock.call(aws_access_key_id=access, aws_secret_access_key=secret),
    ]
    self.assertEqual(mSession.mock_calls, expected_calls)
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_constructor(self, mCreateSession):
    """Constructing a ``StateMachine`` should pick up the ARN of the listed machine.

    The only client call expected is ``list_state_machines()``.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')
    sfn_client.list_state_machines.return_value = {
        'stateMachines': [{'name': 'name', 'stateMachineArn': 'XXX'}]
    }

    machine = StateMachine('name')

    self.assertEqual(machine.arn, 'XXX')
    self.assertEqual(sfn_client.mock_calls, [mock.call.list_state_machines()])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_create_exists(self, mCreateSession):
    """``create`` should raise when the listing already contains the machine.

    The ARN stays set and no client call beyond the initial listing is
    expected.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')

    sfn_client = fake_session.client('stepfunctions')
    sfn_client.list_state_machines.return_value = {
        'stateMachines': [{'name': 'name', 'stateMachineArn': 'XXX'}]
    }

    machine = StateMachine('name')

    with self.assertRaises(Exception):
        machine.create('source', 'role')

    self.assertEqual(machine.arn, 'XXX')
    self.assertEqual(sfn_client.mock_calls, [mock.call.list_state_machines()])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_delete_exception(self, mCreateSession):
    """``delete(True)`` should raise when the listing contains no machines.

    The ARN is expected to be ``None`` and the only client call is the
    initial listing.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')

    sfn_client = fake_session.client('stepfunctions')
    sfn_client.list_state_machines.return_value = {'stateMachines': []}

    machine = StateMachine('name')

    with self.assertRaises(Exception):
        machine.delete(True)

    self.assertEqual(machine.arn, None)
    self.assertEqual(sfn_client.mock_calls, [mock.call.list_state_machines()])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_start_doesnt_exist(self, mCreateSession):
    """``start`` should raise when the listing contains no machines.

    The only client call expected is the initial listing.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')

    sfn_client = fake_session.client('stepfunctions')
    sfn_client.list_state_machines.return_value = {'stateMachines': []}

    machine = StateMachine('name')

    with self.assertRaises(Exception):
        machine.start({}, 'run')

    self.assertEqual(sfn_client.mock_calls, [mock.call.list_state_machines()])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_stop(self, mCreateSession):
    """``stop`` should forward its arguments to the client's ``stop_execution``.

    The expected client calls are the initial listing followed by one
    ``stop_execution`` carrying the execution ARN, error and cause.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')

    sfn_client = fake_session.client('stepfunctions')
    sfn_client.list_state_machines.return_value = {
        'stateMachines': [{'name': 'name', 'stateMachineArn': 'XXX'}]
    }

    machine = StateMachine('name')
    machine.stop('arn', 'error', 'cause')

    expected_calls = [
        mock.call.list_state_machines(),
        mock.call.stop_execution(executionArn='arn', error='error', cause='cause'),
    ]
    self.assertEqual(sfn_client.mock_calls, expected_calls)
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_success(self, mCreateSession):
    """``success(None)`` should report success and clear the task token.

    The client is expected to receive one ``send_task_success`` call with
    the original token and the output string 'null'.
    """
    fake_session = MockSession()
    sfn_client = fake_session.client('stepfunctions')
    mCreateSession.return_value = (fake_session, '123456')

    task = TaskMixin()
    task.token = 'token'
    self.assertEqual(task.token, 'token')

    task.success(None)

    self.assertEqual(task.token, None)
    expected_call = mock.call.send_task_success(taskToken='token', output='null')
    self.assertEqual(sfn_client.mock_calls, [expected_call])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_failure(self, mCreateSession):
    """``failure(None, None)`` should report failure and clear the task token.

    The client is expected to receive one ``send_task_failure`` call with
    the original token and ``None`` for both error and cause.
    """
    fake_session = MockSession()
    sfn_client = fake_session.client('stepfunctions')
    mCreateSession.return_value = (fake_session, '123456')

    task = TaskMixin()
    task.token = 'token'
    self.assertEqual(task.token, 'token')

    task.failure(None, None)

    self.assertEqual(task.token, None)
    expected_call = mock.call.send_task_failure(taskToken='token', error=None, cause=None)
    self.assertEqual(sfn_client.mock_calls, [expected_call])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_run_generator(self, mCreateSession):
    """A generator process should produce one heartbeat per ``yield``.

    The target yields twice, so two ``send_task_heartbeat`` calls followed
    by one ``send_task_success`` are expected on the client.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')

    def target(input_):
        for _ in range(2):
            yield

    # Sanity check: calling the target really produces a generator.
    self.assertEqual(type(target(None)), types.GeneratorType)

    task = TaskMixin(process=target)
    task.handle_task('token', None)

    self.assertEqual(task.token, None)
    heartbeat = mock.call.send_task_heartbeat(taskToken='token')
    success = mock.call.send_task_success(taskToken='token', output='null')
    self.assertEqual(sfn_client.mock_calls, [heartbeat] * 2 + [success])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_run_exception(self, mCreateSession):
    """A process raising ``BossError`` should be reported as a task failure.

    The expected ``send_task_failure`` call carries the exception class
    name as the error and the exception message as the cause.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')

    failing_process = mock.MagicMock(side_effect=BossError('cause'))

    task = TaskMixin(process=failing_process)
    task.handle_task('token', None)

    self.assertEqual(task.token, None)
    expected_call = mock.call.send_task_failure(
        taskToken='token', error='BossError', cause='cause')
    self.assertEqual(sfn_client.mock_calls, [expected_call])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_create_activity(self, mCreateSession):
    """``create_activity`` should store the ARN returned by the client.

    The ARN starts as ``None`` and the client is expected to receive one
    ``create_activity`` call with the activity name.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')
    sfn_client.create_activity.return_value = {'activityArn': 'XXX'}

    activity = ActivityMixin()
    activity.name = 'name'
    self.assertEqual(activity.arn, None)

    activity.create_activity()

    self.assertEqual(activity.arn, 'XXX')
    self.assertEqual(sfn_client.mock_calls,
                     [mock.call.create_activity(name='name')])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_delete_activity(self, mCreateSession):
    """``delete_activity`` should clear the ARN after deleting via the client.

    The client is expected to receive one ``delete_activity`` call with
    the ARN the activity held beforehand.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')

    activity = ActivityMixin()
    activity.arn = 'XXX'

    activity.delete_activity()

    self.assertEqual(activity.arn, None)
    self.assertEqual(sfn_client.mock_calls,
                     [mock.call.delete_activity(activityArn='XXX')])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_poll_task(self, mCreateSession):
    """``poll_task`` should return the task token and the decoded input.

    The client reply carries the input as the string '{}'; the test expects
    an empty dict back, plus one ``get_activity_task`` call on the client.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')
    sfn_client.get_activity_task.return_value = {'taskToken': 'YYY', 'input': '{}'}

    activity = ActivityMixin()
    activity.arn = 'XXX'

    token, input_ = activity.poll_task('worker')

    self.assertEqual(token, 'YYY')
    self.assertEqual(input_, {})

    expected_call = mock.call.get_activity_task(activityArn='XXX', workerName='worker')
    self.assertEqual(sfn_client.mock_calls, [expected_call])
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_poll_task_no_work(self, mCreateSession):
    """``poll_task`` should return ``(None, None)`` for an empty task token.

    One ``get_activity_task`` call is still expected on the client.
    """
    fake_session = MockSession()
    mCreateSession.return_value = (fake_session, '123456')
    sfn_client = fake_session.client('stepfunctions')
    sfn_client.get_activity_task.return_value = {'taskToken': ''}

    activity = ActivityMixin()
    activity.arn = 'XXX'

    token, input_ = activity.poll_task('worker')

    self.assertEqual(token, None)
    self.assertEqual(input_, None)

    expected_call = mock.call.get_activity_task(activityArn='XXX', workerName='worker')
    self.assertEqual(sfn_client.mock_calls, [expected_call])
项目:python-webscraping    作者:pgrangeiro    | 项目源码 | 文件源码
def test_execute_calls_save_only_if_product_has_not_saved_before(self):
    """A row seen twice in the crawl results should be saved only once.

    The crawler returns three rows, the first and last identical; the save
    use case is expected to run twice, receiving each row's last three
    fields.
    """
    repeated_row = (1, 'name', 'title', 'url')
    other_row = (2, 'Name', 'Title', 'Url')

    def crawl_results():
        return [repeated_row, other_row, repeated_row]
    self.crawler_uc.execute.side_effect = crawl_results

    GetProductsFromSiteUseCase.execute()

    save = self.saveinfo_uc.execute
    assert save.call_count == 2
    save.assert_has_calls([
        call('name', 'title', 'url'),
        call('Name', 'Title', 'Url'),
    ])
项目:qcore    作者:quora    | 项目源码 | 文件源码
def test_memoize():
    """Test Caching with no Time-To-Live (TTL).

    The module-level counter ``x`` is compared after every call: it must
    grow only when a call is expected to miss the cache.
    """
    global x
    for memoized in memoize_fns:
        x = 0
        assert_eq(4, memoized(1))
        assert_eq(1, x)

        assert_eq(8, memoized(2, 4))
        assert_eq(2, x)
        # Keyword spellings of the same arguments must not trigger another call.
        assert_eq(8, memoized(2, z=4))
        assert_eq(2, x)
        assert_eq(8, memoized(y=2, z=4))
        assert_eq(2, x)

        # After clearing the cache the counter is expected to grow again.
        memoized.clear_cache()
        assert_eq(4, memoized(1))
        assert_eq(3, x)
项目:qcore    作者:quora    | 项目源码 | 文件源码
def test_memoize_with_ttl_unhashable():
    """Test Caching with TTL using dictionary arguments.

    The module-level counter ``x`` is compared after every call: it must
    grow only when a call is expected to miss the cache.
    """
    global x
    x = 0
    assert_eq(12, cached_fn_with_ttl_unhashable(2))
    assert_eq(1, x)

    assert_eq(10, cached_fn_with_ttl_unhashable(1, z=dict(a=2, b=3, c=5)))
    assert_eq(2, x)
    # An equal dict argument must not trigger another call.
    assert_eq(10, cached_fn_with_ttl_unhashable(1, z=dict(a=2, b=3, c=5)))
    assert_eq(2, x)
    # Expected to be a cache hit too (presumably this dict equals the default z; confirm).
    assert_eq(12, cached_fn_with_ttl_unhashable(2, z=dict(a=1, b=2, c=3)))
    assert_eq(2, x)

    # After clearing the cache the counter is expected to grow again.
    cached_fn_with_ttl_unhashable.clear_cache()
    assert_eq(12, cached_fn_with_ttl_unhashable(2))
    assert_eq(3, x)
项目:deep-learning-nd    作者:RyanCCollins    | 项目源码 | 文件源码
def test_discriminator(discriminator, tf_module):
    """Check the discriminator's output shapes and its ``variable_scope`` usage.

    The discriminator is run once with its default arguments and once with
    ``True`` as second argument; the scope must be opened as
    'discriminator' with ``reuse=False`` and ``reuse=True`` respectively.
    """
    with TmpMock(tf_module, 'variable_scope') as mock_variable_scope:
        image = tf.placeholder(tf.float32, [None, 28, 28, 3])
        expected_shape = [None, 1]

        # First pass: default arguments.
        output, logits = discriminator(image)
        _assert_tensor_shape(output, expected_shape, 'Discriminator Training(reuse=false) output')
        _assert_tensor_shape(logits, expected_shape, 'Discriminator Training(reuse=false) Logits')
        assert mock_variable_scope.called, (
            'tf.variable_scope not called in Discriminator Training(reuse=false)')
        assert mock_variable_scope.call_args == mock.call('discriminator', reuse=False), (
            'tf.variable_scope called with wrong arguments in Discriminator Training(reuse=false)')

        mock_variable_scope.reset_mock()

        # Second pass: True passed as the second positional argument.
        output_reuse, logits_reuse = discriminator(image, True)
        _assert_tensor_shape(output_reuse, expected_shape, 'Discriminator Inference(reuse=True) output')
        _assert_tensor_shape(logits_reuse, expected_shape, 'Discriminator Inference(reuse=True) Logits')
        assert mock_variable_scope.called, (
            'tf.variable_scope not called in Discriminator Inference(reuse=True)')
        assert mock_variable_scope.call_args == mock.call('discriminator', reuse=True), (
            'tf.variable_scope called with wrong arguments in Discriminator Inference(reuse=True)')
项目:deep-learning-nd    作者:RyanCCollins    | 项目源码 | 文件源码
def test_generator(generator, tf_module):
    """Check the generator's output shape and its ``variable_scope`` usage.

    The generator is run once with its default third argument and once with
    ``False``; the scope must be opened as 'generator' with ``reuse=False``
    and ``reuse=True`` respectively.
    """
    with TmpMock(tf_module, 'variable_scope') as mock_variable_scope:
        z = tf.placeholder(tf.float32, [None, 100])
        out_channel_dim = 5
        expected_shape = [None, 28, 28, out_channel_dim]

        # First pass: default third argument.
        training_output = generator(z, out_channel_dim)
        _assert_tensor_shape(training_output, expected_shape, 'Generator output (is_train=True)')
        assert mock_variable_scope.called, (
            'tf.variable_scope not called in Generator Training(reuse=false)')
        assert mock_variable_scope.call_args == mock.call('generator', reuse=False), (
            'tf.variable_scope called with wrong arguments in Generator Training(reuse=false)')

        mock_variable_scope.reset_mock()

        # Second pass: False passed as the third positional argument.
        inference_output = generator(z, out_channel_dim, False)
        _assert_tensor_shape(inference_output, expected_shape, 'Generator output (is_train=False)')
        assert mock_variable_scope.called, (
            'tf.variable_scope not called in Generator Inference(reuse=True)')
        assert mock_variable_scope.call_args == mock.call('generator', reuse=True), (
            'tf.variable_scope called with wrong arguments in Generator Inference(reuse=True)')
项目:easy-job    作者:inb-co    | 项目源码 | 文件源码
def test_normal_start(self, runner, process, getLogger):
    """``RabbitMQInitializer.start`` should create and start one process per worker.

    Also checks the start-up log line, that ``start`` returns the runner's
    return value, and that the first process was given a ``RabbitMQWorker``.
    """
    assert isinstance(process, mock.MagicMock)
    # Arrange
    process_count = 4
    logger = mock.MagicMock()
    getLogger.return_value = logger
    expected_result = "SOME_RETURN_VALUE"
    runner.return_value = expected_result
    result_backend = {
        'result_backend_class': 'easy_job.result_backends.dummy.DummyBackend'
    }
    options = {
        'serialization_method': 'json',
        'queue_name': 'queue',
        'rabbitmq_configs': {},
    }
    # Act
    initializer = RabbitMQInitializer(count=process_count, logger="logger",
                                      result_backend=result_backend, options=options)
    result = initializer.start()
    # Assert
    logger.log.assert_called_once_with(10, "Starting {} RabbitMQ workers...".format(process_count))

    # Every worker contributes a constructor call followed by start().
    calls_per_worker = [mock.call(args=(mock.ANY,), target=mock.ANY), mock.call().start()]
    process.assert_has_calls(calls_per_worker * process_count)
    self.assertEqual(result, expected_result)
    # mock_calls entries are (name, args, kwargs) triples.
    first_call_kwargs = process.mock_calls[0][2]
    self.assertIsInstance(first_call_kwargs['args'][0], RabbitMQWorker)
项目:easy-job    作者:inb-co    | 项目源码 | 文件源码
def test_actually_calling_retrying_library_in_a_failing_condition(self, ):
    """``call_with_retry`` should give up after three failing attempts.

    The function fails with a different exception on each attempt; the
    last one (``OSError``) is expected to propagate, and all three attempts
    must have received the same arguments.
    """
    # Arrange
    from easy_job.workers.common import call_with_retry as target
    failures = [IndentationError(), SyntaxError(), OSError()]
    func = mock.MagicMock(side_effect=failures)
    retry_policy = {"stop_max_attempt_number": 3}
    args = (1, 2, 3)
    kwargs = {"p": "arameter"}

    # Act
    with self.assertRaises(OSError):
        target(func, args, kwargs, retry_policy=retry_policy)

    # Assert
    func.assert_has_calls([mock.call(*args, **kwargs)] * 3)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_case_study_create_api_success(
    mock_create_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """A successful case-study submission should redirect to the company detail page.

    The create API must be called exactly once with the submitted data and
    the user's SSO session id.
    """
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky, so the image fields match anything.
    expected_data = dict(
        all_case_study_data,
        image_one=ANY, image_two=ANY, image_three=ANY,
    )
    expected_call = call(data=expected_data, sso_session_id=sso_user.session_id)

    assert mock_create_case_study.call_count == 1
    assert mock_create_case_study.call_args == expected_call
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_supplier_sectors_edit_standalone_view_api_success(
    mock_update_profile, has_company_client,
    company_profile_sectors_standalone_data,
    api_response_200,
    sso_user,
):
    """Posting the sectors form should update the profile exactly once.

    The update is expected to carry the sectors and export-destination
    fields together with the user's SSO session id.
    """
    mock_update_profile.return_value = api_response_200

    has_company_client.post(
        reverse('company-edit-sectors'),
        company_profile_sectors_standalone_data,
    )

    expected_data = {
        'sectors': ['AGRICULTURE_HORTICULTURE_AND_FISHERIES'],
        'export_destinations': ['CN', 'IN'],
        'export_destinations_other': 'West Philadelphia',
    }
    assert mock_update_profile.call_count == 1
    assert mock_update_profile.call_args == call(
        sso_session_id=sso_user.session_id,
        data=expected_data,
    )
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_verify_company_address_end_to_end(
    mock_update_profile, settings, has_company_client,
    send_verification_letter_end_to_end
):
    """Completing the verification-letter flow should render the 'sent' template.

    The profile update is expected exactly once, carrying the postal full
    name and the SSO session id '213'.
    """
    settings.FEATURE_COMPANIES_HOUSE_OAUTH2_ENABLED = True
    view = views.SendVerificationLetterView

    response = send_verification_letter_end_to_end()

    assert response.status_code == 200
    assert response.template_name == view.templates[view.SENT]

    expected_call = call(data={'postal_full_name': 'Jeremy'}, sso_session_id='213')
    assert mock_update_profile.call_count == 1
    assert mock_update_profile.call_args == expected_call
项目:monitoring-scripts    作者:CntoDev    | 项目源码 | 文件源码
def test_wait(mocker, config):  # pylint: disable=W0621
    """Assert runner waits before re-running the monitoring script

    The script is stubbed to return a CRITICAL code; ``time.sleep`` is
    expected once per retry, each time with the configured interval.
    """
    n_retries = 5
    d_interval = 10

    failed_run = mocker.Mock()
    failed_run.returncode = Codes.CRITICAL.value

    mocker.patch('subprocess.run', return_value=failed_run)
    mocker.patch('cachetclient.cachet.Components.put')
    time_sleep = mocker.patch('time.sleep')

    with pytest.raises(SystemExit):
        unit.main(script=mocker.Mock(), component_id=99, config_file=config,
                  retries=n_retries, interval=d_interval)

    # One sleep of `d_interval` per retry.
    assert time_sleep.call_args_list == [call(d_interval)] * n_retries
项目:alexa-browser-client    作者:richtier    | 项目源码 | 文件源码
def test_create_amazon_refresh_token_no_defaults(mock_http_server, settings):
    """The management command should honour an explicit address and port.

    The HTTP server must be created once with the given address, the login
    handler class and the Alexa credentials taken from settings; the
    command output must mention the server URL.
    """
    settings.ALEXA_VOICE_SERVICE_CLIENT_ID = 'my-client-id'
    settings.ALEXA_VOICE_SERVICE_CLIENT_SECRET = 'my-client-secret'
    settings.ALEXA_VOICE_SERVICE_DEVICE_TYPE_ID = 'my-device-type-id'

    captured = StringIO()
    command_args = ('create_amazon_refresh_token', '--address=127.0.0.1', '--port=9000')
    call_command(*command_args, stdout=captured)

    expected_call = call(
        server_address=('127.0.0.1', 9000),
        RequestHandlerClass=handlers.AmazonAlexaServiceLoginHandler,
        client_id='my-client-id',
        client_secret='my-client-secret',
        device_type_id='my-device-type-id',
    )
    assert mock_http_server.call_count == 1
    assert mock_http_server.call_args == expected_call

    assert 'running server on http://127.0.0.1:9000' in captured.getvalue()
项目:dirtools3    作者:kirpit    | 项目源码 | 文件源码
def test_internal_find_index_called_for_each_item(self, monkeypatch, tmp_folder):
    """Check that ``_find_index`` has been called for every scanned item.

    ``_find_index(self, summary, sort_by)`` is replaced by a mock returning
    0; each item coming out of ``items()`` must already appear, together
    with the sort key, in the mock's recorded calls.

    :param monkeypatch: pytest monkey patch fixture
    :type monkeypatch: _pytest.monkeypatch.MonkeyPatch
    :param tmp_folder: Test params and dummy test folder factory fixture pair.
    :type tmp_folder: (dict, dirtools.tests.factory.DummyFolderFactory)
    """
    params, factory = tmp_folder
    sort_key = params['sort_by']
    scan = Folder(factory.path, sort_key, params['level'])
    fake_find_index = Mock(return_value=0)
    monkeypatch.setattr(scan, '_find_index', fake_find_index)

    # Checked inside the loop so each item is verified as soon as it is produced.
    for scanned in scan.items(humanise=False):
        assert call(scanned, sort_key) in fake_find_index.call_args_list
项目:lattice_mc    作者:bjmorgan    | 项目源码 | 文件源码
def test_cubic_lattice( self, mock_lattice, mock_site ):
    """Check the sites and lattice built by ``init_lattice.cubic_lattice`` for a 2x2x2 grid.

    Each site constructor call is compared positionally with the expected
    number, coordinates, neighbour list, fourth argument and label; the
    lattice must be built from the created sites and the cell lengths.
    """
    a, b, c = 2, 2, 2
    spacing = 1.0
    mock_site.side_effect = range( 2*2*2 )
    mock_lattice.return_value = 'bar'
    lattice = init_lattice.cubic_lattice( a, b, c, spacing )
    # Row layout: site number, coordinates, neighbour list, fourth positional argument, site label.
    expected_site_args = [ [ 1, np.array( [ 0., 0., 0. ] ), [ 2, 2, 3, 3, 5, 5 ], 0.0, 'L' ],
                           [ 2, np.array( [ 1., 0., 0. ] ), [ 1, 1, 4, 4, 6, 6 ], 0.0, 'L' ],
                           [ 3, np.array( [ 0., 1., 0. ] ), [ 4, 4, 1, 1, 7, 7 ], 0.0, 'L' ],
                           [ 4, np.array( [ 1., 1., 0. ] ), [ 3, 3, 2, 2, 8, 8 ], 0.0, 'L' ],
                           [ 5, np.array( [ 0., 0., 1. ] ), [ 6, 6, 7, 7, 1, 1 ], 0.0, 'L' ],
                           [ 6, np.array( [ 1., 0., 1. ] ), [ 5, 5, 8, 8, 2, 2 ], 0.0, 'L' ],
                           [ 7, np.array( [ 0., 1., 1. ] ), [ 8, 8, 5, 5, 3, 3 ], 0.0, 'L' ],
                           [ 8, np.array( [ 1., 1., 1. ] ), [ 7, 7, 6, 6, 4, 4 ], 0.0, 'L' ] ]
    for site_call, expected in zip( mock_site.mock_calls, expected_site_args ):
        actual = site_call[1] # positional arguments of the recorded call
        self.assertEqual( actual[0], expected[0] ) # site number
        np.testing.assert_array_equal( actual[1], expected[1] ) # site coordinates
        self.assertEqual( actual[2], expected[2] ) # neighbour lists
        self.assertEqual( actual[3], expected[3] ) # fourth positional argument
        self.assertEqual( actual[4], expected[4] ) # site label
    # The lattice receives the values produced by the mocked site constructor.
    self.assertEqual( mock_lattice.mock_calls[0][1][0], list( range( 2*2*2 ) ) )
    self.assertEqual( lattice, 'bar' )
    lattice_kwargs = mock_lattice.mock_calls[0][2]
    np.testing.assert_array_equal( lattice_kwargs['cell_lengths'], np.array( [ a, b, c ] ) )
项目:lattice_mc    作者:bjmorgan    | 项目源码 | 文件源码
def test_potential_jumps_if_lattice_is_mostly_empty( self, mock_Jump ): # lattice is mostly vacant
    """With one occupied site out of four, jumps should start from the occupied site.

    The occupied site has neighbours 2 and 3; one ``Jump`` is expected per
    neighbour, built from the occupied site, the neighbouring site and the
    lattice's energy and lookup-table attributes.
    """
    jumps = [ Mock( spec=Jump ), Mock( spec=Jump ) ]
    mock_Jump.side_effect = jumps
    self.lattice.number_of_occupied_sites = 1
    # Fixed: this was `self.lattice_number_of_sites`, which only set an
    # unused attribute on the test case instead of configuring the lattice.
    self.lattice.number_of_sites = 4
    site = Mock( spec=Site )
    site.neighbours = [ 2, 3 ]
    occupied_sites = [ site ]
    unoccupied_sites = [ Mock( spec=Site ), Mock( spec=Site ), Mock( spec=Site ) ]
    self.lattice.nn_energy = 'A'
    self.lattice.cn_energies = 'B'
    self.lattice.jump_lookup_table = 'C'
    for s in unoccupied_sites:
        s.is_occupied = False
    with patch( 'lattice_mc.lattice.Lattice.occupied_sites' ) as mock_occupied_sites, \
            patch( 'lattice_mc.lattice.Lattice.site_with_id' ) as mock_site_with_id:
        mock_occupied_sites.return_value = occupied_sites
        # Only the two neighbour ids are looked up, so two sites suffice.
        mock_site_with_id.side_effect = unoccupied_sites[:2]
        potential_jumps = self.lattice.potential_jumps()
        self.assertEqual( potential_jumps, jumps )
        self.assertEqual( mock_Jump.mock_calls[0][1], ( site, unoccupied_sites[0], 'A', 'B', 'C' ) )
        self.assertEqual( mock_Jump.mock_calls[1][1], ( site, unoccupied_sites[1], 'A', 'B', 'C' ) )
        mock_site_with_id.assert_has_calls( [ call(2), call(3) ] )
项目:lattice_mc    作者:bjmorgan    | 项目源码 | 文件源码
def test_potential_jumps_if_lattice_is_mostly_filled( self, mock_Jump ): # lattice is mostly occupied
    """With three occupied sites out of four, jumps should end at the vacant site.

    The vacant site has neighbours 2 and 3; one ``Jump`` is expected per
    neighbour, built from the neighbouring occupied site, the vacant site
    and the lattice's energy and lookup-table attributes.
    """
    mock_Jump.side_effect = [ 'jump1', 'jump2' ]
    self.lattice.number_of_occupied_sites = 3
    # Fixed: this was `self.lattice_number_of_sites`, which only set an
    # unused attribute on the test case instead of configuring the lattice.
    self.lattice.number_of_sites = 4
    site = Mock( spec=Site )
    site.neighbours = [ 2, 3 ]
    vacant_sites = [ site ]
    occupied_sites = [ Mock( spec=Site ), Mock( spec=Site ), Mock( spec=Site ) ]
    for s in occupied_sites:
        s.is_occupied = True
    self.lattice.nn_energy = 'A'
    self.lattice.cn_energies = 'B'
    self.lattice.jump_lookup_table = 'C'
    with patch( 'lattice_mc.lattice.Lattice.vacant_sites' ) as mock_vacant_sites, \
            patch( 'lattice_mc.lattice.Lattice.site_with_id' ) as mock_site_with_id:
        mock_vacant_sites.return_value = vacant_sites
        # Only the two neighbour ids are looked up, so two sites suffice.
        mock_site_with_id.side_effect = occupied_sites[:2]
        jumps = self.lattice.potential_jumps()
        self.assertEqual( jumps, [ 'jump1', 'jump2' ] )
        self.assertEqual( mock_Jump.mock_calls[0][1], ( occupied_sites[0], site, 'A', 'B', 'C' ) )
        self.assertEqual( mock_Jump.mock_calls[1][1], ( occupied_sites[1], site, 'A', 'B', 'C' ) )
        mock_site_with_id.assert_has_calls( [ call(2), call(3) ] )
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_run_pipeline_pass_skip_parse_context(mocked_work_dir,
                                              mocked_get_pipe_def,
                                              mocked_get_parsed_context,
                                              mocked_run_step_group):
    """run_pipeline with parse_input=False forwards the expected arguments.

    The working-dir mock and the context-parsing mock must stay uncalled,
    the pipeline definition is fetched exactly once, and the step groups
    run with an empty context.
    """
    pipeline = 'arb pipe'
    directory = 'arb/dir'

    pypyr.pipelinerunner.run_pipeline(pipeline_name=pipeline,
                                      working_dir=directory,
                                      parse_input=False)

    mocked_work_dir.assert_not_called()
    mocked_get_pipe_def.assert_called_once_with(pipeline_name=pipeline,
                                                working_dir=directory)
    mocked_get_parsed_context.assert_not_called()

    # 'steps' is expected first, followed by 'on_success'.
    expected_run_step_groups = [
        call(context={},
             pipeline_definition='pipe def',
             step_group_name=group_name)
        for group_name in ('steps', 'on_success')
    ]

    mocked_run_step_group.assert_has_calls(expected_run_step_groups)
项目:volumio-buddy    作者:foxey    | 项目源码 | 文件源码
def test6_RGBLED_set(self, mock_wiringpi_softPwmCreate, mock_wiringpi_softPwmWrite,
            mock_wiringpi_ISR, mock_wiringpi_pinMode, mock_wiringpi_setup):
        """Constructing an RGBLED configures its three pins; set() writes the levels.

        Checks the pinMode and softPwmCreate calls made on construction, the
        softPwmWrite calls made by set(), and that get() reports the levels.
        """
        # (red, green, blue) pin numbers, in the order RGBLED receives them.
        pins = (10, 8, 6)
        led = RGBLED(*pins)
        mock_wiringpi_pinMode.assert_has_calls([call(pin, 1) for pin in pins])
        mock_wiringpi_softPwmCreate.assert_has_calls(
            [call(pin, 0, 100) for pin in pins])
        # (red, green, blue) levels passed to set().
        levels = (1, 2, 3)
        led.set(*levels)
        mock_wiringpi_softPwmWrite.assert_has_calls(
            [call(pin, level) for pin, level in zip(pins, levels)])
        self.assertEqual(led.get(), (1, 2, 3))
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_send_message_to_chat(self, mock_requests):
        """send_message_to_chat returns the sent message and posts it via mock_requests.

        The task result must echo the text, parse mode, reply id and chat id
        of the parsed update, and mock_requests must have been called with
        the sendMessage URL, the update as JSON and the configured timeout.
        """
        parsed_update = parse_update(TelegramUpdates.TEXT_OK_ID_OK_TEXT)
        parsed_update['parse_mode'] = 'HTML'
        parsed_update['text'] = 'Sentiment'

        sent = send_message_to_chat.delay(parsed_update).get(timeout=5)

        # on success the Message sent to the chat is returned
        for field in ('text', 'parse_mode', 'reply_to_message_id', 'chat_id'):
            self.assertEqual(sent[field], parsed_update[field])

        mock_requests.assert_called()
        endpoint = current_app.config['TELEGRAM_URL'] + 'sendMessage'
        request_timeout = current_app.config['TELEGRAM_REQUEST_TIMEOUT_SEC']
        expected_request = call(endpoint, json=parsed_update, timeout=request_timeout)
        self.assertIn(expected_request, mock_requests.call_args_list)
项目:intake    作者:codeforamerica    | 项目源码 | 文件源码
def test_build_bundled_pdfs_if_some_are_not_prefilled(
            self, logger, get_parser, SimpleUploadedFile, slack, SubService):
        """A bundle with two submissions but only one filled PDF.

        Expects the "lack pdfs" error to reach both the logger and slack,
        the filled-PDF lookup to be made twice, the bundle to be saved once,
        and fill_pdfs_for_submission to be called for each submission.
        """
        get_parser.return_value.join_pdfs.return_value = b'pdf'
        # Two submissions on the bundle ...
        submissions = [Mock(), Mock()]
        bundle = Mock(pk=2)
        bundle.should_have_a_pdf.return_value = True
        # ... but only one individual filled PDF is reported.
        bundle.get_individual_filled_pdfs.return_value = [Mock()]
        bundle.submissions.all.return_value = submissions
        bundle.organization.pk = 1

        BundlesService.build_bundled_pdf_if_necessary(bundle)

        expected_error = "Submissions for ApplicationBundle(pk=2) lack pdfs"
        logger.error.assert_called_once_with(expected_error)
        slack.assert_called_once_with(expected_error)
        lookups = bundle.get_individual_filled_pdfs.mock_calls
        self.assertEqual(len(lookups), 2)
        bundle.save.assert_called_once_with()
        expected_fills = [call(submission) for submission in submissions]
        SubService.fill_pdfs_for_submission.assert_has_calls(
            expected_fills, any_order=True)
项目:python-shaarli-client    作者:shaarli    | 项目源码 | 文件源码
def test_generate_endpoint_parser_noparam(addargument):
    """Generate a parser from endpoint metadata - no params"""
    # With no endpoint params, the only add_argument calls expected are the
    # two help flags: one for the main parser, one for the subparser.
    help_flag = mock.call('-h', '--help', action='help',
                          default=mock.ANY, help=mock.ANY)
    endpoint_name = 'put-stuff'
    endpoint_metadata = dict([
        ('path', 'stuff'),
        ('method', 'PUT'),
        ('help', "Changes stuff"),
        ('params', {}),
    ])
    subparsers = ArgumentParser().add_subparsers()

    generate_endpoint_parser(subparsers, endpoint_name, endpoint_metadata)

    addargument.assert_has_calls([help_flag, help_flag])
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
async def test_download_lstree_git(self):
        """Downloading a `<tree>` git URL clones the repo and runs `git ls-tree`.

        Declared `async def` because the body awaits `_do_download`; `await`
        inside a plain `def` is a SyntaxError.
        NOTE(review): the test needs an asyncio-aware runner or marker to be
        awaited -- confirm against the enclosing test class.
        """
        git_url = 'git://githoobie.com/lol.git<%s>' % tree_object
        await self._do_download(git_url)

        # ensure git config was appended to, and that the output file was
        # opened
        self.open.assert_any_call('/t/mut/lol.git/config', 'a')
        self.open.assert_any_call('/t/res/lol.git', 'w+')

        # ensure the sequence of git commands were called
        # (exact match: no extra subprocess.run calls are tolerated)
        assert self.subprocess.run.mock_calls == [
            call(['git', 'clone', '--bare',
                  'git://githoobie.com/lol.git', '/t/mut/lol.git']),
            call(['git', 'ls-tree', '-r', '--long', '--full-tree',
                  tree_object],
                 cwd='/t/mut/lol.git', stdout=self.open()),
        ]
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
async def test_download_directory_git(self):
        """Downloading a `<tree><path>` git URL with path '/' runs `git archive` as a directory.

        Declared `async def` because the body awaits `_do_download`; `await`
        inside a plain `def` is a SyntaxError.
        NOTE(review): the test needs an asyncio-aware runner or marker to be
        awaited -- confirm against the enclosing test class.
        """
        path = '/'
        git_url = 'git://githoobie.com/lol.git<%s><%s>' % (tree_object, path)
        await self._do_download(git_url)
        self._check_config()

        # ensure the sequence of git commands were called
        # (exact match: clone, rev-parse, then archive, and nothing else)
        assert self.subprocess.run.mock_calls == [
            call(['git', 'clone', '--bare',
                  'git://githoobie.com/lol.git', '/t/mut/lol.git'],
                  cwd='/t/mut'),
            call(['git', 'rev-parse', '--quiet', '--verify', tree_object],
                 cwd='/t/mut/lol.git', stdout=-1),
            call(['git', 'archive', '--prefix=/t/res/lol.git',
                  '--format=directory', tree_object],
                 cwd='/t/mut/lol.git'),
        ]
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
async def test_download_lstree_git(self):
        """Downloading a `<tree>` git URL clones, verifies the tree, then runs `git ls-tree`.

        Declared `async def` because the body awaits `_do_download`; `await`
        inside a plain `def` is a SyntaxError.
        NOTE(review): the test needs an asyncio-aware runner or marker to be
        awaited -- confirm against the enclosing test class.
        """
        git_url = 'git://githoobie.com/lol.git<%s>' % tree_object
        await self._do_download(git_url)

        # ensure git config was appended to, and that the output file was
        # opened
        self.open.assert_any_call('/t/mut/lol.git/config', 'a')
        self.open.assert_any_call('/t/res/lol.git', 'w+')

        # ensure the sequence of git commands were called
        # (exact match: clone, rev-parse, then ls-tree, and nothing else)
        assert self.subprocess.run.mock_calls == [
            call(['git', 'clone', '--bare',
                  'git://githoobie.com/lol.git', '/t/mut/lol.git'],
                  cwd='/t/mut'),
            call(['git', 'rev-parse', '--quiet', '--verify', tree_object],
                 cwd='/t/mut/lol.git', stdout=-1),
            call(['git', 'ls-tree', '-r', '--long', '--full-tree',
                  tree_object],
                 cwd='/t/mut/lol.git', stdout=self.open()),
        ]
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
async def test_update_when_hash_not_present(self):
        """When `subprocess.run` returns empty stdout, a `git fetch` precedes the archive.

        Declared `async def` because the body awaits `_do_download`; `await`
        inside a plain `def` is a SyntaxError.
        NOTE(review): the test needs an asyncio-aware runner or marker to be
        awaited -- confirm against the enclosing test class.
        """
        # Stand-in for the object returned by subprocess.run: empty stdout.
        class FakeResult:
            stdout = b''
        self.subprocess.run.return_value = FakeResult
        path = 'README.md'
        git_url = 'git://githoobie.com/lol.git<%s><%s>' % (tree_object, path)
        await self._do_download(git_url)
        self._check_config()

        # ensure the sequence of git commands were called
        # (exact match: clone, rev-parse, fetch, then archive, and nothing else)
        assert self.subprocess.run.mock_calls == [
            call(['git', 'clone', '--bare',
                  'git://githoobie.com/lol.git', '/t/mut/lol.git'],
                  cwd='/t/mut'),
            call(['git', 'rev-parse', '--quiet', '--verify', tree_object],
                 cwd='/t/mut/lol.git', stdout=-1),
            call(['git', 'fetch'], cwd='/t/mut/lol.git'),
            call(['git', 'archive', '--output=/t/res/README.md',
                  '--format=raw', tree_object, 'README.md'],
                 cwd='/t/mut/lol.git'),
        ]
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_calls_update_streams_with_content(self, mock_update):
        """mock_update is called once when content is created, and not again on save."""
        # Creation: exactly one call, carrying the new content object.
        new_content = ContentFactory()
        mock_update.assert_called_once_with(new_content)

        # Forget the creation call so only the later save is observed.
        mock_update.reset_mock()

        # Editing and saving existing content must not trigger another call.
        new_content.text = "update!"
        new_content.save()
        self.assertFalse(mock_update.called)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_local_content_with_parent_sent_as_reply(self, mock_send):
        """Creating local content with a parent should make exactly two calls on mock_send.

        The expected calls are (send_reply_notifications, content.id) followed
        by (send_reply, content.id), in that order.
        """
        user = UserFactory()
        parent = ContentFactory(author=user.profile)
        # Drop any calls recorded while creating the parent so that only the
        # calls made for the reply are compared below.
        mock_send.reset_mock()
        content = ContentFactory(author=user.profile, parent=parent)
        self.assertTrue(content.local)
        call_args = [
            call(send_reply_notifications, content.id),
            call(send_reply, content.id),
        ]
        # Exact list equality: order matters and no extra calls are allowed.
        assert mock_send.call_args_list == call_args