Python os 模块,lseek() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.lseek()。

项目:isar    作者:ilbers    | 项目源码 | 文件源码
def _lseek(file_obj, offset, whence):
    """Seek to the next data or hole region of an open file.

    This is a helper function which invokes 'os.lseek' for file object
    'file_obj' with the specified 'offset' and 'whence'. The 'whence'
    argument is supposed to be either '_SEEK_DATA' or '_SEEK_HOLE'.

    Returns the data or hole position, or '-1' when there is no more data or
    hole starting from 'offset'.

    Raises 'ErrorNotSupp' when 'os.lseek' fails with EINVAL; any other
    'OSError' is re-raised unchanged.
    """

    # 'os.errno' was never a documented attribute and is no longer available
    # in Python 3.7+, so take the error codes from the 'errno' module itself.
    import errno

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == errno.ENXIO:
            return -1
        if err.errno == errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        raise
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_fs_holes(self):
        """Check SEEK_DATA / SEEK_HOLE on a five-byte file.

        Even if the filesystem doesn't report holes, when the OS supports
        them the SEEK_* constants are defined and behave consistently:
        SEEK_DATA yields the requested position and SEEK_HOLE yields a
        position at or past the end of the file.
        """
        with open(support.TESTFN, 'r+b') as stream:
            stream.write(b"hello")
            stream.flush()
            eof = stream.tell()
            descriptor = stream.fileno()
            try:
                for position in range(eof):
                    data_at = os.lseek(descriptor, position, os.SEEK_DATA)
                    self.assertEqual(position, data_at)
                    hole_at = os.lseek(descriptor, position, os.SEEK_HOLE)
                    self.assertLessEqual(eof, hole_at)
                # Seeking from the end of the file must fail for both modes.
                for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                    self.assertRaises(OSError, os.lseek, descriptor, eof, whence)
            except OSError:
                # Some OSs claim to support SEEK_HOLE/SEEK_DATA but do not, e.g.
                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
                raise unittest.SkipTest("OSError raised!")
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def is_sparse(path):
    """Return True when the file at `path` contains no data at all.

    LP: #1656371 - stat().st_blocks is not reliable across file systems
    (ZFS seems to return 1 where EXT4 returns 0), so instead ask for the
    first data block at or after offset 0 with SEEK_DATA.  When there is
    none, lseek fails with ENXIO (see lseek(2)); any other error is
    propagated to the caller.
    """
    with open(path, 'r') as stream:
        descriptor = stream.fileno()
        try:
            os.lseek(descriptor, 0, os.SEEK_DATA)
        except OSError as exc:
            # There is no OSError subclass for ENXIO, so compare the code.
            if exc.errno == errno.ENXIO:
                # No data anywhere in the file: it is entirely sparse.
                return True
            raise
        else:
            # The seek succeeded, so there is data in the file.
            return False
