Python serial 模块,Serial() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用serial.Serial()

项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def find_port(self):
    # Finds the serial port names
    if sys.platform.startswith('win'):
      ports = ['COM%s' % (i+1) for i in range(256)]
    elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
      ports = glob.glob('/dev/ttyUSB*')
    elif sys.platform.startswith('darwin'):
      ports = glob.glob('/dev/tty.usbserial*')
    else:
      raise EnvironmentError('Error finding ports on your operating system')
    openbci_port = ''
    for port in ports:
      try:
        s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout)
        s.write(b'v')
        openbci_serial = self.openbci_id(s)
        s.close()
        if openbci_serial:
          openbci_port = port;
      except (OSError, serial.SerialException):
        pass
    if openbci_port == '':
      raise OSError('Cannot find OpenBCI port')
    else:
      return openbci_port
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def ThreadWorker(contextz,port,baud):
        print("ThreadProcess Start")
        btnState = "btn_a_0"

        obj = serial.Serial(port,baud,timeout=0.1)                         
        buf = b''

        while SerialListener.isActive:
            data = obj.readline()
            if data.__len__() > 0:
                buf += data
                if b'\n' in buf:
                    tmp = buf.decode().strip(' \t\n\r')
                    buf = b''
                    CmdActions.parse(tmp)

        obj.close()    
        print("ThreadProcess End")
        SerialListener.isActive = False
        return     

#====================================================================================
项目:Pyquino    作者:kmolLin    | 项目源码 | 文件源码
def open(self):
        print("i am the", self._port_)
        #self._port_ = 'COM4'
        print(self._baud_)
        #print("the open")
        self._serial_port_ = serial.Serial(self._port_,int(self._baud_))
        print("thread")
        #self._serial_port_.setRTS(self._RTS_)
        #self._serial_port_.setDTR(self._DTR_)
        self._received_thread_ = threading.Thread(target = self.__recv_func__,args=(self,))
        #print("thread")
        self._is_running_ = True
        #print("thread1")
        self._received_thread_.setDaemon(True)
        #print("thread2")
        self._received_thread_.setName("SerialPortRecvThread")
        self._received_thread_.start()
项目:astm-serial    作者:rendiya    | 项目源码 | 文件源码
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10):
        """Initialize internal variables and serial connection
        :param port: The COM port to open. See the documentation for
            `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation
            of the possible value. The default value is '/dev/ttyACM0'.
        :type port: str or int
        :param baudrate: 9600, 19200, 38400 where 9600 is the default
        :type baudrate: int
        """
        # The serial connection should be setup with the following parameters:
        # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware
        # handshake. These are all default for Serial and therefore not input
        # below
        #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1)
        self.serial = serial.Serial(port = port, baudrate=baudrate, 
        timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE)
项目:astm-serial    作者:rendiya    | 项目源码 | 文件源码
def __init__(self, port='/dev/ttyACM0', baudrate=9600,timeout=10):
        """Initialize internal variables and serial connection
        :param port: The COM port to open. See the documentation for
            `pyserial <http://pyserial.sourceforge.net/>`_ for an explanation
            of the possible value. The default value is '/dev/ttyUSB0'.
        :type port: str or int
        :param baudrate: 9600, 19200, 38400 where 9600 is the default
        :type baudrate: int
        """
        # The serial connection should be setup with the following parameters:
        # 1 start bit, 8 data bits, No parity bit, 1 stop bit, no hardware
        # handshake. These are all default for Serial and therefore not input
        # below
        #self.serial = serial.Serial(port=port, baudrate=baudrate, timeout=1)
        self.serial = serial.Serial(port = port, baudrate=baudrate, 
        timeout=timeout, writeTimeout=timeout,stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE)
        self.string = DataHandler()
