Python unittest.mock 模块,ANY 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用unittest.mock.ANY

项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def test_iam_role_policy(resource_action, get_template, get_properties, get_details, construct_policy, session,
                         attach_profile_to_role):
    """IAM Role Policy should match deployment type."""
    get_properties.return_value = {'type': 'ec2'}
    get_details.return_value.iam.return_value = {'group': 1, 'policy': 2, 'profile': 3, 'role': 4, 'user': 5}

    assert create_iam_resources()

    get_template.assert_called_with(EC2_TEMPLATE_NAME)
    calls = [
        mock.call(
            mock.ANY,
            action='create_role',
            log_format=mock.ANY,
            RoleName=mock.ANY,
            AssumeRolePolicyDocument=get_template.return_value)
    ]
    resource_action.assert_has_calls(calls)
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def test_create_kernel_url():
    mock_resp = asynctest.MagicMock(spec=aiohttp.ClientResponse)
    mock_resp.status = 201
    mock_resp.json = asynctest.MagicMock()

    mock_req_obj = asynctest.MagicMock(spec=Request)
    mock_req_obj.asend.return_value = mock_resp

    with asynctest.patch('ai.backend.client.kernel.Request',
                         return_value=mock_req_obj) as mock_req_cls:
        await Kernel.get_or_create('python')

        mock_req_cls.assert_called_once_with(
            'POST', '/kernel/create', mock.ANY)
        mock_req_obj.asend.assert_called_once_with()
        mock_req_obj.asend.return_value.json.assert_called_once_with()
项目:scar    作者:grycap    | 项目源码 | 文件源码
def test_run_event_source(self, mock_aws_client):
        mock_aws_client.return_value.get_s3_file_list.return_value = ['test_file_1','test_file_2','test_file_3','test_file_4']
        args = Args()
        args.verbose = False
        args.event_source = 'test_bucket'
        Scar().run(args)        
        self.assertEqual(mock_aws_client.call_count, 1)
        # check_function_name_not_exists
        mock_aws_client.mock_calls[1].assert_called_with('test-name', False)
        # get_s3_file_list
        mock_aws_client.mock_calls[2].assert_called_with('test_bucket')
        # launch_request_response_event
        mock_aws_client.mock_calls[3].assert_called_with('test_file_1', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
        # launch_async_event
        mock_aws_client.mock_calls[4].assert_called_with('test_file_2', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
        # launch_async_event
        mock_aws_client.mock_calls[5].assert_called_with('test_file_3', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
        # launch_async_event
        mock_aws_client.mock_calls[6].assert_called_with('test_file_4', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_callback_with_exception(self):
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })
项目:easy-job    作者:inb-co    | 项目源码 | 文件源码
def test_rabbitmq_worker_callback_method_with_invalid_serialization_method(self, getLogger, import_string):
        assert isinstance(import_string, mock.MagicMock)
        # Arrange
        queue_name = "queue_name"
        rabbitmq_configs = {}
        serialization_method = "invalid"
        result_backend = {
            "result_backend_class": "easy_job.result_backends.dummy.DummyBackend",
        }
        logger = getLogger.return_value = mock.MagicMock()
        # Act
        rbt = RabbitMQWorker(queue_name=queue_name,
                             rabbitmq_configs=rabbitmq_configs,
                             serialization_method=serialization_method,
                             result_backend=result_backend,
                             logger="log")
        rbt.callback(mock.MagicMock(), mock.MagicMock(), None, "")

        # Assert

        logger.log.assert_called_once_with(logging.ERROR, mock.ANY)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_case_study_create_api_success(
    mock_create_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()
    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')
    data = {
        **all_case_study_data,
        'image_one': ANY, 'image_two': ANY, 'image_three': ANY,
    }
    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky.

    assert mock_create_case_study.call_count == 1
    assert mock_create_case_study.call_args == call(
        data=data,
        sso_session_id=sso_user.session_id,
    )
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_case_study_update_api_success(
    mock_update_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    mock_update_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end(case_study_id='1')

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')
    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky.
    data = {
        **all_case_study_data,
        'image_one': ANY, 'image_two': ANY, 'image_three': ANY,
    }
    mock_update_case_study.assert_called_once_with(
        data=data,
        case_study_id='1',
        sso_session_id=sso_user.session_id,
    )
项目:python-shaarli-client    作者:shaarli    | 项目源码 | 文件源码
def test_generate_endpoint_parser_noparam(addargument):
    """Generate a parser from endpoint metadata - no params"""
    name = 'put-stuff'
    metadata = {
        'path': 'stuff',
        'method': 'PUT',
        'help': "Changes stuff",
        'params': {},
    }
    parser = ArgumentParser()
    subparsers = parser.add_subparsers()

    generate_endpoint_parser(subparsers, name, metadata)

    addargument.assert_has_calls([
        # first helper for the main parser
        mock.call('-h', '--help', action='help',
                  default=mock.ANY, help=mock.ANY),

        # second helper for the 'put-stuff' subparser
        mock.call('-h', '--help', action='help',
                  default=mock.ANY, help=mock.ANY)
    ])
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_execute_without_failures(self, check_mock, wait_mock):

        client_mock = self.aws_hook_mock.return_value.get_client_type.return_value
        client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES

        self.ecs.execute(None)

        self.aws_hook_mock.return_value.get_client_type.assert_called_once_with('ecs', region_name='eu-west-1')
        client_mock.run_task.assert_called_once_with(
            cluster='c',
            overrides={},
            startedBy=mock.ANY,  # Can by 'airflow' or 'Airflow'
            taskDefinition='t'
        )

        wait_mock.assert_called_once_with()
        check_mock.assert_called_once_with()
        self.assertEqual(self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55')
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_exec(self, gcs_hook, dataflow_mock):
        """Test DataFlowHook is created and the right args are passed to
        start_python_workflow.

        """
        start_python_hook = dataflow_mock.return_value.start_python_dataflow
        gcs_download_hook = gcs_hook.return_value.google_cloud_to_local
        self.dataflow.execute(None)
        self.assertTrue(dataflow_mock.called)
        expected_options = {
            'project': 'test',
            'staging_location': 'gs://test/staging',
            'output': 'gs://test/output'
        }
        gcs_download_hook.assert_called_once_with(PY_FILE)
        start_python_hook.assert_called_once_with(TASK_ID, expected_options,
                                                  mock.ANY, PY_OPTIONS)
        self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
项目:aiotk    作者:AndreLouisCaron    | 项目源码 | 文件源码
def test_exit_stack_exception_propagate():

    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        with ExitStack() as stack:
            v = stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error

    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)
项目:aiotk    作者:AndreLouisCaron    | 项目源码 | 文件源码
def test_exit_stack_exception_propagate():

    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        async with AsyncExitStack() as stack:
            v = await stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = await stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error

    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)
项目:aiotk    作者:AndreLouisCaron    | 项目源码 | 文件源码
def test_exit_stack_exception_propagate():

    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        async with AsyncExitStack() as stack:
            v = await stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = await stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error

    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)
项目:meta-maas    作者:maas    | 项目源码 | 文件源码
def test_Region_sync_custom_image_already_synced(tmpdir):
    """Test Region.sync_custom performs create and handles when its already
    synced."""
    region = make_Region()
    image_path = tmpdir.join("image.tar.gz")
    image_path.write(b"data")
    region.sync_custom("image", {
        "path": str(image_path),
        "architecture": "amd64/generic",
        "title": "My Title",
    })
    assert call(
        "custom/image", "amd64/generic", ANY,
        title="My Title", filetype=BootResourceFileType.TGZ,
        progress_callback=ANY) == region.origin.BootResources.create.call_args
    assert (
        call("custom/image already in sync", level=MessageLevel.SUCCESS) ==
        region.print_msg.call_args)
项目:GulpIO    作者:TwentyBN    | 项目源码 | 文件源码
def test_ingest(self,
                    mock_process_pool,
                    mock_chunk_writer,
                    ):

        # The next three lines mock the ProcessPoolExecutor and it's map
        # function.
        executor_mock = mock.Mock()
        executor_mock.map.return_value = []
        mock_process_pool.return_value.__enter__.return_value = executor_mock
        self.gulp_ingestor.adapter.__len__.return_value = 2
        self.gulp_ingestor()
        mock_chunk_writer.assert_called_once_with(self.adapter)

        executor_mock.map.assert_called_once_with(
            mock_chunk_writer.return_value.write_chunk,
            mock.ANY,
            [slice(0, 1), slice(1, 2)],
        )
项目:flask-openapi    作者:remcohaszing    | 项目源码 | 文件源码
def test_swagger_minimal(app):
    """
    Test a swagger config for a minimal setup.

    """
    app.config['OPENAPI_INFO_VERSION'] = '1.2.3'
    openapi = OpenAPI(app)
    assert openapi.swagger == {
        'swagger': '2.0',
        'info': {
            'title': 'test_swagger_minimal',
            'version': '1.2.3'
        },
        'paths': ANY,
        'schemes': ['http'],
    }
项目:flask-openapi    作者:remcohaszing    | 项目源码 | 文件源码
def test_swagger_full(app):
    """
    Test a swagger config for a fully configured setup.

    """
    app.config.update(
        SERVER_NAME='api.example.com',
        OPENAPI_SHOW_HOST=True,
        OPENAPI_INFO_VERSION='1.2.3'
    )
    openapi = OpenAPI(app)
    assert openapi.swagger == {
        'swagger': '2.0',
        'info': {
            'title': 'test_swagger_full',
            'version': '1.2.3'
        },
        'paths': ANY,
        'host': 'api.example.com',
        'schemes': ['http']
    }
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testCwdWithRelativeScriptPath(self, isdirMock, existsMock, accessMock,
                                      subprocessMock):
        """
        If a step has a cwd set and its script is a relative path, the path of
        the executed script that is executed must be as specified (not
        converted to an absolute path).
        """
        subprocessMock.return_value = ''

        sp = SlurmPipeline(
            {
                'steps': [
                    {
                        'cwd': '/tmp',
                        'name': 'name1',
                        'script': 'script1',
                    },
                ],
            })
        sp.schedule()

        subprocessMock.assert_has_calls([
            call(['script1'], cwd='/tmp', universal_newlines=True,
                 stdin=DEVNULL, env=ANY),
        ])
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testForce(self, existsMock, accessMock, subprocessMock):
        """
        If force=True is given to SlurmPipeline, SP_FORCE must be set to '1'
        in the step execution environment.
        """
        subprocessMock.return_value = ''

        sp = SlurmPipeline(
            {
                'steps': [
                    {
                        'name': 'name1',
                        'script': 'script1',
                    },
                ],
            })
        sp.schedule(force=True)

        subprocessMock.assert_has_calls([
            call(['script1'], cwd='.', universal_newlines=True,
                 stdin=DEVNULL, env=ANY),
        ])

        env = subprocessMock.mock_calls[0][2]['env']
        self.assertEqual('1', env['SP_FORCE'])
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testDefaultNice(self, existsMock, accessMock, subprocessMock):
        """
        If no nice value is given to schedule, SP_NICE_ARG must be set to
        '--nice' in the step execution environment.
        """
        subprocessMock.return_value = ''

        sp = SlurmPipeline(
            {
                'steps': [
                    {
                        'name': 'name1',
                        'script': 'script1',
                    },
                ],
            })
        sp.schedule()

        subprocessMock.assert_has_calls([
            call(['script1'], cwd='.', universal_newlines=True,
                 stdin=DEVNULL, env=ANY),
        ])

        env = subprocessMock.mock_calls[0][2]['env']
        self.assertEqual('--nice', env['SP_NICE_ARG'])
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testSpecificNice(self, existsMock, accessMock, subprocessMock):
        """
        If a specific nice value is given to schedule, SP_NICE_ARG must be set
        to the expected value in the step execution environment.
        """
        subprocessMock.return_value = ''

        sp = SlurmPipeline(
            {
                'steps': [
                    {
                        'name': 'name1',
                        'script': 'script1',
                    },
                ],
            })
        sp.schedule(nice=40)

        subprocessMock.assert_has_calls([
            call(['script1'], cwd='.', universal_newlines=True,
                 stdin=DEVNULL, env=ANY),
        ])

        env = subprocessMock.mock_calls[0][2]['env']
        self.assertEqual('--nice 40', env['SP_NICE_ARG'])
项目:quantumsim    作者:brianzi    | 项目源码 | 文件源码
def test_simple(self):
        sdm = MagicMock()
        sdm.classical = {"A": 0, "B": 1}
        sdm.apply_ptm = MagicMock()

        c = circuit.Circuit()

        c.add_gate("hadamard", "A", time=0, conditional_bit="B")

        c.apply_to(sdm)

        sdm.apply_ptm.assert_called_once_with("A", ptm=ANY)
        sdm.ensure_classical.assert_called_once_with("B")

        sdm = MagicMock()
        sdm.classical = {"A": 0, "B": 0}
        sdm.hadamard = MagicMock()

        c.apply_to(sdm)

        sdm.apply_ptm.assert_not_called()
        sdm.ensure_classical.assert_called_once_with("B")
项目:commissaire    作者:projectatomic    | 项目源码 | 文件源码
def test_temporarysshkey_remove_failure(self):
        """
        Verify TemporarySSHKey.remove reacts properly to failure.
        """
        mock_logger = mock.MagicMock(logging.Logger('test'))
        key = TemporarySSHKey(TEST_HOST_CREDS, mock_logger)
        key.create()
        with mock.patch('os.unlink') as _unlink:
            _unlink.side_effect = Exception
            self.assertTrue(os.path.isfile(key.path))
            key.remove()
            self.assertTrue(os.path.isfile(key.path))
            # We should have a warning in the log
            mock_logger.warn.assert_called_once_with(
                mock.ANY, mock.ANY, mock.ANY)
        # Clean up the file
        key.remove()
项目:flowcelltool    作者:bihealth    | 项目源码 | 文件源码
def test_render(self):
        """Test that the flow cell delete POST works"""
        # Check precondition
        self.assertEqual(FlowCell.objects.all().count(), 1)

        # Simulate the POST
        with self.login(self.user):
            response = self.client.post(
                reverse('flowcell_delete', kwargs={'pk': self.flow_cell.pk}))

        # Check resulting database state
        self.assertEqual(FlowCell.objects.all().count(), 0)

        # Check call to sending emails
        self.email_mock.assert_called_once_with(self.user, ANY)
        m1 = model_to_dict(self.arg_flowcell)
        del m1['id']
        m2 = model_to_dict(self.flow_cell)
        del m2['id']
        self.assertEqual(m1, m2)

        # Check resulting response
        with self.login(self.user):
            self.assertRedirects(
                response, reverse('flowcell_list'))
项目:satpy    作者:pytroll    | 项目源码 | 文件源码
def test_create_reader_instances_with_filenames(self):
        import satpy.scene
        filenames = ["bla", "foo", "bar"]
        sensors = None
        reader_name = None
        with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
            with mock.patch('satpy.scene.ReaderFinder') as findermock:
                scene = satpy.scene.Scene(filenames=filenames)
                findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                                   base_dir=None,
                                                   area=None,
                                                   end_time=None,
                                                   start_time=None)
                findermock.return_value.assert_called_once_with(
                    reader=reader_name,
                    sensor=set(),
                    filenames=filenames,
                    reader_kwargs=None,
                    metadata={}
                )
项目:satpy    作者:pytroll    | 项目源码 | 文件源码
def test_create_reader_instances_with_sensor(self):
        import satpy.scene
        sensors = ["bla", "foo", "bar"]
        filenames = None
        reader_name = None
        with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
            with mock.patch('satpy.scene.ReaderFinder') as findermock:
                scene = satpy.scene.Scene(sensor=sensors)
                findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                                   base_dir=None,
                                                   area=None,
                                                   end_time=None,
                                                   start_time=None)
                findermock.return_value.assert_called_once_with(
                    reader=reader_name,
                    sensor=sensors,
                    filenames=filenames,
                    reader_kwargs=None,
                    metadata={}
                )
