我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用dateutil.tz模块。
def in_words_from_now(stamp, sep='_', precision='{:0.1f}'):
    """Describe how far ``stamp`` is from the current time.

    :param stamp: timezone-aware timestamp to describe, or ``None``
    :param sep: string used to join the pieces of the description
    :param precision: format string applied to the numeric value
    :return: ``'never'`` when ``stamp`` is ``None``; the local-time ISO
        string when the gap has a day, week, month or year component;
        otherwise value, unit and ``ago`` / ``from now`` joined by ``sep``
    """
    if stamp is None:
        return 'never'

    current = now()
    in_past = current > stamp
    # ``delta`` comes from elsewhere in the module; it is always called
    # as (later, earlier) so its fields are non-negative.
    gap = delta(current, stamp) if in_past else delta(stamp, current)
    suffix = ('ago',) if in_past else ('from', 'now')

    if gap.days > 0 or gap.weeks > 0 or gap.months > 0 or gap.years > 0:
        # Too far away for a relative phrase: show the absolute local time.
        return stamp.astimezone(dateutil.tz.tzlocal()).isoformat()

    if gap.hours > 0:
        amount, unit = gap.hours + (gap.minutes / 60.0), 'hours'
    elif gap.minutes > 0:
        amount, unit = gap.minutes + (gap.seconds / 60.0), 'min'
    else:
        amount, unit = gap.seconds + (gap.microseconds / 1000000.0), 'sec'

    return sep.join((precision.format(amount), unit) + suffix)
def datetime_from_timestamp(ts, tz=None, as_utc=False):
    # type: (float, dateutil.tz, bool) -> datetime.datetime
    """Turn a POSIX timestamp into a timezone-aware datetime.

    :param float ts: timestamp
    :param dateutil.tz tz: time zone to attach; the local zone when omitted
    :param bool as_utc: when true, convert the result to UTC
    :rtype: datetime.datetime
    :return: aware datetime for ``ts``
    """
    zone = dateutil.tz.tzlocal() if tz is None else tz
    converted = datetime.datetime.fromtimestamp(ts, tz=zone)
    if not as_utc:
        return converted
    return converted.astimezone(tz=dateutil.tz.tzutc())
def test_index_convert_to_datetime_array(self):
    """``to_pydatetime`` must preserve values and tzinfo (string tz names)."""
    tm._skip_if_no_pytz()

    def _verify(index):
        # Each converted element must match the Timestamp it came from.
        as_array = index.to_pydatetime()
        tm.assertIsInstance(as_array, np.ndarray)
        for pydt, ts in zip(as_array, index):
            tm.assertIsInstance(pydt, datetime)
            self.assertEqual(pydt, ts.to_pydatetime())
            self.assertEqual(pydt.tzinfo, ts.tzinfo)

    naive = date_range('20090415', '20090519')
    eastern = date_range('20090415', '20090519', tz='US/Eastern')
    utc = date_range('20090415', '20090519', tz='utc')

    for index in (naive, eastern, utc):
        _verify(index)
def test_index_convert_to_datetime_array_explicit_pytz(self):
    """``to_pydatetime`` must preserve values and tzinfo (pytz objects)."""
    tm._skip_if_no_pytz()
    import pytz

    def _verify(index):
        # Each converted element must match the Timestamp it came from.
        as_array = index.to_pydatetime()
        tm.assertIsInstance(as_array, np.ndarray)
        for pydt, ts in zip(as_array, index):
            tm.assertIsInstance(pydt, datetime)
            self.assertEqual(pydt, ts.to_pydatetime())
            self.assertEqual(pydt.tzinfo, ts.tzinfo)

    naive = date_range('20090415', '20090519')
    eastern = date_range('20090415', '20090519',
                         tz=pytz.timezone('US/Eastern'))
    utc = date_range('20090415', '20090519', tz=pytz.utc)

    for index in (naive, eastern, utc):
        _verify(index)
def test_index_convert_to_datetime_array_dateutil(self):
    """``to_pydatetime`` must preserve values and tzinfo (dateutil zones)."""
    tm._skip_if_no_dateutil()
    import dateutil

    def _verify(index):
        # Each converted element must match the Timestamp it came from.
        as_array = index.to_pydatetime()
        tm.assertIsInstance(as_array, np.ndarray)
        for pydt, ts in zip(as_array, index):
            tm.assertIsInstance(pydt, datetime)
            self.assertEqual(pydt, ts.to_pydatetime())
            self.assertEqual(pydt.tzinfo, ts.tzinfo)

    naive = date_range('20090415', '20090519')
    eastern = date_range('20090415', '20090519', tz='dateutil/US/Eastern')
    utc = date_range('20090415', '20090519', tz=dateutil.tz.tzutc())

    for index in (naive, eastern, utc):
        _verify(index)
def test_dti_constructor_years_only(self):
    """Year-only bounds must expand to the same range as full dates."""
    # GH 6961
    for tz in [None, 'UTC', 'Asia/Tokyo', 'dateutil/US/Pacific']:
        # (range built from bare years, range built from explicit dates)
        cases = [
            (date_range('2014', '2015', freq='M', tz=tz),
             date_range('2014-01-31', '2014-12-31', freq='M', tz=tz)),
            (date_range('2014', '2015', freq='MS', tz=tz),
             date_range('2014-01-01', '2015-01-01', freq='MS', tz=tz)),
            (date_range('2014', '2020', freq='A', tz=tz),
             date_range('2014-12-31', '2019-12-31', freq='A', tz=tz)),
            (date_range('2014', '2020', freq='AS', tz=tz),
             date_range('2014-01-01', '2020-01-01', freq='AS', tz=tz)),
        ]
        for actual, wanted in cases:
            tm.assert_index_equal(actual, wanted)
