我们从Python开源项目中，提取了以下21个代码示例，用于说明如何使用django.http.response.HttpResponseNotFound()。
def task_file(request, contest_id, file_id):
    """Send a task's file to the requester as an attachment.

    Responds with 404 when the contest is hidden from the list and the
    requester is not staff, or when the file's task is not part of the
    contest.  Responds with 403 when, for a non-staff requester, the
    contest has not started or the task is not open for the requester's
    participant, and when the file is bound to a participant whose id
    differs from the requesting user's id.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    if not (contest.is_visible_in_list or request.user.is_staff):
        return HttpResponseNotFound()

    attachment = get_object_or_404(tasks_models.TaskFile, pk=file_id)
    if not contest.has_task(attachment.task):
        return HttpResponseNotFound()

    if not (contest.is_started() or request.user.is_staff):
        return HttpResponseForbidden('Contest is not started')

    participant = contest.get_participant_for_user(request.user)
    task_is_open = is_task_open(contest, attachment.task, participant)
    if not (task_is_open or request.user.is_staff):
        return HttpResponseForbidden('Task is closed')

    # NOTE(review): a participant id is compared with a *user* id here;
    # confirm that this is intended (e.g. participants are users).
    owner = attachment.participant
    if owner is not None and owner.id != request.user.id:
        return HttpResponseForbidden()

    return respond_as_attachment(
        request,
        attachment.get_path_abspath(),
        attachment.name,
        attachment.content_type,
    )
def contest(request, contest_id):
    """Render the contest page with its approved participants and news.

    A contest that is not visible in the list is served only to an
    authenticated user who is staff or participates in it; everybody else
    gets a 404.  Non-staff users only see published news.
    """
    current = get_object_or_404(models.Contest, pk=contest_id)

    if not current.is_visible_in_list:
        user = request.user
        if not user.is_authenticated():
            return HttpResponseNotFound()
        if not (user.is_staff or current.is_user_participating(user)):
            return HttpResponseNotFound()

    approved_participants = current.participants.filter(is_approved=True)

    # Newest news first; unpublished items are visible to staff only.
    news_items = current.news.order_by('-publish_time')
    if not request.user.is_staff:
        news_items = news_items.filter(is_published=True)

    context = {
        'current_contest': current,
        'contest': current,
        'news': news_items,
        'participants': approved_participants,
    }
    return render(request, 'contests/contest.html', context)
def task_opens(request, contest_id, task_id):
    """Render, for one task, which contest participants have it open.

    Responds with 404 when the task does not belong to the contest.
    Participants are listed sorted by ``name`` and each one is annotated
    with an ``is_task_open`` attribute.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    task = get_object_or_404(tasks_models.Task, pk=task_id)
    if not contest.has_task(task):
        return HttpResponseNotFound()

    members = list(contest.participants.all())
    members.sort(key=lambda member: member.name)
    for member in members:
        member.is_task_open = is_task_open(contest, task, member)

    manual_opening_allowed = is_manual_task_opening_available_in_contest(contest)

    context = {
        'current_contest': contest,
        'contest': contest,
        'task': task,
        'participants': members,
        'is_manual_task_opening_available': manual_opening_allowed,
    }
    return render(request, 'contests/task_opens.html', context)
def remove_member(request, team_id, user_id):
    """Remove a user from a team and redirect back to the team.

    Only staff and the team's captain may do this; anyone else gets a 404.
    The captain cannot be removed, and a user who is not a member is left
    untouched; both cases produce a warning message instead.
    """
    team = get_object_or_404(models.Team, pk=team_id)

    # Only staff and team's captain can edit team
    if not request.user.is_staff and team.captain != request.user:
        return HttpResponseNotFound()

    user = get_object_or_404(users_models.User, pk=user_id)

    with transaction.atomic():
        if user == team.captain:
            problem = 'Captain can\'t leave the team'
        elif user not in team.members.all():
            problem = 'User %s not in the team' % (user.username,)
        else:
            problem = None
            team.members.remove(user)
            team.save()

    if problem is not None:
        messages.warning(request, problem)
    else:
        messages.success(
            request,
            'User %s has left the team %s' % (user.username, team.name),
        )
    return redirect(team)
