Python shutil 模块,chown() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用shutil.chown()

项目:py_daemoniker    作者:Muterra    | 项目源码 | 文件源码
def daemote(pid_file, user, group):
    ''' Change gid and uid, dropping privileges.

    Either user or group may explicitly pass None to keep it the same.

    The pid_file will be chown'ed so it can still be cleaned up.
    '''
    if not _SUPPORTED_PLATFORM:
        raise OSError('Daemotion is unsupported on your platform.')

    # No need to do anything special, just chown the pidfile
    # This will also catch any bad group, user names
    shutil.chown(pid_file, user, group)

    # Now update group and then user
    _setgroup(group)
    _setuser(user)
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def install_service_dir(path, owner, mode):
    if not os.path.exists('/var/prologin'):
        mkdir('/var/prologin', mode=0o755, owner='root:root')
    name = os.path.basename(path)  # strip service kind from path (eg. django)
    # Nothing in Python allows merging two directories together...
    # Be careful with rsync(1) arguments: to merge two directories, trailing
    # slash are meaningful.
    system('rsync -rv %s/ /var/prologin/%s' % (path, name))
    user, group = owner.split(':')

    shutil.chown('/var/prologin/%s' % name, user, group)
    os.chmod('/var/prologin/%s' % name, mode)
    for root, dirs, files in os.walk('/var/prologin/%s' % name):
        for dir in dirs:
            shutil.chown(os.path.join(root, dir), user, group)
            os.chmod(os.path.join(root, dir), mode)
        for file in files:
            shutil.chown(os.path.join(root, file), user, group)
            os.chmod(os.path.join(root, file), mode)
项目:xontrib-z    作者:astronouth7303    | 项目源码 | 文件源码
def save_data(self, data):
        # Age data
        if hasattr(data, '__len__') and len(data) > self.GROOM_THRESHOLD:
            for i, e in enumerate(data):
                data[i] = ZEntry(e.path, int(e.rank * self.GROOM_LEVEL), e.time)

        # Use a temporary file to minimize time the file is open and minimize clobbering
        from tempfile import NamedTemporaryFile
        with NamedTemporaryFile('wt', encoding=sys.getfilesystemencoding()) as f:
            for e in data:
                f.write("{}|{}|{}\n".format(e.path, int(e.rank), int(e.time.timestamp())))
            f.flush()

            if self.Z_OWNER:
                shutil.chown(f.name, user=self.Z_OWNER)

            # On POSIX, rename() is atomic and will clobber
            # On Windows, neither of these is true, so remove first.
            from xonsh.platform import ON_WINDOWS
            if ON_WINDOWS and os.path.exists(self.Z_DATA):
                os.remove(self.Z_DATA)
            shutil.copy(f.name, self.Z_DATA)
项目:apex    作者:opnfv    | 项目源码 | 文件源码
def setup_volumes(self):
        for img_file in ('overcloud-full.vmlinuz', 'overcloud-full.initrd',
                         self.image_name):
            src_img = os.path.join(self.image_path, img_file)
            if img_file == self.image_name:
                dest_img = os.path.join(constants.LIBVIRT_VOLUME_PATH,
                                        'undercloud.qcow2')
            else:
                dest_img = os.path.join(constants.LIBVIRT_VOLUME_PATH,
                                        img_file)
            if not os.path.isfile(src_img):
                raise ApexUndercloudException(
                    "Required source file does not exist:{}".format(src_img))
            if os.path.exists(dest_img):
                os.remove(dest_img)
            shutil.copyfile(src_img, dest_img)
            shutil.chown(dest_img, user='qemu', group='qemu')
            os.chmod(dest_img, 0o0744)
        # TODO(trozet):check if resize needed right now size is 50gb
        # there is a lib called vminspect which has some dependencies and is
        # not yet available in pip.  Consider switching to this lib later.
项目:apex    作者:opnfv    | 项目源码 | 文件源码
def inject_auth(self):
        virt_ops = list()
        # virt-customize keys/pws
        if self.root_pw:
            pw_op = "password:{}".format(self.root_pw)
            virt_ops.append({constants.VIRT_PW: pw_op})
        # ssh key setup
        virt_ops.append({constants.VIRT_RUN_CMD:
                        'mkdir -p /root/.ssh'})
        virt_ops.append({constants.VIRT_UPLOAD:
                         '/root/.ssh/id_rsa.pub:/root/.ssh/authorized_keys'})
        run_cmds = [
            'chmod 600 /root/.ssh/authorized_keys',
            'restorecon /root/.ssh/authorized_keys',
            'cp /root/.ssh/authorized_keys /home/stack/.ssh/',
            'chown stack:stack /home/stack/.ssh/authorized_keys',
            'chmod 600 /home/stack/.ssh/authorized_keys'
        ]
        for cmd in run_cmds:
            virt_ops.append({constants.VIRT_RUN_CMD: cmd})
        virt_utils.virt_customize(virt_ops, self.volume)
