Python psutil 模块,pids() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.pids()

项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def pids_active(pids_computer):
    """
    Collect basic statistics for every inspectable process in `pids_computer`.

    For each pid a `psutil.Process` is queried for its status, its CPU
    percentage (non-blocking, `interval=0.0`), its memory percentage and
    its name. Processes that are zombies, have vanished or deny access
    are skipped.

    Returns a dict keyed by process name. When several processes share a
    name, the one visited last wins.
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
            # The name is read inside the try block too: the process can
            # disappear (or deny access) between any two of these calls.
            name = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess):
            continue

        pid_valid[name] = data
    return pid_valid
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Point psutil at an empty procfs directory and expect its APIs to fail."""
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs, checked in this order.
            # NOTE: psutil.pids is not part of the list.
            expected_io_errors = [
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {"percpu": True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            ]
            for func, kwargs in expected_io_errors:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Always restore the real procfs location and drop the temp dir.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def getSidToken(token_sid):
    """Return a token handle for `token_sid`, or None when none is found.

    For the SID "S-1-5-18" the first entry returned by `ListSids()` whose
    second field contains "winlogon" decides the outcome. For any other
    SID every pid above 4 is probed until a token with a matching SID is
    found.
    """
    if token_sid != "S-1-5-18":
        # trying to impersonate a token
        candidate_pids = [int(raw) for raw in psutil.pids() if int(raw) > 4]
        for pid in candidate_pids:
            hToken = gethTokenFromPid(pid)
            if hToken and GetTokenSid(hToken) == token_sid:
                print("\t[+] Using PID: " + str(pid))
                return hToken
        return None

    # trying to get system privileges
    for sid in ListSids():
        if "winlogon" not in sid[1].lower():
            continue
        hToken = gethTokenFromPid(sid[0])
        if not hToken:
            # Only the first winlogon entry is ever tried.
            return None
        print("\t[+] Using PID: " + str(sid[0]))
        return hToken
    return None
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def run_module(self):
        """Run every binary in turn, killing it afterwards, then parse the results.

        A monitoring thread is started first. Each binary is executed, given
        `self.args["sleep_time"]` seconds, and then killed using the pid
        snapshots taken before and after the launch.
        """
        # To access user provided attributes, use self.options dictionary
        monitor = Thread(target=self.monitoring)
        monitor.start()

        print("\n[*] STARTING PROGRAMS EXECUTION IN 5 SECONDS...\n")
        time.sleep(5)

        for binary in self.binaries():
            self.print_info("  [+] Executing %s ..." % binary)
            pids_before = psutil.pids()
            self.execute(binary)
            time.sleep(int(self.args["sleep_time"]))
            pids_after = psutil.pids()
            print("  [-] Killing the process")
            # The process is identified by the text before the first dot.
            process_name = binary.split('.')[0]
            self.kill(process_name, pids_before, pids_after)

        print("\n[*] PLEASE CLOSE PROCMON PROCESS\n")
        self.parsing_results()
        print("\n[*] RESULTS PARSED TO XML\n")
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def pids_active(pids_computer):
    """
    Gather per-process statistics for the given pids.

    Every pid is wrapped in a `psutil.Process` and asked for its status,
    CPU percentage (`interval=0.0`, i.e. non-blocking), memory percentage
    and name. A process that is a zombie, no longer exists, or refuses
    access is left out of the result.

    Returns a dict mapping process name to its statistics dict; a later
    process with the same name replaces an earlier one.
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            stats = {"pid": process.pid,
                     "status": process.status(),
                     "percent_cpu_used": process.cpu_percent(interval=0.0),
                     "percent_memory_used": process.memory_percent()}
            # Looking the name up here (not after the try) keeps a process
            # that dies mid-inspection from raising out of the loop.
            key = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess):
            continue

        pid_valid[key] = stats
    return pid_valid
项目:dcmha    作者:wwwbjqcom    | 项目源码 | 文件源码
def ReStart(groupname):
    '''Restart the mysqlrouter instance whose command line names `<groupname>.conf`.

    Every running process is inspected. The first one with
    `<groupname>.conf` as a command-line argument is killed with signal 9
    and a new mysqlrouter is launched in the background.

    Returns:
        True  -- an instance was found, killed and relaunched.
        False -- killing or relaunching raised; the traceback is logged.
        None  -- no instance was found; a new one is launched anyway.
    '''
    # NOTE(review): groupname is interpolated into a shell command line;
    # confirm that callers only pass trusted values.
    start_cmd = 'cd /etc/mysqlrouter;nohup mysqlrouter -c mysqlrouter.conf -a %s.conf &' % groupname
    conf_name = groupname + '.conf'

    for pid in psutil.pids():
        try:
            cmdline = psutil.Process(pid).cmdline()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited or cannot be inspected; it is not ours.
            continue
        if conf_name not in cmdline:
            continue
        try:
            os.kill(pid, 9)
            os.popen(start_cmd)
            return True
        except Exception:
            logging.error(traceback.format_exc())
            return False

    # Nothing was running for this group: just start it.
    os.popen(start_cmd)
    return None
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def getSidToken(token_sid):
    """Look up a token handle matching `token_sid`.

    "S-1-5-18" is served from the first `ListSids()` entry whose second
    field mentions "winlogon". Any other SID is searched for among the
    tokens of all pids greater than 4. Returns the handle, or None.
    """
    wants_system = token_sid == "S-1-5-18"

    if wants_system:
        # trying to get system privileges
        for entry in ListSids():
            if "winlogon" in entry[1].lower():
                hToken = gethTokenFromPid(entry[0])
                if hToken:
                    print("\t[+] Using PID: " + str(entry[0]))
                    return hToken
                # The first winlogon entry is the only one tried.
                return None
        return None

    # trying to impersonate a token
    pids = [int(value) for value in psutil.pids() if int(value) > 4]
    for pid in pids:
        hToken = gethTokenFromPid(pid)
        if not hToken:
            continue
        if GetTokenSid(hToken) == token_sid:
            print("\t[+] Using PID: " + str(pid))
            return hToken
    return None
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_exe_mocked(self):
        """Check Process.exe() when os.readlink fails with ENOENT."""
        enoent = OSError(errno.ENOENT, "")
        with mock.patch('psutil._pslinux.os.readlink',
                        side_effect=enoent) as readlink_mock:
            # No such file error; might be raised also if /proc/pid/exe
            # path actually exists for system processes with low pids
            # (about 0-20). In this case psutil is supposed to return
            # an empty string.
            exe_path = psutil.Process().exe()
            assert readlink_mock.called
            self.assertEqual(exe_path, "")

            # ...but if /proc/pid no longer exist we're supposed to treat
            # it as an alias for zombie process
            lexists_patch = mock.patch('psutil._pslinux.os.path.lexists',
                                       return_value=False)
            with lexists_patch:
                self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
项目:WIFIHTTPMonitor    作者:kbdancer    | 项目源码 | 文件源码
def POST(self):
        """Return a JSON document describing the current system load.

        The "current" object holds the CPU, memory and root-disk usage
        percentages, the number of pids, and the `uptime` command result
        as the (stdout, stderr) pair produced by `communicate()`.
        """
        cpuused = psutil.cpu_percent(interval=None, percpu=False)
        memused = psutil.virtual_memory().percent
        diskused = psutil.disk_usage('/').percent
        pidcount = len(psutil.pids())

        uptimeshell = subprocess.Popen(['uptime'], stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        # communicate() both drains the pipes and waits for the child.
        # Calling wait() first, with stdout/stderr piped, can deadlock once
        # the child fills a pipe buffer.
        uptime = uptimeshell.communicate()

        return json.dumps(
            {
                "code": 0,
                "current": {
                    "cpuused": cpuused,
                    "memused": memused,
                    "diskused": diskused,
                    "pidcount": pidcount,
                    "uptime": uptime
                }
            }
        )
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_procfs_path(self):
        """With PROCFS_PATH set to an empty directory, psutil calls must raise."""
        scratch_dir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = scratch_dir
            # psutil function name -> keyword arguments, in call order.
            # NOTE: "pids" is not checked here.
            checks = (
                ("virtual_memory", {}),
                ("cpu_times", {}),
                ("cpu_times", {"percpu": True}),
                ("boot_time", {}),
                ("net_connections", {}),
                ("net_io_counters", {}),
                ("net_if_stats", {}),
                ("disk_io_counters", {}),
                ("disk_partitions", {}),
            )
            for func_name, kwargs in checks:
                self.assertRaises(IOError, getattr(psutil, func_name), **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Put the real procfs path back before removing the temp dir.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(scratch_dir)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_issue_687(self):
        """Thread IDs must work with Process() but stay out of the pid APIs."""
        # In case of thread ID:
        # - pid_exists() is supposed to return False
        # - Process(tid) is supposed to work
        # - pids() should not return the TID
        # See: https://github.com/giampaolo/psutil/issues/687
        worker = ThreadTask()
        worker.start()
        try:
            own_threads = psutil.Process().threads()
            tid = own_threads[1].id
            assert not psutil.pid_exists(tid), tid
            psutil.Process(tid).as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            worker.stop()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_exe_mocked(self):
        """Exercise Process.exe() with os.readlink patched to raise ENOENT."""
        readlink_error = OSError(errno.ENOENT, "")
        readlink_patch = mock.patch('psutil._pslinux.os.readlink',
                                    side_effect=readlink_error)
        with readlink_patch as patched_readlink:
            # No such file error; might be raised also if /proc/pid/exe
            # path actually exists for system processes with low pids
            # (about 0-20). In this case psutil is supposed to return
            # an empty string.
            result = psutil.Process().exe()
            assert patched_readlink.called
            self.assertEqual(result, "")

            # ...but if /proc/pid no longer exist we're supposed to treat
            # it as an alias for zombie process
            with mock.patch('psutil._pslinux.os.path.lexists',
                            return_value=False):
                current = psutil.Process()
                self.assertRaises(psutil.ZombieProcess, current.exe)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Redirect PROCFS_PATH to an empty temp dir and verify the failures."""
        fake_proc = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = fake_proc
            expect_io_error = self.assertRaises
            # NOTE: psutil.pids is not asserted in this sequence.
            expect_io_error(IOError, psutil.virtual_memory)
            expect_io_error(IOError, psutil.cpu_times)
            expect_io_error(IOError, psutil.cpu_times, percpu=True)
            expect_io_error(IOError, psutil.boot_time)
            for network_or_disk_call in (psutil.net_connections,
                                         psutil.net_io_counters,
                                         psutil.net_if_stats,
                                         psutil.disk_io_counters,
                                         psutil.disk_partitions):
                expect_io_error(IOError, network_or_disk_call)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Restore the default procfs path, then remove the temp dir.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(fake_proc)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_issue_687(self):
        """A thread ID is usable via Process() but is not reported as a pid."""
        # In case of thread ID:
        # - pid_exists() is supposed to return False
        # - Process(tid) is supposed to work
        # - pids() should not return the TID
        # See: https://github.com/giampaolo/psutil/issues/687
        task = ThreadTask()
        task.start()
        try:
            me = psutil.Process()
            second_thread = me.threads()[1]
            tid = second_thread.id
            assert not psutil.pid_exists(tid), tid
            thread_proc = psutil.Process(tid)
            thread_proc.as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            task.stop()


# =====================================================================
# --- sensors
# =====================================================================
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_exe_mocked(self):
        """Process.exe() behaviour when reading the exe link raises ENOENT."""
        missing = OSError(errno.ENOENT, "")
        with mock.patch('psutil._pslinux.os.readlink',
                        side_effect=missing) as fake_readlink:
            # No such file error; might be raised also if /proc/pid/exe
            # path actually exists for system processes with low pids
            # (about 0-20). In this case psutil is supposed to return
            # an empty string.
            exe = psutil.Process().exe()
            assert fake_readlink.called
            self.assertEqual(exe, "")

            # ...but if /proc/pid no longer exist we're supposed to treat
            # it as an alias for zombie process
            no_proc_dir = mock.patch('psutil._pslinux.os.path.lexists',
                                     return_value=False)
            with no_proc_dir:
                self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
项目:tm-manifesting    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def kill_pid(pid, procname='', daemon=True):
    '''Kill the process `pid` if it satisfies the optional qualifications.

    When `procname` is non-empty the process name must equal it. When
    `daemon` is true the parent pid must be 1. Returns True when a
    process was handed to `_kill_pid_object`, otherwise False.
    '''
    if pid not in psutil.pids():
        return False
    for proc in psutil.process_iter():
        if proc.pid == pid:
            name_ok = not procname or proc.name() == procname
            if name_ok and (not daemon or proc.ppid() == 1):
                _kill_pid_object(proc)
                return True
    return False

###########################################################################
# Some packages (nscd, nslcd) start their daemons after installation.
# If I'm in an ARM chroot, they live after the chroot exits.  Kill them.
# In a cascade situation, sometimes the /proc/<PID>/root link is changed
# to '/some/path (deleted)'.  This routine still works.
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def last_process_created(self, prev_pids, new_pids):
        """Return the entries of `new_pids` absent from `prev_pids`, in order."""
        return [pid for pid in new_pids if pid not in prev_pids]
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def last_process_created(self, prev_pids, new_pids):
        """List the pids found in `new_pids` but not in `prev_pids`.

        Order and duplicates of `new_pids` are preserved.
        """
        created = [candidate
                   for candidate in new_pids
                   if candidate not in prev_pids]
        return created
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def is_cmd_open(self, prev_pids):
        """Tell whether a `cmd.exe` process appeared since `prev_pids` was taken.

        Every pid currently running that is not in `prev_pids` is checked.
        A process that vanished or cannot be inspected is skipped
        individually, so it can no longer hide a cmd.exe that is present.

        Returns True when a new cmd.exe is found, otherwise False.
        """
        for pid in psutil.pids():
            if pid in prev_pids:
                continue
            try:
                if psutil.Process(pid).name() == 'cmd.exe':
                    return True
            except psutil.Error:
                # Short-lived or protected process: ignore just this one.
                continue
        return False
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def handle_dll_local(self, subpath, binary):
        """Plant the malicious DLL under `subpath`, run `binary` and record the outcome.

        The binary is filed under self._results["vulnerables"] when
        is_cmd_open() reports a new cmd.exe, otherwise under
        self._results['sospechosos']. New processes are then killed and
        `subpath` is removed. All shell work goes through powershell.
        """
        # Directory (below subpath) that receives the DLL.
        path = subpath + "\\x86_microsoft.windows.common-controls_6595b64144ccf1df_6.0.15063.0_none_583b8639f462029f\\"
        try:
            # Best-effort removal of any previous subpath; failures are ignored.
            try:
                subprocess.check_call(
                    ["powershell", "-C", "rm", "-r", "-Force", subpath, "-erroraction", "'silentlycontinue'"])
            except:
                pass

            print "[+] Creating: " + path
            subprocess.check_call(
                ["powershell", "-C", "mkdir", path, ">", "$null"])

            print "[+] Copying the malicious dll to the path"
            subprocess.check_call(
                ["powershell", "-C", "cp", self.args["malicious_dll"], path])

            # Snapshot taken before the launch so new processes can be told apart.
            prev_pids = psutil.pids()
            print "[*] Executing the binary"
            subprocess.check_call(["powershell", "-C", binary])
            # Give the launched binary a moment before looking for cmd.exe.
            time.sleep(1)

            if self.is_cmd_open(prev_pids):
                print colored("[*] THIS BINARY IS VULNERABLE TO DLL HIJACKING UAC BYPASS!", 'cyan', attrs=['bold'])
                if binary not in self._results["vulnerables"]:
                    self._results["vulnerables"].append(binary)
            else:
                if binary not in self._results['sospechosos']:
                    self._results['sospechosos'].append(binary)

            new_pids = psutil.pids()

            self.kill(binary, prev_pids, new_pids)

            print "[-] Deleting the path and cleaning up\n"
            subprocess.check_call(
                ["powershell", "-C", "rm", "-r", "-Force", subpath])

        # Any failing powershell call above (not only the copy) ends up here.
        except subprocess.CalledProcessError as error:
            print "ERROR: COPYING THE FILE"
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def is_cmd_open(self, prev_pids):
        """Return True when a process named `cmd.exe` exists that is not in `prev_pids`.

        Each newly seen pid is inspected on its own; one that has already
        exited or denies access is skipped instead of aborting the whole
        check. Returns False when no new cmd.exe is found.
        """
        new_pids = [pid for pid in psutil.pids() if pid not in prev_pids]
        for pid in new_pids:
            try:
                name = psutil.Process(pid).name()
            except psutil.Error:
                # The process is gone or unreadable; it cannot be judged.
                continue
            if name == 'cmd.exe':
                return True
        return False
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def handle_dll_local(self, subpath, binary, clean):
        """Create the DLL directory under `subpath`, copy the DLL in and run `binary`.

        A failing powershell call is reported through `self.print_ko`.
        The `clean` argument is not used by this method.
        """
        dll_dir = subpath + "\\x86_microsoft.windows.common-controls_6595b64144ccf1df_6.0.15063.0_none_583b8639f462029f\\"
        powershell = ["powershell", "-C"]
        try:
            print("[+] Creating: " + dll_dir)
            subprocess.check_call(powershell + ["mkdir", dll_dir, ">", "$null"])
            print("[+] Copying the malicious dll to the path")
            subprocess.check_call(
                powershell + ["cp", self.args["malicious_dll"], dll_dir])
            # The snapshot is taken but not used further in this method.
            prev_pids = psutil.pids()
            print("[*] Executing the binary")
            subprocess.check_call(powershell + [binary])
        except subprocess.CalledProcessError as error:
            self.print_ko(str(error) + "\n")
项目:uac-a-mola    作者:ElevenPaths    | 项目源码 | 文件源码
def clean_path(self, subpath, binary):
        """Kill every process whose name contains `binary`, then delete `subpath`.

        Processes are killed with `taskkill /t /f`. A failed kill is
        reported via `self.print_ko`. A process that exits or cannot be
        inspected while scanning is skipped, so the final removal of
        `subpath` still happens.
        """
        print("[-] Deleting the path and cleaning up\n")
        for pid in psutil.pids():
            try:
                name = psutil.Process(pid).name()
            except psutil.Error:
                # Gone or protected process: nothing to kill here.
                continue
            if binary not in name:
                continue
            try:
                print("Killing the process...")
                subprocess.check_call(["taskkill", "/t", "/f", "/pid", str(pid)])
            except subprocess.CalledProcessError:
                self.print_ko("[!!] The process %s can't be killed" % binary)
        subprocess.check_call(
            ["powershell", "-C", "rm", "-r", "-Force", subpath])
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def find(prefix, suffixes=None, local_bin=False):
    """Report whether another process named `prefix` is already running.

    The current process is ignored. With `local_bin` set, some
    command-line word must contain '/usr/local/bin'. With `suffixes`
    given, some command-line word must be one of them. A warning is
    logged and True returned on the first match; otherwise False.
    """
    own_pid = os.getpid()

    for pid in psutil.pids():
        if pid != own_pid and psutil.pid_exists(pid):
            candidate = psutil.Process(pid)
            if candidate.name() != prefix:
                continue
            argv = candidate.cmdline()

            if local_bin and not any('/usr/local/bin' in word for word in argv):
                continue

            if suffixes is not None and not any(word in suffixes for word in argv):
                continue

            log.warning('Already existing')  # : %s' % cmd)
            return True

    return False
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)

    Loops until interrupted. Each iteration samples CPU usage (a blocking
    one-second measurement), memory, root-disk usage and the process
    table, then sends them as a JSON payload on the b"status" channel of
    `socket`. On KeyboardInterrupt or SystemExit the socket is closed,
    `context` is terminated and the function returns.
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)

            }).encode()
            #send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])
            #time.sleep(0.500)

        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Leave the loop: another iteration would try to send on the
            # socket that was just closed.
            return
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def process_send_data(socket, context):
    """
    Publish this computer's resource usage to the server (master node).

    Runs an endless loop. Every pass measures CPU usage over a blocking
    one-second interval, reads memory and root-disk usage plus the
    process table, and sends the JSON-encoded result as a two-part
    message tagged b"status". KeyboardInterrupt or SystemExit closes
    `socket`, terminates `context` and ends the function.
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            payload = {
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)
            }
            info_to_send = json.dumps(payload).encode()
            #send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])
            #time.sleep(0.500)

        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Stop here: the socket is closed, so looping again could only
            # fail on the next send.
            return
项目:video-importer    作者:tnc-ca-geo    | 项目源码 | 文件源码
def lock_or_exit(self, lock_filename, message="process %s is already running"):
        """Exit if the pid stored in `lock_filename` is alive; otherwise drop the stale lock.

        When the lock file exists its content is parsed as an integer pid.
        If that pid is running, `message % pid` is logged and the process
        exits with status 1. Otherwise the stale lock file is deleted.

        Raises:
            ValueError: the lock file does not contain an integer.
        """
        # NOTE(review): `pid` is not used in the visible part of this
        # method; kept in case the original continues past this excerpt.
        pid = psutil.Process().pid
        if os.path.exists(lock_filename):
            with open(lock_filename) as lockfile:
                old_pid = int(lockfile.read())
            # The file is closed before it is removed: unlinking a file
            # that is still open fails on Windows.
            if old_pid in psutil.pids():
                logging.info(message % old_pid)
                sys.exit(1)
            else:
                os.unlink(lock_filename)
项目:corgi    作者:el-ethan    | 项目源码 | 文件源码
def running_emacs_count(self):
        """Count how many instances of Emacs are currently open.

        Returns the number of processes named 'emacs'. Processes that
        exit or cannot be inspected during the scan are skipped. When
        psutil is not installed a warning is logged and 0 is returned.
        """
        try:
            import psutil
        except ImportError:
            Logger.warning('Cannot determine how many instances of Emacs are running. This may cause sync issues')
            return 0

        count = 0
        for pid in psutil.pids():
            try:
                if psutil.Process(pid).name() == 'emacs':
                    count += 1
            except psutil.Error:
                # The process ended (or is protected) between listing and
                # inspection; it is not a countable Emacs instance.
                continue
        return count
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_wait_for_pid(self):
        """wait_for_pid() returns for our own pid and raises for a missing one."""
        wait_for_pid(os.getpid())
        missing_pid = max(psutil.pids()) + 99999
        patched_retry = mock.patch('psutil.tests.retry.__iter__',
                                   return_value=iter([0]))
        with patched_retry:
            self.assertRaises(psutil.NoSuchProcess, wait_for_pid, missing_pid)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pids(self):
        """psutil.pids() must match the pid set reported by WMI."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        wmi_pids = {proc.ProcessId for proc in wmi.WMI().Win32_Process()}
        psutil_pids = set(psutil.pids())
        self.assertEqual(wmi_pids, psutil_pids)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_send_signal(self):
        """Check Process.send_signal(): the normal path and its error translation."""
        # SIGKILL on POSIX, SIGTERM elsewhere.
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, sig)
            # os.kill failing with ESRCH must surface as NoSuchProcess.
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            # os.kill failing with EPERM must surface as AccessDenied.
            # Note the signal is sent through a fresh Process() here, not `p`.
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0.
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_0(self):
        """Exercise the Process API against PID 0 where that pid exists."""
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                # Being denied access to PID 0 is acceptable.
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    # NOTE(review): this asserts the attribute name string,
                    # which is always truthy; possibly `ret` was intended.
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        # The pid-listing checks are skipped on OpenBSD.
        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_exists(self):
        """pid_exists() must track a subprocess's lifetime and agree with pids()."""
        sproc = get_test_subprocess()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        # pid 0: whether it exists is platform dependent, but both APIs
        # must give the same answer. (A former trailing bare comparison
        # repeated this check without asserting anything and was dropped.)
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_exists_2(self):
        """pid_exists() is true for every listed pid and false for far-away pids."""
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            # An explicit check rather than `assert`, so the test still
            # verifies something when Python runs with -O.
            if not psutil.pid_exists(pid):
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        highest = max(pids)
        for pid in range(highest + 5000, highest + 6000):
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pids(self):
        """pids() must list the same pids as process_iter(), without duplicates."""
        plist = [x.pid for x in psutil.process_iter()]
        pidlist = psutil.pids()
        # sorted() returns the sorted lists. list.sort() returns None, so
        # comparing the results of .sort() would always succeed.
        self.assertEqual(sorted(plist), sorted(pidlist))
        # make sure every pid is unique
        self.assertEqual(len(pidlist), len(set(pidlist)))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pids(self):
        """Compare psutil.pids() with the pid column printed by `ps`."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        cmd = ["ps", "-A", "-o", "pid"] if SUNOS else ["ps", "ax", "-o", "pid"]
        ps_proc = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = ps_proc.communicate()[0].strip()
        assert ps_proc.poll() == 0
        if PY3:
            output = str(output, sys.stdout.encoding)
        # The first output line is skipped; empty lines are ignored.
        pids_ps = [int(line.split()[0].strip())
                   for line in output.split('\n')[1:]
                   if line]
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(ps_proc.pid)
        pids_psutil = psutil.pids()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            only_in_psutil = [x for x in pids_psutil if x not in pids_ps]
            only_in_ps = [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(only_in_psutil + only_in_ps))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def pids():
            """Return the result of `psutil.pids()` unchanged."""
            return psutil.pids()
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def find_pids_by_name(p_name):
            """Return the pids of all running processes named exactly `p_name`.

            Processes that exit or cannot be inspected during the scan
            are skipped.
            """
            arr = []
            for pid in psutil.pids():
                try:
                    # Must be `pid`: the original passed the builtin `id`.
                    if psutil.Process(pid).name() == p_name:
                        arr.append(pid)
                except psutil.Error:
                    continue
            return arr
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def kill_by_pids(pids):
            """Kill each process in `pids`, one after another, in the given order."""
            for doomed in (psutil.Process(one_pid) for one_pid in pids):
                doomed.kill()
项目:pwndemo    作者:zh-explorer    | 项目源码 | 文件源码
def ancestors(pid):
    """ancestors(pid) -> int list

    Arguments:
        pid (int): PID of the process.

    Returns:
        List starting with `pid` itself, followed by each successive
        result of `parent()`, stopping before PID 0.
    """
    chain = []
    current = pid
    while current != 0:
        chain.append(current)
        current = parent(current)
    return chain
项目:wow-fishipy    作者:kioltk    | 项目源码 | 文件源码
def check_process():
    """Print whether WoW is running; exit when it is not and `dev` is falsy.

    A process counts as a match when its name is contained in one of the
    known WoW process names.
    """
    print('Checking WoW is running')
    wow_process_names = ["World of Warcraft"]
    running = False
    for pid in psutil.pids():
        proc = psutil.Process(pid)
        matched = False
        for known_name in wow_process_names:
            if proc.name() in known_name:
                matched = True
                break
        if matched:
            print(proc.name())
            running = True
    if not (running or dev):
        print('WoW is not running')
        exit()
    print('WoW is running')
    # raw_input('Pleas e put your fishing-rod on key 1, zoom-in to max, move camera on fishing-float and press any key')
项目:jd-crawler    作者:qiaofei32    | 项目源码 | 文件源码
def process_count(self):
        """Count running processes whose command line contains "jd.py".

        Processes that exit or cannot be inspected during the scan are
        skipped. Only the command line is read: the executable path was
        never used, and querying it could make an otherwise matching
        process be skipped.
        """
        p_count = 0
        for pid in psutil.pids():
            try:
                cmd = " ".join(psutil.Process(pid).cmdline())
            except psutil.Error:
                continue
            if "jd.py" in cmd:
                p_count += 1
        return p_count
项目:gcMapExplorer    作者:rjdkmr    | 项目源码 | 文件源码
def cleanScratch():
    """ Clean scratch directory.

    It checks whether any other gcMapExplorer process is running. In case,
    when only one process (i.e. current) is running, all files whose base
    name contains "gcx" will be deleted from the configured working
    directory (config['Dirs']['WorkingDirectory']).

    Processes that exit or cannot be inspected while being counted are
    ignored. Files that cannot be removed are skipped silently.
    """
    config = getConfig()
    count = 0
    for pid in psutil.pids():
        try:
            name = psutil.Process(pid).name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        if 'gcMapExplorer' in name:
            count += 1

    # If only one gcMapExplorer is running, it is the current one
    if count == 1:
        workdir = config['Dirs']['WorkingDirectory']
        for fname in os.listdir(workdir):
            # os.listdir() yields bare names; join them with the directory
            # so the checks do not silently apply to the current directory.
            fpath = os.path.join(workdir, fname)
            if not os.path.isfile(fpath):
                continue
            base = os.path.splitext(fname)[0]
            if "gcx" in base:
                try:
                    print(' Removing File: {0}'.format(fpath))
                    os.remove(fpath)
                except (IOError, OSError):
                    # os.remove reports failures as OSError.
                    pass

        print('     ... Finished Cleaning')
项目:odin    作者:imito    | 项目源码 | 文件源码
def get_system_status(memory_total=False, memory_total_actual=False,
                      memory_total_usage=False, memory_total_free=False,
                      all_pids=False, swap_memory=False, pid=False):
  """
  Return a single piece of system information selected by a flag.

  The flags are tested in the order listed below and the value for the
  first truthy one is returned; `None` is returned when no flag is set.

  Parameters
  ----------
  memory_total: bool
      return `psutil.virtual_memory().total`
  memory_total_actual: bool
      return `psutil.virtual_memory().available`
  memory_total_usage: bool
      return `psutil.virtual_memory().used`
  memory_total_free: bool
      return `psutil.virtual_memory().free`
  swap_memory: bool
      return the `psutil.swap_memory()` tuple with `total`, `used`, `free`,
      `sin` and `sout` converted (`percent` is left unchanged)
  all_pids: bool
      return the list from `psutil.pids()`
  pid: bool
      return the PID of the current process (`os.getpid()`)

  Note
  ----
  All memory is returned in `MiB`
  To calculate memory_percent:
      get_system_status(memory_total_usage=True) / get_system_status(memory_total=True) * 100
  """
  import psutil
  mib = float(2 ** 20)  # bytes per MiB
  # ====== general system query ====== #
  if memory_total:
    return psutil.virtual_memory().total / mib
  if memory_total_actual:
    return psutil.virtual_memory().available / mib
  if memory_total_usage:
    return psutil.virtual_memory().used / mib
  if memory_total_free:
    return psutil.virtual_memory().free / mib
  if swap_memory:
    swap = psutil.swap_memory()
    # The result is a namedtuple, so its fields cannot be assigned in
    # place; build a converted copy instead.
    return swap._replace(total=swap.total / mib,
                         used=swap.used / mib,
                         free=swap.free / mib,
                         sin=swap.sin / mib,
                         sout=swap.sout / mib)
  if all_pids:
    return psutil.pids()
  if pid:
    return os.getpid()
项目:px    作者:genotrance    | 项目源码 | 文件源码
def quit():
    """Send ``signal.CTRL_C_EVENT`` to every other process that runs from
    the same executable as the current interpreter (``sys.executable``).

    PIDs are visited from highest to lowest and the current process is
    excluded. Any failure while inspecting or signalling a process is
    ignored and the scan moves on to the next PID.
    """
    own_pid = os.getpid()
    other_pids = [candidate
                  for candidate in sorted(psutil.pids(), reverse=True)
                  if candidate != own_pid]

    for other_pid in other_pids:
        try:
            proc = psutil.Process(other_pid)
            if p_matches := (proc.exe() == sys.executable):
                proc.send_signal(signal.CTRL_C_EVENT)
        except:
            # Best effort: whatever goes wrong for this PID, keep going.
            continue
项目:futuquant    作者:FutunnOpen    | 项目源码 | 文件源码
def _get_process_by_path(self, path):
        """Return the first running process whose executable is ``path``.

        The comparison is done on the lower-cased string forms of ``path``
        and of each process's ``exe()`` value.

        Args:
            path: executable path to look for.

        Returns:
            The matching ``psutil.Process``, or ``None`` when no process
            matches. Processes that cannot be inspected are skipped.
        """
        lower_path = str(path).lower()
        for pid in psutil.pids():
            try:
                process = psutil.Process(pid)
                if str(process.exe()).lower() == lower_path:
                    return process
            except (psutil.Error, OSError, UnicodeError):
                # Process vanished, access was denied, or its path could
                # not be converted for comparison: skip it. Narrower than
                # a bare except so Ctrl-C / SystemExit still propagate.
                continue
        return None
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_pids(self):
        """Run ``psutil.pids`` through this test case's ``execute`` helper."""
        target = psutil.pids
        self.execute(target)

    # --- net
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_wait_for_pid(self):
        """wait_for_pid() accepts our own PID and raises NoSuchProcess for a
        PID far above every PID currently listed.
        """
        # Our own process certainly exists.
        wait_for_pid(os.getpid())

        missing_pid = max(psutil.pids()) + 99999
        # Patch the retry helper's iteration to yield a single item
        # (presumably limiting it to one attempt -- confirm against the
        # retry implementation).
        retry_patch = mock.patch('psutil.tests.retry.__iter__',
                                 return_value=iter([0]))
        with retry_patch:
            with self.assertRaises(psutil.NoSuchProcess):
                wait_for_pid(missing_pid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_send_signal(self):
        """Exercise Process.send_signal() against freshly spawned subprocesses.

        Checks that the signalled process goes away and, on POSIX, that:
        the wait() result equals the negated signal number; an ESRCH error
        from the patched ``psutil.os.kill`` is reported as NoSuchProcess;
        an EPERM error is reported as AccessDenied; and signalling PID 0
        raises ValueError.
        """
        # SIGKILL on POSIX, SIGTERM everywhere else.
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            # ESRCH raised by os.kill must surface as NoSuchProcess.
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            # EPERM raised by os.kill must surface as AccessDenied.
            # The signal is sent through Process(), i.e. the current
            # process; os.kill is patched, so nothing is delivered.
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0.
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_pid_0(self):
        """Check that Process(0) behaves sanely where PID 0 exists.

        When PID 0 is not listed by psutil.pids(), only verifies that
        Process(0) raises NoSuchProcess. Otherwise calls every attribute
        method named in ``psutil._as_dict_attrnames`` (except 'pid'),
        tolerating AccessDenied, and checks a few expected values.
        """
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                # Skipped: only the callable attribute getters are exercised.
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                # Lack of privileges is acceptable for PID 0.
                pass
            else:
                if name in ("uids", "gids"):
                    # Real uid/gid of PID 0 is expected to be 0.
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    # NOTE(review): this asserts the loop variable `name`
                    # (always the non-empty string "name"), not the returned
                    # value `ret`, so it can never fail -- confirm whether
                    # `assert ret, ret` was intended.
                    assert name, name

        # rlimit() is only present on some platforms.
        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        # Must not raise.
        p.as_dict()

        # The PID-listing checks are not applied on OpenBSD.
        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))