Python cryptography.x509 模块,load_pem_x509_certificate() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.x509.load_pem_x509_certificate()

项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def verify_renewable_cert_sig(renewable_cert):
    """Verify the signature of a `.storage.RenewableCert` object.

    The certificate at ``renewable_cert.cert`` is loaded, and its signature
    over its TBS bytes is checked with ``OpenSSL.crypto.verify`` against the
    certificate loaded from ``renewable_cert.chain``.

    :param `.storage.RenewableCert` renewable_cert: cert to verify

    :raises errors.Error: If signature verification fails, or if either file
        cannot be read or parsed.
    """
    try:
        # Distinct names for file handles and loaded objects: the handle is
        # only needed for the read.
        with open(renewable_cert.chain, 'rb') as chain_file:
            chain, _ = pyopenssl_load_certificate(chain_file.read())
        with open(renewable_cert.cert, 'rb') as cert_file:
            cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
        hash_name = cert.signature_hash_algorithm.name
        OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
    except (IOError, ValueError, OpenSSL.crypto.Error) as e:
        # Adjacent literals instead of a backslash continuation inside the
        # string, which embedded the next line's indentation in the message.
        error_str = ("verifying the signature of the cert located at {0} has failed. "
                     "Details: {1}").format(renewable_cert.cert, e)
        logger.exception(error_str)
        raise errors.Error(error_str)
项目:cursive    作者:openstack    | 项目源码 | 文件源码
def load_certificate(self, cert_name):
        """Load a certificate file from ``self.cert_path``.

        The file contents are parsed as PEM first; if that fails for any
        reason, they are parsed as DER.

        :param cert_name: file name of the certificate under ``self.cert_path``
        :returns: the loaded certificate object
        :raises exception.SignatureVerificationError: if the data can be
            parsed neither as PEM nor as DER
        """
        cert_location = os.path.join(self.cert_path, cert_name)
        with open(cert_location, 'rb') as handle:
            raw = handle.read()

        try:
            certificate = x509.load_pem_x509_certificate(raw, default_backend())
        except Exception:
            # Not usable as PEM; give DER one chance before reporting failure.
            try:
                certificate = x509.load_der_x509_certificate(
                    raw, default_backend())
            except Exception:
                raise exception.SignatureVerificationError(
                    "Failed to load certificate: %s" % cert_location
                )
        return certificate
