Python django.db 模块,DataError() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用django.db.DataError()

项目:FinSL-signbank    作者:Signbank    | 项目源码 | 文件源码
def test_idgloss(self):
        """Check that idgloss keeps unusual characters and rejects an over-long value.

        Several strings made of non-ASCII letters and punctuation are saved in turn
        and the attribute is compared with what was assigned. Afterwards a very long
        value is assigned and saving it is expected to raise ``DataError``.
        """
        gloss = Gloss.objects.get(idgloss="testgloss")

        # Check for some weird characters
        weird_chars = ("äöåÄÖŨ^~'* ´`üÜÿŸëêËÊ€$#", "?????????????????????????????", "? ???????? ?? ???, ?????? ???",)
        for my_str in weird_chars:
            gloss.idgloss = my_str
            gloss.save()
            self.assertEqual(gloss.idgloss, str(gloss.idgloss))
            self.assertEqual(gloss.idgloss, my_str)

        # Test that the length of idgloss can't be too long.
        # The parentheses matter: adjacent string literals are only joined inside
        # one expression. Without them only the first literal is assigned and the
        # following ones are separate statements that do nothing.
        too_long = (
            "afasdkfjsdalkfjdsaljfl^¨'*´`} sajfljadsklfjasdklfjsadkjflÄÖÅlöjsadkfjasdkljflaksdjfkljds"
            "fljasdlkfjakdslkafjsdlkafjölasdjfkldsajlaköfjsdakljfklasdjfkldsjaflkajdsflökjdsalkfjadslköfjdsalökjfklsd"
            "ajflkdsjlkfajöldskjflkadsjflkdsajfladslkfjdlksa"
        )
        gloss.idgloss = too_long
        # Only the save is expected to fail, so only the save sits in the context.
        with self.assertRaises(DataError):
            gloss.save()
项目:django-oauth2-test    作者:ONSdigital    | 项目源码 | 文件源码
def delete(self, request):
        """
        Delete the user attached to the request and report the outcome.

        The e-mail address is read from ``request.user`` before the delete call so
        it can be echoed back in the response.

        :param request: request object carrying the ``user`` to delete
        :return: Response whose data is the JSON-rendered context
                 ``{'account': <email>, 'deleted': 'success'}``, sent with
                 ``status.HTTP_201_CREATED``
        :raises DatabaseFailureException: when the delete call raises one of the
                 caught database errors
        """
        stdlogger.debug("Hitting HTTP DELETE account view")

        account_email = request.user.email
        try:
            stdlogger.debug("Deleting this user object")
            request.user.delete()
        except (IntegrityError, InternalError, DataError, DatabaseError):
            # Not expected in practice. Translate any database failure into the
            # exception the calling system understands.
            raise DatabaseFailureException

        rendered = JSONRenderer().render({'account': account_email, 'deleted': 'success'})
        return Response(data=rendered, status=status.HTTP_201_CREATED)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def fetch_og_preview(content, urls):
    """Attach the first usable OpenGraph entry among ``urls`` to ``content``.

    Each url is tried in order. A cache row modified within the last 7 days is
    reused; otherwise the page is fetched and a new cache row is created. Urls
    that cannot be parsed, carry none of title/site_name/description/image, or
    whose data is rejected with ``DataError`` are skipped.

    Returns the cache entry that was attached, or ``False`` when no url
    produced one.
    """
    def _attach(entry):
        # Point the content row at the cache entry and hand the entry back.
        Content.objects.filter(id=content.id).update(opengraph=entry)
        return entry

    for url in urls:
        # See first if recently cached already
        cutoff = now() - datetime.timedelta(days=7)
        if OpenGraphCache.objects.filter(url=url, modified__gte=cutoff).exists():
            return _attach(OpenGraphCache.objects.get(url=url))

        try:
            graph = OpenGraph(url=url, parser="lxml")
        except AttributeError:
            continue

        has_any_field = graph and any(
            key in graph for key in ("title", "site_name", "description", "image")
        )
        if not has_any_field:
            continue

        try:
            # Title falls back to the site name, then to an empty string.
            if "title" in graph:
                title = graph.title
            elif "site_name" in graph:
                title = graph.site_name
            else:
                title = ""
            description = graph.description if "description" in graph else ""
            # No image is stored for content flagged as nsfw.
            image = graph.image if "image" in graph and not content.is_nsfw else ""
            try:
                with transaction.atomic():
                    created = OpenGraphCache.objects.create(
                        url=url,
                        title=truncate_letters(safe_text(title), 250),
                        description=safe_text(description),
                        image=safe_text(image),
                    )
            except DataError:
                continue
        except IntegrityError:
            # Some other process got ahead of us
            return _attach(OpenGraphCache.objects.get(url=url))

        return _attach(created)

    return False
