Python voluptuous 模块,Optional() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用voluptuous.Optional()

项目:cricri    作者:Maillol    | 项目源码 | 文件源码
def setUp(self):
        spy = unittest.mock.Mock()

        class MyClient(Client):
            attr_name = 'my_clients'

            @staticmethod
            def validator():
                return {
                    voluptuous.Required('foo'): int,
                    voluptuous.Optional('bar', default='def bar'): str
                }

            def __init__(self, **kwargs):
                spy(kwargs)

            def close(self):
                spy('close')

        self.spy = spy
        MetaServerTestState.bind_class_client(MyClient)
        self.addCleanup(lambda: MetaServerTestState.forget_client(MyClient))
项目:cricri    作者:Maillol    | 项目源码 | 文件源码
def _build_attributes_validator(mcs):
        """
        Returns validator to validate the sub-classes attributes.
        """

        valid_attributes = {
            Required("commands", 'required class attribute'): [
                {
                    Required("name"): str,
                    Required("cmd"): [str],
                    Optional("kill-signal", default=signal.SIGINT): int
                }
            ]
        }

        for attr_name, class_client in mcs._class_clients.items():
            client_validator = {
                Required("name"): str,
            }
            client_validator.update(class_client.validator())

            key = Optional(attr_name, 'required class attribute')
            valid_attributes[key] = [client_validator]

        return Schema(valid_attributes, extra=ALLOW_EXTRA)
项目:hass_config    作者:azogue    | 项目源码 | 文件源码
def _get_sensor_state(self, entity_id, remote_states=None):

        def _opt_float(x: str) -> Optional[float]:
            try:
                return float(x)
            except ValueError:
                return None

        value = None
        if remote_states is not None:
            value = [_opt_float(s.state) for s in
                     filter(lambda x: x.entity_id == entity_id,
                            remote_states)]
            if value:
                value = value[0]
        else:
            sensor = self.hass.states.get(entity_id)
            if sensor is not None:
                value = _opt_float(sensor.state)
        return value
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def run_information(point_in_time: Optional[datetime]=None):
    """Return information about current run.

    There is also the run that covers point_in_time.
    """
    _verify_instance()

    recorder_runs = get_model('RecorderRuns')
    if point_in_time is None or point_in_time > _INSTANCE.recording_start:
        return recorder_runs(
            end=None,
            start=_INSTANCE.recording_start,
            closed_incorrect=False)

    return query('RecorderRuns').filter(
        (recorder_runs.start < point_in_time) &
        (recorder_runs.end > point_in_time)).first()
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def __init__(self, data):
        """
        Initialize with preparsed data object.

        :param ConfigParser data: config file accessor
        """
        self._data = data or {}

        # self.latitude = None  # type: Optional[float]
        # self.longitude = None  # type: Optional[float]
        # self.elevation = None  # type: Optional[int]
        # self.location_name = None  # type: Optional[str]
        # self.time_zone = None  # type: Optional[str]
        # self.units = METRIC_SYSTEM  # type: UnitSystem

        # # If True, pip install is skipped for requirements on startup
        # self.skip_pip = False  # type: bool

        # # List of loaded components
        # self.components = set()

        # # Remote.API object pointing at local API
        # self.api = None

        # # Directory that holds the configuration
        # self.config_dir = None

        self._scarlett_name = None
        self._coordinates = None
        self._longitude = None
        self._latitude = None
        self._pocketsphinx = None
        self._elevation = None
        self._unit_system = None
        self._time_zone = None
        self._owner_name = None
        self._keyword_list = None
        self._features_enabled = None
        # self._units = METRIC_SYSTEM  # type: UnitSystem
