Python io 模块,BufferedIOBase() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.BufferedIOBase()

项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        # Shortcut for the common case - the whole line is in the buffer.
        if size < 0:
            end = self._buffer.find(b"\n", self._buffer_offset) + 1
            if end > 0:
                line = self._buffer[self._buffer_offset : end]
                self._buffer_offset = end
                self._pos += len(line)
                return line
        return io.BufferedIOBase.readline(self, size)
项目:pyimc    作者:oysstu    | 项目源码 | 文件源码
def __init__(self, lsf_path: str, types: List[Type[pyimc.Message]] = None, make_index=True):
        """
        Reads an LSF file.
        :param lsf_path: The path to the LSF file.
        :param types: The message types to return. List of pyimc message classes.
        :param make_index: If true, an index that speeds up subsequent reads is created.
        """
        self.fpath = lsf_path
        self.f = None  # type: io.BufferedIOBase
        self.header = IMCHeader()  # Preallocate header buffer
        self.parser = pyimc.Parser()
        self.idx = {}  # type: Dict[Union[int, str], List[int]]
        self.make_index = make_index

        if types:
            self.msg_types = [pyimc.Factory.id_from_abbrev(x.__name__) for x in types]
        else:
            self.msg_types = None
项目:wechat-async-sdk    作者:ihjmh    | 项目源码 | 文件源码
def _upload_media_py3(self, media_type, media_file, extension=''):
        if isinstance(media_file, io.IOBase) and hasattr(media_file, 'name'):
            extension = media_file.name.split('.')[-1].lower()
            if not is_allowed_extension(extension):
                raise ValueError('Invalid file type.')
            filename = media_file.name
        elif isinstance(media_file, io.BytesIO):
            extension = extension.lower()
            if not is_allowed_extension(extension):
                raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
            filename = 'temp.' + extension
        else:
            raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')

        return self.request.post(
            url='https://api.weixin.qq.com/cgi-bin/media/upload',
            params={
                'type': media_type,
            },
            files={
                'media': (filename, media_file, convert_ext_to_mime(extension))
            }
        )
