Python psutil 模块,POSIX 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.POSIX

项目:cli    作者:sparkl    | 项目源码 | 文件源码
def get_default_session():
    """
    Locates the first ancestor process which is a shell. Returns
    its pid, or None if not found.
    """
    if psutil.POSIX:
        def predicate(name):
            return name.endswith("sh")

    elif psutil.WINDOWS:
        def predicate(name):
            return name in ("cmd.exe", "powershell.exe")

    else:
        return None

    proc = psutil.Process()
    while proc.parent().pid:
        proc = proc.parent()
        if predicate(proc.name()):
            return proc.pid

    return None
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_wait_timeout_0(self):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
        p.kill()
        stop_at = time.time() + 2
        while True:
            try:
                code = p.wait(0)
            except psutil.TimeoutExpired:
                if time.time() >= stop_at:
                    raise
            else:
                break
        if POSIX:
            self.assertEqual(code, signal.SIGKILL)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_username(self):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        if POSIX:
            import pwd
            self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
            with mock.patch("psutil.pwd.getpwuid",
                            side_effect=KeyError) as fun:
                p.username() == str(p.uids().real)
                assert fun.called

        elif WINDOWS and 'USERNAME' in os.environ:
            expected_username = os.environ['USERNAME']
            expected_domain = os.environ['USERDOMAIN']
            domain, username = p.username().split('\\')
            self.assertEqual(domain, expected_domain)
            self.assertEqual(username, expected_username)
        else:
            p.username()
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_if_addrs_mac_null_bytes(self):
        # Simulate that the underlying C function returns an incomplete
        # MAC address. psutil is supposed to fill it with null bytes.
        # https://github.com/giampaolo/psutil/issues/786
        if POSIX:
            ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)]
        else:
            ret = [('em1', -1, '06-3d-29', None, None, None)]
        with mock.patch('psutil._psplatform.net_if_addrs',
                        return_value=ret) as m:
            addr = psutil.net_if_addrs()['em1'][0]
            assert m.called
            if POSIX:
                self.assertEqual(addr.address, '06:3d:29:00:00:00')
            else:
                self.assertEqual(addr.address, '06-3d-29-00-00-00')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_wait_timeout_0(self):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
        p.kill()
        stop_at = time.time() + 2
        while True:
            try:
                code = p.wait(0)
            except psutil.TimeoutExpired:
                if time.time() >= stop_at:
                    raise
            else:
                break
        if POSIX:
            self.assertEqual(code, -signal.SIGKILL)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_username(self):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        if POSIX:
            import pwd
            self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
            with mock.patch("psutil.pwd.getpwuid",
                            side_effect=KeyError) as fun:
                p.username() == str(p.uids().real)
                assert fun.called

        elif WINDOWS and 'USERNAME' in os.environ:
            expected_username = os.environ['USERNAME']
            expected_domain = os.environ['USERDOMAIN']
            domain, username = p.username().split('\\')
            self.assertEqual(domain, expected_domain)
            self.assertEqual(username, expected_username)
        else:
            p.username()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_if_addrs_mac_null_bytes(self):
        # Simulate that the underlying C function returns an incomplete
        # MAC address. psutil is supposed to fill it with null bytes.
        # https://github.com/giampaolo/psutil/issues/786
        if POSIX:
            ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)]
        else:
            ret = [('em1', -1, '06-3d-29', None, None, None)]
        with mock.patch('psutil._psplatform.net_if_addrs',
                        return_value=ret) as m:
            addr = psutil.net_if_addrs()['em1'][0]
            assert m.called
            if POSIX:
                self.assertEqual(addr.address, '06:3d:29:00:00:00')
            else:
                self.assertEqual(addr.address, '06-3d-29-00-00-00')
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_wait_timeout_0(self):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
        p.kill()
        stop_at = time.time() + 2
        while True:
            try:
                code = p.wait(0)
            except psutil.TimeoutExpired:
                if time.time() >= stop_at:
                    raise
            else:
                break
        if POSIX:
            self.assertEqual(code, -signal.SIGKILL)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_username(self):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        if POSIX:
            import pwd
            self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
            with mock.patch("psutil.pwd.getpwuid",
                            side_effect=KeyError) as fun:
                p.username() == str(p.uids().real)
                assert fun.called

        elif WINDOWS and 'USERNAME' in os.environ:
            expected_username = os.environ['USERNAME']
            expected_domain = os.environ['USERDOMAIN']
            domain, username = p.username().split('\\')
            self.assertEqual(domain, expected_domain)
            self.assertEqual(username, expected_username)
        else:
            p.username()
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_if_addrs_mac_null_bytes(self):
        # Simulate that the underlying C function returns an incomplete
        # MAC address. psutil is supposed to fill it with null bytes.
        # https://github.com/giampaolo/psutil/issues/786
        if POSIX:
            ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)]
        else:
            ret = [('em1', -1, '06-3d-29', None, None, None)]
        with mock.patch('psutil._psplatform.net_if_addrs',
                        return_value=ret) as m:
            addr = psutil.net_if_addrs()['em1'][0]
            assert m.called
            if POSIX:
                self.assertEqual(addr.address, '06:3d:29:00:00:00')
            else:
                self.assertEqual(addr.address, '06-3d-29-00-00-00')
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, signal.SIGKILL)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, signal.SIGTERM)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_memory_maps(self):
        p = psutil.Process()
        maps = p.memory_maps()
        paths = [x for x in maps]
        self.assertEqual(len(paths), len(set(paths)))
        ext_maps = p.memory_maps(grouped=False)

        for nt in maps:
            if not nt.path.startswith('['):
                assert os.path.isabs(nt.path), nt.path
                if POSIX:
                    try:
                        assert os.path.exists(nt.path) or \
                            os.path.islink(nt.path), nt.path
                    except AssertionError:
                        if not LINUX:
                            raise
                        else:
                            # https://github.com/giampaolo/psutil/issues/759
                            with open('/proc/self/smaps') as f:
                                data = f.read()
                            if "%s (deleted)" % nt.path not in data:
                                raise
                else:
                    # XXX - On Windows we have this strange behavior with
                    # 64 bit dlls: they are visible via explorer but cannot
                    # be accessed via os.stat() (wtf?).
                    if '64' not in os.path.basename(nt.path):
                        assert os.path.exists(nt.path), nt.path
        for nt in ext_maps:
            for fname in nt._fields:
                value = getattr(nt, fname)
                if fname == 'path':
                    continue
                elif fname in ('addr', 'perms'):
                    assert value, value
                else:
                    self.assertIsInstance(value, (int, long))
                    assert value >= 0, value
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_0(self):
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def setUp(self):
        if POSIX:
            import pwd
            import grp
            users = pwd.getpwall()
            groups = grp.getgrall()
            self.all_uids = set([x.pw_uid for x in users])
            self.all_usernames = set([x.pw_name for x in users])
            self.all_gids = set([x.gr_gid for x in groups])
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def exe(self, ret, proc):
        if not ret:
            self.assertEqual(ret, '')
        else:
            assert os.path.isabs(ret), ret
            # Note: os.stat() may return False even if the file is there
            # hence we skip the test, see:
            # http://stackoverflow.com/questions/3112546/os-path-exists-lies
            if POSIX and os.path.isfile(ret):
                if hasattr(os, 'access') and hasattr(os, "X_OK"):
                    # XXX may fail on OSX
                    self.assertTrue(os.access(ret, os.X_OK))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def memory_info(self, ret, proc):
        for name in ret._fields:
            self.assertGreaterEqual(getattr(ret, name), 0)
        if POSIX and ret.vms != 0:
            # VMS is always supposed to be the highest
            for name in ret._fields:
                if name != 'vms':
                    value = getattr(ret, name)
                    assert ret.vms > value, ret
        elif WINDOWS:
            assert ret.peak_wset >= ret.wset, ret
            assert ret.peak_paged_pool >= ret.paged_pool, ret
            assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
            assert ret.peak_pagefile >= ret.pagefile, ret
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def nice(self, ret, proc):
        if POSIX:
            assert -20 <= ret <= 20, ret
        else:
            priorities = [getattr(psutil, x) for x in dir(psutil)
                          if x.endswith('_PRIORITY_CLASS')]
            self.assertIn(ret, priorities)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def create_exe(outpath, c_code=None):
    assert not os.path.exists(outpath), outpath
    if which("gcc"):
        if c_code is None:
            c_code = textwrap.dedent(
                """
                #include <unistd.h>
                int main() {
                    pause();
                    return 1;
                }
                """)
        with tempfile.NamedTemporaryFile(
                suffix='.c', delete=False, mode='wt') as f:
            f.write(c_code)
        try:
            subprocess.check_call(["gcc", f.name, "-o", outpath])
        finally:
            safe_rmpath(f.name)
    else:
        # fallback - use python's executable
        shutil.copyfile(sys.executable, outpath)
        if POSIX:
            st = os.stat(outpath)
            os.chmod(outpath, st.st_mode | stat.S_IEXEC)


