Python ssl 模块,get_server_certificate() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用ssl.get_server_certificate()

项目:ender-chest-public    作者:samjo-nyang    | 项目源码 | 文件源码
def inspect(host, port):
    """Probe ``https://host:port`` and classify the outcome as an ``ERR_*`` code.

    Returns:
        ERR_NONE            the HTTPS request succeeded.
        ERR_HOST_NOT_MATCH  the SSL error message starts with ``hostname``.
        ERR_EXPIRED         the served certificate is outside its validity window.
        ERR_SELF_SIGNED     any other SSL error (fallback classification).
        ERR_TIMEOUT         the request raised ConnectionError or Timeout.
        ERR_UNKNOWN         the certificate could not be fetched, or any other
                            error was raised by the request.

    Errors raised while parsing the fetched certificate are not caught here.
    """
    try:
        requests.get('https://%s:%s' % (host, port), timeout=2)
    except SSLError as e:
        # CHECK ERR_HOST_NOT_MATCH
        if str(e).startswith('hostname'):
            return ERR_HOST_NOT_MATCH

        # Fetch the certificate without verification so it can be inspected.
        try:
            raw_cert = ssl.get_server_certificate((host, port))
        except Exception:
            return ERR_UNKNOWN

        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, raw_cert)

        # CHECK ERR_EXPIRED
        # The validity timestamps end in "Z" (UTC), so compare against UTC
        # rather than local wall-clock time.
        time_format = "%Y%m%d%H%M%SZ"
        now = datetime.utcnow()
        not_after = datetime.strptime(x509.get_notAfter().decode('utf-8'),
                                      time_format)
        not_before = datetime.strptime(x509.get_notBefore().decode('utf-8'),
                                       time_format)
        if now > not_after or now < not_before:
            return ERR_EXPIRED

        # otherwise ERR_SELF_SIGNED
        return ERR_SELF_SIGNED
    except (ConnectionError, Timeout):
        return ERR_TIMEOUT
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate.
        return ERR_UNKNOWN
    return ERR_NONE
项目:SlackUptimeMonitor    作者:AndreiD    | 项目源码 | 文件源码
def check(hostname="your_server.com", port=443, num_days=7):
    """Print whether the TLS certificate of ``hostname:port`` expires soon.

    :param hostname: server to fetch the certificate from.
    :param port: TCP port of the TLS endpoint.
    :param num_days: alert when this many days or fewer remain.
    :raises AssertionError: if the certificate carries no expiry date.
    """
    # No ssl_version is forced: pinning TLS 1.0 fails against servers that
    # have disabled it, while the default negotiates a mutually supported one.
    cert = ssl.get_server_certificate((hostname, port))
    x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
    expiry_date = x509.get_notAfter()
    print(str(expiry_date))
    assert expiry_date, "Cert doesn't have an expiry date."

    ssl_date_fmt = r'%Y%m%d%H%M%SZ'
    # get_notAfter() returns bytes; decode it instead of slicing the
    # "b'...'" repr.
    expires = datetime.datetime.strptime(expiry_date.decode('ascii'),
                                         ssl_date_fmt)
    remaining = (expires - datetime.datetime.utcnow()).days

    if remaining <= num_days:
        print("ALERTING!")
    else:
        print("Not alerting. You have " + str(remaining) + " days")
项目:ws-backend-community    作者:lavalamp-    | 项目源码 | 文件源码
def get_ssl_certificate(self, ssl_version=None):
        """
        Retrieve the endpoint's certificate in PEM form and parse it with OpenSSL.
        :param ssl_version: Name of an attribute of the ssl module selecting the protocol
        to connect with. If None, no protocol is passed and the library default is used.
        :return: A tuple containing (1) the PEM certificate string and (2) the certificate
        object loaded from that string by OpenSSL.crypto.
        """

        endpoint = (self.address, self.port)
        try:
            if ssl_version is None:
                pem_text = ssl.get_server_certificate(endpoint)
            else:
                protocol = getattr(ssl, ssl_version)
                pem_text = ssl.get_server_certificate(endpoint, ssl_version=protocol)
            parsed = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_text)
            return pem_text, parsed
        except ssl.SSLError as e:
            # Re-raise SSL failures as the project-specific error type.
            raise SslCertificateRetrievalFailedError(
                message="SSL error thrown when retrieving SSL certificate: %s." % (e,)
            )

    # Protected Methods

    # Private Methods
