Python OpenSSL.crypto 模块,load_certificate() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用OpenSSL.crypto.load_certificate()

项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def __new__(typ, value):
        if isinstance(value, basestring):
            path = process.config_file(value)
            if path is None:
                log.warn("Certificate file '%s' is not readable" % value)
                return None
            try:
                f = open(path, 'rt')
            except:
                log.warn("Certificate file '%s' could not be open" % value)
                return None
            try:
                try:
                    return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
                except crypto.Error, e:
                    log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
                    return None
            finally:
                f.close()
        else:
            raise TypeError, 'value should be a string'
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
        self.host = host
        self.port = port
        try:
            self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(pkey_path, 'rb').read(), pkey_passphrase)
        except IOError:
            raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
        except crypto.Error:
            raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))
        try:
            self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rb').read())
        except IOError:
            raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
        except crypto.Error:
            raise eStreamerCertError("Invalid certificate {}".format(cert_path))
        self.verify = verify
        self.ctx = None
        self.sock = None
        self._bytes = None
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Construct a Verified instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        key_pem = _to_bytes(key_pem)
        if is_x509_cert:
            pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
        else:
            pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        return OpenSSLVerifier(pubkey)
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
      """Construct a Verified instance from a string.

      Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
          expected to be an RSA key in PEM format.

      Returns:
        Verifier instance.

      Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
      """
      if is_x509_cert:
        pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
      else:
        pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
      return OpenSSLVerifier(pubkey)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None):
        """
        Generate a PKCS12 object with components from PEM.  Verify that the set
        functions return None.
        """
        p12 = PKCS12()
        if cert_pem:
            ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem))
            self.assertEqual(ret, None)
        if key_pem:
            ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
            self.assertEqual(ret, None)
        if ca_pem:
            ret = p12.set_ca_certificates((load_certificate(FILETYPE_PEM, ca_pem),))
            self.assertEqual(ret, None)
        if friendly_name:
            ret = p12.set_friendlyname(friendly_name)
            self.assertEqual(ret, None)
        return p12
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_replace(self):
        """
        L{PKCS12.set_certificate} replaces the certificate in a PKCS12 cluster.
        L{PKCS12.set_privatekey} replaces the private key.
        L{PKCS12.set_ca_certificates} replaces the CA certificates.
        """
        p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)
        p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
        p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
        root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
        client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)
        p12.set_ca_certificates([root_cert]) # not a tuple
        self.assertEqual(1, len(p12.get_ca_certificates()))
        self.assertEqual(root_cert, p12.get_ca_certificates()[0])
        p12.set_ca_certificates([client_cert, root_cert])
        self.assertEqual(2, len(p12.get_ca_certificates()))
        self.assertEqual(client_cert, p12.get_ca_certificates()[0])
        self.assertEqual(root_cert, p12.get_ca_certificates()[1])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _server(self, sock):
        """
        Create a new server-side SSL L{Connection} object wrapped around
        C{sock}.
        """
        # Create the server side Connection.  This is mostly setup boilerplate
        # - use TLSv1, use a particular certificate, etc.
        server_ctx = Context(TLSv1_METHOD)
        server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
        server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
        server_store = server_ctx.get_cert_store()
        server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
        server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
        server_ctx.check_privatekey()
        server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
        # Here the Connection is actually created.  If None is passed as the 2nd
        # parameter, it indicates a memory BIO should be created.
        server_conn = Connection(server_ctx, sock)
        server_conn.set_accept_state()
        return server_conn
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _client(self, sock):
        """
        Create a new client-side SSL L{Connection} object wrapped around
        C{sock}.
        """
        # Now create the client side Connection.  Similar boilerplate to the
        # above.
        client_ctx = Context(TLSv1_METHOD)
        client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
        client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
        client_store = client_ctx.get_cert_store()
        client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem))
        client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
        client_ctx.check_privatekey()
        client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
        client_conn = Connection(client_ctx, sock)
        client_conn.set_connect_state()
        return client_conn
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_set_multiple_ca_list(self):
        """
        If passed a list containing multiple X509Name objects,
        L{Context.set_client_ca_list} configures the context to send those CA
        names to the client and, on both the server and client sides,
        L{Connection.get_client_ca_list} returns a list containing those
        X509Names after the connection is set up.
        """
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def multiple_ca(ctx):
            L = [sedesc, cldesc]
            ctx.set_client_ca_list(L)
            return L
        self._check_client_ca_list(multiple_ca)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_reset_ca_list(self):
        """
        If called multiple times, only the X509Names passed to the final call
        of L{Context.set_client_ca_list} are used to configure the CA names
        sent to the client.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def changed_ca(ctx):
            ctx.set_client_ca_list([sedesc, cldesc])
            ctx.set_client_ca_list([cadesc])
            return [cadesc]
        self._check_client_ca_list(changed_ca)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_mutated_ca_list(self):
        """
        If the list passed to L{Context.set_client_ca_list} is mutated
        afterwards, this does not affect the list of CA names sent to the
        client.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()

        def mutated_ca(ctx):
            L = [cadesc]
            ctx.set_client_ca_list([cadesc])
            L.append(sedesc)
            return [cadesc]
        self._check_client_ca_list(mutated_ca)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_set_and_add_client_ca(self):
        """
        A call to L{Context.set_client_ca_list} followed by a call to
        L{Context.add_client_ca} results in using the CA names from the first
        call and the CA name from the second call.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def mixed_set_add_ca(ctx):
            ctx.set_client_ca_list([cadesc, sedesc])
            ctx.add_client_ca(clcert)
            return [cadesc, sedesc, cldesc]
        self._check_client_ca_list(mixed_set_add_ca)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_set_after_add_client_ca(self):
        """
        A call to L{Context.set_client_ca_list} after a call to
        L{Context.add_client_ca} replaces the CA name specified by the former
        call with the names specified by the latter cal.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def set_replaces_add_ca(ctx):
            ctx.add_client_ca(clcert)
            ctx.set_client_ca_list([cadesc])
            ctx.add_client_ca(secert)
            return [cadesc, sedesc]
        self._check_client_ca_list(set_replaces_add_ca)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Construct a Verified instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        key_pem = _helpers._to_bytes(key_pem)
        if is_x509_cert:
            pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
        else:
            pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        return OpenSSLVerifier(pubkey)
