Python psutil 模块,net_if_stats() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用psutil.net_if_stats()

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that procfs-backed APIs fail when PROCFS_PATH is an empty dir.

        ``psutil.PROCFS_PATH`` is pointed at a freshly created temporary
        directory; each API listed below is then expected to raise
        ``IOError`` (``psutil.NoSuchProcess`` for ``psutil.Process``).
        The previous ``PROCFS_PATH`` is restored and the directory removed
        afterwards, whether or not an assertion failed.
        """
        # Remember the current value instead of assuming "/proc", so a
        # custom PROCFS_PATH configured before this test is not clobbered.
        original_path = psutil.PROCFS_PATH
        tdir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # The psutil.pids check was already disabled in the original;
            # the reason is not visible here.
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = original_path
            os.rmdir(tdir)
项目:matlab_ctp_md_td    作者:xunquant    | 项目源码 | 文件源码
def get_active_nic_name():
    """Return ``psutil.net_if_stats()`` without the 'docker0' and 'lo' entries.

    Despite the function name, the result is the mapping of interface name
    to stats object (not a list of names), and interfaces are not filtered
    on whether they are up.
    """
    all_nic = psutil.net_if_stats()

    # pop() with a default replaces the former bare ``except: pass`` blocks:
    # a missing key is ignored without hiding unrelated errors.
    for excluded in ('docker0', 'lo'):
        all_nic.pop(excluded, None)

    return all_nic
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that procfs-backed APIs fail when PROCFS_PATH is an empty dir.

        ``psutil.PROCFS_PATH`` is pointed at a freshly created temporary
        directory; each API listed below is then expected to raise
        ``IOError`` (``psutil.NoSuchProcess`` for ``psutil.Process``).
        The previous ``PROCFS_PATH`` is restored and the directory removed
        afterwards, whether or not an assertion failed.
        """
        # Remember the current value instead of assuming "/proc", so a
        # custom PROCFS_PATH configured before this test is not clobbered.
        original_path = psutil.PROCFS_PATH
        tdir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # The psutil.pids check was already disabled in the original;
            # the reason is not visible here.
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = original_path
            os.rmdir(tdir)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that procfs-backed APIs fail when PROCFS_PATH is an empty dir.

        ``psutil.PROCFS_PATH`` is pointed at a freshly created temporary
        directory; each API listed below is then expected to raise
        ``IOError`` (``psutil.NoSuchProcess`` for ``psutil.Process``).
        The previous ``PROCFS_PATH`` is restored and the directory removed
        afterwards, whether or not an assertion failed.
        """
        # Remember the current value instead of assuming "/proc", so a
        # custom PROCFS_PATH configured before this test is not clobbered.
        original_path = psutil.PROCFS_PATH
        tdir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # The psutil.pids check was already disabled in the original;
            # the reason is not visible here.
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = original_path
            os.rmdir(tdir)
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def collect_system_information(self):
        """Build and return a dict describing the host.

        The dict holds total memory, ``self.resources_limit``, CPU name and
        ``[advertised hz, count]``, boot time, and three sub-dicts:
        ``gpus`` (keyed by enumeration index), ``disks`` (total bytes keyed
        by ``self.get_disk_name``) and ``nets`` (speed, falling back to
        1000 when psutil reports a falsy value). Every up interface whose
        name does not start with 'lo' is also appended to ``self.nets``.
        Failures while probing GPUs, disks or NICs are swallowed.
        """
        memory = psutil.virtual_memory()
        values = {'memory_total': memory.total}

        import cpuinfo
        cpu = cpuinfo.get_cpu_info()
        values.update({
            'resources_limit': self.resources_limit,
            'cpu_name': cpu['brand'],
            'cpu': [cpu['hz_advertised_raw'][0], cpu['count']],
            'nets': {},
            'disks': {},
            'gpus': {},
            'boot_time': psutil.boot_time(),
        })

        try:
            devices = aetros.cuda_gpu.get_ordered_devices()
            for index, device in enumerate(devices):
                device['available'] = index in self.enabled_gpus
                values['gpus'][index] = device
        except Exception:
            pass

        for partition in psutil.disk_partitions():
            try:
                # Index 1 of the partition tuple is passed both to the name
                # helper and to disk_usage().
                label = self.get_disk_name(partition[1])
                values['disks'][label] = psutil.disk_usage(partition[1]).total
            except Exception:
                # suppress Operation not permitted
                pass

        try:
            for nic, link in psutil.net_if_stats().items():
                if nic.startswith('lo') or not link.isup:
                    continue
                self.nets.append(nic)
                values['nets'][nic] = link.speed or 1000
        except Exception:
            # suppress Operation not permitted
            pass

        return values
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Hand ``psutil.net_if_stats`` to ``self.execute``.

        NOTE(review): ``execute`` is defined outside this snippet;
        presumably it invokes the callable repeatedly to look for leaks --
        confirm against the enclosing test class.
        """
        self.execute(psutil.net_if_stats)

    # --- others
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``MTU:<n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'MTU:(\d+)', out)[0]))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``mtu <n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'mtu (\d+)', out)[0]))
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``mtu <n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'mtu (\d+)', out)[0]))


