Python os 模块,R_OK 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.R_OK

项目:openstack-deploy    作者:yaoice    | 项目源码 | 文件源码
def copy_from_host(module):
    """Read a file on the host and return its contents through ``module``.

    The ``src`` and ``compress`` entries are taken from ``module.params``.
    A missing or unreadable file is reported with ``module.fail_json``.
    The result is passed to ``module.exit_json`` as:

    * ``content`` -- base64 of the (optionally zlib-compressed) file bytes
    * ``sha1``    -- SHA-1 hex digest of the uncompressed bytes
    * ``mode``    -- the permission bits rendered with ``oct()``
    * ``source``  -- the path that was read
    """
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always covers the uncompressed bytes.
    sha1 = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=sha1,
        mode=mode,
        source=src,
    )
项目:cemu    作者:hugsy    | 项目源码 | 文件源码
def populate_memory(self, areas):
        """Map every requested area into the VM and optionally preload it.

        Each entry of ``areas`` is unpacked as
        ``(name, address, size, permission, input_file)``. The area is mapped
        with ``self.vm.mem_map`` and recorded in ``self.areas``. When
        ``input_file`` is given and readable, at most ``size`` bytes of it are
        written at ``address``. One "Setup" log line is emitted per area.

        Afterwards ``self.start_addr`` is set to the address of the ".text"
        area (a ``KeyError`` results if no such area was supplied) and
        ``self.end_addr`` to -1.

        Returns True.
        """
        for name, address, size, permission, input_file in areas:
            perm = self.unicorn_permissions(permission)
            self.vm.mem_map(address, size, perm)
            self.areas[name] = [address, size, permission,]

            msg = "Map %s @%x (size=%d,perm=%s)" % (name, address, size, permission)
            if input_file is not None and os.access(input_file, os.R_OK):
                # Use a context manager so the descriptor is closed right away
                # instead of whenever the file object is garbage collected.
                with open(input_file, 'rb') as source:
                    code = source.read()
                self.vm.mem_write(address, bytes(code[:size]))
                msg += " and content from '%s'" % input_file

            self.log(msg, "Setup")

        self.start_addr = self.areas[".text"][0]
        self.end_addr = -1
        return True
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _test_NoAccessDir(self, nodeName):
        """Check that ``device.load`` fails for a directory without permissions.

        Launches the device manager of ``nodeName``, makes sure the
        ``dom/noaccess`` directory under the SDR path exists with mode 0,
        and expects ``CF.LoadableDevice.LoadFail`` when the first registered
        device loads from it. The directory is removed afterwards.
        """
        devBooter, devMgr = self.launchDeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName)
        device = devMgr._get_registeredDevices()[0]
        fileMgr = self._domMgr._get_fileMgr()

        dirname = '/noaccess'
        testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
        no_permissions = 0  # no permission bits at all
        if os.path.exists(testdir):
            os.chmod(testdir, no_permissions)
        else:
            os.mkdir(testdir, no_permissions)

        try:
            can_enter = os.access(testdir, os.R_OK|os.X_OK)
            self.assertFalse(can_enter, 'Current user can still access directory')
            self.assertRaises(CF.LoadableDevice.LoadFail, device.load,
                              fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
        finally:
            os.rmdir(testdir)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_ExistsException(self):
        """Check that ``fileMgr.exists`` raises ``CF.InvalidFileName`` for a
        file inside a directory the current user cannot access.

        The ``dom/noaccess`` directory under the SDR path is created (or
        re-chmodded) with mode 0o644 and removed again afterwards.
        """
        self.assertNotEqual(self._domMgr, None)
        fileMgr = self._domMgr._get_fileMgr()

        # Makes sure that FileSystem::exists() throws correct exception and
        # doesn't kill domain for files in directories it cannot access
        dirname = '/noaccess'
        testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
        # 0o644 has no execute (search) bit, which is what the R_OK|X_OK
        # probe below relies on. The ``0o`` spelling is accepted by
        # Python 2.6+ and Python 3, unlike the legacy ``0644`` literal.
        if not os.path.exists(testdir):
            os.mkdir(testdir, 0o644)
        else:
            os.chmod(testdir, 0o644)

        try:
            self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
            self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
        finally:
            os.rmdir(testdir)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def check_fastq(fastq):
    """Validate a FASTQ path: readable, and suffix consistent with content.

    ``martian.exit`` is called when

    * the file is not readable,
    * the file can be read through gzip but its name does not end with
      ``cr_constants.GZIP_SUFFIX``, or
    * the name ends with that suffix but the file cannot be read through gzip.
    """
    # Check if fastq is readable
    if not os.access(fastq, os.R_OK):
        martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)

    # Check if fastq is gzipped by actually reading one byte through gzip.
    # Any ordinary failure means "not gzipped"; KeyboardInterrupt and
    # SystemExit are deliberately no longer swallowed.
    is_gzip_fastq = True
    try:
        with gzip.open(fastq) as f:
            f.read(1)
    except Exception:
        is_gzip_fastq = False

    has_gzip_suffix = fastq.endswith(cr_constants.GZIP_SUFFIX)

    # The messages name the suffix first and the file second, so the
    # arguments must be passed in that order.
    if is_gzip_fastq and not has_gzip_suffix:
        martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
    if not is_gzip_fastq and has_gzip_suffix:
        martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
项目:mbootuz    作者:ckhung    | 项目源码 | 文件源码
def find_boot_files(name, shortname, basedir):
    """Locate a boot file (vmlinuz or initrd) and return its path.

    With a non-empty ``name``: an absolute name is used as is, anything else
    is looked up under ``<basedir>/boot/``.

    Without a name: the first sorted match of ``<basedir>/<shortname>*`` is
    used (resolved with ``os.path.realpath``) if it is readable; otherwise
    the last sorted match of ``<basedir>/boot/<shortname>*`` is taken, with
    a warning when several candidates exist.

    The process exits via ``sys.exit`` when nothing is found or the chosen
    file is not readable.
    """
    if name:
        if name[0] == '/':
            fullpath = name
        else:
            fullpath = basedir + '/boot/' + name
    else:
        # try the (only) symlink at the root directory
        root_pattern = basedir + '/' + shortname + '*'
        root_matches = sorted(glob.glob(root_pattern))
        if root_matches and os.access(root_matches[0], os.R_OK):
            fullpath = os.path.realpath(root_matches[0])
        else:
            # try the highest numbered version at /boot
            boot_pattern = basedir + '/boot/' + shortname + '*'
            boot_matches = sorted(glob.glob(boot_pattern))
            if not boot_matches:
                sys.exit('cannot read ' + root_pattern + ' and cannot find ' + boot_pattern)
            fullpath = boot_matches[-1]
            if len(boot_matches) > 1:
                warnings.warn('found more than one ' + boot_pattern + ' , using ' + fullpath)

    if os.access(fullpath, os.R_OK):
        return fullpath
    sys.exit('failed to read ' + fullpath)
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def copy_from_host(module):
    """Read a file on the host and return its contents through ``module``.

    The ``src`` and ``compress`` entries are taken from ``module.params``.
    A missing or unreadable file is reported with ``module.fail_json``.
    The result is passed to ``module.exit_json`` as:

    * ``content`` -- base64 of the (optionally zlib-compressed) file bytes
    * ``sha1``    -- SHA-1 hex digest of the uncompressed bytes
    * ``mode``    -- the permission bits rendered with ``oct()``
    * ``source``  -- the path that was read
    """
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always covers the uncompressed bytes.
    sha1 = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=sha1,
        mode=mode,
        source=src,
    )
