Python ipaddress 模块,ip_address() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用ipaddress.ip_address()

项目:typepy    作者:thombashi    | 项目源码 | 文件源码
def __init__(self):
        self.typeclass = None
        self.strict_level = None
        self.header_list = None
        self.value_list = None

        self.__table_writer = ptw.RstSimpleTableWriter()
        self.__table_writer._dp_extractor.type_value_mapping = {
            NullString(None).typecode: '``""``',
            NoneType(None).typecode: "``None``",
            Infinity(None).typecode: '``Decimal("inf")``',
            Nan(None).typecode: '``Decimal("nan")``',
        }
        self.__table_writer._dp_extractor.const_value_mapping = {
            True: "``True``",
            False: "``False``",
            '``"127.0.0.1"``': '``ip_address("127.0.0.1")``',
        }
项目:craton    作者:openstack    | 项目源码 | 文件源码
def make_hosts(region, cell):
    # no of hosts need to match ip_address available
    no_of_hosts = 2
    cab1 = region + "." + cell + "." + "C-1"
    cab2 = region + "." + cell + "." + "C-2"

    hosts = []
    for host in range(no_of_hosts):
        hostname = "host%s.%s.example1.com" % (host, cab1)
        hosts.append(hostname)

    for host in range(no_of_hosts):
        hostname = "host%s.%s.example2.com" % (host, cab2)
        hosts.append(hostname)

    return hosts
项目:craton    作者:openstack    | 项目源码 | 文件源码
def create_netdevice(self, name, device_type):
        network_devices_url = self.url + "/network-devices"
        payload = {"name": name,
                   "model_name": "model-x",
                   "os_version": "version-1",
                   "device_type": device_type,
                   "ip_address": "10.10.1.1",
                   "active": True,
                   "cloud_id": self.cloud.get("id"),
                   "region_id": self.region.get("id"),
                   "cell_id": self.cell.get("id")}

        resp = requests.post(network_devices_url, headers=self.headers,
                             data=json.dumps(payload), verify=False)
        if resp.status_code != 201:
            raise Exception(resp.text)

        return resp.json()
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def get_random_load_nonentry():
    '''
    Return a random item that probably isn't in match_func_result['load'].
    '''
    match_type = sys.argv[1]
    if match_type == 'ipasn':
        # Yes, we could do IPv6 here. But the type of the list doesn't matter:
        # a random IPv4 might not be in an IPv4 list, and it won't be in an
        # IPv6 list
        random_32_bit = random.randint(0, 2**32 - 1)
        ip = ipaddress.ip_address(random_32_bit)
        return ip
    else:
        char_list = list(get_random_load_entry())
        random.shuffle(char_list)
        return "".join(char_list)

# try to make sure that other processes don't warp the results too much
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def validate_ip_network_address_prefix(address, prefix, strict=True):
    '''
    If address/prefix is a valid IP network when parsed according to strict,
    return it as an ipnetwork object.
    Otherwise, return None.
    '''
    ip_address = validate_ip_address(address)
    if ip_address is None or ip_address.version not in [4,6]:
        return None
    try:
        if ip_address.version == 4:
            return ipaddress.IPv4Network((ip_address, prefix), strict=strict)
        else:
            return ipaddress.IPv6Network((ip_address, prefix), strict=strict)
    except ValueError:
        return None
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def private(f):
    """Only allow approved source addresses."""

    @functools.wraps(f)
    def wrapper(self, request):
        if not request.access_route:
            # this means something probably bugged in werkzeug, but let's fail
            # gracefully
            return Response('no client ip provided', status='403')
        ip_str = request.access_route[-1]
        if isinstance(ip_str, bytes):
            ip_str = ip_str.decode('utf8')
        ip = ip_address(ip_str)
        if ip.is_loopback or any(ip in network for network in get_networks()):
            return f(self, request)
        else:
            msg = PRIVATE_BODY_RESPONSE_TEMPLATE.format(
                ip_str,
                force_unicode(request.remote_addr),
                request.headers.get('x-forwarded-for'))
            return Response(msg, status='403')
    return wrapper
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def set_unset(self, interface_id, attribute, address=None):
        """
        Set attribute to True and unset the same attribute for all other
        interfaces. This is used for interface options that can only be
        set on one engine interface.
        """
        for interface in self:
            for sub_interface in interface.sub_interfaces():
                # Skip VLAN only interfaces (no addresses)
                if not isinstance(sub_interface, PhysicalVlanInterface):
                    if getattr(sub_interface, attribute) is not None:
                        if sub_interface.nicid == str(interface_id):
                            if address is not None: # Find IP on Node Interface
                                if ipaddress.ip_address(bytes_to_unicode(address)) in \
                                    ipaddress.ip_network(sub_interface.network_value):
                                    setattr(sub_interface, attribute, True)
                                else:
                                    setattr(sub_interface, attribute, False)
                            else:
                                setattr(sub_interface, attribute, True)
                        else: #unset
                            setattr(sub_interface, attribute, False)
项目:SupercomputerInABriefcase    作者:SupercomputerInABriefcase    | 项目源码 | 文件源码
def add_service(self, zeroconf, type, name):
        """This gets called for services discovered.

        Example:

        Service SupercomputerInABriefcase on tom-xps._sciabc._tcp.local. added,
        service info:
            ServiceInfo(
                type='_sciabc._tcp.local.',
                name='SupercomputerInABriefcase on tom-xps._sciabc._tcp.local.',
                address=b'\n\xb7\xcd\xb6',
                port=34343,
                weight=0,
                priority=0,
                server='tom-xps.local.',
                properties={}
            )
        """
        info = zeroconf.get_service_info(type, name)
        print("Service %s added, service info: %s" % (name, info))
        ip = ipaddress.ip_address(info.address)
        append_ip_to_nodes_file(str(ip))
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def validate_ip(ctx, param, value):
    try:
        ipaddress.ip_address(value)
        return value
    except ValueError as ex:
        raise click.BadParameter("Invalid IP: %s" % ex)
项目:piqueserver    作者:piqueserver    | 项目源码 | 文件源码
def add_ban(self, ip, reason, duration, name=None):
        """
        Ban an ip with an optional reason and duration in minutes. If duration
        is None, ban is permanent.
        """
        network = ip_network(text_type(ip), strict=False)
        for connection in list(self.connections.values()):
            if ip_address(connection.address[0]) in network:
                name = connection.name
                connection.kick(silent=True)
        if duration:
            duration = reactor.seconds() + duration * 60
        else:
            duration = None
        self.bans[ip] = (name or '(unknown)', reason, duration)
        self.save_bans()
项目:iocage    作者:iocage    | 项目源码 | 文件源码
def sort_ip(ip, version="4"):
    """Sort the list by IP address."""
    list_length = len(ip)

    # Length 10 is list -l, 5 is list

    if list_length == 10:
        try:
            ip = ip[7] if version == "4" else ip[8]
            _ip = str(ipaddress.ip_address(ip.rsplit("|")[1]))
        except (ValueError, IndexError):
            # Lame hack to have "-" last.
            _ip = "Z"
    elif list_length == 6:
        try:
            _ip = str(ipaddress.ip_address(ip[5]))
        except ValueError:
            # Lame hack to have "-" last.
            _ip = "Z"
    else:
        _ip = ip

    return _ip
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def test_getnextpdu(self):
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=(
                ObjectIdentifier(10, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7)),
            )
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        self.assertEqual(str(value0.data), ipaddress.ip_address("10.0.0.1").packed.decode())
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def test_getnextpdu_exactmatch(self):
        oid = ObjectIdentifier(14, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7, 0, 0, 0, 0))
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=[oid]
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        print("test_getnextpdu_exactmatch: ", str(oid))
        self.assertEqual(str(value0.name), str(oid))
        self.assertEqual(str(value0.data), ipaddress.ip_address("10.0.0.1").packed.decode())
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def test_getnextpdu(self):
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=(
                ObjectIdentifier(21, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0)),
            )
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode())
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def test_getnextpdu_exactmatch(self):
        oid = ObjectIdentifier(24, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 17))
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=[oid]
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        print("test_getnextpdu_exactmatch: ", str(oid))
        self.assertEqual(str(value0.name), str(oid))
        self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode())
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def reinit_data(self):
        """
        Subclass update loopback information
        """
        self.loips = {}

        self.db_conn.connect(mibs.APPL_DB)
        loopbacks = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*")
        if not loopbacks:
            return

        # collect only ipv4 interfaces
        for loopback in loopbacks:
            lostr = loopback.decode()
            loip = lostr[len("INTF_TABLE:lo:"):]
            ipa = ipaddress.ip_address(loip)
            if isinstance(ipa, ipaddress.IPv4Address):
                self.loips[loip] = ipa
