Python OpenSSL.SSL 模块,Context() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用OpenSSL.SSL.Context()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, privateKeyFileName, certificateFileName,
                 sslmethod=SSL.SSLv23_METHOD, _contextFactory=SSL.Context):
        """
        Remember the key/certificate locations and build the context eagerly.

        @param privateKeyFileName: Name of a file containing a private key
        @param certificateFileName: Name of a file containing a certificate
        @param sslmethod: The SSL method to use
        @param _contextFactory: Callable stored on the instance as
            C{_contextFactory}; defaults to C{SSL.Context}.
        """
        self._contextFactory = _contextFactory
        self.sslmethod = sslmethod
        self.certificateFileName = certificateFileName
        self.privateKeyFileName = privateKeyFileName

        # Building the context immediately makes bad parameters fail here,
        # at construction time, instead of at first use.
        self.cacheContext()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self, server_address, HandlerClass, logRequests=True):
        """Secure XML-RPC server.

        It is very similar to SimpleXMLRPCServer but it uses HTTPS for
        transporting XML data.

        server_address and HandlerClass are handed to
        SocketServer.BaseServer.__init__; logRequests is stored on the
        instance.  The private key and certificate are read from the
        KEYFILE and CERTFILE paths.
        """
        self.logRequests = logRequests

        try:
            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
        except TypeError:
            # Python 2.5 changed the dispatcher's __init__ to take
            # (self, allow_none, encoding); retry with that signature.
            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, False, None)
        SocketServer.BaseServer.__init__(self, server_address, HandlerClass)

        # Replace the plain listening socket with an SSL-wrapped one.
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.use_privatekey_file(KEYFILE)
        ctx.use_certificate_file(CERTFILE)
        self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
                                                        self.socket_type))
        self.server_bind()
        self.server_activate()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test03_ssl_verification_of_peer_fails(self):
        """A request over a context with no usable CA certs raises SSL.Error."""
        def log_and_pass_through(conn, x509, errnum, errdepth, preverify_ok):
            # Log the certificate subject and keep OpenSSL's own verdict.
            log.debug('SSL peer certificate verification failed for %r',
                      x509.get_subject())
            return preverify_ok

        context = SSL.Context(SSL.TLSv1_METHOD)
        context.set_verify(SSL.VERIFY_PEER, log_and_pass_through)
        context.set_verify_depth(9)

        # Set bad location - unit test dir has no CA certs to verify with
        context.load_verify_locations(None, Constants.UNITTEST_DIR)

        connection = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                                     ssl_context=context)
        connection.connect()
        self.assertRaises(SSL.Error, connection.request, 'GET', '/')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test04_ssl_verification_with_subj_alt_name(self):
        """Fetch '/' using a ServerSSLCertVerification callback for 'localhost'."""
        context = SSL.Context(SSL.TLSv1_METHOD)

        verifier = ServerSSLCertVerification(hostname='localhost')
        context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
        context.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        context.load_verify_locations(None, Constants.CACERT_DIR)

        connection = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                                     ssl_context=context)
        connection.connect()
        connection.request('GET', '/')
        response = connection.getresponse()
        print('Response = %s' % response.read())
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test04_ssl_verification_with_subj_common_name(self):
        """Fetch '/' with subject-alt-name matching switched off."""
        context = SSL.Context(SSL.TLSv1_METHOD)

        # Explicitly set verification of peer hostname using peer certificate
        # subject common name
        verifier = ServerSSLCertVerification(hostname='localhost',
                                             subj_alt_name_match=False)
        context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
        context.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        context.load_verify_locations(None, Constants.CACERT_DIR)

        connection = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                                     ssl_context=context)
        connection.connect()
        connection.request('GET', '/')
        response = connection.getresponse()
        print('Response = %s' % response.read())
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def connect(self):
        """Open a TCP connection to the peer and wrap it in an SSL socket.

        Uses the instance's ``ssl_context`` when one is set (it must be an
        OpenSSL.SSL.Context, otherwise TypeError is raised); otherwise a
        fresh context is built from the class's ``default_ssl_method``.
        """
        configured = getattr(self, 'ssl_context', None)
        if not configured:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)
        elif isinstance(configured, SSL.Context):
            ssl_context = configured
        else:
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" attribute; got %r instead' %
                            configured)

        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
        if getattr(self, '_tunnel_host', None):
            self.sock = raw_sock
            self._tunnel()

        # Wrap the connected socket and switch it to client mode.
        tls_sock = SSLSocket(ssl_context, raw_sock)
        self.sock = tls_sock
        tls_sock.set_connect_state()
