Python os 模块,getloadavg() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用os.getloadavg()

项目:sporco    作者:bwohlberg    | 项目源码 | 文件源码
def idle_cpu_count(mincpu=1):
    """Estimate number of idle CPUs, for use by multiprocessing code
    needing to determine how many processes can be run without excessive
    load. This function uses :func:`os.getloadavg` which is only available
    under a Unix OS.

    Parameters
    ----------
    mincpu : int
      Minimum number of CPUs to report, independent of actual estimate

    Returns
    -------
    idle : int
      Estimate of number of idle CPUs
    """

    if PY2:
        ncpu = mp.cpu_count()
    else:
        # os.cpu_count() returns None when the count cannot be determined;
        # fall back to one CPU instead of failing on ``None - float``.
        ncpu = os.cpu_count() or 1
    # The 1-minute load average, rounded down, is subtracted from the CPU
    # count to obtain the estimate of idle CPUs.
    idle = int(ncpu - np.floor(os.getloadavg()[0]))
    return max(mincpu, idle)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def display_loadavg(scr):
    """Draw the 'System Load:' label and the three load averages on row 1 of *scr*."""
    averages = os.getloadavg()
    # (column, text) pairs in the order they are drawn.
    cells = [(0, 'System'), (7, 'Load:')]
    cells.extend(
        (column, '%.02f' % value)
        for column, value in zip((13, 20, 27), averages)
    )
    for column, text in cells:
        write(scr, 1, column, text, curses.color_pair(9))
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def load(self):
        """Return the load averages, fetching them only if none are cached in ``self._load``."""
        if self._load:
            return self._load
        self._load = os.getloadavg()
        return self._load
项目:NebulaSolarDash    作者:toddlerya    | 项目源码 | 文件源码
def get_load(self):
        """
        Return the 1-minute system load average.

        The value is ``os.getloadavg()[0]``. If that call fails, the error
        is printed and its message is returned as a string instead, so the
        caller always receives a value.
        """
        try:
            data = os.getloadavg()[0]
        except Exception as err:
            # Call form of print: valid on both Python 2 and Python 3
            # (the statement form is a SyntaxError on Python 3).
            print(err)
            data = str(err)

        return data

    # ----------------end: ???????????????-----------------

    # ----------------start: ??CPU????-----------------
项目:temboard-agent    作者:dalibo    | 项目源码 | 文件源码
def get_load_average(self,):
        """Return the first (1-minute) value reported by ``getloadavg()``."""
        one_minute, _five, _fifteen = getloadavg()
        return one_minute
项目:bumblebee-status    作者:tobi-wan-kenobi    | 项目源码 | 文件源码
def update(self, widgets):
        """Refresh ``self._load`` with the current load averages; *widgets* is not used."""
        current = os.getloadavg()
        self._load = current
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def __init__(self, interval=15, monitor_interval=60*60,
                 create_time=time.time, get_load_average=os.getloadavg):
        """Store the timing settings and the injectable clock / load-average callables.

        ``create_time`` and ``get_load_average`` default to ``time.time`` and
        ``os.getloadavg``; the list of collected load averages starts empty.
        """
        self._interval, self._monitor_interval = interval, monitor_interval
        self._create_time = create_time
        self._get_load_average = get_load_average
        self._load_averages = []
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def run(self):
        """Add a "System load" header holding the 1-minute load average, then return ``succeed(None)``."""
        one_minute = os.getloadavg()[0]
        self._sysinfo.add_header("System load", str(one_minute))
        return succeed(None)
