Python websockets 模块,serve() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用websockets.serve()

项目:socketshark    作者:closeio    | 项目源码 | 文件源码
def start(self):
        """
        Called by SocketShark to initialize the server and prepare & run
        SocketShark.
        """
        async def serve(websocket, path):
            client = Client(self.shark, websocket)
            await client.consumer_handler()

        config = self.shark.config
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.shark.prepare())
        ssl_context = self.shark.get_ssl_context()
        start_server = websockets.serve(serve,
                                        config['WS_HOST'],
                                        config['WS_PORT'],
                                        ssl=ssl_context)
        self.server = loop.run_until_complete(start_server)
        self.shark.signal_ready()
        loop.run_until_complete(self.shark.run())
        loop.run_forever()
        loop.run_until_complete(self.shutdown())
        self.shark.signal_shutdown()
项目:Main    作者:N-BodyPhysicsSimulator    | 项目源码 | 文件源码
def handle(self, generator):
        self.clients = []

        async def server(client, _):
            self.clients.append(client)

            for state in generator:
                await client.send(json.dumps(state.to_dict()))

        asyncio.set_event_loop(
            asyncio.new_event_loop()
        )

        asyncio.get_event_loop().run_until_complete(
            websockets.serve(server, self.args.get('ws_host'), self.args.get('ws_port'))
        )

        asyncio.get_event_loop().run_forever()
项目:cscoins    作者:csgames    | 项目源码 | 文件源码
def serve(self):
        self.initialize()

        self.challenge_thread = ChallengeThread.ChallengeThread(self)
        self.challenge_thread.start()

        if self.ssl_on:
            ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_SSLv23)
            ssl_context.load_cert_chain(self.ssl_cert)
            self.server_socket = websockets.serve(
                self.handle_connection, self.listen_address, self.port, ssl=ssl_context)
        else:
            self.server_socket = websockets.serve(
                self.handle_connection, self.listen_address, self.port)

        try:
            asyncio.get_event_loop().run_until_complete(self.server_socket)
            asyncio.get_event_loop().run_forever()
        except KeyboardInterrupt:
            print("Closing the server")
            asyncio.get_event_loop().close()
项目:monitor    作者:ReCodEx    | 项目源码 | 文件源码
def __init__(self, websock_uri, connections, loop, logger):
        """
        Initialize new instance

        :param websock_uri: Tuple containing hostname and port for websocket server
        :param connections: Reference to ClientConnections class through which are
            sent messages from other threads. Note, that this must be invoked thread
            safe via given message loop of asyncio module.
        :param loop: Asyncio message loop for handling connections
        :param logger: System logger instance
        """
        super().__init__()
        self._connections = connections
        self._loop = loop
        self._logger = logger
        hostname, port = websock_uri
        asyncio.set_event_loop(loop)
        start_server = websockets.serve(self.connection_handler, hostname, port)
        loop.run_until_complete(start_server)
        self._logger.info("websocket server initialized at {}:{}".format(hostname, port))
项目:cscoins    作者:BrandonWade    | 项目源码 | 文件源码
def serve(self):
        self.initialize()

        self.challenge_thread = ChallengeThread.ChallengeThread(self)
        self.challenge_thread.start()

        if self.ssl_on:
            ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_SSLv23)
            ssl_context.load_cert_chain(self.ssl_cert)
            self.server_socket = websockets.serve(self.handle_connection, self.listen_address, self.port, ssl=ssl_context)
        else:
            self.server_socket = websockets.serve(self.handle_connection, self.listen_address, self.port)

        try:
            asyncio.get_event_loop().run_until_complete(self.server_socket)
            asyncio.get_event_loop().run_forever()
        except KeyboardInterrupt:
            print("Closing the server")
            asyncio.get_event_loop().close()
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def init_task(self, flags):
        """dude people are gonna send voice packets of them crying"""

        async def voice_henlo(ws, path):
            log.info(f'[vws] new connection at {path}')

            v_conn = VoiceConnection(ws, server=self, path=path)
            await v_conn.run()

        vws = flags['server']['voice_ws']
        log.info(f'[voice_ws] running at {vws[0]}:{vws[1]}')
        self.vws_tuple = vws

        ws_server = websockets.serve(voice_henlo, host=vws[0], port=vws[1])
        await ws_server
        return True
