Python psutil 模块,net_io_counters() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psutil.net_io_counters()

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that the listed psutil calls raise once PROCFS_PATH points at an empty directory."""
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs expected to raise IOError.
            # psutil.pids is intentionally not part of the list.
            io_failures = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in io_failures:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Undo the redirection and remove the scratch directory.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:RTask    作者:HatBoy    | 项目源码 | 文件源码
def get_network():
    """Summarise addresses and traffic counters for each network interface.

    Interfaces listed by ``psutil.net_if_addrs()`` that have no entry in
    ``psutil.net_io_counters(pernic=True)`` (or no address at all) are
    skipped instead of raising ``KeyError``/``IndexError``.

    Returns:
        list: one dict per interface with the keys ``ip`` (first address of
        the interface), ``iface``, ``sent``/``recv`` (formatted as
        ``'<n.nn>MB'``) and the raw ``packets_sent``, ``packets_recv``,
        ``errin``, ``errout``, ``dropin`` and ``dropout`` counters.
    """
    counters_by_nic = psutil.net_io_counters(pernic=True)
    # Float divisor so the MB figures are not truncated by integer division
    # on Python 2; the result is unchanged on Python 3.
    bytes_per_mb = 1024.0 * 1024.0
    networks = []
    for iface, addrs in psutil.net_if_addrs().items():
        data = counters_by_nic.get(iface)
        if data is None or not addrs:
            continue
        networks.append({
            'ip': addrs[0].address,
            'iface': iface,
            'sent': '%.2fMB' % (data.bytes_sent / bytes_per_mb),
            'recv': '%.2fMB' % (data.bytes_recv / bytes_per_mb),
            'packets_sent': data.packets_sent,
            'packets_recv': data.packets_recv,
            'errin': data.errin,
            'errout': data.errout,
            'dropin': data.dropin,
            'dropout': data.dropout,
        })
    return networks
项目:docker-box    作者:MicroPyramid    | 项目源码 | 文件源码
def stream_host_stats():
    """Endlessly yield host CPU, memory, network and disk statistics.

    Every iteration samples the per-NIC counters twice, one second apart,
    and reports the byte difference divided by 1000 for each interface that
    is present in both samples.

    Yields:
        str: a ``[{...}],`` fragment holding the CPU percentage, used memory,
        free memory, per-NIC download/upload figures and disk usage.
    """
    while True:
        before = psutil.net_io_counters(pernic=True)
        time.sleep(1)
        after = psutil.net_io_counters(pernic=True)
        net_stat_download = {}
        net_stat_upload = {}
        for nic, old in before.items():
            # Direct lookup instead of scanning the second sample for a
            # matching name on every interface.
            new = after.get(nic)
            if new is None:
                continue
            net_stat_download[nic] = (new.bytes_recv - old.bytes_recv) / 1000.
            net_stat_upload[nic] = (new.bytes_sent - old.bytes_sent) / 1000.
        ds = statvfs('/')
        disk_str = {"Used": ((ds.f_blocks - ds.f_bfree) * ds.f_frsize) / 10 ** 9, "Unused": (ds.f_bavail * ds.f_frsize) / 10 ** 9}
        cpu = psutil.cpu_percent(interval=1)
        # One snapshot so "used" and "free" describe the same instant.
        mem = psutil.virtual_memory()
        # NOTE(review): the "memTotal" field carries the *free* memory value;
        # kept as-is because consumers may rely on it -- confirm.
        yield '[{"cpu":"%s","memory":"%s","memTotal":"%s","net_stats_down":"%s","net_stats_up":"%s","disk":"%s"}],' \
              % (cpu, mem.used, mem.free, net_stat_download, net_stat_upload, disk_str)
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def get_network(self):
        """Return the number of bytes sent system-wide since the previous call.

        While ``self._previous_network`` is still 0 no baseline exists yet:
        one is recorded, the method sleeps two seconds and samples again so
        the very first call already returns a delta.

        The value is a raw byte difference; it is not divided by the elapsed
        time. ``self._previous_network`` and ``self._last_check`` are updated
        on every call.
        """
        current_network = psutil.net_io_counters().bytes_sent

        if self._previous_network == 0:
            # Record a baseline, wait two seconds, then sample again.
            self._previous_network = current_network
            time.sleep(2)
            current_network = psutil.net_io_counters().bytes_sent

        usage = current_network - self._previous_network

        self._previous_network = current_network
        self._last_check = time.time()

        return usage
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_serialization(self):
        """Round-trip the results of several psutil calls through json and pickle."""
        def roundtrip(value):
            # The json step is skipped when the module-level ``json`` name is
            # None; the pickle round-trip is always compared for equality.
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are not checked on Linux without /proc/diskstats,
        # nor on APPVEYOR.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:hapi    作者:mayaculpa    | 项目源码 | 文件源码
def update(self):
        """Refresh the CPU, boot time, memory, network, disk and timestamp fields."""
        self.cpu["percentage"] = psutil.cpu_percent(interval=0.7)

        booted_at = datetime.datetime.fromtimestamp(psutil.boot_time())
        self.boot = booted_at.strftime("%Y-%m-%d %H:%M:%S")

        mem = psutil.virtual_memory()
        for field in ("used", "free", "cached"):
            self.memory[field] = getattr(mem, field)

        net = psutil.net_io_counters()
        self.network["packet_sent"] = net.packets_sent
        self.network["packet_recv"] = net.packets_recv

        # Disk figures are stored divided by 1024 and truncated to int.
        root_usage = psutil.disk_usage('/')
        for field in ("total", "used", "free"):
            self.disk[field] = int(getattr(root_usage, field)/1024)

        self.timestamp = time.time()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that the listed psutil calls raise once PROCFS_PATH points at an empty directory."""
        scratch_dir = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = scratch_dir
            # (callable, keyword arguments) pairs expected to raise IOError.
            # psutil.pids is intentionally not part of the list.
            expected_io_errors = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in expected_io_errors:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Undo the redirection and remove the scratch directory.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(scratch_dir)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_serialization(self):
        """Round-trip the results of several psutil calls through json and pickle."""
        def roundtrip(value):
            # The json step is skipped when the module-level ``json`` name is
            # None; the pickle round-trip is always compared for equality.
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are not checked on Linux without /proc/diskstats,
        # nor on APPVEYOR.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_nic_names(self):
        """Check that every NIC name reported by psutil appears in ``ifconfig -a``.

        A NIC counts as found when some whitespace-separated token of the
        command output starts with its name. The test is skipped when
        ``ifconfig`` exits with a non-zero status.
        """
        p = subprocess.Popen("ifconfig -a", shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if p.returncode != 0:
            raise unittest.SkipTest('ifconfig returned no output')
        if PY3:
            # sys.stdout.encoding can be None (e.g. when sys.stdout has been
            # replaced by a capture object); str(bytes, None) would raise
            # TypeError, so fall back to the filesystem encoding.
            output = str(
                output, sys.stdout.encoding or sys.getfilesystemencoding())
        tokens = output.split()
        for nic in psutil.net_io_counters(pernic=True):
            if not any(token.startswith(nic) for token in tokens):
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    # can't find users on APPVEYOR or TRAVIS
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check that the listed psutil calls raise once PROCFS_PATH points at an empty directory."""
        fake_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = fake_procfs
            # (callable, keyword arguments) pairs expected to raise IOError.
            # psutil.pids is intentionally not part of the list.
            must_raise_ioerror = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in must_raise_ioerror:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            # Undo the redirection and remove the scratch directory.
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(fake_procfs)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_serialization(self):
        """Round-trip the results of several psutil calls through json and pickle."""
        def roundtrip(value):
            # The json step is skipped when the module-level ``json`` name is
            # None; the pickle round-trip is always compared for equality.
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # Disk I/O counters are not checked on Linux without /proc/diskstats,
        # nor on APPVEYOR.
        diskstats_missing = LINUX and not os.path.exists('/proc/diskstats')
        if not diskstats_missing and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_nic_names(self):
        """Check that every NIC name reported by psutil appears in ``ifconfig -a``.

        A NIC counts as found when some whitespace-separated token of the
        command output starts with its name. The test is skipped when
        ``ifconfig`` exits with a non-zero status.
        """
        p = subprocess.Popen("ifconfig -a", shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if p.returncode != 0:
            raise unittest.SkipTest('ifconfig returned no output')
        if PY3:
            # sys.stdout.encoding can be None (e.g. when sys.stdout has been
            # replaced by a capture object); str(bytes, None) would raise
            # TypeError, so fall back to the filesystem encoding.
            output = str(
                output, sys.stdout.encoding or sys.getfilesystemencoding())
        tokens = output.split()
        for nic in psutil.net_io_counters(pernic=True):
            if not any(token.startswith(nic) for token in tokens):
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    # can't find users on APPVEYOR or TRAVIS
项目:docklet    作者:unias    | 项目源码 | 文件源码
def collect_net_stats(self):
        """Refresh ``self.net_stats`` and ``gateways_stats`` from psutil's per-NIC counters.

        Interfaces whose name starts with ``<digits>-<digits>`` are stored in
        ``self.net_stats`` together with per-second byte rates computed
        against the previously stored totals and ``self.interval``. Every
        other interface is stored in the module-level ``gateways_stats`` with
        its byte totals.

        NOTE(review): every counter is stored under the *opposite* direction
        (psutil ``bytes_sent`` -> ``bytes_recv``, ``errout`` -> ``errin`` ...).
        Presumably the counters are read on the far end of the link from the
        entity being monitored -- confirm before changing.
        """
        raw_stats = psutil.net_io_counters(pernic=True)
        for key, raw in raw_stats.items():
            # Raw string: '\d' in a plain literal is an invalid escape
            # sequence (a warning on recent Python versions).
            if re.match(r'[\d]+-[\d]+', key) is not None:
                if key not in self.net_stats:
                    self.net_stats[key] = {'bytes_sent': 0, 'bytes_recv': 0}
                stats = self.net_stats[key]
                # The rates must be computed before the stored totals are
                # overwritten below.
                stats['bytes_recv_per_sec'] = round((int(raw.bytes_sent) - stats['bytes_recv']) / self.interval)
                stats['bytes_sent_per_sec'] = round((int(raw.bytes_recv) - stats['bytes_sent']) / self.interval)
                stats['bytes_recv'] = int(raw.bytes_sent)
                stats['bytes_sent'] = int(raw.bytes_recv)
                stats['packets_recv'] = int(raw.packets_sent)
                stats['packets_sent'] = int(raw.packets_recv)
                stats['errin'] = int(raw.errout)
                stats['errout'] = int(raw.errin)
                stats['dropin'] = int(raw.dropout)
                stats['dropout'] = int(raw.dropin)
            else:
                if key not in gateways_stats:
                    gateways_stats[key] = {}
                gateway = gateways_stats[key]
                gateway['bytes_recv'] = int(raw.bytes_sent)
                gateway['bytes_sent'] = int(raw.bytes_recv)
                gateway['bytes_total'] = gateway['bytes_recv'] + gateway['bytes_sent']
        #logger.info(self.net_stats)

    # the main function to collect monitoring data of a container
项目:icinga2checks    作者:c-store    | 项目源码 | 文件源码
def check_network(i_warning, i_critical):
    """Build a check result line from the per-NIC error and drop counters.

    The largest value among counters whose name contains ``err`` or ``drop``
    is passed to ``check_status`` together with the two thresholds. Every
    counter of every interface is added to the perfdata, which is appended
    after `` | ``. When the status text does not contain ``OK`` a description
    of the largest counter is appended to it.
    """
    test_int(i_warning, i_critical)

    s_perfdata = ''
    s_maxdesc = ''
    i_max = 0

    for s_device, nt_counters in psutil.net_io_counters(pernic=True).items():
        for key, value in nt_counters._asdict().items():
            is_fault_counter = 'err' in key or 'drop' in key
            if is_fault_counter and value > i_max:
                i_max = value
                s_maxdesc = '{} has {} {} packets.'.format(s_device, value, key)
            # Every counter ends up in the perfdata, not only the fault ones.
            s_perfdata = add_perfdata(s_perfdata, s_device, key, value)

    s_output = check_status(i_warning, i_critical, i_max)
    if 'OK' not in s_output:
        s_output += s_maxdesc

    return s_output + ' | {}'.format(s_perfdata)
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def _get_bytes(interface):
        """Return ``(bytes_recv, bytes_sent)`` for *interface*, or None when it has no counters."""
        try:
            per_nic = psutil.net_io_counters(pernic=True)
        except AttributeError:
            # Fall back to the alternative ``network_io_counters`` name.
            per_nic = psutil.network_io_counters(pernic=True)
        nic_stats = per_nic.get(interface)
        return (nic_stats.bytes_recv, nic_stats.bytes_sent) if nic_stats else None
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def _get_interfaces():
        """Yield ``(interface, bytes_recv, bytes_sent)`` for every interface with counters."""
        try:
            per_nic = psutil.net_io_counters(pernic=True)
        except AttributeError:
            # Fall back to the alternative ``network_io_counters`` name.
            per_nic = psutil.network_io_counters(pernic=True)
        for name in per_nic:
            nic_stats = per_nic[name]
            if not nic_stats:
                continue
            yield name, nic_stats.bytes_recv, nic_stats.bytes_sent
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def get_networks(self):
        """Return psutil's I/O counters keyed by network interface name."""
        per_nic_counters = psutil.net_io_counters(pernic=True)
        return per_nic_counters
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def get_data(self):
        """Collect CPU, memory, disk and network-delta utilisation figures.

        Returns a dict with ``cpu_percent``, ``memory_percent``,
        ``disk_percent``, ``bytes_sent_delta`` and ``bytes_received_delta``.
        The byte totals seen by this call are stored on the instance for the
        next one.
        """
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        disk_percent = psutil.disk_usage('/').percent
        network_io = psutil.net_io_counters()
        sent_now = network_io.bytes_sent
        received_now = network_io.bytes_recv

        # Deltas are system-wide, not necessarily GPRS-related.
        # TODO(matt): query on a specific interface..which one, I'm not sure.
        # A stored total of 0 is treated as "no previous sample": report 0.
        bytes_sent_delta = (
            sent_now - self.last_bytes_sent
            if self.last_bytes_sent != 0 else 0)
        self.last_bytes_sent = sent_now

        bytes_received_delta = (
            received_now - self.last_bytes_received
            if self.last_bytes_received != 0 else 0)
        self.last_bytes_received = received_now

        return dict(
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            disk_percent=disk_percent,
            bytes_sent_delta=bytes_sent_delta,
            bytes_received_delta=bytes_received_delta,
        )
项目:simon    作者:half0wl    | 项目源码 | 文件源码
def network_recv():
    """Return the system-wide received byte count formatted by ``bytes2human``."""
    total_received = psutil.net_io_counters().bytes_recv
    return bytes2human(total_received)
项目:simon    作者:half0wl    | 项目源码 | 文件源码
def network_sent():
    """Return the system-wide sent byte count formatted by ``bytes2human``."""
    total_sent = psutil.net_io_counters().bytes_sent
    return bytes2human(total_sent)
项目:bumblebee-status    作者:tobi-wan-kenobi    | 项目源码 | 文件源码
def _update_widgets(self, widgets):
        """Rebuild *widgets* with rx/tx traffic widgets for the selected interfaces.

        Interfaces whose name starts with ``self._exclude`` are ignored, and
        so are interfaces whose up/down state is filtered out by
        ``self._states``. Each traffic widget shows the difference between
        the current byte counter and the value remembered in ``self._prev``.
        """
        interfaces = [i for i in netifaces.interfaces() if not i.startswith(self._exclude)]

        # Empty the caller's list in place.
        del widgets[:]

        counters = psutil.net_io_counters(pernic=True)
        for interface in interfaces:
            if not interface:
                interface = "lo"

            # An interface with at least one address is considered "up".
            state = "up" if len(self.get_addresses(interface)) > 0 else "down"

            excluded_states = self._states["exclude"]
            if len(excluded_states) > 0 and state in excluded_states:
                continue
            included_states = self._states["include"]
            if len(included_states) > 0 and state not in included_states:
                continue

            nic_counters = counters[interface]
            data = {"rx": nic_counters.bytes_recv, "tx": nic_counters.bytes_sent}

            if self._showname:
                self.create_widget(widgets, "traffic-{}".format(interface), interface)

            for direction in ("rx", "tx"):
                name = "traffic.{}-{}".format(direction, interface)
                widget = self.create_widget(widgets, name, attributes={"theme.minwidth": "1000.00MB"})
                current = data[direction]
                previous = self._prev.get(name, 0)
                widget.full_text(bumblebee.util.bytefmt(int(current) - int(previous)))
                self._prev[name] = current

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
项目:psystem    作者:gokhanm    | 项目源码 | 文件源码
def network(self, all_interface=False):
        """
            Report network I/O statistics.

            all_interface: if true, return one entry per interface keyed by
                           its name; otherwise return the system-wide totals
            Return: dict

            Byte and packet counts are passed through self.hr(); error and
            drop counters are returned as reported by psutil.
        """
        if not all_interface:
            total = psutil.net_io_counters()
            return {
                "bytes_sent": self.hr(total.bytes_sent),
                "bytes_recv": self.hr(total.bytes_recv),
                "packets_sent": self.hr(total.packets_sent),
                "packets_recv": self.hr(total.packets_recv),
                "errin": total.errin,
                "errorout": total.errout,
                "dropin": total.dropin,
                "dropout": total.dropout,
            }

        # NOTE(review): the per-interface entries use the key "bytes_send"
        # while the totals above use "bytes_sent"; kept because callers may
        # depend on either spelling.
        return {
            nic_name: {
                "bytes_send": self.hr(nic.bytes_sent),
                "bytes_recv": self.hr(nic.bytes_recv),
                "packets_sent": self.hr(nic.packets_sent),
                "packets_recv": self.hr(nic.packets_recv),
                "errin": nic.errin,
                "errorout": nic.errout,
                "dropin": nic.dropin,
                "dropout": nic.dropout,
            }
            for nic_name, nic in psutil.net_io_counters(pernic=True).items()
        }
项目:sawtooth-validator    作者:hyperledger-archives    | 项目源码 | 文件源码
def get_stats(self):
        """Refresh the CPU, memory, disk and network statistics held on the instance."""
        cpu_percent = psutil.cpu_percent(interval=0)
        cpu_times = psutil.cpu_times_percent()
        self.cpu_stats = CpuStats(
            cpu_percent, cpu_times.user, cpu_times.system, cpu_times.idle)

        self.vmem_stats = psutil.virtual_memory()
        self.disk_stats = psutil.disk_io_counters()
        self.net_stats = psutil.net_io_counters()

        # A new list is built on every refresh: the stats objects are
        # replaced rather than mutated, because named tuples are immutable.
        self.statslist = [
            self.cpu_stats,
            self.vmem_stats,
            self.disk_stats,
            self.net_stats,
        ]
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def test_start(self):
        """Mark the run as started and record the starting resource snapshots as JSON strings."""
        process = psutil.Process()
        self.run_data['start'] = True
        self.run_data['test_status'] = 'running'

        start_stats = {}
        start_stats['net:start'] = json.dumps(psutil.net_io_counters())
        start_stats['cpu:start'] = json.dumps(process.cpu_times())
        start_stats['mem:start'] = json.dumps(process.memory_info())
        start_stats['time:start'] = json.dumps(time.time())
        self.run_data['stats'] = start_stats

        return ('ok', None)
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def get_stats(self):
        """Record the end-of-run snapshots and message rate, then return the stats.

        Returns:
            tuple: ``('ok', stats)`` where ``stats`` is
            ``self.run_data['stats']`` updated with the message count, the
            JSON-encoded end snapshots, the reconnect count and ``rate``.
            ``rate`` is 0 when the first and last message times are equal
            instead of raising ``ZeroDivisionError``.
        """
        process = psutil.Process()
        stats = self.run_data['stats']
        stats['msg_cnt'] = self.msg_cnt
        stats['net:end'] = json.dumps(psutil.net_io_counters())
        stats['cpu:end'] = json.dumps(process.cpu_times())
        stats['mem:end'] = json.dumps(process.memory_info())
        stats['reconnect_cnt'] = self.reconnect_cnt
        duration = (
            self.run_data['last_msg_time_r'] - self.run_data['first_msg_time_r'])
        # Guard against a zero-length window.
        stats['rate'] = stats['msg_cnt'] / duration if duration != 0 else 0
        return ('ok', stats)
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def reset_stats(self):
        """Start a fresh ``run_data`` with zeroed counters and new start snapshots."""
        l.info("RESETTING SUB STATS")
        process = psutil.Process()

        stats = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0}
        self.run_data = {'stats': stats}
        stats['net:start'] = json.dumps(psutil.net_io_counters())
        stats['cpu:start'] = json.dumps(process.cpu_times())
        stats['mem:start'] = json.dumps(process.memory_info())

        self.run_data['first_msg_time_r'] = 0
        self.run_data['last_msg_time_r'] = 1
        self.msg_cnt = 0
        self.reconnect_cnt = 0
        return ('ok', 'stats reset')
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def get_stats(self):
        """Store the end snapshots and the message rate, then return ``('ok', run_data)``."""
        process = psutil.Process()
        stats = self.run_data['stats']
        stats['net']['end'] = psutil.net_io_counters()
        stats['cpu']['end'] = process.cpu_times()
        stats['mem']['end'] = process.memory_info()

        duration = self.run_data['last_msg_time'] - self.run_data['first_msg_time']
        # A zero-length window yields a rate of 0 rather than a division.
        self.run_data['rate'] = (
            0 if duration == 0 else self.run_data['msg_cnt'] / duration)

        return ('ok', self.run_data)
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def reset_stats(self):
        """Start a fresh ``run_data`` with zeroed counters and new start snapshots."""
        l.info("RESETTING SUB STATS")
        process = psutil.Process()

        stats = {}
        self.run_data = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0, 'stats': stats}
        stats['net'] = {'start': psutil.net_io_counters()}
        stats['cpu'] = {'start': process.cpu_times()}
        stats['mem'] = {'start': process.memory_info()}

        self.msg_cnt = 0
        return ('ok', 'stats reset')
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def test_start(self):
        """Mark the run as started and record the starting resource snapshots as JSON strings."""
        process = psutil.Process()
        self.run_data['start'] = True
        self.run_data['test_status'] = 'running'

        start_stats = {}
        start_stats['net:start'] = json.dumps(psutil.net_io_counters())
        start_stats['cpu:start'] = json.dumps(process.cpu_times())
        start_stats['mem:start'] = json.dumps(process.memory_info())
        start_stats['time:start'] = json.dumps(time.time())
        self.run_data['stats'] = start_stats

        return ('ok', None)
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def get_stats(self):
        """Record the end-of-run snapshots and message rate, then return the stats.

        Returns:
            tuple: ``('ok', stats)`` where ``stats`` is
            ``self.run_data['stats']`` updated with the message count, the
            JSON-encoded end snapshots and ``rate``. ``rate`` is 0 when the
            first and last message times are equal instead of raising
            ``ZeroDivisionError``.
        """
        process = psutil.Process()
        stats = self.run_data['stats']
        stats['msg_cnt'] = self.msg_cnt
        stats['net:end'] = json.dumps(psutil.net_io_counters())
        stats['cpu:end'] = json.dumps(process.cpu_times())
        stats['mem:end'] = json.dumps(process.memory_info())
        duration = (
            self.run_data['last_msg_time_r'] - self.run_data['first_msg_time_r'])
        # Guard against a zero-length window.
        stats['rate'] = stats['msg_cnt'] / duration if duration != 0 else 0

        return ('ok', stats)
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def reset_stats(self):
        """Start a fresh ``run_data`` with zeroed counters and new start snapshots."""
        l.info("RESETTING SUB STATS")
        process = psutil.Process()

        stats = {}
        self.run_data = {'msg_cnt': 0, 'first_msg_time': 0, 'last_msg_time': 0, 'stats': stats}
        stats['net'] = {'start': psutil.net_io_counters()}
        stats['cpu'] = {'start': process.cpu_times()}
        stats['mem'] = {'start': process.memory_info()}

        self.run_data['first_msg_time_r'] = 0
        self.run_data['last_msg_time_r'] = 1
        self.msg_cnt = 0
        return ('ok', 'stats reset')
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_io_counters(self):
        """Pass ``psutil.net_io_counters`` to ``self.execute``."""
        target = psutil.net_io_counters
        self.execute(target)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_io_counters(self):
        """Compare psutil's per-NIC counters with the figures printed by ``ifconfig``.

        Interfaces for which the ``ifconfig`` invocation raises
        ``RuntimeError`` are skipped. Each counter only has to match within a
        tolerance.
        """
        def ifconfig(nic):
            """Parse the counters of *nic* out of ``ifconfig <nic>`` output."""
            # Use the parameter; the original read the enclosing loop
            # variable and only worked by coincidence.
            out = sh("ifconfig %s" % nic)
            # The first 'errors:'/'dropped:' match is used as the incoming
            # figure and the second as the outgoing one.
            errors = re.findall(r'errors:(\d+)', out)
            dropped = re.findall(r'dropped:(\d+)', out)
            return {
                'packets_recv': int(re.findall(r'RX packets:(\d+)', out)[0]),
                'packets_sent': int(re.findall(r'TX packets:(\d+)', out)[0]),
                'errin': int(errors[0]),
                'errout': int(errors[1]),
                'dropin': int(dropped[0]),
                'dropout': int(dropped[1]),
                'bytes_recv': int(re.findall(r'RX bytes:(\d+)', out)[0]),
                'bytes_sent': int(re.findall(r'TX bytes:(\d+)', out)[0]),
            }

        # (field, allowed absolute difference) in the order they are checked.
        tolerances = (
            ('bytes_recv', 1024 * 5),
            ('bytes_sent', 1024 * 5),
            ('packets_recv', 1024),
            ('packets_sent', 1024),
            ('errin', 10),
            ('errout', 10),
            ('dropin', 10),
            ('dropout', 10),
        )

        for name, stats in psutil.net_io_counters(pernic=True).items():
            try:
                ifconfig_ret = ifconfig(name)
            except RuntimeError:
                continue
            for field, delta in tolerances:
                self.assertAlmostEqual(
                    getattr(stats, field), ifconfig_ret[field], delta=delta)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_nic_names(self):
        """Check that every NIC name reported by psutil appears in ``ipconfig /all``.

        Names containing "pseudo-interface" (after replacing spaces with
        dashes and lower-casing) are not checked.
        """
        proc = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
        out = proc.communicate()[0]
        if PY3:
            encoding = sys.stdout.encoding or sys.getfilesystemencoding()
            out = str(out, encoding)
        for nic in psutil.net_io_counters(pernic=True).keys():
            is_pseudo = "pseudo-interface" in nic.replace(' ', '-').lower()
            if is_pseudo or nic in out:
                continue
            self.fail(
                "%r nic wasn't found in 'ipconfig /all' output" % nic)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_nic_names(self):
        """Check that every NIC name reported by psutil appears in ``ifconfig -a``.

        A NIC counts as found when some whitespace-separated token of the
        command output starts with its name. The test is skipped when
        ``ifconfig`` exits with a non-zero status.
        """
        p = subprocess.Popen("ifconfig -a", shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if p.returncode != 0:
            raise unittest.SkipTest('ifconfig returned no output')
        if PY3:
            # sys.stdout.encoding can be None (e.g. when sys.stdout has been
            # replaced by a capture object); str(bytes, None) would raise
            # TypeError, so fall back to the filesystem encoding.
            output = str(
                output, sys.stdout.encoding or sys.getfilesystemencoding())
        tokens = output.split()
        for nic in psutil.net_io_counters(pernic=True):
            if not any(token.startswith(nic) for token in tokens):
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))
项目:JimV-N    作者:jamesiter    | 项目源码 | 文件源码
def host_traffic_performance_report(self):
        """Emit per-interface traffic figures computed against the previous sample.

        For every interface in ``self.interfaces`` that psutil reports and
        that already has a stored sample, byte and packet counts are reported
        as the difference to that sample divided by ``self.interval``; error
        and drop counts are reported as plain differences. The current sample
        is then stored for the next call. Nothing is emitted when no
        interface produced a record (e.g. on the first call).
        """
        # Make sure the sample store is usable before it is queried. The
        # original guard inspected ``last_host_disk_io`` (an apparent
        # copy-paste slip) and could wipe the samples of other interfaces.
        if not isinstance(self.last_host_traffic, dict):
            self.last_host_traffic = dict()

        data = list()
        net_io = psutil.net_io_counters(pernic=True)

        for nic_name in self.interfaces.keys():
            nic = net_io.get(nic_name, None)
            if nic is None:
                continue

            last = self.last_host_traffic.get(nic_name)
            if last is not None:
                data.append({
                    'node_id': self.node_id,
                    'name': nic_name,
                    'rx_bytes': (nic.bytes_recv - last.bytes_recv) / self.interval,
                    'rx_packets': (nic.packets_recv - last.packets_recv) / self.interval,
                    'rx_errs': (nic.errin - last.errin),
                    'rx_drop': (nic.dropin - last.dropin),
                    'tx_bytes': (nic.bytes_sent - last.bytes_sent) / self.interval,
                    'tx_packets': (nic.packets_sent - last.packets_sent) / self.interval,
                    'tx_errs': (nic.errout - last.errout),
                    'tx_drop': (nic.dropout - last.dropout)
                })

            self.last_host_traffic[nic_name] = nic

        if data:
            host_collection_performance_emit.traffic(data=data)
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def interface_size():
                """Return how many interfaces psutil reports I/O counters for."""
                per_nic = psutil.net_io_counters(pernic=True)
                return len(per_nic)
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def io_counter_detail():
                """Return psutil's I/O counters keyed by interface name."""
                per_nic = psutil.net_io_counters(pernic=True)
                return per_nic
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def interfaces():
                """Return the names of the interfaces psutil reports I/O counters for, as strings."""
                return [str(iface) for iface in psutil.net_io_counters(pernic=True)]
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def mac_addresses():
                """Return the lower-cased MAC addresses of the interfaces that have I/O counters.

                The MAC of an interface is located through its address family
                (``psutil.AF_LINK``) rather than through a fixed position in
                the address list, so interfaces with fewer or more addresses
                than expected are handled as well. Addresses that do not look
                like a MAC, and the all-zero MAC, are left out.
                """
                mac_pattern = re.compile(
                    "[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$")
                # One snapshot for all interfaces instead of one per iteration.
                addrs_by_nic = psutil.net_if_addrs()
                arr = []
                for iface in psutil.net_io_counters(pernic=True):
                    # Interfaces without any address entry contribute nothing.
                    for addr in addrs_by_nic.get(str(iface), ()):
                        if addr.family != psutil.AF_LINK:
                            continue
                        mac = (addr.address or '').lower()
                        if mac_pattern.match(mac) and mac != '00:00:00:00:00:00':
                            arr.append(mac)

                return arr
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def ip_addresses():
            """Return the IPv4 addresses of the interfaces that have I/O counters.

            Only the first address entry of each interface is considered. It
            is kept when it is a dotted-quad IPv4 address other than the
            loopback address. Interfaces without any address entry are
            skipped instead of raising ``KeyError``/``IndexError``.
            """
            ipv4_pattern = re.compile(
                r'^((\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])\.){3}(\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])$')
            # One snapshot for all interfaces instead of one per iteration.
            addrs_by_nic = psutil.net_if_addrs()
            arr = []
            for iface in psutil.net_io_counters(pernic=True):
                nic_addrs = addrs_by_nic.get(str(iface))
                if not nic_addrs:
                    continue
                # Index 1 of an address entry is the address itself.
                ip = nic_addrs[0][1]
                if ipv4_pattern.match(ip) and str(ip) not in ('localhost', '127.0.0.1'):
                    arr.append(ip)
            return arr
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def find_default_interface(self):
        """Look through the list of interfaces for the non-loopback interface

        Runs only while ``self.interfaces`` is still None. The result is
        stored in ``self.interfaces`` as a dict keyed by interface name and
        is narrowed down in up to three passes; passes two and three only run
        while more than one candidate is left. Nothing is returned.

        Any exception is swallowed, in which case ``self.interfaces`` keeps
        whatever state had been reached at that point.
        """
        import psutil
        try:
            if self.interfaces is None:
                self.interfaces = {}
                # Look to see which interfaces are up
                # ('lo' and names starting with 'ifb' are never candidates)
                stats = psutil.net_if_stats()
                for interface in stats:
                    if interface != 'lo' and interface[:3] != 'ifb' and stats[interface].isup:
                        self.interfaces[interface] = {'packets': 0}
                if len(self.interfaces) > 1:
                    # See which interfaces have received data
                    # (the stored figure is packets sent plus packets received)
                    cnt = psutil.net_io_counters(True)
                    for interface in cnt:
                        if interface in self.interfaces:
                            self.interfaces[interface]['packets'] = \
                                cnt[interface].packets_sent + cnt[interface].packets_recv
                    # Collect first, delete afterwards: the dict must not
                    # change size while it is being iterated.
                    remove = []
                    for interface in self.interfaces:
                        if self.interfaces[interface]['packets'] == 0:
                            remove.append(interface)
                    if len(remove):
                        for interface in remove:
                            del self.interfaces[interface]
                if len(self.interfaces) > 1:
                    # Eliminate any with the loopback address
                    remove = []
                    addresses = psutil.net_if_addrs()
                    for interface in addresses:
                        if interface in self.interfaces:
                            for address in addresses[interface]:
                                if address.address == '127.0.0.1':
                                    remove.append(interface)
                                    # one loopback address is enough
                                    break
                    if len(remove):
                        for interface in remove:
                            del self.interfaces[interface]
        except Exception:
            # Best effort: detection failures are deliberately ignored.
            pass
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def get_net_bytes(self):
        """Sum the received bytes over the relevant interfaces.

        When ``self.interfaces`` is set only the interfaces it contains are
        counted; otherwise every interface except 'lo' and those whose name
        starts with 'ifb' is counted.
        """
        import psutil
        total_received = 0
        for nic, counters in psutil.net_io_counters(True).items():
            if self.interfaces is not None:
                wanted = nic in self.interfaces
            else:
                wanted = nic != 'lo' and nic[:3] != 'ifb'
            if wanted:
                total_received += counters.bytes_recv
        return total_received
