Python io 模块,close() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用io.close()

项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def close(self):
        if self._closed:
            return
        self.flush()
        self._closed = True
        if self._readable:
            self.hub.cancel_wait(self._read_event, cancel_wait_ex)
        if self._writable:
            self.hub.cancel_wait(self._write_event, cancel_wait_ex)
        fileno = self._fileno
        if self._closefd:
            self._fileno = None
            os.close(fileno)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedIOReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def close(self):
        if self._closed:
            return
        self.flush()
        self._closed = True
        if self._readable:
            self.hub.cancel_wait(self._read_event, cancel_wait_ex)
        if self._writable:
            self.hub.cancel_wait(self._write_event, cancel_wait_ex)
        fileno = self._fileno
        if self._closefd:
            self._fileno = None
            os.close(fileno)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedIOReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _do_close(self, io, closefd):
        try:
            io.close()
            # self.fileio already knows whether or not to close the
            # file descriptor
            self.fileio.close()
        finally:
            self._fobj = None
            self.fileio = None
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_response_is_iterable(self):
        r = requests.Response()
        io = StringIO.StringIO('abc')
        read_ = io.read

        def read_mock(amt, decode_content=None):
            return read_(amt)
        setattr(io, 'read', read_mock)
        r.raw = io
        assert next(iter(r))
        io.close()
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_session_close_proxy_clear(self, mocker):
        proxies = {
          'one': mocker.Mock(),
          'two': mocker.Mock(),
        }
        session = requests.Session()
        mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
        session.close()
        proxies['one'].clear.assert_called_once_with()
        proxies['two'].clear.assert_called_once_with()
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_response_is_iterable(self):
        r = requests.Response()
        io = StringIO.StringIO('abc')
        read_ = io.read

        def read_mock(amt, decode_content=None):
            return read_(amt)
        setattr(io, 'read', read_mock)
        r.raw = io
        assert next(iter(r))
        io.close()
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_session_close_proxy_clear(self, mocker):
        proxies = {
          'one': mocker.Mock(),
          'two': mocker.Mock(),
        }
        session = requests.Session()
        mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
        session.close()
        proxies['one'].clear.assert_called_once_with()
        proxies['two'].clear.assert_called_once_with()
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_response_is_iterable(self):
        r = requests.Response()
        io = StringIO.StringIO('abc')
        read_ = io.read

        def read_mock(amt, decode_content=None):
            return read_(amt)
        setattr(io, 'read', read_mock)
        r.raw = io
        assert next(iter(r))
        io.close()
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_session_close_proxy_clear(self, mocker):
        proxies = {
          'one': mocker.Mock(),
          'two': mocker.Mock(),
        }
        session = requests.Session()
        mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
        session.close()
        proxies['one'].clear.assert_called_once_with()
        proxies['two'].clear.assert_called_once_with()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _do_close(self, io, closefd):
        try:
            io.close()
            # self.fileio already knows whether or not to close the
            # file descriptor
            self.fileio.close()
        finally:
            self._fobj = None
            self.fileio = None
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_response_is_iterable(self):
        r = requests.Response()
        io = StringIO.StringIO('abc')
        read_ = io.read

        def read_mock(amt, decode_content=None):
            return read_(amt)
        setattr(io, 'read', read_mock)
        r.raw = io
        assert next(iter(r))
        io.close()