Python IPy 模块,IP 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用IPy.IP

项目:FCParser    作者:josecamachop    | 项目源码 | 文件源码
def equals(self, raw_value):
        """Compares this IP address to a given one, OR
        Checks this IP address matchtype.
        Suported matchtypes: 'private', 'public'.

        raw_value -- Specific IP address, OR matchtype of IP.
        """
        if self.value is None:
            output = False
        elif raw_value == 'private':
            output = (self.value.iptype() == 'PRIVATE')
        elif raw_value == 'public':
            output = (self.value.iptype() == 'PUBLIC')
        else:
            value = self.load(raw_value)
            output = (self.value == value)

        return output
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def _parse_heal_info_stats(tree):
    bricks_dict = {}
    for brick in tree.findall("healInfo/bricks/brick"):
        brick_name = brick.find("name").text
        brick_host = brick_name.split(":")[0]
        brick_path = brick_name.split(":")[1]

        # If brick host is returned as an IP conver to FQDN
        try:
            from IPy import IP
            from dns import resolver, reversename
            IP(brick_host)
            addr = reversename.from_address(brick_host)
            brick_host = str(resolver.query(addr, "PTR")[0])[:-1]
        except ValueError:
            pass

        no_of_entries = 0
        try:
            no_of_entries = int(brick.find("numberOfEntries").text)
        except ValueError:
            no_of_entries = 0
        bricks_dict["%s:%s" % (brick_host, brick_path)] = no_of_entries
    return bricks_dict
项目:pr0xy    作者:VillanCh    | 项目源码 | 文件源码
def generate_target(IPs, ports):
    """Generate the target for scanning HTTP proxy"""

    gen = None
    if '-' in IPs:
        pairs = IPs.split('-')
        start = pairs[0]
        end = pairs[1]
        gen = int2ips(IP(start).int(), IP(end).int())
    else:
        gen = IP(IPs)

    for i in gen:
        for port in ports:
            if isinstance(port, int):
                port = str(port)
            yield ':'.join([i.__str__(), port])

#----------------------------------------------------------------------
项目:pr0xy    作者:VillanCh    | 项目源码 | 文件源码
def runTest(self):
        IPs_1 = '123.1.2.0/24'
        IPs_2 = '123.1.2.3-123.1.3.5'
        ip_gen = generate_target(IPs_1, PORTS)
        for i in ip_gen:
            ip_port = i.split(':')
            print ip_port
            try:
                IPy.IP(ip_port[0])
            except ValueError:
                pass

        ip_gen = generate_target(IPs_2, PORTS)
        for i in ip_gen:
            ip_port = i.split(':')
            print ip_port
            try:
                IPy.IP(ip_port[0])
            except ValueError:
                pass
项目:g3ar    作者:VillanCh    | 项目源码 | 文件源码
def is_ipv4(ip):
    """Check if the [ip] is a real ip addr.

    Params:
        ip: :str: General IP format.

    Returns:
        :type: bool
        :desc: if ip is a valid IP addr, return true
            else return False"""
    try:
        IP(ip)
        return True
    except ValueError:
        return False

#----------------------------------------------------------------------
项目:fenghuangscanner_v3    作者:0xwindows    | 项目源码 | 文件源码
def getips(self,ip):
        iplist=[]
        try:
            if "-" in ip.split(".")[3]:
                startnum=int(ip.split(".")[3].split("-")[0])
                endnum=int(ip.split(".")[3].split("-")[1])
                for i in range(startnum,endnum):
                    iplist.append("%s.%s.%s.%s" %(ip.split(".")[0],ip.split(".")[1],ip.split(".")[2],i))
            else:
                ips=IP(ip)
                for i in ips:
                    iplist.append(str(i))

            return iplist

        except:
            printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
            exit()
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def getips(self,ip):
        iplist=[]
        try:
            if "-" in ip.split(".")[3]:
                startnum=int(ip.split(".")[3].split("-")[0])
                endnum=int(ip.split(".")[3].split("-")[1])
                for i in range(startnum,endnum):
                    iplist.append("%s.%s.%s.%s" %(ip.split(".")[0],ip.split(".")[1],ip.split(".")[2],i))
            else:
                ips=IP(ip)
                for i in ips:
                    iplist.append(str(i))

            return iplist

        except:
            printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
            exit()
