Python Crypto.Cipher.AES 模块,block_size() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用Crypto.Cipher.AES.block_size()

项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_encryption():
    return '''
import base64
from Crypto import Random
from Crypto.Cipher import AES

abbrev = '{2}'
{0} = base64.b64decode('{1}')

def encrypt(raw):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )

def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
'''.format(st_obf[0],aes_encoded,aes_abbrev)

################################################################################
#                       st_protocol.py stitch_gen variables                    #
################################################################################
项目:Netkeeper    作者:1941474711    | 项目源码 | 文件源码
def AESEnc(sour, key):
    from Crypto.Cipher import AES
    from Crypto import Random

    sour = sour.encode('utf8')
    key = key.encode('utf8')
    bs = AES.block_size
    pad = lambda s: s + (bs - len(s) % bs) * chr(bs - len(s) % bs)
    iv = Random.new().read(bs)
    cipher = AES.new(key, AES.MODE_ECB, iv)
    resData1 = cipher.encrypt(pad(sour))

    resData2 = resData1.encode('hex')
    resData3 = resData2.upper()
    print resData3
    return resData3
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def __init__(self):
        self.iv = Random.new().read(AES.block_size)
        self.key = Random.new().read(AES.block_size)
        self.cipher = AES.new(key=self.key, mode=AES.MODE_CBC, IV=self.iv)

        texts = []
        self.plaintexts = []

        # Reads all the lines from the data, as binary strings, then decodes
        # them from base64 and applies Pkcs7 padding.
        with open('data/7.txt', 'rb') as dataFile:
            texts = dataFile.readlines()

        for text in texts:
            strText = base64.b64decode(text)
            self.plaintexts.append(Pkcs7(strText))
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def RandomEncrypt(plaintext):
    """Returns a tuple with the encrypted text and the mode used."""
    # Random key.
    key = Random.new().read(AES.block_size)

    # Random padding both before and after the plaintext. The
    # size of the second padding cannot be random since the result
    # needs to have a number of bytes multiple of 16.
    paddingSize = random.randint(5, 10)
    prepend = Random.new().read(paddingSize)
    append = Random.new().read(AES.block_size - paddingSize)

    # Pick encryption mode at random.
    mode = None
    if random.randint(0, 1) == 0:
        mode = AES.MODE_ECB
    else:
        mode = AES.MODE_CBC

    # Perform the encryption.
    aes = aes_lib.AESCipher(key, mode=mode)
    text = prepend + plaintext + append
    return (aes.aes_encrypt(text), mode)
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def __init__(self, key=None, mode=AES.MODE_ECB, iv=None):
        """Initialize a AES cipher with the given mode, and key (as a byte string)

        Parameters:
            key: the key, as a byte string (e.g. b'YELLOW SUBMARINE'). If None,
                a random one will be generated.
            mode: AES mode. Default: AES.MODE_ECB.
            iv: IV. If None, a random one will be generated internally.
        """

        if iv is None:
            self._iv = self.GenerateRandomBytes(AES.block_size)
        else:
            self._iv = iv

        if key is None:
            self._key = self.GenerateRandomBytes(AES.block_size)
        else:
            self._key = key

        self.mode = mode
        self._cipher = AES.new(key=self._key, mode=self.mode, IV=self._iv)
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def SimulateCBCEncryption(self, plaintext):
        assert self.mode == AES.MODE_ECB

        prev_ct = self._iv
        block_index = 0
        ciphertext = b''

        # The loop simulates decryption through AES in CBC mode.
        # In such mode, the ciphertext is divided in blocks the size
        # of the key. Each block is decrypted, then the plaintext is XORed
        # with the previous ciphertext block. To initialize the algorithm,
        # a random IV (initialization vector) is used.
        while block_index < len(plaintext):
            block = plaintext[block_index : block_index + AES.block_size]
            final_block = self._ByteXOR(block, prev_ct)

            cipher_block = self.aes_encrypt(final_block)
            prev_ct = cipher_block
            ciphertext += cipher_block

            block_index += AES.block_size

        return ciphertext
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def encrypt(self, plaintext):
        """CBC encryption."""
        cipher = AES.new(key=self._key, mode=AES.MODE_ECB)

        # The full URL is not necessary for this setup, so I am just encrypting
        # the plaintext as it is. I don't even need to support padding.
        prev_ct = self._iv
        block_index = 0
        ciphertext = b''

        # The loop simulates encryption through AES in CBC mode.
        while block_index < len(plaintext):
            block = plaintext[block_index : block_index + AES.block_size]
            final_block = strxor(block, prev_ct)

            cipher_block = cipher.encrypt(final_block)
            prev_ct = cipher_block
            ciphertext += cipher_block

            block_index += AES.block_size

        return ciphertext
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def decrypt(self, ciphertext):
        """CBC decryption."""
        cipher = AES.new(key=self._key, mode=AES.MODE_ECB)

        prev_ct = self._iv
        block_index = 0
        plaintext = b''

        # The loop simulates decryption through AES in CBC mode.
        while block_index < len(ciphertext):
            block = ciphertext[block_index : block_index + AES.block_size]

            prep_plaintext = cipher.decrypt(block)
            plaintext += strxor(prev_ct, prep_plaintext)
            prev_ct = block

            block_index += AES.block_size

        # Here we should check if this is all readable ASCII, and raise an
        # exception if it's not. However that part is not really necessary,
        # and converting from Exception object to byte string (instead of a
        # usual string) does not look great so let's be lazy :)
        return plaintext
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def EditCTR(ciphertext, offset, newText, ctrObj):
    numBlocks = block_utils.GetNumBlocks(ciphertext)

    # Sanity checking.
    if offset < 0 or offset > numBlocks - 1:
        raise ValueError("Invalid offset.")

    if len(newText) != AES.block_size:
        raise ValueError("New plaintext must be 1 block in size")

    # Encrypt the new block of text using the value of the
    # counter for the 'offset' block of the ciphertext. The idea
    # is that newBlock will replace the block at position 'offset'
    # in the ciphertext, although here we do not perform the
    # actual substitution.
    newBlock = ctrObj.OneBlockCrypt(newText, offset)
    return newBlock

# This function is only here to recover the text as explained
# in the challenge. Of course here we need to know the key :)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def get_key(encoded_key):
    IV = None
    CIPHER = None
    if encoded_key is False:
        try:
            MYFILE = Tools.__path__[0]+os.sep+"admin"+os.sep+'secret.key'
            with open(MYFILE, 'r') as myfileHandle:
                encoded_key = myfileHandle.read()
        except IOError:
            print_error("Could not find the secret.key file in Tools/Admin!")
    try:
        IV = Random.new().read(AES.block_size)
        CIPHER = AES.new(base64.b64decode(encoded_key), AES.MODE_CFB, IV)
    except Exception as e:
        print_exception("Some problem occured: {0}".format(e))

    return IV, CIPHER
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def decrypt_file_aes(self, in_file, out_file, password, key_length=32):
        bs = AES.block_size
        salt = in_file.read(bs)[len('Salted__'):]
        key, iv = self.derive_key_and_iv(password, salt, key_length, bs)
        cipher = AES.new(key, AES.MODE_CBC, iv)
        next_chunk = ''
        finished = False
        while not finished:
            chunk, next_chunk = next_chunk, cipher.decrypt(in_file.read(1024 * bs))
            if len(next_chunk) == 0:
                padding_length = ord(chunk[-1])
                if padding_length < 1 or padding_length > bs:
                    raise ValueError('bad decrypt pad (%d)' % padding_length)
                # all the pad-bytes must be the same
                if chunk[-padding_length:] != (padding_length * chr(padding_length)):
                    # this is similar to the bad decrypt:evp_enc.c from openssl program
                    raise ValueError('bad decrypt')
                chunk = chunk[:-padding_length]
                finished = True
            out_file.write(chunk)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:py-http-test-framework    作者:iyaozhen    | 项目源码 | 文件源码
def test_encrypter(self):
        """
        test_encrypter
        :return:
        """
        key = '0123456701234567'
        iv = Random.new().read(AES.block_size)
        s = '?? 10:00 AM'
        # ECB
        encrypter = tools.Encrypter(key)
        self.assertEqual(encrypter.decrypt(encrypter.encrypt(s)), s)
        # CBC
        encrypter = tools.Encrypter(key, AES.MODE_CBC, iv)
        self.assertEqual(encrypter.decrypt(encrypter.encrypt(s)), s)
        # CFB
        encrypter = tools.Encrypter(key, AES.MODE_CFB, iv)
        self.assertEqual(encrypter.decrypt(encrypter.encrypt(s)), s)
