Python io 模块,SEEK_CUR 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.SEEK_CUR

项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def recvfrom(self, bufsize, flags=0):
        if self.type != socket.SOCK_DGRAM:
            return _BaseSocket.recvfrom(self, bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
        buf.seek(+2, SEEK_CUR)
        frag = buf.read(1)
        if ord(frag):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            if fromhost != peerhost or peerport not in (0, fromport):
                raise socket.error(EAGAIN, "Packet filtered")

        return (buf.read(), (fromhost, fromport))
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def recvfrom(self, bufsize, flags=0):
        if self.type != socket.SOCK_DGRAM:
            return _BaseSocket.recvfrom(self, bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
        buf.seek(+2, SEEK_CUR)
        frag = buf.read(1)
        if ord(frag):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            if fromhost != peerhost or peerport not in (0, fromport):
                raise socket.error(EAGAIN, "Packet filtered")

        return (buf.read(), (fromhost, fromport))
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def recvfrom(self, bufsize, flags=0):
        if self.type != socket.SOCK_DGRAM:
            return _BaseSocket.recvfrom(self, bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
        buf.seek(+2, SEEK_CUR)
        frag = buf.read(1)
        if ord(frag):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            if fromhost != peerhost or peerport not in (0, fromport):
                raise socket.error(EAGAIN, "Packet filtered")

        return (buf.read(), (fromhost, fromport))
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def recvfrom(self, bufsize, flags=0):
        if self.type != socket.SOCK_DGRAM:
            return _BaseSocket.recvfrom(self, bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
        buf.seek(+2, SEEK_CUR)
        frag = buf.read(1)
        if ord(frag):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            if fromhost != peerhost or peerport not in (0, fromport):
                raise socket.error(EAGAIN, "Packet filtered")

        return (buf.read(), (fromhost, fromport))
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        self.value_type = ShaderParamType(loader.read_byte())
        if loader.res_file.version >= 0x03030000:
            siz_data = loader.read_byte()
            self.data_offset = loader.read_uint16()
            offset = loader.read_int32()  # Uniform variable offset.
            callback_pointer = loader.read_uint32()
            self.depended_index = loader.read_uint16()
            self.depend_index = loader.read_uint16()
            self.name = loader.load_string()
        else:
            # GUESS
            loader.seek(1, io.SEEK_CUR)
            self.data_offset = loader.read_uint16()
            offset = loader.read_int32()  # Uniform variable offset.
            self.name = loader.load_string()
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        self.__raiseIfClosed()
        if not self.__seekable:
            raise OSError("Seek not enabled for this stream")
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError("Cannot have a negative absolute seek")
            newStreamPosition = offset
        elif whence == io.SEEK_CUR:
            newStreamPosition = self.__streamPosition + whence
        elif whence == io.SEEK_END:
            if not self.__buffers:
                newStreamPosition = 0
            else:
                newStreamPosition = self.__streamEnd + offset
        self.__streamPosition = newStreamPosition
项目:XX-Net-Mini    作者:GFWParty    | 项目源码 | 文件源码
def recvfrom(self, bufsize, flags=0):
        if self.type != socket.SOCK_DGRAM:
            return _BaseSocket.recvfrom(self, bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
        buf.seek(+2, SEEK_CUR)
        frag = buf.read(1)
        if ord(frag):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            if fromhost != peerhost or peerport not in (0, fromport):
                raise socket.error(EAGAIN, "Packet filtered")

        return (buf.read(), (fromhost, fromport))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_CUR):
        return self.buffer.seek(offset, whence)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def seek(self, offset):
        """
        We need to override the second param to move from the current position.

        :param offset: offset to move away.
        :type offset: int
        """
        return await self.buffer.seek(offset, io.SEEK_CUR)
项目:io_scene_bfres    作者:Syroot    | 项目源码 | 文件源码
def align(self, alignment):
        self.reader.seek(-self.reader.tell() % alignment, io.SEEK_CUR)
项目:io_scene_bfres    作者:Syroot    | 项目源码 | 文件源码
def align(self, alignment):
        self.writer.seek(-self.writer.tell() % alignment, io.SEEK_CUR)
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:pyimc    作者:oysstu    | 项目源码 | 文件源码
def peek_header(self):
        bytes_read = self.f.readinto(self.header)
        if bytes_read < ctypes.sizeof(IMCHeader):
            raise RuntimeError('LSF file ended abruptly.')

        # Return file position to before header
        self.f.seek(-ctypes.sizeof(IMCHeader), io.SEEK_CUR)
项目:pyimc    作者:oysstu    | 项目源码 | 文件源码
def write_index(self):
        """
        Run through the lsf-file and generate an index file
        :return:
        """
        self.f.seek(0)

        # Check for file end
        while self.f.peek(1):
            self.peek_header()

            # Timestamp of first message is used to avoid index/lsf mismatch on load
            if self.f.tell() == 0:
                self.idx['timestamp'] = self.header.timestamp

            # Store position for this message
            try:
                self.idx[self.header.mgid].append(self.f.tell())
            except (KeyError, AttributeError) as e:
                self.idx[self.header.mgid] = [self.f.tell()]

            # Go to next message
            self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)

        self.f.seek(0)

        # Store index
        fbase, ext = os.path.splitext(self.fpath)
        with open(fbase + '.pyimc_idx', mode='wb') as f:
            pickle.dump(self.idx, f)
项目:pyimc    作者:oysstu    | 项目源码 | 文件源码
def read_message(self):
        """
        Returns a generator that yields the messages in the currently open LSF file.
        This requires the LSFReader object to be opened using the "with" statement.
        See read(), where this is done automatically.
        :return:
        """

        if self.idx:
            # Read using index
            for pos in self.sorted_idx_iter(self.msg_types):
                self.f.seek(pos)
                self.peek_header()
                self.parser.reset()
                b = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter))
                msg = self.parser.parse(b)
                yield msg
        else:
            # Read file without index
            # Check for file end
            while self.f.peek(1):
                self.peek_header()

                if not self.msg_types or self.header.mgid in self.msg_types:
                    self.parser.reset()
                    b = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter))
                    msg = self.parser.parse(b)
                    yield msg
                else:
                    self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)
项目:io_scene_mk8muunt    作者:Syroot    | 项目源码 | 文件源码
def align(self, alignment):
        self.reader.seek(-self.reader.tell() % alignment, io.SEEK_CUR)
项目:io_scene_mk8muunt    作者:Syroot    | 项目源码 | 文件源码
def align(self, alignment):
        self.writer.seek(-self.writer.tell() % alignment, io.SEEK_CUR)
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:PINCE    作者:korcankaraokcu    | 项目源码 | 文件源码
def invoke(self, arg, from_tty):
        contents_recv = receive_from_pince()
        hex_byte_list = []
        address = contents_recv[0]
        offset = contents_recv[1]
        with open(ScriptUtils.mem_file, "rb") as FILE:
            FILE.seek(address)
            for item in range(offset):
                try:
                    current_item = " ".join(format(n, '02x') for n in FILE.read(1))
                except IOError:
                    current_item = "??"
                    FILE.seek(1, io.SEEK_CUR)  # Necessary since read() failed to execute
                hex_byte_list.append(current_item)
        send_to_pince(hex_byte_list)
项目:python3-ipfs-api    作者:jgraef    | 项目源码 | 文件源码
def seek(self, offset, whence):
        if (whence == io.SEEK_SET):
            self._pos = offset
        elif (whence == io.SEEK_CUR):
            self._pos += offset
        elif (whence == io.SEEK_END):
            self._pos = self._file._filesize + offset
        return self._pos
项目:python3-ipfs-api    作者:jgraef    | 项目源码 | 文件源码
def test_small_file_seek_and_read(self):
        with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
            self.assertEqual(64, f.seek(64, io.SEEK_CUR))
            self.assertEqual(128, f.seek(64, io.SEEK_CUR))
            self.assertEqual(256, f.seek(256, io.SEEK_SET))
            self.assertEqual(24610, f.seek(-256, io.SEEK_END))
            self.assertEqual(
                b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
                f.read(16))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def seekable(self):
        try:
            return self._f.seekable()
        except AttributeError:
            try:
                self.seek(0, SEEK_CUR)
            except IOError:
                return False
            else:
                return True
项目:ivport-v2    作者:ivmech    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """
        Change the stream position to the given byte *offset*. *offset* is
        interpreted relative to the position indicated by *whence*. Values for
        *whence* are:

        * ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset*
          should be zero or positive

        * ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be
          negative

        * ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually
          negative

        Return the new absolute position.
        """
        with self.lock:
            if whence == io.SEEK_CUR:
                offset = self._pos + offset
            elif whence == io.SEEK_END:
                offset = self._length + offset
            if offset < 0:
                raise ValueError(
                    'New position is before the start of the stream')
            self._set_pos(offset)
            return self._pos
项目:CloudPrint    作者:William-An    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def test_seeking_forward_from_current(self):
        contents = TEXT

        with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
            self.assertEqual(fp.read(100), contents[:100])
            fp.seek(50, io.SEEK_CUR)  # Move 50 forwards
            self.assertEqual(fp.read(100), contents[150:250])
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def test_seeking_backwards_from_current(self):
        with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
            contents = fp.read()
            fp.seek(-100, io.SEEK_CUR)
            self.assertEqual(fp.read(), contents[-100:])
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def test_seeking_beyond_beginning_from_current(self):
        with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
            self.assertRaises(IOError, fp.seek, -100, io.SEEK_CUR)
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        # Recalculate offset as an absolute file position.
        if whence == io.SEEK_SET:
            pass
        elif whence == io.SEEK_CUR:
            offset += self._pos
        elif whence == io.SEEK_END:
            if self._size < 0:
                # Finish reading the file
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset += self._size
        else:
            raise ValueError('Invalid value for whence: {}'.format(whence))

        if offset < 0:
            msg = '[Error {code}] {msg}'
            raise IOError(msg.format(code=errno.EINVAL,
                                     msg=os.strerror(errno.EINVAL)))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position
        while offset > 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                break
            offset -= len(data)

        return self._pos
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:projeto    作者:BarmyPenguin    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:jumper-ble-logger    作者:Jumperr-labs    | 项目源码 | 文件源码
def run(self):
        try:
            while not self._should_stop:
                readable, _, _ = select.select(self._inputs, [], [], 1)

                if self._hci_socket in readable:
                    source = 'socket'
                    packet = self._hci_socket.recv(4096)
                    self._logger.debug('SOCKET: %s', RawCopy(HciPacket).parse(packet))
                    self.handle_packet(packet, source)

                if self._pty_fd in readable:
                    data = os.read(self._pty_master, 4096)
                    self._logger.debug('Raw PTY data: %s', repr(data))
                    self._pty_buffer.write(data)
                    self._pty_buffer.seek(-len(data), SEEK_CUR)

                    source = 'pty'
                    while True:
                        if self._pty_buffer.pos == self._pty_buffer.len:
                            break
                        parsed_packet = RawCopy(HciPacket).parse_stream(self._pty_buffer)
                        if not parsed_packet:
                            break
                        self._logger.debug('PTY: %s', parsed_packet)
                        packet = parsed_packet.data
                        self.handle_packet(packet, source)
        except KeyboardInterrupt:
            log.info("Received SIGTERM, exiting")
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        loader.check_signature(self._SIGNATURE)
        self._flags = loader.read_uint32()
        num_bone = loader.read_uint16()
        num_smooth_matrix = loader.read_uint16()
        num_rigid_matrix = loader.read_uint16()
        loader.seek(2, io.SEEK_CUR)
        self.bones = loader.load_dict(bfres.Bone)
        ofs_bone_list = loader.read_offset()  # Only load dict
        self.matrix_to_bone_list = loader.load_custom(lambda l: l.read_uint16s(num_smooth_matrix + num_rigid_matrix))
        self.inverse_model_matrices = loader.load_custom(lambda l: l.read_matrix3x4s(num_smooth_matrix))
        user_pointer = loader.read_uint32()
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        loader.check_signature("FVTX")
        num_vertex_attrib = loader.read_byte()
        num_buffer = loader.read_byte()
        idx = loader.read_uint16()
        vertex_count = loader.read_uint32()
        self.vertex_skin_count = loader.read_byte()
        loader.seek(3, io.SEEK_CUR)
        ofs_vertex_attrib_list = loader.read_offset()  # Only load dict.
        self.attributes = loader.load_dict(bfres.VertexAttrib)
        self.buffers = loader.load_list(bfres.Buffer, num_buffer)
        user_pointer = loader.read_uint32()
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        self.primitive_type = bfres.gx2.GX2PrimitiveType(loader.read_uint32())
        self.index_format = bfres.gx2.GX2IndexFormat(loader.read_uint32())
        index_count = loader.read_uint32()
        num_sub_mesh = loader.read_uint16()
        loader.seek(2, io.SEEK_CUR)
        self.sub_meshes = loader.load_list(bfres.SubMesh, num_sub_mesh)
        self.index_buffer = loader.load(bfres.Buffer)
        self.first_vertex = loader.read_uint32()
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        self.name = loader.load_string()
        self.buffer_index = loader.read_byte()
        loader.seek(1, io.SEEK_CUR)
        self.offset = loader.read_uint16()
        self.format = bfres.gx2.GX2AttribFormat(loader.read_uint32())
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        count = loader.read_uint16()
        self.value_type = RenderInfoType(loader.read_byte())
        loader.seek(1, io.SEEK_CUR)
        self.name = loader.load_string()
        if self.value_type == RenderInfoType.INT32:
            self.value = loader.read_int32s(count)
        elif self.value_type == RenderInfoType.SINGLE:
            self.value = loader.read_singles(count)
        elif self.value_type == RenderInfoType.STRING:
            self.value = loader.load_strings(count)
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        loader.check_signature("FTEX")
        self.dim = bfres.gx2.GX2SurfaceDim(loader.read_uint32())
        self.width = loader.read_uint32()
        self.height = loader.read_uint32()
        self.depth = loader.read_uint32()
        self.mip_count = loader.read_uint32()
        self.format = bfres.gx2.GX2SurfaceFormat(loader.read_uint32())
        self.aa_mode = bfres.gx2.GX2AAMode(loader.read_uint32())
        self.use = bfres.gx2.GX2SurfaceUse(loader.read_uint32())
        siz_data = loader.read_uint32()
        image_pointer = loader.read_uint32()
        siz_map_data = loader.read_uint32()
        mip_pointer = loader.read_uint32()
        self.tile_mode = bfres.gx2.GX2TileMode(loader.read_uint32())
        self.swizzle = loader.read_uint32()
        self.alignment = loader.read_uint32()
        self.pitch = loader.read_uint32()
        self.mip_offsets = loader.read_uint32s(13)
        self.view_mip_first = loader.read_uint32()
        self.view_mip_count = loader.read_uint32()
        self.view_slice_first = loader.read_uint32()
        self.view_slice_count = loader.read_uint32()
        self.comp_sel_r = bfres.gx2.GX2CompSel(loader.read_byte())
        self.comp_sel_g = bfres.gx2.GX2CompSel(loader.read_byte())
        self.comp_sel_b = bfres.gx2.GX2CompSel(loader.read_byte())
        self.comp_sel_a = bfres.gx2.GX2CompSel(loader.read_byte())
        self.regs = loader.read_uint32s(5)
        handle = loader.read_uint32()
        self.array_length = loader.read_uint32()  # Possibly just a byte.
        self.name = loader.load_string()
        self.path = loader.load_string()
        self.data = loader.load_custom(lambda l: l.read_bytes(siz_data))
        self.mip_data = loader.load_custom(lambda l: l.read_bytes(siz_map_data))
        self.user_data = loader.load_dict(bfres.UserData)
        num_user_data = loader.read_uint16()
        loader.seek(2, io.SEEK_CUR)
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, loader: bfres.core.ResFileLoader):
        self.name = loader.load_string()
        count = loader.read_uint16()
        self.value_type = UserDataType(loader.read_byte())
        loader.seek(1, io.SEEK_CUR)
        if self.value_type == UserDataType.INT32:
            self.value = loader.read_int32s(count)
        elif self.value_type == UserDataType.SINGLE:
            self.value = loader.read_singles(count)
        elif self.value_type == UserDataType.STRING:
            self.value = loader.load_strings(count, "ascii")
        elif self.value_type == UserDataType.WSTRING:
            self.value = loader.load_strings(count, "utf-8")
        elif self.value_type == UserDataType.BYTE:
            self.value = loader.read_bytes(count)
项目:MyFriend-Rob    作者:lcheniv    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def seek(self, offset, whence=SEEK_SET):
        "Adjust seek from prefix start and, if present, from prefix"
        if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE):
            return
        if whence == SEEK_SET:
            offset += self._prefixStart + self.PREFIX_SIZE
            return self._stream.seek(offset, whence)
        elif whence == SEEK_CUR:
            return self._stream.seek(offset, whence)
        elif whence == SEEK_END:
            # even if the suffix hasn't been received yet, we calculate our offsets as if it had.
            # why? because if it hasn't been received yet, we don't want to finish! The whole packet
            # isn't framed (verified) until the final bytes are received.
            offset = offset - self.SUFFIX_SIZE
            return self._stream.seek(offset, whence)
项目:pixelinkds    作者:hgrecco    | 项目源码 | 文件源码
def from_file(cls, fp, size=None):
        """Construct a Frame Descriptor object from binary data.

        Parameters
        ----------
        fp : file-like
             File-like object to read the Frame Descriptor from.
             The cursor must be at the position where the Frame Descriptor starts.

        size : int, optional
               the number of bytes to be read.
               If not given, the value inspected from the fp itself will be used.

        Returns
        -------
        FrameDescriptor

        .. note:: A number of bytes equal to `size` will be consumed from the fp.

        .. warning:
                  If the number of requested bytes is smaller than the size of the
                  Frame Descriptor, some fields might be left unfilled.


        """
        out = FrameDescriptor()

        if size is None:
            size = np.fromfile(fp, dtype=np.uint32, count=1)
            if size.size == 0:
                raise ValueError('Unexpected EOF')

            fp.seek(-size.nbytes, io.SEEK_CUR)

            size = size[0]

        if size > ct.sizeof(FrameDescriptor):
            raise ValueError('Number of requested bytes (%d) do not fit '
                             'in the data structure (%d)' % (size, ct.sizeof(out)))

        buf = fp.read(size)

        ct.memmove(ct.addressof(out), buf, size)

        return out