Python imaplib 模块,IMAP4 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 imaplib.IMAP4。

项目:PySMS    作者:clayshieh    | 项目源码 | 文件源码
def init_server(self):
        """Set up the SMTP connection and, when configured, the IMAP one.

        SMTP is always initialised and logged in. IMAP is only initialised
        when ``self.imap_server`` is truthy; after login the mailbox named by
        ``self.imap_mailbox`` is selected.

        Raises:
            PySMSException: when smtplib raises ``SMTPException``, when the
                IMAP login or mailbox selection does not answer "OK", or
                when imaplib raises ``IMAP4.error``.
        """
        self.logger.info("Initializing SMTP/IMAP servers.")
        # PySMS at minimum uses smtp server
        try:
            if not self.ssl:
                # Plain connection, upgraded with STARTTLS.
                self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)
                self.smtp.starttls()
            else:
                self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
            self.smtp.login(self.address, self.password)
        except smtplib.SMTPException:
            raise PySMSException("Unable to start smtp server, please check credentials.")

        # Responding functionality is optional: without an IMAP server we are done.
        if not self.imap_server:
            return

        try:
            imap_factory = imaplib.IMAP4_SSL if self.ssl else imaplib.IMAP4
            self.imap = imap_factory(self.imap_server)

            status, _ = self.imap.login(self.address, self.password)
            if status != "OK":
                raise PySMSException("Unable to login to IMAP server with given credentials.")

            status, _ = self.imap.select(self.imap_mailbox)
            if status != "OK":
                raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox))
        except imaplib.IMAP4.error:
            raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
项目:PyperGrabber    作者:pykong    | 项目源码 | 文件源码
def __init__(self, hostname, port=None, ssl=True):
        """Create the IMAP transport for ``hostname``.

        Args:
            hostname: server to connect to.
            port: TCP port; when falsy, 993 is used with SSL and 143 without.
            ssl: use ``IMAP4_SSL`` when true, plain ``IMAP4`` otherwise.
        """
        self.hostname = hostname
        self.port = port

        if ssl:
            self.transport = IMAP4_SSL
            default_port = 993
        else:
            self.transport = IMAP4
            default_port = 143

        # Only fall back to the protocol default when no port was supplied.
        if not self.port:
            self.port = default_port

        self.server = self.transport(self.hostname, self.port)
        logger.debug("Created IMAP4 transport for {host}:{port}"
                     .format(host=self.hostname, port=self.port))
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def connect(self):
        """Connects to and authenticates with an IMAP4 mail server.

        ``IMAP4_SSL`` is used when both a key file and a certificate file are
        configured, or when ``self.ssl`` is set; plain ``IMAP4`` otherwise.
        After logging in, the default mailbox is selected.

        Returns:
            The logged-in imaplib connection.

        Raises:
            Any exception raised by imaplib or the socket layer; errors are
            propagated unchanged.
        """

        import imaplib

        if (self.keyfile and self.certfile) or self.ssl:
            M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile)
        else:
            M = imaplib.IMAP4(self.host, self.port)

        try:
            M.login(self.username, self.password)
            M.select()
        except Exception:
            # Close the socket before propagating so a failed login/select
            # does not leave the connection open.
            M.shutdown()
            raise
        return M
项目:imap_tools    作者:ikvk    | 项目源码 | 文件源码
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None):
        """
        :param host: host's name (default: localhost)
        :param port: port number (default: standard IMAP4 SSL port)
        :param ssl: use client class over SSL connection (IMAP4_SSL) if True, else use IMAP4
        :param keyfile: PEM formatted file that contains your private key (default: None)
        :param certfile: PEM formatted certificate chain file (default: None)
        :param ssl_context: SSLContext object that contains your certificate chain and private key (default: None)
        Note: if ssl_context is provided, then parameters keyfile or
              certfile should not be set otherwise ValueError is raised.
        """
        self._host = host
        self._port = port
        self._keyfile = keyfile
        self._certfile = certfile
        self._ssl_context = ssl_context
        if ssl:
            # Pass the TLS options by keyword and only when supplied: newer
            # Python versions no longer accept keyfile/certfile, so passing
            # them positionally (even as None) fails there.
            tls_options = {}
            if keyfile is not None:
                tls_options['keyfile'] = keyfile
            if certfile is not None:
                tls_options['certfile'] = certfile
            if ssl_context is not None:
                tls_options['ssl_context'] = ssl_context
            self.box = imaplib.IMAP4_SSL(
                host, port or imaplib.IMAP4_SSL_PORT, **tls_options)
        else:
            self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT)
        self._username = None
        self._password = None
        self._initial_folder = None
        self.folder = None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_aborted_authentication(self):
        """authenticate() must raise IMAP4.error when the exchange is aborted."""

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                # Send a continuation request and wait for the client's reply.
                self._send_textline('+')
                self.response = yield

                if self.response != b'*\r\n':
                    self._send_tagged(tag, 'OK', 'MYAUTH successful')
                else:
                    # A lone "*" is answered with a tagged NO.
                    self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                # The authobject returns None.
                client.authenticate('MYAUTH', lambda x: None)
