Python os 模块,scandir() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.scandir()

项目:ArffFileGeneratorForWeka    作者:unicod3    | 项目源码 | 文件源码
def generate(self):
        """
        Walk the sub-folders of ``self.folderpath``, count the words of every
        ``.txt`` entry found in them and build the arff file from the result.

        For each sub-folder the counter is reset with ``self.empty_counter()``;
        each text file's count is stored in ``self.totalCounter`` and in
        ``self.counterList`` keyed by the file path.

        :return: the value returned by ``self.create_arff()``.
        """
        sub_folders = []
        for entry in os.scandir(self.folderpath):
            if entry.is_dir():
                sub_folders.append(entry.path)

        for sub_folder in sub_folders:
            # gather the text files of this sub-folder before counting them
            txt_paths = []
            for entry in os.scandir(sub_folder):
                if entry.name.endswith(".txt"):
                    txt_paths.append(entry.path)

            self.empty_counter()
            for txt_path in txt_paths:
                self.totalCounter = self.count_words(txt_path)
                self.counterList[txt_path] = self.totalCounter

        return self.create_arff()  # generate file
项目:hadan-gcloud    作者:youkpan    | 项目源码 | 文件源码
def __init__(self, dirName):
        """
        Load conversations from the ``dialogs`` folder found under ``dirName``.

        Args:
            dirName (string): directory where to load the corpus

        At most ``MAX_NUMBER_SUBDIR`` sub-directories are read; every ``.tsv``
        entry inside them is passed to ``self.loadLines`` and stored as
        ``{"lines": ...}`` in ``self.conversations``.
        """
        self.MAX_NUMBER_SUBDIR = 10
        self.conversations = []
        dialogs_root = os.path.join(dirName, "dialogs")
        loaded_subdirs = 0
        progress = tqdm(os.scandir(dialogs_root), desc="Ubuntu dialogs subfolders", total=len(os.listdir(dialogs_root)))
        for sub in progress:
            # stop as soon as the sub-directory budget is used up
            if loaded_subdirs == self.MAX_NUMBER_SUBDIR:
                print("WARNING: Early stoping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
                return

            if not sub.is_dir():
                continue

            loaded_subdirs += 1
            tsv_paths = (f.path for f in os.scandir(sub.path) if f.name.endswith(".tsv"))
            for tsv_path in tsv_paths:
                self.conversations.append({"lines": self.loadLines(tsv_path)})
项目:SDK    作者:Keypirinha    | 项目源码 | 文件源码
def _do_tree(root_src, root_dest, tmpl_dict, tmpl_ext, tag_delim, level=0):
    """Recursively reproduce ``root_src`` under ``root_dest``.

    Entry names are passed through ``do_text`` to build the destination name.
    A file whose name ends (case-insensitively) with one of ``tmpl_ext`` is
    handed to ``do_file`` with that extension cut off the destination path;
    every other regular file is copied with ``shutil.copy2``. ``level`` is the
    recursion depth: ``root_dest`` itself is only created at level 0.
    """
    if level == 0:
        _mkdir(root_dest)

    for entry in os.scandir(root_src):
        name = entry.name
        src_path = os.path.join(root_src, name)
        dest_path = os.path.join(root_dest,
                                 do_text(name, tmpl_dict, tag_delim))

        if entry.is_dir():
            _mkdir(dest_path, copy_stats_from=src_path)
            _do_tree(src_path, dest_path, tmpl_dict, tmpl_ext, tag_delim,
                     level + 1)
            continue

        if not entry.is_file():
            continue

        for ext in tmpl_ext:
            suffix = ext.lower()
            if name.lower().endswith(suffix):
                # template: render into the destination minus the extension
                do_file(src_path, dest_path[0:-len(suffix)], tmpl_dict,
                        tag_delim)
                break
        else:
            # no template extension matched: plain copy
            shutil.copy2(src_path, dest_path, follow_symlinks=False)
项目:website-fingerprinting    作者:AxelGoetz    | 项目源码 | 文件源码
def extract_features(feature_extraction, save_dir, data_dir=DATA_DIR, extension=".cell", model_name=""):
    """
    Collect every file in `data_dir` whose name ends with `extension` and hand the list to `extract_features_from_files`.

    @param feature_extraction is a function that takes a trace as an input and returns a list of features (1D)
    @param save_dir is the directory where the features for the traces are saved
        (per the original notes, files there are named `{website}-{id}.cellf` with integer `website` and `id`)
    @param data_dir is the absolute path to the data directory
    @param extension is the extension of the files that contain the raw traces
    @param model_name is used for printing for what model we are extracting features
    """
    suffix_length = len(extension)
    paths = [
        entry.path
        for entry in scandir(data_dir)
        if entry.is_file() and entry.name[-suffix_length:] == extension
    ]

    extract_features_from_files(feature_extraction, paths, save_dir, extension=extension, model_name=model_name)
项目:i3config_patcher    作者:David96    | 项目源码 | 文件源码
def __load_mergers(self):
        """Import the modules of the ``mergers`` directory next to this file.

        For every ``.py`` file the module ``mergers.<name>`` is imported; when
        its ``Merger`` attribute is a subclass of ``BaseMerger`` one instance
        is created and registered under each key of its
        ``get_supported_software()``.

        Returns a dict mapping software to the list of merger instances.
        """
        mergers_dir = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            "mergers")
        found = {}
        for entry in os.scandir(mergers_dir):
            if not (entry.is_file() and entry.name.endswith(".py")):
                continue
            name = entry.name[:-3]  # strip ".py"
            mod = __import__("mergers.%s" % name, fromlist=["Merger"])
            try:
                merger_cls = getattr(mod, "Merger")
                if not issubclass(merger_cls, BaseMerger):
                    continue
                merger = merger_cls()
                logging.debug("Found merger for %s",
                              merger.get_supported_software().keys())
                for software in merger.get_supported_software():
                    found.setdefault(software, []).append(merger)
            except AttributeError:
                logging.warning("Merger %s found but doesn't implement a Merger class inheriting from BaseMerger", name)
        return found
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def read_files_worker(self, directory, queue):
        """ Read all files in a directory and output to the queue. First line
        of every file should contain the index. Worker separates first line
        and parses it with ``literal_eval``. Tuple of index and text is added
        to queue.

        Files that are empty or whose first line is not a valid Python
        literal are logged and skipped instead of aborting the worker.

        :directory: Source directory containing files
        :queue: Queue to add the tuples to
        """
        for entry in os.scandir(directory):
            if not entry.is_file():
                continue

            # Read everything first so the file is closed before the
            # (potentially blocking) queue.put below.
            with open(entry.path, 'r', errors='replace') as f:
                lines = f.readlines()

            try:
                index = literal_eval(lines.pop(0).strip())
            except (IndexError, ValueError, SyntaxError):
                # IndexError: empty file; ValueError/SyntaxError: the first
                # line is not a literal that literal_eval accepts.
                LOGGER.error('File {0} is not classifyable'
                             .format(entry.path))
                continue

            # NOTE(review): readlines() keeps each trailing newline, so this
            # join doubles line breaks; kept as-is because consumers may
            # rely on the current text layout.
            queue.put((index, '\n'.join(lines)), block=True)
        LOGGER.info('File reading worker done.')
项目:chi    作者:rmst    | 项目源码 | 文件源码
def rcollect(path, depth, filter=None):
  """Yield the file entries found under ``path``, descending ``depth`` levels.

  ``filter`` receives an entry name and decides whether the entry is
  considered at all; by default names starting with '.' are skipped.
  Entries whose type cannot be determined (OSError) are ignored.
  """
  accept = filter or (lambda n: not n.startswith('.'))
  path = os.path.expanduser(path)
  if not os.path.exists(path):
    return
  for entry in os.scandir(path):
    if not accept(entry.name):
      continue
    try:
      if entry.is_file():
        kind = 'file'
      elif entry.is_dir():
        kind = 'dir'
      else:
        kind = 'undefined'
    except OSError:
      kind = 'undefined'
    if kind == 'file':
      yield entry
    elif kind == 'dir' and depth > 0:
      yield from rcollect(entry.path, depth - 1, accept)
项目:hatch    作者:ofek    | 项目源码 | 文件源码
def get_new_venv_name(count=1):
    """Return unused random virtual env name(s).

    A single name is returned when ``count`` is 1, otherwise a sorted list of
    ``count`` names. When the venv directory exists, names already present in
    it (and names already picked) are rejected and re-drawn.
    """
    if not os.path.exists(get_venv_dir()):  # no cov
        if count == 1:
            return get_random_venv_name()
        return sorted(get_random_venv_name() for _ in range(count))

    taken = {entry.name for entry in os.scandir(get_venv_dir())}
    fresh = set()

    while len(fresh) < count:
        candidate = get_random_venv_name()
        while candidate in taken or candidate in fresh:  # no cov
            candidate = get_random_venv_name()
        fresh.add(candidate)

    if count == 1:
        return fresh.pop()
    return sorted(fresh)
项目:Python-Programs    作者:OmkarPathak    | 项目源码 | 文件源码
def organize_junk():
    """Sort the files of the current working directory into folders.

    Files whose lower-cased suffix is a key of ``FILE_FORMATS`` are moved into
    the folder named by the mapped value (created on demand). Afterwards the
    remaining files are moved into ``OTHER-FILES`` and empty directories are
    removed. Every step of the clean-up phase is best-effort: entries that
    cannot be moved or removed are left where they are.
    """
    # Work on a snapshot: the directory is modified while we go through it.
    for entry in list(os.scandir()):
        if entry.is_dir():
            continue
        file_path = Path(entry.name)
        file_format = file_path.suffix.lower()
        if file_format in FILE_FORMATS:
            directory_path = Path(FILE_FORMATS[file_format])
            directory_path.mkdir(exist_ok=True)
            file_path.rename(directory_path.joinpath(file_path))

    other_dir = "OTHER-FILES"
    try:
        os.mkdir(other_dir)
    except OSError:
        # Already exists (or cannot be created): carry on best-effort.
        pass

    for entry in list(os.scandir()):
        try:
            if entry.is_dir():
                # Never remove the catch-all folder here: it is still empty
                # until the first leftover file has been moved into it.
                if entry.name != other_dir:
                    os.rmdir(entry.path)  # only succeeds for empty folders
            else:
                os.rename(entry.path, os.path.join(other_dir, entry.name))
        except OSError:
            pass

    # Drop the catch-all folder again when nothing ended up in it.
    try:
        os.rmdir(other_dir)
    except OSError:
        pass
项目:dirtools3    作者:kirpit    | 项目源码 | 文件源码
def _iter_items(self, path: str) -> AsyncIterable:
        with os.scandir(path) as directory:
            for item in directory:
                current_level = self._get_depth(item.path)

                # happy scenario, we are in the exact level of requested
                # so return whatever it is, a folder, a file or a symlink.
                if current_level == self._level:
                    yield item
                    continue

                # we did't reach the requested sub level yet
                # so send recursive _scan inside if this is a folder.
                elif current_level < self._level and item.is_dir():
                    async for e in self._iter_items(item.path):
                        yield e

                # and ignore any other scenario including ignoring files and
                # symlinks, if the level is not reached yet.
                continue
