我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 Crypto.Cipher.AES.block_size（这是一个常量属性，而不是可调用的函数）。
def get_encryption():
    """Return Python source text for a pair of AES-CFB encrypt/decrypt helpers.

    The template is filled with str.format():
      {0} <- st_obf[0]   (used as the name of the key variable)
      {1} <- aes_encoded (text handed to base64.b64decode for the key)
      {2} <- aes_abbrev  (value assigned to ``abbrev``)

    NOTE(review): the template below contains no line breaks, so the text it
    produces is not valid Python as written; the original line breaks were
    presumably lost upstream -- confirm against the real source.
    """
    return ''' import base64 from Crypto import Random from Crypto.Cipher import AES abbrev = '{2}' {0} = base64.b64decode('{1}') def encrypt(raw): iv = Random.new().read( AES.block_size ) cipher = AES.new({0}, AES.MODE_CFB, iv ) return (base64.b64encode( iv + cipher.encrypt( raw ) ) ) def decrypt(enc): enc = base64.b64decode(enc) iv = enc[:16] cipher = AES.new({0}, AES.MODE_CFB, iv ) return cipher.decrypt( enc[16:] ) '''.format(st_obf[0],aes_encoded,aes_abbrev)
################################################################################
# st_protocol.py stitch_gen variables #
################################################################################
def AESEnc(sour, key):
    """Encrypt text with AES in ECB mode and return upper-case hex.

    Args:
        sour: Text to encrypt; it is UTF-8 encoded before padding.
        key: AES key as text; it is UTF-8 encoded and must then be a valid
            AES key length (16, 24 or 32 bytes).

    Returns:
        The ciphertext as an upper-case hexadecimal string. The same string
        is also printed, as the original implementation did.
    """
    import binascii

    from Crypto.Cipher import AES

    sour = sour.encode('utf8')
    key = key.encode('utf8')
    bs = AES.block_size

    # Always append 1..bs bytes, each holding the pad length. Building the
    # pad through bytearray works on both Python 2 and Python 3.
    pad_len = bs - len(sour) % bs
    padded = sour + bytes(bytearray([pad_len])) * pad_len

    # ECB has no IV. Passing one is ignored by PyCrypto and rejected with
    # TypeError by PyCryptodome, so none is generated or supplied.
    cipher = AES.new(key, AES.MODE_ECB)

    hex_text = binascii.hexlify(cipher.encrypt(padded)).upper()
    if not isinstance(hex_text, str):
        # Python 3: hexlify returns bytes.
        hex_text = hex_text.decode('ascii')
    print(hex_text)
    return hex_text
def _pseudo_random_data(self, bytes):
    """Return `bytes` bytes of generator output, then switch to a new key.

    Args:
        bytes: Number of bytes wanted; must lie in
            0..self.max_bytes_per_request.

    Returns:
        Exactly `bytes` bytes taken from the generated blocks.

    Raises:
        AssertionError: If the request is outside the allowed range.
    """
    if bytes < 0 or bytes > self.max_bytes_per_request:
        raise AssertionError("You cannot ask for more than 1 MiB of data per request")

    # Whole blocks needed: ceil(bytes / self.block_size).
    block_count = ceil_shift(bytes, self.block_size_shift)
    output = self._generate_blocks(block_count)[:bytes]

    # Re-key immediately so that a later state compromise cannot be used to
    # reconstruct this output (state compromise extension attacks).
    fresh_key = self._generate_blocks(self.blocks_per_key)
    self._set_key(fresh_key)

    assert len(output) == bytes
    assert len(self.key) == self.key_size
    return output
def __init__(self):
    """Create a random-key AES-CBC cipher and load the padded plaintexts.

    Reads every line of 'data/7.txt' in binary mode, base64-decodes it and
    stores the Pkcs7-padded result in self.plaintexts.
    """
    self.iv = Random.new().read(AES.block_size)
    self.key = Random.new().read(AES.block_size)
    self.cipher = AES.new(key=self.key, mode=AES.MODE_CBC, IV=self.iv)

    self.plaintexts = []

    with open('data/7.txt', 'rb') as data_file:
        encoded_lines = data_file.readlines()

    # Decode each base64 line and pad it before keeping it.
    self.plaintexts.extend(
        Pkcs7(base64.b64decode(line)) for line in encoded_lines
    )
