Python ipaddress 模块,IPv6Address() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 ipaddress.IPv6Address()。

项目:typepy    作者:thombashi    | 项目源码 | 文件源码
def exeute(self, method, value):
        """Call ``method`` on ``self.typeclass(value, self.strict_level)`` and format the outcome.

        :param method: name of the zero-argument method to invoke on the type checker.
        :param value: value handed to ``self.typeclass``.
        :return: ``"NOP [#f1]_"`` for the ``validate`` method; the result wrapped in
            double back-quotes and quotes when it is text or an IP address object;
            ``"E [#f2]_"`` on ``TypeError``; ``"E [#f3]_"`` on
            ``typepy.TypeConversionError``; otherwise the raw result.
        """
        quoted_types = (
            six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address)

        try:
            checker = self.typeclass(value, self.strict_level)
            outcome = getattr(checker, method)()

            if method == "validate":
                return "NOP [#f1]_"
            if isinstance(outcome, quoted_types):
                return '``"{}"``'.format(outcome)
            return outcome
        except TypeError:
            return "E [#f2]_"
        except typepy.TypeConversionError:
            return "E [#f3]_"
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:validus    作者:shopnilsazal    | 项目源码 | 文件源码
def isipv6(value):
    """
    Tell whether ``value`` can be parsed as an IP version 6 address.

    ``True`` is returned when ``ipaddress.IPv6Address`` accepts the value,
    ``False`` when it rejects it with ``AddressValueError``.

    Examples::

        >>> isipv6('2001:41d0:2:a141::1')
        True

        >>> isipv6('127.0.0.1')
        False

    :param value: string to validate IP version 6
    """
    try:
        parsed = ipaddress.IPv6Address(value)
    except ipaddress.AddressValueError:
        is_v6 = False
    else:
        is_v6 = parsed.version == 6
    return is_v6
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def setUp(self):
        """Prepare the IPv6 packet bytes and the equivalent message object, then parse."""
        self.packet_class = IPv6

        # 40 bytes of IPv6 header followed by 8 bytes of UDP header, as hex
        header_hex = (
            '6efbcdef001811fd200109e0000000000000000000032002200109e0000400320212004500320222'
            '123456780018a695'
        )
        self.packet_fixture = bytes.fromhex(header_hex) + b'Demo UDP packet!'

        udp_fixture = UDP(
            source_port=4660,
            destination_port=22136,
            checksum=42645,
            payload=b'Demo UDP packet!'
        )
        self.message_fixture = IPv6(
            traffic_class=239,
            flow_label=773615,
            next_header=17,
            hop_limit=253,
            source=IPv6Address('2001:9e0::3:2002'),
            destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'),
            payload=udp_fixture
        )

        self.parse_packet()
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def test_l4_payload_type(self):
        """A payload with next-header 0 round-trips as ``UnknownLayer4Protocol``."""
        expected_message = IPv6(
            traffic_class=0,
            flow_label=0,
            next_header=0,
            hop_limit=0,
            source=IPv6Address('::'),
            destination=IPv6Address('::'),
            payload=UnknownLayer4Protocol(b'1234')
        )
        # 8 bytes of fixed header, two all-zero 16-byte addresses, then b'1234'
        raw_packet = bytes.fromhex('6000000000040000' + '00' * 32 + '31323334')

        consumed, decoded = IPv6.parse(raw_packet)
        self.assertEqual(consumed, len(raw_packet))
        self.assertEqual(decoded, expected_message)
        self.assertEqual(expected_message.save(), raw_packet)
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def find_link_address(link_address: Optional[IPv6Address],
                      interface_addresses: Iterable[IPv6Address]) -> IPv6Address:
        """
        Choose the reply-from address.

        :param link_address: The link-address address specified in the configuration, if any
        :param interface_addresses: The list of addresses on the interface
        :return: ``link_address`` when given; otherwise the first interface address
                 accepted by ``is_global_unicast``; otherwise ``::``
        """
        if link_address:
            return link_address

        # No configured address: fall back to the first global address, if any
        candidates = [candidate for candidate in interface_addresses if is_global_unicast(candidate)]
        return candidates[0] if candidates else IPv6Address('::')
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:archive-Holmes-Totem-Service-Library    作者:HolmesProcessing    | 项目源码 | 文件源码
def ValidateIP(ip):
    """
    Check whether an IP address is public.

    An address counts as public when it is none of:

    - private
    - loopback
    - broadcast / multicast
    - link-local

    The check itself is delegated to ``__validateIP``.

    Returns True on success and False on failure.

    Raises a ValueError exception if the input is not of type
    ipaddress.IPv4Address or ipaddress.IPv6Address.
    """
    is_public = __validateIP(ip)
    return is_public
