Python psutil 模块,disk_usage() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.disk_usage()

项目:stephanie-va    作者:SlapBot    | 项目源码 | 文件源码
def tell_system_status(self):
        """Build a human-readable summary of the host's current status.

        The summary covers the platform name and version, the host name,
        the CPU core count, disk / CPU / memory utilisation percentages and
        the date the machine was booted.

        Returns:
            str: the assembled status text.
        """
        import psutil
        import platform
        import datetime

        # The first three uname fields are the ones reported below.
        system_name, host_name, raw_version = platform.uname()[:3]
        short_version = raw_version.split('-')[0]

        core_count = psutil.cpu_count()
        cpu_load = psutil.cpu_percent()
        memory_load = psutil.virtual_memory().percent
        disk_load = psutil.disk_usage('/').percent

        booted_at = datetime.datetime.fromtimestamp(psutil.boot_time())
        booted_on = booted_at.strftime("%A %d. %B %Y")

        sentences = [
            "I am currently running on %s version %s.  " % (system_name, short_version),
            "This system is named %s and has %s CPU cores.  " % (host_name, core_count),
            "Current disk_percent is %s percent.  " % disk_load,
            "Current CPU utilization is %s percent.  " % cpu_load,
            "Current memory utilization is %s percent. " % memory_load,
            "it's running since %s." % booted_on,
        ]
        return "".join(sentences)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_partitions_and_usage(self):
        # Cross-check psutil.disk_usage() and psutil.disk_partitions()
        # against the numbers printed by "df".
        tolerance = 10 * 1024 * 1024  # 10 MB

        def df(path):
            # The second output line of `df -P -B 1` carries the figures
            # (in bytes) for the requested path.
            output = sh('df -P -B 1 "%s"' % path).strip()
            data_line = output.split('\n')[1]
            dev, total, used, free = data_line.split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            _dev, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            # free is checked before used, each within the tolerance
            for ours, theirs in ((usage.free, free), (usage.used, used)):
                if abs(ours - theirs) > tolerance:
                    self.fail("psutil=%s, df=%s" % (ours, theirs))
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def get_file_systems(self):
        """Describe every partition reported by psutil.disk_partitions().

        Each mount option is looked up through ``prism.helpers.locale_``;
        when the lookup returns the option unchanged, the
        ``mount.options.unknown`` entry is used instead.

        Returns:
            list: one dict per partition with the keys ``device``,
            ``mount_point``, ``fs_type``, ``options`` and ``usage``.
        """
        file_systems = []
        for partition in psutil.disk_partitions():
            localized = {}
            for opt in partition.opts.split(','):
                label = prism.helpers.locale_('system', 'mount.options.' + opt)
                if label == opt:
                    label = prism.helpers.locale_('system', 'mount.options.unknown')
                localized[opt] = label

            file_systems.append({
                'device': partition.device,
                'mount_point': partition.mountpoint,
                'fs_type': partition.fstype,
                'options': localized,
                'usage': psutil.disk_usage(partition.mountpoint),
            })

        return file_systems
项目:cpu-g    作者:atareao    | 项目源码 | 文件源码
def disksinfo(self):
        """Return usage details for every physical partition, sorted by device.

        Returns:
            list: dicts with the keys ``device``, ``mountpoint``, ``fstype``,
            ``opts``, ``total``, ``used``, ``free`` and ``percent``.
        """
        def describe(part):
            # Merge the partition description with its usage figures.
            stats = psutil.disk_usage(part.mountpoint)
            return {
                'device': part.device,
                'mountpoint': part.mountpoint,
                'fstype': part.fstype,
                'opts': part.opts,
                'total': stats.total,
                'used': stats.used,
                'free': stats.free,
                'percent': stats.percent,
            }

        entries = [describe(part) for part in psutil.disk_partitions(all=False)]
        entries.sort(key=lambda entry: entry['device'])
        return entries
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def get_stats():
    """Render the "stats" template with job counts and system load figures."""
    root.authorized()

    params = {
        # number of jobs in queued, running, and completed states
        'nq': db(jobs.state == 'Q').count(),
        'nr': db(jobs.state == 'R').count(),
        'nc': db(jobs.state == 'C').count(),
        # current system utilisation
        'cpu': psutil.cpu_percent(),
        'vm': psutil.virtual_memory().percent,
        'disk': psutil.disk_usage('/').percent,
        # values passed through from the query string
        'cid': request.query.cid,
        'app': request.query.app,
    }

    return template("stats", params)
项目:mal    作者:chaosking121    | 项目源码 | 文件源码
def sysInfoRaw():
    """Collect raw CPU, memory, disk and uptime figures.

    Returns:
        list: ``[cpu_percent, memory_percent, root_disk_percent,
        home_disk_percent, [days, hours, minutes, seconds]]``.
    """
    import time
    import psutil

    cpu_load = psutil.cpu_percent(interval=1)
    memory_load = psutil.virtual_memory().percent
    root_load = psutil.disk_usage('/').percent
    home_load = psutil.disk_usage('/home').percent

    # Break the whole seconds since boot down into d/h/m/s.
    elapsed = int(time.time()) - int(psutil.boot_time())
    minutes, secs = divmod(elapsed, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)

    return [cpu_load, memory_load, root_load, home_load,
            [days, hours, minutes, secs]]
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_serialization(self):
        def check(ret):
            # JSON round-trip (when the json module is available) followed
            # by a pickle round-trip that must compare equal.
            if json is not None:
                json.loads(json.dumps(ret))
            restored = pickle.loads(pickle.dumps(ret))
            self.assertEqual(ret, restored)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # disk_io_counters() is skipped on Linux without /proc/diskstats
        # and whenever APPVEYOR is set.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())
