Python django.db.models.aggregates 模块,Max() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用django.db.models.aggregates.Max()

项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        """Normalise the content's type/ordering fields, then persist it.

        Raises:
            ValueError: if both ``parent`` and ``share_of`` are set.

        Replies inherit the parent's visibility and are never pinned. For an
        instance without a primary key, a missing ``guid`` is filled with a
        fresh ``uuid4()`` and pinned content is ordered after the author's
        current highest ``order`` among top-level content.
        """
        if self.parent and self.share_of:
            raise ValueError("Can't be both a reply and a share!")
        self.cache_data()

        if self.parent:
            # Replies mirror their parent's visibility and cannot be pinned.
            self.content_type = ContentType.REPLY
            self.visibility = self.parent.visibility
            self.pinned = False
        elif self.share_of:
            self.content_type = ContentType.SHARE

        is_new = not self.pk
        if is_new and not self.guid:
            self.guid = uuid4()
        if is_new and self.pinned:
            authored_top_level = Content.objects.top_level().filter(author=self.author)
            highest_order = authored_top_level.aggregate(Max("order"))["order__max"]
            # The aggregate yields None when no matching content exists yet;
            # in that case ``order`` is left untouched.
            if highest_order is not None:
                self.order = highest_order + 1

        self.fix_local_uploads()
        super().save(*args, **kwargs)
        self.cache_related_object_data()
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def search_shops_on_forum(force=False):
    """Crawl the forum member listing and collect links from member signatures.

    Args:
        force: when true, start from listing page 1; otherwise resume from
            the highest ``page_number`` already stored on ``Member`` (or
            page 1 when nothing is stored yet).

    Each listed member is stored with ``get_or_create`` keyed on its link.
    Afterwards every member whose ``page_number`` is at or beyond the start
    page has its profile fetched, and each signature link is stored as a
    ``ShopLink``.
    """
    # Get member pages
    step = 500
    # aggregate() returns a dict ({'page_number__max': value}); the value is
    # None when there are no rows, so it must be unpacked before use.
    max_page = Member.objects.aggregate(Max('page_number'))['page_number__max']
    last_page = page_number = 1 if force or not max_page else max_page
    page_url = 'http://www.prestashop.com/forums/members/page__sort_key__members_display_name__sort_order__asc__max_results__%d__st__%d' % (step, (last_page-1)*step)
    while page_url:
        page = document_fromstring(urllib2.urlopen(page_url).read())
        # NOTE(review): ':first' looks like a jQuery-style pseudo-class;
        # confirm the selector engine in use accepts it.
        for member in page.cssselect('ul.members li h3.bar a:first'):
            # member url
            Member.objects.get_or_create(link=member.get('href'), defaults={'page_number': page_number})
        # cssselect() returns a list of elements; follow the first "next"
        # link and stop when there is none.
        next_links = page.cssselect('ul.pagination.left li.next a')
        page_url = next_links[0].get('href') if next_links else None
        page_number += 1

    for member in Member.objects.filter(page_number__gte=last_page):
        member_page = document_fromstring(urllib2.urlopen(member.link).read())
        for link in member_page.cssselect('div.general_box div.signature a'):
            ShopLink.objects.get_or_create(link=link.get('href'), member=member)
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def search_shops_on_rus_forum(force=False):
    """Walk forum profile pages by numeric id and collect member links.

    Args:
        force: when true, start from profile id 1; otherwise resume from the
            highest ``page_number`` already stored on ``MemberRus`` (or 1
            when nothing is stored yet).

    Profiles whose expected elements are missing are skipped. For every
    other profile a ``MemberRus`` row is fetched or created and each
    matching anchor on the page is stored as a ``ShopLinkRus``.
    """
    # aggregate() returns a dict ({'page_number__max': value}); the value is
    # None when there are no rows, so it must be unpacked before use.
    max_page = MemberRus.objects.aggregate(Max('page_number'))['page_number__max']
    last_page = 1 if force or not max_page else max_page

    for i in range(last_page, 4219):
        page_url = 'http://prestadev.ru/forum/profile.php?u='+str(i)
        page = document_fromstring(urllib2.urlopen(page_url).read())
        messages = 0
        try:
            messages = int(page.cssselect('div.wttborder td strong')[2].text.strip())
        except (IndexError, AttributeError, TypeError, ValueError):
            # Best effort: a missing, empty or non-numeric counter keeps the
            # default of 0 instead of aborting the crawl.
            pass
        try:
            # NOTE(review): 'home_page' receives the selected element itself,
            # not its text or href -- confirm this is what the model expects.
            params = {'title': page.cssselect('#profilename')[0].text.strip(),
                  'messages': messages,
                  'page_number': i,
                  'home_page':page.cssselect('div.wttborder td.row1')[4]}
        except IndexError:
            continue
        member = MemberRus.objects.get_or_create(**params)[0]
        for link in page.cssselect('div.wgborder td.row1 a'):
            ShopLinkRus.objects.get_or_create(link=link.get('href'), member=member)
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def range_statistics(start, end):
    """ Returns aggregated day statistics for days where start <= day < end.

    The result is the dict produced by ``aggregate()``: sums for the cost,
    electricity and gas fields, combined sums for the two electricity
    fields of each kind, and the min / max / average of the temperature
    fields.
    """
    days_in_range = DayStatistics.objects.filter(day__gte=start, day__lt=end)

    aggregates = {}
    # Plain per-field totals are stored under the field's own name.
    for field in (
        'total_cost',
        'electricity1', 'electricity1_cost', 'electricity1_returned',
        'electricity2', 'electricity2_cost', 'electricity2_returned',
    ):
        aggregates[field] = Sum(field)

    # Combined totals across both electricity fields.
    aggregates['electricity_merged'] = Sum(models.F('electricity1') + models.F('electricity2'))
    aggregates['electricity_cost_merged'] = Sum(
        models.F('electricity1_cost') + models.F('electricity2_cost')
    )
    aggregates['electricity_returned_merged'] = Sum(
        models.F('electricity1_returned') + models.F('electricity2_returned')
    )

    for field in ('gas', 'gas_cost'):
        aggregates[field] = Sum(field)

    aggregates['temperature_min'] = Min('lowest_temperature')
    aggregates['temperature_max'] = Max('highest_temperature')
    aggregates['temperature_avg'] = Avg('average_temperature')

    return days_in_range.aggregate(**aggregates)
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def annotate_channel_queryset_with_latest_activity_at(queryset, user):
    """Annotate ``queryset`` with ``latest_activity_at``.

    Args:
        queryset: queryset to annotate; it is first annotated with
            ``latest_activity_timestamp`` (the max ``action_targets__timestamp``).
        user: accepted but not used by this function.

    Returns:
        The queryset with ``latest_activity_at`` set to the latest activity
        timestamp when that is later than ``created_at``, and to
        ``created_at`` otherwise (including when there is no activity).
    """
    with_timestamp = queryset.annotate(
        latest_activity_timestamp=Max('action_targets__timestamp'),
    )

    no_activity = When(latest_activity_timestamp__isnull=True, then='created_at')
    activity_after_creation = When(
        latest_activity_timestamp__gt=F('created_at'),
        then='latest_activity_timestamp'
    )
    latest_activity_at = Case(
        no_activity,
        activity_after_creation,
        default='created_at',
        output_field=DateTimeField()
    )
    return with_timestamp.annotate(latest_activity_at=latest_activity_at)
项目:zing    作者:evernote    | 项目源码 | 文件源码
def max_pk():
        """Returns the sum of all the highest PKs for each submodel.

        A submodel without rows contributes 0, and the result is 0 when
        ``AbstractPage`` has no subclasses.
        """
        # Name the aggregate and read it by key: dict.values() cannot be
        # indexed on Python 3. sum() also copes with an empty sequence,
        # which a reduce() without an initial value does not.
        return sum(
            int(p.objects.aggregate(highest_pk=Max('pk'))['highest_pk'] or 0)
            for p in AbstractPage.__subclasses__()
        )
项目:wifi-attendance    作者:elvinzeng    | 项目源码 | 文件源码
def get(self, request):
        """Render the attendance history for a logged-in user, or the authentication page.

        Both branches pass ``locals()`` as the template context, so the local
        variable names used here are what the templates receive.
        """
        # NOTE(review): is_authenticated is invoked as a method here; newer
        # Django releases expose it as a property -- confirm the Django version.
        if request.user.is_authenticated():
            # Rows whose mac equals the username, grouped by date, with the
            # earliest and latest time per date, newest date first.
            histories = OnlineHistory.objects.filter(mac=request.user.username)\
                .values('date').annotate(min_time=Min('time'), max_time=Max('time')).order_by("-date")
            return render(request, "index.html", locals())
        else:
            # Generate a fresh token and store it in the session before
            # rendering the authentication page.
            verification_token = str(uuid.uuid4())
            request.session["verification_token"] = verification_token
            return render(request, "authentication.html", locals())
