Python ipaddr 模块,IPAddress() 实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用ipaddr.IPAddress()

项目:farfetchd    作者:isislovecruft    | 项目源码 | 文件源码
def isIPAddress(ip, compressed=True):
    """Validate **ip** and hand it back in the requested representation.

    The value is first parsed with :func:`ipaddr.IPAddress`; a successfully
    parsed address is then additionally screened by :func:`isValidIP`.

    :type ip: basestring or int
    :param ip: The candidate IP address.
    :param boolean compressed: When True (the default), the result is the
        ``compressed`` string form of the parsed address. When False, the
        parsed :class:`ipaddr.IPAddress` object itself is returned.
    :rtype: A string, or a :class:`ipaddr.IPAddress`, or False
    :returns: The address in the requested form if it parsed and passed
        :func:`isValidIP`; ``False`` otherwise.
    """
    try:
        parsed = ipaddr.IPAddress(ip)
    except ValueError:
        # Not parseable as either IPv4 or IPv6.
        return False

    if not isValidIP(parsed):
        return False
    return parsed.compressed if compressed else parsed
项目:farfetchd    作者:isislovecruft    | 项目源码 | 文件源码
def isIPv(version, ip):
    """Check if **ip** parses as an IP address of the given **version**.

    .. warning: Do *not* put any calls to the logging module in this function,
        or else an infinite recursion will occur when the call is made, due
        to the log :class:`~logging.Filter`s in :mod:`~bridgedb.safelog`
        using this function to validate matches from the regular expression
        for IP addresses.

    :param integer version: The IPv[4|6] version to check; expected to be
        either ``4`` or ``6``. The value is forwarded unchanged to
        :func:`ipaddr.IPAddress`.
        NOTE(review): nothing in this function coerces other values to
        ``4``; how they are treated is decided by ``ipaddr`` -- confirm
        against the library before relying on it.
    :param ip: The IP address to check. May be any type which
        :class:`ipaddr.IPAddress` will accept.
    :rtype: boolean
    :returns: ``True`` if :func:`ipaddr.IPAddress` accepted **ip** for the
        requested **version**; ``False`` if it raised any exception.
    """
    try:
        ipaddr.IPAddress(ip, version=version)
    except Exception:
        # Deliberately broad: any failure to construct the address means
        # "no". The previous tuple also named ipaddr.AddressValueError,
        # which ``Exception`` already covers.
        return False
    return True
项目:sonic-buildimage    作者:Azure    | 项目源码 | 文件源码
def parse_device_desc_xml(filename):
    """Build a configuration dict from a device description XML file.

    The file is parsed with ``ET.parse`` and its root element is handed to
    ``parse_device``, which yields the loopback prefix, management prefix,
    hostname and hardware SKU used below (the fifth value is not used here).

    :param filename: Path (or file object) accepted by ``ET.parse``.
    :returns: A dict with three keys:
        ``DEVICE_METADATA`` (hostname and hwsku under ``'localhost'``),
        ``LOOPBACK_INTERFACE`` (keyed by ``('lo', lo_prefix)``) and
        ``MGMT_INTERFACE`` (keyed by ``('eth0', mgmt_prefix)``, carrying the
        computed ``gwaddr``).
    """
    root = ET.parse(filename).getroot()
    (lo_prefix, mgmt_prefix, hostname, hwsku, d_type) = parse_device(root)

    results = {}
    results['DEVICE_METADATA'] = {'localhost': {
        'hostname': hostname,
        'hwsku': hwsku,
        }}

    results['LOOPBACK_INTERFACE'] = {('lo', lo_prefix): {}}

    # The gateway is taken to be the first address after the network
    # address of the management prefix (network + 1).
    mgmtipn = ipaddress.IPNetwork(mgmt_prefix)
    gwaddr = ipaddress.IPAddress(int(mgmtipn.network) + 1)
    results['MGMT_INTERFACE'] = {('eth0', mgmt_prefix): {'gwaddr': gwaddr}}

    return results
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def ip_is_valid(self, ip):
        """
        Tell whether ``ip`` is accepted by ``ipaddr.IPAddress`` (v4 or v6).

        Returns True when the address can be constructed and False when the
        constructor raises ValueError.
        """
        try:
            ipaddr.IPAddress(ip)
        except ValueError:
            return False
        else:
            return True
