Python email 模块,message() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用email.message()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist.

        The replacement is first stored under a temporary key with
        self.add(), then renamed so that it ends up under the original key.

        key -- key of the message to replace.
        message -- the new message; anything self.add() accepts.

        NOTE(review): the KeyError for an unknown key presumably comes from
        self._lookup(key) -- confirm against its definition.
        """
        old_subpath = self._lookup(key)
        temp_key = self.add(message)
        temp_subpath = self._lookup(temp_key)
        if isinstance(message, MaildirMessage):
            # temp's subdir and suffix were specified by message.
            dominant_subpath = temp_subpath
        else:
            # temp's subdir and suffix were defaults from add().
            dominant_subpath = old_subpath
        subdir = os.path.dirname(dominant_subpath)
        if self.colon in dominant_subpath:
            suffix = self.colon + dominant_subpath.split(self.colon)[-1]
        else:
            suffix = ''
        self.discard(key)
        tmp_path = os.path.join(self._path, temp_subpath)
        new_path = os.path.join(self._path, subdir, key + suffix)
        if isinstance(message, MaildirMessage):
            # Stamp the delivery date while the file still has its temporary
            # name: nothing should touch the file once it is visible under
            # its final name, otherwise other programs reading the mailbox
            # can observe it with the wrong modification time.
            os.utime(tmp_path, (os.path.getatime(tmp_path),
                                message.get_date()))
        os.rename(tmp_path, new_path)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def remove_folder(self, folder):
        """Delete the named folder, which must be empty.

        Raises NotEmptyError if 'new' or 'cur' holds any entry whose name
        does not start with a dot, or if the folder contains a subdirectory
        other than 'new', 'cur' and 'tmp'.
        """
        folder_path = os.path.join(self._path, '.' + folder)
        # Both listings are taken before any entry is inspected.
        delivered = os.listdir(os.path.join(folder_path, 'new'))
        delivered = delivered + os.listdir(os.path.join(folder_path, 'cur'))
        for name in delivered:
            if not name.startswith('.'):
                raise NotEmptyError('Folder contains message(s): %s' % folder)
        for name in os.listdir(folder_path):
            if name in ('new', 'cur', 'tmp'):
                continue
            if os.path.isdir(os.path.join(folder_path, name)):
                raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                    (folder, name))
        # Bottom-up walk so every directory is already empty when removed.
        for parent, subdirs, filenames in os.walk(folder_path, topdown=False):
            for filename in filenames:
                os.remove(os.path.join(parent, filename))
            for subdir in subdirs:
                os.rmdir(os.path.join(parent, subdir))
        os.rmdir(folder_path)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        Reads the whole file named str(key) under self._path.  A missing
        file (ENOENT) is reported as KeyError; any other IOError propagates
        unchanged.  When self._locked is true the file is locked while it is
        being read.
        """
        path = os.path.join(self._path, str(key))
        # NOTE(review): 'r+' is used when locked, presumably because the lock
        # helper needs a writable handle -- confirm against _lock_file.
        mode = 'r+' if self._locked else 'r'
        try:
            f = open(path, mode)
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                return f.read()
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            f.close()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_decoded_payload(self):
        # get_payload(decode=True) must undo each subpart's transfer encoding.
        outer = self._msgobj('msg_10.txt')
        # The outer message is a multipart
        self.assertEqual(outer.get_payload(decode=True), None)
        decoded_bodies = (
            # Subpart 1 is 7bit encoded
            'This is a 7bit encoded message.\n',
            # Subpart 2 is quopri
            '\xa1This is a Quoted Printable encoded message!\n',
            # Subpart 3 is base64
            'This is a Base64 encoded message.',
            # Subpart 4 has no Content-Transfer-Encoding: header.
            'This has no Content-Transfer-Encoding: header.\n',
        )
        for index, body in enumerate(decoded_bodies):
            self.assertEqual(
                outer.get_payload(index).get_payload(decode=True), body)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_invalid_content_type(self):
        # An invalid Content-Type is reported as text/plain by the accessors,
        # but the raw header must still be written out unchanged.
        check = self.assertEqual
        check_text = self.ndiffAssertEqual

        def expect_text_plain(message):
            check(message.get_content_maintype(), 'text')
            check(message.get_content_subtype(), 'plain')
            check(message.get_content_type(), 'text/plain')

        msg = Message()
        # RFC 2045, $5.2 says invalid yields text/plain
        msg['Content-Type'] = 'text'
        expect_text_plain(msg)
        # Clear the old value and try something /really/ invalid
        del msg['content-type']
        msg['Content-Type'] = 'foo'
        expect_text_plain(msg)
        # Still, make sure that the message is idempotently generated
        out = StringIO()
        Generator(out).flatten(msg)
        check_text(out.getvalue(), 'Content-Type: foo\n\n')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_missing_start_boundary(self):
        # A nested multipart without its start boundary must carry exactly
        # one defect, of type StartBoundaryNotFoundDefect.
        outer = self._msgobj('msg_42.txt')
        # The message structure is:
        #
        # multipart/mixed
        #    text/plain
        #    message/rfc822
        #        multipart/mixed [*]
        #
        # [*] This message is missing its start boundary
        bad = outer.get_payload(1).get_payload(0)
        self.assertEqual(len(bad.defects), 1)
        # assertTrue replaces the deprecated failUnless alias, which newer
        # unittest versions no longer provide.
        self.assertTrue(isinstance(bad.defects[0],
                                   errors.StartBoundaryNotFoundDefect))



