Python mongoengine 模块,Q 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用mongoengine.Q

项目:AppBackend    作者:540871129    | 项目源码 | 文件源码
def scene_search(request):
    if request.method == "POST":
        try:
            skip = int(request.POST.get('skip', 0))
        except TypeError:
            skip = 0
        except ValueError:
            skip = 0
        try:
            limit = int(request.POST.get('limit', 5))
        except TypeError:
            limit = 5
        except ValueError:
            limit = 5
        # # ?????????
        # try:
        #     latitude = float(request.POST.get('latitude', None))
        #     longitude = float(request.POST.get('longitude', None))
        # except:
        #     return JsonResponse(resultMsg['CoordinatesError'])
        # coordinates = [longitude, latitude]     # ?????

        # ??????
        content = request.POST.get('searchContent', None)
        if not content:
            return JsonResponse(resultMsg['NeedParameter'])
        lists = Scene.objects(
            (Q(name={"$regex": content}) |
            Q(city={"$regex": content}) |
            Q(province={"$regex": content})),
            status='online'
            # location__near=coordinates, location__max_distance=50000
        ).all()[skip: limit]
        lists = json.loads(lists.to_json())
        map(more_replace, lists)
        return JsonResponse({"result": lists})
    raise Http404
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def q(self):
        return me.Q()
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def owner_query(self):
        return me.Q(owner=self.owner)
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def q(self):
        if self.operator == 'eq':
            return me.Q(**{self.field: self.value})
        return me.Q(**{'%s__%s' % (self.field, self.operator): self.value})
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def q(self):
        rtype = self._instance.condition_resource_cls._meta["collection"]
        ids = set()
        for key, value in self.tags.iteritems():
            query = {
                'owner': self._instance.owner,
                'resource_type': rtype,
                'key': key,
            }
            if value:
                query['value'] = value
            ids |= set(tag.resource.id
                       for tag in Tag.objects(**query).only('resource'))
        return me.Q(id__in=ids)
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def q(self):
        return me.Q(id__in=self.ids)
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def _list_images__fetch_images(self, search=None):
        default_images = config.EC2_IMAGES[self.cloud.region]
        image_ids = default_images.keys() + self.cloud.starred
        if not search:
            try:
                # this might break if image_ids contains starred images
                # that are not valid anymore for AWS
                images = self.connection.list_images(None, image_ids)
            except Exception as e:
                bad_ids = re.findall(r'ami-\w*', e.message, re.DOTALL)
                for bad_id in bad_ids:
                    self.cloud.starred.remove(bad_id)
                self.cloud.save()
                images = self.connection.list_images(None,
                                                     default_images.keys() +
                                                     self.cloud.starred)
            for image in images:
                if image.id in default_images:
                    image.name = default_images[image.id]
            images += self.connection.list_images(ex_owner='self')
        else:
            image_models = CloudImage.objects(
                me.Q(cloud_provider=self.connection.type,
                     image_id__icontains=search) |
                me.Q(cloud_provider=self.connection.type,
                     name__icontains=search)
            )[:200]
            images = [NodeImage(id=image.image_id, name=image.name,
                                driver=self.connection, extra={})
                      for image in image_models]
            if not images:
                # Actual search on EC2.
                images = self.connection.list_images(
                    ex_filters={'name': '*%s*' % search}
                )
        return images
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def remove_tags_from_resource(owner, resource_obj, tags, *args, **kwargs):
    """
    This function get a list of tags in the form [{'key': 'joe'}] or
    [{'key': 'joe', 'value': 'schmoe'}] and will delete them from the resource
    :param owner: the resource owner
    :param resource_obj: the resource object where the tags will be added
    :param rtype: resource type
    :param tags: list of tags to be deleted
    """
    # ensure there are no duplicate tag keys because mongoengine will
    # raise exception for duplicates in query
    key_list = list(set(tags))

    # create a query that will return all the tags with
    query = reduce(lambda q1, q2: q1.__or__(q2),
                   map(lambda key: Q(key=key), key_list))

    Tag.objects(Q(owner=owner) & Q(resource=resource_obj) & (query)).delete()

    # I think that the above overly complex query could simply be rewritten as
    # Tag.objects(owner=owner, resource=resource_obj,
    #             key__in=key_list).delete()

    # SEC
    owner.mapper.update(resource_obj)

    rtype = resource_obj._meta["collection"]

    trigger_session_update(owner,
                           [rtype + 's' if not rtype.endswith('s') else rtype])
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def owner_query(self):
        return me.Q(cloud__in=Cloud.objects(owner=self.owner).only('id'))
项目:ethereum_scanner    作者:chepe4pi    | 项目源码 | 文件源码
def get_queryset(self):
        q_list = []
        users_folows_list = Follow.objects.filter(user=self.request.user)
        for follow in users_folows_list:
            q_list.append(Q(fromAddress=follow.address, timestamp__gte=follow.created.timestamp()))
            q_list.append(Q(toAddress=follow.address, timestamp__gte=follow.created.timestamp()))
        if q_list:
            query = q_list.pop()
            for item in q_list:
                query |= item
            return EthTransactions.objects.filter(query)
        return EthTransactions.objects.none()
项目:bankex-bot    作者:BankEx    | 项目源码 | 文件源码
def _show_item(self, **kwargs):
        if self.user.offset < 0:
            self.send(self.t('NO_PREV'))
            return False

        condition = Q(bc_hash__ne=None) & \
                    Q(type=self.user.filter_buy_type)
        # Q(price__lte=self.user.filter_price)

        offers = Offer.objects(condition) \
            .skip(self.user.offset) \
            .limit(1)

        offer = None
        for item in offers:
            offer = item
            break

        if offer is None:
            self.send(self.t('NO_MORE'))
            return False

        try:
            path = offer.get_image_path(self.user.lang)
            if not path:
                raise Exception
            with open(path, 'rb') as f:
                inline_keyboard = InlineKeyboard(keyboard=[
                    [InlineKeyboardButton(text=self.t('BUY_CONTRACT'), callback_data=offer.get_id())]
                ])
                self.send_photo(
                    files=(('photo', path, f.read()),),
                    reply_markup=inline_keyboard
                )
        except Exception:
            if options.debug:
                traceback.print_exc()
            self.logger.error(offer.get_id())
            self.send(self.t('IMAGE_NOT_FOUND'))

        return True
项目:dogbot    作者:moondropx    | 项目源码 | 文件源码
def twitter(bot, message):
    """#twitter [-p ??]

    -p : ????
    """

    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        return False
    if not cmd[0] in config['trigger']:
        return False
    if not cmd[1:] == 'twitter':
        return False
    try:
        options, args = getopt.gnu_getopt(args, 'hp:')
    except getopt.GetoptError:
        # ????
        reply(bot, message, twitter.__doc__)
        return True

    days = 0
    for o, a in options:
        if o == '-p':
            # ????
            try:
                days = int(a)
                if days < 0:
                    raise ValueError
            except ValueError:
                reply(bot, message, twitter.__doc__)
                return True
        elif o == '-h':
            # ??
            reply(bot, message, twitter.__doc__)
            return True
    tweets = Twitter.objects(Q(date__gte=datetime.now().date()+timedelta(days=-days)) & Q(date__lte=datetime.now().date()+timedelta(days=-days+1)))

    if tweets:
        reply(bot, message, '\n---------\n'.join([str(tweet) for tweet in tweets]))
        return True
    else:
        reply(bot, message, '??????...')
        return True