我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用psutil.CONN_LISTEN。
def _find_ngrok_inspection_port(self): """ :return (str, int): """ for c in psutil.Process(self.ngrok_proc.pid).connections(): if c.laddr[0] == '127.0.0.1' and c.raddr == () and c.status == psutil.CONN_LISTEN: return c.laddr raise RuntimeError('Did not find API interface of ngrok.')
def find_status(): """This util is used to find the status of sshd service. It will identify sshd status using process id of sshd service. input: (No input required) output: {"name": "", "port": "", "status": ""} """ sshd = {"name": "", "port": "", "status": ""} cmd = cmd_utils.Command("systemctl show sshd.service") out, err, rc = cmd.run() if not err: pid = _find_pid(out) if pid != 0: p = psutil.Process(pid) result = [con for con in p.connections() if con.status == psutil.CONN_LISTEN and con.laddr[0] == "0.0.0.0"] if result: sshd["name"] = p.name() sshd["port"] = int(result[0].laddr[1]) sshd["status"] = result[0].status else: err = "Unable to find ssh port number" Event( Message( priority="debug", publisher="commons", payload={"message": err} ) ) else: err = "sshd service is not running" Event( Message( priority="debug", publisher="commons", payload={"message": err} ) ) else: Event( Message( priority="debug", publisher="commons", payload={"message": err} ) ) return sshd, err
def get_connections(self): listening = [] connections = psutil.net_connections() for conn in connections: if not conn.pid: continue if conn.status == psutil.CONN_LISTEN: ip, port = conn.laddr # instance = '%s:%s' % (ip, port) if port not in listening: listening.append(port) res = [] clients = {} for conn in connections: if not conn.pid: continue if not conn.status == psutil.CONN_ESTABLISHED: continue ip, port = conn.laddr # instance = '%s:%s' % (ip, port) if port in listening: continue try: proc = psutil.Process(conn.pid) except psutil.NoSuchProcess: continue current = time.time() create_time = proc.create_time() if current - create_time < 60 * 5: continue rip, rport = conn.raddr record_id = '%s-%s:%s' % (conn.pid, rip, rport) client = clients.setdefault(record_id, {}) if not client: client['client_pid'] = conn.pid client['client_process_create_time'] = int(create_time) client['client_pname'] = proc.name() client['client_cwd'] = proc.cwd() client['client_ip'] = ip # client['client_ports'] = '%s' % port client['client_port_num'] = 1 client['server_ip'] = rip client['server_port'] = rport client['server_pid'] = '' client['server_pname'] = '' else: # client['client_ports'] += ', %s' % port client['client_port_num'] += 1 res = clients.values() res.sort(key=lambda x: (x['client_pid'], x['server_port'])) return res