Python rest_framework.exceptions 模块,PermissionDenied() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.exceptions.PermissionDenied()

项目:ODM2WebSDL    作者:ODM2    | 项目源码 | 文件源码
def authenticate(self, request):
        """
        Validate the registration token and sampling feature of a POST request.

        Requests whose method is not POST are ignored and ``None`` is returned.
        For POST requests, ``request.META`` must contain ``HTTP_TOKEN`` and
        ``request.data`` must contain ``sampling_feature``; the token must
        match a ``SiteRegistration`` and that registration's sampling feature
        UUID must equal the submitted value.

        :param request: the incoming request (reads ``META`` and ``data``).
        :return: ``None`` on every non-raising path; no ``(user, auth)`` pair
            is produced by this method.
        :raises exceptions.ParseError: token header or sampling feature missing.
        :raises exceptions.PermissionDenied: no registration matches the token.
        :raises exceptions.AuthenticationFailed: sampling feature UUID mismatch.
        """
        if request.META['REQUEST_METHOD'] != 'POST':
            return None

        # Python 2 print statement: dumps the request payload to stdout.
        print 'Request data: {}'.format(request.data)
        if 'HTTP_TOKEN' not in request.META:
            raise exceptions.ParseError("Registration Token not present in the request.")
        elif 'sampling_feature' not in request.data:
            raise exceptions.ParseError("Sampling feature UUID not present in the request.")

        # Get auth_token(uuid) from header, get registration object with auth_token, get the user from that registration, verify sampling_feature uuid is registered by this user, be happy.
        token = request.META['HTTP_TOKEN']
        # .first() yields None when no registration carries this token.
        registration = SiteRegistration.objects.filter(registration_token=token).first()
        if not registration:
            raise exceptions.PermissionDenied('Invalid Security Token')

        # request needs to have the sampling feature uuid of the registration -
        # compared as strings, so the stored UUID is stringified first.
        if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']:
            raise exceptions.AuthenticationFailed(
                'Site Identifier is not associated with this Token')  # or other related exception

        # NOTE(review): the user of the registration is never returned, although
        # the comment above mentions fetching it - confirm this is intended.
        return None
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_publish_release(self, mock_client):
        """
        Check the docker calls made by ``publish_release`` and that
        blacklisted image names are rejected with ``PermissionDenied``.
        """
        self.client = DockerClient()
        source_image = 'ozzy/embryo:git-f2a8020'
        self.client.publish_release(
            source_image, {'POWERED_BY': 'Deis'}, 'ozzy/embryo:v4', True)
        docker = self.client.client
        for docker_call in (docker.pull, docker.tag, docker.build, docker.push):
            self.assertTrue(docker_call.called)

        # A registry host prefix on the target is replaced with deis-registry.
        self.client.publish_release(
            source_image, {'POWERED_BY': 'Deis'}, 'quay.io/ozzy/embryo:v4', True)
        self.client.client.push.assert_called_with(
            'localhost:5000/ozzy/embryo', tag='v4', insecure_registry=True, stream=True)

        # Blacklisted image names can't be published, with or without a
        # registry prefix on the source.
        blacklisted_sources = (
            'deis/controller:v1.11.1',
            'localhost:5000/deis/controller:v1.11.1',
        )
        for blocked in blacklisted_sources:
            with self.assertRaises(PermissionDenied):
                self.client.publish_release(
                    blocked, {}, 'deis/controller:v1.11.1', True)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_build(self, mock_client):
        """
        Check the arguments ``build`` hands to docker, the generated
        Dockerfile, and that blacklisted image names are rejected.
        """
        self.client = DockerClient()
        self.client.build('ozzy/embryo:git-f3a8020', {'POWERED_BY': 'Deis'}, 'ozzy/embryo', 'v4')

        # The underlying docker build must have been invoked with these kwargs.
        build_mock = self.client.client.build
        self.assertTrue(build_mock.called)
        expected_subset = {"rm": True, "tag": u'localhost:5000/ozzy/embryo:v4', "stream": True}
        call_kwargs = build_mock.call_args[1]
        self.assertDictContainsSubset(expected_subset, call_kwargs)

        # The fileobj passed to "docker build" must hold the expected Dockerfile.
        dockerfile = call_kwargs['fileobj']
        self.assertEqual(dockerfile.read(), "FROM ozzy/embryo:git-f3a8020\nENV POWERED_BY='Deis'")

        # Blacklisted image names can't be built.
        for blocked in ('deis/controller:v1.11.1',
                        'localhost:5000/deis/controller:v1.11.1'):
            with self.assertRaises(PermissionDenied):
                self.client.build(blocked, {}, 'deis/controller', 'v1.11.1')
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def passwd(self, request, **kwargs):
        """
        Change the password of the calling user or, for superusers, of another user.

        Reads ``username`` (optional), ``password`` and ``new_password`` from
        ``request.data``. The current password is verified whenever one is
        supplied or the caller is not a superuser.

        :param request: the incoming request.
        :return: ``Response`` with ``{'status': 'password set'}`` on success,
            or an HTTP 400 ``Response`` when the current password does not
            match or ``new_password`` is missing.
        :raises PermissionDenied: a non-superuser targets a different username.
        """
        caller_obj = self.get_object()
        target_obj = self.get_object()
        if request.data.get('username'):
            # if you "accidentally" target yourself, that should be fine
            if caller_obj.username == request.data['username'] or caller_obj.is_superuser:
                target_obj = get_object_or_404(User, username=request.data['username'])
            else:
                raise PermissionDenied()
        if request.data.get('password') or not caller_obj.is_superuser:
            # .get() instead of indexing: a non-superuser who omits 'password'
            # gets the 400 below rather than an unhandled KeyError.
            if not target_obj.check_password(request.data.get('password')):
                return Response({'detail': 'Current password does not match'},
                                status=status.HTTP_400_BAD_REQUEST)
        if 'new_password' not in request.data:
            # Previously an unhandled KeyError; report it as a client error.
            return Response({'detail': 'new_password is a required field'},
                            status=status.HTTP_400_BAD_REQUEST)
        target_obj.set_password(request.data['new_password'])
        target_obj.save()
        return Response({'status': 'password set'})
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def has_permission(self, request, view):
        """
        Decide whether registration is permitted under settings.REGISTRATION_MODE.

        If settings.REGISTRATION_MODE does not exist, such as during a test,
        return True. Otherwise return `True` if permission is granted,
        `False` otherwise.

        :param request: the incoming request; ``request.user.is_superuser`` is
            consulted in ``'admin_only'`` mode.
        :param view: unused.
        :raises exceptions.PermissionDenied: the mode is ``'disabled'``.
        :raises Exception: the mode is not one of the recognised values.
        """
        # Only a missing setting grants permission. An AttributeError raised
        # while inspecting request.user must not be mistaken for that.
        missing = object()
        mode = getattr(settings, 'REGISTRATION_MODE', missing)
        if mode is missing:
            return True
        if mode == 'disabled':
            raise exceptions.PermissionDenied('Registration is disabled')
        if mode == 'enabled':
            return True
        if mode == 'admin_only':
            return request.user.is_superuser
        raise Exception("{} is not a valid registation mode"
                        .format(mode))
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_publish_release(self, mock_client):
        """
        Check the docker calls made by ``publish_release`` and that
        blacklisted image names are rejected with ``PermissionDenied``.
        """
        self.client = DockerClient()
        source_image = 'ozzy/embryo:git-f2a8020'
        self.client.publish_release(
            source_image, {'POWERED_BY': 'Deis'}, 'ozzy/embryo:v4', True)
        docker = self.client.client
        for docker_call in (docker.pull, docker.tag, docker.build, docker.push):
            self.assertTrue(docker_call.called)

        # A registry host prefix on the target is replaced with deis-registry.
        self.client.publish_release(
            source_image, {'POWERED_BY': 'Deis'}, 'quay.io/ozzy/embryo:v4', True)
        self.client.client.push.assert_called_with(
            'localhost:5000/ozzy/embryo', tag='v4', insecure_registry=True, stream=True)

        # Blacklisted image names can't be published, with or without a
        # registry prefix on the source.
        blacklisted_sources = (
            'deis/controller:v1.11.1',
            'localhost:5000/deis/controller:v1.11.1',
        )
        for blocked in blacklisted_sources:
            with self.assertRaises(PermissionDenied):
                self.client.publish_release(
                    blocked, {}, 'deis/controller:v1.11.1', True)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_build(self, mock_client):
        """
        Check the arguments ``build`` hands to docker, the generated
        Dockerfile, and that blacklisted image names are rejected.
        """
        self.client = DockerClient()
        self.client.build('ozzy/embryo:git-f3a8020', {'POWERED_BY': 'Deis'}, 'ozzy/embryo', 'v4')

        # The underlying docker build must have been invoked with these kwargs.
        build_mock = self.client.client.build
        self.assertTrue(build_mock.called)
        expected_subset = {"rm": True, "tag": u'localhost:5000/ozzy/embryo:v4', "stream": True}
        call_kwargs = build_mock.call_args[1]
        self.assertDictContainsSubset(expected_subset, call_kwargs)

        # The fileobj passed to "docker build" must hold the expected Dockerfile.
        dockerfile = call_kwargs['fileobj']
        self.assertEqual(dockerfile.read(), "FROM ozzy/embryo:git-f3a8020\nENV POWERED_BY='Deis'")

        # Blacklisted image names can't be built.
        for blocked in ('deis/controller:v1.11.1',
                        'localhost:5000/deis/controller:v1.11.1'):
            with self.assertRaises(PermissionDenied):
                self.client.build(blocked, {}, 'deis/controller', 'v1.11.1')
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def passwd(self, request, **kwargs):
        """
        Change the password of the calling user or, for superusers, of another user.

        Reads ``username`` (optional), ``password`` and ``new_password`` from
        ``request.data``. The current password is verified whenever one is
        supplied or the caller is not a superuser.

        :param request: the incoming request.
        :return: ``Response`` with ``{'status': 'password set'}`` on success,
            or an HTTP 400 ``Response`` when the current password does not
            match or ``new_password`` is missing.
        :raises PermissionDenied: a non-superuser targets a different username.
        """
        caller_obj = self.get_object()
        target_obj = self.get_object()
        if request.data.get('username'):
            # if you "accidentally" target yourself, that should be fine
            if caller_obj.username == request.data['username'] or caller_obj.is_superuser:
                target_obj = get_object_or_404(User, username=request.data['username'])
            else:
                raise PermissionDenied()
        if request.data.get('password') or not caller_obj.is_superuser:
            # .get() instead of indexing: a non-superuser who omits 'password'
            # gets the 400 below rather than an unhandled KeyError.
            if not target_obj.check_password(request.data.get('password')):
                return Response({'detail': 'Current password does not match'},
                                status=status.HTTP_400_BAD_REQUEST)
        if 'new_password' not in request.data:
            # Previously an unhandled KeyError; report it as a client error.
            return Response({'detail': 'new_password is a required field'},
                            status=status.HTTP_400_BAD_REQUEST)
        target_obj.set_password(request.data['new_password'])
        target_obj.save()
        return Response({'status': 'password set'})
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def has_permission(self, request, view):
        """
        Decide whether registration is permitted under settings.REGISTRATION_MODE.

        If settings.REGISTRATION_MODE does not exist, such as during a test,
        return True. Otherwise return `True` if permission is granted,
        `False` otherwise.

        :param request: the incoming request; ``request.user.is_superuser`` is
            consulted in ``'admin_only'`` mode.
        :param view: unused.
        :raises exceptions.PermissionDenied: the mode is ``'disabled'``.
        :raises Exception: the mode is not one of the recognised values.
        """
        # Only a missing setting grants permission. An AttributeError raised
        # while inspecting request.user must not be mistaken for that.
        missing = object()
        mode = getattr(settings, 'REGISTRATION_MODE', missing)
        if mode is missing:
            return True
        if mode == 'disabled':
            raise exceptions.PermissionDenied('Registration is disabled')
        if mode == 'enabled':
            return True
        if mode == 'admin_only':
            return request.user.is_superuser
        raise Exception("{} is not a valid registation mode"
                        .format(mode))
项目:vialer-middleware    作者:VoIPGRID    | 项目源码 | 文件源码
def test_check_status_code(self):
        """
        Check which exception ``_check_status_code`` raises per status code.
        """
        check = self.authentication._check_status_code

        # Status code 200 must not raise.
        check(200)

        # Every other tested status code maps to a specific exception.
        expectations = (
            (401, AuthenticationFailed),
            (403, PermissionDenied),
            (500, UnavailableException),
        )
        for status_code, expected_exception in expectations:
            with self.assertRaises(expected_exception):
                check(status_code)
项目:momo-wps    作者:WPS-team-4    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
        """
        Authenticate with ``username``/``password`` from ``request.data`` and
        return the user's token.

        :return: HTTP 200 ``Response`` carrying ``token``, ``user_pk`` and
            ``created``.
        :raises ValidationError: ``authenticate`` returned no user.
        :raises PermissionDenied: the authenticated user is not active.
        """
        # NOTE(review): the message literals below look mis-encoded ('?'
        # placeholders); they are kept byte-for-byte - confirm against upstream.
        user = authenticate(userid=request.data['username'], password=request.data['password'])
        if not user:
            detail = "???? ?? ? ????. username? password? ?? ??????."
            raise ValidationError(detail=detail)

        if not user.is_active:
            detail = "?? ??? ??????."
            raise PermissionDenied(detail=detail)

        token, _created = Token.objects.get_or_create(user=user)
        payload = {"token": token.key,
                   "user_pk": token.user_id,
                   "created": token.created}
        return Response(payload, status=status.HTTP_200_OK)
