Python psutil 模块,net_connections() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用psutil.net_connections()

项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check system-wide APIs once PROCFS_PATH points at an empty directory.

        ``psutil.PROCFS_PATH`` is redirected to a freshly created temporary
        directory.  Each listed psutil function is then expected to raise
        ``IOError``, and ``psutil.Process()`` to raise ``NoSuchProcess``.
        The default ``"/proc"`` path is restored and the directory removed
        regardless of the outcome.
        """
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs, checked in this order.
            # NOTE: psutil.pids is not asserted here.
            expected_io_errors = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in expected_io_errors:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_connections_mocked(self):
        """Run ``net_connections(kind='unix')`` against a canned /proc/net/unix.

        The builtin ``open`` is patched so that opening ``/proc/net/unix``
        yields fixed content (three socket rows followed by a line made
        only of zeros); any other path is forwarded to the real ``open``.
        The test then asserts that the patched ``open`` was called.
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Everything except the unix-socket table uses the real open().
            if name != '/proc/net/unix':
                return real_open(name, *args, **kwargs)
            return io.StringIO(textwrap.dedent(u"""\
                    0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
                    0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
                    0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
                    000000000000000000000000000000000000000000000000000000
                    """))

        # The module holding the builtin open() is named differently on
        # Python 2 and Python 3.
        if PY3:
            target = 'builtins.open'
        else:
            target = '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched_open:
            psutil.net_connections(kind='unix')
            assert patched_open.called


# =====================================================================
# system disk
# =====================================================================
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_connections_mocked(self):
        """Run ``net_connections(kind='unix')`` against a canned /proc/net/unix.

        The builtin ``open`` is patched so that opening ``/proc/net/unix``
        yields fixed content (three socket rows followed by a line made
        only of zeros); any other path is forwarded to the real ``open``.
        The test then asserts that the patched ``open`` was called.
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Everything except the unix-socket table uses the real open().
            if name != '/proc/net/unix':
                return real_open(name, *args, **kwargs)
            return io.StringIO(textwrap.dedent(u"""\
                    0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
                    0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
                    0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
                    000000000000000000000000000000000000000000000000000000
                    """))

        # The module holding the builtin open() is named differently on
        # Python 2 and Python 3.
        if PY3:
            target = 'builtins.open'
        else:
            target = '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched_open:
            psutil.net_connections(kind='unix')
            assert patched_open.called


# =====================================================================
# system disk
# =====================================================================
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check system-wide APIs once PROCFS_PATH points at an empty directory.

        ``psutil.PROCFS_PATH`` is redirected to a freshly created temporary
        directory.  Each listed psutil function is then expected to raise
        ``IOError``, and ``psutil.Process()`` to raise ``NoSuchProcess``.
        The default ``"/proc"`` path is restored and the directory removed
        regardless of the outcome.
        """
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs, checked in this order.
            # NOTE: psutil.pids is not asserted here.
            expected_io_errors = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in expected_io_errors:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_connections_mocked(self):
        """Run ``net_connections(kind='unix')`` against a canned /proc/net/unix.

        The builtin ``open`` is patched so that opening ``/proc/net/unix``
        yields fixed content (three socket rows followed by a line made
        only of zeros); any other path is forwarded to the real ``open``.
        The test then asserts that the patched ``open`` was called.
        """
        real_open = open

        def fake_open(name, *args, **kwargs):
            # Everything except the unix-socket table uses the real open().
            if name != '/proc/net/unix':
                return real_open(name, *args, **kwargs)
            return io.StringIO(textwrap.dedent(u"""\
                    0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
                    0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
                    0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
                    000000000000000000000000000000000000000000000000000000
                    """))

        # The module holding the builtin open() is named differently on
        # Python 2 and Python 3.
        if PY3:
            target = 'builtins.open'
        else:
            target = '__builtin__.open'
        with mock.patch(target, side_effect=fake_open) as patched_open:
            psutil.net_connections(kind='unix')
            assert patched_open.called


