我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用certifi.where()。
def fetch_service_config_rollout_strategy(metadata):
    """Fetch service config rollout strategy from metadata URL.

    Args:
        metadata: base URL of the metadata server.

    Returns:
        The raw response body when the server answers with HTTP 200.
        None when the request fails or any other status is returned; the
        rollout strategy attribute is optional, so neither case is an error.
    """
    url = metadata + _METADATA_PATH + "/attributes/" + \
        _METADATA_ROLLOUT_STRATEGY
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        # Best effort only, but KeyboardInterrupt/SystemExit must still
        # propagate, hence no bare ``except:``.
        logging.info("Failed to fetch service config rollout strategy "
                     "from the metadata server: %s", url)
        return None
    if response.status != 200:
        # Fetching rollout strategy is optional. No need to leave log
        return None
    rollout_strategy = response.data
    # Lazy %-formatting: concatenating str with a bytes body would raise.
    logging.info("Service config rollout strategy: %s", rollout_strategy)
    return rollout_strategy
def fetch_service_name(metadata):
    """Fetch service name from metadata URL.

    Args:
        metadata: base URL of the metadata server.

    Returns:
        The raw response body containing the service name.

    Raises:
        FetchError: if the request fails or the status code is not 200.
    """
    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_NAME
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        # Not a bare ``except:`` so Ctrl-C / SystemExit are not rewritten
        # into a FetchError.
        raise FetchError(
            1, "Failed to fetch service name from the metadata server: " + url)
    status_code = response.status
    if status_code != 200:
        message_template = "Fetching service name failed (url {}, status code {})"
        raise FetchError(1, message_template.format(url, status_code))
    name = response.data
    # Lazy %-formatting: concatenating str with a bytes body would raise.
    logging.info("Service name: %s", name)
    return name

# config_id from metadata is optional. Returns None instead of raising error
def fetch_access_token(metadata):
    """Fetch access token from metadata URL.

    Args:
        metadata: base URL of the metadata server.

    Returns:
        The value of the "access_token" field of the JSON response.

    Raises:
        FetchError: if the request fails or the status code is not 200.
    """
    access_token_url = metadata + _METADATA_PATH + "/service-accounts/default/token"
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", access_token_url, headers=headers)
    except Exception:
        # Not a bare ``except:`` so Ctrl-C / SystemExit are not rewritten
        # into a FetchError.
        raise FetchError(
            1,
            "Failed to fetch access token from the metadata server: " +
            access_token_url)
    status_code = response.status
    if status_code != 200:
        message_template = "Fetching access token failed (url {}, status code {})"
        raise FetchError(1, message_template.format(access_token_url, status_code))
    return json.loads(response.data)["access_token"]
def __init__(self) -> None:
    """Create the Minio client backed by a verifying, retrying HTTP pool."""
    # Pool settings: certificates are checked against the certifi bundle and
    # requests answered with 500/502/503/504 are retried with back-off.
    pool_options = dict(
        timeout=urllib3.Timeout.DEFAULT_TIMEOUT,
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        retries=urllib3.Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[500, 502, 503, 504],
        ),
        maxsize=20,
    )
    pool = urllib3.PoolManager(**pool_options)

    # The secure flag is only set when talking to the AWS endpoint.
    use_tls = S3_SERVER == 's3.amazonaws.com'
    self.client = minio.Minio(
        S3_SERVER,
        access_key=S3_ACCESS_KEY,
        secret_key=S3_SECRET_KEY,
        region=S3_REGION,
        secure=use_tls,
        http_client=pool,
    )
def get_connection():
    """Return a new pycurl handle with the default options applied."""
    handle = pycurl.Curl()
    settings = (
        # FOLLOWLOCATION is set to False
        (pycurl.FOLLOWLOCATION, False),
        # accepted content encodings
        (pycurl.ENCODING, 'gzip, deflate'),
        # CA bundle shipped by certifi
        (pycurl.CAINFO, certifi.where()),
        # no signal
        (pycurl.NOSIGNAL, 1),
        # certificate information
        (pycurl.OPT_CERTINFO, 1),
    )
    for option, value in settings:
        handle.setopt(option, value)
    return handle
