Python sqlalchemy.orm 模块,contains_eager() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用sqlalchemy.orm.contains_eager()

项目:neutron-dynamic-routing    作者:openstack    | 项目源码 | 文件源码
def get_dragents_hosting_bgp_speakers(self, context, bgp_speaker_ids,
                                      active=None, admin_state_up=None):
    """Return the agents bound to the given BGP speakers.

    :param context: request context; its ``session`` is used for the query.
    :param bgp_speaker_ids: sequence of BGP speaker ids to restrict the
        lookup to. When empty, no speaker filter is applied.
    :param active: passed through to ``is_eligible_agent`` for every agent.
    :param admin_state_up: when not None, only agents whose
        ``admin_state_up`` column equals this value are selected.
    :returns: list of ``binding.dragent`` objects that pass
        ``AgentSchedulerDbMixin.is_eligible_agent``.
    """
    binding_model = BgpSpeakerDrAgentBinding
    query = context.session.query(binding_model)
    # The join below brings the agent columns into the statement;
    # contains_eager populates binding.dragent from those same rows.
    query = query.options(orm.contains_eager(binding_model.dragent))
    query = query.join(binding_model.dragent)

    if len(bgp_speaker_ids) == 1:
        query = query.filter(
            binding_model.bgp_speaker_id == bgp_speaker_ids[0])
    elif bgp_speaker_ids:
        # Must be the SQL-level ``in_()``: the Python ``in`` operator would
        # be evaluated eagerly to a plain bool instead of an IN clause.
        query = query.filter(
            binding_model.bgp_speaker_id.in_(bgp_speaker_ids))

    if admin_state_up is not None:
        query = query.filter(
            agent_model.Agent.admin_state_up == admin_state_up)

    is_eligible = as_db.AgentSchedulerDbMixin.is_eligible_agent
    return [binding.dragent
            for binding in query
            if is_eligible(active, binding.dragent)]
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def get(self, build: Build):
    """
    Return a list of test cases for a given build.

    An optional ``result`` request argument narrows the list to test cases
    whose result equals the ``Result`` attribute of that name; a name that
    ``Result`` does not have raises ``NotImplementedError``. Failed test
    cases sort first, then by test case name.
    """
    query = (
        TestCase.query
        .options(contains_eager('job'))
        .join(Job, TestCase.job_id == Job.id)
        .filter(Job.build_id == build.id)
    )

    result = request.args.get('result')
    if result:
        try:
            wanted = getattr(Result, result)
            query = query.filter(TestCase.result == wanted)
        except AttributeError:
            raise NotImplementedError

    failed_first = (TestCase.result == Result.failed).desc()
    query = query.order_by(failed_first, TestCase.name.asc())

    return self.paginate_with_schema(testcases_schema, query)
项目:whalestack    作者:2gis    | 项目源码 | 文件源码
def get_l3_agents_hosting_routers(self, context, router_ids,
                                  admin_state_up=None,
                                  active=None):
    """Return the L3 agents bound to any of the given routers.

    :param context: request context; its ``session`` is used for the query.
    :param router_ids: router ids to look up; an empty value returns ``[]``
        without querying.
    :param admin_state_up: when not None, only agents whose
        ``admin_state_up`` column equals this value are selected.
    :param active: when not None, agents reported down by
        ``AgentDbMixin.is_agent_down`` are dropped.
    :returns: list of ``binding.l3_agent`` objects.
    """
    if not router_ids:
        return []

    binding_model = RouterL3AgentBinding
    query = (
        context.session.query(binding_model)
        .options(orm.contains_eager(binding_model.l3_agent))
        .join(binding_model.l3_agent)
        .filter(binding_model.router_id.in_(router_ids))
    )
    if admin_state_up is not None:
        query = query.filter(
            agents_db.Agent.admin_state_up == admin_state_up)

    l3_agents = [row.l3_agent for row in query]
    if active is None:
        return l3_agents

    # NOTE(review): any non-None value of ``active`` (True or False) keeps
    # only the agents that are not down -- confirm that is intended.
    is_down = agents_db.AgentDbMixin.is_agent_down
    return [agent for agent in l3_agents
            if not is_down(agent['heartbeat_timestamp'])]
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def fixed_ip_get_by_instance(context, instance_uuid):
    """Return every FixedIp row that belongs to an instance.

    Rows are outer-joined to their non-deleted virtual interface and ordered
    by that interface's ``created_at`` and then ``id``.

    :raises exception.InvalidUUID: ``instance_uuid`` is not UUID-like.
    :raises exception.FixedIpNotFoundForInstance: the query returned no rows.
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    vif = models.VirtualInterface
    # Join condition: the fixed IP's interface, but only if it is not deleted.
    live_vif = and_(vif.id == models.FixedIp.virtual_interface_id,
                    vif.deleted == 0)

    query = model_query(context, models.FixedIp, read_deleted="no")
    query = query.filter_by(instance_uuid=instance_uuid)
    query = query.outerjoin(vif, live_vif)
    query = query.options(contains_eager("virtual_interface"))
    query = query.options(joinedload('network'))
    query = query.options(joinedload('floating_ips'))
    query = query.order_by(asc(vif.created_at), asc(vif.id))
    fixed_ips = query.all()

    if fixed_ips:
        return fixed_ips
    raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def aggregate_metadata_get_by_host(context, host, key=None):
    """Collect the metadata of every aggregate that contains ``host``.

    :param key: when truthy, only metadata rows with this key are selected.
    :returns: dict mapping each metadata key to the set of values found for
        it across the matching aggregates.
    """
    query = (
        model_query(context, models.Aggregate)
        .join("_hosts")
        .join("_metadata")
        .filter(models.AggregateHost.host == host)
        .options(contains_eager("_metadata"))
    )
    if key:
        query = query.filter(models.AggregateMetadata.key == key)

    values_by_key = collections.defaultdict(set)
    for aggregate in query.all():
        for item in aggregate._metadata:
            values_by_key[item['key']].add(item['value'])
    return dict(values_by_key)
项目:weasyl    作者:Weasyl    | 项目源码 | 文件源码
def select_reported_list(userid):
    """Return reports matching ``userid``, most recently commented first.

    Reports are joined to their comments, keeping only comments with a
    non-zero ``violation``. Each returned report gets a ``latest_report``
    attribute holding the largest ``unixtime`` among its loaded comments.
    """
    query = Report.query.join(ReportComment)
    query = query.options(contains_eager(Report.comments))
    for target in ('_target_sub', '_target_char', '_target_journal'):
        query = query.options(joinedload(target))
    query = query.filter(ReportComment.violation != 0)
    query = query.filter_by(userid=userid)

    reports = query.all()
    for report in reports:
        report.latest_report = max(
            comment.unixtime for comment in report.comments)

    return sorted(reports, key=lambda report: report.latest_report,
                  reverse=True)
项目:pushkin    作者:Nordeus    | 项目源码 | 文件源码
def get_localized_message(login_id, message_id):
    '''
    Get message localization for language of a specific user.

    If translation for language of a user doesn't exist English translation is given.

    Returns the single matching ``MessageLocalization`` (with its ``message``
    relationship populated from the same row), or ``None`` when there is no
    match.
    '''
    with session_scope() as session:
        statement = text(
            "select lm.*, m.* from get_localized_message(:login_id, :message_id) lm "
            "inner join message m on lm.message_id = m.id")
        query = session.query(model.MessageLocalization)
        query = query.from_statement(statement)
        query = query.params(login_id=login_id, message_id=message_id)
        query = query.options(
            contains_eager(model.MessageLocalization.message))
        localized_message = query.one_or_none()
    return localized_message
项目:zun    作者:openstack    | 项目源码 | 文件源码
def list_inventories(self, context, filters=None, limit=None,
                     marker=None, sort_key=None, sort_dir=None):
    """Return a filtered, paginated list of inventories.

    The query is joined to ``Inventory.resource_provider`` and that
    relationship is populated from the joined rows. Pagination and sorting
    are delegated to ``_paginate_query``.
    """
    inventory = models.Inventory
    query = self._add_inventories_filters(
        model_query(inventory, session=get_session()), filters)
    query = query.join(inventory.resource_provider).options(
        contains_eager('resource_provider'))
    return _paginate_query(inventory, limit, marker,
                           sort_key, sort_dir, query)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_inventory(self, context, inventory_id):
    """Return the inventory with the given id.

    The inventory's ``resource_provider`` relationship is populated from the
    joined rows.

    :raises exception.InventoryNotFound: no row matches ``inventory_id``.
    """
    session = get_session()
    query = model_query(models.Inventory, session=session)
    query = query.join(models.Inventory.resource_provider)
    query = query.options(contains_eager('resource_provider'))
    # Filter on Inventory.id explicitly. With the legacy Query API,
    # filter_by() after join() resolves keywords against the most recently
    # joined entity, i.e. the resource provider rather than the inventory.
    query = query.filter(models.Inventory.id == inventory_id)
    try:
        return query.one()
    except NoResultFound:
        raise exception.InventoryNotFound(inventory=inventory_id)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def list_allocations(self, context, filters=None, limit=None,
                     marker=None, sort_key=None, sort_dir=None):
    """Return a filtered, paginated list of allocations.

    The query is joined to ``Allocation.resource_provider`` and that
    relationship is populated from the joined rows. Pagination and sorting
    are delegated to ``_paginate_query``.
    """
    allocation = models.Allocation
    query = self._add_allocations_filters(
        model_query(allocation, session=get_session()), filters)
    query = query.join(allocation.resource_provider).options(
        contains_eager('resource_provider'))
    return _paginate_query(allocation, limit, marker,
                           sort_key, sort_dir, query)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_allocation(self, context, allocation_id):
    """Return the allocation with the given id.

    The allocation's ``resource_provider`` relationship is populated from
    the joined rows.

    :raises exception.AllocationNotFound: no row matches ``allocation_id``.
    """
    session = get_session()
    query = model_query(models.Allocation, session=session)
    query = query.join(models.Allocation.resource_provider)
    query = query.options(contains_eager('resource_provider'))
    # Filter on Allocation.id explicitly. With the legacy Query API,
    # filter_by() after join() resolves keywords against the most recently
    # joined entity, i.e. the resource provider rather than the allocation.
    query = query.filter(models.Allocation.id == allocation_id)
    try:
        return query.one()
    except NoResultFound:
        raise exception.AllocationNotFound(allocation=allocation_id)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def ports_with_security_groups_find(context):
    """Build a query for ports inner-joined to their security groups.

    The ``security_groups`` relationship is populated from the joined rows.
    The un-executed query object is returned.
    """
    security_groups = models.Port.security_groups
    return (context.session.query(models.Port)
            .join(security_groups)
            .options(orm.contains_eager(security_groups)))
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def get(self, build: Build):
    """
    Return a list of style violations for a given build.

    An optional ``severity`` request argument narrows the list to violations
    whose severity equals the ``Severity`` attribute of that name; a name
    that ``Severity`` does not have raises ``NotImplementedError``. Errors
    sort first, then by filename, line number and column number.
    """
    query = (
        StyleViolation.query
        .options(contains_eager('job'))
        .join(Job, StyleViolation.job_id == Job.id)
        .filter(Job.build_id == build.id)
    )

    severity = request.args.get('severity')
    if severity:
        try:
            wanted = getattr(Severity, severity)
            query = query.filter(StyleViolation.severity == wanted)
        except AttributeError:
            raise NotImplementedError

    ordering = (
        (StyleViolation.severity == Severity.error).desc(),
        StyleViolation.filename.asc(),
        StyleViolation.lineno.asc(),
        StyleViolation.colno.asc(),
    )
    query = query.order_by(*ordering)

    return self.paginate_with_schema(styleviolation_schema, query)
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def get(self, revision: Revision):
    """
    Return a list of test cases for a given revision.

    Responds with status 404 when no build is found for the revision.
    An optional ``result`` request argument narrows the list to test cases
    whose result equals the ``Result`` attribute of that name; a name that
    ``Result`` does not have raises ``NotImplementedError``. Failed test
    cases sort first, then by test case name.
    """
    build = fetch_build_for_revision(revision.repository, revision)
    if not build:
        return self.respond(status=404)

    # The test cases hang off the jobs of the builds in ``build.original``.
    build_ids = [source_build.id for source_build in build.original]
    query = (
        TestCase.query
        .options(contains_eager('job'))
        .join(Job, TestCase.job_id == Job.id)
        .filter(Job.build_id.in_(build_ids))
    )

    result = request.args.get('result')
    if result:
        try:
            wanted = getattr(Result, result)
            query = query.filter(TestCase.result == wanted)
        except AttributeError:
            raise NotImplementedError

    failed_first = (TestCase.result == Result.failed).desc()
    query = query.order_by(failed_first, TestCase.name.asc())

    return self.paginate_with_schema(testcases_schema, query)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def aggregate_get_by_metadata_key(self, context, key):
    """Return the aggregates that have a metadata row with the given key.

    Each aggregate's ``_metadata`` relationship is populated from the joined
    (and therefore key-filtered) rows.
    """
    return (
        model_query(context, models.Aggregate)
        .join("_metadata")
        .filter(models.AggregateMetadata.key == key)
        .options(contains_eager("_metadata"))
        .all()
    )
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def aggregate_get_by_metadata(self, context, key, value):
    """Return the aggregates that have a metadata row equal to key/value.

    Each aggregate's ``_metadata`` relationship is populated from the joined
    (and therefore filtered) rows.
    """
    metadata = models.AggregateMetadata
    matches_pair = and_(metadata.key == key, metadata.value == value)
    return (
        model_query(context, models.Aggregate)
        .join("_metadata")
        .filter(matches_pair)
        .options(contains_eager("_metadata"))
        .all()
    )
项目:railgun    作者:xin-xinhanggao    | 项目源码 | 文件源码
def _show_handins(username=None):
    """Render an administrator page to show submissions.

    Pagination is read from the ``page`` (default 1) and ``perpage``
    (default 10) request arguments; a value that is not a valid integer
    falls back to the default.

    :param username: The interested user's name. If not given, show
        submissions from all users.
    :type username: :class:`str`
    :return: The rendered ``admin.handins.html`` template.
    :raises NotFound: If `username` is given but no such user exists.
    """
    def _int_arg(name, default):
        # Read an integer request argument, tolerating malformed values.
        try:
            return int(request.args.get(name, default))
        except ValueError:
            return default

    page = _int_arg('page', 1)
    perpage = _int_arg('perpage', 10)

    # Start from every handin, with its user loaded from the same join.
    handins = (
        Handin.query
        .join(Handin.user)
        .options(contains_eager(Handin.user))
        .filter()
    )

    # Optionally narrow the listing down to a single user.
    if username:
        user = User.query.filter(User.name == username).first()
        if not user:
            raise NotFound()
        handins = handins.filter(Handin.user_id == user.id)

    # Order by negated id, then paginate for the template.
    handins = handins.order_by(-Handin.id)
    the_page = handins.paginate(page, perpage)
    return render_template(
        'admin.handins.html', the_page=the_page, username=username
    )
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def aggregate_get_by_metadata_key(context, key):
    """Return rows that match metadata key.

    Each aggregate's ``_metadata`` relationship is populated from the joined
    (key-filtered) rows, and ``_hosts`` is loaded with a joined load.

    :param key Matches metadata key.
    """
    return (
        model_query(context, models.Aggregate)
        .join("_metadata")
        .filter(models.AggregateMetadata.key == key)
        .options(contains_eager("_metadata"))
        .options(joinedload("_hosts"))
        .all()
    )
项目:weasyl    作者:Weasyl    | 项目源码 | 文件源码
def comment_tree(cls, target):
    """Load the comments on ``target`` and arrange them as a tree.

    Every loaded comment gets a ``subcomments`` list holding its direct
    replies. Comments are fetched oldest first and every list is reversed
    before returning, so both the top level and each ``subcomments`` list
    end up newest first.

    :returns: tuple of (total number of comments, list of top-level
        comments).
    """
    # XXX: needs to filter more stuff out
    query = cls.query.filter_by(**target._comment_criteria())
    query = query.join(Login, cls.userid == Login.userid)
    query = query.options(contains_eager(cls.poster))
    comments = query.order_by(cls.unixtime.asc()).all()

    by_id = {}
    top_level = []
    posters = set()
    for comment in comments:
        by_id[comment.commentid] = comment
        comment.subcomments = []
        # A reply is filed under its parent, which must already have been
        # seen; anything without a parent is a top-level comment.
        if comment.parentid:
            siblings = by_id[comment.parentid].subcomments
        else:
            siblings = top_level
        siblings.append(comment)
        posters.add(comment.poster)

    from libweasyl.media import populate_with_user_media
    populate_with_user_media(posters)

    top_level.reverse()
    for comment in by_id.values():
        comment.subcomments.reverse()
    return len(by_id), top_level
项目:LabbookDB    作者:TheChymera    | 项目源码 | 文件源码
def animal_info(identifier, database=None, db_path=None):
    """Print the string representation of an Animal selected by id or external id.

    Nothing is returned; the result, or a "no entry" message, is printed.

    Parameters
    ----------

    identifier : int or string
    The identifier of the animal.

    database : string or None, optional
    If specified, constrains the AnimalExternalIdentifier.database column AND
    turns `identifier` into a constraint on the
    AnimalExternalIdentifier.identifier column. If unspecified, `identifier`
    is used as a constraint on the Animal.id column.

    db_path : string, optional
    Path to a LabbookDB formatted database. Falls back to the `LDB_PATH`
    environment variable when not given.

    Raises
    ------
    KeyError
    If `db_path` is not given and `LDB_PATH` is not set.
    """

    if not db_path:
        db_path = os.environ["LDB_PATH"]

    external_miss = "No entry was found with {id} in the AnimalExternalIdentifier.identifier column and {db} in the AnimalExternalIdentifier.database column of the database located at {loc}."
    id_miss = "No entry was found with {id} in the Animal.id column of the database located at {loc}."

    session, engine = load_session(db_path)
    # try/finally so the session and engine are released on every exit path,
    # including database errors, which are no longer swallowed.
    try:
        animal_id = identifier
        if database and identifier:
            # Translate the external identifier into the Animal.id primary key.
            matches = (
                session.query(Animal)
                .join(Animal.external_ids)
                .filter(AnimalExternalIdentifier.database == database)
                .filter(AnimalExternalIdentifier.identifier == identifier)
                .options(contains_eager('external_ids'))
                .all()
            )
            if not matches:
                # Report the miss instead of failing with an IndexError.
                print(external_miss.format(id=identifier, db=database, loc=db_path))
                return
            animal_id = matches[0].id

        animals = session.query(Animal).filter(Animal.id == animal_id).all()
        if animals:
            print(str(animals[0]))
        elif database is None:
            print(id_miss.format(id=identifier, loc=db_path))
        else:
            print(external_miss.format(id=identifier, db=database, loc=db_path))
    finally:
        session.close()
        engine.dispose()
def select_list(userid, form):
    """Return reports matching the filters in ``form``, newest opened first.

    ``form.status``, ``form.submitter`` and ``form.violation`` are read as
    filters. ``userid`` is not referenced anywhere in this function body.

    Returns a list of ``(report, report_count, violations)`` tuples, where
    ``violations`` is ``map(_convert_violation, ...)`` applied to the
    report's distinct violation values. NOTE(review): that is a list on
    Python 2 but a lazy iterator on Python 3 -- confirm which interpreter
    the callers assume.
    """
    # Find the unique violation types and the number of reporters. This will be
    # joined against the Report model to get the violations/reporters for each
    # selected report.
    subq = (
        ReportComment.dbsession.query(
            ReportComment.reportid,
            sa.func.count(),
            sa.type_coerce(
                sa.func.array_agg(ReportComment.violation.distinct()),
                ARRAY(sa.Integer, as_tuple=True)).label('violations'))
        .filter(ReportComment.violation != 0)
        .group_by(ReportComment.reportid)
        .subquery())

    # Find reports, joining against the aforementioned subquery, and eager-load
    # the reports' owners.
    q = (
        Report.dbsession.query(Report, subq)
        .options(joinedload(Report.owner))
        .join(subq, Report.reportid == subq.c.reportid)
        .reset_joinpoint())

    # For each type of report, eagerly load the content reported and the
    # content's owner. Also, keep track of the Login model aliases used for each
    # report type so they can be filtered against later.
    login_aliases = []
    for column_name in _report_types:
        login_alias = aliased(Login)
        login_aliases.append(login_alias)
        # reset_joinpoint() after each pair of joins so that the next
        # iteration's outerjoin, and the filter_by() calls below, start from
        # Report again rather than from the last joined entity.
        q = (
            q
            .outerjoin(getattr(Report, column_name))
            .outerjoin(login_alias)
            .options(contains_eager(column_name + '.owner', alias=login_alias))
            .reset_joinpoint())

    # Filter by report status. form.status can also be 'all', in which case no
    # filter is applied.
    if form.status == 'closed':
        q = q.filter_by(is_closed=True)
    elif form.status == 'open':
        q = q.filter_by(is_closed=False)

    # If filtering by the report's content's owner, iterate over the previously
    # collected Login model aliases to compare against Login.login_name.
    if form.submitter:
        submitter = legacy.login_name(form.submitter)
        q = q.filter(sa.or_(l.login_name == submitter for l in login_aliases))

    # If filtering by violation type, see if the violation is in the array
    # aggregate of unique violations for this report.
    if form.violation and form.violation != '-1':
        q = q.filter(sa.literal(int(form.violation)) == sa.func.any(subq.c.violations))

    q = q.order_by(Report.opened_at.desc())
    # Each row is (Report, subq.reportid, subq count, subq violations); the
    # subquery's reportid duplicates the report's own id and is discarded.
    return [(report, report_count, map(_convert_violation, violations))
            for report, _, report_count, violations in q.all()]