我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 pyramid.httpexceptions.HTTPUnauthorized()。
def etalab_get_discussions(request):
    """List the discussions the current user may access, in the "etalab" view.

    According to the Etalab API specification, an Instance object must have
    the following fields:

    - url: we send discussion.get_url(). Note: a discussion may have 2 URLs
      (HTTP and HTTPS). For this, see discussion.get_discussion_urls()
    - name: we use discussion.topic
    - adminEmail: email of the discussion creator, who made the API request
      to create a discussion (so in our case the field will not show if the
      discussion has been created by another mean)
    - adminName: name of this guy, also provided in the discussion creation
      request (so in our case the field will not show if the discussion has
      been created by another mean)
    - createdAt: creation date

    We also send the following optional fields:

    - id: this field is not in specification's optional nor mandatory fields
    - status: "running"
    - metadata: metadata.creation_date => will probably be renamed, see above

    Returns a dict with a single ``"items"`` key holding one
    ``generic_json`` result per accessible discussion.

    Raises HTTPUnauthorized when P_READ is not among ``request.permissions``.
    """
    viewer_id = authenticated_userid(request) or Everyone
    granted = request.permissions
    if P_READ not in granted:
        raise HTTPUnauthorized()

    items = []
    for discussion in discussions_with_access(viewer_id):
        items.append(discussion.generic_json("etalab", viewer_id, granted))
    return {"items": items}
def userid_request_property(request):
    """
    Used for the userid property on weasyl requests.

    The userid is resolved from, in order: the ``X_WEASYL_API_KEY`` header,
    the ``AUTHORIZATION`` header, and finally ``request.weasyl_session``.

    Returns the resolved userid, or 0 when the session has no userid.

    Raises HTTPUnauthorized (with a matching ``WWW-Authenticate`` challenge)
    when an API key or authorization header is supplied but does not resolve
    to a user.
    """
    api_token = request.headers.get('X_WEASYL_API_KEY')
    authorization = request.headers.get('AUTHORIZATION')
    if api_token is not None:
        # TODO: If reification of userid becomes an issue (e.g. because of userid changing after sign-in) revisit this.
        # It's possible that we don't need to reify the entire property, but just cache the result of this query in a
        # cache on arguments inner function.
        userid = d.engine.scalar("SELECT userid FROM api_tokens WHERE token = %(token)s", token=api_token)
        if not userid:
            raise HTTPUnauthorized(www_authenticate=('Weasyl-API-Key', 'realm="Weasyl"'))
        return userid
    elif authorization:
        from weasyl.oauth2 import get_userid_from_authorization
        userid = get_userid_from_authorization(request)
        if not userid:
            # auth-params of a challenge are a comma-separated list; without
            # the comma the Bearer challenge is malformed.
            raise HTTPUnauthorized(www_authenticate=('Bearer', 'realm="Weasyl", error="invalid_token"'))
        return userid
    else:
        userid = request.weasyl_session.userid
        return 0 if userid is None else userid
def login(request):
    """the login view

    Reads ``login`` and ``password`` from ``request.params`` and looks up a
    user whose login or email equals the supplied login name.

    On success, returns the user's entity response with the ``remember``
    headers added; otherwise returns an HTTPUnauthorized response with
    detail ``'Bad Login'``.
    """
    logger.debug('login start')

    login_name = request.params.get('login', '')
    password = request.params.get('password', '')

    # get the user again (first got it in validation)
    from stalker import User
    from sqlalchemy import or_
    user = User.query \
        .filter(or_(User.login == login_name, User.email == login_name))\
        .first()

    if user and user.check_password(password):
        logger.debug('Login successful!')
        from pyramid.security import remember
        headers = remember(request, login_name)

        from stalker_pyramid.views.user import UserViews
        user_view = UserViews(request)
        user_view.entity_id = user.id
        response = user_view.get_entity()
        # Append the auth headers instead of assigning to ``response.headers``:
        # assignment would replace every header the entity response already
        # carries.
        # NOTE(review): assumes get_entity() returns a WebOb-style response
        # exposing ``headerlist`` - confirm.
        response.headerlist.extend(headers)
        return response
    else:
        logger.debug('Bad Login')
        from pyramid.httpexceptions import HTTPUnauthorized
        return HTTPUnauthorized(detail='Bad Login')
def forbidden_view(request):
    """Return an HTTPUnauthorized response carrying the ``forget`` headers."""
    forget_headers = forget(request)
    challenge = HTTPUnauthorized()
    challenge.headers.update(forget_headers)
    return challenge


# Support the silly auth schemes that Subsonic has
def basic_challenge(request):
    """Build an HTTPUnauthorized response updated with ``forget(request)``."""
    unauthorized = HTTPUnauthorized()
    headers_to_send = forget(request)
    unauthorized.headers.update(headers_to_send)
    return unauthorized
def __setitem__(self, key, value):
    """Validate and store ``value`` under ``key``.

    Raises KeyError when ``key`` is not in
    ``Preferences.preference_data_key_set``.

    Raises HTTPUnauthorized when the preference data declares no
    ``ALLOW_OVERRIDE`` permission for ``key``, or when the user does not
    hold that permission.
    """
    if key not in Preferences.preference_data_key_set:
        raise KeyError("Unknown property")

    key_data = self.dprefs.get_preference_data().get(key, {})
    override_permission = key_data.get(self.ALLOW_OVERRIDE, False)
    if not override_permission:
        raise HTTPUnauthorized("Cannot edit")

    target_id = self.target.id if self.target else None
    if not user_has_permission(target_id, self.user_id, override_permission):
        raise HTTPUnauthorized("Cannot edit")

    self.dprefs.validate(key, value)
    super(UserPreferenceCollection, self).__setitem__(key, value)
def safe_del(self, key, permissions=(P_READ,)):
    """Delete ``key`` if ``can_edit`` allows it for ``permissions``.

    Raises HTTPUnauthorized otherwise.
    """
    if self.can_edit(key, permissions):
        del self[key]
        return
    raise HTTPUnauthorized("Cannot delete "+key)
def safe_set(self, key, value, permissions=(P_READ,)):
    """Store ``value`` under ``key`` if ``can_edit`` allows it for ``permissions``.

    Raises HTTPUnauthorized otherwise.
    """
    if self.can_edit(key, permissions):
        self[key] = value
        return
    raise HTTPUnauthorized("Cannot edit "+key)
def delete_post_instance(request):
    """Delete (actually tombstone) the Post instance of the request context.

    Users who are allowed to delete a Post instance:

    - user who is the author of the Post instance and who has the
      P_DELETE_MY_POST permission in this discussion
    - user who has the P_DELETE_POST permission in this discussion

    The extracts attached to the post are deleted first. Returns a dict with
    a ``"result"`` message and the ``"removed_extracts"`` count.

    Raises HTTPUnauthorized when neither rule applies.
    """
    ctx = request.context
    user_id = authenticated_userid(request) or Everyone
    permissions = ctx.get_permissions()
    post = ctx._instance

    deletes_own_post = (
        user_id == post.creator_id and P_DELETE_MY_POST in permissions)
    deletes_any_post = P_DELETE_POST in permissions
    if not (deletes_own_post or deletes_any_post):
        raise HTTPUnauthorized()

    # Remove extracts associated to this post
    extracts = post.db.query(Extract).filter(
        Extract.content_id == post.id).all()
    number_of_extracts = len(extracts)
    for extract in extracts:
        extract.delete()

    # The author rule takes precedence over the moderator rule.
    if deletes_own_post:
        cause = PublicationStates.DELETED_BY_USER
    else:
        cause = PublicationStates.DELETED_BY_ADMIN
    post.delete_post(cause)

    return {
        "result": "Post has been successfully deleted.",
        "removed_extracts": number_of_extracts
    }
def raise_if_cannot_moderate(request):
    """Raise HTTPUnauthorized unless the request may moderate.

    The request must have an authenticated user id, and P_MODERATE must be
    among the permissions of the request context.
    """
    ctx = request.context
    if not authenticated_userid(request):
        raise HTTPUnauthorized()
    if P_MODERATE not in ctx.get_permissions():
        raise HTTPUnauthorized()
def as_collection(self, parent_instance):
    """Return a UserNsDict for ``parent_instance`` and the current request's user.

    Raises RuntimeError when there is no current request, and
    HTTPUnauthorized when the request has no unauthenticated userid.
    """
    from pyramid.threadlocal import get_current_request
    from pyramid.httpexceptions import HTTPUnauthorized
    from assembl.models.user_key_values import UserNsDict

    current = get_current_request()
    if current is None:
        raise RuntimeError()

    # Check again downstream for real userid
    user_id = current.unauthenticated_userid
    if user_id is None:
        raise HTTPUnauthorized()
    return UserNsDict(parent_instance, user_id)
def api_login_required(view_callable):
    """
    Like decorators.login_required, but returning json on an error.

    Returns a wrapper that raises HTTPUnauthorized (JSON body plus a
    ``WWW-Authenticate`` challenge) when ``request.userid`` is 0, and
    otherwise returns ``view_callable(request)``.
    """
    import functools

    # TODO: If we replace the regular @login_required checks on POSTs with a tween, what do about this?
    # functools.wraps keeps the wrapped view's __name__/__doc__ on the wrapper,
    # so logs and introspection still identify the real view.
    @functools.wraps(view_callable)
    def inner(request):
        if request.userid == 0:
            raise HTTPUnauthorized(
                json=_ERROR_UNSIGNED,
                www_authenticate=_STANDARD_WWW_AUTHENTICATE,
            )
        return view_callable(request)
    return inner
def _do_update_from_json(
        self, json, parse_def, ctx,
        duplicate_handling=None, object_importer=None):
    """Update this object in place from ``json`` and return the result.

    Steps, in order:

    - check that the requesting user may act on the target user, and set
      ``self.user_id`` when it is not set yet;
    - check or set ``self.discussion_id``;
    - when ``json['@type']`` differs from ``external_typename()``, change
      the class of the object and delegate the update to the new instance;
    - copy ``creation_origin``, ``parent_subscription`` and ``status``.

    Returns ``self``, or the re-classed instance when the type changed.

    Raises HTTPUnauthorized when the requesting user lacks P_ADMIN_DISC
    while acting for another user, and HTTPBadRequest for a user or
    discussion mismatch, a missing discussion, or an unknown ``@type``.
    """
    from ..auth.util import user_has_permission
    user_id = ctx.get_user_id()
    target_user_id = user_id
    user = ctx.get_instance_of_class(User)
    if user:
        target_user_id = user.id
    if self.user_id:
        if target_user_id != self.user_id:
            if not user_has_permission(
                    self.discussion_id, user_id, P_ADMIN_DISC):
                raise HTTPUnauthorized()
        # For now, do not allow changing user, it's way too complicated.
        if 'user' in json and \
                User.get_database_id(json['user']) != self.user_id:
            raise HTTPBadRequest()
    else:
        json_user_id = json.get('user', None)
        if json_user_id is None:
            json_user_id = target_user_id
        else:
            json_user_id = User.get_database_id(json_user_id)
            # NOTE(review): nesting reconstructed from whitespace-collapsed
            # source; confirm this check belongs to the explicit-user branch.
            if json_user_id != user_id and not user_has_permission(
                    self.discussion_id, user_id, P_ADMIN_DISC):
                raise HTTPUnauthorized()
        self.user_id = json_user_id
    if self.discussion_id:
        # NOTE(review): this branch reads the 'discussion_id' key while the
        # branch below reads 'discussion' - confirm which key clients send.
        if 'discussion_id' in json and Discussion.get_database_id(
                json['discussion_id']) != self.discussion_id:
            raise HTTPBadRequest()
    else:
        discussion_id = json.get('discussion', None) or ctx.get_discussion_id()
        if discussion_id is None:
            raise HTTPBadRequest()
        self.discussion_id = Discussion.get_database_id(discussion_id)
    new_type = json.get('@type', self.type)
    if self.external_typename() != new_type:
        # polymorphic_map maps each identity to its mapper; the previously
        # used polymorphic_identity is a single value and cannot be indexed
        # by type name.
        polymap = inspect(self.__class__).polymorphic_map
        if new_type not in polymap:
            raise HTTPBadRequest()
        new_type = polymap[new_type].class_
        new_instance = self.change_class(new_type)
        return new_instance._do_update_from_json(
            json, parse_def, ctx,
            DuplicateHandling.USE_ORIGINAL, object_importer)
    creation_origin = json.get('creation_origin', "USER_REQUESTED")
    if creation_origin is not None:
        self.creation_origin = NotificationCreationOrigin.from_string(
            creation_origin)
    if json.get('parent_subscription', None) is not None:
        self.parent_subscription_id = self.get_database_id(
            json['parent_subscription'])
    status = json.get('status', None)
    if status:
        status = NotificationSubscriptionStatus.from_string(status)
        if status != self.status:
            self.status = status
            self.last_status_change_date = datetime.utcnow()
    return self
def _do_create_from_json(
        cls, json, parse_def, context,
        duplicate_handling=None, object_importer=None):
    """Create (or find) an instance of ``cls`` from ``json``.

    A fresh instance is built and updated from ``json``; duplicate handling
    may then substitute another object for it. Returns the instance context
    of whichever object results.

    Raises HTTPUnauthorized when the user may not create a ``cls`` object,
    or when an existing object was modified and the user may not update it.
    """
    user_id = context.get_user_id()
    permissions = context.get_permissions()
    if not duplicate_handling:
        duplicate_handling = cls.default_duplicate_handling
    can_create = cls.user_can_cls(
        user_id, CrudPermissions.CREATE, permissions)
    if duplicate_handling == DuplicateHandling.ERROR and not can_create:
        raise HTTPUnauthorized(
            "User id <%s> cannot create a <%s> object" % (
                user_id, cls.__name__))

    # creating an object can be a weird way to find an object by attributes
    fresh = cls()
    i_context = fresh.get_instance_context(context)
    result = fresh._do_update_from_json(
        json, parse_def, i_context, duplicate_handling, object_importer)
    # Now look for missing relationships
    result.populate_from_context(context)
    result = result.handle_duplication(
        json, parse_def, context, duplicate_handling, object_importer)

    if result is fresh:
        if not can_create:
            raise HTTPUnauthorized(
                "User id <%s> cannot create a <%s> object" % (
                    user_id, cls.__name__))
    else:
        may_update = result.user_can(
            user_id, CrudPermissions.UPDATE, permissions)
        if not may_update and cls.default_db.is_modified(result, False):
            raise HTTPUnauthorized(
                "User id <%s> cannot modify a <%s> object" % (
                    user_id, cls.__name__))
        i_context = result.get_instance_context(context)
        # NOTE(review): nesting reconstructed from whitespace-collapsed
        # source; confirm the add() is meant only for the substituted object.
        cls.default_db.add(result)

    if '@id' in json and json['@id'] != result.uri() and object_importer:
        object_importer.associate(json['@id'], result)
    return i_context
def _assign_subobject_list(self, instances, accessor):
    """Make the collection reached through ``accessor`` equal ``instances``.

    Behaviour depends on the kind of ``accessor``:

    - RelationshipProperty without ``back_populates``: plain ``setattr``.
    - RelationshipProperty with ``back_populates``: objects currently in the
      collection but absent from ``instances`` are detached (remote column
      set to None when nullable) or else tombstoned/deleted.
    - property: its setter is called with ``instances``.
    - Column: rejected with HTTPBadRequest.
    - AssociationProxy: members are added/removed to match ``instances``.

    Raises HTTPUnauthorized when an object to delete fails the DELETE
    permission check, and HTTPBadRequest for a Column accessor.
    """
    # only known case yet is Langstring.entries
    if isinstance(accessor, RelationshipProperty):
        if not accessor.back_populates:
            # Try the brutal approach
            setattr(self, accessor.key, instances)
        else:
            from ..lib.history_mixin import TombstonableMixin
            current_instances = getattr(self, accessor.key)
            missing = set(instances) - set(current_instances)
            if missing:
                # Maybe tombstones
                # A list (not a lazy filter object) so the emptiness check
                # below is meaningful on Python 3 as well.
                missing = [
                    a for a in missing
                    if not isinstance(a, TombstonableMixin)
                    or not a.is_tombstone]
                assert not missing, "what's wrong with back_populates?"
            extra = set(current_instances) - set(instances)
            if extra:
                remote_columns = list(accessor.remote_side)
                if len(remote_columns) > 1:
                    if issubclass(accessor.mapper.class_, TombstonableMixin):
                        remote_columns = [
                            c for c in remote_columns
                            if c.name != 'tombstone_date']
                assert len(remote_columns) == 1
                remote = remote_columns[0]
                if remote.nullable:
                    # TODO: check update permissions on that object.
                    # Detach the objects that are no longer listed; the
                    # previous loop ran over ``missing``, which is always
                    # empty at this point.
                    for inst in extra:
                        setattr(inst, remote.key, None)
                else:
                    for inst in extra:
                        # NOTE(review): ``user_id`` and ``permissions`` are
                        # not defined in this function; they must come from
                        # an enclosing scope not visible here - confirm.
                        if not inst.user_can(
                                user_id, CrudPermissions.DELETE, permissions):
                            raise HTTPUnauthorized(
                                "Cannot delete object %s" % (inst.uri(),))
                        if isinstance(inst, TombstonableMixin):
                            inst.is_tombstone = True
                        else:
                            self.db.delete(inst)
    elif isinstance(accessor, property):
        # Note: Does not happen yet.
        # Call the setter of this accessor, not the ``fset`` descriptor of
        # the builtin ``property`` type.
        accessor.fset(self, instances)
    elif isinstance(accessor, Column):
        raise HTTPBadRequest(
            "%s cannot have multiple values" % (accessor.key, ))
    elif isinstance(accessor, AssociationProxy):
        # Also never happens
        current_instances = accessor.__get__(self, self.__class__)
        missing = set(instances) - set(current_instances)
        extra = set(current_instances) - set(instances)
        # NOTE(review): add/remove are called on the proxy itself rather
        # than on ``current_instances`` - confirm the proxy exposes them.
        for inst in missing:
            accessor.add(inst)
        for inst in extra:
            accessor.remove(inst)
    else:
        assert False, "we should not get here"