Python cryptography.x509 模块,load_der_x509_certificate() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用cryptography.x509.load_der_x509_certificate()

项目:cursive    作者:openstack    | 项目源码 | 文件源码
def load_certificate(self, cert_name):
        """Read a certificate file from ``self.cert_path`` and parse it.

        The file content is parsed as PEM first; if that raises, it is parsed
        as DER instead.

        :param cert_name: file name, joined onto ``self.cert_path``
        :returns: the object returned by the x509 loader that succeeded
        :raises: SignatureVerificationError if neither encoding parses
        """
        cert_location = os.path.join(self.cert_path, cert_name)
        with open(cert_location, 'rb') as handle:
            raw_bytes = handle.read()

        # The DER attempt stays nested inside the PEM handler so the final
        # error is raised while both parse failures are still being handled.
        try:
            return x509.load_pem_x509_certificate(raw_bytes, default_backend())
        except Exception:
            try:
                return x509.load_der_x509_certificate(
                    raw_bytes, default_backend())
            except Exception:
                raise exception.SignatureVerificationError(
                    "Failed to load certificate: %s" % cert_location
                )
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def verify_peer_token(self, peer_token, peer, peer_type):
        """Check a hex peer token against the CA certificate.

        The first 16 hex characters of ``peer_token`` are a millisecond
        timestamp; the rest is the hex signature. The signed payload is
        rebuilt from the peer's id/target/group id, ``peer_type``, the
        timestamp bytes and the DER public key from ``peer.cert``.

        :param peer_token: hex string, timestamp followed by signature
        :param peer: object with ``peer_id``, ``target``, ``group_id`` and a
            hex-encoded DER certificate in ``cert``
        :param peer_type: integer appended to the payload as a single byte
        :returns: False when the current time is past the token timestamp,
            otherwise the result of ``self.verify_data_with_cert``
        """
        hex_timestamp, hex_signature = peer_token[:16], peer_token[16:]
        now_ms = int(datetime.datetime.now().timestamp() * 1000)
        token_ms = int(hex_timestamp, 16)
        if now_ms > token_ms:
            return False

        timestamp_bytes = bytes.fromhex(hex_timestamp)

        encoded_identity = [peer.peer_id.encode('utf-8'),
                            peer.target.encode('utf-8'),
                            peer.group_id.encode('utf-8')]
        peer_info = b''.join(encoded_identity) + bytes([peer_type])

        peer_cert = x509.load_der_x509_certificate(
            bytes.fromhex(peer.cert), default_backend())
        peer_pub = peer_cert.public_key().public_bytes(
            encoding=serialization.Encoding.DER,
            format=PublicFormat.SubjectPublicKeyInfo)

        # Payload layout: peer_info || timestamp || peer public key (DER).
        token_bytes = peer_info + timestamp_bytes + peer_pub
        logging.debug("TBS Token(V) : %s", token_bytes.hex())
        signature = bytes.fromhex(hex_signature)

        return self.verify_data_with_cert(
            cert=self.__ca_cert, data=token_bytes, signature=signature)
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def load_certificate(cert_file=None, cert_bytes=None):
    """Load an X.509 certificate from a file path or from raw bytes.

    The encoding is detected from the content: data containing
    ``-----BEGIN`` is parsed as PEM, anything else as DER.

    :param cert_file: path of the certificate file; used when truthy
    :param cert_bytes: raw certificate bytes, used when ``cert_file`` is
        not given
    :returns: the object returned by the matching x509 loader
    :raises TypeError: if neither ``cert_file`` nor ``cert_bytes`` supplies
        any data
    """
    if cert_file:
        with open(cert_file, 'rb') as f:
            data = f.read()
    else:
        data = cert_bytes

    # Without this check the membership test below fails on None with an
    # unhelpful "argument of type 'NoneType' is not iterable" TypeError.
    if data is None:
        raise TypeError(
            'load_certificate() requires either cert_file or cert_bytes'
        )

    if b'-----BEGIN' in data:
        loader = x509.load_pem_x509_certificate
    else:
        loader = x509.load_der_x509_certificate

    return loader(data=data, backend=default_backend())
