Python voluptuous 模块,All() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用voluptuous.All()

项目:eInvoice    作者:dpalominop    | 项目源码 | 文件源码
def validate(self):
        supplier = Schema({
            Required('ruc'): All(str, Length(min=11, max=11)),
            Required('registration_name'): str,
            Optional('address'): dict,
            Optional('commercial_name'): str
        }, extra=ALLOW_EXTRA)
        customer = Schema({
            Required('ruc'): All(str, Length(min=1, max=11))
        }, extra=ALLOW_EXTRA)
        schema = Schema({
            Required('issue_date'): str,
            Required('supplier'): supplier,
            Required('customer'): customer,
            Required('voucher_type'): str,
            Required('currency'): str,
            Required('voucher_number'): str,
            Required('lines'): All(list, Length(min=1))
        }, extra=ALLOW_EXTRA)
        schema(self._data)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def get_pagination_options(params, default):
    try:
        opts = voluptuous.Schema({
            voluptuous.Required(
                "limit", default=pecan.request.conf.api.max_limit):
            voluptuous.All(voluptuous.Coerce(int),
                           voluptuous.Range(min=1),
                           voluptuous.Clamp(
                               min=1, max=pecan.request.conf.api.max_limit)),
            "marker": six.text_type,
            voluptuous.Required("sort", default=default):
            voluptuous.All(
                voluptuous.Coerce(arg_to_list),
                [six.text_type]),
        }, extra=voluptuous.REMOVE_EXTRA)(params)
    except voluptuous.Invalid as e:
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})
    opts['sorts'] = opts['sort']
    del opts['sort']
    return opts
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def get(self):  
        parser        = reqparse.RequestParser()
        return_data   = []

        parser.add_argument('subvoat_name')

        args = parser.parse_args()

        schema = Schema({ Required('subvoat_name'): All(str, Length(min=self.config['min_length_subvoat_name']))})

        try:
            schema({'subvoat_name':args.get('subvoat_name')})

        except MultipleInvalid as e:
            return {'error':'%s %s' % (e.msg, e.path)}

        posts = self.subvoat_utils.get_posts(args['subvoat_name'])


        for p in posts:
            return_data.append(p)


        return {'result':return_data}
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def page(self, start, end):
        s_schema = Schema({ Required('start'): All(int, Range(min=0))})
        e_schema = Schema({ Required('end'):   All(int, Range(min=0))})

        s_status, s_result = self.try_schema('start', start, s_schema)

        if not s_status:
            return [s_status, s_result]


        e_status, e_result = self.try_schema('end', end, e_schema)

        if not e_status:
            return [e_status, e_result]

        if end < start:
            return [False, 'ending page cannot be lower than starting page']


        total_pages = end - start

        if total_pages > 50:
            return [False, 'you cannot request more than 50 pages']

        return [True, None]
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def __init__(self, server):
        self.server = server
        self.guild_man = server.guild_man

        self.channel_edit_base = Schema({
            'name': All(str, Length(min=2, max=100)),
            'position': int,
            Optional('nsfw'): bool,
        }, required=True)

        self.textchan_editschema = self.channel_edit_base.extend({
            'topic': All(str, Length(min=0, max=1024))
        })

        self.voicechan_editschema = self.channel_edit_base.extend({
            'bitrate': All(int, Range(min=8000, max=96000)),
            'user_limit': All(int, Range(min=0, max=99)),
        })

        self.register()
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def enum(enumClass):
    """Create validator for specified enum."""
    return vol.All(vol.In(enumClass.__members__), enumClass.__getitem__)
项目:home-assistant-dlna-dmr    作者:StevenLooman    | 项目源码 | 文件源码
def state_variables(self):
        """Get All UpnpStateVariables for this UpnpService."""
        return self._state_variables
项目:home-assistant-dlna-dmr    作者:StevenLooman    | 项目源码 | 文件源码
def actions(self):
        """Get All UpnpActions for this UpnpService."""
        return self._actions
项目:home-assistant-dlna-dmr    作者:StevenLooman    | 项目源码 | 文件源码
def _state_variable_create_schema(self, type_info):
        # construct validators
        validators = []

        data_type = type_info['data_type_python']
        validators.append(data_type)

        if 'allowed_values' in type_info:
            allowed_values = type_info['allowed_values']
            in_ = vol.In(allowed_values)  # coerce allowed values? assume always string for now
            validators.append(in_)

        if 'allowed_value_range' in type_info:
            min_ = type_info['allowed_value_range'].get('min', None)
            max_ = type_info['allowed_value_range'].get('max', None)
            min_ = data_type(min_)
            max_ = data_type(max_)
            range_ = vol.Range(min=min_, max=max_)
            validators.append(range_)

        # construct key
        key = vol.Required('value')

        if 'default_value' in type_info:
            default_value = type_info['default_value']
            if data_type == bool:
                default_value = default_value == '1'
            else:
                default_value = data_type(default_value)
            key.default = default_value

        return vol.Schema({key: vol.All(*validators)})
项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def _add_validation(context):
    validation_utils = dbt.utils.AttrDict({
        'any': voluptuous.Any,
        'all': voluptuous.All,
    })

    return dbt.utils.merge(
        context,
        {'validation': validation_utils})
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def schema_ext(self):
        return voluptuous.All(six.text_type,
                              voluptuous.Length(
                                  min=self.min_length,
                                  max=self.max_length))
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def schema_ext(self):
        return voluptuous.All(numbers.Real,
                              voluptuous.Range(min=self.min,
                                               max=self.max))
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def MetricSchema(v):
    """metric keyword schema

    It could be:

    ["metric", "metric-ref", "aggregation"]

    or

    ["metric, ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    elif not v:
        raise voluptuous.Invalid("Operation must not be empty")
    elif len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    elif v[0] != u"metric":
        # NOTE(sileht): this error message doesn't looks related to "metric",
        # but because that the last schema validated by voluptuous, we have
        # good chance (voluptuous.Any is not predictable) to print this
        # message even if it's an other operation that invalid.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    return [u"metric"] + voluptuous.Schema(voluptuous.Any(
        voluptuous.ExactSequence([six.text_type, six.text_type]),
        voluptuous.All(
            voluptuous.Length(min=1),
            [voluptuous.ExactSequence([six.text_type, six.text_type])],
        )), required=True)(v[1:])
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def patch(self):
        ap = pecan.request.indexer.get_archive_policy(self.archive_policy)
        if not ap:
            abort(404, six.text_type(
                indexer.NoSuchArchivePolicy(self.archive_policy)))
        enforce("update archive policy", ap)

        body = deserialize_and_validate(voluptuous.Schema({
            voluptuous.Required("definition"):
                voluptuous.All([{
                    "granularity": Timespan,
                    "points": PositiveNotNullInt,
                    "timespan": Timespan}], voluptuous.Length(min=1)),
            }))
        # Validate the data
        try:
            ap_items = [archive_policy.ArchivePolicyItem(**item) for item in
                        body['definition']]
        except ValueError as e:
            abort(400, six.text_type(e))

        try:
            return pecan.request.indexer.update_archive_policy(
                self.archive_policy, ap_items)
        except indexer.UnsupportedArchivePolicyChange as e:
            abort(400, six.text_type(e))
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def subvoat_name(self, subvoat_name):
        schema = Schema({ Required('subvoat_name'): All(str, Length(min=self.config['min_length_subvoat_name']))})

        return self.try_schema('subvoat_name', subvoat_name, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def uuid(self, uuid):
        schema = Schema({ Required('uuid'): All(str, Length(min=36, max=36))})

        return self.try_schema('uuid', uuid, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def comment_body(self, comment_body):
        schema = Schema({ Required('comment_body'): All(str, Length(min=self.config['min_length_comment_body'], 
                                                                    max=self.config['max_length_comment_body']))})

        return self.try_schema('comment_body', comment_body, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def thread(self, subvoat_name, title, body):
        title_schema = Schema({ Required('title'): All(str, Length(min=self.config['min_length_thread_title'],
                                                             max=self.config['max_length_thread_title']))})

        body_schema = Schema({  Required('body'):  All(str, Length(min=self.config['min_length_thread_body'],
                                                             max=self.config['max_length_thread_body']))})




        # validate the subvoat_name first
        sn_status, sn_result = self.subvoat_name(subvoat_name)

        if not sn_status:
            return [sn_status, sn_result]

        # Then validate the thread title
        t_status, t_result = self.try_schema('title', title, title_schema)

        if not t_status:
            return [t_status, t_result]

        # and thread body
        b_status, b_result = self.try_schema('body', body, body_schema)

        if not b_status:
            return [b_status, b_result]

        # return True if everything is ok
        return [True, None]
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def username(self, username):
        schema = Schema({ Required('username'): All(str, Length(min=self.config['min_length_username'], max=self.config['max_length_username']))})

        return self.try_schema('username', username, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def password(self, password):
        schema = Schema({ Required('password'): All(str, Length(min=self.config['min_length_password'], max=self.config['max_length_password']))})

        return self.try_schema('password', password, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def api_token(self, api_token):
        # FIX: make this actually check the token type
        schema = Schema({ Required('api_token'): All(str, Length(min=36, max=36))})

        return self.try_schema('api_token', api_token, schema)
项目:homeassistant-config    作者:arsaboo    | 项目源码 | 文件源码
def get(self, request):
        """Handle a GET request."""
        hass = request.app['hass']
        data = request.GET

        if 'code' not in data or 'state' not in data:
            return self.json_message('Authentication failed, not the right '
                                     'variables, try again.', HTTP_BAD_REQUEST)

        self.lyric.authorization_code(code=data['code'], state=data['state'])

        return self.json_message('OK! All good. Got the respons! You can close'
                                 'this window now, and click << Continue >>'
                                 'in the configurator.')
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def __init__(self, server):
        self.server = server
        self.guild_man = server.guild_man

        _o = Optional
        self.guild_edit_schema = Schema({
            _o('name'): str,
            _o('region'): str,
            _o('verification_level'): int,
            _o('default_message_notifications'): int,
            _o('afk_channel_id'): str,
            _o('afk_timeout'): int,
            _o('icon'): str,
            _o('owner_id'): str,
        }, required=True, extra=REMOVE_EXTRA)

        self.guild_create_schema = Schema({
            'name': str,
            'region': str,
            'icon': Any(None, str),
            'verification_level': int,
            'default_message_notifications': int,
        }, extra=REMOVE_EXTRA)

        self.channel_create_schema = Schema({
            'name': All(str, Length(min=2, max=100)),
            _o('type'): int,
            _o('bitrate'): int,
            _o('user_limit'): int,
            _o('permission_overwrites'): list,
        }, required=True, extra=REMOVE_EXTRA)

        self.register()
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def __init__(self, server):
        self.server = server
        self.guild_man = server.guild_man

        self.invite_create_schema = Schema({
            Required('max_age', default=86400): All(int, Range(min=10, max=86400)),
            Required('max_uses', default=0): All(int, Range(min=0, max=50)),
            Required('temporary', default=False): bool,
            Required('unique', default=True): bool,
        }, extra=REMOVE_EXTRA)

        self.register(server.app)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def enum(enumClass):
    """Create validator for specified enum."""
    return vol.All(vol.In(enumClass.__members__), enumClass.__getitem__)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from YAML configuration file.

    This method is a coroutine.
    """
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string,
                                                                 vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
        vol.Optional('picture', default=None): vol.Any(None, cv.string),
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
            cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): vol.Any(None, cv.string),
    })
    try:
        result = []
        try:
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, device in devices.items():
            try:
                device = dev_schema(device)
                device['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
            else:
                result.append(Device(hass, **device))
        return result
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def MetricSchema(definition):
        creator = pecan.request.auth_helper.get_current_user(
            pecan.request)

        # First basic validation
        schema = voluptuous.Schema({
            "archive_policy_name": six.text_type,
            "resource_id": functools.partial(ResourceID, creator=creator),
            "name": six.text_type,
            voluptuous.Optional("unit"):
            voluptuous.All(six.text_type, voluptuous.Length(max=31)),
        })
        definition = schema(definition)
        archive_policy_name = definition.get('archive_policy_name')

        name = definition.get('name')
        if name and '/' in name:
            abort(400, "'/' is not supported in metric name")
        if archive_policy_name is None:
            try:
                ap = pecan.request.indexer.get_archive_policy_for_metric(name)
            except indexer.NoArchivePolicyRuleMatch:
                # NOTE(jd) Since this is a schema-like function, we
                # should/could raise ValueError, but if we do so, voluptuous
                # just returns a "invalid value" with no useful message – so we
                # prefer to use abort() to make sure the user has the right
                # error message
                abort(400, "No archive policy name specified "
                      "and no archive policy rule found matching "
                      "the metric name %s" % name)
            else:
                definition['archive_policy_name'] = ap.name

        resource_id = definition.get('resource_id')
        if resource_id is None:
            original_resource_id = None
        else:
            if name is None:
                abort(400,
                      {"cause": "Attribute value error",
                       "detail": "name",
                       "reason": "Name cannot be null "
                       "if resource_id is not null"})
            original_resource_id, resource_id = resource_id

        enforce("create metric", {
            "creator": creator,
            "archive_policy_name": archive_policy_name,
            "resource_id": resource_id,
            "original_resource_id": original_resource_id,
            "name": name,
            "unit": definition.get('unit'),
        })

        return definition
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def _ResourceSearchSchema():
    user = pecan.request.auth_helper.get_current_user(
        pecan.request)
    _ResourceUUID = functools.partial(ResourceUUID, creator=user)

    return voluptuous.Schema(
        voluptuous.All(
            voluptuous.Length(min=0, max=1),
            {
                voluptuous.Any(
                    u"=", u"==", u"eq",
                    u"<", u"lt",
                    u">", u"gt",
                    u"<=", u"?", u"le",
                    u">=", u"?", u"ge",
                    u"!=", u"?", u"ne",
                ): voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": _ResourceUUID,
                     NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"like": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"in": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": voluptuous.All(
                        [_ResourceUUID],
                        voluptuous.Length(min=1)),
                     NotIDKey: voluptuous.All(
                         [ResourceSearchSchemaAttributeValue],
                         voluptuous.Length(min=1))}
                ),
                voluptuous.Any(
                    u"and", u"?",
                    u"or", u"?",
                ): voluptuous.All(
                    [ResourceSearchSchema], voluptuous.Length(min=1)
                ),
                u"not": ResourceSearchSchema,
            }
        )
    )
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def async_run(self, variables: Optional[Sequence]=None) -> None:
        """Run script.

        This method is a coroutine.
        """
        if self._cur == -1:
            self._log('Running script')
            self._cur = 0

        # Unregister callback if we were in a delay but turn on is called
        # again. In that case we just continue execution.
        self._async_remove_listener()

        for cur, action in islice(enumerate(self.sequence), self._cur,
                                  None):

            if CONF_DELAY in action:
                # Call ourselves in the future to continue work
                @asyncio.coroutine
                def script_delay(now):
                    """Called after delay is done."""
                    self._async_unsub_delay_listener = None
                    self.hass.async_add_job(self.async_run(variables))

                delay = action[CONF_DELAY]

                if isinstance(delay, template.Template):
                    delay = vol.All(
                        cv.time_period,
                        cv.positive_timedelta)(
                            delay.async_render(variables))

                self._async_unsub_delay_listener = \
                    async_track_point_in_utc_time(
                        self.hass, script_delay,
                        date_util.utcnow() + delay)
                self._cur = cur + 1
                if self._change_listener:
                    self.hass.async_add_job(self._change_listener)
                return

            elif CONF_CONDITION in action:
                if not self._async_check_condition(action, variables):
                    break

            elif CONF_EVENT in action:
                self._async_fire_event(action)

            else:
                yield from self._async_call_service(action, variables)

        self._cur = -1
        self.last_action = None
        if self._change_listener:
            self.hass.async_add_job(self._change_listener)