def __getPage(self, url):
    """Download ``url`` and return its body decoded to text.

    The charset is guessed with chardet; when no charset can be detected,
    UTF-8 is assumed. Undecodable bytes are replaced rather than raising.

    Args:
        url: address of the page to fetch.

    Returns:
        The decoded page text, or an empty string if the download or the
        decoding fails. The result is always text, never raw bytes.
    """
    http = urllib3.PoolManager(
        cert_reqs='CERT_REQUIRED',  # Force certificate check.
        ca_certs=certifi.where(),  # Path to the Certifi bundle.
    )
    try:
        raw = http.request(
            'GET', url, timeout=10,
            headers={
                'User-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.5) Gecko/20091102 Firefox/3.5.5'},
        ).data
        # chardet reports ``None`` when it cannot guess the charset.
        encoding = chardet.detect(raw)['encoding'] or 'utf-8'
        return raw.decode(encoding, 'replace')
    except Exception:
        # Deliberately best effort: callers get an empty page on failure.
        return ''
def check_cert():
    """
    Make sure certifi's CA bundle exists and export it for requests.

    The absolute bundle path is stored in the ``REQUESTS_CA_BUNDLE``
    environment variable; FileNotFoundError is raised if the file is missing.
    """
    LOGGER.info('certificate: checking')
    import certifi
    from emft.core.path import Path

    bundle = str(Path(certifi.where()).abspath())
    bundle_missing = not os.path.exists(bundle)
    if bundle_missing:
        raise FileNotFoundError(bundle)

    LOGGER.debug('setting up local cacert file to: {}'.format(bundle))
    os.environ['REQUESTS_CA_BUNDLE'] = bundle
    LOGGER.info('certificate: checked')
def mercatox():
    """Read the XRB_BTC ticker from Mercatox and store it via mysql_set_price.

    Price fields and the quote volume are multiplied by 10 ** 8 and truncated
    to int; the base volume is truncated to int unscaled. A missing 'last'
    field is stored as 0.
    """
    pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
    endpoint = 'https://mercatox.com/public/json24'
    reply = pool.request('GET', endpoint, timeout=20.0)
    ticker = json.loads(reply.data)['pairs']['XRB_BTC']

    def scaled(field):
        # Convert a decimal string field into an integer scaled by 10 ** 8.
        return int(float(ticker[field]) * (10 ** 8))

    try:
        last_price = scaled('last')
    except KeyError:
        last_price = 0

    mysql_set_price(
        1,
        last_price,
        scaled('high24hr'),
        scaled('low24hr'),
        scaled('lowestAsk'),
        scaled('highestBid'),
        int(float(ticker['baseVolume'])),
        scaled('quoteVolume'),
    )
