Python netaddr 模块,IPAddress() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 netaddr.IPAddress()。

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def is_address_in_network(network, address):
    """
    Report whether an IP address belongs to a CIDR network.

    :param network (str): Network in CIDR presentation format, e.g.
        '192.168.1.0/24'.
    :param address: A single IPv4 or IPv6 address with no netmask or
        subnet prefix, e.g. '192.168.1.1'.
    :returns boolean: True when address lies inside network.
    :raises ValueError: If either argument cannot be parsed by netaddr.
    """
    try:
        parsed_network = netaddr.IPNetwork(network)
    except (netaddr.core.AddrFormatError, ValueError):
        message = "Network (%s) is not in CIDR presentation format" % network
        raise ValueError(message)

    try:
        parsed_address = netaddr.IPAddress(address)
    except (netaddr.core.AddrFormatError, ValueError):
        message = ("Address (%s) is not in correct presentation format" %
                   address)
        raise ValueError(message)

    # The membership operator already yields a bool.
    return parsed_address in parsed_network
项目:craton    作者:openstack    | 项目源码 | 文件源码
def make_very_small_cloud(self, with_cell=False):
        """Build a project, cloud and region holding a single host.

        A cell is created and passed to the host only when ``with_cell``
        is true; otherwise the returned cell id is None.

        Returns the tuple
        ``(project_id, cloud_id, region_id, cell_id, host_id)``.
        """
        project_id = self.make_project(
            'project_1', foo='P1', zoo='P2', boo='P3')
        cloud_id = self.make_cloud(project_id, 'cloud_1', zoo='CL1')
        region_id = self.make_region(
            project_id, cloud_id, 'region_1', foo='R1', bar='R2', bax='R3')

        cell_id = None
        if with_cell:
            cell_id = self.make_cell(
                project_id, cloud_id, region_id, 'cell_1', bar='C2')

        host_address = IPAddress(u'10.1.2.101')
        host_id = self.make_host(
            project_id, cloud_id, region_id, 'www1.example.com',
            host_address, 'server',
            cell_id=cell_id, foo='H1', baz='H3')
        return project_id, cloud_id, region_id, cell_id, host_id
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_host_labels_delete(self):
        """Deleting one label leaves the host's other labels in place."""
        project = self.mock_project_id
        cloud_id = self.make_cloud(project, 'cloud_1')
        region_id = self.make_region(project, cloud_id, 'region_1', foo='R1')
        host_id = self.make_host(
            project, cloud_id, region_id, 'www.example.xyz',
            IPAddress(u'10.1.2.101'), 'server', bar='bar2')

        # Seed the host with three labels and confirm they were stored.
        initial = {"labels": ["tom", "jerry", "jones"]}
        dbapi.hosts_labels_update(self.context, host_id, initial)
        stored = dbapi.hosts_get_by_id(self.context, host_id)
        self.assertEqual(sorted(stored.labels), sorted(initial["labels"]))

        # Remove a single label; the remaining two must survive.
        to_remove = {"labels": ["tom"]}
        dbapi.hosts_labels_delete(self.context, host_id, to_remove)
        stored = dbapi.hosts_get_by_id(self.context, host_id)
        self.assertEqual(stored.labels, {"jerry", "jones"})
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_get_all_with_filters_noexist(self):
        """Filtering on a variable value that no host has returns no hosts."""
        project_id = self.make_project('project_1', foo='P1', zoo='P2')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1',
                                     foo='R1')
        host_id = self.make_host(project_id, cloud_id, region_id,
                                 'www.example.xyz',
                                 IPAddress(u'10.1.2.101'),
                                 'server')
        variables = {"key1": "value1", "key2": "value2"}
        dbapi.variables_update_by_resource_id(
            self.context, "hosts", host_id, variables
        )
        filters = {
            # Filter on the region created above instead of a literal id, so
            # an empty result can only be caused by the "vars" mismatch
            # (the host stores key1=value1, the filter asks for value5).
            "region_id": region_id,
            "vars": "key1:value5",
        }
        res, _ = dbapi.hosts_get_all(self.context, filters,
                                     default_pagination)
        self.assertEqual(len(res), 0)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_create_sets_parent_id(self):
        """A host created with a parent_id reports that id afterwards."""
        project_id = self.make_project('project_1')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')

        parent_address = IPAddress(u'10.1.2.101')
        parent_id = self.make_host(
            project_id, cloud_id, region_id,
            '1.www.example.com', parent_address, 'server')

        child_values = {
            'project_id': project_id,
            'cloud_id': cloud_id,
            'region_id': region_id,
            'hostname': '2.www.example.com',
            'ip_address': IPAddress(u'10.1.2.102'),
            'device_type': 'server',
            'parent_id': parent_id,
        }
        child = dbapi.hosts_create(self.context, child_values)

        self.assertEqual(parent_id, child.parent_id)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_update_fails_when_parent_id_set_to_own_id(self):
        """Updating a host so that it is its own parent raises BadRequest."""
        project_id = self.make_project('project_1')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')

        host_values = {
            'project_id': project_id,
            'cloud_id': cloud_id,
            'region_id': region_id,
            'hostname': '1.www.example.com',
            'ip_address': IPAddress(u'10.1.2.101'),
            'device_type': 'server',
            'parent_id': None,
        }
        host1 = dbapi.hosts_create(self.context, host_values)

        # Point the host at itself; the update must be rejected.
        self_parent = {'parent_id': host1.id}
        self.assertRaises(
            exceptions.BadRequest,
            dbapi.hosts_update, self.context, host1.id, self_parent)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def validate(self, value):
        """Validate ``value`` as an IP address, or as a network when masked.

        Runs the parent class validation first. An empty value is accepted
        without further checks when the field is not required. Otherwise the
        value is parsed with netaddr (as a network when ``self.mask`` is
        truthy, else as a single address) and stored on ``self.ip``.

        Raises ValidationError when parsing fails, when the IP version is not
        enabled in ``self.version``, or when the prefix length falls outside
        the configured mask bounds.
        """
        super(IPField, self).validate(value)
        if not value and not self.required:
            return

        try:
            parser = netaddr.IPNetwork if self.mask else netaddr.IPAddress
            self.ip = parser(value)
        except Exception:
            raise ValidationError(self.invalid_format_message)

        # self.version is tested as a bit mask against the IPv4/IPv6 flags.
        v4_allowed = self.version & IPv4 > 0 and self.ip.version == 4
        v6_allowed = self.version & IPv6 > 0 and self.ip.version == 6
        if not (v4_allowed or v6_allowed):
            raise ValidationError(self.invalid_version_message)

        if not self.mask:
            return

        # Pick the upper prefix-length bound that matches the parsed version.
        if self.ip.version == 4:
            upper_bound = self.max_v4_mask
        elif self.ip.version == 6:
            upper_bound = self.max_v6_mask
        else:
            return

        if not self.min_mask <= self.ip.prefixlen <= upper_bound:
            raise ValidationError(self.invalid_mask_message)
项目:vmware-nsx-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _create_subnet_with_last_subnet_block(cls, network, ip_version=4):
        """Derive last subnet CIDR block from tenant CIDR and
           create the subnet with that derived CIDR

        The tenant CIDR and mask bits are read from ``CONF.network`` for the
        requested IP version.

        :param network: network passed through to ``cls.create_subnet``.
        :param ip_version: 4 (default) or 6.
        :returns: the ``'subnet'`` entry of the ``create_subnet`` response.
        :raises ValueError: if ``ip_version`` is neither 4 nor 6.
        """
        if ip_version == 4:
            cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
            mask_bits = CONF.network.project_network_mask_bits
        elif ip_version == 6:
            cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr)
            mask_bits = CONF.network.project_network_v6_mask_bits
        else:
            # Without this branch an unsupported version would only surface
            # later as an UnboundLocalError on ``cidr``.
            raise ValueError("Unsupported IP version: %s" % ip_version)

        # Last block of size ``mask_bits`` inside the tenant CIDR.
        subnet_cidr = list(cidr.subnet(mask_bits))[-1]
        # Gateway is the address one above the subnet's own address value.
        gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
        body = cls.create_subnet(network, gateway=gateway_ip,
                                 cidr=subnet_cidr, mask_bits=mask_bits)
        return body['subnet']
项目:vmware-nsx-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def test_create_load_balancer_missing_vip_address(self):
        """Create a load balancer without supplying vip_address.

        Verifies that the assigned VIP is an IPv4 address and that a later
        show call reports the same address.
        """
        created = self._create_active_load_balancer(
            vip_subnet_id=self.subnet['id'])
        lb_id = created['id']
        self.addCleanup(self._delete_load_balancer, lb_id)

        vip_at_create = created['vip_address']
        vip = netaddr.IPAddress(vip_at_create)
        self.assertEqual(vip.version, 4)

        shown = self._show_load_balancer(lb_id)
        vip_at_show = shown['vip_address']
        self.assertEqual(vip_at_create, vip_at_show)
项目:Trillian    作者:shapeblue    | 项目源码 | 文件源码
def isIpRangeInUse(api_client, publicIpRange):
    ''' Report whether any public IP inside the given range is
        currently in use.

        Walks every NIC of every VM listed for the range's zone and checks
        the public IPs associated with that NIC's network. Returns True on
        the first address that lies between startip and endip (inclusive),
        otherwise False.
    '''

    vms = VirtualMachine.list(api_client,
                              zoneid=publicIpRange.zoneid,
                              listall=True)
    if not vms:
        return False

    for vm in vms:
        for nic in vm.nic:
            candidates = PublicIPAddress.list(
                api_client,
                associatednetworkid=nic.networkid,
                listall=True)
            if validateList(candidates)[0] != PASS:
                continue

            for candidate in candidates:
                lower = IPAddress(publicIpRange.startip)
                current = IPAddress(candidate.ipaddress)
                # The end of the range is parsed only once the lower bound
                # check has passed.
                if lower <= current and \
                        current <= IPAddress(publicIpRange.endip):
                    return True
    return False