项目:pfb-network-connectivity    作者:azavea    | 项目源码 | 文件源码
def get(self, request, format=None, *args, **kwargs):
        """Return one neighborhood as a GeoJSON-style FeatureCollection.

        The neighborhood is identified by the ``neighborhood`` keyword argument.
        An empty response body is returned when that argument is missing or the
        query yields no row.

        :raises NotFound: when the database raises ``DataError`` for the given
                 identifier
        """
        # get the neighborhood ID from the request
        neighborhood_id = kwargs.get('neighborhood', '')
        if not neighborhood_id:
            return Response({})

        # The identifier is passed as a query parameter, never interpolated.
        query = """
        SELECT row_to_json(fc)
        FROM (
            SELECT 'FeatureCollection' AS type,
                array_to_json(array_agg(f)) AS features
            FROM (SELECT 'Feature' AS type,
                  ST_AsGeoJSON(g.geom_simple)::json AS geometry,
                  g.uuid AS id,
                  row_to_json((SELECT p FROM (
                    SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p))
                    AS properties
            FROM pfb_analysis_neighborhood AS g WHERE g.uuid = %s) AS f) AS fc;
        """

        try:
            with connection.cursor() as cursor:
                cursor.execute(query, [neighborhood_id])
                row = cursor.fetchone()
        except DataError:
            raise NotFound(detail='{} is not a valid neighborhood UUID.'.format(neighborhood_id))

        if not row:
            return Response({})
        return Response(row[0])
项目:django-modeltrans    作者:zostera    | 项目源码 | 文件源码
def test_textfield(self):
        '''
        Constraints on the original field should also be enforced on the
        translated virtual fields (except for null/blank).

        Note that the database constraints are not enforced on the virtual fields,
        because those are ignored by Django.
        '''

        expected_message = 'value too long for type character varying(50)'

        short_str = 'bla bla'
        long_str = 'bla' * 40

        # The failing INSERT runs in its own atomic block so the statements that
        # follow are not executed inside a broken transaction.
        with transaction.atomic():
            with self.assertRaisesMessage(DataError, expected_message):
                TextModel.objects.create(title=long_str)

        with self.assertRaises(ValidationError) as e:
            b = TextModel.objects.create(title=short_str, title_nl=long_str)
            b.full_clean()

        # assertEqual replaces the deprecated alias assertEquals, which newer
        # Python versions no longer provide.
        self.assertEqual(sorted(e.exception, key=lambda v: v[0]), [
            ('description', ['This field cannot be blank.']),
            ('title_nl', ['Ensure this value has at most 50 characters (it has 120).']),
        ])

        TextModel.objects.create(title=short_str, description=long_str)

        m = TextModel.objects.create(
            title=short_str,
            description=short_str,
            description_nl=long_str,
            description_de=long_str
        )
        self.assertEqual(m.description_nl, long_str)
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def qs_exists(queryset):
    """Return ``queryset.exists()``, or ``False`` if the check itself fails.

    A ``TypeError``, ``ValueError`` or ``DataError`` raised while evaluating
    the queryset is treated as "nothing matches".
    """
    try:
        found = queryset.exists()
    except (TypeError, ValueError, DataError):
        found = False
    return found
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def qs_filter(queryset, **kwargs):
    """Return ``queryset.filter(**kwargs)``, or ``queryset.none()`` on failure.

    A ``TypeError``, ``ValueError`` or ``DataError`` raised by the filter call
    results in the queryset's ``none()`` value being returned instead.
    """
    try:
        narrowed = queryset.filter(**kwargs)
    except (TypeError, ValueError, DataError):
        narrowed = queryset.none()
    return narrowed