项目:satpy    作者:pytroll    | 项目源码 | 文件源码
def test_create_reader_instances_with_sensor_and_filenames(self):
        import satpy.scene
        sensors = ["bla", "foo", "bar"]
        filenames = ["1", "2", "3"]
        reader_name = None
        with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
            with mock.patch('satpy.scene.ReaderFinder') as findermock:
                scene = satpy.scene.Scene(sensor=sensors, filenames=filenames)
                findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                                   base_dir=None,
                                                   area=None,
                                                   end_time=None,
                                                   start_time=None)
                findermock.return_value.assert_called_once_with(
                    reader=reader_name,
                    sensor=sensors,
                    filenames=filenames,
                    reader_kwargs=None,
                    metadata={}
                )
项目:satpy    作者:pytroll    | 项目源码 | 文件源码
def test_create_reader_instances_with_reader(self):
        from satpy.scene import Scene
        reader = "foo"
        filenames = ["1", "2", "3"]
        sensors = set()
        with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
            with mock.patch('satpy.scene.ReaderFinder') as findermock:
                scene = Scene(reader=reader, filenames=filenames)
                findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                                   base_dir=None,
                                                   area=None,
                                                   end_time=None,
                                                   start_time=None)
                findermock.return_value.assert_called_once_with(
                    reader=reader,
                    sensor=sensors,
                    filenames=filenames,
                    reader_kwargs=None,
                    metadata={}
                )
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def test_create_cluster_with_valid_network(self):
        """
        Verify create_cluster uses valid networks as expected.
        """
        bus = mock.MagicMock()
        cluster = Cluster.new(name='test', network='test')
        # The cluster doesn't exist yet
        bus.storage.get_cluster.side_effect = Exception
        # Network response
        bus.storage.get_network.return_value = Network.new(name='test')
        # Creation of the cluster
        bus.storage.save.return_value = cluster

        # Call the handler...
        clusters.create_cluster.handler(copy.deepcopy(NETWORK_CLUSTER_REQUEST), bus)

        bus.storage.save.assert_called_with(mock.ANY)
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def test_create_cluster_with_invalid_network(self):
        """
        Verify create_cluster reacts to invalid networks as expected.
        """
        bus = mock.MagicMock()
        cluster = Cluster.new(name='test', network='test')
        # The cluster doesn't exist yet
        bus.storage.get_cluster.side_effect = Exception
        # The network doesn't exist
        bus.storage.get_network.side_effect = Exception
        # The cluster creation
        bus.storage.save.return_value = cluster

        # Call the handler...
        clusters.create_cluster.handler(copy.deepcopy(NETWORK_CLUSTER_REQUEST), bus)
        # Update clusters network to be 'default' as we expect 'test' to be
        # rejected by the handler
        cluster.network = 'default'
        bus.storage.save.assert_called_with(mock.ANY)
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def test_delete_cluster_member_with_container_manager(self):
        """
        Verify that delete_cluster_member handles a container manager
        """
        bus = mock.MagicMock()
        cluster = Cluster.new(
            name='test', hostset=['127.0.0.1'],
            container_manager=C.CONTAINER_MANAGER_OPENSHIFT)

        bus.storage.get_cluster.return_value = cluster
        bus.storage.save.return_value = None

        self.assertEquals(
            create_jsonrpc_response(ID, []),
            clusters.delete_cluster_member.handler(CHECK_CLUSTER_REQUEST, bus))

        # Verify we had a 'container.remove_node'
        bus.request.assert_called_with('container.remove_node', params=mock.ANY)
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def test_authentication_manager_multi_complex_deny(self):
        """
        Verify AuthenticationManager handles the complex forbidden case with multiple authenticators.
        """
        start_response = mock.MagicMock()
        response_code = '402 Payment Required'
        expected_result = [bytes('$$$', 'utf8')]
        def complex_auth(environ, start_response):
            start_response(response_code, [])
            return expected_result

        self.authentication_manager.authenticators = [
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
            mock.MagicMock(authenticate=complex_auth),
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        ]
        result = self.authentication_manager(create_environ(), start_response)
        self.assertEquals(expected_result, result)
        start_response.assert_called_once_with(response_code, mock.ANY)
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def test_authentication_manager_multi_complex_allow(self):
        """
        Verify AuthenticationManager handles the complex allow case with multiple authenticators.
        """
        expected_result = [bytes('itrustyou', 'utf8')]
        def complex_auth(environ, start_response):
            start_response('200 OK', [])
            return expected_result

        start_response = mock.MagicMock()
        self.authentication_manager.authenticators = [
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
            mock.MagicMock(authenticate=complex_auth),
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        ]
        result = self.authentication_manager(create_environ(), start_response)
        self.assertEquals(expected_result, result)
        start_response.assert_called_once_with('200 OK', mock.ANY)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_callback_with_exception(self):
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })
项目:aioredlock    作者:joanvila    | 项目源码 | 文件源码
def test_lock_one_retry(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (False, 1),
            (True, 1)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is True
项目:aioredlock    作者:joanvila    | 项目源码 | 文件源码
def test_lock_expire_retries(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (False, 1),
            (False, 1),
            (False, 1)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is False
项目:aioredlock    作者:joanvila    | 项目源码 | 文件源码
def test_lock_one_timeout(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (True, 1500),
            (True, 1)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is True
项目:aioredlock    作者:joanvila    | 项目源码 | 文件源码
def test_lock_expire_retries_for_timeouts(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (True, 1100),
            (True, 1001),
            (True, 2000)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is False
项目:qubes-core-admin-client    作者:QubesOS    | 项目源码 | 文件源码
def test_000_simple(self, mock_backup, mock_getpass, mock_input):
        mock_getpass.return_value = 'testpass'
        mock_input.return_value = 'Y'
        vm1 = BackupVM()
        vm1.name = 'test-vm'
        vm1.backup_path = 'path/in/backup'
        vm1.template = None
        vm1.klass = 'StandaloneVM'
        vm1.label = 'red'
        mock_restore_info = {
            1: BackupRestore.VMToRestore(vm1),
        }
        mock_backup.configure_mock(**{
            'return_value.get_restore_summary.return_value': '',
            'return_value.get_restore_info.return_value': mock_restore_info,
        })
        with mock.patch('qubesadmin.tools.qvm_backup_restore.handle_broken') \
                as mock_handle_broken:
            qubesadmin.tools.qvm_backup_restore.main(['/some/path'],
                app=self.app)
            mock_handle_broken.assert_called_once_with(
                self.app, mock.ANY, mock_restore_info)
        mock_backup.assert_called_once_with(
            self.app, '/some/path', None, 'testpass')
        self.assertAllCalled()
项目:mal    作者:ryukinix    | 项目源码 | 文件源码
def test_search_found(self, mock_requests_get):
        search_query = 'test'
        search_results = MalSearchResponseBuilder()
        search_results.add_result({'title': search_query})
        mock_requests_get.return_value = mock.Mock(
            status_code=200,
            text=search_results.get_response_xml()
        )
        results = self.mal.search(search_query)

        mock_requests_get.assert_called_with(
            ANY,
            params=dict(q=search_query),
            auth=ANY,
            headers=ANY
        )

        self.assertTrue(len(results) == 1)
        first_result = results[0]
        self.assertTrue(first_result['title'] == search_query)
项目:mal    作者:ryukinix    | 项目源码 | 文件源码
def test_search_found_more_than_one(self, mock_requests_get):
        search_query = 'test'
        search_results = MalSearchResponseBuilder()
        search_results.add_result({'title': 'test1'})
        search_results.add_result({'title': 'test2'})
        mock_requests_get.return_value = mock.Mock(
            status_code=200,
            text=search_results.get_response_xml()
        )
        results = self.mal.search(search_query)

        mock_requests_get.assert_called_with(
            ANY,
            params=dict(q=search_query),
            auth=ANY,
            headers=ANY
        )

        self.assertTrue(len(results) > 1)
        for result in results:
            self.assertTrue(search_query in result['title'])
项目:mal    作者:ryukinix    | 项目源码 | 文件源码
def test_update_post(self, mock_requests_post):
        item_id = 1
        entry = {'episode': 10}
        expected_xml = '<entry><episode>10</episode></entry>'
        expected_response_code = 200

        xml_header='<?xml version="1.0" encoding="UTF-8"?>'
        mock_requests_post.return_value = mock.Mock(
            status_code=expected_response_code
        )
        result = self.mal.update(item_id, entry)

        mock_requests_post.assert_called_with(
            'https://myanimelist.net/api/animelist/update/{0}.xml'.format(
                item_id),
            data={'data': xml_header + expected_xml},
            auth=(MOCK_USER, MOCK_PASS),
            headers=ANY
        )

        self.assertTrue(result == expected_response_code)
项目:aiojobs    作者:aio-libs    | 项目源码 | 文件源码
def test_exception_non_waited_job(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    async def coro():
        await asyncio.sleep(0, loop=loop)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await asyncio.sleep(0.05, loop=loop)

    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
项目:aiojobs    作者:aio-libs    | 项目源码 | 文件源码
def test_exception_on_close(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    fut = asyncio.Future()

    async def coro():
        fut.set_result(None)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await scheduler.close()

    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
项目:aiojobs    作者:aio-libs    | 项目源码 | 文件源码
def test_timeout_on_closing(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler,
                                     close_timeout=0.01)
    fut1 = asyncio.Future()
    fut2 = asyncio.Future()

    async def coro():
        try:
            await fut1
        except asyncio.CancelledError:
            await fut2

    job = await scheduler.spawn(coro())
    await asyncio.sleep(0.001, loop=loop)
    await scheduler.close()
    assert job.closed
    assert fut1.cancelled()
    expect = {'message': 'Job closing timed out',
              'job': job,
              'exception': mock.ANY}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
项目:aiojobs    作者:aio-libs    | 项目源码 | 文件源码
def test_exception_on_closing(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    fut = asyncio.Future()
    exc = RuntimeError()

    async def coro():
        fut.set_result(None)
        raise exc

    job = await scheduler.spawn(coro())
    await fut
    await scheduler.close()
    assert job.closed
    expect = {'message': 'Job processing failed',
              'job': job,
              'exception': exc}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_callback_with_exception(self):
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch('aiohttp_devtools.runserver.serve.modify_main_app')
    loop.call_later(0.5, loop.stop)

    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')

    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000, backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)
项目:dila    作者:socialwifi    | 项目源码 | 文件源码
def test_data_preserves_translated_strings(db_connection):
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    string_pk = data.add_or_update_base_string(resource_pk, 'x', comment='comment', context='ctx')
    data.set_translated_string('pl', string_pk, translation='y', translator_comment='tcomment')
    preserved_strings = list(data.get_translated_strings('pl', resource_pk))
    assert preserved_strings == [
        dila.application.structures.TranslatedStringData(
            pk=mock.ANY,
            base_string='x',
            plural='',
            context='ctx',
            translation='y',
            comment='comment',
            translator_comment='tcomment',
            resource_pk=resource_pk,
            plural_translations=None,
        )]
项目:dila    作者:socialwifi    | 项目源码 | 文件源码
def test_adding_the_same_sting(db_connection):
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    data.add_or_update_base_string(resource_pk, 'x', comment='comment', context='ctx')
    data.add_or_update_base_string(resource_pk, 'x', comment='lolz', context='ctx')
    preserved_strings = list(data.get_translated_strings('pl', resource_pk))
    assert preserved_strings == [
        dila.application.structures.TranslatedStringData(
            pk=mock.ANY,
            base_string='x',
            plural='',
            context='ctx',
            translation='',
            comment='lolz',
            translator_comment='',
            resource_pk=resource_pk,
            plural_translations=None,
        )]