Python os 模块,ftruncate() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.ftruncate()

项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def wipe(self):
        """Zero-fill the two filter bitmap files kept under /dev/shm.

        Each file is created if missing, resized with ``os.ftruncate`` and
        overwritten with NUL bytes through a shared read/write mapping:

        * ``/dev/shm/kafl_filter0`` -- ``BITMAP_SHM_SIZE`` bytes, taken from
          ``self.config.config_values``
        * ``/dev/shm/kafl_tfilter`` -- fixed size of 0x1000000 bytes
        """
        targets = (
            ("/dev/shm/kafl_filter0", self.config.config_values['BITMAP_SHM_SIZE']),
            ("/dev/shm/kafl_tfilter", 0x1000000),
        )
        for path, size in targets:
            fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
            try:
                os.ftruncate(fd, size)
                shm = mmap.mmap(fd, size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
                try:
                    # One bulk slice assignment replaces the per-byte Python
                    # loop; a bytes literal works on Python 2 and Python 3.
                    shm[:] = b'\x00' * size
                finally:
                    shm.close()
            finally:
                # Release the descriptor even if resizing or mapping failed.
                os.close(fd)
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def __set_binary(self, filename, binaryfile, max_size):
        """Load ``binaryfile`` into a zero-padded shared-memory file.

        ``filename`` is created if needed, resized to ``max_size`` bytes,
        cleared to NUL bytes and then filled from offset 0 with the content
        of ``binaryfile``.

        :param filename: path of the shared-memory file to (re)initialise
        :param binaryfile: path of the file whose bytes are copied in
        :param max_size: size in bytes of the file and of its mapping
        :raises ValueError: from ``mmap.write`` when ``binaryfile`` holds more
                            than ``max_size`` bytes
        """
        shm_fd = os.open(filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(shm_fd, max_size)
            shm = mmap.mmap(shm_fd, max_size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                shm.seek(0x0)
                shm.write(b'\x00' * max_size)
                shm.seek(0x0)

                with open(binaryfile, "rb") as f:
                    while True:
                        chunk = f.read(1024)
                        # Test truthiness rather than comparing with "": on
                        # Python 3 read() returns b"" at EOF, and b"" != ""
                        # is always true.
                        if not chunk:
                            break
                        shm.write(chunk)
            finally:
                shm.close()
        finally:
            os.close(shm_fd)
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def init(self):
        """Connect to the control socket and map the two shared-memory files.

        The UNIX-socket connection to ``self.control_filename`` is retried in
        a tight loop until it succeeds.  Then the bitmap file
        (``self.bitmap_size`` bytes) and the payload file (128 KiB) are
        created if missing, resized and mapped as shared read/write memory.

        :return: always ``True``
        """
        self.control = socket.socket(socket.AF_UNIX)
        connected = False
        while not connected:
            try:
                self.control.connect(self.control_filename)
            except socket_error:
                # Connection failed: retry straight away, without a delay.
                continue
            connected = True

        open_flags = os.O_RDWR | os.O_SYNC | os.O_CREAT
        payload_size = 128 << 10

        self.kafl_shm_f = os.open(self.bitmap_filename, open_flags)
        self.fs_shm_f = os.open(self.payload_filename, open_flags)
        os.ftruncate(self.kafl_shm_f, self.bitmap_size)
        os.ftruncate(self.fs_shm_f, payload_size)

        map_prot = mmap.PROT_WRITE | mmap.PROT_READ
        self.kafl_shm = mmap.mmap(self.kafl_shm_f, self.bitmap_size, mmap.MAP_SHARED, map_prot)
        self.fs_shm = mmap.mmap(self.fs_shm_f, payload_size, mmap.MAP_SHARED, map_prot)

        return True
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def prepare_rootfs_btrfs(self, rootfs, oe_builddir, rootfs_dir):
        """
        Build a filesystem image populated from a root filesystem directory.

        Measures ``rootfs_dir`` with ``du -ks``, asks ``self.get_rootfs_size``
        for the image size in KiB, creates ``rootfs`` as a sparse file of that
        size and runs ``mkfs.<self.fstype>`` on it with ``-r rootfs_dir``.
        ``oe_builddir`` is accepted but not used here.
        """
        du_output = exec_cmd("sudo du -ks %s" % rootfs_dir)
        used_kib = int(du_output.split()[0])
        image_kib = self.get_rootfs_size(used_kib)

        # Opening with 'w' creates/empties the file; ftruncate then extends it
        # to the full size without writing any data.
        with open(rootfs, 'w') as image:
            image_bytes = image_kib * 1024
            os.ftruncate(image.fileno(), image_bytes)

        label_opt = "-L %s" % self.label if self.label else ""

        exec_cmd("sudo mkfs.%s -b %d -r %s %s %s"
                 % (self.fstype, image_bytes, rootfs_dir, label_opt, rootfs))
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def prepare_empty_partition_ext(self, rootfs, oe_builddir):
        """
        Create an empty ext2/3/4 image file.

        ``rootfs`` is created as a sparse file of ``self.disk_size`` KiB and
        formatted with ``mkfs.<self.fstype> -F -i 8192``, labelled with
        ``self.label`` when one is set.  ``oe_builddir`` is accepted but not
        used here.
        """
        size_kib = self.disk_size
        with open(rootfs, 'w') as image:
            os.ftruncate(image.fileno(), size_kib * 1024)

        inode_opts = "-i 8192"
        label_opt = "-L %s" % self.label if self.label else ""

        exec_cmd("sudo mkfs.%s -F %s %s %s"
                 % (self.fstype, inode_opts, label_opt, rootfs))
项目:packetary    作者:openstack    | 项目源码 | 文件源码
def _copy_stream(self, fd, url, offset):
        """Copies remote file to local.

        The local file is cut down to ``offset`` bytes and the remote data,
        opened at the same offset via ``self.open_stream``, is appended from
        there in 16 KiB chunks.

        :param fd: the file`s descriptor
        :param url: the remote file`s url
        :param offset: the number of bytes from the beginning,
                       that will be skipped
        :return: the count of actually copied bytes
        """

        source = self.open_stream(url, offset)
        os.ftruncate(fd, offset)
        os.lseek(fd, offset, os.SEEK_SET)
        chunk_size = 16 * 1024
        size = 0
        while True:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            # os.write() may store fewer bytes than requested; keep writing
            # the remainder so no data is dropped and the returned count
            # matches what actually reached the file.
            written = 0
            while written < len(chunk):
                written += os.write(fd, chunk[written:])
            size += written
        return size
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, transfer_size):
        """Create a 20-byte file-backed shared buffer holding three fields.

        Layout of ``self.buf`` (byte offsets):
          0   ``total_bytes``   -- ctypes.c_uint64, initialised to 0
          8   ``average_time``  -- ctypes.c_double, initialised to 0.0
          16  ``transfer_size`` -- ctypes.c_uint32, set from the argument

        The temp file's path is kept in ``self.filename``; its descriptor is
        closed as soon as the mapping exists.
        """
        buf_len = 20
        handle, self.filename = tempfile.mkstemp()
        os.ftruncate(handle, buf_len)
        self.buf = mmap.mmap(handle, buf_len, mmap.MAP_SHARED, mmap.PROT_WRITE)
        os.close(handle)

        # ctypes views that read and write directly in the mapped bytes.
        self.total_bytes = ctypes.c_uint64.from_buffer(self.buf)
        self.average_time = ctypes.c_double.from_buffer(self.buf, 8)
        self.transfer_size = ctypes.c_uint32.from_buffer(self.buf, 16)

        self.total_bytes.value = 0
        self.average_time.value = 0.0
        self.transfer_size.value = transfer_size
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def create_shm(self):
        """Create and size one shared-memory file per (prefix, process) pair.

        For every prefix in ``self.files`` and every index below
        ``self.num_processes`` the file ``<prefix><index>`` is created if
        missing and resized to ``self.sizes[j] * self.tasks_per_requests``
        bytes, ``j`` being the prefix's position in ``self.files``.
        """
        for j, prefix in enumerate(self.files):
            for i in range(self.num_processes):
                shm_f = os.open(prefix + str(i), os.O_CREAT | os.O_RDWR | os.O_SYNC)
                try:
                    os.ftruncate(shm_f, self.sizes[j] * self.tasks_per_requests)
                finally:
                    # Do not leak the descriptor when the resize fails.
                    os.close(shm_f)
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def open_global_bitmap(self):
        """Open, size and memory-map the ``bitmap`` file in the work directory.

        The descriptor is stored in ``self.global_bitmap_fd`` and the shared
        read/write mapping of ``self.bitmap_size`` bytes in
        ``self.global_bitmap``.
        """
        bitmap_path = self.config.argument_values['work_dir'] + "/bitmap"
        self.global_bitmap_fd = os.open(bitmap_path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        os.ftruncate(self.global_bitmap_fd, self.bitmap_size)
        self.global_bitmap = mmap.mmap(
            self.global_bitmap_fd,
            self.bitmap_size,
            mmap.MAP_SHARED,
            mmap.PROT_WRITE | mmap.PROT_READ
        )
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:shadowsocksr    作者:shadowsocksr-backup    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:bap-python    作者:BinaryAnalysisPlatform    | 项目源码 | 文件源码
def mmap(self, data):
        """Write ``data`` into the temp file via a memory map and return its URL.

        The backing file ``self.temp`` is resized to hold ``data`` (never less
        than 4096 bytes) and the payload is written at offset 0.

        :param data: payload to publish; must be accepted by ``mmap.write``
        :return: ``mmap://<temp path>?offset=0&length=<len(data)>``
        """
        url = "mmap://{0}?offset=0&length={1}".format(
            self.temp.name, len(data))
        # A fixed 4096-byte mapping makes mm.write() raise ValueError for
        # larger payloads even though the URL advertises len(data) bytes, so
        # grow the file and the mapping to fit.
        size = max(4096, len(data))
        os.ftruncate(self.temp.fileno(), size)
        mm = mmap(self.temp.fileno(), size)
        try:
            mm.write(data)
        finally:
            mm.close()
        return url
项目:ShadowSocks    作者:immqy    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        LOG.exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % utils.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, utils.to_bytes(str(pid)))
    return 0
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def _probe_seek_hole(self):
        """
        Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
        Unfortunately, there seems to be no clean way for detecting this,
        because often the system just fakes them by just assuming that all
        files are fully mapped, so 'SEEK_HOLE' always returns EOF and
        'SEEK_DATA' always returns the requested offset.

        I could not invent a better way of detecting the fake 'SEEK_HOLE'
        implementation than just to create a temporary file in the same
        directory where the image file resides. It would be nice to change this
        to something better.

        Raises 'ErrorNotSupp' when the temporary file cannot be created or
        truncated, or when seeking for a hole at offset 0 of the truncated
        file does not return 0.
        """

        directory = os.path.dirname(self._image_path)

        try:
            tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
        except IOError as err:
            raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                               % (directory, err))

        # Close the temporary file on every exit path, including the
        # 'ErrorNotSupp' ones, so its descriptor is never leaked.
        try:
            try:
                os.ftruncate(tmp_obj.fileno(), self.block_size)
            except OSError as err:
                raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                                   % (directory, err))

            offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
            if offs != 0:
                # We are dealing with the stub 'SEEK_HOLE' implementation which
                # always returns EOF.
                self._log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
                raise ErrorNotSupp("the file-system does not support "
                                   "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                                   "provides a stub implementation")
        finally:
            tmp_obj.close()
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def prepare_rootfs_ext(self, rootfs, oe_builddir, rootfs_dir):
        """
        Build an ext2/3/4 image populated from a root filesystem directory.

        Measures ``rootfs_dir`` with ``du -ks``, asks ``self.get_rootfs_size``
        for the image size in KiB, creates ``rootfs`` as a sparse file of that
        size, formats it with ``mkfs.<self.fstype> -d rootfs_dir`` and finally
        runs ``fsck.<self.fstype> -pvfD`` on the result.  ``oe_builddir`` is
        accepted but not used here.
        """
        du_output = exec_cmd("sudo du -ks %s" % rootfs_dir)
        used_kib = int(du_output.split()[0])
        image_kib = self.get_rootfs_size(used_kib)

        # Opening with 'w' creates/empties the file; ftruncate then extends it
        # to the full size without writing any data.
        with open(rootfs, 'w') as image:
            os.ftruncate(image.fileno(), image_kib * 1024)

        inode_opts = "-i 8192"
        label_opt = "-L %s" % self.label if self.label else ""

        exec_cmd("sudo mkfs.%s -F %s %s %s -d %s"
                 % (self.fstype, inode_opts, rootfs, label_opt, rootfs_dir))

        fsck_cmd = "fsck.%s -pvfD %s" % (self.fstype, rootfs)
        try:
            exec_cmd(fsck_cmd)
        except WicExecError as err:
            # A return code of 1 is tolerated; anything else is propagated.
            # NOTE(review): presumably 1 means "errors corrected" -- confirm.
            if err.returncode != 1:
                raise err
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def prepare_empty_partition_btrfs(self, rootfs, oe_builddir):
        """
        Create an empty btrfs image file.

        ``rootfs`` is created as a sparse file of ``self.disk_size`` KiB and
        formatted with ``mkfs.<self.fstype>``, labelled with ``self.label``
        when one is set.  ``oe_builddir`` is accepted but not used here.
        """
        size_kib = self.disk_size
        with open(rootfs, 'w') as image:
            os.ftruncate(image.fileno(), size_kib * 1024)

        label_opt = "-L %s" % self.label if self.label else ""

        # NOTE(review): the file is sized from self.disk_size while mkfs is
        # given self.size -- confirm the two are meant to differ.
        exec_cmd("sudo mkfs.%s -b %d %s %s"
                 % (self.fstype, self.size * 1024, label_opt, rootfs))
项目:ssrr    作者:do21    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:shadowsocksr    作者:ShadowsocksR-Live    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.ftruncate`` with ``0`` as extra argument.

        Does nothing when the ``os`` module has no ``ftruncate``.
        """
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.ftruncate`` with ``0`` as extra argument."""
        # NOTE(review): ``check`` is defined outside this snippet; presumably
        # it asserts that the call fails on an invalid descriptor -- confirm.
        self.check(os.ftruncate, 0)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.ftruncate`` with ``0`` as extra argument."""
        # NOTE(review): ``check`` is defined outside this snippet; presumably
        # it asserts that the call fails on an invalid descriptor -- confirm.
        self.check(os.ftruncate, 0)
项目:python-wayland    作者:sde1000    | 项目源码 | 文件源码
def __init__(self, size):
        """Create a temp file of ``size`` bytes inside ``$XDG_RUNTIME_DIR``.

        The open descriptor is stored in ``self._fd``; the file's path is not
        kept.

        :raises NoXDGRuntimeDir: if ``XDG_RUNTIME_DIR`` is unset or empty
        """
        runtime_dir = os.environ.get('XDG_RUNTIME_DIR')
        if not runtime_dir:
            raise NoXDGRuntimeDir()
        self._fd, _unused_path = tempfile.mkstemp(dir=runtime_dir)
        os.ftruncate(self._fd, size)
项目:shadowsocksr    作者:shadowsocks-r    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.truncate`` and then ``os.ftruncate``.

        Each function is checked with ``0`` as extra argument.  Does nothing
        when the ``os`` module has no ``ftruncate``.
        """
        if not hasattr(os, "ftruncate"):
            return
        for func_name in ("truncate", "ftruncate"):
            self.check(getattr(os, func_name), 0)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def truncate(self, size=-1):
            """Resize the underlying file to ``size`` bytes.

            Buffered data is flushed first.  A ``size`` of -1 selects the
            current position as reported by ``self.tell()``.  After a
            successful resize the position is moved to the new size.

            :return: whatever ``os.ftruncate`` returns
            :raises IOError: built from the arguments of any ``OSError``
                             raised by ``os.ftruncate``
            """
            self.flush()
            new_size = self.tell() if size == -1 else size
            try:
                result = os.ftruncate(self.fileno(), new_size)
            except OSError as err:
                raise IOError(*err.args)
            self.seek(new_size)  # move position & clear buffer
            return result
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def truncate(self, size=-1):
        """Resize the file behind ``self._fileno`` to ``size`` bytes.

        A ``size`` of -1 selects the current position as reported by
        ``self.tell()``.  After a successful resize the position is moved to
        the new size.

        :return: whatever ``_original_os.ftruncate`` returns
        :raises IOError: built from the arguments of any ``OSError`` raised
                         by the resize
        """
        new_size = self.tell() if size == -1 else size
        try:
            result = _original_os.ftruncate(self._fileno, new_size)
        except OSError as err:
            raise IOError(*err.args)
        self.seek(new_size)  # move position & clear buffer
        return result
项目:shadowsocks_manyuser_speedfast365    作者:ShenYinjie    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:shadowsocksr    作者:yzou    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:AnnotatedShadowSocks    作者:xuelangZF    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Use the pid file to ensure only one daemon instance is running.

    The pid file is opened (created with owner-only read/write permission
    if missing) and marked close-on-exec, then an exclusive non-blocking
    lock is requested on it:
        on success the file is emptied, ``pid`` is written and 0 is returned;
        if the lock is already held, an error is logged and -1 is returned.
    -1 is also returned when the file cannot be opened.
    """
    import stat
    import fcntl

    # https://github.com/xuelangZF/AnnotatedShadowSocks/issues/23
    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        logging.error(err)
        return -1

    # https://github.com/xuelangZF/AnnotatedShadowSocks/issues/25
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # https://github.com/xuelangZF/AnnotatedShadowSocks/issues/26
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:bmap-tools    作者:intel    | 项目源码 | 文件源码
def _probe_seek_hole(self):
        """
        Check whether the system implements 'SEEK_HOLE' and 'SEEK_DATA'.
        Unfortunately, there seems to be no clean way for detecting this,
        because often the system just fakes them by just assuming that all
        files are fully mapped, so 'SEEK_HOLE' always returns EOF and
        'SEEK_DATA' always returns the requested offset.

        I could not invent a better way of detecting the fake 'SEEK_HOLE'
        implementation than just to create a temporary file in the same
        directory where the image file resides. It would be nice to change this
        to something better.

        Raises 'ErrorNotSupp' when the temporary file cannot be created or
        truncated, or when seeking for a hole at offset 0 of the truncated
        file does not return 0.
        """

        directory = os.path.dirname(self._image_path)

        try:
            tmp_obj = tempfile.TemporaryFile("w+", dir=directory)
        except IOError as err:
            raise ErrorNotSupp("cannot create a temporary in \"%s\": %s"
                               % (directory, err))

        # Close the temporary file on every exit path, including the
        # 'ErrorNotSupp' ones, so its descriptor is never leaked.
        try:
            try:
                os.ftruncate(tmp_obj.fileno(), self.block_size)
            except OSError as err:
                raise ErrorNotSupp("cannot truncate temporary file in \"%s\": %s"
                                   % (directory, err))

            offs = _lseek(tmp_obj, 0, _SEEK_HOLE)
            if offs != 0:
                # We are dealing with the stub 'SEEK_HOLE' implementation which
                # always returns EOF.
                _log.debug("lseek(0, SEEK_HOLE) returned %d" % offs)
                raise ErrorNotSupp("the file-system does not support "
                                   "\"SEEK_HOLE\" and \"SEEK_DATA\" but only "
                                   "provides a stub implementation")
        finally:
            tmp_obj.close()
项目:shadowsocksr-20170728    作者:lhp7895    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.ftruncate`` with ``0`` as extra argument."""
        # NOTE(review): ``check`` is defined outside this snippet; presumably
        # it asserts that the call fails on an invalid descriptor -- confirm.
        self.check(os.ftruncate, 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.truncate`` and then ``os.ftruncate``.

        Each function is checked with ``0`` as extra argument.
        """
        for func_name in ("truncate", "ftruncate"):
            self.check(getattr(os, func_name), 0)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.ftruncate`` with ``0`` as extra argument.

        Does nothing when the ``os`` module has no ``ftruncate``.
        """
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)
项目:shadowsocksr-python    作者:nanqinlang-shadowsocksr    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:luci-oso21    作者:oso21    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:today    作者:WooSoftware    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:shadowsocksrr    作者:moinuxx    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:SSPANEL-V3-shadowsockR    作者:neophack    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:shadowsocks-remix    作者:SuperSuperSuperSuper5    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ftruncate(self):
        """Run ``self.check`` on ``os.truncate`` and then ``os.ftruncate``.

        Each function is checked with ``0`` as extra argument.
        """
        for func_name in ("truncate", "ftruncate"):
            self.check(getattr(os, func_name), 0)
项目:shadowsocksr    作者:emacsenli    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Take an exclusive lock on ``pid_file`` and store ``pid`` in it.

    The file is opened read/write (created with owner-only read/write
    permission if missing), marked close-on-exec and locked without
    blocking.  On success the descriptor is left open.

    :param pid_file: path of the pid file
    :param pid: process id to record
    :return: 0 on success; -1 if the file cannot be opened or the lock is
             already held
    """
    import stat
    import fcntl

    try:
        pid_fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                         stat.S_IRUSR | stat.S_IWUSR)
    except OSError as err:
        shell.print_exception(err)
        return -1

    # Add FD_CLOEXEC to the descriptor flags.
    fd_flags = fcntl.fcntl(pid_fd, fcntl.F_GETFD)
    assert fd_flags != -1
    status = fcntl.fcntl(pid_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert status != -1

    # fcntl.fcntl has no platform independent F_SETLK, hence lockf.
    try:
        fcntl.lockf(pid_fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        # Lock is taken: log up to 32 bytes of the file as the owner's pid.
        owner = os.read(pid_fd, 32)
        if not owner:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(owner))
        os.close(pid_fd)
        return -1

    os.ftruncate(pid_fd, 0)
    os.write(pid_fd, common.to_bytes(str(pid)))
    return 0
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def truncate(self, size=-1):
            """Flush pending output, then resize the underlying file.

            ``size`` is the new length in bytes; the default of -1 means
            "the current position as reported by ``self.tell()``".  An
            ``OSError`` raised by ``os.ftruncate`` is re-raised as an
            ``IOError`` carrying the same args.  On success the position is
            moved to the new size and the value returned by
            ``os.ftruncate`` is handed back.
            """
            self.flush()
            new_length = self.tell() if size == -1 else size
            try:
                result = os.ftruncate(self.fileno(), new_length)
            except OSError as err:
                raise IOError(*err.args)
            # Seeking both repositions the stream and clears its buffer.
            self.seek(new_length)
            return result
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def truncate(self, size=-1):
        """Resize the file behind ``self._fileno`` to ``size`` bytes.

        The default ``size`` of -1 selects the current position reported by
        ``self.tell()``.  An ``OSError`` from ``ftruncate`` is re-raised as
        an ``IOError`` with the same args.  On success the position is moved
        to the new size and the value returned by ``ftruncate`` is handed
        back.
        """
        new_length = self.tell() if size == -1 else size
        try:
            result = _original_os.ftruncate(self._fileno, new_length)
        except OSError as err:
            raise IOError(*err.args)
        # Seeking both repositions the stream and clears its buffer.
        self.seek(new_length)
        return result
项目:shadowsocksrh    作者:hhhizzz    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Lock ``pid_file`` exclusively and record ``pid`` in it.

    The file is opened read/write (created if missing, requesting owner
    read/write mode bits) and its descriptor is flagged close-on-exec.  A
    non-blocking exclusive ``lockf`` lock is then requested.  If that raises
    ``IOError``, up to 32 bytes are read from the file and logged as the pid
    of the instance that is already started, and the descriptor is closed.
    Otherwise the file is truncated to zero length and ``str(pid)`` is
    written to it.

    Returns 0 on success, or -1 when the file cannot be opened or the lock
    cannot be taken.  The descriptor is not closed on the success path
    (NOTE(review): presumably so the lock stays held while the process
    runs -- confirm against the caller).
    """
    import fcntl
    import stat

    open_flags = os.O_RDWR | os.O_CREAT
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    try:
        fd = os.open(pid_file, open_flags, owner_rw)
    except OSError as e:
        shell.print_exception(e)
        return -1

    # Mark the descriptor close-on-exec, keeping its other descriptor flags.
    fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert fd_flags != -1
    rc = fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert rc != -1

    # fcntl.fcntl offers no platform independent way to issue
    # fcntl(fd, F_SETLK, &fl), so the lock is taken with lockf instead.
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        holder = os.read(fd, 32)
        if not holder:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(holder))
        os.close(fd)
        return -1

    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
