Python psycopg2 模块,TimestampFromTicks() 实例源码

我们从 Python 开源项目中提取了以下 35 个代码示例,用于说明如何使用 psycopg2.TimestampFromTicks()。

项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
        del day, day_from_ticks, moment, moment_from_ticks
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
        del day, day_from_ticks, moment, moment_from_ticks
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def get_records(self, start_timestamp=0, end_timestamp=None):
        """Fetch the rows whose record_timestamp lies between two instants.

        Both bounds are inclusive and are given in seconds since epoch.

        Args:
          start_timestamp: lower bound (defaults to 0).
          end_timestamp: upper bound; any falsy value (None or 0) is
              replaced by the current time.

        Returns a list of dicts, each of the form: {
            'id': 3,
            'record_timestamp': 1341556432,
            'imsi': 'IMSI901550000000084',
            'ipaddr': '192.168.99.3',
            'uploaded_bytes': 5567,
            'downloaded_bytes': 9987,
            'uploaded_bytes_delta': 74,
            'downloaded_bytes_delta': 139
        }
        """
        lower_bound = psycopg2.TimestampFromTicks(start_timestamp)
        upper_ticks = end_timestamp if end_timestamp else time.time()
        upper_bound = psycopg2.TimestampFromTicks(upper_ticks)
        query = (
            'select * from %s where record_timestamp >= %s'
            ' and record_timestamp <= %s'
        ) % (self.table_name, lower_bound, upper_bound)
        dict_cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        with dict_cursor as cursor:
            cursor.execute(query)
            # The commit is issued before the rows are fetched.
            self.connection.commit()
            return cursor.fetchall()
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def delete_records(self, timestamp):
        """Remove every row whose record_timestamp precedes ``timestamp``.

        Args:
          timestamp: cutoff in seconds since epoch; only rows strictly
              older than it are deleted.
        """
        cutoff = psycopg2.TimestampFromTicks(timestamp)
        statement = 'delete from %s where record_timestamp < %s' % (
            self.table_name, cutoff)
        with self.connection.cursor() as cursor:
            cursor.execute(statement)
            self.connection.commit()
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def setUp(self):
        """Connect to the GPRSDB and seed gprs_records with four rows.

        Stores the GPRSDB handle on ``self.gprs_db`` and the reference time
        on ``self.now``, then inserts four records for
        IMSI901550000000084 timestamped 60, 30, 10 and 5 seconds before
        ``self.now``.  The inserts go through a separate, short-lived
        connection that is closed before this method returns.
        """
        # Connect to the GPRSDB.
        self.gprs_db = gprs_database.GPRSDB()
        # Add some records to the GPRSDB.
        self.now = time.time()
        records = [
            (psycopg2.TimestampFromTicks(self.now - 60), 'IMSI901550000000084',
             '192.168.99.1', 100, 200, 100, 200),
            (psycopg2.TimestampFromTicks(self.now - 30), 'IMSI901550000000084',
             '192.168.99.1', 300, 500, 200, 300),
            (psycopg2.TimestampFromTicks(self.now - 10), 'IMSI901550000000084',
             '192.168.99.1', 700, 600, 400, 100),
            (psycopg2.TimestampFromTicks(self.now - 5), 'IMSI901550000000084',
             '192.168.99.1', 750, 625, 50, 25),
        ]
        schema = ('record_timestamp, imsi, ipaddr, uploaded_bytes,'
                  ' downloaded_bytes, uploaded_bytes_delta,'
                  ' downloaded_bytes_delta')
        # Only the fixed column list is formatted into the statement; the
        # values are bound as query parameters so the driver does the
        # quoting instead of hand-written '%s' wrappers.
        placeholders = ', '.join(['%s'] * len(records[0]))
        command = 'insert into gprs_records (%s) values(%s)' % (
            schema, placeholders)
        connection = psycopg2.connect(host='localhost', database='endaga',
                                      user=PG_USER, password=PG_PASSWORD)
        try:
            with connection.cursor() as cursor:
                for record in records:
                    cursor.execute(command, record)
            connection.commit()
        finally:
            # Release the connection even when an insert fails; otherwise
            # every test run would leave one open.
            connection.close()
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
        del day, day_from_ticks, moment, moment_from_ticks
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
        del day, day_from_ticks, moment, moment_from_ticks
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:aws-lambda-redshift-copy    作者:christianhxc    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:aws-lambda-redshift-copy    作者:christianhxc    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:PyEloqua-Examples    作者:colemanja91    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
项目:PyEloqua-Examples    作者:colemanja91    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_date_time_allocation_bug(self):
        """Exercise the date/time constructors, both direct and tick-based.

        Nothing is asserted: the test succeeds as long as every
        constructor call below returns without raising.
        """
        midnight_fields = (2002, 12, 25, 0, 0, 0, 0, 0, 0)
        clock_fields = (2001, 1, 1, 13, 45, 30, 0, 0, 0)
        stamp_fields = (2002, 12, 25, 13, 45, 30, 0, 0, 0)

        day = psycopg2.Date(2002, 12, 25)
        day_from_ticks = psycopg2.DateFromTicks(time.mktime(midnight_fields))
        moment = psycopg2.Time(13, 45, 30)
        moment_from_ticks = psycopg2.TimeFromTicks(time.mktime(clock_fields))
        # The same two names are rebound for the Timestamp pair, so each
        # Time object stops being referenced by this function as soon as
        # its successor is bound.
        moment = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
        moment_from_ticks = psycopg2.TimestampFromTicks(
            time.mktime(stamp_fields))
        del day, day_from_ticks, moment, moment_from_ticks
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_timestamp_value_error_sec_59_99(self):
        """Compare the adapted value of a tick count ending in .99992 s.

        ``TimestampFromTicks(1273173119.99992).adapted`` must equal
        2010-05-06 14:11:59.999920 carrying the timezone built by
        ``FixedOffsetTimezone(-5 * 60)``.
        """
        # NOTE(review): the name suggests a regression test for a
        # ValueError near second 59.99 -- confirm against project history.
        from datetime import datetime

        wrapped = psycopg2.TimestampFromTicks(1273173119.99992)
        actual = wrapped.adapted
        expected = datetime(
            2010, 5, 6, 14, 11, 59, 999920,
            tzinfo=FixedOffsetTimezone(-5 * 60),
        )
        self.assertEqual(actual, expected)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def setUpClass(cls):
        """Install the mock subscriber and seed gprs_records for the tests.

        Saves the current ``utilities.subscriber`` on
        ``cls.original_subscriber`` and replaces it with a
        ``mocks.MockSubscriber``, creates two subscribers, opens the GPRSDB
        and EventStore handles, and inserts eight gprs_records rows (five
        for IMSI901550000000084, three for IMSI901550000000082) timestamped
        relative to ``cls.now``.  The inserts go through a separate,
        short-lived connection that is closed before this method returns.
        """
        # Monkeypatch Subscriber so sub balance lookups succeed.
        cls.original_subscriber = utilities.subscriber
        cls.mock_subscriber = mocks.MockSubscriber()
        utilities.subscriber = cls.mock_subscriber
        subscriber.create_subscriber('IMSI901550000000084', '5551234')
        subscriber.create_subscriber('IMSI901550000000082', '5551235')
        # Connect to the GPRSDB and EventStore.
        cls.gprs_db = gprs_database.GPRSDB()
        cls.event_store = events.EventStore()
        # Add some records to the GPRSDB.  The method we're testing should
        # extract these records and create events in the EventStore.
        cls.now = time.time()
        records = [
            (psycopg2.TimestampFromTicks(cls.now - 120), 'IMSI901550000000084',
             '192.168.99.1', 50, 80, 50, 80),
            (psycopg2.TimestampFromTicks(cls.now - 60), 'IMSI901550000000084',
             '192.168.99.1', 50, 80, 0, 0),
            (psycopg2.TimestampFromTicks(cls.now - 30), 'IMSI901550000000084',
             '192.168.99.1', 300, 500, 250, 420),
            (psycopg2.TimestampFromTicks(cls.now - 10), 'IMSI901550000000084',
             '192.168.99.1', 700, 600, 400, 100),
            (psycopg2.TimestampFromTicks(cls.now - 5), 'IMSI901550000000084',
             '192.168.99.1', 750, 625, 50, 25),
            # Create events for a different IMSI.
            (psycopg2.TimestampFromTicks(cls.now - 60), 'IMSI901550000000082',
             '192.168.99.2', 50, 80, 0, 0),
            (psycopg2.TimestampFromTicks(cls.now - 10), 'IMSI901550000000082',
             '192.168.99.2', 400, 300, 350, 220),
            (psycopg2.TimestampFromTicks(cls.now - 5), 'IMSI901550000000082',
             '192.168.99.2', 450, 325, 50, 25),
        ]
        schema = ('record_timestamp, imsi, ipaddr, uploaded_bytes,'
                  ' downloaded_bytes, uploaded_bytes_delta,'
                  ' downloaded_bytes_delta')
        # Only the fixed column list is formatted into the statement; the
        # values are bound as query parameters so the driver does the
        # quoting instead of hand-written '%s' wrappers.
        placeholders = ', '.join(['%s'] * len(records[0]))
        command = 'insert into gprs_records (%s) values(%s)' % (
            schema, placeholders)
        connection = psycopg2.connect(host='localhost', database='endaga',
                                      user=PG_USER, password=PG_PASSWORD)
        try:
            with connection.cursor() as cursor:
                for record in records:
                    cursor.execute(command, record)
            connection.commit()
        finally:
            # Release the connection even when an insert fails; otherwise
            # it would stay open after the fixture has finished.
            connection.close()