Python pytz 模块,AmbiguousTimeError() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用pytz.AmbiguousTimeError()

项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def timezone_offset(timezone):
    """Takes a tz name like 'US/Eastern' and returns 'US/Eastern (UTC-05:00)'.

    Intended to be used by the timezone notification page fragment.  The
    offset is looked up through pytz for the current wall-clock time, so the
    above renders 'US/Eastern (UTC-04:00)' when DST is in effect.

    Args:
      timezone: a tz database name understood by ``pytz.timezone``.

    Returns:
      The literal string 'UTC' when ``timezone`` is 'UTC'; otherwise
      '<name> (UTC<sign>HH:MM)' with a zero-padded, explicitly signed offset.
    """
    if timezone == 'UTC':
        return 'UTC'
    tz = pytz.timezone(timezone)
    # NOTE(review): this is the server's naive local time, interpreted by
    # pytz as wall-clock time in the target zone -- confirm that is intended.
    now = datetime.datetime.now()
    try:
        offset = tz.utcoffset(now)
    except (pytz.NonExistentTimeError, pytz.AmbiguousTimeError):
        # If we're in the midst of a DST transition, add an hour and try again.
        offset = tz.utcoffset(now + datetime.timedelta(hours=1))
    total_seconds = int(offset.total_seconds())
    # Split the magnitude, not the signed value: floor division on a negative
    # offset would turn -03:30 into "-4:30" and drop the zero padding.
    sign = '-' if total_seconds < 0 else '+'
    hours, remainder = divmod(abs(total_seconds), 60 * 60)
    minutes = remainder // 60
    return '%s (UTC%s%02d:%02d)' % (timezone, sign, hours, minutes)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_tz_localize_dti(self):
        """Exercise ``DatetimeIndex.tz_localize`` against US/Eastern.

        A naive index starting at 2005-01-01 00:00 is localized to US/Eastern
        and its underlying values are compared with an index built directly
        in UTC starting at 05:00; converting on to US/Pacific must leave the
        values unchanged.  Naive ranges ending at 02:00 on 2011-11-06 and on
        2011-03-13 must raise ``AmbiguousTimeError`` and
        ``NonExistentTimeError`` respectively.
        """
        naive = DatetimeIndex(start='1/1/2005', end='1/1/2005 0:00:30.256',
                              freq='L')
        eastern = naive.tz_localize(self.tzstr('US/Eastern'))

        expected_utc = DatetimeIndex(start='1/1/2005 05:00',
                                     end='1/1/2005 5:00:30.256', freq='L',
                                     tz='utc')

        self.assert_numpy_array_equal(eastern.values, expected_utc.values)

        # Changing the display zone must not alter the stored values.
        pacific = eastern.tz_convert(self.tzstr('US/Pacific'))
        self.assert_numpy_array_equal(pacific.values, expected_utc.values)

        # Range on 2011-11-06 must be rejected as ambiguous.
        fall_range = DatetimeIndex(start='11/6/2011 1:59',
                                   end='11/6/2011 2:00', freq='L')
        with self.assertRaises(pytz.AmbiguousTimeError):
            fall_range.tz_localize(self.tzstr('US/Eastern'))

        # Range on 2011-03-13 must be rejected as non-existent.
        spring_range = DatetimeIndex(start='3/13/2011 1:59',
                                     end='3/13/2011 2:00', freq='L')
        with self.assertRaises(pytz.NonExistentTimeError):
            spring_range.tz_localize(self.tzstr('US/Eastern'))
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_with_tz_ambiguous_times(self):
        """Check localization errors around the 2011 US/Eastern DST changes.

        Localizing naive hourly ranges that cross the spring-forward gap or
        the fall-back overlap must raise; building ranges after the gap or
        directly in UTC must simply succeed.
        """
        eastern = self.tz('US/Eastern')

        # March 13, 2011, spring forward, skip from 2 AM to 3 AM
        across_gap = date_range(datetime(2011, 3, 13, 1, 30), periods=3,
                                freq=datetools.Hour())
        with self.assertRaises(pytz.NonExistentTimeError):
            across_gap.tz_localize(eastern)

        # Starting after the transition: construction alone is the check.
        date_range(datetime(2011, 3, 13, 3, 30), periods=3,
                   freq=datetools.Hour(), tz=eastern)

        # November 6, 2011, fall back: a naive range starting at 01:30
        across_overlap = date_range(datetime(2011, 11, 6, 1, 30), periods=3,
                                    freq=datetools.Hour())
        with self.assertRaises(pytz.AmbiguousTimeError):
            across_overlap.tz_localize(eastern)

        # UTC is OK: construction alone is the check.
        date_range(datetime(2011, 3, 13), periods=48,
                   freq=datetools.Minute(30), tz=pytz.utc)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _CombineDateAndTime(date, time, tzinfo):
    """Creates a datetime object from date and time objects.

    This is similar to the datetime.combine method, but its timezone
    calculations are designed to work with pytz.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo'
    """
    naive_result = datetime.datetime(
        date.year, date.month, date.day, time.hour, time.minute, time.second)
    if tzinfo is None:
      return naive_result

    try:
      return tzinfo.localize(naive_result, is_dst=None)
    except AmbiguousTimeError:


      return min(tzinfo.localize(naive_result, is_dst=True),
                 tzinfo.localize(naive_result, is_dst=False))
    except NonExistentTimeError:




      while True:
        naive_result += datetime.timedelta(minutes=1)
        try:
          return tzinfo.localize(naive_result, is_dst=None)
        except NonExistentTimeError:
          pass
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _CombineDateAndTime(date, time, tzinfo):
    """Creates a datetime object from date and time objects.

    This is similar to the datetime.combine method, but its timezone
    calculations are designed to work with pytz.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo'
    """
    naive_result = datetime.datetime(
        date.year, date.month, date.day, time.hour, time.minute, time.second)
    if tzinfo is None:
      return naive_result

    try:
      return tzinfo.localize(naive_result, is_dst=None)
    except AmbiguousTimeError:


      return min(tzinfo.localize(naive_result, is_dst=True),
                 tzinfo.localize(naive_result, is_dst=False))
    except NonExistentTimeError:




      while True:
        naive_result += datetime.timedelta(minutes=1)
        try:
          return tzinfo.localize(naive_result, is_dst=None)
        except NonExistentTimeError:
          pass