项目:sc-controller    作者:kozec    | 项目源码 | 文件源码
def check_access(filename, write_required=True):
    """
    Tell whether the user may read (and optionally write) ``filename``.

    When posix1e ACL support is available, the file's ACL entries are
    examined first: an entry for the effective user id, or for one of the
    process's groups, that grants the required rights yields True at once.
    In every other case the answer comes from ``os.access``.
    Returns True only if the user has all required access rights.
    """
    if HAVE_POSIX1E:
        def _grants_required(permset):
            # Read is always needed; write only when the caller asks for it.
            if not permset.test(posix1e.ACL_READ):
                return False
            return not write_required or permset.test(posix1e.ACL_WRITE)

        for entry in posix1e.ACL(file=filename):
            applies_to_user = (entry.tag_type == posix1e.ACL_USER
                               and entry.qualifier == os.geteuid())
            if applies_to_user and _grants_required(entry.permset):
                return True
            applies_to_group = (entry.tag_type == posix1e.ACL_GROUP
                                and entry.qualifier in os.getgroups())
            if applies_to_group and _grants_required(entry.permset):
                return True

    # Fall back to the plain POSIX permission check.
    wanted = os.R_OK | os.W_OK if write_required else os.R_OK
    return os.access(filename, wanted)
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def checkUSBStick(self):
        """Pick the hot-plugged USB partition to work with, if exactly one exists.

        Collects ``(description, mountpoint)`` for all mounted hot-plug
        partitions and keeps those not mounted at '/' whose description
        contains "USB". With exactly one candidate its mountpoint becomes
        ``self.target_dir`` and the md5 verification is started; otherwise
        the start wizard is shown.

        The debug output uses single-argument ``print(...)`` calls, which
        print the same text under Python 2 and are also valid Python 3.
        """
        self.target_dir = None
        allpartitions = [ (r.description, r.mountpoint) for r in harddiskmanager.getMountedPartitions(onlyhotplug = True)]
        print("[checkUSBStick] found partitions: %s" % (allpartitions,))
        usbpartition = []
        for x in allpartitions:
            print("%s %s %s %s" % (x, x[1] == '/', x[0].find("USB"), access(x[1], R_OK)))
            if x[1] != '/' and x[0].find("USB") > -1:  # and access(x[1], R_OK) is True:
                usbpartition.append(x)

        print(usbpartition)
        if len(usbpartition) == 1:
            self.target_dir = usbpartition[0][1]
            self.md5_passback = self.getFeed
            self.md5_failback = self.askStartWizard
            self.md5verify(self.stickimage_md5, self.target_dir)
        elif not usbpartition:
            print("[NFIFlash] needs to create usb flasher stick first!")
            self.askStartWizard()
        else:
            # More than one candidate: let the wizard sort it out.
            self.askStartWizard()
项目:python-application    作者:AGProjects    | 项目源码 | 文件源码
def __new__(cls, filename):
        """Return the instance for ``filename``, rebuilding it when stale.

        Every directory from ``process.get_config_directories()`` is searched
        for a readable ``filename``; the resolved paths and the newest
        modification time are collected. The instance cached in
        ``cls.instances`` is reused unless it is missing, was built from a
        different list of files, or is older than the newest file.
        """
        files = []
        timestamp = 0
        for directory in process.get_config_directories():
            config_file = os.path.realpath(os.path.join(directory, filename))
            if config_file in files or not os.access(config_file, os.R_OK):
                continue
            try:
                timestamp = max(timestamp, os.stat(config_file).st_mtime)
            except (OSError, IOError):
                # The file vanished or became unreadable in the meantime.
                continue
            files.append(config_file)

        cached = cls.instances.get(filename, None)
        stale = cached is None or cached.files != files or cached.timestamp < timestamp
        if not stale:
            return cached

        instance = object.__new__(cls)
        instance.parser = SafeConfigParser()
        # Option names are normalised by turning dashes into underscores.
        instance.parser.optionxform = lambda x: x.replace('-', '_')
        instance.files = instance.parser.read(files)
        instance.filename = filename
        instance.timestamp = timestamp
        cls.instances[filename] = instance
        return instance
项目:civet    作者:TheJacksonLaboratory    | 项目源码 | 文件源码
def effectivelyReadable(self):
        """Tell whether the effective user/group may read ``self.name``.

        When the real and effective ids are identical the question is
        delegated to ``os.access``. Otherwise the permission bits from
        ``os.stat`` are inspected by hand: the owner bit if the effective
        user owns the file, the group bit if the file's group is the
        effective group or one of the process's groups, the "other" bit in
        all remaining cases.
        """
        euid = os.geteuid()
        egid = os.getegid()

        # This is probably true most of the time, so just let os.access()
        # handle it.  Avoids potential bugs in the rest of this function.
        if (os.getuid(), os.getgid()) == (euid, egid):
            return os.access(self.name, os.R_OK)

        info = os.stat(self.name)

        # This may be wrong depending on the semantics of your OS.
        # i.e. if the file is -------r--, does the owner have access or not?
        # The same caveat applies to the group branch.
        if info.st_uid == euid:
            read_bit = stat.S_IRUSR
        elif info.st_gid == egid or info.st_gid in os.getgroups():
            read_bit = stat.S_IRGRP
        else:
            read_bit = stat.S_IROTH

        return info.st_mode & read_bit != 0
项目:civet    作者:TheJacksonLaboratory    | 项目源码 | 文件源码
def effectivelyReadable(self):
        """Tell whether the effective user/group may read ``self.name``.

        When the real and effective ids are identical the question is
        delegated to ``os.access``. Otherwise the permission bits from
        ``os.stat`` are inspected by hand: the owner bit if the effective
        user owns the file, the group bit if the file's group is the
        effective group or one of the process's groups, the "other" bit in
        all remaining cases.
        """
        euid = os.geteuid()
        egid = os.getegid()

        # This is probably true most of the time, so just let os.access()
        # handle it.  Avoids potential bugs in the rest of this function.
        if (os.getuid(), os.getgid()) == (euid, egid):
            return os.access(self.name, os.R_OK)

        info = os.stat(self.name)

        # This may be wrong depending on the semantics of your OS.
        # i.e. if the file is -------r--, does the owner have access or not?
        # The same caveat applies to the group branch.
        if info.st_uid == euid:
            read_bit = stat.S_IRUSR
        elif info.st_gid == egid or info.st_gid in os.getgroups():
            read_bit = stat.S_IRGRP
        else:
            read_bit = stat.S_IROTH

        return info.st_mode & read_bit != 0
