Python os 模块,nice() 实例源码

我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用os.nice()

项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def per_process_init():
    # type: () -> None
    """Prepare a worker process: lower its priority and ignore SIGINT.

    Failing to change the priority is tolerated silently; installing the
    SIGINT handler is not guarded.
    """
    try:
        os.nice(19)
    except (AttributeError, OSError):
        # AttributeError: nice is not available everywhere.
        # OSError: when this program is already running on the nicest
        # level (20) on OS X it is not permitted to change the priority.
        pass
    # A keyboard interrupt disrupts the communication between a
    # Python script and its subprocesses when using multiprocessing.
    # The child can ignore SIGINT and is properly shut down
    # by a pool.terminate() call in case of a keyboard interrupt
    # or an early generator exit.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
项目:n1mm_view    作者:n1kdo    | 项目源码 | 文件源码
def update_charts(q, event, size):
    """Reload chart data in a loop until ``event`` is set.

    Status messages are posted to ``q`` as ``(CRAWL_MESSAGE, 4, text, ...)``
    tuples. Any exception raised while loading is logged and reported on
    ``q`` instead of propagating.

    Args:
        q: Queue-like object; only ``put`` is used here.
        event: Event-like object; ``is_set`` ends the loop and ``wait`` is
            used as the interruptible delay between reloads.
        size: Passed through unchanged to ``load_data``.
    """
    try:
        os.nice(10)
    except AttributeError:
        # os.nice does not exist on every platform (e.g. Windows).
        logging.warning("can't be nice to windows")
    q.put((CRAWL_MESSAGE, 4, 'Chart engine starting...'))
    base_map = graphics.create_map()
    last_qso_timestamp = 0
    q.put((CRAWL_MESSAGE, 4, ''))

    try:
        while not event.is_set():
            t0 = time.time()
            last_qso_timestamp = load_data(size, q, base_map, last_qso_timestamp)
            t1 = time.time()
            delta = t1 - t0
            # Subtract the time spent loading so reloads start roughly one
            # dwell period apart.
            update_delay = config.DATA_DWELL_TIME - delta
            if update_delay < 0:
                # Loading took longer than the dwell time: wait a full period.
                update_delay = config.DATA_DWELL_TIME
            logging.debug('Next data update in %f seconds', update_delay)
            event.wait(update_delay)
    except Exception as e:
        logging.exception('Exception in update_charts', exc_info=e)
        q.put((CRAWL_MESSAGE, 4, 'Chart engine failed.', graphics.YELLOW, graphics.RED))
项目:pytrip    作者:pytrip    | 项目源码 | 文件源码
def calculate_angle_quality_thread(self,
                                       voi,
                                       gantry,
                                       couch,
                                       calculate_from=0,
                                       stepsize=1.0,
                                       q=None,
                                       avoid=None,
                                       voi_cube=None,
                                       gradient=True):
        """Compute the angle quality for every couch angle and post each result to ``q``.

        The process nice value is raised by 1 before the work starts.

        Args:
            voi, calculate_from, stepsize, voi_cube, gradient: Forwarded
                unchanged to ``self.calculate_angle_quality``.
            gantry: Gantry angle; forwarded and echoed in every result.
            couch: Iterable of couch angles to evaluate.
            q: Queue-like object receiving one
                ``{"couch": ..., "gantry": ..., "data": ...}`` dict per couch
                angle. Required in practice although it defaults to ``None``.
            avoid: Forwarded to ``self.calculate_angle_quality``. ``None``
                (the default) means a fresh empty list for this call.
        """
        # A literal ``[]`` default would be one list shared by every call.
        if avoid is None:
            avoid = []
        os.nice(1)
        for couch_angle in couch:
            qual = self.calculate_angle_quality(voi, gantry, couch_angle, calculate_from, stepsize, avoid, voi_cube,
                                                gradient)
            q.put({"couch": couch_angle, "gantry": gantry, "data": qual})
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def parseConfiguration(self):
        """Apply logging/nice settings from the datastore and rebuild the recipe caches.

        Reads ``BB_VERBOSE_LOGS``, ``BB_NICE_LEVEL`` and ``BBFILE_COLLECTIONS``
        from ``self.data``.
        """
        # Set log file verbosity
        if bb.utils.to_boolean(self.data.getVar("BB_VERBOSE_LOGS", False)):
            bb.msg.loggerVerboseLogs = True

        # Change nice level if we're asked to
        requested_level = self.data.getVar("BB_NICE_LEVEL", True)
        if requested_level:
            # os.nice() takes an increment, so convert the absolute target
            # into a delta from the current level.
            current_level = os.nice(0)
            increment = int(requested_level) - current_level
            buildlog.verbose("Renice to %s " % os.nice(increment))

        # Drop any previous caches, then create one per multiconfig.
        if self.recipecaches:
            del self.recipecaches
        self.multiconfigs = self.databuilder.mcdata.keys()
        self.recipecaches = {}
        for config_name in self.multiconfigs:
            self.recipecaches[config_name] = bb.cache.CacheData(self.caches_array)

        collections = self.data.getVar("BBFILE_COLLECTIONS", True)
        self.handleCollections(collections)
def make_ggplot_figures(self):
        """Run ``nanopore_read_analysis.R`` on this object's tsv file and echo its stderr.

        The R script is looked up next to this source file and launched at
        the lowest CPU priority. If the tsv file does not exist a message is
        printed and nothing is run.
        """
        tsv_path = self.get_tsv_path()
        if not os.path.isfile(tsv_path):
            print('Could not find tsv file: ' + tsv_path)
            return

        script_name = 'nanopore_read_analysis.R'
        here = os.path.dirname(os.path.realpath(__file__))
        r_script = os.path.join(here, script_name)
        print()
        print(' '.join([script_name, tsv_path]), flush=True)

        proc = subprocess.Popen([r_script, tsv_path],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                preexec_fn=lambda: os.nice(20))
        _, err = proc.communicate()
        proc.wait()
        # Only stderr is echoed, indented by two spaces.
        for line in err.decode().splitlines():
            print('  ' + line)
项目:jack    作者:jack-cli-cd-ripper    | 项目源码 | 文件源码
def start_new_process(args, nice_value=0):
    """Start a new process in a pty and renice it.

    Args:
        args: Command and arguments; ``args[0]`` is the program to execute.
        nice_value: Increment passed to ``os.nice`` in the child; skipped
            when falsy.

    Returns:
        In the parent, a dict describing the child with the keys
        ``start_time``, ``pid``, ``fd``, ``file``, ``cmd``, ``buf``, ``otf``,
        ``percent`` and ``elapsed``. The child branch replaces its process
        image via ``os.execvp`` and does not return on success.
    """
    data = {}
    # Taken before the fork, so it slightly precedes the child's real start.
    data['start_time'] = time.time()
    pid, master_fd = pty.fork()
    # NOTE(review): CHILD is presumably pty.CHILD (0) -- confirm against the imports.
    if pid == CHILD:
        # NOTE(review): default_signals() presumably restores default signal handlers -- confirm.
        default_signals()
        if nice_value:
            os.nice(nice_value)
        # Arguments are encoded with the configured charset; unencodable
        # characters are replaced rather than raising.
        os.execvp(args[0], [a.encode(cf['_charset'], "replace") for a in args])
    else:
        data['pid'] = pid
        # The pty master is switched to non-blocking mode on Linux only.
        if os.uname()[0] == "Linux":
            fcntl.fcntl(master_fd, F_SETFL, O_NONBLOCK)
        data['fd'] = master_fd
        data['file'] = os.fdopen(master_fd)
        data['cmd'] = args
        data['buf'] = ""
        data['otf'] = 0
        data['percent'] = 0
        data['elapsed'] = 0
        return data
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def nice(level):
            """Accept a nice level and do nothing with it; always returns ``None``."""
            return None
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def beNice(very_nice=False):
        """Call ``nice`` with 10 when ``very_nice`` is truthy, otherwise with 5."""
        nice(10 if very_nice else 5)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def test_popen_nice(self, makegateway):
        """A ``popen//nice=5`` gateway must report a nice level 5 above a plain ``popen`` one."""

        def getnice(channel):
            import os
            channel.send(os.nice(0) if hasattr(os, 'nice') else None)

        gw = makegateway("popen")
        remotenice = gw.remote_exec(getnice).receive()
        gw.exit()
        if remotenice is None:
            # The remote side has no os.nice: nothing to compare.
            return
        gw = makegateway("popen//nice=5")
        remotenice2 = gw.remote_exec(getnice).receive()
        assert remotenice2 == remotenice + 5
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:Nanopore-read-processor    作者:rrwick    | 项目源码 | 文件源码
def gzip_fast5s(self):
        """Archive every directory in ``self.all_dirs`` into one gzipped tarball.

        Any existing tarball at ``self.get_tarball_path()`` is removed first.
        Directories are stored relative to ``self.base_dir``. The archive is
        written by ``tar`` piped through ``pigz`` when pigz is on the PATH,
        otherwise by ``tar -z``. The command runs at the lowest CPU priority
        and its exit status is not checked.

        Raises:
            AssertionError: If a directory does not start with ``self.base_dir``.
        """
        # Local import so this method does not rely on a file-level import.
        import shlex

        tarball = self.get_tarball_path()
        if os.path.isfile(tarball):
            os.remove(tarball)

        print('\ngzipping reads into a tar.gz file...', flush=True)

        base_dir = self.base_dir
        if not base_dir.endswith('/'):
            base_dir += '/'

        for directory in self.all_dirs:
            assert directory.startswith(self.base_dir)

        # Quote every path: the command goes through a shell, so spaces or
        # shell metacharacters would otherwise split or alter it. Empty
        # relative names (a directory equal to base_dir) are skipped.
        rel_dirs = [x[len(base_dir):] for x in self.all_dirs]
        quoted_dirs = ' '.join(shlex.quote(d) for d in rel_dirs if d)
        quoted_tarball = shlex.quote(tarball)

        current_dir = os.getcwd()
        os.chdir(self.base_dir)
        try:
            # Use pigz if available to speed up the compression.
            if shutil.which('pigz'):
                tar_cmd = 'tar cf - ' + quoted_dirs + ' | pigz -9 -p 8 > ' + quoted_tarball
            else:
                # A separating space is required between the archive name and
                # the first directory.
                tar_cmd = 'tar -czvf ' + quoted_tarball + ' ' + quoted_dirs

            tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   preexec_fn=lambda: os.nice(20), shell=True)
            # communicate() drains both pipes and waits for the process.
            tar.communicate()
        finally:
            # Restore the working directory even if the launch failed.
            os.chdir(current_dir)
