Python requests.exceptions 模块,ConnectTimeout() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用requests.exceptions.ConnectTimeout()

项目:pubchem-ranker    作者:jacobwindsor    | 项目源码 | 文件源码
def get_cids(self, cas):
        """
        Use the PubChem API to look up the CIDs for a CAS identifier.

        The admin e-mail from ``app.config['ADMIN_EMAIL']`` is sent along as
        the ``email`` query parameter.

        :param cas: string - CAS identifier
        :return: the value stored under ``IdentifierList -> CID`` in the JSON
                 response, or None when that key is missing or the request
                 failed with a connection/timeout error
        """
        uri = "http://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/%s/cids/json" \
              "?email=%s"

        try:
            response = get(uri % (cas, app.config['ADMIN_EMAIL'])).json()
        except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout,
                exceptions.ConnectTimeout, exceptions.ReadTimeout) as e:
            # Report the error together with the CAS number it occurred on.
            # write() takes exactly one string, so the message must be
            # %-formatted here; passing the values as a second argument
            # raised TypeError inside the error handler.
            sys.stderr.write("Error: %s. Occurred on CAS: %s\n" % (e, cas))
            sys.stderr.flush()
            sys.stdout.flush()
            return None

        try:
            return response['IdentifierList']['CID']
        except KeyError:
            # The response carried no identifier list for this CAS number.
            return None
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_list(self, cxt, resource, filters):
        """List all resources of one type and return them as dicts.

        The resource name is first mapped through ``_adapt_resource``; the
        manager is then looked up on the client as ``<resource>s``. Only the
        'server' resource passes ``filters`` on (converted by
        ``_transform_filters``) as ``search_opts``.

        On a connect timeout, ``self.endpoint_url`` is reset to None and
        ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            resource = self._adapt_resource(resource)
            client = self._get_client(cxt)
            plural = '%ss' % resource
            if resource == 'server':
                # only server list supports filter
                search_opts = _transform_filters(filters)
                found = getattr(client, plural).list(search_opts=search_opts)
            else:
                found = getattr(client, plural).list()
            return [entry.to_dict() for entry in found]
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('nova',
                                                  client.client.management_url)
项目:qlcoder    作者:L1nwatch    | 项目源码 | 文件源码
def single_thread_solve():
    """
    Submit one request per index from 355 to 1000, in a single thread.

    For every index a check code is obtained from ``get_verify_code`` and
    sent to the qlcoder endpoint. A request is retried until it completes
    without one of the handled network errors. Progress lines are written
    to ``solve_result.txt``.
    """
    with open("solve_result.txt", "w") as f:
        check_codes = {}
        for i in range(355, 1001):
            # Obtain the check code to submit for this index
            check_code = get_verify_code(i, check_codes, f)

            # Build the request URL carrying the check code
            url = ("http://www.qlcoder.com/train/handsomerank?_token=d4texP05ci7veIAztvnwe5yETOFhlLWkSaBYC51B"
                   "&user=w%40tch&checkcode={}".format(check_code))

            finished = False
            while not finished:
                try:
                    response = requests.get(url, timeout=10)
                    if "?????" not in response.text:
                        print("[+] ????? {} ?".format(i), file=f)
                except (ConnectTimeout, ReadTimeout, ValueError, ConnectionError, TooManyRedirects):
                    # Log the failure and try the same index again
                    print("[-] ??? {} ???".format(i), file=f)
                else:
                    finished = True
项目:treehugger    作者:timeoutdigital    | 项目源码 | 文件源码
def load_user_data_as_yaml_or_die(ignore_missing=False):
    """Return the 'treehugger' entry of the EC2 user data, or call ``die``.

    Every failure mode (no connection, HTTP error, invalid YAML, wrong
    structure) is reported through ``die`` with a specific message. When
    ``ignore_missing`` is true, a 404 from the metadata service yields a
    placeholder dict instead.
    """
    try:
        user_data = load_user_data_as_yaml()
    except (ConnectTimeout, ConnectionError):
        die('Could not connect to EC2 metadata service - are we on an EC2 instance?')
    except HTTPError as http_error:
        status = http_error.response.status_code
        if status == 404 and ignore_missing:
            return {'TREEHUGGER_APP': 'Missing', 'TREEHUGGER_STAGE': 'Missing'}

        die('Got a {} from the EC2 metadata service when retrieving user data'.format(status))
    except yaml.error.YAMLError:
        die('Did not find valid YAML in the EC2 user data')

    try:
        return user_data['treehugger']
    except TypeError:
        # Indexing by string failed, so the parsed data is not a mapping
        die('EC2 user data is not a YAML dictionary')
    except KeyError:
        die('YAML in EC2 user data does not have a key "treehugger"')
项目:ethwallet    作者:absortium    | 项目源码 | 文件源码
def notify_user(self, tx_pk):
    """POST a transaction's details to its owner's web hook.

    On success the transaction's notification status is set to
    ``constants.NOTIFICATION_DONE`` and saved. A connection error or
    connect timeout re-schedules the task through ``self.retry``.
    """
    tx = Transaction.objects.get(pk=tx_pk)
    payload = dict(address=tx.to_address, amount=tx.value, tx_hash=tx.hash)

    try:
        requests.post(tx.owner.web_hook, data=payload)
        tx.notification_status = constants.NOTIFICATION_DONE
        tx.save()
    except (ConnectionError, ConnectTimeout):
        raise self.retry(countdown=constants.NOTIFY_RETRY_COUNTDOWN)
项目:pubchem-ranker    作者:jacobwindsor    | 项目源码 | 文件源码
def pubchem_counter(self, cid, collection):
        """
        Use the SDQAgent that PubChem uses on their compound pages to get counts for a collection.

        The count is also written to stdout, one value per line.

        :param cid: integer. The pubchem compound identifier
        :param collection: string. One of the pubchem collections,
                           e.g. "bioactivity" or "biocollection"
        :return: the ``totalCount`` of the first ``SDQOutputSet`` entry, or
                 None when the response does not contain one
        :raises SystemExit: after reporting a connection, timeout or
                            chunked-encoding error on stderr
        """

        uri = 'https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi?' \
              'infmt=json&outfmt=json' \
              '&query={"select":["*"],"collection":"%s",' \
              '"where":{"ors":{"cid":"%s"}},"start":1,"limit":1}' % (collection, cid)

        try:
            response = get(uri).json()
            try:
                count = response['SDQOutputSet'][0]['totalCount']
            except (KeyError, IndexError):
                # Missing key or empty output set: nothing to count.
                return None
            sys.stdout.write(str(count) + "\n")
            sys.stdout.flush()
            return count

        except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout,
                exceptions.ConnectTimeout, exceptions.ReadTimeout) as e:
            # Report the error and the CID it occurred on, flushing stdout
            # so the counts written so far are not lost.
            # write() takes a single string, so the message is %-formatted
            # here; passing the values as a second argument raised TypeError.
            sys.stderr.write("Error: %s. Occurred on CID: %s\n" % (e, cid))
            sys.stderr.flush()
            sys.stdout.flush()
            # sys.exit() instead of quit(): quit is a helper installed by the
            # site module and is not guaranteed to exist.
            sys.exit()
        except exceptions.ChunkedEncodingError as e:
            sys.stderr.write("Error: %s. Occurred on CID: %s\n" % (e, cid))
            sys.stderr.flush()
            sys.exit()
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_create(self, cxt, resource, *args, **kwargs):
        """Create a resource and return it as a dict.

        Extra positional and keyword arguments are forwarded unchanged to the
        manager's ``create`` call.

        On a connect timeout, ``self.endpoint_url`` is reset to None and
        ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            resource = self._adapt_resource(resource)
            client = self._get_client(cxt)
            manager = getattr(client, '%ss' % resource)
            created = manager.create(*args, **kwargs)
            return created.to_dict()
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('nova',
                                                  client.client.management_url)
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_get(self, cxt, resource, resource_id):
        """Fetch one resource by id and return it as a dict.

        Returns None (after a debug log entry) when the resource is not
        found. On a connect timeout, ``self.endpoint_url`` is reset to None
        and ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            resource = self._adapt_resource(resource)
            client = self._get_client(cxt)
            manager = getattr(client, '%ss' % resource)
            fetched = manager.get(resource_id)
            return fetched.to_dict()
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('nova',
                                                  client.client.management_url)
        except n_exceptions.NotFound:
            log_fields = {'resource': resource, 'resource_id': resource_id}
            LOG.debug("%(resource)s %(resource_id)s not found", log_fields)
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_delete(self, cxt, resource, resource_id):
        """Delete one resource by id and return the manager's result.

        A resource that is not found is only logged at debug level, in which
        case None is returned. On a connect timeout, ``self.endpoint_url`` is
        reset to None and ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            resource = self._adapt_resource(resource)
            client = self._get_client(cxt)
            manager = getattr(client, '%ss' % resource)
            return manager.delete(resource_id)
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('nova',
                                                  client.client.management_url)
        except n_exceptions.NotFound:
            log_fields = {'resource': resource, 'resource_id': resource_id}
            LOG.debug("Delete %(resource)s %(resource_id)s which not found",
                      log_fields)
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_action(self, cxt, resource, action, *args, **kwargs):
        """Invoke a named action on a resource manager and return its result.

        Extra positional and keyword arguments are forwarded to the action.

        On a connect timeout, ``self.endpoint_url`` is reset to None and
        ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            resource = self._adapt_resource(resource)
            client = self._get_client(cxt)
            manager = getattr(client, '%ss' % resource)
            # NOTE(zhiyuan) yes, this is a dirty hack. but the original
            # implementation hides response object which is needed
            manager.convert_into_with_meta = _convert_into_with_meta
            perform = getattr(manager, action)
            return perform(*args, **kwargs)
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('nova',
                                                  client.client.management_url)
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_delete(self, cxt, resource, resource_id):
        """Delete one resource by id and return the manager's result.

        A resource that is not found is only logged at debug level, in which
        case None is returned. On a connect timeout, ``self.endpoint_url`` is
        reset to None and ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            client = self._get_client(cxt)
            manager = getattr(client, '%ss' % resource)
            return manager.delete(resource_id)
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('cinder',
                                                  client.client.management_url)
        except c_exceptions.NotFound:
            log_fields = {'resource': resource, 'resource_id': resource_id}
            LOG.debug("Delete %(resource)s %(resource_id)s which not found",
                      log_fields)
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def handle_action(self, cxt, resource, action, *args, **kwargs):
        """Invoke a named action on a resource manager; the result is discarded.

        Extra positional and keyword arguments are forwarded to the action.

        On a connect timeout, ``self.endpoint_url`` is reset to None and
        ``exceptions.EndpointNotAvailable`` is raised.
        """
        try:
            client = self._get_client(cxt)
            manager = getattr(client, '%ss' % resource)
            perform = getattr(manager, action)
            perform(*args, **kwargs)
        except r_exceptions.ConnectTimeout:
            self.endpoint_url = None
            raise exceptions.EndpointNotAvailable('cinder',
                                                  client.client.management_url)
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_cluster_proxy_connection_error(self):
        # A session response that raises ConnectTimeout must surface from the
        # clustered API as ServiceClusterUnavailable.

        def connect_timeout():
            raise requests_exceptions.ConnectTimeout()

        clustered_api = self.mock_nsx_clustered_api(
            session_response=connect_timeout)
        clustered_api._validate = mock.Mock()
        with self.assertRaises(nsxlib_exc.ServiceClusterUnavailable):
            clustered_api.get('api/v1/transport-zones')