# Test RFC 2047 header encoding and decoding
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_generate(self):
        # Flattening a MIMEMessage must emit the wrapper's headers followed
        # by the enclosed message.
        # First craft the message to be encapsulated
        inner = Message()
        inner['Subject'] = 'An enclosed message'
        inner.set_payload('Here is the body of the message.\n')
        wrapper = MIMEMessage(inner)
        wrapper['Subject'] = 'The enclosing message'
        out = StringIO()
        Generator(out).flatten(wrapper)
        expected = """\
Content-Type: message/rfc822
MIME-Version: 1.0
Subject: The enclosing message

Subject: An enclosed message

Here is the body of the message.
"""
        self.assertEqual(out.getvalue(), expected)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_default_type(self):
        # Both top-level parts of msg_30.txt must report message/rfc822, and
        # the message inside each must report text/plain.
        check = self.assertEqual
        fp = openfile('msg_30.txt')
        try:
            msg = email.message_from_file(fp)
        finally:
            fp.close()
        containers = []
        for position in (0, 1):
            container = msg.get_payload(position)
            check(container.get_default_type(), 'message/rfc822')
            check(container.get_content_type(), 'message/rfc822')
            containers.append(container)
        for container in containers:
            enclosed = container.get_payload(0)
            check(enclosed.get_default_type(), 'text/plain')
            check(enclosed.get_content_type(), 'text/plain')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_default_type_with_explicit_container_type(self):
        # Same checks as for msg_30.txt, run against msg_28.txt: the two
        # top-level parts are message/rfc822 and each wraps a text/plain.
        check = self.assertEqual
        fp = openfile('msg_28.txt')
        try:
            msg = email.message_from_file(fp)
        finally:
            fp.close()
        outer_parts = []
        for position in (0, 1):
            part = msg.get_payload(position)
            check(part.get_default_type(), 'message/rfc822')
            check(part.get_content_type(), 'message/rfc822')
            outer_parts.append(part)
        for part in outer_parts:
            nested = part.get_payload(0)
            check(nested.get_default_type(), 'text/plain')
            check(nested.get_content_type(), 'text/plain')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_mime_attachments_in_constructor(self):
        # Subparts handed to the MIMEMultipart constructor must become its
        # payload, in the same order.
        first = MIMEText('')
        second = MIMEText('')
        container = MIMEMultipart(_subparts=(first, second))
        self.assertEqual(len(container.get_payload()), 2)
        for index, part in enumerate((first, second)):
            self.assertEqual(container.get_payload(index), part)