def monitoring_block_count():
    """Compare the local block count with the community count and alert admins.

    When the absolute difference exceeds ``block_count_difference_threshold``
    every user in ``admin_list`` receives a message, after which
    ``bootstrap_multi()`` is called.
    """
    # set bot
    bot = Bot(api_key)
    count = int(rpc({"action": "block_count"}, 'count'))
    reference_count = int(reference_block_count())

    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
    summary = http.request('GET', summary_url, headers=header, timeout=20.0)
    try:
        community_count = int(json.loads(summary.data)['blocks'])
    except ValueError:
        # Fall back to the reference count when the summary cannot be parsed.
        community_count = reference_count
    difference = int(math.fabs(community_count - count))

    wallet_reply = http.request('GET', block_count_url, headers=header, timeout=20.0)
    raiwallet_count = int(wallet_reply.data)

    if difference <= block_count_difference_threshold:
        return

    # Warning to admins
    warning = 'Block count: {0}\nCommunity: {1}\nDifference: *{2}*\nReference: {3}\nraiwallet.info: {4}'.format(
        count, community_count, difference, reference_count, raiwallet_count)
    for user_id in admin_list:
        push(bot, user_id, warning)
    # trying to fix
    bootstrap_multi()
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Build the SSL context used for the destination side of a connection.

    With ``insecure`` set, peer verification is switched off. Otherwise the
    peer must present a certificate, checked against ``trusted_ca_certs`` or,
    when that is empty, the certifi bundle. ALPN protocols are applied only
    when ``alpn`` is given and ``HAS_ALPN`` is true.
    """
    context = create_basic_sslcontext()

    if insecure:
        context.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_bundle = trusted_ca_certs if trusted_ca_certs else certifi.where()
        context.load_verify_locations(ca_bundle)
        verify_flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        context.set_verify(verify_flags, certificate_verify_cb)

    if alpn and HAS_ALPN:
        context.set_alpn_protos(alpn)
    return context
def __create_connection(self, config):
    """Create an Elasticsearch client from a settings mapping.

    ``config`` must provide 'HOST' and 'INDEX_NAME'; 'PORT' defaults to 9200
    and 'USE_SSL' to False. When 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY' and
    'AWS_REGION' are all present, requests are signed with AWSRequestsAuth.
    The index name and index settings are attached to the returned client as
    ``_index`` and ``_settings``.
    """
    options = dict(
        host=config['HOST'],
        port=config.get('PORT', 9200),
        use_ssl=config.get('USE_SSL', False),
        verify_certs=True,
        ca_certs=certifi.where(),
    )

    aws_keys = ('AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'AWS_REGION')
    if all(key in config for key in aws_keys):
        options['connection_class'] = RequestsHttpConnection
        options['http_auth'] = AWSRequestsAuth(
            aws_access_key=config['AWS_ACCESS_KEY'],
            aws_secret_access_key=config['AWS_SECRET_KEY'],
            aws_host=config['HOST'],
            aws_region=config['AWS_REGION'],
            aws_service='es',
        )

    client = Elasticsearch(**options)
    client._index = config['INDEX_NAME']
    client._settings = config.get('INDEX_SETTINGS', DEFAULT_INDEX_SETTINGS)
    return client
def getShortcutIcon(pkgname):
    """Return the shortcut icon URL for ``pkgname``, memoized per package.

    The package page is downloaded and its ``<link rel="icon">`` resolved
    against the package URL. If that fails, ``/favicon.ico`` on the same
    host is used. If the package URL itself cannot be determined, the result
    is None. Results (including None) are stored in ``getShortcutIcon.cache``
    so each package is looked up only once.

    Args:
        pkgname: name of the package to look up.

    Returns:
        The icon URL, or None when no URL could be built.
    """
    try:
        cache = getShortcutIcon.cache
    except AttributeError:
        # First call: create the function-level cache.
        cache = getShortcutIcon.cache = {}

    if pkgname in cache:
        return cache[pkgname]

    try:
        packageUrl = parsePackage(getPackageInfo(pkgname))[3]
    except Exception:
        # Without a package URL there is nothing to join a favicon path to.
        cache[pkgname] = None
        return None

    try:
        pool = PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=where())
        pageContent = pool.request('GET', packageUrl, timeout=3.0).data
        pageSoup = BeautifulSoup(pageContent, 'html.parser')
        iconLink = pageSoup.find('link', rel='icon')['href']
        icon = urljoin(packageUrl, iconLink)
    except Exception:
        # Page unreachable or no icon link: fall back to the site favicon.
        try:
            icon = urljoin(packageUrl, '/favicon.ico')
        except Exception:
            icon = None

    cache[pkgname] = icon
    return icon
def _get_url(self, url): if self.API_TOKEN == None: logging.error('none token') # 3 For ERROR level return try: c = pycurl.Curl() c.setopt(pycurl.CAINFO, certifi.where()) c.setopt(pycurl.URL, url) b = StringIO.StringIO() c.setopt(pycurl.WRITEFUNCTION, b.write) c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)") c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()]) c.setopt(pycurl.CUSTOMREQUEST, "GET") c.setopt(pycurl.FOLLOWLOCATION, 1) c.perform() result = b.getvalue() logging.debug('result') except Exception as e: logging.error(e.message) logging.error('go error') pass return result
def where():
    """Return the path of the ``cacert.pem`` bundle located next to this module."""
    # vendored bundle inside Requests
    module_dir = os.path.dirname(__file__)
    bundle_path = os.path.join(module_dir, 'cacert.pem')
    return bundle_path
def __init__(self, api_key, source, logger):
    """Store the API key, source and logger and create the HTTPS pool."""
    self.apiKey, self.source, self.logger = api_key, source, logger
    self.sort_error_displayed = False

    pool_options = {
        "cert_reqs": "CERT_REQUIRED",  # Force certificate check.
        "ca_certs": certifi.where(),  # Path to the Certifi bundle.
    }
    self.http = urllib3.PoolManager(**pool_options)

# Retrieves articles from source
def get_ssl_context(*args):
    """Create and return an SSLContext object.

    ``args`` must unpack to exactly four values, in this order:
    ``certfile, keyfile, ca_certs, cert_reqs``.

    CA certificates come from ``ca_certs`` when given. Otherwise, unless
    ``cert_reqs`` is ``ssl.CERT_NONE``, the first available source is used:
    the context's default certificates, the default verify paths
    (non-Windows), the Windows certificate store, then certifi.

    Raises:
        ConfigurationError: verification is requested but none of the CA
            sources above is available.
    """
    certfile, keyfile, ca_certs, cert_reqs = args
    # Note PROTOCOL_SSLv23 is about the most misleading name imaginable.
    # This configures the server and client to negotiate the
    # highest protocol version they both support. A very good thing.
    ctx = SSLContext(ssl.PROTOCOL_SSLv23)
    if hasattr(ctx, "options"):
        # Explicitly disable SSLv2 and SSLv3. Note that up to
        # date versions of MongoDB 2.4 and above already do this,
        # python disables SSLv2 by default in >= 2.7.7 and >= 3.3.4
        # and SSLv3 in >= 3.4.3. There is no way for us to do this
        # explicitly for python 2.6 or 2.7 before 2.7.9.
        ctx.options |= getattr(ssl, "OP_NO_SSLv2", 0)
        ctx.options |= getattr(ssl, "OP_NO_SSLv3", 0)
    if certfile is not None:
        # Client certificate (and optional private key) to present.
        ctx.load_cert_chain(certfile, keyfile)
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    elif cert_reqs != ssl.CERT_NONE:
        # No explicit CA file: try the available CA sources in order.
        # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1
        if hasattr(ctx, "load_default_certs"):
            ctx.load_default_certs()
        # Python >= 3.2.0, useless on Windows.
        elif (sys.platform != "win32" and
              hasattr(ctx, "set_default_verify_paths")):
            ctx.set_default_verify_paths()
        elif sys.platform == "win32" and HAVE_WINCERTSTORE:
            # Load the Windows certificates once, under the module lock.
            with _WINCERTSLOCK:
                if _WINCERTS is None:
                    _load_wincerts()
            ctx.load_verify_locations(_WINCERTS.name)
        elif HAVE_CERTIFI:
            ctx.load_verify_locations(certifi.where())
        else:
            raise ConfigurationError(
                "`ssl_cert_reqs` is not ssl.CERT_NONE and no system "
                "CA certificates could be loaded. `ssl_ca_certs` is "
                "required.")
    # A missing cert_reqs means certificates are required.
    ctx.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs
    return ctx
def find_ca_bundle():
    """Return an existing CA bundle path, or None.

    On Windows the result of ``get_win_certfile()`` is returned. Elsewhere
    the first entry of ``cert_paths`` that is an existing file wins, with
    certifi's bundle as the last resort.
    """
    if os.name == 'nt':
        return get_win_certfile()

    existing = next(
        (candidate for candidate in cert_paths if os.path.isfile(candidate)),
        None,
    )
    if existing is not None:
        return existing

    try:
        import certifi
        bundle = certifi.where()
    except (ImportError, ResolutionError, ExtractionError):
        bundle = None
    return bundle
def _default_ca_certs():
    """Return certifi's CA bundle path; raise if certifi is unavailable."""
    if certifi is not None:
        return certifi.where()
    raise Exception("The 'certifi' package is required to use https "
                    "in simple_httpclient")