项目:DeepQA    作者:Conchylicultor    | 项目源码 | 文件源码
def __init__(self, dirName):
        """
        Read the corpus stored in ``<dirName>/dialogs``.

        Args:
            dirName (string): directory where to load the corpus

        Only the first ``MAX_NUMBER_SUBDIR`` sub-folders are processed. Each
        ``.tsv`` entry is loaded through ``self.loadLines`` and appended to
        ``self.conversations`` wrapped in a ``{"lines": ...}`` dict.
        """
        self.MAX_NUMBER_SUBDIR = 10
        self.conversations = []
        corpus_dir = os.path.join(dirName, "dialogs")
        seen_subdirs = 0
        for sub in tqdm(os.scandir(corpus_dir), desc="Ubuntu dialogs subfolders", total=len(os.listdir(corpus_dir))):
            limit_reached = seen_subdirs == self.MAX_NUMBER_SUBDIR
            if limit_reached:
                print("WARNING: Early stoping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
                return

            if sub.is_dir():
                seen_subdirs += 1
                for candidate in os.scandir(sub.path):
                    if not candidate.name.endswith(".tsv"):
                        continue
                    lines = self.loadLines(candidate.path)
                    self.conversations.append({"lines": lines})
项目:CycleGAN-TensorFlow    作者:vanhuyz    | 项目源码 | 文件源码
def data_reader(input_dir, shuffle=True):
  """Collect the .jpg files of input_dir, optionally in shuffled order
  Args:
    input_dir: string, path of input dir, e.g., /path/to/dir
    shuffle: when true, reorder the paths with a fixed random seed
  Returns:
    file_paths: list of strings
  """
  file_paths = [
    entry.path
    for entry in scandir(input_dir)
    if entry.name.endswith('.jpg') and entry.is_file()
  ]

  if shuffle:
    # The seed is fixed so the randomized order is repeatable. Shuffling the
    # list in place applies the same permutation as shuffling an index list
    # and re-indexing, because the shuffle only depends on positions.
    random.seed(12345)
    random.shuffle(file_paths)

  return file_paths
项目:S4    作者:MichaelAquilina    | 项目源码 | 文件源码
def traverse(path, ignore_files=None):
    """Yield the paths, relative to ``path``, of all non-directory entries below it.

    Entries whose name matches one of the ``ignore_files`` fnmatch patterns
    are skipped (directories together with their content). Nothing is
    yielded when ``path`` does not exist.
    """
    if not os.path.exists(path):
        return

    patterns = [] if ignore_files is None else ignore_files

    for item in scandir(path):
        name = item.name

        ignored = False
        for pattern in patterns:
            if fnmatch.fnmatch(name, pattern):
                ignored = True
                break
        if ignored:
            logger.debug('Ignoring %s', item)
            continue

        if not item.is_dir():
            yield name
            continue

        for result in traverse(item.path, patterns):
            yield os.path.join(name, result)
项目:pytorch_fnet    作者:AllenCellModeling    | 项目源码 | 文件源码
def get_sources_from_files(path):
    """Read the source images found directly in ``path``.

    Files whose path contains 'rgb.tif' or 'bf.tif' are selected, ordered by
    the first keyword found in their path (see ``order``) and read with
    ``tifffile.imread``. Returns the list of loaded images.
    """
    keyword_ranks = (
        ('bf', 0),
        ('lamin', 1),
        ('fibrillarin', 2),
        ('tom', 3),
        ('all', 10),
    )

    def order(fname):
        # rank of the first keyword contained in the name, 0 when none is
        for keyword, rank in keyword_ranks:
            if keyword in fname:
                return rank
        return 0

    selected = []
    for entry in os.scandir(path):
        if not entry.is_file():
            continue
        file_path = entry.path
        if 'rgb.tif' in file_path or 'bf.tif' in file_path:
            selected.append(file_path)
    selected.sort(key=order)
    return [tifffile.imread(source_path) for source_path in selected]
项目:pytorch_fnet    作者:AllenCellModeling    | 项目源码 | 文件源码
def save_backup(path_file, n_backups=5):
    """Copy ``path_file`` into a ``backups`` folder next to it.

    Backups are named ``<basename>.<NN>``; the number continues from the most
    recently modified existing backup (wrapping at 100). Before copying, the
    oldest backups are deleted so that at most ``n_backups - 1`` remain.
    Does nothing when ``path_file`` does not exist.
    """
    if not os.path.exists(path_file):
        return
    path_dir, path_base = os.path.split(path_file)
    path_backup_dir = os.path.join(path_dir, 'backups')
    if not os.path.exists(path_backup_dir):
        os.makedirs(path_backup_dir)

    existing = []
    for entry in os.scandir(path_backup_dir):
        suffix = entry.path.split('.')[-1]
        if path_base in entry.path and suffix.isdigit():
            existing.append(entry.path)
    existing.sort(key=os.path.getmtime)  # oldest first

    if existing:
        newest_suffix = existing[-1].split('.')[-1]
        tag = (int(newest_suffix) + 1) % 100
    else:
        tag = 0

    if n_backups > 1:
        stale = existing[:-(n_backups - 1)]
    else:
        stale = existing
    for stale_path in stale:
        os.remove(stale_path)

    path_backup = os.path.join(path_backup_dir, path_base + '.{:02}'.format(tag))
    shutil.copyfile(path_file, path_backup)
    print('wrote to:', path_backup)
项目:pytorch_fnet    作者:AllenCellModeling    | 项目源码 | 文件源码
def save_backup(path_file, n_backups=5):
    """Store a numbered copy of ``path_file`` in the sibling ``backups`` directory.

    The copy is called ``<basename>.<NN>`` where NN is one more (modulo 100)
    than the suffix of the backup with the latest modification time, or 00
    when there is none. Older backups are removed first, keeping the newest
    ``n_backups - 1``. A missing ``path_file`` is silently ignored.
    """
    if not os.path.exists(path_file):
        return

    def numeric_suffix(backup_path):
        # text after the last dot of the path
        return backup_path.split('.')[-1]

    path_dir, path_base = os.path.split(path_file)
    path_backup_dir = os.path.join(path_dir, 'backups')
    if not os.path.exists(path_backup_dir):
        os.makedirs(path_backup_dir)

    backups = sorted(
        (entry.path for entry in os.scandir(path_backup_dir)
         if path_base in entry.path and numeric_suffix(entry.path).isdigit()),
        key=os.path.getmtime)

    tag = (int(numeric_suffix(backups[-1])) + 1) % 100 if len(backups) > 0 else 0

    keep = n_backups - 1
    to_delete = backups[:-keep] if n_backups > 1 else backups
    for old_backup in to_delete:
        os.remove(old_backup)

    path_backup = os.path.join(path_backup_dir, path_base + '.{:02}'.format(tag))
    shutil.copyfile(path_file, path_backup)
    print('wrote to:', path_backup)
项目:pytorch_fnet    作者:AllenCellModeling    | 项目源码 | 文件源码
def find_source_dirs(path_root_dir):
    """Find source directories to make layouts, going at most 1 layer deep.

    A directory qualifies when the paths of its files contain, between them,
    at least one tag of each of TAGS_SIGNAL, TAGS_TARGET and TAGS_PREDICTION.
    The root itself is checked first; otherwise its direct children are.

    Returns : list of source directories
    """
    def is_source_dir(path):
        if not os.path.isdir(path):
            return False
        file_paths = [entry.path for entry in os.scandir(path) if entry.is_file()]
        has_signal = has_target = has_prediction = False
        for file_path in file_paths:
            has_signal = has_signal or any(tag in file_path for tag in TAGS_SIGNAL)
            has_target = has_target or any(tag in file_path for tag in TAGS_TARGET)
            has_prediction = has_prediction or any(tag in file_path for tag in TAGS_PREDICTION)
        return has_signal and has_target and has_prediction

    if is_source_dir(path_root_dir):
        return [path_root_dir]
    return [
        entry.path
        for entry in os.scandir(path_root_dir)
        if is_source_dir(entry.path)
    ]
项目:friction    作者:tinruufu    | 项目源码 | 文件源码
def scan_dir(self, path):
        """Recursively register choices found under ``path``.

        Hidden entries (leading '.') are skipped and sub-directories are
        scanned recursively. The first file with an extension in IMAGE_EXTS
        registers ``path`` itself and ends the scan of this directory; a file
        with an extension in ARCHIVE_EXTS registers that file.
        """
        for entry in os.scandir(path):
            if entry.name.startswith('.'):
                continue

            if entry.is_dir():
                self.scan_dir(entry.path)
            else:
                extension = os.path.splitext(entry.name)[1].lower()
                if extension in IMAGE_EXTS:
                    # a folder of images counts as one choice
                    self.add_choice(path)
                    break
                elif extension in ARCHIVE_EXTS:
                    self.add_choice(entry.path)
项目:DeepLearningAndTensorflow    作者:azheng333    | 项目源码 | 文件源码
def __init__(self, dirName):
        """
        Build the conversation list from ``<dirName>/dialogs``.

        Args:
            dirName (string): directory where to load the corpus

        Sub-folders are visited until ``MAX_NUMBER_SUBDIR`` of them have been
        read; each ``.tsv`` entry found is loaded with ``self.loadLines``.
        """
        self.MAX_NUMBER_SUBDIR = 10
        self.conversations = []
        dialogs_dir = os.path.join(dirName, "dialogs")
        subdir_count = 0
        subfolders = tqdm(os.scandir(dialogs_dir), desc="Ubuntu dialogs subfolders", total=len(os.listdir(dialogs_dir)))
        for sub in subfolders:
            if subdir_count == self.MAX_NUMBER_SUBDIR:
                print("WARNING: Early stoping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
                return

            if not sub.is_dir():
                continue
            subdir_count += 1

            for tsv_entry in os.scandir(sub.path):
                if tsv_entry.name.endswith(".tsv"):
                    conversation = {"lines": self.loadLines(tsv_entry.path)}
                    self.conversations.append(conversation)
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def make_custom_check_bins_package(source_dir, package_filename):
    """Build a package from the files of ``source_dir``.

    The directory is copied into a temporary ``check_bins`` folder, every
    entry is made 0o755, an empty ``pkginfo.json`` (mode 0o644) is added and
    the folder is handed to ``gen.util.make_pkgpanda_package`` together with
    ``package_filename``.
    """
    with gen.util.pkgpanda_package_tmpdir() as tmpdir:
        staging_dir = os.path.join(tmpdir, 'check_bins')
        shutil.copytree(source_dir, staging_dir)

        # Apply permissions
        for check_bin in os.scandir(staging_dir):
            # source_dir should have no subdirs.
            assert check_bin.is_file()
            os.chmod(check_bin.path, 0o755)

        # Add an empty pkginfo.json.
        pkginfo_path = os.path.join(staging_dir, 'pkginfo.json')
        assert not os.path.isfile(pkginfo_path)
        with open(pkginfo_path, 'w') as pkginfo_file:
            pkginfo_file.write('{}')
        os.chmod(pkginfo_path, 0o644)

        gen.util.make_pkgpanda_package(staging_dir, package_filename)
项目:denite.nvim    作者:Shougo    | 项目源码 | 文件源码
def scantree(path_name, skip_list=None):
        """Yield the paths of the entries below path_name, descending into
        sub-folders (symlinked folders are reported, not followed).

        Entries for which ``is_ignored`` is true are left out; ``skip_list``
        defaults to DEFAULT_SKIP_LIST. When a folder cannot be read a
        'PermissionError reading ...' string is yielded in its place.

        Implementation uses scandir, if available, as it is faster than
        os.walk"""

        active_skip_list = DEFAULT_SKIP_LIST if skip_list is None else skip_list

        try:
            for entry in scandir(path_name):
                if is_ignored(entry.path, active_skip_list):
                    continue
                if not entry.is_dir(follow_symlinks=False):
                    yield entry.path
                else:
                    yield from scantree(entry.path, active_skip_list)
        except PermissionError:
            yield 'PermissionError reading {}'.format(path_name)
项目:Automatic_Speech_Recognition    作者:zzw922cn    | 项目源码 | 文件源码
def process_poetry(self, data_dir='/media/pony/DLdigest/data/languageModel/chinese-poetry/json'):
        """Convert the poem JSON files of ``data_dir`` into text/pinyin lines.

        Every entry whose name starts with 'poet' is loaded as JSON. The
        'paragraphs' of each poem are joined, converted with
        ``HanziConv.toSimplified`` and ``filter_punctuation`` and split on
        spaces; each piece longer than one character is appended as
        ``<text>,<pinyin>`` to ``<save_dir>/poem/<k>.txt``, where a new file
        number ``k`` starts every 400000 written lines.
        """
        save_dir = os.path.join(self.save_dir, 'poem')
        check_path_exists(save_dir)
        count = 0
        for entry in os.scandir(data_dir):
            if not entry.name.startswith('poet'):
                continue
            with open(entry.path, 'r') as json_file:
                poems = json.load(json_file)
                for p in poems:
                    joined = ''.join(p['paragraphs']).replace('\n', '')
                    paras = filter_punctuation(HanziConv.toSimplified(joined))
                    for para in paras.split(' '):
                        if not len(para.strip()) > 1:
                            continue
                        pys = ' '.join(np.array(pinyin(para)).flatten())
                        out_name = str(count // 400000 + 1) + '.txt'
                        with open(os.path.join(save_dir, out_name), 'a') as f:
                            f.write(para + ',' + pys + '\n')
                        count += 1
项目:picasso    作者:jungmannlab    | 项目源码 | 文件源码
def __init__(self, path, memmap_frames=False, verbose=False):
        """Open ``path`` together with its numbered continuation files.

        Continuation files live in the same directory and are named
        ``<base>_<number>.ome.tif``, where ``<base>`` is ``path`` without its
        two extensions (as in ``.ome.tif``). They are appended after ``path``
        in increasing number order and each one is opened as a ``TiffMap``.

        ``memmap_frames`` is accepted but not used by this constructor.
        ``verbose`` is forwarded to every ``TiffMap``.
        """
        self.path = _ospath.abspath(path)
        self.dir = _ospath.dirname(self.path)
        # split two extensions as in .ome.tif
        base = _ospath.splitext(_ospath.splitext(self.path)[0])[0]
        # basename + "_<file number>.ome.tif". Raw string, escaped dots and a
        # mandatory digit so that odd names can neither match by accident nor
        # crash int() with an empty number.
        pattern = _re.compile(_re.escape(base) + r'_(\d+)\.ome\.tif')
        entries = (entry.path for entry in _os.scandir(self.dir) if entry.is_file())
        # fullmatch: the matched text is the complete path of an existing file
        matches = (pattern.fullmatch(entry_path) for entry_path in entries)
        paths_indices = sorted((int(match.group(1)), match.group(0))
                               for match in matches if match is not None)
        self.paths = [self.path] + [file_path for _index, file_path in paths_indices]
        self.maps = [TiffMap(file_path, verbose=verbose) for file_path in self.paths]
        self.n_maps = len(self.maps)
        self.n_frames_per_map = [tiff_map.n_frames for tiff_map in self.maps]
        self.n_frames = sum(self.n_frames_per_map)
        # frame offsets of each file, starting with 0
        self.cum_n_frames = _np.insert(_np.cumsum(self.n_frames_per_map), 0, 0)
        # dtype and frame size are taken from the first file
        self.dtype = self.maps[0].dtype
        self.height = self.maps[0].height
        self.width = self.maps[0].width
        self.shape = (self.n_frames, self.height, self.width)
项目:ivy    作者:dmulholland    | 项目源码 | 文件源码
def hashsite(sitepath):
    """Return a SHA-256 digest over the names and mtimes of the site's files.

    Files ending in '~' are left out, as is the top-level 'out' directory;
    all other directories are traversed recursively.
    """
    digest = hashlib.sha256()

    def hashdir(dirpath, is_home):
        for entry in os.scandir(dirpath):
            name = entry.name
            if entry.is_file():
                if name.endswith('~'):
                    continue
                modified = os.path.getmtime(entry.path)
                digest.update(str(modified).encode())
                digest.update(name.encode())
            elif entry.is_dir():
                # only the 'out' directory directly inside the site is skipped
                if not (is_home and name == 'out'):
                    hashdir(entry.path, False)

    hashdir(sitepath, True)
    return digest.digest()
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def profiles_last_modified(self, slicer):
        """
        Retrieves the last modification date of ``slicer``'s profiles.

        The result is the newest modification time among the slicer's profile
        folder itself and the ``.profile`` entries inside it.

        Args:
            slicer (str): the slicer for which to retrieve the last modification date

        Returns:
            (float) the time stamp of the last modification of the slicer's profiles

        Raises:
            UnknownSlicer: if ``slicer`` is not in ``self.registered_slicers``
        """

        if slicer not in self.registered_slicers:
            raise UnknownSlicer(slicer)

        slicer_profile_path = self.get_slicer_profile_path(slicer)
        latest = os.stat(slicer_profile_path).st_mtime
        for entry in scandir(slicer_profile_path):
            if not entry.name.endswith(".profile"):
                continue
            latest = max(latest, os.stat(entry.path).st_mtime)
        return latest
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def _analysis_backlog_generator(self, path=None):
        """Yield the files below ``path`` that have no analysis metadata yet.

        ``path`` defaults to ``self.basefolder``. For every non-hidden file of
        a valid type whose metadata entry is missing, is not a dict or lacks
        an "analysis" key, a tuple ``(name, path, printer_profile_id)`` is
        yielded; the profile id is taken from the first "printerprofile" link
        of the file, or is None. Sub-folders are processed recursively, their
        names being prefixed via ``self.join_path``.
        """
        if path is None:
            path = self.basefolder

        metadata = self._get_metadata(path)
        if not metadata:
            metadata = dict()
        for entry in scandir(path):
            if is_hidden_path(entry.name):
                continue

            if entry.is_file():
                # The file-type check only applies to files. Applying it to
                # every entry also skipped folders, so nested files were
                # never reported.
                if not octoprint.filemanager.valid_file_type(entry.name):
                    continue

                if entry.name not in metadata or not isinstance(metadata[entry.name], dict) or "analysis" not in metadata[entry.name]:
                    printer_profile_rels = self.get_link(entry.path, "printerprofile")
                    if printer_profile_rels:
                        printer_profile_id = printer_profile_rels[0]["id"]
                    else:
                        printer_profile_id = None

                    yield entry.name, entry.path, printer_profile_id
            elif os.path.isdir(entry.path):
                for sub_entry in self._analysis_backlog_generator(entry.path):
                    yield self.join_path(entry.name, sub_entry[0]), sub_entry[1], sub_entry[2]
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def remove_folder(self, path, recursive=True):
        """Delete the folder designated by ``path`` along with its metadata.

        ``path`` is first split by ``self.sanitize`` into parent path and
        name. A folder that does not exist is ignored. A folder containing
        anything besides ``.metadata.yaml`` is only removed when ``recursive``
        is true; otherwise a ``StorageError`` with code ``NOT_EMPTY`` is
        raised.
        """
        path, name = self.sanitize(path)

        folder_path = os.path.join(path, name)
        if not os.path.exists(folder_path):
            return

        # the metadata file alone does not make a folder "non-empty"
        has_content = any(entry.name != ".metadata.yaml" for entry in scandir(folder_path))

        if has_content and not recursive:
            raise StorageError("{name} in {path} is not empty".format(name=name, path=path), code=StorageError.NOT_EMPTY)

        import shutil
        shutil.rmtree(folder_path)

        self._delete_metadata(folder_path)
项目:zajecia_python_mini_edycja4    作者:daftcode    | 项目源码 | 文件源码
def my_dir_walker_with_size_counting(topdir=None):
    """Return a dict mapping every path below ``topdir`` to its size in bytes.

    ``topdir`` defaults to the current working directory. The size of a
    directory is the sum of the sizes of everything inside it; directory
    symlinks are not followed. Any other entry is measured with
    ``os.path.getsize``. The total of ``topdir`` is stored under the
    ``topdir`` key itself.
    """
    if topdir is None:
        topdir = os.getcwd()
    sizes = {topdir: 0}  # path: size in bytes

    def inner_walker(dir_path):
        # Scan the full path of the directory (not just its name) so that
        # nested folders resolve regardless of the working directory.
        size = 0
        for entry in os.scandir(dir_path):
            entry_path = os.path.join(dir_path, entry.name)
            if entry.is_dir(follow_symlinks=False):
                entry_size = inner_walker(entry_path)
            else:
                entry_size = os.path.getsize(entry_path)
            sizes[entry_path] = entry_size
            size += entry_size
        return size

    sizes[topdir] = inner_walker(topdir)
    return sizes
项目:zajecia_python_mini_edycja4    作者:daftcode    | 项目源码 | 文件源码
def my_directory_walker_with_size_counting(topdir=None):
    """Return a dict mapping directories and regular files to sizes in bytes.

    ``topdir`` defaults to the current working directory. Only regular files
    (symlinks excluded) contribute; a directory's size is the sum of what it
    contains and directory symlinks are not followed. The grand total is
    stored under the ``topdir`` key.
    """
    if topdir is None:
        topdir = os.getcwd()
    sizes = {topdir: 0}
    root_stack = []

    def inner_walker(new_topdir):
        root_stack.append(new_topdir)
        new_topdir_path = os.path.join(*root_stack)
        # TODO: HOMEWORK: add error handling
        size = 0
        for entry in os.scandir(new_topdir_path):
            if entry.is_dir(follow_symlinks=False):
                entry_size = inner_walker(entry.name)
                # the stack still ends with the sub-directory just walked
                sizes[os.path.join(*root_stack)] = entry_size
                size += entry_size
                root_stack.pop()
            elif entry.is_file(follow_symlinks=False):
                file_size = entry.stat().st_size
                sizes[os.path.join(new_topdir_path, entry.name)] = file_size
                size += file_size
        return size

    # record the grand total instead of leaving the initial 0 in place
    sizes[topdir] = inner_walker(topdir)
    return sizes
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def clean_unused():
    """
    Delete the files in ``settings.MEDIA_ROOT`` that are not referenced.

    A file counts as referenced when the cache holds a value for its key
    (``_make_key(name)``) after ``_resolve_referenced_files`` has run. The
    function only warns and returns when the cache object has no
    ``delete_pattern`` attribute.
    """

    cache_supports_patterns = hasattr(cache, 'delete_pattern')
    if not cache_supports_patterns:
        # Abort if cache backend is not redis
        warnings.warn(
            'Unused files clearing aborted due to bad cache backend settings.')
        return

    _resolve_referenced_files(_fields_to_search())

    with os.scandir(settings.MEDIA_ROOT) as iterator:
        for entry in iterator:
            name = entry.name
            # only regular files without a cache entry are removed
            if entry.is_file() and cache.get(_make_key(name)) is None:
                default_storage.delete(name)
项目:pywal    作者:dylanaraps    | 项目源码 | 文件源码
def get_random_image(img_dir):
    """Return the path of a random image in img_dir other than the current wallpaper.

    Exits the program with status 1 when the directory holds no other image.
    """
    current_wall = os.path.basename(wallpaper.get())

    file_types = (".png", ".jpg", ".jpeg", ".jpe", ".gif",
                  ".PNG", ".JPG", ".JPEG", ".JPE", ".GIF")

    candidates = []
    for img in os.scandir(img_dir):
        if img.name.endswith(file_types) and img.name != current_wall:
            candidates.append(img.name)

    if not candidates:
        print("image: No new images found (nothing to do), exiting...")
        sys.exit(1)

    return os.path.join(img_dir, random.choice(candidates))
项目:AsyncFTP    作者:helloqiu    | 项目源码 | 文件源码
def get_list(path):
    """Yield one listing line per entry of ``path``.

    Each line contains, in this order: permission string, link count, owner
    name, group name, size in bytes, modification time (UTC, formatted as
    ``"%b %d %H:%M"``) and the entry name. When the owner or group cannot be
    resolved, the placeholders 'owner' and 'group' are used.
    """
    for entry in os.scandir(path):
        info = entry.stat()
        perms = filemode(info.st_mode)
        nlinks = info.st_nlink or 1  # a link count of 0 is reported as 1

        # The lookups are best-effort (unknown ids, platforms without the
        # pwd/grp databases), but must not swallow KeyboardInterrupt or
        # SystemExit the way a bare except does.
        try:
            uname = pwd.getpwuid(info.st_uid).pw_name
        except Exception:
            uname = 'owner'
        try:
            gname = grp.getgrgid(info.st_gid).gr_name
        except Exception:
            gname = 'group'

        mtime = time.strftime("%b %d %H:%M", time.gmtime(info.st_mtime))
        yield "{}   {} {}    {}   {} {} {}".format(
            perms, nlinks, uname, gname, info.st_size, mtime, entry.name)
项目:AsyncFTP    作者:helloqiu    | 项目源码 | 文件源码
def test_get_mlsx():
    """Check the lines of ``get_mlsx`` for the ``tests`` directory.

    Every line is expected to hold three ``key=value`` facts followed by the
    file name, separated by ';'. The 'modify', 'size' and 'type' facts are
    compared with what ``os.scandir`` reports for the same entry.
    """
    path = os.path.join(os.getcwd(), "tests")
    result = {}
    for line in get_mlsx(path):
        fields = line.split(";")
        filename = fields[3].strip()
        result[filename] = {}
        for fact in fields[:3]:
            parts = fact.split('=')
            result[filename][parts[0]] = parts[1]
    for entry in os.scandir(path):
        assert entry.name in result
        result_file = result[entry.name]
        stat = entry.stat()
        assert result_file['modify'] == time.strftime("%Y%m%d%H%M%S", time.gmtime(stat.st_mtime))
        assert result_file['size'] == str(stat.st_size)
        # Parenthesised on purpose: without the parentheses the conditional
        # expression made the assertion always pass for non-directories.
        assert result_file['type'] == ("dir" if entry.is_dir() else "file")
项目:ARTI-NOAH    作者:Glitch-is    | 项目源码 | 文件源码
def __init__(self, dirName):
        """
        Populate ``self.conversations`` from ``<dirName>/dialogs``.

        Args:
            dirName (string): directory where to load the corpus

        Up to ``MAX_NUMBER_SUBDIR`` (200) sub-directories are read; each
        ``.tsv`` entry is loaded via ``self.loadLines``.
        """
        self.MAX_NUMBER_SUBDIR = 200
        self.conversations = []
        source_dir = os.path.join(dirName, "dialogs")
        processed = 0
        for sub in tqdm(os.scandir(source_dir), desc="Ubuntu dialogs subfolders", total=len(os.listdir(source_dir))):
            if processed == self.MAX_NUMBER_SUBDIR:
                print("WARNING: Early stoping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
                return

            if not sub.is_dir():
                continue

            processed += 1
            for dialog_file in os.scandir(sub.path):
                is_tsv = dialog_file.name.endswith(".tsv")
                if is_tsv:
                    self.conversations.append({"lines": self.loadLines(dialog_file.path)})
项目:slider    作者:llllllllll    | 项目源码 | 文件源码
def _osu_files(path, recurse):
        """An iterator of ``.osu`` filepaths in a directory.

        Parameters
        ----------
        path : path-like
            The directory to search in.
        recurse : bool
            Recursively search ``path``?

        Yields
        ------
        path : str
            The path to a ``.osu`` file.
        """
        if recurse:
            for directory, _, filenames in os.walk(path):
                for filename in filenames:
                    if filename.endswith('.osu'):
                        yield pathlib.Path(os.path.join(directory, filename))
        else:
            for entry in os.scandir(directory):
                path = entry.path
                if path.endswith('.osu'):
                    yield pathlib.Path(path)
项目:bigcode-tools    作者:tuvistavie    | 项目源码 | 文件源码
def _iterdir(dirname, dironly):
    if not dirname:
        if isinstance(dirname, bytes):
            dirname = bytes(os.curdir, 'ASCII')
        else:
            dirname = os.curdir
    try:
        it = scandir(dirname)
        for entry in it:
            try:
                if not dironly or entry.is_dir():
                    yield entry.name
            except OSError:
                pass
    except OSError:
        return

# Recursively yields relative pathnames inside a literal directory.
项目:SerpentAI    作者:SerpentAI    | 项目源码 | 文件源码
def _discover_sprites(self):
        """Load the ``.png`` sprites shipped with this class's plugin.

        The sprite folder is ``<plugins>/<ClassName>Plugin/files/data/sprites``.
        The sprite name is the upper-cased file name without its last
        '_'-separated part; files sharing a name are added to the same
        ``Sprite`` via ``append_image_data``. Returns a dict of sprite name
        to ``Sprite`` (empty when the folder does not exist).
        """
        plugin_path = offshoot.config["file_paths"]["plugins"]
        sprites = dict()

        sprite_path = f"{plugin_path}/{self.__class__.__name__}Plugin/files/data/sprites"

        if not os.path.isdir(sprite_path):
            return sprites

        for entry in os.scandir(sprite_path):
            file_name = entry.name
            if not file_name.endswith(".png"):
                continue

            name_parts = file_name.split("/")[-1].split("_")[:-1]
            sprite_name = "_".join(name_parts).replace(".png", "").upper()

            image = skimage.io.imread(f"{sprite_path}/{file_name}")
            # keep the first three channels and add a trailing axis
            image = image[:, :, :3, np.newaxis]

            if sprite_name in sprites:
                sprites[sprite_name].append_image_data(image)
            else:
                sprites[sprite_name] = Sprite(sprite_name, image_data=image)

        return sprites
项目:tensorflow-cyclegan    作者:rickbarraza    | 项目源码 | 文件源码
def reader(path, shuffle=True, seed=None):
    """Return the paths of the ``.jpg`` files (any letter case) in ``path``.

    Args:
        path: directory to scan (not recursive).
        shuffle: when true, the paths are returned in random order.
        seed: optional seed for a repeatable shuffle. When given, the paths
            are sorted first and shuffled with a private generator, so the
            same directory content always yields the same order and the
            global random state is left untouched. When None the global
            ``random`` generator is used.

    Returns:
        list of path strings
    """
    files = [
        img_file.path
        for img_file in os.scandir(path)
        if img_file.name.lower().endswith('.jpg') and img_file.is_file()
    ]

    if shuffle:
        # Shuffle the ordering of all image files in order to guarantee
        # random ordering of the images with respect to label in the
        # saved TFRecord files.
        if seed is None:
            random.shuffle(files)
        else:
            files.sort()  # directory order is arbitrary; fix it first
            random.Random(seed).shuffle(files)

    return files
项目:clitorisvulgaris    作者:fhoehl    | 项目源码 | 文件源码
def set_random_clitoris_texture():
    """Picks a random texture from the TEXTURE_DIRECTORY_PATH and assigns it to
    the clitoris material.

    Hidden entries (names starting with ".") and anything that is not a file
    are ignored. The chosen file name is stored in
    ``DEBUG_INFO["random_texture_name"]``. A missing or empty texture
    directory is reported on stderr instead of raising.
    """

    try:
        texture_names = [entry.name for entry in os.scandir(TEXTURE_DIRECTORY_PATH)
                         if not entry.name.startswith('.') and entry.is_file()]

        random_texture_name = choice(texture_names)

        texture_image = bpy.data.images.load(os.path.join(TEXTURE_DIRECTORY_PATH,
                                                          random_texture_name))

        CLIT_MATERIAL.node_tree.nodes[TEXTURE_NODE_NAME].image = texture_image

        DEBUG_INFO["random_texture_name"] = random_texture_name

    except FileNotFoundError:
        print("Could not find the texture directory", file=sys.stderr)
    except IndexError:
        # Presumably raised by ``choice`` on an empty list (random.choice
        # behaves that way) - confirm which ``choice`` the file imports.
        print("Texture directory is empty", file=sys.stderr)
项目:zielen    作者:lostatc    | 项目源码 | 文件源码
def test_remote_directories_moved_to_trash(command):
    """Remote directories are moved to the trash.

    Two files of ``5 * BLOCK_SIZE`` characters are written into the remote
    "upper" directory and the command is run once. The local "upper"
    directory is then deleted and the command is run again; afterwards an
    entry named "upper" must exist in the remote trash directory.

    Per the original description, this covers the case where the local
    directory contained only symlinks.
    """
    payloads = (
        ("remote/letters/upper/A.txt", "A"),
        ("remote/letters/upper/B.txt", "B"),
    )
    for file_path, letter in payloads:
        with open(file_path, "w") as handle:
            handle.write(letter*BLOCK_SIZE*5)

    command.main()
    shutil.rmtree("local/letters/upper")
    command.main()

    trashed = {entry.name for entry in os.scandir(command.remote_dir.trash_dir)}
    assert "upper" in trashed
项目:zielen    作者:lostatc    | 项目源码 | 文件源码
def check_excluded(
            self, paths: Iterable[str], start_path: str) -> Set[str]:
        """Return the input paths that every exclude file matches.

        One ``ProfileExcludeFile`` is created per entry of
        ``self._exclude_dir``. A path is kept only if it appears in
        ``all_matches(start_path)`` of each of them; when the directory has
        no entries, every input path is kept.

        Args:
            paths: The paths to check.
            start_path: The path of the directory to match globbing patterns
                against.

        Returns:
            The subset of input paths that have been excluded by each client.
        """
        exclude_files = [
            ProfileExcludeFile(entry.path)
            for entry in os.scandir(self._exclude_dir)]

        # all() stops at the first exclude file that does not match the path.
        return {
            candidate for candidate in paths
            if all(
                candidate in exclude_file.all_matches(start_path)
                for exclude_file in exclude_files)}
项目:Frontdown    作者:pfirsich    | 项目源码 | 文件源码
def relativeWalk(path, startPath = None):
    """Recursively yield ``(relative_path, is_directory)`` for everything below *path*.

    Entries of each directory are visited in locale-aware name order; a
    directory is yielded before its contents. Entries that are neither file
    nor directory, and entries whose inspection raises ``OSError``, are
    logged as errors and skipped.

    Args:
        path: Directory to walk.
        startPath: Directory the yielded paths are relative to; defaults to
            *path*.

    Yields:
        tuple: ``(path relative to startPath, True if directory else False)``.
    """
    if startPath is None:
        startPath = path
    # strxfrm -> locale-aware sorting - https://docs.python.org/3/howto/sorting.html#odd-and-ends
    for entry in sorted(os.scandir(path), key = lambda x: locale.strxfrm(x.name)):
        try:
            if entry.is_file():
                yield os.path.relpath(entry.path, startPath), False
            elif entry.is_dir():
                yield os.path.relpath(entry.path, startPath), True
                yield from relativeWalk(entry.path, startPath)
            else:
                logging.error("Encountered an object which is neither directory nor file: %s", entry.path)
        except OSError as e:
            logging.error(e)

# Possible actions:
# copy (always from source to target),
# delete (always in target)
# hardlink (always from compare directory to target directory)
# rename (always in target) (2-variate) (only needed for move detection)
# hardlink2 (always from compare directory to target directory) (2-variate) (only needed for move detection)
项目:denite-extra    作者:chemzqm    | 项目源码 | 文件源码
def gather_candidates(self, context):
        """List the visible entries of every folder in ``context['__folders']``.

        Folders for which ``os.access(folder, os.X_OK)`` is false are skipped,
        as are entries whose name starts with a dot. Each candidate carries
        the entry name (``word``), a column-aligned "<folder name> <entry
        name>" label (``abbr``), the entry path (``source__root``) and its
        modification time (``source__mtime``).

        Returns:
            list of candidate dicts, most recently modified first.
        """
        found = []
        for folder in context['__folders']:
            if os.access(folder, os.X_OK):
                folder_name = os.path.basename(folder)
                for entry in os.scandir(folder):
                    if entry.name[0] != '.':
                        found.append({
                            'word': entry.name,
                            'abbr': '%-14s %-20s' % (folder_name, entry.name),
                            'source__root': entry.path,
                            'source__mtime': entry.stat().st_mtime
                            })
        found.sort(key=itemgetter('source__mtime'), reverse=True)
        return found
项目:denite-extra    作者:chemzqm    | 项目源码 | 文件源码
def gather_candidates(self, context):
        """List the regular files found directly in ``context['__root']``.

        Each candidate holds a label made of the file name without its
        extension plus the result of ``ago(now, mtime)`` (presumably a
        human-readable age - confirm against that helper), the file path
        (``action__path``) and the modification time (``source_mtime``).

        Returns:
            list of candidate dicts, most recently modified first.
        """
        entries = os.scandir(context['__root'])
        reference_time = time.time()

        found = []
        for entry in entries:
            if not entry.is_file():
                continue
            modified = entry.stat().st_mtime
            stem, _extension = os.path.splitext(entry.name)
            found.append({
                'word': '%s (%s)' % (stem, ago(reference_time, modified)),
                'action__path': entry.path,
                'source_mtime': modified
                })

        found.sort(key=lambda candidate: candidate['source_mtime'], reverse=True)
        return found
项目:edata    作者:ap-Codkelden    | 项目源码 | 文件源码
def main():
    """Import JSON files into the database named on the command line.

    A trailing ``.sqlite`` is stripped from the database argument. The files
    to import are the ones given on the command line or, when none were
    given, every ``*.json`` file (case-insensitive extension) in the current
    directory. The process exits with status 2 when there is nothing to
    import. Only files accepted by ``check_file`` are imported.
    """
    results = arg_parser.parse_args()
    # Raw strings: "\." in a plain literal is an invalid escape sequence.
    # The substitution leaves names without the suffix unchanged.
    results.database = re.sub(r'^(.+)\.sqlite$', r'\1', results.database)

    json_filenames = results.file or [
        f.path for f in scandir()
        if f.is_file() and os.path.splitext(f.path)[1].lower() == '.json']
    if not json_filenames:
        sys.exit(2)

    edb = EDataSQLDatabase(database=results.database,
                           verbose=results.verbose)
    # All files are checked before the first import starts.
    accepted = [name for name in json_filenames if check_file(name)]
    for name in accepted:
        edb.import_file(name)
项目:dopplerr    作者:Stibbons    | 项目源码 | 文件源码
def _scan(self, root, media_dirs=None):
        i = 0
        with os.scandir(root) as it:
            for entry in it:
                i += 1
                if i > SPEED_LIMIT:
                    # this allows the event loop to update
                    await asyncio.sleep(SPEED_WAIT_SEC)
                    i = 0
                if self.stopped or self.interrupted:
                    return
                if media_dirs:
                    if entry.name in media_dirs:
                        await self._scan(entry.path, media_dirs=None)
                elif not entry.name.startswith('.'):
                    if entry.is_dir(follow_symlinks=False):
                        await self._scan(entry.path, media_dirs=None)
                    elif entry.name.rpartition('.')[2] in VIDEO_FILES_EXT:
                        await self._refresh_video(entry.path)
项目:reframe    作者:eth-cscs    | 项目源码 | 文件源码
def load_from_dir(self, dirname, recurse=False, **check_args):
        """Load the checks of every Python file found in *dirname*.

        A file is loaded through ``self.load_from_file`` when it is a regular
        file whose name ends with ``.py`` and does not start with a dot.

        Args:
            dirname: Directory to scan.
            recurse: When true, sub-directories are scanned as well.
            **check_args: Forwarded unchanged to ``self.load_from_file``.

        Returns:
            list with the results of all ``load_from_file`` calls, concatenated.
        """
        found = []
        for item in os.scandir(dirname):
            if recurse and item.is_dir():
                found += self.load_from_dir(item.path, recurse, **check_args)

            fname = item.name
            is_check_file = (fname.endswith('.py') and
                             not fname.startswith('.') and
                             item.is_file())
            if is_check_file:
                found += self.load_from_file(item.path, **check_args)

        return found
项目:ArffFileGeneratorForWeka    作者:unicod3    | 项目源码 | 文件源码
def create_arff(self):
        """
        Write the collected word counts to a new, timestamp-named ARFF file.

        The file is created in ``./arff_file/`` (the folder is made when it is
        missing). It declares one numeric attribute per key of
        ``self.attribute_list``, a nominal ``folders`` attribute listing the
        sub-directories of ``self.folderpath``, and one data row per entry of
        ``self.counterList`` in sorted key order.
        :return: string path of the written file
        """
        stamp = datetime.now().strftime('%d-%m-%Y_%H_%M_%S')
        target = os.path.join("./arff_file/", stamp + ".arff")
        os.makedirs(os.path.dirname(target), exist_ok=True)

        with open(target, mode='w', encoding='utf-8') as output:
            emit = output.write

            # header section
            emit("@relation " + self.relation + "\n")
            emit("\n")
            for attribute, _ in self.attribute_list.items():
                emit("@attribute " + attribute + " numeric\n")
            folder_names = ', '.join(
                map('{}'.format,
                    (d.name for d in os.scandir(self.folderpath) if d.is_dir())))
            emit("@attribute folders {%s}\n" % folder_names)

            # data section: one row of counts per file, ending with the name
            # of the folder that contains the file
            emit("\n")
            emit("@data\n")
            for file_path, counts in sorted(self.counterList.items()):
                folder = os.path.basename(os.path.dirname(file_path))
                row = ', '.join('{}'.format(counts[word]) for word in self.search_list)
                emit(row + ', ' + folder + "\n")

        return target
项目:SDK    作者:Keypirinha    | 项目源码 | 文件源码
def dir_is_empty(path):
    """
    Check if the given directory is empty.
    May raise a FileNotFoundError or a NotADirectoryError exception.

    Returns True when the directory has no entries, False otherwise.
    """
    # Only the first entry is needed; the context manager closes the
    # directory handle even though the iterator is not exhausted.
    with os.scandir(path) as entries:
        return next(entries, None) is None
项目:SDK    作者:Keypirinha    | 项目源码 | 文件源码
def file_set_readonly(path, enable, follow_symlinks=True, recursive=False):
    """Apply or remove the read-only property of a given file or directory.

    Args:
        path: File or directory to modify.
        enable: True to make *path* read-only (owner read bit set, owner
            write bit cleared); False to make it writable again (owner write
            bit set). The read bit is never removed, so the path stays
            readable and directories stay listable.
        follow_symlinks: Passed to ``os.stat`` when reading the current mode.
        recursive: When true and *path* is a directory, the same change is
            applied to everything below it.

    ``os.chmod`` is only called when the mode actually changes.
    """
    st_mode = os.stat(path, follow_symlinks=follow_symlinks).st_mode
    if enable:
        new_attr = (st_mode | stat.S_IREAD) & ~stat.S_IWRITE
    else:
        # Only grant write access; clearing S_IREAD here would make the
        # path unreadable on POSIX and break the recursive scan below.
        new_attr = st_mode | stat.S_IWRITE
    if new_attr != st_mode:
        os.chmod(path, new_attr)

    if recursive and stat.S_ISDIR(st_mode):
        # Close the directory handle deterministically.
        with os.scandir(path) as entries:
            for entry in entries:
                file_set_readonly(entry.path, enable, follow_symlinks, recursive)