项目:pyxcli    作者:IBM    | 项目源码 | 文件源码
def _certificate_required(cls, hostname, port=XCLI_DEFAULT_PORT,
                              ca_certs=None, validate=None):
        '''
        Tell whether the connection should verify the server certificate.

        Returns False when no ca_certs were supplied. Otherwise the server
        certificate is fetched; if a validate callable was given, the result
        is the negation of validate(certificate), else True.
        '''
        if ca_certs:
            xlog.debug("CONNECT SSL %s:%s, cert_file=%s",
                       hostname, port, ca_certs)
            endpoint = (hostname, port)
            certificate = ssl.get_server_certificate(endpoint, ca_certs=None)
            # handle XIV pre-defined certifications:
            # a user-supplied validate function gets to inspect the
            # certificate itself; when it accepts the certificate (returns
            # True) no further verification is required, hence the negation.
            if not validate:
                return True
            return not validate(certificate)
        return False
项目:nova    作者:hubblestack    | 项目源码 | 文件源码
def _get_cert_from_endpoint(server, port=443):
    """Return the PEM certificate served at ``server:port``, or None.

    Any exception during retrieval is logged and yields None; an empty
    result is also reported as None.
    """
    endpoint = (server, port)
    try:
        pem = ssl.get_server_certificate(endpoint)
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
        return None
    return pem if pem else None
项目:networking-zte    作者:openstack    | 项目源码 | 文件源码
def _fetch_and_store_cert(self, server, port, path):
        """Fetch the certificate served at server:port and save it at path.

        Raises cfg.Error when the certificate cannot be retrieved; returns
        the certificate text after handing it to _file_put_contents.
        """
        endpoint = (server, port)
        try:
            cert = ssl.get_server_certificate(endpoint)
        except Exception as e:
            template = _('Could not retrieve initial '
                         'certificate from controller %(server)s. '
                         'Error details: %(error)s')
            raise cfg.Error(template % {'server': server, 'error': str(e)})

        notice = _LW("Storing to certificate for host %(server)s "
                     "at %(path)s")
        LOG.warning(notice % {'server': server, 'path': path})
        self._file_put_contents(path, cert)

        return cert
项目:python-fritzbox    作者:pamapa    | 项目源码 | 文件源码
def save_certificate(self):
    """Fetch the server certificate for ``self.url`` and write it to ``self.cafile``.

    The port defaults to 443 when ``self.url`` carries none. The directory
    part of ``self.cafile`` is created if it does not exist yet.
    """
    port = self.url.port if self.url.port else 443
    adr = (self.url.hostname, port)
    if self.debug:
        print("save_certificate of %s to %s" % (str(adr), self.cafile))
    capath = os.path.dirname(self.cafile)
    # dirname() is '' for a bare file name; os.makedirs('') would raise,
    # so only create a directory when there is one to create.
    if capath and not os.path.exists(capath):
        os.makedirs(capath)
    cert = ssl.get_server_certificate(adr)
    with open(self.cafile, "w") as outfile:
        outfile.write(cert)
项目:vio    作者:vmware    | 项目源码 | 文件源码
def get_vc_fingerprint(vcenter_ip):
    """Return the SHA-1 fingerprint of the certificate at ``vcenter_ip``.

    The certificate is fetched from VCENTER_PORT over TLSv1 and the
    fingerprint characters are grouped in pairs joined by ':'.
    """
    endpoint = (vcenter_ip, VCENTER_PORT)
    cert_pem = ssl.get_server_certificate(endpoint,
                                          ssl_version=ssl.PROTOCOL_TLSv1)
    certificate = X509.load_cert_string(cert_pem, X509.FORMAT_PEM)
    digest = certificate.get_fingerprint('sha1')
    # Pair up characters at even and odd positions: "ABCD" -> "AB:CD".
    evens, odds = digest[::2], digest[1::2]
    return ':'.join(hi + lo for hi, lo in zip(evens, odds))
