Python asyncio 模块,StreamReaderProtocol() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用asyncio.StreamReaderProtocol()

项目:aionotify    作者:rbarrois    | 项目源码 | 文件源码
def stream_from_fd(fd, loop):
    """Receive a streamer for a given file descriptor.

    Generator-based coroutine.  Builds a ``StreamReader``/
    ``StreamReaderProtocol`` pair, hands them to a
    ``UnixFileDescriptorTransport`` for ``fd`` and waits on the future that
    was passed to the transport as its ``waiter``.

    Returns a ``(reader, transport)`` pair.  If waiting fails for any
    reason the transport is closed and the exception is re-raised.
    """
    stream = asyncio.StreamReader(loop=loop)
    stream_protocol = asyncio.StreamReaderProtocol(stream, loop=loop)
    ready = asyncio.futures.Future(loop=loop)

    fd_transport = UnixFileDescriptorTransport(
        loop=loop, fileno=fd, protocol=stream_protocol, waiter=ready)

    try:
        yield from ready
    except BaseException:
        # Same reach as a bare ``except``: close the transport whatever
        # went wrong, then let the error propagate unchanged.
        fd_transport.close()
        raise

    if loop.get_debug():
        logger.debug(
            "Read fd %r connected: (%r, %r)", fd, fd_transport, stream_protocol)
    return stream, fd_transport
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def open_pipe_connection(
    path=None,
    *,
    loop=None,
    limit=DEFAULT_LIMIT,
    **kwargs
):
    """
    Connect to a server using a Windows named pipe.

    Declared ``async`` because the body awaits the event loop; with a plain
    ``def`` the ``await`` below is a syntax error.

    :param path: Pipe path; forward slashes are converted to backslashes.
        Despite the ``None`` default a value is required, because
        ``replace`` is called on it unconditionally.
    :param loop: Event loop to use; defaults to
        ``asyncio.get_event_loop()``.
    :param limit: Buffer limit handed to the ``asyncio.StreamReader``.
    :param kwargs: Forwarded to ``loop.create_pipe_connection``.
    :returns: A ``(reader, writer)`` pair.
    """
    path = path.replace('/', '\\')
    loop = loop or asyncio.get_event_loop()

    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    # The factory always returns the one protocol built above, so the writer
    # created below shares it with the transport.
    transport, _ = await loop.create_pipe_connection(
        lambda: protocol,
        path,
        **kwargs
    )
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)

    return reader, writer
项目:arduino-ciao-meteor-ddp-connector    作者:andrea689    | 项目源码 | 文件源码
def connection_made(self, transport):
        """
        A peer is now connected and we receive an instance
        of the underlying :class:`asyncio.Transport`.

        The base :class:`asyncio.StreamReaderProtocol` implementation is
        called first; the initial handshake is then scheduled on the event
        loop so this callback returns immediately.  ``self.terminated`` is
        registered as done-callback of the scheduled handshake.
        """
        asyncio.StreamReaderProtocol.connection_made(self, transport)
        # Let make it concurrent for others to tag along.
        # ``asyncio.async`` cannot be parsed on Python 3.7+ (``async`` is a
        # reserved word there); ``ensure_future`` is its direct replacement.
        f = asyncio.ensure_future(self.handle_initial_handshake())
        f.add_done_callback(self.terminated)
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def connection_made(self, transport):
        """
        A peer is now connected and we receive an instance
        of the underlying :class:`asyncio.Transport`.

        The base :class:`asyncio.StreamReaderProtocol` implementation is
        called first; the initial handshake is then scheduled on the event
        loop so this callback returns immediately.  ``self.terminated`` is
        registered as done-callback of the scheduled handshake.
        """
        asyncio.StreamReaderProtocol.connection_made(self, transport)
        # Let make it concurrent for others to tag along.
        # ``asyncio.async`` cannot be parsed on Python 3.7+ (``async`` is a
        # reserved word there); ``ensure_future`` is its direct replacement.
        f = asyncio.ensure_future(self.handle_initial_handshake())
        f.add_done_callback(self.terminated)