项目:bmap-tools    作者:intel    | 项目源码 | 文件源码
def _lseek(file_obj, offset, whence):
    """Seek to the next data or hole region of an open file.

    This is a helper function which invokes 'os.lseek' for file object
    'file_obj' with the specified 'offset' and 'whence'. The 'whence'
    argument is supposed to be either '_SEEK_DATA' or '_SEEK_HOLE'.

    Returns the data or hole position, or '-1' when there is no more data or
    hole starting from 'offset'.

    Raises 'ErrorNotSupp' when 'os.lseek' fails with EINVAL; any other
    'OSError' is re-raised unchanged.
    """

    # 'os.errno' was never a documented attribute and is no longer available
    # in Python 3.7+, so take the error codes from the 'errno' module itself.
    import errno

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == errno.ENXIO:
            return -1
        if err.errno == errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        raise
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_writev(self):
        """Write three buffers with writev and read them back as one blob."""
        descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
        try:
            written = os.writev(descriptor, (b'test1', b'tt2', b't3'))
            self.assertEqual(written, 10)

            os.lseek(descriptor, 0, os.SEEK_SET)
            self.assertEqual(b'test1tt2t3', posix.read(descriptor, 10))

            # Issue #20113: empty list of buffers should not crash
            try:
                empty_written = posix.writev(descriptor, [])
            except OSError:
                # writev(fd, []) raises OSError(22, "Invalid argument")
                # on OpenIndiana
                pass
            else:
                self.assertEqual(empty_written, 0)
        finally:
            os.close(descriptor)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_readv(self):
        """Read ten bytes back into three pre-sized buffers with readv."""
        descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
        try:
            os.write(descriptor, b'test1tt2t3')
            os.lseek(descriptor, 0, os.SEEK_SET)
            buffers = [bytearray(length) for length in [5, 3, 2]]
            self.assertEqual(posix.readv(descriptor, buffers), 10)
            filled = [bytes(chunk) for chunk in buffers]
            self.assertEqual([b'test1', b'tt2', b't3'], filled)

            # Issue #20113: empty list of buffers should not crash
            try:
                empty_read = posix.readv(descriptor, [])
            except OSError:
                # readv(fd, []) raises OSError(22, "Invalid argument")
                # on OpenIndiana
                pass
            else:
                self.assertEqual(empty_read, 0)
        finally:
            os.close(descriptor)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_fs_holes(self):
        """Check SEEK_DATA / SEEK_HOLE on a five-byte file.

        Even if the filesystem doesn't report holes, when the OS supports
        them the SEEK_* constants are defined and behave consistently:
        SEEK_DATA yields the requested position and SEEK_HOLE yields a
        position at or past the end of the file.
        """
        with open(support.TESTFN, 'r+b') as stream:
            stream.write(b"hello")
            stream.flush()
            eof = stream.tell()
            descriptor = stream.fileno()
            try:
                for position in range(eof):
                    data_at = os.lseek(descriptor, position, os.SEEK_DATA)
                    self.assertEqual(position, data_at)
                    hole_at = os.lseek(descriptor, position, os.SEEK_HOLE)
                    self.assertLessEqual(eof, hole_at)
                # Seeking from the end of the file must fail for both modes.
                for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                    self.assertRaises(OSError, os.lseek, descriptor, eof, whence)
            except OSError:
                # Some OSs claim to support SEEK_HOLE/SEEK_DATA but do not, e.g.
                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
                raise unittest.SkipTest("OSError raised!")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_writev(self):
        """Write three buffers with writev and read them back as one blob."""
        descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
        try:
            written = os.writev(descriptor, (b'test1', b'tt2', b't3'))
            self.assertEqual(written, 10)

            os.lseek(descriptor, 0, os.SEEK_SET)
            self.assertEqual(b'test1tt2t3', posix.read(descriptor, 10))

            # Issue #20113: empty list of buffers should not crash
            try:
                empty_written = posix.writev(descriptor, [])
            except OSError:
                # writev(fd, []) raises OSError(22, "Invalid argument")
                # on OpenIndiana
                pass
            else:
                self.assertEqual(empty_written, 0)
        finally:
            os.close(descriptor)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_readv(self):
        """Read ten bytes back into three pre-sized buffers with readv."""
        descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
        try:
            os.write(descriptor, b'test1tt2t3')
            os.lseek(descriptor, 0, os.SEEK_SET)
            buffers = [bytearray(length) for length in [5, 3, 2]]
            self.assertEqual(posix.readv(descriptor, buffers), 10)
            filled = [bytes(chunk) for chunk in buffers]
            self.assertEqual([b'test1', b'tt2', b't3'], filled)

            # Issue #20113: empty list of buffers should not crash
            try:
                empty_read = posix.readv(descriptor, [])
            except OSError:
                # readv(fd, []) raises OSError(22, "Invalid argument")
                # on OpenIndiana
                pass
            else:
                self.assertEqual(empty_read, 0)
        finally:
            os.close(descriptor)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_fs_holes(self):
        """Check SEEK_DATA / SEEK_HOLE on a five-byte file.

        Even if the filesystem doesn't report holes, when the OS supports
        them the SEEK_* constants are defined and behave consistently:
        SEEK_DATA yields the requested position and SEEK_HOLE yields a
        position at or past the end of the file.
        """
        with open(support.TESTFN, 'r+b') as stream:
            stream.write(b"hello")
            stream.flush()
            eof = stream.tell()
            descriptor = stream.fileno()
            try:
                for position in range(eof):
                    data_at = os.lseek(descriptor, position, os.SEEK_DATA)
                    self.assertEqual(position, data_at)
                    hole_at = os.lseek(descriptor, position, os.SEEK_HOLE)
                    self.assertLessEqual(eof, hole_at)
                # Seeking from the end of the file must fail for both modes.
                for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                    self.assertRaises(OSError, os.lseek, descriptor, eof, whence)
            except OSError:
                # Some OSs claim to support SEEK_HOLE/SEEK_DATA but do not, e.g.
                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
                raise unittest.SkipTest("OSError raised!")
