Python ipaddr 模块,IPNetwork() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用ipaddr.IPNetwork()

项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __init__(self,
                 data   # Data suitable for this class
                 ):

        valid, message = data_is_valid(data)
        if not valid:
            raise ValueError("Invalid data: %s" % message)

        self.cidrs = []
        for iface in netifaces.interfaces():
            ifaddrs = netifaces.ifaddresses(iface)
            if netifaces.AF_INET in ifaddrs:
                for ifaddr in ifaddrs[netifaces.AF_INET]:
                    if 'addr' in ifaddr:
                        self.cidrs.append(ipaddr.IPNetwork(ifaddr['addr']))
            if netifaces.AF_INET6 in ifaddrs:
                for ifaddr in ifaddrs[netifaces.AF_INET6]:
                    if 'addr' in ifaddr:
                        #add v6 but remove stuff like %eth0 that gets thrown on end of some addrs
                        self.cidrs.append(ipaddr.IPNetwork(ifaddr['addr'].split('%')[0]))
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def evaluate(self,
                 hints  # Information used for doing identification
                 ):

        """Given a set of hints, evaluate this identifier and return True if
        an identification is made.

        """

        try:
            ip = ipaddr.IPNetwork(hints['requester'])
        except KeyError:
            return False

        # TODO: Find out of there's a more hash-like way to do this
        # instead of a linear search.  This would be great if it
        # weren't GPL: https://pypi.python.org/pypi/pytricia

        for cidr in self.cidrs:
            if ip in cidr:
                return True

        return False

# A short test program
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __init__(self,
                 data   # Data suitable for this class
                 ):

        valid, message = data_is_valid(data)
        if not valid:
            raise ValueError("Invalid data: %s" % message)

        self.exclude = [ ipaddr.IPNetwork(addr)
                         for addr in data['exclude'] ]

        try:
            timeout = pscheduler.iso8601_as_timedelta(data['timeout'])
            self.timeout = pscheduler.timedelta_as_seconds(timeout)
        except KeyError:
            self.timeout = 2

        try:
            self.fail_result = data['fail-result']
        except KeyError:
            self.fail_result = False

        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = self.timeout
        self.resolver.lifetime = self.timeout
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def Validate(self, value, unused_key=None):
    """Validates a subnet."""
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError('subnet must be a string, not \'%r\'' %
                                       type(value))
    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError('%s is not a valid IPv4 or IPv6 subnet' %
                                       value)


    parts = value.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      raise validation.ValidationError('Prefix length of subnet %s must be an '
                                       'integer (quad-dotted masks are not '
                                       'supported)' % value)

    return value
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def Validate(self, value, unused_key=None):
    """Validates a subnet."""
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError('subnet must be a string, not \'%r\'' %
                                       type(value))
    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError('%s is not a valid IPv4 or IPv6 subnet' %
                                       value)


    parts = value.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      raise validation.ValidationError('Prefix length of subnet %s must be an '
                                       'integer (quad-dotted masks are not '
                                       'supported)' % value)

    return value
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def test_duplicate(net_str, net2_str=None, dup_found=True):
    usres_monitor.add_net(ipaddr.IPNetwork(net_str))
    try:
        usres_monitor.add_net(
            ipaddr.IPNetwork(net2_str if net2_str else net_str)
        )
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            test_outcome("test_duplicate",
                         "{} and {}".format(
                             net_str,
                             net2_str if net2_str else net_str),
                         "OK" if dup_found else "FAIL")
            if not dup_found:
                raise AssertionError("Duplicate found")
            return
    test_outcome("test_duplicate",
                    "{} and {}".format(
                        net_str,
                        net2_str if net2_str else net_str),
                    "FAIL" if dup_found else "OK")
    if dup_found:
        raise AssertionError("Duplicate not found")
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def Validate(self, value, unused_key=None):
    """Validates a subnet."""
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError('subnet must be a string, not \'%r\'' %
                                       type(value))
    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError('%s is not a valid IPv4 or IPv6 subnet' %
                                       value)


    parts = value.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      raise validation.ValidationError('Prefix length of subnet %s must be an '
                                       'integer (quad-dotted masks are not '
                                       'supported)' % value)

    return value
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def read_testbed_topo(self):
        with open(self.testbed_filename) as f:
            topo = csv.DictReader(f)
            for line in topo:
                tb_prop = {}
                name = ''
                for key in line:
                    if ('uniq-name' in key or 'conf-name' in key) and '#' in line[key]:
                        ### skip comment line
                        continue
                    elif 'uniq-name' in key or 'conf-name' in key:
                        name = line[key]
                    elif 'ptf_ip' in key and line[key]:
                        ptfaddress = ipaddress.IPNetwork(line[key])
                        tb_prop['ptf_ip'] = str(ptfaddress.ip)
                        tb_prop['ptf_netmask'] = str(ptfaddress.netmask)
                    else:
                        tb_prop[key] = line[key]
                if name:
                    self.testbed_topo[name] = tb_prop
        return
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def Validate(self, value, unused_key=None):
    """Validates a subnet."""
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError('subnet must be a string, not \'%r\'' %
                                       type(value))
    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError('%s is not a valid IPv4 or IPv6 subnet' %
                                       value)


    parts = value.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      raise validation.ValidationError('Prefix length of subnet %s must be an '
                                       'integer (quad-dotted masks are not '
                                       'supported)' % value)

    return value
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def Validate(self, value, unused_key=None):
    """Validates a subnet."""
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError('subnet must be a string, not \'%r\'' %
                                       type(value))
    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError('%s is not a valid IPv4 or IPv6 subnet' %
                                       value)


    parts = value.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      raise validation.ValidationError('Prefix length of subnet %s must be an '
                                       'integer (quad-dotted masks are not '
                                       'supported)' % value)

    return value
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def is_valid_ip(ip_address, project):
    """
    Verify that an IP address is not being blacklisted
    for the given project.
    """
    blacklist = project.get_option('sentry:blacklisted_ips')
    if not blacklist:
        return True

    ip_network = IPNetwork(ip_address)
    for addr in blacklist:
        # We want to error fast if it's an exact match
        if ip_address == addr:
            return False

        # Check to make sure it's actually a range before
        # attempting to see if we're within that range
        if '/' in addr and ip_network in IPNetwork(addr):
            return False

    return True

