Python stat 模块,S_IRUSR 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_IRUSR

项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def t05(w,h):
    """Check that run_gtest() rejects a result directory that is not writable.

    Args:
        w: workspace object; only ``make_tempdir()`` is used here.
        h: handset object; only ``run_gtest(target, result_path)`` is used.

    Returns:
        True if run_gtest() raised the expected error, False otherwise.
    """
    pretty = '%s t5' % __file__
    print(pretty)

    result_path = w.make_tempdir()
    os.chmod(result_path, stat.S_IRUSR) # user read permission only
    try:
        h.run_gtest('some_target', result_path)
        # Getting here means no error was raised for the read-only
        # directory, which is exactly the failure this test looks for.
        print(
            'FAIL %s: expected error (not writable result directory): %s'
            % (pretty, result_path)
        )
        return False
    except Exception as e:
        # Exceptions only carry .message on Python 2; fall back to str(e).
        message = getattr(e, 'message', str(e))
        if message != 'result_path not a writable directory: %s' % result_path:
            print('FAIL %s: wrong error message: %s' % (pretty, str(e)))
            return False
    finally:
        os.removedirs(result_path)
    return True

# successful execution of run_gtest(): check files on host, verify their
#   content and that traces from the run are removed from the handset
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def t05(w,h):
    """Check that run_gtest() rejects a result directory that is not writable.

    Args:
        w: workspace object; only ``make_tempdir()`` is used here.
        h: handset object; only ``run_gtest(target, result_path)`` is used.

    Returns:
        True if run_gtest() raised the expected error, False otherwise.
    """
    pretty = '%s t5' % __file__
    print(pretty)

    result_path = w.make_tempdir()
    os.chmod(result_path, stat.S_IRUSR) # user read permission only
    try:
        h.run_gtest('some_target', result_path)
        # Getting here means no error was raised for the read-only
        # directory, which is exactly the failure this test looks for.
        print(
            'FAIL %s: expected error (not writable result directory): %s'
            % (pretty, result_path)
        )
        return False
    except Exception as e:
        # Exceptions only carry .message on Python 2; fall back to str(e).
        message = getattr(e, 'message', str(e))
        if message != 'result_path not a writable directory: %s' % result_path:
            print('FAIL %s: wrong error message: %s' % (pretty, str(e)))
            return False
    finally:
        os.removedirs(result_path)
    return True

# successful execution of run_gtest(): check files on host, verify their
#   content and that traces from the run are removed from the handset
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def commit(self):
        """
        Persist the configuration and restrict the file to its owner.

        The configuration object is written and immediately reloaded, then
        the file at ``self.conf_path`` is made readable and writable by the
        owner only.

        Raises:
            EXAConfError: if the permissions cannot be changed.
        """
        self.config.write()
        # Reloading forces type conversion: parameters that were added as
        # lists at runtime come back as strings, as if added manually.
        self.config.reload()
        owner_rw = stat.S_IRUSR | stat.S_IWUSR
        try:
            os.chmod(self.conf_path, owner_rw)
        except OSError as err:
            raise EXAConfError("Failed to change permissions for '%s': %s" % (self.conf_path, err))
#}}}

#{{{ Platform supported
项目:nitro    作者:KVM-VMI    | 项目源码 | 文件源码
def main(args):
    """Dump the RAM of a libvirt domain to ``<vm_name>.raw`` in the current directory.

    ``args`` is a mapping that provides the domain name under ``'<vm_name>'``.
    """
    vm_name = args['<vm_name>']
    # look the domain up through the system libvirt connection
    con = libvirt.open('qemu:///system')
    domain = con.lookupByName(vm_name)

    dump_name = '{}.raw'.format(vm_name)
    path = os.path.join(os.getcwd(), dump_name)
    # opening for writing creates (or truncates) the dump file
    with open(path, 'w'):
        # let everyone read and write the file
        world_rw = (stat.S_IRUSR | stat.S_IWUSR |
                    stat.S_IRGRP | stat.S_IWGRP |
                    stat.S_IROTH | stat.S_IWOTH)
        os.chmod(path, world_rw)
        # memory-only core dump in raw format
        flags = libvirt.VIR_DUMP_MEMORY_ONLY
        dumpformat = libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW
        domain.coreDumpWithFormat(path, dumpformat, flags)
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def cleanupdir(temporarydir):
    """Best-effort recursive removal of ``temporarydir``.

    Before deleting, every directory and file below ``temporarydir`` that is
    not a symbolic link is given owner read/write/execute permission, so
    that read-only or inaccessible entries do not block the removal.
    A failure to remove the tree is silently ignored.
    """
    owner_rwx = stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR
    ## os.walk() is top-down: sub directories are made accessible
    ## before the walk descends into them
    for root, dirnames, filenames in os.walk(temporarydir):
        ## make sure all directories can be accessed
        for d in dirnames:
            dirpath = os.path.join(root, d)
            if not os.path.islink(dirpath):
                os.chmod(dirpath, owner_rwx)
        for p in filenames:
            filepath = os.path.join(root, p)
            try:
                if not os.path.islink(filepath):
                    os.chmod(filepath, owner_rwx)
            except Exception:
                ## a file that cannot be chmod'ed may still be removable
                pass
    try:
        shutil.rmtree(temporarydir)
    except Exception:
        ## nothing that can be done right now, so just give up
        pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
        # Importing must fail silently when stale bytecode cannot be
        # rewritten because the bytecode file is read-only.
        with source_util.create_modules('_temp') as mapping:
            source_path = mapping['_temp']
            # Compile, then overwrite the first four bytes so the
            # bytecode has to be regenerated on import.
            py_compile.compile(source_path)
            bytecode_path = imp.cache_from_source(source_path)
            with open(bytecode_path, 'r+b') as bytecode_file:
                bytecode_file.seek(0)
                bytecode_file.write(b'\x00\x00\x00\x00')
            # Leave only the read bits on the bytecode file.
            read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
            os.chmod(bytecode_path, read_only)
            try:
                # Should not raise IOError!
                self.import_(source_path, '_temp')
            finally:
                # Restore owner write access so clean-up can succeed.
                os.chmod(bytecode_path, stat.S_IWUSR)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
        # Issue 6070: on posix the execute bit of a .py file used to be
        # carried over to its .pyc file, although bytecode is not executable.
        with temp_umask(0o022):
            sys.path.insert(0, os.curdir)
            try:
                fname = TESTFN + os.extsep + "py"
                # empty source file that is readable and executable by all
                open(fname, 'w').close()
                read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
                exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
                os.chmod(fname, read_bits | exec_bits)
                fn = imp.cache_from_source(fname)
                unlink(fn)
                __import__(TESTFN)
                if not os.path.exists(fn):
                    self.fail("__import__ did not result in creation of "
                              "either a .pyc or .pyo file")
                # the cached file must carry the read bits only
                cached_mode = stat.S_IMODE(os.stat(fn).st_mode)
                self.assertEqual(cached_mode, read_bits)
            finally:
                del sys.path[0]
                remove_files(TESTFN)
                unload(TESTFN)
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def set_own_perm(usr, dir_list):
    """Recursively hand the given directories over to ``usr``.

    Ownership is set to the user ``usr`` and the group of the same name.
    Directories get owner/group read, write and execute; files get
    owner/group read and write.
    """
    uid = pwd.getpwnam(usr).pw_uid
    gid = grp.getgrnam(usr).gr_gid
    perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    perm_mask_rwx = perm_mask_rw | stat.S_IXUSR | stat.S_IXGRP

    def claim(path, mode):
        # take ownership first, then apply the permission bits
        os.chown(path, uid, gid)
        os.chmod(path, mode)

    for cdir in dir_list:
        claim(cdir, perm_mask_rwx)
        for croot, sub_dirs, cfiles in os.walk(cdir):
            for fs_name in sub_dirs:
                claim(os.path.join(croot, fs_name), perm_mask_rwx)
            for fs_name in cfiles:
                claim(os.path.join(croot, fs_name), perm_mask_rw)
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def set_own_perm(usr, dir_list):
    """Recursively hand the given directories over to ``usr``.

    Ownership is set to the user ``usr`` and the group of the same name.
    Directories get owner/group read, write and execute; files get
    owner/group read and write.
    """
    uid = pwd.getpwnam(usr).pw_uid
    gid = grp.getgrnam(usr).gr_gid
    perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    perm_mask_rwx = perm_mask_rw | stat.S_IXUSR | stat.S_IXGRP

    def claim(path, mode):
        # take ownership first, then apply the permission bits
        os.chown(path, uid, gid)
        os.chmod(path, mode)

    for cdir in dir_list:
        claim(cdir, perm_mask_rwx)
        for croot, sub_dirs, cfiles in os.walk(cdir):
            for fs_name in sub_dirs:
                claim(os.path.join(croot, fs_name), perm_mask_rwx)
            for fs_name in cfiles:
                claim(os.path.join(croot, fs_name), perm_mask_rw)
