Python time 模块,monotonic() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用time.monotonic()

项目:uchroma    作者:cyanogen    | 项目源码 | 文件源码
def smart_delay(delay: float, last_cmd: float, remain: int=0) -> float:
    """
    A "smart" delay mechanism which tries to reduce the
    delay as much as possible based on the time the last
    delay happened.

    :param delay: delay in seconds
    :param last_cmd: time of last command
    :param remain: counter, skip delay unless it's zero

    :return: timestamp to feed to next invocation
    """
    now = time.monotonic()

    if remain == 0 and last_cmd is not None and delay > 0.0:

        delta = now - last_cmd
        if delta < delay:
            sleep = delay - delta
            time.sleep(sleep)

    return now
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:runcommands    作者:wylee    | 项目源码 | 文件源码
def run(self, run_config, argv, **kwargs):
        if self.timed:
            start_time = time.monotonic()

        run_env = self.get_run_env(run_config.env, run_config.default_env)
        run_config = run_config.copy(env=run_env)

        config = Config(run=run_config, **self.config.copy())

        all_args = self.parse_args(config, argv)
        all_args.update(kwargs)
        result = self(config, **all_args)

        if self.timed:
            hide = kwargs.get('hide', config.run.hide)
            if not Hide.hide_stdout(hide):
                self.print_elapsed_time(time.monotonic() - start_time)

        return result
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:gui_tool    作者:UAVCAN    | 项目源码 | 文件源码
def register_event(self, timestamp):
        self._events_since_checkpoint += 1

        dt = timestamp - self._checkpoint_ts
        if dt >= self._update_interval:
            # Resetting the stat if expired
            mono_ts = time.monotonic()
            expired = mono_ts > self._estimate_expires_at
            self._estimate_expires_at = mono_ts + self._estimate_lifetime
            if expired:
                self._hist = []
            elif len(self._hist) >= self._averaging_period:
                self._hist.pop()
            # Updating the history
            self._hist.insert(0, self._events_since_checkpoint / dt)
            self._checkpoint_ts = timestamp
            self._events_since_checkpoint = 0
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def __init__(self):
        self._timer_cancelled_count = 0
        self._closed = False
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self._debug = (not sys.flags.ignore_environment
                       and bool(os.environ.get('PYTHONASYNCIODEBUG')))
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test__run_once(self):
        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
                                 self.loop)
        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
                                 self.loop)

        h1.cancel()

        self.loop._process_events = mock.Mock()
        self.loop._scheduled.append(h1)
        self.loop._scheduled.append(h2)
        self.loop._run_once()

        t = self.loop._selector.select.call_args[0][0]
        self.assertTrue(9.5 < t < 10.5, t)
        self.assertEqual([h2], self.loop._scheduled)
        self.assertTrue(self.loop._process_events.called)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test__run_once_schedule_handle(self):
        handle = None
        processed = False

        def cb(loop):
            nonlocal processed, handle
            processed = True
            handle = loop.call_soon(lambda: True)

        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
                                self.loop)

        self.loop._process_events = mock.Mock()
        self.loop._scheduled.append(h)
        self.loop._run_once()

        self.assertTrue(processed)
        self.assertEqual([handle], list(self.loop._ready))
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_timeout(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(wr, selectors.EVENT_WRITE)
        t = time()
        self.assertEqual(1, len(s.select(0)))
        self.assertEqual(1, len(s.select(-1)))
        self.assertLess(time() - t, 0.5)

        s.unregister(wr)
        s.register(rd, selectors.EVENT_READ)
        t = time()
        self.assertFalse(s.select(0))
        self.assertFalse(s.select(-1))
        self.assertLess(time() - t, 0.5)

        t0 = time()
        self.assertFalse(s.select(1))
        t1 = time()
        dt = t1 - t0
        # Tolerate 2.0 seconds for very slow buildbots
        self.assertTrue(0.8 <= dt <= 2.0, dt)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_select_interrupt(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
        self.addCleanup(signal.alarm, 0)

        signal.alarm(1)

        s.register(rd, selectors.EVENT_READ)
        t = time()
        self.assertFalse(s.select(2))
        self.assertLess(time() - t, 2.5)
项目:git-annex-metadata-gui    作者:alpernebbi    | 项目源码 | 文件源码
def __call__(self, instance=None):
        if instance is not None and instance is not self._instance:
            fmt = "Instance mismatch on autoconsumer {}, ({} vs {})."
            msg = fmt.format(
                self._function.__name__,
                instance, self._instance,
            )
            logger.critical(msg)

        if self._generator is None:
            return

        try:
            endtime = time.monotonic() + self._timeout
            while time.monotonic() < endtime:
                next(self._generator)

        except StopIteration:
            self._generator = None

        else:
            QtCore.QMetaObject.invokeMethod(
                self._instance, self._function.__name__,
                Qt.Qt.QueuedConnection,
            )
项目:Godavaru    作者:Godavaru    | 项目源码 | 文件源码
def ping(self, ctx):
        """Check my response time and my websocket ping

        **Usage:** `g_ping`

        **Permission:** User"""
        console = self.bot.get_channel(316688736089800715)
        before = datetime.datetime.utcnow()
        ping_msg = await console.send(":mega: **Pinging...**")
        ping = (datetime.datetime.utcnow() - before) * 1000
        before2 = time.monotonic()
        await (await self.bot.ws.ping())
        after = time.monotonic()
        ping2 = (after - before2) * 1000
        var = int(random.random() * 5)
        v = ["a", "e", "i", "o", "u"]
        await ping_msg.edit(content=':warning: [`'+str(datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))+'`] `' + ctx.message.author.name + '#' + ctx.message.author.discriminator + '` checked my ping in the channel `' + ctx.message.channel.name + '` in the server `' + ctx.message.guild.name + '`. The result was {:.2f}ms'.format(ping.total_seconds())+" with a websocket ping of {0:.0f}ms".format(ping2))
        await ctx.send(":mega: P"+v[var]+"ng! The message took **{:.0f}ms**!".format(ping.total_seconds())+" `Websocket: {0:.0f}ms` :thinking:".format(ping2))
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def test_window_decorator_timing():
    from time import monotonic

    limiter = FixedWindowRateLimiter(SECONDS, PERMITS)

    @limiter.limit
    def call():
        return monotonic()

    times = []
    for _ in range(VALUE_COUNT):
        times.append(call())

    start_indexes = [i for i in range(VALUE_COUNT) if i % PERMITS == 0]

    last = -SECONDS
    for index in start_indexes:
        assert times[index] - last >= SECONDS - EPSILON
        last = times[index]
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def test_window_decorator_across_windows():
    from time import monotonic, sleep

    limiter = FixedWindowRateLimiter(SECONDS, 1)

    @limiter.limit
    def call():
        t = monotonic()
        sleep(SECONDS * 1.25)
        return t

    # This should take up two fixed windows for the first task, and the second shouldn't execute until the third (after 2 x SECONDS).
    first = call()
    second = call()

    assert (SECONDS - EPSILON) * 3 >= second - first >= (SECONDS - EPSILON) * 2

##########################
# TokenBucketRateLimiter #
##########################
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def test_bucket_acquire_timing():
    from time import monotonic

    limiter = TokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)
    times = []
    for _ in range(VALUE_COUNT):
        with limiter:
            times.append(monotonic())

    frequency = SECONDS / PERMITS

    first_time = times[0]
    expected_times = []
    for i in range(BURST, len(times)):
        expected_times.append(first_time + (i - BURST + 1) * frequency)

    # Shouldn't wait for BURST calls
    for i in range(BURST - 1):
        assert times[i + 1] - times[i] < frequency - EPSILON

    # After burst, the rest should fall at about the expected frequency
    times = times[BURST:]
    epsilon = 0.04
    for i, time in enumerate(times):
        assert expected_times[i] - epsilon <= time <= expected_times[i] + epsilon
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def test_windowed_bucket_acquire_timing():
    from time import monotonic

    limiter = WindowedTokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)
    times = []
    for _ in range(VALUE_COUNT):
        with limiter:
            times.append(monotonic())

    frequency = SECONDS / PERMITS

    first_time = times[0]
    expected_times = [first_time]
    for i in range(1, len(times)):
        expected_times.append(first_time + i * frequency)

    epsilon = 0.04
    for i, time in enumerate(times):
        assert expected_times[i] - epsilon <= time <= expected_times[i] + epsilon
