Python requests.models 模块,Response() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.models.Response()

项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def mock_nsx_clustered_api(self, session_response=None, **kwargs):
        """Create a ``MockNSXClusteredAPI`` while session-create calls are faked.

        While the cluster object is being constructed,
        ``nsx_cluster.TimeoutSession.request`` is patched so that any call whose
        third positional argument ends with ``api/session/create`` receives a
        canned 200 response carrying a ``Set-Cookie`` header; every other call
        is passed through to the real ``request``.

        :param session_response: forwarded to ``MockNSXClusteredAPI``
        :param kwargs: forwarded to ``MockNSXClusteredAPI``
        :return: the constructed cluster object
        """
        real_request = nsx_cluster.TimeoutSession.request

        def fake_request(*call_args, **call_kwargs):
            # Anything that is not a session-create call goes to the real method.
            if not call_args[2].endswith('api/session/create'):
                return real_request(*call_args, **call_kwargs)
            canned = models.Response()
            canned.status_code = 200
            canned.headers = {
                'Set-Cookie': 'JSESSIONID=%s;junk' % JSESSIONID}
            return canned

        patcher = mock.patch.object(nsx_cluster.TimeoutSession, 'request',
                                    new=fake_request)
        with patcher:
            return NsxClientTestCase.MockNSXClusteredAPI(
                session_response=session_response, **kwargs)
项目:StockScraper    作者:stovorov    | 项目源码 | 文件源码
def call_endpoint(self, method: str, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
        """Invoke a REST API endpoint with the given HTTP method.

        :param method: REST API method name (string) - 'POST', 'GET', 'PUT', 'PATCH', 'DELETE'
        :param endpoint: endpoint joined onto ``self.host_name`` to form the URL.
        :param args: extra positional arguments passed to the ``requests`` call
        :param kwargs: extra keyword arguments passed to the ``requests`` call
        :return: whatever the matching ``requests`` function returns
        :raises TypeError: when ``method`` (upper-cased) is not in ``REST_API_METHODS``
        """
        logging.info('Calling method %s for endpoint %s', method, endpoint)

        if method.upper() not in REST_API_METHODS:
            allowed = ','.join(REST_API_METHODS)
            logging.error('Method %s does not match REST API METHODS: %s', method, allowed)
            raise TypeError('Method {0} does not match REST API METHODS: {1}'.format(str(method), allowed))

        # Look up requests.get / requests.post / ... by lower-cased method name.
        http_call = getattr(requests, method.lower())
        target_url = urljoin(self.host_name, endpoint)
        return http_call(target_url, *args, **kwargs)
项目:otest    作者:rohe    | 项目源码 | 文件源码
def expected_error_response(self, response):
        """Check that ``response`` is the error this step expects.

        A ``requests`` response is first converted into an ``ErrorResponse``
        from its UTF-8 decoded body. The error is recorded in the conversation
        events and compared with ``self.expect_error["error"]``.

        :param response: a ``Response`` or an ``ErrorResponse``
        :return: the (possibly converted) ``ErrorResponse``
        :raises Break: on a wrong error, when no error was received, or when
            ``self.expect_error["stop"]`` is truthy
        """
        if isinstance(response, Response):  # requests response
            # Decode first: from_json is given text, not bytes.
            response = ErrorResponse().from_json(response.content.decode('utf8'))

        if not isinstance(response, ErrorResponse):
            self.conv.events.store(EV_FAULT, "Expected error, didn't get it")
            raise Break("Did not receive expected error")

        self.conv.events.store(EV_PROTOCOL_RESPONSE, response,
                               sender=self.__class__.__name__)
        received = response["error"]
        if received not in self.expect_error["error"]:
            raise Break("Wrong error, got {} expected {}".format(
                response["error"], self.expect_error["error"]))
        try:
            if self.expect_error["stop"]:
                raise Break("Stop requested after received expected error")
        except KeyError:
            # No "stop" entry configured: nothing more to do.
            pass

        return response
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def get_sess_create_resp():
    """Return a canned 200 response carrying a ``JSESSIONID=abc;`` cookie header."""
    resp = models.Response()
    resp.headers = {'Set-Cookie': 'JSESSIONID=abc;'}
    resp.status_code = 200
    return resp
项目:http-prompt    作者:eliangcs    | 项目源码 | 文件源码
def setUp(self):
        """Prepare a listener, a response with two cookies, and a context with a Cookie header."""
        self.listener = ExecutionListener({})

        response_cookies = {'username': 'john', 'sessionid': 'abcd'}
        self.response = Response()
        self.response.cookies.update(response_cookies)

        self.context = Context('http://localhost')
        self.context.headers['Cookie'] = 'name="John Doe"; sessionid=xyz'
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def forward_request(self, method, path, data, headers):
        """ Interceptor hook invoked by the proxy for each incoming request,
            *before* the request is forwarded to the backend service.

            The return value tells the proxy what to do next:

            * True: forward the request to the backend service unchanged (default).
            * An integer (e.g., 200): send that status code straight back to the
              client; the backend service is not called.
            * An instance of requests.models.Response: send that response straight
              back to the client; the backend service is not called.
            * An instance of requests.models.Request: forward this new/modified
              request to the backend service instead.
            * Any other value: a 503 Bad Gateway is returned to the client and the
              backend service is not called.

            This base implementation always returns True.
        """
        return True
项目:webhook2lambda2sqs    作者:jantman    | 项目源码 | 文件源码
def test_run_test_bad_method(self, capsys):
        """``run_test`` must raise for an endpoint whose configured method is unsupported."""
        conf = Mock()
        conf.get.return_value = {'ep1': {'method': 'FOO'}}
        args = Mock(endpoint_name='ep1')

        # Attributes are set on the mock's type, as in the other response mocks.
        fake_resp = Mock(spec_set=Response)
        fake_resp_type = type(fake_resp)
        fake_resp_type.status_code = 200
        fake_resp_type.content = 'res1content'
        fake_resp_type.headers = {'h1': 'h1val', 'hz': 'hzval'}

        patched = patch.multiple(
            pbm,
            autospec=True,
            requests=DEFAULT,
            logger=DEFAULT,
            get_base_url=DEFAULT,
            node=DEFAULT
        )
        with patched as mocks:
            mocks['get_base_url'].return_value = 'mybase/'
            mocks['node'].return_value = 'mynode'
            mocks['requests'].get.return_value = fake_resp
            with pytest.raises(Exception) as excinfo:
                run_test(conf, args)
        assert exc_msg(excinfo.value) == 'Unimplemented method: FOO'
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def _response_to_json(resp: Response) -> bytes:
    """Serialize the interesting parts of a requests response as JSON bytes.

    Only the first 50 KiB of the body are kept: core dumps can be very large,
    and no more potentially sensitive data should be stored than is needed to
    decide whether there is a leak.
    """
    body_limit = 50*1024
    snapshot = {
        'text': resp.content[0:body_limit].decode(errors='replace'),
        'status_code': resp.status_code,
        'headers': dict(resp.headers),
        'url': resp.url,
    }
    return json.dumps(snapshot).encode()
项目:StockScraper    作者:stovorov    | 项目源码 | 文件源码
def validate_response(called_method: Callable) -> Callable:
    """Decorator for HTTPClient validating received response for REST API methods.
    If status code from server is different than 200 raises HTTPError.

    The returned wrapper keeps the decorated callable's ``__name__`` and
    ``__doc__`` (via ``functools.wraps``) so logs and introspection show the
    real method rather than ``wrapper``.

    :param called_method: requests methods - POST, GET, PUT, PATCH, DELETE
    :return: wrapper that calls ``called_method`` and returns its response
    """
    # Local import keeps this block self-contained with respect to file imports.
    import functools

    @functools.wraps(called_method)
    def wrapper(method: Callable, endpoint: str, *args: List[Any], **kwargs: Dict[Any, Any]) -> Response:
        """Executes decorated function and checks status_code."""
        resp = called_method(method, endpoint, *args, **kwargs)
        if resp.status_code != 200:
            logging.error('HTTP Client returned status code %s for url %s', resp.status_code, resp.url)
            raise HTTPError('HTTP Client returned status code {0} for url {1}'.format(str(resp.status_code), resp.url))
        return resp
    return wrapper
项目:StockScraper    作者:stovorov    | 项目源码 | 文件源码
def to_json(response: Response) -> dict:
        """Decode the JSON body of a requests response.

        :param response: response from requests
        :return: the value produced by ``response.json()``
        """
        decoded = response.json()
        return decoded
项目:openregistry.api    作者:openprocurement    | 项目源码 | 文件源码
def setUpDS(self):
        """Replace ``SESSION.request`` with a fake document-service uploader.

        The original ``SESSION.request`` is saved on ``self._srequest``. The
        fake answers ``POST`` requests whose URL contains ``/upload`` with a
        200 JSON body built from ``self.generate_docservice_url()``; any other
        request gets an untouched ``Response()``.
        """
        self.app.app.registry.docservice_url = 'http://localhost'
        test = self

        def request(method, url, **kwargs):
            response = Response()
            is_upload = method == 'POST' and '/upload' in url
            if not is_upload:
                return response
            url = test.generate_docservice_url()
            response.status_code = 200
            response.encoding = 'application/json'
            response._content = '{{"data":{{"url":"{url}","hash":"md5:{md5}","format":"application/msword","title":"name.doc"}},"get_url":"{url}"}}'.format(url=url, md5='0'*32)
            response.reason = '200 OK'
            return response

        self._srequest = SESSION.request
        SESSION.request = request
项目:openregistry.api    作者:openprocurement    | 项目源码 | 文件源码
def setUpBadDS(self):
        """Replace ``SESSION.request`` with a fake that always answers 403.

        The original ``SESSION.request`` is saved on ``self._srequest``.
        """
        self.app.app.registry.docservice_url = 'http://localhost'

        def request(method, url, **kwargs):
            forbidden = Response()
            forbidden.status_code = 403
            forbidden.reason = '403 Forbidden'
            forbidden.encoding = 'application/json'
            forbidden._content = '"Unauthorized: upload_view failed permission check"'
            return forbidden

        self._srequest = SESSION.request
        SESSION.request = request
项目:openregistry.api    作者:openprocurement    | 项目源码 | 文件源码
def do_request(self, req, status=None, expect_errors=None):
        """Run ``req`` through the parent test app and dump both sides to ``self.file_obj``.

        When ``self.file_obj`` exists and is open, the raw request, a
        pretty-printed copy of its JSON body, the response status line, its
        headers (minus Content-Length, sorted) and a pretty-printed response
        body are written to it. Pretty-printing is best effort: a body that
        cannot be parsed or written is skipped.

        :param req: request object to send
        :param status: forwarded to the parent ``do_request``
        :param expect_errors: forwarded to the parent ``do_request``
        :return: the response returned by the parent ``do_request``
        """
        req.headers.environ["HTTP_HOST"] = self.hostname
        if hasattr(self, 'file_obj') and not self.file_obj.closed:
            self.file_obj.write(req.as_bytes(True))
            self.file_obj.write("\n")
            if req.body:
                try:
                    self.file_obj.write(
                            'DATA:\n' + json.dumps(json.loads(req.body), indent=2, ensure_ascii=False).encode('utf8'))
                    self.file_obj.write("\n")
                except Exception:
                    # Best-effort dump: skip bodies that are not JSON, but do not
                    # swallow KeyboardInterrupt/SystemExit as a bare except would.
                    pass
            self.file_obj.write("\n")
        resp = super(DumpsTestAppwebtest, self).do_request(req, status=status, expect_errors=expect_errors)
        if hasattr(self, 'file_obj') and not self.file_obj.closed:
            # Content-Length is dropped from the dumped headers.
            headers = [(n.title(), v)
                       for n, v in resp.headerlist
                       if n.lower() != 'content-length']
            headers.sort()
            self.file_obj.write(str('Response: %s\n%s\n') % (
                resp.status,
                str('\n').join([str('%s: %s') % (n, v) for n, v in headers]),
            ))

            if resp.testbody:
                try:
                    self.file_obj.write(json.dumps(json.loads(resp.testbody), indent=2, ensure_ascii=False).encode('utf8'))
                except Exception:
                    # Same best-effort policy as for the request body above.
                    pass
            self.file_obj.write("\n\n")
        return resp
项目:python-gnocchiclient    作者:gnocchixyz    | 项目源码 | 文件源码
def test_from_response_404(self):
        """A 404 with an archive-policy-rule description maps to ArchivePolicyRuleNotFound."""
        payload = {"description": "Archive policy rule foobar does not exist"}
        resp = models.Response()
        resp.status_code = 404
        resp.headers['Content-Type'] = "application/json"
        resp._content = json.dumps(payload).encode('utf-8')
        self.assertIsInstance(exceptions.from_response(resp),
                              exceptions.ArchivePolicyRuleNotFound)
项目:python-gnocchiclient    作者:gnocchixyz    | 项目源码 | 文件源码
def test_resource_type_before_resource(self):
        """A 404 with a resource-type description maps to ResourceTypeNotFound."""
        payload = {"description": "Resource type foobar does not exist"}
        resp = models.Response()
        resp.status_code = 404
        resp.headers['Content-Type'] = "application/json"
        resp._content = json.dumps(payload).encode('utf-8')
        self.assertIsInstance(exceptions.from_response(resp),
                              exceptions.ResourceTypeNotFound)
项目:python-gnocchiclient    作者:gnocchixyz    | 项目源码 | 文件源码
def test_from_response_keystone_401(self):
        """A keystone-style 401 body maps to Unauthorized and keeps its message."""
        payload = {"error": {
            "message": "The request you have made requires authentication.",
            "code": 401, "title": "Unauthorized"}}
        resp = models.Response()
        resp.status_code = 401
        resp.headers['Content-Type'] = "application/json"
        resp._content = json.dumps(payload).encode('utf-8')

        exc = exceptions.from_response(resp)
        self.assertIsInstance(exc, exceptions.Unauthorized)
        self.assertEqual("The request you have made requires authentication.",
                         exc.message)
项目:python-gnocchiclient    作者:gnocchixyz    | 项目源码 | 文件源码
def test_from_response_unknown_middleware(self):
        """An unrecognised 400 body maps to ClientException with the raw JSON as message."""
        payload = {"unknown": "random message"}
        resp = models.Response()
        resp.status_code = 400
        resp.headers['Content-Type'] = "application/json"
        resp._content = json.dumps(payload).encode('utf-8')

        exc = exceptions.from_response(resp)
        self.assertIsInstance(exc, exceptions.ClientException)
        self.assertEqual('{"unknown": "random message"}', exc.message)
项目:comment-sidecar    作者:phauer    | 项目源码 | 文件源码
def post_comment(post_payload, assert_success: bool=True) -> Response:
    """POST ``post_payload`` as JSON to ``COMMENT_SIDECAR_URL``.

    :param post_payload: object sent as the JSON request body
    :param assert_success: when true, assert that the status code is 201
    :return: the response of the POST request
    """
    response = requests.post(url=COMMENT_SIDECAR_URL, json=post_payload)
    if not assert_success:
        return response
    failure_note = "Comment creation failed. Message: " + response.text
    assert_that(response.status_code).described_as(failure_note).is_equal_to(201)
    return response
项目:comment-sidecar    作者:phauer    | 项目源码 | 文件源码
def get_comments(site: str = DEFAULT_SITE, path: str = DEFAULT_PATH, assert_success: bool=True) -> Response:
    """GET the comments for ``site`` and ``path`` from ``COMMENT_SIDECAR_URL``.

    :param site: value of the ``site`` query parameter
    :param path: value of the ``path`` query parameter
    :param assert_success: when true, assert that the status code is 200
    :return: the response of the GET request
    """
    query_url = "{}?site={}&path={}".format(COMMENT_SIDECAR_URL, site, path)
    response = requests.get(query_url)
    if not assert_success:
        return response
    failure_note = "Getting comments failed. Message: " + response.text
    assert_that(response.status_code).described_as(failure_note).is_equal_to(200)
    return response
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """Answer a request by opening the local file its URL points to.

        Only URLs whose host part is ``localhost`` are accepted. The response
        has status 200, headers derived from ``os.stat`` and the guessed MIME
        type, and ``raw`` wrapping the file opened in binary mode.

        :raises InvalidURL: when the URL host is not ``localhost``
        :return: the populated ``Response``
        """
        url_parts = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if url_parts.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        # Rebuild the URL with an empty host part before mapping it to a path.
        real_url = urlparse.urlunparse(url_parts[:1] + ("",) + url_parts[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        file_stat = os.stat(pathname)
        modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
        content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
        resp.headers = CaseInsensitiveDict({
            "Content-Type": content_type,
            "Content-Length": file_stat.st_size,
            "Last-Modified": modified,
        })

        file_obj = open(pathname, "rb")
        resp.raw = LocalFSResponse(file_obj)
        # Closing the response closes the wrapped file.
        resp.close = resp.raw.close

        return resp
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """Answer a request by opening the local file its URL points to.

        Only URLs whose host part is ``localhost`` are accepted. The response
        has status 200, headers derived from ``os.stat`` and the guessed MIME
        type, and ``raw`` wrapping the file opened in binary mode.

        :raises InvalidURL: when the URL host is not ``localhost``
        :return: the populated ``Response``
        """
        url_parts = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if url_parts.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        # Rebuild the URL with an empty host part before mapping it to a path.
        real_url = urlparse.urlunparse(url_parts[:1] + ("",) + url_parts[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        file_stat = os.stat(pathname)
        modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
        content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
        resp.headers = CaseInsensitiveDict({
            "Content-Type": content_type,
            "Content-Length": file_stat.st_size,
            "Last-Modified": modified,
        })

        file_obj = open(pathname, "rb")
        resp.raw = LocalFSResponse(file_obj)
        # Closing the response closes the wrapped file.
        resp.close = resp.raw.close

        return resp
项目:wukong    作者:SurveyMonkey    | 项目源码 | 文件源码
def _make_response(self, data, status_code=200):
        """Build a ``Response`` whose body is ``data`` serialized as UTF-8 JSON.

        :param data: JSON-serializable object used as the body
        :param status_code: status code set on the response (default 200)
        :return: the populated ``Response``
        """
        from requests.models import Response
        fake = Response()
        encoded_body = json.dumps(data).encode('utf-8')
        fake.status_code = status_code
        fake._content = encoded_body
        return fake
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    """Build a ``Response`` whose already-consumed body is ``msg``.

    :param msg: text used as the response body
    :param status_code: status code set on the response (default 401)
    :param encoding: value stored in the response's ``encoding`` attribute
    :return: the populated ``Response``
    """
    fake = Response()
    fake.status_code = status_code
    fake.encoding = encoding
    body_stream = RequestsStringIO(msg.encode())
    fake.raw = body_stream
    # Mark the body as read and store it, so the content is served from _content.
    fake._content_consumed = True
    fake._content = body_stream.read()
    return fake

# Use mock decorators when generating documentation, so all function signatures
# are displayed correctly
项目:netbox-as-ansible-inventory    作者:AAbouZaid    | 项目源码 | 文件源码
def mock_response(json_payload):
    """Return a ``MagicMock`` whose call result is a 200 response.

    The response's ``json`` attribute is itself a ``MagicMock`` returning
    ``json_payload``.
    """
    fake = Response()
    fake.json = MagicMock(return_value=json_payload)
    fake.status_code = 200
    return MagicMock(return_value=fake)

# Fake API output.
项目:xspider    作者:zym1115718204    | 项目源码 | 文件源码
def parser(response):
        """
        Turn a downloaded response into a result dict.

        :param response: object exposing ``url`` and a callable ``doc`` selector
        :return: dict with the response URL and the text of its ``title`` element
        """
        page_url = response.url
        page_title = response.doc('title').text()
        return {"url": page_url, "title": page_title}
项目:xspider    作者:zym1115718204    | 项目源码 | 文件源码
def start_downloader(self, url, args):
        """
        Start Downloader

        URLs containing ``?args=`` are split into a base URL and a search
        word, which are handed to ``IndustryAndCommerceGeetestCrack``; the
        cracked page is wrapped in a ``Response``.  Any other URL is delegated
        to ``self.download``.

        NOTE(review): this block uses Python 2 syntax (``print`` statements,
        ``unicode``).

        :param url: target URL, optionally carrying a ``?args=<search word>`` suffix
        :param args: forwarded as ``args=`` to ``self.download`` on the plain path
        :return: a ``Response`` with ``_content``, ``url`` and ``doc`` set on the
            crack path, otherwise the result of ``self.download``
        """
        resp = Response()
        if url.find(u'?args=') > -1:
            # The two-name unpacking requires '?args=' to occur exactly once.
            real_url, search_word = url.split('?args=')
            search_word = unicode(unquote(search_word))
            print 'url: ', real_url
            print 'search_word: ', search_word
            c = IndustryAndCommerceGeetestCrack(
                url=real_url,
                search_text=search_word,
                input_id="content",
                search_element_id="search",
                gt_element_class_name="gt_box",
                gt_slider_knob_name="gt_slider_knob",
                result_numbers_xpath='/html/body/div[1]/div[6]/div[1]/span',
                result_list_verify_class='clickStyle')
            # cookies is returned by crack() but not used below.
            result, cookies = c.crack()
            # current_url is assigned but not read again in this function.
            current_url = real_url
            # Fall back to a placeholder HTML page when crack() yields nothing.
            body = result.encode('utf-8') if result else u'<html>??????</html>'.encode('utf-8')
            # resp.status_code = 200
            resp._content = body
            resp.url = real_url
            resp.doc = PyQuery(body)
            return resp
        else:
            resp = self.download(url, args=args)
            return resp
项目:Sudoku-Solver    作者:ayush1997    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """Answer a request by opening the local file its URL points to.

        Only URLs whose host part is ``localhost`` are accepted. The response
        has status 200, headers derived from ``os.stat`` and the guessed MIME
        type, and ``raw`` wrapping the file opened in binary mode.

        :raises InvalidURL: when the URL host is not ``localhost``
        :return: the populated ``Response``
        """
        url_parts = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if url_parts.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        # Rebuild the URL with an empty host part before mapping it to a path.
        real_url = urlparse.urlunparse(url_parts[:1] + ("",) + url_parts[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        file_stat = os.stat(pathname)
        modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
        content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
        resp.headers = CaseInsensitiveDict({
            "Content-Type": content_type,
            "Content-Length": file_stat.st_size,
            "Last-Modified": modified,
        })

        file_obj = open(pathname, "rb")
        resp.raw = LocalFSResponse(file_obj)
        # Closing the response closes the wrapped file.
        resp.close = resp.raw.close

        return resp
项目:youtube-trending-music    作者:ishan-nitj    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """Answer a request by opening the local file its URL points to.

        Only URLs whose host part is ``localhost`` are accepted. The response
        has status 200, headers derived from ``os.stat`` and the guessed MIME
        type, and ``raw`` wrapping the file opened in binary mode.

        :raises InvalidURL: when the URL host is not ``localhost``
        :return: the populated ``Response``
        """
        url_parts = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if url_parts.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        # Rebuild the URL with an empty host part before mapping it to a path.
        real_url = urlparse.urlunparse(url_parts[:1] + ("",) + url_parts[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        file_stat = os.stat(pathname)
        modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
        content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
        resp.headers = CaseInsensitiveDict({
            "Content-Type": content_type,
            "Content-Length": file_stat.st_size,
            "Last-Modified": modified,
        })

        file_obj = open(pathname, "rb")
        resp.raw = LocalFSResponse(file_obj)
        # Closing the response closes the wrapped file.
        resp.close = resp.raw.close

        return resp
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def generate_fake_error_response(msg, status_code=401, encoding='utf-8'):
    """Build a ``Response`` whose already-consumed body is ``msg``.

    :param msg: text used as the response body
    :param status_code: status code set on the response (default 401)
    :param encoding: value stored in the response's ``encoding`` attribute
    :return: the populated ``Response``
    """
    fake = Response()
    fake.status_code = status_code
    fake.encoding = encoding
    body_stream = RequestsStringIO(msg.encode())
    fake.raw = body_stream
    # Mark the body as read and store it, so the content is served from _content.
    fake._content_consumed = True
    fake._content = body_stream.read()
    return fake

# Use mock decorators when generating documentation, so all function signatures
# are displayed correctly
项目:Sasila    作者:DarkSand    | 项目源码 | 文件源码
def __str__(self):
        """Describe the wrapped response.

        Responses that are not ``Response_name`` instances are reported as
        Selenium responses; falsy ones as failed; otherwise the status code,
        URL and body size (bytes / 1000) are shown.
        """
        if not isinstance(self.m_response, Response_name):
            return "<Selenium Response: %s>" % self.request.url
        if not self.m_response:
            return "<Response failed: %s>" % self.request.url
        details = (
            self.m_response.status_code,
            self.m_response.url,
            float(len(self.m_response.content)) / 1000,
        )
        return "<Response [%s] [%s] [%.2f KB]>" % details
项目:biji    作者:jianmoumou    | 项目源码 | 文件源码
def judge_response():
    """Fetch http://www.126.com and print whether the result is a ``Response``.

    Prints "yes" when ``requests.get`` returned a ``requests.models.Response``
    instance and "no" otherwise.

    NOTE(review): uses Python 2 ``print`` statements.
    """
    # Imports are local to the function.
    import requests
    from requests.models import Response
    body = requests.get("http://www.126.com")
    if isinstance(body, Response):
        print "yes"
    else:
        print "no"
项目:firebase-admin-python    作者:firebase    | 项目源码 | 文件源码
def send(self, request, **kwargs):
        """Record ``request`` and answer with the preconfigured status and data.

        The keyword arguments are stored on the request as ``_extra_kwargs``
        and the request is appended to ``self._recorder``.

        :return: a ``Response`` with the request URL, ``self._status`` and
            ``self._data`` (encoded) as its raw body stream
        """
        request._extra_kwargs = kwargs
        self._recorder.append(request)

        canned = models.Response()
        canned.url = request.url
        canned.status_code = self._status
        canned.raw = six.BytesIO(self._data.encode())
        return canned
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def run_parallel_download():
    """Download three large bodies in parallel through a proxy and check their sizes.

    A ``GenericProxy`` is started with a listener that sleeps for the number
    of seconds given in the request path and then answers with that number
    repeated ``file_length`` times. Each download is stored in a temp file
    whose length is asserted afterwards.
    """
    file_length = 10000000
    test_port = 12124
    tmp_file_pattern = '/tmp/test.%s'

    class DownloadListener(ProxyListener):

        def forward_request(self, method, path, data, headers):
            # The request path (without slashes) is the delay in seconds.
            delay = int(path.replace('/', ''))
            time.sleep(delay)
            reply = Response()
            reply.status_code = 200
            reply._content = ('%s' % delay) * file_length
            return reply

    def do_download(param):
        target = tmp_file_pattern % param
        # Register the file in TMP_FILES before downloading into it.
        TMP_FILES.append(target)
        download('http://localhost:%s/%s' % (test_port, param), target)

    proxy = GenericProxy(port=test_port, update_listener=DownloadListener())
    proxy.start()

    values = (1, 2, 3)
    parallelize(do_download, values)
    proxy.stop()

    for val in values:
        downloaded = load_file(tmp_file_pattern % val)
        assert len(downloaded) == file_length
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def test_api_gateway_http_integration():
    """Exercise an API Gateway wired to a local HTTP backend.

    The backend is a ``GenericProxy`` whose listener echoes the request data
    (or ``'{}'``). The test checks the CORS headers on OPTIONS, then a GET and
    a POST round trip through the gateway.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    class TestListener(ProxyListener):

        def forward_request(self, **kwargs):
            echo = Response()
            echo.status_code = 200
            echo._content = kwargs.get('data') or '{}'
            return echo

    proxy = GenericProxy(test_port, update_listener=TestListener())
    proxy.start()

    # create API Gateway and connect it to the HTTP backend
    gateway = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

    url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=gateway['id'],
        stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)

    # make sure CORS headers are present
    origin = 'localhost'
    preflight = requests.options(url, headers={'origin': origin})
    assert preflight.status_code == 200
    # Treat '*' in the allowed origin as a regex wildcard.
    allowed_origin_pattern = preflight.headers['Access-Control-Allow-Origin'].replace('*', '.*')
    assert re.match(allowed_origin_pattern, origin)
    assert 'POST' in preflight.headers['Access-Control-Allow-Methods']

    # make test request to gateway
    get_result = requests.get(url)
    assert get_result.status_code == 200
    assert to_str(get_result.content) == '{}'
    data = {'data': 123}
    post_result = requests.post(url, data=json.dumps(data))
    assert post_result.status_code == 200
    assert json.loads(to_str(post_result.content)) == data

    # clean up
    proxy.stop()
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def error_response(message, code=400, error_type='ValidationError'):
    """Build an XML ``ErrorResponse`` in the CloudFormation namespace.

    :param message: text placed in the ``<Message>`` element
    :param code: HTTP status code of the response (default 400)
    :param error_type: value of ``<Code>`` and of the ``x-amzn-errortype`` header
    :return: the populated ``Response``
    """
    body = """<ErrorResponse xmlns="%s">
          <Error>
            <Type>Sender</Type>
            <Code>%s</Code>
            <Message>%s</Message>
          </Error>
          <RequestId>%s</RequestId>
        </ErrorResponse>""" % (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())
    response = Response()
    response.status_code = code
    response.headers['x-amzn-errortype'] = error_type
    response._content = body
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def make_response(operation_name, content='', code=200):
    """Build an XML ``<Operation>Response`` in the CloudFormation namespace.

    :param operation_name: used for the ``...Response`` and ``...Result`` element names
    :param content: text placed inside the ``...Result`` element
    :param code: HTTP status code of the response (default 200)
    :return: the populated ``Response``
    """
    template = """<{op_name}Response xmlns="{xmlns}">
      <{op_name}Result>
        {content}
      </{op_name}Result>
      <ResponseMetadata><RequestId>{uid}</RequestId></ResponseMetadata>
    </{op_name}Response>"""
    response = Response()
    response._content = template.format(xmlns=XMLNS_CLOUDFORMATION,
        op_name=operation_name, uid=uuid.uuid4(), content=content)
    response.status_code = code
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def _create_response_object(content, code, headers):
    """Build a ``Response`` from the given body, status code and headers."""
    result = Response()
    result._content = content
    result.headers = headers
    result.status_code = code
    return result
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def get_cors_response(headers):
    """Return an empty 200 response carrying "allow-all" CORS headers.

    :param headers: request headers (not used by the current implementation)
    :return: the populated ``Response``
    """
    # TODO: for now we simply return "allow-all" CORS headers, but in the future
    # we should implement custom headers for CORS rules, as supported by API Gateway:
    # http://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-cors.html
    allow_all = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE'),
        ('Access-Control-Allow-Headers', '*'),
    )
    response = Response()
    response.status_code = 200
    for header_name, header_value in allow_all:
        response.headers[header_name] = header_value
    response._content = ''
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def return_response(self, method, path, data, headers, response):
        """ Interceptor hook invoked by the proxy when a response is about to be
            returned, i.e. *after* the request has been forwarded and the backend
            service has answered. It is given the details of the incoming request
            together with the backend's response.

            The return value tells the proxy what to send to the client:

            * An instance of requests.models.Response: this response is returned to
              the client in place of the one from the backend service.
            * Any other value: the response from the backend service is returned
              to the client.

            This base implementation always returns None.
        """
        return None
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def get_cors(bucket_name):
    """Return a 200 response with the stored CORS configuration of a bucket as XML.

    When nothing is stored for ``bucket_name`` an empty ``CORSConfiguration``
    is returned instead.
    """
    response = Response()
    # TODO: check if bucket exists, otherwise return 404-like error
    cors = BUCKET_CORS.get(bucket_name) or {'CORSConfiguration': []}
    response._content = xmltodict.unparse(cors)
    response.status_code = 200
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def set_cors(bucket_name, cors):
    """Store the CORS configuration for a bucket and return an empty 200 response.

    :param bucket_name: key under which the configuration is stored in ``BUCKET_CORS``
    :param cors: configuration; string input is parsed with ``xmltodict`` first
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    parsed = xmltodict.parse(cors) if isinstance(cors, six.string_types) else cors
    BUCKET_CORS[bucket_name] = parsed
    ok = Response()
    ok.status_code = 200
    return ok
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def get_lifecycle(bucket_name):
    """Return a 200 response with the stored lifecycle configuration of a bucket as XML.

    When nothing is stored for ``bucket_name`` an empty
    ``LifecycleConfiguration`` is returned instead.
    """
    response = Response()
    # TODO: check if bucket exists, otherwise return 404-like error
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name) or {'LifecycleConfiguration': []}
    response._content = xmltodict.unparse(lifecycle)
    response.status_code = 200
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def set_lifecycle(bucket_name, lifecycle):
    """Store the lifecycle configuration for a bucket and return an empty 200 response.

    :param bucket_name: key under which the configuration is stored in ``BUCKET_LIFECYCLE``
    :param lifecycle: configuration; parsed with ``xmltodict`` when its
        ``to_str`` form is a string
    """
    # TODO: check if bucket exists, otherwise return 404-like error
    looks_like_text = isinstance(to_str(lifecycle), six.string_types)
    parsed = xmltodict.parse(lifecycle) if looks_like_text else lifecycle
    BUCKET_LIFECYCLE[bucket_name] = parsed
    ok = Response()
    ok.status_code = 200
    return ok
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def return_response(self, method, path, data, headers, response, request_handler):
        """Post-process a backend response for ``POST /`` requests.

        The form-encoded request body is parsed to find the ``Action``.
        ``CreateQueue`` and ``DeleteQueue`` fire an event carrying a hash of
        the queue URL. For ``CreateQueue``, ``GetQueueUrl`` and ``ListQueues``
        the ``<QueueUrl>`` elements in the body are rewritten to use
        ``HOSTNAME_EXTERNAL`` and the external port (and ``https`` when
        ``config.USE_SSL`` is set).

        :return: a new ``Response`` with the patched body when the rewriting
            changed the content; otherwise ``None`` (implicitly)
        """

        if method == 'POST' and path == '/':
            req_data = urlparse.parse_qs(to_str(data))
            # parse_qs yields lists; take the first Action value, if any.
            action = req_data.get('Action', [None])[0]
            event_type = None
            queue_url = None
            if action == 'CreateQueue':
                event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
                # The new queue URL is only available in the backend response.
                response_data = xmltodict.parse(response.content)
                if 'CreateQueueResponse' in response_data:
                    queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
            elif action == 'DeleteQueue':
                event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
                # For deletions the queue URL comes from the request itself.
                queue_url = req_data.get('QueueUrl', [None])[0]

            if event_type and queue_url:
                # Only a hash of the queue URL is put into the event payload.
                event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})

            # patch the response and return the correct endpoint URLs
            if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues'):
                content_str = content_str_original = to_str(response.content)
                new_response = Response()
                new_response.status_code = response.status_code
                # The headers object is shared with the original response, not copied.
                new_response.headers = response.headers
                if config.USE_SSL and '<QueueUrl>http://' in content_str:
                    # return https://... if we're supposed to use SSL
                    content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)
                # expose external hostname:port
                external_port = get_external_port(headers, request_handler)
                # Keep scheme (\1) and path (\3); swap in the external host and port.
                content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',
                    r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port), content_str)
                new_response._content = content_str
                if content_str_original != new_response._content:
                    # if changes have been made, return patched response
                    new_response.headers['content-length'] = len(new_response._content)
                    return new_response


# extract the external port used by the client to make the request
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def kinesis_error_response(data):
    """Build a 200 response reporting a throughput error for every input record.

    :param data: request payload; one error entry is produced per item in
        ``data['Records']``
    :return: ``Response`` whose body is JSON with ``FailedRecordCount`` fixed
        at 1 and the list of error entries under ``Records``
    """
    error_response = Response()
    error_response.status_code = 200
    failed_records = [
        {
            'ErrorCode': 'ProvisionedThroughputExceededException',
            'ErrorMessage': 'Rate exceeded for shard X in stream Y under account Z.'
        }
        for _ in data['Records']
    ]
    error_response._content = json.dumps(
        {'FailedRecordCount': 1, 'Records': failed_records})
    return error_response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def make_response(op_name, content=''):
    """Build a 200 XML ``<op_name>Response`` in the SNS namespace.

    :param op_name: used for the ``...Response`` and ``...Result`` element names
    :param content: text placed inside the ``...Result`` element; when empty a
        ``<MessageId>`` element with a fresh short uid is used instead
    :return: the populated ``Response``
    """
    response = Response()
    content = content or '<MessageId>%s</MessageId>' % short_uid()
    template = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
        <{op_name}Result>
            {content}
        </{op_name}Result>
        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>
        </{op_name}Response>"""
    response._content = template.format(op_name=op_name, content=content, req_id=short_uid())
    response.status_code = 200
    return response
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def error_response(message=None, error_type=None, code=400):
    """Build a JSON error response with a DynamoDB-style ``__type``.

    :param message: error text; defaults to ``'Unknown error'`` when falsy
    :param error_type: error type; defaults to ``'UnknownError'`` when falsy and
        is prefixed with ``com.amazonaws.dynamodb.v20120810#`` unless it already
        contains ``com.amazonaws.dynamodb``
    :param code: HTTP status code of the response (default 400)
    :return: the populated ``Response``
    """
    message = message or 'Unknown error'
    error_type = error_type or 'UnknownError'
    if 'com.amazonaws.dynamodb' not in error_type:
        error_type = 'com.amazonaws.dynamodb.v20120810#%s' % error_type
    response = Response()
    response.status_code = code
    response._content = json.dumps({
        'message': message,
        '__type': error_type
    })
    return response
项目:MyFriend-Rob    作者:lcheniv    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """Answer a request by opening the local file its URL points to.

        Only URLs whose host part is ``localhost`` are accepted. The response
        has status 200, headers derived from ``os.stat`` and the guessed MIME
        type, and ``raw`` wrapping the file opened in binary mode.

        :raises InvalidURL: when the URL host is not ``localhost``
        :return: the populated ``Response``
        """
        url_parts = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if url_parts.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        # Rebuild the URL with an empty host part before mapping it to a path.
        real_url = urlparse.urlunparse(url_parts[:1] + ("",) + url_parts[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        file_stat = os.stat(pathname)
        modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
        content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
        resp.headers = CaseInsensitiveDict({
            "Content-Type": content_type,
            "Content-Length": file_stat.st_size,
            "Last-Modified": modified,
        })

        file_obj = open(pathname, "rb")
        resp.raw = LocalFSResponse(file_obj)
        # Closing the response closes the wrapped file.
        resp.close = resp.raw.close

        return resp
项目:localstack    作者:atlassian    | 项目源码 | 文件源码
def test_api_gateway_http_integration():
    """Exercise an API Gateway wired to a local HTTP backend.

    The backend is a ``GenericProxy`` whose listener answers with the JSON
    dump of the request data (or ``'{}'`` when there is none). A GET and a
    POST are then sent through the gateway and their results checked.
    """
    test_port = 12123
    backend_url = 'http://localhost:%s%s' % (test_port, API_PATH_HTTP_BACKEND)

    # create target HTTP backend
    def listener(**kwargs):
        echo = Response()
        echo.status_code = 200
        echo._content = json.dumps(kwargs['data']) if kwargs['data'] else '{}'
        return echo

    proxy = GenericProxy(test_port, update_listener=listener)
    proxy.start()

    # create API Gateway and connect it to the HTTP backend
    gateway = connect_api_gateway_to_http('test_gateway2', backend_url, path=API_PATH_HTTP_BACKEND)

    # make test request to gateway
    url = INBOUND_GATEWAY_URL_PATTERN.format(api_id=gateway['id'],
        stage_name=TEST_STAGE_NAME, path=API_PATH_HTTP_BACKEND)
    get_result = requests.get(url)
    assert get_result.status_code == 200
    assert to_str(get_result.content) == '{}'
    data = {"data": 123}
    post_result = requests.post(url, data=json.dumps(data))
    assert post_result.status_code == 200
    assert json.loads(to_str(post_result.content)) == data

    # clean up
    proxy.stop()
项目:localstack    作者:atlassian    | 项目源码 | 文件源码
def error_response(message, code=400, error_type='ValidationError'):
    """Build an XML ``ErrorResponse`` in the CloudFormation namespace.

    :param message: text placed in the ``<Message>`` element
    :param code: HTTP status code of the response (default 400)
    :param error_type: value of ``<Code>`` and of the ``x-amzn-errortype`` header
    :return: the populated ``Response``
    """
    body = """<ErrorResponse xmlns="%s">
          <Error>
            <Type>Sender</Type>
            <Code>%s</Code>
            <Message>%s</Message>
          </Error>
          <RequestId>%s</RequestId>
        </ErrorResponse>""" % (XMLNS_CLOUDFORMATION, error_type, message, uuid.uuid4())
    response = Response()
    response.status_code = code
    response.headers['x-amzn-errortype'] = error_type
    response._content = body
    return response