def resolve(self, host, port, family=socket.AF_UNSPEC, callback=None):
    """Resolve an address (abstract; this implementation always raises).

    ``host`` is a string that may be a hostname or a literal IP address.

    The documented result is a `.Future` resolving to a list of
    ``(family, address)`` pairs, where ``address`` is a tuple suitable to
    pass to `socket.connect <socket.socket.connect>`: a ``(host, port)``
    pair for IPv4, possibly with additional fields for IPv6. When a
    ``callback`` is supplied it is run with the result as its argument once
    resolution completes.
    """
    raise NotImplementedError
def fetch_service_config_id(metadata):
    """Fetch service config ID from metadata URL.

    The service-config-id attribute is tried first. If that request fails or
    does not return HTTP 200, the "endpoints-service-version" attribute is
    tried instead.

    Args:
        metadata: base URL of the metadata server.

    Returns:
        The raw response body, or None when neither attribute can be
        fetched (the config ID is optional, so failures are only logged).
    """
    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_CONFIG_ID
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())

    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        # Fetching service config id is optional. No need to leave log
        response = None

    if response is None or response.status != 200:
        # Explicit fallback instead of ``raise None`` inside a bare except.
        url = metadata + _METADATA_PATH + "/attributes/endpoints-service-version"
        try:
            response = client.request("GET", url, headers=headers)
        except Exception:
            logging.info(
                "Failed to fetch service config ID from the metadata server: %s",
                url)
            return None
        if response.status != 200:
            message_template = "Fetching service config ID failed (url {}, status code {})"
            logging.info(message_template.format(url, response.status))
            return None

    version = response.data
    # Lazy %-formatting: concatenating str with a bytes body would raise.
    logging.info("Service config ID:%s", version)
    return version
