Python cryptography.hazmat.primitives.serialization 模块,load_pem_private_key() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用cryptography.hazmat.primitives.serialization.load_pem_private_key()

项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_key_not_exists(self, pem_path):
        """
        When we get the client key and no key file exists, a new key should be
        generated and the key should be saved in a key file.
        """
        key = maybe_key(pem_path)

        pem_file = pem_path.child(u'client.key')
        assert_that(pem_file.exists(), Equals(True))

        file_key = serialization.load_pem_private_key(
            pem_file.getContent(),
            password=None,
            backend=default_backend()
        )
        file_key = JWKRSA(key=file_key)

        assert_that(key, Equals(file_key))
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def _restore_state(self):
        """ Restore user state. """
        try:
            state = self._state_store.get_value(self._state_store_key)
            state_dict = pickle.loads(
                binascii.unhexlify(state.encode("utf-8")))
            self._name = state_dict['name']
            self.enrollment_secret = state_dict['enrollment_secret']
            enrollment = state_dict['enrollment']
            if enrollment:
                private_key = serialization.load_pem_private_key(
                    enrollment['private_key'],
                    password=None,
                    backend=default_backend()
                )
                cert = enrollment['cert']
                self.enrollment = Enrollment(private_key, cert)
            self.affiliation = state_dict['affiliation']
            self.account = state_dict['account']
            self.roles = state_dict['roles']
            self._org = state_dict['org']
            self.msp_id = state_dict['msp_id']
        except Exception as e:
            raise IOError("Cannot deserialize the user", e)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def read_file(self, subject_name, pw=None):
        """
        ???? ???

        :param subject_name: ??? ??? ??? ??
        :return: ??? ???
        """
        pem_path = os.path.join(os.path.dirname(__file__),
                                "../../resources/unittest/" + subject_name + "/cert.pem")
        f = open(pem_path, "rb")
        cert_pem = f.read()
        f.close()
        cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

        key_path = os.path.join(os.path.dirname(__file__),
                                "../../resources/unittest/" + subject_name + "/key.pem")
        f = open(key_path, "rb")
        cert_key = f.read()
        f.close()

        private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())
        return {'cert': cert, 'private_key': private_key}
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def load_pki(self, cert_path: str, cert_pass=None):
        """
        ??? ??

        :param cert_path: ??? ??
        :param cert_pass: ??? ????
        """
        ca_cert_file = join(cert_path, self.CERT_NAME)
        ca_pri_file = join(cert_path, self.PRI_NAME)

        # ???/??? ??
        with open(ca_cert_file, "rb") as der:
            cert_bytes = der.read()
            self.__ca_cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())
        with open(ca_pri_file, "rb") as der:
            private_bytes = der.read()
            try:
                self.__ca_pri = serialization.load_pem_private_key(private_bytes, cert_pass, default_backend())
            except ValueError:
                logging.debug("Invalid Password")

        # ??? ? ? ??
        sign = self.sign_data(b'TEST')
        if self.verify_data(b'TEST', sign) is False:
            logging.debug("Invalid Signature(Root Certificate load test)")
项目:Chaos    作者:Chaosthebot    | 项目源码 | 文件源码
def create_decryptor(private_location, public_location):
    try:
        with open(private_location, "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend()
            )
    except FileNotFoundError:
        with open(private_location, "wb") as key_file:
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend()
            )
            key_file.write(private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()
            ))

    with open(public_location, "wb") as public_file:
        public_key = private_key.public_key()
        pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        public_file.write(pem)

    def decrypt(ciphertext):
        return private_key.decrypt(
            ciphertext,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA1()),
                algorithm=hashes.SHA1(),
                label=None
            )
        )

    return decrypt
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def _load_cryptography_key(cls, data, password=None, backend=None):
        backend = default_backend() if backend is None else backend
        exceptions = {}

        # private key?
        for loader in (serialization.load_pem_private_key,
                       serialization.load_der_private_key):
            try:
                return loader(data, password, backend)
            except (ValueError, TypeError,
                    cryptography.exceptions.UnsupportedAlgorithm) as error:
                exceptions[loader] = error

        # public key?
        for loader in (serialization.load_pem_public_key,
                       serialization.load_der_public_key):
            try:
                return loader(data, backend)
            except (ValueError,
                    cryptography.exceptions.UnsupportedAlgorithm) as error:
                exceptions[loader] = error

        # no luck
        raise errors.Error('Unable to deserialize key: {0}'.format(exceptions))
项目:Parlay    作者:PromenadeSoftware    | 项目源码 | 文件源码
def get_channel(self):
        #first get a token we need to sign in order to prove we are who we say we are
        r = requests.get(str(self.base_link_uri) + "/get_device_token", params={"UUID": self.uuid, })

        token = r.json()["token"]
        # get the private Key
        with open(CloudLinkSettings.PRIVATE_KEY_LOCATION,'r') as key_file:
            private_key = serialization.load_pem_private_key(key_file.read(),
                                                             password=CloudLinkSettings.PRIVATE_KEY_PASSPHRASE,
                                                             backend=default_backend())

        # sign the token with our private key
        signer = private_key.signer(padding.PKCS1v15(), hashes.SHA256())
        signer.update(bytes(token))
        signature = signer.finalize()

        # get the randomly assigned channel for my UUID
        r = requests.get(str(self.base_link_uri) + "/get_device_group",
                         params={"UUID": self.uuid, "signature": b64encode(signature), "format": "PKCS1_v1_5"})
        if r.ok:
            self.channel_uri = r.json()["channel"]
        elif r.status_code == 400:
            raise Exception("UUID or Token not registered with Cloud.")
        elif r.status_code == 403:
            raise Exception("Signature didn't verify correctly. Bad private key or signature.")
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def decode_pem_key(key_pem):
    """Convert plaintext PEM key into the format usable for JWT generation

    Args:
        key_pam (str): key data in PEM format, presented as plain string

    Returns:
        Parsed PEM data
    """
    private_key = serialization.load_pem_private_key(
        data=key_pem.encode('ascii'),
        password=None,
        backend=default_backend())

    msg = 'Unexpected private key type'
    assert isinstance(private_key, rsa.RSAPrivateKey), msg
    assert private_key.key_size >= 2048, 'RSA key size too small'

    return private_key
项目:komlogd    作者:komlog-io    | 项目源码 | 文件源码
def load_private_key(privkey_file):
    ''' 
    Loads the private key stored in the file indicated by the privkey_file
    parameter and returns it

    The `privkey_file` parameter indicates the absolute path to the file
    storing the private key.

    The key returned is a RSAPrivateKey instance.
    '''
    with open(privkey_file, "rb") as key_file:
        privkey = serialization.load_pem_private_key(
            key_file.read(),
            password=None,
            backend=default_backend()
        )
    return privkey
项目:cryptoverse-probe    作者:Cryptoverse    | 项目源码 | 文件源码
def rsa_sign(private_key, message):
    """Signs a message with the provided Rsa private key.

    Args:
        private_key (str): Rsa private key with BEGIN and END sections.
        message (str): Message to be hashed and signed.

    Returns:
        str: Hex signature of the message, with its leading 0x stripped.
    """
    private_rsa = load_pem_private_key(bytes(private_key), password=None, backend=default_backend())
    hashed = sha256(message)
    signature = private_rsa.sign(
        hashed,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    return binascii.hexlify(bytearray(signature))
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def load_or_create_client_key(pem_path):
    """
    Load the client key from a directory, creating it if it does not exist.

    .. note:: The client key that will be created will be a 2048-bit RSA key.

    :type pem_path: ``twisted.python.filepath.FilePath``
    :param pem_path: The certificate directory
        to use, as with the endpoint.
    """
    acme_key_file = pem_path.asTextMode().child(u'client.key')
    if acme_key_file.exists():
        key = serialization.load_pem_private_key(
            acme_key_file.getContent(),
            password=None,
            backend=default_backend())
    else:
        key = generate_private_key(u'rsa')
        acme_key_file.setContent(
            key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()))
    return JWKRSA(key=key)
项目:sawtooth-core    作者:hyperledger    | 项目源码 | 文件源码
def __init__(self, signer):
        self._factory = MessageFactory(
            family_name="sawtooth_validator_registry",
            family_version="1.0",
            namespace="6a4372",
            signer=signer
        )
        self.public_key_hash = hashlib.sha256(
            signer.get_public_key().as_hex().encode()).hexdigest()
        self._report_private_key = \
            serialization.load_pem_private_key(
                self.__REPORT_PRIVATE_KEY_PEM__.encode(),
                password=None,
                backend=backends.default_backend())

        # First we need to create a public/private key pair for the PoET
        # enclave to use.
        context = create_context('secp256k1')
        self._poet_private_key = Secp256k1PrivateKey.from_hex(
            "1f70fa2518077ad18483f48e77882d11983b537fa5f7cf158684d2c670fe4f1f")
        self.poet_public_key = context.get_public_key(self._poet_private_key)
项目:aws-encryption-sdk-python    作者:awslabs    | 项目源码 | 文件源码
def _mgf1_sha256_supported():
    wk = serialization.load_pem_private_key(
        data=VALUES['raw'][b'asym1'][EncryptionKeyType.PRIVATE],
        password=None,
        backend=default_backend()
    )
    try:
        wk.public_key().encrypt(
            plaintext=b'aosdjfoiajfoiaj;foijae;rogijaerg',
            padding=padding.OAEP(
                mgf=padding.MGF1(hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    except cryptography.exceptions.UnsupportedAlgorithm:
        return False
    return True
项目:aws-encryption-sdk-python    作者:awslabs    | 项目源码 | 文件源码
def __init__(self, wrapping_algorithm, wrapping_key, wrapping_key_type, password=None):
        """Prepares initial values."""
        self.wrapping_algorithm = wrapping_algorithm
        self.wrapping_key_type = wrapping_key_type
        if wrapping_key_type is EncryptionKeyType.PRIVATE:
            self._wrapping_key = serialization.load_pem_private_key(
                data=wrapping_key,
                password=password,
                backend=default_backend()
            )
        elif wrapping_key_type is EncryptionKeyType.PUBLIC:
            self._wrapping_key = serialization.load_pem_public_key(
                data=wrapping_key,
                backend=default_backend()
            )
        elif wrapping_key_type is EncryptionKeyType.SYMMETRIC:
            self._wrapping_key = wrapping_key
            self._derived_wrapping_key = derive_data_encryption_key(
                source_key=self._wrapping_key,
                algorithm=self.wrapping_algorithm.algorithm,
                message_id=None
            )
        else:
            raise InvalidDataKeyError('Invalid wrapping_key_type: {}'.format(wrapping_key_type))
项目:concorde    作者:frutiger    | 项目源码 | 文件源码
def __call__(self, parser, namespace, values, option_string):

        if namespace.key_type == 'raw':
            setattr(namespace, self.dest, raw_loader(values))
        elif namespace.key_type == 'pem':
            setattr(namespace,
                    self.dest,
                    serialization.load_pem_private_key(raw_loader(values),
                                                       None,
                                                       backend))
        elif namespace.key_type == 'der':
            setattr(namespace,
                    self.dest,
                    serialization.load_der_private_key(raw_loader(values),
                                                       None,
                                                       backend))
项目:bless    作者:Netflix    | 项目源码 | 文件源码
def __init__(self, pem_private_key, private_key_password=None):
        """
        RSA Certificate Authority used to sign certificates.
        :param pem_private_key: PEM formatted RSA Private Key.  It should be encrypted with a
        password, but that is not required.
        :param private_key_password: Password to decrypt the PEM RSA Private Key, if it is
        encrypted.  Which it should be.
        """
        super(SSHCertificateAuthority, self).__init__()
        self.public_key_type = SSHPublicKeyType.RSA

        self.private_key = load_pem_private_key(pem_private_key,
                                                private_key_password,
                                                default_backend())

        self.signer = self.private_key.signer(padding.PKCS1v15(),
                                              hashes.SHA1())
        ca_pub_numbers = self.private_key.public_key().public_numbers()

        self.e = ca_pub_numbers.e
        self.n = ca_pub_numbers.n
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def maybe_key(pem_path):
    """
    Set up a client key if one does not exist already.

    https://gist.github.com/glyph/27867a478bb71d8b6046fbfb176e1a33#file-local-certs-py-L32-L50

    :type pem_path: twisted.python.filepath.FilePath
    :param pem_path:
        The path to the certificate directory to use.
    """
    acme_key_file = pem_path.child(u'client.key')
    if acme_key_file.exists():
        key = serialization.load_pem_private_key(
            acme_key_file.getContent(),
            password=None,
            backend=default_backend()
        )
    else:
        key = generate_private_key(u'rsa')
        acme_key_file.setContent(
            key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()
            )
        )
    return jose.JWKRSA(key=key)
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def get_peer_org_user(client, peer_org, user='Admin'):
    """Loads the requested user for a given peer org
        and returns a user object.
    """

    peer_user_base_path = os.path.join(
        os.getcwd(),
        'test/fixtures/e2e_cli/crypto-config/peerOrganizations/{0}'
        '/users/{1}@{0}/msp/'.format(peer_org, user)
    )

    key_path = os.path.join(
        peer_user_base_path,
        'keystore/',
        E2E_CONFIG['test-network'][peer_org]['users'][user]['private_key']
    )

    cert_path = os.path.join(
        peer_user_base_path,
        'signcerts/',
        E2E_CONFIG['test-network'][peer_org]['users'][user]['cert']
    )

    with open(key_path, 'rb') as key:
        key_pem = key.read()

    with open(cert_path, 'rb') as cert:
        cert_pem = cert.read()

    org_user = User('peer' + peer_org + user, peer_org, client.state_store)

    # wrap the key in a 'cryptography' private key object
    # so that all the methods can be used
    private_key = load_pem_private_key(key_pem, None, default_backend())

    enrollment = Enrollment(private_key, cert_pem)

    org_user.enrollment = enrollment
    org_user.msp_id = E2E_CONFIG['test-network'][peer_org]['mspid']

    return org_user
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def append_signature(target_file, priv_key, priv_key_password=None):
    """Append signature to the end of file"""
    with open(target_file, "rb") as f:
        contents = f.read()

    with open(priv_key, "rb") as kf:
        private_key = serialization.load_pem_private_key(
            kf.read(), password=priv_key_password, backend=default_backend())

    signature = private_key.sign(contents, padding.PKCS1v15(), hashes.SHA512())

    with open(target_file, "ab") as f:
        f.write(signature)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def load_private_key_string(key_string):
    return serialization.load_pem_private_key(key_string, password=None, backend=default_backend())
项目:django-rest-framework-sso    作者:namespace-ee    | 项目源码 | 文件源码
def get_private_key_and_key_id(issuer, key_id=None):
    file_name = get_key_file_name(keys=api_settings.PRIVATE_KEYS, issuer=issuer, key_id=key_id)
    file_data = read_key_file(file_name=file_name)
    key = load_pem_private_key(file_data, password=None, backend=default_backend())
    return key, get_key_id(file_name=file_name)
项目:Steganography    作者:Ludisposed    | 项目源码 | 文件源码
def load_key(filename):
    with open(filename, 'rb') as pem_in:
        pemlines = pem_in.read()
    private_key = load_pem_private_key(pemlines, None, default_backend())
    return private_key
项目:Complete-Bunq-API-Python-Wrapper    作者:PJUllrich    | 项目源码 | 文件源码
def convert_private_key_to_pem(private_key):
        private_key_bytes = private_key
        if not isinstance(private_key_bytes, bytes):
            private_key_bytes = private_key.encode()

        return serialization.load_pem_private_key(
            private_key_bytes,
            password=None,
            backend=default_backend()
        )
项目:Complete-Bunq-API-Python-Wrapper    作者:PJUllrich    | 项目源码 | 文件源码
def convert_privkey_to_pem(key):
        key_bytes = ApiClient.convert_to_bytes(key)

        return serialization.load_pem_private_key(
            key_bytes,
            password=None,
            backend=default_backend()
        )
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def load_rsa_private_key(*names):
    """Load RSA private key."""
    loader = _guess_loader(names[-1], serialization.load_pem_private_key,
                           serialization.load_der_private_key)
    return jose.ComparableRSAKey(loader(
        load_vector(*names), password=None, backend=default_backend()))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def load_rsa_private_key(*names):
    """Load RSA private key."""
    loader = _guess_loader(names[-1], serialization.load_pem_private_key,
                           serialization.load_der_private_key)
    return jose.ComparableRSAKey(loader(
        load_vector(*names), password=None, backend=default_backend()))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _load_cryptography_key(cls, data, password=None, backend=None):
        backend = default_backend() if backend is None else backend
        exceptions = {}

        # private key?
        for loader in (serialization.load_pem_private_key,
                       serialization.load_der_private_key):
            try:
                return loader(data, password, backend)
            except (ValueError, TypeError,
                    cryptography.exceptions.UnsupportedAlgorithm) as error:
                exceptions[loader] = error

        # public key?
        for loader in (serialization.load_pem_public_key,
                       serialization.load_der_public_key):
            try:
                return loader(data, backend)
            except (ValueError,
                    cryptography.exceptions.UnsupportedAlgorithm) as error:
                exceptions[loader] = error

        # no luck
        raise errors.Error('Unable to deserialize key: {0}'.format(exceptions))
项目:libtrust-py    作者:realityone    | 项目源码 | 文件源码
def from_pem(cls, key_data, passphrase=None):
        private_key = serialization.load_pem_private_key(
            key_data,
            passphrase,
            default_backend()
        )
        return cls(private_key)
项目:libtrust-py    作者:realityone    | 项目源码 | 文件源码
def from_pem(cls, key_data, passphrase=None):
        private_key = serialization.load_pem_private_key(
            key_data,
            passphrase,
            default_backend()
        )
        return cls(private_key)
项目:django-autocert    作者:farrepa    | 项目源码 | 文件源码
def get_key(self):
        password = settings.ACCOUNT_KEY_PASSWORD.encode()
        if not self.key:
            self.set_key()
            self.save()
        return serialization.load_pem_private_key(self.key.encode(), password=password, backend=default_backend())
项目:code-crypt    作者:Nextdoor    | 项目源码 | 文件源码
def _set_decryptor(
            self,
            plaintext_private_key=None,
            encrypted_private_key=None):
        '''Creates a OAEP decryptor based on a RSA private key.'''
        if self.decryptor is not None:
            return

        if not plaintext_private_key:
            if not encrypted_private_key:
                try:
                    with open(self.encrypted_private_key_file, 'rb') as f:
                        encrypted_private_key = f.read()
                except IOError as e:
                    raise errors.InputError(
                        "private key '%s' does not exist." % (
                            self.encrypted_private_key_file))

            plaintext_private_key = self._get_plaintext_private_key(
                encrypted_private_key,
                encryption_context=self.encryption_context)

        try:
            private_key_obj = serialization.load_pem_private_key(
                plaintext_private_key,
                password=None,
                backend=default_backend())

            self.decryptor = Decryptor(private_key_obj)
        except Exception as e:
            raise errors.DecryptorError(
                "private key is malformed. (Reason: %s)" % (str(e)))
项目:miracle    作者:mozilla    | 项目源码 | 文件源码
def _load_private(self, data):
        try:
            self._private = serialization.load_pem_private_key(
                b64decode(data), password=None, backend=self._backend)
            self._private_jwk = JWK()
            self._private_jwk.import_from_pyca(self._private)
        except Exception:  # pragma: no cover
            LOGGER.error(
                'Failed to parse private key starting with: %s', data[:28])
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def __init__(self, key: bytes, password: str=None):
        self._pub_key = None
        if isinstance(key, bytes):
            private_key = serialization.load_pem_private_key(
                key,
                password=password.encode('utf-8') if password else None,
                backend=default_backend()
            )
            self._hazmat_private_key = private_key
        else:
            self._hazmat_private_key = key
        if self._hazmat_private_key.key_size < 1023:
            raise RuntimeError('Minimal key size is 1024bits')
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_convert_from_cryptography_private_key(self):
        """
        PKey.from_cryptography_key creates a proper private PKey.
        """
        key = serialization.load_pem_private_key(
            intermediate_key_pem, None, backend
        )
        pkey = PKey.from_cryptography_key(key)

        assert isinstance(pkey, PKey)
        assert pkey.bits() == key.key_size
        assert pkey._only_public is False
        assert pkey._initialized is True
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_convert_from_cryptography_unsupported_type(self):
        """
        PKey.from_cryptography_key raises TypeError with an unsupported type.
        """
        key = serialization.load_pem_private_key(
            ec_private_key_pem, None, backend
        )
        with pytest.raises(TypeError):
            PKey.from_cryptography_key(key)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def parse_issuer_cred(issuer_cred):
    """
    Given an X509 PEM file in the form of a string, parses it into sections
    by the PEM delimiters of: -----BEGIN <label>----- and -----END <label>----
    Confirms the sections can be decoded in the proxy credential order of:
    issuer cert, issuer private key, proxy chain of 0 or more certs .
    Returns the issuer cert and private key as loaded cryptography objects
    and the proxy chain as a potentially empty string.
    """
    # get each section of the PEM file
    sections = re.findall(
        "-----BEGIN.*?-----.*?-----END.*?-----", issuer_cred, flags=re.DOTALL)
    try:
        issuer_cert = sections[0]
        issuer_private_key = sections[1]
        issuer_chain_certs = sections[2:]
    except IndexError:
        raise ValueError("Unable to parse PEM data in credentials, "
                         "make sure the X.509 file is in PEM format and "
                         "consists of the issuer cert, issuer private key, "
                         "and proxy chain (if any) in that order.")

    # then validate that each section of data can be decoded as expected
    try:
        loaded_cert = x509.load_pem_x509_certificate(
            six.b(issuer_cert), default_backend())
        loaded_private_key = serialization.load_pem_private_key(
            six.b(issuer_private_key),
            password=None, backend=default_backend())
        for chain_cert in issuer_chain_certs:
            x509.load_pem_x509_certificate(
                six.b(chain_cert), default_backend())
        issuer_chain = "".join(issuer_chain_certs)
    except ValueError:
        raise ValueError("Failed to decode PEM data in credentials. Make sure "
                         "the X.509 file consists of the issuer cert, "
                         "issuer private key, and proxy chain (if any) "
                         "in that order.")

    # return loaded cryptography objects and the issuer chain
    return loaded_cert, loaded_private_key, issuer_chain
项目:morango    作者:learningequality    | 项目源码 | 文件源码
def _set_private_key_string(self, private_key_string):

        self._private_key = crypto_serialization.load_pem_private_key(
            self.ensure_bytes(private_key_string),
            password=None,
            backend=crypto_backend,
        )
        self._public_key = self._private_key.public_key()


# alias the most-preferred key wrapper class we have available as `Key`
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def __load_cert(self, cert_dir):
        """???/??? ?? ? ?? ???

        :param cert_dir: ???/??? ?? ??
        :return: X509 ???
        """
        logging.debug("Cert/Key loading...")
        cert_file = join(cert_dir, "cert.pem")
        pri_file = join(cert_dir, "key.pem")

        f = open(cert_file, "rb")
        cert_bytes = f.read()
        f.close()
        cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())

        f = open(pri_file, "rb")
        pri_bytes = f.read()
        f.close()
        try:
            pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
        except ValueError:
            logging.debug("Invalid Password(%s)", cert_dir)
            return None

        data = b"test"
        signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))

        try:
            pub_key = cert.public_key()
            result = pub_key.verify(signature, data, ec.ECDSA(hashes.SHA256()))
        except InvalidSignature:
            logging.debug("sign test fail")
            result = False

        if result:
            return cert
        else:
            logging.error("result is False ")
            return None
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def convert_privatekey_from_pem(self, pri_pem, password):
        """PEM ???? ????? private_key ? ??
        ??? ??? CA ???/??? None?? ??

        :param pri_pem: PEM ???
        :param password: ??? ??? ????
        :return: private_key
        """
        try:
            return serialization.load_pem_private_key(pri_pem, password, default_backend())
        except ValueError:
            logging.debug("Invalid Password")
            self.__ca_cert = None
            self.__ca_pri = None
            return None
项目:djamazing    作者:sunscrapers    | 项目源码 | 文件源码
def _init_protected_mode(self, config):
        self.protected = 'CLOUDFRONT_KEY_ID' in config
        if self.protected:
            cloud_front_key = self._get_cloud_front_key(config)
            self.key_id = config['CLOUDFRONT_KEY_ID']
            self.cloud_front_key = serialization.load_pem_private_key(
                cloud_front_key,
                password=None,
                backend=default_backend(),
            )
            self.signer = CloudFrontSigner(self.key_id, self.rsa_signer)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def load_rsa_private_key(*names):
    """Load RSA private key."""
    loader = _guess_loader(names[-1], serialization.load_pem_private_key,
                           serialization.load_der_private_key)
    return jose.ComparableRSAKey(loader(
        load_vector(*names), password=None, backend=default_backend()))
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def load_rsa_private_key(*names):
    """Load RSA private key."""
    loader = _guess_loader(names[-1], serialization.load_pem_private_key,
                           serialization.load_der_private_key)
    return jose.ComparableRSAKey(loader(
        load_vector(*names), password=None, backend=default_backend()))
项目:scitokens    作者:scitokens    | 项目源码 | 文件源码
def test_deserialization(self):
        """
        Perform the deserialization test
        """
        with open('tests/simple_private_key.pem', 'rb') as key_file:
            private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=None,
            backend=default_backend()
        )
        test_id = "stuffblah"

        public_numbers = private_key.public_key().public_numbers()
        server_address = start_server(public_numbers.n, public_numbers.e, test_id)
        print(server_address)
        issuer = "http://localhost:{}/".format(server_address[1])
        token = scitokens.SciToken(key=private_key, key_id=test_id)
        token.update_claims({"test": "true"})
        serialized_token = token.serialize(issuer=issuer)

        self.assertEqual(len(serialized_token.decode('utf8').split(".")), 3)

        scitoken = scitokens.SciToken.deserialize(serialized_token, insecure=True)

        self.assertIsInstance(scitoken, scitokens.SciToken)

        token = scitokens.SciToken(key=private_key, key_id="doesnotexist")
        serialized_token = token.serialize(issuer=issuer)
        with self.assertRaises(scitokens.utils.errors.MissingKeyException):
            scitoken = scitokens.SciToken.deserialize(serialized_token, insecure=True)
