我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 requests.models.Response()。
def mock_nsx_clustered_api(self, session_response=None, **kwargs):
    """Build a ``MockNSXClusteredAPI`` while session-create requests are faked.

    ``nsx_cluster.TimeoutSession.request`` is patched for the duration of the
    construction. A call whose third positional argument ends with
    ``api/session/create`` receives a canned 200 response carrying a
    ``Set-Cookie`` header built from ``JSESSIONID``; every other call is
    delegated to the unpatched ``request``.

    :param session_response: forwarded to ``MockNSXClusteredAPI``.
    :param kwargs: extra keyword arguments forwarded to ``MockNSXClusteredAPI``.
    :return: the constructed cluster object.
    """
    real_request = nsx_cluster.TimeoutSession.request

    def mocked_request(*call_args, **call_kwargs):
        # call_args[2] is presumably the request URL -- TODO confirm
        if not call_args[2].endswith('api/session/create'):
            return real_request(*call_args, **call_kwargs)
        fake = models.Response()
        fake.status_code = 200
        fake.headers = {
            'Set-Cookie': 'JSESSIONID=%s;junk' % JSESSIONID}
        return fake

    patcher = mock.patch.object(
        nsx_cluster.TimeoutSession, 'request', new=mocked_request)
    with patcher:
        return NsxClientTestCase.MockNSXClusteredAPI(
            session_response=session_response, **kwargs)
