Python sqlalchemy 模块,bindparam() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.bindparam()

项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def search(cls, connection: SQLSession, term: str) -> Sequence['Mod']:
        """Return every Mod whose name or summary contains TERM.

        TERM is wrapped in ``%`` wildcards and compared with SQL LIKE
        against both columns; the matches are ordered by name.

        Keyword arguments:
            connection: Database connection to ask on.
            term: The term to search for.

        Returns:
            Sequence of matching mods (possibly empty).
        """

        # The pattern is handed over as the bound parameter 'term', so the
        # query built by the lambdas below never embeds the searched text.
        pattern = '%{}%'.format(term)

        baked = SQLBakery(lambda conn: conn.query(cls))
        baked += lambda q: q.filter(or_(
            cls.name.like(bindparam('term')),
            cls.summary.like(bindparam('term')),
        ))
        baked += lambda q: q.order_by(cls.name)

        matches = baked(connection).params(term=pattern)
        return matches.all()
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def find(cls, connection: SQLSession, name: str) -> 'Mod':
        """Find exactly one Mod whose name matches NAME.

        NAME is supplied as the bound parameter 'name' and compared with
        SQL LIKE against the name column.

        Keyword Arguments:
            connection: Database connection to ask on.
            name: The name of the mod to search for.

        Returns:
            The requested mod.

        Raises:
            NoResultFound: The name does not match any known mod.
            MultipleResultsFound: The name is too ambiguous,
                multiple matching mods found.
        """

        wanted = '{}'.format(name)

        baked = SQLBakery(lambda conn: conn.query(cls))
        baked += lambda q: q.filter(cls.name.like(bindparam('name')))

        # .one() insists on a single row.
        bound = baked(connection).params(name=wanted)
        return bound.one()
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def with_id(cls, connection: SQLSession, id: int) -> 'Mod':
        """Fetch the mod with the given id from the database.

        The id is supplied as the bound parameter 'id' and compared for
        equality with the id column.

        Keyword arguments:
            connection: Database connection to fetch from.
            id: The id field of the mod to get.

        Returns:
            The requested mod.

        Raises:
            NoResultFound: Mod with specified id does not exist.
        """

        baked = SQLBakery(lambda conn: conn.query(cls))
        baked += lambda q: q.filter(cls.id == bindparam('id'))

        bound = baked(connection).params(id=id)
        return bound.one()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:pushkin    作者:Nordeus    | 项目源码 | 文件源码
def update_canonicals(canonicals):
    '''
    Update canonical data for android devices.

    Each entry of ``canonicals`` is a mapping; its keys are prefixed with
    ``p_`` to form the bound parameters of the UPDATE, which reads
    ``p_new_token``, ``p_login_id`` and ``p_old_token``.  After the update,
    ``keep_max_users_per_device`` is called for every entry's ``new_token``,
    once per Android platform id.

    Nothing is executed when ``canonicals`` is empty.
    '''
    binding = [{"p_{}".format(k): v for k, v in canonical.items()} for canonical in canonicals]
    if not binding:
        # An empty parameter list would execute the UPDATE without values
        # for its required bind parameters, so there is nothing to do.
        return

    device_table = model.metadata.tables['device']
    # A device matches on its pending new token when present, else on its
    # current token (COALESCE of the two columns).
    current_token = func.coalesce(device_table.c.device_token_new, device_table.c.device_token)
    stmt = update(device_table).\
        values(device_token_new=bindparam('p_new_token')).\
        where(and_(device_table.c.login_id == bindparam('p_login_id'),
                   current_token == bindparam('p_old_token')))
    ENGINE.execute(stmt, binding)

    with session_scope() as session:
        query = text('SELECT keep_max_users_per_device( \
                     (:platform_id)::int2, :device_token, (:max_users_per_device)::int2)')
        for canonical in canonicals:
            # Same call for phones and tablets, in that order.
            for platform_id in (constants.PLATFORM_ANDROID, constants.PLATFORM_ANDROID_TABLET):
                session.execute(query,
                                {'platform_id': platform_id,
                                 'device_token': canonical['new_token'],
                                 'max_users_per_device': config.max_users_per_device
                                })
        session.commit()
项目:pushkin    作者:Nordeus    | 项目源码 | 文件源码
def update_unregistered_devices(unregistered):
    '''
    Update data for unregistered Android devices.

    Unregistered device will not receive notifications and will be deleted when number of devices exceeds maximum.

    Each entry of ``unregistered`` is a mapping; its keys are prefixed with
    ``p_`` to form the bound parameters of the UPDATE, which reads
    ``p_login_id`` and ``p_device_token``.  Matching rows get
    ``unregistered_ts`` set to the database's ``now()``.

    Nothing is executed when ``unregistered`` is empty.
    '''
    binding = [{"p_{}".format(k): v for k, v in u.items()} for u in unregistered]
    if not binding:
        # An empty parameter list would execute the UPDATE without values
        # for its required bind parameters, so there is nothing to do.
        return

    device_table = model.metadata.tables['device']
    # A device matches on its pending new token when present, else on its
    # current token (COALESCE of the two columns).
    current_token = func.coalesce(device_table.c.device_token_new, device_table.c.device_token)
    stmt = update(device_table).\
        values(unregistered_ts=func.now()).\
        where(and_(device_table.c.login_id == bindparam('p_login_id'),
                   current_token == bindparam('p_device_token')))
    ENGINE.execute(stmt, binding)
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        The copy of this ClauseElement has its :func:`bindparam()`
        elements filled with values taken from the given dictionary
        and/or keyword arguments::

          >>> clause = column('x') + bindparam('foo')
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({'foo': 7}).compile().params)
          {'foo': 7}

        """
        make_unique = False
        return self._params(make_unique, optionaldict, kwargs)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        The copy of this ClauseElement has its :func:`bindparam()`
        elements filled with values taken from the given dictionary
        and/or keyword arguments::

          >>> clause = column('x') + bindparam('foo')
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({'foo': 7}).compile().params)
          {'foo': 7}

        """
        make_unique = False
        return self._params(make_unique, optionaldict, kwargs)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        The copy of this ClauseElement has its :func:`bindparam()`
        elements filled with values taken from the given dictionary
        and/or keyword arguments::

          >>> clause = column('x') + bindparam('foo')
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({'foo': 7}).compile().params)
          {'foo': 7}

        """
        make_unique = False
        return self._params(make_unique, optionaldict, kwargs)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        The copy of this ClauseElement has its :func:`bindparam()`
        elements filled with values taken from the given dictionary
        and/or keyword arguments::

          >>> clause = column('x') + bindparam('foo')
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({'foo': 7}).compile().params)
          {'foo': 7}

        """
        make_unique = False
        return self._params(make_unique, optionaldict, kwargs)