项目:nixstatsagent    作者:NIXStats    | 项目源码 | 文件源码
def run(self, *unused):
        """Return psutil's per-interface I/O counters; positional arguments are ignored."""
        counters_by_nic = psutil.net_io_counters(pernic=True)
        return counters_by_nic
项目:rabbit_and_flask_basic    作者:spekulant    | 项目源码 | 文件源码
def net_util_value():
    """Return the bytes sent plus received system-wide during a five second window.

    Each end of the window uses a single counter snapshot, so the sent and
    received figures of a sample describe the same instant.
    """
    before = psutil.net_io_counters(pernic=False)
    first_time_stamp = before.bytes_sent + before.bytes_recv
    time.sleep(5)
    after = psutil.net_io_counters(pernic=False)
    second_time_stamp = after.bytes_sent + after.bytes_recv
    return second_time_stamp - first_time_stamp
项目:raspberry-pi-2    作者:LuisDiazUgena    | 项目源码 | 文件源码
def network(iface):
    """Return a ``'<iface>: Tx<sent>, Rx<received>'`` summary for one interface."""
    nic = psutil.net_io_counters(pernic=True)[iface]
    sent = bytes2human(nic.bytes_sent)
    received = bytes2human(nic.bytes_recv)
    return "%s: Tx%s, Rx%s" % (iface, sent, received)
