Python jsonschema 模块,ValidationError() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用jsonschema.ValidationError()

项目:zun    作者:openstack    | 项目源码 | 文件源码
def validate(self, *args, **kwargs):
        """Run the wrapped validator and re-raise failures as a schema error.

        All arguments are forwarded unchanged to ``self.validator.validate``.

        :raises exception.SchemaValidationError: when the validator raises
            ``jsonschema.ValidationError``. The detail names the offending
            path element and value when the error carries a path, and is
            the plain error text otherwise.
        """
        try:
            self.validator.validate(*args, **kwargs)
        except jsonschema.ValidationError as ex:
            if ex.path:
                if self.is_body:
                    # The trailing space keeps the two sentences apart in
                    # the rendered message.
                    detail = _("Invalid input for field '%(path)s'. "
                               "Value: '%(value)s'. %(message)s")
                else:
                    detail = _("Invalid input for query parameters "
                               "'%(path)s'. Value: '%(value)s'. %(message)s")
                detail = detail % {
                    # Read the last path element without mutating ex.path.
                    'path': ex.path[-1], 'value': ex.instance,
                    'message': six.text_type(ex)
                }
            else:
                detail = six.text_type(ex)
            raise exception.SchemaValidationError(detail=detail)
项目:pytablereader    作者:thombashi    | 项目源码 | 文件源码
def to_table_data(self):
        """
        Yield one table built from all records in ``self._buffer``.

        The header is the sorted union of the keys of every record.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()

        # Gather every key that appears in at least one record.
        all_keys = set()
        for record in self._buffer:
            all_keys.update(six.viewkeys(record))

        self._loader.inc_table_count()

        yield TableData(
            table_name=self._make_table_name(),
            header_list=sorted(all_keys),
            record_list=self._buffer,
            quoting_flags=self._loader.quoting_flags)
项目:pytablereader    作者:thombashi    | 项目源码 | 文件源码
def to_table_data(self):
        """
        Yield one table from a buffer that maps each header to its values.

        The sorted buffer keys become the header; the per-header value
        sequences are transposed with ``zip`` to form the records.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()
        self._loader.inc_table_count()

        headers = sorted(six.viewkeys(self._buffer))

        name = self._make_table_name()
        columns = [self._buffer.get(header) for header in headers]

        yield TableData(
            table_name=name,
            header_list=headers,
            record_list=zip(*columns),
            quoting_flags=self._loader.quoting_flags)