def fetch_latest_rollout(management_service, service_name, access_token):
    """Fetch rollouts and return the first one listed.

    Args:
        management_service: value substituted into the first placeholder of
            SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.
        service_name: value substituted into the second placeholder.
        access_token: bearer token for the Authorization header, or None to
            send the request without credentials.

    Returns:
        The first element of the response's "rollouts" list.

    Raises:
        FetchError: if the request fails, the status code is not 200, the
            body is not valid JSON, or the first rollout lacks "rolloutId"
            or "trafficPercentStrategy"/"percentages".
    """
    if access_token is None:
        headers = {}
    else:
        headers = {"Authorization": "Bearer {}".format(access_token)}

    client = urllib3.PoolManager(ca_certs=certifi.where())
    service_mgmt_url = SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.format(
        management_service, service_name)

    try:
        response = client.request("GET", service_mgmt_url, headers=headers)
    except Exception:
        # Not a bare ``except:`` so Ctrl-C / SystemExit are not rewritten
        # into a FetchError.
        raise FetchError(1, "Failed to fetch rollouts")

    status_code = response.status
    if status_code != 200:
        message_template = ("Fetching rollouts failed "
                            "(status code {}, reason {}, url {})")
        raise FetchError(1, message_template.format(
            status_code, response.reason, service_mgmt_url))

    invalid_template = "Invalid rollouts response (url {}, data {})"
    try:
        rollouts = json.loads(response.data)
    except ValueError:
        # A body that is not JSON is reported like any other invalid response.
        raise FetchError(1, invalid_template.format(service_mgmt_url, response.data))

    # No valid rollouts
    if rollouts is None or \
       'rollouts' not in rollouts or \
       len(rollouts["rollouts"]) == 0 or \
       "rolloutId" not in rollouts["rollouts"][0] or \
       "trafficPercentStrategy" not in rollouts["rollouts"][0] or \
       "percentages" not in rollouts["rollouts"][0]["trafficPercentStrategy"]:
        raise FetchError(1, invalid_template.format(service_mgmt_url, response.data))

    return rollouts["rollouts"][0]
