我们从Python开源项目中，提取了以下10个代码示例，用于说明如何使用Crypto.Cipher.AES.MODE_CCM。
def __init__(self, name, cipher, mode, block_size=None, iv_size=None,
             key_size=None, icv_size=None):
    """
    Record the parameters of an encryption algorithm on the instance.

    @param name: the name of this encryption algorithm
    @param cipher: a Cipher module, or None when there is no cipher
    @param mode: the mode used with the cipher module
    @param block_size: the length of a block for this algo. Defaults to the
                       `block_size` of the cipher, or 1 without a cipher.
    @param iv_size: the length of the initialization vector of this algo.
                    Defaults to the block size resolved above.
    @param key_size: an integer or list/tuple of integers. If specified,
                     force the secret keys length to one of the values.
                     Defaults to the `key_size` of the cipher, or None
                     without a cipher.
    @param icv_size: stored unchanged as `self.icv_size`.
    """
    self.name = name
    self.cipher = cipher
    self.mode = mode
    self.icv_size = icv_size

    # AEAD is flagged when the mode equals the cipher module's GCM or CCM
    # constant; a module lacking the constant simply cannot match it.
    self.is_aead = any(
        hasattr(self.cipher, attr) and self.mode == getattr(self.cipher, attr)
        for attr in ('MODE_GCM', 'MODE_CCM')
    )

    # Explicit argument wins, then the cipher module's value, then 1.
    if block_size is None:
        block_size = 1 if cipher is None else cipher.block_size
    self.block_size = block_size

    self.iv_size = self.block_size if iv_size is None else iv_size

    # Explicit argument wins, then the cipher module's value, then None.
    if key_size is None and cipher is not None:
        key_size = cipher.key_size
    self.key_size = key_size
def sendSMB(self, packet):
    """
    Stamp the SMB2 header of `packet`, sign or encrypt it as the session
    state requires, and send it over the NetBIOS session.

    :param packet: SMB2 packet to send. MessageID, SessionID, CreditCharge,
        CreditRequestResponse and Flags may be set on it in place.
    :return: the MessageID carried by the packet, for later retrieval of
        the matching response.
    :raise AttributeError: logged and re-raised when encryption is needed
        but Crypto.Cipher.AES has no MODE_CCM.
    """
    # The idea here is to receive multiple/single commands and create a compound request, and send it
    # Should return the MessageID for later retrieval. Implement compounded related requests.

    # If Connection.Dialect is equal to "3.000" and if Connection.SupportsMultiChannel or
    # Connection.SupportsPersistentHandles is TRUE, the client MUST set ChannelSequence in the
    # SMB2 header to Session.ChannelSequence

    # Check this is not a CANCEL request. If so, don't consume sequence numbers.
    # Compared by value: identity (`is not`) on integers only works by accident.
    if packet['Command'] != SMB2_CANCEL:
        packet['MessageID'] = self._Connection['SequenceWindow']
        self._Connection['SequenceWindow'] += 1
    packet['SessionID'] = self._Session['SessionID']

    # Default the credit charge to 1 unless set by the caller
    if 'CreditCharge' not in packet.fields:
        packet['CreditCharge'] = 1

    # Standard credit request after negotiating protocol
    if self._Connection['SequenceWindow'] > 3:
        packet['CreditRequestResponse'] = 127

    # Captured now because `packet` is replaced by raw bytes when encrypting.
    messageId = packet['MessageID']

    if self._Session['SigningActivated'] is True and self._Connection['SequenceWindow'] > 2:
        if packet['TreeID'] > 0 and packet['TreeID'] in self._Session['TreeConnectTable']:
            # Requests on an encrypted tree are not signed.
            if self._Session['TreeConnectTable'][packet['TreeID']]['EncryptData'] is False:
                packet['Flags'] = SMB2_FLAGS_SIGNED
                self.signSMB(packet)
        elif packet['TreeID'] == 0:
            packet['Flags'] = SMB2_FLAGS_SIGNED
            self.signSMB(packet)

    if (self._Session['SessionFlags'] & SMB2_SESSION_FLAG_ENCRYPT_DATA) or (
            packet['TreeID'] != 0 and
            self._Session['TreeConnectTable'][packet['TreeID']]['EncryptData'] is True):
        import os

        plainText = str(packet)
        transformHeader = SMB2_TRANSFORM_HEADER()
        # A CCM nonce must not repeat under the same key, so take the 11
        # bytes from the OS CSPRNG instead of the `random` module.
        transformHeader['Nonce'] = os.urandom(11)
        transformHeader['OriginalMessageSize'] = len(plainText)
        transformHeader['EncryptionAlgorithm'] = SMB2_ENCRYPTION_AES128_CCM
        transformHeader['SessionID'] = self._Session['SessionID']
        from Crypto.Cipher import AES
        try:
            AES.MODE_CCM
        except AttributeError:
            LOG.critical("Your pycrypto doesn't support AES.MODE_CCM. Currently only pycrypto experimental supports this mode.\nDownload it from https://www.dlitz.net/software/pycrypto ")
            raise
        cipher = AES.new(self._Session['EncryptionKey'], AES.MODE_CCM, transformHeader['Nonce'])
        # The associated data is the serialized transform header minus its
        # first 20 bytes (presumably ProtocolId + Signature -- confirm
        # against the SMB2_TRANSFORM_HEADER definition).
        cipher.update(str(transformHeader)[20:])
        cipherText = cipher.encrypt(plainText)
        transformHeader['Signature'] = cipher.digest()
        packet = str(transformHeader) + cipherText

    self._NetBIOSSession.send_packet(str(packet))
    return messageId
