Python django.db.models.aggregates 模块,Count() 实例源码

我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用django.db.models.aggregates.Count()

项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def get_usable_luns(cls, queryset):
    """
    Narrow ``queryset`` down to the Luns that can be used as a Target.

    Starts from ``cls.get_unused_luns(queryset)`` and only considers
    VolumeNodes whose host has ``not_deleted=True``. A Lun is kept when
    either:

    * exactly one VolumeNode was counted for it (no HA available, but the
      mount location is unambiguous), or
    * at least one of its VolumeNodes is flagged as primary (so the
      primary mount location is known).
    """
    unused = cls.get_unused_luns(queryset)
    on_live_hosts = unused.filter(volumenode__host__not_deleted=True)
    annotated = on_live_hosts.annotate(
        has_primary=BoolOr('volumenode__primary'),
        num_volumenodes=Count('volumenode'),
    )

    single_node = Q(num_volumenodes=1)
    primary_known = Q(has_primary=True)
    return annotated.filter(single_node | primary_known)
项目:django-datawatch    作者:RegioHelden    | 项目源码 | 文件源码
def get_stats(self):
    """
    Group rows by ``status`` and count the ids in each group as ``amount``.

    The grouped result is passed through ``with_status_name()`` before
    being returned.
    """
    per_status = self.values('status').annotate(amount=Count('id'))
    return per_status.with_status_name()
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Run a COUNT(*) over this query with its current filter constraints.

    The annotation is added to a clone, so this query object is left
    untouched. ``using`` is forwarded to ``get_aggregation``. A ``None``
    aggregate is reported as 0.
    """
    counting_query = self.clone()
    counting_query.add_annotation(Count('*'), alias='__count', is_summary=True)
    total = counting_query.get_aggregation(using, ['__count'])['__count']
    return 0 if total is None else total
项目:fantasie    作者:shawn1m    | 项目源码 | 文件源码
def get_random_song_list(self, n):
    """
    Return a plain list (not a QuerySet) of ``n`` randomly chosen rows.

    Each pick draws an independent random offset, so the same row may
    appear more than once. Raises ``MusicLibrary.DoesNotExist`` when a
    pick is attempted and there are no rows to choose from.
    """
    # NOTE(review): the original comment here was mis-encoded; it appears to
    # describe this as an alternative to order_by('?') - confirm with the author.
    total = self.aggregate(count=Count('id'))['count']
    picked = []
    for _ in range(n):
        try:
            offset = randint(0, total - 1)
        except ValueError:
            # randint rejects the empty range 0..-1, i.e. there are no rows.
            raise MusicLibrary.DoesNotExist
        picked.append(self.all()[offset])
    return picked
项目:ODM2WebSDL    作者:ODM2    | 项目源码 | 文件源码
def fix_results_value_count():
    """
    Recompute ``value_count`` on every Result and save it.

    Each Result is annotated with the count of ``timeseriesresult__values``;
    that number is copied onto ``value_count`` and the row is saved.
    """
    annotated_results = Result.objects.annotate(
        number_of_values=Count('timeseriesresult__values'),
    )
    for row in annotated_results:
        row.value_count = row.number_of_values
        row.save()
项目:verbes    作者:larose    | 项目源码 | 文件源码
def random(self, user):
    """
    Return one conjugation picked at a random offset.

    For a non-anonymous user who has at least one MoodTense linked to
    them, the pool is restricted to conjugations whose ``mood_tense`` is
    one of those; otherwise every conjugation in ``self`` is eligible.

    :param user: the requesting user; must provide ``is_anonymous()``
    :return: a single row from the (possibly restricted) pool, ordered by id
    :raises ValueError: when the pool is empty (``randint`` gets an empty range)
    """
    conjugations = self
    # NOTE(review): is_anonymous is called as a method here; newer Django
    # versions expose it as a property - confirm the targeted version.
    if not user.is_anonymous():
        mood_tenses = MoodTense.objects.filter(users=user)
        # exists() only asks whether a row is present instead of counting all
        # of them, and the same lazy queryset is reused for the __in lookup
        # rather than being built a second time.
        if mood_tenses.exists():
            conjugations = conjugations.filter(mood_tense__in=mood_tenses)

    count = conjugations.aggregate(count=Count('id'))['count']
    random_index = random.randint(0, count - 1)
    # A fixed ordering makes the offset refer to a well-defined row.
    return conjugations.order_by('id')[random_index]
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def filter_popular(self, queryset, value):
    """
    Recommend content that is popular with all users.

    With fewer than 50 session logs there is not enough data to rank
    anything, so a random sample of up to 25 non-topic nodes is returned
    instead. Otherwise the nodes whose ``content_id`` is among the 10 most
    frequently logged content ids are returned, and that result is cached
    for 10 minutes.

    :param queryset: all content nodes for this channel
    :param value: id of currently logged in user, or none if user is anonymous
        (not used by this filter)
    :return: queryset of randomly sampled nodes, or of the nodes matching
        the 10 most-logged content ids
    """
    if ContentSessionLog.objects.count() < 50:
        # Not enough session logs: fall back to random non-topic nodes.
        pks = queryset.values_list('pk', flat=True).exclude(kind=content_kinds.TOPIC)
        # The ids have to be materialised for sample() anyway, so measure the
        # list instead of issuing a separate COUNT query that scales with
        # table size.
        pk_list = list(pks)
        count_cache_key = 'content_count_for_popular'
        # NOTE(review): nothing in this function writes this key; presumably
        # it is populated elsewhere - verify, or drop the lookup.
        count = cache.get(count_cache_key) or min(len(pk_list), 25)
        # sample() raises ValueError if asked for more items than exist, so
        # never trust a cached count beyond the actual population.
        count = min(count, len(pk_list))
        return queryset.filter(pk__in=sample(pk_list, count))

    cache_key = 'popular_content'
    # Read the cache once: a second get() could miss if the entry expired
    # between the check and the return, handing None back to the caller.
    cached_popular = cache.get(cache_key)
    if cached_popular:
        return cached_popular

    # get the most accessed content nodes
    content_counts_sorted = ContentSessionLog.objects \
        .values_list('content_id', flat=True) \
        .annotate(Count('content_id')) \
        .order_by('-content_id__count')

    most_popular = queryset.filter(content_id__in=list(content_counts_sorted[:10]))

    # cache the popular results queryset for 10 minutes, for efficiency
    cache.set(cache_key, most_popular, 60 * 10)
    return most_popular
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def data(self):
    """
    Annotate the base query with summed minutes and a distinct user count.

    ``minutes`` is the Sum of the ``minutes`` field; ``users`` counts each
    ``user`` only once.
    """
    base_query = self.get_query()
    total_minutes = Sum('minutes')
    distinct_users = Count('user', distinct=True)
    return base_query.annotate(minutes=total_minutes, users=distinct_users)
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get_queryset(self, request):
    """
    Extend the parent queryset with participant and message counts.

    Both counts use ``distinct=True``; they are added as
    ``participants_count`` and ``messages_count`` in two separate
    annotate steps.
    """
    base = super().get_queryset(request)
    with_participants = base.annotate(
        participants_count=Count('participants', distinct=True),
    )
    return with_participants.annotate(
        messages_count=Count('messages', distinct=True),
    )
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get_queryset(self, request):
    """
    Extend the parent queryset with ``applications_count``.

    The value is the Count of the ``instances`` relation for each row.
    """
    base = super().get_queryset(request)
    return base.annotate(applications_count=Count('instances'))
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get_queryset(self, request):
    """
    Extend the parent queryset with ``terms_count``.

    The value is the Count of the ``terms`` relation for each row.
    """
    base = super().get_queryset(request)
    return base.annotate(terms_count=Count('terms'))
项目:vaultier    作者:Movile    | 项目源码 | 文件源码
def _get_unrecoverable_nodes(self, user):
    """
    Select the nodes ``user`` belongs to that have at most one membership.

    Nodes are looked up through the user's Member rows, annotated with
    their ``membership`` count (stored under the name ``is_recoverable``),
    and any node with a count greater than 1 is excluded.

    :param user: the user whose nodes are inspected
    :return: QuerySet
    """
    member_cls = get_model('accounts', 'Member')
    node_cls = get_model('nodes', 'Node')

    user_node_ids = member_cls.objects.filter(user=user).values_list(
        'node_id', flat=True)
    user_nodes = node_cls.objects.filter(pk__in=user_node_ids)
    counted = user_nodes.annotate(is_recoverable=Count('membership'))
    return counted.exclude(is_recoverable__gt=1)
项目:ewe_ebooks    作者:jaymcgrath    | 项目源码 | 文件源码
def random(self):
    """
    Return one row chosen at a random offset.

    The offset is drawn from 0..count-1, where count is the number of ids
    in this queryset; ``randint`` raises ValueError if there are none.
    """
    total = self.aggregate(count=Count('id'))['count']
    offset = randint(0, total - 1)
    rows = self.all()
    return rows[offset]
项目:ewe_ebooks    作者:jaymcgrath    | 项目源码 | 文件源码
def random(self):
    """
    Pick a single row by indexing the queryset at a random position.

    The position lies between 0 and the id count minus one; with no rows
    the range is empty and ``randint`` raises ValueError.
    """
    row_count = self.aggregate(count=Count('id'))['count']
    last_position = row_count - 1
    position = randint(0, last_position)
    return self.all()[position]
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Count the rows matched by the current filter constraints.

    A clone of this query receives a summary ``Count('*')`` annotation
    under the alias ``__count`` and is aggregated via ``using``. If the
    aggregate is ``None``, 0 is returned.
    """
    alias = '__count'
    clone = self.clone()
    clone.add_annotation(Count('*'), alias=alias, is_summary=True)
    result = clone.get_aggregation(using, [alias])[alias]
    if result is not None:
        return result
    return 0
项目:ssbc    作者:banwagong-news    | 项目源码 | 文件源码
def top_hourly(self):
    """
    Rank keywords by distinct IPs seen during the last hour.

    Rows with ``log_time`` within the past hour are grouped by ``keyword``;
    ``pv`` is the distinct ``ip`` count per group, sorted descending.
    """
    window_start = timezone.now() - datetime.timedelta(hours=1)
    recent = self.filter(log_time__gte=window_start)
    per_keyword = recent.values('keyword').annotate(pv=Count('ip', distinct=True))
    return per_keyword.order_by('-pv')
项目:ssbc    作者:banwagong-news    | 项目源码 | 文件源码
def top_daily(self):
    """
    Rank keywords by distinct IPs seen during the last day.

    Rows with ``log_time`` within the past day are grouped by ``keyword``;
    ``pv`` is the distinct ``ip`` count per group, sorted descending.
    """
    window_start = timezone.now() - datetime.timedelta(days=1)
    recent = self.filter(log_time__gte=window_start)
    per_keyword = recent.values('keyword').annotate(pv=Count('ip', distinct=True))
    return per_keyword.order_by('-pv')
项目:ssbc    作者:banwagong-news    | 项目源码 | 文件源码
def top_hourly(self):
    """
    Rank ``hash_id`` values by distinct IPs seen during the last hour.

    Rows with ``log_time`` within the past hour are grouped by ``hash_id``;
    ``pv`` is the distinct ``ip`` count per group, sorted descending.
    """
    one_hour = datetime.timedelta(hours=1)
    since = timezone.now() - one_hour
    grouped = self.filter(log_time__gte=since).values('hash_id')
    ranked = grouped.annotate(pv=Count('ip', distinct=True)).order_by('-pv')
    return ranked
