Python serial 模块,to_bytes() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.to_bytes()

项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def encode(self, input, final=False):
        """Incrementally convert hex text into bytes, two hex digits per byte.

        A first digit that is still waiting for its partner is carried over
        to the next call in ``self.state``. ``final`` is accepted but not
        used here.

        Raises UnicodeError when ``self.errors`` is 'strict' and either a
        space follows a single unpaired digit or a character is neither a
        hex digit nor a space. With any other setting such input is skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in input.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: %r' % ch)
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def encode(self, input, final=False):
        """Incrementally convert hex text into bytes, two hex digits per byte.

        A first digit that is still waiting for its partner is carried over
        to the next call in ``self.state``. ``final`` is accepted but not
        used here.

        Raises UnicodeError when ``self.errors`` is 'strict' and either a
        space follows a single unpaired digit or a character is neither a
        hex digit nor a space. With any other setting such input is skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in input.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: %r' % ch)
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def encode(self, data, final=False):
        """\
        Incremental encode: collect hex digits and emit one byte for every
        completed pair. A first digit without its partner is kept in
        ``self.state`` for the next call. ``final`` is accepted but not used.

        With ``self.errors`` set to 'strict' a UnicodeError is raised for a
        space that follows a single unpaired digit and for any character
        that is neither a hex digit nor a space; otherwise such input is
        skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in data.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: {!r}'.format(ch))
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def encode(self, data, final=False):
        """\
        Incremental encode: collect hex digits and emit one byte for every
        completed pair. A first digit without its partner is kept in
        ``self.state`` for the next call. ``final`` is accepted but not used.

        With ``self.errors`` set to 'strict' a UnicodeError is raised for a
        space that follows a single unpaired digit and for any character
        that is neither a hex digit nor a space; otherwise such input is
        skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in data.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: {!r}'.format(ch))
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def encode(self, input, final=False):
        """Incrementally convert hex text into bytes, two hex digits per byte.

        A first digit that is still waiting for its partner is carried over
        to the next call in ``self.state``. ``final`` is accepted but not
        used here.

        Raises UnicodeError when ``self.errors`` is 'strict' and either a
        space follows a single unpaired digit or a character is neither a
        hex digit nor a space. With any other setting such input is skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in input.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: %r' % ch)
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def encode(self, data, final=False):
        """\
        Incremental encode: collect hex digits and emit one byte for every
        completed pair. A first digit without its partner is kept in
        ``self.state`` for the next call. ``final`` is accepted but not used.

        With ``self.errors`` set to 'strict' a UnicodeError is raised for a
        space that follows a single unpaired digit and for any character
        that is neither a hex digit nor a space; otherwise such input is
        skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in data.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: {!r}'.format(ch))
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def encode(self, data, final=False):
        """\
        Incremental encode: collect hex digits and emit one byte for every
        completed pair. A first digit without its partner is kept in
        ``self.state`` for the next call. ``final`` is accepted but not used.

        With ``self.errors`` set to 'strict' a UnicodeError is raised for a
        space that follows a single unpaired digit and for any character
        that is neither a hex digit nor a space; otherwise such input is
        skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in data.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: {!r}'.format(ch))
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def handle_socket_read(self):
        """Receive up to 1024 bytes from the socket and queue them in buffer_net2ser.

        An empty read is treated as a disconnect. socket.error raised while
        handling the read is passed on to handle_socket_error().
        """
        try:
            chunk = self.socket.recv(1024)
            if not chunk:
                # empty read indicates disconnection
                self.handle_disconnect()
                return
            if self.rfc2217:
                # run the data through the RFC 2217 filter when enabled
                chunk = serial.to_bytes(self.rfc2217.filter(chunk))
            self.buffer_net2ser += chunk
        except socket.error:
            self.handle_socket_error()
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def encode(self, data, final=False):
        """\
        Incremental encode: collect hex digits and emit one byte for every
        completed pair. A first digit without its partner is kept in
        ``self.state`` for the next call. ``final`` is accepted but not used.

        With ``self.errors`` set to 'strict' a UnicodeError is raised for a
        space that follows a single unpaired digit and for any character
        that is neither a hex digit nor a space; otherwise such input is
        skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in data.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: {!r}'.format(ch))
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def encode(self, data, final=False):
        """\
        Incremental encode: collect hex digits and emit one byte for every
        completed pair. A first digit without its partner is kept in
        ``self.state`` for the next call. ``final`` is accepted but not used.

        With ``self.errors`` set to 'strict' a UnicodeError is raised for a
        space that follows a single unpaired digit and for any character
        that is neither a hex digit nor a space; otherwise such input is
        skipped.
        """
        pending = self.state    # 0, or 0x100 + (high nibble << 4) once a digit is buffered
        out = []
        for ch in data.upper():
            if ch in HEXDIGITS:
                nibble = HEXDIGITS.index(ch)
                if not pending:
                    # first digit of a pair; the 0x100 marker keeps the state truthy for '0'
                    pending = 0x100 + (nibble << 4)
                    continue
                out.append(nibble + (pending & 0xf0))
                pending = 0
                continue
            if ch != ' ':
                if self.errors == 'strict':
                    raise UnicodeError('non-hex digit found: {!r}'.format(ch))
                continue
            # a space separates values; a dangling single digit is dropped unless strict
            if pending and self.errors == 'strict':
                raise UnicodeError('odd number of hex digits')
            pending = 0
        self.state = pending
        return serial.to_bytes(out)
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def reader(self):
        """Loop while self.alive and copy serial -> socket.

        Reads one byte (blocking), then whatever else is already waiting,
        escapes the data through self.rfc2217 and sends it over the socket
        while holding self._write_lock. A socket.error is logged and ends
        the loop; self.alive is cleared on exit.
        """
        self.log.debug('reader thread started')
        while self.alive:
            try:
                data = self.serial.read(1)              # read one, blocking
                n = self.serial.inWaiting()             # look if there is more
                if n:
                    data = data + self.serial.read(n)   # and get as much as possible
                if data:
                    # escape outgoing data when needed (Telnet IAC (0xff) character)
                    data = serial.to_bytes(self.rfc2217.escape(data))
                    self._write_lock.acquire()
                    try:
                        self.socket.sendall(data)       # send it over TCP
                    finally:
                        self._write_lock.release()
            # "except X as name" is valid on Python 2.6+ and Python 3; the
            # former "except X, name" form is a SyntaxError on Python 3
            except socket.error as msg:
                self.log.error('%s' % (msg,))
                # probably got disconnected
                break
        self.alive = False
        self.log.debug('reader thread terminated')
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def handle_socket_read(self):
        """Receive up to 1024 bytes from the socket and queue them in buffer_net2ser.

        An empty read is treated as a disconnect. socket.error raised while
        handling the read is passed on to handle_socket_error().
        """
        try:
            received = self.socket.recv(1024)
            if not received:
                # empty read indicates disconnection
                self.handle_disconnect()
                return
            if self.rfc2217:
                # run the data through the RFC 2217 filter when enabled
                received = serial.to_bytes(self.rfc2217.filter(received))
            self.buffer_net2ser += received
        except socket.error:
            self.handle_socket_error()
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def hex_encode(input, errors='strict'):
    """Convert whitespace separated hex values, e.g. '40 41 42', to bytes.

    Returns a tuple of the encoded bytes and the length of ``input``.
    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in input.split()]
    return serial.to_bytes(values), len(input)
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def encode(self, input, errors='strict'):
        """Convert whitespace separated hex values, e.g. '40 41 42', to bytes.

        ``errors`` is accepted but not used.
        """
        values = [int(token, 16) for token in input.split()]
        return serial.to_bytes(values)
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def hex_encode(input, errors='strict'):
    """Convert whitespace separated hex values, e.g. '40 41 42', to bytes.

    Returns a tuple of the encoded bytes and the length of ``input``.
    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in input.split()]
    return serial.to_bytes(values), len(input)
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def encode(self, input, errors='strict'):
        """Convert whitespace separated hex values, e.g. '40 41 42', to bytes.

        ``errors`` is accepted but not used.
        """
        values = [int(token, 16) for token in input.split()]
        return serial.to_bytes(values)
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def hex_encode(data, errors='strict'):
    """'40 41 42' -> (b'@AB', 8): encoded bytes plus the length of ``data``.

    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in data.split()]
    return serial.to_bytes(values), len(data)
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def encode(self, data, errors='strict'):
        """'40 41 42' -> b'@AB' (``errors`` is accepted but not used)"""
        values = [int(token, 16) for token in data.split()]
        return serial.to_bytes(values)
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def hex_encode(data, errors='strict'):
    """'40 41 42' -> (b'@AB', 8): encoded bytes plus the length of ``data``.

    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in data.split()]
    return serial.to_bytes(values), len(data)
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def encode(self, data, errors='strict'):
        """'40 41 42' -> b'@AB' (``errors`` is accepted but not used)"""
        values = [int(token, 16) for token in data.split()]
        return serial.to_bytes(values)
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def hex_encode(input, errors='strict'):
    """Convert whitespace separated hex values, e.g. '40 41 42', to bytes.

    Returns a tuple of the encoded bytes and the length of ``input``.
    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in input.split()]
    return serial.to_bytes(values), len(input)
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def encode(self, input, errors='strict'):
        """Convert whitespace separated hex values, e.g. '40 41 42', to bytes.

        ``errors`` is accepted but not used.
        """
        values = [int(token, 16) for token in input.split()]
        return serial.to_bytes(values)
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def hex_encode(data, errors='strict'):
    """'40 41 42' -> (b'@AB', 8): encoded bytes plus the length of ``data``.

    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in data.split()]
    return serial.to_bytes(values), len(data)
