Python datetime.timezone 模块,utc 属性实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 datetime.timezone.utc(注意:utc 是 timezone 的类属性,不是方法,使用时不加括号)。

项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def to_utc_timestamp(date_time):
    """Convert a naive or timezone-aware datetime to UTC timestamp.

    A naive datetime is interpreted as already expressing UTC; an aware
    datetime is converted using its own offset.

    Arguments:
      date_time (:py:class:`datetime.datetime`): The datetime to
        convert, or ``None``.

    Returns:
      :py:class:`int`: The timestamp (in seconds), rounded to the
        nearest second, or ``None`` when ``date_time`` is ``None``.

    """
    if date_time is None:
        return None
    # Awareness is decided by ``utcoffset()``, not ``tzname()``: a tzinfo
    # may return ``None`` for its name while still supplying an offset,
    # and that offset must not be overwritten with UTC.
    if date_time.utcoffset() is None:
        date_time = date_time.replace(tzinfo=timezone.utc)
    return int(round(date_time.timestamp(), 0))
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def occurred(at_):
    """Describe, in human terms, how long ago a service event happened.

    Arguments:
      at_ (:py:class:`str`): When the event occurred.

    Returns:
      :py:class:`str`: The humanized occurrence time, or
        ``'time not available'`` when ``at_`` cannot be parsed.

    """
    try:
        event_time = parse(at_)
    except (TypeError, ValueError):
        logger.warning('failed to parse occurrence time %r', at_)
        return 'time not available'

    aware_now = datetime.now(tz=timezone.utc)
    try:
        elapsed = aware_now - event_time
        return naturaltime(elapsed.total_seconds())
    except TypeError:
        # Aware minus naive raises TypeError, so the parsed time carried
        # no timezone; compare it against a naive "now" instead.
        elapsed = datetime.now() - event_time
        return naturaltime(elapsed.total_seconds())
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def calculate_timeout(http_date):
        """Work out a request timeout from e.g. a ``Retry-After`` header.

        Note:
          Per :rfc:`2616#section-14.37`, the ``Retry-After`` header can
          be either an integer number of seconds or an HTTP date. Both
          forms are accepted here.

        Arguments:
          http_date (:py:class:`str`): The header value to interpret.

        Returns:
          :py:class:`int`: The timeout, in seconds. For the date form
            this is the whole number of seconds between the current UTC
            time and the parsed date.

        """
        try:
            seconds = int(http_date)
        except ValueError:
            # Not a plain number of seconds; treat it as a date below.
            pass
        else:
            return seconds
        retry_at = parse(http_date)
        current = datetime.now(tz=timezone.utc)
        remaining = retry_at - current
        return int(remaining.total_seconds())
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def reset(self, variantId, buildId, resultHash):
        """Reinitialise this object's state for the given identifiers.

        Stores the three identifiers, records a snapshot of the build
        host (``os.uname()`` fields plus the current UTC time in ISO
        format) and resets every other field to its empty value.
        """
        self.__variantId = variantId
        self.__buildId = buildId
        self.__resultHash = resultHash
        self.__recipes = None
        self.__defines = {}

        host = os.uname()
        build_info = {
            field: getattr(host, field)
            for field in ('sysname', 'nodename', 'release', 'version', 'machine')
        }
        build_info['date'] = datetime.now(timezone.utc).isoformat()
        self.__build = build_info

        self.__env = ""
        self.__metaEnv = {}
        self.__scms = []
        self.__deps = []
        self.__tools = {}
        self.__sandbox = None
        self.__id = None
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def test_throttling_compute_fine(SETTINGS):
    """No throttling delay is expected while plenty of calls remain."""
    SETTINGS.RATE_LIMIT_THRESHOLD = 0
    from jenkins_epo.procedures import compute_throttling

    # Consumed 1/5 calls at 2/3 of the time.
    current = datetime(2017, 1, 18, 14, 40, tzinfo=timezone.utc)
    window_end = datetime(2017, 1, 18, 15, tzinfo=timezone.utc)
    rate = {
        'limit': 5000,
        'remaining': 4000,
        'reset': window_end.timestamp(),
    }
    delay = compute_throttling(now=current, rate_limit={'rate': rate})
    assert 0 == delay  # Fine !
项目:Ruby-Bot    作者:ahuei123456    | 项目源码 | 文件源码
def convert_time(time):
    """Parse an ISO-8601-style timestamp string into an aware datetime.

    Accepted shapes (date and time fields sit at fixed positions):

        2014-11-05T09:00:00Z
        2013-06-12T16:00:00+09:00

    A trailing ``+HH:MM`` / ``-HH:MM`` (colon optional) offset is
    honoured as written; ``Z``, no suffix, or any other suffix is
    treated as UTC.

    Returns ``None`` when ``time`` is ``None`` or too short to contain a
    full ``yyyy-mm-ddThh:mm:ss`` timestamp. Raises ``ValueError`` when
    the fixed-position fields are not valid numbers.
    """
    import re

    # 19 characters are needed for "yyyy-mm-ddThh:mm:ss"; anything
    # shorter cannot supply the time fields read below.
    if time is None or len(time) < 19:
        return None

    year = int(time[:4])
    month = int(time[5:7])
    date = int(time[8:10])

    hour = int(time[11:13])
    minute = int(time[14:16])
    second = int(time[17:19])

    # Read the real offset rather than assuming +09:00 for every long
    # string; fractional seconds before the offset are skipped.
    offset = re.search(r'([+-])(\d{2}):?(\d{2})$', time[19:])
    if offset:
        sign = -1 if offset.group(1) == '-' else 1
        tz = timezone(sign * timedelta(hours=int(offset.group(2)),
                                       minutes=int(offset.group(3))))
    else:
        tz = timezone.utc

    return datetime.datetime(year, month, date, hour, minute, second, 0, tz)
项目:Ruby-Bot    作者:ahuei123456    | 项目源码 | 文件源码
def event_remaining(dt_start, dt_end):
    """Return a message describing how much of an event is left.

    ``dt_start`` and ``dt_end`` are compared against the current UTC
    time. The result says the event has not started, has ended, or
    ends in a given number of days, hours, minutes and seconds.
    """
    current = datetime.datetime.now(timezone.utc)

    until_end = dt_end - current
    since_start = current - dt_start

    if since_start.total_seconds() < 0:
        return "Event has not started yet!"
    if until_end.total_seconds() < 0:
        return "Event has ended!"

    # timedelta.seconds only holds the sub-day remainder; whole days
    # are reported separately via .days.
    hours, leftover = divmod(until_end.seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    return "Event ends in {} days, {} hours, {} minutes and {} seconds.".format(until_end.days, hours, minutes, seconds)

###### Sunshine ######
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def header(self):
        """Assemble the FITS header describing this simulation.

        Values come from ``self.configs``; the command line is recorded
        as a history entry and the WCS header cards are merged in last.
        """
        dc_step = self.configs.Dc_cell
        dc_low, dc_high = self.configs.Dc_limit
        hdr = fits.Header()
        cards = (
            ("BUNIT", self.configs.unit, "Data unit"),
            ("zmin", self.configs.zmin, "HI simulation minimum redshift"),
            ("zmax", self.configs.zmax, "HI simulation maximum redshift"),
            ("dz", self.configs.dz, "HI simulation redshift step size"),
            ("Dc_min", dc_low, "[cMpc] comoving distance at zmin"),
            ("Dc_max", dc_high, "[cMpc] comoving distance at zmax"),
            ("Dc_step", dc_step, "[cMpc] comoving distance between slices"),
            ("Lside", self.configs.Lside, "[cMpc] Simulation side length"),
            ("Nside", self.configs.Nside, "Number of cells at each side"),
            # Creation time expressed in the local timezone.
            ("DATE", datetime.now(timezone.utc).astimezone().isoformat(),
             "File creation date"),
        )
        for keyword, value, comment in cards:
            hdr[keyword] = (value, comment)
        hdr.add_history(" ".join(sys.argv))
        hdr.extend(self.wcs.to_header(), update=True)
        return hdr
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def write_slice(self, outfile, data, z, clobber=False):
        """Write one redshift slice to ``outfile`` as a FITS primary HDU.

        The slice header copies ``BUNIT``, ``Lside`` and ``Nside`` (value
        and comment) from ``self.header`` and adds the redshift, the
        frequency from ``z2freq(z)``, the comoving distance and the
        creation date. ``clobber`` controls overwriting an existing file.
        """
        frequency = z2freq(z)
        distance = cosmo.comoving_distance(z).value  # [Mpc]
        hdr = fits.Header()
        for keyword in ("BUNIT", "Lside", "Nside"):
            hdr[keyword] = (self.header[keyword],
                            self.header.comments[keyword])
        hdr["REDSHIFT"] = (z, "redshift of this slice")
        hdr["FREQ"] = (frequency, "[MHz] observed HI signal frequency")
        hdr["Dc"] = (distance, "[cMpc] comoving distance")
        hdr["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
                       "File creation date")
        hdr.add_history(" ".join(sys.argv))
        primary = fits.PrimaryHDU(data=data, header=hdr)
        try:
            primary.writeto(outfile, overwrite=clobber)
        except TypeError:
            # NOTE(review): presumably an older writeto() that only knows
            # the ``clobber`` keyword - confirm against the fits library.
            primary.writeto(outfile, clobber=clobber)
        logger.info("Wrote slice to file: %s" % outfile)
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def header(self):
        """Assemble the FITS header describing this simulation.

        Values are read from attributes of ``self``; the command line is
        recorded as a history entry and the WCS header cards are merged
        in last.
        """
        dc_step = self.Dc_cell
        hdr = fits.Header()
        cards = (
            ("BUNIT", str(self.unit), "Data unit"),
            ("zmin", self.zmin, "HI simulation minimum redshift"),
            ("zmax", self.zmax, "HI simulation maximum redshift"),
            ("Dc_min", self.Dc_min, "[cMpc] comoving distance at zmin"),
            ("Dc_max", self.Dc_max, "[cMpc] comoving distance at zmax"),
            ("Dc_step", dc_step, "[cMpc] comoving distance between slices"),
            ("Lside", self.Lside, "[cMpc] Simulation side length"),
            ("Nside", self.Nside, "Number of cells at each side"),
            # Creation time expressed in the local timezone.
            ("DATE", datetime.now(timezone.utc).astimezone().isoformat(),
             "File creation date"),
        )
        for keyword, value, comment in cards:
            hdr[keyword] = (value, comment)
        hdr.add_history(" ".join(sys.argv))
        hdr.extend(self.wcs.to_header(), update=True)
        return hdr
项目:league    作者:massgo    | 项目源码 | 文件源码
def create_game():
    """Create a new game from the submitted form.

    Responds with the created game as JSON and status 201, or with the
    form errors as JSON and status 404 when validation fails. A Slack
    notification is sent for every game created.
    """
    form = GameCreateForm(request.form)
    _set_game_create_choices(form)
    if not form.validate_on_submit():
        return jsonify(**form.errors), 404

    white = Player.get_by_id(form.white_id.data)
    black = Player.get_by_id(form.black_id.data)

    # The played-at time is optional; when present it is stored in UTC.
    submitted_at = form.played_at.data
    played_at = (None if submitted_at is None
                 else submitted_at.astimezone(timezone.utc))

    game = Game.create(
        white=white,
        black=black,
        winner=form.winner.data,
        handicap=form.handicap.data,
        komi=form.komi.data,
        season=form.season.data,
        episode=form.episode.data,
        played_at=played_at
    )
    messenger.notify_slack(_slack_game_msg(game))
    return jsonify(game.to_dict()), 201
项目:league    作者:massgo    | 项目源码 | 文件源码
def _slack_game_msg(game):
    """Format the Slack message announcing the result of ``game``.

    The winner is named first. The played-at time is embedded as a
    Slack ``<!date^...>`` token, with the raw value as fallback text.
    """
    if game.winner is Color.white:
        headline = '<{w_url}|{w_name}> (W) defeated <{b_url}|{b_name}> (B)'
    else:
        headline = '<{b_url}|{b_name}> (B) defeated <{w_url}|{w_name}> (W)'
    details = (' at {handicap} stones, {komi}.5 komi at <!date^{date_val}'
               '^{{time}} on {{date_num}}|{date_string}> '
               '(S{season:0>2}E{episode:0>2})')
    template = headline + details

    # Gross hack around the fact that we retrieve as naive DateTimes.
    # See: https://github.com/massgo/league/issues/93
    played_utc = game.played_at.replace(tzinfo=timezone.utc)
    epoch_seconds = int(played_utc.timestamp())

    fields = dict(
        w_name=game.white.full_name,
        w_url=url_for('dashboard.get_player',
                      player_id=game.white.id, _external=True),
        b_name=game.black.full_name,
        b_url=url_for('dashboard.get_player',
                      player_id=game.black.id, _external=True),
        handicap=game.handicap,
        komi=game.komi,
        date_string=game.played_at,
        date_val=epoch_seconds,
        season=game.season,
        episode=game.episode,
    )
    return template.format(**fields)
项目:league    作者:massgo    | 项目源码 | 文件源码
def update_game():
    """Update an existing game from the submitted form.

    Responds with the updated game as JSON and status 200, or with the
    form errors as JSON and status 404 when validation fails.
    """
    form = GameUpdateForm(request.form)
    _set_game_create_choices(form)
    if not form.validate_on_submit():
        return jsonify(**form.errors), 404

    white = Player.get_by_id(form.white_id.data)
    black = Player.get_by_id(form.black_id.data)

    # The played-at time is optional; when present it is stored in UTC.
    submitted_at = form.played_at.data
    played_at = (None if submitted_at is None
                 else submitted_at.astimezone(timezone.utc))

    game = Game.get_by_id(form.game_id.data)
    game.update(
        white=white,
        black=black,
        winner=form.winner.data,
        handicap=form.handicap.data,
        komi=form.komi.data,
        season=form.season.data,
        episode=form.episode.data,
        played_at=played_at
    )
    return jsonify(game.to_dict()), 200
项目:tukio    作者:optiflows    | 项目源码 | 文件源码
def _step(self, exc=None):
        """
        Wrapper around `Task._step()` that dispatches a
        `TaskExecState.BEGIN` event the first time it runs.

        On that first call the start time is recorded in UTC, the event
        source is built from the task (plus template and workflow ids
        when available) and the task is flagged as in progress.
        """
        if not self._in_progress:
            self._start = datetime.now(timezone.utc)
            origin = {'task_exec_id': self.uid}
            if self._template:
                origin['task_template_id'] = self._template.uid
            if self._workflow:
                origin.update(
                    workflow_template_id=self._workflow.template.uid,
                    workflow_exec_id=self._workflow.uid,
                )
            self._source = EventSource(**origin)
            self._in_progress = True
            begin_event = {
                'type': TaskExecState.BEGIN.value,
                'content': self._inputs
            }
            topics = workflow_exec_topics(self._source._workflow_exec_id)
            self._broker.dispatch(
                begin_event,
                topics=topics,
                source=self._source,
            )
        super()._step(exc)
项目:grenouillebot    作者:FroggedTV    | 项目源码 | 文件源码
def get_next_event(self):
        """Return the upcoming Event of the calendar.

        Events that have already ended are dropped from the front of
        ``self.event_list`` first.

        Returns:
            The first Event that has not started yet: the head of the
            list if it starts in the future, otherwise the event right
            after the one currently running. None if there is no such
            event.
        """
        current_time = datetime.now(timezone.utc)
        while self.event_list and self.event_list[0].end < current_time:
            self.event_list.pop(0)

        if not self.event_list:
            return None
        if self.event_list[0].start > current_time:
            return self.event_list[0]
        # The head event has already started, so the next one (if any)
        # is whatever follows it.
        return self.event_list[1] if len(self.event_list) > 1 else None
项目:grenouillebot    作者:FroggedTV    | 项目源码 | 文件源码
def get_now_event(self):
        """Return the Event that is running right now.

        Events that have already ended are dropped from the front of
        ``self.event_list`` first.

        Returns:
            The head Event if the current UTC time lies strictly between
            its start and end, otherwise None.
        """
        current_time = datetime.now(timezone.utc)
        while self.event_list and self.event_list[0].end < current_time:
            self.event_list.pop(0)

        if not self.event_list:
            return None
        head = self.event_list[0]
        if head.start < current_time < head.end:
            return head
        return None
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def make_new_entry(self, rel_path, id_handler):
        """
        Generates a new entry for the specified path.

        The entry gets a fresh id from ``id_handler``, the current UTC
        time as its date added and the file's modification time (as an
        aware UTC datetime) as its last-modified time.

        Note: This will mutate the id_handler!

        :param rel_path: Path of the media, relative to the configured
            ``media_loc`` location.
        :param id_handler: Object whose ``new_id()`` supplies the entry id.
        :return: The newly created ``Entry`` of type ``'Book'``.
        """
        e_id = id_handler.new_id()

        abs_path = os.path.join(read_from_config('media_loc').path, rel_path)
        # Build the aware UTC datetime in one step; this avoids the
        # deprecated utcfromtimestamp() and its naive intermediate value.
        last_modified = datetime.fromtimestamp(os.path.getmtime(abs_path),
                                               timezone.utc)

        entry_obj = oh.Entry(id=e_id, path=rel_path,
            date_added=datetime.now(timezone.utc),
            last_modified=last_modified,
            type='Book', table=self.BOOK_TABLE_NAME, data_id=None,
            hashseed=_rand.randint(0, 2**32))

        return entry_obj
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def map_external_gallery_data_to_internal(gallery_data: DataDict) -> GalleryData:
    """Convert a provider gallery record into a ``GalleryData`` object.

    Titles are HTML-unescaped, the ``posted`` epoch value becomes an
    aware UTC datetime and the tags are translated. Afterwards the
    ``fjord`` flag is set when the joined tags match
    ``constants.default_fjord_tags``, and any ``constants.ex_thumb_url``
    in the thumbnail URL is replaced with ``constants.ge_thumb_url``.
    """
    result = GalleryData(
        gallery_data['gid'],
        token=gallery_data['token'],
        archiver_key=gallery_data['archiver_key'],
        title=unescape(gallery_data['title']),
        title_jpn=unescape(gallery_data['title_jpn']),
        thumbnail_url=gallery_data['thumb'],
        category=gallery_data['category'],
        provider=constants.provider_name,
        uploader=gallery_data['uploader'],
        posted=datetime.fromtimestamp(int(gallery_data['posted']), timezone.utc),
        filecount=gallery_data['filecount'],
        filesize=gallery_data['filesize'],
        expunged=gallery_data['expunged'],
        rating=gallery_data['rating'],
        tags=translate_tag_list(gallery_data['tags']),
    )

    joined_tags = ",".join(result.tags)
    if re.search(constants.default_fjord_tags, joined_tags):
        result.fjord = True

    thumbnail = result.thumbnail_url
    if constants.ex_thumb_url in thumbnail:
        result.thumbnail_url = thumbnail.replace(constants.ex_thumb_url, constants.ge_thumb_url)
    return result
项目:taskhud    作者:usefulthings    | 项目源码 | 文件源码
def t_date(s):
    """
    Convert a TaskWarrior timestamp to a local-time display string.

    TaskWarrior provides times as UTC timestamps in ISO 8601; the
    fields are read from fixed positions of ``s``. The result is the
    equivalent local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    # (start, stop) slices for year, month, day, hour, minute, second;
    # index 8 is the date/time separator and is skipped.
    field_bounds = ((0, 4), (4, 6), (6, 8), (9, 11), (11, 13), (13, 15))
    fields = [int(s[lo:hi]) for lo, hi in field_bounds]

    # The parsed fields are UTC; convert them to the local timezone.
    as_utc = datetime(*fields, tzinfo=timezone.utc)
    localized = as_utc.astimezone(tz=None)

    # Drop the trailing "+HH:MM" offset from the ISO representation.
    return localized.isoformat(sep=" ")[:-6]

# TODO: move to separate module
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def convert_between_tz_and_utc(self, tz, utc):
        """Run the inside/outside checks around the DST boundaries of ``tz``.

        ``self.dston`` and ``self.dstoff`` are attached to ``tz``; for a
        range of offsets, moments at and just inside the DST interval go
        to ``checkinside`` and moments at and just outside it go to
        ``checkoutside``.
        """
        dst_start = self.dston.replace(tzinfo=tz)
        # Because 1:MM on the day DST ends is taken as being standard time,
        # there is no spelling in tz for the last hour of daylight time.
        # For purposes of the test, the last hour of DST is 0:MM, which is
        # taken as being daylight time (and 1:MM is taken as being standard
        # time).
        dst_end = self.dstoff.replace(tzinfo=tz)
        offsets = (
            timedelta(weeks=13),
            DAY,
            HOUR,
            timedelta(minutes=1),
            timedelta(microseconds=1),
        )
        for offset in offsets:
            self.checkinside(dst_start, tz, utc, dst_start, dst_end)
            for moment in (dst_start + offset, dst_end - offset):
                self.checkinside(moment, tz, utc, dst_start, dst_end)

            self.checkoutside(dst_end, tz, utc)
            for moment in (dst_start - offset, dst_end + offset):
                self.checkoutside(moment, tz, utc)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_easy(self):
        """Run the tz/utc conversion checks for the workable zone pairs."""
        # Despite the name of this test, the endcases are excruciating.
        zone_pairs = (
            (Eastern, utc_real),
            (Pacific, utc_real),
            (Eastern, utc_fake),
            (Pacific, utc_fake),
            # The next two are really dancing near the edge.  They work
            # because Pacific and Eastern are far enough apart that their
            # "problem hours" don't overlap.
            (Eastern, Pacific),
            (Pacific, Eastern),
        )
        for zone, reference in zone_pairs:
            self.convert_between_tz_and_utc(zone, reference)
        # OTOH, Eastern vs Central (in either order) fails!  Don't add
        # those pairs.  The difficulty is that the edge case tests assume
        # that every hour is representable in the "utc" class.  This is
        # always true for a fixed-offset tzinfo class (like utc_real and
        # utc_fake), but not for Eastern or Central.  For these adjacent
        # DST-aware time zones, the range of time offsets tested ends up
        # creating hours in the one that aren't representable in the
        # other.  For the same reason, we would see failures in the
        # Eastern vs Pacific tests too if we added 3*HOUR to the list of
        # offset deltas in convert_between_tz_and_utc().
项目:scrapy-soccerway    作者:tvl    | 项目源码 | 文件源码
def parse_match(self, response):
        """Extract a single ``MatchInfo`` item from a match page.

        Yields exactly one item holding the match id, area, competition,
        both team names, each team's last-five string, the match
        datetime (UTC, ISO format), game week, kick-off, venue and the
        time of scraping.

        Raises ``IndexError`` when the sub-navigation has fewer links
        than expected, and ``TypeError`` when the page carries no date
        value (``int(None)``).
        """
        def detail(label, tail):
            # First value of the <dd> paired with the <dt> named `label`.
            query = ('//div[@class="details clearfix"]/dl/dt[.="{0}"]'
                     '/following-sibling::dd[preceding-sibling::dt[1]'
                     '/text()="{0}"]{1}').format(label, tail)
            return response.xpath(query).extract_first()

        # Evaluate each repeated query once and reuse the result.
        nav = '//div[@class="clearfix subnav level-1"]//li//a'
        nav_links = response.xpath(nav + '/@href').extract()
        nav_labels = response.xpath(nav + '/text()').extract()
        home_texts = response.xpath('//div[@class="container left"]//a/text()').extract()
        away_texts = response.xpath('//div[@class="container right"]//a/text()').extract()

        item = MatchInfo()
        item['id'] = parse_qs(nav_links[3])['id'][0]
        item['area'] = nav_labels[1]
        item['competition'] = nav_labels[2]
        # The first link text is the team name (None when absent, as with
        # extract_first()); the following five are joined as the form.
        item['home_team'] = home_texts[0] if home_texts else None
        item['away_team'] = away_texts[0] if away_texts else None
        item['ht_last5'] = ''.join(home_texts[1:6])
        item['at_last5'] = ''.join(away_texts[1:6])
        item['datetime'] = datetime.fromtimestamp(
            int(detail('Date', '//span/@data-value')), timezone.utc).isoformat(' ')
        #item['competition'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Competition"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Competition"]/a/text()').extract_first()
        item['game_week'] = detail('Game week', '/text()')
        item['kick_off'] = detail('Kick-off', '//span/text()')
        item['venue'] = detail('Venue', '//a/text()')
        # Same naive-UTC text as the deprecated utcnow() produced (no
        # offset suffix), so the stored format is unchanged.
        item['updated'] = datetime.now(timezone.utc).replace(tzinfo=None).isoformat(' ')
        # A generator callback must not also return a value; yielding the
        # item is sufficient.
        yield item
项目:arduino-ciao-meteor-ddp-connector    作者:andrea689    | 项目源码 | 文件源码
def _decode(self, o):
        """Recursively decode special ``$``-keyed objects inside ``o``.

        Single-key dicts ``$escape``, ``$date`` (epoch milliseconds, to
        an aware UTC datetime) and ``$binary`` (base64) are converted;
        ``{"$type", "$value"}`` dicts go through the reviver registered
        in ``self.custom_type_hooks``. Other dicts and lists/tuples are
        decoded element by element (tuples come back as lists), and any
        other value is returned unchanged.

        Raises ``UnknownTypeError`` for a ``$type`` with no reviver.
        """
        if not isinstance(o, dict):
            if isinstance(o, (list, tuple)):
                return [self._decode(element) for element in o]
            return o

        size = len(o)
        if size == 1:
            if "$escape" in o:
                return self._decode_escaped(o['$escape'])
            if "$date" in o:
                return datetime.fromtimestamp(o["$date"] / 1000.0, timezone.utc)
            if "$binary" in o:
                return b64decode(o['$binary'])
        if size == 2 and "$type" in o and "$value" in o:
            try:
                reviver = self.custom_type_hooks[o['$type']]
            except KeyError:
                raise UnknownTypeError(o["$type"])
            return reviver(o["$value"])

        decoded_pairs = ((key, self._decode(value)) for key, value in o.items())
        if self.object_pairs_hook is not None:
            return self.object_pairs_hook(decoded_pairs)
        return dict(decoded_pairs)
项目:aw-server    作者:ActivityWatch    | 项目源码 | 文件源码
def benchmark():
    """Send a stream of heartbeats to a test bucket of a testing server.

    Creates the ``test-benchmark`` bucket (printing any error instead of
    failing) and sends 120 heartbeats, sleeping 0.1 seconds before each.
    """
    datastore = aw_datastore.Datastore(aw_datastore.storages.PeeweeStorage, testing=True)
    api = aw_server.api.ServerAPI(datastore, testing=True)

    print(api.get_info())

    bucket_id = "test-benchmark"

    try:
        api.create_bucket(bucket_id, "test", "test", "test")
    except Exception as e:
        # Best effort: a failure to create the bucket is only reported.
        print(e)

    print("Benchmarking... this will take 30 seconds")
    for counter in range(120):
        sleep(0.1)
        heartbeat = Event(timestamp=datetime.now(tz=tz.utc), data={"test": str(counter)})
        api.heartbeat(bucket_id, heartbeat, pulsetime=0.3)
项目:aw-server    作者:ActivityWatch    | 项目源码 | 文件源码
def _create_heartbeat_events(start=None,
                             delta=timedelta(seconds=1)):
    """Create two "test"-labelled events ``delta`` apart.

    :param start: Timestamp of the first event. Defaults to the current
        UTC time, taken when the function is called (not once at import
        time).
    :param delta: Time between the first and the second event.
    :return: Tuple ``(e1, e2)`` of events whose timestamps are rounded
        down to 10ms precision.
    """
    if start is None:
        start = datetime.now(tz=timezone.utc)
    e1_ts = start
    e2_ts = e1_ts + delta

    # Needed since server (or underlying datastore) drops precision up to milliseconds.
    # Update: Even with millisecond precision it sometimes fails. (tried using `round` and `int`)
    #         Now rounding down to 10ms precision to prevent random failure.
    #         10ms precision at least seems to work well.
    # TODO: Figure out why it sometimes fails with millisecond precision. Would probably
    #       be useful to find the microsecond values where it consistently always fails.
    # 10ms is 10000 microseconds: floor to a multiple of 10000 so the
    # result really is the original time rounded down.
    e1_ts = e1_ts.replace(microsecond=e1_ts.microsecond // 10000 * 10000)
    e2_ts = e2_ts.replace(microsecond=e2_ts.microsecond // 10000 * 10000)

    e1 = Event(timestamp=e1_ts, data={"label": "test"})
    e2 = Event(timestamp=e2_ts, data={"label": "test"})

    return e1, e2
项目:aw-server    作者:ActivityWatch    | 项目源码 | 文件源码
def test_midnight_heartbeats(client, bucket):
    """Heartbeats spanning midnight are merged and queryable by start."""
    # Start at 23:50 today (UTC) so the 20 one-minute events cross midnight.
    start = datetime.now(tz=timezone.utc).replace(hour=23, minute=50)
    events = _create_periodic_events(20, start=start, delta=timedelta(minutes=1))

    labels = ["1", "1", "2", "3", "4"]
    for index, event in enumerate(events):
        event.data["label"] = labels[index % len(labels)]
        client.heartbeat(bucket, event, pulsetime=90)

    merged = client.get_events(bucket, limit=-1)
    assert len(merged) == 4 / 5 * len(events)

    after_midnight = client.get_events(bucket, start=start + timedelta(minutes=10))
    pprint(after_midnight)
    assert len(after_midnight) == int(len(merged) / 2)
项目:pendulum    作者:sdispater    | 项目源码 | 文件源码
def test_astimezone(self):
        """astimezone() changes the zone name and shifts the wall-clock hour."""
        d = Pendulum(2015, 1, 15, 18, 15, 34)
        now = Pendulum(2015, 1, 15, 18, 15, 34)

        def verify(moment, zone_name, hour_shift):
            # Same date and minute as `now`, with the hour moved by hour_shift.
            self.assertEqual(zone_name, moment.timezone_name)
            self.assertPendulum(moment, now.year, now.month, now.day,
                                now.hour + hour_shift, now.minute)

        verify(d, 'UTC', 0)

        d = d.astimezone('Europe/Paris')
        verify(d, 'Europe/Paris', 1)

        if sys.version_info >= (3, 2):
            d = d.astimezone(timezone.utc)
            verify(d, '+00:00', 0)

            d = d.astimezone(timezone(timedelta(hours=-8)))
            verify(d, '-08:00', -8)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def convert_between_tz_and_utc(self, tz, utc):
        """Run the inside/outside checks around the DST boundaries of ``tz``.

        ``self.dston`` and ``self.dstoff`` are attached to ``tz``; for a
        range of offsets, moments at and just inside the DST interval go
        to ``checkinside`` and moments at and just outside it go to
        ``checkoutside``.
        """
        dst_start = self.dston.replace(tzinfo=tz)
        # Because 1:MM on the day DST ends is taken as being standard time,
        # there is no spelling in tz for the last hour of daylight time.
        # For purposes of the test, the last hour of DST is 0:MM, which is
        # taken as being daylight time (and 1:MM is taken as being standard
        # time).
        dst_end = self.dstoff.replace(tzinfo=tz)
        offsets = (
            timedelta(weeks=13),
            DAY,
            HOUR,
            timedelta(minutes=1),
            timedelta(microseconds=1),
        )
        for offset in offsets:
            self.checkinside(dst_start, tz, utc, dst_start, dst_end)
            for moment in (dst_start + offset, dst_end - offset):
                self.checkinside(moment, tz, utc, dst_start, dst_end)

            self.checkoutside(dst_end, tz, utc)
            for moment in (dst_start - offset, dst_end + offset):
                self.checkoutside(moment, tz, utc)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_easy(self):
        """Run the tz/utc conversion checks for the workable zone pairs."""
        # Despite the name of this test, the endcases are excruciating.
        zone_pairs = (
            (Eastern, utc_real),
            (Pacific, utc_real),
            (Eastern, utc_fake),
            (Pacific, utc_fake),
            # The next two are really dancing near the edge.  They work
            # because Pacific and Eastern are far enough apart that their
            # "problem hours" don't overlap.
            (Eastern, Pacific),
            (Pacific, Eastern),
        )
        for zone, reference in zone_pairs:
            self.convert_between_tz_and_utc(zone, reference)
        # OTOH, Eastern vs Central (in either order) fails!  Don't add
        # those pairs.  The difficulty is that the edge case tests assume
        # that every hour is representable in the "utc" class.  This is
        # always true for a fixed-offset tzinfo class (like utc_real and
        # utc_fake), but not for Eastern or Central.  For these adjacent
        # DST-aware time zones, the range of time offsets tested ends up
        # creating hours in the one that aren't representable in the
        # other.  For the same reason, we would see failures in the
        # Eastern vs Pacific tests too if we added 3*HOUR to the list of
        # offset deltas in convert_between_tz_and_utc().
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def parse_tweets(raw_tweets, source, now=None):
    """
        Turns the raw tweet lines of a twtxt file into
        :class:`Tweet` objects, skipping lines that fail to parse.

        :param list raw_tweets: list of raw tweet lines
        :param Source source: the source of the given tweets
        :param Datetime now: the current datetime (defaults to the
            current UTC time)

        :returns: a list of parsed tweets :class:`Tweet` objects
        :rtype: list
    """
    reference_time = datetime.now(timezone.utc) if now is None else now

    parsed = []
    for raw_line in raw_tweets:
        try:
            tweet = parse_tweet(raw_line, source, reference_time)
        except (ValueError, OverflowError) as e:
            # A bad line is logged at debug level and left out.
            logger.debug("{0} - {1}".format(source.url, e))
            continue
        parsed.append(tweet)

    return parsed
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def parse_tweet(raw_tweet, source, now=None):
    """
        Turns a single raw tweet line of a twtxt file into a
        :class:`Tweet` object.

        The line is split at the first tab into a timestamp and a text.

        :param str raw_tweet: a single raw tweet line
        :param Source source: the source of the given tweet
        :param Datetime now: the current datetime (defaults to the
            current UTC time)

        :returns: the parsed tweet
        :rtype: Tweet
        :raises ValueError: if the line has no tab separator or the
            tweet is dated later than ``now``
    """
    reference_time = datetime.now(timezone.utc) if now is None else now

    raw_created_at, text = raw_tweet.split("\t", 1)
    created_at = parse_iso8601(raw_created_at)

    if created_at > reference_time:
        raise ValueError("Tweet is from the future")

    cleaned_text = click.unstyle(text.strip())
    return Tweet(cleaned_text, created_at, source)
项目:scrapy-wayback-machine    作者:sangaline    | 项目源码 | 文件源码
def set_time_range(self, time_range):
        """Store ``time_range`` on ``self.time_range`` as unix timestamps.

        ``time_range`` is a tuple/list of times, or one time that is
        used for both ends. An int, float or str time is converted with
        ``int()``; values strictly between 10**8 and 10**13 are kept as
        unix timestamps, anything else is zero-padded on the right to
        14 digits and parsed with ``self.timestamp_format`` as UTC.
        Other objects are converted through their ``timestamp()`` method.
        """
        # allow a single time to be passed in place of a range
        if type(time_range) not in [tuple, list]:
            time_range = (time_range, time_range)

        # translate the times to unix timestamps
        def to_unix(value):
            if type(value) in [int, float, str]:
                number = int(value)
                # realistic timestamp range
                if 10**8 < number < 10**13:
                    return number
                # otherwise archive.org timestamp format (possibly truncated)
                padded = str(number).ljust(14, '0')
                value = datetime.strptime(padded, self.timestamp_format)
                value = value.replace(tzinfo=timezone.utc)
            return value.timestamp()

        self.time_range = [to_unix(entry) for entry in time_range]
项目:automation    作者:EvidentSecurity    | 项目源码 | 文件源码
def create_service_principal(options, sub_config):
    """Create a service principal with a fresh password credential.

    The credential is a random UUID, valid from now (UTC) for three
    years. The secret is stored under ``sub_config['secret_key']`` and
    the created principal under ``sub_config['service_principal']``.

    :param options: Mapping providing ``tenant_id``,
        ``script_service_principal_client_id`` and
        ``script_service_principal_secret``.
    :param sub_config: Mapping with an ``application`` entry exposing
        ``app_id``; it is updated in place.
    :return: The updated ``sub_config``.
    """
    credentials = ServicePrincipalCredentials(
        tenant=options['tenant_id'],
        client_id=options['script_service_principal_client_id'],
        secret=options['script_service_principal_secret'],
        resource='https://graph.windows.net'
    )
    rbac_client = GraphRbacManagementClient(
        credentials, tenant_id=options['tenant_id'])
    # Create Service Principal
    current_time = datetime.now(timezone.utc)
    try:
        expiry_time = current_time.replace(year=current_time.year + 3)
    except ValueError:
        # Feb 29 does not exist three years on (that year is never a
        # leap year); fall back to Feb 28 instead of failing.
        expiry_time = current_time.replace(year=current_time.year + 3, day=28)
    key = {
        'start_date': current_time.isoformat(),
        'end_date': expiry_time.isoformat(),
        'key_id': str(uuid.uuid4()),
        'value': str(uuid.uuid4())
    }
    sub_config['secret_key'] = key['value']
    sub_config['service_principal'] = rbac_client.service_principals.create({
        'app_id': sub_config['application'].app_id,
        'account_enabled': True,
        'password_credentials': [key]
    })
    return sub_config
项目:django-binder    作者:CodeYellowBV    | 项目源码 | 文件源码
def test_model_with_history_creates_changes_on_creation(self):
        """Creating an Animal records one changeset with five changes."""
        payload = json.dumps({'name': 'Daffy Duck'})
        response = self.client.post('/animal/', data=payload, content_type='application/json')
        self.assertEqual(response.status_code, 200)

        self.assertEqual(1, Changeset.objects.count())
        cs = Changeset.objects.get()
        self.assertEqual('testuser', cs.user.username)
        # The changeset must be dated within a second of now.
        self.assertAlmostEqual(datetime.now(tz=timezone.utc), cs.date, delta=timedelta(seconds=1))

        self.assertEqual(5, Change.objects.count())
        # (field, expected "after" value); "before" is always 'null'.
        expected_changes = (
            ('name', '"Daffy Duck"'),
            ('id', Animal.objects.get().id),
            ('caretaker', 'null'),
            ('zoo', 'null'),
            ('deleted', 'false'),
        )
        for field_name, after_value in expected_changes:
            matching = Change.objects.filter(changeset=cs, model='Animal', field=field_name, before='null', after=after_value)
            self.assertEqual(1, matching.count())
项目:django-binder    作者:CodeYellowBV    | 项目源码 | 文件源码
def test_model_with_history_creates_changes_on_creation(self):
        """POSTing a new animal records one changeset holding five field changes."""
        payload = json.dumps({'name': 'Daffy Duck'})
        response = self.client.post('/animal/', data=payload, content_type='application/json')
        self.assertEqual(response.status_code, 200)

        # Exactly one changeset, owned by the test user and stamped "just now".
        self.assertEqual(1, Changeset.objects.count())
        cs = Changeset.objects.get()
        self.assertEqual('testuser', cs.user.username)
        self.assertAlmostEqual(datetime.now(tz=timezone.utc), cs.date, delta=timedelta(seconds=1))

        def change_count(field, after):
            # Number of Animal changes in this changeset going from 'null' to `after`.
            return Change.objects.filter(
                changeset=cs, model='Animal', field=field, before='null', after=after,
            ).count()

        self.assertEqual(5, Change.objects.count())
        self.assertEqual(1, change_count('name', '"Daffy Duck"'))
        self.assertEqual(1, change_count('id', Animal.objects.get().id))
        self.assertEqual(1, change_count('caretaker', 'null'))
        self.assertEqual(1, change_count('zoo', 'null'))
        self.assertEqual(1, change_count('deleted', 'false'))
项目:K8SWatch    作者:OpenSensee    | 项目源码 | 文件源码
def extract_event_info(ctxt, event, mode='good'):
    """Build a Slack attachment describing `event` and post it.

    Arguments:
      ctxt: Text appended to the attachment's pretext.
      event: Object exposing ``metadata`` (namespace, name, labels) and
        ``status`` (container_statuses, start_time, phase).
      mode: Value used as the attachment's ``color``.
    """
    meta = event.metadata
    status = event.status
    fields = []

    create_field(fields, "namespaces", meta.namespace)
    create_field(fields, "name", meta.name)
    create_field(fields, "labels", meta.labels)
    # Only the first container's state is reported.
    create_field(fields, "status.message", str(status.container_statuses[0].state))
    create_field(fields, "status.start_time", status.start_time)
    create_field(fields, "status.phase", status.phase)

    # Elapsed time between the recorded start time and the current UTC time.
    uptime = datetime.now(timezone.utc) - status.start_time
    create_field(fields, "Runing since", str(uptime))

    attachment = {
        "color": mode,
        "pretext": "Just from information from k8s" + ctxt,
        "fields": fields,
    }
    # gbl_channel is a module-level global that is only read here.
    send_slack_msg(gbl_channel, ":k8s:", [attachment])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def convert_between_tz_and_utc(self, tz, utc):
        """Check conversions around the DST window at several distances from its edges.

        ``self.dston`` / ``self.dstoff`` are re-tagged with ``tz`` and used as
        the window bounds; points inside the window go through
        ``checkinside`` and points outside it through ``checkoutside``.
        """
        window_open = self.dston.replace(tzinfo=tz)
        # Because 1:MM on the day DST ends is taken as being standard time,
        # there is no spelling in tz for the last hour of daylight time.
        # For purposes of the test, the last hour of DST is 0:MM, which is
        # taken as being daylight time (and 1:MM is taken as being standard
        # time).
        window_close = self.dstoff.replace(tzinfo=tz)

        steps = (
            timedelta(weeks=13),
            DAY,
            HOUR,
            timedelta(minutes=1),
            timedelta(microseconds=1),
        )
        for step in steps:
            # The window start itself, then one step inside from either edge.
            self.checkinside(window_open, tz, utc, window_open, window_close)
            just_inside = (window_open + step, window_close - step)
            for moment in just_inside:
                self.checkinside(moment, tz, utc, window_open, window_close)

            # The window end itself, then one step outside from either edge.
            self.checkoutside(window_close, tz, utc)
            just_outside = (window_open - step, window_close + step)
            for moment in just_outside:
                self.checkoutside(moment, tz, utc)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_easy(self):
        """Run the tz/utc conversion checks for the zone pairings known to work."""
        # Despite the name of this test, the endcases are excruciating.
        for reference in (utc_real, utc_fake):
            for zone in (Eastern, Pacific):
                self.convert_between_tz_and_utc(zone, reference)
        # The next is really dancing near the edge.  It works because
        # Pacific and Eastern are far enough apart that their "problem
        # hours" don't overlap.
        for zone, other in ((Eastern, Pacific), (Pacific, Eastern)):
            self.convert_between_tz_and_utc(zone, other)
        # OTOH, these fail!  Don't enable them.  The difficulty is that
        # the edge case tests assume that every hour is representable in
        # the "utc" class.  This is always true for a fixed-offset tzinfo
        # class (like utc_real and utc_fake), but not for Eastern or Central.
        # For these adjacent DST-aware time zones, the range of time offsets
        # tested ends up creating hours in the one that aren't representable
        # in the other.  For the same reason, we would see failures in the
        # Eastern vs Pacific tests too if we added 3*HOUR to the list of
        # offset deltas in convert_between_tz_and_utc().
        #
        # self.convert_between_tz_and_utc(Eastern, Central)  # can't work
        # self.convert_between_tz_and_utc(Central, Eastern)  # can't work
项目:mosquito    作者:miti0    | 项目源码 | 文件源码
def get_volume_from_history(history, candle_size):
        """
        Sum the traded quantity that falls inside the most recent candle.

        :param history: iterable of records with 'TimeStamp' and 'Quantity' keys
        :param candle_size: candle length in minutes
        :return: total quantity of records stamped within the last candle_size minutes
        """
        stamp_format = '%Y-%m-%dT%H:%M:%S'
        window_start = int(time.time()) - candle_size * 60
        total = 0.0
        for trade in history:
            # Drop any fractional-second suffix before parsing.
            whole_seconds = trade['TimeStamp'].partition('.')[0]
            parsed = datetime.datetime.strptime(whole_seconds, stamp_format)
            # The parsed value is naive; it is interpreted as UTC.
            trade_epoch = parsed.replace(tzinfo=timezone.utc).timestamp()
            if trade_epoch < window_start:
                continue
            total += trade['Quantity']
        return total
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def tinkers_construct_file(tinkers_construct) -> addon.File:
    """Tinkers Construct 2.5.6b file record for the given mod."""

    file_name = 'TConstruct-1.10.2-2.5.6b.jar'
    return addon.File(
        id=2338518,
        mod=tinkers_construct,
        name=file_name,
        date=datetime(2016, 10, 22, 15, 11, 19, tzinfo=timezone.utc),
        release=proxy.Release.Release,
        url='https://addons.cursecdn.com/files/2338/518/TConstruct-1.10.2-2.5.6b.jar',
        dependencies=[74924],
    )
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def tinkers_update(tinkers_construct) -> addon.File:
    """Tinkers Construct 2.6.1 file record, a later release for the same mod."""

    file_name = 'TConstruct-1.10.2-2.6.1.jar'
    return addon.File(
        id=2353329,
        mod=tinkers_construct,
        name=file_name,
        date=datetime(2016, 12, 7, 18, 35, 45, tzinfo=timezone.utc),
        release=proxy.Release.Release,
        url='https://addons.cursecdn.com/files/2353/329/TConstruct-1.10.2-2.6.1.jar',
        dependencies=[74924],
    )
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def mantle_file(mantle) -> addon.File:
    """Mantle 1.1.4 file record (Mantle is the Tinkers dependency)."""

    file_name = 'Mantle-1.10.2-1.1.4.jar'
    return addon.File(
        id=2366244,
        mod=mantle,
        name=file_name,
        date=datetime(2017, 1, 9, 19, 40, 41, tzinfo=timezone.utc),
        release=proxy.Release.Release,
        url='https://addons.cursecdn.com/files/2366/244/Mantle-1.10.2-1.1.4.jar',
        dependencies=[],
    )
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def test_file_init():
    """File accepts well-typed arguments and rejects badly typed ones."""

    mod = addon.Mod(id=42, name='', summary='')

    # Well-typed arguments construct without error.
    for file_id, release in ((42, addon.Release.Release), (43, addon.Release.Alpha)):
        addon.File(
            id=file_id,
            mod=mod,
            name='test.jar',
            date=datetime.now(tz=timezone.utc),
            release=release,
            url='https://httpbin.org',
        )

    # A string id together with a None url must be refused.
    with pytest.raises(TypeError):
        addon.File(
            id='43',
            mod=mod,
            name='test.jar',
            date=datetime.now(tz=timezone.utc),
            release=addon.Release.Beta,
            url=None,
        )

    # Construction alone must not have issued any HTTP request.
    assert len(responses.calls) == 0
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def test_calculate_timeout_http_date():
    """An HTTP-date three minutes ahead yields a timeout of about 180 seconds."""
    header_format = '%a, %d %b %Y %H:%M:%S %Z'
    retry_at = datetime.now(tz=timezone.utc) + timedelta(minutes=3)
    timeout = Service.calculate_timeout(retry_at.strftime(header_format))
    # Allow a second of slack either way for formatting truncation and run time.
    assert 179 <= timeout <= 181
项目:sopel-modules    作者:phixion    | 项目源码 | 文件源码
def getinfo(run, now):
    """Find the current and the next entry in a parsed schedule table.

    The schedule is expected as pairs of rows: a main row (no ``class``
    attribute) whose cells are start time, game and runner, followed by a
    sibling row whose cells are duration (``H:M:S``) and comment.

    Arguments:
      run: Parsed table body exposing ``find_all``.
      now: Timezone-aware datetime to compare the schedule against.

    Returns:
      A 10-tuple ``(game, runner, console, comment, eta, nextgame,
      nextrunner, nexteta, nextconsole, nextcomment)``. The console slots are
      never filled and stay ``''``. ``nextgame`` is ``False`` when a main row
      has no sibling row, and ``'done'`` when the last row examined had
      already started.
    """
    game = runner = console = comment = eta = ''
    nextgame = nextrunner = nextconsole = nexteta = nextcomment = ''

    for item in run.find_all('tr', attrs={'class': None}):
        group = item.find_all('td')
        try:
            group2 = item.find_next_sibling().find_all('td')
        except AttributeError:
            # find_next_sibling() gave None: there is no detail row to read.
            nextgame = False
            return (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment)

        # Start times carry a literal 'Z' suffix, so tag them as UTC.
        starttime = datetime.strptime(group[0].getText(), '%Y-%m-%dT%H:%M:%SZ')
        starttime = starttime.replace(tzinfo=timezone.utc)
        try:
            offset = datetime.strptime(group2[0].getText().strip(), "%H:%M:%S")
            endtime = starttime + timedelta(hours=offset.hour, minutes=offset.minute, seconds=offset.second)
        except (ValueError, IndexError):
            # Unparsable or missing duration: use an end time far in the past
            # so the row never counts as current. It must be timezone-aware,
            # otherwise comparing it with `now` raises TypeError.
            endtime = datetime(2011, 1, 1, 12, 0, tzinfo=timezone.utc)

        if starttime < now and endtime > now:
            game = group[1].getText()
            runner = group[2].getText()
            comment = group2[1].getText()
            eta = group2[0].getText().strip()
        if starttime > now:
            nextgame = group[1].getText()
            nextrunner = group[2].getText()
            nexteta = group2[0].getText().strip()
            nextcomment = group2[1].getText()
            break
        else:
            # Provisional: overwritten if a later row has not started yet.
            nextgame = 'done'
            nextrunner = 'done'
    return (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment)
项目:sopel-modules    作者:phixion    | 项目源码 | 文件源码
def gdq(bot, trigger):
    """Report the Games Done Quick countdown or the current/next run.

    Fetches the public schedule page and, depending on what can be read from
    it, says either a countdown to the event or the current and next games.
    When the page cannot be fetched or does not have the expected table, a
    countdown to the hard-coded event date is said instead.

    Arguments:
      bot: Object exposing ``say(text)``.
      trigger: Unused here.
    """
    now = datetime.now(timezone.utc)
    delta = datetime(2018, 1, 7, 16, 30, tzinfo=timezone.utc) - now
    textdate = "January 7"
    url = 'https://gamesdonequick.com/schedule'
    # Message used whenever the live schedule cannot be used.
    fallback = "GDQ is {0} days away ({1})".format(delta.days, textdate)

    try:
        x = requests.get(url).content
    except requests.RequestException:
        return bot.say(fallback)
    # NOTE(review): no parser is named here, so bs4 chooses one itself.
    bs = BeautifulSoup(x)
    try:
        run = bs.find("table", {"id": "runTable"}).tbody
    except AttributeError:
        # find() returned None: the schedule table is not on the page.
        return bot.say(fallback)
    try:
        gdqstart = datetime.strptime(run.td.getText(), '%Y-%m-%dT%H:%M:%SZ')
        gdqstart = gdqstart.replace(tzinfo=timezone.utc)
    except (AttributeError, ValueError):
        # Missing tbody/first cell, or a start time in an unexpected format.
        return bot.say(fallback)

    (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment) = getinfo(run, now)
    if not nextgame:
        return bot.say(fallback)

    if now < gdqstart:
        tts = gdqstart - now
        if tts.days <= 3:
            hours = int(tts.total_seconds() // 3600)
            minutes = int((tts.total_seconds() % 3600) // 60)
            return bot.say("GDQ is {0}H{1}M away.  First game: {2} by {3} ETA: {4} Comment: {5} | https://gamesdonequick.com/schedule".format(hours, minutes, nextgame, nextrunner, nexteta, nextcomment))
        return bot.say("GDQ is {0} days away ({1}) | https://gamesdonequick.com/schedule".format(tts.days, gdqstart.strftime('%m/%d/%Y')))

    if nextgame == 'done':
        # Every scheduled row has already started.
        return bot.say("GDQ is {0} days away ({1} [estimated])".format(delta.days, textdate))
    if game:
        if comment:
            bot.say("Current Game: {0} by {1} ETA: {2} Comment: {3} | Next Game: {4} by {5} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, comment, nextgame, nextrunner))
        else:
            bot.say("Current Game: {0} by {1} ETA: {2} | Next Game: {3} by {4} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, nextgame, nextrunner))
    else:
        bot.say("Current Game: setup?? | Next Game {0} by {1} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(nextgame, nextrunner))
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def get_processed_at_dict():
        """Return a one-key dict holding the current UTC time as an ISO 8601 string."""
        stamp = datetime.now(timezone.utc)
        return {"processed_at": stamp.isoformat()}
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def read(self):
        """Collect one set of field values from the serial stream.

        Reads lines until every key in ``FIELDS + EXTRA_FIELD`` has a truthy
        value or ``self.timeout`` seconds have passed, then post-processes the
        collected values.

        Returns:
          dict: The collected values. ``timestamp`` is an ISO 8601 string
          tagged UTC (or absent when date or time is missing), ``latitude`` /
          ``longitude`` are replaced by a ``pos`` point when both are
          non-zero, and the ``EXTRA_FIELD`` keys are removed.
        """
        out = {k: None for k in self.FIELDS + self.EXTRA_FIELD}
        start = datetime.now()
        # Discard anything already buffered on the serial port.
        self.ser.reset_input_buffer()
        # NOTE(review): all() treats 0 / 0.0 / '' as "not yet received", so a
        # legitimately zero value keeps the loop running until the timeout.
        while not all(out.values()):
            line = self.readline()
            # The timeout is only checked after readline() returns.
            if (datetime.now() - start).total_seconds() > self.timeout:
                break
            # Strip control characters and the '$' characters.
            line = re.sub(r'[\x00-\x1F]|\r|\n|\t|\$', "", line)

            # Only these two sentence types are parsed.
            cmd = line.split(',')[0]
            if cmd not in ['GNGGA', 'GNRMC']:
                continue
            try:
                msg = pynmea2.parse(line)
                # Copy every wanted attribute the parsed message provides;
                # later messages overwrite earlier values.
                for key in out:
                    if hasattr(msg, key):
                        out[key] = getattr(msg, key)
            except pynmea2.ParseError as e:
                print("Parse error:", e)

        # Merge date and time into a single UTC-tagged ISO string.
        if out['datestamp'] is not None and out['timestamp'] is not None:
            timestamp = datetime.combine(out['datestamp'], out['timestamp']).replace(tzinfo=timezone.utc)
            out['timestamp'] = timestamp.isoformat()
        else:
            del out['timestamp']
        # Scale the last entry of FIELDS by KNOTS_PER_KMPH.
        # NOTE(review): presumably a speed unit conversion -- confirm the
        # direction of the conversion against the constant's definition.
        if out[self.FIELDS[-1]] is not None:
            out[self.FIELDS[-1]] *= self.KNOTS_PER_KMPH
        if out.get('latitude') is not None and out.get('longitude') is not None:
            # A zero in either coordinate suppresses the point.
            if out['latitude'] != 0.0 and out['longitude'] != 0.0:
                out['pos'] = {
                    'type': 'Point',
                    'coordinates': [out['longitude'], out['latitude']]
                }
            del out['latitude']
            del out['longitude']
        # The extra fields were only needed for the processing above.
        for f in self.EXTRA_FIELD:
            if f in out:
                del out[f]

        return out
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def time_date(msg):
    """Decode a packed time/date message into an ISO 8601 UTC string.

    Byte layout, as read by this function: 0 = seconds in quarter-second
    units, 1 = minute, 2 = hour, 3 = month, 4 = day in quarter-day units
    (zero-based), 5 = years since 1985.

    Returns ``'err'`` when the message is eight 0xFF bytes.
    """
    not_available = b'\xff' * 8
    if msg == not_available:
        return 'err'

    year = msg[5] + 1985
    month = msg[3]
    day = msg[4] // 4 + 1
    hour = msg[2]
    minute = msg[1]
    second = msg[0] // 4
    decoded = datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)
    return decoded.isoformat()
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def _wrap_payload(payload_key, payload):
        """Wrap ``payload`` in an envelope carrying encryption metadata.

        Arguments:
          payload_key: Key under which ``payload`` is stored in the envelope.
          payload: The value to wrap.

        Returns:
          dict: ``{'meta': {...}, payload_key: payload}`` where ``meta``
          records the method, the POSIX timestamp of wrapping and the
          timezone label.
        """
        # Take the current instant as an aware UTC datetime. Tagging a naive
        # local datetime.now() as UTC would shift the epoch value by the
        # host's UTC offset.
        timestamp = datetime.now(timezone.utc).timestamp()
        return {
            'meta': {
                'method': 'fernet',
                'timestamp': timestamp,
                'timezone': 'utc'
            },
            payload_key: payload
        }