项目:telegram-uz-bot    作者:vit-    | 项目源码 | 文件源码
async def get_reader(self):
        """Return the ``StreamReader`` attached to ``sys.stdin``.

        The reader is created and connected on first use and cached in
        ``self._reader``; later calls return the cached instance.  Declared
        ``async`` because connecting the pipe has to be awaited.
        """
        if self._reader is None:
            self._reader = asyncio.StreamReader()
            protocol = asyncio.StreamReaderProtocol(self._reader)
            loop = asyncio.get_event_loop()
            await loop.connect_read_pipe(lambda: protocol, sys.stdin)
        return self._reader
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def connect_read_pipe(file):
    """Attach *file* to the current event loop as a read pipe.

    Generator-based coroutine.  Returns ``(stream_reader, transport)``
    where ``stream_reader`` is the ``asyncio.StreamReader`` fed by the pipe.
    """
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)
    # The protocol is only built when the loop calls the factory.
    transport, _ = yield from loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(stream_reader), file)
    return stream_reader, transport


#
# Example
#
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def task():
    """Run a child process that writes to a pipe and print what it wrote.

    Generator-based coroutine.  Relies on ``loop`` and ``code`` from the
    enclosing scope; neither is defined in this snippet.
    """
    rfd, wfd = os.pipe()
    # The number of the write end is passed to the child as argv[1].
    args = [sys.executable, '-c', code, str(wfd)]

    # Unbuffered binary file object around the read end, attached to the
    # loop before the child is started.
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = yield from loop.connect_read_pipe(lambda: protocol, pipe)

    # pass_fds keeps the write end open in the child.
    proc = yield from asyncio.create_subprocess_exec(*args, pass_fds={wfd})
    yield from proc.wait()

    # Close the parent's copy of the write end before reading to EOF.
    os.close(wfd)
    data = yield from reader.read()
    print("read = %r" % data.decode())
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
        # See Tulip issue 168.  This test is derived from the example
        # subprocess_attach_read_pipe.py, but we configure the
        # StreamReader's limit so that twice it is less than the size
        # of the data written.  Also we must explicitly attach a child
        # watcher to the event loop.

        # Child program: writes b'data' to the descriptor whose number is
        # given in argv[1], then closes it.
        code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
        rfd, wfd = os.pipe()
        args = [sys.executable, '-c', code, str(wfd)]

        # Attach the read end to the loop; limit=1 keeps twice the limit
        # below the four bytes the child writes.
        pipe = open(rfd, 'rb', 0)
        reader = asyncio.StreamReader(loop=self.loop, limit=1)
        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
        transport, _ = self.loop.run_until_complete(
            self.loop.connect_read_pipe(lambda: protocol, pipe))

        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(self.loop)
        try:
            asyncio.set_child_watcher(watcher)
            create = asyncio.create_subprocess_exec(*args,
                                                    pass_fds={wfd},
                                                    loop=self.loop)
            proc = self.loop.run_until_complete(create)
            self.loop.run_until_complete(proc.wait())
        finally:
            # Always uninstall the watcher again, even if the child failed.
            asyncio.set_child_watcher(None)

        # Close the parent's copy of the write end before reading to EOF.
        os.close(wfd)
        data = self.loop.run_until_complete(reader.read(-1))
        self.assertEqual(data, b'data')
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_streamreader_constructor(self):
        # Make self.loop the current event loop; the cleanup resets the
        # current loop to None when the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        asyncio.set_event_loop(self.loop)

        # Tulip issue #184: Ensure that the StreamReader constructor
        # retrieves the current loop if the loop parameter is not set
        reader = asyncio.StreamReader()
        self.assertIs(reader._loop, self.loop)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_streamreaderprotocol_constructor(self):
        # Make self.loop the current event loop; the cleanup resets the
        # current loop to None when the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        asyncio.set_event_loop(self.loop)

        # Tulip issue #184: Ensure that StreamReaderProtocol constructor
        # retrieves the current loop if the loop parameter is not set
        # (a Mock stands in for the stream reader).
        reader = mock.Mock()
        protocol = asyncio.StreamReaderProtocol(reader)
        self.assertIs(protocol._loop, self.loop)
项目:pyledger    作者:guillemborrell    | 项目源码 | 文件源码
async def stdio(loop=None):
    """Create asyncio stream objects around the process's standard streams.

    Declared ``async`` because both pipe connections have to be awaited.

    :param loop: Event loop the pipes are attached to; defaults to
        ``asyncio.get_event_loop()``.
    :returns: A ``(reader, writer)`` pair; the reader is fed from
        ``sys.stdin``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    # Bind reader and protocol to the loop the pipes are attached to, so an
    # explicitly passed loop is honoured throughout.
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

    # NOTE(review): descriptor 0 is conventionally stdin, yet it is opened
    # for writing here -- confirm that descriptor 1 was not intended.
    writer_transport, writer_protocol = await loop.connect_write_pipe(
        FlowControlMixin, os.fdopen(0, 'wb'))
    writer = StreamWriter(writer_transport, writer_protocol, None, loop)

    await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

    return reader, writer
项目:jd4    作者:vijos    | 项目源码 | 文件源码
async def read_pipe(file, size):
    """Read up to *size* bytes from the pipe at path *file*.

    The path is opened read-only and non-blocking and attached to the
    current event loop.  Reading stops once *size* bytes were collected or
    the stream reports end of file, whichever comes first.

    :param file: Path handed to ``os.open``.
    :param size: Maximum number of bytes to return.
    :returns: The bytes read, possibly fewer than *size*.
    """
    loop = get_event_loop()
    reader = StreamReader()
    protocol = StreamReaderProtocol(reader)
    transport, _ = await loop.connect_read_pipe(
        lambda: protocol, fdopen(os_open(file, O_RDONLY | O_NONBLOCK)))
    chunks = []
    try:
        while size > 0:
            chunk = await reader.read(size)
            if not chunk:
                break
            chunks.append(chunk)
            size -= len(chunk)
    finally:
        # Release the pipe even when a read fails or is cancelled.
        transport.close()
    return b''.join(chunks)
项目:asif    作者:minus7    | 项目源码 | 文件源码
async def stdio(loop=None):
    """Create asyncio stream objects around stdin and stdout.

    Declared ``async`` because both pipe connections have to be awaited.

    :param loop: Event loop the pipes are attached to; defaults to
        ``asyncio.get_event_loop()``.
    :returns: A ``(reader, writer)`` pair; the reader is fed from
        ``sys.stdin`` and the writer wraps file descriptor 1.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    # Bind reader and protocol to the loop the pipes are attached to, so an
    # explicitly passed loop is honoured throughout.
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        FlowControlMixin, os.fdopen(1, 'wb'))
    writer = StreamWriter(writer_transport, writer_protocol, None, loop)

    await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

    return reader, writer
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def start_pipe_server(
    client_connected_cb,
    *,
    path,
    loop=None,
    limit=DEFAULT_LIMIT
):
    """
    Start listening for connection using Windows named pipes.

    Declared ``async`` because starting the pipe server has to be awaited;
    with a plain ``def`` the ``await`` below is a syntax error.

    :param client_connected_cb: Callback handed to every
        ``asyncio.StreamReaderProtocol`` created for a connection.
    :param path: Pipe path; forward slashes are converted to backslashes.
    :param loop: Event loop to use; defaults to
        ``asyncio.get_event_loop()``.
    :param limit: Buffer limit of each connection's ``StreamReader``.
    :returns: The first server object returned by
        ``loop.start_serving_pipe``, patched with a ``wait_closed`` method.
    """
    path = path.replace('/', '\\')
    loop = loop or asyncio.get_event_loop()

    def factory():
        # One fresh reader/protocol pair per call.
        reader = asyncio.StreamReader(limit=limit, loop=loop)
        return asyncio.StreamReaderProtocol(
            reader,
            client_connected_cb,
            loop=loop,
        )

    server, *_ = await loop.start_serving_pipe(factory, address=path)

    # The returned instance sadly doesn't have a `wait_closed` method so we add
    # one.
    # NOTE(review): ``asyncio.Event`` no longer accepts ``loop`` on
    # Python 3.10+; revisit when the supported versions change.
    closed = asyncio.Event(loop=loop)
    original_close = server.close

    def close():
        original_close()
        closed.set()

    server.close = close
    server.wait_closed = closed.wait

    return server
