Python psycopg2.extensions 模块,ISOLATION_LEVEL_READ_COMMITTED 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED

项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_isolation_level_read_committed(self):
        cnn1 = self.connect()
        cnn2 = self.connect()
        cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

        cur1 = cnn1.cursor()
        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur1.fetchone()[0])
        cnn1.commit()

        cur2 = cnn2.cursor()
        cur2.execute("insert into isolevel values (10);")
        cur1.execute("insert into isolevel values (20);")

        cur2.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur2.fetchone()[0])
        cnn1.commit()
        cur2.execute("select count(*) from isolevel;")
        self.assertEqual(2, cur2.fetchone()[0])

        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur1.fetchone()[0])
        cnn2.commit()
        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(2, cur1.fetchone()[0])
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_isolation_level_read_committed(self):
        cnn1 = self.connect()
        cnn2 = self.connect()
        cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

        cur1 = cnn1.cursor()
        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur1.fetchone()[0])
        cnn1.commit()

        cur2 = cnn2.cursor()
        cur2.execute("insert into isolevel values (10);")
        cur1.execute("insert into isolevel values (20);")

        cur2.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur2.fetchone()[0])
        cnn1.commit()
        cur2.execute("select count(*) from isolevel;")
        self.assertEqual(2, cur2.fetchone()[0])

        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur1.fetchone()[0])
        cnn2.commit()
        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(2, cur1.fetchone()[0])
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_isolation_level_read_committed(self):
        cnn1 = self.connect()
        cnn2 = self.connect()
        cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

        cur1 = cnn1.cursor()
        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur1.fetchone()[0])
        cnn1.commit()

        cur2 = cnn2.cursor()
        cur2.execute("insert into isolevel values (10);")
        cur1.execute("insert into isolevel values (20);")

        cur2.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur2.fetchone()[0])
        cnn1.commit()
        cur2.execute("select count(*) from isolevel;")
        self.assertEqual(2, cur2.fetchone()[0])

        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur1.fetchone()[0])
        cnn2.commit()
        cur1.execute("select count(*) from isolevel;")
        self.assertEqual(2, cur1.fetchone()[0])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        conn = self.connect()
        curs = conn.cursor()

        levels = [
            ('read uncommitted',
                ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
            ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
            ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
            ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
        ]
        for name, level in levels:
            conn.set_isolation_level(level)

            # the only values available on prehistoric PG versions
            if conn.server_version < 80000:
                if level in (
                        ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
                        ext.ISOLATION_LEVEL_REPEATABLE_READ):
                    name, level = levels[levels.index((name, level)) + 1]

            self.assertEqual(conn.isolation_level, level)

            curs.execute('show transaction_isolation;')
            got_name = curs.fetchone()[0]

            self.assertEqual(name, got_name)
            conn.commit()

        self.assertRaises(ValueError, conn.set_isolation_level, -1)
        self.assertRaises(ValueError, conn.set_isolation_level, 5)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_set_isolation_level_abort(self):
        conn = self.connect()
        cur = conn.cursor()

        self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("insert into isolevel values (10);")
        self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
            conn.get_transaction_status())

        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur.fetchone()[0])

        cur.execute("insert into isolevel values (10);")
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
            conn.get_transaction_status())
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur.fetchone()[0])

        cur.execute("insert into isolevel values (10);")
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur.fetchone()[0])
        self.assertEqual(conn.isolation_level,
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        cur = self.conn.cursor()
        self.conn.set_session(
            ext.ISOLATION_LEVEL_SERIALIZABLE)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            ext.ISOLATION_LEVEL_REPEATABLE_READ)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'repeatable read')
        else:
            self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'read uncommitted')
        else:
            self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        conn = self.connect()
        curs = conn.cursor()

        levels = [
            ('read uncommitted',
                ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
            ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
            ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
            ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
        ]
        for name, level in levels:
            conn.set_isolation_level(level)

            # the only values available on prehistoric PG versions
            if conn.server_version < 80000:
                if level in (
                        ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
                        ext.ISOLATION_LEVEL_REPEATABLE_READ):
                    name, level = levels[levels.index((name, level)) + 1]

            self.assertEqual(conn.isolation_level, level)

            curs.execute('show transaction_isolation;')
            got_name = curs.fetchone()[0]

            self.assertEqual(name, got_name)
            conn.commit()

        self.assertRaises(ValueError, conn.set_isolation_level, -1)
        self.assertRaises(ValueError, conn.set_isolation_level, 5)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        cur = self.conn.cursor()
        self.conn.set_session(
            ext.ISOLATION_LEVEL_SERIALIZABLE)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            ext.ISOLATION_LEVEL_REPEATABLE_READ)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'repeatable read')
        else:
            self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'read uncommitted')
        else:
            self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _isolation_lookup(self):
        from psycopg2 import extensions
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _isolation_lookup(self):
        from psycopg2 import extensions
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        conn = self.connect()
        curs = conn.cursor()

        levels = [
            ('read uncommitted',
                ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
            ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
            ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
            ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
        ]
        for name, level in levels:
            conn.set_isolation_level(level)

            # the only values available on prehistoric PG versions
            if conn.server_version < 80000:
                if level in (
                        ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
                        ext.ISOLATION_LEVEL_REPEATABLE_READ):
                    name, level = levels[levels.index((name, level)) + 1]

            self.assertEqual(conn.isolation_level, level)

            curs.execute('show transaction_isolation;')
            got_name = curs.fetchone()[0]

            self.assertEqual(name, got_name)
            conn.commit()

        self.assertRaises(ValueError, conn.set_isolation_level, -1)
        self.assertRaises(ValueError, conn.set_isolation_level, 5)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_set_isolation_level_abort(self):
        conn = self.connect()
        cur = conn.cursor()

        self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("insert into isolevel values (10);")
        self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
            conn.get_transaction_status())

        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur.fetchone()[0])

        cur.execute("insert into isolevel values (10);")
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
            conn.get_transaction_status())
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur.fetchone()[0])

        cur.execute("insert into isolevel values (10);")
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur.fetchone()[0])
        self.assertEqual(conn.isolation_level,
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        cur = self.conn.cursor()
        self.conn.set_session(
            ext.ISOLATION_LEVEL_SERIALIZABLE)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            ext.ISOLATION_LEVEL_REPEATABLE_READ)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'repeatable read')
        else:
            self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'read uncommitted')
        else:
            self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = __import__('psycopg2.extensions').extensions
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def connect(*args, **kwargs):
    """connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
    kwargs['connection_factory'] = connection
    conn = _2connect(*args, **kwargs)
    conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
    return conn
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def autocommit(self, on_off=1):
        """autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
        if on_off > 0:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        conn = self.connect()
        curs = conn.cursor()

        levels = [
            ('read uncommitted',
                ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
            ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
            ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
            ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
        ]
        for name, level in levels:
            conn.set_isolation_level(level)

            # the only values available on prehistoric PG versions
            if conn.server_version < 80000:
                if level in (
                        ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
                        ext.ISOLATION_LEVEL_REPEATABLE_READ):
                    name, level = levels[levels.index((name, level)) + 1]

            self.assertEqual(conn.isolation_level, level)

            curs.execute('show transaction_isolation;')
            got_name = curs.fetchone()[0]

            self.assertEqual(name, got_name)
            conn.commit()

        self.assertRaises(ValueError, conn.set_isolation_level, -1)
        self.assertRaises(ValueError, conn.set_isolation_level, 5)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_set_isolation_level_abort(self):
        conn = self.connect()
        cur = conn.cursor()

        self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("insert into isolevel values (10);")
        self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
            conn.get_transaction_status())

        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur.fetchone()[0])

        cur.execute("insert into isolevel values (10);")
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
            conn.get_transaction_status())
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(0, cur.fetchone()[0])

        cur.execute("insert into isolevel values (10);")
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
        self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
            conn.get_transaction_status())
        cur.execute("select count(*) from isolevel;")
        self.assertEqual(1, cur.fetchone()[0])
        self.assertEqual(conn.isolation_level,
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_set_isolation_level(self):
        cur = self.conn.cursor()
        self.conn.set_session(
            ext.ISOLATION_LEVEL_SERIALIZABLE)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            ext.ISOLATION_LEVEL_REPEATABLE_READ)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'repeatable read')
        else:
            self.assertEqual(cur.fetchone()[0], 'serializable')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
        cur.execute("SHOW transaction_isolation;")
        self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()

        self.conn.set_session(
            isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
        cur.execute("SHOW transaction_isolation;")
        if self.conn.server_version > 80000:
            self.assertEqual(cur.fetchone()[0], 'read uncommitted')
        else:
            self.assertEqual(cur.fetchone()[0], 'read committed')
        self.conn.rollback()
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }