Python email 模块:email.policy 实例源码

我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 email.policy 模块(以及其中的策略对象,例如 email.policy.default、email.policy.strict 和 EmailPolicy)。

项目:COWNotifier    作者:kadircet    | 项目源码 | 文件源码
def updateTopic(self, topic):
        """Collect and render the articles of ``topic`` that are not yet seen.

        Calls ``self.connect()`` first when more than 60 seconds have passed
        since ``self.time`` (presumably the time of the last connect --
        confirm against ``connect``). It then asks ``self.conn`` for the
        group's article range and renders every article numbered above
        ``self.groups[topic]`` and not below the group's first article.
        Finally ``self.groups[topic]`` is advanced to the group's last
        article number.

        NOTE(review): ``self.conn`` looks like an ``nntplib.NNTP`` connection
        (``group()`` / ``article()`` call shapes match) -- confirm.

        Args:
            topic: Group name; must already be a key of ``self.groups``.

        Returns:
            A list of ``[is_plus_one, text]`` pairs, one per article that was
            fetched and parsed without error. ``text`` is the HTML-escaped
            header summary wrapped in ``<code>...</code>`` followed by the
            HTML-escaped ``text/plain`` parts. The list is empty when the
            object is not initialized, when the group query fails, or when
            there is nothing new.
        """
        if not self.initialized:
            return []
        if time.time() - self.time > 60.:
            self.connect()
        try:
            (_, _, first, last, _) = self.conn.group(topic)
        except Exception as e:
            print(e, datetime.datetime.now())
            traceback.print_exc()
            self.connect()
            # Every exit hands back a list so callers can iterate the result
            # unconditionally (the guard above already returns []).
            return []
        start = max(self.groups[topic] + 1, first)
        res = []
        headers = ("From", "Newsgroups", "Subject", "Date")
        # Parse "From" as an unstructured header so it is kept as plain text
        # instead of being parsed as an address list.
        registry = HeaderRegistry()
        registry.map_to_type('From', UnstructuredHeader)
        policy = EmailPolicy(header_factory=registry)
        while start <= last:
            # Best effort per article: a failure is logged and the article is
            # skipped, the remaining ones are still processed.
            try:
                artic = self.conn.article(start)[1]
                raw_msg = b'\r\n'.join(artic.lines)
                mime_msg = email.message_from_bytes(raw_msg, policy=policy)
                header_lines = [
                    "%s: %s\r\n" % (h, html.escape(mime_msg[h]))
                    for h in headers
                ]
                content = "".join(
                    html.escape(part.get_content())
                    for part in mime_msg.walk()
                    if part.get_content_type() == 'text/plain'
                )
                is_plus_one = isPlusOne(content)
                header_lines.append("%s: %s\r\n" % ("is_plus_one", is_plus_one))
                hdr = "<code>" + "".join(header_lines) + "</code>\r\n"
                res.append([is_plus_one, hdr + content])
            except Exception as e:
                print(e, datetime.datetime.now())
                traceback.print_exc()
            start += 1
        self.groups[topic] = last
        # The collected entries were previously built and then dropped.
        return res
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_as_string_policy(self):
        """``as_string(policy=...)`` matches a ``Generator`` run with that policy."""
        message = self._msgobj('msg_01.txt')
        # Derive a policy that differs from the message's own only in linesep.
        crlf_policy = message.policy.clone(linesep='\r\n')
        rendered = message.as_string(policy=crlf_policy)
        # Reference output: flatten explicitly with the same policy.
        buffer = StringIO()
        Generator(buffer, policy=crlf_policy).flatten(message)
        expected = buffer.getvalue()
        self.assertEqual(rendered, expected)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_as_bytes_policy(self):
        """``as_bytes(policy=...)`` matches a ``BytesGenerator`` run with that policy."""
        message = self._msgobj('msg_01.txt')
        # Derive a policy that differs from the message's own only in linesep.
        crlf_policy = message.policy.clone(linesep='\r\n')
        rendered = message.as_bytes(policy=crlf_policy)
        # Reference output: flatten explicitly with the same policy.
        buffer = BytesIO()
        BytesGenerator(buffer, policy=crlf_policy).flatten(message)
        expected = buffer.getvalue()
        self.assertEqual(rendered, expected)

    # test_headerregistry.TestContentTypeHeader.bad_params
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_bytes_parser_on_exception_does_not_close_file(self):
        """A ``BytesParser`` that raises under the strict policy leaves the file open."""
        with openfile('msg_15.txt', 'rb') as fp:
            strict_parser = email.parser.BytesParser(policy=email.policy.strict)
            # The strict policy turns the missing-start-boundary defect into
            # an exception raised by parse().
            with self.assertRaises(email.errors.StartBoundaryNotFoundDefect):
                strict_parser.parse(fp)
            # The parser must not have closed the caller's file object.
            self.assertFalse(fp.closed)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_parser_on_exception_does_not_close_file(self):
        """A ``Parser`` that raises under the strict policy leaves the file open."""
        with openfile('msg_15.txt', 'r') as fp:
            strict_parser = email.parser.Parser(policy=email.policy.strict)
            # The strict policy turns the missing-start-boundary defect into
            # an exception raised by parse().
            with self.assertRaises(email.errors.StartBoundaryNotFoundDefect):
                strict_parser.parse(fp)
            # The parser must not have closed the caller's file object.
            self.assertFalse(fp.closed)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_as_string_policy(self):
        """Check that ``as_string`` honours an explicitly passed policy.

        The string returned by ``as_string(policy=p)`` is compared with the
        text produced by flattening the same message through a ``Generator``
        constructed with ``p``.
        """
        parsed = self._msgobj('msg_01.txt')
        policy_with_crlf = parsed.policy.clone(linesep='\r\n')
        via_method = parsed.as_string(policy=policy_with_crlf)
        sink = StringIO()
        generator = Generator(sink, policy=policy_with_crlf)
        generator.flatten(parsed)
        via_generator = sink.getvalue()
        self.assertEqual(via_method, via_generator)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_as_bytes_policy(self):
        """Check that ``as_bytes`` honours an explicitly passed policy.

        The bytes returned by ``as_bytes(policy=p)`` are compared with the
        output of flattening the same message through a ``BytesGenerator``
        constructed with ``p``.
        """
        parsed = self._msgobj('msg_01.txt')
        policy_with_crlf = parsed.policy.clone(linesep='\r\n')
        via_method = parsed.as_bytes(policy=policy_with_crlf)
        sink = BytesIO()
        generator = BytesGenerator(sink, policy=policy_with_crlf)
        generator.flatten(parsed)
        via_generator = sink.getvalue()
        self.assertEqual(via_method, via_generator)

    # test_headerregistry.TestContentTypeHeader.bad_params
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_bytes_parser_on_exception_does_not_close_file(self):
        """Parsing failure in ``BytesParser`` must not close the input file.

        ``msg_15.txt`` is parsed under ``email.policy.strict`` and is expected
        to raise ``StartBoundaryNotFoundDefect``; afterwards the file object
        must still be open.
        """
        expected_error = email.errors.StartBoundaryNotFoundDefect
        with openfile('msg_15.txt', 'rb') as fp:
            parse = email.parser.BytesParser(policy=email.policy.strict).parse
            self.assertRaises(expected_error, parse, fp)
            self.assertFalse(fp.closed)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_parser_on_exception_does_not_close_file(self):
        """Parsing failure in ``Parser`` must not close the input file.

        ``msg_15.txt`` is parsed under ``email.policy.strict`` and is expected
        to raise ``StartBoundaryNotFoundDefect``; afterwards the file object
        must still be open.
        """
        expected_error = email.errors.StartBoundaryNotFoundDefect
        with openfile('msg_15.txt', 'r') as fp:
            parse = email.parser.Parser(policy=email.policy.strict).parse
            self.assertRaises(expected_error, parse, fp)
            self.assertFalse(fp.closed)
项目:acm-website    作者:ColoradoSchoolOfMines    | 项目源码 | 文件源码
def parse_message(msgstring):
    """Parse ``msgstring`` into a message object using ``email.policy.default``."""
    parsed = email.message_from_string(
        msgstring,
        policy=email.policy.default,
    )
    return parsed