Python OpenSSL.SSL 模块,VERIFY_NONE 实例源码

我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用OpenSSL.SSL.VERIFY_NONE

项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in a pyOpenSSL connection and return it as an ``SSLObject``.

    The context is created with ``SSL.SSLv23_METHOD`` and its verification
    mode is set to ``SSL.VERIFY_NONE`` with a callback that always returns
    ``True``.

    :param sock: socket to wrap. If it has a ``_sock`` attribute, that
        underlying object is what gets passed to ``SSL.Connection``.
    :param keyfile: optional private key file loaded into the context.
    :param certfile: optional certificate file loaded into the context.
    :return: the ``SSLObject``, configured with the timeout read from the
        ``sock`` argument. The handshake is run before returning only if
        ``getpeername()`` succeeds on the (unwrapped) socket.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *args: True)

    # The timeout is queried on the object the caller passed in, before it
    # is swapped for its ``_sock`` attribute (when it has one).
    saved_timeout = sock.gettimeout()
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer yet: return without attempting a handshake.
        return wrapped
    # A peer exists, so complete the handshake now.
    wrapped.do_handshake()
    return wrapped
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Return an ``SSLObject`` built around ``sock`` using pyOpenSSL.

    A ``SSL.SSLv23_METHOD`` context is created, optionally loaded with a
    certificate and private key, and set to ``SSL.VERIFY_NONE`` with a
    verification callback that returns ``True`` for every call.

    :param sock: socket to wrap; its ``_sock`` attribute is used instead
        when present.
    :param keyfile: optional path given to ``use_privatekey_file``.
    :param certfile: optional path given to ``use_certificate_file``.
    :return: the new ``SSLObject`` with the timeout of the ``sock`` argument
        applied. If ``getpeername()`` works on the socket, the handshake has
        already been performed.
    """
    ssl_context = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ssl_context.use_certificate_file(certfile)
    if keyfile is not None:
        ssl_context.use_privatekey_file(keyfile)

    def _accept_everything(*_args):
        return True

    ssl_context.set_verify(SSL.VERIFY_NONE, _accept_everything)

    original_timeout = sock.gettimeout()
    underlying = getattr(sock, '_sock', sock)
    ssl_sock = SSLObject(SSL.Connection(ssl_context, underlying))
    ssl_sock.settimeout(original_timeout)

    # getpeername() is used purely as a probe for "is there a peer yet?".
    has_peer = True
    try:
        underlying.getpeername()
    except Exception:
        has_peer = False

    if has_peer:
        ssl_sock.do_handshake()
    return ssl_sock
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def test_start_tls_smtp(self):
    # Upgrades both ends of the connection to TLS after an SMTP-style
    # plain-text exchange, then sends one more line in each direction.

    def accept_any_cert(conn, x509, err_num, err_depth, err_code):
        # Verification callback that approves whatever it is shown.
        return True

    # This flow is simplified from RFC 3207 section 5.
    # Not all of it is strictly needed, but realistic back-and-forth
    # traffic helps confirm the buffers end up in a sane state.
    preamble = (
        (self.server_send_line, b"220 mail.example.com ready\r\n"),
        (self.client_send_line, b"EHLO mail.example.com\r\n"),
        (self.server_send_line, b"250-mail.example.com welcome\r\n"),
        (self.server_send_line, b"250 STARTTLS\r\n"),
        (self.client_send_line, b"STARTTLS\r\n"),
        (self.server_send_line, b"220 Go ahead\r\n"),
    )
    for send_line, payload in preamble:
        yield send_line(payload)

    # Start both upgrades before waiting on either of them.
    client_options = _client_ssl_options(SSL.VERIFY_NONE, accept_any_cert)
    client_future = self.client_start_tls(client_options)
    server_future = self.server_start_tls(_server_ssl_options())

    self.client_stream = yield client_future
    self.server_stream = yield server_future
    for upgraded in (self.client_stream, self.server_stream):
        self.assertTrue(isinstance(upgraded, MicroProxySSLIOStream))

    # Traffic must still flow over the upgraded streams.
    yield self.client_send_line(b"EHLO mail.example.com\r\n")
    yield self.server_send_line(b"250 mail.example.com welcome\r\n")
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Create the SSL context, configuring verification and optional ALPN.

    :param insecure: when true, the context is set to ``SSL.VERIFY_NONE``.
        Otherwise CA certificates are loaded and the mode is
        ``SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT``.
    :param trusted_ca_certs: value passed to ``load_verify_locations``;
        ``certifi.where()`` is used when this is empty.
    :param alpn: protocols passed to ``set_alpn_protos``; applied only when
        both this and ``HAS_ALPN`` are truthy.
    :return: the context obtained from ``create_basic_sslcontext()`` after
        configuration.
    """
    context = create_basic_sslcontext()

    if insecure:
        context.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_bundle = trusted_ca_certs if trusted_ca_certs else certifi.where()
        context.load_verify_locations(ca_bundle)
        strict_mode = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        context.set_verify(strict_mode, certificate_verify_cb)

    if alpn and HAS_ALPN:
        context.set_alpn_protos(alpn)

    return context
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Build an ``SSLObject`` on top of ``sock`` via a pyOpenSSL connection.

    The ``SSL.SSLv23_METHOD`` context uses ``SSL.VERIFY_NONE`` together with
    a callback that always answers ``True``.

    :param sock: socket to wrap (its ``_sock`` attribute is preferred when
        it exists).
    :param keyfile: optional private key file for the context.
    :param certfile: optional certificate file for the context.
    :return: the ``SSLObject`` with the timeout taken from the ``sock``
        argument; handshaken already when ``getpeername()`` succeeds on the
        socket.
    """
    tls_ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        tls_ctx.use_certificate_file(certfile)
    if keyfile is not None:
        tls_ctx.use_privatekey_file(keyfile)
    tls_ctx.set_verify(SSL.VERIFY_NONE, lambda *ignored: True)

    # Capture the timeout first; it comes from the caller's object rather
    # than from the unwrapped ``_sock``.
    timeout_value = sock.gettimeout()
    inner_sock = getattr(sock, '_sock', sock)

    result = SSLObject(SSL.Connection(tls_ctx, inner_sock))
    result.settimeout(timeout_value)

    try:
        inner_sock.getpeername()
    except Exception:
        # No peer yet, so there is nothing to handshake with.
        return result
    result.do_handshake()
    return result
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Wrap a socket with pyOpenSSL and hand back an ``SSLObject``.

    The context (``SSL.SSLv23_METHOD``) is put in ``SSL.VERIFY_NONE`` mode
    and given a callback that returns ``True`` unconditionally.

    :param sock: socket to wrap; replaced by ``sock._sock`` when that
        attribute exists.
    :param keyfile: optional private key file to load.
    :param certfile: optional certificate file to load.
    :return: an ``SSLObject`` whose timeout equals that of the ``sock``
        argument. The handshake is done here only when ``getpeername()``
        does not raise.
    """
    openssl_ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        openssl_ctx.use_certificate_file(certfile)
    if keyfile is not None:
        openssl_ctx.use_privatekey_file(keyfile)

    def _always_ok(*_unused):
        return True

    openssl_ctx.set_verify(SSL.VERIFY_NONE, _always_ok)

    prior_timeout = sock.gettimeout()
    base_sock = getattr(sock, '_sock', sock)
    secured = SSLObject(SSL.Connection(openssl_ctx, base_sock))
    secured.settimeout(prior_timeout)

    # Probe for a peer; any failure is taken to mean "not connected yet".
    peer_present = True
    try:
        base_sock.getpeername()
    except Exception:
        peer_present = False

    if peer_present:
        secured.do_handshake()
    return secured
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def _make_client_iostream(self, connection, **kwargs):
    """Create the client-side ``MicroProxySSLIOStream`` for ``connection``.

    The SSL options come from ``_client_ssl_options`` with
    ``SSL.VERIFY_NONE`` and a callback that always returns ``True``.

    :param connection: object passed as the stream's first argument.
    :param kwargs: extra keyword arguments forwarded to the stream.
    :return: the new ``MicroProxySSLIOStream`` bound to ``self.io_loop``.
    """
    def accept_any_cert(conn, x509, err_num, err_depth, err_code):
        return True

    client_ssl_options = _client_ssl_options(SSL.VERIFY_NONE, accept_any_cert)
    stream = MicroProxySSLIOStream(
        connection,
        io_loop=self.io_loop,
        ssl_options=client_ssl_options,
        **kwargs)
    return stream