项目:wifi-attendance    作者:elvinzeng    | 项目源码 | 文件源码
def get(self, request):
        """Render the staff-wide attendance history, or a message page without the permission.

        Both branches pass ``locals()`` as the template context, so the local
        variable names used here are what the templates receive.
        """
        if request.user.has_perm("mobile_scanner.view_staffonlinehistory"):
            # All rows, grouped by date and user name, with the earliest and
            # latest time per group, newest date first.
            histories = OnlineHistory.objects.filter()\
                .values('date', 'user__last_name', 'user__first_name').annotate(min_time=Min('time'), max_time=Max('time')).order_by("-date")
            return render(request, "staff.html", locals())
        else:
            # NOTE(review): the message text below looks like it lost its
            # original characters to an encoding problem -- confirm against
            # the upstream source.
            msg = "?????"
            return render(request, "msg.html", locals())
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get_queryset(self, request):
        """Return the parent queryset annotated with ``minutes`` and ``last_log``.

        ``minutes`` is the sum of ``entries__minutes`` (0 when the sum is
        NULL); ``last_log`` is the max of ``entries__updated``.
        """
        base = super().get_queryset(request)
        total_minutes = Coalesce(Sum('entries__minutes'), 0)
        with_minutes = base.annotate(minutes=total_minutes)
        return with_minutes.annotate(last_log=Max('entries__updated'))
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def data(self):
        """Return the query for this object's product, annotated with the max of ``complete``."""
        product_rows = self.get_query(product=self.product)
        return product_rows.annotate(complete=Max('complete'))

