Python time 模块,clock_gettime() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用time.clock_gettime()

项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_clock_monotonic(self):
        a = time.clock_gettime(time.CLOCK_MONOTONIC)
        b = time.clock_gettime(time.CLOCK_MONOTONIC)
        self.assertLessEqual(a, b)
项目:pypy-std-ssl    作者:pypy    | 项目源码 | 文件源码
def _monotonic_clock():
        return time.clock_gettime(time.CLOCK_MONOTONIC)
项目:ddmbot    作者:Budovi    | 项目源码 | 文件源码
def run(self):
        loops = 0  # loop counter
        input_not_ready = False  # to control log spam
        data_requested = self._frame_len  # to keep the alignment intact

        # capture the starting time
        start_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
        while not self._end.is_set():
            # increment loop counter
            loops += 1

            # try to read a frame from the input -- should be there all the time
            try:
                data = os.read(self._pipe_fd, data_requested)
                # so we apparently got some data, clear the flag and calculate things
                input_not_ready = False
                data_len = len(data)
                if data_len != 0 and data_len != self._frame_len:
                    log.warning('AacProcessor: Got partial buffer of size {}'.format(data_len))

                # call the callback
                self._play(data)

                # calculate requested size for the next iteration
                data_requested -= data_len
                if data_requested == 0:
                    data_requested = self._frame_len
            except OSError as e:
                if e.errno == errno.EAGAIN:
                    # prevent spamming the log with megabytes of text
                    if not input_not_ready:
                        log.error('AacProcessor: Buffer not ready')
                        input_not_ready = True
                else:
                    raise

            # calculate next transmission time
            next_time = start_time + self._frame_period * loops
            sleep_time = max(0, self._frame_period + (next_time - time.clock_gettime(time.CLOCK_MONOTONIC_RAW)))
            time.sleep(sleep_time)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_clock_realtime(self):
        time.clock_gettime(time.CLOCK_REALTIME)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_clock_monotonic(self):
        a = time.clock_gettime(time.CLOCK_MONOTONIC)
        b = time.clock_gettime(time.CLOCK_MONOTONIC)
        self.assertLessEqual(a, b)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_clock_settime(self):
        t = time.clock_gettime(time.CLOCK_REALTIME)
        try:
            time.clock_settime(time.CLOCK_REALTIME, t)
        except PermissionError:
            pass

        if hasattr(time, 'CLOCK_MONOTONIC'):
            self.assertRaises(OSError,
                              time.clock_settime, time.CLOCK_MONOTONIC, 0)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_monotonic_settime(self):
        t1 = time.monotonic()
        realtime = time.clock_gettime(time.CLOCK_REALTIME)
        # jump backward with an offset of 1 hour
        try:
            time.clock_settime(time.CLOCK_REALTIME, realtime - 3600)
        except PermissionError as err:
            self.skipTest(err)
        t2 = time.monotonic()
        time.clock_settime(time.CLOCK_REALTIME, realtime)
        # monotonic must not be affected by system clock updates
        self.assertGreaterEqual(t2, t1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_clock_realtime(self):
        time.clock_gettime(time.CLOCK_REALTIME)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_clock_settime(self):
        t = time.clock_gettime(time.CLOCK_REALTIME)
        try:
            time.clock_settime(time.CLOCK_REALTIME, t)
        except PermissionError:
            pass

        if hasattr(time, 'CLOCK_MONOTONIC'):
            self.assertRaises(OSError,
                              time.clock_settime, time.CLOCK_MONOTONIC, 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_monotonic_settime(self):
        t1 = time.monotonic()
        realtime = time.clock_gettime(time.CLOCK_REALTIME)
        # jump backward with an offset of 1 hour
        try:
            time.clock_settime(time.CLOCK_REALTIME, realtime - 3600)
        except PermissionError as err:
            self.skipTest(err)
        t2 = time.monotonic()
        time.clock_settime(time.CLOCK_REALTIME, realtime)
        # monotonic must not be affected by system clock updates
        self.assertGreaterEqual(t2, t1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_clock_realtime(self):
        time.clock_gettime(time.CLOCK_REALTIME)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_clock_monotonic(self):
        a = time.clock_gettime(time.CLOCK_MONOTONIC)
        b = time.clock_gettime(time.CLOCK_MONOTONIC)
        self.assertLessEqual(a, b)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_clock_settime(self):
        t = time.clock_gettime(time.CLOCK_REALTIME)
        try:
            time.clock_settime(time.CLOCK_REALTIME, t)
        except PermissionError:
            pass

        if hasattr(time, 'CLOCK_MONOTONIC'):
            self.assertRaises(OSError,
                              time.clock_settime, time.CLOCK_MONOTONIC, 0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_monotonic_settime(self):
        t1 = time.monotonic()
        realtime = time.clock_gettime(time.CLOCK_REALTIME)
        # jump backward with an offset of 1 hour
        try:
            time.clock_settime(time.CLOCK_REALTIME, realtime - 3600)
        except PermissionError as err:
            self.skipTest(err)
        t2 = time.monotonic()
        time.clock_settime(time.CLOCK_REALTIME, realtime)
        # monotonic must not be affected by system clock updates
        self.assertGreaterEqual(t2, t1)
项目:barpyrus    作者:t-wissmann    | 项目源码 | 文件源码
def main_loop(bar, inputs = None):
    # TODO: remove eventinputs again?
    #inputs += bar.widget.eventinputs()
    if inputs == None:
        inputs = global_inputs

    global_update = True
    def signal_quit(signal, frame):
        quit_main_loop()
    signal.signal(signal.SIGINT, signal_quit)
    signal.signal(signal.SIGTERM, signal_quit)

    # main loop
    while not core.shutdown_requested() and bar.is_running():
        now = time.clock_gettime(time.CLOCK_MONOTONIC)
        if bar.widget.maybe_timeout(now):
            global_update = True
        data_ready = []
        if global_update:
            painter = bar.painter()
            painter.widget(bar.widget)
            data_ready = select.select(inputs,[],[], 0.00)[0]
            if not data_ready:
                #print("REDRAW: " + str(time.clock_gettime(time.CLOCK_MONOTONIC)))
                painter.flush()
                global_update = False
            else:
                pass
                #print("more data already ready")
        if not data_ready:
            # wait for new data
            next_timeout = now + 360 # wait for at most one hour until the next bar update
            to = bar.widget.next_timeout()
            if to != None:
                next_timeout = min(next_timeout, to)
            now = time.clock_gettime(time.CLOCK_MONOTONIC)
            next_timeout -= now
            next_timeout = max(next_timeout,0.1)
            #print("next timeout = " + str(next_timeout))
            data_ready = select.select(inputs,[],[], next_timeout)[0]
            if core.shutdown_requested():
                break
        if not data_ready:
            pass #print('timeout!')
        else:
            for x in data_ready:
                x.process()
                global_update = True
    bar.proc.kill()
    for i in inputs:
        i.kill()
    bar.proc.wait()
项目:django-workload    作者:Instagram    | 项目源码 | 文件源码
def memory_cpu_stats_middleware(get_response):
    import time
    import psutil

    from collections import Counter
    from django_statsd.clients import statsd
    from .global_request import get_view_name
    from django.conf import settings

    mem_entries = (
        'rss',
        'shared_clean', 'shared_dirty',
        'private_clean', 'private_dirty'
    )

    def summed(info):
        res = dict.fromkeys(mem_entries, 0)
        for path_info in info:
            for name in mem_entries:
                res[name] += getattr(path_info, name)
        return res

    def middleware(request):
        global SAMPLE_COUNT

        SAMPLE_COUNT += 1
        if SAMPLE_COUNT >= settings.SAMPLE_RATE:
            SAMPLE_COUNT = 0
            cpu_before = time.clock_gettime(time.CLOCK_PROCESS_CPUTIME_ID)
            mem_before = summed(psutil.Process().memory_maps())
            try:
                return get_response(request)
            finally:
                cpu_after = time.clock_gettime(time.CLOCK_PROCESS_CPUTIME_ID)
                statsd.gauge(
                    'cpu.{}'.format(get_view_name()),
                    cpu_after - cpu_before)
                mem_after = summed(psutil.Process().memory_maps())
                mem_key_base = 'memory.{}.{{}}'.format(get_view_name())
                for name, after in mem_after.items():
                    diff = after - mem_before[name]
                    statsd.gauge(mem_key_base.format(name) + '.total', after)
                    statsd.gauge(mem_key_base.format(name) + '.change', diff)
        else:
            return get_response(request)

    return middleware