项目:JimV-N    作者:jamesiter    | 项目源码 | 文件源码
def state_report_engine(self):
        """
        Emit host heartbeats in a loop until ``Utils.exit_flag`` is set.

        After initialising the connection and taking a first snapshot of the
        interfaces and disks, every iteration sleeps two seconds and sends a
        heartbeat carrying node id, CPU, memory, interfaces, disks, the load
        averages, the recorded start timestamp and the available memory.
        An error inside an iteration is logged and the loop continues.
        """
        self.init_conn()

        # Initial snapshot so the first heartbeat has interface/disk data.
        self.update_interfaces()
        self.update_disks()
        # NOTE(review): taken when this loop starts, yet reported as
        # 'boot_time' -- confirm whether process start or OS boot is meant.
        boot_time = ji.Common.ts()

        while True:
            if Utils.exit_flag:
                msg = 'Thread state_report_engine say bye-bye'
                # Call form of print: valid on both Python 2 and Python 3.
                print(msg)
                logger.info(msg=msg)
                return

            # Record that this thread is still alive.
            thread_status['state_report_engine'] = ji.JITime.now_date_time()

            # noinspection PyBroadException
            try:
                time.sleep(2)

                # Refresh interfaces and disks when the timestamp is a
                # multiple of 60.
                # NOTE(review): with a 2-second sleep this check can miss the
                # exact second -- confirm the granularity of ji.Common.ts().
                if ji.Common.ts() % 60 == 0:
                    self.update_interfaces()
                    self.update_disks()

                host_event_emit.heartbeat(message={'node_id': self.node_id, 'cpu': self.cpu, 'memory': self.memory,
                                                   'interfaces': self.interfaces, 'disks': self.disks,
                                                   'system_load': os.getloadavg(), 'boot_time': boot_time,
                                                   'memory_available': psutil.virtual_memory().available})

            except Exception:
                # Keep the reporter alive on ordinary errors, but let
                # SystemExit / KeyboardInterrupt propagate.
                logger.error(traceback.format_exc())
                log_emit.error(traceback.format_exc())
项目:gpvdm    作者:roderickmackenzie    | 项目源码 | 文件源码
def heart_beat(self):
        """Send the 1-minute load average to the server every 0.25 s, forever.

        Each datagram is the ASCII text ``load#<value>`` sent to
        ``(self.server_ip, self.tx_port)`` through ``self.socket``.
        """
        while True:
            one_minute = os.getloadavg()[0]
            command = "load#" + str(one_minute)
            # sendto() needs bytes on Python 3 (a str raises TypeError).
            # The payload is plain ASCII, so this is lossless and still
            # yields a str on Python 2.
            self.socket.sendto(command.encode("ascii"), (self.server_ip, self.tx_port))
            sleep(0.25)
项目:nixstatsagent    作者:NIXStats    | 项目源码 | 文件源码
def run(self, *unused):
        """Return the tuple of load averages from ``os.getloadavg()``; extra arguments are ignored."""
        averages = os.getloadavg()
        return averages
项目:raspberry-pi-2    作者:LuisDiazUgena    | 项目源码 | 文件源码
def cpu_usage():
    """Return a one-line summary of the three load averages and the uptime.

    The uptime is the time elapsed since ``psutil.boot_time()``, shown
    without fractional seconds.
    """
    now = datetime.now()
    booted = datetime.fromtimestamp(psutil.boot_time())
    uptime_text = str(now - booted).split('.')[0]
    one, five, fifteen = os.getloadavg()
    template = "Ld:%.1f %.1f %.1f Up: %s"
    return template % (one, five, fifteen, uptime_text)
项目:NZ-ORCID-Hub    作者:Royal-Society-of-New-Zealand    | 项目源码 | 文件源码
def get_os_internals():  # noqa: D103
    """Return ``(label, value)`` pairs for the ``os`` facts available on this platform.

    Entries whose ``os`` attribute does not exist are skipped; the
    remaining ones keep a fixed order.
    """
    def call(name):
        return getattr(os, name)()

    def joined(name):
        return ', '.join(str(item) for item in call(name))

    def rounded(name):
        return ', '.join(str(round(value, 2)) for value in call(name))

    # (label, os attribute, renderer) in output order.
    probes = (
        ("Current Working Directory", 'getcwd', call),
        ("Effective Group ID", 'getegid', call),
        ("Effective User ID", 'geteuid', call),
        ("Group ID", 'getgid', call),
        ("User ID", 'getuid', call),
        ("Group Membership", 'getgroups', joined),
        ("Line Seperator", 'linesep', lambda name: repr(getattr(os, name))[1:-1]),
        ("Path Seperator", 'pathsep', lambda name: getattr(os, name)),
        ("Load Avarage", 'getloadavg', rounded),
    )

    os_internals = []
    for label, attribute, render in probes:
        if hasattr(os, attribute):
            os_internals.append((label, render(attribute)))
    return os_internals
