Python jsonschema 模块,FormatChecker() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用jsonschema.FormatChecker()

项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def require_schema(schema):
    """Build a decorator that checks ``self.request_json`` against a schema.

    The Draft 4 validator is created once, when the decorator is built, and
    is reused for every call of the decorated method.

    :param schema: JSON Schema (http://json-schema.org) the request JSON
        has to match.
    :returns: a decorator for methods whose ``self`` exposes
        ``request_json``.
    :raises exceptions.InvalidJSONError: raised by the decorated method when
        the validator reports at least one error; it carries the list of
        error messages.
    """
    checker = jsonschema.FormatChecker()
    validator = jsonschema.Draft4Validator(schema, format_checker=checker)

    def decorate(func):
        @functools.wraps(func)
        def checked(self, *args, **kwargs):
            # Collect every message so the caller sees all problems at once.
            messages = [
                problem.message
                for problem in validator.iter_errors(self.request_json)
            ]
            if not messages:
                return func(self, *args, **kwargs)

            LOG.warning("Cannot validate request: %s", messages)
            raise exceptions.InvalidJSONError(messages)

        return checked
    return decorate
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def check_schema(body, schema):
    """Validate a user-specified create body against a JSON schema.

    :param body: create body to check
    :param schema: JSON schema (Draft 4) the body is validated against
    :raises InvalidParameterValue: if validation of create body fails.
    """
    checker = jsonschema.FormatChecker()
    body_validator = jsonschema.Draft4Validator(schema, format_checker=checker)
    try:
        body_validator.validate(body)
    except jsonschema.ValidationError as exc:
        # Re-raise as the project's own exception with a translated message.
        message = _('Invalid create body: %s') % exc
        raise exception.InvalidParameterValue(message)
项目:flusk    作者:dimmg    | 项目源码 | 文件源码
def validate_schema(payload, schema):
    """Collect the validation error messages of a payload.

    :param payload: incoming request data
    :type payload: dict
    :param schema: the JSON schema (Draft 4) the payload is validated
                   against
    :returns: error messages, ordered by the string form of each error;
              empty when the payload is valid
    :rtype: list
    """
    checker = jsonschema.FormatChecker()
    validator = jsonschema.Draft4Validator(schema, format_checker=checker)
    ordered_errors = sorted(validator.iter_errors(payload), key=str)
    return [problem.message for problem in ordered_errors]
项目:singer-tools    作者:singer-io    | 项目源码 | 文件源码
def add(self, message):
        """Account for one singer message and validate records on the way.

        * ``RecordMessage``: validated against the latest schema seen for its
          stream, then counted. If no schema was seen yet, a notice is
          printed and ``exit(1)`` is called.
        * ``SchemaMessage``: counted and remembered as the stream's latest
          schema.
        * ``StateMessage``: counted and remembered as the latest state.

        Messages of any other type are ignored.
        """
        if isinstance(message, singer.RecordMessage):
            stream = self.ensure_stream(message.stream)
            schema = stream.latest_schema
            if not schema:
                print('I saw a record for stream {} before the schema'.format(
                    message.stream))
                exit(1)
            else:
                defaulting_cls = extend_with_default(Draft4Validator)
                record_validator = defaulting_cls(
                    schema, format_checker=FormatChecker())
                # Validate a copy so the original record is left untouched.
                record_validator.validate(copy.deepcopy(message.record))
            stream.num_records += 1

        elif isinstance(message, singer.SchemaMessage):
            stream = self.ensure_stream(message.stream)
            stream.num_schemas += 1
            stream.latest_schema = message.schema

        elif isinstance(message, singer.StateMessage):
            self.latest_state = message.value
            self.num_states += 1
项目:python-ecsclient    作者:EMCECS    | 项目源码 | 文件源码
def is_valid_response(response, schema):
    """
    Tell whether an API response validates against a schema.

    Any exception raised during validation is logged as a warning and
    reported as an invalid response.

    :param response: The response returned by the API
    :param schema: The schema to validate with
    :returns: True if the response validates with the schema, False otherwise
    """
    try:
        validate(response, schema, format_checker=FormatChecker())
    except Exception as e:
        log.warning("Response is not valid: %s" % (e,))
        return False
    else:
        return True
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def validate(instance, schema, cls=None, *args, **kwargs):
    """
    Calls jsonschema.validate() with the arguments, enabling format checks.

    A fresh ``FormatChecker`` is supplied unless the caller passes its own
    ``format_checker`` keyword argument. Previously an explicit
    ``format_checker`` raised ``TypeError`` because the keyword was passed
    twice.

    :param instance: the instance to validate
    :param schema: the schema to validate with
    :param cls: validator class forwarded to the underlying call
    :param args: extra positional arguments forwarded unchanged
    :param kwargs: extra keyword arguments forwarded unchanged
    """
    kwargs.setdefault('format_checker', FormatChecker())
    _validate(instance, schema, cls, *args, **kwargs)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def __init__(self, schema, is_body=True):
        """Create a validator for ``schema`` with custom min/max handling.

        :param schema: JSON schema handed to the extended validator class
        :param is_body: stored on the instance as ``self.is_body``
        """
        self.is_body = is_body
        # Replace the stock 'minimum'/'maximum' keyword handlers with the
        # instance's own implementations.
        overrides = dict(
            minimum=self._validate_minimum,
            maximum=self._validate_maximum,
        )
        extended_cls = jsonschema.validators.extend(
            self.validator_org, overrides)
        self.validator = extended_cls(
            schema, format_checker=jsonschema.FormatChecker())
项目:bbtornado    作者:bakkenbaeck    | 项目源码 | 文件源码
def validate_json(json_data,
                  json_schema=None,
                  json_example=None,
                  validator_cls=None,
                  format_checker=jsonschema.FormatChecker(),
                  on_empty_404=False):
    """Optionally validate ``json_data`` against ``json_schema`` and return it.

    :param json_data: the data to validate
    :param json_schema: schema for the data; validation is skipped when None
    :param json_example: not used by this function
    :param validator_cls: validator class forwarded to jsonschema.validate
    :param format_checker: format checker forwarded to jsonschema.validate
    :param on_empty_404: when true, falsy ``json_data`` raises JsonError(404)
    :returns: ``json_data`` unchanged when no schema is given, otherwise the
        result of ``_to_json(json_data)``
    :raises JsonError: with code 404 if the data is empty and
        ``on_empty_404`` is set
    """
    # if output is empty, auto return the error 404.
    if not json_data and on_empty_404:
        raise JsonError(404, "Not found.")

    if json_schema is None:
        return json_data

    json_data = _to_json(json_data)
    # The output may be a bare string, which is not a validatable JSON
    # object on its own, so both the data and the schema are wrapped in an
    # object with a single "result" member before validating.
    wrapped_instance = {"result": json_data}
    wrapped_schema = {
        "type": "object",
        "properties": {"result": json_schema},
        "required": ["result"],
    }
    jsonschema.validate(
        wrapped_instance,
        wrapped_schema,
        cls=validator_cls,
        format_checker=format_checker,
    )
    return json_data
项目:valence    作者:openstack    | 项目源码 | 文件源码
def __init__(self, name):
        """Look up the schema registered under ``name`` and build its validator.

        :param name: key into ``schemas.SCHEMAS``; a missing key yields a
            ``None`` schema because ``dict.get`` is used.
        """
        self.name = name
        self.schema = schemas.SCHEMAS.get(name)
        self.validator = jsonschema.Draft4Validator(
            self.schema, format_checker=jsonschema.FormatChecker())
项目:daisy    作者:opnfv    | 项目源码 | 文件源码
def _validate(data, schema):
    """Return the validation errors of ``data`` against ``schema``.

    The errors are sorted by their ``path`` attribute; the list is empty
    when ``data`` is valid.
    """
    checker = FormatChecker()
    validator = Draft4Validator(schema, format_checker=checker)
    found = list(validator.iter_errors(data))
    found.sort(key=lambda problem: problem.path)
    return found
项目:music_recommend    作者:YeEmrick    | 项目源码 | 文件源码
def validate(self, json, schema_name):
        """
        Validate ``json`` against the schema stored under ``schema_name``.

        :param json: the JSON document to validate
        :param schema_name: key of the schema in ``self.schema``
        :return: (boolean, dict)
                ``(True, err)`` with all ``err`` fields empty when the
                document is valid; ``(False, err)`` otherwise, where
                err = {
                    "key_name": "",   # last element of the failing path
                    "key_path": "",   # dot-joined path of its parents
                    "msg": ""         # the validation error message
                }
        :raises SchemaError: if the stored schema itself is invalid
        :raises ValueError: if a KeyError occurs, e.g. ``schema_name`` is
                not present in ``self.schema``
        :type json: dict
        :type schema_name: str
        :rtype: (bool, dict)
        """
        err = {
            "key_name": "",
            "key_path": "",
            "msg": ""
        }
        try:
            jsonschema.validate(json, self.schema[schema_name], format_checker=jsonschema.FormatChecker())
            return True, err
        except ValidationError as e:
            # Work on a copy so the exception's own path is not mutated.
            path = list(e.path)
            if path:
                # Path elements may be ints (array indexes), hence str().
                err['key_name'] = str(path.pop()).replace(".", "")
            err['key_path'] = ".".join(str(part) for part in path)
            err['msg'] = e.message
            return False, err
        except SchemaError:
            raise SchemaError("invalid schema in validator[%s], please checking" % schema_name)
        except KeyError:
            raise ValueError("not found schema name [%s] in schema config" % schema_name)
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def validate(payload, schema):
    """Validate `payload` against `schema`, returning an error list.

    Each entry is the error's message followed by ' at ' and the
    slash-separated absolute path of the offending element, so callers do
    not have to dig that information out of the jsonschema error objects.
    The list is empty when the payload is valid.
    """
    checker = jsonschema.FormatChecker()
    validator = jsonschema.Draft4Validator(schema, format_checker=checker)
    return [
        problem.message + ' at ' + (
            '/' + '/'.join(map(str, problem.absolute_path)))
        for problem in validator.iter_errors(payload)
    ]
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def __init__(self, schema, relax_additional_properties=False):
        """Build the validator for ``schema`` with customised keywords.

        :param schema: JSON schema handed to the extended validator class
        :param relax_additional_properties: when true, the
            'additionalProperties' keyword is handled by
            ``_soft_validate_additional_properties``
        """
        # 'minimum' and 'maximum' always use the instance's own handlers.
        overrides = dict(
            minimum=self._validate_minimum,
            maximum=self._validate_maximum,
        )
        if relax_additional_properties:
            overrides.update(
                additionalProperties=_soft_validate_additional_properties)

        extended_cls = jsonschema.validators.extend(
            self.validator_org, overrides)
        self.validator = extended_cls(schema, format_checker=FormatChecker())
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, schema, relax_additional_properties=False):
        """Create ``self.validator`` from ``schema`` using extended keywords.

        :param schema: JSON schema passed to the extended validator class
        :param relax_additional_properties: if true,
            ``_soft_validate_additional_properties`` takes over the
            'additionalProperties' keyword
        """
        keyword_handlers = {}
        keyword_handlers['minimum'] = self._validate_minimum
        keyword_handlers['maximum'] = self._validate_maximum
        if relax_additional_properties:
            keyword_handlers['additionalProperties'] = (
                _soft_validate_additional_properties)

        # Derive a validator class from the base one with the handlers above.
        derived_cls = jsonschema.validators.extend(
            self.validator_org, keyword_handlers)
        checker = FormatChecker()
        self.validator = derived_cls(schema, format_checker=checker)
项目:target-stitch    作者:singer-io    | 项目源码 | 文件源码
def handle_batch(self, messages, schema, key_names, bookmark_names=None): # pylint: disable=no-self-use,unused-argument
        '''Validate every record message of a batch against the schema.

        Both the schema and each record go through ``float_to_decimal``
        before validation. Messages that are not ``singer.RecordMessage``
        instances are skipped. ``bookmark_names`` is not used.

        Raises ``TargetStitchException`` when a record lacks one of
        ``key_names``; validation errors from the validator propagate.
        '''
        decimal_schema = float_to_decimal(schema)
        checker = FormatChecker()
        validator = Draft4Validator(decimal_schema, format_checker=checker)
        for index, message in enumerate(messages):
            if not isinstance(message, singer.RecordMessage):
                continue
            record = float_to_decimal(message.record)
            validator.validate(record)
            # An empty or missing key list means there is nothing to check.
            for key in key_names or ():
                if key not in record:
                    raise TargetStitchException(
                        'Message {} is missing key property {}'.format(
                            index, key))
        LOGGER.info('Batch is valid')
项目:lcp-testing-tools    作者:edrlab    | 项目源码 | 文件源码
def test_fetch_license(self):
        """Fetch the license linked from the status document and validate it.

        The first link of ``self.lsd['links']`` whose ``rel`` is 'license'
        is downloaded, parsed as JSON into ``self.lcpl`` and validated
        against ``lcpl_schema.json``. Some license values are then logged.

        Raises ``TestSuiteRunningError`` when the link is missing, the
        response is not JSON, or the license fails schema validation.
        """
        license_links = (
            link for link in self.lsd['links'] if link['rel'] == 'license')
        try:
            license_link = next(license_links)
        except StopIteration:
            raise TestSuiteRunningError(
                "Missing a 'license' link in the status document")

        license_url = license_link['href']
        LOGGER.debug("Fetch license at url %s:", license_url)

        # fetch the license
        response = requests.get(license_url)
        try:
            self.lcpl = response.json()
        except ValueError:
            # Log the raw body to help diagnose what was actually returned.
            LOGGER.debug(response.text)
            raise TestSuiteRunningError("Malformed JSON License Document")
        LOGGER.debug("The License is available")

        # validate the license
        schema_path = os.path.join(JSON_SCHEMA_DIR_PATH, 'lcpl_schema.json')
        with open(schema_path) as schema_file:
            lcpl_json_schema = json.load(schema_file)
            try:
                jsonschema.validate(
                    self.lcpl,
                    lcpl_json_schema,
                    format_checker=jsonschema.FormatChecker())
            except jsonschema.ValidationError as err:
                raise TestSuiteRunningError(err)

        LOGGER.debug("The up to date License is available and valid")

        # display some license values
        issued = self.lcpl['issued']
        if "updated" in self.lcpl:
            updated = self.lcpl['updated']
        else:
            updated = "never"
        LOGGER.info("date issued {}, updated {}".format(issued, updated))

        rights = self.lcpl['rights']
        LOGGER.info("rights print {}, copy {}".format(
            rights['print'], rights['copy']))

        rights_start = rights['start'] if "start" in rights else "none"
        rights_end = rights['end'] if "end" in rights else "none"
        LOGGER.info("rights start {}, end {}".format(rights_start, rights_end))