Python requests 模块,HTTPError() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用requests.HTTPError()

项目:upstox-python    作者:upstox    | 项目源码 | 文件源码
def start_websocket(self, run_in_background=False):
        socket_params = {}
        try:
            socket_params = self.get_socket_params()
        except requests.exceptions.HTTPError:
            print ("Can't Access Socket Params")
        ping_interval = 60
        ping_timeout = 10

        if 'pythonPingInterval' in socket_params.keys():
            ping_interval = socket_params['pythonPingInterval']

        if 'pythonPingTimeout' in socket_params.keys():
            ping_timeout = socket_params['pythonPingTimeout']

        url = self.config['socketEndpoint'].format(api_key=self.api_key, access_token=self.access_token)
        self.websocket = websocket.WebSocketApp(url,
                                                header={'Authorization: Bearer' + self.access_token},
                                                on_data=self._on_data,
                                                on_error=self._on_error,
                                                on_close=self._on_close)
        if run_in_background is True:
            self.ws_thread = threading.Thread(target=self.websocket.run_forever)
            self.ws_thread.daemon = True
            self.ws_thread.start()
        else:
            self.websocket.run_forever(ping_interval=ping_interval, ping_timeout=ping_timeout)
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def sendRequest(ip, port, route, data=None, protocol="http"):
        url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)

        if data is not None:
            try:
                resp = requests.post(url, data=data)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        else:
            try:
                resp = requests.get(url)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        return resp
项目:upstox-python    作者:upstox    | 项目源码 | 文件源码
def api_call_helper(self, name, http_method, params, data):
        # helper formats the url and reads error codes nicely
        url = self.config['host'] + self.config['routes'][name]

        if params is not None:
            url = url.format(**params)

        response = self.api_call(url, http_method, data)

        if response.status_code != 200:
            raise requests.HTTPError(response.text)

        body = json.loads(response.text)

        if is_status_2xx(body['code']):
            # success
            return body['data']
        else:
            raise requests.HTTPError(response.text)

        return
项目:sapi-python-client    作者:keboola    | 项目源码 | 文件源码
def test_load_inexistent_tables_to_workspace(self):
        """
        Workspace endpoint raises HTTPError when mock loading inexistent table
        """
        msg = ('404 Client Error: Not Found for url: '
               'https://connection.keboola.com/v2/storage/workspaces/78432/'
               'load')
        responses.add(
            responses.Response(
                method='POST',
                url=('https://connection.keboola.com/v2/storage/workspaces/'
                     '78432/load'),
                body=HTTPError(msg)
            )
        )
        workspace_id = '78432'
        mapping = {"in.c-table.does_not_exist": "my-table"}
        with self.assertRaises(HTTPError) as error_context:
            self.ws.load_tables(workspace_id, mapping)
        assert error_context.exception.args[0] == msg
项目:twindb_cloudflare    作者:twindb    | 项目源码 | 文件源码
def test_api_call_raises_exception_connection_error(mock_requests,
                                                    cloudflare):

    for ex in [requests.exceptions.RequestException,
               requests.exceptions.HTTPError,
               requests.exceptions.ConnectionError,
               requests.exceptions.ProxyError,
               requests.exceptions.SSLError,
               requests.exceptions.Timeout,
               requests.exceptions.ConnectTimeout,
               requests.exceptions.ReadTimeout,
               requests.exceptions.URLRequired,
               requests.exceptions.TooManyRedirects,
               requests.exceptions.MissingSchema,
               requests.exceptions.InvalidSchema,
               requests.exceptions.InvalidURL,
               requests.exceptions.ChunkedEncodingError,
               requests.exceptions.ContentDecodingError,
               requests.exceptions.StreamConsumedError,
               requests.exceptions.RetryError]:

        mock_requests.get.side_effect = ex('Some error')
        with pytest.raises(CloudFlareException):
            cloudflare._api_call('/foo')
项目:it-jira-bamboohr    作者:saucelabs    | 项目源码 | 文件源码
def request_jira(client, url, method='GET', **kwargs):
    jwt_authorization = 'JWT %s' % encode_token(
        method, url, app.config.get('ADDON_KEY'),
        client.sharedSecret)
    result = requests.request(
        method,
        client.baseUrl.rstrip('/') + url,
        headers={
            "Authorization": jwt_authorization,
            "Content-Type": "application/json"
        },
        **kwargs)
    try:
        result.raise_for_status()
    except requests.HTTPError as e:
        raise requests.HTTPError(e.response.text, response=e.response)
    return result
项目:pip-upgrader    作者:simion    | 项目源码 | 文件源码
def _fetch_index_package_info(self, package_name, current_version):
        """
        :type package_name: str
        :type current_version: version.Version
        """

        try:
            package_canonical_name = package_name
            if self.PYPI_API_TYPE == 'simple_html':
                package_canonical_name = canonicalize_name(package_name)
            response = requests.get(self.PYPI_API_URL.format(package=package_canonical_name), timeout=15)
        except HTTPError as e:  # pragma: nocover
            return False, e.message

        if not response.ok:  # pragma: nocover
            return False, 'API error: {}'.format(response.reason)

        if self.PYPI_API_TYPE == 'pypi_json':
            return self._parse_pypi_json_package_info(package_name, current_version, response)
        elif self.PYPI_API_TYPE == 'simple_html':
            return self._parse_simple_html_package_info(package_name, current_version, response)
        else:  # pragma: nocover
            raise NotImplementedError('This type of PYPI_API_TYPE type is not supported')
项目:reconbot    作者:flakas    | 项目源码 | 文件源码
def get_character(self, character_id):
        try:
            character = self.eve.get_character(character_id)
        except requests.HTTPError as ex:
            # Patch for character being unresolvable and ESI throwing internal errors
            # Temporarily stub character to not break our behavior.
            if ex.response.status_code == 500:
                character = { 'name': 'Unknown character', 'corporation_id': 98356193 }
            else:
                raise

        return '**%s** (https://zkillboard.com/character/%d/) (%s)' % (
            character['name'],
            character_id,
            self.get_corporation(character['corporation_id'])
        )
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def get(self, endpoint, args=None):
        """
            Wrapper for requests.get
            @arg args is a dict wich is converted to URL parameters
        """
        answer = requests.get(endpoint, params=args)
        if answer.status_code != 200:
            if answer.status_code == 404:
                self.logger.error(
                    "endpoint %s or resource not found", endpoint)
                self.logger.error(
                    "error description was: %s",
                    answer.json()["error_description"])
                return None
            else:
                self.logger.error(
                    "Error during get request for endpoint %s", endpoint)
                self.logger.error("Status code was %d", answer.status_code)
                self.logger.error("error message was: %s", answer.json())
                raise requests.HTTPError
        return answer.json()
项目:connectbox-pi    作者:ConnectBox    | 项目源码 | 文件源码
def ssidSuccessfullySet(self, ssid_str):
        r = requests.put(self.ADMIN_SSID_URL, auth=getAdminAuth(),
                         data=json.dumps({"value": ssid_str}))
        try:
            # This assumes there's not a better method to test whether
            #  the SSID was of a valid length...
            r.raise_for_status()
        except requests.HTTPError:
            return False

        if r.json()["result"] != self.SUCCESS_RESPONSE:
            return False

        r = requests.get(self.ADMIN_SSID_URL, auth=getAdminAuth())
        r.raise_for_status()
        # Finally, check whether it was actually set
        return r.json()["result"][0] == ssid_str
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def get_version(self):
        """Fetches the current version number of the Graph API being used."""
        args = {"access_token": self.access_token}
        try:
            response = requests.request("GET",
                                        "https://graph.facebook.com/" +
                                        self.version + "/me",
                                        params=args,
                                        timeout=self.timeout,
                                        proxies=self.proxies)
        except requests.HTTPError as e:
            response = json.loads(e.read())
            raise GraphAPIError(response)

        try:
            headers = response.headers
            version = headers["facebook-api-version"].replace("v", "")
            return float(version)
        except Exception:
            raise GraphAPIError("API version number not available")