项目:Mac-Python-3.X    作者:L1nwatch    | 项目源码 | 文件源码
def main():
    """Print a handful of process- and system-level values exposed by ``os``."""
    current_priority = os.nice(0)  # get relative process priority
    print(current_priority)
    raised_priority = os.nice(1)  # change relative priority
    print(raised_priority)
    process_times = os.times()  # process times: system, user etc...
    print(process_times)
    # Is the file descriptor a tty? 0 = stdin, 4 is just an arbitrary test value.
    for descriptor in (0, 4):
        print(os.isatty(descriptor))
    load_averages = os.getloadavg()  # UNIX only - number of processes in queue
    print(load_averages)
    cpus = os.cpu_count()  # New in Python 3.4
    print(cpus)
项目:perf    作者:vstinner    | 项目源码 | 文件源码
def collect_system_metadata(metadata):
    """Fill *metadata* with platform, load, hostname, boot-time and uptime entries.

    Values are read through ``read_proc`` first; ``os.getloadavg`` and
    ``psutil`` are used as fallbacks when those reads yield nothing.
    """
    metadata['platform'] = platform.platform(True, False)
    if sys.platform.startswith('linux'):
        collect_linux_metadata(metadata)

    # on linux, load average over 1 minute
    for entry in read_proc("loadavg"):
        parts = entry.split()
        metadata['load_avg_1min'] = float(parts[0])

        # The fourth field, when it contains '/', starts with the
        # runnable-thread count.
        if len(parts) < 4 or '/' not in parts[3]:
            continue
        runnable = parts[3].partition('/')[0]
        metadata['runnable_threads'] = int(runnable)

    if 'load_avg_1min' not in metadata and hasattr(os, 'getloadavg'):
        one_minute = os.getloadavg()[0]
        metadata['load_avg_1min'] = one_minute

    # Hostname
    name = socket.gethostname()
    if name:
        metadata['hostname'] = name

    # Boot time: first "btime " line, if any
    boot_time = next(
        (int(entry[6:]) for entry in read_proc("stat")
         if entry.startswith("btime ")),
        None,
    )

    if boot_time is None and psutil:
        boot_time = psutil.boot_time()

    if boot_time is None:
        return
    booted_at = datetime.datetime.fromtimestamp(boot_time)
    metadata['boot_time'] = format_datetime(booted_at)
    metadata['uptime'] = time.time() - boot_time
        metadata['uptime'] = time.time() - boot_time
项目:troup    作者:troup-system    | 项目源码 | 文件源码
def _get_system_stats_(self):
        """Return ``{'load': [one, five, fifteen]}`` built from ``os.getloadavg()``."""
        one, five, fifteen = os.getloadavg()
        return {'load': [one, five, fifteen]}
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def check(self):
        """Collect load and CPU usage metrics into a flat dict.

        Returns a dict with ``load.1`` / ``load.5`` / ``load.15`` (skipped on
        Windows), ``cpu.used_total``, one ``cpu.cpu<N>_used`` entry per CPU
        and ``cpu.used_sy`` / ``cpu.used_us`` / ``cpu.used_wa`` computed by
        ``self._get_cpu_time`` over the interval since the previous sample.
        ``self.last_cpu_times`` is updated to the newest sample.
        """
        data = {}
        # Load averages are not collected on Windows.
        if platform.system() != 'Windows':
            load = os.getloadavg()
            data.update({'load.1': load[0], 'load.5': load[1], 'load.15': load[2]})
        data.update({"cpu.used_total": int(psutil.cpu_percent())})
        # Usage of each individual CPU.
        per_cpu = psutil.cpu_percent(percpu=True)
        data.update({'cpu.cpu{0}_used'.format(i): int(val) for i, val in enumerate(per_cpu)})

        new_cpu_times = psutil.cpu_times()
        if self.last_cpu_times is None:
            # First call: no previous sample exists, so take one now and
            # sample again after a short pause to get a usable interval.
            self.last_cpu_times = new_cpu_times
            gevent.sleep(0.1)
            new_cpu_times = psutil.cpu_times()

        # sum() replaces reduce(lambda s, x: s + x, ...): `reduce` is not a
        # builtin on Python 3.
        total_time = sum(new_cpu_times) - sum(self.last_cpu_times)
        # Only system / user / iowait are reported; idle, nice, irq, softirq
        # and steal are intentionally left out.
        for key, field in (('cpu.used_sy', 'system'),
                           ('cpu.used_us', 'user'),
                           ('cpu.used_wa', 'iowait')):
            data[key] = self._get_cpu_time(field, total_time, new_cpu_times)

        self.last_cpu_times = new_cpu_times
        return data
