Python stat 模块,S_ISCHR 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_ISCHR

项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __hashEntry(self, prefix, entry, s):
        if stat.S_ISREG(s.st_mode):
            digest = self.__index.check(prefix, entry, s, hashFile)
        elif stat.S_ISDIR(s.st_mode):
            digest = self.__hashDir(prefix, entry)
        elif stat.S_ISLNK(s.st_mode):
            digest = self.__index.check(prefix, entry, s, DirHasher.__hashLink)
        elif stat.S_ISBLK(s.st_mode) or stat.S_ISCHR(s.st_mode):
            digest = struct.pack("<L", s.st_rdev)
        elif stat.S_ISFIFO(s.st_mode):
            digest = b''
        else:
            digest = b''
            logging.getLogger(__name__).warning("Unknown file: %s", entry)

        return digest
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def is_special_file(path):
    """
    This function checks to see if a special file.  It checks if the
    file is a character special device, block special device, FIFO, or
    socket. 
    """
    mode = os.stat(path).st_mode
    # Character special device.
    if stat.S_ISCHR(mode):
        return True
    # Block special device
    if stat.S_ISBLK(mode):
        return True
    # FIFO.
    if stat.S_ISFIFO(mode):
        return True
    # Socket.
    if stat.S_ISSOCK(mode):
        return True
    return False
项目:git_intgrtn_aws_s3    作者:droidlabour    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:PyMal    作者:cysinfo    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:SublimeRemoteGDB    作者:summerwinter    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_write_full(self):
        devfull = '/dev/full'
        if not (os.path.exists(devfull) and
                stat.S_ISCHR(os.stat(devfull).st_mode)):
            # Issue #21934: OpenBSD does not have a /dev/full character device
            self.skipTest('requires %r' % devfull)
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                f.write('hello\n')
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                # Issue #17976
                f.write('hello')
                f.write('\n')
        with open(devfull, 'wb', 0) as f:
            with self.assertRaises(IOError):
                f.write('h')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_write_full(self):
        devfull = '/dev/full'
        if not (os.path.exists(devfull) and
                stat.S_ISCHR(os.stat(devfull).st_mode)):
            # Issue #21934: OpenBSD does not have a /dev/full character device
            self.skipTest('requires %r' % devfull)
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                f.write('hello\n')
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                # Issue #17976
                f.write('hello')
                f.write('\n')
        with open(devfull, 'wb', 0) as f:
            with self.assertRaises(IOError):
                f.write('h')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:Encryped-file-system    作者:kittenish    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:isf    作者:w3h    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
项目:hachoir3    作者:vstinner    | 项目源码 | 文件源码
def _ftypelet(mode):
    if stat.S_ISREG(mode) or not stat.S_IFMT(mode):
        return '-'
    if stat.S_ISBLK(mode):
        return 'b'
    if stat.S_ISCHR(mode):
        return 'c'
    if stat.S_ISDIR(mode):
        return 'd'
    if stat.S_ISFIFO(mode):
        return 'p'
    if stat.S_ISLNK(mode):
        return 'l'
    if stat.S_ISSOCK(mode):
        return 's'
    return '?'
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def __init__(self, devname=None):
        if devname is None:
            self.name = "/dev/urandom"
        else:
            self.name = devname

        # Test that /dev/urandom is a character special device
        f = open(self.name, "rb", 0)
        fmode = os.fstat(f.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(fmode):
            f.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = f

        BaseRNG.__init__(self)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
项目:crypto-detector    作者:Wind-River    | 项目源码 | 文件源码
def ischr(self):
        return stat.S_ISCHR(self.mode)
项目:crypto-detector    作者:Wind-River    | 项目源码 | 文件源码
def isdev(self):
        return (stat.S_ISCHR(self.mode) or stat.S_ISBLK(self.mode))
# class CpioInfo
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def statinfo(st):
    return {
        'mode'     : "%04o" % stat.S_IMODE(st.st_mode),
        'isdir'    : stat.S_ISDIR(st.st_mode),
        'ischr'    : stat.S_ISCHR(st.st_mode),
        'isblk'    : stat.S_ISBLK(st.st_mode),
        'isreg'    : stat.S_ISREG(st.st_mode),
        'isfifo'   : stat.S_ISFIFO(st.st_mode),
        'islnk'    : stat.S_ISLNK(st.st_mode),
        'issock'   : stat.S_ISSOCK(st.st_mode),
        'uid'      : st.st_uid,
        'gid'      : st.st_gid,
        'size'     : st.st_size,
        'inode'    : st.st_ino,
        'dev'      : st.st_dev,
        'nlink'    : st.st_nlink,
        'atime'    : st.st_atime,
        'mtime'    : st.st_mtime,
        'ctime'    : st.st_ctime,
        'wusr'     : bool(st.st_mode & stat.S_IWUSR),
        'rusr'     : bool(st.st_mode & stat.S_IRUSR),
        'xusr'     : bool(st.st_mode & stat.S_IXUSR),
        'wgrp'     : bool(st.st_mode & stat.S_IWGRP),
        'rgrp'     : bool(st.st_mode & stat.S_IRGRP),
        'xgrp'     : bool(st.st_mode & stat.S_IXGRP),
        'woth'     : bool(st.st_mode & stat.S_IWOTH),
        'roth'     : bool(st.st_mode & stat.S_IROTH),
        'xoth'     : bool(st.st_mode & stat.S_IXOTH),
        'isuid'    : bool(st.st_mode & stat.S_ISUID),
        'isgid'    : bool(st.st_mode & stat.S_ISGID),
    }
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
项目:click-configfile    作者:click-contrib    | 项目源码 | 文件源码
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno != ENOENT:
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
项目:PRET    作者:RUB-NDS    | 项目源码 | 文件源码
def open(self, target):
    # target is a character device
    if os.path.exists(target) \
      and stat.S_ISCHR(os.stat(target).st_mode):
        self._file = os.open(target, os.O_RDWR)
    # treat target as ipv4 socket
    else: self._sock.connect((target, 9100))

  # close connection
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
项目:click-configfile    作者:jenisys    | 项目源码 | 文件源码
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno != ENOENT:
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
项目:flux_line_bot    作者:blesscat    | 项目源码 | 文件源码
def is_serial_port(filepath):
    if os.path.exists(filepath):
        st_mode = os.stat(filepath).st_mode
        return S_ISCHR(st_mode)
    else:
        return False
项目:tfhfs    作者:fingon    | 项目源码 | 文件源码
def mknod(self, parent_inode, name, mode, rdev, ctx):
        assert self._initialized
        fd, a = self.create(parent_inode, name, mode,
                            os.O_TRUNC | os.O_CREAT, ctx)
        inode = self.forest.inodes.get_by_value(a.st_ino)
        if stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            inode.direntry.set_data('st_rdev', rdev)
        self.release(fd)
        return self._inode_attributes(inode)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno not in (ENOENT, ENOTDIR):
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
项目:jepsen-training-vpc    作者:bloomberg    | 项目源码 | 文件源码
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def set_from_stat(self):
        """Set the value of self.type, self.mode from self.stat"""
        if not self.stat:
            self.type = None

        st_mode = self.stat.st_mode
        if stat.S_ISREG(st_mode):
            self.type = "reg"
        elif stat.S_ISDIR(st_mode):
            self.type = "dir"
        elif stat.S_ISLNK(st_mode):
            self.type = "sym"
        elif stat.S_ISFIFO(st_mode):
            self.type = "fifo"
        elif stat.S_ISSOCK(st_mode):
            raise PathException(util.ufn(self.get_relative_path()) +
                                u"is a socket, unsupported by tar")
            self.type = "sock"
        elif stat.S_ISCHR(st_mode):
            self.type = "chr"
        elif stat.S_ISBLK(st_mode):
            self.type = "blk"
        else:
            raise PathException("Unknown type")

        self.mode = stat.S_IMODE(st_mode)
        if self.type in ("chr", "blk"):
            try:
                self.devnums = (os.major(self.stat.st_rdev),
                                os.minor(self.stat.st_rdev))
            except:
                log.Warn(_("Warning: %s invalid devnums (0x%X), treating as (0, 0).")
                         % (util.ufn(self.get_relative_path()), self.stat.st_rdev))
                self.devnums = (0, 0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno != ENOENT:
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
项目:s3transfer    作者:boto    | 项目源码 | 文件源码
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
项目:obnam    作者:obnam-mirror    | 项目源码 | 文件源码
def show_item_kdirstat(self, gen_id, filename):
        mode = self.repo.get_file_key(
            gen_id, filename, obnamlib.REPO_FILE_MODE)
        size = self.repo.get_file_key(
            gen_id, filename, obnamlib.REPO_FILE_SIZE)
        mtime_sec = self.repo.get_file_key(
            gen_id, filename, obnamlib.REPO_FILE_MTIME_SEC)

        if stat.S_ISREG(mode):
            mode_str = "F\t"
        elif stat.S_ISDIR(mode):
            mode_str = "D "
        elif stat.S_ISLNK(mode):
            mode_str = "L\t"
        elif stat.S_ISBLK(mode):
            mode_str = "BlockDev\t"
        elif stat.S_ISCHR(mode):
            mode_str = "CharDev\t"
        elif stat.S_ISFIFO(mode):
            mode_str = "FIFO\t"
        elif stat.S_ISSOCK(mode):
            mode_str = "Socket\t"
        else:
            # Unhandled, make it look like a comment
            mode_str = "#UNHANDLED\t"

        enc_filename = filename.replace("%", "%25")
        enc_filename = enc_filename.replace(" ", "%20")
        enc_filename = enc_filename.replace("\t", "%09")

        self.app.output.write(
            "%s%s\t%d\t%#x\n" %
            (mode_str, enc_filename, size, mtime_sec))
项目:obnam    作者:obnam-mirror    | 项目源码 | 文件源码
def restore_first_link(self, gen, filename, metadata):
        if stat.S_ISREG(metadata.st_mode):
            self.restore_regular_file(gen, filename, metadata)
        elif stat.S_ISFIFO(metadata.st_mode):
            self.restore_fifo(gen, filename, metadata)
        elif stat.S_ISSOCK(metadata.st_mode):
            self.restore_socket(gen, filename, metadata)
        elif stat.S_ISBLK(metadata.st_mode) or stat.S_ISCHR(metadata.st_mode):
            self.restore_device(gen, filename, metadata)
        else:
            msg = ('Unknown file type: %s (%o)' %
                   (filename, metadata.st_mode))
            logging.error(msg)
            self.app.ts.notify(msg)
项目:diffoscope    作者:ReproducibleBuilds    | 项目源码 | 文件源码
def is_device(self):
        mode = os.lstat(self._name).st_mode
        return stat.S_ISCHR(mode) or stat.S_ISBLK(mode)
项目:diffoscope    作者:ReproducibleBuilds    | 项目源码 | 文件源码
def format_device(mode, major, minor):
    if stat.S_ISCHR(mode):
        kind = 'character'
    elif stat.S_ISBLK(mode):
        kind = 'block'
    else:
        kind = 'weird'
    return 'device:%s\nmajor: %d\nminor: %d\n' % (kind, major, minor)
项目:tf_aws_ecs_instance_draining_on_scale_in    作者:terraform-community-modules    | 项目源码 | 文件源码
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def lsLine(name, s):
    mode = s.st_mode
    perms = array.array('c', '-'*10)
    ft = stat.S_IFMT(mode)
    if stat.S_ISDIR(ft): perms[0] = 'd'
    elif stat.S_ISCHR(ft): perms[0] = 'c'
    elif stat.S_ISBLK(ft): perms[0] = 'b'
    elif stat.S_ISREG(ft): perms[0] = '-'
    elif stat.S_ISFIFO(ft): perms[0] = 'f'
    elif stat.S_ISLNK(ft): perms[0] = 'l'
    elif stat.S_ISSOCK(ft): perms[0] = 's'
    else: perms[0] = '!'
    # user
    if mode&stat.S_IRUSR:perms[1] = 'r'
    if mode&stat.S_IWUSR:perms[2] = 'w'
    if mode&stat.S_IXUSR:perms[3] = 'x'
    # group
    if mode&stat.S_IRGRP:perms[4] = 'r'
    if mode&stat.S_IWGRP:perms[5] = 'w'
    if mode&stat.S_IXGRP:perms[6] = 'x'
    # other
    if mode&stat.S_IROTH:perms[7] = 'r'
    if mode&stat.S_IWOTH:perms[8] = 'w'
    if mode&stat.S_IXOTH:perms[9] = 'x'
    # suid/sgid
    if mode&stat.S_ISUID:
        if perms[3] == 'x': perms[3] = 's'
        else: perms[3] = 'S'
    if mode&stat.S_ISGID:
        if perms[6] == 'x': perms[6] = 's'
        else: perms[6] = 'S'
    l = perms.tostring()
    l += str(s.st_nlink).rjust(5) + ' '
    un = str(s.st_uid)
    l += un.ljust(9)
    gr = str(s.st_gid)
    l += gr.ljust(9)
    sz = str(s.st_size)
    l += sz.rjust(8)
    l += ' '
    sixmo = 60 * 60 * 24 * 7 * 26
    if s.st_mtime + sixmo < time.time(): # last edited more than 6mo ago
        l += time.strftime("%b %2d  %Y ", time.localtime(s.st_mtime))
    else:
        l += time.strftime("%b %2d %H:%S ", time.localtime(s.st_mtime))
    l += name
    return l
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def lsLine(name, s):
    mode = s.st_mode
    perms = array.array('c', '-'*10)
    ft = stat.S_IFMT(mode)
    if stat.S_ISDIR(ft): perms[0] = 'd'
    elif stat.S_ISCHR(ft): perms[0] = 'c'
    elif stat.S_ISBLK(ft): perms[0] = 'b'
    elif stat.S_ISREG(ft): perms[0] = '-'
    elif stat.S_ISFIFO(ft): perms[0] = 'f'
    elif stat.S_ISLNK(ft): perms[0] = 'l'
    elif stat.S_ISSOCK(ft): perms[0] = 's'
    else: perms[0] = '!'
    # user
    if mode&stat.S_IRUSR:perms[1] = 'r'
    if mode&stat.S_IWUSR:perms[2] = 'w'
    if mode&stat.S_IXUSR:perms[3] = 'x'
    # group
    if mode&stat.S_IRGRP:perms[4] = 'r'
    if mode&stat.S_IWGRP:perms[5] = 'w'
    if mode&stat.S_IXGRP:perms[6] = 'x'
    # other
    if mode&stat.S_IROTH:perms[7] = 'r'
    if mode&stat.S_IWOTH:perms[8] = 'w'
    if mode&stat.S_IXOTH:perms[9] = 'x'
    # suid/sgid
    if mode&stat.S_ISUID:
        if perms[3] == 'x': perms[3] = 's'
        else: perms[3] = 'S'
    if mode&stat.S_ISGID:
        if perms[6] == 'x': perms[6] = 's'
        else: perms[6] = 'S'
    l = perms.tostring()
    l += str(s.st_nlink).rjust(5) + ' '
    un = str(s.st_uid)
    l += un.ljust(9)
    gr = str(s.st_gid)
    l += gr.ljust(9)
    sz = str(s.st_size)
    l += sz.rjust(8)
    l += ' '
    sixmo = 60 * 60 * 24 * 7 * 26
    if s.st_mtime + sixmo < time.time(): # last edited more than 6mo ago
        l += time.strftime("%b %2d  %Y ", time.localtime(s.st_mtime))
    else:
        l += time.strftime("%b %2d %H:%S ", time.localtime(s.st_mtime))
    l += name
    return l
项目:unitils    作者:iLoveTux    | 项目源码 | 文件源码
def find(path=".", name=None, iname=None, ftype="*"):
    """Search for files in a directory heirarchy.

    This is dramatically different from the GNU version of
    find. There is no Domain Specific language.

    :param path: The directory to start in
    :param name: The name spec (glob pattern) to search for
    :param iname: The case-insensitive name spec (glob pattern) to search for
    :param ftype: The type of file to search for must be one of b, c, d, p, f, k, s or *
    :type ftype: str
    :type iname: str
    :type name: str
    :type path: str
    """
    if ftype not in "bcdpfls*" or len(ftype) != 1:
        raise NotImplementedError(
            "Introspection for {} not implemented".format(ftype)
        )
    ftype_mapping = {
        "b": stat.S_ISBLK, "c": stat.S_ISCHR,
        "d": stat.S_ISDIR, "p": stat.S_ISFIFO,
        "f": stat.S_ISREG, "l": stat.S_ISLNK,
        "s": stat.S_ISSOCK,
        "*": lambda *args, **kwargs: True,
    }
    type_test = ftype_mapping[ftype]

    if name is not None:
        regex = re.compile(fnmatch.translate(name))
    elif iname is not None:
        regex = re.compile(fnmatch.translate(iname), flags=re.IGNORECASE)
    else:
        regex = re.compile(fnmatch.translate("*"))

    if regex.match(path) and type_test(os.stat(path).st_mode):
        yield os.path.relpath(path)

    for root, dirs, files in os.walk(path):
        for n in files + dirs:
            filename = os.path.join(root, n)
            _stat = os.stat(filename)
            if regex.match(n) and type_test(_stat.st_mode):
                yield filename