Python voluptuous 模块,Length() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用voluptuous.Length()

项目:lxdock    作者:lxdock    | 项目源码 | 文件源码
def test_raise_invalid_if_provisioner_schema_is_not_satisfied(self, mock_Provisioner):
        """A provisioning entry that violates its provisioner schema is rejected.

        Three mock provisioners are registered on the patched ``Provisioner``;
        the second provisioning entry carries an ``a`` value that (per the
        inline note) breaks ``Length(min=5, max=5)``, so the reported error
        path must point at that entry.
        """
        mock_Provisioner.provisioners = {
            'mp1': MockProvisioner1,
            'mp2': MockProvisioner2,
            'mp3': MockProvisioner3}
        schema = get_schema()
        with pytest.raises(Invalid) as e:
            schema({
                'name': 'dummy-test',
                'provisioning': [{
                    'type': 'mp1',
                    'a': 'dummy',
                    'b': '16'
                }, {
                    'type': 'mp2',
                    'a': 'dummydummy',  # Exceeds Length(min=5, max=5)
                }, {
                    'type': 'mp3',
                    'b': 'yes'
                }]
            })
        # ``e`` is pytest's ExceptionInfo wrapper; its own str() is not the
        # exception message (changed in pytest 5), so inspect the wrapped
        # exception explicitly.
        assert "['provisioning'][1]['a']" in str(e.value)