#}}}

#{{{ Check if network is valid
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def get_network(self, net_type):
        """
        Determine the network (as a "<network>/<prefixlen>" string) shared by all node sections.

        The network is derived from the first node section's entry for 'net_type'; every
        later node's IP must lie inside it. An EXAConfError is raised if a node section has
        no (or an empty) entry for 'net_type', if the IP part of an entry is invalid, or if
        a later node's IP is outside the network taken from the first node.

        Callers are expected to have verified that the requested network type (private /
        public) is actually present; this function treats a missing entry as an error.
        Returns an empty string if no section is a node.
        """
        chosen_net = ""
        for sec_name in self.config.sections:
            if not self.is_node(sec_name):
                continue

            entry = self.config[sec_name].get(net_type)
            if not entry:
                raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, sec_name))

            # the entry has the form "<ip>/<prefix>"; only the IP part is validated here
            addr = entry.split("/")[0].strip()
            if not self.ip_is_valid(addr):
                raise EXAConfError("IP %s in section '%s' is invalid!" % (addr, sec_name))

            if chosen_net == "":
                # first node: its entry defines the network (normalized to the network address)
                first_net = ipaddr.IPNetwork(entry)
                chosen_net = "%s/%s" % (str(first_net.network), str(first_net.prefixlen))
                continue

            # every other node: its IP has to be inside the network of the first node
            if ipaddr.IPAddress(addr) not in ipaddr.IPNetwork(chosen_net):
                raise EXAConfError("IP %s is not part of network %s!" % (addr, chosen_net))

        return chosen_net
#}}}

#{{{ Get private network
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_ip_repr(ip_ver, net_int):
        """Convert an integer network value into an ``ipaddr`` address object.

        :param ip_ver: IP version; ``4`` selects IPv4, any other value is
            handled as IPv6.
        :param net_int: Integer form of the address. For IPv6 it is shifted
            left by 64 bits before conversion (presumably it holds only the
            upper 64 bits of the address -- confirm against the caller).
        :returns: The address object built by ``ipaddr.IPAddress``.

        The version is passed explicitly because ``ipaddr.IPAddress``
        otherwise guesses the family from the integer's size, so a small
        IPv6 value (e.g. ``0``) would be returned as an IPv4 address.
        """
        if ip_ver == 4:
            return ipaddr.IPAddress(net_int, version=4)
        return ipaddr.IPAddress(net_int << 64, version=6)
项目:dnsAutoRebinding    作者:Tr3jer    | 项目源码 | 文件源码
def analy_req(address):
    """Translate a queried name into the record value to answer with.

    The trailing ``len(maindomain) + 1`` characters are cut from ``address``
    (presumably the ``.<maindomain>`` suffix -- confirm the format of the
    ``maindomain`` setting), and the remainder is decoded according to the
    configured ``encoding``:

    * ``int``  -- the remainder is an integer, converted to its IP string.
    * ``hex``  -- the remainder is hex-decoded; the decoded text is used if
      it is an IPv4 address, or an IPv6 address while ``type`` is ``AAAA``.
    * ``en``   -- the remainder is passed through ``numToEnToNum``.
    * otherwise, if a payload is configured and does not contain the main
      domain, the record is ``payload + mainDomain``.

    :param address: The full queried name.
    :returns: The decoded record, or the stripped name when nothing could be
        decoded. Decoding errors are reported on stdout and never raised.
    """
    mainDomain = conf_read('maindomain')
    address = address[:-len(mainDomain) - 1]
    payload = conf_read('payload')
    encoding = conf_read('encoding')
    record = address

    try:
        if encoding == 'int':
            record = str(ipaddr.IPAddress(int(address)))
        elif encoding == 'hex':
            try:
                address = address.decode('hex')
                version = ipaddr.IPAddress(address).version
                if version == 4 or (version == 6 and conf_read('type') == 'AAAA'):
                    record = address
            except Exception:
                # Best effort: an undecodable label keeps the stripped name.
                pass
        elif encoding == 'en':
            record = numToEnToNum(address)
        elif payload != 'None' and payload.find(mainDomain) == -1:
            record = payload + mainDomain
    except Exception as e:
        print('[!] Subdomain Invalid {}'.format(e))

    # Returned after the try block rather than from a ``finally`` clause, so
    # that KeyboardInterrupt / SystemExit are no longer silently discarded.
    return record