# #845, add for server name filter, by hzwangzhiwei @20160803
项目:sonic-buildimage    作者:Azure    | 项目源码 | 文件源码
def parse_device_desc_xml(filename):
    root = ET.parse(filename).getroot()
    (lo_prefix, mgmt_prefix, hostname, hwsku, d_type) = parse_device(root)

    results = {}
    results['DEVICE_METADATA'] = {'localhost': {
        'hostname': hostname,
        'hwsku': hwsku,
        }}

    results['LOOPBACK_INTERFACE'] = {('lo', lo_prefix): {}}

    mgmt_intf = {}
    mgmtipn = ipaddress.IPNetwork(mgmt_prefix)
    gwaddr = ipaddress.IPAddress(int(mgmtipn.network) + 1)
    results['MGMT_INTERFACE'] = {('eth0', mgmt_prefix): {'gwaddr': gwaddr}}

    return results
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __init__(self,
                 data   # Data suitable for this class
                 ):

        valid, message = data_is_valid(data)
        if not valid:
            raise ValueError("Invalid data: %s" % message)

        self.cidrs = []
        for cidr in data['cidrs']:
            self.cidrs.append(ipaddr.IPNetwork(cidr))
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def evaluate(self,
                 hints  # Information used for doing identification
                 ):

        """Given a set of hints, evaluate this identifier and return True if
        an identification is made.

        """

        try:
            ip = ipaddr.IPNetwork(hints['requester'])
        except KeyError:
            return False

        # TODO: Find out of there's a more hash-like way to do this
        # instead of a linear search.  This would be great if it
        # weren't GPL: https://pypi.python.org/pypi/pytricia

        for cidr in self.cidrs:
            if ip in cidr:
                return True

        return False