项目:ddt4all    作者:cedricp    | 项目源码 | 文件源码
def encode(self, data, errors='strict'):
        """'40 41 42' -> b'@AB' (``errors`` is accepted but not used)"""
        values = [int(token, 16) for token in data.split()]
        return serial.to_bytes(values)
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def hex_encode(data, errors='strict'):
    """'40 41 42' -> (b'@AB', 8): encoded bytes plus the length of ``data``.

    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in data.split()]
    return serial.to_bytes(values), len(data)
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def encode(self, data, errors='strict'):
        """'40 41 42' -> b'@AB' (``errors`` is accepted but not used)"""
        values = [int(token, 16) for token in data.split()]
        return serial.to_bytes(values)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def reader(self):
        """Copy serial -> socket for as long as self.alive is set.

        Each pass reads whatever is waiting on the serial port (at least one
        byte is requested), escapes it through self.rfc2217 and hands it to
        self.write(). A socket.error is logged and ends the loop; self.alive
        is cleared on exit.
        """
        self.log.debug('reader thread started')
        while self.alive:
            try:
                wanted = self.serial.in_waiting or 1
                chunk = self.serial.read(wanted)
                if not chunk:
                    continue
                # escape outgoing data when needed (Telnet IAC (0xff) character)
                escaped = self.rfc2217.escape(chunk)
                self.write(serial.to_bytes(escaped))
            except socket.error as err:
                self.log.error('{}'.format(err))
                # probably got disconnected
                break
        self.alive = False
        self.log.debug('reader thread terminated')
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def writer(self):
        """Copy socket -> serial for as long as self.alive is set.

        Received data is passed through the self.rfc2217 filter before it is
        written to the serial port. An empty read or a socket.error (which
        is logged) ends the loop; self.stop() is always called afterwards.
        """
        while self.alive:
            try:
                chunk = self.socket.recv(1024)
                if chunk:
                    filtered = self.rfc2217.filter(chunk)
                    self.serial.write(serial.to_bytes(filtered))
                    continue
                break
            except socket.error as err:
                self.log.error('{}'.format(err))
                # probably got disconnected
                break
        self.stop()
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def handle_serial_read(self):
        """Read up to 1024 bytes from the serial port's file descriptor.

        The data is appended to buffer_ser2net only while a client socket is
        set, after RFC 2217 escaping when that is enabled. An empty read or
        any exception is reported through handle_serial_error().
        """
        try:
            chunk = os.read(self.serial.fileno(), 1024)
            if not chunk:
                self.handle_serial_error()
                return
            if self.socket is None:
                # nobody connected: the data is discarded
                return
            if self.rfc2217:
                # escape outgoing data when needed (Telnet IAC (0xff) character)
                chunk = serial.to_bytes(self.rfc2217.escape(chunk))
            self.buffer_ser2net += chunk
        except Exception as msg:
            self.handle_serial_error(msg)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def test_to_bytes(self):
        """to_bytes() accepts int lists, bytes and bytearray but rejects text."""
        expected = b'\x01\x02\x03'
        for value in ([1, 2, 3], b'\x01\x02\x03', bytearray([1, 2, 3])):
            self.assertEqual(serial.to_bytes(value), expected)
        # text (unicode) input is not supported. decode() is used instead of
        # the u'' syntax to stay compatible with Python 3.x < 3.4
        text = b'hello'.decode('utf-8')
        self.assertRaises(TypeError, serial.to_bytes, text)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def test_readlines(self):
        """readlines() splits the written data into its three lines"""
        payload = [0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]    # "1\n2\n3\n"
        self.s.write(serial.to_bytes(payload))
        expected = [serial.to_bytes([digit, 0x0a]) for digit in (0x31, 0x32, 0x33)]
        self.assertEqual(self.s.readlines(), expected)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def test_xreadlines(self):
        """xreadlines() yields the three written lines (skipped for io based systems)"""
        if not hasattr(self.s, 'xreadlines'):
            return
        payload = [0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]    # "1\n2\n3\n"
        self.s.write(serial.to_bytes(payload))
        expected = [serial.to_bytes([digit, 0x0a]) for digit in (0x31, 0x32, 0x33)]
        self.assertEqual(list(self.s.xreadlines()), expected)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def test_for_in(self):
        """Iterating over the port yields the three written lines"""
        payload = [0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]    # "1\n2\n3\n"
        self.s.write(serial.to_bytes(payload))
        received = [line for line in self.s]
        expected = [serial.to_bytes([digit, 0x0a]) for digit in (0x31, 0x32, 0x33)]
        self.assertEqual(received, expected)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def test_alternate_eol(self):
        """Test readline with alternative eol settings (skipped for io based systems)"""
        if hasattr(self.s, 'xreadlines'):  # test if it is our FileLike base class
            # bytes literals: text strings are rejected by to_bytes() with a
            # TypeError on Python 3, while b"..." is accepted on Python 2 and 3
            self.s.write(serial.to_bytes(b"no\rno\nyes\r\n"))
            self.assertEqual(
                    self.s.readline(eol=serial.to_bytes(b"\r\n")),
                    serial.to_bytes(b"no\rno\nyes\r\n"))
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def hex_encode(data, errors='strict'):
    """'40 41 42' -> (b'@AB', 8): encoded bytes plus the length of ``data``.

    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in data.split()]
    return serial.to_bytes(values), len(data)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def hex_encode(data, errors='strict'):
    """'40 41 42' -> (b'@AB', 8): encoded bytes plus the length of ``data``.

    ``errors`` is accepted but not used.
    """
    values = [int(token, 16) for token in data.split()]
    return serial.to_bytes(values), len(data)
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def encode(self, data, errors='strict'):
        """'40 41 42' -> b'@AB' (``errors`` is accepted but not used)"""
        values = [int(token, 16) for token in data.split()]
        return serial.to_bytes(values)
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def writer(self):
        """Loop while self.alive and copy socket -> serial.

        Received data is passed through the self.rfc2217 filter before it is
        written to the serial port. An empty read or a socket.error (which
        is logged) ends the loop; self.stop() is always called afterwards.
        """
        while self.alive:
            try:
                data = self.socket.recv(1024)
                if not data:
                    break
                self.serial.write(serial.to_bytes(self.rfc2217.filter(data)))
            # "except X as name" is valid on Python 2.6+ and Python 3; the
            # former "except X, name" form is a SyntaxError on Python 3
            except socket.error as msg:
                self.log.error('%s' % (msg,))
                # probably got disconnected
                break
        self.stop()
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def handle_serial_read(self):
        """Read up to 1024 bytes from the serial port's file descriptor.

        The data is appended to buffer_ser2net only while a client socket is
        set, after RFC 2217 escaping when that is enabled. An empty read or
        any exception is reported through handle_serial_error().
        """
        try:
            data = os.read(self.serial.fileno(), 1024)
            if data:
                # store data in buffer if there is a client connected
                if self.socket is not None:
                    # escape outgoing data when needed (Telnet IAC (0xff) character)
                    if self.rfc2217:
                        data = serial.to_bytes(self.rfc2217.escape(data))
                    self.buffer_ser2net += data
            else:
                self.handle_serial_error()
        # "except X as name" is valid on Python 2.6+ and Python 3; the
        # former "except X, name" form is a SyntaxError on Python 3
        except Exception as msg:
            self.handle_serial_error(msg)
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def test_readline(self):
        """Test readline method"""
        # bytes literals are identical to the former str literals on Python 2
        # and keep to_bytes() working on Python 3; assertEqual replaces the
        # deprecated failUnlessEqual alias (removed in Python 3.12)
        self.s.write(serial.to_bytes(b"1\n2\n3\n"))
        self.assertEqual(self.s.readline(), serial.to_bytes(b"1\n"))
        self.assertEqual(self.s.readline(), serial.to_bytes(b"2\n"))
        self.assertEqual(self.s.readline(), serial.to_bytes(b"3\n"))
        # this time we will get a timeout
        self.assertEqual(self.s.readline(), serial.to_bytes(b""))
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def test_xreadlines(self):
        """Test xreadlines method (skipped for io based systems)"""
        if hasattr(self.s, 'xreadlines'):
            # bytes literals are identical to the former str literals on
            # Python 2 and keep to_bytes() working on Python 3; assertEqual
            # replaces the deprecated failUnlessEqual alias
            self.s.write(serial.to_bytes(b"1\n2\n3\n"))
            self.assertEqual(
                    list(self.s.xreadlines()),
                    [serial.to_bytes(b"1\n"), serial.to_bytes(b"2\n"), serial.to_bytes(b"3\n")]
                    )
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def test_for_in(self):
        """Test for line in s"""
        # bytes literals are identical to the former str literals on Python 2
        # and keep to_bytes() working on Python 3; assertEqual replaces the
        # deprecated failUnlessEqual alias (removed in Python 3.12)
        self.s.write(serial.to_bytes(b"1\n2\n3\n"))
        lines = []
        for line in self.s:
            lines.append(line)
        self.assertEqual(
                lines,
                [serial.to_bytes(b"1\n"), serial.to_bytes(b"2\n"), serial.to_bytes(b"3\n")]
                )
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def test_alternate_eol(self):
        """Test readline with alternative eol settings (skipped for io based systems)"""
        if hasattr(self.s, 'xreadlines'): # test if it is our FileLike base class
            # bytes literals are identical to the former str literals on
            # Python 2 and keep to_bytes() working on Python 3; assertEqual
            # replaces the deprecated failUnlessEqual alias
            self.s.write(serial.to_bytes(b"no\rno\nyes\r\n"))
            self.assertEqual(
                    self.s.readline(eol=serial.to_bytes(b"\r\n")),
                    serial.to_bytes(b"no\rno\nyes\r\n"))
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def yatta_txData(dataBuff):
        """Write dataBuff to the module-level uart, but only while yatta_mode equals 0."""
        if not (yatta_mode == 0):
                return
        payload = serial.to_bytes(dataBuff)
        uart.write(payload)
项目:grid-control    作者:akej74    | 项目源码 | 文件源码
def initialize_grid(ser, lock):
    """Initialize the Grid by sending 0xC0 and checking for the reply 0x21.

    The input and output buffers of ``ser`` are reset first and the whole
    exchange runs while ``lock`` is held.

    Returns:
        - True when the first response byte is 0x21
        - False when the response is empty or different (an error is shown
          through helper.show_error in both cases)

    Any exception raised during the exchange is shown through
    helper.show_error and the process then exits via sys.exit(0).
    """
    expected_reply = 0x21

    try:
        with lock:
            # Flush input and output buffers
            ser.reset_input_buffer()
            ser.reset_output_buffer()

            # Send the initialization byte
            ser.write(serial.to_bytes([0xC0]))

            # Wait before checking response
            time.sleep(WAIT_GRID)

            reply = ser.read(size=1)

            # No response (0 bytes) from the Grid
            if not reply:
                helper.show_error("Problem initializing the Grid unit.\n\n"
                                  "Response 0x21 expected, no response received.\n\n"
                                  "Please check serial port " + ser.port + ".\n")
                return False

            if reply[0] == expected_reply:
                print("Grid initialized")
                return True

            # Some other byte was received
            helper.show_error("Problem initializing the Grid unit.\n\n"
                              "Response 0x21 expected, got " + hex(ord(reply)) + ".\n\n"
                              "Please check serial port " + ser.port + ".\n")
            return False

    except Exception as e:
        helper.show_error("Problem initializing the Grid unit.\n\n"
                          "Exception:\n" + str(e) + "\n\n"
                          "The application will now exit.")
        sys.exit(0)
项目:grid-control    作者:akej74    | 项目源码 | 文件源码
def set_fan(ser, fan, voltage, lock):
    """Send the command that sets the voltage of one fan.

    ``fan`` must be 1-6 and ``voltage`` must be 0 or a value from 4.0 to
    12.0 in 0.5 steps; anything else raises KeyError before the serial port
    is touched. 0 maps to the data bytes 00 00.

    The command is written while ``lock`` is held and one response byte is
    read back without being checked. Any exception during the exchange is
    shown through helper.show_error and the process exits via sys.exit(0).
    """
    # Supported voltages -> [<voltage integer>, <voltage decimal>] data bytes
    speed_data = {
        0: [0x00, 0x00],
        4.0: [0x04, 0x00],
        4.5: [0x04, 0x50],
        5.0: [0x05, 0x00],
        5.5: [0x05, 0x50],
        6.0: [0x06, 0x00],
        6.5: [0x06, 0x50],
        7.0: [0x07, 0x00],
        7.5: [0x07, 0x50],
        8.0: [0x08, 0x00],
        8.5: [0x08, 0x50],
        9.0: [0x09, 0x00],
        9.5: [0x09, 0x50],
        10.0: [0x0A, 0x00],
        10.5: [0x0A, 0x50],
        11.0: [0x0B, 0x00],
        11.5: [0x0B, 0x50],
        12.0: [0x0C, 0x00],
    }

    # Fan number -> fan id byte
    fan_data = {1: 0x01, 2: 0x02, 3: 0x03, 4: 0x04, 5: 0x05, 6: 0x06}

    # Seven byte command: 44 <fan id> C0 00 00 <voltage integer> <voltage decimal>
    # e.g. 7.5V for fan 1 is 44 01 C0 00 00 07 50
    header = [0x44, fan_data[fan], 0xC0, 0x00, 0x00]
    command = header + speed_data[voltage]

    try:
        with lock:
            ser.write(serial.to_bytes(command))
            time.sleep(WAIT_GRID)

            # TODO: Check response
            # One response byte is expected; it is read but not inspected
            ser.read(size=1)
            print("Fan " + str(fan) + " updated")
    except Exception as e:
        helper.show_error("Could not set speed for fan " + str(fan) + ".\n\n"
                          "Please check settings for serial port " + str(ser.port) + ".\n\n"
                          "Exception:\n" + str(e) + "\n\n"
                          "The application will now exit.")
        sys.exit(0)
项目:grid-control    作者:akej74    | 项目源码 | 文件源码
def read_fan_rpm(ser, lock):
    """Reads the current rpm of each fan.

    For each of the six fans the command 8A <fan id> is written and a five
    byte response is read while ``lock`` is held.

    Returns:
        - If success: A list with rpm data for each fan
        - If failure to read data (no response, a response shorter than
          five bytes or an unexpected header): An empty list

    Any exception during the exchange is shown through helper.show_error
    and the process exits via sys.exit(0).
    """

    # List to hold fan rpm data to be returned
    fans = []

    with lock:
        for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]:
            try:
                # Define bytes to be sent to the Grid for reading rpm for a specific fan
                # Format is two bytes:
                # 8A <fan id>
                serial_data = [0x8A, fan]

                ser.reset_output_buffer()
                # TODO: Check bytes written
                ser.write(serial.to_bytes(serial_data))

                # Wait before checking response
                time.sleep(WAIT_GRID)

                # Expected response is 5 bytes
                # Example response: C0 00 00 03 00 = 0x0300 = 768 rpm (two bytes unsigned)
                response = ser.read(size=5)

                # No response, or a partial one: treat it as a failed read.
                # Indexing a short response below would raise IndexError and
                # end up in the exit-the-application handler.
                if len(response) < 5:
                    return []

                # Check for correct response, first three bytes should be C0 00 00
                if response[0] == 0xC0 and response[1] == response[2] == 0x00:
                    # Convert rpm from 2-bytes unsigned value to decimal
                    rpm = response[3] * 256 + response[4]
                    fans.append(rpm)

                # An incorrect response was received, return an empty list
                else:
                    return []

            except Exception as e:
                helper.show_error("Could not read rpm for fan " + str(fan) + ".\n\n"
                                  "Please check serial port settings.\n\n"
                                  "Exception:\n" + str(e) + "\n\n"
                                  "The application will now exit.")
                print(str(e))
                sys.exit(0)

        return fans
项目:grid-control    作者:akej74    | 项目源码 | 文件源码
def read_fan_voltage(ser, lock):
    """Reads the current voltage of each fan.

    For each of the six fans the command 84 <fan id> is written and a five
    byte response is read while ``lock`` is held.

    Returns:
        - If success: a list with voltage data for each fan
        - If failure to read data (no response, a response shorter than
          five bytes or an unexpected header): An empty list

    Any exception during the exchange is shown through helper.show_error
    and the process exits via sys.exit(0).
    """

    # List to hold fan voltage data to be returned
    fans = []

    with lock:
        for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]:
            try:
                # Define bytes to be sent to the Grid for reading voltage for a specific fan
                # Format is two bytes, e.g. [0x84, <fan id>]
                serial_data = [0x84, fan]

                ser.reset_output_buffer()
                ser.write(serial.to_bytes(serial_data))

                # Wait before checking response
                time.sleep(WAIT_GRID)

                # Expected response is 5 bytes
                response = ser.read(size=5)

                # In case no response (0 bytes) is returned from the Grid
                if not response:
                    print("Error reading fan voltage, no data returned")
                    return []

                # A partial response is an incorrect response. Indexing it
                # below would raise IndexError and end up in the
                # exit-the-application handler.
                if len(response) < 5:
                    print("Error reading fan voltage, incorrect response")
                    return []

                # Check for correct response (first three bytes must be C0 00 00)
                if response[0] == 0xC0 and response[1] == response[2] == 0x00:
                    # The last two bytes become "<byte 3>.<byte 4>" using their
                    # decimal values, e.g. 0B 01 -> "11.1" -> 11.1
                    # NOTE(review): confirm this matches the Grid's encoding of
                    # the fractional byte
                    voltage = float(str(response[3]) + "." + str(response[4]))
                    # Add the voltage for the current fan to the list
                    fans.append(voltage)

                # An incorrect response was received
                else:
                    print("Error reading fan voltage, incorrect response")
                    return []

            except Exception as e:
                helper.show_error("Could not read fan voltage.\n\n"
                                  "Please check serial port " + ser.port + ".\n\n"
                                  "Exception:\n" + str(e) + "\n\n"
                                  "The application will now exit.")
                print(str(e))
                sys.exit(0)

        return fans
项目:OpenHWControl    作者:kusti8    | 项目源码 | 文件源码
def initialize_grid(ser, lock):
    """Initialize the Grid by sending "0xC0", expected response is "0x21"

    The input and output buffers of ``ser`` are reset first and the whole
    exchange runs while ``lock`` is held.

    Returns:
        - True for successful initialization
        - False otherwise (no response, unexpected response, or an
          exception during the exchange)
    """

    try:
        with lock:
            # Flush input and output buffers
            ser.reset_input_buffer()
            ser.reset_output_buffer()

            # Write data to serial port to initialize the Grid
            ser.write(serial.to_bytes([0xC0]))

            # Wait before checking response
            time.sleep(WAIT_GRID)

            # Read response, one byte = 0x21 is expected for a successful initialization
            response = ser.read(size=1)

            # Check if the Grid responded with any data
            if response:
                # Check for correct response (should be 0x21)
                if response[0] == 0x21:
                    print("Grid initialized")
                    return True

                # Incorrect response received from the grid
                else:
                    helper.show_error("Problem initializing the Grid unit.\n\n"
                                      "Response 0x21 expected, got " + hex(ord(response)) + ".\n\n"
                                      "Please check serial port " + ser.port +".\n")
                    return False

            # In case no response (0 bytes) from the Grid
            else:
                helper.show_error("Problem initializing the Grid unit.\n\n"
                                  "Response 0x21 expected, no response received.\n\n"
                                   "Please check serial port " + ser.port +".\n")
                return False

    except Exception as e:
        # Report the failure instead of swallowing it silently and return the
        # documented False (the former implicit None was falsy as well)
        print("Problem initializing the Grid unit: " + str(e))
        return False