项目:django-adldap-sync    作者:marchete    | 项目源码 | 文件源码
def sync_ldap_groups(self, ldap_groups):
        """Create a local group for every LDAP group that is not present yet.

        ``ldap_groups`` is iterated as ``(cname, attributes)`` pairs. The first
        value of each attribute is decoded as UTF-8 and stored under the model
        field name taken from ``self.conf_LDAP_SYNC_GROUP_ATTRIBUTES``. Groups
        are looked up case-insensitively by name.

        Updates ``stats_group_total``, ``stats_group_errors`` and
        ``stats_group_added`` on ``self``.
        """
        groupname_field = 'name'
        self.stats_group_total = len(ldap_groups)

        for cname, ldap_attributes in ldap_groups:
            defaults = {}
            try:
                for ldap_name, values in ldap_attributes.items():
                    # Decode first, then resolve the target field name.
                    decoded = values[0].decode('utf-8')
                    defaults[self.conf_LDAP_SYNC_GROUP_ATTRIBUTES[ldap_name]] = decoded
            except AttributeError:
                # In some cases attrs is a list instead of a dict; skip these invalid groups
                continue

            if groupname_field not in defaults:
                logger.warning("Group is missing a required attribute '%s'" % groupname_field)
                self.stats_group_errors += 1
                continue
            groupname = defaults[groupname_field]

            lookup = {groupname_field + '__iexact': groupname, 'defaults': defaults}
            try:
                _, created = Group.objects.get_or_create(**lookup)
            except (IntegrityError, DataError) as e:
                logger.error("Error creating group %s: %s" % (groupname, e))
                self.stats_group_errors += 1
                continue

            if created:
                self.stats_group_added += 1
                logger.debug("Created group %s" % groupname)

        logger.info("Groups are synchronized")
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def _create_object_from_params(self, lookup, params):
        """
        Tries to create an object using passed params.
        Used by get_or_create and update_or_create
        """
        try:
            obj = self.create(**params)
            return obj, True
        except (IntegrityError, DataError):
            exc_info = sys.exc_info()
            try:
                return self.get(**lookup), False
            except self.model.DoesNotExist:
                pass
            six.reraise(*exc_info)
项目:transportation-backend    作者:hackoregon    | 项目源码 | 文件源码
def get_queryset(self,):
        """Build the Feature queryset selected by the request's query parameters.

        Query parameters read from ``self.request.query_params``:

        - ``source_name``: keep only features with this source name.
        - ``startDate`` / ``endDate``: keep features whose ``canonical_daterange``
          overlaps the requested range. With only ``startDate`` the range ends
          three days after it. With only ``endDate`` the default range is used.
        - ``showNulls``: when no date is given and this is not set, features
          with a null or empty ``canonical_daterange`` are excluded.

        The default range starts two days before today and spans 180 days.
        """
        params = self.request.query_params

        showNulls = params.get('showNulls', None)

        sourceName = params.get('source_name', None)

        filteredFeatures = Feature.objects.all()
        if sourceName:
            filteredFeatures = filteredFeatures.filter(source_name=sourceName)

        defaultRange = 180
        defaultStart = datetime.date.today() - datetime.timedelta(days=2)
        defaultEnd = defaultStart + datetime.timedelta(days=defaultRange)
        startDate = params.get('startDate', None)
        endDate = params.get('endDate', None)

        if startDate or endDate:
            if startDate and endDate:
                # NOTE(review): DataError is only caught around building the
                # range object; confirm whether it can be raised here or only
                # when the query is evaluated.
                try:
                    queryDateRange = DateRange(lower=startDate, upper=endDate)
                except DataError:
                    queryDateRange = DateRange(lower=defaultStart, upper=defaultEnd)
            elif startDate:
                calcEnd = parser.parse(startDate) + datetime.timedelta(days=3)
                queryDateRange = DateRange(lower=startDate, upper=calcEnd)
            else:
                # Only endDate was supplied: its value is not used.
                queryDateRange = DateRange(lower=defaultStart, upper=defaultEnd)

            filteredFeatures = filteredFeatures.filter(canonical_daterange__overlap=queryDateRange)

        elif not showNulls:
            filteredFeatures = filteredFeatures.exclude(canonical_daterange=None).exclude(canonical_daterange__isempty=True)

        # The former debug print of filteredFeatures.count() was removed: it
        # wrote to stdout and ran an extra count query on every call.
        return filteredFeatures