项目:nps-sdk-python    作者:Ingenico-NPS-Latam    | 项目源码 | 文件源码
def _soap_call(self, service, params):
        """Prepare ``params`` and invoke the named SOAP service method.

        The parameters get extra info added, are sanitized when
        ``Configuration.sanitize`` is set, and receive a secure hash unless
        they carry a ``psp_ClientSession`` value.

        Raises ApiException when the call hits a read or connect timeout.
        """
        try:
            params = utils.add_extra_info(service, params)
            if Configuration.sanitize:
                params = utils._check_sanitize(params=params, is_root=True)
            has_client_session = params.get('psp_ClientSession')
            if not has_client_session:
                params = utils.add_secure_hash(params, Configuration.secret_key)

            remote_method = getattr(self._client.service, service)
            return remote_method(params)
        except (ReadTimeout, ConnectTimeout):
            raise ApiException
项目:dockerscan    作者:cr0hn    | 项目源码 | 文件源码
def get_remote_registry_info(target: str) -> Union[Set,
                                                   DockerscanTimeoutError]:
    """
    This function does two things:

    - detect the remote registry version. Allowed returned values are: {1, 2}
    - detect if the remote Docker Registry has authentication enabled

    :param target: base URL of the registry; "/v2/" is appended to it
    :type target: str

    :return: a tuple with the format: (REMOTE_VERSION, ENABLED_OR_NOT_AUTH)
    :rtype: tuple(int, bool)

    :raise DockerscanTimeoutError: If the remote server cannot be reached or
        the connection times out
    """
    # NOTE(review): the return annotation does not match the (int, bool)
    # tuple actually returned; it is left untouched to keep the signature.

    #
    # Check for version 2
    #
    remote_version = 1
    enabled_auth = False

    try:
        # NOTE(review): certificate verification is disabled for this probe.
        r = requests.get("{}/v2/".format(target),
                         timeout=2,
                         allow_redirects=False,
                         verify=False)
    except (ConnectTimeout, ConnectionError) as e:
        raise DockerscanTimeoutError("Remote registry '{}' do not responds".
                                     format(target)) from e

    if r.status_code in (200, 401):
        # A server may answer 200/401 without the registry header; treat a
        # missing header as "not version 2" instead of raising KeyError.
        api_version = r.headers.get("Docker-Distribution-Api-Version", "")
        if "registry/2.0" in api_version:
            remote_version = 2

        if r.status_code == 401:
            enabled_auth = True

    return remote_version, enabled_auth
