我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 psycopg2.extras 模块。
def test_unicode(self):
    """Run execute_batch with non-ASCII text in the SQL, in the data, and in both."""
    cursor = self.conn.cursor()
    ext.register_type(ext.UNICODE, cursor)
    snowman = "\u2603"

    # The snowman sits in a trailing SQL comment, so it only has to survive
    # being sent as part of the statement text.
    commented_insert = (
        "insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman)
    plain_insert = "insert into testfast (id, data) values (%s, %s)"

    scenarios = (
        # unicode in statement
        (commented_insert, (1, 'x'),
         "select id, data from testfast where id = 1"),
        # unicode in data
        (plain_insert, (2, snowman),
         "select id, data from testfast where id = 2"),
        # unicode in both
        (commented_insert, (3, snowman),
         "select id, data from testfast where id = 3"),
    )
    for statement, row, readback in scenarios:
        psycopg2.extras.execute_batch(cursor, statement, [row])
        cursor.execute(readback)
        self.assertEqual(cursor.fetchone(), row)
def test_adapt_subclass(self):
    """A Json subclass that overrides dumps() decides how the value is rendered."""
    from psycopg2.extras import Json, json

    class DecimalEncoder(json.JSONEncoder):
        def default(self, obj):
            # Anything that is not a Decimal is left to the stock encoder.
            if not isinstance(obj, Decimal):
                return json.JSONEncoder.default(self, obj)
            return float(obj)

    class MyJson(Json):
        def dumps(self, obj):
            return json.dumps(obj, cls=DecimalEncoder)

    cursor = self.conn.cursor()
    value = Decimal('123.45')
    rendered = cursor.mogrify("%s", (MyJson(value),))
    self.assertEqual(rendered, b"'123.45'")
def test_adapt_dumps(self):
    """Json(obj, dumps=...) uses the supplied callable to serialise the value."""
    from psycopg2.extras import Json, json

    class DecimalEncoder(json.JSONEncoder):
        def default(self, obj):
            # Anything that is not a Decimal is left to the stock encoder.
            if not isinstance(obj, Decimal):
                return json.JSONEncoder.default(self, obj)
            return float(obj)

    def dumps(obj):
        return json.dumps(obj, cls=DecimalEncoder)

    cursor = self.conn.cursor()
    value = Decimal('123.45')
    rendered = cursor.mogrify("%s", (Json(value, dumps=dumps),))
    self.assertEqual(rendered, b"'123.45'")
def test_unicode(self):
    """Run execute_values with non-ASCII text in the SQL, in the data, and in both."""
    cursor = self.conn.cursor()
    ext.register_type(ext.UNICODE, cursor)
    snowman = "\u2603"

    # The snowman sits in a trailing SQL comment, so it only has to survive
    # being sent as part of the statement text.
    commented_insert = (
        "insert into testfast (id, data) values %%s -- %s" % snowman)
    plain_insert = "insert into testfast (id, data) values %s"

    scenarios = (
        # unicode in statement
        (commented_insert, (1, 'x'),
         "select id, data from testfast where id = 1"),
        # unicode in data
        (plain_insert, (2, snowman),
         "select id, data from testfast where id = 2"),
        # unicode in both
        (commented_insert, (3, snowman),
         "select id, data from testfast where id = 3"),
    )
    for statement, row, readback in scenarios:
        psycopg2.extras.execute_values(cursor, statement, [row])
        cursor.execute(readback)
        self.assertEqual(cursor.fetchone(), row)
def connectToDB(dbName=None, userName=None, dbPassword=None, dbHost=None,
                dbPort=None, dbCursor=psycopg2.extras.DictCursor):
    """Connect to a PostgreSQL database and return ``(connection, cursor)``.

    Parameters
    ----------
    dbName : str or None
        Database name. When None, no ``dbname`` is passed to the driver.
    userName, dbHost, dbPassword : str or None
        Optional settings; None and the empty string are both skipped.
    dbPort : int, str or None
        Optional port; skipped when None.
    dbCursor : cursor class
        Passed as ``cursor_factory`` when creating the returned cursor.

    Returns
    -------
    tuple
        The open connection and a cursor created on it.

    Raises
    ------
    Whatever ``psycopg2.connect`` or the adapter/cursor setup raises; if the
    setup fails after connecting, the connection is closed before re-raising.
    """
    # Hand the settings over as keyword arguments instead of splicing them
    # into a hand-quoted DSN string: a quote or backslash inside a value
    # (typically the password) would otherwise corrupt the string.
    params = {}
    if dbName is not None:
        params['dbname'] = dbName
    optional = (('user', userName), ('host', dbHost), ('password', dbPassword))
    for keyword, value in optional:
        if value is not None and value != '':
            params[keyword] = value
    if dbPort is not None:
        params['port'] = dbPort

    connection = psycopg2.connect(**params)
    try:
        register_adapter(numpy.float64, addapt_numpy_float64)
        register_adapter(numpy.int64, addapt_numpy_int64)
        cursor = connection.cursor(cursor_factory=dbCursor)
    except Exception:
        # Do not leak an open connection when the setup after connecting fails.
        connection.close()
        raise
    return connection, cursor
def test_cursor_factory(self):
    """The connection's cursor_factory attribute can be set, read back and reset."""
    def first_row():
        # Always use a brand-new cursor so the current factory is applied.
        cursor = self.conn.cursor()
        cursor.execute("select 1 as a")
        return cursor.fetchone()

    def by_name(row):
        return row['a']

    # Default factory: rows cannot be indexed by column name.
    self.assertEqual(self.conn.cursor_factory, None)
    self.assertRaises(TypeError, by_name, first_row())

    # With DictCursor installed, access by name works.
    self.conn.cursor_factory = psycopg2.extras.DictCursor
    self.assertEqual(self.conn.cursor_factory, psycopg2.extras.DictCursor)
    self.assertEqual(first_row()['a'], 1)

    # Resetting to None brings back the default behaviour.
    self.conn.cursor_factory = None
    self.assertEqual(self.conn.cursor_factory, None)
    self.assertRaises(TypeError, by_name, first_row())
def test_inet_cast(self):
    """After register_ipaddress, inet values come back as ipaddress interfaces."""
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)

    cursor.execute("select null::inet")
    self.assertTrue(cursor.fetchone()[0] is None)

    expectations = (
        ("select '127.0.0.1/24'::inet",
         ip.IPv4Interface, '127.0.0.1/24'),
        ("select '::ffff:102:300/128'::inet",
         ip.IPv6Interface, '::ffff:102:300/128'),
    )
    for query, wanted_type, literal in expectations:
        cursor.execute(query)
        value = cursor.fetchone()[0]
        self.assertTrue(isinstance(value, wanted_type), repr(value))
        self.assertEqual(value, ip.ip_interface(literal))
def test_cidr_cast(self):
    """After register_ipaddress, cidr values come back as ipaddress networks."""
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)

    cursor.execute("select null::cidr")
    self.assertTrue(cursor.fetchone()[0] is None)

    expectations = (
        ("select '127.0.0.0/24'::cidr",
         ip.IPv4Network, '127.0.0.0/24'),
        ("select '::ffff:102:300/128'::cidr",
         ip.IPv6Network, '::ffff:102:300/128'),
    )
    for query, wanted_type, literal in expectations:
        cursor.execute(query)
        value = cursor.fetchone()[0]
        self.assertTrue(isinstance(value, wanted_type), repr(value))
        self.assertEqual(value, ip.ip_network(literal))
def testUUIDARRAY(self):
    """Round-trip lists of UUIDs, including NULL elements, NULL and empty arrays."""
    import uuid
    psycopg2.extras.register_uuid()

    first = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
    second = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')

    # A plain array, then an array with a NULL element.
    for sent in ([first, second], [first, None]):
        received = self.execute("SELECT %s AS foo", (sent,))
        self.assertTrue(sent == received)

    # must survive NULL cast to a uuid[]
    received = self.execute("SELECT NULL::uuid[] AS foo")
    self.assertTrue(received is None)

    # what about empty arrays?
    received = self.execute("SELECT '{}'::uuid[] AS foo")
    self.assertTrue(type(received) == list and len(received) == 0)
def test_adapt_8(self):
    """Check the quoted form HstoreAdapter produces on servers older than 9.0."""
    if self.conn.server_version >= 90000:
        return self.skipTest("skipping dict adaptation with PG pre-9 syntax")

    from psycopg2.extras import HstoreAdapter

    data = {'a': '1', 'b': "'", 'c': None}
    if self.conn.encoding == 'UTF8':
        data['d'] = '\xe0'

    adapter = HstoreAdapter(data)
    adapter.prepare(self.conn)
    quoted = adapter.getquoted()
    self.assertTrue(quoted.startswith(b"(("), quoted)

    # Strip the outer parentheses and compare the pairs in sorted order.
    pairs = quoted[1:-1].split(b"||")
    pairs.sort()
    self.assertEqual(len(pairs), len(data))

    expected = (b"('a' => '1')", b"('b' => '''')", b"('c' => NULL)")
    for position, wanted in enumerate(expected):
        self.assertQuotedEqual(pairs[position], wanted)

    if 'd' in data:
        codec = psycopg2.extensions.encodings[self.conn.encoding]
        encoded = '\xe0'.encode(codec)
        self.assertQuotedEqual(pairs[3], b"('d' => '" + encoded + b"')")
def test_register_globally(self):
    """A globally registered hstore caster applies to other connections too.

    The caster is registered with ``globally=True``, checked through a second
    connection, removed from the global registry, and then checked to be gone.
    """
    from psycopg2.extras import register_hstore, HstoreAdapter

    oids = HstoreAdapter.get_oids(self.conn)
    try:
        register_hstore(self.conn, globally=True)
        conn2 = self.connect()
        try:
            # Query through the second connection: reusing self.conn here
            # would leave conn2 unused and would not show that the
            # registration reaches beyond the connection it was made with.
            cur2 = conn2.cursor()
            cur2.execute("select 'a => b'::hstore")
            r = cur2.fetchone()
            self.assertTrue(isinstance(r[0], dict))
        finally:
            conn2.close()
    finally:
        psycopg2.extensions.string_types.pop(oids[0][0])

    # verify the caster is not around anymore
    cur = self.conn.cursor()
    cur.execute("select 'a => b'::hstore")
    r = cur.fetchone()
    self.assertTrue(isinstance(r[0], str))
def test_roundtrip_unicode(self):
    """Round-trip dicts with text keys and values through hstore."""
    from psycopg2.extras import register_hstore

    # The keyword is spelled "unicode"; "str=True" (an automated 2-to-3
    # rename) is not a parameter of register_hstore and raises TypeError.
    register_hstore(self.conn, unicode=True)
    cur = self.conn.cursor()

    def ok(d):
        """Send *d* to the server and check what comes back, pair by pair."""
        cur.execute("select %s", (d,))
        d1 = cur.fetchone()[0]
        self.assertEqual(len(d), len(d1))
        for k, v in d1.items():
            self.assertTrue(k in d, k)
            self.assertEqual(d[k], v)
            self.assertTrue(isinstance(k, str))
            self.assertTrue(v is None or isinstance(v, str))

    ok({})
    ok({'a': 'b', 'c': None, 'd': '\u20ac', '\u2603': 'e'})

    # Every code point from 1 to 1023: once as a single long key/value,
    # once as one pair per character.
    ab = [chr(code) for code in range(1, 1024)]
    ok({''.join(ab): ''.join(ab)})
    ok(dict(zip(ab, ab)))
def test_oid(self):
    """register_hstore can be given the type OID explicitly."""
    from psycopg2.extras import register_hstore

    cursor = self.conn.cursor()
    cursor.execute("select 'hstore'::regtype::oid")
    hstore_oid = cursor.fetchone()[0]

    # Passing None as conn_or_cursor is a testing shortcut only: it is not
    # public interface and may stop working in the future.
    register_hstore(None, globally=True, oid=hstore_oid)
    try:
        cursor.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
        null_value, empty_value, pair_value = cursor.fetchone()[:3]
        self.assertTrue(null_value is None)
        self.assertEqual(empty_value, {})
        self.assertEqual(pair_value, {'a': 'b'})
    finally:
        # Undo the global registration.
        psycopg2.extensions.string_types.pop(hstore_oid)
def test_array_cast_oid(self):
    """register_hstore accepts explicit OIDs for both hstore and hstore[]."""
    from psycopg2.extras import register_hstore

    cursor = self.conn.cursor()
    cursor.execute("select 'hstore'::regtype::oid, 'hstore[]'::regtype::oid")
    hstore_oid, array_oid = cursor.fetchone()

    register_hstore(None, globally=True, oid=hstore_oid, array_oid=array_oid)
    try:
        cursor.execute(""" select null::hstore, ''::hstore, 'a => b'::hstore, '{a=>b}'::hstore[]""")
        row = cursor.fetchone()
        self.assertTrue(row[0] is None)
        for position, wanted in ((1, {}), (2, {'a': 'b'}), (3, [{'a': 'b'}])):
            self.assertEqual(row[position], wanted)
    finally:
        # Undo both global registrations.
        registry = psycopg2.extensions.string_types
        registry.pop(hstore_oid)
        registry.pop(array_oid)
def test_non_dbapi_connection(self):
    """register_hstore works on a RealDictConnection and on one of its cursors."""
    from psycopg2.extras import RealDictConnection
    from psycopg2.extras import register_hstore

    # First pass: register on the connection, then open a cursor.
    dict_conn = self.connect(connection_factory=RealDictConnection)
    try:
        register_hstore(dict_conn)
        cursor = dict_conn.cursor()
        cursor.execute("select ''::hstore as x")
        row = cursor.fetchone()
        self.assertEqual(row['x'], {})
    finally:
        dict_conn.close()

    # Second pass: open the cursor first and register on the cursor itself.
    dict_conn = self.connect(connection_factory=RealDictConnection)
    try:
        cursor = dict_conn.cursor()
        register_hstore(cursor)
        cursor.execute("select ''::hstore as x")
        row = cursor.fetchone()
        self.assertEqual(row['x'], {})
    finally:
        dict_conn.close()
def test_cast_nested(self):
    """Composite types nested inside other composites round-trip as nested tuples."""
    definitions = (
        ("type_is", [("anint", "integer"), ("astring", "text")]),
        ("type_r_dt", [("adate", "date"), ("apair", "type_is")]),
        ("type_r_ft", [("afloat", "float8"), ("anotherpair", "type_r_dt")]),
    )
    # Create all three types first, then register them in the same order.
    for type_name, fields in definitions:
        self._create_type(type_name, fields)
    for type_name, _fields in definitions:
        psycopg2.extras.register_composite(type_name, self.conn)

    cursor = self.conn.cursor()
    sent = (0.25, (date(2011, 1, 2), (42, "hello")))
    cursor.execute("select %s::type_r_ft;", (sent,))
    received = cursor.fetchone()[0]
    self.assertEqual(sent, received)

    # Attribute access is only checked when namedtuple can be imported.
    try:
        from collections import namedtuple  # noqa
    except ImportError:
        pass
    else:
        self.assertEqual(received.anotherpair.apair.astring, "hello")
