Python requests 模块,response() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用requests.response()

项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def request(self, method, *resources, **kwargs):
        """Send an HTTP request to a resource and return the handled result.

        :param method: HTTP verb; it is upper-cased before use
        :param resources: path parts handed to ``self.build_resource``
        :param kwargs: the options below are consumed here; everything that
            is left over is forwarded unchanged to ``requests.request``

            - ``response_key`` / ``key``: key forwarded to
              ``self.handle_response``; ``key`` wins when it is not None
            - ``query``, ``params``, ``fragment``, ``keep_blank_values``:
              forwarded to ``self.url_for`` to build the URL
            - ``data``: payload, sent as ``json=`` when ``content_type``
              contains ``'json'`` and as ``data=`` when it equals ``'body'``
            - ``content_type``: defaults to ``'json'``
            - ``timeout``: defaults to 60
        :return: whatever ``self.handle_response`` returns
        """
        verb = method.upper()

        # Strip our own options out of kwargs so only transport options remain.
        result_key = kwargs.pop('response_key', None)
        key_alias = kwargs.pop('key', None)
        query = kwargs.pop('query', None)
        payload = kwargs.pop('data', None)
        fragment = kwargs.pop('fragment', '')
        params = kwargs.pop('params', '')
        keep_blank_values = kwargs.pop('keep_blank_values', None)
        timeout = kwargs.pop('timeout', 60)
        if key_alias is not None:
            result_key = key_alias

        resource = self.build_resource(resources)
        content_type = kwargs.pop('content_type', 'json')
        if payload is not None:
            if 'json' in content_type:
                kwargs['json'] = payload
            if content_type == 'body':
                kwargs['data'] = payload

        url = self.url_for(
            resource, query,
            params=params,
            fragment=fragment,
            keep_blank_values=keep_blank_values,
        )
        self.logger.info('Request %s for %s', verb, url)
        reply = requests.request(verb, url, timeout=timeout, **kwargs)
        return self.handle_response(reply, response_key=result_key)
项目:python-cozify    作者:Artanicus    | 项目源码 | 文件源码
def emaillogin(email, otp):
    """Exchange an email address and one-time passcode for a cloud token.

    POSTs to the ``user/emaillogin`` cloud endpoint, passing the email and
    the passcode (as ``password``) as request parameters.

    Args:
        email(str): Email address connected to Cozify account.
        otp(int): One time passcode.

    Returns:
        str: Body of the 200 response, i.e. the cloud token.

    Raises:
        APIError: If the response status code is anything other than 200.
    """
    credentials = {'email': email, 'password': otp}
    reply = requests.post(cloudBase + 'user/emaillogin', params=credentials)
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return reply.text
项目:python-cozify    作者:Artanicus    | 项目源码 | 文件源码
def hubkeys(cloud_token):
    """Fetch the hub keys via the ``user/hubkeys`` cloud endpoint.

    Args:
        cloud_token(str): Cloud remote authentication token, sent as the
            ``Authorization`` header.

    Returns:
        dict: Map of hub_id: hub_token pairs, decoded from the JSON body.

    Raises:
        APIError: If the response status code is anything other than 200.
    """
    auth_headers = {'Authorization': cloud_token}
    reply = requests.get(cloudBase + 'user/hubkeys', headers=auth_headers)
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return json.loads(reply.text)
项目:python-cozify    作者:Artanicus    | 项目源码 | 文件源码
def refreshsession(cloud_token):
    """Request a fresh cloud token via the ``user/refreshsession`` endpoint.

    Args:
        cloud_token(str): Cloud remote authentication token, sent as the
            ``Authorization`` header.

    Returns:
        str: New cloud remote authentication token (the response body).
            It is only returned here, not stored anywhere by this function.

    Raises:
        APIError: If the response status code is anything other than 200.
    """
    auth_headers = {'Authorization': cloud_token}
    reply = requests.get(cloudBase + 'user/refreshsession', headers=auth_headers)
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return reply.text
项目:gaia-on-tap    作者:andycasey    | 项目源码 | 文件源码
def cone_search(ra, dec, radius, table="gaiadr1.gaia_source", **kwargs):
    """
    Run a cone search on a Gaia archive table through `query`.

    The position and radius are formatted into the ADQL statement with ten
    decimal places each.

    :param ra:
        Right ascension of the cone centre (degrees).

    :param dec:
        Declination of the cone centre (degrees).

    :param radius:
        Radius of the cone (degrees).

    :param table: [optional]
        Name of the table to search, for example:

        gaiadr1.gaia_source
        gaiadr1.tgas_source

    :param kwargs:
        Keyword arguments forwarded untouched to `query`.

    :returns:
        Whatever `query` returns for the generated statement.
    """

    adql_template = """ SELECT * 
        FROM {table} 
        WHERE CONTAINS(
            POINT('ICRS',{table}.ra,{table}.dec),
            CIRCLE('ICRS',{ra:.10f},{dec:.10f},{radius:.10f})) = 1;"""
    adql = adql_template.format(table=table, ra=ra, dec=dec, radius=radius)
    return query(adql, **kwargs)
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def __init__(self, response, description, *args, **kwargs):
        """Build the error from a failed response and its description.

        The instance keeps ``response`` and ``description`` and copies
        ``status_code`` and ``content`` from the response. Any further
        positional and keyword arguments go to the parent initializer.

        :param response: response object exposing ``status_code`` and
            ``content``
        :param description: description of the error
        """
        self.response, self.description = response, description
        self.status_code, self.content = response.status_code, response.content
        super(ResponseError, self).__init__(*args, **kwargs)
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def __repr__(self):  # pragma: no cover
        """Return a one-line summary: response status code plus description."""
        template = 'Error status code: {}. Description: {}'
        return template.format(self.response.status_code, self.description)
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def handle_response(self, response, response_key=None):
        """Decode a response body as JSON and reduce it to the wanted result.

        For a non-empty decoded body:

        * with ``response_key`` and an ok status, the value stored under that
          key is returned (a missing key is an error);
        * with ``response_key`` and a status from ``self.to_none_statuses``,
          the result becomes None;
        * a status that is in neither ``self.ok_statuses`` nor
          ``self.to_none_statuses`` is an error.

        An empty decoded body skips those checks. Finally, when a
        ``response_key`` was given and ``self.empty_to_none`` is set, an empty
        (falsy, not None) result is turned into None.

        :param response: response object providing ``status_code`` and
            ``json()``
        :param response_key: key to extract from the decoded body
        :return: the decoded body, the extracted value, or None
        :raises ResponseError: if the body cannot be decoded, the key is
            missing, or the status code is not acceptable
        """
        status_code = response.status_code
        try:
            result = response.json()
        except Exception as e:
            self.logger.exception(e)
            raise ResponseError(response, e)

        key_requested = response_key is not None

        if result:
            status_ok = status_code in self.ok_statuses
            if key_requested and status_ok:
                if response_key not in result:
                    raise ResponseError(response, 'Response key not found!')
                result = result[response_key]
            elif key_requested and status_code in self.to_none_statuses:
                result = None
            elif not status_ok and status_code not in self.to_none_statuses:
                message = 'Status code {} not in ok_statuses {}'.format(
                    status_code, self.ok_statuses)
                raise ResponseError(response, message)

        # Collapse empty containers/strings to None only for keyed lookups.
        collapse_empty = key_requested and self.empty_to_none
        if collapse_empty and result is not None and not result:
            result = None

        return result
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def make_402_payment(self, response, max_price):
        """Payment hook that a BitRequests subclass has to implement.

        This base version performs no payment and always raises.

        Args:
            response (requests.response): 402 response from the API server.
            max_price (int): maximum allowed price for a request (in satoshi).

        Returns:
            headers (dict):
                in an implementation, the payment headers to send to the
                API server to inform payment status for the resource.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def make_402_payment(self, response, max_price):
        """Make an on-chain payment and return the headers describing it.

        Args:
            response: 402 response whose headers carry the price and the
                payee address.
            max_price: budget for the request; a falsy value disables the
                budget check.

        Returns:
            dict: the signed transaction (hex), the wallet's current address
                as return address, the price as a string and the URL-quoted
                username (None when no username is set).

        Raises:
            UnsupportedPaymentMethodError: price or address header missing.
            ResourcePriceGreaterThanMaxPriceError: price above ``max_price``.
        """
        # Pull the payment details out of the 402 response
        response_headers = response.headers
        price = response_headers.get(OnChainRequests.HTTP_BITCOIN_PRICE)
        payee_address = response_headers.get(OnChainRequests.HTTP_BITCOIN_ADDRESS)

        # Both headers are required for this payment method
        if price is None or payee_address is None:
            raise UnsupportedPaymentMethodError(
                'Resource does not support that payment method.')

        # Header values arrive as strings
        price = int(price)

        # Refuse to pay more than the caller allowed
        if max_price and price > max_price:
            max_price_err = 'Resource price ({}) exceeds max price ({}).'
            raise ResourcePriceGreaterThanMaxPriceError(max_price_err.format(price, max_price))

        # Build and sign the transaction, then serialise it to hex
        signed_txns = self.wallet.make_signed_transaction_for(
            payee_address, price, use_unconfirmed=True)
        onchain_payment = signed_txns[0].get('txn').to_hex()
        return_address = self.wallet.current_address
        logger.debug('[OnChainRequests] Signed transaction: {}'.format(
            onchain_payment))

        payment_headers = {
            'Bitcoin-Transaction': onchain_payment,
            'Return-Wallet-Address': return_address,
            OnChainRequests.HTTP_BITCOIN_PRICE: str(price),
            OnChainRequests.HTTP_PAYER_21USERNAME: urllib.parse.quote(self.username) if self.username else None
        }
        return payment_headers
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_402_info(self, url):
        """Get channel payment information about the resource.

        Issues a GET for ``url`` and returns a dict holding the price and
        payment-channel-server header values (None for a missing header),
        keyed by the corresponding header names.
        """
        reply = requests.get(url)
        price_key = ChannelRequests.HTTP_BITCOIN_PRICE
        server_key = ChannelRequests.HTTP_BITCOIN_PAYMENT_CHANNEL_SERVER
        return {
            price_key: reply.headers.get(price_key),
            server_key: reply.headers.get(server_key),
        }
项目:python-cozify    作者:Artanicus    | 项目源码 | 文件源码
def requestlogin(email):
    """Raw Cloud API call, request OTP to be sent to account email address.

    POSTs to the ``user/requestlogin`` cloud endpoint with the email as a
    request parameter. Nothing is returned on success.

    Args:
        email(str): Email address connected to Cozify account.

    Raises:
        APIError: If the response status code is anything other than 200.
    """

    payload = {'email': email}
    response = requests.post(cloudBase + 'user/requestlogin', params=payload)
    # Compare by value: `is not 200` tests object identity, which only works
    # by accident of integer caching and is a SyntaxWarning on Python 3.8+.
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
项目:python-cozify    作者:Artanicus    | 项目源码 | 文件源码
def lan_ip():
    """Query the ``hub/lan_ip`` cloud endpoint.

    The call fails with an APIError if the requesting source address is not the same as that of the hub, i.e. if they're not in the same NAT network.
    This is based on observation and may only be partially true.

    Returns:
        list: List of Hub ip addresses, decoded from the JSON body.

    Raises:
        APIError: If the response status code is anything other than 200.
    """
    reply = requests.get(cloudBase + 'hub/lan_ip')
    if reply.status_code != 200:
        raise APIError(reply.status_code, reply.text)
    return json.loads(reply.text)
项目:deploy-marathon-bluegreen    作者:softonic    | 项目源码 | 文件源码
def _default_is_success(status_code):
    """Tell whether a status code lies in the half-open range [200, 300).

    :param status_code: the http response status
    :type status_code: int
    :returns: True for a success status; False otherwise
    :rtype: bool
    """

    return status_code >= 200 and status_code < 300
项目:deploy-marathon-bluegreen    作者:softonic    | 项目源码 | 文件源码
def get_auth_scheme(response):
    """Return the authentication scheme and realm requested by the server.

    The 'www-authenticate' header is split on commas and the first part that
    starts with 'basic', 'acsjwt' (DCOS acs auth) or 'oauthjwt' (DCOS acs
    oauth), compared case-insensitively, is used. The scheme is the first
    word of that part; the realm is whatever follows its last '=', with
    surrounding blanks and quotes removed. Both values are lower-cased.

    :param response: response whose headers are inspected
    :type response: requests.Response
    :returns: auth_scheme, realm; (None, None) if the header is missing or
        holds no supported scheme
    :rtype: (str, str)
    """

    if 'www-authenticate' not in response.headers:
        return None, None

    supported = ("basic", "acsjwt", "oauthjwt")
    # strip() rather than rstrip(): every part after the first keeps the blank
    # that follows the separating comma and would otherwise never match.
    challenges = (part.strip().lower()
                  for part in response.headers['www-authenticate'].split(','))
    scheme = next((challenge for challenge in challenges
                   if challenge.startswith(supported)),
                  None)
    if not scheme:
        return None, None

    scheme_info = scheme.split("=")
    auth_scheme = scheme_info[0].split(" ")[0].lower()
    realm = scheme_info[-1].strip(' \'\"').lower()
    return auth_scheme, realm
项目:deploy-marathon-bluegreen    作者:softonic    | 项目源码 | 文件源码
def _get_http_auth(response, url, auth_scheme):
    """Pick the authentication object matching the server's challenge.

    :param response: response whose headers are checked for
        'www-authenticate'
    :type response: requests.Response
    :param url: parsed request url; its ``hostname``, ``username`` and
        ``password`` attributes are read
    :type url: parsed URL object
    :param auth_scheme: scheme requested by the server; one of 'basic',
        'acsjwt' or 'oauthjwt' is accepted
    :type auth_scheme: str
    :returns: HTTPBasicAuth for 'basic', otherwise the result of
        _get_dcos_auth
    :rtype: AuthBase
    :raises DCOSException: if the header is missing or the scheme is not
        one of the accepted ones
    """

    host = url.hostname
    user = url.username
    secret = url.password

    if 'www-authenticate' not in response.headers:
        msg = ("Invalid HTTP response: server returned an HTTP 401 response "
               "with no 'www-authenticate' field")
        raise DCOSException(msg)

    if auth_scheme not in ['basic', 'acsjwt', 'oauthjwt']:
        msg = ("Server responded with an HTTP 'www-authenticate' field of "
               "'{}', DCOS only supports 'Basic'".format(
                   response.headers['www-authenticate']))
        raise DCOSException(msg)

    if auth_scheme == 'basic':
        # for basic auth if username + password was present,
        # we'd already be authed by python requests module
        user, secret = _get_auth_credentials(user, host)
        return HTTPBasicAuth(user, secret)

    # dcos auth (acs or oauth)
    return _get_dcos_auth(auth_scheme, user, secret, host)
项目:deploy-marathon-bluegreen    作者:softonic    | 项目源码 | 文件源码
def _get_dcos_auth(auth_scheme, username, password, hostname):
    """Build the auth object for dcos acs auth and dcos oauth.

    A token already present in the config under "core.dcos_acs_token" is
    used as is. Otherwise credentials are gathered (acs credentials for
    "acsjwt", oauth credentials for anything else), posted to the cluster's
    'acs/api/v1/auth/login' endpoint, and on a 200 response the returned
    token is saved to the config. If the login does not answer 200, the
    token stays None.

    :param auth_scheme: authentication_scheme
    :type auth_scheme: str
    :param username: username user for authentication
    :type username: str
    :param password: password for authentication
    :type password: str
    :param hostname: hostname for credentials
    :type hostname: str
    :returns: DCOSAcsAuth wrapping the token
    :rtype: AuthBase
    """

    toml_config = util.get_config()
    token = toml_config.get("core.dcos_acs_token")
    if token is not None:
        return DCOSAcsAuth(token)

    dcos_url = toml_config.get("core.dcos_url")
    creds = (_get_dcos_acs_auth_creds(username, password, hostname)
             if auth_scheme == "acsjwt"
             else _get_dcos_oauth_creds(dcos_url))

    verify = _verify_ssl()
    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
    if verify is not None:
        silence_requests_warnings()

    login_url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
    # using private method here, so we don't retry on this request
    # error here will be bubbled up to _request_with_auth
    login_response = _request('post', login_url, json=creds, verify=verify)

    if login_response.status_code == 200:
        token = login_response.json()['token']
        config.set_val("core.dcos_acs_token", token)

    return DCOSAcsAuth(token)
项目:gaia-on-tap    作者:andycasey    | 项目源码 | 文件源码
def query(query, authenticate=False, json=False, full_output=False, **kwargs):
    """
    Execute a synchronous TAP query to the ESA Gaia database.

    :param query:
        The TAP query to execute.

    :param authenticate: [optional]
        Authenticate with the username and password information stored in the
        config.

    :param json: [optional]
        Return the data in JSON format. If set to False, then the data will be
        returned as an `astropy.table.Table`.

    :param full_output: [optional]
        Return a two-length tuple containing the data and the corresponding
        `requests.response` object.

    :param kwargs:
        Extra request parameters; they are merged over the default ones and
        so may override them.

    :returns:
        The data returned - either as an astropy table or a dictionary (JSON) -
        and optionally, the `requests.response` object used.

    :raises TAPQueryException:
        If the response is not ok.
    """

    output_format = "json" if json else "votable"
    params = dict(REQUEST="doQuery", LANG="ADQL", FORMAT=output_format,
                  query=query)
    params.update(kwargs)

    # The session only serves this single request, so close it as soon as the
    # response has been received instead of leaking its connection pool. The
    # body is already downloaded at that point and stays readable.
    with requests.Session() as session:
        if authenticate:
            utils.login(session)
        response = session.get("{}/tap/sync".format(config.url), params=params)

    if not response.ok:
        raise TAPQueryException(response)

    if json:
        data = response.json()

    else:
        # Take the table contents and return an astropy table.
        data = Table.read(StringIO(response.text), format="votable")

    return (data, response) if full_output else data
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def __init__(self, endpoint, ok_statuses=None, to_none_statuses=None,
                 empty_to_none=True, close_slash=True,
                 logger=None, name=None, keep_blank_values=True):
        """Set up a client for the given endpoint.

        :param endpoint: str, ex. http://localhost:5000 or http://localhost:5000/api/;
            one trailing slash is removed before the url is parsed
        :param ok_statuses: status codes treated as "ok"; only stored on the
            instance when given
        :param to_none_statuses: status codes that turn a response into None;
            only stored on the instance when given
        :param empty_to_none: boolean, default - True, if True an empty response
            (empty str, empty list, empty dict) is turned into None
        :param close_slash: boolean, default - True, stored on the instance
        :param logger: logger instance; a module logger is created when omitted
        :param name: name for client, default - '<client: {endpoint}>'
        :type name: str
        :param keep_blank_values: boolean, default - True, passed on when the
            endpoint's query string is parsed
        """
        if name is None:
            name = '<client: {}>'.format(endpoint)

        base_logger = get_logger(__name__) if logger is None else logger
        self.logger = InstanceLogger(self, base_logger)

        # Drop a single trailing slash before parsing.
        trimmed = endpoint[:-1] if endpoint.endswith('/') else endpoint

        # Leave any class-level defaults in place unless overrides were given.
        if ok_statuses is not None:
            self.ok_statuses = ok_statuses
        if to_none_statuses is not None:
            self.to_none_statuses = to_none_statuses
        self.empty_to_none = empty_to_none
        self.close_slash = close_slash

        url_parts = urlparse.urlparse(trimmed)
        base_endpoint = self.get_endpoint_from_parsed_url(url_parts)
        self.keep_blank_values = keep_blank_values
        self.endpoint = base_endpoint
        self.path = url_parts.path
        self.query = urlparse.parse_qs(
            url_parts.query, keep_blank_values=self.keep_blank_values)
        self.fragment = url_parts.fragment
        self.params = url_parts.params
        self.name = name
        self.logger.debug(
            'Client built, endpoint: "%s", path: "%s", query: %s, params: %s, fragment: %s',
            self.endpoint, self.path,
            self.query, self.params, self.fragment)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def request(self, method, url, max_price=None, mock_requests=False, **kwargs):
        """Make a 402 request for a resource.

        This is the BitRequests public method that should be used to complete a
        402 request using the desired payment method (as constructed by a class
        implementing BitRequests)

        The resource is requested once; if the server answers with anything
        other than 402 that response is returned as is. On a 402 the payment
        headers from ``make_402_payment`` are merged with any caller-provided
        headers and the request is sent a second time.

        Args:
            method (string): HTTP method for completing the request in lower-
                case letters. Examples: 'get', 'post', 'put'
            url (string): URL of the requested resource.
            max_price (int): maximum allowed price for a request (in satoshi).
            mock_requests (bool): when true, no request is made and an empty
                response with status code 200 is returned.
            **kwargs: forwarded to ``requests.request``; ``headers``, if
                given, must be a dict.

        Returns:
            response (requests.response):
                response from paying for the requested resource, carrying an
                extra ``amount_paid`` attribute when a payment was made.

        Raises:
            ValueError: if ``headers`` is given but is not a dict.
        """
        if mock_requests:
            fake_response = requests.models.Response()
            fake_response.status_code = 200
            fake_response._content = b''
            return fake_response

        # Make the initial request for the resource
        response = requests.request(method, url, **kwargs)

        # Return if we receive a status code other than 402: payment required
        if response.status_code != requests.codes.payment_required:
            return response

        # Pass the response to the main method for handling payment
        logger.debug('[BitRequests] 402 payment required: {} satoshi.'.format(
            response.headers['price']))
        payment_headers = self.make_402_payment(response, max_price)

        # Reset the position of any files that have been used
        self._reset_file_positions(kwargs.get('files'), kwargs.get('data'))

        # Combine the user-provided headers with the payment headers
        if 'headers' in kwargs:
            if not isinstance(kwargs['headers'], dict):
                raise ValueError('argument \'headers\' must be a dict.')
            # Merge into a copy so the caller's own dict is not polluted with
            # payment headers that would leak into its later requests.
            merged_headers = kwargs['headers'].copy()
            merged_headers.update(payment_headers)
            kwargs['headers'] = merged_headers
        else:
            kwargs['headers'] = payment_headers

        paid_response = requests.request(method, url, **kwargs)
        setattr(paid_response, 'amount_paid', int(response.headers['price']))

        if paid_response.status_code == requests.codes.ok:
            logger.debug('[BitRequests] Successfully purchased resource.')
        else:
            logger.debug('[BitRequests] Could not purchase resource.')

        return paid_response
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def make_402_payment(self, response, max_price):
        """Make a bit-transfer payment to the payment-handling service.

        Args:
            response: 402 response whose headers carry the price, the payee
                address and the payee username.
            max_price: budget for the request; a falsy value disables the
                budget check.

        Returns:
            dict: the JSON-serialised transfer under 'Bitcoin-Transfer' and
                its wallet signature under 'Authorization'.

        Raises:
            UnsupportedPaymentMethodError: a required header is missing.
            InsufficientBalanceError: price above the buffer balance.
            ResourcePriceGreaterThanMaxPriceError: price above ``max_price``.
            TypeError: the serialised transfer is not a str.
        """
        # Pull the payment details out of the 402 response
        response_headers = response.headers
        price = response_headers.get(BitTransferRequests.HTTP_BITCOIN_PRICE)
        payee_address = response_headers.get(BitTransferRequests.HTTP_BITCOIN_ADDRESS)
        payee_username = response_headers.get(BitTransferRequests.HTTP_BITCOIN_USERNAME)

        # All three headers are required for this payment method
        required = (price, payee_address, payee_username)
        if any(value is None for value in required):
            raise UnsupportedPaymentMethodError(
                'Resource does not support that payment method.')

        # Header values arrive as strings
        price = int(price)

        # The balance check comes first, then the caller's budget
        buffer_balance = self.client.get_earnings()["total_earnings"]
        if price > buffer_balance:
            insuff_funds_err = 'Resource price ({}) exceeds buffer balance ({}).'
            raise InsufficientBalanceError(insuff_funds_err.format(price, buffer_balance))

        if max_price and price > max_price:
            max_price_err = 'Resource price ({}) exceeds max price ({}).'
            raise ResourcePriceGreaterThanMaxPriceError(max_price_err.format(price, max_price))

        # Base64-encode the compressed signing public key
        pubkey = self.wallet.get_public_key()
        compressed_pubkey = codecs.encode(pubkey.compressed_bytes, 'base64').decode()

        # Describe the transfer, serialise it and sign the serialised form
        transfer_fields = {
            'payer': self.username,
            'payer_pubkey': compressed_pubkey,
            'payee_address': payee_address,
            'payee_username': payee_username,
            'amount': price,
            'timestamp': time.time(),
            'description': response.url
        }
        bittransfer = json.dumps(transfer_fields)
        if not isinstance(bittransfer, str):
            raise TypeError("Serialized bittransfer must be a string")
        signature = self.wallet.sign_message(bittransfer)
        logger.debug('[BitTransferRequests] Signature: {}'.format(signature))
        logger.debug('[BitTransferRequests] BitTransfer: {}'.format(bittransfer))

        payment_headers = {
            'Bitcoin-Transfer': bittransfer,
            'Authorization': signature
        }
        return payment_headers
项目:deploy-marathon-bluegreen    作者:softonic    | 项目源码 | 文件源码
def request(method,
            url,
            is_success=_default_is_success,
            timeout=None,
            verify=None,
            **kwargs):
    """Send an HTTP request, retrying through the auth flow on a 401.

    When no headers are supplied, an 'Accept: application/json' header is
    used. A 401 response is handed to ``_request_with_auth`` and its result
    replaces the original response.

    :param method: method for the new Request object
    :type method: str
    :param url: URL for the new Request object
    :type url: str
    :param is_success: Defines successful status codes for the request
    :type is_success: Function from int to bool
    :param timeout: request timeout
    :type timeout: int
    :param verify: whether to verify SSL certs or path to cert(s)
    :type verify: bool | str
    :param kwargs: Additional arguments to requests.request
        (see http://docs.python-requests.org/en/latest/api/#requests.request)
    :type kwargs: dict
    :returns: the response, if ``is_success`` accepts its status code
    :rtype: Response
    :raises DCOSAuthorizationException: on an unsuccessful 403 response
    :raises DCOSHTTPException: on any other unsuccessful response
    """

    kwargs.setdefault('headers', {'Accept': 'application/json'})

    verify = _verify_ssl(verify)

    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad certs
    if verify is not None:
        silence_requests_warnings()

    response = _request(method, url, is_success, timeout,
                        verify=verify, **kwargs)

    unauthorized = response.status_code == 401
    if unauthorized:
        response = _request_with_auth(response, method, url, is_success,
                                      timeout, verify, **kwargs)

    if is_success(response.status_code):
        return response
    if response.status_code == 403:
        raise DCOSAuthorizationException(response)
    raise DCOSHTTPException(response)