Python oslo_utils.uuidutils 模块,is_uuid_like() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_utils.uuidutils.is_uuid_like()

项目:cyborg    作者:openstack    | 项目源码 | 文件源码
def add_identity_filter(query, value):
    """Adds an identity filter to a query.

    Filters results by ID, if supplied value is a valid integer.
    Otherwise attempts to filter results by UUID.

    :param query: Initial query to add filter to.
    :param value: Value for filtering results by.
    :return: Modified query.
    """
    if strutils.is_int_like(value):
        return query.filter_by(id=value)
    elif uuidutils.is_uuid_like(value):
        return query.filter_by(uuid=value)
    else:
        raise exception.InvalidIdentity(identity=value)
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def add_identity_filter(query, value):
    """Adds an identity filter to a query.

    Filters results by ID, if supplied value is a valid integer.
    Otherwise attempts to filter results by UUID.

    :param query: Initial query to add filter to.
    :param value: Value for filtering results by.
    :return: Modified query.
    """
    if strutils.is_int_like(value):
        return query.filter_by(id=value)
    elif uuidutils.is_uuid_like(value):
        return query.filter_by(uuid=value)
    else:
        raise exception.InvalidIdentity(identity=value)
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def destroy_board(self, board_id):

        session = get_session()
        with session.begin():
            query = model_query(models.Board, session=session)
            query = add_identity_filter(query, board_id)
            try:
                board_ref = query.one()
            except NoResultFound:
                raise exception.BoardNotFound(board=board_id)

            # Get board ID, if an UUID was supplied. The ID is
            # required for deleting all ports, attached to the board.
            if uuidutils.is_uuid_like(board_id):
                board_id = board_ref['id']

            location_query = model_query(models.Location, session=session)
            location_query = self._add_location_filter_by_board(
                location_query, board_id)
            location_query.delete()

            query.delete()
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def destroy_plugin(self, plugin_id):

        session = get_session()
        with session.begin():
            query = model_query(models.Plugin, session=session)
            query = add_identity_filter(query, plugin_id)
            try:
                plugin_ref = query.one()
            except NoResultFound:
                raise exception.PluginNotFound(plugin=plugin_id)

            # Get plugin ID, if an UUID was supplied. The ID is
            # required for deleting all ports, attached to the plugin.
            if uuidutils.is_uuid_like(plugin_id):
                plugin_id = plugin_ref['id']

            query.delete()
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def get_rpc_plugin(plugin_ident):
    """Get the RPC plugin from the plugin uuid or logical name.

    :param plugin_ident: the UUID or logical name of a plugin.

    :returns: The RPC Plugin.
    :raises: InvalidUuidOrName if the name or uuid provided is not valid.
    :raises: PluginNotFound if the plugin is not found.
    """
    # Check to see if the plugin_ident is a valid UUID.  If it is, treat it
    # as a UUID.
    if uuidutils.is_uuid_like(plugin_ident):
        return objects.Plugin.get_by_uuid(pecan.request.context, plugin_ident)

    # We can refer to plugins by their name, if the client supports it
    # if allow_plugin_logical_names():
    #    if utils.is_hostname_safe(plugin_ident):
    else:
        return objects.Plugin.get_by_name(pecan.request.context, plugin_ident)

    raise exception.InvalidUuidOrName(name=plugin_ident)

    raise exception.PluginNotFound(plugin=plugin_ident)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def add_identity_filter(query, value):
    """Adds an identity filter to a query.

    Filters results by ID, if supplied value is a valid integer.
    Otherwise attempts to filter results by UUID.

    :param query: Initial query to add filter to.
    :param value: Value for filtering results by.
    :return: Modified query.
    """
    if strutils.is_int_like(value):
        return query.filter_by(id=value)
    elif uuidutils.is_uuid_like(value):
        return query.filter_by(uuid=value)
    else:
        raise exception.InvalidIdentity(identity=value)
