Python sqlalchemy.orm module, load_only() example source code

The following 18 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.orm.load_only().
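Before the project excerpts, here is a minimal, self-contained sketch of what load_only() does. The User model, the in-memory SQLite engine, and the sample data are hypothetical and assume SQLAlchemy 1.4 or newer; none of it comes from the projects below. load_only() restricts which columns a query loads eagerly; primary-key columns are always included, and any column left out is deferred until first access.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, load_only, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    bio = Column(String)

# Hypothetical in-memory database for illustration only.
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
session.add(User(name='alice', email='alice@example.com', bio='...'))
session.commit()

# Only "name" and "email" (plus the primary key) appear in the SELECT;
# "bio" is deferred and fetched lazily on first access.
users = session.query(User).options(load_only(User.name, User.email)).all()
for user in users:
    print(user.name, user.email)

Many of the excerpts below pass column names as strings, e.g. load_only('id', 'date_published'), which the 1.x releases they were written against accept; newer SQLAlchemy releases expect mapped attributes such as load_only(Segment.id) instead.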

Project: csirtg-smrt-py | Author: csirtgadgets
def cache_provider(self, provider):
        if self.memcached_provider == provider:
            return

        self.memcached_provider = provider
        self.memcache = {}
        logger.info("Caching archived indicators for provider {}".format(provider))
        q = self.handle().query(Indicator) \
            .filter_by(provider=provider) \
            .order_by(asc(Indicator.lasttime), asc(Indicator.firsttime), asc(Indicator.created_at))

        q = q.options(load_only("indicator", "group", "tags", "firsttime", "lasttime"))
        q = q.yield_per(1000)
        for i in q:
            self.memcache[i.indicator] = (i.group, i.tags, i.firsttime, i.lasttime)

        logger.info("Cached provider {} in memory, {} objects".format(provider, len(self.memcache)))
Project: osm-wikidata | Author: EdwardBetts
def wbgetentities(self, debug=False):
        sub = (session.query(Item.item_id)
                      .join(ItemTag)
                      .group_by(Item.item_id)
                      .subquery())
        q = (self.items.filter(Item.item_id == sub.c.item_id)
                       .options(load_only(Item.qid)))

        if debug:
            print('running wbgetentities query')
            print(q)
            print(q.count())
        items = {i.qid: i for i in q}
        if debug:
            print('{} items'.format(len(items)))

        for qid, entity in wikidata.entity_iter(items.keys(), debug=debug):
            if debug:
                print(qid)
            items[qid].entity = entity
Project: osm-wikidata | Author: EdwardBetts
def space():
    overpass_dir = app.config['OVERPASS_DIR']
    files = [{'file': f, 'size': f.stat().st_size} for f in os.scandir(overpass_dir) if '_' not in f.name and f.name.endswith('.xml')]
    files.sort(key=lambda f: f['size'], reverse=True)
    files = files[:200]

    place_lookup = {int(f['file'].name[:-4]): f for f in files}
    # q = Place.query.outerjoin(Changeset).filter(Place.place_id.in_(place_lookup.keys())).add_columns(func.count(Changeset.id))
    q = (database.session.query(Place, func.count(Changeset.id))
                         .outerjoin(Changeset)
                         .filter(Place.place_id.in_(place_lookup.keys()))
                         .options(load_only(Place.place_id, Place.display_name, Place.state))
                         .group_by(Place.place_id, Place.display_name, Place.state))
    for place, num in q:
        place_id = place.place_id
        place_lookup[place_id]['place'] = place
        place_lookup[place_id]['changesets'] = num

    return render_template('space.html', files=files)
Project: Spirit | Author: TunnelBlanket
def handleGetPlayerInfoById(self, data):
    playerId = data[4]

    if playerId.isdigit():
        playerId = int(playerId)

        if playerId == self.user.Id:
            playerSwid = self.user.Swid
            username = self.user.Username

        else:
            playerModel = self.session.query(User).options(load_only("Username", "Swid")) \
                .filter_by(Id=playerId).first()

            if playerModel is None:
                return

            username = playerModel.Username
            playerSwid = playerModel.Swid

        self.sendXt("pbi", playerSwid, playerId, username)
Project: pygcam | Author: JGCRI
def getTimeSeries(self, simId, paramName, expList):
        '''
        Retrieve all timeseries rows for the given simId and paramName.

        :param simId: simulation ID
        :param paramName: name of output parameter
        :param expList: (list of str) the names of the experiments to select
           results for.
        :return: list of TimeSeries tuples or None
        '''
        cols = ['seriesId', 'runId', 'outputId', 'units'] + self.yearCols()

        with self.sessionScope() as session:
            query = session.query(TimeSeries, Experiment.expName).options(load_only(*cols)). \
                join(Run).filter_by(simId=simId).filter_by(status='succeeded'). \
                join(Experiment).filter(Experiment.expName.in_(expList)). \
                join(Output).filter_by(name=paramName)

            rslt = query.all()
            return rslt


Project: uchan | Author: Floens
def get_all_board_names() -> List[str]:
    local_cached = local_cache.get('all_board_names')
    if local_cached:
        return local_cached

    all_board_names_cached = cache.get(cache_key('all_board_names'))
    if all_board_names_cached is not None:
        # No need to map a list of strings
        res = all_board_names_cached
    else:
        with session() as s:
            q = s.query(BoardOrmModel).options(load_only('name')).order_by(BoardOrmModel.name)
            # No mapping here either
            res = list(map(lambda i: i.name, q.all()))
            s.commit()

        cache.set(cache_key('all_board_names'), res)

    local_cache.set('all_board_names', res)

    return res
Project: osm-wikidata | Author: EdwardBetts
def get_top_existing(limit=39):
    cols = [Place.place_id, Place.display_name, Place.area, Place.state,
            Place.candidate_count, Place.item_count]
    c = func.count(Changeset.place_id)

    q = (Place.query.filter(Place.state.in_(['ready', 'refresh']),
                            Place.area > 0,
                            Place.candidate_count > 4)
                    .options(load_only(*cols))
                    .outerjoin(Changeset)
                    .group_by(*cols)
                    .having(c == 0)
                    .order_by((Place.item_count / Place.area).desc()))
    return q[:limit]
Project: flask-maple | Author: honmaple
def load_only(self, *columns):
        return self.options(load_only(*columns))
Project: dino | Author: thenetcircle
def get_all_user_ids(self, session=None) -> list:
        from sqlalchemy.orm import load_only
        users = session.query(Users).options(load_only('uuid')).all()
        return [user.uuid for user in users]
Project: maple-file | Author: honmaple
def load_only(self, *columns):
        return self.options(load_only(*columns))
Project: eggsnspam | Author: wayfair
def list_breakfasts():
    """List all breakfasts currently in the database"""
    breakfasts = Breakfast.query.with_hint(Breakfast, "WITH (NOLOCK)").options(load_only("id")).all()
    return jsonify(breakfasts=[{"id": x.id} for x in breakfasts])
Project: eggsnspam | Author: wayfair
def list_ingredients():
    """List all ingredients currently in the database"""
    ingredients = Ingredient.query.with_hint(Ingredient, "WITH (NOLOCK)").options(load_only("id")).all()
    return jsonify(ingredients=[{"id": x.id} for x in ingredients])
Project: stream2segment | Author: rizac
def getseg(session, segment_id, cols2load=None):
    '''Return the segment identified by `segment_id`, first by querying the session
    and, if not found, by querying the database.
    :param cols2load: if the database has to be queried, a list of columns to load, e.g.
        `cols2load=[Segment.id]`
    '''
    seg = session.query(Segment).get(segment_id)
    if seg:
        return seg
    query = session.query(Segment).filter(Segment.id == segment_id)
    if cols2load:
        query = query.options(load_only(*cols2load))
    return query.first()
Project: stream2segment | Author: rizac
def toggle_class_id(session, segment_id, class_id):
    # Look up the segment identified by `segment_id`, loading only its id column:
    segment = session.query(Segment).filter(Segment.id == segment_id)\
        .options(load_only(Segment.id)).first()
    clz = segment.classes
    if any(c.id == class_id for c in clz):
        segment.del_classes(class_id)
    else:
        segment.add_classes(class_id)
    # re-query the database to be sure:
    return {'classes': get_classes(session),
            'segment_class_ids': get_classes(session, segment_id)}
Project: hoaxy-backend | Author: IUNetSci
def is_url_parsed(self, *url_tuple):
        """Test whether this URL is parsed.

        Before parse the article, test whether this URL is parsed before.
        If so, ignore and return True. Otherwise, need to parse, return False.

        Parameters
        ----------
        url_tuple : tuple
            Tuple (url_id, created_at, canonical)

        Returns
        -------
        bool
            Whether this URL is parsed or not.
        """
        url_id, created_at, canonical = url_tuple
        marticle = self.session.query(Article)\
            .filter_by(canonical_url=canonical)\
            .options(load_only('id', 'date_captured'))\
            .one_or_none()
        # marticle exists
        # update date_captured of this article
        if marticle is not None:
            # marticle.date_captured > article['date_captured']
            if marticle.date_captured > created_at:
                marticle.date_captured = created_at
            try:
                self.session.query(Url).filter_by(id=url_id).update(
                    dict(article_id=marticle.id, status_code=U_WP_SUCCESS))
                self.session.commit()
                return True
            except SQLAlchemyError as e:
                logger.error('Error when update url: %s', e)
                raise
        else:
            return False
Project: uchan | Author: Floens
def _set_all_board_names_cache(s):
    all_board_names_q = s.query(BoardOrmModel).options(load_only('name')).order_by(BoardOrmModel.name)
    cache.set(cache_key('all_board_names'), list(map(lambda i: i.name, all_board_names_q.all())))
Project: hoaxy-backend | Author: IUNetSci
def get_or_create_murl(session,
                       data,
                       platform_id=None,
                       load_cols=['id', 'date_published']):
    """Get a URL record from table, if not exists, insert it.

    The function is similar as `get_or_create_m`. The difference is how to
    handle duplications. In this function, try to update 'date_published'
    if `data['date_published']` is not None.

    Parameters
    ----------
    session : object
        An instance of SQLAlchemy Session.
    data : dict
        A dict that contains necessary attributes of the ORM objects.
    platform_id : int
        The id of a platform object.
    load_cols : list
        The columns to be loaded. Default is ['id', 'date_published'].

    Returns
    -------
    object
        A URL model object.
    """
    q = session.query(Url).filter_by(raw=data['raw'])\
        .options(load_only(*load_cols))
    murl = q.one_or_none()
    if murl:
        # update date_published if possible
        if murl.date_published is None and \
                data.get('date_published', None) is not None:
            murl.date_published = data['date_published']
            session.commit()
    else:
        murl = Url(**data)
        session.add(murl)
        try:
            session.commit()
        except IntegrityError as e:
            logger.warning('Concurrency conflict %s', e)
            session.rollback()
            murl = q.one()
    if platform_id is not None:
        append_platform_to_url(session, murl.id, platform_id)
    return murl
Project: hoaxy-backend | Author: IUNetSci
def process_item(self, item, spider):
        """Main function that process Article item (third phase)."""
        url_id = item.pop('url_id')
        marticle = spider.session.query(Article)\
            .filter_by(canonical_url=item['canonical_url'])\
            .options(load_only('id', 'date_published', 'date_captured'))\
            .one_or_none()
        # marticle exists
        # update datetime of this article
        if marticle:
            # marticle.date_published is None
            if marticle.date_published is None:
                marticle.date_published = item['date_published']
            # marticle.date_captured > article['date_captured']
            if marticle.date_captured > item['date_captured']:
                marticle.date_captured = item['date_captured']
        # new article
        else:
            if item['site_id'] is not None:
                item['group_id'] = self.get_or_next_group_id(
                    spider.session, item['title'], item['site_id'])
            # create this article
            marticle = Article(**item)
            spider.session.add(marticle)
        # commit changes
        try:
            spider.session.commit()
        except SQLAlchemyError as e:
            logger.error('Error when inserting article: %s', e)
            spider.session.rollback()
            raise DropItem()
        try:
            # finally update url status and article_id
            spider.session.query(Url).filter_by(id=url_id)\
                .update(dict(status_code=U_WP_SUCCESS,
                             article_id=marticle.id))
            spider.session.commit()
        except SQLAlchemyError as e:
            logger.error('Error when update url: %s', e)
            spider.session.rollback()
            raise DropItem()
        item['url_id'] = url_id
        item['id'] = marticle.id
        return item