def test_append_concat_tz(self):
    """Appending tz-aware Series/DataFrame/Index keeps the combined index."""
    # GH 2938
    tm._skip_if_no_pytz()

    head = date_range('5/8/2012 1:45', periods=10, freq='5T',
                      tz='US/Eastern')
    tail = date_range('5/8/2012 2:35', periods=10, freq='5T',
                      tz='US/Eastern')
    whole = date_range('5/8/2012 1:45', periods=20, freq='5T',
                       tz='US/Eastern')

    head_series = Series(np.random.randn(len(head)), head)
    head_frame = DataFrame(np.random.randn(len(head), 4), index=head)
    tail_series = Series(np.random.randn(len(tail)), tail)
    tail_frame = DataFrame(np.random.randn(len(tail), 4), index=tail)

    joined_series = head_series.append(tail_series)
    joined_frame = head_frame.append(tail_frame)
    self.assertTrue(joined_series.index.equals(whole))
    self.assertTrue(joined_frame.index.equals(whole))

    joined_index = head.append(tail)
    self.assertTrue(joined_index.equals(whole))
def test_append_concat_tz_explicit_pytz(self):
    """Same as the string-tz append test, but with pytz timezone objects."""
    # GH 2938
    tm._skip_if_no_pytz()
    from pytz import timezone as timezone

    def _eastern_range(start, periods):
        # Five-minute range localized with a pytz timezone object.
        return date_range(start, periods=periods, freq='5T',
                          tz=timezone('US/Eastern'))

    head = _eastern_range('5/8/2012 1:45', 10)
    tail = _eastern_range('5/8/2012 2:35', 10)
    whole = _eastern_range('5/8/2012 1:45', 20)

    head_series = Series(np.random.randn(len(head)), head)
    head_frame = DataFrame(np.random.randn(len(head), 4), index=head)
    tail_series = Series(np.random.randn(len(tail)), tail)
    tail_frame = DataFrame(np.random.randn(len(tail), 4), index=tail)

    joined_series = head_series.append(tail_series)
    joined_frame = head_frame.append(tail_frame)
    self.assertTrue(joined_series.index.equals(whole))
    self.assertTrue(joined_frame.index.equals(whole))

    joined_index = head.append(tail)
    self.assertTrue(joined_index.equals(whole))
def test_period_resample_with_local_timezone_dateutil(self):
    """Daily period resampling must honour a dateutil local timezone."""
    # GH5430
    tm._skip_if_no_dateutil()
    import dateutil

    local_timezone = 'dateutil/America/Los_Angeles'
    utc = dateutil.tz.tzutc

    start = datetime(year=2013, month=11, day=1, hour=0, minute=0,
                     tzinfo=utc())
    # 1 day later
    end = datetime(year=2013, month=11, day=2, hour=0, minute=0,
                   tzinfo=utc())

    hourly = pd.date_range(start, end, freq='H')
    localized = pd.Series(1, index=hourly).tz_convert(local_timezone)
    result = localized.resample('D', kind='period').mean()

    # Build the expected series: converting from UTC to Pacific moves
    # the index back by one day.
    shifted_days = pd.period_range(start=start, end=end, freq='D') - 1
    expected = pd.Series(1, index=shifted_days)
    assert_series_equal(result, expected)
def test_take(self):
    """``take`` and fancy indexing must agree and drop the frequency."""
    picked_dates = [datetime(2010, 1, 1, 14), datetime(2010, 1, 1, 15),
                    datetime(2010, 1, 1, 17), datetime(2010, 1, 1, 21)]
    positions = [5, 6, 8, 12]

    for tz in [None, 'US/Eastern', 'Asia/Tokyo']:
        idx = DatetimeIndex(start='2010-01-01 09:00',
                            end='2010-02-01 09:00', freq='H', tz=tz,
                            name='idx')
        expected = DatetimeIndex(picked_dates, freq=None, name='idx', tz=tz)

        via_take = idx.take(positions)
        via_getitem = idx[positions]

        for taken in (via_take, via_getitem):
            self.assertTrue(taken.equals(expected))
            tm.assertIsInstance(taken, DatetimeIndex)
            self.assertIsNone(taken.freq)
            self.assertEqual(taken.tz, expected.tz)
            self.assertEqual(taken.name, expected.name)
def test_dti_set_index_reindex(self):
    """``set_index``/``reindex`` with tz-aware indexes keep the new index."""
    # GH 6631
    frame = DataFrame(np.random.random(6))
    monthly_eastern = date_range('2011/01/01', periods=6, freq='M',
                                 tz='US/Eastern')
    annual_tokyo = date_range('2013', periods=6, freq='A', tz='Asia/Tokyo')

    frame = frame.set_index(monthly_eastern)
    self.assertTrue(frame.index.equals(monthly_eastern))
    frame = frame.reindex(annual_tokyo)
    self.assertTrue(frame.index.equals(annual_tokyo))

    # 11314
    # with tz
    index = date_range(datetime(2015, 10, 1), datetime(2015, 10, 1, 23),
                       freq='H', tz='US/Eastern')
    frame = DataFrame(np.random.randn(24, 1), columns=['a'], index=index)
    new_index = date_range(datetime(2015, 10, 2),
                           datetime(2015, 10, 2, 23),
                           freq='H', tz='US/Eastern')

    # TODO: unused?
    result = frame.set_index(new_index)  # noqa

    self.assertEqual(new_index.freq, index.freq)
def test_cant_compare_tz_naive_w_aware(self):
    """Comparing naive and aware Timestamps must raise (string tz)."""
    tm._skip_if_no_pytz()
    # #1404
    naive = Timestamp('3/12/2012')
    aware = Timestamp('3/12/2012', tz='utc')

    # Every rich comparison must raise, in both directions.
    for left, right in ((naive, aware), (aware, naive)):
        for dunder in ('__eq__', '__ne__', '__lt__', '__gt__'):
            self.assertRaises(Exception, getattr(left, dunder), right)

    if sys.version_info < (3, 3):
        self.assertRaises(Exception, naive.__eq__, aware.to_pydatetime())
        self.assertRaises(Exception, naive.to_pydatetime().__eq__, aware)
    else:
        self.assertFalse(naive == aware.to_pydatetime())
        self.assertFalse(naive.to_pydatetime() == aware)
