Python psutil 模块,pid_exists() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.pid_exists()

项目:user-sync.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def is_locked(self):
        # process is not locked if lock file does not exist
        if not os.path.exists(self.path):
            return False

        # if lock file exists, get pid
        with open(self.path, 'r') as f:
            lock_pid = f.read().strip()

        if not lock_pid:
            return False

        lock_pid = int(lock_pid)

        if psutil.pid_exists(lock_pid):
            return True
        else:
            return False
项目:OneDrive-L    作者:epam    | 项目源码 | 文件源码
def test_start(self):
        """
        Verifies that pidfile is created, process exists, pidfile is not empty.
        """

        result = os.system(self.cmd % (THIS_FILE_PATH, 'start', self.pidfile))
        sleep(1)

        self.assertTrue(os.path.exists(self.pidfile),
                        'Pidfile has not been created')

        with open(self.pidfile) as pid_file:
            pid = pid_file.read().rstrip()

        self.assertTrue(pid, 'Pidfile is empty')

        self.assertTrue(psutil.pid_exists(int(pid)),
                        'Process with specified pid does not exist')

        self.assertEqual(result, 0, 'Wrong return code')
项目:OneDrive-L    作者:epam    | 项目源码 | 文件源码
def test_stop(self):
        """
        Test daemon stop functionality. Verifies that pidfile is being removed,
        process does not exist.
        """

        with open(self.pidfile) as pid_file:
            pid = pid_file.read().rstrip()

        result = os.system(self.cmd % (THIS_FILE_PATH, 'stop', self.pidfile))
        sleep(1)

        self.assertEqual(result, 0, 'Wrong return code')

        self.assertFalse(psutil.pid_exists(int(pid)),
                         'Process with {0} pid still exists'.format(pid))

        self.assertFalse(os.path.exists(self.pidfile),
                         'Pidfile has not been removed')
项目:OneDrive-L    作者:epam    | 项目源码 | 文件源码
def test_restart(self):
        """
        Verify that pidfile is created, process exists, pidfile is not empty.
        """

        result = os.system(self.cmd % (THIS_FILE_PATH, 'restart',
                                       self.pidfile))
        sleep(1)

        self.assertTrue(os.path.exists(self.pidfile),
                        'Pidfile has not been created')

        with open(self.pidfile) as pid_file:
            pid = pid_file.read().rstrip()

        self.assertTrue(pid, 'Pidfile is empty')

        self.assertTrue(psutil.pid_exists(int(pid)),
                        'Process with specified pid does not exist')

        self.assertEqual(result, 0, 'Wrong return code')
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def killing_loop(
        self,
    ):
        while self.stop_event.wait():
            if not psutil.pid_exists(self.pid_to_kill):
                return

            process = psutil.Process(self.pid_to_kill)
            if self.memory_limit != 0 and process.memory_info().rss >= self.memory_limit:
                os.kill(self.pid_to_kill, self.memory_limit_signal)

            if self.soft_timeout != 0 and self.time_elapsed >= self.soft_timeout:
                os.kill(self.pid_to_kill, self.soft_timeout_signal)

            if self.hard_timeout != 0 and self.time_elapsed >= self.hard_timeout:
                os.kill(self.pid_to_kill, self.hard_timeout_signal)

            if self.critical_timeout != 0 and self.time_elapsed >= self.critical_timeout:
                os.kill(self.pid_to_kill, self.critical_timeout_signal)

            time.sleep(self.sleep_interval)
            self.time_elapsed += self.sleep_interval
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def killing_loop(
        self,
    ):
        while self.stop_event.wait():
            if not psutil.pid_exists(self.pid_to_kill):
                return

            process = psutil.Process(self.pid_to_kill)
            if self.memory_limit != 0 and process.memory_info().rss >= self.memory_limit:
                os.kill(self.pid_to_kill, self.memory_limit_signal)

            with self.time_elapsed.get_lock():
                if self.soft_timeout != 0 and self.time_elapsed.value >= self.soft_timeout:
                    os.kill(self.pid_to_kill, self.soft_timeout_signal)

                if self.hard_timeout != 0 and self.time_elapsed.value >= self.hard_timeout:
                    os.kill(self.pid_to_kill, self.hard_timeout_signal)

                if self.critical_timeout != 0 and self.time_elapsed.value >= self.critical_timeout:
                    os.kill(self.pid_to_kill, self.critical_timeout_signal)

                self.time_elapsed.value += self.sleep_interval

            time.sleep(self.sleep_interval)
项目:clusterfuzz-tools    作者:google    | 项目源码 | 文件源码
def get_process_ids(self, process_id, recursive=True):
    """Return list of pids for a process and its descendants."""

    # Try to find the running process.
    if not psutil.pid_exists(process_id):
      return []

    pids = [process_id]
    try:
      psutil_handle = psutil.Process(process_id)
      children = psutil_handle.children(recursive=recursive)
      for child in children:
        pids.append(child.pid)
    except:
      logger.info('psutil: Process abruptly ended.')
      raise

    return pids
