Python ipaddress 模块,ip_network() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ipaddress.ip_network()

项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def public_network(self):
    """
    Check that every configured public network parses as an IP network.

    For each node in self.data, the "public_network" value is split into
    a list with Util.parse_list_from_string() and every entry is handed to
    ipaddress.ip_network().  Entries that raise ValueError are recorded as
    messages under the 'public_network' key of self.errors.  Afterwards
    self._set_pass_status('public_network') is called.

    NOTE(review): only validity is checked here; nothing in this method
    compares the networks of different nodes with each other.
    """
    for node, settings in self.data.items():
        raw_value = settings.get("public_network", "")
        net_list = Util.parse_list_from_string(raw_value)
        log.debug("public_network: {} {}".format(node, net_list))

        for network in net_list:
            try:
                ipaddress.ip_network(u'{}'.format(network))
            except ValueError as err:
                msg = "{} on {} is not valid: {}".format(network, node, err)
                self.errors.setdefault('public_network', []).append(msg)

    self._set_pass_status('public_network')
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def cluster_network(self):
    """
    Check that the cluster network of every storage node parses.

    Only nodes whose 'roles' entry contains 'storage' are inspected.  Their
    "cluster_network" value is split with Util.parse_list_from_string() and
    every entry is handed to ipaddress.ip_network(); entries that raise
    ValueError are recorded under the 'cluster_network' key of self.errors.
    Afterwards self._set_pass_status('cluster_network') is called.

    NOTE(review): only validity is checked here; nothing in this method
    compares the networks of different nodes with each other.
    """
    for node, settings in self.data.items():
        # Skip everything that is not a storage node.
        if 'roles' not in settings or 'storage' not in settings['roles']:
            continue

        raw_value = settings.get("cluster_network", "")
        net_list = Util.parse_list_from_string(raw_value)
        log.debug("cluster_network: {} {}".format(node, net_list))

        for network in net_list:
            try:
                ipaddress.ip_network(u'{}'.format(network))
            except ValueError as err:
                msg = "{} on {} is not valid: {}".format(network, node, err)
                self.errors.setdefault('cluster_network', []).append(msg)

    self._set_pass_status('cluster_network')
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def block_cidr_network(*blocked_networks):
    # type: (*str) -> Callable
    """Build a validator rejecting recipient URLs inside given CIDR networks.

    The networks are parsed once, when the validator is created.  The
    returned callable resolves the recipient URL with ``_url_ip_address``
    and raises :exc:`SecurityError` for the first blocked network that
    contains the resulting address.

    Example:
        >>> block_cidr_network('192.168.0.0/24', '132.34.23.0/24')
    """
    parsed_networks = []
    for raw_network in blocked_networks:
        parsed_networks.append(ip_network(text_type(raw_network)))

    def validate_cidr(recipient_url):
        # type: (str) -> None
        recipient_addr = _url_ip_address(recipient_url)
        offending = next(
            (net for net in parsed_networks if recipient_addr in net),
            None,
        )
        if offending is not None:
            raise SecurityError(
                'IP address of recipient {0}={1} is in network {2}'.format(
                    recipient_url, recipient_addr, offending,
                ))

    # Metadata attached to the validator function itself.
    validate_cidr._args = blocked_networks
    validate_cidr._validator = 'block_cidr_network'
    return validate_cidr
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_cidr_cast(self):
    """cidr values come back as ipaddress network objects, NULL as None."""
    import ipaddress as ip
    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertTrue(cur.fetchone()[0] is None)

    # (query, expected class, textual form of the expected network)
    cases = (
        ("select '127.0.0.0/24'::cidr", ip.IPv4Network, '127.0.0.0/24'),
        ("select '::ffff:102:300/128'::cidr", ip.IPv6Network,
         '::ffff:102:300/128'),
    )
    for query, expected_cls, expected_text in cases:
        cur.execute(query)
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, expected_cls), repr(obj))
        self.assertEqual(obj, ip.ip_network(expected_text))
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def set_unset(self, interface_id, attribute, address=None):
    """
    Set ``attribute`` to True on one interface and to False on the others.

    Every sub interface of every interface in ``self`` is visited.
    PhysicalVlanInterface instances and sub interfaces whose current
    value of ``attribute`` is None are left untouched.  A sub interface
    whose ``nicid`` equals ``str(interface_id)`` gets True; when
    ``address`` is given it only gets True if that address lies inside the
    sub interface's ``network_value`` (False otherwise).  All remaining
    sub interfaces get False.
    """
    wanted_nicid = str(interface_id)
    for interface in self:
        for sub in interface.sub_interfaces():
            # Skip VLAN only interfaces (no addresses)
            if isinstance(sub, PhysicalVlanInterface):
                continue
            if getattr(sub, attribute) is None:
                continue

            if sub.nicid != wanted_nicid:
                # Some other interface: unset
                setattr(sub, attribute, False)
                continue

            if address is None:
                setattr(sub, attribute, True)
                continue

            # Find IP on Node Interface
            on_this_network = (
                ipaddress.ip_address(bytes_to_unicode(address))
                in ipaddress.ip_network(sub.network_value)
            )
            setattr(sub, attribute, on_this_network)
项目:SupercomputerInABriefcase    作者:SupercomputerInABriefcase    | 项目源码 | 文件源码
def process_arguments(args: List[str]) -> ConnectionData:
    """Turn command line arguments into a ConnectionData record.

    With no arguments the network '192.168.3.0/24' on 'wlan0' is used;
    one argument supplies the network (interface 'eth0'); two arguments
    supply network and interface.  Any other count, or a network that
    ipaddress.ip_network() rejects, prints a message and exits with
    status 1.
    """
    assert isinstance(args, list)
    assert all(isinstance(a, str) for a in args)

    count = len(args)
    if count == 0:
        ip_net, iface = '192.168.3.0/24', 'wlan0'
    elif count == 1:
        ip_net, iface = args[0], 'eth0'
    elif count == 2:
        ip_net, iface = args
    else:
        print('Incorrect number of arguments.')
        sys.exit(1)

    rv = ConnectionData(ip_net=ip_net, iface=iface)
    try:
        ipaddress.ip_network(rv.ip_net)
    except ValueError:
        print("First parameter doesn't appear to be an IP network specification.")
        sys.exit(1)
    return rv
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def _validate_cidr_format(cidr):
    """Validate CIDR IP range

    The value must be accepted by ``ipaddress.ip_network`` (host bits may
    be set, since ``strict=False``), must contain an explicit ``/`` prefix
    part and must not contain any whitespace.

    :param str cidr: candidate CIDR string, e.g. ``'10.0.0.0/24'``
    :return: True when the string is a well formed CIDR range
    :rtype: bool
    """
    try:
        ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        # AddressValueError and NetmaskValueError are ValueError subclasses,
        # so this single clause covers all parsing failures.
        return False
    # A bare address parses as a single-host network; require the prefix.
    if '/' not in cidr:
        return False
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    return re.search(r'\s', cidr) is None
项目:piqueserver    作者:piqueserver    | 项目源码 | 文件源码
def add_ban(self, ip, reason, duration, name=None):
    """
    Ban an ip (or network) with a reason and a duration in minutes.

    Every current connection whose address lies inside the banned network
    is kicked silently and its name is remembered for the ban record.  A
    falsy duration means the ban is permanent (stored expiry is None).
    The ban is stored in self.bans under ``ip`` and self.save_bans() is
    called.
    """
    banned_net = ip_network(text_type(ip), strict=False)

    # Iterate over a snapshot of the connections while kicking.
    for conn in list(self.connections.values()):
        if ip_address(conn.address[0]) not in banned_net:
            continue
        name = conn.name
        conn.kick(silent=True)

    expires = reactor.seconds() + duration * 60 if duration else None
    self.bans[ip] = (name or '(unknown)', reason, expires)
    self.save_bans()
项目:pixie    作者:algorithm-ninja    | 项目源码 | 文件源码
def load_config(self, config):
    """Load a JSON configuration file and add derived entries to it.

    A SHA-224 digest is computed over the 'subnet', 'swap_size',
    'root_size' and 'extra_args' values followed by the contents of every
    file listed in 'hashes'; its hex form is stored as 'sha224'.  The
    'subnet' string is replaced by an ipaddress network object and
    'image_method' is set to IMAGE_METHOD.

    :param config: path of the JSON file
    :return: the configuration dictionary
    """
    with open(config) as handle:
        cfg = json.load(handle)

    digest = hashlib.sha224()
    digest.update(bytes(cfg['subnet'], 'utf-8'))
    # NOTE(review): if these two values are ints, bytes(n) yields n zero
    # bytes on Python 3 rather than the digits of n -- confirm this is
    # intended before changing it, since it affects the resulting hash.
    for size_key in ('swap_size', 'root_size'):
        digest.update(bytes(cfg[size_key]))
    digest.update(bytes(cfg['extra_args'], 'utf-8'))
    # TODO: check sizes
    for path in cfg['hashes']:
        with open(path, 'rb') as blob:
            for chunk in blob:
                digest.update(chunk)

    cfg['sha224'] = digest.hexdigest()
    cfg['subnet'] = ipaddress.ip_network(cfg['subnet'])
    cfg['image_method'] = IMAGE_METHOD
    return cfg
项目:solace-messaging-cf-dev    作者:SolaceLabs    | 项目源码 | 文件源码
def getTestNetworkIps(manifest):
    """Return the host addresses of the first test subnet as strings.

    The first subnet returned by ``getTestSubnets(manifest)`` is the test
    subnet.  Hosts that also fall inside any of the remaining subnets are
    left out, and the test subnet's gateway is removed from the result.

    The previous implementation re-assigned the result of
    ``address_exclude()`` (an iterator of networks, not a network) to the
    subnet variable, which broke the following ``hosts()`` call whenever
    an overlap existed.  Filtering the host list avoids that.

    :param manifest: passed through to ``getTestSubnets``
    :return: list of exploded host address strings
    :raises ValueError: if the gateway is not among the remaining hosts
    """
    testSubnets = getTestSubnets(manifest)
    testSubnet = testSubnets[0]
    otherSubnets = testSubnets[1:]

    testIpSubnet = ipaddress.ip_network(testSubnet["range"])
    otherIpSubnets = [ipaddress.ip_network(s["range"]) for s in otherSubnets]
    subnetGateway = testSubnet["gateway"]

    overlapOtherIpSubnets = [s for s in otherIpSubnets if testIpSubnet.overlaps(s)]

    testHosts = [
        h.exploded
        for h in testIpSubnet.hosts()
        if not any(h in other for other in overlapOtherIpSubnets)
    ]
    testHosts.remove(subnetGateway)
    return testHosts
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _ip_network_hosts(obj, page=None):
    """
    Return the IP addresses belonging to one page of a network.

    :param obj: network object providing pages(), page_bits,
        network_address, prefixlen and max_prefixlen
    :param page: requested page, normalised through _page()
    :returns: IP addresses
    :rtype: iterator (AC-3531)
    Previous rtype was a generator
    """
    pages = obj.pages()
    page = _page(page, pages)

    # First address of the requested page.
    page_start = obj.network_address + (page - 1) * 2 ** obj.page_bits
    # A single-page network keeps its own prefix length.
    if pages > 1:
        page_prefix = obj.max_prefixlen - obj.page_bits
    else:
        page_prefix = obj.prefixlen

    network = ipaddress.ip_network(u'{0}/{1}'.format(page_start, page_prefix))
    # hosts() would skip the network and broadcast addresses and leave the
    # pool two addresses short (AC-3531), so iterate the network itself,
    # which yields every address.
    return iter(network)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def to_dict(self, include=None, exclude=None, page=None):
    """Return a dictionary describing this network for one page.

    ``include`` and ``exclude`` are accepted but not used by this method.
    """
    # Evaluated with the caller supplied page, before it is normalised.
    allocation = self.free_hosts_and_busy(page=page)
    pages = ip_network(self.network).pages()
    page = _page(page, pages)

    return {
        'id': self.network,
        'network': self.network,
        'ipv6': self.ipv6,
        'free_hosts': self.free_hosts(page=page),
        'blocked_list': list(self.get_blocked_set()),
        'node': None if self.node is None else self.node.hostname,
        'allocation': allocation,
        'page': page,
        'pages': pages,
    }
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def init():
    """Initialise the ipsets and drop stale pod entries from etcd.

    The flannel network and subnet are read from
    '/run/flannel/subnet.env'.  Three ipsets are updated, then every pod
    entry of every etcd user is parsed as an IP address and deleted when
    it is outside the flannel network or inside the flannel subnet.
    Finally update_ipset() is called.
    """
    config = read_config_ini('/run/flannel/subnet.env')
    raw_network = config['flannel_network']
    raw_subnet = config['flannel_subnet']

    _update_ipset('kuberdock_flannel', [raw_network], set_type='hash:net')
    _update_ipset('kuberdock_nodes', [])
    _update_ipset('kuberdock_cluster',
                  ['kuberdock_flannel', 'kuberdock_nodes'],
                  set_type='list:set')

    network = ipaddress.ip_network(unicode(raw_network))
    subnet = ipaddress.ip_network(unicode(raw_subnet), strict=False)

    etcd = ETCD()
    for user in etcd.users():
        for pod in etcd.pods(user):
            pod_ip = ipaddress.ip_address(pod)
            # Keep entries that are in the network but not in the subnet.
            if pod_ip in network and pod_ip not in subnet:
                continue
            etcd.delete_user(user, pod)
    update_ipset()
