Python requests.exceptions 模块,SSLError() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.exceptions.SSLError()

项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def run_query(self):
        """Run the YQL weather-forecast query for ``self.woeid``.

        The request is sent over HTTPS first, authenticated with
        ``self.oauth``.  If that attempt raises ``SSLError`` the same query
        is repeated over plain HTTP (workaround for Bug #1568774).

        Returns:
            The result of ``ans.json()`` when the response status code is
            200, otherwise ``None``.
        """
        yql = 'select * from weather.forecast where woeid=%s' % self.woeid
        query_args = {'format': 'json'}
        try:
            ans = requests.get('https://query.yahooapis.com/v1/yql?q=%s' % yql,
                               auth=self.oauth, params=query_args)
        except SSLError as e:
            # Bug #1568774: fall back to the unencrypted endpoint.
            print('wyahooapi.py: Bug #1568774', str(e))
            print('wyahooapi.py: Unable to query https url, switch to http url')
            ans = requests.get('http://query.yahooapis.com/v1/yql?q=%s' % yql,
                               auth=self.oauth, params=query_args)

        if ans.status_code != 200:
            print('wyahooapi.py: Request status code not 200, status_code = ', str(ans.status_code))
            return None
        return ans.json()
项目:multi-cloud-control    作者:robertpeteuil    | 项目源码 | 文件源码
def conn_gcp(cred, crid):
    """Create a GCE driver object from ``cred`` and return it as ``{crid: obj}``.

    ``cred['gcp_auth_type']`` selects the credential set: ``"A"`` uses the
    client id / client secret entries with ``auth_type="IA"``; any other
    value (default ``"S"``) uses the service-account e-mail and the PEM file
    located under ``CONFIG_DIR``.  SSL errors and credential errors raised
    while constructing the driver are reported through ``abort_err``.
    """
    if cred.get('gcp_auth_type', "S") == "A":  # Application Auth
        token_cache = CONFIG_DIR + ".gcp_libcloud_a_auth." + cred['gcp_proj_id']
        driver_kwargs = dict(user_id=cred['gcp_client_id'],
                             key=cred['gcp_client_sec'],
                             project=cred['gcp_proj_id'],
                             auth_type="IA",
                             credential_file=token_cache)
    else:  # Service Account Auth
        pem_path = CONFIG_DIR + cred['gcp_pem_file']
        token_cache = CONFIG_DIR + ".gcp_libcloud_s_auth." + cred['gcp_proj_id']
        driver_kwargs = dict(user_id=cred['gcp_svc_acct_email'],
                             key=pem_path,
                             project=cred['gcp_proj_id'],
                             credential_file=token_cache)

    gce_driver = get_driver(Provider.GCE)
    try:
        gcp_obj = gce_driver(**driver_kwargs)
    except SSLError as err:
        abort_err("\r SSL Error with GCP: {}".format(err))
    except (InvalidCredsError, ValueError) as err:
        abort_err("\r Error with GCP Credentials: {}".format(err))
    # NOTE(review): assumes abort_err does not return; otherwise gcp_obj is
    # unbound here -- confirm against its definition.
    return {crid: gcp_obj}
项目:apsconnect-cli    作者:ingrammicro    | 项目源码 | 文件源码
def _check_connector_backend_access(url, timeout=120):
    """Poll ``url`` until it answers with HTTP status 200.

    Every attempt uses a 10 second request timeout and attempts are spaced
    10 seconds apart.  After each unsuccessful attempt ``raise_by_max_time``
    is called with a message describing the failure and the deadline
    (start time + ``timeout`` seconds).

    NOTE(review): presumably ``raise_by_max_time`` aborts once the deadline
    has passed -- confirm against its definition.
    """
    deadline = datetime.now() + timedelta(seconds=timeout)

    while True:
        try:
            if get(url=url, timeout=10).status_code == 200:
                print()
                return

            # Non-200 answer: kept inside the try block so anything raised
            # here is still routed through the handlers below.
            raise_by_max_time("Waiting time exceeded", deadline)
        except SSLError:
            raise_by_max_time("An SSL error occurred", deadline)
        except Timeout:
            raise_by_max_time("Timeout connecting to Connector Backend", deadline)
        except ConnectionError:
            raise_by_max_time("Connection error to Connector Backend", deadline)
        except Exception as err:
            raise_by_max_time(str(err), deadline)

        time.sleep(10)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def get_container(self, name=None):
        '''
        Lookup a container and return the inspection results.

        A container matches when ``name`` (with a leading '/' added if it is
        missing) appears in its 'Names' list, or when its 'Id' starts with
        ``name``.  The first match is passed to ``self.inspect_container``.

        :param name: container name or Id (prefix); ``None`` short-circuits.
        :return: the inspection result for the matching container, or
            ``None`` when ``name`` is ``None`` or nothing matches.
        '''
        if name is None:
            return None

        search_name = name if name.startswith('/') else '/' + name

        result = None
        try:
            for container in self.containers(all=True):
                self.log("testing container: %s" % (container['Names']))
                # 'Names' is only searched when it is really a list; a
                # non-list value (e.g. None) would make the `in` test raise
                # and turn a plain lookup into a failure.
                names = container['Names']
                if isinstance(names, list) and search_name in names:
                    result = container
                    break
                # An exact Id match is a special case of the prefix match,
                # so a single test covers both.
                if container['Id'].startswith(name):
                    result = container
                    break
        except SSLError as exc:
            self._handle_ssl_error(exc)
        except Exception as exc:
            self.fail("Error retrieving container list: %s" % exc)

        if result is not None:
            try:
                self.log("Inspecting container Id %s" % result['Id'])
                result = self.inspect_container(container=result['Id'])
                self.log("Completed container inspection")
            except Exception as exc:
                self.fail("Error inspecting container: %s" % exc)

        return result