项目:merakicommons    作者:meraki-analytics    | 项目源码 | 文件源码
def test_windowed_bucket_decorator_timing():
    from time import monotonic

    limiter = WindowedTokenBucketRateLimiter(SECONDS, PERMITS, BURST, TOKENS)

    @limiter.limit
    def call():
        return monotonic()

    times = []
    for _ in range(VALUE_COUNT):
        times.append(call())

    frequency = SECONDS / PERMITS

    first_time = times[0]
    expected_times = [first_time]
    for i in range(1, len(times)):
        expected_times.append(first_time + i * frequency)

    epsilon = 0.04
    for i, time in enumerate(times):
        assert expected_times[i] - epsilon <= time <= expected_times[i] + epsilon
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def try_point(self, point, spawn_time=None, spawn_id=None):
        try:
            point = randomize_point(point)
            skip_time = monotonic() + (conf.GIVE_UP_KNOWN if spawn_time else conf.GIVE_UP_UNKNOWN)
            worker = await self.best_worker(point, skip_time)
            if not worker:
                if spawn_time:
                    self.skipped += 1
                return
            async with worker.busy:
                if spawn_time:
                    worker.after_spawn = time() - spawn_time

                if await worker.visit(point, spawn_id):
                    self.visits += 1
        except CancelledError:
            raise
        except Exception:
            self.log.exception('An exception occurred in try_point')
        finally:
            self.coroutine_semaphore.release()
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def best_worker(self, point, skip_time):
        good_enough = conf.GOOD_ENOUGH
        while self.running:
            gen = (w for w in self.workers if not w.busy.locked())
            try:
                worker = next(gen)
                lowest_speed = worker.travel_speed(point)
            except StopIteration:
                lowest_speed = float('inf')
            for w in gen:
                speed = w.travel_speed(point)
                if speed < lowest_speed:
                    lowest_speed = speed
                    worker = w
                    if speed < good_enough:
                        break
            if lowest_speed < conf.SPEED_LIMIT:
                worker.speed = lowest_speed
                return worker
            if skip_time and monotonic() > skip_time:
                return None
            await sleep(conf.SEARCH_SLEEP, loop=LOOP)
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def __init__(self):
        self.cache = NotificationCache()
        self.notify_ranking = conf.NOTIFY_RANKING
        self.initial_score = conf.INITIAL_SCORE
        self.minimum_score = conf.MINIMUM_SCORE
        self.last_notification = monotonic() - (conf.FULL_TIME / 2)
        self.always_notify = []
        self.log = get_logger('notifier')
        self.never_notify = conf.NEVER_NOTIFY_IDS
        self.rarity_override = conf.RARITY_OVERRIDE
        self.sent = 0
        if self.notify_ranking:
            self.initialize_ranking()
            LOOP.call_later(3600, self.set_notify_ids)
        elif conf.NOTIFY_IDS or conf.ALWAYS_NOTIFY_IDS:
            self.notify_ids = conf.NOTIFY_IDS or conf.ALWAYS_NOTIFY_IDS
            self.always_notify = conf.ALWAYS_NOTIFY_IDS
            self.notify_ranking = len(self.notify_ids)
项目:deb-python-greenio    作者:openstack    | 项目源码 | 文件源码
def db():
        conn = GreenConnection(host='localhost')

        try:
            with conn as cur:
                print('>> sleeping')
                st = time.monotonic()
                cur.execute('SELECT SLEEP(2)')
                en = time.monotonic() - st
                assert en >= 2
                print('<< sleeping {:.3f}s'.format(en))

                cur.execute('SELECT 42')
                print('"SELECT 42" -> {!r}'.format(cur.fetchone()))

                print('>> sleeping')
                st = time.monotonic()
                cur.execute('SELECT SLEEP(1)')
                en = time.monotonic() - st
                assert en >= 1
                print('<< sleeping {:.3f}s'.format(en))
        finally:
            conn.close()
项目:exchangelib    作者:ecederstrand    | 项目源码 | 文件源码
def test(items):
    t1 = time.monotonic()
    ids = account.calendar.bulk_create(items=items)
    t2 = time.monotonic()
    account.bulk_delete(ids)
    t3 = time.monotonic()

    delta1 = t2 - t1
    rate1 = len(ids) / delta1
    delta2 = t3 - t2
    rate2 = len(ids) / delta2
    print(('Time to process %s items (batchsize %s/%s, poolsize %s): %s / %s (%s / %s per sec)' % (
        len(ids), services.CreateItem.CHUNKSIZE, services.DeleteItem.CHUNKSIZE,
        config.protocol.poolsize, delta1, delta2, rate1, rate2)))


# Generate items
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def __init__(self, rate=0.0, autojump_threshold=inf):
        # when the real clock said 'real_base', the virtual time was
        # 'virtual_base', and since then it's advanced at 'rate' virtual
        # seconds per real second.
        self._real_base = 0.0
        self._virtual_base = 0.0
        self._rate = 0.0
        self._autojump_threshold = 0.0
        self._autojump_task = None
        self._autojump_cancel_scope = None
        # kept as an attribute so that our tests can monkeypatch it
        self._real_clock = time.monotonic

        # use the property update logic to set initial values
        self.rate = rate
        self.autojump_threshold = autojump_threshold
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_timekeeping():
    # probably a good idea to use a real clock for *one* test anyway...
    TARGET = 0.1
    # give it a few tries in case of random CI server flakiness
    for _ in range(4):
        real_start = time.monotonic()
        with _core.open_cancel_scope() as scope:
            scope.deadline = _core.current_time() + TARGET
            await sleep_forever()
        real_duration = time.monotonic() - real_start
        accuracy = real_duration / TARGET
        print(accuracy)
        # Actual time elapsed should always be >= target time
        # (== is possible because time.monotonic on Windows is really low res)
        if 1.0 <= accuracy < 2:  # pragma: no branch
            break
    else:  # pragma: no cover
        assert False
项目:pgbench    作者:MagicStack    | 项目源码 | 文件源码
def worker(executor, eargs, start, duration, timeout):
    queries = 0
    rows = 0
    latency_stats = np.zeros((timeout * 100,))
    min_latency = float('inf')
    max_latency = 0.0

    while time.monotonic() - start < duration:
        req_start = time.monotonic()
        rows += await executor(*eargs)
        req_time = round((time.monotonic() - req_start) * 1000 * 100)

        if req_time > max_latency:
            max_latency = req_time
        if req_time < min_latency:
            min_latency = req_time
        latency_stats[req_time] += 1
        queries += 1

    return queries, rows, latency_stats, min_latency, max_latency
项目:pgbench    作者:MagicStack    | 项目源码 | 文件源码
def sync_worker(executor, eargs, start, duration, timeout):
    queries = 0
    rows = 0
    latency_stats = np.zeros((timeout * 100,))
    min_latency = float('inf')
    max_latency = 0.0

    while time.monotonic() - start < duration:
        req_start = time.monotonic()
        rows += executor(*eargs)
        req_time = round((time.monotonic() - req_start) * 1000 * 100)

        if req_time > max_latency:
            max_latency = req_time
        if req_time < min_latency:
            min_latency = req_time
        latency_stats[req_time] += 1
        queries += 1

    return queries, rows, latency_stats, min_latency, max_latency
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def callback(self, ch, method, properties, body):
        """
        ????,????????????rabbitmq???
        :param ch:  ???self.channel
        :param method:
        :param properties:???????????
        :param body:????
        :return:
        """
        before = time.monotonic()  # ?????????????
        exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
        exec_cmd.start()
        exec_cmd.join(self.timeout)
        after = time.monotonic()  # ????????????,????????????
        if (after - before) > self.timeout:  # ????????????????,??????????,???????????
            self.response = bytes("command exec timeout", "utf8")
        print(" [*] Got a task {}".format(str(body, "utf8)")))
        message = {"host": self.id, "data": self.response}
        ch.basic_publish(exchange="",
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=properties.correlation_id,),
                         body=bytes(str(message), "utf-8"))
        ch.basic_ack(delivery_tag=method.delivery_tag)
项目:respeaker_python_library    作者:respeaker    | 项目源码 | 文件源码
def authenticate(self):
        if self.expire_time is None or monotonic() > self.expire_time:  # first credential request, or the access token from the previous one expired
            # get an access token using OAuth
            credential_url = "https://api.cognitive.microsoft.com/sts/v1.0/issueToken"
            headers = {"Ocp-Apim-Subscription-Key": self.key}

            start_time = monotonic()
            response = self.session.post(credential_url, headers=headers)

            if response.status_code != 200:
                raise RequestError("http request error with status code {}".format(response.status_code))

            self.access_token = response.content
            expiry_seconds = 590 # document mentions the access token is expired in 10 minutes

            self.expire_time = start_time + expiry_seconds
项目:MPV-VJ2    作者:paulguy    | 项目源码 | 文件源码
def sendResponse(self, responseType, value, args=None):
        if args is None:
            args = {responseType: value}
        else:
            if type(args) != dict:
                raise TypeError
            args.update({responseType: value})

        if self.socket is not None:
            if self.socket.connected:
                self.logger.log("server <-- " + repr(args))
            else:
                self.logger.log("buffer <-- " + repr(args))
            try:
                self.socket.sendObjAsJSON(args)
                self.connectTime = time.monotonic()
            except ConnectionRefusedError:
                self.logger.log("Connection refused.")
                self.disconnect()
            except BrokenPipeError:
                self.logger.log("Broken pipe, connection dropped.")
                self.disconnect()
        else:
            self.logger.log("nobody <-- " + repr(args))
项目:stig    作者:rndusr    | 项目源码 | 文件源码
def apply(self, items, inplace=False, item_getter=lambda item: item):
        """Sort sequence `items`

        item_getter: Callable that gets an item of `items` and returns an
                     object that can be sorted with any of the sorters
                     specified in the SORTSPECS variable. (This allows for
                     sorting of widgets as long as they can provide a sortable
                     object.)
        inplace: Modify `items` if True, otherwise return a new list
        """
        import time
        start_time = time.monotonic()

        for sorter in self._sortfuncs:
            items = sorter(items, inplace=inplace, item_getter=item_getter)

        log.debug('-> Sorted %d items by %s in %.3fms',
                  len(items), self, (time.monotonic()-start_time)*1e3)

        if not inplace:
            return items
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
def ping(self, ctx):
        """Your average ping command."""
        # Set the embed for the pre-ping
        clock = random.randint(0x1F550, 0x1F567)  # pick a random clock
        embed = discord.Embed(colour=0xFFC107)
        embed.set_author(name=random.choice(PRE_PING_REMARKS), icon_url=emoji_url(chr(clock)))

        # Do the classic ping
        start = time.perf_counter()     # fuck time.monotonic()
        message = await ctx.send(embed=embed)
        end = time.perf_counter()       # fuck time.monotonic()
        ms = (end - start) * 1000

        # Edit the embed to show the actual ping
        embed.colour = 0x4CAF50
        embed.set_author(name='Poing!', icon_url=emoji_url('\U0001f3d3'))
        embed.add_field(name='Latency', value=f'{ctx.bot.latency * 1000 :.0f} ms')
        embed.add_field(name='Classic', value=f'{ms :.0f} ms', inline=False)

        await message.edit(embed=embed)
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
def stop(self):
        """Exit the help page"""
        super().stop()

        # Only do it for a minute, so if someone does a quick stop,
        # we'll grant them their wish of stopping early.
        end = time.monotonic()
        if end - self._start_time < 60:
            return

        final_embed = (discord.Embed(colour=self.colour, description='*Just remember...* \N{HEAVY BLACK HEART}')
                       .set_author(name='Thank you for looking at the help page!')
                       .set_image(url=CHIAKI_MOTIVATION_URL)
                       )

        # haaaaaaaaaaaack
        await self._message.edit(embed=final_embed)
        return await asyncio.sleep(10)
项目:performance    作者:python    | 项目源码 | 文件源码
def main(self):
        self.start = time.monotonic()

        self.prepare()
        failed = self.compile_bench()

        dt = time.monotonic() - self.start
        dt = datetime.timedelta(seconds=dt)
        self.logger.error("Benchmark completed in %s" % dt)

        if self.uploaded:
            self.logger.error("Benchmark results uploaded and written into %s"
                              % self.upload_filename)
        elif failed:
            self.logger.error("Benchmark failed but results written into %s"
                              % self.filename)
        else:
            self.logger.error("Benchmark result written into %s"
                              % self.filename)

        if failed:
            sys.exit(EXIT_BENCH_ERROR)
项目:websearch    作者:abelkhan    | 项目源码 | 文件源码
def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        while self._value == 0:
            if not blocking:
                break
            if timeout is not None:
                if endtime is None:
                    endtime = _time() + timeout
                else:
                    timeout = endtime - _time()
                    if timeout <= 0:
                        break
            self._cond.wait(timeout)
        else:
            self._value = self._value - 1
            rc = True
        self._cond.release()
        return rc
项目:luma.core    作者:rm-hull    | 项目源码 | 文件源码
def __exit__(self, *args):
        """
        Sleeps for a variable amount of time (dependent on when it was last
        called), to give a consistent frame rate. If it cannot meet the desired
        frame rate (i.e. too much time has occurred since the last call), then
        it simply exits without blocking.
        """
        self.called += 1
        self.total_transit_time += monotonic() - self.enter_time
        if self.max_sleep_time >= 0:
            elapsed = monotonic() - self.last_time
            sleep_for = self.max_sleep_time - elapsed

            if sleep_for > 0:
                time.sleep(sleep_for)

        self.last_time = monotonic()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def wait_for(self, predicate, timeout=None):
        """Wait until a condition evaluates to True.

        predicate should be a callable which result will be interpreted as a
        boolean value.  A timeout may be provided giving the maximum time to
        wait.

        """
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def run_create_kernel(_idx):
    begin = time.monotonic()
    try:
        k = Kernel.get_or_create('python3')
        ret = k.kernel_id
    except:
        log.exception('run_create_kernel')
        ret = None
    finally:
        end = time.monotonic()
    t = end - begin
    return t, ret
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def run_execute_code(kid):
    if kid is not None:
        begin = time.monotonic()
        console = []
        run_id = token_hex(8)
        while True:
            result = Kernel(kid).execute(run_id, sample_code)
            console.extend(result['console'])
            if result['status'] == 'finished':
                break
        stdout = ''.join(rec[1] for rec in console if rec[0] == 'stdout')
        end = time.monotonic()
        print(stdout)
        return end - begin
    return None
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def run_restart_kernel(kid):
    # 2nd params is currently ignored.
    if kid is not None:
        begin = time.monotonic()
        Kernel(kid).restart()
        end = time.monotonic()
        return end - begin
    return None
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def run_destroy_kernel(kid):
    if kid is not None:
        begin = time.monotonic()
        Kernel(kid).destroy()
        end = time.monotonic()
        return end - begin
    return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def time(self):
        """Returns the current time according to the `IOLoop`'s clock.

        The return value is a floating-point number relative to an
        unspecified time in the past.

        By default, the `IOLoop`'s time function is `time.time`.  However,
        it may be configured to use e.g. `time.monotonic` instead.
        Calls to `add_timeout` that pass a number instead of a
        `datetime.timedelta` should use this function to compute the
        appropriate time, so they can work no matter what time function
        is chosen.
        """
        return time.time()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def time(self):
        """Returns the current time according to the `IOLoop`'s clock.

        The return value is a floating-point number relative to an
        unspecified time in the past.

        By default, the `IOLoop`'s time function is `time.time`.  However,
        it may be configured to use e.g. `time.monotonic` instead.
        Calls to `add_timeout` that pass a number instead of a
        `datetime.timedelta` should use this function to compute the
        appropriate time, so they can work no matter what time function
        is chosen.
        """
        return time.time()
项目:uchroma    作者:cyanogen    | 项目源码 | 文件源码
def __enter__(self):
        self._tick_start = time.monotonic()
        return self
项目:uchroma    作者:cyanogen    | 项目源码 | 文件源码
def __exit__(self, *args):
        next_tick = time.monotonic() - self._tick_start

        if next_tick > self._interval:
            next_tick = next_tick % self._interval
        else:
            self._next_tick = self._interval - next_tick
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def hit(self, view):
        """Add a lint request to the queue, return the time at which the request was enqueued."""
        timestamp = time.monotonic()
        self.q.put((view.id(), timestamp, self.get_delay(view)))
        return timestamp