项目:nfvOrchestrator    作者:uestcNFVproject    | 项目源码 | 文件源码
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """Build a ``TRACEREQHEADER`` carrying the local source address used to reach the peer.

    A UDP socket is connected to ``(remote_ip, remote_port)`` only to learn
    which local address would be used; that address is then split into the
    four 32-bit ``ip_1`` .. ``ip_4`` header fields.

    :param oam_type: value stored in the header's ``oam_type`` field.
    :param sil: value stored in the header's ``sil`` field.
    :param remote_ip: host the probe socket connects to.
    :param remote_port: destination port; converted with ``int()``.
    :return: the populated ``TRACEREQHEADER`` instance.
    """
    trace_req_header_values = TRACEREQHEADER()
    trace_req_header_values.oam_type = oam_type
    trace_req_header_values.sil = sil
    trace_req_header_values.port = int(remote_port)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((remote_ip, trace_req_header_values.port))
        src_addr = ipaddress.ip_address(s.getsockname()[0])
    finally:
        # Always release the descriptor, even when connect() or parsing fails
        s.close()

    if src_addr.version == 4:
        # IPv4 is laid out like an IPv4-mapped IPv6 address (::ffff:a.b.c.d)
        trace_req_header_values.ip_1 = 0x00000000
        trace_req_header_values.ip_2 = 0x00000000
        trace_req_header_values.ip_3 = 0x0000FFFF
        trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr))
    elif src_addr.version == 6:
        # NOTE(review): the socket above is AF_INET, so this branch looks
        # unreachable as written -- confirm whether IPv6 peers must be supported.
        int_addr6 = int(ipaddress.IPv6Address(src_addr))
        trace_req_header_values.ip_1 = int_addr6 >> 96
        trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF

    return trace_req_header_values
项目:pysnmp    作者:etingof    | 项目源码 | 文件源码
def getSendTo(addressType):
        """Build a ``sendto`` function that attaches the local address as packet-info ancillary data.

        :param addressType: class whose instances carry the local address via
            ``getLocalAddress()``; for any other destination type the socket's own
            name is used instead.
        :return: ``sendto(s, _data, _to)``, which forwards to ``s.sendmsg``.
        """
        def sendto(s, _data, _to):
            if type(_to) == addressType:
                localHost = _to.getLocalAddress()[0]
            else:
                localHost = s.getsockname()[0]
            addr = ipaddress.ip_address(localHost)
            addrClass = type(addr)

            if addrClass == ipaddress.IPv4Address:
                _f = in_pktinfo()
                _f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())]
            elif s.family == socket.AF_INET6 and addrClass == ipaddress.IPv6Address:
                _f = in6_pktinfo()
                _f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())]
            else:
                # No matching family: send without ancillary data
                ancdata = []
            return s.sendmsg([_data], ancdata, 0, _to)

        return sendto
项目:socks5    作者:mike820324    | 项目源码 | 文件源码
def __init__(self, cmd, atyp, addr, port):
        """Validate and store the fields of a request.

        :param cmd: must be one of ``REQ_COMMAND``'s values.
        :param atyp: must be one of ``ADDR_TYPE``'s values.
        :param addr: parsed into an ``ipaddress`` object for the IPv4/IPv6 address
            types; must be a ``string_func`` instance for the domain-name type.
        :param port: stored unchanged.
        :raises ValueError: on an unsupported command or address type, an
            unparsable IP address, or a non-string domain name.
        """
        if cmd not in REQ_COMMAND.values():
            raise ValueError("Unsupported request command {}".format(cmd))

        if atyp not in ADDR_TYPE.values():
            raise ValueError("Unsupported address type {}".format(atyp))

        ip_parsers = (
            ("IPV4", ipaddress.IPv4Address, "Invalid ipaddress format for IPv4"),
            ("IPV6", ipaddress.IPv6Address, "Invalid ipaddress format for IPv6"),
        )
        for type_key, parse, failure_message in ip_parsers:
            if atyp == ADDR_TYPE[type_key]:
                try:
                    addr = parse(addr)
                except ipaddress.AddressValueError:
                    raise ValueError(failure_message)
                break
        else:
            # Not an IP address type: only the domain-name type needs a check
            if atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
                raise ValueError("Domain name expect to be unicode string")

        self.cmd = cmd
        self.atyp = atyp
        self.addr = addr
        self.port = port