def call_endpoint(self, method: str, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
    """Invoke a REST API endpoint with the given HTTP method.

    :param method: HTTP method name; matched case-insensitively against
        ``REST_API_METHODS``.
    :param endpoint: endpoint joined onto ``self.host_name`` with ``urljoin``.
    :param args: positional arguments passed through to ``requests``.
    :param kwargs: keyword arguments passed through to ``requests``.
    :return: the response object returned by ``requests``.
    :raises TypeError: if ``method`` is not one of ``REST_API_METHODS``.
    """
    logging.info('Calling method %s for endpoint %s', method, endpoint)

    is_supported = method.upper() in REST_API_METHODS
    if not is_supported:
        logging.error('Method %s does not match REST API METHODS: %s', method, ','.join(REST_API_METHODS))
        raise TypeError('Method {0} does not match REST API METHODS: {1}'.format(str(method),
                                                                               ','.join(REST_API_METHODS)))

    # Resolve e.g. requests.get / requests.post by lower-cased method name.
    sender = getattr(requests, method.lower())
    target_url = urljoin(self.host_name, endpoint)
    return sender(target_url, *args, **kwargs)
def expected_error_response(self, response):
    """Check that ``response`` is the error this step expects.

    A ``requests`` response is first converted into an ``ErrorResponse`` by
    decoding its body as UTF-8 and parsing it as JSON. Anything that is not an
    ``ErrorResponse`` after that is recorded as a fault and aborts with
    ``Break``.

    :return: the (possibly converted) error response.
    :raises Break: when no error was received, when the error code is not in
        ``self.expect_error["error"]``, or when ``self.expect_error["stop"]``
        is truthy.
    """
    if isinstance(response, Response):
        # decode first: from_json is fed text, not bytes
        body_text = response.content.decode('utf8')
        response = ErrorResponse().from_json(body_text)

    if not isinstance(response, ErrorResponse):
        self.conv.events.store(EV_FAULT, "Expected error, didn't get it")
        raise Break("Did not receive expected error")

    self.conv.events.store(EV_PROTOCOL_RESPONSE, response,
                           sender=self.__class__.__name__)

    if response["error"] not in self.expect_error["error"]:
        raise Break("Wrong error, got {} expected {}".format(
            response["error"], self.expect_error["error"]))

    # "stop" is optional; a missing key means "do not stop".
    try:
        stop_requested = self.expect_error["stop"]
    except KeyError:
        stop_requested = False
    if stop_requested:
        raise Break("Stop requested after received expected error")

    return response
def get_sess_create_resp():
    """Return a canned 200 response whose ``Set-Cookie`` header carries a JSESSIONID."""
    resp = models.Response()
    resp.headers = {'Set-Cookie': 'JSESSIONID=abc;'}
    resp.status_code = 200
    return resp
def setUp(self):
    """Create the listener, a response with two cookies, and a context with a Cookie header."""
    self.listener = ExecutionListener({})

    canned = Response()
    canned.cookies.update({
        'username': 'john',
        'sessionid': 'abcd',
    })
    self.response = canned

    ctx = Context('http://localhost')
    ctx.headers['Cookie'] = 'name="John Doe"; sessionid=xyz'
    self.context = ctx
def forward_request(self, method, path, data, headers):
    """Intercept an incoming request *before* it is forwarded to the backend.

    The proxy calls this hook with the details of the incoming request and
    interprets the return value as follows:

    * ``True`` -- forward the request to the backend service unchanged
      (this default implementation).
    * an integer (e.g. ``200``) -- return that status code directly to the
      client without calling the backend service.
    * a ``requests.models.Response`` -- return that response directly to the
      client without calling the backend service.
    * a ``requests.models.Request`` -- forward this new/modified request to
      the backend service instead.
    * any other value -- a 503 is returned to the client without calling the
      backend service (the original text labels this "Bad Gateway").
    """
    return True
def test_run_test_bad_method(self, capsys):
    """``run_test`` must raise when the endpoint is configured with method 'FOO'."""
    conf = Mock()
    conf.get.return_value = {'ep1': {'method': 'FOO'}}
    args = Mock(endpoint_name='ep1')

    # Attributes are set on the mock's type, as in the original test.
    res1 = Mock(spec_set=Response)
    res1_type = type(res1)
    res1_type.status_code = 200
    res1_type.content = 'res1content'
    res1_type.headers = {'h1': 'h1val', 'hz': 'hzval'}

    patched = patch.multiple(
        pbm,
        autospec=True,
        requests=DEFAULT,
        logger=DEFAULT,
        get_base_url=DEFAULT,
        node=DEFAULT
    )
    with patched as mocks:
        mocks['get_base_url'].return_value = 'mybase/'
        mocks['node'].return_value = 'mynode'
        mocks['requests'].get.return_value = res1
        with pytest.raises(Exception) as excinfo:
            run_test(conf, args)
        assert exc_msg(excinfo.value) == 'Unimplemented method: FOO'
def _response_to_json(resp: Response) -> bytes:
    """Serialise the interesting parts of a ``requests`` response as JSON bytes.

    Only the first 50 KiB of the body are kept: core dumps can become very
    large, and no more potentially sensitive data than necessary should be
    stored to decide whether there is a leak.
    """
    body_head = resp.content[0:50*1024].decode(errors='replace')
    summary = {
        'text': body_head,
        'status_code': resp.status_code,
        'headers': dict(resp.headers),
        'url': resp.url,
    }
    serialised = json.dumps(summary)
    return serialised.encode()
def validate_response(called_method: Callable) -> Callable:
    """Decorator that validates the response returned by a REST API call.

    The wrapped callable is invoked unchanged; if the returned response has a
    status code other than 200, the failure is logged and ``HTTPError`` is
    raised.

    :param called_method: callable performing the request (POST, GET, PUT,
        PATCH, DELETE).
    :return: wrapper that returns the response from the server.
    """
    # Local import so this block does not depend on the file's import header.
    import functools

    # functools.wraps keeps the decorated callable's __name__/__doc__ (and
    # exposes it as __wrapped__), which the original wrapper discarded.
    @functools.wraps(called_method)
    def wrapper(method: Callable, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
        """Executes decorated function and checks status_code."""
        resp = called_method(method, endpoint, *args, **kwargs)
        if resp.status_code != 200:
            logging.error('HTTP Client returned status code %s for url %s', resp.status_code, resp.url)
            raise HTTPError('HTTP Client returned status code {0} for url {1}'.format(str(resp.status_code),
                                                                                     resp.url))
        return resp

    return wrapper
def to_json(response: Response) -> dict:
    """Decode the body of a ``requests`` response as JSON.

    :param response: response from requests
    :return: whatever ``response.json()`` returns
    """
    decoded = response.json()
    return decoded
def setUpDS(self):
    """Replace ``SESSION.request`` with a stub that fakes a successful upload.

    The original ``SESSION.request`` is saved on ``self._srequest``. The stub
    returns a bare ``Response`` for every call; for a POST whose URL contains
    ``/upload`` it fills in a 200 status and a JSON body pointing at a URL
    from ``generate_docservice_url()``.
    """
    self.app.app.registry.docservice_url = 'http://localhost'
    test_case = self

    def request(method, url, **kwargs):
        response = Response()
        is_upload = method == 'POST' and '/upload' in url
        if not is_upload:
            return response
        url = test_case.generate_docservice_url()
        response.status_code = 200
        response.encoding = 'application/json'
        body_template = '{{"data":{{"url":"{url}","hash":"md5:{md5}","format":"application/msword","title":"name.doc"}},"get_url":"{url}"}}'
        response._content = body_template.format(url=url, md5='0'*32)
        response.reason = '200 OK'
        return response

    self._srequest = SESSION.request
    SESSION.request = request
def setUpBadDS(self):
    """Replace ``SESSION.request`` with a stub that always answers 403.

    The original ``SESSION.request`` is saved on ``self._srequest``.
    """
    self.app.app.registry.docservice_url = 'http://localhost'

    def request(method, url, **kwargs):
        denied = Response()
        denied.status_code = 403
        denied.reason = '403 Forbidden'
        denied.encoding = 'application/json'
        denied._content = '"Unauthorized: upload_view failed permission check"'
        return denied

    self._srequest = SESSION.request
    SESSION.request = request
def do_request(self, req, status=None, expect_errors=None):
    """Perform the request and, if a dump file is open, log request and response.

    When ``self.file_obj`` exists and is not closed, the raw request, a
    pretty-printed copy of its JSON body, the response status line, the
    response headers (sorted, without Content-Length) and a pretty-printed
    copy of the response body are written to it. Pretty-printing is best
    effort: bodies that cannot be parsed or written are skipped.

    :param req: request object; its HTTP_HOST is set to ``self.hostname``.
    :param status: forwarded to the parent ``do_request``.
    :param expect_errors: forwarded to the parent ``do_request``.
    :return: the response from the parent ``do_request``.
    """
    req.headers.environ["HTTP_HOST"] = self.hostname
    if hasattr(self, 'file_obj') and not self.file_obj.closed:
        self.file_obj.write(req.as_bytes(True))
        self.file_obj.write("\n")
        if req.body:
            try:
                self.file_obj.write(
                    'DATA:\n' + json.dumps(json.loads(req.body), indent=2, ensure_ascii=False).encode('utf8'))
                self.file_obj.write("\n")
            except Exception:
                # Best-effort dump: a body that is not JSON is skipped.
                # Narrowed from a bare ``except`` so that KeyboardInterrupt
                # and SystemExit are no longer swallowed.
                pass
        self.file_obj.write("\n")
    resp = super(DumpsTestAppwebtest, self).do_request(req, status=status, expect_errors=expect_errors)
    if hasattr(self, 'file_obj') and not self.file_obj.closed:
        headers = [(n.title(), v)
                   for n, v in resp.headerlist
                   if n.lower() != 'content-length']
        headers.sort()
        self.file_obj.write(str('Response: %s\n%s\n') % (
            resp.status,
            str('\n').join([str('%s: %s') % (n, v) for n, v in headers]),
        ))
        if resp.testbody:
            try:
                self.file_obj.write(
                    json.dumps(json.loads(resp.testbody), indent=2, ensure_ascii=False).encode('utf8'))
            except Exception:
                # Best-effort dump, see above.
                pass
        self.file_obj.write("\n\n")
    return resp
def test_from_response_404(self):
    """A 404 about a missing archive policy rule maps to ArchivePolicyRuleNotFound."""
    payload = {"description": "Archive policy rule foobar does not exist"}
    resp = models.Response()
    resp.status_code = 404
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    exc = exceptions.from_response(resp)

    self.assertIsInstance(exc, exceptions.ArchivePolicyRuleNotFound)
def test_resource_type_before_resource(self):
    """A 404 about a missing resource type maps to ResourceTypeNotFound."""
    payload = {"description": "Resource type foobar does not exist"}
    resp = models.Response()
    resp.status_code = 404
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    exc = exceptions.from_response(resp)

    self.assertIsInstance(exc, exceptions.ResourceTypeNotFound)
def test_from_response_keystone_401(self):
    """A Keystone-style 401 body maps to Unauthorized and keeps its message."""
    expected_message = "The request you have made requires authentication."
    payload = {"error": {
        "message": "The request you have made requires authentication.",
        "code": 401,
        "title": "Unauthorized"}}
    resp = models.Response()
    resp.status_code = 401
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    exc = exceptions.from_response(resp)

    self.assertIsInstance(exc, exceptions.Unauthorized)
    self.assertEqual(expected_message, exc.message)
def test_from_response_unknown_middleware(self):
    """An unrecognised 400 body maps to ClientException with the raw JSON as message."""
    payload = {"unknown": "random message"}
    resp = models.Response()
    resp.status_code = 400
    resp.headers['Content-Type'] = "application/json"
    resp._content = json.dumps(payload).encode('utf-8')

    exc = exceptions.from_response(resp)

    self.assertIsInstance(exc, exceptions.ClientException)
    self.assertEqual('{"unknown": "random message"}', exc.message)
def post_comment(post_payload, assert_success: bool=True) -> Response:
    """POST ``post_payload`` as JSON to ``COMMENT_SIDECAR_URL``.

    When ``assert_success`` is true, the status code is asserted to be 201.
    The response is returned in either case.
    """
    response = requests.post(url=COMMENT_SIDECAR_URL, json=post_payload)
    if not assert_success:
        return response
    failure_text = "Comment creation failed. Message: " + response.text
    described = assert_that(response.status_code).described_as(failure_text)
    described.is_equal_to(201)
    return response
def get_comments(site: str = DEFAULT_SITE, path: str = DEFAULT_PATH, assert_success: bool=True) -> Response:
    """GET the comments for ``site`` and ``path`` from ``COMMENT_SIDECAR_URL``.

    When ``assert_success`` is true, the status code is asserted to be 200.
    The response is returned in either case.
    """
    query_url = "{}?site={}&path={}".format(COMMENT_SIDECAR_URL, site, path)
    response = requests.get(query_url)
    if not assert_success:
        return response
    failure_text = "Getting comments failed. Message: " + response.text
    described = assert_that(response.status_code).described_as(failure_text)
    described.is_equal_to(200)
    return response
def send(self, request, stream=None, timeout=None, verify=None, cert=None, proxies=None):
    """Serve a request for a localhost URL from the local filesystem.

    The URL's netloc must be ``localhost``; it is stripped before the URL is
    converted to a path with ``url_to_path``. The returned response has status
    200, Content-Type / Content-Length / Last-Modified headers derived from
    the file, and ``raw`` wrapping the opened file.

    :raises InvalidURL: if the host is not ``localhost``.
    """
    parts = urlparse.urlparse(request.url)

    # We only work for requests with a host of localhost
    host = parts.netloc.lower()
    if host != "localhost":
        raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                         request.url)

    # Rebuild the URL with an empty netloc.
    real_url = urlparse.urlunparse(parts[:1] + ("",) + parts[2:])
    pathname = url_to_path(real_url)

    resp = Response()
    resp.status_code = 200
    resp.url = real_url

    stats = os.stat(pathname)
    modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    header_values = {
        "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
        "Content-Length": stats.st_size,
        "Last-Modified": modified,
    }
    resp.headers = CaseInsensitiveDict(header_values)

    resp.raw = LocalFSResponse(open(pathname, "rb"))
    # Closing the response closes the underlying file wrapper.
    resp.close = resp.raw.close

    return resp
def _make_response(self, data, status_code=200):
    """Return a ``requests`` Response whose body is ``data`` as UTF-8 JSON."""
    from requests.models import Response

    encoded_body = json.dumps(data).encode('utf-8')
    resp = Response()
    resp._content = encoded_body
    resp.status_code = status_code
    return resp
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    """Build a fake, already-consumed error ``Response`` carrying ``msg``.

    :param msg: text used as the response body.
    :param status_code: HTTP status code to set on the response.
    :param encoding: value set as ``Response.encoding`` and used to encode
        ``msg``; a falsy value falls back to UTF-8 for the body.
    :return: the populated ``Response``.
    """
    r = Response()
    r.status_code = status_code
    r.encoding = encoding
    # Encode with the advertised encoding. The original used the default codec
    # regardless of ``encoding``, so a non-UTF-8 ``encoding`` produced a body
    # that did not match ``r.encoding``.
    r.raw = RequestsStringIO(msg.encode(encoding or 'utf-8'))
    r._content_consumed = True
    r._content = r.raw.read()
    return r


# Use mock decorators when generating documentation, so all function signatures
# are displayed correctly
def mock_response(json_payload):
    """Return a ``MagicMock`` that, when called, yields a canned 200 response.

    The response's ``json`` attribute is itself replaced by a ``MagicMock``
    returning ``json_payload``.
    """
    canned = Response()
    canned.status_code = 200
    canned.json = MagicMock(return_value=json_payload)
    factory = MagicMock(return_value=canned)
    return factory


# Fake API output.
def parser(response):
    """Turn a response into a result dict.

    :param response: object exposing ``url`` and a ``doc`` callable that
        accepts a selector.
    :return: dict with the response ``url`` and the text of its ``title``.
    """
    page_url = response.url
    page_title = response.doc('title').text()
    return {"url": page_url, "title": page_title}
def start_downloader(self, url, args): """ Start Downloader """ resp = Response() if url.find(u'?args=') > -1: real_url, search_word = url.split('?args=') search_word = unicode(unquote(search_word)) print 'url: ', real_url print 'search_word: ', search_word c = IndustryAndCommerceGeetestCrack( url=real_url, search_text=search_word, input_id="content", search_element_id="search", gt_element_class_name="gt_box", gt_slider_knob_name="gt_slider_knob", result_numbers_xpath='/html/body/div[1]/div[6]/div[1]/span', result_list_verify_class='clickStyle') result, cookies = c.crack() current_url = real_url body = result.encode('utf-8') if result else u'<html>??????</html>'.encode('utf-8') # resp.status_code = 200 resp._content = body resp.url = real_url resp.doc = PyQuery(body) return resp else: resp = self.download(url, args=args) return resp
def __str__(self):
    """Describe the wrapped response: status/url/size, a failure, or a Selenium marker."""
    if not isinstance(self.m_response, Response_name):
        return "<Selenium Response: %s>" % self.request.url

    # Truthiness of the response object decides between success and failure.
    if not self.m_response:
        return "<Response failed: %s>" % self.request.url

    details = (
        self.m_response.status_code,
        self.m_response.url,
        (float(len(self.m_response.content)) / 1000),
    )
    return "<Response [%s] [%s] [%.2f KB]>" % details
def judge_response():
    """Fetch http://www.126.com and print whether the result is a ``Response``."""
    import requests
    from requests.models import Response

    body = requests.get("http://www.126.com")
    verdict = "yes" if isinstance(body, Response) else "no"
    # Parenthesised single argument prints the same text on Python 2 and 3.
    print(verdict)
def send(self, request, **kwargs):
    """Record ``request`` and answer with the configured status and data.

    The keyword arguments are stored on the request as ``_extra_kwargs``
    before it is appended to ``self._recorder``.
    """
    request._extra_kwargs = kwargs
    self._recorder.append(request)

    payload = self._data.encode()
    reply = models.Response()
    reply.url = request.url
    reply.status_code = self._status
    reply.raw = six.BytesIO(payload)
    return reply
def run_parallel_download():
    """Download three files in parallel through a local proxy and check their sizes.

    A ``GenericProxy`` is started whose listener sleeps for the number of
    seconds given by the request path and then returns that number repeated
    ``file_length`` times. Values 1, 2 and 3 are downloaded via
    ``parallelize``; each downloaded file must contain ``file_length``
    characters.
    """
    file_length = 10000000

    class DownloadListener(ProxyListener):

        def forward_request(self, method, path, data, headers):
            sleep_time = int(path.replace('/', ''))
            time.sleep(sleep_time)
            response = Response()
            response.status_code = 200
            response._content = ('%s' % sleep_time) * file_length
            return response

    test_port = 12124
    tmp_file_pattern = '/tmp/test.%s'

    proxy = GenericProxy(port=test_port, update_listener=DownloadListener())
    proxy.start()

    def do_download(param):
        tmp_file = tmp_file_pattern % param
        TMP_FILES.append(tmp_file)
        download('http://localhost:%s/%s' % (test_port, param), tmp_file)

    values = (1, 2, 3)
    try:
        parallelize(do_download, values)
    finally:
        # Always stop the proxy, even if a download raises; previously a
        # failure left it running.
        proxy.stop()

    for val in values:
        tmp_file = tmp_file_pattern % val
        assert len(load_file(tmp_file)) == file_length
def test_api_gateway_http_integration():
    """Connect an API Gateway to a local HTTP backend and exercise it.

    The backend is a ``GenericProxy`` whose listener echoes the request data
    (or ``'{}'`` when there is none). The test checks the CORS headers of an
    OPTIONS request, then a GET and a POST round trip through the gateway.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    class TestListener(ProxyListener):

        def forward_request(self, **kwargs):
            response = Response()
            response.status_code = 200
            response._content = kwargs.get('data') or '{}'
            return response

    proxy = GenericProxy(test_port, update_listener=TestListener())
    proxy.start()

    try:
        # create API Gateway and connect it to the HTTP backend
        result = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

        url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=result['id'],
            stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)

        # make sure CORS headers are present
        origin = 'localhost'
        result = requests.options(url, headers={'origin': origin})
        assert result.status_code == 200
        assert re.match(result.headers['Access-Control-Allow-Origin'].replace('*', '.*'), origin)
        assert 'POST' in result.headers['Access-Control-Allow-Methods']

        # make test request to gateway
        result = requests.get(url)
        assert result.status_code == 200
        assert to_str(result.content) == '{}'
        data = {'data': 123}
        result = requests.post(url, data=json.dumps(data))
        assert result.status_code == 200
        assert json.loads(to_str(result.content)) == data
    finally:
        # clean up -- in a ``finally`` so a failing assertion does not leave
        # the proxy running
        proxy.stop()
def error_response(message, code=400, error_type='ValidationError'):
    """Build an XML ``ErrorResponse`` with the given message, status and error type.

    The error type is also exposed in the ``x-amzn-errortype`` header and a
    fresh UUID is used as the request id.
    """
    # Template text matches the original literal as shown (single spaces).
    template = (
        '<ErrorResponse xmlns="%s"> <Error> <Type>Sender</Type> <Code>%s</Code> '
        '<Message>%s</Message> </Error> <RequestId>%s</RequestId> </ErrorResponse>'
    )
    resp = Response()
    resp.status_code = code
    resp.headers['x-amzn-errortype'] = error_type
    fields = (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())
    resp._content = template % fields
    return resp
def make_response(operation_name, content='', code=200):
    """Build an XML ``<Op>Response`` envelope around ``content``.

    The envelope uses ``XMLNS_CLOUDFORMATION`` as namespace and a fresh UUID
    as request id.
    """
    # Template text matches the original literal as shown (single spaces).
    template = (
        '<{op_name}Response xmlns="{xmlns}"> <{op_name}Result> {content} </{op_name}Result> '
        '<ResponseMetadata><RequestId>{uid}</RequestId></ResponseMetadata> </{op_name}Response>'
    )
    resp = Response()
    resp._content = template.format(
        xmlns=XMLNS_CLOUDFORMATION,
        op_name=operation_name,
        uid=uuid.uuid4(),
        content=content,
    )
    resp.status_code = code
    return resp
def _create_response_object(content, code, headers):
    """Return a ``Response`` populated with the given body, status code and headers."""
    resp = Response()
    resp._content = content
    resp.headers = headers
    resp.status_code = code
    return resp
def get_cors_response(headers):
    """Return an empty 200 response carrying "allow-all" CORS headers.

    ``headers`` is currently unused.
    """
    # TODO: for now we simply return "allow-all" CORS headers, but in the future
    # we should implement custom headers for CORS rules, as supported by API Gateway:
    # http://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-cors.html
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE'),
        ('Access-Control-Allow-Headers', '*'),
    )
    resp = Response()
    resp.status_code = 200
    for header_name, header_value in cors_headers:
        resp.headers[header_name] = header_value
    resp._content = ''
    return resp
def return_response(self, method, path, data, headers, response):
    """Intercept a backend response *before* it is returned to the client.

    The proxy calls this hook after it has forwarded the request and received
    the backend's response. It is given the details of the incoming request
    together with that response, and the return value is interpreted as
    follows:

    * a ``requests.models.Response`` -- returned to the client instead of the
      actual backend response.
    * any other value -- the backend's response is returned to the client
      (this default implementation returns ``None``).
    """
    return None
def get_cors(bucket_name):
    """Return a 200 response whose body is the bucket's CORS config as XML.

    A bucket without a stored (truthy) configuration yields an empty
    ``CORSConfiguration``.
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    cors = BUCKET_CORS.get(bucket_name) or {'CORSConfiguration': []}
    resp = Response()
    resp._content = xmltodict.unparse(cors)
    resp.status_code = 200
    return resp
def set_cors(bucket_name, cors):
    """Store the CORS configuration for ``bucket_name`` and return a 200 response.

    A string configuration is parsed with ``xmltodict`` before being stored.
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    parsed = xmltodict.parse(cors) if isinstance(cors, six.string_types) else cors
    BUCKET_CORS[bucket_name] = parsed
    ok = Response()
    ok.status_code = 200
    return ok
def get_lifecycle(bucket_name):
    """Return a 200 response whose body is the bucket's lifecycle config as XML.

    A bucket without a stored (truthy) configuration yields an empty
    ``LifecycleConfiguration``.
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name) or {'LifecycleConfiguration': []}
    resp = Response()
    resp._content = xmltodict.unparse(lifecycle)
    resp.status_code = 200
    return resp
