Python responses 模块,RequestsMock() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用responses.RequestsMock()

项目:instagram_private_api_extensions    作者:ping    | 项目源码 | 文件源码
def test_downloader_conn_error(self):
        """Run the downloader against a manifest URL that always raises ConnectionError.

        Every registered failure must be consumed
        (``assert_all_requests_are_fired=True``), and stitching afterwards must
        not leave an output file behind.
        """
        exception = ConnectionError()
        with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
            max_retry = 3
            # max_retry + 1 failures are registered -- presumably the first
            # attempt plus every retry; confirm against live.Downloader.
            for _ in range(max_retry + 1):
                rsps.add(responses.GET, self.TEST_MPD_URL, body=exception)

            dl = live.Downloader(
                mpd=self.TEST_MPD_URL,
                output_dir='output_connerror',
                duplicate_etag_retry=2,
                singlethreaded=True,
                max_connection_error_retry=max_retry)
            dl.run()
            dl.stream_id = '17875351285037717'
            output_file = 'output_connerror.mp4'
            dl.stitch(output_file, cleartempfiles=True)
            # This message is shown only when the file unexpectedly exists, so
            # it must describe that failure rather than claim "not generated".
            self.assertFalse(
                os.path.isfile(output_file),
                '{0!s} should not be generated'.format(output_file))
项目:libmozdata    作者:mozilla    | 项目源码 | 文件源码
def setup_versions(self, stable, devel, nightly, esr, esr_next=None):
        """
        Add a mock response for official versions.

        Registers a GET response on ``versions.URL_VERSIONS`` whose JSON body
        is built from the given version strings, starts the mock, and yields
        it. The mock is stopped once the generator is resumed or closed.
        """
        self.cleanup()
        body = {
            "FIREFOX_NIGHTLY": nightly,
            "FIREFOX_ESR": esr,
            "FIREFOX_ESR_NEXT": esr_next,
            "LATEST_FIREFOX_DEVEL_VERSION": devel,
            "LATEST_FIREFOX_OLDER_VERSION": "3.6.28",
            "LATEST_FIREFOX_RELEASED_DEVEL_VERSION": devel,
            "LATEST_FIREFOX_VERSION": stable,
        }
        local_mock = responses.RequestsMock()
        local_mock.add(responses.GET, versions.URL_VERSIONS, json=body)
        local_mock.start()
        try:
            yield local_mock
        finally:
            # Always undo the patching, even when the consumer raises at the
            # yield point; otherwise the mock would stay active for later code.
            local_mock.stop()
项目:python-deform    作者:deformio    | 项目源码 | 文件源码
def test_create_new_user(self):
        """Create a user against a mocked 201 response and check the returned message."""
        with responses.RequestsMock() as mocked:
            create_method = self.deform_client.user.create.method.upper()
            create_url = self.deform_client.user.create.get_context({})['url']
            payload = {'message': 'Check your email for confirmation code'}
            mocked.add(create_method, create_url, json=payload, status=201)

            new_email = 'a' + self.CONFIG['DEFORM']['EMAIL']
            new_password = self.CONFIG['DEFORM']['PASSWORD']
            result = self.deform_client.user.create(
                email=new_email,
                password=new_password,
            )

            expected = has_entry('message', 'Check your email for confirmation code')
            assert_that(result, expected)
项目:python-deform    作者:deformio    | 项目源码 | 文件源码
def test_confirm(self):
        """Confirm a code against a mocked 200 response and check the session id."""
        session_id = 'Qivjy3PW3dqxAOY9i4MeT1H0FHyGkw'
        with responses.RequestsMock() as mocked:
            confirm_method = self.deform_client.user.confirm.method.upper()
            confirm_url = self.deform_client.user.confirm.get_context({})['url']
            mocked.add(
                confirm_method,
                confirm_url,
                json={'sessionId': session_id},
                status=200,
            )

            result = self.deform_client.user.confirm(code='12345')

            assert_that(result, has_entry('sessionId', session_id))
项目:python-deform    作者:deformio    | 项目源码 | 文件源码
def test_confirm_already_confirmed_code(self):
        """Confirming against a mocked 409 response must raise ``ConflictError``.

        The error text is matched against the mocked ``message`` field.
        """
        with responses.RequestsMock() as rsps:
            rsps.add(
                self.deform_client.user.confirm.method.upper(),
                self.deform_client.user.confirm.get_context({})['url'],
                json={
                    'message': 'User already exists.'
                },
                status=409
            )
            assert_that(
                calling(self.deform_client.user.confirm).with_args(
                    code='12345'
                ),
                # Raw string: ``\.`` is not a valid escape in a normal string
                # literal and triggers a warning on modern Python versions.
                raises(ConflictError, r'^User already exists\.$')
            )
项目:cpauto    作者:dana-at-cp    | 项目源码 | 文件源码
def test_all_the_things(self, core_client, mgmt_server_base_uri, resource, params):
        """Call the LoginMessage method matching ``resource`` and check the mocked reply."""
        endpoint = mgmt_server_base_uri + resource
        with responses.RequestsMock() as mocked:
            resp_body = {'foo': 'bar', 'message': 'OK'}
            mocked.add(
                responses.POST,
                endpoint,
                json=resp_body,
                status=200,
                content_type='application/json',
            )

            login_message = cpauto.LoginMessage(core_client)
            if resource == "show-login-message":
                reply = login_message.show()
            elif resource == "set-login-message":
                reply = login_message.set(params=params)
            else:
                # Any other resource makes no call, exactly as before.
                return

            assert reply.status_code == 200
            assert reply.json() == resp_body