项目:pep.py    作者:osuripple    | 项目源码 | 文件源码
def getSystemInfo():
    """
    Build a dictionary with some system/server info

    :return: dict with the keys "unix", "connectedUsers", "matches", "uptime",
             "cpuUsage", "totalMemory", "usedMemory" and "loadAverage"
    """
    data = {
        "unix": runningUnderUnix(),
        "connectedUsers": len(glob.tokens.tokens),
        "matches": len(glob.matches.matches),
    }

    # General stats: split the elapsed time into days/hours/minutes/seconds
    remaining = time.time()-glob.startTime
    counts = []
    for unit_seconds in (86400, 3600, 60):
        whole_units = math.floor(remaining/unit_seconds)
        remaining -= whole_units*unit_seconds
        counts.append(whole_units)
    counts.append(math.floor(remaining))

    data["uptime"] = "{}d {}h {}m {}s".format(*counts)
    data["cpuUsage"] = psutil.cpu_percent()
    memory = psutil.virtual_memory()
    for key, amount in (("totalMemory", memory.total), ("usedMemory", memory.active)):
        data[key] = "{0:.2f}".format(amount/1074000000)

    # Unix only stats
    data["loadAverage"] = os.getloadavg() if data["unix"] else (0,0,0)

    return data
项目:WelcomeBot-Telegram    作者:Too-Naive    | 项目源码 | 文件源码
def getloadavg():
    """Return the three load averages as one space-separated string."""
    one, five, fifteen = os.getloadavg()
    return '{} {} {}'.format(one, five, fifteen)
项目:trough    作者:internetarchive    | 项目源码 | 文件源码
def heartbeat(self, pool=None, node=None, ttl=None, **doc):
        """Complete the heartbeat document *doc* and pass it to ``self.services.heartbeat``.

        ``pool``, ``node`` and ``ttl`` are mandatory; an ``Exception`` is
        raised when any of them is ``None``. The document receives the keys
        ``id``, ``role``, ``node``, ``ttl`` and ``load``.
        """
        if None in (pool, node, ttl):
            raise Exception('"pool", "node" and "ttl" are required arguments.')
        heartbeat_id = "%s:%s:%s" % (pool, node, doc.get('segment'))
        doc['id'] = heartbeat_id
        logging.info("Setting Heartbeat ID to [%s]" % heartbeat_id)
        doc.update(role=pool, node=node, ttl=ttl)
        five_minute_load = os.getloadavg()[1] # load average over last 5 mins
        doc['load'] = five_minute_load
        summary = 'Heartbeat: role[%s] node[%s] at IP %s:%s with ttl %s' % (pool, node, node, doc.get('port'), ttl)
        logging.info(summary)
        return self.services.heartbeat(doc)
项目:trough    作者:internetarchive    | 项目源码 | 文件源码
def bulk_heartbeat(self, ids):
        """Refresh the heartbeat of every service in *ids*.

        Rows that already exist are updated in one query; every id that is
        not found afterwards gets an individual ``self.heartbeat`` call.
        """
        existing_rows = self.rethinker.table('services').get_all(*ids)
        existing_rows.update({ 'last_heartbeat': r.now(), 'load': os.getloadavg()[1] }).run()
        # send a non-bulk heartbeat for each id we *didn't* just update
        requested = set(ids)
        found = set(self.rethinker.table('services').get_all(*ids).get_field('id').run())
        for service_id in requested - found:
            pool, node, segment = service_id.split(":")
            if pool == 'trough-write':
                port = settings['WRITE_PORT']
            else:
                port = settings['READ_PORT']
            url = 'http://%s:%s/?segment=%s' % (node, port, segment)
            ttl = round(settings['SYNC_LOOP_TIMING'] * 4)
            self.heartbeat(pool=pool, node=node, segment=segment, port=port, url=url, ttl=ttl)
