Python serial 模块,SerialTimeoutException() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用serial.SerialTimeoutException()

项目:pyUSBtin    作者:fishpepper    | 项目源码 | 文件源码
def close_can_channel(self):
        """Stop the receive thread and send the close command to the adapter.

        Writes the ASCII command ``C`` followed by a carriage return to the
        serial port and afterwards resets the stored firmware and hardware
        version numbers to 0.

        Raises:
            USBtinException: wraps a ``serial.SerialTimeoutException`` raised
                while stopping the thread or writing the command.
        """
        close_command = "C\r".encode('ascii')
        try:
            self.stop_rx_thread()
            self.serial_port.write(close_command)
        except serial.SerialTimeoutException as timeout_error:
            raise USBtinException(timeout_error)

        # Only reached when the command went out without a timeout
        self.firmware_version = self.hardware_version = 0
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def write(self, data):
        """Hand ``data`` to the incoming queue and report how much was accepted.

        Returns ``len(data)`` without queueing anything when ``_debug_awol``
        is set or when the data contains "M112" and M112 support is enabled
        (in which case ``_kill`` is called), ``0`` when either queue is
        ``None``, and otherwise the return value of ``self.incoming.put``.

        Raises:
            SerialTimeoutException: when ``_debug_drop_connection`` is set or
                ``self.incoming.put`` raises ``queue.Full``.
        """
        if self._debug_awol:
            # pretend the data went out
            return len(data)

        if self._debug_drop_connection:
            self._logger.info("Debug drop of connection requested, raising SerialTimeoutException")
            raise SerialTimeoutException()

        with self._incoming_lock:
            if self.incoming is None or self.outgoing is None:
                return 0

            kill_requested = "M112" in data and self._supportM112
            if kill_requested:
                self._seriallog.info("<<< {}".format(data.strip()))
                self._kill()
                return len(data)

            try:
                accepted = self.incoming.put(data, timeout=self._write_timeout, partial=True)
            except queue.Full:
                self._logger.info("Incoming queue is full, raising SerialTimeoutException")
                raise SerialTimeoutException()

            # Log only what actually made it into the queue call
            self._seriallog.info("<<< {}".format(data.strip()))
            return accepted
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def write(self, data):
        """Hand ``data`` to the incoming queue and report how much was accepted.

        Returns ``len(data)`` without queueing anything when ``_debug_awol``
        is set or when the data contains "M112" and M112 support is enabled
        (in which case ``_kill`` is called), ``0`` when either queue is
        ``None``, and otherwise the return value of ``self.incoming.put``.

        Raises:
            SerialTimeoutException: when ``_debug_drop_connection`` is set or
                ``self.incoming.put`` raises ``queue.Full``.
        """
        if self._debug_awol:
            # pretend the data went out
            return len(data)

        if self._debug_drop_connection:
            self._logger.info("Debug drop of connection requested, raising SerialTimeoutException")
            raise SerialTimeoutException()

        with self._incoming_lock:
            if self.incoming is None or self.outgoing is None:
                return 0

            kill_requested = "M112" in data and self._supportM112
            if kill_requested:
                self._seriallog.info("<<< {}".format(data.strip()))
                self._kill()
                return len(data)

            try:
                accepted = self.incoming.put(data, timeout=self._write_timeout, partial=True)
            except queue.Full:
                self._logger.info("Incoming queue is full, raising SerialTimeoutException")
                raise SerialTimeoutException()

            # Log only what actually made it into the queue call
            self._seriallog.info("<<< {}".format(data.strip()))
            return accepted
项目:PJON-python    作者:Girgitt    | 项目源码 | 文件源码
def send_byte(self, b):
        """Write a single byte to the serial port and return 0.

        ``b`` may be a one-character ``str`` or an ``int``; an ``int`` is
        converted with ``chr`` before it is written. A write timeout is only
        logged, the method still returns 0.

        Raises:
            UnsupportedPayloadType: when a ``TypeError`` occurs, e.g. because
                ``b`` is neither a one-character ``str`` nor an ``int``.
        """
        try:
            if type(b) is int:
                # chr() always yields a one-character str, no re-check needed
                b = chr(b)
            elif not (type(b) is str and len(b) == 1):
                raise TypeError

            log.debug("sending byte: %s (%s)" % (b, ord(b)))
            self._ser.write(b)
        except TypeError:
            raise UnsupportedPayloadType("byte type should be str length 1 or int but %s found" % type(b))
        except SerialTimeoutException:
            log.exception("write timeout")

        return 0