项目:git_intgrtn_aws_s3    作者:droidlabour    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:sdos-core    作者:sdos    | 项目源码 | 文件源码
def __init__(self, key = "", outerHeader = b""):
        """
        Constructor
        """
        self.outerHeader = outerHeader
        self.innerHeader = outerHeader
        self.log = logging.getLogger(__name__)
        self.key = key
        self.blockSize = AES.block_size  # 16 bytes

    # self._padBytesIO = lambda s: s + (self.blockSize - len(s) % self.blockSize) * chr(self.blockSize - len(s) % self.blockSize)
    # self._unpadBytesIO = lambda s : s[:-ord(s[len(s)-1:])]




    ###############################################################################
    ###############################################################################
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:nc-backup-py    作者:ChinaNetCloud    | 项目源码 | 文件源码
def decrypt(self, in_file, out_file, password, key_length=32, home_folder=''):
        bs = AES.block_size
        salt = in_file.read(bs)[len('Salted__'):]
        key, iv = self.__derive_key_and_iv(password, salt, key_length, bs)
        cipher = AES.new(key, AES.MODE_CBC, iv)
        next_chunk = ''
        finished = False
        print out_file
        # print in_file
        while not finished:
            chunk, next_chunk = next_chunk, cipher.decrypt(in_file.read(1024 * bs))
            if len(next_chunk) == 0:
                padding_length = ord(chunk[-1])
                if padding_length < 1 or padding_length > bs:
                   raise ValueError("bad decrypt pad (%d)" % padding_length)
                # all the pad-bytes must be the same
                if chunk[-padding_length:] != (padding_length * chr(padding_length)):
                   # this is similar to the bad decrypt:evp_enc.c from openssl program
                   raise ValueError("bad decrypt")
                chunk = chunk[:-padding_length]
                finished = True
            out_file.write(chunk)
        return out_file.name
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def encrypt_file_aes(self, in_file, out_file, password, key_length=32):
        bs = AES.block_size
        salt = Random.new().read(bs - len('Salted__'))
        key, iv = self.derive_key_and_iv(password, salt, key_length, bs)
        cipher = AES.new(key, AES.MODE_CBC, iv)
        out_file.write('Salted__' + salt)
        finished = False

        while not finished:
            chunk = in_file.read(1024 * bs)

            if len(chunk) == 0 or len(chunk) % bs != 0:
                padding_length = bs - (len(chunk) % bs)
                chunk += padding_length * chr(padding_length)
                finished = True

            out_file.write(cipher.encrypt(chunk))
项目:PyMal    作者:cysinfo    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:SublimeRemoteGDB    作者:summerwinter    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:fusecry    作者:nul-one    | 项目源码 | 文件源码
def enc(self, chunk):
        """Encrypt a chunk of bytes and returned encrypted chunk.

        Initialization vector is randomly generated for each chunk.

        Args:
            chunk (bytes): Plain text data to be encrypted.

        Returns:
            bytes: Encrypted chunk.

            Encrypted chunk is bytes object consisting of TAG and cipher text.
            Cipher text is IV + encrypted chunk. TAG is HMAC of cipher text
            using hashed_aes_key as key.
        """
        if not chunk:
            return bytes(0)
        checksum = HMAC.new(self.hashed_aes_key, digestmod=self.hash_func)
        chunk += bytes((AES.block_size - len(chunk)) % AES.block_size)
        enc_data = self._aes_enc(chunk)
        checksum.update(enc_data)
        return checksum.digest() + enc_data
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:Encryped-file-system    作者:kittenish    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:Encryped-file-system    作者:kittenish    | 项目源码 | 文件源码
def encrypt(key, text, isfile=False):

    ciphertexts = ''
    print_text = str(text[0:min(50, len(text))]) + '...'
    print "Begin to encrypt:", print_text

    if not isfile:
        for start in xrange(0, len(text), chunk_size(key)):
            end = start + chunk_size(key)
            chunk = text[start:end]
            ciphertext = key.encrypt(chunk, K=0)[0]
            ciphertexts = ciphertexts + base64.b64encode(ciphertext)
            # len(base64.b64encode(ciphertext)) = 344
    else:
        iv = Random.new().read(AES.block_size)
        cipher = AES.new(key, AES.MODE_CFB, iv)
        ciphertexts = iv + cipher.encrypt(text)

    return ciphertexts
