Python ldap 模块：set_option() 实例源码

我们从 Python 开源项目中提取了以下 8 个代码示例，用于说明如何使用 ldap.set_option()。

项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def _ldap_connection(self):
        """
        Context manager for ldap connections.

        Yields an LDAP connection to ``self.uri`` with protocol version 3 and
        referral chasing switched off.  StartTLS is negotiated when
        ``self.tls`` is set and the URI does not already start with
        ``ldaps``.  The connection is unbound once the caller is done with
        it (or if setup fails), so the underlying socket is not leaked.

        NOTE: when ``self.no_verify`` is set, certificate checking is
        disabled through the module-level ``ldap.set_option`` call, i.e. the
        setting is not scoped to the connection created here.
        """
        if self.no_verify:
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
                            ldap.OPT_X_TLS_NEVER)

        ldap_cxn = ldap.initialize('{0}'.format(self.uri))
        try:
            ldap_cxn.protocol_version = 3
            ldap_cxn.set_option(ldap.OPT_REFERRALS, 0)

            if self.tls and not self.uri.startswith('ldaps'):
                ldap_cxn.start_tls_s()

            yield ldap_cxn
        finally:
            try:
                ldap_cxn.unbind_s()
            except (ldap.LDAPError, AttributeError):
                # Best-effort cleanup: the caller may already have unbound
                # the connection, and a teardown failure must not mask an
                # exception raised inside the ``with`` body.
                pass
项目:isam-ansible-roles    作者:IBM-Security    | 项目源码 | 文件源码
def _connect_to_ldap(self):
        """
        Open a connection to ``self.server_uri``, bind and return it.

        StartTLS is attempted when ``self.start_tls`` is set.  A simple bind
        is used when ``self.bind_dn`` is given, otherwise a SASL EXTERNAL
        bind.  Any LDAP error during StartTLS or bind is reported through
        ``self.module.fail_json``.
        """
        # Certificate verification is turned off via the module-level option.
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
        conn = ldap.initialize(self.server_uri)

        if self.start_tls:
            try:
                conn.start_tls_s()
            except ldap.LDAPError:
                tls_error = get_exception()
                self.module.fail_json(msg="Cannot start TLS.",
                                      details=str(tls_error))

        try:
            if self.bind_dn is None:
                # No DN configured: authenticate with SASL EXTERNAL.
                conn.sasl_interactive_bind_s('', ldap.sasl.external())
            else:
                conn.simple_bind_s(self.bind_dn, self.bind_pw)
        except ldap.LDAPError:
            bind_error = get_exception()
            self.module.fail_json(msg="Cannot bind to the server.",
                                  details=str(bind_error))

        return conn
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def initiate_ldap():
    """
    Contact the LDAP server and return an LDAP object.

    TLS options are set at module level from the ``ldap`` section of
    ``config``.  The server is tried first as ``ldap://`` and then as
    ``ldaps://``; StartTLS is attempted on each.  A ``SERVER_DOWN`` on the
    first scheme falls through to the next one, on the last scheme it is
    re-raised.  If ``login_dn`` is ``'DEFAULT'`` an anonymous session is
    probed with ``whoami_s``; otherwise a simple bind is issued.

    :returns: the LDAP object for the first scheme that worked.
    :raises SystemExit: when the server refuses anonymous binding.
    """
    ldap_schemes = ['ldap://', 'ldaps://']
    ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
    ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
    ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
    # NOTE(review): OPT_X_TLS_DEMAND is documented as a *value* for
    # OPT_X_TLS_REQUIRE_CERT, not as an option name -- this call looks
    # unintended; confirm and drop.  Left in place to preserve behaviour.
    ldap.set_option(ldap.OPT_X_TLS_DEMAND, True)
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)  # TRY, NEVER, DEMAND
    # Set last, after the other TLS options above.
    ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    for scheme in ldap_schemes:
        ldap_obj = ldap.initialize(scheme + server_url)
        try:
            ldap_obj.start_tls_s()
        except ldap.OPERATIONS_ERROR as e:
            # ``e.args[0]`` works on both Python 2 and 3 (``e[0]`` does not).
            detail = e.args[0] if e.args else None
            info = detail.get('info') if isinstance(detail, dict) else None
            if info != 'TLS already started':
                raise
        except ldap.SERVER_DOWN:
            if scheme != ldap_schemes[-1]:
                continue  # try the next scheme
            raise

        if login_dn != 'DEFAULT':  # Use anonymous bind if login_dn is set as DEFAULT
            # NOTE(review): ``bind`` is the asynchronous variant, so a failed
            # bind is not reported here -- consider ``simple_bind_s``.
            ldap_obj.bind(login_dn, password, ldap.AUTH_SIMPLE)
        else:
            try:
                ldap_obj.whoami_s()
            except ldap.UNWILLING_TO_PERFORM:
                print('Anonymous binding is disabled by server')
                raise SystemExit
        return ldap_obj