项目:Nanopore-read-processor    作者:rrwick    | 项目源码 | 文件源码
def tarball_count_is_correct(self):
        """Check that the tarball holds as many ``.fast5`` entries as ``self.all_fast5_files``.

        The archive listing is produced with ``pigz`` when it is on the PATH
        and with ``tar -z`` otherwise, so the check also works on machines
        without pigz. ``'all good!'`` is printed only when the counts match.

        Returns:
            bool: True when the counts are equal.
        """
        # Local imports so this method does not rely on file-level imports.
        import shlex
        import shutil

        tarball = self.get_tarball_path()
        print('Checking that fast5 file count matches that in ' + os.path.basename(tarball) +
              '...  ', end='')
        fast5_count = len(self.all_fast5_files)

        # The command goes through a shell, so the path must be quoted.
        quoted_tarball = shlex.quote(tarball)
        if shutil.which('pigz'):
            list_cmd = 'pigz -dc ' + quoted_tarball + ' | tar tf -'
        else:
            list_cmd = 'tar -tzf ' + quoted_tarball
        # The dot is escaped so only a literal ".fast5" suffix is counted; the
        # pattern needs no Perl-regex features, so plain grep is enough.
        tar_count_cmd = list_cmd + ' | grep "\\.fast5$" | wc -l'

        p = subprocess.Popen(tar_count_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             preexec_fn=lambda: os.nice(20), shell=True)
        out, _ = p.communicate()
        tar_count = int(out.decode())
        match = tar_count == fast5_count
        if match:
            print('all good!')
        return match
