我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psycopg2.InterfaceError()。
def execute_read_query(self, query: str, args: tuple = ()) -> (list, list): """ results are returned as a list of dicts""" if not self.__conn: raise psycopg2.InterfaceError("null connection") rows = None try: with self.__conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs: curs.execute(query, args) rows = curs.fetchall() except Exception as e: tb.print_exc() raise(e) if len(rows) == 0: return [] return rows
def execute_write_query(self, query: str, args: tuple = ()) -> int: """ perform a write query query -- the query string with optional %s placeholders args -- optional tuple of values for %s placeholders returns number of rows modified """ if not self.__conn: raise psycopg2.InterfaceError("null connection") try: with self.__conn.cursor() as curs: curs.execute(query, args) except Exception as e: tb.print_exc() raise(e) return curs.rowcount
def test_isolation_level_closed(self): cnn = self.connect() cnn.close() self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 0) self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 1)
def test_closed(self): self.conn.close() self.assertRaises(psycopg2.InterfaceError, self.conn.set_session, ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_closed(self): self.conn.close() self.assertRaises(psycopg2.InterfaceError, setattr, self.conn, 'autocommit', True) # The getter doesn't have a guard. We may change this in future # to make it consistent with other methods; meanwhile let's just check # it doesn't explode. try: self.assertTrue(self.conn.autocommit in (True, False)) except psycopg2.InterfaceError: pass
def test_with_closed(self): def f(): with self.conn: pass self.conn.close() self.assertRaises(psycopg2.InterfaceError, f)
def test_parse(self): from psycopg2.extras import HstoreAdapter def ok(s, d): self.assertEqual(HstoreAdapter.parse(s, None), d) ok(None, None) ok('', {}) ok('"a"=>"1", "b"=>"2"', {'a': '1', 'b': '2'}) ok('"a" => "1" , "b" => "2"', {'a': '1', 'b': '2'}) ok('"a"=>NULL, "b"=>"2"', {'a': None, 'b': '2'}) ok(r'"a"=>"\"", "\""=>"2"', {'a': '"', '"': '2'}) ok('"a"=>"\'", "\'"=>"2"', {'a': "'", "'": '2'}) ok('"a"=>"1", "b"=>NULL', {'a': '1', 'b': None}) ok(r'"a\\"=>"1"', {'a\\': '1'}) ok(r'"a\""=>"1"', {'a"': '1'}) ok(r'"a\\\""=>"1"', {r'a\"': '1'}) ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'}) def ko(s): self.assertRaises(psycopg2.InterfaceError, HstoreAdapter.parse, s, None) ko('a') ko('"a"') ko(r'"a\\""=>"1"') ko(r'"a\\\\""=>"1"') ko('"a=>"1"') ko('"a"=>"1", "b"=>NUL')
def test_async_cursor_gone(self): import gc cur = self.conn.cursor() cur.execute("select 42;") del cur gc.collect() self.assertRaises(psycopg2.InterfaceError, self.wait, self.conn) # The connection is still usable cur = self.conn.cursor() cur.execute("select 42;") self.wait(self.conn) self.assertEqual(cur.fetchone(), (42,))
def test_write_after_close(self): lo = self.conn.lobject() lo.close() self.assertRaises(psycopg2.InterfaceError, lo.write, b"some data")
def test_read_after_close(self): lo = self.conn.lobject() lo.close() self.assertRaises(psycopg2.InterfaceError, lo.read, 5)
def test_seek_after_close(self): lo = self.conn.lobject() lo.close() self.assertRaises(psycopg2.InterfaceError, lo.seek, 0)
def test_tell_after_close(self): lo = self.conn.lobject() lo.close() self.assertRaises(psycopg2.InterfaceError, lo.tell)
def test_seek_larger_than_2gb(self): lo = self.conn.lobject() offset = 1 << 32 # 4gb self.assertRaises( (OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError), lo.seek, offset, 0)
def test_truncate_larger_than_2gb(self): lo = self.conn.lobject() length = 1 << 32 # 4gb self.assertRaises( (OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError), lo.truncate, length)
def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")): """Parse an hstore representation in a Python string. The hstore is represented as something like:: "a"=>"1", "b"=>"2" with backslash-escaped strings. """ if s is None: return None rv = {} start = 0 for m in self._re_hstore.finditer(s): if m is None or m.start() != start: raise psycopg2.InterfaceError( "error parsing hstore pair at char %d" % start) k = _bsdec.sub(r'\1', m.group(1)) v = m.group(2) if v is not None: v = _bsdec.sub(r'\1', v) rv[k] = v start = m.end() if start < len(s): raise psycopg2.InterfaceError( "error parsing hstore: unparsed data after char %d" % start) return rv
def tokenize(self, s): rv = [] for m in self._re_tokenize.finditer(s): if m is None: raise psycopg2.InterfaceError("can't parse type: %r" % s) if m.group(1) is not None: rv.append(None) elif m.group(2) is not None: rv.append(self._re_undouble.sub(r"\1", m.group(2))) else: rv.append(m.group(3)) return rv
def test_closed(self): self.conn.close() self.assertRaises(psycopg2.InterfaceError, setattr, self.conn, 'autocommit', True) # The getter doesn't have a guard. We may change this in future # to make it consistent with other methods; meanwhile let's just check # it doesn't explode. try: self.assert_(self.conn.autocommit in (True, False)) except psycopg2.InterfaceError: pass
def test_bad_subclass(self): # check that we get an error message instead of a segfault # for badly written subclasses. # see http://stackoverflow.com/questions/22019341/ class StupidCursor(psycopg2.extensions.cursor): def __init__(self, *args, **kwargs): # I am stupid so not calling superclass init pass cur = StupidCursor() self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1') self.assertRaises(psycopg2.InterfaceError, cur.executemany, 'select 1', [])
def test_truncate_after_close(self): lo = self.conn.lobject() lo.close() self.assertRaises(psycopg2.InterfaceError, lo.truncate)
def update_exact(self, table_name: str, conditions: dict, newvalues: dict) -> int: """ updates rows that match the given equality conditions with new values. both conditions and newvalues are dictionaries. kv pairs in conditions are translated into 'column = value' conditions and kv pairs in newvalues are used in the SET clause returns the number of rows updated """ if not self.__conn: raise psycopg2.InterfaceError("null connection") try: pass except Exception as e: tb.print_exc() raise(e)
def insert_one(self, table_name: str, row: dict, retcol: str): """ inserts a single row into the specified table and returns the value of the specified column. row - a dict containing all row values to insert returns: new id """ if not self.__conn or not row: raise psycopg2.InterfaceError("null connection") try: columns = "(" + ",".join(row.keys()) + ")" holders = "(" + ",".join(["%s"] * len(row.keys())) + ")" fillers = tuple(row.values()) query = "insert into {table_name} {columnnames} values {values} returning {retcol}".format(**{ "table_name": table_name, "columnnames": columns, "values": holders, "retcol": retcol }) with self.__conn.cursor() as curs: curs.execute(query, fillers) return curs.fetchone()[0] except Exception as e: tb.print_exc() raise(e)
def insert(self, table_name: str, rows: list) -> int: """ inserts the given data into the specified table. An individual insert statement is generated per row. The query is handled as a prepared statement so multi-inserts are more efficient. table_name -- rows -- list of dicts. each dict representing a row to be inserted returns: the number of rows inserted """ n_inserted = 0 if not self.__conn or not rows: raise psycopg2.InterfaceError("null connection") try: for row in rows: columns = "(" + ",".join(row.keys()) + ")" # "(col1, col2, col3...)" params = ",".join(["%s"] * len(row.keys())) # "%s, %s, %s..." query = "insert into {table_name} {columns} select {params}".format(**{ "table_name": table_name, "columns": columns, "params": params, }) n_inserted += self.execute_write_query(query, tuple(row.values())) return n_inserted except Exception as e: tb.print_exc() raise(e)
def test_isolation_level_closed(self): cnn = self.connect() cnn.close() self.assertRaises(psycopg2.InterfaceError, getattr, cnn, 'isolation_level') self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 0) self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 1)
def test_closed(self): self.conn.close() self.assertRaises(psycopg2.InterfaceError, self.conn.set_session, psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)