项目:python-karborclient    作者:openstack    | 项目源码 | 文件源码
def do_verification_create(cs, args):
    """Creates a verification."""
    if not uuidutils.is_uuid_like(args.provider_id):
        raise exceptions.CommandError(
            "Invalid provider id provided.")

    if not uuidutils.is_uuid_like(args.checkpoint_id):
        raise exceptions.CommandError(
            "Invalid checkpoint id provided.")

    verification_parameters = arg_utils.extract_parameters(args)
    verification = cs.verifications.create(args.provider_id,
                                           args.checkpoint_id,
                                           verification_parameters)
    dict_format_list = {"parameters"}
    utils.print_dict(verification.to_dict(), dict_format_list=dict_format_list)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def add_identity_filter(query, value):
    """Adds an identity filter to a query.

    Filters results by ID, if supplied value is a valid integer.
    Otherwise attempts to filter results by UUID.

    :param query: Initial query to add filter to.
    :param value: Value for filtering results by.
    :return: Modified query.
    """
    if strutils.is_int_like(value):
        return query.filter_by(id=value)
    elif uuidutils.is_uuid_like(value):
        return query.filter_by(uuid=value)
    else:
        raise exception.InvalidParameterValue(identity=value)
项目:python-masakariclient    作者:openstack    | 项目源码 | 文件源码
def get_uuid_by_name(manager, name, segment=None):
    """Helper methods for getting uuid of segment or host by name.

    :param manager: A client manager class
    :param name: The resource we are trying to find a uuid
    :param segment: segment id, default None
    :return: The uuid of found resource
    """

    # If it cannot be found return the name.
    uuid = name
    if not uuidutils.is_uuid_like(name):
        if segment:
            items = manager.hosts(segment)
        else:
            items = manager.segments()

        for item in items:
            item_name = getattr(item, 'name')
            if item_name == name:
                uuid = getattr(item, 'uuid')
                break
    return uuid
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _get_floating_ip_pool_id_by_name_or_id(self, client, name_or_id):
        search_opts = {constants.NET_EXTERNAL: True, 'fields': 'id'}
        if uuidutils.is_uuid_like(name_or_id):
            search_opts.update({'id': name_or_id})
        else:
            search_opts.update({'name': name_or_id})
        data = client.list_networks(**search_opts)
        nets = data['networks']

        if len(nets) == 1:
            return nets[0]['id']
        elif len(nets) == 0:
            raise exception.FloatingIpPoolNotFound()
        else:
            msg = (_("Multiple floating IP pools matches found for name '%s'")
                   % name_or_id)
            raise exception.NovaException(message=msg)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def fixed_ip_get_by_instance(context, instance_uuid):
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    vif_and = and_(models.VirtualInterface.id ==
                   models.FixedIp.virtual_interface_id,
                   models.VirtualInterface.deleted == 0)
    result = model_query(context, models.FixedIp, read_deleted="no").\
                 filter_by(instance_uuid=instance_uuid).\
                 outerjoin(models.VirtualInterface, vif_and).\
                 options(contains_eager("virtual_interface")).\
                 options(joinedload('network')).\
                 options(joinedload('floating_ips')).\
                 order_by(asc(models.VirtualInterface.created_at),
                          asc(models.VirtualInterface.id)).\
                 all()

    if not result:
        raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)

    return result
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _get_requested_instance_group(context, filter_properties,
                                      check_quota):
        if (not filter_properties or
                not filter_properties.get('scheduler_hints')):
            return

        group_hint = filter_properties.get('scheduler_hints').get('group')
        if not group_hint:
            return

        # TODO(gibi): We need to remove the following validation code when
        # removing legacy v2 code.
        if not uuidutils.is_uuid_like(group_hint):
            msg = _('Server group scheduler hint must be a UUID.')
            raise exception.InvalidInput(reason=msg)

        return objects.InstanceGroup.get_by_uuid(context, group_hint)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def create_project(self, name, variables=None):
        url = self.url + '/v1/projects'
        payload = {'name': name}
        if variables:
            payload['variables'] = variables
        response = self.post(url, headers=self.root_headers, data=payload)
        self.assertEqual(201, response.status_code)
        self.assertIn('Location', response.headers)
        project = response.json()
        self.assertTrue(uuidutils.is_uuid_like(project['id']))
        self.assertEqual(
            response.headers['Location'],
            "{}/{}".format(url, project['id'])
        )
        return project