# A general test of parser->model->generator idempotency.  IOW, read a message
# in, parse it into a message object tree, then without touching the tree,
# regenerate the plain text.  The original text and the transformed text
# should be identical.  Note: that we ignore the Unix-From since that may
# contain a changed date.
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test__all__(self):
        # The email package must export exactly the names listed below.
        package = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        exported = package.__all__[:]
        exported.sort()
        expected = [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ]
        self.assertEqual(exported, expected)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_body_line_iterator(self):
        # body_line_iterator() must yield the body lines of a simple message
        # and of a multipart one.
        check = self.assertEqual
        check_text = self.ndiffAssertEqual
        # First a simple non-multipart message
        simple = self._msgobj('msg_01.txt')
        simple_lines = list(iterators.body_line_iterator(simple))
        check(len(simple_lines), 6)
        check_text(EMPTYSTRING.join(simple_lines), simple.get_payload())
        # Now a more complicated multipart
        multipart = self._msgobj('msg_02.txt')
        multipart_lines = list(iterators.body_line_iterator(multipart))
        check(len(multipart_lines), 43)
        # The joined lines are compared against the content of msg_19.txt.
        fp = openfile('msg_19.txt')
        try:
            check_text(EMPTYSTRING.join(multipart_lines), fp.read())
        finally:
            fp.close()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_typed_subpart_iterator_default_type(self):
        # typed_subpart_iterator() must find exactly one text/plain part in
        # msg_03.txt, with the payload shown below.
        msg = self._msgobj('msg_03.txt')
        matches = iterators.typed_subpart_iterator(msg, 'text', 'plain')
        payloads = [part.get_payload() for part in matches]
        self.assertEqual(len(payloads), 1)
        expected = """\

Hi,

Do you like this message?

-Me
"""
        self.assertEqual(EMPTYSTRING.join(payloads), expected)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_get_decoded_payload(self):
        # get_payload(decode=True) must undo each subpart's transfer encoding.
        container = self._msgobj('msg_10.txt')
        # The outer message is a multipart
        self.assertEqual(container.get_payload(decode=True), None)
        expectations = [
            # Subpart 1 is 7bit encoded
            (0, 'This is a 7bit encoded message.\n'),
            # Subpart 2 is quopri
            (1, '\xa1This is a Quoted Printable encoded message!\n'),
            # Subpart 3 is base64
            (2, 'This is a Base64 encoded message.'),
            # Subpart 4 is base64 with a trailing newline, which
            # used to be stripped (issue 7143).
            (3, 'This is a Base64 encoded message.\n'),
            # Subpart 5 has no Content-Transfer-Encoding: header.
            (4, 'This has no Content-Transfer-Encoding: header.\n'),
        ]
        for position, decoded in expectations:
            subpart = container.get_payload(position)
            self.assertEqual(subpart.get_payload(decode=True), decoded)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_as_string(self):
        # as_string() must reproduce msg_01.txt (tabs read as spaces), and
        # str(msg) must be the same text preceded by a 'From ' line.
        check = self.assertEqual
        msg = self._msgobj('msg_01.txt')
        fp = openfile('msg_01.txt')
        try:
            raw = fp.read()
        finally:
            fp.close()
        # BAW 30-Mar-2009 Evil be here.  So, the generator is broken with
        # respect to long line breaking.  It's also not idempotent when a
        # header from a parsed message is continued with tabs rather than
        # spaces.  Before we fixed bug 1974 it was reversedly broken,
        # i.e. headers that were continued with spaces got continued with
        # tabs.  For Python 2.x there's really no good fix and in Python
        # 3.x all this stuff is re-written to be right(er).  Chris Withers
        # convinced me that using space as the default continuation
        # character is less bad for more applications.
        text = raw.replace('\t', ' ')
        self.ndiffAssertEqual(text, msg.as_string())
        repr_lines = str(msg).split('\n')
        envelope_line = repr_lines[0]
        self.assertTrue(envelope_line.startswith('From '))
        check(text, NL.join(repr_lines[1:]))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_binary_body_with_encode_noop(self):
        # Issue 16564: This does not produce an RFC valid message, since to be
        # valid it should have a CTE of binary.  But the below works, and is
        # documented as working this way.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        original = MIMEApplication(bytesdata, _encoder=encoders.encode_noop)
        self.assertEqual(original.get_payload(), bytesdata)
        self.assertEqual(original.get_payload(decode=True), bytesdata)
        # Flatten to wire form and parse it back.
        sink = StringIO()
        Generator(sink).flatten(original)
        reparsed = email.message_from_string(sink.getvalue())
        self.assertEqual(original.get_payload(), bytesdata)
        self.assertEqual(reparsed.get_payload(decode=True), bytesdata)