项目:choochoo    作者:nlsdfnbch    | 项目源码 | 文件源码
def test_categories_returns_200_and_decoded_json(self):
        provider_id = '1'
        provider_name = 'flinkster'
        category_id  = '1000'

        # Assert querying by network only works as expected
        try:
            resp = self.api.categories(provider_name)
        except HTTPError as e:
            self.fail('Status Code Was %s - URL: %s!' % (e.response.status_code, e.request.url))
        self.assertIsInstance(resp, dict)
        self.assertIn('items', resp)

        # Assert querying by network and category ID works as expected
        try:
            resp = self.api.categories(provider_name, by_id=category_id)
        except HTTPError as e:
            self.fail('Status Code Was %s - URL: %s!' % (e.response.status_code, e.request.url))
        self.assertIsInstance(resp, (list, dict))
        self.assertNotIn('items', resp)
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def get_time_totals_from_pond(timeallocation, start, end, too, recur=0):
    """
    Pond queries are too slow with large proposals and time out, this hack splits the query
    when a timeout occurs
    """
    if recur > 3:
        raise RecursionError('Pond is timing out, too much recursion.')

    total = 0
    try:
        total += query_pond(
            timeallocation.proposal.id, start, end, timeallocation.telescope_class, timeallocation.instrument_name, too
        )
    except requests.HTTPError:
        logger.warning('We got a pond inception. Splitting further.')
        for start, end in split_time(start, end, 4):
            total += get_time_totals_from_pond(timeallocation, start, end, too, recur=recur + 1)

    return total
项目:fac    作者:mickael9    | 项目源码 | 文件源码
def login(self, username, password, require_ownership=False):
        resp = self.session.post(
            self.login_url,
            params=dict(require_game_ownership=int(require_ownership)),
            data=dict(username=username, password=password)
        )

        try:
            json = resp.json()
        except:
            json = None

        try:
            resp.raise_for_status()
            return json[0]
        except requests.HTTPError:
            if isinstance(json, dict) and 'message' in json:
                if json['message'] == 'Insufficient membership':
                    raise OwnershipError(json['message'])
                else:
                    raise AuthError(json['message'])
            else:
                raise
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def bucket_get(self, bucket_id):
        """Return the bucket object.
        See `API buckets: GET /buckets
        <https://storj.github.io/bridge/#!/buckets/get_buckets_id>`_
        Args:
            bucket_id (str): bucket unique identifier.
        Returns:
            (:py:class:`model.Bucket`): bucket.
        """
        self.logger.info('bucket_get(%s)', bucket_id)

        try:
            return model.Bucket(**self._request(
                method='GET',
                path='/buckets/%s' % bucket_id))

        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.not_found:
                return None
            else:
                self.logger.error('bucket_get() error=%s', e)
                raise BridgeError()
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def test_bucket_get_not_found(self):
        """Test Client.bucket_get() when bucket does not exist."""

        mock_error = requests.HTTPError()
        mock_error.response = mock.Mock()
        mock_error.response.status_code = 404

        self.mock_request.side_effect = mock_error

        bucket = self.client.bucket_get('inexistent')

        self.mock_request.assert_called_once_with(
            method='GET',
            path='/buckets/inexistent')

        assert bucket is None
