我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用socket.BTPROTO_RFCOMM。
def __init__(self, addr): connected = False while(not connected): try: self.sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) self.sock.connect((addr, 1)) connected = True except KeyboardInterrupt: sys.exit(0) except Exception as e: # TODO: Move to the logging library logging.error("Exception: "+str(e)) logging.info("Could not connect. Waiting 10s") time.sleep(10) # set the timeout to 5 seconds. self.sock.settimeout(5) ## Read data from the GPS feed # # @returns Data read from the GPS
def BluetoothSocket(): return socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
def _connect_bluetooth(self, host: str) -> int: """ Create a socket, that holds a bluetooth-connection to an EV3 """ self._socket = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) self._socket.connect((host, 1))
def start(self): """ Starts the Bluetooth server if its not already running. The server needs to be started before connections can be made. """ if not self._running: if self._power_up_device: self.adapter.powered = True if not self.adapter.powered: raise Exception("Bluetooth device {} is turned off".format(self.adapter.device)) #register the serial port profile with Bluetooth register_spp(self._port) #start Bluetooth server #open the Bluetooth socket self._server_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) self._server_sock.settimeout(BLUETOOTH_TIMEOUT) try: self._server_sock.bind((self.server_address, self.port)) except (socket.error, OSError) as e: if e.errno == errno.EADDRINUSE: print("Bluetooth address {} is already in use - is the server already running?".format(self.server_address)) raise e self._server_sock.listen(1) #wait for client connection self._conn_thread = WrapThread(target=self._wait_for_connection) self._conn_thread.start() self._running = True
def connect(self): """ Connect to a Bluetooth server. """ if not self._connected: if self._power_up_device: self.adapter.powered = True if not self.adapter.powered: raise Exception("Bluetooth device {} is turned off".format(self.adapter.device)) #try and find the server name or MAC address in the paired devices list server_mac = None for device in self.adapter.paired_devices: if self._server == device[0] or self._server == device[1]: server_mac = device[0] break if server_mac == None: raise Exception("Server {} not found in paired devices".format(self._server)) #create a socket self._client_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) self._client_sock.bind((self.adapter.address, self._port)) self._client_sock.connect((server_mac, self._port)) self._connected = True self._conn_thread = WrapThread(target=self._read) self._conn_thread.start()
def _open(self) -> None: """Opens a Bluetooth socket connection to a sensor.""" if not self._server_mac_address: self.logger.error('MAC address of server not set') return try: self._sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) self._sock.connect((self._server_mac_address, self._port)) except OSError as e: self.logger.error(e)