Python io 模块,DEFAULT_BUFFER_SIZE 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.DEFAULT_BUFFER_SIZE

项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, reader, writer,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Build a buffered reader/writer pair from two RawIO instances.

        ``reader`` must report itself readable and ``writer`` writable,
        otherwise IOError is raised. ``max_buffer_size`` is deprecated:
        passing anything other than None only emits a DeprecationWarning.
        """
        deprecated_arg_used = max_buffer_size is not None
        if deprecated_arg_used:
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning, 2)

        can_read = reader.readable()
        if not can_read:
            raise IOError('"reader" argument must be readable.')

        can_write = writer.writable()
        if not can_write:
            raise IOError('"writer" argument must be writable.')

        # Both halves are buffered with the same size.
        self.reader = BufferedReader(reader, buffer_size)
        self.writer = BufferedWriter(writer, buffer_size)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def readlines(self, hint=-1):
        # type: (int) -> List[bytes]
        """Read and return a list of lines from the stream.

        ``hint`` can be specified to control the number of lines read: no more
        lines will be read once the total size (in bytes) of all lines so far
        exceeds ``hint``. A ``hint`` of ``None``, zero or a negative number
        means "no limit".

        :param hint: Soft cap on the total number of bytes returned
        :type hint: int
        :returns: Lines of data
        :rtype: list of bytes
        """
        # ``None`` is treated like the default so it no longer raises
        # TypeError when compared with an int on Python 3.
        unlimited = hint is None or hint <= 0
        lines = []
        total_size = 0
        for line in self:  # type: ignore
            lines.append(line)
            # Count the bytes actually read; the result is the same as the
            # chunk-count estimate whenever every line is a full
            # ``io.DEFAULT_BUFFER_SIZE`` chunk, but stays correct for shorter
            # lines.
            total_size += len(line)
            if not unlimited and total_size > hint:
                break
        return lines
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, reader, writer,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Build a buffered reader/writer pair from two RawIO instances.

        ``reader`` must report itself readable and ``writer`` writable,
        otherwise IOError is raised. ``max_buffer_size`` is deprecated:
        passing anything other than None only emits a DeprecationWarning.
        """
        deprecated_arg_used = max_buffer_size is not None
        if deprecated_arg_used:
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning, 2)

        can_read = reader.readable()
        if not can_read:
            raise IOError('"reader" argument must be readable.')

        can_write = writer.writable()
        if not can_write:
            raise IOError('"writer" argument must be writable.')

        # Both halves are buffered with the same size.
        self.reader = BufferedReader(reader, buffer_size)
        self.writer = BufferedWriter(writer, buffer_size)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_file_position_after_fromfile(self):
        """Check the Python-level file position after ``np.fromfile`` (gh-4118).

        For file sizes below, equal to and above ``io.DEFAULT_BUFFER_SIZE``,
        two bytes are read directly and then one float64 through
        ``np.fromfile``; ``tell()`` must report 10 afterwards.
        """
        # gh-4118
        sizes = [io.DEFAULT_BUFFER_SIZE//8,
                 io.DEFAULT_BUFFER_SIZE,
                 io.DEFAULT_BUFFER_SIZE*8]

        for size in sizes:
            # Create a file of exactly ``size`` bytes by writing its last byte.
            with open(self.filename, 'wb') as f:
                f.seek(size-1)
                f.write(b'\0')

            for mode in ['rb', 'r+b']:
                err_msg = "%d %s" % (size, mode)

                # ``with`` closes the handle even if the read or
                # ``np.fromfile`` raises.
                with open(self.filename, mode) as f:
                    f.read(2)
                    np.fromfile(f, dtype=np.float64, count=1)
                    pos = f.tell()
                # 2 bytes read directly + one 8-byte float64.
                assert_equal(pos, 10, err_msg=err_msg)
