Python jsonschema 模块,SchemaError() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用jsonschema.SchemaError()

项目:pywhdfs    作者:yassineazzouz    | 项目源码 | 文件源码
def __init__(self, path=None):
      """Load the client configuration file and check it against the bundled schema.

      :param path: Path to the JSON configuration file. When falsy, the
        ``WEBHDFS_CONFIG`` environment variable is used and, failing that,
        ``self.default_path``.
      :raises HdfsError: If the file does not exist, or if a ``ParsingError``
        is raised while it is being loaded.
      """
      self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path)
      if osp.exists(self.path):
        try:
          # Read through a context manager so the handle is always closed.
          with open(self.path) as config_file:
            self.config = json.loads(config_file.read())
          self.schema = json.loads(resource_string(__name__, 'resources/config_schema.json'))
          try:
            js.validate(self.config, self.schema)
          except js.ValidationError as e:
            # Validation problems are printed; loading then carries on.
            # The parenthesised form works as a statement on Python 2 and
            # as a function call on Python 3.
            print(e.message)
          except js.SchemaError as e:
            print(e)

        except ParsingError:
          raise HdfsError('Invalid configuration file %r.', self.path)

        _logger.info('Instantiated configuration from %r.', self.path)
      else:
        raise HdfsError('Invalid configuration file %r.', self.path)
项目:export2hdf5    作者:bwrc    | 项目源码 | 文件源码
def validate_config(fname):
    """
    Check a JSON configuration file against the packaged schema.

    Returns ``None`` when the configuration is valid, the validation
    error message when it violates the schema, or the ``SchemaError``
    instance when the schema itself is rejected.
    """
    schema = load_json_file(
        pkg_resources.resource_filename('export2hdf5', "config_schema.json"))
    config = load_json_file(fname)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as invalid_config:
        return invalid_config.message
    except jsonschema.SchemaError as invalid_schema:
        return invalid_schema
    return None
项目:tcex    作者:ThreatConnect-Inc    | 项目源码 | 文件源码
def validate(install_json):
        """Check an install.json file against ``schema`` and print the outcome."""
        try:
            with open(install_json) as handle:
                contents = json.load(handle)
            validate(contents, schema)
            print('{} is valid'.format(install_json))
        except (SchemaError, ValidationError) as err:
            # Both failure kinds are reported with the same message.
            print('{} is invalid "{}"'.format(install_json, err))

    # @staticmethod
    # def _wrap(data):
    #     """Wrap any parameters that contain spaces
#
    #     Returns:
    #         (string): String containing parameters wrapped in double quotes
    #     """
    #     if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0:
    #         data = '"{}"'.format(data)
    #     return data
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def validate(self, descriptor, schema_id):
        """
        Check a descriptor against the schema template named by ``schema_id``.

        :param descriptor: descriptor to validate
        :param schema_id: identifier handed to ``self.load_schema``
        :return: True on success; None when validation fails or the schema
                 is invalid (``self.error_msg`` then holds the error message)
        """
        try:
            template = self.load_schema(schema_id)
            jsonschema.validate(descriptor, template)
        except ValidationError as invalid_descriptor:
            log.error("Failed to validate Descriptor against schema '{}'"
                      .format(schema_id))
            self.error_msg = invalid_descriptor.message
            log.error(invalid_descriptor.message)
            return
        except SchemaError as invalid_schema:
            log.error("Invalid Schema '{}'".format(schema_id))
            self.error_msg = invalid_schema.message
            log.debug(invalid_schema)
            return
        else:
            return True
项目:packetary    作者:openstack    | 项目源码 | 文件源码
def test_validate_invalid_data(self, jsonschema_mock):
        """Check that both jsonschema error kinds surface as ValueError.

        ``validators._validate_data`` is run with the mocked
        ``jsonschema.validate`` raising first a ``ValidationError`` and then
        a ``SchemaError``, once with a path and once without, and the
        resulting message is matched as a regular expression.
        """
        # Raw string: the backslashes are regex escapes for the brackets.
        # In a normal literal they are invalid string escapes, which newer
        # Python versions warn about.
        paths = [(("a", 0), r"\['a'\]\[0\]"), ((), "")]
        for path, details in paths:
            msg = "Invalid data: error."
            if details:
                msg += "\nField: {0}".format(details)
            with self.assertRaisesRegexp(ValueError, msg):
                jsonschema_mock.validate.side_effect = ValidationError(
                    "error", path=path
                )
                validators._validate_data(self.data, self.schema)

            msg = "Invalid schema: error."
            if details:
                msg += "\nField: {0}".format(details)
            with self.assertRaisesRegexp(ValueError, msg):
                jsonschema_mock.validate.side_effect = SchemaError(
                    "error", schema_path=path
                )
                validators._validate_data(self.data, self.schema)
项目:threatconnect-developer-docs    作者:ThreatConnect-Inc    | 项目源码 | 文件源码
def validate(install_json):
        """Check an install.json file against ``schema`` and print the outcome."""
        try:
            with open(install_json) as handle:
                contents = json.load(handle)
            validate(contents, schema)
            print('{} is valid'.format(install_json))
        except (SchemaError, ValidationError) as err:
            # Both failure kinds are reported with the same message.
            print('{} is invalid "{}"'.format(install_json, err))

    # @staticmethod
    # def _wrap(data):
    #     """Wrap any parameters that contain spaces
#
    #     Returns:
    #         (string): String containing parameters wrapped in double quotes
    #     """
    #     if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0:
    #         data = '"{}"'.format(data)
    #     return data
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def validate(self):
        """Validate the data file contents against the JSON schema file.

        :return: ``(True, parsed_data)`` on success, otherwise
                 ``(False, message)`` where the message is prefixed with the
                 kind of failure (``ValidationError``, ``SchemaError`` or
                 ``Unknown Error``).
        """
        data = self.get_data(self.data_file)
        # Read the schema through a context manager so the handle is closed.
        with open(self.schema_file) as schema_file:
            schema = schema_file.read()

        try:
            # Parse the data once and reuse it for validation and the result.
            parsed_data = json.loads(data)
            jsonschema.validate(parsed_data, json.loads(schema))
            return (True, parsed_data)
        except jsonschema.ValidationError as e:
            return (False, "ValidationError: {0}".format(e.message))
        except jsonschema.SchemaError as e:
            return (False, "SchemaError: {0}".format(e))
        except Exception as e:
            # Catch-all boundary: malformed JSON and any other failure are
            # reported to the caller instead of propagating.
            return (False, "Unknown Error: {0}".format(e))
项目:jack    作者:uclmr    | 项目源码 | 文件源码
def main(arg1, arg2):
    """Validate the JSON document at ``arg1`` against the schema at ``arg2``.

    Returns a success message, the validation error message, or the
    ``SchemaError`` instance when the schema itself is rejected.
    """
    with open(arg1) as data_file:
        data = json.load(data_file)
    with open(arg2) as schema_file:
        schema = json.load(schema_file)

    try:
        jsonschema.validate(data, schema)
    except jsonschema.ValidationError as invalid_data:
        return invalid_data.message
    except jsonschema.SchemaError as invalid_schema:
        return invalid_schema
    return 'JSON successfully validated.'
项目:async-pluct    作者:globocom    | 项目源码 | 文件源码
async def is_valid(self):
        """Return True when ``self.data`` validates against the raw schema.

        Declared as a coroutine because the raw schema has to be awaited;
        ``await`` is not allowed inside a plain ``def``.

        :return: False when validation raises ``SchemaError`` or
                 ``ValidationError``, True otherwise.
        """
        # Both URL schemes are resolved through the same request helper.
        handlers = {'https': self.session_request_json,
                    'http': self.session_request_json}
        schema = await self.schema.raw_schema
        resolver = RefResolver.from_schema(schema,
                                           handlers=handlers)
        try:
            validate(self.data, schema, resolver=resolver)
        except (SchemaError, ValidationError):
            return False
        return True
项目:notifiers    作者:liiight    | 项目源码 | 文件源码
def _validate_schema(self, validator: jsonschema.Draft4Validator):
        """
        Check the provider schema for syntax problems.

        :param validator: :class:`jsonschema.Draft4Validator`
        :raises SchemaError: when ``validator.check_schema`` rejects ``self.schema``
        """
        log.debug('validating provider schema')
        try:
            validator.check_schema(self.schema)
        except jsonschema.SchemaError as schema_exc:
            # todo generate custom errors when relevant
            raise SchemaError(
                schema_error=schema_exc.message,
                provider=self.provider_name,
                data=self.schema,
            )
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def validate_json_with_schema(data, schema):
    """Validate ``data`` against ``schema`` and return a list of error messages.

    The list is empty when validation succeeds and holds exactly one
    message when a ``ValidationError`` or ``SchemaError`` is raised.
    """
    try:
        validate(data, schema)
    except ValidationError as validation_error:
        return ['Schema Validation Error! message [{}] does not validate against schema. Error [{}]'
                .format(data, validation_error)]
    except SchemaError as schema_error:
        return ['JSON Parse Error! Could not parse [{}]. Error [{}]'.format(data, schema_error)]
    return []
项目:jamdb    作者:CenterForOpenScience    | 项目源码 | 文件源码
def validate_schema(self, schema):
        """Ensure ``schema`` passes the Draft 4 schema check.

        Raises ``exceptions.InvalidSchema('jsonschema')`` when it does not.
        """
        checker = jsonschema.Draft4Validator
        try:
            checker.check_schema(schema)
        except jsonschema.SchemaError:
            raise exceptions.InvalidSchema('jsonschema')
项目:music_recommend    作者:YeEmrick    | 项目源码 | 文件源码
def check(name, correct_schema):
    """Assert that the schema registered under ``name`` is well formed and equals ``correct_schema``."""
    schema = validator.schema[name]
    validator_cls = validator_for(schema)
    try:
        validator_cls.check_schema(schema)
    except SchemaError:
        schema_ok = False
    else:
        schema_ok = True
    assert schema_ok
    assert schema == correct_schema


# Validate JSON (the original non-ASCII comment text was lost to mis-encoding)
项目:music_recommend    作者:YeEmrick    | 项目源码 | 文件源码
def validate(self, json, schema_name):
        """
        Validate a JSON document against one of the configured schemas.

        :param json: the document to validate
        :param schema_name: key of the schema in ``self.schema``
        :return: (boolean, dict)
                ``(True, err)`` with every ``err`` field empty on success;
                ``(False, err)`` on a validation failure, where
                err = {
                    "key_name": last element of the error path, dots removed,
                    "key_path": remaining error path elements joined with dots,
                    "msg": the validation error message
                }
        :raises SchemaError: if the configured schema is itself invalid
        :raises ValueError: if a ``KeyError`` is raised, e.g. because
                ``schema_name`` is not configured
        :type json: dict
        :type schema_name: str
        :rtype (bool, dict)
        """
        err = {
            "key_name": "",
            "key_path": "",
            "msg": ""
        }
        try:
            jsonschema.validate(json, self.schema[schema_name], format_checker=jsonschema.FormatChecker())
            return True, err
        except ValidationError as e:
            key_name = ""
            if e.path:
                tail = e.path.pop()
                # Array indexes appear in the path as integers, which have
                # no .replace(); only string keys get their dots stripped.
                key_name = str(tail) if isinstance(tail, int) else tail.replace(".", "")
            key_path = str(list(e.path)).replace("[", "").replace("]", "").replace(", ", ".").replace("'", "")
            msg = e.message
            err['key_name'] = key_name
            err['key_path'] = key_path
            err['msg'] = msg
            return False, err
        except SchemaError:
            raise SchemaError("invalid schema in validator[%s], please checking" % schema_name)
        except KeyError:
            raise ValueError("not found schema name [%s] in schema config" % schema_name)
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_raises_on_bad_schema(self):
        """Calling the object returned by ``validate_body`` for a bad schema raises SchemaError."""
        def fn():
            pass
        # Build the wrapper outside the assertion, as only the call with
        # ``fn`` is expected to raise.
        wrapper = validate_body({'required': 'bar'})
        with self.assertRaises(jsonschema.SchemaError):
            wrapper(fn)
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_raises_on_bad_schema(self):
        """Calling the object returned by ``validate_output`` for a bad schema raises SchemaError."""
        def fn():
            pass
        # Build the wrapper outside the assertion, as only the call with
        # ``fn`` is expected to raise.
        wrapper = validate_output({'required': 'bar'})
        with self.assertRaises(jsonschema.SchemaError):
            wrapper(fn)
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def validate_schema(schema):
    """Check that 'schema' is a well-formed JSON schema.

    The check is made against the jsonschema v4 draft.

    :raises jsonschema.SchemaError: if the schema is invalid.
    """
    draft4 = jsonschema.Draft4Validator
    draft4.check_schema(schema)
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def get_descriptor_type(self, descriptor):
        """
        Work out the type of a descriptor by trial and error.

        The descriptor is validated against each available schema
        template in turn; the id of the first template it validates
        against is returned. None is returned when no template matches
        or when a template turns out to be an invalid schema.
        """
        # Ids of the schema templates to try
        templates = {self.SCHEMA_PACKAGE_DESCRIPTOR,
                     self.SCHEMA_SERVICE_DESCRIPTOR,
                     self.SCHEMA_FUNCTION_DESCRIPTOR}

        for schema_id in templates:
            try:
                jsonschema.validate(descriptor, self.load_schema(schema_id))
            except ValidationError:
                # Not this type; move on to the next template
                continue
            except SchemaError as bad_schema:
                log.error("Invalid Schema '{}'".format(schema_id))
                log.debug(bad_schema)
                return
            else:
                return schema_id
项目:openadms-node    作者:dabamos    | 项目源码 | 文件源码
def add_schema(self, data_type: str, path: str) -> bool:
        """Load a JSON schema file and register it under a data type name.

        Args:
            data_type: The name of the data type (e.g., 'observation').
            path: The path to the JSON schema file, relative to the
                schema root path.

        Returns:
            True if the schema has been added, False if the data type
            already has a schema, the file is missing, or its contents
            are not valid JSON or not a valid schema.
        """
        if self._schema.get(data_type):
            return False

        schema_path = Path(self._schema_root_path, path)
        if not schema_path.exists():
            self.logger.error('Schema file "{}" not found.'.format(schema_path))
            return False

        with open(str(schema_path), encoding='utf-8') as data_file:
            try:
                schema = json.load(data_file)
                jsonschema.Draft4Validator.check_schema(schema)
            except json.JSONDecodeError:
                self.logger.error('Invalid JSON file "{}"'.format(schema_path))
                return False
            except jsonschema.SchemaError:
                self.logger.error('Invalid JSON schema "{}"'.format(schema_path))
                return False
            else:
                self._schema[data_type] = schema
                self.logger.debug('Loaded schema "{}"'.format(data_type))

        return True
项目:stash-scanner    作者:senuido    | 项目源码 | 文件源码
def init(cls):
        """Load the filter schema file and build ``cls.schema_validator``.

        :raises AppException: if the schema is rejected as invalid, or if
            anything else goes wrong while loading it (for example the
            file is missing or is not valid JSON).
        """
        try:
            with open(Filter.FILTER_SCHEMA_FNAME) as f:
                schema = json.load(f)

            try:
                # Validating an empty document is only a way to get the
                # schema itself checked (a bad schema raises SchemaError).
                jsonschema.validate({}, schema)
            except jsonschema.ValidationError:
                # Whether {} satisfies the schema does not matter here.
                pass

            # Build the validator regardless of the outcome above, so it is
            # also set for a schema that happens to accept {}.
            cls.schema_validator = jsonschema.Draft4Validator(schema)
        except jsonschema.SchemaError as e:
            raise AppException('Failed loading filter validation schema.\n{}'.format(FILTER_SCHEMA_ERROR.format(e)))
        except Exception as e:
            raise AppException('Failed loading filter validation schema.\n{}\n'
                               'Make sure the file are valid and in place.'.format(e))
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def __init__(self, registry, schema_data):
        """Validate ``schema_data`` against the registry schema and build its fields.

        A ``jsonschema.SchemaError`` is printed and replaced by ``SchemaError``.
        """
        try:
            jsonschema.validate(schema_data, registry.schema)
        except jsonschema.SchemaError as exc:
            print(exc)
            raise SchemaError()

        self.registry = registry
        self.fields = []

        # Fields are appended one by one; each Field is handed this object.
        for entry in schema_data['fields']:
            field = Field(self, entry)
            self.fields.append(field)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def parse_default(self, default_data):
        """Build the function that supplies a field's default value.

        ``default_data`` can be any of:
        * None - the default is None
        * object (dict) - a named function looked up in the registry
        * constant (e.g. 3, "hello") - parsed with the field's parser

        The function form makes it possible to derive a default from
        other submitted values, or to default to today's date.

        The returned function should have two parameters:

        * form - parsed field values.
        * path - path to the current field as a list.

        The returned function should return a value to use as the default.

        Raises SchemaError when a dict has no 'name' or a constant cannot
        be parsed.
        """

        if default_data is None:
            # Default to None
            return wrap(None)

        if not isinstance(default_data, dict):
            # Constant default; it still goes through the parser (e.g. dates)
            try:
                return wrap(self.parser(default_data))
            except ValueError:
                raise SchemaError()

        # Named function that computes the default value
        name = default_data.get('name')
        if name is None:
            raise SchemaError()

        factory = self.registry.get_default(self.type, name)
        return factory(self, default_data)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def parse_required(self, required_data):
        """Build the function that decides whether a field is required.

        ``required_data`` can be any of:
        * True - field is required
        * False - field is optional
        * object (dict) - a named custom function from the registry

        The custom function covers fields that are only required in
        certain situations (e.g. if another field is a particular value).

        The returned function should have two parameters:

        * form - parsed field values.
        * path - path to the current field as a list.

        Note: The path is currently just the name of the field.

        The returned function should return true if the field
        is required.

        Missing values are replaced with nulls when the field is parsed.
        An error is returned if a null value is given for a required field.

        Raises SchemaError for any other kind of value.
        """

        if required_data is True:
            # Required: None is rejected
            return required_f

        if required_data is False:
            # Optional: None is accepted
            return optional_f

        if not isinstance(required_data, dict):
            # Should never reach here
            raise SchemaError()

        # Custom function looked up by name
        name = required_data['name']
        factory = self.registry.get_required(name)
        return factory(self, required_data)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def parse_visible(self, visible_data):
        """Build the function that decides whether a field is visible.

        Fields that are hidden have their value set to None.

        ``visible_data`` can be any of:
        * True - field is visible
        * False - field is hidden
        * object (dict) - a named custom function from the registry

        The custom function can show or hide a field based on the value
        of another field.

        The returned function should have two parameters:

        * form - parsed field values.
        * path - path to the current field as a list.

        Note: The path is currently just the name of the field.

        The returned function should return true if the field
        is visible.

        Raises SchemaError for any other kind of value.
        """

        if visible_data is True:
            # Always visible
            return visible_f

        if visible_data is False:
            # Always hidden (value set to None)
            return hidden_f

        if not isinstance(visible_data, dict):
            # Should never reach here
            raise SchemaError()

        # Custom function looked up by name
        name = visible_data['name']
        factory = self.registry.get_visible(name)
        return factory(self, visible_data)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def parse_validators(self, validators_data):
        """Parse every entry of a validator list with ``parse_validator``.

        Raises SchemaError when ``validators_data`` is not a list.
        """
        if not isinstance(validators_data, list):
            raise SchemaError()

        return [self.parse_validator(entry) for entry in validators_data]
项目:packetary    作者:openstack    | 项目源码 | 文件源码
def _validate_data(data, schema):
    """Run jsonschema validation and report failures via _raise_validation_error.

    :param data: a data to validate represented as a dict
    :param schema: a schema to validate represented as a dict;
                   must be in JSON Schema Draft 4 format.
    """
    try:
        jsonschema.validate(data, schema)
    except jsonschema.ValidationError as data_error:
        # The data does not satisfy the schema
        _raise_validation_error("data", data_error.message, data_error.path)
    except jsonschema.SchemaError as schema_error:
        # The schema itself was rejected
        _raise_validation_error(
            "schema", schema_error.message, schema_error.schema_path
        )
项目:bluesteel    作者:imvu    | 项目源码 | 文件源码
def validate_obj_schema(obj, schema):
    """ Check the schema, then the object against it.

    Returns (True, obj) on success, or (False, {'msg': ...}) carrying the
    error text when either check fails.
    """
    try:
        Draft4Validator.check_schema(schema)
        Draft4Validator(schema).validate(obj)
    except (jsonschema.ValidationError, jsonschema.SchemaError) as err:
        # Both failure kinds are reported in the same shape.
        return (False, {'msg' : str(err)})
    return (True, obj)
项目:swaggerit    作者:dutradda    | 项目源码 | 文件源码
async def __call__(self, req, session):
        """Authorize, validate and dispatch a request to the wrapped operation.

        Declared as a coroutine because authorization, body parsing and the
        operation itself are awaited; ``await`` is not allowed inside a
        plain ``def``.

        :param req: incoming request
        :param session: passed on to the operation unless it is None
        :return: the authorization denial if there is one, a validation
                 error response, a generic 500 response for unexpected
                 errors, or the operation's response
        """
        denied = await self._authorize(req, session)
        if denied is not None:
            return denied

        response_headers = {'content-type': 'application/json'}

        try:
            body_params = await self._build_body_params(req)
            query_params = self._build_non_body_params(self._query_validator, req.query)
            path_params = self._build_non_body_params(self._path_validator,
                                                               req.path_params)
            headers_params = self._build_non_body_params(self._headers_validator, dict(req.headers))

        except (ValidationError, SchemaError) as error:
            return self._valdation_error_to_response(error, response_headers)

        # Rebuild the request around the parameters produced above.
        req = SwaggerRequest(
            req.path,
            req.method,
            scheme=req.scheme,
            host=req.host,
            path_params=path_params,
            query=query_params,
            headers=headers_params,
            body=body_params,
            body_schema=self._body_validator.schema if self._body_validator else None,
            context=req.context
        )

        try:
            if session is None:
                resp = await self._operation(req)
            else:
                resp = await self._operation(req, session)

        except (ValidationError, SchemaError) as error:
            return self._valdation_error_to_response(error, response_headers)

        except Exception:
            # Top-level boundary: log the traceback and answer with a
            # generic message instead of leaking details.
            body = ujson.dumps({'message': 'Something unexpected happened'})
            self._logger.exception('Unexpected')
            return SwaggerResponse(500, body=body, headers=response_headers)

        else:
            # Only fill in the content type when the operation set none.
            if resp.headers.get('content-type') is None:
                resp.headers.update(response_headers)
            return resp
项目:groundwork-spreadsheets    作者:useblocks    | 项目源码 | 文件源码
def _validate_json(self, excel_config_json_path):
        """Load the configuration JSON file and validate it against the schema file.

        Every failure is logged before it is raised: ValueError for a file
        that is not deserializable as JSON, a bare OSError for file access
        problems, and ValidationError / SchemaError from the validation.

        Returns the parsed configuration object.
        """

        def read_json(path, malformed_template):
            # Shared loader for the configuration file and the schema file.
            try:
                with open(path) as file_pointer:
                    return json.load(file_pointer)
            # the file is not deserializable as a json object
            except ValueError as exc:
                self._plugin.log.error(malformed_template.format(path, exc))
                raise exc
            # some os error occured (e.g file not found or malformed path string)
            # have to catch two exception classes: in py2 : IOError; py3: OSError
            except (IOError, OSError) as exc:
                self._plugin.log.error(exc)
                # raise only OSError to make error handling in caller easier
                raise OSError()

        json_obj = read_json(excel_config_json_path, 'Malformed JSON file: {0} \n {1}')
        schema_obj = read_json(JSON_SCHEMA_FILE_PATH, 'Malformed JSON schema file: {0} \n {1}')

        # do the validation
        try:
            validate(json_obj, schema_obj)
        except ValidationError as error:
            self._plugin.log.error("Validation failed: {0}".format(error.message))
            raise
        except SchemaError:
            self._plugin.log.error("Invalid schema file: {0}".format(JSON_SCHEMA_FILE_PATH))
            raise

        return json_obj
项目:stash-scanner    作者:senuido    | 项目源码 | 文件源码
def loadFiltersFromFile(cls, fname, validate_data):
        """Load, schema-check and convert the filters stored in a JSON file.

        :param fname: path of the filters file; also the key used to pick
            the lock in ``cls.filter_file_lock``
        :param validate_data: when truthy, ``validate()`` is called on every
            loaded filter
        :return: ``(filters, last_update)`` - the list of filters and the
            parsed ``last_update`` timestamp
        :raises FileNotFoundError: re-raised unchanged when the file being
            opened at the time is not the schema file
        :raises AppException: for a missing schema file, a schema validation
            failure, an invalid schema, or invalid JSON
        """
        filters = []

        # cur_fname tracks which file is currently being read, so the error
        # handlers below can tell the filters file from the schema file.
        cur_fname = fname
        try:
            # The filters file is read while holding its per-file lock.
            with cls.filter_file_lock[fname]:
                with open(cur_fname, encoding="utf-8", errors="replace") as f:
                    data = json.load(f)

            cur_fname = FILTERS_FILE_SCHEMA_FNAME  # TODO: move schema loading to main init and store in class
            with open(cur_fname) as f:
                schema = json.load(f)

            # normalize keys and values
            data = lower_json(data)

            jsonschema.validate(data, schema)

            for item in data.get('filters', []):
                fltr = Filter.fromDict(item)
                filters.append(fltr)

            # Files without a version are treated as V1; anything that is
            # not the latest version goes through FilterManager.convert.
            ver = data.get('version', FilterVersion.V1)
            if ver != FilterVersion.Latest:
                FilterManager.convert(ver, filters)
            last_update = data.get('last_update', '')

            if validate_data:
                for fltr in filters:
                    fltr.validate()

            try:
                last_update = datetime.strptime(last_update, '%Y-%m-%dT%H:%M:%S.%f')
            except ValueError:
                # Missing or unparsable timestamp: fall back to the current
                # UTC time minus FilterManager.UPDATE_INTERVAL minutes.
                last_update = datetime.utcnow() - timedelta(minutes=FilterManager.UPDATE_INTERVAL)
        except FileNotFoundError:
            # Raised before the switch to the schema file: propagate as-is.
            if cur_fname != FILTERS_FILE_SCHEMA_FNAME:
                raise
            else:
                raise AppException(FILTER_FILE_MISSING.format(cur_fname))
        except jsonschema.ValidationError as e:
            raise AppException(FILTERS_FILE_VALIDATION_ERROR.format(get_verror_msg(e, data)))
        except jsonschema.SchemaError as e:
            raise AppException(FILTERS_FILE_SCHEMA_ERROR.format(e.message))
        except json.decoder.JSONDecodeError as e:
            # cur_fname names whichever file was being parsed when this fired.
            raise AppException(FILTER_INVALID_JSON.format(e, cur_fname))

        return filters, last_update