Python sqlalchemy.orm 模块,object_session() 实例源码

我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用sqlalchemy.orm.object_session()

项目:temporal-sqlalchemy    作者:CloverHealth    | 项目源码 | 文件源码
def clock_tick(self, activity: TemporalActivityMixin = None):
        warnings.warn("clock_tick is going away in 0.5.0",
                      PendingDeprecationWarning)
        """Increments vclock by 1 with changes scoped to the session"""
        if self.temporal_options.activity_cls is not None and activity is None:
            raise ValueError("activity is missing on edit") from None

        session = orm.object_session(self)
        with session.no_autoflush:
            yield self

        if session.is_modified(self):
            self.vclock += 1

            new_clock_tick = self.temporal_options.clock_model(
                entity=self, tick=self.vclock)
            if activity is not None:
                new_clock_tick.activity = activity

            session.add(new_clock_tick)
项目:griffith    作者:Strit    | 项目源码 | 文件源码
def _set_loaned_flag(self, flag):
        """Sets loaned flag in current collection and all associated movies.

        :param flag: if True and there are loaned movies in the collection
            already, exception will be raised (whole collection cannot be
            loaned if one of the movies is not available).
            Please also remember to create new entry in loans table later (no
            need to do that if flag is False).
        """

        session = object_session(self)

        if flag: # loaning whole collection
            loaned_movies = session.execute(select([tables.movies.columns.movie_id])\
                    .where(and_(tables.movies.columns.collection_id == self.collection_id,\
                        tables.movies.columns.loaned == True))).fetchall()
            if loaned_movies:
                log.error('cannot loan it, collection contains loaned movie(s): %s', loaned_movies)
                raise Exception('loaned movies in the collection already')

        self._loaned = flag
        update_query = update(tables.movies, tables.movies.columns.collection_id == self.collection_id)
        session.execute(update_query, params={'loaned': flag})
项目:mzidentml_writer    作者:mobiusklein    | 项目源码 | 文件源码
def _composition_listener(attr):
    '''
    Attach event listeners to an InstrumentedAttribute
    to trigger formula parsing on load and on change.
    '''
    @event.listens_for(attr, "set")
    def _update_composition_from_formula(target, value, oldvalue, initiator):
        session = object_session(target)
        if value == "" or value is None:
            return
        # If the object hasn't been associated with a session,
        # we can't look up bricks.
        if session is None:
            return
        target.composition = _formula_parser(value, session)

    @event.listens_for(attr.class_, "load")
    def _update_composition_on_load(target, context):
        value = getattr(target, attr.prop.key)
        if value == "" or value is None:
            return
        session = object_session(target)
        target.composition = _formula_parser(value, session)
项目:mzidentml_writer    作者:mobiusklein    | 项目源码 | 文件源码
def composition(self):
        composition = CompositionType()
        session = object_session(self)
        for fragment_composition_relation in self._fragment_composition:
            symbol = fragment_composition_relation.brick_string
            isotope, element = re.search(r"(?P<isotope>\d*)?(?P<element>\S+)", symbol).groups()
            count = fragment_composition_relation.count
            if count is not None:
                count = int(count)
            else:
                count = 1
            if isotope != "":
                name = _make_isotope_string(element, isotope)
            else:
                name = element
            is_brick = session.query(Brick).filter(Brick.brick == name).first()
            if is_brick is None:
                composition[str(name)] += count
            else:
                composition += is_brick.composition * count
        return composition
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def covers(self, item):
        return object_session(self).scalar(
                select([func.ST_Covers(Place.geom, item['location'])]).where(Place.place_id == self.place_id))
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _record(mapper, target, operation):
        s = orm.object_session(target)
        if isinstance(s, _SignallingSession):
            pk = tuple(mapper.primary_key_from_instance(target))
            s._model_changes[pk] = (target, operation)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _record(mapper, target, operation):
        s = orm.object_session(target)
        if isinstance(s, SignallingSession) and s.emit_modification_signals:
            pk = tuple(mapper.primary_key_from_instance(target))
            s._model_changes[pk] = (target, operation)
项目:ml-annotate    作者:planbrothers    | 项目源码 | 文件源码
def dataset_count(self):
        from annotator.models import Dataset

        return object_session(self).query(Dataset).filter(
            Dataset.problem == self
        ).count()
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _record(mapper, target, operation):
        s = orm.object_session(target)
        if isinstance(s, _SignallingSession):
            pk = tuple(mapper.primary_key_from_instance(target))
            s._model_changes[pk] = (target, operation)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def _record(mapper, target, operation):
        s = orm.object_session(target)
        if isinstance(s, SignallingSession) and s.emit_modification_signals:
            pk = tuple(mapper.primary_key_from_instance(target))
            s._model_changes[pk] = (target, operation)
项目:griffith    作者:Strit    | 项目源码 | 文件源码
def _get_loan_history(self):
        where = [tables.loans.c.movie_id == self.movie_id]
        if self.collection_id is not None:
            where.append(tables.loans.c.collection_id == self.collection_id)
        if self.volume_id is not None:
            where.append(tables.loans.c.volume_id == self.volume_id)
        return object_session(self).query(Loan).filter(\
                and_(tables.loans.c.return_date != None, or_(*where))).all()
