Python voluptuous 模块,Invalid() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用voluptuous.Invalid()

项目:bossimage    作者:cloudboss    | 项目源码 | 文件源码
def load_config(path='.boss.yml'):
    loader = j.FileSystemLoader('.')
    try:
        template = loader.load(j.Environment(), path, os.environ)
        yml = template.render()
        doc = yaml.load(yml)
        return transform_config(doc)
    except j.TemplateNotFound:
        error = 'Error loading {}: not found'.format(path)
        raise ConfigurationError(error)
    except j.TemplateSyntaxError as e:
        error = 'Error loading {}: {}, line {}'.format(path, e, e.lineno)
        raise ConfigurationError(error)
    except IOError as e:
        error = 'Error loading {}: {}'.format(path, e.strerror)
        raise ConfigurationError(error)
    except v.Invalid as e:
        error = 'Error validating {}: {}'.format(path, e)
        raise ConfigurationError(error)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_isfile():
    """Validate that the value is an existing file."""
    schema = vol.Schema(cv.isfile)

    fake_file = 'this-file-does-not.exist'
    assert not os.path.isfile(fake_file)

    for value in ('invalid', None, -1, 0, 80000, fake_file):
        with pytest.raises(vol.Invalid):
            schema(value)

    # patching methods that allow us to fake a file existing
    # with write access
    with patch('os.path.isfile', Mock(return_value=True)), \
            patch('os.access', Mock(return_value=True)):
        schema('test.txt')
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def match_all(value):
    """Validator that matches all values."""
    return value


