Python os 模块,getpgrp() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用os.getpgrp()

项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def _is_daemon():
    """Return True when the process appears to run in the background.

    The process group for a foreground process will match the process
    group of the controlling terminal. If those values do not match, or
    stdout has no usable terminal behind it, the process is assumed to be
    running in the background as a daemon.
    http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics

    :returns: True if the process looks like a daemon, False otherwise.
    :raises OSError: for any terminal query failure other than ENOTTY.
    """
    try:
        is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
    except UnsupportedOperation:
        # Could not get the fileno for stdout, so we must be a daemon.
        # This handler has to come before OSError: on Python 3
        # io.UnsupportedOperation is a subclass of OSError, so the other
        # order would re-raise it instead of handling it here.
        is_daemon = True
    except OSError as err:
        if err.errno == errno.ENOTTY:
            # Assume we are a daemon because there is no terminal.
            is_daemon = True
        else:
            raise
    return is_daemon
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def _is_daemon():
    """Tell whether this process looks like a background daemon.

    A foreground process shares its process group with the controlling
    terminal. A mismatch, a stdout without a file descriptor, or a stdout
    that is not a terminal (ENOTTY) are all taken to mean "daemon".
    http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
    """
    try:
        own_group = os.getpgrp()
        terminal_group = os.tcgetpgrp(sys.stdout.fileno())
    except io.UnsupportedOperation:
        # stdout cannot provide a fileno, so there is no terminal to ask.
        return True
    except OSError as exc:
        if exc.errno != errno.ENOTTY:
            raise
        # stdout is not attached to a terminal at all.
        return True
    return own_group != terminal_group
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Run a mutating TIOCGPGRP ioctl into an int array and verify it.

        When ``nbytes`` is given the array is sized to exactly that many
        bytes; otherwise it holds a single int.
        """
        # A fill value unlikely to collide with a real pgrp or session id.
        sentinel = -12345
        expected = (os.getpgrp(), os.getsid(0))
        scratch = array.array('i')
        width = scratch.itemsize
        if nbytes is None:
            scratch.append(sentinel)
        else:
            # Grow the buffer so that it is exactly `nbytes` bytes long.
            scratch.extend([sentinel] * (nbytes // width))
            self.assertEqual(len(scratch) * width, nbytes)   # sanity check
        with open("/dev/tty", "rb") as tty:
            status = fcntl.ioctl(tty, termios.TIOCGPGRP, scratch, 1)
        reported = scratch[0]
        self.assertEqual(status, 0)
        self.assertIn(reported, expected)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Verify that TIOCGPGRP writes its answer into a mutable int array.

        ``nbytes`` (optional) fixes the total byte length of the array;
        without it the array contains one int.
        """
        placeholder = -12345  # unlikely to equal a real pgrp/session id
        candidates = (os.getpgrp(), os.getsid(0))
        ints = array.array('i')
        item_bytes = ints.itemsize
        if nbytes is None:
            ints.append(placeholder)
        else:
            # Make the buffer exactly `nbytes` bytes long.
            ints.extend([placeholder] * (nbytes // item_bytes))
            self.assertEqual(len(ints) * item_bytes, nbytes)   # sanity check
        with open("/dev/tty", "r") as tty:
            rc = fcntl.ioctl(tty, termios.TIOCGPGRP, ints, 1)
        got = ints[0]
        self.assertEqual(rc, 0)
        self.assertIn(got, candidates)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Check the mutate-in-place form of the TIOCGPGRP ioctl.

        The int array passed to ioctl is either one element long or, when
        ``nbytes`` is supplied, exactly ``nbytes`` bytes long.
        """
        filler = -12345  # value that should never be a valid id
        valid_ids = (os.getpgrp(), os.getsid(0))
        target = array.array('i')
        int_bytes = target.itemsize
        if nbytes is None:
            target.append(filler)
        else:
            # Size the buffer to exactly `nbytes` bytes.
            target.extend([filler] * (nbytes // int_bytes))
            self.assertEqual(len(target) * int_bytes, nbytes)   # sanity check
        with open("/dev/tty", "r") as tty:
            result = fcntl.ioctl(tty, termios.TIOCGPGRP, target, 1)
        first = target[0]
        self.assertEqual(result, 0)
        self.assertIn(first, valid_ids)
项目:STaaS    作者:CESNET    | 项目源码 | 文件源码
def getDebug(self):
        """Return a nested dict describing the request, database and process.

        The result has the keys "environment", "client", "database",
        "system" and "process"; the latter two are filled from `os` calls.
        """
        snapshot = {}
        snapshot["environment"] = self.req.env
        snapshot["client"] = self.req.client.__dict__
        snapshot["database"] = self.db.get_debug()
        snapshot["system"] = {"uname": os.uname()}

        # Identity and location of the running process.
        process_info = {}
        process_info["cwd"] = os.getcwdu()
        process_info["pid"] = os.getpid()
        process_info["ppid"] = os.getppid()
        process_info["pgrp"] = os.getpgrp()
        process_info["uid"] = os.getuid()
        process_info["gid"] = os.getgid()
        process_info["euid"] = os.geteuid()
        process_info["egid"] = os.getegid()
        process_info["groups"] = os.getgroups()
        snapshot["process"] = process_info
        return snapshot
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Exercise fcntl.ioctl with a mutable int array and TIOCGPGRP.

        Pass ``nbytes`` to make the array exactly that many bytes long;
        by default it holds one int.
        """
        # Marker value that should not appear among the expected ids.
        marker = -12345
        allowed = (os.getpgrp(), os.getsid(0))
        arr = array.array('i')
        per_int = arr.itemsize
        if nbytes is None:
            arr.append(marker)
        else:
            # Fill the array up to exactly `nbytes` bytes.
            arr.extend([marker] * (nbytes // per_int))
            self.assertEqual(len(arr) * per_int, nbytes)   # sanity check
        with open("/dev/tty", "rb") as tty:
            ret = fcntl.ioctl(tty, termios.TIOCGPGRP, arr, 1)
        head = arr[0]
        self.assertEqual(ret, 0)
        self.assertIn(head, allowed)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Confirm TIOCGPGRP stores its result in the supplied int array.

        The array is one int long unless ``nbytes`` requests a specific
        total size in bytes.
        """
        dummy = -12345  # not expected to match any real id
        accepted = (os.getpgrp(), os.getsid(0))
        storage = array.array('i')
        unit = storage.itemsize
        if nbytes is None:
            storage.append(dummy)
        else:
            # Extend the array so its size is exactly `nbytes` bytes.
            storage.extend([dummy] * (nbytes // unit))
            self.assertEqual(len(storage) * unit, nbytes)   # sanity check
        with open("/dev/tty", "r") as tty:
            code = fcntl.ioctl(tty, termios.TIOCGPGRP, storage, 1)
        value = storage[0]
        self.assertEqual(code, 0)
        self.assertIn(value, accepted)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Call TIOCGPGRP with a writable int array and inspect the outcome.

        ``nbytes``, when not None, is the exact byte length the array is
        given before the call; otherwise the array has a single element.
        """
        # Pre-fill value that is unlikely to be one of the expected ids.
        prefill = -12345
        plausible = (os.getpgrp(), os.getsid(0))
        out = array.array('i')
        step = out.itemsize
        if nbytes is None:
            out.append(prefill)
        else:
            # Bring the array to exactly `nbytes` bytes.
            out.extend([prefill] * (nbytes // step))
            self.assertEqual(len(out) * step, nbytes)   # sanity check
        with open("/dev/tty", "rb") as tty:
            returned = fcntl.ioctl(tty, termios.TIOCGPGRP, out, 1)
        leading = out[0]
        self.assertEqual(returned, 0)
        self.assertIn(leading, plausible)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Test helper: TIOCGPGRP into an int array of a chosen byte length.

        Without ``nbytes`` the array holds one int; with it the array is
        padded to exactly ``nbytes`` bytes.
        """
        pad = -12345  # should not coincide with a pgrp or session id
        known_ids = (os.getpgrp(), os.getsid(0))
        cells = array.array('i')
        cell_size = cells.itemsize
        if nbytes is None:
            cells.append(pad)
        else:
            # Pad the array to exactly `nbytes` bytes.
            cells.extend([pad] * (nbytes // cell_size))
            self.assertEqual(len(cells) * cell_size, nbytes)   # sanity check
        with open("/dev/tty", "r") as tty:
            outcome = fcntl.ioctl(tty, termios.TIOCGPGRP, cells, 1)
        answer = cells[0]
        self.assertEqual(outcome, 0)
        self.assertIn(answer, known_ids)
项目:TikZ    作者:ellisk42    | 项目源码 | 文件源码
def execute(self,dt):
        """Run (or resume) the child for one time slice of length `dt`.

        Returns "finished" if a previous call already completed,
        "still running" if the child is still alive after `join(dt)` (it is
        then stopped with SIGSTOP), or the value of `self.q.get()` once the
        child has exited.

        NOTE(review): `dt` is passed straight to `self.process.join`;
        presumably a timeout in seconds -- confirm against the Process type.
        """
        if self.finished: return "finished"
        if not self.running:
            # First slice: start the child and move it into its own process
            # group so it can be signalled via killpg without touching us.
            self.process = Process(target = executeInProcessGroup, args = (self,))
            self.process.start()
            print "timeshare child PID:",self.process.pid
            os.setpgid(self.process.pid,self.process.pid)
            print "timeshare process group",os.getpgid(self.process.pid)
            # The child must now lead a group whose id equals its own pid...
            assert os.getpgid(self.process.pid) == self.process.pid
            print "my process group",os.getpgrp(),"which should be",os.getpgid(0)
            # ...and that group must differ from the caller's group.
            assert os.getpgid(self.process.pid) != os.getpgid(0)
            self.running = True
        else:
            # Later slices: resume the group that was stopped last time.
            os.killpg(self.process.pid, signal.SIGCONT)

        self.process.join(dt)
        if self.process.is_alive():
            # Slice is over but the child is not done: pause its whole group.
            os.killpg(self.process.pid, signal.SIGSTOP)
            return "still running"
        else:
            self.finished = True
            # NOTE(review): presumably the child puts its result on self.q;
            # verify against executeInProcessGroup.
            return self.q.get()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _check_ioctl_mutate_len(self, nbytes=None):
        """Validate the buffer-mutating TIOCGPGRP ioctl on /dev/tty.

        The int array handed to ioctl has one element, or exactly
        ``nbytes`` bytes when that argument is provided.
        """
        # Initial content; chosen so it is unlikely to be a valid id.
        initial = -12345
        good_ids = (os.getpgrp(), os.getsid(0))
        holder = array.array('i')
        elem_bytes = holder.itemsize
        if nbytes is None:
            holder.append(initial)
        else:
            # Lengthen the array to exactly `nbytes` bytes.
            holder.extend([initial] * (nbytes // elem_bytes))
            self.assertEqual(len(holder) * elem_bytes, nbytes)   # sanity check
        with open("/dev/tty", "rb") as tty:
            retval = fcntl.ioctl(tty, termios.TIOCGPGRP, holder, 1)
        pgrp_seen = holder[0]
        self.assertEqual(retval, 0)
        self.assertIn(pgrp_seen, good_ids)
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def show_setting_prgrp():
    """Make this process its own group leader and print the new group id."""
    print('Calling os.setpgrp() from {}'.format(os.getpid()))
    os.setpgrp()
    # The template has a single placeholder, so pass only the process group;
    # previously the pid filled the slot and os.getpgrp() was silently dropped.
    print('Process group is now {}'.format(os.getpgrp()))
    sys.stdout.flush()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ioctl(self):
        """Query TIOCGPGRP on the controlling terminal and check the id."""
        # A backgrounded process gets the session ID back from TIOCGPGRP
        # instead of its process group id, so either value is acceptable.
        acceptable = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as terminal:
            raw = fcntl.ioctl(terminal, termios.TIOCGPGRP, "    ")
            (reported,) = struct.unpack("i", raw)
            self.assertIn(reported, acceptable)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def main(args):
    """Start a new session unless already a group leader, then run the task.

    Returns whatever `run_task_handler(ReleaseUpgrader, args)` returns.
    """
    leads_own_group = os.getpgrp() == os.getpid()
    if not leads_own_group:
        os.setsid()
    result = run_task_handler(ReleaseUpgrader, args)
    return result
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def main(args):
    """Detach into a new session if needed, then run the package changer.

    Returns whatever `run_task_handler(PackageChanger, args)` returns.
    """
    group_id = os.getpgrp()
    if group_id != os.getpid():
        # Not a process group leader yet, so become a session leader.
        os.setsid()
    outcome = run_task_handler(PackageChanger, args)
    return outcome
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_ioctl(self):
        """Check that TIOCGPGRP on /dev/tty reports this process's group."""
        # If this process has been put into the background, TIOCGPGRP returns
        # the session ID instead of the process group id.
        ids = (os.getpgrp(), os.getsid(0))
        # Use a context manager so the terminal handle is always closed,
        # even when ioctl raises; a bare open() leaked the descriptor.
        with open("/dev/tty", "r") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, "    ")
        rpgrp = struct.unpack("i", r)[0]
        self.assertIn(rpgrp, ids)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_ioctl(self):
        """Check that TIOCGPGRP on /dev/tty reports this process's group."""
        # If this process has been put into the background, TIOCGPGRP returns
        # the session ID instead of the process group id.
        ids = (os.getpgrp(), os.getsid(0))
        # Close the terminal deterministically; the handle used to be
        # left open for the garbage collector to reclaim.
        with open("/dev/tty", "r") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, "    ")
        rpgrp = struct.unpack("i", r)[0]
        self.assertIn(rpgrp, ids)
项目:timmy    作者:openstack    | 项目源码 | 文件源码
def setup_handle_sig(subprocess=False):
    """Become a process group leader and install the signal handlers.

    SIGINT, SIGTERM and SIGHUP go to `sub_handle_sig` when ``subprocess``
    is true and to `main_handle_sig` otherwise; SIGUSR1 always goes to
    `handle_sig_usr1`.
    """
    if os.getpgrp() != os.getpid():
        os.setpgrp()
    if subprocess:
        handler = sub_handle_sig
    else:
        handler = main_handle_sig
    for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(signum, handler)
    signal.signal(signal.SIGUSR1, handle_sig_usr1)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ioctl(self):
        """Read the terminal's process group via ioctl and validate it."""
        # TIOCGPGRP yields the session ID rather than the process group id
        # when this process has been moved to the background.
        expected_ids = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as tty_file:
            packed = fcntl.ioctl(tty_file, termios.TIOCGPGRP, "    ")
            (pgrp_from_tty,) = struct.unpack("i", packed)
            self.assertIn(pgrp_from_tty, expected_ids)
项目:CVNDSpider    作者:ddxmaaa    | 项目源码 | 文件源码
def term(sig_num, addtion):
    """Signal handler that sends SIGKILL to the caller's whole process group.

    :param sig_num: number of the signal being handled (unused).
    :param addtion: second handler argument supplied by `signal` (unused).
    """
    try:
        os.killpg(os.getpgrp(), signal.SIGKILL)
    except OSError:
        # Best effort: ignore failures of the kill itself, but do not hide
        # unrelated errors the way a bare `except:` did.
        pass
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_ioctl(self):
        """Check that TIOCGPGRP on /dev/tty reports this process's group."""
        # If this process has been put into the background, TIOCGPGRP returns
        # the session ID instead of the process group id.
        ids = (os.getpgrp(), os.getsid(0))
        # `with` guarantees the terminal handle is closed; the previous
        # bare open() never closed it.
        with open("/dev/tty", "r") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, "    ")
        rpgrp = struct.unpack("i", r)[0]
        self.assertIn(rpgrp, ids)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ioctl(self):
        """Ask the controlling terminal for its process group and check it."""
        # For a process sent to the background TIOCGPGRP answers with the
        # session ID, not the process group id; accept both.
        candidates = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as dev_tty:
            reply = fcntl.ioctl(dev_tty, termios.TIOCGPGRP, "    ")
            (group_id,) = struct.unpack("i", reply)
            self.assertIn(group_id, candidates)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_ioctl(self):
        """Check that TIOCGPGRP on /dev/tty reports this process's group."""
        # If this process has been put into the background, TIOCGPGRP returns
        # the session ID instead of the process group id.
        ids = (os.getpgrp(), os.getsid(0))
        # Open the terminal in a `with` block so the descriptor is released
        # as soon as the ioctl is done instead of being leaked.
        with open("/dev/tty", "r") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, "    ")
        rpgrp = struct.unpack("i", r)[0]
        self.assertIn(rpgrp, ids)
项目:buildroot    作者:flutter    | 项目源码 | 文件源码
def _GetPidForLock():
    """Return the id used to key host_forwarder initialization.

    The value is the current process group id, so that the initial
    "sharder" process and the processes it forks share one lock identity.
    """
    lock_pid = os.getpgrp()
    return lock_pid
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ioctl(self):
        """Fetch the foreground process group of /dev/tty and verify it."""
        # When running in the background, TIOCGPGRP hands back the session
        # ID in place of the process group id, so both are valid results.
        valid = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as handle:
            buf = fcntl.ioctl(handle, termios.TIOCGPGRP, "    ")
            (seen,) = struct.unpack("i", buf)
            self.assertIn(seen, valid)
项目:slug    作者:xonsh    | 项目源码 | 文件源码
def test_pgid():
    """A started ProcessGroup gets its own pgid, shared by all members."""
    with ProcessGroup() as pg:
        # Three identical children, each running "input()".
        for _ in range(3):
            pg.add(Process(runpy("input()")))
    pg.start()
    assert pg.pgid is not None
    assert all(pg.pgid == member.pgid for member in pg)
    # The group must not be the one this test process belongs to.
    assert pg.pgid != os.getpgrp()
    pg.kill()
项目:maas    作者:maas    | 项目源码 | 文件源码
def terminateProcess(
        pid, done, *, term_after=0.0, quit_after=5.0, kill_after=10.0):
    """Terminate process `pid` with escalating signals.

    Three signals are scheduled on the reactor:

      1. SIGTERM to the process itself after `term_after` seconds.
      2. SIGQUIT to the process *group* of `pid` after `quit_after` seconds.
      3. SIGKILL to the process *group* of `pid` after `kill_after` seconds.

    The group signals are guarded: when `pid` shares its process group with
    the invoking process, only `pid` is signalled, so the caller cannot
    kill itself by accident. For best effect, make new processes process
    group leaders soon after spawning them.

    Any signal still pending when `done` fires is cancelled.

    :param pid: The PID to terminate.
    :param done: A `Deferred` that fires when the process exits.
    :param term_after: Delay before SIGTERM is sent.
    :param quit_after: Delay before SIGQUIT is sent.
    :param kill_after: Delay before SIGKILL is sent.
    """
    own_pgid = os.getpgrp()

    def signal_process(signum):
        """Send `signum` to `pid`; a vanished process is not an error."""
        try:
            _os_kill(pid, signum)
        except ProcessLookupError:
            pass  # Already exited.

    def signal_group(signum):
        """Send `signum` to the process group of `pid`.

        Falls back to signalling just `pid` when it lives in the invoking
        process's own group.
        """
        try:
            target_pgid = os.getpgid(pid)
            if target_pgid != own_pgid:
                _os_killpg(target_pgid, signum)
            else:
                _os_kill(pid, signum)
        except ProcessLookupError:
            pass  # Already exited.

    schedule = (
        (term_after, signal_process, signal.SIGTERM),
        (quit_after, signal_group, signal.SIGQUIT),
        (kill_after, signal_group, signal.SIGKILL),
    )
    pending = tuple(
        reactor.callLater(delay, sender, signum)
        for delay, sender, signum in schedule)

    def cancel_pending():
        """Cancel every scheduled signal that has not fired yet."""
        for call in pending:
            if call.active():
                call.cancel()

    done.addBoth(callOut, cancel_pending)
项目:NFStest    作者:thombashi    | 项目源码 | 文件源码
def show_progress(self, done=False):
        """Display progress bar if enabled and if running on correct terminal

        The bar is written to stderr only when SHOWPROG and self.showprog
        are set, when either `done` is true or self.index is a multiple of
        500, and when this process's group is the foreground group of the
        terminal behind stderr.

        done: True for the final update; the overall average rate is then
              shown, a newline is written and self.progdone is set.

        NOTE(review): filesize, wlen and tdelta are used as divisors below
        without a zero check -- confirm callers rule those cases out.
        """
        if SHOWPROG and self.showprog and (done or self.index % 500 == 0) \
          and (os.getpgrp() == os.tcgetpgrp(sys.stderr.fileno())):
            # Ask the terminal on fd 2 for its size; "1234" is only a
            # 4-byte buffer for the two shorts unpacked here.
            rows, columns = struct.unpack('hh', fcntl.ioctl(2, termios.TIOCGWINSZ, "1234"))
            # sps: columns reserved for the text around the bar
            if columns < 100:
                sps = 30
            else:
                # Terminal is wide enough, include bytes/sec
                sps = 42
            # Progress bar length
            wlen = int(columns) - len(str_units(self.filesize)) - sps
            # Number of bytes per progress bar unit
            xunit = float(self.filesize)/wlen
            # Progress bar units done so far
            xdone = int(self.offset/xunit)
            xtime = time.time()
            progress = 100.0*self.offset/self.filesize

            # Display progress only if there is some change in progress
            if (done and not self.progdone) or (self.prevdone != xdone or \
               int(self.prevtime) != int(xtime) or \
               round(self.prevprog) != round(progress)):
                if done:
                    # Do not display progress again when done=True
                    self.progdone = 1
                otime  = xtime - self.timestart # Overall time
                tdelta = xtime - self.prevtime  # Segment time
                # Remember this update so the next call can detect changes
                self.prevprog = progress
                self.prevdone = xdone
                self.prevtime = xtime
                # Number of progress bar units for completion
                slen = wlen - xdone
                if done:
                    # Overall average bytes/sec
                    bps = self.offset / otime
                else:
                    # Segment average bytes/sec
                    bps = (self.offset - self.prevoff) / tdelta
                self.prevoff = self.offset
                # Progress bar has both foreground and background colors
                # as green and in case the terminal does not support colors
                # then a "=" is displayed instead of a green block
                pbar = " [\033[32m\033[42m%s\033[m%s] " % ("="*xdone, " "*slen)
                # Add progress percentage and how many bytes have been
                # processed so far relative to the total number of bytes
                pbar += "%5.1f%% %9s/%s" % (progress, str_units(self.offset), str_units(self.filesize))
                # The trailing "\r" keeps the cursor on the same line so the
                # next update overwrites this one
                if columns < 100:
                    sys.stderr.write("%s %-6s\r" % (pbar, str_time(otime)))
                else:
                    # Terminal is wide enough, include bytes/sec
                    sys.stderr.write("%s %9s/s %-6s\r" % (pbar, str_units(bps), str_time(otime)))
                if done:
                    sys.stderr.write("\n")