Python netaddr 模块,iter_iprange() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用netaddr.iter_iprange()

项目:cluster-genesis    作者:open-power-ref-design-toolkit    | 项目源码 | 文件源码
def load_network_ips(inventory):
    """Collect the available IP addresses declared for each network.

    Only networks that carry an ``'available-ips'`` key appear in the result.
    Each entry of that list is either a single address, which is passed
    through unchanged, or two addresses separated by whitespace, which are
    expanded with ``netaddr.iter_iprange``.

    :param inventory: mapping with a ``'networks'`` key whose value maps
        network names to per-network dicts.
    :returns: dict mapping network name to a list of addresses. Expanded
        range members are the objects yielded by ``netaddr.iter_iprange``;
        single entries stay exactly as they were given.
    """
    available_network_ips = {}
    # .items() works on Python 2 and 3 alike; .iteritems() is Python 2 only.
    for net_name, net in inventory['networks'].items():
        if 'available-ips' not in net:
            continue
        ip_list_out = []
        for ip in net.get('available-ips'):
            if ' ' in ip:
                # "first last" -> every address in the range
                ip_range = ip.split()
                ip_list_out.extend(
                    netaddr.iter_iprange(ip_range[0], ip_range[1]))
            else:
                ip_list_out.append(ip)
        available_network_ips[net_name] = ip_list_out

    return available_network_ips
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_boundaries():
    """iter_iprange() yields both endpoints, for IPv4 and IPv4-mapped IPv6."""
    last_octets = range(8)

    # Plain IPv4: 192.0.2.0 through 192.0.2.7 inclusive.
    ipv4_expected = [IPAddress('192.0.2.%d' % n) for n in last_octets]
    assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == ipv4_expected

    # The same eight addresses written as IPv4-mapped IPv6.
    mapped_expected = [
        IPAddress('::ffff:192.0.2.%d' % n) for n in last_octets
    ]
    mapped_actual = list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7'))
    assert mapped_actual == mapped_expected
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_boundaries():
    """iter_iprange() yields both endpoints, for IPv4 and IPv4-mapped IPv6."""
    last_octets = range(8)

    # Plain IPv4: 192.0.2.0 through 192.0.2.7 inclusive.
    ipv4_expected = [IPAddress('192.0.2.%d' % n) for n in last_octets]
    assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == ipv4_expected

    # The same eight addresses written as IPv4-mapped IPv6.
    mapped_expected = [
        IPAddress('::ffff:192.0.2.%d' % n) for n in last_octets
    ]
    mapped_actual = list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7'))
    assert mapped_actual == mapped_expected
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_iprange_boundaries():
    """iter_iprange() yields both endpoints, for IPv4 and IPv4-mapped IPv6."""
    last_octets = range(8)

    # Plain IPv4: 192.0.2.0 through 192.0.2.7 inclusive.
    ipv4_expected = [IPAddress('192.0.2.%d' % n) for n in last_octets]
    assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == ipv4_expected

    # The same eight addresses written as IPv4-mapped IPv6.
    mapped_expected = [
        IPAddress('::ffff:192.0.2.%d' % n) for n in last_octets
    ]
    mapped_actual = list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7'))
    assert mapped_actual == mapped_expected
