Python netaddr 模块,mac_unix_expanded() 实例源码


项目:taf    作者:taf3    | 项目源码 | 文件源码
def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None):
    """Prepare ports, packets for table 9 related tests.

    Shifts the source MAC and source IP of the supplied ARP packet by
    ``offset`` (the packet is modified in place), brings the switch port
    administratively up, waits for it to become operational, then sends an
    ARP stream and waits ``ipgap * packet_num`` seconds before stopping it.

    Args:
        env: Environment object; ``env.switch[1].node`` is iterated to find switches.
        tg_port: Port passed to ``connect_port`` and used as stream ``iface``.
        sw_port: String whose first whitespace-separated field is compared with
            the switch id and whose second field is the integer port id.
        packet_num: Number of packets in the stream.
        ipgap: Passed as the stream ``inter`` value; also used for the send wait.
        offset: Integer added to the source MAC and source IP.
        arp_packet: Packet definition; ``[0]['Ether']['src']`` and
            ``[1]['ARP']['psrc']`` are read and rewritten.

    Returns:
        list: Switch instances whose id differs from the one named in ``sw_port``.

    Raises:
        UIException: If no ARP packet is supplied or the switch is not found.
    """
    # Guard clause restores the else-branch that was lost in the original text,
    # where the raise was executed unconditionally after a valid packet.
    if not arp_packet:
        raise UIException("ARP packet not supplied")

    # Configure ARP packet and stream
    ether_layer = arp_packet[0]['Ether']
    ether_layer['src'] = str(EUI(EUI(ether_layer['src']).value + offset, dialect=mac_unix_expanded))
    arp_layer = arp_packet[1]['ARP']
    arp_layer['psrc'] = str(IPAddress(arp_layer['psrc']) + offset)

    # set admin status of host and switch to Up, wait till operational status is up
    sw_port_fields = sw_port.split()
    sw_port_id = int(sw_port_fields[1])
    target_switch_id = sw_port_fields[0]

    # NOTE(review): the left operand of both comparisons was missing from the
    # original text; ``switch.id`` is a reconstruction -- confirm the attribute name.
    sw_instance = []
    sw_instances = []
    for switch in env.switch[1].node.values():
        if switch.id == target_switch_id:
            sw_instance.append(switch)
        else:
            sw_instances.append(switch)
    if not sw_instance:
        raise UIException("Not found switch id and port pair connection in configuration")

    # NOTE(review): the object preceding ``[1].connect_port`` / ``[1].set_stream``
    # was missing from the original text; ``env.tg`` is a reconstruction -- confirm.
    traffic_gen = env.tg[1]
    traffic_gen.connect_port(tg_port)
    switch_ui = sw_instance[0].ui
    switch_ui.modify_ports(ports=[sw_port_id], adminMode='Up')
    switch_ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up")

    # Prepare and send stream
    arp_stream = traffic_gen.set_stream(arp_packet, count=packet_num, inter=ipgap, iface=tg_port,
                                        arp_sa_increment=(1, packet_num + offset),
                                        arp_sip_increment=(1, packet_num + offset))
    traffic_gen.start_streams([arp_stream])
    # Give the generator time to emit every packet before stopping the stream.
    time.sleep(ipgap * packet_num)
    traffic_gen.stop_streams([arp_stream])
    return sw_instances