def _get_ca_bundle(): """ If verify=True, then requests is using the built in certifi CA database. Attempt to get that path for the websocket. """ try: import certifi return certifi.where() except ImportError: pass
def parse_es_section(parser, es_section):
    """Build the input and output Elasticsearch clients from a config section.

    Args:
        parser: configuration parser exposing ``get(section, option)``.
        es_section: name of the section holding the connection options
            (user, password, host, port, path, index_git_raw and their
            ``*_output`` / index_git_output counterparts).

    Returns:
        An ``ES_config`` namedtuple with fields ``es_read``, ``es_write``,
        ``es_read_git_index`` and ``es_write_git_index``.
    """
    ES_config = namedtuple('ES_config', ['es_read', 'es_write',
                                         'es_read_git_index',
                                         'es_write_git_index'])

    user = parser.get(es_section, 'user')
    password = parser.get(es_section, 'password')
    host = parser.get(es_section, 'host')
    port = parser.get(es_section, 'port')
    path = parser.get(es_section, 'path')
    es_read_git_index = parser.get(es_section, 'index_git_raw')

    host_output = parser.get(es_section, 'host_output')
    port_output = parser.get(es_section, 'port_output')
    user_output = parser.get(es_section, 'user_output')
    password_output = parser.get(es_section, 'password_output')
    path_output = parser.get(es_section, 'path_output')
    es_write_git_index = parser.get(es_section, 'index_git_output')

    connection_input = "https://" + user + ":" + password + "@" + host + ":"\
        + port + "/" + path
    # NOTE(review): this prints the password embedded in the URL.
    print("Input ES:", connection_input)
    # The keyword names must be ``verify_certs`` / ``ca_certs``; misspelled
    # names are not recognised and leave TLS verification unconfigured.
    es_read = Elasticsearch([connection_input], use_ssl=True,
                            verify_certs=True, ca_certs=certifi.where(),
                            timeout=100)

    credentials = ""
    if user_output:
        credentials = user_output + ":" + password_output + "@"

    # The output cluster is reached over plain HTTP, so no TLS options apply.
    connection_output = "http://" + credentials + host_output + ":"\
        + port_output + "/" + path_output
    print("Output ES:", connection_output)
    es_write = Elasticsearch([connection_output])

    return ES_config(es_read=es_read,
                     es_write=es_write,
                     es_read_git_index=es_read_git_index,
                     es_write_git_index=es_write_git_index)
