Python email 模块,message_from_bytes() 实例源码

我们从 Python 开源项目中提取了以下 44 个代码示例,用于说明如何使用 email.message_from_bytes()。

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_8bit_in_quopri_body(self):
        # This is non-RFC compliant data...without 'decode' the library code
        # decodes the body using the charset from the headers, and because the
        # source byte really is utf-8 this works.  This is likely to fail
        # against real dirty data (ie: produce mojibake), but the data is
        # invalid anyway so it is as good a guess as any.  But this means that
        # this test just confirms the current behavior; that behavior is not
        # necessarily the best possible behavior.  With 'decode' it is
        # returning the raw bytes, so that test should be of correct behavior,
        # or at least produce the same result that email4 did.
        m = self.bodytest_msg.format(charset='utf-8',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n')
        self.assertEqual(msg.get_payload(decode=True),
                         'pöstál\n'.encode('utf-8'))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, message=None):
        """Initialize a Message instance.

        *message* may be None (start empty), an email.message.Message
        (deep-copied), bytes or str (parsed), an io.TextIOWrapper (parsed as
        a text file) or any other object with a ``read`` attribute (parsed
        as a binary file).  Anything else raises TypeError.
        """
        base = email.message.Message
        if message is None:
            base.__init__(self)
            return
        if isinstance(message, base):
            self._become_message(copy.deepcopy(message))
            # A source that is itself one of our Message objects gets the
            # chance to carry its own state over to the new instance.
            if isinstance(message, Message):
                message._explain_to(self)
            return
        # Everything below is raw input; choose the parser matching its type.
        if isinstance(message, bytes):
            parsed = email.message_from_bytes(message)
        elif isinstance(message, str):
            parsed = email.message_from_string(message)
        elif isinstance(message, io.TextIOWrapper):
            parsed = email.message_from_file(message)
        elif hasattr(message, "read"):
            parsed = email.message_from_binary_file(message)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
        self._become_message(parsed)
项目:qqbot    作者:pandolia    | 项目源码 | 文件源码
def getSubject(self, i):
        """Return the Subject of the i-th unseen message, or None.

        The UNSEEN messages are listed on ``self.conn`` and the Subject
        header of the one at index *i* is fetched with BODY.PEEK.  None is
        returned when *i* is out of range.  Only the first fragment reported
        by decode_header() is used; a missing charset falls back to utf-8.
        """
        conn = self.conn
        unseen_ids = conn.search(None, '(UNSEEN)')[1][0].split()
        try:
            email_id = unseen_ids[i]
        except IndexError:
            return None
        fetched = conn.fetch(email_id, '(BODY.PEEK[HEADER.FIELDS (SUBJECT)])')[1]
        raw_header = fetched[0][1]
        # The conditional expression only evaluates the parser name that
        # matches the running interpreter.
        parse = message_from_bytes if PY3 else message_from_string
        msg = parse(raw_header)
        s, encoding = decode_header(msg['Subject'])[0]
        if not PY3:
            # Python 2: normalise to a utf-8 encoded byte string.
            return s.decode(encoding or 'utf-8').encode('utf-8')
        if type(s) is str:
            return s
        return s.decode(encoding or 'utf-8')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_binary_body_with_encode_7or8bit(self):
        # Issue 17171.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')
        s = BytesIO()
        g = BytesGenerator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_bytes(wireform)
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)
        self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_binary_body_with_encode_noop(self):
        # Issue 16564: This does not produce an RFC valid message, since to be
        # valid it should have a CTE of binary.  But the below works in
        # Python2, and is documented as working this way.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_noop)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        s = BytesIO()
        g = BytesGenerator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_bytes(wireform)
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
        # This is similar to the previous test, but proves that if the 8bit
        # byte is undecodeable in the specified charset, it gets replaced
        # by the unicode 'unknown' character.  Again, this may or may not
        # be the ideal behavior.  Note that if decode=False none of the
        # decoders will get involved, so this is the only test we need
        # for this behavior.
        m = self.bodytest_msg.format(charset='ascii',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
        self.assertEqual(msg.get_payload(decode=True),
                        'pöstál\n'.encode('utf-8'))

    # test_defect_handling:test_invalid_chars_in_base64_payload
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_binary_body_with_encode_7or8bit(self):
        # Issue 17171.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')
        s = BytesIO()
        g = BytesGenerator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_bytes(wireform)
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)
        self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_8bit_in_quopri_body(self):
        # This is non-RFC compliant data...without 'decode' the library code
        # decodes the body using the charset from the headers, and because the
        # source byte really is utf-8 this works.  This is likely to fail
        # against real dirty data (ie: produce mojibake), but the data is
        # invalid anyway so it is as good a guess as any.  But this means that
        # this test just confirms the current behavior; that behavior is not
        # necessarily the best possible behavior.  With 'decode' it is
        # returning the raw bytes, so that test should be of correct behavior,
        # or at least produce the same result that email4 did.
        m = self.bodytest_msg.format(charset='utf-8',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n')
        self.assertEqual(msg.get_payload(decode=True),
                         'pöstál\n'.encode('utf-8'))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
        # This is similar to the previous test, but proves that if the 8bit
        # byte is undecodeable in the specified charset, it gets replaced
        # by the unicode 'unknown' character.  Again, this may or may not
        # be the ideal behavior.  Note that if decode=False none of the
        # decoders will get involved, so this is the only test we need
        # for this behavior.
        m = self.bodytest_msg.format(charset='ascii',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
        self.assertEqual(msg.get_payload(decode=True),
                        'pöstál\n'.encode('utf-8'))

    # test_defect_handling:test_invalid_chars_in_base64_payload
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, message=None):
        """Initialize a Message instance.

        *message* may be None (start empty), an email.message.Message
        (deep-copied), bytes or str (parsed), an io.TextIOWrapper (parsed as
        a text file) or any other object with a ``read`` attribute (parsed
        as a binary file).  Anything else raises TypeError.
        """
        base = email.message.Message
        if message is None:
            base.__init__(self)
            return
        if isinstance(message, base):
            self._become_message(copy.deepcopy(message))
            # A source that is itself one of our Message objects gets the
            # chance to carry its own state over to the new instance.
            if isinstance(message, Message):
                message._explain_to(self)
            return
        # Everything below is raw input; choose the parser matching its type.
        if isinstance(message, bytes):
            parsed = email.message_from_bytes(message)
        elif isinstance(message, str):
            parsed = email.message_from_string(message)
        elif isinstance(message, io.TextIOWrapper):
            parsed = email.message_from_file(message)
        elif hasattr(message, "read"):
            parsed = email.message_from_binary_file(message)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
        self._become_message(parsed)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_binary_body_with_encode_7or8bit(self):
        # Issue 17171.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')
        s = BytesIO()
        g = BytesGenerator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_bytes(wireform)
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)
        self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_8bit_in_quopri_body(self):
        # This is non-RFC compliant data...without 'decode' the library code
        # decodes the body using the charset from the headers, and because the
        # source byte really is utf-8 this works.  This is likely to fail
        # against real dirty data (ie: produce mojibake), but the data is
        # invalid anyway so it is as good a guess as any.  But this means that
        # this test just confirms the current behavior; that behavior is not
        # necessarily the best possible behavior.  With 'decode' it is
        # returning the raw bytes, so that test should be of correct behavior,
        # or at least produce the same result that email4 did.
        m = self.bodytest_msg.format(charset='utf-8',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n')
        self.assertEqual(msg.get_payload(decode=True),
                         'pöstál\n'.encode('utf-8'))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
        # This is similar to the previous test, but proves that if the 8bit
        # byte is undecodeable in the specified charset, it gets replaced
        # by the unicode 'unknown' character.  Again, this may or may not
        # be the ideal behavior.  Note that if decode=False none of the
        # decoders will get involved, so this is the only test we need
        # for this behavior.
        m = self.bodytest_msg.format(charset='ascii',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
        self.assertEqual(msg.get_payload(decode=True),
                        'pöstál\n'.encode('utf-8'))

    # test_defect_handling:test_invalid_chars_in_base64_payload
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def __init__(self, message=None):
        """Initialize a Message instance.

        *message* may be None (start empty), an email.message.Message
        (deep-copied), bytes or str (parsed), an io.TextIOWrapper (parsed as
        a text file) or any other object with a ``read`` attribute (parsed
        as a binary file).  Anything else raises TypeError.
        """
        base = email.message.Message
        if message is None:
            base.__init__(self)
            return
        if isinstance(message, base):
            self._become_message(copy.deepcopy(message))
            # A source that is itself one of our Message objects gets the
            # chance to carry its own state over to the new instance.
            if isinstance(message, Message):
                message._explain_to(self)
            return
        # Everything below is raw input; choose the parser matching its type.
        if isinstance(message, bytes):
            parsed = email.message_from_bytes(message)
        elif isinstance(message, str):
            parsed = email.message_from_string(message)
        elif isinstance(message, io.TextIOWrapper):
            parsed = email.message_from_file(message)
        elif hasattr(message, "read"):
            parsed = email.message_from_binary_file(message)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
        self._become_message(parsed)
项目:aioimaplib    作者:bamthomas    | 项目源码 | 文件源码
def append_literal(self, data):
        """Consume *data* received while an APPEND literal is pending.

        ``self.append_literal_command`` holds ``(tag, mailbox_name, size)``.
        A bare CRLF finishes the command: an OK line is sent (with an
        APPENDUID response code when UIDPLUS is among the capabilities) and
        the pending command is cleared.  Data whose length differs from
        *size* is answered with BAD and also clears the pending command.
        Data of the expected length is parsed and stored as a Mail in the
        mailbox; the pending command is left in place.
        """
        tag, mailbox_name, size = self.append_literal_command
        if data == CRLF:
            if 'UIDPLUS' not in self.capabilities:
                reply = 'OK APPEND completed.'
            else:
                reply = 'OK [APPENDUID %s %s] APPEND completed.' % (
                    self.uidvalidity,
                    self.server_state.max_uid(self.user_login, mailbox_name))
            self.send_tagged_line(tag, reply)
            self.append_literal_command = None
            return

        received = len(data)
        if received == size:
            parsed = email.message_from_bytes(data)
            self.server_state.add_mail(self.user_login, Mail(parsed), mailbox_name)
            return

        self.send_tagged_line(
            tag, 'BAD literal length : expected %s but was %s' % (size, received))
        self.append_literal_command = None
项目:COWNotifier    作者:kadircet    | 项目源码 | 文件源码
def updateTopic(self, topic):
        """Fetch and format the articles posted to *topic* since the last call.

        Every article numbered above ``self.groups[topic]`` (and not below
        the first article the server reports) is downloaded, parsed, and
        turned into an HTML-escaped text made of a ``<code>`` header block
        followed by the concatenated text/plain parts.  Articles that fail
        to download or parse are logged and skipped.  On completion
        ``self.groups[topic]`` is advanced to the server's last article.

        Returns:
            A list of ``[is_plus_one, text]`` pairs, one per processed
            article.  The list is empty when the instance is not
            initialized or when the group cannot be selected (in which case
            a reconnect is attempted).
        """
        if not self.initialized:
            return []
        # Reconnect when more than 60 seconds have passed since self.time
        # (presumably the time of the last connection -- confirm).
        if time.time() - self.time > 60.:
            self.connect()
        try:
            (_, _, first, last, _) = self.conn.group(topic)
        except Exception as e:
            print(e, datetime.datetime.now())
            traceback.print_exc()
            self.connect()
            # Same "nothing new" value as the uninitialized case, so callers
            # can always iterate over the result.
            return []
        start = max(self.groups[topic] + 1, first)
        res = []
        headers = ("From", "Newsgroups", "Subject", "Date")
        # Treat From as an unstructured header so it is exposed as plain
        # text instead of being parsed as an address list.
        registry = HeaderRegistry()
        registry.map_to_type('From', UnstructuredHeader)
        policy = EmailPolicy(header_factory=registry)
        while start <= last:
            try:
                artic = self.conn.article(start)[1]
                raw_msg = b'\r\n'.join(artic.lines)
                mime_msg = email.message_from_bytes(raw_msg, policy=policy)
                hdr = "<code>"
                for h in headers:
                    hdr += "%s: %s\r\n" % (h, html.escape(mime_msg[h]))
                content = ""
                for part in mime_msg.walk():
                    if part.get_content_type() == 'text/plain':
                        content += html.escape(part.get_content())
                is_plus_one = isPlusOne(content)
                hdr += "%s: %s\r\n" % ("is_plus_one", is_plus_one)
                hdr += "</code>\r\n"
                res.append([is_plus_one, hdr + content])
            except Exception as e:
                # Best effort: one broken article must not stop the others.
                print(e, datetime.datetime.now())
                traceback.print_exc()
            start += 1
        self.groups[topic] = last
        # Hand the collected articles back to the caller; without this the
        # list built above would be discarded.
        return res
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def parse_headers(headers):
    """Parse a raw header block into a plain ``dict`` of name -> value.

    On Python 3 the input is parsed with ``email.message_from_bytes``,
    otherwise with ``email.message_from_string``.  Because the result goes
    through ``dict``, a header name that occurs several times keeps a single
    value.
    """
    # Only the parser matching the running interpreter is looked up.
    parse = email.message_from_bytes if PY3 else email.message_from_string
    return dict(parse(headers).items())
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        all = module.__all__[:]
        all.sort()
        self.assertEqual(all, [
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_binary_file',
            'message_from_bytes', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quoprimime', 'utils',
            ])
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_known_8bit_CTE(self):
        m = self.bodytest_msg.format(charset='utf-8',
                                     cte='8bit',
                                     bodyline='pöstal').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), "pöstal\n")
        self.assertEqual(msg.get_payload(decode=True),
                         "pöstal\n".encode('utf-8'))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_unknown_8bit_CTE(self):
        m = self.bodytest_msg.format(charset='notavalidcharset',
                                     cte='8bit',
                                     bodyline='pöstal').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), "p\uFFFD\uFFFDstal\n")
        self.assertEqual(msg.get_payload(decode=True),
                         "pöstal\n".encode('utf-8'))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
        # This is similar to the previous test, but proves that if the 8bit
        # byte is undecodeable in the specified charset, it gets replaced
        # by the unicode 'unknown' character.  Again, this may or may not
        # be the ideal behavior.  Note that if decode=False none of the
        # decoders will get involved, so this is the only test we need
        # for this behavior.
        m = self.bodytest_msg.format(charset='ascii',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
        self.assertEqual(msg.get_payload(decode=True),
                        'pöstál\n'.encode('utf-8'))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_8bit_in_uuencode_body(self):
        # Sticking an 8bit byte in a uuencode block makes it undecodable by
        # normal means, so the block is returned undecoded, but as bytes.
        m = self.bodytest_msg.format(charset='utf-8',
                                     cte='uuencode',
                                     bodyline='<,.V<W1A; á ').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(decode=True),
                         '<,.V<W1A; á \n'.encode('utf-8'))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_get_8bit_header(self):
        msg = email.message_from_bytes(self.headertest_msg)
        self.assertEqual(str(msg.get('to')), 'b\uFFFD\uFFFDz')
        self.assertEqual(str(msg['to']), 'b\uFFFD\uFFFDz')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_print_8bit_headers(self):
        msg = email.message_from_bytes(self.headertest_msg)
        self.assertEqual(str(msg),
                         textwrap.dedent("""\
                            From: {}
                            To: {}
                            Subject: {}
                            From: {}

                            Yes, they are flying.
                            """).format(*[expected[1] for (_, expected) in
                                        self.headertest_headers]))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_get_content_type_with_8bit(self):
        msg = email.message_from_bytes(textwrap.dedent("""\
            Content-Type: text/pl\xA7in; charset=utf-8
            """).encode('latin-1'))
        self.assertEqual(msg.get_content_type(), "text/pl\uFFFDin")
        self.assertEqual(msg.get_content_maintype(), "text")
        self.assertEqual(msg.get_content_subtype(), "pl\uFFFDin")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_get_params_with_8bit(self):
        msg = email.message_from_bytes(
            'X-Header: foo=\xa7ne; b\xa7r=two; baz=three\n'.encode('latin-1'))
        self.assertEqual(msg.get_params(header='x-header'),
           [('foo', '\uFFFDne'), ('b\uFFFDr', 'two'), ('baz', 'three')])
        self.assertEqual(msg.get_param('Foo', header='x-header'), '\uFFFdne')
        # XXX: someday you might be able to get 'b\xa7r', for now you can't.
        self.assertEqual(msg.get_param('b\xa7r', header='x-header'), None)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_get_rfc2231_params_with_8bit(self):
        msg = email.message_from_bytes(textwrap.dedent("""\
            Content-Type: text/plain; charset=us-ascii;
             title*=us-ascii'en'This%20is%20not%20f\xa7n"""
             ).encode('latin-1'))
        self.assertEqual(msg.get_param('title'),
            ('us-ascii', 'en', 'This is not f\uFFFDn'))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_set_rfc2231_params_with_8bit(self):
        msg = email.message_from_bytes(textwrap.dedent("""\
            Content-Type: text/plain; charset=us-ascii;
             title*=us-ascii'en'This%20is%20not%20f\xa7n"""
             ).encode('latin-1'))
        msg.set_param('title', 'test')
        self.assertEqual(msg.get_param('title'), 'test')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_del_rfc2231_params_with_8bit(self):
        msg = email.message_from_bytes(textwrap.dedent("""\
            Content-Type: text/plain; charset=us-ascii;
             title*=us-ascii'en'This%20is%20not%20f\xa7n"""
             ).encode('latin-1'))
        msg.del_param('title')
        self.assertEqual(msg.get_param('title'), None)
        self.assertEqual(msg.get_content_maintype(), 'text')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_bytes_generator(self):
        msg = email.message_from_bytes(self.non_latin_bin_msg)
        out = BytesIO()
        email.generator.BytesGenerator(out).flatten(msg)
        self.assertEqual(out.getvalue(), self.non_latin_bin_msg)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_generator_handles_8bit(self):
        msg = email.message_from_bytes(self.non_latin_bin_msg)
        out = StringIO()
        email.generator.Generator(out).flatten(msg)
        self.assertEqual(out.getvalue(), self.non_latin_bin_msg_as7bit_wrapped)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_bytes_generator_with_unix_from(self):
        # The unixfrom contains a current date, so we can't check it
        # literally.  Just make sure the first word is 'From' and the
        # rest of the message matches the input.
        msg = email.message_from_bytes(self.non_latin_bin_msg)
        out = BytesIO()
        email.generator.BytesGenerator(out).flatten(msg, unixfrom=True)
        lines = out.getvalue().split(b'\n')
        self.assertEqual(lines[0].split()[0], b'From')
        self.assertEqual(b'\n'.join(lines[1:]), self.non_latin_bin_msg)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_string_generator_reencodes_to_quopri_when_appropriate(self):
        m = email.message_from_bytes(self.latin_bin_msg)
        self.assertEqual(str(m), self.latin_bin_msg_as7bit)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_decoded_generator_emits_unicode_body(self):
        m = email.message_from_bytes(self.latin_bin_msg)
        out = StringIO()
        email.generator.DecodedGenerator(out).flatten(m)
        #DecodedHeader output contains an extra blank line compared
        #to the input message.  RDM: not sure if this is a bug or not,
        #but it is not specific to the 8bit->7bit conversion.
        self.assertEqual(out.getvalue(),
            self.latin_bin_msg.decode('latin-1')+'\n')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _msgobj(self, filename):
        """Load *filename* and return ``(message, data)``.

        The file is read in binary mode, every match of
        ``self.normalize_linesep_regex`` is replaced by ``self.blinesep``,
        and the normalized bytes are parsed.  The returned *data* is the
        normalized byte string the message was parsed from.
        """
        with openfile(filename, 'rb') as fp:
            raw = fp.read()
        normalized = self.normalize_linesep_regex.sub(self.blinesep, raw)
        return email.message_from_bytes(normalized), normalized
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        The bytes stored under *key* are parsed into an
        email.message.Message, whose as_string() yields a 7bit clean string
        representation of the message.
        """
        raw = self.get_bytes(key)
        message = email.message_from_bytes(raw)
        return message.as_string()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return a string representation or raise a KeyError.

        *from_* is passed to as_string() as ``unixfrom``, i.e. it controls
        whether the leading "From " envelope line is included.
        """
        raw = self.get_bytes(key)
        message = email.message_from_bytes(raw)
        return message.as_string(unixfrom=from_)