项目:vieterp-mailbox    作者:vieterp    | 项目源码 | 文件源码
def connect(self):
        """Open a mail connection for the source referenced by ``self.source_id``.

        When ``self.source_id`` is set, an IMAP or POP3 connection (SSL or
        plain, per ``source_id.is_ssl``) is created with this record's user
        and password, the socket timeout is set to ``MAIL_TIMEOUT`` and the
        connection is returned. Otherwise the parent implementation is used.
        """
        if self.source_id and self.source_id.id:
            self.ensure_one()
            if self.source_id.type == 'imap':
                if self.source_id.is_ssl:
                    connection = IMAP4_SSL(self.source_id.server, int(self.source_id.port))
                else:
                    connection = IMAP4(self.source_id.server, int(self.source_id.port))
                connection.login(self.user, self.password)
            # NOTE(review): this branch tests self.type while the IMAP branch
            # tests self.source_id.type - confirm which one is intended.
            elif self.type == 'pop':
                if self.source_id.is_ssl:
                    connection = POP3_SSL(self.source_id.server, int(self.source_id.port))
                else:
                    connection = POP3(self.source_id.server, int(self.source_id.port))
                # TODO: use this to remove only unread messages
                # connection.user("recent:"+server.user)
                connection.user(self.user)
                connection.pass_(self.password)
            # Add timeout on socket
            # NOTE(review): if neither branch above matched, `connection` is
            # not bound when this line runs.
            connection.sock.settimeout(MAIL_TIMEOUT)
            return connection
        return super(vieterp_fetchmail_server, self).connect()
