Python ssl 模块,HAS_SNI 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用ssl.HAS_SNI

项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_sni(self):
        self.skipTest("test disabled - test server needed")
        # Checks that Server Name Indication works, if supported by the
        # OpenSSL linked to.
        # The ssl module itself doesn't have server-side support for SNI,
        # so we rely on a third-party test site.
        expect_sni = ssl.HAS_SNI
        with support.transient_internet("XXX"):
            u = urllib.request.urlopen("XXX")
            contents = u.readall()
            if expect_sni:
                self.assertIn(b"Great", contents)
                self.assertNotIn(b"Unfortunately", contents)
            else:
                self.assertNotIn(b"Great", contents)
                self.assertIn(b"Unfortunately", contents)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_sni(self):
        self.skipTest("test disabled - test server needed")
        # Checks that Server Name Indication works, if supported by the
        # OpenSSL linked to.
        # The ssl module itself doesn't have server-side support for SNI,
        # so we rely on a third-party test site.
        expect_sni = ssl.HAS_SNI
        with support.transient_internet("XXX"):
            u = urllib.request.urlopen("XXX")
            contents = u.readall()
            if expect_sni:
                self.assertIn(b"Great", contents)
                self.assertNotIn(b"Unfortunately", contents)
            else:
                self.assertNotIn(b"Great", contents)
                self.assertIn(b"Unfortunately", contents)
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:cryptogram    作者:xinmingzhang    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:UMOG    作者:hsab    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:blackmamba    作者:zrzka    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def starttls(self, ssl_context=None):
        name = 'STARTTLS'
        if not HAVE_SSL:
            raise self.error('SSL support missing')
        if self._tls_established:
            raise self.abort('TLS session already established')
        if name not in self.capabilities:
            raise self.abort('TLS not supported by server')
        # Generate a default SSL context if none was passed.
        if ssl_context is None:
            ssl_context = ssl._create_stdlib_context()
        typ, dat = self._simple_command(name)
        if typ == 'OK':
            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = ssl_context.wrap_socket(self.sock,
                                                server_hostname=server_hostname)
            self.file = self.sock.makefile('rb')
            self._tls_established = True
            self._get_capabilities()
        else:
            raise self.error("Couldn't establish TLS session")
        return self._untagged_response(typ, dat, name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            super().connect()

            if self._tunnel_host:
                server_hostname = self._tunnel_host
            else:
                server_hostname = self.host
            sni_hostname = server_hostname if ssl.HAS_SNI else None

            self.sock = self._context.wrap_socket(self.sock,
                                                  server_hostname=sni_hostname)
            if not self._context.check_hostname and self._check_hostname:
                try:
                    ssl.match_hostname(self.sock.getpeercert(), server_hostname)
                except Exception:
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def stls(self, context=None):
        """Start a TLS session on the active connection as specified in RFC 2595.

                context - a ssl.SSLContext
        """
        if not HAVE_SSL:
            raise error_proto('-ERR TLS support missing')
        if self._tls_established:
            raise error_proto('-ERR TLS session already established')
        caps = self.capa()
        if not 'STLS' in caps:
            raise error_proto('-ERR STLS not supported by server')
        if context is None:
            context = ssl._create_stdlib_context()
        resp = self._shortcmd('STLS')
        server_hostname = self.host if ssl.HAS_SNI else None
        self.sock = context.wrap_socket(self.sock,
                                        server_hostname=server_hostname)
        self.file = self.sock.makefile('rb')
        self._tls_established = True
        return resp
项目:beepboop    作者:nicolehe    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:hackathon    作者:vertica    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def connect(self):
            "Connect to a host on a given (SSL) port."

            sock = socket_create_connection((self.host, self.port),
                                            self.timeout, self.source_address)

            if self._tunnel_host:
                self.sock = sock
                self._tunnel()

            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self._context.wrap_socket(sock,
                                                  server_hostname=server_hostname)
            try:
                if self._check_hostname:
                    ssl.match_hostname(self.sock.getpeercert(), self.host)
            except Exception:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = [None]
        def cb_sni(ssl_sock, server_name, initial_context):
            sni_name[0] = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name[0], "localhost")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = [None]
        def cb_sni(ssl_sock, server_name, initial_context):
            sni_name[0] = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name[0], "localhost")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = [None]
        def cb_sni(ssl_sock, server_name, initial_context):
            sni_name[0] = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name[0], "localhost")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _create_socket(self):
            sock = IMAP4._create_socket(self)
            server_hostname = self.host if ssl.HAS_SNI else None
            return self.ssl_context.wrap_socket(sock,
                                                server_hostname=server_hostname)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _get_socket(self, host, port, timeout):
            if self.debuglevel > 0:
                print('connect:', (host, port), file=stderr)
            new_socket = socket.create_connection((host, port), timeout,
                    self.source_address)
            server_hostname = self._host if ssl.HAS_SNI else None
            new_socket = self.context.wrap_socket(new_socket,
                                                  server_hostname=server_hostname)
            return new_socket
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def auth(self):
            '''Set up secure control connection by using TLS/SSL.'''
            if isinstance(self.sock, ssl.SSLSocket):
                raise ValueError("Already using TLS")
            if self.ssl_version == ssl.PROTOCOL_TLSv1:
                resp = self.voidcmd('AUTH TLS')
            else:
                resp = self.voidcmd('AUTH SSL')
            server_hostname = self.host if ssl.HAS_SNI else None
            self.sock = self.context.wrap_socket(self.sock,
                                                 server_hostname=server_hostname)
            self.file = self.sock.makefile(mode='r', encoding=self.encoding)
            return resp
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def ntransfercmd(self, cmd, rest=None):
            conn, size = FTP.ntransfercmd(self, cmd, rest)
            if self._prot_p:
                server_hostname = self.host if ssl.HAS_SNI else None
                conn = self.context.wrap_socket(conn,
                                                server_hostname=server_hostname)
            return conn, size
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port)
        self.assertEqual(sni_name, "localhost")
项目:orizonhub    作者:gumblex    | 项目源码 | 文件源码
def connect(self, addr=('irc.freenode.net', 6667), use_ssl=False):
        '''Connect to a IRC server. addr is a tuple of (server, port)'''
        self.acquire_lock()
        self.addr = (rmnlsp(addr[0]), addr[1])

        for res in socket.getaddrinfo(self.addr[0], self.addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                if use_ssl:
                    if (3,) <= sys.version_info < (3, 3):
                        self.sock = ssl.SSLSocket(af, socktype, proto)
                    elif sys.version_info >= (3, 4):
                        ctx = ssl.create_default_context()
                        if ssl.HAS_SNI:
                            self.sock = ctx.wrap_socket(socket.socket(af, socktype, proto), server_hostname=self.addr[0])
                        else:
                            self.sock = ctx.wrap_socket(socket.socket(af, socktype, proto))
                    else:
                        self.sock = ssl.SSLSocket(sock=socket.socket(af, socktype, proto))
                else:
                    self.sock = socket.socket(af, socktype, proto)
            except socket.error:
                self.sock = None
                continue
            try:
                self.sock.settimeout(300)
                self.sock.connect(sa)
            except socket.error:
                self.sock.close()
                self.sock = None
                continue
            break

        if self.sock is None:
            e = socket.error(
                '[errno %d] Socket operation on non-socket' % errno.ENOTSOCK)
            e.errno = errno.ENOTSOCK
            self.lock.release()
            raise e

        self.nick = None
        self.recvbuf = b''
        self.sendbuf = b''
        self.lock.release()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def starttls(self, keyfile=None, certfile=None, context=None):
        """Puts the connection to the SMTP server into TLS mode.

        If there has been no previous EHLO or HELO command this session, this
        method tries ESMTP EHLO first.

        If the server supports TLS, this will encrypt the rest of the SMTP
        session. If you provide the keyfile and certfile parameters,
        the identity of the SMTP server and client can be checked. This,
        however, depends on whether the socket module really checks the
        certificates.

        This method may raise the following exceptions:

         SMTPHeloError            The server didn't reply properly to
                                  the helo greeting.
        """
        self.ehlo_or_helo_if_needed()
        if not self.has_extn("starttls"):
            raise SMTPException("STARTTLS extension not supported by server.")
        (resp, reply) = self.docmd("STARTTLS")
        if resp == 220:
            if not _have_ssl:
                raise RuntimeError("No SSL support included in this Python")
            if context is not None and keyfile is not None:
                raise ValueError("context and keyfile arguments are mutually "
                                 "exclusive")
            if context is not None and certfile is not None:
                raise ValueError("context and certfile arguments are mutually "
                                 "exclusive")
            if context is None:
                context = ssl._create_stdlib_context(certfile=certfile,
                                                     keyfile=keyfile)
            server_hostname = self._host if ssl.HAS_SNI else None
            self.sock = context.wrap_socket(self.sock,
                                            server_hostname=server_hostname)
            self.file = None
            # RFC 3207:
            # The client MUST discard any knowledge obtained from
            # the server, such as the list of SMTP service extensions,
            # which was not obtained from the TLS negotiation itself.
            self.helo_resp = None
            self.ehlo_resp = None
            self.esmtp_features = {}
            self.does_esmtp = 0
        return (resp, reply)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                 server_side=False, server_hostname=None,
                 extra=None, server=None):
        if ssl is None:
            raise RuntimeError('stdlib ssl module not available')

        if server_side:
            if not sslcontext:
                raise ValueError('Server side ssl needs a valid SSLContext')
        else:
            if not sslcontext:
                # Client side may pass ssl=True to use a default
                # context; in that case the sslcontext passed is None.
                # The default is the same as used by urllib with
                # cadefault=True.
                if hasattr(ssl, '_create_stdlib_context'):
                    sslcontext = ssl._create_stdlib_context(
                        cert_reqs=ssl.CERT_REQUIRED,
                        check_hostname=bool(server_hostname))
                else:
                    # Fallback for Python 3.3.
                    sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                    sslcontext.options |= ssl.OP_NO_SSLv2
                    sslcontext.set_default_verify_paths()
                    sslcontext.verify_mode = ssl.CERT_REQUIRED

        wrap_kwargs = {
            'server_side': server_side,
            'do_handshake_on_connect': False,
        }
        if server_hostname and not server_side and ssl.HAS_SNI:
            wrap_kwargs['server_hostname'] = server_hostname
        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

        super().__init__(loop, sslsock, protocol, extra, server)

        self._server_hostname = server_hostname
        self._waiter = waiter
        self._sslcontext = sslcontext
        self._paused = False

        # SSL-specific extra info.  (peercert is set later)
        self._extra.update(sslcontext=sslcontext)

        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            start_time = self._loop.time()
        else:
            start_time = None
        self._on_handshake(start_time)