def test_cant_compare_tz_naive_w_aware_explicit_pytz(self):
    """Comparing naive and aware Timestamps must raise (pytz UTC)."""
    tm._skip_if_no_pytz()
    from pytz import utc
    # #1404
    naive = Timestamp('3/12/2012')
    aware = Timestamp('3/12/2012', tz=utc)

    # Every rich comparison must raise, in both directions.
    for left, right in ((naive, aware), (aware, naive)):
        for dunder in ('__eq__', '__ne__', '__lt__', '__gt__'):
            self.assertRaises(Exception, getattr(left, dunder), right)

    if sys.version_info < (3, 3):
        self.assertRaises(Exception, naive.__eq__, aware.to_pydatetime())
        self.assertRaises(Exception, naive.to_pydatetime().__eq__, aware)
    else:
        self.assertFalse(naive == aware.to_pydatetime())
        self.assertFalse(naive.to_pydatetime() == aware)
def test_cant_compare_tz_naive_w_aware_dateutil(self):
    """Comparing naive and aware Timestamps must raise (dateutil UTC)."""
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc
    utc = tzutc()
    # #1404
    naive = Timestamp('3/12/2012')
    aware = Timestamp('3/12/2012', tz=utc)

    # Every rich comparison must raise, in both directions.
    for left, right in ((naive, aware), (aware, naive)):
        for dunder in ('__eq__', '__ne__', '__lt__', '__gt__'):
            self.assertRaises(Exception, getattr(left, dunder), right)

    if sys.version_info < (3, 3):
        self.assertRaises(Exception, naive.__eq__, aware.to_pydatetime())
        self.assertRaises(Exception, naive.to_pydatetime().__eq__, aware)
    else:
        self.assertFalse(naive == aware.to_pydatetime())
        self.assertFalse(naive.to_pydatetime() == aware)
def test_tz_localize_dti(self):
    """Localizing an index keeps UTC values and rejects bad wall times."""
    naive = DatetimeIndex(start='1/1/2005', end='1/1/2005 0:00:30.256',
                          freq='L')
    eastern = naive.tz_localize(self.tzstr('US/Eastern'))

    utc_equivalent = DatetimeIndex(start='1/1/2005 05:00',
                                   end='1/1/2005 5:00:30.256', freq='L',
                                   tz='utc')
    self.assert_numpy_array_equal(eastern.values, utc_equivalent.values)

    pacific = eastern.tz_convert(self.tzstr('US/Pacific'))
    self.assert_numpy_array_equal(pacific.values, utc_equivalent.values)

    # Range that crosses the repeated hour when clocks fall back.
    ambiguous = DatetimeIndex(start='11/6/2011 1:59', end='11/6/2011 2:00',
                              freq='L')
    self.assertRaises(pytz.AmbiguousTimeError, ambiguous.tz_localize,
                      self.tzstr('US/Eastern'))

    # Range that crosses the skipped hour when clocks spring forward.
    missing = DatetimeIndex(start='3/13/2011 1:59', end='3/13/2011 2:00',
                            freq='L')
    self.assertRaises(pytz.NonExistentTimeError, missing.tz_localize,
                      self.tzstr('US/Eastern'))
def test_utc_box_timestamp_and_localize(self):
    """Elements boxed from a converted index carry the right tzinfo."""
    utc_hours = date_range('3/11/2012', '3/12/2012', freq='H', tz='utc')
    eastern_hours = utc_hours.tz_convert(self.tzstr('US/Eastern'))

    eastern_tz = self.tz('US/Eastern')
    expected = utc_hours[-1].astimezone(eastern_tz)

    boxed = eastern_hours[-1]
    self.assertEqual(boxed, expected)
    self.assertEqual(boxed.tzinfo, expected.tzinfo)

    # right tzinfo
    utc_hours = date_range('3/13/2012', '3/14/2012', freq='H', tz='utc')
    eastern_hours = utc_hours.tz_convert(self.tzstr('US/Eastern'))
    # test not valid for dateutil timezones.
    # self.assertIn('EDT', repr(rng_eastern[0].tzinfo))
    self.assertTrue('EDT' in repr(eastern_hours[0].tzinfo) or
                    'tzfile' in repr(eastern_hours[0].tzinfo))
def test_with_tz_ambiguous_times(self):
    """Localizing across DST transitions raises; UTC ranges do not."""
    eastern = self.tz('US/Eastern')

    # March 13, 2011, spring forward, skip from 2 AM to 3 AM
    skipped = date_range(datetime(2011, 3, 13, 1, 30), periods=3,
                         freq=datetools.Hour())
    self.assertRaises(pytz.NonExistentTimeError, skipped.tz_localize,
                      eastern)

    # after dst transition, it works
    dr = date_range(datetime(2011, 3, 13, 3, 30), periods=3,
                    freq=datetools.Hour(), tz=eastern)

    # November 6, 2011, fall back, repeat 2 AM hour
    repeated = date_range(datetime(2011, 11, 6, 1, 30), periods=3,
                          freq=datetools.Hour())
    self.assertRaises(pytz.AmbiguousTimeError, repeated.tz_localize,
                      eastern)

    # UTC is OK
    dr = date_range(datetime(2011, 3, 13), periods=48,
                    freq=datetools.Minute(30), tz=pytz.utc)
