Python shutil 模块,copymode() 实例源码

我们从 Python 开源项目中提取了以下 38 个代码示例,用于说明如何使用 shutil.copymode()。

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write the refactored text to *filename*, keeping a backup if enabled.

        Unless ``self.nobackups`` is set, the current file is renamed to
        ``filename + ".bak"`` (an existing backup is removed first).  Failures
        while removing or renaming are only logged through
        ``self.log_message``; they do not abort the write.  After the parent
        class has written the new content, the permission bits of the backup
        are copied onto the new file.

        All four arguments are forwarded unchanged to the parent class's
        ``write_file``.
        """
        if not self.nobackups:
            # Make backup
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                # os.error is an alias of OSError; the bound exception was
                # never used, so no name is bound here.  This spelling is
                # valid on both Python 2 and Python 3.
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Actually write the new file
        write = super(StdoutRefactoringTool, self).write_file
        write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            # Restore the original permission bits on the rewritten file.
            shutil.copymode(backup, filename)
项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def copymode(src, dst):
    """
    Copy the permission bits of src onto dst via shutil.copymode and report
    the outcome instead of raising.
    :Arguments:
        src - path of the file whose mode is read
        dst - path of the file that receives the mode
    :Return:
        True/False - True when the copy (and its info message) completed,
        False when any exception was raised along the way
    """
    try:
        shutil.copymode(src, dst)
        success_note = "mode of src {} copied to dst {} successfully".format(
            src, dst)
        print_info(success_note)
    except Exception as e:
        # Any failure is reported through print_error and turned into False.
        failure_note = (
            "copying file mode from {} to file {} raised exception {}".format(
                src, dst, str(e)))
        print_error(failure_note)
        return False
    return True
项目:cx_Freeze    作者:anthony-tuininga    | 项目源码 | 文件源码
def _CopyFile(self, source, target, copyDependentFiles,
            includeMode = False):
        """Copy *source* to *target* once, optionally with its dependent files.

        Nothing happens when the normalized target is already recorded in
        ``self.filesCopied`` or when source and target normalize to the same
        path.  Otherwise the target is removed, its directory is created, the
        content and stat information are copied, and the target is recorded.
        When *includeMode* is true the permission bits are copied as well.

        When *copyDependentFiles* is true and *source* is not listed in
        ``self.finder.excludeDependentFiles``, every file returned by
        ``self._GetDependentFiles(source)`` is copied next to the target
        through a recursive call (without *includeMode*).
        """
        sourceKey = os.path.normcase(os.path.normpath(source))
        targetKey = os.path.normcase(os.path.normpath(target))
        if targetKey in self.filesCopied or sourceKey == targetKey:
            return

        self._RemoveFile(target)
        targetDir = os.path.dirname(target)
        self._CreateDirectory(targetDir)
        if not self.silent:
            sys.stdout.write("copying %s -> %s\n" % (source, target))
        shutil.copyfile(source, target)
        shutil.copystat(source, target)
        if includeMode:
            shutil.copymode(source, target)
        # Only the key matters; it marks the target as already copied.
        self.filesCopied[targetKey] = None

        if not copyDependentFiles:
            return
        if source in self.finder.excludeDependentFiles:
            return
        for dependency in self._GetDependentFiles(source):
            dependencyTarget = os.path.join(targetDir,
                                            os.path.basename(dependency))
            self._CopyFile(dependency, dependencyTarget, copyDependentFiles)
项目:nitro    作者:KVM-VMI    | 项目源码 | 文件源码
def add_file(self, path):
        """Copy the file at *path* into ``self.cdrom_dir`` under the same name.

        The content is copied first, then the permission bits of the source
        are applied to the copy.
        """
        origin = Path(path)
        origin_str = str(origin)
        copy_path = os.path.join(self.cdrom_dir, origin.name)
        shutil.copyfile(origin_str, copy_path)
        shutil.copymode(origin_str, copy_path)
项目:jumpscale_portal    作者:jumpscale7    | 项目源码 | 文件源码
def __copy(self, src, dst):
        """Internal copy procedure.

        Copy the file or directory *src* to *dst*, recursing into directories.
        The copy is refused when reading *src* or writing the directory that
        contains *dst* is not allowed, or when *dst* already exists.

        Returns True on success.  On any failure the problem is recorded
        through ``self.__errorData`` and False is returned.
        """
        dstDir = os.path.dirname(dst)
        if not self.__isAllowed(src, 'read'):
            self.__errorData(src, 'Access denied')
            return False
        if not self.__isAllowed(dstDir, 'write'):
            self.__errorData(dstDir, 'Access denied')
            return False
        if os.path.exists(dst):
            self.__errorData(dst, 'File or folder with the same name already exists')
            return False

        if not os.path.isdir(src):
            try:
                shutil.copyfile(src, dst)
                shutil.copymode(src, dst)
                return True
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt / SystemExit are not silently swallowed.
            except Exception:
                self.__errorData(src, 'Unable to copy files')
                return False
        else:
            try:
                os.mkdir(dst)
                shutil.copymode(src, dst)
            except Exception:
                self.__errorData(src, 'Unable to copy files')
                return False

            # Copy the directory content entry by entry; stop at the first
            # failure.
            for i in os.listdir(src):
                newSrc = os.path.join(src, i)
                newDst = os.path.join(dst, i)
                if not self.__copy(newSrc, newDst):
                    self.__errorData(newSrc, 'Unable to copy files')
                    return False

        return True
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory; a
        ValueError is raised if *filename* does not start with that base.
        ``self._append_suffix`` (if any) is appended to the resulting name.
        When the final name differs from the one passed in, its directory is
        created if missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.
        """
        source_name = filename
        if self._output_dir:
            if not filename.startswith(self._input_base_dir):
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
            relative_part = filename[len(self._input_base_dir):]
            filename = os.path.join(self._output_dir, relative_part)
        if self._append_suffix:
            filename += self._append_suffix
        redirected = source_name != filename
        if redirected:
            target_dir = os.path.dirname(filename)
            if not os.path.isdir(target_dir):
                os.makedirs(target_dir)
            self.log_message('Writing converted %s to %s.', source_name,
                             filename)
        if not self.nobackups:
            # Move the current file out of the way as "<name>.bak".
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                except os.error:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except os.error:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Delegate the actual write to the parent class.
        parent_write = super(StdoutRefactoringTool, self).write_file
        parent_write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if redirected:
            # Carry the source file's mode over to the redirected output.
            shutil.copymode(source_name, filename)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory.
        ``self._append_suffix`` (if any) is then appended.  When the final
        name differs from the one passed in, its directory is created if
        missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.

        Raises:
            ValueError: *filename* does not start with
                ``self._input_base_dir`` while an output directory is set.
        """
        orig_filename = filename
        if self._output_dir:
            if filename.startswith(self._input_base_dir):
                filename = os.path.join(self._output_dir,
                                        filename[len(self._input_base_dir):])
            else:
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
        if self._append_suffix:
            filename += self._append_suffix
        if orig_filename != filename:
            output_dir = os.path.dirname(filename)
            if not os.path.isdir(output_dir):
                os.makedirs(output_dir)
            self.log_message('Writing converted %s to %s.', orig_filename,
                             filename)
        if not self.nobackups:
            # Make backup
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                # os.error is an alias of OSError; the exception object was
                # never used, so it is not bound.  Valid on Python 2 and 3.
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Actually write the new file
        write = super(StdoutRefactoringTool, self).write_file
        write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if orig_filename != filename:
            # Preserve the file mode in the new output directory.
            shutil.copymode(orig_filename, filename)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory.
        ``self._append_suffix`` (if any) is then appended.  When the final
        name differs from the one passed in, its directory is created if
        missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.

        Raises:
            ValueError: *filename* does not start with
                ``self._input_base_dir`` while an output directory is set.
        """
        orig_filename = filename
        if self._output_dir:
            if filename.startswith(self._input_base_dir):
                filename = os.path.join(self._output_dir,
                                        filename[len(self._input_base_dir):])
            else:
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
        if self._append_suffix:
            filename += self._append_suffix
        if orig_filename != filename:
            output_dir = os.path.dirname(filename)
            if not os.path.isdir(output_dir):
                os.makedirs(output_dir)
            self.log_message('Writing converted %s to %s.', orig_filename,
                             filename)
        if not self.nobackups:
            # Make backup
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                # os.error is an alias of OSError; the exception object was
                # never used, so it is not bound.  Valid on Python 2 and 3.
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Actually write the new file
        write = super(StdoutRefactoringTool, self).write_file
        write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if orig_filename != filename:
            # Preserve the file mode in the new output directory.
            shutil.copymode(orig_filename, filename)
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory.
        ``self._append_suffix`` (if any) is then appended.  When the final
        name differs from the one passed in, its directory is created if
        missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.

        Raises:
            ValueError: *filename* does not start with
                ``self._input_base_dir`` while an output directory is set.
        """
        orig_filename = filename
        if self._output_dir:
            if filename.startswith(self._input_base_dir):
                filename = os.path.join(self._output_dir,
                                        filename[len(self._input_base_dir):])
            else:
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
        if self._append_suffix:
            filename += self._append_suffix
        if orig_filename != filename:
            output_dir = os.path.dirname(filename)
            if not os.path.isdir(output_dir):
                os.makedirs(output_dir)
            self.log_message('Writing converted %s to %s.', orig_filename,
                             filename)
        if not self.nobackups:
            # Make backup
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                # os.error is an alias of OSError; the exception object was
                # never used, so it is not bound.  Valid on Python 2 and 3.
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Actually write the new file
        write = super(StdoutRefactoringTool, self).write_file
        write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if orig_filename != filename:
            # Preserve the file mode in the new output directory.
            shutil.copymode(orig_filename, filename)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_copymode_follow_symlinks(self):
        """Check that shutil.copymode() follows symlinks by default.

        Two regular files and a symlink to each are created in a temporary
        directory.  The mode of ``src`` must end up on ``dst`` whether the
        arguments are the files themselves, a link to the source, a link to
        the destination, or links on both sides.
        """
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name != 'nt':
            # follow src link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow dst link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow both links: the destination must be the link as well,
            # otherwise this step only repeats the "follow src link" case.
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_copymode_symlink_to_symlink(self):
        """Exercise shutil.copymode() with follow_symlinks=False.

        With a symlink on both sides the link modes must match afterwards
        while the target files keep different modes.  With a symlink on only
        one side, the modes of the underlying files must end up equal.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # first pair: only src is a link; second pair: only dst is a link
        # (both cases are expected to go through plain chmod)
        for source, target in ((src_link, dst), (src, dst_link)):
            os.lchmod(dst_link, stat.S_IRWXO)
            shutil.copymode(source, target, follow_symlinks=False)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_copymode_symlink_to_symlink_wo_lchmod(self):
        """Call copymode() link-to-link with follow_symlinks=False.

        The test makes no assertions; it only requires that the call
        completes without raising.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        for regular_file in (src, dst):
            write_file(regular_file, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
项目:sublimeTextConfig    作者:luoye-fe    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory; a
        ValueError is raised if *filename* does not start with that base.
        ``self._append_suffix`` (if any) is appended to the resulting name.
        When the final name differs from the one passed in, its directory is
        created if missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.
        """
        source_name = filename
        if self._output_dir:
            if not filename.startswith(self._input_base_dir):
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
            relative_part = filename[len(self._input_base_dir):]
            filename = os.path.join(self._output_dir, relative_part)
        if self._append_suffix:
            filename += self._append_suffix
        redirected = source_name != filename
        if redirected:
            target_dir = os.path.dirname(filename)
            if not os.path.isdir(target_dir):
                os.makedirs(target_dir)
            self.log_message('Writing converted %s to %s.', source_name,
                             filename)
        if not self.nobackups:
            # Move the current file out of the way as "<name>.bak".
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                except os.error:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except os.error:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Delegate the actual write to the parent class.
        parent_write = super(StdoutRefactoringTool, self).write_file
        parent_write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if redirected:
            # Carry the source file's mode over to the redirected output.
            shutil.copymode(source_name, filename)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory.
        ``self._append_suffix`` (if any) is then appended.  When the final
        name differs from the one passed in, its directory is created if
        missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.

        Raises:
            ValueError: *filename* does not start with
                ``self._input_base_dir`` while an output directory is set.
        """
        orig_filename = filename
        if self._output_dir:
            if filename.startswith(self._input_base_dir):
                filename = os.path.join(self._output_dir,
                                        filename[len(self._input_base_dir):])
            else:
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
        if self._append_suffix:
            filename += self._append_suffix
        if orig_filename != filename:
            output_dir = os.path.dirname(filename)
            if not os.path.isdir(output_dir):
                os.makedirs(output_dir)
            self.log_message('Writing converted %s to %s.', orig_filename,
                             filename)
        if not self.nobackups:
            # Make backup
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                # os.error is an alias of OSError; the exception object was
                # never used, so it is not bound.  Valid on Python 2 and 3.
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Actually write the new file
        write = super(StdoutRefactoringTool, self).write_file
        write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if orig_filename != filename:
            # Preserve the file mode in the new output directory.
            shutil.copymode(orig_filename, filename)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_copymode_follow_symlinks(self):
        """Check that shutil.copymode() follows symlinks by default.

        After the plain file-to-file case, every combination involving a
        symlink (source link, destination link, both) must leave ``dst`` with
        the mode of ``src``.  The symlink cases are skipped on Windows.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name == 'nt':
            return
        # in order: follow src link, follow dst link, follow both links
        link_cases = ((src_link, dst), (src, dst_link), (src_link, dst_link))
        for source, target in link_cases:
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(source, target)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_copymode_symlink_to_symlink(self):
        """Exercise shutil.copymode() with follow_symlinks=False.

        With a symlink on both sides the link modes must match afterwards
        while the target files keep different modes.  With a symlink on only
        one side, the modes of the underlying files must end up equal.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # first pair: only src is a link; second pair: only dst is a link
        # (both cases are expected to go through plain chmod)
        for source, target in ((src_link, dst), (src, dst_link)):
            os.lchmod(dst_link, stat.S_IRWXO)
            shutil.copymode(source, target, follow_symlinks=False)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_copymode_symlink_to_symlink_wo_lchmod(self):
        """Call copymode() link-to-link with follow_symlinks=False.

        The test makes no assertions; it only requires that the call
        completes without raising.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        for regular_file in (src, dst):
            write_file(regular_file, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_module_all_attribute(self):
        """Verify that shutil.__all__ exists and lists exactly the expected API.

        ``disk_usage`` is expected only when ``os.statvfs`` is available or
        the platform is Windows.
        """
        self.assertTrue(hasattr(shutil, '__all__'))
        expected_names = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        has_disk_usage = hasattr(os, 'statvfs') or os.name == 'nt'
        if has_disk_usage:
            expected_names.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected_names)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory; a
        ValueError is raised if *filename* does not start with that base.
        ``self._append_suffix`` (if any) is appended to the resulting name.
        When the final name differs from the one passed in, its directory is
        created if missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.
        """
        source_name = filename
        if self._output_dir:
            if not filename.startswith(self._input_base_dir):
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
            relative_part = filename[len(self._input_base_dir):]
            filename = os.path.join(self._output_dir, relative_part)
        if self._append_suffix:
            filename += self._append_suffix
        redirected = source_name != filename
        if redirected:
            target_dir = os.path.dirname(filename)
            if not os.path.isdir(target_dir):
                os.makedirs(target_dir)
            self.log_message('Writing converted %s to %s.', source_name,
                             filename)
        if not self.nobackups:
            # Move the current file out of the way as "<name>.bak".
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Delegate the actual write to the parent class.
        parent_write = super(StdoutRefactoringTool, self).write_file
        parent_write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if redirected:
            # Carry the source file's mode over to the redirected output.
            shutil.copymode(source_name, filename)
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write converted text, honouring output-dir, suffix and backup options.

        When ``self._output_dir`` is set, the part of *filename* after
        ``self._input_base_dir`` is joined onto the output directory.
        ``self._append_suffix`` (if any) is then appended.  When the final
        name differs from the one passed in, its directory is created if
        missing and a message is logged.

        Unless ``self.nobackups`` is set, the existing file is renamed to
        ``<name>.bak`` before writing (failures are only logged) and the
        backup's permission bits are copied to the new file afterwards.

        Raises:
            ValueError: *filename* does not start with
                ``self._input_base_dir`` while an output directory is set.
        """
        orig_filename = filename
        if self._output_dir:
            if filename.startswith(self._input_base_dir):
                filename = os.path.join(self._output_dir,
                                        filename[len(self._input_base_dir):])
            else:
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (
                                         filename, self._input_base_dir))
        if self._append_suffix:
            filename += self._append_suffix
        if orig_filename != filename:
            output_dir = os.path.dirname(filename)
            if not os.path.isdir(output_dir):
                os.makedirs(output_dir)
            self.log_message('Writing converted %s to %s.', orig_filename,
                             filename)
        if not self.nobackups:
            # Make backup
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                # os.error is an alias of OSError; the exception object was
                # never used, so it is not bound.  Valid on Python 2 and 3.
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)
        # Actually write the new file
        write = super(StdoutRefactoringTool, self).write_file
        write(new_text, filename, old_text, encoding)
        if not self.nobackups:
            shutil.copymode(backup, filename)
        if orig_filename != filename:
            # Preserve the file mode in the new output directory.
            shutil.copymode(orig_filename, filename)
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def updateFile(install_path, diff_path, diff_list, conf_file_list, user, group, pkg_type):
    """Apply a list of file differences from diff_path onto install_path.

    Each entry of diff_list is a mapping with a 'file' path (leading slashes
    are stripped) and an 'op' code:

        "M" - copy the file over the installed one and chown it
        "A" - record the path in public_file_list, create the parent
              directory if needed, then create the directory or copy the
              file, and chown the result
        "X" - copy only the permission bits
        "D" - remove the installed file, link or directory tree, unless the
              path is listed in public_file_list

    Entries are skipped when the package type is 'conf' and the file is
    "package.conf.yaml", or when the package type is not 'conf' and the real
    path of the destination appears in conf_file_list.

    user and group are resolved to numeric ids for the chown calls.
    Always returns the tuple (0, 'ok').
    """
    uid = pwd.getpwnam(user).pw_uid
    gid = grp.getgrnam(group).gr_gid
    for entry in diff_list:
        rel_path = entry['file'].lstrip('/')
        if pkg_type == 'conf' and rel_path == "package.conf.yaml":
            continue
        src = os.path.join(diff_path, rel_path)
        dst = os.path.join(install_path, rel_path)
        if pkg_type != 'conf' and os.path.realpath(dst) in conf_file_list:
            continue
        operation = entry['op']
        if operation == "M":
            shutil.copy2(src, dst)
            os.chown(dst, uid, gid)
        elif operation == "A":
            public_file_list.append(rel_path)
            parent_dir = os.path.dirname(dst)
            if not os.path.isdir(parent_dir):
                mkdir(parent_dir)
                os.chown(parent_dir, uid, gid)
            if os.path.isdir(src):
                mkdir(dst)
            else:
                shutil.copy2(src, dst)
            # Ownership is set the same way for a new directory or file.
            os.chown(dst, uid, gid)
        elif operation == "X":
            shutil.copymode(src, dst)
        elif operation == "D":
            if rel_path in public_file_list:
                continue
            if os.path.isfile(dst):
                os.remove(dst)
            elif os.path.isdir(dst):
                # A link to a directory is unlinked, a real one is deleted
                # recursively.
                if os.path.islink(dst):
                    os.remove(dst)
                else:
                    shutil.rmtree(dst)
    return 0, 'ok'
项目:greptile    作者:ncornette    | 项目源码 | 文件源码
def sed_i(files, expr, replace_exp, only_first_occurrence=False):
    """
    Search and replace matching lines in place, across many files.

    Comparable to:

    sed -i "s/expr/replace_expr/g" files...

    Each file is rewritten into a sibling ``.pygrep.tmp`` file, which receives
    the permission bits of the original and then takes its place; the
    original is moved aside as ``.pygrep.ori`` and deleted at the end.

    :type files: enumerate or list
    :param files: file names generator
    :type expr: str or pattern
    :type replace_exp: str
    :param only_first_occurrence: replace only first occurrence per line
    """
    pattern = _compiled_re(expr)
    for path in files:
        with open(path, 'r') as source:
            scratch_path = path + '.pygrep.tmp'
            with open(scratch_path, 'w') as dest:
                sed(source, pattern, replace_exp, dest, only_first_occurrence)

        # Keep the original permissions on the rewritten file.
        shutil.copymode(path, scratch_path)
        aside_path = path + '.pygrep.ori'
        os.rename(path, aside_path)
        os.rename(scratch_path, path)
        os.remove(aside_path)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_copymode_follow_symlinks(self):
        """Check that shutil.copymode() follows symlinks by default.

        After the plain file-to-file case, every combination involving a
        symlink (source link, destination link, both) must leave ``dst`` with
        the mode of ``src``.  The symlink cases are skipped on Windows.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name == 'nt':
            return
        # in order: follow src link, follow dst link, follow both links
        link_cases = ((src_link, dst), (src, dst_link), (src_link, dst_link))
        for source, target in link_cases:
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(source, target)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_copymode_symlink_to_symlink(self):
        """Exercise shutil.copymode() with follow_symlinks=False.

        With a symlink on both sides the link modes must match afterwards
        while the target files keep different modes.  With a symlink on only
        one side, the modes of the underlying files must end up equal.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # first pair: only src is a link; second pair: only dst is a link
        # (both cases are expected to go through plain chmod)
        for source, target in ((src_link, dst), (src, dst_link)):
            os.lchmod(dst_link, stat.S_IRWXO)
            shutil.copymode(source, target, follow_symlinks=False)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_copymode_symlink_to_symlink_wo_lchmod(self):
        """Call copymode() link-to-link with follow_symlinks=False.

        The test makes no assertions; it only requires that the call
        completes without raising.
        """
        base = self.mkdtemp()
        src, dst, src_link, dst_link = [
            os.path.join(base, name) for name in ('foo', 'bar', 'baz', 'quux')]
        for regular_file in (src, dst):
            write_file(regular_file, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def write_file(self, new_text, filename, old_text, encoding):
        """Write new_text for filename, honouring output-dir, suffix and backups.

        When self._output_dir is set, the self._input_base_dir prefix of
        filename is replaced by the output directory (ValueError if the
        prefix is absent).  self._append_suffix, when set, is appended to
        the resulting name.  Unless self.nobackups is true, any existing
        file at the final name is renamed to "<name>.bak" before writing
        and its permission bits are copied back onto the new file.  If the
        final name differs from the one passed in, the original file's
        permission bits are copied onto it last.
        """
        source_name = filename
        if self._output_dir:
            base = self._input_base_dir
            if not filename.startswith(base):
                raise ValueError('filename %s does not start with the '
                                 'input_base_dir %s' % (filename, base))
            filename = os.path.join(self._output_dir, filename[len(base):])
        if self._append_suffix:
            filename += self._append_suffix

        relocated = source_name != filename
        if relocated:
            parent = os.path.dirname(filename)
            if not os.path.isdir(parent):
                os.makedirs(parent)
            self.log_message('Writing converted %s to %s.', source_name,
                             filename)

        keep_backup = not self.nobackups
        if keep_backup:
            # Move the current file out of the way, replacing a stale backup.
            backup = filename + ".bak"
            if os.path.lexists(backup):
                try:
                    os.remove(backup)
                except OSError:
                    self.log_message("Can't remove backup %s", backup)
            try:
                os.rename(filename, backup)
            except OSError:
                self.log_message("Can't rename %s to %s", filename, backup)

        # Delegate the actual write to the parent class.
        super(StdoutRefactoringTool, self).write_file(
            new_text, filename, old_text, encoding)

        if keep_backup:
            shutil.copymode(backup, filename)
        if relocated:
            # Preserve the file mode in the new output directory.
            shutil.copymode(source_name, filename)
项目:py    作者:pytest-dev    | 项目源码 | 文件源码
def copy(self, target, mode=False, stat=False):
        """ Copy this path to target.

            A regular file is copied onto target, or into target when
            target is an existing directory.  Anything else is walked with
            self.visit() (not descending into symlinks) and each entry is
            recreated below target; symlinks are re-created as symlinks
            and get neither mode nor stat treatment.

            If mode is True, permissions are copied from source to copy.
            If stat is True, copystat() is additionally applied from
            source to copy.
        """
        if not self.check(file=1):
            def descend(candidate):
                # Only recurse into entries that are not symlinks.
                return candidate.check(link=0)

            for entry in self.visit(rec=descend):
                dest = target.join(entry.relto(self))
                dest.dirpath().ensure(dir=1)
                if entry.check(link=1):
                    dest.mksymlinkto(entry.readlink())
                    continue
                if entry.check(file=1):
                    copychunked(entry, dest)
                elif entry.check(dir=1):
                    dest.ensure(dir=1)
                if mode:
                    copymode(entry.strpath, dest.strpath)
                if stat:
                    copystat(entry, dest)
            return

        # Single-file case.
        if target.check(dir=1):
            target = target.join(self.basename)
        assert self!=target
        copychunked(self, target)
        if mode:
            copymode(self.strpath, target.strpath)
        if stat:
            copystat(self, target)
项目:py    作者:pytest-dev    | 项目源码 | 文件源码
def copymode(src, dest):
    """ Copy the permission bits of src onto dest via shutil.copymode. """
    from shutil import copymode as _shutil_copymode
    _shutil_copymode(src, dest)
项目:mcconf    作者:ManyThreads    | 项目源码 | 文件源码
def install(self, tgtdir, tmplenv):
        """Install self.srcfile as self.dstfile below tgtdir.

        How the file is installed depends on self.installMode:
        'link' creates a relative symlink, 'hardlink' a hard link,
        'cinclude' writes a one-line C #include stub, 'mako' renders the
        source as a mako template with tmplenv as context and copies the
        permission bits, and any other value copies the file with
        shutil.copy2.  Missing parent directories are created and an
        existing target is removed first.  A missing source is only
        logged as a warning; installation is still attempted.
        """
        srcfile = os.path.abspath(self.srcfile)
        tgtfile = os.path.abspath(os.path.join(tgtdir, self.dstfile))
        tgtparent = os.path.dirname(tgtfile)
        how = self.installMode

        logging.debug('installing file %s to %s mode %s from module %s',
                      srcfile, tgtfile, how, self.module)
        if not os.path.isfile(srcfile):
            logging.warning('file %s is missing or not regular file, provided by %s from %s',
                            self.srcfile, self.module, self.module.modulefile)

        if not os.path.exists(tgtparent):
            os.makedirs(tgtparent)
        # TODO ideally we should never overwrite files, reconfig run should delete previously installed files
        if os.path.lexists(tgtfile):
            os.unlink(tgtfile)

        if how == 'link':
            os.symlink(os.path.relpath(srcfile, tgtparent), tgtfile)
        elif how == 'hardlink':
            os.link(srcfile, tgtfile)
        elif how == 'cinclude':
            with open(tgtfile, 'w') as out:
                # NOTE(review): the path is taken relative to tgtfile itself,
                # not to its directory as in the 'link' branch - confirm
                # this is intended.
                out.write('#include "%s"\n' % os.path.relpath(srcfile, tgtfile))
        elif how == 'mako':
            with open(tgtfile, 'w') as out:
                template = mako.template.Template(filename=self.srcfile,
                                                  imports=['import os'])
                template.render_context(mako.runtime.Context(out, **tmplenv))
            shutil.copymode(srcfile, tgtfile)
        else:
            # Plain copy, including metadata.
            shutil.copy2(srcfile, tgtfile)
项目:v8-nuget    作者:pmed    | 项目源码 | 文件源码
def write_hunks(self, srcname, tgtname, hunks):
    """Write the patched version of srcname to tgtname.

    The source is streamed through self.patch_stream() together with
    hunks and the resulting lines are written to the target, after which
    the source's permission bits are copied onto the target.

    Both files are opened in binary mode and are closed even when
    patching or writing raises.  Returns True on completion.
    """
    with open(srcname, "rb") as src:
        with open(tgtname, "wb") as tgt:
            debug("processing target file %s" % tgtname)
            tgt.writelines(self.patch_stream(src, hunks))

    # [ ] TODO: add test for permission copy
    shutil.copymode(srcname, tgtname)
    return True
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def copydir(args, sourcedir, names):
    """Mirror the entries of one source directory below a target directory.

    args is the tuple (targetdir, replacements, uid, gid); names lists
    the entries of sourcedir.  Entries already present at the destination
    are left alone.  Files ending in ".in" are processed with copyin()
    and written without that extension; other files are copied verbatim
    together with their permission bits; sub-directories are created with
    the source's permission bits.  When uid is not None, ownership of
    everything created is set to uid/gid.
    """
    targetdir, replacements, uid, gid = args
    for name in list(names):
        if os.path.normcase(name) in CVS_DIRS:
            # Don't recurse into CVS directories: presumably the caller is a
            # directory walker that honours in-place edits of names.
            names.remove(name)
            continue

        src = os.path.join(sourcedir, name)
        if not os.path.isfile(src):
            # Directory:
            dn = os.path.join(targetdir, sourcedir, name)
            if not os.path.exists(dn):
                os.mkdir(dn)
                shutil.copymode(src, dn)
                if uid is not None:
                    os.chown(dn, uid, gid)
            continue

        # Copy the file:
        sn, ext = os.path.splitext(name)
        is_template = os.path.normcase(ext) == ".in"
        if is_template:
            dst = os.path.join(targetdir, sourcedir, sn)
        else:
            dst = os.path.join(targetdir, src)
        if os.path.exists(dst):
            continue
        if is_template:
            copyin(src, dst, replacements, uid, gid)
        else:
            shutil.copyfile(src, dst)
            shutil.copymode(src, dst)
        if uid is not None:
            os.chown(dst, uid, gid)
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def copyin(src, dst, replacements, uid, gid):
    """Copy text file src to dst, expanding "<<key>>" placeholders.

    Every occurrence of "<<key>>" in the text is replaced by
    replacements[key] for each key in replacements; placeholders without
    a matching key are left untouched.  The permission bits of src are
    copied onto dst and, when uid is not None, dst is chown'ed to
    uid/gid.

    Both files are closed even if reading or writing raises.
    """
    with open(src) as ifp:
        text = ifp.read()
    for key, value in replacements.items():
        text = text.replace("<<%s>>" % key, value)
    with open(dst, "w") as ofp:
        ofp.write(text)
    shutil.copymode(src, dst)
    if uid is not None:
        os.chown(dst, uid, gid)
项目:kobo    作者:release-engineering    | 项目源码 | 文件源码
def _copy_file(path_old, path_new, name):
    """Render the template file path_old into path_new for project name.

    A trailing ".template" on path_new is stripped.  The placeholders
    '{{ project_name }}', '{{ project_name|upper }}',
    '{{ project_name|camel }}' and '{{ project_name|camel_cmd }}' are
    substituted with name and variants derived from it.  The permission
    bits of path_old are then copied to the new file; failure to do so
    is reported on stderr and otherwise ignored.

    The source is read completely and closed before the target is opened,
    so neither handle is left open when an error occurs.
    """
    # HACK: use .template suffix to prevent .py file byte compiling
    template_suffix = ".template"
    if path_new.endswith(template_suffix):
        path_new = path_new[:-len(template_suffix)]

    with open(path_old, 'r') as fp_old:
        content = fp_old.read()

    # following django template syntax
    content = content\
        .replace('{{ project_name }}', name)\
        .replace('{{ project_name|upper }}', name.upper())\
        .replace('{{ project_name|camel }}', _camelize(name))\
        .replace('{{ project_name|camel_cmd }}', _camelize(name, fill_char="_"))

    with open(path_new, 'w') as fp_new:
        fp_new.write(content)

    # copy permissions
    try:
        shutil.copymode(path_old, path_new)
    except OSError:
        sys.stderr.write("Notice: Couldn't set permission bits on %s. You're probably using an uncommon filesystem setup. No problem.\n" % path_new)


# based on django.core.management.base.copy_helper()
项目:xpybuild    作者:xpybuild    | 项目源码 | 文件源码
def _copyFile(self, context, src, dest):
        """Copy the bytes of src to dest, then copy src's permission bits.

        Both paths are passed through normLongPath() first; the
        destination is opened with openForWrite().
        """
        src, dest = normLongPath(src), normLongPath(dest)
        with open(src, 'rb') as source, openForWrite(dest, 'wb') as sink:
            shutil.copyfileobj(source, sink)
        shutil.copymode(src, dest)
        assert os.path.exists(dest)
项目:xpybuild    作者:xpybuild    | 项目源码 | 文件源码
def _copyFile(self, context, src, dest):
        """Copy src to dest line by line, passing each line through self.mappers.

        Every mapper may contribute a header before the content and a
        footer after it.  Each line is handed to the mappers in order; a
        mapper returning None drops the line and stops the chain for that
        line.  Mappers that produced a header/footer or changed a line are
        removed from the unused-mappers set.  Finally the permission bits
        of src are copied onto dest.
        """
        dest = normLongPath(dest)
        src = normLongPath(src)
        with open(src, 'rb') as source, openForWrite(dest, 'wb') as sink:
            for mapper in self.mappers:
                header = mapper.getHeader(context)
                if header:
                    self.__unusedMappers.discard(mapper)
                    sink.write(header)

            for line in source:
                for mapper in self.mappers:
                    before = line
                    line = mapper.mapLine(context, before)
                    if before != line:
                        self.__unusedMappers.discard(mapper)
                    if line is None:
                        # This mapper removed the line; skip the rest.
                        break
                if line is not None:
                    sink.write(line)

            for mapper in self.mappers:
                footer = mapper.getFooter(context)
                if footer:
                    self.__unusedMappers.discard(mapper)
                    sink.write(footer)
        shutil.copymode(src, dest)
        assert os.path.exists(dest)
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def retimestamp_and_index_file(inpath, outpath=None, retimestamp=None):
    """Index inpath, optionally retimestamping it first.

    retimestamp selects the strategy:
      None      - index inpath directly and return index_file()'s result.
      'inplace' - retimestamp inpath in place, then index it and return
                  index_file()'s result.
      'atomic'  - retimestamp into a temporary file (created with the
                  permission bits of inpath) and index that.  Without
                  outpath the temporary file then replaces inpath;
                  otherwise it is removed.  Returns True on success.

    Returns False when any step fails.  Any other retimestamp value falls
    through and returns None.
    """
    # no retimestamping needed
    if retimestamp is None:

        return index_file(inpath, outpath)

    # retimestamp the input in place and index
    elif retimestamp == 'inplace':
        from flvlib.scripts.retimestamp_flv import retimestamp_file_inplace

        log.debug("Retimestamping file `%s' in place", inpath)

        # retimestamp the file inplace
        if not retimestamp_file_inplace(inpath):
            log.error("Failed to retimestamp `%s' in place", inpath)
            return False

        return index_file(inpath, outpath)

    # retimestamp the input into a temporary file
    elif retimestamp == 'atomic':
        from flvlib.scripts.retimestamp_flv import retimestamp_file_atomically

        log.debug("Retimestamping file `%s' atomically", inpath)

        temppath = None
        try:
            fd, temppath = tempfile.mkstemp()
            os.close(fd)
            # preserve the permission bits
            shutil.copymode(inpath, temppath)
        except EnvironmentError as e:
            # Not every EnvironmentError carries (errno, strerror), so fall
            # back to the exception itself instead of unpacking its args.
            log.error("Failed to create temporary file: %s", e.strerror or e)
            if temppath is not None:
                # mkstemp() succeeded but a later step failed: don't leave
                # the temporary file behind.
                force_remove(temppath)
            return False

        if not retimestamp_file_atomically(inpath, temppath):
            log.error("Failed to retimestamp `%s' atomically", inpath)
            # remove the temporary files
            force_remove(temppath)
            return False

        # index the temporary file
        if not index_file(temppath, outpath):
            force_remove(temppath)
            return False

        if not outpath:
            # If we were not writing directly to the output file
            # we need to overwrite the original
            try:
                shutil.move(temppath, inpath)
            except EnvironmentError as e:
                # shutil.move() may raise shutil.Error, which has no
                # (errno, strerror) pair.  The temporary file is kept on
                # purpose: it may be the only intact copy at this point.
                log.error("Failed to overwrite the original file with the "
                          "retimestamped and indexed version: %s",
                          e.strerror or e)
                return False
        else:
            # if we were writing directly to the output file we need to remove
            # the retimestamped temporary file
            force_remove(temppath)

        return True
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def copy_helper(style, app_or_project, name, directory, other_name=''):
    """
    Copies either a Django application layout template or a Django project
    layout template into the specified directory.

    Only ``.py`` files of the template are copied; ``{{ app_name }}`` /
    ``{{ project_name }}`` placeholders in their contents are substituted.
    Raises CommandError if ``name`` is not a valid identifier-like name or
    if the top-level directory cannot be created.
    """
    # style -- A color style object (see django.core.management.color).
    # app_or_project -- The string 'app' or 'project'.
    # name -- The name of the application or project.
    # directory -- The directory to which the layout template should be copied.
    # other_name -- When copying an application layout, this should be the name
    #               of the project.
    import re
    import shutil
    other = {'project': 'app', 'app': 'project'}[app_or_project]
    if not re.search(r'^[_a-zA-Z]\w*$', name): # If it's not a valid directory name.
        # Provide a smart error message, depending on the error.
        if not re.search(r'^[_a-zA-Z]', name):
            message = 'make sure the name begins with a letter or underscore'
        else:
            message = 'use only numbers, letters and underscores'
        raise CommandError("%r is not a valid %s name. Please %s." % (name, app_or_project, message))
    top_dir = os.path.join(directory, name)
    try:
        os.mkdir(top_dir)
    except OSError as e:
        raise CommandError(e)

    # Determine where the app or project templates are. Use
    # django.__path__[0] because we don't know into which directory
    # django has been installed.
    template_dir = os.path.join(django.__path__[0], 'conf', '%s_template' % app_or_project)

    # Placeholder that is replaced by ``name`` in directory and file names.
    name_token = '%s_name' % app_or_project

    for d, subdirs, files in os.walk(template_dir):
        relative_dir = d[len(template_dir)+1:].replace(name_token, name)
        if relative_dir:
            os.mkdir(os.path.join(top_dir, relative_dir))
        # Prune hidden directories in place so os.walk() skips them.
        for subdir in subdirs[:]:
            if subdir.startswith('.'):
                subdirs.remove(subdir)
        for f in files:
            if not f.endswith('.py'):
                # Ignore .pyc, .pyo, .py.class etc, as they cause various
                # breakages.
                continue
            path_old = os.path.join(d, f)
            path_new = os.path.join(top_dir, relative_dir, f.replace(name_token, name))
            # Context managers make sure both handles are closed even when
            # reading or writing fails.
            with open(path_old, 'r') as fp_old:
                content = fp_old.read()
            with open(path_new, 'w') as fp_new:
                fp_new.write(content.replace('{{ %s_name }}' % app_or_project, name).replace('{{ %s_name }}' % other, other_name))
            try:
                shutil.copymode(path_old, path_new)
                _make_writeable(path_new)
            except OSError:
                sys.stderr.write(style.NOTICE("Notice: Couldn't set permission bits on %s. You're probably using an uncommon filesystem setup. No problem.\n" % path_new))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def install_scripts(self, context, path):
        """
        Install scripts into the created environment from a directory.

        :param context: The information for the environment creation request
                        being processed.
        :param path:    Absolute pathname of a directory containing script.
                        Scripts in the 'common' subdirectory of this directory,
                        and those in the directory named for the platform
                        being run on, are installed in the created environment.
                        Placeholder variables are replaced with environment-
                        specific values.
        """
        binpath = context.bin_path
        plen = len(path)
        for root, dirs, files in os.walk(path):
            if root == path: # at top-level, remove irrelevant dirs
                # Editing dirs in place restricts the walk to 'common' and
                # the directory named after os.name.
                for d in dirs[:]:
                    if d not in ('common', os.name):
                        dirs.remove(d)
                continue # ignore files in top level
            for f in files:
                srcfile = os.path.join(root, f)
                # Path components of root below the first-level directory
                # under path; assumes path has no trailing separator, so
                # the first two split items are '' and that directory name
                # - TODO confirm.
                suffix = root[plen:].split(os.sep)[2:]
                if not suffix:
                    dstdir = binpath
                else:
                    dstdir = os.path.join(binpath, *suffix)
                if not os.path.exists(dstdir):
                    os.makedirs(dstdir)
                dstfile = os.path.join(dstdir, f)
                # NOTE: from here on f is rebound from the file name to a
                # file object; dstfile has already been computed above.
                with open(srcfile, 'rb') as f:
                    data = f.read()
                if srcfile.endswith('.exe'):
                    # .exe files are written back unchanged, as bytes.
                    mode = 'wb'
                else:
                    # Everything else is decoded as UTF-8 text and has its
                    # placeholder variables replaced.
                    mode = 'w'
                    try:
                        data = data.decode('utf-8')
                        data = self.replace_variables(data, context)
                    except UnicodeDecodeError as e:
                        # Undecodable files are skipped with a warning.
                        data = None
                        logger.warning('unable to copy script %r, '
                                       'may be binary: %s', srcfile, e)
                if data is not None:
                    with open(dstfile, mode) as f:
                        f.write(data)
                    # Carry the source's permission bits over to the copy.
                    shutil.copymode(srcfile, dstfile)