项目:pyUSBtin    作者:fishpepper    | 项目源码 | 文件源码
def send_first_tx_fifo_message(self):
        """Write the first message of the tx fifo to the serial port.

        Does nothing when the fifo is empty. The message is only read from
        ``self.tx_fifo``; this method does not remove it.

        Raises:
            USBtinException: wraps a ``serial.SerialTimeoutException`` raised
                while building or writing the message.
        """
        if not len(self.tx_fifo):
            return

        head = self.tx_fifo[0]
        try:
            # message text, terminated by a carriage return, as ASCII bytes
            payload = '{}\r'.format(head.to_string()).encode('ascii')
            self.serial_port.write(payload)
        except serial.SerialTimeoutException as timeout_error:
            raise USBtinException(timeout_error)
项目:pyUSBtin    作者:fishpepper    | 项目源码 | 文件源码
def write_mcp_register(self, register, value):
        """Write given register of MCP2515

        The command passed to ``self.transmit`` is ``W`` followed by the
        register and the value, each formatted as zero-padded lowercase hex
        with at least two digits.

        Keyword arguments:
         register -- Register address
         value -- Value to write

        Raises USBtinException when a serial write timeout occurs.
        """
        try:
            self.transmit("W{:02x}{:02x}".format(register, value))
        except serial.SerialTimeoutException as timeout_error:
            raise USBtinException(timeout_error)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def write(self, frame):
        """Send ``frame`` over the serial connection, if one is open.

        When ``self.tty`` is ``None`` the call is a no-op. Otherwise the frame
        is logged as hex one level below DEBUG, the input buffer is flushed
        and the frame is written as a byte string.

        Raises:
            IOError: with ``errno.EIO`` when the serial write times out.
        """
        if self.tty is None:
            return

        log.log(logging.DEBUG-1, ">>> %s", hexlify(frame))
        self.tty.flushInput()
        try:
            # bytes() rather than str(): on Python 2 both give the same byte
            # string, but on Python 3 str() of a bytes/bytearray frame yields
            # its repr text ("bytearray(b'...')") instead of the raw bytes.
            self.tty.write(bytes(frame))
        except serial.SerialTimeoutException:
            raise IOError(errno.EIO, os.strerror(errno.EIO))
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def readline(self):
        """Return the next line from the outgoing queue, or "" on timeout.

        With ``_debug_awol`` set the call sleeps for the read timeout and
        returns "". A pending ``_debug_sleep`` is worked off first, at most
        one read timeout per call; only the time left over after sleeping is
        spent waiting on the queue.

        Raises:
            SerialTimeoutException: when ``_debug_drop_connection`` is set.
        """
        if self._debug_awol:
            time.sleep(self._read_timeout)
            return ""

        if self._debug_drop_connection:
            raise SerialTimeoutException()

        if self._debug_sleep > 0:
            # never sleep longer than one read timeout; whatever is left of
            # the requested sleep is handled by the following calls
            nap = min(self._debug_sleep, self._read_timeout)
            self._debug_sleep -= nap
            time.sleep(nap)

            if self._debug_sleep > 0:
                # the whole read timeout was spent sleeping
                return ""

            # the queue only gets what remains of the read timeout
            wait = self._read_timeout - nap
        else:
            # nothing to sleep off, the queue gets the complete read timeout
            wait = self._read_timeout

        try:
            line = self.outgoing.get(timeout=wait)
        except queue.Empty:
            # nothing arrived in time
            return ""

        self._seriallog.info(">>> {}".format(line.strip()))
        self.outgoing.task_done()
        return line
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def readline(self):
        """Return the next line from the outgoing queue, or "" on timeout.

        With ``_debug_awol`` set the call sleeps for the read timeout and
        returns "". A pending ``_debug_sleep`` is worked off first, at most
        one read timeout per call; only the time left over after sleeping is
        spent waiting on the queue.

        Raises:
            SerialTimeoutException: when ``_debug_drop_connection`` is set.
        """
        if self._debug_awol:
            time.sleep(self._read_timeout)
            return ""

        if self._debug_drop_connection:
            raise SerialTimeoutException()

        if self._debug_sleep > 0:
            # never sleep longer than one read timeout; whatever is left of
            # the requested sleep is handled by the following calls
            nap = min(self._debug_sleep, self._read_timeout)
            self._debug_sleep -= nap
            time.sleep(nap)

            if self._debug_sleep > 0:
                # the whole read timeout was spent sleeping
                return ""

            # the queue only gets what remains of the read timeout
            wait = self._read_timeout - nap
        else:
            # nothing to sleep off, the queue gets the complete read timeout
            wait = self._read_timeout

        try:
            line = self.outgoing.get(timeout=wait)
        except queue.Empty:
            # nothing arrived in time
            return ""

        self._seriallog.info(">>> {}".format(line.strip()))
        self.outgoing.task_done()
        return line
项目:pyshtrih    作者:oleg-golovanov    | 项目源码 | 文件源码
def init(self):
        """
        Probe the device with ENQ and react to the byte it answers with.

        NAK needs no further action and ACK triggers ``handle_response``; in
        both cases True is returned. Any other byte makes the method read
        from the port until nothing more arrives and return False.

        :rtype: bool
        :return: True after NAK or ACK, False after an unexpected byte
        :raises excepts.NoConnectionError: if nothing could be read
        :raises excepts.ProtocolError: on a serial timeout or serial error
        """

        try:
            self.serial.write(ENQ)
            answer = self.serial.read()
            if not answer:
                raise excepts.NoConnectionError()

            if answer != NAK:
                if answer != ACK:
                    # unexpected byte: read until the port has nothing left
                    while self.serial.read():
                        pass
                    return False

                self.handle_response()

            return True

        except serial.SerialTimeoutException:
            self.serial.flushOutput()
            # NOTE(review): the message text below is mis-encoded in this
            # copy of the source; it is reproduced unchanged.
            raise excepts.ProtocolError(u'?? ??????? ???????? ???? ? ???')
        except serial.SerialException as exc:
            self.serial.flushInput()
            raise excepts.ProtocolError(unicode(exc))
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def test_WriteTimeout(self):
        """Check that a write() on a held connection times out in time."""
        # XON/XOFF flow control plus the loop-back adapter let the test put
        # its own traffic on hold
        port = self.s
        port.port = PORT
        port.write_timeout = 1.0
        port.xonxoff = True
        port.open()
        port.write(serial.XOFF)
        time.sleep(0.5)  # some systems need a little delay so that they can react on XOFF
        started = time.time()
        with self.assertRaises(serial.SerialTimeoutException):
            port.write(b"timeout please" * 200)
        elapsed = time.time() - started
        # the configured timeout is 1.0 s; allow some slack on the upper end
        self.assertTrue(0.9 <= elapsed < 2.1, "Timeout not in the given interval ({})".format(elapsed))
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def test_WriteTimeout(self):
        """Test write() timeout.

        Puts the connection on hold via XOFF, then expects a large write to
        raise SerialTimeoutException after roughly the configured one second.
        """
        # use xonxoff setting and the loop-back adapter to switch traffic on hold
        self.s.port = PORT
        self.s.writeTimeout = 1
        self.s.xonxoff = 1
        self.s.open()
        self.s.write(serial.XOFF)
        time.sleep(0.5) # some systems need a little delay so that they can react on XOFF
        t1 = time.time()
        # assertRaises/assertTrue instead of the failUnlessRaises/failUnless
        # aliases: the aliases were deprecated long ago and no longer exist in
        # unittest as of Python 3.12.
        self.assertRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200))
        elapsed = time.time() - t1
        self.assertTrue(0.9 <= elapsed < 2.1, "Timeout not in the given interval (%s)" % elapsed)
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def _serial_write(self):
        """Send pending control characters, then at most one chunk of the job buffer.

        Control requests are handled first: a status request (1 = CMD_STATUS,
        2 = CMD_SUPERSTATUS), a stop request and a resume request, each
        cleared once its character has been handed to ``_send_char``.

        Afterwards, if unsent data remains in ``tx_buffer``, sending is not
        paused and the tracked free space of the firmware buffer exceeds
        ``TX_CHUNK_SIZE``, one chunk is written with every character
        duplicated. ``tx_pos`` and ``firmbuf_used`` only advance when the
        write reported the complete doubled length, so a failed chunk is
        picked up again by a later call. Once the buffer has been sent
        completely the job state is reset.
        """
        ### sending super commands (handled in serial rx interrupt)
        if self.request_status == 1:
            self._send_char(CMD_STATUS)
            self.request_status = 0
        elif self.request_status == 2:
            self._send_char(CMD_SUPERSTATUS)
            self.request_status = 0

        if self.request_stop:
            self._send_char(CMD_STOP)
            self.request_stop = False

        if self.request_resume:
            self._send_char(CMD_RESUME)
            self.firmbuf_used = 0  # a resume resets the hardware's rx buffer
            self.request_resume = False
            self.reset_status()
            self.request_status = 2  # super request
        ### send buffer chunk
        if self.tx_buffer and len(self.tx_buffer) > self.tx_pos:
            if not self._paused:
                # only send when the firmware buffer is tracked to have room
                # for more than one chunk
                if (self.FIRMBUF_SIZE - self.firmbuf_used) > self.TX_CHUNK_SIZE:
                    try:
                        # to_send = ''.join(islice(self.tx_buffer, 0, self.TX_CHUNK_SIZE))
                        to_send = self.tx_buffer[self.tx_pos:self.tx_pos+self.TX_CHUNK_SIZE]
                        expectedSent = len(to_send)
                        # by protocol duplicate every char
                        to_send_double = []
                        for c in to_send:
                            to_send_double.append(c)
                            to_send_double.append(c)
                        to_send = ''.join(to_send_double)
                        #
                        t_prewrite = time.time()
                        actuallySent = self.device.write(to_send)
                        # the doubled chunk must have gone out completely,
                        # otherwise nothing is counted as sent
                        if actuallySent != expectedSent*2:
                            print "ERROR: write did not complete"
                            assumedSent = 0
                        else:
                            assumedSent = expectedSent
                            self.firmbuf_used += assumedSent
                            if self.firmbuf_used > self.FIRMBUF_SIZE:
                                print "ERROR: firmware buffer tracking too high"
                        if time.time() - t_prewrite > 0.1:
                            print "WARN: write delay 1"
                    except serial.SerialTimeoutException:
                        # timed out: count nothing as sent so tx_pos stays put
                        assumedSent = 0
                        print "ERROR: writeTimeoutError 2"
                    self.tx_pos += assumedSent
        else:
            if self.tx_buffer:  # job finished sending
                self.job_size = 0
                self.tx_buffer = []
                self.tx_pos = 0
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def _send_char(self, char):
        """Write ``char`` twice in a row to the device.

        A write timeout is not raised to the caller; it is only reported on
        stdout.
        """
        try:
            started = time.time()
            self.device.write(char * 2)  # by protocol send twice
            elapsed = time.time() - started
            if elapsed > 0.1:
                # slow write; the "write delay 2" warning is switched off
                pass
        except serial.SerialTimeoutException:
            print "ERROR: writeTimeoutError 1"





###########################################################################
### API ###################################################################
###########################################################################


# def find_controller(baudrate=conf['baudrate'], verbose=True):
#     if os.name == 'posix':
#         iterator = sorted(serial.tools.list_ports.grep('tty'))
#         for port, desc, hwid in iterator:
#             # print "Looking for controller on port: " + port
#             try:
#                 s = serial.Serial(port=port, baudrate=baudrate, timeout=2.0)
#                 lasaur_hello = s.read(8)
#                 if lasaur_hello.find(INFO_HELLO) > -1:
#                     return port
#                 s.close()
#             except serial.SerialException:
#                 pass
#     else:
#         # windows hack because pyserial does not enumerate USB-style com ports
#         if verbose:
#             print "Trying to find controller ..."
#         for i in range(24):
#             try:
#                 s = serial.Serial(port=i, baudrate=baudrate, timeout=2.0)
#                 lasaur_hello = s.read(8)
#                 if lasaur_hello.find(INFO_HELLO) > -1:
#                     return s.portstr
#                 s.close()
#             except serial.SerialException:
#                 pass
#     if verbose:
#         print "ERROR: No controller found."
#     return None
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def _do_send_without_checksum(self, cmd):
        """Write ``cmd`` plus a trailing newline to the serial port.

        The command is written in a loop until the port has reported all of
        it as written. A write that returns ``None`` or a non-int is treated
        as having written everything. A ``SerialTimeoutException`` is retried
        once; any other error (or a failed retry) closes the connection with
        an error unless the connection is already closing.

        If more than ``self._max_write_passes`` consecutive passes write
        nothing, the connection is closed with an error. The counter is reset
        whenever a pass makes progress, so isolated empty passes spread over
        a long command no longer add up to a false failure.

        Does nothing when no serial connection exists.
        """
        if self._serial is None:
            return

        self._log("Send: " + str(cmd))

        cmd += "\n"
        total = len(cmd)

        def write_chunk(chunk):
            # Write one chunk and return how many bytes to count as written.
            result = self._serial.write(chunk)
            if result is None or not isinstance(result, int):
                # probably some plugin not returning the written bytes, assuming all of them
                return total
            return result

        def fail_unexpected():
            # Log the exception currently being handled and close with an error.
            if not self._connection_closing:
                self._logger.exception("Unexpected error while writing to serial port")
                self._log("Unexpected error while writing to serial port: %s" % (get_exception_string()))
                self._errorValue = get_exception_string()
                self.close(is_error=True)

        written = 0
        passes = 0
        while written < total:
            to_send = cmd[written:]
            old_written = written

            # NOTE: the catch-all handlers are kept deliberately broad, as in
            # the original: any failure while writing must end in close().
            try:
                written += write_chunk(to_send)
            except serial.SerialTimeoutException:
                self._log("Serial timeout while writing to serial port, trying again.")
                try:
                    written += write_chunk(to_send)
                except:
                    fail_unexpected()
                    break
            except:
                fail_unexpected()
                break

            if old_written != written:
                # progress was made, so the run of empty passes is over
                passes = 0
                continue

            # nothing written this pass
            passes += 1
            if passes > self._max_write_passes:
                # nothing written in max consecutive passes, we give up
                message = "Could not write anything to the serial port in {} tries, something appears to be wrong with the printer communication".format(self._max_write_passes)
                self._logger.error(message)
                self._log(message)
                self._errorValue = "Could not write to serial port"
                self.close(is_error=True)
                break

    ##~~ command handlers
项目:pyshtrih    作者:oleg-golovanov    | 项目源码 | 文件源码
def command_nopass(self, cmd, params=bytearray()):
        """
        Send a command frame to the device and return the handled response.

        The frame consists of STX, a size field covering the command code and
        the parameters, the command code itself, the parameters and a
        checksum computed with ``misc.lrc`` over everything after STX.

        The frame is sent for the first truthy result yielded by
        ``self.check(self.CHECK_NUM)``, at most ``self.MAX_ATTEMPTS`` times,
        until the device answers with ACK.

        :type cmd: int
        :param cmd: command number
        :type params: bytearray
        :param params: command parameters

        :rtype: dict
        :return: result of ``self.handle_response()``
        :raises TypeError: if ``params`` is not a bytearray
        :raises excepts.NoConnectionError: if no check succeeds or no attempt
            is answered with ACK
        :raises excepts.ProtocolError: on a serial timeout or serial error
        """

        if not isinstance(params, bytearray):
            raise TypeError(u'{} expected, got {} instead'.format(bytearray, type(params)))

        cmd_len = len(misc.int_to_bytes(cmd))
        payload = misc.bytearray_concat(
            misc.CAST_SIZE['1'](cmd_len + len(params)),
            misc.CAST_CMD[cmd_len](cmd),
            params
        )
        checksum = misc.CAST_SIZE['1'](misc.lrc(payload))
        frame = misc.bytearray_concat(STX, payload, checksum)

        for ready in self.check(self.CHECK_NUM):
            if not ready:
                continue

            for _ in xrange(self.MAX_ATTEMPTS):
                try:
                    self.serial.write(frame)
                    if self.serial.read() == ACK:
                        return self.handle_response()

                except serial.SerialTimeoutException:
                    self.serial.flushOutput()
                    # NOTE(review): the message text below is mis-encoded in
                    # this copy of the source; it is reproduced unchanged.
                    raise excepts.ProtocolError(u'?? ??????? ???????? ???? ? ???')
                except serial.SerialException as exc:
                    self.serial.flushInput()
                    raise excepts.ProtocolError(unicode(exc))

            # every attempt went unanswered
            raise excepts.NoConnectionError()

        # no check result was truthy
        raise excepts.NoConnectionError()
项目:pytelemetrycli    作者:Overdrivr    | 项目源码 | 文件源码
def do_pub(self, arg):
        """
Publishes a (value | string) on <topic>.

Usage: pub (--u8 | --u16 | --u32 | --i8 | --i16 | --i32 | --f32 | --s) <topic> <value>
        """
        # NOTE(review): the docstring above looks like a docopt usage spec
        # that is parsed at runtime, so it is left untouched - confirm.
        #
        # `arg` maps the type flags, '<topic>' and '<value>' to their parsed
        # values. '<value>' is converted in place: to float for --f32, to int
        # for the integer flags, and left as is for --s. A value that cannot
        # be converted aborts the command with a message instead of raising.

        def abort(reason):
            # Tell the user and the log why the value was rejected.
            self.stdout.write(reason + "\n")
            logger.warning(reason)

        raw_value = arg['<value>']

        if arg['--f32']:
            try:
                arg['<value>'] = float(raw_value)
            except ValueError:
                abort("Aborted : Could not convert ({0}) to a number.".format(raw_value))
                return
        elif not arg['--s']:
            try:
                arg['<value>'] = int(raw_value)
            except ValueError:
                # User most likely entered a float with an integer flag
                try:
                    inter = float(raw_value)
                    rounded = int(inter)
                except (ValueError, OverflowError):
                    # not a number at all, or inf/nan which have no int value
                    abort("Aborted : Could not convert ({0}) to a number.".format(raw_value))
                    return

                if isclose(inter, rounded):
                    arg['<value>'] = rounded
                else:
                    abort("Aborted : Wrote decimal number ({0}) with integer flag.".format(raw_value))
                    return

        # Look up the payload type belonging to the flag that was given
        valtype = None
        for flag in ("--u8", "--u16", "--u32", "--i8", "--i16", "--i32", "--f32", "--s"):
            if arg[flag]:
                valtype = self.types_lookup[flag]

        if not valtype:
            logger.error(
                "Payload type [{0}] unkown."
                .format(arg))
            return

        try:
            self.telemetry.publish(arg['<topic>'], arg['<value>'], valtype)
        except SerialTimeoutException as e:
            self.stdout.write("Pub failed. Connection most likely terminated.")
            logger.error("Pub failed. Connection most likely terminated. exception : %s" % e)
            return
        except AttributeError as e:
            self.stdout.write("Pub failed because you are not connected to any device. Connect first using `serial` command.")
            logger.warning("Trying to publish while not connected. exception : %s" % e)
            return

        s = "Published on topic '{0}' : {1} [{2}]".format(arg['<topic>'], arg['<value>'], valtype)
        self.stdout.write(s + "\n")
        logger.info(s)