项目:zun    作者:openstack    | 项目源码 | 文件源码
def obj_to_primitive(obj):
    """Recursively convert an object into plain Python values.

    An ObjectListBase becomes a list, a ZunObject becomes a dict of its set
    (or extra) fields, and netaddr addresses/networks become strings.
    Anything else is returned unchanged.
    """
    if isinstance(obj, ObjectListBase):
        return [obj_to_primitive(item) for item in obj]

    if isinstance(obj, ZunObject):
        return {
            name: obj_to_primitive(getattr(obj, name))
            for name in obj.obj_fields
            if obj.obj_attr_is_set(name) or name in obj.obj_extra_fields
        }

    if isinstance(obj, (netaddr.IPAddress, netaddr.IPNetwork)):
        return str(obj)

    return obj
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def get_ip_network(ip_str):
    """
        Try to return the owner of the IP address (based on ips.py)

        :param str ip_str: The IP address
        :rtype: str
        :return: The owner if found else None (also None when `ip_str`
            cannot be parsed as an IP address)
    """
    try:
        ip_addr = netaddr.IPAddress(ip_str)
    except (netaddr.AddrConversionError, netaddr.AddrFormatError):
        return None

    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for brand, networks in IPS_NETWORKS.items():
        for net in networks:
            # Masking compares raw integers, so an address of the other IP
            # version could match by coincidence; skip those networks.
            if net.version != ip_addr.version:
                continue
            if net.netmask.value & ip_addr.value == net.value:
                return brand
    return None
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipnetwork_list_operations_v4():
    """A /29 network has eight members, listed in ascending order."""
    network = IPNetwork('192.0.2.16/29')
    assert len(network) == 8

    members = list(network)
    assert len(members) == 8

    # 192.0.2.16 through 192.0.2.23 inclusive.
    expected = [IPAddress('192.0.2.%d' % octet) for octet in range(16, 24)]
    assert members == expected
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipaddress_and_ipnetwork_canonical_sort_order_by_version():
    """Mixed addresses and networks sort with all IPv4 entries before IPv6."""
    mixed = [
        IPAddress('192.0.2.130'),
        IPNetwork('192.0.2.128/28'),
        IPAddress('::'),
        IPNetwork('192.0.3.0/24'),
        IPNetwork('192.0.2.0/24'),
        IPNetwork('fe80::/64'),
        IPNetwork('172.24/12'),
        IPAddress('10.0.0.1'),
    ]
    expected_order = [
        IPAddress('10.0.0.1'),
        IPNetwork('172.24.0.0/12'),
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.2.128/28'),
        IPAddress('192.0.2.130'),
        IPNetwork('192.0.3.0/24'),
        IPAddress('::'),
        IPNetwork('fe80::/64'),
    ]

    # Shuffle first so the result cannot depend on the input order.
    random.shuffle(mixed)
    mixed.sort()

    assert mixed == expected_order
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipnetwork_slices_v4():
    """Slicing an IPNetwork supports negative stops and reversed steps."""
    # 192.0.2.0 through 192.0.2.7 in ascending order.
    ascending = [IPAddress('192.0.2.%d' % octet) for octet in range(8)]

    all_but_last = list(IPNetwork('192.0.2.0/29')[0:-1])
    assert all_but_last == ascending[:-1]

    reversed_members = list(IPNetwork('192.0.2.0/29')[::-1])
    assert reversed_members == ascending[::-1]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iterhosts_v4():
    """Check iter_hosts() results for /29, /31, /32 and IPv6 /128 networks."""
    # /29: the first and last addresses are not yielded as hosts.
    inner_hosts = [IPAddress('192.0.2.%d' % octet) for octet in range(1, 7)]
    assert list(IPNetwork('192.0.2.0/29').iter_hosts()) == inner_hosts

    both_of_slash_31 = [
        IPAddress('192.168.0.0'),
        IPAddress('192.168.0.1'),
    ]
    assert list(IPNetwork("192.168.0.0/31")) == both_of_slash_31

    single_v6 = IPNetwork("1234::/128")
    assert list(single_v6) == [IPAddress('1234::')]
    assert list(single_v6.iter_hosts()) == []

    # /31 and /32 yield every address they contain.
    assert list(IPNetwork("192.168.0.0/31").iter_hosts()) == both_of_slash_31
    assert list(IPNetwork("192.168.0.0/32").iter_hosts()) == [
        IPAddress('192.168.0.0'),
    ]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_advanced_comparisons():
    """Compare networks by CIDR and by .ip, and addresses against networks."""
    base_net = IPNetwork('192.0.2.0/24')
    offset_net = IPNetwork('192.0.2.112/24')
    next_net = IPNetwork('192.0.3.0/24')

    # Same /24, different host part: the networks compare equal while their
    # .ip attributes do not.
    assert base_net == offset_net
    assert base_net.ip != offset_net.ip
    assert base_net.ip < offset_net.ip
    assert base_net.cidr == offset_net.cidr

    assert base_net != next_net
    assert base_net < next_net

    host = IPAddress('192.0.2.0')
    host_net = IPNetwork('192.0.2.0/32')

    # An address is never equal to a network, even a /32.
    assert host != host_net

    # Repeated checks are retained from the original test.
    assert host == host_net[0]
    assert host == host_net[-1]
    assert host == host_net[0]
    assert host == host_net.ip
    assert host == host_net.ip
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_adding_and_removing_members_ip_addresses_as_ints():
    """IPSet accepts plain integers for construction, add, remove, update."""
    merged = IPSet(['10.0.0.0/25'])
    merged.add('10.0.0.0/24')
    assert merged == IPSet(['10.0.0.0/24'])

    v4_first, v6_addr, v4_second = [
        int(IPAddress(text)) for text in ('10.0.0.1', 'fe80::', '10.0.0.2')
    ]

    only_v4 = ['10.0.0.1/32', '10.0.0.2/32']
    with_v6 = only_v4 + ['fe80::/128']

    from_ints = IPSet([v4_first, v6_addr])
    assert from_ints == IPSet(['10.0.0.1/32', 'fe80::/128'])

    from_ints.add(v4_second)
    assert from_ints == IPSet(with_v6)

    from_ints.remove(v6_addr)
    assert from_ints == IPSet(only_v4)

    from_ints.update([v6_addr])
    assert from_ints == IPSet(with_v6)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_boundaries():
    """iter_iprange() yields both endpoints, for IPv4 and IPv4-mapped IPv6."""
    last_octets = range(8)

    expected_v4 = [IPAddress('192.0.2.%d' % octet) for octet in last_octets]
    produced_v4 = list(iter_iprange('192.0.2.0', '192.0.2.7'))
    assert produced_v4 == expected_v4

    expected_mapped = [
        IPAddress('::ffff:192.0.2.%d' % octet) for octet in last_octets
    ]
    produced_mapped = list(
        iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7'))
    assert produced_mapped == expected_mapped
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iprange_slicing():
    """IPRange slices honour start, stop and step like a sequence."""
    def hosts(*last_octets):
        # Build the expected 192.0.2.x addresses from their final octets.
        return [IPAddress('192.0.2.%d' % octet) for octet in last_octets]

    iprange = IPRange('192.0.2.1', '192.0.2.254')

    assert list(iprange[0:3]) == hosts(1, 2, 3)
    assert list(iprange[0:10:2]) == hosts(1, 3, 5, 7, 9)

    # A step larger than the range yields only the first element.
    assert list(iprange[0:1024:512]) == hosts(1)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_iter_nmap_range_with_multiple_targets_including_cidr():
    """iter_nmap_range() expands a CIDR, an nmap range and a single IPv6."""
    targets = ('192.168.0.0/29', '192.168.3-5,7.1', 'fe80::1')

    # CIDR target: 192.168.0.0 - 192.168.0.7.
    expected = [IPAddress('192.168.0.%d' % octet) for octet in range(8)]
    # nmap target "3-5,7" in the third octet.
    expected += [IPAddress('192.168.%d.1' % octet) for octet in (3, 4, 5, 7)]
    # Plain IPv6 target.
    expected.append(IPAddress('fe80::1'))

    assert list(iter_nmap_range(*targets)) == expected
项目:networking-ovn    作者:netgroup-polito    | 项目源码 | 文件源码
def _get_subnet_dhcp_options_for_port(self, port, ip_version):
        """Return DHCP options for one of the port's subnets.

        Only subnets whose fixed IP matches ``ip_version`` are considered.
        Returns None when no options are found for them.
        """
        subnet_ids = []
        for fixed_ip in port['fixed_ips']:
            if netaddr.IPAddress(fixed_ip['ip_address']).version == ip_version:
                subnet_ids.append(fixed_ip['subnet_id'])

        candidates = self._nb_ovn.get_subnets_dhcp_options(subnet_ids)
        if not candidates:
            return None

        if ip_version == const.IP_VERSION_6:
            # Prefer a DHCPv6 stateful subnet so the port can obtain one
            # stateful v6 address even when it has both stateful and
            # stateless subnets. ovn_const.DHCPV6_STATELESS_OPT is set to
            # "true" in _get_ovn_dhcpv6_opts, so stateless entries carry the
            # string 'true' in the DHCP_Options table.
            for opts in candidates:
                stateless = opts['options'].get(
                    ovn_const.DHCPV6_STATELESS_OPT)
                if stateless != 'true':
                    return opts

        # Otherwise fall back to the first entry found.
        return candidates[0]
项目:napalm-nxos    作者:napalm-automation    | 项目源码 | 文件源码
def _get_ntp_entity(self, peer_type):
        """Collect the peer addresses listed by ``show ntp peers``.

        :param peer_type: not used by this implementation.
            NOTE(review): presumably meant to filter on the peer/server
            column of the output -- confirm against callers.
        :returns: dict mapping each peer address string to an empty dict.
        :raises ValueError: if a data line does not start with a unicast
            IP address.
        """
        # netaddr is already a dependency of this module (IPAddress).
        from netaddr.core import AddrFormatError

        ntp_entities = {}
        command = 'show ntp peers'
        output = self.device.send_command(command)

        for line in output.splitlines():
            # Skip blank lines, the separator rule and the column header.
            if not line.strip() or '-----' in line or 'Peer IP Address' in line:
                continue

            peer_addr = line.split()[0]
            try:
                # Validate the token itself (not its length) and actually
                # call is_unicast(); a bare method reference is always true.
                is_unicast = IPAddress(peer_addr).is_unicast()
            except (AddrFormatError, ValueError):
                is_unicast = False

            if not is_unicast:
                raise ValueError("Did not correctly find a Peer IP Address")
            ntp_entities[peer_addr] = {}

        return ntp_entities
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def is_ipv6(address):
    """Return True when ``address`` parses as an IPv6 address."""
    try:
        parsed = netaddr.IPAddress(address)
    except netaddr.AddrFormatError:
        # Not an IP address at all (most likely a hostname).
        return False
    else:
        return parsed.version == 6
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _get_for_address(address, key):
    """Look up the local interface whose network contains ``address``.

    :param address (str): An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :param key: 'iface' to get the interface name, otherwise the name of an
        attribute of the matching configured address, e.g. 'netmask'.
    :returns str: Requested attribute or None if no interface network
        contains the address.
    """
    target = netaddr.IPAddress(address)

    for iface in netifaces.interfaces():
        families = netifaces.ifaddresses(iface)

        if target.version == 4 and netifaces.AF_INET in families:
            # Only the first IPv4 entry of the interface is examined.
            primary = families[netifaces.AF_INET][0]
            v4_network = netaddr.IPNetwork(
                "%s/%s" % (primary['addr'], primary['netmask']))
            if target in v4_network.cidr:
                return iface if key == 'iface' else primary[key]

        if target.version == 6 and netifaces.AF_INET6 in families:
            for entry in families[netifaces.AF_INET6]:
                # Entries whose address starts with 'fe80' are ignored.
                if entry['addr'].startswith('fe80'):
                    continue

                v6_cidr = netaddr.IPNetwork(
                    "%s/%s" % (entry['addr'], entry['netmask'])).cidr
                if target not in v6_cidr:
                    continue

                if key == 'iface':
                    return iface
                if key == 'netmask' and v6_cidr:
                    # For IPv6 the netmask is reported as the prefix length.
                    return str(v6_cidr).split('/')[1]
                return entry[key]

    return None
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(self):
        """Validate the target, run a scan against it and report the results.

        The observable must be of type 'fqdn' or 'ip'. When
        ``self.allowed_networks`` is set, the target (resolved first if it is
        an FQDN) must fall inside at least one of those networks. Failures
        are reported through ``self.error``.

        NOTE(review): the flow relies on ``self.error`` not returning
        (otherwise ``address``/``results`` could be unbound) -- confirm
        against the Analyzer base class.
        """
        Analyzer.run(self)

        data = self.getParam('data', None, 'Data is missing')

        if self.data_type not in ('fqdn', 'ip'):
            self.error('Invalid data type')

        if self.allowed_networks is not None:
            try:
                if self.data_type == 'fqdn':
                    # Name resolution can fail too; report it the same way
                    # as an unparsable IP instead of letting it escape.
                    address = IPAddress(socket.gethostbyname(data))
                else:
                    address = IPAddress(data)
            except Exception as e:
                self.error("{}".format(e))
            if not any(address in IPNetwork(network)
                       for network in self.allowed_networks):
                self.error('Invalid target: not in any allowed network')

        scanner_args = {
            'url': self.url, 'login': self.login, 'password': self.password}
        # Without a CA bundle the scanner is created with insecure=True.
        if self.ca_bundle is not None:
            scanner_args['ca_bundle'] = self.ca_bundle
        else:
            scanner_args['insecure'] = True

        try:
            scanner = ness6rest.Scanner(**scanner_args)
            scanner.policy_set(name=self.policy)
            scanner.scan_add(targets=data, name="cortex scan for " + data)

            self._run_scan(scanner)
            results = self._get_scan_results(scanner)
            self._delete_scan(scanner)
        except Exception as ex:
            self.error('Scanner error: %s' % ex)

        self.report(results)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_ipv6(address):
    """Tell whether the given address is an IPv6 address."""
    try:
        ip_version = netaddr.IPAddress(address).version
    except netaddr.AddrFormatError:
        # Unparsable input (probably a hostname) is not an IPv6 address.
        return False

    return ip_version == 6
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_address_in_network(network, address):
    """
    Check membership of an address in a network range.

    :param network (str): CIDR presentation format. For example,
        '192.168.1.0/24'.
    :param address: An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :returns boolean: True if address is inside network, else False.
    :raises ValueError: If network or address cannot be parsed.
    """
    try:
        cidr = netaddr.IPNetwork(network)
    except (netaddr.core.AddrFormatError, ValueError):
        raise ValueError(
            "Network (%s) is not in CIDR presentation format" % network)

    try:
        candidate = netaddr.IPAddress(address)
    except (netaddr.core.AddrFormatError, ValueError):
        raise ValueError(
            "Address (%s) is not in correct presentation format" % address)

    return candidate in cidr
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _get_for_address(address, key):
    """Look up the local interface whose network contains ``address``.

    :param address (str): An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :param key: 'iface' to get the interface name, otherwise the name of an
        attribute of the matching configured address, e.g. 'netmask'.
    :returns str: Requested attribute or None if no interface network
        contains the address.
    """
    target = netaddr.IPAddress(address)

    for iface in netifaces.interfaces():
        families = netifaces.ifaddresses(iface)

        if target.version == 4 and netifaces.AF_INET in families:
            # Only the first IPv4 entry of the interface is examined.
            primary = families[netifaces.AF_INET][0]
            v4_network = netaddr.IPNetwork(
                "%s/%s" % (primary['addr'], primary['netmask']))
            if target in v4_network.cidr:
                return iface if key == 'iface' else primary[key]

        if target.version == 6 and netifaces.AF_INET6 in families:
            for entry in families[netifaces.AF_INET6]:
                # The helper yields a falsy value for entries to be skipped.
                v6_network = _get_ipv6_network_from_address(entry)
                if not v6_network:
                    continue

                v6_cidr = v6_network.cidr
                if target not in v6_cidr:
                    continue

                if key == 'iface':
                    return iface
                if key == 'netmask' and v6_cidr:
                    # For IPv6 the netmask is reported as the prefix length.
                    return str(v6_cidr).split('/')[1]
                return entry[key]

    return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_ip(address):
    """
    Return True when address parses as an IPv4 or IPv6 address, else False.
    """
    try:
        netaddr.IPAddress(address)
    except (netaddr.AddrFormatError, ValueError):
        return False
    return True
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_ipv6(address):
    """Return whether the supplied address is an IPv6 address."""
    try:
        candidate = netaddr.IPAddress(address)
    except netaddr.AddrFormatError:
        # Input that is not an address (e.g. a hostname) counts as "no".
        return False
    return candidate.version == 6
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _get_for_address(address, key):
    """Find the interface, or one of its attributes, that ``address``
    could be bound to.

    :param address (str): An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :param key: 'iface' for the interface name, or the name of an attribute
        of the configured address entry, for example 'netmask'.
    :returns str: Requested attribute or None if address is not bindable.
    """
    wanted = netaddr.IPAddress(address)

    for iface in netifaces.interfaces():
        by_family = netifaces.ifaddresses(iface)

        if wanted.version == 4 and netifaces.AF_INET in by_family:
            # Only the first IPv4 entry of the interface is examined.
            first_v4 = by_family[netifaces.AF_INET][0]
            spec = "%s/%s" % (first_v4['addr'], first_v4['netmask'])
            if wanted in netaddr.IPNetwork(spec).cidr:
                if key == 'iface':
                    return iface
                return first_v4[key]

        if wanted.version == 6 and netifaces.AF_INET6 in by_family:
            for v6_entry in by_family[netifaces.AF_INET6]:
                # The helper yields a falsy value for entries to be skipped.
                network = _get_ipv6_network_from_address(v6_entry)
                if not network:
                    continue

                cidr = network.cidr
                if wanted not in cidr:
                    continue

                if key == 'iface':
                    return iface
                if key == 'netmask' and cidr:
                    # For IPv6 the netmask is reported as the prefix length.
                    return str(cidr).split('/')[1]
                return v6_entry[key]

    return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_ip(address):
    """
    Tell whether address can be parsed as an IPv4/IPv6 address.
    """
    try:
        netaddr.IPAddress(address)
    except (netaddr.AddrFormatError, ValueError):
        is_valid = False
    else:
        is_valid = True
    return is_valid
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def ip_addr_version(addr, resolve=True, timeout=dns_default_timeout()):
    """Work out which IP version an address, CIDR block or hostname uses.

    A trailing numeric "/<n>" suffix is removed first. If the remainder is
    an IP literal its version is returned directly. Otherwise, when
    ``resolve`` is true, the name is resolved, trying IPv4 before IPv6.

    Returns a tuple (version, ip): version is 4, 6 or None if nothing can
    be determined, and ip is the address supplied or resolved (None when
    undetermined).
    """

    # Drop a CIDR suffix, but only when what follows the slash is an integer.
    slash_at = addr.rfind('/')
    if slash_at > 0:
        suffix = addr[slash_at + 1:]
        try:
            int(suffix)
        except ValueError:
            # Not a prefix length; leave addr alone and let resolution
            # deal with it, if enabled.
            pass
        else:
            addr = addr[:slash_at]

    try:
        literal = netaddr.IPAddress(addr)
    except (netaddr.core.AddrFormatError, ValueError):
        # Not an IP literal; fall through to name resolution.
        pass
    else:
        return (literal.version, addr)

    if resolve:
        for family in (4, 6):
            resolved = dns_resolve(addr, ip_version=family, timeout=timeout)
            if resolved is None:
                continue
            # Re-classify the answer without resolving again.
            return (ip_addr_version(resolved, resolve=False)[0], resolved)

    return (None, None)


#
# Find common IP version between two addresses
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __refresh(self):
        """
        Rebuild the cached local interface addresses when there is no
        cache yet or the cache has expired
        """
        cache_is_fresh = self.addresses is not None \
            and not (datetime.datetime.now() > self.expires)
        if cache_is_fresh:
            return

        self.addresses = {}

        # Strips everything from the first '%' onward (presumably an
        # interface/zone suffix on the address).
        suffix_pattern = r'%.*$'

        # Collect every interface's address table first, then walk
        # address family -> entry.
        tables = [netifaces.ifaddresses(name)
                  for name in netifaces.interfaces()]
        for table in tables:
            for family in table:
                for entry in table[family]:
                    bare = re.sub(suffix_pattern, '', entry["addr"])
                    try:
                        parsed = netaddr.IPAddress(bare)
                        self.addresses[parsed] = 1
                    except netaddr.core.AddrFormatError:
                        # Entries that are not IP addresses are ignored.
                        pass

        lifetime = datetime.timedelta(seconds=self.refresh)
        self.expires = datetime.datetime.now() + lifetime
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __contains__(self, item):
        """
        Report whether item is one of the cached local addresses,
        refreshing the cache first if needed
        """

        self.__refresh()
        return netaddr.IPAddress(item) in self.addresses
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _address(addresses, network):
    """
    Return the addresses that fall inside the given network

    Each candidate is logged at debug level before it is checked. The
    result preserves the input order.
    """
    def _belongs(candidate):
        log.debug("_address: ip {} in network {} ".format(candidate, network))
        return IPAddress(candidate) in IPNetwork(network)

    return [candidate for candidate in addresses if _belongs(candidate)]
项目:craton    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """
        Create the device tree used by the tests in this class.

        Layout: switch1 -> switch2 -> www1 -> www2, plus www3 which has
        no parent.  The resulting ids are stored on the instance as
        ``parent``, ``children``, ``descendants`` and ``all``.
        """
        super().setUp()
        project_id = self.make_project('project_1')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')
        # Every device below lives in the same project/cloud/region.
        scope = (project_id, cloud_id, region_id)

        root_switch = self.make_network_device(
            *scope, 'switch1.example.com', IPAddress('10.1.2.101'), 'switch'
        )
        child_switch = self.make_network_device(
            *scope, 'switch2.example.com', IPAddress('10.1.2.102'), 'switch',
            parent_id=root_switch
        )
        server = self.make_host(
            *scope, 'www1.example.com', IPAddress(u'10.1.2.103'), 'server',
            parent_id=child_switch
        )
        container = self.make_host(
            *scope, 'www2.example.com', IPAddress(u'10.1.2.104'), 'container',
            parent_id=server
        )
        orphan = self.make_host(
            *scope, 'www3.example.com', IPAddress(u'10.1.2.105'), 'server'
        )

        self.parent = root_switch
        self.children = [child_switch]
        self.descendants = [child_switch, server, container]
        self.all = [root_switch, child_switch, server, container, orphan]
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_create_duplicate_raises(self):
        """
        Creating a host that repeats an existing host's name, region and
        address must raise DuplicateDevice.
        """
        project_id = self.mock_project_id
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')
        self.make_host(project_id, cloud_id, region_id, 'www1.example.com',
                       IPAddress(u'10.1.2.101'), 'server')

        # Same attributes as the host created just above.
        duplicate = {
            'name': 'www1.example.com',
            'region_id': region_id,
            'ip_address': IPAddress(u'10.1.2.101'),
            'device_type': 'server',
            'cloud_id': cloud_id,
            'project_id': project_id,
        }
        with self.assertRaises(exceptions.DuplicateDevice):
            dbapi.hosts_create(self.context, duplicate)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_update(self):
        """Updating a host's name returns a record carrying the new name."""
        project_id = self.mock_project_id
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')
        host_id = self.make_host(
            project_id, cloud_id, region_id, 'example',
            IPAddress(u'10.1.2.101'), 'server', bar='bar2'
        )

        new_name = "Host_New"
        updated = dbapi.hosts_update(
            self.context, host_id, {'name': new_name}
        )
        self.assertEqual(updated.name, new_name)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_resolved_vars_no_cells(self):
        """
        With no cell involved, a host's resolved variables combine its
        own variables with those set on its region.
        """
        project_id = self.make_project('project_1')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(
            project_id, cloud_id, 'region_1', foo='R1'
        )
        host_id = self.make_host(
            project_id, cloud_id, region_id, 'www.example.xyz',
            IPAddress(u'10.1.2.101'), 'server', bar='bar2'
        )

        fetched = dbapi.hosts_get_by_id(self.context, host_id)
        expected_resolved = {'bar': 'bar2', 'foo': 'R1'}
        self.assertEqual(fetched.name, 'www.example.xyz')
        self.assertEqual(fetched.resolved, expected_resolved)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_host_labels_create(self):
        """
        Attach labels to a freshly created host.

        There is no explicit assertion; the test passes as long as the
        label update does not raise.
        """
        project_id = self.mock_project_id
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(
            project_id, cloud_id, 'region_1', foo='R1'
        )
        host_id = self.make_host(
            project_id, cloud_id, region_id, 'www.example.xyz',
            IPAddress(u'10.1.2.101'), 'server', bar='bar2'
        )
        dbapi.hosts_labels_update(
            self.context, host_id, {"labels": ["tom", "jerry"]}
        )
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_get_all_with_label_filters(self):
        """
        Filtering hosts by label returns only the host carrying it.
        """
        project_id = self.mock_project_id
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')

        # First host: gets the "compute" label.
        labelled = self.make_host(
            project_id, cloud_id, region_id,
            'www1.example.com', IPAddress(u'10.1.2.101'), 'server',
        )
        dbapi.hosts_labels_update(
            self.context, labelled, {"labels": ["compute"]}
        )

        # Second host: left unlabelled, so the filter must exclude it.
        self.make_host(
            project_id, cloud_id, region_id,
            'www1.example2.com', IPAddress(u'10.1.2.102'), 'server',
        )

        matches, _ = dbapi.hosts_get_all(
            self.context, {"label": "compute"}, default_pagination
        )
        self.assertEqual(len(matches), 1)
        self.assertEqual(matches[0].name, 'www1.example.com')
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_get_with_key_value_filters(self):
        """
        Filtering hosts on "vars" matches against each host's own
        key/value variables.
        """
        project_id = self.make_project('project_1', foo='P1', zoo='P2')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(
            project_id, cloud_id, 'region_1', foo='R1'
        )

        # (name, address, variables) for each host, created in order.
        fixtures = (
            ('www.example.xyz', IPAddress(u'10.1.2.101'),
             {"key1": "example1", "key2": "Tom"}),
            ('www.example2.xyz', IPAddress(u'10.1.2.102'),
             {"key1": "example2", "key2": "Tom"}),
        )
        for hostname, ip_address, variables in fixtures:
            host_id = self.make_host(
                project_id, cloud_id, region_id,
                hostname, ip_address, 'server'
            )
            dbapi.variables_update_by_resource_id(
                self.context, "hosts", host_id, variables
            )

        # key1 differs between the hosts: only the second one matches.
        matches, _ = dbapi.hosts_get_all(
            self.context, {"vars": "key1:example2"}, default_pagination
        )
        self.assertEqual(len(matches), 1)
        self.assertEqual('www.example2.xyz', matches[0].name)

        # key2 is shared by both hosts.
        matches, _ = dbapi.hosts_get_all(
            self.context, {"vars": "key2:Tom"}, default_pagination
        )
        self.assertEqual(len(matches), 2)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_update_sets_parent_id(self):
        """
        A host created without a parent can be given one through
        hosts_update.
        """
        project_id = self.make_project('project_1')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(project_id, cloud_id, 'region_1')
        parent_id = self.make_host(
            project_id, cloud_id, region_id,
            '1.www.example.com', IPAddress(u'10.1.2.101'), 'server'
        )

        # The child starts out with parent_id explicitly unset.
        child_values = {
            'project_id': project_id,
            'cloud_id': cloud_id,
            'region_id': region_id,
            'hostname': '2.www.example.com',
            'ip_address': IPAddress(u'10.1.2.102'),
            'device_type': 'server',
            'parent_id': None,
        }
        child = dbapi.hosts_create(self.context, child_values)
        self.assertIsNone(child.parent_id)

        updated = dbapi.hosts_update(
            self.context, child.id, {'parent_id': parent_id}
        )
        self.assertEqual(parent_id, updated.parent_id)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def test_hosts_get_all_with_resolved_var_filters(self):
        """
        With "resolved-values" enabled, a "vars" filter can match
        variables set on the host, its parent switch and its region.
        """
        project_id = self.make_project('project_1', foo='P1', zoo='P2')
        cloud_id = self.make_cloud(project_id, 'cloud_1')
        region_id = self.make_region(
            project_id, cloud_id, 'region_1', foo='R1'
        )
        scope = (project_id, cloud_id, region_id)

        switch_id = self.make_network_device(
            *scope, 'switch1.example.com', IPAddress('10.1.2.101'), 'switch',
            zoo='S1', bar='S2'
        )
        # Both hosts hang off the switch; only the first has the key1
        # value that the filter below asks for.
        self.make_host(
            *scope, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server',
            parent_id=switch_id, key1="value1", key2="value2"
        )
        self.make_host(
            *scope, 'www2.example.xyz', IPAddress(u'10.1.2.102'), 'server',
            parent_id=switch_id, key1="value-will-not-match", key2="value2"
        )

        # NOTE(review): region_id is hard-coded to 1 rather than taken
        # from the region created above - confirm this is intended.
        query = {
            "region_id": 1,
            "vars": "key1:value1,zoo:S1,foo:R1",
            "resolved-values": True,
        }
        matches, _ = dbapi.hosts_get_all(
            self.context, query, default_pagination
        )
        self.assertEqual(len(matches), 1)
        self.assertEqual(matches[0].name, 'www.example.xyz')
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def is_ipv6(address):
    """Determine whether provided address is IPv6 or not.

    :param address: Candidate value, typically a string holding an IPv4
        address, an IPv6 address or a hostname.
    :returns boolean: True only when address parses as an individual
        IPv6 address; False for IPv4 addresses and for anything netaddr
        cannot parse as a bare address.
    """
    try:
        address = netaddr.IPAddress(address)
    except (netaddr.AddrFormatError, ValueError):
        # Probably a hostname, or an address carrying a netmask/prefix
        # (which netaddr rejects with ValueError) - so not an individual
        # address at all.  Catching both keeps this predicate in line
        # with is_ip().
        return False

    return address.version == 6
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def is_address_in_network(network, address):
    """
    Report whether an address belongs to a network.

    :param network (str): CIDR presentation format. For example,
        '192.168.1.0/24'.
    :param address: An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :returns boolean: True when address lies inside network.
    :raises ValueError: when either argument cannot be parsed.
    """
    try:
        cidr = netaddr.IPNetwork(network)
    except (netaddr.core.AddrFormatError, ValueError):
        raise ValueError("Network (%s) is not in CIDR presentation format" %
                         network)

    try:
        candidate = netaddr.IPAddress(address)
    except (netaddr.core.AddrFormatError, ValueError):
        raise ValueError("Address (%s) is not in correct presentation format" %
                         address)

    return candidate in cidr
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def _get_for_address(address, key):
    """Look up the interface (or one of its attributes) whose network
    contains the given IP address.

    :param address (str): An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :param key: 'iface' for the physical interface name or an attribute
        of the configured interface, for example 'netmask'.
    :returns str: Requested attribute or None if address is not bindable.
    """
    target = netaddr.IPAddress(address)
    for iface in netifaces.interfaces():
        bound = netifaces.ifaddresses(iface)

        if target.version == 4 and netifaces.AF_INET in bound:
            # Only the first IPv4 entry of the interface is considered.
            primary = bound[netifaces.AF_INET][0]
            cidr = netaddr.IPNetwork(
                "%s/%s" % (primary['addr'], primary['netmask'])).cidr
            if target in cidr:
                return iface if key == 'iface' else primary[key]

        if target.version == 6 and netifaces.AF_INET6 in bound:
            for entry in bound[netifaces.AF_INET6]:
                network = _get_ipv6_network_from_address(entry)
                if not network:
                    continue

                cidr = network.cidr
                if target not in cidr:
                    continue

                if key == 'iface':
                    return iface
                if key == 'netmask' and cidr:
                    # Answer with the prefix length taken from the
                    # network's CIDR text.
                    return str(cidr).split('/')[1]
                return entry[key]
    return None
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def is_ip(address):
    """
    Tell whether address can be parsed as an IPv4/IPv6 address.

    Returns True when netaddr accepts the value, False when parsing
    raises AddrFormatError or ValueError.
    """
    try:
        # Parsing is the whole check; the result itself is not needed.
        netaddr.IPAddress(address)
    except (netaddr.AddrFormatError, ValueError):
        return False
    return True