Python ldap 模块,SCOPE_ONELEVEL 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用ldap.SCOPE_ONELEVEL

项目:ipa_check_consistency    作者:peterpakos    | 项目源码 | 文件源码
def _replication_agreements(self):
        self._log.debug('Checking for replication agreements...')
        msg = []
        healthy = True
        suffix = self._base_dn.replace('=', '\\3D').replace(',', '\\2C')
        results = self._search(
            'cn=replica,cn=%s,cn=mapping tree,cn=config' % suffix,
            '(objectClass=*)',
            ['nsDS5ReplicaHost', 'nsds5replicaLastUpdateStatus'],
            scope=ldap.SCOPE_ONELEVEL
        )

        for result in results:
            dn, attrs = result
            host = attrs['nsDS5ReplicaHost'][0].decode('utf-8')
            host = host.partition('.')[0]
            status = attrs['nsds5replicaLastUpdateStatus'][0].decode('utf-8')
            status = status.replace('Error ', '').partition(' ')[0].strip('()')
            if status != '0':
                healthy = False
            msg.append('%s %s' % (host, status))

        return '\n'.join(msg), healthy
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def check_userfree(self, uid):
        """Check if a username is free.
        If username is already used or is an LDAP group, an
        RBFatalError is raised. If the username is in the additional
        reserved LDAP tree, an RBWarningError is raised and checked if
        it is to be overridden. """
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
        if res:
            raise RBFatalError(
                "Username '%s' is already taken by %s account (%s)" %
                (uid, res[0][1]['objectClass'][0].decode(),
                 res[0][1]['cn'][0].decode()))
        res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
                                 'cn=%s' % uid)
        if res:
            raise RBFatalError("Username '%s' is reserved (LDAP Group)" % uid)
        res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
                                 ldap.SCOPE_ONELEVEL, 'uid=%s' % uid)
        if res:
            self.rberror(
                RBWarningError("Username '%s' is reserved (%s)" % (uid, res[0][
                    1]['description'][0].decode())))
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_pre_sync(self):
        """Return dictionary of all users for useradm pre_sync() dump."""

        res = self.ldap.search_s(
            rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
            'objectClass=posixAccount', ('uid', 'homeDirectory',
                                         'objectClass'))
        tmp = {}
        for data in res:
            for i in data['objectClass']:
                i = i.decode()
                if i in rbconfig.usertypes:
                    break
            else:
                raise RBFatalError(
                    "Unknown usertype for user '%s'" % data['uid'][0])

            tmp[data['uid'][0]] = {
                'homeDirectory': data['homeDirectory'][0],
                'usertype': data['uid'][0]
            }
        return tmp
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def uidNumber_findmax(self):
        """Return highest uidNumber found in LDAP accounts tree.
        This is only used to create the uidNumber file, the
        uidNumber_readnext() function should be used for getting the
        next available uidNumber."""

        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL,
                                 'objectClass=posixAccount', ('uidNumber', ))

        maxuid = -1
        for i in res:
            tmp = int(i[1]['uidNumber'][0])
            if tmp > maxuid:
                maxuid = tmp

        return maxuid
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def _search(self, filterstr, attrlist=None):
        """
        A wrapper for the `LDAPObject.search_s` functionality.

        Perform an LDAP search operation, starting at the configured base DN.
        The filterstr argument is a string representation of the filter to
        apply in the search.    The retrieved attributes can be limited with the
        attrlist parameter.  If attrlist is None, all the attributes of each
        entry are returned.
        """
        with self._ldap_connection() as ldap_cxn:
            results = ldap_cxn.search_s(self.base_dn, ldap.SCOPE_ONELEVEL, filterstr, attrlist)
        return results
项目:ipa_check_consistency    作者:peterpakos    | 项目源码 | 文件源码
def _count_hbac_rules(self):
        self._log.debug('Counting HBAC rules...')
        results = self._search(
            'cn=hbac,%s' % self._base_dn,
            '(ipaUniqueID=*)',
            scope=ldap.SCOPE_ONELEVEL
        )
        return len(results)