项目:maas    作者:maas    | 项目源码 | 文件源码
def init_db():
    """Initialize the database."""
    config_data = get_config_data()
    db_path = os.path.join(os.environ['SNAP_COMMON'], 'db')
    if os.path.exists(db_path):
        shutil.rmtree(db_path)
    os.mkdir(db_path)
    shutil.chown(db_path, user='nobody', group='nogroup')
    log_path = os.path.join(os.environ['SNAP_COMMON'], 'log', 'postgresql.log')
    if not os.path.exists(log_path):
        open(log_path, 'a').close()
    shutil.chown(log_path, user='nobody', group='nogroup')

    def _init_db():
        subprocess.check_output([
            os.path.join(os.environ['SNAP'], 'bin', 'initdb'),
            '-D', os.path.join(os.environ['SNAP_COMMON'], 'db'),
            '-U', 'postgres', '-E', 'UTF8', '--locale=C'],
            stderr=subprocess.STDOUT)
        with with_postgresql():
            create_db(config_data)
    run_with_drop_privileges(_init_db)
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def change_owner(full_path, user_name=None, group_name=None):
        try:
            shutil.chown(full_path, user_name, group_name)
        except:
            raise
项目:nbpuller    作者:data-8    | 项目源码 | 文件源码
def chown(path, filename):
    """Set owner and group of file to that of the parent directory."""
    s = os.stat(path)
    os.chown(os.path.join(path, filename), s.st_uid, s.st_gid)
项目:nbpuller    作者:data-8    | 项目源码 | 文件源码
def chown_dir(directory, username):
    """Set owner and group of directory to username."""
    shutil.chown(directory, username, username)
    for root, dirs, files in os.walk(directory):
        for child in dirs + files:
            shutil.chown(os.path.join(root, child), username, username)
    logger.info("{} chown'd to {}".format(directory, username))
项目:interact    作者:data-8    | 项目源码 | 文件源码
def chown(path, filename):
    """Set owner and group of file to that of the parent directory."""
    s = os.stat(path)
    os.chown(os.path.join(path, filename), s.st_uid, s.st_gid)
项目:interact    作者:data-8    | 项目源码 | 文件源码
def chown_dir(directory, username):
    """Set owner and group of directory to username."""
    shutil.chown(directory, username, username)
    for root, dirs, files in os.walk(directory):
        for child in dirs + files:
            shutil.chown(os.path.join(root, child), username, username)
    logger.info("{} chown'd to {}".format(directory, username))
项目:axel    作者:craigcabrey    | 项目源码 | 文件源码
def handle_sonarr(torrent):
    for index, file in torrent.files().items():
        file_path = os.path.join(torrent.downloadDir, file['name'])
        if file_path.endswith('rar'):
            with tempfile.TemporaryDirectory() as temp_dir:
                paths = extract(file_path, temp_dir)

                if paths:
                    # Move extracted files to sonarr drone factory
                    for path in paths:
                        shutil.chown(path, group=plex_group)
                        os.chmod(path, 0o664)
                        shutil.move(path, drone_factory)
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def mkdir(path, mode, owner='root:root'):
    if os.path.exists(path):
        os.chmod(path, mode)
    else:
        os.mkdir(path, mode)
    user, group = owner.split(':')
    shutil.chown(path, user, group)
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def copy(old, new, mode=0o600, owner='root:root'):
    print('Copying %s -> %s (mode: %o) (own: %s)' % (old, new, mode, owner))
    shutil.copy(old, new)
    os.chmod(new, mode)
    user, group = owner.split(':')
    shutil.chown(new, user, group)
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def copytree(old, new, dir_mode=0o700, file_mode=0o600, owner='root:root'):
    print('Copying %s -> %s (file mode: %o) (dir mode: %o) (own: %s)' % (old, new, file_mode, dir_mode, owner))
    shutil.copytree(old, new)
    user, group = owner.split(':')
    for root, dirs, files in os.walk(new):
        for momo in dirs:
            path = os.path.join(root, momo)
            os.chmod(path, dir_mode)
            shutil.chown(path, user, group)
        for momo in files:
            path = os.path.join(root, momo)
            os.chmod(path, file_mode)
            shutil.chown(path, user, group)
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def touch(path, mode=0o600, owner='root:root'):
    print('Touching %s' % path)
    with open(path, 'a'):
        os.utime(path, None)
    os.chmod(path, mode)
    user, group = owner.split(':')
    shutil.chown(path, user, group)
项目:edi    作者:lueschem    | 项目源码 | 文件源码
def chown_to_user(path):
    shutil.chown(path, get_user_uid(), get_user_gid())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
                      'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
                      'SpecialFileError', 'ExecError', 'make_archive',
                      'get_archive_formats', 'register_archive_format',
                      'unregister_archive_format', 'get_unpack_formats',
                      'register_unpack_format', 'unregister_unpack_format',
                      'unpack_archive', 'ignore_patterns', 'chown', 'which',
                      'get_terminal_size', 'SameFileError']
        if hasattr(os, 'statvfs') or os.name == 'nt':
            target_api.append('disk_usage')
        self.assertEqual(set(shutil.__all__), set(target_api))
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def changeOwnerAndGrpToLoggedInUser(directory, raiseEx=False):
    loggedInUser = getLoggedInUser()
    try:
        shutil.chown(directory, loggedInUser, loggedInUser)
    except Exception as e:
        if raiseEx:
            raise e
        else:
            pass