项目:internet-content-detection    作者:liubo0621    | 项目源码 | 文件源码
def encrypt(self, text):
        #????key ?????16?AES-128??24?AES-192???32?AES-256?Bytes ??.??AES-128???
        cryptor = AES.new(self.key, self.mode, self.key)

        #?????????? ?????? ????base64????????????????????AES???????base64????
        text = text.encode('utf8')
        text = base64.b64encode(text)
        text = text.decode('utf8')
        #???????text??16????????text???16???????????16???
        text = text + (AES.block_size - (len(text) % AES.block_size)) * self.append
        self.ciphertext = cryptor.encrypt(text)
        #??AES??????????????ascii??????????????????????
        #?????????????????16?????
        return b2a_hex(self.ciphertext).decode('utf8')

    #????????????strip() ??
项目:CryptoAttacks    作者:GrosQuildu    | 项目源码 | 文件源码
def encryption_oracle_aes(payload):
    global constant, prefix_len, suffix_len, secret
    if secret:
        if constant:
            payload = random_str(prefix_len) + payload + secret
        else:
            payload = random_str(random.randint(1, 50)) + payload + secret
    else:
        if constant:
            payload = random_str(prefix_len) + payload + random_str(suffix_len)
        else:
            payload = random_str(random.randint(1, 50)) + payload + random_str(random.randint(1, 50))

    payload = add_padding(payload, AES.block_size)
    cipher = AES.new(key_AES, AES.MODE_ECB)
    return cipher.encrypt(payload)
项目:CryptoAttacks    作者:GrosQuildu    | 项目源码 | 文件源码
def encryption_oracle_des(payload):
    global constant, prefix_len, suffix_len, secret
    if secret:
        if constant:
            payload = random_str(prefix_len) + payload + secret
        else:
            payload = random_str(random.randint(1, 50)) + payload + secret
    else:
        if constant:
            payload = random_str(prefix_len) + payload + random_str(suffix_len)
        else:
            payload = random_str(random.randint(1, 50)) + payload + random_str(random.randint(1, 50))

    payload = add_padding(payload, DES3.block_size)
    cipher = DES3.new(key_DES3, DES3.MODE_ECB)
    return cipher.encrypt(payload)
项目:CryptoAttacks    作者:GrosQuildu    | 项目源码 | 文件源码
def test_fake_ciphertext_padding_oracle(amount=5):
    for _ in range(amount):
        new_plaintext = random_str(randint(1, 10))
        new_plaintext_padded = add_padding(new_plaintext, block_size)

        print("Test small: cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)")
        new_ciphertext = cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)
        decrypted = h2b(subprocess.check_output(
            ['python', './cbc_oracles.py', 'decrypt', b2h(new_ciphertext)]).strip())
        assert decrypted == new_plaintext

    for _ in range(amount):
        new_plaintext = random_str(randint(10, 50))
        new_plaintext_padded = add_padding(new_plaintext, block_size)

        print("Test large: cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)")
        new_ciphertext = cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)
        decrypted = h2b(subprocess.check_output(
            ['python', './cbc_oracles.py', 'decrypt', b2h(new_ciphertext)]).strip())
        assert decrypted == new_plaintext
项目:CryptoAttacks    作者:GrosQuildu    | 项目源码 | 文件源码
def test_fake_ciphertext_decryption_oracle(amount=5):
    for _ in range(amount):
        new_plaintext = random_str(randint(1, 10))
        new_plaintext_padded = add_padding(new_plaintext, block_size)

        print("Test small: cbc.fake_ciphertext(new_plaintext_padded, decryption_oracle=decryption_oracle)")
        new_ciphertext = cbc.fake_ciphertext(new_plaintext_padded, decryption_oracle=decryption_oracle)
        decrypted = h2b(subprocess.check_output(
            ['python', './cbc_oracles.py', 'decrypt', b2h(new_ciphertext)]).strip())
        assert decrypted == new_plaintext

    for _ in range(amount):
        new_plaintext = random_str(randint(10, 50))
        new_plaintext_padded = add_padding(new_plaintext, block_size)

        print("Test large: cbc.fake_ciphertext(new_plaintext_padded, decryption_oracle=decryption_oracle)")
        new_ciphertext = cbc.fake_ciphertext(new_plaintext_padded, decryption_oracle=decryption_oracle, padding_oracle=padding_oracle)
        decrypted = h2b(subprocess.check_output(
            ['python', './cbc_oracles.py', 'decrypt', b2h(new_ciphertext)]).strip())
        assert decrypted == new_plaintext
