Python time 模块,sleep_ms() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 time.sleep_ms()(该函数由 MicroPython 的 time/utime 模块提供,标准 CPython 的 time 模块中没有)。

项目:uPython-esp8266-httpserver    作者:ernitron    | 项目源码 | 文件源码
def do_connect(ssid, pwd, TYPE, hard_reset=True):
    """Configure the WLAN interface selected by ``TYPE`` and return it.

    * If ``ssid`` or ``pwd`` is empty the interface is deactivated and
      ``None`` is returned.
    * If ``TYPE`` is ``network.AP_IF`` the interface is activated, configured
      with the given essid/password and returned immediately.
    * Otherwise the function polls ``isconnected()`` up to 120 times, sleeping
      250 ms before each poll.  With ``hard_reset`` true the connection is
      started before polling; with it false the connection is only started
      at poll number 60 if the interface is still not connected.

    Returns the interface once connected, or ``None`` if every poll failed.
    """
    wlan = network.WLAN(TYPE)

    def _bring_up():
        # Activate the interface and start connecting with the credentials.
        wlan.active(True)
        wlan.connect(ssid, pwd)

    # Missing credentials: switch the interface off.
    if not (pwd and ssid):
        print('Disconnecting ', TYPE)
        wlan.active(False)
        return None

    # Access point: activate, pause briefly, then apply the configuration.
    if TYPE == network.AP_IF:
        wlan.active(True)
        time.sleep_ms(200)
        wlan.config(essid=ssid, password=pwd)
        return wlan

    if hard_reset:
        _bring_up()

    print('Connecting')
    for poll in range(120):
        time.sleep_ms(250)
        if wlan.isconnected():
            print('Yes! Connected')
            return wlan
        # Half-way through, give up on any pre-existing connection attempt
        # and start one explicitly (only when it was not started above).
        if not hard_reset and poll == 60:
            _bring_up()

    print('Cant connect', ssid)
    return None

#----------------------------------------------------------------
# MAIN PROGRAM STARTS HERE
项目:micropython-deauth    作者:PakchoiFood    | 项目源码 | 文件源码
def deauth(_ap, _client, type, reason):
    """Fill in a 26-byte frame template and send it with ``sta_if.send_pkt_freedom``.

    Byte layout of the frame, as noted by the original author:
       0 - 1   type, subtype (0xC0: deauth, 0xA0: disassociate)
       2 - 3   duration (handled by the SDK)
       4 - 9   receiver (target)   <- first six items of ``_client``
      10 - 15  source (AP)         <- first six items of ``_ap``
      16 - 21  BSSID (AP)          <- first six items of ``_ap``
      22 - 23  fragment & sequence number
      24 - 25  reason code (1 = unspecified reason)

    ``type`` is written to byte 0 and ``reason`` to byte 24.

    Returns True (after a 1 ms pause) when ``send_pkt_freedom`` returns 0,
    otherwise False.
    """
    frame = bytearray([
        0xC0, 0x00,                             # type / subtype
        0x00, 0x00,                             # duration
        0xBB, 0xBB, 0xBB, 0xBB, 0xBB, 0xBB,     # receiver placeholder
        0xCC, 0xCC, 0xCC, 0xCC, 0xCC, 0xCC,     # source placeholder
        0xCC, 0xCC, 0xCC, 0xCC, 0xCC, 0xCC,     # BSSID placeholder
        0x00, 0x00,                             # fragment & sequence
        0x01, 0x00,                             # reason code
    ])
    for k in range(6):
        frame[4 + k] = _client[k]
        ap_octet = _ap[k]
        frame[10 + k] = ap_octet
        frame[16 + k] = ap_octet
    frame[0] = type
    frame[24] = reason

    status = sta_if.send_pkt_freedom(frame)
    if status != 0:
        return False
    time.sleep_ms(1)
    return True
项目:micropython    作者:stlk    | 项目源码 | 文件源码
def read_temp():
    """Return the temperature of the first DS18X20 found on ``machine.Pin(5)``.

    A OneWire bus is created on pin 5, the bus is scanned, a conversion is
    started, and after a 750 ms pause the first scanned device is read.
    Indexing the scan result raises if the scan returned no devices.
    """
    bus = onewire.OneWire(machine.Pin(5))
    sensor = ds18x20.DS18X20(bus)

    # Device addresses currently present on the bus.
    found = sensor.scan()

    # Start the conversion, then pause before reading the result back.
    sensor.convert_temp()
    time.sleep_ms(750)
    return sensor.read_temp(found[0])
项目:micropython-adafruit-tcs34725    作者:adafruit    | 项目源码 | 文件源码
def active(self, value=None):
    """Query or change the active state.

    Called without ``value`` it returns the cached ``self._active`` flag.
    Otherwise ``value`` is coerced to bool; if it equals the cached flag
    nothing happens.  On a change the enable register is read and rewritten:
    switching on sets ``_ENABLE_PON``, pauses 3 ms, then also sets
    ``_ENABLE_AEN``; switching off clears both bits.
    """
    if value is None:
        return self._active

    requested = bool(value)
    if self._active == requested:
        return
    self._active = requested

    current = self._register8(_REGISTER_ENABLE)
    if not requested:
        self._register8(_REGISTER_ENABLE,
                        current & ~(_ENABLE_PON | _ENABLE_AEN))
        return

    # Two-step power-up: PON first, short pause, then PON together with AEN.
    self._register8(_REGISTER_ENABLE, current | _ENABLE_PON)
    time.sleep_ms(3)
    self._register8(_REGISTER_ENABLE, current | _ENABLE_PON | _ENABLE_AEN)
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def loop():
    """Redraw the weather screen every 500 ms until the lower button reads 0.

    Each pass calls ``get()``, clears the display and draws the header and
    ``g['name']``.  If the left or right button reads 0 the view toggles
    between temperature and humidity.  The temperature view subtracts 273.15
    from ``g['main']['temp']`` and shows the result labelled Celsius plus its
    Fahrenheit conversion.  When the lower button reads 0 the loop ends
    before ``display.show()`` is called for that pass.
    """
    show_temp = True
    while True:
        weather = get()
        display.fill(0)
        display.text("Weather in", 0, 0)
        display.text(weather['name'], 0, 8)

        toggle_pressed = left_button.value() == 0 or right_button.value() == 0
        if toggle_pressed:
            show_temp = not show_temp

        if show_temp:
            celsius = weather['main']['temp'] - 273.15
            rows = (
                ("Temp/C: %.1f" % celsius, 24),
                ("Temp/F: %.1f" % (celsius * 1.8 + 32), 32),
            )
        else:
            rows = (("Humidity: %d%%" % weather['main']['humidity'], 24),)
        for text, y in rows:
            display.text(text, 0, y)

        if lower_button.value() == 0:
            break
        display.show()
        time.sleep_ms(500)
项目:uPython-ESP8266-01-umqtt    作者:phieber    | 项目源码 | 文件源码
def __init__(self, i2c, i2c_addr, num_lines, num_columns):
    """Initialise the LCD behind the I2C device at ``i2c_addr``.

    Writes a zero byte to the device, waits 20 ms, sends the function-reset
    nibble three times (with 5 ms, 1 ms and 1 ms pauses), sends the function
    nibble to select 4-bit mode, runs ``LcdApi.__init__`` and finally issues
    the function command, adding the two-line flag when ``num_lines > 1``.
    """
    self.i2c = i2c
    self.i2c_addr = i2c_addr
    self.i2c.writeto(self.i2c_addr, bytes([0]))
    sleep_ms(20)  # allow the LCD time to power up

    # Reset is sent three times; the first needs a delay of at least 4.1 ms.
    for settle_ms in (5, 1, 1):
        self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
        sleep_ms(settle_ms)

    # Put the LCD into 4-bit mode.
    self.hal_write_init_nibble(self.LCD_FUNCTION)
    sleep_ms(1)

    LcdApi.__init__(self, num_lines, num_columns)

    function_cmd = self.LCD_FUNCTION
    if num_lines > 1:
        function_cmd |= self.LCD_FUNCTION_2LINES
    self.hal_write_command(function_cmd)
项目:uPython-ESP8266-01-umqtt    作者:phieber    | 项目源码 | 文件源码
def hal_write_command(self, cmd):
    """Write a command to the LCD as two 4-bit transfers, high nibble first.

    For each nibble the byte (backlight bit plus shifted nibble) is written
    once with ``MASK_E`` set and once without it; data is latched on the
    falling edge of E.  Commands with a value of 3 or less are followed by a
    5 ms pause because home/clear need a worst case of 4.1 ms.
    """
    for nibble in ((cmd >> 4) & 0x0f, cmd & 0x0f):
        out = (self.backlight << SHIFT_BACKLIGHT) | (nibble << SHIFT_DATA)
        self.i2c.writeto(self.i2c_addr, bytes([out | MASK_E]))
        self.i2c.writeto(self.i2c_addr, bytes([out]))
    if cmd <= 3:
        sleep_ms(5)
项目:uPython-ESP8266-01-umqtt    作者:phieber    | 项目源码 | 文件源码
def readDS18x20():
    """Read every DS18X20 found on pin 0, print the readings and return them.

    The bus is scanned, one conversion is started, and after a 750 ms pause
    each scanned device is read in scan order.  Returns a list of readings
    (empty when the scan finds nothing).
    """
    from machine import Pin
    import onewire
    import ds18x20

    data_pin_number = 0

    bus = onewire.OneWire(Pin(data_pin_number))
    sensor = ds18x20.DS18X20(bus)
    roms = sensor.scan()

    sensor.convert_temp()
    time.sleep_ms(750)

    values = [sensor.read_temp(rom) for rom in roms]
    print(values)
    return values
项目:py-mpu6050    作者:larsks    | 项目源码 | 文件源码
def get_sensor_avg(self, samples, softstart=100):
    """Return the integer mean of seven sensor channels over ``samples`` reads.

    One reading is taken and discarded up front.  Then ``samples + softstart``
    readings are taken 2 ms apart; the first ``softstart`` of them are skipped
    to give things time to settle and the rest are summed per channel.  Each
    sum is floor-divided by ``samples``.
    """
    self.read_sensors()  # initial reading, result not used
    totals = [0] * 7

    for n in range(samples + softstart):
        # The pause ensures each iteration sees a new sample.
        time.sleep_ms(2)
        reading = self.read_sensors()
        if n >= softstart:
            for slot, val in enumerate(reading):
                totals[slot] += val

    return [total // samples for total in totals]
项目:micropython-am2320    作者:mcauser    | 项目源码 | 文件源码
def measure(self):
    """Read the sensor registers into ``self.buf`` and verify the checksum.

    An empty write is sent first to wake the sensor (an ``OSError`` from it
    is ignored).  The request ``b'\\x03\\x00\\x04'`` asks for 4 registers
    starting at offset 0x00; after a 2 ms pause the reply is read into the
    buffer.  The last two buffer bytes are unpacked as a little-endian
    unsigned 16-bit value and compared with ``self.crc16`` of the preceding
    bytes.

    Raises ``Exception("checksum error")`` on mismatch.
    """
    addr = self.address
    data = self.buf

    # Wake the sensor; a failure of this write is expected and ignored.
    try:
        self.i2c.writeto(addr, b'')
    except OSError:
        pass

    self.i2c.writeto(addr, b'\x03\x00\x04')
    time.sleep_ms(2)  # wait at least 1.5 ms
    self.i2c.readfrom_mem_into(addr, 0, data)

    received_crc, = ustruct.unpack('<H', bytearray(data[-2:]))
    if received_crc != self.crc16(data[:-2]):
        raise Exception("checksum error")
项目:MicroPython-ESP8266-DHT-Nokia-5110    作者:mcauser    | 项目源码 | 文件源码
def measure(self):
    """Read the sensor registers into ``self.buf``, print them, check the CRC.

    An empty write wakes the sensor (an ``OSError`` from it is ignored), then
    ``b'\\x03\\x00\\x04'`` requests 4 registers from offset 0x00.  After a
    2 ms pause the reply is read into the buffer and printed as eight
    unsigned bytes.  The last two bytes, read as a little-endian unsigned
    16-bit value, must equal ``self.crc16`` of the bytes before them.

    Raises ``Exception("checksum error")`` on mismatch.
    """
    addr = self.address
    data = self.buf

    # Wake the sensor; a failure of this write is expected and ignored.
    try:
        self.i2c.writeto(addr, b'')
    except OSError:
        pass

    self.i2c.writeto(addr, b'\x03\x00\x04')
    time.sleep_ms(2)  # wait at least 1.5 ms
    self.i2c.readfrom_mem_into(addr, 0, data)

    # Debug output of the raw reply.
    print(ustruct.unpack('BBBBBBBB', data))

    received_crc, = ustruct.unpack('<H', bytearray(data[-2:]))
    if received_crc != self.crc16(data[:-2]):
        raise Exception("checksum error")
项目:esp8266-upy    作者:mchobby    | 项目源码 | 文件源码
def measure(self):
    """Request a measurement, read the reply and return ``check_response()``.

    After ``self.wakeup()`` a three-byte request is written: the
    ``AM2315_READREG`` code, start address 0x00 and a length of 4 bytes.
    Following a 2 ms pause the reply is read into ``self.rbuf``.
    """
    self.wakeup()

    # Read-register request: function code, start address, byte count.
    request = bytearray([AM2315_READREG, 0x00, 0x04])
    self.i2c.writeto(self.addr, request)

    time.sleep_ms(2)  # wait 1.5+ ms before reading

    self.i2c.readfrom_mem_into(self.addr, 0, self.rbuf)
    return self.check_response()
项目:MASUGUX    作者:DjamesSuhanko    | 项目源码 | 文件源码
def status(self):
    """Return the mean temperature of the DS18X20 sensors on ``Pin(2)``.

    The bus is scanned; if nothing is found a message is printed and ``None``
    is returned.  Otherwise one conversion is started and, after a 750 ms
    pause, every sensor is read 10 times.  The result is the mean over all
    readings taken.

    With a single sensor this is the same value as before.  With several
    sensors the previous code divided the total of ``10 * len(roms)``
    readings by 10, which produced the sum of the sensors rather than a mean.
    """
    ow = onewire.OneWire(Pin(2))
    ds = ds18x20.DS18X20(ow)
    roms = ds.scan()

    if not roms:
        print("Nao encontrei um dispositivo onewire.")
        return None

    ds.convert_temp()
    time.sleep_ms(750)

    rounds = 10
    total = 0
    for _ in range(rounds):
        for rom in roms:
            total += ds.read_temp(rom)

    # Divide by the number of readings actually summed.
    return total / (rounds * len(roms))

    # envia a temperatura para o broker
项目:pyCom    作者:dinosd    | 项目源码 | 文件源码
def _init__(self, cspin):
    """Set up the chip-select pin and SPI bus, reset the device, read its IDs.

    The chip-select pin is configured as an output and driven to 1, the
    x/y/z/t attributes are zeroed and SPI bus 0 is opened as master at 1 MHz.
    After a 1000 ms pause 0x52 is written to register 0x1F (reset), followed
    by a 10 ms pause; registers 0x00-0x03 are then read into the ID
    attributes and ``getStatus()`` is stored in ``STATUS``.

    NOTE(review): the name has a single leading underscore, so Python will
    not call this as the constructor — confirm whether ``__init__`` was
    intended.
    """
    self.cspin = Pin(cspin, mode=Pin.OUT)
    self.cspin.value(1)
    self.x = self.y = self.z = self.t = 0
    self.spi = SPI(0, mode=SPI.MASTER, baudrate=1000000, polarity=0,
                   phase=0, firstbit=SPI.MSB)
    time.sleep_ms(1000)

    self.SPIwriteOneRegister(0x1F, 0x52)  # RESET
    time.sleep_ms(10)

    # Identification registers, read in ascending address order.
    id_registers = (
        ("DEVICEID", 0x00),
        ("DEVID_MST", 0x01),
        ("PART_ID", 0x02),
        ("REV_ID", 0x03),
    )
    for attr, register in id_registers:
        setattr(self, attr, self.SPIreadOneRegister(register))
    self.STATUS = self.getStatus()
项目:pyCom    作者:dinosd    | 项目源码 | 文件源码
def __init__(self, csPIN, ISRPin, ActivityThreshold=70, ActivityTime=5, InactivityThreshold=1000, InactivityTime=5):
    """Initialise the accelerometer and its activity/inactivity interrupt.

    Resets the step counter, runs ``ADXL362.__init__`` with ``csPIN``,
    configures ``ISRPin`` as an input with a rising-edge callback to
    ``self._isr_handler``, sets up the activity and inactivity interrupts
    with the given thresholds/times, writes three control registers, starts
    measuring and pauses 100 ms.
    """
    # Register addresses, named after the original inline comments.
    act_inact_control = 0x27   # activity/inactivity control register
    int1_map = 0x2A            # map interrupt 1 register
    power_control = 0x2D       # power control register

    self.steps = 0
    ADXL362.__init__(self, csPIN)
    self.isrpin = Pin(ISRPin, mode=Pin.IN)
    self.isrpin.callback(trigger=Pin.IRQ_RISING, handler=self._isr_handler)

    self.setupDCActivityInterrupt(ActivityThreshold, ActivityTime)
    self.setupDCInactivityInterrupt(InactivityThreshold, InactivityTime)

    register_writes = (
        # Original note: Link/Loop 11 (loop mode), inactive ref mode 1,
        # inactive enable 1, active ref mode 1, active enable 1.
        (act_inact_control, 0b111111),
        # NOTE(review): the original comment describes B00010000 (ACT only)
        # but the value written has bit 7 set as well — confirm intent.
        (int1_map, 0b10010000),
        # NOTE(review): the original comment mentions B1011 while the value
        # written is B1110 — confirm against the datasheet.
        (power_control, 0b1110),
    )
    for register, value in register_writes:
        self.SPIwriteOneRegister(register, value)

    self.beginMeasure()
    time.sleep_ms(100)
项目:esp8266sketches    作者:lekum    | 项目源码 | 文件源码
def send_cmd(self, cmd_request, response_size=6, read_delay_ms=100):
    """
    Send a command to the sensor and optionally read the response.

    ``cmd_request`` is written to ``self.i2c_addr``.  When ``response_size``
    is falsy the bus is stopped and ``None`` is returned.  Otherwise the
    function waits ``read_delay_ms`` milliseconds, reads ``response_size``
    bytes and validates them: every 3-byte group is passed to
    ``self._check_crc`` (positions 2 and 5 are CRC bytes), and an all-zero
    response is rejected.

    Returns the response bytes.

    Raises:
        SHT30Error(CRC_ERROR): a 3-byte group failed the CRC check.
        SHT30Error(DATA_ERROR): the response consists only of zero bytes.
        SHT30Error(BUS_ERROR): an ``OSError`` whose first argument is a
            string containing ``'I2C'`` was raised by the bus.
        OSError: any other bus error is re-raised unchanged.
    """
    try:
        self.i2c.start()
        self.i2c.writeto(self.i2c_addr, cmd_request)
        if not response_size:
            self.i2c.stop()
            return
        time.sleep_ms(read_delay_ms)
        data = self.i2c.readfrom(self.i2c_addr, response_size)
        self.i2c.stop()
        for i in range(response_size // 3):
            if not self._check_crc(data[i * 3:(i + 1) * 3]):
                raise SHT30Error(SHT30Error.CRC_ERROR)
        if data == bytearray(response_size):
            raise SHT30Error(SHT30Error.DATA_ERROR)
        return data
    except OSError as ex:
        # OSError often carries an integer errno (or no args at all); only a
        # string message can be searched for 'I2C'.  Testing membership on an
        # int would raise TypeError and hide the real error.
        detail = ex.args[0] if ex.args else None
        if isinstance(detail, str) and 'I2C' in detail:
            raise SHT30Error(SHT30Error.BUS_ERROR)
        raise
项目:thunderstorm    作者:rnovacek    | 项目源码 | 文件源码
def run(func):
    """Call ``func`` and sleep for every delay its result yields.

    ``func()`` must return an iterable; each item is passed to
    ``time.sleep_ms`` in turn.  Returns ``None``.
    """
    delays = func()
    for pause_ms in delays:
        time.sleep_ms(pause_ms)
项目:blynk-library-python    作者:vshymanskyy    | 项目源码 | 文件源码
def _send(self, data, send_anyway=False):
    """Send ``data`` on ``self.conn``, retrying while the socket reports EAGAIN.

    Nothing is sent when ``self._tx_count`` has reached ``MAX_MSG_PER_SEC``
    unless ``send_anyway`` is true.  A successful send increments
    ``self._tx_count``.  On an EAGAIN socket error the function sleeps
    ``RE_TX_DELAY`` ms and tries again, giving up silently once the retry
    count exceeds ``MAX_TX_RETRIES``; any other socket error propagates.
    """
    if not (self._tx_count < MAX_MSG_PER_SEC or send_anyway):
        return

    attempts = 0
    while attempts <= MAX_TX_RETRIES:
        try:
            self.conn.send(data)
        except socket.error as er:
            if er.args[0] != EAGAIN:
                raise
            time.sleep_ms(RE_TX_DELAY)
            attempts += 1
        else:
            self._tx_count += 1
            break
项目:micropython-adafruit-ads1015    作者:adafruit    | 项目源码 | 文件源码
def read(self, channel):
    """Start a single conversion on ``channel`` and return the result register.

    The config register is written with the single-shot flags, the gain
    selected by ``self.gain`` and the multiplexer bits for ``channel``.  The
    config register is then polled every 1 ms until ``_OS_NOTBUSY`` is set.
    """
    config = (_CQUE_NONE | _CLAT_NONLAT | _CPOL_ACTVLOW | _CMODE_TRAD |
              _DR_1600SPS | _MODE_SINGLE | _OS_SINGLE |
              _GAINS[self.gain] | _CHANNELS[channel])
    self._write_register(_REGISTER_CONFIG, config)

    while True:
        if self._read_register(_REGISTER_CONFIG) & _OS_NOTBUSY:
            break
        time.sleep_ms(1)
    return self._read_register(_REGISTER_CONVERT)
项目:micropython-adafruit-ads1015    作者:adafruit    | 项目源码 | 文件源码
def diff(self, channel1, channel2):
    """Start a single differential conversion and return the result register.

    The config register is written with the single-shot flags, the gain
    selected by ``self.gain`` and the ``_DIFFS`` entry for the pair
    ``(channel1, channel2)``.  The config register is then polled every 1 ms
    until ``_OS_NOTBUSY`` is set.
    """
    config = (_CQUE_NONE | _CLAT_NONLAT | _CPOL_ACTVLOW | _CMODE_TRAD |
              _DR_1600SPS | _MODE_SINGLE | _OS_SINGLE |
              _GAINS[self.gain] | _DIFFS[(channel1, channel2)])
    self._write_register(_REGISTER_CONFIG, config)

    while True:
        if self._read_register(_REGISTER_CONFIG) & _OS_NOTBUSY:
            break
        time.sleep_ms(1)
    return self._read_register(_REGISTER_CONVERT)
项目:esp32    作者:smeenka    | 项目源码 | 文件源码
def blink(n):
    """Write 0 then 1 to ``led`` ``n`` times, pausing 100 ms after each write."""
    for _ in range(n):
        for level in (0, 1):
            led.value(level)
            time.sleep_ms(100)


#if machine.reset_cause() == machine.SOFT_RESET:
#    print ("Soft reset, doing nothing")
#    print('Connected!! network config:', wifi.wlan.ifconfig())
项目:micropython-share    作者:pacmac    | 项目源码 | 文件源码
def vpage(self, delay=10):
    """Step the display start line from 0 to ``height - 2``, then clear.

    The start line is first set to 0.  For each line the start-line command
    is written and the function pauses ``delay`` ms; ``self.show()`` is
    called once, on the first step.  Finally ``self.fill(0)`` is called.
    """
    self.write_cmd(SET_START_LINE | 0)
    for line in range(self.height - 1):
        self.write_cmd(SET_START_LINE | line)
        if line == 0:
            # Push the frame buffer only once, at the first step.
            self.show()
        time.sleep_ms(delay)
    self.fill(0)
项目:micropython-share    作者:pacmac    | 项目源码 | 文件源码
def reset(self, res):
    """Pulse the ``res`` pin: 1 for 1 ms, 0 for 20 ms, then 1 for 20 ms."""
    for level, hold_ms in ((1, 1), (0, 20), (1, 20)):
        res.value(level)
        time.sleep_ms(hold_ms)
项目:micropython-share    作者:pacmac    | 项目源码 | 文件源码
def vpage(self, delay=10):
    """Walk the display start line over ``range(height - 1)`` and clear.

    After setting the start line to 0, each step writes the start-line
    command for that row and sleeps ``delay`` ms.  ``self.show()`` runs only
    during the first step.  ``self.fill(0)`` is called at the end.
    """
    self.write_cmd(SET_START_LINE | 0)
    for row in range(self.height - 1):
        self.write_cmd(SET_START_LINE | row)
        if row == 0:
            # The buffer is shown a single time, on the first row.
            self.show()
        time.sleep_ms(delay)
    self.fill(0)
项目:micropython-share    作者:pacmac    | 项目源码 | 文件源码
def reset(self, res):
    """Drive ``res`` high (1 ms), low (20 ms) and high again (20 ms)."""
    sequence = ((1, 1), (0, 20), (1, 20))  # (pin level, pause in ms)
    for level, pause_ms in sequence:
        res.value(level)
        time.sleep_ms(pause_ms)
项目:micropython-share    作者:pacmac    | 项目源码 | 文件源码
def _send(self, data, send_anyway=False):
    """Transmit ``data`` over ``self.conn`` with a bounded EAGAIN retry loop.

    The call is a no-op once ``self._tx_count`` is no longer below
    ``MAX_MSG_PER_SEC``, unless ``send_anyway`` is true.  Each successful
    send bumps ``self._tx_count``.  When the socket raises an error whose
    first argument is ``EAGAIN`` the function waits ``RE_TX_DELAY`` ms and
    retries; after more than ``MAX_TX_RETRIES`` retries it returns without
    sending.  Other socket errors are re-raised.
    """
    allowed = self._tx_count < MAX_MSG_PER_SEC or send_anyway
    if not allowed:
        return

    tries = 0
    while tries <= MAX_TX_RETRIES:
        try:
            self.conn.send(data)
        except socket.error as er:
            if er.args[0] != EAGAIN:
                raise
            time.sleep_ms(RE_TX_DELAY)
            tries += 1
        else:
            self._tx_count += 1
            break
项目:pycom-libraries    作者:pycom    | 项目源码 | 文件源码
def calibrate(self, samples=300):
    """Set ``self.threshold`` to 1.2 times the largest of ``samples`` readings.

    ``self.pin()`` is sampled ``samples`` times, 10 ms apart.  The running
    maximum starts at 0, so the threshold is 0 when no sample exceeds 0.
    """
    peak = 0
    for _ in range(samples):
        peak = max(peak, self.pin())
        time.sleep_ms(10)

    self.threshold = peak * 1.2
项目:pycom-libraries    作者:pycom    | 项目源码 | 文件源码
def __init__(self, i2c=None, sda='P22', scl='P21'):
        """Set up the I2C link to the board and configure its peripherals.

        ``i2c`` is used when given; otherwise I2C bus 0 is opened as master
        on the ``sda``/``scl`` pins.  After a first firmware-version read
        (whose failure is tolerated) the ADC, pull-up and port directions
        are configured through the memory poke/mask helpers.

        Raises ``Exception('Board not detected')`` if any step of the
        configuration fails.

        NOTE(review): the ``ValueError('Firmware out of date')`` raised
        inside the second ``try`` is caught by its own ``except Exception``
        and re-raised as 'Board not detected' — confirm this is intended.
        """
        if i2c is not None:
            self.i2c = i2c
        else:
            self.i2c = I2C(0, mode=I2C.MASTER, pins=(sda, scl))

        self.sda = sda
        self.scl = scl
        self.clk_cal_factor = 1
        self.reg = bytearray(6)
        self.wake_int = False
        self.wake_int_pin = False
        self.wake_int_pin_rising_edge = True

        # First contact: the result is discarded and any failure is followed
        # only by a 2 ms pause before the configuration below is attempted.
        try:
            self.read_fw_version()
        except Exception:
            time.sleep_ms(2)
        try:
            # init the ADC for the battery measurements
            self.poke_memory(ANSELC_ADDR, 1 << 2)
            self.poke_memory(ADCON0_ADDR, (0x06 << _ADCON0_CHS_POSN) | _ADCON0_ADON_MASK)
            self.poke_memory(ADCON1_ADDR, (0x06 << _ADCON1_ADCS_POSN))
            # enable the pull-up on RA3
            self.poke_memory(WPUA_ADDR, (1 << 3))
            # make RC5 an input
            self.set_bits_in_memory(TRISC_ADDR, 1 << 5)
            # set RC6 and RC7 as outputs and enable power to the sensors and the GPS
            self.mask_bits_in_memory(TRISC_ADDR, ~(1 << 6))
            self.mask_bits_in_memory(TRISC_ADDR, ~(1 << 7))

            # Firmware versions below 6 are rejected.
            if self.read_fw_version() < 6:
                raise ValueError('Firmware out of date')

        except Exception:
            # Every failure above, including the version check, ends up here.
            raise Exception('Board not detected')
项目:pycom-libraries    作者:pycom    | 项目源码 | 文件源码
def activity(self):
    """Return True when ``self.int_pin()`` is truthy, else False.

    On the first call (``self.debounced`` false) the function first sleeps
    ``self.act_dur`` ms and marks the instance as debounced.
    """
    if not self.debounced:
        time.sleep_ms(self.act_dur)
        self.debounced = True
    return True if self.int_pin() else False
项目:pycom-libraries    作者:pycom    | 项目源码 | 文件源码
def activity(self):
    """Report the interrupt pin state as a bool, debouncing once.

    If ``self.debounced`` is false the function sleeps ``self.act_dur`` ms
    and sets the flag.  It then returns True if ``self.int_pin()`` is truthy
    and False otherwise.
    """
    needs_debounce = not self.debounced
    if needs_debounce:
        time.sleep_ms(self.act_dur)
        self.debounced = True
    return bool(self.int_pin())
项目:learn-micropython    作者:daniloqb    | 项目源码 | 文件源码
def pulse(led, delay):
    """Drive ``led.duty`` along a sine wave for 200 steps.

    At step ``i`` the duty is ``int(sin(i / 10 * pi) * 500 + 500)``; the
    function sleeps ``delay`` ms after each step.
    """
    for step in range(200):
        level = math.sin(step / 10 * math.pi) * 500 + 500
        led.duty(int(level))
        time.sleep_ms(delay)
项目:learn-micropython    作者:daniloqb    | 项目源码 | 文件源码
def pulse(led, delay):
    """Drive ``led.duty`` along one sine period in 20 steps.

    At step ``i`` the duty is ``int(sin(i / 10 * pi) * 500 + 500)``; the
    function sleeps ``delay`` ms after each step.
    """
    for step in range(20):
        level = math.sin(step / 10 * math.pi) * 500 + 500
        led.duty(int(level))
        time.sleep_ms(delay)
项目:Smart-IoT-Planting-System    作者:Python-IoT    | 项目源码 | 文件源码
def sleep_ms(self, mseconds):
    """Pause for ``mseconds`` milliseconds.

    ``time.sleep_ms`` is tried first; if that raises ``AttributeError`` the
    pause is done with ``machine.delay`` instead.
    """
    try:
        time.sleep_ms(mseconds)
        return
    except AttributeError:
        pass
    machine.delay(mseconds)
项目:Smart-IoT-Planting-System    作者:Python-IoT    | 项目源码 | 文件源码
def power_off(self):
    """Clear the display, blank it and, if a power pin exists, drive it to 0.

    After ``self.clear()`` the command pair 0x20 (basic instruction set) and
    0x08 (set display to blank, contents are kept) is sent, followed by a
    10 ms pause.  ``self.pwr`` is only touched when it is truthy.
    """
    basic_instruction_set = 0x20
    display_blank = 0x08

    self.clear()
    self.command([basic_instruction_set, display_blank])
    self.sleep_ms(10)
    if not self.pwr:
        return
    self.pwr.value(0)  # turn off power
项目:micropython    作者:stlk    | 项目源码 | 文件源码
def beacon():
    """Run a single lit pixel along the strip ten times.

    ``fader`` is stopped first and restarted at the end.  For every position
    all ``NEOPIXEL_COUNT`` pixels are rewritten (one in colour
    ``(249, 72, 119)``, the rest off), the strip is written out and the
    function pauses 100 ms.
    """
    lit = (249, 72, 119)
    dark = (0, 0, 0)

    fader.stop()
    for _ in range(10):
        for pos in range(NEOPIXEL_COUNT):
            for i in range(NEOPIXEL_COUNT):
                np[i] = lit if i == pos else dark
            np.write()
            time.sleep_ms(100)
    fader.start()
项目:micropython-adafruit-tcs34725    作者:adafruit    | 项目源码 | 文件源码
def read(self, raw=False):
    """Read the four data registers and return them raw or converted.

    The sensor is switched on for the read and its previous active state is
    restored afterwards.  The function waits, in steps of
    ``int(self._integration_time + 0.9)`` ms, until ``self._valid()`` is
    true.  With ``raw`` true the tuple of register values (R, G, B, C order)
    is returned; otherwise the result of ``self._temperature_and_lux``.
    """
    previously_active = self.active()
    self.active(True)

    while not self._valid():
        time.sleep_ms(int(self._integration_time + 0.9))

    data_registers = (
        _REGISTER_RDATA,
        _REGISTER_GDATA,
        _REGISTER_BDATA,
        _REGISTER_CDATA,
    )
    data = tuple([self._register16(register) for register in data_registers])

    self.active(previously_active)
    return data if raw else self._temperature_and_lux(data)
项目:SX127x_driver_for_MicroPython_on_ESP8266    作者:Wei1234c    | 项目源码 | 文件源码
def poweron(self):
    """Pulse ``self.res``: 1 for 1 ms, 0 for 10 ms, then leave it at 1."""
    for level, hold_ms in ((1, 1), (0, 10)):
        self.res(level)
        time.sleep_ms(hold_ms)
    self.res(1)
项目:SX127x_driver_for_MicroPython_on_ESP8266    作者:Wei1234c    | 项目源码 | 文件源码
def poweron(self):
    """Set ``self.res`` to 1, pause 1 ms, to 0, pause 10 ms, then back to 1."""
    timed_levels = ((1, 1), (0, 10))  # (level, pause in ms)
    for level, pause_ms in timed_levels:
        self.res(level)
        time.sleep_ms(pause_ms)
    self.res(1)
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
    """Run the main polling loop forever.

    Args:
        updates: send a full status every this many seconds; 0 means only
            publish when a change happened.
        sleepms: how long to sleep between loop passes.
        poll_rate_inputs: evaluate device inputs every this many passes.
        poll_rate_network: evaluate incoming network commands every this
            many passes.

    Each pass also runs the callbacks in ``_timestack`` whose time has been
    reached.  The pass counter wraps to 0 at a limit derived from
    ``updates``, both poll rates and ``sleepms``.
    """
    _init_mqtt()

    base = 1000 if updates == 0 else updates * 1000
    wrap_at = int(base * (poll_rate_network * poll_rate_inputs / sleepms))
    tick = 0

    while True:
        if tick % poll_rate_network == 0:
            _poll_subscripton()

        if tick % poll_rate_inputs == 0:
            # Collect the devices whose update() reports a change.
            changed = [dev for dev in _devlist.values() if dev.update()]
            if changed:
                _publish_status(changed)

        if updates != 0 and tick % (updates * 1000) == 0:
            print("Publishing full update.")
            _publish_status(ignore_time=True)

        # Execute the entries on the timestack that are due.
        now = time.ticks_ms()
        for key in list(_timestack):
            due, callback = _timestack[key]
            if time.ticks_diff(now, due) >= 0:
                del _timestack[key]
                callback(key)

        time.sleep_ms(sleepms)
        tick += 1
        if tick >= wrap_at:
            tick = 0
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def acquire_out_buffer(self):
        """Try to take the output-buffer lock.

        Waits in 1 ms steps while ``self.out_buffer_lock`` is True, then
        disables interrupts (saving the state in ``self.irqstate``) and
        re-checks the flag.  If the flag was set in the meantime, interrupts
        are re-enabled and False is returned; otherwise the flag is set and
        True is returned.

        NOTE(review): on the True path interrupts are still disabled when
        this returns — presumably the matching release call re-enables them
        using ``self.irqstate``; confirm against the caller.
        """
        while self.out_buffer_lock == True:
            time.sleep_ms(1) # Wait for release
        # Interrupts are off from here on; the saved state is needed to
        # restore them later.
        self.irqstate=machine.disable_irq()
        if self.out_buffer_lock == True: # TODO: check if this locking is enough
            machine.enable_irq(self.irqstate)
            return False
        self.out_buffer_lock=True
        return True
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def sleep_ms(t):
    """Sleep for ``t`` milliseconds by passing ``t / 1000`` to ``time.sleep``."""
    seconds = t / 1000
    time.sleep(seconds)
项目:ulnoiot    作者:ulno    | 项目源码 | 文件源码
def receive(self, request=0, timeoutms=None):
        """Fill the network input buffer from the socket and decrypt it.

        Bytes are read one at a time with ``self.sock_read`` until the
        buffer holds ``self.netbuf_size`` bytes, the timeout expires, or a
        read yields nothing (or fails with EAGAIN/ETIMEDOUT) after at least
        ``request`` bytes have arrived.

        Args:
            request: minimum number of bytes to wait for; 0 does not wait
                for data.
            timeoutms: ``None`` applies no time limit; otherwise the loop
                stops once this many milliseconds have elapsed since the
                call started.

        Returns:
            ``(data, readbytes)`` — the input buffer and the number of bytes
            read into it (possibly 0).  When ``readbytes`` is positive the
            buffer is passed to ``self.crypt_in.decrypt`` before returning.

        Raises:
            OSError: any socket error other than EAGAIN or ETIMEDOUT.
        """
        # receive into network buffer,
        # fill buffer once and decrypt
        # if request>0 wait blocking for request number of bytes (if timeout
        # given interrupt after timeoutms ms)
        # timeout==None: block
        # timeout==0: return immediately after trying to read something from
        #             network buffer
        # timeout>0: try for time specified to read something before returning
        #            this function always returns a pointer to the buffer and
        #            number of bytes read (could be 0)
        data = self.netbuf_in
        data_mv = self.netbuf_in_mv
        readbytes = 0
        start_t = ticks_ms()
        while readbytes < self.netbuf_size:
            try:
                # Read a single byte into the next free buffer position.
                if self.sock_read(data_mv[readbytes:readbytes+1]):
                    readbytes += 1 # result was not 0 or none
                else:
                    if readbytes >= request:
                        break  # break if not blocking to request size
            except OSError as e:
                # "No data right now" errors are treated like an empty read;
                # everything else is re-raised.
                if len(e.args) > 0 \
                        and (e.args[0] == errno.EAGAIN
                        or e.args[0] == errno.ETIMEDOUT):
                    if readbytes >= request:
                        break  # break if not blocking to request size
                else:
                    raise
            if timeoutms is not None \
                    and ticks_diff(ticks_ms(), start_t) >= timeoutms:
                break
            # Runs after every pass that did not break, including passes
            # that successfully read a byte.
            sleep_ms(1) # prevent busy waiting
        if readbytes>0: self.crypt_in.decrypt(data,length=readbytes)
        return data,readbytes
项目:uPython-ESP8266-01-umqtt    作者:phieber    | 项目源码 | 文件源码
def sub_cb(topic, msg):
    """Show a received message on the LCD, then pause 3000 ms.

    The display is cleared and the cursor moved to the origin first when the
    current cursor offset plus ``len(str(msg))`` exceeds the number of
    character cells.  The line written is ``Puffer[<last topic segment>]:``
    followed by the UTF-8 decoded message and a newline.

    NOTE(review): the size check uses ``str(msg)`` while the text written is
    ``str(msg, "utf-8")`` plus a prefix — the two lengths may differ.
    """
    cursor_offset = lcd.cursor_y * lcd.num_columns + lcd.cursor_x
    needed = cursor_offset + len(str(msg))
    capacity = lcd.num_lines * lcd.num_columns
    if needed > capacity:
        lcd.clear()
        lcd.move_to(0, 0)

    label = str(topic, "utf-8").split("/")[-1]
    text = str(msg, "utf-8")
    lcd.putstr("Puffer[" + label + "]:" + text + "\n")
    time.sleep_ms(3000)
项目:uPython-ESP8266-01-umqtt    作者:phieber    | 项目源码 | 文件源码
def main():
    """Connect the MQTT client, subscribe to the topics and poll forever.

    ``sub_cb`` is installed as the message callback, the client connects and
    subscribes to three temperature topics, and ``check_msg()`` is then
    called every 500 ms.

    The loop never ends on its own, so the original trailing
    ``c.disconnect()`` could not be reached.  The disconnect now sits in a
    ``finally`` block so it runs when the loop is left by an exception
    (for example a keyboard interrupt or a socket error).
    """
    c.set_callback(sub_cb)
    c.connect()
    try:
        #c.subscribe(CLIENT_ID + "/display")
        for topic in (
            "esp8266-35cacf00/temperature/0",
            "esp8266-35cacf00/temperature/1",
            "esp8266-35cacf00/temperature/2",
        ):
            c.subscribe(topic)

        while True:
            c.check_msg()
            time.sleep_ms(500)
    finally:
        try:
            c.disconnect()
        except OSError:
            # The connection may already be gone; do not let the cleanup
            # hide the exception that ended the loop.
            pass
项目:templogger    作者:aequitas    | 项目源码 | 文件源码
def read_temps():
    """Read DS18B20's.

    Starts one conversion on the OneWire bus at ``machine.Pin(PIN_1W)``,
    pauses 750 ms, then scans the bus and returns a dict mapping
    ``rom_to_hex(rom)`` to the reading of each scanned device.
    """
    data_pin = machine.Pin(PIN_1W)
    sensor = ds18x20.DS18X20(onewire.OneWire(data_pin))

    sensor.convert_temp()
    time.sleep_ms(750)

    readings = {}
    for rom in sensor.scan():
        key = rom_to_hex(rom)
        readings[key] = sensor.read_temp(rom)
    return readings
项目:micropython-adafruit-tsl2561    作者:adafruit    | 项目源码 | 文件源码
def _read(self):
    """Return the ``(broadband, ir)`` channel register values.

    The sensor is switched on for the read and its previous active state is
    restored afterwards.  If it had been off, the function first waits for
    the time listed in ``_INTEGRATION_TIME`` for the current integration
    setting so that a measurement is available.
    """
    previously_active = self.active()
    self.active(True)
    if not previously_active:
        wait_ms = _INTEGRATION_TIME[self._integration_time][1]
        time.sleep_ms(wait_ms)

    broadband = self._register16(_REGISTER_CHANNEL0)
    ir = self._register16(_REGISTER_CHANNEL1)

    self.active(previously_active)
    return (broadband, ir)
项目:wipy-ussd1306    作者:mbirth    | 项目源码 | 文件源码
def sleep_ms(self, mseconds):
    """Delay for ``mseconds`` milliseconds.

    Uses ``time.sleep_ms``; when that raises ``AttributeError`` the delay is
    performed by ``machine.delay`` instead.
    """
    try:
        time.sleep_ms(mseconds)
        return
    except AttributeError:
        pass
    machine.delay(mseconds)
项目:wipy-ussd1306    作者:mbirth    | 项目源码 | 文件源码
def power_off(self):
    """Clear the display, power it down and, if present, drive ``pwr`` to 0.

    ``self.set_power(self.POWER_DOWN)`` is followed by a 10 ms pause.
    ``self.pwr`` is only touched when it is truthy.
    """
    self.clear()
    self.set_power(self.POWER_DOWN)
    self.sleep_ms(10)
    if not self.pwr:
        return
    self.pwr.value(0)  # turn off power
项目:developmentBoard    作者:TPYBoard    | 项目源码 | 文件源码
def led_state():
    """Toggle output pin 2 through 0, 1, 0, 1 with 500 ms between writes.

    The pin is left at 1; no pause follows the last write.
    """
    p2 = Pin(2, Pin.OUT)
    for level in (0, 1, 0):
        p2.value(level)
        time.sleep_ms(500)
    p2.value(1)
项目:developmentBoard    作者:TPYBoard    | 项目源码 | 文件源码
def sleep_ms(self, mseconds):
    """Wait ``mseconds`` milliseconds.

    ``time.sleep_ms`` is preferred; if calling it raises ``AttributeError``
    the wait falls back to ``machine.delay``.
    """
    try:
        time.sleep_ms(mseconds)
        return
    except AttributeError:
        pass
    machine.delay(mseconds)