Python cryptography.hazmat.backends 模块,default_backend() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用cryptography.hazmat.backends.default_backend()

项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_key_not_exists(self, pem_path):
        """
        Calling ``maybe_key`` on a directory without a key file should create
        ``client.key`` there, and the key it returns should equal the key
        read back from that file.
        """
        returned_key = maybe_key(pem_path)

        key_file = pem_path.child(u'client.key')
        assert_that(key_file.exists(), Equals(True))

        # Read back what was written and wrap it in JWKRSA so it can be
        # compared directly with the returned value.
        loaded_key = serialization.load_pem_private_key(
            key_file.getContent(), password=None, backend=default_backend())
        expected_key = JWKRSA(key=loaded_key)

        assert_that(returned_key, Equals(expected_key))
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def _restore_state(self):
        """Reload this user's attributes from the state store.

        The stored value is a hex string wrapping a pickled dict. Attributes
        are assigned one by one, so a failure part-way through leaves the
        earlier assignments in place.

        :raises IOError: if anything goes wrong while reading or decoding
            the stored state (the original exception is passed as the
            second argument).
        """
        try:
            hex_state = self._state_store.get_value(self._state_store_key)
            # NOTE(review): pickle.loads can execute arbitrary code; this is
            # only safe if the state store content is fully trusted.
            pickled = binascii.unhexlify(hex_state.encode("utf-8"))
            saved = pickle.loads(pickled)

            self._name = saved['name']
            self.enrollment_secret = saved['enrollment_secret']

            stored_enrollment = saved['enrollment']
            if stored_enrollment:
                # The private key is stored as PEM; the certificate is
                # handed to Enrollment exactly as stored.
                key_obj = serialization.load_pem_private_key(
                    stored_enrollment['private_key'],
                    password=None,
                    backend=default_backend())
                self.enrollment = Enrollment(key_obj,
                                             stored_enrollment['cert'])

            self.affiliation = saved['affiliation']
            self.account = saved['account']
            self.roles = saved['roles']
            self._org = saved['org']
            self.msp_id = saved['msp_id']
        except Exception as e:
            raise IOError("Cannot deserialize the user", e)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def verify_hmac(expected_result, secret_key, unique_prefix, data):
    '''
    Check expected_result against an HMAC computed with CryptoHash over
    unique_prefix followed by data, keyed with secret_key
    (HMAC-SHA256(secret_key, unique_prefix | data) per the original
    documentation).
    The key must be kept secret. The prefix ensures hash uniqueness.
    Returns True when the signature matches and False when it does not.
    '''
    # A secret key shorter than the digest size reduces security
    assert secret_key
    assert len(secret_key) >= CryptoHash.digest_size
    mac = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend())
    for part in (unique_prefix, data):
        mac.update(bytes(part))
    try:
        mac.verify(bytes(expected_result))
    except InvalidSignature:
        return False
    return True
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def encrypt(plaintext: bytes, token: bytes) -> bytes:
        """Encrypt plaintext using a key and IV derived from the token.

        The plaintext is PKCS7-padded to 128-bit blocks and encrypted with
        AES in CBC mode.

        :param bytes plaintext: Plaintext (json) to encrypt
        :param bytes token: Token to use
        :raises TypeError: if plaintext is not a bytes object
        :return: Encrypted bytes"""
        if not isinstance(plaintext, bytes):
            raise TypeError("plaintext requires bytes")
        Utils.verify_token(token)
        key, iv = Utils.key_iv(token)

        pkcs7 = padding.PKCS7(128).padder()
        padded = pkcs7.update(plaintext) + pkcs7.finalize()

        aes_cbc = Cipher(algorithms.AES(key), modes.CBC(iv),
                         backend=default_backend())
        enc = aes_cbc.encryptor()
        return enc.update(padded) + enc.finalize()
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
        """Decrypt ciphertext using a key and IV derived from the token.

        The ciphertext is decrypted with AES in CBC mode and the 128-bit
        PKCS7 padding is then removed.

        :param bytes ciphertext: Ciphertext to decrypt
        :param bytes token: Token to use
        :raises TypeError: if ciphertext is not a bytes object
        :return: Decrypted bytes object"""
        if not isinstance(ciphertext, bytes):
            raise TypeError("ciphertext requires bytes")
        Utils.verify_token(token)
        key, iv = Utils.key_iv(token)

        aes_cbc = Cipher(algorithms.AES(key), modes.CBC(iv),
                         backend=default_backend())
        dec = aes_cbc.decryptor()
        padded = dec.update(ciphertext) + dec.finalize()

        pkcs7 = padding.PKCS7(128).unpadder()
        return pkcs7.update(padded) + pkcs7.finalize()
