Python pty 模块,openpty() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pty.openpty()

项目:deb-python-dcos    作者:openstack    | 项目源码 | 文件源码
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session.  Its stdin is
    the slave side of a freshly allocated pty; stdout and stderr are pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
       of the tty-pair.  It is the responsibility of the caller
       to close the master fd, and to perform any cleanup (including
       waiting for completion) of the Popen object.
    :rtype: (Popen, int)

    """

    import pty
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # The caller never receives the master fd when spawning fails,
        # so it has to be released here to avoid leaking it.
        os.close(master)
        raise
    finally:
        # The parent has no further use for the slave side, whether or
        # not the child was started.
        os.close(slave)

    return (proc, master)
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def test_signal_failure(monkeypatch):
    """Drive UnixConsole.prepare()/restore() while signal.signal is broken.

    A console is built on the slave side of a pty and cycled once normally.
    ``signal.signal`` is then replaced by a stub raising ValueError before a
    second ``prepare()``, and by a stub raising AssertionError before the
    final ``restore()``.  Both pty descriptors are closed afterwards.
    """
    import os
    import pty
    import signal
    from pyrepl.unix_console import UnixConsole

    def raise_value_error(a, b):
        raise ValueError

    def raise_assertion_error(a, b):
        raise AssertionError

    master_fd, slave_fd = pty.openpty()
    try:
        console = UnixConsole(slave_fd, slave_fd)

        # Baseline cycle with the real signal.signal in place.
        console.prepare()
        console.restore()

        # prepare() runs while installing a handler raises ValueError.
        monkeypatch.setattr(signal, 'signal', raise_value_error)
        console.prepare()

        # restore() runs while any call to signal.signal would raise
        # AssertionError.
        monkeypatch.setattr(signal, 'signal', raise_assertion_error)
        console.restore()
    finally:
        os.close(master_fd)
        os.close(slave_fd)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
项目:GroundStation    作者:ClydeSpace-GroundStation    | 项目源码 | 文件源码
def __init__(self):
        """Set up the block, expose a pty as /tmp/kiss_pty and register the 'in' port.

        The block is created with no stream inputs or outputs.  A pty pair is
        opened, the slave device is symlinked to ``/tmp/kiss_pty``, and
        ``self.handle_msg`` is registered as the handler for the ``in``
        message port.
        """
        gr.sync_block.__init__(self,
                               name="hdlc_to_ax25",
                               in_sig=None,
                               out_sig=None)

        # Both descriptors are kept on the instance; they are not closed here.
        (self.pty_master, self.pty_slave) = pty.openpty()
        tty = os.ttyname(self.pty_slave)

        # Replace a stale symlink from a previous run.
        # NOTE(review): only an existing *symlink* is removed; a regular file
        # at this path would make os.symlink() raise -- confirm intended.
        if os.path.islink('/tmp/kiss_pty'):
            os.unlink('/tmp/kiss_pty')
        os.symlink(tty, '/tmp/kiss_pty')

        # Python 2 print statement.
        print 'KISS PTY is: /tmp/kiss_pty (%s)' % os.ttyname(self.pty_slave)

        self.message_port_register_in(pmt.intern('in'))
        self.set_msg_handler(pmt.intern('in'), self.handle_msg)
        # Counters start at zero; they are updated outside this method.
        self.count = 0
        self.dropped = 0
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        """Send TIOCSWINSZ to a pty master with both opcode spellings.

        The ioctl is issued once with a non-negative opcode and once with
        the opcode reinterpreted as a signed C int.  The test passes when
        neither call raises, and is skipped when ``pty`` is falsy.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                # The constant is already negative: use it as the signed
                # form and mask it to 32 bits for the unsigned form.
                # (`0xffffffffL` is a Python 2 long literal.)
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffffL
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                # Round-trip through struct to view the same 32 bits as a
                # signed int.
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            # Four unsigned shorts: 80, 25, 0, 0.
            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            # The returned buffers are not inspected; only the absence of an
            # exception matters.
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        """Check that fcntl.ioctl accepts TIOCSWINSZ as a signed or unsigned opcode.

        A pty pair is opened and the same window-size buffer is sent to the
        master twice, once per opcode form.  Nothing is asserted beyond the
        calls not raising.  Skipped when ``pty`` is falsy.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                # Negative constant: keep it as-is and derive the unsigned
                # 32-bit value by masking (Python 2 long literal).
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffffL
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                # Pack as unsigned int, unpack as signed int.
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            # Always release both ends of the pty.
            os.close(mfd)
            os.close(sfd)
项目:pwndemo    作者:zh-explorer    | 项目源码 | 文件源码
def _handles(stdin, stdout, stderr):
        """Swap a ``PTY`` stdout request for the slave side of a raw pty.

        Returns ``(stdin, stdout, stderr, master)``.  ``master`` is None
        unless ``stdout`` was the ``PTY`` sentinel, in which case ``stdout``
        becomes the slave descriptor and ``master`` the matching master.
        """
        if stdout is not PTY:
            # No pty requested: hand everything back untouched.
            return stdin, stdout, stderr, None

        # Per the original author's note: a pty is used rather than
        # subprocess.PIPE so that printf() and friends in the child do not
        # buffer their output on STDOUT.
        pty_master, pty_slave = pty.openpty()

        # Raw mode on both ends so that data is passed through without the
        # terminal layer interpreting control codes.
        for fd in (pty_master, pty_slave):
            tty.setraw(fd)

        # The slave side goes to the child; the master stays with the caller.
        return stdin, pty_slave, stderr, pty_master
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
项目:needle    作者:mwrlabs    | 项目源码 | 文件源码
def command_background_start(self, cmd):
        """Start ``cmd`` through the shell from a daemon thread.

        The calling thread logs the command, starts the worker thread and
        then sleeps for two seconds before returning.
        """
        self.printer.debug('[LOCAL CMD] Local Background Command: %s' % cmd)

        def _launch(command):
            """Spawn the command with stdout/stderr attached to a new pty."""
            pty_master, pty_slave = pty.openpty()
            child = subprocess.Popen(command, shell=True, stdout=pty_slave,
                                     stderr=pty_slave, close_fds=True)
            # The master side is wrapped in a file object but not read here.
            reader = os.fdopen(pty_master)
            self.printer.info("Monitoring in background...Kill this process when you want to see the dumped content")

        # Hand the launch over to a daemon thread so the caller is not blocked.
        worker = threading.Thread(name='daemon', target=_launch, args=(cmd,))
        worker.setDaemon(True)
        worker.start()
        time.sleep(2)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        """Exercise fcntl.ioctl with TIOCSWINSZ given as unsigned and as signed int.

        Opens a pty pair, sends one window-size buffer to the master with
        each opcode form, and closes the pair.  Skipped when ``pty`` is
        falsy.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                # Mask to 32 bits to get the non-negative form
                # (Python 2 long literal).
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffffL
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                # Reinterpret the unsigned 32-bit value as a signed int.
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            # The results are overwritten and never checked.
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        """Issue TIOCSWINSZ on a pty master using a signed and an unsigned opcode.

        Passes when neither ``fcntl.ioctl`` call raises.  Skipped when
        ``pty`` is falsy.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            # Derive both spellings of the opcode from whichever one termios
            # exposes.
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                # Python 2 long literal used as a 32-bit mask.
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffffL
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            # Close both descriptors even if an ioctl call raised.
            os.close(mfd)
            os.close(sfd)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ioctl_signed_unsigned_code_param(self):
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
项目:ansible-provider-docs    作者:alibaba    | 项目源码 | 文件源码
def _do_it(self, action):
        """Run ``ansible-connection`` and feed it the play context and ``action``.

        The child's stdin is the slave side of a pty; this method writes to
        the master side.  Returns ``(returncode, stdout, stderr)`` of the
        child.
        """
        pty_master, pty_slave = pty.openpty()
        child = subprocess.Popen(["ansible-connection"], stdin=pty_slave,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        # Unbuffered binary writer on the master side.
        writer = os.fdopen(pty_master, 'wb', 0)
        os.close(pty_slave)

        # Per the original author's note: protocol 0 is forced because it
        # works on both py2 and py3 and avoids control characters, which
        # would cause problems on a pty.
        payload = cPickle.dumps(self._play_context.serialize(), protocol=0)
        writer.write(payload)

        # End-of-init marker, then the action, then a blank line.
        writer.write(b'\n#END_INIT#\n')
        writer.write(to_bytes(action))
        writer.write(b'\n\n')

        out, err = child.communicate()
        writer.close()

        return (child.returncode, out, err)
项目:dcos-tunnel    作者:dcos    | 项目源码 | 文件源码
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session.  Its stdin is
    the slave side of a freshly allocated pty; stdout and stderr are pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
       of the tty-pair.  It is the responsibility of the caller
       to close the master fd, and to perform any cleanup (including
       waiting for completion) of the Popen object.
    :rtype: (Popen, int)

    """
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # Nobody else knows about the master fd if spawning failed.
        os.close(master)
        raise
    finally:
        # The parent never uses the slave side itself.
        os.close(slave)

    return (proc, master)
项目:black_zone    作者:zh-explorer    | 项目源码 | 文件源码
def _handles(stdin, stdout, stderr):
        """Resolve the ``PTY`` sentinel for stdout into a real descriptor.

        Returns a 4-tuple ``(stdin, stdout, stderr, master)``.  When
        ``stdout`` is ``PTY`` a pty pair is opened in raw mode, the slave
        replaces ``stdout`` and the master is returned as the fourth item;
        otherwise the arguments pass through and ``master`` is None.
        """
        master = None
        wants_pty = stdout is PTY

        if wants_pty:
            # Per the original author's note: with subprocess.PIPE the
            # child's libc buffers output, which a pty on STDOUT avoids.
            master, stdout = pty.openpty()

            # Raw mode keeps the terminal layer from interpreting control
            # codes on either end.
            tty.setraw(master)
            tty.setraw(stdout)

        return stdin, stdout, stderr, master
项目:deploy-marathon-bluegreen    作者:softonic    | 项目源码 | 文件源码
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session.  Its stdin is
    the slave side of a freshly allocated pty; stdout and stderr are pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
       of the tty-pair.  It is the responsibility of the caller
       to close the master fd, and to perform any cleanup (including
       waiting for completion) of the Popen object.
    :rtype: (Popen, int)

    """
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # On failure the master fd would otherwise be lost and leaked.
        os.close(master)
        raise
    finally:
        # Close the parent's copy of the slave side in every case.
        os.close(slave)

    return (proc, master)
项目:binja_dynamics    作者:nccgroup    | 项目源码 | 文件源码
def __init__(self, message_q):
        """Store the message queue and allocate a pty for the inferior process.

        ``self.tty`` holds the device name of the slave side.
        """
        QThread.__init__(self)
        self.messages = message_q
        # Only the inferior process need use the slave file descriptor
        pty_pair = pty.openpty()
        self.master, self.slave = pty_pair[0], pty_pair[1]
        self.tty = os.ttyname(pty_pair[1])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getPty(self, term, windowSize, modes):
        """Record the terminal request and allocate a pty for it.

        Stores ``term`` in ``self.environ['TERM']``, keeps ``windowSize``
        and ``modes`` on the instance, and sets ``self.ptyTuple`` to
        ``(master_fd, slave_fd, slave_device_name)``.  The slave device
        name is also exported as ``SSH_TTY``.
        """
        self.environ['TERM'] = term
        self.winSize = windowSize
        self.modes = modes

        master_fd, slave_fd = pty.openpty()
        slave_name = os.ttyname(slave_fd)
        self.environ['SSH_TTY'] = slave_name
        self.ptyTuple = (master_fd, slave_fd, slave_name)
项目:pyTeliumManager    作者:Ousret    | 项目源码 | 文件源码
def __init__(self):
        """Allocate a pty, a Faker instance and the (unstarted) device thread."""
        # Both ends of the pty are kept; _s_name is the slave's device name.
        self._master, self._slave = pty.openpty()
        self._s_name = os.ttyname(self._slave)

        self._fake = Faker()

        # The thread is only created here, not started.  `self.__run` is
        # subject to name mangling inside the enclosing class.
        self._fake_device = threading.Thread(target=self.__run)
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def test_readline():
    """readline() on a pty-backed wrapper returns the typed line as bytes.

    ``input\\n`` is written to the master side and the reader, attached to
    the slave side, is expected to return ``b'input'``.
    """
    master, slave = pty.openpty()
    try:
        readline_wrapper = _ReadlineWrapper(slave, slave)
        os.write(master, b'input\n')

        result = readline_wrapper.get_reader().readline()
        assert result == b'input'
        assert isinstance(result, bytes_type)
    finally:
        # Release both descriptors even when an assertion fails, so the
        # test does not leak a pty per run.
        os.close(master)
        os.close(slave)
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def test_readline_returns_unicode():
    """readline(returns_unicode=True) returns the typed line as text.

    ``input\\n`` is written to the master side and the reader, attached to
    the slave side, is expected to return ``'input'`` as ``unicode_type``.
    """
    master, slave = pty.openpty()
    try:
        readline_wrapper = _ReadlineWrapper(slave, slave)
        os.write(master, b'input\n')

        result = readline_wrapper.get_reader().readline(returns_unicode=True)
        assert result == 'input'
        assert isinstance(result, unicode_type)
    finally:
        # Release both descriptors even when an assertion fails, so the
        # test does not leak a pty per run.
        os.close(master)
        os.close(slave)
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def test_raw_input():
    """raw_input() on a pty-backed wrapper returns the typed line as bytes.

    ``input\\n`` is written to the master side; ``raw_input('prompt:')`` on
    the wrapper attached to the slave side is expected to return
    ``b'input'``.
    """
    master, slave = pty.openpty()
    try:
        readline_wrapper = _ReadlineWrapper(slave, slave)
        os.write(master, b'input\n')

        result = readline_wrapper.raw_input('prompt:')
        assert result == b'input'
        assert isinstance(result, bytes_type)
    finally:
        # Release both descriptors even when an assertion fails, so the
        # test does not leak a pty per run.
        os.close(master)
        os.close(slave)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def spawn(self, argv=None, term=None):
        """Start a program (a shell by default) attached to a new pty.

        :param argv: argument list to execute.  When None, ``$SHELL`` is
            used if set; otherwise the first of bash/sh/ksh/zsh/csh/ash
            found on ``$PATH``; otherwise ``/bin/sh``.
        :param term: if not None, stored in ``os.environ['TERM']`` before
            the child is started.

        Sets ``self.slave`` (slave fd), ``self.master`` (unbuffered,
        non-blocking file object on the master fd) and ``self.prog`` (the
        Popen object).
        """
        if argv is None:
            if 'SHELL' in os.environ:
                argv = [os.environ['SHELL']]
            elif 'PATH' in os.environ: #searching sh in the path. It can be unusual like /system/bin/sh on android
                for shell in ["bash","sh","ksh","zsh","csh","ash"]:
                    for path in os.environ['PATH'].split(':'):
                        fullpath=os.path.join(path.strip(),shell)
                        if os.path.isfile(fullpath):
                            argv=[fullpath]
                            break
                    if argv:
                        break
        if not argv:
            argv= ['/bin/sh']

        if term is not None:
            os.environ['TERM']=term

        master, slave = pty.openpty()
        self.slave=slave
        # 'rb+' opens the master for binary read/write.  The previous mode
        # string 'rb+wb' names two modes at once, which Python 3 rejects
        # with ValueError.  Buffering 0 keeps the file unbuffered.
        self.master = os.fdopen(master, 'rb+', 0)
        flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
        assert flags>=0
        # Switch the master side to non-blocking I/O.
        flags = fcntl.fcntl(self.master, fcntl.F_SETFL , flags | os.O_NONBLOCK)
        assert flags>=0
        self.prog = subprocess.Popen(
            shell=False,
            args=argv,
            stdin=slave,
            stdout=slave,
            stderr=subprocess.STDOUT,
            preexec_fn=prepare
        )
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def __init__(self, command=None, return_url=None, restart=False):
        """Create a terminal record backed by a freshly opened pty.

        An 8-character id is generated for the terminal and a
        ``TerminalProcess`` is built from the two pty descriptors.
        """
        self.command = command
        self.terminal_id = prism.generate_random_string(8)
        self.return_url = return_url
        self.restart = restart

        # TerminalProcess receives the master and slave fds, in that order.
        master_fd, slave_fd = pty.openpty()
        self.process = TerminalProcess(master_fd, slave_fd)

        logging.info('Terminal Opened: %s' % self.terminal_id)
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def spawn(self, argv=None, term=None):
        """Start a program (a shell by default) attached to a new pty.

        :param argv: argument list to execute.  When None, ``$SHELL`` is
            used if set; otherwise the first of bash/sh/ksh/zsh/csh/ash
            found on ``$PATH``; otherwise ``/bin/sh``.
        :param term: if not None, stored in ``os.environ['TERM']`` before
            the child is started.

        Sets ``self.slave`` (slave fd), ``self.master`` (unbuffered,
        non-blocking file object on the master fd) and ``self.prog`` (the
        Popen object).
        """
        if argv is None:
            if 'SHELL' in os.environ:
                argv = [os.environ['SHELL']]
            elif 'PATH' in os.environ: #searching sh in the path. It can be unusual like /system/bin/sh on android
                for shell in ["bash","sh","ksh","zsh","csh","ash"]:
                    for path in os.environ['PATH'].split(':'):
                        fullpath=os.path.join(path.strip(),shell)
                        if os.path.isfile(fullpath):
                            argv=[fullpath]
                            break
                    if argv:
                        break
        if not argv:
            argv= ['/bin/sh']

        if term is not None:
            os.environ['TERM']=term

        master, slave = pty.openpty()
        self.slave=slave
        # 'rb+' opens the master for binary read/write.  The previous mode
        # string 'rb+wb' names two modes at once, which Python 3 rejects
        # with ValueError.  Buffering 0 keeps the file unbuffered.
        self.master = os.fdopen(master, 'rb+', 0)
        flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
        assert flags>=0
        # Switch the master side to non-blocking I/O.
        flags = fcntl.fcntl(self.master, fcntl.F_SETFL , flags | os.O_NONBLOCK)
        assert flags>=0
        self.prog = subprocess.Popen(
            shell=False,
            args=argv,
            stdin=slave,
            stdout=slave,
            stderr=subprocess.STDOUT,
            preexec_fn=prepare
        )
项目:binja_dynamics    作者:ehennenfent    | 项目源码 | 文件源码
def __init__(self, message_q):
        """Keep the message queue and open the pty used by the inferior.

        ``self.tty`` is the device name of the slave side.
        """
        QThread.__init__(self)
        self.messages = message_q
        # Only the inferior process need use the slave file descriptor
        fd_master, fd_slave = pty.openpty()
        self.master = fd_master
        self.slave = fd_slave
        self.tty = os.ttyname(fd_slave)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def getPty(self, term, windowSize, modes):
        """Remember the requested terminal settings and open a pty.

        ``self.ptyTuple`` becomes ``(master_fd, slave_fd, slave_name)`` and
        the environment gains ``TERM`` and ``SSH_TTY`` entries.
        """
        self.environ['TERM'] = term
        self.winSize = windowSize
        self.modes = modes

        fds = pty.openpty()
        tty_path = os.ttyname(fds[1])
        self.environ['SSH_TTY'] = tty_path
        self.ptyTuple = (fds[0], fds[1], tty_path)
项目:vbcg    作者:nspi    | 项目源码 | 文件源码
def __init__(self, name):
        """Connect to the serial port ``name``, or to a virtual one.

        If opening ``name`` at 9600 baud raises ``serial.SerialException``,
        a pty pair is created instead and the serial connection is opened
        on its slave device.

        :param name: serial port to open.
        """

        # Try to establish connection to serial port
        try:
            self.serial_connection = serial.Serial(name, 9600)

        # If it fails because there is no device, use virtual serial port
        except serial.SerialException:

            # Create virtual serial port
            self.master, self.slave = pty.openpty()
            self.vPort = os.ttyname(self.slave)

            # Create instance
            self.serial_connection = serial.Serial(self.vPort, 9600)

            # logging.warn is a deprecated alias and is no longer available
            # in recent Python versions; logging.warning is the supported
            # spelling.
            logging.warning("Trigger device not found -> Using virtual device")

        # Reached only when one of the two Serial() calls above succeeded.
        self.serial_connection_established = True

        # Create events
        self.trigger_event = threading.Event()
        self.eventProgramEnd = threading.Event()

        # Store current time
        self.last_trigger_time = time.time()
        self.firstRun = True

        # Call initialization of thread class
        threading.Thread.__init__(self)
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def make_slave_pty():
    """Yield the slave descriptor of a new pty, then close both ends.

    The descriptors are released in a ``finally`` clause, so they are also
    closed when the generator is closed early or an exception is thrown in
    at the ``yield``.
    """
    master_pty, slave_pty = pty.openpty()
    try:
        yield slave_pty
    finally:
        try:
            os.close(slave_pty)
        finally:
            os.close(master_pty)
项目:freedomfighting    作者:JusticeRage    | 项目源码 | 文件源码
def main():
    """Relay data between an established connection and a shell on a pty.

    Returns -1 when ``establish_connection()`` yields None.  Otherwise the
    process is daemonized, ``SHELL`` is started on the slave side of a new
    pty, ``FIRST_COMMAND`` is sent to it, and data is shuttled between the
    connection and the pty master until the shell exits or the peer closes
    the connection.  The connection is always closed on the way out.
    """
    s = establish_connection()
    if s is None:
        return -1
    # Python 2 print statement.
    print success("Connection established!")
    daemonize()

    master, slave = pty.openpty()
    # The shell runs in its own session with all three standard streams on
    # the slave side of the pty.
    bash = subprocess.Popen(SHELL,
                            preexec_fn=os.setsid,
                            stdin=slave,
                            stdout=slave,
                            stderr=slave,
                            universal_newlines=True)
    time.sleep(1)  # Wait for bash to start before sending data to it.
    os.write(master, "%s\n" % FIRST_COMMAND)

    try:
        # Loop for as long as the shell has not exited.
        while bash.poll() is None:
            r, w, e = select.select([s, master], [], [])

            # SSLSockets don't play nice with select because they buffer data internally.
            # Code taken from https://stackoverflow.com/questions/3187565/select-and-ssl-in-python.
            if s in r:
                try:
                    data = s.recv(1024)
                except ssl.SSLError as e:
                    # Not enough data for a full record yet: wait again.
                    if e.errno == ssl.SSL_ERROR_WANT_READ:
                        continue
                    raise
                if not data:  # End of file.
                    break
                # Drain whatever the SSL layer has already buffered so that
                # it is not left unseen by the next select() call.
                data_left = s.pending()
                while data_left:
                    data += s.recv(data_left)
                    data_left = s.pending()
                os.write(master, data)
            elif master in r:
                # Shell output is only forwarded on iterations where the
                # connection itself had nothing to read.
                s.write(os.read(master, 2048))
    finally:
        # Only the connection is closed here; the pty descriptors are not.
        s.close()
        s.close()
项目:makemkv    作者:Robpol86    | 项目源码 | 文件源码
def test_rip_error(request, tmpdir):
    """Check that a rip failing mid-way is reported as an error.

    A copy of the sample ISO is loaded, the container is started, and the
    ISO is truncated once the "Analyzing seamless segments" line shows up.
    The test expects a positive exit status and the "titles failed" error
    line in the output.

    :param request: pytest fixture.
    :param py.path.local tmpdir: pytest fixture.
    """
    # Work on a private copy of the ISO so it can be truncated later.
    sample = py.path.local(__file__).dirpath().join('sample.iso')
    iso = tmpdir.join('truncated.iso')
    sample.copy(iso)

    pytest.cdload(iso)

    # Build the docker command line.
    output = tmpdir.ensure_dir('output')
    volume = '{}:/output'.format(output)
    command = ['docker', 'run', '--device=/dev/cdrom', '-v', volume, '-e', 'DEBUG=true',
               'robpol86/makemkv']

    # The child gets the slave side of a pty as stdin.
    master, slave = pty.openpty()

    def close_pty():
        os.close(master)
        os.close(slave)

    request.addfinalizer(close_pty)
    proc = subprocess.Popen(command, bufsize=1, cwd=HERE, stderr=subprocess.STDOUT, stdin=slave,
                            stdout=subprocess.PIPE)

    # Consume output until the process has exited and nothing is left to read.
    caught = False
    while proc.poll() is None or proc.stdout.peek(1):
        for line in proc.stdout:
            if b'Analyzing seamless segments' in line:
                # Truncate the ISO underneath the running rip.
                iso.open('w').close()
            elif b'ERROR: One or more titles failed.' in line:
                caught = True
            print(line)  # Write to stdout, pytest will print if test fails.

    # Verify.
    assert proc.poll() > 0
    assert caught is True
    pytest.verify_failed_file(output)
项目:yamllint    作者:adrienverge    | 项目源码 | 文件源码
def test_run_colored_output(self):
        """cli.run() emits ANSI-colored output when stdout is a TTY.

        stdout and stderr are temporarily pointed at the slave side of a
        pty; the output is then read back from the master side and
        compared with the expected escape sequences.
        """
        path = os.path.join(self.wd, 'a.yaml')

        # Create a pseudo-TTY and redirect stdout to it
        master, slave = pty.openpty()
        tty_out = os.fdopen(slave, 'w')
        output = os.fdopen(master, 'r')
        saved_stdout, saved_stderr = sys.stdout, sys.stderr
        try:
            sys.stdout = sys.stderr = tty_out
            try:
                with self.assertRaises(SystemExit) as ctx:
                    cli.run((path, ))
                tty_out.flush()
            finally:
                # Put the real streams back so that later tests do not
                # write to a closed file.
                sys.stdout, sys.stderr = saved_stdout, saved_stderr

            self.assertEqual(ctx.exception.code, 1)

            # Read output from TTY.  The status flags must come from
            # F_GETFL (not F_GETFD) to be valid input for F_SETFL.
            flag = fcntl.fcntl(master, fcntl.F_GETFL)
            fcntl.fcntl(master, fcntl.F_SETFL, flag | os.O_NONBLOCK)

            # The slave side is still open at this point, as before.
            out = output.read().replace('\r\n', '\n')
        finally:
            tty_out.close()
            output.close()

        self.assertEqual(out, (
            '\033[4m%s\033[0m\n'
            '  \033[2m2:4\033[0m       \033[31merror\033[0m    '
            'trailing spaces  \033[2m(trailing-spaces)\033[0m\n'
            '  \033[2m3:4\033[0m       \033[31merror\033[0m    '
            'no new line character at the end of file  '
            '\033[2m(new-line-at-end-of-file)\033[0m\n'
            '\n' % path))
项目:TouchUI    作者:harbaum    | 项目源码 | 文件源码
def launch_textmode_app(self, executable, name):
        """Run ``executable`` with its output on a pty shown in a dialog.

        The process's stdout and stderr go to the slave side of a new pty;
        the dialog is handed the process and the master descriptor.  Both
        descriptors are closed once the dialog's ``exec_()`` returns.
        """
        dialog = TextmodeDialog(name, self.w)

        self.app_executable = executable
        pty_master, pty_slave = pty.openpty()
        command = str(executable)
        self.app_process = subprocess.Popen(command, stdout=pty_slave,
                                            stderr=pty_slave)

        # Let the dialog watch the process and the master side.
        dialog.poll(self.app_process, pty_master)
        dialog.exec_()

        os.close(pty_master)
        os.close(pty_slave)
项目:jumper-ble-logger    作者:Jumperr-labs    | 项目源码 | 文件源码
def __init__(self, hci_device_number=0, logger=None, events_config=None):
        """Take over an HCI device and expose a pty-backed virtual HCI in its place.

        The HCI device is brought down with ``hciconfig``, a user-channel
        socket is created for it, and ``hciattach`` is run on the slave
        side of a new pty.

        :param hci_device_number: number of the HCI device to use.
        :param logger: logger to use; defaults to ``logging.getLogger(__name__)``.
        :param events_config: passed to ``EventParser`` as ``config``.
        :raises subprocess.CalledProcessError: if ``hciconfig`` or
            ``hciattach`` exits with a non-zero status.
        """
        self._logger = logger or logging.getLogger(__name__)

        self._event_parser = EventParser(config=events_config, logger=self._logger)
        self._agent_events_sender = AgentEventsSender(logger=self._logger)

        self._hci_device_number = hci_device_number
        # Bring the device down before creating the user-channel socket.
        try:
            subprocess.check_call(['hciconfig', self.hci_device_name, 'down'])
        except subprocess.CalledProcessError:
            self._logger.error('Could not run hciconfig down command for HCI device')
            raise
        self._hci_socket = create_bt_socket_hci_channel_user(hci_device_number)
        self._logger.info('bind to %s complete', self.hci_device_name)

        # The master side stays with this object; the slave's device name is
        # handed to hciattach.
        self._pty_master, pty_slave = pty.openpty()
        # NOTE(review): 'rwb' is not a valid mode string for os.fdopen on
        # Python 3 (ValueError); presumably this targets Python 2 -- confirm.
        self._pty_fd = os.fdopen(self._pty_master, 'rwb')
        hci_tty = os.ttyname(pty_slave)
        self._logger.debug('TTY slave for the virtual HCI: %s', hci_tty)
        try:
            subprocess.check_call(['hciattach', hci_tty, 'any'])
        except subprocess.CalledProcessError:
            self._logger.error('Could not run hciattach on PTY device')
            raise

        # The two endpoints this object reads from.
        self._inputs = [self._pty_fd, self._hci_socket]

        self._pty_buffer = StringIO()  # Used as a seekable stream
        self._gatt_logger = GattLogger(self._logger)
        self._should_stop = False
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getPty(self, term, windowSize, modes):
        """Store the pty request parameters and open the pty itself.

        After the call ``self.ptyTuple`` is ``(master, slave, ttyname)``,
        and ``self.environ`` carries ``TERM`` and ``SSH_TTY``.
        """
        self.environ['TERM'] = term
        self.winSize, self.modes = windowSize, modes

        pty_master, pty_slave = pty.openpty()
        device = os.ttyname(pty_slave)
        self.environ['SSH_TTY'] = device
        self.ptyTuple = (pty_master, pty_slave, device)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def setUp(self):
        """Open a pty pair and keep both descriptors on the test instance."""
        pair = pty.openpty()
        self.master = pair[0]
        self.slave = pair[1]
项目:pentesty_goodness    作者:Inf0Junki3    | 项目源码 | 文件源码
def connect(host, port, disable_encryption = False):
    """Connect to ``host:port`` over SCTP and attach a shell to the connection.

    :param host: address to connect to.
    :param port: port to connect to.
    :param disable_encryption: when true, ``$SHELL`` is started with its
        stdin/stdout bound directly to the socket.  Otherwise the socket
        is wrapped in TLS, a greeting is sent, ``$SHELL`` is started with
        its stdout on a pty, and ``process_connection`` relays between the
        two.

    The socket is closed before returning.
    """
    sock = sctp_socket(family = socket.AF_INET)
    sock.connect((host, port))
    if disable_encryption:
        std_out = sock.sock().makefile("w")
        std_in = sock.sock().makefile()
        shell = Popen(os.environ["SHELL"],
                      stdin     = std_in,
                      stdout    = std_out,
                      shell     = True)

    else:
        ssl_sock = ssl.wrap_socket(sock.sock(),
                                   ssl_version = ssl.PROTOCOL_TLSv1)
        # The greeting belongs to the encrypted branch: ssl_sock only
        # exists here.
        ssl_sock.send("Hi! This is the client. You're connected.\n")

        # NOTE(review): this pipe and its two file objects are not used by
        # anything below -- confirm whether they can be dropped.
        r, w    = os.pipe()
        std_in  = os.fdopen(r, "r")
        std_out = os.fdopen(w, "w")

        #Set our shell up to use pty, and make the output non-blocking.
        master, slave = pty.openpty()
        shell = Popen(os.environ["SHELL"],
                      stdin     = PIPE,
                      stdout    = slave,
                      shell     = True)
        # Read the shell's output through a duplicate of the pty master.
        shell.stdout = os.fdopen(os.dup(master), "r+")
        fd = shell.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        process_connection(ssl_sock, shell.stdin, shell.stdout, disable_encryption)

    sock.close()
项目:pwning-tools    作者:peternguyen93    | 项目源码 | 文件源码
def __start_proc(self):
        """Start ``self.elf_path`` through the shell on a raw pty.

        The child's stdin, stdout and stderr are all the slave side of the
        pty.  ``self.proc.stdin/stdout/stderr`` are replaced with file
        objects on duplicates of the master side, and the stdout duplicate
        is switched to non-blocking mode.  Also sets ``self.pid``.
        """
        # for more information, pls refer :
        # http://rachid.koucha.free.fr/tech_corner/pty_pdip.html
        # open new pty
        master,slave = pty.openpty()
        # set to raw mode
        tty.setraw(master)
        tty.setraw(slave)

        # bind the slave side to the subprocess
        self.proc = Popen(self.elf_path,
            stdin=slave,
            stdout=slave,
            stderr=slave,
            close_fds=True,
            preexec_fn=self.preexec_fn,
            env=self.env,
            shell=True
        )
        self.pid = self.proc.pid
        print('[+] Start pid : %d' % self.pid)
        # bind the master side to file objects controlled by this object;
        # each one wraps its own dup() of the master descriptor.
        # NOTE(review): text-mode "r+" on a pty is rejected by Python 3's
        # io layer (stream not seekable); presumably Python 2 -- confirm.
        self.proc.stdin = os.fdopen(os.dup(master),"r+")
        self.proc.stdout = os.fdopen(os.dup(master),"r+")
        self.proc.stderr = os.fdopen(os.dup(master),"r+")
        # close the descriptors that are no longer needed
        os.close(slave)
        os.close(master)

        # set non-blocking mode
        fd = self.proc.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def test_posix_printengine_tty(self):
                """Run the POSIX print engine in tty mode against a pty slave."""
                captured = six.StringIO()

                def pump(stream):
                        """Copy full 1024-character reads from stream into captured.

                        The first short read is taken to mean EOF and ends
                        the loop; that final short chunk is not recorded.
                        """
                        blocksize = 1024
                        piece = stream.read(blocksize)
                        while len(piece) >= blocksize:
                                print(piece, file=captured)
                                piece = stream.read(blocksize)

                #
                # Steps:
                # - open a pty pair
                # - drain the master side from a helper thread, otherwise
                #   writes to the slave side would block
                # - hand the slave side to the print engine and run it
                #
                (pty_master, pty_slave) = pty.openpty()
                writer = os.fdopen(pty_slave, "w")
                reader = os.fdopen(pty_master, "r")

                drainer = threading.Thread(target=pump, args=(reader,))
                drainer.start()

                printengine.test_posix_printengine(writer, True)
                writer.close()

                drainer.join()
                reader.close()
                output = captured.getvalue()
                self.assertTrue(len(output) > 0)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __t_pty_tracker(self, trackerclass, **kwargs):
                """Drive a progress tracker of type trackerclass through a pty.

                The tracker is built with the pty slave as its output_file
                (plus any extra kwargs) and exercised by
                progress.test_progress_tracker in fast mode.
                """
                def discard(stream):
                        """Read and drop 1024-byte chunks until a short read."""
                        blocksize = 1024
                        while len(stream.read(blocksize)) >= blocksize:
                                pass

                #
                # Steps:
                # - open a pty pair
                # - drain the master side from a helper thread, otherwise
                #   writes to the slave side would block
                # - hand the slave side to the progress tracker and run it
                #
                (pty_master, pty_slave) = pty.openpty()
                writer = os.fdopen(pty_slave, "w")
                reader = os.fdopen(pty_master, "rb")

                drainer = threading.Thread(target=discard, args=(reader,))
                drainer.start()

                tracker = trackerclass(output_file=writer, **kwargs)
                progress.test_progress_tracker(tracker, gofast=True)
                writer.close()

                drainer.join()
                reader.close()
项目:ntf    作者:p4lang    | 项目源码 | 文件源码
def startShell(self, mnopts=None):
        """Start this node as an interactive docker container attached to a pty.

        Any running shell is stopped first via ``self.stop()``.  The container
        is launched with ``docker run -ti --rm --privileged=true`` using
        ``self.name`` as both hostname and container name, with the port
        mappings from ``self.port_map`` and the volume mappings from
        ``self.fs_map``.  The pty master becomes both ``self.stdin`` and
        ``self.stdout``.  ``self.pid`` is finally set to the pid reported by
        ``docker inspect`` for the container.

        :param mnopts: not used by this implementation.
        :raises IndexError: if ``docker inspect`` produced no output.
        :raises ValueError: if its first output line is not an integer.
        """
        self.stop()

        args = ['docker', 'run', '-ti', '--rm', '--privileged=true']
        args.extend(['--hostname=' + self.name, '--name=' + self.name])
        args.extend(['-e', 'DISPLAY'])
        args.extend(['-v', '/tmp/.X11-unix:/tmp/.X11-unix:ro'])
        print(self.port_map)
        if self.port_map is not None:
            for p in self.port_map:
                args.extend(['-p', '%d:%d' % (p[0], p[1])])
        if self.fs_map is not None:
            for f in self.fs_map:
                args.extend(['-v', '%s:%s' % (f[0], f[1])])
        args.extend([self.docker_image])

        master, slave = pty.openpty()
        self.shell = subprocess.Popen(
            args,
            stdin=slave,
            stdout=slave,
            stderr=slave,
            close_fds=True,
            preexec_fn=os.setpgrp)
        # The child holds its own copy of the slave end; ours is not needed.
        os.close(slave)
        ttyobj = os.fdopen(master, 'rw')
        self.stdin = ttyobj
        self.stdout = ttyobj
        self.pid = self.shell.pid
        self.pollOut = select.poll()
        self.pollOut.register(self.stdout)
        self.outToNode[self.stdout.fileno()] = self
        self.inToNode[self.stdin.fileno()] = self
        self.execed = False
        self.lastCmd = None
        self.lastPid = None
        self.readbuf = ''
        self.waiting = False

        # Wait for prompt
        time.sleep(1)

        pid_cmd = ['docker', 'inspect', '--format=\'{{ .State.Pid }}\'',
                   self.name]
        pidp = subprocess.Popen(
            pid_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            close_fds=False)
        # communicate() drains stdout while waiting, so a full pipe cannot
        # stall the child the way wait()-then-read can, and it closes the
        # pipes when done.
        ps_out = pidp.communicate()[0].splitlines()
        # The format argument is passed without a shell, so its single quotes
        # are literal and docker may echo them around the pid; accept both
        # the quoted and the bare form.
        self.pid = int(ps_out[0].strip().strip("'"))
        self.cmd('export PS1=\"\\177\"; printf "\\177"')
        self.cmd('stty -echo; set +m')
项目:makemkv    作者:Robpol86    | 项目源码 | 文件源码
def run(command=None, args=None, output=None, image_id=None, environ=None, cwd=None):
    """Run a command and return the output. Supports string and py.path paths.

    When `command` is None a ``docker run`` invocation of the makemkv image is
    built. `args` are always inserted just before the last element of the
    command. For ``bash`` and ``docker`` commands stdin is connected to the
    slave end of a pseudo-terminal; other commands inherit stdin.

    :raise CalledProcessError: Command exits non-zero.
    :raise TimeoutExpired: Command ran longer than 120 s (docker) or 30 s (other).

    :param iter command: Command to run.
    :param iter args: List of command line arguments to insert to command.
    :param str output: Path to bind mount to /output when `command` is None.
    :param str image_id: Docker image to use instead of robpol86/makemkv.
    :param dict environ: Environment variables to set/override in the command.
    :param str cwd: Current working directory. Default is tests directory.

    :return: Command stdout and stderr output, as captured bytes.
    :rtype: tuple
    """
    if command is None:
        command = ['docker', 'run', '--device=/dev/cdrom', '-e', 'DEBUG=true', image_id or 'robpol86/makemkv']
        if output:
            command = command[:-1] + ['-v', '{}:/output'.format(output)] + command[-1:]
    if args:
        command = command[:-1] + list(args) + command[-1:]

    env = os.environ.copy()
    if environ:
        env.update(environ)

    # Simulate stdin pty so 'docker run -it' doesn't complain. None (rather
    # than 0) marks "no pty" so a genuine descriptor 0 is never mistaken for
    # the sentinel and left unattached/unclosed.
    master = slave = None
    if command[0] in ('bash', 'docker'):
        master, slave = pty.openpty()

    # Docker invocations get a longer allowance.
    timeout = 120 if command[0] in ('docker',) else 30

    # Run command.
    try:
        result = subprocess.run(
            [str(i) for i in command], check=True, cwd=cwd or HERE, env=env,
            stderr=subprocess.PIPE, stdin=slave, stdout=subprocess.PIPE,
            timeout=timeout
        )
    finally:
        # Release the pty (if one was opened) whether or not the command succeeded.
        for fd in (slave, master):
            if fd is not None:
                os.close(fd)

    return result.stdout, result.stderr
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def runShell(script, env=None, cwd="/", user=None, base64_encode=False):
    """Run a shell script with its output attached to a pty and collect it.

    The script is executed with ``bash -c`` when `user` is empty or equals
    the current user, otherwise through ``su <user> -c ... -s /bin/bash``.
    stdout and stderr of the child both go to the pty slave; everything read
    from the master is returned.

    :param script: script text (base64 encoded when `base64_encode` is true).
    :param env: environment passed to Popen.
    :param cwd: working directory passed to Popen.
    :param user: account to run the script as.
    :param base64_encode: decode `script` from base64 before running it.
    :return: ``(status, output)``; ``(0, "")`` for an empty script and
        ``(1, message)`` when preparing or starting the process fails.
    """
    import errno
    import pwd
    import pty

    if not script:
        return 0, ""

    master_fd = slave_fd = None
    try:
        # base64 decode
        if base64_encode:
            decode_script = base64.decodestring(script)
        else:
            decode_script = script

        # check user
        logger.info(decode_script)
        cur_user = pwd.getpwuid(os.getuid())[0]
        if not user or user == cur_user:
            shell_list = ["bash", "-c", decode_script.encode('utf8')]
        else:
            shell_list = ["su",  user, "-c", decode_script.encode('utf8'), "-s", "/bin/bash"]
        master_fd, slave_fd = pty.openpty()
        proc = Popen(shell_list, shell=False, universal_newlines=True, bufsize=1,
                     stdout=slave_fd, stderr=STDOUT, env=env, cwd=cwd, close_fds=True)
    except Exception as e:
        logger.error(e)
        # Do not leak the pty when the process could not be started.
        for fd in (slave_fd, master_fd):
            if fd is not None:
                os.close(fd)
        return (1, str(e))

    timeout = .1
    chunks = []
    try:
        while True:
            try:
                ready, _, _ = gs.select([master_fd], [], [], timeout)
            except gs.error as ex:
                # Interrupted by a signal: just retry the select.
                if ex.args[0] == errno.EINTR:
                    continue
                raise
            if ready:
                data = os.read(master_fd, 512)
                if not data:
                    break
                chunks.append(data)
            elif proc.poll() is not None:
                # Nothing left to read and the child has exited.
                break
    finally:
        # Release both pty ends even when select()/read() raised.
        os.close(slave_fd)
        os.close(master_fd)
    proc.wait()
    return (proc.returncode, ''.join(chunks))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, executable, args, environment, path, proto,
                 uid=None, gid=None, usePTY=None):
        """
        Spawn an operating-system process attached to a pseudo-terminal.

        The fork/exec work is delegated to C{self._fork}; afterwards the
        parent keeps only the master end of the PTY, makes it non-blocking,
        starts reading from it and connects C{proto} to this transport.

        If C{usePTY} is a tuple or list it must unpack into three items, the
        first two being the master and slave descriptors to use; those are
        left open if the fork fails.  Otherwise a new pair is obtained from
        C{pty.openpty} and closed again on failure.

        C{uid} and C{gid} are passed through to C{self._fork}.
        """
        ptySupplied = isinstance(usePTY, (tuple, list))
        if pty is None and not ptySupplied:
            # no pty module and we didn't get a pty to use
            raise NotImplementedError(
                "cannot use PTYProcess on platforms without the pty module.")
        abstract.FileDescriptor.__init__(self, reactor)
        _BaseProcess.__init__(self, proto)

        if ptySupplied:
            masterfd, slavefd, _unused = usePTY
        else:
            masterfd, slavefd = pty.openpty()

        try:
            self._fork(path, uid, gid, executable, args, environment,
                       masterfd=masterfd, slavefd=slavefd)
        except:
            # Only descriptors opened here are ours to clean up.
            if not ptySupplied:
                os.close(masterfd)
                os.close(slavefd)
            raise

        # From here on we are in the parent: the slave end belongs to the
        # child alone.
        os.close(slavefd)
        fdesc.setNonBlocking(masterfd)
        self.fd = masterfd
        self.startReading()
        self.connected = 1
        self.status = -1
        try:
            self.proto.makeConnection(self)
        except:
            # A failing protocol is logged but does not abort the spawn.
            log.err()
        registerReapProcessHandler(self.pid, self)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _setupChild(self, masterfd, slavefd):
        """
        Set up child process after C{fork()} but before C{exec()}.

        This involves:

            - closing C{masterfd}, since it is not used in the subprocess

            - creating a new session with C{os.setsid}

            - changing the controlling terminal of the process (and the new
              session) to point at C{slavefd}

            - duplicating C{slavefd} to standard input, output, and error

            - closing all other open file descriptors (according to
              L{_listOpenFDs})

            - re-setting all signal handlers to C{SIG_DFL}

        @param masterfd: The master end of a PTY file descriptors opened with
            C{openpty}.
        @type masterfd: L{int}

        @param slavefd: The slave end of a PTY opened with C{openpty}.
        @type slavefd: L{int}
        """
        # The master end stays with the parent only.
        os.close(masterfd)
        # New session first, then claim the slave as its controlling terminal.
        os.setsid()
        fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')

        # Drop the inherited standard descriptors, taking care not to close
        # the slave itself should it already occupy one of slots 0-2.
        for fd in range(3):
            if fd != slavefd:
                os.close(fd)

        os.dup2(slavefd, 0) # stdin
        os.dup2(slavefd, 1) # stdout
        os.dup2(slavefd, 2) # stderr

        # Close everything above the standard descriptors (this includes the
        # original slavefd when it is > 2); failures to close are ignored.
        for fd in _listOpenFDs():
            if fd > 2:
                try:
                    os.close(fd)
                except:
                    pass

        self._resetSignalDisposition()
项目:ci    作者:ros2    | 项目源码 | 文件源码
def _execute_process_pty(cmd, cwd, env, shell, stderr_to_stdout=True):
    """Start cmd with its standard streams on pseudo-terminals.

    One pty is always opened; its slave end becomes the child's stdin.  When
    stderr_to_stdout is true that same pty also receives the child's output,
    otherwise a second pty is opened for it.  The master descriptor(s) are
    handed to _yield_data together with the Popen object, and whatever
    _yield_data returns is returned from here.

    :param cmd: command passed to Popen.
    :param cwd: working directory passed to Popen.
    :param env: environment passed to Popen.
    :param shell: shell flag passed to Popen.
    :param stderr_to_stdout: share a single pty instead of opening a second
        one for the "stderr" side.
    :raises OSError: any Popen failure other than 'Text file busy', which is
        retried.
    """
    stdout_master, stdout_slave = None, None
    stderr_master, stderr_slave = None, None
    # NOTE(review): this list is built while all four names are still None
    # and is never updated below, so it holds four None values for the rest
    # of the function -- both in the finally clause and when passed to
    # _yield_data.  Whether the real descriptors were meant to be tracked
    # here depends on _close_fds/_yield_data, which are not visible; confirm.
    fds_to_close = [stdout_master, stdout_slave, stderr_master, stderr_slave]
    try:
        stdout_master, stdout_slave = pty.openpty()
        if stderr_to_stdout:
            stderr_master, stderr_slave = stdout_master, stdout_slave
        else:
            stderr_master, stderr_slave = pty.openpty()

        p = None
        while p is None:
            try:
                # NOTE(review): the child's stdout is attached to
                # stderr_slave and its stderr is merged into that same
                # stream (stderr=STDOUT); stdout_slave only serves as stdin.
                # With stderr_to_stdout=False this means no child output is
                # written to the stdout pty -- confirm this is intended.
                p = Popen(
                    cmd,
                    stdin=stdout_slave, stdout=stderr_slave, stderr=STDOUT,
                    cwd=cwd, env=env, shell=shell, close_fds=False)
            except OSError as exc:
                # This can happen if a file you are trying to execute is being
                # written to simultaneously on Linux
                # (doesn't appear to happen on OS X)
                # It seems like the best strategy is to just try again later
                # Worst case is that the file eventually gets deleted, then a
                # different OSError would occur.
                if 'Text file busy' in '{0}'.format(exc):
                    # This is a transient error, try again shortly
                    time.sleep(0.01)
                    continue
                raise
        # This causes the below select to exit when the subprocess closes.
        # On Linux, this sometimes causes Errno 5 OSError's when os.read
        # is called from within _yield_data, so on Linux _yield_data
        # catches and passes on that particular OSError.
        os.close(stdout_slave)
        if not stderr_to_stdout:
            os.close(stderr_slave)

        # Per-descriptor buffer of bytes, keyed by master descriptor (a
        # single key when both masters are the same descriptor).
        left_overs = {stdout_master: b'', stderr_master: b''}

        # Master descriptors to hand over, without duplicates.
        fds = [stdout_master]
        if stderr_master != stdout_master:
            fds.append(stderr_master)
    finally:
        # Make sure we don't leak file descriptors
        # (runs on success as well as on failure; see the note on
        # fds_to_close above for what is actually passed here)
        _close_fds(fds_to_close)

    # The linesep with pty's always seems to be "\r\n", even on OS X
    return _yield_data(p, fds, left_overs, "\r\n", fds_to_close)
项目:containernet    作者:containernet    | 项目源码 | 文件源码
def startShell( self, mnopts=None ):
        "Launch the node's interactive bash shell on a private pty"
        if self.shell:
            error( "%s: shell is already running\n" % self.name )
            return
        # mnexec flags: (c)lose descriptors, (d)etach from tty,
        # (p)rint pid, and run in (n)amespace
        if mnopts is None:
            opts = '-cd'
        else:
            opts = mnopts
        if self.inNamespace:
            opts += 'n'
        # The prompt is set to a sentinel character so the end of command
        # output can be recognised.
        sentinel = chr( 127 )
        # bash -i forces interactive mode; -s passes $* to the shell and
        # makes the process easy to find in ps
        cmd = [ 'mnexec', opts, 'env', 'PS1=' + sentinel,
                'bash', '--norc', '-is', 'mininet:' + self.name ]
        # Running the shell in a pseudo-tty disables buffering in the
        # subprocess and insulates it from signals (e.g. SIGINT) received
        # by the parent
        ptyMaster, ptySlave = pty.openpty()
        self.shell = self._popen( cmd, stdin=ptySlave, stdout=ptySlave,
                                  stderr=ptySlave, close_fds=False )
        # One file object on the master serves as both stdin and stdout
        ttyFile = os.fdopen( ptyMaster, 'rw' )
        self.stdin = ttyFile
        self.stdout = ttyFile
        self.pid = self.shell.pid
        self.pollOut = select.poll()
        self.pollOut.register( self.stdout )
        # Map file descriptors back to this node; this is what allows
        # several nodes to be monitored with select.poll()
        self.outToNode[ self.stdout.fileno() ] = self
        self.inToNode[ self.stdin.fileno() ] = self
        self.execed = False
        self.lastCmd = None
        self.lastPid = None
        self.readbuf = ''
        # Keep reading until the output ends with the prompt sentinel
        while self.read( 1024 )[ -1 ] != sentinel:
            self.pollOut.poll()
        self.waiting = False
        # +m: disable job control notification
        self.cmd( 'unset HISTFILE; stty -echo; set +m' )
项目:containernet    作者:containernet    | 项目源码 | 文件源码
def startShell( self, *args, **kwargs ):
        "Launch an interactive bash shell inside the node's container via docker exec"
        if self.shell:
            error( "%s: shell is already running\n" % self.name )
            return
        # The prompt is set to a sentinel character so the end of command
        # output can be recognised.
        sentinel = chr( 127 )
        # The target container is named '<dnameprefix>.<name>'
        container = '%s.%s' % ( self.dnameprefix, self.name )
        # bash -i forces interactive mode; -s passes $* to the shell and
        # makes the process easy to find in ps
        cmd = [ 'docker', 'exec', '-it', container, 'env', 'PS1=' + sentinel,
                'bash', '--norc', '-is', 'mininet:' + self.name ]
        # Running the shell in a pseudo-tty disables buffering in the
        # subprocess and insulates it from signals (e.g. SIGINT) received
        # by the parent
        ptyMaster, ptySlave = pty.openpty()
        self.shell = self._popen( cmd, stdin=ptySlave, stdout=ptySlave,
                                  stderr=ptySlave, close_fds=False )
        # One file object on the master serves as both stdin and stdout
        ttyFile = os.fdopen( ptyMaster, 'rw' )
        self.stdin = ttyFile
        self.stdout = ttyFile
        # The pid comes from self._get_pid() rather than from the
        # docker exec client process
        self.pid = self._get_pid()
        self.pollOut = select.poll()
        self.pollOut.register( self.stdout )
        # Map file descriptors back to this node; this is what allows
        # several nodes to be monitored with select.poll()
        self.outToNode[ self.stdout.fileno() ] = self
        self.inToNode[ self.stdin.fileno() ] = self
        self.execed = False
        self.lastCmd = None
        self.lastPid = None
        self.readbuf = ''
        # Keep reading until the output ends with the prompt sentinel
        while self.read( 1024 )[ -1 ] != sentinel:
            self.pollOut.poll()
        self.waiting = False
        # +m: disable job control notification
        self.cmd( 'unset HISTFILE; stty -echo; set +m' )