项目:momo    作者:gusibi    | 项目源码 | 文件源码
def avinfo(self):
        """Return the 'format' section of the resource's avinfo JSON.

        An empty dict is returned when the request times out, the status is
        not 200, or the JSON has no 'format' key.
        """
        try:
            reply = requests.get('%s&avinfo' % self.url, timeout=(2, 2))
        except (ConnectTimeout, ReadTimeout):
            return {}
        if reply.status_code != 200:
            return {}
        return reply.json().get('format', {})
项目:momo    作者:gusibi    | 项目源码 | 文件源码
def md5(self):
        """Return the 'md5' value from the resource's hash/md5 JSON.

        An empty string is returned when the request times out, the status
        is not 200, or the JSON has no 'md5' key.
        """
        try:
            reply = requests.get('%s&hash/md5' % self.url, timeout=(2, 2))
        except (ConnectTimeout, ReadTimeout):
            return ''
        if reply.status_code != 200:
            return ''
        return reply.json().get('md5', '')
项目:python-kemptech-api    作者:KEMPtechnologies    | 项目源码 | 文件源码
def test_ConnectionTimeoutException(self):
        # A ConnectTimeout raised by raise_for_status() must be translated by
        # the client into ConnectionTimeoutException.
        status_check = self.response.raise_for_status
        status_check.side_effect = exceptions.ConnectTimeout
        with assert_raises(client.ConnectionTimeoutException):
            self.client._do_request('GET', 'MyCommand')
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_make_request_timeout(self, requests_mock):
        """App Integration - Make Request, Timeout"""
        # Simulate a connect timeout on the patched request call
        requests_mock.side_effect = ConnectTimeout(None, response='too slow')
        outcome = self._app._make_get_request('hostname', None, None)
        succeeded, payload = outcome
        assert_false(succeeded)
        assert_is_none(payload)