项目:CryptoAttacks    作者:GrosQuildu    | 项目源码 | 文件源码
def test_iv_as_key(from_test=1):
    global iv_as_key
    iv_as_key = True
    plaintext = random_str(randint(1, 40))
    plaintext_padded = add_padding(plaintext, block_size)
    ciphertext = encrypt(plaintext, iv_as_key=iv_as_key)

    if from_test <= 1:
        print("Test 1: cbc.iv_as_key()")
        key = cbc.iv_as_key(ciphertext[AES.block_size:], plaintext_padded, padding_oracle=padding_oracle)
        assert key == KEY

    if from_test <= 2:
        print("Test 2: cbc.iv_as_key()")
        key = cbc.iv_as_key(ciphertext[AES.block_size:], plaintext_padded, decryption_oracle=decryption_oracle)
        assert key == KEY

    iv_as_key = False
项目:isf    作者:w3h    | 项目源码 | 文件源码
def _pseudo_random_data(self, bytes):
        if not (0 <= bytes <= self.max_bytes_per_request):
            raise AssertionError("You cannot ask for more than 1 MiB of data per request")

        num_blocks = ceil_shift(bytes, self.block_size_shift)   # num_blocks = ceil(bytes / self.block_size)

        # Compute the output
        retval = self._generate_blocks(num_blocks)[:bytes]

        # Switch to a new key to avoid later compromises of this output (i.e.
        # state compromise extension attacks)
        self._set_key(self._generate_blocks(self.blocks_per_key))

        assert len(retval) == bytes
        assert len(self.key) == self.key_size

        return retval
项目:cryptopal    作者:lanjelot    | 项目源码 | 文件源码
def encrypt(self, plaintext, block_size, iv=None):
    '''Encryption cannot be multithreaded'''

    plaintext = pkcs7pad(plaintext, block_size)

    if iv is None:
      iv = '\x00' * block_size

    encrypted = iv
    blocks = chunk(plaintext, block_size)
    self.resultq = Queue()

    block = iv
    for prev in blocks[::-1]:

      self.bust(block, block_size)

      _, inter = self.pop_result()
      block = xor(inter, prev)

      encrypted = block + encrypted

    return encrypted
项目:cryptopal    作者:lanjelot    | 项目源码 | 文件源码
def test_find_blocksize(self):

    def encryption_oracle(s):
      pt = pkcs7pad(s, AES.block_size)
      if mode == 'ECB':
        return encrypt_ecb(pt, key)
      elif mode == 'CBC':
        return encrypt_cbc(pt, key, iv)

    for mode in ['ECB', 'CBC']:
      for key_size in AES.key_size:
        for _ in xrange(100):
          key = random_bytes(key_size)
          iv = random_bytes(AES.block_size)

          self.assertTrue(AES.block_size == find_blocksize(encryption_oracle))
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def encrypt(raw, aes_key=secret):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new(aes_key, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def encrypt(raw):
    raw = pad(raw)
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(KEY, AES.MODE_CBC, iv)
    return base64.b64encode( iv + cipher.encrypt(raw) )
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def encrypt(raw):
    raw = pad(raw)
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(KEY, AES.MODE_CBC, iv)
    return base64.b64encode( iv + cipher.encrypt(raw) )
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def fetch_encyption_key(file_name):
    with open(file_name, 'r') as f:
        return f.read(AES.block_size)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def get_initialization_vector(schema_id, initialization_vector_array=None):
    if initialization_vector_array is None:
        initialization_vector_array = os.urandom(AES.block_size)
    _verify_initialization_vector_params(initialization_vector_array)
    return MetaAttribute(
        schema_id=schema_id,
        payload_data=initialization_vector_array
    )
项目:watermark    作者:lishuaijuly    | 项目源码 | 文件源码
def encrypt(self, raw):
        iv = Random.new().read(AES.block_size)
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return iv + cipher.encrypt(self._pad(raw))
项目:watermark    作者:lishuaijuly    | 项目源码 | 文件源码
def decrypt(self, enc):
        iv = enc[:AES.block_size]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return self._unpad(cipher.decrypt(enc[AES.block_size:]))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self):
        self.counter = Counter.new(nbits=self.block_size*8, initial_value=0, little_endian=True)
        self.key = None

        # Set some helper constants
        self.block_size_shift = exact_log2(self.block_size)
        assert (1 << self.block_size_shift) == self.block_size

        self.blocks_per_key = exact_div(self.key_size, self.block_size)
        assert self.key_size == self.blocks_per_key * self.block_size

        self.max_bytes_per_request = self.max_blocks_per_request * self.block_size
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _generate_blocks(self, num_blocks):
        if self.key is None:
            raise AssertionError("generator must be seeded before use")
        assert 0 <= num_blocks <= self.max_blocks_per_request
        retval = []
        for i in xrange(num_blocks >> 12):      # xrange(num_blocks / 4096)
            retval.append(self._cipher.encrypt(self._four_kiblocks_of_zeros))
        remaining_bytes = (num_blocks & 4095) << self.block_size_shift  # (num_blocks % 4095) * self.block_size
        retval.append(self._cipher.encrypt(self._four_kiblocks_of_zeros[:remaining_bytes]))
        return b("").join(retval)

