Python email.message 模块,EmailMessage() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用email.message.EmailMessage()

项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def add(self, location: str, content_type: str, payload: str, encoding: str = 'quoted-printable') -> None:
        resource = EmailMessage()
        if content_type == 'text/html':
            resource.add_header('Content-Type', 'text/html', charset='utf-8')
        else:
            resource['Content-Type'] = content_type
        if encoding == 'quoted-printable':
            resource['Content-Transfer-Encoding'] = encoding
            resource.set_payload(quopri.encodestring(payload.encode()))
        elif encoding == 'base64':
            resource['Content-Transfer-Encoding'] = encoding
            resource.set_payload(base64.b64encode(payload))
        elif encoding == 'base64-encoded':  # Already base64 encoded
            resource['Content-Transfer-Encoding'] = 'base64'
            resource.set_payload(payload)
        else:
            raise ValueError('invalid encoding')
        resource['Content-Location'] = location
        self._msg.attach(resource)
项目:DualisWatcher    作者:LucaVazz    | 项目源码 | 文件源码
def send(self, target: str, subject: str, html_content_with_cids: str, inline_png_cids_filenames: {str : str}):
        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = self.sender
        msg['To'] = target

        msg.set_content('')

        msg.add_alternative(
            html_content_with_cids, subtype='html'
        )

        for png_cid in inline_png_cids_filenames:
            full_path_to_png = os.path.abspath(os.path.join(
                os.path.dirname(__file__), inline_png_cids_filenames[png_cid]
            ))
            with open(full_path_to_png, 'rb') as png_file:
                file_contents = png_file.read()
                msg.get_payload()[1].add_related(file_contents, 'image', 'png', cid=png_cid)

        with smtplib.SMTP(self.smtp_server_host, self.smtp_server_port) as smtp_connection:
            smtp_connection.starttls()
            smtp_connection.login(self.username, self.password)
            smtp_connection.send_message(msg)
项目:aioweb    作者:kreopt    | 项目源码 | 文件源码
def send_mail(app,
                    sender='%s service <%s>' % (settings.BRAND, settings.SERVER_EMAIL),
                    recipients=tuple(),
                    subject='',
                    body=''):
    if not len(recipients):
        return
    async with app.smtp as conn:

        msg = EmailMessage()

        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)

        msg.set_content(body)

        return await conn.send_message(msg, sender, recipients, timeout=5)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, _factory=None, *, policy=compat32):
        """_factory is called with no arguments to create a new message obj

        The policy keyword specifies a policy object that controls a number of
        aspects of the parser's operation.  The default policy maintains
        backward compatibility.

        """
        self.policy = policy
        self._factory_kwds = lambda: {'policy': self.policy}
        if _factory is None:
            # What this should be:
            #self._factory = policy.default_message_factory
            # but, because we are post 3.4 feature freeze, fix with temp hack:
            if self.policy is compat32:
                self._factory = message.Message
            else:
                self._factory = message.EmailMessage
        else:
            self._factory = _factory
            try:
                _factory(policy=self.policy)
            except TypeError:
                # Assume this is an old-style factory
                self._factory_kwds = lambda: {}
        self._input = BufferedSubFile()
        self._msgstack = []
        self._parse = self._parsegen().__next__
        self._cur = None
        self._last = None
        self._headersonly = False

    # Non-public interface for supporting Parser's headersonly flag
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def __init__(self):
        self._msg = EmailMessage()
        self._msg['MIME-Version'] = '1.0'
        self._msg.add_header('Content-Type', 'multipart/related', type='text/html')
项目:netsocadmin2    作者:UCCNetworkingSociety    | 项目源码 | 文件源码
def send_help_email(username:str, user_email:str, subject:str, message:str) -> bool:
    """
    Sends an email to the netsoc email address containing the help data, 
    CC'ing all the SysAdmins and the user requesting help.
    This enables us to reply to the email directly instead of copypasting the
    from address and disconnecting history.

    :param username the user requesting help
    :param user_email the user's email address
    :param subject the subject of the user's help requests
    :param message the user's actual message
    """
    message_body = \
    """
From: %s\n
Email: %s

%s

PS: Please "Reply All" to the emails so that you get a quicker response."""%(
        username, user_email, message)

    msg = EmailMessage()
    msg.set_content(message_body)
    msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS
    msg["To"] = p.NETSOC_EMAIL_ADDRESS
    msg["Subject"] = "[Netsoc Help] " + subject
    msg["Cc"] = tuple(p.SYSADMIN_EMAILS + [user_email])
    try:
        with smtplib.SMTP("smtp.sendgrid.net", 587) as s:
            s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD)
            s.send_message(msg)
    except:
        return False
    return True
项目:netsocadmin2    作者:UCCNetworkingSociety    | 项目源码 | 文件源码
def send_sudo_request_email(username:str, user_email:str):
    """
    Sends an email notifying SysAdmins that a user has requested an account on feynman.

    :param username the server username of the user who made the request.
    :param user_email the email address of that user to contact them for vetting.
    """
    message_body = \
    """
Hi {username},

Thank you for making a request for an account with sudo privileges on feynman.netsoc.co.

We will be in touch shortly. 

Best,

The UCC Netsoc SysAdmin Team.

PS: Please "Reply All" to the emails so that you get a quicker response.

""".format(username=username)

    msg = EmailMessage()
    msg.set_content(message_body)
    msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS
    msg["To"] = p.NETSOC_EMAIL_ADDRESS
    msg["Subject"] = "[Netsoc Help] Sudo request on Feynman for {user}".format(
        user=username)
    msg["Cc"] = tuple(p.SYSADMIN_EMAILS + [user_email])

    with smtplib.SMTP("smtp.sendgrid.net", 587) as s:
        s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD)
        s.send_message(msg)
