Python OpenSSL.crypto 模块,X509Extension() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用OpenSSL.crypto.X509Extension()

项目:GotoX    作者:SeaHOH    | 项目源码 | 文件源码
def create_ca():
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)
    ca = crypto.X509()
    ca.set_version(2)
    ca.set_serial_number(0)
    subject = ca.get_subject()
    subject.countryName = 'CN'
    subject.stateOrProvinceName = 'Internet'
    subject.localityName = 'Cernet'
    subject.organizationName = ca_vendor
    subject.organizationalUnitName = '%s Root' % ca_vendor
    subject.commonName = '%s CA' % ca_vendor
    #????????????????????
    ca.gmtime_adj_notBefore(ca_time_b)
    ca.gmtime_adj_notAfter(ca_time_a)
    ca.set_issuer(subject)
    ca.set_pubkey(pkey)
    ca.add_extensions([
        crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
        crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth,emailProtection,timeStamping'),
        crypto.X509Extension(b'keyUsage', False, b'keyCertSign, cRLSign'),
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', subject=ca), ])
    ca.sign(pkey, ca_digest)
    return pkey, ca
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_construction(self):
        """
        L{X509Extension} accepts an extension type name, a critical flag,
        and an extension value and returns an L{X509ExtensionType} instance.
        """
        basic = X509Extension('basicConstraints', True, 'CA:true')
        self.assertTrue(
            isinstance(basic, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                basic, type(basic), X509ExtensionType))

        comment = X509Extension('nsComment', False, 'pyOpenSSL unit test')
        self.assertTrue(
            isinstance(comment, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                comment, type(comment), X509ExtensionType))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_invalid_extension(self):
        """
        L{X509Extension} raises something if it is passed a bad extension
        name or value.
        """
        self.assertRaises(
            Error, X509Extension, 'thisIsMadeUp', False, 'hi')
        self.assertRaises(
            Error, X509Extension, 'basicConstraints', False, 'blah blah')

        # Exercise a weird one (an extension which uses the r2i method).  This
        # exercises the codepath that requires a non-NULL ctx to be passed to
        # X509V3_EXT_nconf.  It can't work now because we provide no
        # configuration database.  It might be made to work in the future.
        self.assertRaises(
            Error, X509Extension, 'proxyCertInfo', True,
            'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_construction(self):
        """
        L{X509Extension} accepts an extension type name, a critical flag,
        and an extension value and returns an L{X509ExtensionType} instance.
        """
        basic = X509Extension('basicConstraints', True, 'CA:true')
        self.assertTrue(
            isinstance(basic, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                basic, type(basic), X509ExtensionType))

        comment = X509Extension('nsComment', False, 'pyOpenSSL unit test')
        self.assertTrue(
            isinstance(comment, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                comment, type(comment), X509ExtensionType))
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_invalid_extension(self):
        """
        L{X509Extension} raises something if it is passed a bad extension
        name or value.
        """
        self.assertRaises(
            Error, X509Extension, 'thisIsMadeUp', False, 'hi')
        self.assertRaises(
            Error, X509Extension, 'basicConstraints', False, 'blah blah')

        # Exercise a weird one (an extension which uses the r2i method).  This
        # exercises the codepath that requires a non-NULL ctx to be passed to
        # X509V3_EXT_nconf.  It can't work now because we provide no
        # configuration database.  It might be made to work in the future.
        self.assertRaises(
            Error, X509Extension, 'proxyCertInfo', True,
            'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB')
项目:aws-greengrass-mini-fulfillment    作者:awslabs    | 项目源码 | 文件源码
def create_group_cert(cli):
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)  # generate RSA key-pair

    cert = crypto.X509()
    cert.get_subject().countryName = "US"
    cert.get_subject().stateOrProvinceName = "CA"
    cert.get_subject().organizationName = "mini-fulfillment"
    cert.get_subject().organizationalUnitName = "demo"
    cert.get_subject().commonName = "mini-fulfillment"
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60)  # 5 year expiry date
    cert.set_issuer(cert.get_subject())  # self-sign this certificate
    cert.set_pubkey(k)
    san_list = ["IP:{0}".format(cli.ip_address)]
    extension_list = [
        crypto.X509Extension(type_name=b"basicConstraints",
                             critical=False, value=b"CA:false"),
        crypto.X509Extension(type_name=b"subjectAltName",
                             critical=True, value=", ".join(san_list)),
        # crypto.X509Extension(type_name=b"subjectKeyIdentifier",
        #                      critical=True, value=b"hash")
    ]
    cert.add_extensions(extension_list)
    cert.sign(k, 'sha256')

    prefix = str(cli.out_dir) + '/' + cli.group_name

    open("{0}-server.crt".format(prefix), 'wt').write(
        crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    open("{0}-server-private.key".format(prefix), 'wt').write(
        crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey=k))
    open("{0}-server-public.key".format(prefix), 'wt').write(
        crypto.dump_publickey(crypto.FILETYPE_PEM, pkey=k))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_type(self):
        """
        L{X509Extension} and L{X509ExtensionType} refer to the same type object
        and can be used to create instances of that type.
        """
        self.assertIdentical(X509Extension, X509ExtensionType)
        self.assertConsistentType(
            X509Extension, 'X509Extension', 'basicConstraints', True, 'CA:true')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_get_critical(self):
        """
        L{X509ExtensionType.get_critical} returns the value of the
        extension's critical flag.
        """
        ext = X509Extension('basicConstraints', True, 'CA:true')
        self.assertTrue(ext.get_critical())
        ext = X509Extension('basicConstraints', False, 'CA:true')
        self.assertFalse(ext.get_critical())
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_get_short_name(self):
        """
        L{X509ExtensionType.get_short_name} returns a string giving the short
        type name of the extension.
        """
        ext = X509Extension('basicConstraints', True, 'CA:true')
        self.assertEqual(ext.get_short_name(), 'basicConstraints')
        ext = X509Extension('nsComment', True, 'foo bar')
        self.assertEqual(ext.get_short_name(), 'nsComment')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_subject(self):
        """
        If an extension requires a subject, the C{subject} parameter to
        L{X509Extension} provides its value.
        """
        ext3 = X509Extension('subjectKeyIdentifier', False, 'hash', subject=self.x509)
        self.x509.add_extensions([ext3])
        self.x509.sign(self.pkey, 'sha1')
        text = dump_certificate(FILETYPE_TEXT, self.x509)
        self.assertTrue('X509v3 Subject Key Identifier:' in text)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_missing_subject(self):
        """
        If an extension requires a subject and the C{subject} parameter is
        given no value, something happens.
        """
        self.assertRaises(
            Error, X509Extension, 'subjectKeyIdentifier', False, 'hash')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_invalid_subject(self):
        """
        If the C{subject} parameter is given a value which is not an L{X509}
        instance, L{TypeError} is raised.
        """
        for badObj in [True, object(), "hello", [], self]:
            self.assertRaises(
                TypeError,
                X509Extension,
                'basicConstraints', False, 'CA:TRUE', subject=badObj)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_unused_issuer(self):
        """
        The C{issuer} parameter to L{X509Extension} may be provided for an
        extension which does not use it and is ignored in this case.
        """
        ext1 = X509Extension('basicConstraints', False, 'CA:TRUE', issuer=self.x509)
        self.x509.add_extensions([ext1])
        self.x509.sign(self.pkey, 'sha1')
        text = dump_certificate(FILETYPE_TEXT, self.x509)
        self.assertTrue('X509v3 Basic Constraints:' in text)
        self.assertTrue('CA:TRUE' in text)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_issuer(self):
        """
        If an extension requires a issuer, the C{issuer} parameter to
        L{X509Extension} provides its value.
        """
        ext2 = X509Extension(
            'authorityKeyIdentifier', False, 'issuer:always',
            issuer=self.x509)
        self.x509.add_extensions([ext2])
        self.x509.sign(self.pkey, 'sha1')
        text = dump_certificate(FILETYPE_TEXT, self.x509)
        self.assertTrue('X509v3 Authority Key Identifier:' in text)
        self.assertTrue('DirName:/CN=Yoda root CA' in text)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_invalid_issuer(self):
        """
        If the C{issuer} parameter is given a value which is not an L{X509}
        instance, L{TypeError} is raised.
        """
        for badObj in [True, object(), "hello", [], self]:
            self.assertRaises(
                TypeError,
                X509Extension,
                'authorityKeyIdentifier', False, 'keyid:always,issuer:always',
                issuer=badObj)
项目:docket    作者:rocknsm    | 项目源码 | 文件源码
def generate(self, module):
        '''Generate the certificate signing request.'''

        if not self.check(module, perms_required=False) or self.force:
            req = crypto.X509Req()
            req.set_version(self.version)
            subject = req.get_subject()
            for (key, value) in self.subject.items():
                if value is not None:
                    setattr(subject, key, value)

            altnames = ', '.join(self.subjectAltName)
            extensions = [crypto.X509Extension(b"subjectAltName", False, altnames.encode('ascii'))]

            if self.keyUsage:
                usages = ', '.join(self.keyUsage)
                extensions.append(crypto.X509Extension(b"keyUsage", False, usages.encode('ascii')))

            if self.extendedKeyUsage:
                usages = ', '.join(self.extendedKeyUsage)
                extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, usages.encode('ascii')))

            req.add_extensions(extensions)

            req.set_pubkey(self.privatekey)
            req.sign(self.privatekey, self.digest)
            self.request = req

            try:
                csr_file = open(self.path, 'wb')
                csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request))
                csr_file.close()
            except (IOError, OSError) as exc:
                raise CertificateSigningRequestError(exc)

            self.changed = True

        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            self.changed = True