项目:multi-cloud-control    作者:robertpeteuil    | 项目源码 | 文件源码
def conn_aws(cred, crid):
    """Create an EC2 driver object from ``cred`` and return it as ``{crid: obj}``.

    ``cred`` must provide 'aws_access_key_id', 'aws_secret_access_key' and
    'aws_default_region'.  SSL errors and invalid-credential errors raised
    while constructing the driver are reported through ``abort_err``.
    """
    ec2_driver = get_driver(Provider.EC2)
    access_key = cred['aws_access_key_id']
    secret_key = cred['aws_secret_access_key']
    region_name = cred['aws_default_region']
    try:
        aws_obj = ec2_driver(access_key, secret_key, region=region_name)
    except SSLError as err:
        abort_err("\r SSL Error with AWS: {}".format(err))
    except InvalidCredsError as err:
        abort_err("\r Error with AWS Credentials: {}".format(err))
    # NOTE(review): assumes abort_err does not return -- confirm.
    return {crid: aws_obj}
项目:multi-cloud-control    作者:robertpeteuil    | 项目源码 | 文件源码
def conn_az(cred, crid):
    """Create an Azure ARM driver object from ``cred`` and return it as ``{crid: obj}``.

    ``cred`` must provide 'az_tenant_id', 'az_sub_id', 'az_app_id' and
    'az_app_sec'.  SSL errors and invalid-credential errors raised while
    constructing the driver are reported through ``abort_err``.
    """
    arm_driver = get_driver(Provider.AZURE_ARM)
    tenant = cred['az_tenant_id']
    subscription = cred['az_sub_id']
    app_id = cred['az_app_id']
    app_secret = cred['az_app_sec']
    try:
        az_obj = arm_driver(tenant_id=tenant,
                            subscription_id=subscription,
                            key=app_id,
                            secret=app_secret)
    except SSLError as err:
        abort_err("\r SSL Error with Azure: {}".format(err))
    except InvalidCredsError as err:
        abort_err("\r Error with Azure Credentials: {}".format(err))
    # NOTE(review): assumes abort_err does not return -- confirm.
    return {crid: az_obj}
项目:synonym    作者:gavinzbq    | 项目源码 | 文件源码
def synonym(args):
    """Return the rendered answer for ``args``.

    Delegates to ``_display_answer``; if that raises ``ConnectionError`` or
    ``SSLError`` a red "failed to connect" message string is returned
    instead.
    """
    try:
        answer = _display_answer(args)
    except (ConnectionError, SSLError):
        answer = crayons.red('\nFailed to establish network connection.\n').color_str
    return answer
项目:scdlbot    作者:gpchelkin    | 项目源码 | 文件源码
def botan_track(token, message, event_name):
    """Report ``event_name`` for ``message`` to the Botan tracking endpoint.

    The sender id is read from ``message.from_user.id`` and the whole
    message (``message.to_json()``) is posted as the JSON body.  Up to two
    attempts are made, each with a 2 second timeout.

    :param token: Botan token sent as the ``token`` query parameter.
    :param message: message object exposing ``from_user.id`` and ``to_json()``.
    :param event_name: event name sent as the ``name`` query parameter.
    :return: the decoded JSON response on success, ``False`` when the
        message has no sender or every attempt failed.
    """
    try:
        uid = message.from_user.id
    except AttributeError:
        logger.warning('Botan no chat_id in message')
        return False

    num_retries = 2
    ssl_verify = True
    for _ in range(num_retries):
        try:
            r = requests.post(
                BOTAN_TRACK_URL,
                params={"token": token, "uid": uid, "name": event_name},
                json=json.loads(message.to_json()),
                verify=ssl_verify,
                timeout=2,
            )
            return r.json()
        except Timeout:
            logger.exception("Botan timeout on event: %s", event_name)
        except SSLError:
            # NOTE(review): the next attempt runs with certificate
            # verification disabled -- confirm this downgrade is acceptable.
            ssl_verify = False
        except Exception:
            # Catch-all (also covers RequestException and the ValueError
            # raised when the payload or response is not valid JSON).
            logger.exception("Botan catastrophic error on event: %s", event_name)
    return False