项目:WXBotForPi    作者:nemoTyrant    | 项目源码 | 文件源码
def _get_writable(stream_or_path, mode):
    """This method returns a tuple containing the stream and a flag to indicate
    if the stream should be automatically closed.

    The `stream_or_path` parameter is returned if it is an open writable stream.
    Otherwise, it treats the `stream_or_path` parameter as a file path and
    opens it with the given mode.

    It is used by the svg and png methods to interpret the file parameter.

    :type stream_or_path: str | io.BufferedIOBase
    :type mode: str | unicode
    :rtype: (io.BufferedIOBase, bool)
    """
    is_stream = hasattr(stream_or_path, 'write')
    if not is_stream:
        # No stream provided, treat "stream_or_path" as path
        stream_or_path = open(stream_or_path, mode)
    return stream_or_path, not is_stream
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        with self._lock:
            self._check_can_read()
            # Shortcut for the common case - the whole line is in the buffer.
            if size < 0:
                end = self._buffer.find(b"\n", self._buffer_offset) + 1
                if end > 0:
                    line = self._buffer[self._buffer_offset : end]
                    self._buffer_offset = end
                    self._pos += len(line)
                    return line
            return io.BufferedIOBase.readline(self, size)
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _pipe_stdin(self, stdin):
        if stdin is None or isinstance(stdin, io.FileIO):
            return None
        tsi = self._temp_stdin
        bufsize = self.bufsize
        if isinstance(stdin, io.BufferedIOBase):
            buf = stdin.read(bufsize)
            while len(buf) != 0:
                tsi.write(buf)
                tsi.flush()
                buf = stdin.read(bufsize)
        elif isinstance(stdin, (str, bytes)):
            raw = stdin.encode() if isinstance(stdin, str) else stdin
            for i in range((len(raw)//bufsize) + 1):
                tsi.write(raw[i*bufsize:(i + 1)*bufsize])
                tsi.flush()
        else:
            raise ValueError('stdin not understood {0!r}'.format(stdin))
项目:rank_biased_precision_implementation    作者:vatsal-sodha    | 项目源码 | 文件源码
def write(fobj, content, convert=True):
        """
        Utility function used to write content to a file. Allow compatibility
        between Python versions.
        """

        # This function automatically converts strings to bytes
        # if running under Python 3. Otherwise we cannot write
        # to a file.

        # First detect whether fobj requires binary stream
        if hasattr(fobj, 'mode'):
            # A file-like object
            binary = 'b' in fobj.mode
        else:
            # A subclass of io.BufferedIOBase?
            binary = isinstance(fobj, io.BufferedIOBase)
        # If we are running under Python 3 and binary is required
        if sys.version_info[:2] >= (3, 0) and convert and binary:  # pragma: no cover
            content = bytes(content, 'utf-8')

        fobj.write(content)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        with self._lock:
            self._check_can_read()
            # Shortcut for the common case - the whole line is in the buffer.
            if size < 0:
                end = self._buffer.find(b"\n", self._buffer_offset) + 1
                if end > 0:
                    line = self._buffer[self._buffer_offset : end]
                    self._buffer_offset = end
                    self._pos += len(line)
                    return line
            return io.BufferedIOBase.readline(self, size)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        # Shortcut for the common case - the whole line is in the buffer.
        if size < 0:
            end = self._buffer.find(b"\n", self._buffer_offset) + 1
            if end > 0:
                line = self._buffer[self._buffer_offset : end]
                self._buffer_offset = end
                self._pos += len(line)
                return line
        return io.BufferedIOBase.readline(self, size)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def __init__(self, buffer):
        """
        :param buffer: Buffer
        :type buffer: io.BufferedIOBase
        """
        self.buffer = buffer
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:cdbcli    作者:kevinjqiu    | 项目源码 | 文件源码
def output(self, text, highlighter=None):
        """Send text to the environment's output stream.
        :param text: the text to output
        :param highlighter: an optional function to colourize the text
        """
        if not self.has_pipe:  # only colourize when the output is not piped
            highlighter = highlighter or (lambda x: x)
            text = highlighter(text)
        output = "{}\n".format(text)
        if isinstance(self.output_stream, io.BufferedIOBase):
            output = bytes(output, encoding='utf-8')

        self.output_stream.write(output)
        self.output_stream.flush()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_io_buffered_by_default(self):
        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        try:
            self.assertIsInstance(p.stdin, io.BufferedIOBase)
            self.assertIsInstance(p.stdout, io.BufferedIOBase)
            self.assertIsInstance(p.stderr, io.BufferedIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
项目:mlens    作者:flennerhag    | 项目源码 | 文件源码
def __init__(self, filename, mode="rb", compresslevel=9):
        # This lock must be recursive, so that BufferedIOBase's
        # readline(), readlines() and writelines() don't deadlock.
        self._lock = RLock()
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        self._pos = 0
        self._size = -1

        if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
            raise ValueError("'compresslevel' must be an integer "
                             "between 1 and 9. You provided 'compresslevel={}'"
                             .format(compresslevel))

        if mode == "rb":
            mode_code = _MODE_READ
            self._decompressor = zlib.decompressobj(self.wbits)
            self._buffer = b""
            self._buffer_offset = 0
        elif mode == "wb":
            mode_code = _MODE_WRITE
            self._compressor = zlib.compressobj(compresslevel,
                                                zlib.DEFLATED,
                                                self.wbits,
                                                zlib.DEF_MEM_LEVEL,
                                                0)
        else:
            raise ValueError("Invalid mode: %r" % (mode,))

        if isinstance(filename, _basestring):
            self._fp = io.open(filename, mode)
            self._closefp = True
            self._mode = mode_code
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError("filename must be a str or bytes object, "
                            "or a file")
项目:mlens    作者:flennerhag    | 项目源码 | 文件源码
def readinto(self, b):
        """Read up to len(b) bytes into b.

        Returns the number of bytes read (0 for EOF).
        """
        with self._lock:
            return io.BufferedIOBase.readinto(self, b)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:odin    作者:imito    | 项目源码 | 文件源码
def is_fileobj(f):
  """ Check if an object `f` is intance of FileIO object created
  by `open()`"""
  return isinstance(f, io.TextIOBase) or \
      isinstance(f, io.BufferedIOBase) or \
      isinstance(f, io.RawIOBase) or \
      isinstance(f, io.IOBase)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
# PyPy: moved this class outside the function above
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def readinto(self, b):
        """Read up to len(b) bytes into b.

        Returns the number of bytes read (0 for EOF).
        """
        with self._lock:
            return io.BufferedIOBase.readinto(self, b)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def readlines(self, size=-1):
        """Read a list of lines of uncompressed bytes from the file.

        size can be specified to control the number of lines read: no
        further lines will be read once the total size of the lines read
        so far equals or exceeds size.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        with self._lock:
            return io.BufferedIOBase.readlines(self, size)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def writelines(self, seq):
        """Write a sequence of byte strings to the file.

        Returns the number of uncompressed bytes written.
        seq can be any iterable yielding byte strings.

        Line separators are not added between the written byte strings.
        """
        with self._lock:
            return io.BufferedIOBase.writelines(self, seq)

    # Rewind the file to the beginning of the data stream.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_io_buffered_by_default(self):
        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        try:
            self.assertIsInstance(p.stdin, io.BufferedIOBase)
            self.assertIsInstance(p.stdout, io.BufferedIOBase)
            self.assertIsInstance(p.stderr, io.BufferedIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    class UnbufferedTextIOWrapper(io.TextIOWrapper):
        def write(self, s):
            super(UnbufferedTextIOWrapper, self).write(s)
            self.flush()
    return UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _stdin_filename(self, stdin):
        if stdin is None: 
            rtn = None
        elif isinstance(stdin, io.FileIO) and os.path.isfile(stdin.name):
            rtn = stdin.name
        elif isinstance(stdin, (io.BufferedIOBase, str, bytes)):
            self._temp_stdin = tsi = tempfile.NamedTemporaryFile()
            rtn = tsi.name
        else:
            raise ValueError('stdin not understood {0!r}'.format(stdin))
        return rtn
项目:TransIP-STACK-API    作者:Paradoxis    | 项目源码 | 文件源码
def download_into(self, file: str, buffer: Union[BufferedIOBase, BytesIO]=None, remote_path: str=None) -> IOBase:
        """
        Download a file from your STACK account
        :param file: File name to download
        :param remote_path: Path to find the file in
        :param buffer: Buffer to download into (BytesIO, StringIO, file pointer)
        :return: BytesIO buffer
        """
        if not remote_path:
            remote_path = self.__cwd

        file = join(remote_path, file.lstrip("/"))

        if not buffer:
            buffer = BytesIO()

        if not isinstance(buffer, BufferedIOBase):
            raise StackException("Download buffer must be a binary IO type, please use BytesIO or open your file in 'rb' mode.")

        try:
            self.webdav.download_to(buffer, file.lstrip("/"))
            buffer.seek(0)
            return buffer

        except WebDavException as e:
            raise StackException(e)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def readinto(self, b):
        """Read up to len(b) bytes into b.

        Returns the number of bytes read (0 for EOF).
        """
        with self._lock:
            return io.BufferedIOBase.readinto(self, b)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def readlines(self, size=-1):
        """Read a list of lines of uncompressed bytes from the file.

        size can be specified to control the number of lines read: no
        further lines will be read once the total size of the lines read
        so far equals or exceeds size.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        with self._lock:
            return io.BufferedIOBase.readlines(self, size)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def writelines(self, seq):
        """Write a sequence of byte strings to the file.

        Returns the number of uncompressed bytes written.
        seq can be any iterable yielding byte strings.

        Line separators are not added between the written byte strings.
        """
        with self._lock:
            return io.BufferedIOBase.writelines(self, seq)

    # Rewind the file to the beginning of the data stream.
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_io_buffered_by_default(self):
        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        try:
            self.assertIsInstance(p.stdin, io.BufferedIOBase)
            self.assertIsInstance(p.stdout, io.BufferedIOBase)
            self.assertIsInstance(p.stderr, io.BufferedIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:kind2anki    作者:prz3m    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def _prep_binary_content(self):

        '''     
        Sets delivery method of either payload or header
        Favors Content-Location header if set

        Args:
            None

        Returns:
            None: sets attributes in self.binary and headers
        '''

        # nothing present
        if not self.data and not self.location and 'Content-Location' not in self.resource.headers.keys():
            raise Exception('creating/updating NonRDFSource requires content from self.binary.data, self.binary.location, or the Content-Location header')

        elif 'Content-Location' in self.resource.headers.keys():
            logger.debug('Content-Location header found, using')
            self.delivery = 'header'

        # if Content-Location is not set, look for self.data_location then self.data
        elif 'Content-Location' not in self.resource.headers.keys():

            # data_location set, trumps Content self.data
            if self.location:
                # set appropriate header
                self.resource.headers['Content-Location'] = self.location
                self.delivery = 'header'

            # data attribute is plain text, binary, or file-like object
            elif self.data:

                # if file-like object, set flag for api.http_request
                if isinstance(self.data, io.BufferedIOBase):
                    logger.debug('detected file-like object')
                    self.delivery = 'payload'

                # else, just bytes
                else:
                    logger.debug('detected bytes')
                    self.delivery = 'payload'
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
项目:Parallel.GAMIT    作者:demiangomez    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
项目:ApkParser    作者:yigitozgumus    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find(b'\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = b''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == b'':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + b'\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line