项目:ssbc    作者:banwagong-news    | 项目源码 | 文件源码
def top_daily(self):
    """
    Rank ``hash_id`` values by distinct IPs seen during the last day.

    Rows with ``log_time`` within the past day are grouped by ``hash_id``;
    ``pv`` is the distinct ``ip`` count per group, sorted descending.
    """
    one_day = datetime.timedelta(days=1)
    since = timezone.now() - one_day
    grouped = self.filter(log_time__gte=since).values('hash_id')
    ranked = grouped.annotate(pv=Count('ip', distinct=True)).order_by('-pv')
    return ranked
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Run a COUNT(*) over this query with its current filter constraints.

    The annotation is added to a clone, so this query object is left
    untouched. ``using`` is forwarded to ``get_aggregation``. A ``None``
    aggregate is reported as 0.
    """
    counting_query = self.clone()
    counting_query.add_annotation(Count('*'), alias='__count', is_summary=True)
    total = counting_query.get_aggregation(using, ['__count'])['__count']
    return 0 if total is None else total
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Count the rows matched by the current filter constraints.

    A clone of this query receives a summary ``Count('*')`` annotation
    under the alias ``__count`` and is aggregated via ``using``. If the
    aggregate is ``None``, 0 is returned.
    """
    alias = '__count'
    clone = self.clone()
    clone.add_annotation(Count('*'), alias=alias, is_summary=True)
    result = clone.get_aggregation(using, [alias])[alias]
    if result is not None:
        return result
    return 0
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Run a COUNT(*) over this query with its current filter constraints.

    The annotation is added to a clone, so this query object is left
    untouched. ``using`` is forwarded to ``get_aggregation``. A ``None``
    aggregate is reported as 0.
    """
    counting_query = self.clone()
    counting_query.add_annotation(Count('*'), alias='__count', is_summary=True)
    aggregates = counting_query.get_aggregation(using, ['__count'])
    total = aggregates['__count']
    return 0 if total is None else total
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Count the rows matched by the current filter constraints.

    A clone of this query receives a summary ``Count('*')`` annotation
    under the alias ``__count`` and is aggregated via ``using``. If the
    aggregate is ``None``, 0 is returned.
    """
    alias = '__count'
    clone = self.clone()
    clone.add_annotation(Count('*'), alias=alias, is_summary=True)
    result = clone.get_aggregation(using, [alias])[alias]
    if result is not None:
        return result
    return 0
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def get_count(self, using):
    """
    Run a COUNT(*) over this query with its current filter constraints.

    The annotation is added to a clone, so this query object is left
    untouched. ``using`` is forwarded to ``get_aggregation``. A ``None``
    aggregate is reported as 0.
    """
    counting_query = self.clone()
    counting_query.add_annotation(Count('*'), alias='__count', is_summary=True)
    total = counting_query.get_aggregation(using, ['__count'])['__count']
    return 0 if total is None else total
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def data(self):
    """
    Annotate this product's query with the maximum ``complete`` value.

    The base query comes from ``get_query(product=self.product)``; the
    annotation is stored under the name ``complete``.
    """
    product_query = self.get_query(product=self.product)
    return product_query.annotate(complete=Max('complete'))

#def wiki_registrations():
#    return (
#        MediaWikiUser.objects
#        .annotate(year=Substr('registration', 1, 4))
#        .order_by('year')
#        .values('year')
#        .annotate(
#            users=Count('id', distinct=True)
#        )
#    )
#
#
#def wiki_revisions():
#    return (
#        Revision.objects
#        .annotate(year=Substr('rev_timestamp', 1, 4))
#        .order_by('year')
#        .values('year')
#        .annotate(
#            revisions=Count('id')
#        )
#    )
#
#
#def wiki_registrations_year(year):
#    return (
#        MediaWikiUser.objects
#        .annotate(
#            year=Substr('registration', 1, 4),
#            month=Substr('registration', 5, 2)
#        )
#        .filter(year=str(year).encode())
#        .order_by('month')
#        .values('month')
#        .annotate(
#            users=Count('id', distinct=True)
#        )
#    )
#
#
#def wiki_revisions_year(year):
#    return (
#        Revision.objects
#        .annotate(
#            year=Substr('rev_timestamp', 1, 4),
#            month=Substr('rev_timestamp', 5, 2)
#        )
#        .filter(year=str(year).encode())
#        .order_by('month')
#        .values('month')
#        .annotate(
#            revisions=Count('id')
#        )
#    )