项目:hololens-dv-server    作者:AdamNiederer    | 项目源码 | 文件源码
def serve(socket, path):
    req = await socket.recv()
    try:
        req = loads(req)
        if req.get("cmd") == "all":
            res = model.everything
        if req.get("cmd") == "search":
            res = model.similar(req.get("query") or "Ling is weird")
        elif req.get("cmd") == "node":
            res = model.doc(req.get("id"))
        else:
            res = {"resp": "err"}
        res = dumps(res)
        await socket.send(res)
    except JSONDecodeError:
        await socket.send(dumps({"resp": "err"}))
项目:Farm-server    作者:MakersLab    | 项目源码 | 文件源码
def main():
    start_server = websockets.serve(handler, '0.0.0.0', CONFIG['websocket']['port'])
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
项目:wsstat    作者:Fitblip    | 项目源码 | 文件源码
def setup_tasks(self):
        super().setup_tasks()
        start_server = websockets.serve(echo_time, '127.0.0.1', 65432)
        asyncio.ensure_future(start_server, loop=self.loop)
项目:fygimbal    作者:scanlime    | 项目源码 | 文件源码
def serve(self):
        return websockets.serve(self.handle_client, self.host, self.port)
项目:fygimbal    作者:scanlime    | 项目源码 | 文件源码
def run_server(gimbal, **kw):
    server = SocketServer(gimbal, **kw)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server.serve())
    print("Server running at %s" % server.uri())
    loop.run_forever()
项目:Tuxemon-Server    作者:Tuxemon    | 项目源码 | 文件源码
def configure(self, parser, host='localhost', port=8765):
        self._server = websockets.serve(self.handler, host, port)
        self._parser = parser
项目:arc    作者:lap00zza    | 项目源码 | 文件源码
def run(self):
        """
        This is a blocking call.
        """
        self.loop.create_task(self.heartbeat())
        start_server = websockets.serve(self.ws_handler, "0.0.0.0", 5555, loop=self.loop)
        self.loop.run_until_complete(start_server)
        self.loop.run_forever()
项目:drone    作者:arunsoman    | 项目源码 | 文件源码
def start(self):
        start_server = websockets.serve(
            self.handler, '0.0.0.0', WS_SERVER_PORT)
        print(" **** starting debug socket server...")
        asyncio.get_event_loop().create_task(start_server)
项目:rabbit    作者:sopython    | 项目源码 | 文件源码
def main():
    start_server = websockets.serve(handler, 'localhost', 8000)

    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
项目:async-skeleton-murder    作者:Euphe    | 项目源码 | 文件源码
def run(self):
        logger.info('Starting server')
        self.websocket_server = websockets.serve(self.handler, self.host, self.port, timeout=60)
        self.loop.run_until_complete(self.websocket_server)
        asyncio.async(wakeup()) #HACK so keyboard interrupt works on Windows
        self.loop.run_forever()
        self.loop.close()
        self.clean_up()
项目:highway.py    作者:PhilipTrauner    | 项目源码 | 文件源码
def start(self, address, port):
        start_server = serve(self.server, address, port)

        get_event_loop().run_until_complete(start_server)
        get_event_loop().run_forever()
项目:overstream    作者:BirknerAlex    | 项目源码 | 文件源码
def run(self):
        self.logger = logging.getLogger("websocket")
        self.logger.info("Starting OverStream Websocket on Port {port}".format(port=self.PORT))

        print(self.plugin_engine.get_data())

        start_server = websockets.serve(self.websocket_connection, 'localhost', self.PORT)

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()
项目:click    作者:bendgk    | 项目源码 | 文件源码
def run(self, host='192.168.1.9', port=25565):
        start_server = websockets.serve(self.handler, host, port)

        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()
项目:monitor    作者:ReCodEx    | 项目源码 | 文件源码
def run(self):
        """
        Function to start websocket server, which handle and serve all connections.

        :return: This function returns when given message loop is stopped and returns
            nothing.
        """
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()
项目:chromewhip    作者:chuckus    | 项目源码 | 文件源码
def test_send_command_can_trigger_on_event_prior_to_commmand_containing_event_id(event_loop, chrome_tab):

    msg_id = 4
    frame_id = '3228.1'
    url = 'http://example.com'

    chrome_tab._message_id = msg_id - 1
    f = page.Frame(frame_id, 'test', url, 'test', 'text/html')
    p = page.Page.navigate(url)
    fe = page.FrameNavigatedEvent(f)

    ack = {'id': msg_id, 'result': {'frameId': frame_id}}
    triggers = {
        msg_id: [ack]
    }

    end_msg = copy.copy(p[0])
    end_msg['id'] = msg_id
    q = queue.Queue()
    q.put(end_msg)

    initial_msgs = [fe]

    test_server = init_test_server(triggers, initial_msgs=initial_msgs, expected=q)
    start_server = websockets.serve(test_server, TEST_HOST, TEST_PORT)
    server = await start_server
    await chrome_tab.connect()

    log.info('Sending command and awaiting...')
    result = await chrome_tab.send_command(p, await_on_event_type=page.FrameNavigatedEvent)
    assert result.get('ack') is not None
    assert result.get('event') is not None
    event = result.get('event')
    assert isinstance(event, page.FrameNavigatedEvent)
    assert event.frame.id == f.id
    assert event.frame.url == f.url

    server.close()
    await server.wait_closed()