项目:iris    作者:linkedin    | 项目源码 | 文件源码
def ldap_auth(self, username, password):
        """
        Check ``username``/``password`` with a simple bind against
        ``self.ldap_url``.

        The bind DN is ``username + self.user_suffix``.  An empty password
        is rejected up front without opening a connection, so it can never
        be sent as a bind request.  The connection is always released.

        :returns: ``True`` when the bind succeeds, ``False`` for an empty
            password or invalid credentials, ``None`` when the server
            cannot be reached.
        """
        ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.cert_path)

        if not password:
            return False

        connection = ldap.initialize(self.ldap_url)
        connection.set_option(ldap.OPT_REFERRALS, 0)

        try:
            connection.simple_bind_s(username + self.user_suffix, password)
        except ldap.INVALID_CREDENTIALS:
            return False
        except ldap.SERVER_DOWN:
            return None
        finally:
            try:
                connection.unbind_s()
            except ldap.LDAPError:
                # Best-effort cleanup; the authentication outcome above is
                # what the caller cares about.
                pass
        return True
项目:pyldap_orm    作者:asyd    | 项目源码 | 文件源码
def __init__(self, backend, mode=PLAIN,
                 cert=None,
                 key=None,
                 cacertdir='/etc/ssl/certs',
                 ):
        """
        Create an LDAP session for ``backend``.

        :param backend: LDAP URI; a URI starting with ``ldaps`` forces LDAPS
            mode regardless of ``mode``.
        :param mode: connection mode (``PLAIN``, ``STARTTLS`` or ``LDAPS``).
        :param cert: path to a client certificate file (optional).
        :param key: path to the client certificate key file (optional).
        :param cacertdir: directory holding CA certificates; ``None``
            disables certificate validation and emits a warning.
        :raises LDAPSessionException: if ``cert`` and ``key`` are both given
            and either file does not exist.
        """

        self.backend = backend
        self._server = None
        self._schema = {}
        self._cert = cert
        self._key = key

        logger.debug("LDAP _session created, id: {}".format(id(self)))

        # Switch to LDAPS mode if the backend URI starts with 'ldaps'
        if backend[:5].lower() == 'ldaps':
            mode = self.LDAPS

        # Set CACERTDIR and REQUIRE_CERT to TLS_DEMAND (validation required) if needed
        if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
            ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)

        if cacertdir is None:
            warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

        # Set client certificate if both cert and key are provided
        if cert is not None and key is not None:
            if not os.path.isfile(cert):
                raise LDAPSessionException("Certificate file {} does not exist".format(cert))
            if not os.path.isfile(key):
                # Report the key path here (the message previously showed the cert path).
                raise LDAPSessionException("Certificate key file {} does not exist".format(key))
            ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
            ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)

        self._server = ldap.initialize(self.backend, bytes_mode=False)

        # Proceed with STARTTLS
        if mode == self.STARTTLS:
            self._server.start_tls_s()
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def connect(self,
                uri=rbconfig.LDAP_URI,
                dn=rbconfig.LDAP_ROOT_DN,
                password=None,
                dcu_uri=rbconfig.LDAP_DCU_URI,
                dcu_dn=rbconfig.LDAP_DCU_RBDN,
                dcu_pw=None):
        """Connect to databases.

        Custom URI, DN and password may be given for RedBrick LDAP.
        Passwords that are not given are read from the first line of the
        shared secret files set in rbconfig.
        Custom URI, DN and password may be given for DCU LDAP.

        Sets ``self.ldap`` and ``self.ldap_dcu`` to the bound connections.
        Raises RBFatalError if a needed secret file cannot be read.
        """

        def read_secret(path, failure_message):
            """Return the first line of ``path`` with trailing whitespace removed."""
            try:
                # ``with`` closes the file even if reading fails.
                with open(path, 'r') as secret_file:
                    return secret_file.readline().rstrip()
            except IOError:
                raise RBFatalError(failure_message)

        if not password:
            password = read_secret(rbconfig.LDAP_ROOTPW_FILE,
                                   "Unable to open LDAP root password file")

        if not dcu_pw:
            dcu_pw = read_secret(rbconfig.LDAP_DCU_RBPW,
                                 "Unable to open DCU AD root password file")

        # Default protocol seems to be 2, set to 3.
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)

        # Connect to RedBrick LDAP.
        self.ldap = ldap.initialize(uri)
        self.ldap.simple_bind_s(dn, password)

        # Connect to DCU LDAP, binding as dcu_dn (not anonymously).
        self.ldap_dcu = ldap.initialize(dcu_uri)
        self.ldap_dcu.simple_bind_s(dcu_dn, dcu_pw)