项目:socks5    作者:mike820324    | 项目源码 | 文件源码
def __init__(self, status, atyp, addr, port):
        """Validate and store the fields of a response.

        :param status: must be one of ``RESP_STATUS``'s values.
        :param atyp: must be one of ``ADDR_TYPE``'s values.
        :param addr: parsed into an ``ipaddress`` object for the IPv4/IPv6 address
            types; must be a ``string_func`` instance for the domain-name type.
        :param port: stored unchanged.
        :raises ValueError: on an unsupported status or address type, an
            unparsable IP address, or a non-string domain name.
        """
        if status not in RESP_STATUS.values():
            raise ValueError("Unsupported status code {}".format(status))

        if atyp not in ADDR_TYPE.values():
            raise ValueError("Unsupported address type {}".format(atyp))

        ip_parsers = (
            ("IPV4", ipaddress.IPv4Address, "Invalid ipaddress format for IPv4"),
            ("IPV6", ipaddress.IPv6Address, "Invalid ipaddress format for IPv6"),
        )
        for type_key, parse, failure_message in ip_parsers:
            if atyp == ADDR_TYPE[type_key]:
                try:
                    addr = parse(addr)
                except ipaddress.AddressValueError:
                    raise ValueError(failure_message)
                break
        else:
            # Not an IP address type: only the domain-name type needs a check
            if atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
                raise ValueError("Domain name expect to be unicode string")

        self.status = status
        self.atyp = atyp
        self.addr = addr
        self.port = port
项目:peerme    作者:cooperlees    | 项目源码 | 文件源码
def validate_ip_address(self, address, af=6):
        """Check that ``address`` parses as an IP address of family ``af``.

        :param address: value handed to the ``ipaddress`` constructor.
        :param af: address family, ``4`` or ``6``.
        :return: ``True`` when the address parses; ``False`` (after logging the
            error) when it is rejected with ``AddressValueError``.
        :raises Exception: when ``af`` is neither 4 nor 6.
        """
        if af == 4:
            parse = ipaddress.IPv4Address
        elif af == 6:
            parse = ipaddress.IPv6Address
        else:
            raise Exception("Invalid AF: {}".format(af))

        try:
            parse(address)
        except ipaddress.AddressValueError as ex:
            logging.error(ex)
            return False
        return True
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:recipe-catalog    作者:inocybe    | 项目源码 | 文件源码
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """Build a ``TRACEREQHEADER`` carrying the local source address used to reach the peer.

    A UDP socket is connected to ``(remote_ip, remote_port)`` only to learn
    which local address would be used; that address is then split into the
    four 32-bit ``ip_1`` .. ``ip_4`` header fields.

    :param oam_type: value stored in the header's ``oam_type`` field.
    :param sil: value stored in the header's ``sil`` field.
    :param remote_ip: host the probe socket connects to.
    :param remote_port: destination port; converted with ``int()``.
    :return: the populated ``TRACEREQHEADER`` instance.
    """
    trace_req_header_values = TRACEREQHEADER()
    trace_req_header_values.oam_type = oam_type
    trace_req_header_values.sil = sil
    trace_req_header_values.port = int(remote_port)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((remote_ip, trace_req_header_values.port))
        src_addr = ipaddress.ip_address(s.getsockname()[0])
    finally:
        # Always release the descriptor, even when connect() or parsing fails
        s.close()

    if src_addr.version == 4:
        # IPv4 is laid out like an IPv4-mapped IPv6 address (::ffff:a.b.c.d)
        trace_req_header_values.ip_1 = 0x00000000
        trace_req_header_values.ip_2 = 0x00000000
        trace_req_header_values.ip_3 = 0x0000FFFF
        trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr))
    elif src_addr.version == 6:
        # NOTE(review): the socket above is AF_INET, so this branch looks
        # unreachable as written -- confirm whether IPv6 peers must be supported.
        int_addr6 = int(ipaddress.IPv6Address(src_addr))
        trace_req_header_values.ip_1 = int_addr6 >> 96
        trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF

    return trace_req_header_values