def handler404(request, template_name='404.html'):
    """Render *template_name* with an empty context as a 404 response."""
    # NOTE(review): newer Django versions expect a plain dict rather than a
    # ``Context`` when rendering a template obtained from ``get_template``;
    # confirm against the Django version this project targets.
    page = get_template(template_name).render(Context({}))
    return HttpResponseNotFound(page)
def set_crash_points_limit(request, pk):
    """Store the submitted crash points limit on one of the user's servers.

    Responds with 404 when the request carries no POST data or when no
    server with this ``pk`` is owned by the requesting user.  On success a
    confirmation message is queued and the user is redirected to
    ``account:home``.
    """
    if not request.POST:
        return HttpResponseNotFound()

    server = get_object_or_404(Server, pk=pk, owner=request.user)

    new_limit = request.POST.get('back_crash_points_limit')
    server.back_crash_points_limit = new_limit
    server.save()

    messages.add_message(
        request,
        messages.SUCCESS,
        "Crash points limit set to {}.".format(new_limit),
        extra_tags='success',
    )
    return redirect('account:home')
def set_custom_motd(request, pk):
    """Store the submitted welcome message on one of the user's servers.

    Responds with 404 when the request carries no POST data or when no
    server with this ``pk`` is owned by the requesting user.  On success a
    confirmation message is queued and the user is redirected to
    ``account:home``.
    """
    if not request.POST:
        return HttpResponseNotFound()

    server = get_object_or_404(Server, pk=pk, owner=request.user)

    motd = request.POST.get('back_custom_motd')
    server.back_custom_motd = motd
    server.save()

    messages.add_message(
        request,
        messages.SUCCESS,
        "Welcome message changed.",
        extra_tags='success',
    )
    return redirect('account:home')
def add_category(request, contest_id):
    """Create a category and attach it to a contest grouped by categories.

    Responds with 404 when the contest does not group its tasks by
    categories.  A valid POST saves the category, links it to the contest
    and redirects to the contest's task list; otherwise the (possibly
    invalid) form is rendered.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    if contest.tasks_grouping != models.TasksGroping.ByCategories:
        return HttpResponseNotFound()

    if request.method != 'POST':
        form = forms.CategoryForm()
    else:
        form = forms.CategoryForm(data=request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            category = categories_models.Category(
                name=cleaned['name'],
                description=cleaned['description'],
            )
            # Save the category and link it to the contest in one transaction.
            with transaction.atomic():
                category.save()
                contest.categories_list.categories.add(category)
                contest.save()
            tasks_url = urlresolvers.reverse('contests:tasks', args=[contest.id])
            return redirect(tasks_url)

    context = {
        'current_contest': contest,
        'contest': contest,
        'form': form,
    }
    return render(request, 'contests/create_category.html', context)
def edit_category(request, contest_id, category_id):
    """Edit the name and description of a category belonging to a contest.

    Responds with 404 when the contest does not group its tasks by
    categories or when the category is not in the contest's category list.
    A valid POST saves a new ``Category`` instance under the existing
    category's id and redirects to the contest's task list; otherwise the
    form is rendered.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    if contest.tasks_grouping != models.TasksGroping.ByCategories:
        return HttpResponseNotFound()

    category = get_object_or_404(categories_models.Category, pk=category_id)
    belongs_to_contest = contest.categories_list.categories.filter(
        id=category.id
    ).exists()
    if not belongs_to_contest:
        return HttpResponseNotFound()

    if request.method != 'POST':
        form = forms.CategoryForm(initial=category.__dict__)
    else:
        form = forms.CategoryForm(data=request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            replacement = categories_models.Category(
                name=cleaned['name'],
                description=cleaned['description'],
            )
            # Reusing the id makes save() target the existing category's row.
            replacement.id = category.id
            replacement.save()
            tasks_url = urlresolvers.reverse('contests:tasks', args=[contest.id])
            return redirect(tasks_url)

    context = {
        'current_contest': contest,
        'contest': contest,
        'category': category,
        'form': form,
    }
    return render(request, "contests/edit_category.html", context)
def delete_category(request, contest_id, category_id):
    """Detach a category from a contest's category list.

    Responds with 404 when the contest does not group its tasks by
    categories or when the category is not in the contest's category list.
    Afterwards redirects to the contest's task list.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    if contest.tasks_grouping != models.TasksGroping.ByCategories:
        return HttpResponseNotFound()

    category = get_object_or_404(categories_models.Category, pk=category_id)

    category_list = contest.categories_list
    if not category_list.categories.filter(id=category.id).exists():
        return HttpResponseNotFound()

    # Only the link is removed here; the category object itself is not deleted.
    category_list.categories.remove(category)
    category_list.save()

    tasks_url = urlresolvers.reverse('contests:tasks', args=[contest.id])
    return redirect(tasks_url)
def change_participant_status(request, contest_id, participant_id):
    """Set one boolean status flag on a contest participant.

    Reads the flag name from ``POST['parameter']`` and sets it to ``True``
    exactly when ``POST['value']`` equals ``'true'``.  Responds with 404
    when the flag name is not one of the editable flags.  Afterwards
    redirects to the contest's participant list.
    """
    contest = get_object_or_404(models.Contest, pk=contest_id)
    participant = get_object_or_404(
        models.AbstractParticipant,
        pk=participant_id,
        contest_id=contest_id,
    )

    flag_name = request.POST['parameter']
    flag_value = request.POST['value'] == 'true'

    # Whitelist of attributes that may be changed through this view.
    editable_flags = ('is_approved', 'is_disqualified', 'is_visible_in_scoreboard')
    if flag_name not in editable_flags:
        return HttpResponseNotFound()

    setattr(participant, flag_name, flag_value)
    participant.save()

    participants_url = urlresolvers.reverse('contests:participants', args=[contest.id])
    return redirect(participants_url)
def add_task_to_category(request, contest_id, category_id):
    """Delegate to ``add_task_to_contest_view`` for one contest category.

    The category is looked up together with its link to the contest.
    Responds with 404 when the contest does not group its tasks by
    categories.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    category = get_object_or_404(
        categories_models.Category,
        pk=category_id,
        contestcategories__contest_id=contest_id,
    )

    grouped_by_categories = (
        contest.tasks_grouping == models.TasksGroping.ByCategories
    )
    if grouped_by_categories:
        return add_task_to_contest_view(request, contest, category)
    return HttpResponseNotFound()
def add_task(request, contest_id):
    """Delegate to ``add_task_to_contest_view`` for a one-by-one contest.

    Responds with 404 when the contest's tasks are not grouped one by one.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)

    grouped_one_by_one = contest.tasks_grouping == models.TasksGroping.OneByOne
    if grouped_one_by_one:
        return add_task_to_contest_view(request, contest)
    return HttpResponseNotFound()
def delete_task(request, contest_id, task_id):
    """Delete a task that belongs to the given contest.

    Responds with 404 when the contest does not have the task.  After the
    deletion redirects to the contest's task list.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    doomed_task = get_object_or_404(tasks_models.Task, pk=task_id)

    if contest.has_task(doomed_task):
        doomed_task.delete()
        tasks_url = urlresolvers.reverse('contests:tasks', args=[contest.id])
        return redirect(tasks_url)

    return HttpResponseNotFound()
def edit_news(request, contest_id, news_id):
    """Edit a news item of a contest.

    Responds with 404 when the news item does not belong to the contest.
    A valid POST saves a new ``News`` instance under the existing item's
    id, keeping its author and timestamps, and redirects to it; otherwise
    the form is rendered.
    """
    contest = get_object_or_404(models.Contest, pk=contest_id)
    news = get_object_or_404(models.News, pk=news_id)
    if news.contest_id != contest.id:
        return HttpResponseNotFound()

    if request.method != 'POST':
        form = forms.NewsForm(initial=news.__dict__)
    else:
        form = forms.NewsForm(data=request.POST)
        if form.is_valid():
            updated = models.News(
                author=news.author,
                contest=contest,
                created_at=news.created_at,
                updated_at=news.updated_at,
                **form.cleaned_data
            )
            # Reusing the id makes save() target the existing news row.
            updated.id = news.id
            updated.save()

            messages.success(request, 'News saved')
            return redirect(updated)

    context = {
        'current_contest': contest,
        'contest': contest,
        'news': news,
        'form': form,
    }
    return render(request, 'contests/edit_news.html', context)
def open_task(request, contest_id, task_id, participant_id):
    """Toggle the manual opening of a task for one participant.

    Responds with 404 when the task is not part of the contest or the
    participant belongs to another contest.  When manual opening is not
    available for the contest, an error message is queued and the user is
    redirected to the task's opens page.  Otherwise the manual opening is
    removed if present, or created if absent, and a JSON acknowledgement
    is returned.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    task = get_object_or_404(tasks_models.Task, pk=task_id)
    if not contest.has_task(task):
        return HttpResponseNotFound()

    if not is_manual_task_opening_available_in_contest(contest):
        messages.error(request, 'Manual task opening is forbidden for this contest')
        opens_url = urlresolvers.reverse(
            'contests:task_opens', args=[contest.id, task.id]
        )
        return redirect(opens_url)

    participant = get_object_or_404(models.AbstractParticipant, pk=participant_id)
    if participant.contest_id != contest.id:
        return HttpResponseNotFound()

    opening_key = dict(contest=contest, task=task, participant=participant)
    manual_openings = tasks_models.ManualOpenedTask.objects.filter(**opening_key)

    # Toggle opens state: close if it's open, open otherwise
    if not manual_openings.exists():
        tasks_models.ManualOpenedTask(**opening_key).save()
        messages.success(request, 'Task is opened for %s' % participant.name)
    else:
        manual_openings.delete()
        # The task may still be open for a reason other than a manual opening.
        if is_task_open(contest, task, participant):
            messages.warning(
                request,
                'Task is opened for this participant not manually, you can\'t close it',
            )
        else:
            messages.success(request, 'Task is closed for %s' % participant.name)

    return JsonResponse({'done': 'ok'})
def _attachment_filename_param(user_agent, original_filename):
    """Return the Content-Disposition ``filename`` parameter for a browser."""
    # To inspect details for the below code, see http://greenbytes.de/tech/tc2231/
    if 'WebKit' in user_agent:
        # Safari 3.0 and Chrome 2.0 accepts UTF-8 encoded string directly.
        return 'filename=%s' % original_filename
    if 'MSIE' in user_agent:
        # IE does not support internationalized filename at all.
        # It can only recognize internationalized URL, so we do the trick via routing rules.
        return ''
    # For others like Firefox, we follow RFC2231 (encoding extension in HTTP headers).
    return 'filename*=UTF-8\'\'%s' % urllib.parse.quote(original_filename)


def respond_as_attachment(request, file_path, original_filename, content_type=None, encoding=None):
    """Return the file at *file_path* as a downloadable attachment.

    :param request: incoming request; its ``User-Agent`` header (if any)
        selects how the file name is written into ``Content-Disposition``.
    :param file_path: path of the file to send.
    :param original_filename: file name presented to the client; also used
        to guess the content type when none is given.
    :param content_type: value for ``Content-Type``.  When ``None`` it is
        guessed from *original_filename* (which also replaces *encoding*),
        falling back to ``application/octet-stream``.
    :param encoding: value for ``Content-Encoding``; omitted when ``None``.
    :return: ``HttpResponseNotFound`` when the path does not exist,
        otherwise an ``HttpResponse`` carrying the whole file content.
    """
    if not os.path.exists(file_path):
        return HttpResponseNotFound()

    with open(file_path, 'rb') as fp:
        payload = fp.read()
    response = HttpResponse(payload)

    if content_type is None:
        content_type, encoding = mimetypes.guess_type(original_filename)
    if content_type is None:
        content_type = 'application/octet-stream'

    response['Content-Type'] = content_type
    # Use the size of what was actually read instead of a second os.stat():
    # the file could change or disappear between the read and the stat.
    response['Content-Length'] = str(len(payload))
    if encoding is not None:
        response['Content-Encoding'] = encoding

    # The User-Agent header is optional; a missing one must not raise
    # KeyError, so it is treated like an unknown browser.
    user_agent = request.META.get('HTTP_USER_AGENT', '')
    filename_header = _attachment_filename_param(user_agent, original_filename)

    response['Content-Disposition'] = 'attachment; ' + filename_header
    return response
def edit(request, team_id):
    """Rename a team.

    Responds with 404 when the requester is neither staff nor the team's
    captain, or when ``settings.DRAPO_ONLY_STAFF_CAN_EDIT_TEAM_NAME`` is
    set and the requester is not staff.  A valid POST renames the team
    unless *another* team already uses that name, in which case the error
    is attached to the form.  On success redirects to the team; otherwise
    the form is rendered.
    """
    team = get_object_or_404(models.Team, pk=team_id)

    # Only staff and team's captain can edit team
    if not request.user.is_staff and team.captain != request.user:
        return HttpResponseNotFound()

    if settings.DRAPO_ONLY_STAFF_CAN_EDIT_TEAM_NAME and not request.user.is_staff:
        return HttpResponseNotFound()

    if request.method == 'POST':
        form = forms.TeamForm(data=request.POST)
        if form.is_valid():
            team_name = form.cleaned_data['name']
            with transaction.atomic():
                # Exclude the team being edited: re-submitting its current
                # name must not be reported as a clash with itself.
                name_taken = (
                    models.Team.objects
                    .filter(name=team_name)
                    .exclude(pk=team.pk)
                    .exists()
                )
                if name_taken:
                    form.add_error('name', 'Team with same name already exists')
                else:
                    team.name = team_name
                    team.save()
                    messages.success(request, 'Team %s saved' % team.name)
                    return redirect(team)
    else:
        form = forms.TeamForm(initial={'name': team.name})

    return render(request, 'teams/edit.html', {
        'team': team,
        'form': form,
    })
def tasks(request, contest_id):
    """Render the task list of a contest for the requesting user.

    Responds with 404 when the contest is hidden from the list and the
    requester is not staff.  When the contest has not started, a non-staff
    requester gets an error message and a redirect to the contest.  The
    template depends on whether tasks are grouped one by one or by
    categories.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    if not (contest.is_visible_in_list or request.user.is_staff):
        return HttpResponseNotFound()

    if not (contest.is_started() or request.user.is_staff):
        messages.error(request, '%s is not started yet' % contest.name)
        return redirect(contest)

    if request.user.is_authenticated():
        participant = contest.get_participant_for_user(request.user)
        solved_tasks_ids = contest.get_tasks_solved_by_participant(participant)
    else:
        participant = None
        solved_tasks_ids = {}

    # Iterate all policies, collect opened tasks
    opened_tasks_ids = set()
    for policy in contest.tasks_opening_policies.all():
        opened_tasks_ids.update(policy.get_open_tasks(participant))

    context = {
        'current_contest': contest,
        'contest': contest,
        'solved_tasks_ids': solved_tasks_ids,
        'opened_tasks_ids': opened_tasks_ids,
    }

    grouping = contest.tasks_grouping
    if grouping == models.TasksGroping.OneByOne:
        context['tasks'] = contest.tasks
        return render(request, 'contests/tasks_one_by_one.html', context)

    if grouping == models.TasksGroping.ByCategories:
        context['categories'] = contest.categories
        return render(request, 'contests/tasks_by_categories.html', context)

    # NOTE(review): any other grouping falls through and returns None, which
    # is not a valid view response; confirm no other grouping can occur.
def _redirect_after_join(request, team):
    """Redirect to ``POST['next']`` if it has no ``'//'``, else to the team page."""
    next_url = request.POST.get('next')
    # Rejecting '//' keeps the redirect from pointing at another host.
    if next_url is not None and '//' not in next_url:
        return redirect(next_url)
    return redirect(urlresolvers.reverse('teams:team', args=[team.id]))


def join(request, invite_hash=None):
    """Join the team identified by an invite hash.

    The hash comes from ``POST['invite_hash']`` when posted, otherwise
    from the URL argument.

    * GET with an unknown hash responds with 404.
    * POST with a known hash adds the requesting user to the team and
      redirects (see ``_redirect_after_join``).  A user who is already a
      member, or who is in another team while
      ``settings.DRAPO_USER_CAN_BE_ONLY_IN_ONE_TEAM`` is set (staff
      excepted), is redirected with a message and not added.  A team over
      ``settings.DRAPO_TEAM_SIZE_LIMIT`` is not joined; the join page is
      rendered with an error message.
    * Every other case renders the join page, with ``error_message`` set
      when the team was not found.
    """
    if request.method == 'POST' and 'invite_hash' in request.POST:
        invite_hash = request.POST['invite_hash']

    team = models.Team.objects.filter(invite_hash=invite_hash).first()

    error_message = None
    if team is None:
        error_message = 'Team not found. Return back and try one more time'

    if request.method == 'POST' and team is not None:
        member_count = team.members.count()
        # NOTE(review): with ``>`` a team that is exactly at the limit still
        # accepts one more member; confirm whether ``>=`` was intended.
        if member_count > settings.DRAPO_TEAM_SIZE_LIMIT:
            # Fixed: ``request`` was missing from this call, so it raised
            # TypeError instead of queuing the message.
            messages.error(
                request,
                'You can\'t join the team because it\'s very big (%d members)' % member_count,
            )
        else:
            with transaction.atomic():
                if request.user in team.members.all():
                    messages.warning(request, 'You are already in team ' + team.name)
                    return _redirect_after_join(request, team)

                if settings.DRAPO_USER_CAN_BE_ONLY_IN_ONE_TEAM and \
                        models.Team.objects.filter(members=request.user).exists() and \
                        not request.user.is_staff:
                    messages.error(
                        request,
                        'You can\'t join team %s while you are a member of another team' % team.name,
                    )
                    return _redirect_after_join(request, team)

                team.members.add(request.user)
                team.save()

            messages.success(request, 'You joined team ' + team.name + '!')
            return _redirect_after_join(request, team)
    elif request.method == 'GET':
        if team is None:
            return HttpResponseNotFound()

    return render(request, 'teams/join.html', {
        'error_message': error_message,
        'team': team,
    })
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
    """Exchange a refresh token for a new access/refresh token pair.

    Responses:

    * 400 when the POST has no ``refresh_token``, its ``grant_type`` is not
      ``refresh_token``, or the parsed request has no scope.
    * 404 when no registry matches the requested service, or the repository
      is unknown and ``settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS`` is off.
    * 401 when no user can be derived from the refresh token.
    * 403 when neither the registry nor the repository grants the user the
      requested permissions.
    * 200 with a JSON body holding ``access_token``, ``scope``,
      ``expires_in`` and ``refresh_token`` otherwise.
    """
    # POST received:
    # <QueryDict: {
    #     'client_id': ['docker'],
    #     'refresh_token': ['boink'],
    #     'service': ['registry.maurusnet.test'],
    #     'scope': ['repository:dev/destalinator:push,pull'],
    #     'grant_type': ['refresh_token']}>
    # ``.get`` so that a missing grant_type is answered with 400 rather than
    # raising on the key lookup.
    if "refresh_token" not in request.POST or request.POST.get("grant_type") != "refresh_token":
        return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")

    tr = _tkr_parse(request.POST)
    if not tr.scope:
        # Fixed: the scope used to be passed as a second positional argument
        # of the response instead of being formatted into the message.
        return HttpResponseBadRequest(
            "Can't issue access token without valid scope (scope=%s)" % (tr.scope,)
        )
    tp = TokenPermissions.parse_scope(tr.scope)

    try:
        client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
    except DockerRegistry.DoesNotExist:
        return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))

    user = self._user_from_refresh_token(
        request.POST["refresh_token"],
        client.public_key_pem(),
        expected_issuer=request.get_host(),
        expected_audience=tr.service,
    )
    if not user:
        return HttpResponse("Unauthorized", status=401)

    try:
        drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
    except DockerRepo.DoesNotExist:
        if not settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
            return HttpResponseNotFound("No such repo '%s'" % tp.path)
        # Unconfigured repos are allowed: use an unsaved, fully open repo.
        drepo = DockerRepo()
        drepo.name = tp.path
        drepo.registry = client
        drepo.unauthenticated_read = True
        drepo.unauthenticated_write = True

    if not (drepo.registry.has_access(user, tp) or drepo.has_access(user, tp)):
        return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))

    rightnow = datetime.datetime.now(tz=pytz.UTC)
    return HttpResponse(content=json.dumps({
        "access_token": self._create_jwt(
            self._make_access_token(request, tr, rightnow, tp, user),
            client.private_key_pem(),
        ),
        "scope": tr.scope,
        "expires_in": 119,
        "refresh_token": self._create_jwt(
            self._make_refresh_token(request, tr, rightnow, user),
            client.private_key_pem(),
        )
    }), status=200, content_type="application/json")