项目:arduino-ciao-meteor-ddp-connector    作者:andrea689    | 项目源码 | 文件源码
def __init__(self, handler_cls):
        # Initialise the StreamReaderProtocol base with a fresh reader and
        # our own connected-callback, then build the handler around self.
        stream = asyncio.StreamReader()
        asyncio.StreamReaderProtocol.__init__(
            self, stream, self._pseudo_connected)
        self.ws = handler_cls(self)
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def __init__(self, handler_cls):
        # Initialise the StreamReaderProtocol base with a fresh reader and
        # our own connected-callback, then build the handler around self.
        stream = asyncio.StreamReader()
        asyncio.StreamReaderProtocol.__init__(
            self, stream, self._pseudo_connected)
        self.ws = handler_cls(self)
项目:aiotk    作者:AndreLouisCaron    | 项目源码 | 文件源码
def mempipe(loop=None, limit=None):
    """In-memory pipe, returns a ``(reader, writer)`` pair.

    A falsy ``loop`` is replaced by ``asyncio.get_event_loop()`` and a
    falsy ``limit`` by ``_DEFAULT_LIMIT``.  The writer is built on a
    ``_MemoryTransport`` constructed from the reader.

    .. versionadded:: 0.1
    """
    if not loop:
        loop = asyncio.get_event_loop()
    if not limit:
        limit = _DEFAULT_LIMIT

    reader = asyncio.StreamReader(loop=loop, limit=limit)
    memory_transport = _MemoryTransport(reader)
    stream_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    writer = asyncio.StreamWriter(
        transport=memory_transport,
        protocol=stream_protocol,
        reader=reader,
        loop=loop,
    )
    return reader, writer