项目:iot    作者:akademikbilisim    | 项目源码 | 文件源码
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD):
        """Base constructor for ESPLoader bootloader interaction

        Don't call this constructor, either instantiate ESP8266ROM
        or ESP32ROM, or use ESPLoader.detect_chip().

        This base class has all of the instance methods for bootloader
        functionality supported across various chips & stub
        loaders. Subclasses replace the functions they don't support
        with ones which throw NotImplementedInROMError().

        """
        if isinstance(port, serial.Serial):
            self._port = port
        else:
            self._port = serial.serial_for_url(port)
        self._slip_reader = slip_reader(self._port)
        # setting baud rate in a separate step is a workaround for
        # CH341 driver on some Linux versions (this opens at 9600 then
        # sets), shouldn't matter for other platforms/drivers. See
        # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
        self._port.baudrate = baud
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, protocol, deviceNameOrPortNumber, reactor, 
        baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE,
        stopbits = STOPBITS_ONE, timeout = 3, xonxoff = 0, rtscts = 0):
        # do NOT use timeout = 0 !!
        self._serial = serial.Serial(deviceNameOrPortNumber, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, timeout = timeout, xonxoff = xonxoff, rtscts = rtscts)
        javareactor.JConnection.__init__(self, self._serial.sPort, protocol, None)
        self.flushInput()
        self.flushOutput()

        self.reactor = reactor
        self.protocol = protocol
        self.protocol.makeConnection(self)
        wb = javareactor.WriteBlocker(self, reactor.q)
        wb.start()
        self.writeBlocker = wb
        javareactor.ReadBlocker(self, reactor.q).start()
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def write(self, b):
        """Write to port, controlling RTS before and after transmitting."""
        if self._alternate_rs485_settings is not None:
            # apply level for TX and optional delay
            self.setRTS(self._alternate_rs485_settings.rts_level_for_tx)
            if self._alternate_rs485_settings.delay_before_tx is not None:
                time.sleep(self._alternate_rs485_settings.delay_before_tx)
            # write and wait for data to be written
            super(RS485, self).write(b)
            super(RS485, self).flush()
            # optional delay and apply level for RX
            if self._alternate_rs485_settings.delay_before_rx is not None:
                time.sleep(self._alternate_rs485_settings.delay_before_rx)
            self.setRTS(self._alternate_rs485_settings.rts_level_for_rx)
        else:
            super(RS485, self).write(b)

    # redirect where the property stores the settings so that underlying Serial
    # instance does not see them
项目:fygimbal    作者:scanlime    | 项目源码 | 文件源码
def __init__(self, port='/dev/ttyAMA0', baudrate=115200, verbose=True, connected=None):
        self.verbose = verbose
        self.version = None

        self.connectedCV = threading.Condition()
        self.responseQueue = queue.Queue()
        self.port = serial.Serial(port, baudrate=baudrate)
        self._transactionLock = threading.Lock()

        self.tx = self.transmitThreadClass(self.port, verbose=self.verbose)
        self.rx = self.receiverThreadClass(self.port, callback=self._receive, verbose=self.verbose)
        self.rx.start()
        self.tx.start()

        if connected is None:
            self.connected = True
            self.connected = self._testForExistingConnection()
        else:
            self.connected = connected
        if self.verbose:
            if self.connected:
                print("Already connected to gimbal, version %s" % self.version)
            else:
                print("Waiting for gimbal to power on")
项目:OpenXCAccessory    作者:openxc    | 项目源码 | 文件源码
def __init__(self, dev, debug = 0):
        self.name = dev
        self.debug = debug
        if self.debug:
            print (self.name + " init")
        # Work-around for OXM-43: Intermittently Telit devices doesn't exist when system boot up
        attempt = 1
        while (attempt <= xcModemSer.MAX_ATTEMPT):
            try:
                self.freeser = serial.Serial(self.name, 115200, timeout=0, rtscts=0)
            except IOError as e:
                LOG.error("%s %s - Work-around attempt %s" % (self.name, e, attempt))
                cmd = ('xc_modem_gsm.sh -R 0; sleep 3; xc_modem_gsm.sh -R 1; xc_modem_gsm.sh -w "%s"' % dev)
                # LOG.debug("issuing: " + cmd)
                subprocess.call(cmd, shell=True)
                attempt += 1
            else:
                break;
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def write(self, b):
        """Write to port, controlling RTS before and after transmitting."""
        if self._alternate_rs485_settings is not None:
            # apply level for TX and optional delay
            self.setRTS(self._alternate_rs485_settings.rts_level_for_tx)
            if self._alternate_rs485_settings.delay_before_tx is not None:
                time.sleep(self._alternate_rs485_settings.delay_before_tx)
            # write and wait for data to be written
            super(RS485, self).write(b)
            super(RS485, self).flush()
            # optional delay and apply level for RX
            if self._alternate_rs485_settings.delay_before_rx is not None:
                time.sleep(self._alternate_rs485_settings.delay_before_rx)
            self.setRTS(self._alternate_rs485_settings.rts_level_for_rx)
        else:
            super(RS485, self).write(b)

    # redirect where the property stores the settings so that underlying Serial
    # instance does not see them
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def find_finger():
    ps = request.vars.ps
    import serial
    ser = serial.Serial('/dev/ttyACM0', 9600)
    ser.write('5')
    # ser.write(b'5') #Prefixo b necessario se estiver utilizando Python 3.X
    while True:
        line = ser.readline()
        print line
        if "FINGERFOUND" in line:
            id = line.split(",")[1]
            ser.close()
            r = requests.get("http://174.138.34.125:8081/walletapi/customer/%s/" % id)
            obj = json.loads(r.text)
            if ps == obj['password']:
                return r.text
            else:
                return 'error'