项目:midip-sslyze    作者:soukupa5    | 项目源码 | 文件源码
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
        """Check whether the server accepts one cipher suite over DTLS.

        Intended to be run from worker threads.  Returns an AcceptCipher
        instance when the handshake succeeds and a RejectCipher instance
        (carrying the error message) when it fails with SSL.Error.

            Args:
            server_connectivity_info (ServerConnectivityInfo): provides the ip_address to connect to
            dtls_version: protocol method passed to SSL.Context
            cipher (str): OpenSSL name of the cipher suite; also the key into TLS_OPENSSL_TO_RFC_NAMES_MAPPING
            port (int): port number to connect to.
        """
        cnx = SSL.Context(dtls_version)
        cnx.set_cipher_list(cipher)
        conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
        try:
            conn.connect((server_connectivity_info.ip_address, port))
            conn.do_handshake()
        except SSL.Error as e:
            # Use e.args rather than subscripting the exception (which only
            # works on Python 2).  The nested [0][0][2] lookup expects a list
            # of error tuples in args[0]; SSL.Error subclasses with another
            # args shape fall back to the plain string form.
            try:
                error_msg = e.args[0][0][2]
            except (IndexError, TypeError):
                error_msg = str(e)
            cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
        else:
            cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
        finally:
            conn.shutdown()
            conn.close()
        return cipher_result