项目:bitmask-dev    作者:leapcode    | 项目源码 | 文件源码
def validate_ca_cert(self, ignored):
        """Compare the stored CA certificate with the expected fingerprint.

        The expected value is read from
        ``self._get_expected_ca_cert_fingerprint()`` and split on ``:`` into
        a hash name (looked up on ``hashes``) and a hex digest; spaces in the
        digest are dropped before comparison.

        On mismatch the CA certificate file is deleted, the mismatch is
        logged, and ``NetworkError`` is raised.

        :param ignored: unused by this method
        :raises NetworkError: if the fingerprints differ
        """
        expected = self._get_expected_ca_cert_fingerprint()
        algo, expectedfp = expected.split(':')
        expectedfp = expectedfp.replace(' ', '')
        backend = default_backend()

        # Binary mode: load_pem_x509_certificate takes bytes, which a
        # text-mode read does not provide on Python 3.
        with open(self._get_ca_cert_path(), 'rb') as f:
            certstr = f.read()
        cert = load_pem_x509_certificate(certstr, backend)
        hasher = getattr(hashes, algo)()
        fpbytes = cert.fingerprint(hasher)
        # hexlify returns bytes; decode so the comparison with the textual
        # expected fingerprint is text-to-text on Python 2 and 3 alike.
        fp = binascii.hexlify(fpbytes).decode('ascii')

        if fp != expectedfp:
            os.unlink(self._get_ca_cert_path())
            self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
                           % (fp, expectedfp))
            raise NetworkError("The provider's CA fingerprint doesn't match")
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def read_file(self, subject_name, pw=None):
        """Load the unit-test certificate and private key of a subject.

        Reads ``cert.pem`` and ``key.pem`` from
        ``../../resources/unittest/<subject_name>/`` relative to this
        module's directory.

        :param subject_name: name of the sub-directory holding the PEM files
        :param pw: password handed to ``load_pem_private_key`` (default None)
        :return: dict with the loaded objects under ``'cert'`` and
            ``'private_key'``
        """
        base_dir = os.path.dirname(__file__)

        pem_path = os.path.join(base_dir,
                                "../../resources/unittest/" + subject_name + "/cert.pem")
        # Context managers close the files even when read() raises.
        with open(pem_path, "rb") as f:
            cert_pem = f.read()
        cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

        key_path = os.path.join(base_dir,
                                "../../resources/unittest/" + subject_name + "/key.pem")
        with open(key_path, "rb") as f:
            cert_key = f.read()

        private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())
        return {'cert': cert, 'private_key': private_key}
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def load_pki(self, cert_path: str, cert_pass=None):
        """Load the CA certificate and private key, then run a sign/verify check.

        A ``ValueError`` while loading the private key is only logged at
        debug level, as is a failed sign/verify round trip.

        :param cert_path: directory holding the files named by
            ``self.CERT_NAME`` and ``self.PRI_NAME``
        :param cert_pass: password handed to ``load_pem_private_key``
        """
        cert_location = join(cert_path, self.CERT_NAME)
        key_location = join(cert_path, self.PRI_NAME)

        # Certificate first, then the private key.
        with open(cert_location, "rb") as cert_handle:
            cert_pem = cert_handle.read()
            self.__ca_cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

        with open(key_location, "rb") as key_handle:
            key_pem = key_handle.read()
            try:
                self.__ca_pri = serialization.load_pem_private_key(
                    key_pem, cert_pass, default_backend())
            except ValueError:
                logging.debug("Invalid Password")

        # Round-trip a fixed message through sign_data/verify_data.
        signature = self.sign_data(b'TEST')
        if self.verify_data(b'TEST', signature) is False:
            logging.debug("Invalid Signature(Root Certificate load test)")
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def validate_ca_certificate_constraints(cert_pem_data):
    """Check the BasicConstraints extension of a PEM certificate, if present.

    Returns without error when the certificate has no BasicConstraints
    extension. Otherwise the extension must mark the certificate as a CA and
    carry a path length of 0.

    :param cert_pem_data: PEM-encoded certificate data
    :raises InvalidCertificate: if the certificate is not a CA or its path
        length is not 0
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    try:
        extension = certificate.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS)
    except x509.extensions.ExtensionNotFound:
        return

    basic = extension.value
    if not basic.ca:
        raise InvalidCertificate("Not a CA certificate")
    if basic.path_length != 0:
        raise InvalidCertificate("Invalid pathlen")


# based on ssl.match_hostname code
# https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/ssl.py#L279
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def load_certificate(cert_file=None, cert_bytes=None):
    """Load a certificate from a file or from in-memory bytes.

    When ``cert_file`` is truthy its contents are used; otherwise
    ``cert_bytes`` is used. Data containing ``-----BEGIN`` is parsed as PEM,
    anything else as DER.

    :param cert_file: path of the certificate file, or None
    :param cert_bytes: raw certificate bytes, used when no file is given
    :returns: the loaded certificate object
    """
    if cert_file:
        with open(cert_file, 'rb') as handle:
            raw = handle.read()
    else:
        raw = cert_bytes

    looks_like_pem = b'-----BEGIN' in raw
    if looks_like_pem:
        loader = x509.load_pem_x509_certificate
    else:
        loader = x509.load_der_x509_certificate
    return loader(data=raw, backend=default_backend())
项目:deb-python-dcos    作者:openstack    | 项目源码 | 文件源码
def _user_cert_validation(cert_str):
    """Ask the user to confirm the fingerprint of a cluster certificate.

    The SHA-256 fingerprint of the certificate is rendered as upper-case,
    colon-separated hex pairs and shown in the confirmation prompt.

    :param cert_str: cluster certificate bundle
    :type cert_str: str
    :returns: whether or not user validated cert
    :rtype: bool
    """
    pem_bytes = cert_str.encode('utf-8')
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    digest = certificate.fingerprint(hashes.SHA256())

    hex_pairs = ["{:02x}".format(octet) for octet in digest]
    pretty = ":".join(hex_pairs).upper()

    prompt = "SHA256 fingerprint of cluster certificate bundle:\n{}".format(
        pretty)
    return confirm(prompt, False)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def get_certificate(self, kid):
        """Fetch the JWKS document and load the certificate for a key id.

        The first entry under ``'keys'`` whose ``'kid'`` equals ``kid`` is
        used; the first element of its ``'x5c'`` list is wrapped in PEM
        armour and parsed.

        :param kid: key id to look for
        :returns: the loaded certificate object
        :raises DecodeError: if no key with that id is present
        """
        response = self.request(self.jwks_url(), method='GET')
        response.raise_for_status()

        # First key whose id matches, or None when there is no match.
        candidates = (jwk for jwk in response.json()['keys']
                      if jwk['kid'] == kid)
        match = next(candidates, None)
        if match is None:
            raise DecodeError('Cannot find kid={}'.format(kid))
        x5c = match['x5c'][0]

        pem_text = '-----BEGIN CERTIFICATE-----\n' \
                   '{}\n' \
                   '-----END CERTIFICATE-----'.format(x5c)
        pem_bytes = pem_text.encode()

        return load_pem_x509_certificate(pem_bytes, default_backend())
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def test_crl_0(self):
                """Check that the test certificate's serial number is listed
                in the CRL with the same issuer."""

                crl_path = os.path.join(self.crl_dir, "ch1_ta4_crl.pem")
                with open(crl_path, "rb") as crl_file:
                        crl = x509.load_pem_x509_crl(
                            crl_file.read(), default_backend())

                cert_path = os.path.join(self.cs_dir,
                    "cs1_ch1_ta4_cert.pem")
                with open(cert_path, "rb") as cert_file:
                        cert = x509.load_pem_x509_certificate(
                            cert_file.read(), default_backend())

                self.assertTrue(crl.issuer == cert.issuer)
                # The CRL must contain an entry for this certificate.
                listed = any(entry.serial_number == cert.serial_number
                    for entry in crl)
                self.assertTrue(listed, "Can not find revoked "
                    "certificate in CRL!")
项目:ansible-playbook    作者:fnordpipe    | 项目源码 | 文件源码
def requireRenewal(self, path, lifetime):
        """Tell whether the certificate stored at ``path`` is due for renewal.

        Renewal is due once the current time is later than ``lifetime`` days
        before the certificate's ``not_valid_after``.

        :param path: path of a PEM-encoded certificate file
        :param lifetime: number of days before expiry at which renewal starts
        :returns: True when renewal is required, False otherwise
        """
        with open(path, 'rb') as fh:
            crt = fh.read()

        data = x509.load_pem_x509_certificate(crt, default_backend())
        renew_from = data.not_valid_after - datetime.timedelta(days=lifetime)

        # not_valid_after is a naive datetime expressed in UTC, so it must be
        # compared with the current UTC time rather than local time.
        return datetime.datetime.utcnow() > renew_from
项目:django-autocert    作者:farrepa    | 项目源码 | 文件源码
def certificate_expiry_date(self):
        """Return ``not_valid_after`` of ``self.certificate``.

        Returns None when ``self.certificate`` is falsy.
        """
        if not self.certificate:
            return None
        pem = str(self.certificate)
        parsed = x509.load_pem_x509_certificate(pem, default_backend())
        return parsed.not_valid_after
项目:django-autocert    作者:farrepa    | 项目源码 | 文件源码
def certificate_common_name(self):
        """Return the first subject common name of ``self.certificate``.

        Returns None when ``self.certificate`` is falsy.
        """
        if not self.certificate:
            return None
        pem = str(self.certificate)
        parsed = x509.load_pem_x509_certificate(pem, default_backend())
        common_names = parsed.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        return common_names[0].value
项目:manuale    作者:veeti    | 项目源码 | 文件源码
def load_pem_certificate(data):
    """
    Parse PEM data into an X.509 certificate object using the default backend.
    """
    certificate = x509.load_pem_x509_certificate(data, default_backend())
    return certificate
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_convert_from_cryptography(self):
        """``X509.from_cryptography`` yields an ``X509`` with the same version."""
        source = x509.load_pem_x509_certificate(intermediate_cert_pem, backend)
        converted = X509.from_cryptography(source)

        assert isinstance(converted, X509)
        assert converted.get_version() == source.version.value
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def decode_token(token):
    """
    Decode and validate a JSON Web Token and return its payload

    The token is checked against the public key of the certificate named by
    ``options.ssl_cert`` (falling back to ``LOCALHOST_CRT``), together with
    the expected audience, issuer and algorithm.

    :param token: a JSON Web Token
    :returns: dict, the decoded payload with its 'scope' entry replaced by a
        ``Scope`` object
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    # Binary mode: load_pem_x509_certificate takes bytes.
    with open(cert_file, 'rb') as f:
        cert = load_pem_x509_certificate(f.read(), default_backend())
    public_key = cert.public_key()

    payload = jwt.decode(token,
                         public_key,
                         audience=audience(),
                         issuer=issuer(),
                         algorithms=[ALGORITHM],
                         verify=True)

    # The 'sub' claim must be present and non-empty.
    if not payload.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    payload['scope'] = Scope(payload['scope'])

    return payload
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def parse_issuer_cred(issuer_cred):
    """
    Split an X509 PEM credential string into its parts and validate them.

    The string is cut into sections delimited by -----BEGIN <label>----- and
    -----END <label>-----. The sections are expected in proxy credential
    order: issuer cert, issuer private key, then a proxy chain of 0 or more
    certs. Each section is decoded to confirm it is what its position implies.

    Returns the issuer cert and private key as loaded cryptography objects
    and the proxy chain as a potentially empty string.

    Raises ValueError when fewer than two sections are found or when a
    section cannot be decoded.
    """
    pem_blocks = re.findall(
        "-----BEGIN.*?-----.*?-----END.*?-----", issuer_cred, flags=re.DOTALL)

    # At least the issuer cert and its private key must be present.
    if len(pem_blocks) < 2:
        raise ValueError("Unable to parse PEM data in credentials, "
                         "make sure the X.509 file is in PEM format and "
                         "consists of the issuer cert, issuer private key, "
                         "and proxy chain (if any) in that order.")
    cert_pem, key_pem = pem_blocks[0], pem_blocks[1]
    chain_pems = pem_blocks[2:]

    # Decode every section in order; chain certs are decoded only as a check.
    try:
        loaded_cert = x509.load_pem_x509_certificate(
            six.b(cert_pem), default_backend())
        loaded_private_key = serialization.load_pem_private_key(
            six.b(key_pem),
            password=None, backend=default_backend())
        for chain_pem in chain_pems:
            x509.load_pem_x509_certificate(
                six.b(chain_pem), default_backend())
    except ValueError:
        raise ValueError("Failed to decode PEM data in credentials. Make sure "
                         "the X.509 file consists of the issuer cert, "
                         "issuer private key, and proxy chain (if any) "
                         "in that order.")

    issuer_chain = "".join(chain_pems)
    return loaded_cert, loaded_private_key, issuer_chain
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def __load_cert(self, cert_dir):
        """Load ``cert.pem`` from a directory and check it against ``key.pem``.

        The private key is loaded with ``self.__PASSWD`` and used to sign a
        fixed test message; the signature is then verified with the
        certificate's public key.

        :param cert_dir: directory containing ``cert.pem`` and ``key.pem``
        :return: the loaded X509 certificate, or None when the private key
            cannot be loaded (``ValueError``) or the signature check fails
        """
        logging.debug("Cert/Key loading...")
        cert_file = join(cert_dir, "cert.pem")
        pri_file = join(cert_dir, "key.pem")

        # Context managers close the files even when read() raises.
        with open(cert_file, "rb") as f:
            cert_bytes = f.read()
        cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())

        with open(pri_file, "rb") as f:
            pri_bytes = f.read()
        try:
            pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
        except ValueError:
            logging.debug("Invalid Password(%s)", cert_dir)
            return None

        data = b"test"
        signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))

        try:
            # verify() reports failure by raising InvalidSignature; its return
            # value is not a success flag, so completing without an exception
            # is what counts as success.
            cert.public_key().verify(signature, data, ec.ECDSA(hashes.SHA256()))
        except InvalidSignature:
            logging.debug("sign test fail")
            logging.error("result is False ")
            return None

        return cert
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def convert_x509cert_from_pem(self, cert_pem):
        """Convert PEM certificate data into an x509 certificate object.

        :param cert_pem: PEM-encoded certificate data
        :return: the loaded x509 certificate object
        """
        certificate = x509.load_pem_x509_certificate(cert_pem, default_backend())
        return certificate
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def validate_certificate_common_name(cert_pem_data, subject_name):
    """Match ``subject_name`` against a PEM certificate, alt names disabled.

    The check itself is delegated to ``_match_subject_name``.
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    _match_subject_name(certificate, subject_name, alt_names=False)
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def validate_certificate_hosts(cert_pem_data, host_names):
    """Match every name in ``host_names`` against a PEM certificate.

    Each name is handed to ``_match_subject_name`` with
    ``ssl._dnsname_match`` as the comparison function.
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    for name in host_names:
        _match_subject_name(certificate, name, compare_func=ssl._dnsname_match)
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def validate_certificate_host_ips(cert_pem_data, host_ips):
    """Match every address in ``host_ips`` against a PEM certificate.

    Each address is handed to ``_match_subject_ip``.
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    for address in host_ips:
        _match_subject_ip(certificate, address)
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def validate_certificate_key_usage(cert_pem_data, is_web_server, is_web_client):
    """Check the key usage extensions of a PEM certificate.

    The KeyUsage extension must be present and allow both digital signature
    and key encipherment. When either web flag is set, the ExtendedKeyUsage
    extension must be present and contain SERVER_AUTH and/or CLIENT_AUTH
    according to the flags.

    :param cert_pem_data: PEM-encoded certificate data
    :param is_web_server: require TLS web server authentication
    :param is_web_client: require TLS web client authentication
    :raises InvalidCertificate: if any of the checks fails
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())

    try:
        usage = certificate.extensions.get_extension_for_oid(
            ExtensionOID.KEY_USAGE).value
    except x509.extensions.ExtensionNotFound:
        raise InvalidCertificate("Key usage not specified")

    if not usage.digital_signature:
        raise InvalidCertificate("Not intented for Digital Signature")
    if not usage.key_encipherment:
        raise InvalidCertificate("Not intented for Key Encipherment")

    # Extended key usage only matters for web server / web client certs.
    if not (is_web_server or is_web_client):
        return

    try:
        extended_usage = certificate.extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE).value
    except x509.extensions.ExtensionNotFound:
        raise InvalidCertificate("Extended key usage not specified")

    if is_web_server and ExtendedKeyUsageOID.SERVER_AUTH not in extended_usage:
        raise InvalidCertificate("Not intented for TLS Web Server Authentication")
    if is_web_client and ExtendedKeyUsageOID.CLIENT_AUTH not in extended_usage:
        raise InvalidCertificate("Not intented for TLS Web Client Authentication")
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def list_hosts(self):
        """Build a mapping of host common name to key/certificate status.

        Every CSR under ``self.csr_path`` yields a ``'pending'`` entry. Every
        certificate under ``self.crt_path`` then yields an ``'authorized'``
        or ``'revoked'`` entry (per ``revoked_cert`` and ``self.crl``),
        replacing any entry with the same common name.

        :returns: dict keyed by common name; each value has
            ``'key_fingerprint'``, ``'cert_fingerprint'`` and ``'status'``
        """
        hosts = {}

        for csr_name in os.listdir(self.csr_path):
            with open(os.path.join(self.csr_path, csr_name), 'rb') as handle:
                request = x509.load_pem_x509_csr(handle.read(), default_backend())
                entry = {
                    'key_fingerprint': rsa_key_fingerprint(request.public_key()),
                    'cert_fingerprint': None,
                    'status': 'pending',
                }
                hosts[request.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value] = entry

        for crt_name in os.listdir(self.crt_path):
            with open(os.path.join(self.crt_path, crt_name), 'rb') as handle:
                certificate = x509.load_pem_x509_certificate(handle.read(), default_backend())
                status = 'revoked' if revoked_cert(certificate, self.crl) else 'authorized'
                entry = {
                    'key_fingerprint': rsa_key_fingerprint(certificate.public_key()),
                    'cert_fingerprint': x509_cert_fingerprint(certificate),
                    'status': status,
                }
                hosts[certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value] = entry

        return hosts