项目:mycroft-skill-diagnostics    作者:the7erm    | 项目源码 | 文件源码
def handle_drive_intent(self, message):
        """Speak the usage of every partition that is not mounted under /snap/.

        Partitions that are at least 90 percent full use the "drive.low"
        dialog, all others the "drive" dialog.
        """
        for part in psutil.disk_partitions():
            mount = part.mountpoint
            print("partition.mountpoint: %s" % mount)
            if mount.startswith("/snap/"):
                continue

            # e.g. total=21378641920, used=4809781248, free=15482871808,
            # percent=22.5
            usage = psutil.disk_usage(mount)
            data = {
                "mountpoint": mount,
                "total": sizeof_fmt(usage.total),
                "used": sizeof_fmt(usage.used),
                "free": sizeof_fmt(usage.free),
                "percent": usage.percent
            }
            dialog = "drive.low" if usage.percent >= 90 else "drive"
            self.speak_dialog(dialog, data)
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def disk():
    """Publish disk and inode usage gauges to statsd forever.

    For every ``(path, label)`` pair in ``PATHS`` this sends the gauges
    ``<label>.total``, ``<label>.used``, ``<label>.free`` and
    ``<label>.percent`` (from ``psutil.disk_usage``) plus
    ``<label>.inodes.percent`` (from ``os.statvfs``), then sleeps for
    ``GRANULARITY`` seconds and repeats.  The function never returns.
    """
    c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.disk')
    while True:
        for path, label in PATHS:
            disk_usage = psutil.disk_usage(path)

            st = os.statvfs(path)
            total_inode = st.f_files
            free_inode = st.f_ffree
            # f_files can be 0 (some filesystems do not report an inode
            # count); dividing by it would raise ZeroDivisionError and stop
            # the whole monitoring loop, so the inode gauge is skipped.
            if total_inode:
                inode_percentage = int(100*(float(total_inode - free_inode) / total_inode))
                c.gauge('%s.inodes.percent' % label, inode_percentage)

            c.gauge('%s.total' % label, disk_usage.total)
            c.gauge('%s.used' % label, disk_usage.used)
            c.gauge('%s.free' % label, disk_usage.free)
            c.gauge('%s.percent' % label, disk_usage.percent)
        time.sleep(GRANULARITY)
项目:WIFIHTTPMonitor    作者:kbdancer    | 项目源码 | 文件源码
def POST(self):
        """Return a JSON document describing the current system load.

        The ``current`` object holds the CPU, memory and root-disk usage
        percentages, the number of running processes and the captured
        output of the ``uptime`` command as a ``(stdout, stderr)`` pair.
        """
        cpuused = psutil.cpu_percent(interval=None, percpu=False)
        memused = psutil.virtual_memory().percent
        diskused = psutil.disk_usage('/').percent
        pidcount = len(psutil.pids())

        uptimeshell = subprocess.Popen(['uptime'], stderr=subprocess.PIPE,
                                       stdout=subprocess.PIPE)
        # communicate() already waits for the process to finish while
        # draining both pipes; calling wait() first with PIPEs attached is
        # the deadlock pattern the subprocess documentation warns about.
        uptime = uptimeshell.communicate()
        # NOTE(review): on Python 3 communicate() yields bytes, which
        # json.dumps cannot serialise -- confirm the target interpreter.

        return json.dumps(
            {
                "code": 0,
                "current": {
                    "cpuused": cpuused,
                    "memused": memused,
                    "diskused": diskused,
                    "pidcount": pidcount,
                    "uptime": uptime
                }
            }
        )