# A short test program
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def net_is_valid(self, net):
        """
        Returns true if the given string is a valid IP network (v4 or v6).
        """
        try:
            ipaddr.IPNetwork(net)
            return True
        except ValueError:
            return False
#}}}

#{{{ IP type
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def get_network(self, net_type):
        """ 
        Returns a network (as a string) that includes the private/public IPs of all nodes in the config.
        Raises an EXAConfError if an invalid IP is found or the IP of at least one node is not part
        of the network defined by the first node section.

        This function assumes that all nodes have an entry for the requested network type. The calling 
        function has to check if the network type is actually present (private / public).
        """

        network = "" 
        for section in self.config.sections:
            if self.is_node(section):
                node_sec = self.config[section]
                node_network = node_sec.get(net_type)
                if not node_network or node_network == "":
                    raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section))
                node_ip = node_network.split("/")[0].strip()
                # check if the extracted IP is valid
                if not self.ip_is_valid(node_ip):
                    raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section))

                # first node : choose the private net as the cluster network (and make it a 'real' network)
                if network == "":
                    subnet = ipaddr.IPNetwork(node_network)
                    network = "%s/%s" % (str(subnet.network), str(subnet.prefixlen))
                # other nodes : check if their IP is part of the chosen net
                elif ipaddr.IPAddress(node_ip) not in ipaddr.IPNetwork(network):
                    raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network))

        return network
#}}}

#{{{ Get private network
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _ValidateEntry(self, entry):
    if not entry.subnet:
      return MISSING_SUBNET
    try:
      ipaddr.IPNetwork(entry.subnet)
    except ValueError:
      return BAD_IPV_SUBNET % entry.subnet
    parts = entry.subnet.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      return BAD_PREFIX_LENGTH % entry.subnet
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _ValidateEntry(self, entry):
    if not entry.subnet:
      return MISSING_SUBNET
    try:
      ipaddr.IPNetwork(entry.subnet)
    except ValueError:
      return BAD_IPV_SUBNET % entry.subnet
    parts = entry.subnet.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      return BAD_PREFIX_LENGTH % entry.subnet
项目:omni    作者:openstack    | 项目源码 | 文件源码
def is_private_network(cidr):
        return ipaddr.IPNetwork(cidr).is_private
项目:omni    作者:openstack    | 项目源码 | 文件源码
def is_private_network(cidr):
        return ipaddr.IPNetwork(cidr).is_private
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def test_min_max(net_str, target_prefix_len, exp_first, exp_last,
                 shoud_fail=False):
    failed = False
    try:
        net = ipaddr.IPNetwork(net_str)