项目:aws-acl-helper    作者:brandond    | 项目源码 | 文件源码
def stdio(loop=None):
    """Set up stdin/stdout stream handlers.

    Generator-based coroutine.

    :param loop: Event loop the pipes are attached to; defaults to
        ``asyncio.get_event_loop()``.
    :returns: A ``(reader, writer)`` pair; the reader is fed from
        ``sys.stdin``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    # Bind reader and protocol to the loop the pipes are attached to, so an
    # explicitly passed loop is honoured throughout.
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

    # NOTE(review): descriptor 0 is conventionally stdin, yet it is opened
    # for writing here -- confirm that descriptor 1 was not intended.
    writer_transport, writer_protocol = yield from loop.connect_write_pipe(
        FlowControlMixin, os.fdopen(0, 'wb'))
    writer = StreamWriter(writer_transport, writer_protocol, None, loop)

    yield from loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

    return reader, writer
项目:halite-starter-python3-alt    作者:dvdotsenko    | 项目源码 | 文件源码
async def get_stdin(loop, exit_callbacks):
    """
    !!!! Super super important !!!!
     Must create stdout ***before*** stdin. Otherwise deadlock.

    Attach a duplicate of the process's stdin descriptor to *loop* as a
    read pipe and return the reader fed by it.

    :param loop: event loop the pipe is attached to
    :param exit_callbacks: currently unused; no transport-closing callback
        is registered by this function
    :return: reader fed from stdin
    :rtype: asyncio.StreamReader
    """
    stdin = asyncio.StreamReader(loop=loop)
    # Work on a duplicate so closing this file object leaves the original
    # stdin descriptor open.
    stdin_fio = os.fdopen(os.dup(sys.stdin.fileno()), 'rb')

    reader_protocol = asyncio.StreamReaderProtocol(stdin, loop=loop)
    try:
        await loop.connect_read_pipe(lambda: reader_protocol, stdin_fio)
    except BaseException:
        # Do not leak the duplicated descriptor when attaching fails.
        stdin_fio.close()
        raise

    return stdin
项目:buildhub    作者:mozilla-services    | 项目源码 | 文件源码
async def stream_as_generator(loop, stream):
    """Yield the lines of *stream* as they arrive.

    Asynchronous generator: *stream* is attached to *loop* as a read pipe
    and every line returned by ``StreamReader.readline`` is yielded as
    ``bytes`` until end of file.

    :param loop: Event loop the pipe is attached to.
    :param stream: File-like object accepted by ``loop.connect_read_pipe``.
    """
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    await loop.connect_read_pipe(lambda: reader_protocol, stream)

    while True:
        line = await reader.readline()
        if not line:  # EOF.
            break
        yield line
项目:mt7687-serial-uploader    作者:will127534    | 项目源码 | 文件源码
def open_serial_connection(**kwargs):
    """A wrapper for create_serial_connection() returning a (reader,
    writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to Serial(). Additional
    optional keyword arguments are loop (to set the event loop instance
    to use) and limit (to set the buffer limit passed to the
    StreamReader).

    This function is a coroutine.
    """
    # Pop instead of get: otherwise ``loop`` is passed twice to
    # create_serial_connection() (a TypeError) and ``limit`` is forwarded
    # along with the remaining keyword arguments.
    loop = kwargs.pop('loop', None)
    if loop is None:
        loop = asyncio.get_event_loop()
    limit = kwargs.pop('limit', None)
    if limit is None:
        limit = asyncio.streams._DEFAULT_LIMIT
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = yield from create_serial_connection(
        loop=loop,
        protocol_factory=lambda: protocol,
        **kwargs)
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    # A plain ``return`` delivers the result; raising StopIteration inside
    # a generator is turned into RuntimeError on Python 3.7+ (PEP 479).
    return reader, writer


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
        # See asyncio issue 168.  This test is derived from the example
        # subprocess_attach_read_pipe.py, but we configure the
        # StreamReader's limit so that twice it is less than the size
        # of the data written.  Also we must explicitly attach a child
        # watcher to the event loop.

        # Child program: writes b'data' to the descriptor whose number is
        # given in argv[1], then closes it.
        code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
        rfd, wfd = os.pipe()
        args = [sys.executable, '-c', code, str(wfd)]

        # Attach the read end to the loop; limit=1 keeps twice the limit
        # below the four bytes the child writes.
        pipe = open(rfd, 'rb', 0)
        reader = asyncio.StreamReader(loop=self.loop, limit=1)
        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
        transport, _ = self.loop.run_until_complete(
            self.loop.connect_read_pipe(lambda: protocol, pipe))

        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(self.loop)
        try:
            asyncio.set_child_watcher(watcher)
            create = asyncio.create_subprocess_exec(*args,
                                                    pass_fds={wfd},
                                                    loop=self.loop)
            proc = self.loop.run_until_complete(create)
            self.loop.run_until_complete(proc.wait())
        finally:
            # Always uninstall the watcher again, even if the child failed.
            asyncio.set_child_watcher(None)

        # Close the parent's copy of the write end before reading to EOF.
        os.close(wfd)
        data = self.loop.run_until_complete(reader.read(-1))
        self.assertEqual(data, b'data')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_streamreader_constructor(self):
        # Make self.loop the current event loop; the cleanup resets the
        # current loop to None when the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        asyncio.set_event_loop(self.loop)

        # asyncio issue #184: Ensure that the StreamReader constructor
        # retrieves the current loop if the loop parameter is not set
        reader = asyncio.StreamReader()
        self.assertIs(reader._loop, self.loop)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_streamreaderprotocol_constructor(self):
        # Make self.loop the current event loop; the cleanup resets the
        # current loop to None when the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        asyncio.set_event_loop(self.loop)

        # asyncio issue #184: Ensure that StreamReaderProtocol constructor
        # retrieves the current loop if the loop parameter is not set
        # (a Mock stands in for the stream reader).
        reader = mock.Mock()
        protocol = asyncio.StreamReaderProtocol(reader)
        self.assertIs(protocol._loop, self.loop)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test_pipe(self):
        # Generator-based coroutine that exercises serving and connecting
        # to a named pipe on self.loop; returns 'done' on success.
        # The pipe name embeds the current process id.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        # Nothing serves the pipe yet, so connecting must fail.
        with self.assertRaises(FileNotFoundError):
            yield from self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = yield from self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        # Open five client connections, each with its own StreamReader.
        clients = []
        for i in range(5):
            stream_reader = asyncio.StreamReader(loop=self.loop)
            protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                    loop=self.loop)
            trans, proto = yield from self.loop.create_pipe_connection(
                lambda: protocol, ADDRESS)
            self.assertIsInstance(trans, asyncio.Transport)
            self.assertEqual(protocol, proto)
            clients.append((stream_reader, trans))

        # Send one distinct lower-case line through every client ...
        for i, (r, w) in enumerate(clients):
            w.write('lower-{}\n'.format(i).encode())

        # ... and expect it back upper-cased (UpperProto is defined outside
        # this snippet; presumably it upper-cases what it receives).
        for i, (r, w) in enumerate(clients):
            response = yield from r.readline()
            self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
            w.close()

        server.close()

        # With the server closed, connecting must fail again.
        with self.assertRaises(FileNotFoundError):
            yield from self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'
项目:aiosmtpd    作者:aio-libs    | 项目源码 | 文件源码
def _client_connected_cb(self, reader, writer):
        # Keep both streams under short attribute names.  This is redundant
        # since we subclass StreamReaderProtocol, but the shorter names are
        # nicer to use.
        self._reader, self._writer = reader, writer
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
        # See Tulip issue 168.  This test is derived from the example
        # subprocess_attach_read_pipe.py, but we configure the
        # StreamReader's limit so that twice it is less than the size
        # of the data written.  Also we must explicitly attach a child
        # watcher to the event loop.

        # Child program: writes b'data' to the descriptor whose number is
        # given in argv[1], then closes it.
        code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
        rfd, wfd = os.pipe()
        args = [sys.executable, '-c', code, str(wfd)]

        # Attach the read end to the loop; limit=1 keeps twice the limit
        # below the four bytes the child writes.
        pipe = open(rfd, 'rb', 0)
        reader = asyncio.StreamReader(loop=self.loop, limit=1)
        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
        transport, _ = self.loop.run_until_complete(
            self.loop.connect_read_pipe(lambda: protocol, pipe))

        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(self.loop)
        try:
            asyncio.set_child_watcher(watcher)
            proc = self.loop.run_until_complete(
                asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
            self.loop.run_until_complete(proc.wait())
        finally:
            # Always uninstall the watcher again, even if the child failed.
            asyncio.set_child_watcher(None)

        # Close the parent's copy of the write end before reading to EOF.
        os.close(wfd)
        data = self.loop.run_until_complete(reader.read(-1))
        self.assertEqual(data, b'data')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test_pipe(self):
        # Generator-based coroutine that exercises serving and connecting
        # to a named pipe on self.loop; returns 'done' on success.
        # The pipe name embeds the current process id.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        # Nothing serves the pipe yet, so connecting must fail.
        with self.assertRaises(FileNotFoundError):
            yield from self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = yield from self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        self.assertIsInstance(server, windows_events.PipeServer)

        # Open five client connections, each with its own StreamReader.
        clients = []
        for i in range(5):
            stream_reader = asyncio.StreamReader(loop=self.loop)
            protocol = asyncio.StreamReaderProtocol(stream_reader)
            trans, proto = yield from self.loop.create_pipe_connection(
                lambda: protocol, ADDRESS)
            self.assertIsInstance(trans, asyncio.Transport)
            self.assertEqual(protocol, proto)
            clients.append((stream_reader, trans))

        # Send one distinct lower-case line through every client ...
        for i, (r, w) in enumerate(clients):
            w.write('lower-{}\n'.format(i).encode())

        # ... and expect it back upper-cased (UpperProto is defined outside
        # this snippet; presumably it upper-cases what it receives).
        for i, (r, w) in enumerate(clients):
            response = yield from r.readline()
            self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
            w.close()

        server.close()

        # With the server closed, connecting must fail again.
        with self.assertRaises(FileNotFoundError):
            yield from self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'
项目:async-streams    作者:asvetlov    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
    # See asyncio issue 168.  This test is derived from the example
    # subprocess_attach_read_pipe.py, but we configure the
    # StreamReader's limit so that twice it is less than the size
    # of the data written.  Also we must explicitly attach a child
    # watcher to the event loop.

    # Child program: writes b'data' to the descriptor whose number is
    # given in argv[1], then closes it.
    code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
    rfd, wfd = os.pipe()
    args = [sys.executable, '-c', code, str(wfd)]

    # Attach the read end to the loop; limit=1 keeps twice the limit
    # below the four bytes the child writes.
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: protocol, pipe))

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        create = asyncio.create_subprocess_exec(*args,
                                                pass_fds={wfd},
                                                loop=self.loop)
        proc = self.loop.run_until_complete(create)
        self.loop.run_until_complete(proc.wait())
    finally:
        # Always uninstall the watcher again, even if the child failed.
        asyncio.set_child_watcher(None)

    # Close the parent's copy of the write end before reading to EOF.
    os.close(wfd)
    data = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(data, b'data')
项目:async-streams    作者:asvetlov    | 项目源码 | 文件源码
def test_streamreader_constructor(self):
    # Make self.loop the current event loop; the cleanup resets the
    # current loop to None when the test is over.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)

    # asyncio issue #184: Ensure that the StreamReader constructor
    # retrieves the current loop if the loop parameter is not set
    reader = asyncio.StreamReader()
    self.assertIs(reader._loop, self.loop)
项目:async-streams    作者:asvetlov    | 项目源码 | 文件源码
def test_streamreaderprotocol_constructor(self):
    # Make self.loop the current event loop; the cleanup resets the
    # current loop to None when the test is over.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)

    # asyncio issue #184: Ensure that StreamReaderProtocol constructor
    # retrieves the current loop if the loop parameter is not set
    # (a Mock stands in for the stream reader).
    reader = mock.Mock()
    protocol = asyncio.StreamReaderProtocol(reader)
    self.assertIs(protocol._loop, self.loop)
项目:datapackage-pipelines    作者:frictionlessdata    | 项目源码 | 文件源码
async def collect_stats(infile):
    """Read JSON lines from *infile* and return the decoded last line.

    *infile* is attached to the current event loop as a read pipe and
    consumed line by line until end of file.  The first line is decoded to
    validate the stream; the last line read is what gets returned.

    :param infile: File-like object accepted by ``connect_read_pipe``.
    :returns: The JSON value of the last line, or ``{}`` when nothing was
        read, the first line is not valid JSON, or the last line is not
        valid JSON.
    """
    reader = asyncio.StreamReader()
    reader_protocol = asyncio.StreamReaderProtocol(reader)
    transport, _ = await asyncio.get_event_loop() \
        .connect_read_pipe(lambda: reader_protocol, infile)
    count = 0
    dp = None
    stats = None
    try:
        while True:
            try:
                line = await reader.readline()
            except ValueError:
                # Raised by readline() when a line exceeds the buffer limit.
                logging.exception('Too large stats object!')
                break
            if line == b'':
                break
            # Always remember the most recent line; it is decoded at the end.
            stats = line
            if dp is None:
                try:
                    dp = json.loads(line.decode('ascii'))
                except JSONDecodeError:
                    break
            count += 1
    finally:
        # Release the pipe even if reading fails unexpectedly.
        transport.close()

    if dp is None or count == 0:
        return {}

    try:
        stats = json.loads(stats.decode('ascii'))
    except JSONDecodeError:
        stats = {}

    return stats
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
async def stdin_reader(loop: asyncio.AbstractEventLoop,
                       input_handler: Callable[[str], Awaitable]
                       ) -> None:
    """Feed every line read from stdin to *input_handler* until EOF.

    For each line a task running ``input_handler(line)`` is created on
    *loop*; the tasks are not awaited here.

    :param loop: Event loop on which handler tasks are created.
    :param input_handler: Called with each decoded line (line ending
        included); must return an awaitable.
    """
    if sys.platform == 'win32':
        # Windows can't use SelectorEventLoop.connect_read_pipe
        # and ProactorEventLoop.connect_read_pipe apparently
        # doesn't work with sys.* streams or files.
        # http://stackoverflow.com/questions/31510190/aysncio-cannot-read-stdin-on-windows
        #
        # Running polling in an executor (thread) doesn't work properly either
        # since there is absolutely no way to stop the executor (sys.stdin.readline)
        # and make the program terminate.
        # So instead, we spawn a custom daemon thread.
        import threading
        thread_close_evt = asyncio.Event()

        def reader_thread():
            while True:
                try:
                    line = sys.stdin.readline()
                except KeyboardInterrupt:
                    break
                if not line:
                    break
                # Bind ``line`` now: the callback runs later on the loop
                # thread, by which time this thread may already have read
                # the next line.
                loop.call_soon_threadsafe(
                    lambda line=line: loop.create_task(input_handler(line)))

            loop.call_soon_threadsafe(thread_close_evt.set)

        threading.Thread(target=reader_thread, daemon=True).start()
        await thread_close_evt.wait()

    else:
        reader = asyncio.StreamReader()
        make_protocol = partial(asyncio.StreamReaderProtocol, reader)
        await loop.connect_read_pipe(make_protocol, sys.stdin)

        while True:
            line_bytes = await reader.readline()
            line = line_bytes.decode(sys.stdin.encoding)
            if not line:
                break
            loop.create_task(input_handler(line))

        print("stdin stream closed")