def RandomEncrypt(plaintext):
    """Encrypt `plaintext` under a random key and a randomly chosen mode.

    Random bytes are added before and after the plaintext. The prefix is
    5..10 bytes long and the suffix takes the remainder of one block, so the
    two together always add exactly AES.block_size bytes.

    Returns:
        A tuple (ciphertext, mode) where mode is AES.MODE_ECB or
        AES.MODE_CBC.
    """
    key = Random.new().read(AES.block_size)

    prefix_len = random.randint(5, 10)
    prefix = Random.new().read(prefix_len)
    suffix = Random.new().read(AES.block_size - prefix_len)

    # Coin flip between the two modes.
    mode = AES.MODE_ECB if random.randint(0, 1) == 0 else AES.MODE_CBC

    aes = aes_lib.AESCipher(key, mode=mode)
    framed = prefix + plaintext + suffix
    return (aes.aes_encrypt(framed), mode)
def __init__(self, key=None, mode=AES.MODE_ECB, iv=None):
    """Initialize an AES cipher with the given mode and key.

    Parameters:
        key: the key, as a byte string (e.g. b'YELLOW SUBMARINE').
            If None, a random one is generated.
        mode: AES mode. Default: AES.MODE_ECB.
        iv: IV. If None, a random one is generated internally.
    """
    self._iv = self.GenerateRandomBytes(AES.block_size) if iv is None else iv
    self._key = self.GenerateRandomBytes(AES.block_size) if key is None else key
    self.mode = mode

    if mode == AES.MODE_ECB:
        # ECB takes no IV: PyCrypto ignores one, PyCryptodome raises
        # TypeError. self._iv is still stored for code that reads it.
        self._cipher = AES.new(key=self._key, mode=self.mode)
    else:
        self._cipher = AES.new(key=self._key, mode=self.mode, IV=self._iv)
def SimulateCBCEncryption(self, plaintext):
    """Encrypt `plaintext` CBC-style using the object's ECB primitive.

    Each AES.block_size slice of the plaintext is XORed with the previous
    ciphertext block (self._iv for the first one) and then encrypted with
    self.aes_encrypt. The object must have been created in ECB mode.

    Returns:
        The concatenated ciphertext blocks.
    """
    assert self.mode == AES.MODE_ECB

    chained = self._iv
    out_blocks = []
    for start in range(0, len(plaintext), AES.block_size):
        mixed = self._ByteXOR(plaintext[start:start + AES.block_size], chained)
        # The fresh ciphertext block becomes the chaining value for the next.
        chained = self.aes_encrypt(mixed)
        out_blocks.append(chained)
    return b''.join(out_blocks)
def encrypt(self, plaintext):
    """CBC encryption built from single-block ECB operations.

    The plaintext is encrypted as given; no padding is applied, so its
    length is expected to be a multiple of AES.block_size.
    """
    cipher = AES.new(key=self._key, mode=AES.MODE_ECB)

    chained = self._iv
    out_blocks = []
    for start in range(0, len(plaintext), AES.block_size):
        # XOR with the previous ciphertext block (the IV first), then encrypt.
        mixed = strxor(plaintext[start:start + AES.block_size], chained)
        chained = cipher.encrypt(mixed)
        out_blocks.append(chained)
    return b''.join(out_blocks)
def decrypt(self, ciphertext):
    """CBC decryption built from single-block ECB operations.

    No check is made that the recovered plaintext is readable ASCII; the
    original author deliberately left that validation out.
    """
    cipher = AES.new(key=self._key, mode=AES.MODE_ECB)

    previous = self._iv
    pieces = []
    for start in range(0, len(ciphertext), AES.block_size):
        current = ciphertext[start:start + AES.block_size]
        # Decrypt the block, then undo the XOR with the preceding ciphertext.
        pieces.append(strxor(previous, cipher.decrypt(current)))
        previous = current
    return b''.join(pieces)
def EditCTR(ciphertext, offset, newText, ctrObj):
    """Return `newText` encrypted for block position `offset` of `ciphertext`.

    The returned block is what would replace block `offset` in the
    ciphertext; the substitution itself is not performed here.

    Raises:
        ValueError: If `offset` is not a valid block index, or `newText` is
            not exactly one block long.
    """
    block_count = block_utils.GetNumBlocks(ciphertext)

    if offset < 0 or offset > block_count - 1:
        raise ValueError("Invalid offset.")
    if len(newText) != AES.block_size:
        raise ValueError("New plaintext must be 1 block in size")

    # Encrypt with the counter value belonging to block `offset`.
    return ctrObj.OneBlockCrypt(newText, offset)
# This function is only here to recover the text as explained
# in the challenge. Of course here we need to know the key :)
def get_key(encoded_key):
    """Build an AES-CFB cipher from a base64-encoded key.

    Args:
        encoded_key: Base64 text of the key, or False to read it from the
            'secret.key' file under the Tools package's 'admin' directory.

    Returns:
        A tuple (IV, CIPHER). Either element is None if it could not be
        created; failures are reported through print_error/print_exception.
    """
    if encoded_key is False:
        try:
            key_path = os.sep.join([Tools.__path__[0], "admin", 'secret.key'])
            with open(key_path, 'r') as key_file:
                encoded_key = key_file.read()
        except IOError:
            print_error("Could not find the secret.key file in Tools/Admin!")

    iv = None
    cipher = None
    try:
        iv = Random.new().read(AES.block_size)
        cipher = AES.new(base64.b64decode(encoded_key), AES.MODE_CFB, iv)
    except Exception as e:
        print_exception("Some problem occured: {0}".format(e))
    return iv, cipher
def decrypt_file_aes(self, in_file, out_file, password, key_length=32):
    """Decrypt a salted AES-CBC stream from `in_file` into `out_file`.

    The input starts with one block holding 'Salted__' followed by the salt
    (the header layout written by `openssl enc`). The plaintext padding is
    validated and stripped from the last chunk.

    Args:
        in_file: Binary file object positioned at the start of the data.
        out_file: Binary file object receiving the plaintext.
        password: Passed to self.derive_key_and_iv.
        key_length: Key size in bytes (default 32).

    Raises:
        ValueError: If there is no ciphertext or the padding is invalid.
    """
    bs = AES.block_size
    salt = in_file.read(bs)[len('Salted__'):]
    key, iv = self.derive_key_and_iv(password, salt, key_length, bs)
    cipher = AES.new(key, AES.MODE_CBC, iv)

    # Stay one chunk behind the reader so the final chunk can be unpadded.
    next_chunk = b''
    finished = False
    while not finished:
        chunk, next_chunk = next_chunk, cipher.decrypt(in_file.read(1024 * bs))
        if len(next_chunk) == 0:
            if not chunk:
                # No ciphertext at all: report it as a failed decrypt.
                raise ValueError('bad decrypt')
            # Slice, not index, so this works for str (py2) and bytes (py3).
            padding_length = ord(chunk[-1:])
            if padding_length < 1 or padding_length > bs:
                raise ValueError('bad decrypt pad (%d)' % padding_length)
            # All the pad bytes must be the same.
            expected_pad = bytes(bytearray([padding_length])) * padding_length
            if chunk[-padding_length:] != expected_pad:
                # Similar to the "bad decrypt" error in openssl's evp_enc.c.
                raise ValueError('bad decrypt')
            chunk = chunk[:-padding_length]
            finished = True
        out_file.write(chunk)
def test_encrypter(self):
    """Check that tools.Encrypter round-trips a string in each mode."""
    key = '0123456701234567'
    iv = Random.new().read(AES.block_size)
    s = '?? 10:00 AM'

    constructor_args = (
        (key,),                   # default mode (labelled ECB originally)
        (key, AES.MODE_CBC, iv),  # CBC
        (key, AES.MODE_CFB, iv),  # CFB
    )
    for args in constructor_args:
        encrypter = tools.Encrypter(*args)
        self.assertEqual(encrypter.decrypt(encrypter.encrypt(s)), s)
def __init__(self, key = "", outerHeader = b""):
    """Store the key and header and create the module logger.

    Args:
        key: Key material kept on the instance as self.key.
        outerHeader: Header value stored as self.outerHeader.
    """
    self.log = logging.getLogger(__name__)
    self.blockSize = AES.block_size  # 16 bytes
    self.key = key
    self.outerHeader = outerHeader
    # NOTE(review): innerHeader starts as the same value as outerHeader;
    # confirm that this is intentional.
    self.innerHeader = outerHeader
    # self._padBytesIO = lambda s: s + (self.blockSize - len(s) % self.blockSize) * chr(self.blockSize - len(s) % self.blockSize)
    # self._unpadBytesIO = lambda s : s[:-ord(s[len(s)-1:])]
###############################################################################
###############################################################################
def decrypt(self, in_file, out_file, password, key_length=32, home_folder=''):
    """Decrypt a salted AES-CBC stream from `in_file` into `out_file`.

    The input starts with one block holding 'Salted__' followed by the salt
    (the header layout written by `openssl enc`). The plaintext padding is
    validated and stripped from the last chunk.

    Args:
        in_file: Binary file object positioned at the start of the data.
        out_file: Binary file object receiving the plaintext.
        password: Passed to the key derivation helper.
        key_length: Key size in bytes (default 32).
        home_folder: Not used by this method.

    Returns:
        out_file.name

    Raises:
        ValueError: If there is no ciphertext or the padding is invalid.
    """
    bs = AES.block_size
    salt = in_file.read(bs)[len('Salted__'):]
    key, iv = self.__derive_key_and_iv(password, salt, key_length, bs)
    cipher = AES.new(key, AES.MODE_CBC, iv)

    # Parenthesised single argument prints identically on Python 2 and 3.
    print(out_file)

    # Stay one chunk behind the reader so the final chunk can be unpadded.
    next_chunk = b''
    finished = False
    while not finished:
        chunk, next_chunk = next_chunk, cipher.decrypt(in_file.read(1024 * bs))
        if len(next_chunk) == 0:
            if not chunk:
                # No ciphertext at all: report it as a failed decrypt.
                raise ValueError("bad decrypt")
            # Slice, not index, so this works for str (py2) and bytes (py3).
            padding_length = ord(chunk[-1:])
            if padding_length < 1 or padding_length > bs:
                raise ValueError("bad decrypt pad (%d)" % padding_length)
            # All the pad bytes must be the same.
            expected_pad = bytes(bytearray([padding_length])) * padding_length
            if chunk[-padding_length:] != expected_pad:
                # Similar to the "bad decrypt" error in openssl's evp_enc.c.
                raise ValueError("bad decrypt")
            chunk = chunk[:-padding_length]
            finished = True
        out_file.write(chunk)
    return out_file.name
def encrypt_file_aes(self, in_file, out_file, password, key_length=32):
    """Encrypt `in_file` into `out_file` with AES-CBC and a salted header.

    The output starts with b'Salted__' followed by a random salt that fills
    the rest of one block. The final chunk is padded with 1..block_size
    bytes that each hold the pad length.

    Args:
        in_file: Binary file object to read plaintext from.
        out_file: Binary file object receiving header and ciphertext.
        password: Passed to self.derive_key_and_iv.
        key_length: Key size in bytes (default 32).
    """
    bs = AES.block_size
    salt = Random.new().read(bs - len('Salted__'))
    key, iv = self.derive_key_and_iv(password, salt, key_length, bs)
    cipher = AES.new(key, AES.MODE_CBC, iv)

    # Bytes literal: the salt is bytes, so a text prefix fails on Python 3.
    out_file.write(b'Salted__' + salt)

    finished = False
    while not finished:
        chunk = in_file.read(1024 * bs)
        if len(chunk) == 0 or len(chunk) % bs != 0:
            # Last chunk: pad up to the block boundary. An empty or aligned
            # tail gets a whole block of padding.
            padding_length = bs - (len(chunk) % bs)
            chunk += bytes(bytearray([padding_length])) * padding_length
            finished = True
        out_file.write(cipher.encrypt(chunk))
def enc(self, chunk):
    """Encrypt a chunk of bytes and return it prefixed with its HMAC tag.

    Args:
        chunk (bytes): Plain text data to be encrypted.

    Returns:
        bytes: TAG followed by the cipher text, or an empty bytes object
        for an empty chunk. TAG is the HMAC of the cipher text keyed with
        hashed_aes_key. Per the original documentation the cipher text is
        IV + encrypted chunk, with a fresh random IV per chunk; that part
        is produced by self._aes_enc.
    """
    if not chunk:
        return bytes(0)

    mac = HMAC.new(self.hashed_aes_key, digestmod=self.hash_func)

    # Zero-fill up to the next block boundary (nothing if already aligned).
    zero_fill = (AES.block_size - len(chunk)) % AES.block_size
    chunk += bytes(zero_fill)

    ciphertext = self._aes_enc(chunk)
    mac.update(ciphertext)
    return mac.digest() + ciphertext
