Python errno 模块,ELOOP 实例源码

我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用errno.ELOOP

项目:isar    作者:ilbers    | 项目源码 | 文件源码
def __realpath(file, root, loop_cnt, assume_dir):
    """Resolve 'file' for as long as it is a symbolic link.

    The link chain is followed while 'file' is a symlink and its length is
    at least the length of 'root'.  'loop_cnt' is the number of links that
    may still be followed; once it reaches zero an OSError carrying
    errno.ELOOP and the offending path is raised.  'assume_dir' is only
    passed through to __realpath_rel.

    Returns a tuple (resolved_path, is_dir).
    """
    while os.path.islink(file) and len(file) >= len(root):
        if loop_cnt == 0:
            raise OSError(errno.ELOOP, file)

        loop_cnt -= 1
        target = os.path.normpath(os.readlink(file))

        if not os.path.isabs(target):
            # Relative link targets are resolved against the directory
            # that contains the link itself.
            tdir = os.path.dirname(file)
            assert __is_path_below(tdir, root)
        else:
            # Absolute link targets are re-anchored at 'root'.
            tdir = root

        file = __realpath_rel(tdir, target, root, loop_cnt, assume_dir)

    try:
        is_dir = os.path.isdir(file)
    except (OSError, ValueError):
        # The original fallback assigned the undefined name 'false', which
        # would have raised NameError instead of recovering.
        is_dir = False

    return (file, is_dir)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def open_image_file(root, path, flag, mode=None):
        """Safely open a file, ensuring that the path we are accessing
        resides within a specified image root.

        'root' is a directory that the path must reside in.

        'path' is the path of the file to open.

        'flag' is the os.open() flag set; os.O_NOFOLLOW is always added.

        'mode' is the optional permission mode handed to os.open(); when it
        is None the os.open() default is used.

        Returns a file object wrapping the opened descriptor.  Errors other
        than ELOOP are converted with api_errors._convert_error() and
        raised.
        """

        try:
                flags = flag | os.O_NOFOLLOW
                # os.open() only accepts an integer mode, so passing the
                # default None through would raise TypeError; omit it
                # instead.
                if mode is None:
                        fd = os.open(path, flags)
                else:
                        fd = os.open(path, flags, mode)
                return os.fdopen(fd)
        except EnvironmentError as e:
                if e.errno != errno.ELOOP:
                        # Access to protected member; pylint: disable=W0212
                        raise api_errors._convert_error(e)
        # If it is a symbolic link, fall back to ar_open. ar_open interprets
        # 'path' as relative to 'root', that is, 'root' will be prepended to
        # 'path', so we need to call os.path.relpath here.
        from pkg.altroot import ar_open
        return os.fdopen(ar_open(root, os.path.relpath(path, root), flag, mode))
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def realpath(file, root, use_physdir = True, loop_cnt = 100, assume_dir = False):
    """Return the canonical path of 'file', treating 'root' as the
    toplevel directory.

    With 'use_physdir' set, every leading path component of 'file' is
    resolved first; leave it set unless the path is known to contain no
    symlinks.  Unless 'assume_dir' is set, missing path components raise
    an ENOENT error.  'loop_cnt' bounds the number of links followed."""

    root = os.path.normpath(root)
    file = os.path.normpath(file)

    # A trailing separator on root makes the prefix handling simpler.
    if not root.endswith(os.path.sep):
        root += os.path.sep

    if not __is_path_below(file, root):
        raise OSError(errno.EINVAL, "file '%s' is not below root" % file)

    try:
        if not use_physdir:
            resolved = __realpath(file, root, loop_cnt, assume_dir)[0]
        else:
            # Keep the separator that terminates root as the leading
            # character of the relative part.
            below_root = file[(len(root) - 1):]
            resolved = __realpath_rel(root, below_root, root, loop_cnt, assume_dir)
    except OSError as e:
        if e.errno != errno.ELOOP:
            raise

        # Re-raise ELOOP as a single readable error; otherwise the
        # backtrace would consist of 100s of chained OSError exceptions.
        raise OSError(errno.ELOOP,
                      "too much recursions while resolving '%s'; loop in '%s'" %
                      (file, e.strerror))

    return resolved