项目:sentry-health    作者:getsentry    | 项目源码 | 文件源码
def _ip_is_system_blacklisted(self, addr):
    """Tell whether ``addr`` is blacklisted by the system configuration.

    The 'apiserver.whitelisted_ips' setting is consulted first: an address
    inside any whitelisted network is never blacklisted.  Otherwise the
    address is blacklisted when it lies inside any network from
    'apiserver.blacklisted_ips'.  Entries that cannot be parsed as
    networks are ignored.
    """
    def listed_in(config_key):
        # True when addr is inside any parseable network of the setting.
        for raw in self.env.get_config(config_key):
            try:
                candidate = ip_network(raw, strict=False)
            except ValueError:
                continue
            if addr in candidate:
                return True
        return False

    if listed_in('apiserver.whitelisted_ips'):
        return False
    return listed_in('apiserver.blacklisted_ips')
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_cidr_cast(self):
    """cidr columns are cast to ipaddress networks; NULL stays None."""
    import ipaddress as ip
    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    def fetch_value(query):
        # Run the query and return the first column of the first row.
        cur.execute(query)
        return cur.fetchone()[0]

    self.assertTrue(fetch_value("select null::cidr") is None)

    v4 = fetch_value("select '127.0.0.0/24'::cidr")
    self.assertTrue(isinstance(v4, ip.IPv4Network), repr(v4))
    self.assertEqual(v4, ip.ip_network('127.0.0.0/24'))

    v6 = fetch_value("select '::ffff:102:300/128'::cidr")
    self.assertTrue(isinstance(v6, ip.IPv6Network), repr(v6))
    self.assertEqual(v6, ip.ip_network('::ffff:102:300/128'))
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_cidr_cast(self):
    """cidr values are returned as ipaddress network objects.

    Uses the current unittest assertion names: the ``assert_`` and
    ``assertEquals`` aliases used previously are deprecated and no longer
    exist in Python 3.12.
    """
    import ipaddress as ip
    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertIsNone(cur.fetchone()[0])

    cur.execute("select '127.0.0.0/24'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

    cur.execute("select '::ffff:102:300/128'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
项目:pyikev2    作者:alejandro-perez    | 项目源码 | 文件源码
def process_acquire(self, xfrm_acquire, xfrm_tmpl):
    """Build the request answering a kernel acquire, if the state allows.

    Traffic selectors are derived from the source and destination of the
    acquire's selector.  In the INITIAL state an IKE_SA_INIT request is
    generated, in the ESTABLISHED state a CREATE_CHILD_SA request; in any
    other state a warning is logged.  ``xfrm_tmpl`` is not used here.

    :return: the serialised request, or None when no request was produced
    """
    selector = xfrm_acquire.sel
    small_tsi = TrafficSelector.from_network(
        ip_network(selector.saddr.to_ipaddr()), selector.sport, selector.proto)
    small_tsr = TrafficSelector.from_network(
        ip_network(selector.daddr.to_ipaddr()), selector.dport, selector.proto)
    acquire = Acquire(small_tsi, small_tsr, xfrm_acquire.policy.index)

    if self.state == IkeSa.State.INITIAL:
        request = self.generate_ike_sa_init_request(acquire)
    elif self.state == IkeSa.State.ESTABLISHED:
        request = self.generate_create_child_sa_request(acquire)
    else:
        logging.warning('Cannot process acquire while waiting for a response.')
        request = None

    # TODO: Use a request queue for simplicity and to avoid problems with
    # state machine
    if not request:
        return None

    request_data = request.to_bytes()
    self.log_message(request, self.peer_addr, request_data, send=True)
    return request_data
项目:pyikev2    作者:alejandro-perez    | 项目源码 | 文件源码
def create_policies(self, my_addr, peer_addr, ike_conf):
    """Create the outbound, inbound and forward policies of a connection.

    For every entry of ``ike_conf['protect']`` the selectors are the
    configured subnets in tunnel mode, otherwise single-address networks
    built from ``my_addr`` and ``peer_addr``.  A random index is generated
    for the outbound policy and stored in the entry under 'index'.
    """
    for ipsec_conf in ike_conf['protect']:
        if ipsec_conf['mode'] == Mode.TUNNEL:
            src_selector = ipsec_conf['my_subnet']
            dst_selector = ipsec_conf['peer_subnet']
        else:
            src_selector = ip_network(my_addr)
            dst_selector = ip_network(peer_addr)

        # generate an index for outbound policies
        index = (SystemRandom().randint(0, 10000) << 2) | XFRM_POLICY_OUT
        ipsec_conf['index'] = index

        self._create_policy(
            src_selector, dst_selector,
            ipsec_conf['my_port'], ipsec_conf['peer_port'],
            ipsec_conf['ip_proto'], XFRM_POLICY_OUT,
            ipsec_conf['ipsec_proto'], ipsec_conf['mode'],
            my_addr, peer_addr, index=index)

        # Inbound and forward policies mirror the outbound one.
        for direction in (XFRM_POLICY_IN, XFRM_POLICY_FWD):
            self._create_policy(
                dst_selector, src_selector,
                ipsec_conf['peer_port'], ipsec_conf['my_port'],
                ipsec_conf['ip_proto'], direction,
                ipsec_conf['ipsec_proto'], ipsec_conf['mode'],
                peer_addr, my_addr)
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def web_data_admin(self):
    """Return dict used in admin/DC web templates"""
    data = {field: getattr(self, field) for field in ('name', 'alias', 'access')}
    data['owner'] = self.owner.username
    data['desc'] = self.desc
    data['ip_network'] = str(self.ip_network)

    # Attributes copied over unchanged under their own name.
    plain_fields = (
        'network', 'netmask', 'gateway', 'nic_tag', 'nic_tag_type',
        'vlan_id', 'vxlan_id', 'mtu',
    )
    for field in plain_fields:
        data[field] = getattr(self, field)

    data['resolvers'] = self.get_resolvers()
    data['dns_domain'] = self.dns_domain
    data['ptr_domain'] = self.ptr_domain
    data['dc_bound'] = self.dc_bound_bool
    data['dhcp_passthrough'] = self.dhcp_passthrough
    return data
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_incompatible_versions(self):
    """Ordering comparisons between IPv4 and IPv6 objects raise TypeError.

    ``v6net`` is built with ``ip_network``; it was previously built with
    ``ip_address``, so the network checks compared a network with an
    address instead of two networks of different versions.
    """
    # These should always raise TypeError
    v4addr = ipaddress.ip_address('1.1.1.1')
    v4net = ipaddress.ip_network('1.1.1.1')
    v6addr = ipaddress.ip_address('::1')
    v6net = ipaddress.ip_network('::1')

    self.assertRaises(TypeError, v4addr.__lt__, v6addr)
    self.assertRaises(TypeError, v4addr.__gt__, v6addr)
    self.assertRaises(TypeError, v4net.__lt__, v6net)
    self.assertRaises(TypeError, v4net.__gt__, v6net)

    self.assertRaises(TypeError, v6addr.__lt__, v4addr)
    self.assertRaises(TypeError, v6addr.__gt__, v4addr)
    self.assertRaises(TypeError, v6net.__lt__, v4net)
    self.assertRaises(TypeError, v6net.__gt__, v4net)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testIpFromInt(self):
    """Objects built from integers match those built from strings."""
    from_int_v4 = ipaddress.IPv4Interface(16909060)
    self.assertEqual(self.ipv4_interface._ip, from_int_v4._ip)

    ipv4 = ipaddress.ip_network('1.2.3.4')
    ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
    for net in (ipv4, ipv6):
        # Rebuilding a network from its integer address gives it back.
        self.assertEqual(net, ipaddress.ip_network(int(net.network_address)))

    v6_int = 42540616829182469433547762482097946625
    from_int_v6 = ipaddress.IPv6Interface(v6_int)
    self.assertEqual(self.ipv6_interface._ip, from_int_v6._ip)

    v4_version = ipaddress.ip_network(self.ipv4_address._ip).version
    self.assertEqual(v4_version, 4)
    v6_version = ipaddress.ip_network(self.ipv6_address._ip).version
    self.assertEqual(v6_version, 6)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testHash(self):
    """Equal objects hash equally and work as dictionary keys."""
    same_text_cases = (
        (ipaddress.ip_interface, '10.1.1.0/24'),
        (ipaddress.ip_network, '10.1.1.0/24'),
        (ipaddress.ip_address, '10.1.1.0'),
    )
    for factory, text in same_text_cases:
        self.assertEqual(hash(factory(text)), hash(factory(text)))

    # i70
    from_text = ipaddress.ip_address('1.2.3.4')
    from_int = ipaddress.ip_address(int(ipaddress.ip_address('1.2.3.4')._ip))
    self.assertEqual(hash(from_text), hash(from_int))

    ip1 = ipaddress.ip_address('10.1.1.0')
    ip2 = ipaddress.ip_address('1::')
    dummy = {}
    for key in (self.ipv4_address, self.ipv6_address, ip1, ip2):
        dummy[key] = None
    self.assertTrue(self.ipv4_address in dummy)
    self.assertTrue(ip2 in dummy)
项目:time2go    作者:twitchyliquid64    | 项目源码 | 文件源码
def testInetRoundtrip(self):
    """Values sent as query parameters come back unchanged.

    With the ``ipaddress`` module available a network and an address
    object are round-tripped; without it, string values are cast to inet
    on the server side instead.  Only the import itself is guarded, so an
    ImportError raised anywhere else is no longer mistaken for a missing
    ``ipaddress`` module.
    """
    try:
        import ipaddress
    except ImportError:
        for v in ('192.168.100.128/25', '192.168.0.1'):
            self.cursor.execute(
                "select cast(cast(%s as varchar) as inet) as f1", (v,))
            retval = self.cursor.fetchall()
            self.assertEqual(retval[0][0], v)
    else:
        v = ipaddress.ip_network('192.168.0.0/28')
        self.cursor.execute("select %s as f1", (v,))
        retval = self.cursor.fetchall()
        self.assertEqual(retval[0][0], v)

        v = ipaddress.ip_address('192.168.0.1')
        self.cursor.execute("select %s as f1", (v,))
        retval = self.cursor.fetchall()
        self.assertEqual(retval[0][0], v)
项目:netgrph    作者:yantisj    | 项目源码 | 文件源码
def find_cidr(ip):
    """Finds most specific CIDR in Networks

    ``ip`` may be a dotted IPv4 address or a hostname; anything that does
    not look like a dotted quad is resolved through DNS first.  Every
    Network node's cidr containing the address is compared against the
    best match so far with ``compare_cidr``.

    The lookup now also runs when exactly one network is stored (the
    earlier ``len(networks) > 1`` guard skipped that case), and the
    address is parsed once instead of once per record.

    :return: CIDR string, "0.0.0.0/0" when no stored network matches
    :raises Exception: when the hostname lookup fails
    """

    # Check for non-ip, try DNS
    if not re.search(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip):
        try:
            ip = socket.gethostbyname(ip)
        except socket.gaierror:
            raise Exception("Hostname Lookup Failure on: " + ip)

    # Always start with default route
    mostSpecific = "0.0.0.0/0"

    # All networks
    networks = nglib.py2neo_ses.cypher.execute('MATCH (n:Network) RETURN n.cidr as cidr')

    if len(networks) > 0:
        address = ipaddress.ip_address(ip)
        for r in networks.records:
            if address in ipaddress.ip_network(r.cidr):
                if nglib.verbose > 1:
                    print("find_cidr", ip + " in " + r.cidr)
                mostSpecific = compare_cidr(mostSpecific, r.cidr)

    return mostSpecific
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_incompatible_versions(self):
    """Ordering comparisons between IPv4 and IPv6 objects raise TypeError."""
    v4addr = ipaddress.ip_address('1.1.1.1')
    v4net = ipaddress.ip_network('1.1.1.1')
    v6addr = ipaddress.ip_address('::1')
    v6net = ipaddress.ip_network('::1')

    # Each (left, right) pair mixes versions and should always raise
    # TypeError for both "<" and ">".
    mixed_pairs = (
        (v4addr, v6addr),
        (v4net, v6net),
        (v6addr, v4addr),
        (v6net, v4net),
    )
    for left, right in mixed_pairs:
        self.assertRaises(TypeError, left.__lt__, right)
        self.assertRaises(TypeError, left.__gt__, right)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testIpFromInt(self):
    """Integer and string constructors produce equivalent objects."""
    self.assertEqual(
        self.ipv4_interface._ip, ipaddress.IPv4Interface(16909060)._ip)

    ipv4 = ipaddress.ip_network('1.2.3.4')
    ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
    ipv4_again = ipaddress.ip_network(int(ipv4.network_address))
    self.assertEqual(ipv4, ipv4_again)
    ipv6_again = ipaddress.ip_network(int(ipv6.network_address))
    self.assertEqual(ipv6, ipv6_again)

    v6_int = 42540616829182469433547762482097946625
    self.assertEqual(
        self.ipv6_interface._ip, ipaddress.IPv6Interface(v6_int)._ip)

    # The integer form alone is enough to pick the right IP version.
    expectations = (
        (self.ipv4_address, 4),
        (self.ipv6_address, 6),
    )
    for address, version in expectations:
        self.assertEqual(ipaddress.ip_network(address._ip).version, version)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testHash(self):
    """Equal objects hash equally and can be used as dictionary keys."""
    iface_a = ipaddress.ip_interface('10.1.1.0/24')
    iface_b = ipaddress.ip_interface('10.1.1.0/24')
    self.assertEqual(hash(iface_a), hash(iface_b))

    net_a = ipaddress.ip_network('10.1.1.0/24')
    net_b = ipaddress.ip_network('10.1.1.0/24')
    self.assertEqual(hash(net_a), hash(net_b))

    addr_a = ipaddress.ip_address('10.1.1.0')
    addr_b = ipaddress.ip_address('10.1.1.0')
    self.assertEqual(hash(addr_a), hash(addr_b))

    # i70
    from_text = ipaddress.ip_address('1.2.3.4')
    from_int = ipaddress.ip_address(int(ipaddress.ip_address('1.2.3.4')._ip))
    self.assertEqual(hash(from_text), hash(from_int))

    ip1 = ipaddress.ip_address('10.1.1.0')
    ip2 = ipaddress.ip_address('1::')
    dummy = {}
    for key in (self.ipv4_address, self.ipv6_address, ip1, ip2):
        dummy[key] = None
    self.assertIn(self.ipv4_address, dummy)
    self.assertIn(ip2, dummy)
项目:routing    作者:opennetworkinglab    | 项目源码 | 文件源码
def __init__(self, onosIps, num=1, numBgpSpeakers=1, asNum=65000, externalOnos=True,
             peerIntfConfig=None, withFpm=False):
    """Set up the autonomous system and its iBGP neighbour lists.

    Every router in self.routers gets one neighbour entry per ONOS IP
    (port 2000) and one entry per other router, addressed out of the
    internal peering subnet 1.1.1.0/24.
    """
    super(SdnAutonomousSystem, self).__init__(asNum, numBgpSpeakers)
    self.onosIps = onosIps
    self.num = num
    self.numBgpSpeakers = numBgpSpeakers
    self.peerIntfConfig = peerIntfConfig
    self.withFpm = withFpm
    self.externalOnos = externalOnos
    self.internalPeeringSubnet = ip_network(u'1.1.1.0/24')

    # Offset into the peering subnet used for this AS's speakers.
    cpIpBase = self.num * 10

    for router in self.routers.values():
        # Add iBGP sessions to ONOS nodes
        for onosIp in onosIps:
            router.neighbors.append(
                {'address': onosIp, 'as': asNum, 'port': 2000})

        # Add iBGP sessions to other BGP speakers
        for i, peer in self.routers.items():
            if router == peer:
                continue
            ip = AutonomousSystem.getIthAddress(
                self.internalPeeringSubnet, cpIpBase + i)
            router.neighbors.append({'address': ip.ip, 'as': asNum})
项目:routing    作者:opennetworkinglab    | 项目源码 | 文件源码
def generateRoutes(baseRange, numRoutes, subnetSize=None):
    """Split ``baseRange`` into subnets to be used as routes.

    When ``subnetSize`` is given, all subnets of that prefix length are
    returned and ``numRoutes`` is ignored.  Otherwise the shortest prefix
    length (longer than the base prefix) that yields at least ``numRoutes``
    subnets is chosen and the first ``numRoutes`` of them are returned.

    The number of subnets for a candidate prefix is computed
    arithmetically rather than by building the whole subnet list for
    every candidate.

    :param baseRange: network to split, in any form ip_network() accepts
    :param numRoutes: number of routes wanted
    :param subnetSize: optional fixed prefix length for the subnets
    :return: list of ipaddress network objects
    :raises Exception: when the base network cannot provide enough subnets
    """
    baseNetwork = ip_network(baseRange)

    # We need to get at least 2 addresses out of each subnet, so the biggest
    # prefix length we can have is /30
    maxPrefixLength = baseNetwork.max_prefixlen - 2

    if subnetSize is not None:
        return list(baseNetwork.subnets(new_prefix=subnetSize))

    basePrefixLength = baseNetwork.prefixlen
    trySubnetSize = basePrefixLength + 1
    # Each extra prefix bit doubles the number of subnets available.
    while (trySubnetSize <= maxPrefixLength and
           2 ** (trySubnetSize - basePrefixLength) < numRoutes):
        trySubnetSize += 1

    if trySubnetSize > maxPrefixLength:
        raise Exception("Can't get enough routes from input parameters")

    return list(baseNetwork.subnets(new_prefix=trySubnetSize))[:numRoutes]
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def __init__(self, file_path):
    """Build the IPv4 and IPv6 lookup tables from a route file.

    Both tables are first seeded with the excluded prefixes, each mapped
    to an empty NextHop.  The file is then read line by line: comment and
    blank lines are skipped, and every other line is split at the first
    space into a prefix and a next hop description.

    The IP version is compared with ``==``; the earlier ``is 4`` /
    ``is 6`` identity checks only worked by interpreter accident.

    :param file_path: path of the route file
    """
    self._ipv4_lpm_dict = LpmDict()
    for ip in EXCLUDE_IPV4_PREFIXES:
        self._ipv4_lpm_dict[ip] = self.NextHop()

    self._ipv6_lpm_dict = LpmDict(ipv4=False)
    for ip in EXCLUDE_IPV6_PREFIXES:
        self._ipv6_lpm_dict[ip] = self.NextHop()

    # filter out empty lines and lines starting with '#'
    pattern = re.compile("^#.*$|^[ \t]*$")

    with open(file_path, 'r') as f:
        # Iterate the file directly instead of loading every line first.
        for line in f:
            if pattern.match(line):
                continue
            entry = line.split(' ', 1)
            prefix = ip_network(unicode(entry[0]))
            next_hop = self.NextHop(entry[1])
            if prefix.version == 4:
                self._ipv4_lpm_dict[str(prefix)] = next_hop
            elif prefix.version == 6:
                self._ipv6_lpm_dict[str(prefix)] = next_hop
项目:bgpfu    作者:bgpfu    | 项目源码 | 文件源码
def parse_prefix_range(expr=None):
    """Parse a prefix expression with an optional ``^ge-le`` range suffix.

    :param expr: text such as ``'192.0.2.0/24'`` or ``'192.0.2.0/24^25-26'``
    :return: dict mapping ``'ipv4'`` or ``'ipv6'`` to a one-element list
        holding the parsed entry ("prefix" plus, when a range was given,
        "greater-equal" and "less-equal")
    :raises ValueError: when the expression does not match the pattern
    """
    pattern = r'^(?P<prefix>((\d+(\.\d+){3})|([0-9a-fA-F:]{2,40}))/\d+)' \
              r'(\^(?P<ge>\d+)-(?P<le>\d+))?$'
    match = re.match(pattern, expr)
    if not match:
        raise ValueError("no match found")

    prefix = ipaddress.ip_network(unicode(match.group("prefix")))
    entry = {"prefix": prefix}
    try:
        # Without a range suffix the groups are None and int() raises.
        entry["greater-equal"] = int(match.group("ge"))
        entry["less-equal"] = int(match.group("le"))
    except (IndexError, TypeError):
        pass

    afi = "ipv%d" % prefix.version
    return {afi: [entry]}
项目:netmon    作者:bullerian    | 项目源码 | 文件源码
def _ipnet(net_str):
    """
    argparse ``type`` callable validating a network given on the command
    line.  Only IPv4 networks in the form A.B.C.D/prefix are accepted.

    :param net_str: string that represents IPv4 net. Example A.B.C.D/prefix.
    :return: ipaddress.ip_network object
    :raises argparse.ArgumentTypeError: when the string is not a network
        or the network is not IPv4
    """
    try:
        the_net = ipaddress.ip_network(net_str)
    except ValueError:
        raise argparse.ArgumentTypeError(
            '{} is not a valid IP network address'.format(net_str))

    if the_net.version == Utilities.IPV4:
        return the_net

    raise argparse.ArgumentTypeError('Only IPv4 addresses are supported')
项目:netmon    作者:bullerian    | 项目源码 | 文件源码
def test_arp_ping(self):
    """test ARP ping - compare to the arping utility

    For the first five hosts of TEST_NETWORK the exit status of the
    external ``arping`` command is compared with the status reported by
    Ping.ping_host().  A host for which PermissionException is raised is
    skipped; previously the assertions still ran and used an unbound or
    stale result.
    """

    arp_ping = Ping(IFACE, ARP_NAME, TIMEOUT, False)

    for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]:
        try:
            # need arping installed
            with os.popen('arping -c {} -t {} {}'.format(COUNT,
                                                         TIMEOUT,
                                                         str(ip)), 'r'):
                # get exit code
                ec = os.wait()[1] & 0xFF00
            res = arp_ping.ping_host(str(ip))
        except PermissionException:
            print('Need root previlegies')
            # Nothing to compare for this host.
            continue

        if res[STATUS_INDEX] == ONLINE:
            self.assertEqual(ec, 0)
        else:
            self.assertNotEqual(ec, 0)
项目:netmon    作者:bullerian    | 项目源码 | 文件源码
def test_icmp_ping(self):
    """test icmp ping - compare to the system ping utility

    For the first five hosts of TEST_NETWORK the exit status of the
    external ``ping`` command is compared with the status reported by
    Ping.ping_host().  A host for which PermissionException is raised is
    skipped; previously the assertions still ran and used an unbound or
    stale result.
    """

    icmp_ping = Ping(IFACE, ICMP_NAME, TIMEOUT, False)

    for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]:
        try:
            # need ping installed
            with os.popen('ping -c {} -t {} {}'.format(COUNT,
                                                       TIMEOUT,
                                                       str(ip)), 'r'):
                # get exit code
                ec = os.wait()[1] & 0xFF00
            res = icmp_ping.ping_host(str(ip))
        except PermissionException:
            print('Need root previlegies')
            # Nothing to compare for this host.
            continue

        if res[STATUS_INDEX] == ONLINE:
            self.assertEqual(ec, 0)
        else:
            self.assertNotEqual(ec, 0)