def ESConnection():
    """Create the read and write Elasticsearch clients from ``.settings``.

    Every section of the file is scanned; when an option appears in several
    sections the last occurrence wins.

    Returns:
        A tuple ``(es_read, es_write, es_read_git_index, es_write_git_index)``.

    Raises:
        KeyError: if a required option is missing from the settings file.
    """
    parser = configparser.ConfigParser()
    conf_file = '.settings'
    # ``read_file`` replaces the deprecated ``readfp``; ``with`` closes the
    # file even if parsing fails.
    with open(conf_file, 'r') as fd:
        parser.read_file(fd)

    wanted = (
        'user', 'password', 'host', 'port', 'path', 'index_git_raw',
        'host_output', 'port_output', 'user_output', 'password_output',
        'path_output', 'index_git_output',
    )
    settings = {}
    for section in parser.sections():
        for option in parser.options(section):
            if option in wanted:
                settings[option] = parser.get(section, option)

    connection = "https://" + settings['user'] + ":" + settings['password'] + \
        "@" + settings['host'] + ":" + settings['port'] + "/" + settings['path']
    # NOTE(review): this prints the password embedded in the URL.
    print(connection)
    # The keyword names must be ``verify_certs`` / ``ca_certs``; misspelled
    # names are not recognised and leave TLS verification unconfigured.
    es_read = Elasticsearch([connection], use_ssl=True, verify_certs=True,
                            ca_certs=certifi.where(), scroll='300m',
                            timeout=100)

    credentials = ""
    if settings['user_output']:
        credentials = settings['user_output'] + ":" + \
            settings['password_output'] + "@"

    # The output cluster is reached over plain HTTP, so no TLS options apply.
    connection_output = "http://" + credentials + settings['host_output'] + \
        ":" + settings['port_output'] + "/" + settings['path_output']
    es_write = Elasticsearch([connection_output])
    print(connection_output)

    return (es_read, es_write,
            settings['index_git_raw'], settings['index_git_output'])
def ESConnection():
    """Create the read and write Elasticsearch clients from ``settings``.

    Every section of the file is scanned; when an option appears in several
    sections the last occurrence wins.

    Returns:
        A tuple ``(es_read, es_write, es_read_git_index,
        es_read_gerrit_index, es_write_git_index, es_write_gerrit_index,
        genderize_key)``.

    Raises:
        KeyError: if a required option is missing from the settings file.
    """
    parser = configparser.ConfigParser()
    conf_file = 'settings'
    # ``read_file`` replaces the deprecated ``readfp``; ``with`` closes the
    # file even if parsing fails.
    with open(conf_file, 'r') as fd:
        parser.read_file(fd)

    wanted = (
        'user', 'password', 'host', 'port', 'genderize_key',
        'index_gerrit_raw', 'index_git_raw', 'host_output', 'port_output',
        'user_output', 'password_output', 'path_output',
        'index_gerrit_output', 'index_git_output',
    )
    settings = {}
    for section in parser.sections():
        for option in parser.options(section):
            if option in wanted:
                settings[option] = parser.get(section, option)

    # The keyword names must be ``verify_certs`` / ``ca_certs``; misspelled
    # names are not recognised and leave TLS verification unconfigured.
    tls_options = dict(use_ssl=True, verify_certs=True,
                       ca_certs=certifi.where(), scroll='300m', timeout=100)

    connection = "https://" + settings['user'] + ":" + settings['password'] + \
        "@" + settings['host'] + ":" + settings['port'] + "/"
    # NOTE(review): this prints the password embedded in the URL.
    print(connection)
    es_read = Elasticsearch([connection], **tls_options)

    connection_output = "https://" + settings['user_output'] + ":" + \
        settings['password_output'] + "@" + settings['host_output'] + ":" + \
        settings['port_output'] + "/" + settings['path_output']
    es_write = Elasticsearch([connection_output], **tls_options)

    return (es_read, es_write,
            settings['index_git_raw'], settings['index_gerrit_raw'],
            settings['index_git_output'], settings['index_gerrit_output'],
            settings['genderize_key'])
def read_all(self):
    """Return the data accumulated in ``self._all`` and reset the buffers.

    Intended for non-interactive calls where a set of commands is passed to
    the API call and their combined output is needed after the call
    concludes. Call it after run_forever() or update().

    After the call ``self._all`` is an empty string and ``self._channels``
    an empty dict.

    TODO: Maybe we can process this and return a more meaningful map with
    channels mapped for each input.
    """
    buffered, self._all = self._all, ""
    self._channels = {}
    return buffered