我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用rest_framework.serializers.ValidationError()。
def validate(self, data):
    """Enforce the edit grace period and check the start/end ordering.

    Once the grace period has elapsed for an existing instance, only
    ``time_end`` may be present in ``data``. ``time_start`` must never be
    later than a non-null ``time_end``.

    Returns:
        The unchanged ``data`` mapping.

    Raises:
        ParkingException: Grace period is over and other fields were sent.
        serializers.ValidationError: ``time_start`` is after ``time_end``.
    """
    existing = self.instance

    if existing and (now() - existing.created_at) > settings.PARKKIHUBI_TIME_PARKINGS_EDITABLE:
        if set(data.keys()) != {'time_end'}:
            raise ParkingException(
                _('Grace period has passed. Only "time_end" can be updated via PATCH.'),
                code='grace_period_over',
            )

    if existing:
        # A partial update might omit one or both time fields; fall back to
        # the stored values.
        time_start = data.get('time_start', existing.time_start)
        time_end = data.get('time_end', existing.time_end)
    else:
        time_start, time_end = data['time_start'], data['time_end']

    starts_after_end = time_end is not None and time_start > time_end
    if starts_after_end:
        raise serializers.ValidationError(_('"time_start" cannot be after "time_end".'))

    return data
def post(self, request, *args, **kwargs):
    """Log a user in from the email/password pair in the request body.

    Args:
        request (rest_framework.request.Request)

    Returns:
        Response with the serialized user on success, or a 401 response
        when authentication fails.

    Raises:
        serializers.ValidationError: email and/or password is missing.
    """
    payload = request.data
    email = payload.get('email', None)
    password = payload.get('password', None)

    if not (email and password):
        raise serializers.ValidationError({'error': 'email and/or password not provided'})

    user = authenticate(email=email, password=password)
    if user is None:
        return Response(
            {'detail': 'Unable to login with provided username/password'},
            status=status.HTTP_401_UNAUTHORIZED,
        )

    login(request, user)
    return Response(PFBUserSerializer(user).data)
def set_password(self, request, pk=None):
    """Detail ``POST`` endpoint for changing a user's password.

    The caller must supply the current password (``oldPassword``) and a
    non-empty ``newPassword``.

    Args:
        request (rest_framework.request.Request)
        pk (str): primary key used to look the user up by ``uuid``

    Returns:
        Response

    Raises:
        ValidationError: the old password does not authenticate, or no new
            password was supplied.
    """
    target_email = PFBUser.objects.get(uuid=pk).email
    user = authenticate(email=target_email, password=request.data.get('oldPassword'))
    if not user:
        raise ValidationError({'detail': 'Unable to complete password change'})

    new_password = request.data.get('newPassword')
    if not new_password:
        raise ValidationError({'detail': 'Unable to complete password change'})

    user.set_password(new_password)
    user.save()
    return Response({'detail': 'Successfully changed password'}, status=status.HTTP_200_OK)
def validate(self, attrs):
    """Authenticate the supplied credentials and attach the user to ``attrs``.

    Raises:
        serializers.ValidationError: credentials are missing, do not
            authenticate, or belong to an inactive user.
    """
    username = attrs.get('username')
    password = attrs.get('password')

    if not (username and password):
        raise serializers.ValidationError(_('Must include "username" and "password".'))

    user = authenticate(username=username, password=password)
    if not user:
        raise serializers.ValidationError(_('Unable to log in with provided credentials.'))
    if not user.is_active:
        raise serializers.ValidationError(_('User account is disabled.'))

    attrs['user'] = user
    return attrs
def _check_current_version(self, instance):
    """Reject the edit when it targets a stale version of ``instance``.

    Compares the ``edited_version`` sent with the request against the id of
    the version reported by ``reversion.get_for_date`` for the current time.
    A mismatch indicates a probable edit conflict.

    Raises:
        serializers.ValidationError: the versions differ.
    """
    request_data = self.context['request'].data
    edited_version = request_data['edited_version']
    current_version = reversion.get_for_date(instance, timezone.now()).id

    if current_version == edited_version:
        return

    raise serializers.ValidationError(detail={
        'detail':
            'Edit conflict error! The current version for this object '
            'does not match the reported version being edited.',
        'current_version': current_version,
        'submitted_data': request_data,
    })
def _check_tag_data(self, instance, validated_data):
    """Ensure an update neither drops nor alters the instance's special tags.

    Returns:
        The ``tags`` mapping taken from ``validated_data``.

    Raises:
        serializers.ValidationError: a full update omits a special tag, or
            any update changes a special tag's value.
    """
    tag_data = validated_data['tags']

    # Full updates (PUT) must carry every special tag the instance has.
    if not self.partial:
        for tag in instance.special_tags:
            dropped = tag in instance.tags and tag not in tag_data
            if dropped:
                raise serializers.ValidationError(detail={
                    'detail': 'PUT requests must retain all special tags. '
                    'Your request is missing the tag: {}'.format(tag)})

    # Special tags present on both sides must keep their value.
    for tag in instance.special_tags:
        if tag not in instance.tags or tag not in tag_data:
            continue
        if tag_data[tag] != instance.tags[tag]:
            raise serializers.ValidationError(detail={
                'detail': 'Updates (PUT or PATCH) must not attempt '
                'to change the values for special tags. Your request '
                'attempts to change the value for tag '
                "'{}' from '{}' to '{}'".format(
                    tag, instance.tags[tag], tag_data[tag])})

    return tag_data