项目:serveradmin    作者:innogames    | 项目源码 | 文件源码
def edit(request):
    """Open the edit view for an existing server or a newly built one.

    With an 'object_id' GET parameter the server is looked up through
    Query.  Otherwise a new ServerObject is assembled from the POSTed
    servertype, project, hostname and internal IP; the IP is None, a
    network or an interface depending on the servertype's ip_addr_type.
    """
    if 'object_id' in request.GET:
        server = Query({'object_id': request.GET['object_id']}).get()
        return _edit(request, server, True)

    servertype = Servertype.objects.get(pk=request.POST['attr_servertype'])
    project = Project.objects.get(pk=request.POST['attr_project'])
    hostname = request.POST['attr_hostname']

    addr_type = servertype.ip_addr_type
    if addr_type == 'null':
        intern_ip = None
    elif addr_type == 'network':
        intern_ip = ip_network(request.POST['attr_intern_ip'])
    else:
        intern_ip = ip_interface(request.POST['attr_intern_ip'])

    server = ServerObject.new(servertype, project, hostname, intern_ip)
    return _edit(request, server, True)
项目:oecluster    作者:OECFHTW    | 项目源码 | 文件源码
def scan_network(self):
    """Scan the configured network and collect the hosts that were found.

    The network and netmask come from the "Networking" configuration
    section.  Every address of that network is passed to
    self._generate_host(); addresses for which it returns None or raises
    socket.herror are left out.

    The UDP socket that used to be created here was never used or closed
    and has been removed; the configuration section is fetched once.

    :return: dict mapping each address to its generated host
    """
    logger.info('scanning Network')
    networking = self._config_reader.get_config_section("Networking")
    network = networking['network']
    netmask = networking['netmask']
    my_net = ipaddress.ip_network(network + '/' + netmask)

    host_list = dict()
    for ip in my_net:
        try:
            host = self._generate_host(ip)
        except socket.herror:
            # Best effort: skip addresses whose lookup fails.
            continue
        if host is not None:
            host_list[ip] = host
    return host_list