项目:ethwallet    作者:absortium    | 项目源码 | 文件源码
def send(self, user_pk, from_address, to_address, value):
    """Send ``value`` ETH (minus the fixed fee) between two addresses.

    The sender account is unlocked with the user's wallet password before
    the balance is checked. Raises ValidationError when the balance is below
    the requested value or the value does not exceed the fee. A connection
    error or connect timeout re-schedules the task through ``self.retry``.
    """
    try:
        user = get_user_model().objects.get(pk=user_pk)
        password = get_wallet_password(user.wallet_secret_key)

        client = get_rpc_client(host=settings.ETHNODE_URL)
        client.personal_unlockAccount(address=from_address, passphrase=password)

        balance = client.eth_getBalance(from_address)
        # Amounts are compared and sent in wei
        value = eth2wei(value)
        fee = eth2wei(constants.TX_FEE)

        if balance < value:
            raise ValidationError("Not enough money")
        if value <= fee:
            raise ValidationError("Fee higher than 'value' itself")

        client.eth_sendTransaction(from_address=from_address,
                                   to_address=to_address,
                                   value=value - fee)

    except (ConnectionError, ConnectTimeout):
        raise self.retry(countdown=constants.SEND_RETRY_COUNTDOWN)
项目:twitterresearch    作者:trifle    | 项目源码 | 文件源码
def throttled_call(*args, **kwargs):
    """
    Perform an authenticated GET while respecting the API rate limit.

    :parameters: Same as requests.get
    :returns: requests response object
    :side effects: updates the 'calls' and 'expires' entries of the global
        rate_limit dict
    """
    # Declared global because the rate limit state is shared across calls
    global rate_limit

    # Keep trying until a request succeeds
    while True:
        # Pause when fewer than 5 calls remain and the current rate limit
        # window has not expired yet
        now = datetime.datetime.utcnow()
        if rate_limit['calls'] < 5 and rate_limit['expires'] > now:
            wait_for_limit()
        try:
            result = auth.get(*args, **kwargs)
            headers = result.headers
            # Refresh the remaining-calls counter and the window expiry from
            # the values reported in the response headers
            rate_limit['calls'] = int(headers.get('x-rate-limit-remaining', 0))
            if 'x-rate-limit-reset' in headers:
                reset_at = int(headers['x-rate-limit-reset'])
                rate_limit['expires'] = datetime.datetime.utcfromtimestamp(reset_at)
            return result
        # Only the anticipated timeouts are swallowed and retried; any other
        # exception propagates so it gets noticed
        except (ReadTimeout, ConnectTimeout):
            logging.error("There was a network timeout, retrying!")
        finally:
            # Always wait one second between requests, success or not
            time.sleep(1)

