我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 rest_framework.status.HTTP_200_OK。
def employee_role_list(request):
    """
    Returns employee role full list
    ---
    serializer: employees.serializers.EmployeeRoleListSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    # Anything other than GET is not handled here and yields None.
    if request.method != 'GET':
        return None
    roles = get_list_or_404(Role)
    payload = EmployeeRoleListSerializer(roles, many=True).data
    return Response(payload, status=status.HTTP_200_OK)
def get(self, request, format=None):
    """
    List all badges
    ---
    serializer: administrator.serializers.BadgeSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    badges = get_list_or_404(Badge)
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        return Response(BadgeSerializer(badges, many=True).data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(badges, request)
    return paginator.get_paginated_response(BadgeSerializer(page, many=True).data)
def get(self, request, format=None):
    """
    List all employee positions
    ---
    serializer: administrator.serializers.PositionSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    positions = get_list_or_404(Position)
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        return Response(PositionSerializer(positions, many=True).data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(positions, request)
    return paginator.get_paginated_response(PositionSerializer(page, many=True).data)
def SwaggerRequestMethodMaker(model=None):
    """Build a request-handler method that always answers HTTP 200.

    Args:
        model: Payload the generated handler returns as the response body.
            Any falsy value (``None``, ``{}``, ...) selects the handler that
            returns an empty body instead.

    Returns:
        ``model_handler`` when ``model`` is truthy, otherwise
        ``empty_handler``. Both take ``(self, request, *args, **kwargs)``.
    """
    def model_handler(self, request, *args, **kwargs):
        # The payload is captured from the enclosing call.
        return Response(model, status=status.HTTP_200_OK)

    def empty_handler(self, request, *args, **kwargs):
        return Response(None, status=status.HTTP_200_OK)

    return model_handler if model else empty_handler

# make named APIView class with specified methods
def test_stop_entry(self):
    """POST to the stop endpoint and check the returned entry's fields."""
    description = 'EXAMPLE'
    started_at = timezone.now()
    TimeEntry(user=self.TestUser, description=description, start=started_at).save()

    response = self.client.post(reverse("api:time-entry-stop"), {})
    payload = response.data
    parsed_start = dateparse.parse_datetime(payload['start'])
    parsed_stop = dateparse.parse_datetime(payload['stop'])

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(payload['description'], description)
    self.assertEqual(parsed_start, started_at)
    self.assertGreater(parsed_stop, parsed_start)
    # Reported duration must equal stop - start, in seconds.
    elapsed = (parsed_stop - parsed_start).total_seconds()
    self.assertEqual(payload['duration'], elapsed)
def category_detail(request, category_id):
    """
    Category detail
    ---
    serializer: categories.serializers.CategorySerializer
    responseMessages:
    - code: 400
      message: Bad request.
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    # The lookup runs for every method, so a missing category is a 404 first.
    category = get_object_or_404(Category, pk=category_id)
    if request.method != 'GET':
        return None
    return Response(CategorySerializer(category).data, status=status.HTTP_200_OK)
def keyword_detail(request, keyword_id):
    """
    Keyword detail
    ---
    serializer: categories.serializers.KeywordSerializer
    responseMessages:
    - code: 400
      message: Bad request.
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    # The lookup runs for every method, so a missing keyword is a 404 first.
    keyword = get_object_or_404(Keyword, pk=keyword_id)
    if request.method != 'GET':
        return None
    return Response(KeywordSerializer(keyword).data, status=status.HTTP_200_OK)
def employee(request, employee_id):
    """
    Returns employee details
    ---
    serializer: employees.serializers.EmployeeSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    # Anything other than GET is not handled here and yields None.
    if request.method != 'GET':
        return None
    found = get_object_or_404(Employee, pk=employee_id)
    return Response(EmployeeSerializer(found).data, status=status.HTTP_200_OK)
def employee_position_list(request):
    """
    Returns employee position full list
    ---
    serializer: employees.serializers.EmployeePositionListSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    """
    # Anything other than GET is not handled here and yields None.
    if request.method != 'GET':
        return None
    positions = get_list_or_404(Position)
    payload = EmployeePositionListSerializer(positions, many=True).data
    return Response(payload, status=status.HTTP_200_OK)
def get(self, request, format=None):
    """
    List all employees
    ---
    serializer: administrator.serializers.EmployeeSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    employees = get_list_or_404(Employee)
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        return Response(EmployeeSerializer(employees, many=True).data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(employees, request)
    return paginator.get_paginated_response(EmployeeSerializer(page, many=True).data)
def get(self, request, format=None):
    """
    List all events
    ---
    serializer: administrator.serializers.EventSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    events = get_list_or_404(Event)
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        return Response(EventSerializer(events, many=True).data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(events, request)
    return paginator.get_paginated_response(EventSerializer(page, many=True).data)
def get(self, request, format=None):
    """
    List all locations
    ---
    serializer: administrator.serializers.LocationSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    locations = get_list_or_404(Location)
    # Read the flag once instead of looking it up twice.
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        serializer = LocationSerializer(locations, many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    results = paginator.paginate_queryset(locations, request)
    serializer = LocationSerializer(results, many=True)
    return paginator.get_paginated_response(serializer.data)
def get(self, request, format=None):
    """
    List all messages
    ---
    serializer: administrator.serializers.MessageSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    messages = get_list_or_404(Message)
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        return Response(MessageSerializer(messages, many=True).data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(messages, request)
    return paginator.get_paginated_response(MessageSerializer(page, many=True).data)
def get(self, request, employee_id, format=None):
    """
    List all messages from employee
    ---
    serializer: administrator.serializers.MessageSerializer
    parameters:
    - name: pagination
      required: false
      type: string
      paramType: query
    """
    sender = get_object_or_404(Employee, pk=employee_id)
    messages = get_list_or_404(Message, from_user=sender)
    pagination = request.GET.get('pagination')
    # No (or empty) pagination flag: return the complete list.
    if not pagination:
        return Response(MessageSerializer(messages, many=True).data, status=status.HTTP_200_OK)
    # Only the literal value 'true' is accepted for the flag.
    if pagination != 'true':
        return Response(status=status.HTTP_400_BAD_REQUEST)
    paginator = AdministratorPagination()
    page = paginator.paginate_queryset(messages, request)
    return paginator.get_paginated_response(MessageSerializer(page, many=True).data)
def get(self, request, format=None):
    """
    Get site info
    ---
    serializer: administrator.serializers.SiteInfoSerializer
    """
    # The first configured e-mail domain is the one reported.
    first_domain = settings.EMAIL_DOMAIN_LIST[0]
    site = Site.objects.get_current()
    backend_version = config.VERSION
    site_info = {
        'site': site.domain,
        'email_domain': first_domain,
        'backend_version': backend_version,
    }
    return Response(SiteInfoSerializer(site_info).data, status=status.HTTP_200_OK)
def test_distance_uppsala(self):
    """Query records within 100000 of the Uppsala point and check their order.

    The response must list exactly 'Uppsala', 'Gamla Stan', 'Hornstull' in
    that order, with the first entry at distance 0.0.
    """
    point_uppsala = {'lat': 59.858564, 'lng': 17.638927}
    # The query string carries the point as "lng,lat".
    url = '%s?dist=100000&point=%s,%s' % (
        reverse('record_test_list'), point_uppsala['lng'], point_uppsala['lat'])
    response = self.client.get(url, format='json')
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    json_response = json.loads(response.content)
    order_list = ['Uppsala', 'Gamla Stan', 'Hornstull']
    self.assertEqual(len(json_response), len(order_list))
    for idx, order_key in enumerate(order_list):
        # assertEqual reports both titles on failure, unlike assertTrue(a == b).
        self.assertEqual(json_response[idx]['title'], order_key)
    self.assertEqual(json_response[0]['distance'], 0.0)
def _search_by_field(self, field_name, search_term):
    """Search by field."""
    self.authenticate()
    list_url = reverse('bookdocument-list', kwargs={})
    params = {}

    # Unfiltered listing: expect every document (self.all_count).
    everything = self.client.get(list_url, params)
    self.assertEqual(everything.status_code, status.HTTP_200_OK)
    self.assertEqual(len(everything.data['results']), self.all_count)

    # Filtered by the search term: expect self.special_count hits.
    search_url = list_url + '?search={}'.format(search_term)
    matches = self.client.get(search_url, params)
    self.assertEqual(matches.status_code, status.HTTP_200_OK)
    self.assertEqual(len(matches.data['results']), self.special_count)
def _search_by_nested_field(self, search_term):
    """Search by field."""
    self.authenticate()
    list_url = reverse('citydocument-list', kwargs={})
    params = {}

    # Unfiltered listing: expect every city (self.all_cities_count).
    everything = self.client.get(list_url, params)
    self.assertEqual(everything.status_code, status.HTTP_200_OK)
    self.assertEqual(len(everything.data['results']), self.all_cities_count)

    # Filtered by the search term: expect self.switz_cities_count hits.
    search_url = list_url + '?search={}'.format(search_term)
    matches = self.client.get(search_url, params)
    self.assertEqual(matches.status_code, status.HTTP_200_OK)
    self.assertEqual(len(matches.data['results']), self.switz_cities_count)
def _field_filter_value(self, field_name, value, count):
    """Field filter value.

    Usage example:

        >>> self._field_filter_value(
        >>>     'title__wildcard',
        >>>     self.prefix[3:-3],
        >>>     self.prefix_count
        >>> )
    """
    base = self.base_url[:]
    params = {}
    filtered_url = base + '?{}={}'.format(field_name, value)
    response = self.client.get(filtered_url, params)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # The filter must leave exactly `count` results.
    self.assertEqual(len(response.data['results']), count)
def _field_filter_terms_list(self, field_name, in_values, count):
    """Field filter terms.

    Example:

        http://localhost:8000/api/articles/?id=1&id=2&id=3
    """
    base = self.base_url[:]
    params = {}
    # Repeat the same parameter once per value: name=v1&name=v2&...
    query = '&'.join('{}={}'.format(field_name, item) for item in in_values)
    response = self.client.get(base + '?{}'.format(query), params)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data['results']), count)
def _test_pagination(self):
    """Test pagination."""
    self.authenticate()
    publishers_url = reverse('publisherdocument-list', kwargs={})
    books_url = reverse('bookdocument-list', kwargs={})
    params = {}

    # This page request on the books listing is expected to answer 404.
    missing_page = self.client.get(books_url + '?page=3&page_size=30', params)
    self.assertEqual(missing_page.status_code, status.HTTP_404_NOT_FOUND)

    # A limit/offset window on publishers is expected to answer 200 ...
    window = self.client.get(publishers_url + '?limit=5&offset=8', params)
    self.assertEqual(window.status_code, status.HTTP_200_OK)
    # ... and to contain exactly the requested five results.
    self.assertEqual(len(window.data['results']), 5)
def test_question_create_vote(self):
    """
    Assert POST /api/question/:id/vote creates a vote for a user
    """
    self.login()
    question = Question.objects.create(**fixtures['question'])
    endpoint = question.get_api_detail_url() + "vote/"

    self.assertEqual(question.votes.count(), 0)
    response = self.client.post(endpoint, {'vote': 1}, format='json')
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    expected = {'created': True, 'status': 'vote recorded', 'display': 'upvote'}
    # Key-by-key check replaces assertDictContainsSubset, which is deprecated
    # and removed in Python 3.12. Extra keys in the response are still allowed.
    for key, value in expected.items():
        self.assertIn(key, response.data)
        self.assertEqual(response.data[key], value)

    self.assertEqual(question.votes.count(), 1)
def test_question_update_vote(self):
    """
    Assert POST /api/question/:id/vote updates if already voted
    """
    self.login()
    question = Question.objects.create(**fixtures['question'])
    # Seed an existing upvote; the returned values are not needed here.
    Vote.objects.punch_ballot(content=question, user=self.user, vote=1)
    endpoint = question.get_api_detail_url() + "vote/"

    self.assertEqual(question.votes.count(), 1)
    response = self.client.post(endpoint, {'vote': -1}, format='json')
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    expected = {'created': False, 'status': 'vote recorded', 'display': 'downvote'}
    # Key-by-key check replaces assertDictContainsSubset, which is deprecated
    # and removed in Python 3.12. Extra keys in the response are still allowed.
    for key, value in expected.items():
        self.assertIn(key, response.data)
        self.assertEqual(response.data[key], value)

    # Still a single vote: the existing one was changed, not duplicated.
    self.assertEqual(question.votes.count(), 1)
def results(self, request, pk=None):
    """Return the result fields of a completed job, or 404 otherwise."""
    job = self.get_object()
    if job.status != AnalysisJob.Status.COMPLETE:
        return Response(None, status=status.HTTP_404_NOT_FOUND)

    # Keys are emitted in exactly this order.
    result_fields = (
        'census_block_count',
        'census_blocks_url',
        'connected_census_blocks_url',
        'destinations_urls',
        'tile_urls',
        'overall_scores',
        'overall_scores_url',
        'score_inputs_url',
        'ways_url',
    )
    payload = OrderedDict((field, getattr(job, field)) for field in result_fields)
    return Response(payload, status=status.HTTP_200_OK)
def set_password(self, request, pk=None):
    """Detail ``POST`` endpoint that changes a user's password.

    The user looked up by ``pk`` (a uuid) is re-authenticated with the
    submitted ``oldPassword`` before ``newPassword`` is applied.

    Args:
        request (rest_framework.request.Request)
        pk (str): uuid used to look the user up

    Returns:
        Response

    Raises:
        ValidationError: if re-authentication fails or no new password is given.
    """
    current_password = request.data.get('oldPassword')
    account_email = PFBUser.objects.get(uuid=pk).email
    user = authenticate(email=account_email, password=current_password)
    if not user:
        raise ValidationError({'detail': 'Unable to complete password change'})

    replacement = request.data.get('newPassword')
    if not replacement:
        raise ValidationError({'detail': 'Unable to complete password change'})

    user.set_password(replacement)
    user.save()
    return Response({'detail': 'Successfully changed password'},
                    status=status.HTTP_200_OK)
def test_login_and_logout(self):
    """ tests login and logout for a single user """
    # Start state: no tokens stored.
    self.assertEqual(MultiToken.objects.all().count(), 0)

    token = self.login_and_obtain_token('user1', 'secret1')

    # Login created exactly one token, and it belongs to user1.
    self.assertEqual(MultiToken.objects.all().count(), 1)
    owner = MultiToken.objects.filter(key=token).first().user
    self.assertEqual(owner.username, 'user1')

    logout_response = self.rest_do_logout(token)

    # The logout response carries the "logged out" status body.
    self.assertEqual(logout_response.status_code, status.HTTP_200_OK)
    self.assertContains(logout_response, "{\"status\":\"logged out\"}")

    # Logging out removed the token again.
    self.assertEqual(MultiToken.objects.all().count(), 0)
def post(self, request, *args, **kwargs):
    """Validate the submitted alias and send a callback token to it.

    Returns:
        404 if this view's alias type is not enabled in settings; otherwise
        200 with ``self.success_response`` when the token was sent, or 400
        with ``self.failure_response`` when sending failed.

    Raises:
        ValidationError: raised by the serializer on invalid input.
    """
    if self.alias_type.upper() not in api_settings.PASSWORDLESS_AUTH_TYPES:
        # Only allow auth types allowed in settings.
        return Response(status=status.HTTP_404_NOT_FOUND)

    serializer = self.serializer_class(data=request.data, context={'request': request})
    # With raise_exception=True invalid input raises, so the former `else`
    # branch returning serializer.error_messages was unreachable.
    serializer.is_valid(raise_exception=True)
    user = serializer.validated_data['user']

    # Create and send callback token
    success = TokenService.send_token(user, self.alias_type, **self.message_payload)

    # Respond With Success Or Failure of Sent
    if success:
        status_code = status.HTTP_200_OK
        response_detail = self.success_response
    else:
        status_code = status.HTTP_400_BAD_REQUEST
        response_detail = self.failure_response
    return Response({'detail': response_detail}, status=status_code)
def post(self, request, *args, **kwargs):
    """Exchange validated credentials for the user's auth token key.

    Returns 200 with ``{'token': <key>}`` when a token is available,
    otherwise 400 with a generic failure detail.
    """
    serializer = self.serializer_class(data=request.data)
    if not serializer.is_valid(raise_exception=True):
        log.error(
            "Couldn't log in unknown user. Errors on serializer: %s" % (serializer.error_messages, ))
    else:
        user = serializer.validated_data['user']
        token, created = Token.objects.get_or_create(user=user)

        if created:
            # Initially set an unusable password if a user is created through this.
            user.set_unusable_password()
            user.save()

        if token:
            # Return our key for consumption.
            return Response({'token': token.key}, status=status.HTTP_200_OK)

    return Response({'detail': 'Couldn\'t log you in. Try again later.'},
                    status=status.HTTP_400_BAD_REQUEST)
def test_email_auth_success(self):
    """Request a token by e-mail, then trade the callback token for an auth token."""
    first_response = self.client.post(self.url, {'email': self.email})
    self.assertEqual(first_response.status_code, status.HTTP_200_OK)

    # Token sent to alias
    issued = CallbackToken.objects.filter(user=self.user, is_active=True).first()

    # Try to auth with the callback token
    second_response = self.client.post(self.challenge_url, {'token': issued})
    self.assertEqual(second_response.status_code, status.HTTP_200_OK)

    # Verify Auth Token
    key = second_response.data['token']
    stored = Token.objects.filter(key=key).first()
    self.assertEqual(key, stored.key)
def test_mobile_signup_success(self):
    """Posting an unknown mobile number creates the user and an active callback token."""
    mobile = '+15551234567'
    data = {'mobile': mobile}
    lookup = {self.mobile_field_name: mobile}

    # Verify user doesn't exist yet
    self.assertIsNone(User.objects.filter(**lookup).first())

    # verify a new user was created with serializer
    response = self.client.post(self.url, data)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # get() raises if the signup did not create the user.
    user = User.objects.get(**lookup)
    self.assertIsNotNone(user)

    # Verify a token exists for the user
    self.assertTrue(CallbackToken.objects.filter(user=user, is_active=True).exists())
def test_mobile_auth_success(self):
    """Request a token by mobile, then trade the callback token for an auth token."""
    first_response = self.client.post(self.url, {'mobile': self.mobile})
    self.assertEqual(first_response.status_code, status.HTTP_200_OK)

    # Token sent to alias
    issued = CallbackToken.objects.filter(user=self.user, is_active=True).first()

    # Try to auth with the callback token
    second_response = self.client.post(self.challenge_url, {'token': issued})
    self.assertEqual(second_response.status_code, status.HTTP_200_OK)

    # Verify Auth Token
    key = second_response.data['token']
    stored = Token.objects.filter(key=key).first()
    self.assertEqual(key, stored.key)
def test_detail_todo_200(self):
    """GET a just-created list by uuid, authenticated as its author: 200."""
    detail_view = ObjectListViewSet.as_view()
    payload = {
        'name': 'other list',
        'priority': 3,
        'author': self.user_author,
    }
    created = self.create_todo(payload)
    list_uuid = created.data['uuid']

    request = self.factory.get('api/v1/list/%s/' % list_uuid)
    force_authenticate(request, user=self.user_author)
    response = detail_view(request, 'v1', list_uuid)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_update_todo_200(self):
    """PUT new data to a just-created list, authenticated as its author: 200."""
    detail_view = ObjectListViewSet.as_view()
    original = {
        'name': 'bad name list',
        'priority': 1,
        'author': self.user_author,
    }
    created = self.create_todo(original)
    list_uuid = created.data['uuid']

    changes = {
        'name': 'Good name list',
        'priority': 1,
    }
    request = self.factory.put('api/v1/list/%s/' % list_uuid, changes)
    force_authenticate(request, user=self.user_author)
    response = detail_view(request, 'v1', list_uuid)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_todos_author_200(self):
    """Create three lists for the author, then GET the author listing: 200."""
    listing_view = AuthorListViewSet.as_view()
    for list_name in ('By author 1', 'By author 2', 'By author 3'):
        self.create_todo({'name': list_name, 'priority': 1, 'author': self.user_author})

    # Make an authenticated request to the view...
    request = self.factory.get('api/v1/author_list/')
    force_authenticate(request, user=self.user_author)
    response = listing_view(request, 'v1')
    self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_list_all_item_200(self):
    """Create two items, then GET the all-items view as the author: 200."""
    first_item = {
        'author': self.user_author,
        'note': 'Note 1 for one activity',
        'priority': 2,
        'title': 'activity 1',
        'uuid_list': self.list,
        'assigned_to': self.user_author,
    }
    second_item = {
        'author': self.user_author,
        'note': 'Note 2 for',
        'priority': 3,
        'title': 'activity 2',
        'uuid_list': self.list,
        'assigned_to': self.user_author,
    }
    for item_payload in (first_item, second_item):
        self.create_item(item_payload)

    items_view = AllItemViewSet.as_view()
    request = self.factory.get('api/v1/item/')
    force_authenticate(request, user=self.user_author)
    response = items_view(request, 'v1')
    self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_detail_item_200(self):
    """ detail item"""
    payload = {
        'author': self.user_author,
        'note': 'Note 1 for one activity',
        'priority': 2,
        'title': 'activity 1',
        'uuid_list': self.list,
        'assigned_to': self.user_author,
    }
    created = self.create_item(payload)
    item_uuid = created.data['uuid']

    item_view = ObjectItemViewSet.as_view()
    request = self.factory.get('api/v1/item/%s/' % item_uuid)
    force_authenticate(request, user=self.user_author)
    response = item_view(request, 'v1', item_uuid)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_all_item_for_list_200(self):
    """Create two items in self.list, then GET that list's items as the author: 200."""
    first_item = {
        'author': self.user_author,
        'note': 'Note 1 for one activity',
        'priority': 2,
        'title': 'activity 1',
        'uuid_list': self.list,
        'assigned_to': self.user_author,
    }
    second_item = {
        'author': self.user_author,
        'note': 'Note 2 for',
        'priority': 3,
        'title': 'activity 2',
        'uuid_list': self.list,
        'assigned_to': self.user_author,
    }
    for item_payload in (first_item, second_item):
        self.create_item(item_payload)

    list_items_view = AllItemForListViewSet.as_view()
    request = self.factory.get('api/v1/item/list/%s' % self.list)
    force_authenticate(request, user=self.user_author)
    response = list_items_view(request, 'v1', self.list)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
def put(self, request, version, uuid):
    """
    Update an Item

    Validates the request body with ItemSerializer, copies note, title,
    priority, completed and (when present) due_date onto the item fetched
    by ``uuid``, and saves it.

    Returns 400 with the serializer errors on invalid input, otherwise an
    empty 200 response.
    """
    serializer = ItemSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    data = serializer.data
    # Named `item` rather than `object` so the builtin is not shadowed.
    item = self.get_object(uuid)
    item.note = data['note']
    item.title = data['title']
    item.priority = data['priority']
    try:
        item.due_date = data['due_date']
    except KeyError:
        # due_date is optional: keep the stored value when it is absent.
        pass
    item.completed = data['completed']
    item.save()
    return Response(status=status.HTTP_200_OK)
def test_POST_user_detail(self):
    """PATCH the user's first name with token credentials and check the response."""
    token, _created = Token.objects.get_or_create(user=self.user)
    api_client = APIClient()
    api_client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)

    original_name = self.user_data['first_name']
    user = User.objects.filter(first_name=original_name)[0]
    patch_body = {'first_name': 'test', 'email': user.email}
    # TODO: Figure out why we need to pass 'email' to make patch request
    detail_url = reverse('user-detail', kwargs={'pk': user.pk})
    response = api_client.patch(detail_url, data=patch_body)

    # `user` was loaded before the PATCH, so it still holds the old name.
    self.assertNotEqual(user.first_name, response.data['first_name'])
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(patch_body['first_name'], response.data['first_name'])
def assign(self, request, pk):
    """Assign the still-unassigned task ``pk`` to the requesting user.

    Returns 400 with an "Already taken" message when no task with this pk
    and ``assignee=None`` exists; otherwise 200 with a "Taken" message.
    """
    # NOTE(review): the nesting below was reconstructed from a collapsed
    # one-line source -- confirm which statements belong inside the
    # `if created:` / `transaction.atomic()` blocks.
    try:
        # Only tasks that nobody has taken yet are eligible.
        task = Task.objects.get(pk=pk, assignee=None)
    except Task.DoesNotExist:
        # NOTE(review): json.dumps() hands Response an already-encoded
        # string; presumably a plain dict was intended -- confirm clients.
        return Response(json.dumps({"message": "Already taken"}), status=status.HTTP_400_BAD_REQUEST)
    expense, created = TaskExpense.objects.get_or_create(
        task=task, executor_id=request.user.pk, money=task.money)
    if created:
        # The balance change and the assignee update are applied together.
        with transaction.atomic():
            # NOTE(review): the label literal looks like mis-encoded text
            # ("????"); verify against the original source encoding.
            request.user.update_balance(u"???? ??????", task.money, task=task)
            # Re-filter on assignee=None so an already-taken task is not overwritten.
            Task.objects.filter(pk=pk, assignee=None).update(assignee=request.user)
    return Response(json.dumps({'message': "Taken"}), status=status.HTTP_200_OK)
def test_enrollment(self, mock_refresh, mock_edx_enr, mock_index):  # pylint: disable=unused-argument
    """
    Test for happy path
    """
    def cached_enrollment():
        # First cached enrollment of this user for the course, or None.
        return CachedEnrollment.objects.filter(
            user=self.user, course_run__edx_course_key=self.course_id).first()

    assert cached_enrollment() is None

    enr_json = {'course_details': {'course_id': self.course_id}}
    mock_edx_enr.return_value = Enrollment(enr_json)

    resp = self.client.post(self.url, {'course_id': self.course_id}, format='json')

    assert resp.status_code == status.HTTP_200_OK
    assert mock_edx_enr.call_count == 1
    assert mock_edx_enr.call_args[0][1] == self.course_id
    assert resp.data == enr_json
    mock_index.delay.assert_called_once_with([self.user.id], check_if_changed=True)

    # The POST must have populated the enrollment cache.
    assert cached_enrollment() is not None
def test_skipped_financialaid_object_created(self):
    """
    Tests that the user can create a skipped FinancialAid if it doesn't already exist
    """
    def active_aid():
        # Fresh queryset of every FinancialAid that is not in RESET status.
        return FinancialAid.objects.exclude(status=FinancialAidStatus.RESET)

    assert FinancialAidAudit.objects.count() == 0
    assert active_aid().count() == 0

    self.make_http_request(self.client.patch, self.skip_url, status.HTTP_200_OK)

    assert FinancialAidAudit.objects.count() == 1
    assert active_aid().count() == 1
    created_aid = active_aid().first()
    assert created_aid.tier_program == get_no_discount_tier_program(self.program)
    assert created_aid.user == self.profile.user
    assert created_aid.status == FinancialAidStatus.SKIPPED
    assert is_near_now(created_aid.date_exchange_rate)
    assert created_aid.country_of_income == self.profile.country
    assert created_aid.country_of_residence == self.profile.country
def test_skipped_financialaid_object_updated(self, financial_aid_status):
    """
    Tests that an existing FinancialAid object is updated to have the status "skipped"
    """
    tier_75k = self.tier_programs["75k"]
    existing_aid = FinancialAidFactory.create(
        user=self.profile.user,
        tier_program=tier_75k,
        status=financial_aid_status,
    )
    assert FinancialAidAudit.objects.count() == 0

    self.make_http_request(self.client.patch, self.skip_url, status.HTTP_200_OK)

    remaining = FinancialAid.objects.exclude(status=FinancialAidStatus.RESET).count()
    assert remaining == 1
    existing_aid.refresh_from_db()
    assert existing_aid.tier_program == self.tier_programs["75k"]
    assert existing_aid.status == FinancialAidStatus.SKIPPED
    # Check logging
    assert FinancialAidAudit.objects.count() == 1
def test_provides_edx_link(self):
    """If the program doesn't have financial aid, the checkout API should provide a link to go to edX"""
    learner = UserFactory.create()
    self.client.force_login(learner)
    run = CourseRunFactory.create(
        course__program__live=True,
        course__program__financial_aid_availability=False,
    )

    resp = self.client.post(reverse('checkout'), {'course_id': run.edx_course_key}, format='json')

    assert resp.status_code == status.HTTP_200_OK
    expected_body = {
        'payload': {},
        'url': 'http://edx_base/course_modes/choose/{}/'.format(run.edx_course_key),
        'method': 'GET',
    }
    assert resp.json() == expected_body

    # We should only create Order objects for a Cybersource checkout
    assert Order.objects.count() == 0
def test_ignore_duplicate_cancel(self):
    """
    If the decision is CANCEL and we already have a duplicate failed order, don't change anything.
    """
    course_run, user = create_purchasable_course_run()
    failed_order = create_unfulfilled_order(course_run.edx_course_key, user)
    failed_order.status = Order.FAILED
    failed_order.save()

    cancel_payload = {
        'req_reference_number': make_reference_id(failed_order),
        'decision': 'CANCEL',
    }
    # Bypass the signature permission check for this request.
    signature_check = patch(
        'ecommerce.views.IsSignedByCyberSource.has_permission',
        return_value=True
    )
    with signature_check:
        resp = self.client.post(reverse('order-fulfillment'), data=cancel_payload)

    assert resp.status_code == status.HTTP_200_OK
    assert Order.objects.count() == 1
    assert Order.objects.get(id=failed_order.id).status == Order.FAILED
def test_should_patch_and_update(self):
    """
    Should let us update an existing automatic email!
    """
    automatic = AutomaticEmailFactory.create(staff_user=self.staff_user)
    detail_url = reverse('automatic_email_api-detail', kwargs={'email_id': automatic.id})
    # Flip `enabled` and change both text fields in a single PATCH.
    changes = {
        "enabled": not automatic.enabled,
        "email_subject": "new subject",
        "email_body": "new body",
        "id": automatic.id
    }

    response = self.client.patch(detail_url, changes, format='json')

    assert response.status_code == status.HTTP_200_OK
    automatic.refresh_from_db()
    assert automatic.email_subject == changes["email_subject"]
    assert automatic.email_body == changes["email_body"]
    assert automatic.enabled == changes["enabled"]
def test_send_course_team_email_view(self, mock_mailgun_client):
    """
    Test that course team emails are correctly sent through the view
    """
    self.client.force_login(self.staff_user)
    send_mock = mock_mailgun_client.send_course_team_email
    send_mock.return_value = Mock(
        spec=Response,
        status_code=status.HTTP_200_OK,
        json=mocked_json()
    )

    url = reverse(self.url_name, kwargs={'course_id': self.course.id})
    resp_post = self.client.post(url, data=self.request_data, format='json')

    assert resp_post.status_code == status.HTTP_200_OK
    assert send_mock.called
    # Only the keyword arguments of the send call are inspected.
    sent_kwargs = send_mock.call_args[1]
    assert sent_kwargs['user'] == self.staff_user
    assert sent_kwargs['course'] == self.course
    assert sent_kwargs['subject'] == self.request_data['email_subject']
    assert sent_kwargs['body'] == self.request_data['email_body']
    assert 'raise_for_status' not in sent_kwargs
def test_send_learner_email_view(self, mock_mailgun_client):
    """
    Test that learner emails are correctly sent through the view
    """
    self.client.force_login(self.staff_user)
    send_mock = mock_mailgun_client.send_individual_email
    send_mock.return_value = Mock(
        spec=Response,
        status_code=status.HTTP_200_OK,
        json=mocked_json()
    )

    url = reverse(self.url_name, kwargs={'student_id': self.recipient_user.profile.student_id})
    resp_post = self.client.post(url, data=self.request_data, format='json')

    assert resp_post.status_code == status.HTTP_200_OK
    assert send_mock.called
    # Only the keyword arguments of the send call are inspected.
    sent_kwargs = send_mock.call_args[1]
    assert sent_kwargs['subject'] == self.request_data['email_subject']
    assert sent_kwargs['body'] == self.request_data['email_body']
    assert sent_kwargs['recipient'] == self.recipient_user.email
    assert sent_kwargs['sender_address'] == self.staff_user.email
    assert sent_kwargs['sender_name'] == self.staff_user.profile.display_name
    assert 'raise_for_status' not in sent_kwargs
def test_send_financial_aid_view(self, mock_mailgun_client):
    """
    Test that the FinancialAidMailView will accept and return expected values
    """
    acting_user = self.staff_user_profile.user
    self.client.force_login(acting_user)
    send_mock = mock_mailgun_client.send_financial_aid_email
    send_mock.return_value = Mock(
        spec=Response,
        status_code=status.HTTP_200_OK,
        json=mocked_json()
    )

    resp_post = self.client.post(self.url, data=self.request_data, format='json')

    assert resp_post.status_code == status.HTTP_200_OK
    assert send_mock.called
    # Only the keyword arguments of the send call are inspected.
    sent_kwargs = send_mock.call_args[1]
    assert sent_kwargs['acting_user'] == self.staff_user_profile.user
    assert sent_kwargs['financial_aid'] == self.financial_aid
    assert sent_kwargs['subject'] == self.request_data['email_subject']
    assert sent_kwargs['body'] == self.request_data['email_body']
    assert 'raise_for_status' not in sent_kwargs