项目:ansible-modules    作者:cytopia    | 项目源码 | 文件源码
def cfndiff_module_validation(module):
    '''
    Check that the module can run and was called correctly.

    Fails (via ``module.fail_json``) when boto3 or cfn_flip is unavailable,
    or when the ``template`` parameter does not exist, is not readable or
    is a directory. Returns ``module`` itself.
    '''
    # Both third-party libraries are hard requirements.
    if not HAS_BOTO3:
        module.fail_json(msg='boto3 is required. Try pip install boto3')
    if not HAS_CFN_FLIP:
        module.fail_json(msg='cfn_flip is required. Try pip install cfn_flip')

    template = module.params['template']
    # The filesystem checks run on the byte form of the path; the messages
    # keep using the original value.
    template_path = to_bytes(template, errors='surrogate_or_strict')

    if not os.path.exists(template_path):
        module.fail_json(msg="template %s not found" % (template))
    if not os.access(template_path, os.R_OK):
        module.fail_json(msg="template %s not readable" % (template))
    if os.path.isdir(template_path):
        module.fail_json(msg="diff does not support recursive diff of directory: %s" % (template))

    return module
项目:dingdang-robot    作者:wzpan    | 项目源码 | 文件源码
def get_pip_requirements(fname=os.path.join(dingdangpath.LIB_PATH,
                                            'requirements.txt')):
    """
    Parse the PIP requirements listed in a text file.

    Arguments:
        fname -- (optional) the requirement text file (Default:
                 'requirements.txt' inside dingdangpath.LIB_PATH)

    Returns:
        A list of pip requirement objects, or None if the file does not
        exist or is not readable
    """
    logger = logging.getLogger(__name__)

    if not os.access(fname, os.R_OK):
        logger.debug("PIP requirements file '%s' not found or not readable",
                     fname)
        return None

    requirements = list(pip.req.parse_requirements(fname))
    logger.debug("Found %d PIP requirements in file '%s'", len(requirements),
                 fname)
    return requirements
项目:intera_sdk    作者:RethinkRobotics    | 项目源码 | 文件源码
def _setup_image(self, image_path):
        """
        Load the image located at the specified path

        @type image_path: str
        @param image_path: the relative or absolute file path to the image file

        @rtype: sensor_msgs/Image or None
        @param: Returns sensor_msgs/Image if image convertable and None otherwise
        """
        if not os.access(image_path, os.R_OK):
            rospy.logerr("Cannot read file at '{0}'".format(image_path))
            return None

        img = cv2.imread(image_path)
        # cv2.imread signals an undecodable file by returning None rather
        # than raising; honour the documented "None otherwise" contract
        # instead of handing None to cv_bridge.
        if img is None:
            rospy.logerr("Cannot decode image at '{0}'".format(image_path))
            return None

        # Return msg
        return cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding="bgr8")
项目:django-gstorage    作者:fyndiq    | 项目源码 | 文件源码
def copydir(self, path):
        """
        Upload every file found under the local directory ``path``.

        Each name yielded by ``find_files(path)`` is wrapped in a ``Blob``
        bound to this object and uploaded under that same name, so the
        remote layout mirrors the local one. The uploads run one after the
        other before the method returns, so callers can report transfer
        errors.

        :type path: string
        :param path: relative or absolute path to the directory that needs to be copied
        :return: True when transfer is complete
        :raises OSError: ``path`` is not readable (missing or permission denied)

        NOTE(review): the original documentation also lists ``ValueError``
        (file size cannot be determined) and
        ``gcloud.exceptions.GCloudError`` (error response on upload) as
        coming from the upload call -- confirm against the Blob API.
        """
        readable = os.access(path, os.R_OK)
        if not readable:
            raise OSError('Permission denied')

        for filename in find_files(path):
            Blob(filename, self).upload_from_filename(filename)

        return True
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def register(self,name,URI):
        """Register ``URI`` under ``name`` by writing it to the name's file.

        The name and URI are validated first. While holding ``self.lock``:
        a ``Pyro.errors.NamingError`` is raised if the translated file name
        is already readable; otherwise the URI is written to it, followed
        by a newline, and the registration is passed to ``_dosynccall``.

        An ``IOError`` while writing is converted into a ``NamingError``
        ("(parent)group not found" for ENOENT, "parent is no group" for
        ENOTDIR, the error text otherwise).
        """
        origname,name=name,self.validateName(name)
        URI=self.validateURI(URI)
        fn=self.translate(name)
        self.lock.acquire()
        try:
            if os.access(fn,os.R_OK):
                Log.msg('NameServer','name already exists:',name)
                raise Pyro.errors.NamingError('name already exists',name)
            try:
                # Close the file deterministically so the entry is flushed to
                # disk before the registration is announced.
                with open(fn,'w') as entry:
                    entry.write(str(URI)+'\n')
                self._dosynccall("register",origname,URI)
                Log.msg('NameServer','registered',name,'with URI',str(URI))
            except IOError as x:
                if x.errno==errno.ENOENT:
                    raise Pyro.errors.NamingError('(parent)group not found')
                elif x.errno==errno.ENOTDIR:
                    raise Pyro.errors.NamingError('parent is no group')
                else:
                    raise Pyro.errors.NamingError(str(x))
        finally:
            self.lock.release()
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def deleteGroup(self,groupname):
        """Delete the group ``groupname`` together with its directory tree.

        The root group ':' may not be deleted. While holding ``self.lock``:
        a ``Pyro.errors.NamingError`` is raised if the translated directory
        is not readable; otherwise it is removed with ``shutil.rmtree`` and
        the deletion is passed to ``_dosynccall``.

        An ``OSError`` during removal is converted into a ``NamingError``
        ("group not found" for ENOENT, "is no group" for ENOTDIR, the error
        text otherwise).
        """
        groupname=self.validateName(groupname)
        if groupname==':':
            Log.msg('NameServer','attempt to deleteGroup root group')
            raise Pyro.errors.NamingError('not allowed to delete root group')
        dirnam = self.translate(groupname)
        self.lock.acquire()
        try:
            if not os.access(dirnam,os.R_OK):
                raise Pyro.errors.NamingError('group not found',groupname)
            try:
                shutil.rmtree(dirnam)
                self._dosynccall("deleteGroup",groupname)
                Log.msg('NameServer','deleted group',groupname)
            # ``except ... as`` is valid on Python 2.6+ and Python 3; the
            # comma form is a syntax error on Python 3.
            except OSError as x:
                if x.errno==errno.ENOENT:
                    raise Pyro.errors.NamingError('group not found',groupname)
                elif x.errno==errno.ENOTDIR:
                    raise Pyro.errors.NamingError('is no group',groupname)
                else:
                    raise Pyro.errors.NamingError(str(x))
        finally:
            self.lock.release()
项目:demos    作者:dfirence    | 项目源码 | 文件源码
def set_imageinfo( memoryFilePath ):
    """Build the ``vol.py ... imageinfo`` argument list for a memory image.

    ``memoryFilePath`` is made absolute. If it exists and is readable, the
    returned list makes the tool write text output to ``imageinfo.text`` in
    the current working directory. Otherwise an error line is printed and
    None is returned. Unexpected exceptions are printed as well and also
    yield None.

    All output goes through single-argument ``print(...)`` calls, which
    behave the same on Python 2 and Python 3.
    """
    path = r'{}'.format( memoryFilePath )
    path = os.path.abspath( path )
    try:
        if not os.access( path, os.F_OK ):
            print('\n[!] Error FilePath:  Does not exist {}\n'.format( path ))
            return None
        if not os.access( path, os.R_OK ):
            print('\n[!] Error File Permissions: No Read Access for {}\n'.format( path ))
            return None

        output_file = os.path.join( os.getcwd(), 'imageinfo.text' )
        return [ 'vol.py', '-f', '{}'.format( path ), 'imageinfo',
                 '--output=text',
                 '--output-file={}'.format( output_file ) ]

    except Exception as set_imageinfo_error:
        print('[!] EXCEPTION ERROR:  < set_imageinfo > function')
        print(set_imageinfo_error)
    return None