项目:asyncpg    作者:MagicStack    | 项目源码 | 文件源码
def setUp(self):
    """Reset the HBA rules, allow ssl_user over hostssl and create it."""
    super().setUp()

    self.cluster.reset_hba()

    # One 'hostssl' trust entry per loopback network (IPv4, then IPv6).
    for loopback in ('127.0.0.0/24', '::1/128'):
        self.cluster.add_hba_entry(
            type='hostssl', address=ipaddress.ip_network(loopback),
            database='postgres', user='ssl_user',
            auth_method='trust')

    # Put hba changes into effect
    self.cluster.reload()

    statements = ['CREATE ROLE ssl_user WITH LOGIN;']
    create_script = '\n'.join(statements)
    self.loop.run_until_complete(self.con.execute(create_script))
项目:djangolg    作者:wolcomm    | 项目源码 | 文件源码
def __init__(self, value):
        """Initialise new IPPrefix instance.

        *value* is first parsed as a single address; if that fails it is
        parsed as a network with ``strict=False``. Any exception raised by
        that second attempt propagates to the caller.

        Sets ``prefix`` (``None`` for an address, the network address for a
        network), ``type`` (``self.HOST`` or ``self.PREFIX``), ``version``
        and ``txt`` (the compressed textual form).
        """
        try:
            parsed = ipaddress.ip_address(value)
        except Exception:
            # Not a bare address: fall back to network parsing.
            parsed = ipaddress.ip_network(value, strict=False)

        address_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        if isinstance(parsed, address_types):
            self.prefix = None
            self.type = self.HOST
        elif isinstance(parsed, network_types):
            self.prefix = parsed.network_address
            self.type = self.PREFIX
        self.version = parsed.version
        self.txt = parsed.compressed
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_incompatible_versions(self):
        """Ordering comparisons across IP versions must raise TypeError.

        Addresses are compared with addresses and networks with networks, in
        both directions (v4 against v6 and v6 against v4).
        """
        # These should always raise TypeError
        v4addr = ipaddress.ip_address('1.1.1.1')
        v4net = ipaddress.ip_network('1.1.1.1')
        v6addr = ipaddress.ip_address('::1')
        # Must be a network, not an address: the net-vs-net assertions below
        # are meant to compare two network objects of different versions.
        v6net = ipaddress.ip_network('::1')

        self.assertRaises(TypeError, v4addr.__lt__, v6addr)
        self.assertRaises(TypeError, v4addr.__gt__, v6addr)
        self.assertRaises(TypeError, v4net.__lt__, v6net)
        self.assertRaises(TypeError, v4net.__gt__, v6net)

        self.assertRaises(TypeError, v6addr.__lt__, v4addr)
        self.assertRaises(TypeError, v6addr.__gt__, v4addr)
        self.assertRaises(TypeError, v6net.__lt__, v4net)
        self.assertRaises(TypeError, v6net.__gt__, v4net)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testIpFromInt(self):
        """Check objects built from plain integers.

        Interfaces constructed from integers are compared with the test
        fixtures, networks parsed from text are rebuilt from the integer
        value of their network address, and networks built from the fixture
        addresses' integer values must report the right IP version.
        """
        # 16909060 == (1 << 24) + (2 << 16) + (3 << 8) + 4
        v4_from_int = ipaddress.IPv4Interface(16909060)
        self.assertEqual(self.ipv4_interface._ip, v4_from_int._ip)

        parsed = [ipaddress.ip_network(text)
                  for text in ('1.2.3.4', '2001:658:22a:cafe:200:0:0:1')]
        for net in parsed:
            rebuilt = ipaddress.ip_network(int(net.network_address))
            self.assertEqual(net, rebuilt)

        v6_int = 42540616829182469433547762482097946625
        v6_from_int = ipaddress.IPv6Interface(v6_int)
        self.assertEqual(self.ipv6_interface._ip, v6_from_int._ip)

        expectations = ((self.ipv4_address, 4), (self.ipv6_address, 6))
        for fixture, version in expectations:
            self.assertEqual(ipaddress.ip_network(fixture._ip).version,
                             version)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testHash(self):
        """Equal objects must hash equally and addresses must work as dict keys."""
        same_text_cases = (
            (ipaddress.ip_interface, '10.1.1.0/24'),
            (ipaddress.ip_network, '10.1.1.0/24'),
            (ipaddress.ip_address, '10.1.1.0'),
        )
        for build, text in same_text_cases:
            self.assertEqual(hash(build(text)), hash(build(text)))

        # i70
        from_text = ipaddress.ip_address('1.2.3.4')
        from_int = ipaddress.ip_address(
            int(ipaddress.ip_address('1.2.3.4')._ip))
        self.assertEqual(hash(from_text), hash(from_int))

        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        # Use the addresses as dictionary keys, then look two of them up.
        dummy = dict.fromkeys(
            [self.ipv4_address, self.ipv6_address, ip1, ip2])
        self.assertIn(self.ipv4_address, dummy)
        self.assertIn(ip2, dummy)
项目:Sigyn    作者:freenode    | 项目源码 | 文件源码
def _ip_ranges(self, h):
        """Return the list of network ranges to consider for host string *h*.

        * ``gateway/web/freenode/ip.<addr>``: the prefix is stripped and the
          /27, /26, /25 and /24 networks containing the address are returned.
        * any other value containing ``/``: returned unchanged as ``[h]``.
        * IPv4 address: the /27, /26, /25 and /24 networks containing it.
        * IPv6 address: the /120, /118, /116, /114, /112, /110 and /64
          networks containing it.
        * anything else: ``[h]``.

        Networks are rendered with ``with_prefixlen`` and UTF-8 encoded.
        """
        def enclosing(host, prefix_lengths):
            # One encoded "network/prefix" string per requested prefix length.
            return [
                ipaddress.ip_network(
                    u'%s/%d' % (host, bits), strict=False
                ).with_prefixlen.encode('utf-8')
                for bits in prefix_lengths
            ]

        v4_lengths = (27, 26, 25, 24)
        v6_lengths = (120, 118, 116, 114, 112, 110, 64)

        if '/' in h:
            if not h.startswith('gateway/web/freenode/ip.'):
                return [h]
            h = h.replace('gateway/web/freenode/ip.', '')
            return enclosing(h, v4_lengths)
        if utils.net.isIPV4(h):
            return enclosing(h, v4_lengths)
        if utils.net.bruteIsIPV6(h):
            return enclosing(h, v6_lengths)
        return [h]