# Test the basic MIMEText class
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_invalid_content_type(self):
        # An invalid Content-Type is reported as text/plain by the accessors,
        # but the raw header must still be written out unchanged.
        same = self.assertEqual
        same_text = self.ndiffAssertEqual
        msg = Message()
        # RFC 2045, $5.2 says invalid yields text/plain; 'foo' is the
        # /really/ invalid case, set after clearing the old value.
        for attempt, bogus in enumerate(('text', 'foo')):
            if attempt:
                del msg['content-type']
            msg['Content-Type'] = bogus
            same(msg.get_content_maintype(), 'text')
            same(msg.get_content_subtype(), 'plain')
            same(msg.get_content_type(), 'text/plain')
        # Still, make sure that the message is idempotently generated
        buf = StringIO()
        writer = Generator(buf)
        writer.flatten(msg)
        same_text(buf.getvalue(), 'Content-Type: foo\n\n')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_no_start_boundary(self):
        # The payload of msg_31.txt must be the raw text below, boundary
        # lines included.
        msg = self._msgobj('msg_31.txt')
        expected_payload = """\
--BOUNDARY
Content-Type: text/plain

message 1

--BOUNDARY
Content-Type: text/plain

message 2

--BOUNDARY--
"""
        self.ndiffAssertEqual(msg.get_payload(), expected_payload)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_generate(self):
        # Flattening a MIMEMessage must emit the wrapper's headers followed
        # by the enclosed message.
        expected_wire_text = """\
Content-Type: message/rfc822
MIME-Version: 1.0
Subject: The enclosing message

Subject: An enclosed message

Here is the body of the message.
"""
        # First craft the message to be encapsulated
        enclosed = Message()
        enclosed['Subject'] = 'An enclosed message'
        enclosed.set_payload('Here is the body of the message.\n')
        enclosing = MIMEMessage(enclosed)
        enclosing['Subject'] = 'The enclosing message'
        stream = StringIO()
        generator = Generator(stream)
        generator.flatten(enclosing)
        self.assertEqual(stream.getvalue(), expected_wire_text)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_default_type(self):
        # Both top-level parts of msg_30.txt must report message/rfc822, and
        # the message inside each must report text/plain.
        fp = openfile('msg_30.txt')
        try:
            parsed = email.message_from_file(fp)
        finally:
            fp.close()

        def expect_types(part, wanted):
            self.assertEqual(part.get_default_type(), wanted)
            self.assertEqual(part.get_content_type(), wanted)

        first_container = parsed.get_payload(0)
        expect_types(first_container, 'message/rfc822')
        second_container = parsed.get_payload(1)
        expect_types(second_container, 'message/rfc822')
        expect_types(first_container.get_payload(0), 'text/plain')
        expect_types(second_container.get_payload(0), 'text/plain')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_default_type_with_explicit_container_type(self):
        # Same checks as for msg_30.txt, run against msg_28.txt: the two
        # top-level parts are message/rfc822 and each wraps a text/plain.
        fp = openfile('msg_28.txt')
        try:
            parsed = email.message_from_file(fp)
        finally:
            fp.close()

        def expect_types(part, wanted):
            self.assertEqual(part.get_default_type(), wanted)
            self.assertEqual(part.get_content_type(), wanted)

        wrapper_a = parsed.get_payload(0)
        expect_types(wrapper_a, 'message/rfc822')
        wrapper_b = parsed.get_payload(1)
        expect_types(wrapper_b, 'message/rfc822')
        expect_types(wrapper_a.get_payload(0), 'text/plain')
        expect_types(wrapper_b.get_payload(0), 'text/plain')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_parser(self):
        # msg_06.txt must parse to a message/rfc822 wrapping a single
        # text/plain message whose body is one newline.
        check = self.assertEqual
        msg, text = self._msgobj('msg_06.txt')
        # Check some of the outer headers
        check(msg.get_content_type(), 'message/rfc822')
        # Make sure the payload is a list of exactly one sub-Message, and that
        # that submessage has a type of text/plain
        parts = msg.get_payload()
        self.assertIsInstance(parts, list)
        check(len(parts), 1)
        enclosed = parts[0]
        self.assertIsInstance(enclosed, Message)
        check(enclosed.get_content_type(), 'text/plain')
        self.assertIsInstance(enclosed.get_payload(), str)
        check(enclosed.get_payload(), '\n')