项目:RKSV    作者:ztp-at    | 项目源码 | 文件源码
def loadCert(pem):
    """
    Build a cryptography certificate object from a PEM string.
    :param pem: A certificate as a PEM string; it is encoded as UTF-8 before parsing.
    :return: A cryptography certificate object.
    """
    pem_bytes = pem.encode("utf-8")
    return x509.load_pem_x509_certificate(pem_bytes, default_backend())
项目:networking-vpp    作者:openstack    | 项目源码 | 文件源码
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
        """Initialize JWTUtils

        Reads the node private key, the local node certificate and the CA
        certificate through ``self._get_crypto_material`` and keeps what is
        needed for signing and validating key-value pairs.

        The private key and the local certificate (as raw data, as a loaded
        object and re-serialized as PEM) are stored for signing. The CA
        certificate is added to a pyOpenSSL ``X509Store`` kept in
        ``self.store`` for validation.

        :param local_cert: file containing public half of the local key
        :param priv_key: file containing private half of the local key
        :param ca_cert: file containing CA root certificate
        :param controller_name_re: stored unchanged as
            ``self.controller_name_re``
        raise: IOError if the files cannot be read.
        """

        # Signing side: private key plus the node certificate.
        key_pem = self._get_crypto_material(priv_key)
        self.private_key = serialization.load_pem_private_key(
            key_pem, password=None, backend=default_backend())

        self.node_certificate = self._get_crypto_material(local_cert)
        self.node_cert_obj = load_pem_x509_certificate(
            self.node_certificate, default_backend())
        self.node_cert_pem = self.node_cert_obj.public_bytes(
            serialization.Encoding.PEM)

        # Validation side: the CA root goes into a pyOpenSSL store.
        ca_pem = self._get_crypto_material(ca_cert)
        trust_anchor = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
        self.store = crypto.X509Store()
        self.store.add_cert(trust_anchor)

        self.controller_name_re = controller_name_re
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def generate_x509_fingerprint(pem_key):
    """Return the SHA-1 fingerprint of a PEM certificate as hex pairs.

    Text input is encoded as UTF-8 first. The result is lower-case hex with
    a colon between each pair of digits.

    :param pem_key: PEM-encoded certificate, as text or bytes
    :raises exception.InvalidKeypair: if a ValueError, TypeError or
        binascii.Error occurs while computing the fingerprint
    """
    try:
        if isinstance(pem_key, six.text_type):
            pem_bytes = pem_key.encode('utf-8')
        else:
            pem_bytes = pem_key
        certificate = x509.load_pem_x509_certificate(
            pem_bytes, backends.default_backend())
        hex_digest = binascii.hexlify(certificate.fingerprint(hashes.SHA1()))
        if six.PY3:
            hex_digest = hex_digest.decode('ascii')
        # Pair up even- and odd-indexed digits: "abcd" -> "ab:cd".
        digit_pairs = zip(hex_digest[::2], hex_digest[1::2])
        return ':'.join(high + low for high, low in digit_pairs)
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def get_google_public_cert_key():
    """Download the certificate at GOOGLE_PUBLIC_CERT_URL and return its public key.

    An HTTP error status raises through ``raise_for_status``.
    """
    response = requests.get(GOOGLE_PUBLIC_CERT_URL)
    response.raise_for_status()

    # The response text is the PEM certificate.
    pem_bytes = response.text.encode('utf-8')
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    return certificate.public_key()
项目:lokey    作者:jpf    | 项目源码 | 文件源码
def deserialize(self, data):
        """Store the ``e`` and ``n`` public numbers of a PEM certificate's key.

        :param data: PEM-encoded certificate data
        """
        certificate = x509.load_pem_x509_certificate(data, default_backend())
        numbers = certificate.public_key().public_numbers()
        self._e = numbers.e
        self._n = numbers.n