def create(self, validated_data):
    """Create the object after checking the submitted fields and tags.

    The payload must contain exactly the ``tags`` and ``variant`` fields,
    and ``tags`` must include every entry of ``Relation.required_tags``.

    Raises:
        serializers.ValidationError: a field or required tag is missing.
    """
    if sorted(validated_data.keys()) != ['tags', 'variant']:
        raise serializers.ValidationError(detail={
            'detail': "Create (POST) should include the 'tags' and "
            "'variant' fields. Your request contains the following "
            'fields: {}'.format(str(validated_data.keys()))})

    if 'tags' in validated_data:
        lacks_required_tag = any(
            tag not in validated_data['tags'] for tag in Relation.required_tags
        )
        if lacks_required_tag:
            raise serializers.ValidationError(detail={
                'detail': 'Create (POST) tag data must include all '
                'required tags: {}'.format(Relation.required_tags)})

    return super(RelationSerializer, self).create(validated_data)
def validate_tags(self, tags_dict_list):
    """Validate each tag dict and return deferred save callables.

    A tag dict carrying an ``id`` is validated as an update of the existing
    ``Tag``; otherwise it is validated as a new tag.

    Args:
        tags_dict_list: iterable of tag dicts.

    Returns:
        A list of zero-argument callables, one per tag, each saving the
        serializer that validated that tag.

    Raises:
        serializers.ValidationError: at least one tag is invalid. The
            detail is a list aligned with the input, holding ``{}`` for
            valid entries and the serializer errors for invalid ones.
    """
    tags_creators = []
    errors = []
    has_errors = False

    for tag_dict in tags_dict_list:
        tag_id = tag_dict.get('id')
        if tag_id:
            instance = Tag.objects.get(id=tag_id)
            serializer = TagSerializer(instance, data=tag_dict)
        else:
            serializer = TagSerializer(data=tag_dict)

        if serializer.is_valid():
            # Store the bound method rather than ``lambda: serializer.save()``:
            # a lambda looks ``serializer`` up when called, so every callable
            # would save the serializer of the *last* loop iteration.
            tags_creators.append(serializer.save)
            errors.append({})
        else:
            has_errors = True
            errors.append(serializer.errors)

    if has_errors:
        raise serializers.ValidationError(errors)
    return tags_creators
def validate_molecules(self, value):
    """Validate the molecule list and number the molecules in order.

    Each molecule receives a 1-based ``priority`` matching its position.

    Raises:
        serializers.ValidationError: the list is empty, the molecules use
            different instrument names, or more than one molecule sets
            ``fill_window``.
    """
    if not value:
        raise serializers.ValidationError(_('You must specify at least 1 molecule'))

    # Relative priority follows the submitted order.
    for priority, molecule in enumerate(value, start=1):
        molecule['priority'] = priority

    # All molecules must share a single instrument name.
    instrument_names = {molecule['instrument_name'] for molecule in value}
    if len(instrument_names) > 1:
        raise serializers.ValidationError(_('Each Molecule must specify the same instrument name'))

    fill_window_count = sum(mol.get('fill_window', False) for mol in value)
    if fill_window_count > 1:
        raise serializers.ValidationError(_('Only one molecule can have `fill_window` set'))

    return value
def validate(self, attrs):
    """Authenticate the supplied credentials and attach the user to ``attrs``.

    Every failure is raised with the ``authorization`` error code.

    Raises:
        serializers.ValidationError: credentials are missing, do not
            authenticate, or belong to an inactive user.
    """
    username = attrs.get('username')
    password = attrs.get('password')

    if not (username and password):
        msg = _('Must include "username" and "password".')
        raise serializers.ValidationError(msg, code='authorization')

    user = authenticate(username=username, password=password)
    if not user:
        msg = _('Unable to log in with provided credentials.')
        raise serializers.ValidationError(msg, code='authorization')

    # From Django 1.10 onwards the `authenticate` call simply returns
    # `None` for is_active=False users (assuming the default
    # `ModelBackend` authentication backend), so this is a fallback.
    if not user.is_active:
        msg = _('User account is disabled.')
        raise serializers.ValidationError(msg, code='authorization')

    attrs['user'] = user
    return attrs
def validate(self, data):
    """Check that the cycle start, deadline and end dates are in order.

    Dates are read from ``self.initial_data`` as ``YYYY-MM-DD`` strings and,
    on updates, layered over the values currently stored on
    ``self.instance``. Neither ``self.initial_data`` nor the instance is
    modified. A comparison is skipped when either of its dates is absent.

    Returns:
        The unchanged ``data`` mapping.

    Raises:
        serializers.ValidationError: the dates are not chronologically
            ordered.
    """
    date_keys = ("cycle_end_date", "cycle_deadline_date", "cycle_start_date")

    # Parse incoming string dates into a private dict so the raw request
    # data is left untouched.
    incoming = {}
    for date_key in date_keys:
        raw = self.initial_data.get(date_key, None)
        if raw:
            incoming[date_key] = datetime.strptime(raw, '%Y-%m-%d').date()

    # On update, start from the stored values and overlay the new ones.
    # Build a fresh dict instead of updating ``instance.__dict__``, which
    # would alter the instance before validation has even passed.
    if self.instance:
        dates = {key: getattr(self.instance, key, None) for key in date_keys}
        dates.update(incoming)
    else:
        dates = incoming

    start = dates.get("cycle_start_date")
    deadline = dates.get("cycle_deadline_date")
    end = dates.get("cycle_end_date")

    if end is not None and start is not None and end < start:
        raise serializers.ValidationError("Cycle start date must be before cycle end date")
    if end is not None and deadline is not None and end < deadline:
        raise serializers.ValidationError("Cycle deadline date must be on or before the cycle end date")
    if deadline is not None and start is not None and deadline < start:
        raise serializers.ValidationError("Cycle deadline date must be after cycle start date")

    return data
def validate(self, data):
    """Object-level validation; subclasses overriding it should call super().

    Ensures that ``data`` is non-empty and only contains fields reported by
    ``self.get_writable_fields()``.

    Raises:
        serializers.ValidationError: ``data`` is empty or contains a field
            that is not writable.
    """
    # Invalid fields may already have been stripped before we get here.
    if not data.keys():
        raise serializers.ValidationError("Invalid data")

    allowed = self.get_writable_fields()
    rejected = [name for name in data.keys() if name not in allowed]
    if rejected:
        raise serializers.ValidationError(f"The following fields are not writable: {', '.join(rejected)}")

    return data
def validate_cpu(self, value):
    """Validate a mapping of process type to CPU shares.

    A value of ``None`` is accepted as-is; it is used to unset a setting.

    Args:
        value: mapping of process type name to CPU share.

    Returns:
        The unchanged ``value`` mapping.

    Raises:
        serializers.ValidationError: a process type does not match
            ``PROCTYPE_MATCH``, a share does not match ``CPUSHARE_MATCH``,
            is not an integer, or is outside 0..1024.
    """
    # ``items()``/``values()`` work on both Python 2 and 3 dicts, unlike
    # the Python-2-only ``viewitems()``/``viewvalues()``.
    for proc_type, setting in value.items():
        if setting is None:  # use NoneType to unset a value
            continue

        if not re.match(PROCTYPE_MATCH, proc_type):
            raise serializers.ValidationError("Process types can only contain [a-z]")

        shares = re.match(CPUSHARE_MATCH, str(setting))
        if not shares:
            raise serializers.ValidationError("CPU shares must be an integer")

        for captured in shares.groupdict().values():
            try:
                amount = int(captured)
            except ValueError:
                raise serializers.ValidationError("CPU shares must be an integer")
            if amount > 1024 or amount < 0:
                raise serializers.ValidationError("CPU shares must be between 0 and 1024")

    return value