项目:bart-crime    作者:ben174    | 项目源码 | 文件源码
def parse_time(input_time):
    """Parse an Atom time stamp into an aware datetime in ``TZ``.

    Every item of ``input_time`` except the last three is passed
    positionally to the datetime constructor.  The result is made aware as
    UTC and converted to the module-level ``TZ``.  If making it aware raises
    a pytz DST error, the conversion is retried one hour later.
    """
    naive = timezone.datetime(*input_time[:-3])
    try:
        aware_utc = timezone.make_aware(naive, timezone.utc)
    except (pytz.NonExistentTimeError, pytz.AmbiguousTimeError):
        shifted = naive + datetime.timedelta(hours=1)
        aware_utc = timezone.make_aware(shifted, timezone.utc)
    return aware_utc.astimezone(TZ)
项目:tzcron    作者:bloomberg    | 项目源码 | 文件源码
def test_end_dst_double_occurrence(self):
        """Advancing the schedule on 2015-10-25 in London must be ambiguous.

        The schedule is built from the expression "30 1 * * * *" starting at
        midnight (localized with ``is_dst=True``); asking for the next
        occurrence is expected to raise ``pytz.AmbiguousTimeError``.
        """
        london = pytz.timezone("Europe/London")
        naive_start = dt.datetime.strptime('2015-10-25T00:00:00',
                                           "%Y-%m-%dT%H:%M:%S")
        aware_start = london.localize(naive_start, is_dst=True)
        schedule = tzcron.Schedule("30 1 * * * *", london, aware_start)
        with self.assertRaises(pytz.AmbiguousTimeError):
            next(schedule)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_ambiguous_infer(self):
        """Check ``ambiguous='infer'`` (and legacy ``infer_dst``) localization.

        Covers three situations for US/Eastern: a naive hourly range over the
        2011-11-06 fall-back that cannot be inferred, a list of wall times
        containing a repeated 01:00 that can, and a June range where
        inference makes no difference.
        """
        eastern = self.tz('US/Eastern')

        # Hourly range across 2011-11-06 without any repeated wall time:
        # plain localization has to fail.
        unrepeated = date_range(datetime(2011, 11, 6, 0), periods=5,
                                freq=datetools.Hour())
        with self.assertRaises(pytz.AmbiguousTimeError):
            unrepeated.tz_localize(eastern)

        # With 01:00 listed twice, the transition can be inferred and must
        # match the range generated directly in the zone.
        expected = date_range(datetime(2011, 11, 6, 0), periods=5,
                              freq=datetools.Hour(), tz=eastern)
        wall_times = ['11/06/2011 00:00', '11/06/2011 01:00',
                      '11/06/2011 01:00', '11/06/2011 02:00',
                      '11/06/2011 03:00']
        repeated = DatetimeIndex(wall_times)
        inferred = repeated.tz_localize(eastern, ambiguous='infer')
        self.assert_numpy_array_equal(expected, inferred)
        # The legacy keyword still works but must emit a FutureWarning.
        with tm.assert_produces_warning(FutureWarning):
            inferred_legacy = repeated.tz_localize(eastern, infer_dst=True)
        self.assert_numpy_array_equal(expected, inferred_legacy)
        from_constructor = DatetimeIndex(wall_times, tz=eastern,
                                         ambiguous='infer')
        self.assert_numpy_array_equal(expected, from_constructor)

        # When there is no dst transition, nothing special happens
        summer = date_range(datetime(2011, 6, 1, 0), periods=10,
                            freq=datetools.Hour())
        plain = summer.tz_localize(eastern)
        with_infer = summer.tz_localize(eastern, ambiguous='infer')
        self.assert_numpy_array_equal(plain, with_infer)
        with tm.assert_produces_warning(FutureWarning):
            with_infer_legacy = summer.tz_localize(eastern, infer_dst=True)
        self.assert_numpy_array_equal(plain, with_infer_legacy)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _CombineDateAndTime(date, time, tzinfo):
    """Creates a datetime object from date and time objects.

    This is similar to the datetime.combine method, but its timezone
    calculations are designed to work with pytz.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo'
    """
    naive_result = datetime.datetime(
        date.year, date.month, date.day, time.hour, time.minute, time.second)
    if tzinfo is None:
      return naive_result

    try:
      return tzinfo.localize(naive_result, is_dst=None)
    except AmbiguousTimeError:


      return min(tzinfo.localize(naive_result, is_dst=True),
                 tzinfo.localize(naive_result, is_dst=False))
    except NonExistentTimeError:




      while True:
        naive_result += datetime.timedelta(minutes=1)
        try:
          return tzinfo.localize(naive_result, is_dst=None)
        except NonExistentTimeError:
          pass
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _CombineDateAndTime(date, time, tzinfo):
    """Creates a datetime object from date and time objects.

    This is similar to the datetime.combine method, but its timezone
    calculations are designed to work with pytz.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo'
    """
    naive_result = datetime.datetime(
        date.year, date.month, date.day, time.hour, time.minute, time.second)
    if tzinfo is None:
      return naive_result

    try:
      return tzinfo.localize(naive_result, is_dst=None)
    except AmbiguousTimeError:


      return min(tzinfo.localize(naive_result, is_dst=True),
                 tzinfo.localize(naive_result, is_dst=False))
    except NonExistentTimeError:




      while True:
        naive_result += datetime.timedelta(minutes=1)
        try:
          return tzinfo.localize(naive_result, is_dst=None)
        except NonExistentTimeError:
          pass
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _CombineDateAndTime(date, time, tzinfo):
    """Creates a datetime object from date and time objects.

    This is similar to the datetime.combine method, but its timezone
    calculations are designed to work with pytz.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo'
    """
    naive_result = datetime.datetime(
        date.year, date.month, date.day, time.hour, time.minute, time.second)
    if tzinfo is None:
      return naive_result

    try:
      return tzinfo.localize(naive_result, is_dst=None)
    except AmbiguousTimeError:


      return min(tzinfo.localize(naive_result, is_dst=True),
                 tzinfo.localize(naive_result, is_dst=False))
    except NonExistentTimeError:




      while True:
        naive_result += datetime.timedelta(minutes=1)
        try:
          return tzinfo.localize(naive_result, is_dst=None)
        except NonExistentTimeError:
          pass
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def infer_freq(index, warn=True):
    """
    Guess the most probable frequency of a datetime-like index.

    Parameters
    ----------
    index : DatetimeIndex or TimedeltaIndex
      if passed a Series will use the values of the series (NOT THE INDEX)
    warn : boolean, default True
      forwarded unchanged to the frequency inferer

    Returns
    -------
    freq : string or None
        whatever the inferer's ``get_freq()`` reports; per the original
        documentation, None if no discernible frequency

    Raises
    ------
    TypeError
        if a Series has a dtype that is neither datetime64, timedelta64 nor
        object, if the input is period-like, or if it is an Int64Index /
        Float64Index
    ValueError
        per the original documentation, if there are less than three values
        (raised outside this function's visible code)
    """
    import pandas as pd

    if isinstance(index, com.ABCSeries):
        series_values = index._values
        convertible = (com.is_datetime64_dtype(series_values) or
                       com.is_timedelta64_dtype(series_values) or
                       series_values.dtype == object)
        if not convertible:
            raise TypeError("cannot infer freq from a non-convertible "
                            "dtype on a Series of {0}".format(index.dtype))
        index = series_values

    if com.is_period_arraylike(index):
        raise TypeError("PeriodIndex given. Check the `freq` attribute "
                        "instead of using infer_freq.")
    if isinstance(index, pd.TimedeltaIndex):
        return _TimedeltaFrequencyInferer(index, warn=warn).get_freq()

    is_datetime_index = isinstance(index, pd.DatetimeIndex)
    if isinstance(index, pd.Index) and not is_datetime_index:
        if isinstance(index, (pd.Int64Index, pd.Float64Index)):
            raise TypeError("cannot infer freq from a non-convertible index "
                            "type {0}".format(type(index)))
        # Fall back to the raw values; they are converted just below.
        index = index.values

    if not isinstance(index, pd.DatetimeIndex):
        try:
            index = pd.DatetimeIndex(index)
        except AmbiguousTimeError:
            # Rebuild from the integer representation instead.
            index = pd.DatetimeIndex(index.asi8)

    return _FrequencyInferer(index, warn=warn).get_freq()
