Python certifi 模块,where() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用certifi.where()

项目:endpoints-tools    作者:cloudendpoints    | 项目源码 | 文件源码
def fetch_service_config_rollout_strategy(metadata):
    """Fetch service config rollout strategy from metadata URL.

    Args:
        metadata: Base URL of the metadata server; the attribute path is
            appended to it.

    Returns:
        The raw response body when the server answers with HTTP 200,
        otherwise None (the request failed or another status came back).
    """
    url = metadata + _METADATA_PATH + "/attributes/" + \
        _METADATA_ROLLOUT_STRATEGY
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        # Narrowed from a bare ``except`` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        logging.info("Failed to fetch service config rollout strategy "
                     "from the metadata server: %s", url)
        return None

    if response.status != 200:
        # Fetching rollout strategy is optional. No need to leave log
        return None

    rollout_strategy = response.data
    # %-style arguments instead of ``+``: concatenating a str with a bytes
    # body raises TypeError.
    logging.info("Service config rollout strategy: %s", rollout_strategy)
    return rollout_strategy
项目:endpoints-tools    作者:cloudendpoints    | 项目源码 | 文件源码
def fetch_service_name(metadata):
    """Fetch service name from metadata URL.

    Args:
        metadata: Base URL of the metadata server.

    Returns:
        The raw response body returned for the service-name attribute.

    Raises:
        FetchError: If the request fails or the status code is not 200.
    """
    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_NAME
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        # Narrowed from a bare ``except`` so that KeyboardInterrupt and
        # SystemExit are not converted into FetchError.
        raise FetchError(1,
            "Failed to fetch service name from the metadata server: " + url)
    status_code = response.status

    if status_code != 200:
        message_template = "Fetching service name failed (url {}, status code {})"
        raise FetchError(1, message_template.format(url, status_code))

    name = response.data
    # %-style arguments instead of ``+``: concatenating a str with a bytes
    # body raises TypeError.
    logging.info("Service name: %s", name)
    return name

# config_id from metadata is optional. Returns None instead of raising error
项目:endpoints-tools    作者:cloudendpoints    | 项目源码 | 文件源码
def fetch_access_token(metadata):
    """Fetch access token from metadata URL.

    Args:
        metadata: Base URL of the metadata server.

    Returns:
        The ``access_token`` field of the JSON document served by the
        default service account's token endpoint.

    Raises:
        FetchError: If the request fails or the status code is not 200.
    """
    access_token_url = metadata + _METADATA_PATH + "/service-accounts/default/token"
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", access_token_url, headers=headers)
    except Exception:
        # Narrowed from a bare ``except`` so that KeyboardInterrupt and
        # SystemExit are not converted into FetchError.
        raise FetchError(1,
            "Failed to fetch access token from the metadata server: " + access_token_url)
    status_code = response.status

    if status_code != 200:
        message_template = "Fetching access token failed (url {}, status code {})"
        raise FetchError(1, message_template.format(access_token_url, status_code))

    return json.loads(response.data)["access_token"]
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def __init__(self) -> None:
    """Create ``self.client``, a Minio client using a certificate-checking pool.

    The urllib3 pool verifies certificates against the certifi bundle,
    keeps up to 20 connections and retries up to 5 times (backoff factor
    0.2) on HTTP 500/502/503/504.
    """
    retry_policy = urllib3.Retry(
        total=5,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504]
    )
    pool = urllib3.PoolManager(
        timeout=urllib3.Timeout.DEFAULT_TIMEOUT,
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        retries=retry_policy,
        maxsize=20
    )
    # ``secure`` is switched on only when the configured server is AWS S3.
    use_secure = S3_SERVER == 's3.amazonaws.com'
    self.client = minio.Minio(
        S3_SERVER,
        access_key=S3_ACCESS_KEY,
        secret_key=S3_SECRET_KEY,
        region=S3_REGION,
        secure=use_secure,
        http_client=pool
    )