项目:midip-sslyze    作者:soukupa5    | 项目源码 | 文件源码
def test_dtls_protocol_support(self, server_connectivity_info, dtls_version, port):
        """Test whether the server supports a DTLS protocol version.

        Returns True when the handshake succeeds and False when it fails
        with SSL.SysCallError for any reason other than a refused connection.

            Args:
            server_connectivity_info (ServerConnectivityInfo): provides the ip_address to connect to
            dtls_version: protocol method passed to SSL.Context
            port (int): port number to connect to.

            Raises:
            ValueError: when the connection is refused (wrong port).
        """
        import errno

        cnx = SSL.Context(dtls_version)
        cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
        conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
        try:
            conn.connect((server_connectivity_info.ip_address, port))
            conn.do_handshake()
        except SSL.SysCallError as ex:
            # Read the error number from ex.args (exceptions are not
            # subscriptable on Python 3) and compare against the platform's
            # ECONNREFUSED instead of the Linux-specific literal 111.
            if ex.args and ex.args[0] == errno.ECONNREFUSED:
                raise ValueError('LuckyThirteenVulnerabilityTesterPlugin: It is entered wrong port for DTLS connection.')
            else:
                support = False
        else:
            support = True
        finally:
            conn.shutdown()
            conn.close()
        return support
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return it as an SSLObject.

    The certificate and private key files are loaded into the context when
    given.  Peer verification is set to VERIFY_NONE.  The socket's timeout
    is copied to the wrapper, and the handshake is performed immediately
    when the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # Read the timeout before unwrapping to the underlying socket object.
    original_timeout = sock.gettimeout()
    sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, sock))
    wrapped.settimeout(original_timeout)

    try:
        sock.getpeername()
    except Exception:
        # no connection yet, so skip the handshake
        return wrapped
    # already connected: do the handshake now
    wrapped.do_handshake()
    return wrapped
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return it as an SSLObject.

    The certificate and private key files are loaded into the context when
    given.  Peer verification is set to VERIFY_NONE.  The socket's timeout
    is copied to the wrapper, and the handshake is performed immediately
    when the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # Read the timeout before unwrapping to the underlying socket object.
    original_timeout = sock.gettimeout()
    sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, sock))
    wrapped.settimeout(original_timeout)

    try:
        sock.getpeername()
    except Exception:
        # no connection yet, so skip the handshake
        return wrapped
    # already connected: do the handshake now
    wrapped.do_handshake()
    return wrapped
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def main():
    """
    Connect to an SNI-enabled server and request a specific hostname, specified
    by argv[1], of it.
    """
    if len(argv) < 2:
        print 'Usage: %s <hostname>' % (argv[0],)
        return 1

    client = socket()

    print 'Connecting...',
    stdout.flush()
    client.connect(('127.0.0.1', 8443))
    print 'connected', client.getpeername()

    client_ssl = Connection(Context(TLSv1_METHOD), client)
    client_ssl.set_connect_state()
    client_ssl.set_tlsext_host_name(argv[1])
    client_ssl.do_handshake()
    print 'Server subject is', client_ssl.get_peer_certificate().get_subject()
    client_ssl.close()
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def main():
    """
    Run an SNI-enabled server which selects between a few certificates in a
    C{dict} based on the handshake request it receives from a client.
    """
    port = socket()
    port.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    port.bind(('', 8443))
    port.listen(3)

    print 'Accepting...',
    stdout.flush()
    server, addr = port.accept()
    print 'accepted', addr

    server_context = Context(TLSv1_METHOD)
    server_context.set_tlsext_servername_callback(pick_certificate)

    server_ssl = Connection(server_context, server)
    server_ssl.set_accept_state()
    server_ssl.do_handshake()
    server.close()
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def __init__(self, server_address, RequestHandlerClass):
        """Set up the server on an SSL-wrapped listening socket.

        The private key and certificate are read from 'server.pkey' and
        'server.cert' relative to os.curdir.
        """
        SocketServer.BaseServer.__init__(
            self, server_address, RequestHandlerClass
        )

        # Same as normal, but make it secure:
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.set_options(SSL.OP_NO_SSLv2)

        base_dir = os.curdir
        key_path = os.path.join(base_dir, 'server.pkey')
        cert_path = os.path.join(base_dir, 'server.cert')
        context.use_privatekey_file(key_path)
        context.use_certificate_file(cert_path)

        plain_socket = socket.socket(self.address_family, self.socket_type)
        self.socket = SSLWrapper(SSL.Connection(context, plain_socket))
        self.server_bind()
        self.server_activate()
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return it as an SSLObject.

    The certificate and private key files are loaded into the context when
    given.  Peer verification is set to VERIFY_NONE.  The socket's timeout
    is copied to the wrapper, and the handshake is performed immediately
    when the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # Read the timeout before unwrapping to the underlying socket object.
    original_timeout = sock.gettimeout()
    sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, sock))
    wrapped.settimeout(original_timeout)

    try:
        sock.getpeername()
    except Exception:
        # no connection yet, so skip the handshake
        return wrapped
    # already connected: do the handshake now
    wrapped.do_handshake()
    return wrapped
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def connect(self):
        """Open a TCP connection to the peer and wrap it in an SSL socket.

        Uses the instance's ``ssl_context`` when one is set (it must be an
        OpenSSL.SSL.Context, otherwise TypeError is raised); otherwise a
        fresh context is built from the class's ``default_ssl_method``.
        """
        configured = getattr(self, 'ssl_context', None)
        if not configured:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)
        elif isinstance(configured, SSL.Context):
            ssl_context = configured
        else:
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" attribute; got %r instead' %
                            configured)

        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
        if getattr(self, '_tunnel_host', None):
            self.sock = raw_sock
            self._tunnel()

        # Wrap the connected socket and switch it to client mode.
        tls_sock = SSLSocket(ssl_context, raw_sock)
        self.sock = tls_sock
        tls_sock.set_connect_state()
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def __init__(self, ssl_context, debuglevel=0):
        """
        Store the SSL context to use, creating a TLSv1 one when none is given.

        @param ssl_context: SSL context, or None for a default context
        @type ssl_context: OpenSSL.SSL.Context
        @param debuglevel: debug level for HTTPSHandler
        @type debuglevel: int
        @raise TypeError: ssl_context is neither None nor an SSL.Context
        """
        AbstractHTTPHandler.__init__(self, debuglevel)

        if ssl_context is None:
            self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
            return

        if not isinstance(ssl_context, SSL.Context):
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" keyword; got %r instead' %
                            ssl_context)
        self.ssl_context = ssl_context
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def connect(self):
        """Open a TCP connection to the peer and wrap it in an SSL socket.

        Uses the instance's ``ssl_context`` when one is set (it must be an
        OpenSSL.SSL.Context, otherwise TypeError is raised); otherwise a
        fresh context is built from the class's ``default_ssl_method``.
        """
        configured = getattr(self, 'ssl_context', None)
        if not configured:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)
        elif isinstance(configured, SSL.Context):
            ssl_context = configured
        else:
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" attribute; got %r instead' %
                            configured)

        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
        if getattr(self, '_tunnel_host', None):
            self.sock = raw_sock
            self._tunnel()

        # Wrap the connected socket and switch it to client mode.
        tls_sock = SSLSocket(ssl_context, raw_sock)
        self.sock = tls_sock
        tls_sock.set_connect_state()
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def __init__(self, ssl_context, debuglevel=0):
        """
        Store the SSL context to use, creating a TLSv1 one when none is given.

        @param ssl_context: SSL context, or None for a default context
        @type ssl_context: OpenSSL.SSL.Context
        @param debuglevel: debug level for HTTPSHandler
        @type debuglevel: int
        @raise TypeError: ssl_context is neither None nor an SSL.Context
        """
        AbstractHTTPHandler.__init__(self, debuglevel)

        if ssl_context is None:
            self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
            return

        if not isinstance(ssl_context, SSL.Context):
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" keyword; got %r instead' %
                            ssl_context)
        self.ssl_context = ssl_context