项目:cpauto    作者:dana-at-cp    | 项目源码 | 文件源码
def test_show_all_access_layers(core_client, mgmt_server_base_uri,
    limit, offset, order, details_level):
    """Call ``AccessLayer.show_all`` with paging options and check the mocked reply."""
    endpoint = mgmt_server_base_uri + 'show-access-layers'
    with responses.RequestsMock() as mocked:
        resp_body = {'foo': 'bar', 'message': 'OK'}
        mocked.add(
            responses.POST,
            endpoint,
            json=resp_body,
            status=200,
            content_type='application/json',
        )

        access_layer = cpauto.AccessLayer(core_client)
        reply = access_layer.show_all(
            limit=limit,
            offset=offset,
            order=order,
            details_level=details_level,
        )

        assert reply.status_code == 200
        assert reply.json() == resp_body

# NATRule
项目:rets    作者:refindlyllc    | 项目源码 | 文件源码
def test_preferred_object(self):
        """Fetch a preferred object twice: by resource name, then by resource dict."""
        with open('tests/rets_responses/GetObject_multipart.byte', 'rb') as fixture:
            multipart_body = fixture.read()

        multi_headers = {
            'Content-Type': 'multipart/parallel; boundary="24cbd0e0afd2589bb9dcb1f34cf19862"; charset=utf-8',
            'Connection': 'keep-alive', 'RETS-Version': 'RETS/1.7.2', 'MIME-Version': '1.0, 1.0'}
        object_url = 'http://server.rets.com/rets/GetObject.ashx'

        with responses.RequestsMock() as resps:
            # First call: the multipart headers are supplied with the body.
            resps.add(resps.POST, object_url,
                      body=multipart_body, status=200, headers=multi_headers)
            by_name = self.session.get_preferred_object(
                resource='Property', object_type='Photo', content_id=1)
            self.assertTrue(by_name)

            # Second call: same body, registered without the extra headers.
            resps.add(resps.POST, object_url,
                      body=multipart_body, status=200)
            resource = {'ResourceID': 'Agent'}
            by_dict = self.session.get_preferred_object(
                resource=resource, object_type='Photo', content_id=1)
            self.assertTrue(by_dict)
项目:rets    作者:refindlyllc    | 项目源码 | 文件源码
def test_class_metadata(self):
        """Class metadata yields 6 entries for the full fixture and 1 for the single one."""
        with open('tests/rets_responses/COMPACT-DECODED/GetMetadata_classes.xml') as fixture:
            all_classes_xml = fixture.read()

        with open('tests/rets_responses/COMPACT-DECODED/GetMetadata_classes_single.xml') as fixture:
            single_class_xml = fixture.read()

        metadata_url = 'http://server.rets.com/rets/GetMetadata.ashx'
        with responses.RequestsMock() as resps:
            resps.add(resps.POST, metadata_url, body=all_classes_xml, status=200)
            agent_classes = self.session.get_class_metadata(resource='Agent')
            self.assertEqual(len(agent_classes), 6)

            resps.add(resps.POST, metadata_url, body=single_class_xml, status=200)
            property_classes = self.session.get_class_metadata(resource='Property')
            self.assertEqual(len(property_classes), 1)
项目:rets    作者:refindlyllc    | 项目源码 | 文件源码
def test_auto_offset(self):
        """A search backed by two mocked result pages returns 6 results in total."""
        with open('tests/rets_responses/COMPACT-DECODED/Search_1of2.xml') as fixture:
            first_page = fixture.read()

        with open('tests/rets_responses/COMPACT-DECODED/Search_2of2.xml') as fixture:
            second_page = fixture.read()

        search_url = 'http://server.rets.com/rets/Search.ashx'
        with responses.RequestsMock() as resps:
            # Registration order matters: page one is served before page two.
            resps.add(resps.POST, search_url, body=first_page, status=200, stream=True)
            resps.add(resps.POST, search_url, body=second_page, status=200, stream=True)

            results = self.session.search(
                resource='Property',
                resource_class='RES',
                search_filter={'ListingPrice': 200000},
            )

            self.assertEqual(len(results), 6)
