Python netaddr 模块,IPRange() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netaddr.IPRange()

项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_slicing():
    """Check plain, stepped and oversized slices of an IPv4 IPRange."""
    span = IPRange('192.0.2.1', '192.0.2.254')

    def hosts(*last_octets):
        # Build the expected IPAddress list from the final octets only.
        return [IPAddress('192.0.2.%d' % octet) for octet in last_octets]

    # Contiguous slice: the first three addresses of the range.
    assert list(span[0:3]) == hosts(1, 2, 3)

    # Stepped slice: every second address among the first ten.
    assert list(span[0:10:2]) == hosts(1, 3, 5, 7, 9)

    # Stop and step both exceed the 254-address range; only the first
    # address is expected back.
    assert list(span[0:1024:512]) == hosts(1)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_pickling_v6():
    """Check that an IPv4-mapped IPv6 IPRange survives a pickle round trip."""
    low, high = '::ffff:192.0.2.1', '::ffff:192.0.2.254'
    first_int, last_int = 281473902969345, 281473902969598

    original = IPRange(low, high)

    # Sanity-check the range before it is serialised.
    assert original == IPRange(low, high)
    assert original.first == first_int
    assert original.last == last_int
    assert original.version == 6

    restored = pickle.loads(pickle.dumps(original))

    # The restored object must carry the same bounds and IP version.
    assert restored == original
    assert restored.first == first_int
    assert restored.last == last_int
    assert restored.version == 6
项目:enos    作者:BeyondTheClouds    | 项目源码 | 文件源码
def pop_ip(env=None):
    """Picks an ip from env['provider_net'].

    Isolated addresses listed under ``extra_ips`` are handed out first.
    Once that list is empty, the last address of the continuous
    [provider_net.start, provider_net.end] range is returned and the
    range is shrunk by one so the address is not picked again.

    The range is indexed directly instead of being expanded into a list,
    so picking an address no longer costs memory proportional to the
    size of the range.

    :param env: mapping with a ``provider_net`` dict holding ``start``,
        ``end`` and optionally ``extra_ips``; it is updated in place.
    :return: the picked ip (an ``extra_ips`` entry as stored, otherwise
        the string form of the last address of the range).
    :raises IndexError: if the continuous range holds a single address,
        because no address is left to become the new ``end``.
    """
    provider_net = env['provider_net']

    # Prefer the isolated ips while there are some left.
    extra_ips = provider_net.get('extra_ips', [])
    if extra_ips:
        ip = extra_ips.pop()
        provider_net['extra_ips'] = extra_ips
        return ip

    ip_range = IPRange(provider_net['start'], provider_net['end'])

    # Hand out the last address of the range ...
    ip = str(ip_range[-1])

    # ... and make the one before it the new upper bound.
    provider_net['end'] = str(ip_range[-2])

    return ip
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _build_excludes(self):
        """Compute the excluded CIDRs and store them in ``self._exclude_cidrs``.

        The exclusion set starts as the whole subnet; each allocation pool
        is carved out of it, and every policy CIDR is added on top.
        """
        self._validate_allocation_pools()

        subnet = netaddr.IPNetwork(self._subnet_cidr)
        ip_version = subnet.version
        first_addr = netaddr.IPAddress(subnet.first, version=ip_version)
        last_addr = netaddr.IPAddress(subnet.last, version=ip_version)
        excluded = netaddr.IPSet(
            netaddr.IPRange(first_addr, last_addr).cidrs())

        pools = self._alloc_pools
        if pools is None:
            # None means fully allocatable: nothing is excluded up front.
            excluded = netaddr.IPSet()
        elif isinstance(pools, list):
            # An empty list removes nothing, leaving the whole subnet
            # excluded (completely unallocatable).
            for pool in pools:
                pool_start = netaddr.IPAddress(pool["start"])
                pool_end = netaddr.IPAddress(pool["end"])
                excluded -= netaddr.IPSet(
                    netaddr.IPRange(pool_start, pool_end).cidrs())

        for policy_cidr in self._policies:
            excluded.add(netaddr.IPNetwork(policy_cidr))

        self._exclude_cidrs = excluded
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_slicing():
    """Check plain, stepped and oversized slices of an IPv4 IPRange."""
    span = IPRange('192.0.2.1', '192.0.2.254')

    def hosts(*last_octets):
        # Build the expected IPAddress list from the final octets only.
        return [IPAddress('192.0.2.%d' % octet) for octet in last_octets]

    # Contiguous slice: the first three addresses of the range.
    assert list(span[0:3]) == hosts(1, 2, 3)

    # Stepped slice: every second address among the first ten.
    assert list(span[0:10:2]) == hosts(1, 3, 5, 7, 9)

    # Stop and step both exceed the 254-address range; only the first
    # address is expected back.
    assert list(span[0:1024:512]) == hosts(1)
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_pickling_v6():
    """Check that an IPv4-mapped IPv6 IPRange survives a pickle round trip."""
    low, high = '::ffff:192.0.2.1', '::ffff:192.0.2.254'
    first_int, last_int = 281473902969345, 281473902969598

    original = IPRange(low, high)

    # Sanity-check the range before it is serialised.
    assert original == IPRange(low, high)
    assert original.first == first_int
    assert original.last == last_int
    assert original.version == 6

    restored = pickle.loads(pickle.dumps(original))

    # The restored object must carry the same bounds and IP version.
    assert restored == original
    assert restored.first == first_int
    assert restored.last == last_int
    assert restored.version == 6