def test_register_on_connection(self):
    """A caster registered on one connection does not affect another one."""
    self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
    registered_conn = self.connect()
    other_conn = self.connect()
    try:
        psycopg2.extras.register_composite("type_ii", registered_conn)
        cursors = (registered_conn.cursor(), other_conn.cursor())
        # The registered connection parses the value; the other one returns
        # the raw text representation.
        for cursor, wanted in zip(cursors, ((1, 2), "(1,2)")):
            cursor.execute("select (1,2)::type_ii")
            self.assertEqual(cursor.fetchone()[0], wanted)
    finally:
        registered_conn.close()
        other_conn.close()
def test_register_globally(self):
    """A composite caster registered globally is used by every connection."""
    self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
    first_conn = self.connect()
    second_conn = self.connect()
    try:
        caster = psycopg2.extras.register_composite(
            "type_ii", first_conn, globally=True)
        try:
            cursors = (first_conn.cursor(), second_conn.cursor())
            for cursor in cursors:
                cursor.execute("select (1,2)::type_ii")
                self.assertEqual(cursor.fetchone()[0], (1, 2))
        finally:
            # Remove the registered typecasters again so that the
            # refcounting script can report precise values.
            registry = psycopg2.extensions.string_types
            del registry[caster.typecaster.values[0]]
            if caster.array_typecaster:
                del registry[caster.array_typecaster.values[0]]
    finally:
        first_conn.close()
        second_conn.close()
def test_composite_namespace(self):
    """register_composite handles a schema-qualified type name."""
    cursor = self.conn.cursor()

    # Create the schema only when it is not there yet.
    cursor.execute(""" select nspname from pg_namespace where nspname = 'typens'; """)
    existing = cursor.fetchone()
    if not existing:
        cursor.execute("create schema typens;")
        self.conn.commit()

    fields = [("a", "integer"), ("b", "integer")]
    self._create_type("typens.typens_ii", fields)
    caster = psycopg2.extras.register_composite("typens.typens_ii", self.conn)
    self.assertEqual(caster.schema, 'typens')

    cursor.execute("select (4,8)::typens.typens_ii")
    value = cursor.fetchone()[0]
    self.assertEqual(value, (4, 8))
def test_non_dbapi_connection(self):
    """register_composite works on a RealDictConnection and on one of its cursors."""
    from psycopg2.extras import RealDictConnection
    from psycopg2.extras import register_composite

    self._create_type("type_ii", [("a", "integer"), ("b", "integer")])

    # First pass: register on the connection, then open a cursor.
    conn = self.connect(connection_factory=RealDictConnection)
    try:
        register_composite('type_ii', conn)
        curs = conn.cursor()
        curs.execute("select '(1,2)'::type_ii as x")
        self.assertEqual(curs.fetchone()['x'], (1, 2))
    finally:
        conn.close()

    # Second pass: register on the cursor itself. Registering on the
    # connection again would only repeat the first pass; the companion
    # hstore test covers the cursor here as well.
    conn = self.connect(connection_factory=RealDictConnection)
    try:
        curs = conn.cursor()
        register_composite('type_ii', curs)
        curs.execute("select '(1,2)'::type_ii as x")
        self.assertEqual(curs.fetchone()['x'], (1, 2))
    finally:
        conn.close()
def test_subclass(self):
    """A CompositeCaster subclass can change the Python object built for a row."""
    from psycopg2.extras import CompositeCaster, register_composite

    fields = [('anint', 'integer'), ('astring', 'text'), ('adate', 'date')]
    type_oid = self._create_type("type_isd", fields)

    class DictComposite(CompositeCaster):
        # Return a dict keyed by attribute name instead of the default object.
        def make(self, values):
            return dict(zip(self.attnames, values))

    caster = register_composite('type_isd', self.conn, factory=DictComposite)
    self.assertEqual(caster.name, 'type_isd')
    self.assertEqual(caster.oid, type_oid)

    cursor = self.conn.cursor()
    sent = (10, 'hello', date(2011, 1, 2))
    cursor.execute("select %s::type_isd;", (sent,))
    received = cursor.fetchone()[0]
    self.assertTrue(isinstance(received, dict))
    self.assertEqual(received['anint'], 10)
    self.assertEqual(received['astring'], "hello")
    self.assertEqual(received['adate'], date(2011, 1, 2))