项目:django-oauth2-test    作者:ONSdigital    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
        """
        Persist the user attached to the request and report the outcome.

        :param request: request object carrying the ``user`` to save
        :param args: unused positional arguments
        :param kwargs: unused keyword arguments
        :return: Response whose data is the JSON-rendered context
                 ``{'account': <email>, 'created': 'success'}``, sent with
                 ``status.HTTP_201_CREATED``
        :raises DatabaseFailureException: when saving raises one of the caught
                 database errors
        """
        stdlogger.debug("Hitting HTTP POST account view")

        # Saving can fail an integrity check if another system stored this user
        # before this call runs.
        try:
            request.user.save()
        except (IntegrityError, InternalError, DataError, DatabaseError):
            # Not expected in practice. Tell the calling system that the
            # resource could not be created.
            raise DatabaseFailureException

        rendered = JSONRenderer().render({'account': request.user.email, 'created': 'success'})
        return Response(data=rendered, status=status.HTTP_201_CREATED)
项目:django-oauth2-test    作者:ONSdigital    | 项目源码 | 文件源码
def put(self, request):
        """
        Save the user attached to the request, optionally changing its e-mail.

        When ``request.new_username`` is set, the user's e-mail is replaced with
        that value before saving; the user object itself stays the same.

        :param request: request object carrying ``user`` and ``new_username``
        :return: Response whose data is the JSON-rendered context
                 ``{'account': <email>, 'updated': 'success'}``, sent with
                 ``status.HTTP_201_CREATED``
        :raises DatabaseFailureException: when saving raises one of the caught
                 database errors
        """
        stdlogger.debug("Hitting HTTP PUT account view")

        # Saving can fail an integrity check if another system stored this user
        # before this call runs.
        try:
            # A new username means the e-mail changes while the user object and
            # its primary key are kept.
            if request.new_username:
                stdlogger.info("Admin is updating a user ID to a new value")
                request.user.email = request.new_username
            request.user.save()
        except (IntegrityError, InternalError, DataError, DatabaseError):
            # Not expected in practice. Tell the calling system that the
            # resource could not be saved.
            raise DatabaseFailureException

        rendered = JSONRenderer().render({'account': request.user.email, 'updated': 'success'})
        return Response(data=rendered, status=status.HTTP_201_CREATED)