# Test various other bits of the package's functionality
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test__all__(self):
        # The email package must export exactly the names listed below.
        expected_names = [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ]
        email_package = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        public_names = email_package.__all__[:]
        public_names.sort()
        self.assertEqual(public_names, expected_names)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_body_line_iterator(self):
        # body_line_iterator() must yield the body lines of a simple message
        # and of a multipart one.
        same = self.assertEqual
        same_text = self.ndiffAssertEqual
        # First a simple non-multipart message
        plain_msg = self._msgobj('msg_01.txt')
        plain_body = list(iterators.body_line_iterator(plain_msg))
        same(len(plain_body), 6)
        same_text(EMPTYSTRING.join(plain_body), plain_msg.get_payload())
        # Now a more complicated multipart
        nested_msg = self._msgobj('msg_02.txt')
        nested_body = list(iterators.body_line_iterator(nested_msg))
        same(len(nested_body), 43)
        # The joined lines are compared against the content of msg_19.txt.
        reference = openfile('msg_19.txt')
        try:
            same_text(EMPTYSTRING.join(nested_body), reference.read())
        finally:
            reference.close()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def remove_folder(self, folder):
        """Delete the named folder, which must be empty.

        NotEmptyError is raised when 'new' or 'cur' contains an entry that
        is not dot-prefixed, or when the folder has a subdirectory besides
        'new', 'cur' and 'tmp'.  Otherwise everything under the folder is
        removed, then the folder itself.
        """
        base = os.path.join(self._path, '.' + folder)
        # List both message directories up front, before checking anything.
        message_entries = (os.listdir(os.path.join(base, 'new')) +
                           os.listdir(os.path.join(base, 'cur')))
        for item in message_entries:
            if item[:1] != '.':
                raise NotEmptyError('Folder contains message(s): %s' % folder)
        standard_dirs = ('new', 'cur', 'tmp')
        for item in os.listdir(base):
            is_extra = item not in standard_dirs
            if is_extra and os.path.isdir(os.path.join(base, item)):
                raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                    (folder, item))
        # Walk bottom-up: files first, then the directories that held them.
        for current, dirnames, filenames in os.walk(base, topdown=False):
            for item in filenames:
                os.remove(os.path.join(current, item))
            for item in dirnames:
                os.rmdir(os.path.join(current, item))
        os.rmdir(base)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _install_message(self, message):
        """Format a message and blindly write to self._file.

        Writes a 'From ' line followed by the message, and returns the
        (start, stop) positions of self._file before and after the write.
        """
        envelope = None
        if isinstance(message, str) and message.startswith('From '):
            # The string brings its own 'From ' line: split it off at the
            # first newline (the whole string if there is none).
            envelope, _separator, message = message.partition('\n')
        elif isinstance(message, _mboxMMDFMessage):
            envelope = 'From ' + message.get_from()
        elif isinstance(message, email.message.Message):
            envelope = message.get_unixfrom()  # May be None.
        if envelope is None:
            envelope = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
        out = self._file
        start = out.tell()
        out.write(envelope + os.linesep)
        self._dump_message(message, out, self._mangle_from_)
        return (start, out.tell())
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist.

        The file named str(key) under self._path is opened, truncated and
        rewritten with message.  A missing file (ENOENT) is reported as
        KeyError; any other IOError propagates unchanged.  When self._locked
        is true the file is locked for the duration of the rewrite.
        """
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                # Truncate through a second, short-lived descriptor; f stays
                # open (and locked, if applicable) throughout.
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        Reads the whole file named str(key) under self._path.  A missing
        file (ENOENT) is reported as KeyError; any other IOError propagates
        unchanged.  When self._locked is true the file is locked while it is
        being read.
        """
        try:
            if self._locked:
                # NOTE(review): opened read-write when locked, presumably
                # because the lock helper needs a writable handle -- confirm.
                f = open(os.path.join(self._path, str(key)), 'r+')
            else:
                f = open(os.path.join(self._path, str(key)), 'r')
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                return f.read()
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            f.close()
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def __init__(self, info):
        """Populate this mapping from a response or a header container.

        info is either an http.client.HTTPResponse, an
        email.message.Message, or any object with an items() method.
        Header names are stored lower-cased and self.status is set as an
        int.
        """
        if isinstance(info, http.client.HTTPResponse):
            for name, value in info.getheaders():
                name = name.lower()
                earlier = self.get(name)
                if earlier is not None:
                    # Repeated headers are folded into one comma-joined value.
                    value = ', '.join((earlier, value))
                self[name] = value
            self.status = info.status
            self['status'] = str(self.status)
            self.reason = info.reason
            self.version = info.version
            return
        if isinstance(info, email.message.Message):
            for name, value in list(info.items()):
                self[name.lower()] = value
            self.status = int(self['status'])
            return
        for name, value in info.items():
            self[name.lower()] = value
        # Falls back to the status already present on self when info
        # carries none.
        self.status = int(self.get('status', self.status))
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def get_last_message(self, tablename):
        """Return the last sequence number of the mailbox for tablename.

        The number is taken from the result of selecting the mailbox on
        self.connection; a value of 0 is reported as 1.  If the lookup or
        conversion fails, the error is logged at debug level and None is
        returned.
        """
        # request mailbox list to the server if needed.
        if not isinstance(self.connection.mailbox_names, dict):
            self.get_mailboxes()
        try:
            mailbox = self.connection.mailbox_names[tablename]
            response = self.connection.select(mailbox)
            # Last message must be a positive integer
            return int(response[1][0]) or 1
        except (IndexError, ValueError, TypeError, KeyError):
            problem = sys.exc_info()[1]
            self.db.logger.debug("Error retrieving the last mailbox" +
                         " sequence number. %s" % str(problem))
        return None
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def EQ(self,first,second):
        """Translate an equality test on a field into a search criterion.

        first.name is looked up in self.search_fields to choose the
        criterion; second is the value compared against.  Raises Exception
        when the field has no supported criterion.
        """
        criterion = self.search_fields[first.name]
        if criterion is None:
            raise Exception("Operation not supported")
        if criterion == "MESSAGE":
            # query by message sequence number
            return "%s" % self.expand(second)
        if criterion == "UID":
            return "UID %s" % self.expand(second)
        if criterion == "DATE":
            return "ON %s" % self.convert_date(second)
        if criterion in self.flags.values():
            # Upper-cased flag name without its first character, negated
            # when the compared value is false.
            keyword = criterion.upper()[1:]
            if second:
                return "%s" % (keyword)
            return "NOT %s" % (keyword)
        raise Exception("Operation not supported")
