Python pytz 模块,utc() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pytz.utc()

项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def datetime_from_simple_time(cls, el, datetime_field):
        """
        The function takes a datetime_fieldname and
        returns a datetime aware of the timezone.
        Returns None if fields do not exist in el.
        """
        if (datetime_field not in el or
            el[datetime_field] is None or
            el[datetime_field] == '0000-00-00 00:00:00' or
                el[datetime_field] == ''):

            return None
        else:
            return timezone.make_aware(
                datetime.datetime.strptime(u"{} UTC".format(el[datetime_field]), "%Y-%m-%d %H:%M:%S %Z"),
                pytz.utc)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _create_daily_stats(self, perfs):
        # create daily and cumulative stats dataframe
        daily_perfs = []
        # TODO: the loop here could overwrite expected properties
        # of daily_perf. Could potentially raise or log a
        # warning.
        for perf in perfs:
            if 'daily_perf' in perf:

                perf['daily_perf'].update(
                    perf['daily_perf'].pop('recorded_vars')
                )
                perf['daily_perf'].update(perf['cumulative_risk_metrics'])
                daily_perfs.append(perf['daily_perf'])
            else:
                self.risk_report = perf

        daily_dts = [np.datetime64(perf['period_close'], utc=True)
                     for perf in daily_perfs]
        daily_stats = pd.DataFrame(daily_perfs, index=daily_dts)

        return daily_stats
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def on_dt_changed(self, dt):
        """
        Callback triggered by the simulation loop whenever the current dt
        changes.

        Any logic that should happen exactly once at the start of each datetime
        group should happen here.
        """
        assert isinstance(dt, datetime), \
            "Attempt to set algorithm's current time with non-datetime"
        assert dt.tzinfo == pytz.utc, \
            "Algorithm expects a utc datetime"

        self.datetime = dt
        self.perf_tracker.set_date(dt)
        self.blotter.set_date(dt)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def create_test_panel_ohlc_source(sim_params, env):
    start = sim_params.first_open \
        if sim_params else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)

    end = sim_params.last_close \
        if sim_params else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    index = env.days_in_range(start, end)
    price = np.arange(0, len(index)) + 100
    high = price * 1.05
    low = price * 0.95
    open_ = price + .1 * (price % 2 - .5)
    volume = np.ones(len(index)) * 1000
    arbitrary = np.ones(len(index))

    df = pd.DataFrame({'price': price,
                       'high': high,
                       'low': low,
                       'open': open_,
                       'volume': volume,
                       'arbitrary': arbitrary},
                      index=index)
    panel = pd.Panel.from_dict({0: df})

    return DataPanelSource(panel), panel
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _coerce_datetime(maybe_dt):
    if isinstance(maybe_dt, datetime.datetime):
        return maybe_dt
    elif isinstance(maybe_dt, datetime.date):
        return datetime.datetime(
            year=maybe_dt.year,
            month=maybe_dt.month,
            day=maybe_dt.day,
            tzinfo=pytz.utc,
        )
    elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
        year, month, day = maybe_dt
        return datetime.datetime(
            year=year,
            month=month,
            day=day,
            tzinfo=pytz.utc,
        )
    else:
        raise TypeError('Cannot coerce %s into a datetime.datetime'
                        % type(maybe_dt).__name__)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_generator_dates(self):
        """
        Ensure the pipeline of generators are in sync, at least as far as
        their current dates.
        """

        sim_params = factory.create_simulation_parameters(
            start=datetime(2011, 7, 30, tzinfo=pytz.utc),
            end=datetime(2012, 7, 30, tzinfo=pytz.utc),
            env=self.env,
        )
        algo = TestAlgo(self, sim_params=sim_params, env=self.env)
        trade_source = factory.create_daily_trade_source(
            [8229],
            sim_params,
            env=self.env,
        )
        algo.set_sources([trade_source])

        gen = algo.get_generator()
        self.assertTrue(list(gen))

        self.assertTrue(algo.slippage.latest_date)
        self.assertTrue(algo.latest_date)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_progress(self):
        """
        Ensure the pipeline of generators are in sync, at least as far as
        their current dates.
        """
        sim_params = factory.create_simulation_parameters(
            start=datetime(2008, 1, 1, tzinfo=pytz.utc),
            end=datetime(2008, 1, 5, tzinfo=pytz.utc),
            env=self.env,
        )
        algo = TestAlgo(self, sim_params=sim_params, env=self.env)
        trade_source = factory.create_daily_trade_source(
            [8229],
            sim_params,
            env=self.env,
        )
        algo.set_sources([trade_source])

        gen = algo.get_generator()
        results = list(gen)
        self.assertEqual(results[-2]['progress'], 1.0)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_serialization(self):
        start_dt = datetime(year=2008,
                            month=10,
                            day=9,
                            tzinfo=pytz.utc)
        end_dt = datetime(year=2008,
                          month=10,
                          day=16,
                          tzinfo=pytz.utc)

        sim_params = SimulationParameters(
            period_start=start_dt,
            period_end=end_dt,
            env=self.env,
        )

        perf_tracker = perf.PerformanceTracker(
            sim_params, env=self.env
        )
        check_perf_tracker_serialization(perf_tracker)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_yahoo_bars_to_panel_source(self):
        env = TradingEnvironment()
        finder = AssetFinder(env.engine)
        stocks = ['AAPL', 'GE']
        env.write_data(equities_identifiers=stocks)
        start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
        data = factory.load_bars_from_yahoo(stocks=stocks,
                                            indexes={},
                                            start=start,
                                            end=end)
        check_fields = ['sid', 'open', 'high', 'low', 'close',
                        'volume', 'price']

        copy_panel = data.copy()
        sids = finder.map_identifier_index_to_sids(
            data.items, data.major_axis[0]
        )
        copy_panel.items = sids
        source = DataPanelSource(copy_panel)
        for event in source:
            for check_field in check_fields:
                self.assertIn(check_field, event)
            self.assertTrue(isinstance(event['volume'], (integer_types)))
            self.assertTrue(event['sid'] in sids)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_load_bars_from_yahoo(self):
        stocks = ['AAPL', 'GE']
        start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
        data = load_bars_from_yahoo(stocks=stocks, start=start, end=end)

        assert data.major_axis[0] == pd.Timestamp('1993-01-04 00:00:00+0000')
        assert data.major_axis[-1] == pd.Timestamp('2001-12-31 00:00:00+0000')
        for stock in stocks:
            assert stock in data.items

        for ohlc in ['open', 'high', 'low', 'close', 'volume', 'price']:
            assert ohlc in data.minor_axis

        np.testing.assert_raises(
            AssertionError, load_bars_from_yahoo, stocks=stocks,
            start=end, end=start
        )
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_day_after_thanksgiving(self):
        early_closes = tradingcalendar.get_early_closes(
            tradingcalendar.start,
            tradingcalendar.end.replace(year=tradingcalendar.end.year + 1)
        )

        #    November 2012
        # Su Mo Tu We Th Fr Sa
        #              1  2  3
        #  4  5  6  7  8  9 10
        # 11 12 13 14 15 16 17
        # 18 19 20 21 22 23 24
        # 25 26 27 28 29 30
        fourth_friday = datetime.datetime(2012, 11, 23, tzinfo=pytz.utc)
        self.assertIn(fourth_friday, early_closes)

        #    November 2013
        # Su Mo Tu We Th Fr Sa
        #                 1  2
        #  3  4  5  6  7  8  9
        # 10 11 12 13 14 15 16
        # 17 18 19 20 21 22 23
        # 24 25 26 27 28 29 30
        fifth_friday = datetime.datetime(2013, 11, 29, tzinfo=pytz.utc)
        self.assertIn(fifth_friday, early_closes)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_partial_month(self):

        start = datetime.datetime(
            year=1991,
            month=1,
            day=1,
            hour=0,
            minute=0,
            tzinfo=pytz.utc)

        # 1992 and 1996 were leap years
        total_days = 365 * 5 + 2
        end = start + datetime.timedelta(days=total_days)
        sim_params90s = SimulationParameters(
            period_start=start,
            period_end=end,
            env=self.env,
        )

        returns = factory.create_returns_from_range(sim_params90s)
        returns = returns[:-10]  # truncate the returns series to end mid-month
        metrics = risk.RiskReport(returns, sim_params90s, env=self.env)
        total_months = 60
        self.check_metrics(metrics, total_months, start)
项目:awsmfa    作者:dcoker    | 项目源码 | 文件源码
def use_testing_credentials(args, credentials):
    print("Skipping AWS API calls because AWSMFA_TESTING_MODE is set.",
          file=sys.stderr)
    # AWS returns offset-aware UTC times, so we fake that in order to
    # verify consistent code paths between py2 and py3 datetime.
    fake_expiration = (datetime.datetime.now(tz=pytz.utc) +
                       datetime.timedelta(minutes=5))
    fake_credentials = {
        'AccessKeyId': credentials.get(args.identity_profile,
                                       'aws_access_key_id'),
        'SecretAccessKey': credentials.get(args.identity_profile,
                                           'aws_secret_access_key'),
        'SessionToken': "420",
        'Expiration': fake_expiration,
    }
    print_expiration_time(fake_expiration)
    update_credentials_file(args.aws_credentials,
                            args.target_profile,
                            args.identity_profile,
                            credentials,
                            fake_credentials)
项目:corporadb    作者:nlesc-sherlock    | 项目源码 | 文件源码
def add_emails(self, topic_ids):
    '''
    add all emails and email_blobs
    '''
    # loop over emails
    for email in range(0, self.num_emails):  # loop over emails
      em = self.metadata[email]
      dtime_orig = dateparse(em['Date'])
      dtime_utc = dtime_orig.astimezone(pytz.utc)
      values = nparray([em['Subject'], em['From'], em['To'], em['Cc'],
                        em['Bcc'], dtime_orig, dtime_utc])
      values = nparray([value.replace("'", " ") if
                        (value and isinstance(value, str)) else value for value in values])
      rows = nparray(['subject', 'sender', 'receiver', 'cc', 'bcc',
                      'send_time', 'send_time_utc'])
      bool = nparray([True if a else False for a in values])
      self.add_email('email', rows[bool], values[bool])
      for idx2, t_id in enumerate(topic_ids):  # loop over topics
        rows = nparray(['topic_id', 'topic_probability'])
        values = nparray([t_id, self.email_prob[idx2, email]])
        self.add_blob('email_blob', rows, values)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_stats_hours(self):
        """Test CACHE PROJECT STATS hours works."""
        pr = ProjectFactory.create()
        task = TaskFactory.create(n_answers=1)
        today = datetime.now(pytz.utc)
        TaskFactory.create()
        TaskRunFactory.create(project=pr, task=task)
        AnonymousTaskRunFactory.create(project=pr)
        hours, hours_anon, hours_auth, max_hours, \
            max_hours_anon, max_hours_auth = stats_hours(pr.id)
        assert len(hours) == 24, len(hours)
        assert hours[today.strftime('%H')] == 2, hours[today.strftime('%H')]
        assert hours_anon[today.strftime('%H')] == 1, hours_anon[today.strftime('%H')]
        assert hours_auth[today.strftime('%H')] == 1, hours_auth[today.strftime('%H')]
        assert max_hours == 2
        assert max_hours_anon == 1
        assert max_hours_auth == 1
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_stats_hours_with_period(self):
        """Test CACHE PROJECT STATS hours with period works."""
        pr = ProjectFactory.create()
        today = datetime.now(pytz.utc)
        d = date.today() - timedelta(days=6)
        task = TaskFactory.create(n_answers=1, created=d)
        TaskRunFactory.create(project=pr, task=task, created=d, finish_time=d)
        d = date.today() - timedelta(days=16)
        AnonymousTaskRunFactory.create(project=pr, created=d, finish_time=d)
        hours, hours_anon, hours_auth, max_hours, \
            max_hours_anon, max_hours_auth = stats_hours(pr.id)
        assert len(hours) == 24, len(hours)
        # We use 00 as the timedelta sets the hour to 00
        assert hours['00'] == 1, hours[today.strftime('%H')]
        assert hours_anon['00'] == 0, hours_anon[today.strftime('%H')]
        assert hours_auth['00'] == 1, hours_auth[today.strftime('%H')]
        assert max_hours == 1
        assert max_hours_anon is None
        assert max_hours_auth == 1
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def test_fromutc(self):
        # naive datetime.
        dt1 = datetime(2011, 10, 31)

        # localized datetime, same timezone.
        dt2 = self.tz.localize(dt1)

        # Both should give the same results. Note that the standard
        # Python tzinfo.fromutc() only supports the second.
        for dt in [dt1, dt2]:
            loc_dt = self.tz.fromutc(dt)
            loc_dt2 = pytz.utc.localize(dt1).astimezone(self.tz)
            self.assertEqual(loc_dt, loc_dt2)

        # localized datetime, different timezone.
        new_tz = pytz.timezone('Europe/Paris')
        self.assertTrue(self.tz is not new_tz)
        dt3 = new_tz.localize(dt1)
        self.assertRaises(ValueError, self.tz.fromutc, dt3)
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def test_encode_datetime():
    now = datetime.utcnow()
    now_utc = now.replace(tzinfo=pytz.utc)
    stripped = datetime(*now.timetuple()[:3])
    serialized = loads(dumps({
        'datetime': now,
        'tz': now_utc,
        'date': now.date(),
        'time': now.time()},
    ))
    assert serialized == {
        'datetime': now.isoformat(),
        'tz': '{0}Z'.format(now_utc.isoformat().split('+', 1)[0]),
        'time': now.time().isoformat(),
        'date': stripped.isoformat(),
    }
项目:serverthrallapi    作者:NullSoldier    | 项目源码 | 文件源码
def _sync_character(server, character, data):
    has_no_changes = (
        (data['x'] == character.x) or
        (data['y'] == character.y) or
        (data['z'] == character.y))

    last_online = (datetime
        .utcfromtimestamp(data['last_online'])
        .replace(tzinfo=pytz.utc))

    character.server         = server
    character.conan_id       = data['conan_id']
    character.name           = data['name']
    character.level          = data['level']
    character.is_online      = data['is_online']
    character.steam_id       = data['steam_id']
    character.last_killed_by = data['last_killed_by']
    character.x              = data['x']
    character.y              = data['y']
    character.z              = data['z']
    character.last_online    = last_online
    character.save()

    return not has_no_changes
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def test_to_dict(self):
        instance = SimpleModel(date_field='1973-12-06', int_field='100', float_field='7')
        self.assertDictEqual(instance.to_dict(), {
            "date_field": datetime.date(1973, 12, 6),
            "int_field": 100,
            "float_field": 7.0,
            "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
            "alias_field": 0.0,
            'str_field': 'dozo'
        })
        self.assertDictEqual(instance.to_dict(include_readonly=False), {
            "date_field": datetime.date(1973, 12, 6),
            "int_field": 100,
            "float_field": 7.0,
            "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
            'str_field': 'dozo'
        })
        self.assertDictEqual(
            instance.to_dict(include_readonly=False, field_names=('int_field', 'alias_field', 'datetime_field')), {
                "int_field": 100,
                "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
            })
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def test_datetime_field(self):
        f = DateTimeField()
        epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
        # Valid values
        for value in (date(1970, 1, 1), datetime(1970, 1, 1), epoch,
                      epoch.astimezone(pytz.timezone('US/Eastern')), epoch.astimezone(pytz.timezone('Asia/Jerusalem')),
                      '1970-01-01 00:00:00', '1970-01-17 00:00:17', '0000-00-00 00:00:00', 0,
                      '2017-07-26T08:31:05', '2017-07-26T08:31:05Z', '2017-07-26 08:31',
                      '2017-07-26T13:31:05+05', '2017-07-26 13:31:05+0500'):
            dt = f.to_python(value, pytz.utc)
            self.assertEquals(dt.tzinfo, pytz.utc)
            # Verify that conversion to and from db string does not change value
            dt2 = f.to_python(f.to_db_string(dt, quote=False), pytz.utc)
            self.assertEquals(dt, dt2)
        # Invalid values
        for value in ('nope', '21/7/1999', 0.5,
                      '2017-01 15:06:00', '2017-01-01X15:06:00', '2017-13-01T15:06:00'):
            with self.assertRaises(ValueError):
                f.to_python(value, pytz.utc)
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def test_date_field(self):
        f = DateField()
        epoch = date(1970, 1, 1)
        # Valid values
        for value in (datetime(1970, 1, 1), epoch, '1970-01-01', '0000-00-00', 0):
            d = f.to_python(value, pytz.utc)
            self.assertEquals(d, epoch)
            # Verify that conversion to and from db string does not change value
            d2 = f.to_python(f.to_db_string(d, quote=False), pytz.utc)
            self.assertEquals(d, d2)
        # Invalid values
        for value in ('nope', '21/7/1999', 0.5):
            with self.assertRaises(ValueError):
                f.to_python(value, pytz.utc)
        # Range check
        for value in (date(1900, 1, 1), date(2900, 1, 1)):
            with self.assertRaises(ValueError):
                f.validate(value)
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
        '''
        Create a model instance from a tab-separated line. The line may or may not include a newline.
        The `field_names` list must match the fields defined in the model, but does not have to include all of them.
        If omitted, it is assumed to be the names of all fields in the model, in order of definition.

        - `line`: the TSV-formatted data.
        - `field_names`: names of the model fields in the data.
        - `timezone_in_use`: the timezone to use when parsing dates and datetimes.
        - `database`: if given, sets the database that this instance belongs to.
        '''
        from six import next
        field_names = field_names or [name for name, field in cls._fields]
        values = iter(parse_tsv(line))
        kwargs = {}
        for name in field_names:
            field = getattr(cls, name)
            kwargs[name] = field.to_python(next(values), timezone_in_use)

        obj = cls(**kwargs)
        if database is not None:
            obj.set_database(database)

        return obj
项目:mobot    作者:JokerQyou    | 项目源码 | 文件源码
def filter(self, message):
        # First filter by weekday: we don't work on weekends
        date = TZ.normalize(message.date.replace(tzinfo=pytz.utc))
        if not is_workday(date):
            return False

        # Then filter by time: we work in range of [9.am, 18.pm]
        if date.hour < START or date.hour >= END:
            return False

        # Then filter by message text
        text = message.text

        if self._bad_keyword_detect(text) or self._keyword_detect(text):
            return True

        return False
项目:mobot    作者:JokerQyou    | 项目源码 | 文件源码
def workday_end_time(bot, update):
    bot.sendChatAction(
        chat_id=update.message.chat_id, action=ChatAction.TYPING
    )
    text_templates = (
        '{}?????????????',
        '?????{}',
        '??{}????',
        '??????{}',
        '???{}????',
        '????{}',
        '??????: {}',
    )

    now = TZ.normalize(update.message.date.replace(tzinfo=pytz.utc))
    end_time = TZ.localize(datetime(now.year, now.month, now.day, hour=18))
    duration = end_time - now

    hour = duration.seconds // 3600
    minute = (duration.seconds % 3600) // 60
    time_remaining = ' {} ??'.format(hour) if hour else ''
    time_remaining += ' {} ??'.format(minute)

    text = random.choice(text_templates).format(time_remaining)
    update.message.reply_text(text, quote=True)
项目:sqlakeyset    作者:djrobstep    | 项目源码 | 文件源码
def test_serial():
    assert s.serialize_value(None) == 'x'
    assert s.serialize_value(True) == 'true'
    assert s.serialize_value(False) == 'false'
    assert s.serialize_value(5) == 'i:5'
    assert s.serialize_value(5.0) == 'f:5.0'
    assert s.serialize_value(decimal.Decimal('5.5')) == 'n:5.5'
    assert s.serialize_value('abc') == 's:abc'
    assert s.serialize_value(b'abc') == 'b:YWJj'
    assert s.serialize_value(b'abc') == 'b:YWJj'
    assert s.serialize_value(datetime.date(2007, 12, 5)) == 'd:2007-12-05'
    assert s.serialize_value(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc)) \
        == 'dt:2007-12-05 12:30:30+00:00'
    assert s.serialize_value(datetime.time(12, 34, 56)) == 't:12:34:56'
    with raises(NotImplementedError):
        s.serialize_value(csv.reader)
项目:sqlakeyset    作者:djrobstep    | 项目源码 | 文件源码
def test_unserial():
    def twoway(x):
        assert s.unserialize_value(s.serialize_value(x)) == x

    twoway(None)
    twoway(True)
    twoway(False)
    twoway(5)
    twoway(5.0)
    twoway(decimal.Decimal('5.5'))
    twoway('abc')
    twoway(b'abc')
    twoway(b'abc')
    twoway(datetime.date(2007, 12, 5))
    twoway(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc))
    twoway(Z('abc'))

    with raises(ValueError):
        s.unserialize_value('zzzz:abc')
项目:love    作者:Yelp    | 项目源码 | 文件源码
def utc_week_limits(utc_dt):
    """Returns US/Pacific start (12:00 am Sunday) and end (11:59 pm Saturday) of the week containing utc_dt, in UTC."""
    local_now = utc_dt.replace(tzinfo=pytz.utc).astimezone(pytz.timezone('US/Pacific'))

    local_week_start = local_now - timedelta(
        days=local_now.weekday() + 1,
        hours=local_now.hour,
        minutes=local_now.minute,
        seconds=local_now.second,
        microseconds=local_now.microsecond,
    )
    local_week_end = local_week_start + timedelta(days=7, minutes=-1)

    utc_week_start = local_week_start.astimezone(pytz.utc).replace(tzinfo=None)
    utc_week_end = local_week_end.astimezone(pytz.utc).replace(tzinfo=None)

    return (utc_week_start, utc_week_end)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def parse(cls, report, objects, data, localcontext):
        Company = Pool().get('company.company')

        timezone = None
        company_id = Transaction().context.get('company')
        if company_id:
            company = Company(company_id)
            if company.timezone:
                timezone = pytz.timezone(company.timezone)

        dt = datetime.now()
        localcontext['print_date'] = datetime.astimezone(dt.replace(
            tzinfo=pytz.utc), timezone)
        localcontext['print_time'] = localcontext['print_date'].time()

        return super(PatientEvaluationReport, cls).parse(report, objects, data, 
            localcontext)
项目:libmozdata    作者:mozilla    | 项目源码 | 文件源码
def get_date_ymd(dt):
    """Get a datetime from a string 'Year-month-day'

    Args:
        dt (str): a date

    Returns:
        datetime: a datetime object
    """
    assert dt

    if isinstance(dt, datetime):
        return as_utc(dt)

    if dt == 'today':
        today = datetime.utcnow()
        return pytz.utc.localize(datetime(today.year, today.month, today.day))
    elif dt == 'tomorrow':
        tomorrow = datetime.utcnow() + timedelta(1)
        return pytz.utc.localize(datetime(tomorrow.year, tomorrow.month, tomorrow.day))
    elif dt == 'yesterday':
        yesterday = datetime.utcnow() - timedelta(1)
        return pytz.utc.localize(datetime(yesterday.year, yesterday.month, yesterday.day))

    return as_utc(dateutil.parser.parse(dt))
项目:libmozdata    作者:mozilla    | 项目源码 | 文件源码
def get_date_from_buildid(bid):
    """Get a date from buildid

    Args:
        bid (str): build_id

    Returns:
        date: date object
    """
    # 20160407164938 == 2016 04 07 16 49 38
    bid = str(bid)
    year = int(bid[:4])
    month = int(bid[4:6])
    day = int(bid[6:8])
    hour = int(bid[8:10])
    minute = int(bid[10:12])
    second = int(bid[12:14])

    return __pacific.localize(datetime(year, month, day, hour, minute, second)).astimezone(pytz.utc)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_create(self, request):
        resp = create_mock_json('tests/resources/task_router/activities_instance.json')
        resp.status_code = 201
        request.return_value = resp

        activities = Activities(BASE_URI, AUTH)
        activity = activities.create("Test Activity", True)
        self.assertTrue(activity is not None)
        self.assertEqual(activity.date_created, datetime(2014, 5, 14, 10, 50, 2, tzinfo=pytz.utc))
        exp_params = {
            'FriendlyName': "Test Activity",
            'Available': "true"
        }

        request.assert_called_with("POST", "{0}/Activities".format(BASE_URI),
                                   data=exp_params, auth=AUTH,
                                   use_json_extension=False)
项目:councillor-party    作者:carsonyl    | 项目源码 | 文件源码
def __init__(self, clip_id, project_id, rank, title, descr, clip_start_utc, url,
                 category=None, timecodes=None):
        url_parts = url.split('_')
        start_ts = url_parts[-2]
        start_ts = datetime.strptime(start_ts, '%Y%m%d%H%M%S')
        start_ts = start_ts.replace(tzinfo=pytz.utc)

        duration = url_parts[-1].replace('.mp4', '')
        duration = timedelta(hours=int(duration[0:2]), minutes=int(duration[2:4]), seconds=int(duration[4:6]))

        super().__init__(clip_id, title, category, start_ts.isoformat(), (start_ts + duration).isoformat(), timecodes, url)
        self.project_id = project_id
        self.rank = rank
        self.descr = descr
        self.clip_start_utc = clip_start_utc
        self.duration = duration
项目:councillor-party    作者:carsonyl    | 项目源码 | 文件源码
def parse_time_range_from_url(adaptive_url):
    """
    Parse the time information available in a video URL.

    :param adaptive_url: Video URL, which contains time info.
    :return: Tuple of start time, end time, and duration
    """
    filename = adaptive_url.split('/')[-1]
    filename = filename.replace('.mp4', '')
    find_part = '_pc_'
    ts_part = filename[filename.find(find_part) + len(find_part):]
    start_ts, duration = ts_part.split('_')
    start_ts = datetime.strptime(start_ts, '%Y%m%d%H%M%S')
    start_ts = start_ts.replace(tzinfo=pytz.utc)
    duration = timedelta(hours=int(duration[:2]), minutes=int(duration[2:4]), seconds=int(duration[4:]))
    end_ts = start_ts + duration
    return start_ts, end_ts, duration
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def _generate_cached_enrollments(user, program, num_course_runs=1, data=None):
        """
        Helper method to generate CachedEnrollments for test cases
        """
        fake = faker.Factory.create()
        course = CourseFactory.create(program=program)
        course_run_params = dict(before_now=True, after_now=False, tzinfo=pytz.utc)
        course_runs = [
            CourseRunFactory.create(
                course=course,
                enrollment_start=fake.date_time_this_month(**course_run_params),
                start_date=fake.date_time_this_month(**course_run_params),
                enrollment_end=fake.date_time_this_month(**course_run_params),
                end_date=fake.date_time_this_year(**course_run_params),
            ) for _ in range(num_course_runs)
        ]
        factory_kwargs = dict(user=user)
        if data is not None:
            factory_kwargs['data'] = data
        return [CachedEnrollmentFactory.create(course_run=course_run, **factory_kwargs) for course_run in course_runs]
项目:astk    作者:openalea-incubator    | 项目源码 | 文件源码
def __init__(self, data_file='', reader=septo3d_reader, wind_screen=2,
                 temperature_screen=2,
                 localisation={'city': 'Montpellier', 'latitude': 43.61,
                               'longitude': 3.87},
                 timezone='UTC'):
        self.data_path = data_file
        self.models = {'global_radiation': PPFD_to_global,
                       'vapor_pressure': humidity_to_vapor_pressure,
                       'PPFD': global_to_PPFD,
                       'degree_days': linear_degree_days}

        self.timezone = pytz.timezone(timezone)
        if data_file is '':
            self.data = None
        else:
            self.data = reader(data_file)
            date = self.data['date']
            date = map(lambda x: self.timezone.localize(x), date)
            utc = map(lambda x: x.astimezone(pytz.utc), date)
            self.data.index = utc
            self.data.index.name = 'date_utc'

        self.wind_screen = wind_screen
        self.temperature_screen = temperature_screen
        self.localisation = localisation
项目:python-zhmcclient    作者:zhmcclient    | 项目源码 | 文件源码
def test_success_datetime_from_timestamp(self, datetime_tuple, timestamp):
        """Test successful calls to datetime_from_timestamp()."""

        if os.name == 'nt' and timestamp > TS_3001_LIMIT:
            # Skip this test case, due to the lower limit on Windows
            return

        # Create the expected datetime result (always timezone-aware in UTC)
        dt_unaware = datetime(*datetime_tuple)
        exp_dt = pytz.utc.localize(dt_unaware)

        # Execute the code to be tested
        dt = datetime_from_timestamp(timestamp)

        # Verify the result
        assert dt == exp_dt
项目:immobilus    作者:pokidovea    | 项目源码 | 文件源码
def test_time_function():

    dt = datetime(1970, 1, 1)
    timestamp = _datetime_to_utc_timestamp(dt)
    assert timestamp == 0.0
    assert type(timestamp) is float
    assert time() != timestamp

    with immobilus(dt):
        assert time() == timestamp

    assert time() != timestamp

    with immobilus(dt.replace(tzinfo=pytz.utc)):
        assert time() == timestamp

    assert time() != timestamp
项目:canvasapi    作者:ucfopen    | 项目源码 | 文件源码
def test_create_notification(self, m):
        register_uris({'account': ['create_notification']}, m)

        subject = 'Subject'
        notif_dict = {
            'subject': subject,
            'message': 'Message',
            'start_at': '2015-04-01T00:00:00Z',
            'end_at': '2018-04-01T00:00:00Z'
        }
        notif = self.account.create_notification(notif_dict)

        self.assertIsInstance(notif, AccountNotification)
        self.assertTrue(hasattr(notif, 'subject'))
        self.assertEqual(notif.subject, subject)
        self.assertTrue(hasattr(notif, 'start_at_date'))
        self.assertIsInstance(notif.start_at_date, datetime.datetime)
        self.assertEqual(notif.start_at_date.tzinfo, pytz.utc)
项目:DataTree    作者:tvgrabbers    | 项目源码 | 文件源码
def set_timezone(self, timezone = None):
        with self.tree_lock:
            if isinstance(timezone, datetime.tzinfo):
                self.timezone = timezone

            else:
                if timezone == None:
                    timezone = self.data_value(["timezone"], str, default='utc')

                try:
                    oldtz = self.timezone
                    self.timezone = pytz.timezone(timezone)

                except:
                    if isinstance(oldtz, datetime.tzinfo):
                        self.warn('Invalid timezone "%s" suplied. Falling back to the old timezone "%s"' \
                            % (timezone, oldtz.tzname), dtdata_defWarning, 2)
                        self.timezone = oldtz

                    else:
                        self.warn('Invalid timezone "%s" suplied. Falling back to UTC' % (timezone, ), dtdata_defWarning, 2)
                        self.timezone = pytz.utc

            self.set_current_date()
            self.set_current_weekdays()
项目:DataTree    作者:tvgrabbers    | 项目源码 | 文件源码
def set_current_date(self, cdate = None):
        with self.tree_lock:
            if isinstance(cdate, datetime.datetime):
                if cdate.tzinfo == None:
                    self.current_date = self.timezone.localize(cdate).date()

                else:
                    self.current_date = self.timezone.normalize(cdate.astimezone(self.timezone)).date()

                self.current_ordinal = self.current_date.toordinal()

            elif isinstance(cdate, datetime.date):
                self.current_date = cdate
                self.current_ordinal = self.current_date.toordinal()

            elif isinstance(cdate, int):
                self.current_ordinal = cdate
                datetime.datetime.fromordinal(cdate)

            else:
                if cdate != None:
                    self.warn('Invalid or no current_date "%s" suplied. Falling back to NOW' % (cdate, ), dtdata_defWarning, 2)

                self.current_date = self.timezone.normalize(datetime.datetime.now(pytz.utc).astimezone(self.timezone)).date()
                self.current_ordinal = self.current_date.toordinal()
项目:DataTree    作者:tvgrabbers    | 项目源码 | 文件源码
def __init__(self, data_def, data = None, warnaction = "default", warngoal = sys.stderr, caller_id = 0):
        self.tree_lock = RLock()
        with self.tree_lock:
            self.dtc = DataTreeConstants()
            self.ddconv = DataDef_Convert(warnaction = warnaction , warngoal = warngoal, caller_id = caller_id)
            self.caller_id = caller_id
            self.print_tags = False
            self.print_searchtree = False
            self.show_result = False
            self.fle = sys.stdout
            if sys.modules['DataTreeGrab']._warnings == None:
                sys.modules['DataTreeGrab']._warnings = _Warnings(warnaction, warngoal, caller_id)

            else:
                sys.modules['DataTreeGrab']._warnings.set_warnaction(warnaction, caller_id)

            self.searchtree = None
            self.timezone = pytz.utc
            self.errorcode = dte.dtDataInvalid
            self.result = []
            self.data_def = None
            self.init_data_def(data_def)
            if data != None:
                self.init_data(data)
项目:cti-taxii-server    作者:oasis-open    | 项目源码 | 文件源码
def format_datetime(dttm):
    # 1. Convert to timezone-aware
    # 2. Convert to UTC
    # 3. Format in ISO format
    # 4. Add subsecond value if non-zero
    # 5. Add "Z"

    if dttm.tzinfo is None or dttm.tzinfo.utcoffset(dttm) is None:
        # dttm is timezone-naive; assume UTC
        zoned = pytz.utc.localize(dttm)
    else:
        zoned = dttm.astimezone(pytz.utc)
    ts = zoned.strftime("%Y-%m-%dT%H:%M:%S")
    if zoned.microsecond > 0:
        ms = zoned.strftime("%f")
        ts = ts + '.' + ms.rstrip("0")
    return ts + "Z"
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def test_partial(self):
        """
        You can use Filter macros to create partials from other Filter
        types.
        """
        MyDatetime = filter_macro(f.Datetime, timezone=12)

        self.assertEqual(
            MyDatetime().apply('2015-10-13 15:22:18'),

            # By default, MyDatetime assumes a timezone of UTC+12...
            datetime(2015, 10, 13, 3, 22, 18, tzinfo=utc),
        )

        self.assertEqual(
            # ... however, we can override it.
            MyDatetime(timezone=6).apply('2015-10-13 15:22:18'),

            datetime(2015, 10, 13, 9, 22, 18, tzinfo=utc),
        )
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def test_pass_naive_timestamp_default_timezone(self):
        """
        The incoming value is a naive timestamp, but the Filter is
        configured not to treat naive timestamps as UTC.
        """
        self.assertFilterPasses(
            # The incoming value is a naive timestamp, and the Filter
            # is configured to use UTC+8 by default.
            self._filter(
                '2015-05-12 09:20:03',
                timezone = tzoffset('UTC+8', 8*3600),
            ),

            # The resulting datetime is still converted to UTC.
            datetime(2015, 5, 12, 1, 20, 3, tzinfo=utc),
        )
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _ToTimeZone(t, tzinfo):
  """Converts 't' to the time zone 'tzinfo'.

  Arguments:
    t: a datetime object.  It may be in any pytz time zone, or it may be
        timezone-naive (interpreted as UTC).
    tzinfo: a pytz timezone object, or None.

  Returns:
    a datetime object in the time zone 'tzinfo'
  """
  if pytz is None:

    return t.replace(tzinfo=None)
  elif tzinfo:

    if not t.tzinfo:
      t = pytz.utc.localize(t)
    return tzinfo.normalize(t.astimezone(tzinfo))
  elif t.tzinfo:

    return pytz.utc.normalize(t.astimezone(pytz.utc)).replace(tzinfo=None)
  else:

    return t
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def convert_to_local_from_utc(utc_time):
    local_timezone = tzlocal.get_localzone()
    local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(local_timezone)
    return local_time
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def convert_to_tz_from_utc(utc_time, tz):
    local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(pytz.timezone(tz))
    return local_time
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def test_datetime_from_fields_correct(self):

        el = {
            "next_activity_date": "2017-03-20",
            "next_activity_time": "14:01:02"
        }

        result = Organization.datetime_from_fields(el, 'next_activity_date', 'next_activity_time')
        expected = datetime.datetime(2017, 3, 20, 14, 1, 2, 0, tzinfo=pytz.utc)

        self.assertEquals(result, expected)
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def test_datetime_from_simple_time_correct(self):

        el = {u'update_time': u'2017-04-03 16:21:12'}

        result = Organization.datetime_from_simple_time(el, u'update_time')
        expected = datetime.datetime(2017, 4, 3, 16, 21, 12, 0, tzinfo=pytz.utc)

        self.assertEquals(result, expected)