def test_register_default(self):
    """register_default_json with a custom loads applies to json and json[]."""
    cursor = self.conn.cursor()

    def loads(s):
        # Parse JSON floats as Decimal so the custom loader is detectable.
        return psycopg2.extras.json.loads(s, parse_float=Decimal)

    psycopg2.extras.register_default_json(cursor, loads=loads)

    # Scalar json value.
    cursor.execute("""select '{"a": 100.0, "b": null}'::json""")
    document = cursor.fetchone()[0]
    self.assertTrue(isinstance(document['a'], Decimal))
    self.assertEqual(document['a'], Decimal('100.0'))

    # Array of json values.
    cursor.execute("""select array['{"a": 100.0, "b": null}']::json[]""")
    documents = cursor.fetchone()[0]
    self.assertTrue(isinstance(documents[0]['a'], Decimal))
    self.assertEqual(documents[0]['a'], Decimal('100.0'))
def test_register_globally(self):
    """Register a jsonb caster globally, use it, then restore the registry.

    The casters found under OIDs 3802 and 3807 before the test (presumably
    jsonb and jsonb[] -- confirm against the server catalogue) are put back
    afterwards if there were any.
    """
    registry = psycopg2.extensions.string_types
    old = registry.get(3802)
    olda = registry.get(3807)

    # Register before entering the try block: if the registration itself
    # fails there is nothing to undo, and the cleanup below would otherwise
    # hit unbound names and hide the real error behind a NameError.
    new, newa = psycopg2.extras.register_json(
        self.conn, loads=self.myloads, globally=True, name='jsonb')
    try:
        curs = self.conn.cursor()
        curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
        self.assertEqual(curs.fetchone()[0],
                         {'a': 100.0, 'b': None, 'test': 1})
    finally:
        registry.pop(new.values[0])
        registry.pop(newa.values[0])
        if old:
            psycopg2.extensions.register_type(old)
        if olda:
            psycopg2.extensions.register_type(olda)
def test_loads(self):
    """A loads function registered for jsonb must not affect plain json."""
    json = psycopg2.extras.json

    def loads(s):
        return json.loads(s, parse_float=Decimal)

    psycopg2.extras.register_json(self.conn, loads=loads, name='jsonb')
    cursor = self.conn.cursor()

    # jsonb goes through the custom loader: floats become Decimal.
    cursor.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
    jsonb_doc = cursor.fetchone()[0]
    self.assertTrue(isinstance(jsonb_doc['a'], Decimal))
    self.assertEqual(jsonb_doc['a'], Decimal('100.0'))

    # Make sure plain json is not being mangled too: floats stay floats.
    cursor.execute("""select '{"a": 100.0, "b": null}'::json""")
    json_doc = cursor.fetchone()[0]
    self.assertTrue(isinstance(json_doc['a'], float))
    self.assertEqual(json_doc['a'], 100.0)