项目:CoinSwapCS    作者:AdamISZ    | 项目源码 | 文件源码
def getContext(self):
    """Return the base client context with verification set to VERIFY_NONE.

    WARNING: certificates are not authenticated in this configuration, so
    connections are exposed to man-in-the-middle attacks.
    """
    context = ssl.ClientContextFactory.getContext(self)
    # TODO: switch SSL.VERIFY_NONE to SSL.VERIFY_PEER once there is a real
    # server with a valid CA-signed certificate. If that is not workable,
    # distributed self-signed certificates can be used instead: put the
    # cert.pem file and its location in the config and call
    # load_verify_locations("/path/to/cert.pem") on this context before
    # returning it.
    context.set_verify(SSL.VERIFY_NONE, verifyCallback)
    return context
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def _makeContext(self):
    """Create and configure an ``SSL.Context`` from this object's options.

    Reads ``self.method``, ``self.certificate``, ``self.privateKey``,
    ``self.verify``, ``self.caCerts`` and ``self.enableSessions``.

    When ``self.verify`` is set, the context requires a peer certificate
    (``SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT``) and every entry
    of ``self.caCerts`` is added to the context's certificate store.
    Otherwise the mode is ``SSL.VERIFY_NONE``.

    :return: the configured context.
    """
    ctx = SSL.Context(self.method)
    ctx.set_app_data(_SSLApplicationData())

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        # Sanity check: the key must match the certificate just loaded.
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.caCerts:
            store = ctx.get_cert_store()
            for cert in self.caCerts:
                store.add_cert(cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # Pass OpenSSL's own verdict through. Returning True unconditionally
        # would override a failed verification and accept any certificate
        # even when VERIFY_PEER was requested.
        return preverify_ok

    ctx.set_verify(verifyFlags, _trackVerificationProblems)

    if self.enableSessions:
        sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
        ctx.set_session_id(sessionName)

    return ctx
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def _makeContext(self):
    """Build the ``SSL.Context`` described by this object's attributes.

    Uses ``self.method`` for the context, loads ``self.certificate`` and
    ``self.privateKey`` when both are set, and enables a session id when
    ``self.enableSessions`` is true.

    With ``self.verify`` set, the verification mode is
    ``SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT`` and the
    certificates in ``self.caCerts`` are added to the context's store;
    without it the mode is ``SSL.VERIFY_NONE``.

    :return: the configured context.
    """
    ctx = SSL.Context(self.method)
    ctx.set_app_data(_SSLApplicationData())

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        # Sanity check: fails if the key does not belong to the certificate.
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.caCerts:
            store = ctx.get_cert_store()
            for cert in self.caCerts:
                store.add_cert(cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # Return the result OpenSSL computed instead of a constant True;
        # a constant True silently accepts certificates that failed
        # verification, defeating the VERIFY_PEER flags set above.
        return preverify_ok

    ctx.set_verify(verifyFlags, _trackVerificationProblems)

    if self.enableSessions:
        sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
        ctx.set_session_id(sessionName)

    return ctx
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """Build an ``SSL.Context`` loaded with certificate, key and CA locations.

    :param key_file: private key file; when not given, ``cert_file`` is used
        as the private key file instead (if present).
    :param cert_file: certificate file to load.
    :param pem_file: CA file passed to ``load_verify_locations``.
    :param ca_dir: CA directory passed to ``load_verify_locations``.
    :param verify_peer: when true, set verification depth to 9 and enable
        peer verification; otherwise use ``SSL.VERIFY_NONE``.
    :param url: when verifying, delegate the verification setup to
        ``set_peer_verification_for_url_hostname`` for this URL.
    :param method: protocol method constant given to ``SSL.Context``.
    :param key_file_passphrase: value returned by the passphrase callback
        installed on the context.
    :return: the configured context.
    """
    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    # The passphrase callback has to be installed before the key is loaded.
    if key_file_passphrase:
        def _supply_passphrase(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase

        context.set_passwd_cb(_supply_passphrase)

    # Fall back to the certificate file as the key source.
    private_key_source = key_file or cert_file
    if private_key_source:
        context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    def _passthrough(conn, x509, errnum, errdepth, preverify_ok):
        # Performs no checks of its own; echoes the status it is given.
        return preverify_ok

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _passthrough)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _passthrough)
    return context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _makeContext(self):
    """Create the ``SSL.Context`` configured from this object's options.

    Verification flags start at ``SSL.VERIFY_NONE``. With ``self.verify``
    they become ``SSL.VERIFY_PEER``, optionally extended by
    ``self.requireCertificate`` and ``self.verifyOnce``, and the
    certificates in ``self.caCerts`` are added to the context's store.
    The verification callback appends an ``OpenSSLVerifyError`` to the
    ``problems`` list of the context's app data whenever OpenSSL reports a
    failure, and returns OpenSSL's verdict unchanged.

    :return: the configured context.
    """
    context = SSL.Context(self.method)
    context.set_app_data(_SSLApplicationData())

    has_identity = self.certificate is not None and self.privateKey is not None
    if has_identity:
        context.use_certificate(self.certificate)
        context.use_privatekey(self.privateKey)
        # Sanity check
        context.check_privatekey()

    if not self.verify:
        flags = SSL.VERIFY_NONE
    else:
        flags = SSL.VERIFY_PEER
        if self.requireCertificate:
            flags = flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            flags = flags | SSL.VERIFY_CLIENT_ONCE
        if self.caCerts:
            cert_store = context.get_cert_store()
            for ca_cert in self.caCerts:
                cert_store.add_cert(ca_cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # preverify_ok is the answer OpenSSL's default verifier reached;
        # failures are recorded and the answer is passed through as-is.
        if preverify_ok:
            return preverify_ok
        problem = OpenSSLVerifyError(cert, errno, depth)
        context.get_app_data().problems.append(problem)
        return preverify_ok

    context.set_verify(flags, _trackVerificationProblems)

    if self.verifyDepth is not None:
        context.set_verify_depth(self.verifyDepth)

    if self.enableSingleUseKeys:
        context.set_options(SSL.OP_SINGLE_DH_USE)

    if self.fixBrokenPeers:
        context.set_options(self._OP_ALL)

    if self.enableSessions:
        session_seed = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        context.set_session_id(md5.md5(session_seed).hexdigest())

    return context
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def _do_ssl_handshake(self):
    """Attempt the SSL handshake on ``self.socket`` and handle the outcome.

    * ``SSL.WantReadError`` / ``SSL.WantWriteError``: set the matching
      ``_handshake_reading`` / ``_handshake_writing`` flag and return.
    * ``SSL.SysCallError`` with errno EBADF, ENOTCONN or EPERM: close the
      stream; any other ``SysCallError`` is re-raised.
    * Any other ``SSL.Error``: log a warning and close the stream.
    * ``AttributeError``: close the stream.
    * Success: clear ``_ssl_accepting``, verify the peer hostname when the
      context's verify mode is not ``SSL.VERIFY_NONE`` and
      ``self._server_hostname`` is set, then run the connect callback.
    """
    try:
        self._handshake_reading = False
        self._handshake_writing = False
        self.socket.do_handshake()

    except SSL.WantReadError:
        self._handshake_reading = True
        return

    except SSL.WantWriteError:
        self._handshake_writing = True
        return

    except SSL.SysCallError as e:
        # Use e.args[0]: indexing the exception object itself (e[0]) is not
        # supported on Python 3.
        err_num = abs(e.args[0])
        if err_num in (errno.EBADF, errno.ENOTCONN, errno.EPERM):
            return self.close(exc_info=True)

        raise

    except SSL.Error as err:
        try:
            peer = self.socket.getpeername()
        except Exception:
            peer = '(not connected)'
        # Log for every SSL error, not only when getpeername() failed.
        logger.warning("SSL Error on %s %s: %s",
                       self.socket.fileno(), peer, err)
        return self.close(exc_info=True)

    except AttributeError:
        return self.close(exc_info=True)

    else:
        self._ssl_accepting = False
        verify_mode = self.socket.get_context().get_verify_mode()
        if (verify_mode != SSL.VERIFY_NONE and
                self._server_hostname is not None):
            try:
                verify_hostname(self.socket, self._server_hostname)
            except VerificationError as e:
                logger.warning("Invalid SSL certificate: %s", e)
                self.close(exc_info=True)
                return

        self._run_ssl_connect_callback()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _makeContext(self):
    """Build and return the ``SSL.Context`` for this object's settings.

    The context loads ``self.certificate`` / ``self.privateKey`` when both
    are set. Verification is ``SSL.VERIFY_NONE`` unless ``self.verify`` is
    true, in which case ``SSL.VERIFY_PEER`` is used, extended according to
    ``self.requireCertificate`` and ``self.verifyOnce``, and the entries of
    ``self.caCerts`` are added to the certificate store. Verification
    failures reported by OpenSSL are appended, as ``OpenSSLVerifyError``,
    to the ``problems`` list on the context's app data.

    :return: the configured context.
    """
    ssl_ctx = SSL.Context(self.method)
    ssl_ctx.set_app_data(_SSLApplicationData())

    if not (self.certificate is None or self.privateKey is None):
        ssl_ctx.use_certificate(self.certificate)
        ssl_ctx.use_privatekey(self.privateKey)
        # Sanity check
        ssl_ctx.check_privatekey()

    mode = SSL.VERIFY_NONE
    if self.verify:
        mode = SSL.VERIFY_PEER
        if self.requireCertificate:
            mode = mode | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            mode = mode | SSL.VERIFY_CLIENT_ONCE
        if self.caCerts:
            trust_store = ssl_ctx.get_cert_store()
            for trusted in self.caCerts:
                trust_store.add_cert(trusted)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # preverify_ok carries the result of OpenSSL's default verifier;
        # record a problem on failure and return that result untouched.
        if not preverify_ok:
            failure = OpenSSLVerifyError(cert, errno, depth)
            ssl_ctx.get_app_data().problems.append(failure)
        return preverify_ok

    ssl_ctx.set_verify(mode, _trackVerificationProblems)

    depth_limit = self.verifyDepth
    if depth_limit is not None:
        ssl_ctx.set_verify_depth(depth_limit)

    if self.enableSingleUseKeys:
        ssl_ctx.set_options(SSL.OP_SINGLE_DH_USE)

    if self.fixBrokenPeers:
        ssl_ctx.set_options(self._OP_ALL)

    if self.enableSessions:
        session_key = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        ssl_ctx.set_session_id(md5.md5(session_key).hexdigest())

    return ssl_ctx
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """Create an ``SSL.Context`` holding certificate and key file locations.

    :param key_file: private key file; ``cert_file`` doubles as the key file
        when this is not given.
    :param cert_file: certificate file to load.
    :param pem_file: CA file for ``load_verify_locations``.
    :param ca_dir: CA directory for ``load_verify_locations``.
    :param verify_peer: enable peer verification (depth 9) when true;
        otherwise the mode is ``SSL.VERIFY_NONE``.
    :param url: when verifying, verification is configured through
        ``set_peer_verification_for_url_hostname`` with this URL.
    :param method: protocol method constant for ``SSL.Context``.
    :param key_file_passphrase: passphrase handed out by the password
        callback set on the context.
    :return: the configured context.
    """
    ctx = SSL.Context(method)

    if cert_file:
        ctx.use_certificate_file(cert_file)

    if key_file_passphrase:
        # Install the callback ahead of loading the private key below.
        def _passphrase_cb(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase

        ctx.set_passwd_cb(_passphrase_cb)

    # The key file defaults to the certificate file if one is present.
    key_path = key_file or cert_file
    if key_path:
        ctx.use_privatekey_file(key_path)

    if pem_file or ca_dir:
        ctx.load_verify_locations(pem_file, ca_dir)

    def _default_verify(conn, x509, errnum, errdepth, preverify_ok):
        # No additional checks: the incoming status is returned unchanged.
        return preverify_ok

    if not verify_peer:
        ctx.set_verify(SSL.VERIFY_NONE, _default_verify)
        return ctx

    ctx.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(ctx, url)
    else:
        ctx.set_verify(SSL.VERIFY_PEER, _default_verify)
    return ctx
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """Return an ``SSL.Context`` set up with the given key/certificate files.

    :param key_file: private key file; if omitted, ``cert_file`` is loaded
        as the private key file when available.
    :param cert_file: certificate file to load.
    :param pem_file: CA file forwarded to ``load_verify_locations``.
    :param ca_dir: CA directory forwarded to ``load_verify_locations``.
    :param verify_peer: when true, verification depth is set to 9 and peer
        verification is enabled; when false, ``SSL.VERIFY_NONE`` is used.
    :param url: if given while verifying, verification is set up by
        ``set_peer_verification_for_url_hostname``.
    :param method: protocol method constant passed to ``SSL.Context``.
    :param key_file_passphrase: passphrase returned from the context's
        password callback.
    :return: the configured context.
    """
    new_context = SSL.Context(method)

    if cert_file:
        new_context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _return_passphrase(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase

        # Must be in place before use_privatekey_file is called.
        new_context.set_passwd_cb(_return_passphrase)

    # Without an explicit key file, fall back to the certificate file.
    key_source = key_file if key_file else cert_file
    if key_source:
        new_context.use_privatekey_file(key_source)

    if pem_file or ca_dir:
        new_context.load_verify_locations(pem_file, ca_dir)

    def _echo_status(conn, x509, errnum, errdepth, preverify_ok):
        # Does no checking itself; hands back the status it received.
        return preverify_ok

    if verify_peer:
        new_context.set_verify_depth(9)
        if not url:
            new_context.set_verify(SSL.VERIFY_PEER, _echo_status)
        else:
            set_peer_verification_for_url_hostname(new_context, url)
        return new_context

    new_context.set_verify(SSL.VERIFY_NONE, _echo_status)
    return new_context
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """Assemble an ``SSL.Context`` from certificate, key and CA settings.

    :param key_file: private key file; the certificate file is used as the
        key file when this is not supplied.
    :param cert_file: certificate file to load.
    :param pem_file: CA file given to ``load_verify_locations``.
    :param ca_dir: CA directory given to ``load_verify_locations``.
    :param verify_peer: true to verify the peer (with depth 9); false to
        set ``SSL.VERIFY_NONE``.
    :param url: when verifying, passed with the context to
        ``set_peer_verification_for_url_hostname``.
    :param method: protocol method constant used to create the context.
    :param key_file_passphrase: passphrase supplied through the context's
        password callback.
    :return: the configured context.
    """
    tls_context = SSL.Context(method)

    if cert_file:
        tls_context.use_certificate_file(cert_file)

    # Register the passphrase callback before any private key is loaded.
    if key_file_passphrase:
        def _give_passphrase(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase

        tls_context.set_passwd_cb(_give_passphrase)

    # The certificate file serves as the key file when none is named.
    chosen_key_file = key_file or cert_file
    if chosen_key_file:
        tls_context.use_privatekey_file(chosen_key_file)

    if pem_file or ca_dir:
        tls_context.load_verify_locations(pem_file, ca_dir)

    def _keep_preverify(conn, x509, errnum, errdepth, preverify_ok):
        # Returns the pre-verification status without further checks.
        return preverify_ok

    if not verify_peer:
        tls_context.set_verify(SSL.VERIFY_NONE, _keep_preverify)
        return tls_context

    tls_context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(tls_context, url)
    else:
        tls_context.set_verify(SSL.VERIFY_PEER, _keep_preverify)
    return tls_context
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _makeContext(self):
    """Create the context via ``self._contextFactory`` and configure it.

    Applies, in order: options and mode; certificate, private key and extra
    chain certificates (when both certificate and key are set);
    verification flags and callback; verify depth; session id; DH
    parameters; cipher list; EC curve (best effort); and acceptable
    protocols.

    :return: the configured context.
    """
    context = self._contextFactory(self.method)
    context.set_options(self._options)
    context.set_mode(self._mode)

    if not (self.certificate is None or self.privateKey is None):
        context.use_certificate(self.certificate)
        context.use_privatekey(self.privateKey)
        for chainCert in self.extraCertChain:
            context.add_extra_chain_cert(chainCert)
        # Sanity check
        context.check_privatekey()

    flags = SSL.VERIFY_NONE
    if self.verify:
        flags = SSL.VERIFY_PEER
        if self.requireCertificate:
            flags = flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            flags = flags | SSL.VERIFY_CLIENT_ONCE
        self.trustRoot._addCACertsToContext(context)

    # pyOpenSSL does not accept None as the callback (although the
    # underlying OpenSSL call allows NULL), so a function that simply hands
    # back the pre-verification result stands in for it.
    def _verifyCallback(conn, cert, errno, depth, preverify_ok):
        return preverify_ok

    context.set_verify(flags, _verifyCallback)

    depthLimit = self.verifyDepth
    if depthLimit is not None:
        context.set_verify_depth(depthLimit)

    if self.enableSessions:
        rawName = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        digest = md5(networkString(rawName)).hexdigest()
        context.set_session_id(digest.encode('ascii'))

    if self.dhParameters:
        context.load_tmp_dh(self.dhParameters._dhFile.path)

    context.set_cipher_list(self._cipherString.encode('ascii'))

    if self._ecCurve is not None:
        try:
            self._ecCurve.addECKeyToContext(context)
        except BaseException:
            # ECDHE support is best effort only.
            pass

    if self._acceptableProtocols:
        # Try to set NPN and ALPN. _acceptableProtocols cannot be set by
        # the constructor unless at least one mechanism is supported.
        _setAcceptableProtocols(context, self._acceptableProtocols)

    return context