项目:searx-stats2    作者:dalf    | 项目源码 | 文件源码
def get_connection():
    """Create and configure a new pycurl handle.

    Returns:
        The ``pycurl.Curl`` object after the options listed below were set.
    """
    handle = pycurl.Curl()
    settings = (
        # redirects are NOT followed (the option is set to False)
        (pycurl.FOLLOWLOCATION, False),
        # enable compression
        (pycurl.ENCODING, 'gzip, deflate'),
        # CA bundle taken from certifi
        (pycurl.CAINFO, certifi.where()),
        # no signal
        (pycurl.NOSIGNAL, 1),
        # certificate information
        (pycurl.OPT_CERTINFO, 1),
    )
    for option, value in settings:
        handle.setopt(option, value)
    return handle
项目:gfw_domain_whitelist_spider    作者:R0uter    | 项目源码 | 文件源码
def __getPage(self,url):
    """Download ``url`` and return its body as text (best effort).

    Args:
        url: Address to fetch with a certificate-verifying urllib3 pool.

    Returns:
        The decoded page, or ``''`` when the request fails. The encoding is
        guessed with chardet; if the guess is missing or unusable, UTF-8
        with replacement characters is used so that text is always returned.
    """
    http = urllib3.PoolManager(
        cert_reqs='CERT_REQUIRED',  # Force certificate check.
        ca_certs=certifi.where(),  # Path to the Certifi bundle.
    )
    headers = {
        'User-agent' : 'Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.5) Gecko/20091102 Firefox/3.5.5'}
    try:
        raw = http.request('GET', url, timeout=10, headers=headers).data
    except Exception:
        # Fetching is deliberately best effort: a failed request yields an
        # empty page instead of an exception. Narrowed from a bare
        # ``except`` so KeyboardInterrupt/SystemExit still propagate.
        return ''

    try:
        # chardet may report ``None`` when it cannot guess an encoding.
        encoding = chardet.detect(raw)['encoding'] or 'utf-8'
        return raw.decode(encoding)
    except Exception:
        # Previously a decode failure leaked the undecoded bytes to the
        # caller; fall back to a lossy UTF-8 decode instead.
        return raw.decode('utf-8', 'replace')
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def check_cert():
    """
    Make sure certifi's "cacert.pem" exists and export its location.

    On success the absolute path is stored in the ``REQUESTS_CA_BUNDLE``
    environment variable.

    Raises:
        FileNotFoundError: If the bundle reported by certifi does not exist.
    """
    LOGGER.info('certificate: checking')
    import certifi
    from emft.core.path import Path

    bundle_path = str(Path(certifi.where()).abspath())
    if os.path.exists(bundle_path):
        # A CRC32 integrity check used to live here and is disabled:
        # # noinspection SpellCheckingInspection
        # if not cacert.crc32() == 'D069EE01':
        #     raise ImportError('cacert.pem file is corrupted: {}'.format(cacert.crc32()))
        LOGGER.debug('setting up local cacert file to: {}'.format(bundle_path))
        os.environ['REQUESTS_CA_BUNDLE'] = bundle_path
        LOGGER.info('certificate: checked')
    else:
        raise FileNotFoundError(bundle_path)
项目:RaiWalletBot    作者:SergiySW    | 项目源码 | 文件源码
def mercatox():
    """Read the XRB_BTC entry of Mercatox's 24h feed and pass it to mysql_set_price.

    Price fields and the quote volume are converted to integers scaled by
    10 ** 8; the base volume is truncated to an integer. A missing ``last``
    field is reported as 0.
    """
    def scaled(field):
        # float -> integer with eight implied decimal places
        return int(float(pair[field]) * (10 ** 8))

    pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where())
    reply = pool.request('GET', 'https://mercatox.com/public/json24', timeout=20.0)
    pair = json.loads(reply.data)['pairs']['XRB_BTC']

    try:
        last_price = scaled('last')
    except KeyError:
        last_price = 0

    # Arguments are evaluated left to right, i.e. in the same field order as
    # the positional parameters of mysql_set_price.
    mysql_set_price(
        1,
        last_price,
        scaled('high24hr'),
        scaled('low24hr'),
        scaled('lowestAsk'),
        scaled('highestBid'),
        int(float(pair['baseVolume'])),
        scaled('quoteVolume'),
    )