# =====================================================================
# --- system disk
# =====================================================================
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_procfs_path(self):
        """Check system-wide APIs once PROCFS_PATH points at an empty directory.

        ``psutil.PROCFS_PATH`` is redirected to a freshly created temporary
        directory.  Each listed psutil function is then expected to raise
        ``IOError``, and ``psutil.Process()`` to raise ``NoSuchProcess``.
        The default ``"/proc"`` path is restored and the directory removed
        regardless of the outcome.
        """
        empty_procfs = tempfile.mkdtemp()
        try:
            psutil.PROCFS_PATH = empty_procfs
            # (callable, keyword arguments) pairs, checked in this order.
            # NOTE: psutil.pids is not asserted here.
            expected_io_errors = (
                (psutil.virtual_memory, {}),
                (psutil.cpu_times, {}),
                (psutil.cpu_times, {'percpu': True}),
                (psutil.boot_time, {}),
                (psutil.net_connections, {}),
                (psutil.net_io_counters, {}),
                (psutil.net_if_stats, {}),
                (psutil.disk_io_counters, {}),
                (psutil.disk_partitions, {}),
            )
            for func, kwargs in expected_io_errors:
                self.assertRaises(IOError, func, **kwargs)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"
            os.rmdir(empty_procfs)
项目:pyabc    作者:neuralyzer    | 项目源码 | 文件源码
def __init__(self, host="localhost", port=6379):
        """Start a private redis-server plus two worker processes, then connect.

        The server is started one port above the highest local port that
        ``psutil.net_connections()`` currently reports.

        Args:
            host: Forwarded unchanged to the parent initializer.
            port: Used only as a fallback when psutil reports no local port
                at all; otherwise it is replaced by the computed port.
        """
        # Sockets that are not bound can report an empty laddr; skip them
        # instead of failing on the index.
        used_ports = [c.laddr[1] for c in psutil.net_connections() if c.laddr]
        if used_ports:
            port = max(used_ports) + 1
        # NOTE(review): there is no upper-bound check, so if the highest
        # port in use is 65535 the computed port is out of range.
        self.__port = port
        self.__redis_server = Popen(["redis-server", "--port", str(port)])
        # Presumably gives redis-server time to start before the workers
        # try to connect - confirm.
        sleep(1)
        self.__worker = [
            Process(target=work,
                    args=(["--host", "localhost", "--port", str(port)],),
                    daemon=False)
            for _ in range(2)
        ]
        for p in self.__worker:
            p.start()
        super().__init__(host, port)
项目:pybix    作者:lioncui    | 项目源码 | 文件源码
def get_tcp_info(self):
        """Count the current connections per connection status.

        Returns:
            ``{'tcpsum': {status: count, ...}}`` built from
            ``psutil.net_connections()``, skipping connections whose status
            is ``'NONE'``.  An empty dict is returned when nothing was
            counted or when collecting failed; a failure is logged through
            ``pybixlib.error`` and passed to ``self.errorInfoDone``.
        """
        summary = {}
        try:
            per_status = {}
            for conn in psutil.net_connections():
                state = conn.status
                if state != 'NONE':
                    per_status[state] = per_status.get(state, 0) + 1
            # The 'tcpsum' key only exists when at least one status was seen.
            if per_status:
                summary['tcpsum'] = dict(per_status)
        except Exception:
            pybixlib.error(self.logHead + traceback.format_exc())
            self.errorInfoDone(traceback.format_exc())
        return summary
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def main():
    """Print a netstat-like table of all inet connections.

    One header row is printed, followed by one row per connection returned
    by ``psutil.net_connections(kind='inet')`` showing protocol, local and
    remote address, status, pid and (truncated) program name.
    """
    row_format = "%-5s %-30s %-30s %-13s %-6s %s"
    header = (
        "Proto", "Local address", "Remote address", "Status", "PID",
        "Program name")
    print(row_format % header)

    # Resolve pid -> process name up front.  Processes whose name cannot be
    # read (any psutil.Error) are skipped and later displayed as '?'.
    names_by_pid = {}
    for proc in psutil.process_iter():
        try:
            names_by_pid[proc.pid] = proc.name()
        except psutil.Error:
            continue

    for conn in psutil.net_connections(kind='inet'):
        local = "%s:%s" % (conn.laddr)
        # raddr is falsy when there is no remote endpoint.
        remote = "%s:%s" % (conn.raddr) if conn.raddr else ""
        row = (
            proto_map[(conn.family, conn.type)],
            local,
            remote or AD,
            conn.status,
            conn.pid or AD,
            # Program names are cut to 15 characters.
            names_by_pid.get(conn.pid, '?')[:15],
        )
        print(row_format % row)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def get_tcp_all_conns_count_top(top=10):
    """Return the local TCP endpoints that have the most connections.

    Args:
        top: Maximum number of entries to return.

    Returns:
        A list of ``((ip, port), count)`` tuples sorted by descending
        count, truncated to the first ``top`` entries.
    """
    endpoint_counts = Counter(
        (conn.laddr[0], conn.laddr[1])
        for conn in psutil.net_connections(kind="tcp")
    )

    # items() exists on both Python 2 and 3, whereas iteritems() is
    # Python 2 only and raises AttributeError on Python 3.
    ranked = sorted(endpoint_counts.items(), key=lambda item: item[1], reverse=True)

    # Slicing already copes with ``top`` being larger than the list.
    return ranked[0:top]
