Python cryptography.x509 模块,Name() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用cryptography.x509.Name()

项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def enroll(self, enrollment_id, enrollment_secret):
        """Enroll a registered user and obtain a signed X509 certificate.

        A new private key is generated, a CSR whose subject consists of the
        single common name ``enrollment_id`` is created from it, and the CSR
        is sent to the CA client together with the enrollment credentials.

        Args:
            enrollment_id (str): The registered ID to use for enrollment
            enrollment_secret (str): The secret associated with the
                                     enrollment ID

        Returns: Enrollment built from the new private key and the CA
            client's answer (a PEM-encoded X509 certificate)

        Raises:
            RequestException: errors in requests.exceptions
            ValueError: Failed response, json parse error, args missing

        """
        private_key = self._crypto.generate_private_key()

        common_name = x509.NameAttribute(
            NameOID.COMMON_NAME, six.u(enrollment_id))
        csr = self._crypto.generate_csr(private_key, x509.Name([common_name]))

        # The CSR is serialized to PEM and decoded to text before it is
        # handed to the CA client.
        csr_pem = csr.public_bytes(Encoding.PEM).decode("utf-8")
        cert = self._ca_client.enroll(
            enrollment_id, enrollment_secret, csr_pem)

        return Enrollment(private_key, cert)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def generate_cert(key_path, cert_out_path, valid_days=365):
    """Create a certificate for the key at ``key_path`` and write it as PEM.

    The certificate has the fixed subject ``PrivCount User`` and issuer
    ``PrivCount Authority``, is marked as a non-CA certificate, and is signed
    with the loaded private key itself using SHA-256.

    Args:
        key_path: location of the private key; passed to
            ``load_private_key_file``.
        cert_out_path: path of the file that receives the PEM certificate.
            An existing file is overwritten.
        valid_days: number of days, counted from now, until the certificate
            expires. This replaces the former fixed expiry of 2020-01-01,
            which is now earlier than the start of validity and is therefore
            rejected by the certificate builder.
    """
    private_key = load_private_key_file(key_path)
    public_key = private_key.public_key()

    now = datetime.datetime.today()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount User'),
    ]))
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount Authority'),
    ]))
    # Back-date the start by one day so the certificate is usable immediately.
    builder = builder.not_valid_before(now - datetime.timedelta(days=1))
    builder = builder.not_valid_after(now + datetime.timedelta(days=valid_days))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(public_key)
    builder = builder.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)

    certificate = builder.sign(private_key=private_key, algorithm=hashes.SHA256(), backend=default_backend())

    # Write the bytes directly: the old ``print >>outf`` form is a Python 2
    # statement and fails at runtime on Python 3.
    with open(cert_out_path, 'wb') as outf:
        outf.write(certificate.public_bytes(encoding=serialization.Encoding.PEM))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Convert a native ``X509_NAME`` handle into an ``x509.Name``.

    Entries are visited in index order. Consecutive entries that report the
    same set id are collected into one set and become a single
    ``x509.RelativeDistinguishedName``; a change of set id starts a new one.
    """
    lib = backend._lib
    rdn_sets = []
    last_set_id = -1
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        attribute = _decode_x509_name_entry(backend, entry)
        set_id = lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if set_id == last_set_id:
            # Belongs to the same RDN as the previous entry.
            rdn_sets[-1].add(attribute)
        else:
            rdn_sets.append({attribute})
        last_set_id = set_id

    return x509.Name(
        x509.RelativeDistinguishedName(rdn) for rdn in rdn_sets
    )
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _build_x509_name(self, x509_name):
        """Convert a native ``X509_NAME`` handle into an ``x509.Name``.

        For every entry the object identifier is turned into text with
        ``self._obj2txt`` and the value is converted to UTF-8 through
        ``ASN1_STRING_to_UTF8``; each pair becomes one ``x509.NameAttribute``.
        The assertions guard the results of the native calls.
        """
        lib = self._backend._lib
        ffi = self._backend._ffi

        name_attributes = []
        for index in range(lib.X509_NAME_entry_count(x509_name)):
            entry = lib.X509_NAME_get_entry(x509_name, index)

            asn1_object = lib.X509_NAME_ENTRY_get_object(entry)
            assert asn1_object != ffi.NULL
            asn1_string = lib.X509_NAME_ENTRY_get_data(entry)
            assert asn1_string != ffi.NULL

            out = ffi.new("unsigned char **")
            length = lib.ASN1_STRING_to_UTF8(out, asn1_string)
            assert length >= 0
            assert out[0] != ffi.NULL
            # Attach a finalizer so the buffer behind out[0] is released
            # with OPENSSL_free once the wrapper is collected.
            out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))

            # Slice the buffer to copy the bytes out before decoding.
            text = ffi.buffer(out[0], length)[:].decode('utf8')
            dotted_oid = self._obj2txt(asn1_object)
            name_attributes.append(
                x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text)
            )

        return x509.Name(name_attributes)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _build_x509_name(self, x509_name):
        """Translate a native ``X509_NAME`` handle into an ``x509.Name``.

        Each entry contributes one ``x509.NameAttribute`` made of the entry's
        object identifier (as text from ``self._obj2txt``) and its value
        decoded from the UTF-8 bytes produced by ``ASN1_STRING_to_UTF8``.
        """
        backend = self._backend

        def release(holder):
            # Finalizer: frees the buffer the holder points at.
            backend._lib.OPENSSL_free(holder[0])

        collected = []
        total = backend._lib.X509_NAME_entry_count(x509_name)
        for position in range(total):
            entry = backend._lib.X509_NAME_get_entry(x509_name, position)
            oid_obj = backend._lib.X509_NAME_ENTRY_get_object(entry)
            assert oid_obj != backend._ffi.NULL
            raw_value = backend._lib.X509_NAME_ENTRY_get_data(entry)
            assert raw_value != backend._ffi.NULL

            holder = backend._ffi.new("unsigned char **")
            size = backend._lib.ASN1_STRING_to_UTF8(holder, raw_value)
            assert size >= 0
            assert holder[0] != backend._ffi.NULL
            holder = backend._ffi.gc(holder, release)

            utf8_bytes = backend._ffi.buffer(holder[0], size)[:]
            value = utf8_bytes.decode('utf8')
            oid = x509.ObjectIdentifier(self._obj2txt(oid_obj))
            collected.append(x509.NameAttribute(oid, value))

        return x509.Name(collected)
项目:manuale    作者:veeti    | 项目源码 | 文件源码
def create_csr(key, domains, must_staple=False):
    """
    Build a CSR for ``domains`` signed with ``key`` and export it for ACME.

    The first domain becomes the subject common name and every domain is
    listed as a DNS subject alternative name. With ``must_staple`` set, a
    non-critical TLS feature extension requesting ``status_request`` is added
    as well. The signed request is passed through ``export_csr_for_acme``
    (DER format according to the original description).
    """
    assert domains

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains]
    )

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_feature = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request]
        )
        builder = builder.add_extension(staple_feature, critical=False)

    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _build_x509_name(self, x509_name):
        """Convert a native ``X509_NAME`` handle into an ``x509.Name``.

        For every entry the object identifier is turned into text with
        ``self._obj2txt`` and the value is converted to UTF-8 through
        ``ASN1_STRING_to_UTF8``; each pair becomes one ``x509.NameAttribute``.
        The assertions guard the results of the native calls.
        """
        lib = self._backend._lib
        ffi = self._backend._ffi

        name_attributes = []
        for index in range(lib.X509_NAME_entry_count(x509_name)):
            entry = lib.X509_NAME_get_entry(x509_name, index)

            asn1_object = lib.X509_NAME_ENTRY_get_object(entry)
            assert asn1_object != ffi.NULL
            asn1_string = lib.X509_NAME_ENTRY_get_data(entry)
            assert asn1_string != ffi.NULL

            out = ffi.new("unsigned char **")
            length = lib.ASN1_STRING_to_UTF8(out, asn1_string)
            assert length >= 0
            assert out[0] != ffi.NULL
            # Attach a finalizer so the buffer behind out[0] is released
            # with OPENSSL_free once the wrapper is collected.
            out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))

            # Slice the buffer to copy the bytes out before decoding.
            text = ffi.buffer(out[0], length)[:].decode('utf8')
            dotted_oid = self._obj2txt(asn1_object)
            name_attributes.append(
                x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text)
            )

        return x509.Name(name_attributes)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_hostnameIsIndicated(self):
        """
        Specifying the C{hostname} argument to L{CertificateOptions} also sets
        the U{Server Name Extension
        <https://en.wikipedia.org/wiki/Server_Name_Indication>} TLS indication
        field to the correct value.
        """
        received = []

        def record_servername(connection):
            # The server name arrives as bytes; store it as text.
            received.append(connection.get_servername().decode("ascii"))

        def configure_server(context):
            context.set_tlsext_servername_callback(record_servername)

        _client, _server, _pump = self.serviceIdentitySetup(
            u"valid.example.com",
            u"valid.example.com",
            configure_server
        )
        self.assertEqual(received, [u"valid.example.com"])
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Build an ``x509.Name`` out of the entries of a native ``X509_NAME``.

    Every entry is decoded with ``_decode_x509_name_entry``. Entries whose
    set id equals that of the preceding entry share one
    ``x509.RelativeDistinguishedName``; any other entry opens a new one.
    """
    entry_total = backend._lib.X509_NAME_entry_count(x509_name)
    groups = []
    previous = -1
    for position in range(entry_total):
        entry = backend._lib.X509_NAME_get_entry(x509_name, position)
        decoded = _decode_x509_name_entry(backend, entry)
        current = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if current != previous:
            # A different set id marks the start of a new RDN.
            groups.append(set())
        groups[-1].add(decoded)
        previous = current

    return x509.Name(
        x509.RelativeDistinguishedName(group) for group in groups
    )
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Turn a native ``X509_NAME`` handle into an ``x509.Name``.

    The entries are walked by index; runs of adjacent entries that carry the
    same set id are merged into one ``x509.RelativeDistinguishedName``.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)

    rdns = []
    seen_set_id = -1
    index = 0
    while index < total:
        entry = lib.X509_NAME_get_entry(x509_name, index)
        attribute = _decode_x509_name_entry(backend, entry)
        entry_set_id = lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if entry_set_id == seen_set_id:
            # Same RDN as the entry before it.
            rdns[-1].add(attribute)
        else:
            rdns.append({attribute})
        seen_set_id = entry_set_id
        index += 1

    return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in rdns)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _build_x509_name(self, x509_name):
        """Translate a native ``X509_NAME`` handle into an ``x509.Name``.

        Each entry contributes one ``x509.NameAttribute`` made of the entry's
        object identifier (as text from ``self._obj2txt``) and its value
        decoded from the UTF-8 bytes produced by ``ASN1_STRING_to_UTF8``.
        """
        backend = self._backend

        def release(holder):
            # Finalizer: frees the buffer the holder points at.
            backend._lib.OPENSSL_free(holder[0])

        collected = []
        total = backend._lib.X509_NAME_entry_count(x509_name)
        for position in range(total):
            entry = backend._lib.X509_NAME_get_entry(x509_name, position)
            oid_obj = backend._lib.X509_NAME_ENTRY_get_object(entry)
            assert oid_obj != backend._ffi.NULL
            raw_value = backend._lib.X509_NAME_ENTRY_get_data(entry)
            assert raw_value != backend._ffi.NULL

            holder = backend._ffi.new("unsigned char **")
            size = backend._lib.ASN1_STRING_to_UTF8(holder, raw_value)
            assert size >= 0
            assert holder[0] != backend._ffi.NULL
            holder = backend._ffi.gc(holder, release)

            utf8_bytes = backend._ffi.buffer(holder[0], size)[:]
            value = utf8_bytes.decode('utf8')
            oid = x509.ObjectIdentifier(self._obj2txt(oid_obj))
            collected.append(x509.NameAttribute(oid, value))

        return x509.Name(collected)