def test_infer_tz(self):
    """``tools._infer_tzinfo`` returns the shared tz or raises on mismatch."""
    eastern = self.tz('US/Eastern')
    utc = pytz.utc

    naive_start = datetime(2001, 1, 1)
    naive_end = datetime(2009, 1, 1)

    start = self.localize(eastern, naive_start)
    end = self.localize(eastern, naive_end)

    # Either bound alone, or both together, must yield that bound's tzinfo.
    assert (tools._infer_tzinfo(start, end) is
            self.localize(eastern, naive_start).tzinfo)
    assert (tools._infer_tzinfo(start, None) is
            self.localize(eastern, naive_start).tzinfo)
    assert (tools._infer_tzinfo(None, end) is
            self.localize(eastern, naive_end).tzinfo)

    start = utc.localize(naive_start)
    end = utc.localize(naive_end)
    assert (tools._infer_tzinfo(start, end) is utc)

    # Mixed zones are rejected in either argument order.
    end = self.localize(eastern, naive_end)
    for first, second in ((start, end), (end, start)):
        self.assertRaises(Exception, tools._infer_tzinfo, first, second)
def test_index_astype_asobject_tzinfos(self):
    """Object conversions keep per-element tzinfo across a DST change."""
    # #1345

    # dates around a dst transition
    rng = date_range('2/13/2010', '5/6/2010', tz=self.tzstr('US/Eastern'))

    def _matches_source(objs):
        # Every converted element must equal the indexed original.
        for pos, obj in enumerate(objs):
            wanted = rng[pos]
            self.assertEqual(obj, wanted)
            self.assertEqual(obj.tzinfo, wanted.tzinfo)

    _matches_source(rng.asobject)
    _matches_source(rng.astype(object))
def test_localized_at_time_between_time(self):
    """``at_time``/``between_time`` work on localized series, keep the tz."""
    from datetime import time

    rng = date_range('4/16/2012', '5/1/2012', freq='H')
    naive_series = Series(np.random.randn(len(rng)), index=rng)
    local_series = naive_series.tz_localize(self.tzstr('US/Eastern'))

    # Selecting then localizing must equal localizing then selecting.
    result = local_series.at_time(time(10, 0))
    expected = naive_series.at_time(time(10, 0)).tz_localize(
        self.tzstr('US/Eastern'))
    tm.assert_series_equal(result, expected)
    self.assertTrue(self.cmptz(result.index.tz, self.tz('US/Eastern')))

    window_start, window_end = time(10, 0), time(11, 0)
    result = local_series.between_time(window_start, window_end)
    expected = naive_series.between_time(
        window_start, window_end).tz_localize(self.tzstr('US/Eastern'))
    tm.assert_series_equal(result, expected)
    self.assertTrue(self.cmptz(result.index.tz, self.tz('US/Eastern')))
def test_series_frame_tz_localize(self):
    """``tz_localize`` works on Series and both DataFrame axes."""
    rng = date_range('1/1/2011', periods=100, freq='H')

    series = Series(1, index=rng)
    result = series.tz_localize('utc')
    self.assertEqual(result.index.tz.zone, 'UTC')

    frame = DataFrame({'a': 1}, index=rng)
    result = frame.tz_localize('utc')
    expected = DataFrame({'a': 1}, rng.tz_localize('UTC'))
    self.assertEqual(result.index.tz.zone, 'UTC')
    assert_frame_equal(result, expected)

    # Same check along the columns of the transposed frame.
    frame = frame.T
    result = frame.tz_localize('utc', axis=1)
    self.assertEqual(result.columns.tz.zone, 'UTC')
    assert_frame_equal(result, expected.T)

    # Can't localize if already tz-aware
    aware_rng = date_range('1/1/2011', periods=100, freq='H', tz='utc')
    aware_series = Series(1, index=aware_rng)
    tm.assertRaisesRegexp(TypeError, 'Already tz-aware',
                          aware_series.tz_localize, 'US/Eastern')
def test_series_frame_tz_convert(self):
    """``tz_convert`` works on Series and both DataFrame axes."""
    rng = date_range('1/1/2011', periods=200, freq='D', tz='US/Eastern')

    series = Series(1, index=rng)
    result = series.tz_convert('Europe/Berlin')
    self.assertEqual(result.index.tz.zone, 'Europe/Berlin')

    frame = DataFrame({'a': 1}, index=rng)
    result = frame.tz_convert('Europe/Berlin')
    expected = DataFrame({'a': 1}, rng.tz_convert('Europe/Berlin'))
    self.assertEqual(result.index.tz.zone, 'Europe/Berlin')
    assert_frame_equal(result, expected)

    # Same check along the columns of the transposed frame.
    frame = frame.T
    result = frame.tz_convert('Europe/Berlin', axis=1)
    self.assertEqual(result.columns.tz.zone, 'Europe/Berlin')
    assert_frame_equal(result, expected.T)

    # can't convert tz-naive
    naive_rng = date_range('1/1/2011', periods=200, freq='D')
    naive_series = Series(1, index=naive_rng)
    tm.assertRaisesRegexp(TypeError, "Cannot convert tz-naive",
                          naive_series.tz_convert, 'US/Eastern')
def test_append_aware_naive(self):
    """Appending mismatched indexes falls back to an object index."""
    naive_rng = date_range('1/1/2011 01:00', periods=1, freq='H')
    aware_rng = date_range('1/1/2011 02:00', periods=1, freq='H',
                           tz='US/Eastern')
    naive_series = Series(np.random.randn(len(naive_rng)), index=naive_rng)
    aware_series = Series(np.random.randn(len(aware_rng)), index=aware_rng)

    combined = naive_series.append(aware_series)
    wanted_index = naive_series.index.asobject.append(
        aware_series.index.asobject)
    self.assertTrue(combined.index.equals(wanted_index))

    # mixed
    naive_rng = date_range('1/1/2011 01:00', periods=1, freq='H')
    int_labels = lrange(100)
    naive_series = Series(np.random.randn(len(naive_rng)), index=naive_rng)
    int_series = Series(np.random.randn(len(int_labels)), index=int_labels)

    combined = naive_series.append(int_series)
    wanted_index = naive_series.index.asobject.append(int_series.index)
    self.assertTrue(combined.index.equals(wanted_index))
def test_equal_join_ensure_utc(self):
    """Adding objects in different zones yields a ``pytz.utc`` index."""
    rng = date_range('1/1/2011', periods=10, freq='H', tz='US/Eastern')
    eastern_series = Series(np.random.randn(len(rng)), index=rng)
    moscow_series = eastern_series.tz_convert('Europe/Moscow')

    def _index_is_utc(total):
        # The joined index must be the pytz UTC singleton itself.
        self.assertIs(total.index.tz, pytz.utc)

    _index_is_utc(eastern_series + moscow_series)
    _index_is_utc(moscow_series + eastern_series)

    eastern_frame = DataFrame({'a': eastern_series})
    moscow_frame = eastern_frame.tz_convert('Europe/Moscow')

    _index_is_utc(eastern_frame + moscow_frame)
    _index_is_utc(moscow_frame + eastern_frame)
def test_arith_utc_convert(self):
    """Arithmetic across zones equals converting both operands to UTC."""
    rng = date_range('1/1/2011', periods=100, freq='H', tz='utc')

    # Two random 90-element subsets of the range, viewed in different zones.
    picks = np.random.permutation(100)[:90]
    eastern = Series(np.random.randn(90),
                     index=rng.take(picks).tz_convert('US/Eastern'))

    picks = np.random.permutation(100)[:90]
    berlin = Series(np.random.randn(90),
                    index=rng.take(picks).tz_convert('Europe/Berlin'))

    result = eastern + berlin

    expected = eastern.tz_convert('utc') + berlin.tz_convert('utc')
    self.assertEqual(result.index.tz, pytz.UTC)
    tm.assert_series_equal(result, expected)
def test_tzaware_offset(self):
    """Adding hour offsets to tz-aware indexes shifts every element."""
    dates = date_range('2012-11-01', periods=3, tz='US/Pacific')
    shifted = dates + offsets.Hour(5)
    self.assertEqual(dates[0] + offsets.Hour(5), shifted[0])

    # GH 6818
    for tz in ['UTC', 'US/Pacific', 'Asia/Tokyo']:
        dates = date_range('2010-11-01 00:00', periods=3, tz=tz, freq='H')
        expected = DatetimeIndex(['2010-11-01 05:00', '2010-11-01 06:00',
                                  '2010-11-01 07:00'], freq='H', tz=tz)

        # Three equivalent ways to spell "five hours".
        shifted = dates + offsets.Hour(5)
        self.assertTrue(shifted.equals(expected))
        shifted = dates + np.timedelta64(5, 'h')
        self.assertTrue(shifted.equals(expected))
        shifted = dates + timedelta(hours=5)
        self.assertTrue(shifted.equals(expected))
def create_curtailment_publish(self, current_time_str, device_name, meta):
    """Publish the curtailment state of every subdevice of a device.

    One message per subdevice is published to
    ``<update_base_topic>/<device_name[0]>/<subdevice>``, each waiting up
    to 15 seconds for the publish to complete. Publishing is best-effort:
    ordinary failures are logged at debug level and not raised.

    :param current_time_str: value placed in the ``Date`` header
    :param device_name: key passed to ``self.curtailment.get_device``;
        its first element is used in the topic
    :param meta: not used by this method; kept for interface compatibility
    """
    try:
        headers = {
            "Date": current_time_str,
            "min_compatible_version": "3.0",
            "MessageType": "Control"
        }
        device = self.curtailment.get_device(device_name)
        # Snapshot the keys so the loop is unaffected if the mapping
        # changes while we block on a publish.
        for subdevice in list(device.command_status.keys()):
            currently_curtailed = device.currently_curtailed[subdevice]
            curtailment_topic = "/".join(
                [self.update_base_topic, device_name[0], subdevice])
            curtailment_status = "Active" if currently_curtailed else "Inactive"
            curtailment_message = [
                {"DeviceState": curtailment_status},
                {"DeviceState": {"tz": "US/Pacific", "type": "string"}}
            ]
            self.vip.pubsub.publish('pubsub',
                                    curtailment_topic,
                                    headers=headers,
                                    message=curtailment_message).get(timeout=15.0)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit; keep the best-effort contract but record the cause.
        _log.debug("Unable to publish device/subdevice curtailment status message.",
                   exc_info=True)
def test_vtimezone_creation(self):
    """Check VTIMEZONE components built from the test ``.ics`` zones."""
    tzs = dateutil.tz.tzical("test_files/timezones.ics")

    pacific = icalendar.TimezoneComponent(tzs.get('US/Pacific'))
    self.assertEqual(
        str(pacific),
        "<VTIMEZONE | <TZID{}US/Pacific>>"
    )

    santiago = icalendar.TimezoneComponent(tzs.get('Santiago'))
    self.assertEqual(
        str(santiago),
        "<VTIMEZONE | <TZID{}Santiago>>"
    )

    for year in range(2001, 2010):
        for month in (2, 9):
            dt = datetime.datetime(year, month, 15,
                                   tzinfo=tzs.get('Santiago'))
            # Previously assertTrue(x, dt): the second argument is only
            # the failure message, so nothing was actually compared.
            self.assertEqual(dt.replace(tzinfo=tzs.get('Santiago')), dt)
def test_scratchbuild(self):
    """
    CreateCalendar 2.0 format from scratch
    """
    expected_text = get_test_file("simple_2_0_test.ics")

    cal = base.newFromBehavior('vcalendar', '2.0')
    cal.add('vevent')
    event = cal.vevent

    event.add('dtstart').value = datetime.datetime(2006, 5, 9)
    event.add('description').value = "Test event"

    pacific = dateutil.tz.tzical(
        "test_files/timezones.ics").get('US/Pacific')
    event.add('created').value = datetime.datetime(2006, 1, 1, 10,
                                                   tzinfo=pacific)
    event.add('uid').value = "Not very random UID"
    event.add('dtstamp').value = datetime.datetime(2017, 6, 26, 0,
                                                   tzinfo=tzutc())

    # Note we're normalizing line endings, because no one got time for that.
    serialized = cal.serialize().replace('\r\n', '\n')
    self.assertEqual(serialized, expected_text.replace('\r\n', '\n'))