项目:endosome    作者:teor2345    | 项目源码 | 文件源码
def kdf_rfc5869_derive(secret_input_bytes, output_len, m_expand=M_EXPAND_NTOR,
                       t_key=T_KEY_NTOR):
    '''
    Derive output_len bytes from secret_input_bytes with HKDF-SHA256
    (RFC5869), using m_expand as the info value and t_key as the salt.
    The result is returned as a bytearray.
    There is no matching verification function, because only the nonce part
    of the KDF result is verified directly.
    See https://gitweb.torproject.org/torspec.git/tree/tor-spec.txt#n1026
    '''
    kdf = hkdf.HKDF(algorithm=hashes.SHA256(),
                    length=output_len,
                    info=bytes(m_expand),
                    salt=bytes(t_key),
                    backend=backends.default_backend())
    derived = kdf.derive(bytes(secret_input_bytes))
    assert len(derived) == output_len
    return bytearray(derived)
项目:endosome    作者:teor2345    | 项目源码 | 文件源码
def crypt_create(key_bytes,
                 is_encrypt_flag=None,
                 iv_bytes=None,
                 algorithm=ciphers_algorithms.AES,
                 mode=ciphers_modes.CTR):
    '''
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.

    Uses algorithm in mode with an IV of iv_bytes.
    If iv_bytes is None, an IV of one cipher block is obtained from
    get_zero_pad (an all-zero IV, per the original documentation).

    Returns an encryptor context when is_encrypt_flag is true, and a
    decryptor context otherwise (including the default of None).

    AES CTR mode uses the same operations for encryption and decryption.
    '''
    algo = algorithm(bytes(key_bytes))
    if iv_bytes is None:
        # block_size is in bits. Floor division keeps the byte count an
        # integer on Python 3, where "/" would produce a float.
        iv_bytes = get_zero_pad(algo.block_size // BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo, mode(bytes(iv_bytes)),
                            backend=backends.default_backend())
    if is_encrypt_flag:
        return cipher.encryptor()
    return cipher.decryptor()
项目:requests-http-signature    作者:kislyuk    | 项目源码 | 文件源码
def test_rsa(self):
        """Send a request signed with a freshly generated 2048-bit RSA key.

        The private key is serialized as passphrase-encrypted PKCS8 PEM and
        given to HTTPSignatureAuth; the matching public key is sent
        base64-encoded in the ``pubkey`` request header.
        """
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.hazmat.primitives import serialization

        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend())

        encrypted_pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase))
        spki_pem = key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)

        target = 'http://example.com/path?query#fragment'
        signer = HTTPSignatureAuth(algorithm="rsa-sha256", key=encrypted_pem,
                                   key_id="sekret", passphrase=passphrase)
        extra_headers = dict(pubkey=base64.b64encode(spki_pem))
        self.session.get(target, auth=signer, headers=extra_headers)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def verify_renewable_cert_sig(renewable_cert):
    """ Verifies the signature of a `.storage.RenewableCert` object.

    The certificate's signature is checked against the certificate loaded
    from ``renewable_cert.chain``, using the hash algorithm named in the
    certificate itself.

    :param `.storage.RenewableCert` renewable_cert: cert to verify

    :raises errors.Error: If signature verification fails.
    """
    try:
        # Distinct names for the file handles and the parsed objects, so
        # the handle is not rebound inside its own ``with`` block.
        with open(renewable_cert.chain, 'rb') as chain_file:
            chain, _ = pyopenssl_load_certificate(chain_file.read())
        with open(renewable_cert.cert, 'rb') as cert_file:
            cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
        hash_name = cert.signature_hash_algorithm.name
        OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
    except (IOError, ValueError, OpenSSL.crypto.Error) as e:
        # Implicit string concatenation instead of a backslash continuation
        # inside the literal, which embedded the indentation in the message.
        error_str = ("verifying the signature of the cert located at {0} has failed. "
                     "Details: {1}").format(renewable_cert.cert, e)
        logger.exception(error_str)
        raise errors.Error(error_str)
