Python ssl 模块,CERT_OPTIONAL 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ssl.CERT_OPTIONAL

项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:http2    作者:mSOHU    | 项目源码 | 文件源码
def _verify_cert(self, peer_cert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        verify_mode = self.ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self.host is None:
            return True

        if peer_cert is None and verify_mode == ssl.CERT_REQUIRED:
            logger.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peer_cert, self.host)
        except SSLCertificateError:
            logger.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:knowledge-management    作者:dgore7    | 项目源码 | 文件源码
def create_secure_socket(self, cert_dir, host='localhost', port=8001):
        context = ssl.SSLContext() # Defaults to SSL/TLS support with PROTOCOL_TLS (best for now for compatibility)
        context.verify_mode = ssl.CERT_OPTIONAL # ssl.CERT_REQUIRED is more secure
        context.check_hostname = False # Hostname verification on certs (Dont want for now)
        context.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH) # Load the public CA certs for the server socket (need CLIENT_AUTH param)
        self.generate_server_self_cert(cert_dir)
        context.load_cert_chain(join(cert_dir, "KnowledgeManagement.crt"), keyfile=join(cert_dir, "KnowledgeManagement.key"))

        # Create the secure socket that will be listening for connections.
        # Note that the SSL handshake is NOT performed upon connection so the server can securely transfer the cert
        # if required.
        self.server_proc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_proc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        secureSocket = context.wrap_socket(self.server_proc, server_side=True)
        secureSocket.bind((host, port))
        print("SSL Server started on {}:{}".format(host, port))

        #self.server_negotiate_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        #self.server_negotiate_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        #self.server_negotiate_sock.bind((host, port+1))
        self.is_listening=True
        return secureSocket
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def run(self, handler):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.load_cert_chain(handler.certificate_file, handler.private_key_file)
        context.load_verify_locations(cafile=handler.certificate_file)
        context.load_verify_locations(cafile=handler.crl_file)
        context.options &= ssl.OP_NO_SSLv3
        context.options &= ssl.OP_NO_SSLv2
        context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        context.verify_mode = ssl.CERT_OPTIONAL
        self.options['ssl_context'] = context

        logger.info('Starting server on host %s port %d.', self.host, self.port)

        server = pywsgi.WSGIServer(
            (self.host, self.port),
            handler,
            ssl_context=context,
            handler_class=RequestHandler,
            log=logger,
            error_log=logger,
        )
        server.serve_forever()
项目:thingflow-python    作者:mpi-sws-rse    | 项目源码 | 文件源码
def _connect(self):
        if self.server_tls:
            raise Exception("TBD")
            print(self.client.tls_set(self.server_tls.server_cert, cert_reqs=ssl.CERT_OPTIONAL))
            print(self.client.connect(self.host, self.port))
        else:
            self.client.connect(self.host, self.port) 
            self.client.subscribe(self.topics)

        def on_connect(client, userdata, flags, rc):
            print("Connected with result code "+str(rc))
        self.client.on_connect = on_connect

        def on_publish(client, userdata, mid):
            print("Successfully published mid %d" % mid)
        self.client.on_publish = on_publish
项目:web3py    作者:web2py    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:CSC376KnowledgeManagement    作者:WCotterman    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:PopRank    作者:SanketMore    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:aredis    作者:NoneGG    | 项目源码 | 文件源码
def __init__(self, keyfile=None, certfile=None,
                 cert_reqs=None, ca_certs=None):
        self.keyfile = keyfile
        self.certfile = certfile
        if cert_reqs is None:
            self.cert_reqs = ssl.CERT_NONE
        elif isinstance(cert_reqs, str):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    cert_reqs)
            self.cert_reqs = CERT_REQS[cert_reqs]
        self.ca_certs = ca_certs
        self.context = None