def recvSMB(self, packetID=None):
    """
    Return the SMB2 response with the given MessageID, reading from the
    NetBIOS session as needed.

    Responses received for other MessageIDs are parked in
    self._Connection['OutstandingResponses'] and handed out by later calls.
    Responses whose Status is STATUS_PENDING are skipped.

    :param packetID: MessageID to wait for; None accepts the next
        non-pending response.
    :return: the matching SMB2Packet (or whatever object was parked under
        `packetID`).
    :raise AttributeError: logged and re-raised when an encrypted message
        arrives but Crypto.Cipher.AES has no MODE_CCM.
    """
    def _decrypt(raw):
        # Decrypt one transform-wrapped message and parse the plaintext.
        transformHeader = SMB2_TRANSFORM_HEADER(raw)
        from Crypto.Cipher import AES
        try:
            AES.MODE_CCM
        except AttributeError:
            LOG.critical("Your pycrypto doesn't support AES.MODE_CCM. Currently only pycrypto experimental supports this mode.\nDownload it from https://www.dlitz.net/software/pycrypto ")
            raise
        cipher = AES.new(self._Session['DecryptionKey'], AES.MODE_CCM, transformHeader['Nonce'][:11])
        cipher.update(str(transformHeader)[20:])
        plainText = cipher.decrypt(raw[len(SMB2_TRANSFORM_HEADER()):])
        # NOTE(review): the CCM tag is not checked -- the original call
        # `cipher.verify(transformHeader['Signature'])` was disabled, so
        # tampered ciphertext is not rejected here. Confirm why before
        # re-enabling.
        return SMB2Packet(plainText)

    # Loop instead of recursing so a long run of out-of-order responses
    # cannot exhaust the interpreter's recursion limit.
    while True:
        # First, verify we don't have the packet already
        if packetID in self._Connection['OutstandingResponses']:
            return self._Connection['OutstandingResponses'].pop(packetID)

        trailer = self._NetBIOSSession.recv_packet(self._timeout).get_trailer()
        if trailer.startswith('\xfdSMB'):
            # Packet is encrypted
            packet = _decrypt(trailer)
        else:
            # In all SMB dialects for a response this field is interpreted as the Status field.
            # This field can be set to any value. For a list of valid status codes,
            # see [MS-ERREF] section 2.3.
            packet = SMB2Packet(trailer)

        # Loop while we receive pending requests
        while packet['Status'] == STATUS_PENDING:
            trailer = self._NetBIOSSession.recv_packet(self._timeout).get_trailer()
            if trailer.startswith('\xfeSMB'):
                packet = SMB2Packet(trailer)
            else:
                # Packet is encrypted
                packet = _decrypt(trailer)

        if packet['MessageID'] == packetID or packetID is None:
            # Let's update the sequenceWindow based on the CreditsCharged
            # In the SMB 2.0.2 dialect, this field MUST NOT be used and MUST be reserved.
            # The sender MUST set this to 0, and the receiver MUST ignore it.
            # In all other dialects, this field indicates the number of credits that this request consumes.
            if self._Connection['Dialect'] > SMB2_DIALECT_002:
                self._Connection['SequenceWindow'] += (packet['CreditCharge'] - 1)
            return packet

        # Not the response we are waiting for: park it and keep reading.
        self._Connection['OutstandingResponses'][packet['MessageID']] = packet