项目:px    作者:genotrance    | 项目源码 | 文件源码
def parsenoproxy(noproxy):
    """Parse a comma-separated no-proxy specification into NOPROXY.

    Each non-empty entry is interpreted as an ``a-b`` address range, a
    glob containing ``*``, or otherwise a network/address, and is added
    to the module-level ``NOPROXY`` collection.

    :param noproxy: comma-separated string of entries; surrounding
        whitespace and empty entries are ignored.
    :return: None. On an entry that cannot be parsed, a message is
        printed and the process exits via ``sys.exit()``.
    """
    global NOPROXY

    for nop in (entry.strip() for entry in noproxy.split(",")):
        if not nop:
            continue

        try:
            if "-" in nop:
                spl = nop.split("-", 1)
                ipns = netaddr.IPRange(spl[0], spl[1])
            elif "*" in nop:
                ipns = netaddr.IPGlob(nop)
            else:
                ipns = netaddr.IPNetwork(nop)
            NOPROXY.add(ipns)
        except Exception:
            # Was a bare ``except:``, which also trapped KeyboardInterrupt
            # and SystemExit and reported them as a bad definition.
            print("Bad noproxy IP definition")
            sys.exit()
项目:pat    作者:GusKhawaja    | 项目源码 | 文件源码
def _get_ip_addresses(self,input_data):
        ip_addresses = []

        if "-" in input_data:
            input_data_splitted = input_data.split('-')
            first_ip_address = input_data_splitted[0]
            first_ip_address_splitted = first_ip_address.split('.')
            second_ip_address = '%s.%s.%s.%s'%(first_ip_address_splitted[0],first_ip_address_splitted[1],first_ip_address_splitted[2],input_data_splitted[1])
            ip_addresses = IPRange(first_ip_address,second_ip_address)

        elif "," in input_data:
            ip_addresses = input_data.split(',')

        else:
            ip_addresses = IPNetwork(input_data)

        return ip_addresses

    # Description: Process the terminal arguments
    # Return: (void)
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_iprange_slicing():
    """Check plain, stepped and oversized slices of an IPv4 IPRange."""
    span = IPRange('192.0.2.1', '192.0.2.254')

    def hosts(*last_octets):
        # Build the expected IPAddress list from the final octets only.
        return [IPAddress('192.0.2.%d' % octet) for octet in last_octets]

    # Contiguous slice: the first three addresses of the range.
    assert list(span[0:3]) == hosts(1, 2, 3)

    # Stepped slice: every second address among the first ten.
    assert list(span[0:10:2]) == hosts(1, 3, 5, 7, 9)

    # Stop and step both exceed the 254-address range; only the first
    # address is expected back.
    assert list(span[0:1024:512]) == hosts(1)
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_iprange_pickling_v6():
    """Check that an IPv4-mapped IPv6 IPRange survives a pickle round trip."""
    low, high = '::ffff:192.0.2.1', '::ffff:192.0.2.254'
    first_int, last_int = 281473902969345, 281473902969598

    original = IPRange(low, high)

    # Sanity-check the range before it is serialised.
    assert original == IPRange(low, high)
    assert original.first == first_int
    assert original.last == last_int
    assert original.version == 6

    restored = pickle.loads(pickle.dumps(original))

    # The restored object must carry the same bounds and IP version.
    assert restored == original
    assert restored.first == first_int
    assert restored.last == last_int
    assert restored.version == 6
项目:ws-backend-community    作者:lavalamp-    | 项目源码 | 文件源码
def __init__(self, blacklist_string):
        """
        Initialize a IPBlacklistEntry object based on the blacklist_string
        argument.

        The string is split at its first comma: the part before it is the
        IP specification (either ``start-end`` or anything IPNetwork
        accepts) and the part after it is the range name. When there is no
        comma, the whole string is the IP specification and the range name
        is empty.

        :param blacklist_string: The string to build an IPBlacklistEntry object
         from.
        :return: None
        """
        # partition() is used instead of find()-based slicing: find()
        # returns -1 when the comma is missing, which silently chopped the
        # last character off the IP specification.
        ip_part, _, name_part = blacklist_string.partition(",")
        self._ip_string = ip_part.strip()
        self._range_name = name_part.strip()
        if "-" in self._ip_string:
            range_start, _, range_end = self._ip_string.partition("-")
            self._ip_range = IPRange(range_start.strip(), range_end.strip())
        else:
            self._ip_range = IPNetwork(self._ip_string)

    # Static Methods

    # Class Methods

    # Public Methods
