Python ipaddr 模块,IPv4Network() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用ipaddr.IPv4Network()

项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_sre(net, target_prefix_len):
        """Calculate first and last /target_prefix_len subnets from net

        Returns: first, last, cnt (all integers)
        """

        assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
        assert target_prefix_len <= 64, "Max target prefix length is 64"
        assert net.prefixlen <= target_prefix_len, \
            ("Prefix length ({}) must be <= of the target prefix "
             "length ({}): {}".format(net.prefixlen, target_prefix_len, net))

        first = UniqueSmallestRoutableEntriesMonitor.get_first(net)

        tot_len = 64 if net.version == 6 else 32

        # first <= 2^63 -1 to avoid overflows
        assert first <= 9223372036854775807, \
            "Only prefixes <= 7fff:ffff:ffff:ffff::/64 can be processed"

        diff_len = target_prefix_len - net.prefixlen

        last = first | ((2**diff_len - 1) << tot_len - target_prefix_len)

        return first, last, 2**diff_len
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def get_free_ip(subnets):
    for subnet in subnets:
        reserved = subnet.reserved_addresses
        net = ipaddr.IPv4Network(subnet.address)
        used_ips = [ip.address for ip in subnet.ips]

        if (net.numhosts -2 - reserved) <= len(used_ips):
            continue

        for ip in net.iterhosts():
            if reserved > 0:
                reserved -= 1
                continue
            if ip.compressed not in used_ips:
                return ip.compressed

    raise SubnetFullException()
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def fetch_url(self, i, fn_on_response):
        item = self.get_next_task()
        while item is not None:
            try:
                if '/' in item:
                    mask = ipaddr.IPv4Network(item)
                    ip_list = [text_type(t) for t in mask.iterhosts()]
                else:
                    ip_list = [item]
            except:
                ip_list = []

            for t in ip_list:
                if t == '':
                    continue

                url_list = ['http://%s:%s' % (t, p) for p in self.port_list]
                url_list.extend(['https://%s:%s' % (t, p) for p in [443, 8443]])
                for u in url_list:
                    yield self.do_request(u, 'GET', fn_on_response)
            item = self.get_next_task()
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def on_queue_empty(self, queue, max_num=100):
        for _ in range(max_num):
            try:
                item = self.list_data.popleft()
                if '/' in item:
                    mask = ipaddr.IPv4Network(item)
                    ip_list = [text_type(t) for t in mask.iterhosts()]
                else:
                    ip_list = [item]
                queue.extend(ip_list)
            except IndexError:
                break

        try:
            item = queue.popleft()
        except IndexError:
            item = None
        return item
项目:mass-ipv4-whois    作者:marklit    | 项目源码 | 文件源码
def is_reserved(ip):
    return (ipaddr.IPv4Network(ip).is_multicast |
            ipaddr.IPv4Network(ip).is_private |
            ipaddr.IPv4Network(ip).is_link_local |
            ipaddr.IPv4Network(ip).is_loopback |
            ipaddr.IPv4Network(ip).is_reserved)
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_net(net):
        if isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network)):
            return net
        return ipaddr.IPNetwork(net)
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_first(net):
        assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
        if net.version == 6:
            return int(net.network) >> 64
        else:
            return int(net.network)
项目:dnsAutoRebinding    作者:Tr3jer    | 项目源码 | 文件源码
def ipListBuild(address):
    print '1. Single IP Covert For En\n2. Build IP List'
    opt_req = raw_input("[+] [1 By Default/2]") or '1'
    if opt_req == '1':
        print numToEnToNum(address)
        exit()

    conf_main = conf_read('maindomain')[:-1]
    seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24
    encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]")
    mainDomain = raw_input("[+] Please Input Server Root Address [{} By Default]".format(conf_main)) or conf_main
    segment = eval("ipaddr.IPv4Network('{}/{}').iterhosts()".format(address, int(seg_len)))
    save_file = "{}_{}_{}.txt".format(time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''), mainDomain.replace('.','_'),(encode_req if encode_req else 'ipv4'))
    results = []

    try:

        if encode_req == '': results += ["{}.{}".format(str(i),mainDomain) for i in list(segment)]
        elif encode_req == 'en':
            results += ["{}.{}".format(numToEnToNum(str(i)),mainDomain) for i in list(segment)]
        elif encode_req == 'int':
            results += ["{}.{}".format(int(ipaddr.IPAddress(str(i))),mainDomain) for i in list(segment)]
        elif encode_req == 'hex':
            results += ["{}.{}".format(str(i).encode('hex'),mainDomain) for i in list(segment)]
        else:
            pass

        f = open(save_file,'a')
        [f.write(i+'\n') for i in results]
        f.close()
        print '[+] Stored in the {}'.format(save_file)
    except Exception,e:
        print e
        exit()
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def default(self, obj):
        if isinstance(obj,
                      (ipaddress.IPv4Network, ipaddress.IPv6Network, ipaddress.IPv4Address, ipaddress.IPv6Address)):
            return str(obj)
        return json.JSONEncoder.default(self, obj)
