Python netaddr 模块,EUI 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netaddr.EUI

项目:myautotest    作者:auuppp    | 项目源码 | 文件源码
def get_ipv6_addr_by_EUI64(cidr, mac):
    """Generate a IPv6 addr by EUI-64 with CIDR and MAC

    :param str cidr: a IPv6 CIDR
    :param str mac: a MAC address
    :return: an IPv6 Address
    :rtype: netaddr.IPAddress
    """
    # Check if the prefix is IPv4 address
    is_ipv4 = netaddr.valid_ipv4(cidr)
    if is_ipv4:
        msg = "Unable to generate IP address by EUI64 for IPv4 prefix"
        raise TypeError(msg)
    try:
        eui64 = int(netaddr.EUI(mac).eui64())
        prefix = netaddr.IPNetwork(cidr)
        return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
    except (ValueError, netaddr.AddrFormatError):
        raise TypeError('Bad prefix or mac format for generating IPv6 '
                        'address by EUI-64: %(prefix)s, %(mac)s:'
                        % {'prefix': cidr, 'mac': mac})
    except TypeError:
        raise TypeError('Bad prefix type for generate IPv6 address by '
                        'EUI-64: %s' % cidr)
项目:platform-install    作者:opencord    | 项目源码 | 文件源码
def genmac(value, prefix='', length=12):
    '''
    deterministically generates a "random" MAC with a configurable prefix
    '''

    # from: http://serverfault.com/questions/40712/what-range-of-mac-addresses-can-i-safely-use-for-my-virtual-machines
    if prefix == '' :
        mac_prefix = "0ac04d" # random "cord"-esque

    # deterministically generate a value
    h = hashlib.new('sha1')
    h.update(value)

    # build/trim MAC
    mac_string = (mac_prefix + h.hexdigest())[0:length]

    return netaddr.EUI(mac_string)
项目:WMIControl    作者:crutchcorn    | 项目源码 | 文件源码
def sendWoL(mac, broadcast=findBroadcast()):
    """Given string mac and broadcast: Turn on computer using WoL
    This was taken from http://code.activestate.com/recipes/358449-wake-on-lan/. Thanks, Fadly!
    """
    # Cleans and removes delimiters from MAC
    try:
        mac = format_mac(EUI(mac), mac_bare)
    except AddrFormatError:
        raise ValueError('Incorrect MAC address format')

    # Pad the synchronization stream.
    data = ''.join(['FFFFFFFFFFFF', mac * 20])
    send_data = b''

    # Split up the hex values and pack.
    for i in range(0, len(data), 2):
        send_data = b''.join([send_data, pack('B', int(data[i: i + 2], 16))])

    # Broadcast it to the LAN.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.sendto(send_data, (broadcast, 7))
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _to_mac_range(val):
    cidr_parts = val.split("/")
    prefix = cidr_parts[0]

    # FIXME(anyone): replace is slow, but this doesn't really
    #               get called ever. Fix maybe?
    prefix = prefix.replace(':', '')
    prefix = prefix.replace('-', '')
    prefix_length = len(prefix)
    if prefix_length < 6 or prefix_length > 12:
        raise q_exc.InvalidMacAddressRange(cidr=val)

    diff = 12 - len(prefix)
    if len(cidr_parts) > 1:
        mask = int(cidr_parts[1])
    else:
        mask = 48 - diff * 4
    mask_size = 1 << (48 - mask)
    prefix = "%s%s" % (prefix, "0" * diff)
    try:
        cidr = "%s/%s" % (str(netaddr.EUI(prefix)).replace("-", ":"), mask)
    except netaddr.AddrFormatError:
        raise q_exc.InvalidMacAddressRange(cidr=val)
    prefix_int = int(prefix, base=16)
    return cidr, prefix_int, prefix_int + mask_size
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_allocate_v6_with_mac_fails_policy_raises(self):
        cidr = netaddr.IPNetwork("fe80::dead:beef/64")
        allocation_pool = [{"start": cidr[-4], "end": cidr[-2]}]
        subnet = dict(allocation_pools=allocation_pool,
                      cidr="fe80::dead:beef/64", ip_version=6,
                      next_auto_assign_ip=0, tenant_id="fake")
        subnet = {"subnet": subnet}

        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}

        ip_policy = {"exclude": ["fe80::dead:beef/64"]}
        ip_policy = {"ip_policy": ip_policy}

        mac = models.MacAddress()
        mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")

        old_override = cfg.CONF.QUARK.v6_allocation_attempts
        cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')

        with self._stubs(network, subnet, ip_policy) as (net, sub, ipp):
            with self.assertRaises(n_exc.IpAddressGenerationFailure):
                self.ipam.allocate_ip_address(self.context, [], net["id"], 0,
                                              0, subnets=[sub["id"]])
        cfg.CONF.set_override('v6_allocation_attempts', old_override, 'QUARK')
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_update_port_with_security_groups(self, redis_cli):
        mock_client = mock.MagicMock()
        redis_cli.return_value = mock_client

        port_id = str(uuid.uuid4())
        device_id = str(uuid.uuid4())
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
        security_groups = [str(uuid.uuid4())]
        payload = {}
        mock_client.serialize_groups.return_value = payload
        self.driver.update_port(
            context=self.context, network_id="public_network", port_id=port_id,
            device_id=device_id, mac_address=mac_address,
            security_groups=security_groups)
        mock_client.serialize_groups.assert_called_once_with(security_groups)
        mock_client.apply_rules.assert_called_once_with(
            device_id, mac_address, payload)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_delete_port_redis_is_dead(self, sg_cli):
        device_id = str(uuid.uuid4())
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
        mock_client = mock.MagicMock()
        sg_cli.return_value = mock_client
        mock_client.delete_vif.side_effect = Exception

        try:
            self.driver.delete_port(context=self.context, port_id=2,
                                    mac_address=mac_address,
                                    device_id=device_id)
            mock_client.delete_vif.assert_called_once_with(
                device_id, mac_address)
        except Exception:
            # This test fails without the exception handling in
            # _delete_port_security_groups
            self.fail("This shouldn't have raised")
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_apply_rules(self, strict_redis, uuid4):
        client = sg_client.SecurityGroupsClient()
        device_id = "device"
        uuid4.return_value = "uuid"

        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        client.apply_rules(device_id, mac_address.value, [])
        self.assertTrue(client._client.master.hset.called)

        redis_key = client.vif_key(device_id, mac_address.value)

        rule_dict = {"rules": []}

        client._client.master.hset.assert_any_call(
            redis_key, sg_client.SECURITY_GROUP_HASH_ATTR,
            json.dumps(rule_dict))

        client._client.master.hset.assert_any_call(
            redis_key, sg_client.SECURITY_GROUP_ACK, False)
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def __init__(self,  id=0, name=None, mac_address=None, ip_address=None,
                        connected_ssid=None, inactivity_time=None,
                        rx_packets=None, tx_packets=None,
                        rx_bitrate=None, tx_bitrate=None,
                        signal=None):

        self.id = id
        self.name = name
        self.mac_address = mac_address
        self.ip_address = ip_address
        self.connected_ssid = connected_ssid
        self.inactivity_time = inactivity_time
        self.rx_packets = rx_packets
        self.tx_packets = tx_packets
        self.tx_bitrate = tx_bitrate
        self.rx_bitrate = rx_bitrate
        self.signal = signal
        self.vendor = None
        try:
            self.vendor = EUI(mac_address).oui.registration().org      # OUI - Organizational Unique Identifier
        except Exception:
            pass
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def to_global(prefix, mac, project_id):
    project_hash = netaddr.IPAddress(
                   int(hashlib.sha1(project_id).hexdigest()[:8], 16) << 32)
    static_num = netaddr.IPAddress(0xff << 24)

    try:
        mac_suffix = netaddr.EUI(mac).words[3:]
        int_addr = int(''.join(['%02x' % i for i in mac_suffix]), 16)
        mac_addr = netaddr.IPAddress(int_addr)
        maskIP = netaddr.IPNetwork(prefix).ip
        return (project_hash ^ static_num ^ mac_addr | maskIP).format()
    except netaddr.AddrFormatError:
        raise TypeError(_('Bad mac for to_global_ipv6: %s') % mac)
    except TypeError:
        raise TypeError(_('Bad prefix for to_global_ipv6: %s') % prefix)
    except NameError:
        raise TypeError(_('Bad project_id for to_global_ipv6: %s') %
                        project_id)
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_serialization_with_eui_in_different_dialect(self, model):
        field = fields.MACAddressField()
        model.mac = netaddr.EUI('78-F8-82-B2-E5-5A', dialect=netaddr.mac_eui48)
        serialized = field.serialize('mac', model)
        assert serialized == '78:f8:82:b2:e5:5a'
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_deserialization(self):
        field = fields.MACAddressField()
        value = '78:f8:82:b2:e5:5a'
        deserialized = field.deserialize(value)
        assert deserialized == '78:f8:82:b2:e5:5a'
        assert repr(deserialized) == 'EUI(\'78:f8:82:b2:e5:5a\')'
        assert deserialized.dialect == netaddr.mac_unix_expanded
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def _to_python(self, value):
        eui = netaddr.EUI(value)
        eui.dialect = self.default_dialect
        return eui