项目:SFBIStats    作者:royludo    | 项目源码 | 文件源码
def get_one_charset_payload(msg):
        """
        Returns one charset and payload from a mail, multipart or not.
        For a multipart message, takes the first part (in walk() order) that
        defines a charset and returns that part's charset and payload.
        Parameters
        ----------
        msg: email.message

        Returns
        -------
        string: charset of mail
        string: payload (decoded according to its Content-Transfer-Encoding)

        Raises
        ------
        ValueError
            If no charset is defined: on the message itself when it is not
            multipart, or on any of its parts when it is.

        """
        if msg.is_multipart():
            for part in msg.walk():
                charset = part.get_content_charset()
                if charset is not None:
                    return charset, part.get_payload(decode=True)
            # No part declared a charset.  Raise, as the non-multipart branch
            # does, instead of falling through and returning None, which
            # callers unpacking the documented pair cannot handle.
            raise ValueError('No part defines a charset, message too weird.')

        charset = msg.get_content_charset()
        if charset is None:
            raise ValueError('Charset is None, message too weird.')
        return charset, msg.get_payload(decode=True)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_get_decoded_payload(self):
        # get_payload(decode=True) must undo each subpart's transfer encoding.
        check = self.assertEqual
        multipart = self._msgobj('msg_10.txt')

        def decoded(position):
            return multipart.get_payload(position).get_payload(decode=True)

        # The outer message is a multipart
        check(multipart.get_payload(decode=True), None)
        # Subpart 1 is 7bit encoded
        check(decoded(0), 'This is a 7bit encoded message.\n')
        # Subpart 2 is quopri
        check(decoded(1), '\xa1This is a Quoted Printable encoded message!\n')
        # Subpart 3 is base64
        check(decoded(2), 'This is a Base64 encoded message.')
        # Subpart 4 is base64 with a trailing newline, which
        # used to be stripped (issue 7143).
        check(decoded(3), 'This is a Base64 encoded message.\n')
        # Subpart 5 has no Content-Transfer-Encoding: header.
        check(decoded(4), 'This has no Content-Transfer-Encoding: header.\n')
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_as_string(self):
        # as_string() must reproduce msg_01.txt (tabs read as spaces), and
        # str(msg) must be the same text preceded by a 'From ' line.
        same = self.assertEqual
        parsed = self._msgobj('msg_01.txt')
        source = openfile('msg_01.txt')
        try:
            # BAW 30-Mar-2009 Evil be here.  So, the generator is broken with
            # respect to long line breaking.  It's also not idempotent when a
            # header from a parsed message is continued with tabs rather than
            # spaces.  Before we fixed bug 1974 it was reversedly broken,
            # i.e. headers that were continued with spaces got continued with
            # tabs.  For Python 2.x there's really no good fix and in Python
            # 3.x all this stuff is re-written to be right(er).  Chris Withers
            # convinced me that using space as the default continuation
            # character is less bad for more applications.
            on_disk = source.read()
            expected = on_disk.replace('\t', ' ')
        finally:
            source.close()
        self.ndiffAssertEqual(expected, parsed.as_string())
        pieces = str(parsed).split('\n')
        self.assertTrue(pieces[0].startswith('From '))
        without_envelope = NL.join(pieces[1:])
        same(expected, without_envelope)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_binary_body_with_encode_noop(self):
        # Issue 16564: This does not produce an RFC valid message, since to be
        # valid it should have a CTE of binary.  But the below works, and is
        # documented as working this way.
        check = self.assertEqual
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        built = MIMEApplication(bytesdata, _encoder=encoders.encode_noop)
        check(built.get_payload(), bytesdata)
        check(built.get_payload(decode=True), bytesdata)
        # Round-trip through the generator and the parser.
        buf = StringIO()
        writer = Generator(buf)
        writer.flatten(built)
        wire_text = buf.getvalue()
        round_tripped = email.message_from_string(wire_text)
        check(built.get_payload(), bytesdata)
        check(round_tripped.get_payload(decode=True), bytesdata)


# Test the basic MIMEText class
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_invalid_content_type(self):
        # An invalid Content-Type is reported as text/plain by the accessors,
        # but the raw header must still be written out unchanged.
        verify = self.assertEqual
        verify_text = self.ndiffAssertEqual
        fallback = (
            ('get_content_maintype', 'text'),
            ('get_content_subtype', 'plain'),
            ('get_content_type', 'text/plain'),
        )
        message = Message()
        # RFC 2045, $5.2 says invalid yields text/plain
        message['Content-Type'] = 'text'
        for accessor, wanted in fallback:
            verify(getattr(message, accessor)(), wanted)
        # Clear the old value and try something /really/ invalid
        del message['content-type']
        message['Content-Type'] = 'foo'
        for accessor, wanted in fallback:
            verify(getattr(message, accessor)(), wanted)
        # Still, make sure that the message is idempotently generated
        flattened = StringIO()
        Generator(flattened).flatten(message)
        verify_text(flattened.getvalue(), 'Content-Type: foo\n\n')
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_no_start_boundary(self):
        # The payload of msg_31.txt must be the raw text below, boundary
        # lines included.
        raw_body = """\
--BOUNDARY
Content-Type: text/plain

message 1

--BOUNDARY
Content-Type: text/plain

message 2

--BOUNDARY--
"""
        check_text = self.ndiffAssertEqual
        parsed = self._msgobj('msg_31.txt')
        check_text(parsed.get_payload(), raw_body)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def add(self, message):
        """Add message and return assigned key.

        Abstract here: always raises NotImplementedError.
        """
        reason = 'Method must be implemented by subclass'
        raise NotImplementedError(reason)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def remove(self, key):
        """Remove the keyed message; raise KeyError if it doesn't exist.

        Abstract here: always raises NotImplementedError.
        """
        reason = 'Method must be implemented by subclass'
        raise NotImplementedError(reason)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def discard(self, key):
        """If the keyed message exists, remove it.

        Delegates to self.remove(key) and ignores the KeyError raised for a
        missing key.
        """
        try:
            self.remove(key)
        except KeyError:
            # Nothing stored under this key; that is fine.
            return None
        return None
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist.

        Abstract here: always raises NotImplementedError.
        """
        reason = 'Method must be implemented by subclass'
        raise NotImplementedError(reason)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get(self, key, default=None):
        """Return the keyed message, or default if it doesn't exist.

        A KeyError from self.__getitem__(key) is what selects the default.
        """
        try:
            found = self.__getitem__(key)
        except KeyError:
            return default
        return found
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def iteritems(self):
        """Return an iterator over (key, message) tuples.

        Keys from self.iterkeys() whose lookup raises KeyError are skipped.
        """
        for key in self.iterkeys():
            try:
                message = self[key]
            except KeyError:
                # The key yielded no message; leave it out.
                pass
            else:
                yield (key, message)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def items(self):
        """Return a list of (key, message) tuples. Memory intensive.

        Collects everything produced by self.iteritems() into one list.
        """
        return [pair for pair in self.iteritems()]
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def has_key(self, key):
        """Return True if the keyed message exists, False otherwise.

        Abstract here: always raises NotImplementedError.
        """
        reason = 'Method must be implemented by subclass'
        raise NotImplementedError(reason)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def pop(self, key, default=None):
        """Delete the keyed message and return it, or default.

        The message is fetched first; default is returned if that lookup
        raises KeyError, otherwise the key is discarded.
        """
        try:
            message = self[key]
        except KeyError:
            return default
        else:
            self.discard(key)
            return message