项目:desec-stack    作者:desec-io    | 项目源码 | 文件源码
def perform_create(self, serializer, remote_ip):
        captcha = (
                ipaddress.ip_address(remote_ip) not in ipaddress.IPv6Network(os.environ['DESECSTACK_IPV6_SUBNET'])
                and (
                    User.objects.filter(
                        created__gte=timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS),
                        registration_remote_ip=remote_ip
                    ).count() >= settings.ABUSE_BY_REMOTE_IP_LIMIT
                    or
                    User.objects.filter(
                        created__gte=timezone.now() - timedelta(hours=settings.ABUSE_BY_EMAIL_HOSTNAME_PERIOD_HRS),
                        email__endswith=serializer.validated_data['email'].split('@')[-1]
                    ).count() >= settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT
                )
            )

        user = serializer.save(registration_remote_ip=remote_ip, captcha_required=captcha)
        if user.captcha_required:
            send_account_lock_email(self.request, user)
        elif not user.dyn:
            context = {'token': user.get_token()}
            send_token_email(context, user)
        signals.user_registered.send(sender=self.__class__, user=user, request=self.request)
项目:toshi-reputation-service    作者:toshiapp    | 项目源码 | 文件源码
def get_location_from_geolite2(pool, ip_addr):

    try:
        ip_addr = ipaddress.ip_address(ip_addr)
        # fix issues with asyncpg not handling ip address netmask defaults
        # correctly
        ip_addr = "{}/{}".format(ip_addr, 32 if ip_addr.version == 4 else 128)
        async with pool.acquire() as con:
            row = await con.fetchrow(
                "SELECT cs.country_iso_code FROM geolite2_ip_addresses ips "
                "JOIN geolite2_countries cs ON ips.geoname_id = cs.geoname_id "
                "WHERE ips.network >> $1", ip_addr)
        return row['country_iso_code'] if row else None
    except ValueError:
        return None
    except asyncpg.exceptions.UndefinedTableError:
        log.warning("Missing GeoLite2 database tables")
        return None
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def test_scandata_parsing(self):
        '''Tests parsing of the basic surface scan'''
        scan = ndr.NmapScan()

        scan.parse_nmap_xml(load_nmap_xml_data(TEST_SURFACE_SCAN_DATA))
        self.assertEqual(5, len(scan))

        # Got three hosts, find a host by MAC address
        host = scan.find_by_mac('84:39:BE:64:3F:E5')[0]

        # SHould only find one IP address
        self.assertNotEqual(None, host)

        self.assertTrue(host.addr, ipaddress.ip_address("192.168.2.100"))
        self.assertEqual(host.reason, ndr.NmapReasons.ARP_RESPONSE)

        # Confirm that we're not finding phantom hosts
        self.assertEqual(scan.find_by_mac('NOT_A_REAL_MAC'), None)
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def test_ipv6_linklocal_parsing(self):
        '''Tests the results of the IPv6 link-local scan'''

        scan = ndr.NmapScan()
        scan.parse_nmap_xml(load_nmap_xml_data(TEST_V6_LINK_LOCAL_SCAN_DATA))

        self.assertEqual(3, len(scan))

        # Got three hosts, find a host by MAC address
        host_list = scan.find_by_mac('84:39:BE:64:3F:E5')

        # SHould only find one IP address
        self.assertEqual(len(host_list), 1)

        host = host_list[0]
        self.assertTrue(host.reason, ndr.NmapReasons.ND_RESPONSE)
        self.assertEqual(host.addr, ipaddress.ip_address("fe80::8639:beff:fe64:3fe5"))
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def from_dict(self, config_dict):
        '''Load settings from dictionary'''
        # Load the easy objects first
        if config_dict.get('version', None) != 1:
            raise ValueError("Unknown NDR NMAP config file version!")

        # Clean out the IP lists
        self.mac_address_config = {}
        self.ip_address_config = {}

        machine_ips = config_dict.get('machine_ips', dict())
        machine_macs = config_dict.get('machine_macs', dict())

        # Load in the machine IP addresses
        for ip_addr, value in machine_ips.items():
            self.ip_address_config[ipaddress.ip_address(ip_addr)] = NmapScanMode(value)

        # Now do it again with the MAC addresses. When we load in MACs, make them all
        # upper case to be consistent with NmapHosts

        for mac_addr, value in machine_macs.items():
            self.mac_address_config[mac_addr.upper()] = NmapScanMode(value)
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def build_nmap_commandline(self, base_flags, address, interface=None):
        '''Builds common NMAP option command lines'''
        ipaddr = ipaddress.ip_address(address)


        # Several bits of magic are required here
        # 1. If we're v6 address or range, we need -6
        # 2. If we're link-local, we need to specify the interface

        options = base_flags
        if ipaddr.version == 6:
            options = "-6 " + options
        if ipaddr.is_link_local:
            options = "-e " + interface + " " + options

        return options
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def from_dict(self, msg_dict):
        '''Converts the entry back to dict format'''
        self.timestamp = msg_dict['timestamp']
        self.proto = ndr.PortProtocols(msg_dict['proto'])
        self.src = ipaddress.ip_address(msg_dict['src'])

        if msg_dict['srcport'] is not None:
            self.srcport = int(msg_dict['srcport'])

        self.dst = ipaddress.ip_address(msg_dict['dst'])

        if msg_dict['dstport'] is not None:
            self.dstport = int(msg_dict['dstport'])

        self.ethsrc = msg_dict['ethsrc']
        self.ethdst = msg_dict['ethdst']
        self.ethlen = int(msg_dict['ethlen'])
        self.tcpflags = msg_dict['tcpflags']

        if 'tcpseq' is not None:
            self.tcpseq = msg_dict['tcpseq']
        else:
            self.tcpseq = None
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def valid_mappable_ipv4(address):
    try:
        address_uni = unicode(address, errors='ignore')
    except TypeError:
        address_uni = address

    if address_uni in NON_MAPPABLE:
        return False

    try:
        parsed = ipaddress.ip_address(address_uni)
    except ValueError:
        log.debug('invalid IPv4 address', input=address)
        return False
    if parsed is not None and parsed.version == 4:
        return True
    else:
        return False
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def setUp(self):
        network = u'192.168.1.0/30'
        self.node = self.fixtures.node()
        IpAddrPool().create(
            {'network': network, 'autoblock': '192.168.1.1', 'node':
                self.node.id})
        self.ippool = IPPool.query.get(network)
        self.pod = self.fixtures.pod(owner_id=self.user.id)
        self.pod_ip = PodIP(
            pod_id=self.pod.id,
            network=self.ippool.network,
            ip_address=int(ipaddress.ip_address(u'192.168.1.2')),
        )
        self.db.session.add(self.pod_ip)
        self.db.session.add(self.node)
        self.db.session.commit()

        self.stubs = K8SAPIStubs()
        self.stubs.node_info_in_k8s_api(self.node.hostname)
        self.stubs.node_info_update_in_k8s_api(self.node.hostname)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def setUp(self):
        from kubedock.pods.models import PodIP, IPPool
        self.ip = ipaddress.ip_address(u'192.168.43.4')
        self.with_ip_conf = {
            'public_ip': unicode(self.ip),
            'containers': [
                {'ports': [{'isPublic': True}, {}, {'isPublic': False}]},
                {'ports': [{'isPublic': True}, {}, {'isPublic': False}]},
            ],
        }
        self.without_ip_conf = {
            'public_ip_before_freed': unicode(self.ip),
            'containers': [
                {'ports': [{'isPublic_before_freed': True},
                           {'isPublic_before_freed': None},
                           {'isPublic_before_freed': False}]},
                {'ports': [{'isPublic_before_freed': True},
                           {'isPublic_before_freed': None},
                           {'isPublic_before_freed': False}]},
            ],
        }
        self.pod = self.fixtures.pod(config=json.dumps(self.with_ip_conf))
        self.ippool = IPPool(network='192.168.43.0/29').save()
        self.podip = PodIP(pod_id=self.pod.id, network=self.ippool.network,
                           ip_address=int(self.ip)).save()
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def free_hosts_and_busy(self, as_int=None, page=None):
        pods = PodIP.filter_by(network=self.network)
        allocated_ips = {int(pod): pod.get_pod() for pod in pods}
        blocked_ips = self.get_blocked_set(as_int=True)
        hosts = self.hosts(as_int=True, page=page)

        def get_ip_state(ip, pod):
            if pod:
                return 'busy'
            if ip in blocked_ips:
                return 'blocked'
            return 'free'

        def get_ip_info(ip):
            pod = allocated_ips.get(ip)
            state = get_ip_state(ip, pod)
            return ip if as_int else str(ipaddress.ip_address(ip)), pod, state

        return [get_ip_info(ip) for ip in hosts]
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def get_free_host(cls, as_int=None, node=None, ip=None):
        """Return free host if available.
        :param as_int: return ip as long int
        :param node: if set, get free host only for this node
        :param ip: if set, first try to check if this ip available, if not,
        then return any other available ip
        :return: ip address, as str or int, depends on  as_int value

        """
        if ip:
            network = cls.get_network_by_ip(ip)
            if network and network.is_ip_available(ip, node):
                return int(ipaddress.ip_address(ip)) if as_int else ip
        if node is None:
            networks = cls.all()
        else:
            networks = cls.filter(cls.node.has(hostname=node))
        for n in networks:
            free_host = n.get_first_free_host(as_int=as_int)
            if free_host is not None:
                return free_host
        raise NoFreeIPs()
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def init():
    config = read_config_ini('/run/flannel/subnet.env')
    config_network = config['flannel_network']
    config_subnet = config['flannel_subnet']
    _update_ipset('kuberdock_flannel', [config_network], set_type='hash:net')
    _update_ipset('kuberdock_nodes', [])
    _update_ipset('kuberdock_cluster',
                  ['kuberdock_flannel', 'kuberdock_nodes'],
                  set_type='list:set')
    network = ipaddress.ip_network(unicode(config_network))
    subnet = ipaddress.ip_network(unicode(config_subnet), strict=False)
    etcd = ETCD()
    for user in etcd.users():
        for pod in etcd.pods(user):
            pod_ip = ipaddress.ip_address(pod)
            if pod_ip not in network or pod_ip in subnet:
                etcd.delete_user(user, pod)
    update_ipset()
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def get_local_bricks(volume: str) -> Result:
    """
        Return all bricks that are being served locally in the volume
        volume: Name of the volume to get local bricks for
    """
    try:
        vol_info = volume_info(volume)
        local_ip = get_local_ip()
        if local_ip.is_err():
            return Err(local_ip.value)
        local_brick_list = []
        for vol in vol_info:
            for brick in vol.bricks:
                if ip_address(brick.peer.hostname) == local_ip.value:
                    local_brick_list.append(brick)
        return Ok(local_brick_list)
    except GlusterCmdOutputParseError:
        raise
项目:igd-exporter    作者:yrro    | 项目源码 | 文件源码
def main():
    '''
    You are here.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('--bind-address', type=ipaddress.ip_address, default='::', help='IPv6 or IPv4 address to listen on')
    parser.add_argument('--bind-port', type=int, default=9196, help='Port to listen on')
    parser.add_argument('--bind-v6only', type=int, choices=[0, 1], help='If 1, prevent IPv6 sockets from accepting IPv4 connections; if 0, allow; if unspecified, use OS default')
    parser.add_argument('--thread-count', type=int, help='Number of request-handling threads to spawn')
    args = parser.parse_args()

    server = wsgiext.Server((args.bind_address, args.bind_port), wsgiext.SilentRequestHandler, args.thread_count, args.bind_v6only)
    server.set_app(exporter.wsgi_app)
    wsgi_thread = threading.Thread(target=functools.partial(server.serve_forever, 86400), name='wsgi')

    def handle_sigterm(signum, frame):
        server.shutdown()
    signal.signal(signal.SIGTERM, handle_sigterm)

    wsgi_thread.start()

    wsgi_thread.join()

    server.server_close()
项目:kawaii-player    作者:kanishka-linux    | 项目源码 | 文件源码
def get_external_url_status(self, finalUrl):
        external_url = False
        if finalUrl.startswith('"http') or finalUrl.startswith('http'):
            try:
                ip_addr = finalUrl.split('/')[2]
                if ':' in ip_addr:
                    ip_addr = ip_addr.split(':')[0]
            except Exception as e:
                print(e)
                ip_addr = 'none'
            private_ip = False
            try:
                ip_obj = ipaddress.ip_address(ip_addr)
                print(ip_obj)
                if ip_obj.is_private:
                    private_ip = True
            except Exception as e:
                print(e)
            if not private_ip:
                external_url = True
        return external_url
项目:nfvOrchestrator    作者:uestcNFVproject    | 项目源码 | 文件源码
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    trace_req_header_values = TRACEREQHEADER()
    trace_req_header_values.oam_type = oam_type
    trace_req_header_values.sil = sil
    trace_req_header_values.port = int(remote_port)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect((remote_ip, trace_req_header_values.port))
    # print(s.getsockname()[0])
    src_addr = ipaddress.ip_address(s.getsockname()[0])
    if src_addr.version == 4:
        trace_req_header_values.ip_1 = 0x00000000
        trace_req_header_values.ip_2 = 0x00000000
        trace_req_header_values.ip_3 = 0x0000FFFF
        trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr))
    elif src_addr.version == 6:
        int_addr6 = int(ipaddress.IPv6Address(src_addr))
        trace_req_header_values.ip_1 = int_addr6 >> 96
        trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF

    return trace_req_header_values
项目:nfvOrchestrator    作者:uestcNFVproject    | 项目源码 | 文件源码
def _get_current_ip_version(self, ip):
        """
        Get current IP address version

        :param ip: IP address
        :type ip: str

        :return int

        """
        if '/' in ip:
            ip_parts = ip.split('/')
            ip = ip_parts[0]

        ip = ipaddress.ip_address(ip)

        return ip.version
项目:cocrawler    作者:cocrawler    | 项目源码 | 文件源码
def resolve(self, host, port, **kwargs):
        with stats.record_latency('fetcher DNS lookup', url=host):
            with stats.coroutine_state('fetcher DNS lookup'):
                addrs = await super().resolve(host, port, **kwargs)

        ret = []
        for a in addrs:
            try:
                ip = ipaddress.ip_address(a.host)
            except ValueError:
                continue
            if not self._crawllocalhost and ip.is_localhost:
                continue
            if not self._crawlprivate and ip.is_private:
                continue
            ret.append(a)

        if len(addrs) != len(ret):
            LOGGER.info('threw out some ip addresses for %s', host)

        return ret
项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def is_authorized(ip, authorized_ips):
    ip = ip_address(ip)

    for item in authorized_ips:
        try:
            if ip == ip_address(item):
                return True
        except ValueError:
            try:
                if ip in ip_network(item):
                    return True
            except ValueError:
                logger.warn('The "authorized_ip" list (settings.HEARTBEAT)'
                            'contains an item that is neither an ip address '
                            'nor an ip network: {}'.format(item))
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _ipaddress_match(ipname, host_ip):
    """Exact matching of IP addresses.

    RFC 6125 explicitly doesn't define an algorithm for this
    (section 1.7.2 - "Out of Scope").
    """
    # OpenSSL may add a trailing newline to a subjectAltName's IP address
    # Divergence from upstream: ipaddress can't handle byte str
    ip = ipaddress.ip_address(_to_unicode(ipname).rstrip())
    return ip == host_ip
项目:Home-Assistant    作者:jmart518    | 项目源码 | 文件源码
def check_access(clientip):
    global BANNED_IPS
    if clientip in BANNED_IPS:
        return False
    if not ALLOWED_NETWORKS:
        return True
    for net in ALLOWED_NETWORKS:
        ipobject = ipaddress.ip_address(clientip)
        if ipobject in ipaddress.ip_network(net):
            return True
    BANNED_IPS.append(clientip)
    return False
项目:dhcp-stats-prometheus    作者:atonkyra    | 项目源码 | 文件源码
def test_restricted(remote_address):
    addr = ipaddress.ip_address(remote_address)
    if args.restrict is not None:
        allowed = False
        for restrict_address in restricted_addresses:
            if test_address_pair(restrict_address, addr):
                allowed = True
                break
        return allowed
    else:
        return True
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def _ipaddress_match(ipname, host_ip):
    """Exact matching of IP addresses.

    RFC 6125 explicitly doesn't define an algorithm for this
    (section 1.7.2 - "Out of Scope").
    """
    # OpenSSL may add a trailing newline to a subjectAltName's IP address
    # Divergence from upstream: ipaddress can't handle byte str
    ip = ipaddress.ip_address(_to_unicode(ipname).rstrip())
    return ip == host_ip
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def parse_listen_addr(listen_addr):
    """
    Parse an address of the form [ipaddress]:port into a tcp or tcp6 Twisted
    endpoint description string for use with
    ``twisted.internet.endpoints.serverFromString``.
    """
    if ':' not in listen_addr:
        raise ValueError(
            "'%s' does not have the correct form for a listen address: "
            '[ipaddress]:port' % (listen_addr,))
    host, port = listen_addr.rsplit(':', 1)

    # Validate the host
    if host == '':
        protocol = 'tcp'
        interface = None
    else:
        if host.startswith('[') and host.endswith(']'):  # IPv6 wrapped in []
            host = host[1:-1]
        ip_address = ipaddress.ip_address(_to_unicode(host))
        protocol = 'tcp6' if ip_address.version == 6 else 'tcp'
        interface = str(ip_address)

    # Validate the port
    if not port.isdigit() or int(port) < 1 or int(port) > 65535:
        raise ValueError(
            "'%s' does not appear to be a valid port number" % (port,))

    args = [protocol, port]
    kwargs = {'interface': interface} if interface is not None else {}

    return _create_tx_endpoints_string(args, kwargs)
项目:typepy    作者:thombashi    | 项目源码 | 文件源码
def force_convert(self):
        import ipaddress

        try:
            return ipaddress.ip_address(six.text_type(self._value))
        except ValueError:
            raise TypeConversionError(
                "failed to force_convert to dictionary: type={}".format(
                    type(self._value)))
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def public_interface(self):
        """
        Check that all minions have an address on the public network
        """
        for node in self.data.keys():
            if ('roles' in self.data[node] and
                'master' in self.data[node]['roles']):
                continue
            found = False
            public_network = self.data[node].get("public_network", "")
            net_list = Util.parse_list_from_string(public_network)
            for address in self.grains[node]['ipv4']:
                try:
                    for network in net_list:
                        addr = ipaddress.ip_address(u'{}'.format(address))
                        net = ipaddress.ip_network(u'{}'.format(network))
                        if addr in net:
                            found = True
                except ValueError:
                    # Don't care about reporting a ValueError here if
                    # public_network is malformed, because the
                    # previous validation in public_network() will do that.
                    pass
            if not found:
                msg = "minion {} missing address on public network {}".format(node, public_network)
                self.errors.setdefault('public_interface', []).append(msg)

        self._set_pass_status('public_network')
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def cluster_interface(self):
        """
        Check that storage nodes have an interface on the cluster network
        """
        # pylint: disable=too-many-nested-blocks
        for node in self.data.keys():
            if ('roles' in self.data[node] and
                'storage' in self.data[node]['roles']):
                found = False
                cluster_network = self.data[node].get("cluster_network", "")
                net_list = Util.parse_list_from_string(cluster_network)
                for address in self.grains[node]['ipv4']:
                    try:
                        for network in net_list:
                            addr = ipaddress.ip_address(u'{}'.format(address))
                            net = ipaddress.ip_network(u'{}'.format(network))
                            if addr in net:
                                found = True
                    except ValueError:
                        # Don't care about reporting a ValueError here if
                        # cluster_network is malformed, because the
                        # previous validation in cluster_network() will do that.
                        pass
                if not found:
                    msg = "minion {}".format(node)
                    msg += " missing address on cluster network {}".format(cluster_network)
                    self.errors.setdefault('cluster_interface', []).append(msg)

        self._set_pass_status('cluster_interface')
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def address():
    """
    Find the public address for a minion
    """

    if 'public_network' not in __pillar__:
        return ""

    log.debug("pillar: {}".format(type(__pillar__['public_network'])))
    if isinstance(__pillar__['public_network'], str):
        networks = re.split(', *', __pillar__['public_network'])
    else:
        networks = __pillar__['public_network']

    log.debug("networks: {}".format(pprint.pformat(networks)))
    for public_network in networks:
        log.info('public_network: {}'.format(public_network))
        interfaces = __salt__['network.interfaces']()

        log.debug("interfaces: {}".format(pprint.pformat(interfaces)))
        for interface in interfaces:
            log.info("interface: {}".format(pprint.pformat(interface)))
            if 'inet' in interfaces[interface]:
                for entry in interfaces[interface]['inet']:
                    _address = entry['address']
                    log.info("Checking address {}".format(_address))
                    if (ipaddress.ip_address(u'{}'.format(_address)) in
                        ipaddress.ip_network(u'{}'.format(public_network))):
                        return _address
    return ""
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def _ipaddress_match(ipname, host_ip):
    """Exact matching of IP addresses.

    RFC 6125 explicitly doesn't define an algorithm for this
    (section 1.7.2 - "Out of Scope").
    """
    # OpenSSL may add a trailing newline to a subjectAltName's IP address
    # Divergence from upstream: ipaddress can't handle byte str
    ip = ipaddress.ip_address(_to_unicode(ipname).rstrip())
    return ip == host_ip
项目:yarl    作者:aio-libs    | 项目源码 | 文件源码
def with_host(self, host):
        """Return a new URL with host replaced.

        Autoencode host if needed.

        Changing host for relative URLs is not allowed, use .join()
        instead.

        """
        # N.B. doesn't cleanup query/fragment
        if not isinstance(host, str):
            raise TypeError("Invalid host type")
        if not self.is_absolute():
            raise ValueError("host replacement is not allowed "
                             "for relative URLs")
        if not host:
            raise ValueError("host removing is not allowed")
        try:
            ip = ip_address(host)
        except ValueError:
            host = host.encode('idna').decode('ascii')
        else:
            if ip.version == 6:
                host = '[' + host + ']'
        val = self._val
        return URL(
            self._val._replace(netloc=self._make_netloc(val.username,
                                                        val.password,
                                                        host,
                                                        val.port)),
            encoded=True)
项目:presto-python-client    作者:prestodb    | 项目源码 | 文件源码
def _normalize_url_with_hostname(url):
    # type: (Text) -> Text
    # TODO: replace urlparse by more robust utf-8 handling code
    parsed = urlparse(url.encode('utf-8'))
    hostname = parsed.hostname.decode('utf-8')  # type: ignore
    try:
        ipaddress.ip_address(hostname)
        hostname = socket.gethostbyaddr(hostname)[0].encode('utf-8')
    except ValueError:
        return url
    return parsed._replace(netloc=b'%s:%d' % (hostname, parsed.port)).geturl()
项目:craton    作者:openstack    | 项目源码 | 文件源码
def generate_ip_addresses(self, num_ips, starting_ip):
        start_ip_address = ip_address(starting_ip)
        ips = [str(start_ip_address + i) for i in range(num_ips)]
        return ips