项目:corvus-web-public    作者:eleme    | 项目源码 | 文件源码
def check_ports(ports):
    """Raise if any of ``ports`` is already in use as a local port.

    Args:
        ports: Iterable of port numbers to check.

    Raises:
        TaskException: If at least one of ``ports`` is the local port of a
            connection returned by ``psutil.net_connections()``.  The
            message lists the offending ports in ascending order.
    """
    # laddr can be an empty tuple for a socket that is not bound, so require
    # a non-empty tuple before indexing the port.
    local_ports = {
        conn.laddr[1]
        for conn in psutil.net_connections()
        if isinstance(conn.laddr, tuple) and conn.laddr
    }
    ports_inuse = set(ports) & local_ports
    if ports_inuse:
        # Sorted so that the error message is deterministic.
        raise TaskException(
            'ports {} are in use'.format(
                ', '.join(map(str, sorted(ports_inuse)))))
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def get_free_port(address, initial_port):
    """Find unused ports on an interface, counting up from ``initial_port``.

    This should not be used in mission-critical applications - a race
    condition may occur if someone grabs the port before the caller of
    this function has a chance to use it.

    Parameters:
        address       (string): an ip address of interface to use
        initial_port  (int)   : port to start iteration with
    Return:
        iterator that will return the next unused port on the specified
        interface.  If psutil is denied access to the connection list, a
        plain ``count(initial_port)`` iterator is returned instead, i.e.
        ports are handed out without being checked.
    """

    try:
        # On OSX this function requires root privileges
        psutil.net_connections()
    except psutil.AccessDenied:
        return count(initial_port)

    def _is_taken(port):
        # The connection list is re-read for every candidate port.
        for conn in psutil.net_connections():
            laddr = getattr(conn, 'laddr', None)
            # A socket that is not bound can report an empty laddr; it
            # occupies no port and must not be indexed.
            if laddr and laddr[0] == address and laddr[1] == port:
                return True
        return False

    def _unused_ports():
        for port in count(initial_port):
            # only generate unused ports
            if not _is_taken(port):
                yield port

    return _unused_ports()