def test_register_default(self):
    """register_default_jsonb with a custom loads applies to jsonb and jsonb[]."""
    cursor = self.conn.cursor()

    def loads(s):
        # Parse JSON floats as Decimal so the custom loader is detectable.
        return psycopg2.extras.json.loads(s, parse_float=Decimal)

    psycopg2.extras.register_default_jsonb(cursor, loads=loads)

    # Scalar jsonb value.
    cursor.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
    document = cursor.fetchone()[0]
    self.assertTrue(isinstance(document['a'], Decimal))
    self.assertEqual(document['a'], Decimal('100.0'))

    # Array of jsonb values.
    cursor.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""")
    documents = cursor.fetchone()[0]
    self.assertTrue(isinstance(documents[0]['a'], Decimal))
    self.assertEqual(documents[0]['a'], Decimal('100.0'))
def test_keywords(self):
    """Range can be built from keyword arguments with only one bound given."""
    from psycopg2.extras import Range

    # Only the upper bound supplied.
    upper_only = Range(upper=20)
    self.assertEqual(upper_only.lower, None)
    self.assertEqual(upper_only.upper, 20)
    self.assertFalse(upper_only.isempty)
    self.assertTrue(upper_only.lower_inf)
    self.assertFalse(upper_only.upper_inf)
    self.assertFalse(upper_only.lower_inc)
    self.assertFalse(upper_only.upper_inc)

    # Only the lower bound supplied, with explicit '(]' bounds.
    lower_only = Range(lower=10, bounds='(]')
    self.assertEqual(lower_only.lower, 10)
    self.assertEqual(lower_only.upper, None)
    self.assertFalse(lower_only.isempty)
    self.assertFalse(lower_only.lower_inf)
    self.assertTrue(lower_only.upper_inf)
    self.assertFalse(lower_only.lower_inc)
    self.assertFalse(lower_only.upper_inc)
def test_cast_numbers(self):
    """Integer and numeric range values are returned as NumericRange."""
    from psycopg2.extras import NumericRange

    cursor = self.conn.cursor()

    # Integer ranges: the assertions expect '(10,20)' to come back with an
    # inclusive lower bound of 11 and an exclusive upper bound of 20.
    for range_type in ('int4range', 'int8range'):
        cursor.execute("select '(10,20)'::%s" % range_type)
        value = cursor.fetchone()[0]
        self.assertTrue(isinstance(value, NumericRange))
        self.assertFalse(value.isempty)
        self.assertEqual(value.lower, 11)
        self.assertEqual(value.upper, 20)
        self.assertFalse(value.lower_inf)
        self.assertFalse(value.upper_inf)
        self.assertTrue(value.lower_inc)
        self.assertFalse(value.upper_inc)

    # Numeric range: bounds are expected back exactly as written.
    cursor.execute("select '(10.2,20.6)'::numrange")
    value = cursor.fetchone()[0]
    self.assertTrue(isinstance(value, NumericRange))
    self.assertFalse(value.isempty)
    self.assertEqual(value.lower, Decimal('10.2'))
    self.assertEqual(value.upper, Decimal('20.6'))
    self.assertFalse(value.lower_inf)
    self.assertFalse(value.upper_inf)
    self.assertFalse(value.lower_inc)
    self.assertFalse(value.upper_inc)
def test_cast_timestamptz(self):
    """A tstzrange built on the server is returned as DateTimeTZRange."""
    from psycopg2.extras import DateTimeTZRange
    from psycopg2.tz import FixedOffsetTimezone

    start = datetime(2000, 1, 1, tzinfo=FixedOffsetTimezone(600))
    end = datetime(2000, 12, 31, 23, 59, 59, 999,
                   tzinfo=FixedOffsetTimezone(600))

    cursor = self.conn.cursor()
    cursor.execute("select tstzrange(%s, %s, '[]')", (start, end))
    value = cursor.fetchone()[0]

    self.assertTrue(isinstance(value, DateTimeTZRange))
    self.assertFalse(value.isempty)
    self.assertEqual(value.lower, start)
    self.assertEqual(value.upper, end)
    self.assertFalse(value.lower_inf)
    self.assertFalse(value.upper_inf)
    # '[]' was requested, so both bounds are expected to be inclusive.
    self.assertTrue(value.lower_inc)
    self.assertTrue(value.upper_inc)
def setUp(self):
    """Connect with NamedTupleConnection and fill a three-row temporary table."""
    ConnectingTestCase.setUp(self)
    from psycopg2.extras import NamedTupleConnection

    # Leave the fixture as the base class set it up when namedtuple is missing.
    try:
        from collections import namedtuple  # noqa
    except ImportError:
        return

    self.conn = self.connect(connection_factory=NamedTupleConnection)
    cursor = self.conn.cursor()
    statements = (
        "CREATE TEMPORARY TABLE nttest (i int, s text)",
        "INSERT INTO nttest VALUES (1, 'foo')",
        "INSERT INTO nttest VALUES (2, 'bar')",
        "INSERT INTO nttest VALUES (3, 'baz')",
    )
    for statement in statements:
        cursor.execute(statement)
    self.conn.commit()
def test_error_message(self):
    """Without namedtuple, using NamedTupleConnection must raise ImportError.

    The test is skipped whenever ``collections.namedtuple`` can be imported.
    """
    try:
        from collections import namedtuple  # noqa
    except ImportError:
        # namedtuple is missing, so an ImportError is expected somewhere
        # between connecting and fetching a row.
        from psycopg2.extras import NamedTupleConnection
        try:
            factory = NamedTupleConnection
            self.conn = self.connect(connection_factory=factory)
            cursor = self.conn.cursor()
            cursor.execute("select 1")
            cursor.fetchone()
        except ImportError:
            pass
        else:
            self.fail("expecting ImportError")
    else:
        return self.skipTest("namedtuple available")
def test_inet_cast(self):
    """After register_ipaddress, inet values come back as ipaddress interfaces.

    Uses assertTrue/assertEqual/assertIsInstance instead of the deprecated
    ``assert_``/``assertEquals`` aliases, which Python 3.12 removed.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::inet")
    self.assertTrue(cur.fetchone()[0] is None)

    cur.execute("select '127.0.0.1/24'::inet")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Interface, repr(obj))
    self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))

    cur.execute("select '::ffff:102:300/128'::inet")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Interface, repr(obj))
    self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