项目:chromewhip    作者:chuckus    | 项目源码 | 文件源码
def test_send_command_can_trigger_on_event_after_commmand_containing_event_id(event_loop, chrome_tab):
    msg_id = 4
    frame_id = '3228.1'
    url = 'http://example.com'

    chrome_tab._message_id = msg_id - 1
    f = page.Frame(frame_id, 'test', url, 'test', 'text/html')
    p = page.Page.navigate(url)
    fe = page.FrameNavigatedEvent(f)

    ack = {'id': msg_id, 'result': {'frameId': frame_id}}
    triggers = {
        msg_id: [ack, delay_s(1), fe]
    }

    end_msg = copy.copy(p[0])
    end_msg['id'] = msg_id
    q = queue.Queue()
    q.put(end_msg)
    q.put(copy.copy(end_msg))

    test_server = init_test_server(triggers, expected=q)
    start_server = websockets.serve(test_server, TEST_HOST, TEST_PORT)
    server = await start_server
    await chrome_tab.connect()

    log.info('Sending command and awaiting...')
    result = await chrome_tab.send_command(p, await_on_event_type=page.FrameNavigatedEvent)
    assert result.get('ack') is not None
    assert result.get('event') is not None
    event = result.get('event')
    assert isinstance(event, page.FrameNavigatedEvent)
    assert event.frame.id == f.id
    assert event.frame.url == f.url

    server.close()
    await server.wait_closed()
项目:KSURCT-TEST    作者:jisaiahgarrett    | 项目源码 | 文件源码
def start_server(self):
        logger.info('server starting up')
        self.server = await websockets.serve(self.handle_new_connection, '0.0.0.0', self.port, timeout=1)
项目:portkey    作者:red8012    | 项目源码 | 文件源码
def start_app(host='127.0.0.1', port=5000, debug=True, log_level=logging.DEBUG, **ws_options):
    """Start the portkey app running in the event loop. This function should run once and only once for a portkey app. 

    Args:
        host: The hostname to listen on. Set this to '0.0.0.0' to have the server available externally
        port: The port of the server
        debug: Whether to automatically reload the program when the code is modified
        log_level: The log level to be passed to Portkey logger
        **ws_options: Other options to be passed to `websockets.serve()`
    """
    starter_function = functools.partial(_start_app, host=host, port=port, log_level=log_level, **ws_options)
    if debug:
        run_with_reloader(starter_function)
    else:
        starter_function()
项目:stepmania-server    作者:ningirsu    | 项目源码 | 文件源码
def start_server(self):
        """ Start the websocket server """

        self._serv = self.loop.run_until_complete(
            websockets.serve(
                self._accept_client,
                host=self.ip,
                port=self.port,
                loop=self.loop,
            )
        )
        return self._serv
项目:DarkWallet    作者:DissentDifference    | 项目源码 | 文件源码
def serve(self):
        port = self.settings.port
        return await websockets.serve(self._accept, "localhost", port)
项目:DarkWallet    作者:DissentDifference    | 项目源码 | 文件源码
def start_ws(settings):
    gateway = Gateway(settings)

    tasks = [gateway.serve()]

    # Handle CTRL-C
    def signal_handler():
        print("Stopping darkwallet-daemon...")
        gateway.stop()
        loop.stop()

    loop.add_signal_handler(signal.SIGINT, signal_handler)
    loop.run_until_complete(asyncio.wait(tasks))
    loop.run_forever()
项目:DarkWallet    作者:DissentDifference    | 项目源码 | 文件源码
def serve(self, websocket, path):
        success = True
        while success:
            message = await websocket.recv()
            success = await self.process(websocket, message)
            print("resp", len(json.dumps(success)))
            await websocket.send(json.dumps(success))