项目:Nanopore-read-processor    作者:rrwick    | 项目源码 | 文件源码
def nanonetcall(directory, threads):
    """Run ``nanonetcall`` on ``directory`` and discard its fastq output.

    The fastq stream goes to a randomly named temporary file in the current
    working directory. That file is removed afterwards, including when
    launching or running the command raises.

    Args:
        directory: Directory passed to ``nanonetcall`` as its input.
        threads: Value for the ``--jobs`` option.
    """
    temp_fastq = 'temp_' + str(random.randint(0, 100000000)) + '.fastq'
    nanonetcall_cmd = ['nanonetcall', '--fastq', '--write_events', '--jobs', str(threads),
                       '--max_len', '100000', directory]
    try:
        with open(temp_fastq, 'wb') as fastq_out:
            p = subprocess.Popen(nanonetcall_cmd, stdout=fastq_out, stderr=subprocess.PIPE,
                                 preexec_fn=lambda: os.nice(20))
            # communicate() drains stderr and waits for the process to finish.
            p.communicate()
    finally:
        # open() has already created the file by the time Popen can fail
        # (for example when the executable is missing), so always clean up.
        remove_if_exists(temp_fastq)
项目:Nanopore-read-processor    作者:rrwick    | 项目源码 | 文件源码
def nanonetcall_2d(directory, threads):
    """Run ``nanonet2d`` on ``directory`` and delete the fastq files it writes.

    Args:
        directory: Directory passed to ``nanonet2d`` as its input.
        threads: Value for the ``--jobs`` option.
    """
    temp_prefix = 'temp_' + str(random.randint(0, 100000000))
    nanonet2d_cmd = ['nanonet2d', '--fastq', '--write_events',
                     '--jobs', str(threads),
                     '--max_len', '100000',
                     directory, temp_prefix]
    proc = subprocess.Popen(nanonet2d_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            preexec_fn=lambda: os.nice(20))
    proc.communicate()
    proc.wait()
    # Remove the three outputs that share the temporary prefix.
    for suffix in ('_template.fastq', '_complement.fastq', '_2d.fastq'):
        remove_if_exists(temp_prefix + suffix)
项目:godot-python    作者:touilleMan    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:godot-python    作者:touilleMan    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:Mac-Python-3.X    作者:L1nwatch    | 项目源码 | 文件源码
def main():
    """Print a handful of process-related values from the ``os`` module, in order."""
    probes = (
        lambda: os.nice(0),       # get relative process priority
        lambda: os.nice(1),       # change relative priority
        lambda: os.times(),       # process times: system, user etc...
        lambda: os.isatty(0),     # is the file descriptor arg a tty?(0 = stdin)
        lambda: os.isatty(4),     # 4 is just an arbitrary test value
        lambda: os.getloadavg(),  # UNIX only - number of processes in queue
        lambda: os.cpu_count(),   # New in Python 3.4
    )
    # Each probe is evaluated right before it is printed, so the calls
    # happen in the listed order.
    for probe in probes:
        print(probe())
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def _partition_init_worker():
    """Make the current process ignore SIGINT, then raise its nice value by 5."""
    import os
    import signal

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    os.nice(5)
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def init_worker():
    """Make the current process ignore SIGINT, then raise its nice value by 5."""
    import os
    import signal

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    os.nice(5)
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:py    作者:pytest-dev    | 项目源码 | 文件源码
def _child(self, nice_level, child_on_start, child_on_exit):
        """Run ``self.fun``, record its outcome, then terminate the process.

        This method does not return: its last statement is ``os._exit``.
        NOTE(review): presumably called only in a freshly forked child --
        confirm at the call site.

        Args:
            nice_level: Increment passed to ``os.nice``; skipped when falsy.
            child_on_start: Optional zero-argument callable run before
                ``self.fun``.
            child_on_exit: Optional zero-argument callable run after the
                return value has been written.
        """
        # right now we need to call a function, but first we need to
        # map all IO that might happen
        sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT)
        sys.stderr = stderr = get_unbuffered_io(2, self.STDERR)
        retvalf = self.RETVAL.open("wb")
        EXITSTATUS = 0
        try:
            # os.nice sits outside the inner try, so a failure here is not
            # reported through stderr/EXITSTATUS; it propagates once the
            # finally clause has closed the files.
            if nice_level:
                os.nice(nice_level)
            try:
                if child_on_start is not None:
                    child_on_start()
                retval = self.fun(*self.args, **self.kwargs)
                # The return value is handed back as a marshal dump.
                retvalf.write(marshal.dumps(retval))
                if child_on_exit is not None:
                    child_on_exit()
            except:
                # Bare except: anything raised by the hooks or by self.fun is
                # written to stderr and signalled through the exit status.
                excinfo = py.code.ExceptionInfo()
                stderr.write(str(excinfo._getreprcrash()))
                EXITSTATUS = self.EXITSTATUS_EXCEPTION
        finally:
            stdout.close()
            stderr.close()
            retvalf.close()
        # Close the raw descriptors as well, then exit without running
        # interpreter cleanup.
        os.close(1)
        os.close(2)
        os._exit(EXITSTATUS)