项目:marvin    作者:sdss    | 项目源码 | 文件源码
def _getPipeInfo(self, pipename):
        ''' Retrieve the pipeline info for a given pipeline version name.

        pipename must be 'DRP' or 'DAP' in any letter case.  The row is
        queried through the matching alias and filtered on the pipeline
        label and on the version held by this object.  None is returned
        when the related table ('cube' for DRP, 'file' for DAP) is not
        found in the current query.
        '''

        lowered = pipename.lower()
        assert lowered in ['drp', 'dap'], 'Pipeline Name must either be DRP or DAP'
        is_drp = lowered == 'drp'

        # bindparam name and value
        if is_drp:
            bindname, bindvalue = 'drpver', self._drpver
        else:
            bindname, bindvalue = 'dapver', self._dapver

        # table that must take part in the query for the lookup to happen
        if is_drp:
            tablename = 'cube'
        elif lowered == 'dap':
            tablename = 'file'
        inclasses = self._tableInQuery(tablename) or tablename in str(self.query.statement.compile())

        # alias to select from
        pipealias = self._drp_alias if is_drp else self._dap_alias

        if not inclasses:
            return None

        # get the pipeinfo
        versions = marvindb.session.query(pipealias).\
            join(marvindb.datadb.PipelineName, marvindb.datadb.PipelineVersion)
        versions = versions.filter(
            marvindb.datadb.PipelineName.label == pipename.upper(),
            marvindb.datadb.PipelineVersion.version == bindparam(bindname, bindvalue))
        return versions.one()
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def unique_params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        Behaves like ``params()`` but asks ``_params()`` to mark the
        affected bind parameters as unique (`unique=True`), so that
        multiple statements can be used.

        """
        make_unique = True
        return self._params(make_unique, optionaldict, kwargs)
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def params(self, *optionaldict, **kwargs):
        """Return a copy with :func:`bindparam()` elements replaced.

        The copy of this ClauseElement has its :func:`bindparam()`
        elements filled with values taken from the given dictionary
        and/or keyword arguments::

          >>> clause = column('x') + bindparam('foo')
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({'foo': 7}).compile().params)
          {'foo': 7}

        """
        make_unique = False
        return self._params(make_unique, optionaldict, kwargs)
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _params(self, unique, optionaldict, kwargs):
        """Apply replacement values to the bind parameters of this element.

        ``optionaldict`` is the tuple of positional arguments received by
        the caller; at most one entry is accepted and it is merged into
        ``kwargs``.  Every visited bind parameter whose key is present in
        ``kwargs`` takes that value and stops being required; when
        ``unique`` is true each visited parameter is also converted to a
        unique one.  The result of ``cloned_traverse()`` is returned.
        """
        positional = len(optionaldict)
        if positional > 1:
            raise exc.ArgumentError(
                "params() takes zero or one positional dictionary argument")
        if positional == 1:
            kwargs.update(optionaldict[0])

        def replace_value(param):
            key = param.key
            if key in kwargs:
                param.value = kwargs[key]
                param.required = False
            if unique:
                param._convert_to_unique()

        visitors = {'bindparam': replace_value}
        return cloned_traverse(self, {}, visitors)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def test_bound_limit(self):
        # LIMIT is supplied through the bound parameter "l" instead of a
        # literal; with l=2 the first two rows by id are expected.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.limit(bindparam('l'))
        expected_rows = [(1, 1, 2), (2, 2, 3)]
        self._assert_result(stmt, expected_rows, params={"l": 2})
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def test_bound_offset(self):
        # OFFSET is supplied through the bound parameter "o"; with o=2
        # the rows (3, 3, 4) and (4, 4, 5) are expected back.
        some_table = self.tables.some_table
        stmt = select([some_table]).order_by(some_table.c.id)
        stmt = stmt.offset(bindparam('o'))
        expected_rows = [(3, 3, 4), (4, 4, 5)]
        self._assert_result(stmt, expected_rows, params={"o": 2})
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def test_bound_limit_offset(self):
        # Both LIMIT and OFFSET are bound parameters ("l" and "o").
        some_table = self.tables.some_table
        ordered = select([some_table]).order_by(some_table.c.id)
        stmt = ordered.limit(bindparam("l")).offset(bindparam("o"))
        expected_rows = [(2, 2, 3), (3, 3, 4)]
        self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})