#        assert net_str.lower() == "{}/{}".format(exp_first, net.prefixlen), \
#               ("String representations of original net and first expected "
#                "prefix don't match.")

        min_int, max_int, _ = usres_monitor.get_sre(net, target_prefix_len)
        first = usres_monitor.get_ip_repr(net.version, min_int)
        last = usres_monitor.get_ip_repr(net.version, max_int)

        res = "{}/{}".format(first, target_prefix_len)
        exp = "{}/{}".format(exp_first.lower(), target_prefix_len)
        assert res == exp, \
               ("First expected prefix doesn't match: {} vs {}".format(
                   res, exp)
               )

        res = "{}/{}".format(last, target_prefix_len)
        exp = "{}/{}".format(exp_last.lower(), target_prefix_len)
        assert res == exp, \
               ("Last expected prefix doesn't match: {} vs {}".format(
                   res, exp)
               )

    except AssertionError:
        failed = True
        if shoud_fail:
            pass
        else:
            raise

    assert shoud_fail == failed

    test_outcome("get_sre",
                 "{} in /{}".format(net_str, target_prefix_len),
                 "OK{}".format(" (failed as expected)" if shoud_fail else ""))
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_net(net):
        if isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network)):
            return net
        return ipaddr.IPNetwork(net)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def load_cluster_networks(self):
        self.cluster_net = None
        self.public_net = None

        osd = self.get_alive_osd()
        if osd is not None:
            cluster_net_str = osd.config.get('cluster_network')
            if cluster_net_str is not None and cluster_net_str != "":
                self.cluster_net = IPNetwork(cluster_net_str)

            public_net_str = osd.config.get('public_network', None)
            if public_net_str is not None and public_net_str != "":
                self.public_net = IPNetwork(public_net_str)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _ValidateEntry(self, entry):
    if not entry.subnet:
      return MISSING_SUBNET
    try:
      ipaddr.IPNetwork(entry.subnet)
    except ValueError:
      return BAD_IPV_SUBNET % entry.subnet
    parts = entry.subnet.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      return BAD_PREFIX_LENGTH % entry.subnet
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _ValidateEntry(self, entry):
    if not entry.subnet:
      return MISSING_SUBNET
    try:
      ipaddr.IPNetwork(entry.subnet)
    except ValueError:
      return BAD_IPV_SUBNET % entry.subnet
    parts = entry.subnet.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      return BAD_PREFIX_LENGTH % entry.subnet
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def test_0020_select_subnet_for_ip(self):
        subnets = dormitory.Subnet.q.order_by(dormitory.Subnet.gateway).all()
        for subnet in subnets:
            for ip in ipaddr.IPNetwork(subnet.address).iterhosts():
                selected = select_subnet_for_ip(ip.compressed, subnets)
                self.assertEqual(subnet, selected)
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def netmask(self):
        net = ipaddr.IPNetwork(self.address)
        return str(net.netmask)
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def ip_version(self):
        return ipaddr.IPNetwork(self.address).version
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def _ip_subnet_valid(self, ip, subnet):
        return ipaddr.IPAddress(ip) in ipaddr.IPNetwork(subnet.address)
项目:enjoliver    作者:JulienBalestra    | 项目源码 | 文件源码
def cni_ipam(host_cidrv4: str, host_gateway: str):
        """
        With the class variables provide a way to generate a static host-local ipam
        :param host_cidrv4:
        :param host_gateway:
        :return: dict
        """
        host = ipaddr.IPNetwork(host_cidrv4)
        subnet = host
        ip_cut = int(host.ip.__str__().split(".")[-1])
        if ConfigSyncSchedules.sub_ips:
            sub = ipaddr.IPNetwork(host.network).ip + (ip_cut * ConfigSyncSchedules.sub_ips)
            host = ipaddr.IPNetwork(sub)
        range_start = host.ip + ConfigSyncSchedules.skip_ips
        range_end = range_start + ConfigSyncSchedules.range_nb_ips
        ipam = {
            "type": "host-local",
            "subnet": "%s/%s" % (subnet.network.__str__(), subnet.prefixlen),
            "rangeStart": range_start.__str__(),
            "rangeEnd": range_end.__str__(),
            "gateway": host_gateway,
            "routes": [
                {"dst": "%s/32" % EC.perennial_local_host_ip, "gw": ipaddr.IPNetwork(host_cidrv4).ip.__str__()},
                {"dst": "0.0.0.0/0"},
            ],
            "dataDir": "/var/lib/cni/networks"
        }
        return ipam
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _ValidateEntry(self, entry):
    if not entry.subnet:
      return MISSING_SUBNET
    try:
      ipaddr.IPNetwork(entry.subnet)
    except ValueError:
      return BAD_IPV_SUBNET % entry.subnet
    parts = entry.subnet.split('/')
    if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
      return BAD_PREFIX_LENGTH % entry.subnet
项目:arouteserver    作者:pierky    | 项目源码 | 文件源码
def __init__(self, ip):
        if ip_library == "ipaddr":
            self.obj = ipaddr.IPNetwork(ip)
            self.ip = str(self.obj.ip)
        else:
            self.obj = ipaddress.ip_network(ip)
            self.ip = str(self.obj.network_address)

        self.prefixlen = self.obj.prefixlen
        self.max_prefixlen = self.obj.max_prefixlen
        self.version = self.obj.version
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def is_valid_url(url):
    """
    Tests a URL to ensure it doesn't appear to be a blacklisted IP range.
    """
    parsed = urlparse(url)
    if not parsed.hostname:
        return False

    server_hostname = get_server_hostname()

    if parsed.hostname == server_hostname:
        return True

    try:
        ip_address = socket.gethostbyname(parsed.hostname)
    except socket.gaierror:
        return False

    if ip_address == server_hostname:
        return True

    ip_network = IPNetwork(ip_address)
    for addr in DISALLOWED_IPS:
        if ip_network in addr:
            return False

    return True