项目:cursive    作者:openstack    | 项目源码 | 文件源码
def load_certificate(self, cert_name):
        """Read ``cert_name`` from ``self.cert_path`` and parse it.

        The data is parsed as PEM first and, if that fails, as DER.

        :raises exception.SignatureVerificationError: if neither format
            can be parsed.
        """
        cert_file_path = os.path.join(self.cert_path, cert_name)
        with open(cert_file_path, 'rb') as handle:
            raw = handle.read()

        try:
            return x509.load_pem_x509_certificate(raw, default_backend())
        except Exception:
            # Not PEM: fall through to the DER attempt below.
            try:
                return x509.load_der_x509_certificate(raw, default_backend())
            except Exception:
                raise exception.SignatureVerificationError(
                    "Failed to load certificate: %s" % cert_file_path
                )
项目:kolla-ansible    作者:openstack    | 项目源码 | 文件源码
def generate_RSA(bits=4096):
    """Generate an RSA key pair and return both halves serialized.

    :param bits: key size in bits
    :return: tuple ``(private_key, public_key)``; the private key is
        unencrypted PKCS8 PEM, the public key is in OpenSSH format.
    """
    key = rsa.generate_private_key(
        public_exponent=65537, key_size=bits, backend=default_backend())

    pem_private = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption())

    pub = key.public_key()
    openssh_public = pub.public_bytes(
        encoding=serialization.Encoding.OpenSSH,
        format=serialization.PublicFormat.OpenSSH)

    return pem_private, openssh_public
项目:kolla-ansible    作者:openstack    | 项目源码 | 文件源码
def generate_RSA(bits=4096):
    """Generate an RSA key pair and return both halves serialized.

    :param bits: key size in bits
    :return: tuple ``(private_key, public_key)``; the private key is
        unencrypted PKCS8 PEM, the public key is in OpenSSH format.
    """
    key = rsa.generate_private_key(
        public_exponent=65537, key_size=bits, backend=default_backend())

    pem_private = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption())

    pub = key.public_key()
    openssh_public = pub.public_bytes(
        encoding=serialization.Encoding.OpenSSH,
        format=serialization.PublicFormat.OpenSSH)

    return pem_private, openssh_public
项目:bitmask-dev    作者:leapcode    | 项目源码 | 文件源码
def validate_ca_cert(self, ignored):
        """Check the stored CA certificate against its expected fingerprint.

        The expected value has the form ``<hash name>:<hex digest>``; spaces
        in the digest are ignored. On a mismatch the certificate file is
        deleted, the error is logged and NetworkError is raised.

        :param ignored: unused.
        :raises NetworkError: if the fingerprints differ.
        """
        expected = self._get_expected_ca_cert_fingerprint()
        algo, expectedfp = expected.split(':')
        expectedfp = expectedfp.replace(' ', '')
        backend = default_backend()

        ca_cert_path = self._get_ca_cert_path()
        # Read bytes: the PEM loader requires bytes on Python 3.
        with open(ca_cert_path, 'rb') as f:
            cert_pem = f.read()
        cert = load_pem_x509_certificate(cert_pem, backend)
        hasher = getattr(hashes, algo)()
        # hexlify returns bytes on Python 3; decode so the comparison with
        # the text fingerprint can succeed.
        fp = binascii.hexlify(cert.fingerprint(hasher)).decode('ascii')

        if fp != expectedfp:
            os.unlink(ca_cert_path)
            self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
                           % (fp, expectedfp))
            raise NetworkError("The provider's CA fingerprint doesn't match")
项目:code-crypt    作者:Nextdoor    | 项目源码 | 文件源码
def _set_encryptor(self, public_key):
        '''Create ``self.encryptor`` from a PEM public key.

        Does nothing if an encryptor is already set. When ``public_key`` is
        falsy, the key is read from ``self.public_key_file`` instead.

        :param public_key: PEM-encoded public key bytes, or a falsy value
            to load the key from ``self.public_key_file``.
        :raises errors.InputError: if the key file cannot be read.
        :raises errors.EncryptorError: if the key cannot be parsed or the
            Encryptor cannot be built from it.
        '''
        if self.encryptor is not None:
            return

        if not public_key:
            try:
                with open(self.public_key_file, 'rb') as f:
                    public_key = f.read()
            except IOError:
                raise errors.InputError(
                    "public key '%s' does not exist." % self.public_key_file)

        try:
            public_key_obj = serialization.load_pem_public_key(
                public_key,
                backend=default_backend())

            self.encryptor = Encryptor(public_key_obj)
        except Exception as e:
            raise errors.EncryptorError(
                "public key is malformed. (Reason: %s)" % (str(e)))