项目:vswitchperf    作者:opnfv    | 项目源码 | 文件源码
def process_flow_template(self, bridge, flow_template):
    """Add flows into the vswitch based on a flow template and multistream config.

    When ``self._traffic`` enables pre-installed flows together with a
    positive ``multistream`` count and a ``stream_type``, one flow per stream
    is generated from the template (queued with ``cache='on'``) and the queue
    is flushed at the end. Otherwise the template is added once, unchanged.

    Note that ``flow_template`` is updated in place while flows are generated.

    :param bridge: bridge identifier forwarded to ``self._vswitch.add_flow``
    :param flow_template: dict of flow fields used as the base of every flow
    """
    traffic = self._traffic
    multistream_flows = (
        'pre_installed_flows' in traffic and
        traffic['pre_installed_flows'].lower() == 'yes' and
        'multistream' in traffic and traffic['multistream'] > 0 and
        'stream_type' in traffic)

    if not multistream_flows:
        # Restores the lost else-branch: the plain template is inserted only
        # when multistream pre-installation is NOT in effect.
        self._vswitch.add_flow(bridge, flow_template)
        return

    # multistream feature is enabled and flows should be inserted into OVS
    # so generate flows based on template and multistream configuration
    stream_type = traffic['stream_type']
    stream_count = traffic['multistream']
    if stream_type == 'L2':
        # iterate through destination MAC address
        dst_mac_value = netaddr.EUI(traffic['l2']['dstmac']).value
        for i in range(stream_count):
            tmp_mac = netaddr.EUI(dst_mac_value + i)
            tmp_mac.dialect = netaddr.mac_unix_expanded
            # NOTE(review): the original text computed tmp_mac but never used
            # it; 'dl_dst' is reconstructed by analogy with 'nw_dst'/'tp_dst'
            # in the L3/L4 branches -- confirm the field name.
            flow_template.update({'dl_dst': tmp_mac})
            # optimize flow insertion by usage of cache
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    elif stream_type == 'L3':
        # iterate through destination IP address
        dst_ip_value = netaddr.IPAddress(traffic['l3']['dstip']).value
        for i in range(stream_count):
            tmp_ip = netaddr.IPAddress(dst_ip_value + i)
            flow_template.update({'dl_type': '0x0800', 'nw_dst': tmp_ip})
            # optimize flow insertion by usage of cache
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    elif stream_type == 'L4':
        # read transport protocol from configuration and iterate through its destination port
        proto = _PROTO_TCP if traffic['l3']['proto'].lower() == 'tcp' else _PROTO_UDP
        for i in range(stream_count):
            flow_template.update({'dl_type': '0x0800', 'nw_proto': proto, 'tp_dst': i})
            # optimize flow insertion by usage of cache
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    else:
        # Restores the lost else: the error is logged only for unknown types.
        self._logger.error('Stream type is set to uknown value %s', stream_type)

    # insert cached flows into the OVS
    self._vswitch.add_flow(bridge, [], cache='flush')
项目:ovsdbapp    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api, router, port, mac, networks,
             peer=None, may_exist=False, **columns):
    """Store the command parameters, normalising ``mac`` and ``networks``.

    ``mac`` is rendered through ``netaddr.EUI`` with the
    ``mac_unix_expanded`` dialect and every entry of ``networks`` through
    ``netaddr.IPNetwork``; both are stored as strings. All other arguments
    are stored unchanged. The parent initialiser is called last with ``api``.
    """
    mac_eui = netaddr.EUI(mac, dialect=netaddr.mac_unix_expanded)
    self.mac = str(mac_eui)

    normalised_networks = []
    for net in networks:
        normalised_networks.append(str(netaddr.IPNetwork(net)))
    self.networks = normalised_networks

    self.router = router
    self.port = port
    self.peer = peer
    self.may_exist = may_exist
    self.columns = columns
    super(LrpAddCommand, self).__init__(api)