项目:SPF    作者:Exploit-install    | 项目源码 | 文件源码
def connect(self, mailserver, port="143"):
        """Open a plain IMAP4 connection to ``mailserver`` on ``port``.

        The connection is stored on ``self.srv``. Connecting is best-effort:
        on any failure (including a port that is not a valid integer)
        ``self.srv`` is set to None instead of raising.

        Args:
            mailserver: host name or address of the IMAP server.
            port: TCP port, as a string or an int (default "143").
        """
        self.mailserver = mailserver
        self.port = port
        try:
            # Honour the requested port; previously it was stored but the
            # connection always went to the imaplib default.
            self.srv = imaplib.IMAP4(self.mailserver, int(self.port))
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            self.srv = None
项目:SPF    作者:Exploit-install    | 项目源码 | 文件源码
def validate(self, user, password):
        """Try to log in on the existing connection with the given credentials.

        Returns None when there is no connection (``self.srv`` is falsy),
        False when the login raises ``ssl.SSLError`` or ``IMAP4.error``,
        and True otherwise. The credentials are stored on the instance
        before the attempt.
        """
        if not self.srv:
            return

        self.user, self.password = user, password
        try:
            self.srv.login(user, password)
        except (ssl.SSLError, imaplib.IMAP4.error):
            return False
        else:
            return True
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        """Python 2 thread body: take one word from getword() and try an IMAP login.

        Relies on names defined outside this block (``getword``, ``user``,
        ``ip``, ``login``, ``work``, ``IMAP4``). IMAP and socket errors are
        printed and otherwise ignored.
        """
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            M = imaplib.IMAP4(ip)
            # NOTE(review): `login` is called as a bare name and its result
            # rebinds M; its definition is not visible here - confirm.
            M = login(user[:-1], value)
            print "\t\nLogin successful:",user, value
            M.close()
            M.logout()
            work.join()
            sys.exit(2)
        # NOTE(review): IMAP4 is referenced unqualified here but as
        # imaplib.IMAP4 above - confirm both names are imported.
        except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        """Python 2 thread body: take one word from getword() and try an IMAP login.

        Relies on names defined outside this block (``getword``, ``user``,
        ``ipaddr``, ``login``, ``work``, ``IMAP4``). IMAP and socket errors
        are printed and otherwise ignored.
        """
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            # Connects to the first entry of ipaddr.
            M = imaplib.IMAP4(ipaddr[0])
            # NOTE(review): `login` is called as a bare name and its result
            # rebinds M; its definition is not visible here - confirm.
            M = login(user[:-1], value)
            print "\t\nLogin successful:",user, value
            M.close()
            M.logout()
            work.join()
            sys.exit(2)
        # NOTE(review): IMAP4 is referenced unqualified here but as
        # imaplib.IMAP4 above - confirm both names are imported.
        except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        """Python 2 thread body: take a (value, user) pair from getword() and try an IMAP login.

        The server is taken from ``sys.argv[1]``. Relies on names defined
        outside this block (``getword``, ``login``, ``work``, ``IMAP4``).
        IMAP and socket errors are printed and otherwise ignored.
        """
        value, user = getword()
        try:
            print "-"*12
            print "User:",user,"Password:",value
            M = imaplib.IMAP4(sys.argv[1])
            # NOTE(review): `login` is called as a bare name and its result
            # rebinds M; its definition is not visible here - confirm.
            M = login(user, value)
            print "\t\nLogin successful:",user, value 
            M.close()
            M.logout()
            work.join()
            sys.exit(2)
        # NOTE(review): IMAP4 is referenced unqualified here but as
        # imaplib.IMAP4 above - confirm both names are imported.
        except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def __init__(self, host, port, timeout=None):
        """Remember ``timeout`` on the instance, then run the IMAP4 initializer.

        :param host: passed through to ``imaplib.IMAP4.__init__``
        :param port: passed through to ``imaplib.IMAP4.__init__``
        :param timeout: stored as ``self._timeout`` (default: None)
        """
        # Stored before the base initializer runs; presumably an overridden
        # open() reads it while connecting - confirm.
        self._timeout = timeout

        imaplib.IMAP4.__init__(self, host, port)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def run_mailbox(self, min_delay=5.0, max_delay=60.0):
        """Generator that runs requested mailbox methods, reconnecting as needed.

        Each item obtained from ``idiokit.next()`` is unpacked as
        ``(event, name, args, keys)``; the mailbox method called ``name`` is
        run in a thread with ``args``/``keys`` and its outcome is reported
        through ``event``. The connection is created lazily and re-created
        after ``IMAP4.abort`` or ``socket.error``.

        :param min_delay: initial delay in seconds between connection attempts
        :param max_delay: upper bound in seconds for the retry delay
        """
        mailbox = None

        try:
            while True:
                item = yield idiokit.next()

                # Retry loop for this item: left via break once the item is
                # finished, failed, or found to be already resolved.
                while True:
                    delay = min(min_delay, max_delay)
                    while mailbox is None:
                        try:
                            mailbox = yield idiokit.thread(self.connect)
                        except (imaplib.IMAP4.abort, socket.error) as error:
                            self.log.error("Failed IMAP connection ({0})".format(utils.format_exception(error)))
                        else:
                            break

                        # Only reached after a failed attempt: wait, then
                        # double the delay up to max_delay.
                        self.log.info("Retrying connection in {0:.2f} seconds".format(delay))
                        yield idiokit.sleep(delay)
                        delay = min(2 * delay, max_delay)

                    event, name, args, keys = item
                    # presumably true when the requester no longer waits for
                    # this result - confirm against the event implementation
                    if event.result().unsafe_is_set():
                        break

                    try:
                        method = getattr(mailbox, name)
                        result = yield idiokit.thread(method, *args, **keys)
                    except (imaplib.IMAP4.abort, socket.error) as error:
                        # Connection-level failure: drop the connection and
                        # loop again so the same item is retried.
                        yield idiokit.thread(self.disconnect, mailbox)
                        self.log.error("Lost IMAP connection ({0})".format(utils.format_exception(error)))
                        mailbox = None
                    except imaplib.IMAP4.error as error:
                        # Other IMAP errors are reported to the event and the
                        # item is not retried.
                        event.fail(type(error), error, None)
                        break
                    else:
                        event.succeed(result)
                        break
        finally:
            if mailbox is not None:
                yield idiokit.thread(self.disconnect, mailbox)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def connect(self):
        """Connect to the configured IMAP server, log in and select the mailbox.

        A plain ``_IMAP4`` connection is used when ``self.mail_disable_ssl``
        is set, ``_IMAP4_SSL`` (with ``self.mail_ca_certs``) otherwise.

        Returns:
            The logged-in connection with ``self.mail_box`` selected.

        Raises:
            imaplib.IMAP4.abort: when selecting the mailbox does not return
                "OK". Errors raised by login/select are re-raised after the
                connection has been logged out.
        """
        self.log.info("Connecting to IMAP server {0!r} port {1}".format(
            self.mail_server, self.mail_port))

        if self.mail_disable_ssl:
            mailbox = _IMAP4(
                self.mail_server,
                self.mail_port,
                timeout=self.mail_connection_timeout
            )
        else:
            mailbox = _IMAP4_SSL(
                self.mail_server,
                self.mail_port,
                timeout=self.mail_connection_timeout,
                ca_certs=self.mail_ca_certs
            )

        self.log.info("Logging in to IMAP server {0!r} port {1}".format(
            self.mail_server, self.mail_port))
        try:
            # Login is inside the guarded region so that a rejected login
            # also releases the connection instead of leaking it.
            mailbox.login(self.mail_user, self.mail_password)
            status, msgs = mailbox.select(self.mail_box, readonly=False)

            if status != "OK":
                # Always fail on a non-OK status, even when the server sent
                # no accompanying message.
                raise imaplib.IMAP4.abort(msgs[0] if msgs else status)
        except:
            # Cleanup-and-reraise: nothing is swallowed here.
            mailbox.logout()
            raise

        self.log.info("Logged in to IMAP server {0!r} port {1}".format(
            self.mail_server, self.mail_port))
        return mailbox
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def disconnect(self, mailbox):
        """Close and log out ``mailbox``, ignoring IMAP and socket errors.

        Both steps are always attempted, in this order: ``close()`` then
        ``logout()``.
        """
        for step in ("close", "logout"):
            try:
                getattr(mailbox, step)()
            except (imaplib.IMAP4.error, socket.error):
                # Best-effort teardown: carry on with the next step.
                pass
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def __init__(self, host, port, timeout=None):
        """Remember ``timeout`` on the instance, then run the IMAP4 initializer.

        :param host: passed through to ``imaplib.IMAP4.__init__``
        :param port: passed through to ``imaplib.IMAP4.__init__``
        :param timeout: stored as ``self._timeout`` (default: None)
        """
        # Stored before the base initializer runs; presumably an overridden
        # open() reads it while connecting - confirm.
        self._timeout = timeout

        imaplib.IMAP4.__init__(self, host, port)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def connect(self):
        """Connect to the configured IMAP server, log in and select the mailbox.

        A plain ``_IMAP4`` connection is used when ``self.mail_disable_ssl``
        is set, ``_IMAP4_SSL`` (with ``self.mail_ca_certs``) otherwise.

        Returns:
            The logged-in connection with ``self.mail_box`` selected.

        Raises:
            imaplib.IMAP4.abort: when selecting the mailbox does not return
                "OK". Errors raised by login/select are re-raised after the
                connection has been logged out.
        """
        self.log.info("Connecting to IMAP server {0!r} port {1}".format(
            self.mail_server, self.mail_port))

        if self.mail_disable_ssl:
            mailbox = _IMAP4(
                self.mail_server,
                self.mail_port,
                timeout=self.mail_connection_timeout
            )
        else:
            mailbox = _IMAP4_SSL(
                self.mail_server,
                self.mail_port,
                timeout=self.mail_connection_timeout,
                ca_certs=self.mail_ca_certs
            )

        self.log.info("Logging in to IMAP server {0!r} port {1}".format(
            self.mail_server, self.mail_port))
        try:
            # Login is inside the guarded region so that a rejected login
            # also releases the connection instead of leaking it.
            mailbox.login(self.mail_user, self.mail_password)
            status, msgs = mailbox.select(self.mail_box, readonly=False)

            if status != "OK":
                # Always fail on a non-OK status, even when the server sent
                # no accompanying message.
                raise imaplib.IMAP4.abort(msgs[0] if msgs else status)
        except:
            # Cleanup-and-reraise: nothing is swallowed here.
            mailbox.logout()
            raise

        self.log.info("Logged in to IMAP server {0!r} port {1}".format(
            self.mail_server, self.mail_port))
        return mailbox
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def disconnect(self, mailbox):
        """Close and log out ``mailbox``, ignoring IMAP and socket errors.

        Both steps are always attempted, in this order: ``close()`` then
        ``logout()``.
        """
        for step in ("close", "logout"):
            try:
                getattr(mailbox, step)()
            except (imaplib.IMAP4.error, socket.error):
                # Best-effort teardown: carry on with the next step.
                pass
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def compute_state(self, key):
        """Return the number of unseen messages in ``key.folder``.

        Args:
            key: object providing ``username``, ``password``, ``use_ssl``,
                ``server``, ``port`` and ``folder``.

        Returns:
            The UNSEEN count as an int, or None (after a warning) when the
            username or password is not configured.
        """
        if not key.username or not key.password:
            self.warn('Username and password are not configured')
            return None
        if key.use_ssl:
            mail = IMAP4_SSL(key.server, key.port)
        else:
            mail = IMAP4(key.server, key.port)
        try:
            mail.login(key.username, key.password)
            rc, message = mail.status(key.folder, '(UNSEEN)')
        finally:
            # Always release the connection; previously it was never closed.
            try:
                mail.logout()
            except Exception:
                # Best-effort: a failing logout must not mask the real
                # result or error.
                pass
        unread_str = message[0].decode('utf-8')
        # Raw string so that \d is not treated as a string escape.
        unread_count = int(re.search(r'UNSEEN (\d+)', unread_str).group(1))
        return unread_count
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def __init__(self, target):
        """Open an IMAP session to ``target`` and check for NTLM support.

        ``target`` is split on ':' into protocol, host and port. An SSL
        session is used when the port is 993 or the protocol is "IMAPS"
        (case-insensitive); otherwise a plain session is used.
        """
        # Target comes as protocol://target:port
        self.target = target
        proto, host, port = target.split(':')
        # Drop the first two characters (the "//" of "protocol://").
        host = host[2:]
        if int(port) == 993 or proto.upper() == 'IMAPS':
            self.session = imaplib.IMAP4_SSL(host,int(port))
        else:
            #assume non-ssl IMAP
            # Note: port is passed as the original string here, but as an
            # int in the SSL branch above.
            self.session = imaplib.IMAP4(host,port)
        if 'AUTH=NTLM' not in self.session.capabilities:
            logging.error('IMAP server does not support NTLM authentication!')
            # NOTE(review): returning a non-None value from __init__ makes
            # Python raise TypeError at instantiation - confirm how callers
            # are expected to detect this case.
            return False
        self.authtag = self.session._new_tag()
        self.lastresult = None
