我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psycopg2.Binary()。
def __init__(self, obj, geography=False): """ Initialize on the spatial object. """ self.is_geometry = isinstance(obj, (Geometry, PostGISAdapter)) # Getting the WKB (in string form, to allow easy pickling of # the adaptor) and the SRID from the geometry or raster. if self.is_geometry: self.ewkb = bytes(obj.ewkb) self._adapter = Binary(self.ewkb) else: self.ewkb = to_pgraster(obj) self.srid = obj.srid self.geography = geography
def record_to_string(record): type_str = type(u'str') type_datetime = type(datetime.datetime.now()) type_buffer = type(buffer("")) type_None = type(None) type_float = type(0.0) values = [] for item in record: if type(item) == type_None: values.append('NULL') elif type(item) == type_str: values.append("'" + item.replace("'", "''") + "'") elif type(item) == type_datetime: values.append('"' + str(item) + '"') elif type(item) == type_buffer: values.append(psycopg2.Binary(item).getquoted()) elif type(item) == type_float: values.append("%.19f" % item) else: values.append(str(item)) return ','.join(values)
def __init__(self, obj, geography=False): """ Initialize on the spatial object. """ self.is_geometry = isinstance(obj, Geometry) # Getting the WKB (in string form, to allow easy pickling of # the adaptor) and the SRID from the geometry or raster. if self.is_geometry: self.ewkb = bytes(obj.ewkb) self._adapter = Binary(self.ewkb) else: self.ewkb = to_pgraster(obj) self.srid = obj.srid self.geography = geography
def __init__(self, string='', help='', required=False, readonly=False, domain=None, states=None, select=False, on_change=None, on_change_with=None, depends=None, filename=None, context=None, loading='lazy'): if filename is not None: self.filename = filename if depends is None: depends = [filename] else: depends.append(filename) super(Binary, self).__init__(string=string, help=help, required=required, readonly=readonly, domain=domain, states=states, select=select, on_change=on_change, on_change_with=on_change_with, depends=depends, context=context, loading=loading)
def sql_format(value): if isinstance(value, (Query, Expression)): return value db_type = backend.name() if db_type == 'postgresql' and value is not None: import psycopg2 return psycopg2.Binary(value) return value
def testBinary(self): if sys.version_info[0] < 3: s = ''.join([chr(x) for x in range(256)]) b = psycopg2.Binary(s) buf = self.execute("SELECT %s::bytea AS foo", (b,)) self.assertEqual(s, str(buf)) else: s = bytes(list(range(256))) b = psycopg2.Binary(s) buf = self.execute("SELECT %s::bytea AS foo", (b,)) self.assertEqual(s, buf.tobytes())
def testBinaryNone(self): b = psycopg2.Binary(None) buf = self.execute("SELECT %s::bytea AS foo", (b,)) self.assertEqual(buf, None)
def testBinaryEmptyString(self): # test to make sure an empty Binary is converted to an empty string if sys.version_info[0] < 3: b = psycopg2.Binary('') self.assertEqual(str(b), "''::bytea") else: b = psycopg2.Binary(bytes([])) self.assertEqual(str(b), "''::bytea")
def testBinaryRoundTrip(self): # test to make sure buffers returned by psycopg2 are # understood by execute: if sys.version_info[0] < 3: s = ''.join([chr(x) for x in range(256)]) buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) self.assertEqual(s, str(buf2)) else: s = bytes(list(range(256))) buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) self.assertEqual(s, buf2.tobytes())
def testByteaHexCheckFalsePositive(self): # the check \x -> x to detect bad bytea decode # may be fooled if the first char is really an 'x' o1 = psycopg2.Binary(b'x') o2 = self.execute("SELECT %s::bytea AS foo", (o1,)) self.assertEqual(b'x', o2[0])
def testBinary(self): if sys.version_info[0] < 3: s = ''.join([chr(x) for x in range(256)]) b = psycopg2.Binary(s) buf = self.execute("SELECT %s::bytea AS foo", (b,)) self.assertEqual(s, str(buf)) else: s = bytes(range(256)) b = psycopg2.Binary(s) buf = self.execute("SELECT %s::bytea AS foo", (b,)) self.assertEqual(s, buf.tobytes())
def testBinaryRoundTrip(self): # test to make sure buffers returned by psycopg2 are # understood by execute: if sys.version_info[0] < 3: s = ''.join([chr(x) for x in range(256)]) buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) self.assertEqual(s, str(buf2)) else: s = bytes(range(256)) buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) self.assertEqual(s, buf2.tobytes())
def __init__(self, geom, geography=False): "Initializes on the geometry." # Getting the WKB (in string form, to allow easy pickling of # the adaptor) and the SRID from the geometry. self.ewkb = bytes(geom.ewkb) self.srid = geom.srid self.geography = geography self._adapter = Binary(self.ewkb)
def UpsertAggregateInstance(self, aggregateId, snapshot, indexValues): upsertStatement = PostgresStatement.UpsertAggregateInstance.format(self.tableNames.instanceBaseTableName) self.ExecuteSingle(upsertStatement, aggregateId, Json(indexValues), psycopg2.Binary(snapshot))
def InsertEvent(self, aggregateId, logicId, eventTypeId, eventProto, eventVersion, receivedTimestamp, processedTimestamp): insertStatement = PostgresStatement.InsertEvent.format(self.tableNames.eventBaseTableName) return self.ReturnOne(insertStatement, aggregateId, logicId, eventTypeId, eventVersion, psycopg2.Binary(eventProto), receivedTimestamp, processedTimestamp)[0]
def InsertTag(self, aggregateId, logicId, tag, tagProto, tagVersion, tagTimestamp): insertStatement = PostgresStatement.InsertTag.format(self.tableNames.tagTableName) self.ExecuteSingle(insertStatement, aggregateId, logicId, tag, psycopg2.Binary(tagProto), tagVersion, tagTimestamp)
def UpdateEventPersistenceCheckpoint(self, requestId, indexValues, serializedResponse): insertStatement = PostgresStatement.InsertCheckpoint.format(self.tableNames.eventPersistenceStagingTableName) self.ExecuteSingle(insertStatement, requestId, Json(indexValues), psycopg2.Binary(serializedResponse))
def blobEncode(self, blob): try: from psycopg2 import Binary except ImportError: from psycopg import Binary return Binary(blob)
def testByteaHexCheckFalsePositive(self): # the check \x -> x to detect bad bytea decode # may be fooled if the first char is really an 'x' o1 = psycopg2.Binary(b('x')) o2 = self.execute("SELECT %s::bytea AS foo", (o1,)) self.assertEqual(b('x'), o2[0])