项目:Polyglot    作者:UniversalDevicesInc    | 项目源码 | 文件源码
def write(self):
        """ Writes configuration file

        The encoded configuration is dumped as JSON to the temporary file
        ``self._filetmp``, restricted to owner read/write and then moved
        over ``self._file``. ``self._writing`` acts as a busy flag: the call
        waits up to five seconds for it to clear and gives up, logging an
        error, if it does not.
        """
        # wait for file to unlock. Timeout after 5 seconds
        for _ in range(5):
            if not self._writing:
                self._writing = True
                break
            time.sleep(1)
        else:
            _LOGGER.error('Could not write to configuration file. It is busy.')
            return

        try:
            # dump JSON to config file
            encoded = self.encode()
            # the context manager guarantees the data is flushed and the
            # handle closed before the file is chmod'ed and moved
            with open(self._filetmp, 'w') as tmp_file:
                json.dump(encoded, tmp_file, sort_keys=True, indent=4,
                          separators=(',', ': '))
            os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR)
            try:
                shutil.move(self._filetmp, self._file)
                _LOGGER.debug('Config file succesfully updated.')
            except shutil.Error as e:
                _LOGGER.error('Failed to move temp config file to original error: ' + str(e))
        finally:
            # always unlock, even when encoding or writing raised
            self._writing = False
        _LOGGER.debug('Wrote configuration file')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
        # Issue 6070: under posix .pyc files got their execute bit set if
        # the .py file had the execute bit set, but they aren't executable.
        # 0o022 is the octal spelling accepted by both Python 2.6+ and 3;
        # the bare 022 form is a syntax error on Python 3.
        oldmask = os.umask(0o022)
        sys.path.insert(0, os.curdir)
        try:
            fname = TESTFN + os.extsep + "py"
            # create an empty source file
            open(fname, 'w').close()
            os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                             stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
            __import__(TESTFN)
            # the import is expected to leave a .pyc or a .pyo file
            # next to the source
            fn = fname + 'c'
            if not os.path.exists(fn):
                fn = fname + 'o'
                if not os.path.exists(fn):
                    self.fail("__import__ did not result in creation of "
                              "either a .pyc or .pyo file")
            s = os.stat(fn)
            # only the read bits may be present on the compiled file
            self.assertEqual(stat.S_IMODE(s.st_mode),
                             stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        finally:
            os.umask(oldmask)
            remove_files(TESTFN)
            unload(TESTFN)
            del sys.path[0]
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_readonly_files(self):
        # A database whose files and directory are read-only must still
        # be readable when opened in 'r' mode.
        db_dir = _fname
        os.mkdir(db_dir)
        try:
            fname = os.path.join(db_dir, 'db')
            # populate a fresh database
            f = dumbdbm.open(fname, 'n')
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            f.close()
            # drop write access from both database files and the directory
            for suffix in (".dir", ".dat"):
                os.chmod(fname + suffix, stat.S_IRUSR)
            os.chmod(db_dir, stat.S_IRUSR|stat.S_IXUSR)
            f = dumbdbm.open(fname, 'r')
            self.assertEqual(sorted(f.keys()), sorted(self._dict))
            f.close()  # don't write
        finally:
            test_support.rmtree(db_dir)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
        # Issue 6070: under posix .pyc files got their execute bit set if
        # the .py file had the execute bit set, but they aren't executable.
        # 0o022 is the octal spelling accepted by both Python 2.6+ and 3;
        # the bare 022 form is a syntax error on Python 3.
        oldmask = os.umask(0o022)
        sys.path.insert(0, os.curdir)
        try:
            fname = TESTFN + os.extsep + "py"
            # create an empty source file
            open(fname, 'w').close()
            os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                             stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
            __import__(TESTFN)
            # the import is expected to leave a .pyc or a .pyo file
            # next to the source
            fn = fname + 'c'
            if not os.path.exists(fn):
                fn = fname + 'o'
                if not os.path.exists(fn):
                    self.fail("__import__ did not result in creation of "
                              "either a .pyc or .pyo file")
            s = os.stat(fn)
            # only the read bits may be present on the compiled file
            self.assertEqual(stat.S_IMODE(s.st_mode),
                             stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        finally:
            os.umask(oldmask)
            remove_files(TESTFN)
            unload(TESTFN)
            del sys.path[0]
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_readonly_files(self):
        # A database whose files and directory are read-only must still
        # be readable when opened in 'r' mode.
        db_dir = _fname
        os.mkdir(db_dir)
        try:
            fname = os.path.join(db_dir, 'db')
            # populate a fresh database
            f = dumbdbm.open(fname, 'n')
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            f.close()
            # drop write access from both database files and the directory
            for suffix in (".dir", ".dat"):
                os.chmod(fname + suffix, stat.S_IRUSR)
            os.chmod(db_dir, stat.S_IRUSR|stat.S_IXUSR)
            f = dumbdbm.open(fname, 'r')
            self.assertEqual(sorted(f.keys()), sorted(self._dict))
            f.close()  # don't write
        finally:
            test_support.rmtree(db_dir)
项目:orangecloud-client    作者:antechrestos    | 项目源码 | 文件源码
def _init_client():
    """Interactively set up a new client and save its configuration.

    Prompts on stdin for the client id, client secret and redirect URI,
    creates the configuration directory with owner-only access if it is
    missing, runs the OAuth initialisation and saves the configuration.

    Returns:
        The initialised ``_CommandClient``.
    """
    global _configuration_directory

    def prompt(msg):
        # show the label, then read one line without its line ending
        sys.stdout.write('%s: ' % msg)
        return sys.stdin.readline().rstrip('\r\n')

    client_id = prompt('Client Id')
    client_secret = prompt('Client Secret')
    redirect_uri = prompt('Redirect Uri')
    if not os.path.exists(_configuration_directory):
        owner_only = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
        os.mkdir(_configuration_directory, owner_only)
    client = _CommandClient(client_id, client_secret, redirect_uri)
    _init_oauth_process(client)
    # first save: reaching this point means the configuration works
    client.save_configuration()
    return client
项目:ssh-ca-server    作者:commercehub-oss    | 项目源码 | 文件源码
def create_ca(self):
        """ Create a new CA

        Generates a passphrase-less key pair with ``ssh-keygen`` inside a
        private temporary directory, moves the private key and its ``.pub``
        counterpart into ``self.ca_path`` and makes both owner read-only.
        Nothing is moved if ``ssh-keygen`` did not produce both files.
        """
        import shutil
        import tempfile

        ca_key_file = self.full_path(self.ca_path, self.ca_name)
        ca_cert_file = '{}.pub'.format(ca_key_file)

        # mkdtemp() creates a directory only the current user can access,
        # so the private key never sits at a predictable path in shared /tmp
        temp_dir = tempfile.mkdtemp()
        try:
            temp_ca_key_file = os.path.join(temp_dir, self.ca_name)
            temp_ca_cert_file = '{}.pub'.format(temp_ca_key_file)

            subprocess.call(['ssh-keygen',
                             '-f', temp_ca_key_file,
                             '-q', '-P', ''])

            if not os.path.exists(self.ca_path):
                self.mkdir_recursive(self.ca_path)

            # If CA was successfully created move it to the correct location
            # and set appropriate permissions
            if os.path.isfile(temp_ca_key_file) and os.path.isfile(temp_ca_cert_file):
                # shutil.move() also works when the temporary directory and
                # ca_path are on different filesystems, where os.rename() fails
                shutil.move(temp_ca_cert_file, ca_cert_file)
                os.chmod(ca_cert_file, stat.S_IRUSR)

                shutil.move(temp_ca_key_file, ca_key_file)
                os.chmod(ca_key_file, stat.S_IRUSR)
        finally:
            # never leave key material behind in the temporary directory
            shutil.rmtree(temp_dir, ignore_errors=True)
项目:polyloader    作者:elfsternberg    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
        # Issue 6070: on posix the execute bit of a .py file used to be
        # carried over to its compiled file, although that is not executable.
        oldmask = os.umask(0o22)
        sys.path.insert(0, os.curdir)
        try:
            fname = TESTFN + os.extsep + "py"
            read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
            exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
            # empty source file, readable and executable by everyone
            open(fname, 'w').close()
            os.chmod(fname, read_bits | exec_bits)
            __import__(TESTFN)
            # look for the .pyc first, then fall back to the .pyo
            fn = fname + 'c'
            if not os.path.exists(fn):
                fn = fname + 'o'
                if not os.path.exists(fn):
                    self.fail("__import__ did not result in creation of "
                              "either a .pyc or .pyo file")
            compiled_mode = os.stat(fn).st_mode
            # some read bit is set, no execute bit is
            assert(compiled_mode & read_bits)
            assert(not (compiled_mode & exec_bits))
        finally:
            os.umask(oldmask)
            clean_tmpfiles(fname)
            unload(TESTFN)
            del sys.path[0]
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
        # Importing must fail silently when stale bytecode cannot be
        # rewritten because the bytecode file is read-only.
        with source_util.create_modules('_temp') as mapping:
            source_path = mapping['_temp']
            # Compile, then overwrite the first four bytes so the
            # bytecode has to be regenerated on import.
            py_compile.compile(source_path)
            bytecode_path = imp.cache_from_source(source_path)
            with open(bytecode_path, 'r+b') as bytecode_file:
                bytecode_file.seek(0)
                bytecode_file.write(b'\x00\x00\x00\x00')
            # Leave only the read bits on the bytecode file.
            read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
            os.chmod(bytecode_path, read_only)
            try:
                # Should not raise IOError!
                self.import_(source_path, '_temp')
            finally:
                # Restore owner write access so clean-up can succeed.
                os.chmod(bytecode_path, stat.S_IWUSR)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_mknod_dir_fd(self):
        # POSIX only specifies mknodat() for creating FIFOs, so that is
        # the one use exercised here.
        support.unlink(support.TESTFN)
        fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
        cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
        except OSError as exc:
            # Old systems may refuse mknod() to unprivileged users, or
            # only allow it for device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))
        finally:
            posix.close(cwd_fd)
项目:partysig    作者:str4d    | 项目源码 | 文件源码
def secure_file(fname, remove_existing=False):
    """Create ``fname`` exclusively with owner-only read/write permission.

    Args:
        fname: path of the file to create.
        remove_existing: when true, try to delete ``fname`` first; a
            failure to delete it is ignored.

    Returns:
        A write-only OS-level file descriptor for the new file.

    Raises:
        OSError: from ``os.open``, e.g. when the file already exists.
    """
    if remove_existing:
        # An existing file might carry a more permissive mode, so drop it.
        try:
            os.remove(fname)
        except OSError:
            pass

    # O_EXCL makes creation fail instead of reusing an existing file.
    open_flags = os.O_EXCL | os.O_CREAT | os.O_WRONLY
    owner_rw = stat.S_IWUSR | stat.S_IRUSR  # 0o600

    # Clear the umask so the requested mode is applied as given, and
    # put the previous umask back whatever happens.
    saved_umask = os.umask(0)
    try:
        return os.open(fname, open_flags, owner_rw)
    finally:
        os.umask(saved_umask)
项目:AmqpCode    作者:SVADemoAPP    | 项目源码 | 文件源码
def testXid(self):
    # An Xid must encode to the expected bytes and decode back to an
    # object with the same attributes.
    codec = StringCodec()
    original = Xid(format=0, global_id="gid", branch_id="bid")
    codec.write_compound(original)
    expected = '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
    assert codec.encoded == expected
    decoded = codec.read_compound(Xid)
    assert original.__dict__ == decoded.__dict__

#   def testLoadReadOnly(self):
#     spec = "amqp.0-10-qpid-errata.xml"
#     f = testrunner.get_spec_file(spec)
#     dest = tempfile.mkdtemp()
#     shutil.copy(f, dest)
#     shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
#     os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
#     fname = os.path.join(dest, spec)
#     load(fname)
#     assert not os.path.exists("%s.pcl" % fname)
项目:find_circ2    作者:rajewsky-lab    | 项目源码 | 文件源码
def store_index(self,ipath):
        """Write the chromosome index to ``ipath`` atomically.

        One tab-separated line per entry of ``self.chrom_stats`` (sorted by
        chromosome name) is written to a temporary file in the directory of
        ``ipath``. The file is synced to disk, made read-only for everyone
        and finally renamed to ``ipath``. If anything fails, the temporary
        file is removed and the error is re-raised.
        """
        self.logger.info("# indexed_fasta.store_index('%s')" % ipath)

        # write to tmp-file first and in the end rename in order to have this atomic 
        # otherwise parallel building of the same index may screw it up.

        import stat
        import tempfile
        tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False)
        try:
            for chrom in sorted(self.chrom_stats.keys()):
                ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size))

            # make sure everything is on disk: flush Python's own buffer
            # first, os.fsync() only covers data already handed to the OS
            tmp.flush()
            os.fsync(tmp.fileno())
            tmp.close()

            # make it readable by everyone
            os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

            # this is atomic on POSIX as we have created tmp in the same directory, 
            # therefore same filesystem
            os.rename(tmp.name,ipath)
        except Exception:
            # do not leave a stray temporary file behind
            tmp.close()
            if os.path.exists(tmp.name):
                os.remove(tmp.name)
            raise
项目:find_circ    作者:rajewsky-lab    | 项目源码 | 文件源码
def store_index(self,ipath):
        """Write the chromosome index to ``ipath`` atomically.

        One tab-separated line per entry of ``self.chrom_stats`` (sorted by
        chromosome name) is written to a temporary file in the directory of
        ``ipath``. The file is synced to disk, made read-only for everyone
        and finally renamed to ``ipath``. If anything fails, the temporary
        file is removed and the error is re-raised.
        """
        debug("# indexed_fasta.store_index('%s')" % ipath)

        # write to tmp-file first and in the end rename in order to have this atomic 
        # otherwise parallel building of the same index may screw it up.

        import stat
        import tempfile
        tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False)
        try:
            for chrom in sorted(self.chrom_stats.keys()):
                ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size))

            # make sure everything is on disk: flush Python's own buffer
            # first, os.fsync() only covers data already handed to the OS
            tmp.flush()
            os.fsync(tmp.fileno())
            tmp.close()

            # make it readable by everyone
            os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

            # this is atomic on POSIX as we have created tmp in the same directory, 
            # therefore same filesystem
            os.rename(tmp.name,ipath)
        except Exception:
            # do not leave a stray temporary file behind
            tmp.close()
            if os.path.exists(tmp.name):
                os.remove(tmp.name)
            raise
项目:qpid-python    作者:apache    | 项目源码 | 文件源码
def testXid(self):
    # An Xid must encode to the expected bytes and decode back to an
    # object with the same attributes.
    codec = StringCodec()
    original = Xid(format=0, global_id="gid", branch_id="bid")
    codec.write_compound(original)
    expected = '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
    assert codec.encoded == expected
    decoded = codec.read_compound(Xid)
    assert original.__dict__ == decoded.__dict__

#   def testLoadReadOnly(self):
#     spec = "amqp.0-10-qpid-errata.xml"
#     f = testrunner.get_spec_file(spec)
#     dest = tempfile.mkdtemp()
#     shutil.copy(f, dest)
#     shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
#     os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
#     fname = os.path.join(dest, spec)
#     load(fname)
#     assert not os.path.exists("%s.pcl" % fname)
项目:machammer    作者:filipp    | 项目源码 | 文件源码
def login(func):
    """Install Python function as a LoginHook.

    Returns a callable that accepts ``hook`` (default ``'login'``). When
    called it writes the source of ``func`` to a script under
    ``/var/root/Library``, makes the script owned by uid/gid 0 with owner
    read and execute permission only, registers it as a login hook (or as
    a logout hook for any other ``hook`` value) and returns the script path.
    """
    def func_wrapper(hook='login'):
        path = '/var/root/Library/mh_%shook.py' % hook

        writesource(func, path)

        # owned by root, and only root may read and execute it
        os.chown(path, 0, 0)
        os.chmod(path, stat.S_IRUSR | stat.S_IXUSR)

        register = hooks.login if hook == 'login' else hooks.logout
        register(path)

        return path

    return func_wrapper