项目:reston    作者:zeaphoo    | 项目源码 | 文件源码
def get_certificate(self, filename):
        """
            Return a certificate object by giving the name in the apk file

            The content returned by ``self.get_file(filename)`` is decoded,
            element ``[1][3]`` of the decoded message is re-encoded, its
            outer tag/length header is stripped, and the remainder is parsed
            as a DER X.509 certificate.
        """
        pkcs7message = self.get_file(filename)

        message, _ = decode(pkcs7message)
        cert = encode(message[1][3])

        # Strip the leading header: byte 0 is the identifier, byte 1 the
        # length. When the 0x80 bit of byte 1 is set (long form), its low
        # seven bits give how many additional length bytes follow.
        length_octet = cert[1]
        if length_octet & 0x80 > 1:
            header_size = 2 + (length_octet & 0x7F)
        else:
            header_size = 2
        cert = cert[header_size:]

        return x509.load_der_x509_certificate(cert, default_backend())
项目:concorde    作者:frutiger    | 项目源码 | 文件源码
def get_certificate_chain(self, certificate):
        """Fetch the issuer certificate linked from a certificate URL.

        :param certificate: URL of the certificate resource
        :returns: the DER certificate parsed from the ``up`` link target, or
            None when the certificate response has no ``up`` link
        :raises ClientError: if either HTTP request does not return 200
        """
        cert = requests.get(certificate)
        if cert.status_code != 200:
            raise ClientError('Certificate fetch failed: {}'.format(
                                                        cert.json()['detail']))

        if 'up' not in cert.links:
            return None

        # The link target may be relative, so resolve it against the
        # certificate URL.
        chain_url = urllib.parse.urljoin(certificate, cert.links['up']['url'])
        chain = requests.get(chain_url)
        if chain.status_code != 200:
            # Report the detail of the failed chain response, not of the
            # (successful) certificate response.
            raise ClientError('Certificate chain fetch failed: {}'.format(
                                                        chain.json()['detail']))

        return x509.load_der_x509_certificate(chain.content, backend)
项目:manuale    作者:veeti    | 项目源码 | 文件源码
def load_der_certificate(data):
    """
    Parse DER-encoded data into an X.509 certificate object.
    """
    certificate = x509.load_der_x509_certificate(data, default_backend())
    return certificate
项目:exe    作者:malice-plugins    | 项目源码 | 文件源码
def _ValidatePubkeyGeneric(self, signing_cert, digest_alg, payload,
                             enc_digest):
    """Verify ``enc_digest`` as a signature over ``payload``.

    The public key comes from ``signing_cert`` after DER-encoding it. RSA
    keys are verified with PKCS#1 v1.5 padding; other key types are verified
    without a padding argument.

    :param signing_cert: certificate structure accepted by
        ``der_encoder.encode``
    :param digest_alg: unused by this implementation
    :param payload: the bytes that were signed
    :param enc_digest: the signature to check
    :returns: True if verification succeeds, False if it raises
    """
    # NOTE(review): the hash passed to the verifier is the one the
    # certificate itself was signed with, not ``digest_alg`` -- confirm this
    # is intended.
    cert = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend())
    pubkey = cert.public_key()
    if isinstance(pubkey, RSAPublicKey):
        verifier = pubkey.verifier(enc_digest, padding.PKCS1v15(), cert.signature_hash_algorithm)
    else:
        verifier = pubkey.verifier(enc_digest, cert.signature_hash_algorithm)
    verifier.update(payload)
    try:
        verifier.verify()
    except Exception:
        # Any verification error means "not valid". Unlike a bare except,
        # KeyboardInterrupt and SystemExit still propagate.
        return False
    return True
