Python rest_framework.status 模块,HTTP_404_NOT_FOUND 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.status.HTTP_404_NOT_FOUND

项目:timeblob    作者:dkieffer    | 项目源码 | 文件源码
def time_entry_item(request, id):
    """Retrieve or update one time entry owned by the requesting user.

    The entry is looked up by ``id`` restricted to ``request.user``; a 404
    response is returned when it cannot be found.

    * ``GET``    -- responds with the serialized entry.
    * ``PUT``    -- validates ``request.data``; on success saves and responds
      with the serialized entry, otherwise responds with the validation
      errors and status 400.
    * ``DELETE`` -- not implemented; responds with status 501.
    """
    try:
        item = TimeEntry.objects.get(user=request.user, id=id)
    except (NoCurrentEntry, TimeEntry.DoesNotExist):
        # A missing row is reported by ``objects.get`` as ``DoesNotExist``
        # (assumes TimeEntry is a regular Django model -- TODO confirm).
        # ``NoCurrentEntry`` is still handled so existing behaviour is kept.
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        serializer = TimeEntrySerializer(request.user, item)
        return Response(serializer.data)

    if request.method == 'PUT':
        serializer = TimeEntrySerializer(request.user, item, data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data)
        # Validation failures are a client error, not a 200.
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    if request.method == 'DELETE':
        # Deletion was never implemented; answer explicitly rather than
        # returning None from the view.
        return Response(status=status.HTTP_501_NOT_IMPLEMENTED)
项目:BackendAllStars    作者:belatrix    | 项目源码 | 文件源码
def delete(self, request, id, kind, format=None):
        """
        WARNING: Force delete

        Deletes the object of type ``kind`` whose primary key is ``id``.
        Responds with 204 on success and 404 when ``kind`` is not one of the
        supported names; ``get_object_or_404`` handles an unknown ``id``.
        """
        # Maps the ``kind`` value to the model that owns the record.
        model_by_kind = {
            'badge': Badge,
            'category': Category,
            'event': Event,
            'keyword': Keyword,
            'location': Location,
            'position': Position,
            'role': Role,
        }

        model = model_by_kind.get(kind)
        if model is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        target = get_object_or_404(model, pk=id)
        target.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
项目:django-elasticsearch-dsl-drf    作者:barseghyanartur    | 项目源码 | 文件源码
def _test_pagination(self):
        """Test pagination.

        Requests the book list with ``page=3&page_size=30`` and expects 404,
        then requests the publisher list with ``limit=5&offset=8`` and
        expects 200 with exactly five results.
        """
        self.authenticate()

        publishers_url = reverse('publisherdocument-list', kwargs={})
        books_url = reverse('bookdocument-list', kwargs={})
        query = {}

        # This page request is expected to be rejected.
        rejected = self.client.get(books_url + '?page=3&page_size=30', query)
        self.assertEqual(rejected.status_code, status.HTTP_404_NOT_FOUND)

        # A limit/offset window is expected to succeed ...
        accepted = self.client.get(publishers_url + '?limit=5&offset=8', query)
        self.assertEqual(accepted.status_code, status.HTTP_200_OK)

        # ... and to contain as many rows as the requested limit.
        self.assertEqual(len(accepted.data['results']), 5)
项目:Django-Efficient-Rest    作者:G4brym    | 项目源码 | 文件源码
def process(self):
        """Dispatch the request to a handler method and record its status code.

        The handler name is ``<http method in lower case>_<action>`` when
        ``self.requires_action`` is true and ``<http method>_process``
        otherwise. The handler's return value is passed to ``setCode``.

        An ``AttributeError`` raised while resolving or running the handler
        is turned into a ``not_found`` error with status 404 -- except for
        action handlers while ``settings.DEBUG`` is on, where it propagates.
        """
        if not self.requires_action:
            try:
                handler_name = str(self.request.method.lower() + "_process")
                self.setCode(getattr(self, handler_name)())
            except AttributeError:
                self.addError("not_found")
                self.setCode(status.HTTP_404_NOT_FOUND)
            return

        if settings.DEBUG:
            # In debug mode the AttributeError is allowed to surface.
            handler_name = str(self.request.method.lower() + "_" + self.action)
            self.setCode(getattr(self, handler_name)())
            return

        try:
            handler_name = str(self.request.method.lower() + "_" + self.action)
            self.setCode(getattr(self, handler_name)())
        except AttributeError:
            self.addError("not_found")
            self.setCode(status.HTTP_404_NOT_FOUND)
项目:Django-Efficient-Rest    作者:G4brym    | 项目源码 | 文件源码
def get_process_single(self, id):
        """Retrieve a single object and store its dict form as the result.

        Returns the HTTP status code for the request: 400 when ``id`` cannot
        be converted to an integer, 404 when no object has that id, and 200
        after the object's ``get_as_dict()`` has been passed to ``setResult``.
        """
        try:
            clean_id = int(id)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures mean "bad request"; a bare ``except``
            # would also swallow KeyboardInterrupt and SystemExit.
            return status.HTTP_400_BAD_REQUEST

        try:
            instance = self.Meta.Model.objects.get(id=clean_id)
        except ObjectDoesNotExist:
            return status.HTTP_404_NOT_FOUND

        self.setResult(instance.get_as_dict())
        return status.HTTP_200_OK
项目:Django-Efficient-Rest    作者:G4brym    | 项目源码 | 文件源码
def put_process_single(self, id):
        """Partially update a single object from the request input.

        Returns the HTTP status code for the request: 400 when ``id`` cannot
        be converted to an integer or the input fails validation, 404 when no
        object has that id, and 202 once the serializer has been saved and
        its data passed to ``setResult``.
        """
        try:
            clean_id = int(id)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures mean "bad request".
            return status.HTTP_400_BAD_REQUEST

        model = self.Meta.Model
        try:
            instance = model.objects.get(id=clean_id)
        except model.DoesNotExist:
            # Narrowed from a bare ``except`` so that unrelated failures
            # (e.g. database errors) are no longer reported as "not found".
            return status.HTTP_404_NOT_FOUND

        serializer = self.Meta.Serializer(instance, data=self.getInput(), partial=True)
        if not serializer.is_valid():
            # Previously this path fell off the end and returned None.
            return status.HTTP_400_BAD_REQUEST

        serializer.save()
        self.setResult(serializer.data)
        return status.HTTP_202_ACCEPTED