项目:pytablereader    作者:thombashi    | 项目源码 | 文件源码
def to_table_data(self):
        """
        Yield one table per entry of ``self._buffer``.

        Each buffer value is a list of records; the table header is the
        sorted union of the keys found in those records.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()

        for table_key, records in six.iteritems(self._buffer):
            # Gather every key that appears in at least one record.
            all_keys = set()
            for record in records:
                all_keys.update(six.viewkeys(record))

            self._loader.inc_table_count()
            # Set before _make_table_name() is called for this table.
            self._table_key = table_key

            yield TableData(
                table_name=self._make_table_name(),
                header_list=sorted(all_keys),
                record_list=records,
                quoting_flags=self._loader.quoting_flags)
项目:pytablereader    作者:thombashi    | 项目源码 | 文件源码
def to_table_data(self):
        """
        Yield one table per entry of ``self._buffer``.

        Each buffer value maps a header to its values; the sorted keys
        become the header and the value sequences are transposed with
        ``zip`` to form the records.

        :raises ValueError:
        :raises pytablereader.error.ValidationError:
        """

        self._validate_source_data()

        for table_key, column_map in six.iteritems(self._buffer):
            headers = sorted(six.viewkeys(column_map))

            self._loader.inc_table_count()
            # Set before _make_table_name() is called for this table.
            self._table_key = table_key

            name = self._make_table_name()
            columns = [column_map.get(header) for header in headers]

            yield TableData(
                table_name=name,
                header_list=headers,
                record_list=zip(*columns),
                quoting_flags=self._loader.quoting_flags)
项目:pywhdfs    作者:yassineazzouz    | 项目源码 | 文件源码
def __init__(self, path=None):
      """Load the JSON configuration file and check it against the schema.

      :param path: Configuration file path. When falsy, the
        ``WEBHDFS_CONFIG`` environment variable is used, falling back to
        ``self.default_path``.
      :raises HdfsError: if the file does not exist, or if ``ParsingError``
        is raised while loading it.

      Schema problems are only printed; they do not abort construction.
      """
      self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path)
      if not osp.exists(self.path):
        raise HdfsError('Invalid configuration file %r.', self.path)

      try:
        # Context manager so the file handle is closed deterministically.
        with open(self.path) as config_file:
          self.config = json.loads(config_file.read())
        self.schema = json.loads(resource_string(__name__, 'resources/config_schema.json'))
        try:
          js.validate(self.config, self.schema)
        except js.ValidationError as e:
          # Single-argument print(...) behaves the same on Python 2 and 3.
          print(e.message)
        except js.SchemaError as e:
          print(e)
      # NOTE(review): json.loads signals malformed input with ValueError,
      # so this handler may never fire for bad JSON - confirm intent.
      except ParsingError:
        raise HdfsError('Invalid configuration file %r.', self.path)

      _logger.info('Instantiated configuration from %r.', self.path)
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def schema_request(media_type_name):
    """Build a decorator that validates the request JSON before the call.

    When ``media_type_name`` is a string it is resolved through
    ``get_schema_for_media_type``; any other value is used as the schema
    itself. On ``ValidationError`` the wrapped function is preceded by
    ``abort(400, ...)`` with a generated error report.
    """
    def wrapper(fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            schema = (
                get_schema_for_media_type(media_type_name)
                if isinstance(media_type_name, basestring)
                else media_type_name
            )
            payload = request.get_json()
            try:
                validate(payload, schema)
            except ValidationError as error:
                abort(400, description=generate_validation_error_report(
                    error, payload))
            return fn(*args, **kwargs)

        return decorated
    return wrapper
项目:fuel-ccp    作者:openstack    | 项目源码 | 文件源码
def validate_service_definitions(components_map, components=None):
    """Validate the "service_content" of components against SERVICE_SCHEMA.

    :param components_map: mapping of component name to a dict holding a
        "service_content" entry.
    :param components: names to validate; when falsy, every key of
        ``components_map`` is validated. Given names are first checked
        with ``validation_base.validate_components_names``.
    :raises RuntimeError: if at least one component fails validation.
    """
    if components:
        validation_base.validate_components_names(components, components_map)
    else:
        components = components_map.keys()

    failed = set()

    for name in components:
        content = components_map[name]["service_content"]
        try:
            jsonschema.validate(content, SERVICE_SCHEMA,
                                format_checker=ServiceFormatChecker())
        except jsonschema.ValidationError as e:
            # Keep going so that every failing component gets logged.
            LOG.error("Validation of service definitions for component '%s' "
                      "is not passed: '%s'", name, e.message)
            failed.add(name)

    if not failed:
        LOG.info("Service definitions validation passed successfully")
        return

    raise RuntimeError(
        "Validation of service definitions for {} of {} components is "
        "not passed.".format(len(failed), len(components))
    )
项目:apm-agent-python    作者:elastic    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """Record the incoming request and answer it as a WSGI application.

        The request and its decoded payload are appended to
        ``self.requests`` and ``self.payloads``. If a validator is
        registered for the request path and validation is not skipped, the
        status becomes 202 on success, or 400 with a JSON error body on
        ``jsonschema.ValidationError``. Otherwise ``self.code`` and
        ``self.content`` are used.
        """
        status = self.code
        body = self.content
        request = Request(environ)
        self.requests.append(request)

        payload = request.data
        if request.content_encoding == 'deflate':
            payload = zlib.decompress(payload)
        payload = payload.decode(request.charset)
        if request.content_type == 'application/json':
            payload = json.loads(payload)
        self.payloads.append(payload)

        validator = VALIDATORS.get(request.path, None)
        if validator and not self.skip_validate:
            try:
                validator.validate(payload)
            except jsonschema.ValidationError as e:
                status = 400
                body = json.dumps({'status': 'error', 'message': str(e)})
            else:
                status = 202

        response = Response(status=status)
        # Replace the default headers with the configured ones.
        response.headers.clear()
        response.headers.extend(self.headers)
        response.data = body
        return response(environ, start_response)
项目:export2hdf5    作者:bwrc    | 项目源码 | 文件源码
def validate_config(fname):
    """
    Validate configuration file in json format.

    Returns ``None`` when the configuration is valid, the error message
    when ``jsonschema.ValidationError`` is raised, or the exception object
    itself when ``jsonschema.SchemaError`` is raised.
    """
    # The schema is shipped with the export2hdf5 package.
    schema = load_json_file(
        pkg_resources.resource_filename('export2hdf5', "config_schema.json"))
    config = load_json_file(fname)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as e:
        return e.message
    except jsonschema.SchemaError as e:
        return e
    return None
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def execute_plan(self, name, planner_config, attack_config):
        """
        Look up, validate and run a planner for the given plan.

        The planner and attack classes are fetched from their stores by the
        "ref" entry of each config. The planner "args" and the whole attack
        config are validated against the schemas of those classes.

        :param name:                Plan name
        :param planner_config:      Dict with planner config
        :param attack_config:       Dict with attack config
        :raises APIError:           on a module lookup or validation error
        """
        try:
            planner_cls = self._planners_store.get(planner_config.get("ref"))
            attack_cls = self._attacks_store.get(attack_config.get("ref"))
        except ModuleLookupError as e:
            raise APIError("invalid planner %s" % e.message)

        # Both configs must pass before anything is planned.
        checks = (
            (planner_config.get("args"), planner_cls.schema),
            (attack_config, attack_cls.schema),
        )
        try:
            for instance, schema in checks:
                validate(instance, schema)
        except ValidationError as e:
            raise APIError("invalid payload %s" % e.message)

        planner_cls(name).plan(planner_config, attack_config)
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def validate_payload(request, schema):
    """
    Check the JSON body of a request against a json schema.

    :param request: request received with valid json body
    :param schema:  schema to validate the request payload

    :return: True
    :raises: :meth:`chaosmonkey.api.api_errors`
    """
    try:
        payload = request.get_json()
        validate(payload, schema)
    except ValidationError as e:
        raise APIError("invalid payload %s" % e.message)
    except Exception:
        # Any other failure is reported as a malformed body.
        raise APIError("payload must be a valid json")
    return True
项目:tcex    作者:ThreatConnect-Inc    | 项目源码 | 文件源码
def validate(install_json):
        """Validate install.json file for required parameters

        Prints whether the file is valid; schema and validation errors are
        printed rather than raised.
        """

        # install.json validation
        try:
            with open(install_json) as fh:
                data = json.load(fh)
            validate(data, schema)
        except (SchemaError, ValidationError) as e:
            print('{} is invalid "{}"'.format(install_json, e))
        else:
            print('{} is valid'.format(install_json))

    # @staticmethod
    # def _wrap(data):
    #     """Wrap any parameters that contain spaces
#
    #     Returns:
    #         (string): String containing parameters wrapped in double quotes
    #     """
    #     if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0:
    #         data = '"{}"'.format(data)
    #     return data
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def check_schema(document):
    """Validate a document's 'data' against the schema named in 'schema'.

    Non-dict documents are logged as an error and ignored; documents whose
    schema name is not in ``SCHEMAS`` are skipped with a warning.

    :raises exceptions.ValidationException: if jsonschema validation fails.
    """
    if type(document) is not dict:
        LOG.error('Non-dictionary document passed to schema validation.')
        return

    schema_name = document.get('schema', '<missing>')
    document_name = document.get('metadata', {}).get('name', '<missing>')

    LOG.debug('Validating schema for schema=%s metadata.name=%s', schema_name,
              document_name)

    if schema_name not in SCHEMAS:
        LOG.warning('Skipping validation for unknown schema: %s', schema_name)
        return

    try:
        jsonschema.validate(document.get('data'), SCHEMAS[schema_name])
    except jsonschema.ValidationError as e:
        raise exceptions.ValidationException(str(e))
项目:bigchaindb    作者:bigchaindb    | 项目源码 | 文件源码
def _validate_schema(schema, body):
    """Validate data against a schema

    ``schema`` is indexed as a pair: ``schema[1]`` validates the
    rapidjson-serialised body, and ``schema[0]`` is passed to
    ``jsonschema.validate`` when that first check raises ``ValueError``.

    Raises SchemaValidationError when validation fails.
    """

    # Note
    #
    # Schema validation is currently the major CPU bottleneck of
    # BigchainDB. The `jsonschema` library validates python data structures
    # directly and produces nice error messages, but validation takes 4+ ms
    # per transaction which is pretty slow. The rapidjson library validates
    # much faster at 1.5ms, however it produces _very_ poor error messages.
    # For this reason we use both, rapidjson as an optimistic pathway and
    # jsonschema as a fallback in case there is a failure, so we can produce
    # a helpful error message.

    try:
        schema[1].validate(rapidjson.dumps(body))
    except ValueError as fast_error:
        try:
            jsonschema.validate(body, schema[0])
        except jsonschema.ValidationError as detailed_error:
            raise SchemaValidationError(str(detailed_error)) from detailed_error
        else:
            # The two validators disagree; report the rapidjson failure.
            logger.warning('code problem: jsonschema did not raise an exception, wheras rapidjson raised %s', fast_error)
            raise SchemaValidationError(str(fast_error)) from fast_error
项目:django-api-bouncer    作者:menecio    | 项目源码 | 文件源码
def validate(self, data):
        """Validate plugin data: known name, schema-valid config, validators.

        :param data: mapping with a 'name' entry and an optional 'config'
            entry (treated as ``{}`` when absent).
        :returns: ``data`` unchanged.
        :raises serializers.ValidationError: if the name is missing or not
            in ``plugins``, or if the config fails jsonschema validation.
        """
        name = data.get('name')
        submitted_config = data.get('config', {})

        if not name or name not in plugins:
            raise serializers.ValidationError('Invalid plugin name')

        plugin_schema = plugins[name]

        config = submitted_config
        if self.instance:
            # NOTE(review): this merges into the instance's own config dict
            # in place, even when validation fails below - confirm intended.
            initial_config = self.instance.config
            initial_config.update(submitted_config)
            config = initial_config

        try:
            jsonschema.validate(config, plugin_schema)
        except jsonschema.ValidationError as e:
            raise serializers.ValidationError({'config': e})

        plugin_validators = validators.validator_classes.get(name, [])
        for validator in plugin_validators:
            # Use the defaulted config so that a payload without 'config'
            # does not raise KeyError here.
            run_validator = validator(submitted_config)
            run_validator()
        return data
项目:pyblish-starter    作者:pyblish    | 项目源码 | 文件源码
def test_unsuccessful_validation(self):
        """A failing instance is reported on stderr and the exit code is 1."""
        failure = ValidationError("I am an error!", instance=1)
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
项目:pyblish-starter    作者:pyblish    | 项目源码 | 文件源码
def test_unsuccessful_validation_multiple_instances(self):
        """Errors of all instances reach stderr in order; exit code is 1."""
        errors_for_first = [
            ValidationError(message, instance=1) for message in ("9", "8")
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
项目:jupyterlab_launcher    作者:jupyterlab    | 项目源码 | 文件源码
def put(self, section_name):
        """Validate the request body against the section schema and save it.

        Responds with 404 when no settings directory is configured and 400
        when the body fails validation. On success the raw body is written
        to the settings file and the status is set to 204.
        """
        if not self.settings_dir:
            raise web.HTTPError(404, "No current settings directory")

        raw = self.request.body.strip().decode(u"utf-8")

        # Validate the data against the schema.
        schema = _get_schema(self.schemas_dir, section_name, self.overrides)
        validator = Validator(schema)
        # json_minify is applied before parsing; the raw text is what gets
        # written below.
        settings = json.loads(json_minify(raw))
        try:
            validator.validate(settings)
        except ValidationError as e:
            raise web.HTTPError(400, str(e))

        # Write the raw data (comments included) to a file.
        target = _path(self.settings_dir, section_name, _file_extension, True)
        with open(target, "w") as fid:
            fid.write(raw)

        self.set_status(204)
项目:splunk_ta_ps4_f1_2016    作者:jonathanvarley    | 项目源码 | 文件源码
def test_unsuccessful_validation(self):
        """A failing instance is reported on stderr and the exit code is 1."""
        failure = ValidationError("I am an error!", instance=1)
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
项目:splunk_ta_ps4_f1_2016    作者:jonathanvarley    | 项目源码 | 文件源码
def test_unsuccessful_validation_multiple_instances(self):
        """Errors of all instances reach stderr in order; exit code is 1."""
        errors_for_first = [
            ValidationError(message, instance=1) for message in ("9", "8")
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
项目:splunk_ta_ps4_f1_2016    作者:jonathanvarley    | 项目源码 | 文件源码
def test_it_delegates_to_a_legacy_ref_resolver(self):
        """
        Legacy RefResolvers support only the context manager form of
        resolution.

        """

        class LegacyRefResolver(object):
            # The receiver is named `this` so that `self` still refers to
            # the enclosing test case inside the method.
            @contextmanager
            def resolving(this, ref):
                self.assertEqual(ref, "the ref")
                yield {"type": "integer"}

        # The resolved schema requires an integer, so None must be rejected.
        with self.assertRaises(ValidationError):
            self.validator_class(
                {"$ref": "the ref"}, resolver=LegacyRefResolver()
            ).validate(None)
项目:lcp-testing-tools    作者:edrlab    | 项目源码 | 文件源码
def test_validate_lcpl(self):
        """
        Validate an LCP license file
        """

        # Validating the license with the JSON schema includes:
        #   checking the profile value (basic or 1.0)
        #   checking the encryption method (aes-cbc), user key (sha256)
        #   and signature algorithm (ecdsa-sha256)

        schema_path = os.path.join(JSON_SCHEMA_DIR_PATH, 'lcpl_schema.json')

        with open(schema_path) as schema_file:
            schema = json.load(schema_file)

        try:
            jsonschema.validate(self.lcpl, schema)
        except jsonschema.ValidationError as err:
            raise TestSuiteRunningError(err)
项目:ingest-client    作者:jhuapl-boss    | 项目源码 | 文件源码
def validate_schema(self):
        """
        Validate ``self.config`` against ``self.schema``.

        Args:

        Returns:
            The ``jsonschema.ValidationError`` raised by the validation, or
            ``None`` when validation passes.

        Raises:
            ValueError: if ``self.schema`` is falsy.

        """
        if self.schema:
            try:
                jsonschema.validate(self.config, self.schema)
            except jsonschema.ValidationError as e:
                return e
            return None

        raise ValueError("Schema has not been populated yet. Cannot validate.")
项目:TA-SyncKVStore    作者:georgestarcher    | 项目源码 | 文件源码
def test_unsuccessful_validation(self):
        """A failing instance is reported on stderr and the exit code is 1."""
        failure = ValidationError("I am an error!", instance=1)
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
项目:TA-SyncKVStore    作者:georgestarcher    | 项目源码 | 文件源码
def test_unsuccessful_validation_multiple_instances(self):
        """Errors of all instances reach stderr in order; exit code is 1."""
        errors_for_first = [
            ValidationError(message, instance=1) for message in ("9", "8")
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
项目:TA-SyncKVStore    作者:georgestarcher    | 项目源码 | 文件源码
def test_it_delegates_to_a_legacy_ref_resolver(self):
        """
        Legacy RefResolvers support only the context manager form of
        resolution.

        """

        class LegacyRefResolver(object):
            # The receiver is named `this` so that `self` still refers to
            # the enclosing test case inside the method.
            @contextmanager
            def resolving(this, ref):
                self.assertEqual(ref, "the ref")
                yield {"type": "integer"}

        # The resolved schema requires an integer, so None must be rejected.
        with self.assertRaises(ValidationError):
            self.validator_class(
                {"$ref": "the ref"}, resolver=LegacyRefResolver()
            ).validate(None)
项目:cb-defense-splunk-app    作者:carbonblack    | 项目源码 | 文件源码
def test_unsuccessful_validation(self):
        """A failing instance is reported on stderr and the exit code is 1."""
        failure = ValidationError("I am an error!", instance=1)
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
项目:cb-defense-splunk-app    作者:carbonblack    | 项目源码 | 文件源码
def test_unsuccessful_validation_multiple_instances(self):
        """Errors of all instances reach stderr in order; exit code is 1."""
        errors_for_first = [
            ValidationError(message, instance=1) for message in ("9", "8")
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
项目:cb-defense-splunk-app    作者:carbonblack    | 项目源码 | 文件源码
def test_it_delegates_to_a_legacy_ref_resolver(self):
        """
        Legacy RefResolvers support only the context manager form of
        resolution.

        """

        class LegacyRefResolver(object):
            # The receiver is named `this` so that `self` still refers to
            # the enclosing test case inside the method.
            @contextmanager
            def resolving(this, ref):
                self.assertEqual(ref, "the ref")
                yield {"type": "integer"}

        # The resolved schema requires an integer, so None must be rejected.
        with self.assertRaises(ValidationError):
            self.validator_class(
                {"$ref": "the ref"}, resolver=LegacyRefResolver()
            ).validate(None)
项目:baroque    作者:baroquehq    | 项目源码 | 文件源码
def validate(evt, evttype):
        """Validates the content of an event against the JSON schema of its type.

        Args:
            evt (:obj:`baroque.entities.event.Event`): the event to be validated
            evttype (:obj:`baroque.entities.eventtype.EventType`): the type of
            the event that needs to be validated

        Returns:
            ``True`` if validation is OK, ``False`` otherwise (including
            when the event's type is not equal to ``evttype``)

        """
        # Compare explicitly instead of using `assert`, which is removed
        # when Python runs with -O and would let mismatched types through.
        if not (evt.type == evttype):
            return False
        try:
            check(loads(evt.json()), loads(evttype.jsonschema))
        except (AssertionError, ValidationError):
            return False
        return True
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def consume(self, value):
        """Validate hints and return a value for every key of the schema.

        A list input is first converted into a dict keyed by each item's
        "id" with the item's "value". Keys missing from the input take the
        "default_value" of their schema entry, or ``None``.

        :raises ValueError: if the validator rejects the hints.
        """
        if isinstance(value, list):
            value = dict((item["id"], item["value"]) for item in value)

        try:
            self.validator.validate(value)
        except jsonschema.ValidationError as exc:
            LOG.warning("Cannot validate hints: %s", exc)
            raise ValueError("Cannot validate hints") from exc

        return {
            key: value.get(key, schema_value.get("default_value", None))
            for key, schema_value in self.schema.items()
        }
项目:raml2doc    作者:openconnectivityfoundation    | 项目源码 | 文件源码
def generate_swagger(self):
        """
        Convert the RAML input into a swagger document.

        The input named by ``self.inputname`` is loaded, the swagger file is
        opened, filled, closed and finally verified. On a validation error
        while loading, the error is printed and nothing is written.

        :return:
        """
        try:
            tree = ramlparser.load(self.inputname)
        except ValidationError as e:
            print ('validation error:', e.errors)
            print ("could not load file: error loading file")
            traceback.print_exc()
            return

        title = self.get_first_display_name(tree)
        version = tree.version
        self.swag_openfile(version, title)

        # Emit the body sections in their fixed order.
        self.swag_add_resource(tree)
        self.swag_add_generic_parameters(tree)
        self.swag_add_definitions(tree)

        self.swag_closefile()
        print ("swagger document saved..", self.swagger)
        self.swag_verify()
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def check_schema(body, schema):
    """Ensure all necessary keys are present and correct in create body.

    Validates the user-specified create body with a Draft 4 validator
    that has format checking enabled.

    :param body: create body
    :param schema: JSON schema the body is validated against
    :raises InvalidParameterValue: if validation of create body fails.
    """
    format_checker = jsonschema.FormatChecker()
    validator = jsonschema.Draft4Validator(
        schema, format_checker=format_checker)
    try:
        validator.validate(body)
    except jsonschema.ValidationError as exc:
        message = _('Invalid create body: %s') % exc
        raise exception.InvalidParameterValue(message)
项目:falcon-example    作者:jmvrbanac    | 项目源码 | 文件源码
def validate(schema):
    """Build a decorator that parses and validates a JSON request body.

    The decorated responder is called with the parsed object as the
    ``parsed`` keyword argument.

    :param schema: JSON schema the request body is validated against.
    :raises falcon.HTTPBadRequest: (from the decorated responder) when the
        body cannot be parsed as JSON or fails schema validation.
    """
    # Imported locally so the snippet does not depend on a file-level import.
    import functools

    def decorator(func):
        # Keep the responder's name and docstring on the wrapper.
        @functools.wraps(func)
        def wrapper(self, req, resp, *args, **kwargs):
            try:
                raw_json = req.stream.read()
                obj = json.loads(raw_json.decode('utf-8'))
            except Exception:
                raise falcon.HTTPBadRequest(
                    'Invalid data',
                    'Could not properly parse the provided data as JSON'
                )

            try:
                jsonschema.validate(obj, schema)
            except jsonschema.ValidationError as e:
                raise falcon.HTTPBadRequest(
                    'Failed data validation',
                    e.message
                )

            return func(self, req, resp, *args, parsed=obj, **kwargs)
        return wrapper
    return decorator
项目:core    作者:getavalon    | 项目源码 | 文件源码
def test_unsuccessful_validation(self):
        """A failing instance is reported on stderr and the exit code is 1."""
        failure = ValidationError("I am an error!", instance=1)
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
项目:core    作者:getavalon    | 项目源码 | 文件源码
def test_unsuccessful_validation_multiple_instances(self):
        """Errors of all instances reach stderr in order; exit code is 1."""
        errors_for_first = [
            ValidationError(message, instance=1) for message in ("9", "8")
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        out, err = StringIO(), StringIO()
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out, stderr=err)

        self.assertFalse(out.getvalue())
        self.assertEqual(err.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def validate(self, descriptor, schema_id):
        """
        Validate a descriptor against the schema loaded for ``schema_id``.

        On failure the error message is stored in ``self.error_msg``.

        :param descriptor: descriptor to validate
        :param schema_id: identifier passed to ``self.load_schema``
        :return: True when valid, None otherwise
        """
        try:
            jsonschema.validate(descriptor, self.load_schema(schema_id))

        except ValidationError as e:
            log.error("Failed to validate Descriptor against schema '{}'"
                      .format(schema_id))
            self.error_msg = e.message
            log.error(e.message)
            return

        except SchemaError as e:
            # The schema itself is broken; details go to the debug log only.
            log.error("Invalid Schema '{}'".format(schema_id))
            self.error_msg = e.message
            log.debug(e)
            return

        return True
项目:dependencies-resolver    作者:onfido    | 项目源码 | 文件源码
def resolve_dependencies(args):
    """Load the config file, validate it and download its dependencies.

    The JSON file named by ``args.config`` is read, checked with
    ``validate_schema`` and ``validate_data`` and then handed to
    ``download_dependencies``, after which the process exits. Each
    expected failure is reported through ``handle_exception``.

    :param args: The user's arguments supplied from the main function.
    """
    try:
        with open(args.config, 'r') as json_file:
            dependencies_data = json.load(json_file)
        validate_schema(dependencies_data)
        validate_data(dependencies_data)
        download_dependencies(dependencies_data)
        # SystemExit is not an Exception subclass, so the handlers below
        # do not intercept it.
        sys.exit()
    except IOError:
        handle_exception('File could not be opened')
    except ValueError:
        handle_exception('File could not be parsed')
    except ValidationError:
        handle_exception('File could not be validated')
    except Exception:
        handle_exception('Unknown error occurred')
项目:openadms-node    作者:dabamos    | 项目源码 | 文件源码
def is_valid(self, data: Dict, schema_name: str) -> bool:
        """Check data against the JSON schema stored under a name.

        Args:
            data: The data.
            schema_name: The name of the schema used for validation.

        Returns:
            True if data is valid, False if it is invalid or the schema
            is not known (a warning is logged in that case).
        """
        if self.has_schema(schema_name):
            try:
                jsonschema.validate(data, self._schema.get(schema_name))
            except jsonschema.ValidationError:
                return False
            return True

        self.logger.warning('JSON schemes "{}" not found'
                            .format(schema_name))
        return False
项目:swdestinydb-json-data    作者:fafranco82    | 项目源码 | 文件源码
def custom_check_card(self, card):
        """Run the cross-reference checks for a card.

        Checks that each ``*_code`` field and ``reprint_of`` refer to a
        known entry of ``self.collections``, then runs the optional
        ``custom_check_<type_code>_card`` method when one exists.

        Raises jsonschema.ValidationError listing every problem found.
        """
        problems = []

        # check foreign codes
        for collection in ("affiliation", "faction", "rarity", "type", "subtype"):
            field = collection + "_code"
            if field not in card:
                continue
            if card.get(field) not in self.collections[collection]:
                problems.append("%s code '%s' does not exists in card '%s'" % (collection, card.get(field), card.get('code')))

        # check reprint of
        if 'reprint_of' in card:
            if card.get('reprint_of') not in self.collections['card']:
                problems.append("Reprinted card %s does not exists" % (card.get('reprint_of')))

        # checks by type
        checker_name = "custom_check_%s_card" % card.get('type_code')
        if hasattr(self, checker_name):
            checker = getattr(self, checker_name)
            if callable(checker):
                problems.extend(checker(card))

        if problems:
            raise jsonschema.ValidationError("\n".join("- %s" % p for p in problems))
项目:deb-python-jsonschema    作者:openstack    | 项目源码 | 文件源码
def test_unsuccessful_validation(self):
        """One failing instance: error goes to stderr, exit code is 1."""
        out_stream = StringIO()
        err_stream = StringIO()
        failure = ValidationError("I am an error!", instance=1)
        arguments = {
            "validator": fake_validator([failure]),
            "schema": {},
            "instances": [1],
            "error_format": "{error.instance} - {error.message}",
        }

        exit_code = cli.run(arguments, stdout=out_stream, stderr=err_stream)

        # Nothing on stdout; the formatted error on stderr.
        self.assertFalse(out_stream.getvalue())
        self.assertEqual(err_stream.getvalue(), "1 - I am an error!")
        self.assertEqual(exit_code, 1)
项目:deb-python-jsonschema    作者:openstack    | 项目源码 | 文件源码
def test_unsuccessful_validation_multiple_instances(self):
        """Errors of several instances are written to stderr in order."""
        out_stream = StringIO()
        err_stream = StringIO()
        errors_for_first = [
            ValidationError(text, instance=1) for text in ("9", "8")
        ]
        errors_for_second = [ValidationError("7", instance=2)]
        arguments = {
            "validator": fake_validator(errors_for_first, errors_for_second),
            "schema": {},
            "instances": [1, 2],
            "error_format": "{error.instance} - {error.message}\t",
        }

        exit_code = cli.run(arguments, stdout=out_stream, stderr=err_stream)

        # Nothing on stdout; every error, tab-terminated, on stderr.
        self.assertFalse(out_stream.getvalue())
        self.assertEqual(err_stream.getvalue(), "1 - 9\t1 - 8\t2 - 7\t")
        self.assertEqual(exit_code, 1)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def validate(self, *args, **kwargs):
        """Run ``self.validator.validate`` and translate its failures.

        All arguments are forwarded unchanged. A
        ``jsonschema.ValidationError`` or ``TypeError`` raised by the
        validator is re-raised as ``exception.ValidationError`` carrying a
        ``detail`` message.
        """
        try:
            self.validator.validate(*args, **kwargs)
        except jsonschema.ValidationError as ex:
            if isinstance(ex.cause, exception.InvalidName):
                # The InvalidName cause supplies its own message.
                detail = ex.cause.format_message()
            elif ex.path:
                # Report the last element of the error path as the field.
                template = _("Invalid input for field/attribute %(path)s."
                             " Value: %(value)s. %(message)s")
                substitutions = {
                    'path': ex.path.pop(),
                    'value': ex.instance,
                    'message': ex.message,
                }
                detail = template % substitutions
            else:
                detail = ex.message
            raise exception.ValidationError(detail=detail)
        except TypeError as type_error:
            # NOTE: A non-string value passed to a patternProperties
            #       parameter surfaces as TypeError; report it the same way.
            detail = six.text_type(type_error)
            raise exception.ValidationError(detail=detail)
项目:inspire-schemas    作者:inspirehep    | 项目源码 | 文件源码
def test_validate_raises_if_invalid_data():
    """``utils.validate`` raises when 'foo' is a string but must be an integer."""
    integer_foo_schema = {
        '$schema': 'http://json-schema.org/schema#',
        'type': 'object',
        'properties': {'foo': {'type': 'integer'}},
    }
    string_foo_record = {'foo': 'bar'}

    with pytest.raises(ValidationError):
        utils.validate(string_foo_record, integer_foo_schema)
项目:ofd    作者:yandex    | 项目源码 | 文件源码
def _validate_logic(self, doc):
        """Check that the document's timestamp lies inside the accepted window.

        ``doc`` is a mapping whose first key names the document; the value
        stored under that key must provide ``dateTime``. The value is compared
        with ``datetime.timestamp()`` results, so it is presumably a number of
        seconds since the epoch -- verify against the document schema.

        Raises:
            ValidationError: if the timestamp is earlier than
                ``self.min_date`` (when that is set), or later than the
                current time plus ``self.future_hours`` hours.
        """
        doc_name = next(iter(doc))
        doc_timestamp = doc[doc_name].get('dateTime')

        # Lower bound: the document must not be older than the configured
        # minimum date (skipped when no minimum date is configured).
        if self.min_date and self.min_date.timestamp() > doc_timestamp:
            doc_date = datetime.datetime.fromtimestamp(doc_timestamp)
            raise ValidationError(
                'Document timestamp {} is less than min. allowed date {}'
                .format(str(doc_date), str(self.min_date)))

        # Upper bound: the document must not come "from the future" by more
        # than future_hours. An aware UTC "now" is required here: a naive
        # utcnow() value is interpreted as *local* time by .timestamp(),
        # which would shift the limit by the host's UTC offset.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        future = now_utc + datetime.timedelta(hours=self.future_hours)
        if doc_timestamp > future.timestamp():
            doc_date = datetime.datetime.fromtimestamp(doc_timestamp)

            raise ValidationError('Document timestamp {} is greater than now for {} hours'
                                  .format(str(doc_date), str(self.future_hours)))
项目:falcon-swagger    作者:dutradda    | 项目源码 | 文件源码
def _build_object(cls, value, schema, nested_types, input_):
        """Parse a ``key:value|key:value`` string into a dict.

        Each key must be listed in ``schema['properties']``; its raw value is
        converted by ``cls._build_value`` using that property's schema.
        ``nested_types`` is marked with 'object' while the pairs are being
        converted and unmarked again before returning.

        Raises:
            ModelBaseError: if 'object' is already in ``nested_types``.
            ValidationError: if a key is not a property of ``schema``.
        """
        if 'object' in nested_types:
            raise ModelBaseError('nested object was not allowed', input_=input_)

        pairs = value.split('|')
        built = {}
        nested_types.add('object')
        for pair in pairs:
            name, raw = pair.split(':')
            sub_schema = schema['properties'].get(name)
            if sub_schema is None:
                raise ValidationError(
                    "Invalid property '{}'".format(name),
                    instance=input_, schema=schema)

            built[name] = cls._build_value(
                raw, sub_schema, nested_types, input_)

        nested_types.discard('object')
        return built
项目:falcon-swagger    作者:dutradda    | 项目源码 | 文件源码
def put_by_uri_template(cls, req, resp):
        """Update the object addressed by the URI ids, or insert it.

        The request body is passed through ``cls._update_dict`` together
        with the ids and handed to ``cls.update``. If that returns objects,
        the first one is written to ``resp.body`` as JSON. Otherwise the
        untouched copy of the body is stored in
        ``req.context['parameters']['body']`` and ``cls._insert`` is called
        with ``with_update=True``.

        Raises:
            ValidationError: if nothing was updated and the original body
                holds a value for an id key that differs from the URI's.
        """
        session, req_body, id_, kwargs = cls._get_context_values(req.context)
        # Keep the body as received; _update_dict works on req_body itself.
        pristine_body = deepcopy(req_body)

        cls._update_dict(req_body, id_)
        updated = cls.update(session, req_body, ids=id_, **kwargs)
        if updated:
            resp.body = json.dumps(updated[0])
            return

        conflicting = [
            name for name in id_
            if name in pristine_body and pristine_body[name] != id_[name]
        ]
        if conflicting:
            body_schema = req.context.get('body_schema')
            quoted_names = "', '".join(conflicting)
            raise ValidationError(
                "Ambiguous value for '{}'".format(quoted_names),
                instance={'body': pristine_body, 'uri': id_},
                schema=body_schema)

        req.context['parameters']['body'] = pristine_body
        cls._insert(req, resp, with_update=True)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __prepare_json(status, op=None, schema=None, data=None, errors=None):
        """Build the JSON-style result dictionary.

        The result always has "status". "errors" (wrapped in a list when it
        is not one already) and "data" are added when truthy. When "op" is
        given, the dictionary built so far is validated against the schema
        returned by _get_pkg_output_schema(op); if that validation fails,
        a dictionary with status EXIT_OOPS and the failure reason is
        returned instead. "schema", when truthy, is attached only after
        that validation."""

        result = {"status": status}

        if errors:
                result["errors"] = \
                    errors if isinstance(errors, list) else [errors]
        if data:
                result["data"] = data
        if op:
                expected = _get_pkg_output_schema(op)
                try:
                        jsonschema.validate(result, expected)
                except jsonschema.ValidationError as e:
                        return {
                            "status": EXIT_OOPS,
                            "errors": [{"reason": str(e)}],
                        }
        if schema:
                result["schema"] = schema

        return result
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def validate(self, *args, **kwargs):
        """Validate the arguments with ``self.validator``.

        Arguments are passed straight through. Both
        ``jsonschema.ValidationError`` and ``TypeError`` coming out of the
        validator are converted into ``exception.ValidationError`` with a
        ``detail`` text.
        """
        try:
            self.validator.validate(*args, **kwargs)
        except jsonschema.ValidationError as ex:
            if isinstance(ex.cause, exception.InvalidName):
                message = ex.cause.format_message()
            elif not ex.path:
                # No path to report: use the validator's message as is.
                message = ex.message
            else:
                # NOTE: For whole OpenStack message consistency, this error
                #       message is written in a format similar to WSME's.
                fmt = _("Invalid input for field/attribute %(path)s."
                        " Value: %(value)s. %(message)s")
                message = fmt % dict(path=ex.path.pop(),
                                     value=ex.instance,
                                     message=ex.message)
            raise exception.ValidationError(detail=message)
        except TypeError as type_error:
            # NOTE: Passing a non-string value to a patternProperties
            #       parameter raises TypeError; it is caught here.
            message = six.text_type(type_error)
            raise exception.ValidationError(detail=message)