# =====================================================================
# --- FreeBSD
# =====================================================================
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def find_default_interface(self):
        """Populate ``self.interfaces`` with candidate non-loopback interfaces.

        Runs only while ``self.interfaces`` is ``None``. Candidates are the
        interfaces that are up, are not named 'lo' and do not start with
        'ifb'. While more than one candidate remains, the list is narrowed
        first to interfaces that have sent or received packets, then to
        those that do not carry the address 127.0.0.1. Any exception is
        swallowed, leaving whatever was collected so far.
        """
        import psutil
        try:
            if self.interfaces is None:
                self.interfaces = {}
                # Keep every interface that is up and is neither 'lo' nor 'ifb*'
                link_stats = psutil.net_if_stats()
                for nic in link_stats:
                    if nic == 'lo' or nic[:3] == 'ifb' or not link_stats[nic].isup:
                        continue
                    self.interfaces[nic] = {'packets': 0}
                if len(self.interfaces) > 1:
                    # Record traffic totals, then drop interfaces with none
                    for nic, io in psutil.net_io_counters(True).items():
                        if nic in self.interfaces:
                            self.interfaces[nic]['packets'] = \
                                io.packets_sent + io.packets_recv
                    idle = [nic for nic in self.interfaces
                            if self.interfaces[nic]['packets'] == 0]
                    for nic in idle:
                        del self.interfaces[nic]
                if len(self.interfaces) > 1:
                    # Drop interfaces that own the loopback address
                    loopback = []
                    for nic, nic_addresses in psutil.net_if_addrs().items():
                        if nic not in self.interfaces:
                            continue
                        if any(entry.address == '127.0.0.1'
                               for entry in nic_addresses):
                            loopback.append(nic)
                    for nic in loopback:
                        del self.interfaces[nic]
        except Exception:
            pass
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Hand ``psutil.net_if_stats`` to ``self.execute``.

        NOTE(review): ``execute`` is defined outside this snippet;
        presumably it invokes the callable repeatedly to look for leaks --
        confirm against the enclosing test class.
        """
        self.execute(psutil.net_if_stats)

    # --- sensors
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check the MTU from ``psutil.net_if_stats()`` against ifconfig.

        For every interface, ``mtu`` must equal the first ``MTU:<n>`` value
        found in the ``ifconfig <nic>`` output. The ``isup`` comparison is
        disabled because it was marked as not always reliable. Interfaces
        for which the ``sh`` helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            # Not always reliable.
            # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'MTU:(\d+)', out)[0]))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``mtu <n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'mtu (\d+)', out)[0]))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``mtu <n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'mtu (\d+)', out)[0]))


# =====================================================================
# --- FreeBSD
# =====================================================================
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Hand ``psutil.net_if_stats`` to ``self.execute``.

        NOTE(review): ``execute`` is defined outside this snippet;
        presumably it invokes the callable repeatedly to look for leaks --
        confirm against the enclosing test class.
        """
        self.execute(psutil.net_if_stats)

    # --- sensors
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check the MTU from ``psutil.net_if_stats()`` against ifconfig.

        For every interface, ``mtu`` must equal the first ``MTU:<n>`` value
        found in the ``ifconfig <nic>`` output. The ``isup`` comparison is
        disabled because it was marked as not always reliable. Interfaces
        for which the ``sh`` helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            # Not always reliable.
            # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'MTU:(\d+)', out)[0]))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``mtu <n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'mtu (\d+)', out)[0]))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_if_stats(self):
        """Cross-check ``psutil.net_if_stats()`` against ``ifconfig <nic>``.

        For every interface, ``isup`` must agree with the presence of
        ``RUNNING`` in the ifconfig output, and ``mtu`` must equal the first
        ``mtu <n>`` value found there. Interfaces for which the ``sh``
        helper raises ``RuntimeError`` are skipped.
        """
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # sh() failed for this NIC; skip it, as the original did.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence and triggers a warning on newer Python versions.
            self.assertEqual(stats.mtu,
                             int(re.findall(r'mtu (\d+)', out)[0]))