项目:django-adldap-sync    作者:marchete    | 项目源码 | 文件源码
def ldap_search(self, filter, attributes, incremental, incremental_filter):
        """
        Query the configured LDAP server with the provided search filter and
        attribute list.

        Each URI in ``self.conf_LDAP_SYNC_BIND_URI`` is tried in order; the
        first one that accepts the bind is searched and returned.  When
        ``incremental`` is set and the URI's sync record still has
        ``syncs_to_full > 0``, ``incremental_filter`` is used with its ``?``
        placeholder replaced by the formatted ``self.whenchanged`` timestamp.

        :returns: ``(uri, results)`` -- the URI that answered and the paged
            search results.
        :raises ldap.LDAPError: the last bind error if no URI accepted the
            bind, or a generic error if no URI is configured at all.
        """
        last_error = None
        for uri in self.conf_LDAP_SYNC_BIND_URI:
            # Read record of this uri
            if self.working_uri == uri:
                adldap_sync = self.working_adldap_sync
            else:
                adldap_sync, _ = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

            if adldap_sync.syncs_to_full > 0 and incremental:
                filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT))
                logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
            else:
                filter_to_use = filter

            ldap.set_option(ldap.OPT_REFERRALS, 0)
            #ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
            connection = PagedLDAPObject(uri)
            connection.protocol_version = 3

            # NOTE(review): OPT_X_TLS_DEMAND is documented as a value for
            # OPT_X_TLS_REQUIRE_CERT rather than an option name; the calls
            # using it as an option are kept unchanged -- confirm intent.
            if uri.startswith('ldaps:'):
                connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
                connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
                connection.set_option(ldap.OPT_X_TLS_DEMAND, True)
            else:
                connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
                connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
                connection.set_option(ldap.OPT_X_TLS_DEMAND, False)

            try:
                connection.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
            except ldap.LDAPError as e:
                logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
                # Keep the error so it can be re-raised if every URI fails.
                last_error = e
                continue

            try:
                results = connection.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
            finally:
                # Release the connection even when the search itself fails.
                connection.unbind_s()

            if self.working_uri is None:
                self.working_uri = uri
                # Move the working URI to the front so it is tried first next time.
                self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
                self.working_adldap_sync = adldap_sync

            return (uri, results)  # Return both the LDAP server URI used and the request. This is for incremental sync purposes

        # No URI could be used: surface an explicit error instead of a bare
        # ``raise`` with no active exception.
        if last_error is not None:
            raise last_error
        raise ldap.LDAPError("No LDAP server URI is configured")
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def open_ldap():
    """
    Returns a freshly made LDAP object, according to the settings
    configured in webfront.conf.

    The ``ldap`` config section supplies ``server``, ``port``,
    ``encryption`` (``ssl``, ``tls`` or ``none``), ``timeout`` and
    ``debug``.  An unknown encryption value is logged and treated as
    ``none``.

    :raises NoStartTlsError: ``tls`` was requested but the server answered
        StartTLS with a protocol error.
    :raises NoAnswerError: ``tls`` was requested but the server is down or
        the connection failed.
    """
    # Get config settings
    server = _config.get('ldap', 'server')
    port = _config.getint('ldap', 'port')
    encryption = _config.get('ldap', 'encryption').lower()
    timeout = _config.getfloat('ldap', 'timeout')
    # Revert to no encryption if none of the valid settings are found
    if encryption not in ('ssl', 'tls', 'none'):
        _logger.warning('Unknown encryption setting %r in config file, '
                        'using no encryption instead',
                        _config.get('ldap', 'encryption'))
        encryption = 'none'

    # Debug tracing from python-ldap/openldap to stderr
    if _config.getboolean('ldap', 'debug'):
        ldap.set_option(ldap.OPT_DEBUG_LEVEL, 255)

    # Only 'ssl' uses the ldaps scheme; STARTTLS upgrades a plain ldap://
    # connection.  ldap.initialize() replaces the deprecated ldap.open().
    scheme = 'ldaps' if encryption == 'ssl' else 'ldap'
    uri = '%s://%s:%s' % (scheme, server, port)
    lconn = ldap.initialize(uri)
    lconn.timeout = timeout

    # Use STARTTLS if enabled, then fail miserably if the server
    # does not support it
    if encryption == 'tls':
        _logger.debug("Using STARTTLS for ldap connection")
        try:
            lconn.start_tls_s()
        except ldap.PROTOCOL_ERROR:
            _logger.error('LDAP server %s does not support the STARTTLS '
                          'extension.  Aborting.', server)
            raise NoStartTlsError(server)
        except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR):
            _logger.exception("LDAP server is down")
            raise NoAnswerError(server)

    return lconn