项目:django-x509    作者:openwisp    | 项目源码 | 文件源码
def test_subject_key_identifier(self):
        ca = self._create_ca()
        e = ca.x509.get_extension(2)
        self.assertEqual(e.get_short_name().decode(), 'subjectKeyIdentifier')
        self.assertEqual(e.get_critical(), False)
        e2 = crypto.X509Extension(b'subjectKeyIdentifier',
                                  False,
                                  b'hash',
                                  subject=ca.x509)
        self.assertEqual(e.get_data(), e2.get_data())
项目:django-x509    作者:openwisp    | 项目源码 | 文件源码
def test_authority_key_identifier(self):
        ca = self._create_ca()
        e = ca.x509.get_extension(3)
        self.assertEqual(e.get_short_name().decode(), 'authorityKeyIdentifier')
        self.assertEqual(e.get_critical(), False)
        e2 = crypto.X509Extension(b'authorityKeyIdentifier',
                                  False,
                                  b'keyid:always,issuer:always',
                                  issuer=ca.x509)
        self.assertEqual(e.get_data(), e2.get_data())
项目:django-x509    作者:openwisp    | 项目源码 | 文件源码
def test_subject_key_identifier(self):
        cert = self._create_cert()
        e = cert.x509.get_extension(2)
        self.assertEqual(e.get_short_name().decode(), 'subjectKeyIdentifier')
        self.assertEqual(e.get_critical(), False)
        e2 = crypto.X509Extension(b'subjectKeyIdentifier',
                                  False,
                                  b'hash',
                                  subject=cert.x509)
        self.assertEqual(e.get_data(), e2.get_data())