项目:ShadowSocksShare-OpenShift    作者:the0demiurge    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Lock ``pid_file`` exclusively and record ``pid`` in it.

    The file is opened read/write (created if missing, requesting owner
    read/write mode bits) and its descriptor is flagged close-on-exec.  A
    non-blocking exclusive ``lockf`` lock is then requested.  If that raises
    ``IOError``, up to 32 bytes are read from the file and logged as the pid
    of the instance that is already started, and the descriptor is closed.
    Otherwise the file is truncated to zero length and ``str(pid)`` is
    written to it.

    Returns 0 on success, or -1 when the file cannot be opened or the lock
    cannot be taken.  The descriptor is not closed on the success path
    (NOTE(review): presumably so the lock stays held while the process
    runs -- confirm against the caller).
    """
    import fcntl
    import stat

    open_flags = os.O_RDWR | os.O_CREAT
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    try:
        fd = os.open(pid_file, open_flags, owner_rw)
    except OSError as e:
        shell.print_exception(e)
        return -1

    # Mark the descriptor close-on-exec, keeping its other descriptor flags.
    fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert fd_flags != -1
    rc = fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert rc != -1

    # fcntl.fcntl offers no platform independent way to issue
    # fcntl(fd, F_SETLK, &fl), so the lock is taken with lockf instead.
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        holder = os.read(fd, 32)
        if not holder:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(holder))
        os.close(fd)
        return -1

    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
项目:ssr-ml    作者:AlphaBrock    | 项目源码 | 文件源码
def write_pid_file(pid_file, pid):
    """Lock ``pid_file`` exclusively and record ``pid`` in it.

    The file is opened read/write (created if missing, requesting owner
    read/write mode bits) and its descriptor is flagged close-on-exec.  A
    non-blocking exclusive ``lockf`` lock is then requested.  If that raises
    ``IOError``, up to 32 bytes are read from the file and logged as the pid
    of the instance that is already started, and the descriptor is closed.
    Otherwise the file is truncated to zero length and ``str(pid)`` is
    written to it.

    Returns 0 on success, or -1 when the file cannot be opened or the lock
    cannot be taken.  The descriptor is not closed on the success path
    (NOTE(review): presumably so the lock stays held while the process
    runs -- confirm against the caller).
    """
    import fcntl
    import stat

    open_flags = os.O_RDWR | os.O_CREAT
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    try:
        fd = os.open(pid_file, open_flags, owner_rw)
    except OSError as e:
        shell.print_exception(e)
        return -1

    # Mark the descriptor close-on-exec, keeping its other descriptor flags.
    fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert fd_flags != -1
    rc = fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    assert rc != -1

    # fcntl.fcntl offers no platform independent way to issue
    # fcntl(fd, F_SETLK, &fl), so the lock is taken with lockf instead.
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        holder = os.read(fd, 32)
        if not holder:
            logging.error('already started')
        else:
            logging.error('already started at pid %s' % common.to_str(holder))
        os.close(fd)
        return -1

    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def __init__(self, args, lfs_globals):
        """Open (creating if necessary) the file named by ``args.shadow_file``.

        The directory part of ``args.shadow_file`` must exist and accept new
        files.  An existing regular file is opened read/write as-is; a
        missing one is created and extended to
        ``lfs_globals['nvm_bytes_total']`` bytes.  The descriptor is stored
        in ``self._shadow_fd``.  ``args.aperture_base``,
        ``args.aperture_size`` and ``args.addr_mode`` are then filled in and
        the superclass initializer is invoked with the updated ``args``.

        Raises:
            AssertionError: the directory does not exist, the file mode does
                not contain all bits of ``self._S_IFREG_URW``, or the file
                is smaller than ``lfs_globals['nvm_bytes_total']``.
            RuntimeError: a temporary file cannot be created in the
                directory.
        """
        # NOTE(review): the superclass initializer runs here and again at the
        # end, after args has been populated.  The first call looks redundant
        # but is kept because its side effects are not visible from here.
        super().__init__(args, lfs_globals)
        head = os.path.dirname(args.shadow_file)
        assert os.path.isdir(head), 'No such directory %s' % head

        # Probe writability by actually creating (and discarding) a file.
        try:
            probe = tempfile.TemporaryFile(dir=head)
            probe.close()
        except OSError as e:
            # Keep the underlying OS error attached as the cause.
            raise RuntimeError('%s is not writeable' % head) from e

        if os.path.isfile(args.shadow_file):
            fd = os.open(args.shadow_file, os.O_RDWR)
        else:
            fd = os.open(args.shadow_file, os.O_RDWR | os.O_CREAT)
            os.ftruncate(fd, lfs_globals['nvm_bytes_total'])
        self._shadow_fd = fd

        # Compare node requirements to actual file size
        statinfo = os.stat(args.shadow_file)
        # The original message was never formatted, so it printed a literal
        # '%s'; interpolate the offending path.
        assert self._S_IFREG_URW == self._S_IFREG_URW & statinfo.st_mode, \
            '%s is not RW' % args.shadow_file
        assert statinfo.st_size >= lfs_globals['nvm_bytes_total'], \
            '%s is %d bytes, need at least %d' % (
                args.shadow_file, statinfo.st_size,
                lfs_globals['nvm_bytes_total'])
        args.aperture_base = 0
        args.aperture_size = statinfo.st_size
        args.addr_mode = self._MODE_FALLBACK
        super().__init__(args, lfs_globals)

    # open(), create(), release() only do caching as handled by superclass
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def _create_files(self, filename):
        """Open the ``.part`` file for ``filename`` and lock the segments file.

        The parent directory of ``filename`` is created via ``mkdir_p`` and
        ``<filename>.part`` is opened write-only (created if missing, mode
        0o600, non-blocking); its path and descriptor are stored in
        ``self._part_filename`` and ``self._part_fd``.  Then
        ``self._segments_file.lock()`` is called.

        Returns True when the lock call succeeds.  If it raises ``IOError``
        the part descriptor is closed and both attributes are reset; for
        ``EAGAIN`` a completion watch is started on ``filename`` and False is
        returned, any other ``IOError`` is re-raised.
        """
        # XXX this is racy
        assert filename
        logger.debug('Opening files for ‘%s’', filename)

        mkdir_p(os.path.dirname(filename))
        self._part_filename = '%s.part' % (filename, )
        open_flags = os.O_CREAT | os.O_WRONLY | os.O_NONBLOCK
        self._part_fd = os.open(self._part_filename, open_flags, 0o600)

        try:
            self._segments_file.lock()
        except IOError as e:
            # Clean up.
            os.close(self._part_fd)
            self._part_fd = -1
            self._part_filename = None

            if e.errno != errno.EAGAIN:
                raise

            # Cannot acquire lock: some other process (or part of this
            # process) is already downloading it. Watch that file for
            # completion instead.
            logger.debug('File ‘%s.sgt’ is locked: waiting on completion.',
                         filename)
            self._watch_for_completion(filename)
            return False

        # XXX hack
        return True

        # Everything below is unreachable because of the early return above;
        # it is kept as the disabled preallocation step.
        #
        # Reserve space for the full file and truncate any existing content to
        # the start of the final chunk (because it might be smaller than the
        # chunk size).
        reserved = self.chunk_size * (self._num_segments - 1)
        try:
            fallocate.fallocate(self._part_fd, 0, reserved)
        except IOError as e:  # if it fails, we might get surprises later, but it's ok.
            logger.debug('Error calling fallocate(%u, 0, %u): %s' %
                         (self._part_fd, self._size, e.message))
        try:
            os.ftruncate(self._part_fd, reserved)
        except IOError as e:
            logger.debug('Error calling ftruncate(%u, %u): %s' %
                         (self._part_fd, reserved, e.message))

        return True