项目:manuale    作者:veeti    | 项目源码 | 文件源码
def create_csr(key, domains, must_staple=False):
    """
    Build a CSR for the given key and domain names, sign it with SHA256 and
    return the result of ``export_csr_for_acme`` (DER format, per the
    original documentation).

    The first domain becomes the common name; every domain is listed in the
    subject alternative name extension. With ``must_staple`` set, a TLS
    feature extension requesting ``status_request`` is added as well.
    """
    assert domains
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_ext = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request])
        builder = builder.add_extension(staple_ext, critical=False)

    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _gen_key_initctr(cls, b_password, b_salt):
        """Derive two keys and an IV from a password and a salt.

        When HAS_PBKDF2HMAC is true, PBKDF2-HMAC with c_SHA256 and 10000
        iterations is used; otherwise ``cls._create_key`` does the
        derivation. The derived bytes are split into two 32-byte keys
        followed by a 16-byte IV.

        :return: tuple ``(key1, key2, hexlified_iv)``
        """
        # 32 selects AES256 (16 would be AES128)
        key_len = 32
        # same size as used for counter.new, to avoid extra work
        iv_len = 16

        if not HAS_PBKDF2HMAC:
            derived = cls._create_key(b_password, b_salt, key_len, iv_len)
        else:
            backend = default_backend()
            pbkdf2 = PBKDF2HMAC(
                algorithm=c_SHA256(),
                length=2 * key_len + iv_len,
                salt=b_salt,
                iterations=10000,
                backend=backend)
            derived = pbkdf2.derive(b_password)

        second_key_start = key_len
        iv_start = key_len * 2
        first_key = derived[:second_key_start]
        second_key = derived[second_key_start:iv_start]
        iv = derived[iv_start:iv_start + iv_len]

        return first_key, second_key, hexlify(iv)
项目:rust_pypi_example    作者:mckaymatt    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def decode_public_key(self, encoded):
        """
        Decode a base64-encoded, gzip-compressed PEM public key (the spotnab
        format, per the original documentation) and store it in
        ``self.public_key``.

        :param encoded: base64 text/bytes of the gzipped PEM key.
        :return: True if a key was loaded; False if the base64 data is
            invalid or the PEM content cannot be parsed.
        """
        # Local import keeps the block self-contained; io.BytesIO exists on
        # both Python 2 and Python 3.
        from io import BytesIO

        try:
            compressed = b64decode(encoded)
        except (TypeError, ValueError):
            # Python 2 raises TypeError for bad base64; Python 3 raises
            # binascii.Error, a ValueError subclass.
            return False

        self.public_key = None
        with GzipFile(fileobj=BytesIO(compressed), mode="rb") as f:
            try:
                self.public_key = serialization.load_pem_public_key(
                    f.read(),
                    backend=default_backend()
                )
            except ValueError:
                # Content could not be parsed as a PEM public key
                return False

        return bool(self.public_key)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def genkeys(self, key_size=KeySize.NORMAL, password=None):
        """
        Generate an RSA private key and its public key, remember both on
        the instance together with the password, and return them as the
        tuple (private, public).

        """
        # 65537 is the usual choice of public exponent (one of the small
        # Fermat primes 3, 5, 17, 257, 65537). See
        # http://www.daemonology.net/blog/2009-06-11-\
        #  cryptographic-right-answers.html
        new_private = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size,
            backend=default_backend(),
        )
        self.private_key = new_private

        # The public half is derived from the private key
        self.public_key = self.private_key.public_key()

        # Kept so the key can be serialized with this password later on
        self.password = password

        # (RSAPrivateKey, RSAPublicKey)
        return (self.private_key, self.public_key)