项目:chomp-decomposition    作者:Staffjoy    | 项目源码 | 文件源码
def _process_task(self, task):
        """Compute shifts for the task's schedule and upload them.

        Looks up the organization, location, role and schedule named in
        ``task.data``, computes the remaining demand, runs the ``Splitter``
        over it, and creates one shift on the role per result.

        Each computed shift is a mapping with ``"day"``, ``"start"`` (hour of
        day) and ``"length"`` (hours).  Start times are resolved as wall-clock
        hours in the local timezone and sent as ISO 8601 strings in
        ``self.default_tz``.
        """
        # 1. Fetch schedule
        self.org = self.client.get_organization(
            task.data.get("organization_id"))
        self.loc = self.org.get_location(task.data.get("location_id"))
        self.role = self.loc.get_role(task.data.get("role_id"))
        self.sched = self.role.get_schedule(task.data.get("schedule_id"))

        self._compute_demand()
        self._subtract_existing_shifts_from_demand()

        # Run the  calculation
        s = Splitter(self.demand,
                     self.sched.data.get("min_shift_length_hour"),
                     self.sched.data.get("max_shift_length_hour"))
        s.calculate()
        s.efficiency()

        # Naive because not yet datetimes
        naive_shifts = s.get_shifts()
        logger.info("Starting upload of %s shifts", len(naive_shifts))

        local_start_time = self._get_local_start_time()

        for shift in naive_shifts:
            logger.debug("Processing shift %s", shift)

            # Offset by whole days, then snap back to local midnight so a
            # daylight-saving change cannot leave us at 23:00 or 01:00.
            start_day = normalize_to_midnight(
                deepcopy(local_start_time) + timedelta(days=shift["day"]))

            # datetime.replace() only swaps the hour field: it keeps
            # midnight's UTC offset, never raises AmbiguousTimeError and has
            # no is_dst argument.  Re-localize through the pytz zone so the
            # wall-clock hour gets the offset that applies at that hour.
            wall_start = start_day.replace(hour=shift["start"])
            tz = wall_start.tzinfo
            if hasattr(tz, "localize"):
                naive_start = wall_start.replace(tzinfo=None)
                try:
                    start = tz.localize(naive_start, is_dst=None)
                except pytz.AmbiguousTimeError:
                    # Duplicate wall time: pick the standard-time occurrence.
                    # Minor tech debt.
                    start = tz.localize(naive_start, is_dst=False)
                except pytz.NonExistentTimeError:
                    # Skipped wall time: move to the equivalent valid time.
                    start = tz.normalize(
                        tz.localize(naive_start, is_dst=False))
            else:
                # Not a pytz zone (or naive): nothing to re-localize.
                start = wall_start

            stop = start + timedelta(hours=shift["length"])

            # Convert to the strings we are passing up to the cLoUd
            utc_start_str = start.astimezone(self.default_tz).isoformat()
            utc_stop_str = stop.astimezone(self.default_tz).isoformat()

            logger.info("Creating shift with start %s stop %s", start, stop)
            self.role.create_shift(start=utc_start_str, stop=utc_stop_str)