def test_vtimezone_creation(self):
    """
    Test timezones
    """
    tzs = dateutil.tz.tzical("test_files/timezones.ics")

    pacific = icalendar.TimezoneComponent(tzs.get('US/Pacific'))
    self.assertEqual(
        str(pacific),
        "<VTIMEZONE | <TZID{}US/Pacific>>"
    )

    santiago = icalendar.TimezoneComponent(tzs.get('Santiago'))
    self.assertEqual(
        str(santiago),
        "<VTIMEZONE | <TZID{}Santiago>>"
    )

    for year in range(2001, 2010):
        for month in (2, 9):
            dt = datetime.datetime(year, month, 15,
                                   tzinfo=tzs.get('Santiago'))
            # Previously assertTrue(x, dt): the second argument is only
            # the failure message, so nothing was actually compared.
            self.assertEqual(dt.replace(tzinfo=tzs.get('Santiago')), dt)
def test_timezone_serializing():
    """
    Serializing with timezones test
    """
    tzs = dateutil.tz.tzical("test_files/timezones.ics")
    pacific = tzs.get('US/Pacific')

    cal = base.Component('VCALENDAR')
    cal.setBehavior(icalendar.VCalendar2_0)
    event = cal.add('vevent')
    event.add('dtstart').value = datetime.datetime(2005, 10, 12, 9,
                                                   tzinfo=pacific)

    # Two recurrence rules plus one excluded date.
    rules = rruleset()
    biweekly = rrule(WEEKLY, interval=2, byweekday=[2, 4],
                     until=datetime.datetime(2005, 12, 15, 9))
    rules.rrule(biweekly)
    rules.rrule(rrule(MONTHLY, bymonthday=[-1, -5]))
    rules.exdate(datetime.datetime(2005, 10, 14, 9, tzinfo=pacific))
    event.rruleset = rules

    event.add('duration').value = datetime.timedelta(hours=1)

    montreal = tzs.get('America/Montreal')
    event.dtstart.value = datetime.datetime(2005, 10, 12, 9, tzinfo=montreal)
def now():
    """Return the current time as an aware datetime in the local zone."""
    local_zone = dateutil.tz.tzlocal()
    return datetime.now(local_zone)
def as_local(stamp):
    """Return ``stamp`` converted to the machine's local time zone."""
    local_zone = dateutil.tz.tzlocal()
    return stamp.astimezone(local_zone)
def security_credentials_role_name():
    """Return the role's temporary credentials as a JSON string.

    Credentials are cached per role ARN in ``_credential_map`` and
    refreshed through STS ``assume_role`` when missing or when they expire
    within ``_refresh_timeout``. If the refresh fails, the response status
    is set to 404 and a ``Failure`` JSON document is returned instead.

    :return: JSON text with either the credential fields or the failure
    """
    role_arn = _get_role_arn()
    credentials = _credential_map.get(role_arn)

    # Refresh credentials if going to expire soon.
    now = datetime.datetime.now(tz=dateutil.tz.tzutc())
    if not credentials or credentials['Expiration'] < now + _refresh_timeout:
        try:
            # Use any boto3 credential provider except the instance metadata provider.
            botocore_session = botocore.session.Session()
            botocore_session.get_component('credential_provider').remove('iam-role')
            session = boto3.session.Session(botocore_session=botocore_session)

            credentials = session.client('sts').assume_role(
                RoleArn=role_arn,
                RoleSessionName="ectou-metadata")['Credentials']
            credentials['LastUpdated'] = now

            _credential_map[role_arn] = credentials

        except Exception as e:
            bottle.response.status = 404
            bottle.response.content_type = 'text/plain'  # EC2 serves json as text/plain
            return json.dumps({
                'Code': 'Failure',
                # ``e.message`` exists only on Python 2 exceptions; fall
                # back to str(e) so this path cannot raise AttributeError.
                'Message': getattr(e, 'message', None) or str(e),
            }, indent=2)

    # Return current credential.
    bottle.response.content_type = 'text/plain'  # EC2 serves json as text/plain
    return json.dumps({
        'Code': 'Success',
        'LastUpdated': _format_iso(credentials['LastUpdated']),
        "Type": "AWS-HMAC",
        'AccessKeyId': credentials['AccessKeyId'],
        'SecretAccessKey': credentials['SecretAccessKey'],
        'Token': credentials['SessionToken'],
        'Expiration': _format_iso(credentials['Expiration'])
    }, indent=2)
def datetime_now():
    # type: () -> datetime.datetime
    """Give the current moment as an aware datetime in the local zone.

    :rtype: datetime.datetime
    :return: datetime now with local tz
    """
    local_zone = dateutil.tz.tzlocal()
    return datetime.datetime.now(tz=local_zone)
def _create_downloader_for_start(td):
    """Build a mock-backed ``ops.Downloader`` with one source path.

    :param td: temporary directory object; provides ``join`` and is used
        as the destination path and the parent of the resume file
    :return: the configured downloader
    """
    d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    d._cleanup_temporary_files = mock.MagicMock()
    d._download_start = datetime.datetime.now(tz=dateutil.tz.tzlocal())
    d._initialize_transfer_threads = mock.MagicMock()

    # Single worker of every kind.
    for knob in ('crypto_processes', 'md5_processes', 'disk_threads',
                 'transfer_threads'):
        setattr(d._general_options.concurrency, knob, 1)
    d._general_options.resume_file = pathlib.Path(str(td.join('rf')))

    d._spec.sources = []
    d._spec.options = mock.MagicMock()
    d._spec.options.chunk_size_bytes = 1
    d._spec.options.mode = azmodels.StorageModes.Auto
    d._spec.options.overwrite = True
    d._spec.options.rename = False

    # Every skip-on condition is switched off.
    d._spec.skip_on = mock.MagicMock()
    for condition in ('md5_match', 'lmt_ge', 'filesize_match'):
        setattr(d._spec.skip_on, condition, False)

    d._spec.destination = mock.MagicMock()
    d._spec.destination.path = pathlib.Path(str(td))

    d._download_start_time = util.datetime_now()
    d._pre_md5_skip_on_check = mock.MagicMock()
    d._check_download_conditions = mock.MagicMock()
    d._all_remote_files_processed = False

    remote_path = '/cont/remote/path'
    source = azops.SourcePath()
    source.add_path_with_storage_account(remote_path, 'sa')
    d._spec.sources.append(source)

    return d