项目:rets    作者:refindlyllc    | 项目源码 | 文件源码
def test_change_parser_automatically(self):
        """The metadata format changes from COMPACT-DECODED to STANDARD-XML.

        The first mocked reply is the 20514 error fixture, the second is the
        STANDARD-XML system metadata fixture.
        """
        self.assertEqual(self.session.metadata_format, 'COMPACT-DECODED')

        with open('tests/rets_responses/Errors/20514.xml') as fixture:
            error_xml = fixture.read()

        with open('tests/rets_responses/STANDARD-XML/GetMetadata_system.xml') as fixture:
            system_xml = fixture.read()

        metadata_url = 'http://server.rets.com/rets/GetMetadata.ashx'
        with responses.RequestsMock() as resps:
            resps.add(resps.POST, metadata_url, body=error_xml, status=200)
            resps.add(resps.POST, metadata_url, body=system_xml, status=200)
            self.session.get_system_metadata()

        self.assertEqual(self.session.metadata_format, 'STANDARD-XML')
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def test_request_renew_auth_token_success(self, auth_mock):
        """A 401 followed by a 200 ends with the 200 body and the expected headers."""
        auth_mock.request_new_access_token.return_value = "access_token"
        target_url = 'https://hostname.com/v1/this/1'
        with responses.RequestsMock(assert_all_requests_are_fired=True) as mocked:
            # Both replies must be consumed: first the 401, then the 200.
            mocked.add(responses.GET, target_url, body="foo", status=401)
            mocked.add(responses.GET, target_url, body="bar", status=200)

            session = self.session
            response = session.get(self.request_url)

            sent_headers = dict(response.request.headers)
            assert response.text == "bar"
            assert all(
                expected == sent_headers[name]
                for name, expected in self.exp_headers.items()
            )
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def test_request_renew_auth_token_fail(self, auth_mock):
        """Two consecutive 401 replies end with the body of the second one."""
        auth_mock.request_new_access_token.return_value = "access_token"
        target_url = 'https://hostname.com/v1/this/1'
        with responses.RequestsMock(assert_all_requests_are_fired=True) as mocked:
            mocked.add(responses.GET, target_url, body="bar", status=401)
            mocked.add(responses.GET, target_url, body="foo", status=401)

            session = self.session
            response = session.get(self.request_url)

            assert response.text == "foo"
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def test_no_hooks(self, auth_mock):
        """With no hooks registered, neither hook mock is called by a request."""
        auth_mock.request_new_access_token.return_value = "access_token"
        target_url = 'https://hostname.com/v1/this/1'
        with responses.RequestsMock(assert_all_requests_are_fired=True) as mocked:
            mocked.add(responses.GET, target_url, body="foo", status=401)
            mocked.add(responses.GET, target_url, body="bar", status=200)

            session = self.session
            hook_mock = mock.Mock()

            # no hooks
            session.get(self.request_url)

            assert_not_called(hook_mock.pre_hook)
            assert_not_called(hook_mock.post_hook)
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def test_pre_hook(self, auth_mock):
        """A registered pre-hook is called once; the unregistered post-hook never is."""
        auth_mock.request_new_access_token.return_value = "access_token"
        target_url = 'https://hostname.com/v1/this/1'
        with responses.RequestsMock(assert_all_requests_are_fired=True) as mocked:
            mocked.add(responses.GET, target_url, body="foo", status=401)
            mocked.add(responses.GET, target_url, body="bar", status=200)

            session = self.session
            hook_mock = mock.Mock()
            session.register_pre_hook(hook_mock.pre_hook)

            session.get(self.request_url)

            assert_called_once(hook_mock.pre_hook)
            assert_not_called(hook_mock.post_hook)
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def test_pre_and_post_hooks(self, auth_mock):
        """With both hooks registered, each is called exactly once."""
        auth_mock.request_new_access_token.return_value = "access_token"
        target_url = 'https://hostname.com/v1/this/1'
        with responses.RequestsMock(assert_all_requests_are_fired=True) as mocked:
            mocked.add(responses.GET, target_url, body="foo", status=401)
            mocked.add(responses.GET, target_url, body="bar", status=200)

            session = self.session
            hook_mock = mock.Mock()
            session.register_pre_hook(hook_mock.pre_hook)
            session.register_post_hook(hook_mock.post_hook)

            session.get(self.request_url)

            assert_called_once(hook_mock.pre_hook)
            assert_called_once(hook_mock.post_hook)