项目:serverMonitor    作者:WangChengLongWilliam    | 项目源码 | 文件源码
def cpuinfo():
    """Append a snapshot of CPU, memory and disk figures to eggs.csv.

    Memory values are written in whole MB and root-disk values in whole GB.
    After appending, the complete file is read back and every row printed.
    """
    mb = 1024*1024
    gb = 1024*1024*1024

    memory = psutil.virtual_memory()
    mem_available = memory.available/mb
    mem_total = memory.total/mb
    mem_used = memory.used/mb

    root = psutil.disk_usage('/')
    disk_total = root.total/gb
    disk_used = root.used/gb
    disk_free = root.free/gb

    now = datetime.datetime.now()

    with open('eggs.csv', 'a') as out_file:
        writer = csv.writer(out_file, delimiter=' ',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
        # Each row consists of a single pre-formatted text field.
        writer.writerow(["??????%s"%now])
        writer.writerow(["cpu?????%d%s"%(psutil.cpu_percent(interval=1),"%")])
        writer.writerow(["???????%sMB" % int(mem_total)])
        writer.writerow(["?????????%sMB" % int(mem_used)])
        writer.writerow(["????????%sMB" % int(mem_available)])
        writer.writerow(["???????%sG" % int(disk_total)])
        writer.writerow(["???????%sG" % int(disk_used)])
        writer.writerow(["???????%sG" % int(disk_free)])

    with open('eggs.csv', 'r') as in_file:
        for record in csv.reader(in_file, delimiter=' ', quotechar='|'):
            print(record)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def _get_docker_node_info(info):
    """Add the docker-specific capacity figures to ``info`` and return it.

    Sets ``memory`` (90% of total RAM, in MB), ``disk`` (free space of the
    docker data directory, in MB), ``cpu`` (core count times 100, as a
    percentage) and ``up_since``.
    """
    cpu_total = int(cpu_count() * 100)
    mem_total = (psutil.virtual_memory().total * 0.9) // _BYTES_IN_MB

    # TODO: manage disk space a little better
    docker_dir = 'C:\\ProgramData\\docker' if os.name == 'nt' else '/var/lib/docker'
    disk_free = disk_usage(docker_dir).free // _BYTES_IN_MB

    info.update({
        'memory': '%dM' % mem_total,
        'disk':  '%dM' % disk_free,
        'cpu': '%d%%' % cpu_total,
        'up_since': up_since(),
    })
    return info
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def readGcodeFilesForOrigin(origin):
    """Return the file listing for ``origin`` as a JSON response.

    Unknown origins produce a 404 response.  The ``recursive`` and ``force``
    request values are read as booleans; ``force`` drops the cached listing
    for the origin first.  For the local origin the free and total space of
    the uploads folder are included as well.
    """
    known_origins = [FileDestinations.LOCAL, FileDestinations.SDCARD]
    if origin not in known_origins:
        return make_response("Unknown origin: %s" % origin, 404)

    def flag(name):
        # Interpret a request value as a boolean, defaulting to false.
        return request.values.get(name, "false") in valid_boolean_trues

    recursive = flag("recursive")
    force = flag("force")

    if force:
        with _file_cache_mutex:
            try:
                del _file_cache[origin]
            except KeyError:
                pass

    files = _getFileList(origin, recursive=recursive)

    if origin != FileDestinations.LOCAL:
        return jsonify(files=files)

    usage = psutil.disk_usage(settings().getBaseFolder("uploads"))
    return jsonify(files=files, free=usage.free, total=usage.total)
项目:EyesInTheSky    作者:SherineSameh    | 项目源码 | 文件源码
def CurrentSpecs():
    """Return a ``':_:'``-separated snapshot of the device's current state.

    The fields are, in order: private IP, CPU temperature, CPU usage,
    root-disk usage percent and RAM usage percent.  The first three come
    from the ``getPrivateIP`` / ``getCPUtemperature`` / ``getCPUusage``
    helpers and are concatenated as-is.
    """
    PrivateIP = getPrivateIP()
    CPU_temp = getCPUtemperature()
    CPU_usage = getCPUusage()
    # psutil.phymem_usage() was deprecated and later removed from psutil;
    # virtual_memory() is its documented replacement.
    RAM_usage = psutil.virtual_memory().percent
    DISK_usage = psutil.disk_usage('/').percent
    return(PrivateIP +':_:'+ CPU_temp +':_:'+ CPU_usage +':_:'+ str(DISK_usage) +':_:'+ str(RAM_usage))
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_disk_info(self):
        """List the mounted disks with their size in KiB, sorted by device.

        Partitions without a device, and CD-ROM / removable drives, are
        skipped.  On any error the traceback is logged and an empty list is
        returned.

        Returns:
            list: dicts with the keys ``name``, ``device``, ``mountpoint``,
            ``fstype`` and ``size``.
        """
        skipped_types = {'CDROM', 'REMOVABLE'}
        data = []
        try:
            for disk in psutil.disk_partitions(all=True):
                if not disk.device:
                    continue
                # opts is a comma-separated string (e.g. "ro,cdrom"), so the
                # drive type has to be matched per token; comparing the whole
                # string never matches once more than one option is present.
                if skipped_types.intersection(disk.opts.upper().split(',')):
                    continue
                data.append({
                    'name': disk.device,
                    'device': disk.device,
                    'mountpoint': disk.mountpoint,
                    'fstype': disk.fstype,
                    # bytes -> KiB
                    'size': psutil.disk_usage(disk.mountpoint).total >> 10,
                })
            data.sort(key=lambda x: x['device'])
        except Exception:
            # Best effort: report nothing rather than a partial list, but do
            # not swallow KeyboardInterrupt / SystemExit.
            data = []
            self.logger.error(traceback.format_exc())

        return data
项目:hapi    作者:mayaculpa    | 项目源码 | 文件源码
def update(self):
        """Refresh the CPU, boot, memory, network, disk and timestamp fields."""
        self.cpu["percentage"] = psutil.cpu_percent(interval=0.7)

        booted_at = datetime.datetime.fromtimestamp(psutil.boot_time())
        self.boot = booted_at.strftime("%Y-%m-%d %H:%M:%S")

        mem = psutil.virtual_memory()
        for field in ("used", "free", "cached"):
            self.memory[field] = getattr(mem, field)

        traffic = psutil.net_io_counters()
        self.network["packet_sent"] = traffic.packets_sent
        self.network["packet_recv"] = traffic.packets_recv

        # Root filesystem figures, divided by 1024 and truncated to int.
        root = psutil.disk_usage('/')
        for field in ("total", "used", "free"):
            self.disk[field] = int(getattr(root, field)/1024)

        self.timestamp = time.time()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_partitions_and_usage(self):
        # Verify psutil.disk_usage() and psutil.disk_partitions() against
        # what "df" reports for the same mount points.
        max_drift = 10 * 1024 * 1024  # 10 MB tolerance

        def df(path):
            # Line 0 is the df header; line 1 holds the values in bytes.
            rows = sh('df -P -B 1 "%s"' % path).strip().split('\n')
            columns = rows[1].split()
            dev, total, used, free = columns[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            _dev, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            free_drift = abs(usage.free - free)
            if free_drift > max_drift:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            used_drift = abs(usage.used - used)
            if used_drift > max_drift:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_serialization(self):
        def check(ret):
            # The value must survive JSON encoding (when json is available)
            # and come back equal from a pickle round-trip.
            if json is not None:
                json.loads(json.dumps(ret))
            pickled = pickle.dumps(ret)
            self.assertEqual(ret, pickle.loads(pickled))

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # Only exercise disk_io_counters() when /proc/diskstats is usable
        # (Linux) and APPVEYOR is not set.
        skip_disk_io = LINUX and not os.path.exists('/proc/diskstats')
        if not skip_disk_io:
            if not APPVEYOR:
                check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_partitions_and_usage(self):
        # Compare psutil.disk_usage() / psutil.disk_partitions() with the
        # byte counts printed by "df".
        ten_mb = 10 * 1024 * 1024

        def df(path):
            # Skip the header line and parse the first data line.
            report = sh('df -P -B 1 "%s"' % path).strip()
            values = report.split('\n')[1].split()[:4]
            dev, total, used, free = values
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        def close_enough(actual, expected):
            # 10 MB tolerance
            return not abs(actual - expected) > ten_mb

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            _dev, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            if not close_enough(usage.free, free):
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if not close_enough(usage.used, used):
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_serialization(self):
        def check(ret):
            # JSON dump/load must not raise (when json is available); the
            # pickle round-trip must yield an equal object.
            if json is not None:
                json.loads(json.dumps(ret))
            clone = pickle.loads(pickle.dumps(ret))
            self.assertEqual(ret, clone)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # disk_io_counters() is left out on Linux hosts lacking
        # /proc/diskstats and when APPVEYOR is set.
        can_read_diskstats = not (LINUX and not os.path.exists('/proc/diskstats'))
        if can_read_diskstats and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())
项目:pybix    作者:lioncui    | 项目源码 | 文件源码
def get_disk_rate_info(self):
        """Collect total, used and percent-used figures per mount point.

        Partitions with an empty fstype are ignored.  Errors are logged and
        reported through ``errorInfoDone``; whatever was collected up to
        that point is still returned.

        Returns:
            dict: ``disk_total`` / ``disk_used`` (GB, two-decimal strings)
            and ``disk_percent``, each keyed by mount point.
        """
        returnData = {'disk_total': {}, 'disk_used': {}, 'disk_percent': {}}
        try:
            for part in psutil.disk_partitions():
                if part.fstype == "":
                    continue
                mount = part.mountpoint
                usage = psutil.disk_usage(mount)
                total_gb = usage.total/1024/1024/1024.0
                used_gb = usage.used/1024/1024/1024.0
                returnData['disk_total'][mount] = "%.2f" % total_gb
                returnData['disk_used'][mount] = "%.2f" % used_gb
                returnData['disk_percent'][mount] = usage.percent
        except Exception:
            pybixlib.error(self.logHead + traceback.format_exc())
            self.errorInfoDone(traceback.format_exc())
        return returnData
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def main():
    """Print a df-like table of the mounted partitions and their usage."""
    table = prettytable.PrettyTable(border=False, header=True, left_padding_width=2, padding_width=1)
    table.field_names = ["Device", "Total", "Used", "Free", "Use%", "Type", "Mount"]
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        # skip mount points whose path mentions both docker and aufs
        if 'docker' in part.mountpoint and 'aufs' in part.mountpoint:
            continue
        usage = psutil.disk_usage(part.mountpoint)

        table.add_row([part.device,
                       bytes2human(usage.total),
                       bytes2human(usage.used),
                       bytes2human(usage.free),
                       str(int(usage.percent)) + '%',
                       part.fstype,
                       part.mountpoint])
    for field in table.field_names:
        table.align[field] = "l"
    # The parenthesised form prints the same on Python 2 and is valid
    # syntax on Python 3, unlike the bare print statement.
    print(table)
项目:persepolis    作者:persepolisdm    | 项目源码 | 文件源码
def freeSpace(dir):
    """Return the free space, in bytes, of the filesystem holding ``dir``.

    Args:
        dir: path whose filesystem is queried.

    Returns:
        int or None: free bytes, or None when psutil cannot be imported or
        the lookup fails (the reason is written to the log).
    """
    try:
        import psutil
    except Exception:
        # A bare except would also swallow KeyboardInterrupt / SystemExit.
        logger.sendToLog("psutil is not installed!", "ERROR")
        return None

    try:
        return int(psutil.disk_usage(dir).free)
    except Exception as e:
        # log in to the log file
        logger.sendToLog("persepolis couldn't find free space value:\n" + str(e), "ERROR")
        return None
项目:docklet    作者:unias    | 项目源码 | 文件源码
def collect_diskinfo(self):
        """Gather usage figures for each distinct device.

        Only the first partition seen for a device is reported.  When the
        mount point lies under ``/opt/docklet/local/volume`` the entry is
        also stored in the global ``workercinfo`` under the last path
        component, as ``disk_use``.  Partitions whose usage lookup fails are
        logged and left out.

        Returns:
            list: dicts with the keys ``device``, ``mountpoint``, ``total``,
            ``used``, ``free`` and ``percent``.
        """
        global workercinfo
        seen_devices = set()
        collected = []
        for part in psutil.disk_partitions():
            if part.device in seen_devices:
                continue
            seen_devices.add(part.device)

            entry = {'device': part.device, 'mountpoint': part.mountpoint}
            try:
                usage = psutil.disk_usage(part.mountpoint)
                entry['total'] = usage.total
                entry['used'] = usage.used
                entry['free'] = usage.free
                entry['percent'] = usage.percent
                if part.mountpoint.startswith('/opt/docklet/local/volume'):
                    # the mount point indicates that this is the disk usage
                    # of a container; its name is the last path component
                    container = part.mountpoint.split('/')[-1]
                    if container not in workercinfo.keys():
                        workercinfo[container] = {}
                    workercinfo[container]['disk_use'] = entry
                collected.append(entry)
            except Exception as err:
                logger.warning(traceback.format_exc())
                logger.warning(err)
        return collected

    # collect operating system information
项目:BioQueue    作者:liyao001    | 项目源码 | 文件源码
def get_disk_used(path='/'):
    """Return the used bytes of the filesystem containing ``path``."""
    return psutil.disk_usage(path).used
项目:BioQueue    作者:liyao001    | 项目源码 | 文件源码
def get_disk_free(path='/'):
    """Return the free bytes of the filesystem containing ``path``."""
    return psutil.disk_usage(path).free
项目:BioQueue    作者:liyao001    | 项目源码 | 文件源码
def get_init_resource():
    """Return the initial resource figures.

    Returns:
        tuple: ``(cpu, physical_memory, disk_total, physical_plus_swap)``
        where ``cpu`` is the core count times 100 and ``disk_total`` is the
        size of the filesystem holding the configured workspace.
    """
    cpu_capacity = cpu_count() * 100
    physical = psutil.virtual_memory()
    swap = psutil.swap_memory()
    workspace = get_config("env", "workspace")
    disk_total = psutil.disk_usage(workspace).total
    return cpu_capacity, physical.total, disk_total, swap.total + physical.total
项目:Planet-Pipeline-GUI    作者:samapriya    | 项目源码 | 文件源码
def process_size(path, id_list, item_type, asset_type, overwrite):
    """Estimate the download size of the given assets and report free space.

    For every item id the asset metadata is requested.  When the asset is
    ``active`` its download location is opened and the ``Content-Length``
    header is added to a running total (in GB).  At the end the free space
    of the target path and the accumulated size are printed.

    Args:
        path: replaced by ``args.size`` inside the function (kept for
            interface compatibility).
        id_list: item ids to inspect.
        item_type: item type used to build the asset URL.
        asset_type: key looked up in the asset metadata.
        overwrite: not used by this function.

    Returns:
        list: one entry per item id -- the metadata response, or ``False``
        when the asset is not active or the asset type is missing.
    """
    results = []
    summation=0
    path= args.size
    spc=psutil.disk_usage(path).free
    remain=float(spc)/1073741824
    # now inspect each file
    for item_id in id_list:
        url = ASSET_URL.format(item_type, item_id)
        logging.info('Request: {}'.format(url))
        result = SESSION.get(url)
        check_status(result)
        try:
            asset = result.json()[asset_type]
            if asset['status'] == 'active':
                download_url = asset['location']
                # NOTE(review): the body is never read nor the response
                # released -- confirm whether a HEAD request would suffice.
                pool = PoolManager()
                response = pool.request("GET", download_url, preload_content=False)
                content_bytes = response.headers.get("Content-Length")
                print("Item-ID: "+str(item_id))
                if content_bytes is None:
                    # Without a Content-Length header float(None) would
                    # raise TypeError; the size is simply unknown.
                    print("Size unknown (no Content-Length) for "+str(item_id))
                else:
                    summary=float(content_bytes)/1073741824
                    summation=summation+summary
                    print(format(float(summation),'.2f'),"GB", end='\r')
            else:
                result = False
        except KeyError:
            print('Could not check activation status - asset type \'{}\' not found for {}'.format(asset_type, item_id))
            result = False


        results.append(result)
    print("Remaining Space in MB",format(float(remain*1024),'.2f'))
    print("Remaining Space in GB",format(float(remain),'.2f'))
    print ("Total Size in MB",format(float(summation*1024),'.2f'))
    print ("Total Size in GB",format(float(summation),'.2f'))
    return results
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def get_disk(self):
        """Return the root filesystem's usage percentage truncated to int."""
        root_usage = psutil.disk_usage('/')
        return int(root_usage.percent)
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def get_total_disk(self):
        """Return the total size of the root filesystem in bytes."""
        root_usage = psutil.disk_usage('/')
        return root_usage.total
项目:zun    作者:openstack    | 项目源码 | 文件源码
def statvfs():
    """Return disk usage for the configured docker root directory.

    Falls back to ``/`` when the configured directory does not exist.
    """
    configured = CONF.docker.root_directory
    target = configured if os.path.exists(configured) else '/'
    return psutil.disk_usage(target)
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def get_disk_usage(self, device_name):
        """Return psutil's usage tuple for ``device_name``."""
        usage = psutil.disk_usage(device_name)
        return usage
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)

    Loops until interrupted: each round gathers the figures (the CPU sample
    blocks for one second), serialises them as JSON and sends them as a
    multipart message on the ``status`` channel.  On KeyboardInterrupt or
    SystemExit the socket is closed, the context terminated and the
    function returns.
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)

            }).encode()
            #send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])

        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Leave the loop: the next iteration would otherwise try to
            # send on the socket that has just been closed.
            return
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def process_send_data(socket, context):
    """
    Send all memory, cpu, disk, network data of computer to server(master node)

    Runs until KeyboardInterrupt / SystemExit.  Every iteration collects the
    current figures (sampling the CPU for one second), encodes them as JSON
    and publishes them on the ``status`` channel.  When interrupted, the
    socket is closed, the context terminated and the function returns.
    """
    while True:
        try:
            cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage('/')
            pids_computer = psutil.pids()

            info_to_send = json.dumps({
                "computer_utc_clock": str(datetime.datetime.utcnow()),
                "computer_clock": str(datetime.datetime.now()),
                "hostname": platform.node(),
                "mac_address": mac_address(),
                "ipv4_interfaces": internet_addresses(),
                "cpu": {
                    "percent_used": cpu_percent
                },
                "memory": {
                    "total_bytes": memory_info.total,
                    "total_bytes_used": memory_info.used,
                    "percent_used": memory_info.percent
                },
                "disk": {
                    "total_bytes": disk_info.total,
                    "total_bytes_used": disk_info.used,
                    "total_bytes_free": disk_info.free,
                    "percent_used": disk_info.percent
                },
                "process": pids_active(pids_computer)

            }).encode()
            #send json data in the channel 'status', although is not necessary to send.
            socket.send_multipart([b"status", info_to_send])

        except (KeyboardInterrupt, SystemExit):
            socket.close()
            context.term()
            # Stop here; continuing the loop would use the closed socket.
            return