# -----------------------
# DATA FETCHING FUNCTIONS
# -----------------------
项目:st-python-api    作者:Secure-Trading    | 项目源码 | 文件源码
def test_handle_exception(self):
        # Each case: (exception passed in, expected data, expected english
        # message, expected code). Request-related errors map to code "7",
        # anything else to code "8" with the exception text.
        cases = [
            (RequestException(), [''], "7", "7"),
            (ConnectTimeout(), [''], "7", "7"),
            (RequestsConnectionError(), [''], "7", "7"),
            (Exception("Any other exception"),
             ["Any other exception"], "8 Any other exception", "8"),
        ]

        handler = self.http_client._handle_exception
        for raised, expected_data, expected_english, expected_code in cases:
            self.check_st_exception(ConnectionError,
                                    expected_data, expected_english,
                                    expected_code,
                                    handler,
                                    func_args=(raised,))
项目:openkamer    作者:openkamer    | 项目源码 | 文件源码
def create_dossier_retry_on_error(dossier_id, max_tries=3):
    """Create or update a dossier, retrying on connection problems.

    After each connection error or connect timeout the function logs the
    error and sleeps 5 seconds. It gives up (logging an error, not raising)
    once ``max_tries`` attempts have been made.
    """
    dossier_id = str(dossier_id)
    attempts = 0
    while True:
        attempts += 1
        try:
            create_or_update_dossier(dossier_id)
            return
        except (ConnectionError, ConnectTimeout) as error:
            logger.exception(error)
            time.sleep(5)  # wait 5 seconds for external servers to relax
            if attempts >= max_tries:
                logger.error('max tries reached, skipping dossier: ' + dossier_id)
                return
            logger.error('trying again!')