项目:jubakit    作者:jubatus    | 项目源码 | 文件源码
def _get_ports_in_use(cls):
    """
    Return the set of local ports of all IPv4 connections reported by psutil.

    An empty set is returned (and an info message logged) when psutil is
    denied access to the connection list.
    """
    try:
      connections = psutil.net_connections(kind='inet4')
    except psutil.AccessDenied:
      # On some platforms (such as OS X), root privilege is required to get used ports.
      # In that case we avoid port conflicts to the best of our knowledge.
      _logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
      return set()
    return {conn.laddr[1] for conn in connections}
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_connections(self):
        """Pass ``psutil.net_connections`` to this test case's ``execute`` helper."""
        function_under_test = psutil.net_connections
        self.execute(function_under_test)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """Call ``net_connections(kind='inet6')`` after trying to bind an IPv6 socket.

        ``supports_ipv6`` and ``inet_ntop`` are not used in the body;
        presumably they are mocks injected by patch decorators that are not
        part of this snippet - confirm against the enclosing class.
        """
        # see: https://github.com/giampaolo/psutil/issues/623
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        except socket.error:
            sock = None
        if sock is not None:
            self.addCleanup(sock.close)
            try:
                sock.bind(("::1", 0))
            except socket.error:
                # Failing to bind is tolerated; the call below runs anyway.
                pass
        psutil.net_connections(kind='inet6')
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def compare_proc_sys_cons(self, pid, proc_cons):
        """Assert that per-process connections match the system-wide view.

        Args:
            pid: Process id used to filter ``psutil.net_connections(kind='all')``.
            proc_cons: Connection tuples to compare against - presumably the
                per-process connection list of ``pid``; confirm with callers.
        """
        from psutil._common import pconn

        # c[:-1] drops the last field of each system-wide entry (presumably
        # the pid, which the per-process tuples do not carry).
        system_view = []
        for conn in psutil.net_connections(kind='all'):
            if conn.pid == pid:
                system_view.append(conn[:-1])

        if FREEBSD:
            # on FreeBSD all fds are set to -1
            process_view = [pconn(*[-1] + list(item[1:])) for item in proc_cons]
        else:
            process_view = proc_cons
        self.assertEqual(sorted(process_view), sorted(system_view))
项目:pwndemo    作者:zh-explorer    | 项目源码 | 文件源码
def pidof(target):
    """pidof(target) -> int list

    Get PID(s) of `target`.  The returned PID(s) depends on the type of `target`:

    - :class:`str`: PIDs of all processes with a name matching `target`.
    - :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
    - :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
      remote end of `target` if it is running on the host.  Otherwise an
      empty list.

    Arguments:
        target(object):  The target whose PID(s) to find.

    Returns:
        A list of found PIDs.
    """
    if isinstance(target, tubes.sock.sock):
        local = target.sock.getsockname()
        remote = target.sock.getpeername()

        def match(c):
            # The parameter must be the connection being tested: on
            # Python 3 the comprehension variable below is not visible
            # here, so reading it would raise NameError.
            # Seen from the peer's socket, our local address is its remote
            # address and vice versa.
            return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED')

        return [c.pid for c in psutil.net_connections() if match(c)]

    elif isinstance(target, tubes.process.process):
        return [target.proc.pid]

    else:
        return pid_by_name(target)