项目:telebot    作者:DronMDF    | 项目源码 | 文件源码
def actions(self):
        """Return a low-disk action when root usage exceeds ``self.level``.

        Returns:
            list: ``[AcLowHDD(self.chat_id)]`` above the threshold,
            otherwise an empty list.
        """
        used_percent = psutil.disk_usage('/').percent
        return [AcLowHDD(self.chat_id)] if used_percent > self.level else []
项目:telebot    作者:DronMDF    | 项目源码 | 文件源码
def asString(self):
        """Format the root filesystem's usage percentage as text."""
        used_percent = psutil.disk_usage('/').percent
        return '???? ????? ?? %u%%' % used_percent
项目:django-sysinfo    作者:saxix    | 项目源码 | 文件源码
def get_device_info(path):
    """Return humanized total/used/free sizes for the device holding ``path``.

    The path is resolved with ``os.path.realpath`` first.

    Returns:
        dict: ``total``, ``used`` and ``free`` as humanized strings, or
        ``{"ERROR": <message>}`` when an OSError occurs.
    """
    try:
        usage = psutil.disk_usage(os.path.realpath(path))
        summary = {}
        for field in ("total", "used", "free"):
            summary[field] = humanize_bytes(getattr(usage, field))
        return summary
    except OSError as e:
        return {"ERROR": str(e)}
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def get_data(self):
        """Gets system utilization stats.

        Returns:
            dict: ``cpu_percent``, ``memory_percent``, ``disk_percent`` and
            the sent / received byte deltas since the previous call (0 while
            the stored previous counter is still 0).
        """
        cpu_load = psutil.cpu_percent(interval=1)
        memory_load = psutil.virtual_memory().percent
        disk_load = psutil.disk_usage('/').percent
        counters = psutil.net_io_counters()

        # Compute deltas for sent and received bytes.  Note this is system-wide
        # network usage and not necessarily GPRS-related.
        # TODO(matt): query on a specific interface..which one, I'm not sure.
        sent_delta = 0
        if self.last_bytes_sent != 0:
            sent_delta = counters.bytes_sent - self.last_bytes_sent
        self.last_bytes_sent = counters.bytes_sent

        received_delta = 0
        if self.last_bytes_received != 0:
            received_delta = counters.bytes_recv - self.last_bytes_received
        self.last_bytes_received = counters.bytes_recv

        return {
            'cpu_percent': cpu_load,
            'memory_percent': memory_load,
            'disk_percent': disk_load,
            'bytes_sent_delta': sent_delta,
            'bytes_received_delta': received_delta,
        }