项目:eetcz    作者:versatilecz    | 项目源码 | 文件源码
def _verify(self):
        '''
        Check whether the message carries a valid signature.

        Looks up the wsse:BinarySecurityToken, ds:SignedInfo and
        ds:SignatureValue elements under self.root, wraps the token text in
        PEM armour to load it as a certificate, and verifies the decoded
        signature value over the canonicalized SignedInfo using PKCS#1 v1.5
        padding and SHA-256.

        Returns True when verification completes without an exception. Any
        exception raised along the way is logged with logger.exception and
        False is returned.
        '''
        try:
            # Canonicalize the SOAP body (exclusive c14n, comments dropped) and
            # compute the base64-encoded SHA-256 digest of it.
            # NOTE(review): `digest` is not read again in this method, so the
            # body digest is never compared with anything here - confirm
            # whether a comparison against the signed digest value is missing.
            body_c14n = etree.tostring(self.body, method='c14n', exclusive=True, with_comments=False)
            sha256 = hashlib.sha256(body_c14n)
            digest = b64encode(sha256.digest())

            # Locate the certificate token and the signature elements.
            cert = self.root.find('.//wsse:BinarySecurityToken', namespaces=NSMAP)
            sig_info = self.root.find('.//ds:SignedInfo', namespaces=NSMAP)
            sig_value = self.root.find('.//ds:SignatureValue', namespaces=NSMAP)

            # All three elements must be present. A failed assert lands in the
            # except block below and results in False.
            # NOTE(review): asserts are removed under `python -O`.
            assert cert is not None
            assert sig_info is not None
            assert sig_value is not None

            # Canonicalize SignedInfo; this is the data the signature covers.
            sig_info_c14n = etree.tostring(sig_info, method='c14n', exclusive=True, with_comments=False)

            # Re-wrap the token text at 64 columns inside PEM armour and load it.
            cert = '\n'.join(['-----BEGIN CERTIFICATE-----'] + textwrap.wrap(cert.text, 64) + ['-----END CERTIFICATE-----\n'])
            cert = load_pem_x509_certificate(cert.encode('utf-8'), default_backend())
            key = cert.public_key()

            # Verify the base64-decoded signature value over SignedInfo.
            verifier = key.verifier(b64decode(sig_value.text), padding.PKCS1v15(), hashes.SHA256())
            verifier.update(sig_info_c14n)
            # verify() raises when the signature does not match.
            verifier.verify()

            return True

        except Exception as e:
            logger.exception(e)

        # Reached only after an exception was logged above.
        return False