项目:exe    作者:malice-plugins    | 项目源码 | 文件源码
def ValidateCertificateSignature(self, signed_cert, signing_cert):
    """Check that ``signed_cert`` carries a valid signature by ``signing_cert``.

    Only the signature is checked (with PKCS#1 v1.5 padding); expiry and key
    usage are not examined. Returns None on success and raises Asn1Error
    when verification fails.
    """
    subject_cert = x509.load_der_x509_certificate(
        der_encoder.encode(signed_cert), default_backend())
    issuer_cert = x509.load_der_x509_certificate(
        der_encoder.encode(signing_cert), default_backend())

    issuer_key = issuer_cert.public_key()
    checker = issuer_key.verifier(
        subject_cert.signature,
        padding.PKCS1v15(),
        subject_cert.signature_hash_algorithm)
    # The signature covers the to-be-signed portion of the certificate.
    checker.update(subject_cert.tbs_certificate_bytes)
    try:
        checker.verify()
    except Exception as e:
        raise Asn1Error('1: Validation of cert signature failed: {}'.format(e))
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def convert_x509cert(self, cert_bytes):
        """Convert DER certificate bytes into an x509 certificate object.

        :param cert_bytes: DER-encoded certificate bytes
        :return: the parsed x509 certificate object
        """
        parsed_cert = x509.load_der_x509_certificate(cert_bytes, default_backend())
        return parsed_cert
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def verify_data_with_dercert(self, cert_der, data: bytes, signature: bytes) -> bool:
        """Verify a signature over ``data`` using a DER-encoded certificate.

        :param cert_der: certificate as DER bytes
        :param data: the data that was signed
        :param signature: the signature to check
        :return: the result of ``self.verify_data_with_cert`` for the parsed
            certificate
        """
        parsed_cert = x509.load_der_x509_certificate(cert_der, default_backend())
        return self.verify_data_with_cert(
            cert=parsed_cert,
            data=data,
            signature=signature,
        )
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def verify_certificate_der(self, der_cert):
        """Parse a DER-encoded certificate and verify it.

        :param der_cert: certificate as DER bytes
        :return: the result of ``self.verify_certificate`` for the parsed
            certificate
        """
        parsed_cert = x509.load_der_x509_certificate(der_cert, default_backend())
        verification_result = self.verify_certificate(parsed_cert)
        return verification_result
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def generate_peer_token(self, peer_sign, peer_cert, peer_id, peer_target,
                            group_id, peer_type, rand_key, token_interval):
        """Issue a signed, time-limited token for a peer.

        ``peer_sign`` is first verified against the peer's identity fields
        plus ``rand_key`` using the peer's own certificate. On success a
        token is built from an 8-byte big-endian millisecond timestamp
        ``token_interval`` minutes from now, followed by this node's
        signature over identity || timestamp || peer public key (DER).

        :param peer_sign: peer's signature over its identity and ``rand_key``
        :param peer_cert: peer certificate as DER bytes
        :param peer_id: peer id string
        :param peer_target: peer target string
        :param group_id: group id string
        :param peer_type: integer encoded as a single byte
        :param rand_key: bytes appended to the identity for the peer signature
        :param token_interval: token lifetime in minutes
        :return: the token as a hex string, or None if ``peer_sign`` does not
            verify
        """
        identity_parts = (peer_id, peer_target, group_id)
        peer_info = (b''.join(part.encode('utf-8') for part in identity_parts)
                     + bytes([peer_type]))
        challenge = peer_info + rand_key

        cert = x509.load_der_x509_certificate(peer_cert, default_backend())
        if not self.verify_data_with_cert(cert=cert, data=challenge, signature=peer_sign):
            logging.debug('?? ?? ?? ?? ??? ?? ??')
            return None

        expires_at = datetime.datetime.now() + datetime.timedelta(minutes=token_interval)
        date = int(expires_at.timestamp() * 1000).to_bytes(length=8, byteorder='big')

        peer_pub = cert.public_key().public_bytes(
            encoding=serialization.Encoding.DER,
            format=PublicFormat.SubjectPublicKeyInfo)

        # token_bytes = peer_id || peer_target || group_id || peer_type
        #               || date || peer_pub
        token_bytes = peer_info + date + peer_pub
        logging.debug("TBS Token[%s]", token_bytes.hex())

        # token = date || signature over token_bytes
        signed_token = self.sign_data(token_bytes)
        return b''.join([date, signed_token]).hex()
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def _ValidatePubkeyGeneric(self, signing_cert, digest_alg, payload,
                             enc_digest):
    """Verify ``enc_digest`` as a signature over ``payload``.

    The public key comes from ``signing_cert`` after DER-encoding it. RSA
    keys are verified with PKCS#1 v1.5 padding; other key types are verified
    without a padding argument.

    :param signing_cert: certificate structure accepted by
        ``der_encoder.encode``
    :param digest_alg: unused by this implementation
    :param payload: the bytes that were signed
    :param enc_digest: the signature to check
    :returns: True if verification succeeds, False if it raises
    """
    # NOTE(review): the hash passed to the verifier is the one the
    # certificate itself was signed with, not ``digest_alg`` -- confirm this
    # is intended.
    cert = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend())
    pubkey = cert.public_key()
    if isinstance(pubkey, RSAPublicKey):
        verifier = pubkey.verifier(enc_digest, padding.PKCS1v15(), cert.signature_hash_algorithm)
    else:
        verifier = pubkey.verifier(enc_digest, cert.signature_hash_algorithm)
    verifier.update(payload)
    try:
        verifier.verify()
    except Exception:
        # Any verification error means "not valid". Unlike a bare except,
        # KeyboardInterrupt and SystemExit still propagate.
        return False
    return True
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def ValidateCertificateSignature(self, signed_cert, signing_cert):
    """Check that ``signed_cert`` carries a valid signature by ``signing_cert``.

    Only the signature is checked (with PKCS#1 v1.5 padding); expiry and key
    usage are not examined. Returns None on success and raises Asn1Error
    when verification fails.
    """
    subject_cert = x509.load_der_x509_certificate(
        der_encoder.encode(signed_cert), default_backend())
    issuer_cert = x509.load_der_x509_certificate(
        der_encoder.encode(signing_cert), default_backend())

    issuer_key = issuer_cert.public_key()
    checker = issuer_key.verifier(
        subject_cert.signature,
        padding.PKCS1v15(),
        subject_cert.signature_hash_algorithm)
    # The signature covers the to-be-signed portion of the certificate.
    checker.update(subject_cert.tbs_certificate_bytes)
    try:
        checker.verify()
    except Exception as e:
        raise Asn1Error('1: Validation of cert signature failed: {}'.format(e))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def get_certificate(context, signature_certificate_uuid):
    """Create the certificate object from the retrieved certificate data.

    The certificate is fetched from the key manager, its format is checked,
    it is parsed from DER and then passed to ``verify_certificate``.

    :param context: the user context for authentication
    :param signature_certificate_uuid: the uuid to use to retrieve the
                                       certificate
    :returns: the certificate cryptography object
    :raises: SignatureVerificationError if the retrieval fails or the format
             is invalid
    """
    keymgr_api = key_manager.API()

    try:
        # The certificate retrieved here is a castellan certificate object
        cert = keymgr_api.get(context, signature_certificate_uuid)
    except KeyManagerError as e:
        # Log the key manager's own error, but surface only a generic
        # message in the exception raised to the caller.
        msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
               % {'id': signature_certificate_uuid,
                  'e': encodeutils.exception_to_unicode(e)})
        LOG.error(msg)
        raise exception.SignatureVerificationError(
            reason=_('Unable to retrieve certificate with ID: %s')
            % signature_certificate_uuid)

    if cert.format not in CERTIFICATE_FORMATS:
        raise exception.SignatureVerificationError(
            reason=_('Invalid certificate format: %s') % cert.format)

    if cert.format != X_509:
        # A format accepted above but with no loader here would otherwise
        # leave ``certificate`` unbound and fail with UnboundLocalError.
        raise exception.SignatureVerificationError(
            reason=_('Invalid certificate format: %s') % cert.format)

    # castellan always encodes certificates in DER format
    cert_data = cert.get_encoded()
    certificate = x509.load_der_x509_certificate(cert_data,
                                                 default_backend())

    # verify the certificate
    verify_certificate(certificate)

    return certificate