项目:dnsAutoRebinding    作者:Tr3jer    | 项目源码 | 文件源码
def ipListBuild(address):
    """Interactively convert one IP, or write a host list for a network.

    Option 1 prints ``numToEnToNum(address)`` and exits. Option 2 asks for a
    prefix length, an encoding and a root domain, enumerates the hosts of
    ``address/<prefix length>`` and appends one ``<encoded host>.<domain>``
    line per host to a timestamped text file.

    Supported encodings: empty input (dotted IPv4), ``en``, ``int`` and
    ``hex``; any other value produces an empty file.

    :param address: IPv4 address (option 1) or network base address (option 2).
    """
    print('1. Single IP Covert For En\n2. Build IP List')
    opt_req = raw_input("[+] [1 By Default/2]") or '1'
    if opt_req == '1':
        print(numToEnToNum(address))
        exit()

    conf_main = conf_read('maindomain')[:-1]
    seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24
    encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]")
    mainDomain = raw_input("[+] Please Input Server Root Address [{} By Default]".format(conf_main)) or conf_main
    # Call the constructor directly: the previous eval() of a formatted
    # string would have executed arbitrary code embedded in ``address``.
    segment = ipaddr.IPv4Network('{}/{}'.format(address, int(seg_len))).iterhosts()
    save_file = "{}_{}_{}.txt".format(time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''), mainDomain.replace('.','_'),(encode_req if encode_req else 'ipv4'))
    results = []

    try:
        if encode_req == '':
            results += ["{}.{}".format(str(i), mainDomain) for i in segment]
        elif encode_req == 'en':
            results += ["{}.{}".format(numToEnToNum(str(i)), mainDomain) for i in segment]
        elif encode_req == 'int':
            results += ["{}.{}".format(int(ipaddr.IPAddress(str(i))), mainDomain) for i in segment]
        elif encode_req == 'hex':
            results += ["{}.{}".format(str(i).encode('hex'), mainDomain) for i in segment]

        # The context manager closes the file even if a write fails.
        with open(save_file, 'a') as f:
            f.writelines(line + '\n' for line in results)
        print('[+] Stored in the {}'.format(save_file))
    except Exception as e:
        print(e)
        exit()
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def __init__(self, max_length=50, *args, **kwargs):
        """Set up the type with a Unicode implementation of ``max_length``.

        Raises ImproperlyConfigured when ``ip_address`` is falsy, i.e. no IP
        address implementation is available.
        """
        if not ip_address:
            message = (
                "'ipaddr' package is required to use 'IPAddressType' "
                "in python 2"
            )
            raise ImproperlyConfigured(message)

        super(IPAddressType, self).__init__(*args, **kwargs)
        self.impl = types.Unicode(max_length)
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def process_result_value(self, value, dialect):
        """Turn a stored value into an ``ip_address`` object.

        Falsy values (``None``, empty string, ...) yield ``None``; ``dialect``
        is not used.
        """
        if not value:
            return None
        return ip_address(value)
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def _coerce(self, value):
        """Convert ``value`` with ``ip_address``; falsy values become ``None``."""
        if not value:
            return None
        return ip_address(value)
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def _ip_subnet_valid(self, ip, subnet):
        """Return whether ``ip`` lies inside the network ``subnet.address``."""
        candidate = ipaddr.IPAddress(ip)
        network = ipaddr.IPNetwork(subnet.address)
        return candidate in network
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def select_subnet_for_ip(ip, subnets):
    """Return the first subnet whose IPv4 network contains ``ip``.

    :param ip: Address in any form ``ipaddr.IPAddress`` accepts.
    :param subnets: Iterable of objects with an ``address`` attribute that
        ``ipaddr.IPv4Network`` can parse.
    :returns: The first matching subnet, or ``None`` if none matches.
    """
    matching = (
        candidate for candidate in subnets
        if ipaddr.IPAddress(ip) in ipaddr.IPv4Network(candidate.address)
    )
    return next(matching, None)
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def __init__(self, server_address, handler, *args, **kwargs):
        """Pick the socket family from the bind address, then start the server.

        The required ``session`` keyword argument is removed from ``kwargs``
        and stored on the instance. ``server_address`` must be a two-element
        ``(address, port)`` pair; the IP version of ``address`` decides
        whether ``address_family`` is set to AF_INET or AF_INET6 before
        ``BaseHTTPServer.HTTPServer.__init__`` runs.
        """
        self.session = kwargs.pop("session")
        host, _port = server_address
        ip_version = ipaddr.IPAddress(host).version
        if ip_version == 6:
            self.address_family = socket.AF_INET6
        elif ip_version == 4:
            self.address_family = socket.AF_INET

        BaseHTTPServer.HTTPServer.__init__(
            self, server_address, handler, *args, **kwargs)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def buildSocket(self, addr):
        """Create a TCP socket matching the IP version of ``addr``.

        The new socket is stored in the module-level global ``s`` and
        returned. If the version is neither 4 nor 6, the current global
        ``s`` is returned untouched.
        """
        global s
        version = ipaddr.IPAddress(addr).version  # IPv4 or IPv6?
        if version == 4:
            family = socket.AF_INET
        elif version == 6:
            family = socket.AF_INET6
        else:
            return s
        s = socket.socket(family, socket.SOCK_STREAM)
        return s
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def contains(self, ip_address):
        """Return True if any stored network of the matching family contains the address.

        The address is parsed with ``ipaddr.IPAddress``; IPv4 addresses are
        checked against ``self.ipv4_networks`` and IPv6 addresses against
        ``self.ipv6_networks``. A RuntimeError is raised if the parsed
        object is of neither type.
        """
        parsed = ipaddr.IPAddress(ip_address)
        if isinstance(parsed, ipaddr.IPv4Address):
            candidates = self.ipv4_networks
        elif isinstance(parsed, ipaddr.IPv6Address):
            candidates = self.ipv6_networks
        else:
            raise RuntimeError("Should never happen")
        return any(net.Contains(parsed) for net in candidates)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def getAddresses():
    """Collect the addresses of the local network interfaces.

    Every interface reported by scapy's ``get_if_list`` is queried with
    ``get_if_addr``; interfaces whose lookup fails are skipped. The
    placeholder address ``0.0.0.0`` is dropped.

    :returns: A list of ``ipaddr.IPAddress`` objects, one per distinct
        address, in no particular order.
    """
    from scapy.all import get_if_addr, get_if_list
    from ipaddr import IPAddress

    addresses = set()
    for i in get_if_list():
        try:
            addresses.add(get_if_addr(i))
        except Exception:
            # Best effort: skip interfaces that cannot be queried, but let
            # KeyboardInterrupt / SystemExit through (a bare ``except``
            # would swallow those too).
            pass
    addresses.discard('0.0.0.0')
    return [IPAddress(addr) for addr in addresses]