项目:openkamer    作者:openkamer    | 项目源码 | 文件源码
def create_parliament_members(max_results=None, all_members=False, update_votes=True):
    """Create parliament members from wikidata and return them as a list.

    ``all_members`` selects which wikidata search provides the member ids.
    Decode and connection errors for a single member are logged and skipped;
    any other exception is logged and re-raised. Processing stops once
    ``max_results`` ids have been handled (when given). When ``update_votes``
    is true, ``set_individual_votes_derived_info`` runs afterwards.
    """
    logger.info('BEGIN')
    parliament = Parliament.get_or_create_tweede_kamer()
    find_member_ids = (wikidata.search_parliament_member_ids if all_members
                       else wikidata.search_parliament_member_ids_with_start_date)
    members = []
    for handled, wikidata_id in enumerate(find_member_ids(), start=1):
        logger.info('=========================')
        try:
            members.extend(create_parliament_member_from_wikidata_id(parliament, wikidata_id))
        except (JSONDecodeError, ConnectionError, ConnectTimeout, ChunkedEncodingError) as error:
            # Recoverable: skip this member and carry on with the next one
            logger.exception(error)
        except Exception as error:
            logger.exception(error)
            raise
        if max_results and handled >= max_results:
            logger.info('END: max results reached')
            break
    if update_votes:
        set_individual_votes_derived_info()
    logger.info('END')
    return members
项目:pypac    作者:carsonyl    | 项目源码 | 文件源码
def default_proxy_fail_exception_filter(req_exc):
    """Return True when ``req_exc`` is a ProxyError or a ConnectTimeout."""
    proxy_failure_types = (ProxyError, ConnectTimeout)
    return isinstance(req_exc, proxy_failure_types)
项目:pypac    作者:carsonyl    | 项目源码 | 文件源码
def test_timeout_proxy(self):
        # Travis can refuse quickly, and trigger ProxyError instead.
        session = requests.Session()
        proxies = proxy_parameter_for_requests('http://localhost')
        with pytest.raises(ConnectTimeout):
            session.get(arbitrary_url, timeout=0.001, proxies=proxies)