项目:nat64check    作者:sjm-steffann    | 项目源码 | 文件源码
def get_addresses(hostname) -> List[str]:
    """Look up the A and AAAA records of ``hostname`` with ``dig``.

    :param hostname: name passed to ``dig +short``.
    :return: the IP addresses found, as strings normalised by ``ip_address``;
        A results come before AAAA results. Output tokens that are not IP
        literals are skipped. An empty list is returned when either ``dig``
        call exits with a non-zero status.
    """
    # Get DNS info
    try:
        dns_records = b''.join(
            subprocess.check_output(args=['dig', '+short', record_type, hostname],
                                    stderr=subprocess.DEVNULL)
            for record_type in ('a', 'aaaa')
        )
    except subprocess.CalledProcessError:
        return []

    dns_results = []
    for token in dns_records.decode('utf-8').split():
        try:
            dns_results.append(str(ip_address(token)))
        except ValueError:
            # Not an IP literal (presumably e.g. a CNAME target) -- ignore it
            continue

    return dns_results
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def is_ipv6(string):
        """
        Tell whether ``string`` is accepted by ``ipaddress.IPv6Address``.

        :param string: String to check
        :type string: str

        :return: True if the value parses as an IPv6 address, False when it is
                 rejected with ``AddressValueError``.
        :rtype: bool
        """

        try:
            ipaddress.IPv6Address(string)
        except ipaddress.AddressValueError:
            return False
        else:
            return True
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def getSendTo(addressType):
        """Build a ``sendto`` function that attaches the local address as packet-info ancillary data.

        :param addressType: class whose instances carry the local address via
            ``getLocalAddress()``; for any other destination type the socket's own
            name is used instead.
        :return: ``sendto(s, _data, _to)``, which forwards to ``s.sendmsg``.
        """
        def sendto(s, _data, _to):
            if type(_to) == addressType:
                localHost = _to.getLocalAddress()[0]
            else:
                localHost = s.getsockname()[0]
            addr = ipaddress.ip_address(localHost)
            addrClass = type(addr)

            if addrClass == ipaddress.IPv4Address:
                _f = in_pktinfo()
                _f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())]
            elif s.family == socket.AF_INET6 and addrClass == ipaddress.IPv6Address:
                _f = in6_pktinfo()
                _f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())]
            else:
                # No matching family: send without ancillary data
                ancdata = []
            return s.sendmsg([_data], ancdata, 0, _to)
        return sendto
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_bad_address_split_v6_repeated_double_colon(self):
        """Every address containing more than one '::' must be rejected."""
        msg = "At most one '::' permitted in %r"
        malformed = (
            "3ffe::1::1",
            "1::2::3::4:5",
            "2001::db:::1",
            "3ffe::1::",
            "::3ffe::1",
            ":3ffe::1::1",
            "3ffe::1::1:",
            ":3ffe::1::1:",
            ":::",
            '2001:db8:::1',
        )
        for addr in malformed:
            with self.assertAddressError(msg, addr):
                ipaddress.IPv6Address(addr)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testTeredo(self):
        """``teredo`` yields the (server, client) pair only for Teredo addresses."""
        # stolen from wikipedia
        expected_pair = (ipaddress.IPv4Address('65.54.227.120'),
                         ipaddress.IPv4Address('192.0.2.45'))
        parsed = ipaddress.ip_address('2001:0000:4136:e378:8000:63bf:3fff:fdd2')
        self.assertEqual(expected_pair, parsed.teredo)

        # Addresses whose leading 32 bits differ from 2001:0000 are not Teredo
        for bad_addr in ('2000::4136:e378:8000:63bf:3fff:fdd2',
                         '2001:0001:4136:e378:8000:63bf:3fff:fdd2'):
            self.assertFalse(ipaddress.ip_address(bad_addr).teredo)

        # i77
        teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
        expected_pair = (ipaddress.IPv4Address('94.245.121.253'),
                         ipaddress.IPv4Address('95.26.244.94'))
        self.assertEqual(expected_pair, teredo_addr.teredo)
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:pyspinel    作者:openthread    | 项目源码 | 文件源码
def handle_ipaddr_insert(self, prefix, prefix_len, _stable, flags, _is_local):
        """ Add an ip address for each prefix on prefix change.

        The address text is the string form of ``prefix`` followed by the string
        form of ``self.wpan_api.nodeid``. The incoming ``flags`` value is not
        used; the encoded entry always carries valid=1, preferred=1, flags=0.
        """
        prefix_text = str(ipaddress.IPv6Address(prefix))
        node_text = str(self.wpan_api.nodeid)
        ipaddr_str = prefix_text + node_text
        if CONFIG.DEBUG_LOG_PROP:
            print("\n>>>> new PREFIX add ipaddr: " + ipaddr_str)

        ipaddr = ipaddress.IPv6Interface(unicode(ipaddr_str))
        self.autoAddresses.add(ipaddr)

        # Field order: address bytes, prefix length, valid, preferred, flags
        arr = self.encode_fields('6CLLC', ipaddr.ip.packed, prefix_len, 1, 1, 0)
        arr_format = str(len(arr)) + 's'

        self.wpan_api.prop_insert_async(SPINEL.PROP_IPV6_ADDRESS_TABLE,
                                        arr, arr_format,
                                        SPINEL.HEADER_EVENT_HANDLER)