项目:lantern-detection    作者:gongxijun    | 项目源码 | 文件源码
def bind(self, family, type, proto=0):
        """Create (or recreate) the listening socket and bind it.

        When both an SSL certificate and private key are configured, the
        socket is wrapped in an SSLConnection before binding.  Raises
        ImportError if HTTPS is requested but pyOpenSSL is unavailable.
        """
        listener = socket.socket(family, type, proto)
        self.socket = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # NOTE: a TCP_NODELAY setsockopt call was left disabled here.

        use_ssl = self.ssl_certificate and self.ssl_private_key
        if use_ssl:
            if SSL is None:
                raise ImportError("You must install pyOpenSSL to use HTTPS.")

            # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
            context = SSL.Context(SSL.SSLv23_METHOD)
            context.use_privatekey_file(self.ssl_private_key)
            context.use_certificate_file(self.ssl_certificate)
            self.socket = SSLConnection(context, self.socket)
            self.populate_ssl_environ()

        self.socket.bind(self.bind_addr)
项目:lantern-detection    作者:gongxijun    | 项目源码 | 文件源码
def bind(self, family, type, proto=0):
        """Create (or recreate) the listening socket and bind it.

        When both an SSL certificate and private key are configured, the
        socket is wrapped in an SSLConnection before binding.  Raises
        ImportError if HTTPS is requested but pyOpenSSL is unavailable.
        """
        listener = socket.socket(family, type, proto)
        self.socket = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # NOTE: a TCP_NODELAY setsockopt call was left disabled here.

        use_ssl = self.ssl_certificate and self.ssl_private_key
        if use_ssl:
            if SSL is None:
                raise ImportError("You must install pyOpenSSL to use HTTPS.")

            # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
            context = SSL.Context(SSL.SSLv23_METHOD)
            context.use_privatekey_file(self.ssl_private_key)
            context.use_certificate_file(self.ssl_certificate)
            self.socket = SSLConnection(context, self.socket)
            self.populate_ssl_environ()

        self.socket.bind(self.bind_addr)
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test03_ssl_verification_of_peer_fails(self):
        """A request over a context with no usable CA certs raises SSL.Error."""
        def log_and_pass_through(conn, x509, errnum, errdepth, preverify_ok):
            # Log the certificate subject and keep OpenSSL's own verdict.
            log.debug('SSL peer certificate verification failed for %r',
                      x509.get_subject())
            return preverify_ok

        context = SSL.Context(SSL.TLSv1_METHOD)
        context.set_verify(SSL.VERIFY_PEER, log_and_pass_through)
        context.set_verify_depth(9)

        # Set bad location - unit test dir has no CA certs to verify with
        context.load_verify_locations(None, Constants.UNITTEST_DIR)

        connection = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                                     ssl_context=context)
        connection.connect()
        self.assertRaises(SSL.Error, connection.request, 'GET', '/')
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test04_ssl_verification_with_subj_alt_name(self):
        """Fetch '/' using a ServerSSLCertVerification callback for 'localhost'."""
        context = SSL.Context(SSL.TLSv1_METHOD)

        verifier = ServerSSLCertVerification(hostname='localhost')
        context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
        context.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        context.load_verify_locations(None, Constants.CACERT_DIR)

        connection = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                                     ssl_context=context)
        connection.connect()
        connection.request('GET', '/')
        response = connection.getresponse()
        print('Response = %s' % response.read())
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test04_ssl_verification_with_subj_common_name(self):
        """Fetch '/' with subject-alt-name matching switched off."""
        context = SSL.Context(SSL.TLSv1_METHOD)

        # Explicitly set verification of peer hostname using peer certificate
        # subject common name
        verifier = ServerSSLCertVerification(hostname='localhost',
                                             subj_alt_name_match=False)
        context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
        context.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        context.load_verify_locations(None, Constants.CACERT_DIR)

        connection = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                                     ssl_context=context)
        connection.connect()
        connection.request('GET', '/')
        response = connection.getresponse()
        print('Response = %s' % response.read())
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def connect(self):
        """Open a TCP connection to the peer and wrap it in an SSL socket.

        Uses the instance's ``ssl_context`` when one is set (it must be an
        OpenSSL.SSL.Context, otherwise TypeError is raised); otherwise a
        fresh context is built from the class's ``default_ssl_method``.
        """
        configured = getattr(self, 'ssl_context', None)
        if not configured:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)
        elif isinstance(configured, SSL.Context):
            ssl_context = configured
        else:
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" attribute; got %r instead' %
                            configured)

        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
        if getattr(self, '_tunnel_host', None):
            self.sock = raw_sock
            self._tunnel()

        # Wrap the connected socket and switch it to client mode.
        tls_sock = SSLSocket(ssl_context, raw_sock)
        self.sock = tls_sock
        tls_sock.set_connect_state()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, hostname, ctx):
        """
        Initialize L{ClientTLSOptions}.

        Stores the hostname in its original, IDNA-bytes and ASCII forms and
        installs the identity-verifying info callback on C{ctx}.

        @param hostname: The hostname to verify as input by a human.
        @type hostname: L{unicode}

        @param ctx: an L{OpenSSL.SSL.Context} to use for new connections.
        @type ctx: L{OpenSSL.SSL.Context}.
        """
        self._ctx = ctx
        self._hostname = hostname

        encoded = _idnaBytes(hostname)
        self._hostnameBytes = encoded
        self._hostnameASCII = encoded.decode("ascii")

        info_callback = _tolerateErrors(self._identityVerifyingInfoCallback)
        ctx.set_info_callback(info_callback)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _identityVerifyingInfoCallback(self, connection, where, ret):
        """
        U{info_callback
        <http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
        } for pyOpenSSL.  At handshake start it sets the TLS server-name
        extension; at handshake completion it verifies that the presented
        certificate matches the hostname given to this L{ClientTLSOptions},
        reporting a mismatch to the connection's transport.

        @param connection: the connection which is handshaking.
        @type connection: L{OpenSSL.SSL.Connection}

        @param where: flags indicating progress through a TLS handshake.
        @type where: L{int}

        @param ret: ignored
        @type ret: ignored
        """
        if where & SSL.SSL_CB_HANDSHAKE_START:
            connection.set_tlsext_host_name(self._hostnameBytes)
            return

        if not (where & SSL.SSL_CB_HANDSHAKE_DONE):
            return

        try:
            verifyHostname(connection, self._hostnameASCII)
        except VerificationError:
            # Capture the active exception before touching the transport.
            failure = Failure()
            connection.get_app_data().failVerification(failure)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_extraChainFilesAreAddedIfSupplied(self):
        """
        With C{extraCertChain} supplied alongside a key and certificate, the
        context produced by C{getContext} carries the private key, the
        certificate and the extra chain certificates.
        """
        options = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey,
            certificate=self.sCert,
            extraCertChain=self.extraCertChain,
        )
        # Swap in the fake so the configured values can be inspected.
        options._contextFactory = FakeContext
        context = options.getContext()

        self.assertEqual(self.sKey, context._privateKey)
        self.assertEqual(self.sCert, context._certificate)
        self.assertEqual(self.extraCertChain, context._extraCertChain)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return it as an SSLObject.

    The certificate and private key files are loaded into the context when
    given.  Peer verification is set to VERIFY_NONE.  The socket's timeout
    is copied to the wrapper, and the handshake is performed immediately
    when the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # Read the timeout before unwrapping to the underlying socket object.
    original_timeout = sock.gettimeout()
    sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, sock))
    wrapped.settimeout(original_timeout)

    try:
        sock.getpeername()
    except Exception:
        # no connection yet, so skip the handshake
        return wrapped
    # already connected: do the handshake now
    wrapped.do_handshake()
    return wrapped
项目:simple-ddns    作者:ihciah    | 项目源码 | 文件源码
def main():
    """Start the DNS (UDP) and HTTPS listeners and run the reactor.

    Ports and the certificate/key paths are read from SERVER_CONFIG.
    """
    dns_resolver = DNSResolver()
    dns_factory = server.DNSServerFactory(clients=[dns_resolver])
    udp_protocol = dns.DNSDatagramProtocol(controller=dns_factory)

    site = webserver.Site(HTTPServer(dns_resolver))

    tls_context = Context(TLSv1_METHOD)
    tls_context.use_certificate_chain_file(SERVER_CONFIG["ssl_crt"])
    tls_context.use_privatekey_file(SERVER_CONFIG["ssl_key"])

    reactor.listenUDP(SERVER_CONFIG["dns_port"], udp_protocol)
    reactor.listenSSL(SERVER_CONFIG["http_port"], site,
                      ContextFactory(tls_context))

    reactor.run()
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __enter__(self):
        """Build the TLS context, connect to (host, port) and return self.

        When self.verify is set it is used as the CA file: peer verification
        is enabled with self.validate_cert as the callback, and the same file
        is parsed as a PEM certificate into self.trusted_cert.  The client
        key and certificate come from self.pkey and self.cert.
        """
        self.ctx = SSL.Context(SSL.TLSv1_METHOD)
        if self.verify:
            self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
            self.ctx.load_verify_locations(self.verify)
            # open() works on Python 2 and 3 (file() is Python 2 only), and
            # the with-block closes the handle instead of leaking it.
            with open(self.verify, 'rb') as cert_file:
                self.trusted_cert = crypto.load_certificate(
                    crypto.FILETYPE_PEM, cert_file.read())
        self.ctx.use_privatekey(self.pkey)
        self.ctx.use_certificate(self.cert)
        self.sock = SSL.Connection(self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        try:
            self.sock.connect((self.host, self.port))
        except Exception:
            # __exit__ is not called when __enter__ raises, so release the
            # socket here before propagating the error.
            self.sock.close()
            raise
        return self
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self, registerInstance, server_address, keyFile=DEFAULTKEYFILE, certFile=DEFAULTCERTFILE, logRequests=True):
        """Secure Documenting XML-RPC server.

        Behaves like DocXMLRPCServer but serves over HTTPS.  The title, name
        and documentation shown by the doc server are taken from
        registerInstance when it provides them, with defaults otherwise.
        """
        DocXMLRPCServer.__init__(self, server_address, SecureDocXMLRpcRequestHandler, logRequests)
        self.logRequests = logRequests

        # stuff for doc server
        try:
            self.set_server_title(registerInstance.title)
        except AttributeError:
            self.set_server_title('default title')
        try:
            self.set_server_name(registerInstance.name)
        except AttributeError:
            self.set_server_name('default name')
        self.set_server_documentation(
            registerInstance.__doc__ or 'default documentation')
        self.register_introspection_functions()

        # init stuff, handle different versions:
        dispatcher_init = SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__
        try:
            dispatcher_init(self)
        except TypeError:
            # An exception is raised in Python 2.5 as the prototype of the __init__
            # method has changed and now has 3 arguments (self, allow_none, encoding)
            dispatcher_init(self, False, None)
        SocketServer.BaseServer.__init__(self, server_address, SecureDocXMLRpcRequestHandler)
        self.register_instance(registerInstance) # for some reason, have to register instance down here!

        # SSL socket stuff
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(keyFile)
        context.use_certificate_file(certFile)
        plain_socket = socket.socket(self.address_family, self.socket_type)
        self.socket = SSL.Connection(context, plain_socket)
        self.server_bind()
        self.server_activate()

        # requests count and condition, to allow for keyboard quit via CTL-C
        self.requests = 0
        self.rCondition = Condition()