项目:napalm-base    作者:napalm-automation    | 项目源码 | 文件源码
def mac(raw):
    """
    Converts a raw string to a standardised MAC Address EUI Format.

    :param raw: the raw string containing the value of the MAC Address
    :return: a string with the MAC Address in EUI format

    Example:

    .. code-block:: python

        >>> mac('0123.4567.89ab')
        u'01:23:45:67:89:AB'

    Some vendors like Cisco return MAC addresses like a9:c5:2e:7b:6: which is not entirely valid
    (with respect to EUI48 or EUI64 standards). Therefore we need to stuff with trailing zeros

    Example
    >>> mac('a9:c5:2e:7b:6:')
    u'A9:C5:2E:7B:60:00'

    If Cisco or other obscure vendors use their own standards, will throw an error and we can fix
    later, however, still works with weird formats like:

    >>> mac('123.4567.89ab')
    u'01:23:45:67:89:AB'
    >>> mac('23.4567.89ab')
    u'00:23:45:67:89:AB'
    """
    if raw.endswith(':'):
        flat_raw = raw.replace(':', '')
        raw = '{flat_raw}{zeros_stuffed}'.format(
            flat_raw=flat_raw,
            zeros_stuffed='0'*(12-len(flat_raw))
        )
    return py23_compat.text_type(EUI(raw, dialect=_MACFormat))
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def get_ipv6_addr_by_EUI64(prefix, mac):
    """Calculate IPv6 address using EUI-64 specification.

    This method calculates the IPv6 address using the EUI-64
    addressing scheme as explained in rfc2373.

    :param prefix: IPv6 prefix.
    :param mac: IEEE 802 48-bit MAC address.
    :returns: IPv6 address on success.
    :raises ValueError, TypeError: For any invalid input.

    .. versionadded:: 1.4
    """
    # Check if the prefix is an IPv4 address
    if is_valid_ipv4(prefix):
        msg = _("Unable to generate IP address by EUI64 for IPv4 prefix")
        raise ValueError(msg)
    try:
        eui64 = int(netaddr.EUI(mac).eui64())
        prefix = netaddr.IPNetwork(prefix)
        return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
    except (ValueError, netaddr.AddrFormatError):
        raise ValueError(_('Bad prefix or mac format for generating IPv6 '
                           'address by EUI-64: %(prefix)s, %(mac)s:')
                         % {'prefix': prefix, 'mac': mac})
    except TypeError:
        raise TypeError(_('Bad prefix type for generating IPv6 address by '
                          'EUI-64: %s') % prefix)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def slaac(value, query = ''):
    ''' Get the SLAAC address within given network '''
    try:
        vtype = ipaddr(value, 'type')
        if vtype == 'address':
            v = ipaddr(value, 'cidr')
        elif vtype == 'network':
            v = ipaddr(value, 'subnet')

        if ipaddr(value, 'version') != 6:
            return False

        value = netaddr.IPNetwork(v)
    except:
        return False

    if not query:
        return False

    try:
        mac = hwaddr(query, alias = 'slaac')

        eui = netaddr.EUI(mac)
    except:
        return False

    return eui.ipv6(value.network)


# ---- HWaddr / MAC address filters ----
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def hwaddr(value, query = '', alias = 'hwaddr'):
    ''' Check if string is a HW/MAC address and filter it '''

    query_func_extra_args = {
            '': ('value',),
            }
    query_func_map = {
            '': _empty_hwaddr_query,
            'bare': _bare_query,
            'bool': _bool_hwaddr_query,
            'int': _int_hwaddr_query,
            'cisco': _cisco_query,
            'eui48': _win_query,
            'linux': _linux_query,
            'pgsql': _postgresql_query,
            'postgresql': _postgresql_query,
            'psql': _postgresql_query,
            'unix': _unix_query,
            'win': _win_query,
            }

    try:
        v = netaddr.EUI(value)
    except:
        if query and query != 'bool':
            raise errors.AnsibleFilterError(alias + ': not a hardware address: %s' % value)

    extras = []
    for arg in query_func_extra_args.get(query, tuple()):
        extras.append(locals()[arg])
    try:
        return query_func_map[query](v, *extras)
    except KeyError:
        raise errors.AnsibleFilterError(alias + ': unknown filter type: %s' % query)

    return False
