Python voluptuous 模块，Any() 实例源码

我们从 Python 开源项目中，提取了以下 20 个代码示例，用于说明如何使用 voluptuous.Any()。

项目:jarvis-kube    作者:lahsivjar    | 项目源码 | 文件源码
def __init__(self):
    """Create the Redis client and the schema used to validate registry data.

    The schema requires a service id, a service name, an integer port, a
    namespace (filled in as ``'default'`` when absent) and a list of
    endpoint mappings.
    """
    self.redisinstance = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)

    # Mapping stored under the FILTER_REGEX key of an endpoint; that key is
    # the only one below that is not wrapped in Required.
    filter_regex_schema = {
        Required(RegistryKeys.PATTERN): Any(str, unicode),
        Required(RegistryKeys.REPLACE): Any(str, unicode),
    }

    # Shape of a single entry of the ENDPOINTS list.
    endpoint_schema = {
        Required(RegistryKeys.URI): Any(str, unicode),
        Required(RegistryKeys.ACCEPTANCE_REGEX): Any(str, unicode),
        RegistryKeys.FILTER_REGEX: filter_regex_schema,
    }

    self.validateschema = Schema({
        Required(RegistryKeys.SERVICE_ID): Any(str, unicode),
        Required(RegistryKeys.SERVICE_NAME): Any(str, unicode),
        Required(RegistryKeys.SERVICE_PORT): int,
        Required(RegistryKeys.NAMESPACE, default='default'): Any(str, unicode),
        Required(RegistryKeys.ENDPOINTS): [endpoint_schema],
    })
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
    """Initialise the manager and build its creation and update schemas.

    Both schemas have the same shape: a text ``name`` plus an ``attributes``
    mapping (defaulting to ``{}``) whose values must match any of the meta
    schemas provided by the loaded extensions. The update variant asks each
    plugin for its schema with ``for_update=True``.
    """
    super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs)

    def build_schema(**meta_schema_kwargs):
        # One meta schema per loaded extension plugin.
        attribute_schemas = [
            extension.plugin.meta_schema(**meta_schema_kwargs)
            for extension in self.extensions
        ]
        return voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*attribute_schemas)
            }
        })

    self._schema = build_schema()
    self._schema_for_update = build_schema(for_update=True)
项目:tcconfig    作者:thombashi    | 项目源码 | 文件源码
def load_tcconfig(self, config_file_path):
    """Load a JSON configuration file, validate its layout and log it.

    The parsed document is stored on the instance before validation runs,
    so a validation error leaves the loaded (invalid) table in place.

    :param config_file_path: Path of the JSON file to read.
    """
    import json
    from voluptuous import Schema, Required, Any, ALLOW_EXTRA

    # The schema is assembled from the innermost level outwards.
    leaf_value = Any(six.text_type, int, float)
    innermost_mapping = {six.text_type: leaf_value}
    per_direction = {
        six.text_type: innermost_mapping,
    }
    per_top_level_key = {
        Any(*TrafficDirection.LIST): per_direction,
    }
    schema = Schema(
        {Required(six.text_type): per_top_level_key},
        extra=ALLOW_EXTRA)

    with open(config_file_path) as fp:
        self.__config_table = json.load(fp)

    schema(self.__config_table)

    pretty_config = json.dumps(self.__config_table, indent=4)
    self.__logger.debug("tc config file: {:s}".format(pretty_config))
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def boolean(value: Any) -> bool:
    """Coerce *value* to a bool, understanding common textual spellings.

    Strings are lower-cased and matched against fixed lists of truthy and
    falsy words; a string in neither list raises ``vol.Invalid``. Any other
    type is passed through ``bool()``.
    """
    if not isinstance(value, str):
        return bool(value)

    lowered = value.lower()
    if lowered in ('1', 'true', 'yes', 'on', 'enable'):
        return True
    if lowered in ('0', 'false', 'no', 'off', 'disable'):
        return False

    # The message reports the lower-cased text, not the raw input.
    raise vol.Invalid('invalid boolean value {}'.format(lowered))
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def isfile(value: Any) -> str:
    """Return the user-expanded path of *value* if it is a readable file.

    Raises ``vol.Invalid`` when *value* is None, when the expanded path is
    not an existing regular file, or when that file is not readable.
    """
    if value is None:
        raise vol.Invalid('None is not file')

    expanded_path = os.path.expanduser(str(value))
    exists_as_file = os.path.isfile(expanded_path)
    if not exists_as_file:
        raise vol.Invalid('not a file')

    readable = os.access(expanded_path, os.R_OK)
    if not readable:
        raise vol.Invalid('file not readable')

    return expanded_path
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def entity_id(value: Any) -> str:
    """Return *value* as a lower-cased entity id, or raise ``vol.Invalid``.

    The value is first coerced with ``string()`` and lower-cased, then
    checked with ``valid_entity_id()``.
    """
    normalized = string(value).lower()
    if not valid_entity_id(normalized):
        raise vol.Invalid(
            'Entity ID {} is an invalid entity id'.format(normalized))
    return normalized
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def string(value: Any) -> str:
    """Return ``str(value)``; None is rejected with ``vol.Invalid``."""
    if value is None:
        raise vol.Invalid('string value is None')
    return str(value)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def url(value: Any) -> str:
    """Validate that *value* is an http or https URL.

    The value is converted to text; any other scheme raises
    ``vol.Invalid``. Accepted values are then run through voluptuous'
    ``Url`` validator and its result is returned.
    """
    candidate = str(value)
    scheme = urlparse(candidate).scheme

    if scheme not in ['http', 'https']:
        raise vol.Invalid('invalid url')

    return vol.Schema(vol.Url())(candidate)