# =====================================================================
# --- FreeBSD
# =====================================================================
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def main():
    """Print a per-interface report of stats, I/O counters and addresses.

    Interfaces are taken from ``psutil.net_if_addrs()``; the stats and I/O
    sections are printed only for interfaces that also appear in
    ``psutil.net_if_stats()`` / ``psutil.net_io_counters(pernic=True)``.
    ``duplex_map`` and ``af_map`` are lookups defined outside this snippet.
    """
    link_stats = psutil.net_if_stats()
    traffic = psutil.net_io_counters(pernic=True)
    for nic, addrs in psutil.net_if_addrs().items():
        print("%s:" % (nic))

        if nic in link_stats:
            link = link_stats[nic]
            print("    stats          : ", end='')
            link_fields = (link.speed, duplex_map[link.duplex], link.mtu,
                           "yes" if link.isup else "no")
            print("speed=%sMB, duplex=%s, mtu=%s, up=%s" % link_fields)

        if nic in traffic:
            counters = traffic[nic]
            print("    incoming       : ", end='')
            print("bytes=%s, pkts=%s, errs=%s, drops=%s" % (
                counters.bytes_recv, counters.packets_recv,
                counters.errin, counters.dropin))
            print("    outgoing       : ", end='')
            print("bytes=%s, pkts=%s, errs=%s, drops=%s" % (
                counters.bytes_sent, counters.packets_sent,
                counters.errout, counters.dropout))

        for addr in addrs:
            family_label = af_map.get(addr.family, addr.family)
            print("    %-4s" % family_label, end="")
            print(" address   : %s" % addr.address)
            # Optional fields are printed only when they hold a truthy value.
            optional_lines = (
                ("         broadcast : %s", addr.broadcast),
                ("         netmask   : %s", addr.netmask),
                ("      p2p       : %s", addr.ptp),
            )
            for template, value in optional_lines:
                if value:
                    print(template % value)
        print("")