项目:check_wan_latency    作者:computingbee    | 项目源码 | 文件源码
def checkstate(lat):
    if lat < 0:
        print "UNKNOWN: {0}ms, something must have gone wrong or your IP is unpingable".format(lat)
        sys.exit(STATE_UNKNOWN)
    elif TS_AVGLATW <= 0 or TS_AVGLATC <= 0:
        print "OK: {0}ms from {1}, no thresholds given or you just want see values first".format(lat,CITY)
        sys.exit(STATE_OK)
    elif TS_AVGLATW <= lat < TS_AVGLATC:
        print "WARNING: {0}ms from {1} is more than {2}ms but less than {3}ms for ip address {4}".format(lat,CITY,TS_AVGLATW,TS_AVGLATC,WAN_IP)
        sys.exit(STATE_WARNING)
    elif lat >= TS_AVGLATC:
        print "CRITICAL: {0}ms from {1} more than {2}ms for ip address {3}".format(lat,CITY,TS_AVGLATC,WAN_IP)
        sys.exit(STATE_CRITICAL)
    else:
        print "OK: {0}ms from {1}".format(lat,CITY)
        sys.exit(STATE_OK)
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def authorize_middleware(app, check_env_for_admin=_no_admin):
    def wsgi_app(environ, start_response):
        # deferred.py needs REMOTE_ADDR set to a specific value,
        # so set it here if we're an internal request
        private_ip = IPy.IP(environ['REMOTE_ADDR']).iptype() == 'PRIVATE'
        if private_ip and _check_for_builtin(environ):
            environ['REMOTE_ADDR'] = '0.1.0.2'

        if _check_for_builtin(environ) or check_env_for_admin(environ):
            return app(environ, start_response)
        else:
            logging.warning('Failed to authorize request, environment is: %s', environ)
            status = '403 Forbidden'
            headers = [('Content-type', 'text/plain')]
            start_response(status, headers)
            return ['Forbidden']

    return wsgi_app
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def get_location_data_for(ip):
    if not ip:
        return {}
    # Check the common private IP spaces (used by Google for sending requests)
    if IPy.IP(ip).iptype() == 'PRIVATE':
        return {}
    start = time.time()
    data = _get_cache(ip)
    timelog.log_time_since('Getting IP Cache', start)
    if not data:
        #TODO: consider using http://geoiplookup.net/ , which might offer better granularity/resolution
        url = 'http://freegeoip.net/json/%s' % ip
        start = time.time()
        results = urllib.urlopen(url).read()
        timelog.log_time_since('Getting IPData', start)
        data = json.loads(results)
        start = time.time()
        _save_cache(ip, data)
        timelog.log_time_since('Saving IPCache', start)
    return data
项目:dreamr-botnet    作者:YinAndYangSecurityAwareness    | 项目源码 | 文件源码
def isPublic(self, ipaddr):
        if (Dreamr.server == True):
            return True
        else:

            # Check if host's interface address is public
            addrType = IPy.IP(ipaddr).iptype()
            if (addrType == "PUBLIC"):
                print("BECOME_SERVER")
                return True

            # Server Debug
            if (DEBUG_MODE):
                print("BECOME_SERVER_DEBUG")
                return True
        return False

# SERVER HANDLER THREAD | Fork off a client thread to handle the socket
# This is a connected client or server!
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def filter_active_router_addresses(gwportprefixes):
    """Filters a GwPortPrefix queryset, leaving only active router addresses.

    For any given prefix, if multiple router addresses exist, the lowest IP
    address will be picked.  If the prefix has a virtual address, it will be
    picked instead.

    :param gwportprefixes: A GwPortPrefix QuerySet.
    :returns: A list of GwPortPrefix objects.

    """
    # It is more or less impossible to get Django's ORM to generate the
    # wonderfully complex SQL needed for this, so we do it by hand.
    raddrs = gwportprefixes.order_by('prefix__id', '-virtual', 'gw_ip')
    grouper = groupby(raddrs, attrgetter('prefix_id'))
    return [next(group) for _key, group in grouper]
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def usage(self, request, *args, **kwargs):
        "Return usage for Prefix.pk == pk"
        pk = kwargs.pop("pk", None)
        prefix = Prefix.objects.get(pk=pk)
        max_addr = IP(prefix.net_address).len()
        active_addr = prefix_collector.collect_active_ip(prefix)
        # calculate allocated ratio
        query = PrefixQuerysetBuilder().within(prefix.net_address)
        allocated = query.finalize().exclude(vlan__net_type="scope")
        total_allocated = sum(p.get_prefix_size() for p in allocated)
        payload = {
            "max_addr": max_addr,
            "active_addr": active_addr,
            "usage": 1.0 * active_addr / max_addr,
            "allocated": 1.0 * total_allocated / max_addr,
            "pk": pk
        }
        return Response(payload, status=status.HTTP_200_OK)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def filter_full_prefixes(self):
        """Remove /32 (or /128) prefixes from the queryset. Often useful to
        reduce noise, as these are of little or no value to most network
        planning operations.

        Returns:
            A lazy iterator of all filtered prefixes

        """
        def _filter_full_prefixes(q):
            for prefix in q:
                ip = IP(prefix.net_address)
                if ip.version() == 4 and ip.prefixlen() < 32:
                    yield prefix
                    continue
                if ip.version() == 6 and ip.prefixlen() < 128:
                    yield prefix
                    continue
        self.post_hooks.append(_filter_full_prefixes)
        return self
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def _get_available_subnets(prefix_or_prefixes, used_prefixes):
    """Get available prefixes within a list of CIDR addresses, based on what
    prefixes are in use. E.g. this is `get_available_subnets`, but with
    explicit dependency injection.

    Args:
        prefix_or_prefixes: a single or a list of prefixes ("10.0.0.0/8") or
                          IPy.IP objects
        used_prefixes: prefixes that are in use

    Returns:
           An iterable IPy.IPSet of available addresses within prefix_or_prefixes

    """
    if not isinstance(prefix_or_prefixes, list):
        prefix_or_prefixes = [prefix_or_prefixes]
    base_prefixes = [str(prefix) for prefix in prefix_or_prefixes]
    acc = IPSet([IP(prefix) for prefix in prefix_or_prefixes])
    used_prefixes = IPSet([IP(used) for used in used_prefixes])
    # remove used prefixes
    acc.discard(used_prefixes)
    # filter away original prefixes
    return sorted([ip for ip in acc if str(ip) not in base_prefixes])
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def process_searchform(form):
    """Get searchresults based on form data"""
    extra = {}
    kwargs = {
        'last_changed__gte': datetime.now() - timedelta(
            days=form.cleaned_data['days'])
    }

    if form.cleaned_data['searchtype'] == 'ip':
        ip = IP(form.cleaned_data['searchvalue'])
        if ip.len() == 1:
            kwargs['ip'] = str(ip)
        else:
            extra['where'] = ["ip << '%s'" % str(ip)]
    else:
        key = form.cleaned_data['searchtype'] + '__icontains'
        kwargs[key] = form.cleaned_data['searchvalue']

    if form.cleaned_data['status'] != 'any':
        kwargs['status'] = form.cleaned_data['status']

    return Identity.objects.filter(**kwargs).extra(**extra)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __init__(self, host):
        if isinstance(host, Host):
            self.host = host.host
            self.ip = host.ip
            self.hostname = host.hostname
            return
        elif not host:
            raise ValueError("Host argument must be hostname or IP address")

        self.host = host
        if self.is_ip():
            self.ip = host
            self.hostname = self.get_host_by_addr() or host
        else:
            self.hostname = host
            self.ip = self.get_host_by_name() or None
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get_queryset(self):
        """Filter for ip family"""
        if 'scope' in self.request.GET:
            queryset = (manage.Prefix.objects.within(
                self.request.GET.get('scope')).select_related('vlan')
                        .order_by('net_address'))
        elif self.request.GET.get('family'):
            queryset = manage.Prefix.objects.extra(
                where=['family(netaddr)=%s'],
                params=[self.request.GET['family']])
        else:
            queryset = manage.Prefix.objects.all()

        # Filter prefixes that is smaller than minimum prefix length
        results = [p for p in queryset
                   if IP(p.net_address).len() >= MINIMUMPREFIXLENGTH]

        return results
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get(request, prefix):
        """Handles get request for prefix usage"""

        try:
            ip_prefix = IP(prefix)
        except ValueError:
            return Response("Bad prefix", status=status.HTTP_400_BAD_REQUEST)

        if ip_prefix.len() < MINIMUMPREFIXLENGTH:
            return Response("Prefix is too small",
                            status=status.HTTP_400_BAD_REQUEST)

        starttime, endtime = get_times(request)
        db_prefix = manage.Prefix.objects.get(net_address=prefix)
        serializer = serializers.PrefixUsageSerializer(
            prefix_collector.fetch_usage(db_prefix, starttime, endtime))

        return Response(serializer.data)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __init__(self, prefix, active_addresses, starttime=None, endtime=None):
        """

        :type prefix: manage.Prefix
        :type active_addresses: int
        :type starttime: datetime.datetime
        :type endtime: datetime.datetime
        """
        self.prefix = prefix.net_address
        self.active_addresses = active_addresses
        self.max_addresses = IP(self.prefix).len()
        self.max_hosts = self.max_addresses - 2
        self.usage = self.active_addresses / float(self.max_hosts) * 100
        self.starttime = starttime
        self.net_ident = prefix.vlan.net_ident
        self.vlan_id = prefix.vlan.vlan
        self.endtime = endtime if self.starttime else None
        self.url_machinetracker = reverse(
            'machinetracker-prefixid_search_active', args=[prefix.pk])
        self.url_report = reverse('report-prefix-prefix', args=[prefix.pk])
        self.url_vlan = reverse('vlan-details', args=[prefix.vlan.pk])
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def hostname(ip):
    """
    Performs a DNS reverse lookup for an IP address and caches the result in
    a global variable, which is really, really stupid.

    :param ip: And IP address string.
    :returns: A hostname string or a False value if the lookup failed.

    """
    addr = unicode(ip)
    if addr in _cached_hostname:
        return _cached_hostname[addr]

    try:
        dns = gethostbyaddr(addr)
    except herror:
        return False

    _cached_hostname[addr] = dns[0]
    return dns[0]
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get_prefix_info(addr):
    """Returns the smallest prefix from the NAVdb that an IP address fits into.

    :param addr: An IP address string.
    :returns: A Prefix object or None if no prefixes matched.

    """
    try:
        return Prefix.objects.select_related().extra(
            select={"mask_size": "masklen(netaddr)"},
            where=["%s << netaddr AND nettype <> 'scope'"],
            order_by=["-mask_size"],
            params=[addr]
        )[0]
    except (IndexError, DatabaseError):
        return None
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def normalize_ip_to_string(ipaddr):
    """Normalizes an IP address to a a sortable string.

    When sending IP addresses to a browser and asking JavaScript to sort them
    as strings, this function will help.

    An IPv4 address will be normalized to '4' + <15-character dotted quad>.
    An IPv6 address will be normalized to '6' + <39 character IPv6 address>

    """
    try:
        ipaddr = IP(ipaddr)
    except ValueError:
        return ipaddr

    if ipaddr.version() == 4:
        quad = str(ipaddr).split('.')
        return '4%s' % '.'.join([i.zfill(3) for i in quad])
    else:
        return '6%s' % ipaddr.strFullsize()
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def resolve_ip_and_sysname(name):
    """Given a name that can be either an ip or a hostname/domain name, this
    function looks up IP and hostname.

    name - ip or hostname

    Returns:
     - tuple with ip-addres and sysname
    """
    try:
        ip_addr = IP(name)
    except ValueError:
        ip_addr = IP(gethostbyname(name))
    try:
        sysname = gethostbyaddr(unicode(ip_addr))[0]
    except SocketError:
        sysname = unicode(ip_addr)
    return (ip_addr, sysname)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def does_ip_exist(ip_addr, netbox_id=None):
    """Checks if the given IP already exist in database.

    Parameters:
     * ip_addr   - the IP addres to look for.
     * netbox_id - a netbox primary key that can have the given ip_addr, and
                   the function will still return False.

    Returns:
     - True if the IP already exists in the database (and the netbox with the
       IP is not the same as the given netbox_id).
     - False if not.
    """
    if netbox_id:
        ip_qs = Netbox.objects.filter(Q(ip=unicode(ip_addr)), ~Q(id=netbox_id))
    else:
        ip_qs = Netbox.objects.filter(ip=unicode(ip_addr))
    return ip_qs.count() > 0
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def group_scopes(scopes):
    """Group scopes by version and type
    :type scopes: list[Prefix]
    """
    def _prefix_as_int(prefix):
        return IP(prefix.net_address).int()

    groups = defaultdict(list)
    for scope in scopes:
        prefix = IP(scope.net_address)
        if prefix.iptype() == 'PRIVATE':
            groups['private'].append(scope)
        elif prefix.version() == 4:
            groups['ipv4'].append(scope)
        elif prefix.version() == 6:
            groups['ipv6'].append(scope)

    if any([groups['private'], groups['ipv4'], groups['ipv6']]):
        return IpGroup(*[sorted(groups[x], key=_prefix_as_int)
                         for x in ('private', 'ipv4', 'ipv6')])
    else:
        return []
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def find_input_type(ip_or_mac):
    """Try to determine whether input is a valid ip-address,
    mac-address or an swportid. Return type and reformatted input as a
    tuple"""
    # Support mac-adresses on xx:xx... format
    ip_or_mac = str(ip_or_mac)
    mac = ip_or_mac.replace(':', '')

    input_type = "UNKNOWN"
    if is_valid_ip(ip_or_mac, use_socket_lib=True):
        input_type = "IP"
    elif re.match("^[A-Fa-f0-9]{12}$", mac):
        input_type = "MAC"
    elif re.match(r"^\d+$", ip_or_mac):
        input_type = "SWPORTID"

    return input_type
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def check_non_block(ip):
    """Checks if the ip is in the nonblocklist."""
    nonblockdict = parse_nonblock_file(NONBLOCKFILE)

    # We have the result of the nonblock.cfg-file in the dict
    # nonblockdict. This dict contains 3 things:
    # 1 - Specific ip-addresses
    # 2 - Ip-ranges (129.241.xxx.xxx/xx)
    # 3 - Ip lists (129.241.xxx.xxx-xxx)

    # Specific ip-addresses
    if ip in nonblockdict['ip']:
        LOGGER.info('Computer in nonblock list, skipping')
        raise InExceptionListError

    # Ip-ranges
    for ip_range in nonblockdict['range']:
        if ip in IP(ip_range):
            raise InExceptionListError
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __init__(self, record, local_address=None):
        """Given a supported neighbor record, tries to identify the remote
        device and port among the ones registered in NAV's database.

        If a neighbor can be identified, the identified attribute is set to
        True.  The netbox and interface attributes will represent the
        identified items.

        :param record: Some namedtuple instance representing the
                       neighboring record read from the device.
        :param local_address: The management IP address used by the local
                              system. If supplied, will be used to identify
                              and ignore possible self-loops.

        """
        self.record = record
        self._invalid_neighbor_ips = list(INVALID_IPS)
        if local_address:
            self._invalid_neighbor_ips.append(str(local_address))

        self.netbox = self.interfaces = None
        self.identified = False

        self.identify()
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def _netbox_from_ip(self, ip):
        """Tries to find a Netbox from NAV's database based on an IP address.

        :returns: A shadows.Netbox object representing the netbox, or None if no
                  corresponding netbox was found.

        """
        try:
            ip = six.text_type(IP(ip))
        except ValueError:
            self._logger.warning("Invalid IP (%s) in neighbor record: %r",
                                 ip, self.record)
            return

        assert ip
        if ip in self._invalid_neighbor_ips:
            return
        return (self._netbox_query(Q(ip=ip)) or
                self._netbox_query(Q(interface__gwportprefix__gw_ip=ip)))
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def _load_existing_mappings(self):
        """Load the existing ARP records for this box from the db.

        Returns:

          A deferred whose result is a dictionary: { (ip, mac): arpid }
        """
        self._logger.debug("Loading open arp records from database")
        open_arp_records_queryset = manage.Arp.objects.filter(
            netbox__id=self.netbox.id,
            end_time__gte=datetime.max).values('id', 'ip', 'mac')
        open_arp_records = yield db.run_in_thread(
            storage.shadowify_queryset_and_commit,
            open_arp_records_queryset)
        self._logger.debug("Loaded %d open records from arp",
                           len(open_arp_records))

        open_mappings = dict(((IP(arp['ip']), arp['mac']), arp['id'])
                             for arp in open_arp_records)
        defer.returnValue(open_mappings)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get_ipv4_multicast_groups_per_port(self):
        """
        Returns IGMP snooping information from ports.

        :returns: A Deferred whose result is a list of MulticastStat tuples
        """
        column = "hpIgmpStatsPortAccess2"
        ports = yield self.retrieve_columns(
            [column]
        ).addCallback(self.translate_result).addCallback(reduce_index)

        def _split(item):
            index, columns = item
            vlan = index[0]
            group = index[1:5]
            ifindex = index[5]
            access = columns[column]
            return MulticastStat(IP('.'.join(str(i) for i in group)), ifindex,
                                 vlan, access)

        defer.returnValue([_split(i) for i in iteritems(ports)])
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def oid_to_ipv4(oid):
    """Converts a sequence of 4 numbers to an IPv4 object in the fastest
    known way.

    :param oid: Any list or tuple of 4 integers.

    """
    if len(oid) != 4:
        raise ValueError("IPv4 address must be 4 octets, not %d" % len(oid))
    try:
        addr, = unpack("!I", array.array("B", oid).tostring())
    except OverflowError as error:
        raise ValueError(error)
    return IP(addr, ipversion=4)

#############################################################################
# Varios OID consumer functions, which can be fed to the consume() function #
#############################################################################
# pylint: disable=invalid-name
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def __init__(self, host, community="public", version="1", port=161,
                 retries=3, timeout=1):
        """Makes a new Snmp-object.

        :param host: hostname or IP address
        :param community: community (password), defaults to "public"
        :param port: udp port number, defaults to "161"

        """

        self.host = host
        self.community = str(community)
        self.version = str(version)
        if self.version == '2':
            self.version = '2c'
        if self.version not in ('1', '2c'):
            raise UnsupportedSnmpVersionError(self.version)
        self.port = int(port)
        self.retries = retries
        self.timeout = timeout

        self.handle = _MySnmpSession(self._build_cmdline())
        self.handle.open()
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def _build_cmdline(self):
        try:
            address = IP(self.host)
        except ValueError:
            host = self.host
        else:
            host = ('udp6:[%s]' % self.host if address.version() == 6
                    else self.host)

        return (
            '-v' + self.version,
            '-c', self.community,
            '-r', str(self.retries),
            '-t', str(self.timeout),
            '%s:%s' % (host, self.port)
            )
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def is_valid_cidr(cidr):
    """Verifies that a string is valid IPv4 or IPv6 CIDR specification.

    A cleaned up version of the CIDR string is returned if it is verified,
    otherwise a false value is returned.

    Uses the IPy library to verify addresses.
    """
    if (isinstance(cidr, six.string_types) and
            not cidr.isdigit() and '/' in cidr):
        try:
            valid_cidr = IPy.IP(cidr) is not None
        except (ValueError, TypeError):
            return False
        else:
            return valid_cidr
    return False
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def is_ipv4(ip):
    """Check if the [ip] is a real ip addr.

    Params:
        ip: :str: General IP format.

    Returns:
        :type: bool
        :desc: if ip is a valid IP addr, return true
            else return False"""
    try:
        IP(ip)
        return True
    except ValueError:
        return False