项目:scff    作者:softScheck    | 项目源码 | 文件源码
def print_status():
    """Return a multi-line status report: mode, uptime, load, CPU and running fuzzers.

    The first line summarises the mode, uptime, 1-minute load average and
    overall CPU percentage. It is followed by one line per running fuzzer
    (state letter, command line, PID, CPU% and MEM%), or by a notice when
    none is running.
    """
    if os.path.isfile(".scff/distributed"):
        mode = "Distributed"
    else:
        mode = str(CPU_CORES) + " * " + fuzzer
    ret = ("\nMode: " + mode + " \tUptime: " + get_uptime() + " \tLoad: "
           + str(os.getloadavg()[0])[:4] + "\tCPU:"
           + str(int(psutil.cpu_percent(interval=0.2)))
           + "%")

    # Fetch the list once so the emptiness check and the listing below see
    # the same set of processes.
    fuzzers = getRunningFuzzers()
    if len(fuzzers) >= 1:
        ret += ("\nS CMDLINE                                       PID     CPU%   MEM%")
        # One-letter state shown per process; anything else is shown as "D".
        state_letters = {"sleeping": "S", "running": "R", "stopped": "T"}
        for proc in fuzzers:
            # Query the state once: it may change between calls.
            status = state_letters.get(proc.status(), "D")
            cmdline = list2str(proc.cmdline())
            # The command line is cut to 42 characters; the padding keeps
            # the PID column aligned.
            padded_pid = " " * (45 - min(len(cmdline), 42)) + str(proc.pid)
            ret += "\n{} {:.42} {}   {}    {}".format(
                status,
                cmdline,
                padded_pid,
                proc.cpu_percent(interval=0.1),
                str(round(proc.memory_percent(), 2)))
    else:
        ret += ("\n\t\t*** No running fuzzers found! ***")
    return ret