项目:tm-manifesting    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def chgrp(fname, grpname):
    try:
        shutil.chown(fname, group=grpname)
    except OSError as e:
        raise RuntimeError('chown(%s) failed: %s' % (fname, str(e)))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_chown(self):

        # cleaned-up automatically by TestShutil.tearDown method
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-exising username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-exising groupname')

        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            s = os.stat(filename)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        shutil.chown(filename, uid, gid)
        check_chown(filename, uid, gid)
        shutil.chown(filename, uid)
        check_chown(filename, uid)
        shutil.chown(filename, user=uid)
        check_chown(filename, uid)
        shutil.chown(filename, group=gid)
        check_chown(filename, gid=gid)

        shutil.chown(dirname, uid, gid)
        check_chown(dirname, uid, gid)
        shutil.chown(dirname, uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, user=uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, group=gid)
        check_chown(dirname, gid=gid)

        user = pwd.getpwuid(uid)[0]
        group = grp.getgrgid(gid)[0]
        shutil.chown(filename, user, group)
        check_chown(filename, uid, gid)
        shutil.chown(dirname, user, group)
        check_chown(dirname, uid, gid)
项目:axel    作者:craigcabrey    | 项目源码 | 文件源码
def handle_manual(torrent):
    auto_processed = False

    def handle_media(path, move):
        nonlocal auto_processed

        guess = guessit.guessit(path)

        if guess['type'] == 'episode':
            move_episode(path, guess, move)
            auto_processed = True
        elif guess['type'] == 'movie':
            move_movie(path, guess, move)
            auto_processed = True

    part_regex = re.compile('.*part(\d+).rar', re.IGNORECASE)
    for index, file in torrent.files().items():
        file_path = os.path.join(torrent.downloadDir, file['name'])
        if check_extension(file_path) and 'sample' not in file_path.lower():
            # Log and ignore mkv files of less than ~92MiB
            try:
                if os.path.getsize(file_path) >= 96811278:
                    handle_media(file_path, False)
                else:
                    syslog.syslog(
                        syslog.LOG_ERR, 'Detected false media file, skipping'
                    )
            except FileNotFoundError:
                syslog.syslog(syslog.LOG_ERR, 'Torrent file missing, skipping')
        elif file_path.endswith('rar'):
            # Ignore parts beyond the first in a rar series
            match = part_regex.match(file_path)
            if match and int(match.group(1)) > 1:
                continue

            with tempfile.TemporaryDirectory() as temp_dir:
                paths = extract(file_path, temp_dir)

                if paths:
                    for path in paths:
                        shutil.chown(path, group=plex_group)
                        os.chmod(path, 0o664)

                        handle_media(path, True)

    if auto_processed:
        pb_notify(
            textwrap.dedent(
                '''
                    Manually added torrent {0} finished downloading
                    and was auto-processed
                '''.format(torrent.name)
            ).strip()
        )
    else:
        pb_notify(
            'Manually added torrent {0} finished downloading'.format(
                torrent.name
            )
        )
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_chown(self):

        # cleaned-up automatically by TestShutil.tearDown method
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-exising username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-exising groupname')

        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            s = os.stat(filename)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        shutil.chown(filename, uid, gid)
        check_chown(filename, uid, gid)
        shutil.chown(filename, uid)
        check_chown(filename, uid)
        shutil.chown(filename, user=uid)
        check_chown(filename, uid)
        shutil.chown(filename, group=gid)
        check_chown(filename, gid=gid)

        shutil.chown(dirname, uid, gid)
        check_chown(dirname, uid, gid)
        shutil.chown(dirname, uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, user=uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, group=gid)
        check_chown(dirname, gid=gid)

        user = pwd.getpwuid(uid)[0]
        group = grp.getgrgid(gid)[0]
        shutil.chown(filename, user, group)
        check_chown(filename, uid, gid)
        shutil.chown(dirname, user, group)
        check_chown(dirname, uid, gid)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_chown(self):

        # cleaned-up automatically by TestShutil.tearDown method
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-exising username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-exising groupname')

        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            s = os.stat(filename)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        shutil.chown(filename, uid, gid)
        check_chown(filename, uid, gid)
        shutil.chown(filename, uid)
        check_chown(filename, uid)
        shutil.chown(filename, user=uid)
        check_chown(filename, uid)
        shutil.chown(filename, group=gid)
        check_chown(filename, gid=gid)

        shutil.chown(dirname, uid, gid)
        check_chown(dirname, uid, gid)
        shutil.chown(dirname, uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, user=uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, group=gid)
        check_chown(dirname, gid=gid)

        user = pwd.getpwuid(uid)[0]
        group = grp.getgrgid(gid)[0]
        shutil.chown(filename, user, group)
        check_chown(filename, uid, gid)
        shutil.chown(dirname, user, group)
        check_chown(dirname, uid, gid)