Python cryptography.hazmat.primitives.serialization 模块,BestAvailableEncryption() 实例源码

我们从 Python 开源项目中提取了以下 18 个代码示例,用于说明如何使用 cryptography.hazmat.primitives.serialization.BestAvailableEncryption()。

项目:requests-http-signature    作者:kislyuk    | 项目源码 | 文件源码
def test_rsa(self):
        """Send a GET request signed with a freshly generated, encrypted RSA key.

        A 2048-bit RSA key is generated; its private half is serialized as
        passphrase-encrypted PKCS8 PEM and given to ``HTTPSignatureAuth``,
        while the public half (SubjectPublicKeyInfo PEM) is sent
        base64-encoded in the ``pubkey`` request header.
        """
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.hazmat.backends import default_backend

        rsa_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())

        # ``passphrase`` is not defined in this method; it has to be provided
        # by the enclosing scope.
        encrypted_pem = rsa_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase),
        )
        spki_pem = rsa_key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        signer = HTTPSignatureAuth(algorithm="rsa-sha256", key=encrypted_pem, key_id="sekret", passphrase=passphrase)
        request_headers = dict(pubkey=base64.b64encode(spki_pem))
        self.session.get('http://example.com/path?query#fragment', auth=signer, headers=request_headers)
项目:Parlay    作者:PromenadeSoftware    | 项目源码 | 文件源码
def generate_keys(self):
        """Generate a 2048-bit RSA key pair and write both halves to disk as PEM.

        The private key is serialized as PKCS8, encrypted with
        ``CloudLinkSettings.PRIVATE_KEY_PASSPHRASE`` and written to
        ``CloudLinkSettings.PRIVATE_KEY_LOCATION``.  The public key is
        serialized as SubjectPublicKeyInfo and written to
        ``CloudLinkSettings.PUBLIC_KEY_LOCATION``.
        """
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )

        # BestAvailableEncryption needs bytes.  ``bytes(text)`` raises
        # TypeError on Python 3, so text passphrases are encoded explicitly;
        # values that are already bytes are used unchanged.
        passphrase = CloudLinkSettings.PRIVATE_KEY_PASSPHRASE
        if not isinstance(passphrase, bytes):
            passphrase = passphrase.encode('utf-8')

        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase)
        )
        public_pem = private_key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

        # private_bytes()/public_bytes() return bytes, so they are written
        # verbatim in binary mode.  ``str(pem)`` would store the "b'...'"
        # repr instead of the PEM text on Python 3.
        with open(CloudLinkSettings.PRIVATE_KEY_LOCATION, 'wb') as key_file:
            key_file.write(private_pem)

        with open(CloudLinkSettings.PUBLIC_KEY_LOCATION, 'wb') as key_file:
            key_file.write(public_pem)
项目:delta-sdk-python    作者:Covata    | 项目源码 | 文件源码
def __save(self, private_key, file_name):
        """Store *private_key* as an encrypted PEM file inside the key store.

        The key is serialized as PKCS8 PEM, encrypted with
        ``self.__key_store_passphrase``, and written to
        ``<self.key_store_path>/<file_name>``.  The key store directory is
        created when it does not exist yet.

        :param private_key: the key to persist; must be an ``RSAPrivateKey``.
        :param file_name: name of the file to create in the key store.
        :raises TypeError: if *private_key* is not an ``RSAPrivateKey``.
        :raises IOError: if a file named *file_name* already exists.
        """
        if not isinstance(private_key, RSAPrivateKey):
            actual_type = type(private_key).__name__
            raise TypeError("private_key must be an instance of RSAPrivateKey, "
                            "actual: {}".format(actual_type))

        # Serialize before touching the file system so that a serialization
        # failure leaves no directory behind.
        encryption = serialization.BestAvailableEncryption(
            self.__key_store_passphrase)
        encrypted_pem = private_key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.PKCS8,
            encryption)

        target_path = os.path.join(self.key_store_path, file_name)
        if not os.path.isdir(self.key_store_path):
            os.makedirs(self.key_store_path)

        # Existing keys are never overwritten.
        if os.path.isfile(target_path):
            raise IOError(
                "Save failed: A key with name [{}] already exists".format(
                    file_name))

        with open(target_path, 'w') as key_file:
            key_file.write(encrypted_pem.decode(encoding='utf8'))
项目:django-autocert    作者:farrepa    | 项目源码 | 文件源码
def set_key(self):
        """Generate an RSA key and store it on ``self.key`` as encrypted PEM.

        The key size comes from ``settings.BITS`` and the passphrase from
        ``settings.ACCOUNT_KEY_PASSWORD``.  The key is serialized in the
        TraditionalOpenSSL format.
        """
        passphrase = settings.ACCOUNT_KEY_PASSWORD.encode()
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=settings.BITS,
            backend=default_backend(),
        )
        self.key = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase),
        )
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def export(self, password: str):
        """Return the wrapped private key as password-encrypted PKCS8 PEM.

        :param password: text password; it is UTF-8 encoded before being used
            as the encryption passphrase.
        :returns: the result of ``private_bytes`` on ``self._hazmat_private_key``.
        """
        passphrase = password.encode('utf-8')
        encryption = serialization.BestAvailableEncryption(passphrase)
        return self._hazmat_private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=encryption,
        )
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def main(argv):
    """Generate a SECP256K1 key pair and save it under ``resources/default_pki/``.

    The private key is written as password-encrypted PKCS8 DER to
    ``private.der`` and the public key as SubjectPublicKeyInfo DER to
    ``public.der``.

    :param argv: argument list; the first item, when present, is encoded and
        used as the key password, otherwise ``b'test'`` is used.
    """
    password = argv[0].encode() if len(argv) > 0 else b'test'

    key_pair = ec.generate_private_key(ec.SECP256K1(), default_backend())
    private_der = key_pair.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(password),
    )
    public_der = key_pair.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    # Load the encrypted key back with the same password before writing any
    # file; the loaded key itself is not used afterwards.
    key_pair = serialization.load_der_private_key(private_der, password, default_backend())

    outputs = (
        ('resources/default_pki/private.der', private_der),
        ('resources/default_pki/public.der', public_der),
    )
    for path, payload in outputs:
        with open(path, 'wb') as der_file:
            der_file.write(payload)

    print('save pki in : resources/default_pki/')
项目:wile    作者:costela    | 项目源码 | 文件源码
def ask_for_password_or_no_crypto(key_path):
    """Prompt for an optional key password and return the matching encryption.

    :param key_path: path shown in the prompt text.
    :returns: ``BestAvailableEncryption`` built from the UTF-8 encoded
        password when one was entered, otherwise ``NoEncryption``.
    """
    # we can't use prompt's "default" and "value_proc" arguments because we monkeypatch prompt in test_wile.py
    prompt_text = '(optional) Password for %s' % key_path
    password = click.prompt(prompt_text, default='', hide_input=True,
                            confirmation_prompt=True, show_default=False)
    if not password:
        return serialization.NoEncryption()
    return serialization.BestAvailableEncryption(password.encode('utf-8'))
项目:pylibs    作者:tqlihuiqi    | 项目源码 | 文件源码
def gen(bits=1024, password=None):
        """Generate an RSA key pair for SSH use.

        :param bits: RSA key size in bits (default: 1024).
        :param password: optional passphrase.  When truthy, the private key is
            encrypted with it; otherwise the private key is not encrypted
            (default: None).
        :return: tuple ``(private_key, public_key)`` of text strings: the
            private key as TraditionalOpenSSL PEM and the public key in
            OpenSSH format.
        """
        # NOTE(review): a 1024-bit default is small by current standards; it
        # is kept only so existing callers see no change - confirm.
        rsaKey = rsa.generate_private_key(
            backend=default_backend(),
            public_exponent=65537,
            key_size=bits
        )

        if password:
            # Bytes are used as-is: str() on a bytes object yields its
            # "b'...'" repr on Python 3, which would silently become the
            # passphrase.  Anything else is stringified and UTF-8 encoded.
            if isinstance(password, bytes):
                passphrase = password
            else:
                passphrase = str(password).encode("utf-8")
            encryption = serialization.BestAvailableEncryption(passphrase)
        else:
            encryption = serialization.NoEncryption()

        privateKey = rsaKey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=encryption
        ).decode("utf-8")

        publicKey = rsaKey.public_key().public_bytes(
            encoding=serialization.Encoding.OpenSSH,
            format=serialization.PublicFormat.OpenSSH
        ).decode("utf-8")

        return privateKey, publicKey