项目:perkele    作者:schors    | 项目源码 | 文件源码
def load_der_certificate(data):
    """
    Build an X.509 certificate object from DER-encoded data.
    """
    loaded = x509.load_der_x509_certificate(data, default_backend())
    return loaded
项目:concorde    作者:frutiger    | 项目源码 | 文件源码
def get_certificate(self, certificate):
        """Download a certificate and parse it as DER.

        :param certificate: URL of the certificate resource
        :returns: the X.509 certificate parsed from the response body
        :raises ClientError: if the response status is not 200
        """
        response = requests.get(certificate)
        if response.status_code == 200:
            return x509.load_der_x509_certificate(response.content, backend)

        failure_detail = response.json()['detail']
        raise ClientError('Certificate fetch failed: {}'.format(failure_detail))
项目:cursive    作者:openstack    | 项目源码 | 文件源码
def get_certificate(context, signature_certificate_uuid):
    """Create the certificate object from the retrieved certificate data.

    The certificate is fetched from the key manager, its format is checked,
    and it is parsed from DER.

    :param context: the user context for authentication
    :param signature_certificate_uuid: the uuid to use to retrieve the
                                       certificate
    :returns: the certificate cryptography object
    :raises: SignatureVerificationError if the retrieval fails or the format
             is invalid
    """
    keymgr_api = key_manager.API()

    try:
        # The certificate retrieved here is a castellan certificate object
        cert = keymgr_api.get(context, signature_certificate_uuid)
    except ManagedObjectNotFoundError:
        raise exception.SignatureVerificationError(
            reason=_('Certificate not found with ID: %s')
            % signature_certificate_uuid)
    except KeyManagerError as e:
        # Log the key manager's own error, but surface only a generic
        # message in the exception raised to the caller.
        msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
               % {'id': signature_certificate_uuid,
                  'e': encodeutils.exception_to_unicode(e)})
        LOG.error(msg)
        raise exception.SignatureVerificationError(
            reason=_('Unable to retrieve certificate with ID: %s')
            % signature_certificate_uuid)

    if cert.format not in CERTIFICATE_FORMATS:
        raise exception.SignatureVerificationError(
            reason=_('Invalid certificate format: %s') % cert.format)

    if cert.format != X_509:
        # A format accepted above but with no loader here would otherwise
        # leave ``certificate`` unbound and fail with UnboundLocalError.
        raise exception.SignatureVerificationError(
            reason=_('Invalid certificate format: %s') % cert.format)

    # castellan always encodes certificates in DER format
    cert_data = cert.get_encoded()
    certificate = x509.load_der_x509_certificate(cert_data,
                                                 default_backend())

    return certificate
项目:deb-python-pykmip    作者:openstack    | 项目源码 | 文件源码
def _get_client_identity(self):
        """Derive the client identity from the connection's peer certificate.

        The certificate must carry an extended key usage extension that
        includes client authentication, and its subject must have at least
        one common name. The first common name is returned; a warning is
        logged when there are several.

        :returns: the value of the first subject common name
        :raises: PermissionDenied if the certificate cannot be loaded, the
            extension is missing, client authentication is not listed, or no
            common name is present
        """
        der_bytes = self._connection.getpeercert(binary_form=True)
        try:
            peer_cert = x509.load_der_x509_certificate(
                der_bytes,
                backends.default_backend()
            )
        except Exception:
            # This should never get raised "in theory," as the ssl socket
            # should fail to connect non-TLS connections before the session
            # gets created. This is a failsafe in case that protection fails.
            raise exceptions.PermissionDenied(
                "Failure loading the client certificate from the session "
                "connection. Could not retrieve client identity."
            )

        try:
            eku_extension = peer_cert.extensions.get_extension_for_oid(
                x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
            )
            key_usages = eku_extension.value
        except x509.ExtensionNotFound:
            raise exceptions.PermissionDenied(
                "The extended key usage extension is missing from the client "
                "certificate. Session client identity unavailable."
            )

        if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in key_usages:
            raise exceptions.PermissionDenied(
                "The extended key usage extension is not marked for client "
                "authentication in the client certificate. Session client "
                "identity unavailable."
            )

        common_names = peer_cert.subject.get_attributes_for_oid(
            x509.oid.NameOID.COMMON_NAME
        )
        if len(common_names) == 0:
            raise exceptions.PermissionDenied(
                "The client certificate does not define a subject common "
                "name. Session client identity unavailable."
            )

        if len(common_names) > 1:
            self._logger.warning(
                "Multiple client identities found. Using the first "
                "one processed."
            )
        client_identity = common_names[0].value
        self._logger.info(
            "Session client identity: {0}".format(client_identity)
        )
        return client_identity
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def _get_client_identity(self):
        """Derive the client identity from the connection's peer certificate.

        The certificate must carry an extended key usage extension that
        includes client authentication, and its subject must have at least
        one common name. The first common name is returned; a warning is
        logged when there are several.

        :returns: the value of the first subject common name
        :raises: PermissionDenied if the certificate cannot be loaded, the
            extension is missing, client authentication is not listed, or no
            common name is present
        """
        der_bytes = self._connection.getpeercert(binary_form=True)
        try:
            peer_cert = x509.load_der_x509_certificate(
                der_bytes,
                backends.default_backend()
            )
        except Exception:
            # This should never get raised "in theory," as the ssl socket
            # should fail to connect non-TLS connections before the session
            # gets created. This is a failsafe in case that protection fails.
            raise exceptions.PermissionDenied(
                "Failure loading the client certificate from the session "
                "connection. Could not retrieve client identity."
            )

        try:
            eku_extension = peer_cert.extensions.get_extension_for_oid(
                x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
            )
            key_usages = eku_extension.value
        except x509.ExtensionNotFound:
            raise exceptions.PermissionDenied(
                "The extended key usage extension is missing from the client "
                "certificate. Session client identity unavailable."
            )

        if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in key_usages:
            raise exceptions.PermissionDenied(
                "The extended key usage extension is not marked for client "
                "authentication in the client certificate. Session client "
                "identity unavailable."
            )

        common_names = peer_cert.subject.get_attributes_for_oid(
            x509.oid.NameOID.COMMON_NAME
        )
        if len(common_names) == 0:
            raise exceptions.PermissionDenied(
                "The client certificate does not define a subject common "
                "name. Session client identity unavailable."
            )

        if len(common_names) > 1:
            self._logger.warning(
                "Multiple client identities found. Using the first "
                "one processed."
            )
        client_identity = common_names[0].value
        self._logger.info(
            "Session client identity: {0}".format(client_identity)
        )
        return client_identity