项目:netbox    作者:digitalocean    | 项目源码 | 文件源码
def run(self, request, pk):
        """
        Run a Report and create a new ReportResult, overwriting any previous result for the Report.

        :raises PermissionDenied: the user lacks ``extras.add_reportresult``.
        """
        # Running a report requires the permission to add report results.
        may_run = request.user.has_perm('extras.add_reportresult')
        if not may_run:
            raise PermissionDenied("This user does not have permission to run reports.")

        # Retrieve and run the Report. This will create a new ReportResult.
        report = self._retrieve_report(pk)
        report.run()

        return Response(serializers.ReportDetailSerializer(report).data)


#
# User activity
#
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def delete(self, request, group, note_id):
        """
        Delete a note of ``group`` that belongs to the requesting user.

        :return: empty ``Response`` with status 204.
        :raises PermissionDenied: the request user is not authenticated.
        :raises ResourceDoesNotExist: no matching note activity exists.
        """
        if not request.user.is_authenticated():
            raise PermissionDenied(detail="Key doesn't have permission to delete Note")

        # Only a NOTE activity owned by the requesting user can be deleted.
        lookup = dict(
            group=group,
            type=Activity.NOTE,
            user=request.user,
            id=note_id,
        )
        try:
            activity = Activity.objects.get(**lookup)
        except Activity.DoesNotExist:
            raise ResourceDoesNotExist

        activity.delete()
        return Response(status=204)
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def put(self, request, group, note_id):
        """
        Replace the data of a note of ``group`` owned by the requesting user.

        :return: the serialized note with status 200, or the serializer
            errors with status 400 when the payload is invalid.
        :raises PermissionDenied: the request user is not authenticated.
        :raises ResourceDoesNotExist: no matching note activity exists.
        """
        if not request.user.is_authenticated():
            raise PermissionDenied(detail="Key doesn't have permission to edit Note")

        # Only a NOTE activity owned by the requesting user can be edited.
        lookup = dict(
            group=group,
            type=Activity.NOTE,
            user=request.user,
            id=note_id,
        )
        try:
            activity = Activity.objects.get(**lookup)
        except Activity.DoesNotExist:
            raise ResourceDoesNotExist

        serializer = NoteSerializer(data=request.DATA)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        # Would be nice to have a last_modified timestamp we could bump here
        activity.data = dict(serializer.object)
        activity.save()
        return Response(serialize(activity, request.user), status=200)
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def clone(self, request, *args, **kwargs):
        """
        Clone the current xform into the account named by ``request.DATA['username']``.

        :return: the serialized cloned form with status 201, or the serializer
            errors with status 400 when validation fails.
        :raises exceptions.PermissionDenied: the requesting user lacks the
            ``can_add_xform`` permission on the target user's profile.
        """
        self.object = self.get_object()
        data = {'xform': self.object.pk, 'username': request.DATA['username']}
        serializer = CloneXFormSerializer(data=data)
        if not serializer.is_valid():
            return Response(data=serializer.errors,
                            status=status.HTTP_400_BAD_REQUEST)

        clone_to_user = User.objects.get(username=data['username'])
        if not request.user.has_perm('can_add_xform',
                                     clone_to_user.profile):
            # Interpolate AFTER translating: the message id handed to _() must
            # be the constant template, otherwise no catalog entry can match.
            message = _(u"User %(user)s has no permission to add "
                        "xforms to account %(account)s") % {
                'user': request.user.username,
                'account': data['username']}
            raise exceptions.PermissionDenied(detail=message)

        xform = serializer.save()
        serializer = XFormSerializer(
            xform.cloned_form, context={'request': request})

        return Response(data=serializer.data,
                        status=status.HTTP_201_CREATED)