def is_time_to_send_summary(self, bot, report_by):
    """Tell whether the current time has reached the ``report_by`` time.

    :param bot: object whose ``plugin_config['timezone']`` names the zone
        attached to ``report_by``
    :param report_by: time string understood by ``dateutil.parser.parse``
    :return: ``True`` once the local current time is at or past the deadline
    """
    configured_zone = dateutil.tz.gettz(bot.plugin_config['timezone'])
    deadline = dateutil.parser.parse(report_by).replace(
        tzinfo=configured_zone)
    current = datetime.now(dateutil.tz.tzlocal())
    return current >= deadline
def is_last_call(self, bot, report_by):
    """Tell whether we are inside the last-call window before ``report_by``.

    :param bot: object whose ``plugin_config`` provides ``timezone`` and
        ``last_call`` (a time string whose hour and minute give the window)
    :param report_by: time string understood by ``dateutil.parser.parse``
    :return: ``None`` when the window is zero; otherwise whether the time
        left until the deadline is shorter than the window
    """
    configured_zone = dateutil.tz.gettz(bot.plugin_config['timezone'])
    deadline = dateutil.parser.parse(report_by).replace(
        tzinfo=configured_zone)

    parsed_window = dateutil.parser.parse(bot.plugin_config['last_call'])
    window = timedelta(hours=parsed_window.hour,
                       minutes=parsed_window.minute)
    if not window:
        return None

    current = datetime.now(dateutil.tz.tzlocal())
    remaining = deadline - current
    return remaining < window
def is_too_early_to_ask(self, bot, ask_earliest):
    """Tell whether the current time is still before ``ask_earliest``.

    :param bot: object whose ``plugin_config['timezone']`` names the zone
        attached to ``ask_earliest``
    :param ask_earliest: time string understood by ``dateutil.parser.parse``
    :return: ``True`` while the local current time precedes that moment
    """
    configured_zone = dateutil.tz.gettz(bot.plugin_config['timezone'])
    earliest = dateutil.parser.parse(ask_earliest).replace(
        tzinfo=configured_zone)
    current = datetime.now(dateutil.tz.tzlocal())
    return current < earliest
def inject_globals():
    """Build the dictionary of values this function exposes to callers.

    :return: dict with the current-time banner, the version string and
        the local-disk staging settings (``None`` values when the
        ``local_disk_staging`` config entry is absent)
    """
    import datetime
    import dateutil.tz
    import pytz

    utc = datetime.datetime.now(tz=pytz.utc)
    sa = utc.astimezone(pytz.timezone('Africa/Johannesburg'))
    local = utc.astimezone(dateutil.tz.tzlocal())

    cti = utc.strftime('%Y-%m-%d %H:%M') + ' (UTC) • ' + sa.strftime('%H:%M (%Z)')
    # Add the local time only when it is a third, distinct zone.
    if local.tzname() not in ('UTC', sa.tzname()):
        cti += ' • ' + local.strftime('%H:%M (%Z)')

    vi = 'Librarian %s (%s)' % (app.config['_version_string'],
                                app.config['_git_hash'])

    lds_info = app.config.get('local_disk_staging')
    if lds_info is None:
        staging_available = False
        staging_dest_displayed = None
        staging_dest_path = None
        staging_username_placeholder = None
    else:
        staging_available = True
        staging_dest_displayed = lds_info['displayed_dest']
        staging_dest_path = lds_info['dest_prefix']
        staging_username_placeholder = lds_info['username_placeholder']

    return {
        'current_time_info': cti,
        'version_info': vi,
        'staging_available': staging_available,
        'staging_dest_displayed': staging_dest_displayed,
        'staging_dest_path': staging_dest_path,
        'staging_username_placeholder': staging_username_placeholder,
    }


# JSON API
def test_index_unique(self):
    """``unique``/``nunique`` on naive, localized and NaT-bearing indexes."""
    uniques = self.dups.index.unique()
    expected = DatetimeIndex([datetime(2000, 1, 2), datetime(2000, 1, 3),
                              datetime(2000, 1, 4), datetime(2000, 1, 5)])
    self.assertEqual(uniques.dtype, 'M8[ns]')  # sanity
    self.assertTrue(uniques.equals(expected))
    self.assertEqual(self.dups.index.nunique(), 4)

    # #2563
    self.assertTrue(isinstance(uniques, DatetimeIndex))

    localized = self.dups.index.tz_localize('US/Eastern')
    localized.name = 'foo'
    result = localized.unique()
    expected = DatetimeIndex(expected).tz_localize('US/Eastern')
    self.assertTrue(result.tz is not None)
    self.assertEqual(result.name, 'foo')
    self.assertTrue(result.equals(expected))

    def _check_tripled(values):
        # 20 distinct values plus one NaT, repeated three times.
        tripled = DatetimeIndex(values * 3)
        self.assertTrue(tripled.unique().equals(DatetimeIndex(values)))
        self.assertEqual(tripled.nunique(), 20)
        self.assertEqual(tripled.nunique(dropna=False), 21)

    # NaT, note this is excluded
    _check_tripled([1370745748 + t for t in range(20)] + [iNaT])
    _check_tripled([Timestamp('2013-06-09 02:42:28') + timedelta(seconds=t)
                    for t in range(20)] + [NaT])