项目:BusStopPi    作者:LoveBootCaptain    | 项目源码 | 文件源码
def update_json():

    global data

    try:

        # response = requests.get('http://sickpi:3000/station/9100013')  # Spittelmarkt
        response = requests.get('http://sickpi:3000/station/9160523')  # Gotlindestr.

        json_data = response.json()

        data = json.loads(json.dumps(json_data[0], indent=4))

        print('\nUPDATE JSON')

        update_departures()

    except (requests.HTTPError, requests.ConnectionError):

        print('\nERROR UPDATE JSON')
        quit_all()
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def similar_fragments(self, fragment_id, cutoff, limit=1000):
        """Find similar fragments to query.

        Args:
            fragment_id (str): Query fragment identifier
            cutoff (float): Cutoff, similarity scores below cutoff are discarded.
            limit (int): Maximum number of hits. Default is None for no limit.

        Returns:
            list[dict]: Query fragment identifier, hit fragment identifier and similarity score

        Raises:
            request.HTTPError: When fragment_id could not be found
        """
        url = self.base_url + '/fragments/{fragment_id}/similar'.format(fragment_id=fragment_id)
        params = {'cutoff': cutoff, 'limit': limit}
        response = requests.get(url, params)
        response.raise_for_status()
        return response.json()
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def _fetch_fragments(self, idtype, ids):
        url = self.base_url + '/fragments?{idtype}={ids}'.format(idtype=idtype, ids=','.join(ids))
        absent_identifiers = []
        try:
            response = requests.get(url)
            response.raise_for_status()
            fragments = response.json()
        except HTTPError as e:
            if e.response.status_code == 404:
                body = e.response.json()
                fragments = body['fragments']
                absent_identifiers = body['absent_identifiers']
            else:
                raise e
        # Convert molblock string to RDKit Mol object
        for fragment in fragments:
            if fragment['mol'] is not None:
                fragment['mol'] = MolFromMolBlock(fragment['mol'])
        return fragments, absent_identifiers
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def pharmacophores(self, fragment_ids):
        absent_identifiers = []
        pharmacophores = []
        for fragment_id in fragment_ids:
            url = self.base_url + '/fragments/{0}.phar'.format(fragment_id)
            try:
                response = requests.get(url)
                response.raise_for_status()
                pharmacophore = response.text
                pharmacophores.append(pharmacophore)
            except HTTPError as e:
                if e.response.status_code == 404:
                    pharmacophores.append(None)
                    absent_identifiers.append(fragment_id)
                else:
                    raise e
        if absent_identifiers:
            raise IncompletePharmacophores(absent_identifiers, pharmacophores)
        return pharmacophores
项目:oversight    作者:hebenon    | 项目源码 | 文件源码
def handle_trigger_event(self, sender, **data):
        # Pushover doesn't support images, so just send the event.
        notification_data = {
            "user": self.pushover_user,
            "token": self.pushover_token,
            "message": "Camera %s, event: %s" % (data['source'], data['prediction']),
            "timestamp": calendar.timegm(data['timestamp'].timetuple())
        }

        # Optionally, set the device.
        if self.pushover_device:
            notification_data['device'] = self.pushover_device

        try:
            r = requests.post("https://api.pushover.net/1/messages.json", data=notification_data)

            if r.status_code != 200:
                logger.error("Failed to send notification, (%d): %s" % (r.status_code, r.text))
        except requests.ConnectionError, e:
            logger.error("Connection Error:", e)
        except requests.HTTPError, e:
            logger.error("HTTP Error:", e)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def raise_for_status(self, allow_redirects=True):
        """Raises stored :class:`HTTPError` or :class:`URLError`, if one occurred."""

        if self.status_code == 304:
            return
        elif self.error:
            if self.traceback:
                six.reraise(Exception, Exception(self.error), Traceback.from_string(self.traceback).as_traceback())
            http_error = HTTPError(self.error)
        elif (self.status_code >= 300) and (self.status_code < 400) and not allow_redirects:
            http_error = HTTPError('%s Redirection' % (self.status_code))
        elif (self.status_code >= 400) and (self.status_code < 500):
            http_error = HTTPError('%s Client Error' % (self.status_code))
        elif (self.status_code >= 500) and (self.status_code < 600):
            http_error = HTTPError('%s Server Error' % (self.status_code))
        else:
            return

        http_error.response = self
        raise http_error
项目:commissaire-service    作者:projectatomic    | 项目源码 | 文件源码
def _delete(self, model_instance):
        """
        Deletes a serialized SecretModel string from Custodia.

        :param model_instance: SecretModel instance to delete.
        :type model_instance: commissaire.model.SecretModel
        :raises StorageLookupError: if data lookup fails (404 Not Found)
        :raises requests.HTTPError: if the request fails (other than 404)
        """
        url = self._build_key_url(model_instance)

        try:
            response = self.session.request(
                'DELETE', url, timeout=self.CUSTODIA_TIMEOUT)
            response.raise_for_status()
        except requests.HTTPError as error:
            # XXX bool(response) defers to response.ok, which is a misfeature.
            #     Have to explicitly test "if response is None" to know if the
            #     object is there.
            have_response = response is not None
            if have_response and error.response.status_code == 404:
                raise StorageLookupError(str(error), model_instance)
            else:
                raise error
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def connect(self):
        self.gg.setup(self.url_ro)
        self.gg.set_token(self._privatekey)
        self.gg.set_default_private(self.default_create_private)
        self.gg.setup_session(
                self.session_certificate or not self.session_insecure,
                self.session_proxy)
        try:
            self.username = self.user  # Call to self.gg.authenticated_user()
        except HTTPError as err:
            if err.response is not None and err.response.status_code == 401:
                if not self._privatekey:
                    raise ConnectionError('Could not connect to GoGS. '
                                          'Please configure .gitconfig '
                                          'with your gogs private key.') from err
                else:
                    raise ConnectionError('Could not connect to GoGS. '
                                          'Check your configuration and try again.') from err
            else:
                raise err