项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def _add_validation(context):
    """Return *context* merged with a ``validation`` entry.

    The entry is an ``AttrDict`` exposing voluptuous' ``Any`` and ``All``
    under the names ``any`` and ``all``.
    """
    helpers = dbt.utils.AttrDict({
        'any': voluptuous.Any,
        'all': voluptuous.All,
    })
    additions = {'validation': helpers}
    return dbt.utils.merge(context, additions)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def MetricSchema(v):
    """Validate a "metric" operation and return it as a list.

    Accepted shapes:

    ["metric", "metric-ref", "aggregation"]

    or

    ["metric", ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]

    Raises voluptuous.Invalid when ``v`` is not a list/tuple, is empty, has
    no argument after the keyword, does not start with "metric", or when
    its arguments do not match either shape.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    if not v:
        raise voluptuous.Invalid("Operation must not be empty")
    if len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    if v[0] != u"metric":
        # NOTE(sileht): this message is not specific to "metric", but since
        # this is the last schema validated by voluptuous, there is a good
        # chance (voluptuous.Any is not predictable) that it is the one
        # printed even when a different operation is the invalid one.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    # A single (metric-ref, aggregation) pair ...
    single_reference = voluptuous.ExactSequence(
        [six.text_type, six.text_type])
    # ... or a non-empty list of such pairs.
    reference_list = voluptuous.All(
        voluptuous.Length(min=1),
        [voluptuous.ExactSequence([six.text_type, six.text_type])],
    )
    arguments_schema = voluptuous.Schema(
        voluptuous.Any(single_reference, reference_list), required=True)

    return [u"metric"] + arguments_schema(v[1:])
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def OperationsSchema(v):
    """Validate an operations expression and return the validated value.

    A text value is first parsed with pyparsing's nested-expression grammar
    and replaced by the first parsed group; a parse failure calls
    ``api.abort`` with status 400. The value is then validated against any
    of the schemas in ``OperationsSchemaBase``.
    """
    if isinstance(v, six.text_type):
        grammar = pyparsing.OneOrMore(pyparsing.nestedExpr())
        try:
            v = grammar.parseString(v).asList()[0]
        except pyparsing.ParseException as e:
            api.abort(400, {"cause": "Invalid operations",
                            "reason": "Fail to parse the operations string",
                            "detail": six.text_type(e)})

    validator = voluptuous.Schema(
        voluptuous.Any(*OperationsSchemaBase), required=True)
    return validator(v)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def ResourceSchema(schema):
    """Return the common resource schema extended with *schema*.

    The common part declares the optional keys ``started_at``, ``ended_at``,
    ``user_id``, ``project_id`` and ``metrics``. Entries of *schema* are
    applied last, so they override a common entry with an equal key.
    """
    merged = dict([
        (voluptuous.Optional('started_at'), utils.to_datetime),
        (voluptuous.Optional('ended_at'), utils.to_datetime),
        (voluptuous.Optional('user_id'),
         voluptuous.Any(None, six.text_type)),
        (voluptuous.Optional('project_id'),
         voluptuous.Any(None, six.text_type)),
        (voluptuous.Optional('metrics'), MetricsSchema),
    ])
    merged.update(schema)
    return merged
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def __init__(self, server):
    """Store the server handles, build the payload schemas and register.

    Three schemas are created: one for guild edits (every key optional),
    one for guild creation and one for channel creation. All of them drop
    unknown keys via ``REMOVE_EXTRA``.
    """
    self.server = server
    self.guild_man = server.guild_man

    # Options shared by the guild-edit and channel-create schemas.
    strict_options = {'required': True, 'extra': REMOVE_EXTRA}

    self.guild_edit_schema = Schema({
        Optional('name'): str,
        Optional('region'): str,
        Optional('verification_level'): int,
        Optional('default_message_notifications'): int,
        Optional('afk_channel_id'): str,
        Optional('afk_timeout'): int,
        Optional('icon'): str,
        Optional('owner_id'): str,
    }, **strict_options)

    # Unlike the other two, this schema is not created with required=True.
    self.guild_create_schema = Schema({
        'name': str,
        'region': str,
        'icon': Any(None, str),
        'verification_level': int,
        'default_message_notifications': int,
    }, extra=REMOVE_EXTRA)

    self.channel_create_schema = Schema({
        'name': All(str, Length(min=2, max=100)),
        Optional('type'): int,
        Optional('bitrate'): int,
        Optional('user_limit'): int,
        Optional('permission_overwrites'): list,
    }, **strict_options)

    self.register()
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def boolean(value: Any) -> bool:
    """Turn *value* into a bool, accepting a fixed set of textual forms.

    For strings the comparison is case-insensitive; text that is neither a
    known truthy nor a known falsy word raises ``vol.Invalid``. Non-string
    values are converted with ``bool()``.
    """
    if isinstance(value, str):
        known_words = {
            '1': True, 'true': True, 'yes': True, 'on': True, 'enable': True,
            '0': False, 'false': False, 'no': False, 'off': False,
            'disable': False,
        }
        text = value.lower()
        result = known_words.get(text)
        if result is None:
            # The lower-cased text is what ends up in the error message.
            raise vol.Invalid('invalid boolean value {}'.format(text))
        return result
    return bool(value)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def isfile(value: Any) -> str:
    """Check that *value* names an existing, readable file.

    Returns the path after ``~`` expansion. Raises ``vol.Invalid`` for
    None, for a path that is not a regular file, and for a file that the
    process cannot read.
    """
    if value is None:
        raise vol.Invalid('None is not file')

    candidate = os.path.expanduser(str(value))

    if os.path.isfile(candidate):
        if os.access(candidate, os.R_OK):
            return candidate
        raise vol.Invalid('file not readable')
    raise vol.Invalid('not a file')
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def entity_id(value: Any) -> str:
    """Validate an entity id and return it lower-cased.

    ``string()`` rejects None before lower-casing; the lower-cased text must
    then satisfy ``valid_entity_id()``, otherwise ``vol.Invalid`` is raised.
    """
    candidate = string(value).lower()
    accepted = valid_entity_id(candidate)
    if accepted:
        return candidate
    message = 'Entity ID {} is an invalid entity id'.format(candidate)
    raise vol.Invalid(message)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def string(value: Any) -> str:
    """Convert *value* to ``str``, raising ``vol.Invalid`` when it is None."""
    is_missing = value is None
    if is_missing:
        raise vol.Invalid('string value is None')
    text = str(value)
    return text
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def url(value: Any) -> str:
    """Return *value* validated as an http(s) URL.

    Only the ``http`` and ``https`` schemes are accepted; everything else
    raises ``vol.Invalid``. Accepted text is additionally checked by
    voluptuous' ``Url`` validator, whose result is returned.
    """
    text = str(value)
    parsed = urlparse(text)

    if parsed.scheme in ['http', 'https']:
        url_validator = vol.Schema(vol.Url())
        return url_validator(text)

    raise vol.Invalid('invalid url')
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def async_setup_scanner_platform(hass: HomeAssistantType, config: ConfigType,
                                 scanner: Any, async_see_device: Callable):
    """Hook a scanner-based platform up to the device tracker.

    Registers a scan callback with ``async_track_utc_time_change`` and also
    queues one immediate run of it.

    NOTE(review): the original docstring states "This method is a
    coroutine", but no coroutine decorator is visible on this definition;
    confirm against the full module.
    """
    interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)

    # MACs already reported once; the host name is only looked up the first
    # time a MAC is seen.
    seen = set()  # type: Any

    def device_tracker_scan(now: dt_util.dt.datetime):
        """Scan for devices and report each one that was found."""
        for mac in scanner.scan_devices():
            host_name = None
            if mac not in seen:
                host_name = scanner.get_device_name(mac)
                seen.add(mac)
            hass.add_job(async_see_device(mac=mac, host_name=host_name))

    # Seconds within each minute at which the scan should be triggered.
    scan_seconds = range(0, 60, interval)
    async_track_utc_time_change(
        hass, device_tracker_scan, second=scan_seconds)

    # Initial scan; the callback's ``now`` argument is passed as None.
    hass.async_add_job(device_tracker_scan, None)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def _ResourceSearchSchema():
    """Build the voluptuous schema for a resource search query.

    The query is a mapping holding at most one operator. Comparison
    operators, ``like`` and ``in`` take a mapping with exactly one
    attribute; ``and``/``or`` take a non-empty list of nested queries and
    ``not`` takes a single nested query.

    Resource ids are validated by ``ResourceUUID`` bound to the user
    returned by the request's auth helper.

    The non-ASCII operator aliases are written as ``\\u`` escapes so they
    do not depend on the source file's encoding. They had been degraded to
    ``u"?"``, which both dropped the real aliases and accepted ``?`` as an
    operator.

    :returns: a ``voluptuous.Schema`` instance.
    """
    user = pecan.request.auth_helper.get_current_user(
        pecan.request)
    _ResourceUUID = functools.partial(ResourceUUID, creator=user)

    return voluptuous.Schema(
        voluptuous.All(
            # An empty query is allowed; more than one operator is not.
            voluptuous.Length(min=0, max=1),
            {
                voluptuous.Any(
                    u"=", u"==", u"eq",
                    u"<", u"lt",
                    u">", u"gt",
                    u"<=", u"\u2264", u"le",  # U+2264 less-than or equal
                    u">=", u"\u2265", u"ge",  # U+2265 greater-than or equal
                    u"!=", u"\u2260", u"ne",  # U+2260 not equal
                ): voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": _ResourceUUID,
                     NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"like": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"in": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": voluptuous.All(
                        [_ResourceUUID],
                        voluptuous.Length(min=1)),
                     NotIDKey: voluptuous.All(
                         [ResourceSearchSchemaAttributeValue],
                         voluptuous.Length(min=1))}
                ),
                voluptuous.Any(
                    u"and", u"\u2227",  # U+2227 logical and
                    u"or", u"\u2228",  # U+2228 logical or
                ): voluptuous.All(
                    [ResourceSearchSchema], voluptuous.Length(min=1)
                ),
                u"not": ResourceSearchSchema,
            }
        )
    )