# vim:set ts=4 sw=4 sts=4 expandtab:
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _encrypt(passphrase, data):
    from Crypto.Cipher import AES as cipher
    leftover = len(data) % cipher.block_size
    if leftover:
        data += ' '*(cipher.block_size - leftover)
    return cipher.new(md5.new(passphrase).digest()[:16]).encrypt(data)
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def EncryptWithFixedNonce(plaintextes):
    """Challenge setup: encrypt all the plaintextes separately with
    a fixed nonce (0) and fixed random key."""
    result = []
    key = Random.new().read(AES.block_size)

    for pt in plaintextes:
        ct = ctr.DoCTR(base64.b64decode(pt), key, 0)
        result.append(ct)

    return result
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def Pkcs7(string):
    """Applies Pkcs7 padding to a byte string."""
    numPadding = 0

    if len(string) % AES.block_size != 0:
        numPadding = AES.block_size - (len(string) % AES.block_size)

    paddedString = string

    for i in range(0, numPadding):
        paddedString += bytes([numPadding])

    return paddedString
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def VerifyPkcs7(string):
    """Returns True or False depending on whether the byte string 'string'
    has valid Pkcs7 padding applied."""
    paddingFound = False

    # The string was not padded properly, since it is not of the
    # proper size for AES.
    if len(string) % AES.block_size != 0:
        raise Exception('String has not been padded properly.')

    index = len(string) - 1

    lastCh = string[index]
    # The padding bytes cannot be larger than the block size
    # by definition.
    if lastCh > AES.block_size:
        return False

    num_padding = 1
    while True:
        index -= 1
        if string[index] != lastCh:
            break

        num_padding += 1

    return num_padding == lastCh
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def Block(text, numBlock):
    """
    Convenience function to return the block with index numBlock in the text.
    Blocks are of AES.block_size size. Indices start from 1, not 0.
    """
    block = text[(numBlock - 1) * AES.block_size : numBlock * AES.block_size]
    return block
项目:matasano    作者:shainer    | 项目源码 | 文件源码
def DoCTR(text, key, nonce):
    """The algorithm is the same for both encryption and decryption."""
    # Uses ECB inside for a single block.
    cipher = AES.new(key=key, mode=AES.MODE_ECB)
    resultingText = b''

    keystream = b''
    # There is no padding in CTR mode, so both a plaintext and a ciphertext
    # can be of non-exact block size. We decrypt the last block by generating
    # the keystream and only using the bytes we need.
    numBlocks = int(len(text) / AES.block_size)
    counter = 0

    # +1 to get the final bytes that do not fit in a block.
    for blockIndex in range(0, numBlocks + 1):
        # If at the end, simply get all the remaining bytes; there
        # will be exactly AES.block_size or less of them. Otherwise,
        # get a block.
        if blockIndex == numBlocks:
            block = text[blockIndex * AES.block_size :]
        else:
            block = text[blockIndex * AES.block_size : (blockIndex + 1) * AES.block_size]

        # The keystream generator is composed by 8 bytes of the nonce +
        # 8 bytes of counter, little-endian.
        keystreamGen = bytes([nonce]) * 8
        # This is a quick hack that only works if there are not enough
        # blocks of text to need the second digit of the counter too.
        # Since that is a lot of blocks we can be a bit lazy.
        keystreamGen += bytes([counter, 0, 0, 0, 0, 0, 0, 0])

        # Careful, we always encrypt the keystream even when decrypting.
        keystream = cipher.encrypt(keystreamGen)

        # The PT is simply the CT XORed with the keystream, and vice versa.
        # Since the PT only depends on the same block of CT plus a predictable
        # keystream, CTR is very suited for parallel decryption of many blocks.
        resultingText += BlockXOR(block, keystream)
        counter += 1

    return resultingText