def test_recreate_from_data(self):
    """Rebuilding an index from itself with the same freq is a no-op."""
    freqs = ['M', 'Q', 'A', 'D', 'B', 'BH', 'T', 'S', 'L', 'U', 'H', 'N',
             'C']

    for freq in freqs:
        # Naive index first, then the tz-aware equivalent.
        original = DatetimeIndex(start='2001/02/01 09:00', freq=freq,
                                 periods=1)
        rebuilt = DatetimeIndex(original, freq=freq)
        self.assertTrue(rebuilt.equals(original))

        original = DatetimeIndex(start='2001/02/01 09:00', freq=freq,
                                 tz='US/Pacific', periods=1)
        rebuilt = DatetimeIndex(original, freq=freq, tz='US/Pacific')
        self.assertTrue(rebuilt.equals(original))
def assert_range_equal(left, right):
    """Assert two ranges match in content, frequency and time zone.

    :param left: object exposing ``equals``, ``freq`` and ``tz``
    :param right: object compared against ``left``
    :raises AssertionError: on the first check that fails
    """
    assert left.equals(right)
    # Metadata is compared in the same order as before: freq, then tz.
    for attribute in ('freq', 'tz'):
        assert getattr(left, attribute) == getattr(right, attribute)
def test_timestamp_to_datetime(self):
    """``to_pydatetime`` keeps value and tzinfo (string tz name)."""
    tm._skip_if_no_pytz()
    eastern_range = date_range('20090415', '20090519', tz='US/Eastern')

    first = eastern_range[0]
    as_pydatetime = first.to_pydatetime()
    self.assertEqual(first, as_pydatetime)
    self.assertEqual(first.tzinfo, as_pydatetime.tzinfo)
def test_timestamp_to_datetime_explicit_pytz(self):
    """``to_pydatetime`` keeps value and tzinfo (pytz timezone object)."""
    tm._skip_if_no_pytz()
    import pytz
    eastern_range = date_range('20090415', '20090519',
                               tz=pytz.timezone('US/Eastern'))

    first = eastern_range[0]
    as_pydatetime = first.to_pydatetime()
    self.assertEqual(first, as_pydatetime)
    self.assertEqual(first.tzinfo, as_pydatetime.tzinfo)
def test_timestamp_to_datetime_explicit_dateutil(self):
    """``to_pydatetime`` keeps value and tzinfo (dateutil zone object)."""
    tm._skip_if_windows_python_3()
    tm._skip_if_no_dateutil()
    from pandas.tslib import _dateutil_gettz as gettz
    eastern_range = date_range('20090415', '20090519',
                               tz=gettz('US/Eastern'))

    first = eastern_range[0]
    as_pydatetime = first.to_pydatetime()
    self.assertEqual(first, as_pydatetime)
    self.assertEqual(first.tzinfo, as_pydatetime.tzinfo)
def test_to_datetime_utc_is_true(self): # See gh-11934 start = pd.Timestamp('2014-01-01', tz='utc') end = pd.Timestamp('2014-01-03', tz='utc') date_range = pd.bdate_range(start, end) result = pd.to_datetime(date_range, utc=True) expected = pd.DatetimeIndex(data=date_range) tm.assert_index_equal(result, expected)
def test_to_datetime_tz_psycopg2(self):
    """``to_datetime`` handles psycopg2 fixed-offset tzinfo objects."""
    # xref 8260
    try:
        import psycopg2
    except ImportError:
        raise nose.SkipTest("no psycopg2 installed")

    # misc cases
    minus_five = psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)
    minus_four = psycopg2.tz.FixedOffsetTimezone(offset=-240, name=None)
    mixed = np.array([datetime(2000, 1, 1, 3, 0, tzinfo=minus_five),
                      datetime(2000, 6, 1, 3, 0, tzinfo=minus_four)],
                     dtype=object)

    result = pd.to_datetime(mixed, errors='coerce', utc=True)
    expected = DatetimeIndex(['2000-01-01 08:00:00+00:00',
                              '2000-06-01 07:00:00+00:00'],
                             dtype='datetime64[ns, UTC]', freq=None)
    tm.assert_index_equal(result, expected)

    # dtype coercion
    fixed_index = pd.DatetimeIndex([
        '2000-01-01 08:00:00+00:00'
    ], tz=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None))
    self.assertFalse(com.is_datetime64_ns_dtype(fixed_index))

    # tz coercion
    result = pd.to_datetime(fixed_index, errors='coerce')
    tm.assert_index_equal(result, fixed_index)

    result = pd.to_datetime(fixed_index, errors='coerce', utc=True)
    expected = pd.DatetimeIndex(['2000-01-01 13:00:00'],
                                dtype='datetime64[ns, UTC]')
    tm.assert_index_equal(result, expected)
def test_to_datetime_freq(self):
    """``to_datetime`` on an index keeps its frequency and tzinfo."""
    source = bdate_range('2000-1-1', periods=10, tz='UTC')
    converted = source.to_datetime()

    self.assertEqual(source.freq, converted.freq)
    self.assertEqual(source.tzinfo, converted.tzinfo)
def test_to_period_tz_pytz(self):
    """``to_period`` on tz-aware ranges matches the naive-range periods."""
    tm._skip_if_no_pytz()
    from dateutil.tz import tzlocal
    from pytz import utc as UTC

    naive_periods = date_range('1/1/2000', '4/1/2000').to_period()

    # Same checks for a zone name, pytz UTC and the dateutil local zone.
    for zone in ('US/Eastern', UTC, tzlocal()):
        aware = date_range('1/1/2000', '4/1/2000', tz=zone)

        result = aware.to_period()[0]
        expected = aware[0].to_period()

        self.assertEqual(result, expected)
        self.assertTrue(aware.to_period().equals(naive_periods))