项目:griffith    作者:Strit    | 项目源码 | 文件源码
def _get_loan_details(self):
        where = [tables.loans.c.movie_id == self.movie_id]
        if self.collection_id is not None:
            where.append(tables.loans.c.collection_id == self.collection_id)
        if self.volume_id is not None:
            where.append(tables.loans.c.volume_id == self.volume_id)
        return object_session(self).query(Loan).filter(and_(tables.loans.c.return_date == None, or_(*where))).first()
项目:griffith    作者:Strit    | 项目源码 | 文件源码
def _set_loaned_flag(self, flag):
        """Sets loaned flag in current volume and all associated movies.

        :param flag: if True, remember to create new entry in loans table
            later!
        """

        session = object_session(self)

        self._loaned = flag
        update_query = update(tables.movies, tables.movies.columns.volume_id == self.volume_id)
        session.execute(update_query, params={'loaned': flag})
项目:papersummarize    作者:mrdrozdov    | 项目源码 | 文件源码
def set_name(self, name):
        self.name = name
        unique_name = 'ARXIVID_{}_USERNAME_{}_TAGNAME_{}'.format(self.paper.arxiv_id, self.creator.name, name)

        if object_session(self).query(Tag).filter_by(unique_name=unique_name).first() is not None:
            raise ValueError("Tag with unique_name='{}' already exists.".format(unique_name))

        self.unique_name = unique_name
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def __init__(self, serv, token=None, session=None, connection=None):
        self.serv = serv
        if not token and not connection:
            raise TypeError("Need to provide a token or a connection")

        self._connection = connection
        if connection:
            token = connection.token
            session = object_session(connection)

        self.token = token
        self.session = session

        self.log = self.serv.log
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def best_scores(self):
        """ Return the best score for each feet """

        return [self.best_score(feet[0]) for feet in
                (object_session(self)
                 .query(song_stat.SongStat.feet)
                 .filter_by(song_id=self.id)
                 .group_by(song_stat.SongStat.feet)
                 .order_by(asc(song_stat.SongStat.feet)))
               ]
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def highscores(self, feet=None):
        """ Return the highest score for this song """

        query = (object_session(self)
                 .query(song_stat.SongStat)
                 .filter_by(song_id=self.id)
                 .order_by(desc(song_stat.SongStat.score)))
        if feet:
            query = query.filter_by(feet=feet)

        return query
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def room_privilege(self, room_id):
        if room_id in self._room_level:
            return self._room_level[room_id]

        priv = Privilege.find(room_id, self.id, object_session(self))

        self._room_level[room_id] = priv

        return priv
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def set_level(self, room_id, level):
        session = object_session(self)
        if not room_id:
            self.rank = level
            session.commit()
            return level

        priv = Privilege.find_or_update(room_id, self.id, session, level=level)
        self._room_level[room_id] = priv

        return level
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def last_game(self):
        return (object_session(self)
                .query(game.Game)
                .filter_by(room_id=self.id)
                .order_by(desc(game.Game.created_at))
                .first())
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def nb_players(self):
        """ Get the numer of users in the room """
        if self._nb_players:
            return self._nb_players

        self._nb_players = (
            object_session(self)
            .query(func.count(user.User.id))
            .filter_by(online=True, room_id=self.id)
            .scalar())

        return self._nb_players
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def online_users(self):
        """ Get the onlines user in this room """

        return (object_session(self)
                .query(user.User)
                .filter_by(online=True, room_id=self.id))
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def moderators(self):
        """ Get all the moderators in this room """

        return (object_session(self)
                .query(user.User)
                .join(privilege.Privilege)
                .filter(
                    privilege.Privilege.room_id == self.id,
                    privilege.Privilege.level >= 5
                ))
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def default_roles(self):
        if self.username == 'default':
            return []
        if self._default_roles_cache is not None:
            return self._default_roles_cache
        self._default_roles_cache = orm.object_session(self).\
            query(Role).join(User, Role.users).\
            filter(User.username == 'default').\
            all()
        return self._default_roles_cache
项目:griffith    作者:Strit    | 项目源码 | 文件源码
def loan_to(self, person, whole_collection=False):
        """
        Loans movie, all other movies from the same volume and optionally
        movie's collection.

        :param person: Person instance or person_id.
        :param whole_collection=False: if True, will loan all movies from the same
            collection.
        """

        if self.loaned:
            log.warn('movie already loaned: %s', self.loan_details)
            return False

        session = object_session(self)

        if hasattr(person, 'person_id'):
            person = person.person_id
        elif not isinstance(person, int):
            raise ValueError("expecting int or Person instance, got %s instead" % type(person))

        loan = Loan()
        loan.person_id = person
        loan.movie = self

        if whole_collection:
            if self.collection:
                # next line will update the status of all other movies in collection
                # or raise and OtherMovieAlreadyLoanedError
                self.collection.loaned = True
                loan.collection_id = self.collection_id
            else:
                log.debug('this movie doesn\'t have collection assigned, whole_collection param ignored')

        if self.volume:
            # next line will update the status of all other movies in volume
            self.volume.loaned = True
            loan.volume_id = self.volume_id

        self.loaned = True
        session.add(loan)

        return True