项目:xpybuild    作者:xpybuild    | 项目源码 | 文件源码
def lowerCurrentProcessPriority():
    """Lower the priority of the current process.

    On Windows the process is moved to the below-normal priority class.
    Elsewhere the nice value is raised by 1, but only when it is currently 0.
    """
    if not buildcommon.isWindows():
        # on unix, people may run nice before executing the process, so
        # only change the priority unilaterally if it's currently at its
        # default value
        still_default = os.nice(0) == 0
        if still_default:
            os.nice(1) # change to 1 below the current level
        return

    import win32process, win32api,win32con
    current_process = win32api.GetCurrentProcess()
    win32process.SetPriorityClass(current_process, win32process.BELOW_NORMAL_PRIORITY_CLASS)
项目:anglerfish    作者:juancarlospaco    | 项目源码 | 文件源码
def set_process_priority(nice: bool=True, ionice: bool=False,
                         cpulimit: int=0) -> bool:
    """Lower this process's priority using the first mechanism that applies.

    The options are checked in order and only one is applied: ``nice``
    (CPU niceness), then ``ionice`` (I/O class, if the ``ionice`` tool is
    found), then ``cpulimit`` (a background ``cpulimit`` subprocess, if the
    tool is found; values of 4 or less become 5).

    Returns:
        bool: False if any exception was raised (it is logged as a warning),
        True otherwise, including when no option applied.
    """
    gui_warning = " may delay I/O Operations, not recommended on user-facing GUI!."
    try:
        if nice:
            previous = os.getpriority(os.PRIO_PROCESS, 0)
            os.nice(19)  # smooth cpu priority
            log.debug(f"Process CPUs Priority set: from {previous} to 19.")
        elif ionice and which("ionice"):
            log.warning("ionice" + gui_warning)
            ionice_cmd = f"{which('ionice')} --ignore --class 3 --pid {os.getpid()}"
            call(ionice_cmd, shell=True)  # I/O nice,should work on Linux and Os X
            log.debug(f"Process PID {os.getpid()} I/O Priority set to: {ionice_cmd}")
        elif cpulimit and which("cpulimit"):
            log.warning("cpulimit" + gui_warning)
            log.debug("Launching 1 background 'cpulimit' child subprocess...")
            limit = int(cpulimit) if cpulimit > 4 else 5  # makes 5 the min.
            limit_cmd = (f"{which('cpulimit')} --include-children "
                         f"--pid={os.getpid()} --limit={limit}")
            limiter = Popen(limit_cmd, shell=True)  # This launch a subprocess.
            atexit.register(limiter.kill)  # Force Kill subprocess at exit.
            log.debug(f"Process CPUs Max Limits capped to: {limit_cmd}.")
    except Exception as error:
        log.warning(error)
        return False  # this may fail on windows and its normal, so be silent.
    return True