项目:electrum-martexcoin-server    作者:martexcoin    | 项目源码 | 文件源码
def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
        """Create the server with an SSL-wrapped listening socket.

        The key and certificate are loaded from keyfile and certfile.  The
        socket is bound and activated only when bind_and_activate is true.
        """
        from OpenSSL import SSL
        SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)

        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(keyfile)
        context.use_certificate_file(certfile)

        plain_socket = socket.socket(self.address_family, self.socket_type)
        self.socket = SSL.Connection(context, plain_socket)

        if not bind_and_activate:
            return
        self.server_bind()
        self.server_activate()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test01_configuration(self):
        """Configuration exposes the context and debug flag it was given."""
        context = SSL.Context(SSL.TLSv1_METHOD)
        configuration = Configuration(context, True)
        self.assertTrue(configuration.ssl_context)
        self.assertEqual(configuration.debug, True)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test02_fetch_from_url(self):
        """fetch_from_url returns a truthy result for the test URI."""
        context = SSL.Context(SSL.TLSv1_METHOD)
        configuration = Configuration(context, True)
        fetched = fetch_from_url(Constants.TEST_URI, configuration)
        self.assertTrue(fetched)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test03_open_url(self):
        """open_url reports 200 as the first element of its result."""
        context = SSL.Context(SSL.TLSv1_METHOD)
        configuration = Configuration(context, True)
        result = open_url(Constants.TEST_URI, configuration)
        failure_message = 'open_url for %r failed' % Constants.TEST_URI
        self.assertEqual(result[0], 200, failure_message)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test04_open_peer_cert_verification_fails(self):
        """Opening the test URI with an empty CA directory raises SSL.Error."""
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Keep OpenSSL's own verification verdict.
            return preverify_ok

        # Explicitly set empty CA directory to make verification fail
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.set_verify(SSL.VERIFY_PEER, verify_callback)
        context.load_verify_locations(None, './')

        url_opener = build_opener(ssl_context=context)
        self.assertRaises(SSL.Error, url_opener.open, Constants.TEST_URI)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, host, port=None, strict=None,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT, ssl_context=None):
        """Initialise the HTTP connection and record the SSL context to use.

        ssl_context, when given, must be an OpenSSL.SSL.Context (TypeError is
        raised otherwise).  When omitted, an existing ``ssl_context``
        attribute is left alone, or set to None if there is none.
        """
        HTTPConnection.__init__(self, host, port, strict, timeout)
        if not hasattr(self, 'ssl_context'):
            self.ssl_context = None

        if ssl_context is None:
            return

        if not isinstance(ssl_context, SSL.Context):
            raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                            'ssl_context" keyword; got %r instead' %
                            ssl_context)
        self.ssl_context = ssl_context
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def connect(self, host, port):
        """Open a connection to (host, port) according to self.type / self.ssl.

        self.type 0 selects TCP (wrapped in SSL when self.ssl == 1) and 1
        selects UDP; any other value only prints a "not implemented" notice.
        On success the socket is stored in self.skt and self.state is set
        to 1.  Sockets that fail to connect are closed.

        Returns "OK" when self.state is 1 at the end of the call (including
        when a connection was already active), otherwise None.
        """
        def attempt(sock, failure_message):
            # Try to connect; keep the socket on success, release it otherwise.
            try:
                err = sock.connect_ex((host, port))
            except Exception:
                print(failure_message)
                sock.close()
                return
            if err == 0:
                self.skt = sock
                self.state = 1
            else:
                sock.close()

        if self.state == 1:
            print("Already has an active connection")
        elif self.type == 0:  # TCP
            if self.ssl == 1:
                ctx = SSL.Context(SSL.SSLv23_METHOD)
                attempt(SSL.Connection(ctx, socket(AF_INET, SOCK_STREAM)),
                        "Couldn't connect SSL socket")
            else:
                attempt(socket(AF_INET, SOCK_STREAM),
                        "Couldn't connect TCP socket")
        elif self.type == 1:  # UDP
            attempt(socket(AF_INET, SOCK_DGRAM), "Couldn't create UDP socket")
        else:
            print("RAW sockets not implemented yet")

        if self.state == 1:
            return "OK"
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getContext(self):
        """Return the cached SSL.Context, creating it on first use."""
        if self._context is not None:
            return self._context
        # Nothing cached yet: build the context once and remember it.
        self._context = self._makeContext()
        return self._context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getContext(self):
        """Return a SSL.Context object.

        Subclasses must override this; the base version always raises
        NotImplementedError.
        """
        raise NotImplementedError()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def cacheContext(self):
        """Build a context from the stored method and files and cache it.

        The certificate and private key are loaded from
        self.certificateFileName and self.privateKeyFileName; the result is
        stored in self._context.
        """
        context = SSL.Context(self.sslmethod)
        context.use_certificate_file(self.certificateFileName)
        context.use_privatekey_file(self.privateKeyFileName)
        self._context = context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getContext(self):
        """Return a new SSL.Context created for ``self.method``."""
        context = SSL.Context(self.method)
        return context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getContext(self):
        """Return a new TLSv1 context loaded from ``self.filename``.

        The same file is used for both the certificate and the private key.
        """
        context = SSL.Context(SSL.TLSv1_METHOD)
        for load in (context.use_certificate_file, context.use_privatekey_file):
            load(self.filename)
        return context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getContext(self):
        """Create an SSLv23 context loaded from ``self.filename``.

        The same file supplies both the certificate and the private key.
        OpenSSL is imported lazily, only when a context is requested.
        """
        from OpenSSL import SSL
        context = SSL.Context(SSL.SSLv23_METHOD)
        for load in (context.use_certificate_file, context.use_privatekey_file):
            load(self.filename)
        return context
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def ssl():
    """Run the app over SSL using ``ssl.key`` and ``ssl.cert``.

    Both files are looked up relative to the current working directory.
    """
    from OpenSSL import SSL
    context = SSL.Context(SSL.SSLv23_METHOD)
    # Private key first, then the certificate -- same order as before.
    context.use_privatekey_file('ssl.key')
    context.use_certificate_file('ssl.cert')
    app.run(ssl_context=context)
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def run_app():
    """Prepare the database, start the scheduler and serve the web app.

    Steps, in order:

    1. Change the working directory to the directory containing this file.
    2. Spawn ``manage.py db ...`` in a child process: ``stamp head`` right
       after creating a fresh database with ``db.create_all()``, otherwise
       ``upgrade``.  The child is not waited on.
    3. Start the scheduler via ``helper.startScheduler()``.
    4. Serve the app on ``0.0.0.0:config.PORT``.  When ``config.USE_SSL`` is
       set, a certificate is generated and the server uses SSL; if pyOpenSSL
       cannot be imported the server falls back to plain HTTP and logs an
       error.

    Errors raised while loading the key/certificate, or by the running
    server itself, propagate to the caller instead of being reported as a
    missing OpenSSL installation.
    """
    from subprocess import Popen
    import sys

    # Resolve the directory once, *before* chdir: with a relative __file__
    # a second abspath() after the chdir would point at the wrong place.
    base_dir = os.path.abspath(os.path.dirname(__file__))
    os.chdir(base_dir)
    path = os.path.join(base_dir, "manage.py")
    args = [sys.executable, path, "db"]
    if not os.path.exists(os.path.join(config.DATA_DIR, "plexivity.db")):
        from app import db
        db.create_all()
        args.extend(["stamp", "head"])
    else:
        args.append("upgrade")

    Popen(args)

    helper.startScheduler()

    ssl_context = None
    if config.USE_SSL:
        helper.generateSSLCert()
        # Only a missing pyOpenSSL triggers the plaintext fallback.  The
        # server is started outside of this try block so that Ctrl+C or a
        # runtime failure does not launch a second, non-SSL server.
        try:
            from OpenSSL import SSL
        except ImportError:
            logger.error("plexivity should use SSL but OpenSSL was not found, starting without SSL")
        else:
            ssl_context = SSL.Context(SSL.SSLv23_METHOD)
            ssl_context.use_privatekey_file(os.path.join(config.DATA_DIR, "plexivity.key"))
            ssl_context.use_certificate_file(os.path.join(config.DATA_DIR, "plexivity.crt"))

    run_kwargs = {"host": "0.0.0.0", "port": config.PORT, "debug": False}
    if ssl_context is not None:
        run_kwargs["ssl_context"] = ssl_context
    app.run(**run_kwargs)