项目:adbmirror    作者:fhorinek    | 项目源码 | 文件源码
def __init__(self):
        res = Xlib.display.Display().screen().root.get_geometry()
        self.w = res.width
        self.h = res.height
        self.p = Popen("/usr/bin/xte", stdin=PIPE)

        self.down = False

        self.state = 0

        self.min_x = 63081
        self.max_x = 65449
        self.min_y = 63480
        self.max_y = 65402

        self.simulate = False
        if not self.simulate:
            port = "/dev/ttyS3"
            speed = 38400
            self.serial = serial.Serial(port, speed, timeout=0)
项目:ModbusSim    作者:emdem    | 项目源码 | 文件源码
def __init__(self, mode, port, baud=None, hostname=None, verbose=None):
        self.rtu = None
        self.mode = mode
        if self.mode == 'rtu' and baud and port:
            self.rtu = serial.Serial(port=port, baudrate=baud)
            Simulator.__init__(self, ModbusRtuServer(self.rtu))
            # timeout is too fast for 19200 so increase a little bit
            self.server._serial.timeout *= 2
            self.server._serial.interCharTimeout *= 2
            LOGGER.info('Initializing modbus %s simulator: baud = %d port = %s parity = %s' % (self.mode, baud, port, self.rtu.parity))
            LOGGER.info('stop bits = %d xonxoff = %d' % (self.rtu.stopbits, self.rtu.xonxoff))
        elif self.mode == 'tcp' and hostname and port:
            Simulator.__init__(self, TcpServer(address=hostname, port=port))
            LOGGER.info('Initializing modbus %s simulator: addr = %s port = %s' % (self.mode, hostname, port))
        else:
            raise ModbusSimError('Unknown mode: %s' % (mode))

        self.server.set_verbose(True)
项目:nodemcu-pyflasher    作者:marcelstoer    | 项目源码 | 文件源码
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD, trace_enabled=False):
        """Base constructor for ESPLoader bootloader interaction

        Don't call this constructor, either instantiate ESP8266ROM
        or ESP32ROM, or use ESPLoader.detect_chip().

        This base class has all of the instance methods for bootloader
        functionality supported across various chips & stub
        loaders. Subclasses replace the functions they don't support
        with ones which throw NotImplementedInROMError().

        """
        if isinstance(port, serial.Serial):
            self._port = port
        else:
            self._port = serial.serial_for_url(port)
        self._slip_reader = slip_reader(self._port, self.trace)
        # setting baud rate in a separate step is a workaround for
        # CH341 driver on some Linux versions (this opens at 9600 then
        # sets), shouldn't matter for other platforms/drivers. See
        # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
        self._set_port_baudrate(baud)
        self._trace_enabled = trace_enabled
项目:data-diode    作者:thephez    | 项目源码 | 文件源码
def openserialport():
    '''
    Attempt to open a serial connection using the baud rate defined in the BAUD global variable
     and continue retrying until successful.
    On successful connection, send a message out the port, return the connection to the calling function,
     and register with atexit to close the serial port when the program exits
    '''

    while True:
        try:
            ser = Serial(port=getportname(), baudrate=BAUD, bytesize=8, parity='N', stopbits=1, timeout=None, xonxoff=0, rtscts=0)
            atexit.register(closeserialport, connection=ser)
            logger.info('-'*30 + ' Serial Port opened ' + '-'*30)
            logger.info('Serial port opened successfully.\n\tPort Configuration: {}'.format(ser))
            return ser
        except Exception as e:
            logger.critical('Exception opening serial port. Retrying...\n\tException Message: {}'.format(e))

        time.sleep(10)

    return ser
项目:data-diode    作者:thephez    | 项目源码 | 文件源码
def openserialport():
    '''
    Attempt to open a serial connection using the baud rate defined in the BAUD global variable
     and continue retrying until successful.
    On successful connection, send a message out the port, return the connection to the calling function,
     and register with atexit to close the serial port when the program exits
    '''

    while True:
        try:
            ser = Serial(port=getportname(), baudrate=BAUD, bytesize=8, parity='N', stopbits=1, timeout=None, xonxoff=0, rtscts=0)
            atexit.register(closeserialport, connection=ser)
            logger.info('-'*30 + ' Serial Port opened ' + '-'*30)
            logger.info('Serial port opened successfully. Port Configuration: {}'.format(ser))
            sendmessage(ser, 'Startup - Serial Port {} Opened\n'.format(ser.port).encode())
            return ser
        except Exception as e:
            logger.critical('Exception opening serial port. Retrying...\n\tException Message: {}'.format(e))

            time.sleep(10)

    return ser
项目:pyTENMA    作者:swharden    | 项目源码 | 文件源码
def __init__(self,port=None,logFileName=False):
        print("\n### TENMA MULTIMETER INTERFACE ###")
        devList=self.device_list()
        self.port=port
        self.logFileName=logFileName
        if self.logFileName:
            self.log_init()
        if port is None:
            self.port=self.guessPort
        if not self.port in devList:
            print("!!!  %s isn't in the list of serial ports !!!"%self.port)
        print("Preparing to use",self.port)
        self.ser = serial.Serial()
        self.ser.port = self.port
        self.ser.baudrate = 19230
        self.ser.bytesize = serial.SEVENBITS
        self.ser.parity = serial.PARITY_ODD
        self.ser.stopbits = serial.STOPBITS_ONE
        self.ser.timeout = 3
        self.ser.xonxoff = False
        self.ser.rtscts = False
        self.ser.dsrdtr = False
        self.ser.writeTimeout = 3
项目:live-serial    作者:rosenbrockc    | 项目源码 | 文件源码
def __init__(self, port="lscom-w", sensors=["W", None, "K"],
                 dataform=[(int, float), (float, float, float), (int, float)],
                 seed=42):
        threading.Thread.__init__(self)
        self.dataform = {s: d for s, d in zip(sensors, dataform)}
        self.sensors = sensors
        from os import name
        if name  == 'nt': # pragma: no cover
            self.port = port
        else:
            self.port = "/dev/tty.{}".format(port)

        from serial import Serial
        self.serial = Serial(self.port, 9600, dsrdtr=True, rtscts=True)
        self.seed = seed
        self.alive = threading.Event()
        self.alive.set()
项目:SeymourSolo    作者:mapossum    | 项目源码 | 文件源码
def __init__(self, shotmgr):
                # initialize shotmanager object
                self.shotmgr = shotmgr
                self.vehicle = self.shotmgr.vehicle
                self.sPort = None

                self.registerCallbacks()
                if not os.path.exists(LOG_DIR):
                     os.makedirs(LOG_DIR)

                self.dataFile = time.strftime("%Y%m%d-%H%M%S") + ".log"

                temp_list = glob.glob ('/dev/tty[A-Za-z]*')

                for t in temp_list:
                        if "ttyACM" in t:
                                self.sPort = serial.Serial(t, 19200, timeout=1)
项目:SeymourSolo    作者:mapossum    | 项目源码 | 文件源码
def camera_feedback_callback(self, vehicle, name, msg):
                self.logFile = open(LOG_DIR + self.dataFile, "a")

                if self.sPort == None:
                        print "No Serial Relay installed"
                else:
                        cSerial(0.25, self.sPort)
                #        p = Process(target=cSerial, args=(0.2, self.sPort))
                #        p.start()

                self.logFile.write(str(msg.time_usec) + "," + str(float(msg.lat) / 10000000.0) + "," + str(float(msg.lng) / 10000000.0) + "," + str(msg.alt_rel) + "\n")
                print msg.time_usec, msg.lat * 10000000, msg.lng * 10000000, msg.alt_rel
                self.logFile.flush()
                self.logFile.close()

        #Have Camera Take Picture on Mode Change
        #This helps in testing and also ensures camera does not fall asleep before timeing out
        #Also creates a picture at a known event in the log file incase camera clock is off
项目:BiblioPixel2    作者:ManiacalLabs    | 项目源码 | 文件源码
def setDeviceID(dev, id):
        if id < 0 or id > 255:
            raise ValueError("ID must be an unsigned byte!")

        try:
            com = serial.Serial(dev, timeout=5)

            packet = DriverSerial._generateHeader(CMDTYPE.SETID, 1)
            packet.append(id)
            com.write(packet)

            resp = com.read(1)
            if len(resp) == 0:
                DriverSerial._comError()
            else:
                if ord(resp) != RETURN_CODES.SUCCESS:
                    DriverSerial._printError(ord(resp))

        except serial.SerialException:
            log.error("Problem connecting to serial device.")
            raise IOError("Problem connecting to serial device.")
项目:dsmr_parser    作者:ndokter    | 项目源码 | 文件源码
def read(self):
        """
        Read complete DSMR telegram's from the serial interface and parse it
        into CosemObject's and MbusObject's

        :rtype: generator
        """
        with serial.Serial(**self.serial_settings) as serial_handle:
            while True:
                data = serial_handle.readline()
                self.telegram_buffer.append(data.decode('ascii'))

                for telegram in self.telegram_buffer.get_all():
                    try:
                        yield self.telegram_parser.parse(telegram)
                    except InvalidChecksumError as e:
                        logger.warning(str(e))
                    except ParseError as e:
                        logger.error('Failed to parse telegram: %s', e)
项目:OpenHWControl    作者:kusti8    | 项目源码 | 文件源码
def alternatingApply(self):
        try:
            if self.alternatingList.count() != 2:
                self.error("Alternating must have two colors")
            else:
                with serial.Serial(self.portTxt.text(), 256000) as ser:
                    if self.getChannel() == None:
                        hue.power(ser, 0, "off")
                    else:
                        speed = self.alternatingSpeed.value()
                        size = self.alternatingSize.value()
                        direction = 1 if self.alternatingBackwards.isChecked() else 0
                        moving = self.alternatingMoving.isChecked()
                        hue.alternating(ser, 0, self.getChannel(), self.getColors(self.alternatingList), speed, size, moving, direction)
        except serial.serialutil.SerialException:
            self.error("Serial port is invalid. Try /dev/ttyACM0 for Linux or COM3 or COM4 for Windows")
    ## candle
项目:dashgo    作者:EAIBOT    | 项目源码 | 文件源码
def connect(self):
        try:
            print "Connecting to Arduino on port", self.port, "..."
            self.port = Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout, writeTimeout=self.writeTimeout)
            # The next line is necessary to give the firmware time to wake up.
            time.sleep(1)
            test = self.get_baud()
            if test != self.baudrate:
                time.sleep(1)
                test = self.get_baud()   
                if test != self.baudrate:
                    raise SerialException
            print "Connected at", self.baudrate
            print "Arduino is ready."

        except SerialException:
            print "Serial Exception:"
            print sys.exc_info()
            print "Traceback follows:"
            traceback.print_exc(file=sys.stdout)
            print "Cannot connect to Arduino!"
            os._exit(1)
项目:hologram-python    作者:hologram-io    | 项目源码 | 文件源码
def openSerialPort(self, device_name=None):

        if device_name is None:
            device_name = self.device_name

        try:
            self.serial_port = serial.Serial(device_name, baudrate=self.baud_rate,
                                             bytesize=8, parity='N', stopbits=1,
                                             timeout=self.timeout, write_timeout=1)
        except Exception as e:
            return False

        if not self.serial_port.isOpen():
            return False

        return True
项目:pystudio    作者:satorchi    | 项目源码 | 文件源码
def calsource_init(self,source=None):
    '''
    setup communication to the Low Frequency source
    '''
    if source==None:
        print('Please enter the calibration source: HF or LF')
        return None

    if source.upper()=='LF':
        dev='/dev/calsource-LF'
        which_freq='Low'
    else:
        dev='/dev/calsource-HF'
        which_freq='High'

    if not os.path.exists(dev):
        print('ERROR! could not connect to the %s Frequency Calibration Source.' % which_freq)
        return None

    connection=serial.Serial(dev)
    return connection
项目:creator-system-test-framework    作者:CreatorDev    | 项目源码 | 文件源码
def __init__(self, clientID, bootstrapURI, port, serialPort, debug=False, verbose=False):
        super(ConstrainedClientSerial, self).__init__(clientID, bootstrapURI, port, debug, verbose)

        print("Creating serial")
        serialConnection = serial.Serial(
                                     port=serialPort,
                                     baudrate=57600,  # 115200,
                                     parity=serial.PARITY_NONE,
                                     stopbits=serial.STOPBITS_ONE,  # STOPBITS_TWO
                                     bytesize=serial.EIGHTBITS
        )

        #self._clientSession = fdpexpect.fdspawn(self._serial, logfile=open("constrained_client_serial.log", "w"))
        self._clientSession = SerialSession(serialConnection)
        self._sendline("")  # FIXME - This should be a reboot?

        self.expect("Contiki>")
        self.init()
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def write(self, b):
        """Write to port, controlling RTS before and after transmitting."""
        if self._alternate_rs485_settings is not None:
            # apply level for TX and optional delay
            self.setRTS(self._alternate_rs485_settings.rts_level_for_tx)
            if self._alternate_rs485_settings.delay_before_tx is not None:
                time.sleep(self._alternate_rs485_settings.delay_before_tx)
            # write and wait for data to be written
            super(RS485, self).write(b)
            super(RS485, self).flush()
            # optional delay and apply level for RX
            if self._alternate_rs485_settings.delay_before_rx is not None:
                time.sleep(self._alternate_rs485_settings.delay_before_rx)
            self.setRTS(self._alternate_rs485_settings.rts_level_for_rx)
        else:
            super(RS485, self).write(b)

    # redirect where the property stores the settings so that underlying Serial
    # instance does not see them
项目:bch-firmware-tool    作者:bigclownlabs    | 项目源码 | 文件源码
def __init__(self, device):
        self.ser = None
        self.ser = serial.Serial(device,
                                 baudrate=921600,  # 1152000,
                                 bytesize=serial.EIGHTBITS,
                                 parity=serial.PARITY_EVEN,
                                 stopbits=serial.STOPBITS_ONE,
                                 timeout=0.1,
                                 xonxoff=False,
                                 rtscts=False,
                                 dsrdtr=False)
        self._connect = False

        self._lock()
        self._speed_up()

        self.reset_input_buffer = self.ser.reset_input_buffer
        self.reset_output_buffer = self.ser.reset_output_buffer

        self.write = self.ser.write
        self.read = self.ser.read
        self.flush = self.ser.flush
项目:py-midi    作者:edouardtheron    | 项目源码 | 文件源码
def __init__(self, port, baudrate=31250, timeout=None):
        """
        ``port`` must be string, representing the path used for connecting to the machine's serial interface
        Example: on Raspberry3, the path to serial port is '/dev/serial0'

        ``baudrate`` is an int set up at 31250 by default and should not be changed. This is the standard
        baudrate, used by all MIDI devices.

        ``timeout`` default=None. If you *don't want* the MidiConnector.read() method to block for ever if it receives
        nothing, use this keyword argument to set up a maximum duration of blocking. The timeout is only used for reading,
        not writing.
        """

        if timeout and not (isinstance(timeout, float) or isinstance(timeout, int)):
                raise TypeError('Specified timeout must be float or integer ({} given)'.format(type(timeout)))

        self.timeout = timeout
        self.baudrate = baudrate
        self.port = port
        self.connector = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout)
项目:buttshock-py    作者:metafetish    | 项目源码 | 文件源码
def __init__(self, port, key=None, shift_baud_rate=False):
        """Initialization function. Follows RAII, so creating the object opens the
        port."""
        super(ET312SerialSync, self).__init__(key)
        # Allow derived classes to set up a port to mock serial ports for
        # tests. There are cleaner ways to mock this, but this will do for now.
        if not hasattr(self, "port"):
            # Check argument validity
            import serial
            if not port or type(port) is not str:
                raise ButtshockIOError("Serial port name is missing or is not string!")

            self.port = serial.Serial(port, 19200, timeout=1,
                                      parity=serial.PARITY_NONE,
                                      bytesize=8, stopbits=1,
                                      xonxoff=0, rtscts=0)

        self.needs_bytes = False
        # Test for python 3
        if not isinstance(bytes([0]), str):
            self.needs_bytes = True
        self.shift_baud_rate = shift_baud_rate
项目:joysix    作者:niberger    | 项目源码 | 文件源码
def __init__(self):
        self.ser = serial.Serial('/dev/ttyACM0', 9600)

        #geometrical calibration
        self.rs = [50, 50, 50]
        self.ls = [95, 130, 95]
        self.pot_rad_per_unit = 1./3000.*math.pi
        angles = [2./3.*math.pi, 0., -2./3.*math.pi]

        #placements of the 3 joysticks
        self.placements = []
        #attach point on the ball
        self.attach_ps = []
        for r,l,a in zip(self.rs, self.ls, angles):
            p_init = pose.exp(col([0, 0, 0, 0, 0, -(r+l)]))
            p_rot = pose.exp(col([0, a, 0, 0, 0, 0]))
            placement = p_rot * p_init
            self.placements.append(placement)
            attach_p = placement * col([0, 0, l])
            self.attach_ps.append(attach_p)

        #last calculated pose in logarithmic coordinates
        self.last_x = col([0, 0, 0, 0, 0, 0])
        #definition of the numerical solver
        f = lambda x: self.getValuesFromPose(pose.exp(x))
        self.solver = solver.Solver(f)
项目:robik    作者:RecunchoMaker    | 项目源码 | 文件源码
def __init__(self):
        self.tty = '/dev/ttyUSB0'
        self.serial = serial.Serial(self.tty, timeout=0.01)

        self.count = 0
        self.listo = False
        self.reset()
项目:robik    作者:RecunchoMaker    | 项目源码 | 文件源码
def __init__(self):
        self.tty = '/dev/ttyUSB0'
        self.serial = serial.Serial(self.tty, timeout=0.01)
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def wait_on_serial():
  p = serial.Serial(SERIAL_PORT, 115200, timeout=3)
  while True:
    new_data.append(p.readline().rstrip('\n\r'))
项目:Project_Automail    作者:ThermoNuclearPanda    | 项目源码 | 文件源码
def connect(self, port):
        global serial
        try:
            serial = serial.Serial(port, 9600)
        except SerialException:
            print("port already open")
        return serial

    # Writes to the Arduino
项目:Project_Automail    作者:ThermoNuclearPanda    | 项目源码 | 文件源码
def __init__(self, tty_port):
    self.arduino = Arduino()
    self.port = self.arduino.getPort()
    self.serial = self.arduino.connect(self.port)
        self.ser = serial.Serial(port=tty_port, baudrate=9600, timeout=3, dsrdtr=1)
        self.buffer = []
        self.listeners = []

    # Recieves Packet
项目:PyVMU    作者:JosephRedfern    | 项目源码 | 文件源码
def __init__(self,
                 device="/dev/tty.usbmodem1411",
                 accelerometer=False,
                 magnetometer=False,
                 gyroscope=False,
                 euler=False,
                 quaternion=False,
                 heading=False
                 ):
        """
        Opens a connection to the VMU931 device

        :param device: Serial device name (on Windows) or path (nix, including OS X).
        :param accelerometer: Enable/disable accelerometer data streaming.
        :param magnetometer: Enable/disable magnetometer data streaming.
        :param gyroscope: Enable/disable gyroscope data streaming.
        :param euler: Enable/disable euler angle data streaming.
        :param quaternion: Enable/disable quaternion data streaming.
        :param heading: Enable/disable compass heading data streaming. 
        """
        self.ser = serial.Serial(device)
        self.device_status = None
        self.parse()

        self.set_accelerometer(accelerometer)
        self.set_magnetometer(magnetometer)
        self.set_gyroscope(gyroscope)
        self.set_euler(euler)
        self.set_quaternion(quaternion)
        self.set_heading(heading)
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def __init__(self, port, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
                 bytesize=serial.EIGHTBITS, timeout=0.5):
        self.port = port
        self.baudrate = baudrate
        self.parity = parity
        self.stopbits = stopbits
        self.bytesize = bytesize
        self.timeout = timeout
        self.ser = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            parity=self.parity,
            stopbits=self.stopbits,
            bytesize=self.bytesize,
            timeout=self.timeout
        )
        atexit.register(self.close)
        for msg in """24 45 49 47 50 51 2c 44 54 4d 2a 33 42 0d 0a b5 62 06 01 03 00 f0 0a 00 04 23
24 45 49 47 50 51 2c 47 42 53 2a 33 30 0d 0a b5 62 06 01 03 00 f0 09 00 03 21
24 45 49 47 50 51 2c 47 4c 4c 2a 32 31 0d 0a b5 62 06 01 03 00 f0 01 00 fb 11
24 45 49 47 50 51 2c 47 52 53 2a 32 30 0d 0a b5 62 06 01 03 00 f0 06 00 00 1b
24 45 49 47 50 51 2c 47 53 41 2a 33 33 0d 0a b5 62 06 01 03 00 f0 02 00 fc 13
24 45 49 47 50 51 2c 47 53 54 2a 32 36 0d 0a b5 62 06 01 03 00 f0 07 00 01 1d
24 45 49 47 50 51 2c 47 53 56 2a 32 34 0d 0a b5 62 06 01 03 00 f0 03 00 fd 15
24 45 49 47 50 51 2c 56 54 47 2a 32 33 0d 0a b5 62 06 01 03 00 f0 05 00 ff 19
24 45 49 47 50 51 2c 5a 44 41 2a 33 39 0d 0a b5 62 06 01 03 00 f0 08 00 02 1f
24 45 49 47 50 51 2c 47 47 41 2a 32 37 0d 0a b5 62 06 01 03 00 f0 00 01 fb 10
24 45 49 47 50 51 2c 52 4d 43 2a 33 41 0d 0a b5 62 06 01 03 00 f0 04 01 ff 18
B5 62 06 08 06 00 64 00 01 00 01 00 7A 12 B5 62 06 08 00 00 0E 30
""".splitlines():
            self.ser.write(bytes(map(lambda x: int(x, 16), msg.strip().split(' '))))
            time.sleep(0.1)
            pass
项目:pyMonitor    作者:ahmedalkabir    | 项目源码 | 文件源码
def __init__(self, port, baud_rate, byte_size=serial.EIGHTBITS, parity=serial.PARITY_NONE, stop_bit=serial.STOPBITS_ONE):

                # initialize the connection and if anything ok open the port
                self.__main_conn = serial.Serial(port, baud_rate, bytesize=byte_size, parity=parity, stopbits=stop_bit)

                # Queue
                self.dataQueue = queue.Queue()
                self.checkStatusQueue = queue.Queue()

                # Define Variables
                self.out = ''
                self.__flag_out = True
项目:astm-serial    作者:rendiya    | 项目源码 | 文件源码
def send_command(self, command):
        """Send a command and check if it is positively acknowledged
        :param command: The command to send
        :type command: str
        :raises IOError: if the negative acknowledged or a unknown response is returned
        <STX>[FN][TEXT]<ETB>[C1][C2]<CR><LF>
        """

        self.serial.write(self._astm_string(string=command))
        response = self.serial.readline()
        print response
        if response == NAK:
            message = 'Serial communication returned negative acknowledge'
            message = response.encode('hex')
            return IOError(message)
        elif response != ACK:
            message = response.encode('hex')
            return IOError(message)
项目:smart-card-removinator    作者:nkinder    | 项目源码 | 文件源码
def __init__(self, port=None):
        """Opens a connection to a Removinator controller on the specified
        serial port.

        If a port is not specified, an attempt will be made to auto-discover
        the Removinator controller.

        :param port: port that the Removinator is connected to
        :type port: str
        :raises: :exc:`ConnectError`
        :ivar last_result: The result output from the last executed command
        :ivar last_response: The response output from the last executed command
        :ivar port: The port that the Removinator is connected to
        """

        self.port = port
        self.last_result = ''
        self.last_response = ''

        # Attempt to discover the Removinator controller if a
        # port was not specified.
        if port is None:
            port = _discover_removinator()

        # Open a connection to the Removinator controller.
        try:
            self.connection = serial.Serial(
                port,
                9600,
                serial.EIGHTBITS,
                serial.PARITY_NONE,
                serial.STOPBITS_ONE,
                1)
            self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection,
                                                          self.connection))
        except serial.SerialException as e:
            raise ConnectError('Unable to open connection to Removinator '
                               'controller on port {0}'.format(port))
项目:fdxread    作者:lkarsten    | 项目源码 | 文件源码
def open(self):
        logging.debug("Opening serial port %s (read_timeout=%s)" % (self.serialport,
                      self.read_timeout))
        self.stream = serial.Serial(port=self.serialport,
                                    timeout=self.read_timeout)
        assert self.stream is not None
项目:SDS011-SDS018-and-SDS021-dust-sensors-reader    作者:MatejKovacic    | 项目源码 | 文件源码
def __init__(self, inport):
        self.serial = serial.Serial(port=inport, baudrate=9600)
项目:iot    作者:akademikbilisim    | 项目源码 | 文件源码
def __init__(self, port, baud, delay):
        self.port = port
        self.baud = baud
        self.serial = None
        self.delay = delay

        try:
            self.serial = serial.Serial(port, baud)
        except serial.SerialException as e:
            raise TransportError(e.strerror)

        self.serial.timeout = 3
        self.serial.interCharTimeout = 3
项目:muxrplot    作者:sirmo    | 项目源码 | 文件源码
def log(options):
    # configure the serial connections (the parameters differs on the device you are connecting to)
    ser = serial.Serial(
        port=options.device,
        baudrate=9600,
        parity=serial.PARITY_ODD,
        stopbits=serial.STOPBITS_TWO,
        bytesize=serial.SEVENBITS
    )

    prog = Progress()

    with open(options.outfile, 'w') as the_file:
        the_file.write('timestamp;value\n')
        while True:
            # \r\n is for device terminators set to CR LF
            ser.write((':FETCh?\r\n'))

            # wait one second before reading output.
            time.sleep(options.interval)
            out = ''
            while ser.inWaiting() > 0:
                out += ser.read(1)
            if out != '':
                out = out.rstrip()
                res = "%s;%s\n" % (time.time(), float(out))
                the_file.write(res)
                the_file.flush()
                prog.show()
项目:plantbot    作者:marooncn    | 项目源码 | 文件源码
def Start(self):
        self._Serial = serial.Serial(port = self._Port, baudrate = self._Baudrate, timeout = 1)

        self._KeepRunning = True
        self._ReceiverThread = threading.Thread(target=self._Listen)
        self._ReceiverThread.setDaemon(True)
        self._ReceiverThread.start()
项目:keithley2400    作者:bennomeier    | 项目源码 | 文件源码
def __init__(self, port=9):
        #port 9 is for the USB dongle.
        self.port = port
        self.ser =serial.Serial(self.port, baudrate=9600, stopbits=serial.STOPBITS_ONE,timeout=1)
        time.sleep(2)
项目:pypilot    作者:pypilot    | 项目源码 | 文件源码
def __init__(self, path):
        self.device = serial.Serial(*path)
        self.device.timeout=0 #nonblocking
        fcntl.ioctl(self.device.fileno(), TIOCEXCL)

        self.in_buffer = ''
        self.in_lines = []