项目:ovsdbapp    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api, router, nat_type, external_ip, logical_ip,
             logical_port=None, external_mac=None, may_exist=False):
    """Validate and normalise the NAT parameters, then store them.

    :param api: forwarded to the parent initialiser
    :param router: stored unchanged
    :param nat_type: must be a member of ``const.NAT_TYPES``
    :param external_ip: normalised through ``netaddr.IPAddress``
    :param logical_ip: for ``const.NAT_DNAT`` a single address; otherwise
        parsed as a network, and reduced to the bare address when the
        prefix length is 32
    :param logical_port: must be given together with ``external_mac`` and
        only for ``const.NAT_BOTH``
    :param external_mac: normalised to the ``mac_unix_expanded`` dialect
    :param may_exist: stored unchanged
    :raises TypeError: on an unknown ``nat_type`` or an invalid
        ``logical_port`` / ``external_mac`` combination
    """
    if nat_type not in const.NAT_TYPES:
        raise TypeError("nat_type not in %s" % str(const.NAT_TYPES))
    external_ip = str(netaddr.IPAddress(external_ip))
    if nat_type == const.NAT_DNAT:
        logical_ip = str(netaddr.IPAddress(logical_ip))
    else:
        # Restores the lost else-branch: without it non-DNAT logical
        # addresses were stored without any normalisation.
        net = netaddr.IPNetwork(logical_ip)
        logical_ip = str(net.ip if net.prefixlen == 32 else net)
    if (logical_port is None) != (external_mac is None):
        msg = "logical_port and external_mac must be passed together"
        raise TypeError(msg)
    if logical_port and nat_type != const.NAT_BOTH:
        # The format argument was truncated in the original text; the value
        # is the NAT type the condition above requires.
        msg = "logical_port/external_mac only valid for %s" % (
            const.NAT_BOTH,)
        raise TypeError(msg)
    if external_mac:
        external_mac = str(
            netaddr.EUI(external_mac, dialect=netaddr.mac_unix_expanded))
    super(LrNatAddCommand, self).__init__(api)
    self.router = router
    self.nat_type = nat_type
    self.external_ip = external_ip
    self.logical_ip = logical_ip
    # Unset optional values are stored as empty lists.
    self.logical_port = logical_port or []
    self.external_mac = external_mac or []
    self.may_exist = may_exist
项目:taf    作者:taf3    | 项目源码 | 文件源码
def arp_packets_sending(self, env, tg_port, sw_port, packet_num, ipgap, arp_packet=None, offset=0, retry=False):
        """Send ARP packets and count the table 9 entries reported by each switch.

        Calls ``self.table_9_test_preparation`` and then, for every switch it
        returns, runs ``cmd`` through ``switch.ui.cli_send_command`` and counts
        the lines of its stdout matching ``^table\\s+:\\s+9``.

        When ``retry`` is false the counts are returned immediately. Otherwise,
        if any count differs from ``packet_num``, up to two further rounds try
        to add the source MACs that are not yet listed, and the counts are
        collected again before returning.

        NOTE(review): this block is damaged in the available text -- several
        statements are truncated or missing (each spot is marked below), so it
        does not parse as-is. Restore it from the upstream project before use.

        Returns:
            dict: switch instance -> number of matching table 9 lines.
        """
        # CLI command whose output is scanned for "table : 9" lines below.
        cmd = "match -f 5555 -p 30001 get_rules table 9"
        sw_t9 = {}
        sw_instances = self.table_9_test_preparation(env, tg_port, sw_port, ipgap=ipgap,
                                                     packet_num=packet_num, arp_packet=arp_packet,
        # NOTE(review): the call above is never closed in this text; its trailing
        # argument(s) and closing parenthesis are missing.
        # wait for table 9 to be populated
        # NOTE(review): no wait statement follows the comment above -- it appears
        # to have been lost.
        # check table 9 entries
        assert sw_instances, "No end point ports found connected to the host"
        for switch in sw_instances:
            output = switch.ui.cli_send_command(cmd).stdout
            sw_t9[switch] = len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE))
        if not retry:
            return sw_t9

        # Any switch whose count differs from packet_num marks the run as failed.
        failed = False
        for switch, count in sw_t9.items():
            if count != packet_num:
                failed = True
        if not failed:
            return sw_t9
        # in case not all entries are learned fast, try add them again
        # Work on a copy so the caller's packet is not modified by the retries.
        arp_packet = deepcopy(arp_packet)
        srcmac = arp_packet[0]['Ether']['src']
        srcip = arp_packet[1]['ARP']['psrc']
        # At most two retry rounds.
        count = 2
        while failed and count:
            count -= 1
            # NOTE(review): the next line is the tail of a call (presumably a
            # logging call) whose beginning is missing.
  "Adding not learned MAC addresses to the table 9")
            # NOTE(review): the left operand of "!=" is missing in the line below.
            sw_instances = [switch for switch in env.switch[1].node.values() if != sw_port.split()[0]]
            for switch in sw_instances:
                output = switch.ui.cli_send_command("match -f 5555 -p 30001 get_rules table 9").stdout
                # MAC addresses the switch already lists as ethernet.dst_mac.
                learned = re.findall(r'ethernet.dst_mac\s=\s((?:[\w]{2}:){5}[\w]{2})', output, re.MULTILINE)
                # Expected MACs (srcmac + 0 .. packet_num - 1) that are not listed yet.
                to_add = [el for el in [str(EUI(EUI(srcmac).value + ix, dialect=mac_unix_expanded))
                                        for ix in range(packet_num)] if el not in learned]
                # NOTE(review): the next line is a truncated call; both its
                # beginning and the rest of the format() arguments are missing.
      "{} entries are missing for switch {}".format(len(to_add),
                for mac in to_add:
                    # Offset of this MAC from the base MAC; the same offset is
                    # applied to the source IP.
                    diff = EUI(mac).value - EUI(srcmac).value
                    arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + diff, dialect=mac_unix_expanded))
                    arp_packet[1]['ARP']['psrc'] = str(IPAddress(IPAddress(srcip).value + diff))
                    # Prepare and send stream
                    # NOTE(review): the receiver before "[1].set_stream" is missing,
                    # and no statement sending arp_stream is present.
                    arp_stream =[1].set_stream(arp_packet, count=1, iface=tg_port)

            # NOTE(review): the next line is the tail of a call whose beginning is missing.
  "Verify table#9 entries")
            for switch in sw_instances:
                output = switch.ui.cli_send_command(cmd).stdout
                if packet_num == len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)):
                    failed = False
                    # NOTE(review): as written the next assignment immediately
                    # overrides the one above; an "else:" line appears to be missing.
                    failed = True

        # Collect the final per-switch counts after the retry rounds.
        for switch in sw_instances:
            output = switch.ui.cli_send_command(cmd).stdout
            sw_t9[switch] = len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE))
        return sw_t9
项目:vswitchperf    作者:opnfv    | 项目源码 | 文件源码
def _expand_vm_settings(self, key, vm_number):
    """Expand VM option with given key for given number of VMs.

    The first (or only) value of the option acts as the master value. If its
    string form contains ``#``, one value per VM is generated from it:
    ``#VMINDEX`` is replaced by the VM index and each macro matched by
    ``_PARSE_PATTERN`` (``#EVAL``, ``#MAC``, ``#IP``) is replaced by its
    computed result. Otherwise the stored list is padded with the master
    value up to ``vm_number`` entries.

    :param key: name of the option; read via ``self.getValue`` and written
        to ``self.__dict__``
    :param vm_number: number of VMs to expand the option for
    :raises RuntimeError: if an unknown macro is matched
    """
    tmp_value = self.getValue(key)
    if isinstance(tmp_value, str):
        scalar = True
        master_value = tmp_value
        tmp_value = [tmp_value]
    else:
        # Restores the lost else: list-like options use their first entry.
        scalar = False
        master_value = tmp_value[0]

    master_value_str = str(master_value)
    if '#' in master_value_str:
        self.__dict__[key] = []
        for vmindex in range(vm_number):
            value = master_value_str.replace('#VMINDEX', str(vmindex))
            for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
                # An empty or zero step means a step of 1.
                multi = int(step or 0) or 1
                if macro == '#EVAL':
                    # NOTE(review): eval() executes arbitrary code taken from
                    # the configuration value; only safe for trusted config.
                    # pylint: disable=eval-used
                    tmp_result = str(eval(param))
                elif macro == '#MAC':
                    mac_value = netaddr.EUI(param).value
                    mac = netaddr.EUI(mac_value + vmindex * multi)
                    mac.dialect = netaddr.mac_unix_expanded
                    tmp_result = str(mac)
                elif macro == '#IP':
                    ip_value = netaddr.IPAddress(param).value
                    tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi))
                else:
                    # Restores the lost else: only unknown macros are an error.
                    raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key))

                value = value.replace("{}{}".format(macro, args), tmp_result)

            # retype value to original type if needed
            if not isinstance(master_value, str):
                value = ast.literal_eval(value)
            # The append was missing from the original text; without it the
            # list created above stayed empty and the [0] lookup below failed.
            self.__dict__[key].append(value)
    else:
        # NOTE(review): the body of this loop was missing from the original
        # text; padding with master_value is a reconstruction -- confirm. For a
        # scalar option without macros, also confirm what self.__dict__[key]
        # holds, since the scalar collapse below indexes it with [0].
        for _ in range(len(tmp_value), vm_number):
            self.__dict__[key].append(master_value)

    if scalar:
        self.__dict__[key] = self.__dict__[key][0]

    _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])