项目:craton    作者:openstack    | 项目源码 | 文件源码
def process_request(self, request):
        headers = request.headers
        project_id = headers.get('X-Auth-Project')
        if not uuidutils.is_uuid_like(project_id):
            raise exceptions.AuthenticationError(
                message="Project ID ('{}') is not a valid UUID".format(
                    project_id
                )
            )

        ctx = self.make_context(
            request,
            auth_token=headers.get('X-Auth-Token', None),
            user=headers.get('X-Auth-User', None),
            tenant=project_id,
            )

        # NOTE(sulo): this means every api call hits the db
        # at least once for auth. Better way to handle this?
        try:
            user_info = dbapi.get_user_info(ctx,
                                            headers.get('X-Auth-User', None))
            if user_info.api_key != headers.get('X-Auth-Token', None):
                raise exceptions.AuthenticationError
            if user_info.is_root:
                ctx.is_admin = True
                ctx.is_admin_project = True
            elif user_info.is_admin:
                ctx.is_admin = True
                ctx.is_admin_project = False
            else:
                ctx.is_admin = False
                ctx.is_admin_project = False
        except exceptions.NotFound:
            raise exceptions.AuthenticationError
项目:cyborg    作者:openstack    | 项目源码 | 文件源码
def validate(value):
        if not uuidutils.is_uuid_like(value):
            raise exception.InvalidUUID(uuid=value)
        return value
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def validate(value):
        if not (uuidutils.is_uuid_like(value)
                or v1_utils.is_valid_logical_name(value)):
            raise exception.InvalidUuidOrName(name=value)
        return value
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def validate(value):
        if not uuidutils.is_uuid_like(value):
            raise exception.InvalidUUID(uuid=value)
        return value
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def is_valid_board_name(name):
    """Determine if the provided name is a valid board name.

    Check to see that the provided board name is valid, and isn't a UUID.

    :param: name: the board name to check.
    :returns: True if the name is valid, False otherwise.
    """
    return utils.is_hostname_safe(name) and (not uuidutils.is_uuid_like(name))
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def is_valid_name(name):
    """Determine if the provided name is a valid name.

    Check to see that the provided board name isn't a UUID.

    :param: name: the board name to check.
    :returns: True if the name is valid, False otherwise.
    """
    return not uuidutils.is_uuid_like(name)
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def get(cls, context, plugin_id):
        """Find a plugin based on its id or uuid and return a Board object.

        :param plugin_id: the id *or* uuid of a plugin.
        :returns: a :class:`Board` object.
        """
        if strutils.is_int_like(plugin_id):
            return cls.get_by_id(context, plugin_id)
        elif uuidutils.is_uuid_like(plugin_id):
            return cls.get_by_uuid(context, plugin_id)
        else:
            raise exception.InvalidIdentity(identity=plugin_id)
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def get(cls, context, session_or_board_uuid):
        """Find a session based on its id or uuid and return a SessionWP object.

        :param session_id: the id *or* uuid of a session.
        :returns: a :class:`SessionWP` object.
        """
        if strutils.is_int_like(session_or_board_uuid):
            return cls.get_by_id(context, session_or_board_uuid)
        elif uuidutils.is_uuid_like(session_or_board_uuid):
            return cls.get_by_uuid(context, session_or_board_uuid)
        else:
            raise exception.InvalidIdentity(identity=session_or_board_uuid)
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def test_request_id(self):
        response = self.app.get('/v2.1/')
        self.assertIn('x-openstack-request-id', response.headers)
        self.assertTrue(
            response.headers['x-openstack-request-id'].startswith('req-'))
        id_part = response.headers['x-openstack-request-id'].split('req-')[1]
        self.assertTrue(uuidutils.is_uuid_like(id_part))
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def test_request_id(self):
        response = self.app.get('/')
        self.assertIn('x-openstack-request-id', response.headers)
        self.assertTrue(
            response.headers['x-openstack-request-id'].startswith('req-'))
        id_part = response.headers['x-openstack-request-id'].split('req-')[1]
        self.assertTrue(uuidutils.is_uuid_like(id_part))
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def test_request_id(self):
        response = self.app.get('/')
        self.assertIn('x-openstack-request-id', response.headers)
        self.assertTrue(
            response.headers['x-openstack-request-id'].startswith('req-'))
        id_part = response.headers['x-openstack-request-id'].split('req-')[1]
        self.assertTrue(uuidutils.is_uuid_like(id_part))
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def param2id(object_id):
    """Helper function to convert various id types to internal id.

    args: [object_id], e.g. 'vol-0000000a' or 'volume-0000000a' or '10'
    """
    if uuidutils.is_uuid_like(object_id):
        return object_id
    elif '-' in object_id:
        # FIXME(ja): mapping occurs in nova?
        pass
    else:
        return int(object_id)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_neutron_network(self, network):
        if uuidutils.is_uuid_like(network):
            networks = self.neutron.list_networks(id=network)['networks']
        else:
            networks = self.neutron.list_networks(name=network)['networks']

        if len(networks) == 0:
            raise exception.NetworkNotFound(network=network)
        elif len(networks) > 1:
            raise exception.Conflict(_(
                'Multiple neutron networks exist with same name. '
                'Please use the uuid instead.'))

        network = networks[0]
        return network
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_neutron_port(self, port):
        if uuidutils.is_uuid_like(port):
            ports = self.list_ports(id=port)['ports']
        else:
            ports = self.list_ports(name=port)['ports']

        if len(ports) == 0:
            raise exception.PortNotFound(port=port)
        elif len(ports) > 1:
            raise exception.Conflict(_(
                'Multiple neutron ports exist with same name. '
                'Please use the uuid instead.'))

        port = ports[0]
        return port
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_resource_provider(self, context, provider_ident):
        if uuidutils.is_uuid_like(provider_ident):
            return self._get_resource_provider_by_uuid(context, provider_ident)
        else:
            return self._get_resource_provider_by_name(context, provider_ident)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_resource_class(self, context, resource_ident):
        if uuidutils.is_uuid_like(resource_ident):
            return self._get_resource_class_by_uuid(context, resource_ident)
        else:
            return self._get_resource_class_by_name(context, resource_ident)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_resource_class(self, context, ident):
        if uuidutils.is_uuid_like(ident):
            return self._get_resource_class_by_uuid(context, ident)
        else:
            return self._get_resource_class_by_name(context, ident)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def search_volume(self, volume):
        if uuidutils.is_uuid_like(volume):
            volume = self.cinder.volumes.get(volume)
        else:
            try:
                volume = self.cinder.volumes.find(name=volume)
            except cinder_exception.NotFound:
                raise exception.VolumeNotFound(volume=volume)
            except cinder_exception.NoUniqueMatch:
                raise exception.Conflict(_(
                    'Multiple cinder volumes exist with same name. '
                    'Please use the uuid instead.'))

        return volume
