我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用django.utils.datastructures.MultiValueDictKeyError()。
def put(self, request): try: zip_file = request.FILES['file'] archive = zipfile.ZipFile(zip_file) except (MultiValueDictKeyError, zipfile.BadZipfile): raise NotZIPFileError try: csv_name = [item for item in archive.namelist() if item.endswith('csv')][0] except IndexError: raise NoCSVInArchiveFoundError with archive.open(csv_name) as zip_csv_file: # Convert zipfile handle to Django file handle csv_file = File(zip_csv_file) dataset = Dataset.objects.create( name=zip_csv_file.name, content=csv_file, uploaded_by=request.user) # Start tasks for feature calculation initialize_from_dataset.delay(dataset_id=dataset.id) serializer = DatasetSerializer(instance=dataset) return Response(serializer.data)
def get(self, request): try: window = int(request.query_params[LOOKBACK_PARAM_NAME]) except MultiValueDictKeyError: # MultiValueDictKeyError happens when a key doesn't exist window = 1 except ValueError: # ValueError if something entered for a window that couldn't be interpreted return Response(status=400) user = request.user sleep_activities = SleepLog.objects.filter(user=user) builder = SleepActivityDataframeBuilder(sleep_activities) sleep_aggregate = builder.get_sleep_history_series() sleep_average = sleep_aggregate.rolling(window=window, min_periods=1).mean() result = sleep_average.to_json(date_format='iso') result = json.loads(result) return Response(data=result)
def post(self, request): data = request.data user = request.user try: initial_data = { 'start_date': data['start_date'], 'end_date': data['end_date'], } except (MultiValueDictKeyError, KeyError) as exc: return Response('Missing POST parameters {}'.format(exc), status=400) serializer = FitbitAPIRequestSerializer(data=initial_data) serializer.is_valid(raise_exception=True) # send the job off to celery so it's an async task import_user_fitbit_history_via_api.delay(user=user, **serializer.validated_data) return Response(status=202)
def get(self, request, *args, **kwargs): try: key = request.query_params['key'] user = MomoUser.objects.get(hash_username=key) user.is_active = True user.save() except MomoUser.DoesNotExist: raise Http404 except MultipleObjectsReturned: raise ValidationError(detail="?? ?? url? ?? ?????.") except MultiValueDictKeyError: raise ValidationError(detail="?? ?? url? ?? ?????.") return Response({"user_pk": user.pk, "detail": "user? ????????.", "url": reverse('index', request=request)}, status=status.HTTP_200_OK, template_name='member/activate.html')
def delete(request): if request.method != 'POST': return HttpResponseRedirect('/crm/search/') try: person = Person.objects.get(pk=request.POST['person_id']) except (Person.DoesNotExist, MultiValueDictKeyError): raise Http404('Person has already been deleted') # copy contact data to DeletedConcact for contact in person.contact_set.all(): del_contact = DeletedContact( original_pk=contact.pk, original_person_id=contact.author.pk, event=contact.event, date_of_contact=contact.date_of_contact, notes=contact.notes, method=contact.notes, ) del_contact.save() add_change_record(person, 'delete', request.user) person.delete() return HttpResponseRedirect('/crm/search/')
def save_person_details(request): """ ajax call to save person details and update that section of page """ updated_details_success = None person = None person_details_form = PersonDetailsForm() if request.method == 'POST': try: person = Person.objects.get(pk=request.POST['person_id']) add_change_record(person, 'update', request.user) add_to_recent_contacts(request, request.POST['person_id']) person_details_form = PersonDetailsForm(request.POST, instance=person) if person_details_form.is_valid(): person_details_form.save() updated_details_success = True except (Person.DoesNotExist, MultiValueDictKeyError): raise Http404('Sorry, this person seems to have been deleted ' \ 'from the database') context = { 'person': person, 'person_details_form': person_details_form, 'updated_details_success': updated_details_success } return render(request, 'crm/detail_addins/person_detail_edit_panel.html', context)
def filter_venue(request): """ Dynamically filter list of venues based on city """ venue_form = VenueForm() try: city_partial = request.GET['city_partial'] venue_list = Venue.objects.filter( city__icontains=city_partial).order_by('city', 'name') except MultiValueDictKeyError: city_partial = None venue_list = Venue.objects.all().order_by('city', 'name') context = { 'venue_form': venue_form, 'venue_list': venue_list, } return render(request, 'registration/addins/venue_sidebar.html', context)
def get(self, request): try: crawler_id = request.GET['crawler_id'] except MultiValueDictKeyError: return ErrorResponse.error_response(-100, 'Invalid crawler_id') subscription_list = Subscription.objects.filter(crawler_id=crawler_id) total = [] for subscription in subscription_list: subscriber = subscription.subscriber push_token_list = PushToken.objects.filter(owner=subscriber) for push_token in push_token_list: arr = {'email': push_token.owner.email, 'push_token': push_token.push_token} total.append(arr) return_data = {'ErrorCode': 0, 'data': total, 'message': 'successfully return list'} return Response(return_data)
def post(self, request, format=None): """Search is a POST.""" try: query_term = request.data['term'] except MultiValueDictKeyError: raise ValidationError(detail=['Parameter "term" is mandatory.']) offset = request.data.get('offset', 0) limit = request.data.get('limit', 100) doc_type = request.data.get('doc_type') results = ESConnector().search_by_term( term=query_term, doc_type=doc_type, offset=int(offset), limit=int(limit) ) formatted_results = format_es_results(results) return Response(data=formatted_results)
def sso_entry(request): """ Entrypoint view for SSO. Gathers the parameters from the HTTP request, stores them in the session and redirects the requester to the login_process view. """ passed_data = request.POST if request.method == 'POST' else request.GET try: request.session['SAMLRequest'] = passed_data['SAMLRequest'] except (KeyError, MultiValueDictKeyError) as e: return HttpResponseBadRequest(e) request.session['RelayState'] = passed_data.get('RelayState', '') # TODO check how the redirect saml way works. Taken from example idp in pysaml2. if "SigAlg" in passed_data and "Signature" in passed_data: request.session['SigAlg'] = passed_data['SigAlg'] request.session['Signature'] = passed_data['Signature'] return HttpResponseRedirect(reverse('saml_login_process')) # TODO Add http redirect logic based on https://github.com/rohe/pysaml2/blob/master/example/idp2_repoze/idp.py#L327
def post(self, request): user = request.user data = request.data try: initial_data = { 'rescuetime_api_key': data['rescuetime_api_key'], 'start_date': data['start_date'], 'end_date': data['end_date'], } except (MultiValueDictKeyError, KeyError) as exc: return Response('Missing POST parameters {}'.format(exc), status=400) serializer = RescueTimeAPIRequestSerializer(data=initial_data) serializer.is_valid(raise_exception=True) # send the job off to celery so it's an async task import_user_rescuetime_history_via_api.delay(user=user, **serializer.validated_data) return Response(status=202)
def host_mgr(request): try: group_id = request.GET['groupid'] except MultiValueDictKeyError as e: host_list = request.user.bind_hosts.select_related() group_name = '???' else: host_list = models.BindHostToUser.objects.filter(host_groups__id=group_id) group_name = models.HostGroup.objects.get(id=group_id) # print(locals()) return render(request, 'hosts/host_mgr.html', locals())
def submit_task(request): try: task_type = request.POST['task_type'] except MultiValueDictKeyError as e: return HttpResponse('chu cuo le, ni mei you chuan task_type') else: task_handler = task.Task(request) task_id = task_handler.call(task_type) # backend func() returns return HttpResponse(json.dumps(task_id))
def blog(request, current_paging_number, category): if current_paging_number == '': current_paging_number = '1' try: page = Paginator(Gallery.objects.filter(Q(detail__contains=request.GET['q']) | Q(title__contains=request.GET['q'])). order_by('-created_date'), 5) except MultiValueDictKeyError: if category == '': page = Paginator(Gallery.objects.order_by('-created_date'), 5) else: page = Paginator(Gallery.objects.filter(categorys=category).order_by('-created_date'), 5) try: # ??? ??? ??? ? page_count = 5 # ?? ??? ? page_max_count = page._get_num_pages() get_gallery = page get_image = Image.objects.filter(thumbnail='True') get_category = Categorys.objects.all() if (page_max_count-1) // page_count == (int(current_paging_number)-1) // page_count: page_loop = ((page_max_count-1) % page_count)+1 else: page_loop = page_count return render(request, 'cluster/blog.html', { 'blog_list': get_gallery.page(current_paging_number), 'image_list': get_image, 'paging_number': current_paging_number, 'page_loop': page_loop, 'page_size': page_count, 'page_max_count': page_max_count, 'category_list': get_category, 'category': category, }) except: return redirect('/blog/')
def add_contact_history(request): person = None new_contact_form = NewContactForm() if request.method == 'POST': new_contact_form = NewContactForm(request.POST) try: person = Person.objects.get(pk=request.POST['person_id']) add_to_recent_contacts(request, request.POST['person_id']) if new_contact_form.is_valid(): event = Event.objects.get(pk=request.POST['event']) new_contact = Contact( person=person, event=event, date_of_contact=timezone.now(), notes=new_contact_form.cleaned_data['notes'], method=new_contact_form.cleaned_data['method'], author=request.user, ) new_contact.save() person.date_modified = timezone.now() person.modified_by = request.user person.save() except (Person.DoesNotExist, MultiValueDictKeyError): raise Http404('Sorry, this person seems to have been deleted ' \ 'from the database.') context = { 'person': person, 'new_contact_form': new_contact_form, } return render(request, 'crm/detail_addins/detail_contact_history.html', context)
def add_master_list_select(request): select_form = MasterTerritoryForm() list_selects = None sample_select = None select_count = 0 if request.method == 'POST': try: event = Event.objects.get(pk=request.POST['conf_id']) except (Event.DoesNotExist, MultiValueDictKeyError): raise Http404('Something is wrong - that event does not exist') select_form = MasterTerritoryForm(request.POST) if select_form.is_valid(): new_select = select_form.save(commit=False) new_select.event = event new_select.save() select_form = MasterTerritoryForm() list_selects = MasterListSelections.objects.filter(event=event) sample_select = build_master_territory_list(list_selects) select_count = sample_select.count() sample_select = sample_select.order_by('?')[:250] sample_select = sorted(sample_select, key=lambda o: o.company) context = { 'select_form': select_form, 'list_selects': list_selects, 'sample_select': sample_select, 'select_count': select_count, } return render(request, 'crm/territory_addins/master_territory_panel.html', context)
def delete_master_list_select(request): select_form = MasterTerritoryForm() list_selects = None sample_select = None select_count = 0 if request.method == 'POST': try: event = Event.objects.get(pk=request.POST['conf_id']) select = MasterListSelections.objects.get( pk=request.POST['select_id'] ) select.delete() except (Event.DoesNotExist, MultiValueDictKeyError): raise Http404('Something is wrong - that event does not exist') except MasterListSelections.DoesNotExist: pass list_selects = MasterListSelections.objects.filter(event=event) sample_select = build_master_territory_list(list_selects) select_count = sample_select.count() sample_select = sample_select.order_by('?')[:250] sample_select = sorted(sample_select, key=lambda o: o.company) context = { 'select_form': select_form, 'list_selects': list_selects, 'sample_select': sample_select, 'select_count': select_count, } return render(request, 'crm/territory_addins/master_territory_panel.html', context)
def get_recent_contacts(request): """ ajax call to populate recent contacts on sidebar """ if 'recent_contacts' not in request.session: request.session['recent_contacts'] = [] recent_contact_list = [] for contact in request.session['recent_contacts']: try: recent_contact_list.append(Person.objects.get(pk=contact)) except (Person.DoesNotExist, MultiValueDictKeyError): pass context = { 'recent_contact_list': recent_contact_list, } return render(request, 'crm/addins/recently_viewed.html', context)
def update_user_assignments(request): """ ajax call to update d/b when a user is moved to a new category called from manage_territory.html -> territory_builder.html """ role_map_dict = { 'sales-staff': 'SA', 'sponsorship-staff': 'SP', 'pd-staff': 'PD', 'unassigned-staff': 'NA', } if request.method != 'POST': return HttpResponse('') try: event = Event.objects.get(pk=request.POST['conf_id']) user = User.objects.get(pk=request.POST['user_id']) role = role_map_dict[request.POST['role']] except (Event.DoesNotExist, MultiValueDictKeyError): raise Http404('Something is wrong - that event does not exist') except User.DoesNotExist: raise Http404("Something is wrong - that user does not exist") except KeyError: raise Http404('Unrecognized target category') # if user has an assignment (and possibly sub-selects), update that record if EventAssignment.objects.filter(event=event, user=user).exists(): curr_event = EventAssignment.objects.get(event=event, user=user) curr_event.role = role curr_event.save() # otherwise, create new assignment record else: EventAssignment(user=user, event=event, role=role).save() return HttpResponse('') ############################ # GRAPHIC ASSETS ############################
def get_company_details(request): if 'company' not in request.GET: company = None else: try: company = Company.objects.get(pk=request.GET['company']) except (Company.DoesNotExist, MultiValueDictKeyError, ValueError): company = None if company: company_data = model_to_dict(company) company_data['company_name'] = company_data.pop('name') else: company_data = {'id': None} return JsonResponse(company_data)
def add_edit_conference(request): """ Renders conference page """ edit_action = 'blank' event = None event_option_set = None conference_edit_form = ConferenceEditForm() conference_option_form = ConferenceOptionForm() if request.method == 'GET': edit_action = request.GET.get('action', 'blank') if edit_action not in ('blank', 'new', 'edit'): edit_action = 'blank' if edit_action == 'edit': try: event = Event.objects.get(pk=request.GET['id']) conference_edit_form = ConferenceEditForm(instance=event) event_option_set = event.eventoptions_set.all() except (Event.DoesNotExist, MultiValueDictKeyError): edit_action = 'blank' venue_list = Venue.objects.all().order_by('city', 'name') venue_form = VenueForm() conference_select_form = ConferenceSelectForm() context = { 'edit_action': edit_action, 'venue_form': venue_form, 'venue_list': venue_list, 'conference_select_form': conference_select_form, 'conference_edit_form': conference_edit_form, 'conference_option_form': conference_option_form, 'event': event, 'event_option_set': event_option_set, } return render(request, 'registration/conference.html', context)
def delete_temp_conf(request): """ ajax call to delete temporary conference and options """ event = None if request.method == 'POST': try: event = Event.objects.get(pk=request.POST['event_id']) except (Event.DoesNotExist, MultiValueDictKeyError): event = None if event: event.delete() return HttpResponse('')
def save_conference_changes(request): """ AJAX call to save changes to conference being edited """ event = None edit_action = None conference_option_form = ConferenceOptionForm() conference_edit_form = ConferenceEditForm() event_option_set = None if request.method == 'POST': try: event_id = request.POST['event_id'] if event_id != 'new': event = Event.objects.get(pk=event_id) conference_edit_form = ConferenceEditForm( request.POST, instance=event ) if conference_edit_form.is_valid(): event = conference_edit_form.save() return HttpResponse( '<div class="row text-center">' \ '<a href="/registration/new_delegate_search/" class="btn btn-primary">Create New Registration</a>' \ '</div>' ) except (Event.DoesNotExist, MultiValueDictKeyError): conference_edit_form = ConferenceEditForm(request.POST) context = { 'edit_action': edit_action, 'conference_edit_form': conference_edit_form, 'conference_option_form': conference_option_form, 'event': event, 'event_option_set': event_option_set, } return render(request, 'registration/addins/conference_edit_panel.html', context)
def select_conference_to_edit(request): """ ajax call to select conference for editing """ event = None edit_action = 'new' conference_edit_form = ConferenceEditForm() conference_option_form = ConferenceOptionForm() event_option_set = None if request.method == 'POST': if request.POST['edit_action'] != 'edit': edit_action = 'blank' else: try: event = Event.objects.get(pk=request.POST['conf_id']) edit_action = 'edit' conference_edit_form = ConferenceEditForm(instance=event) event_option_set = event.eventoptions_set.all() except (Event.DoesNotExist, MultiValueDictKeyError): edit_action = 'blank' context = { 'edit_action': edit_action, 'conference_edit_form': conference_edit_form, 'conference_option_form': conference_option_form, 'event': event, 'event_option_set': event_option_set, } return render(request, 'registration/addins/conference_edit_panel.html', context)
def _set_event(self): if self.request.POST['event_id'] in ('', None): self.event = None else: try: self.event = Event.objects.get(pk=self.request.POST['event_id']) except (MultiValueDictKeyError, Event.DoesNotExist): self.event = None
def _set_event_option(self): try: self.event_option = EventOptions.objects.get( pk=self.request.POST['option_id'] ) except (MultiValueDictKeyError, Event.DoesNotExist, ValueError): self.event_option = None
def put(self, request): # get data from request. try: email = request.data['email'] except MultiValueDictKeyError: return ErrorResponse.error_response(-101, "No email") try: password = request.data['password'] except MultiValueDictKeyError: return ErrorResponse.error_response(-102, "No current password") try: new_password = request.data['new_password'] except MultiValueDictKeyError: return ErrorResponse.error_response(-103, "No new_password") try: user = UserProfile.objects.get(email=email, sign_up_type='email') except ObjectDoesNotExist: return ErrorResponse.error_response(-200, "Invalid user") chk_password = check_password(password=password, encoded=user.password) if chk_password is False: return ErrorResponse.error_response(-300, "Not correct current password") user.password = make_password(password=new_password, salt=None, hasher='default') user.save() return_data = {"message": "success", "ErrorCode": 0} return Response(return_data) # Sign up and Sign in class for social user
def post(self, request, format = None): try: crawler_id = request.data['crawler_id'] except MultiValueDictKeyError: return ErrorResponse.error_response(-101, "No crawler id data in http request") try: crawler = Crawler.objects.get(crawler_id=crawler_id) except ObjectDoesNotExist: return ErrorResponse.error_response(-200, "Invalid crawler") subscriber = request.user Subscription.objects.create(subscriber=subscriber, crawler=crawler, latest_pushtime=timezone.now()) return_data = {"message": 'success', 'ErrorCode': 0} return Response(return_data)
def delete(self, request, format=None): try: crawler_id = request.data['crawler_id'] except MultiValueDictKeyError: return ErrorResponse.error_response(-101, "No crawler data in request") subscriber = request.user try: subscription = Subscription.objects.get(subscriber=subscriber, crawler_id=crawler_id) except ObjectDoesNotExist: return ErrorResponse.error_response(-200, "Invalid subscriptions") subscription.delete() return_data = {"message": "success", "ErrorCode": 0} return Response(return_data)
def test_upload_with_wrong_key(self): """ Should throw error while uploading with wrong key """ with open(self.assets_dir + 'abc.json', 'rb') as f, \ self.assertRaises(MultiValueDictKeyError) as err: self.client.post('/abc/', {'wrong_key': f}) self.assertIn( "Upload a file with the key 'file'", err.exception.args )
def upload(self, request, *args, **kwargs): try: uploaded_file = request.data['file'] except MultiValueDictKeyError: raise MultiValueDictKeyError("Upload a file with the key 'file'") content = b'' for chunk in uploaded_file.chunks(): content += chunk # try parsers defined in the `file_content_parser_classes` for parser_cls in self.file_content_parser_classes: try: data = parser_cls().parse(six.BytesIO(content)) break except (ParseError, ValidationError): # try all parsers provided continue else: raise ParseError( 'Could not parse content of the file to any of the parsers ' 'provided.' ) # create model instances from contents of the file serializer = self.get_serializer(data=data, many=True) serializer.is_valid(raise_exception=True) self.perform_create(serializer) headers = self.get_success_headers(serializer.data) return Response( data=serializer.data, status=HTTP_201_CREATED, headers=headers )
def post(self, request, format=None): data = request.data try: email = data['email'] except MultiValueDictKeyError: return_data = {'message': 'No email', 'ErrorCode': -100} return Response(return_data) try: password = data['password'] except MultiValueDictKeyError: return_data = {'message': 'No password', 'ErrorCode': -200} return Response(return_data) try: push_token = data['push_token'] except MultiValueDictKeyError: return_data = {'message': 'No push token', 'ErrorCode': -300} return Response(return_data) try: UserProfile.objects.get(email=email, sign_up_type='email') except ObjectDoesNotExist: return_data = {'message': 'Invalid user', 'ErrorCode': -101} return Response(return_data) if UserProfile.objects.get(email=email).is_email_authenticated() is not True: return_data = {'message': 'Need authentication', 'ErrorCode': -102} return Response(return_data) user = Auth.authenticate(email=email, sign_up_type='email', password=password) if user is -1: return_data = {'message': 'Invalid user', 'ErrorCode': -103} if push_token == '-1': return redirect('/', return_message=return_data['message']) return Response(return_data) elif user is -2: return_data = {'message': 'Invalid password', 'ErrorCode': -201} if push_token == '-1': return redirect('/', return_message=return_data['message']) return Response(return_data) else: user_token = dict() user_token['owner'] = user user_token['push_token'] = push_token pushTokenSerializer = PushTokenSerializer(data=user_token) token, created = Token.objects.get_or_create(user=user) if pushTokenSerializer.is_valid(): pushTokenSerializer.save() return_data = {'token': token.key, 'email': email, 'message': "Login success", 'ErrorCode': 0} if push_token == '-1': return redirect('/', return_message=return_data['message']) return Response(return_data) return Response(return_data) # class for log out