项目:trustme    作者:python-trio    | 项目源码 | 文件源码
def test_basics():
    """Exercise CA creation and server certificate issuance end to end.

    Covers the CA certificate (validity window, self-issued, critical
    BasicConstraints with ca set), the ValueError on issuing without
    hostnames, and the issued server certificate (PEM blobs, validity window,
    issuer and SubjectAlternativeName DNS names).
    """
    authority = CA()

    now = datetime.datetime.today()

    assert b"BEGIN CERTIFICATE" in authority.cert_pem.bytes()

    ca_cert = x509.load_pem_x509_certificate(
        authority.cert_pem.bytes(), default_backend())
    assert ca_cert.not_valid_before <= now <= ca_cert.not_valid_after

    # The CA certificate is self-issued and flagged as a CA.
    assert ca_cert.issuer == ca_cert.subject
    constraints = ca_cert.extensions.get_extension_for_class(x509.BasicConstraints)
    assert constraints.value.ca == True
    assert constraints.critical == True

    # Issuing without any hostname is rejected.
    with pytest.raises(ValueError):
        authority.issue_server_cert()

    issued = authority.issue_server_cert(u"test-1.example.org", u"test-2.example.org")

    assert b"PRIVATE KEY" in issued.private_key_pem.bytes()
    assert b"BEGIN CERTIFICATE" in issued.cert_chain_pems[0].bytes()
    assert len(issued.cert_chain_pems) == 1
    assert issued.private_key_pem.bytes() in issued.private_key_and_cert_chain_pem.bytes()
    for chain_blob in issued.cert_chain_pems:
        assert chain_blob.bytes() in issued.private_key_and_cert_chain_pem.bytes()

    server_cert = x509.load_pem_x509_certificate(
        issued.cert_chain_pems[0].bytes(), default_backend())

    assert server_cert.not_valid_before <= now <= server_cert.not_valid_after
    assert server_cert.issuer == ca_cert.subject

    alt_names = server_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    dns_names = alt_names.value.get_values_for_type(x509.DNSName)
    assert dns_names == [u"test-1.example.org", u"test-2.example.org"]
项目:sistemats    作者:timendum    | 项目源码 | 文件源码
def __load_cert_from_file(filename):
        """Return the public key of the PEM certificate stored in ``filename``."""
        with open(filename, "rb") as pem_file:
            certificate = load_pem_x509_certificate(pem_file.read(), default_backend())
            public_key = certificate.public_key()
        return public_key
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def extract_rsa_key_from_x509_cert(pem):
    """Return the public key of a PEM-encoded certificate.

    :param pem: PEM-encoded certificate data
    """
    certificate = load_pem_x509_certificate(pem, backend)
    public_key = certificate.public_key()
    return public_key
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def from_primitive(cls, pem_string, session=None):
        """Build an instance from a PEM-encoded X509 certificate.

        :param pem_string: PEM certificate; passed through ``utils.SmartStr``
            before parsing
        :param session: forwarded to the constructor
        :returns: the value of ``from_raw_key`` called on a new instance with
            the loaded certificate
        :raises CipherError: if loading raises ``TypeError``, ``ValueError``
            or ``UnsupportedAlgorithm``
        """
        result = cls(session=session)
        try:
            return result.from_raw_key(x509.load_pem_x509_certificate(
                utils.SmartStr(pem_string), backend=openssl.backend))
        except (TypeError, ValueError, exceptions.UnsupportedAlgorithm) as e:
            raise CipherError("X509 Certificate invalid: %s" % e)
        # The former trailing ``return result`` was unreachable: the try
        # block either returns or raises.
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def _match_certificate(matcher):
    """Build a matcher that applies ``matcher`` to a parsed certificate.

    The result matches anything that is not a ``Certificate`` instance;
    otherwise the object's bytes are loaded as a PEM certificate and the
    loaded certificate is checked with ``matcher``.
    """
    not_a_certificate = Not(IsInstance(Certificate))
    parsed_matches = AfterPreprocessing(
        lambda pem_object: x509.load_pem_x509_certificate(
            pem_object.as_bytes(), default_backend()),
        matcher)
    return MatchesAny(not_a_certificate, parsed_matches)
项目:humancrypto    作者:iffy    | 项目源码 | 文件源码
def load(cls, data=None, filename=None):
        """Create a ``Certificate`` from PEM data or from a PEM file.

        When ``filename`` is given, the file's contents replace ``data``.

        :param data: PEM-encoded certificate bytes
        :param filename: path of a PEM file to read instead
        :returns: a ``Certificate`` wrapping the loaded x509 certificate
        """
        pem_bytes = data
        if filename is not None:
            with open(filename, 'rb') as handle:
                pem_bytes = handle.read()
        parsed = x509.load_pem_x509_certificate(pem_bytes, default_backend())
        return Certificate(parsed)
项目:humancrypto    作者:iffy    | 项目源码 | 文件源码
def test_self_signed_cert(self):
        """A self-signed cert has the requested common name as subject and issuer.

        Checked on the returned ``Certificate`` wrapper and again on the
        dumped PEM after parsing it with cryptography.
        """
        private_key = PrivateKey.create()
        wrapped = private_key.self_signed_cert({'common_name': u'jose'})
        assert isinstance(wrapped, Certificate)
        assert wrapped.subject.attribs['common_name'] == u'jose'
        assert wrapped.issuer.attribs['common_name'] == u'jose'

        parsed = x509.load_pem_x509_certificate(
            wrapped.dump(), default_backend()
        )
        subject_names = parsed.subject.get_attributes_for_oid(
            NameOID.COMMON_NAME)
        assert subject_names[0].value == u'jose'
        issuer_names = parsed.issuer.get_attributes_for_oid(
            NameOID.COMMON_NAME)
        assert issuer_names[0].value == u'jose'
项目:IoT_pki    作者:zibawa    | 项目源码 | 文件源码
def loadPEMCert(pathToCert):
    """Read the file at ``pathToCert`` and return it as a loaded PEM certificate."""
    with open(pathToCert, 'rb') as pem_file:
        pem_bytes = pem_file.read()
        return x509.load_pem_x509_certificate(pem_bytes, default_backend())
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __string_to_cert(s, pkg_hash=None):
                """Convert a string to a X509 cert.

                's' is converted with misc.force_bytes() and parsed as a PEM
                certificate.  If parsing raises ValueError, BadFileFormat is
                raised instead; its message names 'pkg_hash' when one is
                given, and otherwise includes the string itself."""

                try:
                        return x509.load_pem_x509_certificate(
                            misc.force_bytes(s), default_backend())
                except ValueError:
                        if pkg_hash is not None:
                                raise api_errors.BadFileFormat(_("The file "
                                    "with hash {0} was expected to be a PEM "
                                    "certificate but it could not be "
                                    "read.").format(pkg_hash))
                        # Format after translating, as in the branch above, so
                        # that the message catalog is looked up with the
                        # constant template rather than the formatted text.
                        raise api_errors.BadFileFormat(_("The following string "
                            "was expected to be a PEM certificate, but it "
                            "could not be parsed as such:\n{0}").format(s))
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __get_certs_by_name(self, name):
                """Given 'name', a Cryptography 'Name' object, return the certs
                with that name as a subject.

                Certificates are read from files named '<sha1 of name>.<n>'
                under self.__subj_root, for n = 0, 1, 2, ... until a file
                cannot be opened.  Entries stored in self.__issuers under the
                same hash are appended to the result."""

                res = []
                count = 0
                # File names are keyed on the SHA-1 hex digest of the name.
                name_hsh = hashlib.sha1(misc.force_bytes(name)).hexdigest()

                def load_cert(pth):
                        # Read one PEM certificate file and parse it.
                        with open(pth, "rb") as f:
                                return x509.load_pem_x509_certificate(
                                    f.read(), default_backend())

                try:
                        # Keep loading consecutive files; the loop only ends
                        # when load_cert raises.
                        while True:
                                pth = os.path.join(self.__subj_root,
                                    "{0}.{1}".format(name_hsh, count))
                                res.append(load_cert(pth))
                                count += 1
                except EnvironmentError as e:
                        # When switching to a different hash algorithm, the hash
                        # name of file changes so that we couldn't find the
                        # file. We try harder to rebuild the subject's metadata
                        # if it's the first time we fail (count == 0).
                        if count == 0 and e.errno == errno.ENOENT:
                                self.__rebuild_subj_root()
                                try:
                                        # Single retry of the same path; only
                                        # this one file is attempted again.
                                        res.append(load_cert(pth))
                                except EnvironmentError as ex:
                                        # Still missing is acceptable; any
                                        # other error propagates.
                                        if ex.errno != errno.ENOENT:
                                                raise

                        # NOTE(review): presumably _convert_error returns
                        # nothing for the listed errnos (ENOENT) and an
                        # exception to raise otherwise - confirm in api_errors.
                        t = api_errors._convert_error(e,
                            [errno.ENOENT])
                        if t:
                                raise t
                res.extend(self.__issuers.get(name_hsh, []))
                return res
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def calc_pem_hash(pth):
                """Return the SHA-1 hex digest of the certificate stored at
                'pth', computed over its re-serialized PEM representation."""
                with open(pth, "rb") as pem_file:
                        parsed = x509.load_pem_x509_certificate(
                            pem_file.read(), default_backend())
                pem_bytes = parsed.public_bytes(serialization.Encoding.PEM)
                return hashlib.sha1(pem_bytes).hexdigest()
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __make_tmp_cert(d, pth):
        """Write a re-serialized copy of the PEM certificate at 'pth' to a
        new temporary file in directory 'd' and return that file's path.

        BadFileFormat is raised if the source file cannot be read or parsed."""
        try:
                with open(pth, "rb") as src:
                        parsed = x509.load_pem_x509_certificate(src.read(),
                            default_backend())
        except (ValueError, IOError):
                raise api_errors.BadFileFormat(_("The file {0} was expected to "
                    "be a PEM certificate but it could not be read.").format(
                    pth))
        descriptor, tmp_path = tempfile.mkstemp(dir=d)
        pem_bytes_encoding = serialization.Encoding.PEM
        with os.fdopen(descriptor, "wb") as out:
                out.write(parsed.public_bytes(pem_bytes_encoding))
        return tmp_path
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def getCertFromBytes(pem_data):
    """Parse PEM bytes into an x509 certificate using the module's ``backend``."""
    certificate = x509.load_pem_x509_certificate(pem_data, backend)
    return certificate
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def generate_x509_fingerprint(pem_key):
    """Compute the colon-separated SHA-1 fingerprint of a PEM certificate.

    Text input is encoded as UTF-8 before parsing. The fingerprint is
    returned as lower-case hex digit pairs joined by colons.

    :param pem_key: PEM-encoded certificate, as text or bytes
    :raises exception.InvalidKeypair: if a ValueError, TypeError or
        binascii.Error occurs while computing the fingerprint
    """
    try:
        pem_bytes = pem_key
        if isinstance(pem_bytes, six.text_type):
            pem_bytes = pem_bytes.encode('utf-8')
        certificate = x509.load_pem_x509_certificate(
            pem_bytes, backends.default_backend())
        sha1_digest = certificate.fingerprint(hashes.SHA1())
        hex_digest = binascii.hexlify(sha1_digest)
        if six.PY3:
            hex_digest = hex_digest.decode('ascii')
        # Even-indexed digits are paired with the odd-indexed ones after them.
        evens, odds = hex_digest[::2], hex_digest[1::2]
        return ':'.join(first + second for first, second in zip(evens, odds))
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)
项目:perkele    作者:schors    | 项目源码 | 文件源码
def load_pem_certificate(data):
    """
    Return the X.509 certificate parsed from PEM ``data`` with the default backend.
    """
    backend = default_backend()
    return x509.load_pem_x509_certificate(data, backend)
项目:django-auth-adfs    作者:jobec    | 项目源码 | 文件源码
def _load_from_string(cls, certificate):
        """
        Add the public key of a PEM certificate string to the known keys.

        On success the key is appended to ``cls._public_keys`` and
        ``cls._key_age`` is set to the current time.

        Args:
            certificate (str): A base64 PEM encoded certificate

        Raises:
            ImproperlyConfigured: if a ValueError occurs while loading
        """
        pem_bytes = certificate.encode()
        try:
            public_key = load_pem_x509_certificate(pem_bytes, backend).public_key()
            cls._public_keys.append(public_key)
            cls._key_age = datetime.now()
        except ValueError:
            raise ImproperlyConfigured("Invalid ADFS token signing certificate")
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_generate_wildcard_pem_bytes():
    """
    The PEM data generated for a self-signed wildcard certificate should be
    deserializable, and the deserialized certificate should carry the
    expected parameters.
    """
    # The generated blob is a private key followed by a certificate.
    parsed = pem.parse(generate_wildcard_pem_bytes())
    assert_that(parsed, HasLength(2))
    key_pem = parsed[0]
    cert_pem = parsed[1]

    # Only the key's type is checked here; the remaining key details are
    # left to txacme.
    private_key = serialization.load_pem_private_key(
        key_pem.as_bytes(), password=None, backend=default_backend())
    assert_that(private_key, IsInstance(rsa.RSAPrivateKey))

    # Load the certificate and check every option that was set on it.
    certificate = x509.load_pem_x509_certificate(
        cert_pem.as_bytes(), backend=default_backend())
    expected_before = datetime.today() - timedelta(days=1)
    expected_after = datetime.now() + timedelta(days=3650)

    issuer_matcher = MatchesListwise([MatchesStructure(value=Equals(u'*'))])
    subject_matcher = MatchesListwise([MatchesStructure(value=Equals(u'*'))])
    certificate_matcher = MatchesStructure(
        issuer=issuer_matcher,
        subject=subject_matcher,
        not_valid_before=matches_time_or_just_before(expected_before),
        not_valid_after=matches_time_or_just_before(expected_after),
        signature_hash_algorithm=IsInstance(hashes.SHA256),
    )
    assert_that(certificate, certificate_matcher)

    # The certificate must wrap the public half of the generated key.
    key_numbers = private_key.public_key().public_numbers()
    assert_that(certificate.public_key().public_numbers(),
                Equals(key_numbers))


# From txacme
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def verify_and_load_message(config, file, only_accept_cn=None):
        '''Verify an S/MIME-signed message against our CA chain and load it.

        The message is verified by shelling out to ``openssl smime -verify``
        with ``config.ssl_cafile`` as the only trust anchor. The signer's
        certificate is written by openssl to a temporary file, from which
        the signer's common name (CN) is read.

        :param config: object providing ``ssl_cafile`` and ``logger``; it is
            also passed to ``IngestMessage``.
        :param file: path of the signed message to verify.
        :param only_accept_cn: if not None, the signer's CN must equal this
            value or the message is rejected.
        :returns: the loaded ``IngestMessage``, or None if the message was
            rejected (openssl verification failed, the signer certificate
            has no CN, or the CN does not match ``only_accept_cn``).
        '''
        # Create the temporary file before entering the try block so that the
        # cleanup in ``finally`` never touches an unbound name when mkstemp()
        # itself fails.
        msg_fd, signer_pem = tempfile.mkstemp()
        try:
            os.close(msg_fd)  # openssl writes the file; we only read it back

            # CApath is required to force out any system CAs that might be in
            # the global CNF file
            ossl_verify_cmd = ["openssl", "smime", "-verify",
                               "-in", file, "-CAfile", config.ssl_cafile,
                               "-CApath", "/dev/nonexistant-dir",
                               "-text", "-signer", signer_pem]

            ossl_verify_proc = subprocess.run(
                args=ossl_verify_cmd, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE, check=False)

            if ossl_verify_proc.returncode != 0:
                # ``warning`` is used because ``warn`` is a deprecated alias.
                config.logger.warning(
                    "rejecting %s: %s", file, str(ossl_verify_proc.stderr))
                return None

            config.logger.info("%s passed openssl S/MIME verify", file)

            config.logger.debug("checking %s", signer_pem)
            with open(signer_pem, 'rb') as x509_signer:
                # NOW we can use cryptography to read the x509 certificates
                cert = x509.load_pem_x509_certificate(
                    x509_signer.read(), default_backend())

            cn_attributes = cert.subject.get_attributes_for_oid(
                x509.NameOID.COMMON_NAME)
            if not cn_attributes:
                # Without a CN we cannot identify the signer; treat it as a
                # rejection instead of failing with an IndexError.
                config.logger.error(
                    "rejecting %s: signer certificate has no common name",
                    file)
                return None

            common_name = cn_attributes[0].value
            config.logger.debug(
                "signed message came from common name: %s", common_name)

            # Make sure the CN is something we're willing to accept. The
            # message must actually be dropped here: logging alone would let
            # a message from an unexpected signer through.
            if only_accept_cn is not None and common_name != only_accept_cn:
                config.logger.error("rejecting message due to CN %s != %s",
                                    common_name, only_accept_cn)
                return None

            # The verified payload is what openssl printed on stdout.
            decoded_message = ossl_verify_proc.stdout
            message = IngestMessage(config)
            message.load_from_yaml(decoded_message)

            return message

        # and clean up after ourselves
        finally:
            os.remove(signer_pem)