项目:Planet-GEE-Pipeline-CLI    作者:samapriya    | 项目源码 | 文件源码
def process_size(path, id_list, item_type, asset_type, overwrite):
    """Estimate the download size of the given assets and report free space.

    The asset metadata is requested for every item id.  For ``active``
    assets the download location is opened and its ``Content-Length``
    header is added to a running total (in GB).  Finally the free space of
    the target path and the accumulated size are printed.

    Args:
        path: replaced by ``args.size`` inside the function (kept for
            interface compatibility).
        id_list: item ids to inspect.
        item_type: item type used to build the asset URL.
        asset_type: key looked up in the asset metadata.
        overwrite: not used by this function.

    Returns:
        list: one entry per item id -- the metadata response, or ``False``
        when the asset is not active or the asset type is missing.
    """
    results = []
    summation=0
    path= args.size
    spc=psutil.disk_usage(path).free
    remain=float(spc)/1073741824
    # now inspect each file
    for item_id in id_list:
        url = ASSET_URL.format(item_type, item_id)
        logging.info('Request: {}'.format(url))
        result = SESSION.get(url)
        check_status(result)
        try:
            asset = result.json()[asset_type]
            if asset['status'] == 'active':
                download_url = asset['location']
                # NOTE(review): the body is never read nor the response
                # released -- confirm whether a HEAD request would suffice.
                pool = PoolManager()
                response = pool.request("GET", download_url, preload_content=False)
                content_bytes = response.headers.get("Content-Length")
                print("Item-ID: "+str(item_id))
                if content_bytes is None:
                    # Without a Content-Length header float(None) would
                    # raise TypeError; the size is simply unknown.
                    print("Size unknown (no Content-Length) for "+str(item_id))
                else:
                    summary=float(content_bytes)/1073741824
                    summation=summation+summary
            else:
                result = False
        except KeyError:
            print('Could not check activation status - asset type \'{}\' not found for {}'.format(asset_type, item_id))
            result = False


        results.append(result)
    print("Remaining Space in MB",format(float(remain*1024),'.2f'))
    print("Remaining Space in GB",format(float(remain),'.2f'))
    print ("Total Size in MB",format(float(summation*1024),'.2f'))
    print ("Total Size in GB",format(float(summation),'.2f'))
    return results
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def collect_system_information(self):
        """Collect memory, CPU, GPU, disk and network facts about this host.

        Returns a dict with the keys ``memory_total``, ``resources_limit``,
        ``cpu_name``, ``cpu``, ``nets``, ``disks``, ``gpus`` and
        ``boot_time``. Every network interface that is recorded is also
        appended to ``self.nets``.
        """
        memory = psutil.virtual_memory()

        import cpuinfo
        cpu = cpuinfo.get_cpu_info()

        values = {
            'memory_total': memory.total,
            'resources_limit': self.resources_limit,
            'cpu_name': cpu['brand'],
            'cpu': [cpu['hz_advertised_raw'][0], cpu['count']],
            'nets': {},
            'disks': {},
            'gpus': {},
            'boot_time': psutil.boot_time(),
        }

        # GPU discovery is best effort: any failure leaves what was gathered so far.
        gpus = values['gpus']
        try:
            for index, device in enumerate(aetros.cuda_gpu.get_ordered_devices()):
                device['available'] = index in self.enabled_gpus
                gpus[index] = device
        except Exception:
            pass

        disks = values['disks']
        for part in psutil.disk_partitions():
            try:
                # Index 1 of the partition entry is passed to both lookups.
                label = self.get_disk_name(part[1])
                disks[label] = psutil.disk_usage(part[1]).total
            except Exception:
                # suppress Operation not permitted
                pass

        nets = values['nets']
        try:
            for iface, stats in psutil.net_if_stats().items():
                # Skip interfaces whose name starts with 'lo' and those that are down.
                if iface.startswith('lo') or not stats.isup:
                    continue
                self.nets.append(iface)
                # A falsy reported speed falls back to 1000.
                nets[iface] = stats.speed or 1000
        except Exception:
            # suppress Operation not permitted
            pass

        return values