项目:MySQL_Watcher    作者:kinghows    | 项目源码 | 文件源码
def f_print_linux_status(save_as):
    """Print a "Linux Overview" table of CPU, memory, swap and load figures.

    :param save_as: ``"txt"`` prints through ``f_print_title`` and
        ``f_print_table_body``; ``"html"`` prints through
        ``f_print_table_html``. Any other value prints nothing.
    """
    # scputimes(user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice)
    cpu_times = psutil.cpu_times()
    # svmem(total, available, percent, used, free, active, inactive, buffers, cached, shared)
    mem = psutil.virtual_memory()
    # sswap(total, used, free, percent, sin, sout)
    swap = psutil.swap_memory()
    # load: fall back to placeholders when the averages are unavailable, so
    # that building the table below does not fail with a KeyError.
    try:
        load = os.getloadavg()
    except (OSError, AttributeError):
        load = ('N/A', 'N/A', 'N/A')
    stats = {'min1': load[0], 'min5': load[1], 'min15': load[2]}

    def to_mb(amount):
        # Byte count rendered in megabytes with an 'M' suffix.
        return str(amount/1024/1024) + 'M'

    # Column styles for the text (style1) and HTML (style) renderers.
    style1 = {1: ' ,6,l', 2: ' ,10,r',3: ' ,6,l', 4: ' ,10,r',5: ' ,6,l', 6: ' ,6,r',7: ' ,8,l',8: ' ,6,r',9: ' ,6,l', 10: ' ,6,r',11: ' ,6,l', 12: ' ,5,r',}
    style = {1: ' ,l', 2: ' ,r',3: ' ,l', 4: ' ,r',5: ' ,l', 6: ' ,r',7: ' ,l',8: ' ,r',9: ' ,l', 10: ' ,r',11: ' ,l', 12: ' ,r',}
    rows = [
        ["CPU", str(psutil.cpu_percent(interval=1))+'%', "nice", cpu_times.nice,
         "MEM", str(mem.percent) + '%', "active", to_mb(mem.active),
         "SWAP", str(swap.percent)+'%', "LOAD", str(psutil.cpu_count())+'core'],
        ["user", cpu_times.user, "irq", cpu_times.irq,
         "total", to_mb(mem.total), "inactive", to_mb(mem.inactive),
         "total", to_mb(swap.total), "1 min", stats["min1"]],
        ["system", cpu_times.system, "iowait", cpu_times.iowait,
         "used", to_mb(mem.used), "buffers", to_mb(mem.buffers),
         "used", to_mb(swap.used), "5 min", stats["min5"]],
        ["idle", cpu_times.idle, "steal", cpu_times.steal,
         "free", to_mb(mem.free), "cached", to_mb(mem.cached),
         "free", to_mb(swap.free), "15 min", stats["min15"]],
    ]

    title = "Linux Overview"
    if save_as == "txt":
        f_print_title(title)
        f_print_table_body(rows, style1,' ')
    elif save_as == "html":
        f_print_table_html(rows, title, style)
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2,
                track_cpu_count=False, short=False):
    '''Return segments describing the system load averages.

    Every load average is divided by the CPU count and the normalized value
    is mapped onto a gradient level.

    :param str format:
        format string, receives ``avg`` as an argument
    :param float threshold_good:
        normalized load averages below this value get gradient level 0.
    :param float threshold_bad:
        normalized load averages at or above this value get gradient level
        100. Values between ``threshold_good`` and ``threshold_bad`` get
        ``100 * (cur-good) / (bad-good)``.
    :param bool track_cpu_count:
        if True the CPU count is queried again on every call instead of
        being taken from the cached value.
    :param bool short:
        if True only the first load average is returned.

    Divider highlight group used: ``background:divider``.

    Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``.
    '''
    global cpu_count
    try:
        if cpu_count is None or track_cpu_count:
            cpu_count = _cpu_count()
        cpu_num = cpu_count
    except NotImplementedError:
        pl.warn('Unable to get CPU count: method is not implemented')
        return None

    def gradient_for(load_per_cpu):
        # Position of the normalized load between the two thresholds.
        if load_per_cpu < threshold_good:
            return 0
        if load_per_cpu < threshold_bad:
            return (load_per_cpu - threshold_good) * 100.0 / (threshold_bad - threshold_good)
        return 100

    segments = []
    for avg in os.getloadavg():
        level = gradient_for(avg / cpu_num)
        segments.append({
            'contents': format.format(avg=avg),
            'highlight_groups': ['system_load_gradient', 'system_load'],
            'divider_highlight_group': 'background:divider',
            'gradient_level': level,
        })

        # In short mode only the first average is reported.
        if short:
            return segments

    # Separate the first two values from the one that follows them.
    for leading in (segments[0], segments[1]):
        leading['contents'] += ' '
    return segments
