Python os 模块,truncate() 实例源码

我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用os.truncate()

项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_does_not_fit(self):
        # The contents of a structure is too large for the image size.
        workdir = self._resources.enter_context(TemporaryDirectory())
        # See LP: #1666580
        main(('snap', '--workdir', workdir,
              '--thru', 'load_gadget_yaml',
              self.model_assertion))
        # Make the gadget's mbr contents too big.
        path = os.path.join(workdir, 'unpack', 'gadget', 'pc-boot.img')
        os.truncate(path, 512)
        mock = self._resources.enter_context(patch(
            'ubuntu_image.__main__._logger.error'))
        code = main(('snap', '--workdir', workdir, '--resume'))
        self.assertEqual(code, 1)
        self.assertEqual(
            mock.call_args_list[-1],
            call('Volume contents do not fit (72B over): '
                 'volumes:<pc>:structure:<mbr> [#0]'))
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def test_truncate_fixed(sub_open):
    fname = join(sub_open, 'new_file')

    f = open(fname, 'w')
    f.write('hello\n')
    f.flush()

    ff = open(fname, 'a')
    ff.truncate(1)

    ff.close()
    ff = open(fname, 'r')
    assert ff.read() == 'h'

    ff.close()

    f.close()

    os.truncate(fname, 0)
    f = open(fname, 'r')
    assert f.read() == ''
    f.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ftruncate(self):
        if hasattr(os, "ftruncate"):
            self.check(os.truncate, 0)
            self.check(os.ftruncate, 0)
项目:intel-iot-refkit    作者:intel    | 项目源码 | 文件源码
def create_internal_disk(self):
        """Create a internal-image*.wic in the resultdir that runqemu can use."""
        copyfile(os.path.join(self.image_dir, 'refkit-installer-image-%s.qemuboot.conf' % self.image_arch),
                 os.path.join(self.resultdir, 'internal-image-%s.qemuboot.conf' % self.image_arch))
        for ovmf in glob('%s/ovmf*' % self.image_dir):
            os.symlink(ovmf, os.path.join(self.resultdir, os.path.basename(ovmf)))
        with open(os.path.join(self.resultdir, 'internal-image-%s.wic' % self.image_arch), 'w') as f:
            # empty, sparse file of 8GB
            os.truncate(f.fileno(), 8 * 1024 * 1024 * 1024)
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def get_default_sector_size():
    with NamedTemporaryFile() as fp:
        # Truncate to zero, so that extending the size in the next call
        # will cause all the bytes to read as zero.  Stevens $4.13
        os.truncate(fp.name, 0)
        os.truncate(fp.name, MiB(1))
        return Device(fp.name).sectorSize
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def __init__(self, path, size, schema=None):
        """Initialize an image file to a given size in bytes.

        :param path: Path to image file on the file system.
        :type path: str
        :param size: Size in bytes to set the image file to.
        :type size: int
        :param schema: The partitioning schema of the volume.
        :type schema: VolumeSchema

        Public attributes:

        * path - Path to the image file.
        """
        self.path = path
        # Create an empty image file of a fixed size.  Unlike
        # truncate(1) --size 0, os.truncate(path, 0) doesn't touch the
        # file; i.e. it must already exist.
        with open(path, 'wb'):
            pass
        # Truncate to zero, so that extending the size in the next call
        # will cause all the bytes to read as zero.  Stevens $4.13
        os.truncate(path, 0)
        os.truncate(path, size)
        # Prepare the device and disk objects for parted to be used for all
        # future partition() calls.  Only do it if we actually care about the
        # partition table.
        if schema is None:
            self.sector_size = 512
            self.device = None
            self.disk = None
            self.schema = None
        else:
            self.device = parted.Device(self.path)
            label = 'msdos' if schema is VolumeSchema.mbr else 'gpt'
            self.schema = schema
            self.disk = parted.freshDisk(self.device, label)
            self.sector_size = self.device.sectorSize
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_sparse_copy(self):
        with ExitStack() as resources:
            tmpdir = resources.enter_context(TemporaryDirectory())
            sparse_file = os.path.join(tmpdir, 'sparse.dat')
            fp = resources.enter_context(open(sparse_file, 'w'))
            os.truncate(fp.fileno(), 1000000)
            # This file is sparse.
            self.assertTrue(is_sparse(sparse_file))
            copied_file = os.path.join(tmpdir, 'copied.dat')
            sparse_copy(sparse_file, copied_file)
            self.assertTrue(is_sparse(copied_file))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_copy_symlink(self):
        with ExitStack() as resources:
            tmpdir = resources.enter_context(TemporaryDirectory())
            sparse_file = os.path.join(tmpdir, 'sparse.dat')
            fp = resources.enter_context(open(sparse_file, 'w'))
            os.truncate(fp.fileno(), 1000000)
            # This file is sparse.
            self.assertTrue(is_sparse(sparse_file))
            # Create a symlink to the sparse file.
            linked_file = os.path.join(tmpdir, 'linked.dat')
            os.symlink(sparse_file, linked_file)
            self.assertTrue(os.path.islink(linked_file))
            copied_link = os.path.join(tmpdir, 'copied.dat')
            sparse_copy(linked_file, copied_link, follow_symlinks=False)
            self.assertTrue(os.path.islink(copied_link))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ftruncate(self):
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)
项目:porcupine    作者:Akuli    | 项目源码 | 文件源码
def _open_log_file():
    """Open a Porcupine log file.

    Usually this opens and overwrites log.txt. If another Porcupine
    process has it currently opened, this opens log1.txt instead, then
    log2.txt and so on.
    """
    # create an iterator 'log.txt', 'log2.txt', 'log3.txt', ...
    filenames = itertools.chain(
        ['log.txt'],
        map('log{}.txt'.format, itertools.count(start=2)),
    )

    for filename in filenames:
        path = os.path.join(dirs.cachedir, filename)
        # unfortunately there's not a mode that would open in write but
        # not truncate like 'w' or seek to end like 'a'
        fileno = os.open(path, os.O_WRONLY | os.O_CREAT, 0o644)

        if _lock(fileno):
            # now we can delete the old content, can't use os.truncate
            # here because it doesn't exist on windows
            file = open(fileno, 'w')
            file.truncate(0)
            return file
        else:
            os.close(fileno)