项目:DarkWallet    作者:DissentDifference    | 项目源码 | 文件源码
def start(settings):
    loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(loop)
    context = libbitcoin.server.Context()
    darkwallet = Gateway(context, settings, loop)
    tasks = [
        websockets.serve(darkwallet.serve, 'localhost', 8888)
    ]
    tasks.extend(context.tasks())
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
项目:basic-lan-webserver    作者:TacticAlpha    | 项目源码 | 文件源码
def run(ip, port, *, client_timeout:int=45, request_timeout:int=2):

    print('Starting server...')

    requests.timeout_delay = client_timeout
    requests.request_timeout = request_timeout

    start_server = websockets.serve(requests.handler, ip, port)

    asyncio.get_event_loop().run_until_complete(start_server)

    print('Server started and listening for requests.')

    asyncio.get_event_loop().run_forever()
项目:motion-detection    作者:fabiojmendes    | 项目源码 | 文件源码
def run():
    stop_app = asyncio.Event()

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, stop_running, loop, stop_app)
    loop.add_signal_handler(signal.SIGTERM, stop_running, loop, stop_app)

    server_config = websockets.serve(handler, 'localhost', 8765)
    ws_server = loop.run_until_complete(server_config)
    print('After run until complete', ws_server, server_config)
    loop.run_until_complete(setup_redis(loop, stop_app))

    ws_server.close()
    loop.run_until_complete(ws_server.wait_closed())
    loop.close()
项目:ring-api    作者:sevaivanov    | 项目源码 | 文件源码
def _init_websockets(self):
        self.ws_eventloop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.ws_eventloop)

        self.ws_server = websockets.serve(
            self.ws_handle, self.host, self.ws_port)

        self.ws_eventloop.create_task(self.ws_server)
        self.ws_eventloop.create_task(self.ws_notify())
项目:ftcommunity-apps    作者:ftCommunity    | 项目源码 | 文件源码
def run(self): 
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        start_server = websockets.serve(self.handler, CLIENT, PORT)
        websocketServer = self.loop.run_until_complete(start_server)

        try:
            self.loop.run_forever()
        finally:
            websocketServer.close()
            self.loop.run_until_complete(websocketServer.wait_closed())
项目:chromewhip    作者:chuckus    | 项目源码 | 文件源码
def test_send_command_can_trigger_on_event_with_input_event(event_loop, chrome_tab):
    """test_send_command_can_trigger_on_event_with_input_event
    Below is test case that will workaround this issue
    https://github.com/chuckus/chromewhip/issues/2
    """
    msg_id = 4
    old_frame_id = '2000.1'
    frame_id = '3228.1'
    url = 'http://example.com'

    chrome_tab._message_id = msg_id - 1
    f = page.Frame(frame_id, 'test', url, 'test', 'text/html')
    p = page.Page.navigate(url)
    fe = page.FrameNavigatedEvent(f)
    fsle = page.FrameStoppedLoadingEvent(frame_id)

    # command ack is not related to proceeding events
    ack = {'id': msg_id, 'result': {'frameId': old_frame_id}}
    triggers = {
        msg_id: [ack, delay_s(1), fe, fsle]
    }

    end_msg = copy.copy(p[0])
    end_msg['id'] = msg_id
    q = queue.Queue()
    q.put(end_msg)

    test_server = init_test_server(triggers, expected=q)
    start_server = websockets.serve(test_server, TEST_HOST, TEST_PORT)
    server = await start_server
    await chrome_tab.connect()

    log.info('Sending command and awaiting...')
    result = await chrome_tab.send_command(p,
                                           input_event_type=page.FrameNavigatedEvent,
                                           await_on_event_type=page.FrameStoppedLoadingEvent)
    assert result.get('ack') is not None
    assert result.get('event') is not None
    event = result.get('event')
    assert isinstance(event, page.FrameStoppedLoadingEvent)
    assert event.frameId == f.id

    server.close()
    await server.wait_closed()