def test_cidr_cast(self):
    """After register_ipaddress, cidr values come back as ipaddress networks.

    Uses assertTrue/assertEqual/assertIsInstance instead of the deprecated
    ``assert_``/``assertEquals`` aliases, which Python 3.12 removed.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertTrue(cur.fetchone()[0] is None)

    cur.execute("select '127.0.0.0/24'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

    cur.execute("select '::ffff:102:300/128'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
def testUUIDARRAY(self):
    """Round-trip lists of UUIDs, including NULL elements, NULL and empty arrays.

    Uses the supported assertion methods instead of the deprecated
    ``failUnless`` alias, which Python 3.12 removed.
    """
    import uuid
    psycopg2.extras.register_uuid()

    u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
         uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
    s = self.execute("SELECT %s AS foo", (u,))
    self.assertEqual(u, s)

    # array with a NULL element
    u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
    s = self.execute("SELECT %s AS foo", (u,))
    self.assertEqual(u, s)

    # must survive NULL cast to a uuid[]
    s = self.execute("SELECT NULL::uuid[] AS foo")
    self.assertTrue(s is None)

    # what about empty arrays?
    s = self.execute("SELECT '{}'::uuid[] AS foo")
    self.assertIsInstance(s, list)
    self.assertEqual(len(s), 0)
def test_adapt_8(self):
    """Check the quoted form HstoreAdapter produces on servers older than 9.0.

    Uses assertTrue instead of the deprecated ``assert_`` alias, which
    Python 3.12 removed.
    """
    if self.conn.server_version >= 90000:
        return self.skipTest("skipping dict adaptation with PG pre-9 syntax")

    from psycopg2.extras import HstoreAdapter

    o = {'a': '1', 'b': "'", 'c': None}
    if self.conn.encoding == 'UTF8':
        o['d'] = u'\xe0'

    a = HstoreAdapter(o)
    a.prepare(self.conn)
    q = a.getquoted()
    self.assertTrue(q.startswith(b"(("), q)

    # Strip the outer parentheses and compare the pairs in sorted order.
    ii = sorted(q[1:-1].split(b"||"))
    self.assertEqual(len(ii), len(o))
    self.assertQuotedEqual(ii[0], b"('a' => '1')")
    self.assertQuotedEqual(ii[1], b"('b' => '''')")
    self.assertQuotedEqual(ii[2], b"('c' => NULL)")
    if 'd' in o:
        encc = u'\xe0'.encode(
            psycopg2.extensions.encodings[self.conn.encoding])
        self.assertQuotedEqual(ii[3], b"('d' => '" + encc + b"')")
def test_register_globally(self):
    """A globally registered hstore caster applies to other connections too.

    The caster is registered with ``globally=True``, checked through a second
    connection, removed from the global registry, and then checked to be gone.
    Uses assertIsInstance instead of the deprecated ``assert_`` alias, which
    Python 3.12 removed.
    """
    from psycopg2.extras import register_hstore, HstoreAdapter

    oids = HstoreAdapter.get_oids(self.conn)
    try:
        register_hstore(self.conn, globally=True)
        conn2 = self.connect()
        try:
            # Query through the second connection: reusing self.conn here
            # would leave conn2 unused and would not show that the
            # registration reaches beyond the connection it was made with.
            cur2 = conn2.cursor()
            cur2.execute("select 'a => b'::hstore")
            r = cur2.fetchone()
            self.assertIsInstance(r[0], dict)
        finally:
            conn2.close()
    finally:
        psycopg2.extensions.string_types.pop(oids[0][0])

    # verify the caster is not around anymore
    cur = self.conn.cursor()
    cur.execute("select 'a => b'::hstore")
    r = cur.fetchone()
    self.assertIsInstance(r[0], str)
def test_roundtrip_unicode(self):
    """Round-trip dicts with unicode keys and values through hstore.

    Written to run on Python 2 and Python 3 alike: the text type and the
    code-point-to-character function are looked up at run time instead of
    relying on the Python 2-only names ``unicode``, ``unichr`` and
    ``dict.iteritems``. The deprecated ``assert_`` alias (removed in
    Python 3.12) is replaced by the supported assertion methods.
    """
    from psycopg2.extras import register_hstore
    register_hstore(self.conn, unicode=True)
    cur = self.conn.cursor()

    text_type = type(u'')
    try:
        to_char = unichr
    except NameError:
        # Python 3: chr() already returns text.
        to_char = chr

    def ok(d):
        """Send *d* to the server and check what comes back, pair by pair."""
        cur.execute("select %s", (d,))
        d1 = cur.fetchone()[0]
        self.assertEqual(len(d), len(d1))
        for k, v in d1.items():
            self.assertTrue(k in d, k)
            self.assertEqual(d[k], v)
            self.assertIsInstance(k, text_type)
            self.assertTrue(v is None or isinstance(v, text_type))

    ok({})
    ok({'a': 'b', 'c': None, 'd': u'\u20ac', u'\u2603': 'e'})

    # Build a real list: the characters are walked more than once below,
    # which a one-shot map() iterator would not allow on Python 3.
    ab = [to_char(code) for code in range(1, 1024)]
    ok({u''.join(ab): u''.join(ab)})
    ok(dict(zip(ab, ab)))
def test_oid(self):
    """register_hstore can be given the type OID explicitly.

    Uses assertIsNone instead of the deprecated ``assert_`` alias, which
    Python 3.12 removed.
    """
    cur = self.conn.cursor()
    cur.execute("select 'hstore'::regtype::oid")
    oid = cur.fetchone()[0]

    # Note: None as conn_or_cursor is just for testing: not public
    # interface and it may break in future.
    from psycopg2.extras import register_hstore
    register_hstore(None, globally=True, oid=oid)
    try:
        cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
        t = cur.fetchone()
        self.assertIsNone(t[0])
        self.assertEqual(t[1], {})
        self.assertEqual(t[2], {'a': 'b'})
    finally:
        # Undo the global registration.
        psycopg2.extensions.string_types.pop(oid)
def test_roundtrip_array(self):
    """Round-trip a list of dicts through an hstore array.

    The list mixes an empty dict, a dict with a None value, printable ASCII
    keys and values and, after switching the client encoding to latin1,
    characters with code points 160-254.
    """
    from psycopg2.extras import register_hstore
    register_hstore(self.conn)

    ds = [{}, {'a': 'b', 'c': None}]

    # Build a real list: on Python 3 map() returns a one-shot iterator, so
    # zip(ab, ab) would pair neighbouring characters and the join() calls
    # after it would see an exhausted iterator.
    ab = [chr(code) for code in range(32, 128)]
    ds.append(dict(zip(ab, ab)))
    ds.append({''.join(ab): ''.join(ab)})

    self.conn.set_client_encoding('latin1')
    # range objects cannot be added with "+" on Python 3, so go via lists.
    codes = list(range(32, 127)) + list(range(160, 255))
    if sys.version_info[0] < 3:
        ab = [chr(code) for code in codes]
    else:
        ab = bytes(codes).decode('latin1')
    ds.append({''.join(ab): ''.join(ab)})
    ds.append(dict(zip(ab, ab)))

    cur = self.conn.cursor()
    cur.execute("select %s", (ds,))
    ds1 = cur.fetchone()[0]
    self.assertEqual(ds, ds1)
def test_array_cast_oid(self):
    """register_hstore accepts explicit OIDs for both hstore and hstore[].

    Uses assertIsNone instead of the deprecated ``assert_`` alias, which
    Python 3.12 removed.
    """
    cur = self.conn.cursor()
    cur.execute("select 'hstore'::regtype::oid, 'hstore[]'::regtype::oid")
    oid, aoid = cur.fetchone()

    from psycopg2.extras import register_hstore
    register_hstore(None, globally=True, oid=oid, array_oid=aoid)
    try:
        cur.execute(""" select null::hstore, ''::hstore, 'a => b'::hstore, '{a=>b}'::hstore[]""")
        t = cur.fetchone()
        self.assertIsNone(t[0])
        self.assertEqual(t[1], {})
        self.assertEqual(t[2], {'a': 'b'})
        self.assertEqual(t[3], [{'a': 'b'}])
    finally:
        # Undo both global registrations.
        psycopg2.extensions.string_types.pop(oid)
        psycopg2.extensions.string_types.pop(aoid)
def test_empty_string(self):
    """Empty strings and NULLs inside a composite value survive a round trip."""
    # issue #141
    self._create_type("type_ss", [('s1', 'text'), ('s2', 'text')])
    cursor = self.conn.cursor()
    psycopg2.extras.register_composite("type_ss", cursor)

    samples = (
        ('a', 'b'),
        ('a', ''),
        ('', 'b'),
        ('a', None),
        (None, 'b'),
        ('', ''),
        (None, None),
    )
    for sent in samples:
        cursor.execute("select %s::type_ss", (sent,))
        received = cursor.fetchone()[0]
        self.assertEqual(sent, received)
def test_composite_array(self):
    """An array of composite values is returned as a list of the caster's type.

    Uses assertIsInstance instead of the deprecated ``assert_`` alias, which
    Python 3.12 removed.
    """
    self._create_type("type_isd",
        [('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])

    t = psycopg2.extras.register_composite("type_isd", self.conn)

    curs = self.conn.cursor()
    r1 = (10, 'hello', date(2011, 1, 2))
    r2 = (20, 'world', date(2011, 1, 3))
    curs.execute("select %s::type_isd[];", ([r1, r2],))
    v = curs.fetchone()[0]
    self.assertEqual(len(v), 2)

    self.assertIsInstance(v[0], t.type)
    self.assertEqual(v[0][0], 10)
    self.assertEqual(v[0][1], "hello")
    self.assertEqual(v[0][2], date(2011, 1, 2))

    self.assertIsInstance(v[1], t.type)
    self.assertEqual(v[1][0], 20)
    self.assertEqual(v[1][1], "world")
    self.assertEqual(v[1][2], date(2011, 1, 3))
def test_subclass(self):
    """A CompositeCaster subclass can change the Python object built for a row.

    Uses assertIsInstance instead of the deprecated ``assert_`` alias, which
    Python 3.12 removed.
    """
    oid = self._create_type("type_isd",
        [('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])

    from psycopg2.extras import register_composite, CompositeCaster

    class DictComposite(CompositeCaster):
        # Return a dict keyed by attribute name instead of the default object.
        def make(self, values):
            return dict(zip(self.attnames, values))

    t = register_composite('type_isd', self.conn, factory=DictComposite)

    self.assertEqual(t.name, 'type_isd')
    self.assertEqual(t.oid, oid)

    curs = self.conn.cursor()
    r = (10, 'hello', date(2011, 1, 2))
    curs.execute("select %s::type_isd;", (r,))
    v = curs.fetchone()[0]
    self.assertIsInstance(v, dict)
    self.assertEqual(v['anint'], 10)
    self.assertEqual(v['astring'], "hello")
    self.assertEqual(v['adate'], date(2011, 1, 2))