项目:kacpaw    作者:benburrill    | 项目源码 | 文件源码
def _comment_exists(self):
        """
        This is yet another horrible workaround to keep this from falling apart.

        Because of the other workarounds to get around the fact that you
        cannot get metadata from a comment, we've run into problems where it
        is not always obvious if a comment even exists.  For this reason, we
        need to have a method to test it for us.
        """
        try:
            # Getting the reply data from a comment should raise an error if
            # it doesn't exist.  We use Comment's get_reply_data because
            # ProgramCommentReply's get_reply_data is another one of those
            # horrible workarounds
            list(Comment(self.id, self.get_program()).get_reply_data())
        except requests.HTTPError:
            return False
        return True
项目:kacpaw    作者:benburrill    | 项目源码 | 文件源码
def get_metadata(self):
        """Returns a dictionary with information about this ``ProgramCommentReply``."""
        # there's no way that I've found to get comment reply metadata directly, 
        # so we iterate though the comment thread until you find this comment
        for comment_data in self.get_parent().get_reply_data():
            if comment_data["key"] == self.id:
                return comment_data
        raise _CommentDoesntExistError(self)

        # I'm keeping this todo until I can fully address it, although I did
        # add an error

        # todo: raise some error instead of returning None.  What error?  IDK.
        # I'm almost tempted to pretend it's an HTTPError, but I'll need to do
        # some research into why we would get here (We can get here btw.
        # That's how I found this).  Would self.get_parent().get_reply_data()
        # raise an HTTPError if self.comment_key was nonsense?  If that's the
        # case, we will only (probably) be here if comment_key was a
        # ProgramComment key instead of a ProgramCommentReply key, so we would
        # probably want to have an error that's more specific (like TypeError
        # maybe?).  Man, there are so many edge cases with this stuff that I
        # really should look into now that I think about it...  We are also
        # probably going to need to keep a lot of this comment to explain why
        # we raise the error we do.
项目:voucherify-python-sdk    作者:voucherifyio    | 项目源码 | 文件源码
def request(self, path, method='GET', **kwargs):
        try:
            url = ENDPOINT_URL + path

            response = requests.request(
                method=method,
                url=url,
                headers=self.headers,
                timeout=self.timeout,
                **kwargs
            )
        except requests.HTTPError as e:
            response = json.loads(e.read())
        except requests.ConnectionError as e:
            raise VoucherifyError(e)

        if response.headers.get('content-type') and 'json' in response.headers['content-type']:
            result = response.json()
        else:
            result = response.text

        if isinstance(result, dict) and result.get('error'):
            raise VoucherifyError(result)

        return result
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_from_chain(url_adder):
    url = 'https://api.chain.com/v2/bitcoin/%s' % (url_adder)

    ok = False
    while not ok:
        try:
            r = requests.get(url, auth=(CHAIN_API_KEY, CHAIN_API_SECRET))
            r.raise_for_status()
            ok = True
        except requests.HTTPError as e:
            if r.status_code == 429:  # Too many requests
                time.sleep(1)
            else:
                print("Request was to %s" % (url))
                raise e
    b = json.loads(r.text)

    return b
项目:TestRewrite    作者:osqa-interns    | 项目源码 | 文件源码
def goto_course_list(self):
        """Go to the course picker."""
        long_wait = WebDriverWait(self.driver, 30)
        try:
            long_wait.until(
                expect.presence_of_element_located(
                    (By.ID, 'ox-react-root-container')
                )
            )
            if 'tutor' in self.current_url():
                self.find(By.CSS_SELECTOR, '.ui-brand-logo').click()
                self.page.wait_for_page_load()
            else:
                raise HTTPError('Not currently on an OpenStax Tutor webpage:' +
                                '%s' % self.current_url())
        except Exception as ex:
            raise ex
