Python voluptuous 模块,Required() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用voluptuous.Required()

项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_socket_timeout():
    """Test socket timeout validator."""
    TEST_CONF_TIMEOUT = 'timeout'

    schema = vol.Schema(
        {vol.Required(TEST_CONF_TIMEOUT, default=None): cv.socket_timeout})

    with pytest.raises(vol.Invalid):
        schema({TEST_CONF_TIMEOUT: 0.0})

    with pytest.raises(vol.Invalid):
        schema({TEST_CONF_TIMEOUT: -1})

    assert _GLOBAL_DEFAULT_TIMEOUT == schema({TEST_CONF_TIMEOUT:
                                              None})[TEST_CONF_TIMEOUT]

    assert 1.0 == schema({TEST_CONF_TIMEOUT: 1})[TEST_CONF_TIMEOUT]
项目:cricri    作者:Maillol    | 项目源码 | 文件源码
def setUp(self):
        spy = unittest.mock.Mock()

        class MyClient(Client):
            attr_name = 'my_clients'

            @staticmethod
            def validator():
                return {
                    voluptuous.Required('foo'): int,
                    voluptuous.Optional('bar', default='def bar'): str
                }

            def __init__(self, **kwargs):
                spy(kwargs)

            def close(self):
                spy('close')

        self.spy = spy
        MetaServerTestState.bind_class_client(MyClient)
        self.addCleanup(lambda: MetaServerTestState.forget_client(MyClient))
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getSchema(self, data):
        schema = v.Schema({v.Required('servers'): self.servers,
                           'palettes': self.palettes,
                           'palette': str,
                           'keymaps': self.keymaps,
                           'keymap': str,
                           'commentlinks': self.commentlinks,
                           'dashboards': self.dashboards,
                           'reviewkeys': self.reviewkeys,
                           'story-list-query': str,
                           'diff-view': str,
                           'hide-comments': self.hide_comments,
                           'display-times-in-utc': bool,
                           'handle-mouse': bool,
                           'breadcrumbs': bool,
                           'story-list-options': self.story_list_options,
                           'expire-age': str,
                           })
        return schema
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs)
        type_schemas = tuple([ext.plugin.meta_schema()
                              for ext in self.extensions])
        self._schema = voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*tuple(type_schemas))
            }
        })

        type_schemas = tuple([ext.plugin.meta_schema(for_update=True)
                              for ext in self.extensions])
        self._schema_for_update = voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*tuple(type_schemas))
            }
        })
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def get_pagination_options(params, default):
    try:
        opts = voluptuous.Schema({
            voluptuous.Required(
                "limit", default=pecan.request.conf.api.max_limit):
            voluptuous.All(voluptuous.Coerce(int),
                           voluptuous.Range(min=1),
                           voluptuous.Clamp(
                               min=1, max=pecan.request.conf.api.max_limit)),
            "marker": six.text_type,
            voluptuous.Required("sort", default=default):
            voluptuous.All(
                voluptuous.Coerce(arg_to_list),
                [six.text_type]),
        }, extra=voluptuous.REMOVE_EXTRA)(params)
    except voluptuous.Invalid as e:
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})
    opts['sorts'] = opts['sort']
    del opts['sort']
    return opts
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def get(self):  
        parser        = reqparse.RequestParser()
        return_data   = []

        parser.add_argument('subvoat_name')

        args = parser.parse_args()

        schema = Schema({ Required('subvoat_name'): All(str, Length(min=self.config['min_length_subvoat_name']))})

        try:
            schema({'subvoat_name':args.get('subvoat_name')})

        except MultipleInvalid as e:
            return {'error':'%s %s' % (e.msg, e.path)}

        posts = self.subvoat_utils.get_posts(args['subvoat_name'])


        for p in posts:
            return_data.append(p)


        return {'result':return_data}
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def page(self, start, end):
        s_schema = Schema({ Required('start'): All(int, Range(min=0))})
        e_schema = Schema({ Required('end'):   All(int, Range(min=0))})

        s_status, s_result = self.try_schema('start', start, s_schema)

        if not s_status:
            return [s_status, s_result]


        e_status, e_result = self.try_schema('end', end, e_schema)

        if not e_status:
            return [e_status, e_result]

        if end < start:
            return [False, 'ending page cannot be lower than starting page']


        total_pages = end - start

        if total_pages > 50:
            return [False, 'you cannot request more than 50 pages']

        return [True, None]
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_socket_timeout():
    """Test socket timeout validator."""
    TEST_CONF_TIMEOUT = 'timeout'

    schema = vol.Schema(
        {vol.Required(TEST_CONF_TIMEOUT, default=None): cv.socket_timeout})

    with pytest.raises(vol.Invalid):
        schema({TEST_CONF_TIMEOUT: 0.0})

    with pytest.raises(vol.Invalid):
        schema({TEST_CONF_TIMEOUT: -1})

    assert _GLOBAL_DEFAULT_TIMEOUT == schema({TEST_CONF_TIMEOUT:
                                              None})[TEST_CONF_TIMEOUT]

    assert 1.0 == schema({TEST_CONF_TIMEOUT: 1})[TEST_CONF_TIMEOUT]
