Python requests 模块,Response() 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用requests.Response()

项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def get_response_and_time(self, key, default=(None, None)):
        """ Look up the cached ``(response, timestamp)`` pair for `key`.

        When `key` is not a primary key of ``self.responses`` it is treated as
        an alias and resolved through ``self.keys_map`` first.

        :param key: primary key or alias of the cached resource
        :param default: value returned when neither lookup succeeds
        :returns: tuple (restored response, timestamp) on a hit, else `default`

        .. note:: The stored response is passed through :meth:`restore_response`
                  before it is returned.
        """
        cache = self.responses
        try:
            # A miss in either mapping raises KeyError and yields `default`.
            resolved_key = key if key in cache else self.keys_map[key]
            stored_response, stored_at = cache[resolved_key]
        except KeyError:
            return default
        return self.restore_response(stored_response), stored_at
项目:onedrive-e    作者:tobecontinued    | 项目源码 | 文件源码
def put(self, url, data, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
        """
        Issue an HTTP PUT by delegating to ``self.request``.
        :param str url: URL of the HTTP request.
        :param bytes | None data: Binary data to send in the request body.
        :param dict | None headers: Additional headers; omitted from the call when None.
        :param int ok_status_code: (Optional) Expected status code, forwarded to ``self.request``.
        :param True | False auto_renew: (Optional) Forwarded to ``self.request`` unchanged.
        :rtype: requests.Response
        """
        request_kwargs = {'proxies': self.proxies, 'data': data}
        if headers is not None:
            request_kwargs['headers'] = headers
        return self.request(
            'put',
            url,
            params=request_kwargs,
            ok_status_code=ok_status_code,
            auto_renew=auto_renew,
        )
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def save_response(self, key, response):
        """ Store `response` in the cache under `key`.

        :param key: key for this response
        :param response: response to save; its ``request.headers`` mapping is
                         modified in place

        .. note:: The ``X-Auth-Token`` request header is removed before the
                  response is stored. No other reduction happens here.
        """
        request_headers = response.request.headers
        # Drop the auth token from the request headers before caching.
        if 'X-Auth-Token' in request_headers:
            del request_headers['X-Auth-Token']

        self.responses[key] = response
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def get_response(self, key, default=None):
        """ Return the value cached for `key`, or `default` on a miss.

        When `key` is not a primary key of ``self.responses`` it is resolved
        through ``self.keys_map`` first.

        :param key: primary key or alias of the cached resource
        :param default: return this if `key` cannot be resolved
        :returns: the stored value exactly as it was saved (no restore step)
        """
        cache = self.responses
        try:
            lookup_key = key if key in cache else self.keys_map[key]
            return cache[lookup_key]
        except KeyError:
            return default
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def restore_response(self, response, seen=None):
        """ Rebuild a ``requests.Response`` from a stored response object.

        Every attribute named in ``self._response_attrs`` is copied onto a
        fresh ``requests.Response`` (missing attributes become None), and
        ``history`` is restored recursively.

        :param response: stored object to copy attributes from
        :param seen: memo mapping ``id(stored object)`` to its restored
                     response; shared across the recursive calls
        :returns: the restored ``requests.Response``
        """
        memo = {} if seen is None else seen
        marker = id(response)
        if marker in memo:
            return memo[marker]

        restored = requests.Response()
        for attr_name in self._response_attrs:
            setattr(restored, attr_name, getattr(response, attr_name, None))
        restored.raw._cached_content_ = restored.content
        # Register before walking history so a repeated object is reused
        # instead of being restored again.
        memo[marker] = restored
        restored.history = tuple(self.restore_response(past, memo) for past in response.history)
        return restored
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_request_api_server_auth_recover(self, options, config, request,
                                             text):
        """An unauthorized first reply is followed by a call carrying the token.

        The mocked `request` answers 401 once and 200 afterwards; `text`
        returns a JSON document containing the token id. The last call to
        `request` must carry that token in the ``X-AUTH-TOKEN`` header.
        """
        driver = self._get_driver(options, config)
        driver._apiinsecure = False
        driver._use_api_certs = False

        token = "xyztoken"
        text.return_value = json.dumps({'access': {'token': {'id': token}}})

        unauthorized = requests.Response()
        unauthorized.status_code = requests.codes.unauthorized
        authorized = requests.Response()
        authorized.status_code = requests.codes.ok
        request.side_effect = [unauthorized, authorized, authorized]

        driver._request_api_server("/URL")

        expected_headers = {'X-AUTH-TOKEN': token}
        request.assert_called_with('/URL', data=None, headers=expected_headers)
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_request_backend(self, options, config):
        """``_request_backend`` returns the relayed status code and a message dict.

        ``_relay_request`` is replaced by a mock that returns a bare 200
        response.
        """
        driver = self._get_driver(options, config)
        ok_response = requests.Response()
        ok_response.status_code = requests.codes.ok
        driver._relay_request = mock.MagicMock()
        driver._relay_request.return_value = ok_response

        result = driver._request_backend(mock.MagicMock(), {}, 'object', 'TEST')
        code, message = result

        self.assertEqual(requests.codes.ok, code)
        self.assertEqual({'message': None}, message)
        driver._relay_request.assert_called()
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
async def test_asend_returns_appropriate_sorna_response(mocker, req_params,
                                                        mock_sorna_aresp):
    """``Request.asend()`` wraps the aiohttp reply in a ``Response`` for every allowed method.

    The function awaits coroutines, so it must be declared ``async def``;
    with a plain ``def`` the module does not compile.

    NOTE(review): running a coroutine test presumably needs an asyncio-aware
    pytest plugin/marker configured outside this snippet -- confirm.

    :param mocker: fixture used to patch ``aiohttp.ClientSession`` methods
    :param req_params: keyword arguments used to build the ``Request``
    :param mock_sorna_aresp: pair of (mocked aiohttp response, dict of its expected values)
    """
    req = Request(**req_params)
    for method in Request._allowed_methods:
        req.method = method

        # Replace the matching ClientSession method with a coroutine mock.
        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(),
            new_callable=asynctest.CoroutineMock
        )
        mock_reqfunc.return_value, conf = mock_sorna_aresp

        resp = await req.asend()

        assert isinstance(resp, Response)
        assert resp.status == conf['status']
        assert resp.reason == conf['reason']
        assert resp.content_type == conf['content_type']
        body = await conf['read']()
        assert resp.content_length == len(body)
        assert resp.text() == body.decode()
        assert resp.json() == json.loads(body.decode())
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def _parse_response_content_for_predictions(self, question_number, response):
        """
        Convert the JSON body of a search response into a list of predictions.

        Each entry of ``response['docs']`` becomes one ``Prediction``; the
        confidence is taken from ``ranker.confidence`` when the doc has it and
        is None otherwise. When ``numFound`` is not positive, a warning is
        logged and an empty list is returned.
        :param str question_number: used as qid
        :param requests.Response response: response whose ``json()`` holds the docs
        :return: one Prediction per returned doc (possibly empty)
        :rtype: list(Prediction)
        """
        found = response.json()['response']
        if found['numFound'] > 0:
            return [
                Prediction(qid=question_number, aid=doc[self.DOC_ID_FIELD_NAME], rank_score=doc['score'],
                           conf_score=doc.get("ranker.confidence"))
                for doc in found['docs']
            ]
        # Logger.warn is a deprecated alias; warning() is the supported name.
        self.logger.warning('Empty response: %s' % vars(response))
        return []
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def _parse_response_content_for_predictions(self, question_number, response):
        """
        Convert the JSON body of a query response into a list of predictions.

        Each entry of ``results`` becomes one ``Prediction`` with no
        confidence score. When ``matching_results`` is not positive, a warning
        is logged and an empty list is returned.
        :param str question_number: used as qid
        :param requests.Response response: response whose ``json()`` holds the results
        :return: one Prediction per returned result (possibly empty)
        :rtype: list(Prediction)
        """
        response_content = response.json()
        if response_content["matching_results"] > 0:
            return [
                Prediction(qid=question_number, aid=doc[DOC_ID_FIELD_NAME], rank_score=doc['score'],
                           conf_score=None)
                for doc in response_content['results']
            ]
        # Logger.warn is a deprecated alias; warning() is the supported name.
        self.logger.warning('Empty response: %s' % vars(response))
        return []