项目:ifi    作者:mike01    | 项目源码 | 文件源码
def get_procinfo_by_address(ip_src, ip_dst, port_src=None, port_dst=None):
    """
    Get information about the process associated with the given addresses.

    When port_src is not None, connections are matched on
    (local ip, remote ip, local port, remote port); otherwise only on
    (local ip, remote ip).  port_src and port_dst must either both be set
    or both be None.

    return -- [pid, cmdline, username, sha256 hash object] for the first
    matching connection, or [] when no connection matches
    """
    if port_src is not None:
        wanted = (ip_src, ip_dst, port_src, port_dst)

        def endpoints(conn):
            return (conn.laddr[0], conn.raddr[0], conn.laddr[1], conn.raddr[1])
    else:
        wanted = (ip_src, ip_dst)

        def endpoints(conn):
            return (conn.laddr[0], conn.raddr[0])

    pids = []
    for conn in net_connections():
        # Skip connections without a remote endpoint or without a known pid.
        if len(conn.raddr) == 0 or conn.pid is None:
            continue
        if endpoints(conn) == wanted:
            pids.append(conn.pid)

    try:
        if len(pids) > 1:
            logger.warning("more than 1 matching process: %r", pids)
        # An empty pid list raises IndexError here, handled below.
        first_pid = pids[0]
        proc = Process(first_pid)
        cmd = [first_pid, proc.cmdline(), proc.username()]
        hash_input = "%d%s%s" % (cmd[0], cmd[1], cmd[2])
        # The hash object itself is appended, not its hex digest.
        cmd.append(hashlib.sha256(hash_input.encode("UTF-8")))
        logger.debug("process info: %r", cmd)
        return cmd
    except IndexError:
        return []
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_listen_ports(self):
        """
        Collect the listening ports reported by psutil.

        :return: list of ``(local port, pid)`` tuples, one per connection
                 whose status is ``"LISTEN"``
        """
        return [
            (conn.laddr[1], conn.pid)
            for conn in psutil.net_connections()
            if conn.status == "LISTEN"
        ]
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_connections(self):
        """Pass ``psutil.net_connections`` to this test case's ``execute`` helper."""
        function_under_test = psutil.net_connections
        self.execute(function_under_test)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """Call ``net_connections(kind='inet6')`` after trying to bind an IPv6 socket.

        ``supports_ipv6`` and ``inet_ntop`` are not used in the body;
        presumably they are mocks injected by patch decorators that are not
        part of this snippet - confirm against the enclosing class.
        """
        # see: https://github.com/giampaolo/psutil/issues/623
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        except socket.error:
            sock = None
        if sock is not None:
            self.addCleanup(sock.close)
            try:
                sock.bind(("::1", 0))
            except socket.error:
                # Failing to bind is tolerated; the call below runs anyway.
                pass
        psutil.net_connections(kind='inet6')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def compare_proc_sys_cons(self, pid, proc_cons):
        """Assert that per-process connections match the system-wide view.

        Args:
            pid: Process id used to filter ``psutil.net_connections(kind='all')``.
            proc_cons: Connection tuples to compare against - presumably the
                per-process connection list of ``pid``; confirm with callers.
        """
        from psutil._common import pconn

        # c[:-1] drops the last field of each system-wide entry (presumably
        # the pid, which the per-process tuples do not carry).
        system_view = []
        for conn in psutil.net_connections(kind='all'):
            if conn.pid == pid:
                system_view.append(conn[:-1])

        if FREEBSD:
            # on FreeBSD all fds are set to -1
            process_view = [pconn(*[-1] + list(item[1:])) for item in proc_cons]
        else:
            process_view = proc_cons
        self.assertEqual(sorted(process_view), sorted(system_view))
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_connections(self):
        """Pass ``psutil.net_connections`` to this test case's ``execute`` helper."""
        function_under_test = psutil.net_connections
        self.execute(function_under_test)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """Call ``net_connections(kind='inet6')`` after trying to bind an IPv6 socket.

        ``supports_ipv6`` and ``inet_ntop`` are not used in the body;
        presumably they are mocks injected by patch decorators that are not
        part of this snippet - confirm against the enclosing class.
        """
        # see: https://github.com/giampaolo/psutil/issues/623
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        except socket.error:
            sock = None
        if sock is not None:
            self.addCleanup(sock.close)
            try:
                sock.bind(("::1", 0))
            except socket.error:
                # Failing to bind is tolerated; the call below runs anyway.
                pass
        psutil.net_connections(kind='inet6')
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def compare_proc_sys_cons(self, pid, proc_cons):
        """Assert that per-process connections match the system-wide view.

        Args:
            pid: Process id used to filter ``psutil.net_connections(kind='all')``.
            proc_cons: Connection tuples to compare against - presumably the
                per-process connection list of ``pid``; confirm with callers.
        """
        from psutil._common import pconn

        # c[:-1] drops the last field of each system-wide entry (presumably
        # the pid, which the per-process tuples do not carry).
        system_view = []
        for conn in psutil.net_connections(kind='all'):
            if conn.pid == pid:
                system_view.append(conn[:-1])

        if FREEBSD:
            # on FreeBSD all fds are set to -1
            process_view = [pconn(*[-1] + list(item[1:])) for item in proc_cons]
        else:
            process_view = proc_cons
        self.assertEqual(sorted(process_view), sorted(system_view))
项目:black_zone    作者:zh-explorer    | 项目源码 | 文件源码
def pidof(target):
    """pidof(target) -> int list

    Get PID(s) of `target`.  The returned PID(s) depends on the type of `target`:

    - :class:`str`: PIDs of all processes with a name matching `target`.
    - :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
    - :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
      remote end of `target` if it is running on the host.  Otherwise an
      empty list.

    Arguments:
        target(object):  The target whose PID(s) to find.

    Returns:
        A list of found PIDs.
    """
    if isinstance(target, tubes.sock.sock):
        local = target.sock.getsockname()
        remote = target.sock.getpeername()

        def match(c):
            # The parameter must be the connection being tested: on
            # Python 3 the comprehension variable below is not visible
            # here, so reading it would raise NameError.
            # Seen from the peer's socket, our local address is its remote
            # address and vice versa.
            return (c.raddr, c.laddr, c.status) == (local, remote, 'ESTABLISHED')

        return [c.pid for c in psutil.net_connections() if match(c)]

    elif isinstance(target, tubes.process.process):
        return [target.proc.pid]

    else:
        return pid_by_name(target)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def print_all_conns():
    """Print every connection returned by ``psutil.net_connections()``."""
    for conn in psutil.net_connections():
        # print() with a single argument behaves the same on Python 2 and
        # Python 3; the bare print statement is a SyntaxError on Python 3.
        print(conn)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def print_conns_by_proc_pid(pid):
    """Print the connections whose owning process id equals ``pid``.

    Args:
        pid: Process id compared against each connection's ``pid`` field.
    """
    for conn in psutil.net_connections():
        if conn.pid == pid:
            # print() with a single argument behaves the same on Python 2
            # and Python 3; the bare print statement fails on Python 3.
            print(conn)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def print_conns_by_remote_addr(addr):
    """Print the connections whose remote address contains ``addr``.

    Args:
        addr: Value looked up with ``in`` against each connection's
            ``raddr``, so it matches any element of that address.
    """
    for conn in psutil.net_connections():
        if addr in conn.raddr:
            # print() with a single argument behaves the same on Python 2
            # and Python 3; the bare print statement fails on Python 3.
            print(conn)
项目:tm-manifesting    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def kill_dnsmasq(config):
    """Kill the dnsmasq instances that serve the configured PXE interface.

    Nothing is done when ``config['PXE_INTERFACE']`` is None.  Otherwise
    two passes are made:

    1. Best effort: read a pid from ``config['DNSMASQ_PIDFILE']`` and pass
       it to ``kill_pid``; any failure in this step is ignored.
    2. Look up every IPv4 socket bound to port 53 or 69 on either the
       interface's first IPv4 address or ``0.0.0.0`` and call ``kill_pid``
       for each distinct owning pid.
    """
    # There can be more than one bound to it, especially if libvirt was used.
    pxe_interface = config['PXE_INTERFACE']
    if pxe_interface is None:
        return

    # Easy way: an instance started from a (previous) run of manifest_api
    # is expected to have left its pid in the pidfile.
    try:
        with open(config['DNSMASQ_PIDFILE'], 'r') as pidfile:
            recorded_pid = int(pidfile.read())
        kill_pid(recorded_pid, 'dnsmasq')
    except Exception:   # deliberately best effort; failures are ignored
        pass

    # Hard way: track down dnsmasq(s) attached to the configured interface.
    if pxe_interface not in NIF.interfaces():
        return
    inet_addrs = NIF.ifaddresses(pxe_interface).get(NIF.AF_INET, None)
    if inet_addrs is None:
        # No IPv4 address on the interface, so nothing to match against.
        return
    pxe_addr = inet_addrs[0]['addr']

    # libvirt bridge will always have DNS (53).  A truly simple net definition
    # won't have TFTP (69) but others might.  Obviously one started from here
    # will have TFTP.  And /etc/init.d/dnsmasq starts a *.* DNS listener
    # which picks up PXE_INTERFACE when explicitly bound processes are killed.
    # net_connections returns objects with a laddr field that is a tuple
    # of (listenaddress, port).  Filter on that.
    pids = {
        conn.pid
        for conn in psutil.net_connections(kind='inet4')
        if conn.laddr[1] in (53, 69)
        and conn.laddr[0] in (pxe_addr, '0.0.0.0')
    }
    if pids:
        mainapp.logger.info('Killing %d copies of dnsmasq' % len(pids))
    while pids:
        kill_pid(pids.pop(), 'dnsmasq')    # Until someone starts using bind9...