项目:cobra    作者:wufeifei    | 项目源码 | 文件源码
def push(self):
        """
        Push data to API.
        :return: push success or not
        """

        try:
            re = requests.post(url=self.api, data={"info": json.dumps(self.post_data, ensure_ascii=False)})

            result = re.json()
            if result.get("vul_pdf", "") != "":
                logger.info('[PUSH API] Push success!')
                return True
            else:
                logger.warning('[PUSH API] Push result error: {0}'.format(re.text))
                return False
        except (requests.ConnectionError, requests.HTTPError) as error:
            logger.critical('[PUSH API] Network error: {0}'.format(str(error)))
            return False
        except ValueError as error:
            logger.critical('[PUSH API] Response error: {0}'.format(str(error)))
            return False
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def test_submit_problem_web(self):
        common.ensure_login()
        common.ensure_api_key()
        # publish_time is before the first publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(1475279990, 1451606400)
        assert cm.exception.response.status_code == 403
        # Correct publish_time
        self.submit_problem_web(1475280000, 1451606400)
        self.submit_problem_web(1480550400, 1451606400)
        # publish_time is after the last publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(1480550410, 1451606400)
        assert cm.exception.response.status_code == 403
        # publish_time is past
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(1475280000, 1475280001)
        assert cm.exception.response.status_code == 403
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def test_submit_problem_api(self):
        common.ensure_login()
        common.ensure_api_key()
        # publish_time is before the first publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(1475279990, 1451606400)
        assert cm.exception.response.status_code == 403
        # Correct publish_time
        self.submit_problem_api(1475280000, 1451606400)
        self.submit_problem_api(1480550400, 1451606400)
        # publish_time is after the last publish time
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(1480550410, 1451606400)
        assert cm.exception.response.status_code == 403
        # publish_time is past
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(1475280000, 1475280001)
        assert cm.exception.response.status_code == 403
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def api_delete(server_name, api, session_id):
    #Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintrRequestsiApiException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# PUT
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def api_put(server_name, api, payload, session_id):
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# POST
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def api_post(server_name, api, payload, session_id):
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id }

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except:
        raise TintriRequestsException("An unexpected error " + sys.exc_info()[0] + " occurred.")

    return r


# Login.
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def download_file(server_name, report_url, session_id, file_name):
    headers = {'content-type': 'application/json'}

    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        with open(file_name, 'w') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
项目:Foxlogin    作者:callMePlus    | 项目源码 | 文件源码
def login(self):
        login_params = {
            'j_username': GlobalVal.username,
            'j_password': GlobalVal.password,
            'validateCode': GlobalVal.secretCode
        }
        login_result = ''
        try:
            login_result = self.__session.post(self.home_url, data=login_params, headers=self.header,timeout=6)
        # DNS failure, refused connection, etc
        except ConnectionError :
            return False,'error:ConnectionError.'
        # Timeout
        except TimeoutError:
            return False, 'error:TimeoutError.'
        # HTTPError
        except requests.HTTPError:
            return False, 'error:HTTPError:%s.'%login_result.status_code
        except :
            return False,'error:network disconnect.'
        # Almost everything is ok.
        if 'success' in login_result.json():
            return True, login_result.json()
        else:
            return False, login_result.json()
项目:gobble    作者:openspending    | 项目源码 | 文件源码
def test_handle_raises_error_if_status_is_400(api_call):
    responses.add(api_call.method, api_call.url, body=HTTPError())
    with raises(HTTPError):
        handle(api_call())


# @responses.activate
# @mark.parametrize('api_call', api_calls)
# def test_call_endpoint_with_query_parameters(api_call):
#     responses.add(
#         api_call.method,
#         api_call.url + '?spam=eggs&foo=bar',
#         match_querystring=False
#     )
#     params = dict(foo='bar', spam='eggs')
#     assert api_call(params=params).status_code == 200
项目:gobble    作者:openspending    | 项目源码 | 文件源码
def _handle_promises(self):
        """Collect all promises from S3 uploads.
        """
        for stream, future in zip(self._streams, self._futures):
            exception = future.exception()
            if exception:
                raise exception
            response = future.result()

            if response.status_code != 200:
                message = 'Something went wrong uploading %s to S3: %s'
                log.error(message, response.url, response.text)
                raise HTTPError(message)

            self._responses.append(response)
            stream.close()