项目:wetland    作者:ohmyadd    | 项目源码 | 文件源码
def _write_private_key(self, f, key, format, password=None):
        """Write *key* to the file object *f* as decoded PEM text.

        :param f: object with a ``write`` method that receives the PEM text.
        :param key: key object providing ``private_bytes``.
        :param format: private key format passed through to ``private_bytes``.
        :param password: when ``None`` the key is written without encryption;
            otherwise it is passed through ``b()`` (defined outside this
            block) and used as the encryption passphrase.
        """
        encryption = (
            serialization.NoEncryption()
            if password is None
            else serialization.BestAvailableEncryption(b(password))
        )
        pem_bytes = key.private_bytes(serialization.Encoding.PEM, format, encryption)
        f.write(pem_bytes.decode())
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def _write_private_key(self, f, key, format, password=None):
        """Serialize *key* as PEM and write the decoded text to *f*.

        :param f: object with a ``write`` method that receives the PEM text.
        :param key: key object providing ``private_bytes``.
        :param format: private key format passed through to ``private_bytes``.
        :param password: ``None`` selects ``NoEncryption``; any other value is
            converted with ``b()`` (defined outside this block) and used with
            ``BestAvailableEncryption``.
        """
        if password is not None:
            protection = serialization.BestAvailableEncryption(b(password))
        else:
            protection = serialization.NoEncryption()

        serialized = key.private_bytes(
            serialization.Encoding.PEM, format, protection)
        f.write(serialized.decode())
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def keygen(pub_key, priv_key, user_id, priv_key_password=None):
    """Generate new private key and certificate RSA_SHA512_4096

    A 4096-bit RSA key is written to *priv_key* as TraditionalOpenSSL PEM and
    a self-signed certificate (SHA-512 signature, valid for ``365*10`` days
    from now) is written to *pub_key* as PEM.

    :param pub_key: path of the certificate file to write.
    :param priv_key: path of the private key file to write.
    :param user_id: value used as the certificate common name.
    :param priv_key_password: optional passphrase (bytes or text).  When
        truthy, the private key is encrypted with it; otherwise the key is
        stored unencrypted.
    """
    # Generate our key
    key = rsa.generate_private_key(public_exponent=65537, key_size=4096,
                                   backend=default_backend())

    if priv_key_password:
        # BestAvailableEncryption only accepts bytes; encode a text password
        # instead of letting it fail with a TypeError.
        if not isinstance(priv_key_password, bytes):
            priv_key_password = priv_key_password.encode("utf-8")
        ea = serialization.BestAvailableEncryption(priv_key_password)
    else:
        ea = serialization.NoEncryption()

    # Write our key to disk for safe keeping
    with open(priv_key, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=ea,
        ))

    # Various details about who we are. For a self-signed certificate the
    # subject and issuer are always the same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "XX"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "XX"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "XX"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "I2P Anonymous Network"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "I2P"),
        x509.NameAttribute(NameOID.COMMON_NAME, user_id),
    ])

    # Take one timestamp so both ends of the validity window are derived
    # from the same instant.
    now = datetime.datetime.utcnow()

    cert = x509.CertificateBuilder() \
        .subject_name(subject) \
        .issuer_name(issuer) \
        .public_key(key.public_key()) \
        .not_valid_before(now) \
        .not_valid_after(now + datetime.timedelta(days=365*10)) \
        .serial_number(random.randrange(1000000000, 2000000000)) \
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
            critical=False,
        ).sign(key, hashes.SHA512(), default_backend())

    with open(pub_key, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _private_key_bytes(self, encoding, format, encryption_algorithm,
                           traditional_write_func, evp_pkey, cdata):
        """Serialize a private key to PEM through a memory BIO.

        :param encoding: member of ``serialization.Encoding``; only ``PEM`` is
            accepted.
        :param format: member of ``serialization.PrivateFormat``.  ``PKCS8``
            writes ``evp_pkey`` with ``PEM_write_bio_PKCS8PrivateKey``;
            ``TraditionalOpenSSL`` writes ``cdata`` with
            ``traditional_write_func``.
        :param encryption_algorithm: a ``serialization.NoEncryption`` or
            ``serialization.BestAvailableEncryption`` instance.
        :param traditional_write_func: writer used for ``TraditionalOpenSSL``.
        :param evp_pkey: key handle given to the PKCS8 writer.
        :param cdata: key handle given to ``traditional_write_func``.
        :returns: the result of ``self._read_mem_bio`` on the written BIO.
        :raises TypeError: if ``encoding``, ``format`` or
            ``encryption_algorithm`` has the wrong type.
        :raises ValueError: for a non-PEM encoding, an unsupported encryption
            type, or a password longer than 1023 bytes.
        """
        if not isinstance(encoding, serialization.Encoding):
            raise TypeError("encoding must be an item from the Encoding enum")
        if not isinstance(format, serialization.PrivateFormat):
            raise TypeError(
                "format must be an item from the PrivateFormat enum"
            )
        # This is a temporary check until we land DER serialization.
        if encoding is not serialization.Encoding.PEM:
            raise ValueError("Only PEM encoding is supported by this backend")

        # Pick the writer function together with the key handle it expects.
        if format is serialization.PrivateFormat.PKCS8:
            write_bio, key = self._lib.PEM_write_bio_PKCS8PrivateKey, evp_pkey
        elif format is serialization.PrivateFormat.TraditionalOpenSSL:
            write_bio, key = traditional_write_func, cdata

        if not isinstance(encryption_algorithm,
                          serialization.KeySerializationEncryption):
            raise TypeError(
                "Encryption algorithm must be a KeySerializationEncryption "
                "instance"
            )

        if isinstance(encryption_algorithm, serialization.NoEncryption):
            # A NULL cipher and a zero-length password are handed to the writer.
            evp_cipher, password, passlen = self._ffi.NULL, b"", 0
        elif isinstance(encryption_algorithm,
                        serialization.BestAvailableEncryption):
            # This is a curated value that we will update over time.
            evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
            password = encryption_algorithm.password
            passlen = len(password)
            if passlen > 1023:
                raise ValueError(
                    "Passwords longer than 1023 bytes are not supported by "
                    "this backend"
                )
        else:
            raise ValueError("Unsupported encryption type")

        bio = self._create_mem_bio()
        null = self._ffi.NULL
        res = write_bio(bio, key, evp_cipher, password, passlen, null, null)
        assert res == 1
        return self._read_mem_bio(bio)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _private_key_bytes(self, encoding, format, encryption_algorithm,
                           traditional_write_func, evp_pkey, cdata):
        """Serialize a private key to PEM through a memory BIO.

        :param encoding: member of ``serialization.Encoding``; only ``PEM`` is
            accepted.
        :param format: member of ``serialization.PrivateFormat``.  ``PKCS8``
            writes ``evp_pkey`` with ``PEM_write_bio_PKCS8PrivateKey``;
            ``TraditionalOpenSSL`` writes ``cdata`` with
            ``traditional_write_func``.
        :param encryption_algorithm: a ``serialization.NoEncryption`` or
            ``serialization.BestAvailableEncryption`` instance.
        :param traditional_write_func: writer used for ``TraditionalOpenSSL``.
        :param evp_pkey: key handle given to the PKCS8 writer.
        :param cdata: key handle given to ``traditional_write_func``.
        :returns: the result of ``self._read_mem_bio`` on the written BIO.
        :raises TypeError: if ``encoding``, ``format`` or
            ``encryption_algorithm`` has the wrong type.
        :raises ValueError: for a non-PEM encoding, an unsupported encryption
            type, or a password longer than 1023 bytes.
        """
        if not isinstance(encoding, serialization.Encoding):
            raise TypeError("encoding must be an item from the Encoding enum")
        if not isinstance(format, serialization.PrivateFormat):
            raise TypeError(
                "format must be an item from the PrivateFormat enum"
            )
        # This is a temporary check until we land DER serialization.
        if encoding is not serialization.Encoding.PEM:
            raise ValueError("Only PEM encoding is supported by this backend")

        # Pick the writer function together with the key handle it expects.
        if format is serialization.PrivateFormat.PKCS8:
            write_bio, key = self._lib.PEM_write_bio_PKCS8PrivateKey, evp_pkey
        elif format is serialization.PrivateFormat.TraditionalOpenSSL:
            write_bio, key = traditional_write_func, cdata

        if not isinstance(encryption_algorithm,
                          serialization.KeySerializationEncryption):
            raise TypeError(
                "Encryption algorithm must be a KeySerializationEncryption "
                "instance"
            )

        if isinstance(encryption_algorithm, serialization.NoEncryption):
            # A NULL cipher and a zero-length password are handed to the writer.
            evp_cipher, password, passlen = self._ffi.NULL, b"", 0
        elif isinstance(encryption_algorithm,
                        serialization.BestAvailableEncryption):
            # This is a curated value that we will update over time.
            evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
            password = encryption_algorithm.password
            passlen = len(password)
            if passlen > 1023:
                raise ValueError(
                    "Passwords longer than 1023 bytes are not supported by "
                    "this backend"
                )
        else:
            raise ValueError("Unsupported encryption type")

        bio = self._create_mem_bio()
        null = self._ffi.NULL
        res = write_bio(bio, key, evp_cipher, password, passlen, null, null)
        assert res == 1
        return self._read_mem_bio(bio)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def test_ecc_key(self):
        """
        Exercise ECC key generation and serialization, then ECDSA signing
        and verification against a certificate built from the key.
        """
        logging.debug("----- ECDSA Test Start -----")
        # Generate the key pair.
        private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
        public_key = private_key.public_key()

        # Serialize both halves as DER; the private half is password-encrypted.
        private_der = private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(password=b'qwer1234'),
        )
        public_der = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        private_b64 = base64.b64encode(private_der, altchars=None)
        public_b64 = base64.b64encode(public_der, altchars=None)
        logging.debug("Private Key : \n%s", private_b64)
        logging.debug("Public  Key : \n%s", public_b64)

        # Build a certificate from the key pair and take its public key.
        certificate = self._generate_cert(pub_key=public_key, issuer_key=private_key, subject_name="test")
        certificate_key = certificate.public_key()

        # Sign with the test helper and verify with the certificate's key.
        payload = b"test"
        helper_signature = self._generate_sign(pri_key=private_key, data=payload)
        logging.debug("Sign : %s", base64.b64encode(helper_signature, altchars=None))

        helper_verdict = self._verify_signature(pub_key=certificate_key, data=payload,
                                                signature=helper_signature)
        logging.debug("Verify : %s", helper_verdict)
        self.assertEqual(helper_verdict, True)

        # Sign directly through the key object and verify again.
        direct_signature = private_key.sign(payload, ec.ECDSA(hashes.SHA256()))
        direct_verdict = self._verify_signature(pub_key=certificate_key, data=payload,
                                                signature=direct_signature)
        logging.debug("----- ECDSA Test End -----\n")
        self.assertTrue(direct_verdict)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def generate_ca_cert(self, cn, ou, o, expire_period=None, password=None):
        """Create a self-issued CA certificate with a new SECP256K1 key.

        The certificate and private key are passed to ``self.__save`` with
        ``self.__CA_PATH``; afterwards ``self.__LAST_CA_INDEX`` is incremented
        and the certificate is handed to ``self.__show_certificate``.

        :param cn: CommonName of the certificate subject.
        :param ou: OrganizationalUnitName of the certificate subject.
        :param o: OrganizationName of the certificate subject.
        :param expire_period: validity period; ``self.__ca_expired`` is used
            when ``None``.  (The original docstring suggests years - TODO
            confirm.)
        :param password: passphrase used to encrypt the private key; ``None``
            stores the key unencrypted.
        """
        ca_private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
        ca_public_key = ca_private_key.public_key()

        # The same name is used as subject and issuer.
        subject_name = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, o),
            x509.NameAttribute(NameOID.COUNTRY_NAME, "kr"),
        ])

        serial_number = self.__LAST_CA_INDEX + 1

        key_usage = x509.KeyUsage(
            digital_signature=True,
            content_commitment=False,
            key_encipherment=True,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=True,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False,
        )

        if expire_period is None:
            expire_period = self.__ca_expired

        new_cert = self.__generate_cert(
            pub_key=ca_public_key,
            subject_name=subject_name,
            issuer_name=subject_name,
            serial_number=serial_number,
            expire_period=expire_period,
            key_usage=key_usage,
            issuer_priv=ca_private_key,
        )
        cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM)

        # NOTE(review): the encrypted branch serializes as DER while the
        # unencrypted branch uses PEM - confirm this asymmetry is intended.
        if password is None:
            key_encoding = serialization.Encoding.PEM
            key_encryption = serialization.NoEncryption()
        else:
            key_encoding = serialization.Encoding.DER
            key_encryption = serialization.BestAvailableEncryption(password=password)

        private_key_bytes = ca_private_key.private_bytes(
            encoding=key_encoding,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=key_encryption,
        )

        self.__save(self.__CA_PATH, cert_pem, private_key_bytes)
        self.__LAST_CA_INDEX += 1
        self.__show_certificate(new_cert)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def generate_peer_cert(self, cn, password=None):
        """Create a peer certificate issued by the loaded CA certificate.

        A new SECP256K1 key is generated; the subject reuses the
        OrganizationalUnitName and OrganizationName of the CA certificate's
        issuer.  Certificate, private key and CA certificate are passed to
        ``self.__save`` under ``join(self.__DEFAULT_PATH, cn)``.

        :param cn: CommonName of the peer; also used as the output
            sub-directory name.
        :param password: passphrase used to encrypt the private key; ``None``
            stores the key unencrypted.
        """
        peer_private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
        peer_public_key = peer_private_key.public_key()

        issuer_name = self.__ca_cert.issuer
        ou = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME)[0].value
        o = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value

        subject_name = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, o),
            x509.NameAttribute(NameOID.COUNTRY_NAME, "kr"),
        ])

        expire_period = self.__peer_expired

        # NOTE(review): the index is read here but not incremented in this
        # method - confirm it is advanced elsewhere.
        serial_number = self.__LAST_PEER_INDEX + 1

        key_usage = x509.KeyUsage(
            digital_signature=True,
            content_commitment=False,
            key_encipherment=True,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=False,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False,
        )

        new_cert = self.__generate_cert(
            pub_key=peer_public_key,
            subject_name=subject_name,
            issuer_name=issuer_name,
            serial_number=serial_number,
            expire_period=expire_period,
            key_usage=key_usage,
            issuer_priv=self.__ca_pri,
            issuer_cert=self.__ca_cert,
        )
        cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM)

        # Only the encryption differs between the two cases; the key is
        # always serialized as PKCS8 PEM.
        if password is None:
            key_encryption = serialization.NoEncryption()
        else:
            key_encryption = serialization.BestAvailableEncryption(password=password)
        private_key_pem = peer_private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=key_encryption,
        )

        ca_cert_pem = self.__ca_cert.public_bytes(encoding=serialization.Encoding.PEM)

        peer_path = join(self.__DEFAULT_PATH, cn)
        self.__save(peer_path, cert_bytes=cert_pem, pri_bytes=private_key_pem, ca_cert=ca_cert_pem)

        # Hand the new certificate to the peer-certificate loader.
        self.__load_peer_certificate(cert_bytes=cert_pem)

        self.__show_certificate(new_cert)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _private_key_bytes(self, encoding, format, encryption_algorithm,
                           traditional_write_func, evp_pkey, cdata):
        """Serialize a private key to PEM through a memory BIO.

        :param encoding: member of ``serialization.Encoding``; only ``PEM`` is
            accepted.
        :param format: member of ``serialization.PrivateFormat``.  ``PKCS8``
            writes ``evp_pkey`` with ``PEM_write_bio_PKCS8PrivateKey``;
            ``TraditionalOpenSSL`` writes ``cdata`` with
            ``traditional_write_func``.
        :param encryption_algorithm: a ``serialization.NoEncryption`` or
            ``serialization.BestAvailableEncryption`` instance.
        :param traditional_write_func: writer used for ``TraditionalOpenSSL``.
        :param evp_pkey: key handle given to the PKCS8 writer.
        :param cdata: key handle given to ``traditional_write_func``.
        :returns: the result of ``self._read_mem_bio`` on the written BIO.
        :raises TypeError: if ``encoding``, ``format`` or
            ``encryption_algorithm`` has the wrong type.
        :raises ValueError: for a non-PEM encoding, an unsupported encryption
            type, or a password longer than 1023 bytes.
        """
        if not isinstance(encoding, serialization.Encoding):
            raise TypeError("encoding must be an item from the Encoding enum")
        if not isinstance(format, serialization.PrivateFormat):
            raise TypeError(
                "format must be an item from the PrivateFormat enum"
            )
        # This is a temporary check until we land DER serialization.
        if encoding is not serialization.Encoding.PEM:
            raise ValueError("Only PEM encoding is supported by this backend")

        # Pick the writer function together with the key handle it expects.
        if format is serialization.PrivateFormat.PKCS8:
            write_bio, key = self._lib.PEM_write_bio_PKCS8PrivateKey, evp_pkey
        elif format is serialization.PrivateFormat.TraditionalOpenSSL:
            write_bio, key = traditional_write_func, cdata

        if not isinstance(encryption_algorithm,
                          serialization.KeySerializationEncryption):
            raise TypeError(
                "Encryption algorithm must be a KeySerializationEncryption "
                "instance"
            )

        if isinstance(encryption_algorithm, serialization.NoEncryption):
            # A NULL cipher and a zero-length password are handed to the writer.
            evp_cipher, password, passlen = self._ffi.NULL, b"", 0
        elif isinstance(encryption_algorithm,
                        serialization.BestAvailableEncryption):
            # This is a curated value that we will update over time.
            evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
            password = encryption_algorithm.password
            passlen = len(password)
            if passlen > 1023:
                raise ValueError(
                    "Passwords longer than 1023 bytes are not supported by "
                    "this backend"
                )
        else:
            raise ValueError("Unsupported encryption type")

        bio = self._create_mem_bio()
        null = self._ffi.NULL
        res = write_bio(bio, key, evp_cipher, password, passlen, null, null)
        assert res == 1
        return self._read_mem_bio(bio)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _private_key_bytes(self, encoding, format, encryption_algorithm,
                           traditional_write_func, evp_pkey, cdata):
        """Serialize a private key to PEM through a memory BIO.

        :param encoding: member of ``serialization.Encoding``; only ``PEM`` is
            accepted.
        :param format: member of ``serialization.PrivateFormat``.  ``PKCS8``
            writes ``evp_pkey`` with ``PEM_write_bio_PKCS8PrivateKey``;
            ``TraditionalOpenSSL`` writes ``cdata`` with
            ``traditional_write_func``.
        :param encryption_algorithm: a ``serialization.NoEncryption`` or
            ``serialization.BestAvailableEncryption`` instance.
        :param traditional_write_func: writer used for ``TraditionalOpenSSL``.
        :param evp_pkey: key handle given to the PKCS8 writer.
        :param cdata: key handle given to ``traditional_write_func``.
        :returns: the result of ``self._read_mem_bio`` on the written BIO.
        :raises TypeError: if ``encoding``, ``format`` or
            ``encryption_algorithm`` has the wrong type.
        :raises ValueError: for a non-PEM encoding, an unsupported encryption
            type, or a password longer than 1023 bytes.
        """
        if not isinstance(encoding, serialization.Encoding):
            raise TypeError("encoding must be an item from the Encoding enum")
        if not isinstance(format, serialization.PrivateFormat):
            raise TypeError(
                "format must be an item from the PrivateFormat enum"
            )
        # This is a temporary check until we land DER serialization.
        if encoding is not serialization.Encoding.PEM:
            raise ValueError("Only PEM encoding is supported by this backend")

        # Pick the writer function together with the key handle it expects.
        if format is serialization.PrivateFormat.PKCS8:
            write_bio, key = self._lib.PEM_write_bio_PKCS8PrivateKey, evp_pkey
        elif format is serialization.PrivateFormat.TraditionalOpenSSL:
            write_bio, key = traditional_write_func, cdata

        if not isinstance(encryption_algorithm,
                          serialization.KeySerializationEncryption):
            raise TypeError(
                "Encryption algorithm must be a KeySerializationEncryption "
                "instance"
            )

        if isinstance(encryption_algorithm, serialization.NoEncryption):
            # A NULL cipher and a zero-length password are handed to the writer.
            evp_cipher, password, passlen = self._ffi.NULL, b"", 0
        elif isinstance(encryption_algorithm,
                        serialization.BestAvailableEncryption):
            # This is a curated value that we will update over time.
            evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
            password = encryption_algorithm.password
            passlen = len(password)
            if passlen > 1023:
                raise ValueError(
                    "Passwords longer than 1023 bytes are not supported by "
                    "this backend"
                )
        else:
            raise ValueError("Unsupported encryption type")

        bio = self._create_mem_bio()
        null = self._ffi.NULL
        res = write_bio(bio, key, evp_cipher, password, passlen, null, null)
        assert res == 1
        return self._read_mem_bio(bio)