我们从 Python 开源项目中提取了以下 25 个代码示例，用于说明如何使用 psycopg2.extras.register_default_json()。
def on_connect(self):
    """Assemble the connection-setup hook for this dialect.

    Every enabled option contributes one setup step. The steps are
    collected in a fixed order (client encoding, isolation level, UUID,
    unicode, hstore, JSON) and wrapped in a single callable.

    Returns:
        A function taking a connection ``conn`` that runs every collected
        step against it, or ``None`` when no step applies.
    """
    extras = self._psycopg2_extras()
    extensions = self._psycopg2_extensions()

    steps = []

    if self.client_encoding is not None:
        def apply_client_encoding(conn):
            # The attribute is read when the hook runs, not when it is built.
            conn.set_client_encoding(self.client_encoding)
        steps.append(apply_client_encoding)

    if self.isolation_level is not None:
        def apply_isolation_level(conn):
            self.set_isolation_level(conn, self.isolation_level)
        steps.append(apply_isolation_level)

    if self.dbapi and self.use_native_uuid:
        def register_uuid(conn):
            extras.register_uuid(None, conn)
        steps.append(register_uuid)

    if self.dbapi and self.use_native_unicode:
        def register_unicode(conn):
            for unicode_type in (extensions.UNICODE, extensions.UNICODEARRAY):
                extensions.register_type(unicode_type, conn)
        steps.append(register_unicode)

    if self.dbapi and self.use_native_hstore:
        def register_hstore(conn):
            hstore_oids = self._hstore_oids(conn)
            if hstore_oids is None:
                # No hstore OIDs were found for this connection; skip.
                return
            oid, array_oid = hstore_oids
            options = {'oid': oid}
            if util.py2k:
                options['unicode'] = True
            # array_oid is only passed when the driver version is at least
            # the one recorded in FEATURE_VERSION_MAP.
            if self.psycopg2_version >= \
                    self.FEATURE_VERSION_MAP['array_oid']:
                options['array_oid'] = array_oid
            extras.register_hstore(conn, **options)
        steps.append(register_hstore)

    if self.dbapi and self._json_deserializer:
        def register_json(conn):
            # Install the custom deserializer for each natively supported
            # JSON flavour.
            if self._has_native_json:
                extras.register_default_json(
                    conn, loads=self._json_deserializer)
            if self._has_native_jsonb:
                extras.register_default_jsonb(
                    conn, loads=self._json_deserializer)
        steps.append(register_json)

    if not steps:
        return None

    def on_connect(conn):
        for step in steps:
            step(conn)

    return on_connect
def on_connect(self):
    """Build the callable that prepares a new connection.

    One setup step is collected per enabled option, in this order: client
    encoding, isolation level, UUID, unicode, hstore, JSON.

    Returns:
        A function taking a connection ``conn`` that applies all collected
        steps to it, or ``None`` if nothing needs to be done.
    """
    from psycopg2 import extras, extensions

    setup_steps = []

    if self.client_encoding is not None:
        def set_encoding(conn):
            conn.set_client_encoding(self.client_encoding)
        setup_steps.append(set_encoding)

    if self.isolation_level is not None:
        def set_isolation(conn):
            self.set_isolation_level(conn, self.isolation_level)
        setup_steps.append(set_isolation)

    if self.dbapi and self.use_native_uuid:
        def enable_uuid(conn):
            extras.register_uuid(None, conn)
        setup_steps.append(enable_uuid)

    if self.dbapi and self.use_native_unicode:
        def enable_unicode(conn):
            for unicode_type in (extensions.UNICODE, extensions.UNICODEARRAY):
                extensions.register_type(unicode_type, conn)
        setup_steps.append(enable_unicode)

    if self.dbapi and self.use_native_hstore:
        def enable_hstore(conn):
            hstore_oids = self._hstore_oids(conn)
            if hstore_oids is None:
                # Nothing to register without hstore OIDs.
                return
            oid, array_oid = hstore_oids
            hstore_kwargs = {'oid': oid}
            if util.py2k:
                hstore_kwargs['unicode'] = True
            # array_oid is only forwarded for driver versions >= 2.4.3.
            if self.psycopg2_version >= (2, 4, 3):
                hstore_kwargs['array_oid'] = array_oid
            extras.register_hstore(conn, **hstore_kwargs)
        setup_steps.append(enable_hstore)

    if self.dbapi and self._json_deserializer:
        def enable_json(conn):
            # Use the custom deserializer for every natively supported
            # JSON flavour.
            if self._has_native_json:
                extras.register_default_json(
                    conn, loads=self._json_deserializer)
            if self._has_native_jsonb:
                extras.register_default_jsonb(
                    conn, loads=self._json_deserializer)
        setup_steps.append(enable_json)

    if not setup_steps:
        return None

    def on_connect(conn):
        for step in setup_steps:
            step(conn)

    return on_connect
def on_connect(self):
    """Create the hook that configures a connection.

    Setup steps are gathered for client encoding, isolation level, unicode
    types, hstore and JSON, each only if the corresponding option is
    enabled.

    Returns:
        A function taking a connection ``conn`` that runs the gathered
        steps in order, or ``None`` when there are none.
    """
    from psycopg2 import extras, extensions

    hooks = []

    if self.client_encoding is not None:
        def configure_encoding(conn):
            conn.set_client_encoding(self.client_encoding)
        hooks.append(configure_encoding)

    if self.isolation_level is not None:
        def configure_isolation(conn):
            self.set_isolation_level(conn, self.isolation_level)
        hooks.append(configure_isolation)

    if self.dbapi and self.use_native_unicode:
        def configure_unicode(conn):
            for unicode_type in (extensions.UNICODE, extensions.UNICODEARRAY):
                extensions.register_type(unicode_type, conn)
        hooks.append(configure_unicode)

    if self.dbapi and self.use_native_hstore:
        def configure_hstore(conn):
            hstore_oids = self._hstore_oids(conn)
            if hstore_oids is None:
                # No hstore OIDs for this connection; nothing to register.
                return
            oid, array_oid = hstore_oids
            hstore_kwargs = {'oid': oid, 'array_oid': array_oid}
            # The unicode flag is only passed when util.py2k is set.
            if util.py2k:
                hstore_kwargs['unicode'] = True
            extras.register_hstore(conn, **hstore_kwargs)
        hooks.append(configure_hstore)

    if self.dbapi and self._json_deserializer:
        def configure_json(conn):
            extras.register_default_json(
                conn, loads=self._json_deserializer)
        hooks.append(configure_json)

    if not hooks:
        return None

    def on_connect(conn):
        for hook in hooks:
            hook(conn)

    return on_connect