项目:barbican-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _create_self_signed_certificate(self, private_key):
        """Create a self-signed test certificate for ``private_key``.

        Subject and issuer are the same fixed name, the serial number is
        random, and the validity runs from the current UTC time for ten
        days. The certificate is signed with ``private_key`` using SHA-256.
        """
        name_fields = (
            (NameOID.COUNTRY_NAME, u"US"),
            (NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
            (NameOID.LOCALITY_NAME, u"San Francisco"),
            (NameOID.ORGANIZATION_NAME, u"My Company"),
            (NameOID.COMMON_NAME, u"Test Certificate"),
        )
        issuer = x509.Name(
            [x509.NameAttribute(oid, text) for oid, text in name_fields]
        )

        public_key = private_key.public_key()
        serial = x509.random_serial_number()
        valid_from = datetime.utcnow()
        valid_until = datetime.utcnow() + timedelta(days=10)

        # The builder is populated through its constructor keywords.
        cert_builder = x509.CertificateBuilder(
            issuer_name=issuer,
            subject_name=issuer,
            public_key=public_key,
            serial_number=serial,
            not_valid_before=valid_from,
            not_valid_after=valid_until,
        )
        return cert_builder.sign(
            private_key, hashes.SHA256(), default_backend())
项目:concorde    作者:frutiger    | 项目源码 | 文件源码
def _check_or_add_cert(self, name, domain, key, authorization):
        """Return the certificate stored for ``domain``, requesting it if absent.

        When ``domain`` has no ``'certificate'`` entry, a CSR with common
        name ``name`` is signed with ``key``, a certificate is requested from
        ``self._client`` using ``self._key``, the result is stored under
        ``domain['certificate']`` and the configuration is written out.
        ``authorization`` is not used by this method.
        """
        if 'certificate' in domain:
            return domain['certificate']

        self._log('domain:{}: generating CSR...', name)
        subject = x509.Name([
            x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
        ])
        request_builder = x509.CertificateSigningRequestBuilder()
        request_builder = request_builder.subject_name(subject)
        csr = request_builder.sign(key, hashes.SHA256(), backend)
        self._log('domain:{}: done', name)

        self._log('domain:{}: requesting certificate...', name)
        issued = self._client.new_certificate(self._key, csr)
        domain['certificate'] = issued
        # Persist the new certificate before reporting success.
        self._write_config()
        self._log('domain:{}: done: {}', name, issued)

        return issued
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def generate_wildcard_pem_bytes():
    """
    Generate a wildcard (subject name '*') self-signed certificate valid for
    10 years.

    The validity starts one day in the past. The result is the unencrypted
    private key in PEM form immediately followed by the PEM certificate.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate

    :return: Bytes representation of the PEM certificate data
    """
    key = generate_private_key(u'rsa')
    wildcard = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'*')])

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(wildcard)
    builder = builder.subject_name(wildcard)
    builder = builder.not_valid_before(datetime.today() - timedelta(days=1))
    builder = builder.not_valid_after(datetime.now() + timedelta(days=3650))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(key.public_key())
    cert = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)

    # Key first, certificate second.
    return key_pem + cert_pem
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def generate_csr(self, private_key, subject_name, extensions=None):
        """Generate certificate signing request.

        The request is signed with ``private_key`` using
        ``self.sign_hash_algorithm``.

        Args:
            private_key: Private key
            subject_name (x509.Name): Subject name
            extensions: Extensions handed to the request builder; an empty
                list is used when this is None
        Returns: x509.CertificateSigningRequest

        """
        if extensions is None:
            extensions = []

        request_builder = x509.CertificateSigningRequestBuilder(
            subject_name, extensions)
        signed_request = request_builder.sign(
            private_key, self.sign_hash_algorithm, default_backend())
        return signed_request
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def test_ecies_generate_csr(self):
        """Test case for generate certificate signing request."""
        crypto = ecies()
        key = crypto.generate_private_key()
        subject = x509.Name(
            [x509.NameAttribute(NameOID.COMMON_NAME, u"test")])

        request = crypto.generate_csr(key, subject)

        # The PEM output must open with the CSR armor line.
        pem = request.public_bytes(Encoding.PEM)
        expected_prefix = b"-----BEGIN CERTIFICATE REQUEST-----"
        self.assertTrue(pem.startswith(expected_prefix))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Build an ``x509.Name`` from every entry of a native ``X509_NAME``.

    Entries are fetched by index through ``backend._lib`` and converted with
    ``_decode_x509_name_entry``; their order is preserved.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(
            backend, lib.X509_NAME_get_entry(x509_name, idx))
        for idx in range(total)
    ])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Decode all entries of a native ``X509_NAME`` into an ``x509.Name``.

    The attribute order of the result follows the entry index order.
    """
    def entry_at(position):
        # Fetch one raw entry by its index.
        return backend._lib.X509_NAME_get_entry(x509_name, position)

    entry_count = backend._lib.X509_NAME_entry_count(x509_name)
    decoded = [
        _decode_x509_name_entry(backend, entry_at(position))
        for position in range(entry_count)
    ]
    return x509.Name(decoded)
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def generate_self_signed_cert(cert_file, key_file):
    # type: (Any, Any) -> None
    """Given two file-like objects, generate an SSL key and certificate

    A new 2048-bit RSA key is created and used to sign a non-CA certificate
    whose subject and issuer are both ``cryptography.io``. Validity starts
    one day in the past and lasts 3650 days from today. The private key is
    written unencrypted.

    Args:
        cert_file:  The certificate file you wish to write to
        key_file:   The key file you wish to write to
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend())

    # Subject and issuer carry the same single common name.
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'cryptography.io'),
    ])

    certificate = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(datetime.today() - timedelta(days=1))
        .not_valid_after(datetime.today() + timedelta(days=365 * 10))
        .serial_number(uuid4().int)
        .public_key(private_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=True)
        .sign(
            private_key=private_key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )

    key_pem = private_key.private_bytes(
        Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption())
    key_file.write(key_pem)
    cert_file.write(certificate.public_bytes(Encoding.PEM))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Convert a native ``X509_NAME`` handle into an ``x509.Name``.

    Each entry is looked up by index and decoded with
    ``_decode_x509_name_entry``; the results are kept in index order.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)

    decoded = []
    index = 0
    while index < total:
        raw_entry = lib.X509_NAME_get_entry(x509_name, index)
        decoded.append(_decode_x509_name_entry(backend, raw_entry))
        index += 1

    return x509.Name(decoded)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Build an ``x509.Name`` from every entry of a native ``X509_NAME``.

    Entries are fetched by index through ``backend._lib`` and converted with
    ``_decode_x509_name_entry``; their order is preserved.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(
            backend, lib.X509_NAME_get_entry(x509_name, idx))
        for idx in range(total)
    ])
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Decode all entries of a native ``X509_NAME`` into an ``x509.Name``.

    The attribute order of the result follows the entry index order.
    """
    def entry_at(position):
        # Fetch one raw entry by its index.
        return backend._lib.X509_NAME_get_entry(x509_name, position)

    entry_count = backend._lib.X509_NAME_entry_count(x509_name)
    decoded = [
        _decode_x509_name_entry(backend, entry_at(position))
        for position in range(entry_count)
    ]
    return x509.Name(decoded)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Convert a native ``X509_NAME`` handle into an ``x509.Name``.

    Each entry is looked up by index and decoded with
    ``_decode_x509_name_entry``; the results are kept in index order.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)

    decoded = []
    index = 0
    while index < total:
        raw_entry = lib.X509_NAME_get_entry(x509_name, index)
        decoded.append(_decode_x509_name_entry(backend, raw_entry))
        index += 1

    return x509.Name(decoded)
项目:django-autocert    作者:farrepa    | 项目源码 | 文件源码
def set_csr_if_blank(self):
        """Create a PEM CSR and store it on ``self.csr`` unless one is set.

        The subject combines ``self.get_common_name()`` with the country,
        state, locality, organization and organizational unit taken from
        ``self.account``. The entries from ``self.get_san_entries()`` are
        added as a non-critical subject alternative name extension, and the
        request is signed with the key from ``self.get_key()`` using SHA-256.
        """
        if self.csr:
            return

        private_key = self.get_key()

        subject = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, self.get_common_name()),
            x509.NameAttribute(
                NameOID.COUNTRY_NAME,
                u'{}'.format(self.account.country)),
            x509.NameAttribute(
                NameOID.STATE_OR_PROVINCE_NAME,
                u'{}'.format(self.account.state)),
            x509.NameAttribute(
                NameOID.LOCALITY_NAME,
                u'{}'.format(self.account.locality)),
            x509.NameAttribute(
                NameOID.ORGANIZATION_NAME,
                u'{}'.format(self.account.organization_name)),
            x509.NameAttribute(
                NameOID.ORGANIZATIONAL_UNIT_NAME,
                u'{}'.format(self.account.organizational_unit_name)),
        ])
        alt_names = x509.SubjectAlternativeName(self.get_san_entries())

        request = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(subject)
            .add_extension(alt_names, critical=False)
            .sign(private_key, hashes.SHA256(), default_backend())
        )
        self.csr = request.public_bytes(serialization.Encoding.PEM)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def ca_file(tmpdir):
    """
    Create a valid PEM file with CA certificates and return the path.

    A new 2048-bit RSA key signs a CA certificate for ``pyopenssl.org`` that
    is valid from one day ago until one day from now. The certificate is
    written to ``test.pem`` inside ``tmpdir`` and the path is returned as
    ASCII-encoded bytes.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    # Subject and issuer carry the same single common name.
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
    ])
    one_day = datetime.timedelta(days=1)

    certificate = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(datetime.datetime.today() - one_day)
        .not_valid_after(datetime.datetime.today() + one_day)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )
        .sign(
            private_key=key, algorithm=hashes.SHA256(),
            backend=default_backend()
        )
    )

    pem_path = tmpdir.join("test.pem")
    pem_bytes = certificate.public_bytes(encoding=serialization.Encoding.PEM)
    pem_path.write_binary(pem_bytes)

    return str(pem_path).encode("ascii")
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def create_ca_certificate(cn, key_size=4096, certify_days=365):
    """Create a self-signed CA certificate together with a new RSA key.

    The certificate uses ``cn`` as common name for subject and issuer, is
    valid from the current UTC time for ``certify_days`` days, and carries
    subject/authority key identifiers, CA basic constraints with a path
    length of 0, and a key usage limited to digital signature, certificate
    signing and CRL signing.

    Returns a ``(cert, key)`` tuple of PEM bytes; the key is unencrypted.
    """
    key = rsa.generate_private_key(
        public_exponent=65537, key_size=key_size, backend=default_backend())
    key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key())

    # Self-signed: one name serves as both subject and issuer.
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])

    issued_at = datetime.datetime.utcnow()
    serial = x509.random_serial_number()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(serial)
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(
        issued_at + datetime.timedelta(days=certify_days))
    builder = builder.add_extension(key_id, critical=False)
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier(
            key_id.digest, [x509.DirectoryName(name)], serial),
        critical=False)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True)
    builder = builder.add_extension(
        x509.KeyUsage(
            digital_signature=True,
            content_commitment=False,
            key_encipherment=False,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=True,
            crl_sign=True,
            encipher_only=False,
            decipher_only=False),
        critical=True)
    certificate = builder.sign(key, hashes.SHA256(), default_backend())

    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    return cert_pem, key_pem
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Build an ``x509.Name`` from every entry of a native ``X509_NAME``.

    Entries are fetched by index through ``backend._lib`` and converted with
    ``_decode_x509_name_entry``; their order is preserved.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(
            backend, lib.X509_NAME_get_entry(x509_name, idx))
        for idx in range(total)
    ])
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def dict_to_x509_name(data):
    """Build an ``x509.Name`` from a mapping of attribute names to values.

    Supported keys are ``commonName``, ``countryName``,
    ``stateOrProvinceName``, ``locality`` (also accepted under the name
    ``localityName``, which matches the naming of the other keys),
    ``organizationName`` and ``organizationalUnitName``. The attributes
    appear in the iteration order of ``data``.

    Args:
        data: mapping of attribute name to attribute value.

    Returns:
        x509.Name containing one attribute per item of ``data``.

    Raises:
        ValueError: if ``data`` contains a key that is not supported.
    """
    attr_name_oid = {
        'commonName': x509.NameOID.COMMON_NAME,
        'countryName': x509.NameOID.COUNTRY_NAME,
        'stateOrProvinceName': x509.NameOID.STATE_OR_PROVINCE_NAME,
        'locality': x509.NameOID.LOCALITY_NAME,
        # Alias consistent with the other "...Name" keys; 'locality' is kept
        # for backward compatibility.
        'localityName': x509.NameOID.LOCALITY_NAME,
        'organizationName': x509.NameOID.ORGANIZATION_NAME,
        'organizationalUnitName': x509.NameOID.ORGANIZATIONAL_UNIT_NAME,
    }
    name_attributes = []
    for key, value in data.items():
        if key not in attr_name_oid:
            raise ValueError('{} is not a supported x509 name attribute'.format(key))
        name_attributes.append(x509.NameAttribute(attr_name_oid[key], value))
    return x509.Name(name_attributes)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Decode all entries of a native ``X509_NAME`` into an ``x509.Name``.

    The attribute order of the result follows the entry index order.
    """
    def entry_at(position):
        # Fetch one raw entry by its index.
        return backend._lib.X509_NAME_get_entry(x509_name, position)

    entry_count = backend._lib.X509_NAME_entry_count(x509_name)
    decoded = [
        _decode_x509_name_entry(backend, entry_at(position))
        for position in range(entry_count)
    ]
    return x509.Name(decoded)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Convert a native ``X509_NAME`` handle into an ``x509.Name``.

    Each entry is looked up by index and decoded with
    ``_decode_x509_name_entry``; the results are kept in index order.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)

    decoded = []
    index = 0
    while index < total:
        raw_entry = lib.X509_NAME_get_entry(x509_name, index)
        decoded.append(_decode_x509_name_entry(backend, raw_entry))
        index += 1

    return x509.Name(decoded)
项目:RKSV    作者:ztp-at    | 项目源码 | 文件源码
def makeSignedCert(cpub, ccn, cvdays, cserial, spriv, scert=None):
    """
    Create a certificate for a public key and sign it with a private key.

    With a signing certificate, the new subject is the signer's subject with
    its common name attributes removed and ``ccn`` appended as the common
    name, and the issuer is the signer's subject. Without one, subject and
    issuer both consist of the common name ``ccn`` only. The certificate is
    valid from now for ``cvdays`` days and is always marked as a CA.
    :param cpub: Public key for which to create a certificate.
    :param ccn: Common name for the new certificate.
    :param cvdays: Number of days the new certificate is valid.
    :param cserial: The serial number for the new certificate as an int.
    :param spriv: Private key for the signing certificate.
    :param scert: Certificate used to sign the new certificate, or None if
    no certificate is used.
    :return: The new certificate as an object.
    """
    if scert:
        subject_attributes = [
            attr for attr in scert.subject
            if attr.oid != NameOID.COMMON_NAME
        ]
        subject_attributes.append(
            x509.NameAttribute(NameOID.COMMON_NAME, ccn))
        sname = x509.Name(subject_attributes)
        iname = scert.subject
    else:
        # Self-issued: the same common name on both sides.
        sname = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ccn)])
        iname = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ccn)])

    certificate = (
        x509.CertificateBuilder()
        .subject_name(sname)
        .issuer_name(iname)
        .not_valid_before(datetime.datetime.today())
        .not_valid_after(
            datetime.datetime.today() + datetime.timedelta(days=cvdays))
        .serial_number(cserial)
        .public_key(cpub)
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True)
        .sign(private_key=spriv, algorithm=hashes.SHA256(),
              backend=default_backend())
    )
    return certificate
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _create_csr(common_name, key, oids=None, sans=None):
    """Build a CSR for ``common_name`` and sign it with ``key`` (SHA-256).

    The subject attributes come from ``_create_oids(common_name, oids)``,
    with an empty dict substituted when ``oids`` is falsy. When ``sans`` is
    truthy, ``_add_sans`` is called with the request builder and ``sans``.
    """
    # Logged first so that locals() contains exactly the four arguments.
    app.logger.info('called create_csr:\n{0}'.format(pformat(locals())))
    request = x509.CertificateSigningRequestBuilder()
    name_attributes = _create_oids(common_name, oids or {})
    request = request.subject_name(x509.Name(name_attributes))
    if sans:
        # NOTE(review): the return value of _add_sans is discarded; confirm
        # the helper does not rely on returning a new builder.
        _add_sans(request, sans)
    return request.sign(key, hashes.SHA256(), default_backend())
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _create_csr(common_name, key, oids=None, sans=None):
    """Create and sign a certificate signing request.

    ``_create_oids`` turns ``common_name`` and ``oids`` (an empty dict when
    falsy) into the subject attributes. If ``sans`` is truthy it is handed
    to ``_add_sans`` together with the builder. The request is signed with
    ``key`` using SHA-256.
    """
    # Must stay the first statement: locals() is part of the log message.
    app.logger.info('called create_csr:\n{0}'.format(pformat(locals())))
    empty_builder = x509.CertificateSigningRequestBuilder()
    subject_name = x509.Name(_create_oids(common_name, oids or {}))
    csr_builder = empty_builder.subject_name(subject_name)
    if sans:
        # NOTE(review): whatever _add_sans returns is ignored here; verify
        # that the helper is not expected to hand back a new builder.
        _add_sans(csr_builder, sans)
    signed_request = csr_builder.sign(key, hashes.SHA256(), default_backend())
    return signed_request
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _create_csr(common_name, key, oids=None, sans=None):
    """Build a CSR for ``common_name`` and sign it with ``key`` (SHA-256).

    The subject attributes come from ``_create_oids(common_name, oids)``,
    with an empty dict substituted when ``oids`` is falsy. When ``sans`` is
    truthy, ``_add_sans`` is called with the request builder and ``sans``.
    """
    # Logged first so that locals() contains exactly the four arguments.
    app.logger.info('called create_csr:\n{0}'.format(pformat(locals())))
    request = x509.CertificateSigningRequestBuilder()
    name_attributes = _create_oids(common_name, oids or {})
    request = request.subject_name(x509.Name(name_attributes))
    if sans:
        # NOTE(review): the return value of _add_sans is discarded; confirm
        # the helper does not rely on returning a new builder.
        _add_sans(request, sans)
    return request.sign(key, hashes.SHA256(), default_backend())
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _create_csr(common_name, key, oids=None, sans=None):
    """Create and sign a certificate signing request.

    ``_create_oids`` turns ``common_name`` and ``oids`` (an empty dict when
    falsy) into the subject attributes. If ``sans`` is truthy it is handed
    to ``_add_sans`` together with the builder. The request is signed with
    ``key`` using SHA-256.
    """
    # Must stay the first statement: locals() is part of the log message.
    app.logger.info('called create_csr:\n{0}'.format(pformat(locals())))
    empty_builder = x509.CertificateSigningRequestBuilder()
    subject_name = x509.Name(_create_oids(common_name, oids or {}))
    csr_builder = empty_builder.subject_name(subject_name)
    if sans:
        # NOTE(review): whatever _add_sans returns is ignored here; verify
        # that the helper is not expected to hand back a new builder.
        _add_sans(csr_builder, sans)
    signed_request = csr_builder.sign(key, hashes.SHA256(), default_backend())
    return signed_request
项目:lokey    作者:jpf    | 项目源码 | 文件源码
def serialize(self,
                  country=u"US",
                  state=u"CA",
                  city=u"San Francisco",
                  company=u"Lokey Examle",
                  common_name=u"example.com"):
        """Return a PEM-encoded certificate signing request for this key.

        The private key is loaded from ``self.to('pem')`` without a password
        and signs (SHA-256) a request whose subject is composed of the given
        country, state, city (locality), company (organization) and common
        name, in that order.
        """
        private_key = serialization.load_pem_private_key(
            self.to('pem'),
            password=None,
            backend=default_backend())

        subject_fields = (
            (NameOID.COUNTRY_NAME, country),
            (NameOID.STATE_OR_PROVINCE_NAME, state),
            (NameOID.LOCALITY_NAME, city),
            (NameOID.ORGANIZATION_NAME, company),
            (NameOID.COMMON_NAME, common_name),
        )
        subject = x509.Name(
            [x509.NameAttribute(oid, text) for oid, text in subject_fields]
        )

        request_builder = x509.CertificateSigningRequestBuilder()
        request_builder = request_builder.subject_name(subject)
        csr = request_builder.sign(
            private_key, hashes.SHA256(), default_backend())
        return csr.public_bytes(serialization.Encoding.PEM)
项目:trustme    作者:python-trio    | 项目源码 | 文件源码
def _name(name):
    """Return an ``x509.Name`` for ``name`` stamped with the package version.

    The organization is ``trustme v<__version__>`` and ``name`` is used as
    the organizational unit.
    """
    organization = x509.NameAttribute(
        NameOID.ORGANIZATION_NAME, u"trustme v{}".format(__version__))
    unit = x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, name)
    return x509.Name([organization, unit])
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Build an ``x509.Name`` from every entry of a native ``X509_NAME``.

    Entries are fetched by index through ``backend._lib`` and converted with
    ``_decode_x509_name_entry``; their order is preserved.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(
            backend, lib.X509_NAME_get_entry(x509_name, idx))
        for idx in range(total)
    ])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def testInspectDistinguishedNameWithoutAllFields(self):
        """
        The inspect output of a DN that only has a locality name mentions
        the locality name (in lower case and in title case) and none of the
        other field labels.
        """
        inspected = sslverify.DN(localityName=b'locality name').inspect()

        absent_labels = (
            'common name',
            'organization name',
            'organizational unit name',
            'state or province name',
            'country name',
            'email address',
        )
        for label in absent_labels:
            message = "%r was in inspect output." % (label,)
            self.assertNotIn(label, inspected, message)
            self.assertNotIn(label.title(), inspected, message)

        self.assertIn('locality name', inspected)
        self.assertIn('Locality Name', inspected)
项目:aapns    作者:HDE    | 项目源码 | 文件源码
def gen_certificate(key: rsa.RSAPrivateKey,
                    common_name: str,
                    *,
                    issuer: Optional[str]=None,
                    sign_key: Optional[rsa.RSAPrivateKey]=None) -> x509.Certificate:
    """Create a CA certificate for ``key`` that is valid for 86400 seconds.

    The subject is the common name ``common_name``. The issuer common name
    is ``issuer`` when given, otherwise ``common_name``. The certificate is
    signed with ``sign_key`` when given, otherwise with ``key`` itself. It
    has a random serial number and CA basic constraints with path length 0.
    """
    def common_name_only(value):
        # A distinguished name holding a single common name attribute.
        return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, value)])

    issued_at = datetime.datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(common_name_only(common_name))
    builder = builder.issuer_name(common_name_only(issuer or common_name))
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(
        issued_at + datetime.timedelta(seconds=86400))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True)

    return builder.sign(
        private_key=sign_key or key,
        algorithm=hashes.SHA256(),
        backend=BACKEND
    )
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def MakeCASignedCert(common_name,
                     private_key,
                     ca_cert,
                     ca_private_key,
                     serial_number=2,
                     session=None):
    """Make a cert and sign it with the CA's private key.

    The certificate is a non-CA certificate valid from one day before the
    call until roughly ten years (3650 days) after it.

    Args:
      common_name: Common name used as the only subject attribute.
      private_key: Key wrapper; ``public_key().get_raw_key()`` is the key
        being certified.
      ca_cert: CA certificate wrapper; ``get_issuer()`` supplies the issuer
        name of the new certificate.
      ca_private_key: CA key wrapper; ``get_raw_key()`` is used for signing.
      serial_number: Serial number for the new certificate.
      session: Passed through to ``X509Ceritifcate``.

    Returns:
      The result of ``X509Ceritifcate(session=session).from_raw_key(...)``
      applied to the signed certificate.
    """
    public_key = private_key.public_key()
    builder = x509.CertificateBuilder()

    # NOTE(review): this takes the CA certificate's *issuer*; that equals its
    # subject only for a self-signed CA - confirm this is intended.
    builder = builder.issuer_name(ca_cert.get_issuer())

    subject = x509.Name([
        x509.NameAttribute(oid.NameOID.COMMON_NAME, common_name)
    ])
    builder = builder.subject_name(subject)

    # cryptography interprets naive datetimes as UTC, so the bounds must be
    # built with utcfromtimestamp(); fromtimestamp() yields local time and
    # would shift the validity window by the host's UTC offset.  Sample the
    # clock once so both bounds share the same reference instant.
    now = time.time()
    valid_from = datetime.datetime.utcfromtimestamp(now - 60 * 60 * 24)
    valid_until = datetime.datetime.utcfromtimestamp(
        now + 60 * 60 * 24 * 365 * 10)
    builder = builder.not_valid_before(valid_from)
    builder = builder.not_valid_after(valid_until)

    builder = builder.serial_number(serial_number)
    builder = builder.public_key(public_key.get_raw_key())

    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True)
    certificate = builder.sign(
        private_key=ca_private_key.get_raw_key(),
        algorithm=hashes.SHA256(),
        backend=openssl.backend)

    return X509Ceritifcate(session=session).from_raw_key(certificate)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Convert a backend X509_NAME handle into an ``x509.Name``.

    Every entry of ``x509_name`` is fetched by index, in order, through
    ``backend._lib`` and decoded with ``_decode_x509_name_entry``.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(
            backend, lib.X509_NAME_get_entry(x509_name, idx))
        for idx in range(total)
    ])
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def _generate_ca_cert(self):
        """
        Populate the CA state on ``self``.

        Generates an RSA key unless ``self._ca_key`` is already set, then
        stores the CA name, a self-issued CA certificate (valid from one hour
        before ``self._now()`` to 3650 days after it) and the matching
        authority key identifier.
        """
        if self._ca_key is None:
            self._ca_key = generate_private_key(u'rsa')

        common_name = x509.NameAttribute(
            NameOID.COMMON_NAME, u'ACME Snake Oil CA')
        self._ca_name = x509.Name([common_name])

        # Subject and issuer are the same name: the CA signs itself.
        builder = x509.CertificateBuilder()
        builder = builder.subject_name(self._ca_name)
        builder = builder.issuer_name(self._ca_name)
        builder = builder.not_valid_before(
            self._now() - timedelta(seconds=3600))
        builder = builder.not_valid_after(
            self._now() + timedelta(days=3650))
        builder = builder.public_key(self._ca_key.public_key())
        builder = builder.serial_number(int(uuid4()))
        builder = builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=0),
            critical=True)
        builder = builder.add_extension(
            x509.SubjectKeyIdentifier.from_public_key(
                self._ca_key.public_key()),
            critical=False)

        self._ca_cert = builder.sign(
            private_key=self._ca_key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
        self._ca_aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(
            self._ca_key.public_key())
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def test_common_name_too_long(self):
        """
        When the first requested name is an 80-character string,
        `~txacme.util.csr_for_names` puts the placeholder
        ``san.too.long.invalid`` in the subject common name instead.
        """
        overlong_name = u'aaaa.' * 16
        csr = csr_for_names([overlong_name], RSA_KEY_512_RAW)
        placeholder_subject = x509.Name([
            x509.NameAttribute(
                NameOID.COMMON_NAME, u'san.too.long.invalid'),
        ])
        self.assertThat(
            csr,
            MatchesStructure(subject=Equals(placeholder_subject)))
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Generate a certificate/key pair for responding to a tls-sni-01 challenge.

    The certificate is self-issued with the common name ``acme.invalid``,
    carries ``server_name`` as its only subjectAltName, and is valid from one
    hour before to one hour after the (UTC) time of the call.

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: Optional replacement for
        `generate_private_key`; called with ``key_type``.

    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    key = (_generate_private_key or generate_private_key)(key_type)
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])
    # cryptography interprets naive datetimes as UTC.  Using local time here
    # would shift the two-hour window by the host's UTC offset, which can
    # yield a certificate that is not yet valid (or already expired).
    # Sample the clock once so both bounds share the same reference instant.
    now = datetime.utcnow()
    margin = timedelta(seconds=3600)
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(now - margin)
        .not_valid_after(now + margin)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(server_name)]),
            critical=False)
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
        )
    return (cert, key)
项目:txacme    作者:twisted    | 项目源码 | 文件源码
def csr_for_names(names, key):
    """
    Build and sign a certificate signing request covering ``names``.

    The first name becomes the subject common name unless it is longer than
    64 characters, in which case ``san.too.long.invalid`` is used instead.
    All names are listed in a non-critical subjectAltName extension.

    ..  seealso:: `acme.client.Client.request_issuance`

    ..  seealso:: `generate_private_key`

    :param names: ``List[str]`` of one or more names (subjectAltName) for
        which to request a certificate.
    :param key: A Cryptography private key object.

    :raises ValueError: If ``names`` is empty.

    :rtype: `cryptography.x509.CertificateSigningRequest`
    :return: The certificate request message.
    """
    if not len(names):
        raise ValueError('Must have at least one name')

    first = names[0]
    common_name = first if len(first) <= 64 else u'san.too.long.invalid'

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(name) for name in names])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    return builder.sign(key, hashes.SHA256(), default_backend())
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def _decode_x509_name(backend, x509_name):
    """Convert a backend X509_NAME handle into an ``x509.Name``.

    Every entry of ``x509_name`` is fetched by index, in order, through
    ``backend._lib`` and decoded with ``_decode_x509_name_entry``.
    """
    lib = backend._lib
    total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(
            backend, lib.X509_NAME_get_entry(x509_name, idx))
        for idx in range(total)
    ])
项目:IoT_pki    作者:zibawa    | 项目源码 | 文件源码
def build_crl():
    """Build a signed CRL for the newest CA and return it PEM-encoded.

    The CRL is issued under the CA's common name, is dated with the current
    UTC time, and announces its next update one day later.  It lists every
    ``Certificate`` row marked ``revoked`` whose ``issuer_serial_number``
    matches the CA's serial number, and is signed with the key loaded from
    ``keyStorePath(ca.serial_number)``.

    Returns:
        The PEM serialization produced by ``crl.public_bytes``.
    """
    ca = get_newest_ca()

    # cryptography interprets naive datetimes as UTC, so use UTC rather than
    # local time, and sample the clock once so every timestamp in the CRL
    # refers to the same instant.
    now = datetime.datetime.utcnow()
    one_day = datetime.timedelta(days=1)

    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, ca.common_name),
    ]))
    builder = builder.last_update(now)
    builder = builder.next_update(now + one_day)

    revoked_list = Certificate.objects.filter(
        issuer_serial_number=ca.serial_number, revoked=True)

    for cert_record in revoked_list:
        logger.debug("revoked serial_number: %s", cert_record.serial_number)
        # NOTE(review): the revocation date recorded is the CRL build time,
        # not the moment the certificate was revoked - confirm whether the
        # model stores a real revocation timestamp that should be used.
        revoked_entry = (
            x509.RevokedCertificateBuilder()
            .serial_number(int(cert_record.serial_number))
            .revocation_date(now)
            .build(default_backend())
        )
        builder = builder.add_revoked_certificate(revoked_entry)

    crl = builder.sign(
        private_key=loadPEMKey(keyStorePath(ca.serial_number)),
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )

    return crl.public_bytes(serialization.Encoding.PEM)
项目:perkele    作者:schors    | 项目源码 | 文件源码
def create_csr(key, domains):
    """
    Build a CSR for ``domains`` signed with ``key``.

    The first domain is the subject common name; all domains go into a
    non-critical subjectAltName extension.  The signed request is returned
    via ``export_csr_for_acme`` (DER, per the original description of this
    function - the helper itself is defined elsewhere).
    """
    assert domains
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        list(map(x509.DNSName, domains)))

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(request)