项目:atc_alexa    作者:ckuzma    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:data_utilities    作者:fmv1992    | 项目源码 | 文件源码
def get_hash(self, x):
        """Return a hash for ``x``.

        Dicts, bytes and anything that is not the path of an existing regular
        file are converted with ``self._transform_to_hashable`` and hashed by
        ``self._get_hash_from_hashable``. Paths of existing files are read in
        binary mode, ``io.DEFAULT_BUFFER_SIZE`` bytes at a time, into a fresh
        ``self.hash_function()`` object whose ``digest()`` is returned.
        """
        # TODO: cached for strings and paths.
        # Need to generalize the reading of:
        #   1) file paths
        #   2) small strings
        #   3) python_objects
        is_file_path = (not isinstance(x, (dict, bytes))
                        and os.path.isfile(x))
        if not is_file_path:  # use function cache.
            return self._get_hash_from_hashable(self._transform_to_hashable(x))
        # For files.
        digest = self.hash_function()
        with open(x, 'rb') as stream:
            # An empty read marks end of file.
            for block in iter(lambda: stream.read(io.DEFAULT_BUFFER_SIZE), b''):
                digest.update(block)
        return digest.digest()
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_file_position_after_fromfile(self):
        """Check the Python-level file position after ``np.fromfile`` (gh-4118).

        For file sizes below, equal to and above ``io.DEFAULT_BUFFER_SIZE``,
        two bytes are read directly and then one float64 through
        ``np.fromfile``; ``tell()`` must report 10 afterwards.
        """
        # gh-4118
        sizes = [io.DEFAULT_BUFFER_SIZE//8,
                 io.DEFAULT_BUFFER_SIZE,
                 io.DEFAULT_BUFFER_SIZE*8]

        for size in sizes:
            # Create a file of exactly ``size`` bytes by writing its last byte.
            with open(self.filename, 'wb') as f:
                f.seek(size-1)
                f.write(b'\0')

            for mode in ['rb', 'r+b']:
                err_msg = "%d %s" % (size, mode)

                # ``with`` closes the handle even if the read or
                # ``np.fromfile`` raises.
                with open(self.filename, mode) as f:
                    f.read(2)
                    np.fromfile(f, dtype=np.float64, count=1)
                    pos = f.tell()
                # 2 bytes read directly + one 8-byte float64.
                assert_equal(pos, 10, err_msg=err_msg)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def __init__(self, reader, writer,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Build a buffered reader/writer pair from two RawIO instances.

        ``reader`` must report itself readable and ``writer`` writable,
        otherwise IOError is raised. ``max_buffer_size`` is deprecated:
        passing anything other than None only emits a DeprecationWarning.
        """
        deprecated_arg_used = max_buffer_size is not None
        if deprecated_arg_used:
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning, 2)

        can_read = reader.readable()
        if not can_read:
            raise IOError('"reader" argument must be readable.')

        can_write = writer.writable()
        if not can_write:
            raise IOError('"writer" argument must be writable.')

        # Both halves are buffered with the same size.
        self.reader = BufferedReader(reader, buffer_size)
        self.writer = BufferedWriter(writer, buffer_size)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def __init__(self, reader, writer,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Build a buffered reader/writer pair from two RawIO instances.

        ``reader`` must report itself readable and ``writer`` writable,
        otherwise IOError is raised. ``max_buffer_size`` is deprecated:
        passing anything other than None only emits a DeprecationWarning.
        """
        deprecated_arg_used = max_buffer_size is not None
        if deprecated_arg_used:
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning, 2)

        can_read = reader.readable()
        if not can_read:
            raise IOError('"reader" argument must be readable.')

        can_write = writer.writable()
        if not can_write:
            raise IOError('"writer" argument must be writable.')

        # Both halves are buffered with the same size.
        self.reader = BufferedReader(reader, buffer_size)
        self.writer = BufferedWriter(writer, buffer_size)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, reader, writer,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Build a buffered reader/writer pair from two RawIO instances.

        ``reader`` must report itself readable and ``writer`` writable,
        otherwise IOError is raised. ``max_buffer_size`` is deprecated:
        passing anything other than None only emits a DeprecationWarning.
        """
        deprecated_arg_used = max_buffer_size is not None
        if deprecated_arg_used:
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning, 2)

        can_read = reader.readable()
        if not can_read:
            raise IOError('"reader" argument must be readable.')

        can_write = writer.writable()
        if not can_write:
            raise IOError('"writer" argument must be writable.')

        # Both halves are buffered with the same size.
        self.reader = BufferedReader(reader, buffer_size)
        self.writer = BufferedWriter(writer, buffer_size)
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def copy_file_data(src_file, dst_file, chunk_size=None):
    """Stream the contents of one file object into another.

    Arguments:
        src_file (io.IOBase): File open for reading.
        dst_file (io.IOBase): File open for writing.
        chunk_size (int, optional): Number of bytes to copy at
            a time; `None` (or any falsy value) selects
            `io.DEFAULT_BUFFER_SIZE`.

    """
    if not chunk_size:
        chunk_size = io.DEFAULT_BUFFER_SIZE
    pull = src_file.read
    push = dst_file.write
    while True:
        piece = pull(chunk_size)
        # An empty result (b'' or '') means the source is exhausted, so the
        # same test works for binary and text files.
        if not piece:
            break
        push(piece)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_file_position_after_fromfile(self):
        """Check the Python-level file position after ``np.fromfile`` (gh-4118).

        For file sizes below, equal to and above ``io.DEFAULT_BUFFER_SIZE``,
        two bytes are read directly and then one float64 through
        ``np.fromfile``; ``tell()`` must report 10 afterwards.
        """
        # gh-4118
        sizes = [io.DEFAULT_BUFFER_SIZE//8,
                 io.DEFAULT_BUFFER_SIZE,
                 io.DEFAULT_BUFFER_SIZE*8]

        for size in sizes:
            # Create a file of exactly ``size`` bytes by writing its last byte.
            with open(self.filename, 'wb') as f:
                f.seek(size-1)
                f.write(b'\0')

            for mode in ['rb', 'r+b']:
                err_msg = "%d %s" % (size, mode)

                # ``with`` closes the handle even if the read or
                # ``np.fromfile`` raises.
                with open(self.filename, mode) as f:
                    f.read(2)
                    np.fromfile(f, dtype=np.float64, count=1)
                    pos = f.tell()
                # 2 bytes read directly + one 8-byte float64.
                assert_equal(pos, 10, err_msg=err_msg)
项目:aws-lambda-numpy    作者:vitolimandibhrata    | 项目源码 | 文件源码
def test_file_position_after_fromfile(self):
        """Check the Python-level file position after ``np.fromfile`` (gh-4118).

        For file sizes below, equal to and above ``io.DEFAULT_BUFFER_SIZE``,
        two bytes are read directly and then one float64 through
        ``np.fromfile``; ``tell()`` must report 10 afterwards.
        """
        # gh-4118
        sizes = [io.DEFAULT_BUFFER_SIZE//8,
                 io.DEFAULT_BUFFER_SIZE,
                 io.DEFAULT_BUFFER_SIZE*8]

        for size in sizes:
            # Create a file of exactly ``size`` bytes by writing its last byte.
            with open(self.filename, 'wb') as f:
                f.seek(size-1)
                f.write(b'\0')

            for mode in ['rb', 'r+b']:
                err_msg = "%d %s" % (size, mode)

                # ``with`` closes the handle even if the read or
                # ``np.fromfile`` raises.
                with open(self.filename, mode) as f:
                    f.read(2)
                    np.fromfile(f, dtype=np.float64, count=1)
                    pos = f.tell()
                # 2 bytes read directly + one 8-byte float64.
                assert_equal(pos, 10, err_msg=err_msg)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def readall(self):
        """Read until EOF, using multiple read() call.

        Returns the accumulated data as bytes; when nothing was read at all,
        returns whatever the first ``read()`` produced (b'' or None).
        """
        collected = bytearray()
        data = self.read(DEFAULT_BUFFER_SIZE)
        while data:
            collected += data
            data = self.read(DEFAULT_BUFFER_SIZE)
        if not collected:
            # b'' or None
            return data
        return bytes(collected)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
        """Create a new buffered reader using the given readable raw IO object.

        ``raw`` must answer True from ``readable()``, otherwise IOError is
        raised; ``buffer_size`` must be greater than zero, otherwise
        ValueError is raised.
        """
        # Reject raw streams that cannot be read from before doing anything.
        if not raw.readable():
            raise IOError('"raw" argument must be readable.')

        # Base-class setup runs before the buffer size is validated.
        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        self.buffer_size = buffer_size
        self._reset_read_buf()
        # NOTE(review): presumably serializes access to the read buffer --
        # confirm against the methods that use it.
        self._read_lock = Lock()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, raw,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Create a new buffered writer using the given writable raw IO object.

        ``raw`` must answer True from ``writable()``, otherwise IOError is
        raised; ``buffer_size`` must be greater than zero, otherwise
        ValueError is raised. ``max_buffer_size`` is deprecated: any value
        other than None only triggers a DeprecationWarning.
        """
        if not raw.writable():
            raise IOError('"raw" argument must be writable.')

        # Base-class setup runs before the buffer size is validated.
        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        if max_buffer_size is not None:
            # The stack level comes from an attribute so the warning can be
            # attributed to the right caller frame.
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning,
                          self._warning_stack_offset)
        self.buffer_size = buffer_size
        # Pending output starts out empty.
        self._write_buf = bytearray()
        # NOTE(review): presumably serializes access to the write buffer --
        # confirm against the methods that use it.
        self._write_lock = Lock()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, raw,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Set up both the reader and the writer side on one raw stream.

        ``raw._checkSeekable()`` is called first, then the ``BufferedReader``
        and ``BufferedWriter`` initializers run on this same instance;
        ``max_buffer_size`` is forwarded to the writer initializer only.
        """
        # NOTE(review): presumably raises when ``raw`` is not seekable --
        # confirm against ``_checkSeekable``.
        raw._checkSeekable()
        BufferedReader.__init__(self, raw, buffer_size)
        BufferedWriter.__init__(self, raw, buffer_size, max_buffer_size)
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_base64io_decode_readline(bytes_to_read, expected_bytes_read):
    """Check that ``readline`` on a decoding ``Base64IO`` returns the leading
    ``expected_bytes_read`` bytes of the plaintext when asked for
    ``bytes_to_read``.
    """
    plaintext = os.urandom(io.DEFAULT_BUFFER_SIZE * 2)
    encoded = base64.b64encode(plaintext)
    encoded_stream = io.BytesIO(encoded)

    with Base64IO(encoded_stream) as decoder:
        line = decoder.readline(bytes_to_read)

    expected = plaintext[:expected_bytes_read]
    assert line == expected
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def readline(self, limit=-1):
        # type: (int) -> bytes
        """Read and return one line from the stream.
        If limit is specified, at most limit bytes will be read.

        .. note::

            Because the source that this reads from may not contain any EOL characters, we
            read "lines" in chunks of length ``io.DEFAULT_BUFFER_SIZE``.

        :param limit: Maximum number of bytes to read; ``None``, zero or a
            negative value selects ``io.DEFAULT_BUFFER_SIZE``
        :type limit: int
        :rtype: bytes
        """
        # ``None`` is handled explicitly so it no longer raises TypeError when
        # compared with an int on Python 3.
        if limit is None or limit <= 0:
            limit = io.DEFAULT_BUFFER_SIZE
        return self.read(limit)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def readall(self):
        """Read until EOF, using multiple read() call.

        Returns the accumulated data as bytes; when nothing was read at all,
        returns whatever the first ``read()`` produced (b'' or None).
        """
        collected = bytearray()
        data = self.read(DEFAULT_BUFFER_SIZE)
        while data:
            collected += data
            data = self.read(DEFAULT_BUFFER_SIZE)
        if not collected:
            # b'' or None
            return data
        return bytes(collected)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
        """Create a new buffered reader using the given readable raw IO object.

        ``raw`` must answer True from ``readable()``, otherwise IOError is
        raised; ``buffer_size`` must be greater than zero, otherwise
        ValueError is raised.
        """
        # Reject raw streams that cannot be read from before doing anything.
        if not raw.readable():
            raise IOError('"raw" argument must be readable.')

        # Base-class setup runs before the buffer size is validated.
        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        self.buffer_size = buffer_size
        self._reset_read_buf()
        # NOTE(review): presumably serializes access to the read buffer --
        # confirm against the methods that use it.
        self._read_lock = Lock()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, raw,
                 buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
        """Create a new buffered writer using the given writable raw IO object.

        ``raw`` must answer True from ``writable()``, otherwise IOError is
        raised; ``buffer_size`` must be greater than zero, otherwise
        ValueError is raised. ``max_buffer_size`` is deprecated: any value
        other than None only triggers a DeprecationWarning.
        """
        if not raw.writable():
            raise IOError('"raw" argument must be writable.')

        # Base-class setup runs before the buffer size is validated.
        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        if max_buffer_size is not None:
            # The stack level comes from an attribute so the warning can be
            # attributed to the right caller frame.
            warnings.warn("max_buffer_size is deprecated", DeprecationWarning,
                          self._warning_stack_offset)
        self.buffer_size = buffer_size
        # Pending output starts out empty.
        self._write_buf = bytearray()
        # NOTE(review): presumably serializes access to the write buffer --
        # confirm against the methods that use it.
        self._write_lock = Lock()
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Generate successive blocks read from ``file`` until it is exhausted.

    Each block comes from ``file.read(size)``; iteration stops at the first
    empty (falsy) result, which is not yielded.
    """
    piece = file.read(size)
    while piece:
        yield piece
        piece = file.read(size)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_file_position_after_tofile(self):
        """Check the Python-level file position after ``ndarray.tofile`` (gh-4118).

        For file sizes below, equal to and above ``io.DEFAULT_BUFFER_SIZE``,
        one float64 is written through ``tofile`` after direct writes or
        reads, and ``tell()`` must account for the 8 bytes it added.
        """
        # gh-4118
        sizes = [io.DEFAULT_BUFFER_SIZE//8,
                 io.DEFAULT_BUFFER_SIZE,
                 io.DEFAULT_BUFFER_SIZE*8]

        for size in sizes:
            err_msg = "%d" % (size,)

            # ``with`` closes the handle even if a write or ``tofile`` raises.
            with open(self.filename, 'wb') as f:
                f.seek(size-1)
                f.write(b'\0')
                f.seek(10)
                f.write(b'12')
                np.array([0], dtype=np.float64).tofile(f)
                pos = f.tell()
            # Offset 10 + 2 bytes written directly + one 8-byte float64.
            assert_equal(pos, 10 + 2 + 8, err_msg=err_msg)

            with open(self.filename, 'r+b') as f:
                f.read(2)
                f.seek(0, 1)  # seek between read&write required by ANSI C
                np.array([0], dtype=np.float64).tofile(f)
                pos = f.tell()
            # 2 bytes read directly + one 8-byte float64 written.
            assert_equal(pos, 10, err_msg=err_msg)