项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def is_authorized(ip, authorized_ips):
    """Return whether *ip* matches any entry of *authorized_ips*.

    Each entry is first interpreted as a single address and compared for
    equality; if it does not parse as an address it is interpreted as a
    network and tested for membership. Entries that are neither are logged
    as a warning and skipped.

    Args:
        ip: the address to check, in any form accepted by ``ip_address``.
        authorized_ips: iterable of address or network specifications.

    Returns:
        ``True`` on the first matching entry, ``False`` if none matches.

    Raises:
        ValueError: if *ip* itself is not a valid address.
    """
    ip = ip_address(ip)

    for item in authorized_ips:
        try:
            if ip == ip_address(item):
                return True
        except ValueError:
            # Not a single address; try it as a network instead.
            try:
                if ip in ip_network(item):
                    return True
            except ValueError:
                logger.warning(
                    'The "authorized_ip" list (settings.HEARTBEAT) '
                    'contains an item that is neither an ip address '
                    'nor an ip network: %s', item)
    return False
项目:Home-Assistant    作者:jmart518    | 项目源码 | 文件源码
def check_access(clientip):
    """Decide whether *clientip* may access the service.

    A client already present in ``BANNED_IPS`` is refused. When
    ``ALLOWED_NETWORKS`` is empty every other client is accepted. Otherwise
    the client is accepted only if its address lies in one of the allowed
    networks; a client that matches none is appended to ``BANNED_IPS`` and
    refused.

    Returns:
        ``True`` if access is granted, ``False`` otherwise.
    """
    global BANNED_IPS
    if clientip in BANNED_IPS:
        return False
    if not ALLOWED_NETWORKS:
        return True

    candidate = ipaddress.ip_address(clientip)
    if any(candidate in ipaddress.ip_network(allowed)
           for allowed in ALLOWED_NETWORKS):
        return True

    BANNED_IPS.append(clientip)
    return False
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def public_interface(self):
        """
        Check that all minions have an address on the public network

        Nodes whose roles include 'master' are skipped.  For every other
        node, each address in its 'ipv4' grain is tested against each
        network listed in its "public_network" setting.  A node with no
        matching address gets a message appended to
        self.errors['public_interface'].
        """
        for node in self.data.keys():
            if ('roles' in self.data[node] and
                    'master' in self.data[node]['roles']):
                continue
            found = False
            public_network = self.data[node].get("public_network", "")
            net_list = Util.parse_list_from_string(public_network)
            for address in self.grains[node]['ipv4']:
                try:
                    addr = ipaddress.ip_address(u'{}'.format(address))
                    for network in net_list:
                        net = ipaddress.ip_network(u'{}'.format(network))
                        if addr in net:
                            found = True
                            break
                except ValueError:
                    # Don't care about reporting a ValueError here if
                    # public_network is malformed, because the
                    # previous validation in public_network() will do that.
                    pass
                if found:
                    # One matching address is enough for this node.
                    break
            if not found:
                msg = "minion {} missing address on public network {}".format(node, public_network)
                self.errors.setdefault('public_interface', []).append(msg)

        # Report status under the same key the errors above are stored under.
        self._set_pass_status('public_interface')
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def cluster_interface(self):
        """
        Check that storage nodes have an interface on the cluster network

        Only nodes whose roles include 'storage' are examined.  Each address
        in the node's 'ipv4' grain is tested against each network from its
        "cluster_network" setting; a node without any match gets a message
        appended to self.errors['cluster_interface'].
        """
        # pylint: disable=too-many-nested-blocks
        for node in self.data.keys():
            node_data = self.data[node]
            if 'roles' not in node_data or 'storage' not in node_data['roles']:
                continue

            cluster_network = node_data.get("cluster_network", "")
            net_list = Util.parse_list_from_string(cluster_network)
            found = False
            for address in self.grains[node]['ipv4']:
                try:
                    for network in net_list:
                        candidate = ipaddress.ip_address(u'{}'.format(address))
                        target = ipaddress.ip_network(u'{}'.format(network))
                        if candidate in target:
                            found = True
                except ValueError:
                    # A malformed cluster_network is not reported here;
                    # the validation in cluster_network() covers that.
                    pass
            if found:
                continue

            msg = ("minion {}".format(node) +
                   " missing address on cluster network {}".format(cluster_network))
            self.errors.setdefault('cluster_interface', []).append(msg)

        self._set_pass_status('cluster_interface')
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def address():
    """
    Find the public address for a minion

    Reads 'public_network' from the pillar (either a comma separated string
    or an already split sequence) and returns the first 'inet' address of
    any interface that lies inside one of those networks.  Returns an empty
    string when the pillar has no 'public_network' or nothing matches.
    """
    if 'public_network' not in __pillar__:
        return ""

    configured = __pillar__['public_network']
    log.debug("pillar: {}".format(type(configured)))
    networks = (re.split(', *', configured)
                if isinstance(configured, str) else configured)

    log.debug("networks: {}".format(pprint.pformat(networks)))
    for public_network in networks:
        log.info('public_network: {}'.format(public_network))
        interfaces = __salt__['network.interfaces']()

        log.debug("interfaces: {}".format(pprint.pformat(interfaces)))
        for interface in interfaces:
            log.info("interface: {}".format(pprint.pformat(interface)))
            if 'inet' not in interfaces[interface]:
                continue
            for entry in interfaces[interface]['inet']:
                _address = entry['address']
                log.info("Checking address {}".format(_address))
                candidate = ipaddress.ip_address(u'{}'.format(_address))
                target = ipaddress.ip_network(u'{}'.format(public_network))
                if candidate in target:
                    return _address
    return ""
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def validate_ip_network(network, strict=True):
    '''
    If network is a valid IP network when parsed according to strict,
    return it as an ipnetwork object.
    Otherwise, return None.

    network is converted to text before parsing. strict is passed straight
    through to ipaddress.ip_network().
    '''
    try:
        # Python 2: convert to unicode text before handing it to ipaddress.
        text_type = unicode
    except NameError:
        # Python 3: str is already unicode text.
        text_type = str
    try:
        return ipaddress.ip_network(text_type(network), strict=strict)
    except ValueError:
        return None
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def get_networks():
    """Return the networks listed in the ``TALISKER_NETWORKS`` variable.

    The variable is split on whitespace and every token is parsed with
    ``ip_network``. When at least one network is configured, the list is
    also logged at info level. An unset or blank variable yields ``[]``.
    """
    raw_value = os.environ.get('TALISKER_NETWORKS', '')
    networks = []
    for token in raw_value.split():
        networks.append(ip_network(force_unicode(token)))

    if not networks:
        return networks

    logger = logging.getLogger(__name__)
    rendered = [str(n) for n in networks]
    logger.info('configured TALISKER_NETWORKS', extra={'networks': rendered})
    return networks
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def cast_network(s, cur=None):
    """Convert the value *s* to an ``ipaddress`` network object.

    ``None`` is passed through unchanged. Any other value is converted with
    ``str()`` and parsed by ``ipaddress.ip_network``. *cur* is accepted but
    not used.
    """
    return None if s is None else ipaddress.ip_network(str(s))