Python ldap 模块,OPT_X_TLS_DEMAND 实例源码

我们从 Python 开源项目中提取了以下 3 个代码示例,用于说明如何使用 ldap.OPT_X_TLS_DEMAND。

项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def initiate_ldap():
    """
    Contact the LDAP server and return a LDAP object.

    Process-wide TLS options (CA directory, client certificate and key,
    mandatory server-certificate validation) are set from the ``ldap``
    section of ``config``. The server is then tried with each scheme in
    turn (``ldap://`` first, then ``ldaps://``), requesting StartTLS on
    every attempt.

    Relies on the names ``config``, ``server_url``, ``login_dn`` and
    ``password`` being defined in the enclosing module.

    :returns: the LDAP object for the first scheme that could be reached.
    :raises ldap.SERVER_DOWN: if the server is unreachable with the last scheme.
    :raises ldap.OPERATIONS_ERROR: if StartTLS fails for any reason other
        than 'TLS already started'.
    :raises SystemExit: if an anonymous bind is requested but the server
        refuses it.
    """
    ldap_schemes = ['ldap://', 'ldaps://']
    ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
    ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
    ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
    # OPT_X_TLS_DEMAND is a *value* for OPT_X_TLS_REQUIRE_CERT, not an option
    # name, so it must never be used as the first argument of set_option().
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)  # TRY, NEVER, DEMAND
    # Must come after the other TLS options so that they are applied.
    ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    for scheme in ldap_schemes:
        ldap_obj = ldap.initialize(scheme + server_url)
        try:
            ldap_obj.start_tls_s()
        except ldap.OPERATIONS_ERROR as e:
            # e.args[0] is the error dictionary on both Python 2 and 3.
            details = e.args[0] if e.args else None
            info = details.get('info') if isinstance(details, dict) else None
            if info != 'TLS already started':
                raise
        except ldap.SERVER_DOWN:
            # Fall through to the next scheme; give up only after the last one.
            if scheme != ldap_schemes[-1]:
                continue
            raise

        if login_dn != 'DEFAULT':  # Use anonymous bind if login_dn is set as DEFAULT
            # NOTE(review): bind() is the asynchronous call, so its outcome is
            # not checked here - confirm whether bind_s() was intended.
            ldap_obj.bind(login_dn, password, ldap.AUTH_SIMPLE)
        else:
            try:
                ldap_obj.whoami_s()
            except ldap.UNWILLING_TO_PERFORM:
                print('Anonymous binding is disabled by server')
                raise SystemExit
        return ldap_obj
项目:pyldap_orm    作者:asyd    | 项目源码 | 文件源码
def __init__(self, backend, mode=PLAIN,
             cert=None,
             key=None,
             cacertdir='/etc/ssl/certs',
             ):
    """
    Create the LDAP session and open the connection handle.

    :param backend: LDAP URI passed to ``ldap.initialize``. A URI starting
        with ``ldaps`` (case-insensitive) forces LDAPS mode.
    :param mode: one of the class constants ``PLAIN``, ``STARTTLS`` or
        ``LDAPS``.
    :param cert: path to a client certificate file, or None.
    :param key: path to the client certificate key file, or None.
    :param cacertdir: directory of trusted CA certificates. ``None``
        disables server-certificate validation and emits a warning.
    :raises LDAPSessionException: if ``cert`` and ``key`` are both given
        but one of the files does not exist.
    """
    self.backend = backend
    self._server = None
    self._schema = {}
    self._cert = cert
    self._key = key

    logger.debug("LDAP _session created, id: {}".format(id(self)))

    # Switch to LDAPS mode if the backend URI starts with 'ldaps'
    if 'ldaps' == backend[:5].lower():
        mode = self.LDAPS

    # Set CACERTDIR and require certificate validation (TLS_DEMAND) when
    # an encrypted mode is used and a CA directory is available
    if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
        ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)

    # Without a CA directory the server certificate is never validated
    if cacertdir is None:
        warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    # Set client certificate if both cert and key are provided
    if cert is not None and key is not None:
        if not os.path.isfile(cert):
            raise LDAPSessionException("Certificate file {} does not exist".format(cert))
        if not os.path.isfile(key):
            # Report the missing key path (the original reported the cert path)
            raise LDAPSessionException("Certificate key file {} does not exist".format(key))
        ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
        ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)

    self._server = ldap.initialize(self.backend, bytes_mode=False)

    # Proceed STARTTLS
    if mode == self.STARTTLS:
        self._server.start_tls_s()
项目:django-adldap-sync    作者:marchete    | 项目源码 | 文件源码
def ldap_search(self, filter, attributes, incremental, incremental_filter):
    """
    Query the configured LDAP server with the provided search filter and
    attribute list.

    Each URI in ``self.conf_LDAP_SYNC_BIND_URI`` is tried in order; a URI
    whose bind fails is logged and skipped. The first URI that binds is
    searched and, if no working URI was recorded yet, remembered on
    ``self`` and moved to the front of the URI list.

    :param filter: search filter used for a full search.
    :param attributes: attribute list passed to the search as ``attrlist``.
    :param incremental: whether an incremental search is requested.
    :param incremental_filter: filter template for incremental searches;
        every '?' is replaced by the formatted ``self.whenchanged``.
    :returns: tuple ``(uri, results)`` with the URI that answered and the
        search results.
    :raises ldap.LDAPError: the last bind error when no URI could be
        bound, or a generic error when no URI is configured.
    """
    last_error = None
    for uri in self.conf_LDAP_SYNC_BIND_URI:
        # Read record of this uri
        if self.working_uri == uri:
            adldap_sync = self.working_adldap_sync
        else:
            adldap_sync, _ = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

        if adldap_sync.syncs_to_full > 0 and incremental:
            timestamp = self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT)
            filter_to_use = incremental_filter.replace('?', timestamp)
            logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
        else:
            filter_to_use = filter

        ldap.set_option(ldap.OPT_REFERRALS, 0)
        # ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
        conn = PagedLDAPObject(uri)
        conn.protocol_version = 3

        # OPT_X_TLS_DEMAND / OPT_X_TLS_NEVER are option *values*; they are
        # only ever passed as the second argument of set_option().
        if uri.startswith('ldaps:'):
            tls_level = ldap.OPT_X_TLS_DEMAND
        else:
            tls_level = ldap.OPT_X_TLS_NEVER
        conn.set_option(ldap.OPT_X_TLS, tls_level)
        conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, tls_level)

        try:
            conn.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
        except ldap.LDAPError as e:
            logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
            last_error = e
            continue

        # Always release the connection, even if the search itself fails.
        try:
            results = conn.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
        finally:
            conn.unbind_s()

        if self.working_uri is None:
            self.working_uri = uri
            self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
            self.working_adldap_sync = adldap_sync

        return (uri, results)  # Return both the LDAP server URI used and the request. This is for incremental sync purposes

    # No URI could be bound: surface the last bind error explicitly instead
    # of relying on a bare ``raise`` outside of an ``except`` block.
    if last_error is not None:
        raise last_error
    raise ldap.LDAPError("No LDAP server URI configured")