Python termios 模块,FIONREAD 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用termios.FIONREAD

项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:inotify_simple    作者:chrisjbillington    | 项目源码 | 文件源码
def read(self, timeout=None, read_delay=None):
        """Read the inotify file descriptor and return the resulting list of
        :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name).

        Args:
            timeout (int): The time in milliseconds to wait for events if
                there are none. If `negative or `None``, block until there are
                events.

            read_delay (int): The time in milliseconds to wait after the first
                event arrives before reading the buffer. This allows further
                events to accumulate before reading, which allows the kernel
                to consolidate like events and can enhance performance when
                there are many similar events.

        Returns:
            list: list of :attr:`~inotify_simple.Event` namedtuples"""
        # Wait for the first event:
        pending = self._poller.poll(timeout)
        if not pending:
            # Timed out, no events
            return []
        if read_delay is not None:
            # Wait for more events to accumulate:
            time.sleep(read_delay/1000.0)
        # How much data is available to read?
        bytes_avail = ctypes.c_int()
        ioctl(self.fd, FIONREAD, bytes_avail)
        buffer_size = bytes_avail.value
        # Read and parse it:
        data = os.read(self.fd, buffer_size)
        events = parse_events(data)
        return events
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:caproto    作者:NSLS-II    | 项目源码 | 文件源码
def __call__(self):
        '''Selector poll loop'''
        avail_buf = array.array('i', [0])
        while self._running:
            with self._socket_map_lock:
                for sock in self._unregister_sockets:
                    self.selector.unregister(sock)
                self._unregister_sockets.clear()

                for sock in self._register_sockets:
                    self.selector.register(sock, selectors.EVENT_READ)
                self._register_sockets.clear()

            events = self.selector.select(timeout=0.1)
            with self._socket_map_lock:
                if self._unregister_sockets or self._register_sockets:
                    continue

                ready_ids = [self.socket_to_id[key.fileobj]
                             for key, mask in events]
                ready_objs = [(self.objects[obj_id], self.id_to_socket[obj_id])
                              for obj_id in ready_ids]

            for obj, sock in ready_objs:
                # TODO: consider thread pool for recv and command_loop

                if fcntl.ioctl(sock, termios.FIONREAD, avail_buf) < 0:
                    continue

                bytes_available = avail_buf[0]

                try:
                    bytes_recv, address = sock.recvfrom(max((4096,
                                                             bytes_available)))
                except OSError as ex:
                    if ex.errno == errno.EAGAIN:
                        continue
                    bytes_recv, address = b'', None

                obj.received(bytes_recv, address)
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:i3configger    作者:obestwalter    | 项目源码 | 文件源码
def read(self, timeout=None, read_delay=None):
        """Read the inotify file descriptor and return the resulting list of
        :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name).

        Args:
            timeout (int): The time in milliseconds to wait for events if
                there are none. If `negative or `None``, block until there are
                events.

            read_delay (int): The time in milliseconds to wait after the first
                event arrives before reading the buffer. This allows further
                events to accumulate before reading, which allows the kernel
                to consolidate like events and can enhance performance when
                there are many similar events.

        Returns:
            list: list of :attr:`~inotify_simple.Event` namedtuples"""
        # Wait for the first event:
        pending = self._poller.poll(timeout)
        if not pending:
            # Timed out, no events
            return []
        if read_delay is not None:
            # Wait for more events to accumulate:
            time.sleep(read_delay/1000.0)
        # How much data is available to read?
        bytes_avail = ctypes.c_int()
        ioctl(self.fd, FIONREAD, bytes_avail)
        buffer_size = bytes_avail.value
        # Read and parse it:
        data = os.read(self.fd, buffer_size)
        events = parse_events(data)
        return events
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def read_events(self):
        """
        Read events from device, build _RawEvents, and enqueue them.
        """
        buf_ = array.array('i', [0])
        # get event queue size
        if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
            return
        queue_size = buf_[0]
        if queue_size < self._threshold:
            log.debug('(fd: %d) %d bytes available to read but threshold is '
                      'fixed to %d bytes', self._fd, queue_size,
                      self._threshold)
            return

        try:
            # Read content from file
            r = os.read(self._fd, queue_size)
        except Exception, msg:
            raise NotifierError(msg)
        log.debug('Event queue size: %d', queue_size)
        rsum = 0  # counter
        while rsum < queue_size:
            s_size = 16
            # Retrieve wd, mask, cookie and fname_len
            wd, mask, cookie, fname_len = struct.unpack('iIII',
                                                        r[rsum:rsum+s_size])
            # Retrieve name
            fname, = struct.unpack('%ds' % fname_len,
                                   r[rsum + s_size:rsum + s_size + fname_len])
            rawevent = _RawEvent(wd, mask, cookie, fname)
            if self._coalesce:
                # Only enqueue new (unique) events.
                raweventstr = str(rawevent)
                if raweventstr not in self._eventset:
                    self._eventset.add(raweventstr)
                    self._eventq.append(rawevent)
            else:
                self._eventq.append(rawevent)
            rsum += s_size + fname_len
项目:vim-clangd    作者:Chilledheart    | 项目源码 | 文件源码
def EstimateUnreadBytes(fd):
    buf = array('i', [0])
    ioctl(fd, FIONREAD, buf, 1)
    return buf[0]
项目:ftcommunity-apps    作者:ftCommunity    | 项目源码 | 文件源码
def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    # select based implementation, proved to work on many systems
项目:ftcommunity-apps    作者:ftCommunity    | 项目源码 | 文件源码
def out_waiting(self):
        """Return the number of bytes currently in the output buffer."""
        #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]
项目:ClassiPi    作者:mugroma3    | 项目源码 | 文件源码
def read_events(self):
        """
        Read events from device, build _RawEvents, and enqueue them.
        """
        buf_ = array.array('i', [0])
        # get event queue size
        if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
            return
        queue_size = buf_[0]
        if queue_size < self._threshold:
            log.debug('(fd: %d) %d bytes available to read but threshold is '
                      'fixed to %d bytes', self._fd, queue_size,
                      self._threshold)
            return

        try:
            # Read content from file
            r = os.read(self._fd, queue_size)
        except Exception as msg:
            raise NotifierError(msg)
        log.debug('Event queue size: %d', queue_size)
        rsum = 0  # counter
        while rsum < queue_size:
            s_size = 16
            # Retrieve wd, mask, cookie and fname_len
            wd, mask, cookie, fname_len = struct.unpack('iIII',
                                                        r[rsum:rsum+s_size])
            # Retrieve name
            bname, = struct.unpack('%ds' % fname_len,
                                   r[rsum + s_size:rsum + s_size + fname_len])
            # FIXME: should we explictly call sys.getdefaultencoding() here ??
            uname = bname.decode()
            rawevent = _RawEvent(wd, mask, cookie, uname)
            if self._coalesce:
                # Only enqueue new (unique) events.
                raweventstr = str(rawevent)
                if raweventstr not in self._eventset:
                    self._eventset.add(raweventstr)
                    self._eventq.append(rawevent)
            else:
                self._eventq.append(rawevent)
            rsum += s_size + fname_len
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def read_events(self):
        """
        Read events from device, build _RawEvents, and enqueue them.
        """
        buf_ = array.array('i', [0])
        # get event queue size
        if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
            return
        queue_size = buf_[0]
        if queue_size < self._threshold:
            log.debug('(fd: %d) %d bytes available to read but threshold is '
                      'fixed to %d bytes', self._fd, queue_size,
                      self._threshold)
            return

        try:
            # Read content from file
            r = os.read(self._fd, queue_size)
        except Exception as msg:
            raise NotifierError(msg)
        log.debug('Event queue size: %d', queue_size)
        rsum = 0  # counter
        while rsum < queue_size:
            s_size = 16
            # Retrieve wd, mask, cookie and fname_len
            wd, mask, cookie, fname_len = struct.unpack('iIII',
                                                        r[rsum:rsum+s_size])
            # Retrieve name
            bname, = struct.unpack('%ds' % fname_len,
                                   r[rsum + s_size:rsum + s_size + fname_len])
            # FIXME: should we explictly call sys.getdefaultencoding() here ??
            uname = bname.decode()
            rawevent = _RawEvent(wd, mask, cookie, uname)
            if self._coalesce:
                # Only enqueue new (unique) events.
                raweventstr = str(rawevent)
                if raweventstr not in self._eventset:
                    self._eventset.add(raweventstr)
                    self._eventq.append(rawevent)
            else:
                self._eventq.append(rawevent)
            rsum += s_size + fname_len