项目:aredis    作者:NoneGG    | 项目源码 | 文件源码
def test_cert_reqs_options(self):
        import ssl
        with pytest.raises(TypeError) as e:
            pool = aredis.ConnectionPool.from_url(
                'rediss://?ssl_cert_reqs=none&ssl_keyfile=test')
            assert e.message == 'certfile should be a valid filesystem path'
            assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_NONE

        with pytest.raises(TypeError) as e:
            pool = aredis.ConnectionPool.from_url(
                'rediss://?ssl_cert_reqs=optional&ssl_keyfile=test')
            assert e.message == 'certfile should be a valid filesystem path'
            assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_OPTIONAL

        with pytest.raises(TypeError) as e:
            pool = aredis.ConnectionPool.from_url(
                'rediss://?ssl_cert_reqs=required&ssl_keyfile=test')
            assert e.message == 'certfile should be a valid filesystem path'
            assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_REQUIRED
项目:lambda-examples    作者:GalacticFog    | 项目源码 | 文件源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        if ssl_cert_reqs is None:
            ssl_cert_reqs = ssl.CERT_NONE
        elif isinstance(ssl_cert_reqs, basestring):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if ssl_cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        try:
            if self.clientcert_req:
                ca_certs = self.interface[4]
                cert_reqs = ssl.CERT_OPTIONAL
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       cert_reqs=cert_reqs,
                                       ca_certs=ca_certs,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
            else:
                sock = ssl.wrap_socket(sock,
                                       keyfile=self.interface[2],
                                       certfile=self.interface[3],
                                       server_side=True,
                                       ssl_version=ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
        """Validate the cert reqs are valid. It must be None or one of the
        three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
        ``ssl.CERT_REQUIRED``.
        """
        if value is None:
            return value
        elif isinstance(value, string_type) and hasattr(ssl, value):
            value = getattr(ssl, value)

        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED" % (option,))
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def __get_verify_mode(self):
        """Whether to try to verify other peers' certificates and how to
        behave if verification fails. This attribute must be one of
        ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
        """
        return self._verify_mode
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
    """Validate the cert reqs are valid. It must be None or one of the three
    values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
    if value is None:
        return value
    if HAS_SSL:
        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    else:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
    """Validate the cert reqs are valid. It must be None or one of the three
    values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
    if value is None:
        return value
    if HAS_SSL:
        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    else:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
    """Validate the cert reqs are valid. It must be None or one of the three
    values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
    if value is None:
        return value
    if HAS_SSL:
        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    else:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
    """Validate the cert reqs are valid. It must be None or one of the three
    values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
    if value is None:
        return value
    if HAS_SSL:
        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    else:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))
项目:Bethesda_SublimeFO4    作者:Scrivener07    | 项目源码 | 文件源码
def set_cert(self, key_file=None, cert_file=None,
                 cert_reqs='CERT_NONE', ca_certs=None):
        ssl_req_scheme = {
            'CERT_NONE': ssl.CERT_NONE,
            'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
            'CERT_REQUIRED': ssl.CERT_REQUIRED
        }

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = ssl_req_scheme.get(cert_reqs) or ssl.CERT_NONE
        self.ca_certs = ca_certs
项目:PyS60-Projects    作者:gauravssnl    | 项目源码 | 文件源码
def set_cert(self, key_file=None, cert_file=None,
                 cert_reqs='CERT_NONE', ca_certs=None):
        ssl_req_scheme = {
            'CERT_NONE': ssl.CERT_NONE,
            'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
            'CERT_REQUIRED': ssl.CERT_REQUIRED
        }

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = ssl_req_scheme.get(cert_reqs) or ssl.CERT_NONE
        self.ca_certs = ca_certs
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
        """Validate the cert reqs are valid. It must be None or one of the
        three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
        ``ssl.CERT_REQUIRED``.
        """
        if value is None:
            return value
        elif isinstance(value, string_type) and hasattr(ssl, value):
            value = getattr(ssl, value)

        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED`" % (option,))
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def __get_verify_mode(self):
        """Whether to try to verify other peers' certificates and how to
        behave if verification fails. This attribute must be one of
        ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
        """
        return self._verify_mode
项目:websearch    作者:abelkhan    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
    """Validate the cert reqs are valid. It must be None or one of the three
    values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
    if value is None:
        return value
    if HAS_SSL:
        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    else:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True
项目:thingflow-python    作者:mpi-sws-rse    | 项目源码 | 文件源码
def _connect(self):
        if self.server_tls:
            raise Exception("TBD")
            print(self.client.tls_set(self.server_tls.server_cert, cert_reqs=ssl.CERT_OPTIONAL))
            print(self.client.connect(self.host, self.port))
        else:
            self.client.connect(self.host, self.port) 

        def on_connect(client, userdata, flags, rc):
            print("Connected with result code "+str(rc))

            # Subscribing in on_connect() means that if we lose the connection and
            # reconnect then subscriptions will be renewed.
            client.subscribe(self.topics)
        self.client.on_connect = on_connect
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError:
            gen_log.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError:
            gen_log.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError:
            gen_log.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
        """Validate the cert reqs are valid. It must be None or one of the
        three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
        ``ssl.CERT_REQUIRED``.
        """
        if value is None:
            return value
        elif isinstance(value, string_type) and hasattr(ssl, value):
            value = getattr(ssl, value)

        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED" % (option,))
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def __get_verify_mode(self):
        """Whether to try to verify other peers' certificates and how to
        behave if verification fails. This attribute must be one of
        ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
        """
        return self._verify_mode
项目:deb-python-kafka    作者:openstack    | 项目源码 | 文件源码
def _wrap_ssl(self):
        assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
        if self._ssl_context is None:
            log.debug('%s: configuring default SSL Context', self)
            self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv2  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv3  # pylint: disable=no-member
            self._ssl_context.verify_mode = ssl.CERT_OPTIONAL
            if self.config['ssl_check_hostname']:
                self._ssl_context.check_hostname = True
            if self.config['ssl_cafile']:
                log.info('%s: Loading SSL CA from %s', self, self.config['ssl_cafile'])
                self._ssl_context.load_verify_locations(self.config['ssl_cafile'])
                self._ssl_context.verify_mode = ssl.CERT_REQUIRED
            if self.config['ssl_certfile'] and self.config['ssl_keyfile']:
                log.info('%s: Loading SSL Cert from %s', self, self.config['ssl_certfile'])
                log.info('%s: Loading SSL Key from %s', self, self.config['ssl_keyfile'])
                self._ssl_context.load_cert_chain(
                    certfile=self.config['ssl_certfile'],
                    keyfile=self.config['ssl_keyfile'],
                    password=self.config['ssl_password'])
            if self.config['ssl_crlfile']:
                if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
                    error = 'No CRL support with this version of Python.'
                    log.error('%s: %s Disconnecting.', self, error)
                    self.close(Errors.ConnectionError(error))
                    return
                log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
                self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
                # pylint: disable=no-member
                self._ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        log.debug('%s: wrapping socket in ssl context', self)
        try:
            self._sock = self._ssl_context.wrap_socket(
                self._sock,
                server_hostname=self.hostname,
                do_handshake_on_connect=False)
        except ssl.SSLError as e:
            log.exception('%s: Failed to wrap socket in SSLContext!', self)
            self.close(e)
项目:get_started_with_respeaker    作者:respeaker    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError:
            gen_log.warning("Invalid SSL certificate", exc_info=True)
            return False
        else:
            return True
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
        """Validate the cert reqs are valid. It must be None or one of the
        three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
        ``ssl.CERT_REQUIRED``.
        """
        if value is None:
            return value
        elif isinstance(value, string_type) and hasattr(ssl, value):
            value = getattr(ssl, value)

        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED`" % (option,))
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def __get_verify_mode(self):
        """Whether to try to verify other peers' certificates and how to
        behave if verification fails. This attribute must be one of
        ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
        """
        return self._verify_mode
项目:teleport    作者:eomsoft    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def validate_cert_reqs(option, value):
        """Validate the cert reqs are valid. It must be None or one of the
        three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
        ``ssl.CERT_REQUIRED``.
        """
        if value is None:
            return value
        elif isinstance(value, string_type) and hasattr(ssl, value):
            value = getattr(ssl, value)

        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED" % (option,))
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def __get_verify_mode(self):
        """Whether to try to verify other peers' certificates and how to
        behave if verification fails. This attribute must be one of
        ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
        """
        return self._verify_mode
项目:projects-2017-2    作者:ncss    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True