项目:chromewhip    作者:chuckus    | 项目源码 | 文件源码
def xtest_can_register_callback_on_devtools_event(event_loop, chrome_tab):
    # TODO: double check this part of the api is implemented
    interception_id = '3424.1'
    msg_id = 7
    chrome_tab._message_id = msg_id - 1
    fake_request = network.Request(url='http://httplib.org',
                                   method='POST',
                                   headers={},
                                   initialPriority='superlow',
                                   referrerPolicy='origin')
    msgs = [
        network.RequestInterceptedEvent(interceptionId=interception_id,
                                        request=fake_request,
                                        resourceType="Document",
                                        isNavigationRequest=False)

    ]

    enable = network.Network.setRequestInterceptionEnabled(enabled=True)

    # once emable command comes, send flurry in intercept events
    triggers = {
        msg_id: msgs
    }

    expected = queue.Queue()
    e0 = copy.copy(enable[0])
    e0['id'] = msg_id
    expected.put(e0)
    e1 = network.Network.continueInterceptedRequest(interceptionId=interception_id)
    expected.put(e1)

    test_server = init_test_server(triggers, expected=expected)
    start_server = websockets.serve(test_server, TEST_HOST, TEST_PORT)
    server = await start_server
    await chrome_tab.connect()

    log.info('Sending command and awaiting...')
    # TODO: registration api

    # no point returning data as nothing to do with it.
    # but how would i go about storing all the events being collected?
    #   - this is not the api for it, just add an api for storing events in a queue
    # TODO: how do declare return type of method?
    async def cb_coro(event: network.RequestInterceptedEvent):
        return network.Network.continueInterceptedRequest(interceptionId=event.interceptionId)

    with chrome_tab.schedule_coro_on_event(coro=cb_coro,
                                           event=network.RequestInterceptedEvent):
        await chrome_tab.send_command(enable)

    server.close()
    await server.wait_closed()
项目:portkey    作者:red8012    | 项目源码 | 文件源码
def _start_app(log_level, **ws_options):
    logger = logging.getLogger('Portkey')
    logger.setLevel(log_level)
    ch = logging.StreamHandler()
    ch.setLevel(log_level)
    ch.setFormatter(logging.Formatter('[%(asctime)s %(levelname)s] %(message)s'))
    logger.addHandler(ch)

    async def socket_handler(ws: websockets.WebSocketServerProtocol, path: str):
        try:
            context = types.SimpleNamespace()
            clients.add(ws)
            logger.info('Connection established on %s.', path)
            client_register_info = jsonify({
                'command': 'register',
                'data': list(id2handlerInfo.values())
            })
            # print('handlerInfo', client_register_info)
            await ws.send(client_register_info)
            logger.info('Client events registered.')
            while 1:
                msg = await ws.recv()
                # print('received', msg)
                msg = json.loads(msg)
                handler_id = msg['handler']
                ui_data = UIData(msg['uiData'])
                selector = Bundle(ws)
                # noinspection PyBroadException
                try:
                    # noinspection PyProtectedMember
                    await id2handlerInfo[handler_id]._handler(selector, ui_data, context)
                except:
                    sys.excepthook(*sys.exc_info())
                broadcast = selector.broadcast
                del selector.broadcast
                # print('sending', str(selector))
                await ws.send(str(selector))
                if broadcast.actions:
                    broadcast = str(broadcast)
                    for socket in clients:
                        await socket.send(broadcast)

        except websockets.exceptions.ConnectionClosed:
            logger.info('Connection closed.')
            clients.remove(ws)

    server = websockets.serve(socket_handler, **ws_options)
    loop.run_until_complete(server)
    try:
        logger.info('Server started on ws://%s:%d', ws_options['host'], ws_options['port'])
        loop.run_forever()
    except KeyboardInterrupt:
        pass
项目:litecord-reference    作者:lnmds    | 项目源码 | 文件源码
def start_all(app):
    """Start Gateway and HTTP."""

    server = app.litecord_server
    flags = server.flags

    await server.good.wait()

    async def henlo(ws, path):
        return await on_connection(server, ws, path)

    # we gotta get that SSL right
    # or else we are doomed
    context = None
    f_ssl = flags['ssl']
    ssl_on = f_ssl['on']
    if ssl_on:
        log.info('[ssl] creating context')

        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        certfile = f_ssl['certfile']
        keyfile = f_ssl['keyfile']
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
        server.ssl_cxt = context

        log.info('[ssl] done, cert_store=%r', context.cert_store_stats())
    else:
        log.info('[ssl] context not enabled')

    # start HTTP
    http = server.flags['server']['http']

    handler = app.make_handler()
    if ssl_on:
        server.http_server = app.loop.create_server(handler, host=http[0], \
            port=http[1], ssl=context)
    else:
        server.http_server = app.loop.create_server(handler, \
            host=http[0], port=http[1])

    log.info(f'[http] http://{http[0]}:{http[1]}')

    # start ws
    ws = flags['server']['ws']

    if ssl_on:
        server.ws_server = websockets.serve(henlo, host=ws[0], \
            port=ws[1], ssl=context)
    else:
        server.ws_server = websockets.serve(henlo, \
            host=ws[0], port=ws[1])

    log.info(f'[ws] ws://{ws[0]}:{ws[1]} {f"-> ws://{ws[2]}:{ws[1]}" if len(ws) > 2 else ""}')

    return True