项目:pyspinel    作者:openthread    | 项目源码 | 文件源码
def handle_property(self, line, prop_id, mixed_format='B', output=True):
        """ Helper to set property when line argument passed, get otherwise.

        Returns the value from ``prop_get_or_set_value``. With ``output`` false
        nothing is printed. Otherwise an empty/None value prints "Error" and
        returns None; on a get (empty ``line``) the value is printed in a form
        chosen by ``mixed_format``; "Done" is printed last.
        """
        value = self.prop_get_or_set_value(prop_id, line, mixed_format)
        if not output:
            return value

        if value is None or value == "":
            print("Error")
            return None

        # Only print value on PROP_VALUE_GET
        is_get = line is None or line == ""
        if is_get:
            if mixed_format == '6':
                rendered = str(ipaddress.IPv6Address(value))
            elif mixed_format in ('D', 'E'):
                rendered = util.hexify_str(value, '')
            elif mixed_format == 'H':
                rendered = "%04x" % value
            else:
                rendered = str(value)
            print(rendered)

        print("Done")
        return value
项目:djangolg    作者:wolcomm    | 项目源码 | 文件源码
def __init__(self, value):
        """Initialise new IPPrefix instance.

        ``value`` is parsed as a host address first and, failing that, as a
        network (non-strict). Any error from the network parse propagates.
        Sets ``prefix``, ``type``, ``version`` and ``txt`` on the instance.
        """
        try:
            parsed = ipaddress.ip_address(value)
        except Exception:
            # Not a bare address: retry as a network, letting errors escape
            parsed = ipaddress.ip_network(value, strict=False)

        parsed_class = type(parsed)
        if parsed_class in (ipaddress.IPv4Address, ipaddress.IPv6Address):
            self.prefix = None
            self.type = self.HOST
        elif parsed_class in (ipaddress.IPv4Network, ipaddress.IPv6Network):
            self.prefix = parsed.network_address
            self.type = self.PREFIX
        self.version = parsed.version
        self.txt = parsed.compressed
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` after verifying it is an ``ipaddress`` address or network.

        :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
            ``IPv6Network`` instance.
        :raises TypeError: if ``value`` is any other type.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def getSendTo(addressType):
        """Build a ``sendto`` function that attaches the local address as packet-info ancillary data.

        :param addressType: class whose instances carry the local address via
            ``getLocalAddress()``; for any other destination type the socket's own
            name is used instead.
        :return: ``sendto(s, _data, _to)``, which forwards to ``s.sendmsg``.
        """
        def sendto(s, _data, _to):
            if type(_to) == addressType:
                localHost = _to.getLocalAddress()[0]
            else:
                localHost = s.getsockname()[0]
            addr = ipaddress.ip_address(localHost)
            addrClass = type(addr)

            if addrClass == ipaddress.IPv4Address:
                _f = in_pktinfo()
                _f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())]
            elif s.family == socket.AF_INET6 and addrClass == ipaddress.IPv6Address:
                _f = in6_pktinfo()
                _f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
                ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())]
            else:
                # No matching family: send without ancillary data
                ancdata = []
            return s.sendmsg([_data], ancdata, 0, _to)
        return sendto
项目:typepy    作者:thombashi    | 项目源码 | 文件源码
def _is_ipaddress(value):
        """Return True when ``value`` is an ``IPv4Address`` or ``IPv6Address`` instance."""
        import ipaddress

        address_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
        return isinstance(value, address_types)
项目:zinc    作者:PressLabs    | 项目源码 | 文件源码
def is_ipv6(ip_addr):
    """Return True when ``ipaddress.IPv6Address`` accepts ``ip_addr``, else False."""
    try:
        ipaddress.IPv6Address(ip_addr)
    except ipaddress.AddressValueError:
        return False
    else:
        return True
项目:endosome    作者:teor2345    | 项目源码 | 文件源码
def unpack_ip_address_bytes(data_bytes, addr_type):
    '''
    Split an addr_type IP address off the front of data_bytes.

    Returns a tuple of the unpacked IP address string and the remainder of
    data_bytes.
    addr_type must be either IPV4_ADDRESS_TYPE or IPV6_ADDRESS_TYPE; any other
    value raises ValueError.
    data_bytes must be at least IPV4_ADDRESS_LEN or IPV6_ADDRESS_LEN long.
    '''
    addr_len = get_addr_type_len(addr_type)
    assert len(data_bytes) >= addr_len

    if addr_type == IPV4_ADDRESS_TYPE:
        assert addr_len == IPV4_ADDRESS_LEN
        address_class = ipaddress.IPv4Address
    elif addr_type == IPV6_ADDRESS_TYPE:
        assert addr_len == IPV6_ADDRESS_LEN
        address_class = ipaddress.IPv6Address
    else:
        raise ValueError('Unexpected address type: {}'.format(addr_type))

    (addr_bytes, remaining_bytes) = split_field(addr_len, data_bytes)

    # Some ipaddress variants demand bytearray, others demand bytes:
    # try bytearray first and fall back to bytes if it is rejected
    addr_text = None
    try:
        addr_text = str(address_class(bytearray(addr_bytes)))
    except (ipaddress.AddressValueError, UnicodeDecodeError):
        pass
    if addr_text is None:
        addr_text = str(address_class(bytes(addr_bytes)))
    return (addr_text, remaining_bytes)
项目:prngmgr    作者:wolcomm    | 项目源码 | 文件源码
def get_bgp_table(host):
    """Return the contents of the cbgpPeer2Table table.

    The result maps each peer key (the pretty-printed second index element)
    to a dict of column name -> value, plus ``cbgpPeer2Type`` (the first index
    element). For type 1 and type 2 peers a ``cbgpPeer2RemoteAddr`` entry is
    added by parsing the key as a hexadecimal integer into an IPv4 or IPv6
    address respectively.

    :raises RuntimeError: when a response carries an error indication or status.
    """
    obj = ObjectIdentity('CISCO-BGP4-MIB', 'cbgpPeer2Table')
    target = UdpTransportTarget((host, SNMP['port']))

    results = defaultdict(dict)

    responses = bulkCmd(snmp, usm, target, context, 0, 25,
                        ObjectType(obj),
                        lexicographicMode=False,
                        lookupMib=True)
    for (errorIndication, errorStatus, errorIndex, varBinds) in responses:
        if errorIndication:
            raise RuntimeError(errorIndication)
        if errorStatus:
            raise RuntimeError('%s at %s'
                               % (errorStatus.prettyPrint(),
                                  errorIndex
                                  and varBinds[-1][int(errorIndex)-1] or '?'))

        for (key, val) in varBinds:
            if isinstance(val, EndOfMibView):
                continue
            (mib, name, index) = \
                key.loadMibs('BGP4-MIB').getMibSymbol()
            peer_key = index[1].prettyPrint()
            results[peer_key]['cbgpPeer2Type'] = index[0]
            results[peer_key][name] = val

    for peer_key, entry in results.items():
        if entry['cbgpPeer2Type'] == 1:
            entry['cbgpPeer2RemoteAddr'] = \
                ipaddress.IPv4Address(int(peer_key, 16))
        elif entry['cbgpPeer2Type'] == 2:
            entry['cbgpPeer2RemoteAddr'] = \
                ipaddress.IPv6Address(int(peer_key, 16))
    return results
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def test_checksum_calculation(self):
        """The checksum computed against a matching IPv6 layer equals the fixture's."""
        source = IPv6Address('2001:9e0::3:2002')
        destination = IPv6Address('2001:9e0:4:32:212:45:32:0222')
        dummy_ipv6 = IPv6(source=source, destination=destination)

        expected = self.message_fixture.checksum
        self.assertEqual(expected, self.message.calculate_checksum(dummy_ipv6))
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def test_save_with_checksum_calculation(self):
        """Saving with ``recalculate_checksum_for`` replaces a bogus checksum."""
        source = IPv6Address('2001:9e0::3:2002')
        destination = IPv6Address('2001:9e0:4:32:212:45:32:0222')
        dummy_ipv6 = IPv6(source=source, destination=destination)

        # Deliberately invalid value that save() is expected to overwrite
        self.message.checksum = -1
        self.assertEqual(self.message.save(recalculate_checksum_for=dummy_ipv6),
                         self.packet_fixture)
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def test_validate_source(self):
        """``validate`` rejects a raw-bytes source and a multicast source."""
        rejected_sources = (
            (bytes.fromhex('20010db8000000000000000000000001'), 'Source .* IPv6 address'),
            (IPv6Address('ff02::1'), 'Source .* non-multicast IPv6 address'),
        )
        for bad_source, expected_pattern in rejected_sources:
            self.message.source = bad_source
            with self.assertRaisesRegex(ValueError, expected_pattern):
                self.message.validate()
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def setUp(self):
        """Prepare the Ethernet frame bytes and the equivalent message object, then parse."""
        self.packet_class = Ethernet

        # Hex of each header layer, outermost first
        ethernet_hex = '0123456789abcdef0123456786dd'
        ipv6_hex = '6efbcdef001811fd200109e0000000000000000000032002200109e0000400320212004500320222'
        udp_hex = '123456780018a695'
        self.packet_fixture = bytes.fromhex(ethernet_hex + ipv6_hex + udp_hex) + b'Demo UDP packet!'

        udp_fixture = UDP(
            source_port=4660,
            destination_port=22136,
            checksum=42645,
            payload=b'Demo UDP packet!'
        )
        ipv6_fixture = IPv6(
            traffic_class=239,
            flow_label=773615,
            next_header=17,
            hop_limit=253,
            source=IPv6Address('2001:9e0::3:2002'),
            destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'),
            payload=udp_fixture
        )
        self.message_fixture = Ethernet(
            destination=bytes.fromhex('0123456789ab'),
            source=bytes.fromhex('cdef01234567'),
            ethertype=0x86dd,
            payload=ipv6_fixture
        )
        self.parse_packet()
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def __init__(self, traffic_class: int = 0, flow_label: int = 0, next_header: int = 0, hop_limit: int = 0,
                 source: IPv6Address = None, destination: IPv6Address = None, payload: ProtocolElement = None):
        """
        Store the given field values on the new object.

        No checking is done here; every argument is kept exactly as passed.

        :param traffic_class: Value stored as ``traffic_class``
        :param flow_label: Value stored as ``flow_label``
        :param next_header: Value stored as ``next_header``
        :param hop_limit: Value stored as ``hop_limit``
        :param source: Value stored as ``source``
        :param destination: Value stored as ``destination``
        :param payload: Value stored as ``payload``
        """
        super().__init__()

        # Numeric fields
        self.traffic_class, self.flow_label = traffic_class, flow_label
        self.next_header, self.hop_limit = next_header, hop_limit

        # Addresses
        self.source, self.destination = source, destination

        # Encapsulated element
        self.payload = payload
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def validate(self):
        """
        Check that every field of this object holds an acceptable value.

        Checks run in a fixed order and stop at the first failure: the four
        integer fields, then source, destination and payload. Finally the
        payload is passed to ``validate_contains`` (defined outside this block)
        and asked to validate itself.

        :raises ValueError: If an integer field is not an int within its bit
                            width, if source or destination is not an
                            IPv6Address, if source is multicast, or if payload
                            is not a ProtocolElement
        """
        # (attribute name, width in bits, error message), checked in this order
        unsigned_fields = (
            ('traffic_class', 8, "Traffic class must be an unsigned 8 bit integer"),
            ('flow_label', 20, "Flow label must be an unsigned 20 bit integer"),
            ('next_header', 8, "Next header type must be an unsigned 8 bit integer"),
            ('hop_limit', 8, "Hop limit must be an unsigned 8 bit integer"),
        )
        for attribute, width, message in unsigned_fields:
            # Look the attribute up lazily so fields are read one at a time, in order
            field_value = getattr(self, attribute)
            if not (isinstance(field_value, int) and 0 <= field_value < 2 ** width):
                raise ValueError(message)

        # The source has to be an address object, and it may not be multicast
        if not isinstance(self.source, IPv6Address):
            raise ValueError("Source must be an IPv6 address")
        if self.source.is_multicast:
            raise ValueError("Source must be a non-multicast IPv6 address")

        # The destination only has to be an address object
        if not isinstance(self.destination, IPv6Address):
            raise ValueError("Destination must be an IPv6 address")

        # The payload has to be a protocol element
        if not isinstance(self.payload, ProtocolElement):
            raise ValueError("Payload must be a protocol element")

        # Check the payload against what this element may contain, then recurse into it
        self.validate_contains([self.payload])
        self.payload.validate()
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def __init__(self, interface_index: int,
                 interface_mac_address: bytes, client_mac_address: bytes,
                 reply_from: IPv6Address, reply_socket: socket.socket):
        """
        Store the given values on the new object, unchanged and unchecked.

        :param interface_index: Value stored as ``interface_index``
        :param interface_mac_address: Value stored as ``interface_mac_address``
        :param client_mac_address: Value stored as ``client_mac_address``
        :param reply_from: Value stored as ``reply_from``
        :param reply_socket: Value stored as ``reply_socket``
        """
        self.interface_index = interface_index

        # The two MAC addresses
        self.interface_mac_address, self.client_mac_address = interface_mac_address, client_mac_address

        # Reply address and socket
        self.reply_from, self.reply_socket = reply_from, reply_socket
项目:dhcpkit_vpp    作者:sjm-steffann    | 项目源码 | 文件源码
def find_reply_from(reply_from: Optional[IPv6Address], interface_name: str,
                        interface_addresses: Iterable[IPv6Address]) -> IPv6Address:
        """
        Decide which address replies should be sent from.

        A configured address is used as-is, provided it is present among the
        interface's addresses. Without a configured address the first
        link-local address of the interface is chosen.

        :param reply_from: The reply-from address specified in the configuration, if any
        :param interface_name: The name of the interface, only used in error messages
        :param interface_addresses: The addresses on the interface
        :return: The reply-from address to use
        :raises ValueError: If the configured address is not on the interface, or
                            if nothing is configured and there is no link-local address
        """
        if not reply_from:
            # Nothing configured: fall back to the first link-local address
            link_locals = [candidate for candidate in interface_addresses if candidate.is_link_local]
            if link_locals:
                return link_locals[0]

            raise ValueError("No link-local address found on {intf}".format(
                intf=interface_name
            ))

        # A configured address is only acceptable when the interface really has it
        if reply_from in interface_addresses:
            return reply_from

        raise ValueError("Reply-from address {addr} does not exist on {intf}".format(
            addr=reply_from,
            intf=interface_name
        ))
项目:mdb    作者:edb-gjengen    | 项目源码 | 文件源码
def set_domain_name_for_ipv6_subnet(sender, instance, **kwargs):
    """
    Fill in ``instance.domain_name`` with an ``ip6.arpa`` name when it is empty.

    An existing non-empty domain name is left untouched. Otherwise
    ``instance.network`` gets ``::`` appended and is parsed as an IPv6 address;
    its first 16 hex digits (64 bits) are written one digit per label in
    reverse order, followed by ``.ip6.arpa``.

    The signature looks like a signal receiver; ``sender`` and ``kwargs`` are
    not used here.
    """
    has_domain_name = len(instance.domain_name) > 0
    if has_domain_name:
        return

    # Appending "::" lets a bare prefix parse as a complete address
    prefix_address = ipaddress.IPv6Address("%s::" % instance.network)

    # Fully expanded form without colons is 32 hex digits; keep the first 16
    hex_digits = prefix_address.exploded.replace(":", "")
    leading_digits = hex_digits[:16]

    # Least significant digit comes first in the reverse name
    reversed_labels = ".".join(reversed(leading_digits))
    instance.domain_name = reversed_labels + ".ip6.arpa"
项目:archive-Holmes-Totem-Service-Library    作者:HolmesProcessing    | 项目源码 | 文件源码
def makeIP(i, size):
    """
    Build an address object from ``i``.

    Returns an ``ipaddress.IPv4Address`` when ``size`` equals 4 and an
    ``ipaddress.IPv6Address`` for every other ``size``. ``i`` is handed to the
    constructor unchanged, so any error it raises propagates to the caller.
    """
    address_class = ipaddress.IPv4Address if size == 4 else ipaddress.IPv6Address
    return address_class(i)