项目:eInvoice    作者:dpalominop    | 项目源码 | 文件源码
def validate(self):
        """Check ``self._data`` against the voucher schema.

        Unknown keys are tolerated at every level (``ALLOW_EXTRA``). Nothing is
        returned; any exception raised by the schema call propagates to the
        caller.
        """
        supplier_schema = Schema(
            {
                Required('ruc'): All(str, Length(min=11, max=11)),
                Required('registration_name'): str,
                Optional('address'): dict,
                Optional('commercial_name'): str,
            },
            extra=ALLOW_EXTRA,
        )
        customer_schema = Schema(
            {Required('ruc'): All(str, Length(min=1, max=11))},
            extra=ALLOW_EXTRA,
        )
        voucher_fields = {
            Required('issue_date'): str,
            Required('supplier'): supplier_schema,
            Required('customer'): customer_schema,
            Required('voucher_type'): str,
            Required('currency'): str,
            Required('voucher_number'): str,
            # At least one line item must be present.
            Required('lines'): All(list, Length(min=1)),
        }
        Schema(voucher_fields, extra=ALLOW_EXTRA)(self._data)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def get(self):
        """Return the posts of the subvoat named in the request.

        The ``subvoat_name`` request argument must be a string at least
        ``self.config['min_length_subvoat_name']`` long; otherwise a dict with
        an ``'error'`` entry is returned instead of ``'result'``.
        """
        parser = reqparse.RequestParser()
        parser.add_argument('subvoat_name')
        args = parser.parse_args()

        name_rule = All(str, Length(min=self.config['min_length_subvoat_name']))
        schema = Schema({Required('subvoat_name'): name_rule})

        try:
            schema({'subvoat_name': args.get('subvoat_name')})
        except MultipleInvalid as e:
            return {'error': '%s %s' % (e.msg, e.path)}

        posts = self.subvoat_utils.get_posts(args['subvoat_name'])
        return {'result': [post for post in posts]}
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def __init__(self, server):
        """Store the server handles, build the channel-edit schemas, register.

        ``channel_edit_base`` holds the fields shared by every channel edit;
        the text and voice schemas extend it with their own fields.
        """
        self.server = server
        self.guild_man = server.guild_man

        shared_fields = {
            'name': All(str, Length(min=2, max=100)),
            'position': int,
            Optional('nsfw'): bool,
        }
        self.channel_edit_base = Schema(shared_fields, required=True)

        text_fields = {'topic': All(str, Length(min=0, max=1024))}
        self.textchan_editschema = self.channel_edit_base.extend(text_fields)

        voice_fields = {
            'bitrate': All(int, Range(min=8000, max=96000)),
            'user_limit': All(int, Range(min=0, max=99)),
        }
        self.voicechan_editschema = self.channel_edit_base.extend(voice_fields)

        self.register()
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def schema_ext(self):
        """Return a validator for text bounded by this object's length limits.

        The value must be a ``six.text_type`` whose length lies between
        ``self.min_length`` and ``self.max_length``.
        """
        length_bounds = voluptuous.Length(min=self.min_length,
                                          max=self.max_length)
        return voluptuous.All(six.text_type, length_bounds)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def MetricSchema(v):
    """Validate a "metric" operation and return it as a list.

    It could be:

    ["metric", "metric-ref", "aggregation"]

    or

    ["metric", ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]

    :param v: the candidate operation (list or tuple).
    :returns: ``[u"metric"]`` followed by the validated arguments.
    :raises voluptuous.Invalid: if ``v`` is not a non-empty list/tuple with
        at least one argument, does not start with ``u"metric"``, or its
        arguments do not match either shape above.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    if not v:
        raise voluptuous.Invalid("Operation must not be empty")
    if len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    if v[0] != u"metric":
        # NOTE(sileht): this error message doesn't looks related to "metric",
        # but because that the last schema validated by voluptuous, we have
        # good chance (voluptuous.Any is not predictable) to print this
        # message even if it's an other operation that invalid.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    single_ref = voluptuous.ExactSequence([six.text_type, six.text_type])
    many_refs = voluptuous.All(
        voluptuous.Length(min=1),
        [voluptuous.ExactSequence([six.text_type, six.text_type])],
    )
    arguments = voluptuous.Schema(
        voluptuous.Any(single_ref, many_refs), required=True)(v[1:])
    # Tuples are accepted above and ExactSequence hands back the same sequence
    # type it was given, so coerce before concatenating: list + tuple would
    # raise TypeError.
    return [u"metric"] + list(arguments)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def patch(self):
        """Update the definition of the archive policy ``self.archive_policy``.

        Aborts with 404 when the indexer has no such policy, and with 400 when
        an item of the submitted definition is rejected with ``ValueError`` or
        the indexer reports an unsupported change. Returns whatever the
        indexer's ``update_archive_policy`` returns.
        """
        policy = pecan.request.indexer.get_archive_policy(self.archive_policy)
        if not policy:
            abort(404, six.text_type(
                indexer.NoSuchArchivePolicy(self.archive_policy)))
        enforce("update archive policy", policy)

        definition_item = {
            "granularity": Timespan,
            "points": PositiveNotNullInt,
            "timespan": Timespan,
        }
        body_schema = voluptuous.Schema({
            # A non-empty list of definition items is mandatory.
            voluptuous.Required("definition"):
                voluptuous.All([definition_item], voluptuous.Length(min=1)),
        })
        body = deserialize_and_validate(body_schema)

        # Validate the data
        ap_items = []
        try:
            for item in body['definition']:
                ap_items.append(archive_policy.ArchivePolicyItem(**item))
        except ValueError as err:
            abort(400, six.text_type(err))

        try:
            return pecan.request.indexer.update_archive_policy(
                self.archive_policy, ap_items)
        except indexer.UnsupportedArchivePolicyChange as err:
            abort(400, six.text_type(err))
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def post(self):
        """Create an archive policy from the request body and return it.

        Aborts with 400 when ``ArchivePolicy.from_dict`` raises ``ValueError``
        and with 409 when the indexer reports the policy already exists. On
        success the response status is set to 201 and the policy's path is
        handed to ``set_resp_location_hdr``.
        """
        enforce("create archive policy", {})
        # NOTE(jd): Initialize this one at run-time because we rely on conf
        conf = pecan.request.conf
        valid_agg_methods = (
            archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES
        )
        default_agg_methods = set(
            conf.archive_policy.default_aggregation_methods)
        definition_item = {
            "granularity": Timespan,
            "points": PositiveNotNullInt,
            "timespan": Timespan,
        }
        policy_schema = voluptuous.Schema({
            voluptuous.Required("name"): six.text_type,
            voluptuous.Required("back_window", default=0): PositiveOrNullInt,
            voluptuous.Required("aggregation_methods",
                                default=default_agg_methods):
                voluptuous.All(list(valid_agg_methods),
                               voluptuous.Coerce(set)),
            voluptuous.Required("definition"):
                voluptuous.All([definition_item], voluptuous.Length(min=1)),
        })

        body = deserialize_and_validate(policy_schema)
        # Validate the data
        try:
            policy = archive_policy.ArchivePolicy.from_dict(body)
        except ValueError as err:
            abort(400, six.text_type(err))
        # Second policy check, this time against the concrete policy object.
        enforce("create archive policy", policy)
        try:
            policy = pecan.request.indexer.create_archive_policy(policy)
        except indexer.ArchivePolicyAlreadyExists as err:
            abort(409, six.text_type(err))

        set_resp_location_hdr("/archive_policy/" + policy.name)
        pecan.response.status = 201
        return policy
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def subvoat_name(self, subvoat_name):
        """Validate a subvoat name: a string of at least the configured length.

        Returns whatever ``self.try_schema`` returns for the check.
        """
        min_length = self.config['min_length_subvoat_name']
        name_rule = All(str, Length(min=min_length))
        return self.try_schema(
            'subvoat_name',
            subvoat_name,
            Schema({Required('subvoat_name'): name_rule}),
        )
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def uuid(self, uuid):
        """Validate that ``uuid`` is a string of exactly 36 characters.

        Returns whatever ``self.try_schema`` returns for the check.
        """
        exactly_36 = Length(min=36, max=36)
        uuid_schema = Schema({Required('uuid'): All(str, exactly_36)})
        return self.try_schema('uuid', uuid, uuid_schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def comment_body(self, comment_body):
        """Validate a comment body against the configured length bounds.

        Returns whatever ``self.try_schema`` returns for the check.
        """
        shortest = self.config['min_length_comment_body']
        longest = self.config['max_length_comment_body']
        body_rule = All(str, Length(min=shortest, max=longest))
        return self.try_schema(
            'comment_body',
            comment_body,
            Schema({Required('comment_body'): body_rule}),
        )
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def thread(self, subvoat_name, title, body):
        """Validate the subvoat name, title and body of a new thread, in order.

        Returns ``[status, result]`` of the first failing check, or
        ``[True, None]`` when all three pass.
        """
        # Both schemas are built up front, before any check runs.
        title_rule = All(str, Length(min=self.config['min_length_thread_title'],
                                     max=self.config['max_length_thread_title']))
        title_schema = Schema({Required('title'): title_rule})

        body_rule = All(str, Length(min=self.config['min_length_thread_body'],
                                    max=self.config['max_length_thread_body']))
        body_schema = Schema({Required('body'): body_rule})

        # Deferred so that a later check never runs once an earlier one fails.
        checks = (
            lambda: self.subvoat_name(subvoat_name),
            lambda: self.try_schema('title', title, title_schema),
            lambda: self.try_schema('body', body, body_schema),
        )
        for run_check in checks:
            status, result = run_check()
            if not status:
                return [status, result]

        # return True if everything is ok
        return [True, None]
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def password(self, password):
        """Validate a password against the configured length bounds.

        Returns whatever ``self.try_schema`` returns for the check.
        """
        shortest = self.config['min_length_password']
        longest = self.config['max_length_password']
        password_rule = All(str, Length(min=shortest, max=longest))
        return self.try_schema(
            'password',
            password,
            Schema({Required('password'): password_rule}),
        )
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def api_token(self, api_token):
        """Validate that ``api_token`` is a string of exactly 36 characters.

        Returns whatever ``self.try_schema`` returns for the check.
        """
        # FIX: make this actually check the token type
        token_rule = All(str, Length(min=36, max=36))
        token_schema = Schema({Required('api_token'): token_rule})
        return self.try_schema('api_token', api_token, token_schema)
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def __init__(self, server):
        """Store the server handles, build the guild/channel schemas, register.

        All three schemas drop unknown keys (``REMOVE_EXTRA``). In the edit
        schema every field is optional; the create schemas list their fields
        explicitly.
        """
        self.server = server
        self.guild_man = server.guild_man

        guild_edit_fields = {
            Optional('name'): str,
            Optional('region'): str,
            Optional('verification_level'): int,
            Optional('default_message_notifications'): int,
            Optional('afk_channel_id'): str,
            Optional('afk_timeout'): int,
            Optional('icon'): str,
            Optional('owner_id'): str,
        }
        self.guild_edit_schema = Schema(
            guild_edit_fields, required=True, extra=REMOVE_EXTRA)

        guild_create_fields = {
            'name': str,
            'region': str,
            'icon': Any(None, str),
            'verification_level': int,
            'default_message_notifications': int,
        }
        self.guild_create_schema = Schema(
            guild_create_fields, extra=REMOVE_EXTRA)

        channel_create_fields = {
            'name': All(str, Length(min=2, max=100)),
            Optional('type'): int,
            Optional('bitrate'): int,
            Optional('user_limit'): int,
            Optional('permission_overwrites'): list,
        }
        self.channel_create_schema = Schema(
            channel_create_fields, required=True, extra=REMOVE_EXTRA)

        self.register()
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def valid_subscribe_topic(value, invalid_chars='\0'):
    """Validate that we can subscribe using this MQTT topic.

    :param value: candidate topic.
    :param invalid_chars: characters that must not appear in the topic.
    :returns: ``value`` unchanged when it is acceptable.
    :raises vol.Invalid: if ``value`` is not a string, contains one of
        ``invalid_chars``, cannot be UTF-8 encoded, or its encoded form is
        empty or longer than 65535 bytes.
    """
    if not isinstance(value, str) or any(c in value for c in invalid_chars):
        raise vol.Invalid('Invalid MQTT topic name')
    try:
        encoded = value.encode('utf-8')
    except UnicodeEncodeError:
        # e.g. lone surrogates: not representable on the wire.
        raise vol.Invalid('Invalid MQTT topic name')
    # MQTT caps topics at 65535 *bytes* of UTF-8, so measure the encoded
    # form; counting characters would let long non-ASCII topics through.
    vol.Length(min=1, max=65535)(encoded)
    return value
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def MetricSchema(definition):
        """Validate a metric definition and authorize its creation.

        Runs the basic voluptuous schema, resolves a missing archive policy
        name through the indexer's rule matching, and calls ``enforce`` for
        "create metric". Error conditions handled here go through ``abort(400,
        ...)``.

        :param definition: the raw metric definition to validate.
        :returns: the validated definition, with ``archive_policy_name``
            filled in when it was resolved from a rule.
        """
        creator = pecan.request.auth_helper.get_current_user(
            pecan.request)

        # First basic validation
        schema = voluptuous.Schema({
            "archive_policy_name": six.text_type,
            "resource_id": functools.partial(ResourceID, creator=creator),
            "name": six.text_type,
            voluptuous.Optional("unit"):
            voluptuous.All(six.text_type, voluptuous.Length(max=31)),
        })
        definition = schema(definition)
        archive_policy_name = definition.get('archive_policy_name')

        name = definition.get('name')
        if name and '/' in name:
            abort(400, "'/' is not supported in metric name")
        if archive_policy_name is None:
            # No explicit policy: fall back to the rule matching this name.
            try:
                ap = pecan.request.indexer.get_archive_policy_for_metric(name)
            except indexer.NoArchivePolicyRuleMatch:
                # NOTE(jd) Since this is a schema-like function, we
                # should/could raise ValueError, but if we do so, voluptuous
                # just returns a "invalid value" with no useful message – so we
                # prefer to use abort() to make sure the user has the right
                # error message
                abort(400, "No archive policy name specified "
                      "and no archive policy rule found matching "
                      "the metric name %s" % name)
            else:
                definition['archive_policy_name'] = ap.name

        resource_id = definition.get('resource_id')
        if resource_id is None:
            original_resource_id = None
        else:
            if name is None:
                abort(400,
                      {"cause": "Attribute value error",
                       "detail": "name",
                       "reason": "Name cannot be null "
                       "if resource_id is not null"})
            # The validated resource_id is unpacked as a pair here; presumably
            # ResourceID returns (original, normalized) -- confirm.
            original_resource_id, resource_id = resource_id

        # NOTE(review): the local archive_policy_name is not refreshed after
        # the rule fallback above, so enforce() sees None in that case while
        # the returned definition carries ap.name -- confirm this is intended.
        enforce("create metric", {
            "creator": creator,
            "archive_policy_name": archive_policy_name,
            "resource_id": resource_id,
            "original_resource_id": original_resource_id,
            "name": name,
            "unit": definition.get('unit'),
        })

        return definition