项目:Django-Efficient-Rest    作者:G4brym    | 项目源码 | 文件源码
def delete_process_single(self, id):
        """Delete a single object.

        Returns the HTTP status code for the request: 400 when ``id`` cannot
        be converted to an integer, 404 when no object has that id, and 200
        after the object has been deleted.
        """
        try:
            clean_id = int(id)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures mean "bad request"; a bare ``except``
            # would also swallow KeyboardInterrupt and SystemExit.
            return status.HTTP_400_BAD_REQUEST

        try:
            instance = self.Meta.Model.objects.get(id=clean_id)
        except ObjectDoesNotExist:
            return status.HTTP_404_NOT_FOUND

        instance.delete()
        return status.HTTP_200_OK
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def user_detail(request, id, format=None):
    """
    Retrieve, update or delete a ``User`` instance.

    Responds with 404 when no user has the given ``id``. ``GET`` returns the
    serialized user; ``PUT`` validates and saves ``request.data`` (400 with
    the errors on failure); ``DELETE`` requires the
    ``OpsManage.delete_user`` permission (403 otherwise) and answers 204.
    """
    try:
        account = User.objects.get(id=id)
    except User.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(UserSerializer(account).data)

    if method == 'PUT':
        serializer = UserSerializer(account, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_user'):
            account.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def assetsLog_detail(request, id, format=None):
    """
    Retrieve or delete a ``Log_Assets`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``DELETE`` requires the
    ``OpsManage.delete_log_assets`` permission (403 otherwise) and answers
    204 after deleting the record.
    """
    try:
        snippet = Log_Assets.objects.get(id=id)
    except Log_Assets.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        serializer = AssetsLogsSerializer(snippet)
        return Response(serializer.data)

    elif request.method == 'DELETE':
        # The permission test used to be part of the ``elif`` condition as
        # well, which made this 403 branch unreachable: an unauthorised
        # DELETE fell through and the view returned None.
        if not request.user.has_perm('OpsManage.delete_log_assets'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        snippet.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def deploy_detail(request, id, format=None):
    """
    Retrieve or delete a ``Project_Config`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``DELETE`` requires the
    ``OpsManage.delete_project_config`` permission (403 otherwise), calls
    ``recordProject.delay`` with the project details, deletes the record and
    answers 204.
    """
    try:
        project = Project_Config.objects.get(id=id)
    except Project_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(ProjectConfigSerializer(project).data)

    if method == 'DELETE':
        if not request.user.has_perm('OpsManage.delete_project_config'):
            return Response(status=status.HTTP_403_FORBIDDEN)
        # Presumably records the deletion asynchronously -- TODO confirm.
        recordProject.delay(
            project_user=str(request.user),
            project_id=id,
            project_name=project.project_name,
            project_content="????",
        )
        project.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def deployLogs_detail(request, id, format=None):
    """
    Retrieve or delete a ``Log_Project_Config`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``DELETE`` requires the
    ``OpsManage.delete_log_project_config`` permission (403 otherwise) and
    answers 204 after deleting the record.
    """
    try:
        log_entry = Log_Project_Config.objects.get(id=id)
    except Log_Project_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(DeployLogsSerializer(log_entry).data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_log_project_config'):
            log_entry.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def modelLogsdetail(request, id, format=None):
    """
    Retrieve or delete a ``Log_Ansible_Model`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``DELETE`` requires the
    ``OpsManage.can_delete_log_ansible_model`` permission (403 otherwise)
    and answers 204 after deleting the record.
    """
    try:
        log_entry = Log_Ansible_Model.objects.get(id=id)
    except Log_Ansible_Model.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(AnsibleModelLogsSerializer(log_entry).data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.can_delete_log_ansible_model'):
            log_entry.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def playbookLogsdetail(request, id, format=None):
    """
    Retrieve or delete a ``Log_Ansible_Playbook`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``DELETE`` requires the
    ``OpsManage.delete_log_ansible_playbook`` permission (403 otherwise) and
    answers 204 after deleting the record.
    """
    try:
        log_entry = Log_Ansible_Playbook.objects.get(id=id)
    except Log_Ansible_Playbook.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(AnsiblePlaybookLogsSerializer(log_entry).data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_log_ansible_playbook'):
            log_entry.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def cron_detail(request, id, format=None):
    """
    Retrieve, update or delete a ``Cron_Config`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``PUT`` validates and saves ``request.data`` (400
    with the errors on failure); ``DELETE`` requires the
    ``OpsManage.delete_service_assets`` permission (403 otherwise) and
    answers 204 after deleting the record.
    """
    try:
        cron = Cron_Config.objects.get(id=id)
    except Cron_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(CronSerializer(cron).data)

    if method == 'PUT':
        serializer = CronSerializer(cron, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_service_assets'):
            cron.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def cronLogsdetail(request, id, format=None):
    """
    Retrieve or delete a ``Log_Cron_Config`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``DELETE`` requires the
    ``OpsManage.delete_log_cron_config`` permission (403 otherwise) and
    answers 204 after deleting the record.
    """
    try:
        log_entry = Log_Cron_Config.objects.get(id=id)
    except Log_Cron_Config.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(CronLogsSerializer(log_entry).data)

    if method == 'DELETE':
        if request.user.has_perm('OpsManage.delete_log_cron_config'):
            log_entry.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:pfb-network-connectivity    作者:azavea    | 项目源码 | 文件源码
def results(self, request, pk=None):
        """Return the result fields of a completed analysis job.

        The job comes from ``self.get_object()``. When its status is
        ``AnalysisJob.Status.COMPLETE`` the response is an ordered mapping of
        the job's result attributes with status 200; otherwise the response
        is empty with status 404.
        """
        job = self.get_object()

        if job.status != AnalysisJob.Status.COMPLETE:
            return Response(None, status=status.HTTP_404_NOT_FOUND)

        # Attribute names are emitted as keys, in this order.
        result_fields = (
            'census_block_count',
            'census_blocks_url',
            'connected_census_blocks_url',
            'destinations_urls',
            'tile_urls',
            'overall_scores',
            'overall_scores_url',
            'score_inputs_url',
            'ways_url',
        )
        payload = OrderedDict(
            [(field, getattr(job, field)) for field in result_fields]
        )
        return Response(payload, status=status.HTTP_200_OK)
项目:django-rest-framework-passwordless    作者:aaronn    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
        """Validate the request and send a callback token to the user.

        Responds with 404 when this view's ``alias_type`` is not listed in
        ``api_settings.PASSWORDLESS_AUTH_TYPES``. Otherwise the request data
        is validated (``raise_exception=True``), the token is sent through
        ``TokenService.send_token`` and the response carries the success or
        failure detail with status 200 or 400 respectively.
        """
        # Only allow auth types allowed in settings.
        if self.alias_type.upper() not in api_settings.PASSWORDLESS_AUTH_TYPES:
            return Response(status=status.HTTP_404_NOT_FOUND)

        serializer = self.serializer_class(data=request.data, context={'request': request})
        if not serializer.is_valid(raise_exception=True):
            return Response(serializer.error_messages, status=status.HTTP_400_BAD_REQUEST)

        # Create and send the callback token for the validated user.
        user = serializer.validated_data['user']
        sent = TokenService.send_token(user, self.alias_type, **self.message_payload)

        # Report whether sending worked.
        if sent:
            code, detail = status.HTTP_200_OK, self.success_response
        else:
            code, detail = status.HTTP_400_BAD_REQUEST, self.failure_response
        return Response({'detail': detail}, status=code)
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_coupon_not_redeemable(self):
        """
        A 404 should be returned if coupon is not redeemable
        """
        with patch('ecommerce.views.is_coupon_redeemable', autospec=True) as redeemable_mock:
            # Force the redeemability check to fail for this request.
            redeemable_mock.return_value = False
            url = reverse('coupon-user-create', kwargs={'code': self.coupon.coupon_code})
            payload = {"username": get_social_username(self.user)}
            resp = self.client.post(url, data=payload, format='json')
            assert resp.status_code == status.HTTP_404_NOT_FOUND
        redeemable_mock.assert_called_with(self.coupon, self.user)
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def vmServer_detail(request, id, format=None):
    """
    Retrieve, update or delete a ``VmServer`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``PUT`` validates and saves ``request.data`` (400
    with the errors on failure); ``DELETE`` requires the
    ``vmanageplatform.delete_vmserver`` permission (403 otherwise) and
    answers 204 after deleting the record.
    """
    try:
        server = VmServer.objects.get(id=id)
    except VmServer.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(VmServerSerializer(server).data)

    if method == 'PUT':
        serializer = VmServerSerializer(server, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('vmanageplatform.delete_vmserver'):
            server.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:VManagePlatform    作者:welliamcao    | 项目源码 | 文件源码
def vmlog_detail(request, id, format=None):
    """
    Retrieve, update or delete a ``VmLogs`` instance.

    Responds with 404 when no record has the given ``id``. ``GET`` returns
    the serialized record; ``PUT`` validates and saves ``request.data`` (400
    with the errors on failure); ``DELETE`` requires the
    ``vmanageplatform.delete_vmserver`` permission (403 otherwise) and
    answers 204 after deleting the record.
    """
    try:
        log_entry = VmLogs.objects.get(id=id)
    except VmLogs.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    method = request.method

    if method == 'GET':
        return Response(VmLogsSerializer(log_entry).data)

    if method == 'PUT':
        serializer = VmLogsSerializer(log_entry, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(serializer.data)

    if method == 'DELETE':
        if request.user.has_perm('vmanageplatform.delete_vmserver'):
            log_entry.delete()
            return Response(status=status.HTTP_204_NO_CONTENT)
        return Response(status=status.HTTP_403_FORBIDDEN)
项目:django-rest-framework-jwt-refresh-token    作者:lock8    | 项目源码 | 文件源码
def test_get_refresth_token_detail(self):
        """GET on ``detail_url`` must give 200 and on ``detail_url1`` 404."""
        jwt = utils.jwt_encode_handler(utils.jwt_payload_handler(self.user))
        self.client.credentials(HTTP_AUTHORIZATION='JWT ' + jwt)

        expectations = (
            (self.detail_url, status.HTTP_200_OK),
            (self.detail_url1, status.HTTP_404_NOT_FOUND),
        )
        for url, expected_code in expectations:
            response = self.client.get(url)
            # The tuple is the failure message: actual code plus body.
            self.assertEqual(
                response.status_code,
                expected_code,
                (response.status_code, response.content)
            )
项目:django-rest-framework-jwt-refresh-token    作者:lock8    | 项目源码 | 文件源码
def test_delete_refresth_token(self):
        """DELETE on ``detail_url`` must give 204 and on ``detail_url1`` 404."""
        jwt = utils.jwt_encode_handler(utils.jwt_payload_handler(self.user))
        self.client.credentials(HTTP_AUTHORIZATION='JWT ' + jwt)

        expectations = (
            (self.detail_url, status.HTTP_204_NO_CONTENT),
            (self.detail_url1, status.HTTP_404_NOT_FOUND),
        )
        for url, expected_code in expectations:
            response = self.client.delete(url)
            # The tuple is the failure message: actual code plus body.
            self.assertEqual(
                response.status_code,
                expected_code,
                (response.status_code, response.content)
            )
项目:polyglot-server    作者:MontrealCorpusTools    | 项目源码 | 文件源码
def get_corpus_status(request, name=None):
    """Report whether the named corpus is ready, busy or failed.

    Only ``GET`` is handled; other methods return ``None``. Responds with
    404 when no corpus has that name. With no current task the answer is
    ``ready``. Otherwise the task state is inspected: on success or failure
    the task id is cleared, an ``IMPORT_RUNNING`` corpus is moved to
    ``IMPORTED`` or ``NOT_IMPORTED`` respectively, the corpus is saved and
    ``ready`` or ``error`` is returned; any other state yields ``busy``.
    All JSON answers use status 200.
    """
    if request.method != 'GET':
        return None

    try:
        corpus = Corpus.objects.get(name=name)
    except ObjectDoesNotExist:
        return HttpResponse('Could not find the specified corpus.', status=status.HTTP_404_NOT_FOUND)

    if corpus.current_task_id is None:
        return JsonResponse(data={'data': 'ready'}, status=status.HTTP_200_OK)

    task = AsyncResult(corpus.current_task_id)

    if task.state == cstates.SUCCESS:
        corpus.current_task_id = None
        if corpus.status == Corpus.IMPORT_RUNNING:
            corpus.status = Corpus.IMPORTED
        corpus.save()
        return JsonResponse(data={'data': 'ready'}, status=status.HTTP_200_OK)

    if task.state == cstates.FAILURE:
        corpus.current_task_id = None
        if corpus.status == Corpus.IMPORT_RUNNING:
            corpus.status = Corpus.NOT_IMPORTED
        corpus.save()
        return JsonResponse(data={'data': 'error'}, status=status.HTTP_200_OK)

    return JsonResponse(data={'data': 'busy'}, status=status.HTTP_200_OK)
项目:desec-stack    作者:desec-io    | 项目源码 | 文件源码
def testCanDeleteOwnedDomain(self):
        """Deleting an owned domain gives 204; fetching it afterwards gives 404.

        Also checks that the last intercepted request of the deletion was a
        DELETE sent to ``nsmaster:8081`` and that the follow-up GET causes no
        intercepted request at all.
        """
        zone_path = '/zones/' + self.ownedDomains[1].name + '.'

        def register_zone_deletion():
            # Intercept the zone DELETE calls on both PowerDNS APIs.
            httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + zone_path)
            httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + zone_path)

        httpretty.enable()
        register_zone_deletion()

        url = reverse('domain-detail', args=(self.ownedDomains[1].pk,))
        response = self.client.delete(url)
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        last_request = httpretty.last_request()
        self.assertEqual(last_request.method, 'DELETE')
        self.assertEqual(httpretty.last_request().headers['Host'], 'nsmaster:8081')

        httpretty.reset()
        register_zone_deletion()

        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
        self.assertTrue(isinstance(httpretty.last_request(), httpretty.core.HTTPrettyRequestEmpty))
项目:desec-stack    作者:desec-io    | 项目源码 | 文件源码
def testCanDeleteOwnedDynDomain(self):
        """Deleting an owned domain gives 204; fetching it afterwards gives 404.

        The follow-up GET must not cause any intercepted HTTP request.
        """
        zone_path = '/zones/' + self.ownedDomains[1].name + '.'

        def register_zone_deletion():
            # Intercept the zone DELETE calls on both PowerDNS APIs.
            httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + zone_path)
            httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + zone_path)

        httpretty.enable()
        register_zone_deletion()

        url = reverse('domain-detail', args=(self.ownedDomains[1].pk,))
        response = self.client.delete(url)
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

        # FIXME In this testing scenario, the parent domain dedyn.io does not
        # have the proper NS and DS records set up, so we cannot test their
        # deletion.

        httpretty.reset()
        register_zone_deletion()

        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
        self.assertTrue(isinstance(httpretty.last_request(), httpretty.core.HTTPrettyRequestEmpty))
项目:desec-stack    作者:desec-io    | 项目源码 | 文件源码
def assertIP(self, ipv4=None, ipv6=None, name=None):
        """Assert the A and AAAA record sets of ``name`` match the given IPs.

        ``name`` defaults to ``self.username``. For each record type the
        ``rrset`` endpoint is queried with token authentication built from
        ``self.password``: a given IP must be the first record, with status
        200 and a TTL of 60; an IP of ``None`` requires a 404. The previous
        ``HTTP_AUTHORIZATION`` credentials are put back at the end.
        """
        previous_auth = self.client._credentials['HTTP_AUTHORIZATION']
        self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.password)
        name = name or self.username

        for record_type, expected_ip in (('A', ipv4), ('AAAA', ipv6)):
            url = reverse('rrset', args=(name, '', record_type,))
            response = self.client.get(url)

            if expected_ip is None:
                self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
                continue

            self.assertEqual(response.data['records'][0], expected_ip)
            self.assertEqual(response.status_code, status.HTTP_200_OK)
            self.assertEqual(response.data['ttl'], 60)

        self.client.credentials(HTTP_AUTHORIZATION=previous_auth)
项目:delft3d-gt-server    作者:openearth    | 项目源码 | 文件源码
def test_scenario_put(self):
        """PUT on the scenario detail view: 200 for user foo, 404 for user bar."""
        # detail view for PUT (update)
        url = reverse('scenario-detail', args=[self.scenario.pk])

        def updated_payload():
            # A fresh body for every request.
            return {
                "name": "New Scenario (Updated)",
                "scene_set": [],
            }

        # foo can update
        self.client.login(username='foo', password='secret')
        response = self.client.put(url, updated_payload(), format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)

        # bar cannot update
        self.client.login(username='bar', password='secret')
        response = self.client.put(url, updated_payload(), format='json')
        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
项目:delft3d-gt-server    作者:openearth    | 项目源码 | 文件源码
def test_scenario_start(self, mocked_scene_method):
        """PUT on the scenario start view: 404 for user bar, 200 for user foo.

        The mocked method must stay uncalled for bar and be called with the
        scenario and ``self.user_foo`` for foo.
        """
        # start view
        start_url = reverse('scenario-start', args=[self.scenario.pk])

        # bar cannot see
        self.client.login(username='bar', password='secret')
        denied = self.client.put(start_url, {}, format='json')
        self.assertEqual(denied.status_code, status.HTTP_404_NOT_FOUND)
        self.assertEqual(mocked_scene_method.call_count, 0)

        # foo can start
        self.client.login(username='foo', password='secret')
        started = self.client.put(start_url, {}, format='json')
        self.assertEqual(started.status_code, status.HTTP_200_OK)
        mocked_scene_method.assert_called_with(self.scenario, self.user_foo)
项目:fexum    作者:KDD-OpenSource    | 项目源码 | 文件源码
def post(self, request, target_id):
        """Return the output definition of the slice matching the given features.

        ``target_id`` selects the target ``Feature`` (404 if missing). The
        ``features`` ids from the request body must all belong to the
        target's dataset, otherwise a 404 with a ``detail`` message is
        returned. Slices of the target's last ``ResultCalculationMap`` are
        narrowed to those having exactly these features; the response is the
        ``output_definition`` when exactly one slice remains, else ``[]``.
        """
        target = get_object_or_404(Feature, pk=target_id)
        requested_ids = request.data.get('features') or []
        matched_features = Feature.objects.filter(id__in=requested_ids, dataset=target.dataset)

        # Make sure that we filtered for all feature ids
        if matched_features.count() != len(requested_ids) and len(requested_ids) > 0:
            return Response(status=HTTP_404_NOT_FOUND, data={'detail': 'Not found.'})

        latest_map = ResultCalculationMap.objects.filter(target=target).last()
        candidates = (
            Slice.objects
            .filter(result_calculation_map=latest_map)
            .annotate(feature_count=Count('features'))
            .filter(feature_count=len(requested_ids))
        )
        # Require every requested feature to be part of the slice.
        for feature in matched_features.all():
            candidates = candidates.filter(features=feature)

        if candidates.count() != 1:
            return Response([])
        return Response(candidates.last().output_definition)
项目:State-TalentMAP-API    作者:18F    | 项目源码 | 文件源码
def email_share(self, user, email, type, id):
        """Build the e-mail that shares an object externally.

        Looks up the formatter and the model registered for ``type`` and
        fetches the instance with the given ``id``. Responds with 404 when it
        does not exist, otherwise with 202 and a copy of the e-mail body. No
        e-mail is actually sent yet.
        """
        # Get our e-mail formatter
        formatter = self.get_email_formatter(type)

        # Attempt to get the object instance we want to share
        model = apps.get_model(self.AVAILABLE_TYPES[type])
        try:
            shared_object = model.objects.get(id=id)
        except ObjectDoesNotExist:
            # If it doesn't exist, respond with a 404
            return Response({"message": f"Object with id {id} does not exist"}, status=status.HTTP_404_NOT_FOUND)

        # Create our e-mail body
        email_body = {
            "to": email,
            "subject": f"[TalentMAP] Shared {type}",
            "body": formatter(shared_object)
        }

        # TODO: Implement actual e-mail sending here when available e-mail servers are clarified

        # Return a 202 ACCEPTED with a copy of the email body
        payload = {"message": f"Position shared externally via email at {email}", "email_body": email_body}
        return Response(payload, status=status.HTTP_202_ACCEPTED)
项目:State-TalentMAP-API    作者:18F    | 项目源码 | 文件源码
def internal_share(self, user, email, type, id):
        """Share an object with another user identified by e-mail address.

        Responds with 404 when the object of the given ``type`` and ``id``
        or the user profile for ``email`` does not exist. Otherwise a
        ``Sharable`` linking ``user.profile``, the receiving profile, the id
        and the model name is created and 202 is returned.
        """
        model_name = self.AVAILABLE_TYPES[type]

        # Attempt to get the object instance we want to share
        try:
            apps.get_model(model_name).objects.get(id=id)
        except ObjectDoesNotExist:
            # If it doesn't exist, respond with a 404
            return Response({"message": f"Object with id {id} does not exist"}, status=status.HTTP_404_NOT_FOUND)

        # Attempt to get the receiving user by e-mail address
        try:
            recipient = UserProfile.objects.get(user__email=email)
        except ObjectDoesNotExist:
            return Response({"message": f"User with email {email} does not exist"}, status=status.HTTP_404_NOT_FOUND)

        # Create our sharable object using the source user, receiving user, id, and model
        # This will auto-populate in the receiving user's received shares on their profile
        Sharable.objects.create(
            sharing_user=user.profile,
            receiving_user=recipient,
            sharable_id=id,
            sharable_model=self.AVAILABLE_TYPES[type],
        )

        return Response({"message": f"Position shared internally to user with email {email}"}, status=status.HTTP_202_ACCEPTED)
项目:State-TalentMAP-API    作者:18F    | 项目源码 | 文件源码
def test_favorite_action_endpoints(authorized_client, authorized_user):
    """Walk the favorite endpoint of a position through its life cycle.

    GET gives 404 before the position is a favorite, PUT and the following
    GET give 204, DELETE gives 204 and the final GET gives 404 again.
    """
    position = mommy.make_recipe('talentmap_api.position.tests.position')

    # (client method, expected status) in the order the calls are made.
    steps = (
        ('get', status.HTTP_404_NOT_FOUND),
        ('put', status.HTTP_204_NO_CONTENT),
        ('get', status.HTTP_204_NO_CONTENT),
        ('delete', status.HTTP_204_NO_CONTENT),
        ('get', status.HTTP_404_NOT_FOUND),
    )
    for verb, expected_code in steps:
        response = getattr(authorized_client, verb)(f'/api/v1/position/{position.id}/favorite/')

        assert response.status_code == expected_code
项目:Code_python    作者:RoJoHub    | 项目源码 | 文件源码
def snippet_detail(request, pk, format=None):
    """
    Retrieve, update or delete a snippet instance.

    Responds with 404 when no snippet has the primary key ``pk``. ``GET``
    returns the serialized snippet; ``PUT`` validates and saves
    ``request.data`` (400 with the errors on failure); ``DELETE`` removes
    the snippet and answers 204.
    """
    # The leftover debug ``print(pk)`` was dropped: it wrote to stdout on
    # every request.
    try:
        snippet = Snippet.objects.get(pk=pk)
    except Snippet.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        serializer = SnippetSerializer(snippet)
        return Response(serializer.data)

    elif request.method == 'PUT':
        serializer = SnippetSerializer(snippet, data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    elif request.method == 'DELETE':
        snippet.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def get(self, request):
        """Deprecated lookup of a BTS's inbound address by its UUID.

        Expects a ``uuid`` query parameter (400 when absent). Responds with
        404 when no BTS has that UUID, otherwise with 200 and the ``netloc``
        and ``hostname`` parsed from the BTS's ``inbound_url`` plus the id of
        the owning network. Every call is logged as a warning.
        """
        # Lazy %-style arguments: the message is only built when emitted.
        logger.warning("Use of deprecated API call %s", request.GET)

        if "uuid" not in request.GET:
            return Response("No uuid specified.",
                            status=status.HTTP_400_BAD_REQUEST)
        query_num = request.GET["uuid"]

        # The result was never used; the call is kept in case it validates
        # the user -- TODO confirm whether it can be dropped.
        get_network_from_user(request.user)

        try:
            bts = BTS.objects.get(uuid=query_num)
        except BTS.DoesNotExist:
            # The lookup is on BTS, so this is the exception a miss raises;
            # the previous ``Number.DoesNotExist`` never matched it.
            return Response("No such UUID", status=status.HTTP_404_NOT_FOUND)

        # Strip the protocol field and just return the rest, removing any
        # trailing slash.
        bts_info = urlparse.urlparse(bts.inbound_url)
        result = {
            'netloc': bts_info.netloc,
            'hostname': bts_info.hostname,
            'owner': bts.network.id
        }
        return Response(result, status=status.HTTP_200_OK)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def get(self, request):
        """Handles GET requests.

        Responds with an empty 404 unless the requesting user is staff.
        Otherwise renders ``dashboard/staff/numbers.html`` with a table of
        all numbers, the user's profile and the networks returned by
        ``get_objects_for_user`` for the ``view_network`` permission.
        """
        user_profile = models.UserProfile.objects.get(user=request.user)
        if not user_profile.user.is_staff:
            return response.Response('', status=status.HTTP_404_NOT_FOUND)

        numbers = models.Number.objects.all()
        number_table = django_tables.NumberTable(list(numbers))

        # Configure the table of numbers.  Do not show any pagination controls
        # if the total number of numbers is small.
        numbers_per_page = 20
        paginate = (
            {'per_page': numbers_per_page}
            if numbers.count() > numbers_per_page
            else False
        )
        request_config = tables.RequestConfig(request, paginate=paginate)
        request_config.configure(number_table)

        context = {
            'networks': get_objects_for_user(request.user, 'view_network', klass=models.Network),
            'user_profile': user_profile,
            'number_table': number_table,
        }

        # Render template.
        page = template.loader.get_template('dashboard/staff/numbers.html')
        return http.HttpResponse(page.render(context, request))
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def delete(self, request, bts_uuid=None, number=None, format=None):
        """ Dis-associate a number from a BTS and mark it available.

        Responds with 400 when neither ``number`` nor ``bts_uuid`` is given
        and with 403 when the BTS is not found in the requesting user's
        network. Inside a transaction the first matching number on the BTS's
        network is set to state ``available`` with its network and
        subscriber cleared (200); without a match the response is 404.
        """
        if not number and not bts_uuid:
            return Response("", status=status.HTTP_400_BAD_REQUEST)

        network = get_network_from_user(request.user)
        try:
            bts = models.BTS.objects.get(uuid=bts_uuid, network=network)
        except models.BTS.DoesNotExist:
            return Response("User is not associated with that BTS.",
                            status=status.HTTP_403_FORBIDDEN)

        with transaction.atomic():
            matches = models.Number.objects.filter(number__exact=number).filter(
                network=bts.network)
            # Only the first match is ever updated.
            entry = next(iter(matches), None)
            if entry is None:
                return Response(None, status=status.HTTP_404_NOT_FOUND)

            entry.state = "available"
            entry.network = None
            entry.subscriber = None
            entry.save()
            return Response(None, status=status.HTTP_200_OK)
项目:cjworkbench    作者:CJWorkbench    | 项目源码 | 文件源码
def test_wf_module_detail_get(self):
        """GET /api/wfmodules/<id>/ returns the WfModule's details, or 404
        for an id that does not exist."""
        # Also tests [Workflow, Module, WfModule].get
        workflow_id = Workflow.objects.get(name='Workflow 1').id
        module = Module.objects.get(name='Module 1')
        module_version = ModuleVersion.objects.get(module=module)
        # Fetch the WfModule once and read both id and notes from it.
        wf_module = WfModule.objects.get(workflow_id=workflow_id,
                                         module_version=module_version)

        response = self.client.get('/api/wfmodules/%d/' % wf_module.id)
        # Compare by value: an identity check on ints only succeeds because
        # CPython happens to cache small integers.
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data['id'], wf_module.id)
        self.assertEqual(response.data['workflow'], workflow_id)
        self.assertEqual(response.data['notes'], wf_module.notes)
        self.assertEqual(response.data['module_version']['module']['id'], module.id)
        self.assertEqual(response.data['status'], WfModule.READY)
        self.assertEqual(response.data['error_msg'], '')

        # An id with no matching WfModule must give 404.
        response = self.client.get('/api/wfmodules/%d/' % 10000)
        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

    # this constrains the output API format, detects changes that would break client code
项目:cjworkbench    作者:CJWorkbench    | 项目源码 | 文件源码
def test_workflow_duplicate_view(self):
        """Exercise the workflow duplicate endpoint: 201 for the current test
        client, 404 for an unknown id, 403 for another user on a private
        workflow, and 201 again once the workflow is public."""
        url_template = '/api/workflows/%d/duplicate'
        # Ids of every workflow that exists before duplicating.
        existing_ids = {wf.id for wf in Workflow.objects.all()}

        response = self.client.get(url_template % self.workflow1.id)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        duplicate_id = response.data['id']
        # The copy must live at an entirely new id.
        self.assertNotIn(duplicate_id, existing_ids)
        self.assertEqual(response.data['name'], 'Copy of ' + self.workflow1.name)
        # Raises (failing the test) if no Workflow was actually created.
        Workflow.objects.get(pk=duplicate_id)

        # Unknown id -> 404
        response = self.client.get(url_template % 999999)
        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

        # A different user may not clone a private workflow -> 403
        self.assertFalse(self.workflow1.public)
        self.client.force_login(self.otheruser)
        response = self.client.get(url_template % self.workflow1.id)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

        # ...but may clone it once it has been made public -> 201
        self.workflow1.public = True
        self.workflow1.save()
        response = self.client.get(url_template % self.workflow1.id)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
项目:cjworkbench    作者:CJWorkbench    | 项目源码 | 文件源码
def workflow_duplicate(request, pk):
    """Duplicate the workflow with primary key ``pk`` for the requesting user.

    Returns 404 if no such workflow exists, a forbidden response if
    ``user_authorized_read`` rejects the user, and otherwise 201 with the
    ``WorkflowSerializerLite`` representation of the copy.
    """
    try:
        source = Workflow.objects.get(pk=pk)
    except Workflow.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if source.user_authorized_read(request.user):
        copy = source.duplicate(request.user)
        payload = WorkflowSerializerLite(copy).data
        return Response(payload, status.HTTP_201_CREATED)

    return HttpResponseForbidden()


# Undo or redo
项目:cjworkbench    作者:CJWorkbench    | 项目源码 | 文件源码
def parameterval_event(request, pk, format=None):
    """Forward an event for the ParameterVal with primary key ``pk``.

    Returns 404 if the parameter does not exist and a forbidden response if
    ``user_authorized_write`` rejects the user. Otherwise the request data is
    handed to ``module_dispatch_event``; a truthy result is returned as-is,
    and a falsy one yields an empty 204.
    """
    try:
        target = ParameterVal.objects.get(pk=pk)
    except ParameterVal.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if not target.user_authorized_write(request.user):
        return HttpResponseForbidden()

    # The request body is the event payload passed to the module.
    outcome = module_dispatch_event(
        target.wf_module, parameter=target, event=request.data, request=request)
    return outcome or Response(status=status.HTTP_204_NO_CONTENT)


# Return a parameter val that is actually an image
项目:fallball-service    作者:ingrammicro    | 项目源码 | 文件源码
def list(self, request, *args, **kwargs):
        """List the resellers visible to the requesting user.

        Lookup order:
          1. If the user owns an Application, all resellers of that
             application (full ``ResellerSerializer``).
          2. Otherwise, resellers owned by the user (full serializer).
          3. Otherwise, if the user owns a ClientUser, the reseller of that
             client (``ResellerNameSerializer`` only).
          4. Otherwise 404.
        """
        owner = request.user
        application = Application.objects.filter(owner=owner).first()
        if application:
            app_resellers = Reseller.objects.filter(application=application)
            return Response(ResellerSerializer(app_resellers, many=True).data)

        owned_resellers = Reseller.objects.filter(owner=owner)
        if owned_resellers:
            return Response(ResellerSerializer(owned_resellers, many=True).data)

        # Neither an application owner nor a reseller: try a client user.
        client_user = ClientUser.objects.filter(owner=owner).first()
        if not client_user:
            return Response("Resellers do not exist for such account",
                            status=HTTP_404_NOT_FOUND)

        single = [client_user.client.reseller, ]
        return Response(ResellerNameSerializer(single, many=True).data)
项目:fallball-service    作者:ingrammicro    | 项目源码 | 文件源码
def retrieve(self, request, *args, **kwargs):
        """
        Return the client named ``kwargs['name']`` under the reseller named
        ``kwargs['reseller_name']``.

        The reseller is looked up by application when the user owns an
        Application, otherwise by owner. If that owner lookup finds nothing,
        the user must be an admin ClientUser of the requested client (else
        404 / 403), and that client's reseller is used instead.
        """
        owner = request.user
        application = Application.objects.filter(owner=owner).first()
        reseller_name = kwargs['reseller_name']

        scope = {'application': application} if application else {'owner': owner}
        reseller = Reseller.objects.filter(name=reseller_name, **scope).first()

        if not application and not reseller:
            # Fall back to a client administrator's own reseller.
            admin = ClientUser.objects.filter(owner=owner, admin=True).first()
            if not admin:
                return Response("Client does not exist", status=HTTP_404_NOT_FOUND)
            if admin.client.name != kwargs['name']:
                return Response("Authorization failed", status=status.HTTP_403_FORBIDDEN)
            reseller = admin.client.reseller

        client = Client.objects.filter(reseller=reseller, name=kwargs['name']).first()
        if not client:
            return Response("Client does not exist", status=status.HTTP_404_NOT_FOUND)

        # Serialized as a one-element collection, then unwrapped.
        serialized = ClientSerializer((client, ), many=True).data
        return Response(serialized[0])
项目:fallball-service    作者:ingrammicro    | 项目源码 | 文件源码
def token(self, request, **kwargs):
        """Return the token from ``get_jwt_token`` for a client user.

        The reseller named ``kwargs['reseller_name']`` is looked up by
        application when the requesting user owns an Application, otherwise
        by owner; if that owner lookup finds nothing, the requesting user
        must be an admin ClientUser (else 404) and that client's reseller is
        used. The target user is the ClientUser with ``kwargs['user_id']``
        belonging to the client named ``kwargs['client_name']`` under that
        reseller.

        Responds 404 when no such client user exists or it has no owner,
        otherwise 200 with the token.
        """
        application = Application.objects.filter(owner=request.user).first()
        if application:
            reseller = Reseller.objects.filter(name=kwargs['reseller_name'],
                                               application=application).first()
        else:
            reseller = Reseller.objects.filter(name=kwargs['reseller_name'],
                                               owner=request.user).first()
            if not reseller:
                admin = ClientUser.objects.filter(owner=request.user, admin=True).first()
                if not admin:
                    return Response("Client does not exist", status=HTTP_404_NOT_FOUND)
                reseller = admin.client.reseller

        clients = Client.objects.filter(reseller=reseller, name=kwargs['client_name'])
        # ``clients`` is an unsliced QuerySet; an exact lookup (client=...)
        # with such a value raises ValueError on Django >= 2.0, so match it
        # with ``__in`` (rendered as a subquery) instead.
        clientuser = ClientUser.objects.filter(client__in=clients,
                                               user_id=kwargs['user_id']).first()
        if not clientuser or not clientuser.owner:
            return Response("User does not exist", status=status.HTTP_404_NOT_FOUND)

        user = clientuser.owner

        token = get_jwt_token(user)
        return Response(token, status=status.HTTP_200_OK)
项目:videofront    作者:openfun    | 项目源码 | 文件源码
def upload(self, request, video_id=None):
        """
        Accept a video file for an available upload url.

        Responds 404 when no available VideoUploadUrl has
        ``public_video_id == video_id``. An OPTIONS request gets an empty
        body; any other request must carry a non-empty ``file`` upload
        (else 400), which is passed to ``tasks.upload_video``. Every
        response after the lookup carries the CORS header when the upload
        url has an origin.
        """
        try:
            upload_url = models.VideoUploadUrl.objects.available().get(public_video_id=video_id)
        except models.VideoUploadUrl.DoesNotExist:
            return Response(status=rest_status.HTTP_404_NOT_FOUND)

        # Only advertise an allowed origin when one is configured.
        origin = upload_url.origin
        cors_headers = {"Access-Control-Allow-Origin": origin} if origin else {}

        if request.method == 'OPTIONS':
            return Response({}, headers=cors_headers)

        # Anything else is handled as the actual upload.
        uploaded = request.FILES.get('file')
        if uploaded is None or uploaded.size == 0:
            return Response(
                {'file': "Missing argument"},
                status=rest_status.HTTP_400_BAD_REQUEST,
                headers=cors_headers,
            )

        public_id = upload_url.public_video_id
        tasks.upload_video(public_id, uploaded)
        return Response({'id': public_id}, headers=cors_headers)
项目:PmpApi    作者:pengcheng789    | 项目源码 | 文件源码
def get(self, request, commodity_id):
        """Return the serialized commodity identified by ``commodity_id``.

        Responds with the ``check_token`` response when permission is
        False, 403 unless the token's auth contains '1' or '11', and 404
        when ``get_object`` returns None.
        """
        # NOTE(review): the alert texts below appear mis-encoded ('?') in
        # this copy of the source; they are kept exactly as found.
        token_info = check_token(request)
        if token_info['permission'] is False:
            return token_info['response']

        granted = token_info['auth']
        if '1' not in granted and '11' not in granted:
            return Response({'alert': '??????????????'},
                            status=status.HTTP_403_FORBIDDEN)

        commodity = self.get_object(commodity_id)
        if commodity is None:
            return Response({'alert': '?????????'},
                            status=status.HTTP_404_NOT_FOUND)

        return Response(CommoditySerializer(commodity).data)
项目:PmpApi    作者:pengcheng789    | 项目源码 | 文件源码
def get(self, request, ie_id):
        """Return the detail rows for ``ie_id``, or the InEntrepot record
        itself when there are no detail rows.

        Responds with the ``check_token`` response when permission is
        False, 403 unless the token's auth contains '1' or '11', and 404
        when neither detail rows nor the InEntrepot record exist.
        """
        # NOTE(review): the alert texts below appear mis-encoded ('?') in
        # this copy of the source; they are kept exactly as found.
        token_info = check_token(request)
        if token_info['permission'] is False:
            return token_info['response']

        if not any(code in token_info['auth'] for code in ('1', '11')):
            return Response({'alert': '??????????????'},
                            status=status.HTTP_403_FORBIDDEN)

        details = self.get_object_list(ie_id)
        if details:
            return Response(InEntrepotDetailSerializer(details, many=True).data)

        # No detail rows: fall back to the InEntrepot record.
        try:
            record = InEntrepot.objects.get(ie_id=ie_id)
        except InEntrepot.DoesNotExist:
            return Response({'alert': '????????'},
                            status=status.HTTP_404_NOT_FOUND)

        return Response(InEntrepotSerializer(record).data)
项目:PmpApi    作者:pengcheng789    | 项目源码 | 文件源码
def get(self, request, oe_id):
        """Return the detail rows for ``oe_id``, or the OutEntrepot record
        itself when there are no detail rows.

        Responds with the ``check_token`` response when permission is
        False, 403 unless the token's auth contains '1' or '11', and 404
        when neither detail rows nor the OutEntrepot record exist.
        """
        # NOTE(review): the alert texts below appear mis-encoded ('?') in
        # this copy of the source; they are kept exactly as found.
        token_info = check_token(request)
        if token_info['permission'] is False:
            return token_info['response']

        if not any(code in token_info['auth'] for code in ('1', '11')):
            return Response({'alert': '??????????????'},
                            status=status.HTTP_403_FORBIDDEN)

        details = self.get_object_list(oe_id)
        if details:
            return Response(OutEntrepotDetailSerializer(details, many=True).data)

        # No detail rows: fall back to the OutEntrepot record.
        try:
            record = OutEntrepot.objects.get(oe_id=oe_id)
        except OutEntrepot.DoesNotExist:
            return Response({'alert': '????????'},
                            status=status.HTTP_404_NOT_FOUND)

        return Response(OutEntrepotSerializer(record).data)
项目:PmpApi    作者:pengcheng789    | 项目源码 | 文件源码
def get(self, request, emp_id):
        """Return the serialized employee identified by ``emp_id``.

        Responds with the ``check_token`` response when permission is
        False. Access is allowed when the token's auth contains '1' or '2',
        or when the token's ``emp_id`` equals the requested one; otherwise
        403. Responds 404 when ``get_object`` returns None.
        """
        # NOTE(review): the alert texts below appear mis-encoded ('?') in
        # this copy of the source; they are kept exactly as found.
        token_info = check_token(request)
        if token_info['permission'] is False:
            return token_info['response']

        has_role = any(code in token_info['auth'] for code in ('1', '2'))
        # The token's own emp_id is only consulted when no role matched.
        if not has_role and token_info['emp_id'] != emp_id:
            return Response({'alert': '??????????????'},
                            status=status.HTTP_403_FORBIDDEN)

        employee = self.get_object(emp_id)
        if employee is None:
            return Response({'alert': '?????????'},
                            status=status.HTTP_404_NOT_FOUND)

        return Response(EmployeeSerializer(employee).data)
项目:PmpApi    作者:pengcheng789    | 项目源码 | 文件源码
def get(self, request, repair_id):
        """Return the serialized repair identified by ``repair_id``.

        Responds with the ``check_token`` response when permission is
        falsy and 404 when ``get_object`` returns None. Access is allowed
        when the token's auth contains '1' or '10', or when the repair's
        ``part_id`` equals the token's ``part_id``; otherwise 403.
        """
        # NOTE(review): the alert texts below appear mis-encoded ('?') in
        # this copy of the source; they are kept exactly as found.
        token_info = check_token(request)
        if not token_info['permission']:
            return token_info['response']

        repair = self.get_object(repair_id)
        if repair is None:
            return Response({'alert': '????????'},
                            status=status.HTTP_404_NOT_FOUND)

        has_role = any(code in token_info['auth'] for code in ('1', '10'))
        # The part_id comparison only happens when no role matched.
        if not has_role and repair.part_id != token_info['part_id']:
            return Response({'alert': '?????????????'},
                            status=status.HTTP_403_FORBIDDEN)

        return Response(RepairSerializer(repair).data)
项目:PmpApi    作者:pengcheng789    | 项目源码 | 文件源码
def get(self, request):
        """Return the serialized Daily records visible to the token holder.

        Responds with the ``check_token`` response when permission is
        falsy. A token whose auth contains '1' sees every Daily; any other
        token sees those of the Department matching its ``part_id`` (404
        if that department does not exist). An empty result yields an
        alert payload with the default status.
        """
        # NOTE(review): the alert texts below appear mis-encoded ('?') in
        # this copy of the source; they are kept exactly as found.
        token_info = check_token(request)
        if not token_info['permission']:
            return token_info['response']

        if '1' not in token_info['auth']:
            # Restricted view: only the caller's own department.
            try:
                department = Department.objects.get(part_id=token_info['part_id'])
            except Department.DoesNotExist:
                return Response({'alert': '???????'},
                                status=status.HTTP_404_NOT_FOUND)
            dailies = Daily.objects.filter(part=department)
        else:
            dailies = Daily.objects.all()

        if not dailies:
            return Response({'alert': '??????!'})

        return Response(DailySerializer(dailies, many=True).data)