项目:cthulhu    作者:sholsapp    | 项目源码 | 文件源码
def main(instances, docker_bridge, docker_container, fixture_root, fixture_name):
    """Create and render a fixture with one instance context per address.

    The IPv4 address of ``docker_bridge`` is looked up and the
    ``instances`` addresses following it form the network handed to every
    instance context.

    :param instances: number of instances (and addresses) to create.
    :param docker_bridge: name of the network interface to read the base
        address from.
    :param docker_container: passed through to each InstanceContext.
    :param fixture_root: directory under which fixtures live.
    :param fixture_name: name of the fixture; must not exist yet.

    Exits with status 1 when the interface is unknown, has no IPv4
    address, or the fixture already exists.
    """
    try:
        addrs = netifaces.ifaddresses(docker_bridge)
    except ValueError:
        click.secho('It appears {0} is not a valid newtork interface on this system.'.format(
            docker_bridge), fg='red')
        sys.exit(1)

    try:
        docker_bridge_addr = IPAddress(addrs[netifaces.AF_INET][0]['addr'])
    except (KeyError, IndexError):
        # An interface without any IPv4 address has no AF_INET entry at
        # all, so the lookup fails with KeyError rather than IndexError.
        click.secho('It appears {0} does not have an address at this time.'.format(docker_bridge), fg='red')
        sys.exit(1)

    # One address per instance, starting right after the bridge address.
    network = list(IPRange(docker_bridge_addr + 1,
                           docker_bridge_addr + 1 + instances - 1))

    if os.path.exists(os.path.join(fixture_root, fixture_name)):
        click.secho('[ERROR] ', fg='red', nl=False)
        click.secho('A fixture named {0} already exists.'.format(fixture_name))
        sys.exit(1)

    fixture_ctx = FixtureContext(fixture_root, fixture_name)

    for instance in range(len(network)):
        # TODO(sholsapp): We might want to specify different containers for
        # different instances one day.
        instance_ctx = InstanceContext(
            fixture_ctx.fixture_root, instance, network, docker_container)
        click.secho('Creating instance {0} at {1}... '.format(
            instance_ctx.instance, instance_ctx.node_root), nl=False)
        fixture_ctx.instances.append(instance_ctx)
        click.secho('[GOOD]', fg='green')
    fixture_ctx.render()
项目:mitmfnz    作者:dropnz    | 项目源码 | 文件源码
def get_range(self, targets):
        """Expand a comma-separated target string into a list of IPAddress.

        Each entry may be a CIDR network (contains ``/``), a range
        (``start-end``, where ``end`` is either a full address or just the
        replacement for the last word of ``start``), or a single address.
        Returns None when ``targets`` is None; exits the process on an
        AddrFormatError.
        """
        if targets is None:
            return None

        try:
            resolved = []
            for spec in targets.split(','):

                if '/' in spec:
                    resolved.extend(IPNetwork(spec))

                elif '-' in spec:
                    pieces = spec.split('-')
                    first = IPAddress(pieces[0])
                    try:
                        last = IPAddress(pieces[1])
                        span = IPRange(first, last)
                    except AddrFormatError:
                        # Short form: substitute the last word of the
                        # start address with the text after the dash.
                        words = list(first.words)
                        words[-1] = pieces[1]
                        last = IPAddress('.'.join(map(str, words)))
                        span = IPRange(first, last)

                    resolved.extend(span)

                else:
                    resolved.append(IPAddress(spec))

            return resolved

        except AddrFormatError:
            sys.exit("Specified an invalid IP address/range/network as target")
项目:orangengine    作者:lampwins    | 项目源码 | 文件源码
def _in_network(value, p_value, exact_match=False):
        """

        """

        # 'any' address is an automatic match if we are exact
        if exact_match and 'any' in p_value:
            return True

        addresses = [IPRange(a.split('-')[0], a.split('-')[1]) if '-' in a else IPNetwork(a)
                     for a in value if is_ipv4(a)]
        fqdns = [a for a in value if not is_ipv4(a)]
        p_addresses = [IPRange(a.split('-')[0], a.split('-')[1]) if '-' in a else IPNetwork(a)
                       for a in p_value if is_ipv4(a)]
        p_fqdns = [a for a in p_value if not is_ipv4(a)]

        # network containment implies exact match... i think?
        for a in addresses:
            addr_result = any(a == b or a in b for b in p_addresses)
            if not addr_result:
                return False

        # now match the fqdns
        if exact_match:
            fqdn_result = set(fqdns) == set(p_fqdns)
        else:
            fqdn_result = set(p_fqdns).issubset(set(fqdns))

        return fqdn_result
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_basic_api():
    """Check that IPSets built from a range and from mixed members agree."""
    from_range = IPSet(IPRange('192.0.2.1', '192.0.2.15').cidrs())

    # The same addresses spelled as a mix of strings, IPAddress and
    # IPNetwork objects.
    mixed_members = [
        IPAddress('192.0.2.1'),
        '192.0.2.2/31',
        IPNetwork('192.0.2.4/31'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        '192.0.2.8',
        '192.0.2.9',
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPNetwork('192.0.2.12/30'),
    ]
    from_members = IPSet(mixed_members)

    compacted = IPSet([
        '192.0.2.1/32',
        '192.0.2.2/31',
        '192.0.2.4/30',
        '192.0.2.8/29',
    ])
    assert from_members == compacted

    assert from_range == from_members
    # Popping a member from one set makes the two sets differ.
    assert from_members.pop() in from_range
    assert from_range != from_members
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_constructor():
    """Check the argument forms accepted by the IPSet constructor."""
    single_host = ['192.0.2.0/32']

    # A single host given as a string, IPAddress, IPNetwork or IPSet.
    assert IPSet(['192.0.2.0']) == IPSet(single_host)
    assert IPSet([IPAddress('192.0.2.0')]) == IPSet(single_host)
    assert IPSet([IPNetwork('192.0.2.0')]) == IPSet(single_host)
    # A bare IPNetwork (not wrapped in a list).
    assert IPSet(IPNetwork('1234::/32')) == IPSet(['1234::/32'])
    assert IPSet([IPNetwork('192.0.2.0/24')]) == IPSet(['192.0.2.0/24'])
    assert IPSet(IPSet(single_host)) == IPSet(single_host)

    # IPRange arguments are expected to equal their CIDR decomposition.
    uneven_range = IPRange("10.0.0.0", "10.0.1.31")
    assert IPSet(uneven_range) == IPSet(['10.0.0.0/24', '10.0.1.0/27'])
    whole_v4 = IPRange('0.0.0.0', '255.255.255.255')
    assert IPSet(whole_v4) == IPSet(['0.0.0.0/0'])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_member_insertion_and_deletion():
    """Check add() and remove() on an IPSet with addresses and ranges."""
    members = IPSet()

    # Single address in, single address out.
    members.add('192.0.2.0')
    assert members == IPSet(['192.0.2.0/32'])
    members.remove('192.0.2.0')
    assert members == IPSet([])

    # A whole /24 added as a range.
    members.add(IPRange("10.0.0.0", "10.0.0.255"))
    assert members == IPSet(['10.0.0.0/24'])

    # Removing a range that overlaps only the upper half leaves the
    # lower /25.
    members.remove(IPRange("10.0.0.128", "10.10.10.10"))
    assert members == IPSet(['10.0.0.0/25'])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_membership():
    """Check per-address membership of a range against a /28 IPSet."""
    span = IPRange('192.0.1.255', '192.0.2.16')

    assert span.cidrs() == [
        IPNetwork('192.0.1.255/32'),
        IPNetwork('192.0.2.0/28'),
        IPNetwork('192.0.2.16/32'),
    ]

    block = IPSet(['192.0.2.0/28'])

    # Only 192.0.2.0 - 192.0.2.15 belong to the /28; the addresses just
    # before and just after it do not.
    expected = [('192.0.1.255', False)]
    expected += [('192.0.2.%d' % host, True) for host in range(16)]
    expected.append(('192.0.2.16', False))

    observed = [(str(ip), ip in block) for ip in span]
    assert observed == expected
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_clear():
    """Check that update() merges a range and clear() empties the set."""
    networks = IPSet(['10.0.0.0/16'])

    # Adding the adjacent /16 as a range is expected to yield one /15.
    adjacent = IPRange('10.1.0.0', '10.1.255.255')
    networks.update(adjacent)
    assert networks == IPSet(['10.0.0.0/15'])

    networks.clear()
    assert networks == IPSet([])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_converting_ipsets_to_ipranges():
    """Check iter_ipranges() for empty, single and adjacent addresses."""
    assert list(IPSet().iter_ipranges()) == []

    lone = IPSet([IPAddress('10.0.0.1')])
    assert list(lone.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]

    # Two adjacent addresses are expected to come back as one range.
    neighbours = IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')])
    assert list(neighbours.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    """Check len() on IPSets at and just above _sys_maxint members."""
    zero = IPAddress("::0")

    # ::0 through _sys_maxint inclusive is one address too many for len().
    too_big = IPSet(IPRange(zero, IPAddress(_sys_maxint, 6)))
    with pytest.raises(IndexError):
        len(too_big)

    # One address fewer: exactly _sys_maxint members.
    largest_ok = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6)))
    assert len(largest_ok) == _sys_maxint
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_ipv4_and_ipv4_separation():
    """Check that equal integer values in IPv4 and IPv6 stay separate ranges."""
    mixed = IPSet([IPAddress(1, 4), IPAddress(1, 6)])
    expected = [
        IPRange('0.0.0.1', '0.0.0.1'),
        IPRange('::1', '::1'),
    ]
    assert list(mixed.iter_ipranges()) == expected
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_boolean_evaluation():
    """Check that both the full IPv4 range and a one-address range are truthy."""
    for start, end in (('0.0.0.0', '255.255.255.255'),
                       ('0.0.0.0', '0.0.0.0')):
        assert bool(IPRange(start, end))
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_indexing():
    """Check length, integer bounds and integer indexing of an IPRange."""
    span = IPRange('192.0.2.1', '192.0.2.254')

    assert len(span) == 254
    assert span.first == 3221225985
    assert span.last == 3221226238

    # Positive and negative indices address both ends of the range.
    head, tail = span[0], span[-1]
    assert head == IPAddress('192.0.2.1')
    assert tail == IPAddress('192.0.2.254')

    # An index past the end must raise rather than wrap around.
    with pytest.raises(IndexError):
        span[512]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_ipv6_unsupported_slicing():
    """Check that slicing an IPv6 IPRange raises TypeError."""
    with pytest.raises(TypeError):
        v6_span = IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254')
        v6_span[0:10:2]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_membership():
    """Check range-in-range containment for IPv4, IPv6 and mixed versions."""
    small_v4 = IPRange('192.0.2.5', '192.0.2.10')
    assert small_v4 in IPRange('192.0.2.1', '192.0.2.254')

    small_v6 = IPRange('fe80::1', 'fe80::fffe')
    assert small_v6 in IPRange('fe80::', 'fe80::ffff:ffff:ffff:ffff')

    # An IPv4 range is not a member of an IPv6 range.
    v6_span = IPRange('::', '::255.255.255.255')
    assert IPRange('192.0.2.5', '192.0.2.10') not in v6_span
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_more_iprange_sorting():
    """Check the sort order of overlapping and nested IPRanges."""
    unordered = (
        IPRange('192.0.2.40', '192.0.2.50'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.1', '192.0.2.254'),
    )

    expected = [
        IPRange('192.0.2.1', '192.0.2.254'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.40', '192.0.2.50'),
    ]
    assert sorted(unordered) == expected

    # A range nested inside 192.0.2.40-50 is expected to sort after it.
    with_nested = list(unordered) + [IPRange('192.0.2.45', '192.0.2.49')]
    expected_with_nested = expected + [IPRange('192.0.2.45', '192.0.2.49')]

    assert sorted(with_nested) == expected_with_nested
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_info_and_properties():
    """Check the registry info, reserved flag and version of an IPRange."""
    span = IPRange('192.0.2.1', '192.0.2.254')

    expected_info = {
        'IPv4': [{
            'date': '1993-05',
            'designation': 'Administered by ARIN',
            'prefix': '192/8',
            'status': 'Legacy',
            'whois': 'whois.arin.net'}]
    }
    # The info object is compared through its string form, evaluated
    # back into plain Python data.
    parsed_info = eval(str(span.info))
    assert parsed_info == expected_info

    assert span.is_reserved()

    assert span.version == 4
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_invalid_len_and_alternative():
    """Check len() on IPRanges at and just above _sys_maxint addresses."""
    zero = IPAddress("::0")

    # ::0 through _sys_maxint inclusive is one address too many for len().
    too_big = IPRange(zero, IPAddress(_sys_maxint, 6))
    with pytest.raises(IndexError):
        len(too_big)

    # One address fewer: exactly _sys_maxint addresses.
    largest_ok = IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6))
    assert len(largest_ok) == _sys_maxint
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_pickling_v4():
    """Check that an IPv4 IPRange survives a pickle round trip as a new object."""
    low, high = '192.0.2.1', '192.0.2.254'
    first_int, last_int = 3221225985, 3221226238

    original = IPRange(low, high)
    assert original == IPRange(low, high)
    assert original.first == first_int
    assert original.last == last_int
    assert original.version == 4

    restored = pickle.loads(pickle.dumps(original))

    # Equal in value, but a distinct object.
    assert restored == original
    assert restored is not original
    assert restored.first == first_int
    assert restored.last == last_int
    assert restored.version == 4