项目:taf    作者:taf3    | 项目源码 | 文件源码
def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None):
        """Prepare ports, packets for table 9 related tests.

        """
        # Configure ARP packet and stream
        if arp_packet:
            srcmac = arp_packet[0]['Ether']['src']
            arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + offset, dialect=mac_unix_expanded))
            srcip = arp_packet[1]['ARP']['psrc']
            arp_packet[1]['ARP']['psrc'] = str(IPAddress(srcip) + offset)
        else:
            raise UIException("ARP packet not supplied")

        # set admin status of host and switch to Up, wait till operational status is up
        sw_port_id = int(sw_port.split()[1])
        sw_instances = [switch for switch in env.switch[1].node.values() if switch.id != sw_port.split()[0]]
        sw_instance = [switch for switch in env.switch[1].node.values() if switch.id == sw_port.split()[0]]
        if not sw_instance:
            raise UIException("Not found switch id and port pair connection in configuration")
        env.tg[1].connect_port(tg_port)
        sw_instance[0].ui.modify_ports(ports=[sw_port_id], adminMode='Up')
        sw_instance[0].ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up")

        # Prepare and send stream
        arp_stream = env.tg[1].set_stream(arp_packet, count=packet_num, inter=ipgap, iface=tg_port,
                                          arp_sa_increment=(1, packet_num + offset),
                                          arp_sip_increment=(1, packet_num + offset))
        env.tg[1].start_streams([arp_stream])
        time.sleep(ipgap * packet_num)
        env.tg[1].stop_streams([arp_stream])
        return sw_instances
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _port_dict(port, fields=None):
    res = {"id": port.get("id"),
           "name": port.get("name"),
           "network_id": port["network_id"],
           "tenant_id": port.get("tenant_id"),
           "mac_address": port.get("mac_address"),
           "admin_state_up": port.get("admin_state_up"),
           "status": "ACTIVE",
           "security_groups": [group.get("id", None) for group in
                               port.get("security_groups", None)],
           "device_id": port.get("device_id"),
           "device_owner": port.get("device_owner")}

    if "mac_address" in res and res["mac_address"]:
        mac = str(netaddr.EUI(res["mac_address"])).replace('-', ':')
        res["mac_address"] = mac

    # NOTE(mdietz): more pythonic key in dict check fails here. Leave as get
    if port.get("bridge"):
        res["bridge"] = port["bridge"]

    # NOTE(ClifHouck): This causes another trip to the DB since tags are
    # are not eager loaded. According to mdietz this be a small impact on
    # performance, but if the tag system gets used more on ports, we may
    # want to eager load the tags.
    try:
        t = PORT_TAG_REGISTRY.get_all(port)
        res.update(t)
    except Exception as e:
        # NOTE(morgabra) We really don't want to break port-listing if
        # this goes sideways here, so we pass.
        msg = ("Unknown error loading tags for port %s: %s"
               % (port["id"], e))
        LOG.exception(msg)

    return res
