Python asyncio 模块,set_child_watcher() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用asyncio.set_child_watcher()

项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def setUp(self):
    """Install a ``SafeChildWatcher`` attached to ``self.loop``.

    The parent ``setUp`` runs first; presumably it is what provides
    ``self.loop`` (confirm against the base class).
    """
    super().setUp()
    child_watcher = asyncio.SafeChildWatcher()
    # Attach the watcher to this test's loop, then register it through
    # asyncio.set_child_watcher().
    child_watcher.attach_loop(self.loop)
    asyncio.set_child_watcher(child_watcher)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def tearDown(self):
            """Reset the child watcher to ``None``, then run the parent tearDown."""
            # Clear the registered watcher before the base class cleans up.
            asyncio.set_child_watcher(None)
            super().tearDown()
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_event_loop_policy(self):
    # Each method of a bare AbstractEventLoopPolicy exercised below is
    # expected to raise NotImplementedError when called.
    abstract_policy = asyncio.AbstractEventLoopPolicy()

    with self.assertRaises(NotImplementedError):
        abstract_policy.get_event_loop()
    with self.assertRaises(NotImplementedError):
        abstract_policy.set_event_loop(object())
    with self.assertRaises(NotImplementedError):
        abstract_policy.new_event_loop()
    with self.assertRaises(NotImplementedError):
        abstract_policy.get_child_watcher()
    with self.assertRaises(NotImplementedError):
        abstract_policy.set_child_watcher(object())
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
    # See Tulip issue 168.  This test is derived from the example
    # subprocess_attach_read_pipe.py, but we configure the
    # StreamReader's limit so that twice it is less than the size
    # of the data written.  Also we must explicitly attach a child
    # watcher to the event loop.

    # Child program: write b'data' to the descriptor number given as
    # its first argument, then close that descriptor.
    code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
    rfd, wfd = os.pipe()
    args = [sys.executable, '-c', code, str(wfd)]

    # Wrap the read end (unbuffered) and hook it up to a StreamReader
    # through the loop's read-pipe transport.
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: protocol, pipe))

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        create = asyncio.create_subprocess_exec(*args,
                                                pass_fds={wfd},
                                                loop=self.loop)
        proc = self.loop.run_until_complete(create)
        self.loop.run_until_complete(proc.wait())
    finally:
        asyncio.set_child_watcher(None)
        # Close the parent's copy of the write end here rather than after
        # the try block, so the descriptor is not leaked when spawning or
        # waiting on the child raises.
        os.close(wfd)

    # Read until EOF (n=-1) and check the child's payload arrived intact.
    data = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(data, b'data')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
    """Register a loop-attached ``SafeChildWatcher`` for the test.

    Runs the parent ``setUp`` first, then builds the watcher, attaches
    it to ``self.loop`` and passes it to ``asyncio.set_child_watcher``.
    """
    super().setUp()
    safe_watcher = asyncio.SafeChildWatcher()
    safe_watcher.attach_loop(self.loop)
    asyncio.set_child_watcher(safe_watcher)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def tearDown(self):
            """Clear the child watcher (set it to ``None``) and call the parent tearDown."""
            asyncio.set_child_watcher(None)
            # Parent cleanup runs only after the watcher has been reset.
            super().tearDown()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_event_loop_policy(self):
    # A directly instantiated AbstractEventLoopPolicy should raise
    # NotImplementedError from each of the methods called below.
    base_policy = asyncio.AbstractEventLoopPolicy()

    with self.assertRaises(NotImplementedError):
        base_policy.get_event_loop()

    with self.assertRaises(NotImplementedError):
        base_policy.set_event_loop(object())

    with self.assertRaises(NotImplementedError):
        base_policy.new_event_loop()

    with self.assertRaises(NotImplementedError):
        base_policy.get_child_watcher()

    with self.assertRaises(NotImplementedError):
        base_policy.set_child_watcher(object())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
    # See asyncio issue 168.  This test is derived from the example
    # subprocess_attach_read_pipe.py, but we configure the
    # StreamReader's limit so that twice it is less than the size
    # of the data written.  Also we must explicitly attach a child
    # watcher to the event loop.

    # Source of the child process: it writes b'data' to the descriptor
    # whose number is passed in argv[1] and then closes it.
    code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
    rfd, wfd = os.pipe()
    args = [sys.executable, '-c', code, str(wfd)]

    # The read end is opened unbuffered and connected to a StreamReader
    # with limit=1 via connect_read_pipe().
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: protocol, pipe))

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        create = asyncio.create_subprocess_exec(*args,
                                                pass_fds={wfd},
                                                loop=self.loop)
        proc = self.loop.run_until_complete(create)
        self.loop.run_until_complete(proc.wait())
    finally:
        asyncio.set_child_watcher(None)
        # Closing the write end inside ``finally`` (instead of after the
        # try block) keeps the descriptor from leaking if creating or
        # waiting for the subprocess fails.
        os.close(wfd)

    # Drain the reader to EOF and verify the bytes the child wrote.
    data = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(data, b'data')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
    """Run the parent ``setUp``, then install a ``SafeChildWatcher``.

    The watcher is attached to ``self.loop`` and handed to
    ``asyncio.set_child_watcher``.
    """
    super().setUp()

    loop_watcher = asyncio.SafeChildWatcher()
    loop_watcher.attach_loop(self.loop)
    asyncio.set_child_watcher(loop_watcher)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def tearDown(self):
        """Set the child watcher back to ``None`` before the parent tearDown runs."""
        asyncio.set_child_watcher(None)
        super().tearDown()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
    """Attach a fresh ``SafeChildWatcher`` to ``self.loop`` and register it.

    The parent ``setUp`` is called before the watcher is created.
    """
    super().setUp()
    new_watcher = asyncio.SafeChildWatcher()
    new_watcher.attach_loop(self.loop)
    # Hand the attached watcher to asyncio.set_child_watcher().
    asyncio.set_child_watcher(new_watcher)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def tearDown(self):
            """Unset the child watcher (``None``) and then delegate to the parent tearDown."""
            asyncio.set_child_watcher(None)
            super().tearDown()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_event_loop_policy(self):
    # Calling any of these methods on a plain AbstractEventLoopPolicy
    # instance must raise NotImplementedError.
    raw_policy = asyncio.AbstractEventLoopPolicy()

    with self.assertRaises(NotImplementedError):
        raw_policy.get_event_loop()
    with self.assertRaises(NotImplementedError):
        raw_policy.set_event_loop(object())
    with self.assertRaises(NotImplementedError):
        raw_policy.new_event_loop()
    with self.assertRaises(NotImplementedError):
        raw_policy.get_child_watcher()
    with self.assertRaises(NotImplementedError):
        raw_policy.set_child_watcher(object())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
    # See Tulip issue 168.  This test is derived from the example
    # subprocess_attach_read_pipe.py, but we configure the
    # StreamReader's limit so that twice it is less than the size
    # of the data written.  Also we must explicitly attach a child
    # watcher to the event loop.

    # Script run by the child: write b'data' to the descriptor named in
    # argv[1], then close it.
    code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
    rfd, wfd = os.pipe()
    args = [sys.executable, '-c', code, str(wfd)]

    # Connect the unbuffered read end of the pipe to a StreamReader.
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: protocol, pipe))

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
        self.loop.run_until_complete(proc.wait())
    finally:
        asyncio.set_child_watcher(None)
        # The write end is closed in ``finally`` so that a failure while
        # spawning or waiting on the child does not leak the descriptor.
        os.close(wfd)

    # Read to EOF (n=-1) and compare with what the child wrote.
    data = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(data, b'data')
项目:async-streams    作者:asvetlov    | 项目源码 | 文件源码
def test_read_all_from_pipe_reader(self):
    # See asyncio issue 168.  This test is derived from the example
    # subprocess_attach_read_pipe.py, but we configure the
    # StreamReader's limit so that twice it is less than the size
    # of the data written.  Also we must explicitly attach a child
    # watcher to the event loop.

    # Program executed in the child process: writes b'data' to the
    # descriptor number received in argv[1] and closes it.
    code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
    rfd, wfd = os.pipe()
    args = [sys.executable, '-c', code, str(wfd)]

    # Feed the pipe's read end (opened unbuffered) into a StreamReader.
    pipe = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: protocol, pipe))

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        create = asyncio.create_subprocess_exec(*args,
                                                pass_fds={wfd},
                                                loop=self.loop)
        proc = self.loop.run_until_complete(create)
        self.loop.run_until_complete(proc.wait())
    finally:
        asyncio.set_child_watcher(None)
        # Release the parent's write end here, not after the try block,
        # so an exception from spawning or waiting cannot leak it.
        os.close(wfd)

    # Consume everything up to EOF and check the payload.
    data = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(data, b'data')