Python pty 模块,STDIN_FILENO 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pty.STDIN_FILENO

项目:jd4    作者:vijos    | 项目源码 | 文件源码
def do_execute(self, stdin_file, stdout_file, stderr_file, cgroup_file):
        pid = fork()
        if not pid:
            chdir('/in/package')
            if stdin_file:
                fd = os_open(stdin_file, O_RDONLY)
                dup2(fd, STDIN_FILENO)
                os_close(fd)
            if stdout_file:
                fd = os_open(stdout_file, O_WRONLY)
                dup2(fd, STDOUT_FILENO)
                os_close(fd)
            if stderr_file:
                fd = os_open(stderr_file, O_WRONLY)
                dup2(fd, STDERR_FILENO)
                os_close(fd)
            if cgroup_file:
                enter_cgroup(cgroup_file)
            execve(self.execute_file, self.execute_args, SPAWN_ENV)
        return wait_and_reap_zombies(pid)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:remote-docker    作者:phizaz    | 项目源码 | 文件源码
def _copy(self):
        '''
        Main select loop. Passes all data to self.master_read() or self.stdin_read().
        '''
        assert self.master_fd is not None
        master_fd = self.master_fd
        while True:
            rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], [])

            anything = False

            if master_fd in rfds:
                anything |= self._has_child_response()
            if pty.STDIN_FILENO in rfds:
                anything |= self._has_user_input()

            if not anything:
                break
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _copy(self):
        """Main select loop. Passes all data to self.master_read() or self.stdin_read().
        """
        assert self.master_fd is not None
        master_fd = self.master_fd
        bufsize = self.bufsize
        while True:
            try:
                rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], [])
            except OSError as e:
                if e.errno == 4:  # Interrupted system call. 
                    continue      # This happens at terminal resize.
            if master_fd in rfds:
                data = os.read(master_fd, bufsize)
                self.write_stdout(data)
            if pty.STDIN_FILENO in rfds:
                data = os.read(pty.STDIN_FILENO, bufsize)
                self.write_stdin(data)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        os.close(masters[1])
        socketpair[1].close()
        os.close(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
项目:sardana    作者:sardana-org    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
        """This is used by the interact() method.
        """

        while self.isalive():
            r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                data = self.__interact_read(self.child_fd)
                if output_filter:
                    data = output_filter(data)
                if self.logfile is not None:
                    self.logfile.write(data)
                    self.logfile.flush()
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    self.__interact_writen(self.child_fd, data)
                    break
                self.__interact_writen(self.child_fd, data)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                self._log(data, 'read')
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = -1
                if escape_character is not None:
                    i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    if data:
                        self._log(data, 'send')
                    self.__interact_writen(self.child_fd, data)
                    break
                self._log(data, 'send')
                self.__interact_writen(self.child_fd, data)
项目:jd4    作者:vijos    | 项目源码 | 文件源码
def do_build(self, output_file, cgroup_file):
        pid = fork()
        if not pid:
            chdir('/out')
            os_close(STDIN_FILENO)
            if output_file:
                fd = os_open(output_file, O_WRONLY)
                dup2(fd, STDOUT_FILENO)
                dup2(fd, STDERR_FILENO)
                os_close(fd)
            if cgroup_file:
                enter_cgroup(cgroup_file)
            execve(self.compiler_file, self.compiler_args, SPAWN_ENV)
        return wait_and_reap_zombies(pid)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                self._log(data, 'read')
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = -1
                if escape_character is not None:
                    i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    if data:
                        self._log(data, 'send')
                    self.__interact_writen(self.child_fd, data)
                    break
                self._log(data, 'send')
                self.__interact_writen(self.child_fd, data)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for fd in self.fds:
            try:
                os.close(fd)
            except:
                pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for fd in self.fds:
            try:
                os.close(fd)
            except:
                pass
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for fd in self.fds:
            try:
                os.close(fd)
            except:
                pass
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.files = []
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for file in self.files:
            try:
                file.close()
            except OSError:
                pass
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                pass
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:remote-docker    作者:phizaz    | 项目源码 | 文件源码
def _init_tty(self):
        try:
            self.mode = tty.tcgetattr(pty.STDIN_FILENO)
            tty.setraw(pty.STDIN_FILENO)  # this seems to change the behavior of the stdout
        except tty.error:
            pass
项目:remote-docker    作者:phizaz    | 项目源码 | 文件源码
def _del_tty(self):
        if self.mode:
            # restore the tty mode for stdout
            tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, self.mode)
项目:remote-docker    作者:phizaz    | 项目源码 | 文件源码
def _read_stdin(self, bufsize):
        return os.read(pty.STDIN_FILENO, bufsize)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            os.dup2(child_fd, self.STDIN_FILENO)
            os.dup2(child_fd, self.STDOUT_FILENO)
            os.dup2(child_fd, self.STDERR_FILENO)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                if self.logfile is not None:
                    self.logfile.write(data)
                    self.logfile.flush()
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    self.__interact_writen(self.child_fd, data)
                    break
                self.__interact_writen(self.child_fd, data)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def interact(self, escape_character=chr(29),
            input_filter=None, output_filter=None):

        '''This gives control of the child process to the interactive user (the
        human at the keyboard). Keystrokes are sent to the child process, and
        the stdout and stderr output of the child process is printed. This
        simply echos the child stdout and child stderr to the real stdout and
        it echos the real stdin to the child stdin. When the user types the
        escape_character this method will return None. The escape_character
        will not be transmitted.  The default for escape_character is
        entered as ``Ctrl - ]``, the very same as BSD telnet. To prevent
        escaping, escape_character may be set to None.

        If a logfile is specified, then the data sent and received from the
        child process in interact mode is duplicated to the given log.

        You may pass in optional input and output filter functions. These
        functions should take a string and return a string. The output_filter
        will be passed all the output from the child process. The input_filter
        will be passed all the keyboard input from the user. The input_filter
        is run BEFORE the check for the escape_character.

        Note that if you change the window size of the parent the SIGWINCH
        signal will not be passed through to the child. If you want the child
        window size to change when the parent's window size changes then do
        something like the following example::

            import pexpect, struct, fcntl, termios, signal, sys
            def sigwinch_passthrough (sig, data):
                s = struct.pack("HHHH", 0, 0, 0, 0)
                a = struct.unpack('hhhh', fcntl.ioctl(sys.stdout.fileno(),
                    termios.TIOCGWINSZ , s))
                global p
                p.setwinsize(a[0],a[1])
            # Note this 'p' global and used in sigwinch_passthrough.
            p = pexpect.spawn('/bin/bash')
            signal.signal(signal.SIGWINCH, sigwinch_passthrough)
            p.interact()
        '''

        # Flush the buffer.
        self.write_to_stdout(self.buffer)
        self.stdout.flush()
        self.buffer = self.string_type()
        mode = tty.tcgetattr(self.STDIN_FILENO)
        tty.setraw(self.STDIN_FILENO)
        if escape_character is not None and PY3:
            escape_character = escape_character.encode('latin-1')
        try:
            self.__interact_copy(escape_character, input_filter, output_filter)
        finally:
            tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                self._log(data, 'read')
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = -1
                if escape_character is not None:
                    i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    if data:
                        self._log(data, 'send')
                    self.__interact_writen(self.child_fd, data)
                    break
                self._log(data, 'send')
                self.__interact_writen(self.child_fd, data)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for fd in self.fds:
            try:
                os.close(fd)
            except:
                pass
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:pipenv    作者:pypa    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                self._log(data, 'read')
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = -1
                if escape_character is not None:
                    i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    if data:
                        self._log(data, 'send')
                    self.__interact_writen(self.child_fd, data)
                    break
                self._log(data, 'send')
                self.__interact_writen(self.child_fd, data)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.files = []
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for file in self.files:
            try:
                file.close()
            except OSError:
                pass
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for fd in self.fds:
            try:
                os.close(fd)
            except:
                pass
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = socket.socketpair()
        masters = [s.fileno() for s in socketpair]
        self.fds.extend(masters)

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:ssh-tunnel    作者:aalku    | 项目源码 | 文件源码
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            os.dup2(child_fd, self.STDIN_FILENO)
            os.dup2(child_fd, self.STDOUT_FILENO)
            os.dup2(child_fd, self.STDERR_FILENO)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
项目:ssh-tunnel    作者:aalku    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                if self.logfile is not None:
                    self.logfile.write(data)
                    self.logfile.flush()
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    self.__interact_writen(self.child_fd, data)
                    break
                self.__interact_writen(self.child_fd, data)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.files = []
        self.select_rfds_lengths = []
        self.select_rfds_results = []
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        for file in self.files:
            try:
                file.close()
            except OSError:
                pass
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''This is used by the interact() method.
        '''

        while self.isalive():
            r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
            if self.child_fd in r:
                try:
                    data = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] == errno.EIO:
                        # Linux-style EOF
                        break
                    raise
                if data == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    data = output_filter(data)
                self._log(data, 'read')
                os.write(self.STDOUT_FILENO, data)
            if self.STDIN_FILENO in r:
                data = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    data = input_filter(data)
                i = -1
                if escape_character is not None:
                    i = data.rfind(escape_character)
                if i != -1:
                    data = data[:i]
                    if data:
                        self._log(data, 'send')
                    self.__interact_writen(self.child_fd, data)
                    break
                self._log(data, 'send')
                self.__interact_writen(self.child_fd, data)