项目:quark    作者:openstack    | 项目源码 | 文件源码
def delete_port(context, id):
    """Delete a port.

    : param context: neutron api request context
    : param id: UUID representing the port to delete.
    """
    LOG.info("delete_port %s for tenant %s" % (id, context.tenant_id))

    port = db_api.port_find(context, id=id, scope=db_api.ONE)
    if not port:
        raise n_exc.PortNotFound(port_id=id)

    if 'device_id' in port:  # false is weird, but ignore that
        LOG.info("delete_port %s for tenant %s has device %s" %
                 (id, context.tenant_id, port['device_id']))

    backend_key = port["backend_key"]
    mac_address = netaddr.EUI(port["mac_address"]).value
    ipam_driver = _get_ipam_driver(port["network"], port=port)
    ipam_driver.deallocate_mac_address(context, mac_address)
    ipam_driver.deallocate_ips_by_port(
        context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)

    net_driver = _get_net_driver(port["network"], port=port)
    base_net_driver = _get_net_driver(port["network"])
    net_driver.delete_port(context, backend_key, device_id=port["device_id"],
                           mac_address=port["mac_address"],
                           base_net_driver=base_net_driver)

    with context.session.begin():
        db_api.port_delete(context, port)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_to_mac_range_cidr_format(self):
        cidr, first, last = mac_address_ranges._to_mac_range("AA:BB:CC/24")
        first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
        last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
        self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
        self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
        self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_to_mac_range_just_prefix(self):
        cidr, first, last = mac_address_ranges._to_mac_range("AA:BB:CC")
        first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
        last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
        self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
        self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
        self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_to_mac_range_unix_cidr_format(self):
        cidr, first, last = mac_address_ranges._to_mac_range("AA-BB-CC/24")
        first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
        last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
        self.assertEqual(cidr, "AA:BB:CC:00:00:00/24")
        self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
        self.assertEqual(last_mac, "aa:bb:cd:0:0:0")
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_to_mac_range_unix_cidr_format_normal_length(self):
        cidr, first, last = mac_address_ranges._to_mac_range("aabbcc000000/29")
        first_mac = str(netaddr.EUI(first, dialect=netaddr.mac_unix))
        last_mac = str(netaddr.EUI(last, dialect=netaddr.mac_unix))
        self.assertEqual(cidr, "AA:BB:CC:00:00:00/29")
        self.assertEqual(first_mac, "aa:bb:cc:0:0:0")
        self.assertEqual(last_mac, "aa:bb:cc:8:0:0")
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _build_expected_unicorn_request_body(self, floating_ip_address, ports,
                                             actual_body=None):
        if actual_body:
            # Since the port order is non-deterministic, we need to ensure
            # that the order is correct
            actual_port_ids = [endpoint['port']['uuid'] for endpoint in
                               actual_body['floating_ip']['endpoints']]
            reordered_ports = []
            for port_id in actual_port_ids:
                for port in ports:
                    if port['id'] == port_id:
                        reordered_ports.append(port)
            ports = reordered_ports
        endpoints = []
        for port in ports:
            fixed_ips = []
            for fixed_ip in port['fixed_ips']:
                fixed_ips.append({
                    'ip_address': fixed_ip['ip_address'],
                    'version': self.user_subnet['ip_version'],
                    'subnet_id': self.user_subnet['id'],
                    'cidr': self.user_subnet['cidr'],
                    'address_type': 'fixed'
                })
            port_mac = int(netaddr.EUI(port['mac_address'].replace(':', '-')))
            endpoints.append({
                'port': {
                    'uuid': port['id'],
                    'name': port['name'],
                    'network_uuid': port['network_id'],
                    'mac_address': port_mac,
                    'device_id': port['device_id'],
                    'device_owner': port['device_owner'],
                    'fixed_ip': fixed_ips
                },
                'private_ip': port['fixed_ips'][0]['ip_address']
            })
        body = {'public_ip': floating_ip_address,
                'endpoints': endpoints}
        return {'floating_ip': body}
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_mac_address_ranges(self):
        mr1_mac = netaddr.EUI("AA:AA:AA:00:00:00")
        mr1 = {"cidr": "AA:AA:AA/24", "do_not_use": False,
               "first_address": mr1_mac.value,
               "last_address": netaddr.EUI("AA:AA:AA:FF:FF:FF").value,
               "next_auto_assign_mac": mr1_mac.value}
        with self._fixtures([mr1]):
            with self.context.session.begin():
                ranges = db_api.mac_address_range_find_allocation_counts(
                    self.context)
                self.assertTrue(ranges[0]["cidr"], mr1["cidr"])
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_mac_address_ranges_do_not_use_returns_nothing(self):
        mr1_mac = netaddr.EUI("AA:AA:AA:00:00:00")
        mr1 = {"cidr": "AA:AA:AA/24", "do_not_use": True,
               "first_address": mr1_mac.value,
               "last_address": netaddr.EUI("AA:AA:AA:FF:FF:FF").value,
               "next_auto_assign_mac": mr1_mac.value}

        with self._fixtures([mr1]):
            with self.context.session.begin():
                ranges = db_api.mac_address_range_find_allocation_counts(
                    self.context)
                self.assertTrue(ranges is None)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_update_port_with_security_groups_removal(self, redis_cli):
        mock_client = mock.MagicMock()
        redis_cli.return_value = mock_client

        port_id = str(uuid.uuid4())
        device_id = str(uuid.uuid4())
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
        security_groups = []
        self.driver.update_port(
            context=self.context, network_id="public_network", port_id=port_id,
            device_id=device_id, mac_address=mac_address,
            security_groups=security_groups)
        self.assertEqual(mock_client.serialize_groups.call_count, 0)
        mock_client.delete_vif_rules.assert_called_once_with(
            device_id, mac_address)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_delete_port(self, sg_cli):
        device_id = str(uuid.uuid4())
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
        mock_client = mock.MagicMock()
        sg_cli.return_value = mock_client
        self.driver.delete_port(context=self.context, port_id=2,
                                mac_address=mac_address, device_id=device_id)
        mock_client.delete_vif.assert_called_once_with(
            device_id, mac_address)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_allocate_v6_with_mac_generates_exceeds_limit_raises(self):
        subnet6 = dict(cidr="feed::/104",
                       first_ip=self.v6_fip.value,
                       id=1,
                       ip_version=6,
                       ip_policy=None,
                       last_ip=self.v6_lip.value,
                       next_auto_assign_ip=self.v6_fip.value)

        address = models.IPAddress()
        address["address"] = self.v46_val
        address["version"] = 4
        address["subnet"] = models.Subnet(cidr="::ffff:0:0/96")
        address["allocated_at"] = timeutils.utcnow()

        mac = models.MacAddress()
        mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        old_override = cfg.CONF.QUARK.v6_allocation_attempts

        cfg.CONF.set_override('v6_allocation_attempts', 0, 'QUARK')

        with self._stubs(subnets=[[(subnet6, 0)]],
                         addresses=[address, None, None]):
            with self.assertRaises(n_exc.IpAddressGenerationFailure):
                addr = []
                self.ipam.allocate_ip_address(self.context, addr, 0, 0, 0,
                                              mac_address=mac)
        cfg.CONF.set_override('v6_allocation_attempts', old_override, 'QUARK')
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_allocate_v6_with_mac(self):
        port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
        subnet6 = dict(id=1, first_ip=0, last_ip=0,
                       cidr="feed::/104", ip_version=6,
                       next_auto_assign_ip=0,
                       ip_policy=None)
        subnet6 = models.Subnet(**subnet6)

        mac = models.MacAddress()
        mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")

        old_override = cfg.CONF.QUARK.v6_allocation_attempts
        cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')
        ip_address = netaddr.IPAddress("fe80::")

        with self._stubs(policies=[], ip_address=ip_address) as (
                policy_find, ip_find, ip_create, ip_update):
            a = self.ipam._allocate_from_v6_subnet(self.context, 0, subnet6,
                                                   port_id, self.reuse_after,
                                                   mac_address=mac)
            self.assertEqual(ip_address.value, a["address"])

            # NCP-1548 - leaving this test to show the change from sometimes
            # creating the IP address to always creating the IP address.
            self.assertEqual(0, ip_update.call_count)
            self.assertEqual(1, ip_create.call_count)

        cfg.CONF.set_override('v6_allocation_attempts', old_override, 'QUARK')
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_rfc2462_generates_valid_ip(self):
        mac = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        cidr = "fe80::/120"
        ip = quark.ipam.rfc2462_ip(mac, cidr)
        self.assertEqual(ip,
                         netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_v6_generator(self):
        mac = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        cidr = "fe80::/120"
        port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
        cidr = "fe80::/120"
        gen = quark.ipam.generate_v6(mac, port_id, cidr)
        ip = gen.next()
        self.assertEqual(ip,
                         netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value)
        ip = gen.next()
        self.assertEqual(ip,
                         netaddr.IPAddress('fe80::40c9:a95:d83a:2ffa').value)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_apply_rules_set_fails_gracefully(self):
        port_id = 1
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
        conn_err = redis.ConnectionError
        with mock.patch("quark.cache.security_groups_client."
                        "redis_base.ClientBase") as redis_mock:
            mocked_redis_cli = mock.MagicMock()
            redis_mock.return_value = mocked_redis_cli

            client = sg_client.SecurityGroupsClient()
            mocked_redis_cli.master.hset.side_effect = conn_err
            with self.assertRaises(q_exc.RedisConnectionFailure):
                client.apply_rules(port_id, mac_address.value, [])
项目:quark    作者:openstack    | 项目源码 | 文件源码
def rfc2462_ip(mac, cidr):
    # NOTE(mdietz): see RFC2462
    int_val = netaddr.IPNetwork(cidr).value
    mac = netaddr.EUI(mac)
    LOG.info("Using RFC2462 method to generate a v6 with MAC %s" % mac)
    int_val += mac.eui64().value
    int_val ^= MAGIC_INT
    return int_val
项目:quark    作者:openstack    | 项目源码 | 文件源码
def deallocate_mac_address(self, context, address, **kwargs):
        admin_context = context.elevated()
        mac = db_api.mac_address_find(admin_context, address=address,
                                      scope=db_api.ONE)
        if not mac:
            raise q_exc.MacAddressNotFound(
                mac_address_id=address,
                readable_mac=netaddr.EUI(address))

        if (mac["mac_address_range"] is None or
                mac["mac_address_range"]["do_not_use"]):
            db_api.mac_address_delete(admin_context, mac)
        else:
            db_api.mac_address_update(admin_context, mac, deallocated=True,
                                      deallocated_at=timeutils.utcnow())
项目:quark    作者:openstack    | 项目源码 | 文件源码
def vif_key(self, device_id, mac_address):
        mac = str(netaddr.EUI(mac_address))

        # Lower cases and strips hyphens from the mac
        mac = mac.translate(MAC_TRANS_TABLE, ":-")
        return "{0}.{1}".format(device_id, mac)
项目:kSnarf    作者:ICSec    | 项目源码 | 文件源码
def macGrab(self, packet):
        """Defines the OUI for a given MAC
        This function serves as an example, it is not ready for implementation
        """
        try:
            parsed_mac = netaddr.EUI(packet.addr2)
            print parsed_mac.oui.registration().org
        except netaddr.core.NotRegisteredError, e:
            fields.append('UNKNOWN')


    ### Move to handler.py
项目:vswitchperf    作者:opnfv    | 项目源码 | 文件源码
def process_flow_template(self, bridge, flow_template):
        """Method adds flows into the vswitch based on given flow template
           and configuration of multistream feature.
        """
        if ('pre_installed_flows' in self._traffic and
                self._traffic['pre_installed_flows'].lower() == 'yes' and
                'multistream' in self._traffic and self._traffic['multistream'] > 0 and
                'stream_type' in self._traffic):
            # multistream feature is enabled and flows should be inserted into OVS
            # so generate flows based on template and multistream configuration
            if self._traffic['stream_type'] == 'L2':
                # iterate through destimation MAC address
                dst_mac_value = netaddr.EUI(self._traffic['l2']['dstmac']).value
                for i in range(self._traffic['multistream']):
                    tmp_mac = netaddr.EUI(dst_mac_value + i)
                    tmp_mac.dialect = netaddr.mac_unix_expanded
                    flow_template.update({'dl_dst':tmp_mac})
                    # optimize flow insertion by usage of cache
                    self._vswitch.add_flow(bridge, flow_template, cache='on')
            elif self._traffic['stream_type'] == 'L3':
                # iterate through destimation IP address
                dst_ip_value = netaddr.IPAddress(self._traffic['l3']['dstip']).value
                for i in range(self._traffic['multistream']):
                    tmp_ip = netaddr.IPAddress(dst_ip_value + i)
                    flow_template.update({'dl_type':'0x0800', 'nw_dst':tmp_ip})
                    # optimize flow insertion by usage of cache
                    self._vswitch.add_flow(bridge, flow_template, cache='on')
            elif self._traffic['stream_type'] == 'L4':
                # read transport protocol from configuration and iterate through its destination port
                proto = _PROTO_TCP if self._traffic['l3']['proto'].lower() == 'tcp' else _PROTO_UDP
                for i in range(self._traffic['multistream']):
                    flow_template.update({'dl_type':'0x0800', 'nw_proto':proto, 'tp_dst':i})
                    # optimize flow insertion by usage of cache
                    self._vswitch.add_flow(bridge, flow_template, cache='on')
            else:
                self._logger.error('Stream type is set to uknown value %s', self._traffic['stream_type'])
            # insert cached flows into the OVS
            self._vswitch.add_flow(bridge, [], cache='flush')
        else:
            self._vswitch.add_flow(bridge, flow_template)
项目:f5-aws-vpn    作者:f5devcentral    | 项目源码 | 文件源码
def slaac(value, query = ''):
    ''' Get the SLAAC address within given network '''
    try:
        vtype = ipaddr(value, 'type')
        if vtype == 'address':
            v = ipaddr(value, 'cidr')
        elif vtype == 'network':
            v = ipaddr(value, 'subnet')

        if v.version != 6:
            return False

        value = netaddr.IPNetwork(v)
    except:
        return False

    if not query:
        return False

    try:
        mac = hwaddr(query, alias = 'slaac')

        eui = netaddr.EUI(mac)
    except:
        return False

    return eui.ipv6(value.network)


# ---- HWaddr / MAC address filters ----
项目:f5-aws-vpn    作者:f5devcentral    | 项目源码 | 文件源码
def hwaddr(value, query = '', alias = 'hwaddr'):
    ''' Check if string is a HW/MAC address and filter it '''

    query_func_extra_args = {
            '': ('value',),
            }
    query_func_map = {
            '': _empty_hwaddr_query,
            'bare': _bare_query,
            'bool': _bool_hwaddr_query,
            'cisco': _cisco_query,
            'eui48': _win_query,
            'linux': _linux_query,
            'pgsql': _postgresql_query,
            'postgresql': _postgresql_query,
            'psql': _postgresql_query,
            'unix': _unix_query,
            'win': _win_query,
            }

    try:
        v = netaddr.EUI(value)
    except:
        if query and query != 'bool':
            raise errors.AnsibleFilterError(alias + ': not a hardware address: %s' % value)

    extras = []
    for arg in query_func_extra_args.get(query, tuple()):
        extras.append(locals()[arg])
    try:
        return query_func_map[query](v, *extras)
    except KeyError:
        raise errors.AnsibleFilterError(alias + ': unknown filter type: %s' % query)

    return False
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def __init__(self, id=0, ssid=None, client_mac=None, location=None):
        self.id = id
        self.ssid = ssid
        self.client_mac = client_mac
        self.location = location
        self.client_org = None
        try:
            self.client_org = EUI(self.client_mac).oui.registration().org   # OUI - Organizational Unique Identifier
        except: pass                                                        # OUI not registered exception
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def __init__(self, id=0, ssid=None, bssid=None, date=None, location=None):
        self.id = id
        self.ssid = ssid
        self.bssid = bssid
        self.date = date
        self.location = location
        self.ap_org = None
        try:
            self.ap_org = EUI(self.bssid).oui.registration().org    # OUI - Organizational Unique Identifier
        except: pass
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def __init__(self, id=0, ssid=None, client_mac=None, date=None, location=None):
        self.id = id
        self.ssid = ssid
        self.client_mac = client_mac
        self.date = date
        self.location = location
        self.client_org = None
        try:
            self.client_org = EUI(self.client_mac).oui.registration().org    # OUI - Organizational Unique Identifier
        except: pass
项目:EvilTwinFramework    作者:Esser420    | 项目源码 | 文件源码
def get_vendor(mac):
        if mac != "":
            maco = EUI(mac)                         # EUI - Extended Unique Identifier
            try:
                return maco.oui.registration().org  # OUI - Organizational Unique Identifier
            except:                                 # OUI not registered exception
                return None
        return None
项目:simple-ostinato    作者:little-dude    | 项目源码 | 文件源码
def source(self):
        """
        Source MAC address
        """
        return str(netaddr.EUI(self._src_mac))
项目:simple-ostinato    作者:little-dude    | 项目源码 | 文件源码
def source(self, value):
        self._src_mac = netaddr.EUI(value).value
项目:simple-ostinato    作者:little-dude    | 项目源码 | 文件源码
def destination(self):
        """
        destination MAC address
        """
        return str(netaddr.EUI(self._dst_mac))
项目:simple-ostinato    作者:little-dude    | 项目源码 | 文件源码
def destination(self, value):
        self._dst_mac = netaddr.EUI(value).value
项目:simple-ostinato    作者:little-dude    | 项目源码 | 文件源码
def test_traffic_variable(self):
        tx = self.layer.tx
        rx = self.layer.rx
        tx.streams[2].is_enabled = True
        tx.streams[2].save()
        utils.send_and_receive(tx, rx, duration=1, save_as='capture.pcap')
        if utils.is_pypy():
            return
        capture = pyshark.FileCapture('capture.pcap')
        for num_pkt, pkt in enumerate(capture):
            self.assertEqual((num_pkt + 1) * 10, int(pkt.eth.len))
            src = netaddr.EUI(pkt.eth.src).value
            self.assertEqual(num_pkt * 0x100, src)
            dst = netaddr.EUI(pkt.eth.dst).value
            self.assertEqual(0xffffffffffff - num_pkt * 0x1000000, dst)