项目:patrole    作者:openstack    | 项目源码 | 文件源码
def resource_setup(cls):
        """Create the network, subnet and port shared by the tests.

        Besides the created resources, the first allocation pool of the
        subnet is stored as ``cls.ip_range`` and the port's first fixed IP
        and MAC address as ``cls.port_ip_address`` and
        ``cls.port_mac_address``.
        """
        super(PortsRbacTest, cls).resource_setup()

        # Network and subnet, the latter carved from the configured
        # project CIDR.
        cls.network = cls.create_network()
        cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
        cls.subnet = cls.create_subnet(cls.network, cidr=cls.cidr,
                                       mask_bits=24)
        first_pool = cls.subnet['allocation_pools'][0]
        cls.ip_range = netaddr.IPRange(first_pool['start'], first_pool['end'])

        # Port on that network, plus its address details.
        cls.port = cls.create_port(cls.network)
        first_fixed_ip = cls.port['fixed_ips'][0]
        cls.port_ip_address = first_fixed_ip['ip_address']
        cls.port_mac_address = cls.port['mac_address']
项目:patrole    作者:openstack    | 项目源码 | 文件源码
def resource_setup(cls):
        """Create the external network, subnet and router shared by the tests.

        The first allocation pool of the subnet is stored as
        ``cls.ip_range``.
        """
        super(RouterRbacTest, cls).resource_setup()

        # The network is created with an external gateway so that
        # ``external_gateway_info`` policies can be validated.
        cls.network = cls.create_network(**{'router:external': True})
        cls.subnet = cls.create_subnet(cls.network)

        first_pool = cls.subnet['allocation_pools'][0]
        cls.ip_range = netaddr.IPRange(first_pool['start'], first_pool['end'])

        cls.router = cls.create_router()
项目:piSociEty    作者:paranoidninja    | 项目源码 | 文件源码
def get_range(self, targets):
        """Expand a comma-separated target string into a list of IPAddress.

        Each entry may be a CIDR network (contains ``/``), a range
        (``start-end``, where ``end`` is either a full address or just the
        replacement for the last word of ``start``), or a single address.
        Returns None when ``targets`` is None; exits the process on an
        AddrFormatError.
        """
        if targets is None:
            return None

        try:
            resolved = []
            for spec in targets.split(','):

                if '/' in spec:
                    resolved.extend(IPNetwork(spec))

                elif '-' in spec:
                    pieces = spec.split('-')
                    first = IPAddress(pieces[0])
                    try:
                        last = IPAddress(pieces[1])
                        span = IPRange(first, last)
                    except AddrFormatError:
                        # Short form: substitute the last word of the
                        # start address with the text after the dash.
                        words = list(first.words)
                        words[-1] = pieces[1]
                        last = IPAddress('.'.join(map(str, words)))
                        span = IPRange(first, last)

                    resolved.extend(span)

                else:
                    resolved.append(IPAddress(spec))

            return resolved

        except AddrFormatError:
            sys.exit("Specified an invalid IP address/range/network as target")
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _pool_is_growing(original_pool, new_pool):
    """Return True when ``new_pool`` covers addresses ``original_pool`` lacks.

    Both arguments expose ``_alloc_pools``, an iterable of mappings with
    ``start`` and ``end`` keys.
    """
    def as_ipset(pool):
        # Union of every start/end range in the pool's allocation pools.
        covered = netaddr.IPSet()
        for bounds in pool._alloc_pools:
            covered.add(netaddr.IPRange(bounds['start'], bounds['end']))
        return covered

    before = as_ipset(original_pool)
    after = as_ipset(new_pool)

    # The pool grows unless the original set already contains all of the
    # new one.
    return not before.issuperset(after)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_update_allocation_pools(self):
        """Update a subnet through several pool layouts and check each one.

        After every update the returned pools must equal the requested
        ones and differ from the previous ones, exactly one IP policy must
        exist, and no address of the new pools may be in its exclusions.
        """
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = {"network": dict(name="public", tenant_id="fake",
                                   network_plugin="BASE")}
        subnet = {"subnet": dict(id=1, ip_version=4, next_auto_assign_ip=2,
                                 cidr=cidr, first_ip=ip_network.first,
                                 last_ip=ip_network.last, ip_policy=None,
                                 tenant_id="fake")}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            # The sequence ends by restoring the initial pools.
            pool_sequence = [
                [dict(start='192.168.1.10', end='192.168.1.50')],
                [dict(start='192.168.1.5', end='192.168.1.25')],
                [dict(start='192.168.1.50', end='192.168.1.51')],
                [dict(start='192.168.1.50', end='192.168.1.51'),
                    dict(start='192.168.1.100', end='192.168.1.250')],
                [dict(start='192.168.1.50', end='192.168.1.51')],
                start_pools,
            ]
            previous = start_pools
            for requested in pool_sequence:
                update_body = {"subnet": dict(allocation_pools=requested)}
                subnet = subnet_api.update_subnet(self.context, 1,
                                                  update_body)
                self.assertNotEqual(previous, subnet['allocation_pools'])
                self.assertEqual(requested, subnet['allocation_pools'])

                policies = policy_api.get_ip_policies(self.context)
                self.assertEqual(1, len(policies))

                excluded = netaddr.IPSet()
                for cidr_text in policies[0]['exclude']:
                    excluded.add(netaddr.IPNetwork(cidr_text))

                for extent in requested:
                    extent_range = netaddr.IPRange(extent['start'],
                                                   extent['end'])
                    for ip in extent_range:
                        self.assertFalse(ip in excluded)
                previous = requested
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_allow_allocation_pool_growth(self):
        """With growth allowed, enlarging the allocation pool succeeds.

        The pool is widened from .15-.30 to .10-.50; the update must be
        applied, the policy exclusions must not cover the new pool, and
        the resulting pools must cover more than the initial ones.
        """
        CONF.set_override('allow_allocation_pool_growth', True, 'QUARK')

        def pools_to_ipset(pools):
            # Union of every start/end range in a list of pools.
            covered = netaddr.IPSet()
            for bounds in pools:
                covered.add(netaddr.IPRange(bounds['start'], bounds['end']))
            return covered

        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = {"network": dict(name="public", tenant_id="fake",
                                   network_plugin="BASE")}
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = {"subnet": dict(id=1, ip_version=4, next_auto_assign_ip=2,
                                 cidr=cidr, first_ip=ip_network.first,
                                 last_ip=ip_network.last, ip_policy=None,
                                 allocation_pools=pool, tenant_id="fake")}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            update_body = {"subnet": dict(allocation_pools=new_pool)}
            subnet = subnet_api.update_subnet(self.context, 1,
                                              update_body)
            self.assertNotEqual(start_pools, subnet['allocation_pools'])
            self.assertEqual(new_pool, subnet['allocation_pools'])

            policies = policy_api.get_ip_policies(self.context)
            self.assertEqual(1, len(policies))

            excluded = netaddr.IPSet()
            for cidr_text in policies[0]['exclude']:
                excluded.add(netaddr.IPNetwork(cidr_text))
            for extent in new_pool:
                extent_range = netaddr.IPRange(extent['start'], extent['end'])
                for ip in extent_range:
                    self.assertFalse(ip in excluded)

            start_ip_set = pools_to_ipset(start_pools)
            new_ip_set = pools_to_ipset(subnet['allocation_pools'])

            # The union differing from the start set means addresses
            # were gained.
            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_do_not_allow_allocation_pool_growth(self):
        """With growth disallowed, enlarging the allocation pool is rejected.

        The test first confirms that the requested pool really covers more
        addresses than the initial one, then expects the update to raise
        ``n_exc.BadRequest``.
        """
        CONF.set_override('allow_allocation_pool_growth', False, 'QUARK')

        def pools_to_ipset(pools):
            # Union of every start/end range in a list of pools.
            covered = netaddr.IPSet()
            for bounds in pools:
                covered.add(netaddr.IPRange(bounds['start'], bounds['end']))
            return covered

        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = {"network": dict(name="public", tenant_id="fake",
                                   network_plugin="BASE")}
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = {"subnet": dict(id=1, ip_version=4, next_auto_assign_ip=2,
                                 cidr=cidr, first_ip=ip_network.first,
                                 last_ip=ip_network.last, ip_policy=None,
                                 allocation_pools=pool, tenant_id="fake")}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            start_ip_set = pools_to_ipset(start_pools)
            new_ip_set = pools_to_ipset(new_pool)

            # The union differing from the start set means the request
            # would add addresses.
            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)

            update_body = {"subnet": dict(allocation_pools=new_pool)}
            with self.assertRaises(n_exc.BadRequest):
                subnet_api.update_subnet(self.context, 1, update_body)
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_basic_api():
    """Check that IPSets built from a range and from mixed members agree."""
    from_range = IPSet(IPRange('192.0.2.1', '192.0.2.15').cidrs())

    # The same addresses spelled as a mix of strings, IPAddress and
    # IPNetwork objects.
    mixed_members = [
        IPAddress('192.0.2.1'),
        '192.0.2.2/31',
        IPNetwork('192.0.2.4/31'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        '192.0.2.8',
        '192.0.2.9',
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPNetwork('192.0.2.12/30'),
    ]
    from_members = IPSet(mixed_members)

    compacted = IPSet([
        '192.0.2.1/32',
        '192.0.2.2/31',
        '192.0.2.4/30',
        '192.0.2.8/29',
    ])
    assert from_members == compacted

    assert from_range == from_members
    # Popping a member from one set makes the two sets differ.
    assert from_members.pop() in from_range
    assert from_range != from_members
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_constructor():
    """Check the argument forms accepted by the IPSet constructor."""
    single_host = ['192.0.2.0/32']

    # A single host given as a string, IPAddress, IPNetwork or IPSet.
    assert IPSet(['192.0.2.0']) == IPSet(single_host)
    assert IPSet([IPAddress('192.0.2.0')]) == IPSet(single_host)
    assert IPSet([IPNetwork('192.0.2.0')]) == IPSet(single_host)
    # A bare IPNetwork (not wrapped in a list).
    assert IPSet(IPNetwork('1234::/32')) == IPSet(['1234::/32'])
    assert IPSet([IPNetwork('192.0.2.0/24')]) == IPSet(['192.0.2.0/24'])
    assert IPSet(IPSet(single_host)) == IPSet(single_host)

    # IPRange arguments are expected to equal their CIDR decomposition.
    uneven_range = IPRange("10.0.0.0", "10.0.1.31")
    assert IPSet(uneven_range) == IPSet(['10.0.0.0/24', '10.0.1.0/27'])
    whole_v4 = IPRange('0.0.0.0', '255.255.255.255')
    assert IPSet(whole_v4) == IPSet(['0.0.0.0/0'])
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_member_insertion_and_deletion():
    """Check add() and remove() on an IPSet with addresses and ranges."""
    members = IPSet()

    # Single address in, single address out.
    members.add('192.0.2.0')
    assert members == IPSet(['192.0.2.0/32'])
    members.remove('192.0.2.0')
    assert members == IPSet([])

    # A whole /24 added as a range.
    members.add(IPRange("10.0.0.0", "10.0.0.255"))
    assert members == IPSet(['10.0.0.0/24'])

    # Removing a range that overlaps only the upper half leaves the
    # lower /25.
    members.remove(IPRange("10.0.0.128", "10.10.10.10"))
    assert members == IPSet(['10.0.0.0/25'])
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_membership():
    """Check per-address membership of a range against a /28 IPSet."""
    span = IPRange('192.0.1.255', '192.0.2.16')

    assert span.cidrs() == [
        IPNetwork('192.0.1.255/32'),
        IPNetwork('192.0.2.0/28'),
        IPNetwork('192.0.2.16/32'),
    ]

    block = IPSet(['192.0.2.0/28'])

    # Only 192.0.2.0 - 192.0.2.15 belong to the /28; the addresses just
    # before and just after it do not.
    expected = [('192.0.1.255', False)]
    expected += [('192.0.2.%d' % host, True) for host in range(16)]
    expected.append(('192.0.2.16', False))

    observed = [(str(ip), ip in block) for ip in span]
    assert observed == expected
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_clear():
    """Check that update() merges a range and clear() empties the set."""
    networks = IPSet(['10.0.0.0/16'])

    # Adding the adjacent /16 as a range is expected to yield one /15.
    adjacent = IPRange('10.1.0.0', '10.1.255.255')
    networks.update(adjacent)
    assert networks == IPSet(['10.0.0.0/15'])

    networks.clear()
    assert networks == IPSet([])
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_converting_ipsets_to_ipranges():
    """Check iter_ipranges() for empty, single and adjacent addresses."""
    assert list(IPSet().iter_ipranges()) == []

    lone = IPSet([IPAddress('10.0.0.1')])
    assert list(lone.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]

    # Two adjacent addresses are expected to come back as one range.
    neighbours = IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')])
    assert list(neighbours.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    """Check len() on IPSets at and just above _sys_maxint members."""
    zero = IPAddress("::0")

    # ::0 through _sys_maxint inclusive is one address too many for len().
    too_big = IPSet(IPRange(zero, IPAddress(_sys_maxint, 6)))
    with pytest.raises(IndexError):
        len(too_big)

    # One address fewer: exactly _sys_maxint members.
    largest_ok = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6)))
    assert len(largest_ok) == _sys_maxint
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_ipv4_and_ipv4_separation():
    """Check that equal integer values in IPv4 and IPv6 stay separate ranges."""
    mixed = IPSet([IPAddress(1, 4), IPAddress(1, 6)])
    expected = [
        IPRange('0.0.0.1', '0.0.0.1'),
        IPRange('::1', '::1'),
    ]
    assert list(mixed.iter_ipranges()) == expected
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_boolean_evaluation():
    """Check that both the full IPv4 range and a one-address range are truthy."""
    for start, end in (('0.0.0.0', '255.255.255.255'),
                       ('0.0.0.0', '0.0.0.0')):
        assert bool(IPRange(start, end))
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_indexing():
    """Check length, integer bounds and integer indexing of an IPRange."""
    span = IPRange('192.0.2.1', '192.0.2.254')

    assert len(span) == 254
    assert span.first == 3221225985
    assert span.last == 3221226238

    # Positive and negative indices address both ends of the range.
    head, tail = span[0], span[-1]
    assert head == IPAddress('192.0.2.1')
    assert tail == IPAddress('192.0.2.254')

    # An index past the end must raise rather than wrap around.
    with pytest.raises(IndexError):
        span[512]
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_ipv6_unsupported_slicing():
    """Check that slicing an IPv6 IPRange raises TypeError."""
    with pytest.raises(TypeError):
        v6_span = IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254')
        v6_span[0:10:2]
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_iprange_membership():
    """Check range-in-range containment for IPv4, IPv6 and mixed versions."""
    small_v4 = IPRange('192.0.2.5', '192.0.2.10')
    assert small_v4 in IPRange('192.0.2.1', '192.0.2.254')

    small_v6 = IPRange('fe80::1', 'fe80::fffe')
    assert small_v6 in IPRange('fe80::', 'fe80::ffff:ffff:ffff:ffff')

    # An IPv4 range is not a member of an IPv6 range.
    v6_span = IPRange('::', '::255.255.255.255')
    assert IPRange('192.0.2.5', '192.0.2.10') not in v6_span
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_more_iprange_sorting():
    """Check the sort order of overlapping and nested IPRanges."""
    unordered = (
        IPRange('192.0.2.40', '192.0.2.50'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.1', '192.0.2.254'),
    )

    expected = [
        IPRange('192.0.2.1', '192.0.2.254'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.40', '192.0.2.50'),
    ]
    assert sorted(unordered) == expected

    # A range nested inside 192.0.2.40-50 is expected to sort after it.
    with_nested = list(unordered) + [IPRange('192.0.2.45', '192.0.2.49')]
    expected_with_nested = expected + [IPRange('192.0.2.45', '192.0.2.49')]

    assert sorted(with_nested) == expected_with_nested