项目:og-miner    作者:opendns    | 项目源码 | 文件源码
def process(self, profile, state, vertex):
        """Collect the TLS certificate for a "domain" vertex.

        Returns an error dict when the vertex has no 'type'. Otherwise returns
        a dict with 'properties' and an empty 'neighbors' list; for domain
        vertices the properties hold the certificate ('content') and the fetch
        duration ('time'), or an 'error' string when anything in the fetch
        fails.
        """
        if 'type' not in vertex:
            return { 'error' : "No vertex type defined!" }

        result = {"properties": {}, "neighbors": []}
        if vertex['type'] != "domain":
            return result

        props = result["properties"]
        try:
            started = time.time()
            props['content'] = ssl.get_server_certificate((vertex['id'], self.ssl_port))
            props['time'] = time.time() - started
        except Exception as e:
            props['error'] = str(e)

        return result
项目:analyst-scripts    作者:Te-k    | 项目源码 | 文件源码
def check_certificate(self, domain):
        """
        Download the TLS certificate served on port 443 of *domain* and log
        its details.

        When self.output is set, the PEM data is also written to
        <self.output>/cert.pem. Subject, validity dates, issuer, serial
        number, a CA basic constraint and DNS alternate names are logged
        through self.log.
        """
        pem = ssl.get_server_certificate((domain, 443))
        # get_server_certificate() returns text on Python 3, but both the
        # binary file and the PEM loader need bytes.
        pem_bytes = pem if isinstance(pem, bytes) else pem.encode('ascii')
        if self.output:
            with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
                f.write(pem_bytes)

        cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
        self.log.critical("\tCertificate:")
        self.log.critical("\t\tDomain: %s", ",".join(attr.value for attr in cert.subject))
        self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
        self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
        self.log.critical("\t\tCA Issuer: %s", ", ".join(attr.value for attr in cert.issuer))
        self.log.critical("\t\tSerial: %s", cert.serial_number)
        for ext in cert.extensions:
            # NOTE(review): _name is a private attribute of the OID object.
            if ext.oid._name == 'basicConstraints':
                if ext.value.ca:
                    self.log.critical("\t\tBasic Constraints: True")
            elif ext.oid._name == 'subjectAltName':
                self.log.critical("\t\tAlternate names: %s", ", ".join(ext.value.get_values_for_type(x509.DNSName)))
项目:hubble-salt    作者:hubblestack    | 项目源码 | 文件源码
def _get_cert_from_endpoint(server, port=443):
    """Fetch the PEM certificate from ``server:port``.

    Returns None when retrieval raises (the failure is logged) or when the
    fetched value is empty.
    """
    cert = None
    try:
        cert = ssl.get_server_certificate((server, port))
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
    return cert or None
项目:hubble    作者:hubblestack    | 项目源码 | 文件源码
def _get_cert_from_endpoint(server, port=443):
    """Get the certificate presented by ``server`` on ``port`` as PEM text.

    A retrieval failure is logged and reported as None, as is an empty
    certificate.
    """
    try:
        fetched = ssl.get_server_certificate((server, port))
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
        fetched = None

    if fetched:
        return fetched
    return None
项目:lokey    作者:jpf    | 项目源码 | 文件源码
def tls(ctx, domain_name):
    """Get the TLS certificate for a domain.

    Example:

        $ lokey fetch tls gliderlabs.com
    """
    # Only the network fetch is guarded: a failure while echoing the
    # certificate must not be reported as a fetch failure.
    try:
        cert = stdlib_ssl.get_server_certificate((domain_name, 443))
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not turned into a misleading error message.
        msg = ("Unable to fetch key from {}, "
               "is that domain configured for TLS?").format(domain_name)
        raise click.ClickException(msg)
    click.echo(cert)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
