Python email.parser 模块,BytesParser() 实例源码

我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 email.parser.BytesParser()。

项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def split_email(self, raw_email):
        """Parse raw message bytes and separate attachments from other parts.

        ``raw_email`` is parsed with ``BytesParser``.  For a multipart
        message every direct sub-part that reports a filename becomes a
        ``File(payload, filename)`` entry in the attachment list; all other
        sub-parts are kept as they are.  For a non-multipart message the
        payload itself is the single kept item.

        Returns a ``(to_keep, attachments, parsed_email)`` tuple.
        """
        parsed_email = BytesParser().parsebytes(raw_email)
        to_keep = []
        attachments = []

        if not parsed_email.is_multipart():
            to_keep.append(parsed_email.get_payload())
            return to_keep, attachments, parsed_email

        for part in parsed_email.get_payload():
            raw_name = part.get_filename()
            if not raw_name:
                to_keep.append(part)
                continue

            # decode_header() may yield several (text, charset) fragments,
            # e.g. for a name that mixes plain and RFC 2047 encoded text.
            # Every fragment has to be decoded and joined; using only the
            # first one truncates the name and can leave it as bytes.
            pieces = []
            for text, charset in decode_header(raw_name):
                if isinstance(text, bytes):
                    try:
                        text = text.decode(charset or 'ascii', errors='replace')
                    except LookupError:
                        # Charset label unknown to Python: keep what we can.
                        text = text.decode('ascii', errors='replace')
                pieces.append(text)
            filename = ''.join(pieces)

            attachments.append(File(part.get_payload(decode=True), filename))

        return to_keep, attachments, parsed_email
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def message_from_bytes(s, *args, **kws):
    """Build a message object tree from the bytes object *s*.

    All extra positional and keyword arguments are forwarded unchanged to
    the ``BytesParser`` constructor.
    """
    from email.parser import BytesParser

    bytes_parser = BytesParser(*args, **kws)
    return bytes_parser.parsebytes(s)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def message_from_binary_file(fp, *args, **kws):
    """Build a message object tree from the binary file object *fp*.

    All extra positional and keyword arguments are forwarded unchanged to
    the ``BytesParser`` constructor.
    """
    from email.parser import BytesParser

    bytes_parser = BytesParser(*args, **kws)
    return bytes_parser.parse(fp)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def message_from_bytes(s, *args, **kws):
    """Return the message object model parsed from the bytes in *s*.

    Remaining positional and keyword arguments go straight to the
    ``BytesParser`` constructor.
    """
    from email.parser import BytesParser

    configured_parser = BytesParser(*args, **kws)
    parsed = configured_parser.parsebytes(s)
    return parsed
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def message_from_binary_file(fp, *args, **kws):
    """Return the message object model parsed from binary file *fp*.

    Remaining positional and keyword arguments go straight to the
    ``BytesParser`` constructor.
    """
    from email.parser import BytesParser

    configured_parser = BytesParser(*args, **kws)
    parsed = configured_parser.parse(fp)
    return parsed
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def message_from_bytes(s, *args, **kws):
    """Parse the bytes object *s* and return the resulting message object.

    Any further arguments (positional or keyword) are passed through to
    the ``BytesParser`` constructor as given.
    """
    from email.parser import BytesParser

    reader = BytesParser(*args, **kws)
    return reader.parsebytes(s)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def message_from_binary_file(fp, *args, **kws):
    """Parse the binary file object *fp* and return the resulting message.

    Any further arguments (positional or keyword) are passed through to
    the ``BytesParser`` constructor as given.
    """
    from email.parser import BytesParser

    reader = BytesParser(*args, **kws)
    return reader.parse(fp)
项目:gulper    作者:QuantifiedSelfless    | 项目源码 | 文件源码
def get_content(self, raw):
        """Decode a URL-safe base64 message and return its headers and body.

        ``raw`` is base64-decoded with ``base64.urlsafe_b64decode`` and the
        result is parsed with ``EmailParser`` under ``policy.default``.

        Returns a dict built from the message headers plus a ``'body'`` key
        holding the decoded ``text/plain`` body, or ``None`` when the
        message has no plain-text body.
        """
        data = base64.urlsafe_b64decode(raw)
        message = EmailParser(policy=policy.default).parsebytes(data)
        plain = message.get_body(preferencelist=('plain',))

        body = None
        # Compare against None explicitly: a message object's truth value is
        # its header count, so a header-less text part would test as false.
        if plain is not None:
            try:
                # get_content() undoes the transfer encoding (base64 /
                # quoted-printable) and applies the declared charset;
                # get_payload() would hand back the still-encoded text.
                body = plain.get_content()
            except LookupError:
                # Unknown charset or no content handler: fall back to the
                # raw payload rather than failing the whole message.
                body = plain.get_payload()

        email_dict = dict(message)
        email_dict['body'] = body
        return email_dict
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def _format_denied_recipients(self, original_mail: bytes, recipients: Sequence[str]) -> bytes:
        """Build a bounce message listing the recipients that were denied.

        Only the headers of ``original_mail`` are parsed.  Subject, To and
        From are replaced, the body is set to ``denied_recipients_template``
        formatted with the recipient list, and the result is serialised with
        ``policy.SMTP``.

        :param original_mail: raw bytes of the message that was rejected
        :param recipients: addresses to list in the bounce body
        :return: the bounce message as bytes
        """
        parser = BytesParser()
        # TODO: fix type annotation when typeshed has better stubs
        msg = None  # type: Message
        msg = parser.parsebytes(original_mail, True)  # type: ignore

        # Read the original values before touching the headers.
        original_subject = msg["Subject"]
        original_sender = msg["From"]

        # Assigning msg[name] appends a header instead of replacing it, so
        # the old ones must be deleted first or the bounce would carry
        # duplicate Subject/To/From headers with the originals listed first.
        # Deleting a header that is absent is a no-op.
        del msg["Subject"]
        del msg["To"]
        del msg["From"]

        msg["Subject"] = "[mailforwarder error] Re: %s" % (
            original_subject if original_subject is not None else "",)
        if original_sender is not None:
            # A None header value cannot be serialised, so To is only set
            # when the original mail actually had a From header.
            msg["To"] = original_sender
        msg["From"] = "mailforwarder bounce <>"

        # Same text as prepending one "  * addr\n" line per recipient: the
        # list comes out in reverse order with a trailing newline.
        lines = ["  * %s\n" % rcpt for rcpt in recipients]
        rcptlist = "".join(reversed(lines))

        txt = denied_recipients_template.format(rcptlist=rcptlist)
        # NOTE(review): Content-Type / Content-Transfer-Encoding headers of
        # the original mail are kept as parsed; confirm that they suit the
        # replacement plain-text body.
        msg.set_payload(txt, charset='utf-8')
        return msg.as_bytes(policy=policy.SMTP)
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def add_received_header(self, peer: Tuple[str, int], msg: bytes, channel: PatchedSMTPChannel) -> bytes:
        """Parse *msg*, add a ``Received`` trace header and re-serialise it.

        The header value is built from the channel's ``seen_greeting``, the
        first two items of ``peer``, this object's ``server_name``,
        ``daemon_name`` and ``_localaddr``, and the current time from
        ``timezone.now()``.  It is handed to ``prepend_header`` of the
        parsed ``SaneMessage`` and the message is returned as bytes using
        ``policy.SMTP``.
        """
        # TODO: remove type annotation when issue is fixed
        stamped = None  # type: SaneMessage
        stamped = BytesParser(_class=SaneMessage).parsebytes(msg)  # type: ignore

        trace_fields = (
            channel.seen_greeting,
            peer[0],
            peer[1],
            self.server_name,
            self.daemon_name,
            self._localaddr[0],
            self._localaddr[1],
            timezone.now().strftime("%a, %d %b %Y %H:%M:%S %z (%Z)"),
        )
        trace_value = "from %s (%s:%s)\r\n\tby %s (%s [%s:%s]) with SMTP;\r\n\t%s" % trace_fields

        stamped.prepend_header("Received", trace_value)
        return stamped.as_bytes(policy=policy.SMTP)