项目:rdiff-backup    作者:sol1    | 项目源码 | 文件源码
def read_from_rp(self, rp):
    """Set the extended attributes from an rpath.

    Lists the extended attributes of 'rp' and stores each value in
    self.attr_dict.  'system.' attributes are skipped, and so is
    'com.apple.ResourceFork' on anything that is not a directory.
    Returns None; IOErrors that are not handled below are re-raised.
    """
    try:
        attr_list = rp.conn.xattr.listxattr(encode(rp.path), rp.issym())
    except IOError as exc:
        # 'as' plus exc.errno replaces the Python-2-only 'except X, e'
        # and exc[0] forms, which do not work on Python 3.
        if exc.errno in (errno.EOPNOTSUPP, errno.EPERM, errno.ETXTBSY):
            return  # if not supported, consider empty
        if exc.errno in (errno.EACCES, errno.ENOENT, errno.ELOOP):
            log.Log("Warning: listattr(%s): %s" % (repr(rp.path), exc), 4)
            return
        raise
    for attr in attr_list:
        if attr.startswith('system.'):
            # Do not preserve system extended attributes
            continue
        if not rp.isdir() and attr == 'com.apple.ResourceFork':
            # Resource Fork handled elsewhere, except for directories
            continue
        try:
            self.attr_dict[attr] = rp.conn.xattr.getxattr(
                encode(rp.path), attr, rp.issym())
        except IOError as exc:
            # File probably modified while reading, just continue
            if exc.errno == errno.ENODATA:
                continue
            elif exc.errno == errno.ENOENT:
                break
            # Handle bug in pyxattr < 0.2.2
            elif exc.errno == errno.ERANGE:
                continue
            else:
                raise
项目:capidup    作者:israel-lugo    | 项目源码 | 文件源码
def filter_visited(curr_dir, subdirs, already_visited, follow_dirlinks, on_error):
    """Drop subdirectories that were already visited.

    Helper for the os.walk() based search in index_files_by_size; it keeps
    that search from running in circles.

    curr_dir is the current directory path, as yielded by os.walk().

    subdirs is the list of subdirectory names of curr_dir, as yielded by
    os.walk().

    already_visited is a set of (st_dev, st_ino) tuples identifying the
    directories visited so far. It is left untouched.

    follow_dirlinks selects os.stat() (true) or os.lstat() (false) for
    inspecting entries; when false, subdirs that are symlinks are dropped
    without reporting an error.

    on_error is a function f(OSError) -> None, invoked for every stat
    failure and for every detected directory loop.

    Returns a tuple: the (possibly shortened) list of subdirs, and a new
    set of visited directories that also covers the kept subdirs.

    """
    stat_entry = os.stat if follow_dirlinks else os.lstat

    kept = []
    newly_seen = set()
    seen = already_visited.copy()

    # Record the current directory first, so that a symlink pointing back
    # at it is caught right away rather than one directory level later.
    try:
        here = stat_entry(curr_dir)
        seen.add((here.st_dev, here.st_ino))
    except OSError as err:
        on_error(err)

    for name in subdirs:
        path = os.path.join(curr_dir, name)
        try:
            info = stat_entry(path)
        except OSError as err:
            on_error(err)
            continue

        if not follow_dirlinks and stat.S_ISLNK(info.st_mode):
            # links to directories are not being followed; skip quietly
            continue

        key = (info.st_dev, info.st_ino)
        if key in seen:
            on_error(OSError(errno.ELOOP, "directory loop detected", path))
            continue

        kept.append(name)
        newly_seen.add(key)

    return kept, seen | newly_seen