项目:ender-chest-public    作者:samjo-nyang    | 项目源码 | 文件源码
def inspect(host, port):
    """Probe ``https://host:port`` and classify its TLS certificate problem.

    :return: ``ERR_NONE`` when the HTTPS request succeeds;
        ``ERR_HOST_NOT_MATCH`` when the SSL error text starts with
        'hostname'; ``ERR_EXPIRED`` when the current UTC time is outside the
        certificate's notBefore/notAfter window; ``ERR_SELF_SIGNED`` for any
        other SSL error; ``ERR_TIMEOUT`` on connection errors or timeouts;
        ``ERR_UNKNOWN`` when the certificate cannot be fetched/parsed or on
        any other error.
    """
    try:
        requests.get('https://%s:%s' % (host, port), timeout=2)
    except SSLError as e:
        # CHECK ERR_HOST_NOT_MATCH
        if str(e).startswith('hostname'):
            return ERR_HOST_NOT_MATCH

        # Fetch and parse the certificate ourselves; a failure at either
        # step is reported as unknown rather than escaping the handler.
        try:
            raw_cert = ssl.get_server_certificate((host, port))
            x509 = crypto.load_certificate(crypto.FILETYPE_PEM, raw_cert)
        except Exception:
            return ERR_UNKNOWN

        # CHECK ERR_EXPIRED
        # The validity timestamps carry a trailing 'Z' (UTC), so they must
        # be compared with the current UTC time, not local time.
        now = datetime.utcnow()
        not_after = datetime.strptime(x509.get_notAfter().decode('utf-8'),
                                      "%Y%m%d%H%M%SZ")
        not_before = datetime.strptime(x509.get_notBefore().decode('utf-8'),
                                       "%Y%m%d%H%M%SZ")
        if not not_before <= now <= not_after:
            return ERR_EXPIRED

        # otherwise ERR_SELF_SIGNED
        return ERR_SELF_SIGNED
    except (ConnectionError, Timeout):
        return ERR_TIMEOUT
    except Exception:
        return ERR_UNKNOWN
    return ERR_NONE
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def authenticate(func):
        """Decorator that authenticates credentials.

        Before calling ``func`` the wrapper runs ``self.config.authenticate()``
        and ``self.config.save_config()``; ``func`` is only invoked when
        ``self.config.check_auth()`` is truthy.  SSL, invalid-url and
        authentication errors raised by ``func`` are reported to the user
        instead of propagating.

        :type func: callable
        :param func: A method to execute if authorization passes.

        :return: The return value of `func` if authorization passes, or
            None if authorization fails.
        """
        import functools

        # functools.wraps keeps the decorated method's __name__/__doc__.
        @functools.wraps(func)
        def auth_wrapper(self, *args, **kwargs):
            self.config.authenticate()
            self.config.save_config()
            if self.config.check_auth():
                try:
                    return func(self, *args, **kwargs)
                except SSLError:
                    click.secho(('SSL cert verification failed.\n  Try running '
                                 'gh configure --enterprise\n  and type '
                                 "'n' when asked whether to verify SSL certs."),
                                fg=self.config.clr_error)
                except MissingSchema:
                    click.secho('Invalid GitHub Enterprise url',
                                fg=self.config.clr_error)
                except AuthenticationFailed:
                    self.config.print_auth_error()
        return auth_wrapper