项目:Pigrow    作者:Pragmatismo    | 项目源码 | 文件源码
def gather_data(path="./"):
    """Collect disk, uptime, load, memory and CPU-temperature figures.

    :param path: path handed to ``os.statvfs`` for the disk figures.
    :return: dict with the keys ``disk_total``, ``disk_used``, ``disk_free``,
        ``disk_percent``, ``timenow``, ``uptime_sec``, ``uptime_str``,
        ``load_ave1``, ``load_ave5``, ``load_ave15``, ``memtotal``,
        ``memfree``, ``memavail`` and ``cpu_temp``. A memory value is
        ``None`` when its line is missing from ``/proc/meminfo``;
        ``cpu_temp`` is ``None`` when ``vcgencmd`` gives no ``key=value``
        output.
    """
    print("Interorgating pi about it's status...")
    timenow = datetime.datetime.now()
    # check storage space
    st = os.statvfs(path)
    free = st.f_bavail * st.f_frsize
    total = st.f_blocks * st.f_frsize
    used = (st.f_blocks - st.f_bfree) * st.f_frsize
    try:
        percent = (float(used) / total) * 100
    except ZeroDivisionError:
        percent = 0
    # check up time
    with open('/proc/uptime', 'r') as f:
        uptime_seconds = float(f.readline().split()[0])
    uptime_string = str(datetime.timedelta(seconds=uptime_seconds))
    # system load averages for 1, 5 and 15 min
    load_ave1, load_ave5, load_ave15 = os.getloadavg()
    # check memory info; keys absent from the file stay None instead of
    # leaving a variable unbound.
    meminfo = {'MemTotal': None, 'MemAvailable': None, 'MemFree': None}
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            fields = line.split(":")
            if fields[0] in meminfo:
                meminfo[fields[0]] = fields[1].strip()
    # check cpu temp with '/opt/vc/bin/vcgencmd measure_temp'; the pipe is
    # closed once read, and missing output no longer raises IndexError.
    with os.popen('/opt/vc/bin/vcgencmd measure_temp') as pipe:
        temp_fields = pipe.read().strip().split('=')
    cpu_temp = temp_fields[1] if len(temp_fields) > 1 else None
    # send back data in a dictionary
    return {'disk_total': total,
            'disk_used': used,
            'disk_free': free,
            'disk_percent': round(percent, 1),
            'timenow': timenow,
            'uptime_sec': uptime_seconds,
            'uptime_str': uptime_string.split('.')[0],
            'load_ave1': load_ave1,
            'load_ave5': load_ave5,
            'load_ave15': load_ave15,
            'memtotal': meminfo['MemTotal'],
            'memfree': meminfo['MemFree'],
            'memavail': meminfo['MemAvailable'],
            'cpu_temp': cpu_temp,
            }
项目:pman    作者:FNNDSC    | 项目源码 | 文件源码
def t_hello_process(self, *args, **kwargs):
        """

        The 'hello' action is merely to 'speak' with the server. The server
        can return current date/time, echo back a string, query the startup
        command line args, etc.

        This method is a simple means of checking if the server is "up" and
        running.

        :param args: unused.
        :param kwargs: ``request`` is the request dict; its ``meta`` entry may
            carry ``askAbout`` set to ``'timestamp'``, ``'sysinfo'`` or
            ``'echoBack'`` (the latter also needs an ``echoBack`` entry).
        :return: ``{'d_ret': <answer dict>, 'status': <bool>}``; ``status`` is
            True only when a recognised ``askAbout`` was answered. A missing
            ``request`` or ``meta`` yields an empty answer with status False.
        """

        self.dp.qprint("In hello process...")
        b_status = False
        d_ret = {}
        # Previously a call without 'request' failed on an unbound local;
        # treat a missing request / meta as an empty query instead.
        d_request = kwargs.get('request') or {}
        d_meta = d_request.get('meta', {})

        if 'askAbout' in d_meta:
            str_askAbout = d_meta['askAbout']
            d_ret['name'] = self.within.str_name
            d_ret['version'] = self.within.str_version
            if str_askAbout == 'timestamp':
                str_timeStamp = datetime.datetime.today().strftime('%Y%m%d%H%M%S.%f')
                d_ret['timestamp'] = {'now': str_timeStamp}
                b_status = True
            elif str_askAbout == 'sysinfo':
                d_sysinfo = {}
                d_ret['sysinfo'] = d_sysinfo
                d_sysinfo['system'] = platform.system()
                d_sysinfo['machine'] = platform.machine()
                d_sysinfo['platform'] = platform.platform()
                d_sysinfo['uname'] = platform.uname()
                d_sysinfo['version'] = platform.version()
                d_sysinfo['memory'] = psutil.virtual_memory()
                d_sysinfo['cpucount'] = multiprocessing.cpu_count()
                d_sysinfo['loadavg'] = os.getloadavg()
                d_sysinfo['cpu_percent'] = psutil.cpu_percent()
                d_sysinfo['hostname'] = socket.gethostname()
                # Address selection, kept exactly as written: the first
                # non-127.* address of this hostname if there is one,
                # otherwise the local address of a UDP socket connected
                # towards 8.8.8.8:53. Both candidates are always evaluated.
                d_sysinfo['inet'] = [l for l in ([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")][:1], [[(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) if l][0][0]
                b_status = True
            elif str_askAbout == 'echoBack':
                d_ret['echoBack'] = {'msg': d_meta['echoBack']}
                b_status = True

        return {'d_ret': d_ret,
                'status': b_status}
项目:dotfiles    作者:gbraad    | 项目源码 | 文件源码
def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2,
                track_cpu_count=False, short=False):
    '''Return segments describing the system load averages.

    Every load average is divided by the CPU count and the normalized value
    is mapped onto a gradient level.

    :param str format:
        format string, receives ``avg`` as an argument
    :param float threshold_good:
        normalized load averages below this value get gradient level 0.
    :param float threshold_bad:
        normalized load averages at or above this value get gradient level
        100. Values between ``threshold_good`` and ``threshold_bad`` get
        ``100 * (cur-good) / (bad-good)``.
    :param bool track_cpu_count:
        if True the CPU count is queried again on every call instead of
        being taken from the cached value.
    :param bool short:
        if True only the first load average is returned.

    Divider highlight group used: ``background:divider``.

    Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``.
    '''
    global cpu_count
    try:
        if cpu_count is None or track_cpu_count:
            cpu_count = _cpu_count()
        cpu_num = cpu_count
    except NotImplementedError:
        pl.warn('Unable to get CPU count: method is not implemented')
        return None

    def gradient_for(load_per_cpu):
        # Position of the normalized load between the two thresholds.
        if load_per_cpu < threshold_good:
            return 0
        if load_per_cpu < threshold_bad:
            return (load_per_cpu - threshold_good) * 100.0 / (threshold_bad - threshold_good)
        return 100

    segments = []
    for avg in os.getloadavg():
        level = gradient_for(avg / cpu_num)
        segments.append({
            'contents': format.format(avg=avg),
            'highlight_groups': ['system_load_gradient', 'system_load'],
            'divider_highlight_group': 'background:divider',
            'gradient_level': level,
        })

        # In short mode only the first average is reported.
        if short:
            return segments

    # Separate the first two values from the one that follows them.
    for leading in (segments[0], segments[1]):
        leading['contents'] += ' '
    return segments
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def print_header(procs_status, num_procs):
    """Print the system summary shown above the process list.

    Emits, through ``print_line``: one usage bar per CPU, the memory and
    swap bars, the process count with its per-state breakdown, and the
    load averages together with the uptime.
    """

    def get_dashes(perc):
        # A 40-character bar: '|' for the used share, blanks for the rest.
        filled = "|" * int((float(perc) / 10 * 4))
        return filled, " " * (40 - len(filled))

    def megabytes(amount):
        return str(int(amount / 1024 / 1024)) + "M"

    def usage_line(template, usage):
        bar, padding = get_dashes(usage.percent)
        return template % (
            bar, padding,
            usage.percent,
            megabytes(usage.used),
            megabytes(usage.total)
        )

    # cpu usage
    per_cpu = psutil.cpu_percent(interval=0, percpu=True)
    for index, value in enumerate(per_cpu):
        bar, padding = get_dashes(value)
        print_line(" CPU%-2s [%s%s] %5s%%" % (index, bar, padding, value))

    # memory usage
    print_line(usage_line(" Mem   [%s%s] %5s%% %6s / %s", psutil.virtual_memory()))

    # swap usage
    print_line(usage_line(" Swap  [%s%s] %5s%% %6s / %s", psutil.swap_memory()))

    # processes number and status; running and sleeping states are listed first
    states = ["%s=%s" % (name, count)
              for name, count in procs_status.items() if count]
    states = sorted(states, key=lambda text: text[:3] in ('run', 'sle'),
                    reverse=True)
    print_line(" Processes: %s (%s)" % (num_procs, ', '.join(states)))

    # load average, uptime
    now = datetime.datetime.now()
    uptime = now - datetime.datetime.fromtimestamp(psutil.boot_time())
    one, five, fifteen = os.getloadavg()
    uptime_text = str(uptime).split('.')[0]
    print_line(" Load average: %.2f %.2f %.2f  Uptime: %s"
               % (one, five, fifteen, uptime_text))