项目:sdk-py    作者:predicthq    | 项目源码 | 文件源码
def with_mock_responses(req_resp=None):
    """Build a decorator that runs the wrapped callable inside a ``RequestsMock``.

    The fixture name is ``req_resp`` when given; otherwise it is
    ``"<snake_cased class name of the first positional argument>/<function name>"``.
    Each loaded fixture is registered on the mock, which is then passed to the
    wrapped callable as the ``responses`` keyword argument.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            underscored = re.sub(r'([A-Z]+)', r'_\1', args[0].__class__.__name__)
            class_name = underscored.lower().strip('_')
            func_name = f.__name__
            fixture_name = req_resp or "{}/{}".format(class_name, func_name)
            fixtures = load_reqresp_fixture(fixture_name)
            with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps:
                for fixture in fixtures:
                    if 'url_re' in fixture:
                        # Regex fixtures: build the URL first, then compile it.
                        pattern = Client.build_url(fixture.pop('url_re'))
                        fixture['url'] = re.compile(pattern)
                    else:
                        fixture['url'] = Client.build_url(fixture['url'])
                    if fixture.get("content_type") == "application/json":
                        # JSON fixtures carry a structured body; serialize it.
                        fixture['body'] = json.dumps(fixture['body'])
                    rsps.add(**fixture)
                return f(responses=rsps, *args, **kwargs)
        return wrapper
    return decorator
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_modpack_fetch(minimal_pack, tinkers_update):
    """Does fetch download the file correctly and skip it when already present?"""

    minimal_pack.path /= 'files'
    session = requests.Session()
    mod_file = tinkers_update

    with betamax.Betamax(session).use_cassette('file-fetch'):
        # The target directory does not exist yet, so fetching must fail.
        with pytest.raises(OSError):
            minimal_pack.fetch(mod_file, session=session)

        minimal_pack.path.mkdir()
        minimal_pack.fetch(mod_file, session=session)

        fetched_path = minimal_pack.path / mod_file.name
        assert fetched_path.is_file()
        assert fetched_path.stat().st_mtime == mod_file.date.timestamp()

    with responses.RequestsMock() as mocked:
        # Fetching again must not issue any HTTP call.
        minimal_pack.fetch(mod_file, session=session)

        assert len(mocked.calls) == 0
项目:feedstockrot    作者:axiom-data-science    | 项目源码 | 文件源码
def setup(self, rsps: RequestsMock):
        """Register mocked GitHub refs responses on ``rsps``.

        One JSON response is added per entry in ``self.package_names``, then
        ``self._expected_missing`` regex-matched 404 responses are added.
        """
        for package_name in self.package_names:
            tag_refs = [
                {"ref": "refs/tags/{}".format(entry['version'])}
                for entry in mock_package_list
                if entry['name'] == package_name
            ]

            # A bare name is expanded to "<name>/<name>".
            if '/' in package_name:
                github_name = package_name
            else:
                github_name = package_name + "/" + package_name

            refs_url = Github._REFS_URL.format(github_name)
            rsps.add(rsps.GET, refs_url, json=tag_refs)

        notfound_pattern = escape_regex_format_str(Github._REFS_URL).format("([a-zA-Z-_.]+)")
        re_notfound = re.compile(notfound_pattern)
        for _ in range(self._expected_missing):
            rsps.add(rsps.GET, re_notfound, status=404)
项目:data.world-py    作者:datadotworld    | 项目源码 | 文件源码
def test_parameterized_queries(self, type, query, parameters, expected,
                                   dw, dataset_key, query_response_json):
        """Run a parameterized query and check each expected value is in the request URL."""
        with responses.RequestsMock() as mocked:
            def check_request(req):
                for fragment in expected:
                    assert_that(req.url, contains_string(fragment),
                                reason="Expected [[\n{}\n]] to contain "
                                "[[\n{}\n]]".format(req.url, expected))
                return 200, {}, json.dumps(query_response_json)

            query_url_pattern = re.compile(r'https?://query\.data\.world/.*')
            mocked.add_callback(
                mocked.GET,
                query_url_pattern,
                callback=check_request,
                content_type="application/json",
                match_querystring=True,
            )

            dw.query(dataset_key, query, query_type=type,
                     parameters=parameters)
项目:data.world-py    作者:datadotworld    | 项目源码 | 文件源码
def test_write_basic(self):
        """Writing "test" through RemoteFile uploads exactly that text via PUT."""
        with responses.RequestsMock() as mocked:
            def upload_endpoint(req):
                uploaded = ''.join(chunk.decode('utf-8') for chunk in req.body)
                assert "test" == uploaded
                assert req.headers.get('User-Agent') == _user_agent()
                return 200, {}, json.dumps({})

            upload_url = '{}/uploads/{}/{}/files/{}'.format(
                'https://api.data.world/v0', 'user', 'dataset', 'file.txt')
            mocked.add_callback(mocked.PUT, upload_url, callback=upload_endpoint)

            with RemoteFile(DefaultConfig(),
                            "user/dataset", "file.txt") as writer:
                writer.write("test")
项目:data.world-py    作者:datadotworld    | 项目源码 | 文件源码
def test_write_csv(self):
        """Rows written with ``csv.DictWriter`` over RemoteFile are uploaded as CSV text."""
        with responses.RequestsMock() as mocked:
            def upload_endpoint(req):
                uploaded = ''.join(chunk.decode('utf-8') for chunk in req.body)
                assert "a,b\r\n42,17\r\n420,178\r\n" == uploaded
                assert req.headers.get('User-Agent') == _user_agent()
                return 200, {}, json.dumps({})

            upload_url = '{}/uploads/{}/{}/files/{}'.format(
                'https://api.data.world/v0', 'user', 'dataset', 'file.csv')
            mocked.add_callback(mocked.PUT, upload_url, callback=upload_endpoint)

            with RemoteFile(DefaultConfig(),
                            "user/dataset", "file.csv") as writer:
                csv_writer = csv.DictWriter(writer, fieldnames=['a', 'b'])
                csv_writer.writeheader()
                csv_writer.writerow({'a': 42, 'b': 17})
                csv_writer.writerow({'a': 420, 'b': 178})
项目:data.world-py    作者:datadotworld    | 项目源码 | 文件源码
def test_read_jsonl(self):
        """Reading a RemoteFile line by line yields the two mocked JSON records."""
        with responses.RequestsMock() as mocked:
            def download_endpoint(req):
                assert req.headers.get('User-Agent') == _user_agent()
                payload = ('{"A":"1", "B":"2", "C":"3"}\n'
                           '{"A":"4", "B":"5", "C":"6"}\n')
                return 200, {}, payload

            download_url = '{}/file_download/{}/{}/{}'.format(
                'https://query.data.world', 'user', 'dataset', 'file.csv')
            mocked.add_callback(mocked.GET, download_url,
                                callback=download_endpoint)

            with RemoteFile(DefaultConfig(), "user/dataset", "file.csv",
                            mode="r") as reader:
                # Blank lines are skipped before decoding.
                rows = [json.loads(line) for line in reader if line.strip()]
                assert rows[0] == {'A': '1', 'B': '2', 'C': '3'}
                assert rows[1] == {'A': '4', 'B': '5', 'C': '6'}
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_endpoint_all(self):
        """``users.all()`` returns every user from the mocked listing."""
        listing = [
            {'id': 1, 'username': 'user1', 'group': 'watchers'},
            {'id': 2, 'username': 'user2', 'group': 'watchers'},
            {'id': 3, 'username': 'user3', 'group': 'admin'},
        ]
        with responses.RequestsMock() as mocked:
            mocked.add(responses.GET, MOCK_API_URL + '/users', json=listing)

            users = generic_client.users.all()
            self.assertEqual(len(users), 3)
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_endpoint_links(self):
        """The ``Link`` header of a filtered listing is exposed via ``response.links``."""
        page_two = [
            {'id': 3, 'username': 'user1', 'group': 'watchers'},
            {'id': 4, 'username': 'user2', 'group': 'watchers'},
        ]
        link_header = {
            'Link': '<http://example.com/users?page=3>; rel=next,<http://example.com/users?page=1>; rel=previous'
        }
        with responses.RequestsMock() as mocked:
            mocked.add(
                'GET',
                MOCK_API_URL + '/users?page=2',
                json=page_two,
                match_querystring=True,
                headers=link_header,
            )

            users = generic_client.users.filter(page=2)
            expected_links = {
                'next': {'url': 'http://example.com/users?page=3', 'rel': 'next'},
                'previous': {'url': 'http://example.com/users?page=1', 'rel': 'previous'}
            }
            self.assertEqual(users.response.links, expected_links)
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_endpoint_delete(self):
        """Delete succeeds on 204, raises ResourceNotFound on 404 and HTTPError on 500."""
        user_url = MOCK_API_URL + '/users/1'

        with responses.RequestsMock() as mocked:
            mocked.add(responses.DELETE, user_url, status=204)

            generic_client.users.delete(1)

        with responses.RequestsMock() as mocked:
            mocked.add(responses.DELETE, user_url, status=404)

            with self.assertRaises(generic_client.ResourceNotFound):
                generic_client.users.delete(1)

        with responses.RequestsMock() as mocked:
            mocked.add(responses.DELETE, user_url, status=500)

            with self.assertRaises(generic_client.HTTPError):
                generic_client.users.delete(1)
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_resource_save_uuid(self):
        """Fetch a resource by ``uuid``, change its group and save it via a mocked PUT."""
        user_url = MOCK_API_URL + '/users/1'

        with responses.RequestsMock() as mocked:
            fetched = {'uuid': 1, 'username': 'user1', 'group': 'watchers'}
            mocked.add(responses.GET, user_url, json=fetched)

            user1 = generic_client.users.get(uuid=1)
            self.assertEqual(user1.username, 'user1')

        with responses.RequestsMock() as mocked:
            saved = {'uuid': 1, 'username': 'user1', 'group': 'admins'}
            mocked.add(responses.PUT, user_url, json=saved)

            user1.group = 'admins'
            user1.save()
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_resource_save(self):
        """Fetch a resource by ``id``, change its group and save it via a mocked PUT."""
        user_url = MOCK_API_URL + '/users/1/'

        with responses.RequestsMock() as mocked:
            fetched = {'id': 1, 'username': 'user1', 'group': 'watchers'}
            mocked.add(responses.GET, user_url, json=fetched)

            user1 = generic_client.users.get(id=1)
            self.assertEqual(user1.username, 'user1')

        with responses.RequestsMock() as mocked:
            saved = {'id': 1, 'username': 'user1', 'group': 'admins'}
            mocked.add(responses.PUT, user_url, json=saved)

            user1.group = 'admins'
            user1.save()
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_endpoint_detail_route(self):
        """Call the ``notify`` detail route with the default method, then with ``_method='get'``."""
        notify_url = MOCK_API_URL + '/users/2/notify'

        with responses.RequestsMock() as mocked:
            mocked.add_callback(
                responses.POST,
                notify_url,
                callback=request_callback,
                content_type='application/json',
            )

            reply = generic_client.users(id=2).notify(unread=3)
            self.assertEqual(reply.json(), {'unread': 3})

        with responses.RequestsMock() as mocked:
            mocked.add_callback(
                responses.GET,
                notify_url,
                callback=request_callback,
                content_type='application/json',
            )

            reply = generic_client.users(_method='get', id=2).notify(unread=3)
            self.assertEqual(reply.json(), {'unread': 3})
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_endpoint_list_route(self):
        """Call the ``notify`` list route with the default method, then with ``_method='get'``."""
        notify_url = MOCK_API_URL + '/users/notify'

        with responses.RequestsMock() as mocked:
            mocked.add_callback(
                responses.POST,
                notify_url,
                callback=request_callback,
                content_type='application/json',
            )

            reply = generic_client.users().notify(unread=3)
            self.assertEqual(reply.json(), {'unread': 3})

        with responses.RequestsMock() as mocked:
            mocked.add_callback(
                responses.GET,
                notify_url,
                callback=request_callback,
                content_type='application/json',
            )

            reply = generic_client.users(_method='get').notify(unread=3)
            self.assertEqual(reply.json(), {'unread': 3})
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_endpoint_links(self):
        """The ``Link`` header is exposed via ``response.links`` (trailing-slash URLs)."""
        page_two = [
            {'id': 3, 'username': 'user1', 'group': 'watchers'},
            {'id': 4, 'username': 'user2', 'group': 'watchers'},
        ]
        link_header = {
            'Link': '<http://example.com/users/?page=3>; rel=next,<http://example.com/users/?page=1>; rel=previous'
        }
        with responses.RequestsMock() as mocked:
            mocked.add(
                'GET',
                MOCK_API_URL + '/users/?page=2',
                json=page_two,
                match_querystring=True,
                headers=link_header,
            )

            users = generic_client.users.filter(page=2)
            expected_links = {
                'next': {'url': 'http://example.com/users/?page=3', 'rel': 'next'},
                'previous': {'url': 'http://example.com/users/?page=1', 'rel': 'previous'}
            }
            self.assertEqual(users.response.links, expected_links)
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_failure_page(self):
        """The confirmation page shows the failure text and the payment reference."""
        processor_id = '3'
        with responses.RequestsMock() as mocked:
            pending_payment = {
                'uuid': 'wargle-blargle',
                'processor_id': processor_id,
                'recipient_name': 'James Bond',
                'amount': 2000,
                'status': 'pending',
                'created': datetime.datetime.now().isoformat() + 'Z',
                'prisoner_number': 'A5544CD',
                'prisoner_dob': '1992-12-05'
            }
            mocked.add(mocked.GET, api_url('/payments/'), json=pending_payment)

            # The mocked GOV.UK lookup reports the payment as failed.
            govuk_payment = {'state': {'status': 'failed'}}
            mocked.add(mocked.GET, govuk_url('/payments/%s' % processor_id),
                       json=govuk_payment)

            confirmation_url = self.live_server_url + '/en-gb/debit-card/confirmation/?payment_ref=REF12345'
            self.driver.get(confirmation_url)
            self.assertInSource('We’re sorry, your payment could not be processed on this occasion')
            self.assertInSource('Your reference number is <strong>REF12345</strong>')
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def assertFormValid(self, form):  # noqa
        """Assert ``form`` validates while the prisoner-validity API is mocked.

        On failure the assertion message carries the form's errors as text.
        """
        validity_reply = {
            'count': 1,
            'results': [{
                'prisoner_number': 'A1234AB',
                'prisoner_dob': '1980-10-05',
            }],
        }
        with responses.RequestsMock() as mocked:
            with mock.patch('send_money.forms.PrisonerDetailsForm.get_api_session') as mocked_api_session:
                mocked_api_session.side_effect = get_api_session
                mock_auth(mocked)
                mocked.add(
                    mocked.GET,
                    api_url('/prisoner_validity/'),
                    json=validity_reply,
                    status=200,
                )
                # Validate first, then render the errors for the message.
                is_valid = form.is_valid()
                failure_text = '\n\n%s' % form.errors.as_text()
                self.assertTrue(is_valid, msg=failure_text)
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_update_incomplete_payments_no_govuk_payment_found(self):
        """A 404 from the mocked GOV.UK lookup leads to a "failed" status PATCH."""
        with responses.RequestsMock() as mocked:
            mock_auth(mocked)
            mocked.add(mocked.GET, api_url('/payments/'),
                       json=self.payment_data, status=200)
            mocked.add(mocked.GET,
                       govuk_url('/payments/%s/' % self.processor_id),
                       status=404)
            mocked.add(mocked.PATCH, api_url('/payments/%s/' % self.ref),
                       status=200)

            call_command('update_incomplete_payments')

            # The fourth recorded call is expected to be the PATCH request.
            patch_body = mocked.calls[3].request.body.decode()
            self.assertEqual(patch_body, '{"status": "failed"}')
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_search_not_limited_to_specific_prisons(self, mocked_api_session):
        """Bank-transfer flow: the validity lookup carries no ``prisons`` filter."""
        mocked_api_session.side_effect = get_api_session
        self.choose_bank_transfer_payment_method()

        with responses.RequestsMock() as mocked:
            mock_auth(mocked)
            # With match_querystring=True only this exact query string matches.
            validity_url = (api_url('/prisoner_validity/') +
                            '?prisoner_number=A1231DE'
                            '&prisoner_dob=1980-10-04')
            mocked.add(
                mocked.GET,
                validity_url,
                match_querystring=True,
                json={'count': 0, 'results': []},
            )
            form_data = {
                'prisoner_number': 'A1231DE',
                'prisoner_dob_0': '4',
                'prisoner_dob_1': '10',
                'prisoner_dob_2': '1980',
            }
            self.client.post(self.url, data=form_data, follow=True)
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_can_limit_search_to_specific_prisons(self, mocked_api_session):
        """Bank-transfer flow: the validity lookup includes ``prisons=ABC,DEF``."""
        mocked_api_session.side_effect = get_api_session
        self.choose_bank_transfer_payment_method()

        with responses.RequestsMock() as mocked:
            mock_auth(mocked)
            # With match_querystring=True only this exact query string matches.
            validity_url = (api_url('/prisoner_validity/') +
                            '?prisoner_number=A1231DE'
                            '&prisoner_dob=1980-10-04'
                            '&prisons=ABC,DEF')
            mocked.add(
                mocked.GET,
                validity_url,
                match_querystring=True,
                json={'count': 0, 'results': []},
            )
            form_data = {
                'prisoner_number': 'A1231DE',
                'prisoner_dob_0': '4',
                'prisoner_dob_1': '10',
                'prisoner_dob_2': '1980',
            }
            self.client.post(self.url, data=form_data, follow=True)
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_search_not_limited_to_specific_prisons(self, mocked_api_session):
        """Debit-card flow: the validity lookup carries no ``prisons`` filter."""
        mocked_api_session.side_effect = get_api_session
        self.choose_debit_card_payment_method()

        with responses.RequestsMock() as mocked:
            mock_auth(mocked)
            # With match_querystring=True only this exact query string matches.
            validity_url = (api_url('/prisoner_validity/') +
                            '?prisoner_number=A1231DE'
                            '&prisoner_dob=1980-10-04')
            mocked.add(
                mocked.GET,
                validity_url,
                match_querystring=True,
                json={'count': 0, 'results': []},
            )
            form_data = {
                'prisoner_name': 'john smith',
                'prisoner_number': 'A1231DE',
                'prisoner_dob_0': '4',
                'prisoner_dob_1': '10',
                'prisoner_dob_2': '1980',
            }
            self.client.post(self.url, data=form_data, follow=True)
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_debit_card_payment_handles_govuk_errors(self):
        """
        A 500 from the ``govuk_url('/payments/')`` endpoint must produce the
        user-facing "payment could not be processed" message.

        The POST to ``api_url('/payments/')`` is mocked to succeed (201), so
        only the second POST fails.
        """
        self.choose_debit_card_payment_method()
        self.fill_in_prisoner_details()
        self.fill_in_amount()

        with responses.RequestsMock() as rsps:
            mock_auth(rsps)

            # First POST: payment creation succeeds and returns a uuid.
            api_payments_url = api_url('/payments/')
            rsps.add(rsps.POST, api_payments_url, json={'uuid': 'wargle-blargle'}, status=201)

            # Second POST: the govuk endpoint answers with a server error.
            govuk_payments_url = govuk_url('/payments/')
            rsps.add(rsps.POST, govuk_payments_url, status=500)

            with self.patch_prisoner_details_check(), silence_logger():
                response = self.client.get(self.url, follow=False)

            expected_message = 'We’re sorry, your payment could not be processed on this occasion'
            self.assertContains(response, expected_message)
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def test_confirmation_handles_api_update_errors(self):
        """
        A 500 when fetching the payment from the API must show an error page
        that quotes the short payment reference and leaves no completed-flow
        keys in the session.
        """
        self.choose_debit_card_payment_method()
        self.fill_in_prisoner_details()
        self.fill_in_amount()

        with responses.RequestsMock() as rsps:
            mock_auth(rsps)
            payment_url = api_url('/payments/%s/' % self.ref)
            rsps.add(rsps.GET, payment_url, status=500)

            query = {'payment_ref': self.ref}
            with self.patch_prisoner_details_check(), silence_logger():
                response = self.client.get(self.url, query, follow=False)

            # The page must carry the error text and the first eight
            # characters of the reference, upper-cased.
            for expected_text in ('your payment could not be processed',
                                  self.ref[:8].upper()):
                self.assertContains(response, expected_text)

            # check session is cleared
            for session_key in self.complete_session_keys:
                self.assertNotIn(session_key, self.client.session)
项目:BioDownloader    作者:biomadeira    | 项目源码 | 文件源码
def response_mocker(kwargs, base_url, endpoint_url, status=200,
                    content_type='application/json', post=False, data=None):
    """
    Generates a mocked requests response for a given set of
    kwargs, base url and endpoint url.

    Every ``{{name}}`` placeholder in ``base_url + endpoint_url`` is replaced
    by ``kwargs.get(name)`` formatted with ``%s`` (so a missing key is
    rendered as the text ``None``). The resulting URL is registered on a
    temporary ``responses.RequestsMock`` and requested immediately.

    :param kwargs: mapping used to fill the ``{{name}}`` placeholders
    :param base_url: first part of the URL template
    :param endpoint_url: second part of the URL template
    :param status: HTTP status code of the mocked response
    :param content_type: selects the canned payload for GET requests:
        ``'application/json'``, ``'text/plain'``, or anything else for an
        ``application/octet-stream`` payload. Ignored when ``post`` is true.
    :param post: when true, mock and issue a POST (always JSON payload)
        instead of a GET
    :param data: request body passed to ``requests.post`` (POST only)
    :return: the response object returned by ``requests``
    """
    # Raw string: "\{" and "\}" are not valid string escapes, and non-raw
    # literals containing them trigger a warning on modern Python versions.
    url = re.sub(r'\{\{(?P<m>[a-zA-Z_]+)\}\}',
                 lambda m: "%s" % kwargs.get(m.group(1)),
                 base_url + endpoint_url)

    # Choose the canned payload; POST always answers with the JSON payload,
    # whatever content_type was requested.
    if post or content_type == 'application/json':
        body = b'{"data": "some json formatted output"}'
        mocked_type = 'application/json'
    elif content_type == 'text/plain':
        body = "Some text-based content\n spanning multiple lines"
        mocked_type = 'text/plain'
    else:
        body = b"Some other binary stuff..."
        mocked_type = 'application/octet-stream'

    with responses.RequestsMock() as rsps:
        rsps.add(responses.POST if post else responses.GET, url,
                 body=body, status=status, content_type=mocked_type)
        if post:
            response = requests.post(url, data=data)
        else:
            response = requests.get(url)
    return response
项目:instagram_private_api_extensions    作者:ping    | 项目源码 | 文件源码
def test_downloader_http_errors(self):
        """
        The downloader must not produce an output file when the MPD URL only
        answers with HTTP errors (a 500 followed by a 404).

        ``assert_all_requests_are_fired=True`` additionally requires both
        registered error responses to be requested.
        """
        output_file = 'output_httperrors.mp4'

        with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
            for error_status in (500, 404):
                rsps.add(responses.GET, self.TEST_MPD_URL, status=error_status)

            downloader = live.Downloader(
                mpd=self.TEST_MPD_URL,
                output_dir='output_httperrors',
                duplicate_etag_retry=2,
                singlethreaded=True,
            )
            downloader.run()
            downloader.stream_id = '17875351285037717'
            downloader.stitch(output_file, cleartempfiles=True)

            failure_message = '{0!s} is generated'.format(output_file)
            self.assertFalse(os.path.isfile(output_file), failure_message)
项目:instagram_private_api_extensions    作者:ping    | 项目源码 | 文件源码
def test_downloader_resp_headers(self):
        """
        Run the downloader against mocked MPD responses carrying different
        response headers.

        Two scenarios are exercised, each requiring every registered response
        to be requested (``assert_all_requests_are_fired=True``):

        1. no extra headers, then ``Cache-Control: max-age=1``, then
           ``X-FB-Video-Broadcast-Ended: 1``;
        2. ``Cache-Control: max-age=1``, then ``Cache-Control: max-age=1000``.
        """
        with open('mpdstub/mpd/17875351285037717.mpd', 'r') as mpd_file:
            mpd_content = mpd_file.read()

            # Scenario 1: extra keyword arguments for each successive response.
            first_scenario = (
                {},
                {'headers': {'Cache-Control': 'max-age=1'}},
                {'headers': {'X-FB-Video-Broadcast-Ended': '1'}},
            )
            with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
                for extra in first_scenario:
                    rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content, **extra)

                downloader = live.Downloader(
                    mpd=self.TEST_MPD_URL,
                    output_dir='output_respheaders')
                downloader.run()

            # Scenario 2: two responses differing only in their cache lifetime.
            with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
                for cache_control in ('max-age=1', 'max-age=1000'):
                    rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content,
                             headers={'Cache-Control': cache_control})

                downloader = live.Downloader(
                    mpd=self.TEST_MPD_URL,
                    output_dir='output_respheaders')
                downloader.run()

            # Can't stitch and check for output because responses does not support
            # url pass through, so the segments cannot be downloaded.
项目:helper_scripts    作者:pythonanywhere    | 项目源码 | 文件源码
def api_responses():
    """
    Yield an active ``responses.RequestsMock``.

    The mock is entered before the value is yielded and exited when the
    generator is resumed or closed.
    """
    with responses.RequestsMock() as mocked_requests:
        yield mocked_requests
项目:odata-influxdb    作者:Synergetic-Engineering    | 项目源码 | 文件源码
def test_generate_metadata(self):
        """
        Metadata generated from mocked query responses must equal the stored
        ``test_data/test_metadata.xml`` fixture.

        The generated document is also written to
        ``test_data/tmp_metadata.xml`` before the comparison (presumably so a
        failing run can be inspected by hand).
        """
        # Raw strings: "\+" is not a valid string escape, and non-raw literals
        # containing it trigger a warning on modern Python versions.
        field_and_tag_keys = [
            (r'.*SHOW\+FIELD\+KEYS.*', json_field_keys),
            (r'.*SHOW\+TAG\+KEYS.*', json_tag_keys),
        ]
        # One entry per expected request, in the original registration order:
        # one database listing, two measurement listings, then four
        # field-keys/tag-keys pairs.
        mocked_queries = (
            [(r'.*SHOW\+DATABASES.*', json_database_list)]
            + [(r'.*SHOW\+MEASUREMENTS.*', json_measurement_list)] * 2
            + field_and_tag_keys * 4
        )

        with RequestsMock() as rsp:
            for pattern, payload in mocked_queries:
                rsp.add(rsp.GET, re.compile(pattern),
                        json=payload, match_querystring=True)

            metadata = generate_metadata('influxdb://localhost:8086')

        # Context managers make sure both file handles are closed.
        with open(os.path.join('test_data', 'test_metadata.xml'), 'r') as expected_file:
            file1 = expected_file.read()
        with open(os.path.join('test_data', 'tmp_metadata.xml'), 'wb') as generated_file:
            generated_file.write(metadata)

        # assertEqual replaces the deprecated assert_ alias and reports a
        # diff on failure.
        self.assertEqual(metadata, file1)
项目:odata-influxdb    作者:Synergetic-Engineering    | 项目源码 | 文件源码
def test_len_collection(self):
        """
        ``len()`` of an opened collection must equal ``NUM_TEST_POINTS`` when
        the ``SELECT COUNT`` query is answered by the mocked response.
        """
        first_feed = next(self._container.itervalues())
        collection = first_feed.OpenCollection()

        with RequestsMock() as rsp:
            # Raw string: "\+" is not a valid string escape, and a non-raw
            # literal containing it triggers a warning on modern Python.
            count_query = re.compile(r'.*SELECT\+COUNT.*')
            rsp.add(rsp.GET, count_query,
                    json=json_count(collection.name), match_querystring=True)

            len_collection = len(collection)
        self.assertEqual(len_collection, NUM_TEST_POINTS)