项目:imap_tools    作者:ikvk    | 项目源码 | 文件源码
def __init__(self, message_id: str, fetch_data):
        """Build the message wrapper from one fetch result.

        *fetch_data* is split by ``_clean_message_data`` into the raw
        message bytes, the uid data and the flag data.  The raw bytes are
        parsed into ``self.obj``; the other two parts are kept as received.
        """
        cleaned = self._clean_message_data(fetch_data)
        raw_message, uid_part, flag_part = cleaned
        self.id = message_id
        self.obj = email.message_from_bytes(raw_message)
        self._uid_data = uid_part
        self._flag_data = flag_part
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Attach an incoming reply to the thread of the message it answers.

        The single recipient address is resolved to the original message
        via ``_find_msg``; the thread's messages for that receiver are
        marked as read and the extracted text is added to the thread as a
        new message.

        Returns '250 Ok' on success, or the string form of the SMTPError
        when the mail is rejected (e.g. more than one recipient).  Any other
        exception is logged, the raw data is stored in
        ``undeliverable_maildir`` when ``store_exceptions`` is set, and the
        exception is re-raised.
        """
        try:
            if len(rcpttos) > 1:
                raise SMTPError(554, 'Too many recipients')

            msg = email.message_from_bytes(data)
            text = self._get_text(msg)
            orig_msg = self._find_msg(rcpttos[0])
            thread = orig_msg.thread
            for_receiver = thread.messages.filter(receiver=orig_msg.receiver)
            for_receiver.update(unread=False)
            # XXX: rawmsg can include multiple content-charsets and should
            # be a binaryfield; as a workaround we convert to base64
            encoded_raw = base64.b64encode(data)
            thread_msg = thread.add_message(
                orig_msg.receiver, text,
                rawmsg=encoded_raw,
                rawmsg_msgid=msg['Message-ID'])
            log_line = 'Accepted email from {0} via {1} to {2} id {3} thread {4} orig_msg {5} message {6}'.format(
                mailfrom, orig_msg.receiver.email, orig_msg.sender.email,
                msg['Message-ID'], thread.pk, orig_msg.pk, thread_msg.pk)
            logger.info(log_line)

        except SMTPError as e:
            logger.info('Rejected email: {0}'.format(e))
            return str(e)

        except Exception as e:
            logger.error('email raised exception: {0}'.format(e))
            if self.store_exceptions:
                self.undeliverable_maildir.add(data)
            raise

        else:
            return '250 Ok'
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def apply(self, envelope):
        """Hand a return-path envelope to the first handler accepting it.

        The first recipient must belong to ``settings.RETURNPATH_DOMAIN``
        and start with one of the allowed prefixes.  The flattened envelope
        is parsed into a message and offered to DSNHandler, ARFHandler and
        UnsubscribeHandler in that order; the first one whose
        ``can_handle`` accepts it is instantiated and applied.

        Raises QueueError when there is no recipient, the domain or prefix
        is not accepted, or no handler accepts the message.
        """
        if not envelope.recipients:
            raise QueueError('Missing recipient')
        recipient = envelope.recipients[0]

        if extract_domain(recipient) != settings.RETURNPATH_DOMAIN:
            raise QueueError('Domain not valid')

        # Element 0 and element 1 of the flattened envelope, joined, form
        # the raw message that gets parsed.
        raw = envelope.flatten()[0] + envelope.flatten()[1]
        message = email.message_from_bytes(raw)

        # Check recipient prefix
        if not recipient.startswith(('return-', 'unsubscribe-', 'abuse')):
            raise QueueError('Prefix not allowed')

        # Find handler
        for candidate in (DSNHandler, ARFHandler, UnsubscribeHandler):
            if candidate.can_handle(message):
                log.info('Handler {} can handle'.format(candidate.__name__))
                chosen = candidate(envelope, message)
                break
        else:
            raise QueueError('No handler found')
        chosen.apply()
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def check(self, msg):
        """
        Submit a message to spamd and parse the processed message it returns.

        A ``PROCESS SPAMC/1.4`` request carrying the encoded message is sent
        to ``self.host``/``self.port``.  The first response line must be
        three whitespace-separated fields ending in ``EX_OK``; the next
        three lines are read and discarded, and the remainder is parsed as
        an email message.

        @param msg : a string containing the message to check
        @return the object built by ``SpamResult.from_headers`` from the
            message returned by spamd
        @raise SpamCheckerError : on any socket error (including the
            2 second timeout), on an empty response, on a malformed first
            line, or on a status other than ``EX_OK``
        """
        msg_bytes = msg.encode()
        request_head = (
            b'PROCESS SPAMC/1.4\r\n'
            + ("Content-length: %s" % len(msg_bytes)).encode()
            + b"\r\n\r\n")
        try:
            # The context managers close the socket and its file wrapper on
            # every path (success, timeout, refused connection), so no file
            # descriptor is leaked.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(2)
                s.connect((self.host, self.port))
                s.sendall(request_head)
                s.sendall(msg_bytes)
                # Half-close the connection: nothing more will be sent.
                s.shutdown(socket.SHUT_WR)
                with s.makefile("rb") as socketfile:
                    line1_info = socketfile.readline()
                    # The three lines after the first one are not used.
                    socketfile.readline()
                    socketfile.readline()
                    socketfile.readline()
                    content = socketfile.read()

            if len(line1_info) == 0:
                raise SpamCheckerError("Spamd response is empty")

            answer = line1_info.strip().split()
            if len(answer) != 3:
                raise SpamCheckerError("Invalid Spamd answer: {}".format(
                    answer))
            (version, number, status) = answer
            if status.decode() != 'EX_OK':
                raise SpamCheckerError('Invalid Spamd status: {}'.format(
                    status))

        except (socket.error) as e:
            raise SpamCheckerError(e) from None

        else:
            # parse mail
            return SpamResult.from_headers(email.message_from_bytes(content))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_cte_type_7bit_handles_unknown_8bit(self):
        source = ("Subject: Maintenant je vous présente mon "
                 "collègue\n\n").encode('utf-8')
        expected = ('Subject: Maintenant je vous =?unknown-8bit?q?'
                    'pr=C3=A9sente_mon_coll=C3=A8gue?=\n\n').encode('ascii')
        msg = message_from_bytes(source)
        s = io.BytesIO()
        g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit'))
        g.flatten(msg)
        self.assertEqual(s.getvalue(), expected)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_binary_body_with_encode_base64(self):
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_base64)
        self.assertEqual(msg.get_payload(), '+vv8/f7/\n')
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        s = BytesIO()
        g = BytesGenerator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_bytes(wireform)
        self.assertEqual(msg.get_payload(), '+vv8/f7/\n')
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)


# Test the basic MIMEText class
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        self.assertEqual(sorted(module.__all__), [
            'base64mime', 'charset', 'encoders', 'errors', 'feedparser',
            'generator', 'header', 'iterators', 'message',
            'message_from_binary_file', 'message_from_bytes',
            'message_from_file', 'message_from_string', 'mime', 'parser',
            'quoprimime', 'utils',
            ])