项目:opensnitch    作者:evilsocket    | 项目源码 | 文件源码
def get_pid_by_connection(src_addr, src_p, dst_addr, dst_p, proto='tcp'):
    """Return the pid owning the first connection matching the endpoints.

    For ``proto == 'tcp'`` both the local ``(src_addr, src_p)`` and remote
    ``(dst_addr, dst_p)`` addresses must match.  For ``proto == 'udp'``
    only the local port is compared.  For any other ``proto`` the first
    connection returned by psutil is accepted as-is.

    Returns:
        The ``pid`` field of the first matching connection, or None (after
        logging a warning) when no connection matches.
    """
    # We always take the first match as we assume there is only one:
    # it should not be possible to keep two identical connections.
    for conn in psutil.net_connections(kind=proto):
        if proto == 'tcp':
            matched = (conn.laddr == (src_addr, int(src_p)) and
                       conn.raddr == (dst_addr, int(dst_p)))
        elif proto == 'udp':
            # UDP gives us a very limited dataset to work with
            matched = conn.laddr[1] == int(src_p)
        else:
            matched = True

        if matched:
            return conn.pid

    logging.warning("Could not find process for %s connection %s:%s -> %s:%s",
                    proto,
                    src_addr,
                    src_p,
                    dst_addr,
                    dst_p)

    return None
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def already_listening(port, renewer=False):
    """Check if a process is already listening on the port.

    If so, also tell the user via a display notification.

    .. warning::
        On some operating systems, this function can only usefully be
        run as root.

    :param int port: The TCP port in question.
    :param bool renewer: When true, the notification also contains advice
        about automated renewal.
    :returns: True if a listener with a known PID was found and the user
        was notified, False otherwise.

    """
    try:
        connections = psutil.net_connections()
    except psutil.AccessDenied as error:
        logger.info("Access denied when trying to list network "
                    "connections: %s. Are you root?", error)
        # this function is just a pre-check that often causes false
        # positives and problems in testing (c.f. #680 on Mac, #255
        # generally); we will fail later in bind() anyway
        return False

    listener_pids = [
        conn.pid for conn in connections
        if conn.status == 'LISTEN' and
        conn.type == socket.SOCK_STREAM and
        conn.laddr[1] == port
    ]
    # conn.pid may be None if the current process doesn't have
    # permission to identify the listening process!  Additionally,
    # there may be more than one listener if separate sockets have
    # bound the same port on separate interfaces.  We currently only
    # have UI to notify the user about one of them at a time.
    if not listener_pids or listener_pids[0] is None:
        return False
    pid = listener_pids[0]

    try:
        name = psutil.Process(pid).name()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Perhaps the result of a race where the process could have
        # exited or relinquished the port (NoSuchProcess), or the result
        # of an OS policy where we're not allowed to look up the process
        # name (AccessDenied).
        return False

    display = zope.component.getUtility(interfaces.IDisplay)
    extra = (
        " For automated renewal, you may want to use a script that stops"
        " and starts your webserver. You can find an example at"
        " https://letsencrypt.org/howitworks/#writing-your-own-renewal-script"
        ". Alternatively you can use the webroot plugin to renew without"
        " needing to stop and start your webserver.") if renewer else ""
    display.notification(
        "The program {0} (process ID {1}) is already listening "
        "on TCP port {2}. This will prevent us from binding to "
        "that port. Please stop the {0} program temporarily "
        "and then try again.{3}".format(name, pid, port, extra),
        height=13)
    return True