项目:calendary    作者:DavidHickman    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:tokenize-uk    作者:lang-uk    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:libcloud.api    作者:tonybaloney    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:Travis-Encrypt    作者:mandeep    | 项目源码 | 文件源码
def encrypt_key(key, password):
    """Encrypt the password with the public key and return an ASCII representation.

    The public key retrieved from the Travis API is loaded as an RSAPublicKey
    object using Cryptography's default backend. Then the given password
    is encrypted with the encrypt() method of RSAPublicKey. The encrypted
    password is then encoded to base64 and decoded into ASCII in order to
    convert the bytes object into a string object.

    Parameters
    ----------
    key: str
        Travis CI public RSA key that requires deserialization
    password: str or bytes
        the password to be encrypted; text is encoded to bytes first,
        bytes are used unchanged

    Returns
    -------
    encrypted_password: str
        the base64 encoded encrypted password decoded as ASCII

    Notes
    -----
    Travis CI uses the PKCS1v15 padding scheme. While PKCS1v15 is secure,
    it is outdated and should be replaced with OAEP.

    Example:
    OAEP(mgf=MGF1(algorithm=SHA256()), algorithm=SHA256(), label=None)
    """
    public_key = load_pem_public_key(key.encode(), default_backend())
    # RSAPublicKey.encrypt() only accepts bytes, while the documented input
    # type is str, so text is encoded here.
    if not isinstance(password, bytes):
        password = password.encode()
    encrypted_password = public_key.encrypt(password, PKCS1v15())
    return base64.b64encode(encrypted_password).decode('ascii')
项目:MDRun    作者:jeiros    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:twindb_cloudflare    作者:twindb    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def maybe_key(pem_path):
    """
    Return the client key stored in ``client.key`` under ``pem_path``,
    generating and saving a new RSA key first if that file does not exist.

    https://gist.github.com/glyph/27867a478bb71d8b6046fbfb176e1a33#file-local-certs-py-L32-L50

    :type pem_path: twisted.python.filepath.FilePath
    :param pem_path:
        The path to the certificate directory to use.
    :return: the key wrapped in ``jose.JWKRSA``.
    """
    key_file = pem_path.child(u'client.key')
    if not key_file.exists():
        key = generate_private_key(u'rsa')
        # Saved as unencrypted PEM in the traditional OpenSSL format
        pem_bytes = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        key_file.setContent(pem_bytes)
    else:
        key = serialization.load_pem_private_key(
            key_file.getContent(), password=None, backend=default_backend())
    return jose.JWKRSA(key=key)
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def generate_wildcard_pem_bytes():
    """
    Create a self-signed certificate whose subject and issuer common name
    is '*', valid from one day ago until 3650 days from now, and return it
    together with its private key.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate

    :return: PEM bytes: the unencrypted private key followed by the
        certificate
    """
    key = generate_private_key(u'rsa')
    wildcard_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'*')])

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(wildcard_name)
    builder = builder.subject_name(wildcard_name)
    builder = builder.not_valid_before(datetime.today() - timedelta(days=1))
    builder = builder.not_valid_after(datetime.now() + timedelta(days=3650))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(key.public_key())
    certificate = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
    return b''.join((key_pem, cert_pem))
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def get_peer_org_user(client, peer_org, user='Admin'):
    """Build a User for the given peer org from the e2e test fixtures.

    The private key and certificate file names come from E2E_CONFIG and are
    read from the user's msp directory below the current working directory.
    The returned user carries an Enrollment and the org's msp id.
    """
    msp_dir = os.path.join(
        os.getcwd(),
        'test/fixtures/e2e_cli/crypto-config/peerOrganizations/{0}'
        '/users/{1}@{0}/msp/'.format(peer_org, user)
    )

    org_config = E2E_CONFIG['test-network'][peer_org]
    user_config = org_config['users'][user]
    key_path = os.path.join(msp_dir, 'keystore/', user_config['private_key'])
    cert_path = os.path.join(msp_dir, 'signcerts/', user_config['cert'])

    with open(key_path, 'rb') as key_file:
        key_pem = key_file.read()
    with open(cert_path, 'rb') as cert_file:
        cert_pem = cert_file.read()

    org_user = User('peer' + peer_org + user, peer_org, client.state_store)

    # The key is wrapped in a 'cryptography' private key object so that all
    # of its methods are available; the certificate stays as PEM bytes.
    key_obj = load_pem_private_key(key_pem, None, default_backend())
    org_user.enrollment = Enrollment(key_obj, cert_pem)
    org_user.msp_id = org_config['mspid']

    return org_user
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def append_signature(target_file, priv_key, priv_key_password=None):
    """Sign the file's current contents and append the signature to it.

    The signature is made with the PEM private key stored at ``priv_key``
    (optionally protected by ``priv_key_password``), using PKCS1v15 padding
    and SHA512.
    """
    with open(target_file, "rb") as source:
        payload = source.read()

    with open(priv_key, "rb") as key_file:
        signer = serialization.load_pem_private_key(
            key_file.read(),
            password=priv_key_password,
            backend=default_backend())

    sig = signer.sign(payload, padding.PKCS1v15(), hashes.SHA512())

    with open(target_file, "ab") as sink:
        sink.write(sig)
项目:latexipy    作者:masasin    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:messier    作者:conorsch    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
        """Set up the broker client.

        :param broker: URL of the broker API endpoint.
        :param encoding: text encoding stored on the instance.
        :param enc_key: AES key for transport encryption, or None.
        :param enc_iv: CBC initialisation vector, or None.

        Transport encryption (AES in CBC mode) is enabled only when both
        ``enc_key`` and ``enc_iv`` are given.
        """
        super().__init__()

        self._endpoint = broker
        # Honour the caller's choice; this used to be hard-coded to "utf-8",
        # which silently ignored the ``encoding`` argument.
        self._encoding = encoding
        if enc_key is None or enc_iv is None:
            self._transport_enc = False
            self._transport_enc_key = None
            self._transport_enc_iv = None
            self._cipher = None
        else:
            self._transport_enc = True
            self._transport_enc_key = enc_key
            self._transport_enc_iv = enc_iv
            backend = default_backend()
            self._cipher = Cipher(algorithms.AES(
                enc_key), modes.CBC(enc_iv), backend=backend)

        self._session = requests.Session()
        # Maps event names to the handler methods of this instance
        self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
                            'query_data': self.on_query_data, 'send_order': self.on_insert_order,
                            'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
        self.client_id = ''
        self.account_id = ''
项目:manage    作者:rochacbruno    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:mbin    作者:fanglab    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def load_private_key_string(key_string):
    """Parse an unencrypted PEM private key and return the key object."""
    backend = default_backend()
    return serialization.load_pem_private_key(
        key_string, password=None, backend=backend)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def load_public_key_string(key_string):
    """Parse a PEM public key and return the key object."""
    backend = default_backend()
    return serialization.load_pem_public_key(key_string, backend=backend)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def get_hmac(secret_key, unique_prefix, data):
    '''
    Compute an HMAC with CryptoHash over unique_prefix followed by data,
    keyed with secret_key.
    The key must be kept secret.
    The prefix ensures hash uniqueness.
    Returns HMAC-SHA256(secret_key, unique_prefix | data) as bytes (per the
    original documentation).
    '''
    # A secret key shorter than the digest size reduces security
    assert secret_key
    assert len(secret_key) >= CryptoHash.digest_size
    mac = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend())
    for part in (unique_prefix, data):
        mac.update(bytes(part))
    digest = mac.finalize()
    return bytes(digest)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def generate_keypair(key_out_path):
    """Generate a 4096-bit RSA private key and write it to key_out_path.

    The key is written as unencrypted PKCS8 PEM, followed by one extra
    newline.
    """
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096, backend=default_backend())
    pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption())
    with open(key_out_path, 'wb') as outf:
        # The Python 2 only print-chevron statement wrote the PEM followed
        # by a newline. Writing the same bytes explicitly also works on
        # Python 3, where the old statement raises TypeError.
        outf.write(pem)
        outf.write(b'\n')
项目:document_clipper    作者:reclamador    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:mutant    作者:peterdemin    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:seqlog    作者:tintoy    | 项目源码 | 文件源码
def load_key(pubkey):
    """Load a PEM public key given as text.

    If the first parse raises ValueError, the ``RSA`` marker is removed from
    the PEM header and footer and the parse is retried once. This works
    around keys with an incorrect header/footer format, see
    https://github.com/travis-ci/travis-api/issues/196

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    pem_text = pubkey
    try:
        key = load_pem_public_key(pem_text.encode(), default_backend())
    except ValueError:
        for marker in ('BEGIN', 'END'):
            pem_text = pem_text.replace(marker + ' RSA', marker)
        key = load_pem_public_key(pem_text.encode(), default_backend())
    return key
项目:word_password_generator    作者:kierun    | 项目源码 | 文件源码
def load_key(pubkey):
    """Return the public RSA key parsed from the PEM text *pubkey*.

    If parsing the text as supplied raises ``ValueError``, the header and
    footer markers are rewritten from ``BEGIN RSA`` / ``END RSA`` to
    ``BEGIN`` / ``END`` and parsing is attempted a second and final time.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        result = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_header = pubkey.replace('BEGIN RSA', 'BEGIN')
        fixed_text = fixed_header.replace('END RSA', 'END')
        result = load_pem_public_key(fixed_text.encode(), default_backend())
    return result
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, key, backend=None):
        """Initialise the instance from a url-safe base64-encoded key.

        :param key: url-safe base64 encoding of exactly 32 bytes.  The first
            16 decoded bytes are stored as the signing key and the
            remaining 16 as the encryption key.
        :param backend: object stored on the instance as the backend; when
            ``None``, the result of ``default_backend()`` is used.
        :raises ValueError: if the decoded key is not 32 bytes long.
        """
        # Resolve the backend first, matching the original evaluation order.
        chosen_backend = default_backend() if backend is None else backend

        raw_key = base64.urlsafe_b64decode(key)
        if len(raw_key) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        # Attributes are only set once the key has been validated.
        self._signing_key, self._encryption_key = raw_key[:16], raw_key[16:]
        self._backend = chosen_backend
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, key, backend=None):
        """Store the two key halves and the backend on the instance.

        :param key: url-safe base64 text that must decode to 32 bytes;
            bytes 0-15 become the signing key, bytes 16-31 the encryption
            key.
        :param backend: backend object to keep; ``default_backend()`` is
            called to obtain one when this is ``None``.
        :raises ValueError: if the decoded key length is not 32.
        """
        backend_to_use = backend
        if backend_to_use is None:
            backend_to_use = default_backend()

        decoded = base64.urlsafe_b64decode(key)
        if len(decoded) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        # Split the validated key in half before touching the instance.
        signing_half = decoded[:16]
        encryption_half = decoded[16:]
        self._signing_key = signing_half
        self._encryption_key = encryption_half
        self._backend = backend_to_use
项目:alexafsm    作者:allenai    | 项目源码 | 文件源码
def load_key(pubkey):
    """Parse a PEM-encoded public RSA key given as text.

    The text is tried as-is first.  When that raises ``ValueError`` the
    ``BEGIN RSA`` / ``END RSA`` markers are replaced with ``BEGIN`` /
    ``END`` and the parse is repeated once; that second error, if any, is
    raised to the caller.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        key = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        patched = pubkey.replace('BEGIN RSA', 'BEGIN').replace('END RSA', 'END')
        key = load_pem_public_key(patched.encode(), default_backend())
    return key
项目:gbrs    作者:churchill-lab    | 项目源码 | 文件源码
def load_key(pubkey):
    """Return the public RSA key parsed from the PEM text *pubkey*.

    Falls back to a second parse, with ``RSA`` removed from the BEGIN and
    END marker lines, when the first parse raises ``ValueError``.  Errors
    from the fallback parse are not handled here.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        parsed = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        begin_fixed = pubkey.replace('BEGIN RSA', 'BEGIN')
        both_fixed = begin_fixed.replace('END RSA', 'END')
        parsed = load_pem_public_key(both_fixed.encode(), default_backend())
    return parsed
项目:HtmlTestRunner    作者:oldani    | 项目源码 | 文件源码
def load_key(pubkey):
    """Parse a PEM-encoded public RSA key given as text.

    Parsing is attempted on the unmodified text; if it raises
    ``ValueError``, the markers ``BEGIN RSA`` and ``END RSA`` are shortened
    to ``BEGIN`` and ``END`` and one further attempt is made.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        public_key = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        corrected = pubkey.replace('BEGIN RSA', 'BEGIN').replace('END RSA', 'END')
        public_key = load_pem_public_key(corrected.encode(), default_backend())
    return public_key
项目:pluralsight    作者:tonybaloney    | 项目源码 | 文件源码
def load_key(pubkey):
    """Return the public RSA key parsed from the PEM text *pubkey*.

    When the text cannot be parsed as supplied (``ValueError``), its
    header/footer markers are rewritten from ``BEGIN RSA`` / ``END RSA`` to
    ``BEGIN`` / ``END`` and it is parsed once more; a failure of that retry
    propagates unchanged.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded_key = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        header_done = pubkey.replace('BEGIN RSA', 'BEGIN')
        rewritten = header_done.replace('END RSA', 'END')
        loaded_key = load_pem_public_key(rewritten.encode(), default_backend())
    return loaded_key