#----------------------------------------------------------------------
项目:N4xD0rk    作者:n4xh4ck5    | 项目源码 | 文件源码
def VerifyIp(ip):

    ip_type = None

    try:

        ip_type = IP(ip).iptype()

        if ip_type is not "PUBLIC":

            ip_type = 'Private'

        else:

            ip_type = 'Public'

    except Exception:

        pass

    finally:

        return ip_type
项目:ripe-atlas-monitor    作者:pierky    | 项目源码 | 文件源码
def __str__(self):
        all_subnets = True
        for ip in self.dst_ip:
            all_subnets = all_subnets and ip.prefixlen() not in [32, 128]

        more_than_one = len(self.dst_ip) > 1

        if all_subnets:
            tpl = "Destination IP must fall into {}"
        else:
            if more_than_one:
                tpl = "Destination IP must be in {}"
            else:
                tpl = "Destination IP must be {}"

        return tpl.format(self._str_list())
项目:ripe-atlas-monitor    作者:pierky    | 项目源码 | 文件源码
def __init__(self, cfg):
        ExpResCriterion_DNSRecord.__init__(self, cfg)

        self.address = []
        addresses = self._enforce_list("address", str)
        for address in addresses:
            try:
                ip = IPy.IP(address)
            except:
                raise ConfigError(
                    "Invalid IP for {} record: {}".format(
                        self.RECORD_TYPE, address
                    )
                )
            if ip.version() != self.IP_VER:
                raise ConfigError(
                    "Invalid IP version ({}) for record type {}.".format(
                        ip.version(), self.RECORD_TYPE
                    )
                )
            self.address.append(ip)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def is_valid_ipv4(ip, version=4):
    import os
    import sys

    try:
        from IPy import IP
    except ImportError:
        try:
            command_to_execute = "pip install IPy || easy_install IPy"
            os.system(command_to_execute)
        except OSError:
            print "Can NOT install 'IPy', Aborted!"
            sys.exit(1)
        except Exception as e:
            print "Uncaught exception, %s" % e.message
            sys.exit(1)
        from IPy import IP
    try:
        result = IP(ip, ipversion=version)
    except ValueError:
        return False
    if result is not None and result != "":
        return True