项目:WebGames    作者:Julien00859    | 项目源码 | 文件源码
def signup(req):
    if any(map(lambda key: key not in req.json, ["name", "email", "password"])):
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    if not await User.is_free(req.json["name"], req.json["email"]):
        logger.debug(f"Request is {req.json} but name or email is already taken")
        raise InvalidUsage("Username or email already taken")

    guest = Guest(req.json["name"], req.json["email"], User.hashpwd(req.json["password"]))

    chlg = await challenges.create_for(guest)

    logger.info(f"Guest signed up with name: {guest.name} and email: {guest.email}. Challenge generated: {chlg}")
    with open("mails" + sep + "challenge.txt") as mailtext:
        mail = EmailMessage()
        mail.set_content(mailtext.read().format(domain=req.app.config.domain,
                                                scheme="https" if req.app.config.schemessl else "http",
                                                challenge=chlg))
        mail["Subject"] = "WebGames Registration Challenge"
        mail["From"] = req.app.config.smtpuser
        mail["To"] = guest.email

        with SMTP(req.app.config.smtphost, req.app.config.smtpport) as smtp:
            if req.app.config.smtpssl:
                smtp.ehlo()
                smtp.starttls()
                smtp.ehlo()
            else:
                smtp.helo()
            smtp.login(req.app.config.smtpuser, req.app.config.smtppwd)
            smtp.send_message(mail)

    return text(f"Challenge sent to {guest.email}")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def emit(self, record):
        """
        Emit a record.

        Format the record and send it to the specified addressees.
        """
        try:
            import smtplib
            from email.message import EmailMessage
            import email.utils

            port = self.mailport
            if not port:
                port = smtplib.SMTP_PORT
            smtp = smtplib.SMTP(self.mailhost, port, timeout=self.timeout)
            msg = EmailMessage()
            msg['From'] = self.fromaddr
            msg['To'] = ','.join(self.toaddrs)
            msg['Subject'] = self.getSubject(record)
            msg['Date'] = email.utils.localtime()
            msg.set_content(self.format(record))
            if self.username:
                if self.secure is not None:
                    smtp.ehlo()
                    smtp.starttls(*self.secure)
                    smtp.ehlo()
                smtp.login(self.username, self.password)
            smtp.send_message(msg)
            smtp.quit()
        except Exception:
            self.handleError(record)
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def send_email(self: celery.Task, to_name: str, to_addr: str, subject: str, text: str, html: str):
    """Send an email to a single address."""
    # WARNING: when changing the signature of this function, also change the
    # self.retry() call below.
    cfg = current_app.config

    # Construct the message
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = Address(cfg['MAIL_DEFAULT_FROM_NAME'], addr_spec=cfg['MAIL_DEFAULT_FROM_ADDR'])
    msg['To'] = (Address(to_name, addr_spec=to_addr),)
    msg.set_content(text)
    msg.add_alternative(html, subtype='html')

    # Refuse to send mail when we're testing.
    if cfg['TESTING']:
        log.warning('not sending mail to %s <%s> because we are TESTING', to_name, to_addr)
        return
    log.info('sending email to %s <%s>', to_name, to_addr)

    # Send the message via local SMTP server.
    try:
        with smtplib.SMTP(cfg['SMTP_HOST'], cfg['SMTP_PORT'], timeout=cfg['SMTP_TIMEOUT']) as smtp:
            if cfg.get('SMTP_USERNAME') and cfg.get('SMTP_PASSWORD'):
                smtp.login(cfg['SMTP_USERNAME'], cfg['SMTP_PASSWORD'])
            smtp.send_message(msg)
    except (IOError, OSError) as ex:
        log.exception('error sending email to %s <%s>, will retry later: %s',
                      to_name, to_addr, ex)
        self.retry((to_name, to_addr, subject, text, html), countdown=cfg['MAIL_RETRY'])
    else:
        log.info('mail to %s <%s> successfully sent', to_name, to_addr)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def __init__(self, _factory=None, *, policy=compat32):
        """_factory is called with no arguments to create a new message obj

        The policy keyword specifies a policy object that controls a number of
        aspects of the parser's operation.  The default policy maintains
        backward compatibility.

        """
        self.policy = policy
        self._factory_kwds = lambda: {'policy': self.policy}
        if _factory is None:
            # What this should be:
            #self._factory = policy.default_message_factory
            # but, because we are post 3.4 feature freeze, fix with temp hack:
            if self.policy is compat32:
                self._factory = message.Message
            else:
                self._factory = message.EmailMessage
        else:
            self._factory = _factory
            try:
                _factory(policy=self.policy)
            except TypeError:
                # Assume this is an old-style factory
                self._factory_kwds = lambda: {}
        self._input = BufferedSubFile()
        self._msgstack = []
        self._parse = self._parsegen().__next__
        self._cur = None
        self._last = None
        self._headersonly = False

    # Non-public interface for supporting Parser's headersonly flag