项目:OpenHWControl    作者:kusti8    | 项目源码 | 文件源码
def set_fan(ser, fan, voltage, lock):
    """Send the command that sets the voltage of one fan.

    ``fan`` must be 1-6 and ``voltage`` must be 0 or a value from 4.0 to
    12.0 in 0.5 steps; anything else raises KeyError before the serial port
    is touched. 0 maps to the data bytes 00 00.

    The command is written while ``lock`` is held and one response byte is
    read back without being checked. Any exception during the exchange is
    shown through helper.show_error and the process exits via sys.exit(0).
    """
    # Supported voltages -> [<voltage integer>, <voltage decimal>] data bytes
    speed_data = {
        0: [0x00, 0x00],
        4.0: [0x04, 0x00],
        4.5: [0x04, 0x50],
        5.0: [0x05, 0x00],
        5.5: [0x05, 0x50],
        6.0: [0x06, 0x00],
        6.5: [0x06, 0x50],
        7.0: [0x07, 0x00],
        7.5: [0x07, 0x50],
        8.0: [0x08, 0x00],
        8.5: [0x08, 0x50],
        9.0: [0x09, 0x00],
        9.5: [0x09, 0x50],
        10.0: [0x0A, 0x00],
        10.5: [0x0A, 0x50],
        11.0: [0x0B, 0x00],
        11.5: [0x0B, 0x50],
        12.0: [0x0C, 0x00],
    }

    # Fan number -> fan id byte
    fan_data = {1: 0x01, 2: 0x02, 3: 0x03, 4: 0x04, 5: 0x05, 6: 0x06}

    # Seven byte command: 44 <fan id> C0 00 00 <voltage integer> <voltage decimal>
    # e.g. 7.5V for fan 1 is 44 01 C0 00 00 07 50
    header = [0x44, fan_data[fan], 0xC0, 0x00, 0x00]
    command = header + speed_data[voltage]

    try:
        with lock:
            ser.write(serial.to_bytes(command))
            time.sleep(WAIT_GRID)

            # TODO: Check response
            # One response byte is expected; it is read but not inspected
            ser.read(size=1)
            print("Fan " + str(fan) + " updated")
    except Exception as e:
        helper.show_error("Could not set speed for fan " + str(fan) + ".\n\n"
                          "Please check settings for serial port " + str(ser.port) + ".\n\n"
                          "Exception:\n" + str(e) + "\n\n"
                          "The application will now exit.")
        sys.exit(0)