项目:lain    作者:laincloud    | 项目源码 | 文件源码
def _get_cali_veth_stat(self):
        '''
        Count the interfaces whose names start with 'cali' or 'tmp'.

        For each of the two prefixes, the number of up, down and total
        interfaces is appended to ``self._result`` as GraphiteData entries,
        in this order:

            lain.cluster.calico.veth.cali.up / .down / .total
            lain.cluster.calico.veth.tmp.up / .down / .total
        '''
        prefixes = ('cali', 'tmp')
        states = ('up', 'down', 'total')
        counts = dict((prefix, dict.fromkeys(states, 0))
                      for prefix in prefixes)

        # items() works on both Python 2 and 3; iteritems() is Python 2 only.
        for name, stat in psutil.net_if_stats().items():
            for prefix in prefixes:
                if name.startswith(prefix):
                    counts[prefix]['total'] += 1
                    counts[prefix]['up' if stat.isup else 'down'] += 1
                    break

        for prefix in prefixes:
            for state in states:
                self._result.append(
                    GraphiteData(
                        "lain.cluster.calico.veth.%s.%s" % (prefix, state),
                        self._endpoint, counts[prefix][state],
                        self._step, "val"))
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_eth_info(self):
        """
        Return a list of per-interface records sorted by interface name.

        Each record is a dict with the keys ``name``, ``mac``, ``ip``,
        ``mask``, ``broadcast``, ``status`` ('Active', 'Inactive' or
        'Unknown') and ``speed``. An interface with several IPv4 addresses
        yields one record per address; an interface without IPv4 addresses
        yields a single record with empty ``ip``/``mask``/``broadcast``.
        Non-IPv4, non-link-layer addresses are ignored.

        Records whose name contains ':' and which have no MAC of their own
        inherit the MAC of the first record named after the part before the
        last ':'.

        NOTE(review): the original comments were mis-encoded ('???') and
        could not be recovered; the description above is derived from the
        code only.
        """
        data = []
        addrs = psutil.net_if_addrs()
        stats = psutil.net_if_stats()
        # items() works on both Python 2 and 3; iteritems() is Python 2 only.
        for name, entries in addrs.items():
            eth = {
                'name': name,
                'mac': '00:00:00:00:00:00',
                'ip': '',
                'mask': '',
                'broadcast': '',
                'status': 'Unknown',
                'speed': 0
            }
            try:
                # Byte-string names are decoded from GBK (the original
                # comment mentioned Windows).
                eth['name'] = name.decode('gbk')
            except (AttributeError, UnicodeError):
                # Text names have no decode() on Python 3; names that cannot
                # be decoded are kept unchanged.
                eth['name'] = name
            if name in stats:
                eth['status'] = 'Active' if stats[name].isup else 'Inactive'
                eth['speed'] = stats[name].speed
            # Resolve the MAC before emitting any record: the link-layer
            # entry may come after the IPv4 entries, and the extra records
            # created for additional IPs would otherwise keep the
            # placeholder MAC.
            for entry in entries:
                if entry.family == psutil.AF_LINK:
                    eth['mac'] = entry.address.upper()
            for entry in entries:
                if entry.family != socket.AF_INET:
                    continue
                if eth['ip']:
                    # A further IPv4 address: store the record built so far
                    # and reuse ``eth`` for the next address.
                    data.append(dict(eth))
                eth['ip'] = entry.address
                eth['mask'] = entry.netmask
                eth['broadcast'] = entry.broadcast
            data.append(eth)
        # Records named "<parent>:<suffix>" that still carry the placeholder
        # MAC take the MAC of the first record named "<parent>".
        for item in data:
            if ':' not in item['name']:
                continue
            if item['mac'] != '00:00:00:00:00:00':
                continue
            name = item['name'].rsplit(':', 1)[0]
            for iitem in data:
                if name == iitem['name']:
                    item['mac'] = iitem['mac']
                    break
        data.sort(key=lambda x: x['name'])
        return data
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_if_addrs(self):
        """Validate the entries returned by ``psutil.net_if_addrs()``.

        Checks the types of interface names and address fields, that each
        interface has no duplicate entries, that families are one of
        AF_INET / AF_INET6 / psutil.AF_LINK, that IPv4 and IPv6 addresses of
        interfaces that are up can be bound, that non-IPv6 address strings
        pass ``check_net_address``, and that broadcast and ptp are never
        both set. Finally ``psutil.AF_LINK`` is compared with the
        platform's expected constant.
        """
        interfaces = psutil.net_if_addrs()
        assert interfaces, interfaces

        link_stats = psutil.net_if_stats()

        # Not reliable on all platforms (net_if_addrs() reports more
        # interfaces).
        # self.assertEqual(sorted(interfaces.keys()),
        #                  sorted(psutil.net_io_counters(pernic=True).keys()))

        known_families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
        for nic_name, entries in interfaces.items():
            self.assertIsInstance(nic_name, (str, unicode))
            self.assertEqual(len(set(entries)), len(entries))
            for entry in entries:
                self.assertIsInstance(entry.family, int)
                self.assertIsInstance(entry.address, str)
                self.assertIsInstance(entry.netmask, (str, type(None)))
                self.assertIsInstance(entry.broadcast, (str, type(None)))
                self.assertIn(entry.family, known_families)
                if sys.version_info >= (3, 4):
                    self.assertIsInstance(entry.family, enum.IntEnum)
                if link_stats[nic_name].isup:
                    # Binding is only attempted for interfaces that are up.
                    if entry.family == socket.AF_INET:
                        sock = socket.socket(entry.family)
                        with contextlib.closing(sock):
                            sock.bind((entry.address, 0))
                    elif entry.family == socket.AF_INET6:
                        resolved = socket.getaddrinfo(
                            entry.address, 0, socket.AF_INET6,
                            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                        family, kind, proto, _canonname, sockaddr = resolved
                        sock = socket.socket(family, kind, proto)
                        with contextlib.closing(sock):
                            sock.bind(sockaddr)
                candidates = (entry.address, entry.netmask, entry.broadcast,
                              entry.ptp)
                for ip in candidates:
                    # TODO: skip AF_INET6 for now because I get:
                    # AddressValueError: Only hex digits permitted in
                    # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                    if ip is not None and entry.family != AF_INET6:
                        check_net_address(ip, entry.family)
                # broadcast and ptp addresses are mutually exclusive
                if entry.broadcast:
                    self.assertIsNone(entry.ptp)
                elif entry.ptp:
                    self.assertIsNone(entry.broadcast)

        if BSD or OSX or SUNOS:
            if hasattr(socket, "AF_LINK"):
                self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
        elif LINUX:
            self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
        elif WINDOWS:
            self.assertEqual(psutil.AF_LINK, -1)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_if_addrs(self):
        """Validate the entries returned by ``psutil.net_if_addrs()``.

        Checks the types of interface names and address fields, that each
        interface has no duplicate entries, that families are one of
        AF_INET / AF_INET6 / psutil.AF_LINK, that IPv4 and IPv6 addresses of
        interfaces that are up can be bound, that non-IPv6 address strings
        pass ``check_net_address``, and that broadcast and ptp are never
        both set. Finally ``psutil.AF_LINK`` is compared with the
        platform's expected constant.
        """
        interfaces = psutil.net_if_addrs()
        assert interfaces, interfaces

        link_stats = psutil.net_if_stats()

        # Not reliable on all platforms (net_if_addrs() reports more
        # interfaces).
        # self.assertEqual(sorted(interfaces.keys()),
        #                  sorted(psutil.net_io_counters(pernic=True).keys()))

        known_families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
        for nic_name, entries in interfaces.items():
            self.assertIsInstance(nic_name, (str, unicode))
            self.assertEqual(len(set(entries)), len(entries))
            for entry in entries:
                self.assertIsInstance(entry.family, int)
                self.assertIsInstance(entry.address, str)
                self.assertIsInstance(entry.netmask, (str, type(None)))
                self.assertIsInstance(entry.broadcast, (str, type(None)))
                self.assertIn(entry.family, known_families)
                if sys.version_info >= (3, 4):
                    self.assertIsInstance(entry.family, enum.IntEnum)
                if link_stats[nic_name].isup:
                    # Binding is only attempted for interfaces that are up.
                    if entry.family == socket.AF_INET:
                        sock = socket.socket(entry.family)
                        with contextlib.closing(sock):
                            sock.bind((entry.address, 0))
                    elif entry.family == socket.AF_INET6:
                        resolved = socket.getaddrinfo(
                            entry.address, 0, socket.AF_INET6,
                            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                        family, kind, proto, _canonname, sockaddr = resolved
                        sock = socket.socket(family, kind, proto)
                        with contextlib.closing(sock):
                            sock.bind(sockaddr)
                candidates = (entry.address, entry.netmask, entry.broadcast,
                              entry.ptp)
                for ip in candidates:
                    # TODO: skip AF_INET6 for now because I get:
                    # AddressValueError: Only hex digits permitted in
                    # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                    if ip is not None and entry.family != AF_INET6:
                        check_net_address(ip, entry.family)
                # broadcast and ptp addresses are mutually exclusive
                if entry.broadcast:
                    self.assertIsNone(entry.ptp)
                elif entry.ptp:
                    self.assertIsNone(entry.broadcast)

        if BSD or OSX or SUNOS:
            if hasattr(socket, "AF_LINK"):
                self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
        elif LINUX:
            self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
        elif WINDOWS:
            self.assertEqual(psutil.AF_LINK, -1)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_if_addrs(self):
        """Validate the entries returned by ``psutil.net_if_addrs()``.

        Checks the types of the address fields, that each interface has no
        duplicate entries, that families are one of AF_INET / AF_INET6 /
        psutil.AF_LINK, that IPv4 and IPv6 addresses of interfaces that are
        up can be bound, that non-IPv6 address strings pass
        ``check_net_address``, and that broadcast and ptp are never both
        set. Finally ``psutil.AF_LINK`` is compared with the platform's
        expected constant.
        """
        interfaces = psutil.net_if_addrs()
        assert interfaces, interfaces

        link_stats = psutil.net_if_stats()

        # Not reliable on all platforms (net_if_addrs() reports more
        # interfaces).
        # self.assertEqual(sorted(interfaces.keys()),
        #                  sorted(psutil.net_io_counters(pernic=True).keys()))

        known_families = {socket.AF_INET, AF_INET6, psutil.AF_LINK}
        for nic_name, entries in interfaces.items():
            self.assertEqual(len(set(entries)), len(entries))
            for entry in entries:
                self.assertIsInstance(entry.family, int)
                self.assertIsInstance(entry.address, str)
                self.assertIsInstance(entry.netmask, (str, type(None)))
                self.assertIsInstance(entry.broadcast, (str, type(None)))
                self.assertIn(entry.family, known_families)
                if sys.version_info >= (3, 4):
                    self.assertIsInstance(entry.family, enum.IntEnum)
                if link_stats[nic_name].isup:
                    # Binding is only attempted for interfaces that are up.
                    if entry.family == socket.AF_INET:
                        sock = socket.socket(entry.family)
                        with contextlib.closing(sock):
                            sock.bind((entry.address, 0))
                    elif entry.family == socket.AF_INET6:
                        resolved = socket.getaddrinfo(
                            entry.address, 0, socket.AF_INET6,
                            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                        family, kind, proto, _canonname, sockaddr = resolved
                        sock = socket.socket(family, kind, proto)
                        with contextlib.closing(sock):
                            sock.bind(sockaddr)
                candidates = (entry.address, entry.netmask, entry.broadcast,
                              entry.ptp)
                for ip in candidates:
                    # TODO: skip AF_INET6 for now because I get:
                    # AddressValueError: Only hex digits permitted in
                    # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                    if ip is not None and entry.family != AF_INET6:
                        check_net_address(ip, entry.family)
                # broadcast and ptp addresses are mutually exclusive
                if entry.broadcast:
                    self.assertIsNone(entry.ptp)
                elif entry.ptp:
                    self.assertIsNone(entry.broadcast)

        if BSD or OSX or SUNOS:
            if hasattr(socket, "AF_LINK"):
                self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
        elif LINUX:
            self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
        elif WINDOWS:
            self.assertEqual(psutil.AF_LINK, -1)