def encrypt(key, text, isfile=False): ciphertexts = '' print_text = str(text[0:min(50, len(text))]) + '...' print "Begin to encrypt:", print_text if not isfile: for start in xrange(0, len(text), chunk_size(key)): end = start + chunk_size(key) chunk = text[start:end] ciphertext = key.encrypt(chunk, K=0)[0] ciphertexts = ciphertexts + base64.b64encode(ciphertext) # len(base64.b64encode(ciphertext)) = 344 else: iv = Random.new().read(AES.block_size) cipher = AES.new(key, AES.MODE_CFB, iv) ciphertexts = iv + cipher.encrypt(text) return ciphertexts
def encrypt(self, text):
    """Encrypt `text` and return the ciphertext as a hex string.

    The text is UTF-8 encoded, base64 encoded, padded with self.append up
    to a multiple of AES.block_size and then encrypted. The raw ciphertext
    is also kept in self.ciphertext.

    NOTE(review): self.key is passed as the third argument of AES.new, i.e.
    the key doubles as the IV -- confirm that this is intended.
    """
    cryptor = AES.new(self.key, self.mode, self.key)

    # base64 keeps the payload ASCII-only before padding.
    encoded = base64.b64encode(text.encode('utf8')).decode('utf8')

    # A full block of filler is added when the length is already aligned.
    fill = AES.block_size - (len(encoded) % AES.block_size)
    padded = encoded + fill * self.append

    self.ciphertext = cryptor.encrypt(padded)

    # Hex makes the binary ciphertext safe to print or store as text.
    return b2a_hex(self.ciphertext).decode('utf8')
def encryption_oracle_aes(payload):
    """Return AES-ECB(key_AES, prefix + payload + suffix), padded.

    The prefix is a random string of prefix_len characters when `constant`
    is set, otherwise of a random length in 1..50. The suffix is `secret`
    when that is set; otherwise it is a random string sized like the
    prefix rule (suffix_len when `constant`).
    """
    if constant:
        prefix = random_str(prefix_len)
    else:
        prefix = random_str(random.randint(1, 50))

    if secret:
        suffix = secret
    elif constant:
        suffix = random_str(suffix_len)
    else:
        suffix = random_str(random.randint(1, 50))

    padded = add_padding(prefix + payload + suffix, AES.block_size)
    cipher = AES.new(key_AES, AES.MODE_ECB)
    return cipher.encrypt(padded)
def encryption_oracle_des(payload):
    """Return DES3-ECB(key_DES3, prefix + payload + suffix), padded.

    The prefix is a random string of prefix_len characters when `constant`
    is set, otherwise of a random length in 1..50. The suffix is `secret`
    when that is set; otherwise it is a random string sized like the
    prefix rule (suffix_len when `constant`).
    """
    if constant:
        prefix = random_str(prefix_len)
    else:
        prefix = random_str(random.randint(1, 50))

    if secret:
        suffix = secret
    elif constant:
        suffix = random_str(suffix_len)
    else:
        suffix = random_str(random.randint(1, 50))

    padded = add_padding(prefix + payload + suffix, DES3.block_size)
    cipher = DES3.new(key_DES3, DES3.MODE_ECB)
    return cipher.encrypt(padded)
def test_fake_ciphertext_padding_oracle(amount=5):
    """Forge ciphertexts via the padding oracle and check they decrypt.

    Runs `amount` rounds with short plaintexts (1..10 chars) and `amount`
    rounds with longer ones (10..50 chars). Each forged ciphertext is
    decrypted by running ./cbc_oracles.py and compared with the plaintext.
    """
    cases = (
        ("Test small: cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)", 1, 10),
        ("Test large: cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)", 10, 50),
    )
    for banner, shortest, longest in cases:
        for _ in range(amount):
            new_plaintext = random_str(randint(shortest, longest))
            new_plaintext_padded = add_padding(new_plaintext, block_size)
            print(banner)
            new_ciphertext = cbc.fake_ciphertext(new_plaintext_padded, padding_oracle=padding_oracle)
            raw_output = subprocess.check_output(
                ['python', './cbc_oracles.py', 'decrypt', b2h(new_ciphertext)])
            decrypted = h2b(raw_output.strip())
            assert decrypted == new_plaintext