项目:psystem    作者:gokhanm    | 项目源码 | 文件源码
def partition(self, partition):
        """
            Disk usage of a single partition

            The argument is converted with str() and handed to
            psutil.disk_usage(); the object returned by that call is
            passed back unchanged.
        """
        return psutil.disk_usage(str(partition))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage(self):
        # Hand psutil.disk_usage and the relative path '.' to the
        # self.execute() helper.
        target = '.'
        self.execute(psutil.disk_usage, target)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disks(self):
        # Compare psutil.disk_usage() and psutil.disk_partitions()
        # against the output of "df -k".
        def df(path):
            # Parse the first data row of "df -k <path>" and return
            # (device, total, used, free) with the sizes scaled from
            # 1K blocks to bytes.
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)  # drop the header row
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        tolerance = 10 * 1024 * 1024  # 10 MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # Both values must go into one tuple: "fmt % a, b" formats with
            # a single argument and raises TypeError instead of reporting
            # the mismatch.
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage(self):
        # Delegate to self.assert_stdout() with the name 'disk_usage.py'.
        script_name = 'disk_usage.py'
        self.assert_stdout(script_name)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage(self):
        # Call psutil.disk_usage() on self.udir; the result is discarded,
        # so the test only fails if the call raises.
        path = self.udir
        psutil.disk_usage(path)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage(self):
        # Basic invariants of the figures reported for the working directory.
        cwd = os.getcwd()
        usage = psutil.disk_usage(cwd)
        for field in ('total', 'used', 'free'):
            assert getattr(usage, field) > 0, usage
        for field in ('used', 'free'):
            assert usage.total > getattr(usage, field), usage
        assert 0 <= usage.percent <= 100, usage.percent

        if hasattr(shutil, 'disk_usage'):
            # py >= 3.3, see: http://bugs.python.org/issue12442
            # Cross-check against the standard library's own numbers.
            reference = shutil.disk_usage(cwd)
            slack = 5 * 1024 * 1024  # 5MB
            self.assertEqual(usage.total, reference.total)
            for field in ('free', 'used'):
                self.assertAlmostEqual(getattr(usage, field),
                                       getattr(reference, field),
                                       delta=slack)

        # if path does not exist OSError ENOENT is expected across
        # all platforms
        missing = tempfile.mktemp()
        try:
            psutil.disk_usage(missing)
        except OSError as err:
            # Any other errno is unexpected and is propagated.
            if err.args[0] != errno.ENOENT:
                raise
        else:
            self.fail("OSError not raised")
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage_unicode(self):
        # see: https://github.com/giampaolo/psutil/issues/416
        path = TESTFN_UNICODE
        # Remove any leftover first and register removal for after the test.
        safe_rmpath(path)
        self.addCleanup(safe_rmpath, path)
        # Create the directory and query it; the result is discarded.
        os.mkdir(path)
        psutil.disk_usage(path)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage(self):
        def df(device):
            # Parse the second output line of "df -k <device>" into
            # (total, used, free, percent); sizes are scaled from 1K
            # blocks to bytes.
            output = sh("df -k %s" % device).strip()
            fields = output.split('\n')[1].split()
            total, used, free = [int(fields[i]) * 1024 for i in (1, 2, 3)]
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                message = str(err).lower()
                if "no such file or directory" not in message and \
                        "raw devices not supported" not in message:
                    raise
                # These two df failures are tolerated: skip the partition.
                continue
            self.assertAlmostEqual(usage.total, total, delta=tolerance)
            self.assertAlmostEqual(usage.used, used, delta=tolerance)
            self.assertAlmostEqual(usage.free, free, delta=tolerance)
            self.assertAlmostEqual(usage.percent, percent, delta=1)
