Python ssl 模块,html() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用ssl.html()

项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def close(self, code=1000, reason=''):
        """
        Call this method to initiate the websocket connection
        closing by sending a close frame to the connected peer.
        The ``code`` is the status code representing the
        termination's reason.

        Once this method is called, the ``server_terminated``
        attribute is set. Calling this method several times is
        safe as the closing frame will be sent only the first
        time.

        .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1
        """
        if not self.server_terminated:
            self.server_terminated = True
            try:
                self._write(self.stream.close(code=code, reason=reason).single(mask=self.stream.always_mask))
            except Exception as ex:
                logger.error("Error when terminating the connection: %s", str(ex))
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def get_ssl_certificate(self, binary_form=False):
        """Returns the client's SSL certificate, if any.

        To use client certificates, the HTTPServer must have been constructed
        with cert_reqs set in ssl_options, e.g.::

            server = HTTPServer(app,
                ssl_options=dict(
                    certfile="foo.crt",
                    keyfile="foo.key",
                    cert_reqs=ssl.CERT_REQUIRED,
                    ca_certs="cacert.crt"))

        By default, the return value is a dictionary (or None, if no
        client certificate is present).  If ``binary_form`` is true, a
        DER-encoded form of the certificate is returned instead.  See
        SSLSocket.getpeercert() in the standard library for more
        details.
        http://docs.python.org/library/ssl.html#sslsocket-objects
        """
        try:
            return self.connection.stream.socket.getpeercert(
                binary_form=binary_form)
        except ssl.SSLError:
            return None
项目:get_started_with_respeaker    作者:respeaker    | 项目源码 | 文件源码
def get_ssl_certificate(self, binary_form=False):
        """Returns the client's SSL certificate, if any.

        To use client certificates, the HTTPServer must have been constructed
        with cert_reqs set in ssl_options, e.g.::

            server = HTTPServer(app,
                ssl_options=dict(
                    certfile="foo.crt",
                    keyfile="foo.key",
                    cert_reqs=ssl.CERT_REQUIRED,
                    ca_certs="cacert.crt"))

        By default, the return value is a dictionary (or None, if no
        client certificate is present).  If ``binary_form`` is true, a
        DER-encoded form of the certificate is returned instead.  See
        SSLSocket.getpeercert() in the standard library for more
        details.
        http://docs.python.org/library/ssl.html#sslsocket-objects
        """
        try:
            return self.connection.stream.socket.getpeercert(
                binary_form=binary_form)
        except ssl.SSLError:
            return None
项目:GotoX    作者:SeaHOH    | 项目源码 | 文件源码
def __init__(self, max_window=4, timeout=8, proxy='', ssl_ciphers=None, max_retry=2):
        # http://docs.python.org/dev/library/ssl.html
        # http://blog.ivanristic.com/2009/07/examples-of-the-information-collected-from-ssl-handshakes.html
        # http://src.chromium.org/svn/trunk/src/net/third_party/nss/ssl/sslenum.c
        # http://www.openssl.org/docs/apps/ciphers.html
        # openssl s_server -accept 443 -key CA.crt -cert CA.crt
        # set_ciphers as Modern Browsers
        BaseHTTPUtil.__init__(self, GC.LINK_OPENSSL, os.path.join(cert_dir, 'cacerts'), ssl_ciphers)
        self.max_window = max_window
        self.max_retry = max_retry
        self.timeout = timeout
        self.proxy = proxy
        self.tcp_connection_time = LRUCache(512 if self.gws else 4096)
        self.ssl_connection_time = LRUCache(512 if self.gws else 4096)

        if self.gws and GC.GAE_ENABLEPROXY:
            self.gws_front_connection_time = LRUCache(128)
            self.gws_front_connection_time.set('ip', LRUCache(128), noexpire=True)
            self.create_ssl_connection = self.create_gws_connection_withproxy

        #if self.proxy:
        #    dns_resolve = self.__dns_resolve_withproxy
        #    self.create_connection = self.__create_connection_withproxy
        #    self.create_ssl_connection = self.__create_ssl_connection_withproxy