项目:arouteserver    作者:pierky    | 项目源码 | 文件源码
def __init__(self, ip):
        """Parse ``ip`` with whichever IP library is configured.

        Sets ``obj`` (the parsed address object), ``ip`` (its string form;
        the ``compressed`` form when the ``ipaddress`` library is used) and
        ``version`` (taken from the parsed object).
        """
        if ip_library == "ipaddr":
            parsed = ipaddr.IPAddress(ip)
            text = str(parsed)
        else:
            parsed = ipaddress.ip_address(ip)
            text = str(parsed.compressed)

        self.obj = parsed
        self.ip = text
        self.version = parsed.version
项目:noc    作者:onfsdn    | 项目源码 | 文件源码
def __init__(self, vid, conf=None):
        """Initialise the VLAN from its id and an optional config dict.

        Missing keys are filled into ``conf`` via ``setdefault`` (so the
        passed dict is updated in place): ``name`` (defaults to
        ``str(vid)``), ``description`` (defaults to the name),
        ``controller_ips``, ``unicast_flood``, ``routes`` and ``max_hosts``.

        Non-empty ``controller_ips`` are converted to ``ipaddr.IPNetwork``
        objects. Non-empty ``routes`` are reduced to their ``'route'``
        entries, and each one is recorded as destination network -> gateway
        in ``ipv4_routes`` or ``ipv6_routes`` according to the gateway's IP
        version. Gateway and destination must share the same version
        (enforced with ``assert``).
        """
        if conf is None:
            conf = {}
        self.vid = vid
        self.tagged = []
        self.untagged = []
        self.name = conf.setdefault('name', str(vid))
        self.description = conf.setdefault('description', self.name)

        controller_ips = conf.setdefault('controller_ips', [])
        if controller_ips:
            controller_ips = [ipaddr.IPNetwork(ip) for ip in controller_ips]
        self.controller_ips = controller_ips

        self.unicast_flood = conf.setdefault('unicast_flood', True)
        self.routes = conf.setdefault('routes', {})
        self.ipv4_routes = {}
        self.ipv6_routes = {}
        if self.routes:
            self.routes = [entry['route'] for entry in self.routes]
            for route in self.routes:
                gateway = ipaddr.IPAddress(route['ip_gw'])
                destination = ipaddr.IPNetwork(route['ip_dst'])
                assert(gateway.version == destination.version)
                if gateway.version == 4:
                    table = self.ipv4_routes
                else:
                    table = self.ipv6_routes
                table[destination] = gateway
        self.arp_cache = {}
        self.nd_cache = {}
        self.max_hosts = conf.setdefault('max_hosts', None)
        self.host_cache = {}
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def evaluate(self, hints):
        """Decide whether the requester's address is a non-excluded bogon.

        ``hints`` is the information used for doing identification; only
        its ``'requester'`` entry (an IP address string) is consulted.

        Returns False if there is no requester, if the fullbogons lookup
        gives NXDOMAIN or anything other than 127.0.0.2, or if the address
        falls inside one of ``self.exclude``. Returns ``self.fail_result``
        when the lookup times out, has no answer or no nameservers.
        Returns True otherwise.
        """
        try:
            requester = hints['requester']
        except KeyError:
            return False

        # Reverse-name labels without the trailing three labels of the
        # reverse zone.
        labels = dns.reversename.from_address(requester)[0:-3]
        if len(labels) == 4:
            zone = '.v4.fullbogons.cymru.com'
        else:
            zone = '.v6.fullbogons.cymru.com'
        lookup_name = '.'.join(labels) + zone

        # An 'A' record is queried in every case, IPv6 hosts included.
        try:
            answer = self.resolver.query(lookup_name, 'A')[0]
        except dns.resolver.NXDOMAIN:
            return False
        except (dns.exception.Timeout,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            return self.fail_result

        # 127.0.0.2 is the answer given for addresses on the bogon list;
        # see http://www.team-cymru.org/bogon-reference-dns.html.
        if str(answer) != '127.0.0.2':
            return False

        # It is a bogon; it only counts if no exclusion covers it.
        candidate = ipaddr.IPAddress(requester)
        return not any(candidate in excluded for excluded in self.exclude)


# A short test program
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def evaluate(self, hints):
        """Identify the requester by forward-confirmed reverse DNS.

        ``hints`` is the information used for doing identification; only
        its ``'requester'`` entry (an IP address string) is consulted.

        The address is resolved to a name via a PTR query, the name is
        resolved back (A or AAAA depending on the IP version) and must
        include the original address, and the name -- with or without its
        trailing dot -- must satisfy ``self.matcher``. Any missing hint,
        failed lookup or mismatch yields False.
        """
        try:
            requester = hints['requester']
        except KeyError:
            return False

        parsed = ipaddr.IPAddress(requester)
        reverse_name = dns.reversename.from_address(requester)

        # Step 1: address -> FQDN.
        try:
            fqdn = str(self.resolver.query(reverse_name, 'PTR')[0])
        except (dns.resolver.NXDOMAIN,
                dns.exception.Timeout,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            return False

        # Step 2: FQDN -> addresses. Requiring the original address among
        # them stops whoever controls the reverse zone from claiming an
        # arbitrary name.

        # TODO: Check against _all_ returned IPs

        if parsed.version == 4:
            record_type = 'A'
        else:
            record_type = 'AAAA'
        try:
            answers = self.resolver.query(fqdn, record_type)
        except (dns.resolver.NXDOMAIN,
                dns.exception.Timeout,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            return False

        forward_ips = [str(answer) for answer in answers]
        if requester not in forward_ips:
            return False

        # Step 3: match the name both with and without the final dot.
        candidates = (fqdn, fqdn.rstrip('.'))
        return any(self.matcher.matches(name) for name in candidates)



# A short test program
项目:scanapi    作者:mozilla    | 项目源码 | 文件源码
def _pass_hostinfo(self, entry):
        """Fold host-level details from one scan entry into ``self._state``.

        ``entry`` must provide ``'host'`` and ``'output'``. The per-host
        state dict (created on first sight of the host) is updated with:

        * ``hostname``  -- defaults to the host key; replaced by the name
          found in "uname -a" or "Computer name" plugin output, if present.
        * ``ipaddress`` -- the host key itself when it is an IP address,
          otherwise ``host-ip`` from the supplementary host info.
        * ``os``        -- ``operating-system`` from the host info, if any.
        * ``credentialed_checks`` -- set to True once the output reports
          "Credentialed checks : yes".

        The state is stored back under ``self._state[entry['host']]``.
        """
        host = entry['host']
        if host not in self._state:
            s = {
                    'vulnerabilities':        [],
                    'exempt_vulnerabilities': [],
                    'ports':                  set(),
                    'hostname':               None,
                    'ipaddress':              None,
                    'os':                     None,
                    'credentialed_checks':    False
                    }
        else:
            s = self._state[host]

        # if the hostname has not been set yet, just default it to the key/target
        # value
        if s['hostname'] is None:
            s['hostname'] = host

        thishostinfo = self._hostinfo_locator(entry)

        # attempt to determine the ip address; if our target is an ip just use that,
        # otherwise try to locate the ip address using the supplementary host info
        try:
            ipaddr.IPAddress(host)
        except ValueError:
            # ValueError is what ipaddr raises for a non-IP string; a bare
            # except here would also have hidden KeyboardInterrupt.
            if thishostinfo is not None:
                s['ipaddress'] = thishostinfo['host-ip']
        else:
            s['ipaddress'] = host

        if thishostinfo is not None and 'operating-system' in thishostinfo:
            s['os'] = thishostinfo['operating-system']

        # attempt to extract kernel hostname
        output = entry['output']
        if 'output of "uname -a" is' in output:
            unamestr = output.replace('\n', ' ')
            m = re.search(r'output of "uname -a" is : Linux (\S+) ', unamestr)
            if m is not None:
                s['hostname'] = m.group(1)
        elif '= Computer name' in output:
            cnamestr = output.replace('\n', ' ')
            m = re.search(r'(\S+)\s+= Computer name', cnamestr)
            if m is not None:
                s['hostname'] = m.group(1)

        # flip credentialed checks if we find plugin output indicating the scan
        # included successfully used credentials
        if 'Credentialed checks : yes' in output:
            s['credentialed_checks'] = True

        self._state[host] = s
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def load_hosts(self):
        """Populate ``self.hosts`` with one ``Host`` per stored host entry.

        For each host name, the stored data is read and used to fill in
        hardware info, memory/swap figures, a load value, the public and
        cluster network adapters, all network adapters, per-adapter
        performance stats and uptime.
        """
        # NOTE(review): index 2 of self.storage.hosts presumably holds the
        # host names -- confirm against the storage implementation.
        for host_name in self.storage.hosts[2]:
            stor_node = self.storage.get("hosts/" + host_name, expected_format=None)

            host = Host(host_name)
            self.hosts[host.name] = host

            lshw_xml = stor_node.get('lshw', expected_format='xml')

            # Hardware info is optional: missing data or any parsing failure
            # leaves hw_info as None.
            if lshw_xml is None:
                host.hw_info = None
            else:
                try:
                    host.hw_info = get_hw_info(lshw_xml)
                except:
                    host.hw_info = None

            info = self.parse_meminfo(stor_node.get('meminfo'))
            host.mem_total = info['MemTotal']
            host.mem_free = info['MemFree']
            host.swap_total = info['SwapTotal']
            host.swap_free = info['SwapFree']
            loadavg = stor_node.get('loadavg')

            # Second whitespace-separated field of the loadavg data, if present.
            host.load_5m = None if loadavg is None else float(loadavg.strip().split()[1])

            ipa = self.storage.get('hosts/%s/ipa' % host.name)
            # Matches lines of the form "<n>: <adapter> inet <a.b.c.d>/<size>".
            ip_rr_s = r"\d+:\s+(?P<adapter>.*?)\s+inet\s+(?P<ip>\d+\.\d+\.\d+\.\d+)/(?P<size>\d+)"

            # adapter name -> list of (IPAddress, prefix size) pairs
            info = collections.defaultdict(lambda: [])
            for line in ipa.split("\n"):
                match = re.match(ip_rr_s, line)
                if match is not None:
                    info[match.group('adapter')].append(
                        (IPAddress(match.group('ip')), int(match.group('size'))))

            # An adapter whose IP lies in the public / cluster network becomes
            # the host's public / cluster adapter (a later match overwrites
            # an earlier one).
            for adapter, ips_with_sizes in info.items():
                for ip, sz in ips_with_sizes:
                    if self.public_net is not None and ip in self.public_net:
                        host.public_net = NetworkAdapter(adapter, ip)

                    if self.cluster_net is not None and ip in self.cluster_net:
                        host.cluster_net = NetworkAdapter(adapter, ip)

            interfaces = getattr(self.jstorage.hosts, host_name).interfaces
            for name, adapter_dct in interfaces.items():
                # Work on a copy so that popping 'dev' leaves the stored data intact.
                adapter_dct = adapter_dct.copy()

                dev = adapter_dct.pop('dev')
                adapter = NetworkAdapter(dev, None)
                # The remaining keys become attributes of the adapter.
                adapter.__dict__.update(adapter_dct)
                host.net_adapters[dev] = adapter

            net_stats = self.get_node_net_stats(host.name)
            perf_adapters = [host.cluster_net, host.public_net] + list(host.net_adapters.values())

            # Attach the stats recorded under each adapter's name, if any.
            for net in perf_adapters:
                if net is not None and net.name is not None:
                    net.perf_stats = net_stats.get(net.name)

            # First whitespace-separated field of the uptime data.
            host.uptime = float(stor_node.get('uptime').split()[0])