项目:cachet-url-monitor    作者:mtakaki    | 项目源码 | 文件源码
def setUp(self):
        def getLogger(name):
            self.mock_logger = mock.Mock()
            return self.mock_logger

        sys.modules['logging'].getLogger = getLogger

        def get(url, headers):
            get_return = mock.Mock()
            get_return.ok = True
            get_return.json = mock.Mock()
            get_return.json.return_value = {'data': {'status': 1}}
            return get_return

        sys.modules['requests'].get = get

        self.env = EnvironmentVarGuard()
        self.env.set('CACHET_TOKEN', 'token2')

        self.configuration = Configuration('config.yml')
        sys.modules['requests'].Timeout = Timeout
        sys.modules['requests'].ConnectionError = ConnectionError
        sys.modules['requests'].HTTPError = HTTPError
项目:plugin.audio.tidal2    作者:arnesongit    | 项目源码 | 文件源码
def get_album_json_thread(self):
        try:
            while not xbmc.abortRequested and not self.abortAlbumThreads:
                try:
                    album_id = self.albumQueue.get_nowait()
                except:
                    break
                try:
                    self.get_album(album_id, withCache=False)
                except requests.HTTPError as e:
                    r = e.response
                    msg = _T(30505)
                    try:
                        msg = r.reason
                        msg = r.json().get('userMessage')
                    except:
                        pass
                    log('Error getting Album ID %s' % album_id, xbmc.LOGERROR)
                    if r.status_code == 429 and not self.abortAlbumThreads:
                        self.abortAlbumThreads = True
                        log('Too many requests. Aborting Workers ...', xbmc.LOGERROR)
                        self.albumQueue._init(9999)
                        xbmcgui.Dialog().notification(plugin.name, msg, xbmcgui.NOTIFICATION_ERROR)
        except Exception, e:
            traceback.print_exc()
项目:rpwd    作者:ivytin    | 项目源码 | 文件源码
def http_get(self, s, host, port, timeout, appendix=''):
        try:
            r = s.get('http://{}:{}{}'.format(host, port, appendix), timeout=timeout, verify=False)
        # except requests.exceptions.Timeout:
        #     return None, RequetsHostException('HTTP connection timeout, host: http://{}:{}'
        #                                       .format(host, port))
        except requests.RequestException as err:
            return None, RequetsHostException('{}:{} request error, msg: {}'
                                              .format(host, port, type(err).__name__))
        # except requests.HTTPError:
        #     return None, RequetsHostException('HTTP server response error, host: http://{}:{}'
        #                                       .format(host, port))
        # except requests.exceptions.RequestException as msg:
        #     return None, RequetsHostException('call requests.get error, msg: {}'
        #                                       .format(msg))
        else:
            return r, None
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def _connect(self):
        logger.debug('ARI client listening...')
        try:
            with self._running():
                self.client.run(apps=[APPLICATION_NAME])
        except socket.error as e:
            if e.errno == errno.EPIPE:
                # bug in ari-py when calling client.close(): ignore it and stop
                logger.error('Error while listening for ARI events: %s', e)
                return
            else:
                self._connection_error(e)
        except (WebSocketException, HTTPError) as e:
            self._connection_error(e)
        except ValueError:
            logger.warning('Received non-JSON message from ARI... disconnecting')
            self.client.close()
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def _get_mongooseim_history(self, user_uuid, args):
        participant_user = args.get('participant_user_uuid')
        participant_server = args.get('participant_server_uuid', self._xivo_uuid)
        limit = args.get('limit')

        user_jid = self._build_jid(self._xivo_uuid, user_uuid)
        try:
            if participant_user:
                participant_jid = self._build_jid(participant_server, participant_user)
                result = self.mongooseim_client.get_user_history_with_participant(user_jid,
                                                                                  participant_jid,
                                                                                  limit)
            else:
                result = self.mongooseim_client.get_user_history(user_jid, limit)
        except HTTPError as e:
            raise MongooseIMException(self._xivo_uuid, e.response.status_code, e.response.reason)
        except RequestException as e:
            raise MongooseIMUnreachable(self._xivo_uuid, e)

        return result
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def send_message(self, request_body, user_uuid=None):
        from_xivo_uuid, from_ = self._build_from(request_body, user_uuid)
        to_xivo_uuid, to = self._build_to(request_body)
        alias = request_body['alias']
        msg = request_body['msg']
        self.contexts.add(from_, to, to_xivo_uuid=to_xivo_uuid, alias=alias)

        from_jid = self._build_jid(from_xivo_uuid, from_)
        to_jid = self._build_jid(to_xivo_uuid, to)
        try:
            self.mongooseim_client.send_message(from_jid, to_jid, msg)
        except HTTPError as e:
            raise MongooseIMException(self._xivo_uuid, e.response.status_code, e.response.reason)
        except RequestException as e:
            raise MongooseIMUnreachable(self._xivo_uuid, e)

        bus_event = ChatMessageSent((from_xivo_uuid, from_),
                                    (to_xivo_uuid, to),
                                    alias,
                                    msg)
        headers = {
            'user_uuid:{uuid}'.format(uuid=from_): True,
        }
        self._bus_publisher.publish(bus_event, headers=headers)