项目:slag    作者:n8v-guy    | 项目源码 | 文件源码
def _find_next_job(self):
        """Lookup for available jobs to execute"""
        with self._ctx:
            workers = self._coll.distinct('who')
            unused = [pid for pid in workers
                      if not (pid and psutil.pid_exists(pid))]
            job = self._coll.find_one_and_update(
                {
                    'when': {'$lte': datetime.datetime.now()},
                    'who': {'$in': unused},
                },
                {
                    '$set': {'who': os.getpid()}
                },
                sort=[('when', -1)])
        return job
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def kill(pidfile, waittime=15):
    """
    This method kills the process identified by pidfile.

    :param pidfile: Name of the pidfile identifying the process to kill
    :param waittime: Number of seconds to wait before killing the process
    :type pidfile: str
    :type waittime: int
    """

    pid = read_pidfile(pidfile)
    if psutil.pid_exists(pid):
        p = psutil.Process(pid)
        if p is not None:
            p.terminate()
            try:
                p.wait(timeout=waittime)
            except Exception as e:
                pass
            if p.is_running():
                logger.warning("Killing process")
                p.kill()
项目:rotest    作者:gregoil    | 项目源码 | 文件源码
def validate_test_processes(self, expected_processes_num):
        """Validate that case's processes ran and were killed.

        * Validate that expected number of processes ran.
        * Validate that all processes got killed.

        Args:
            expected_processes_num (number): expected number of processes to
                validate.

        Raises:
            AssertionError. of one or more of the processes wasn't killed.
        """
        for _ in xrange(expected_processes_num):
            pid = self.pid_queue.get_nowait()
            self.assertFalse(psutil.pid_exists(pid),
                             "Process %s wasn't killed" % pid)
项目:pypers    作者:frankosan    | 项目源码 | 文件源码
def status(self,job_ids):

        """  Returns the status of a pid from a local background 'job'
        """

        job_ids = set(job_ids)
        status_map = dict.fromkeys(job_ids,'Unknown')

        for job_id in job_ids:
            job_failed_file = os.path.join(self.job_ids.get(job_id, {}).get('cwd'), FAILED_FILE)
            if not psutil.pid_exists(int(job_id)):
                if os.path.exists(job_failed_file):
                    status_map[job_id] = JOB_STATUS.FAILED
                else:
                    status_map[job_id] = JOB_STATUS.SUCCEEDED
            elif psutil.Process(job_id).status() == psutil.STATUS_ZOMBIE:
                if os.path.exists(job_failed_file):
                    status_map[job_id] = JOB_STATUS.FAILED
                else:
                    status_map[job_id] = JOB_STATUS.SUCCEEDED
            else:
                status_map[job_id] = JOB_STATUS.RUNNING

        return status_map
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_issue_687(self):
        # In case of thread ID:
        # - pid_exists() is supposed to return False
        # - Process(tid) is supposed to work
        # - pids() should not return the TID
        # See: https://github.com/giampaolo/psutil/issues/687
        t = ThreadTask()
        t.start()
        try:
            p = psutil.Process()
            tid = p.threads()[1].id
            assert not psutil.pid_exists(tid), tid
            pt = psutil.Process(tid)
            pt.as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            t.stop()
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_issue_687(self):
        # In case of thread ID:
        # - pid_exists() is supposed to return False
        # - Process(tid) is supposed to work
        # - pids() should not return the TID
        # See: https://github.com/giampaolo/psutil/issues/687
        t = ThreadTask()
        t.start()
        try:
            p = psutil.Process()
            tid = p.threads()[1].id
            assert not psutil.pid_exists(tid), tid
            pt = psutil.Process(tid)
            pt.as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            t.stop()


# =====================================================================
# --- sensors
# =====================================================================
项目:ppapi    作者:PPAPI    | 项目源码 | 文件源码
def all_running_processes_for_case(self, _filter):
        procs = []
        for p in psutil.process_iter():
            try:
                if psutil.pid_exists(p.pid):
                    if _filter in p.cwd(): 
                        procs.append({"run": p.cwd() + " " + p.name(),"cmd":p.cmdline()})
            except (psutil.AccessDenied):
                pass
            except (psutil.NoSuchProcess):
                pass
        return procs

    ##
    # Function to kill all running processes for one case
    #  @param _filter search filter <string>
    #  @retval list of all killed processes <list>
项目:ppapi    作者:PPAPI    | 项目源码 | 文件源码
def kill_all_running_processes_for_case(self, _filter):
        procs = []
        for p in psutil.process_iter():
            try:
                if psutil.pid_exists(p.pid):
                    if _filter in p.cwd():
                        procs.append({"run": p.cwd() + " " + p.name()})
                        self.kill_proc_tree(p.pid)
            except (psutil.AccessDenied):
                pass
            except (psutil.NoSuchProcess):
                pass
        return procs

    ##
    # Function to retrieve all processes for one case (running and completed)
    #  @param _case case <string>
    #  @retval list of all processes <list>
项目:progression    作者:cimatosa    | 项目源码 | 文件源码
def test_wrapper_termination():
    progression.log.setLevel(logging.DEBUG)
    shared_pid = progression.UnsignedIntValue()
    p = mp.Process(target = f_wrapper_termination, args = (shared_pid, ))
    p.start()
    time.sleep(2)
    p.terminate()
    p.join(5)

    pid = shared_pid.value 

    if pid != 0:
        if psutil.pid_exists(pid):
            p = psutil.Process(pid)
            while p.is_running():
                print("pid {} is still running, sigkill".format(pid))
                p.send_signal(signal.SIGKILL)
                time.sleep(0.1)

            print("pid {} has stopped now".format(pid))    
            assert False, "the loop process was still running!"
项目:social-listener    作者:topix-hackademy    | 项目源码 | 文件源码
def refresh_status(self):
        """
        Refresh process status.
        If a process is dead or in zombie status a flag 'is_alive' will be setted to False
        :return:
        """
        what_time_is_now = what_time_is_it()
        for process in Connection.Instance().db.manager.find({"is_alive": True}):
            if (not psutil.pid_exists(process['pid'])) or \
                    (psutil.pid_exists(process['pid']) and psutil.Process(process['pid']).status() ==
                        psutil.STATUS_ZOMBIE):
                self.update_process(process['pid'], {
                        'is_alive': False,
                        'last_update': what_time_is_now
                    })
            elif psutil.pid_exists(process['pid']):
                self.update_process(process['pid'], {'last_update': what_time_is_now})
        logging.info('Refresh Done!')
项目:taskSubmitter    作者:1a1a11a    | 项目源码 | 文件源码
def watch(pid, name="no name", check_interval=1): 
    """ check the given process has finished or not 
        if it finishes, then send out email notification 
    :param pid: the pid of monitoring process 
    :param name: the name of process, this is optional, only used for notification 
    :param check_interval: how long the monitoring process should wait before next check 
    :return: None 

    """
    msg = 'task  &ldquo; {} ({}) &rdquo; on _{}_ just began executing! &#9996;'.format(name, pid, socket.gethostname())
    configEmailClient().send_email(message=msg, topic="Task Submitted") 

    while psutil.pid_exists(pid):
        time.sleep(check_interval)

    msg = 'task &ldquo; {} ({}) &rdquo; on _{}_ finished executing &#9995;'.format(name, pid, socket.gethostname())
    configEmailClient().send_email(message=msg, topic="Task Finished")
项目:cli    作者:sparkl    | 项目源码 | 文件源码
def garbage_collect():
    """
    Performs a garbage collection of temp dirs not associated with
    a running process.
    """
    for working_dir in os.listdir(get_working_root()):
        pid = int(working_dir)
        if not psutil.pid_exists(pid):
            obsolete_dir = os.path.join(
                get_working_root(),
                working_dir)
            shutil.rmtree(obsolete_dir)
项目:apocalypse    作者:dhoomakethu    | 项目源码 | 文件源码
def kill(self, pid=None):
        if self.running:
            pid = self._pid if pid is None else pid
            if pid_exists(pid):
                log.debug("killing process with pid %s" % pid)

                process = Process(pid)
                self._out_stream.stop()
                self._error_stream.stop()
                for proc in process.children(recursive=True):
                    proc.kill()
                process.kill()
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def find(prefix, suffixes=None, local_bin=False):

    current_pid = os.getpid()

    for pid in psutil.pids():

        if pid == current_pid or not psutil.pid_exists(pid):
            continue
        process = psutil.Process(pid)

        if process.name() == prefix:
            cmd = process.cmdline()

            if local_bin:
                check = False
                for word in cmd:
                    if '/usr/local/bin' in word:
                        check = True
                if not check:
                    continue

            if suffixes is not None:
                check = False
                for word in cmd:
                    if word in suffixes:
                        check = True
                if not check:
                    continue

            log.warning('Already existing')  # : %s' % cmd)
            return True

    return False
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def terminate(self):
        if self.process and psutil.pid_exists(self.process.pid):
            kill_process_tree(self.log, self.process.pid)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def terminate(self):
        if self.process and psutil.pid_exists(self.process.pid):
            kill_process_tree(self.log, self.process.pid)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def _parent_of_ignores_sigterm(child_process_killed, child_pid,
                                   process_done, setup_done):
        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
                                        args=[child_pid, setup_done])
        child.start()
        if setup_done.acquire(timeout=1.0):
            helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0)
            # Process.is_alive doesnt work with SIGKILL
            if not psutil.pid_exists(child_pid.value):
                child_process_killed.value = 1

        process_done.release()
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, signal.SIGKILL)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, signal.SIGTERM)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_0(self):
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_pid_exists_2(self):
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
项目:CAAPR    作者:Stargrazer82301    | 项目源码 | 文件源码
def run(self):
        time.sleep(10)  # wait after launch before beginning to check for PID
        while psutil.pid_exists(self.masterPID):
            time.sleep(2)
        sys.stderr.write("\n\n----\nlooks like python session that owned this instance of the " +
                         "ZTV gui is gone, so disposing of the window\n----\n")
        wx.CallAfter(pub.sendMessage, 'kill-ztv', msg=None)
项目:CAAPR    作者:Stargrazer82301    | 项目源码 | 文件源码
def run(self):
        time.sleep(10)  # wait after launch before beginning to check for PID
        while psutil.pid_exists(self.masterPID):
            time.sleep(2)
        sys.stderr.write("\n\n----\nlooks like python session that owned this instance of the " +
                         "ZTV gui is gone, so disposing of the window\n----\n")
        wx.CallAfter(pub.sendMessage, 'kill-ztv', msg=None)
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def is_running():
            try:
                if System.Ahenk.get_pid_number() is not None:
                    return psutil.pid_exists(int(System.Ahenk.get_pid_number()))
                else:
                    return False
            except Exception as e:
                return False
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def is_running(pid):
            return psutil.pid_exists(pid)
项目:picire    作者:renatahodovan    | 项目源码 | 文件源码
def _cleanup_slots(self):
        for i, pid in enumerate(self._slots):
            if pid != 0:
                if not psutil.pid_exists(pid):
                    self._slots[i] = 0

    # Target is expected to return True if loop shall continue and False if it
    # should break.
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def check_sh_is_running(pidfile):
    """
    This method deamonizes the sh.py process and redirects standard file descriptors.

    :param pidfile: Name of the pidfile to check
    :type pidfile: str

    :return: True: if SmartHomeNG is running, False: if SmartHome is not running
    :rtype: bool
    """

    pid = read_pidfile(pidfile)
    return psutil.pid_exists(pid) if pid > 0 else False
项目:py_daemoniker    作者:Muterra    | 项目源码 | 文件源码
def test_send(self):
        ''' Test sending signals.
        '''
        python_path = sys.executable
        python_path = os.path.abspath(python_path)
        worker_cmd = ('"' + python_path + '" -c ' +
                      '"import time; time.sleep(60)"')

        with tempfile.TemporaryDirectory() as dirpath:
            pidfile = dirpath + '/pid.pid'

            for sig in [2, signal.SIGTERM, SIGABRT]:
                with self.subTest(sig):
                    worker = subprocess.Popen(
                        worker_cmd
                    )
                    worker_pid = worker.pid

                    with open(pidfile, 'w') as f:
                        f.write(str(worker_pid) + '\n')

                    send(pidfile, sig)
                    worker.wait()
                    self.assertEqual(worker.returncode, int(sig))

                    # Get a false PID so we can test against it as well
                    # Note the mild race condition here
                    bad_pid = os.getpid()
                    while psutil.pid_exists(bad_pid):
                        bad_pid = random.randint(1000, 99999)

                    with open(pidfile, 'w') as f:
                        f.write(str(bad_pid) + '\n')

                    with self.assertRaises(OSError):
                        send(pidfile, sig)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGKILL)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGTERM)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_pid_exists(self):
        sproc = get_test_subprocess()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
        # pid 0
        psutil.pid_exists(0) == 0 in psutil.pids()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_pid_exists_2(self):
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGKILL)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGTERM)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_pid_exists(self):
        sproc = get_test_subprocess()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
        # pid 0
        psutil.pid_exists(0) == 0 in psutil.pids()
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_pid_exists_2(self):
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
项目:pydatalab    作者:googledatalab    | 项目源码 | 文件源码
def _wait_and_kill(pid_to_wait, pids_to_kill):
  """ Wait for a process to finish if it exists, and then kill a list of processes.

  Args:
    pid_to_wait: the process to wait for.
    pids_to_kill: a list of processes to kill after the process of pid_to_wait finishes.
  """
  if psutil.pid_exists(pid_to_wait):
    psutil.Process(pid=pid_to_wait).wait()

  for pid_to_kill in pids_to_kill:
    if psutil.pid_exists(pid_to_kill):
      p = psutil.Process(pid=pid_to_kill)
      p.kill()