async def get_server_certificate(*args, **kwargs):
        """Await _ssl.get_server_certificate(*args, **kwargs) run via run_in_thread.

        Declared ``async`` because the body uses ``await``, which is a
        syntax error inside a plain ``def``.
        """
        return await run_in_thread(partial(_ssl.get_server_certificate, *args, **kwargs))

    # Small wrapper class to make sure the wrap_socket() method returns the right type
项目:pyvmwareclient    作者:wbugbofh    | 项目源码 | 文件源码
def onHtml(self, event):
        """Open the browser-based remote console for the VM in self.listadoVM.

        Looks the VM up by UUID, acquires a clone session ticket, computes
        the SHA-1 fingerprint of the vCenter certificate and opens the
        console URL matching the vCenter version (5.5.0, 6.0.0 or 6.5.0).

        Raises RuntimeError when the vCenter settings contain no
        'VirtualCenter.FQDN' entry.
        """
        fila = self.listadoVM
        if logger is not None:
            for campo in fila:
                logger.info(campo)
        # Original note: the third element is the IP and the ninth is the UUID.
        vm = conexion.searchIndex.FindByUuid(None, fila[8], True)
        vm_name = vm.summary.config.name
        vm_moid = vm._moId
        if logger is not None:
            # The original format string had no placeholder, so the id was
            # never actually logged.
            logger.info('void= {}'.format(vm_moid))
        vcenter_data = conexion.setting
        vcenter_settings = vcenter_data.setting
        console_port = '9443'
        puerto_vcenter = '443'

        # Start from None so a missing setting is reported clearly instead of
        # failing later with an unbound local variable.
        vcenter_fqdn = None
        for item in vcenter_settings:
            if getattr(item, 'key') == 'VirtualCenter.FQDN':
                vcenter_fqdn = getattr(item, 'value')
        if vcenter_fqdn is None:
            raise RuntimeError("VirtualCenter.FQDN not found in vCenter settings")

        host = vcenter_fqdn

        session_manager = conexion.sessionManager
        session = session_manager.AcquireCloneTicket()
        vc_cert = ssl.get_server_certificate((host, int(puerto_vcenter)))
        vc_pem = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, vc_cert)
        vc_fingerprint = vc_pem.digest('sha1')

        if logger is not None:
            logger.info("Open the following URL in your browser to access the "
                        "Remote Console.\n"
                        "You have 60 seconds to open the URL, or the session "
                        "will be terminated.\n")
        print(str(vcenter_data))

        # The vCenter version (conexion.about.version) selects the console URL.
        object_about = conexion.about
        # For vCenter 5.5
        if object_about.version == '5.5.0':
            console_portv5 = '7331'
            URL5 = "http://" + host + ":" + console_portv5 + "/console/?vmId=" \
                   + str(vm_moid) + "&vmName=" + vm_name + "&host=" + vcenter_fqdn \
                   + "&sessionTicket=" + session + "&thumbprint=" + vc_fingerprint.decode('utf8')
            webbrowser.open(URL5, new=1, autoraise=True)

        # For vCenter 6.0 and 6.5
        if object_about.version in ('6.0.0', '6.5.0'):
            # NOTE(review): the parameter was spelled "thumbprint.info" here
            # while the 5.5 URL uses "thumbprint"; changed to match. Confirm
            # against a 6.x vCenter.
            URL = "https://" + host + ":" + console_port + "/vsphere-client/webconsole.html?vmId=" \
                  + str(vm_moid) + "&vmName=" + vm_name + "&host=" + vcenter_fqdn \
                  + "&sessionTicket=" + session + "&thumbprint=" + vc_fingerprint.decode('utf-8')
            if logger is not None:
                logger.info(URL)
            webbrowser.open(URL, new=1, autoraise=True)
            if logger is not None:
                logger.info("Waiting for 60 seconds, then exit")