Python voluptuous 模块,MultipleInvalid() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用voluptuous.MultipleInvalid()

项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def validate(schema, data):
    """
    Apply a voluptuous schema to ``data``, translating validation failures.

    Args:
        schema: The voluptuous Schema object (called with ``data``)
        data: The value handed to the schema for validation

    Raises:
        APIException: built as ``APIException(0, None, <voluptuous message>)``
            when the schema raises MultipleInvalid
    """

    try:
        schema(data)
    except MultipleInvalid as invalid:
        message = invalid.msg
        raise APIException(0, None, message)
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def get_problem(problem_path):
    """
    Load and validate the problem spec stored in a problem directory.

    Args:
        problem_path: path to the root of the problem directory.

    Returns:
        The parsed contents of ``problem.json`` (the problem object).

    Raises:
        FatalException: if the parsed object fails ``problem_schema``.
    """

    json_path = join(problem_path, "problem.json")

    # A context manager closes the file handle deterministically; the
    # previous open(...).read() left that to the garbage collector.
    with open(json_path, "r") as problem_file:
        problem = json.loads(problem_file.read())

    try:
        problem_schema(problem)
    except MultipleInvalid as e:
        logger.critical("Error validating problem object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return problem
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def validate(schema, data):
    """
    Validate ``data`` with a voluptuous schema and re-raise failures for the API.

    Args:
        schema: The voluptuous Schema object (called with ``data``)
        data: The value handed to the schema for validation

    Raises:
        APIException: built as ``APIException(0, None, <voluptuous message>)``
            when the schema raises MultipleInvalid
    """

    try:
        schema(data)
    except MultipleInvalid as failure:
        reason = failure.msg
        raise APIException(0, None, reason)
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def get_problem(problem_path):
    """
    Read ``problem.json`` from a problem directory and validate it.

    Args:
        problem_path: path to the root of the problem directory.

    Returns:
        The parsed problem object.

    Raises:
        FatalException: if the parsed object fails ``problem_schema``.
    """

    json_path = join(problem_path, "problem.json")

    # Close the file as soon as it has been read instead of leaking the
    # handle returned by a bare open(...).read().
    with open(json_path, "r") as problem_file:
        problem = json.loads(problem_file.read())

    try:
        problem_schema(problem)
    except MultipleInvalid as e:
        logger.critical("Error validating problem object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return problem
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def get_bundle(bundle_path):
    """
    Read ``bundle.json`` from a bundle directory and validate it.

    Args:
        bundle_path: path to the root of the bundle directory.

    Returns:
        The parsed bundle object.

    Raises:
        FatalException: if the parsed object fails ``bundle_schema``.
    """

    json_path = join(bundle_path, "bundle.json")

    # Close the file as soon as it has been read instead of leaking the
    # handle returned by a bare open(...).read().
    with open(json_path, "r") as bundle_file:
        bundle = json.loads(bundle_file.read())

    try:
        bundle_schema(bundle)
    except MultipleInvalid as e:
        logger.critical("Error validating bundle object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return bundle
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def validate(schema, data):
    """
    Call a voluptuous schema on ``data`` and convert its failure into an API error.

    Args:
        schema: The voluptuous Schema object (called with ``data``)
        data: The value handed to the schema for validation

    Raises:
        APIException: built as ``APIException(0, None, <voluptuous message>)``
            when the schema raises MultipleInvalid
    """

    try:
        schema(data)
    except MultipleInvalid as problem:
        detail = problem.msg
        raise APIException(0, None, detail)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_entity_ids():
    """Exercise cv.entity_ids with rejected, accepted and normalised inputs."""
    validator = vol.Schema(cv.entity_ids)

    # Inputs the schema is expected to refuse.
    rejected = (
        'invalid_entity',
        'sensor.light,sensor_invalid',
        ['invalid_entity'],
        ['sensor.light', 'sensor_invalid'],
        ['sensor.light,sensor_invalid'],
    )
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    # Inputs that must pass without raising.
    accepted = ([], ['sensor.light'], 'sensor.light')
    for candidate in accepted:
        validator(candidate)

    # A comma-separated string is expected back as a lower-cased, stripped list.
    normalised = validator('sensor.LIGHT, light.kitchen ')
    assert normalised == ['sensor.light', 'light.kitchen']
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_time_period():
    """Exercise cv.time_period with rejected, accepted and converted inputs."""
    validator = vol.Schema(cv.time_period)

    # Inputs the schema is expected to refuse.
    rejected = (
        None, '', 'hello:world', '12:', '12:34:56:78',
        {}, {'wrong_key': -10},
    )
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    # Inputs that must pass without raising.
    accepted = (
        '8:20', '23:59', '-8:20', '-23:59:59', '-48:00',
        {'minutes': 5}, 1, '5',
    )
    for candidate in accepted:
        validator(candidate)

    # Spot-check the resulting timedelta values.
    three_minutes = timedelta(seconds=180)
    assert three_minutes == validator('180')
    almost_a_day = timedelta(hours=23, minutes=59)
    assert almost_a_day == validator('23:59')
    minus_75_minutes = -1 * timedelta(hours=1, minutes=15)
    assert minus_75_minutes == validator('-1:15')
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def valid_adapter_response(base_name, func_name, data):
    """
        Check adapter output against the schema stored in `Schemas`

        Falsy data is accepted without any schema lookup.

        :param str base_name: First-level key into `Schemas`
        :param str func_name: Second-level key into `Schemas`
        :param raw data: The data to validate
        :raises SchemaNotFound: if looking up or calling the schema raises
            KeyError, TypeError or ValueError
        :raises InvalidFormatError: if data is not compliant
    """
    if not data:
        return

    target = (base_name, func_name)
    try:
        schema = Schemas[base_name][func_name]
        schema(data)
    except (KeyError, TypeError, ValueError):
        raise SchemaNotFound('Schema not found for %s.%s' % target)
    except (Invalid, MultipleInvalid) as err:
        raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % (base_name, func_name, str(err)))
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def validate_body(schema_desc):
    """
        Decorator factory validating the JSON request body

        The schema built from `schema_desc` is stored in `Schemas` under the
        decorated function's name the first time the wrapped function runs.
        A body that fails validation results in `BadRequest`.
    """
    def decorate(handler):
        @wraps(handler)
        def checked(*args, **kwargs):
            try:
                payload = request.get_json()
                schema_key = handler.__name__
                # Build and register the schema lazily, on first use
                if not Schemas.get(schema_key):
                    Schemas[schema_key] = Schema(schema_desc, required=True)
                    Logger.debug(unicode('registering schema for %s' % (schema_key)))
                Schemas[schema_key](payload)
            except (Invalid, MultipleInvalid) as err:
                Logger.error(unicode(err))
                raise BadRequest('Missing or invalid field(s) in body, expecting {}'.format(schema_desc))
            return handler(*args, **kwargs)
        return checked
    return decorate
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_cleanup_arch(self):
        """Delete the architectures listed in the 'cleanup-architecture' section.

        Entries that fail validation are logged and skipped.  When the lookup
        or the deletion raises, the architecture is logged as already absent.
        """
        log.log(log.LOG_INFO, "Processing Cleanup of Architectures")
        for arch in self.get_config_section('cleanup-architecture'):
            try:
                self.validator.cleanup_arch(arch)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot delete Architecture '{0}': YAML validation Error: {1}".format(arch['name'], e))
                continue

            try:
                # The result is discarded: the lookup only serves as an
                # existence probe (presumably raises when nothing is found).
                self.fm.architectures.show(arch['name'])['id']
                log.log(log.LOG_INFO, "Delete Architecture '{0}'".format(arch['name']))

                self.fm.architectures.destroy( arch['name'] )
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                log.log(log.LOG_WARN, "Architecture '{0}' already absent.".format(arch['name']))
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_cleanup_computeprfl(self):
        """Delete the compute profiles listed in the 'cleanup-compute-profile' section.

        Entries that fail validation are logged and skipped.  When the lookup
        or the deletion raises, the profile is logged as already absent.
        """
        log.log(log.LOG_INFO, "Processing Cleanup of Compute profiles")
        for computeprfl in self.get_config_section('cleanup-compute-profile'):
            try:
                self.validator.cleanup_computeprfl(computeprfl)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot delete Compute profile '{0}': YAML validation Error: {1}".format(computeprfl['name'], e))
                continue

            try:
                # The result is discarded: the lookup only serves as an
                # existence probe (presumably raises when nothing is found).
                self.fm.compute_profiles.show(computeprfl['name'])['id']
                log.log(log.LOG_INFO, "Delete Compute profile '{0}'".format(computeprfl['name']))

                self.fm.compute_profiles.destroy( computeprfl['name'] )
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                log.log(log.LOG_WARN, "Compute profile '{0}' already absent.".format(computeprfl['name']))
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_cleanup_medium(self):
        """Delete the media listed in the 'cleanup-medium' config section.

        The media list is fetched once up front; every configured medium is
        matched against it by name.  Entries that fail validation are logged
        and skipped; entries with no match are logged as already absent.
        """
        log.log(log.LOG_INFO, "Processing Cleanup of Media")
        known_media = self.fm.media.index(per_page=99999)['results']
        for medium in self.get_config_section('cleanup-medium'):
            try:
                self.validator.cleanup_medium(medium)
            except MultipleInvalid as err:
                log.log(log.LOG_WARN, "Cannot delete Medium '{0}': YAML validation Error: {1}".format(medium['name'], err))
                continue

            # fm.media.show(name) does not work, so scan the fm.media.index() results instead
            found = False
            for existing in known_media:
                if existing['name'] != medium['name']:
                    continue
                found = True
                log.log(log.LOG_INFO, "Delete Medium '{0}'".format(medium['name']))

                self.fm.media.destroy( medium['name'] )
            if not found:
                log.log(log.LOG_WARN, "Medium '{0}' already absent.".format(medium['name']))
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_cleanup_ptable(self):
        """Delete the partition tables listed in the 'cleanup-partition-table' section.

        Entries that fail validation are logged and skipped.  When the lookup
        or the deletion raises, the partition table is logged as already absent.
        """
        log.log(log.LOG_INFO, "Processing Cleanup of Partition Tables")
        for ptable in self.get_config_section('cleanup-partition-table'):
            try:
                self.validator.cleanup_ptable(ptable)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot delete Partition Table '{0}': YAML validation Error: {1}".format(ptable['name'], e))
                continue

            try:
                # The result is discarded: the lookup only serves as an
                # existence probe (presumably raises when nothing is found).
                self.fm.ptables.show(ptable['name'])['id']
                log.log(log.LOG_INFO, "Delete Partition Table '{0}'".format(ptable['name']))

                self.fm.ptables.destroy( ptable['name'] )
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                log.log(log.LOG_WARN, "Partition Table '{0}' already absent.".format(ptable['name']))
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_cleanup_provisioningtpl(self):
        """Delete the templates listed in the 'cleanup-provisioning-template' section.

        The template list is fetched once up front; every configured template
        is matched against it by name.  Entries that fail validation are
        logged and skipped; entries with no match are logged as already absent.
        """
        log.log(log.LOG_INFO, "Processing Cleanup of Provisioning Templates")
        known_templates = self.fm.provisioning_templates.index(per_page=99999)['results']
        for pt in self.get_config_section('cleanup-provisioning-template'):
            try:
                self.validator.cleanup_provt(pt)
            except MultipleInvalid as err:
                log.log(log.LOG_WARN, "Cannot delete Provisioning Template '{0}': YAML validation Error: {1}".format(pt['name'], err))
                continue

            # fm.provisioning_templates.show(name) does not work as expected,
            # so scan the fm.provisioning_templates.index() results instead
            found = False
            for existing in known_templates:
                if existing['name'] != pt['name']:
                    continue
                found = True
                log.log(log.LOG_INFO, "Delete Provisioning Template '{0}'".format(pt['name']))

                self.fm.provisioning_templates.destroy( pt['name'] )
            if not found:
                log.log(log.LOG_WARN, "Provisioning Template '{0}' already absent.".format(pt['name']))
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_config_enviroment(self):
        """Create the environments from the 'environment' section that are missing.

        The environment list is fetched once up front and matched by name.
        Entries that fail validation are logged and skipped.
        """
        log.log(log.LOG_INFO, "Processing Environments")
        known_envs = self.fm.environments.index(per_page=99999)['results']
        for env in self.get_config_section('environment'):
            try:
                self.validator.enviroment(env)
            except MultipleInvalid as err:
                log.log(log.LOG_WARN, "Cannot create Environment '{0}': YAML validation Error: {1}".format(env['name'], err))
                continue

            # Look the environment up by scanning the index() results
            env_id = False
            for existing in known_envs:
                if env['name'] != existing['name']:
                    continue
                env_id = existing['id']
                log.log(log.LOG_DEBUG, "Environment '{0}' (id={1}) already present.".format(env['name'], env_id))
            if env_id:
                continue
            log.log(log.LOG_INFO, "Create Environment '{0}'".format(env['name']))
            self.fm.environments.create( environment = { 'name': env['name'] } )
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_config_model(self):
        """Create the hardware models from the 'model' section that are missing.

        Entries that fail validation are logged and skipped.  A model is
        created whenever looking it up by name raises.
        """
        log.log(log.LOG_INFO, "Processing Models")
        for model in self.get_config_section('model'):
            try:
                self.validator.model(model)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot create Model '{0}': YAML validation Error: {1}".format(model['name'], e))
                continue
            try:
                model_id = self.fm.models.show(model['name'])['id']
                log.log(log.LOG_DEBUG, "Model '{0}' (id={1}) already present.".format(model['name'], model_id))
            except Exception:
                # Narrowed from a bare `except:`: an interrupt (KeyboardInterrupt,
                # SystemExit) during the lookup must not trigger a create.
                log.log(log.LOG_INFO, "Create Model '{0}'".format(model['name']))
                model_tpl = {
                    'name':             model['name'],
                    'info':             model['info'],
                    'vendor_class':     model['vendor-class'],
                    'hardware_model':   model['hardware-model']
                }
                self.fm.models.create( model = model_tpl )
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_config_medium(self):
        """Create the media from the 'medium' config section that are missing.

        The media list is fetched once up front and matched by name.
        Entries that fail validation are logged and skipped.
        """
        log.log(log.LOG_INFO, "Processing Media")
        known_media = self.fm.media.index(per_page=99999)['results']
        for medium in self.get_config_section('medium'):
            try:
                self.validator.medium(medium)
            except MultipleInvalid as err:
                log.log(log.LOG_WARN, "Cannot create Media '{0}': YAML validation Error: {1}".format(medium['name'], err))
                continue

            # fm.media.show(name) does not work, so scan the fm.media.index() results instead
            medium_id = False
            for existing in known_media:
                if existing['name'] != medium['name']:
                    continue
                medium_id = existing['id']
                log.log(log.LOG_DEBUG, "Medium '{0}' (id={1}) already present.".format(medium['name'], medium_id))
            if medium_id:
                continue
            log.log(log.LOG_INFO, "Create Medium '{0}'".format(medium['name']))
            self.fm.media.create( medium = {
                'name':        medium['name'],
                'path':        medium['path'],
                'os_family':   medium['os-family']
            } )
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_config_settings(self):
        """Update the Foreman settings listed in the 'setting' config section.

        Entries that fail validation are logged and skipped.  A setting whose
        ID cannot be looked up is logged and left untouched.
        """
        log.log(log.LOG_INFO, "Processing Foreman Settings")
        for setting in self.get_config_section('setting'):
            try:
                self.validator.setting(setting)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot update Setting '{0}': YAML validation Error: {1}".format(setting['name'], e))
                continue

            setting_id = False
            try:
                setting_id = self.fm.settings.show(setting['name'])['id']
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                log.log(log.LOG_WARN, "Cannot get ID of Setting '{0}', skipping".format(setting['name']))

            setting_tpl = {
                'value':            setting['value']
            }

            # Only settings whose ID was resolved get updated
            if setting_id:
                log.log(log.LOG_INFO, "Update Setting '{0}'".format(setting['name']))
                self.fm.settings.update(setting_tpl, setting_id)
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_auth_sources_ldap(self):
        """Create the LDAP auth sources listed in the 'auth-source-ldap' section.

        Entries that fail validation are logged and skipped, as are sources
        whose lookup by name succeeds.  A failed creation is logged as an
        error and processing continues with the next entry.
        """
        log.log(log.LOG_INFO, "Processing LDAP auth sources")
        for auth in self.get_config_section('auth-source-ldap'):
            # validate yaml
            try:
                self.validator.auth_source_ldaps(auth)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot create LDAP source '{0}': YAML validation Error: {1}".format(auth['name'], e))
                continue
            try:
                # Existence probe only: the ID itself is not needed (the
                # previously bound `as_id` local was never read).
                self.fm.auth_source_ldaps.show(auth['name'])['id']
                log.log(log.LOG_WARN, "LDAP source {0} allready exists".format(auth['name']))
                continue
            except TypeError:
                # Subscripting the lookup result failed; presumably the source
                # does not exist yet -- fall through and create it.
                pass
            ldap_auth_obj = self.dict_underscore(auth)
            try:
                self.fm.auth_source_ldaps.create( auth_source_ldap=ldap_auth_obj )
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                log.log(log.LOG_ERROR, "Something went wrong creating LDAP source {0}".format(auth['name']))
项目:voat    作者:Cightline    | 项目源码 | 文件源码
def get(self):
        """Return the posts of the subvoat named by the 'subvoat_name' argument.

        Returns ``{'error': ...}`` when the name fails schema validation,
        otherwise ``{'result': [...]}`` with the posts from ``subvoat_utils``.
        """
        parser = reqparse.RequestParser()
        parser.add_argument('subvoat_name')
        args = parser.parse_args()

        # The name must be a str of at least the configured minimum length
        name_key = Required('subvoat_name')
        name_rule = All(str, Length(min=self.config['min_length_subvoat_name']))
        schema = Schema({name_key: name_rule})

        try:
            schema({'subvoat_name': args.get('subvoat_name')})
        except MultipleInvalid as err:
            return {'error': '%s %s' % (err.msg, err.path)}

        posts = self.subvoat_utils.get_posts(args['subvoat_name'])
        return {'result': [post for post in posts]}
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_entity_ids():
    """Check cv.entity_ids against rejected, accepted and normalised inputs."""
    check = vol.Schema(cv.entity_ids)

    # Inputs the schema is expected to refuse.
    bad_inputs = (
        'invalid_entity',
        'sensor.light,sensor_invalid',
        ['invalid_entity'],
        ['sensor.light', 'sensor_invalid'],
        ['sensor.light,sensor_invalid'],
    )
    for item in bad_inputs:
        with pytest.raises(vol.MultipleInvalid):
            check(item)

    # Inputs that must pass without raising.
    good_inputs = ([], ['sensor.light'], 'sensor.light')
    for item in good_inputs:
        check(item)

    # A comma-separated string is expected back as a lower-cased, stripped list.
    result = check('sensor.LIGHT, light.kitchen ')
    assert result == ['sensor.light', 'light.kitchen']
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_event_schema():
    """Check cv.EVENT_SCHEMA against rejected and accepted event definitions."""
    # Definitions the schema is expected to refuse.
    rejected = (
        {},
        None,
        {'event_data': {}},
        {'event': 'state_changed', 'event_data': 1},
    )
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            cv.EVENT_SCHEMA(candidate)

    # Definitions that must pass without raising.
    accepted = (
        {'event': 'state_changed'},
        {'event': 'state_changed', 'event_data': {'hello': 'world'}},
    )
    for candidate in accepted:
        cv.EVENT_SCHEMA(candidate)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_core_config_schema(self):
        """Check CORE_CONFIG_SCHEMA against rejected configs and one valid config."""
        # Each of these single-key configs is expected to be refused.
        rejected = (
            {CONF_UNIT_SYSTEM: 'K'},
            {'time_zone': 'non-exist'},
            {'latitude': '91'},
            {'longitude': -181},
            {'customize': 'bla'},
            {'customize': {'invalid_entity_id': {}}},
            {'customize': {'light.sensor': 100}},
        )
        for candidate in rejected:
            with pytest.raises(MultipleInvalid):
                config_util.CORE_CONFIG_SCHEMA(candidate)

        # A complete config that must pass without raising.
        valid_config = {
            'name': 'Test name',
            'latitude': '-23.45',
            'longitude': '123.45',
            CONF_UNIT_SYSTEM: CONF_UNIT_SYSTEM_METRIC,
            'customize': {
                'sensor.temperature': {
                    'hidden': True,
                },
            },
        }
        config_util.CORE_CONFIG_SCHEMA(valid_config)
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def get_bundle(bundle_path):
    """
    Load and validate the bundle spec stored in a bundle directory.

    Args:
        bundle_path: path to the root of the bundle directory.

    Returns:
        The parsed contents of ``bundle.json`` (the bundle object).

    Raises:
        FatalException: if the parsed object fails ``bundle_schema``.
    """

    json_path = join(bundle_path, "bundle.json")

    # A context manager closes the file handle deterministically; the
    # previous open(...).read() left that to the garbage collector.
    with open(json_path, "r") as bundle_file:
        bundle = json.loads(bundle_file.read())

    try:
        bundle_schema(bundle)
    except MultipleInvalid as e:
        logger.critical("Error validating bundle object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return bundle
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def verify_config(config_object):
    """
    Verifies the given configuration dict against the config_schema and the port_range_schema
    Raise FatalException if failed.

    Args:
        config_object: The configuration options in a dict

    Raises:
        FatalException: if the config or any entry of ``banned_ports`` fails
            validation, or if a port range has ``start`` greater than ``end``.
    """

    try:
        config_schema(config_object)
    except MultipleInvalid as e:
        # NOTE(review): `path` is neither a parameter nor a local here;
        # presumably a module-level name -- confirm it is defined.
        logger.critical("Error validating config file at '%s'!", path)
        logger.critical(e)
        raise FatalException

    for port_range in config_object["banned_ports"]:
        try:
            port_range_schema(port_range)
        except MultipleInvalid as e:
            logger.critical("Error validating port range in config file at '%s'!", path)
            logger.critical(e)
            raise FatalException

        # Explicit check instead of `assert`: assert statements are removed
        # when Python runs with -O, which would silently skip this validation.
        if not port_range["start"] <= port_range["end"]:
            logger.critical("Invalid port range: (%d -> %d)", port_range["start"], port_range["end"])
            raise FatalException
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def verify_config(config_object):
    """
    Verifies the given configuration dict against the config_schema and the port_range_schema
    Raise FatalException if failed.

    Args:
        config_object: The configuration options in a dict

    Raises:
        FatalException: if the config or any entry of ``banned_ports`` fails
            validation, or if a port range has ``start`` greater than ``end``.
    """

    try:
        config_schema(config_object)
    except MultipleInvalid as e:
        # NOTE(review): `path` is neither a parameter nor a local here;
        # presumably a module-level name -- confirm it is defined.
        logger.critical("Error validating config file at '%s'!", path)
        logger.critical(e)
        raise FatalException

    for port_range in config_object["banned_ports"]:
        try:
            port_range_schema(port_range)
        except MultipleInvalid as e:
            logger.critical("Error validating port range in config file at '%s'!", path)
            logger.critical(e)
            raise FatalException

        # Explicit check instead of `assert`: assert statements are removed
        # when Python runs with -O, which would silently skip this validation.
        if not port_range["start"] <= port_range["end"]:
            logger.critical("Invalid port range: (%d -> %d)", port_range["start"], port_range["end"])
            raise FatalException
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def __init__(self, **kargs):
        """Initialise the entity from keyword arguments and validate it at once.

        After ``self.validate()`` runs, ``self.error`` is raised: an
        ``Invalid`` is re-raised wrapped in ``MultipleInvalid``, while an
        ``AttributeError`` (e.g. no ``error`` attribute) is ignored.
        """
        super(EntityBase, self).__init__(kargs)
        self.validate()

        try:
            # Raising the attribute routes both "attribute missing" and
            # "Invalid stored" to the handlers below.
            raise self.error
        except AttributeError:
            pass
        except Invalid as invalid:
            raise MultipleInvalid([invalid])
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_products(self):
        """Building a Produto with codigo='abc' must raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            Produto(codigo='abc')
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_coletas(self):
        """Building a ColetaSimultanea with codigo='abc' must raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            ColetaSimultanea(codigo='abc')
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_objetos(self):
        """Building an Objeto with id='1' or item=None must raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            Objeto(id='1')
        with self.assertRaises(MultipleInvalid):
            Objeto(item=None)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_coletas(self):
        """Building a Coleta with codigo='abc' must raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            Coleta(codigo='abc')
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def __init__(self, **kargs):
        """Build the entity from keyword arguments and validate it immediately.

        After ``self.validate()`` runs, ``self.error`` is raised: an
        ``Invalid`` is re-raised wrapped in ``MultipleInvalid``, while an
        ``AttributeError`` (e.g. no ``error`` attribute) is ignored.
        """
        super(EntityBase, self).__init__(kargs)
        self.validate()

        try:
            # Raising the attribute routes both "attribute missing" and
            # "Invalid stored" to the handlers below.
            raise self.error
        except AttributeError:
            pass
        except Invalid as failure:
            raise MultipleInvalid([failure])
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_products(self):
        """Constructing Produto(codigo='abc') is expected to raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            Produto(codigo='abc')
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_coletas(self):
        """Constructing ColetaSimultanea(codigo='abc') is expected to raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            ColetaSimultanea(codigo='abc')
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_objetos(self):
        """Constructing Objeto(id='1') or Objeto(item=None) is expected to raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            Objeto(id='1')
        with self.assertRaises(MultipleInvalid):
            Objeto(item=None)
项目:correios-lib    作者:trocafone    | 项目源码 | 文件源码
def test_invalid_coletas(self):
        """Constructing Coleta(codigo='abc') is expected to raise MultipleInvalid."""
        with self.assertRaises(MultipleInvalid):
            Coleta(codigo='abc')
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_boolean():
    """Check cv.boolean against rejected, truthy and falsy spellings."""
    validator = vol.Schema(cv.boolean)

    unrecognised = ('T', 'negative', 'lock')
    truthy = ('true', 'On', '1', 'YES', 'enable', 1, True)
    falsy = ('false', 'Off', '0', 'NO', 'disable', 0, False)

    for candidate in unrecognised:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    for candidate in truthy:
        assert validator(candidate)

    for candidate in falsy:
        assert not validator(candidate)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_latitude():
    """Check cv.latitude against rejected and accepted values."""
    validator = vol.Schema(cv.latitude)

    rejected = ('invalid', None, -91, 91, '-91', '91', '123.01A')
    accepted = ('-89', 89, '12.34')

    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    for candidate in accepted:
        validator(candidate)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_longitude():
    """Check cv.longitude against rejected and accepted values."""
    validator = vol.Schema(cv.longitude)

    rejected = ('invalid', None, -181, 181, '-181', '181', '123.01A')
    accepted = ('-179', 179, '12.34')

    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    for candidate in accepted:
        validator(candidate)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_port():
    """Check cv.port (TCP/UDP network port) against rejected and accepted values."""
    validator = vol.Schema(cv.port)

    rejected = ('invalid', None, -1, 0, 80000, '81000')
    accepted = ('1000', 21, 24574)

    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    for candidate in accepted:
        validator(candidate)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_url():
    """Check cv.url against rejected values and accepted URLs."""
    validator = vol.Schema(cv.url)

    rejected = (
        'invalid', None, 100, 'htp://ha.io', 'http//ha.io',
        'http://??,**', 'https://??,**',
    )
    accepted = (
        'http://localhost', 'https://localhost/test/index.html',
        'http://home-assistant.io', 'http://home-assistant.io/test/',
        'https://community.home-assistant.io/',
    )

    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    # Accepted URLs must also validate to a truthy result.
    for candidate in accepted:
        assert validator(candidate)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_icon():
    """Check cv.icon against rejected values and one accepted icon name."""
    validator = vol.Schema(cv.icon)

    rejected = (False, 'work', 'icon:work')
    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    validator('mdi:work')
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_service():
    """Check cv.service against one rejected and one accepted service name."""
    validator = vol.Schema(cv.service)

    rejected_name = 'invalid_turn_on'
    with pytest.raises(vol.MultipleInvalid):
        validator(rejected_name)

    accepted_name = 'scarlett_os.turn_on'
    validator(accepted_name)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_slug():
    """Check cv.slug against rejected and accepted values."""
    validator = vol.Schema(cv.slug)

    rejected = (None, 'hello world')
    accepted = (12345, 'hello')

    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    for candidate in accepted:
        validator(candidate)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_temperature_unit():
    """Check cv.temperature_unit: 'K' is rejected, 'C' and 'F' pass."""
    validator = vol.Schema(cv.temperature_unit)

    with pytest.raises(vol.MultipleInvalid):
        validator('K')

    for unit in ('C', 'F'):
        validator(unit)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_time_zone():
    """Check cv.time_zone: an unknown zone is rejected, known zones pass."""
    validator = vol.Schema(cv.time_zone)

    with pytest.raises(vol.MultipleInvalid):
        validator('America/Do_Not_Exist')

    for zone in ('America/Los_Angeles', 'UTC'):
        validator(zone)
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_has_at_least_one_key():
    """Check the has_at_least_one_key validator built for 'beer' and 'soda'."""
    validator = vol.Schema(cv.has_at_least_one_key('beer', 'soda'))

    rejected = (None, [], {}, {'wine': None})
    accepted = ({'beer': None}, {'soda': None})

    for candidate in rejected:
        with pytest.raises(vol.MultipleInvalid):
            validator(candidate)

    for candidate in accepted:
        validator(candidate)
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_config_domain(self):
        """Create the domains from the 'domain' config section that are missing.

        Entries that fail validation are logged and skipped.  A domain is
        created whenever looking it up by name raises; its parameters, if
        any, are attached with a follow-up update call.
        """
        log.log(log.LOG_INFO, "Processing Domains")
        for domain in self.get_config_section('domain'):
            try:
                self.validator.domain(domain)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot create Domain '{0}': YAML validation Error: {1}".format(domain['name'], e))
                continue
            try:
                dom_id = self.fm.domains.show(domain['name'])['id']
                log.log(log.LOG_DEBUG, "Domain '{0}' (id={1}) already present.".format(domain['name'], dom_id))
            except Exception:
                # Narrowed from a bare `except:`: an interrupt (KeyboardInterrupt,
                # SystemExit) during the lookup must not trigger a create.
                dns_proxy_id = False
                try:
                    dns_proxy_id = self.fm.smart_proxies.show(domain['dns-proxy'])['id']
                except Exception:
                    # Also narrowed from a bare `except:`; the domain is
                    # still created, just without a DNS proxy.
                    log.log(log.LOG_WARN, "Cannot get ID of DNS Smart Proxy '{0}', skipping".format(domain['dns-proxy']))

                log.log(log.LOG_INFO, "Create Domain '{0}'".format(domain['name']))
                dom_params = []
                if (domain['parameters']):
                    for name,value in domain['parameters'].iteritems():
                        p = {
                            'name':     name,
                            'value':    value
                        }
                        dom_params.append(p)
                dom_tpl = {
                    'name': domain['name'],
                    'fullname': domain['fullname'],
                }
                fixdom = {
                    'domain_parameters_attributes': dom_params
                }

                if dns_proxy_id: dom_tpl['dns_id'] = dns_proxy_id

                domo = self.fm.domains.create( domain = dom_tpl )
                # Parameters are sent in a second request against the new domain
                if dom_params:
                    self.fm.domains.update(fixdom, domo['id'])
项目:foreman-yml    作者:adfinis-sygroup    | 项目源码 | 文件源码
def process_config_os(self):
        """Create the operating systems from the 'os' config section that are missing.

        Entries that fail validation are logged and skipped.  An operating
        system is created whenever looking it up by description raises; its
        parameters, if any, are then added one by one.
        """
        log.log(log.LOG_INFO, "Processing Operating Systems")
        for operatingsystem in self.get_config_section('os'):
            try:
                self.validator.os(operatingsystem)
            except MultipleInvalid as e:
                log.log(log.LOG_WARN, "Cannot create Operating System '{0}': YAML validation Error: {1}".format(operatingsystem['name'], e))
                continue
            try:
                # Note: the lookup uses the description, not the name
                os_id = self.fm.operatingsystems.show(operatingsystem['description'])['id']
                log.log(log.LOG_DEBUG, "Operating System '{0}' (id={1}) already present.".format(operatingsystem['name'], os_id))
            except Exception:
                # Narrowed from a bare `except:`: an interrupt (KeyboardInterrupt,
                # SystemExit) during the lookup must not trigger a create.
                log.log(log.LOG_INFO, "Create Operating System '{0}'".format(operatingsystem['name']))
                os_tpl = {
                    'name':             operatingsystem['name'],
                    'description':      operatingsystem['description'],
                    'major':            operatingsystem['major'],
                    'minor':            operatingsystem['minor'],
                    'family':           operatingsystem['family'],
                    'release_name':     operatingsystem['release-name'],
                    'password_hash':    operatingsystem['password-hash']
                }
                os_obj = self.fm.operatingsystems.create(operatingsystem=os_tpl)

                #  host_params
                if operatingsystem['parameters'] is not None:
                    for name,value in operatingsystem['parameters'].iteritems():
                        p = {
                            'name':     name,
                            'value':    value
                        }
                        try:
                            self.fm.operatingsystems.parameters_create(os_obj['id'], p )
                        except Exception:
                            # Also narrowed from a bare `except:`; a failed
                            # parameter is logged and the loop continues.
                            log.log(log.LOG_WARN, "Error adding host parameter '{0}'".format(name))