项目:chomp-decomposition    作者:Staffjoy    | 项目源码 | 文件源码
def _subtract_existing_shifts_from_demand(self):
        """Reduce ``self.demand`` by the staffing that existing shifts cover.

        Fetches the role's shifts for a window running from
        ``config.MAX_SHIFT_LENGTH_HOURS`` before the local start time to 7
        days plus the same margin after it.  For every (day, hour) bucket of
        the demand grid it counts the shifts overlapping that hour and
        subtracts the count, clamping at zero.  ``self.demand`` is replaced
        with the adjusted copy; the original grid object is not mutated.
        """
        logger.info("Starting demand: %s", self.demand)
        demand_copy = deepcopy(self.demand)
        search_start = (self._get_local_start_time() - timedelta(
            hours=config.MAX_SHIFT_LENGTH_HOURS)).astimezone(self.default_tz)
        # 1 week
        search_end = (self._get_local_start_time() + timedelta(
            days=7, hours=config.MAX_SHIFT_LENGTH_HOURS)
                      ).astimezone(self.default_tz)

        shifts = self.role.get_shifts(start=search_start, end=search_end)

        logger.info("Checking %s shifts for existing demand", len(shifts))

        # Parse each shift's bounds once instead of once per hourly bucket.
        shift_windows = [
            (iso8601.parse_date(shift.data.get("start")).replace(
                tzinfo=self.default_tz),
             iso8601.parse_date(shift.data.get("stop")).replace(
                 tzinfo=self.default_tz))
            for shift in shifts
        ]

        one_hour = timedelta(hours=1)

        # Search hour by hour throughout the weeks
        for day in range(len(self.demand)):
            start_day = normalize_to_midnight(self._get_local_start_time() +
                                              timedelta(days=day))
            for start in range(len(self.demand[0])):

                # datetime.replace() only swaps the hour field: it keeps
                # midnight's UTC offset, never raises AmbiguousTimeError and
                # has no is_dst argument.  Re-localize through the pytz zone
                # so the bucket gets the offset that applies at that hour.
                wall_start = start_day.replace(hour=start)
                tz = wall_start.tzinfo
                if hasattr(tz, "localize"):
                    naive_start = wall_start.replace(tzinfo=None)
                    try:
                        start_hour = tz.localize(naive_start, is_dst=None)
                    except pytz.AmbiguousTimeError:
                        # Duplicate wall time: pick the standard-time one.
                        start_hour = tz.localize(naive_start, is_dst=False)
                    except pytz.NonExistentTimeError:
                        # Skipped wall time: move to the valid equivalent.
                        start_hour = tz.normalize(
                            tz.localize(naive_start, is_dst=False))
                else:
                    # Not a pytz zone (or naive): nothing to re-localize.
                    start_hour = wall_start

                # Adding a timedelta cannot raise a pytz error, so no
                # handler is needed here.
                stop_hour = start_hour + one_hour

                # Count the shifts that overlap this one-hour bucket.
                current_staffing_level = 0
                for shift_start, shift_stop in shift_windows:
                    if ((shift_start <= start_hour and shift_stop > stop_hour)
                            or
                        (shift_start >= start_hour and shift_start < stop_hour)
                            or
                        (shift_stop > start_hour and shift_stop <= stop_hour)):

                        # increment staffing level during that bucket
                        current_staffing_level += 1

                logger.debug("Current staffing level at day %s time %s is %s",
                             day, start, current_staffing_level)

                # demand cannot be less than zero
                demand_copy[day][start] = max(
                    0, demand_copy[day][start] - current_staffing_level)

        logger.info("Demand minus existing shifts: %s", demand_copy)
        self.demand = demand_copy