项目:NFStest    作者:thombashi    | 项目源码 | 文件源码
def write_data(self, fd, offset=0, size=None, pattern=None):
        """Write pattern data to the file given by the file descriptor

           fd:
               File descriptor
           offset:
               File offset where data will be written to [default: 0]
           size:
               Total number of bytes to write [default: --filesize option]
           pattern:
               Data pattern to write to the file [default: data_pattern default]
        """
        remaining = self.filesize if size is None else size
        position = offset

        while remaining > 0:
            # Never hand more than wsize bytes to a single write call
            chunk_len = min(self.wsize, remaining)
            os.lseek(fd, position, 0)
            payload = self.data_pattern(position, chunk_len, pattern)
            written = os.write(fd, payload)
            # Advance by what was actually written, not by what was requested
            remaining -= written
            position += written
项目:packetary    作者:openstack    | 项目源码 | 文件源码
def _copy_stream(self, fd, url, offset):
        """Copies remote file to local.

        The local file is truncated to ``offset`` bytes and the remote
        stream, opened at the same offset, is appended after that point.

        :param fd: the file`s descriptor
        :param url: the remote file`s url
        :param offset: the number of bytes from the beginning,
                       that will be skipped
        :return: the count of actually copied bytes
        """

        source = self.open_stream(url, offset)
        os.ftruncate(fd, offset)
        os.lseek(fd, offset, os.SEEK_SET)
        chunk_size = 16 * 1024
        size = 0
        while True:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            # os.write() may accept only part of the buffer; keep writing the
            # remainder so no data is dropped and the count stays accurate.
            pending = chunk
            while pending:
                written = os.write(fd, pending)
                pending = pending[written:]
                size += written
        return size
项目:blobfs    作者:mbartoli    | 项目源码 | 文件源码
def read(self, path, length, offset, fh):
        """Return `length` bytes of a blob, starting at byte `offset`.

        The container name and the blob name are taken from the second and
        third '/'-separated components of `path`.  The byte range is fetched
        with `get_blob_to_bytes` and the `content` of the result is returned.
        `fh` is only printed in debug mode; it is not used for the read.
        """
        if debug:
            print "read:       " + path
            print "offset:  " 
            print offset
            print "length: "
            print length 
            print fh
        full_path = self._full_path(path)
        # NOTE(review): printed unconditionally, unlike the debug output above.
        print full_path
        #os.lseek(fh, offset, os.SEEK_SET)
        #if os.path.isfile(full_path) == False:
        # Credentials come from the `config` module, imported on every call.
        import config as config
        account_name = config.STORAGE_ACCOUNT_NAME
        account_key = config.STORAGE_ACCOUNT_KEY
        # path.split('/')[0] is whatever precedes the first '/'; it is ignored.
        containername = path.split('/')[1]
        filename = path.split('/')[2]
        service = baseblobservice.BaseBlobService(account_name, account_key)
        # Request the range offset .. offset+length-1 (presumably an inclusive
        # end position -- confirm against the blob service API).
        blob = service.get_blob_to_bytes(containername, filename, None, offset, offset+length-1)
        #blob = blob[offset:(offset+length)]
        bytes = blob.content 
        return bytes
        # Everything below is unreachable: it is a bare string literal placed
        # after the return, holding an older download-to-local-file variant.
        """try:
            if os.path.isdir(path.split('/')[1]) == False:
                os.mkdir(full_path.split('/')[0]+'/'+containername)
            if os.path.isfile(full_path) == False:
                print "read block blob" 
                block_blob_service.get_blob_to_path(containername, filename, full_path)
            else:
                os.remove(full_path)
                block_blob_service.get_blob_to_path(containername, filename, full_path)
        except:
            pass

        fhn = os.open(full_path, 32768)
        os.lseek(fhn, offset, os.SEEK_SET)

        #print "os.read(fh, length)"
        #print os.read(fh, length)
        return os.read(fhn, length)"""
项目:blobfs    作者:mbartoli    | 项目源码 | 文件源码
def write(self, path, buf, offset, fh):
        """Write `buf` to descriptor `fh` at byte `offset`.

        `path` is only used for the debug message.  Returns the result of
        `os.write`, i.e. the number of bytes actually written.
        """
        if debug:
            print "write:   " + path
        # Position the descriptor first; os.write then writes at that offset.
        os.lseek(fh, offset, os.SEEK_SET)
        return os.write(fh, buf)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def try_seek(fd, offset):
    """Reposition `fd`, ignoring descriptors that cannot seek.

    `offset` of None moves to the end of the file, a non-negative value is
    an absolute position, and a negative value is counted back from the
    end.  An ESPIPE failure is swallowed; any other OSError is re-raised.
    """
    if offset is None:
        target, whence = 0, os.SEEK_END
    elif offset >= 0:
        target, whence = offset, os.SEEK_SET
    else:
        target, whence = offset, os.SEEK_END

    try:
        os.lseek(fd, target, whence)
    except OSError as ose:
        if ose.args[0] != errno.ESPIPE:
            raise
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def readChunk(self, offset, length):
        """Hand a seek-then-read step list to the avatar's `_runAsUser`.

        The steps are `(callable, args)` pairs: an absolute seek of
        `self.fd` to `offset`, then a read of `length` bytes.  Whatever
        `_runAsUser` returns is passed straight back to the caller.
        """
        seek_step = (os.lseek, (self.fd, offset, 0))
        read_step = (os.read, (self.fd, length))
        return self.server.avatar._runAsUser([seek_step, read_step])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def writeChunk(self, offset, data):
        """Hand a seek-then-write step list to the avatar's `_runAsUser`.

        The steps are `(callable, args)` pairs: an absolute seek of
        `self.fd` to `offset`, then a write of `data`.  Whatever
        `_runAsUser` returns is passed straight back to the caller.
        """
        seek_step = (os.lseek, (self.fd, offset, 0))
        write_step = (os.write, (self.fd, data))
        return self.server.avatar._runAsUser([seek_step, write_step])
项目:python-lunrclient    作者:rackerlabs    | 项目源码 | 文件源码
def get_volume(self, id):
        """
        return volume information if the argument is an id or a path

        An existing filesystem path yields ``{'path': ..., 'size': ...}``
        where the size is the end-of-file offset; anything else is looked up
        through ``self.volume.get``.
        """
        if not exists(id):
            # Not a path on disk, so treat the argument as a volume id
            return self.volume.get(id)

        with open(id) as handle:
            # Seeking to the end reports the size without reading anything
            size = os.lseek(handle.fileno(), 0, os.SEEK_END)
        return {'path': id, 'size': size}
项目:iotronic-lightning-rod    作者:openstack    | 项目源码 | 文件源码
def read(self, path, length, offset, fh):
        """Read up to `length` bytes from descriptor `fh` at `offset`.

        `path` is accepted but not used; the descriptor is repositioned as a
        side effect.
        """
        os.lseek(fh, offset, os.SEEK_SET)
        data = os.read(fh, length)
        return data
项目:iotronic-lightning-rod    作者:openstack    | 项目源码 | 文件源码
def write(self, path, buf, offset, fh):
        """Write `buf` to descriptor `fh` at `offset`.

        `path` is accepted but not used.  Returns the byte count reported by
        `os.write`.
        """
        os.lseek(fh, offset, os.SEEK_SET)
        written = os.write(fh, buf)
        return written
项目:iotronic-lightning-rod    作者:openstack    | 项目源码 | 文件源码
def read(self, path, length, offset, fh):
        """Read up to `length` bytes from descriptor `fh` at `offset`.

        `path` is accepted but not used; the descriptor is repositioned as a
        side effect.
        """
        os.lseek(fh, offset, os.SEEK_SET)
        data = os.read(fh, length)
        return data
项目:iotronic-lightning-rod    作者:openstack    | 项目源码 | 文件源码
def write(self, path, buf, offset, fh):
        """Write `buf` to descriptor `fh` at `offset`.

        `path` is accepted but not used.  Returns the byte count reported by
        `os.write`.
        """
        os.lseek(fh, offset, os.SEEK_SET)
        written = os.write(fh, buf)
        return written
项目:iotronic-lightning-rod    作者:openstack    | 项目源码 | 文件源码
def read(self, path, length, offset, fh):
        """Read up to `length` bytes from descriptor `fh` at `offset`.

        `path` is accepted but not used; the descriptor is repositioned as a
        side effect.
        """
        os.lseek(fh, offset, os.SEEK_SET)
        data = os.read(fh, length)
        return data
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def smbComWrite(connId, smbServer, SMBCommand, recvPacket):
        """Handle an SMB_COM_WRITE request for connection `connId`.

        Looks up the file id from the request in the connection's
        'OpenedFiles' table and writes the request data either to the file
        descriptor or, for the pipe pseudo-descriptor, to the stored socket.

        Returns a tuple `([respSMBCommand], None, errorCode)`.  `errorCode`
        is STATUS_SUCCESS, STATUS_ACCESS_DENIED when the write raised, or
        STATUS_INVALID_HANDLE when the file id is not open.  `recvPacket`
        is not used.
        """
        connData = smbServer.getConnectionData(connId)

        respSMBCommand        = smb.SMBCommand(smb.SMB.SMB_COM_WRITE)
        respParameters        = smb.SMBWriteResponse_Parameters()
        respData              = ''

        comWriteParameters =  smb.SMBWrite_Parameters(SMBCommand['Parameters'])
        comWriteData = smb.SMBWrite_Data(SMBCommand['Data'])

        if connData['OpenedFiles'].has_key(comWriteParameters['Fid']):
             fileHandle = connData['OpenedFiles'][comWriteParameters['Fid']]['FileHandle']
             errorCode = STATUS_SUCCESS
             try:
                 if fileHandle != PIPE_FILE_DESCRIPTOR:
                     # TODO: Handle big size files
                     # If we're trying to write past the file end we just skip the write call (Vista does this)
                     # lseek(..., 2) seeks to the end and so returns the current file size.
                     if os.lseek(fileHandle, 0, 2) >= comWriteParameters['Offset']: 
                         os.lseek(fileHandle,comWriteParameters['Offset'],0)
                         os.write(fileHandle,comWriteData['Data'])
                 else:
                     sock = connData['OpenedFiles'][comWriteParameters['Fid']]['Socket']
                     sock.send(comWriteData['Data'])
                 # NOTE(review): the requested count is echoed back even when the
                 # write was skipped above or was only partial.
                 respParameters['Count']    = comWriteParameters['Count']
             except Exception, e:
                 # Any failure while writing is reported as access denied.
                 smbServer.log('smbComWrite: %s' % e, logging.ERROR)
                 errorCode = STATUS_ACCESS_DENIED
        else:
            errorCode = STATUS_INVALID_HANDLE


        # On error the response carries empty parameters and data.
        if errorCode > 0:
            respParameters = ''
            respData       = ''

        respSMBCommand['Parameters']             = respParameters
        respSMBCommand['Data']                   = respData 
        smbServer.setConnectionData(connId, connData)

        return [respSMBCommand], None, errorCode
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def smbComRead(connId, smbServer, SMBCommand, recvPacket):
        """Handle an SMB_COM_READ request for connection `connId`.

        Looks up the file id from the request in the connection's
        'OpenedFiles' table and reads up to 'Count' bytes, either from the
        file descriptor at 'Offset' or, for the pipe pseudo-descriptor, from
        the stored socket.

        Returns a tuple `([respSMBCommand], None, errorCode)`.  `errorCode`
        is STATUS_SUCCESS, STATUS_ACCESS_DENIED when the read raised, or
        STATUS_INVALID_HANDLE when the file id is not open.  `recvPacket`
        is not used.
        """
        connData = smbServer.getConnectionData(connId)

        respSMBCommand        = smb.SMBCommand(smb.SMB.SMB_COM_READ)
        respParameters        = smb.SMBReadResponse_Parameters()
        respData              = smb.SMBReadResponse_Data()

        comReadParameters =  smb.SMBRead_Parameters(SMBCommand['Parameters'])

        if connData['OpenedFiles'].has_key(comReadParameters['Fid']):
             fileHandle = connData['OpenedFiles'][comReadParameters['Fid']]['FileHandle']
             errorCode = STATUS_SUCCESS
             try:
                 if fileHandle != PIPE_FILE_DESCRIPTOR:
                     # TODO: Handle big size files
                     os.lseek(fileHandle,comReadParameters['Offset'],0)
                     content = os.read(fileHandle,comReadParameters['Count'])
                 else:
                     sock = connData['OpenedFiles'][comReadParameters['Fid']]['Socket']
                     content = sock.recv(comReadParameters['Count'])
                 # Report the number of bytes actually obtained, which may be
                 # fewer than the requested count.
                 respParameters['Count']    = len(content)
                 respData['DataLength']     = len(content)
                 respData['Data']           = content
             except Exception, e:
                 # Any failure while reading is reported as access denied.
                 smbServer.log('smbComRead: %s ' % e, logging.ERROR)
                 errorCode = STATUS_ACCESS_DENIED
        else:
            errorCode = STATUS_INVALID_HANDLE

        # On error the response carries empty parameters and data.
        if errorCode > 0:
            respParameters = ''
            respData       = ''

        respSMBCommand['Parameters']             = respParameters
        respSMBCommand['Data']                   = respData 
        smbServer.setConnectionData(connId, connData)

        return [respSMBCommand], None, errorCode
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def smb2Read(connId, smbServer, recvPacket):
        """Handle an SMB2 READ request for connection `connId`.

        Resolves the file id, then reads up to 'Length' bytes either from
        the file descriptor at 'Offset' or, for the pipe pseudo-descriptor,
        from the stored socket.

        Returns a tuple `([respSMBCommand], None, errorCode)`.  `errorCode`
        is 0 on success, STATUS_ACCESS_DENIED when the read raised, or
        STATUS_INVALID_HANDLE when the file id is not open.
        """
        connData = smbServer.getConnectionData(connId)

        respSMBCommand = smb2.SMB2Read_Response()
        readRequest   = smb2.SMB2Read(recvPacket['Data'])

        # Default one-byte buffer; replaced with the real content on success.
        respSMBCommand['Buffer'] = '\x00'

        # A file id of sixteen 0xff bytes means "use the file id stored from
        # the last SMB2_CREATE", when one was recorded.
        if str(readRequest['FileID']) == '\xff'*16:
            # Let's take the data from the lastRequest
            if  connData['LastRequest'].has_key('SMB2_CREATE'):
                fileID = connData['LastRequest']['SMB2_CREATE']['FileID']
            else:
                fileID = str(readRequest['FileID'])
        else:
            fileID = str(readRequest['FileID'])

        if connData['OpenedFiles'].has_key(fileID):
             fileHandle = connData['OpenedFiles'][fileID]['FileHandle']
             errorCode = 0
             try:
                 if fileHandle != PIPE_FILE_DESCRIPTOR:
                     offset = readRequest['Offset']
                     os.lseek(fileHandle,offset,0)
                     content = os.read(fileHandle,readRequest['Length'])
                 else:
                     sock = connData['OpenedFiles'][fileID]['Socket']
                     content = sock.recv(readRequest['Length'])

                 # NOTE(review): 0x50 is a fixed data offset -- presumably the
                 # header plus fixed response size; confirm against the SMB2 spec.
                 respSMBCommand['DataOffset']   = 0x50
                 respSMBCommand['DataLength']   = len(content)
                 respSMBCommand['DataRemaining']= 0
                 respSMBCommand['Buffer']       = content
             except Exception, e:
                 # Any failure while reading is reported as access denied.
                 smbServer.log('SMB2_READ: %s ' % e, logging.ERROR)
                 errorCode = STATUS_ACCESS_DENIED
        else:
            errorCode = STATUS_INVALID_HANDLE

        smbServer.setConnectionData(connId, connData)
        return [respSMBCommand], None, errorCode
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def seekable(self):
        """Report whether the underlying descriptor supports seeking.

        The answer is probed once with a no-op relative seek and then cached
        in `self._seekable`.
        """
        if self._seekable is not None:
            return self._seekable

        try:
            os.lseek(self._fileno, 0, os.SEEK_CUR)
        except OSError:
            self._seekable = False
        else:
            self._seekable = True
        return self._seekable
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def seek(self, offset, whence=0):
        """Reposition the underlying descriptor and return the new offset."""
        position = os.lseek(self._fileno, offset, whence)
        return position
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def seekable(self):
        """Report whether the underlying descriptor supports seeking.

        The answer is probed once with a no-op relative seek and then cached
        in `self._seekable`.
        """
        if self._seekable is not None:
            return self._seekable

        try:
            os.lseek(self._fileno, 0, os.SEEK_CUR)
        except OSError:
            self._seekable = False
        else:
            self._seekable = True
        return self._seekable
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def seek(self, offset, whence=0):
        """Reposition the underlying descriptor and return the new offset."""
        position = os.lseek(self._fileno, offset, whence)
        return position
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def _probe_seek_hole(self):
        """
        Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
        Unfortunately, there seems to be no clean way for detecting this,
        because often the system just fakes them by just assuming that all
        files are fully mapped, so 'SEEK_HOLE' always returns EOF and
        'SEEK_DATA' always returns the requested offset.

        I could not invent a better way of detecting the fake 'SEEK_HOLE'
        implementation than just to create a temporary file in the same
        directory where the image file resides. It would be nice to change this
        to something better.

        Raises 'ErrorNotSupp' when the temporary file cannot be created or
        truncated, or when only a stub 'SEEK_HOLE' implementation is found.
        """

        directory = os.path.dirname(self._image_path)

        try:
            tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
        except IOError as err:
            raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                              % (directory, err))

        # Close the temporary file on every path, including the error ones,
        # so that a failed probe does not leak an open file descriptor.
        try:
            try:
                # A file extended by truncation should be one big hole.
                os.ftruncate(tmp_obj.fileno(), self.block_size)
            except OSError as err:
                raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                                   % (directory, err))

            offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
            if offs != 0:
                # We are dealing with the stub 'SEEK_HOLE' implementation which
                # always returns EOF.
                self._log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
                raise ErrorNotSupp("the file-system does not support "
                                   "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                                   "provides a stub implementation")
        finally:
            tmp_obj.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_read(self):
        """os.read on a rewound descriptor returns the written bytes."""
        with open(support.TESTFN, "w+b") as stream:
            stream.write(b"spam")
            stream.flush()
            descriptor = stream.fileno()
            # Rewind through the descriptor, not the buffered file object
            os.lseek(descriptor, 0, 0)
            payload = os.read(descriptor, 4)
            self.assertEqual(type(payload), bytes)
            self.assertEqual(payload, b"spam")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_lseek(self):
        """Run the shared `check` helper on os.lseek when it is available."""
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_stdin_filedes(self):
        """A child process can read its stdin from a raw file descriptor."""
        # stdin is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        self.addCleanup(tmp.close)
        fd = tmp.fileno()
        os.write(fd, b"pear")
        # Rewind so the child reads from the start of the file
        os.lseek(fd, 0, 0)
        child_code = 'import sys; sys.exit(sys.stdin.read() == "pear")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=fd)
        child.wait()
        self.assertEqual(child.returncode, 1)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_stdout_filedes(self):
        """A child process can write its stdout to a raw file descriptor."""
        # stdout is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        self.addCleanup(tmp.close)
        fd = tmp.fileno()
        child_code = 'import sys; sys.stdout.write("orange")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdout=fd)
        child.wait()
        # Rewind before reading back what the child wrote
        os.lseek(fd, 0, 0)
        self.assertEqual(os.read(fd, 1024), b"orange")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_stderr_filedes(self):
        """A child process can write its stderr to a raw file descriptor."""
        # stderr is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        self.addCleanup(tmp.close)
        fd = tmp.fileno()
        child_code = 'import sys; sys.stderr.write("strawberry")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stderr=fd)
        child.wait()
        # Rewind before reading back what the child wrote
        os.lseek(fd, 0, 0)
        self.assertStderrEqual(os.read(fd, 1024), b"strawberry")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_textmode(self):
        """_mkstemp_inner can create files in text mode."""
        # Silently does nothing without text mode (can't use SkipTest here).
        if has_textmode:
            # A text file is truncated at the first Ctrl+Z byte
            handle = self.do_create(bin=0)
            for payload in (b"blat\x1a", b"extra\n"):
                handle.write(payload)
            os.lseek(handle.fd, 0, os.SEEK_SET)
            self.assertEqual(os.read(handle.fd, 20), b"blat")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_lseek(self):
        """Exercise os.lseek with each whence mode on the built large file."""
        if verbose:
            print('play around with os.lseek() with the built largefile')
        with self.open(TESTFN, 'rb') as stream:
            # (offset, whence, expected resulting position), applied in order;
            # the relative seeks depend on the position left by the previous one.
            expectations = (
                (0, 0, 0),
                (42, 0, 42),
                (42, 1, 84),
                (0, 1, 84),
                (0, 2, size+1+0),
                (-10, 2, size+1-10),
                (-size-1, 2, 0),
                (size, 0, size),
            )
            for offset, whence, expected in expectations:
                self.assertEqual(os.lseek(stream.fileno(), offset, whence),
                                 expected)
            # the 'a' that was written at the end of file above
            self.assertEqual(stream.read(1), b'a')
项目:bt    作者:kracekumar    | 项目源码 | 文件源码
def _write(self, piece):
        """Store `piece.data` at the file offset belonging to `piece.index`.

        The offset is the piece index multiplied by the torrent's piece
        length.
        """
        offset = self.torrent.info.piece_length * piece.index
        os.lseek(self.fd, offset, os.SEEK_SET)
        os.write(self.fd, piece.data)
项目:bt    作者:kracekumar    | 项目源码 | 文件源码
def read(self, begin, index, length):
        """Read up to `length` bytes of piece `index`, starting `begin` bytes in.

        The file offset is the start of the piece (`index` multiplied by the
        torrent's piece length) plus `begin`.  Returns the bytes obtained
        from `os.read`, which may be fewer than `length`.
        """
        # `begin` used to be ignored, so every read started at the piece
        # boundary regardless of the requested offset within the piece.
        pos = index * self.torrent.info.piece_length + begin
        os.lseek(self.fd, pos, os.SEEK_SET)
        return os.read(self.fd, length)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_lseek(self):
        """Run the shared `check` helper on os.lseek with arguments (0, 0)."""
        func, args = os.lseek, (0, 0)
        self.check(func, *args)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_stdin_filedes(self):
        """A child process can read its stdin from a raw file descriptor."""
        # stdin is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        fd = tmp.fileno()
        os.write(fd, "pear")
        # Rewind so the child reads from the start of the file
        os.lseek(fd, 0, 0)
        child_code = 'import sys; sys.exit(sys.stdin.read() == "pear")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=fd)
        child.wait()
        self.assertEqual(child.returncode, 1)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_stdout_filedes(self):
        """A child process can write its stdout to a raw file descriptor."""
        # stdout is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        fd = tmp.fileno()
        child_code = 'import sys; sys.stdout.write("orange")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdout=fd)
        child.wait()
        # Rewind before reading back what the child wrote
        os.lseek(fd, 0, 0)
        self.assertEqual(os.read(fd, 1024), "orange")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_stderr_filedes(self):
        """A child process can write its stderr to a raw file descriptor."""
        # stderr is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        fd = tmp.fileno()
        child_code = 'import sys; sys.stderr.write("strawberry")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stderr=fd)
        child.wait()
        # Rewind before reading back what the child wrote
        os.lseek(fd, 0, 0)
        self.assertStderrEqual(os.read(fd, 1024), "strawberry")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_lseek(self):
        """Run the shared `check` helper on os.lseek with arguments (0, 0)."""
        func, args = os.lseek, (0, 0)
        self.check(func, *args)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_stdin_filedes(self):
        """A child process can read its stdin from a raw file descriptor."""
        # stdin is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        fd = tmp.fileno()
        os.write(fd, "pear")
        # Rewind so the child reads from the start of the file
        os.lseek(fd, 0, 0)
        child_code = 'import sys; sys.exit(sys.stdin.read() == "pear")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=fd)
        child.wait()
        self.assertEqual(child.returncode, 1)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_stdout_filedes(self):
        """A child process can write its stdout to a raw file descriptor."""
        # stdout is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        fd = tmp.fileno()
        child_code = 'import sys; sys.stdout.write("orange")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdout=fd)
        child.wait()
        # Rewind before reading back what the child wrote
        os.lseek(fd, 0, 0)
        self.assertEqual(os.read(fd, 1024), "orange")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_stderr_filedes(self):
        """A child process can write its stderr to a raw file descriptor."""
        # stderr is set to open file descriptor
        tmp = tempfile.TemporaryFile()
        fd = tmp.fileno()
        child_code = 'import sys; sys.stderr.write("strawberry")'
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stderr=fd)
        child.wait()
        # Rewind before reading back what the child wrote
        os.lseek(fd, 0, 0)
        self.assertStderrEqual(os.read(fd, 1024), "strawberry")
项目:autoinjection    作者:ChengWiLL    | 项目源码 | 文件源码
def __init__(self, buf = None, pos = 0, filename = None, fp = None):
        """Set up the input buffer and the position/line/column counters.

        When `fp` is given it takes precedence over `buf` and `pos`: the
        file is memory-mapped read-only when it has a descriptor, a name
        whose size can be queried and `mmap` is importable; otherwise its
        whole content is read into memory.  `filename` defaults to
        `fp.name` when `fp` has that attribute.
        """
        if fp is not None:
            try:
                fileno = fp.fileno()
                length = os.path.getsize(fp.name)
                import mmap
            except Exception:
                # Best-effort fallback: read whole file into memory.  Only
                # ordinary errors are caught so that KeyboardInterrupt and
                # SystemExit are no longer silently swallowed.
                buf = fp.read()
                pos = 0
            else:
                # map the whole file into memory
                if length:
                    # length must not be zero
                    buf = mmap.mmap(fileno, length, access = mmap.ACCESS_READ)
                    # Start at the descriptor's current position
                    pos = os.lseek(fileno, 0, 1)
                else:
                    buf = ''
                    pos = 0

            if filename is None:
                filename = getattr(fp, 'name', None)

        self.buf = buf
        self.pos = pos
        self.line = 1
        self.col = 1
        self.filename = filename
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def _save_chunk(self, n, data):
        """Write chunk number `n` into the part file and record the segments.

        The part file is opened on demand.  Returns False when it could not
        be opened and True once the chunk and `self._segments` were written.
        """
        if self._part_fd < 0 and not self._open_files():
            return False

        assert self._part_fd >= 0
        # Chunks sit back to back, so chunk n starts at n * chunk_size
        position = self.chunk_size * n
        os.lseek(self._part_fd, position, os.SEEK_SET)
        os.write(self._part_fd, data)
        self._segments_file.write(self._segments)

        return True