Python sqlalchemy.sql 模块,false() 实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用sqlalchemy.sql.false()

项目:craton    作者:openstack    | 项目源码 | 文件源码
def _add_var_filters_to_query(query, model, var_filters, resolved=True):
    """Restrict ``query`` to resources whose variables match the filters.

    Each entry of ``var_filters`` is a ``key:value`` string, as received
    from a request of the form ``?vars=a:b[,c:d,...]``.  Only the first
    ``:`` separates the key from the value.  The filters intersect ("and"
    semantics).
    """
    wanted = dict(item.split(':', 1) for item in var_filters)
    matching_ids = {
        match.id
        for match in matching_resources(query, model, wanted, resolved)
    }
    if matching_ids:
        return query.filter(model.id.in_(matching_ids))
    # Nothing matched: short circuit with a constant-false filter, which
    # also keeps SQLAlchemy from reporting an empty "in" clause.
    return query.filter(sql.false())
项目:quark    作者:openstack    | 项目源码 | 文件源码
def upgrade():
    """Create the reservations, resourcedeltas and quotausages tables."""
    # Reservations, keyed by a 36-character id.
    reservation_columns = [
        sa.Column('id', sa.String(length=36), nullable=False),
        sa.Column('tenant_id', sa.String(length=255), nullable=True),
        sa.Column('expiration', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    ]
    op.create_table('reservations', *reservation_columns)

    # Per-resource amounts attached to a reservation; rows are removed
    # together with the reservation they reference (ON DELETE CASCADE).
    delta_columns = [
        sa.Column('resource', sa.String(length=255), nullable=False),
        sa.Column('reservation_id', sa.String(length=36), nullable=False),
        sa.Column('amount', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['reservation_id'], ['reservations.id'],
                                ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('resource', 'reservation_id'),
    ]
    op.create_table('resourcedeltas', *delta_columns)

    # Usage counters per (tenant_id, resource); "dirty" defaults to a
    # SQL false and both counters default to '0' on the server side.
    usage_columns = [
        sa.Column('tenant_id', sa.String(length=255),
                  nullable=False, primary_key=True, index=True),
        sa.Column('resource', sa.String(length=255),
                  nullable=False, primary_key=True, index=True),
        sa.Column('dirty', sa.Boolean(), nullable=False,
                  server_default=sql.false()),
        sa.Column('in_use', sa.Integer(), nullable=False,
                  server_default='0'),
        sa.Column('reserved', sa.Integer(), nullable=False,
                  server_default='0'),
    ]
    op.create_table('quotausages', *usage_columns)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_product`` get the rows whose ``retail_shop_id``
    is among ``current_user.retail_shop_ids``; for everyone else the
    query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_product'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_tag`` get the rows whose ``retail_shop_id`` is
    among ``current_user.retail_shop_ids``; for everyone else the query
    is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_tag'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_stock`` get the rows whose ``retail_shop_id`` is
    among ``current_user.retail_shop_ids``; for everyone else the query
    is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_stock'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_distributor`` get the rows whose
    ``retail_shop_id`` is among ``current_user.retail_shop_ids``; for
    everyone else the query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_distributor'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_distributor_bill`` get the rows whose
    ``retail_shop_id`` is among ``current_user.retail_shop_ids``; for
    everyone else the query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_distributor_bill'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_tax`` get the rows whose ``retail_shop_id`` is
    among ``current_user.retail_shop_ids``; for everyone else the query
    is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_tax'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_salt`` get the rows whose ``retail_shop_id`` is
    among ``current_user.retail_shop_ids``; for everyone else the query
    is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_salt'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_add_on`` get the rows whose ``retail_shop_id``
    is among ``current_user.retail_shop_ids``; for everyone else the
    query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_add_on'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_combo`` get the rows whose ``retail_shop_id`` is
    among ``current_user.retail_shop_ids``; for everyone else the query
    is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_combo'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_product_distributor`` get the rows whose
    ``retail_shop_id`` is among ``current_user.retail_shop_ids``; for
    everyone else the query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_product_distributor'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_product_salt`` get the rows whose
    ``retail_shop_id`` is among ``current_user.retail_shop_ids``; for
    everyone else the query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_product_salt'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_product_tax`` get the rows whose
    ``retail_shop_id`` is among ``current_user.retail_shop_ids``; for
    everyone else the query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_product_tax'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_brand_distributor`` get the rows whose
    ``retail_shop_id`` is among ``current_user.retail_shop_ids``; for
    everyone else the query is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_brand_distributor'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_read_permission(self, qs):
    """Limit ``qs`` to the rows the current user is allowed to read.

    Users holding ``view_order`` get the rows whose ``retail_shop_id`` is
    among ``current_user.retail_shop_ids``; for everyone else the query
    is filtered by a constant SQL false.
    """
    if not current_user.has_permission('view_order'):
        return qs.filter(false())
    shop_filter = self.model.retail_shop_id.in_(current_user.retail_shop_ids)
    return qs.filter(shop_filter)
项目:open-pos-api    作者:saurabh1e    | 项目源码 | 文件源码
def has_add_permission(self, objects):
    """Tell whether the current user may create all of ``objects``.

    Returns ``False`` when the user lacks the ``create_order`` permission
    or lacks shop access for the ``retail_shop_id`` of any object, and
    ``True`` otherwise (including for an empty ``objects``).
    """
    if not current_user.has_permission('create_order'):
        return False

    # The boolean False must be returned on denial.  Returning the
    # SQLAlchemy ``false`` callable would hand callers a truthy object
    # and effectively grant the permission.
    return all(
        current_user.has_shop_access(obj.retail_shop_id)
        for obj in objects
    )
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def has_property(self, prop):
    """Build a SQL condition telling whether this user holds ``prop``.

    The condition is true when some active membership of the user leads
    to a property named ``prop`` that is granted, and no active
    membership leads to one where it is not granted.
    """
    # Join condition shared by both EXISTS checks: property -> group ->
    # active membership of this user.
    membership_link = and_(
        Property.name == prop,
        Property.property_group_id == PropertyGroup.id,
        PropertyGroup.id == Membership.group_id,
        Membership.user_id == self.id,
        Membership.active
    )
    source_tables = [
        Property.__table__,
        PropertyGroup.__table__,
        Membership.__table__
    ]
    # Only row existence matters, so a NULL literal is selected.
    base_select = select([null()], from_obj=source_tables).where(
        membership_link)

    denied = base_select.where(Property.granted == false())
    granted = base_select.where(Property.granted == true())
    return and_(not_(exists(denied)), exists(granted))
项目:clickhouse-sqlalchemy    作者:xzkostyan    | 项目源码 | 文件源码
def test_true_false(self):
    """The SQL boolean constants compile to ``'0'`` and ``'1'``."""
    compiled_false = self.compile(sql.false())
    self.assertEqual(compiled_false, '0')

    compiled_true = self.compile(sql.true())
    self.assertEqual(compiled_true, '1')
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def service_get_minimum_version(context, binary):
    """Return the lowest ``version`` among services running ``binary``.

    Only services whose ``forced_down`` column is false are considered.
    """
    version_query = context.session.query(func.min(models.Service.version))
    version_query = version_query.filter(models.Service.binary == binary)
    version_query = version_query.filter(
        models.Service.forced_down == false())
    return version_query.scalar()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def compute_node_statistics(context):
    """Compute statistics over all compute nodes.

    Returns a dict mapping each statistic name to an int; an aggregate
    that comes back empty/None is reported as 0.
    """
    node = models.ComputeNode
    service = models.Service

    # The first entry is a row count; every other entry is the SUM of
    # the ComputeNode column carrying the same name.
    fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
              'memory_mb_used', 'local_gb_used', 'free_ram_mb', 'free_disk_gb',
              'current_workload', 'running_vms', 'disk_available_least')
    aggregates = (func.count(node.id),) + tuple(
        func.sum(getattr(node, column)) for column in fields[1:])

    # TODO(sbauza): Remove the service_id filter in a later release
    # once we are sure that all compute nodes report the host field
    node_matches_service = or_(service.host == node.host,
                               service.id == node.service_id)

    stats_query = model_query(context, node, aggregates, read_deleted="no")
    stats_query = stats_query.filter(service.disabled == false())
    stats_query = stats_query.filter(service.binary == "nova-compute")
    stats_query = stats_query.filter(node_matches_service)
    row = stats_query.first()

    # Build a dict of the info--making no assumptions about result
    return {name: int(row[position] or 0)
            for position, name in enumerate(fields)}


###################
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def fixed_ip_disassociate_all_by_timeout(context, host, time):
    """Disassociate unallocated fixed IPs of ``host`` not updated since ``time``.

    Returns 0 when no fixed IP qualifies, otherwise whatever the bulk
    update returns.
    """
    # NOTE(vish): only update fixed ips that "belong" to this
    #             host; i.e. the network host or the instance
    #             host matches. Two queries necessary because
    #             join with update doesn't work.
    on_instance_host = and_(models.Instance.host == host,
                            models.Network.multi_host == true())
    on_network_host = models.Network.host == host
    host_filter = or_(on_instance_host, on_network_host)

    # First query: collect the ids of the qualifying fixed IPs.
    candidates = model_query(context, models.FixedIp, (models.FixedIp.id,),
                             read_deleted="no")
    candidates = candidates.filter(models.FixedIp.allocated == false())
    candidates = candidates.filter(models.FixedIp.updated_at < time)
    candidates = candidates.join(
        (models.Network,
         models.Network.id == models.FixedIp.network_id))
    candidates = candidates.join(
        (models.Instance,
         models.Instance.uuid == models.FixedIp.instance_uuid))
    candidates = candidates.filter(host_filter)

    fixed_ip_ids = [row[0] for row in candidates.all()]
    if not fixed_ip_ids:
        return 0

    # Second query: clear the association on exactly those rows.
    targets = model_query(context, models.FixedIp).filter(
        models.FixedIp.id.in_(fixed_ip_ids))
    return targets.update({'instance_uuid': None,
                           'leased': False,
                           'updated_at': timeutils.utcnow()},
                          synchronize_session='fetch')
项目:whalestack    作者:2gis    | 项目源码 | 文件源码
def reschedule_routers_from_down_agents(self):
    """Reschedule routers from down l3 agents if admin state is up.

    Bindings are selected when the agent's heartbeat is older than the
    cutoff and the agent is administratively up; routers whose ``ha``
    attribute is true are left out.  Bindings on agents in DVR mode are
    skipped with a warning.  Failures are logged rather than raised.
    """
    agent_dead_limit = self.agent_dead_limit_seconds()
    self.wait_down_agents('L3', agent_dead_limit)
    cutoff = self.get_cutoff_time(agent_dead_limit)

    context = n_ctx.get_admin_context()
    # The outer join keeps routers that have no extra-attributes row;
    # for those ``ha`` is NULL, which the second filter accepts.
    down_bindings = (
        context.session.query(RouterL3AgentBinding).
        join(agents_db.Agent).
        filter(agents_db.Agent.heartbeat_timestamp < cutoff,
               agents_db.Agent.admin_state_up).
        outerjoin(l3_attrs_db.RouterExtraAttributes,
                  l3_attrs_db.RouterExtraAttributes.router_id ==
                  RouterL3AgentBinding.router_id).
        filter(sa.or_(l3_attrs_db.RouterExtraAttributes.ha == sql.false(),
                      l3_attrs_db.RouterExtraAttributes.ha == sql.null())))
    try:
        for binding in down_bindings:
            agent_mode = self._get_agent_mode(binding.l3_agent)
            if agent_mode == constants.L3_AGENT_MODE_DVR:
                # rescheduling from l3 dvr agent on compute node doesn't
                # make sense. Router will be removed from that agent once
                # there are no dvr serviceable ports on that compute node
                # (LOG.warning is used because ``warn`` is a deprecated
                # alias.)
                LOG.warning(_LW('L3 DVR agent on node %(host)s is down. '
                                'Not rescheduling from agent in \'dvr\' '
                                'mode.'), {'host': binding.l3_agent.host})
                continue
            LOG.warning(_LW(
                "Rescheduling router %(router)s from agent %(agent)s "
                "because the agent did not report to the server in "
                "the last %(dead_time)s seconds."),
                {'router': binding.router_id,
                 'agent': binding.l3_agent_id,
                 'dead_time': agent_dead_limit})
            try:
                self.reschedule_router(context, binding.router_id)
            except (l3agentscheduler.RouterReschedulingFailed,
                    oslo_messaging.RemoteError):
                # Catch individual router rescheduling errors here
                # so one broken one doesn't stop the iteration.
                LOG.exception(_LE("Failed to reschedule router %s"),
                              binding.router_id)
    except Exception:
        # we want to be thorough and catch whatever is raised
        # to avoid loop abortion
        LOG.exception(_LE("Exception encountered during router "
                          "rescheduling."))