Python django.db.models.aggregates 模块,Sum() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用django.db.models.aggregates.Sum()

项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose source is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If any of
    their source expressions has an output field that is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # A source expression without an output_field cannot be
            # checked, so it is skipped.
            continue
        if isinstance(resolved, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
项目:autostew    作者:Autostew    | 项目源码 | 文件源码
def gap(self):
    """Return this participant's gap to the leader, or None when unknown.

    The participant in race position 1 has no gap. While the session stage
    name starts with "Race", or once the session is finished, the gap is the
    difference in total time; otherwise it is the difference in fastest lap
    time. None is returned whenever one of the two times involved is
    missing.

    Any exception raised by ``Participant.objects.get`` (for example when
    no unique leader exists) is propagated unchanged.
    """
    if self.race_position == 1:
        return None
    leader = Participant.objects.get(session=self.session, race_position=1)
    if self.session.session_stage.name.startswith("Race") or self.session.finished:
        # NOTE: a lap-sum based estimate for participants without a total
        # time used to live here but was disabled; such participants simply
        # have no gap.
        if not self.total_time or leader.total_time is None:
            return None
        return self.total_time - leader.total_time

    if not self.fastest_lap_time or leader.fastest_lap_time is None:
        return None
    return self.fastest_lap_time - leader.fastest_lap_time
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def data(self):
    """Yield one series dict per current user with entries in the period.

    Users are those flagged ``is_current`` that have at least one Entry
    between ``self.start`` and ``self.end`` (inclusive). Each yielded dict
    holds a randomly picked RGB ``color``, the extrapolated ``data`` points
    and a ``name`` made of the first name plus the last-name initial when a
    last name is set.
    """
    active_ids = (
        Entry.objects
        .filter(day__gte=self.start, day__lte=self.end)
        .order_by('user_id')
        .values_list('user_id', flat=True)
        .distinct()
    )
    for user in User.objects.filter(is_current=True, id__in=active_ids):
        per_user = self.get_query(user=user).annotate(minutes=Sum('minutes'))
        color = 'rgb({}, {}, {})'.format(*(randint(1, 255) for _ in range(3)))
        points = list(self.extrapolate(per_user))
        initial = ' ' + user.last_name[0] + '.' if user.last_name else ''
        yield {
            'color': color,
            'data': points,
            'name': user.first_name + initial,
        }
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose source is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If any of
    their source expressions has an output field that is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # A source expression without an output_field cannot be
            # checked, so it is skipped.
            continue
        if isinstance(resolved, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose source is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If any of
    their source expressions has an output field that is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # A source expression without an output_field cannot be
            # checked, so it is skipped.
            continue
        if isinstance(resolved, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose source is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If any of
    their source expressions has an output field that is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # A source expression without an output_field cannot be
            # checked, so it is skipped.
            continue
        if isinstance(resolved, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def range_statistics(start, end):
    """ Returns aggregated DayStatistics for days from start (inclusive) up to end (exclusive).

    The result is a dict with summed cost/electricity/gas columns, the
    combined electricity1 + electricity2 sums, and the minimum, maximum and
    average of the temperature columns.
    """
    summed_columns = (
        'total_cost',
        'electricity1',
        'electricity1_cost',
        'electricity1_returned',
        'electricity2',
        'electricity2_cost',
        'electricity2_returned',
    )
    aggregations = {column: Sum(column) for column in summed_columns}
    # Remaining entries are appended in the same order the keys are
    # expected to appear in the result.
    aggregations.update(
        electricity_merged=Sum(models.F('electricity1') + models.F('electricity2')),
        electricity_cost_merged=Sum(models.F('electricity1_cost') + models.F('electricity2_cost')),
        electricity_returned_merged=Sum(models.F('electricity1_returned') + models.F('electricity2_returned')),
        gas=Sum('gas'),
        gas_cost=Sum('gas_cost'),
        temperature_min=Min('lowest_temperature'),
        temperature_max=Max('highest_temperature'),
        temperature_avg=Avg('average_temperature'),
    )
    days_in_range = DayStatistics.objects.filter(day__gte=start, day__lt=end)
    return days_in_range.aggregate(**aggregations)
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose source is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If any of
    their source expressions has an output field that is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # A source expression without an output_field cannot be
            # checked, so it is skipped.
            continue
        if isinstance(resolved, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose source is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If any of
    their source expressions has an output field that is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    for source in expression.get_source_expressions():
        try:
            resolved = source.output_field
        except FieldError:
            # A source expression without an output_field cannot be
            # checked, so it is skipped.
            continue
        if isinstance(resolved, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def channel_new_messages_annotation(user):
    """
    Build a Sum annotation that counts a channel's new messages for a user.

    An action target counts as 1 when its actor object id differs from
    ``user.id``, it is greater than ``channel_last_read`` and its verb is
    SEND or UPLOAD; everything else counts as 0.

    The queryset must already be annotated with channel_last_read for this
    to work.
    :param user: object whose ``id`` identifies the reader
    :return: the Sum expression
    """
    by_someone_else = ~Q(action_targets__actor_object_id=user.id)
    after_last_read = Q(action_targets__gt=F('channel_last_read'))
    is_message = Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])
    counts_as_new = When(by_someone_else & after_last_read & is_message, then=1)
    return Sum(Case(counts_as_new, default=0, output_field=IntegerField()))
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def data(self):
    """Return this user's query annotated with the summed ``minutes``."""
    own_entries = self.get_query(user=self.user)
    return own_entries.annotate(minutes=Sum('minutes'))
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def data(self):
    """Return the base query annotated with summed ``minutes`` and the distinct ``users`` count."""
    annotations = {
        'minutes': Sum('minutes'),
        'users': Count('user', distinct=True),
    }
    return self.get_query().annotate(**annotations)
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get_queryset(self, request):
    """Extend the parent queryset with entry totals.

    ``minutes`` is the sum of ``entries__minutes`` (0 when that sum is
    NULL) and ``last_log`` is the latest ``entries__updated`` value.
    """
    queryset = super().get_queryset(request)
    queryset = queryset.annotate(minutes=Coalesce(Sum('entries__minutes'), 0))
    return queryset.annotate(last_log=Max('entries__updated'))
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose input is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If the
    output field of the aggregate's ``input_field`` is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    try:
        resolved = expression.input_field.output_field
    except FieldError:
        # An input without an output_field cannot be checked; accept it.
        return
    if isinstance(resolved, temporal_fields):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose input is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If the
    output field of the aggregate's ``input_field`` is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    try:
        resolved = expression.input_field.output_field
    except FieldError:
        # An input without an output_field cannot be checked; accept it.
        return
    if isinstance(resolved, temporal_fields):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
项目:Django-ERP    作者:andyzsf    | 项目源码 | 文件源码
def get_changeform_initial_data(self, request):
    """Prefill ``loan_amount`` with the summed loans of the relevant user.

    The user is the requesting user unless the app/model info derived from
    the request carries a truthy ``obj``, in which case ``obj.user`` is
    used. A missing or falsy sum becomes 0.00.
    """
    model_info = generic.get_app_model_info_from_request(request)
    target = getattr(model_info, 'obj', None)
    borrower = request.user
    if target:
        borrower = target.user
    totals = Loan.objects.filter(user=borrower).aggregate(Sum('loan_amount'))
    return {'loan_amount': totals.get('loan_amount__sum') or 0.00}
项目:Django-ERP    作者:andyzsf    | 项目源码 | 文件源码
def collection_amount(self):
    """Return the summed ``collection_amount`` of PaymentCollection rows whose ``so`` is this object, or 0.00 when the sum is missing or falsy."""
    totals = PaymentCollection.objects.filter(so=self).aggregate(Sum('collection_amount'))
    return totals.get('collection_amount__sum') or 0.00
项目:Django-ERP    作者:andyzsf    | 项目源码 | 文件源码
def invoice_amount(self):
    """Return the summed ``vo_amount`` of Invoice rows whose ``po`` is this object, or 0.00 when the sum is missing or falsy."""
    totals = Invoice.objects.filter(po=self).aggregate(Sum('vo_amount'))
    return totals.get('vo_amount__sum') or 0.00
项目:Django-ERP    作者:andyzsf    | 项目源码 | 文件源码
def payed_amount(self):
    """Return the summed ``py_amount`` of Payment rows whose ``po`` is this object, or 0.00 when the sum is missing or falsy."""
    totals = Payment.objects.filter(po=self).aggregate(Sum('py_amount'))
    return totals.get('py_amount__sum') or 0.00
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def electricity_tariff_percentage(start_date):
    """ Returns the share (in percent) of electricity1 and electricity2 in their combined total.

    Day statistics from start_date onwards are summed. Returns None when
    either sum is empty or zero; otherwise a dict whose 'electricity1' entry
    is the rounded-up percentage and whose 'electricity2' entry is the
    remainder up to 100.
    """
    usage = DayStatistics.objects.filter(day__gte=start_date).aggregate(
        electricity1=Sum('electricity1'),
        electricity2=Sum('electricity2'),
    )

    # A missing or zero sum cannot be turned into a percentage.
    if any(not amount for amount in usage.values()):
        return None

    first = usage['electricity1']
    second = usage['electricity2']
    first_share = math.ceil(first / (first + second) * 100)
    usage['electricity1'] = first_share
    usage['electricity2'] = 100 - first_share
    return usage
项目:finem_imperii    作者:jardiacaj    | 项目源码 | 文件源码
def get_initial_manpower(self):
    """Return the summed ``starting_manpower`` of the BattleUnit rows in this battle organization."""
    units = BattleUnit.objects.filter(battle_organization=self)
    totals = units.aggregate(Sum('starting_manpower'))
    return totals['starting_manpower__sum']
项目:our-book    作者:wayhome25    | 项目源码 | 文件源码
def get_total_price(cls, year, month):
    """Return the summed ``price`` of rows created in the given year and month (0 when the sum is NULL)."""
    created_in_month = cls.objects.filter(created_at__year=year, created_at__month=month)
    return created_in_month.aggregate(total=Coalesce(Sum('price'), 0))['total']
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def check_expression_support(self, expression):
    """Reject restricted aggregates whose input is a date/time field.

    Only Sum, Avg, Variance and StdDev aggregates are inspected. If the
    output field of the aggregate's ``input_field`` is a DateField,
    DateTimeField or TimeField, NotImplementedError is raised. Every other
    expression passes through and the method returns None.
    """
    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    restricted = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, restricted):
        return
    try:
        resolved = expression.input_field.output_field
    except FieldError:
        # An input without an output_field cannot be checked; accept it.
        return
    if isinstance(resolved, temporal_fields):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def timeseries(self, kind=None, **kwargs):
    """Get GPRS timeseries.

    Thin filter around self.aggregate_timeseries: every "gprs" event carries
    both an uploaded and a downloaded byte count, and this method selects
    one of them or their sum, converted via self.convert_to_megabytes.

    Args:
        kind: which series to return. 'uploaded_data' and
              'downloaded_data' return that single series; None or
              'total_data' returns the pairwise sum of both. Any other
              value performs no query and returns None.

    Keyword Args:
        start_time_epoch (default 0), end_time_epoch (default -1) and
        interval (default 'months') are forwarded to
        self.aggregate_timeseries.
    """
    start = kwargs.pop('start_time_epoch', 0)
    end = kwargs.pop('end_time_epoch', -1)
    interval = kwargs.pop('interval', 'months')

    def fetch_megabytes(counter):
        # One aggregate query for the given byte counter, converted to MB.
        raw = self.aggregate_timeseries(
            'gprs', aggregation=counter, interval=interval,
            start_time_epoch=start,
            end_time_epoch=end)
        return self.convert_to_megabytes(raw)

    wants_total = kind in (None, 'total_data')
    if wants_total or kind == 'uploaded_data':
        uploaded = fetch_megabytes('up_byte_count')
    if wants_total or kind == 'downloaded_data':
        downloaded = fetch_megabytes('down_byte_count')

    if kind == 'uploaded_data':
        return uploaded
    if kind == 'downloaded_data':
        return downloaded
    if wants_total:
        up_amounts = [point[1] for point in uploaded]
        down_amounts = [point[1] for point in downloaded]
        combined = list(map(sum, zip(up_amounts, down_amounts)))
        # Both series share the same dates, so the uploaded ones are used.
        dates = [point[0] for point in uploaded]
        return zip(dates, combined)
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """
    Send new message notifications to the inquirers of support channels.

    A channel is picked up when it has at least one SEND/UPLOAD action that
    is newer than its ``last_read`` marker, at least one minute old and
    whose actor content type differs from the channel's content type. After
    a successful email the channel's ``last_read`` is moved to the latest
    action id.
    """
    # command to run: python manage.py tunga_send_customer_emails
    # Only actions that are at least one minute old are counted.
    cutoff = datetime.datetime.utcnow() - relativedelta(minutes=1)

    other_actor_type = ~Q(action_targets__actor_content_type=F('content_type'))
    after_last_read = Q(action_targets__gt=F('last_read'))
    old_enough = Q(action_targets__timestamp__lte=cutoff)
    is_message = Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])
    unread_count = Sum(
        Case(
            When(other_actor_type & after_last_read & old_enough & is_message, then=1),
            default=0,
            output_field=IntegerField()
        )
    )

    support_channels = Channel.objects.filter(
        type=CHANNEL_TYPE_SUPPORT,
        created_by__isnull=True,
        content_type=ContentType.objects.get_for_model(Inquirer)
    )
    pending_channels = support_channels.annotate(
        new_messages=unread_count,
        latest_message=Max('action_targets__id')
    ).filter(new_messages__gt=0)

    for channel in pending_channels:
        customer = channel.content_object
        if not customer.email:
            continue

        sent_actions = Action.objects.filter(
            channels=channel, id__gt=channel.last_read, verb__in=[verbs.SEND]
        ).order_by('id')
        messages = [action.action_object for action in sent_actions]
        if not messages:
            continue

        to = [customer.email]
        subject = "[Tunga Support] Help"
        ctx = {
            'customer': customer,
            'count': channel.new_messages,
            'messages': messages,
            'channel_url': '%s/customer/help/%s/' % (TUNGA_URL, channel.id)
        }

        if send_mail(subject, 'tunga/email/unread_help_messages', to, ctx):
            # Mark everything up to the latest action as read.
            channel.last_read = channel.latest_message
            channel.save()
项目:tunga-api    作者:tunga-io    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """
    Send new message notifications to channel members.

    A ChannelUser row is picked up when no email was sent for it within the
    last 3 hours and it has at least one SEND/UPLOAD action that: was not
    performed by the user, is newer than the row's ``last_read`` marker, is
    at least 15 minutes old, is not older than the commissioning date and is
    newer than the previous email (if any). After a successful email the
    row's ``last_email_at`` is set to the current UTC time.
    """
    # command to run: python manage.py tunga_send_message_emails
    utc_now = datetime.datetime.utcnow()
    min_date = utc_now - relativedelta(minutes=15)  # 15 minute window to read new messages
    min_last_email_date = utc_now - relativedelta(hours=3)  # Limit to 1 email every 3 hours per channel
    commission_date = parse('2016-08-08 00:00:00')  # Don't notify about events before the commissioning date

    user_channels = ChannelUser.objects.filter(
        Q(last_email_at__isnull=True) |
        Q(last_email_at__lt=min_last_email_date)
    ).annotate(new_messages=Sum(
        Case(
            When(
                ~Q(channel__action_targets__actor_object_id=F('user_id')) &
                Q(channel__action_targets__gt=F('last_read')) &
                Q(channel__action_targets__timestamp__lte=min_date) &
                Q(channel__action_targets__timestamp__gte=commission_date) &
                (Q(last_email_at__isnull=True) | Q(channel__action_targets__timestamp__gt=F('last_email_at'))) &
                Q(channel__action_targets__verb__in=[verbs.SEND, verbs.UPLOAD]),
                then=1
            ),
            default=0,
            output_field=IntegerField()
        )
    )).filter(new_messages__gt=0)

    for user_channel in user_channels:
        channel_name = user_channel.channel.get_channel_display_name(user_channel.user)

        to = [user_channel.user.email]
        if user_channel.channel.type == CHANNEL_TYPE_DIRECT:
            # A conditional expression is required here: the old
            # ``cond and '' or 's'`` form always produced 's' because the
            # empty string is falsy.
            conversation_subject = "New message{} from {}".format(
                '' if user_channel.new_messages == 1 else 's',
                channel_name
            )
        else:
            conversation_subject = "Conversation: {}".format(channel_name)
        subject = conversation_subject
        ctx = {
            'receiver': user_channel.user,
            'new_messages': user_channel.new_messages,
            'channel_name': channel_name,
            'channel': user_channel.channel,
            'channel_url': '%s/conversation/%s/' % (TUNGA_URL, user_channel.channel.id)
        }

        if send_mail(subject, 'tunga/email/unread_channel_messages', to, ctx):
            # Remember when this member was last emailed so the 3 hour
            # limit above applies to the next run.
            user_channel.last_email_at = datetime.datetime.utcnow()
            user_channel.save()