项目:lainonlife    作者:barrucadu    | 项目源码 | 文件源码
def network_metrics():
    """Return the per-interface byte counters reported by psutil.

    The result maps ``<iface>_up`` to field 0 and ``<iface>_down`` to field 1
    of each interface's counter tuple (bytes sent and bytes received in
    psutil's field order).
    """

    metrics = {}
    for iface, ifinfo in psutil.net_io_counters(pernic=True).items():
        for way, position in (("up", 0), ("down", 1)):
            metrics["{}_{}".format(iface, way)] = ifinfo[position]
    return metrics
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def network():
    """Report per-interface upload/download rates to statsd forever.

    The interfaces present at start-up (except 'lo') are tracked. Every
    ``GRANULARITY`` seconds one counter snapshot is taken and, for each
    tracked interface, the byte difference to the previous snapshot divided
    by the elapsed seconds and by 1000 is sent as the gauges
    ``<iface>.upload.kbps`` and ``<iface>.download.kbps``.

    The elapsed time is measured once per snapshot. Resetting a shared timer
    inside the per-interface loop would make every interface after the first
    divide by a near-zero interval.
    """
    c = statsd.StatsClient(STATSD_HOST, 8125, prefix=PREFIX + 'system.network')
    counters = psutil.net_io_counters(pernic=True)
    last_time = time.time()

    # Baseline (bytes_sent, bytes_recv) for every tracked interface.
    last_totals = {
        nic: (stats.bytes_sent, stats.bytes_recv)
        for nic, stats in counters.items()
        if nic != 'lo'
    }

    while True:
        time.sleep(GRANULARITY)

        counters = psutil.net_io_counters(pernic=True)
        now = time.time()
        elapsed = now - last_time
        last_time = now
        if elapsed <= 0:
            # Clock did not advance (or went backwards): no usable rate.
            continue

        for interface in list(last_totals):
            stats = counters.get(interface)
            if stats is None:
                # The interface disappeared since start-up; skip it.
                continue
            current = (stats.bytes_sent, stats.bytes_recv)

            ul, dl = [(cur - prev) / elapsed / 1000.0
                      for cur, prev in zip(current, last_totals[interface])]

            c.gauge('%s.upload.kbps' % interface, ul)
            c.gauge('%s.download.kbps' % interface, dl)
            last_totals[interface] = current
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def functionDataSensor():
    """Return the system-wide number of packets sent plus packets received."""
    counters = psutil.net_io_counters()
    return counters.packets_sent + counters.packets_recv
项目:skynet    作者:skynetera    | 项目源码 | 文件源码
def poll(interval):
    """
    Sample the network counters, sleep for *interval* seconds, sample again.

    Returns a tuple ``(tot_before, tot_after, pnic_before, pnic_after)`` with
    the system-wide and the per-interface counters of both samples.
    """
    before = (psutil.net_io_counters(), psutil.net_io_counters(pernic=True))
    # wait for the requested window
    time.sleep(interval)
    after = (psutil.net_io_counters(), psutil.net_io_counters(pernic=True))
    return (before[0], after[0], before[1], after[1])