项目:zun    作者:openstack    | 项目源码 | 文件源码
def validate(cls, value):
        if not uuidutils.is_uuid_like(value):
            raise exception.InvalidUUID(uuid=value)
        return value
项目:zun    作者:openstack    | 项目源码 | 文件源码
def _check_security_group(self, context, security_group, container):
        if security_group.get("uuid"):
            security_group_id = security_group.get("uuid")
            if not uuidutils.is_uuid_like(security_group_id):
                raise exception.InvalidUUID(uuid=security_group_id)
            if security_group_id in container.security_groups:
                msg = _("security_group %s already present in container") % \
                    security_group_id
                raise exception.InvalidValue(msg)
        else:
            security_group_ids = utils.get_security_group_ids(
                context, [security_group['name']])
            if len(security_group_ids) > len(security_group):
                msg = _("Multiple security group matches "
                        "found for name %(name)s, use an ID "
                        "to be more specific. ") % security_group
                raise exception.Conflict(msg)
            else:
                security_group_id = security_group_ids[0]
        container_ports_detail = utils.list_ports(context, container)

        for container_port_detail in container_ports_detail:
            if security_group_id in container_port_detail['security_groups']:
                msg = _("security_group %s already present in container") % \
                    list(security_group.values())[0]
                raise exception.InvalidValue(msg)
        return security_group_id
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_resource(resource, resource_ident):
    """Get the resource from the uuid or logical name.

    :param resource: the resource type.
    :param resource_ident: the UUID or logical name of the resource.

    :returns: The resource.
    """
    resource = getattr(objects, resource)

    if uuidutils.is_uuid_like(resource_ident):
        return resource.get_by_uuid(pecan.request.context, resource_ident)

    return resource.get_by_name(pecan.request.context, resource_ident)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def check_active(self, server):
        """Check server status.

        Accepts both server IDs and server objects.
        Returns True if server is ACTIVE,
        raises errors when server has an ERROR or unknown to Zun status,
        returns False otherwise.

        """
        # not checking with is_uuid_like as most tests use strings e.g. '1234'
        if isinstance(server, six.string_types):
            server = self.fetch_server(server)
            if server is None:
                return False
            else:
                status = self.get_status(server)
        else:
            status = self.get_status(server)
            if status != 'ACTIVE':
                self.refresh_server(server)
                status = self.get_status(server)

        if status in self.deferred_server_statuses:
            return False
        elif status == 'ACTIVE':
            return True
        elif status == 'ERROR':
            fault = getattr(server, 'fault', {})
            raise exception.ServerInError(
                resource_status=status,
                status_reason=_("Message: %(message)s, Code: %(code)s") % {
                    'message': fault.get('message', _('Unknown')),
                    'code': fault.get('code', _('Unknown'))
                })
        else:
            raise exception.ServerUnknownStatus(
                resource_status=server.status,
                status_reason=_('Unknown'),
                result=_('Server is not active'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_server_id(self, server, raise_on_error=True):
        if uuidutils.is_uuid_like(server):
            return server
        elif isinstance(server, six.string_types):
            servers = self.client().servers.list(search_opts={'name': server})
            if len(servers) == 1:
                return servers[0].id

            if raise_on_error:
                raise exception.ZunException(_(
                    "Unable to get server id with name %s") % server)
        else:
            raise exception.ZunException(_("Unexpected server type"))
项目:ironic-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def resource_setup(cls):
        super(BaremetalStandaloneScenarioTest, cls).resource_setup()
        base.set_baremetal_api_microversion(cls.api_microversion)
        for v in cls.mandatory_attr:
            if getattr(cls, v) is None:
                raise lib_exc.InvalidConfiguration(
                    "Mandatory attribute %s not set." % v)
        image_checksum = None
        if not uuidutils.is_uuid_like(cls.image_ref):
            image_checksum = cls.image_checksum
        cls.node = cls.boot_node(cls.driver, cls.image_ref,
                                 image_checksum=image_checksum)
        cls.node_ip = cls.add_floatingip_to_node(cls.node['uuid'])
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_is_uuid_like(self):
        self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4())))
        self.assertTrue(uuidutils.is_uuid_like(
            '{12345678-1234-5678-1234-567812345678}'))
        self.assertTrue(uuidutils.is_uuid_like(
            '12345678123456781234567812345678'))
        self.assertTrue(uuidutils.is_uuid_like(
            'urn:uuid:12345678-1234-5678-1234-567812345678'))
        self.assertTrue(uuidutils.is_uuid_like(
            'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
        self.assertTrue(uuidutils.is_uuid_like(
            'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
        self.assertTrue(uuidutils.is_uuid_like(
            '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}'))
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_is_uuid_like_insensitive(self):
        self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()).upper()))
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_id_is_uuid_like(self):
        self.assertFalse(uuidutils.is_uuid_like(1234567))
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def test_name_is_uuid_like(self):
        self.assertFalse(uuidutils.is_uuid_like('zhongyueluo'))
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def _is_uuid_uri(self, uri):
        return uuidutils.is_uuid_like(
            urlparse.urlparse(uri).path.split('/')[-1])
项目:qinling    作者:openstack    | 项目源码 | 文件源码
def validate(value):
        if not uuidutils.is_uuid_like(value):
            raise exc.InputException(
                "Expected a uuid but received %s." % value
            )

        return value
项目:networking-hpe    作者:openstack    | 项目源码 | 文件源码
def _get_access_param(self, context, protocol, creds):
        if const.PROTOCOL_SNMP in protocol:
            if not uuidutils.is_uuid_like(creds):
                access_parameters = db.get_snmp_cred_by_name_and_protocol(
                    context, creds, protocol)
            else:
                access_parameters = db.get_snmp_cred_by_id(context, creds)
        else:
            if not uuidutils.is_uuid_like(creds):
                access_parameters = db.get_netconf_cred_by_name_and_protocol(
                    context, creds, protocol)
            else:
                access_parameters = db.get_netconf_cred_by_id(context, creds)
        if not access_parameters:
            raise webob.exc.HTTPBadRequest(
                _("Credentials not found for Id or name: %s") % creds)
        if isinstance(access_parameters, list) and len(access_parameters) > 1:
            raise webob.exc.HTTPBadRequest(
                _("Multiple credentials matches found "
                  "for name: %s, use an ID to be more specific.") % creds)
        if isinstance(access_parameters, list):
            access_parameters = access_parameters[0]
        if access_parameters['protocol_type'] != protocol:
            raise webob.exc.HTTPBadRequest(
                _("Credentials not found for Id or name: %s") % creds)
        return access_parameters
项目:networking-hpe    作者:openstack    | 项目源码 | 文件源码
def _get_credentials_dict(self, bnp_switch, func_name):
        if not bnp_switch:
            self._raise_ml2_error(wexc.HTTPNotFound, func_name)
        db_context = neutron_context.get_admin_context()
        creds_dict = {}
        creds_dict['ip_address'] = bnp_switch.ip_address
        prov_creds = bnp_switch.credentials
        prov_protocol = bnp_switch.management_protocol
        if hp_const.PROTOCOL_SNMP in prov_protocol:
            if not uuidutils.is_uuid_like(prov_creds):
                snmp_cred = db.get_snmp_cred_by_name(db_context, prov_creds)
                snmp_cred = snmp_cred[0]
            else:
                snmp_cred = db.get_snmp_cred_by_id(db_context, prov_creds)
            if not snmp_cred:
                LOG.error(_LE("Credentials does not match"))
                self._raise_ml2_error(wexc.HTTPNotFound, '')
            creds_dict['write_community'] = snmp_cred.write_community
            creds_dict['security_name'] = snmp_cred.security_name
            creds_dict['security_level'] = snmp_cred.security_level
            creds_dict['auth_protocol'] = snmp_cred.auth_protocol
            creds_dict['management_protocol'] = prov_protocol
            creds_dict['auth_key'] = snmp_cred.auth_key
            creds_dict['priv_protocol'] = snmp_cred.priv_protocol
            creds_dict['priv_key'] = snmp_cred.priv_key
        else:
            if not uuidutils.is_uuid_like(prov_creds):
                netconf_cred = db.get_netconf_cred_by_name(db_context,
                                                           prov_creds)
            else:
                netconf_cred = db.get_netconf_cred_by_id(db_context,
                                                         prov_creds)
            if not netconf_cred:
                LOG.error(_LE("Credentials does not match"))
                self._raise_ml2_error(wexc.HTTPNotFound, '')
            creds_dict['user_name'] = netconf_cred.write_community
            creds_dict['password'] = netconf_cred.security_name
            creds_dict['key_path'] = netconf_cred.security_level
        return creds_dict
项目:networking-hpe    作者:openstack    | 项目源码 | 文件源码
def validate_access_parameters(body):
    """Validate if the request body is in proper format."""
    protocol_dict = deepcopy(body)
    if const.NAME not in protocol_dict.keys():
        raise webob.exc.HTTPBadRequest(
            _("Name not found in request body"))
    if uuidutils.is_uuid_like(protocol_dict['name']):
        raise webob.exc.HTTPBadRequest(
            _("Name=%s should not be in uuid format") %
            protocol_dict['name'])
    protocol_dict.pop('name')
    keys = list(protocol_dict.keys())
    if not len(keys):
        raise webob.exc.HTTPBadRequest(
            _("Request body should have at least one protocol specified"))
    elif len(keys) > 1:
        raise webob.exc.HTTPBadRequest(
            _("multiple protocols in a single request is not supported"))
    key = keys[0]
    if key.lower() not in const.SUPPORTED_PROTOCOLS:
        raise webob.exc.HTTPBadRequest(
            _("'protocol %s' is not supported") % keys)
    if key.lower() == const.SNMP_V3:
        return validate_snmpv3_parameters(protocol_dict, key)
    elif key.lower() in [const.NETCONF_SSH, const.NETCONF_SOAP]:
        return validate_netconf_parameters(protocol_dict, key)
    else:
        return validate_snmp_parameters(protocol_dict, key)
项目:networking-hpe    作者:openstack    | 项目源码 | 文件源码
def validate_access_parameters_for_update(body):
    """Validate if the request body is in proper format."""

    protocol_dict = deepcopy(body)
    if (const.NAME not in protocol_dict.keys() and
       not len(protocol_dict.keys())):
        raise webob.exc.HTTPBadRequest(
            _("Request must have name or one protocol type"))
    if const.NAME in protocol_dict.keys():
        if uuidutils.is_uuid_like(protocol_dict['name']):
            raise webob.exc.HTTPBadRequest(
                _("Name=%s should not be in uuid format") %
                protocol_dict['name'])
        protocol_dict.pop('name')
    if protocol_dict:
        keys = list(protocol_dict.keys())
        if len(keys) > 1:
            raise webob.exc.HTTPBadRequest(
                _("Multiple protocols in a single request is not supported"))
        key = keys[0]
        if key.lower() not in const.SUPPORTED_PROTOCOLS:
            raise webob.exc.HTTPBadRequest(
                _("Protocol %s is not supported") % keys)
        if key.lower() == const.SNMP_V3:
            return validate_snmpv3_parameters_for_update(protocol_dict, key)
        elif key.lower() in [const.NETCONF_SSH, const.NETCONF_SOAP]:
            return validate_netconf_parameters_for_update(protocol_dict, key)
        else:
            return validate_snmp_parameters_for_update(protocol_dict, key)
    else:
        return None
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def _validate_uuid_format(uid):
    return uuidutils.is_uuid_like(uid)
项目:python-karborclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        client = self.app.client_manager.data_protection
        if not uuidutils.is_uuid_like(parsed_args.trigger_id):
            raise exceptions.CommandError(
                "Invalid trigger id provided.")
        so = client.scheduled_operations.create(
            parsed_args.name, parsed_args.operation_type,
            parsed_args.trigger_id, parsed_args.operation_definition)

        format_scheduledoperation(so._info)
        return zip(*sorted(six.iteritems(so._info)))
项目:python-karborclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        client = self.app.client_manager.data_protection
        if not uuidutils.is_uuid_like(parsed_args.provider_id):
            raise exceptions.CommandError(
                "Invalid provider id provided.")
        if not uuidutils.is_uuid_like(parsed_args.checkpoint_id):
            raise exceptions.CommandError(
                "Invalid checkpoint id provided.")

        restore_parameters = utils.extract_parameters(parsed_args)
        restore_auth = None
        if parsed_args.restore_target is not None:
            if parsed_args.restore_username is None:
                raise exceptions.CommandError(
                    "Must specify username for restore_target.")
            if parsed_args.restore_password is None:
                raise exceptions.CommandError(
                    "Must specify password for restore_target.")
            restore_auth = {
                'type': 'password',
                'username': parsed_args.restore_username,
                'password': parsed_args.restore_password,
            }
        restore = client.restores.create(parsed_args.provider_id,
                                         parsed_args.checkpoint_id,
                                         parsed_args.restore_target,
                                         restore_parameters, restore_auth)
        format_restore(restore._info)
        return zip(*sorted(restore._info.items()))