项目:ipa_check_consistency    作者:peterpakos    | 项目源码 | 文件源码
def _count_sudo_rules(self):
        self._log.debug('Counting SUDO rules...')
        results = self._search(
            'cn=sudorules,cn=sudo,%s' % self._base_dn,
            '(ipaUniqueID=*)',
            scope=ldap.SCOPE_ONELEVEL
        )
        return len(results)
项目:ipa_check_consistency    作者:peterpakos    | 项目源码 | 文件源码
def _count_dns_zones(self):
        self._log.debug('Counting DNS zones...')
        results = self._search(
            'cn=dns,%s' % self._base_dn,
            '(|(objectClass=idnszone)(objectClass=idnsforwardzone))',
            scope=ldap.SCOPE_ONELEVEL
        )
        return len(results)
项目:ipa_check_consistency    作者:peterpakos    | 项目源码 | 文件源码
def _count_certificates(self):
        self._log.debug('Counting certificates...')
        try:
            results = self._search(
                'ou=certificateRepository,ou=ca,o=ipaca',
                '(certStatus=*)',
                scope=ldap.SCOPE_ONELEVEL
            )
        except ldap.NO_SUCH_OBJECT:
            return 'N/A'
        n = len(results)
        return n
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def check_user_byname(self, uid):
        """Raise RBFatalError if given username does not exist in user
        database."""
        if not self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                  ldap.SCOPE_ONELEVEL, 'uid=%s' % uid):
            raise RBFatalError("User '%s' does not exist" % uid)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def check_user_byid(self, user_id):
        """Raise RBFatalError if given id does not belong to a user in
        user database."""
        if not self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                  ldap.SCOPE_ONELEVEL, 'id=%s' % user_id):
            raise RBFatalError("User with id '%s' does not exist" % user_id)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def check_group_byname(self, group):
        """Raise RBFatalError if given group does not exist in group
        database."""
        if not self.ldap.search_s(rbconfig.ldap_group_tree,
                                  ldap.SCOPE_ONELEVEL, 'cn=%s' % group):
            raise RBFatalError("Group '%s' does not exist" % group)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def check_group_byid(self, gid):
        """Raise RBFatalError if given id does not belong to a group in
        group database."""
        if not self.ldap.search_s(rbconfig.ldap_group_tree,
                                  ldap.SCOPE_ONELEVEL, 'gidNumber=%s' % gid):
            raise RBFatalError("Group with id '%s' does not exist" % gid)

    # ------------------------------------------------------------------- #
    # INFORMATION RETRIEVAL METHODS                                       #
    # ------------------------------------------------------------------- #

    # fixme still needed ?

    # def get_usertype_byname(self, uid):
    #     """Return usertype for username in user database. Raise
    #     RBFatalError if user does not exist."""
    #     res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
    #                              ldap.SCOPE_ONELEVEL, 'uid=%s' % usr.uid,
    #                              ('objectClass', ))
    #     if res:
    #         for i in res[0][1]['objectClass']:
    #             if i in rbconfig.usertypes:
    #                 return i
    #             else:
    #                raise RBFatalError("Unknown usertype for user '%s'" % uid)
    #         else:
    #             raise RBFatalError("User '%s' does not exist" % uid)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def get_user_byid(self, usr):
        """Populate RBUser object with data from user with given id in
        user database. Raise RBFatalError if user does not exist."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'id=%s' % usr.id)
        if res:
            self.set_user(usr, res[0])
        else:
            raise RBFatalError("User with id '%s' does not exist" % usr.id)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def get_dummyid(self, usr):
        """Set usr.id to unique 'dummy' DCU ID number."""
        raise RBFatalError('NOT YET IMPLEMENTED')
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL,
                                 '(&(id>=10000000)(id<20000000))"' % (usr.uid))
        if res:
            usr.id = int(res[0][1]['id'][0]) + 1
        else:
            usr.id = 10000000
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def get_gid_byname(self, group):
        """Get gid for given group name.
        Raise RBFatalError if given name does not belong to a group in
        group database."""
        res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
                                 'cn=%s' % group)
        if res:
            return int(res[0][1]['gidNumber'][0])
        else:
            raise RBFatalError("Group '%s' does not exist" % group)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def get_group_byid(self, gid):
        """Get group name for given group ID.
        Raise RBFatalError if given id does not belong to a group in
        group database."""
        res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
                                 'gidNumber=%s' % gid)
        if res:
            return res[0][1]['cn'][0]
        else:
            raise RBFatalError("Group with id '%s' does not exist" % gid)
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_paid_newbies(self):
        """Return list of all paid newbie usernames."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL,
                                 '(&(yearsPaid>=1)(newbie=TRUE))', ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_paid_non_newbies(self):
        """Return list of all paid renewal (non-newbie) usernames."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL,
                                 '(&(yearsPaid>=1)(newbie=FALSE))', ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_non_newbies(self):
        """Return list of all non newbie usernames."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'newbie=FALSE',
                                 ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_newbies(self):
        """Return list of all newbie usernames."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'newbie=TRUE', ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_groups(self):
        """Return list of all groups."""
        res = self.ldap.search_s(rbconfig.ldap_group_tree, ldap.SCOPE_ONELEVEL,
                                 'objectClass=posixGroup', ('cn', ))
        return [data['cn'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_reserved_static(self):
        """Return list of all static reserved names."""
        res = self.ldap.search_s(
            rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
            '(&(objectClass=reserved)(flag=static))', ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_reserved_dynamic(self):
        """Return list of all dynamic reserved names."""
        res = self.ldap.search_s(
            rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
            '(&(objectClass=reserved)(!(flag=static)))', ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_unpaid(self):
        """Return list of all non-renewed users."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'yearsPaid<=0',
                                 ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_unpaid_normal(self):
        """Return list of all normal non-renewed users."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'yearsPaid=0', ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def list_unpaid_grace(self):
        """Return list of all grace non-renewed users."""
        res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                                 ldap.SCOPE_ONELEVEL, 'yearsPaid<=-1',
                                 ('uid', ))
        return [data['uid'][0] for dn, data in res]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def dict_reserved_desc(self):
        """Return dictionary of all reserved entries with their
        description."""
        res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
                                 ldap.SCOPE_ONELEVEL, 'objectClass=reserved',
                                 ('uid', 'description'))
        tmp = {}
        for data in res:
            tmp[data['uid'][0]] = data['description'][0]
        return tmp
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def dict_reserved_static(self):
        """Return dictionary of all static reserved entries with their
        description."""
        res = self.ldap.search_s(
            rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
            '(&(objectClass=reserved)(flag=static))', ('uid', 'description'))
        tmp = {}
        for data in res:
            tmp[data['uid'][0]] = data['description'][0]
        return tmp

    # -------------------------------- #
    # METHODS RETURNING SEARCH RESULTS #
    # -------------------------------- #
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def execute(self):
        args = self.args
        # we can connect in 2 ways. By hostname/ip (and portnumber)
        # or by ldap-uri
        if "url" in args and ldapurl.isLDAPUrl(args["url"]):
            conn = ldap.initialize(args["url"])
        else:
            ip, port = self.get_address()
            conn = ldap.initialize("ldap://%s:%s" % (ip, port))
        username = args.get("username", "")
        password = args.get("password", "")
        conn.simple_bind(username, password)

        try:
            self._set_version(args, conn)
        except ValueError:
            return Event.DOWN, "unsupported protocol version"

        base = args.get("base", "dc=example,dc=org")
        if base == "cn=monitor":
            my_res = conn.search_st(base, ldap.SCOPE_BASE,
                                    timeout=self.timeout)
            versionstr = str(my_res[0][-1]['description'][0])
            self.version = versionstr
            return Event.UP, versionstr
        scope = args.get("scope", "SUBTREE").upper()
        if scope == "BASE":
            scope = ldap.SCOPE_BASE
        elif scope == "ONELEVEL":
            scope = ldap.SCOPE_ONELEVEL
        else:
            scope = ldap.SCOPE_SUBTREE
        filtr = args.get("filter", "objectClass=*")
        try:
            conn.search_ext_s(base, scope, filterstr=filtr,
                              timeout=self.timeout)
            # pylint: disable=W0703
        except Exception as err:
            return (Event.DOWN,
                    "Failed ldapSearch on %s for %s: %s" % (
                        self.get_address(), filtr, str(err)))

        conn.unbind()

        return Event.UP, "Ok"