项目:Anonymous_message    作者:DevRoss    | 项目源码 | 文件源码
def authenticate_credentials(self, key):
        """
        Resolve a token key to its user, consulting the cache first.

        :param key: the token key string.
        :return: ``(user, key)`` on a cache hit, ``(user, token)`` otherwise.
        :raises exceptions.AuthenticationFailed: no token has this key, or the
            token was created more than 30 days ago.
        :raises exceptions.PermissionDenied: the token's user is inactive.
        """
        token_cache = 'token_' + key
        cache_user = cache.get(token_cache)
        if cache_user:
            # NOTE(review): on a cache hit the second element is the key string,
            # not the token object returned below - confirm callers accept both.
            return (cache_user, key)

        try:
            token = self.model.objects.get(key=key)
        except self.model.DoesNotExist:
            raise exceptions.AuthenticationFailed('User does not exist.')

        if not token.user.is_active:
            raise exceptions.PermissionDenied('The user is forbidden.')

        # Tokens older than 30 days (24 * 30 hours) are rejected.
        utc_now = timezone.now()
        if token.created < utc_now - timezone.timedelta(hours=24 * 30):
            raise exceptions.AuthenticationFailed('Token has been expired.')

        # Cache the user for 7 days (in seconds). The token is known to exist
        # here, so no further truthiness check is needed.
        cache.set(token_cache, token.user, 24 * 7 * 60 * 60)
        return (token.user, token)
项目:Anonymous_message    作者:DevRoss    | 项目源码 | 文件源码
def my_exception_handler(exc, context):
    """
    Add an ``error_code`` entry to the standard REST framework error response.

    The code is 2 for ``AuthenticationFailed``, 3 for ``PermissionDenied``
    and 1 for any other exception that produced a response. When the default
    handler returns ``None``, ``None`` is passed through unchanged.
    """
    # Let REST framework's default handler build the standard error response.
    response = exception_handler(exc, context)
    if response is None:
        return response

    if isinstance(exc, exceptions.AuthenticationFailed):
        error_code = 2
    elif isinstance(exc, exceptions.PermissionDenied):
        error_code = 3
    else:
        error_code = 1
    response.data['error_code'] = error_code
    return response
项目:ws-backend-community    作者:lavalamp-    | 项目源码 | 文件源码
def place_order(request, pk=None):
    """
    Place a specific order.

    Responds with status 204 once the order is placed and the follow-up
    tasks are queued. Raises ``PermissionDenied`` when a non-superuser may
    not scan for the order's organization, or when the order is not ready.
    """
    order = get_object_or_404(rest.models.Order, pk=pk)

    # Superusers skip the per-organization scan privilege check.
    if not request.user.is_superuser and not order.organization.can_user_scan(request.user):
        raise PermissionDenied("You do not have sufficient privileges to start scans for that organization.")
    if not order.is_ready_to_place:
        raise PermissionDenied(order.get_ready_errors())

    order.place_order()
    order.save()

    # Queue the notification emails, then the order handling task.
    send_emails_for_placed_order.delay(
        order_uuid=unicode(order.uuid),
        receipt_description=order.get_receipt_description(),
    )
    handle_placed_order.delay(order_uuid=unicode(order.uuid))
    return Response(status=204)
项目:sigma-backend    作者:SRLKilling    | 项目源码 | 文件源码
def change_rights(self, request, pk):
        """
            Change a group member's rights.

            Only administrators can change a member's rights.
            Only the super-administrator can change an administrator's rights.
            If the super-administrator right is requested, the request must come
            from the current super-administrator, who thereby loses that status
            (not implemented yet, see the TODO below).

            The permission decision itself is delegated to
            ``GroupMember.model.can_change_rights``; ``PermissionDenied`` is
            raised when it returns a falsy value.

            If succeeded, returns HTTP_200_OK with the updated GroupMember object
        """
        user = request.user
        member = self.get_or_404(pk)
        # Deserialize the requested rights from the request payload.
        rights_serializer, rights = SigmaViewSet.get_deserialized(GroupMemberRightsSerializer, request.data)

        if not GroupMember.model.can_change_rights(user, member, rights):
            raise PermissionDenied()

        if rights.is_super_administrator:
            pass                                                                                                # TODO: strip super-administrator status from the member handing over their rights

        # NOTE(review): here get_deserialized receives the rights object where
        # the call above passed a serializer class - confirm this signature.
        member_serializer, member = self.get_deserialized(rights, member, partial=True)
        member_serializer.save()
        return Response(member_serializer.data, status=status.HTTP_200_OK)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def add_follower(self, request, pk=None):
        """
        Add the profile identified by ``request.data['guid']`` to the
        ``following`` set of the current profile.

        :raises PermissionDenied: no profile has the given guid.
        :raises ValidationError: the guid is the current profile's own.
        """
        target_guid = request.data.get("guid")
        try:
            followed = Profile.objects.get(guid=target_guid)
        except Profile.DoesNotExist:
            raise PermissionDenied("Profile given does not exist.")

        own_profile = self.get_object()
        if own_profile.guid == target_guid:
            raise ValidationError("Cannot follow self!")

        own_profile.following.add(followed)
        return Response({"status": "Follower added."})
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def remove_follower(self, request, pk=None):
        """
        Remove the profile identified by ``request.data['guid']`` from the
        ``following`` set of the current profile.

        :raises PermissionDenied: no profile has the given guid.
        :raises ValidationError: the guid is the current profile's own.
        """
        target_guid = request.data.get("guid")
        try:
            followed = Profile.objects.get(guid=target_guid)
        except Profile.DoesNotExist:
            raise PermissionDenied("Profile given does not exist.")

        own_profile = self.get_object()
        if own_profile.guid == target_guid:
            raise ValidationError("Cannot unfollow self!")

        own_profile.following.remove(followed)
        return Response({"status": "Follower removed."})
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def log_pending(self, request, pk=None, **kwargs):
        """
        Reject every attempt to create an event log with
        status == "mk42.apps.core.constants.STATUS_PENDING".

        :param request: django request instance.
        :type request: django.http.request.HttpRequest.
        :param pk: event object primary key.
        :type pk: unicode.
        :param kwargs: additional args.
        :type kwargs: dict.
        :raises PermissionDenied: always; nothing is ever returned.
        """

        refusal = _("Okay move along, move along people, there's nothing to see here!")
        raise PermissionDenied(detail=refusal)
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def log_canceled(self, request, pk=None, **kwargs):
        """
        Create event log with status == "mk42.apps.core.constants.STATUS_CANCELED".

        :param request: django request instance.
        :type request: django.http.request.HttpRequest.
        :param pk: event object primary key.
        :type pk: unicode.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: django rest framework response.
        :rtype: rest_framework.response.Response.
        :raises PermissionDenied: requester is not the group owner, or the
            event refused to create the log.
        """

        event = self.get_object()

        # only group owner can change event status
        if request.user != event.group.owner:
            raise PermissionDenied(detail=_("You must be owner of this group to perform this action."))

        log = event.log_canceled(**kwargs)

        if not log:
            # can't create event logs with status == "mk42.apps.core.constants.STATUS_CANCELED"
            # if log with status == "mk42.apps.core.constants.STATUS_PENDING" does not exist
            # or log with status == "mk42.apps.core.constants.STATUS_ONGOING" exist
            template = _("Can't change status to '{status}'.")
            raise PermissionDenied(detail=template.format(
                status=dict(event.STATUS_CHOICES).get(event.STATUS_CANCELED)))

        serialized = EventLogSerializer(instance=log, context={"request": request}).data if log else None
        return Response({"detail": serialized})
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def log_ongoing(self, request, pk=None, **kwargs):
        """
        Create event log with status == "mk42.apps.core.constants.STATUS_ONGOING".

        :param request: django request instance.
        :type request: django.http.request.HttpRequest.
        :param pk: event object primary key.
        :type pk: unicode.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: django rest framework response.
        :rtype: rest_framework.response.Response.
        :raises PermissionDenied: requester is not the group owner, or the
            event refused to create the log.
        """

        event = self.get_object()

        # only group owner can change event status
        if request.user != event.group.owner:
            raise PermissionDenied(detail=_("You must be owner of this group to perform this action."))

        log = event.log_ongoing(**kwargs)

        if not log:
            # can't create event logs with status == "mk42.apps.core.constants.STATUS_ONGOING"
            # if log with status == "mk42.apps.core.constants.STATUS_FINISHED" exist
            # if log with status == "mk42.apps.core.constants.STATUS_CANCELED" exist
            # or log with status == "mk42.apps.core.constants.STATUS_PENDING" does not exist
            template = _("Can't change status to '{status}'.")
            raise PermissionDenied(detail=template.format(
                status=dict(event.STATUS_CHOICES).get(event.STATUS_ONGOING)))

        serialized = EventLogSerializer(instance=log, context={"request": request}).data if log else None
        return Response({"detail": serialized})
项目:mk42    作者:Peer-Lab    | 项目源码 | 文件源码
def log_finished(self, request, pk=None, **kwargs):
        """
        Create event log with status == "mk42.apps.core.constants.STATUS_FINISHED".

        :param request: django request instance.
        :type request: django.http.request.HttpRequest.
        :param pk: event object primary key.
        :type pk: unicode.
        :param kwargs: additional args.
        :type kwargs: dict.
        :return: django rest framework response.
        :rtype: rest_framework.response.Response.
        :raises PermissionDenied: requester is not the group owner, or the
            event refused to create the log.
        """

        event = self.get_object()

        # only group owner can change event status
        if request.user != event.group.owner:
            raise PermissionDenied(detail=_("You must be owner of this group to perform this action."))

        log = event.log_finished(**kwargs)

        if not log:
            # can't create event logs with status == "mk42.apps.core.constants.STATUS_FINISHED"
            # if log with status == "mk42.apps.core.constants.STATUS_FINISHED" exist
            # if log with status == "mk42.apps.core.constants.STATUS_CANCELED" exist
            # or log with status == "mk42.apps.core.constants.STATUS_ONGOING" does not exist
            template = _("Can't change status to '{status}'.")
            raise PermissionDenied(detail=template.format(
                status=dict(event.STATUS_CHOICES).get(event.STATUS_FINISHED)))

        serialized = EventLogSerializer(instance=log, context={"request": request}).data if log else None
        return Response({"detail": serialized})