项目:revocation-tracker    作者:alex    | 项目源码 | 文件源码
def fetch_details(self, crtsh_ids):
        """Look up certificates by id and summarise each one.

        For every matching row the DER certificate is parsed and its subject
        and issuer common names, DNS subject alternative names and expiry
        are collected together with the aggregated CA owners from the query.

        :param crtsh_ids: iterable of certificate ids to look up
        :returns: a list of RawCertificateDetails, one per row returned;
            an empty list when ``crtsh_ids`` is empty
        """
        ids = tuple(crtsh_ids)
        if not ids:
            # An empty tuple would produce "IN ()", which is not valid SQL,
            # so skip the query entirely.
            return []

        rows = self._engine.execute("""
        SELECT
            c.id, c.certificate, array_agg(DISTINCT cc.ca_owner)
        FROM certificate c
        INNER JOIN
            ca_certificate cac ON c.issuer_ca_id = cac.ca_id
        INNER JOIN
            ccadb_certificate cc ON cac.certificate_id = cc.certificate_id
        WHERE c.id IN %s
        GROUP BY c.id, c.certificate
        """, [(ids,)]).fetchall()

        details = []
        for row in rows:
            cert = x509.load_der_x509_certificate(
                bytes(row[1]), default_backend()
            )

            subject_cn = cert.subject.get_attributes_for_oid(
                x509.NameOID.COMMON_NAME
            )
            issuer_cn = cert.issuer.get_attributes_for_oid(
                x509.NameOID.COMMON_NAME
            )
            try:
                san = cert.extensions.get_extension_for_class(
                    x509.SubjectAlternativeName
                )
            except x509.ExtensionNotFound:
                san_domains = None
            else:
                san_domains = san.value.get_values_for_type(x509.DNSName)

            # Multiple common names are joined; no common name yields None.
            common_name = (
                ", ".join(a.value for a in subject_cn) if subject_cn else None
            )
            issuer_common_name = (
                ", ".join(a.value for a in issuer_cn) if issuer_cn else None
            )

            details.append(RawCertificateDetails(
                crtsh_id=row[0],
                common_name=common_name,
                san_dns_names=san_domains,
                ccadb_owners=[o for o in row[2] if o is not None],
                issuer_common_name=issuer_common_name,
                expiration_date=cert.not_valid_after,
            ))
        return details
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def _issue_cert(self, client, server_name):
        """
        Issue a new cert for a particular name.

        A fresh key is generated, the challenges for ``server_name`` are
        requested and answered, issuance is requested with a CSR for that
        name, the chain is fetched, and the collected PEM objects (key first,
        then the certificates in the order they are received) are passed to
        ``self.cert_store.store`` together with ``server_name``.

        :param client: object providing ``request_challenges``,
            ``request_issuance`` and ``fetch_chain``
        :param server_name: the name to issue the certificate for
        :return: the result of the final ``addCallback`` in the chain
            (presumably a Twisted Deferred -- confirm against the client)
        """
        log.info(
            'Requesting a certificate for {server_name!r}.',
            server_name=server_name)
        key = self._generate_key()
        # ``objects`` is shared by the callbacks below: it starts with the
        # unencrypted PEM private key and gains one entry per certificate.
        objects = [
            Key(key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()))]

        # Answer the challenge for an authorization, then poll it until it
        # is valid.
        def answer_and_poll(authzr):
            # Receives the result of answer_challenge and invokes it (as
            # ``stop_responding``) once polling finishes; addBoth attaches
            # this for both the success and the failure outcome.
            def got_challenge(stop_responding):
                return (
                    poll_until_valid(authzr, self._clock, client)
                    .addBoth(tap(lambda _: stop_responding())))
            return (
                answer_challenge(authzr, client, self._responders)
                .addCallback(got_challenge))

        # Convert the DER body of a certificate resource to PEM, record it,
        # and return the resource unchanged for the next callback.
        def got_cert(certr):
            objects.append(
                Certificate(
                    x509.load_der_x509_certificate(
                        certr.body, default_backend())
                    .public_bytes(serialization.Encoding.PEM)))
            return certr

        # Record every certificate of the fetched chain, then hand back the
        # full list of collected objects.
        def got_chain(chain):
            for certr in chain:
                got_cert(certr)
            log.info(
                'Received certificate for {server_name!r}.',
                server_name=server_name)
            return objects

        # Each step receives the previous step's result; the order matters
        # because got_cert must record the issued certificate before the
        # chain certificates are appended.
        return (
            client.request_challenges(fqdn_identifier(server_name))
            .addCallback(answer_and_poll)
            .addCallback(lambda ign: client.request_issuance(
                CertificateRequest(
                    csr=csr_for_names([server_name], key))))
            .addCallback(got_cert)
            .addCallback(client.fetch_chain)
            .addCallback(got_chain)
            .addCallback(partial(self.cert_store.store, server_name)))