项目:ops    作者:xiaomatech    | 项目源码 | 文件源码
def network_size(net, dhcp=None):
        """
        Func return gateway, mask and dhcp pool.
        """
        mask = IP(net).strNetmask()
        addr = IP(net)
        gateway = addr[1].strNormal()
        dhcp_pool = [addr[2].strNormal(), addr[addr.len() - 2].strNormal()]
        if dhcp:
            return gateway, mask, dhcp_pool
        else:
            return gateway, mask, None


# Read write lock
# ---------------
项目:ops    作者:xiaomatech    | 项目源码 | 文件源码
def get_ipv4_network(self):
        xml = self._XMLDesc(0)
        if util.get_xml_path(xml, "/network/ip") is None:
            return None
        addrStr = util.get_xml_path(xml, "/network/ip/@address")
        netmaskStr = util.get_xml_path(xml, "/network/ip/@netmask")
        prefix = util.get_xml_path(xml, "/network/ip/@prefix")

        if prefix:
            prefix = int(prefix)
            binstr = ((prefix * "1") + ((32 - prefix) * "0"))
            netmaskStr = str(IP(int(binstr, base=2)))

        if netmaskStr:
            netmask = IP(netmaskStr)
            gateway = IP(addrStr)
            network = IP(gateway.int() & netmask.int())
            ret = IP(str(network) + "/" + netmaskStr)
        else:
            ret = IP(str(addrStr))

        return ret
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def update_hosts(self, hosts_info):  # pylint: disable=no-self-use
        """ Update hosts info """

        if not isinstance(hosts_info, dict):
            return api_utils.result_handler(
                status=1, data='Error, args should be a dict')

        for key, value in hosts_info.items():
            if key:
                try:
                    IPy.IP(value)
                except Exception:  # pylint: disable=broad-except
                    return api_utils.result_handler(
                        status=1, data='The IP %s is invalid' % value)
            else:
                return api_utils.result_handler(
                    status=1, data='Domain name is absent')

        try:
            functest_flag = "# SUT hosts info for Functest"
            hosts_list = ('\n{} {} {}'.format(ip, host_name, functest_flag)
                          for host_name, ip in hosts_info.items())

            with open("/etc/hosts", 'r') as file_hosts:
                origin_lines = [line for line in file_hosts
                                if functest_flag not in line]

            with open("/etc/hosts", 'w') as file_hosts:
                file_hosts.writelines(origin_lines)
                file_hosts.write(functest_flag)
                file_hosts.writelines(hosts_list)
        except Exception:  # pylint: disable=broad-except
            return api_utils.result_handler(
                status=1, data='Error when updating hosts info')
        else:
            return api_utils.result_handler(
                status=0, data='Update hosts info successfully')