# FileHandler doesn't take already opened files and StreamHandler
# doesn't close the file :(
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def truncate(self, length):
        raise FuseOSError(EPERM)
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def truncate(self, length):
        self.data = self.get_data()

        if length == 0:
            self.data = bytes('', 'utf8')
        elif length <= len(self.data):
            self.data = self.data[:length]
        else:
            self.data = self.data + bytes(
                '\0' * (length - self.stat['st_size']), 'utf8'
            )

        self.overwrite = True
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def truncate(self, length):
        if self._handle is None:
            os.truncate(self.full_path, length)
        else:
            self._handle.seek(0)
            self._handle.truncate(length)
            self._handle.flush()
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def truncate(self, length):
        old_data = self.data

        if length == 0:
            self.data = bytes('', 'utf8')
        elif length <= len(old_data):
            self.data = self.data[:length]
        else:
            self.data = old_data + bytes(
                '\0' * (length - len(old_data)), 'utf8'
            )
        self.stat['st_atime'] = time()
        self.stat['st_mtime'] = time()
        self.dirty = True
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def truncate(self, path, length, fh=None):
        with self._lock:
            if length < 0:  # pragma: no cover
                raise FuseOSError(EINVAL)

            if fh is None:
                file = self.get_file(path, expect_type=SingleFile)
            else:  # pragma: no cover
                file = self._open_files[fh]

            if self.fixed and not isinstance(file, (TempFile, SpecialFile)):
                raise FuseOSError(EPERM)

            file.truncate(length)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ftruncate(self):
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def _consistent(self, cached):
        try:
            all_fh = [ ]
            for vlist in cached.open_handle.values():
                all_fh += vlist
            for v_fh in all_fh:
                assert (None, v_fh) in self._shelfcache, 'Inconsistent list members'
        except Exception as e:
            self.logger.error('Shadow cache is corrupt: %s' % str(e))
            if self.verbose > 3:
                set_trace()
            raise TmfsOSError(errno.EBADFD)

    # Most uses send an open handle fh (integer) as key.  truncate by name
    # is the exception.  An update needs to be reflected for all keys.
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def truncate(self, shelf, length, fh):
        if fh is not None:
            # This is an update, but there's no good way to flag that to
            # __setitem__.  Do an idiot check here.
            assert (None, fh) in self, 'VFS thinks %s is open but LFS does not, shelf_id: %s' % (
                shelf.name, shelf.id)
        self[(shelf.id, None)] = shelf
        return 0
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def truncate(self, shelf, length, fd):  # shadow_dir, yes an fd
        try:
            if fd:  # It's an open shelf
                assert self[shelf.name]._fd == fd, 'fd mismatch on truncate'
            os.truncate(
                shelf._fd if shelf._fd >= 0 else self.shadowpath(shelf.name),
                length)
            shelf.size_bytes = length
            if shelf.open_handle is None:
                shelf = self[shelf.id]
                if shelf is not None:
                    shelf.size_bytes = length
            return 0
        except OSError as e:
            raise TmfsOSError(e.errno)