def validate_domain(self, value):
    """Check that the hostname is valid and not already in use.

    One trailing dot is stripped. Values containing ``xip.io`` are returned
    immediately without further checks.

    Raises:
        serializers.ValidationError: the hostname is too long, is a
            wildcard, has a malformed label, or is already registered.
    """
    if len(value) > 255:
        raise serializers.ValidationError('Hostname must be 255 characters or less.')

    if value.endswith("."):
        value = value[:-1]  # strip exactly one dot from the right, if present
    labels = value.split('.')

    if 'xip.io' in value:
        return value

    if labels[0] == '*':
        raise serializers.ValidationError(
            'Adding a wildcard subdomain is currently not supported.')

    allowed = re.compile("^(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
    single_label = len(labels) == 1
    for label in labels:
        looks_invalid = (
            not allowed.match(label)
            or '--' in label
            or label.isdigit()
            or (single_label and any(char.isdigit() for char in label))
        )
        if looks_invalid:
            raise serializers.ValidationError('Hostname does not look valid.')

    if models.Domain.objects.filter(domain=value).exists():
        raise serializers.ValidationError(
            "The domain {} is already in use by another app".format(value))

    return value
def validate(self, validated_data):
    """Resolve the token's user and company and add them to the data.

    The user must be in the ``admin`` or ``service`` permission group, and
    the company must not already exist locally.

    Raises:
        serializers.ValidationError: keyed on ``token`` when the user or
            company lookup fails, the user lacks the required group, or
            the company is already activated.
    """
    rehive = Rehive(validated_data.get('token'))

    try:
        user = rehive.user.get()
        group_names = {group['name'] for group in user['permission_groups']}
        if not group_names & {"admin", "service"}:
            raise serializers.ValidationError(
                {"token": ["Invalid admin user."]})
    except APIException:
        raise serializers.ValidationError({"token": ["Invalid user."]})

    try:
        company = rehive.admin.company.get()
    except APIException:
        raise serializers.ValidationError({"token": ["Invalid company."]})

    already_activated = Company.objects.filter(identifier=company['identifier']).exists()
    if already_activated:
        raise serializers.ValidationError(
            {"token": ["Company already activated."]})

    validated_data['user'] = user
    validated_data['company'] = company
    return validated_data
def validate(self, validated_data):
    """Resolve the token's user and attach the matching local company.

    The user must be in the ``admin`` or ``service`` permission group and
    the user's company must already exist locally.

    Raises:
        serializers.ValidationError: keyed on ``token`` when the user
            lookup fails, the user lacks the required group, or the
            company has not been activated.
    """
    rehive = Rehive(validated_data.get('token'))

    try:
        user = rehive.user.get()
        group_names = {group['name'] for group in user['permission_groups']}
        if not group_names & {"admin", "service"}:
            raise serializers.ValidationError(
                {"token": ["Invalid admin user."]})
    except APIException:
        raise serializers.ValidationError({"token": ["Invalid user."]})

    try:
        company = Company.objects.get(identifier=user['company'])
    except Company.DoesNotExist:
        raise serializers.ValidationError(
            {"token": ["Company has not been activated yet."]})

    validated_data['company'] = company
    return validated_data
def create(self, validated_data):
    """Re-trigger the notification log named in the URL for a recipient.

    The log is looked up by the ``log_id`` URL kwarg, restricted to the
    requesting user's company.

    Returns:
        The ``NotificationLog`` that was triggered.

    Raises:
        exceptions.NotFound: no matching log exists.
        serializers.ValidationError: triggering the log failed for any
            reason.
    """
    request = self.context['request']
    view = self.context.get('view')

    company = request.user.company
    log_id = view.kwargs.get('log_id')
    recipient = validated_data.get('recipient')

    try:
        log = NotificationLog.objects.get(notification__company=company, id=log_id)
    except NotificationLog.DoesNotExist:
        raise exceptions.NotFound()

    try:
        log.trigger(recipient=recipient)
    except Exception:
        # Any failure while triggering is reported as a generic error.
        raise serializers.ValidationError(
            {"non_field_errors": ["Internal server error."]})

    return log
def validate(self, data):
    """Validate the password strength and email uniqueness for a new user.

    Email uniqueness is checked within the request's channel.

    Raises:
        serializers.ValidationError: dict keyed by ``password`` and/or
            ``email`` describing each failed check.
    """
    errors = {}

    password = data.get('password')
    if password:
        try:
            validate_password(password=password)
        except ValidationError as e:
            errors['password'] = list(e.messages)

    email = data.get('email')
    if email:
        channel = self.context["request"].channel
        # ``exists()`` asks the database only whether a row is present,
        # instead of counting every matching row as ``count()`` does.
        if models.User.objects.filter(email=email, channel__slug=channel).exists():
            errors['email'] = "An user with this email is already registered."

    if errors:
        raise serializers.ValidationError(errors)
    return super(UserCreateSerializer, self).validate(data)
def validate(self, data):
    """Validate a password change request.

    When either ``password`` or ``current_password`` is supplied, the new
    password is run through ``validate_password`` and the current password
    must authenticate the requesting user in the request's channel.
    ``current_password`` is removed from ``data``.

    Raises:
        serializers.ValidationError: dict keyed by ``password`` and/or
            ``current_password`` describing each failed check.
    """
    errors = {}

    wants_password_change = data.get('password') or data.get('current_password')
    if wants_password_change:
        current_password = data.pop('current_password', '')
        new_password = data.get('password', '')

        try:
            validate_password(password=new_password)
        except ValidationError as e:
            errors['password'] = list(e.messages)

        authenticated = authenticate(
            email=self.context['request'].user.email,
            password=current_password,
            channel=self.context["request"].channel,
        )
        if not authenticated:
            errors['current_password'] = ["Invalid password."]

    if errors:
        raise serializers.ValidationError(errors)
    return super(UserCreateSerializer, self).validate(data)
def startremotechannelimport(self, request):
    """Schedule an ``importchannel network`` job and return its status.

    ``channel_id`` is required in the request data; ``baseurl`` falls back
    to ``settings.CENTRAL_CONTENT_DOWNLOAD_BASE_URL``.

    Raises:
        serializers.ValidationError: ``channel_id`` is missing.
    """
    if "channel_id" not in request.data:
        raise serializers.ValidationError("The channel_id field is required.")
    channel_id = request.data["channel_id"]

    baseurl = request.data.get("baseurl", settings.CENTRAL_CONTENT_DOWNLOAD_BASE_URL)

    job_metadata = {
        "type": "REMOTECHANNELIMPORT",
        "started_by": request.user.pk,
    }

    job_id = get_client().schedule(
        call_command,
        "importchannel",
        "network",
        channel_id,
        baseurl=baseurl,
        extra_metadata=job_metadata,
    )

    job = get_client().status(job_id)
    return Response(_job_to_response(job))
def validate(self, data):
    """Check that the period is well-formed and does not overlap another.

    The overlap check queries the view's model for the requesting user's
    records whose range intersects ``start_time``..``end_time``.

    Raises:
        serializers.ValidationError: ``start_time`` is not before
            ``end_time``.
        ValidationError: an overlapping period exists.
    """
    if data['start_time'] >= data['end_time']:
        raise serializers.ValidationError('End time must occur after start')

    create_model = self.context['view'].model
    user = self.context['request'].user
    start_time, end_time = data['start_time'], data['end_time']

    overlapping = create_model.objects.filter(user=user).filter(
        end_time__gte=start_time, start_time__lte=end_time)
    if overlapping.exists():
        raise ValidationError('Overlapping Periods Found')

    return data
def create(self, validated_data):
    """Create a reseller together with the user account that owns it.

    The owning user is named ``<application id>.<reseller name>``. The
    original docstring states this user is needed to create a token for the
    reseller.

    Raises:
        ValidationError: a user with that username already exists.
    """
    application = self.initial_data['application']
    username = '{application_id}.{reseller_name}'.format(
        application_id=application.id,
        reseller_name=validated_data['name'],
    )

    user_model = get_user_model()
    if user_model.objects.filter(username=username).exists():
        raise ValidationError('Reseller with such name is already created')

    owner = get_user_model().objects.create(username=username)
    return Reseller.objects.create(
        owner=owner,
        application=self.initial_data['application'],
        **validated_data
    )
def validate(self, data):
    """Check that the user belongs to the project's clubs and to the club.

    Raises:
        serializers.ValidationError: ``project.has_club_member(user)`` or
            ``club.has_member(user)`` is false.
    """
    user = data['user']
    club, project = data['club'], data['project']

    if not project.has_club_member(user):
        raise serializers.ValidationError(
            "The specified user must be a member of at least one of the "
            "parent clubs!"
        )

    if not club.has_member(user):
        raise serializers.ValidationError(
            "The specified user must be a member of the specified club!"
        )

    return super(ProjectMembershipSerializer, self).validate(data)
def filter_queryset(self, queryset):
    """Restrict ``queryset`` to the site named by the ``site_id`` URL kwarg.

    Without a ``site_id``, anonymous users are sent through
    ``permission_denied`` and authenticated users receive a validation
    error.

    Raises:
        serializers.ValidationError: ``site_id`` is missing or is not an
            integer.
    """
    site_id = self.kwargs.get('site_id', None)

    if site_id is None:
        # NOTE(review): ``is_anonymous`` is called as a method here; newer
        # Django versions expose it as a property - confirm the version.
        if self.request.user.is_anonymous():
            # Expected to raise and force authentication; the return below
            # only matters if an override does not raise.
            self.permission_denied(self.request)
            return super(SiteFormViewSet, self).filter_queryset(queryset)
        # Previously signalled by catching the error from ``int(None)``
        # with a bare ``except``; raise it directly instead.
        raise serializers.ValidationError({'site': "Site Id Not Given."})

    try:
        site_pk = int(site_id)
    except (TypeError, ValueError):
        # Report a malformed id as a client error rather than crashing.
        raise serializers.ValidationError({'site': "Site Id must be an integer."})

    return queryset.filter(site__id=site_pk)
def run_validation(self, data=[]):
    """Run validation and re-raise failures in a ``to``/``errors`` shape.

    Any validation error raised by ``to_internal_value``, the validators or
    ``.validate()`` is re-raised as a ``ValidationError`` whose detail is
    ``{'to': <submitted 'to' value>, 'errors': <error detail>}``.

    Returns:
        The validated value, or ``data`` itself when
        ``validate_empty_values`` reports it as empty.

    Raises:
        ValidationError: validation failed.
    """
    # NOTE(review): the mutable default ``[]`` is kept so the signature is
    # unchanged; it is not mutated in this method.
    (is_empty_value, data) = self.validate_empty_values(data)
    if is_empty_value:
        return data

    try:
        value = self.to_internal_value(data)
        self.run_validators(value)
        value = self.validate(value)
        assert value is not None, '.validate() should return the validated data'  # noqa
    except (ValidationError, DjangoValidationError) as exc:
        # The payload may be missing 'to' (often the very reason validation
        # failed) or may not be a mapping at all; do not let that lookup
        # mask the validation error with a KeyError/TypeError.
        try:
            recipient = data['to']
        except (KeyError, TypeError, IndexError):
            recipient = None
        # TODO: Must be 'recipient' instead of 'to' in v2
        raise ValidationError(
            detail={
                'to': recipient,
                'errors': get_validation_error_detail(exc)})

    return value
def validate(self, attrs):
    """Re-use the ``Message`` model's own validation for the serializer.

    Builds an unsaved ``Message`` from ``attrs`` authored by the requesting
    user, runs ``clean()`` (and ``validate_html()`` when ``html`` is
    present) and converts Django validation errors into serializer errors.

    See http://www.django-rest-framework.org/topics/3.0-announcement/#differences-between-modelserializer-validation-and-modelform

    Raises:
        ValidationError: model validation failed.
    """
    request = self.context.get('request', None)

    # Re-use model validation logic
    instance = Message(author=request.user, **attrs)
    try:
        instance.clean()
        if 'html' in attrs:
            instance.validate_html()
    except DjangoValidationError as err:
        # ``message_dict`` only exists on errors built from a dict; errors
        # raised with a plain message or list would fail on that access.
        if not hasattr(err, 'error_dict'):
            raise ValidationError(err.messages)

        message = {}
        for field, errors in err.message_dict.items():
            message[field] = errors if isinstance(errors, list) else [errors]
        raise ValidationError(message)

    return attrs
def validate(self, attrs):
    """Reject duplicate file names and oversized attachment totals.

    Done here rather than in the model's ``clean()`` because, per the
    original comment, the instance has no message yet at that step.

    Raises:
        ValidationError: the message already has an attachment with the
            same file name, or adding the file would push the total size
            past ``settings.CAMPAIGNS['ATTACHMENT_TOTAL_MAX_SIZE']``.
    """
    message = attrs['message']
    upload = attrs['file']

    same_named = message.attachments.filter(
        file__endswith='/{}'.format(upload.name))
    if same_named.exists():
        raise ValidationError((
            'A file with the same name already exists '
            'for this message : {}').format(same_named.first()))

    # Check whether the new file fits within the total allowed attachment
    # size for the message.
    total_size = sum(
        attachment.size for attachment in message.attachments.all()
    ) + upload.size
    max_size = settings.CAMPAIGNS['ATTACHMENT_TOTAL_MAX_SIZE']

    if total_size > max_size:
        # Translate the template first and interpolate afterwards, so the
        # catalogue lookup uses the fixed message rather than a string with
        # the numbers already filled in.
        raise ValidationError(_(
            'Can not accept this file. Total attachment size for this '
            'message ({} bytes) would exceed the maximum allowed size of '
            '{} bytes').format(total_size, max_size))

    return attrs
def is_partly_invalid(self):
    """Validate a ``many=True`` payload that may be only partly valid.

    The first call runs the partial validation and stores the results in
    ``self._validated_data`` and ``self._partial_errors``; later calls reuse
    them.

    Returns:
        bool: True when the partial validation recorded errors.

    Raises:
        ValidationError: may be raised by ``run_list_size_validation``.
    """
    assert hasattr(self, 'initial_data'), (
        'Cannot call `.is_partly_invalid()` as no `data=` keyword argument '
        'was passed when instantiating the serializer instance.'
    )

    # this one may trigger a ValidationError up to http client
    self.run_list_size_validation(self.initial_data)

    # those can't, from here, validation is always considered good
    if not hasattr(self, '_validated_data'):
        self._validated_data, exc = self.run_partial_validation(
            self.initial_data)
        self._partial_errors = exc.detail

    # Derive the answer from the stored errors so that repeated calls give
    # the same result as the first one.
    return bool(self._partial_errors)
def run_partial_validation(self, data=fields.empty):
    """Validate the whole data list, returning valid data and item errors.

    Called once for the entire list. Item-level conversion is delegated to
    ``to_internal_value_error``, which supplies both the converted value and
    an error object; validator and ``.validate()`` failures are re-raised as
    a ``ValidationError``.

    Returns:
        tuple: ``(value, error)``. The caller reads ``error.detail``.

    Raises:
        ValidationError: the validators or ``.validate()`` failed.
    """
    (is_empty_value, data) = self.validate_empty_values(data)
    if is_empty_value:
        # Keep the two-element return shape the caller unpacks; an empty
        # detail means "no item errors".
        return data, ValidationError(detail=[])

    value, error = self.to_internal_value_error(data)
    try:
        self.run_validators(value)  # does nothing in our case
        value = self.validate(value)
        assert value is not None, \
            '.validate() should return the validated data'
    except (ValidationError, DjangoValidationError) as exc:
        raise ValidationError(detail=get_validation_error_detail(exc))

    return value, error
def validate_membership(self, attrs, source):
    """Require that every current membership is resubmitted as a member.

    For each of the user's existing ``STATUS_MEMBER`` memberships, the
    submitted ``membership`` list must contain an entry with the same id,
    and that entry must itself have ``STATUS_MEMBER``.

    Raises:
        serializers.ValidationError: an existing membership is missing
            from the submission or is submitted with another status.
    """
    all_modified = attrs.get('membership')
    all_existing = list(Member.objects.filter(
        user=self.object,
        status=MemberStatusField.STATUS_MEMBER
    ))
    all_modified = list(all_modified)

    for existing in all_existing:
        matches = [mod for mod in all_modified if mod.id == existing.id]

        for mod in matches:
            if mod.status != MemberStatusField.STATUS_MEMBER:
                raise serializers.ValidationError(
                    'Only members with STATUS_MEMBER could be updated')

        if not matches:
            raise serializers.ValidationError(
                'All memberships keys has to be changed at once')

    return attrs
def validate(self, data):
    """Validate a plugin name and its configuration.

    The submitted ``config`` (layered over the stored config on updates) is
    checked against the plugin's JSON schema, then every validator class
    registered for the plugin is run on the submitted config.

    Returns:
        The unchanged ``data`` mapping.

    Raises:
        serializers.ValidationError: the plugin name is unknown or the
            config does not satisfy the plugin's schema.
    """
    name = data.get('name')
    config = data.get('config', {})

    if not name or name not in plugins:
        raise serializers.ValidationError('Invalid plugin name')
    plugin_schema = plugins[name]

    if self.instance:
        # Merge into a copy: updating ``self.instance.config`` in place
        # would change the stored config before validation has passed.
        merged_config = dict(self.instance.config)
        merged_config.update(config)
        config = merged_config

    try:
        jsonschema.validate(config, plugin_schema)
    except jsonschema.ValidationError as e:
        raise serializers.ValidationError({'config': e})

    plugin_validators = validators.validator_classes.get(name, [])
    for validator in plugin_validators:
        # Use the same ``{}`` fallback as above so an omitted config does
        # not raise KeyError.
        # NOTE(review): validators still receive the submitted config, not
        # the merged one - confirm that is the intent.
        validate = validator(data.get('config', {}))
        validate()

    return data
def validate(self, data):
    """Check that listed instances belong to the old fermentable.

    Per the original docstring, the ``instances`` list of
    ``new_fermentable`` is only meant for taking existing instances out of
    the suggestion, so every id must refer to a ``FermentableInstance`` of
    the fermentable identified by ``old_fermentable_id``.

    Raises:
        serializers.ValidationError: an id matches no
            ``FermentableInstance`` or belongs to another fermentable.
    """
    new_fermentable = data['new_fermentable']

    if 'instances' in new_fermentable:
        old_fermentable_id = data['old_fermentable_id']

        for instance_id in new_fermentable['instances']:
            # ``get`` raises DoesNotExist for a missing row; it never
            # returns None.
            try:
                instance = models.FermentableInstance.objects.get(pk=instance_id)
            except models.FermentableInstance.DoesNotExist:
                raise serializers.ValidationError(
                    str(instance_id) + " does not correspond to a FermentableInstance.")

            if instance.fermentable.id != old_fermentable_id:
                raise serializers.ValidationError(
                    "Instance with id '{0}' does not belong to the old fermentable with "
                    "id '{1}'. New instances cannot be added with this endpoint. This "
                    "should only be used for taking old instances out of the suggestion."
                    .format(instance_id, old_fermentable_id))

    return super(FermentableSuggestion, self).validate(data)
def validate_moira_lists(lists):
    """
    Raise a validation error if any of the moira lists does not exist or is
    not a mailing list

    Args:
        lists(list of MoiraList): List of moira lists

    Returns:
        (list of MoiraList) List of moira lists
    """
    moira_client = get_moira_client()

    def _is_mailing_list(list_name):
        # A list qualifies when it exists and its first attribute record
        # has a truthy 'mailList' entry.
        if not moira_client.list_exists(list_name):
            return False
        attributes = moira_client.client.service.getListAttributes(
            list_name, moira_client.proxy_id)
        return bool(attributes and attributes[0]['mailList'])

    bad_lists = [mlist.name for mlist in lists if not _is_mailing_list(mlist.name)]
    if bad_lists:
        raise serializers.ValidationError(
            "Not found or not mailing list: {}".format(','.join(bad_lists)))
    return lists
def validate(self, data):
    """Reconcile ``visibility`` with the parent, if any.

    Replies take the parent's visibility; a visibility supplied for a reply
    must match it. Without a parent, visibility is required when creating.

    Raises:
        serializers.ValidationError: the visibility conflicts with the
            parent, or is missing on create.
    """
    parent = data.get("parent")

    if not parent:
        creating = not self.instance
        if creating and not data.get("visibility"):
            raise serializers.ValidationError("Visibility is required")
        return data

    requested = data.get("visibility")
    if requested and parent.visibility != requested:
        raise serializers.ValidationError("Visibility was given but it doesn't match parent.")

    data["visibility"] = parent.visibility
    return data
def validate_parent(self, value):
    """Validate the ``parent`` field.

    The parent of an existing instance cannot change. On create, a given
    parent must be visible to the requesting user.

    Raises:
        serializers.ValidationError: the parent differs from the existing
            one, or is not visible to the user.
    """
    existing = self.instance

    if existing:
        # Parent cannot be changed once the instance exists.
        if value != existing.parent:
            raise serializers.ValidationError("Parent cannot be changed for an existing Content instance.")
        return value

    if value:
        # The user must be able to see the parent.
        request = self.context.get("request")
        if not value.visible_for_user(request.user):
            raise serializers.ValidationError("Parent not found")

    return value
def validate_username(self, username):
    """Reject a username that is already taken.

    Raises:
        serializers.ValidationError: a user with this username exists.
    """
    taken = User.objects.filter(username=username).exists()
    if taken:
        raise serializers.ValidationError('Username already exist')
    return username
def validate(self, data):
    """Check that both password entries are identical.

    Raises:
        serializers.ValidationError: ``password1`` and ``password2`` differ.
    """
    first, second = data['password1'], data['password2']
    if first != second:
        raise serializers.ValidationError("Passwords didn't match")
    return data
def validate(self, data):
    """Require ``name`` to be lowercase and free of spaces.

    Raises:
        serializers.ValidationError: ``name`` changes when spaces are
            removed and it is lowercased.
    """
    name = data['name']
    normalized = name.replace(" ", "").lower()
    if name != normalized:
        raise serializers.ValidationError(config.KEYWORD_NOT_ALLOWED)
    return data
def validate(self, data):
    """Check that the chosen size is one the pizza offers.

    Values missing from ``data`` fall back to those on ``self.instance``,
    which covers creation, full update and partial update.

    Raises:
        serializers.ValidationError: ``size`` is not in ``pizza.sizes``.
    """
    current = self.instance
    pizza = data.get('pizza', getattr(current, 'pizza', None))
    size = data.get('size', getattr(current, 'size', None))

    if size in pizza.sizes:
        return data
    raise serializers.ValidationError(_('You should provide valid size for your pizza'))
def __call__(self, value):
    """Raise unless ``value`` lies within ``self.low``..``self.high``.

    Raises:
        serializers.ValidationError: ``value`` is below ``low`` or above
            ``high``.
    """
    out_of_range = value > self.high or value < self.low
    if out_of_range:
        bounds = (self.low, self.high)
        raise serializers.ValidationError("value must be between %d and %d (inclusive)" % bounds)


##########################################################################
## Serializers
##########################################################################
def test_in_range_validator(self):
    """
    Test the in-range validator: values outside [-1, 1] are rejected and
    values on or inside the bounds are accepted
    """
    validator = InRange(-1, 1)

    for outside in (-2, 2):
        self.assertRaises(serializers.ValidationError, validator, outside)

    for inside in (-1, 1, 0):
        self.assertNotRaises(serializers.ValidationError, validator, inside)
def validate(self, attrs):
    """Check that the password and its repetition are identical.

    Raises:
        serializers.ValidationError: ``password`` and ``repeated`` differ.
    """
    password, repeated = attrs['password'], attrs['repeated']
    if password != repeated:
        raise serializers.ValidationError("passwords do not match!")
    return attrs
def __call__(self, value):
    """Raise when ``value`` holds more than ``self.limit`` items.

    Raises:
        serializers.ValidationError: ``len(value)`` exceeds the limit.
    """
    if len(value) <= self.limit:
        return
    message = "Cannot add more than {} {}!".format(self.limit, self.things)
    raise serializers.ValidationError(message)


##########################################################################
## Serializers
##########################################################################
def create(self, validated_data):
    """Create the ``Zone`` row, then create its ``r53_zone`` counterpart.

    Raises:
        serializers.ValidationError: ``zone.r53_zone.create()`` raised a
            ``ClientError``; its text becomes the error detail.
    """
    zone = Zone.objects.create(**validated_data)
    try:
        zone.r53_zone.create()
    except ClientError as error:
        detail = str(error)
        raise serializers.ValidationError(detail=detail)
    return zone
def validate(self, attrs):
    """Resolve the user for the submitted alias and attach it to ``attrs``.

    When ``api_settings.PASSWORDLESS_REGISTER_NEW_USERS`` is True, an
    unknown alias creates a new user; otherwise the alias must already
    belong to a user.

    Raises:
        serializers.ValidationError: the alias is missing, matches no
            account, or the account is inactive.
    """
    alias = attrs.get(self.alias_type)
    if not alias:
        msg = _('Missing %s.') % self.alias_type
        raise serializers.ValidationError(msg)

    lookup = {self.alias_type: alias}
    if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True:
        # New aliases register new users.
        user, _created = User.objects.get_or_create(**lookup)
    else:
        # New aliases must not register new users.
        try:
            user = User.objects.get(**lookup)
        except User.DoesNotExist:
            user = None

    if not user:
        msg = _('No account is associated with this alias.')
        raise serializers.ValidationError(msg)

    if not user.is_active:
        msg = _('User account is disabled.')
        raise serializers.ValidationError(msg)

    # Valid: hand the user back so the caller can create a token.
    attrs['user'] = user
    return attrs
def validate(self, attrs):
    """Attach the requesting user when it is active and has the alias field.

    Returns:
        ``attrs`` with ``user`` set to ``request.user``.

    Raises:
        serializers.ValidationError: ``alias_type`` is unset, the request
            has no usable user, the user is inactive, or the user lacks an
            attribute named after ``alias_type``.
    """
    msg = _('There was a problem with your request.')

    if not self.alias_type:
        msg = _('Missing %s.') % self.alias_type
        raise serializers.ValidationError(msg)

    request = self.context["request"]
    if request and hasattr(request, "user"):
        user = request.user
        if user:
            if not user.is_active:
                msg = _('User account is disabled.')
            elif hasattr(user, self.alias_type):
                # Has the appropriate alias type; return attrs so the
                # caller can create a token.
                attrs['user'] = user
                return attrs
            else:
                # Interpolate after translating so the catalogue lookup
                # uses the fixed template, as the 'Missing %s.' message
                # above already does.
                msg = _('This user doesn\'t have an %s.') % self.alias_type

    raise serializers.ValidationError(msg)
def token_age_validator(value):
    """
    Check token age

    Passes ``value`` to ``validate_token_age`` and rejects the token when
    that check returns a falsy result.
    """
    if validate_token_age(value):
        return value
    raise serializers.ValidationError("The token you entered isn't valid.")