Python os 模块,SEEK_SET 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.SEEK_SET

项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def create_media(media):
    """Download the file that ``media.data_value`` links to and attach it.

    When ``media.data_value`` is a valid URL, the remote content is streamed
    into a temporary file, ``media.data_value`` is replaced by the file name
    taken from the last URL segment, and ``media.data_file`` is set to an
    ``InMemoryUploadedFile`` wrapping the downloaded data.

    Returns the updated ``media`` object, or ``None`` when
    ``media.data_value`` is not a valid URL.
    """
    if not is_valid_url(media.data_value):
        return None

    url = media.data_value
    filename = url.split('/')[-1]
    # guess_type() returns a (type, encoding) pair; only the MIME type
    # string is meaningful as an upload content type.
    content_type = mimetypes.guess_type(filename)[0]

    data_file = NamedTemporaryFile()
    try:
        with closing(requests.get(url, stream=True)) as r:
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                if chunk:
                    data_file.write(chunk)
        # The offset reached by seeking to the end is the downloaded size.
        data_file.seek(0, os.SEEK_END)
        size = data_file.tell()
        data_file.seek(0, os.SEEK_SET)
    except Exception:
        # Do not leave the temporary file open when the download fails.
        data_file.close()
        raise

    media.data_value = filename
    media.data_file = InMemoryUploadedFile(
        data_file, 'data_file', filename, content_type,
        size, charset=None)

    return media
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = ""
    self.fileobj.seek(self.position)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def setup(self):
        # Reset everything.
        self.f.seek(self.beginning, os.SEEK_SET)

        self.whereToWriteNewIFDOffset = None
        self.offsetOfNewPage = 0

        self.IIMM = IIMM = self.f.read(4)
        if not IIMM:
            # empty file - first page
            self.isFirst = True
            return

        self.isFirst = False
        if IIMM == b"II\x2a\x00":
            self.setEndian("<")
        elif IIMM == b"MM\x00\x2a":
            self.setEndian(">")
        else:
            raise RuntimeError("Invalid TIFF file header")

        self.skipIFDs()
        self.goToEnd()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:mongodb_backup_script    作者:hxt168    | 项目源码 | 文件源码
def md5sum2(filename, offset=0, partsize=0):
    """Return the hex MD5 digest of a byte range of ``filename``.

    Hashing starts at ``offset`` (or at the end of the file when ``offset``
    is larger than the file size) and covers at most ``partsize`` bytes,
    stopping early when the end of the file is reached.
    """
    m = get_md5()
    with open(filename, 'rb') as fp:
        if offset > os.path.getsize(filename):
            fp.seek(0, os.SEEK_END)
        else:
            fp.seek(offset)

        left_len = partsize
        while left_len > 0:
            buffer_content = fp.read(min(left_len, BUFFER_SIZE))
            if not buffer_content:
                # End of file before partsize bytes were read; without this
                # check left_len would never shrink and the loop would spin.
                break
            m.update(buffer_content)
            left_len -= len(buffer_content)
    return m.hexdigest()
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def setup(self):
        # Reset everything.
        self.f.seek(self.beginning, os.SEEK_SET)

        self.whereToWriteNewIFDOffset = None
        self.offsetOfNewPage = 0

        self.IIMM = IIMM = self.f.read(4)
        if not IIMM:
            # empty file - first page
            self.isFirst = True
            return

        self.isFirst = False
        if IIMM == b"II\x2a\x00":
            self.setEndian("<")
        elif IIMM == b"MM\x00\x2a":
            self.setEndian(">")
        else:
            raise RuntimeError("Invalid TIFF file header")

        self.skipIFDs()
        self.goToEnd()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def _get_org(self, ipnum):
        """
        Seek and return organization or ISP name for ipnum.
        Return org/isp name.

        :arg ipnum: Result of ip2long conversion
        """
        seek_org = self._seek_country(ipnum)
        if seek_org == self._databaseSegments:
            return None

        read_length = (2 * self._recordLength - 1) * self._databaseSegments
        try:
            self._lock.acquire()
            self._fp.seek(seek_org + read_length, os.SEEK_SET)
            buf = self._fp.read(const.MAX_ORG_RECORD_LENGTH)
        finally:
            self._lock.release()

        if PY3 and type(buf) is bytes:
            buf = buf.decode(ENCODING)

        return buf[:buf.index(chr(0))]
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = ""
    self.fileobj.seek(self.position)
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def chunk_generator(chunk_count, chunk_size, file_data):
    """
    Yield ``(chunk_info, data)`` pairs that split ``file_data`` into chunks.

    :param chunk_count: Number of chunks wanted
    :param chunk_size: Size of each chunk; the last chunk runs to the end
        of the data instead
    :param file_data: bytes to be split into chunks, or an object without
        ``len()`` support that is read through ``seek``/``read``
    :return: generator of ``(Chunk, data)`` tuples
    """
    try:
        total_len = len(file_data)
    except TypeError:
        # No len(): treat file_data as a file object.
        total_len = get_file_size(file_data)
        is_fp = True
    else:
        is_fp = False

    final_index = chunk_count - 1
    for index in range(chunk_count):
        begin = index * chunk_size
        finish = total_len if index >= final_index else begin + chunk_size
        chunk_info = Chunk(index, begin, finish, chunk_count)
        if not is_fp:
            yield chunk_info, file_data[chunk_info.start: chunk_info.end]
            continue
        file_data.seek(chunk_info.start, os.SEEK_SET)
        yield chunk_info, file_data.read(chunk_info.length)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def setup(self):
        # Reset everything.
        self.f.seek(self.beginning, os.SEEK_SET)

        self.whereToWriteNewIFDOffset = None
        self.offsetOfNewPage = 0

        self.IIMM = IIMM = self.f.read(4)
        if not IIMM:
            # empty file - first page
            self.isFirst = True
            return

        self.isFirst = False
        if IIMM == b"II\x2a\x00":
            self.setEndian("<")
        elif IIMM == b"MM\x00\x2a":
            self.setEndian(">")
        else:
            raise RuntimeError("Invalid TIFF file header")

        self.skipIFDs()
        self.goToEnd()
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def read_random_line(f):
    """Return one line (as bytes) from the file at path ``f``.

    A byte offset between 0 and the file size is drawn with ``u_randint``;
    the file is then scanned backwards in small chunks for the newline that
    precedes that offset, and the line starting right after it is returned.
    When no newline is found the first line is returned.
    """
    import os

    step = 16
    with open(f, 'rb') as stream:
        stream.seek(0, os.SEEK_END)
        file_len = stream.tell()
        offset = u_randint(0, file_len)
        while True:
            offset -= step
            if offset < 0:
                # Stepped past the start: shrink the chunk so it ends where
                # the previous one began, and read from offset 0.
                step += offset
                offset = 0
            stream.seek(offset, os.SEEK_SET)
            newline_at = stream.read(step).rfind(b'\n')
            if newline_at != -1:
                # The wanted line starts just after this newline.
                offset += newline_at + 1
                break
            if offset == 0:
                break
        stream.seek(offset, os.SEEK_SET)
        return stream.readline()
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def uuid_from_file(fn, block_size=1 << 20):
    """
    Build an ASCII identifier string from the contents of the file ``fn``.

    The result is the file size in hexadecimal followed by the SHA-512 hex
    digest of the data, which is read ``block_size`` bytes at a time.
    (exact hashing method may change).
    """
    import hashlib
    import os

    digest = hashlib.new('sha512')
    with open(fn, 'rb') as stream:
        # Jump to the end to learn the size, then rewind before hashing.
        stream.seek(0, os.SEEK_END)
        byte_count = stream.tell()
        stream.seek(0, os.SEEK_SET)

        for block in iter(lambda: stream.read(block_size), b''):
            digest.update(block)

    # Hexadecimal size without the '0x' prefix.
    return format(byte_count, 'x') + digest.hexdigest()
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def _uuid_from_file(fn, block_size=1 << 20):
    with open(fn, 'rb') as f:
        # first get the size
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(0, os.SEEK_SET)
        # done!

        import hashlib
        sha1 = hashlib.new('sha512')
        while True:
            data = f.read(block_size)
            if not data:
                break
            sha1.update(data)
        return (hex(size)[2:] + sha1.hexdigest()).encode()
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def close(self):
    """
    Close the blend file.

    When the file has been modified and is compressed, the contents of the
    working handle are first gzip-compressed into ``self.filepath_orig``.
    The handle is closed in every case, including when compression fails.
    """
    handle = self.handle

    try:
        if self.is_modified and self.is_compressed:
            log.debug("close compressed blend file")
            handle.seek(0, os.SEEK_SET)
            log.debug("compressing started")
            # The with-block closes the gzip output even if a read or
            # write raises part-way through.
            with gzip.open(self.filepath_orig, "wb") as fs:
                data = handle.read(FILE_BUFFER_SIZE)
                while data:
                    fs.write(data)
                    data = handle.read(FILE_BUFFER_SIZE)
            log.debug("compressing finished")
    finally:
        handle.close()
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def get(self, path,
        default=...,
        sdna_index_refine=None,
        use_nil=True, use_str=True,
        base_index=0,
        ):
    """Read the field at ``path`` from this block via its DNA struct.

    The file handle is first moved to ``self.file_offset``; a non-zero
    ``base_index`` (which must be below ``self.count``) advances that by
    ``base_index`` items of ``self.size // self.count`` bytes each.
    ``sdna_index_refine`` selects the struct used for the lookup and
    defaults to ``self.sdna_index``. The remaining arguments are passed
    through to the struct's ``field_get``, whose result is returned.
    """
    if base_index == 0:
        ofs = self.file_offset
    else:
        assert base_index < self.count
        ofs = self.file_offset + (self.size // self.count) * base_index

    blend = self.file
    blend.handle.seek(ofs, os.SEEK_SET)

    if sdna_index_refine is not None:
        blend.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
    else:
        sdna_index_refine = self.sdna_index

    return blend.structs[sdna_index_refine].field_get(
        blend.header, blend.handle, path,
        default=default,
        use_nil=use_nil, use_str=use_str,
    )
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def set(self, path, value, sdna_index_refine=None):
    """Write ``value`` to the field at ``path`` of this block.

    ``sdna_index_refine`` selects the DNA struct used for the write and
    defaults to ``self.sdna_index``. The file handle is moved to
    ``self.file_offset``, the file is flagged as modified, and the result
    of the struct's ``field_set`` is returned.
    """
    blend = self.file

    if sdna_index_refine is not None:
        blend.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
    else:
        sdna_index_refine = self.sdna_index

    dna_struct = blend.structs[sdna_index_refine]
    blend.handle.seek(self.file_offset, os.SEEK_SET)
    blend.is_modified = True
    return dna_struct.field_set(blend.header, blend.handle, path, value)

    # ---------------
    # Utility get/set
    #
    #   avoid inline pointer casting
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def iter_array(block, length=-1):
    """Yield the block referenced by each pointer stored in ``block``.

    ``block`` must have the code ``DATA``. ``length`` pointers are read
    from consecutive ``header.pointer_size``-byte slots starting at
    ``block.file_offset``, and each one is resolved with
    ``find_block_from_offset``. With the default ``length=-1`` nothing
    is yielded.
    """
    assert block.code == b'DATA'
    from . import blendfile
    import os
    handle = block.file.handle
    header = block.file.header

    for index in range(length):
        slot_offset = block.file_offset + header.pointer_size * index
        block.file.handle.seek(slot_offset, os.SEEK_SET)
        pointer = blendfile.DNA_IO.read_pointer(handle, header)
        yield block.file.find_block_from_offset(pointer)


# -----------------------------------------------------------------------------
# ID Expand
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def uuid_from_file(fn, block_size=1 << 20):
    """
    Build an ASCII identifier string from the contents of the file ``fn``.

    The result is the file size in hexadecimal followed by the SHA-512 hex
    digest of the data, which is read ``block_size`` bytes at a time.
    (exact hashing method may change).
    """
    import hashlib
    import os

    digest = hashlib.new('sha512')
    with open(fn, 'rb') as stream:
        # Jump to the end to learn the size, then rewind before hashing.
        stream.seek(0, os.SEEK_END)
        byte_count = stream.tell()
        stream.seek(0, os.SEEK_SET)

        for block in iter(lambda: stream.read(block_size), b''):
            digest.update(block)

    # Hexadecimal size without the '0x' prefix.
    return format(byte_count, 'x') + digest.hexdigest()
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def get(self, path,
        default=...,
        sdna_index_refine=None,
        use_nil=True, use_str=True,
        base_index=0,
        ):
    """Read the field at ``path`` from this block via its DNA struct.

    The file handle is first moved to ``self.file_offset``; a non-zero
    ``base_index`` (which must be below ``self.count``) advances that by
    ``base_index`` items of ``self.size // self.count`` bytes each.
    ``sdna_index_refine`` selects the struct used for the lookup and
    defaults to ``self.sdna_index``. The remaining arguments are passed
    through to the struct's ``field_get``, whose result is returned.
    """
    if base_index == 0:
        ofs = self.file_offset
    else:
        assert base_index < self.count
        ofs = self.file_offset + (self.size // self.count) * base_index

    blend = self.file
    blend.handle.seek(ofs, os.SEEK_SET)

    if sdna_index_refine is not None:
        blend.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
    else:
        sdna_index_refine = self.sdna_index

    return blend.structs[sdna_index_refine].field_get(
        blend.header, blend.handle, path,
        default=default,
        use_nil=use_nil, use_str=use_str,
    )
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def set(self, path, value, sdna_index_refine=None):
    """Write ``value`` to the field at ``path`` of this block.

    ``sdna_index_refine`` selects the DNA struct used for the write and
    defaults to ``self.sdna_index``. The file handle is moved to
    ``self.file_offset``, the file is flagged as modified, and the result
    of the struct's ``field_set`` is returned.
    """
    blend = self.file

    if sdna_index_refine is not None:
        blend.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
    else:
        sdna_index_refine = self.sdna_index

    dna_struct = blend.structs[sdna_index_refine]
    blend.handle.seek(self.file_offset, os.SEEK_SET)
    blend.is_modified = True
    return dna_struct.field_set(blend.header, blend.handle, path, value)

    # ---------------
    # Utility get/set
    #
    #   avoid inline pointer casting
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def iter_array(block, length=-1):
    """Yield the block referenced by each pointer stored in ``block``.

    ``block`` must have the code ``DATA``. ``length`` pointers are read
    from consecutive ``header.pointer_size``-byte slots starting at
    ``block.file_offset``, and each one is resolved with
    ``find_block_from_offset``. With the default ``length=-1`` nothing
    is yielded.
    """
    assert block.code == b'DATA'
    from . import blendfile
    import os
    handle = block.file.handle
    header = block.file.header

    for index in range(length):
        slot_offset = block.file_offset + header.pointer_size * index
        block.file.handle.seek(slot_offset, os.SEEK_SET)
        pointer = blendfile.DNA_IO.read_pointer(handle, header)
        yield block.file.find_block_from_offset(pointer)


# -----------------------------------------------------------------------------
# ID Expand
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:embeddeddata    作者:toolforge    | 项目源码 | 文件源码
def rar_v4(f):
    """Walk the block headers of a RAR v4 stream starting at ``f``'s position.

    Each iteration reads a 7-byte block header (plus a 4-byte additional
    size when flag 0x8000 is set), seeks past the block and calls
    ``f.update_pos()``. Returns True once a block of type 0x7b has been
    passed, or Ellipsis when a type-0x73 block with flag 0x80 is seen.

    Raises FileCorrupted when a block type lies outside 0x72..0x7b.
    """
    # based on http://www.forensicswiki.org/wiki/RAR
    # and http://acritum.com/winrar/rar-format
    finished = False
    while not finished:
        start = f.tell()
        _crc, block_type, block_flags, block_size = struct.unpack(
            '<HBHH', f.read(7))
        if block_flags & 0x8000:
            (extra_size,) = struct.unpack('<L', f.read(4))
            block_size += extra_size
        if block_type < 0x72 or block_type > 0x7b:
            raise FileCorrupted
        f.try_seek(start + block_size, os.SEEK_SET)
        f.update_pos()
        if block_type == 0x7b:
            finished = True
        elif block_type == 0x73 and block_flags & 0x80:
            return Ellipsis  # encrypted, assume bad faith
    return True
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position of this file-like object.

    ``pos`` is applied relative to the start (``os.SEEK_SET``), the
    current position (``os.SEEK_CUR``) or the end (``os.SEEK_END``);
    the result is limited at 0 and at ``self.size``. The internal buffer
    is emptied and the underlying ``fileobj`` is moved to the new position.

    Raises ValueError if the file is closed or ``whence`` is not one of
    the three constants above.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_END:
        target = max(min(self.size + pos, self.size), 0)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # A backward move is limited at 0, a forward move at self.size.
        target = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_SET:
        target = min(max(pos, 0), self.size)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    # Discard the buffer and reposition the underlying file object.
    self.buffer = b""
    self.fileobj.seek(self.position)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def _get_org(self, ipnum):
        """
        Seek and return organization (or ISP) name for converted IP addr.
        @param ipnum: Converted IP address
        @type ipnum: int
        @return: org/isp name
        @rtype: str
        """

        seek_org = self._seek_country(ipnum)
        if seek_org == self._databaseSegments:
            return None

        record_pointer = seek_org + (2 * self._recordLength - 1) * self._databaseSegments

        self._filehandle.seek(record_pointer, os.SEEK_SET)

        org_buf = self._filehandle.read(const.MAX_ORG_RECORD_LENGTH)

        return org_buf[:org_buf.index(chr(0))]
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
        """Reposition the file.

        ``pos`` is interpreted relative to the start (os.SEEK_SET), the
        current position (os.SEEK_CUR) or the end (os.SEEK_END). The
        resulting position is limited using 0 and ``self.size``. A closed
        file or an unknown ``whence`` raises ValueError.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")

        if whence == os.SEEK_SET:
            not_negative = max(pos, 0)
            self.position = min(not_negative, self.size)
        elif whence == os.SEEK_CUR:
            if pos < 0:
                # Backward move: stop at the start of the file.
                self.position = max(self.position + pos, 0)
            else:
                # Forward move: stop at the end of the file.
                self.position = min(self.position + pos, self.size)
        elif whence == os.SEEK_END:
            not_past_end = min(self.size + pos, self.size)
            self.position = max(not_past_end, 0)
        else:
            raise ValueError("Invalid argument")

        # Buffered data is discarded; the wrapped file object follows.
        self.buffer = b""
        self.fileobj.seek(self.position)
项目:python-ebml    作者:QBobWatson    | 项目源码 | 文件源码
def write(self, stream, seekfirst=True):
        """Write the header, then the child elements, to a stream.

        The Element's consistency is checked before anything is written.

        Args:
         + stream: As in Element.write().
         + seekfirst: As in Element.write().
        Raises:
         + EbmlException, if the write fails.
         + Inconsistent, if the Element is not in a consistent state.
        """
        self.check_consistency()
        if seekfirst:
            stream.seek(self.pos_absolute, SEEK_SET)
        header_bytes = self.header.encode()
        stream.write(header_bytes)
        # The children follow the header directly. The trailing False is
        # presumably Container._write's own seek flag -- confirm there.
        Container._write(self, stream, False)
项目:python-ebml    作者:QBobWatson    | 项目源码 | 文件源码
def __init__(self, f, summary=True):
        """Attach a binary stream (opening the file if a name is given) and
        record its total size.

        Args:
         + f: Either a file name or a seekable binary stream.
         + summary: If True, call self.read_summary().
        """
        super().__init__(0)
        # IOBase instances are used as-is; anything else is treated as a
        # path and opened for binary reading.
        self.stream = f if isinstance(f, IOBase) else open(f, 'rb')

        # Measure the stream by seeking to its end, then rewind to the start.
        source = self.stream
        source.seek(0, SEEK_END)
        self.stream_size = source.tell()
        source.seek(0, SEEK_SET)

        if summary:
            self.read_summary()
项目:python-ebml    作者:QBobWatson    | 项目源码 | 文件源码
def parse_SeekHead(self, child, stream): #pylint: disable=invalid-name
        """Parse SeekHead element and recursively read elements.

        For every 'Seek' child whose position self.find() rejects with
        ValueError, the referenced element is read via self.read_element(),
        except for entries named 'Cluster' or when ``stream`` is falsy.
        If anything was read, the stream is moved back to the end of the
        SeekHead element afterwards.
        """
        LOG.debug("Segment: parsed {}".format(child))

        must_reposition = False
        for entry in child.children_named('Seek'):
            try:
                self.find(entry.seek_pos)
            except ValueError:
                # ValueError from find() means this entry has not been
                # seen before.
                LOG.debug("Segment: adding seek entry {}".format(entry))
                if entry.seek_id_name != 'Cluster' and stream:
                    # Read whatever this entry points to; read_element is
                    # told to seek to the entry's position first.
                    self.read_element(stream, entry.seek_pos,
                                      summary=True, seekfirst=True)
                    must_reposition = True

        if must_reposition:
            # Reading elsewhere moved the stream; return to just past the
            # SeekHead element.
            stream.seek(child.pos_end_absolute, SEEK_SET)
项目:python-ebml    作者:QBobWatson    | 项目源码 | 文件源码
def write(self, stream, seekfirst=True):
        """Write this element to a binary stream.

        If this element is not dirty, this method should reproduce the
        byte stream used to read it.

        Args:
         + stream: Binary stream to write to.
         + seekfirst: If True, seek to self.pos_absolute before writing.
        Raises:
         + ValueError: if the object's data cannot be encoded in self.size
           bytes.
        """
        if seekfirst:
            stream.seek(self.pos_absolute, SEEK_SET)

        # Header first, then the value encoded to self.size bytes.
        header_bytes = self.header.encode()
        stream.write(header_bytes)
        payload = self.encode(self.value, self.size)
        stream.write(payload)

    # Virtual
项目:habilitacion    作者:GabrielBD    | 项目源码 | 文件源码
def seek(self, pos, whence=os.SEEK_SET):
        """Change the position within the file.

        Supported ``whence`` values are os.SEEK_SET, os.SEEK_CUR and
        os.SEEK_END; any other value raises ValueError, as does seeking
        on a closed file. The new position is bounded using 0 and
        ``self.size``.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file")

        if whence == os.SEEK_SET:
            new_position = min(max(pos, 0), self.size)
        elif whence == os.SEEK_CUR:
            if pos < 0:
                new_position = max(self.position + pos, 0)
            else:
                new_position = min(self.position + pos, self.size)
        elif whence == os.SEEK_END:
            new_position = max(min(self.size + pos, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        self.position = new_position

        # Reset the buffer, then move the underlying file object to match.
        self.buffer = b""
        self.fileobj.seek(self.position)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current position.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position) and os.SEEK_END
        (seek relative to the end, offset should be negative).

    Raises:
      InvalidArgumentError: if whence is not one of the supported modes.
    """
    self._verify_read_mode()
    if whence == os.SEEK_SET:
      self._offset = offset
    elif whence == os.SEEK_CUR:
      self._offset += offset
    elif whence == os.SEEK_END:
      # The end of the file is taken from a fresh stat() call.
      file_stat = self.stat()
      self._offset = file_stat.st_size + offset
    else:
      # Interpolate the mode into the message. Passing the format string and
      # the value as separate exception arguments leaves the placeholder
      # unformatted; %r also avoids a TypeError for non-numeric modes.
      raise InvalidArgumentError(
          'Whence mode %r is not supported' % (whence,))