项目:onedrive-e    作者:tobecontinued    | 项目源码 | 文件源码
def get(self, url, params=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
        """
        Issue an HTTP GET by delegating to ``self.request``.
        :param str url: URL of the HTTP request.
        :param dict[str, T] | None params: (Optional) Query-string dictionary; omitted when None.
        :param dict | None headers: (Optional) Additional headers; omitted when None.
        :param int ok_status_code: (Optional) Expected status code, forwarded to ``self.request``.
        :param True | False auto_renew: (Optional) Forwarded to ``self.request`` unchanged.
        :rtype: requests.Response
        """
        request_kwargs = {'proxies': self.proxies}
        # Only forward the optional pieces the caller actually supplied.
        for name, value in (('params', params), ('headers', headers)):
            if value is not None:
                request_kwargs[name] = value
        return self.request(
            'get', url, request_kwargs,
            ok_status_code=ok_status_code, auto_renew=auto_renew,
        )
项目:onedrive-e    作者:tobecontinued    | 项目源码 | 文件源码
def post(self, url, data=None, json=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
        """
        Issue an HTTP POST by delegating to ``self.request``.

        `json` takes precedence: when it is not None it is sent and `data` is ignored.
        :param str url: URL of the HTTP request.
        :param dict | None data: (Optional) Data for the POST body; used only when `json` is None.
        :param dict | None json: (Optional) Dictionary to send as the JSON body.
        :param dict | None headers: (Optional) Additional headers; omitted when None.
        :param int ok_status_code: (Optional) Expected status code, forwarded to ``self.request``.
        :param True | False auto_renew: (Optional) Forwarded to ``self.request`` unchanged.
        :rtype: requests.Response
        """
        request_kwargs = {'proxies': self.proxies}
        if json is None:
            request_kwargs['data'] = data
        else:
            request_kwargs['json'] = json
        if headers is not None:
            request_kwargs['headers'] = headers
        return self.request(
            'post', url, request_kwargs,
            ok_status_code=ok_status_code, auto_renew=auto_renew,
        )
项目:OpenStackClient_VBS    作者:Huawei    | 项目源码 | 文件源码
def mixin_request_id(self, resp):
        """Store the request id carried by `resp` on ``self.request_id``.

        :param requests.Response resp: http response; any other value
            (e.g. a string or None) is stored as the request id as-is
        """
        if not isinstance(resp, Response):
            self.request_id = resp
            return

        # Try the known request-id headers in order and keep the first
        # non-empty value; if none is set, the last lookup result is kept.
        header_names = (
            'openstack-request-id',
            'x-openstack-request-id',
            'x-compute-request-id',
        )
        request_id = None
        for header_name in header_names:
            request_id = resp.headers.get(header_name)
            if request_id:
                break

        self.request_id = request_id
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def __init__(self, data):
        """Build a canned response from `data`.

        :param data: either a dict with optional ``status_code`` (default 200),
            ``text`` and ``headers`` entries, or a bare value that is used as
            the status code
        """
        super(TestResponse, self).__init__()
        self._content_consumed = True
        if not isinstance(data, dict):
            self.status_code = data
            return

        self.status_code = data.get('status_code', 200)
        # The "text" entry stands in for the response body; dicts and lists
        # are serialized to JSON and get a matching default Content-Type.
        body = data.get('text', "")
        fallback_headers = {}
        if isinstance(body, (dict, list)):
            body = json.dumps(body)
            fallback_headers["Content-Type"] = "application/json"
        if six.PY3 and isinstance(body, six.string_types):
            body = body.encode('utf-8', 'strict')
        self._content = body
        self.headers = data.get('headers') or fallback_headers
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry(self, mock_request):
        """Two connection errors followed by a 204 reply must not raise."""

        class FlakyTransport(object):
            # Number of calls seen so far.
            attempts = 0

            def connection_error(self, *args, **kwargs):
                self.attempts += 1
                if self.attempts >= 3:
                    reply = requests.Response()
                    reply.status_code = 204
                    return reply
                raise requests.exceptions.ConnectionError

        mock_request.side_effect = FlakyTransport().connection_error

        client = InfluxDBClient(database='db')
        client.write_points(self.dummy_points)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry_raises(self, mock_request):
        """Three connection errors in a row must surface as ConnectionError."""

        class FlakyTransport(object):
            # Number of calls seen so far.
            attempts = 0

            def connection_error(self, *args, **kwargs):
                self.attempts += 1
                if self.attempts >= 4:
                    reply = requests.Response()
                    reply.status_code = 200
                    return reply
                raise requests.exceptions.ConnectionError

        mock_request.side_effect = FlakyTransport().connection_error

        client = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            client.write_points(self.dummy_points)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry_raises(self, mock_request):
        """The mocked request fails its first three calls; the error must propagate."""

        class FailingThreeTimes(object):
            # Call counter; becomes an instance attribute on first increment.
            calls = 0

            def connection_error(self, *args, **kwargs):
                self.calls += 1
                if self.calls >= 4:
                    success = requests.Response()
                    success.status_code = 200
                    return success
                raise requests.exceptions.ConnectionError

        mock_request.side_effect = FailingThreeTimes().connection_error

        influx = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            influx.write_points(self.dummy_points)
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_add_channel_failed_create_channel(mock_staff_client, mocker):
    """A failing ``client.channels.create`` must raise ChannelCreationException and create nothing."""
    server_error = Response()
    server_error.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR
    create_channel = mock_staff_client.channels.create
    create_channel.return_value.raise_for_status.side_effect = HTTPError(response=server_error)

    channel_args = (
        Search.from_dict({}),
        "title",
        "name",
        "public_description",
        "channel_type",
        123,
        456,
    )
    with pytest.raises(ChannelCreationException) as ex:
        api.add_channel(*channel_args)

    assert ex.value.args[0] == "Error creating channel name"
    create_channel.return_value.raise_for_status.assert_called_with()
    assert create_channel.call_count == 1
    # Nothing may be persisted when channel creation fails.
    assert PercolateQuery.objects.count() == 0
    assert Channel.objects.count() == 0
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_send_batch_400_no_raise(self, mock_post):
        """
        With raise_for_status=False a 400 reply is returned for every chunk and nothing raises
        """
        mock_post.return_value = Mock(spec=Response, status_code=HTTP_400_BAD_REQUEST, json=mocked_json())

        # 52 recipients in chunks of 10 gives 6 batches.
        chunk_size = 10
        recipient_tuples = []
        for letter in string.ascii_letters:
            recipient_tuples.append(("{0}@example.com".format(letter), None))
        assert len(recipient_tuples) == 52

        with override_settings(MAILGUN_RECIPIENT_OVERRIDE=None):
            resp_list = MailgunClient.send_batch(
                'email subject',
                'email body',
                recipient_tuples,
                chunk_size=chunk_size,
                raise_for_status=False,
            )

        assert len(resp_list) == 6
        for resp in resp_list:
            assert resp.status_code == HTTP_400_BAD_REQUEST
        assert mock_post.call_count == 6
        assert mock_post.return_value.raise_for_status.called is False
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_financial_aid_email_error(self, raise_for_status, mock_post):
        """
        send_financial_aid_email returns the 400 reply and calls raise_for_status only when asked to
        """
        bad_request = Mock(spec=Response, status_code=HTTP_400_BAD_REQUEST, json=mocked_json())
        mock_post.return_value = bad_request

        recipient = self.staff_user_profile.user
        response = MailgunClient.send_financial_aid_email(
            recipient,
            self.financial_aid,
            'email subject',
            'email body',
            raise_for_status=raise_for_status,
        )

        assert response.raise_for_status.called is raise_for_status
        assert response.status_code == HTTP_400_BAD_REQUEST
        assert response.json() == {}
项目:python-kingbirdclient    作者:openstack    | 项目源码 | 文件源码
def __init__(self, data):
        """Create a fake response described by `data`.

        :param data: a dict with optional ``status_code`` (default 200),
            ``text`` and ``headers`` keys; any non-dict value is used
            directly as the status code
        """
        super(TestResponse, self).__init__()
        self._content_consumed = True
        if not isinstance(data, dict):
            self.status_code = data
            return

        self.status_code = data.get('status_code', 200)
        # "text" doubles as the body; structured values are sent as JSON.
        payload = data.get('text', "")
        if isinstance(payload, (dict, list)):
            content = json.dumps(payload)
            default_headers = {"Content-Type": "application/json"}
        else:
            content = payload
            default_headers = {}
        # On Python 3 the body is stored as UTF-8 bytes.
        if six.PY3 and isinstance(content, six.string_types):
            content = content.encode('utf-8', 'strict')
        self._content = content
        self.headers = data.get('headers') or default_headers
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_settings(greeting_text):
    """ Sets the Greeting Text and then the START Button.
        The payload for the START Button is 'USER_START'.

    Two POST requests are sent to the same URL; only the response of the
    second one (the START Button) is returned.

    :param str greeting_text: Desired Greeting Text (160 chars).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
        of the START Button request.
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN

    # Greeting text; the response of this call is not returned.
    greeting_body = json.dumps({
        'setting_type': 'greeting',
        'greeting': {'text': greeting_text},
    })
    requests.post(url, headers=HEADER, data=greeting_body)

    # Start button
    button_body = json.dumps({
        'setting_type': 'call_to_actions',
        'thread_state': 'new_thread',
        'call_to_actions': [{'payload': 'USER_START'}],
    })
    return requests.post(url, headers=HEADER, data=button_body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_start_button(payload='START'):
    """ Sets the Thread Settings Get Started button
    (/docs/messenger-platform/thread-settings/get-started-button).

    :param str payload: Desired postback payload (Default START).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        "setting_type": "call_to_actions",
        "thread_state": "new_thread",
        "call_to_actions": [{"payload": payload}],
    })
    return requests.post(url, headers=HEADER, data=body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_domain_whitelisting(whitelisted_domains, domain_action_type='add'):
    """ Sets the whitelisted domains for the Messenger Extension
    (/docs/messenger-platform/thread-settings/domain-whitelisting).

    :param list whitelisted_domains: Domains to be whitelisted.
    :param str domain_action_type: Action to run `add/remove` (Default add).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'setting_type': 'domain_whitelisting',
        'whitelisted_domains': whitelisted_domains,
        'domain_action_type': domain_action_type,
    })
    return requests.post(url, headers=HEADER, data=body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def typing(fbid, sender_action):
    """ Sends a sender action (typing indicator on/off or mark seen) to a user
    (/docs/messenger-platform/send-api-reference/sender-actions)

    :param str fbid: User id to display action.
    :param str sender_action: `typing_off/typing_on/mark_seen`.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'recipient': {'id': fbid},
        'sender_action': sender_action,
    })
    return requests.post(url, headers=HEADER, data=body)


# Send API Content Type
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_text_message(fbid, message):
    """ Sends a common text message
    (/docs/messenger-platform/send-api-reference/text-message)

    NOTE(review): the original notes cite both 230 and 320 characters as the
    text limit; this function does not enforce one -- confirm the real limit.

    :param str fbid: User id to send the text.
    :param str message: Text to be displayed for the user.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'recipient': {'id': fbid},
        'message': {'text': message},
    })
    return requests.post(url, headers=HEADER, data=body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_attachment(fbid, media_url, file_type):
    """ Sends a media attachment
    (/docs/messenger-platform/send-api-reference/contenttypes)

    :param str fbid: User id to send the attachment to.
    :param str media_url: Url of a hosted media.
    :param str file_type: image/audio/video/file.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'recipient': {'id': fbid},
        'message': {
            "attachment": {"type": file_type, "payload": {"url": media_url}},
        },
    })
    return requests.post(url, headers=HEADER, data=body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_start_button(payload='START'):
    """ Sets the **Get Started button** through the messenger profile URL.

    :usage:

        >>> payload = 'GET_STARTED'
        >>> response = fbbotw.post_start_button(payload=payload)
    :param str payload: Desired postback payload (Default 'START').
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/get-started-button <https://developers.facebook.com/docs/messenger-platform/messenger-profile/get-started-button>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'get_started': {'payload': payload}})
    return requests.post(url, headers=HEADER, data=body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_domain_whitelist(whitelisted_domains):
    """ Sets the whitelisted domains for the Messenger Extension

    :usage:

        >>> # Define the array of domains to be whitelisted (Max 10)
        >>> whitelisted_domains = [
                "https://myfirst_domain.com",
                "https://another_domain.com"
            ]
        >>> fbbotw.post_domain_whitelist(
                whitelisted_domains=whitelisted_domains
            )
    :param list whitelisted_domains: Domains to be whitelisted (Max 10).
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/domain-whitelisting <https://developers.facebook.com/docs/messenger-platform/messenger-profile/domain-whitelisting>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'whitelisted_domains': whitelisted_domains})
    return requests.post(url, headers=HEADER, data=body)
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def post_account_linking_url(account_linking_url):
    """ Sets the **account linking url** used to connect the user with your business login

    :usage:

        >>> my_callback_linking_url = "https://my_business_callback.com"
        >>> response = fbbotw.post_account_linking_url(
                account_linking_url=my_callback_linking_url
            )
    :param str account_linking_url: URL to the account linking OAuth flow.
    :return: `Response object <http://docs.python-requests.org/en/master/api/#requests.Response>`_
    :facebook docs: `/account-linking-url <https://developers.facebook.com/docs/messenger-platform/messenger-profile/account-linking-url>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'account_linking_url': account_linking_url})
    return requests.post(url, headers=HEADER, data=body)
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def response(cls, r):
        """Unwrap the payload of the API response *r*.

        The JSON body is expected to be an envelope with a ``meta`` and a
        ``response`` entry. The ``response`` entry is returned unless the
        body is not valid JSON or ``meta`` reports errors.

        :param r: the HTTP response from an API call
        :type r: :class:`requests.Response`
        :returns: the ``response`` entry of the decoded body
        :rtype: json
        :raises errors.ApiError: if the body cannot be decoded as JSON, or if
            ``meta`` contains a non-empty ``errors`` entry
        """
        try:
            envelope = r.json()
        except ValueError:
            raise errors.ApiError(r)

        meta = envelope['meta']
        if meta.get("errors"):
            raise errors.ApiError(meta)
        return envelope["response"]
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def post(self, endpoint, query=None, body=None, data=None):
        """
        Send a POST request to armada. If both body and data are specified,
        body will be used.

        :param string endpoint: URL string following hostname and API prefix
        :param dict query: dict of k, v parameters to add to the query string
        :param string body: string to use as the request body.
        :param data: Something json.dumps(s) can serialize; sent as JSON
                     when no body is given.
        :return: A requests.Response object
        """
        target_url = '{}{}'.format(self.base_url, endpoint)

        self.logger.debug("Sending POST with armada_client session")
        if body is None:
            self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
            return self._session.post(
                target_url, params=query, json=data, timeout=3600)

        self.logger.debug("Sending POST with explicit body: \n%s" % body)
        return self._session.post(
            target_url, params=query, data=body, timeout=3600)
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def post(self, endpoint, query=None, body=None, data=None):
        """
        Send a POST request to Drydock. If both body and data are specified,
        body will be used.

        :param string endpoint: The URL string following the hostname and API prefix
        :param dict query: A dict of k, v parameters to add to the query string
        :param string body: A string to use as the request body. Will be treated as raw
        :param data: Something json.dumps(s) can serialize. Sent as the JSON request body
                     when no body is given
        :return: A requests.Response object
        """
        target_url = self.base_url + endpoint

        self.logger.debug("Sending POST with drydock_client session")
        if body is None:
            self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
            return self.__session.post(
                target_url, params=query, json=data, timeout=10)

        self.logger.debug("Sending POST with explicit body: \n%s" % body)
        return self.__session.post(
            target_url, params=query, data=body, timeout=10)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_store_companies_house_profile_in_session_saves_in_session(
    mock_retrieve_profile, client
):
    """The retrieved profile is stored in the session and the session is marked modified."""
    profile = {
        'date_of_creation': '2000-10-10',
        'company_name': 'Example corp',
        'company_status': 'active',
        'company_number': '01234567',
        'registered_office_address': {'foo': 'bar'}
    }
    # Real Response object whose json() is replaced to return the profile.
    stubbed_response = Response()
    stubbed_response.status_code = http.client.OK
    stubbed_response.json = lambda: profile
    session = client.session
    mock_retrieve_profile.return_value = stubbed_response

    helpers.store_companies_house_profile_in_session(session, '01234567')

    mock_retrieve_profile.assert_called_once_with(number='01234567')
    stored = session[helpers.COMPANIES_HOUSE_PROFILE_SESSION_KEY]
    assert stored == profile
    assert session.modified is True
项目:aryas    作者:lattkkthxbbye    | 项目源码 | 文件源码
async def telljoke(self, ctx: commands.Context) -> None:
        """
        Responds with a random joke from theoatmeal.com

        Declared ``async def`` because the body awaits the bot's coroutines;
        with a plain ``def`` the module does not compile.

        :param ctx: command context; its message channel receives the typing indicator
        """
        # TODO: get more joke formats (or a better source)
        # Send typing as the request can take some time.
        await self.bot.send_typing(ctx.message.channel)

        # Build a new request object
        req: grequests.AsyncRequest = grequests.get('http://theoatmeal.com/djtaf/', timeout=1)
        # Send new request
        # NOTE(review): if the request fails, res[0] is presumably None and the
        # lines below would raise -- confirm how failures should be reported.
        res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)

        # Since we only have one request we can just fetch the first one.
        # res[0].content is the html in bytes
        soup = BeautifulSoup(res[0].content.decode(res[0].encoding), 'html.parser')
        joke_ul = soup.find(id='joke_0')
        # The joke is split in two parts: the setup (h2.part1) and the punchline (#part2_0).
        joke = joke_ul.find('h2', {'class': 'part1'}).text.lstrip() + '\n' + joke_ul.find(id='part2_0').text
        await self.bot.say(joke)
项目:aryas    作者:lattkkthxbbye    | 项目源码 | 文件源码
async def randomfact(self, ctx: commands.Context) -> None:
        """
        Responds with a random fact scraped from unkno.com

        Declared ``async def`` because the body awaits the bot's coroutines;
        with a plain ``def`` the module does not compile.

        :param ctx: command context; its message channel receives the typing indicator
        """
        # Send typing as the request can take some time.
        await self.bot.send_typing(ctx.message.channel)

        # Build a new request object
        req: grequests.AsyncRequest = grequests.get('http://www.unkno.com/', timeout=1)
        # Send new request
        # NOTE(review): if the request fails, res[0] is presumably None and the
        # lines below would raise -- confirm how failures should be reported.
        res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)

        # Since we only have one request we can just fetch the first one.
        # res[0].content is the html in bytes
        soup = BeautifulSoup(res[0].content.decode(res[0].encoding), 'html.parser')
        # The fact is the text of the element with id "content".
        await self.bot.say(soup.find(id='content').text)
项目:pact-broker-client    作者:Babylonpartners    | 项目源码 | 文件源码
def test_push_pact(mock_put):
    """``push_pact`` PUTs the pact file's JSON content to the expected URL exactly once."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created

    broker_client = BrokerClient(broker_url=settings.PACT_BROKER_URL)

    with open(PACT_FILE_PATH) as data_file:
        expected_pact = json.load(data_file)

    # Indexing the result checks that the call returns something subscriptable.
    broker_client.push_pact(
        provider=PROVIDER,
        consumer=CONSUMER,
        pact_file=PACT_FILE_PATH,
        consumer_version=CONSUMER_VERSION,
    )[0]

    mock_put.assert_called_once_with(EXPECTED_PUSH_PACT_URL, json=expected_pact, auth=None)
项目:pact-broker-client    作者:Babylonpartners    | 项目源码 | 文件源码
def test_tag_consumer(mock_put):
    """tag_consumer() issues exactly one PUT to the expected tag URL.

    ``mock_put`` is the injected mock standing in for the HTTP PUT call; it
    is primed to answer with a bare ``Response`` whose status is ``CREATED``.
    """
    client = BrokerClient(broker_url=settings.PACT_BROKER_URL)

    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created

    tag_kwargs = {
        'provider': PROVIDER,
        'consumer': CONSUMER,
        'consumer_version': CONSUMER_VERSION,
        'tag': TAG,
    }
    # Indexing the result also fails the test if nothing indexable comes back.
    client.tag_consumer(**tag_kwargs)[0]

    json_headers = {'Content-Type': 'application/json'}
    mock_put.assert_called_once_with(
        EXPECTED_TAG_CONSUMER_URL, headers=json_headers, auth=None)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def send(self, req, **kwargs):
        """Open ``req.url`` with ``urlopen`` and wrap the result in a Response.

        :param req: request object; only its ``url`` attribute is read.
        :param kwargs: accepted and ignored.
        :returns: a ``requests.Response`` whose ``raw`` is the opened stream
            and whose status is ``ok`` on success. On failure the status is
            ``forbidden`` (EACCES), ``not_found`` (ENOENT) or ``bad_request``
            (anything else) and ``raw`` is left as created.
        """
        codes = requests.codes
        # errno values that get a dedicated status; the rest are bad requests.
        status_by_errno = {
            errno.EACCES: codes.forbidden,
            errno.ENOENT: codes.not_found,
        }

        resp = requests.Response()
        try:
            resp.raw = urlopen(req.url)
            length = resp.raw.headers.get('content-length')
            if length is not None:
                resp.headers['Content-Length'] = length
        except IOError as exc:
            resp.status_code = status_by_errno.get(exc.errno, codes.bad_request)
        except OSError:
            # Only distinct from the branch above on Python 2; on Python 3
            # IOError is an alias of OSError.
            resp.status_code = codes.bad_request
        else:
            resp.status_code = codes.ok

        return resp
项目:ci-diff-helper    作者:dhermes    | 项目源码 | 文件源码
def test_it(self):
        """The function under test prints the rate-limit message to stderr.

        A response carrying the three rate-limit headers must produce exactly
        two ``six.print_`` calls: the formatted rate-limit template and the
        GitHub environment-variable hint, both written to ``sys.stderr``.
        """
        import sys
        import mock
        import requests
        from ci_diff_helper import _github

        remaining, rate_limit, rate_reset = '17', '60', '1475953149'

        response = requests.Response()
        response.headers.update({
            _github._RATE_REMAINING_HEADER: remaining,
            _github._RATE_LIMIT_HEADER: rate_limit,
            _github._RATE_RESET_HEADER: rate_reset,
        })
        expected_msg = _github._RATE_LIMIT_TEMPLATE.format(
            remaining, rate_limit, rate_reset)

        with mock.patch('six.print_') as mocked:
            self._call_function_under_test(response)

        # The mock keeps its recorded calls after the patch is undone.
        self.assertEqual(mocked.call_count, 2)
        mocked.assert_any_call(expected_msg, file=sys.stderr)
        mocked.assert_any_call(_github._GH_ENV_VAR_MSG, file=sys.stderr)
项目:PowerSpikeGG    作者:PowerSpikeGG    | 项目源码 | 文件源码
def test_match_not_found(self):
        """Check that the server sends an empty match when none is found.

        The cache lookup returns nothing and the Riot API handler raises a
        "404" ``LoLException``; the stub must answer with a default
        ``MatchReference``.
        """
        service = self.service
        service.cache_manager.find_match.return_value = None
        not_found = LoLException("404", requests.Response())
        service.riot_api_handler.get_match.side_effect = not_found

        match_request = service_pb2.MatchRequest(
            id=4242, region=constants_pb2.EUW)
        response = self.stub.Match(match_request)

        self.assertEqual(response, match_pb2.MatchReference())
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_request_api_server_authorized(self, options, config, request):
        """_request_api_server() hands back the 200 response unchanged.

        With ``_apiinsecure`` set, the mocked request must have been called
        with ``verify=False`` and no payload.
        """
        driver = self._get_driver(options, config)
        driver._apiinsecure = True

        ok_response = requests.Response()
        ok_response.status_code = requests.codes.ok
        request.return_value = ok_response

        target = "/URL"
        returned = driver._request_api_server(target)

        self.assertEqual(ok_response, returned)
        request.assert_called_with(
            target, data=None, headers=mock.ANY, verify=False)
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_request_api_server_auth_failed(self, options, config, request):
        """_request_api_server() raises RuntimeError on a 401 response."""
        driver = self._get_driver(options, config)
        driver._apiinsecure = False
        driver._use_api_certs = False

        denied = requests.Response()
        denied.status_code = requests.codes.unauthorized
        request.return_value = denied

        target = "/URL"
        self.assertRaises(RuntimeError, driver._request_api_server, target)
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_request_api_server_authn(self, options, config, request):
        """_request_api_server_authn() hands back the 200 response unchanged.

        With ``_apiinsecure`` set, the mocked request must have been called
        with ``verify=False`` and no payload.
        """
        driver = self._get_driver(options, config)
        driver._apiinsecure = True

        ok_response = requests.Response()
        ok_response.status_code = requests.codes.ok
        request.return_value = ok_response

        target = "/URL"
        returned = driver._request_api_server_authn(target)

        self.assertEqual(ok_response, returned)
        request.assert_called_with(
            target, data=None, headers=mock.ANY, verify=False)
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_vnc_api_is_authenticated_unauthorized(self, config, request):
        """vnc_api_is_authenticated() returns True on a 401 response.

        The API server is configured without SSL on localhost:8082 and the
        mocked request answers with an "unauthorized" status.
        """
        config.APISERVER = mock.MagicMock(
            use_ssl=False,
            api_server_ip="localhost",
            api_server_port="8082",
        )

        denied = requests.Response()
        denied.status_code = requests.codes.unauthorized
        request.return_value = denied

        outcome = utils.vnc_api_is_authenticated()

        request.assert_called_with(mock.ANY)
        self.assertEqual(True, outcome)
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_vnc_api_is_authenticated_invalid(self, config, request):
        """vnc_api_is_authenticated() raises HTTPError on a server error.

        The API server is configured with SSL on localhost:8082 and the
        mocked request answers with a "server_error" status.
        """
        config.APISERVER = mock.MagicMock(
            use_ssl=True,
            api_server_ip="localhost",
            api_server_port="8082",
        )

        broken = requests.Response()
        broken.status_code = requests.codes.server_error
        request.return_value = broken

        self.assertRaises(
            requests.exceptions.HTTPError, utils.vnc_api_is_authenticated)

        request.assert_called_with(mock.ANY)
项目:presto-python-client    作者:prestodb    | 项目源码 | 文件源码
def process(self, http_response):
        # type: (requests.Response) -> PrestoStatus
        """Convert an HTTP response into a ``PrestoStatus``.

        The JSON body is decoded as UTF-8. Session properties named by the
        clear-session / set-session HTTP headers are applied to
        ``self._client_session.properties``, and ``self._next_uri`` is
        updated from the body's ``nextUri`` entry (``None`` when absent).

        :param http_response: response received from the server.
        :returns: status built from the body's ``id``, ``stats``,
            ``infoUri``, ``data`` (default ``[]``) and ``columns`` entries.
        :raises: whatever ``self.raise_response_error`` raises for a non-OK
            response, and the exception built by ``self._process_error``
            when the body contains an ``error`` entry.
        """
        if not http_response.ok:
            self.raise_response_error(http_response)

        http_response.encoding = 'utf-8'
        response = http_response.json()
        logger.debug('HTTP {}: {}'.format(http_response.status_code, response))
        if 'error' in response:
            raise self._process_error(response['error'])

        # The session headers live on the HTTP response. ``response`` is the
        # decoded JSON body (a plain dict) and has no ``headers`` attribute,
        # so reading them from it fails as soon as one of these headers is
        # actually present.
        headers = http_response.headers

        if constants.HEADER_CLEAR_SESSION in headers:
            for prop in get_header_values(
                headers,
                constants.HEADER_CLEAR_SESSION,
            ):
                self._client_session.properties.pop(prop, None)

        if constants.HEADER_SET_SESSION in headers:
            for key, value in get_session_property_values(
                headers,
                constants.HEADER_SET_SESSION,
            ):
                self._client_session.properties[key] = value

        self._next_uri = response.get('nextUri')

        return PrestoStatus(
            id=response['id'],
            stats=response['stats'],
            info_uri=response['infoUri'],
            next_uri=self._next_uri,
            rows=response.get('data', []),
            columns=response.get('columns'),
        )