我们从Python开源项目中，提取了以下35个代码示例，用于说明如何使用psycopg2.TimestampFromTicks()。
def test_date_time_allocation_bug(self):
    """Create and release date, time and timestamp objects.

    Each kind of object is built twice: once from explicit fields and once
    from a tick count produced by ``time.mktime``.  The test makes no
    assertions; it passes as long as creating and releasing the objects
    does not raise.  (Presumably a regression check for an allocation bug,
    going by the test name.)
    """
    midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
    date_from_fields = psycopg2.Date(2002, 12, 25)
    date_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))

    clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
    moment_from_fields = psycopg2.Time(13, 45, 30)
    moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))

    # The same two names are rebound on purpose, so the Time objects are
    # dropped while the Timestamp objects are being created.
    stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)
    moment_from_fields = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    moment_from_ticks = psycopg2.TimestampFromTicks(time.mktime(stamp_fields))

    del date_from_fields, date_from_ticks, moment_from_fields, moment_from_ticks
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks on a tick count with a 59.9999 seconds part.

    The adapted value must equal 2010-05-06 14:11:59.999920 at a fixed
    offset of -300 minutes.
    NOTE(review): the expected offset suggests the test relies on the
    process time zone being set elsewhere -- confirm against the fixture.
    """
    from datetime import datetime

    ticks = 1273173119.99992
    stamp = psycopg2.TimestampFromTicks(ticks)
    actual = stamp.adapted

    minus_five_hours = FixedOffsetTimezone(-5 * 60)
    expected = datetime(2010, 5, 6, 14, 11, 59, 999920,
                        tzinfo=minus_five_hours)
    self.assertEqual(actual, expected)
def get_records(self, start_timestamp=0, end_timestamp=None):
    """Gets records from the table between the specified timestamps.

    Both bounds are inclusive.

    Args:
      start_timestamp: lower bound, in seconds since epoch (default 0).
      end_timestamp: upper bound, in seconds since epoch.  Any falsy value
          (None or 0) means "up to the current time".

    Returns:
      A list of dict-like rows, each of the form:
        {
          'id': 3,
          'record_timestamp': 1341556432,
          'imsi': 'IMSI901550000000084',
          'ipaddr': '192.168.99.3',
          'uploaded_bytes': 5567,
          'downloaded_bytes': 9987,
          'uploaded_bytes_delta': 74,
          'downloaded_bytes_delta': 139
        }
    """
    if not end_timestamp:
        end_timestamp = time.time()
    start = psycopg2.TimestampFromTicks(start_timestamp)
    end = psycopg2.TimestampFromTicks(end_timestamp)
    # Only the table name is spliced into the statement text, because an
    # identifier cannot be sent as a bound parameter.  The timestamp values
    # are handed to the driver as query parameters so it does the quoting.
    # Assumes self.table_name contains no '%' character.
    query = ('select * from {} where record_timestamp >= %s'
             ' and record_timestamp <= %s').format(self.table_name)
    with self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, (start, end))
        # Commit to end the transaction opened by the select.
        self.connection.commit()
        return cursor.fetchall()
def delete_records(self, timestamp):
    """Deletes records older than the given epoch timestamp.

    Args:
      timestamp: cutoff in seconds since epoch; rows whose
          record_timestamp is strictly earlier are removed.
    """
    cutoff = psycopg2.TimestampFromTicks(timestamp)
    # Only the table name is spliced into the statement text, because an
    # identifier cannot be sent as a bound parameter.  The cutoff is passed
    # as a query parameter so the driver does the quoting.
    # Assumes self.table_name contains no '%' character.
    query = 'delete from {} where record_timestamp < %s'.format(
        self.table_name)
    with self.connection.cursor() as cursor:
        cursor.execute(query, (cutoff,))
    self.connection.commit()
def setUp(self):
    """Connect to the GPRSDB and seed gprs_records with four sample rows.

    The rows all belong to one IMSI and are stamped 60, 30, 10 and 5
    seconds before ``self.now``.
    """
    # Connect to the GPRSDB.
    self.gprs_db = gprs_database.GPRSDB()
    # Add some records to the GPRSDB.
    self.now = time.time()
    records = [
        (psycopg2.TimestampFromTicks(self.now - 60), 'IMSI901550000000084',
         '192.168.99.1', 100, 200, 100, 200),
        (psycopg2.TimestampFromTicks(self.now - 30), 'IMSI901550000000084',
         '192.168.99.1', 300, 500, 200, 300),
        (psycopg2.TimestampFromTicks(self.now - 10), 'IMSI901550000000084',
         '192.168.99.1', 700, 600, 400, 100),
        (psycopg2.TimestampFromTicks(self.now - 5), 'IMSI901550000000084',
         '192.168.99.1', 750, 625, 50, 25),
    ]
    schema = ('record_timestamp, imsi, ipaddr, uploaded_bytes,'
              ' downloaded_bytes, uploaded_bytes_delta,'
              ' downloaded_bytes_delta')
    # One placeholder per column; the driver quotes the values instead of
    # the test hand-quoting them into the statement text.
    command = ('insert into gprs_records ({}) values'
               ' (%s, %s, %s, %s, %s, %s, %s)').format(schema)
    connection = psycopg2.connect(host='localhost', database='endaga',
                                  user=PG_USER, password=PG_PASSWORD)
    try:
        with connection.cursor() as cursor:
            cursor.executemany(command, records)
        connection.commit()
    finally:
        # This connection is only used for seeding, so release it even if
        # an insert fails.
        connection.close()
def test_date_time_allocation_bug(self):
    """Create date, time and timestamp objects from fields and from ticks.

    The test makes no assertions; it passes as long as building the
    objects does not raise.  (Presumably a regression check for an
    allocation bug, going by the test name.)
    """
    day_ticks = time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0))
    day_a = psycopg2.Date(2002, 12, 25)
    day_b = psycopg2.DateFromTicks(day_ticks)

    hour_ticks = time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0))
    when_a = psycopg2.Time(13, 45, 30)
    when_b = psycopg2.TimeFromTicks(hour_ticks)

    # Rebinding the same names releases the Time objects created above.
    full_ticks = time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0))
    when_a = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    when_b = psycopg2.TimestampFromTicks(full_ticks)
def setUpClass(cls):
    """Install the mock subscriber and seed gprs_records with sample rows.

    Two subscribers are created, then eight GPRS records (five for the
    first IMSI, three for the second) are inserted with timestamps between
    120 and 5 seconds before ``cls.now``.
    """
    # Monkeypatch Subscriber so sub balance lookups succeed.
    cls.original_subscriber = utilities.subscriber
    cls.mock_subscriber = mocks.MockSubscriber()
    utilities.subscriber = cls.mock_subscriber
    subscriber.create_subscriber('IMSI901550000000084', '5551234')
    subscriber.create_subscriber('IMSI901550000000082', '5551235')
    # Connect to the GPRSDB and EventStore.
    cls.gprs_db = gprs_database.GPRSDB()
    cls.event_store = events.EventStore()
    # Add some records to the GPRSDB.  The method we're testing should
    # extract these records and create events in the EventStore.
    cls.now = time.time()
    records = [
        (psycopg2.TimestampFromTicks(cls.now - 120), 'IMSI901550000000084',
         '192.168.99.1', 50, 80, 50, 80),
        (psycopg2.TimestampFromTicks(cls.now - 60), 'IMSI901550000000084',
         '192.168.99.1', 50, 80, 0, 0),
        (psycopg2.TimestampFromTicks(cls.now - 30), 'IMSI901550000000084',
         '192.168.99.1', 300, 500, 250, 420),
        (psycopg2.TimestampFromTicks(cls.now - 10), 'IMSI901550000000084',
         '192.168.99.1', 700, 600, 400, 100),
        (psycopg2.TimestampFromTicks(cls.now - 5), 'IMSI901550000000084',
         '192.168.99.1', 750, 625, 50, 25),
        # Create events for a different IMSI.
        (psycopg2.TimestampFromTicks(cls.now - 60), 'IMSI901550000000082',
         '192.168.99.2', 50, 80, 0, 0),
        (psycopg2.TimestampFromTicks(cls.now - 10), 'IMSI901550000000082',
         '192.168.99.2', 400, 300, 350, 220),
        (psycopg2.TimestampFromTicks(cls.now - 5), 'IMSI901550000000082',
         '192.168.99.2', 450, 325, 50, 25),
    ]
    schema = ('record_timestamp, imsi, ipaddr, uploaded_bytes,'
              ' downloaded_bytes, uploaded_bytes_delta,'
              ' downloaded_bytes_delta')
    # One placeholder per column; the driver quotes the values instead of
    # the test hand-quoting them into the statement text.
    command = ('insert into gprs_records ({}) values'
               ' (%s, %s, %s, %s, %s, %s, %s)').format(schema)
    connection = psycopg2.connect(host='localhost', database='endaga',
                                  user=PG_USER, password=PG_PASSWORD)
    try:
        with connection.cursor() as cursor:
            cursor.executemany(command, records)
        connection.commit()
    finally:
        # This connection is only used for seeding, so release it even if
        # an insert fails.
        connection.close()