项目:RadicaleIMAP    作者:Unrud    | 项目源码 | 文件源码
def is_authenticated(self, user, password):
        """Check ``user``/``password`` by logging in to the configured IMAP server.

        The server is read from the ``imap_host`` option of the ``auth``
        section ("host" or "host:port", port 143 by default). STARTTLS is
        attempted first; when ``imap_secure`` (default True) is set, a
        STARTTLS failure is an error, otherwise it is only logged.

        Returns:
            True when the login succeeds, False when the server rejects it.

        Raises:
            RuntimeError: when the address cannot be parsed, when secure
                IMAP is requested on Python < 3.4, or when communication
                with the server fails.
        """
        host = ""
        if self.configuration.has_option("auth", "imap_host"):
            host = self.configuration.get("auth", "imap_host")
        secure = True
        if self.configuration.has_option("auth", "imap_secure"):
            secure = self.configuration.getboolean("auth", "imap_secure")
        try:
            if ":" in host:
                address, port = host.rsplit(":", maxsplit=1)
            else:
                address, port = host, 143
            address, port = address.strip("[] "), int(port)
        except ValueError as e:
            raise RuntimeError(
                "Failed to parse address %r: %s" % (host, e)) from e
        if sys.version_info < (3, 4) and secure:
            raise RuntimeError("Secure IMAP is not availabe in Python < 3.4")
        try:
            connection = imaplib.IMAP4(host=address, port=port)
            authenticated = False
            try:
                try:
                    if sys.version_info < (3, 4):
                        connection.starttls()
                    else:
                        connection.starttls(ssl.create_default_context())
                except (imaplib.IMAP4.error, ssl.CertificateError) as e:
                    if secure:
                        raise
                    self.logger.debug("Failed to establish secure connection: %s",
                                      e, exc_info=True)
                try:
                    connection.login(user, password)
                except imaplib.IMAP4.error as e:
                    self.logger.debug(
                        "IMAP authentication failed: %s", e, exc_info=True)
                    return False
                authenticated = True
            finally:
                if not authenticated:
                    # Failure paths used to leave the connection open.
                    # Close it best-effort so the original outcome (False
                    # or the pending exception) is preserved.
                    try:
                        connection.logout()
                    except (OSError, imaplib.IMAP4.error):
                        pass
            connection.logout()
            return True
        except (OSError, imaplib.IMAP4.error) as e:
            raise RuntimeError("Failed to communicate with IMAP server %r: "
                               "%s" % (host, e)) from e
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def fetch(self):
        """Fetch every message from the IMAP4 server.

        Searches for 'ALL', fetches each hit as '(RFC822)' and passes the
        payload to ``self.parse_email``. Returns a dict that maps each
        message number (as returned by the search) to the parsed result.
        """

        _, found = self.handle.search(None, 'ALL')
        message_ids = found[0].split()

        messages = {}
        for message_id in message_ids:
            _, parts = self.handle.fetch(message_id, '(RFC822)')
            raw_message = parts[0][1]
            messages[message_id] = self.parse_email(raw_message)

        return messages
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def disconnect(self):
        """Expunge, close and log out the IMAP4 handle, in that order."""

        connection = self.handle
        connection.expunge()
        connection.close()
        connection.logout()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_line_termination(self):
        """A CAPABILITY line ending in a bare LF must make the client raise IMAP4.abort."""

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                # The untagged line ends with "\n" only, not "\r\n".
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                completion = '{} OK CAPABILITY completed\r\n'.format(tag)
                self._send(completion.encode('ASCII'))

        with self.reaped_server(BadNewlineHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:qqbot    作者:pandolia    | 项目源码 | 文件源码
def __init__(self, account, auth_code, name='', **config):
        """Derive SMTP/IMAP settings from ``account`` and build connection factories.

        Defaults ("smtp."/"imap." + domain, port 0, SSL on) are overridden
        first by the ``SERVER_LIB`` entry for the domain and then by
        ``config``. ``self.st_SMTP`` / ``self.st_IMAP`` are zero-argument
        callables that open the raw smtplib/imaplib connection;
        ``self.SMTP`` / ``self.IMAP`` wrap this object in ``SMTP`` / ``IMAP``.
        """
        account_name, server_name = account.split('@')

        self.smtp = 'smtp.' + server_name
        self.imap = 'imap.' + server_name
        self.server_name = server_name
        self.smtp_port = 0
        self.imap_port = 0
        self.use_ssl = True

        # Later sources win: known-server table first, then explicit config.
        for overrides in (SERVER_LIB.get(server_name, {}), config):
            self.__dict__.update(overrides)

        self.name = '%s <%s>' % (name or account_name, account)
        self.account = account
        self.auth_code = auth_code

        if self.use_ssl:
            smtp_cls, imap_cls = smtplib.SMTP_SSL, imaplib.IMAP4_SSL
        else:
            smtp_cls, imap_cls = smtplib.SMTP, imaplib.IMAP4

        # Whether a port is passed is decided now; host and port values are
        # read from self when the factory is called.
        self.st_SMTP = (
            (lambda : smtp_cls(self.smtp, self.smtp_port))
            if self.smtp_port else
            (lambda : smtp_cls(self.smtp))
        )
        self.st_IMAP = (
            (lambda : imap_cls(self.imap, self.imap_port))
            if self.imap_port else
            (lambda : imap_cls(self.imap))
        )

        self.SMTP = lambda : SMTP(self)
        self.IMAP = lambda : IMAP(self)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(SocketServer.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write('* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_linetoolong(self):
        """A greeting longer than imaplib._MAXLINE must raise IMAP4.error."""
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # Send a very long response line
                padding = imaplib._MAXLINE*'x'
                self.wfile.write('* OK ' + padding + '\r\n')

        with self.reaped_server(TooLongHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.error):
                self.imap_class(*address)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(SocketServer.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write('* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_linetoolong(self):
        """A greeting longer than imaplib._MAXLINE must raise IMAP4.error."""
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # Send a very long response line
                padding = imaplib._MAXLINE*'x'
                self.wfile.write('* OK ' + padding + '\r\n')

        with self.reaped_server(TooLongHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.error):
                self.imap_class(*address)
项目:CVE-2017-7494    作者:joxeankoret    | 项目源码 | 文件源码
def __init__(self, target):
        """Open an IMAP session to ``target`` and check for NTLM support.

        ``target`` is split on ':' into protocol, host and port. An SSL
        session is used when the port is 993 or the protocol is "IMAPS"
        (case-insensitive); otherwise a plain session is used.
        """
        # Target comes as protocol://target:port
        self.target = target
        proto, host, port = target.split(':')
        # Drop the first two characters (the "//" of "protocol://").
        host = host[2:]
        if int(port) == 993 or proto.upper() == 'IMAPS':
            self.session = imaplib.IMAP4_SSL(host,int(port))
        else:
            #assume non-ssl IMAP
            # Note: port is passed as the original string here, but as an
            # int in the SSL branch above.
            self.session = imaplib.IMAP4(host,port)
        if 'AUTH=NTLM' not in self.session.capabilities:
            logging.error('IMAP server does not support NTLM authentication!')
            # NOTE(review): returning a non-None value from __init__ makes
            # Python raise TypeError at instantiation - confirm how callers
            # are expected to detect this case.
            return False
        self.authtag = self.session._new_tag()
        self.lastresult = None
项目:grical    作者:wikical    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs): # {{{2
        """Open the IMAP mailbox described in settings, then initialise the command.

        Uses IMAP4_SSL (with the configured port) when settings.IMAP_SSL is
        true, plain IMAP4 otherwise; logs in and selects the default mailbox
        before delegating to the parent initializer.
        """
        use_ssl = settings.IMAP_SSL
        if not use_ssl:
            self.mailbox = IMAP4( settings.IMAP_SERVER )
        else:
            self.mailbox = IMAP4_SSL(
                    host = settings.IMAP_SERVER, port = settings.IMAP_PORT)

        imap = self.mailbox
        imap.login( settings.IMAP_LOGIN, settings.IMAP_PASSWD )
        imap.select()
        super( Command, self ).__init__( *args, **kwargs )
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_line_termination(self):
        """A CAPABILITY line ending in a bare LF must make the client raise IMAP4.abort."""

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                # The untagged line ends with "\n" only, not "\r\n".
                malformed_line = b'* CAPABILITY IMAP4rev1 AUTH\n'
                self._send(malformed_line)
                self._send_tagged(tag, 'OK', 'CAPABILITY completed')

        with self.reaped_server(BadNewlineHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_bad_auth_name(self):
        """authenticate() raises IMAP4.error when the server answers NO to the mechanism."""

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                # Reject straight away, echoing the requested mechanism.
                reason = 'unrecognized authentication type {}'.format(args[0])
                self._send_tagged(tag, 'NO', reason)

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('METHOD', lambda: 1)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_invalid_authentication(self):
        """authenticate() raises IMAP4.error when the server rejects the client's response."""

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                # Request the client's response, record it, then always
                # answer with a tagged NO.
                self._send_textline('+')
                self.response = yield
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('MYAUTH', lambda x: b'fake')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_linetoolong(self):
        """A greeting longer than imaplib._MAXLINE must raise IMAP4.error."""
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # Send a very long response line
                padding = imaplib._MAXLINE*b'x'
                self.wfile.write(b'* OK ' + padding + b'\r\n')

        with self.reaped_server(TooLongHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.error):
                self.imap_class(*address)
项目:Python_Master-the-Art-of-Design-Patterns    作者:PacktPublishing    | 项目源码 | 文件源码
def get_inbox(self):
        """Return the raw '(RFC822)' payload of every message in the default mailbox.

        Connects to ``self.host`` over plain IMAP4, logs in with
        ``self.username`` / ``self.password``, and fetches every message
        found by a search for 'ALL'. The connection is always logged out
        afterwards.

        Returns:
            A list with one payload per message, in search order.
        """
        mailbox = imaplib.IMAP4(self.host)
        try:
            # imaplib quotes the password with str operations, so the
            # credentials are passed as str rather than pre-encoded bytes.
            mailbox.login(self.username, self.password)
            mailbox.select()
            x, data = mailbox.search(None, 'ALL')
            messages = []
            for num in data[0].split():
                x, message = mailbox.fetch(num, '(RFC822)')
                messages.append(message[0][1])
            return messages
        finally:
            # Release the connection on every path; best-effort so a failed
            # logout does not hide the actual result or error.
            try:
                mailbox.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(SocketServer.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write('* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_linetoolong(self):
        """A greeting longer than imaplib._MAXLINE must raise IMAP4.error."""
        class TooLongHandler(SimpleIMAPHandler):
            def handle(self):
                # Send a very long response line
                padding = imaplib._MAXLINE*'x'
                self.wfile.write('* OK ' + padding + '\r\n')

        with self.reaped_server(TooLongHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.error):
                self.imap_class(*address)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_line_termination(self):
        """A CAPABILITY line ending in a bare LF must make the client raise IMAP4.abort."""

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                # The untagged line ends with "\n" only, not "\r\n".
                malformed_line = b'* CAPABILITY IMAP4rev1 AUTH\n'
                self._send(malformed_line)
                self._send_tagged(tag, 'OK', 'CAPABILITY completed')

        with self.reaped_server(BadNewlineHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_bad_auth_name(self):
        """authenticate() raises IMAP4.error when the server answers NO to the mechanism."""

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                # Reject straight away, echoing the requested mechanism.
                reason = 'unrecognized authentication type {}'.format(args[0])
                self._send_tagged(tag, 'NO', reason)

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('METHOD', lambda: 1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_invalid_authentication(self):
        """authenticate() raises IMAP4.error when the server rejects the client's response."""

        class MyServer(SimpleIMAPHandler):

            def cmd_AUTHENTICATE(self, tag, args):
                # Request the client's response, record it, then always
                # answer with a tagged NO.
                self._send_textline('+')
                self.response = yield
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

        with self.reaped_pair(MyServer) as (server, client):
            with self.assertRaises(imaplib.IMAP4.error):
                client.authenticate('MYAUTH', lambda x: b'fake')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_issue5949(self):
        """Connecting to a server that sends only a partial greeting raises IMAP4.abort."""

        class EOFHandler(SocketServer.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write('* OK')

        with self.reaped_server(EOFHandler) as server:
            address = server.server_address
            with self.assertRaises(imaplib.IMAP4.abort):
                self.imap_class(*address)
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def resetConnection(self):
        """Drop the current IMAP connection and open, log in to and select a new one.

        The server comes from the IMAP_SERVER environment variable when set,
        otherwise from the URL's hostname. The "imap" scheme uses IMAP4 on
        port 143 and "imaps" uses IMAP4_SSL on port 993. When
        ``globals.imap_full_address`` is set, the login name is
        ``username@hostname``.
        """
        parsed_url = self.url
        if 'IMAP_SERVER' in os.environ:
            imap_server = os.environ['IMAP_SERVER']
        else:
            imap_server = parsed_url.hostname

        #  Try to close the connection cleanly
        try:
            self.conn.close()
        except Exception:
            pass

        scheme = parsed_url.scheme
        if scheme == "imap":
            cl = imaplib.IMAP4
            self.conn = cl(imap_server, 143)
        elif scheme == "imaps":
            cl = imaplib.IMAP4_SSL
            self.conn = cl(imap_server, 993)

        log.Debug("Type of imap class: %s" % (cl.__name__))
        self.remote_dir = re.sub(r'^/', r'', parsed_url.path, 1)

        #  Login: only the account name differs between the two modes.
        if globals.imap_full_address:
            login_name = self.username + "@" + parsed_url.hostname
        else:
            login_name = self.username
        self.conn.login(login_name, self.password)
        self.conn.select(globals.imap_mailbox)
        log.Info("IMAP connected")
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def _put(self, source_path, remote_filename):
        """Append the file at ``source_path`` to the IMAP mailbox as a message.

        The upload is retried after connection errors, sleeping 30 seconds
        and reconnecting between attempts. The attempt budget is
        ``globals.timeout``, or 2880 when that is 0.

        :param source_path: object whose ``open("rb")`` yields the data to store
        :param remote_filename: name passed to ``prepareBody`` and used in log messages
        """
        f = source_path.open("rb")
        try:
            allowedTimeout = globals.timeout
            if (allowedTimeout == 0):
                # Allow a total timeout of 1 day
                allowedTimeout = 2880
            while allowedTimeout > 0:
                try:
                    self.conn.select(remote_filename)
                    body = self.prepareBody(f, remote_filename)
                    # If we don't select the IMAP folder before
                    # append, the message goes into the INBOX.
                    self.conn.select(globals.imap_mailbox)
                    self.conn.append(globals.imap_mailbox, None, None, body)
                    break
                except (imaplib.IMAP4.abort, socket.error, socket.sslerror):
                    allowedTimeout -= 1
                    log.Info("Error saving '%s', retrying in 30s " % remote_filename)
                    time.sleep(30)
                    while allowedTimeout > 0:
                        try:
                            self.resetConnection()
                            break
                        except (imaplib.IMAP4.abort, socket.error, socket.sslerror):
                            allowedTimeout -= 1
                            log.Info("Error reconnecting, retrying in 30s ")
                            time.sleep(30)
        finally:
            # The source file used to stay open; close it on every path.
            f.close()

        # NOTE(review): this is also logged when the retry budget ran out
        # without a successful append - confirm that is intended.
        log.Info("IMAP mail with '%s' subject stored" % remote_filename)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __init__(self, timeout, host='', port=993, keyfile=None, certfile=None):
        """Store the TLS and timeout settings, then run the IMAP4 initializer.

        :param timeout: stored as ``self.timeout``
        :param host: passed through to ``imaplib.IMAP4.__init__`` (default: '')
        :param port: passed through to ``imaplib.IMAP4.__init__`` (default: 993)
        :param keyfile: stored as ``self.keyfile`` (default: None)
        :param certfile: stored as ``self.certfile`` (default: None)
        """
        self.keyfile = keyfile
        self.certfile = certfile
        self.timeout = timeout
        self.sslobj = None
        # The attributes above are assigned before the base initializer
        # runs; presumably it calls open(), which reads them - confirm.
        imaplib.IMAP4.__init__(self, host, port)

    # pylint: disable=W0222
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def open(self, host, port):
        """Connect to ``host:port`` and wrap the socket for SSL.

        Records host and port on the instance, stores the TCP connection in
        ``self.sock`` and the SSL object built from it (using
        ``self.keyfile`` / ``self.certfile``) in ``self.sslobj``.
        """
        self.host, self.port = host, port
        self.sock = socket.create_connection((host, port))
        self.sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def socket(self):
        """Give access to the underlying connection.

        Returns the object stored in ``self.sock``.
        """
        return self.sock