项目:django-x509    作者:openwisp    | 项目源码 | 文件源码
def test_authority_key_identifier(self):
        cert = self._create_cert()
        e = cert.x509.get_extension(3)
        self.assertEqual(e.get_short_name().decode(), 'authorityKeyIdentifier')
        self.assertEqual(e.get_critical(), False)
        e2 = crypto.X509Extension(b'authorityKeyIdentifier',
                                  False,
                                  b'keyid:always,issuer:always',
                                  issuer=cert.ca.x509)
        self.assertEqual(e.get_data(), e2.get_data())
项目:experiment-manager    作者:softfire-eu    | 项目源码 | 文件源码
def _add_extensions(self, cert, is_server=False):
        """
        (internal use only)
        adds x509 extensions to ``cert``
        """
        ext = list()
        ext.append(crypto.X509Extension(b'basicConstraints',
                                        True,
                                        b'CA:FALSE'))
        ext.append(crypto.X509Extension(b'keyUsage',
                                        CERT_KEYUSAGE_CRITICAL,
                                        bytes_compat(CERT_KEYUSAGE_VALUE)))
        if is_server:
            ext.append(crypto.X509Extension(b'extendedKeyUsage', False, b'serverAuth'))
        issuer_cert = self.ca_cert
        ext.append(crypto.X509Extension(b'subjectKeyIdentifier',
                                        False,
                                        b'hash',
                                        subject=cert))
        cert.add_extensions(ext)
        cert.add_extensions([
            crypto.X509Extension(b'authorityKeyIdentifier',
                                 False,
                                 b'keyid:always,issuer:always',
                                 issuer=issuer_cert)
        ])
        return cert
项目:options-screener    作者:dcwangmit01    | 项目源码 | 文件源码
def self_signed_cert_gen(
            key_type=crypto.TYPE_RSA,
            key_bits=4096,
            country="US",
            state_province="California",
            locality="San Francisco",
            org="Your Company",
            org_unit="Team",
            common_name="www.domain.com",
            subject_alt_names=[],  # alternative dns names as list
            # ^ must look like: ["DNS:*.domain.com", "DNS:domain.ym"]
            validity_days=10 * 365):

        # Create a key pair
        k = crypto.PKey()
        k.generate_key(key_type, key_bits)

        # Create a self-signed cert
        cert = crypto.X509()
        cert.get_subject().C = country
        cert.get_subject().ST = state_province
        cert.get_subject().L = locality
        cert.get_subject().O = org
        cert.get_subject().OU = org_unit
        cert.get_subject().CN = common_name
        if subject_alt_names:
            subject_alt_names = ", ".join(subject_alt_names).encode("utf-8")
            cert.add_extensions([
                crypto.X509Extension("subjectAltName".encode("utf-8"), False,
                                     subject_alt_names)
            ])
        cert.set_serial_number(random.getrandbits(64))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(validity_days * 24 * 60 * 60)
        cert.set_issuer(cert.get_subject())  # self-signer
        cert.set_pubkey(k)
        cert.sign(k, 'sha1')

        # return a tuple of the private key and the self-signed cert
        return (crypto.dump_privatekey(crypto.FILETYPE_PEM, k),
                crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_type(self):
        """
        L{X509Extension} and L{X509ExtensionType} refer to the same type object
        and can be used to create instances of that type.
        """
        self.assertIdentical(X509Extension, X509ExtensionType)
        self.assertConsistentType(
            X509Extension, 'X509Extension', 'basicConstraints', True, 'CA:true')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_get_critical(self):
        """
        L{X509ExtensionType.get_critical} returns the value of the
        extension's critical flag.
        """
        ext = X509Extension('basicConstraints', True, 'CA:true')
        self.assertTrue(ext.get_critical())
        ext = X509Extension('basicConstraints', False, 'CA:true')
        self.assertFalse(ext.get_critical())
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_get_short_name(self):
        """
        L{X509ExtensionType.get_short_name} returns a string giving the short
        type name of the extension.
        """
        ext = X509Extension('basicConstraints', True, 'CA:true')
        self.assertEqual(ext.get_short_name(), 'basicConstraints')
        ext = X509Extension('nsComment', True, 'foo bar')
        self.assertEqual(ext.get_short_name(), 'nsComment')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_subject(self):
        """
        If an extension requires a subject, the C{subject} parameter to
        L{X509Extension} provides its value.
        """
        ext3 = X509Extension('subjectKeyIdentifier', False, 'hash', subject=self.x509)
        self.x509.add_extensions([ext3])
        self.x509.sign(self.pkey, 'sha1')
        text = dump_certificate(FILETYPE_TEXT, self.x509)
        self.assertTrue('X509v3 Subject Key Identifier:' in text)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_missing_subject(self):
        """
        If an extension requires a subject and the C{subject} parameter is
        given no value, something happens.
        """
        self.assertRaises(
            Error, X509Extension, 'subjectKeyIdentifier', False, 'hash')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_invalid_subject(self):
        """
        If the C{subject} parameter is given a value which is not an L{X509}
        instance, L{TypeError} is raised.
        """
        for badObj in [True, object(), "hello", [], self]:
            self.assertRaises(
                TypeError,
                X509Extension,
                'basicConstraints', False, 'CA:TRUE', subject=badObj)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_unused_issuer(self):
        """
        The C{issuer} parameter to L{X509Extension} may be provided for an
        extension which does not use it and is ignored in this case.
        """
        ext1 = X509Extension('basicConstraints', False, 'CA:TRUE', issuer=self.x509)
        self.x509.add_extensions([ext1])
        self.x509.sign(self.pkey, 'sha1')
        text = dump_certificate(FILETYPE_TEXT, self.x509)
        self.assertTrue('X509v3 Basic Constraints:' in text)
        self.assertTrue('CA:TRUE' in text)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_issuer(self):
        """
        If an extension requires a issuer, the C{issuer} parameter to
        L{X509Extension} provides its value.
        """
        ext2 = X509Extension(
            'authorityKeyIdentifier', False, 'issuer:always',
            issuer=self.x509)
        self.x509.add_extensions([ext2])
        self.x509.sign(self.pkey, 'sha1')
        text = dump_certificate(FILETYPE_TEXT, self.x509)
        self.assertTrue('X509v3 Authority Key Identifier:' in text)
        self.assertTrue('DirName:/CN=Yoda root CA' in text)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_invalid_issuer(self):
        """
        If the C{issuer} parameter is given a value which is not an L{X509}
        instance, L{TypeError} is raised.
        """
        for badObj in [True, object(), "hello", [], self]:
            self.assertRaises(
                TypeError,
                X509Extension,
                'authorityKeyIdentifier', False, 'keyid:always,issuer:always',
                issuer=badObj)
项目:fabric-test    作者:hyperledger    | 项目源码 | 文件源码
def _get_cert_extensions_ip_sans(self, user_name, node_name):
        extensions = []
        if 'signer' in user_name.lower():
            if ('peer' in node_name.lower() or 'orderer' in node_name.lower()):
                san_list = ["DNS:{0}".format(node_name)]
                extensions.append(crypto.X509Extension(b"subjectAltName", False, ", ".join(san_list)))
        return extensions
项目:kOVHernetes    作者:antoineco    | 项目源码 | 文件源码
def __init__(self):
        # CA key
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 2048)

        # CA cert
        cert = crypto.X509()
        cert.set_serial_number(self.__next_serial)
        cert.set_version(2)
        cert.set_pubkey(key)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)

        cacert_subject = cert.get_subject()
        cacert_subject.O = 'kOVHernetes'
        cacert_subject.OU = 'kOVHernetes Certificate Authority'
        cacert_subject.CN = 'kOVHernetes Root CA'
        cert.set_issuer(cacert_subject)

        cert.add_extensions((crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),))
        cacert_ext = []
        cacert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', True, b'keyid:always,issuer', issuer=cert))
        cacert_ext.append(crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE'))
        cacert_ext.append(crypto.X509Extension(b'keyUsage', True, b'digitalSignature, cRLSign, keyCertSign'))
        cert.add_extensions(cacert_ext)

        # sign CA cert with CA key
        cert.sign(key, 'sha256')

        type(self).__next_serial += 1

        self.cert = cert
        self.key = key
项目:kOVHernetes    作者:antoineco    | 项目源码 | 文件源码
def create_client_cert(self, key, o, cn):
        """Issue a X.509 client certificate"""

        cert = crypto.X509()
        cert.set_serial_number(self.__next_serial)
        cert.set_version(2)
        cert.set_pubkey(key)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(365*24*60*60)

        cert_subject = cert.get_subject()
        cert_subject.O = o
        cert_subject.OU = 'kOVHernetes'
        cert_subject.CN = cn
        cert.set_issuer(self.cert.get_issuer())

        cert_ext = []
        cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
        cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer', issuer=cert))
        cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
        cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'nonRepudiation, digitalSignature, keyEncipherment'))
        cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'))
        cert.add_extensions(cert_ext)

        # sign cert with CA key
        cert.sign(self.key, 'sha256')

        type(self).__next_serial += 1

        return cert
项目:kOVHernetes    作者:antoineco    | 项目源码 | 文件源码
def create_server_cert(self, key, o, cn, san=[]):
        """Issue a X.509 server certificate"""

        cert = crypto.X509()
        cert.set_serial_number(self.__next_serial)
        cert.set_version(2)
        cert.set_pubkey(key)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(365*24*60*60)

        cert_subject = cert.get_subject()
        cert_subject.O = o
        cert_subject.OU = 'kOVHernetes'
        cert_subject.CN = cn
        cert.set_issuer(self.cert.get_issuer())

        cert_ext = []
        cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
        cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer:always', issuer=cert))
        cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
        cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'))
        cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth'))
        if san:
            cert_ext.append(crypto.X509Extension(b'subjectAltName', False, ','.join(san).encode()))
        cert.add_extensions(cert_ext)

        # sign cert with CA key
        cert.sign(self.key, 'sha256')

        type(self).__next_serial += 1

        return cert
项目:kOVHernetes    作者:antoineco    | 项目源码 | 文件源码
def create_client_pair(self, o, cn):
        """Issue a X.509 client key/certificate pair"""

        # key
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 2048)

        # cert
        cert = crypto.X509()
        cert.set_serial_number(self.__next_serial)
        cert.set_version(2)
        cert.set_pubkey(key)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(365*24*60*60)

        cert_subject = cert.get_subject()
        cert_subject.O = o
        cert_subject.OU = 'kOVHernetes'
        cert_subject.CN = cn
        cert.set_issuer(self.cert.get_issuer())

        cert_ext = []
        cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
        cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer', issuer=cert))
        cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
        cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'nonRepudiation, digitalSignature, keyEncipherment'))
        cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'))
        cert.add_extensions(cert_ext)

        # sign cert with CA key
        cert.sign(self.key, 'sha256')

        type(self).__next_serial += 1

        return key, cert
项目:kOVHernetes    作者:antoineco    | 项目源码 | 文件源码
def create_server_pair(self, o, cn, san=[]):
        """Issue a X.509 server key/certificate pair"""

        # key
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 2048)

        # cert
        cert = crypto.X509()
        cert.set_serial_number(self.__next_serial)
        cert.set_version(2)
        cert.set_pubkey(key)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(365*24*60*60)

        cert_subject = cert.get_subject()
        cert_subject.O = o
        cert_subject.OU = 'kOVHernetes'
        cert_subject.CN = cn
        cert.set_issuer(self.cert.get_issuer())

        cert_ext = []
        cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
        cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer:always', issuer=cert))
        cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
        cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'))
        cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth'))
        if san: cert_ext.append(crypto.X509Extension(b'subjectAltName', False, ','.join(san).encode()))
        cert.add_extensions(cert_ext)

        # sign cert with CA key
        cert.sign(self.key, 'sha256')

        type(self).__next_serial += 1

        return key, cert
项目:ansible-nginx-load-balancer    作者:mrlesmithjr    | 项目源码 | 文件源码
def generate(self, module):
        '''Generate the certificate signing request.'''

        if not os.path.exists(self.path) or self.force:
            req = crypto.X509Req()
            req.set_version(self.version)
            subject = req.get_subject()
            for (key, value) in self.subject.items():
                if value is not None:
                    setattr(subject, key, value)

            if self.subjectAltName is not None:
                req.add_extensions([crypto.X509Extension(b"subjectAltName", False, self.subjectAltName.encode('ascii'))])

            privatekey_content = open(self.privatekey_path).read()
            self.privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content)

            req.set_pubkey(self.privatekey)
            req.sign(self.privatekey, self.digest)
            self.request = req

            try:
                csr_file = open(self.path, 'wb')
                csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request))
                csr_file.close()
            except (IOError, OSError) as exc:
                raise CertificateSigningRequestError(exc)
        else:
            self.changed = False

        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            self.changed = True
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def _create_test_cert(cert_file, key_file, subject, valid_days, serial_number):
    # create a key pair
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2046)

    # create a self-signed cert with some basic constraints
    cert = crypto.X509()
    cert.get_subject().CN = subject
    cert.gmtime_adj_notBefore(-1 * 24 * 60 * 60)
    cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
    cert.set_version(2)
    cert.set_serial_number(serial_number)
    cert.add_extensions([
        crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:1"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=cert),
    ])
    cert.add_extensions([
        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
                             issuer=cert)
    ])
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(k)
    cert.sign(k, 'sha256')

    cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('ascii')
    key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode('ascii')

    open(cert_file, 'w').write(cert_str)
    open(key_file, 'w').write(key_str)
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def _create_verification_cert(cert_file, key_file, verification_file, nonce, valid_days, serial_number):
    if exists(cert_file) and exists(key_file):
        # create a key pair
        public_key = crypto.PKey()
        public_key.generate_key(crypto.TYPE_RSA, 2046)

        # open the root cert and key
        signing_cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read())
        k = crypto.load_privatekey(crypto.FILETYPE_PEM, open(key_file).read())

        # create a cert signed by the root
        verification_cert = crypto.X509()
        verification_cert.get_subject().CN = nonce
        verification_cert.gmtime_adj_notBefore(0)
        verification_cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
        verification_cert.set_version(2)
        verification_cert.set_serial_number(serial_number)

        verification_cert.set_pubkey(public_key)
        verification_cert.set_issuer(signing_cert.get_subject())
        verification_cert.add_extensions([
            crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
                                 issuer=signing_cert)
        ])
        verification_cert.sign(k, 'sha256')

        verification_cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, verification_cert).decode('ascii')

        open(verification_file, 'w').write(verification_cert_str)
项目:wile    作者:costela    | 项目源码 | 文件源码
def _generate_csr(key, key_digest, domains):
    csr = crypto.X509Req()
    csr.set_version(2)
    csr.set_pubkey(key)

    sans = ', '.join('DNS:{}'.format(d) for d in domains)
    exts = [crypto.X509Extension(b'subjectAltName', False, b(sans))]
    csr.add_extensions(exts)

    csr.sign(key, str(key_digest))

    return ComparableX509(csr)
项目:certerator    作者:stufus    | 项目源码 | 文件源码
def generate_ca(config_ca):
    ca = crypto.X509()
    ca.set_version(2)
    ca.set_serial_number(config_ca['serial'])
    ca_subj = ca.get_subject()
    if 'commonName' in config_ca:
        ca_subj.commonName = config_ca['commonName']
    if 'stateOrProvinceName' in config_ca:
        ca_subj.stateOrProvinceName = config_ca['stateOrProvinceName']
    if 'localityName' in config_ca:
        ca_subj.localityName = config_ca['localityName']
    if 'organizationName' in config_ca:
        ca_subj.organizationName = config_ca['organizationName']
    if 'organizationalUnitName' in config_ca:
        ca_subj.organizationalUnitName = config_ca['organizationalUnitName']
    if 'emailAddress' in config_ca:
        ca_subj.emailAddress = config_ca['emailAddress']
    if 'countryName' in config_ca:
        ca_subj.countryName = config_ca['countryName']
    if 'validfrom' in config_ca:
        ca.set_notBefore(config_ca['validfrom'])
    if 'validto' in config_ca:
        ca.set_notAfter(config_ca['validto'])
    key = openssl_generate_privatekey(config_ca['keyfilesize'])
    ca.add_extensions([
        crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:1"),
        crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"),
        crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca),
    ])
    ca.add_extensions([
        crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always",issuer=ca)
    ])
    ca.set_issuer(ca.get_subject())
    ca.set_pubkey(key)
    ca.sign(key, config_ca['hashalgorithm'])
    return ca, key
项目:certerator    作者:stufus    | 项目源码 | 文件源码
def generate_certificate(config_cert, ca, cakey):
    # Generate the private key
    key = openssl_generate_privatekey(config_cert['keyfilesize'])

    # Generate the certificate request
    req = crypto.X509Req()
    req_subj = req.get_subject()
    if 'commonName' in config_cert:
        req_subj.commonName = config_cert['commonName']
    if 'stateOrProvinceName' in config_cert:
        req_subj.stateOrProvinceName = config_cert['stateOrProvinceName']
    if 'localityName' in config_cert:
        req_subj.localityName = config_cert['localityName']
    if 'organizationName' in config_cert:
        req_subj.organizationName = config_cert['organizationName']
    if 'organizationalUnitName' in config_cert:
        req_subj.organizationalUnitName = config_cert['organizationalUnitName']
    if 'emailAddress' in config_cert:
        req_subj.emailAddress = config_cert['emailAddress']
    if 'countryName' in config_cert:
        req_subj.countryName = config_cert['countryName']

    req.set_pubkey(key)
    req.sign(key, config_cert['hashalgorithm'])

    # Now generate the certificate itself
    cert = crypto.X509()
    cert.set_version(2)
    cert.set_serial_number(config_cert['serial'])
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.set_issuer(ca.get_subject())

    if 'validfrom' in config_cert:
        cert.set_notBefore(config_cert['validfrom'])
    if 'validto' in config_cert:
        cert.set_notAfter(config_cert['validto'])

    cert.add_extensions([
        crypto.X509Extension("basicConstraints", True, "CA:FALSE"),
        crypto.X509Extension("keyUsage", False, "digitalSignature"),
        crypto.X509Extension("extendedKeyUsage", False, "codeSigning,msCTLSign,timeStamping,msCodeInd,msCodeCom"),
        crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=cert),
        crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=ca)
    ])

    cert.sign(cakey, config_cert['hashalgorithm'])
    return req, cert, key
项目:GotoX    作者:SeaHOH    | 项目源码 | 文件源码
def create_subcert(certfile, commonname, ip=False, sans=None):
    sans = set(sans) if sans else set()
    cert = crypto.X509()
    cert.set_version(2)
    cert.set_serial_number(int((int(time() - sub_serial) + random.random()) * 100)) #setting the only number
    subject = cert.get_subject()
    subject.countryName = 'CN'
    subject.stateOrProvinceName = 'Internet'
    subject.localityName = 'Cernet'
    subject.organizationalUnitName = '%s Branch' % ca_vendor
    subject.commonName = commonname
    subject.organizationName = commonname
    #????????????????????
    cert.gmtime_adj_notBefore(sub_time_b)
    cert.gmtime_adj_notAfter(sub_time_a)
    cert.set_issuer(ca_subject)
    cert.set_pubkey(sub_publickey)
    sans.add(commonname)
    if not ip:
        sans.add('*.' + commonname)
    sans = ', '.join('DNS: %s' % x for x in sans)
    cert.add_extensions([crypto.X509Extension(b'subjectAltName', True, sans.encode())])
    cert.sign(ca_privatekey, ca_digest)

    with open(certfile, 'wb') as fp:
        fp.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def create_certificate(self, request, issuer_cert, issuer_key, serial, days=3650, digest='sha256', extensions=[], subject_alt_names='', version=2):
        """
        Generate a certificate given the certificate request.

        Arguments: request     - Certificate request to sign
                   issuer_cert - The certificate of the issuer
                   issuer_key  - The private key of the issuer
                   extensions        - x509 extensions provided as a dictionary :name, :critical, :value
                   subject_alt_names - subject alt names e.g. IP:192.168.7.1 or DNS:my.domain
                   serial      - The serial number to assign to the certificate
                   days        - The number of days of validity (starting from now)
                   digest      - The digest method for signing (by default sha256)
        """

        certificate = crypto.X509()

        # Handle x509 extensions
        for extension in extensions:
            # handle issuer and subjects that need to be self-referential (root certificate)
            if 'subject' in extension.keys() and extension['subject'] == 'self':
                extension['subject'] = certificate
            if 'issuer' in extension.keys() and extension['issuer'] == 'self':
                extension['issuer'] = certificate
            elif 'issuer' in extension.keys() and extension['issuer'] != 'self':
                extension['issuer'] = issuer_cert

            # have to explicitly set 'critical' extension to a bool.
            if 'critical' in extension.keys():
                extension['critical'] = extension['critical'].lower() in ("yes", "true", "t", "1")

            # add the extensions to the request
            certificate.add_extensions([crypto.X509Extension(**extension)])

        # Handle the subject alternative names (these are just X509 extensions)
        if len(subject_alt_names) != 0:
            certificate.add_extensions([crypto.X509Extension("subjectAltName", False, ", ".join(subject_alt_names))])

        certificate.set_serial_number(serial)
        certificate.set_version(version)
        certificate.gmtime_adj_notBefore(0)
        certificate.gmtime_adj_notAfter(days*86400)
        certificate.set_subject(request.get_subject())
        certificate.set_issuer(issuer_cert.get_subject())
        certificate.set_pubkey(request.get_pubkey())
        certificate.sign(issuer_key, digest)
        return certificate
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def create_certificate(self, request, issuer_cert, issuer_key, serial, days=3650, digest='sha256', extensions=[], subject_alt_names='', version=2):
        """
        Generate a certificate given the certificate request.

        Arguments: request     - Certificate request to sign
                   issuer_cert - The certificate of the issuer
                   issuer_key  - The private key of the issuer
                   extensions        - x509 extensions provided as a dictionary :name, :critical, :value
                   subject_alt_names - subject alt names e.g. IP:192.168.7.1 or DNS:my.domain
                   serial      - The serial number to assign to the certificate
                   days        - The number of days of validity (starting from now)
                   digest      - The digest method for signing (by default sha256)
        """

        certificate = crypto.X509()

        # Handle x509 extensions
        for extension in extensions:
            # handle issuer and subjects that need to be self-referential (root certificate)
            if 'subject' in extension.keys() and extension['subject'] == 'self':
                extension['subject'] = certificate
            if 'issuer' in extension.keys() and extension['issuer'] == 'self':
                extension['issuer'] = certificate
            elif 'issuer' in extension.keys() and extension['issuer'] != 'self':
                extension['issuer'] = issuer_cert

            # have to explicitly set 'critical' extension to a bool.
            if 'critical' in extension.keys():
                extension['critical'] = extension['critical'].lower() in ("yes", "true", "t", "1")

            # add the extensions to the request
            certificate.add_extensions([crypto.X509Extension(**extension)])

        # Handle the subject alternative names (these are just X509 extensions)
        if len(subject_alt_names) != 0:
            certificate.add_extensions([crypto.X509Extension("subjectAltName", False, ", ".join(subject_alt_names))])

        certificate.set_serial_number(serial)
        certificate.set_version(version)
        certificate.gmtime_adj_notBefore(0)
        certificate.gmtime_adj_notAfter(days*86400)
        certificate.set_subject(request.get_subject())
        certificate.set_issuer(issuer_cert.get_subject())
        certificate.set_pubkey(request.get_pubkey())
        certificate.sign(issuer_key, digest)
        return certificate
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def _create_certificate_chain():
    """
    Construct and return a chain of certificates.

        1. A new self-signed certificate authority certificate (cacert)
        2. A new intermediate certificate signed by cacert (icert)
        3. A new server certificate signed by icert (scert)
    """
    caext = X509Extension(b'basicConstraints', False, b'CA:true')

    # Step 1
    cakey = PKey()
    cakey.generate_key(TYPE_RSA, 1024)
    cacert = X509()
    cacert.get_subject().commonName = "Authority Certificate"
    cacert.set_issuer(cacert.get_subject())
    cacert.set_pubkey(cakey)
    cacert.set_notBefore(b"20000101000000Z")
    cacert.set_notAfter(b"20200101000000Z")
    cacert.add_extensions([caext])
    cacert.set_serial_number(0)
    cacert.sign(cakey, "sha1")

    # Step 2
    ikey = PKey()
    ikey.generate_key(TYPE_RSA, 1024)
    icert = X509()
    icert.get_subject().commonName = "Intermediate Certificate"
    icert.set_issuer(cacert.get_subject())
    icert.set_pubkey(ikey)
    icert.set_notBefore(b"20000101000000Z")
    icert.set_notAfter(b"20200101000000Z")
    icert.add_extensions([caext])
    icert.set_serial_number(0)
    icert.sign(cakey, "sha1")

    # Step 3
    skey = PKey()
    skey.generate_key(TYPE_RSA, 1024)
    scert = X509()
    scert.get_subject().commonName = "Server Certificate"
    scert.set_issuer(icert.get_subject())
    scert.set_pubkey(skey)
    scert.set_notBefore(b"20000101000000Z")
    scert.set_notAfter(b"20200101000000Z")
    scert.add_extensions([
        X509Extension(b'basicConstraints', True, b'CA:false')])
    scert.set_serial_number(0)
    scert.sign(ikey, "sha1")

    return [(cakey, cacert), (ikey, icert), (skey, scert)]