项目:VulScript    作者:y1ng1996    | 项目源码 | 文件源码
def main():
    targetlist=[]
    if len(argv)>2:
        if argv[1]=='-f':
            return extract_target(argv[2])
        if argv[1]=='-i':
            port=6379
            if len(argv)==5:
                port=int(argv[4])
            targets = ipaddr.IPv4Network(argv[2])
            for tar in targets:
                targetlist.append("%s:%d"%(tar,port))
            return targetlist
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def select_subnet_for_ip(ip, subnets):
    for subnet in subnets:
        if ipaddr.IPAddress(ip) in ipaddr.IPv4Network(subnet.address):
            return subnet
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def __init__(self):
        self.ipv4_networks = []
        for ip in WHATSAPP_IPV4.split("\n"):
            try:
                self.ipv4_networks.append(ipaddr.IPv4Network(ip))
            except Exception:
                log.err("IP is wrong")
                log.msg(ip)
        self.ipv6_networks = map(ipaddr.IPv6Network,
                                 WHATSAPP_IPV6.split("\n"))
项目:TrustlookWannaCryToolkit    作者:apkjet    | 项目源码 | 文件源码
def main():
    """main function"""
    global timeout
    opt = parse_options()

    scan_network = opt.network
    try:
        timeout = float(opt.timeout)
    except ValueError:
        timeout = 0.5
        print("Use default timeout {}".format(timeout))

    hosts = []

    if '/' in scan_network:
        network = IPv4Network(scan_network)
        for host in network.iterhosts():
            hosts.append(str(host))
        print('start to scan network {} for {} hosts...'.format(str(network), len(hosts)))
        pool = multiprocessing.Pool(processes=16)
        pool.map(scan, hosts)
        pool.close()
        pool.join()
    else:
        print('start to scan host {}'.format(scan_network))
        scan(scan_network)

    print("done")
项目:sonic-buildimage    作者:Azure    | 项目源码 | 文件源码
def default(self, obj):
        if isinstance(obj, (
            ipaddress.IPv4Network, ipaddress.IPv6Network,
            ipaddress.IPv4Address, ipaddress.IPv6Address
            )):
            return str(obj)
        return json.JSONEncoder.default(self, obj)
项目:rest_api    作者:opentargets    | 项目源码 | 文件源码
def get(self):
        esstore = current_app.extensions['es_access_store']
        mpstore = current_app.extensions['mp_access_store']
        args = self.parser.parse_args()
        event = args['event'][:120]
        auth_token=request.headers.get('Auth-Token')
        ip_resolver = current_app.config['IP_RESOLVER']
        ip = RateLimiter.get_remote_addr()
        ip_net = IPNetwork(ip)
        resolved_org = ip_resolver['default']
        for net, org in ip_resolver.items():
            if isinstance(net, (IPv4Network, IPv6Network)):
                if net.overlaps(ip_net):
                    resolved_org = org
                    break
        data = dict(ip=ip,
                    org=resolved_org,
                    host=request.host,
                    timestamp=datetime.now(),
                    event=event)
        if auth_token:
            payload = TokenAuthentication.get_payload_from_token(auth_token)
            data['app_name'] = payload['app_name']
        # esstore.store_event(data)
        mpstore.store_event(data)
        data['timestamp']= str(data['timestamp'])
        return CTTVResponse.OK(SimpleResult(None, data=data))
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def add_random_net(ip_ver, target_prefix_len):

    def dump_vars():
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len-1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len-1)
        max_range_len = prefix_len - 1 # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2**max_range_len - 1
    rand = random.randint(0, max_range)
    net_id = rand << max_prefix_len - prefix_len
    if ip_ver == 6:
        net_id = net_id << 64
    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    try:
        assert str(net.ip) == str(net.network)
    except:
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
    except:
        dump_vars()

    return "ok", net