项目:ghostlines-robofont    作者:ghostlines    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
项目:ghostlines-robofont    作者:ghostlines    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps the socket in a ``SocketIO`` raw stream and, depending on ``mode``
    and ``buffering``, layers a buffered and/or text wrapper on top of it.

    :param mode: any combination of ``r``, ``w`` and ``b``; reading is
        assumed when ``w`` is absent.
    :param buffering: ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream (binary only).
    :param encoding: forwarded to ``io.TextIOWrapper`` in text mode.
    :param errors: forwarded to ``io.TextIOWrapper`` in text mode.
    :param newline: forwarded to ``io.TextIOWrapper`` in text mode.
    :returns: the raw, buffered or text stream built around the socket.
    :raises ValueError: if ``mode`` holds any other character, or if an
        unbuffered text stream is requested.
    """
    if set(mode) - {"r", "w", "b"}:
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    want_write = "w" in mode
    # Reading is the fallback whenever writing was not requested.
    want_read = "r" in mode or not want_write
    assert want_read or want_write
    is_binary = "b" in mode
    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    # Record that one more file object now refers to this socket.
    self._makefile_refs += 1
    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if is_binary:
            return raw
        raise ValueError("unbuffered streams must be binary")
    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        assert want_write
        buffered = io.BufferedWriter(raw, buffering)
    if is_binary:
        return buffered
    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper