我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 webob.exc 模块。
def __exit__(self, ex_type, ex_value, ex_traceback):
    """Convert an exception leaving the ``with`` body into a ``Fault``.

    :param ex_type: class of the pending exception, if any
    :param ex_value: the pending exception instance, if any
    :param ex_traceback: traceback of the pending exception, if any
    :returns: True when no exception is pending; False when the
              exception is not one of the types handled here
    :raises Fault: for every handled exception type
    """
    if not ex_value:
        return True

    # Every handled case raises, so independent guards behave exactly
    # like an if/elif chain evaluated in the same order.
    if isinstance(ex_value, exception.NotAuthorized):
        forbidden = webob.exc.HTTPForbidden(explanation=ex_value.msg)
        raise Fault(forbidden)

    if isinstance(ex_value, exception.Invalid):
        converted = exception.ConvertedException(
            code=ex_value.code, explanation=ex_value.msg)
        raise Fault(converted)

    if isinstance(ex_value, TypeError):
        LOG.error(_LE('Exception handling resource: %s'), ex_value,
                  exc_info=(ex_type, ex_value, ex_traceback))
        raise Fault(webob.exc.HTTPBadRequest())

    if isinstance(ex_value, Fault):
        LOG.info(_LI("Fault thrown: %s"), ex_value)
        raise ex_value

    if isinstance(ex_value, webob.exc.HTTPException):
        LOG.info(_LI("HTTP exception thrown: %s"), ex_value)
        raise Fault(ex_value)

    # We didn't handle the exception
    return False
def validate_name_and_description(body):
    """Validate the optional 'name' and 'description' entries of ``body``.

    A string 'name' is stripped of surrounding whitespace and written
    back into ``body`` before its length is checked.

    :param body: mapping that may contain 'name' and 'description'
    :raises webob.exc.HTTPBadRequest: when a length check fails
    """
    if body.get('name') is not None:
        if isinstance(body['name'], six.string_types):
            body['name'] = body['name'].strip()
        try:
            _check_string_length(body['name'], 'Name',
                                 min_length=0, max_length=255)
        except exception.InvalidInput as error:
            raise webob.exc.HTTPBadRequest(explanation=error.msg)

    description = body.get('description')
    if description is None:
        return

    try:
        _check_string_length(description, 'Description',
                             min_length=0, max_length=255)
    except exception.InvalidInput as error:
        raise webob.exc.HTTPBadRequest(explanation=error.msg)
def validate_string_length(value, entity_name, min_length=0, max_length=None,
                           remove_whitespaces=False):
    """Check the length of the given string.

    :param value: the value of the string
    :param entity_name: the name of the string
    :param min_length: the min_length of the string
    :param max_length: the max_length of the string
    :param remove_whitespaces: True if trimming whitespaces is needed
                               else False
    :raises webob.exc.HTTPBadRequest: when the length check reports
                                      invalid input
    """
    should_strip = remove_whitespaces and isinstance(value, six.string_types)
    checked = value.strip() if should_strip else value

    try:
        _check_string_length(checked, entity_name,
                             min_length=min_length, max_length=max_length)
    except exception.InvalidInput as error:
        raise webob.exc.HTTPBadRequest(explanation=error.msg)
def setUp(self):
    """Build the middleware under test and a request with auth headers."""
    super(TestKeystoneMiddlewareRoles, self).setUp()

    @webob.dec.wsgify()
    def role_check_app(req):
        # Report through the response status which role case was seen.
        roles = req.environ['masakari.context'].roles
        if "knight" in roles and "bad" not in roles:
            return webob.Response(status="200 Role Match")
        if not roles:
            return webob.Response(status="200 No Roles")
        raise webob.exc.HTTPBadRequest("unexpected role header")

    self.middleware = (
        masakari.api.auth.MasakariKeystoneContext(role_check_app))

    self.request = webob.Request.blank('/')
    headers = self.request.headers
    headers['X_USER'] = 'testuser'
    headers['X_TENANT_ID'] = 'testtenantid'
    headers['X_AUTH_TOKEN'] = 'testauthtoken'
    headers['X_SERVICE_CATALOG'] = jsonutils.dumps({})

    self.roles = "pawn, knight, rook"
def exception_from_response(response):
    """Convert an OpenStack V2 Fault into a webob exception.

    Since we are calling the OpenStack API we should process the Faults
    produced by them. Extract the Fault information according to [1] and
    convert it back to a webob exception. If the status code or the
    'results' entry of the JSON body cannot be read, a 500 with a generic
    message is used instead.

    [1] http://docs.openstack.org/developer/nova/v2/faults.html

    :param response: a webob.Response containing an exception
    :returns: a webob.exc.exception object
    """
    try:
        code, title = response.status_int, response.json_body['results']
    except Exception:
        # Any failure while inspecting the response maps to a generic 500.
        code, title = 500, "Unknown error happened processing response"
    return manage_http_exception(code, title)
def translate_exception(req, e):
    """Translate all translatable elements of the given exception.

    :param req: the request; used only if it offers
                ``best_match_language``
    :param e: the exception to translate in place
    :returns: the same object ``e``
    """
    # The RequestClass attribute in the webob.dec.wsgify decorator
    # does not guarantee that the request object will be a particular
    # type; this check is therefore necessary.
    if not hasattr(req, "best_match_language"):
        return e

    locale = req.best_match_language()
    if not isinstance(e, webob.exc.HTTPError):
        return e

    for attr in ('explanation', 'detail'):
        setattr(e, attr, i18n.translate(getattr(e, attr), locale))

    # Only translate the body template when one is set and non-empty.
    if getattr(e, 'body_template', None):
        e.body_template = i18n.translate(e.body_template, locale)
    return e
def setUp(self):
    """Build the middleware under test and a request with auth headers."""
    super(TestKeystoneMiddlewareRoles, self).setUp()

    @webob.dec.wsgify()
    def role_check_app(req):
        # Report through the response status which role case was seen.
        roles = req.environ['nova.context'].roles
        if "knight" in roles and "bad" not in roles:
            return webob.Response(status="200 Role Match")
        if roles == ['']:
            return webob.Response(status="200 No Roles")
        raise webob.exc.HTTPBadRequest("unexpected role header")

    self.middleware = nova.api.auth.NovaKeystoneContext(role_check_app)

    self.request = webob.Request.blank('/')
    headers = self.request.headers
    headers['X_USER'] = 'testuser'
    headers['X_TENANT_ID'] = 'testtenantid'
    headers['X_AUTH_TOKEN'] = 'testauthtoken'
    headers['X_SERVICE_CATALOG'] = jsonutils.dumps({})

    self.roles = "pawn, knight, rook"
def test_400_fault_json(self):
    """A 400 fault is serialized to JSON via file extension or header."""
    expected = {
        "badRequest": {
            "message": "scram",
            "code": 400,
        },
    }
    json_requests = (
        webob.Request.blank('/.json'),
        webob.Request.blank('/', headers={"Accept": "application/json"}),
    )

    for json_request in json_requests:
        bad_request = webob.exc.HTTPBadRequest(explanation='scram')
        response = json_request.get_response(wsgi.Fault(bad_request))
        actual = jsonutils.loads(response.body)

        self.assertEqual(response.content_type, "application/json")
        self.assertEqual(expected, actual)
def test_check_img_metadata_properties_quota_inv_metadata(self):
    """Invalid metadata raises HTTPBadRequest; None and {} yield None."""
    ctxt = utils.get_test_admin_context()

    rejected = (
        {"a" * 260: "value"},
        {"": "value"},
        "invalid metadata",
    )
    for metadata in rejected:
        self.assertRaises(webob.exc.HTTPBadRequest,
                          common.check_img_metadata_properties_quota,
                          ctxt, metadata)

    for metadata in (None, {}):
        self.assertIsNone(
            common.check_img_metadata_properties_quota(ctxt, metadata))
def show(self, req, id):
    """Return data for the given key name.

    :param req: request whose environ carries 'nova.context'
    :param id: name of the key pair to look up
    :returns: dict with the filtered key pair under 'keypair'
    :raises webob.exc.HTTPNotFound: when the key pair does not exist
    """
    context = req.environ['nova.context']
    authorize(context, action='show')

    extra_fields = dict.fromkeys(
        ('created_at', 'deleted', 'deleted_at', 'id', 'user_id',
         'updated_at'),
        True)
    try:
        # The return object needs to be a dict in order to pop the 'type'
        # field, since it is incompatible with API version <= 2.1.
        found = self.api.get_key_pair(context, context.user_id, id)
        keypair = self._filter_keypair(found, **extra_fields)
    except exception.KeypairNotFound as exc:
        raise webob.exc.HTTPNotFound(explanation=exc.format_message())
    return {'keypair': keypair}
def __call__(self, environ, start_response):
    r"""Placeholder WSGI entry point; always raises NotImplementedError.

    Subclasses will probably want to implement __call__ like this:

    @webob.dec.wsgify(RequestClass=Request)
    def __call__(self, req):
      # Any of the following objects work as responses:

      # Option 1: simple string
      res = 'message\n'

      # Option 2: a nicely formatted HTTP exception page
      res = exc.HTTPForbidden(explanation='Nice try')

      # Option 3: a webob Response object (in case you need to play with
      # headers, or you want to be treated like an iterable)
      res = Response();
      res.app_iter = open('somefile')

      # Option 4: any wsgi app to be run next
      res = self.application

      # Option 5: you can get a Response object for a wsgi app, too, to
      # play with headers etc
      res = req.get_response(self.application)

      # You can then just return your response...
      return res
      # ... or set req.response and return None.
      req.response = res

    See the end of http://pythonpaste.org/webob/modules/dec.html
    for more info.

    :param environ: WSGI environment (unused here)
    :param start_response: WSGI start_response callable (unused here)
    :raises NotImplementedError: always
    """
    raise NotImplementedError(_('You must implement __call__'))
def assert_valid_body(body, entity_name):
    """Raise HTTPBadRequest unless ``Controller.is_valid_body`` accepts body.

    :param body: the request body to check
    :param entity_name: name of the element the body must contain
    :raises webob.exc.HTTPBadRequest: when the body is not valid
    """
    # NOTE: After v1 api is deprecated need to merge 'is_valid_body' and
    #       'assert_valid_body' in to one method. Right now it is not
    #       possible to modify 'is_valid_body' to raise exception because
    #       in case of V1 api when 'is_valid_body' return False,
    #       'HTTPUnprocessableEntity' exception is getting raised and in
    #       V2 api 'HTTPBadRequest' exception is getting raised.
    if Controller.is_valid_body(body, entity_name):
        return

    explanation = _("Missing required element '%s' in "
                    "request body.") % entity_name
    raise webob.exc.HTTPBadRequest(explanation=explanation)
def __init__(self, exception):
    """Create a Fault for the given webob.exc.exception.

    :param exception: the exception to wrap; its ``status_int`` is
                      copied onto this object
    """
    status = exception.status_int
    self.wrapped_exc = exception
    self.status_int = status
def __init__(self, message, details, retry_time):
    """Initialize new `OverLimitFault` with relevant information.

    :param message: stored under 'message' in the fault content
    :param details: stored under 'details' in the fault content
    :param retry_time: passed to ``OverLimitFault._retry_after`` to
                       build the headers of the wrapped exception
    """
    retry_headers = OverLimitFault._retry_after(retry_time)
    self.wrapped_exc = webob.exc.HTTPRequestEntityTooLarge(
        headers=retry_headers)

    fault_body = {
        "code": self.wrapped_exc.status_int,
        "message": message,
        "details": details,
    }
    self.content = {"overLimitFault": fault_body}
def __exit__(self, ex_type, ex_value, ex_traceback):
    """Convert an exception leaving the ``with`` body into a ``Fault``.

    :param ex_type: class of the pending exception, if any
    :param ex_value: the pending exception instance, if any
    :param ex_traceback: traceback of the pending exception, if any
    :returns: True when no exception is pending; False when the
              exception is not one of the types handled here
    :raises Fault: for handled exception types other than
                   ``VersionNotFoundForAPIMethod``, which is re-raised
                   unchanged
    """
    if not ex_value:
        return True

    # Every handled case raises, so independent guards behave exactly
    # like an if/elif chain evaluated in the same order.
    if isinstance(ex_value, exception.NotAuthorized):
        msg = six.text_type(ex_value)
        raise Fault(webob.exc.HTTPForbidden(explanation=msg))

    if isinstance(ex_value, exception.VersionNotFoundForAPIMethod):
        raise

    if isinstance(ex_value, exception.Invalid):
        converted = exception.ConvertedException(
            code=ex_value.code, explanation=six.text_type(ex_value))
        raise Fault(converted)

    if isinstance(ex_value, TypeError):
        LOG.error('Exception handling resource: %s', ex_value,
                  exc_info=(ex_type, ex_value, ex_traceback))
        raise Fault(webob.exc.HTTPBadRequest())

    if isinstance(ex_value, Fault):
        LOG.info("Fault thrown: %s", six.text_type(ex_value))
        raise ex_value

    if isinstance(ex_value, webob.exc.HTTPException):
        LOG.info("HTTP exception thrown: %s", six.text_type(ex_value))
        raise Fault(ex_value)

    # We didn't handle the exception
    return False
def __call__(self, request):
    """WSGI method that controls (de)serialization and method dispatch.

    :param request: the incoming request object
    :returns: a ``Fault`` when the requested API version is rejected,
              otherwise the result of ``self._process_stack``
    """
    # Pass the mapping as a logging argument so interpolation is left
    # to the logging framework and skipped when INFO is disabled.
    LOG.info("%(method)s %(url)s",
             {"method": request.method, "url": request.url})

    if self.support_api_request_version:
        # Set the version of the API requested based on the header
        try:
            request.set_api_version_request()
        except exception.InvalidAPIVersionString as e:
            return Fault(webob.exc.HTTPBadRequest(
                explanation=six.text_type(e)))
        except exception.InvalidGlobalAPIVersion as e:
            return Fault(webob.exc.HTTPNotAcceptable(
                explanation=six.text_type(e)))

    # Identify the action, its arguments, and the requested
    # content type
    action_args = self.get_action_args(request.environ)
    action = action_args.pop('action', None)
    content_type, body = self.get_body(request)
    accept = request.best_match_content_type()

    # NOTE(Vek): Splitting the function up this way allows for
    #            auditing by external tools that wrap the existing
    #            function.  If we try to audit __call__(), we can
    #            run into troubles due to the @webob.dec.wsgify()
    #            decorator.
    return self._process_stack(request, action, action_args,
                               content_type, body, accept)
def dispatch(self, method, request, action_args):
    """Dispatch a call to the action-specific method.

    :param method: callable invoked as ``method(req=request, **action_args)``
    :param request: the request handed to ``method`` as ``req``
    :param action_args: keyword arguments forwarded to ``method``
    :returns: the result of ``method``, or a 404 ``Fault`` when it raises
              ``VersionNotFoundForAPIMethod``
    """
    try:
        result = method(req=request, **action_args)
    except exception.VersionNotFoundForAPIMethod:
        # We deliberately don't return any message information
        # about the exception to the user so it looks as if
        # the method is simply not implemented.
        result = Fault(webob.exc.HTTPNotFound())
    return result
def authorize(arg):
    """Decorator for checking the policy on API methods.

    Add this decorator to any API method which takes a request object
    as the first parameter and belongs to a class which inherits from
    wsgi.Controller. The class must also have a class member called
    'resource_name' which specifies the resource for the policy check.

    Can be used in any of the following forms
    @authorize
    @authorize('my_action_name')

    :param arg: Can either be the function being decorated or a str
                containing the 'action' for the policy check. If no
                action name is provided, the function name is assumed
                to be the action name.
    """
    used_bare = callable(arg)
    # An explicit action name exists only in the @authorize('name') form.
    action_name = None if used_bare else arg

    def decorator(f):
        @functools.wraps(f)
        def wrapper(self, req, *args, **kwargs):
            action = action_name or f.__name__
            context = req.environ['meteos.context']
            try:
                policy.check_policy(context, self.resource_name, action)
            except exception.PolicyNotAuthorized:
                raise webob.exc.HTTPForbidden()
            return f(self, req, *args, **kwargs)
        return wrapper

    return decorator(arg) if used_bare else decorator
def validate_update(self, body, status_attr='status'):
    """Extract and validate the requested status value from ``body``.

    :param body: request body expected to contain ``status_attr``
    :param status_attr: key to read from ``body`` and to look up in
                        ``self.valid_statuses``
    :returns: dict mapping ``status_attr`` to the requested value
    :raises webob.exc.HTTPBadRequest: when the key is missing or the
                                      value is not an allowed state
    """
    try:
        requested = body[status_attr]
    except (TypeError, KeyError):
        msg = _("Must specify '%s'") % status_attr
        raise webob.exc.HTTPBadRequest(explanation=msg)

    allowed = self.valid_statuses[status_attr]
    if requested not in allowed:
        listing = ", ".join(six.text_type(i) for i in allowed)
        expl = _("Invalid state. Valid states: %s.") % listing
        raise webob.exc.HTTPBadRequest(explanation=expl)

    return {status_attr: requested}
def _force_delete(self, req, id, body):
    """Delete a resource, bypassing the check for status.

    :param req: request whose environ carries 'meteos.context'
    :param id: identifier handed to ``self._get``
    :param body: request body (not used here)
    :returns: an empty response with status 202
    :raises webob.exc.HTTPNotFound: when the lookup raises NotFound
    """
    context = req.environ['meteos.context']
    try:
        resource = self._get(context, id)
    except exception.NotFound as e:
        raise webob.exc.HTTPNotFound(six.text_type(e))
    else:
        # Only the lookup is guarded; errors from the delete propagate.
        self._delete(context, resource, force=True)
        return webob.Response(status_int=202)
def test_import_string(self):
    """Exercise import_string for importable and missing dotted names."""
    load = webapp2.import_string

    for dotted, module in (('webob.exc', webob.exc), ('webob', webob)):
        self.assertEqual(load(dotted), module)

    missing_names = ('asdfg', 'webob.asdfg')

    # With silent=True a missing name yields None instead of raising.
    for dotted in missing_names:
        self.assertEqual(load(dotted, silent=True), None)

    for dotted in missing_names:
        self.assertRaises(
            webapp2.ImportStringError, webapp2.import_string, dotted)
def _validate_input_request(self, payload):
    """Reject a payload in which any value contains a space character.

    Values that do not support the ``in`` operator (for example None or
    an int) cannot contain a space and are skipped rather than letting
    the membership test fail with TypeError.

    :param payload: mapping of input keys to values
    :raises webob.exc.HTTPBadRequest: naming the first key whose value
                                      contains a space
    """
    for key in payload:
        value = payload.get(key)
        try:
            has_space = " " in value
        except TypeError:
            # Not a string/container: nothing to check for this key.
            continue

        if has_space:
            # Interpolation is left to the logger so it is skipped
            # when DEBUG is disabled.
            LOG.debug('Input payload contain white spaces %s',
                      str(payload))
            msg = _('%s contains white spaces') % key
            raise webob.exc.HTTPBadRequest(explanation=msg)
def ord_notifier(self, **args):
    """Always raise HTTP 404; the keyword arguments are ignored.

    :raises webob.exc.HTTPNotFound: unconditionally
    """
    raise webob.exc.HTTPNotFound()
def __exit__(self, ex_type, ex_value, ex_traceback):
    """Convert an exception leaving the ``with`` body into a ``Fault``.

    :param ex_type: class of the pending exception, if any
    :param ex_value: the pending exception instance, if any
    :param ex_traceback: traceback of the pending exception, if any
    :returns: True when no exception is pending; False when the
              exception is not one of the types handled here
    :raises Fault: for handled exception types other than
                   ``VersionNotFoundForAPIMethod``, which is re-raised
                   unchanged
    """
    if not ex_value:
        return True

    # Every handled case raises, so independent guards behave exactly
    # like an if/elif chain evaluated in the same order.
    if isinstance(ex_value, exception.Forbidden):
        forbidden = webob.exc.HTTPForbidden(
            explanation=ex_value.format_message())
        raise Fault(forbidden)

    if isinstance(ex_value, exception.VersionNotFoundForAPIMethod):
        raise

    if isinstance(ex_value, exception.Invalid):
        converted = exception.ConvertedException(
            code=ex_value.code, explanation=ex_value.format_message())
        raise Fault(converted)

    if isinstance(ex_value, TypeError):
        LOG.error('Exception handling resource: %s', ex_value,
                  exc_info=(ex_type, ex_value, ex_traceback))
        raise Fault(webob.exc.HTTPBadRequest())

    if isinstance(ex_value, Fault):
        LOG.info("Fault thrown: %s", ex_value)
        raise ex_value

    if isinstance(ex_value, webob.exc.HTTPException):
        LOG.info("HTTP exception thrown: %s", ex_value)
        raise Fault(ex_value)

    # We didn't handle the exception
    return False
def __call__(self, request):
    """WSGI method that controls (de)serialization and method dispatch.

    :param request: the incoming request object
    :returns: a ``Fault`` when the API version or content type is
              rejected, otherwise the result of ``self._process_stack``
    """
    if self.support_api_request_version:
        # Set the version of the API requested based on the header
        try:
            request.set_api_version_request()
        except exception.InvalidAPIVersionString as bad_string:
            explanation = bad_string.format_message()
            return Fault(webob.exc.HTTPBadRequest(explanation=explanation))
        except exception.InvalidGlobalAPIVersion as bad_version:
            explanation = bad_version.format_message()
            return Fault(
                webob.exc.HTTPNotAcceptable(explanation=explanation))

    # Identify the action, its arguments, and the requested
    # content type
    action_args = self.get_action_args(request.environ)
    action = action_args.pop('action', None)

    # NOTE: we filter out InvalidContentTypes early so we
    # know everything is good from here on out.
    try:
        content_type, body = self.get_body(request)
        accept = request.best_match_content_type()
    except exception.InvalidContentType:
        unsupported = webob.exc.HTTPUnsupportedMediaType(
            explanation=_("Unsupported Content-Type"))
        return Fault(unsupported)

    # NOTE: Splitting the function up this way allows for
    #       auditing by external tools that wrap the existing
    #       function.  If we try to audit __call__(), we can
    #       run into troubles due to the @webob.dec.wsgify()
    #       decorator.
    return self._process_stack(request, action, action_args,
                               content_type, body, accept)
def _get_method(self, request, action, content_type, body):
    """Look up the action-specific method and its extensions.

    :param request: the request; its method is used in the error text
    :param action: name of the action to resolve
    :param content_type: content type of the request (not used here)
    :param body: request body, inspected via ``action_peek`` when the
                 action is the literal 'action'
    :returns: tuple of (method, list of extensions)
    :raises webob.exc.HTTPMethodNotAllowed: when the attribute is
        missing, no wsgi action can serve it and a controller is set
    :raises AttributeError: when the attribute is missing, no wsgi
        action can serve it and no controller is set
    """
    # Look up the method
    try:
        if self.controller:
            meth = getattr(self.controller, action)
        else:
            meth = getattr(self, action)
    except AttributeError:
        no_wsgi_action = (not self.wsgi_actions or
                          action not in _ROUTES_METHODS + ['action'])
        if no_wsgi_action:
            if self.controller:
                msg = _("The request method: '%(method)s' with action: "
                        "'%(action)s' is not allowed on this "
                        "resource") % {
                            'method': request.method, 'action': action
                        }
                raise webob.exc.HTTPMethodNotAllowed(
                    explanation=msg, body_template='${explanation}')
            # Propagate the error
            raise
    else:
        return meth, self.wsgi_extensions.get(action, [])

    # Reached only when the attribute lookup failed but a wsgi action
    # may still serve the request.
    action_name = action_peek(body) if action == 'action' else action

    # Look up the action method
    handler = self.wsgi_actions[action_name]
    return handler, self.wsgi_action_extensions.get(action_name, [])
def manage_http_exception(code, message):
    """Build the exception registered for ``code`` with ``message``.

    :param code: key looked up in ``default_exceptions``
    :param message: value formatted with ``%s`` and passed to the
                    exception class
    :returns: an instance of the matching class, or of
              ``webob.exc.HTTPInternalServerError`` for unknown codes
    """
    fallback = webob.exc.HTTPInternalServerError
    exception_class = default_exceptions.get(code, fallback)
    text = "%s" % message
    return exception_class(text)
def __init__(self, exc=None, message=None, code=None):
    """Store the message and code from ``get_exception_details``.

    :param exc: passed to ``get_exception_details``
    :param message: passed to ``get_exception_details`` and logged
    :param code: passed to ``get_exception_details``
    """
    resolved = get_exception_details(exc, message, code)
    self.message = resolved['message']
    self.code = resolved['code']
    # The raw ``message`` argument is logged, not the resolved one.
    make_log("debug", message)
def __init__(self, message, exc=None, code=400):
    """Initialize the parent with ``code`` defaulting to 400.

    :param message: forwarded to the parent initializer
    :param exc: forwarded to the parent initializer
    :param code: forwarded to the parent initializer
    """
    parent = super(ParseException, self)
    parent.__init__(exc, message, code)
def __init__(self, message, exc=None, code=401):
    """Initialize the parent (``code`` defaults to 401) and prefix the message.

    :param message: forwarded to the parent initializer
    :param exc: forwarded to the parent initializer
    :param code: forwarded to the parent initializer
    """
    parent = super(UserCredentialsException, self)
    parent.__init__(exc, message, code)
    # Prefix whatever message the parent initializer stored.
    prefixed = "User Credentials Exception: %s " % self.message
    self.message = prefixed
def __init__(self, message, exc=None, code=None):
    """Initialize the parent and prefix the stored message.

    :param message: forwarded to the parent initializer
    :param exc: forwarded to the parent initializer
    :param code: forwarded to the parent initializer
    """
    parent = super(ConfigurationException, self)
    parent.__init__(exc, message, code)
    # Prefix whatever message the parent initializer stored.
    prefixed = "Configuration Exception: %s " % self.message
    self.message = prefixed
def __init__(self, exc=None, message=None, code=None):
    """Initialize the parent and prefix the stored message.

    :param exc: forwarded to the parent initializer
    :param message: forwarded to the parent initializer
    :param code: forwarded to the parent initializer
    """
    parent = super(DockerException, self)
    parent.__init__(exc, message, code)
    # Prefix whatever message the parent initializer stored.
    prefixed = "Error: %s " % self.message
    self.message = prefixed
def __call__(self, req):
    """Serialize ``self.error`` into a JSON response for ``req``.

    :param req: the request the response is bound to
    :returns: a ``webob.Response`` whose status code is the error's
              'code' entry, or the internal-server-error code when
              that entry is absent
    """
    serializer = wsgi.JSONResponseSerializer()
    response = webob.Response(request=req)

    fallback_code = webob.exc.HTTPInternalServerError().code
    response.status_code = self.error.get('code', fallback_code)

    serializer.default(response, self.error)
    return response
def __call__(self, req):
    """Run the wrapped application, turning any exception into a Fault.

    :param req: the incoming request
    :returns: the application's response, or the response of a
              ``Fault`` built from ``self._error`` when it raised
    """
    try:
        response = req.get_response(self.application)
    except Exception as failure:
        LOG.exception(failure)
        fault = Fault(self._error(failure))
        response = req.get_response(fault)
    return response
def reject(self, req, allowed_methods, *args, **kwargs):
    """Refuse the request with 405 and advertise the allowed methods.

    :param req: the request; its REQUEST_METHOD is logged
    :param allowed_methods: value sent in the 'Allow' header
    :raises webob.exc.HTTPMethodNotAllowed: always
    """
    used_method = req.environ['REQUEST_METHOD']
    LOG.debug("The method %s is not allowed for this resource",
              used_method)

    allow_header = ('Allow', allowed_methods)
    raise webob.exc.HTTPMethodNotAllowed(headers=[allow_header])
def _dispatch(req):
    """Return the routed WSGI app, or a 404/501 when nothing matched.

    Called by self._router after matching the incoming request to a
    route and putting the information into req.environ.

    :param req: request whose environ holds 'wsgiorg.routing_args'
    :returns: the matched controller, ``HTTPNotFound`` for an
              unmatched request using an implemented HTTP method, or
              ``HTTPNotImplemented`` for any other method
    """
    match = req.environ['wsgiorg.routing_args'][1]
    if match:
        return match['controller']

    implemented_http_methods = ['GET', 'HEAD', 'POST', 'PUT',
                                'DELETE', 'PATCH']
    if req.environ['REQUEST_METHOD'] in implemented_http_methods:
        return webob.exc.HTTPNotFound()
    return webob.exc.HTTPNotImplemented()
def get_content_range(self):
    """Return the parsed 'Content-Range' header of the request.

    :returns: the parsed range, or None when the header is absent
    :raises webob.exc.HTTPBadRequest: when the header cannot be parsed
    """
    range_str = self.headers.get('Content-Range')
    if range_str is None:
        return None

    parsed = webob.byterange.ContentRange.parse(range_str)
    if parsed is None:
        msg = _('Malformed Content-Range header: %s') % range_str
        raise webob.exc.HTTPBadRequest(explanation=msg)
    return parsed
def test_413_fault_json(self):
    """A 413 fault is serialized to JSON via file extension or header."""
    expected = {
        "overLimit": {
            "message": "sorry",
            "code": 413,
            "retryAfter": "4",
        },
    }
    json_requests = (
        webob.Request.blank('/.json'),
        webob.Request.blank('/', headers={"Accept": "application/json"}),
    )

    for json_request in json_requests:
        # NOTE(aloga): we intentionally pass an integer for the
        # 'Retry-After' header. It should be then converted to a str
        too_large = webob.exc.HTTPRequestEntityTooLarge(
            explanation='sorry', headers={'Retry-After': 4})
        response = json_request.get_response(wsgi.Fault(too_large))
        actual = jsonutils.loads(response.body)

        self.assertEqual(response.content_type, "application/json")
        self.assertEqual(expected, actual)
def test_429_fault_json(self):
    """A 429 fault is serialized to JSON via file extension or header."""
    expected = {
        "overLimit": {
            "message": "sorry",
            "code": 429,
            "retryAfter": "4",
        },
    }
    json_requests = (
        webob.Request.blank('/.json'),
        webob.Request.blank('/', headers={"Accept": "application/json"}),
    )

    for json_request in json_requests:
        # NOTE(aloga): we intentionally pass an integer for the
        # 'Retry-After' header. It should be then converted to a str
        too_many = webob.exc.HTTPTooManyRequests(
            explanation='sorry', headers={'Retry-After': 4})
        response = json_request.get_response(wsgi.Fault(too_many))
        actual = jsonutils.loads(response.body)

        self.assertEqual(response.content_type, "application/json")
        self.assertEqual(expected, actual)
def test_fault_has_status_int(self):
    """The status_int of the wrapped exception is exposed on the fault."""
    bad_request = webob.exc.HTTPBadRequest(explanation='what?')
    fault = wsgi.Fault(bad_request)
    self.assertEqual(fault.status_int, 400)