项目:aioautomatic    作者:armills    | 项目源码 | 文件源码
def opt(key):
    """Create an optional key that returns a default of None."""
    return vol.Optional(key, default=None)
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def validate_config(_config):
        iptables_sql_schema = voluptuous.Schema({
            "module": voluptuous.And(basestring, vu.NoSpaceCharacter()),
            voluptuous.Optional("db_name"): voluptuous.And(
                basestring, vu.NoSpaceCharacter()),
        }, required=True)
        return iptables_sql_schema(_config)
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def validate_config(_config):
        source_schema = voluptuous.Schema({
            "module": voluptuous.And(basestring, vu.NoSpaceCharacter()),
            "params": {
                "host": voluptuous.And(basestring, vu.NoSpaceCharacter()),
                "port": int,
                "model": {
                    "name": voluptuous.And(basestring, vu.NoSpaceCharacter()),
                    "params": {
                        "origin_types": voluptuous.And([
                            {
                                "origin_type": voluptuous.And(
                                    basestring, vu.NoSpaceCharacter()),
                                "weight": voluptuous.And(
                                    voluptuous.Or(int, float),
                                    voluptuous.Range(
                                        min=0, min_included=False)),
                            }
                        ], vu.NotEmptyArray()),
                        voluptuous.Optional("key_causes"): dict
                    }
                },
                "alerts_per_burst": voluptuous.And(
                    int, voluptuous.Range(min=1)),
                "idle_time_between_bursts": voluptuous.And(
                    voluptuous.Or(int, float),
                    voluptuous.Range(min=0, min_included=False))
            }
        }, required=True)
        return source_schema(_config)
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def validate_config(_config):
        base_schema = voluptuous.Schema({
            "module": voluptuous.And(
                basestring, lambda i: not any(c.isspace() for c in i)),
            voluptuous.Optional("db_name"): voluptuous.And(
                basestring, lambda i: not any(c.isspace() for c in i)),
        }, required=True)
        return base_schema(_config)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def schema(self):
        if self.required:
            return {self.name: self.schema_ext}
        else:
            return {voluptuous.Optional(self.name): self.schema_ext}
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def ResourceSchema(schema):
    base_schema = {
        voluptuous.Optional('started_at'): utils.to_datetime,
        voluptuous.Optional('ended_at'): utils.to_datetime,
        voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type),
        voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type),
        voluptuous.Optional('metrics'): MetricsSchema,
    }
    base_schema.update(schema)
    return base_schema
项目:pilight    作者:DavidLP    | 项目源码 | 文件源码
def parse_option(option_string):
    if 'OPTION_NO_VALUE' in option_string:
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        # The options without values seem to still need a value
        # when used with pilight-daemon, but this are not mandatory
        # options
        # E.G.: option 'on' is 'on': 1
        return {vol.Optional(option): vol.Coerce(int)}
    elif 'OPTION_HAS_VALUE' in option_string:
        options = re.findall(r'\"(.*?)\"', option_string)
        option = options[0]
        regex = None
        if len(options) > 1:  # Option has specified value by regex
            regex = options[1]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        elif 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        else:
            raise
    elif 'OPTION_OPT_VALUE' in option_string:
        options = re.findall(r'\"(.*?)\"', option_string)
        option = options[0]
        regex = None
        if len(options) > 1:  # Option has specified value by regex
            regex = options[1]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        elif 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        else:
            raise
    else:
        print(option_string)
        raise

    raise
项目:hass_config    作者:azogue    | 项目源码 | 文件源码
def get_dbt_rh_points(self):
        """Extract temperature - humidity points from sensors."""
        def _mean(values: Union[List[float],
                                Tuple[float]]) -> Optional[float]:
            if values:
                try:
                    return sum(values) / len(values)
                except TypeError:
                    _LOGGER.error('Bad values in mean: %s', values)
            return None

        results = yield from self.collect_states()
        points = {}
        for key, value in results.items():
            if isinstance(value, dict):
                temp_zone = humid_zone = counter = 0
                for k_room, s_values in value.items():
                    temp = _mean([v[0] for v in s_values])
                    humid = _mean([v[1] for v in s_values])
                    if temp is not None and humid is not None:
                        points[k_room] = (int(100 * temp) / 100.,
                                          int(100 * humid) / 100.)
                        temp_zone += temp
                        humid_zone += humid
                        counter += 1
                if counter:
                    points[key] = (int(100 * temp_zone / counter) / 100.,
                                   int(100 * humid_zone / counter) / 100.)
            else:
                temp = _mean([v[0] for v in value])
                humid = _mean([v[1] for v in value])
                if temp is not None and humid is not None:
                    points[key] = (int(100 * temp) / 100.,
                                   int(100 * humid) / 100.)
        return points
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def log_error(e: Exception, retry_wait: Optional[float]=0,
              rollback: Optional[bool]=True,
              message: Optional[str]="Error during query: %s") -> None:
    """Log about SQLAlchemy errors in a sane manner."""
    import sqlalchemy.exc
    if not isinstance(e, sqlalchemy.exc.OperationalError):
        _LOGGER.exception(str(e))
    else:
        _LOGGER.error(message, str(e))
    if rollback:
        Session.rollback()
    if retry_wait:
        _LOGGER.info("Retrying in %s seconds", retry_wait)
        time.sleep(retry_wait)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from YAML configuration file.

    This method is a coroutine.
    """
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string,
                                                                 vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
        vol.Optional('picture', default=None): vol.Any(None, cv.string),
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
            cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): vol.Any(None, cv.string),
    })
    try:
        result = []
        try:
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, device in devices.items():
            try:
                device = dev_schema(device)
                device['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
            else:
                result.append(Device(hass, **device))
        return result
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def _validate_schema(config):
    """Validate the configuration, with spark, up to the orchestration level

    Checks that hte spark configuration is valid, as well as the modules
    structure in the configuration up to the orchestration level.
    Each module will be responsible to validate its own sub-configuration.

    :type config: dict
    :param config: configuration model for the whole system
    :raises: SchemaError -- if the configuration, up to the
             orchestration level, is not valid
    """
    config_schema = voluptuous.Schema({
        "spark_config": {
            "appName": basestring,
            "streaming": {
                "batch_interval": voluptuous.And(int, voluptuous.Range(min=1))
            }
        },
        "server": {
            "port": int,
            "debug": bool
        },
        "sources": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "ingestors": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "smls": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "voters": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "sinks": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "ldps": {
            voluptuous.Optional(basestring): {basestring: object}
        },
        "connections": {
            voluptuous.Optional(basestring): [basestring]
        },
        "feedback": {
            voluptuous.Optional(basestring): [basestring]
        }
    }, required=True)
    return config_schema(config)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def MetricSchema(definition):
        creator = pecan.request.auth_helper.get_current_user(
            pecan.request)

        # First basic validation
        schema = voluptuous.Schema({
            "archive_policy_name": six.text_type,
            "resource_id": functools.partial(ResourceID, creator=creator),
            "name": six.text_type,
            voluptuous.Optional("unit"):
            voluptuous.All(six.text_type, voluptuous.Length(max=31)),
        })
        definition = schema(definition)
        archive_policy_name = definition.get('archive_policy_name')

        name = definition.get('name')
        if name and '/' in name:
            abort(400, "'/' is not supported in metric name")
        if archive_policy_name is None:
            try:
                ap = pecan.request.indexer.get_archive_policy_for_metric(name)
            except indexer.NoArchivePolicyRuleMatch:
                # NOTE(jd) Since this is a schema-like function, we
                # should/could raise ValueError, but if we do so, voluptuous
                # just returns a "invalid value" with no useful message – so we
                # prefer to use abort() to make sure the user has the right
                # error message
                abort(400, "No archive policy name specified "
                      "and no archive policy rule found matching "
                      "the metric name %s" % name)
            else:
                definition['archive_policy_name'] = ap.name

        resource_id = definition.get('resource_id')
        if resource_id is None:
            original_resource_id = None
        else:
            if name is None:
                abort(400,
                      {"cause": "Attribute value error",
                       "detail": "name",
                       "reason": "Name cannot be null "
                       "if resource_id is not null"})
            original_resource_id, resource_id = resource_id

        enforce("create metric", {
            "creator": creator,
            "archive_policy_name": archive_policy_name,
            "resource_id": resource_id,
            "original_resource_id": original_resource_id,
            "name": name,
            "unit": definition.get('unit'),
        })

        return definition