Python psutil 模块,Popen() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用psutil.Popen()

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_wait_non_children(self):
        # test wait() against processes which are not our children
        code = "import sys;"
        code += "from subprocess import Popen, PIPE;"
        code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
        code += "sp = Popen(cmd, stdout=PIPE);"
        code += "sys.stdout.write(str(sp.pid));"
        sproc = get_test_subprocess([PYTHON, "-c", code],
                                    stdout=subprocess.PIPE)
        grandson_pid = int(sproc.stdout.read())
        grandson_proc = psutil.Process(grandson_pid)
        try:
            self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
            grandson_proc.kill()
            ret = grandson_proc.wait()
            self.assertEqual(ret, None)
        finally:
            reap_children(recursive=True)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_children_recursive(self):
        # here we create a subprocess which creates another one as in:
        # A (parent) -> B (child) -> C (grandchild)
        s = "import subprocess, os, sys, time;"
        s += "PYTHON = os.path.realpath(sys.executable);"
        s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
        s += "subprocess.Popen(cmd);"
        s += "time.sleep(60);"
        get_test_subprocess(cmd=[PYTHON, "-c", s])
        p = psutil.Process()
        self.assertEqual(len(p.children(recursive=False)), 1)
        # give the grandchild some time to start
        stop_at = time.time() + GLOBAL_TIMEOUT
        while time.time() < stop_at:
            children = p.children(recursive=True)
            if len(children) > 1:
                break
        self.assertEqual(len(children), 2)
        self.assertEqual(children[0].ppid(), os.getpid())
        self.assertEqual(children[1].ppid(), children[0].pid)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_Popen(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get propertly freed.
        # Not sure what to do though.
        cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
        proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
        try:
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            self.assertRaises(AttributeError, getattr, proc, 'foo')
        finally:
            proc.kill()
            proc.wait()
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def execute(cmd, stderr_to_stdout=False, stdin=None):
    """Execute a command in the shell and return a tuple (rc, stdout, stderr)"""
    if stderr_to_stdout:
        stderr = STDOUT
    else:
        stderr = PIPE

    if stdin is None:
        _stdin = None
    else:
        _stdin = PIPE

    p = Popen(cmd, close_fds=True, stdin=_stdin, stdout=PIPE, stderr=stderr, preexec_fn=os.setsid)
    stdout, stderr = p.communicate(input=stdin)

    return p.returncode, stdout, stderr
项目:futuquant    作者:FutunnOpen    | 项目源码 | 文件源码
def _fun_thread_daemon_watch(self):
        time_sleep = 5

        if self._close:
            return

        while True:

            '''check api work'''
            is_api_ok = self._is_api_socket_ok()
            if is_api_ok is True:
                time.sleep(time_sleep)
                continue

            self._loop_kill_futunn()

            '''start new ftnn.exe process'''
            process_new = psutil.Popen([self._exe_path, "type=python_auto"])
            if process_new is not None:
                print("FTApiDaemon new futnn process open ! pid={}".format(process_new.pid))
            else:
                print("FTApiDaemon open process fail ! ")

            time.sleep(time_sleep)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_wait_non_children(self):
        # test wait() against processes which are not our children
        code = "import sys;"
        code += "from subprocess import Popen, PIPE;"
        code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
        code += "sp = Popen(cmd, stdout=PIPE);"
        code += "sys.stdout.write(str(sp.pid));"
        sproc = get_test_subprocess([PYTHON, "-c", code],
                                    stdout=subprocess.PIPE)
        grandson_pid = int(sproc.stdout.read())
        grandson_proc = psutil.Process(grandson_pid)
        try:
            self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
            grandson_proc.kill()
            ret = grandson_proc.wait()
            self.assertEqual(ret, None)
        finally:
            reap_children(recursive=True)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_children_recursive(self):
        # here we create a subprocess which creates another one as in:
        # A (parent) -> B (child) -> C (grandchild)
        s = "import subprocess, os, sys, time;"
        s += "PYTHON = os.path.realpath(sys.executable);"
        s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
        s += "subprocess.Popen(cmd);"
        s += "time.sleep(60);"
        get_test_subprocess(cmd=[PYTHON, "-c", s])
        p = psutil.Process()
        self.assertEqual(len(p.children(recursive=False)), 1)
        # give the grandchild some time to start
        stop_at = time.time() + GLOBAL_TIMEOUT
        while time.time() < stop_at:
            children = p.children(recursive=True)
            if len(children) > 1:
                break
        self.assertEqual(len(children), 2)
        self.assertEqual(children[0].ppid(), os.getpid())
        self.assertEqual(children[1].ppid(), children[0].pid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_Popen(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get propertly freed.
        # Not sure what to do though.
        cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
        proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
        try:
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            self.assertRaises(AttributeError, getattr, proc, 'foo')
        finally:
            proc.kill()
            proc.wait()
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_wait_non_children(self):
        # test wait() against processes which are not our children
        code = "import sys;"
        code += "from subprocess import Popen, PIPE;"
        code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
        code += "sp = Popen(cmd, stdout=PIPE);"
        code += "sys.stdout.write(str(sp.pid));"
        sproc = get_test_subprocess([PYTHON, "-c", code],
                                    stdout=subprocess.PIPE)
        grandson_pid = int(sproc.stdout.read())
        grandson_proc = psutil.Process(grandson_pid)
        try:
            self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
            grandson_proc.kill()
            ret = grandson_proc.wait()
            self.assertEqual(ret, None)
        finally:
            reap_children(recursive=True)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_children_recursive(self):
        # here we create a subprocess which creates another one as in:
        # A (parent) -> B (child) -> C (grandchild)
        s = "import subprocess, os, sys, time;"
        s += "PYTHON = os.path.realpath(sys.executable);"
        s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
        s += "subprocess.Popen(cmd);"
        s += "time.sleep(60);"
        get_test_subprocess(cmd=[PYTHON, "-c", s])
        p = psutil.Process()
        self.assertEqual(len(p.children(recursive=False)), 1)
        # give the grandchild some time to start
        stop_at = time.time() + GLOBAL_TIMEOUT
        while time.time() < stop_at:
            children = p.children(recursive=True)
            if len(children) > 1:
                break
        self.assertEqual(len(children), 2)
        self.assertEqual(children[0].ppid(), os.getpid())
        self.assertEqual(children[1].ppid(), children[0].pid)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_Popen(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get propertly freed.
        # Not sure what to do though.
        cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
        proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
        try:
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            self.assertRaises(AttributeError, getattr, proc, 'foo')
        finally:
            proc.kill()
            proc.wait()
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def start_hoplite_server(port_num):
    proc = psutil.Popen(
        '{} -c "import hoplite.main; hoplite.main.server_main([\'--port={}\'])"'.format(sys.executable, port_num)
    )
    wait_for_hoplite('localhost', port_num)
    return proc
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def start_hoplite_server(port_num):
    proc = psutil.Popen(
        '{} -c "import hoplite.main; hoplite.main.server_main([\'--port={}\'])"'.format(sys.executable, port_num),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT
    )
    wait_for_hoplite('localhost', port_num)
    return proc
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def start_hoplite_server(port_num):
    proc = psutil.Popen(
        '{} -c "import hoplite.main; hoplite.main.server_main([\'--port={}\'])"'.format(sys.executable, port_num)
    )
    wait_for_hoplite('localhost', port_num)
    return proc
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_exe(self):
        sproc = get_test_subprocess()
        exe = psutil.Process(sproc.pid).exe()
        try:
            self.assertEqual(exe, PYTHON)
        except AssertionError:
            if WINDOWS and len(exe) == len(PYTHON):
                # on Windows we don't care about case sensitivity
                normcase = os.path.normcase
                self.assertEqual(normcase(exe), normcase(PYTHON))
            else:
                # certain platforms such as BSD are more accurate returning:
                # "/usr/local/bin/python2.7"
                # ...instead of:
                # "/usr/local/bin/python"
                # We do not want to consider this difference in accuracy
                # an error.
                ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
                try:
                    self.assertEqual(exe.replace(ver, ''),
                                     PYTHON.replace(ver, ''))
                except AssertionError:
                    # Tipically OSX. Really not sure what to do here.
                    pass

        subp = subprocess.Popen([exe, '-c', 'import os; print("hey")'],
                                stdout=subprocess.PIPE)
        out, _ = subp.communicate()
        self.assertEqual(out.strip(), b'hey')
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_Popen_ctx_manager(self):
        with psutil.Popen([PYTHON, "-V"],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          stdin=subprocess.PIPE) as proc:
            pass
        assert proc.stdout.closed
        assert proc.stderr.closed
        assert proc.stdin.closed
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def _execute(cmd, stdin=None):
    """Run command and return output"""
    logger.warn('Running command (panel): %s', cmd)  # Warning level because we want to see this in logs
    proc = Popen(cmd, bufsize=0, close_fds=True, stdout=PIPE, stderr=PIPE, stdin=PIPE)
    stdout, stderr = proc.communicate(input=stdin)

    return {
        'returncode': proc.returncode,
        'stdout': stdout,
        'stderr': stderr,
    }


# noinspection PyUnusedLocal
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def start(self, parent):
        self._set_node_uuid()
        super(FastDaemon, self).start(parent)
        self.vm_status_queue = Queue()
        self.vm_status_watcher = Popen(self.SYSEVENT, bufsize=0, close_fds=True, stdout=PIPE, stderr=STDOUT,
                                       preexec_fn=os.setsid)
        self.vm_status_monitor_thread = Thread(target=self._vm_status_monitor, name='VMStatusMonitor',
                                               args=(self.vm_status_watcher.stdout,))
        self.vm_status_monitor_thread.daemon = True
        self.vm_status_monitor_thread.start()
        self.vm_status_dispatcher_thread = Thread(target=self._vm_status_dispatcher, name='VMStatusDispatcher')
        self.vm_status_dispatcher_thread.daemon = True
        self.vm_status_dispatcher_thread.start()
项目:futuquant    作者:FutunnOpen    | 项目源码 | 文件源码
def _fun_thread_daemon_restart(self):
        if self._close:
            return

        while True:
            if self._is_api_socket_ok() is False:
                process_new = psutil.Popen([self._exe_path, "type=python_auto"])
                if process_new is not None:
                    print("FTApiDaemon new futnn process open ! pid={}".format(process_new.pid))
                else:
                    print("FTApiDaemon open process fail ! ")

            time.sleep(self._time_restart)
            self._loop_kill_futunn()
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def RunCommandEx(self, cwd, cmd, cmdargs):
        try:
            cmdline = "{0} {1}".format(cmd, cmdargs)    
            p = psutil.Popen(cmdline, close_fds=True, cwd=cwd, creationflags=subprocess.CREATE_NEW_CONSOLE)
            return p
        except Exception: 
            return None
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def RunCommandAsEx(self, username, password, cwd, cmd, cmdargs):
        try:
            cmdline = "PsExec -u {0} -p {1} {2} {3}".format(username, password, cwd, cmd, cmdargs)
            p = psutil.Popen(cmdline, close_fds=True, cwd=cwd, creationflags=subprocess.CREATE_NEW_CONSOLE)
            return p
        except Exception: 
            return None
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def DumpThreadStacks(pid):
    sddir = os.path.dirname(sys.argv[0])
    p = psutil.Popen([os.path.join(sddir, "stackdump.exe"), str(pid)], 
                     stdout=subprocess.PIPE, 
                     stderr=subprocess.PIPE)

    # wait for the process to terminate
    out = p.communicate()[0]

    return out
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_exe(self):
        sproc = get_test_subprocess()
        exe = psutil.Process(sproc.pid).exe()
        try:
            self.assertEqual(exe, PYTHON)
        except AssertionError:
            if WINDOWS and len(exe) == len(PYTHON):
                # on Windows we don't care about case sensitivity
                normcase = os.path.normcase
                self.assertEqual(normcase(exe), normcase(PYTHON))
            else:
                # certain platforms such as BSD are more accurate returning:
                # "/usr/local/bin/python2.7"
                # ...instead of:
                # "/usr/local/bin/python"
                # We do not want to consider this difference in accuracy
                # an error.
                ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
                try:
                    self.assertEqual(exe.replace(ver, ''),
                                     PYTHON.replace(ver, ''))
                except AssertionError:
                    # Tipically OSX. Really not sure what to do here.
                    pass

        subp = subprocess.Popen([exe, '-c', 'import os; print("hey")'],
                                stdout=subprocess.PIPE)
        out, _ = subp.communicate()
        self.assertEqual(out.strip(), b'hey')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_Popen_ctx_manager(self):
        with psutil.Popen([PYTHON, "-V"],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          stdin=subprocess.PIPE) as proc:
            pass
        assert proc.stdout.closed
        assert proc.stderr.closed
        assert proc.stdin.closed
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_exe(self):
        sproc = get_test_subprocess()
        exe = psutil.Process(sproc.pid).exe()
        try:
            self.assertEqual(exe, PYTHON)
        except AssertionError:
            if WINDOWS and len(exe) == len(PYTHON):
                # on Windows we don't care about case sensitivity
                normcase = os.path.normcase
                self.assertEqual(normcase(exe), normcase(PYTHON))
            else:
                # certain platforms such as BSD are more accurate returning:
                # "/usr/local/bin/python2.7"
                # ...instead of:
                # "/usr/local/bin/python"
                # We do not want to consider this difference in accuracy
                # an error.
                ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
                try:
                    self.assertEqual(exe.replace(ver, ''),
                                     PYTHON.replace(ver, ''))
                except AssertionError:
                    # Tipically OSX. Really not sure what to do here.
                    pass

        subp = subprocess.Popen([exe, '-c', 'import os; print("hey")'],
                                stdout=subprocess.PIPE)
        out, _ = subp.communicate()
        self.assertEqual(out.strip(), b'hey')
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_Popen_ctx_manager(self):
        with psutil.Popen([PYTHON, "-V"],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          stdin=subprocess.PIPE) as proc:
            pass
        assert proc.stdout.closed
        assert proc.stderr.closed
        assert proc.stdin.closed
项目:RL4Data    作者:fyabc    | 项目源码 | 文件源码
def main():
    process_size = 4

    for episode in range(5):
        ret_values = [None for _ in range(process_size)]
        results = [None for _ in range(process_size)]

        envs = [os.environ.copy() for _ in range(process_size)]

        for i, env in enumerate(envs):
            env[str('THEANO_FLAGS')] = str('device=cpu,floatX=float{}'.format(32 if i % 2 == 0 else 16))

        time_before = time.time()
        pool = [
            psutil.Popen(
                ['python', 'slave.py'],
                stdout=subprocess.PIPE,
                env=envs[i],
            )
            for i in range(process_size)
        ]

        # time_after = time.time()

        # Roll polling
        while any(e is None for e in ret_values):
            for i, process in enumerate(pool):
                ret_values[i] = process.poll()
            time.sleep(1.0)

        time_after = time.time()

        for i, process in enumerate(pool):
            if ret_values[i] == 0:
                results[i], _ = process.communicate()

        print('Time: {:.6}s'.format(time_after - time_before))
        print(*results, sep='')
项目:kickoff-player    作者:jonian    | 项目源码 | 文件源码
def run_command(args, **kwargs):
  kwargs  = merge_dicts({ 'stdout': subprocess.PIPE }, kwargs)
  command = Popen(args, **kwargs)

  return command
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def run_script(script, *args):
    s = os.path.join(os.path.dirname(__file__), '../../scripts/' + script)
    command = [executable, s]
    command.extend(args)

    with Popen([executable, s]) as p:
        sleep(4)
        p.send_signal(SIGINT)
        p.wait(timeout=1)
        assert p.poll() == 0, 'script failed'