项目:inspect-docker-image    作者:giantswarm    | 项目源码 | 文件源码
def canonical_image_details(registry, namespace, image):
    """
    Inspect a Docker image and return its details as a JSON response.

    :param registry: registry name; compared against
        ``app.config["DEFAULT_REGISTRY"]`` to choose the inspector class
    :param namespace: repository namespace, joined to the image name with "/"
    :param image: image name, optionally followed by ":<tag>"
        (the tag defaults to "latest")
    :return: ``jsonify`` response with ``metadata`` (image details, empty on
        failure), ``duration`` (seconds spent in this function) and
        ``error`` (None, or the text of the exception that occurred)
    """
    start = time.time()
    if ":" in image:
        # Split on the first colon only, so an input containing several
        # colons cannot break the two-name unpacking with a ValueError.
        image, tag = image.split(":", 1)
    else:
        tag = "latest"
    result = {}
    error = None
    try:
        if registry == app.config["DEFAULT_REGISTRY"]:
            dii = DockerHubImageInspector(namespace + "/" + image, tag)
        else:
            dii = DockerImageInspector(registry, namespace + "/" + image, tag)

        result = {
            "schema_version": dii.manifest["schemaVersion"],
            "name": dii.manifest["name"],
            "tag": dii.manifest["tag"],
            "architecture": dii.manifest["architecture"],
            "create_date": None,
            "history_length": len(dii.manifest["history"]),
            "num_layers": len(dii.layers),
            "config": json.loads(dii.manifest["history"][0]["v1Compatibility"])["container_config"],
            "layers": [],
            "image_size": 0
        }
        if dii.create_date is not None:
            result["create_date"] = dii.create_date.isoformat()
        for layer_digest in dii.layers:
            bytesize = dii.get_layer_size(layer_digest)
            # Layers with an unknown size are listed but not added to the total
            if bytesize is not None:
                result["image_size"] += bytesize
            result["layers"].append({"digest": layer_digest, "size": bytesize})
    # "except X as e" is valid on Python 2.6+ and Python 3; the previous
    # "except X, e" form is a SyntaxError on Python 3.
    except HTTPError as e:
        if "404" in str(e):
            abort(404)
        else:
            error = str(e)
    except ConnectTimeout as e:
        error = str(e)
    except Exception as e:
        # Any other failure is reported in the response instead of raised
        error = str(e)
    duration = time.time() - start
    return jsonify(metadata=result, duration=duration, error=error)
项目:pypac    作者:carsonyl    | 项目源码 | 文件源码
def __init__(self, pac=None, proxy_auth=None, pac_enabled=True,
                 response_proxy_fail_filter=None, exception_proxy_fail_filter=None,
                 socks_scheme='socks5', recursion_limit=ARBITRARY_HIGH_RECURSION_LIMIT):
        """
        :param PACFile pac: PAC file used to look up proxy configuration.
            When omitted, :func:`get_pac` is called with default arguments on the first request
            to try to find one.
        :param requests.auth.HTTPProxyAuth proxy_auth: Username and password for proxy authentication.
        :param bool pac_enabled: Pass ``False`` to switch off all PAC functionality, auto-discovery included.
        :param response_proxy_fail_filter: Callable receiving a ``requests.Response`` and returning
            a boolean that tells whether the proxy used for the request should no longer be used.
            The default does not inspect the response.
        :param exception_proxy_fail_filter: Callable receiving an exception and returning
            a boolean that tells whether the proxy used for the request should no longer be used.
            The default matches :class:`requests.exceptions.ConnectTimeout` and
            :class:`requests.exceptions.ProxyError`.
        :param str socks_scheme: Scheme used when the PAC file returns a SOCKS proxy. `socks5` by default.
        :param int recursion_limit: Python recursion limit while executing JavaScript.
            PAC files are often complex enough to need more than the interpreter default.
            Only passed on to an auto-discovered :class:`PACFile`.
        """
        super(PACSession, self).__init__()

        # An explicitly supplied PAC file counts as "already tried to get one"
        self._tried_get_pac = bool(pac)

        self._proxy_resolver = None
        self._proxy_auth = proxy_auth
        self._socks_scheme = socks_scheme
        self._recursion_limit = recursion_limit

        #: Set to ``False`` to disable all PAC functionality, including PAC auto-discovery.
        self.pac_enabled = pac_enabled

        if pac:
            self._proxy_resolver = self._get_proxy_resolver(pac)

        # Caller-supplied filters take precedence over the module defaults
        self._response_proxy_failure_filter = (
            response_proxy_fail_filter or default_proxy_fail_response_filter)
        self._exc_proxy_failure_filter = (
            exception_proxy_fail_filter or default_proxy_fail_exception_filter)