项目:steth    作者:openstack    | 项目源码 | 文件源码
def get_ip_range(start, end):
    """Return every address produced by ``iter_iprange(start, end)`` as text.

    :param start: first address of the range.
    :param end: last address of the range.
    :returns: list of ``str`` -- one entry per address yielded by
        ``iter_iprange`` with a step of 1.
    """
    # Iterating directly avoids the Python 2-only ``generator.next()`` call
    # and the hand-written StopIteration loop.
    return [str(ip) for ip in iter_iprange(start, end, step=1)]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ip_range():
    """A 14-address range lists each address and merges into six CIDRs."""
    addresses = list(iter_iprange('192.0.2.1', '192.0.2.14'))

    assert len(addresses) == 14

    # 192.0.2.1 through 192.0.2.14, in order.
    expected_addresses = [IPAddress('192.0.2.%d' % n) for n in range(1, 15)]
    assert addresses == expected_addresses

    expected_cidrs = [
        IPNetwork(cidr) for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert cidr_merge(addresses) == expected_cidrs
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def gen_mako_macro():
    """Return the Mako ``<% ... %>`` block that defines ``gen_paths(num)``.

    The embedded code creates one shared ``netaddr.iter_iprange`` iterator
    and a ``gen_paths`` helper that takes the next ``num`` addresses from it,
    each formatted as a ``/32`` prefix.
    """
    template_lines = (
        "<%",
        "    import netaddr",
        "    from itertools import islice",
        "",
        "    it = netaddr.iter_iprange('100.0.0.0','160.0.0.0')",
        "",
        "    def gen_paths(num):",
        "        return list('{0}/32'.format(ip) for ip in islice(it, num))",
        "%>",
        "",  # trailing newline after the closing tag
    )
    return "\n".join(template_lines)
项目:vio    作者:vmware    | 项目源码 | 文件源码
def get_ip_range(oms_spec, key):
    """Build an address iterator from a ``"first-last"`` entry of *oms_spec*.

    :param oms_spec: mapping that may contain *key*.
    :param key: name of the entry holding the ``first-last`` range string.
    :returns: the ``iter_iprange`` iterator for the range, or ``None`` when
        *key* is not present in *oms_spec*.
    """
    if key not in oms_spec:
        return None
    first, last = oms_spec[key].split('-')
    return iter_iprange(first, last)
项目:neutron-dynamic-routing    作者:openstack    | 项目源码 | 文件源码
def get_subnet(self, start='10.10.1.0', end='10.10.255.0', step=256):
    """Endlessly yield ``(counter, address)`` pairs, cycling over a range.

    Addresses come from ``netaddr.iter_iprange(start, end, step=step)``; when
    that iterator is exhausted a fresh one is created and iteration restarts
    from *start*. The counter starts at 1 and keeps increasing across
    restarts.

    :param start: first address of the range.
    :param end: last address of the range.
    :param step: stride passed to ``iter_iprange``.
    :returns: generator of ``(int, str)`` tuples.
    """
    subnet_gen = netaddr.iter_iprange(start, end, step=step)
    i = 1
    while True:
        # NOTE(review): the yield sits inside the ``with`` block, so
        # self.lock (presumably a threading lock -- confirm) stays held
        # while this generator is suspended between next() calls.
        with self.lock:
            try:
                # Built-in next() works on Python 2.6+ and 3; the
                # ``.next()`` method exists on Python 2 only.
                yield (i, str(next(subnet_gen)))
            except StopIteration:
                # Range exhausted: start over from the first address.
                subnet_gen = netaddr.iter_iprange(start, end, step=step)
                yield (i, str(next(subnet_gen)))
            i += 1
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ip_range():
    """A 14-address range lists each address and merges into six CIDRs."""
    addresses = list(iter_iprange('192.0.2.1', '192.0.2.14'))

    assert len(addresses) == 14

    # 192.0.2.1 through 192.0.2.14, in order.
    expected_addresses = [IPAddress('192.0.2.%d' % n) for n in range(1, 15)]
    assert addresses == expected_addresses

    expected_cidrs = [
        IPNetwork(cidr) for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert cidr_merge(addresses) == expected_cidrs
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_ip_range():
    """A 14-address range lists each address and merges into six CIDRs."""
    addresses = list(iter_iprange('192.0.2.1', '192.0.2.14'))

    assert len(addresses) == 14

    # 192.0.2.1 through 192.0.2.14, in order.
    expected_addresses = [IPAddress('192.0.2.%d' % n) for n in range(1, 15)]
    assert addresses == expected_addresses

    expected_cidrs = [
        IPNetwork(cidr) for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert cidr_merge(addresses) == expected_cidrs
项目:flask_extras    作者:christabor    | 项目源码 | 文件源码
def valid_hosts(formcls, field):
    """Validate a list of IPs (IPv4) or hostnames using python stdlib.

    This is more robust than the WTForm version as it also considers hostnames.

    Comma separated values:
    e.g. '10.7.223.101,10.7.12.0'
    Space separated values:
    e.g. '10.7.223.101 10.7.223.102'
    Ranges:
    e.g. '10.7.223.200-10.7.224.10'
    Hostnames:
    e.g. foo.x.y.com, baz.bar.z.com

    :param formcls (object): The form class.
    :param field (str): The form field; its ``data`` attribute holds the
        list of ips.
    :raises ValueError: if a range is malformed, or any entry is neither a
        valid IP nor a valid hostname.
    """
    # The dots are escaped so only input that really starts with dotted
    # digits is treated as a range. With a bare '.', a hostname such as
    # '1-2-3.example.com' was routed into the range branch and rejected.
    ip_re = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
    data = field.data
    if ',' in data:
        # Strip each item so 'a.com, b.com' (as in the docstring example)
        # does not leave a leading space on the second entry.
        ips = [ip.strip() for ip in data.split(',') if ip.strip()]
    elif ' ' in data:
        ips = [ip for ip in data.split(' ') if ip]
    elif '-' in data and ip_re.match(data):
        try:
            start, end = data.split('-')
            ips = [str(ip) for ip in iter_iprange(start, end)]
        except ValueError:
            # Raised by the unpacking above when there is not exactly one '-'.
            raise ValueError(
                'Invalid range specified. Format should be: '
                'XXX.XXX.XXX.XXX-XXX.XXX.XXX.XXX '
                '(e.g. 10.7.223.200-10.7.224.10)')
        except AddrFormatError as e:
            raise ValueError(e)
    else:
        # Just use the single ip
        ips = [data]
    # If any fails conversion, it is invalid.
    for ip in ips:
        if not is_ip(ip):
            # Not an IP: it must at least be a valid hostname.
            if not is_hostname(ip):
                raise ValueError('Invalid hostname: "{}"'.format(ip))
            continue
        try:
            socket.inet_aton(ip)
        except socket.error:
            raise ValueError('Invalid IP: {}'.format(ip))
项目:ovn-scale-test    作者:openvswitch    | 项目源码 | 文件源码
def _create_lports(self, lswitch, lport_create_args=None, lport_amount=1):
    """Create ``lport_amount`` logical ports on ``lswitch`` via ovn-nbctl.

    Each port gets a random name and a random MAC derived from the task
    UUID. If the switch dict carries a ``"cidr"``, each port is also given
    the next address from that network. Commands are issued in batch mode
    and flushed every ``batch`` ports.

    :param lswitch: dict with at least ``"name"`` and optionally ``"cidr"``.
    :param lport_create_args: optional dict; only its ``"batch"`` key is
        read here (defaults to ``lport_amount``).
    :param lport_amount: number of ports to create.
    :returns: list of the objects returned by ``lswitch_port_add``.
    :raises exceptions.InvalidConfigException: if the switch's network
        cannot hold ``lport_amount`` addresses.
    """
    # The former default was a list, which has no .get() and made the
    # default path crash; treat "not given" (or empty) as an empty dict.
    lport_create_args = lport_create_args or {}

    LOG.info("create %d lports on lswitch %s" %
             (lport_amount, lswitch["name"]))

    self.RESOURCE_NAME_FORMAT = "lport_XXXXXX_XXXXXX"

    batch = lport_create_args.get("batch", lport_amount)

    LOG.info("Create lports method: %s" % self.install_method)
    install_method = self.install_method

    network_cidr = lswitch.get("cidr", None)
    ip_addrs = None
    if network_cidr:
        end_ip = network_cidr.ip + lport_amount
        if end_ip not in network_cidr:
            message = _("Network %s's size is not big enough for %d lports.")
            raise exceptions.InvalidConfigException(
                message % (network_cidr, lport_amount))

        ip_addrs = netaddr.iter_iprange(network_cidr.ip, network_cidr.last)

    ovn_nbctl = self.controller_client("ovn-nbctl")
    ovn_nbctl.set_sandbox("controller-sandbox", install_method)
    ovn_nbctl.enable_batch_mode()

    # MAC prefix: first two characters of each UUID group; the lowest bit
    # of the first octet is cleared and the last three octets are zeroed.
    base_mac = [i[:2] for i in self.task["uuid"].split('-')]
    base_mac[0] = str(hex(int(base_mac[0], 16) & 254))
    base_mac[3:] = ['00'] * 3

    flush_count = batch
    lports = []
    for _unused in range(lport_amount):
        name = self.generate_random_name()
        lport = ovn_nbctl.lswitch_port_add(lswitch["name"], name)

        # next() instead of the Python 2-only .next() method.
        ip = str(next(ip_addrs)) if ip_addrs else ""
        mac = utils.get_random_mac(base_mac)

        ovn_nbctl.lport_set_addresses(name, [mac, ip])
        ovn_nbctl.lport_set_port_security(name, mac)

        lports.append(lport)

        flush_count -= 1
        if flush_count < 1:
            ovn_nbctl.flush()
            flush_count = batch

    ovn_nbctl.flush()  # ensure all commands be run
    ovn_nbctl.enable_batch_mode(False)
    return lports
项目:ovn-scale-test    作者:openvswitch    | 项目源码 | 文件源码
def main():
    """Ansible entry point: spread ``num_ip`` addresses over emulation hosts.

    Starting at the address in ``start_cidr``, consecutive addresses are
    assigned to hosts in equal shares of ``num_ip // num_emulation_hosts``;
    any remainder goes to the last host. The result is reported through
    ``module.exit_json`` as:

    * ``ip_index`` / ``ip_index_list`` -- host index and address per IP,
    * ``farm_index`` / ``num_sandbox_farm`` / ``start_cidr_farm`` -- per
      host: its index, how many addresses it owns, and its first address,
    * ``prefixlen`` -- prefix length of ``start_cidr``.
    """
    module = AnsibleModule(
        argument_spec=dict(
            start_cidr=dict(required=True),
            num_emulation_hosts=dict(required=False, default="null"),
            num_ip=dict(required=True)
        ),
        supports_check_mode=True
    )

    start_cidr = module.params['start_cidr']

    sandbox_cidr = netaddr.IPNetwork(start_cidr)
    sandbox_hosts = netaddr.iter_iprange(sandbox_cidr.ip, sandbox_cidr.last)

    ip_data = t_ip_data()

    num_ip = int(module.params['num_ip'])
    num_emulation_hosts = int(module.params['num_emulation_hosts'])

    # Floor division keeps these integers on Python 3 as well; with '/'
    # the indices became floats and range() below raised TypeError.
    chassis_per_host = num_ip // num_emulation_hosts
    overflow = 0
    for i in range(num_ip):
        index = i // chassis_per_host
        if index >= num_emulation_hosts:
            # Remainder addresses are all assigned to the last host.
            index = num_emulation_hosts - 1
            overflow += 1
        ip_data.index.append(index)
        ip_data.ip_list.append(str(next(sandbox_hosts)))

    farm_data = t_farm_data()
    # Walk the same range again to find each host's first address.
    sandbox_hosts = netaddr.iter_iprange(sandbox_cidr.ip, sandbox_cidr.last)
    for i in range(num_emulation_hosts):
        farm_data.farm_index.append(i)

        num_sandbox = chassis_per_host
        if i == num_emulation_hosts - 1:
            num_sandbox = chassis_per_host + overflow
        farm_data.num_sandbox_farm.append(num_sandbox)

        farm_data.start_cidr_farm.append(str(next(sandbox_hosts)))
        # Skip the rest of this host's share so the next host starts
        # at its own first address.
        for _unused in range(num_sandbox - 1):
            next(sandbox_hosts)

    module.exit_json(changed=True, ip_index=ip_data.index,
                     ip_index_list=str(ip_data.ip_list),
                     prefixlen=str(sandbox_cidr.prefixlen),
                     farm_index=farm_data.farm_index,
                     num_sandbox_farm=farm_data.num_sandbox_farm,
                     start_cidr_farm=farm_data.start_cidr_farm)

# import module snippets