项目:munki-enrollment-client    作者:gerritdewitt    | 项目源码 | 文件源码
def read_client_identity():
    '''Loads the private key and certificate objects as read
        from the client identity PEM file.  Returns a pair of objects
        (key,cert) or None if something bad happened.'''
    common.print_info("Loading identity file...")
    # Check for missing client identity:
    if not os.path.exists(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH):
        common.print_error("No client identity file found at %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
        return None
    # Read and load PKI material from the client identity:
    file_object = open(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH,'r')
    file_contents = file_object.read()
    file_object.close()
    try:
        cert = crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.print_error("Could not read the certificate from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
        cert = None
    try:
        key = crypto.load_privatekey(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.print_error("Could not read the private key from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
        key = None
    # Return PKI materials:
    return key,cert
项目:PySIGNFe    作者:thiagopena    | 项目源码 | 文件源码
def prepara_certificado_txt(self, cert_txt):
        #
        # Para dar certo a leitura pelo xmlsec, temos que separar o certificado
        # em linhas de 64 caracteres de extensão...
        #
        cert_txt = cert_txt.replace(u'\n', u'')
        cert_txt = cert_txt.replace(u'-----BEGIN CERTIFICATE-----', u'')
        cert_txt = cert_txt.replace(u'-----END CERTIFICATE-----', u'')

        linhas_certificado = [u'-----BEGIN CERTIFICATE-----\n']
        for i in range(0, len(cert_txt), 64):
            linhas_certificado.append(cert_txt[i:i+64] + '\n')
        linhas_certificado.append(u'-----END CERTIFICATE-----\n')

        self.certificado = u''.join(linhas_certificado)

        cert_openssl = crypto.load_certificate(crypto.FILETYPE_PEM, self.certificado)

        self.emissor = dict(cert_openssl.get_issuer().get_components())
        self.proprietario = dict(cert_openssl.get_subject().get_components())

        self.data_inicio_validade = datetime.strptime(cert_openssl.get_notBefore().decode('utf-8'), '%Y%m%d%H%M%SZ')
        self.data_fim_validade    = datetime.strptime(cert_openssl.get_notAfter().decode('utf-8'), '%Y%m%d%H%M%SZ')
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
    """Construct a Verified instance from a string.

    Args:
      key_pem: string, public key in PEM format.
      is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
        expected to be an RSA key in PEM format.

    Returns:
      Verifier instance.

    Raises:
      OpenSSL.crypto.Error if the key_pem can't be parsed.
    """
    if is_x509_cert:
      pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
    else:
      pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
    return Verifier(pubkey)
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def _get_cert_from_file(self, filename):
        with open(filename, 'r') as f:
            cert_pem = f.read()

        if not cert_pem:
            raise nsxlib_exceptions.CertificateError(
                msg=_("Failed to read certificate from %s") % filename)

        # validate correct crypto
        try:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
        except crypto.Error:
            raise nsxlib_exceptions.CertificateError(
                msg=_("Failed to import client certificate"))

        return cert
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def get_cert_and_key(self):
        """Load cert and key from storage"""
        if self._cert and self._key:
            return self._cert, self._key

        cert_pem, key_pem = self._load_from_storage()

        if cert_pem is None:
            return None, None

        try:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
            key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        except crypto.Error:
            raise nsxlib_exceptions.CertificateError(
                msg="Failed to load client certificate")

        return cert, key
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
      """Construct a Verified instance from a string.

      Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
          expected to be an RSA key in PEM format.

      Returns:
        Verifier instance.

      Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
      """
      from OpenSSL import crypto
      if is_x509_cert:
        pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
      else:
        pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
      return OpenSSLVerifier(pubkey)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
      """Construct a Verified instance from a string.

      Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
          expected to be an RSA key in PEM format.

      Returns:
        Verifier instance.

      Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
      """
      from OpenSSL import crypto
      if is_x509_cert:
        pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
      else:
        pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
      return OpenSSLVerifier(pubkey)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Construct a Verified instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        key_pem = _helpers._to_bytes(key_pem)
        if is_x509_cert:
            pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
        else:
            pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        return OpenSSLVerifier(pubkey)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None,
                   friendly_name=None):
        """
        Generate a PKCS12 object with components from PEM.  Verify that the set
        functions return None.
        """
        p12 = PKCS12()
        if cert_pem:
            ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem))
            assert ret is None
        if key_pem:
            ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
            assert ret is None
        if ca_pem:
            ret = p12.set_ca_certificates(
                (load_certificate(FILETYPE_PEM, ca_pem),)
            )
            assert ret is None
        if friendly_name:
            ret = p12.set_friendlyname(friendly_name)
            assert ret is None
        return p12
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_replace(self):
        """
        `PKCS12.set_certificate` replaces the certificate in a PKCS12
        cluster. `PKCS12.set_privatekey` replaces the private key.
        `PKCS12.set_ca_certificates` replaces the CA certificates.
        """
        p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)
        p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
        p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
        root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
        client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)
        p12.set_ca_certificates([root_cert])  # not a tuple
        assert 1 == len(p12.get_ca_certificates())
        assert root_cert == p12.get_ca_certificates()[0]
        p12.set_ca_certificates([client_cert, root_cert])
        assert 2 == len(p12.get_ca_certificates())
        assert client_cert == p12.get_ca_certificates()[0]
        assert root_cert == p12.get_ca_certificates()[1]
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_sign_verify_ecdsa(self):
        """
        `sign` generates a cryptographic signature which `verify` can check.
        ECDSA Signatures in the X9.62 format may have variable length,
        different from the length of the private key.
        """
        content = (
            b"It was a bright cold day in April, and the clocks were striking "
            b"thirteen. Winston Smith, his chin nuzzled into his breast in an "
            b"effort to escape the vile wind, slipped quickly through the "
            b"glass doors of Victory Mansions, though not quickly enough to "
            b"prevent a swirl of gritty dust from entering along with him."
        ).decode("ascii")
        priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem)
        cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem)
        sig = sign(priv_key, content, "sha1")
        verify(cert, sig, content, "sha1")
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_shutdown_truncated(self):
        """
        If the underlying connection is truncated, `Connection.shutdown`
        raises an `Error`.
        """
        server_ctx = Context(TLSv1_METHOD)
        client_ctx = Context(TLSv1_METHOD)
        server_ctx.use_privatekey(
            load_privatekey(FILETYPE_PEM, server_key_pem))
        server_ctx.use_certificate(
            load_certificate(FILETYPE_PEM, server_cert_pem))
        server = Connection(server_ctx, None)
        client = Connection(client_ctx, None)
        handshake_in_memory(client, server)
        assert not server.shutdown()
        with pytest.raises(WantReadError):
            server.shutdown()
        server.bio_shutdown()
        with pytest.raises(Error):
            server.shutdown()
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def _server(self, sock):
        """
        Create a new server-side SSL `Connection` object wrapped around `sock`.
        """
        # Create the server side Connection.  This is mostly setup boilerplate
        # - use TLSv1, use a particular certificate, etc.
        server_ctx = Context(TLSv1_METHOD)
        server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
        server_ctx.set_verify(
            VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE,
            verify_cb
        )
        server_store = server_ctx.get_cert_store()
        server_ctx.use_privatekey(
            load_privatekey(FILETYPE_PEM, server_key_pem))
        server_ctx.use_certificate(
            load_certificate(FILETYPE_PEM, server_cert_pem))
        server_ctx.check_privatekey()
        server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
        # Here the Connection is actually created.  If None is passed as the
        # 2nd parameter, it indicates a memory BIO should be created.
        server_conn = Connection(server_ctx, sock)
        server_conn.set_accept_state()
        return server_conn
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_set_multiple_ca_list(self):
        """
        If passed a list containing multiple X509Name objects,
        `Context.set_client_ca_list` configures the context to send
        those CA names to the client and, on both the server and client sides,
        `Connection.get_client_ca_list` returns a list containing those
        X509Names after the connection is set up.
        """
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def multiple_ca(ctx):
            L = [sedesc, cldesc]
            ctx.set_client_ca_list(L)
            return L
        self._check_client_ca_list(multiple_ca)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_reset_ca_list(self):
        """
        If called multiple times, only the X509Names passed to the final call
        of `Context.set_client_ca_list` are used to configure the CA
        names sent to the client.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def changed_ca(ctx):
            ctx.set_client_ca_list([sedesc, cldesc])
            ctx.set_client_ca_list([cadesc])
            return [cadesc]
        self._check_client_ca_list(changed_ca)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_mutated_ca_list(self):
        """
        If the list passed to `Context.set_client_ca_list` is mutated
        afterwards, this does not affect the list of CA names sent to the
        client.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()

        def mutated_ca(ctx):
            L = [cadesc]
            ctx.set_client_ca_list([cadesc])
            L.append(sedesc)
            return [cadesc]
        self._check_client_ca_list(mutated_ca)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_set_and_add_client_ca(self):
        """
        A call to `Context.set_client_ca_list` followed by a call to
        `Context.add_client_ca` results in using the CA names from the
        first call and the CA name from the second call.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def mixed_set_add_ca(ctx):
            ctx.set_client_ca_list([cadesc, sedesc])
            ctx.add_client_ca(clcert)
            return [cadesc, sedesc, cldesc]
        self._check_client_ca_list(mixed_set_add_ca)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_set_after_add_client_ca(self):
        """
        A call to `Context.set_client_ca_list` after a call to
        `Context.add_client_ca` replaces the CA name specified by the
        former call with the names specified by the latter call.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()

        def set_replaces_add_ca(ctx):
            ctx.add_client_ca(clcert)
            ctx.set_client_ca_list([cadesc])
            ctx.add_client_ca(secert)
            return [cadesc, sedesc]
        self._check_client_ca_list(set_replaces_add_ca)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Construct a Verified instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        key_pem = _helpers._to_bytes(key_pem)
        if is_x509_cert:
            pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
        else:
            pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        return OpenSSLVerifier(pubkey)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
      """Construct a Verified instance from a string.

      Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
          expected to be an RSA key in PEM format.

      Returns:
        Verifier instance.

      Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
      """
      if is_x509_cert:
        pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
      else:
        pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
      return OpenSSLVerifier(pubkey)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None):
        """
        Generate a PKCS12 object with components from PEM.  Verify that the set
        functions return None.
        """
        p12 = PKCS12()
        if cert_pem:
            ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem))
            self.assertEqual(ret, None)
        if key_pem:
            ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
            self.assertEqual(ret, None)
        if ca_pem:
            ret = p12.set_ca_certificates((load_certificate(FILETYPE_PEM, ca_pem),))
            self.assertEqual(ret, None)
        if friendly_name:
            ret = p12.set_friendlyname(friendly_name)
            self.assertEqual(ret, None)
        return p12
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_replace(self):
        """
        L{PKCS12.set_certificate} replaces the certificate in a PKCS12 cluster.
        L{PKCS12.set_privatekey} replaces the private key.
        L{PKCS12.set_ca_certificates} replaces the CA certificates.
        """
        p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)
        p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
        p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
        root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
        client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)
        p12.set_ca_certificates([root_cert]) # not a tuple
        self.assertEqual(1, len(p12.get_ca_certificates()))
        self.assertEqual(root_cert, p12.get_ca_certificates()[0])
        p12.set_ca_certificates([client_cert, root_cert])
        self.assertEqual(2, len(p12.get_ca_certificates()))
        self.assertEqual(client_cert, p12.get_ca_certificates()[0])
        self.assertEqual(root_cert, p12.get_ca_certificates()[1])
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _server(self, sock):
        """
        Create a new server-side SSL L{Connection} object wrapped around
        C{sock}.
        """
        # Create the server side Connection.  This is mostly setup boilerplate
        # - use TLSv1, use a particular certificate, etc.
        server_ctx = Context(TLSv1_METHOD)
        server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
        server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
        server_store = server_ctx.get_cert_store()
        server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
        server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
        server_ctx.check_privatekey()
        server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
        # Here the Connection is actually created.  If None is passed as the 2nd
        # parameter, it indicates a memory BIO should be created.
        server_conn = Connection(server_ctx, sock)
        server_conn.set_accept_state()
        return server_conn
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _client(self, sock):
        """
        Create a new client-side SSL L{Connection} object wrapped around
        C{sock}.
        """
        # Now create the client side Connection.  Similar boilerplate to the
        # above.
        client_ctx = Context(TLSv1_METHOD)
        client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
        client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
        client_store = client_ctx.get_cert_store()
        client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem))
        client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
        client_ctx.check_privatekey()
        client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
        client_conn = Connection(client_ctx, sock)
        client_conn.set_connect_state()
        return client_conn
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_set_multiple_ca_list(self):
        """
        If passed a list containing multiple X509Name objects,
        L{Context.set_client_ca_list} configures the context to send those CA
        names to the client and, on both the server and client sides,
        L{Connection.get_client_ca_list} returns a list containing those
        X509Names after the connection is set up.
        """
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def multiple_ca(ctx):
            L = [sedesc, cldesc]
            ctx.set_client_ca_list(L)
            return L
        self._check_client_ca_list(multiple_ca)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_reset_ca_list(self):
        """
        If called multiple times, only the X509Names passed to the final call
        of L{Context.set_client_ca_list} are used to configure the CA names
        sent to the client.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def changed_ca(ctx):
            ctx.set_client_ca_list([sedesc, cldesc])
            ctx.set_client_ca_list([cadesc])
            return [cadesc]
        self._check_client_ca_list(changed_ca)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_mutated_ca_list(self):
        """
        If the list passed to L{Context.set_client_ca_list} is mutated
        afterwards, this does not affect the list of CA names sent to the
        client.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()

        def mutated_ca(ctx):
            L = [cadesc]
            ctx.set_client_ca_list([cadesc])
            L.append(sedesc)
            return [cadesc]
        self._check_client_ca_list(mutated_ca)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_set_and_add_client_ca(self):
        """
        A call to L{Context.set_client_ca_list} followed by a call to
        L{Context.add_client_ca} results in using the CA names from the first
        call and the CA name from the second call.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def mixed_set_add_ca(ctx):
            ctx.set_client_ca_list([cadesc, sedesc])
            ctx.add_client_ca(clcert)
            return [cadesc, sedesc, cldesc]
        self._check_client_ca_list(mixed_set_add_ca)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_set_after_add_client_ca(self):
        """
        A call to L{Context.set_client_ca_list} after a call to
        L{Context.add_client_ca} replaces the CA name specified by the former
        call with the names specified by the latter cal.
        """
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

        cadesc = cacert.get_subject()
        sedesc = secert.get_subject()
        cldesc = clcert.get_subject()

        def set_replaces_add_ca(ctx):
            ctx.add_client_ca(clcert)
            ctx.set_client_ca_list([cadesc])
            ctx.add_client_ca(secert)
            return [cadesc, sedesc]
        self._check_client_ca_list(set_replaces_add_ca)
项目:isard    作者:isard-vdi    | 项目源码 | 文件源码
def _secure_viewer(self):
        cert_file='install/viewer-certs/ca-cert.pem'
        cert_file=''
        try:
            with open(cert_file, "r") as caFile:
                ca=caFile.read()
            from OpenSSL import crypto
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read())
            return {'defaultMode':'Secure',
                                'certificate':ca,
                                'domain':cert.get_issuer().organizationName}
        except Exception as e:
            print(e)
            return {'defaultMode':'Insecure',
                                'certificate':'',
                                'domain':''}
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Construct a Verified instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        key_pem = _to_bytes(key_pem)
        if is_x509_cert:
            pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
        else:
            pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        return OpenSSLVerifier(pubkey)
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
      """Construct a Verified instance from a string.

      Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
          expected to be an RSA key in PEM format.

      Returns:
        Verifier instance.

      Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
      """
      if is_x509_cert:
        pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
      else:
        pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
      return OpenSSLVerifier(pubkey)
项目:munki-enrollment-server    作者:gerritdewitt    | 项目源码 | 文件源码
def read_ca_cert():
    '''Loads CA certificate from the filesystem and returns it as an object.
        Returns None if something went wrong.'''
    common.logging_info("Loading CA certificate.")
    # Check for missing CA certificate:
    if not os.path.exists(config_ca.CA_CERT_FILE_PATH):
        common.logging_error("No CA certificate file found at %s." % config_ca.CA_CERT_FILE_PATH)
        return None
    # Read and load CA certificate:
    try:
        file_object = open(config_ca.CA_CERT_FILE_PATH,'r')
        file_contents = file_object.read()
        file_object.close()
    except IOError:
        return None
    # Return CA cert:
    try:
        return crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.logging_error("Could not read CA certificate from %s." % config_ca.CA_CERT_FILE_PATH)
        return None
项目:fabric-test    作者:hyperledger    | 项目源码 | 文件源码
def initFromPath(self, path):
        'Will initialize the directory from the path supplied'
        import cPickle
        data = None
        with open(path,'r') as f:
            data = cPickle.load(f)
        assert data != None, "Expected some data, did not load any."
        priv_key_from_pem = lambda x: crypto.load_privatekey(crypto.FILETYPE_PEM, x)
        for userName, keyTuple in data['users'].iteritems():
            self.users[userName] = User(userName, directory=self,
                                        ecdsaSigningKey=ecdsa.SigningKey.from_pem(keyTuple[0]),
                                        rsaSigningKey=priv_key_from_pem(keyTuple[1]))
            self.users[userName].tags = keyTuple[2]
        for orgName, tuple in data['organizations'].iteritems():
            org = Organization(orgName, ecdsaSigningKey=ecdsa.SigningKey.from_pem(tuple[0]),
                               rsaSigningKey=priv_key_from_pem(tuple[0]))
            org.signedCert = crypto.load_certificate(crypto.FILETYPE_PEM, tuple[2])
            org.networks = [Network[name] for name in tuple[3]]
            self.organizations[orgName] = org
        for nat, cert_as_pem in data['nats'].iteritems():
            self.ordererAdminTuples[nat] = crypto.load_certificate(crypto.FILETYPE_PEM, cert_as_pem)
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def __new__(typ, value):
        if isinstance(value, basestring):
            path = process.config_file(value)
            if path is None:
                log.warn("Certificate file '%s' is not readable" % value)
                return None
            try:
                f = open(path, 'rt')
            except:
                log.warn("Certificate file '%s' could not be open" % value)
                return None
            try:
                try:
                    return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
                except crypto.Error, e:
                    log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
                    return None
            finally:
                f.close()
        else:
            raise TypeError, 'value should be a string'