项目:RaiWalletBot    作者:SergiySW    | 项目源码 | 文件源码
def monitoring_block_count():
    """Compare the local block count with the community figure and alert admins.

    When the absolute difference exceeds ``block_count_difference_threshold``
    every user in ``admin_list`` receives a message and ``bootstrap_multi()``
    is called afterwards.
    """
    # set bot
    bot = Bot(api_key)
    count = int(rpc({"action": "block_count"}, 'count'))
    reference_count = int(reference_block_count())

    pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where())
    summary = pool.request('GET', summary_url, headers=header, timeout=20.0)
    try:
        community_count = int(json.loads(summary.data)['blocks'])
    except ValueError:
        # Unparsable summary: fall back to the reference count.
        community_count = reference_count
    difference = int(math.fabs(community_count - count))

    # Requested unconditionally, before the threshold check, as before.
    raiwallet_reply = pool.request('GET', block_count_url, headers=header, timeout=20.0)
    raiwallet_count = int(raiwallet_reply.data)

    if difference <= block_count_difference_threshold:
        return

    # Warning to admins
    warning = 'Block count: {0}\nCommunity: {1}\nDifference: *{2}*\nReference: {3}\nraiwallet.info: {4}'.format(count, community_count, difference, reference_count, raiwallet_count)
    for user_id in admin_list:
        push(bot, user_id, warning)
    # trying to fix
    bootstrap_multi()
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Build the SSL context for the destination side of a connection.

    Args:
        insecure: When true, peer verification is set to ``SSL.VERIFY_NONE``.
        trusted_ca_certs: CA file to verify against; an empty value selects
            the certifi bundle.
        alpn: ALPN protocols to set, applied only if ``HAS_ALPN`` is true.

    Returns:
        The configured context obtained from ``create_basic_sslcontext()``.
    """
    context = create_basic_sslcontext()

    if insecure:
        context.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_file = trusted_ca_certs if trusted_ca_certs else certifi.where()
        context.load_verify_locations(ca_file)
        verify_flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        context.set_verify(verify_flags, certificate_verify_cb)

    if alpn and HAS_ALPN:
        context.set_alpn_protos(alpn)

    return context
项目:django-rest-search    作者:wemap    | 项目源码 | 文件源码
def __create_connection(self, config):
    """Create an Elasticsearch client from a configuration mapping.

    ``HOST`` and ``INDEX_NAME`` are required; ``PORT`` defaults to 9200,
    ``USE_SSL`` to False and ``INDEX_SETTINGS`` to
    ``DEFAULT_INDEX_SETTINGS``. AWS request signing is enabled only when
    ``AWS_ACCESS_KEY``, ``AWS_SECRET_KEY`` and ``AWS_REGION`` are all present.

    Returns:
        The client, with ``_index`` and ``_settings`` attributes attached.
    """
    options = {
        'host': config['HOST'],
        'port': config.get('PORT', 9200),
        'use_ssl': config.get('USE_SSL', False),
        'verify_certs': True,
        'ca_certs': certifi.where()
    }

    aws_keys = ('AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'AWS_REGION')
    if all(key in config for key in aws_keys):
        signer = AWSRequestsAuth(
            aws_access_key=config['AWS_ACCESS_KEY'],
            aws_secret_access_key=config['AWS_SECRET_KEY'],
            aws_host=config['HOST'],
            aws_region=config['AWS_REGION'],
            aws_service='es')
        options['connection_class'] = RequestsHttpConnection
        options['http_auth'] = signer

    client = Elasticsearch(**options)
    client._index = config['INDEX_NAME']
    client._settings = config.get('INDEX_SETTINGS', DEFAULT_INDEX_SETTINGS)
    return client
项目:WebMan    作者:flipflop97    | 项目源码 | 文件源码
def getShortcutIcon(pkgname):
    """Return the icon URL for a package, caching the answer per package.

    Results (including failures, stored as None) are kept in the
    ``getShortcutIcon.cache`` dict, which is created on first use.

    Args:
        pkgname: Name handed to ``getPackageInfo``.

    Returns:
        The URL of the page's ``<link rel="icon">``, or ``/favicon.ico``
        joined onto the package URL when the page cannot be fetched or has
        no icon link, or None when no package URL can be determined.
    """
    try:
        cache = getShortcutIcon.cache
    except AttributeError:
        cache = getShortcutIcon.cache = {}

    if pkgname in cache:
        return cache[pkgname]

    # Without a package URL there is nothing to join a favicon path onto.
    try:
        packageUrl = parsePackage(getPackageInfo(pkgname))[3]
    except Exception:
        cache[pkgname] = None
        return None

    try:
        pool = PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=where())
        pageRequest = pool.request('GET', packageUrl, timeout=3.0)
        pageSoup = BeautifulSoup(pageRequest.data, 'html.parser')
        iconLink = pageSoup.find('link', rel='icon')['href']
        icon = urljoin(packageUrl, iconLink)
    except Exception:
        # Page unreachable or no icon link: fall back to the conventional
        # favicon location.
        try:
            icon = urljoin(packageUrl, '/favicon.ico')
        except Exception:
            icon = None

    cache[pkgname] = icon
    return icon
项目:ZPoc    作者:zidier215    | 项目源码 | 文件源码
def _get_url(self, url):
        if self.API_TOKEN == None:
            logging.error('none token') # 3 For ERROR level
            return
        try:
            c = pycurl.Curl()
            c.setopt(pycurl.CAINFO, certifi.where())
            c.setopt(pycurl.URL, url)
            b = StringIO.StringIO()
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
            c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
            c.setopt(pycurl.CUSTOMREQUEST, "GET")
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.perform()
            result = b.getvalue()
            logging.debug('result')
        except Exception as e:
            logging.error(e.message)
            logging.error('go error')
            pass
        return result
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:algo-trading-pipeline    作者:NeuralKnot    | 项目源码 | 文件源码
def __init__(self, api_key, source, logger):
    """Store the constructor arguments and open a certificate-checking HTTP pool.

    Args:
        api_key: Kept as ``self.apiKey``.
        source: Kept as ``self.source``.
        logger: Kept as ``self.logger``.
    """
    self.apiKey, self.source, self.logger = api_key, source, logger
    self.sort_error_displayed = False
    pool_options = {
        "cert_reqs": "CERT_REQUIRED",   # Force certificate check.
        "ca_certs": certifi.where(),    # Path to the Certifi bundle.
    }
    self.http = urllib3.PoolManager(**pool_options)

    # Retrieves articles from source
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def get_ssl_context(*args):
        """Create and return an SSLContext object.

        ``args`` must unpack to exactly four values, in this order:
        ``certfile``, ``keyfile``, ``ca_certs`` and ``cert_reqs``.

        When ``ca_certs`` is None and ``cert_reqs`` is not ``ssl.CERT_NONE``,
        CA certificates come from the first source that is available: the
        context's default certificates, its default verify paths (not on
        Windows), the Windows certificate store helper, or the certifi
        bundle.

        Raises ConfigurationError when verification is requested but none of
        those sources is available.
        """
        certfile, keyfile, ca_certs, cert_reqs = args
        # Note PROTOCOL_SSLv23 is about the most misleading name imaginable.
        # This configures the server and client to negotiate the
        # highest protocol version they both support. A very good thing.
        ctx = SSLContext(ssl.PROTOCOL_SSLv23)
        if hasattr(ctx, "options"):
            # Explicitly disable SSLv2 and SSLv3. Note that up to
            # date versions of MongoDB 2.4 and above already do this,
            # python disables SSLv2 by default in >= 2.7.7 and >= 3.3.4
            # and SSLv3 in >= 3.4.3. There is no way for us to do this
            # explicitly for python 2.6 or 2.7 before 2.7.9.
            ctx.options |= getattr(ssl, "OP_NO_SSLv2", 0)
            ctx.options |= getattr(ssl, "OP_NO_SSLv3", 0)
        if certfile is not None:
            ctx.load_cert_chain(certfile, keyfile)
        if ca_certs is not None:
            ctx.load_verify_locations(ca_certs)
        elif cert_reqs != ssl.CERT_NONE:
            # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1
            if hasattr(ctx, "load_default_certs"):
                ctx.load_default_certs()
            # Python >= 3.2.0, useless on Windows.
            elif (sys.platform != "win32" and
                  hasattr(ctx, "set_default_verify_paths")):
                ctx.set_default_verify_paths()
            elif sys.platform == "win32" and HAVE_WINCERTSTORE:
                # The lock guards the one-time load of the Windows
                # certificates into _WINCERTS.
                with _WINCERTSLOCK:
                    if _WINCERTS is None:
                        _load_wincerts()
                ctx.load_verify_locations(_WINCERTS.name)
            elif HAVE_CERTIFI:
                ctx.load_verify_locations(certifi.where())
            else:
                raise ConfigurationError(
                    "`ssl_cert_reqs` is not ssl.CERT_NONE and no system "
                    "CA certificates could be loaded. `ssl_ca_certs` is "
                    "required.")
        # A cert_reqs of None means "require a valid certificate".
        ctx.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs
        return ctx
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def find_ca_bundle():
    """Return an existing CA bundle path, or None.

    On Windows (``os.name == 'nt'``) the answer comes from
    ``get_win_certfile()``. Elsewhere the first entry of ``cert_paths`` that
    is an existing file is used, then certifi's bundle if certifi can be
    imported.
    """
    if os.name == 'nt':
        return get_win_certfile()

    existing = next(
        (candidate for candidate in cert_paths if os.path.isfile(candidate)),
        None,
    )
    if existing is not None:
        return existing

    try:
        import certifi
        return certifi.where()
    except (ImportError, ResolutionError, ExtractionError):
        return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _default_ca_certs():
    """Return the path of certifi's CA bundle.

    Raises:
        Exception: If the module-level ``certifi`` name is None.
    """
    if certifi is not None:
        return certifi.where()
    raise Exception("The 'certifi' package is required to use https "
                    "in simple_httpclient")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def resolve(self, host, port, family=socket.AF_UNSPEC, callback=None):
    """Resolve an address (interface stub; this body always raises).

    ``host`` is a string holding either a hostname or a literal IP
    address.

    The documented contract is to return a `.Future` whose result is a
    list of ``(family, address)`` pairs. Each ``address`` is a tuple that
    can be handed to `socket.connect <socket.socket.connect>`: a
    ``(host, port)`` pair for IPv4, possibly with extra fields for IPv6.
    When ``callback`` is given it is to be run with that result once
    resolution completes.

    Raises:
        NotImplementedError: Always, as written here.
    """
    raise NotImplementedError()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def resolve(self, host, port, family=socket.AF_UNSPEC, callback=None):
    """Look up the addresses for ``host`` (stub that only raises).

    ``host`` may be given as a hostname or as a literal IP address, in
    either case as a string.

    According to the interface description, the return value is a
    `.Future` resolving to a list of ``(family, address)`` pairs, where
    ``address`` is a tuple accepted by
    `socket.connect <socket.socket.connect>` -- ``(host, port)`` for IPv4,
    with additional fields possible for IPv6. A ``callback``, if supplied,
    is to be invoked with the result when it is complete.

    Raises:
        NotImplementedError: Always, as written here.
    """
    raise NotImplementedError()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _default_ca_certs():
    """Give the location of the certifi CA bundle.

    Raises:
        Exception: When ``certifi`` is None at module level.
    """
    certifi_missing = certifi is None
    if certifi_missing:
        message = ("The 'certifi' package is required to use https "
                   "in simple_httpclient")
        raise Exception(message)
    return certifi.where()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def resolve(self, host, port, family=socket.AF_UNSPEC, callback=None):
    """Resolve ``host``/``port`` to socket addresses (not implemented here).

    The ``host`` string is either a hostname or a literal IP address.

    The interface promises a `.Future` whose result is a list of
    ``(family, address)`` pairs; every ``address`` is a tuple suitable for
    `socket.connect <socket.socket.connect>`, i.e. ``(host, port)`` for
    IPv4 and possibly more fields for IPv6. If ``callback`` is passed, it
    is to be run with the result as its argument once the result is ready.

    Raises:
        NotImplementedError: Always, as written here.
    """
    raise NotImplementedError()
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:endpoints-tools    作者:cloudendpoints    | 项目源码 | 文件源码
def fetch_service_config_id(metadata):
    """Fetch service config ID from metadata URL.

    The primary attribute is tried first. If that request fails or does not
    return HTTP 200, the ``endpoints-service-version`` attribute is tried.
    The config ID is optional, so failures are reported as None rather than
    raised.

    Args:
        metadata: Base URL of the metadata server.

    Returns:
        The raw response body of whichever attribute answered with HTTP 200,
        or None when neither did.
    """
    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_CONFIG_ID
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except Exception:
        response = None

    # Explicit fallback condition; the previous code reached this path by
    # executing ``raise None`` inside a bare ``except``.
    if response is None or response.status != 200:
        # Fetching service config id is optional. No need to leave log
        url = metadata + _METADATA_PATH + "/attributes/endpoints-service-version"
        try:
            response = client.request("GET", url, headers=headers)
        except Exception:
            logging.info("Failed to fetch service config ID from the metadata server: %s", url)
            return None

        if response.status != 200:
            message_template = "Fetching service config ID failed (url {}, status code {})"
            logging.info(message_template.format(url, response.status))
            return None

    version = response.data
    # %-style arguments instead of ``+``: concatenating a str with a bytes
    # body raises TypeError.
    logging.info("Service config ID:%s", version)
    return version
项目:endpoints-tools    作者:cloudendpoints    | 项目源码 | 文件源码
def fetch_latest_rollout(management_service, service_name, access_token):
    """Fetch rollouts and return the first one listed in the response.

    Args:
        management_service: First value substituted into
            ``SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE``.
        service_name: Second value substituted into the URL template.
        access_token: Bearer token for the ``Authorization`` header, or
            None to send no authorization header.

    Returns:
        The first element of the response's ``rollouts`` list.

    Raises:
        FetchError: If the request fails, the status code is not 200, or the
            response lacks ``rolloutId`` or
            ``trafficPercentStrategy.percentages`` on its first rollout.
    """
    if access_token is None:
        headers = {}
    else:
        headers = {"Authorization": "Bearer {}".format(access_token)}

    client = urllib3.PoolManager(ca_certs=certifi.where())

    service_mgmt_url = SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.format(management_service,
                                                                 service_name)
    try:
        response = client.request("GET", service_mgmt_url, headers=headers)
    except Exception:
        # Narrowed from a bare ``except`` so that KeyboardInterrupt and
        # SystemExit are not converted into FetchError.
        raise FetchError(1, "Failed to fetch rollouts")

    status_code = response.status
    if status_code != 200:
        message_template = ("Fetching rollouts failed "\
                            "(status code {}, reason {}, url {})")
        raise FetchError(1, message_template.format(status_code,
                                                    response.reason,
                                                    service_mgmt_url))
    rollouts = json.loads(response.data)
    # No valid rollouts
    if rollouts is None or \
      'rollouts' not in rollouts or \
      len(rollouts["rollouts"]) == 0 or \
      "rolloutId" not in rollouts["rollouts"][0] or \
      "trafficPercentStrategy" not in rollouts["rollouts"][0] or \
      "percentages" not in rollouts["rollouts"][0]["trafficPercentStrategy"]:
        message_template = ("Invalid rollouts response (url {}, data {})")
        raise FetchError(1, message_template.format(service_mgmt_url,
                                                    response.data))

    return rollouts["rollouts"][0]
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:YoWhenReady    作者:jnsdrtlf    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def _get_ca_bundle():
    """  
    If verify=True, then requests is using the built in
    certifi CA database. Attempt to get that path for
    the websocket.
    """
    try:
        import certifi
        return certifi.where()
    except ImportError:
        pass
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:ceres    作者:dicortazar    | 项目源码 | 文件源码
def parse_es_section(parser, es_section):
    """Build the read and write Elasticsearch clients described by a config section.

    Args:
        parser: Config parser object; values are read with
            ``parser.get(es_section, <option>)``.
        es_section: Name of the section holding the Elasticsearch options.

    Returns:
        An ``ES_config`` namedtuple with the fields ``es_read``,
        ``es_write``, ``es_read_git_index`` and ``es_write_git_index``.
    """
    ES_config = namedtuple('ES_config',
                           ['es_read', 'es_write', 'es_read_git_index',
                            'es_write_git_index'])

    user = parser.get(es_section, 'user')
    password = parser.get(es_section, 'password')
    host = parser.get(es_section, 'host')
    port = parser.get(es_section, 'port')
    path = parser.get(es_section, 'path')
    es_read_git_index = parser.get(es_section, 'index_git_raw')

    host_output = parser.get(es_section, 'host_output')
    port_output = parser.get(es_section, 'port_output')
    user_output = parser.get(es_section, 'user_output')
    password_output = parser.get(es_section, 'password_output')
    path_output = parser.get(es_section, 'path_output')
    es_write_git_index = parser.get(es_section, 'index_git_output')

    connection_input = "https://" + user + ":" + password + "@" + host + ":"\
                       + port + "/" + path
    # NOTE(review): this prints the password embedded in the URL; consider
    # masking it.
    print("Input ES:", connection_input)
    # The keyword names were misspelled (``verity_certs``/``ca_cert``), so
    # certificate verification against the certifi bundle was never
    # configured; the client expects ``verify_certs`` and ``ca_certs``.
    es_read = Elasticsearch([connection_input], use_ssl=True, verify_certs=True,
                            ca_certs=certifi.where(), timeout=100)

    credentials = ""
    if user_output:
        credentials = user_output + ":" + password_output + "@"

    connection_output = "http://" + credentials + host_output + ":"\
                        + port_output + "/" + path_output
    # es_write = Elasticsearch([connection_output], use_ssl=True,
    #                           verify_certs=True, ca_certs=certifi.where(),
    #                           scroll='300m', timeout=100)
    print("Output ES:", connection_output)
    es_write = Elasticsearch([connection_output])

    return ES_config(es_read=es_read, es_write=es_write,
                     es_read_git_index=es_read_git_index,
                     es_write_git_index=es_write_git_index)
项目:ceres    作者:dicortazar    | 项目源码 | 文件源码
def ESConnection():
    """Create the read and write Elasticsearch clients described in ``.settings``.

    Every section of the file is scanned; when an option appears in more
    than one section the value read last wins.

    Returns:
        The tuple ``(es_read, es_write, es_read_git_index,
        es_write_git_index)``.

    Raises:
        KeyError: If an option needed to build a connection or the returned
            index names is missing from the file.
    """
    wanted = frozenset((
        'user', 'password', 'host', 'port', 'path', 'index_git_raw',
        'host_output', 'port_output', 'user_output', 'password_output',
        'path_output', 'index_git_output',
    ))

    parser = configparser.ConfigParser()
    conf_file = '.settings'
    # ``with`` closes the file even if parsing fails; ``read_file`` replaces
    # the deprecated ``readfp``.
    with open(conf_file, 'r') as fd:
        parser.read_file(fd)

    settings = {}
    for section in parser.sections():
        for option in parser.options(section):
            if option in wanted:
                settings[option] = parser.get(section, option)

    connection = "https://" + settings['user'] + ":" + settings['password'] + "@" \
                 + settings['host'] + ":" + settings['port'] + "/" + settings['path']
    # NOTE(review): this prints the password embedded in the URL; consider
    # masking it.
    print(connection)
    # The keyword names were misspelled (``verity_certs``/``ca_cert``), so
    # certificate verification against the certifi bundle was never
    # configured; the client expects ``verify_certs`` and ``ca_certs``.
    es_read = Elasticsearch([connection], use_ssl=True, verify_certs=True,
                            ca_certs=certifi.where(), scroll='300m', timeout=100)

    credentials = ""
    if settings['user_output']:
        credentials = settings['user_output'] + ":" + settings['password_output'] + "@"

    connection_output = "http://" + credentials + settings['host_output'] + ":" \
                        + settings['port_output'] + "/" + settings['path_output']
    es_write = Elasticsearch([connection_output])
    print(connection_output)

    return es_read, es_write, settings['index_git_raw'], settings['index_git_output']
项目:ceres    作者:dicortazar    | 项目源码 | 文件源码
def ESConnection():
    """Create the read and write Elasticsearch clients described in ``settings``.

    Every section of the file is scanned; when an option appears in more
    than one section the value read last wins.

    Returns:
        The tuple ``(es_read, es_write, es_read_git_index,
        es_read_gerrit_index, es_write_git_index, es_write_gerrit_index,
        genderize_key)``.

    Raises:
        KeyError: If an option needed to build a connection or the returned
            values is missing from the file.
    """
    wanted = frozenset((
        'user', 'password', 'host', 'port', 'path', 'genderize_key',
        'index_gerrit_raw', 'index_git_raw',
        'host_output', 'port_output', 'user_output', 'password_output',
        'path_output', 'index_gerrit_output', 'index_git_output',
    ))

    parser = configparser.ConfigParser()
    conf_file = 'settings'
    # ``with`` closes the file even if parsing fails; ``read_file`` replaces
    # the deprecated ``readfp``.
    with open(conf_file, 'r') as fd:
        parser.read_file(fd)

    settings = {}
    for section in parser.sections():
        for option in parser.options(section):
            if option in wanted:
                settings[option] = parser.get(section, option)

    # The input URL ends at "/"; the ``path`` option is read but not used.
    connection = "https://" + settings['user'] + ":" + settings['password'] + "@" \
                 + settings['host'] + ":" + settings['port'] + "/"
    # NOTE(review): this prints the password embedded in the URL; consider
    # masking it.
    print(connection)
    # The keyword names were misspelled (``verity_certs``/``ca_cert``), so
    # certificate verification against the certifi bundle was never
    # configured; the client expects ``verify_certs`` and ``ca_certs``.
    es_read = Elasticsearch([connection], use_ssl=True, verify_certs=True,
                            ca_certs=certifi.where(), scroll='300m', timeout=100)

    connection_output = "https://" + settings['user_output'] + ":" \
                        + settings['password_output'] + "@" + settings['host_output'] \
                        + ":" + settings['port_output'] + "/" + settings['path_output']
    es_write = Elasticsearch([connection_output], use_ssl=True, verify_certs=True,
                             ca_certs=certifi.where(), scroll='300m', timeout=100)

    return settings_result(es_read, es_write, settings) if False else (
        es_read, es_write, settings['index_git_raw'], settings['index_gerrit_raw'],
        settings['index_git_output'], settings['index_gerrit_output'],
        settings['genderize_key'])
项目:alexa-stackoverflow    作者:benvand    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def where():
    """Locate the preferred CA certificate bundle.

    Returns:
        Path to ``cacert.pem`` inside this module's own directory.
    """
    here, bundle_name = os.path.dirname(__file__), 'cacert.pem'
    return os.path.join(here, bundle_name)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def where():
    """Give the filesystem location of the preferred certificate bundle."""
    # The vendored bundle sits beside this source file.
    package_dir, _ = os.path.split(__file__)
    return os.path.join(package_dir, 'cacert.pem')
项目:python-base    作者:kubernetes-client    | 项目源码 | 文件源码
def read_all(self):
    """Hand back everything buffered so far and reset the buffers.

    Per the original description, the buffer holds data received on the
    stdout and stderr channels. This suits non-interactive calls, where a
    set of commands is passed to the API call and the output is wanted
    once the call has concluded. Call it after run_forever() or update().

    After the call ``self._all`` is an empty string and ``self._channels``
    an empty dict.

    TODO: Maybe we can process this and return a more meaningful map with
    channels mapped for each input.
    """
    buffered, self._all = self._all, ""
    self._channels = {}
    return buffered
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def where():
    """Return the path of the preferred certificate bundle.

    The vendored ``cacert.pem`` is looked up in the directory that holds
    this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'cacert.pem')