项目:pynoc    作者:SimplicityGuy    | 项目源码 | 文件源码
def _parse_mac_address_table_output(self, output, ignore_port=None):
    """MAC Address Table parsing.

    Parses the output of 'sh mac address-table' command.

    :param output: the output of the command, for example::

                  Mac Address Table
        Vlan    Mac Address       Type        Ports
        ----    -----------       --------    -----
         All    0100.0ccc.cccc    STATIC      CPU
         All    ffff.ffff.ffff    STATIC      CPU
         601    000b.7866.5240    DYNAMIC     Gi1/0/48
         601    0013.20fe.56b4    DYNAMIC     Gi1/0/35
         Total Mac Addresses for this criterion: 59

    :param ignore_port: ignore this port
    :return: list of dicts (keys ``mac`` and ``interface``) sorted by
        interface
    """
    # Loop invariants: lower-cased port prefixes and the normalised port to skip.
    port_prefixes = tuple(
        port.lower() for port in self.PORT_NOTATION.values())
    if ignore_port:
        ignore_port = self._shorthand_port_notation(ignore_port)

    lookup = []
    for raw_line in output.splitlines():
        line = raw_line.strip()

        # Table entries will always have a '.' for the MAC address.
        # If there isn't one it's not a row we care about.
        if '.' not in line:
            continue

        values = line.split()
        # A table row has four columns; anything shorter cannot be indexed.
        if len(values) < 4:
            continue
        port = values[3]

        # Ignore non-physical ports
        if not port.lower().startswith(port_prefixes):
            continue

        # If the ignore_port is specified and is the port in question,
        # ignore it.
        if ignore_port and port == ignore_port:
            continue

        lookup.append({
            'mac': EUI(values[1], dialect=mac_unix_expanded),
            'interface': self._shorthand_port_notation(port),
        })
    return sorted(lookup, key=lambda k: k['interface'])
项目:pynoc    作者:SimplicityGuy    | 项目源码 | 文件源码
def _parse_ipdt_output(self, output):
    """IPDT output parsing.

    Parses the output of the `show ip device track` command.

    :param output: the output of the command, for example (the IP column
        values are shown as placeholders)::

        IP Device Tracking = Enabled
        IP Device Tracking Probe Count = 3
        IP Device Tracking Probe Interval = 30
        IP Device Tracking Probe Delay Interval = 0
          IP Address     MAC Address   Vlan  Interface              STATE
        <ip-address>  6cec.eb68.c86f  601  GigabitEthernet1/0/14  ACTIVE
        <ip-address>  6cec.eb67.836c  601  GigabitEthernet1/0/12  ACTIVE

        Total number interfaces enabled: 47
        Enabled interfaces:
         Gi1/0/1, Gi1/0/2, Gi1/0/3, Gi1/0/4, Gi1/0/5, Gi1/0/6, Gi1/0/7,

    :return: list of dicts (keys ``ip``, ``mac`` and ``interface``) sorted
        by interface
    """
    lookup = []
    for raw_line in output.splitlines():
        line = raw_line.strip()

        # Table entries will always have a '.' for the MAC address.
        # If there isn't one it's not a row we care about.
        if '.' not in line:
            continue

        values = line.split()
        # A table row has five columns; anything shorter cannot be indexed.
        if len(values) < 5:
            continue

        # Ignore any 'INACTIVE' entries, meaning that there hasn't been
        # traffic from that MAC address and it has likely been unplugged.
        if values[4] == 'INACTIVE':
            continue

        lookup.append({
            'ip': values[0],
            'mac': EUI(values[1], dialect=mac_unix_expanded),
            'interface': self._shorthand_port_notation(values[3]),
        })
    return sorted(lookup, key=lambda k: k['interface'])