项目:ansible-provider-docs    作者:alibaba    | 项目源码 | 文件源码
def get_container(self, name=None):
        '''
        Find a container by name or Id and return its inspection result.

        A container matches when ``name`` (with a leading '/' added if it is
        missing) appears in its 'Names' list, or when its 'Id' starts with or
        equals ``name``.  Returns ``None`` when ``name`` is ``None`` or no
        container matches.
        '''
        if name is None:
            return None

        search_name = name if name.startswith('/') else '/' + name

        match = None
        try:
            for candidate in self.containers(all=True):
                self.log("testing container: %s" % (candidate['Names']))
                listed = isinstance(candidate['Names'], list) and search_name in candidate['Names']
                if listed or candidate['Id'].startswith(name) or candidate['Id'] == name:
                    match = candidate
                    break
        except SSLError as exc:
            self._handle_ssl_error(exc)
        except Exception as exc:
            self.fail("Error retrieving container list: %s" % exc)

        if match is None:
            return match

        try:
            self.log("Inspecting container Id %s" % match['Id'])
            match = self.inspect_container(container=match['Id'])
            self.log("Completed container inspection")
        except Exception as exc:
            self.fail("Error inspecting container: %s" % exc)

        return match
项目:MrMime    作者:sLoPPydrive    | 项目源码 | 文件源码
def exception_caused_by_proxy_error(ex):
    """Return True if ``ex`` wraps a proxy, SSL or connection error.

    ``ex.args`` is searched recursively: an argument counts when it is a
    ``ProxyError``, ``SSLError`` or ``ConnectionError`` instance, or when it
    is itself an exception whose args contain one.

    :param ex: the exception to examine.
    :return: ``True`` if such an error is found anywhere in the args tree,
        ``False`` otherwise (including when ``ex.args`` is empty).
    """
    for arg in ex.args:
        if isinstance(arg, (ProxyError, SSLError, ConnectionError)):
            return True
        # Only stop on a positive result from a nested exception, so the
        # remaining args are still examined when the first one is unrelated.
        if isinstance(arg, Exception) and exception_caused_by_proxy_error(arg):
            return True

    return False
项目:ws-backend-community    作者:lavalamp-    | 项目源码 | 文件源码
def fingerprint_virtual_host(
        self,
        org_uuid=None,
        network_service_uuid=None,
        network_service_scan_uuid=None,
        use_ssl=None,
        hostname=None,
        order_uuid=None,
):
    """
    Get a virtual host fingerprint from the web service running at the given network service for
    the given hostname.
    :param org_uuid: The UUID of the organization to retrieve a fingerprint for.
    :param network_service_uuid: The UUID of the network service where the web service resides.
    :param network_service_scan_uuid: The UUID of the network service scan that this fingerprinting is a part
    of.
    :param use_ssl: Whether or not to use SSL to connect to the remote endpoint.
    :param hostname: The hostname to submit a request for.
    :param order_uuid: Accepted but not used in this function body.
    :return: None
    """
    logger.info(
        "Now retrieving virtual host fingerprint for service %s with hostname %s. Organization is %s."
        % (network_service_uuid, hostname, org_uuid)
    )
    ip_address, port, protocol = self.get_endpoint_information()
    inspector = WebServiceInspector(ip_address=ip_address, port=port, use_ssl=use_ssl, hostname=hostname)
    try:
        response = inspector.get()
    except (SSLError, ReadTimeout) as e:
        # Format the exception itself: `e.message` is not available on
        # Python 3 exceptions and would raise inside this handler.
        logger.error(
            "Error thrown when retrieving fingerprint: %s %s."
            % (e.__class__.__name__, e)
        )
        return
    logger.info(
        "Fingerprint retrieved for virtual host %s on service %s."
        % (hostname, network_service_uuid)
    )
    fingerprint_model = response.to_es_model(model_uuid=network_service_scan_uuid, db_session=self.db_session)
    fingerprint_model.save(org_uuid)
    logger.info("Fingerprint pushed to Elasticsearch successfully.")
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def impl(context):
    """Request ``/api/v0/status`` from ``context.SERVER`` and record the outcome.

    The response is stored on ``context.request``.  If the request raises
    ``SSLError`` the exception is stored on ``context.request_ssl_error``
    instead of propagating.

    :param context: object providing ``SERVER``, ``CERT_DIR``, ``auth`` and
        optionally ``cert``.
    """
    # Verify against the CA bundle in CERT_DIR only for https servers.
    verify = False
    if context.SERVER.startswith("https"):
        verify = os.path.join(context.CERT_DIR, "ca.pem")

    try:
        context.request = requests.get(
            context.SERVER + '/api/v0/status',
            auth=context.auth,
            verify=verify,
            cert=getattr(context, "cert", None))
    except SSLError as e:  # `as` form is valid on Python 2.6+ and Python 3
        context.request_ssl_error = e
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def safe_urlopen(url, method=None, params=None, data=None, json=None,
                 headers=None, allow_redirects=False, timeout=30,
                 verify_ssl=True, user_agent=None):
    """
    A slightly safer version of ``urllib2.urlopen`` which does not follow
    redirects by default.

    NOTE(review): the original docstring also states that blacklisted IP
    ranges are rejected; that is not visible here and is presumably done by
    the session returned from ``build_session()`` -- confirm.

    :param url: target URL.
    :param method: HTTP method; defaults to 'POST' when ``data`` or ``json``
        is given, otherwise 'GET'.
    :param params: optional query parameters.
    :param data: optional request body.
    :param json: optional JSON body; also sets a default
        'Content-Type: application/json' header.
    :param headers: optional request headers.
    :param allow_redirects: passed through to the session request.
    :param timeout: request timeout passed through to the session request.
    :param verify_ssl: passed to the session request as ``verify``.
    :param user_agent: no longer used; passing it only emits a warning.
    :return: the response object, with ``encoding`` forced to 'utf-8' when
        the response declares none.
    :raises SSLError: when the underlying request raises ``ZeroReturnError``.
    """
    if user_agent is not None:
        warnings.warn('user_agent is no longer used with safe_urlopen')

    session = build_session()

    kwargs = {}

    if json:
        kwargs['json'] = json
        if not headers:
            headers = {}
        headers.setdefault('Content-Type', 'application/json')

    if data:
        kwargs['data'] = data

    if params:
        kwargs['params'] = params

    if headers:
        kwargs['headers'] = headers

    if method is None:
        method = 'POST' if (data or json) else 'GET'

    try:
        response = session.request(
            method=method,
            url=url,
            allow_redirects=allow_redirects,
            timeout=timeout,
            verify=verify_ssl,
            **kwargs
        )
    # Our version of requests does not transform ZeroReturnError into an
    # appropriately generically catchable exception
    except ZeroReturnError as exc:
        import sys
        exc_tb = sys.exc_info()[2]
        try:
            # Wrap explicitly: on Python 3 six.reraise raises the *value* it
            # is given, so passing the bare ZeroReturnError would re-raise it
            # unchanged instead of surfacing an SSLError.
            six.reraise(SSLError, SSLError(exc), exc_tb)
        finally:
            # Drop the traceback reference even though reraise always raises.
            del exc_tb

    # requests' attempts to use chardet internally when no encoding is found
    # and we want to avoid that slow behavior
    if not response.encoding:
        response.encoding = 'utf-8'

    return response
