Python termios 模块,TCIFLUSH 实例源码

我们从 Python 开源项目中提取了以下 20 个代码示例,用于说明如何使用 termios.TCIFLUSH。

项目:a_sync    作者:notion    | 项目源码 | 文件源码
async def a_input(prompt: str) -> str:
    """Prompt on stdout and read one line from stdin without blocking the loop.

    ``sys.stdin`` is polled with a zero-timeout ``select`` and the coroutine
    sleeps 0.1 seconds after every poll so that other tasks get to run.

    Args:
        prompt: Text printed (without a trailing newline) before waiting.

    Returns:
        The line read from stdin with trailing whitespace stripped.

    Raises:
        asyncio.CancelledError: Re-raised if the task is cancelled while
            waiting; pending terminal input is flushed first.
    """
    readable = []  # type: List[int]
    print(prompt, end='')
    sys.stdout.flush()
    while not readable:
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # asyncio.CancelledError is what a cancelled ``await`` raises; since
            # Python 3.8 it is no longer concurrent.futures.CancelledError.
            print("input cancelled...")
            # Drop anything typed so far so it does not leak into a later read.
            termios.tcflush(sys.stdin, termios.TCIFLUSH)
            raise
    return sys.stdin.readline().rstrip()


# [ Classes ]
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Discard everything currently waiting in the input buffer.

        Raises ``portNotOpenError`` when ``self.is_open`` is falsy.
        """
        if self.is_open:
            termios.tcflush(self.fd, termios.TCIFLUSH)
        else:
            raise portNotOpenError
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def forgetinput(self):
        """Throw away pending, not-yet-read input on ``self.input_fd``."""
        fd = self.input_fd
        termios.tcflush(fd, termios.TCIFLUSH)
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Flush the input queue of the port, dropping all unread data.

        Raises ``portNotOpenError`` if the port is not open.
        """
        port_closed = not self.is_open
        if port_closed:
            raise portNotOpenError
        input_fd = self.fd
        termios.tcflush(input_fd, termios.TCIFLUSH)
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Empty the input buffer; any data held in it is lost.

        Raises ``portNotOpenError`` when called on a port that is not open.
        """
        if not self.is_open:
            # A closed port has nothing to flush.
            raise portNotOpenError
        descriptor = self.fd
        termios.tcflush(descriptor, termios.TCIFLUSH)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def missing_host_key(self, client, hostname, key):
        """Handle a host key that is not yet known, prompting when required.

        When ``C.HOST_KEY_CHECKING`` is truthy the user is asked to confirm
        the key.  The prompt runs while holding the connection lock and with
        ``sys.stdin`` temporarily replaced by ``self._new_stdin``; both are
        restored even if flushing or reading input raises.

        An accepted key is flagged with ``_added_by_ansible_this_time`` and
        added to ``client._host_keys``.

        Raises:
            AnsibleError: if the answer is not ``yes``, ``y`` or empty.
        """

        if C.HOST_KEY_CHECKING:

            self.connection.connection_lock()
            try:
                old_stdin = sys.stdin
                sys.stdin = self._new_stdin
                try:
                    # clear out any premature input on sys.stdin
                    tcflush(sys.stdin, TCIFLUSH)

                    fingerprint = hexlify(key.get_fingerprint())
                    ktype = key.get_name()

                    inp = input(AUTHENTICITY_MSG % (hostname, ktype, fingerprint))
                finally:
                    # put the previous stdin back even if the prompt failed
                    sys.stdin = old_stdin
            finally:
                # never leave the connection lock held after an exception
                self.connection.connection_unlock()

            if inp not in ['yes','y','']:
                raise AnsibleError("host connection rejected by user")

        key._added_by_ansible_this_time = True

        # existing implementation below:
        client._host_keys.add(hostname, key.get_name(), key)

        # host keys are actually saved in close() function below
        # in order to control ordering.


# keep connection objects on a per host basis to avoid repeated attempts to reconnect
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Drop all data sitting in the input buffer of the open port.

        Raises ``portNotOpenError`` when ``self.is_open`` is falsy.
        """
        if self.is_open:
            termios.tcflush(self.fd, termios.TCIFLUSH)
        else:
            raise portNotOpenError
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Clear pending input so the buffer starts out empty.

        Raises ``portNotOpenError`` if the port is not open.
        """
        port_closed = not self.is_open
        if port_closed:
            raise portNotOpenError
        input_fd = self.fd
        termios.tcflush(input_fd, termios.TCIFLUSH)
项目:Parlay    作者:PromenadeSoftware    | 项目源码 | 文件源码
def flushInput(self):
        """Drop whatever is waiting in the input buffer of ``self._f``."""
        descriptor = self._f.fileno()
        termios.tcflush(descriptor, termios.TCIFLUSH)
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Throw away all input that is still buffered for this port.

        Raises ``portNotOpenError`` when called on a port that is not open.
        """
        if not self.is_open:
            # A closed port has nothing to flush.
            raise portNotOpenError
        descriptor = self.fd
        termios.tcflush(descriptor, termios.TCIFLUSH)
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Discard the contents of the input buffer.

        Raises ``portNotOpenError`` when ``self.is_open`` is falsy.
        """
        if self.is_open:
            termios.tcflush(self.fd, termios.TCIFLUSH)
        else:
            raise portNotOpenError
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def forgetinput(self):
        """Flush unread input queued on ``self.input_fd``."""
        pending_fd = self.input_fd
        termios.tcflush(pending_fd, termios.TCIFLUSH)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Flush the input side of the port; buffered data is dropped.

        Raises ``portNotOpenError`` if the port is not open.
        """
        port_closed = not self.is_open
        if port_closed:
            raise portNotOpenError
        input_fd = self.fd
        termios.tcflush(input_fd, termios.TCIFLUSH)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Remove all buffered input for the port without reading it.

        Raises ``portNotOpenError`` when called on a port that is not open.
        """
        if not self.is_open:
            # A closed port has nothing to flush.
            raise portNotOpenError
        descriptor = self.fd
        termios.tcflush(descriptor, termios.TCIFLUSH)
项目:ansible-provider-docs    作者:alibaba    | 项目源码 | 文件源码
def missing_host_key(self, client, hostname, key):
        """Handle a host key that is not yet known, prompting when required.

        The user is asked to confirm the key only when
        ``C.HOST_KEY_CHECKING`` is truthy and
        ``C.PARAMIKO_HOST_KEY_AUTO_ADD`` is falsy.  The prompt runs while
        holding the connection lock and with ``sys.stdin`` temporarily
        replaced by ``self._new_stdin``; both are restored even if flushing
        or reading input raises.

        An accepted key is flagged with ``_added_by_ansible_this_time`` and
        added to ``client._host_keys``.

        Raises:
            AnsibleConnectionFailure: if a prompt would be needed while
                ``C.USE_PERSISTENT_CONNECTIONS`` is truthy.
            AnsibleError: if the answer is not ``yes``, ``y`` or empty.
        """

        if all((C.HOST_KEY_CHECKING, not C.PARAMIKO_HOST_KEY_AUTO_ADD)):

            if C.USE_PERSISTENT_CONNECTIONS:
                raise AnsibleConnectionFailure('rejected %s host key for host %s: %s' % (key.get_name(), hostname, hexlify(key.get_fingerprint())))

            self.connection.connection_lock()
            try:
                old_stdin = sys.stdin
                sys.stdin = self._new_stdin
                try:
                    # clear out any premature input on sys.stdin
                    tcflush(sys.stdin, TCIFLUSH)

                    fingerprint = hexlify(key.get_fingerprint())
                    ktype = key.get_name()

                    inp = input(AUTHENTICITY_MSG % (hostname, ktype, fingerprint))
                finally:
                    # put the previous stdin back even if the prompt failed
                    sys.stdin = old_stdin
            finally:
                # never leave the connection lock held after an exception
                self.connection.connection_unlock()

            if inp not in ['yes', 'y', '']:
                raise AnsibleError("host connection rejected by user")

        key._added_by_ansible_this_time = True

        # existing implementation below:
        client._host_keys.add(hostname, key.get_name(), key)

        # host keys are actually saved in close() function below
        # in order to control ordering.


# keep connection objects on a per host basis to avoid repeated attempts to reconnect
项目:helios-server-mixnet    作者:RunasSudo    | 项目源码 | 文件源码
def getch(self):
                """Flush pending input on fd 0, then read one byte from ``self._fd``."""
                # XXX Leave this in?
                # NOTE(review): the flush targets fd 0 while the read uses
                # self._fd -- confirm the two are meant to be the same descriptor.
                flush_fd = 0
                termios.tcflush(flush_fd, termios.TCIFLUSH)
                byte_count = 1
                return os.read(self._fd, byte_count)
项目:helios-server-mixnet    作者:RunasSudo    | 项目源码 | 文件源码
def close(self, delay=0):
                """Apply the attributes stored in ``self._old`` to ``self._fd``.

                When ``delay`` is truthy the call first sleeps for ``delay``
                seconds and then discards pending input on ``self._fd``.
                """
                if delay:
                    time.sleep(delay)
                    # Drop whatever arrived while we were sleeping.
                    termios.tcflush(self._fd, termios.TCIFLUSH)
                when = termios.TCSAFLUSH
                termios.tcsetattr(self._fd, when, self._old)
项目:helios-server-mixnet    作者:RunasSudo    | 项目源码 | 文件源码
def getch(self):
                """Discard queued input on fd 0 and return one byte read from ``self._fd``."""
                # XXX Leave this in?
                # NOTE(review): fd 0 is flushed but self._fd is read -- verify
                # that this mismatch is intentional.
                termios.tcflush(0, termios.TCIFLUSH)
                data = os.read(self._fd, 1)
                return data
项目:helios-server-mixnet    作者:RunasSudo    | 项目源码 | 文件源码
def close(self, delay=0):
                """Set the terminal attributes of ``self._fd`` from ``self._old``.

                A truthy ``delay`` makes the call sleep that many seconds and
                flush pending input on ``self._fd`` beforehand.
                """
                if delay:
                    time.sleep(delay)
                    # Input received during the pause is thrown away.
                    termios.tcflush(self._fd, termios.TCIFLUSH)
                apply_mode = termios.TCSAFLUSH
                termios.tcsetattr(self._fd, apply_mode, self._old)
项目:ftcommunity-apps    作者:ftCommunity    | 项目源码 | 文件源码
def reset_input_buffer(self):
        """Wipe the input buffer, losing any data it currently holds.

        Raises ``portNotOpenError`` when ``self.is_open`` is falsy.
        """
        if self.is_open:
            termios.tcflush(self.fd, termios.TCIFLUSH)
        else:
            raise portNotOpenError