我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.exceptions.SSLError()。
def run_query(self): q = 'select * from weather.forecast where woeid=%s' % self.woeid url = 'https://query.yahooapis.com/v1/yql?q=%s' % q params = {} params['format'] = 'json' try: ans = requests.get(url, auth=self.oauth, params=params) except SSLError as e: ''' Bug #1568774 ''' print('wyahooapi.py: Bug #1568774', str(e)) print('wyahooapi.py: Unable to query https url, switch to http url') url = 'http://query.yahooapis.com/v1/yql?q=%s' % q ans = requests.get(url, auth=self.oauth, params=params) if ans.status_code == 200: return ans.json() else: print('wyahooapi.py: Request status code not 200, status_code = ', str(ans.status_code)) return None
def conn_gcp(cred, crid): """Establish connection to GCP.""" gcp_auth_type = cred.get('gcp_auth_type', "S") if gcp_auth_type == "A": # Application Auth gcp_crd_ia = CONFIG_DIR + ".gcp_libcloud_a_auth." + cred['gcp_proj_id'] gcp_crd = {'user_id': cred['gcp_client_id'], 'key': cred['gcp_client_sec'], 'project': cred['gcp_proj_id'], 'auth_type': "IA", 'credential_file': gcp_crd_ia} else: # Service Account Auth gcp_pem = CONFIG_DIR + cred['gcp_pem_file'] gcp_crd_sa = CONFIG_DIR + ".gcp_libcloud_s_auth." + cred['gcp_proj_id'] gcp_crd = {'user_id': cred['gcp_svc_acct_email'], 'key': gcp_pem, 'project': cred['gcp_proj_id'], 'credential_file': gcp_crd_sa} driver = get_driver(Provider.GCE) try: gcp_obj = driver(**gcp_crd) except SSLError as e: abort_err("\r SSL Error with GCP: {}".format(e)) except (InvalidCredsError, ValueError) as e: abort_err("\r Error with GCP Credentials: {}".format(e)) return {crid: gcp_obj}
def _check_connector_backend_access(url, timeout=120): max_time = datetime.now() + timedelta(seconds=timeout) while True: try: response = get(url=url, timeout=10) if response.status_code == 200: print() return raise_by_max_time("Waiting time exceeded", max_time) except SSLError: raise_by_max_time("An SSL error occurred", max_time) except Timeout: raise_by_max_time("Timeout connecting to Connector Backend", max_time) except ConnectionError: raise_by_max_time("Connection error to Connector Backend", max_time) except Exception as e: raise_by_max_time(str(e), max_time) time.sleep(10)
def get_container(self, name=None): ''' Lookup a container and return the inspection results. ''' if name is None: return None search_name = name if not name.startswith('/'): search_name = '/' + name result = None try: for container in self.containers(all=True): self.log("testing container: %s" % (container['Names'])) if search_name in container['Names']: result = container break if container['Id'].startswith(name): result = container break if container['Id'] == name: result = container break except SSLError as exc: self._handle_ssl_error(exc) except Exception as exc: self.fail("Error retrieving container list: %s" % exc) if result is not None: try: self.log("Inspecting container Id %s" % result['Id']) result = self.inspect_container(container=result['Id']) self.log("Completed container inspection") except Exception as exc: self.fail("Error inspecting container: %s" % exc) return result
def conn_aws(cred, crid): """Establish connection to AWS service.""" driver = get_driver(Provider.EC2) try: aws_obj = driver(cred['aws_access_key_id'], cred['aws_secret_access_key'], region=cred['aws_default_region']) except SSLError as e: abort_err("\r SSL Error with AWS: {}".format(e)) except InvalidCredsError as e: abort_err("\r Error with AWS Credentials: {}".format(e)) return {crid: aws_obj}
def conn_az(cred, crid): """Establish connection to Azure service.""" driver = get_driver(Provider.AZURE_ARM) try: az_obj = driver(tenant_id=cred['az_tenant_id'], subscription_id=cred['az_sub_id'], key=cred['az_app_id'], secret=cred['az_app_sec']) except SSLError as e: abort_err("\r SSL Error with Azure: {}".format(e)) except InvalidCredsError as e: abort_err("\r Error with Azure Credentials: {}".format(e)) return {crid: az_obj}
def synonym(args): try: return _display_answer(args) except (ConnectionError, SSLError): return crayons.red('\nFailed to establish network connection.\n').color_str
def botan_track(token, message, event_name): try: # uid = message.chat_id uid = message.from_user.id except AttributeError: logger.warning('Botan no chat_id in message') return False num_retries = 2 ssl_verify = True for i in range(num_retries): try: r = requests.post( BOTAN_TRACK_URL, params={"token": token, "uid": uid, "name": event_name}, json=json.loads(message.to_json()), verify=ssl_verify, timeout=2, ) return r.json() except Timeout: logger.exception("Botan timeout on event: %s" % event_name) except SSLError: ssl_verify = False except (Exception, RequestException, ValueError): # catastrophic error logger.exception("Botan ??astrophic error on event: %s" % event_name) return False
def inspect(host, port): try: r = requests.get('https://%s:%s' % (host, port), timeout=2) except SSLError as e: errmsg = str(e) # CHECK ERR_HOST_NOT_MATCH if errmsg.startswith('hostname'): return ERR_HOST_NOT_MATCH try: raw_cert = ssl.get_server_certificate((host, port)) except: return ERR_UNKNOWN x509 = crypto.load_certificate(crypto.FILETYPE_PEM, raw_cert) # CHECK ERR_EXPIRED now = datetime.now() not_after = datetime.strptime(x509.get_notAfter().decode('utf-8'), "%Y%m%d%H%M%SZ") not_before = datetime.strptime(x509.get_notBefore().decode('utf-8'), "%Y%m%d%H%M%SZ") if now > not_after or now < not_before: return ERR_EXPIRED # otherwise ERR_SELF_SIGNED return ERR_SELF_SIGNED except ConnectionError as e: return ERR_TIMEOUT except Timeout as e: return ERR_TIMEOUT except: return ERR_UNKNOWN return ERR_NONE
def authenticate(func): """Decorator that authenticates credentials. :type func: callable :param func: A method to execute if authorization passes. :return: The return value of `func` if authorization passes, or None if authorization fails. """ def auth_wrapper(self, *args, **kwargs): self.config.authenticate() self.config.save_config() if self.config.check_auth(): try: return func(self, *args, **kwargs) except SSLError: click.secho(('SSL cert verification failed.\n Try running ' 'gh configure --enterprise\n and type ' "'n' when asked whether to verify SSL certs."), fg=self.config.clr_error) except MissingSchema: click.secho('Invalid GitHub Enterprise url', fg=self.config.clr_error) except AuthenticationFailed: self.config.print_auth_error() return auth_wrapper
def get_container(self, name=None): ''' Lookup a container and return the inspection results. ''' if name is None: return None search_name = name if not name.startswith('/'): search_name = '/' + name result = None try: for container in self.containers(all=True): self.log("testing container: %s" % (container['Names'])) if isinstance(container['Names'], list) and search_name in container['Names']: result = container break if container['Id'].startswith(name): result = container break if container['Id'] == name: result = container break except SSLError as exc: self._handle_ssl_error(exc) except Exception as exc: self.fail("Error retrieving container list: %s" % exc) if result is not None: try: self.log("Inspecting container Id %s" % result['Id']) result = self.inspect_container(container=result['Id']) self.log("Completed container inspection") except Exception as exc: self.fail("Error inspecting container: %s" % exc) return result
def exception_caused_by_proxy_error(ex): if not ex.args: return False for arg in ex.args: if isinstance(arg, ProxyError) or isinstance(arg, SSLError) or isinstance(arg, ConnectionError): return True if isinstance(arg, Exception): return exception_caused_by_proxy_error(arg) return False
def fingerprint_virtual_host( self, org_uuid=None, network_service_uuid=None, network_service_scan_uuid=None, use_ssl=None, hostname=None, order_uuid=None, ): """ Get a virtual host fingerprint from the web service running at the given network service for the given hostname. :param org_uuid: The UUID of the organization to retrieve a fingerprint for. :param network_service_uuid: The UUID of the network service where the web service resides. :param network_service_scan_uuid: The UUID of the network service scan that this fingerprinting is a part of. :param use_ssl: Whether or not to use SSL to connect to the remote endpoint. :param hostname: The hostname to submit a request for. :return: None """ logger.info( "Now retrieving virtual host fingerprint for service %s with hostname %s. Organization is %s." % (network_service_uuid, hostname, org_uuid) ) ip_address, port, protocol = self.get_endpoint_information() inspector = WebServiceInspector(ip_address=ip_address, port=port, use_ssl=use_ssl, hostname=hostname) try: response = inspector.get() except (SSLError, ReadTimeout) as e: logger.error( "Error thrown when retrieving fingerprint: %s %s." % (e.__class__.__name__, e.message) ) return logger.info( "Fingerprint retrieved for virtual host %s on service %s." % (hostname, network_service_uuid) ) fingerprint_model = response.to_es_model(model_uuid=network_service_scan_uuid, db_session=self.db_session) fingerprint_model.save(org_uuid) logger.info("Fingerprint pushed to Elasticsearch successfully.")
def impl(context): verify=False if context.SERVER.startswith("https"): verify=os.path.join(context.CERT_DIR, "ca.pem") try: context.request = requests.get( context.SERVER + '/api/v0/status', auth=context.auth, verify=verify, cert=getattr(context, "cert", None)) except SSLError, e: context.request_ssl_error = e
def safe_urlopen(url, method=None, params=None, data=None, json=None, headers=None, allow_redirects=False, timeout=30, verify_ssl=True, user_agent=None): """ A slightly safer version of ``urlib2.urlopen`` which prevents redirection and ensures the URL isn't attempting to hit a blacklisted IP range. """ if user_agent is not None: warnings.warn('user_agent is no longer used with safe_urlopen') session = build_session() kwargs = {} if json: kwargs['json'] = json if not headers: headers = {} headers.setdefault('Content-Type', 'application/json') if data: kwargs['data'] = data if params: kwargs['params'] = params if headers: kwargs['headers'] = headers if method is None: method = 'POST' if (data or json) else 'GET' try: response = session.request( method=method, url=url, allow_redirects=allow_redirects, timeout=timeout, verify=verify_ssl, **kwargs ) # Our version of requests does not transform ZeroReturnError into an # appropriately generically catchable exception except ZeroReturnError as exc: import sys exc_tb = sys.exc_info()[2] six.reraise(SSLError, exc, exc_tb) del exc_tb # requests' attempts to use chardet internally when no encoding is found # and we want to avoid that slow behavior if not response.encoding: response.encoding = 'utf-8' return response
def _download(self, url, cache_path): import requests def copy_callback(read, total): if self.callback: self.callback('copy_file', read, total) if self.callback: self.callback('download', url, 0) if url.startswith('s3:'): from appurl.url import Url s3url = Url(url) try: with self.cache.open(cache_path, 'wb') as f: s3url.object.download_fileobj(f) except Exception as e: raise DownloadError("Failed to fetch S3 url '{}': {}".format(url, e)) elif url.startswith('ftp:'): from contextlib import closing with closing(urlopen(url)) as fin: with self.cache.open(cache_path, 'wb') as fout: read_len = 16 * 1024 total_len = 0 while 1: buf = fin.read(read_len) if not buf: break fout.write(buf) total_len += len(buf) if self.callback: copy_callback(len(buf), total_len) else: try: r = requests.get(url, stream=True) r.raise_for_status() except SSLError as e: raise DownloadError("Failed to GET {}: {} ".format(url, e)) # Requests will auto decode gzip responses, but not when streaming. This following # monkey patch is recommended by a core developer at # https://github.com/kennethreitz/requests/issues/2155 if r.headers.get('content-encoding') == 'gzip': r.raw.read = functools.partial(r.raw.read, decode_content=True) with self.cache.open(cache_path, 'wb') as f: copy_file_or_flo(r.raw, f, cb=copy_callback) assert self.cache.exists(cache_path)