项目:scitokens    作者:scitokens    | 项目源码 | 文件源码
def _test_private(key):
        return serialization.load_pem_private_key(
            key,
            password=None,
            backend=default_backend()
        )
项目:ComBunqWebApp    作者:OGKevin    | 项目源码 | 文件源码
def convert_private_key_to_pem(private_key):
        private_key_bytes = private_key
        if not isinstance(private_key_bytes, bytes):
            private_key_bytes = private_key.encode()

        return serialization.load_pem_private_key(
            private_key_bytes,
            password=None,
            backend=default_backend()
        )
项目:ComBunqWebApp    作者:OGKevin    | 项目源码 | 文件源码
def convert_privkey_to_pem(key):
        key_bytes = ApiClient.convert_to_bytes(key)

        return serialization.load_pem_private_key(
            key_bytes,
            password=None,
            backend=default_backend()
        )
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def load_privatekey(pkey_file):
    """ Load a private key """
    with open(pkey_file, 'rb') as f:
        pkey = serialization.load_pem_private_key(
            data=f.read(),
            password=None,
            backend=default_backend()
        )
    return pkey
项目:python-otr    作者:AGProjects    | 项目源码 | 文件源码
def load(cls, path):
        with openfile(path, 'rb') as key_file:
            key = serialization.load_pem_private_key(key_file.read(), password=None, backend=cls.__backend__)
        return PrivateKey.new(key) if cls.__type__ is None else cls(key)
项目:RKSV    作者:ztp-at    | 项目源码 | 文件源码
def loadPrivKey(pem):
    """
    Creates a cryptography private key object from the given PEM private key.
    :param pem: A private key as a PEM string.
    :return: A cryptography private key object.
    """
    return load_pem_private_key(pem.encode("utf-8"), None, default_backend())