项目:noc    作者:onfsdn    | 项目源码 | 文件源码
def __init__(self, vid, conf=None):
        if conf is None:
            conf = {}
        self.vid = vid
        self.tagged = []
        self.untagged = []
        self.name = conf.setdefault('name', str(vid))
        self.description = conf.setdefault('description', self.name)
        self.controller_ips = conf.setdefault('controller_ips', [])
        if self.controller_ips:
            self.controller_ips = [
                ipaddr.IPNetwork(ip) for ip in self.controller_ips]
        self.unicast_flood = conf.setdefault('unicast_flood', True)
        self.routes = conf.setdefault('routes', {})
        self.ipv4_routes = {}
        self.ipv6_routes = {}
        if self.routes:
            self.routes = [route['route'] for route in self.routes]
            for route in self.routes:
                ip_gw = ipaddr.IPAddress(route['ip_gw'])
                ip_dst = ipaddr.IPNetwork(route['ip_dst'])
                assert(ip_gw.version == ip_dst.version)
                if ip_gw.version == 4:
                    self.ipv4_routes[ip_dst] = ip_gw
                else:
                    self.ipv6_routes[ip_dst] = ip_gw
        self.arp_cache = {}
        self.nd_cache = {}
        self.max_hosts = conf.setdefault('max_hosts', None)
        self.host_cache = {}
项目:rest_api    作者:opentargets    | 项目源码 | 文件源码
def get(self):
        esstore = current_app.extensions['es_access_store']
        mpstore = current_app.extensions['mp_access_store']
        args = self.parser.parse_args()
        event = args['event'][:120]
        auth_token=request.headers.get('Auth-Token')
        ip_resolver = current_app.config['IP_RESOLVER']
        ip = RateLimiter.get_remote_addr()
        ip_net = IPNetwork(ip)
        resolved_org = ip_resolver['default']
        for net, org in ip_resolver.items():
            if isinstance(net, (IPv4Network, IPv6Network)):
                if net.overlaps(ip_net):
                    resolved_org = org
                    break
        data = dict(ip=ip,
                    org=resolved_org,
                    host=request.host,
                    timestamp=datetime.now(),
                    event=event)
        if auth_token:
            payload = TokenAuthentication.get_payload_from_token(auth_token)
            data['app_name'] = payload['app_name']
        # esstore.store_event(data)
        mpstore.store_event(data)
        data['timestamp']= str(data['timestamp'])
        return CTTVResponse.OK(SimpleResult(None, data=data))
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def parse_graph(self):
        """
        Parse  the xml graph file
        """
        deviceinfo = {}
        deviceroot = self.root.find(self.pngtag).find('Devices')
        devices = deviceroot.findall('Device')
        if devices is not None:
            for dev in devices:
                hostname = dev.attrib['Hostname']
                if hostname is not None:
                    deviceinfo[hostname] = {}
                    hwsku = dev.attrib['HwSku']
                    devtype = dev.attrib['Type']
                    deviceinfo[hostname]['HwSku'] = hwsku
                    deviceinfo[hostname]['Type'] = devtype
                    self.links[hostname] = {}
        devicel2info = {}
        devicel3s = self.root.find(self.dpgtag).findall('DevicesL3Info')
        devicel2s = self.root.find(self.dpgtag).findall('DevicesL2Info')
        if devicel2s is not None:
            for l2info in devicel2s:
                hostname = l2info.attrib['Hostname']
                if hostname is not None:
                    devicel2info[hostname] = {}
                    vlans = l2info.findall('InterfaceVlan')
                    for vlan in vlans:
                        portname = vlan.attrib['portname']
                        portmode = vlan.attrib['mode']
                        portvlanid = vlan.attrib['vlanids']
                        portvlanlist = self.port_vlanlist(portvlanid)
                        devicel2info[hostname][portname] = {'mode': portmode, 'vlanids': portvlanid, 'vlanlist': portvlanlist}
        if devicel3s is not None:
            for l3info in devicel3s:
                hostname = l3info.attrib['Hostname']
                if hostname is not None:
                    management_ip = l3info.find('ManagementIPInterface').attrib['Prefix']
                    deviceinfo[hostname]['ManagementIp'] = management_ip
                    mgmtip = ipaddress.IPNetwork(management_ip)
                    deviceinfo[hostname]['mgmtip'] = str(mgmtip.ip)
                    management_gw = str(mgmtip.network+1)
                    deviceinfo[hostname]['ManagementGw'] = management_gw
        allinks = self.root.find(self.pngtag).find('DeviceInterfaceLinks').findall('DeviceInterfaceLink')
        if allinks is not None:
            for link in allinks:
                start_dev = link.attrib['StartDevice']
                end_dev = link.attrib['EndDevice']
                if start_dev:
                    self.links[start_dev][link.attrib['StartPort']] = {'peerdevice':link.attrib['EndDevice'], 'peerport': link.attrib['EndPort']}
                if end_dev:
                    self.links[end_dev][link.attrib['EndPort']] = {'peerdevice': link.attrib['StartDevice'], 'peerport': link.attrib['StartPort']}
        self.devices = deviceinfo
        self.vlanport = devicel2info
项目:noc    作者:onfsdn    | 项目源码 | 文件源码
def add_controller_ips(self, controller_ips, vlan):
        ofmsgs = []
        for controller_ip in controller_ips:
            controller_ip_host = ipaddr.IPNetwork(
                '/'.join((str(controller_ip.ip),
                          str(controller_ip.max_prefixlen))))
            if controller_ip_host.version == 4:
                ofmsgs.append(self.valve_flowcontroller(
                    self.dp.eth_src_table,
                    self.valve_in_match(
                        eth_type=ether.ETH_TYPE_ARP,
                        nw_dst=controller_ip_host,
                        vlan=vlan),
                    priority=self.dp.highest_priority))
                ofmsgs.append(self.valve_flowcontroller(
                    self.dp.eth_src_table,
                    self.valve_in_match(
                        eth_type=ether.ETH_TYPE_IP,
                        eth_dst=self.FAUCET_MAC,
                        vlan=vlan,
                        nw_proto=inet.IPPROTO_ICMP,
                        nw_src=controller_ip,
                        nw_dst=controller_ip_host),
                    priority=self.dp.highest_priority))
            else:
                ofmsgs.append(self.valve_flowcontroller(
                    self.dp.eth_src_table,
                    self.valve_in_match(
                        eth_type=ether.ETH_TYPE_IPV6,
                        vlan=vlan,
                        nw_proto=inet.IPPROTO_ICMPV6,
                        ipv6_nd_target=controller_ip_host,
                        icmpv6_type=icmpv6.ND_NEIGHBOR_SOLICIT),
                    priority=self.dp.highest_priority))
                ofmsgs.append(self.valve_flowcontroller(
                    self.dp.eth_src_table,
                    self.valve_in_match(
                        eth_type=ether.ETH_TYPE_IPV6,
                        eth_dst=self.FAUCET_MAC,
                        vlan=vlan,
                        nw_proto=inet.IPPROTO_ICMPV6,
                        icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT),
                    priority=self.dp.highest_priority))
                ofmsgs.append(self.valve_flowcontroller(
                    self.dp.eth_src_table,
                    self.valve_in_match(
                        eth_type=ether.ETH_TYPE_IPV6,
                        vlan=vlan,
                        nw_proto=inet.IPPROTO_ICMPV6,
                        nw_dst=controller_ip_host,
                        icmpv6_type=icmpv6.ICMPV6_ECHO_REQUEST),
                    priority=self.dp.highest_priority))
        return ofmsgs