# ===================================================================
# --- testing
# ===================================================================
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGKILL)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGTERM)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_memory_maps(self):
        p = psutil.Process()
        maps = p.memory_maps()
        paths = [x for x in maps]
        self.assertEqual(len(paths), len(set(paths)))
        ext_maps = p.memory_maps(grouped=False)

        for nt in maps:
            if not nt.path.startswith('['):
                assert os.path.isabs(nt.path), nt.path
                if POSIX:
                    try:
                        assert os.path.exists(nt.path) or \
                            os.path.islink(nt.path), nt.path
                    except AssertionError:
                        if not LINUX:
                            raise
                        else:
                            # https://github.com/giampaolo/psutil/issues/759
                            with open('/proc/self/smaps') as f:
                                data = f.read()
                            if "%s (deleted)" % nt.path not in data:
                                raise
                else:
                    # XXX - On Windows we have this strange behavior with
                    # 64 bit dlls: they are visible via explorer but cannot
                    # be accessed via os.stat() (wtf?).
                    if '64' not in os.path.basename(nt.path):
                        assert os.path.exists(nt.path), nt.path
        for nt in ext_maps:
            for fname in nt._fields:
                value = getattr(nt, fname)
                if fname == 'path':
                    continue
                elif fname in ('addr', 'perms'):
                    assert value, value
                else:
                    self.assertIsInstance(value, (int, long))
                    assert value >= 0, value
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_pid_0(self):
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        if POSIX:
            import pwd
            import grp
            users = pwd.getpwall()
            groups = grp.getgrall()
            self.all_uids = set([x.pw_uid for x in users])
            self.all_usernames = set([x.pw_name for x in users])
            self.all_gids = set([x.gr_gid for x in groups])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def exe(self, ret, proc):
        if not ret:
            self.assertEqual(ret, '')
        else:
            assert os.path.isabs(ret), ret
            # Note: os.stat() may return False even if the file is there
            # hence we skip the test, see:
            # http://stackoverflow.com/questions/3112546/os-path-exists-lies
            if POSIX and os.path.isfile(ret):
                if hasattr(os, 'access') and hasattr(os, "X_OK"):
                    # XXX may fail on OSX
                    self.assertTrue(os.access(ret, os.X_OK))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def memory_info(self, ret, proc):
        for name in ret._fields:
            self.assertGreaterEqual(getattr(ret, name), 0)
        if POSIX and ret.vms != 0:
            # VMS is always supposed to be the highest
            for name in ret._fields:
                if name != 'vms':
                    value = getattr(ret, name)
                    assert ret.vms > value, ret
        elif WINDOWS:
            assert ret.peak_wset >= ret.wset, ret
            assert ret.peak_paged_pool >= ret.paged_pool, ret
            assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
            assert ret.peak_pagefile >= ret.pagefile, ret
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def nice(self, ret, proc):
        if POSIX:
            assert -20 <= ret <= 20, ret
        else:
            priorities = [getattr(psutil, x) for x in dir(psutil)
                          if x.endswith('_PRIORITY_CLASS')]
            self.assertIn(ret, priorities)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def create_exe(outpath, c_code=None):
    """Creates an executable file in the given location."""
    assert not os.path.exists(outpath), outpath
    if which("gcc"):
        if c_code is None:
            c_code = textwrap.dedent(
                """
                #include <unistd.h>
                int main() {
                    pause();
                    return 1;
                }
                """)
        with tempfile.NamedTemporaryFile(
                suffix='.c', delete=False, mode='wt') as f:
            f.write(c_code)
        try:
            subprocess.check_call(["gcc", f.name, "-o", outpath])
        finally:
            safe_rmpath(f.name)
    else:
        # fallback - use python's executable
        if c_code is not None:
            raise ValueError(
                "can't specify c_code arg as gcc is not installed")
        shutil.copyfile(sys.executable, outpath)
        if POSIX:
            st = os.stat(outpath)
            os.chmod(outpath, st.st_mode | stat.S_IEXEC)


# ===================================================================
# --- testing
# ===================================================================
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGKILL)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGTERM)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_memory_maps(self):
        p = psutil.Process()
        maps = p.memory_maps()
        paths = [x for x in maps]
        self.assertEqual(len(paths), len(set(paths)))
        ext_maps = p.memory_maps(grouped=False)

        for nt in maps:
            if not nt.path.startswith('['):
                assert os.path.isabs(nt.path), nt.path
                if POSIX:
                    try:
                        assert os.path.exists(nt.path) or \
                            os.path.islink(nt.path), nt.path
                    except AssertionError:
                        if not LINUX:
                            raise
                        else:
                            # https://github.com/giampaolo/psutil/issues/759
                            with open('/proc/self/smaps') as f:
                                data = f.read()
                            if "%s (deleted)" % nt.path not in data:
                                raise
                else:
                    # XXX - On Windows we have this strange behavior with
                    # 64 bit dlls: they are visible via explorer but cannot
                    # be accessed via os.stat() (wtf?).
                    if '64' not in os.path.basename(nt.path):
                        assert os.path.exists(nt.path), nt.path
        for nt in ext_maps:
            for fname in nt._fields:
                value = getattr(nt, fname)
                if fname == 'path':
                    continue
                elif fname in ('addr', 'perms'):
                    assert value, value
                else:
                    self.assertIsInstance(value, (int, long))
                    assert value >= 0, value
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_pid_0(self):
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def setUp(self):
        if POSIX:
            import pwd
            import grp
            users = pwd.getpwall()
            groups = grp.getgrall()
            self.all_uids = set([x.pw_uid for x in users])
            self.all_usernames = set([x.pw_name for x in users])
            self.all_gids = set([x.gr_gid for x in groups])
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def exe(self, ret, proc):
        if not ret:
            self.assertEqual(ret, '')
        else:
            assert os.path.isabs(ret), ret
            # Note: os.stat() may return False even if the file is there
            # hence we skip the test, see:
            # http://stackoverflow.com/questions/3112546/os-path-exists-lies
            if POSIX and os.path.isfile(ret):
                if hasattr(os, 'access') and hasattr(os, "X_OK"):
                    # XXX may fail on OSX
                    self.assertTrue(os.access(ret, os.X_OK))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def memory_info(self, ret, proc):
        for name in ret._fields:
            self.assertGreaterEqual(getattr(ret, name), 0)
        if POSIX and ret.vms != 0:
            # VMS is always supposed to be the highest
            for name in ret._fields:
                if name != 'vms':
                    value = getattr(ret, name)
                    assert ret.vms > value, ret
        elif WINDOWS:
            assert ret.peak_wset >= ret.wset, ret
            assert ret.peak_paged_pool >= ret.paged_pool, ret
            assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
            assert ret.peak_pagefile >= ret.pagefile, ret
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def nice(self, ret, proc):
        if POSIX:
            assert -20 <= ret <= 20, ret
        else:
            priorities = [getattr(psutil, x) for x in dir(psutil)
                          if x.endswith('_PRIORITY_CLASS')]
            self.assertIn(ret, priorities)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def create_exe(outpath, c_code=None):
    """Creates an executable file in the given location."""
    assert not os.path.exists(outpath), outpath
    if which("gcc"):
        if c_code is None:
            c_code = textwrap.dedent(
                """
                #include <unistd.h>
                int main() {
                    pause();
                    return 1;
                }
                """)
        with tempfile.NamedTemporaryFile(
                suffix='.c', delete=False, mode='wt') as f:
            f.write(c_code)
        try:
            subprocess.check_call(["gcc", f.name, "-o", outpath])
        finally:
            safe_rmpath(f.name)
    else:
        # fallback - use python's executable
        if c_code is not None:
            raise ValueError(
                "can't specify c_code arg as gcc is not installed")
        shutil.copyfile(sys.executable, outpath)
        if POSIX:
            st = os.stat(outpath)
            os.chmod(outpath, st.st_mode | stat.S_IEXEC)


# ===================================================================
# --- testing
# ===================================================================
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def print_(a, b):
    if sys.stdout.isatty() and psutil.POSIX:
        fmt = '\x1b[1;32m%-13s\x1b[0m %s' % (a, b)
    else:
        fmt = '%-11s %s' % (a, b)
    print(fmt)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_wait(self):
        # check exit code signal
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.kill()
        code = p.wait()
        if POSIX:
            self.assertEqual(code, signal.SIGKILL)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.terminate()
        code = p.wait()
        if POSIX:
            self.assertEqual(code, signal.SIGTERM)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

        # check sys.exit() code
        code = "import time, sys; time.sleep(0.01); sys.exit(5);"
        sproc = get_test_subprocess([PYTHON, "-c", code])
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.wait(), 5)
        self.assertFalse(p.is_running())

        # Test wait() issued twice.
        # It is not supposed to raise NSP when the process is gone.
        # On UNIX this should return None, on Windows it should keep
        # returning the exit code.
        sproc = get_test_subprocess([PYTHON, "-c", code])
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.wait(), 5)
        self.assertIn(p.wait(), (5, None))

        # test timeout
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.name()
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)

        # timeout < 0 not allowed
        self.assertRaises(ValueError, p.wait, -1)

    # XXX why is this skipped on Windows?
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_partitions(self):
        # all = False
        ls = psutil.disk_partitions(all=False)
        # on travis we get:
        #     self.assertEqual(p.cpu_affinity(), [n])
        # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
        self.assertTrue(ls, msg=ls)
        for disk in ls:
            if WINDOWS and 'cdrom' in disk.opts:
                continue
            if not POSIX:
                assert os.path.exists(disk.device), disk
            else:
                # we cannot make any assumption about this, see:
                # http://goo.gl/p9c43
                disk.device
            if SUNOS:
                # on solaris apparently mount points can also be files
                assert os.path.exists(disk.mountpoint), disk
            else:
                assert os.path.isdir(disk.mountpoint), disk
            assert disk.fstype, disk
            self.assertIsInstance(disk.opts, str)

        # all = True
        ls = psutil.disk_partitions(all=True)
        self.assertTrue(ls, msg=ls)
        for disk in psutil.disk_partitions(all=True):
            if not WINDOWS:
                try:
                    os.stat(disk.mountpoint)
                except OSError as err:
                    if TRAVIS and OSX and err.errno == errno.EIO:
                        continue
                    # http://mail.python.org/pipermail/python-dev/
                    #     2012-June/120787.html
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise
                else:
                    if SUNOS:
                        # on solaris apparently mount points can also be files
                        assert os.path.exists(disk.mountpoint), disk
                    else:
                        assert os.path.isdir(disk.mountpoint), disk
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

        def find_mount_point(path):
            path = os.path.abspath(path)
            while not os.path.ismount(path):
                path = os.path.dirname(path)
            return path.lower()

        mount = find_mount_point(__file__)
        mounts = [x.mountpoint.lower() for x in
                  psutil.disk_partitions(all=True)]
        self.assertIn(mount, mounts)
        psutil.disk_usage(mount)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_wait(self):
        # check exit code signal
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.kill()
        code = p.wait()
        if POSIX:
            self.assertEqual(code, -signal.SIGKILL)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.terminate()
        code = p.wait()
        if POSIX:
            self.assertEqual(code, -signal.SIGTERM)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

        # check sys.exit() code
        code = "import time, sys; time.sleep(0.01); sys.exit(5);"
        sproc = get_test_subprocess([PYTHON, "-c", code])
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.wait(), 5)
        self.assertFalse(p.is_running())

        # Test wait() issued twice.
        # It is not supposed to raise NSP when the process is gone.
        # On UNIX this should return None, on Windows it should keep
        # returning the exit code.
        sproc = get_test_subprocess([PYTHON, "-c", code])
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.wait(), 5)
        self.assertIn(p.wait(), (5, None))

        # test timeout
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.name()
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)

        # timeout < 0 not allowed
        self.assertRaises(ValueError, p.wait, -1)

    # XXX why is this skipped on Windows?
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_partitions(self):
        # all = False
        ls = psutil.disk_partitions(all=False)
        # on travis we get:
        #     self.assertEqual(p.cpu_affinity(), [n])
        # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
        self.assertTrue(ls, msg=ls)
        for disk in ls:
            self.assertIsInstance(disk.device, (str, unicode))
            self.assertIsInstance(disk.mountpoint, (str, unicode))
            self.assertIsInstance(disk.fstype, (str, unicode))
            self.assertIsInstance(disk.opts, (str, unicode))
            if WINDOWS and 'cdrom' in disk.opts:
                continue
            if not POSIX:
                assert os.path.exists(disk.device), disk
            else:
                # we cannot make any assumption about this, see:
                # http://goo.gl/p9c43
                disk.device
            if SUNOS:
                # on solaris apparently mount points can also be files
                assert os.path.exists(disk.mountpoint), disk
            else:
                assert os.path.isdir(disk.mountpoint), disk
            assert disk.fstype, disk

        # all = True
        ls = psutil.disk_partitions(all=True)
        self.assertTrue(ls, msg=ls)
        for disk in psutil.disk_partitions(all=True):
            if not WINDOWS:
                try:
                    os.stat(disk.mountpoint)
                except OSError as err:
                    if TRAVIS and OSX and err.errno == errno.EIO:
                        continue
                    # http://mail.python.org/pipermail/python-dev/
                    #     2012-June/120787.html
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise
                else:
                    if SUNOS:
                        # on solaris apparently mount points can also be files
                        assert os.path.exists(disk.mountpoint), disk
                    else:
                        assert os.path.isdir(disk.mountpoint), disk
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

        def find_mount_point(path):
            path = os.path.abspath(path)
            while not os.path.ismount(path):
                path = os.path.dirname(path)
            return path.lower()

        mount = find_mount_point(__file__)
        mounts = [x.mountpoint.lower() for x in
                  psutil.disk_partitions(all=True)]
        self.assertIn(mount, mounts)
        psutil.disk_usage(mount)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_wait(self):
        # check exit code signal
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.kill()
        code = p.wait()
        if POSIX:
            self.assertEqual(code, -signal.SIGKILL)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.terminate()
        code = p.wait()
        if POSIX:
            self.assertEqual(code, -signal.SIGTERM)
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

        # check sys.exit() code
        code = "import time, sys; time.sleep(0.01); sys.exit(5);"
        sproc = get_test_subprocess([PYTHON, "-c", code])
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.wait(), 5)
        self.assertFalse(p.is_running())

        # Test wait() issued twice.
        # It is not supposed to raise NSP when the process is gone.
        # On UNIX this should return None, on Windows it should keep
        # returning the exit code.
        sproc = get_test_subprocess([PYTHON, "-c", code])
        p = psutil.Process(sproc.pid)
        self.assertEqual(p.wait(), 5)
        self.assertIn(p.wait(), (5, None))

        # test timeout
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.name()
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)

        # timeout < 0 not allowed
        self.assertRaises(ValueError, p.wait, -1)

    # XXX why is this skipped on Windows?
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_partitions(self):
        # all = False
        ls = psutil.disk_partitions(all=False)
        # on travis we get:
        #     self.assertEqual(p.cpu_affinity(), [n])
        # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
        self.assertTrue(ls, msg=ls)
        for disk in ls:
            self.assertIsInstance(disk.device, (str, unicode))
            self.assertIsInstance(disk.mountpoint, (str, unicode))
            self.assertIsInstance(disk.fstype, (str, unicode))
            self.assertIsInstance(disk.opts, (str, unicode))
            if WINDOWS and 'cdrom' in disk.opts:
                continue
            if not POSIX:
                assert os.path.exists(disk.device), disk
            else:
                # we cannot make any assumption about this, see:
                # http://goo.gl/p9c43
                disk.device
            if SUNOS:
                # on solaris apparently mount points can also be files
                assert os.path.exists(disk.mountpoint), disk
            else:
                assert os.path.isdir(disk.mountpoint), disk
            assert disk.fstype, disk

        # all = True
        ls = psutil.disk_partitions(all=True)
        self.assertTrue(ls, msg=ls)
        for disk in psutil.disk_partitions(all=True):
            if not WINDOWS:
                try:
                    os.stat(disk.mountpoint)
                except OSError as err:
                    if TRAVIS and OSX and err.errno == errno.EIO:
                        continue
                    # http://mail.python.org/pipermail/python-dev/
                    #     2012-June/120787.html
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise
                else:
                    if SUNOS:
                        # on solaris apparently mount points can also be files
                        assert os.path.exists(disk.mountpoint), disk
                    else:
                        assert os.path.isdir(disk.mountpoint), disk
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

        def find_mount_point(path):
            path = os.path.abspath(path)
            while not os.path.ismount(path):
                path = os.path.dirname(path)
            return path.lower()

        mount = find_mount_point(__file__)
        mounts = [x.mountpoint.lower() for x in
                  psutil.disk_partitions(all=True)]
        self.assertIn(mount, mounts)
        psutil.disk_usage(mount)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_wait_procs(self):
        def callback(p):
            l.append(p.pid)

        l = []
        sproc1 = get_test_subprocess()
        sproc2 = get_test_subprocess()
        sproc3 = get_test_subprocess()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
        self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
        t = time.time()
        gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)

        self.assertLess(time.time() - t, 0.5)
        self.assertEqual(gone, [])
        self.assertEqual(len(alive), 3)
        self.assertEqual(l, [])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_before_failing(30)
        def test(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 1)
            self.assertEqual(len(alive), 2)
            return gone, alive

        sproc3.terminate()
        gone, alive = test(procs, callback)
        self.assertIn(sproc3.pid, [x.pid for x in gone])
        if POSIX:
            self.assertEqual(gone.pop().returncode, signal.SIGTERM)
        else:
            self.assertEqual(gone.pop().returncode, 1)
        self.assertEqual(l, [sproc3.pid])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_before_failing(30)
        def test(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 3)
            self.assertEqual(len(alive), 0)
            return gone, alive

        sproc1.terminate()
        sproc2.terminate()
        gone, alive = test(procs, callback)
        self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid]))
        for p in gone:
            self.assertTrue(hasattr(p, 'returncode'))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_wait_procs(self):
        def callback(p):
            l.append(p.pid)

        l = []
        sproc1 = get_test_subprocess()
        sproc2 = get_test_subprocess()
        sproc3 = get_test_subprocess()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
        self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
        t = time.time()
        gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)

        self.assertLess(time.time() - t, 0.5)
        self.assertEqual(gone, [])
        self.assertEqual(len(alive), 3)
        self.assertEqual(l, [])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_before_failing(30)
        def test(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 1)
            self.assertEqual(len(alive), 2)
            return gone, alive

        sproc3.terminate()
        gone, alive = test(procs, callback)
        self.assertIn(sproc3.pid, [x.pid for x in gone])
        if POSIX:
            self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
        else:
            self.assertEqual(gone.pop().returncode, 1)
        self.assertEqual(l, [sproc3.pid])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_before_failing(30)
        def test(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 3)
            self.assertEqual(len(alive), 0)
            return gone, alive

        sproc1.terminate()
        sproc2.terminate()
        gone, alive = test(procs, callback)
        self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid]))
        for p in gone:
            self.assertTrue(hasattr(p, 'returncode'))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_wait_procs(self):
        def callback(p):
            pids.append(p.pid)

        pids = []
        sproc1 = get_test_subprocess()
        sproc2 = get_test_subprocess()
        sproc3 = get_test_subprocess()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
        self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
        t = time.time()
        gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)

        self.assertLess(time.time() - t, 0.5)
        self.assertEqual(gone, [])
        self.assertEqual(len(alive), 3)
        self.assertEqual(pids, [])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_before_failing(30)
        def test(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 1)
            self.assertEqual(len(alive), 2)
            return gone, alive

        sproc3.terminate()
        gone, alive = test(procs, callback)
        self.assertIn(sproc3.pid, [x.pid for x in gone])
        if POSIX:
            self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
        else:
            self.assertEqual(gone.pop().returncode, 1)
        self.assertEqual(pids, [sproc3.pid])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_before_failing(30)
        def test(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 3)
            self.assertEqual(len(alive), 0)
            return gone, alive

        sproc1.terminate()
        sproc2.terminate()
        gone, alive = test(procs, callback)
        self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid]))
        for p in gone:
            self.assertTrue(hasattr(p, 'returncode'))