项目:python3-linkedin    作者:DEKHTIARJonathan    | 项目源码 | 文件源码
def raise_for_error(response):
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        try:
            if len(response.content) == 0:
                # There is nothing we can do here since LinkedIn has neither sent
                # us a 2xx response nor a response content.
                return
            response = response.json()
            if ('error' in response) or ('errorCode' in response):
                message = '%s: %s' % (response.get('error', str(error)),
                                      response.get('message', 'Unknown Error'))
                error_code = response.get('status')
                ex = get_exception_for_error_code(error_code)
                raise ex(message)
            else:
                raise LinkedInError(error.message)
        except (ValueError, TypeError):
            raise LinkedInError(error.message)
项目:pypdns    作者:fousti    | 项目源码 | 文件源码
def _call(self, verb, url, data={}, params={}, headers={}):
        if headers:
            self.session.headers.update(headers)

        log.info('Call %s with data %s', url, data)
        resp = self.session.request(verb, url, json=data, params=params)
        status_code = resp.status_code
        try:
            resp.raise_for_status()
        except requests.HTTPError as exc:
            log.debug('Error occured, endpoint : %s, apikey : %s',
                      url, self.apikey)
            return resp, status_code

        except requests.Timeout:
            log.error('Request Timeout to %si', url)
            return False, status_code
        except requests.RequestException:
            log.error('Requests Error')
            return False, status_code
        else:
            return resp, status_code
项目:sapi-python-client    作者:keboola    | 项目源码 | 文件源码
def test_detail_inexsitent_workspace(self):
        """
        Workspace Endpoint raises HTTPError when mocking inexistent workspace
        detail
        """
        msg = ('404 Client Error: Not Found for url: '
               'https://connection.keboola.com/v2/storage/workspaces/1')
        responses.add(
            responses.Response(
                method='GET',
                url='https://connection.keboola.com/v2/storage/workspaces/1',
                body=HTTPError(msg)
            )
        )
        workspace_id = '1'
        with self.assertRaises(HTTPError) as error_context:
            self.ws.detail(workspace_id)
        assert error_context.exception.args[0] == msg
项目:sapi-python-client    作者:keboola    | 项目源码 | 文件源码
def test_reset_password_for_inexistent_workspace(self):
        """
        Workspace endpoint raises HTTPError when mock resetting password for
        inexistent workspace
        """
        msg = ('404 Client Error: Not Found for url: '
               'https://connection.keboola.com/v2/storage/workspaces/1/'
               'password')
        responses.add(
            responses.Response(
                method='POST',
                url=('https://connection.keboola.com/v2/storage/workspaces/'
                     '1/password'),
                body=HTTPError(msg)
            )
        )
        workspace_id = '1'
        with self.assertRaises(HTTPError) as error_context:
            self.ws.reset_password(workspace_id)
        assert error_context.exception.args[0] == msg