Python time 模块,ticks_diff() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用time.ticks_diff()

项目:templogger    作者:aequitas    | 项目源码 | 文件源码
def templog(sleep=True):
    """Log voltage and temperature to MQTT.

    Reads the sensors, prints and publishes the readings, and resets the
    board if either the connection or the publish fails.  When ``sleep`` is
    true, the time spent in this call (in ``ticks_ms`` units) is passed to
    ``deepsleep``.
    """
    start = time.ticks_ms()

    # get sensor values
    values = read_voltage()
    values.update(read_temps())
    print(values)

    # Publish only when connected; a failed connect or a failed send both
    # end in a reboot.
    if not wait_connect() or not mqtt_send(values):
        machine.reset()

    if sleep:
        # ticks_diff() takes its arguments in subtraction order (end, start);
        # the reversed order yields a negative elapsed time.
        delta = time.ticks_diff(time.ticks_ms(), start)
        deepsleep(delta)
项目:esp    作者:mhoffma    | 项目源码 | 文件源码
def __init__(self,pno=4,bsz=1024,dbgport=None):
        """Set up the ring buffer and input pin, then call ``self.start()``.

        pno     -- pin number, passed to ``Pin(pno, Pin.IN)``.
        bsz     -- requested buffer length; rounded up to the next power of
                   two so that ``MASK`` can wrap an index with ``&``.
        dbgport -- optional debug port object; its ``low()`` method is called
                   here.  A ``DummyPort`` is used when omitted.

        Raises ValueError if ``bsz`` is smaller than 1.
        """
        if bsz < 1:
            raise ValueError("bsz must be >= 1")
        # Round up with integer shifts: log()/ceil() can land on the wrong
        # side of an exact power of two because of float rounding.
        size = 1
        while size < bsz:
            size <<= 1
        self.BUFSZ = size
        self.MASK = self.BUFSZ - 1
        self.pin = pno
        self.ir = Pin(pno, Pin.IN)
        # The generator has to be parenthesised: an unparenthesised generator
        # expression is only valid as the sole argument of a call.
        self.buf = array.array('i', (0 for _ in range(self.BUFSZ)))
        self.ledge = 0
        self.wptr = 0
        self.rptr = 0
        self.dbgport = DummyPort() if dbgport is None else dbgport
        self.dbgport.low()
        # cache ticks functions for native call
        self.ticks_diff = time.ticks_diff
        self.ticks_us = time.ticks_us
        self.start()
项目:illuminOS    作者:idimitrakopoulos    | 项目源码 | 文件源码
def timed_function(f, *args, **kwargs):
    """Wrap ``f`` so that every call logs its duration and the free heap.

    Returns a new function that forwards all arguments to ``f``, returns
    ``f``'s result, and emits one ``log.debug`` line per call with
    ``gc.mem_free()``, the function name and the elapsed time in ms.
    The extra ``*args``/``**kwargs`` of the decorator itself are unused.
    """
    import time
    # Assumes str(f) has the form "<function NAME at ...>"; fall back to the
    # whole first token when there is no second one.
    parts = str(f).split(' ')
    myname = parts[1] if len(parts) > 1 else parts[0]

    def new_func(*args, **kwargs):
        t = time.ticks_us()
        result = f(*args, **kwargs)
        # ticks_diff() takes (end, start); the reversed order gives a
        # negative duration.
        delta = time.ticks_diff(time.ticks_us(), t)
        log.debug('GC: {} Function {} Time = {:6.3f}ms'.format(str(gc.mem_free()), myname, delta/1000))
        return result

    return new_func

# @timed_function
项目:uPyLoader    作者:BetaRavener    | 项目源码 | 文件源码
def _read_timeout(cnt, timeout_ms=2000):
    """Read ``cnt`` characters from stdin, or return None on a short/late read.

    The read itself blocks; the elapsed time is only checked afterwards, and
    only when the ``time`` module provides ``ticks_ms``.
    """
    has_ticks = hasattr(time, "ticks_ms")
    started = time.ticks_ms() if has_ticks else 0
    chunk = sys.stdin.read(cnt)
    if len(chunk) != cnt:
        return None
    if has_ticks and time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
        return None
    return chunk
项目:uPyLoader    作者:BetaRavener    | 项目源码 | 文件源码
def _read_timeout(cnt, timeout_ms=2000):
    """Read ``cnt`` characters from stdin, or return None on a short/late read.

    The read itself blocks; the elapsed time is only checked afterwards, and
    only when the ``time`` module provides ``ticks_ms``.
    """
    has_ticks = hasattr(time, "ticks_ms")
    started = time.ticks_ms() if has_ticks else 0
    chunk = sys.stdin.read(cnt)
    if len(chunk) != cnt:
        return None
    if has_ticks and time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
        return None
    return chunk
项目:blynk-library-python    作者:vshymanskyy    | 项目源码 | 文件源码
def sleep_from_until (start, delay):
    """Call ``idle_func()`` until ``delay`` ms have passed since ``start``.

    ``start`` is a ``time.ticks_ms()`` value.  Returns ``start + delay``.
    """
    # ticks_diff() takes (end, start).  With the arguments reversed the
    # difference is never positive, so the loop would not end for delay > 0.
    while time.ticks_diff(time.ticks_ms(), start) < delay:
        idle_func()
    return start + delay
项目:micropython-share    作者:pacmac    | 项目源码 | 文件源码
def sleep_from_until (start, delay):
  """Idle the machine until ``delay`` ms have passed since ``start``.

  ``start`` is a ``time.ticks_ms()`` value.  Returns ``start + delay``.
  """
  while True:
    elapsed = time.ticks_diff(time.ticks_ms(), start)
    if not elapsed < delay:
      break
    machine.idle()
  return start + delay
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def _update(self):
        """Finish the turn in progress once its time slot has elapsed.

        Does nothing when no turn is running or the turn time is not over
        yet; otherwise starts the next queued turn, or releases when the
        angle list is empty.
        """
        if self.turn_start is None:
            return  # no turn in process
        now = time.ticks_ms()
        if time.ticks_diff(now, self.turn_start) < self.turn_time_ms:
            return  # still turning
        if len(self.angle_list) > 0:
            self._trigger_next_turn()
        else:
            self._release()
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def _publish_status(device_list=None, ignore_time=False):
    """Publish the getter values of the given devices through ``_client``.

    device_list -- devices to publish; defaults to all of ``_devlist``.
    ignore_time -- when true, skip the minimum-interval check.

    Nothing is published if less than ``MIN_PUBLISH_TIME_US`` has passed
    since the previous publish (unless ``ignore_time`` is set).  Any
    exception raised while publishing is printed and followed by
    ``_init_mqtt()``.
    """
    global _client, _last_publish

    if device_list is None:
        device_list = _devlist.values()

    current_time = time.ticks_us()
    if not ignore_time and \
            time.ticks_diff(current_time, _last_publish) < MIN_PUBLISH_TIME_US:
        return  # published too recently
    _last_publish = current_time
    try:
        for d in device_list:
            if d.value() is None:
                continue
            rt = (_topic + "/" + d.name).encode()
            for s in d.getters:
                # rt is bytes, so the separator must be bytes as well;
                # bytes + str raises TypeError.
                t = rt if s == "" else rt + b"/" + s.encode()
                my_value = d.getters[s]()
                print('Publishing', t, my_value)
                if isinstance(my_value, (bytes, bytearray)):
                    _client.publish(t, my_value)
                else:
                    _client.publish(t, str(my_value).encode())
    except Exception as e:
        print('Trouble publishing, re-init network.')
        print(e)
        _init_mqtt()


# ======= Setup and execution
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
    """Initialise MQTT and run the main loop forever.

    updates           -- send out a full status every this amount of
                         seconds; if 0, never send time-based updates, only
                         publish when a change happened.
    sleepms           -- how long to sleep between two loop runs.
    poll_rate_inputs  -- evaluate inputs every this many loop runs.
    poll_rate_network -- evaluate incoming network commands every this many
                         loop runs.
    """
    _init_mqtt()
    base = 1000 if updates == 0 else updates * 1000
    # The loop counter wraps back to 0 once it reaches this value.
    wrap_at = int(base * (poll_rate_network * poll_rate_inputs / sleepms))
    tick = 0

    while True:
        if tick % poll_rate_network == 0:
            _poll_subscripton()
        if tick % poll_rate_inputs == 0:
            # collect the devices whose update() reports a change
            changed = [d for d in _devlist.values() if d.update()]
            if changed:
                _publish_status(changed)
        if updates != 0 and tick % (updates * 1000) == 0:
            print("Publishing full update.")
            _publish_status(ignore_time=True)

        # Execute callbacks on the timestack whose time has come; iterate
        # over a copy of the keys because entries are deleted on the way.
        now = time.ticks_ms()
        for key in list(_timestack):
            due, callback = _timestack[key]
            if time.ticks_diff(now, due) >= 0:
                del _timestack[key]
                callback(key)

        time.sleep_ms(sleepms)
        tick += 1
        if tick >= wrap_at:
            tick = 0
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def _update(self):
        """Exchange data with the I2C device unless bus access is suspended.

        Sends a pending ``msgq`` (if any), then reads ``BUFFER_SIZE`` bytes.
        As used here, the reply is laid out as: bytes 0-1 a big-endian
        message counter, byte 2 the payload length, byte 3 a requested
        suspend time, bytes 4.. the payload.  A reply starting with 255, 255
        is treated as garbage and suspends bus access for 1000 ms.
        """
        t = time.ticks_ms()
        if self.suspend_start is not None \
                and time.ticks_diff(t, self.suspend_start) <= self.suspend_time:
            return  # bus access is still suspended
        self.suspend_start = None  # done waiting

        try:
            if self.msgq is not None:
                self.pin.writeto(self.addr, self.msgq)
                self.msgq = None
            s = self.pin.readfrom(self.addr, self.BUFFER_SIZE)
        except Exception:
            # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            print("Trouble accessing i2c.")
            return

        if s[0] == 255 and s[1] == 255:  # illegal read
            print("Got garbage, waiting 1s before accessing bus again.")
            print("data:", s)
            self.suspend_time = 1000  # take a break
            self.suspend_start = time.ticks_ms()
            return

        count = s[0] * 256 + s[1]
        if self.count == count:
            return  # same counter as last time, nothing new

        length = s[2]
        self.suspend_time = s[3]
        # scale timer: with bit 7 set, the low 7 bits count units of 100 ms
        if self.suspend_time & 128:
            self.suspend_time = (self.suspend_time & 127) * 100
            if self.suspend_time > 5000:  # should not happen
                print("Garbage time -> waiting 1s before accessing bus again.")
                print("data:", s)
                self.suspend_time = 1000
        if self.suspend_time > 0:
            self.suspend_start = time.ticks_ms()
            print("Bus suspend for", self.suspend_time, "ms requested.")
        if length <= self.BUFFER_SIZE:  # discard if too big
            self.count = count
            self.current_value = s[4:4 + length]
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def _update(self):
        """Take a new measurement once more than ``INTERVAL`` has elapsed.

        The stored distance is only replaced when the new reading differs
        from it by at least ``precision``; the measurement time is refreshed
        either way.
        """
        if not ticks_diff(ticks_ms(), self._last_measured) > self.INTERVAL:
            return  # measured recently enough
        reading = self._measure()
        if abs(reading - self._distance) >= self.precision:
            self._distance = reading
        self._last_measured = ticks_ms()
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def flush(self):
        """Send the output buffer when it has waited longer than ``INTERVAL``.

        While there is nothing to send, the last-sent time is simply reset
        to the current tick.
        """
        now = time.ticks_ms()
        if self.out_fill == 0:
            # reset time, if there is nothing to send
            self.out_last_sent = now
            return
        if time.ticks_diff(now, self.out_last_sent) > self.INTERVAL:
            self.acquire_out_buffer()
            self._send()
            self.release_out_buffer()
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def ticks_diff(a, b):
    """Return the plain difference ``a - b`` of two tick values."""
    difference = a - b
    return difference
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def receive(self, request=0, timeoutms=None):
        """Fill the network input buffer once and decrypt what was read.

        request   -- if > 0, keep reading until at least this many bytes
                     have arrived (or the timeout expires).
        timeoutms -- None: block; 0: return right after trying to read
                     something; > 0: try for this many ms before returning.

        Always returns the buffer and the number of bytes read (may be 0).
        """
        buf = self.netbuf_in
        view = self.netbuf_in_mv
        count = 0
        began = ticks_ms()
        while count < self.netbuf_size:
            try:
                got = self.sock_read(view[count:count + 1])
            except OSError as err:
                transient = len(err.args) > 0 \
                    and (err.args[0] == errno.EAGAIN
                         or err.args[0] == errno.ETIMEDOUT)
                if not transient:
                    raise
                got = None  # treat like "nothing read"
            if got:
                count += 1  # result was not 0 or None
            elif count >= request:
                break  # break if not blocking to request size
            if timeoutms is not None \
                    and ticks_diff(ticks_ms(), began) >= timeoutms:
                break
            sleep_ms(1)  # prevent busy waiting
        if count > 0:
            self.crypt_in.decrypt(buf, length=count)
        return buf, count
项目:py-mpu6050    作者:larsks    | 项目源码 | 文件源码
def isr(self, pin):
        """Flag a gyro reset request, ignoring triggers within 10 ms of the last.

        ``pin`` is not used.
        """
        since_last = time.ticks_diff(time.ticks_ms(), self.last_isr)
        if since_last < 10:
            return  # debounce

        print('! reset gyro request')
        self.flag_reset_gyro = True
        self.last_isr = time.ticks_ms()
项目:py-mpu6050    作者:larsks    | 项目源码 | 文件源码
def serve(self):
        """Read the MPU position in an endless loop and send it over UDP.

        Each iteration reads the position; every ``write_interval`` ms the
        values are sent as JSON to ('192.168.4.2', 8000), and every
        ``gc_interval`` ms a garbage collection is run.  A pending gyro
        reset request is applied before reading.
        """
        print('starting mpu server on port {}'.format(self.port))

        lastgc = lastsent = lastread = time.ticks_ms()

        while True:
            now = time.ticks_ms()
            write_dt = time.ticks_diff(now, lastsent)
            read_dt = time.ticks_diff(now, lastread)
            gc_dt = time.ticks_diff(now, lastgc)

            # sleep only when less than 1 ms has passed since the last read
            time.sleep_ms(max(0, 1-read_dt))

            if self.flag_reset_gyro:
                self.mpu.filter.reset_gyro()
                self.flag_reset_gyro = False

            values = self.mpu.read_position()
            lastread = now

            if write_dt >= self.write_interval:
                lastsent = time.ticks_ms()
                self.sock.sendto(tojson(values), ('192.168.4.2', 8000))

            if gc_dt >= self.gc_interval:
                gc.collect()
                # Restart the interval; without this, gc.collect() would run
                # on every iteration once the first interval has elapsed.
                lastgc = time.ticks_ms()
项目:py-mpu6050    作者:larsks    | 项目源码 | 文件源码
def input(self, vals):
        """Feed one sensor sample into the position filter.

        ``vals[0:3]`` are used as accelerometer readings and ``vals[4:7]``
        as gyroscope readings (index 3 is skipped).  The first sample only
        seeds all positions from the accelerometer; later samples integrate
        the gyro over the elapsed time and blend it with the accelerometer
        position using ``gyro_weight``.
        """
        stamp = time.ticks_ms()

        # unpack sensor readings
        accel_xyz = vals[:3]
        gyro_xyz = vals[4:7]

        # convert accelerometer reading to degrees
        self.accel_pos = self.calculate_accel_pos(*accel_xyz)

        # first chunk of data: simply accept the accelerometer reads
        if self.last == 0:
            self.filter_pos = self.gyro_pos = self.accel_pos
            self.last = stamp
            return

        # Elapsed time in seconds since the last data; needed because the
        # gyroscope measures movement in degrees/second.
        elapsed_s = time.ticks_diff(stamp, self.last)/1000
        self.last = stamp

        # change in position from the gyroscope readings
        gyro_step = [rate * elapsed_s for rate in gyro_xyz]
        self.gyro_pos = [pos + step
                         for pos, step in zip(self.gyro_pos, gyro_step)]

        # blend gyro and accelerometer for pitch (axis 0) and roll (axis 1)
        weight = self.gyro_weight
        for axis in (0, 1):
            self.filter_pos[axis] = (
                weight * (self.filter_pos[axis] + gyro_step[axis])
                + (1-weight) * self.accel_pos[axis])
项目:MASUGUX    作者:DjamesSuhanko    | 项目源码 | 文件源码
def waiting(self):
        """Block until 5000 ms have elapsed, then set ``DO_SLEEP``.

        The elapsed time is re-read every loop run, followed by a 5 ms sleep.
        """
        limit_ms = 5000
        began = time.ticks_ms()
        elapsed = time.ticks_diff(time.ticks_ms(), began)
        while elapsed < limit_ms:
            elapsed = time.ticks_diff(time.ticks_ms(), began)
            time.sleep_ms(5)

        self.DO_SLEEP = True
        print("Ordem para dormir")
项目:esp    作者:mhoffma    | 项目源码 | 文件源码
def timeseq_isr(self,p):
        '''Store the time since the previous edge in the ring buffer.

        The stored value is the tick difference multiplied by
        ``2*p.value()-1``.  The debug port is raised for the duration of the
        handler.
        '''
        now = self.ticks_us()
        self.dbgport.high()
        sign = 2 * p.value() - 1
        gap = self.ticks_diff(now, self.ledge)
        self.ledge = now
        slot = self.wptr
        self.buf[slot] = gap * sign
        # MASK wraps the write index back to the start of the buffer
        self.wptr = (slot + 1) & self.MASK
        self.dbgport.low()
项目:esp    作者:mhoffma    | 项目源码 | 文件源码
def __call__(self,pin):
        """Record the stop tick and the ticks elapsed since ``self.start``.

        ``pin`` is not used in the lines visible here.
        NOTE(review): this is the end of the visible source; the method may
        continue beyond it -- confirm against the full file.
        """
        # stop time in microsecond ticks
        self.stop=time.ticks_us()
        # elapsed ticks between start and stop
        self.flight=time.ticks_diff(self.stop,self.start)