def test_fake_ciphertext_decryption_oracle(amount=5):
    """Forge ciphertexts via the decryption oracle and check they decrypt.

    Runs `amount` rounds with short plaintexts (1..10 chars) and `amount`
    rounds with longer ones (10..50 chars). Each forged ciphertext is
    decrypted by running ./cbc_oracles.py and compared with the plaintext.

    NOTE(review): only the "large" rounds also pass padding_oracle to
    cbc.fake_ciphertext -- confirm that the asymmetry is intentional.
    """
    cases = (
        ("Test small: cbc.fake_ciphertext(new_plaintext_padded, decryption_oracle=decryption_oracle)",
         1, 10, {'decryption_oracle': decryption_oracle}),
        ("Test large: cbc.fake_ciphertext(new_plaintext_padded, decryption_oracle=decryption_oracle)",
         10, 50, {'decryption_oracle': decryption_oracle, 'padding_oracle': padding_oracle}),
    )
    for banner, shortest, longest, oracle_kwargs in cases:
        for _ in range(amount):
            new_plaintext = random_str(randint(shortest, longest))
            new_plaintext_padded = add_padding(new_plaintext, block_size)
            print(banner)
            new_ciphertext = cbc.fake_ciphertext(new_plaintext_padded, **oracle_kwargs)
            raw_output = subprocess.check_output(
                ['python', './cbc_oracles.py', 'decrypt', b2h(new_ciphertext)])
            decrypted = h2b(raw_output.strip())
            assert decrypted == new_plaintext
def test_iv_as_key(from_test=1):
    """Recover the key with cbc.iv_as_key when the IV equals the key.

    Args:
        from_test: First stage to run. Stage 1 uses the padding oracle,
            stage 2 the decryption oracle.

    The module-level iv_as_key flag is set for the duration of the test.
    NOTE(review): the flag is not reset if an assertion fails; consider
    try/finally.
    """
    global iv_as_key
    iv_as_key = True

    plaintext = random_str(randint(1, 40))
    plaintext_padded = add_padding(plaintext, block_size)
    ciphertext = encrypt(plaintext, iv_as_key=iv_as_key)

    stages = (
        (1, "Test 1: cbc.iv_as_key()", {'padding_oracle': padding_oracle}),
        (2, "Test 2: cbc.iv_as_key()", {'decryption_oracle': decryption_oracle}),
    )
    for number, banner, oracle_kwargs in stages:
        if from_test > number:
            continue
        print(banner)
        # The first block of the ciphertext is dropped before the call.
        key = cbc.iv_as_key(ciphertext[AES.block_size:], plaintext_padded, **oracle_kwargs)
        assert key == KEY

    iv_as_key = False
def encrypt(self, plaintext, block_size, iv=None):
    '''Build a ciphertext for `plaintext` block by block, last block first.

    Encryption cannot be multithreaded.

    For each plaintext block, walking backwards, self.bust is run on the
    current block and the second element of self.pop_result() is XORed
    with the plaintext block to give the preceding output block.
    NOTE(review): self.bust presumably recovers the intermediate state via
    the oracle -- confirm against its definition.

    Args:
        plaintext: Data to encode; pkcs7-padded here.
        block_size: Cipher block size in bytes.
        iv: Final block of the result; defaults to block_size NUL characters.

    Returns:
        The generated blocks followed by `iv`.
    '''
    padded = pkcs7pad(plaintext, block_size)
    tail = '\x00' * block_size if iv is None else iv
    blocks = chunk(padded, block_size)
    self.resultq = Queue()

    encrypted = tail
    block = tail
    for prev in reversed(blocks):
        self.bust(block, block_size)
        _, inter = self.pop_result()
        block = xor(inter, prev)
        encrypted = block + encrypted
    return encrypted
def test_find_blocksize(self):
    """find_blocksize must report AES.block_size for ECB and CBC oracles.

    Each mode and each AES key size is tried 100 times with fresh random
    key and IV values.
    """
    def encryption_oracle(s):
        # mode, key and iv are taken from the enclosing loops at call time.
        padded = pkcs7pad(s, AES.block_size)
        if mode == 'CBC':
            return encrypt_cbc(padded, key, iv)
        if mode == 'ECB':
            return encrypt_ecb(padded, key)

    for mode in ['ECB', 'CBC']:
        for key_size in AES.key_size:
            for _ in xrange(100):
                key = random_bytes(key_size)
                iv = random_bytes(AES.block_size)
                detected = find_blocksize(encryption_oracle)
                self.assertTrue(AES.block_size == detected)
def encrypt(raw, aes_key=secret):
    """Encrypt `raw` with AES-CFB and return base64(iv + ciphertext).

    Args:
        raw: Data to encrypt.
        aes_key: AES key; defaults to the module-level `secret` as it was
            when this function was defined.
    """
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(aes_key, AES.MODE_CFB, iv)
    # Prepend the IV so the receiver can rebuild the cipher.
    payload = iv + cipher.encrypt(raw)
    return base64.b64encode(payload)
def encrypt(raw):
    """Pad `raw`, encrypt it with AES-CBC under KEY, return base64(iv + ct)."""
    padded = pad(raw)
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(KEY, AES.MODE_CBC, iv)
    # Prepend the IV so the receiver can rebuild the cipher.
    payload = iv + cipher.encrypt(padded)
    return base64.b64encode(payload)
def fetch_encyption_key(file_name):
    """Return up to AES.block_size characters read from `file_name`.

    NOTE(review): the file is opened in text mode, so on Python 3 this
    returns str, not bytes -- confirm that callers expect that.
    """
    key_length = AES.block_size
    with open(file_name, 'r') as key_file:
        key_material = key_file.read(key_length)
    return key_material
def get_initialization_vector(schema_id, initialization_vector_array=None):
    """Wrap an initialization vector in a MetaAttribute.

    Args:
        schema_id: Passed through as the MetaAttribute schema_id.
        initialization_vector_array: IV bytes; when None, AES.block_size
            bytes are drawn from os.urandom.

    Returns:
        MetaAttribute carrying the IV as payload_data.
    """
    iv_bytes = initialization_vector_array
    if iv_bytes is None:
        iv_bytes = os.urandom(AES.block_size)

    # Validation applies to caller-supplied and generated IVs alike.
    _verify_initialization_vector_params(iv_bytes)
    return MetaAttribute(schema_id=schema_id, payload_data=iv_bytes)
def encrypt(self, raw):
    """Return a fresh random IV followed by AES-CBC(self.key, pad(raw))."""
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(self.key, AES.MODE_CBC, iv)
    ciphertext = cipher.encrypt(self._pad(raw))
    return iv + ciphertext
def decrypt(self, enc):
    """Decrypt `enc`, whose first AES.block_size bytes are the IV.

    Returns:
        The AES-CBC plaintext with padding removed by self._unpad.
    """
    split_at = AES.block_size
    iv, body = enc[:split_at], enc[split_at:]
    cipher = AES.new(self.key, AES.MODE_CBC, iv)
    return self._unpad(cipher.decrypt(body))
def __init__(self):
    """Create the block counter and derive size constants; no key is set.

    The constants are derived from the block_size, key_size and
    max_blocks_per_request attributes found on self.
    """
    block_size = self.block_size
    key_size = self.key_size

    # Counter is one block wide, little-endian, starting at zero.
    self.counter = Counter.new(nbits=block_size*8, initial_value=0, little_endian=True)
    self.key = None

    # Helper constants.
    shift = exact_log2(block_size)
    self.block_size_shift = shift
    assert (1 << shift) == block_size

    per_key = exact_div(key_size, block_size)
    self.blocks_per_key = per_key
    assert key_size == per_key * block_size

    self.max_bytes_per_request = self.max_blocks_per_request * block_size
def _generate_blocks(self, num_blocks):
    """Return `num_blocks` blocks of output from the keyed cipher.

    Output is produced by encrypting zeros, 4096 blocks at a time, plus one
    final partial run for the remainder.

    Raises:
        AssertionError: If no key has been set yet.
    """
    if self.key is None:
        raise AssertionError("generator must be seeded before use")
    assert 0 <= num_blocks <= self.max_blocks_per_request

    zeros = self._four_kiblocks_of_zeros

    # num_blocks >> 12 == num_blocks // 4096 full runs.
    pieces = [self._cipher.encrypt(zeros) for _ in xrange(num_blocks >> 12)]

    # (num_blocks % 4096) * self.block_size bytes are still owed.
    tail_bytes = (num_blocks & 4095) << self.block_size_shift
    pieces.append(self._cipher.encrypt(zeros[:tail_bytes]))
    return b("").join(pieces)
# vim:set ts=4 sw=4 sts=4 expandtab:
def _encrypt(passphrase, data):
    """Encrypt `data` with AES keyed by the MD5 digest of `passphrase`.

    `data` is right-padded with spaces to a whole number of blocks. No mode
    is passed to the cipher constructor, so the library default applies.
    """
    from Crypto.Cipher import AES as cipher

    key = md5.new(passphrase).digest()[:16]

    remainder = len(data) % cipher.block_size
    if remainder:
        data += ' ' * (cipher.block_size - remainder)
    return cipher.new(key).encrypt(data)
def EncryptWithFixedNonce(plaintextes):
    """Challenge setup: CTR-encrypt every plaintext with nonce 0.

    All plaintexts are base64-decoded and encrypted separately under one
    shared random key.

    Returns:
        List of ciphertexts in the order of `plaintextes`.
    """
    key = Random.new().read(AES.block_size)
    return [ctr.DoCTR(base64.b64decode(pt), key, 0) for pt in plaintextes]
def Pkcs7(string):
    """Pad a byte string up to a multiple of AES.block_size.

    Each added byte holds the number of bytes added.

    NOTE(review): input that is already block-aligned is returned
    unchanged, whereas standard PKCS#7 appends a full block in that case --
    confirm that callers rely on the current behaviour before changing it.
    """
    remainder = len(string) % AES.block_size
    if remainder == 0:
        return string
    fill = AES.block_size - remainder
    return string + bytes([fill]) * fill
def VerifyPkcs7(string):
    """Return True if the byte string `string` ends in valid Pkcs7 padding.

    The last byte gives the pad length n; the padding is valid when
    1 <= n <= AES.block_size and the final n bytes all equal n. Bytes
    before the padding are not inspected, so data that happens to end in
    the pad value is still accepted.

    Args:
        string: Byte string whose length is a multiple of AES.block_size.

    Returns:
        True for valid padding, False otherwise (including empty input).

    Raises:
        Exception: If the length is not a multiple of AES.block_size.
    """
    # A string that is not block-aligned cannot have been padded properly.
    if len(string) % AES.block_size != 0:
        raise Exception('String has not been padded properly.')
    if not string:
        return False

    pad_len = string[-1]
    # The pad length is at least 1 and at most one block by definition.
    if pad_len < 1 or pad_len > AES.block_size:
        return False

    # Compare exactly the last pad_len bytes. Scanning for a run instead
    # would overrun on a full padding block and miscount when the data
    # itself ends in the pad value.
    return string[-pad_len:] == bytes([pad_len]) * pad_len
def Block(text, numBlock):
    """Return block number `numBlock` of `text`.

    Blocks are AES.block_size long and are numbered from 1, not 0.
    """
    end = numBlock * AES.block_size
    start = end - AES.block_size
    return text[start:end]
def DoCTR(text, key, nonce):
    """Encrypt or decrypt `text` with AES in CTR mode.

    The algorithm is the same for both directions. CTR needs no padding, so
    `text` may end in a partial block; only the needed keystream bytes are
    used for it.

    Args:
        text: Plaintext or ciphertext bytes.
        key: AES key.
        nonce: Integer in 0..255; its byte value is repeated to fill the
            8-byte nonce half of the counter block.

    Returns:
        The transformed bytes, the same length as `text`.
    """
    # ECB is used as the raw single-block primitive.
    cipher = AES.new(key=key, mode=AES.MODE_ECB)

    nonce_part = bytes([nonce]) * 8
    pieces = []
    for counter, start in enumerate(range(0, len(text), AES.block_size)):
        block = text[start:start + AES.block_size]

        # Counter block = 8 nonce bytes + 8-byte little-endian block counter.
        # The full 64-bit encoding lifts the former limit of 256 blocks.
        counter_block = nonce_part + counter.to_bytes(8, 'little')

        # The keystream is always *encrypted*, even when decrypting.
        keystream = cipher.encrypt(counter_block)

        # Each output block depends only on its input block and a
        # predictable keystream, so blocks are independent of one another.
        pieces.append(BlockXOR(block, keystream))
    return b''.join(pieces)