Python ipaddress 模块,IPv4Network() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ipaddress.IPv4Network()

项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def _validate_cidr(self, rule):
        """Validate the CIDR block in a rule.

        Args:
            rule (dict): Rule definition; the CIDR string is read from
                ``rule['app']``.

        Returns:
            True: Upon successful completion.

        Raises:
            SpinnakerSecurityGroupCreationFailed: ``rule['app']`` cannot be
                parsed as an IPv4 network.
        """
        try:
            network = ipaddress.IPv4Network(rule['app'])
        except ValueError as error:
            # AddressValueError and NetmaskValueError both derive from
            # ValueError, so a single clause covers every parse failure.
            raise SpinnakerSecurityGroupCreationFailed(error) from error

        self.log.debug('Validating CIDR: %s', network.exploded)

        return True
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def validate_ip_network_address_prefix(address, prefix, strict=True):
    '''
    Build an IP network object from an address and a prefix.

    address is first checked with validate_ip_address(). When that yields an
    IPv4 or IPv6 address, the matching ipaddress network class is built from
    (address, prefix), honouring strict. Any failure yields None.
    '''
    parsed = validate_ip_address(address)
    if parsed is None:
        return None

    network_classes = {
        4: ipaddress.IPv4Network,
        6: ipaddress.IPv6Network,
    }
    network_cls = network_classes.get(parsed.version)
    if network_cls is None:
        return None

    try:
        return network_cls((parsed, prefix), strict=strict)
    except ValueError:
        return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_cidr_cast(self):
        """``cidr`` values come back as ipaddress networks; NULL as None."""
        import ipaddress as ip
        cursor = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cursor)

        cursor.execute("select null::cidr")
        self.assertTrue(cursor.fetchone()[0] is None)

        # (query, literal expected back, class expected back)
        cases = (
            ("select '127.0.0.0/24'::cidr",
             '127.0.0.0/24', ip.IPv4Network),
            ("select '::ffff:102:300/128'::cidr",
             '::ffff:102:300/128', ip.IPv6Network),
        )
        for query, literal, network_cls in cases:
            cursor.execute(query)
            fetched = cursor.fetchone()[0]
            self.assertTrue(isinstance(fetched, network_cls), repr(fetched))
            self.assertEqual(fetched, ip.ip_network(literal))
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_cidr_cast(self):
        """``cidr`` values are cast to ipaddress network objects.

        NULL must come back as None, an IPv4 cidr as ``IPv4Network`` and an
        IPv6 cidr as ``IPv6Network``.
        """
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        # assertTrue/assertEqual replace the deprecated assert_/assertEquals
        # aliases, which were removed from unittest in Python 3.12.
        cur.execute("select null::cidr")
        self.assertTrue(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.0/24'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

        cur.execute("select '::ffff:102:300/128'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:telepresence    作者:datawire    | 项目源码 | 文件源码
def test_covering_cidr(ips):
    """
    covering_cidr() returns the smallest CIDR (as a string) covering all IPs.

    "Smallest" means that no subnet of the result may still contain every
    given IP, unless the result is already at the /24 floor.
    """
    result = telepresence.vpn.covering_cidr(ips)
    assert isinstance(result, str)
    network = ipaddress.IPv4Network(result)
    assert network.prefixlen <= 24
    # Every given IP lies inside the returned network:
    addresses = [ipaddress.IPv4Address(raw) for raw in ips]
    assert all([address in network for address in addresses])
    # At the /24 floor there is nothing further to check.
    if network.prefixlen >= 24:
        return
    # Otherwise no subnet may cover all of the addresses:
    for subnet in network.subnets():
        assert not all([address in subnet for address in addresses])
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def recreate_routable_ip_pool(self):
        """Clear ``self.ip_pools`` and add one pool built from the master's IP.

        A shell snippet is run on 'master' through ``self.ssh_exec``; its
        output is parsed non-strictly as an IPv4 network, and that network is
        added with ``self.env['KD_ONE_PUB_IPS']`` as the included addresses.
        """
        self.ip_pools.clear()
        # Step 1: find the default interface from a route lookup such as
        # 8.8.8.8 via 192.168.115.254 dev ens3  src 192.168.113.245
        #
        # Step 2: take the IP/prefix from the third line of its description:
        # 2: ens3: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast
        # link/ether 02:00:c0:a8:71:f5 brd ff:ff:ff:ff:ff:ff
        # inet 192.168.113.245/22 brd 192.168.115.255 scope global ens3

        cmd = """
        MAIN_INTERFACE=$(ip route get 8.8.8.8 | egrep 'dev\s+.+?\s+src' -o | awk '{print $2}');  # noqa
        ip addr show dev $MAIN_INTERFACE | awk 'NR==3 { print $2 }'
        """
        _status, main_ip, _stderr = self.ssh_exec('master', cmd)

        # strict=False: the interface address has host bits set.
        network = IPv4Network(unicode(main_ip), strict=False)
        self.ip_pools.add(str(network), includes=self.env['KD_ONE_PUB_IPS'])
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def run(self, subnet, exclude, include, node=None):
        """Create an IP pool for ``subnet`` through ``ippool.IpAddrPool``.

        ``exclude`` and ``include`` are mutually exclusive. When ``include``
        is given it is converted to an exclusion list: every address of the
        subnet that is not in the parsed include set.

        Raises:
            InvalidCommand: both ``exclude`` and ``include`` were supplied.
        """
        if exclude and include:
            raise InvalidCommand('Can\'t specify both -e and -i')

        if include:
            wanted = ippool.IpAddrPool().parse_autoblock(include)
            network = IPv4Network(unicode(subnet))
            candidates = {str(host) for host in network.hosts()}
            # The network and broadcast addresses join the candidate set too,
            # so they are excluded unless explicitly included.
            # TODO: Fix according to AC-4044
            candidates.update([
                str(network.network_address),
                str(network.broadcast_address),
            ])

            exclude = ','.join(candidates - wanted)

        pool_spec = {
            'network': subnet.decode(),
            'autoblock': exclude,
            'node': node
        }
        ippool.IpAddrPool().create(pool_spec)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_cidr_cast(self):
        """``cidr`` values come back as ipaddress networks; NULL as None."""
        import ipaddress as ip
        cursor = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cursor)

        cursor.execute("select null::cidr")
        self.assertTrue(cursor.fetchone()[0] is None)

        # (query, literal expected back, class expected back)
        cases = (
            ("select '127.0.0.0/24'::cidr",
             '127.0.0.0/24', ip.IPv4Network),
            ("select '::ffff:102:300/128'::cidr",
             '::ffff:102:300/128', ip.IPv6Network),
        )
        for query, literal, network_cls in cases:
            cursor.execute(query)
            fetched = cursor.fetchone()[0]
            self.assertTrue(isinstance(fetched, network_cls), repr(fetched))
            self.assertEqual(fetched, ip.ip_network(literal))
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_cidr_cast(self):
        """``cidr`` values are cast to ipaddress network objects.

        NULL must come back as None, an IPv4 cidr as ``IPv4Network`` and an
        IPv6 cidr as ``IPv6Network``.
        """
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        # assertTrue/assertEqual replace the deprecated assert_/assertEquals
        # aliases, which were removed from unittest in Python 3.12.
        cur.execute("select null::cidr")
        self.assertTrue(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.0/24'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

        cur.execute("select '::ffff:102:300/128'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
项目:python-mebo    作者:crlane    | 项目源码 | 文件源码
def _discover(self, network):
        """
        Locate the Mebo on the LAN via a broadcast on the given network.

        The parsed network is kept on ``self._network``.

        :param network: The IPv4 network as a CIDR string. Example: 192.168.1.0/24
        :returns: The ``ip`` attribute of the broadcast result
        :raises: `MeboDiscoveryError` if a socket timeout or an `HTTPError` occurs during discovery
        """
        try:
            print('Looking for Mebo...')
            self._network = IPv4Network(network)
            broadcast_addr = self._network.broadcast_address.compressed
            found = self._get_broadcast(broadcast_addr)
            # Probe the API at the discovered address; a bad status raises.
            self._probe(found.ip).raise_for_status()
            print('Mebo found at {}'.format(found.ip))
            return found.ip
        except (socket.timeout, HTTPError):
            message = ('Unable to locate Mebo on the network.\n'
                       '\tMake sure it is powered on and connected to LAN.\n'
                       '\tIt may be necessary to power cycle the Mebo.')
            raise MeboDiscoveryError(message)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:l3overlay    作者:catalyst    | 项目源码 | 文件源码
def ip_address_get(value):
    '''
    Convert a value to an IPv4 or IPv6 address object.

    Network objects are rejected outright, and a value that
    ipaddress.ip_address() cannot parse is rejected too; both cases raise
    a GetError.
    '''

    network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
    if isinstance(value, network_types):
        type_name = type(value).__name__
        raise GetError(
            "%s not allowed to be converted to IP address, use class attributes" %
            type_name,
        )

    try:
        address = ipaddress.ip_address(value)
    except ValueError:
        raise GetError("value '%s' is not a valid IPv4/IPv6 address" % value)
    return address
项目:vCloudDirectorManager    作者:blackms    | 项目源码 | 文件源码
def get_less_used_network(num_of_requested_ip, global_variable, config_obj):
    """
    Get the network with the most free IPs and take addresses from it.

    Blocks (polling once per second) while ``global_variable.free_ip_lock``
    is True, then picks the key of ``global_variable.free_ips`` with the
    longest list of free IPs.

    :param num_of_requested_ip: number of IPs to take from the chosen network
    :param global_variable: object exposing ``free_ip_lock``, ``free_ips``
        and ``vcs.get_network()``
    :param config_obj: configuration passed to ``config_section_map``; the
        'vcloud' section's ``min_spare_ip`` must remain free after the request
    :return: dict with the matching external network under 'object' and the
        selected IPs under 'iplist', or None when there are no networks or
        not enough free IPs
    :raises IndexError: no external network has its gateway inside the
        chosen network
    """
    from ipaddress import IPv4Address, IPv4Network

    # Wait until the free-IP map is no longer locked.
    while global_variable.free_ip_lock is True:
        time.sleep(1)
    free_ip = global_variable.free_ips
    if not free_ip:
        # Nothing to choose from; same outcome as "not enough IPs".
        return None
    # Identify the less used network (first one wins on ties).
    less_used_network = max(free_ip, key=lambda k: len(free_ip[k]))
    available = free_ip[less_used_network]
    # The network must have enough IPs, spare ones included.
    min_spare_ip = int(config_section_map(config_obj, 'vcloud')['min_spare_ip'])
    if len(available) < num_of_requested_ip + min_spare_ip:
        return None
    ip_list = available[:num_of_requested_ip]
    pvdc_external_networks = global_variable.vcs.get_network()

    chosen = IPv4Network(less_used_network)
    # A list (not filter()) so the result is indexable on Python 3 as well.
    external_network = [
        candidate for candidate in pvdc_external_networks
        if IPv4Address(candidate.gateway) in chosen
    ]
    return {'object': external_network[0],
            'iplist': ip_list}
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:djangolg    作者:wolcomm    | 项目源码 | 文件源码
def __init__(self, value):
        """Initialise new IPPrefix instance.

        ``value`` is parsed as a host address first; if that fails it is
        parsed as a network with ``strict=False``. A failure of the second
        parse propagates to the caller unchanged.

        Sets ``prefix`` (None for a host, the network address otherwise),
        ``type`` (``self.HOST`` or ``self.PREFIX``), ``version`` and ``txt``
        (the compressed text form).
        """
        try:
            obj = ipaddress.ip_address(value)
        except Exception:
            # Not a host address: fall back to a (non-strict) network parse.
            obj = ipaddress.ip_network(value, strict=False)
            self.prefix = obj.network_address
            self.type = self.PREFIX
        else:
            self.prefix = None
            self.type = self.HOST
        self.version = obj.version
        self.txt = obj.compressed
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def __init__(self, value):
        """Keep ``value`` on the instance after validating its type.

        Only ``ipaddress`` address and network objects (IPv4 or IPv6) are
        accepted; anything else raises ``TypeError``.
        """
        supported = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if not isinstance(value, supported):
            message = (
                "value must be an instance of ipaddress.IPv4Address, "
                "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
                "ipaddress.IPv6Network"
            )
            raise TypeError(message)

        self._value = value
项目:vpc-router    作者:romana    | 项目源码 | 文件源码
def is_cidr_in_cidr(small_cidr, big_cidr):
    """
    Return True if the small CIDR is contained in the big CIDR.

    The default route (0.0.0.0/0) is handled differently, since every route
    would always be contained in there. Instead, only a small CIDR of
    "0.0.0.0/0" can match against it. Other small CIDRs will always result in
    'False' (not contained).

    """
    default_route = "0.0.0.0/0"
    if small_cidr == default_route:
        return big_cidr == default_route
    if big_cidr == default_route:
        return False

    # The original code called unicode() unconditionally, which only exists
    # on Python 2; fall back to str so the function also runs on Python 3.
    try:
        to_text = unicode
    except NameError:
        to_text = str

    s = ipaddress.IPv4Network(to_text(small_cidr))
    b = ipaddress.IPv4Network(to_text(big_cidr))
    return s.subnet_of(b)
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def __init__(self, value):
        """Store ``value`` once it is known to be an ``ipaddress`` object.

        Raises:
            TypeError: ``value`` is not an IPv4Address, IPv6Address,
                IPv4Network or IPv6Network instance.
        """
        accepted_types = (
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
        )
        if isinstance(value, accepted_types):
            self._value = value
            return

        raise TypeError(
            "value must be an instance of ipaddress.IPv4Address, "
            "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
            "ipaddress.IPv6Network"
        )
项目:cdnsim    作者:cnplab    | 项目源码 | 文件源码
def parseIRLorigin(self, fileName, ip2as=True, as2ip=False):
        """Load prefix-to-AS origin data from a tab-separated file.

        Every line that starts with ``<a.b.c.d/len><TAB><asn>`` is used;
        other lines are ignored.

        :param fileName: path of the origin file to read.
        :param ip2as: when true, reset ``self.ip2as`` to a new
            ``SubnetTree`` and map every prefix string to its AS number.
        :param as2ip: when true, reset ``self.as2ip`` to a dict mapping an AS
            number to a list of ``IPv4Network`` objects. Prefixes longer than
            ``self.smallSubnetPrefix`` and AS numbers missing from
            ``self.netGraph`` are skipped.
        :returns: None
        """
        re_origin = re.compile(r'(\d+\.\d+\.\d+\.\d+/\d+)\t(\d+)', re.UNICODE)
        if as2ip:
            self.as2ip = dict()
        if ip2as:
            self.ip2as = SubnetTree.SubnetTree()
        # The context manager closes the file even when a line fails to parse.
        with open(fileName, 'r') as F_origin:
            for line in F_origin:
                match = re_origin.match(line)
                if match is None:
                    continue
                prefix = match.group(1)
                asNum = int(match.group(2))
                if ip2as:
                    self.ip2as[prefix] = asNum
                if as2ip:
                    newNet = IPv4Network(prefix)
                    if newNet.prefixlen > self.smallSubnetPrefix:
                        continue
                    if asNum in self.netGraph:
                        self.as2ip.setdefault(asNum, []).append(newNet)
        return None
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_cidr_cast(self):
        """``cidr`` values are cast to ipaddress network objects.

        NULL must come back as None, an IPv4 cidr as ``IPv4Network`` and an
        IPv6 cidr as ``IPv6Network``.
        """
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        # assertTrue/assertEqual replace the deprecated assert_/assertEquals
        # aliases, which were removed from unittest in Python 3.12.
        cur.execute("select null::cidr")
        self.assertTrue(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.0/24'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

        cur.execute("select '::ffff:102:300/128'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
项目:containernet    作者:containernet    | 项目源码 | 文件源码
def removeSAPNAT(self, SAPSwitch):
        """Delete the iptables NAT and FORWARD rules of a SAP switch.

        The SAP network is derived non-strictly from ``SAPSwitch.ip``; three
        ``iptables -D`` commands are then run one after another for the
        switch's ``deployed_name``.
        """
        SAPNet = str(ipaddress.IPv4Network(unicode(SAPSwitch.ip), strict=False))
        device = SAPSwitch.deployed_name
        # due to a bug with python-iptables, removing and finding rules does not succeed when the mininet CLI is running
        # so the iptables command line tool is used instead
        commands = (
            "iptables -t nat -D POSTROUTING ! -o {0} -s {1} -j MASQUERADE".format(device, SAPNet),
            "iptables -D FORWARD -o {0} -j ACCEPT".format(device),
            "iptables -D FORWARD -i {0} -j ACCEPT".format(device),
        )
        for command in commands:
            # Wait for each command to finish before starting the next one.
            Popen(shlex.split(command)).communicate()

        info("remove SAP NAT rules for: {0} - {1}\n".format(SAPSwitch.name, SAPNet))
项目:IP    作者:DannyMcwaves    | 项目源码 | 文件源码
def __init__(self, address: str, mask: int=32):
        """
        Resolve ``address`` and build the network object for it and ``mask``.

        :param address: The IP address
        :param mask: the mask to use for the number of hosts you plan to access;
                      it can be a valid host mask, a valid net mask or a num_digit slash.
                      host masks start with 0 and contain either 0's and 255's
                      net masks start with 255 and contain either 0's and 255's

        If resolving raises ``UnResolvedException``, its second argument is
        printed and the interpreter exits.
        """
        try:
            self.ip_address = Resolve(address)
            self.__mask = mask
            addr = self.ip_address + str(mask)
            # Pick the network class that matches the resolved version.
            if self.ip_address.version == "IPv4":
                network_cls = ipaddress.IPv4Network
            else:
                network_cls = ipaddress.IPv6Network
            self.mask_addr = network_cls(addr, strict=False)
        except UnResolvedException as unRes:
            print(unRes.args[1])
            sys.exit()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _validate_ip_name(self, tree):
        """Reject ``IPAddress`` names whose value is not a network object.

        Raises:
            TypeError: some ``IPAddress`` entry of ``tree`` has a value that
                is neither an IPv4Network nor an IPv6Network.
        """
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        for name in tree:
            if not isinstance(name, IPAddress):
                continue
            if not isinstance(name.value, network_types):
                raise TypeError(
                    "IPAddress name constraints must be an IPv4Network or"
                    " IPv6Network object"
                )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _validate_ip_name(self, tree):
        """Ensure every ``IPAddress`` entry in ``tree`` wraps a network.

        Entries that are not ``IPAddress`` instances are ignored. The first
        ``IPAddress`` whose ``value`` is not an IPv4Network/IPv6Network
        triggers a ``TypeError``.
        """
        for entry in tree:
            is_ip_name = isinstance(entry, IPAddress)
            if is_ip_name and not isinstance(
                entry.value, (ipaddress.IPv4Network, ipaddress.IPv6Network)
            ):
                raise TypeError(
                    "IPAddress name constraints must be an IPv4Network or"
                    " IPv6Network object"
                )
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def register_ipaddress(conn_or_curs=None):
    """
    Register conversion support between `ipaddress` objects and `network types`__.

    :param conn_or_curs: the scope where to register the type casters.
        If `!None` register them globally.

    After the function is called, PostgreSQL :sql:`inet` values will be
    converted into `~ipaddress.IPv4Interface` or `~ipaddress.IPv6Interface`
    objects, :sql:`cidr` values into `~ipaddress.IPv4Network` or
    `~ipaddress.IPv6Network`.

    .. __: https://www.postgresql.org/docs/current/static/datatype-net-types.html
    """
    # The module is imported on first use and published as a module global.
    global ipaddress, _casters
    import ipaddress

    # Build the casters once and reuse them on later calls.
    if _casters is None:
        _casters = _make_casters()

    for caster in _casters:
        register_type(caster, conn_or_curs)

    adaptable_types = (
        ipaddress.IPv4Interface,
        ipaddress.IPv6Interface,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    for adaptable in adaptable_types:
        register_adapter(adaptable, adapt_ipaddress)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_cidr_array_cast(self):
        """A ``cidr[]`` array is cast element-wise; NULL entries become None."""
        import ipaddress as ip
        cursor = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cursor)
        cursor.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
        values = cursor.fetchone()[0]
        null_item, v4_item, v6_item = values[0], values[1], values[2]
        self.assertTrue(null_item is None)
        self.assertEqual(v4_item, ip.ip_network('127.0.0.1'))
        self.assertEqual(v6_item, ip.ip_network('::ffff:102:300/128'))
        self.assertTrue(isinstance(v4_item, ip.IPv4Network), values)
        self.assertTrue(isinstance(v6_item, ip.IPv6Network), values)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def register_ipaddress(conn_or_curs=None):
    """
    Register conversion support between `ipaddress` objects and `network types`__.

    :param conn_or_curs: the scope where to register the type casters.
        If `!None` register them globally.

    After the function is called, PostgreSQL :sql:`inet` values will be
    converted into `~ipaddress.IPv4Interface` or `~ipaddress.IPv6Interface`
    objects, :sql:`cidr` values into `~ipaddress.IPv4Network` or
    `~ipaddress.IPv6Network`.

    .. __: https://www.postgresql.org/docs/current/static/datatype-net-types.html
    """
    # The module is imported on first use and published as a module global.
    global ipaddress, _casters
    import ipaddress

    # Build the casters once and reuse them on later calls.
    if _casters is None:
        _casters = _make_casters()

    for caster in _casters:
        register_type(caster, conn_or_curs)

    adaptable_types = (
        ipaddress.IPv4Interface,
        ipaddress.IPv6Interface,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    for adaptable in adaptable_types:
        register_adapter(adaptable, adapt_ipaddress)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def setup(self):
        self.invalid_types = (True, False, 1)
        self.address01 = IPv4Network(u'10.0.0.0/8')
        self.address02 = IPv4Network(u'192.168.0.0/16')
        self.address03 = IPv4Network(u'172.16.0.0/12')
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_addresses_getitem(self):
        """The first address of a fresh NetworkGroup is 0.0.0.0/0."""
        first_address = NetworkGroup().addresses[0]
        expected = IPv4Network(u'0.0.0.0/0')
        assert_equals(first_address, expected)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_add(self):
        """add() is truthy for a new network and falsy for one already added."""
        group = NetworkGroup()
        # Equal to self.address01, but a distinct object.
        same_as_first = IPv4Network(u'10.0.0.0/8')
        steps = (
            (self.address01, assert_true),
            (self.address01, assert_false),
            (self.address02, assert_true),
            (self.address02, assert_false),
            (self.address01, assert_false),
            (same_as_first, assert_false),
        )
        for address, check in steps:
            check(group.add(address))
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def network_address(value):
                """ Return the exploded network address of the given value """
                cidr = NetTest.convert.string.cidr(value)
                network = ipaddress.IPv4Network(cidr)
                return network.network_address.exploded
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def broadcast_address(value):
                """ Return the exploded broadcast address of the given value """
                cidr = NetTest.convert.string.cidr(value)
                network = ipaddress.IPv4Network(cidr)
                return network.broadcast_address.exploded
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def address_builder(value):
        """Return an IPv4Network built from ``value``.

        Strings are parsed as an IPv4 interface and its network is returned;
        an ``AddressValueError`` while parsing is re-raised as ``ValueError``.
        Any other value must already be an ``IPv4Network``; if not,
        ``raise_type_exception`` is called with it.
        """
        if not isinstance(value, (basestring)):
            if not isinstance(value, IPv4Network):
                raise_type_exception(value, (IPv4Network, ), 'build with')
            return value

        try:
            return IPv4Interface(unicode(value)).network
        except AddressValueError:
            template = 'Unsupported string initialization format \'{}\''
            raise ValueError(template.format(value))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _validate_ip_name(self, tree):
        """Reject ``IPAddress`` names whose value is not a network object.

        Raises:
            TypeError: some ``IPAddress`` entry of ``tree`` has a value that
                is neither an IPv4Network nor an IPv6Network.
        """
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        for name in tree:
            if not isinstance(name, IPAddress):
                continue
            if not isinstance(name.value, network_types):
                raise TypeError(
                    "IPAddress name constraints must be an IPv4Network or"
                    " IPv6Network object"
                )
项目:prngmgr    作者:wolcomm    | 项目源码 | 文件源码
def __init__(self, prefix=None, strict=False):
        """Init new Prefix instance.

        A network object is stored as given; any other value is converted
        with ``ipaddress.ip_network`` using ``strict``.
        """
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        if not isinstance(prefix, network_types):
            prefix = ipaddress.ip_network(prefix, strict=strict)
        self._prefix = prefix
项目:mdb    作者:edb-gjengen    | 项目源码 | 文件源码
def create_ips_for_subnet(sender, instance, created, **kwargs):
    """Save one ``Ip4Address`` per host address of a newly created subnet.

    Does nothing unless ``created`` is true. The network is built from
    ``instance.network`` and ``instance.netmask``.
    """
    if created:
        cidr = instance.network + "/" + instance.netmask
        for host in ipaddress.IPv4Network(cidr).hosts():
            Ip4Address(address=str(host), subnet=instance).save()
项目:mdb    作者:edb-gjengen    | 项目源码 | 文件源码
def num_addresses(self, ip4subnet):
        """Return how many addresses the subnet's network/netmask spans."""
        cidr = "/".join((ip4subnet.network, ip4subnet.netmask))
        return ipaddress.IPv4Network(cidr).num_addresses
项目:mdb    作者:edb-gjengen    | 项目源码 | 文件源码
def broadcast_address(self, ip4subnet):
        """Return the broadcast address of the subnet's network/netmask."""
        cidr = "/".join((ip4subnet.network, ip4subnet.netmask))
        return ipaddress.IPv4Network(cidr).broadcast_address
项目:mdb    作者:edb-gjengen    | 项目源码 | 文件源码
def first_address(self, ip4subnet):
        """Return the first host address of the subnet's network/netmask.

        Raises:
            StopIteration: the network has no host addresses.
        """
        subnet = ipaddress.IPv4Network(ip4subnet.network + "/" + ip4subnet.netmask)
        # hosts() may return a list rather than an iterator (it does for a
        # /32 on Python 3.9+), so wrap it in iter() before calling next().
        return next(iter(subnet.hosts()))