def set_lifecycle(bucket_name, lifecycle):
    """Store the lifecycle configuration for ``bucket_name`` and return a 200 response.

    If ``to_str(lifecycle)`` is a string, the original ``lifecycle`` value is
    parsed with ``xmltodict`` before being stored.
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    looks_like_text = isinstance(to_str(lifecycle), six.string_types)
    parsed = xmltodict.parse(lifecycle) if looks_like_text else lifecycle
    BUCKET_LIFECYCLE[bucket_name] = parsed
    ok = Response()
    ok.status_code = 200
    return ok
def return_response(self, method, path, data, headers, response, request_handler):
    """Post-process SQS responses: publish queue events and patch queue URLs.

    Only ``POST`` requests to ``/`` are handled. The ``Action`` form field
    selects the behaviour:

    * ``CreateQueue`` / ``DeleteQueue``: fire the matching event with a hash
      of the queue URL, when a queue URL could be determined.
    * ``CreateQueue`` / ``GetQueueUrl`` / ``ListQueues``: rewrite every
      ``<QueueUrl>`` in the body to use ``HOSTNAME_EXTERNAL`` and the
      external port (and ``https`` when ``config.USE_SSL`` is set).

    Returns a new ``Response`` only when the body was actually changed;
    otherwise falls through and implicitly returns ``None``.
    """
    if method == 'POST' and path == '/':
        req_data = urlparse.parse_qs(to_str(data))
        # parse_qs yields lists; take the first Action value (or None)
        action = req_data.get('Action', [None])[0]
        event_type = None
        queue_url = None
        if action == 'CreateQueue':
            event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
            # the new queue's URL is read from the backend's XML response
            response_data = xmltodict.parse(response.content)
            if 'CreateQueueResponse' in response_data:
                queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
        elif action == 'DeleteQueue':
            event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
            # for deletions the queue URL comes from the request itself
            queue_url = req_data.get('QueueUrl', [None])[0]

        if event_type and queue_url:
            event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})

        # patch the response and return the correct endpoint URLs
        if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues'):
            content_str = content_str_original = to_str(response.content)
            new_response = Response()
            new_response.status_code = response.status_code
            # NOTE: shares (does not copy) the backend response's headers object
            new_response.headers = response.headers
            if config.USE_SSL and '<QueueUrl>http://' in content_str:
                # return https://... if we're supposed to use SSL
                content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)
            # expose external hostname:port
            external_port = get_external_port(headers, request_handler)
            # keeps scheme (\1) and path (\3), replaces host and port
            content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',
                r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port), content_str)
            new_response._content = content_str
            if content_str_original != new_response._content:
                # if changes have been made, return patched response
                new_response.headers['content-length'] = len(new_response._content)
                return new_response


# extract the external port used by the client to make the request
def kinesis_error_response(data):
    """Return a 200 response reporting a throughput error for every input record.

    The JSON body has ``FailedRecordCount`` fixed at 1 and one identical
    ``ProvisionedThroughputExceededException`` entry per item in
    ``data['Records']``.
    """
    resp = Response()
    resp.status_code = 200
    failed_records = [
        {
            'ErrorCode': 'ProvisionedThroughputExceededException',
            'ErrorMessage': 'Rate exceeded for shard X in stream Y under account Z.'
        }
        for _ in data['Records']
    ]
    body = {'FailedRecordCount': 1, 'Records': failed_records}
    resp._content = json.dumps(body)
    return resp
def make_response(op_name, content=''):
    """Build a 200 SNS-style XML ``<Op>Response`` envelope around ``content``.

    Empty ``content`` is replaced by a ``<MessageId>`` element holding a
    ``short_uid()``; another ``short_uid()`` is used as request id.
    """
    # Template text matches the original literal as shown (single spaces).
    template = (
        '<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/"> <{op_name}Result> {content} '
        '</{op_name}Result> <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata> '
        '</{op_name}Response>'
    )
    resp = Response()
    if not content:
        content = '<MessageId>%s</MessageId>' % short_uid()
    resp._content = template.format(op_name=op_name, content=content, req_id=short_uid())
    resp.status_code = 200
    return resp
def error_response(message=None, error_type=None, code=400):
    """Build a JSON error response with ``message`` and a DynamoDB ``__type``.

    Falsy ``message`` / ``error_type`` fall back to ``'Unknown error'`` /
    ``'UnknownError'``. An error type that does not already contain
    ``com.amazonaws.dynamodb`` is prefixed with
    ``com.amazonaws.dynamodb.v20120810#``.
    """
    message = message or 'Unknown error'
    error_type = error_type or 'UnknownError'
    if 'com.amazonaws.dynamodb' not in error_type:
        error_type = 'com.amazonaws.dynamodb.v20120810#%s' % error_type

    body = {
        'message': message,
        '__type': error_type
    }
    resp = Response()
    resp.status_code = code
    resp._content = json.dumps(body)
    return resp
def test_api_gateway_http_integration():
    """Connect an API Gateway to a local HTTP backend and exercise it.

    The backend is a ``GenericProxy`` whose listener function returns the
    request data as JSON (or ``'{}'`` when the data is falsy). The test
    performs a GET and a POST round trip through the gateway.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    def listener(**kwargs):
        response = Response()
        response.status_code = 200
        response._content = json.dumps(kwargs['data']) if kwargs['data'] else '{}'
        return response

    proxy = GenericProxy(test_port, update_listener=listener)
    proxy.start()

    try:
        # create API Gateway and connect it to the HTTP backend
        result = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

        # make test request to gateway
        url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=result['id'],
            stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)
        result = requests.get(url)
        assert result.status_code == 200
        assert to_str(result.content) == '{}'
        data = {"data": 123}
        result = requests.post(url, data=json.dumps(data))
        assert result.status_code == 200
        assert json.loads(to_str(result.content)) == data
    finally:
        # clean up -- in a ``finally`` so a failing assertion does not leave
        # the proxy running
        proxy.stop()