项目:Platypus    作者:gmemstr    | 项目源码 | 文件源码
def GetStats(disk_path='C:\\'):
    """Return rounded CPU, memory and disk usage percentages.

    Args:
        disk_path: Path whose disk usage is reported. Defaults to the
            root of the ``C:`` drive, the value that used to be hard-coded,
            so existing callers are unaffected; pass e.g. ``'/'`` on
            systems without drive letters.

    Returns:
        A dict with the keys ``"cpu"``, ``"memory"`` and ``"disk"``, each
        holding the corresponding psutil percentage passed through
        ``round()``.
    """
    return {
        "cpu": round(psutil.cpu_percent()),  # Used CPU
        "memory": round(psutil.virtual_memory().percent),  # Used memory
        "disk": round(psutil.disk_usage(disk_path).percent),  # Used disk
    }
项目:JimV-N    作者:jamesiter    | 项目源码 | 文件源码
def update_disks(self):
        """Rebuild ``self.disks`` from the partitions psutil reports.

        The mapping is emptied and then filled with one entry per
        partition, keyed by mount point. Each entry holds the partition's
        device, filesystem type and options together with its usage
        figures. ``real_device`` equals ``device`` unless the device path
        is a symbolic link, in which case it holds the resolved path.
        """
        self.disks.clear()
        for part in psutil.disk_partitions(all=False):
            mountpoint = part.mountpoint
            usage = psutil.disk_usage(mountpoint)
            self.disks[mountpoint] = {
                'device': part.device,
                'real_device': part.device,
                'fstype': part.fstype,
                'opts': part.opts,
                'total': usage.total,
                'used': usage.used,
                'free': usage.free,
                'percent': usage.percent,
            }

            if not os.path.islink(part.device):
                continue
            self.disks[mountpoint]['real_device'] = os.path.realpath(part.device)

    # ?????????????? ??? ???