项目:kolla-ansible    作者:openstack    | 项目源码 | 文件源码
def copy_from_host(module):
    """Read a file on the host and return its contents through ``module``.

    The ``src`` and ``compress`` entries are taken from ``module.params``.
    A missing or unreadable file is reported with ``module.fail_json``.
    The result is passed to ``module.exit_json`` as:

    * ``content`` -- base64 of the (optionally zlib-compressed) file bytes
    * ``sha1``    -- SHA-1 hex digest of the uncompressed bytes
    * ``mode``    -- the permission bits rendered with ``oct()``
    * ``source``  -- the path that was read
    """
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always covers the uncompressed bytes.
    sha1 = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=sha1,
        mode=mode,
        source=src,
    )
项目:cos-python-sdk-v4    作者:tencentyun    | 项目源码 | 文件源码
def check_local_file_valid(self, local_path):
        """Check that a local path is an existing, regular, readable file.

        On the first failing check a description is stored in
        ``self._err_tips`` and False is returned.

        :param local_path: path of the local file to check
        :return: True if all checks pass, False otherwise
        """
        checks = (
            (os.path.exists, 'local_file %s not exist!'),
            (os.path.isfile, 'local_file %s is not regular file!'),
            (lambda p: os.access(p, os.R_OK), 'local_file %s is not readable!'),
        )
        for passes, template in checks:
            if not passes(local_path):
                self._err_tips = template % local_path
                return False
        return True
项目:contrail-ansible    作者:Juniper    | 项目源码 | 文件源码
def copy_from_host(module):
    """Read a file on the host and return its contents through ``module``.

    The ``src`` and ``compress`` entries are taken from ``module.params``.
    A missing or unreadable file is reported with ``module.fail_json``.
    The result is passed to ``module.exit_json`` as:

    * ``content`` -- base64 of the (optionally zlib-compressed) file bytes
    * ``sha1``    -- SHA-1 hex digest of the uncompressed bytes
    * ``mode``    -- the permission bits rendered with ``oct()``
    * ``source``  -- the path that was read
    """
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Only the permission bits of st_mode are reported.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always covers the uncompressed bytes.
    sha1 = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=sha1,
        mode=mode,
        source=src,
    )
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def check_file_freshness(filename, newer_than=3600):
    """
    Raise ``CriticalError`` unless ``filename`` exists, is readable and was
    modified within the last ``newer_than`` seconds (3600 by default).

    A modification time in the future is reported as an error as well.
    Returns nothing when the file passes every check.
    """
    # Existence and readability come first.
    if not os.path.exists(filename):
        raise CriticalError("%s: does not exist." % (filename))
    if not os.access(filename, os.R_OK):
        raise CriticalError("%s: is not readable." % (filename))

    # Age of the file, measured from its modification time.
    mtime = os.stat(filename).st_mtime
    age = time.time() - mtime

    if age > newer_than:
        details = (filename, time.ctime(mtime), newer_than)
        raise CriticalError("%s: was last modified on %s and is too old "
                            "(> %s seconds)."
                            % details)
    if age < 0:
        details = (filename, time.ctime(mtime))
        raise CriticalError("%s: was last modified on %s which is in the "
                            "future."
                            % details)
项目:gopythongo    作者:gopythongo    | 项目源码 | 文件源码
def validate_args(self, args: configargparse.Namespace) -> None:
        """Validate the command-line arguments of the docker builder.

        After the shared docker argument checks, an ``ErrorMessage`` is
        raised when no Dockerfile template was given, when the template is
        missing or unreadable, or when a build arg or a Dockerfile template
        variable is not of the form ``key=value``.
        """
        _docker_args.validate_shared_args(args)

        buildfile = args.docker_buildfile
        if not buildfile:
            raise ErrorMessage("Using the docker builder requires you to specify a Dockerfile template via "
                               "--docker-buildfile.")

        buildfile_usable = os.path.exists(buildfile) and os.access(buildfile, os.R_OK)
        if not buildfile_usable:
            raise ErrorMessage("It seems that GoPythonGo can't find or isn't allowed to read %s" %
                               highlight(buildfile))

        for build_arg in args.docker_buildargs:
            if "=" in build_arg:
                continue
            raise ErrorMessage("A Docker build arg must be in the form 'key=value'. Consult the %s "
                               "documentation for more information. '%s' does not contain a '='." %
                               (highlight("docker build"), build_arg))

        for template_var in args.dockerfile_vars:
            if "=" in template_var:
                continue
            raise ErrorMessage("A Dockerfile Jinja template context variable must be in the form 'key=value'. "
                               "'%s' does not contain a '='" % template_var)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def user_password(self):
        """Return the user's encrypted password string, or '' if unknown.

        With ``spwd`` available the shadow database is queried first; a user
        missing from it yields ''. If the user does not exist at all, the
        value obtained so far is returned. Otherwise, when ``self.SHADOWFILE``
        is set and readable, the file is scanned for the user's line and its
        second ':'-separated field is used (the last matching line wins).
        """
        passwd = ''
        if HAVE_SPWD:
            try:
                passwd = spwd.getspnam(self.name)[1]
            except KeyError:
                return passwd
        if not self.user_exists():
            return passwd
        elif self.SHADOWFILE:
            # Read shadow file for user's encrypted password string
            if os.path.exists(self.SHADOWFILE) and os.access(self.SHADOWFILE, os.R_OK):
                prefix = '%s:' % self.name
                # The context manager closes the shadow file right after
                # the scan instead of leaving that to the garbage collector.
                with open(self.SHADOWFILE) as shadow:
                    for line in shadow:
                        if line.startswith(prefix):
                            passwd = line.split(':')[1]
        return passwd
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def get_pid_location(module):
    """
    Try to find a pid directory in the common locations, falling
    back to the user's home directory if no others exist

    Returns the path of '.accelerate.pid' inside the first candidate that
    is a readable and writable directory. If none qualifies, the failure is
    reported through ``module.fail_json``.
    """
    candidates = ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]
    for candidate in candidates:
        try:
            if os.path.isdir(candidate) and os.access(candidate, os.R_OK|os.W_OK):
                return os.path.join(candidate, '.accelerate.pid')
        except Exception:
            # Best effort: a location that cannot even be probed is skipped.
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            continue
    module.fail_json(msg="couldn't find any valid directory to use for the accelerate pid file")


# NOTE: this shares a fair amount of code in common with async_wrapper, if async_wrapper were a new module we could move
# this into utils.module_common and probably should anyway
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _configure_base(module, base, conf_file, disable_gpg_check):
    """Apply the module's settings to the configuration of a dnf Base object.

    Debug output is silenced, GPG checking is enabled unless
    ``disable_gpg_check`` is set, and confirmation prompts are answered
    with yes. A given ``conf_file`` replaces the configuration file path if
    it is readable and is reported through ``module.fail_json`` if it is
    not. The configuration is read at the end.
    """
    conf = base.conf

    conf.debuglevel = 0                     # no debug messages in the output
    conf.gpgcheck = not disable_gpg_check   # whether to check gpg signatures
    conf.assumeyes = True                   # never prompt for confirmation

    # Change the configuration file path if provided
    if conf_file:
        if os.access(conf_file, os.R_OK):
            conf.config_file_path = conf_file
        else:
            # Fail if we can't read the configuration file.
            module.fail_json(
                msg="cannot read configuration file", conf_file=conf_file)

    # Read the configuration file
    conf.read()
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def get_file_content(path, default=None, strip=True):
    """Return the text content of ``path``, or ``default`` when unavailable.

    ``default`` is returned when the path does not exist, is not readable,
    cannot be read, or its content is empty (after stripping, if ``strip``
    is true). With ``strip`` the content is returned without leading and
    trailing whitespace.
    """
    data = default
    if os.path.exists(path) and os.access(path, os.R_OK):
        try:
            # The context manager closes the file even when read() fails,
            # and unlike the old try/finally it never touches an unbound
            # file object when open() itself raises.
            with open(path) as datafile:
                content = datafile.read()
            if strip:
                content = content.strip()
            if len(content) != 0:
                data = content
        except Exception:
            # ignore errors as some jails/containers might have readable permissions but not allow reads to proc
            pass
    return data
项目:cemu    作者:hugsy    | 项目源码 | 文件源码
def getMappingsFromTable(self):
        """Rebuild ``self._maps`` from the rows of ``self.memory_mapping``.

        Rows without a name cell are skipped. Each remaining row becomes
        ``[name, address, size, permission, read_from_file]``: name and
        permission are cell texts; address and size are parsed as integers
        (base 16 when ``ishex`` accepts the text, base 10 otherwise). The
        last element is the file cell itself, or None when the path it
        holds is not readable. Absent cells are stored as returned by the
        table.
        """
        def _parse_number(cell):
            # Hexadecimal when ishex() accepts the text, decimal otherwise.
            if ishex(cell.text()):
                return int(cell.text(), 0x10)
            return int(cell.text())

        table = self.memory_mapping
        self._maps = []
        for row in range(table.rowCount()):
            name_cell = table.item(row, 0)
            if not name_cell:
                continue
            name = name_cell.text()

            address = table.item(row, 1)
            if address:
                address = _parse_number(address)

            size = table.item(row, 2)
            if size:
                size = _parse_number(size)

            permission = table.item(row, 3)
            if permission:
                permission = permission.text()

            read_from_file = table.item(row, 4)
            if read_from_file and not os.access(read_from_file.text(), os.R_OK):
                read_from_file = None

            self._maps.append([name, address, size, permission, read_from_file])
        return
项目:cemu    作者:hugsy    | 项目源码 | 文件源码
def loadCode(self, title, filter, run_disassembler):
        """Let the user pick a file and load it into the editor.

        The dialog opens at ``EXAMPLES_PATH`` with the given ``title`` and
        ``filter``. Nothing happens if the selected path is not readable.
        The file is disassembled first when ``run_disassembler`` is set or
        its name ends with ".raw"; otherwise it is loaded as is.
        """
        chosen_path, _chosen_filter = QFileDialog.getOpenFileName(self, title, EXAMPLES_PATH, filter)

        if os.access(chosen_path, os.R_OK):
            needs_disassembly = run_disassembler or chosen_path.endswith(".raw")
            if needs_disassembly:
                listing = disassemble_file(chosen_path, self.arch)
                self.loadFile(chosen_path, data=listing)
            else:
                self.loadFile(chosen_path)
        return
项目:PyWallet    作者:AndreMiras    | 项目源码 | 文件源码
def test_get_default_keystore_path(self):
        """
        Make sure the default keystore directory exists, creating it if needed.
        Then verify that the path has the expected suffix and that we have
        read and write access to it.
        """
        keystore_dir = PyWalib.get_default_keystore_path()
        if not os.path.exists(keystore_dir):
            os.makedirs(keystore_dir)

        # path correctness
        expected_suffix = ".config/pyethapp/keystore/"
        self.assertTrue(keystore_dir.endswith(expected_suffix))

        # read access first, then write access
        for permission in (os.R_OK, os.W_OK):
            self.assertEqual(os.access(keystore_dir, permission), True)
项目:openstack-deploy    作者:yaoice    | 项目源码 | 文件源码
def read_file(filename):
    """Read ``filename`` relative to /etc/ceph and package it for transfer.

    A missing or unreadable file is reported through ``json_exit``.

    Returns a dict with the zlib-compressed, base64-encoded ``content``,
    the ``sha1`` hex digest of the raw bytes and the ``filename`` as given.
    """
    filename_path = os.path.join('/etc/ceph', filename)

    if not os.path.exists(filename_path):
        json_exit("file not found: {}".format(filename_path), failed=True)
    if not os.access(filename_path, os.R_OK):
        json_exit("file not readable: {}".format(filename_path), failed=True)

    with open(filename_path, 'rb') as source:
        raw_data = source.read()

    encoded = base64.b64encode(zlib.compress(raw_data))
    digest = hashlib.sha1(raw_data).hexdigest()
    return dict(content=encoded, sha1=digest, filename=filename)
项目:openstack-deploy    作者:yaoice    | 项目源码 | 文件源码
def copy_to_host(module):
    """Write base64 (optionally zlib-compressed) data to a file on the host.

    Parameters are read from ``module.params``: ``src`` (base64 payload),
    ``compress``, ``dest``, ``mode`` (a string parsed with ``int(..., 0)``)
    and ``sha1``.

    With a ``sha1`` given:
    * an existing readable ``dest`` whose digest already matches is
      reported as unchanged,
    * an existing unreadable ``dest`` is reported as a failure,
    * a payload whose digest differs from ``sha1`` is reported as a failure.

    Otherwise the payload replaces the content of ``dest`` and the result
    is reported as changed. All results go through ``module.exit_json``.
    """
    compress = module.params.get('compress')
    dest = module.params.get('dest')
    mode = int(module.params.get('mode'), 0)
    sha1 = module.params.get('sha1')
    src = module.params.get('src')

    data = base64.b64decode(src)
    raw_data = zlib.decompress(data) if compress else data

    if sha1:
        if os.path.exists(dest):
            if os.access(dest, os.R_OK):
                with open(dest, 'rb') as f:
                    if hashlib.sha1(f.read()).hexdigest() == sha1:
                        module.exit_json(changed=False)
            else:
                module.exit_json(failed=True, changed=False,
                                 msg='file is not accessible: {}'.format(dest))

        if sha1 != hashlib.sha1(raw_data).hexdigest():
            module.exit_json(failed=True, changed=False,
                             msg='sha1 sum does not match data')

    # O_TRUNC is essential: wrapping an already opened descriptor with
    # os.fdopen() never truncates, so without it a shorter payload would
    # leave the tail of the previous content in place.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    with os.fdopen(os.open(dest, flags, mode), 'wb') as f:
        f.write(raw_data)

    module.exit_json(changed=True)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_NoWriteCache(self):
        """Check that the device manager fails to start when its cache
        directory is inaccessible.

        The cache directory for the BasicTestDevice node is created below
        the current working directory and stripped of all permissions; the
        launched device manager is then expected to end with return code
        255 and no manager object.
        """
        cachedir = os.getcwd()+'/sdr/cache/.BasicTestDevice_node'
        # Create the directory, then take every permission away from it.
        for shell_prefix in ('mkdir -p ', 'chmod 000 '):
            (status,output) = commands.getstatusoutput(shell_prefix+cachedir)

        still_accessible = os.access(cachedir, os.R_OK|os.W_OK|os.X_OK)
        self.assertFalse(still_accessible, 'Current user can still access directory')

        devmgr_nb, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")
        self.assertEquals(255, devmgr_nb.returncode)
        self.assertEquals(devMgr, None)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def main(args, outs):
    """Validate the filtered matrices input before a reanalysis.

    ``martian.exit`` is called when ``args.filtered_matrices_h5`` is unset,
    missing or unreadable, when the file reports an H5 file type other than
    the matrix type, or when it does not hold exactly one genome.
    ``outs`` is not used here.
    """
    matrix_h5 = args.filtered_matrices_h5

    if not matrix_h5 or not os.path.exists(matrix_h5):
        martian.exit("Filtered matrices do not exist: %s" % matrix_h5)

    if not os.access(matrix_h5, os.R_OK):
        martian.exit("Filtered matrices file is not readable, please check file permissions: %s" % matrix_h5)

    # A file without a recorded type is accepted; only a wrong type is rejected.
    h5_filetype = cr_utils.get_h5_filetype(matrix_h5)
    wrong_type = h5_filetype and h5_filetype != cr_matrix.MATRIX_H5_FILETYPE
    if wrong_type:
        martian.exit("Input is a %s file, but a matrix file is required" % h5_filetype)

    flt_genomes = cr_matrix.GeneBCMatrices.load_genomes_from_h5(matrix_h5)

    if len(flt_genomes) != 1:
        martian.exit("Reanalyzer only supports matrices with one genome. This matrix has: %s" % flt_genomes)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def validate_csv(csv_file, entry_type, entry_colname):
    """Validate a CSV file and return its number of entries.

    ``martian.exit`` is called when the file is missing or unreadable, when
    the first column of its header line is not ``entry_colname``, or when
    no line follows the header. ``entry_type`` only appears in the
    messages.

    Returns the number of lines after the header.
    """
    if not os.path.exists(csv_file):
        martian.exit("Specified %s file does not exist: %s" % (entry_type, csv_file))
    elif not os.access(csv_file, os.R_OK):
        martian.exit("Specified %s file is not readable, please check file permissions: %s" % (entry_type, csv_file))

    counts = 0
    with open(csv_file) as handle:
        first_column = handle.readline().strip().split(',')[0]
        if first_column != entry_colname:
            martian.exit("First line of %s file must be a header line, with '%s' as the first column." % (entry_type, entry_colname))
        # Every remaining line counts as one entry.
        for _line in handle:
            counts += 1

    if counts == 0:
        martian.exit("Specified %s file must contain at least one entry." % entry_type)
    return counts
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def parse_parameters(filename):
    """Parse an analysis parameters CSV file into a dict.

    Returns an empty dict when ``filename`` is None. Lines starting with
    '#' are skipped; every other row must consist of exactly a name and a
    value. Names are lower-cased, must appear in ``ANALYSIS_PARAMS`` and
    may be given only once; values are converted with the type registered
    there. Every violation, as well as a missing or unreadable file, is
    reported through ``martian.exit``.
    """
    if filename is None:
        return {}

    if not os.path.exists(filename):
        martian.exit("Parameters file does not exist: %s" % filename)

    if not os.access(filename, os.R_OK):
        martian.exit("Parameters file is not readable, please check file permissions: %s" % filename)

    def _is_data_line(line):
        # Comment lines start with '#'.
        return not line.startswith('#')

    params = {}
    with open(filename, 'rU') as f:
        rows = csv.reader(filter(_is_data_line, f))
        for (i, row) in enumerate(rows, start=1):
            if len(row) != 2:
                martian.exit("Row %d is incorrectly formatted (must have exactly 2 columns)" % i)
            name = row[0].strip().lower()
            value = row[1].strip()
            if name not in ANALYSIS_PARAMS:
                martian.exit("Unrecognized parameter: %s" % name)
            if name in params:
                martian.exit("Cannot specify the same parameter twice: %s" % name)
            required_type = ANALYSIS_PARAMS[name]
            try:
                params[name] = required_type(value)
            except ValueError:
                martian.exit("Parameter %s could not be cast to the required type: %s" % (name, required_type))

    return params
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def check_runinfo_xml(folder_path):
    """
    Locate RunInfo.xml inside a sequencing run folder.

    The folder itself is first checked with check_folder. The file must then
    exist and be readable; otherwise the problem is reported through
    martian.exit together with the local host name.

    :return: path to valid RunInfo.xml in folder_path
    :rtype: string
    """
    host = socket.gethostname()
    check_folder("sequencing run", folder_path, host)

    xml_path = os.path.join(folder_path, "RunInfo.xml")

    found = os.path.exists(xml_path)
    if not found:
        martian.exit("On machine: %s, RunInfo.xml not found. Cannot verify run was 10X-prepped." % host)

    readable = os.access(xml_path, os.R_OK)
    if not readable:
        martian.exit("On machine: %s, insufficient permission to open RunInfo.xml." % host)

    return xml_path
项目:cloud-volume    作者:seung-lab    | 项目源码 | 文件源码
def __init__(self, cloudpath, mip=0, bounded=True, fill_missing=False,
      cache=False, cdn_cache=True, progress=INTERACTIVE, info=None, provenance=None):
    """
    Store the constructor options, prepare the cache directory if caching
    is enabled, and load the volume's info and provenance.

    cloudpath: passed to lib.extract_path; the result is kept as self.path
    mip: index into self.available_mips; self.mip ends up holding the
      entry found at that index
    bounded, fill_missing, cdn_cache, progress: stored unchanged on self
    cache: truthy to enable caching; self.cache_path is then created if
      missing and must be readable and writable
    info: used as self.info when given; when None, refresh_info() is
      called instead (followed by a cached-info validity check if caching)
    provenance: passed through _cast_provenance when given; when None,
      refresh_provenance() and the cached-provenance validity check run

    Raises IOError if the cache directory lacks read/write permission and
    Exception if the requested mip cannot be looked up.
    """

    self.path = lib.extract_path(cloudpath)

    self.progress = progress
    self.mip = mip
    self.bounded = bounded
    self.fill_missing = fill_missing
    self.cache = cache
    self.cdn_cache = cdn_cache

    if self.cache:
      if not os.path.exists(self.cache_path):
        mkdir(self.cache_path)

      if not os.access(self.cache_path, os.R_OK|os.W_OK):
        raise IOError('Cache directory needs read/write permission: ' + self.cache_path)

    if info is None:
      self.refresh_info()
      if self.cache:
        self._check_cached_info_validity()
    else:
      self.info = info

    if provenance is None:
      self.provenance = None
      self.refresh_provenance()
      self._check_cached_provenance_validity()
    else:
      self.provenance = self._cast_provenance(provenance)

    try:
      self.mip = self.available_mips[self.mip]
    except Exception:
      # Deliberately not a bare except: KeyboardInterrupt and SystemExit
      # must propagate instead of being reported as a missing MIP level.
      raise Exception("MIP {} has not been generated.".format(self.mip))
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def check_readable(f):
    """Raise PyseederException unless path f exists and is readable."""
    accessible = os.path.exists(f) and os.access(f, os.R_OK)
    if not accessible:
        raise PyseederException("Error accessing path: {}".format(f))
项目:packagecore    作者:BytePackager    | 项目源码 | 文件源码
def test_generateScriptEnv(self):
        """Check that a generated script is runnable and sees its environment.

        generateScript is asked to write a script that touches the path held
        in ENV_TEST. The script must exist, be readable and executable, exit
        with status 0, and leave the touched file behind.
        """
        script_path = "/tmp/test.sh"
        marker_path = "/tmp/test.txt"

        script_body = """
X="${ENV_TEST}"

touch "${X}"

exit 0
"""

        generateScript(script_path, script_body, {"ENV_TEST": marker_path})

        # the script must exist and carry read + execute permission
        self.assertTrue(os.access(script_path, os.F_OK))
        self.assertTrue(os.access(script_path, os.X_OK | os.R_OK))

        # running it must succeed ...
        self.assertEqual(subprocess.call([script_path]), 0)

        # ... and must have created the file named by ENV_TEST
        self.assertTrue(os.access(marker_path, os.F_OK))

        # remove both files again
        os.remove(script_path)
        os.remove(marker_path)
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _select_config_file_path():
    """
    Return the first openATTIC configuration pathname that exists and is
    both readable and writable.

    Raises CommandExecutionError when none of the candidates qualifies.
    """
    possible_paths = ("/etc/default/openattic", "/etc/sysconfig/openattic")

    def _usable(candidate):
        # Existence is checked first, then combined read/write access.
        return (os.access(candidate, os.F_OK)
                and os.access(candidate, os.R_OK | os.W_OK))

    chosen = next((p for p in possible_paths if _usable(p)), None)
    if chosen is not None:
        return chosen

    raise CommandExecutionError(
        ("No openATTIC config file found in the following locations: "
         "{}".format(possible_paths)))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory

    Nothing is yielded unless the normalized path_item is a readable
    directory. A directory whose name ends in '.egg' is treated as an
    unpacked egg and yields a single distribution. Any other directory is
    scanned for '.egg-info' / '.dist-info' entries and, unless only is
    true, for nested '.egg' entries and '.egg-link' files.
    """
    path_item = _normalize_cached(path_item)

    if not (os.path.isdir(path_item) and os.access(path_item, os.R_OK)):
        return

    if path_item.lower().endswith('.egg'):
        # the directory itself is an unpacked egg
        egg_info = os.path.join(path_item, 'EGG-INFO')
        yield Distribution.from_filename(
            path_item, metadata=PathMetadata(path_item, egg_info)
        )
        return

    # scan the directory for metadata entries, eggs and egg links
    for entry in os.listdir(path_item):
        name = entry.lower()
        if name.endswith(('.egg-info', '.dist-info')):
            fullpath = os.path.join(path_item, entry)
            if os.path.isdir(fullpath):
                # metadata directory, allow getting metadata
                metadata = PathMetadata(path_item, fullpath)
            else:
                metadata = FileMetadata(fullpath)
            yield Distribution.from_location(
                path_item, entry, metadata, precedence=DEVELOP_DIST
            )
        elif only:
            # nested eggs and egg links are ignored when only is set
            continue
        elif name.endswith('.egg'):
            for dist in find_distributions(os.path.join(path_item, entry)):
                yield dist
        elif name.endswith('.egg-link'):
            with open(os.path.join(path_item, entry)) as link_file:
                link_lines = link_file.readlines()
            # only the first non-blank line of the link file is followed
            for line in link_lines:
                if line.strip():
                    target = os.path.join(path_item, line.rstrip())
                    for dist in find_distributions(target):
                        yield dist
                    break
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory

    The normalized path_item must be a readable directory, otherwise the
    generator ends without yielding. If its name ends in '.egg' it is
    reported as one unpacked egg. Otherwise each directory entry is
    inspected: '.egg-info' / '.dist-info' entries always produce a
    distribution, while '.egg' entries and '.egg-link' files are followed
    only when only is false.
    """
    path_item = _normalize_cached(path_item)

    if not (os.path.isdir(path_item) and os.access(path_item, os.R_OK)):
        return

    if path_item.lower().endswith('.egg'):
        # unpacked egg: its metadata lives in the EGG-INFO subdirectory
        egg_info = os.path.join(path_item, 'EGG-INFO')
        yield Distribution.from_filename(
            path_item, metadata=PathMetadata(path_item, egg_info)
        )
        return

    # scan for .egg and .egg-info in directory
    for entry in os.listdir(path_item):
        name = entry.lower()
        if name.endswith(('.egg-info', '.dist-info')):
            fullpath = os.path.join(path_item, entry)
            if os.path.isdir(fullpath):
                # metadata directory, allow getting metadata
                metadata = PathMetadata(path_item, fullpath)
            else:
                metadata = FileMetadata(fullpath)
            yield Distribution.from_location(
                path_item, entry, metadata, precedence=DEVELOP_DIST
            )
        elif only:
            # nested eggs and egg links are ignored when only is set
            continue
        elif name.endswith('.egg'):
            for dist in find_distributions(os.path.join(path_item, entry)):
                yield dist
        elif name.endswith('.egg-link'):
            with open(os.path.join(path_item, entry)) as link_file:
                link_lines = link_file.readlines()
            # stop after the first non-blank line of the link file
            for line in link_lines:
                if line.strip():
                    target = os.path.join(path_item, line.rstrip())
                    for dist in find_distributions(target):
                        yield dist
                    break
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def _setup(self):
        """
        sets up the SMIME.SMIME instance
        and loads the CA certificates store

        The store is loaded from self._certstore and the configured SMIME
        object is kept as self._smime. Raises VerifierError when
        self._certstore is not readable.
        """
        smime = SMIME.SMIME()
        st = X509.X509_Store()
        if not os.access(self._certstore, os.R_OK):
            # Call form rather than the Python-2-only ``raise E, msg``
            # statement, which is a syntax error on Python 3; both forms
            # raise the same exception on Python 2.
            raise VerifierError("cannot access %s" % self._certstore)
        st.load_info(self._certstore)
        smime.set_x509_store(st)
        self._smime = smime
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self, filename, flag='c', mode=None, format='pickle', *args, **kwds):
        """Remember the storage settings and load any existing file.

        Unless flag is 'n', a readable file at filename is opened (binary
        for the 'pickle' format, text otherwise) and handed to self.load.
        The remaining positional and keyword arguments are then applied via
        dict.__init__, i.e. after the file contents have been loaded.
        """
        self.flag = flag                    # r=readonly, c=create, or n=new
        self.mode = mode                    # None or an octal triple like 0644
        self.format = format                # 'csv', 'json', or 'pickle'
        self.filename = filename

        load_existing = flag != 'n' and os.access(filename, os.R_OK)
        if load_existing:
            open_mode = 'rb' if format == 'pickle' else 'r'
            with open(filename, open_mode) as fileobj:
                self.load(fileobj)

        dict.__init__(self, *args, **kwds)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory

    Nothing is yielded unless the normalized path_item is a readable
    directory. When _is_unpacked_egg accepts the directory it yields a
    single distribution. Otherwise its entries are scanned:
    '.egg-info' / '.dist-info' entries always produce a distribution, while
    unpacked eggs and '.egg-link' files are followed only when only is
    false.
    """
    path_item = _normalize_cached(path_item)

    if not (os.path.isdir(path_item) and os.access(path_item, os.R_OK)):
        return

    if _is_unpacked_egg(path_item):
        egg_info = os.path.join(path_item, 'EGG-INFO')
        yield Distribution.from_filename(
            path_item, metadata=PathMetadata(path_item, egg_info)
        )
        return

    # scan the directory for metadata entries, eggs and egg links
    for entry in os.listdir(path_item):
        name = entry.lower()
        if name.endswith(('.egg-info', '.dist-info')):
            fullpath = os.path.join(path_item, entry)
            if os.path.isdir(fullpath):
                # metadata directory, allow getting metadata
                metadata = PathMetadata(path_item, fullpath)
            else:
                metadata = FileMetadata(fullpath)
            yield Distribution.from_location(
                path_item, entry, metadata, precedence=DEVELOP_DIST
            )
        elif only:
            # nested eggs and egg links are ignored when only is set
            continue
        elif _is_unpacked_egg(entry):
            for dist in find_distributions(os.path.join(path_item, entry)):
                yield dist
        elif name.endswith('.egg-link'):
            with open(os.path.join(path_item, entry)) as link_file:
                link_lines = link_file.readlines()
            # only the first non-blank line of the link file is followed
            for line in link_lines:
                if line.strip():
                    target = os.path.join(path_item, line.rstrip())
                    for item in find_distributions(target):
                        yield item
                    break
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def convert(self, value, param, ctx):
        """Validate a path value against this type's settings.

        A dash is passed straight through when files and dashes are both
        allowed. Otherwise the path is optionally resolved with
        os.path.realpath and stat'ed. A path that cannot be stat'ed is
        accepted only if self.exists is false. An existing path is then
        checked against file_okay, dir_okay, writable and readable, in that
        order. Each violation is reported through self.fail (presumably
        expected to raise -- TODO confirm). The result is whatever
        self.coerce_path_result returns for the (possibly resolved) path.
        """
        def reject(template):
            # Every failure message names the path type and a displayable
            # form of the value as originally supplied.
            self.fail(template % (
                self.path_type,
                filename_to_ui(value)
            ), param, ctx)

        rv = value

        if self.file_okay and self.allow_dash and rv in (b'-', '-'):
            return self.coerce_path_result(rv)

        if self.resolve_path:
            rv = os.path.realpath(rv)

        try:
            st = os.stat(rv)
        except OSError:
            if not self.exists:
                return self.coerce_path_result(rv)
            reject('%s "%s" does not exist.')

        if not self.file_okay and stat.S_ISREG(st.st_mode):
            reject('%s "%s" is a file.')
        if not self.dir_okay and stat.S_ISDIR(st.st_mode):
            reject('%s "%s" is a directory.')
        if self.writable and not os.access(value, os.W_OK):
            reject('%s "%s" is not writable.')
        if self.readable and not os.access(value, os.R_OK):
            reject('%s "%s" is not readable.')

        return self.coerce_path_result(rv)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory

    The normalized path_item must be a readable directory, otherwise the
    generator ends without yielding. A directory accepted by
    _is_unpacked_egg is reported as one distribution. For any other
    directory each entry is inspected: '.egg-info' / '.dist-info' entries
    always produce a distribution, while unpacked eggs and '.egg-link'
    files are followed only when only is false.
    """
    path_item = _normalize_cached(path_item)

    if not (os.path.isdir(path_item) and os.access(path_item, os.R_OK)):
        return

    if _is_unpacked_egg(path_item):
        egg_info = os.path.join(path_item, 'EGG-INFO')
        yield Distribution.from_filename(
            path_item, metadata=PathMetadata(path_item, egg_info)
        )
        return

    # scan for .egg and .egg-info in directory
    for entry in os.listdir(path_item):
        name = entry.lower()
        if name.endswith(('.egg-info', '.dist-info')):
            fullpath = os.path.join(path_item, entry)
            if os.path.isdir(fullpath):
                # metadata directory, allow getting metadata
                metadata = PathMetadata(path_item, fullpath)
            else:
                metadata = FileMetadata(fullpath)
            yield Distribution.from_location(
                path_item, entry, metadata, precedence=DEVELOP_DIST
            )
        elif only:
            # nested eggs and egg links are ignored when only is set
            continue
        elif _is_unpacked_egg(entry):
            for dist in find_distributions(os.path.join(path_item, entry)):
                yield dist
        elif name.endswith('.egg-link'):
            with open(os.path.join(path_item, entry)) as link_file:
                link_lines = link_file.readlines()
            # stop after the first non-blank line of the link file
            for line in link_lines:
                if line.strip():
                    target = os.path.join(path_item, line.rstrip())
                    for item in find_distributions(target):
                        yield item
                    break
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory

    Nothing is yielded unless the normalized path_item is a readable
    directory. A directory whose name ends in '.egg' is treated as an
    unpacked egg and yields a single distribution. Any other directory is
    scanned for '.egg-info' / '.dist-info' entries and, unless only is
    true, for nested '.egg' entries and '.egg-link' files.
    """
    path_item = _normalize_cached(path_item)

    if not (os.path.isdir(path_item) and os.access(path_item, os.R_OK)):
        return

    if path_item.lower().endswith('.egg'):
        # unpacked egg
        egg_info = os.path.join(path_item, 'EGG-INFO')
        yield Distribution.from_filename(
            path_item, metadata=PathMetadata(path_item, egg_info)
        )
        return

    # scan for .egg and .egg-info in directory
    for entry in os.listdir(path_item):
        name = entry.lower()
        if name.endswith(('.egg-info', '.dist-info')):
            fullpath = os.path.join(path_item, entry)
            if os.path.isdir(fullpath):
                # metadata directory, allow getting metadata
                metadata = PathMetadata(path_item, fullpath)
            else:
                metadata = FileMetadata(fullpath)
            yield Distribution.from_location(
                path_item, entry, metadata, precedence=DEVELOP_DIST
            )
        elif only:
            # nested eggs and egg links are ignored when only is set
            continue
        elif name.endswith('.egg'):
            for dist in find_distributions(os.path.join(path_item, entry)):
                yield dist
        elif name.endswith('.egg-link'):
            with open(os.path.join(path_item, entry)) as link_file:
                link_lines = link_file.readlines()
            # only the first non-blank line of the link file is followed
            for line in link_lines:
                if line.strip():
                    target = os.path.join(path_item, line.rstrip())
                    for item in find_distributions(target):
                        yield item
                    break