项目:aws-iot-device-sdk-python    作者:aws    | 项目源码 | 文件源码
def set_cert_credentials_provider(self, cert_credentials_provider):
        # History issue from Yun SDK where AR9331 embedded Linux only have Python 2.7.3
        # pre-installed. In this version, TLSv1_2 is not even an option.
        # SSLv23 is a work-around which selects the highest TLS version between the client
        # and service. If user installs opensslv1.0.1+, this option will work fine for Mutual
        # Auth.
        # Note that we cannot force TLSv1.2 for Mutual Auth. in Python 2.7.3 and TLS support
        # in Python only starts from Python2.7.
        # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
        if self._use_wss:
            ca_path = cert_credentials_provider.get_ca_path()
            self._paho_client.tls_set(ca_certs=ca_path, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
        else:
            ca_path = cert_credentials_provider.get_ca_path()
            cert_path = cert_credentials_provider.get_cert_path()
            key_path = cert_credentials_provider.get_key_path()
            self._paho_client.tls_set(ca_certs=ca_path,certfile=cert_path, keyfile=key_path,
                                      cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
项目:PyKI    作者:pykiki    | 项目源码 | 文件源码
def index():
    '''
        Index page for listing all possibilities
    '''

    htmlpage = '''
    <html>
        <HEAD>
            <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
        </HEAD>
        <BODY>
            <div class='container'>
                <h1>
                    Welcome to flask tests, here are links to test func calls:
                </h1>
                <p>
                    <a href='https://"+lhost+":"+str(lport)+"/listCert'> List of certificate in the pki database </a>
                </p>
                <p>
                    <a href='https://"+lhost+":"+str(lport)+"/help'> PKI documentation</a>
                </p>
            </div>

        </BODY>
    </html>
    '''
    # allow us to modify headers
    resp = make_response(htmlpage)
    resp.headers['Server'] = 'PyKI amaibach API'
    return(resp)
项目:PyKI    作者:pykiki    | 项目源码 | 文件源码
def help():
    '''
        Documentation page
    '''

    info = open(curScriptDir + "/help.html", 'rt')
    helpContent = info.read()
    info.close()

    # allow us to modify headers
    resp = make_response(helpContent)
    resp.headers['Server'] = 'PyKI amaibach API'

    return(resp)
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def closed(self, code, reason=None):
        """
        Called  when the websocket stream and connection are finally closed.
        The provided ``code`` is status set by the other point and
        ``reason`` is a human readable message.

        .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1
        """
        pass
项目:dcos-rabbitmq    作者:sheepkiller    | 项目源码 | 文件源码
def get_cluster_version(dcos_url):
    """Given a cluster url, return its version string, such as 1.7, 1.8,
    1.8.8, 1.9, 1.9-dev, etc"""
    version_url = "%s/%s" % (dcos_url, "dcos-metadata/dcos-version.json")
    # Since our test clusters have weird certs that won't validate, turn off
    # validation
    try:
        # this will handle a variety of versions, including some 2.7.x
        # (haven't investigated which) and all recent 3.x
        noverify_context = ssl.SSLContext()
    except:
        if hasattr(ssl, 'PROTOCOL_TLS'):
            # 2.7.13
            noverify_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        elif hasattr(ssl, 'OP_NO_SSLv2'):
            # 2.7.9-2.7.12
            logger.warn("Old python; Asking for something better than sslv2 or 3")
            noverify_context =  ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            # explicitly switch these off.  This is recommended for ancient
            # versions as per https://docs.python.org/2/library/ssl.html#protocol-versions
            noverify_context.options |=  ssl.OP_NO_SSLv2
            noverify_context.options |=  ssl.OP_NO_SSLv3
        else:
            # before 2.7.9
            logger.error("*VERY* old python.  Very weak/unsafe encryption ahoy.")
            noverify_context =  ssl.SSLContext(ssl.PROTOCOL_SSLv23)

    response = urllib.request.urlopen(version_url, context=noverify_context)
    encoding = 'utf-8' # default
    try:
        #python3
        provided_encoding = response.headers.get_content_charset()
        if provided_encoding:
            encoding =  provided_encoding
    except:
        pass
    json_s = response.read().decode(encoding)
    ver_s = json.loads(json_s)['version']
    return ver_s
项目:elasticsplunk    作者:brunotm    | 项目源码 | 文件源码
def create_ssl_context(**kwargs):
    """
    A helper function around creating an SSL context

    https://docs.python.org/3/library/ssl.html#context-creation

    Accepts kwargs in the same manner as `create_default_context`.
    """
    ctx = ssl.create_default_context(**kwargs)
    return ctx
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def connect(self, keepAliveInterval=30):
        if keepAliveInterval is None :
            self._log.error("connect: None type inputs detected.")
            raise TypeError("None type inputs detected.")
        if not isinstance(keepAliveInterval, int):
            self._log.error("connect: Wrong input type detected. KeepAliveInterval must be an integer.")
            raise TypeError("Non-integer type inputs detected.")
        # Return connect succeeded/failed
        ret = False
        # TLS configuration
        if self._useWebsocket:
            # History issue from Yun SDK where AR9331 embedded Linux only have Python 2.7.3 
            # pre-installed. In this version, TLSv1_2 is not even an option.
            # SSLv23 is a work-around which selects the highest TLS version between the client
            # and service. If user installs opensslv1.0.1+, this option will work fine for Mutal
            # Auth.
            # Note that we cannot force TLSv1.2 for Mutual Auth. in Python 2.7.3 and TLS support
            # in Python only starts from Python2.7.
            # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
            self._pahoClient.tls_set(ca_certs=self._cafile, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
            self._log.info("Connection type: Websocket")
        else:
            self._pahoClient.tls_set(self._cafile, self._cert, self._key, ssl.CERT_REQUIRED, ssl.PROTOCOL_SSLv23)  # Throw exception...
            self._log.info("Connection type: TLSv1.2 Mutual Authentication")
        # Connect
        self._pahoClient.connect(self._host, self._port, keepAliveInterval)  # Throw exception...
        self._pahoClient.loop_start()
        TenmsCount = 0
        while(TenmsCount != self._connectdisconnectTimeout * 100 and self._connectResultCode == sys.maxsize):
            TenmsCount += 1
            time.sleep(0.01)
        if(self._connectResultCode == sys.maxsize):
            self._log.error("Connect timeout.")
            self._pahoClient.loop_stop()
            raise connectTimeoutException()
        elif(self._connectResultCode == 0):
            ret = True
            self._log.info("Connected to AWS IoT.")
            self._log.debug("Connect time consumption: " + str(float(TenmsCount) * 10) + "ms.")
        else:
            self._log.error("A connect error happened: " + str(self._connectResultCode))
            self._pahoClient.loop_stop()
            raise connectError(self._connectResultCode)
        return ret
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def _get_from_pending(self):
        """
        The SSL socket object provides the same interface
        as the socket interface but behaves differently.

        When data is sent over a SSL connection
        more data may be read than was requested from by
        the ws4py websocket object.

        In that case, the data may have been indeed read
        from the underlying real socket, but not read by the
        application which will expect another trigger from the
        manager's polling mechanism as if more data was still on the
        wire. This will happen only when new data is
        sent by the other peer which means there will be
        some delay before the initial read data is handled
        by the application.

        Due to this, we have to rely on a non-public method
        to query the internal SSL socket buffer if it has indeed
        more data pending in its buffer.

        Now, some people in the Python community
        `discourage <https://bugs.python.org/issue21430>`_
        this usage of the ``pending()`` method because it's not
        the right way of dealing with such use case. They advise
        `this approach <https://docs.python.org/dev/library/ssl.html#notes-on-non-blocking-sockets>`_
        instead. Unfortunately, this applies only if the
        application can directly control the poller which is not
        the case with the WebSocket abstraction here.

        We therefore rely on this `technic <http://stackoverflow.com/questions/3187565/select-and-ssl-in-python>`_
        which seems to be valid anyway.

        This is a bit of a shame because we have to process
        more data than what wanted initially.
        """
        data = b""
        pending = self.sock.pending()
        while pending:
            data += self.sock.recv(pending)
            pending = self.sock.pending()
        return data
项目:cloudcam    作者:revmischa    | 项目源码 | 文件源码
def connect(self, keepAliveInterval=30):
        if keepAliveInterval is None :
            self._log.error("connect: None type inputs detected.")
            raise TypeError("None type inputs detected.")
        if not isinstance(keepAliveInterval, int):
            self._log.error("connect: Wrong input type detected. KeepAliveInterval must be an integer.")
            raise TypeError("Non-integer type inputs detected.")
        # Return connect succeeded/failed
        ret = False
        # TLS configuration
        if self._useWebsocket:
            # History issue from Yun SDK where AR9331 embedded Linux only have Python 2.7.3 
            # pre-installed. In this version, TLSv1_2 is not even an option.
            # SSLv23 is a work-around which selects the highest TLS version between the client
            # and service. If user installs opensslv1.0.1+, this option will work fine for Mutal
            # Auth.
            # Note that we cannot force TLSv1.2 for Mutual Auth. in Python 2.7.3 and TLS support
            # in Python only starts from Python2.7.
            # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
            self._pahoClient.tls_set(ca_certs=self._cafile, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
            self._log.info("Connection type: Websocket")
        else:
            self._pahoClient.tls_set(self._cafile, self._cert, self._key, ssl.CERT_REQUIRED, ssl.PROTOCOL_SSLv23)  # Throw exception...
            self._log.info("Connection type: TLSv1.2 Mutual Authentication")
        # Connect
        self._pahoClient.connect(self._host, self._port, keepAliveInterval)  # Throw exception...
        self._pahoClient.loop_start()
        TenmsCount = 0
        while(TenmsCount != self._connectdisconnectTimeout * 100 and self._connectResultCode == sys.maxsize):
            TenmsCount += 1
            time.sleep(0.01)
        if(self._connectResultCode == sys.maxsize):
            self._log.error("Connect timeout.")
            self._pahoClient.loop_stop()
            raise connectTimeoutException()
        elif(self._connectResultCode == 0):
            ret = True
            self._log.info("Connected to AWS IoT.")
            self._log.debug("Connect time consumption: " + str(float(TenmsCount) * 10) + "ms.")
        else:
            self._log.error("A connect error happened: " + str(self._connectResultCode))
            self._pahoClient.loop_stop()
            raise connectError(self._connectResultCode)
        return ret