Python os 模块,chmod() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用os.chmod()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable: add read+execute for user, group and
    # other to the current bits, keeping only the low twelve mode bits.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_zipfile_attributes():
    """Check the mode and compression type recorded for wheel members."""
    # With the change from ZipFile.write() to .writestr(), we need to manually
    # set member attributes.
    expected_modes = (('foo', 0o644), ('bar', 0o755))
    with temporary_directory() as tempdir:
        for member, perms in expected_modes:
            member_path = os.path.join(tempdir, member)
            with codecs.open(member_path, 'w', encoding='utf-8') as out:
                out.write(member + '\n')
            os.chmod(member_path, perms)

        archive_base = os.path.join(tempdir, 'dummy')
        archive_path = wheel.archive.make_wheelfile_inner(
            archive_base, tempdir)

        with readable_zipfile(archive_path) as archive:
            for member, perms in expected_modes:
                entry = archive.getinfo(os.path.join(tempdir, member))
                # The Unix mode lives in the upper 16 bits of external_attr;
                # 0o100000 is OR'ed into the expected value before shifting.
                assert entry.external_attr == (perms | 0o100000) << 16
                assert entry.compress_type == zipfile.ZIP_DEFLATED
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable: add read+execute for user, group and
    # other to the current bits, keeping only the low twelve mode bits.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    """
    operate_on_links = (not follow_symlinks
                        and os.path.islink(src)
                        and os.path.islink(dst))
    if operate_on_links:
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    else:
        if not hasattr(os, 'chmod'):
            return
        read_stat, apply_mode = os.stat, os.chmod

    # Only the permission bits are copied, not the file-type bits.
    apply_mode(dst, stat.S_IMODE(read_stat(src).st_mode))
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable: add read+execute for user, group and
    # other to the current bits, keeping only the low twelve mode bits.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable: add read+execute for user, group and
    # other to the current bits, keeping only the low twelve mode bits.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _test_NoAccessDir(self, nodeName):
    """Check that loading from a permission-less directory raises LoadFail.

    Launches the device manager for ``nodeName``, makes sure the
    ``dom/noaccess`` directory under the SDR root exists with mode 0, and
    asserts that ``device.load`` on that directory raises
    ``CF.LoadableDevice.LoadFail``.  The directory is removed afterwards.
    """
    _booter, dev_mgr = self.launchDeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName)
    device = dev_mgr._get_registeredDevices()[0]
    file_mgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    locked_dir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    if os.path.exists(locked_dir):
        os.chmod(locked_dir, 0000)
    else:
        os.mkdir(locked_dir, 0000)

    try:
        # Sanity check first: the test is meaningless if the directory is
        # still accessible to the current user.
        self.assertFalse(os.access(locked_dir, os.R_OK|os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load, file_mgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(locked_dir)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_ExistsException(self):
    """Check that ``fileMgr.exists`` raises InvalidFileName, not a crash.

    The ``dom/noaccess`` directory under the SDR root is created (or
    re-chmod'ed) with mode 0o644, i.e. without any execute/search bit, and
    is removed again when the test finishes.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # 0o644 is the octal spelling accepted by both Python 2.6+ and Python 3;
    # the bare ``0644`` form is a syntax error on Python 3.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:django-zerodowntime    作者:rentlytics    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """Install the zero-downtime check into the ``commit-msg`` hook.

    Reads the existing hook under ``self.HOOK_PATH`` (or starts from a bash
    shebang when there is none).  If the hook does not yet contain the
    ``ZERODOWNTIME_COMMIT_MSG_HOOK`` marker, ``COMMIT_MSG_HOOK`` is appended,
    the file is rewritten and the owner-execute bit is added.  A hook that
    already contains the marker is left untouched.
    """
    hook_file = os.path.join(self.HOOK_PATH, 'commit-msg')

    if os.path.exists(hook_file):
        with open(hook_file, 'r') as existing:
            content = existing.read()
    else:
        content = '#!/usr/bin/env bash\n\n'

    if 'ZERODOWNTIME_COMMIT_MSG_HOOK' in content:
        # Already installed; nothing to write.
        return

    content += COMMIT_MSG_HOOK
    with open(hook_file, 'w') as out:
        out.write(content)

    current_mode = os.stat(hook_file).st_mode
    os.chmod(hook_file, current_mode | stat.S_IEXEC)
项目:mongoaudit    作者:Exploit-install    | 项目源码 | 文件源码
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
    if binary_ok:
        import stat
        # save the permissions from the current binary
        old_stat = os.stat(binary_path)
        # rename the current binary in order to replace it with the latest
        os.rename(binary_path, path + "/old")
        os.rename(temp_path, binary_path)
        # set the same permissions that had the previous binary
        os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC)
        # delete the old binary
        os.remove(path + "/old")
        print("mongoaudit updated, restarting...")
        os.execl(binary_path, binary_path, *sys.argv)
    else:
        os.remove(temp_path)
        print("couldn't download the latest binary")
项目:PyWebRunner    作者:IntuitiveWebSolutions    | 项目源码 | 文件源码
def download_driver_file(whichbin, url, base_path):
    """Download a driver archive, unpack it into ``base_path`` and chmod it.

    :param whichbin: ``'wires'`` selects ``geckodriver``; any other value
        selects ``chromedriver`` as the file that is made executable.
    :param url: download location; a ``.tar.gz`` suffix selects tar
        extraction, everything else is treated as a zip archive.
    :param base_path: directory the archive is extracted into.

    The archive is staged at the fixed path ``/tmp/pwr_temp<ext>``.
    """
    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    archive_path = '/tmp/pwr_temp{}'.format(ext)
    extract_to = '{}/'.format(base_path)

    print("Downloading from: {}".format(url))
    download_file(url, archive_path)

    # NOTE(review): extractall() trusts member paths inside the archive;
    # confirm the download source is trusted.
    if ext == '.tar.gz':
        import tarfile
        # The context manager closes the archive even if extraction fails.
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(extract_to)
    else:
        import zipfile
        with zipfile.ZipFile(archive_path, "r") as z:
            z.extractall(extract_to)

    driver_name = 'geckodriver' if whichbin == 'wires' else 'chromedriver'
    os.chmod('{}/{}'.format(base_path, driver_name), 0o775)
项目:myDotFiles    作者:GuidoFe    | 项目源码 | 文件源码
def execute(self):
    """Apply an octal mode to every file in the current selection.

    The mode text comes from ``self.rest(1)``; when that is empty the
    string form of ``self.quantifier`` is used instead.  Values that do not
    parse as octal, or fall outside 0..0o777, produce a notification and
    nothing is changed.  Per-file chmod errors are reported through
    ``self.fm.notify`` and do not stop the loop.
    """
    mode_text = self.rest(1) or str(self.quantifier)

    try:
        perms = int(mode_text, 8)
        if not 0 <= perms <= 0o777:
            raise ValueError
    except ValueError:
        self.fm.notify("Need an octal number between 0 and 777!", bad=True)
        return

    for entry in self.fm.thistab.get_selection():
        try:
            os.chmod(entry.path, perms)
        except Exception as ex:
            self.fm.notify(ex)

    # Refresh the whole directory listing; reloading only the selected
    # files might be better.  A failing reload is deliberately ignored.
    try:
        self.fm.thisdir.load_content()
    except Exception:
        pass
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable.  The masks are spelled in hex:
    # 0x16D is octal 0555 and 0xFFF is octal 07777.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0x16D) & 0xFFF)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable.  The masks are spelled in hex:
    # 0x16D is octal 0555 and 0xFFF is octal 07777.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0x16D) & 0xFFF)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def rmtree_errorhandler(func, path, exc_info):
    """On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown.  We catch that here, remove the
    read-only attribute, and hopefully continue without problems.

    ``func`` is the operation that failed, ``path`` the path it failed on and
    ``exc_info`` a ``(type, value, traceback)``-style sequence of which only
    the first two items are used.

    Any exception that does not match one of the three recognised
    (type, code) pairs below is re-raised with a bare ``raise``.
    NOTE(review): the bare ``raise`` presumably relies on this handler being
    invoked while the original exception is still being handled (e.g. as an
    ``onerror`` callback) -- confirm against the caller.
    """
    exctype, value = exc_info[:2]
    # The three clauses are evaluated left to right with short-circuiting, so
    # ``PermissionError`` is only looked up when the first two do not match.
    if not ((exctype is WindowsError and value.args[0] == 5) or #others
            (exctype is OSError and value.args[0] == 13) or #python2.4
            (exctype is PermissionError and value.args[3] == 5) #python3.3
            ):
        raise
    # file type should currently be read only
    # (the test itself re-raises unless the S_IREAD bit is set on the path)
    if ((os.stat(path).st_mode & stat.S_IREAD) != stat.S_IREAD):
        raise
    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    This is where Mac header rewrites should be done; other platforms don't
    have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.  `filename` is not used by this implementation.
    """
    if os.name != 'posix':
        return

    # Make the resource executable: add read+execute for user, group and
    # other to the current bits, keeping only the low twelve mode bits.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def decrypt_file(file, key):
    """
    Decrypts the file ``file``.

    The encrypted file is assumed to end with the ``.enc`` extension. The
    decrypted file is saved to the same location without the ``.enc``
    extension.

    The permissions on the decrypted file are automatically set to 0o600.
    A newly created output file is opened with that mode from the start, so
    the plaintext is never on disk with wider, umask-derived permissions.

    Raises ``ValueError`` if ``file`` does not end with ``.enc``.

    See also :func:`doctr.local.encrypt_file`.

    """
    if not file.endswith('.enc'):
        raise ValueError("%s does not end with .enc" % file)

    fer = Fernet(key)

    with open(file, 'rb') as f:
        decrypted_file = fer.decrypt(f.read())

    target = file[:-4]
    # O_BINARY only exists on Windows, where it prevents newline translation.
    flags = (os.O_WRONLY | os.O_CREAT | os.O_TRUNC
             | getattr(os, 'O_BINARY', 0))
    fd = os.open(target, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(decrypted_file)

    # The creation mode is ignored when the file already existed, so the
    # permissions are still set explicitly.
    os.chmod(target, 0o600)
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def write_torque_script(command, outfile, walltime, queue, name, out, err, print_exec=True):
    """Write an executable PBS/Torque job script and return its path.

    The script consists of ``#PBS`` directives for the walltime, queue, job
    name, stdout file (``<out>.o``) and stderr file (``<err>.e``), a ``cd``
    into ``${PBS_O_WORKDIR}``, and finally ``command`` written verbatim.
    The file is made mode 0o755.  When ``print_exec`` is true the matching
    ``qsub`` line is printed.

    NOTE(review): ``queue`` is accepted but not used; the script always
    requests the ``regular`` queue.
    """
    header = [
        '#PBS -l walltime={}\n'.format(walltime),
        '#PBS -q regular\n',
        '#PBS -N {}\n'.format(name),
        '#PBS -o {}.o\n'.format(out),
        '#PBS -e {}.e\n'.format(err),
        'cd ${PBS_O_WORKDIR}\n',
    ]

    with open(outfile, 'w') as script:
        script.writelines(header)
        script.write(command)

    os.chmod(outfile, 0o755)

    if print_exec:
        print('qsub {};'.format(outfile))

    return outfile
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def make_run_all_script(wd):
    """Write ``run_all.sh`` into ``wd``, make it mode 0o755, return its path.

    The generated bash script loops over ``*.pdb`` in its working directory,
    runs ``tleap -f leaprc`` on a temporary copy of each file and moves the
    ``temp_mod.*`` outputs into ``xleap_modified/`` and ``amber_minimized/``.
    """
    script_body = """#!/bin/bash

for i in *.pdb; do
    echo "Running ${i}..."
    seq=`echo ${i}  | cut -d. -f1`
    curr_dir=`pwd`

    cp ${i} temp.pdb

    tleap -f leaprc

    rm temp.pdb
    mv temp_mod.pdb xleap_modified/${seq}_xleap.pdb
    mv temp_mod.prmtop amber_minimized/${seq}.prmtop
    mv temp_mod.inpcrd amber_minimized/${seq}.inpcrd

done
"""
    script_path = op.join(wd, 'run_all.sh')
    with open(script_path, 'w') as handle:
        handle.write(script_body)
    os.chmod(script_path, 0o755)
    return script_path
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Ensure a directory exists at ``path`` with the given owner and mode.

    :param path: directory to create; it is made absolute before use.
    :param owner: user name resolved to a uid via ``pwd.getpwnam``.
    :param group: group name resolved to a gid via ``grp.getgrnam``.
    :param perms: mode passed to ``os.makedirs`` and then to ``os.chmod``.
    :param force: when true, an existing non-directory at ``path`` is
        unlinked and replaced by a directory.

    Whatever ends up at the path is always chown'ed and chmod'ed, including
    an existing non-directory that was left alone because ``force`` is false.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something other than a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
项目:CaptureBaits    作者:WhosMyName    | 项目源码 | 文件源码
def get_stream(playlist, directory, own_name):
    """Record one HLS stream with livestreamer and queue its conversion.

    :param playlist: playlist URL; it is prefixed with ``hlsvariant://``.
    :param directory: prefix (used by plain string concatenation) for the
        ``.ts`` recording and for the ``ts_to_mp4.sh`` script.
    :param own_name: name used in the output file names; it is appended to
        the module-level ``modellist`` while this function runs.

    The ffmpeg conversion command and a ``chmod 666`` line are appended to
    both ``ts_to_mp4.sh`` and the module-level ``oneclick_file``, and both
    scripts are made mode 0o777.  ``own_name`` is appended to
    ``baitlist.txt``.  The recording is skipped when the ``.ts`` file
    already exists.

    NOTE(review): if livestreamer fails, ``subprocess.check_call`` raises
    and ``own_name`` stays in ``modellist`` -- confirm whether that is
    intended.
    """
    modellist.append(str(own_name))

    # Take a single clock snapshot so hour and minute belong to the same
    # instant (two separate now() calls can straddle an hour boundary).
    now = datetime.datetime.now()
    start_time = "_" + str(now.hour) + "-" + str(now.minute)

    ffs_script = directory + "ts_to_mp4.sh"
    merged_file = encoded + own_name + start_time + ".mp4"
    stream_file = directory + own_name + start_time + ".ts"

    convert_cmd = "\nffmpeg -i " + stream_file + " -strict -2 -c:v copy " + merged_file + "\n"
    chmod_cmd = "chmod 666 " + merged_file + "\n"
    for script_path in (ffs_script, oneclick_file):
        with open(script_path, "a+", encoding="utf8") as script:
            script.write(convert_cmd)
            script.write(chmod_cmd)
        os.chmod(script_path, 0o777)

    hlsvar = "hlsvariant://" + playlist
    print("Retrieving the Streamfile for " + str(own_name))
    baitlist_file = cwd + "baitlist.txt"
    with open(baitlist_file, "a+", encoding="utf8") as bl:
        bl.write(own_name + "\n")

    if not os.path.isfile(stream_file):
        subprocess.check_call(["livestreamer", "--hls-segment-threads", str(numcpucores), "--retry-streams", "5", "--retry-open", "5", "--hls-segment-attempts", "5", "--hls-segment-timeout", "20", "--http-header", "User-Agent=Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0", "-o",  stream_file , hlsvar, "best"])
        os.chmod(stream_file, 0o666)

    modellist.remove(str(own_name))
    return
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def apply_copy(self):
    """Create one ``copy`` task per entry in ``self.source``.

    Each task gets ``self.fun`` as its function and a ``chmod`` value taken
    from ``self.chmod`` when that attribute exists, ``Utils.O644`` otherwise.
    The output name is ``self.target`` only when it is set and there is a
    single source; in every other case the source node's own name is used.

    Raises ``Errors.WafError`` when a source cannot be found or a created
    task has no environment.
    """
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0

    sources = self.to_list(self.source)
    self.meths.remove('process_source')

    for filename in sources:
        src_node = self.path.find_resource(filename)
        if not src_node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)

        out_name = self.target
        if not out_name or len(sources) > 1:
            out_name = src_node.name

        # TODO the file path may be incorrect
        dst_node = self.path.find_or_declare(out_name)

        task = self.create_task('copy', src_node, dst_node)
        task.fun = self.fun
        task.chmod = getattr(self, 'chmod', Utils.O644)

        if not task.env:
            task.debug()
            raise Errors.WafError('task without an environment')
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def apply_copy(self):
    """Create one ``copy`` task per entry in ``self.source``.

    Each task gets ``self.fun`` as its function and a ``chmod`` value taken
    from ``self.chmod`` when that attribute exists, ``Utils.O644`` otherwise.
    The output name is ``self.target`` only when it is set and there is a
    single source; in every other case the source node's own name is used.

    Raises ``Errors.WafError`` when a source cannot be found or a created
    task has no environment.
    """
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0

    sources = self.to_list(self.source)
    self.meths.remove('process_source')

    for filename in sources:
        src_node = self.path.find_resource(filename)
        if not src_node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)

        out_name = self.target
        if not out_name or len(sources) > 1:
            out_name = src_node.name

        # TODO the file path may be incorrect
        dst_node = self.path.find_or_declare(out_name)

        task = self.create_task('copy', src_node, dst_node)
        task.fun = self.fun
        task.chmod = getattr(self, 'chmod', Utils.O644)

        if not task.env:
            task.debug()
            raise Errors.WafError('task without an environment')
项目:rex    作者:shellphish    | 项目源码 | 文件源码
def test_binary(self, enable_randomness=True, times=1, timeout=15):
    """
    Test the binary generated

    The POV is dumped to a temporary file under /tmp, made mode 0o755 and
    run against ``self.crash.binary`` through ``CGCPovSimulator``.

    :param enable_randomness: forwarded to ``test_binary_pov``.
    :param times: forwarded to ``test_binary_pov``.
    :param timeout: forwarded to ``test_binary_pov``.
    :return: whatever ``test_binary_pov`` returns.
    """

    # dump the binary code
    # NOTE(review): tempfile.mktemp() only picks a name and is documented as
    # race-prone; consider mkstemp() if dump_binary can write to an
    # already-existing file.
    pov_binary_filename = tempfile.mktemp(dir='/tmp', prefix='rex-pov-')
    self.dump_binary(filename=pov_binary_filename)

    try:
        # 0o755 is valid on Python 2.6+ and 3; the bare ``0755`` literal is a
        # syntax error on Python 3.
        os.chmod(pov_binary_filename, 0o755)

        pov_tester = CGCPovSimulator()
        result = pov_tester.test_binary_pov(
                pov_binary_filename,
                self.crash.binary,
                enable_randomness=enable_randomness,
                timeout=timeout,
                times=times)
    finally:
        # remove the generated pov, even when the simulator raised
        os.remove(pov_binary_filename)

    return result
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def upload_file(self, filename, content, mode=None):
    """Write ``content`` to the remote ``filename`` over SFTP.

    :param filename: remote path to create or overwrite.
    :param content: data written to the remote file.
    :param mode: optional permission bits; when truthy they are applied to
        ``filename`` after the upload.

    The remote file handle and the SFTP session are closed even when the
    write or chmod fails.
    """
    ftp = self.client.open_sftp()
    try:
        remote = ftp.file(filename, 'w', -1)
        try:
            remote.write(content)
            remote.flush()
        finally:
            remote.close()
        if mode:
            # Apply the caller's mode to the file that was just uploaded.
            ftp.chmod(filename, mode)
    finally:
        ftp.close()
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def run_script(self, script):
    """Write ``script`` to a temporary file and start it as a subprocess.

    The file path is stored in ``self.tmp_script_filename`` and the file is
    made mode 0o744.  The started process is stored in ``self.process`` and
    its stdout pipe (which also carries stderr) in ``self.stream``.  The
    temporary file is not removed here.
    """
    handle, self.tmp_script_filename = tempfile.mkstemp()
    os.close(handle)

    with open(self.tmp_script_filename, 'w') as script_file:
        script_file.write(script)
    os.chmod(self.tmp_script_filename, 0o744)

    proc = subprocess.Popen(
        self.tmp_script_filename,
        shell=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        close_fds=True,
    )
    self.process = proc
    self.stream = proc.stdout
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def extractall(self, path=".", members=None):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers().
    """
    if members is None:
        members = self

    deferred_dirs = []
    for member in members:
        if member.isdir():
            # Directories are extracted from a copy with mode 0o700; the
            # original entry is kept so its real attributes can be applied
            # once everything has been extracted.
            deferred_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700
        # Attributes are not set on directories here; see the loop below.
        self.extract(member, path, set_attrs=not member.isdir())

    # Walk the directories in reverse name order.
    for dirinfo in reversed(sorted(deferred_dirs, key=lambda d: d.name)):
        dirpath = os.path.join(path, dirinfo.name)
        try:
            self.chown(dirinfo, dirpath)
            self.utime(dirinfo, dirpath)
            self.chmod(dirinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % e)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def chmod(self, tarinfo, targetpath):
    """Apply ``tarinfo.mode`` to ``targetpath``.

    Does nothing when ``os`` has no ``chmod``.  An ``EnvironmentError``
    from the underlying call is reported as ``ExtractError``.
    """
    if not hasattr(os, 'chmod'):
        return
    try:
        os.chmod(targetpath, tarinfo.mode)
    except EnvironmentError:
        raise ExtractError("could not change mode")
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def copymode(src, dst):
    """Copy the permission bits of ``src`` onto ``dst``.

    Does nothing when ``os`` has no ``chmod``.
    """
    if not hasattr(os, 'chmod'):
        return
    os.chmod(dst, stat.S_IMODE(os.stat(src).st_mode))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy access/modification times, mode bits and flags from src to dst.

    Each step is skipped when ``os`` lacks the corresponding function.
    An ``OSError`` from ``os.chflags`` is swallowed only when its errno is
    ``EOPNOTSUPP``; any other error propagates.
    """
    src_stat = os.stat(src)
    perm_bits = stat.S_IMODE(src_stat.st_mode)

    if hasattr(os, 'utime'):
        os.utime(dst, (src_stat.st_atime, src_stat.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, perm_bits)

    if not (hasattr(os, 'chflags') and hasattr(src_stat, 'st_flags')):
        return
    try:
        os.chflags(dst, src_stat.st_flags)
    except OSError as why:
        unsupported = (hasattr(errno, 'EOPNOTSUPP')
                       and why.errno == errno.EOPNOTSUPP)
        if not unsupported:
            raise
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def rmtree_errorhandler(func, path, exc_info):
    """On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown.  We catch that here, remove the
    read-only attribute, and hopefully continue without problems.

    ``func`` is retried on ``path`` after the mode has been set to
    ``stat.S_IWRITE``.  When the ``S_IREAD`` bit is not set on ``path`` the
    active exception is re-raised instead.  ``exc_info`` is not inspected.
    """
    if not os.stat(path).st_mode & stat.S_IREAD:
        raise

    # convert to read/write, then repeat the operation that failed
    os.chmod(path, stat.S_IWRITE)
    func(path)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    When `flatten` is true and the archive has a single leading directory,
    that leading component is stripped from every member path.
    """
    ensure_dir(location)
    with open(filename, 'rb') as zipfp:
        archive = zipfile.ZipFile(zipfp, allowZip64=True)
        strip_leading = has_leading_dir(archive.namelist()) and flatten
        for info in archive.infolist():
            name = info.filename
            data = archive.read(name)
            relative = split_leading_dir(name)[1] if strip_leading else name
            dest = os.path.join(location, relative)

            if dest.endswith('/') or dest.endswith('\\'):
                # A directory
                ensure_dir(dest)
                continue

            ensure_dir(os.path.dirname(dest))
            out = open(dest, 'wb')
            try:
                out.write(data)
            finally:
                out.close()
                # The mode check lives in the finally clause, so it also
                # runs when the write above raised.
                mode = info.external_attr >> 16
                # if mode and regular file and any execute permissions for
                # user/group/world?
                if mode and stat.S_ISREG(mode) and mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(dest, (0o777 - current_umask() | 0o111))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def chmod(path, mode):
    """Best-effort chmod: log the attempt and log (not raise) ``os.error``."""
    log.debug("changing mode of %s to %o", path, mode)
    try:
        _chmod(path, mode)
    except os.error as err:
        # Failure is only reported at debug level; the caller is not told.
        log.debug("chmod failed: %s", err)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the zip archive `filename` beneath `extract_dir`.

    Raises ``UnrecognizedFormat`` when ``zipfile.is_zipfile()`` rejects
    `filename`.  `progress_filter` is called with each member's archive name
    and proposed destination path; the value it returns is used as the
    destination, and a falsy value skips the member.  See
    ``unpack_archive()`` for more on `progress_filter`.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    with ContextualZipFile(filename) as archive:
        for member in archive.infolist():
            member_name = member.filename
            parts = member_name.split('/')

            # Never extract absolute names or names containing a '..' part.
            if member_name.startswith('/') or '..' in parts:
                continue

            destination = progress_filter(
                member_name, os.path.join(extract_dir, *parts)
            )
            if not destination:
                continue

            # Both directory and file members go through ensure_directory;
            # only file members have content to write afterwards.
            ensure_directory(destination)
            if not member_name.endswith('/'):
                payload = archive.read(member.filename)
                with open(destination, 'wb') as out:
                    out.write(payload)

            # The upper 16 bits of external_attr are applied as the mode
            # whenever they are non-zero.
            mode_bits = member.external_attr >> 16
            if mode_bits:
                os.chmod(destination, mode_bits)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def extractall(self, path=".", members=None, *, numeric_owner=False):
        """Extract every member of the archive beneath `path`.

           `path` defaults to the current working directory. `members' is
           optional and must be a subset of the list returned by
           getmembers(); when omitted the archive itself is iterated.
           Directories are first extracted with mode 0o700 and without
           attributes; their owner, modification time and permissions are
           applied afterwards. If `numeric_owner` is True, only the numbers
           for user/group names are used and not the names.
        """
        source = self if members is None else members
        deferred_dirs = []

        for member in source:
            if not member.isdir():
                self.extract(member, path, set_attrs=True,
                             numeric_owner=numeric_owner)
                continue
            # Remember the real entry, but extract a copy carrying a safe
            # mode; its attributes are set in the second pass below.
            deferred_dirs.append(member)
            placeholder = copy.copy(member)
            placeholder.mode = 0o700
            self.extract(placeholder, path, set_attrs=False,
                         numeric_owner=numeric_owner)

        # Walk directories in reverse name order when applying owner, mtime
        # and filemode.
        for member in reversed(sorted(deferred_dirs, key=lambda d: d.name)):
            dirpath = os.path.join(path, member.name)
            try:
                self.chown(member, dirpath, numeric_owner=numeric_owner)
                self.utime(member, dirpath)
                self.chmod(member, dirpath)
            except ExtractError as exc:
                if self.errorlevel > 1:
                    raise
                self._dbg(1, "tarfile: %s" % exc)
项目:sipxecs-voicemail-transcription    作者:andrewsauder    | 项目源码 | 文件源码
def _setupSock(self):
        """Create, bind and start listening on the socket named by ``self.sockStr``.

        A value beginning with ``inet:`` (case-insensitive) is split on ``:``
        into exactly three fields -- prefix, host, port -- and bound as a TCP
        socket with SO_REUSEADDR set.  Any other value is used as the
        filesystem path of a UNIX stream socket: an existing entry at that
        path is unlinked first, and after binding the path is chmod-ed to
        ``self.sockChmod``.  The socket is stored on ``self.sock`` with a
        3 second timeout and a listen backlog of ``self.listenq``.
        """
        spec = self.sockStr
        if not spec.lower().startswith('inet:'):
            # Remove whatever already occupies the path before binding to it.
            if os.path.exists(spec):
                os.unlink(spec)
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock.bind(spec)
            os.chmod(spec, self.sockChmod)
        else:
            # Exactly three ':'-separated fields are expected here.
            _scheme, host, port = spec.split(':')
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind((host, int(port)))
        self.sock.settimeout(3)
        self.sock.listen(self.listenq)
项目:sipxecs-voicemail-transcription    作者:andrewsauder    | 项目源码 | 文件源码
def _setupSock(self):
        """Create, bind and start listening on the socket named by ``self.sockStr``.

        A value beginning with ``inet:`` (case-insensitive) is split on ``:``
        into exactly three fields -- prefix, host, port -- and bound as a TCP
        socket with SO_REUSEADDR set.  Any other value is used as the
        filesystem path of a UNIX stream socket: an existing entry at that
        path is unlinked first, and after binding the path is chmod-ed to
        ``self.sockChmod``.  The socket is stored on ``self.sock`` with a
        3 second timeout and a listen backlog of ``self.listenq``.
        """
        spec = self.sockStr
        if not spec.lower().startswith('inet:'):
            # Remove whatever already occupies the path before binding to it.
            if os.path.exists(spec):
                os.unlink(spec)
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock.bind(spec)
            os.chmod(spec, self.sockChmod)
        else:
            # Exactly three ':'-separated fields are expected here.
            _scheme, host, port = spec.split(':')
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind((host, int(port)))
        self.sock.settimeout(3)
        self.sock.listen(self.listenq)
项目:fuel-nailgun-extension-iac    作者:openstack    | 项目源码 | 文件源码
def _create_key_file(self, repo_name, data):
        """Write `data` to the key file for `repo_name` with mode 0o600.

        The destination comes from ``self._get_key_path(repo_name)``.  The
        file is created with owner-only permissions from the start, and an
        already existing file is tightened before anything is written, so
        the key material is never on disk under a looser mode.

        :param repo_name: passed to ``self._get_key_path`` to locate the file.
        :param data: text written as the entire content of the key file.
        """
        key_path = self._get_key_path(repo_name)
        # Creating via os.open lets the file be born with 0o600 instead of
        # the umask default that a plain open(..., 'w') would give it.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        fd = os.open(key_path, flags, 0o600)
        with os.fdopen(fd, 'w') as key_file:
            # The mode argument above is ignored for a pre-existing file, so
            # restrict it explicitly before the secret is written.
            os.chmod(key_path, 0o600)
            key_file.write(data)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def extractall(self, path=".", members=None):
        """Extract every member of the archive beneath `path`.

           `path` defaults to the current working directory. `members' is
           optional and must be a subset of the list returned by
           getmembers(); when omitted the archive itself is iterated.
           Directories are first extracted with mode 0o700 and without
           attributes; their owner, modification time and permissions are
           applied afterwards.
        """
        source = self if members is None else members
        deferred_dirs = []

        for member in source:
            if not member.isdir():
                self.extract(member, path, set_attrs=True)
                continue
            # Remember the real entry, but extract a copy carrying a safe
            # mode; its attributes are set in the second pass below.
            deferred_dirs.append(member)
            placeholder = copy.copy(member)
            placeholder.mode = 0o700
            self.extract(placeholder, path, set_attrs=False)

        # Walk directories in reverse name order when applying owner, mtime
        # and filemode.
        for member in reversed(sorted(deferred_dirs, key=lambda d: d.name)):
            dirpath = os.path.join(path, member.name)
            try:
                self.chown(member, dirpath)
                self.utime(member, dirpath)
                self.chmod(member, dirpath)
            except ExtractError as exc:
                if self.errorlevel > 1:
                    raise
                self._dbg(1, "tarfile: %s" % exc)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def chmod(self, tarinfo, targetpath):
        """Apply ``tarinfo.mode`` to the file at `targetpath`.

        Does nothing when the ``os`` module has no ``chmod``.  An
        EnvironmentError from the mode change is reported as
        ``ExtractError("could not change mode")``.
        """
        if not hasattr(os, 'chmod'):
            return
        try:
            os.chmod(targetpath, tarinfo.mode)
        except EnvironmentError:
            raise ExtractError("could not change mode")
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def copymode(src, dst):
    """Give `dst` the same permission bits as `src`.

    Only the bits selected by ``stat.S_IMODE`` are copied.  Nothing happens
    (and `src` is not even stat-ed) when the ``os`` module has no ``chmod``.
    """
    if not hasattr(os, 'chmod'):
        return
    permission_bits = stat.S_IMODE(os.stat(src).st_mode)
    os.chmod(dst, permission_bits)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy mode bits, access/modification times and flags from src to dst.

    Each piece is applied only when the ``os`` module offers the matching
    call (``utime``, ``chmod``, ``chflags``); flags additionally require the
    stat result of `src` to carry ``st_flags``.  An OSError from
    ``os.chflags`` is ignored only when its errno is ``errno.EOPNOTSUPP``;
    any other failure is re-raised.
    """
    info = os.stat(src)
    permission_bits = stat.S_IMODE(info.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (info.st_atime, info.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, permission_bits)
    if not (hasattr(os, 'chflags') and hasattr(info, 'st_flags')):
        return
    try:
        os.chflags(dst, info.st_flags)
    except OSError as err:
        # Tolerate only "operation not supported"; if errno does not even
        # define that code, every failure propagates.
        tolerated = (hasattr(errno, 'EOPNOTSUPP') and
                     err.errno == errno.EOPNOTSUPP)
        if not tolerated:
            raise