项目:PyGNS3    作者:mvdwoord    | 项目源码 | 文件源码
def _getDebugData():
        """Build a plain-text debug report about the host and the controller.

        The report contains the version, platform, Python version, CPU and
        memory figures, network interface addresses, open connections, one
        line per running process and, for every project of the controller
        instance, its name, id and link debug data.

        Returns:
            The report as a single string.
        """
        try:
            connections = psutil.net_connections()
        except psutil.AccessDenied:
            # You need to be root for OSX
            connections = None

        try:
            addrs = []
            for key, val in psutil.net_if_addrs().items():
                addrs.append("* {}: {}".format(key, val))
        except UnicodeDecodeError:
            addrs = ["INVALID ADDR WITH UNICODE CHARACTERS"]

        # The report is collected as pieces and joined once at the end.
        pieces = ["""Version: {version}
OS: {os}
Python: {python}
CPU: {cpu}
Memory: {memory}

Networks:
{addrs}

Open connections:
{connections}

Processus:
""".format(
            version=__version__,
            os=platform.platform(),
            python=platform.python_version(),
            memory=psutil.virtual_memory(),
            cpu=psutil.cpu_times(),
            connections=connections,
            addrs="\n".join(addrs)
        )]

        for proc in psutil.process_iter():
            try:
                psinfo = proc.as_dict(attrs=["name", "exe"])
                pieces.append("* {} {}\n".format(psinfo["name"], psinfo["exe"]))
            except psutil.NoSuchProcess:
                # The process disappeared while iterating; leave it out.
                continue

        pieces.append("\n\nProjects")
        for project in Controller.instance().projects.values():
            pieces.append("\n\nProject name: {}\nProject ID: {}\n".format(project.name, project.id))
            for link in project.links.values():
                pieces.append("Link {}: {}".format(link.id, link.debug_link_data))

        return "".join(pieces)
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def get_connections(self):
        """Summarize established client connections of long-running processes.

        A connection is reported when it has a pid, its status is
        ESTABLISHED, its local port is not one this host listens on, and
        its owning process was created at least five minutes ago.
        Connections are grouped per (pid, remote ip, remote port).

        Returns:
            A list of dicts sorted by ``(client_pid, server_port)``.  Each
            dict has the keys ``client_pid``, ``client_process_create_time``,
            ``client_pname``, ``client_cwd``, ``client_ip``,
            ``client_port_num`` (number of connections in the group),
            ``server_ip``, ``server_port``, ``server_pid`` and
            ``server_pname`` (the last two are always empty strings).
        """
        connections = psutil.net_connections()

        # Local ports some process with a known pid is listening on.  A set
        # gives constant-time membership tests in the loop below.
        listening = set()
        for conn in connections:
            if conn.pid and conn.status == psutil.CONN_LISTEN:
                ip, port = conn.laddr
                listening.add(port)

        clients = {}
        for conn in connections:
            if not conn.pid:
                continue
            if conn.status != psutil.CONN_ESTABLISHED:
                continue

            ip, port = conn.laddr
            # A local listening port presumably marks the server side of
            # the connection, which is not reported here.
            if port in listening:
                continue

            try:
                proc = psutil.Process(conn.pid)
                # Looked up inside the try so that a process vanishing
                # right after construction is skipped as well.
                create_time = proc.create_time()
            except psutil.NoSuchProcess:
                continue

            # Ignore processes younger than five minutes.
            if time.time() - create_time < 60 * 5:
                continue

            rip, rport = conn.raddr
            record_id = '%s-%s:%s' % (conn.pid, rip, rport)
            client = clients.setdefault(record_id, {})
            if not client:
                client['client_pid'] = conn.pid
                client['client_process_create_time'] = int(create_time)
                client['client_pname'] = proc.name()
                client['client_cwd'] = proc.cwd()
                client['client_ip'] = ip
                client['client_port_num'] = 1
                client['server_ip'] = rip
                client['server_port'] = rport
                client['server_pid'] = ''
                client['server_pname'] = ''
            else:
                client['client_port_num'] += 1

        # sorted() accepts the dict view on Python 3, where values() has no
        # sort() method, and still returns a list on Python 2.
        return sorted(clients.values(),
                      key=lambda x: (x['client_pid'], x['server_port']))