def recvSMB(self, packetID=None):
    """
    Return the SMB2 response with the given MessageID, reading from the
    NetBIOS session as needed.

    Responses received for other MessageIDs are parked in
    self._Connection['OutstandingResponses'] and handed out by later calls.
    Responses whose Status is STATUS_PENDING are skipped.

    :param packetID: MessageID to wait for; None accepts the next
        non-pending response.
    :return: the matching SMB2Packet (or whatever object was parked under
        `packetID`).
    :raise AttributeError: logged and re-raised when an encrypted message
        arrives but Crypto.Cipher.AES has no MODE_CCM.
    """
    def _decrypt(raw):
        # Decrypt one transform-wrapped message and parse the plaintext.
        transformHeader = SMB2_TRANSFORM_HEADER(raw)
        from Crypto.Cipher import AES
        try:
            AES.MODE_CCM
        except AttributeError:
            LOG.critical("Your pycrypto doesn't support AES.MODE_CCM. Currently only pycrypto experimental supports this mode.\nDownload it from https://www.dlitz.net/software/pycrypto ")
            raise
        cipher = AES.new(self._Session['DecryptionKey'], AES.MODE_CCM, transformHeader['Nonce'][:11])
        cipher.update(str(transformHeader)[20:])
        plainText = cipher.decrypt(raw[len(SMB2_TRANSFORM_HEADER()):])
        # NOTE(review): the CCM tag is not checked -- the original call
        # `cipher.verify(transformHeader['Signature'])` was disabled, so
        # tampered ciphertext is not rejected here. Confirm why before
        # re-enabling.
        return SMB2Packet(plainText)

    # Loop instead of recursing so a long run of out-of-order responses
    # cannot exhaust the interpreter's recursion limit.
    while True:
        # First, verify we don't have the packet already
        if packetID in self._Connection['OutstandingResponses']:
            return self._Connection['OutstandingResponses'].pop(packetID)

        trailer = self._NetBIOSSession.recv_packet(self._timeout).get_trailer()
        if trailer.startswith('\xfdSMB'):
            # Packet is encrypted
            packet = _decrypt(trailer)
        else:
            # In all SMB dialects for a response this field is interpreted as the Status field.
            # This field can be set to any value. For a list of valid status codes,
            # see [MS-ERREF] section 2.3.
            packet = SMB2Packet(trailer)

        # Loop while we receive pending requests
        while packet['Status'] == STATUS_PENDING:
            trailer = self._NetBIOSSession.recv_packet(self._timeout).get_trailer()
            if trailer.startswith('\xfeSMB'):
                packet = SMB2Packet(trailer)
            else:
                # Packet is encrypted
                packet = _decrypt(trailer)

        if packet['MessageID'] == packetID or packetID is None:
            # Disabled in the original, kept for reference:
            # if self._Session['SigningRequired'] is True:
            #     self.signSMB(packet)
            # Let's update the sequenceWindow based on the CreditsCharged
            self._Connection['SequenceWindow'] += (packet['CreditCharge'] - 1)
            return packet

        # Not the response we are waiting for: park it and keep reading.
        self._Connection['OutstandingResponses'][packet['MessageID']] = packet