项目:midip-sslyze    作者:soukupa5    | 项目源码 | 文件源码
def get_support_dtls_protocols_by_client(self):
        """Return the DTLS protocol versions supported by the client.

        For every entry of ``self.DTLS_MODULE`` the named attribute is looked
        up on ``_lib`` and registered in ``SSL.Context._methods`` under the
        version key.  Versions for which this fails are silently left out of
        the returned list.
        """
        supported = []
        for version, method_name in self.DTLS_MODULE.items():
            try:
                SSL.Context._methods[version] = getattr(_lib, method_name)
            except Exception:
                continue
            supported.append(version)
        return supported
项目:midip-sslyze    作者:soukupa5    | 项目源码 | 文件源码
def get_dtls_cipher_list(self, dtls_version):
        """Return the cipher suites available for a DTLS protocol version.

        Args:
            dtls_version: method identifier passed straight to
                ``SSL.Context``.
                NOTE(review): presumably one of the keys registered in
                ``SSL.Context._methods`` -- confirm against callers.

        Returns:
            The result of ``SSL.Connection.get_cipher_list()`` for a context
            on which the cipher string 'ALL:COMPLEMENTOFALL' has been set.
        """
        cnx = SSL.Context(dtls_version)
        cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
        # The UDP socket is never connected; it only exists so a Connection
        # can be built.  Close it explicitly so that repeated calls do not
        # leak file descriptors.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            conn = SSL.Connection(cnx, sock)
            return conn.get_cipher_list()
        finally:
            sock.close()