项目:ClockworkVMs    作者:csd-dev-tools    | 项目源码 | 文件源码
def tearDown(self):
        '''
        Copy the build results back from the temporary home and shut down.

        Ownership and permissions of the source tree are fixed up first,
        then the results are rsync'ed into ``self.buildHome`` and the
        ramdisk and luggage are handed to ``self._exit`` with code 0.
        '''
        src_root = self.tmphome + "/src"
        self.mbl.chownR(self.keyuser, src_root)

        # add read access for everyone and write access for the group
        added_bits = (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                      stat.S_IWGRP)
        self.mbl.chmodR(added_bits, src_root, "append")
        self.libc.sync()
        self.libc.sync()
        # Copy back to pseudo-build directory
        call([self.RSYNC, "-aqp", src_root + "/MacBuild/dmgs", self.buildHome])
        call([self.RSYNC, "-aqp", src_root + "/", self.buildHome + "/builtSrc"])
        self.libc.sync()
        self.libc.sync()

        os.chdir(self.buildHome)
        self._exit(self.ramdisk, self.luggage, 0)
项目:deepsleepnet    作者:akaraspt    | 项目源码 | 文件源码
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None):

        to_zip = []
        if data_files is None:
            data_files = []

        # Handle main script
        to_zip.append(main_file_path)

        # Prepare the zip file
        zip_path = "/tmp/" + zip_file_name
        zipf = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True)
        for f in to_zip:
            zipf.write(f)
        zipf.close()

        # Handle other files & dirs
        for f in data_files:
            zipCommand = "zip -r -u -0 " + zip_path + " " + f
            call([zipCommand], shell=True)

        # Chmod 666 the zip file so it can be accessed
        os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH)

        return zip_path
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def test_create_script_perms(self):
        """Checks that create_script applies an explicitly requested mode.
        """
        script_file = os.path.join(self.root, 'script')
        # Ask for a non-default mode: read-only for user, group and others
        mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH

        utils.create_script(
            script_file,
            's6.run',
            mode=mode,
            user='testproid',
            home='home',
            shell='shell',
            _alias={
                's6_setuidgid': '/test/s6-setuidgid',
            }
        )

        # A regular file carrying exactly the requested bits:
        # S_IFREG | 0o444 == 0o100444 == 33060
        created_mode = utils.os.stat(script_file).st_mode
        self.assertEqual(created_mode, stat.S_IFREG | mode)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def save_app(manifest, container_dir, app_json=STATE_JSON):
    """Store the app manifest as JSON and return it as a frozen object.

    The manifest is written to ``app_json`` inside ``container_dir``; on
    posix systems the file is then made owner-writable and world-readable.
    """
    state_file = os.path.join(container_dir, app_json)

    def _dump_manifest(f):
        # the manifest carries the allocated vip and ports
        return json.dump(manifest, f)

    fs.write_safe(state_file, _dump_manifest)

    # chmod for the file to be world readable.
    if os.name == 'posix':
        world_readable = (stat.S_IWUSR | stat.S_IRUSR |
                          stat.S_IRGRP | stat.S_IROTH)
        os.chmod(state_file, world_readable)

    # Freeze the app data into an object
    return utils.to_obj(manifest)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
        # Issue 6070: under posix .pyc files got their execute bit set if
        # the .py file had the execute bit set, but they aren't executable.
        # 0o022 is the octal spelling accepted by both Python 2.6+ and 3;
        # the bare 022 form is a syntax error on Python 3.
        oldmask = os.umask(0o022)
        sys.path.insert(0, os.curdir)
        try:
            fname = TESTFN + os.extsep + "py"
            # create an empty source file
            open(fname, 'w').close()
            os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                             stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
            __import__(TESTFN)
            # the import is expected to leave a .pyc or a .pyo file
            # next to the source
            fn = fname + 'c'
            if not os.path.exists(fn):
                fn = fname + 'o'
                if not os.path.exists(fn):
                    self.fail("__import__ did not result in creation of "
                              "either a .pyc or .pyo file")
            s = os.stat(fn)
            # only the read bits may be present on the compiled file
            self.assertEqual(stat.S_IMODE(s.st_mode),
                             stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        finally:
            os.umask(oldmask)
            remove_files(TESTFN)
            unload(TESTFN)
            del sys.path[0]
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
        # Importing must fail silently when stale bytecode cannot be
        # rewritten because the bytecode file is read-only.
        with source_util.create_modules('_temp') as mapping:
            source_path = mapping['_temp']
            # Compile, then overwrite the first four bytes so the
            # bytecode has to be regenerated on import.
            py_compile.compile(source_path)
            bytecode_path = self.util.cache_from_source(source_path)
            with open(bytecode_path, 'r+b') as bytecode_file:
                bytecode_file.seek(0)
                bytecode_file.write(b'\x00\x00\x00\x00')
            # Leave only the read bits on the bytecode file.
            read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
            os.chmod(bytecode_path, read_only)
            try:
                # Should not raise OSError!
                self.import_(source_path, '_temp')
            finally:
                # Restore owner write access so clean-up can succeed.
                os.chmod(bytecode_path, stat.S_IWUSR)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_mknod(self):
        # POSIX only specifies mknod() for creating FIFOs, so that is
        # the one use exercised here.
        support.unlink(support.TESTFN)
        fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
        tolerated = (errno.EPERM, errno.EINVAL)
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0)
        except OSError as exc:
            # Old systems may refuse mknod() to unprivileged users, or
            # only allow it for device nodes.
            self.assertIn(exc.errno, tolerated)
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))

        # The same call spelled with keyword arguments
        support.unlink(support.TESTFN)
        try:
            posix.mknod(path=support.TESTFN, mode=fifo_mode, device=0,
                dir_fd=None)
        except OSError as exc:
            self.assertIn(exc.errno, tolerated)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_mknod_dir_fd(self):
        # POSIX only specifies mknodat() for creating FIFOs, so that is
        # the one use exercised here.
        support.unlink(support.TESTFN)
        fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
        cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
        except OSError as exc:
            # Old systems may refuse mknod() to unprivileged users, or
            # only allow it for device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))
        finally:
            posix.close(cwd_fd)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
        # Issue 6070: under posix .pyc files got their execute bit set if
        # the .py file had the execute bit set, but they aren't executable.
        # 0o022 is the octal spelling accepted by both Python 2.6+ and 3;
        # the bare 022 form is a syntax error on Python 3.
        oldmask = os.umask(0o022)
        sys.path.insert(0, os.curdir)
        try:
            fname = TESTFN + os.extsep + "py"
            # create an empty source file
            open(fname, 'w').close()
            os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                             stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
            __import__(TESTFN)
            # the import is expected to leave a .pyc or a .pyo file
            # next to the source
            fn = fname + 'c'
            if not os.path.exists(fn):
                fn = fname + 'o'
                if not os.path.exists(fn):
                    self.fail("__import__ did not result in creation of "
                              "either a .pyc or .pyo file")
            s = os.stat(fn)
            # only the read bits may be present on the compiled file
            self.assertEqual(stat.S_IMODE(s.st_mode),
                             stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        finally:
            os.umask(oldmask)
            remove_files(TESTFN)
            unload(TESTFN)
            del sys.path[0]
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def _get_netrc(cls):
    """Load netrc data through a comment-free temporary copy.

    The file named by ``cls.get_netrc_path()`` is read (an empty string is
    used if it does not exist), blank and comment lines are dropped, and
    the result is written to a private temporary file that is handed to
    ``cls._get_netrc_from_path``. A warning is printed to stderr if the
    original file is accessible by group or others.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
      st = os.stat(path)
      if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
        # sys.stderr.write() behaves the same on Python 2 and 3, unlike
        # the Python-2-only "print >>" statement.
        # NOTE(review): the message says the file cannot be used, yet it
        # is still read below - confirm which behaviour is intended.
        sys.stderr.write(
            'WARNING: netrc file %s cannot be used because its file '
            'permissions are insecure.  netrc file permissions should be '
            '600.\n' % path)
      with open(path) as fd:
        content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(l for l in content.splitlines()
                        if l.strip() and not l.strip().startswith('#'))
    with tempdir() as tdir:
      netrc_path = os.path.join(tdir, 'netrc')
      with open(netrc_path, 'w') as fd:
        fd.write(content)
      # owner read/write only, i.e. mode 600
      os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
      return cls._get_netrc_from_path(netrc_path)
项目:kibitzr    作者:kibitzr    | 项目源码 | 文件源码
def create_boilerplate():
    """
    Create kibitzr.yml and kibitzr-creds.yml in current directory
    if they do not exist.

    The credentials file is created with owner read/write permission only.
    """
    if not os.path.exists('kibitzr.yml'):
        with open('kibitzr.yml', 'wt') as fp:
            logger.info("Saving sample check in kibitzr.yml")
            fp.write(KIBITZR_YML)
    else:
        logger.info("kibitzr.yml already exists. Skipping")
    if not os.path.exists('kibitzr-creds.yml'):
        owner_rw = stat.S_IRUSR | stat.S_IWUSR
        # Create the file with restrictive permissions from the start, so
        # the credentials are never readable by others, not even briefly.
        fd = os.open('kibitzr-creds.yml',
                     os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_rw)
        with os.fdopen(fd, 'wt') as fp:
            logger.info("Creating kibitzr-creds.yml")
            fp.write(KIBITZR_CREDS_YML)
        # The umask can only clear bits; chmod pins the exact mode.
        os.chmod('kibitzr-creds.yml', owner_rw)
    else:
        logger.info("kibitzr-creds.yml already exists. Skipping")
项目:spark    作者:chenguolin    | 项目源码 | 文件源码
def _octal_to_perm(octal):
    perms = list("-" * 9)
    if octal & stat.S_IRUSR:
        perms[0] = "r"
    if octal & stat.S_IWUSR:
        perms[1] = "w"
    if octal & stat.S_IXUSR:
        perms[2] = "x"
    if octal & stat.S_IRGRP:
        perms[3] = "r"
    if octal & stat.S_IWGRP:
        perms[4] = "w"
    if octal & stat.S_IXGRP:
        perms[5] = "x"
    if octal & stat.S_IROTH:
        perms[6] = "r"
    if octal & stat.S_IWOTH:
        perms[7] = "w"
    if octal & stat.S_IXOTH:
        perms[8] = "x"
    return "".join(perms)
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def _get_netrc(cls):
    """Load the '.netrc' file through a sanitized temporary copy.

    Reads the file at ``cls.get_netrc_path()`` (a missing file is treated
    as empty), drops blank lines and lines whose first non-blank character
    is '#', writes the remainder to a temporary file with mode 0600 and
    returns the result of ``cls._get_netrc_from_path()`` for that copy.

    A warning is written to stderr when the original file grants any
    permission to group or others; the file is still read afterwards.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
      st = os.stat(path)
      if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
        # Use sys.stderr.write() rather than the Python 2-only
        # 'print >> sys.stderr' statement, which raises TypeError when it is
        # evaluated by Python 3. The output is the same on both versions.
        warning = (
            'WARNING: netrc file %s cannot be used because its file '
            'permissions are insecure.  netrc file permissions should be '
            '600.' % path)
        sys.stderr.write(warning + '\n')
      with open(path) as fd:
        content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(l for l in content.splitlines()
                        if l.strip() and not l.strip().startswith('#'))
    with tempdir() as tdir:
      netrc_path = os.path.join(tdir, 'netrc')
      with open(netrc_path, 'w') as fd:
        fd.write(content)
      # Owner read/write only (0600) for the sanitized copy.
      os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
      return cls._get_netrc_from_path(netrc_path)
项目:HaboMalHunter    作者:Tencent    | 项目源码 | 文件源码
def init_workspace(cfg, is_inplace=False):
    """Prepare the analysis workspace described by ``cfg``.

    With ``is_inplace`` set, the work is delegated to
    ``init_workspace_inplace``. Otherwise a fresh, empty directory
    ``BASE_HOME/cfg.exec_home`` is created (any existing one is removed
    first), ``cfg.target`` is copied into it and made owner-read-only, and
    ``cfg.workspace_dir`` / ``cfg.target_abs_path`` are set accordingly.
    The process exits with status 1 when ``cfg.target`` does not exist.
    """
    log.info("init workspace is_inplace:%r",is_inplace)
    if is_inplace:
        init_workspace_inplace(cfg)
        return

    # Start from a clean workspace: drop any leftover directory first.
    fresh_dir = os.path.join(BASE_HOME,cfg.exec_home)
    if os.path.exists(fresh_dir):
        shutil.rmtree(fresh_dir)
    os.mkdir(fresh_dir)
    cfg.workspace_dir = fresh_dir

    # Copy the target file into the workspace.
    if not os.path.exists(cfg.target):
        log.critical("%s dose not exist.", cfg.target)
        os._exit(1)
    else:
        shutil.copy(cfg.target, fresh_dir)
        copied_target = os.path.join(fresh_dir, os.path.basename(cfg.target))
        cfg.target_abs_path = copied_target
        os.chmod(copied_target, stat.S_IRUSR)
    log.info("Target absolute path: %s", cfg.target_abs_path)
    # The working directory is deliberately not changed here; it is changed
    # every time an analyzer starts.
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
        # Importing a module whose stale bytecode file is read-only must
        # fail silently instead of raising.
        read_only_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        with source_util.create_modules('_temp') as modules:
            source_path = modules['_temp']
            # Compile, then clobber the first four bytes so the bytecode
            # has to be re-created on import.
            py_compile.compile(source_path)
            pyc_path = self.util.cache_from_source(source_path)
            with open(pyc_path, 'r+b') as pyc_file:
                pyc_file.seek(0)
                pyc_file.write(b'\x00\x00\x00\x00')
            # Make the bytecode read-only.
            os.chmod(pyc_path, read_only_mode)
            try:
                # Should not raise OSError!
                self.import_(source_path, '_temp')
            finally:
                # Make writable for eventual clean-up.
                os.chmod(pyc_path, stat.S_IWUSR)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_mknod_dir_fd(self):
        # Create a FIFO with mknod() relative to a directory descriptor
        # (mknodat(); FIFO creation is the only use specified by POSIX).
        support.unlink(support.TESTFN)
        fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
        cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
        except OSError as err:
            # Some old systems don't allow unprivileged users to use
            # mknod(), or only support creating device nodes.
            self.assertIn(err.errno, (errno.EPERM, errno.EINVAL))
        else:
            created_mode = posix.stat(support.TESTFN).st_mode
            self.assertTrue(stat.S_ISFIFO(created_mode))
        finally:
            posix.close(cwd_fd)
项目:ansibilo    作者:Polyconseil    | 项目源码 | 文件源码
def write(filename, data, undisclosed=False):
    """
    Serialize data as JSON into a file, optionally owner-only accessible.

    The parent directory of ``filename`` is passed to ``makedirs`` before
    the file is opened.

    Args:
        filename (str): file path
        data: object handed to ``json.dump``
        undisclosed (bool): when true, the file mode is set to owner
            read/write before the data is written
    """
    parent_dir = os.path.dirname(filename)
    makedirs(parent_dir)
    with open(filename, 'w') as stream:
        if undisclosed:
            # Restrict access before any data reaches the file.
            owner_rw = stat.S_IRUSR | stat.S_IWUSR
            os.chmod(filename, owner_rw)
        json.dump(data, stream)
项目:sodium11    作者:trbs    | 项目源码 | 文件源码
def open_for_writing(filename, permissions=None, mode="w", force=False):
    """Yield a file object for a newly created file with exact permissions.

    The file is created exclusively (creation fails if it already exists)
    with ``permissions`` applied verbatim: the process umask is set to 0
    for the duration of the create call and restored afterwards.
    ``permissions`` defaults to owner read/write. With ``force`` an
    existing file is unlinked first. The file object is opened with
    ``mode`` and closed once the generator is resumed after the yield.
    """
    permissions = (
        stat.S_IRUSR | stat.S_IWUSR if permissions is None else permissions
    )

    if force and os.path.exists(filename):
        os.unlink(filename)

    create_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    # Clear the umask so the requested permission bits are not masked out.
    saved_umask = os.umask(0)
    try:
        descriptor = os.open(filename, create_flags, permissions)
    finally:
        os.umask(saved_umask)

    # Wrap the descriptor in a file object and hand it to the caller.
    with os.fdopen(descriptor, mode) as handle:
        yield handle
项目:Security-Research    作者:RhinoSecurityLabs    | 项目源码 | 文件源码
def printlog(message, logpath=False):
    """Print ``message`` and, when ``logpath`` is truthy, append it to that file.

    The message is written to the log file followed by a newline, then
    printed to standard output.
    """
    # The logging module would be a better fit; this direct approach is used
    # for the time being. Eventually multiple output formats (xml, json, ...)
    # are wanted, and a later iteration is meant to use a more secure logger
    # (exclusive creation with owner-only permissions). For now the results
    # are simply appended to the file.
    if logpath:
        with open(logpath, 'a') as log_file:
            log_file.write("{}\n".format(message))
    print("{}".format(message))
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def _get_netrc(cls):
    """Load the '.netrc' file through a sanitized temporary copy.

    Reads the file at ``cls.get_netrc_path()`` (a missing file is treated
    as empty), drops blank lines and lines whose first non-blank character
    is '#', writes the remainder to a temporary file with mode 0600 and
    returns the result of ``cls._get_netrc_from_path()`` for that copy.

    A warning is written to stderr when the original file grants any
    permission to group or others; the file is still read afterwards.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
      st = os.stat(path)
      if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
        # Use sys.stderr.write() rather than the Python 2-only
        # 'print >> sys.stderr' statement, which raises TypeError when it is
        # evaluated by Python 3. The output is the same on both versions.
        warning = (
            'WARNING: netrc file %s cannot be used because its file '
            'permissions are insecure.  netrc file permissions should be '
            '600.' % path)
        sys.stderr.write(warning + '\n')
      with open(path) as fd:
        content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(l for l in content.splitlines()
                        if l.strip() and not l.strip().startswith('#'))
    with tempdir() as tdir:
      netrc_path = os.path.join(tdir, 'netrc')
      with open(netrc_path, 'w') as fd:
        fd.write(content)
      # Owner read/write only (0600) for the sanitized copy.
      os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
      return cls._get_netrc_from_path(netrc_path)
项目:OnlineSchemaChange    作者:facebookincubator    | 项目源码 | 文件源码
def is_file_readable(filepath):
    """
    Tell whether the effective user of this process may read ``filepath``.

    When real and effective ids match, the answer comes straight from
    os.access(). Otherwise the owner, group or other read bit of the file
    is checked, depending on how the file's uid/gid relate to the
    effective uid/gid and the process's group list.
    """
    real_uid = os.getuid()
    effective_uid = os.geteuid()
    real_gid = os.getgid()
    effective_gid = os.getegid()

    # This is probably true most of the time, so just let os.access()
    # handle it.  Avoids potential bugs in the rest of this function.
    if (real_uid, real_gid) == (effective_uid, effective_gid):
        return os.access(filepath, os.R_OK)

    file_stat = os.stat(filepath)
    mode = file_stat.st_mode

    # Owner bit applies when the effective user owns the file.
    if file_stat.st_uid == effective_uid:
        return bool(mode & stat.S_IRUSR)

    # Group bit applies for the effective gid or any group in the list.
    process_groups = os.getgroups()
    in_file_group = (
        file_stat.st_gid == effective_gid
        or file_stat.st_gid in process_groups
    )
    read_bit = stat.S_IRGRP if in_file_group else stat.S_IROTH
    return bool(mode & read_bit)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def test_safe_makedirs(self):
        # Exercise safe_makedirs(): it must create nested directories,
        # tolerate being called again for an existing path, and still
        # propagate genuine errors such as permission denied.
        from softwarecenter.utils import safe_makedirs
        from tempfile import mkdtemp
        tmp = mkdtemp()
        try:
            # create base dir
            target = os.path.join(tmp, "foo", "bar")
            safe_makedirs(target)
            # we need the patch to ensure that the code is actually executed
            with patch("os.path.exists") as mock_:
                mock_.return_value = False
                self.assertTrue(os.path.isdir(target))
                # ensure that creating the base dir again does not crash
                safe_makedirs(target)
                self.assertTrue(os.path.isdir(target))
                # ensure we still get regular errors like permission denied
                # (0o400 == stat.S_IRUSR). The "0o" prefix is valid on both
                # Python 2.6+ and Python 3; a bare "0400" is a SyntaxError
                # on Python 3.
                parent = os.path.join(tmp, "foo")
                os.chmod(parent, 0o400)
                try:
                    self.assertRaises(OSError, safe_makedirs, target)
                finally:
                    # set back to stat.(S_IRUSR|S_IWUSR|S_IXUSR) to make
                    # rmtree work, even when the assertion above fails
                    os.chmod(parent, 0o700)
        finally:
            # cleanup, also when an assertion failed
            shutil.rmtree(tmp)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def test_no_write_access_for_cache_dir(self):
        """ test for bug LP: #688682 """
        # Scenario: the cache directory is not writable when the
        # softwarecenter.log module is imported. The assertions below expect
        # the import to leave a "<cache_dir>.0" entry behind and a writable
        # directory at the original path.
        # make the test cache dir non-writeable
        import softwarecenter.paths
        cache_dir = softwarecenter.paths.SOFTWARE_CENTER_CACHE_DIR
        # set not-writable (mode 0400)
        os.chmod(cache_dir, stat.S_IRUSR)
        self.assertFalse(os.access(cache_dir, os.W_OK))
        # and then start up the logger
        # NOTE(review): the behaviour under test presumably runs as a side
        # effect of this import, so it only triggers if the module has not
        # been imported earlier in the process -- confirm.
        import softwarecenter.log
        # Bare reference only; presumably keeps pyflakes from reporting the
        # import above as unused.
        softwarecenter.log # pyflakes
        # check that the old directory was moved aside (renamed with a ".0" appended)
        self.assertTrue(os.path.exists(cache_dir + ".0"))
        # exactly one backup is expected: no ".1" may exist
        self.assertFalse(os.path.exists(cache_dir + ".1"))
        # and check that the new directory was created and is now writable
        self.assertTrue(os.path.exists(cache_dir))
        self.assertTrue(os.access(cache_dir, os.W_OK))
项目:charm-helpers    作者:juju    | 项目源码 | 文件源码
def apply_playbook(playbook, tags=None, extra_vars=None):
    """Run ``ansible-playbook`` locally against ``playbook``.

    Before the run, ``juju_state_to_yaml`` is called to write to
    ``ansible_vars_path`` with owner read/write mode. ``tags`` (an iterable
    of strings) is joined with commas and passed via ``--tags`` when
    non-empty; ``extra_vars`` (a mapping) is passed via ``--extra-vars`` as
    space-separated ``key=value`` pairs when non-empty. Raises whatever
    ``subprocess.check_call`` raises when the command fails.
    """
    tags = ",".join(tags or [])
    charmhelpers.contrib.templating.contexts.juju_state_to_yaml(
        ansible_vars_path, namespace_separator='__',
        allow_hyphens_in_keys=False, mode=(stat.S_IRUSR | stat.S_IWUSR))

    # we want ansible's log output to be unbuffered
    child_env = dict(os.environ, PYTHONUNBUFFERED="1")

    command = ['ansible-playbook', '-c', 'local', playbook]
    if tags:
        command += ['--tags', '{}'.format(tags)]
    if extra_vars:
        assignments = " ".join(
            "%s=%s" % (key, value) for key, value in extra_vars.items())
        command += ['--extra-vars', assignments]
    subprocess.check_call(command, env=child_env)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testPermission(self):
        # removePath() is expected to raise BuildError when the tree contains
        # a directory that the owner cannot write to.
        with TemporaryDirectory() as tmp:
            # Build tmp/dir/file so there is something inside the directory.
            d = os.path.join(tmp, "dir")
            os.mkdir(d)
            with open(os.path.join(d, "file"), "w") as f:
                f.write("data")

            # Owner may read and traverse the directory but not write to it.
            os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
            self.assertRaises(BuildError, removePath, tmp)
            # Give the owner full access again; presumably so that
            # TemporaryDirectory can clean up on exit.
            os.chmod(d, stat.S_IRWXU)