项目:home-assistant-dlna-dmr    作者:StevenLooman    | 项目源码 | 文件源码
def _state_variable_create_schema(self, type_info):
        # construct validators
        validators = []

        data_type = type_info['data_type_python']
        validators.append(data_type)

        if 'allowed_values' in type_info:
            allowed_values = type_info['allowed_values']
            in_ = vol.In(allowed_values)  # coerce allowed values? assume always string for now
            validators.append(in_)

        if 'allowed_value_range' in type_info:
            min_ = type_info['allowed_value_range'].get('min', None)
            max_ = type_info['allowed_value_range'].get('max', None)
            min_ = data_type(min_)
            max_ = data_type(max_)
            range_ = vol.Range(min=min_, max=max_)
            validators.append(range_)

        # construct key
        key = vol.Required('value')

        if 'default_value' in type_info:
            default_value = type_info['default_value']
            if data_type == bool:
                default_value = default_value == '1'
            else:
                default_value = data_type(default_value)
            key.default = default_value

        return vol.Schema({key: vol.All(*validators)})
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def meta_schema(cls, for_update=False):
        d = {
            voluptuous.Required('type'): cls.typename,
            voluptuous.Required('required', default=True): bool
        }
        if for_update:
            d[voluptuous.Required('options', default={})] = OperationOptions
        if callable(cls.meta_schema_ext):
            d.update(cls.meta_schema_ext())
        else:
            d.update(cls.meta_schema_ext)
        return d
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def patch(self):
        ap = pecan.request.indexer.get_archive_policy(self.archive_policy)
        if not ap:
            abort(404, six.text_type(
                indexer.NoSuchArchivePolicy(self.archive_policy)))
        enforce("update archive policy", ap)

        body = deserialize_and_validate(voluptuous.Schema({
            voluptuous.Required("definition"):
                voluptuous.All([{
                    "granularity": Timespan,
                    "points": PositiveNotNullInt,
                    "timespan": Timespan}], voluptuous.Length(min=1)),
            }))
        # Validate the data
        try:
            ap_items = [archive_policy.ArchivePolicyItem(**item) for item in
                        body['definition']]
        except ValueError as e:
            abort(400, six.text_type(e))

        try:
            return pecan.request.indexer.update_archive_policy(
                self.archive_policy, ap_items)
        except indexer.UnsupportedArchivePolicyChange as e:
            abort(400, six.text_type(e))
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def post(self):
        enforce("create archive policy", {})
        # NOTE(jd): Initialize this one at run-time because we rely on conf
        conf = pecan.request.conf
        valid_agg_methods = (
            archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES
        )
        ArchivePolicySchema = voluptuous.Schema({
            voluptuous.Required("name"): six.text_type,
            voluptuous.Required("back_window", default=0): PositiveOrNullInt,
            voluptuous.Required(
                "aggregation_methods",
                default=set(conf.archive_policy.default_aggregation_methods)):
            voluptuous.All(list(valid_agg_methods), voluptuous.Coerce(set)),
            voluptuous.Required("definition"):
            voluptuous.All([{
                "granularity": Timespan,
                "points": PositiveNotNullInt,
                "timespan": Timespan,
                }], voluptuous.Length(min=1)),
            })

        body = deserialize_and_validate(ArchivePolicySchema)
        # Validate the data
        try:
            ap = archive_policy.ArchivePolicy.from_dict(body)
        except ValueError as e:
            abort(400, six.text_type(e))
        enforce("create archive policy", ap)
        try:
            ap = pecan.request.indexer.create_archive_policy(ap)
        except indexer.ArchivePolicyAlreadyExists as e:
            abort(409, six.text_type(e))

        location = "/archive_policy/" + ap.name
        set_resp_location_hdr(location)
        pecan.response.status = 201
        return ap
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def patch(self):
        ArchivePolicyRuleSchema = voluptuous.Schema({
            voluptuous.Required("name"): six.text_type,
            })
        body = deserialize_and_validate(ArchivePolicyRuleSchema)
        enforce("update archive policy rule", {})
        try:
            return pecan.request.indexer.update_archive_policy_rule(
                self.archive_policy_rule.name, body["name"])
        except indexer.UnsupportedArchivePolicyRuleChange as e:
            abort(400, six.text_type(e))
项目:pilight    作者:DavidLP    | 项目源码 | 文件源码
def parse_option(option_string):
    if 'OPTION_NO_VALUE' in option_string:
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        # The options without values seem to still need a value
        # when used with pilight-daemon, but this are not mandatory
        # options
        # E.G.: option 'on' is 'on': 1
        return {vol.Optional(option): vol.Coerce(int)}
    elif 'OPTION_HAS_VALUE' in option_string:
        options = re.findall(r'\"(.*?)\"', option_string)
        option = options[0]
        regex = None
        if len(options) > 1:  # Option has specified value by regex
            regex = options[1]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        elif 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        else:
            raise
    elif 'OPTION_OPT_VALUE' in option_string:
        options = re.findall(r'\"(.*?)\"', option_string)
        option = options[0]
        regex = None
        if len(options) > 1:  # Option has specified value by regex
            regex = options[1]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        elif 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        else:
            raise
    else:
        print(option_string)
        raise

    raise
项目:ratatoskr    作者:ngergo    | 项目源码 | 文件源码
def test_protectron_kwargs_input_schema_with_default():

    @protectron(Schema({Required('a', default=42): int, 'b': int}))
    def foo(a, b):
        return a

    assert foo(b=42) == 42
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def subvoat_name(self, subvoat_name):
        schema = Schema({ Required('subvoat_name'): All(str, Length(min=self.config['min_length_subvoat_name']))})

        return self.try_schema('subvoat_name', subvoat_name, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def uuid(self, uuid):
        schema = Schema({ Required('uuid'): All(str, Length(min=36, max=36))})

        return self.try_schema('uuid', uuid, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def comment_body(self, comment_body):
        schema = Schema({ Required('comment_body'): All(str, Length(min=self.config['min_length_comment_body'], 
                                                                    max=self.config['max_length_comment_body']))})

        return self.try_schema('comment_body', comment_body, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def thread(self, subvoat_name, title, body):
        title_schema = Schema({ Required('title'): All(str, Length(min=self.config['min_length_thread_title'],
                                                             max=self.config['max_length_thread_title']))})

        body_schema = Schema({  Required('body'):  All(str, Length(min=self.config['min_length_thread_body'],
                                                             max=self.config['max_length_thread_body']))})




        # validate the subvoat_name first
        sn_status, sn_result = self.subvoat_name(subvoat_name)

        if not sn_status:
            return [sn_status, sn_result]

        # Then validate the thread title
        t_status, t_result = self.try_schema('title', title, title_schema)

        if not t_status:
            return [t_status, t_result]

        # and thread body
        b_status, b_result = self.try_schema('body', body, body_schema)

        if not b_status:
            return [b_status, b_result]

        # return True if everything is ok
        return [True, None]
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def username(self, username):
        schema = Schema({ Required('username'): All(str, Length(min=self.config['min_length_username'], max=self.config['max_length_username']))})

        return self.try_schema('username', username, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def password(self, password):
        schema = Schema({ Required('password'): All(str, Length(min=self.config['min_length_password'], max=self.config['max_length_password']))})

        return self.try_schema('password', password, schema)
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def api_token(self, api_token):
        # FIX: make this actually check the token type
        schema = Schema({ Required('api_token'): All(str, Length(min=36, max=36))})

        return self.try_schema('api_token', api_token, schema)
项目:almanach    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        self.schema = voluptuous.Schema({
            'name': six.text_type,
            'flavor': six.text_type,
            'os': {
                voluptuous.Required('distro'): six.text_type,
                voluptuous.Required('version'): six.text_type,
                voluptuous.Required('os_type'): six.text_type,
            },
            'metadata': dict,
            'start_date': voluptuous.Datetime(),
            'end_date': voluptuous.Datetime(),
        })
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from YAML configuration file.

    This method is a coroutine.
    """
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string,
                                                                 vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
        vol.Optional('picture', default=None): vol.Any(None, cv.string),
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
            cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): vol.Any(None, cv.string),
    })
    try:
        result = []
        try:
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, device in devices.items():
            try:
                device = dev_schema(device)
                device['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
            else:
                result.append(Device(hass, **device))
        return result
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []