Python os 模块,set_blocking() 实例源码

我们从 Python 开源项目中提取了以下 9 个代码示例,用于说明如何使用 os.set_blocking()。

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, args, **kwargs):
    """Start ``args`` as a subprocess and wrap its pipes in FileStream objects.

    All keyword arguments are forwarded unchanged to ``subprocess.Popen``.

    Raises:
        RuntimeError: if ``universal_newlines`` is passed.
    """
    if 'universal_newlines' in kwargs:
        raise RuntimeError('universal_newlines argument not supported')

    # A FileStream handed over as the child's stdin has to be flipped back to
    # blocking mode before the child starts reading from it.
    if 'stdin' in kwargs and isinstance(kwargs['stdin'], FileStream):
        os.set_blocking(kwargs['stdin'].fileno(), True)

    self._popen = subprocess.Popen(args, **kwargs)

    # Wrap each standard pipe attribute that Popen actually populated.
    for name in ('stdin', 'stdout', 'stderr'):
        pipe = getattr(self._popen, name)
        if pipe:
            setattr(self, name, FileStream(pipe))
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _set_nonblocking(fd):
    """Switch the file descriptor ``fd`` to non-blocking mode."""
    want_blocking = False
    os.set_blocking(fd, want_blocking)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def check_reopen(r1, w):
    """Probe whether a descriptor reopened via /proc has its own blocking flag.

    Reopens ``r1`` through ``/proc/self/fd``, checks that both the original
    and the reopened descriptor receive bytes written to ``w``, switches only
    the reopened descriptor to non-blocking mode, and prints what it observes.

    Any exception raised along the way is caught and reported on stdout as
    ``ERROR: ...``.  The reopened descriptor is always closed before
    returning so repeated calls do not leak file descriptors.

    Args:
        r1: readable file descriptor to reopen.
        w: writable file descriptor whose data arrives on ``r1``.
    """
    # Tracked outside the try block so the finally clause can close it
    # regardless of where a failure happens.
    r2 = None
    try:
        print("Reopening read end")
        r2 = os.open("/proc/self/fd/{}".format(r1), os.O_RDONLY)

        print("r1 is {}, r2 is {}".format(r1, r2))

        print("checking they both can receive from w...")

        os.write(w, b"a")
        assert os.read(r1, 1) == b"a"

        os.write(w, b"b")
        assert os.read(r2, 1) == b"b"

        print("...ok")

        print("setting r2 to non-blocking")
        os.set_blocking(r2, False)

        print("os.get_blocking(r1) ==", os.get_blocking(r1))
        print("os.get_blocking(r2) ==", os.get_blocking(r2))

        # Check r2 is really truly non-blocking: nothing is pending, so a
        # non-blocking read must raise instead of waiting.
        try:
            os.read(r2, 1)
        except BlockingIOError:
            print("r2 definitely seems to be in non-blocking mode")

        # Check that r1 is really truly still in blocking mode: the read
        # below has to wait until the helper thread writes a byte.
        def sleep_then_write():
            time.sleep(1)
            os.write(w, b"c")
        threading.Thread(target=sleep_then_write, daemon=True).start()
        assert os.read(r1, 1) == b"c"
        print("r1 definitely seems to be in blocking mode")
    except Exception as exc:
        print("ERROR: {!r}".format(exc))
    finally:
        # Release the reopened descriptor; the caller still owns r1 and w.
        if r2 is not None:
            os.close(r2)
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def _set_nonblocking(fd):
    """Put file descriptor ``fd`` into non-blocking mode."""
    blocking = False
    return os.set_blocking(fd, blocking)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _set_nonblocking(fd):
    """Disable blocking mode on the file descriptor ``fd``."""
    descriptor = fd
    os.set_blocking(descriptor, False)
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def __init__(self, fd, map=None):
    """Set up the dispatcher around ``fd`` and make it non-blocking.

    ``fd`` may be an object providing ``fileno()``; anything without that
    attribute is used as the descriptor as-is.
    """
    dispatcher.__init__(self, None, map)
    self.connected = True
    # Resolve file-like objects to their descriptor, falling back to the
    # argument itself when it has no fileno().
    try:
        descriptor = fd.fileno()
    except AttributeError:
        descriptor = fd
    self.set_file(descriptor)
    # set it to non-blocking mode
    os.set_blocking(descriptor, False)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, fileobj):
    """Wrap the binary file object ``fileobj`` and make it non-blocking.

    Text-mode files (``io.TextIOBase`` instances) are rejected by an
    assertion.
    """
    is_text = isinstance(fileobj, io.TextIOBase)
    assert not is_text, 'Only binary mode files allowed'
    super().__init__(fileobj)

    descriptor = int(self._fileno)
    os.set_blocking(descriptor, False)

    # Keep the file's bound read/write methods on the instance.
    self._file_read = fileobj.read
    self._file_write = fileobj.write
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def blocking(self):
    """
    Yield the underlying file while it is temporarily in blocking mode.

    Raises IOError if unread data is still buffered.  The descriptor is
    switched back to non-blocking mode when the generator finishes or is
    closed.
    """
    if self._buffer:
        raise IOError('There is unread buffered data.')

    def switch(flag):
        # Look the descriptor up on every call rather than caching it.
        os.set_blocking(int(self._fileno), flag)

    try:
        switch(True)
        yield self._file
    finally:
        switch(False)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _set_nonblocking(fd):
    """Mark the file descriptor ``fd`` as non-blocking."""
    nonblocking = True
    os.set_blocking(fd, not nonblocking)