项目:parkkihubi    作者:City-of-Helsinki    | 项目源码 | 文件源码
def parkings_exception_handler(exc, context):
    """
    Add a ``code`` entry to the default error response where applicable.

    ``ParkingException`` contributes ``exc.get_codes()``; ``PermissionDenied``
    contributes ``'permission_denied'``. Other responses, and a ``None``
    result from the default handler, are returned unchanged.
    """
    response = exception_handler(exc, context)
    if response is None:
        return response

    if isinstance(exc, ParkingException):
        response.data['code'] = exc.get_codes()
    elif isinstance(exc, PermissionDenied):
        response.data['code'] = 'permission_denied'
    return response
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def get_profile(self):
        """
        Return the ``profile`` of the requesting user.

        :raises PermissionDenied: the user has no ``profile`` attribute or
            the related object does not exist.
        """
        try:
            profile = self.request.user.profile
        # AttributeError is mapped too, matching the sibling get_parent, so a
        # user object without a ``profile`` attribute gets the same error.
        except (AttributeError, exceptions.ObjectDoesNotExist):
            raise PermissionDenied(detail='Role incorrect')
        return profile
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def get_queryset(self):
        """
        Return the coupons of the requesting user's parent record.

        The ``only_valid`` query parameter (the literal string ``'true'``)
        restricts the result to unexpired, unused coupons. For the ``list``
        action a Python list sorted by each coupon's ``sort_key()`` is
        returned instead of a queryset.

        :raises PermissionDenied: the user has no related parent object.
        """
        only_valid = self.request.query_params.get('only_valid', '') == 'true'
        user = self.request.user
        try:
            coupons = user.parent.coupon_set.all()
        except exceptions.ObjectDoesNotExist:
            raise PermissionDenied(detail='Role incorrect')

        now = timezone.now()
        out_time = models.Coupon.OUT_OF_DATE_TIME
        if only_valid:
            # Unexpired and unused only: largest amount first, then the
            # earliest expiry.
            coupons = coupons.filter(
                expired_at__gt=now,
                used=False,
            ).order_by('-amount', 'expired_at')
        else:
            # Everything that expired no longer than OUT_OF_DATE_TIME ago,
            # ordered by the absolute distance in seconds between the current
            # time and the expiry, then by amount descending.
            coupons = coupons.filter(
                expired_at__gt=now - out_time,
            ).extra(
                select={'date_diff': 'abs(extract(epoch from (now()-expired_at)))'}
            ).order_by('date_diff', '-amount')

        # The list action gets an in-memory re-sort by each coupon's sort key.
        if self.action == 'list':
            return sorted(coupons, key=lambda coupon: coupon.sort_key())

        return coupons
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def get_parent(self):
        """
        Return the ``parent`` of the requesting user.

        :raises PermissionDenied: the user has no ``parent`` attribute or the
            related object does not exist.
        """
        try:
            return self.request.user.parent
        except (AttributeError, exceptions.ObjectDoesNotExist):
            raise PermissionDenied(detail='Role incorrect')
