Python psutil 模块,disk_partitions() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.disk_partitions()

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_partitions_and_usage(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -P -B 1 "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total, used, free = int(total), int(used), int(free)
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_procfs_path(self):
        tdir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(tdir)
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def btrfs_get_mountpoints_of_subvol(subvol='', **kwargs):
    """
    Determine the list of mountpoints for a given subvol (of the form @/foo/bar).

    Returns a list of mountpoint(s), or an empty list.
    """
    mountpoints = []
    if not subvol:
        return []

    # Seems the easiest way to do this is to walk the disk partitions, extract the opts
    # string and see if subvol is present.  Remember the leading '/'.
    for part in psutil.disk_partitions():
        if "subvol=/{}".format(subvol) in part.opts:
            mountpoints.append(part.mountpoint)

    return mountpoints


# pylint: disable=unused-argument
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def get_mountpoint_opts(mountpoint='', **kwargs):
    """
    Determine the mount options set for a given mountpoint.

    Returns a list of mount opts or None on error.  For opts in the form 'key=val',
    convert the opt into dictionary.  Thus, our return structure may look
    something like:
      [ 'rw', 'relatime', ..., { 'subvolid': '259' }, ... ]'
    """
    opts = None

    for part in psutil.disk_partitions():
        if part.mountpoint == mountpoint:
            opts = part.opts.split(',')

    # Convert foo=bar to dictionary entries if opts is not None or not an empty list.
    opts = [o if '=' not in o else {k: v for (k, v) in [tuple(o.split('='))]}
            for o in opts] if opts else None

    if not opts:
        log.error("Failed to determine mount opts for '{}'.".format(mountpoint))

    return opts
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def get_file_systems(self):
        systems = psutil.disk_partitions()

        for i in range(0, len(systems)):
            system = systems[i]

            system_options = {}
            for option in system.opts.split(','):
                option_local = prism.helpers.locale_('system', 'mount.options.' + option)
                if option != option_local:
                    system_options[option] = option_local
                else:
                    system_options[option] = prism.helpers.locale_('system', 'mount.options.unknown')

            systems[i] = {'device': system.device, 'mount_point': system.mountpoint,
                            'fs_type': system.fstype, 'options': system_options,
                            'usage': psutil.disk_usage(system.mountpoint)}

        return systems
项目:cpu-g    作者:atareao    | 项目源码 | 文件源码
def disksinfo(self):
        values = []
        disk_partitions = psutil.disk_partitions(all=False)
        for partition in disk_partitions:
            usage = psutil.disk_usage(partition.mountpoint)
            device = {'device': partition.device,
                      'mountpoint': partition.mountpoint,
                      'fstype': partition.fstype,
                      'opts': partition.opts,
                      'total': usage.total,
                      'used': usage.used,
                      'free': usage.free,
                      'percent': usage.percent
                      }
            values.append(device)
        values = sorted(values, key=lambda device: device['device'])
        return values
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def add_device(request, device_path, bucket=None):
    ceph.utils.osdize(dev, hookenv.config('osd-format'),
                      ceph_hooks.get_journal_devices(),
                      hookenv.config('osd-reformat'),
                      hookenv.config('ignore-device-errors'),
                      hookenv.config('osd-encrypt'),
                      hookenv.config('bluestore'))
    # Make it fast!
    if hookenv.config('autotune'):
        ceph.utils.tune_dev(dev)
    mounts = filter(lambda disk: device_path
                    in disk.device, psutil.disk_partitions())
    if mounts:
        osd = mounts[0]
        osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
        request.ops.append({
            'op': 'move-osd-to-bucket',
            'osd': "osd.{}".format(osd_id),
            'bucket': bucket})
    return request
项目:psystem    作者:gokhanm    | 项目源码 | 文件源码
def disk(self):
        """
            Per system partition disk usage on system
            Converting byte to human readable format

            Return dict
        """
        disk_info = {}
        parts = self.disk_partitions

        for i in parts:
            part = i.mountpoint
            part_usage = self.partition(part)
            total = self.hr(part_usage.total)
            used = self.hr(part_usage.used)
            free = self.hr(part_usage.free)
            percent = part_usage.percent
            disk_info[part] = {
                                "total": total,
                                "used": used,
                                "free": free,
                                "percent": percent
                              }

        return disk_info
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_partitions_mocked(self):
        # Test that ZFS partitions are returned.
        with open("/proc/filesystems", "r") as f:
            data = f.read()
        if 'zfs' in data:
            for part in psutil.disk_partitions():
                if part.fstype == 'zfs':
                    break
            else:
                self.fail("couldn't find any ZFS partition")
        else:
            # No ZFS partitions on this system. Let's fake one.
            fake_file = io.StringIO(u("nodev\tzfs\n"))
            with mock.patch('psutil._pslinux.open',
                            return_value=fake_file, create=True) as m1:
                with mock.patch(
                        'psutil._pslinux.cext.disk_partitions',
                        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
                    ret = psutil.disk_partitions()
                    assert m1.called
                    assert m2.called
                    assert ret
                    self.assertEqual(ret[0].fstype, 'zfs')
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def add_device(request, device_path, bucket=None):
    ceph.osdize(dev, config('osd-format'),
                get_journal_devices(), config('osd-reformat'),
                config('ignore-device-errors'),
                config('osd-encrypt'),
                config('bluestore'))
    # Make it fast!
    if config('autotune'):
        ceph.tune_dev(dev)
    mounts = filter(lambda disk: device_path
                    in disk.device, psutil.disk_partitions())
    if mounts:
        osd = mounts[0]
        osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
        request.ops.append({
            'op': 'move-osd-to-bucket',
            'osd': "osd.{}".format(osd_id),
            'bucket': bucket})
    return request
项目:mycroft-skill-diagnostics    作者:the7erm    | 项目源码 | 文件源码
def handle_drive_intent(self, message):
        partitions = psutil.disk_partitions()
        for partition in partitions:
            print("partition.mountpoint: %s" % partition.mountpoint)
            if partition.mountpoint.startswith("/snap/"):
                continue
            partition_data = psutil.disk_usage(partition.mountpoint)
            # total=21378641920, used=4809781248, free=15482871808,
            # percent=22.5
            data = {
                "mountpoint": partition.mountpoint,
                "total": sizeof_fmt(partition_data.total),
                "used": sizeof_fmt(partition_data.used),
                "free": sizeof_fmt(partition_data.free),
                "percent": partition_data.percent
            }
            if partition_data.percent >= 90:
                self.speak_dialog("drive.low", data)
            else:
                self.speak_dialog("drive", data)
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_disk_info(self):
        data = []
        try:
            disks = psutil.disk_partitions(all=True)
            for disk in disks:
                if not disk.device:
                    continue
                if disk.opts.upper() in ('CDROM', 'REMOVABLE'):
                    continue
                item = {}
                item['name'] = disk.device
                item['device'] = disk.device
                item['mountpoint'] = disk.mountpoint
                item['fstype'] = disk.fstype
                item['size'] = psutil.disk_usage(disk.mountpoint).total >> 10
                data.append(item)
            data.sort(key=lambda x: x['device'])
        except:
            data = []
            self.logger.error(traceback.format_exc())

        return data
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_partitions_and_usage(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -P -B 1 "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total, used, free = int(total), int(used), int(free)
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_partitions_mocked(self):
        # Test that ZFS partitions are returned.
        with open("/proc/filesystems", "r") as f:
            data = f.read()
        if 'zfs' in data:
            for part in psutil.disk_partitions():
                if part.fstype == 'zfs':
                    break
            else:
                self.fail("couldn't find any ZFS partition")
        else:
            # No ZFS partitions on this system. Let's fake one.
            fake_file = io.StringIO(u("nodev\tzfs\n"))
            with mock.patch('psutil._pslinux.open',
                            return_value=fake_file, create=True) as m1:
                with mock.patch(
                        'psutil._pslinux.cext.disk_partitions',
                        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
                    ret = psutil.disk_partitions()
                    assert m1.called
                    assert m2.called
                    assert ret
                    self.assertEqual(ret[0].fstype, 'zfs')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_procfs_path(self):
        tdir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(tdir)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_partitions_and_usage(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -P -B 1 "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total, used, free = int(total), int(used), int(free)
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_partitions_mocked(self):
        # Test that ZFS partitions are returned.
        with open("/proc/filesystems", "r") as f:
            data = f.read()
        if 'zfs' in data:
            for part in psutil.disk_partitions():
                if part.fstype == 'zfs':
                    break
            else:
                self.fail("couldn't find any ZFS partition")
        else:
            # No ZFS partitions on this system. Let's fake one.
            fake_file = io.StringIO(u("nodev\tzfs\n"))
            with mock.patch('psutil._pslinux.open',
                            return_value=fake_file, create=True) as m1:
                with mock.patch(
                        'psutil._pslinux.cext.disk_partitions',
                        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
                    ret = psutil.disk_partitions()
                    assert m1.called
                    assert m2.called
                    assert ret
                    self.assertEqual(ret[0].fstype, 'zfs')
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_procfs_path(self):
        tdir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(tdir)
项目:pybix    作者:lioncui    | 项目源码 | 文件源码
def get_disk_rate_info(self):
        returnData = {}
        returnData['disk_total'] = {}
        returnData['disk_used'] = {}
        returnData['disk_percent'] = {}
        try:
            disk = psutil.disk_partitions()
            for val in disk:
                if val.fstype != "":
                    mountpoint = val.mountpoint
                    one = psutil.disk_usage(mountpoint)
                    tmp = one.total/1024/1024/1024.0
                    returnData['disk_total'][mountpoint] = "%.2f" % tmp
                    tmp = one.used/1024/1024/1024.0
                    returnData['disk_used'][mountpoint] = "%.2f" % tmp
                    returnData['disk_percent'][mountpoint] = one.percent
        except Exception:
            pybixlib.error(self.logHead + traceback.format_exc())
            self.errorInfoDone(traceback.format_exc())
        return returnData
项目:plugins    作者:site24x7    | 项目源码 | 文件源码
def _get_metrics(self):
        inode_files = 0
        inode_free = 0

        self.metrics['plugin_version'] = PLUGIN_VERSION
        self.metrics['heartbeat_required'] = HEARTBEAT

        for part in psutil.disk_partitions(all=False):
            inode_stats = self.__get_inode_stats(part.mountpoint)
            inode_files += inode_stats.f_files
            inode_free += inode_stats.f_ffree

        inode_use_pct = 0
        inode_used = inode_files - inode_free
        self.metrics['inode_total'] = inode_files
        self.metrics['inode_used'] = inode_used
        self.metrics['inode_free'] = inode_free
        if inode_files > 0:
            inode_use_pct =  "{:.2f}".format((inode_used * 100.0) / inode_files )
        self.metrics['inode_use_percent'] = inode_use_pct
        self.units['inode_use_percent'] = "%"

        self.metrics["units"] = self.units
        return self.metrics
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def main():
    table = prettytable.PrettyTable(border=False, header=True, left_padding_width=2, padding_width=1)
    table.field_names = ["Device", "Total", "Used", "Free", "Use%", "Type", "Mount"]
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        if 'docker' in part.mountpoint and 'aufs' in part.mountpoint:
            continue
        usage = psutil.disk_usage(part.mountpoint)

        table.add_row([part.device,
                       bytes2human(usage.total),
                       bytes2human(usage.used),
                       bytes2human(usage.free),
                       str(int(usage.percent)) + '%',
                       part.fstype,
                       part.mountpoint])
    for field in table.field_names:
        table.align[field] = "l"
    print table
项目:docklet    作者:unias    | 项目源码 | 文件源码
def collect_diskinfo(self):
        global workercinfo
        parts = psutil.disk_partitions()
        setval = []
        devices = {}
        for part in parts:
            # deal with each partition
            if not part.device in devices:
                devices[part.device] = 1
                diskval = {}
                diskval['device'] = part.device
                diskval['mountpoint'] = part.mountpoint
                try:
                    usage = psutil.disk_usage(part.mountpoint)
                    diskval['total'] = usage.total
                    diskval['used'] = usage.used
                    diskval['free'] = usage.free
                    diskval['percent'] = usage.percent
                    if(part.mountpoint.startswith('/opt/docklet/local/volume')):
                        # the mountpoint indicate that the data is the disk used information of a container
                        names = re.split('/',part.mountpoint)
                        container = names[len(names)-1]
                        if not container in workercinfo.keys():
                            workercinfo[container] = {}
                        workercinfo[container]['disk_use'] = diskval
                    setval.append(diskval)  # make a list
                except Exception as err:
                    logger.warning(traceback.format_exc())
                    logger.warning(err)
        #print(output)
        #print(diskparts)
        return setval

    # collect operating system information
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def get_all_mountpoint(all=False):
    partitions = psutil.disk_partitions(all=all)
    prt_by_mp = [x.mountpoint for x in partitions]
    return prt_by_mp
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def get_disk_partitions():

    partitions = psutil.disk_partitions(all=True)

    by_mount_point = {}
    for pt in partitions:
        # OrderedDict([
        #      ('device', '/dev/disk1'),
        #      ('mountpoint', '/'),
        #      ('fstype', 'hfs'),
        #      ('opts', 'rw,local,rootfs,dovolfs,journaled,multilabel')])
        by_mount_point[pt.mountpoint] = _to_dict(pt)

    return by_mount_point
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def collect_system_information(self):
        values = {}
        mem = psutil.virtual_memory()
        values['memory_total'] = mem.total

        import cpuinfo
        cpu = cpuinfo.get_cpu_info()
        values['resources_limit'] = self.resources_limit
        values['cpu_name'] = cpu['brand']
        values['cpu'] = [cpu['hz_advertised_raw'][0], cpu['count']]
        values['nets'] = {}
        values['disks'] = {}
        values['gpus'] = {}
        values['boot_time'] = psutil.boot_time()

        try:
            for gpu_id, gpu in enumerate(aetros.cuda_gpu.get_ordered_devices()):
                gpu['available'] = gpu_id in self.enabled_gpus

                values['gpus'][gpu_id] = gpu
        except Exception: pass

        for disk in psutil.disk_partitions():
            try:
                name = self.get_disk_name(disk[1])
                values['disks'][name] = psutil.disk_usage(disk[1]).total
            except Exception:
                # suppress Operation not permitted
                pass

        try:
            for id, net in psutil.net_if_stats().items():
                if 0 != id.find('lo') and net.isup:
                    self.nets.append(id)
                    values['nets'][id] = net.speed or 1000
        except Exception:
            # suppress Operation not permitted
            pass

        return values
项目:psystem    作者:gokhanm    | 项目源码 | 文件源码
def disk_partitions(self):
        """ 
            System disk partitions

            Return list
        """
        partitions = psutil.disk_partitions()

        return partitions
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.free, free)
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.used, used)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disks(self):
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except OSError as err:
                        if err.errno == errno.ENOENT:
                            # usually this is the floppy
                            break
                        else:
                            raise
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    self.assertEqual(usage.free, wmi_free)
                    # 10 MB tollerance
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s" % (
                            usage.free, wmi_free))
                    break
            else:
                self.fail("can't find partition %s" % repr(ps_part))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disk_usage(self):
        def df(device):
            out = sh("df -k %s" % device).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                if "no such file or directory" in str(err).lower() or \
                        "raw devices not supported" in str(err).lower():
                    continue
                else:
                    raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:JimV-N    作者:jamesiter    | 项目源码 | 文件源码
def update_disks(self):
        self.disks.clear()
        for disk in psutil.disk_partitions(all=False):
            disk_usage = psutil.disk_usage(disk.mountpoint)
            self.disks[disk.mountpoint] = {'device': disk.device, 'real_device': disk.device, 'fstype': disk.fstype,
                                           'opts': disk.opts, 'total': disk_usage.total, 'used': disk_usage.used,
                                           'free': disk_usage.free, 'percent': disk_usage.percent}

            if os.path.islink(disk.device):
                self.disks[disk.mountpoint]['real_device'] = os.path.realpath(disk.device)

    # ?????????????? ??? ???
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def partitions():
                return psutil.disk_partitions()
项目:IoT_Car-Final_Project    作者:hanmolee    | 项目源码 | 文件源码
def getLocation():
    context = pyudev.Context()
    removable = [device for device in context.list_devices(subsystem='block', DEVTYPE='disk') if device.attributes.asstring('removable') == "1"]
    for device in removable:
        partitions = [device.device_node for device in context.list_devices(subsystem='block', DEVTYPE='partition', parent=device)]
        for p in psutil.disk_partitions():
            if p.device in partitions:
                mntpoint = p.mountpoint
                print(p.device + ' => ' + mntpoint)
                return mntpoint
项目:nixstatsagent    作者:NIXStats    | 项目源码 | 文件源码
def run(self, config):
        disk = {}
        disk['df-psutil'] = []
        for part in psutil.disk_partitions(False):
            if os.name == 'nt':
                if 'cdrom' in part.opts or part.fstype == '':
                    # skip cd-rom drives with no disk in it; they may raise
                    # ENOENT, pop-up a Windows GUI error for a non-ready
                    # partition or just hang.
                    continue
            try:
                usage = psutil.disk_usage(part.mountpoint)
                diskdata = {}
                diskdata['info'] = part
                for key in usage._fields:
                    diskdata[key] = getattr(usage, key)
                disk['df-psutil'].append(diskdata)
            except:
                pass

        try:
                force_df = config.get('diskusage', 'force_df')
        except:
                force_df = 'no'

        if len(disk['df-psutil']) == 0 or force_df == 'yes': 
            try:
            disk['df-psutil'] = []
                df_output_lines = [s.split() for s in os.popen("df -Pl").read().splitlines()] 
                del df_output_lines[0]
                for row in df_output_lines:
                    if row[0] == 'tmpfs':
                        continue
                    disk['df-psutil'].append({'info': [row[0], row[5],'',''], 'total': int(row[1])*1024, 'used': int(row[2])*1024, 'free': int(row[3])*1024, 'percent': row[4][:-1]}) 
            except:
                pass

        return disk
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def getAttrRO(self):
        attrs = [
           'disk_partitions'
            ]
        return ServiceBase.getAttrRO() + attrs
项目:maestro    作者:InWorldz    | 项目源码 | 文件源码
def get_disk_partitions(self):
        return psutil.disk_partitions()
项目:skynet    作者:skynetera    | 项目源码 | 文件源码
def monitor(frist_invoke=1):
    value_dic = {
            'disk': {}
    }

    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        mount_point = part.mountpoint
        device =  part.device
        usage = psutil.disk_usage(part.mountpoint)
        value_dic['disk'][mount_point] = {
            'disk.device':device,
            'disk.mount_point':mount_point,
            'disk.total':usage.total/(1024*1024),
            'disk.used':usage.used/(1024*1024),
            'disk.free':usage.free/(1024*1024),
            'disk.percent': usage.percent,
            'disk.type':part.fstype,
        }

    return value_dic
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def check(self):
        data = {
            'disk.max_used_percent': 0,
            'disk.max_used_percent_mount': '',
            'disk.max_used': 0,
            'disk.max_used_mount': '',
            'disk.min_free': 0,
            'disk.min_free_mount': '',
            'disk.used_percent': 0,
            'disk.used': 0,
            'disk.free': 0,
            'disk.total': 0,
        }
        partitions = psutil.disk_partitions(all=True)
        for partition in partitions:
            if partition.opts.upper() in ('CDROM', 'REMOVABLE'):
                continue
            mountpoint = partition.mountpoint
            usage = psutil.disk_usage(mountpoint)
            if usage.percent > data['disk.max_used_percent']:
                data['disk.max_used_percent'] = usage.percent
                data['disk.max_used_percent_mount'] = mountpoint
            if usage.used > data['disk.max_used']:
                data['disk.max_used'] = usage.used
                data['disk.max_used_mount'] = mountpoint
            if usage.free < data['disk.min_free'] or data['disk.min_free'] == 0:
                data['disk.min_free'] = usage.free
                data['disk.min_free_mount'] = mountpoint
            data['disk.used'] += usage.used
            data['disk.free'] += usage.free
            data['disk.total'] += usage.total
        data['disk.used_percent'] = data['disk.used']*100 / data['disk.total'] if data['disk.total'] else 0
        data['disk.max_used'] = data['disk.max_used'] / 1024
        data['disk.min_free'] = data['disk.min_free'] / 1024
        data['disk.used'] = data['disk.used'] / 1024
        data['disk.free'] = data['disk.free'] / 1024
        data['disk.total'] = data['disk.total'] / 1024
        return data
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.free, free)
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.used, used)

    # --- cpu
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disks(self):
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except OSError as err:
                        if err.errno == errno.ENOENT:
                            # usually this is the floppy
                            break
                        else:
                            raise
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    self.assertEqual(usage.free, wmi_free)
                    # 10 MB tollerance
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s" % (
                            usage.free, wmi_free))
                    break
            else:
                self.fail("can't find partition %s" % repr(ps_part))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disk_usage(self):
        def df(device):
            out = sh("df -k %s" % device).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                if "no such file or directory" in str(err).lower() or \
                        "raw devices not supported" in str(err).lower():
                    continue
                else:
                    raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tollerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.free, free)
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % usage.used, used)

    # --- cpu
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disks(self):
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except OSError as err:
                        if err.errno == errno.ENOENT:
                            # usually this is the floppy
                            break
                        else:
                            raise
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    self.assertEqual(usage.free, wmi_free)
                    # 10 MB tollerance
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s" % (
                            usage.free, wmi_free))
                    break
            else:
                self.fail("can't find partition %s" % repr(ps_part))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_usage(self):
        for disk in psutil.disk_partitions():
            sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
            psutil_value = psutil.disk_usage(disk.mountpoint)
            self.assertAlmostEqual(sys_value[0], psutil_value.free,
                                   delta=1024 * 1024)
            self.assertAlmostEqual(sys_value[1], psutil_value.total,
                                   delta=1024 * 1024)
            self.assertEqual(psutil_value.used,
                             psutil_value.total - psutil_value.free)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_partitions(self):
        sys_value = [
            x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
            if x and not x.startswith('A:')]
        psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)]
        self.assertEqual(sys_value, psutil_value)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_disk_usage(self):
        def df(device):
            out = sh("df -k %s" % device).strip()
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                if "no such file or directory" in str(err).lower() or \
                        "raw devices not supported" in str(err).lower():
                    continue
                else:
                    raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)