项目:analyst-scripts    作者:Te-k    | 项目源码 | 文件源码
def check_certificate(self, domain):
        """
        Download and get information from the TLS certificate

        Fetches the certificate served by ``domain`` on port 443, optionally
        saves it as ``cert.pem`` under ``self.output``, and logs its subject,
        validity period, issuer, serial number and selected extensions via
        ``self.log``.
        """
        pem = ssl.get_server_certificate((domain, 443))
        # get_server_certificate() returns text on Python 3, while both the
        # binary file write and the certificate parser need bytes. PEM is
        # pure ASCII, so the encoding is lossless.
        pem_bytes = pem if isinstance(pem, bytes) else pem.encode('ascii')

        if self.output:
            with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
                f.write(pem_bytes)

        cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
        self.log.critical("\tCertificate:")
        self.log.critical(
            "\t\tDomain: %s", ",".join(attr.value for attr in cert.subject))
        self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
        self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
        self.log.critical(
            "\t\tCA Issuer: %s", ", ".join(attr.value for attr in cert.issuer))
        self.log.critical("\t\tSerial: %s", cert.serial_number)
        for ext in cert.extensions:
            # Dispatch on the public extension classes rather than on the
            # private ``oid._name`` attribute.
            if isinstance(ext.value, x509.BasicConstraints):
                if ext.value.ca:
                    self.log.critical("\t\tBasic Constraints: True")
            elif isinstance(ext.value, x509.SubjectAlternativeName):
                self.log.critical(
                    "\t\tAlternate names: %s",
                    ", ".join(ext.value.get_values_for_type(x509.DNSName)))
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def issue_certificate(cn, ca_cert, ca_key,
                      organizations=(),
                      san_dns=(),
                      san_ips=(),
                      key_size=2048,
                      certify_days=365,
                      is_web_server=False,
                      is_web_client=False):
    """Generate an RSA key and a certificate for it signed by the given CA.

    :param cn: common name for the new certificate's subject.
    :param ca_cert: PEM-encoded CA certificate.
    :param ca_key: PEM-encoded CA private key (loaded without a password).
    :param organizations: organization names added to the subject.
    :param san_dns: DNS names for the subjectAltName extension.
    :param san_ips: IP addresses (anything ``ipaddress.ip_address`` accepts)
        for the subjectAltName extension.
    :param key_size: size in bits of the generated RSA key.
    :param certify_days: validity period in days, starting now.
    :param is_web_server: add the serverAuth extended key usage.
    :param is_web_client: add the clientAuth extended key usage.
    :returns: ``(cert, key)``, both PEM-encoded; the key is unencrypted and
        in TraditionalOpenSSL format.
    """
    ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend())
    ca_key = serialization.load_pem_private_key(
        ca_key, password=None, backend=default_backend())
    ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key())

    key = rsa.generate_private_key(
        public_exponent=65537, key_size=key_size, backend=default_backend())

    subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)]
    subject_name_attributes += [
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org)
        for org in organizations
    ]
    subject = x509.Name(subject_name_attributes)

    # The authority key identifier names the *CA's certificate* by that
    # certificate's own issuer and serial number, so ca_cert.issuer is the
    # right value here.
    authority_key_id = x509.AuthorityKeyIdentifier(
        ca_key_id.digest,
        [x509.DirectoryName(ca_cert.issuer)],
        ca_cert.serial_number)
    key_usage = x509.KeyUsage(digital_signature=True,
                              content_commitment=False,
                              key_encipherment=True,
                              data_encipherment=False,
                              key_agreement=False,
                              key_cert_sign=False,
                              crl_sign=False,
                              encipher_only=False,
                              decipher_only=False)

    now = datetime.datetime.utcnow()
    # The issuer of the new certificate is the CA itself, i.e. the CA
    # certificate's *subject*. Using the CA's issuer only happens to work
    # for self-signed roots and breaks chain building when the signer is an
    # intermediate CA.
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(ca_cert.subject)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=certify_days))
        .add_extension(authority_key_id, critical=False)
        .add_extension(key_usage, critical=True)
    )

    extended_usages = []
    if is_web_server:
        extended_usages.append(ExtendedKeyUsageOID.SERVER_AUTH)
    if is_web_client:
        extended_usages.append(ExtendedKeyUsageOID.CLIENT_AUTH)
    if extended_usages:
        cert = cert.add_extension(
            x509.ExtendedKeyUsage(extended_usages), critical=False)

    sans = [x509.DNSName(name) for name in san_dns]
    sans += [x509.IPAddress(ipaddress.ip_address(ip)) for ip in san_ips]
    if sans:
        cert = cert.add_extension(
            x509.SubjectAlternativeName(sans), critical=False)

    cert = cert.sign(ca_key, hashes.SHA256(), default_backend())

    cert = cert.public_bytes(serialization.Encoding.PEM)
    key = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    return cert, key