# def platform_validator(domain):
#     """Validate if platform exists for given domain."""
#     def validator(value):
#         """Test if platform exists."""
#         if value is None:
#             raise vol.Invalid('platform cannot be None')
#         if get_platform(domain, str(value)):
#             return value
#         raise vol.Invalid(
#             'platform {} does not exist for {}'.format(value, domain))
#     return validator
项目:cricri    作者:Maillol    | 项目源码 | 文件源码
def __new__(mcs, cls_name, bases, attrs, previous=None, start=False):
        if bases and TestServer in bases:

            for attr_name in mcs._class_clients:
                attrs.setdefault(attr_name, [])

            validate_attributes = mcs._build_attributes_validator()
            try:
                attrs = validate_attributes(attrs)

            except Invalid as error:
                msg = "{} @ {}.{}".format(error.error_message,
                                          cls_name, error.path[0])
                msg += ''.join('[{!r}]'.format(element)
                               for element in error.path[1:])

                raise AttributeError(msg)

        return MetaTestState.__new__(mcs, cls_name, bases, attrs, previous,
                                     start)
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def valid_adapter_response(base_name, func_name, data):
    """
        Valid that given data match described schema

        :param str base_name: The name of the asbtract
        :param str func_name: The name of the called function
        :parma raw data: The data to valid
        :raises InvalidFormatError: if data is not compliant
    """
    if not data:
        return
    try:
        Schemas[base_name][func_name](data)
    except (KeyError, TypeError, ValueError):
        raise SchemaNotFound('Schema not found for %s.%s' % (base_name, func_name))
    except (Invalid, MultipleInvalid) as ex:
        raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % (base_name, func_name, str(ex)))
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def validate_body(schema_desc):
    """
        Validate json body
    """
    def real_decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                body = request.get_json()
                if not Schemas.get(func.__name__):
                    Schemas[func.__name__] = Schema(schema_desc, required=True)
                    Logger.debug(unicode('registering schema for %s' % (func.__name__)))
                Schemas[func.__name__](body)
            except (Invalid, MultipleInvalid) as ex:
                Logger.error(unicode(ex))
                msg = 'Missing or invalid field(s) in body, expecting {}'.format(schema_desc)
                raise BadRequest(msg)
            return func(*args, **kwargs)
        return wrapper
    return real_decorator
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def post(self):

        try:
            body = json.loads(self.request.body)
            web_service_model.banana_model(body)
            type_table = self._monanas.compute_type_table(body["content"])
            self.write(type_table)
        except (AttributeError, voluptuous.Invalid, ValueError) as e:
            logger.warn("Wrong request: {}.".
                        format(e))
            self.set_status(400, "The request body was malformed.")
        except Exception as e:
            tb = traceback.format_exc()
            print(tb)
            logger.error("Unexpected error: {}. {}".
                         format(sys.exc_info()[0], e))
            self.set_status(500, "Internal server error.")

        self.flush()
        self.finish()
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def validate_links(links):
    """Validate links to make sure, nothing is missing

    :type links: dict
    :param links: connection links to validate
    :raises: SchemaError -- if any link is missing
    """
    missing = set([])
    all_keys = set(links.keys())
    for connections in links.values():
        for component in connections:
            if component not in all_keys:
                missing.add(component.id())
    if len(missing) > 0:
        raise voluptuous.Invalid([
            "In connections section, the following components are not "
            "connected\n\t{}\n"
            "please modify the configuration so that their list of "
            "connections is at least '[]'".format(", ".join(missing))], [])
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def _validate_existing_id(config, component_id):
    """Check that the id passed as parameter is defined in the configuration

    :type config: dict
    :param config: configuration model for the whole system
    :type component_id: str
    :param component_id: component ID to be found in configuration
    """
    found_id = False
    for comp_type in valid_connection_types.keys():
        if component_id in config[comp_type].keys():
            found_id = True
    if not found_id:
        raise voluptuous.Invalid([
            'In "connections", component `{}` hasn\'t been defined'
            .format(component_id)
        ], [])
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def get_pagination_options(params, default):
    try:
        opts = voluptuous.Schema({
            voluptuous.Required(
                "limit", default=pecan.request.conf.api.max_limit):
            voluptuous.All(voluptuous.Coerce(int),
                           voluptuous.Range(min=1),
                           voluptuous.Clamp(
                               min=1, max=pecan.request.conf.api.max_limit)),
            "marker": six.text_type,
            voluptuous.Required("sort", default=default):
            voluptuous.All(
                voluptuous.Coerce(arg_to_list),
                [six.text_type]),
        }, extra=voluptuous.REMOVE_EXTRA)(params)
    except voluptuous.Invalid as e:
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})
    opts['sorts'] = opts['sort']
    del opts['sort']
    return opts
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def post(self):
        schema = pecan.request.indexer.get_resource_type_schema()
        body = deserialize_and_validate(schema)
        body["state"] = "creating"

        try:
            rt = schema.resource_type_from_dict(**body)
        except resource_type.InvalidResourceAttribute as e:
            abort(400, "Invalid input: %s" % e)

        enforce("create resource type", body)
        try:
            rt = pecan.request.indexer.create_resource_type(rt)
        except indexer.ResourceTypeAlreadyExists as e:
            abort(409, six.text_type(e))
        set_resp_location_hdr("/resource_type/" + rt.name)
        pecan.response.status = 201
        return rt
项目:ratatoskr    作者:ngergo    | 项目源码 | 文件源码
def test_register_operation_wraps_protectron_unmatching_schema():

    @register_operation
    @protectron(Schema({'a': int}))
    def mirror2(a):
        return a

    event = {
        'operation': 'mirror2',
        'args': {
            'a': '42'
        }
    }

    with pytest.raises(Invalid):
        assert dispatch_event(event) == 42
项目:drf-url-filters    作者:manjitkumar    | 项目源码 | 文件源码
def Alphanumeric(msg=None):
    '''
    Checks whether a value is:
        - int, or
        - long, or
        - float without a fractional part, or
        - str or unicode composed only of alphanumeric characters
    '''
    def fn(value):
        if not any([
            isinstance(value, numbers.Integral),
            (isinstance(value, float) and value.is_integer()),
            (isinstance(value, basestring) and value.isalnum())
        ]):
            raise Invalid(msg or (
                'Invalid input <{0}>; expected an integer'.format(value))
            )
        else:
            return value
    return fn
项目:drf-url-filters    作者:manjitkumar    | 项目源码 | 文件源码
def StrictlyAlphanumeric(msg=None):
    '''
    Checks whether a value is:
        - str or unicode, and
        - composed of both alphabets and digits
    '''
    def fn(value):
        if not (
            isinstance(value, basestring) and
            value.isalnum() and not
            value.isdigit() and not
            value.isalpha()
        ):
            raise Invalid(msg or (
                'Invalid input <{0}>; expected an integer'.format(value))
            )
        else:
            return value
    return fn
项目:drf-url-filters    作者:manjitkumar    | 项目源码 | 文件源码
def __call__(self, value):
        '''
        Checks whether a value is list of given validation_function.
        Returns list of validated values or just one valid value in
        list if there is only one element in given CSV string.
        '''
        try:
            if isinstance(value, basestring):
                if self.separator in value:
                    seperated_string_values =[item.strip() for item
                        in value.split(self.separator)]
                    values = [self.validation_function(item) for item
                        in seperated_string_values]
                else:
                    values = [self.validation_function(value)]
                return values
            else:
                raise ValueError
        except (Invalid, ValueError) as e:
            raise Invalid(self.msg or
                ('<{0}> is not valid set of <{1}>, {2}'.format(
                    value,
                    self.value_type,
                        e)))
项目:almanach    作者:openstack    | 项目源码 | 文件源码
def test_update_active_instance_entity_with_bad_payload(self):
        self.entity_ctl.update_active_instance_entity.side_effect = ValueError(
            'Expecting object: line 1 column 15 (char 14)'
        )
        instance_id = 'INSTANCE_ID'
        data = {
            'flavor': 'A_FLAVOR',
        }

        code, result = self.api_put('/v1/entity/instance/INSTANCE_ID',
                                    data=data,
                                    headers={'X-Auth-Token': 'some token value'})

        self.entity_ctl.update_active_instance_entity.assert_called_once_with(instance_id=instance_id, **data)
        self.assertIn("error", result)
        self.assertEqual(result["error"], 'Invalid parameter or payload')
        self.assertEqual(code, 400)
项目:almanach    作者:openstack    | 项目源码 | 文件源码
def test_update_active_instance_entity_with_wrong_attribute_raise_exception(self):
        errors = [
            Invalid(message="error message1", path=["my_attribute1"]),
            Invalid(message="error message2", path=["my_attribute2"]),
        ]
        self.entity_ctl.update_active_instance_entity.side_effect = exception.InvalidAttributeException(errors)

        formatted_errors = {
            "my_attribute1": "error message1",
            "my_attribute2": "error message2",
        }

        instance_id = 'INSTANCE_ID'
        data = {
            'flavor': 'A_FLAVOR',
        }

        code, result = self.api_put('/v1/entity/instance/INSTANCE_ID',
                                    data=data,
                                    headers={'X-Auth-Token': 'some token value'})

        self.entity_ctl.update_active_instance_entity.assert_called_once_with(instance_id=instance_id, **data)
        self.assertIn("error", result)
        self.assertEqual(result['error'], formatted_errors)
        self.assertEqual(code, 400)
项目:agdc_statistics    作者:GeoscienceAustralia    | 项目源码 | 文件源码
def valid_format_string(valid_fields):
    """
    Ensure that the provided string can be parsed as a python format string, and contains only `valid_fields`
    :param valid_fields: set or sequence of valid field names
    """
    f = Formatter()
    valid_fields = set(valid_fields)

    def validate_string(format_string):
        fields = set(field_name for _, field_name, _, _ in f.parse(format_string) if field_name)

        if fields < valid_fields:
            return format_string
        else:
            raise Invalid('format string specifies invalid field(s): %s' % (fields - valid_fields))

    return validate_string
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_isfile():
    """Validate that the value is an existing file."""
    schema = vol.Schema(cv.isfile)

    fake_file = 'this-file-does-not.exist'
    assert not os.path.isfile(fake_file)

    for value in ('invalid', None, -1, 0, 80000, fake_file):
        with pytest.raises(vol.Invalid):
            schema(value)

    # patching methods that allow us to fake a file existing
    # with write access
    with patch('os.path.isfile', Mock(return_value=True)), \
            patch('os.access', Mock(return_value=True)):
        schema('test.txt')
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_socket_timeout():
    """Test socket timeout validator."""
    TEST_CONF_TIMEOUT = 'timeout'

    schema = vol.Schema(
        {vol.Required(TEST_CONF_TIMEOUT, default=None): cv.socket_timeout})

    with pytest.raises(vol.Invalid):
        schema({TEST_CONF_TIMEOUT: 0.0})

    with pytest.raises(vol.Invalid):
        schema({TEST_CONF_TIMEOUT: -1})

    assert _GLOBAL_DEFAULT_TIMEOUT == schema({TEST_CONF_TIMEOUT:
                                              None})[TEST_CONF_TIMEOUT]

    assert 1.0 == schema({TEST_CONF_TIMEOUT: 1})[TEST_CONF_TIMEOUT]
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_config_error(self):
        """Test for configuration errors."""
        with self.assertRaises(vol.Invalid):
            unifi.PLATFORM_SCHEMA({
                # no username
                CONF_PLATFORM: unifi.DOMAIN,
                CONF_HOST: 'myhost',
                'port': 123,
            })
        with self.assertRaises(vol.Invalid):
            unifi.PLATFORM_SCHEMA({
                CONF_PLATFORM: unifi.DOMAIN,
                CONF_USERNAME: 'foo',
                CONF_PASSWORD: 'password',
                CONF_HOST: 'myhost',
                'port': 'foo',  # bad port!
            })
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def socket_timeout(value):
    """Validate timeout float > 0.0.

    None coerced to socket._GLOBAL_DEFAULT_TIMEOUT bare object.
    """
    if value is None:
        return _GLOBAL_DEFAULT_TIMEOUT
    else:
        try:
            float_value = float(value)
            if float_value > 0.0:
                return float_value
            raise vol.Invalid('Invalid socket timeout value.'
                              ' float > 0.0 required.')
        except Exception as _:
            raise vol.Invalid('Invalid socket timeout: {err}'.format(err=_))


# pylint: disable=no-value-for-parameter
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def post(self, request):
        """Accept the POST request for push registrations from a browser."""
        try:
            data = yield from request.json()
        except ValueError:
            return self.json_message('Invalid JSON', HTTP_BAD_REQUEST)

        try:
            data = REGISTER_SCHEMA(data)
        except vol.Invalid as ex:
            return self.json_message(humanize_error(data, ex),
                                     HTTP_BAD_REQUEST)

        name = ensure_unique_string('unnamed device',
                                    self.registrations.keys())

        self.registrations[name] = data

        if not _save_config(self.json_path, self.registrations):
            return self.json_message('Error saving registration.',
                                     HTTP_INTERNAL_SERVER_ERROR)

        return self.json_message('Push notification subscriber registered.')
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def post(self, request):
        """Handle the POST request for device identification."""
        try:
            req_data = yield from request.json()
        except ValueError:
            return self.json_message('Invalid JSON', HTTP_BAD_REQUEST)

        try:
            data = IDENTIFY_SCHEMA(req_data)
        except vol.Invalid as ex:
            return self.json_message(humanize_error(request.json, ex),
                                     HTTP_BAD_REQUEST)

        name = data.get(ATTR_DEVICE_ID)

        CONFIG_FILE[ATTR_DEVICES][name] = data

        if not _save_config(CONFIG_FILE_PATH, CONFIG_FILE):
            return self.json_message("Error saving device.",
                                     HTTP_INTERNAL_SERVER_ERROR)

        return self.json({"status": "registered"})
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def check(*callback_tuples):
    """
    Voluptuous wrapper function to raise our APIException

    Args:
        callback_tuples: a callback_tuple should contain (status, msg, callbacks)
    Returns:
        Returns a function callback for the Schema
    """

    def v(value):
        """
        Trys to validate the value with the given callbacks.

        Args:
            value: the item to validate
        Raises:
            APIException with the given error code and msg.
        Returns:
            The value if the validation callbacks are satisfied.
        """

        for msg, callbacks in callback_tuples:
            for callback in callbacks:
                try:
                    result = callback(value)
                    if not result and type(result) == bool:
                        raise Invalid()
                except Exception:
                    raise WebException(msg)
        return value
    return v
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def check(*callback_tuples):
    """
    Voluptuous wrapper function to raise our APIException

    Args:
        callback_tuples: a callback_tuple should contain (status, msg, callbacks)
    Returns:
        Returns a function callback for the Schema
    """

    def v(value):
        """
        Trys to validate the value with the given callbacks.

        Args:
            value: the item to validate
        Raises:
            APIException with the given error code and msg.
        Returns:
            The value if the validation callbacks are satisfied.
        """

        for msg, callbacks in callback_tuples:
            for callback in callbacks:
                try:
                    result = callback(value)
                    if not result and type(result) == bool:
                        raise Invalid()
                except Exception:
                    raise WebException(msg)
        return value
    return v
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def CNPJ(value):
    if not validate_cnpj(value):
        raise Invalid("Invalid CNPJ")
    return clean_id(value)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def CPF(value):
    if not validate_cpf(value):
        raise Invalid("Invalid CPF")
    return clean_id(value)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def CEP(value):
    try:
        format_cep(value)
    except ValueError as e:
        raise Invalid(e)

    return clean_id(value)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def Date(value):
    fmt = '%d/%m/%Y'
    try:
        if isinstance(value, int):
            value = datetime.date.today() + datetime.timedelta(value)

        return value.strftime(fmt)
    except Exception:
        raise Invalid("Should be an instance of datetime.datetime")
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def __init__(self, **kargs):
        super(EntityBase, self).__init__(kargs)
        self.validate()

        try:
            raise getattr(self, 'error')
        except AttributeError:
            pass
        except Invalid as e:
            raise MultipleInvalid([e])
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_ceps(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, CEP, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_prices(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, Price, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_cnpjs(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, CNPJ, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_emails(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, Email, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_dates(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, Date, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def CNPJ(value):
    if not validate_cnpj(value):
        raise Invalid("Invalid CNPJ")
    return clean_id(value)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def CPF(value):
    if not validate_cpf(value):
        raise Invalid("Invalid CPF")
    return clean_id(value)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def CEP(value):
    try:
        format_cep(value)
    except ValueError as e:
        raise Invalid(e)

    return clean_id(value)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def Price(value):
    if not (type(value) == int or type(value) == float):
        raise Invalid("This price is not and integer or a float.")
    value = float(value)
    return "%.2f" % value
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def __init__(self, **kargs):
        super(EntityBase, self).__init__(kargs)
        self.validate()

        try:
            raise getattr(self, 'error')
        except AttributeError:
            pass
        except Invalid as e:
            raise MultipleInvalid([e])
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_ceps(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, CEP, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_prices(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, Price, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_cnpjs(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, CNPJ, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_cnpjs(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, CPF, i)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_dates(self):
        for i in self.invalid_cases:
            self.assertRaises(Invalid, Date, i)
项目:bossimage    作者:cloudboss    | 项目源码 | 文件源码
def invalid(kind, item):
    return v.Invalid('Invalid {}: {}'.format(kind, item))
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def check(*callback_tuples):
    """
    Voluptuous wrapper function to raise our APIException

    Args:
        callback_tuples: a callback_tuple should contain (status, msg, callbacks)
    Returns:
        Returns a function callback for the Schema
    """

    def v(value):
        """
        Trys to validate the value with the given callbacks.

        Args:
            value: the item to validate
        Raises:
            APIException with the given error code and msg.
        Returns:
            The value if the validation callbacks are satisfied.
        """

        for msg, callbacks in callback_tuples:
            for callback in callbacks:
                try:
                    result = callback(value)
                    if not result and type(result) == bool:
                        raise Invalid()
                except Exception:
                    raise WebException(msg)
        return value
    return v