项目:django-postcode-lookup    作者:LabD    | 项目源码 | 文件源码
def _enforce_csrf(self, request):
        """Make sure that we have a valid CSRF token.

        Django restframework does validate this when using the
        SessionAuthentication but since that also checks if the user is
        authenticated we can't really use that

        :raises PermissionDenied: the CSRF check reported a failure reason.
        """
        csrf_check = CSRFCheck()
        failure_reason = csrf_check.process_view(request, None, (), {})
        if not failure_reason:
            return
        # CSRF failed, bail with explicit error message
        raise PermissionDenied('CSRF Failed: %s' % failure_reason)
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def has_view_permissions(self, path, method, view):
        """
        Return `True` if the incoming request has the correct view permissions.

        A view without a request is always considered permitted.
        """
        request = view.request
        if request is None:
            return True

        try:
            view.check_permissions(request)
        except (exceptions.APIException, Http404, PermissionDenied):
            # Any of these means the request is not allowed to use the view.
            return False
        else:
            return True
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
        """
        Return the generated schema for this request.

        :raises exceptions.PermissionDenied: the generator produced no schema.
        """
        schema = self.schema_generator.get_schema(request, self.public)
        if schema is not None:
            return Response(schema)
        raise exceptions.PermissionDenied()
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def exception_handler(exc, context):
    """
    Returns the response that should be used for any given exception.

    By default we handle the REST framework `APIException`, and also
    Django's built-in `Http404` and `PermissionDenied` exceptions.

    Any unhandled exceptions may return `None`, which will cause a 500 error
    to be raised.
    """
    if isinstance(exc, exceptions.APIException):
        # Optional attributes on the exception become response headers.
        extra_headers = {}
        if getattr(exc, 'auth_header', None):
            extra_headers['WWW-Authenticate'] = exc.auth_header
        if getattr(exc, 'wait', None):
            extra_headers['Retry-After'] = '%d' % exc.wait

        # Structured details are sent as-is; anything else is wrapped.
        detail = exc.detail
        payload = detail if isinstance(detail, (list, dict)) else {'detail': detail}

        set_rollback()
        return Response(payload, status=exc.status_code, headers=extra_headers)

    if isinstance(exc, Http404):
        payload = {'detail': six.text_type(_('Not found.'))}
        set_rollback()
        return Response(payload, status=status.HTTP_404_NOT_FOUND)

    if isinstance(exc, PermissionDenied):
        payload = {'detail': six.text_type(_('Permission denied.'))}
        set_rollback()
        return Response(payload, status=status.HTTP_403_FORBIDDEN)

    return None
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.

        ``NotAuthenticated`` is raised when authenticators are configured but
        none succeeded; otherwise ``PermissionDenied`` carrying ``message``.
        """
        unauthenticated = request.authenticators and not request.successful_authenticator
        if unauthenticated:
            raise exceptions.NotAuthenticated()
        raise exceptions.PermissionDenied(detail=message)
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def enforce_csrf(self, request):
        """
        Enforce CSRF validation for session based authentication.

        :raises exceptions.PermissionDenied: the CSRF check reported a
            failure reason.
        """
        csrf_check = CSRFCheck()
        failure_reason = csrf_check.process_view(request, None, (), {})
        if not failure_reason:
            return
        # CSRF failed, bail with explicit error message
        raise exceptions.PermissionDenied('CSRF Failed: %s' % failure_reason)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def check_blacklist(repo):
    """
    Check a Docker repository name for collision with deis/* components.

    Raises PermissionDenied when ``repo`` contains ``deis/<component>`` for
    any reserved component name; otherwise returns None.
    """
    # NOTE: keep this list up to date!
    reserved_components = (
        'builder', 'cache', 'controller', 'database', 'logger', 'logspout',
        'publisher', 'registry', 'router', 'store-admin', 'store-daemon',
        'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master',
        'mesos-marathon', 'mesos-slave', 'zookeeper',
    )
    for component in reserved_components:
        # Substring match, so registry-prefixed names are caught as well.
        if "deis/{}".format(component) in repo:
            raise PermissionDenied("Repository name {} is not allowed".format(repo))
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_pull(self, mock_client):
        """
        Check the arguments ``pull`` hands to docker and that blacklisted
        image names are rejected with ``PermissionDenied``.
        """
        self.client = DockerClient()
        self.client.pull('alpine', '3.2')
        pull_mock = self.client.client.pull
        pull_mock.assert_called_once_with(
            'alpine', tag='3.2', insecure_registry=True, stream=True)

        # Blacklisted image names can't be pulled.
        for blocked in ('deis/controller', 'localhost:5000/deis/controller'):
            with self.assertRaises(PermissionDenied):
                self.client.pull(blocked, 'v1.11.1')
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_tag(self, mock_client):
        """
        Check the arguments ``tag`` hands to docker and that blacklisted
        image names are rejected with ``PermissionDenied``.
        """
        self.client = DockerClient()
        self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4')
        tag_mock = self.client.client.tag
        tag_mock.assert_called_once_with(
            'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True)

        # Blacklisted image names can't be tagged.
        for blocked in ('deis/controller:v1.11.1',
                        'localhost:5000/deis/controller:v1.11.1'):
            with self.assertRaises(PermissionDenied):
                self.client.tag(blocked, 'deis/controller', 'v1.11.1')
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def update(self, request, **kwargs):
        """Save the app, reassigning its owner when the request names one.

        When the request data carries a truthy 'owner', the requesting user
        must be the current owner or a superuser, otherwise PermissionDenied
        is raised. The new owner is looked up by username (404 if not found).
        Responds with HTTP 200 and no body.
        """
        app = self.get_object()

        new_owner_name = request.data.get('owner')
        if new_owner_name:
            requester = self.request.user
            if requester != app.owner and not requester.is_superuser:
                raise PermissionDenied()
            app.owner = get_object_or_404(User, username=new_owner_name)

        app.save()
        return Response(status=status.HTTP_200_OK)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def create(self, request, *args, **kwargs):
        """Create a push hook on behalf of the user named in the request data.

        The app is resolved from 'receive_repo' and the user from
        'receive_user' (404 if either is not found). That user replaces
        request.user; PermissionDenied is raised unless
        permissions.is_app_user accepts the request for the app.
        """
        payload = request.data
        app = get_object_or_404(models.App, id=payload['receive_repo'])
        pusher = get_object_or_404(User, username=payload['receive_user'])
        request.user = pusher
        # check the user is authorized for this app
        if not permissions.is_app_user(request, app):
            raise PermissionDenied()
        # Attach the resolved objects to the request data before delegating to the parent
        payload['app'] = app
        payload['owner'] = pusher
        return super(PushHookViewSet, self).create(request, *args, **kwargs)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def create(self, request, *args, **kwargs):
        """Create a build on behalf of the user named in the request data.

        The app is resolved from 'receive_repo' and the user from
        'receive_user' (404 if either is not found). That user is stored on
        both self.user and request.user; PermissionDenied is raised unless
        permissions.is_app_user accepts the request for the app. The parent
        create() result is discarded and an HTTP 200 response carrying the
        latest release version and the app domain is returned instead.
        """
        payload = request.data
        app = get_object_or_404(models.App, id=payload['receive_repo'])
        builder = get_object_or_404(User, username=payload['receive_user'])
        self.user = builder
        request.user = builder
        # check the user is authorized for this app
        if not permissions.is_app_user(request, app):
            raise PermissionDenied()
        payload['app'] = app
        payload['owner'] = self.user
        super(BuildHookViewSet, self).create(request, *args, **kwargs)
        # return the application databag
        release_info = {'version': app.release_set.latest().version}
        domain = '.'.join([app.id, settings.DEIS_DOMAIN])
        databag = {'release': release_info, 'domains': [domain]}
        return Response(databag, status=status.HTTP_200_OK)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def create(self, request, *args, **kwargs):
        """Return the serialized config of the app's latest release.

        The app is resolved from 'receive_repo' and the user from
        'receive_user' (404 if either is not found). That user replaces
        request.user; PermissionDenied is raised unless
        permissions.is_app_user accepts the request for the app.
        """
        payload = request.data
        app = get_object_or_404(models.App, id=payload['receive_repo'])
        request.user = get_object_or_404(User, username=payload['receive_user'])
        # check the user is authorized for this app
        if not permissions.is_app_user(request, app):
            raise PermissionDenied()
        latest_release = app.release_set.latest()
        serializer = self.get_serializer(latest_release.config)
        return Response(serializer.data, status=status.HTTP_200_OK)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def destroy(self, request, **kwargs):
        """Revoke self.perm on an app from the user named in the URL kwargs.

        Raises PermissionDenied when the named user does not hold the
        permission, or when the requester is a different user and fails the
        IsOwnerOrAdmin object check. Logs the revocation and responds with
        HTTP 204.
        """
        app = get_object_or_404(models.App, id=self.kwargs['id'])
        grantee = get_object_or_404(User, username=kwargs['username'])

        # The named user must currently hold the permission on this app
        if not grantee.has_perm("api.{}".format(self.perm), app):
            raise PermissionDenied()

        # Users acting on themselves skip the owner/admin check
        if grantee != request.user:
            checker = permissions.IsOwnerOrAdmin
            if not checker.has_object_permission(checker(), request, self, app):
                raise PermissionDenied()

        remove_perm(self.perm, grantee, app)
        models.log_event(app, "User {} was revoked access to {}".format(grantee, app))
        return Response(status=status.HTTP_204_NO_CONTENT)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def check_blacklist(repo):
    """Reject Docker repository names that collide with a deis/* component.

    Raises PermissionDenied when the text "deis/<component>" occurs anywhere
    inside ``repo`` for any reserved component name; otherwise returns None.
    """
    # NOTE: keep this collection up to date!
    reserved = (
        'builder', 'cache', 'controller', 'database', 'logger', 'logspout',
        'publisher', 'registry', 'router', 'store-admin', 'store-daemon',
        'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master',
        'mesos-marathon', 'mesos-slave', 'zookeeper',
    )
    for component in reserved:
        # Substring match, so registry-host prefixes and tag suffixes are caught too.
        if "deis/{}".format(component) in repo:
            raise PermissionDenied("Repository name {} is not allowed".format(repo))
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def test_tag(self, mock_client):
        """Check that tag() forwards to the docker client and rejects blacklisted names."""
        self.client = DockerClient()
        self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4')
        self.client.client.tag.assert_called_once_with(
            'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True)
        # Blacklisted image names must be refused, with or without a registry host prefix
        forbidden_sources = (
            'deis/controller:v1.11.1',
            'localhost:5000/deis/controller:v1.11.1',
        )
        for source in forbidden_sources:
            with self.assertRaises(PermissionDenied):
                self.client.tag(source, 'deis/controller', 'v1.11.1')
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def destroy(self, request, **kwargs):
        """Delete the object from get_object(), or the user named in the request.

        Without a truthy 'username' in the request data, the object returned
        by get_object() is deleted. With one, the caller must either carry
        that same username or be a superuser (PermissionDenied otherwise);
        the named user is then looked up (404 if not found) and deleted.
        Responds with HTTP 204.
        """
        caller = self.get_object()
        victim = caller

        requested_name = request.data.get('username')
        if requested_name:
            # if you "accidentally" target yourself, that should be fine
            if not (caller.username == requested_name or caller.is_superuser):
                raise PermissionDenied()
            victim = get_object_or_404(User, username=requested_name)

        victim.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def update(self, request, **kwargs):
        """Save the app, reassigning its owner when the request names one.

        When the request data carries a truthy 'owner', the requesting user
        must be the current owner or a superuser, otherwise PermissionDenied
        is raised. The new owner is looked up by username (404 if not found).
        Responds with HTTP 200 and no body.
        """
        app = self.get_object()

        new_owner_name = request.data.get('owner')
        if new_owner_name:
            requester = self.request.user
            if requester != app.owner and not requester.is_superuser:
                raise PermissionDenied()
            app.owner = get_object_or_404(User, username=new_owner_name)

        app.save()
        return Response(status=status.HTTP_200_OK)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def create(self, request, *args, **kwargs):
        """Create a push hook on behalf of the user named in the request data.

        The app is resolved from 'receive_repo' and the user from
        'receive_user' (404 if either is not found). That user replaces
        request.user; PermissionDenied is raised unless
        permissions.is_app_user accepts the request for the app.
        """
        payload = request.data
        app = get_object_or_404(models.App, id=payload['receive_repo'])
        pusher = get_object_or_404(User, username=payload['receive_user'])
        request.user = pusher
        # check the user is authorized for this app
        if not permissions.is_app_user(request, app):
            raise PermissionDenied()
        # Attach the resolved objects to the request data before delegating to the parent
        payload['app'] = app
        payload['owner'] = pusher
        return super(PushHookViewSet, self).create(request, *args, **kwargs)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def create(self, request, *args, **kwargs):
        """Return the serialized config of the app's latest release.

        The app is resolved from 'receive_repo' and the user from
        'receive_user' (404 if either is not found). That user replaces
        request.user; PermissionDenied is raised unless
        permissions.is_app_user accepts the request for the app.
        """
        payload = request.data
        app = get_object_or_404(models.App, id=payload['receive_repo'])
        request.user = get_object_or_404(User, username=payload['receive_user'])
        # check the user is authorized for this app
        if not permissions.is_app_user(request, app):
            raise PermissionDenied()
        latest_release = app.release_set.latest()
        serializer = self.get_serializer(latest_release.config)
        return Response(serializer.data, status=status.HTTP_200_OK)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def create(self, request, **kwargs):
        """Grant self.perm on the app to the user named in the request data.

        Raises PermissionDenied unless the request passes the IsOwnerOrAdmin
        object check for the app. The user is looked up by the 'username'
        field (404 if not found). Logs the grant and responds with HTTP 201.
        """
        app = self.get_object()
        checker = permissions.IsOwnerOrAdmin
        allowed = checker.has_object_permission(checker(), request, self, app)
        if not allowed:
            raise PermissionDenied()

        grantee = get_object_or_404(User, username=request.data['username'])
        assign_perm(self.perm, grantee, app)
        models.log_event(app, "User {} was granted access to {}".format(grantee, app))
        return Response(status=status.HTTP_201_CREATED)