#def wiki_registrations():
#    return (
#        MediaWikiUser.objects
#        .annotate(year=Substr('registration', 1, 4))
#        .order_by('year')
#        .values('year')
#        .annotate(
#            users=Count('id', distinct=True)
#        )
#    )
#
#
#def wiki_revisions():
#    return (
#        Revision.objects
#        .annotate(year=Substr('rev_timestamp', 1, 4))
#        .order_by('year')
#        .values('year')
#        .annotate(
#            revisions=Count('id')
#        )
#    )
#
#
#def wiki_registrations_year(year):
#    return (
#        MediaWikiUser.objects
#        .annotate(
#            year=Substr('registration', 1, 4),
#            month=Substr('registration', 5, 2)
#        )
#        .filter(year=str(year).encode())
#        .order_by('month')
#        .values('month')
#        .annotate(
#            users=Count('id', distinct=True)
#        )
#    )
#
#
#def wiki_revisions_year(year):
#    return (
#        Revision.objects
#        .annotate(
#            year=Substr('rev_timestamp', 1, 4),
#            month=Substr('rev_timestamp', 5, 2)
#        )
#        .filter(year=str(year).encode())
#        .order_by('month')
#        .values('month')
#        .annotate(
#            revisions=Count('id')
#        )
#    )
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """
        Send new message notifications

        Selects support channels that have no creator and whose content type
        is ``Inquirer``, counts their unread actions, and emails each
        customer that has an email address. When the mail is sent, the
        channel's ``last_read`` is advanced to its latest action id.
        """
        # command to run: python manage.py tunga_send_customer_emails
        # Only actions whose timestamp is at least one minute old are counted.
        min_date = datetime.datetime.utcnow() - relativedelta(minutes=1)

        # An action counts as a new message when its actor content type
        # differs from the channel's content type, it is newer than
        # last_read, it is old enough, and it is a send or an upload.
        counts_as_new = (
            ~Q(action_targets__actor_content_type=F('content_type')) &
            Q(action_targets__gt=F('last_read')) &
            Q(action_targets__timestamp__lte=min_date) &
            Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])
        )
        new_message_flag = Case(
            When(counts_as_new, then=1),
            default=0,
            output_field=IntegerField()
        )

        support_channels = Channel.objects.filter(
            type=CHANNEL_TYPE_SUPPORT,
            created_by__isnull=True,
            content_type=ContentType.objects.get_for_model(Inquirer)
        )
        customer_channels = support_channels.annotate(
            new_messages=Sum(new_message_flag),
            latest_message=Max('action_targets__id')
        ).filter(new_messages__gt=0)

        for channel in customer_channels:
            customer = channel.content_object
            if not customer.email:
                continue

            # The mail body lists only SEND actions, although the count above
            # also includes uploads.
            unread_actions = Action.objects.filter(
                channels=channel, id__gt=channel.last_read, verb__in=[verbs.SEND]
            ).order_by('id')
            messages = [action.action_object for action in unread_actions]
            if not messages:
                continue

            recipients = [customer.email]
            subject = "[Tunga Support] Help"
            context = {
                'customer': customer,
                'count': channel.new_messages,
                'messages': messages,
                'channel_url': '%s/customer/help/%s/' % (TUNGA_URL, channel.id)
            }

            delivered = send_mail(subject, 'tunga/email/unread_help_messages', recipients, context)
            if delivered:
                # Mark everything up to the latest action as read.
                channel.last_read = channel.latest_message
                channel.save()
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def update_task_pm_updates(task):
    """Schedule the next PM report progress event for a task's project, if one is due.

    Sub-tasks are redirected to their parent. Nothing happens when the
    target is closed, is a task, is not approved, has no pm, has no update
    interval configured, or already has a PM event due in the future.
    Otherwise the next Thursday/Monday slot at or after now is found, and an
    event titled 'PM Report' is created when that slot is within 18 hours,
    before the deadline (if any), and no PM event exists yet on that day.
    """
    task = clean_instance(task, Task)

    # for sub-tasks, create all pm updates on the project
    target_task = task.parent if task.parent else task

    # only request pm updates for project which are approved and not closed
    eligible = (
        not target_task.closed and not target_task.is_task
        and target_task.approved and target_task.pm
    )
    if not eligible:
        return

    if not (target_task.update_interval and target_task.update_interval_units):
        return

    pm_events = ProgressEvent.objects.filter(
        Q(task=target_task) | Q(task__parent=target_task), type=PROGRESS_EVENT_TYPE_PM
    )
    periodic_start_date = pm_events.aggregate(latest_date=Max('due_at'))['latest_date']

    now = datetime.datetime.utcnow()
    if periodic_start_date and periodic_start_date > now:
        # A PM event is already scheduled in the future.
        return

    if not periodic_start_date:
        periodic_start_date = datetime.datetime.utcnow()

    # Step from slot to slot until one lands at or after now.
    cursor = clean_update_datetime(periodic_start_date, target_task)
    while True:
        weekday = cursor.weekday()
        if weekday < 3:
            # Last was before Thursday so schedule for Thursday
            days_ahead = 3 - weekday
        else:
            # Last was on or after Thursday so schedule for Monday
            days_ahead = 7 - weekday
        next_update_at = cursor + relativedelta(days=days_ahead)
        if next_update_at >= now:
            break
        cursor = next_update_at

    future_by_18_hours = now + relativedelta(hours=18)
    within_window = next_update_at <= future_by_18_hours and (
        not target_task.deadline or next_update_at < target_task.deadline
    )
    if not within_window:
        return

    scheduled_that_day = ProgressEvent.objects.filter(
        task=target_task, type=PROGRESS_EVENT_TYPE_PM,
        due_at__contains=next_update_at.date()
    ).count()

    # Schedule at most one pm update for any day
    if scheduled_that_day == 0:
        ProgressEvent.objects.update_or_create(
            task=target_task, type=PROGRESS_EVENT_TYPE_PM,
            due_at=next_update_at, defaults={'title': 'PM Report'}
        )
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def update_task_client_surveys(task):
    """Schedule the next weekly client survey progress event for a task's project, if one is due.

    Sub-tasks are redirected to their parent. Nothing happens when the
    target is closed, lacks any of survey_client / approved /
    active_participants, has no update interval configured, or already has
    a client event due in the future. Otherwise the next Monday slot at or
    after now is found, and an event titled 'Weekly Survey' is created when
    that slot is within 18 hours, before the deadline (if any), and no
    client event exists yet on that day.
    """
    task = clean_instance(task, Task)

    # for sub-tasks, create all surveys on the project
    target_task = task.parent if task.parent else task

    # only conduct survey for approved tasks that have been assigned devs and aren't closed
    eligible = not target_task.closed and (
        target_task.survey_client and target_task.approved and target_task.active_participants
    )
    if not eligible:
        return

    if not (target_task.update_interval and target_task.update_interval_units):
        return

    client_events = ProgressEvent.objects.filter(
        Q(task=target_task) | Q(task__parent=target_task), type=PROGRESS_EVENT_TYPE_CLIENT
    )
    periodic_start_date = client_events.aggregate(latest_date=Max('due_at'))['latest_date']

    now = datetime.datetime.utcnow()
    if periodic_start_date and periodic_start_date > now:
        # A client survey is already scheduled in the future.
        return

    if not periodic_start_date:
        periodic_start_date = datetime.datetime.utcnow()

    # Step from Monday to Monday until one lands at or after now.
    cursor = clean_update_datetime(periodic_start_date, target_task)
    while True:
        # Schedule next survey for Monday
        next_update_at = cursor + relativedelta(days=7 - cursor.weekday())
        if next_update_at >= now:
            break
        cursor = next_update_at

    future_by_18_hours = now + relativedelta(hours=18)
    within_window = next_update_at <= future_by_18_hours and (
        not target_task.deadline or next_update_at < target_task.deadline
    )
    if not within_window:
        return

    scheduled_that_day = ProgressEvent.objects.filter(
        task=target_task, type=PROGRESS_EVENT_TYPE_CLIENT,
        due_at__contains=next_update_at.date()
    ).count()

    # Schedule at most one survey for any day
    if scheduled_that_day == 0:
        ProgressEvent.objects.update_or_create(
            task=target_task, type=PROGRESS_EVENT_TYPE_CLIENT,
            due_at=next_update_at, defaults={'title': 'Weekly Survey'}
        )