项目:wsgiprox    作者:webrecorder    | 项目源码 | 文件源码
def __init__(self, wsgi,
                 prefix_resolver=None,
                 download_host=None,
                 proxy_host=None,
                 proxy_options=None,
                 proxy_apps=None):

        self._wsgi = wsgi
        self.set_connection_class()

        if isinstance(prefix_resolver, str):
            prefix_resolver = FixedResolver(prefix_resolver)

        self.prefix_resolver = prefix_resolver or FixedResolver()

        self.proxy_apps = proxy_apps or {}

        self.proxy_host = proxy_host or self.DEFAULT_HOST

        #self.has_sni = hasattr(ssl, 'HAS_SNI') and ssl.HAS_SNI
        #self.has_alpn = hasattr(ssl, 'HAS_ALPN') and ssl.HAS_ALPN

        if self.proxy_host not in self.proxy_apps:
            self.proxy_apps[self.proxy_host] = None

        # HTTPS Only Options
        proxy_options = proxy_options or {}

        ca_name = proxy_options.get('ca_name', self.CA_ROOT_NAME)

        ca_file_cache = proxy_options.get('ca_file_cache', self.CA_ROOT_FILE)

        self.ca = CertificateAuthority(ca_name=ca_name,
                                       ca_file_cache=ca_file_cache,
                                       cert_cache=None,
                                       cert_not_before=-3600)

        try:
            self.root_ca_file = self.ca.get_root_pem_filename()
        except Exception as e:
            self.root_ca_file = None

        self.use_wildcard = proxy_options.get('use_wildcard_certs', True)

        if proxy_options.get('enable_cert_download', True):
            download_host = download_host or self.DEFAULT_HOST
            self.proxy_apps[download_host] = CertDownloader(self.ca)

        self.enable_ws = proxy_options.get('enable_websockets', True)
        if WebSocketHandler == object:
            self.enable_ws = None