项目:PRCDNS    作者:lbp0200    | 项目源码 | 文件源码
def data_received(self, data):
        data = self.get_data(data)
        if data is None:
            self.transport.close()
            return
        self.request = DNSRecord.parse(data)
        if self.args.debug:
            print('Data received: {!r}'.format(self.request))

        from IPy import IP
        ip = IP(self.peername[0])
        client_ip = self.peername[0]
        if ip.iptype() == 'PRIVATE':
            client_ip = self.args.myip

        url = 'https://dns.google.com/resolve?name={}&edns_client_subnet={}/24'.format(str(self.request.q.qname),
                                                                                       client_ip)
        # client = ProxyClient()
        # google_dns_resp = client.query_domain(url, self.args.proxy)
        try:
            from asyncio import ensure_future
        except ImportError:
            from asyncio import async as ensure_future

        asyncio.ensure_future(ProxyClient.query_domain(url, self.args.proxy), loop=self.loop).add_done_callback(
            functools.partial(self.send_resp))
项目:PRCDNS    作者:lbp0200    | 项目源码 | 文件源码
def get_arg():
    """????"""
    parser = argparse.ArgumentParser(prog='prcdns', description='google dns proxy.')
    parser.add_argument('--debug', help='debug model,default NO', default=False)
    parser.add_argument('-l', '--listen', help='listening IP,default 0.0.0.0', default='0.0.0.0')
    parser.add_argument('-p', '--port', help='listening Port,default 3535', default=3535)
    parser.add_argument('-r', '--proxy', help='Used For Query Google DNS,default direct', default=None)
    parser.add_argument('-ip', '--myip', help='IP location', default=None)

    return parser.parse_args()
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def IP(ip):
        return ip
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def formatStructValue(self, struct, name, value):
        if struct.startswith("sockaddr") and name.endswith("family"):
            return SOCKET_FAMILY.get(value, value)
        if struct == "sockaddr_in":
            if name == "sin_port":
                return ntoh_ushort(value)
            if name == "sin_addr":
                ip = ntoh_uint(value.s_addr)
                return IP(ip)
        return None
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def value_type(value):
    """Value type simply identifies if the passed value
    is a domain or IP address.

    @param  string  text to test as an IP
    @returns    domain or IP
    """
    try:
        IP(value)
        return 'ip'
    except:
        return 'domain'
项目:shodan-scripts    作者:heywoodlh    | 项目源码 | 文件源码
def gen_info():
        return ("IP: %s \n Organization: %s \n Operating System: %s" % (host['ip_str'], host.get('org', 'n/a'), host.get('os', 'n/a')))