项目:django-x509    作者:openwisp    | 项目源码 | 文件源码
def _add_extensions(self, cert):
        """
        (internal use only)
        adds x509 extensions to ``cert``
        """
        ext = []
        # prepare extensions for CA
        if not hasattr(self, 'ca'):
            pathlen = app_settings.CA_BASIC_CONSTRAINTS_PATHLEN
            ext_value = 'CA:TRUE'
            if pathlen is not None:
                ext_value = '{0}, pathlen:{1}'.format(ext_value, pathlen)
            ext.append(crypto.X509Extension(b'basicConstraints',
                                            app_settings.CA_BASIC_CONSTRAINTS_CRITICAL,
                                            bytes_compat(ext_value)))
            ext.append(crypto.X509Extension(b'keyUsage',
                                            app_settings.CA_KEYUSAGE_CRITICAL,
                                            bytes_compat(app_settings.CA_KEYUSAGE_VALUE)))
            issuer_cert = cert
        # prepare extensions for end-entity certs
        else:
            ext.append(crypto.X509Extension(b'basicConstraints',
                                            False,
                                            b'CA:FALSE'))
            ext.append(crypto.X509Extension(b'keyUsage',
                                            app_settings.CERT_KEYUSAGE_CRITICAL,
                                            bytes_compat(app_settings.CERT_KEYUSAGE_VALUE)))
            issuer_cert = self.ca.x509
        ext.append(crypto.X509Extension(b'subjectKeyIdentifier',
                                        False,
                                        b'hash',
                                        subject=cert))
        cert.add_extensions(ext)
        # authorityKeyIdentifier must be added after
        # the other extensions have been already added
        cert.add_extensions([
            crypto.X509Extension(b'authorityKeyIdentifier',
                                 False,
                                 b'keyid:always,issuer:always',
                                 issuer=issuer_cert)
        ])
        for ext in self.extensions:
            cert.add_extensions([
                crypto.X509Extension(bytes_compat(ext['name']),
                                     bool(ext['critical']),
                                     bytes_compat(ext['value']))
            ])
        return cert
项目:fabric-test    作者:hyperledger    | 项目源码 | 文件源码
def createCertificate(req, issuerCertKey, serial, validityPeriod, digest="sha256", isCA=False, extensions=[]):
    """
    Generate a certificate given a certificate request.
    Arguments: req        - Certificate request to use
               issuerCert - The certificate of the issuer
               issuerKey  - The private key of the issuer
               serial     - Serial number for the certificate
               notBefore  - Timestamp (relative to now) when the certificate
                            starts being valid
               notAfter   - Timestamp (relative to now) when the certificate
                            stops being valid
               digest     - Digest method to use for signing, default is sha256
    Returns:   The signed certificate in an X509 object
    """
    issuerCert, issuerKey = issuerCertKey
    notBefore, notAfter = validityPeriod
    cert = crypto.X509()
    cert.set_version(3)
    cert.set_serial_number(serial)
    cert.gmtime_adj_notBefore(notBefore)
    cert.gmtime_adj_notAfter(notAfter)
    cert.set_issuer(issuerCert.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    if isCA:
        cert.add_extensions([crypto.X509Extension("basicConstraints", True,
                                                  "CA:TRUE, pathlen:0"),
                             crypto.X509Extension("subjectKeyIdentifier", False, "hash",
                                                  subject=cert)])
        #TODO: This only is appropriate for root self signed!!!!
        cert.add_extensions([crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=cert)])
    else:
        cert.add_extensions([crypto.X509Extension("basicConstraints", True,
                                                  "CA:FALSE"),
                             crypto.X509Extension("subjectKeyIdentifier", False, "hash",
                                                  subject=cert)])
        cert.add_extensions([crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=issuerCert)])

    cert.add_extensions(extensions)
    cert.sign(issuerKey, digest)
    return cert


# SUBJECT_DEFAULT = {countryName : "US", stateOrProvinceName : "NC", localityName : "RTP", organizationName : "IBM", organizationalUnitName : "Blockchain"}
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _create_certificate_chain():
    """
    Construct and return a chain of certificates.

        1. A new self-signed certificate authority certificate (cacert)
        2. A new intermediate certificate signed by cacert (icert)
        3. A new server certificate signed by icert (scert)
    """
    caext = X509Extension(b('basicConstraints'), False, b('CA:true'))

    # Step 1
    cakey = PKey()
    cakey.generate_key(TYPE_RSA, 512)
    cacert = X509()
    cacert.get_subject().commonName = "Authority Certificate"
    cacert.set_issuer(cacert.get_subject())
    cacert.set_pubkey(cakey)
    cacert.set_notBefore(b("20000101000000Z"))
    cacert.set_notAfter(b("20200101000000Z"))
    cacert.add_extensions([caext])
    cacert.set_serial_number(0)
    cacert.sign(cakey, "sha1")

    # Step 2
    ikey = PKey()
    ikey.generate_key(TYPE_RSA, 512)
    icert = X509()
    icert.get_subject().commonName = "Intermediate Certificate"
    icert.set_issuer(cacert.get_subject())
    icert.set_pubkey(ikey)
    icert.set_notBefore(b("20000101000000Z"))
    icert.set_notAfter(b("20200101000000Z"))
    icert.add_extensions([caext])
    icert.set_serial_number(0)
    icert.sign(cakey, "sha1")

    # Step 3
    skey = PKey()
    skey.generate_key(TYPE_RSA, 512)
    scert = X509()
    scert.get_subject().commonName = "Server Certificate"
    scert.set_issuer(icert.get_subject())
    scert.set_pubkey(skey)
    scert.set_notBefore(b("20000101000000Z"))
    scert.set_notAfter(b("20200101000000Z"))
    scert.add_extensions([
            X509Extension(b('basicConstraints'), True, b('CA:false'))])
    scert.set_serial_number(0)
    scert.sign(ikey, "sha1")

    return [(cakey, cacert), (ikey, icert), (skey, scert)]