项目:rowgenerators    作者:Metatab    | 项目源码 | 文件源码
def _download(self, url, cache_path):
        """Fetch ``url`` and store its content in ``self.cache`` at ``cache_path``.

        The transfer method depends on the URL prefix: 's3:' URLs are
        downloaded through ``appurl``'s ``Url(...).object``, 'ftp:' URLs are
        read with ``urlopen`` in 16 KiB chunks, and everything else is
        streamed with ``requests``.  If ``self.callback`` is set it is called
        with ``('download', url, 0)`` first and with
        ``('copy_file', read, total)`` as data is copied.

        Raises ``DownloadError`` when the S3 transfer fails or when the HTTP
        request raises ``SSLError``.
        """
        import requests

        def copy_callback(read, total):
            if self.callback:
                self.callback('copy_file', read, total)

        if self.callback:
            self.callback('download', url, 0)

        if url.startswith('s3:'):
            from appurl.url import Url

            bucket_url = Url(url)

            try:
                with self.cache.open(cache_path, 'wb') as sink:
                    bucket_url.object.download_fileobj(sink)
            except Exception as e:
                raise DownloadError("Failed to fetch S3 url '{}': {}".format(url, e))
            return

        if url.startswith('ftp:'):
            from contextlib import closing

            chunk_size = 16 * 1024
            with closing(urlopen(url)) as fin, self.cache.open(cache_path, 'wb') as fout:
                copied = 0
                while True:
                    chunk = fin.read(chunk_size)
                    if not chunk:
                        break
                    fout.write(chunk)
                    copied += len(chunk)

                    if self.callback:
                        copy_callback(len(chunk), copied)
            return

        try:
            r = requests.get(url, stream=True)
            r.raise_for_status()
        except SSLError as e:
            raise DownloadError("Failed to GET {}: {} ".format(url, e))

        # Requests will auto decode gzip responses, but not when streaming.
        # The following monkey patch is recommended by a core developer at
        # https://github.com/kennethreitz/requests/issues/2155
        if r.headers.get('content-encoding') == 'gzip':
            r.raw.read = functools.partial(r.raw.read, decode_content=True)

        with self.cache.open(cache_path, 'wb') as sink:
            copy_file_or_flo(r.raw, sink, cb=copy_callback)

        assert self.cache.exists(cache_path)