项目:lbryum-server    作者:lbryio    | 项目源码 | 文件源码
def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
        """Set up a socket server whose listening socket is wrapped in SSL.

        @param server_address: forwarded to ``SocketServer.BaseServer``
        @param certfile: certificate file loaded into the SSL context
        @param keyfile: private key file loaded into the SSL context
        @param RequestHandlerClass: forwarded to ``SocketServer.BaseServer``
        @param bind_and_activate: when true, bind and activate the server
            right away
        """
        from OpenSSL import SSL
        SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)

        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(keyfile)
        context.use_certificate_file(certfile)

        plain_socket = socket.socket(self.address_family, self.socket_type)
        self.socket = SSL.Connection(context, plain_socket)

        if not bind_and_activate:
            return
        self.server_bind()
        self.server_activate()
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def generate_adhoc_ssl_context():
    """Return an SSLv23 context backed by a freshly generated key pair.

    The certificate and private key come from ``generate_adhoc_ssl_pair()``;
    the docstring of the original describes this as intended for the
    development server.
    """
    from OpenSSL import SSL
    certificate, private_key = generate_adhoc_ssl_pair()
    context = SSL.Context(SSL.SSLv23_METHOD)
    # Private key first, then the certificate -- same order as before.
    context.use_privatekey(private_key)
    context.use_certificate(certificate)
    return context