项目:catchpy    作者:nmaekawa    | 项目源码 | 文件源码
def _create_from_webannotation(cls, catcha, preserve_create=False):
        '''Build an annotation from ``catcha`` and store it with targets and tags.

        Everything is written inside one atomic transaction. ``catcha`` is
        modified in place: ``totalReplies`` is set to 0, and ``created`` is
        written into the stored raw copy.

        When ``preserve_create`` is true, the creation time obtained from
        ``cls._get_original_created(catcha)`` replaces the automatic one.

        Returns the saved annotation. Raises ``DuplicateAnnotationIdError`` on
        ``IntegrityError`` and ``InvalidInputWebAnnotationError`` on
        ``DataError``.
        '''
        # fetch reply-to if it's a reply
        body = cls._group_body_items(catcha)

        # fill up derived properties in catcha
        catcha['totalReplies'] = 0

        try:
            with transaction.atomic():
                anno = Anno(
                    anno_id=catcha['id'],
                    schema_version=catcha['schema_version'],
                    creator_id=catcha['creator']['id'],
                    creator_name=catcha['creator']['name'],
                    anno_reply_to=body['reply_to'],
                    can_read=catcha['permissions']['can_read'],
                    can_update=catcha['permissions']['can_update'],
                    can_delete=catcha['permissions']['can_delete'],
                    can_admin=catcha['permissions']['can_admin'],
                    body_text=body['text'],
                    body_format=body['format'],
                    raw=catcha,
                )

                # validate target objects before anything is written
                targets = cls._create_targets_for_annotation(anno, catcha)

                # the annotation must be saved before relationships are set
                anno.save()
                for target in targets:
                    target.save()
                anno.anno_tags = cls._create_taglist(body['tags'])

                # warn: order is important, update "created" after the first
                # save, or it won't take effect - first save is auto-now_add
                if preserve_create:
                    anno.created = cls._get_original_created(catcha)

                anno.raw['created'] = anno.created.replace(microsecond=0).isoformat()
                anno.save()
        except IntegrityError as e:
            msg = 'integrity error creating anno({}): {}'.format(
                catcha['id'], e)
            logger.error(msg, exc_info=True)
            raise DuplicateAnnotationIdError(msg)
        except DataError:
            msg = 'tag too long for anno({})'.format(catcha['id'])
            logger.error(msg, exc_info=True)
            raise InvalidInputWebAnnotationError(msg)

        return anno
项目:catchpy    作者:nmaekawa    | 项目源码 | 文件源码
def _update_from_webannotation(cls, anno, catcha):
        '''Overwrite ``anno`` with the values in ``catcha`` and save it.

        Targets and tags are rebuilt from scratch on every call, inside one
        atomic transaction. ``catcha`` is modified in place: ``totalReplies``
        and ``id`` are taken from the existing annotation.

        Returns the updated annotation. Raises
        ``InvalidInputWebAnnotationError`` when the database rejects the change.
        '''
        # fetch reply-to if it's a reply
        body = cls._group_body_items(catcha)

        # fill up derived properties in catcha
        catcha['totalReplies'] = anno.total_replies
        catcha['id'] = anno.anno_id

        # copy the incoming values onto the annotation object
        anno.schema_version = catcha['schema_version']
        anno.creator_id = catcha['creator']['id']
        anno.creator_name = catcha['creator']['name']
        anno.anno_reply_to = body['reply_to']
        anno.can_read = catcha['permissions']['can_read']
        anno.can_update = catcha['permissions']['can_update']
        anno.can_delete = catcha['permissions']['can_delete']
        anno.can_admin = catcha['permissions']['can_admin']
        anno.body_text = body['text']
        anno.body_format = body['format']
        anno.raw = catcha

        try:
            with transaction.atomic():
                # validate the new targets before the old ones are removed
                new_targets = cls._create_targets_for_annotation(anno, catcha)
                cls._delete_targets(anno)
                for target in new_targets:
                    target.save()

                # drop the current tags, then attach the new ones if any
                anno.anno_tags.clear()
                if body['tags']:
                    anno.anno_tags = cls._create_taglist(body['tags'])
                anno.save()
        except (IntegrityError, DataError, DatabaseError) as e:
            # NOTE(review): the message says "create" although this is the
            # update path; left unchanged.
            msg = '-failed to create anno({}): {}'.format(anno.anno_id, str(e))
            logger.error(msg, exc_info=True)
            raise InvalidInputWebAnnotationError(msg)

        return anno