Python pathlib 模块,PurePath() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用pathlib.PurePath()

项目:conda-tools    作者:groutr    | 项目源码 | 文件源码
def sane_members(members, destination):
    """Yield only the archive members that stay inside *destination*.

    Every member path (and, for symbolic or hard links, the link target) is
    joined onto the destination directory and resolved with ``realpath``.
    Iteration aborts with an exception as soon as something would land
    outside the destination.

    :param members: iterable of member objects exposing ``path``,
        ``linkpath``, ``issym()`` and ``islnk()`` (presumably
        ``tarfile.TarInfo`` instances -- confirm against the caller).
    :param destination: directory the members are going to be extracted into.
    :raises BadPathError: a member path resolves outside the destination.
    :raises BadLinkError: a link target is absolute, reserved, or resolves
        outside the destination.
    """
    # Resolve the destination exactly like the member paths below. Without
    # this, a relative destination (or one that goes through a symlink) can
    # never show up among the parents of a resolved member path, and every
    # single member would be rejected.
    destination = PurePath(realpath(destination))

    def resolve(path):
        return realpath(normpath(join(destination, path)))

    for member in members:
        mpath = PurePath(resolve(member.path))

        # Check if mpath is under destination
        if destination not in mpath.parents:
            raise BadPathError("Bad path to outside destination directory: {}".format(mpath))

        if member.issym() or member.islnk():
            # Check link to make sure it resolves under destination
            lnkpath = PurePath(member.linkpath)
            if lnkpath.is_absolute() or lnkpath.is_reserved():
                raise BadLinkError("Bad link: {}".format(lnkpath))

            # resolve the link to an absolute path
            lnkpath = PurePath(resolve(lnkpath))
            if destination not in lnkpath.parents:
                raise BadLinkError("Bad link to outside destination directory: {}".format(lnkpath))

        yield member
项目:wargame    作者:maximinus    | 项目源码 | 文件源码
def load(self, root_folder):
    """Load every YAML file found below *root_folder* into ``self.data``.

    Each file is parsed, turned into a config item via ``get_config_item``
    and stored under ``config.data.type``. Files for which no config item
    can be built are logged and skipped.

    :param root_folder: directory that is walked recursively.
    :raises ValueError: if a file path has no ``resources`` component
        (raised by ``tuple.index``).
    """
    # config files under ROOT/resources/config
    # iterate through those files
    for subdir, _dirs, files in os.walk(root_folder):
        for filename in files:
            # load this yaml file
            filepath = os.path.join(subdir, filename)
            # Close the handle deterministically instead of leaking it.
            # NOTE(review): yaml.load without an explicit safe loader can
            # build arbitrary objects; only acceptable for trusted files.
            with open(filepath, 'r') as handle:
                data = yaml.load(handle)
            path = PurePath(filepath).parts
            # keep only the part of the path starting at 'resources'
            path = '/'.join(path[path.index('resources'):])
            config = get_config_item(data, path)
            if config is None:
                logger.error('Failed to load {0}'.format(path))
                continue
            # store this item
            self.data[config.data.type] = config.data
项目:aioworkers    作者:aioworkers    | 项目源码 | 文件源码
def flat(parts):
    """Lazily flatten nested lists/tuples of relative path pieces.

    Yields every ``str`` or ``PurePath`` leaf in order. Because this is a
    generator, errors surface only while iterating.

    Raises ``ValueError`` for an absolute leaf and ``TypeError`` for a leaf
    that is neither a string, a pure path, a list nor a tuple.
    """
    if isinstance(parts, (list, tuple)):
        for item in parts:
            yield from flat(item)
        return

    if isinstance(parts, str):
        absolute = os.path.isabs(parts)
    elif isinstance(parts, PurePath):
        absolute = parts.is_absolute()
    else:
        raise TypeError(
            'Key must be relative path [str or Path]. '
            'But {}'.format(parts))

    if absolute:
        raise ValueError('Path must be relative. '
                         '[{}]'.format(parts))
    yield parts
项目:aioworkers    作者:aioworkers    | 项目源码 | 文件源码
async def test_key(loop):
    """Check how ``FileSystemStorage.raw_key`` maps keys to paths.

    Nested tuples and ``PurePath`` pieces are flattened into one relative
    path; non-path keys raise ``TypeError``; keys that escape the storage
    root or are absolute raise ``ValueError``.

    Declared ``async`` because the body awaits ``storage.init()``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    with tempfile.TemporaryDirectory() as d:
        config = MergeDict(
            name='',
            path=d,
            executor=None,
        )
        context = Context({}, loop=loop)
        storage = FileSystemStorage(config, context=context, loop=loop)
        await storage.init()

        assert str(storage.raw_key(('1', '3', ('4',)))).endswith('1/3/4')
        assert str(storage.raw_key((PurePath('1'), '3', ('4',)))).endswith('1/3/4')

        with pytest.raises(TypeError):
            storage.raw_key(1)

        with pytest.raises(ValueError):
            storage.raw_key('../..')

        with pytest.raises(ValueError):
            storage.raw_key('/abs/path')

        with pytest.raises(ValueError):
            storage.raw_key(PurePath('/abs/path'))
项目:kge-server    作者:vfrico    | 项目源码 | 文件源码
def set_model(self, dataset_id, model_path):
    """Store the model file path of a dataset in the database.

    The path is saved relative to ``self.bin_path``.

    :param int dataset_id: The id of the dataset
    :param str model_path: The path where binary file is located
    :returns: ``(True, None)`` when exactly one row was updated, otherwise
        ``(False, (400, message))``
    :rtype: tuple
    """
    # Strip the bin_path prefix so only the relative part is stored
    relative_path = PurePath(model_path).relative_to(self.bin_path)
    cursor = self.execute_insertion(
        "UPDATE dataset SET binary_model=? WHERE id=? ;",
        str(relative_path),
        dataset_id,
    )

    updated = cursor.rowcount == 1
    cursor.close()

    if updated:
        return True, None
    return False, (400, "Failed when trying to save dataset on db")
项目:kge-server    作者:vfrico    | 项目源码 | 文件源码
def set_search_index(self, dataset_id, index_path):
    """Store the search index path of a dataset in the database.

    The path is saved relative to ``self.bin_path``.

    :param int dataset_id: The id of the dataset
    :param string index_path: The path on the filesystem of the index
    :returns: ``(True, None)`` when exactly one row was updated, otherwise
        ``(False, (400, message))``
    :rtype: tuple
    """
    # Strip the bin_path prefix so only the relative part is stored
    relative_path = PurePath(index_path).relative_to(self.bin_path)
    cursor = self.execute_insertion(
        "UPDATE dataset SET binary_index=? WHERE id=? ;",
        str(relative_path),
        dataset_id,
    )

    updated = cursor.rowcount == 1
    cursor.close()

    if updated:
        return True, None
    return False, (400, "Failed when trying to save index on db")
项目:PyGObject-examples    作者:VatsalP    | 项目源码 | 文件源码
def file_new(self, button):
    """Handle the "new file" action.

    With no current file and no pending changes the buffer is simply
    cleared. Otherwise the ``notsave`` dialog is run; on YES the current
    content is saved first, on any other response the editor state is
    reset. Afterwards the window title is refreshed from ``currentfile``.

    :param button: the widget that triggered the action (unused here).
    """
    if (not self.currentfile) and (not self.filechanged):
        self.textbuffer.set_text("")
    else:
        self.notsave.set_transient_for(self.window)
        self.response = self.notsave.run()

        if self.response == Gtk.ResponseType.YES:
            self.save(self.filesave)
            self.textbuffer.set_text("")
            self.notsave.hide()
        else:
            self.textbuffer.set_text("")
            self.currentfile = ""
            self.filechanged = False
            self.notsave.hide()

    self.path = pathlib.PurePath(self.currentfile)
    # The parts live on the path object, not on the editor itself; reading
    # ``self.parts`` raised AttributeError on every call.
    if self.path.parts:
        self.window.set_title(self.path.parts[-1])
    else:
        self.window.set_title("Untitled document")
项目:PyGObject-examples    作者:VatsalP    | 项目源码 | 文件源码
def save_as(self, button):
    """Handle the "save as" action.

    Runs the ``filesave`` dialog. When it is confirmed with a file name, the
    name becomes the current file, the window title is updated and the whole
    text buffer is written to that file as UTF-8. The changed flag is
    cleared and the dialog hidden regardless of the response.

    :param button: the widget that triggered the action (unused here).
    """
    self.filesave.set_transient_for(self.window)
    self.response = self.filesave.run()

    confirmed = self.response == Gtk.ResponseType.OK
    if confirmed:
        self.file_to_save = self.filesave.get_filename()

    if confirmed and self.file_to_save:
        self.currentfile = self.file_to_save
        self.path = pathlib.PurePath(self.currentfile)
        parts = self.path.parts
        self.window.set_title(parts[-1] if parts else "Untitled document")

        with codecs.open(self.file_to_save, 'w', encoding="utf-8") as f:
            start = self.textbuffer.get_start_iter()
            end = self.textbuffer.get_end_iter()
            f.write(self.textbuffer.get_text(start, end, False))

    self.filechanged = False
    self.filesave.hide()
项目:isambard    作者:woolfson-group    | 项目源码 | 文件源码
def __init__(self, pdb, path=True, pdb_id='', ignore_end=False):
    """Read PDB text and immediately parse it.

    :param pdb: a file path when ``path`` is true, otherwise the PDB text
        itself.
    :param path: whether ``pdb`` is a path to read from.
    :param pdb_id: identifier to use when ``pdb`` is raw text; for a path
        the file stem is used instead.
    :param ignore_end: stored on the instance as ``ignore_end``.
    """
    # Record-name -> handler dispatch table
    self.proc_functions = dict(
        ATOM=self.proc_atom,
        HETATM=self.proc_atom,
        ENDMDL=self.change_state,
        END=self.end,
    )

    if not path:
        pdb_str, self.id = pdb, pdb_id
    else:
        pdb_path = pathlib.PurePath(pdb)
        with open(str(pdb_path), 'r') as inf:
            pdb_str = inf.read()
        self.id = pdb_path.stem

    self.pdb_lines = pdb_str.splitlines()
    self.new_labels = False
    self.state = 0
    self.pdb_parse_tree = None
    self.current_line = None
    self.ignore_end = ignore_end
    self.parse_pdb_file()
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def _remove_slash_prefix(line):
    """Return *line* without a single leading ``/``, if it has one.

    For details on the glob matcher,
    see: https://docs.python.org/3/library/pathlib.html#pathlib.PurePath.match
    """
    if line.startswith('/'):
        return line[1:]
    return line
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def _matches_patterns(path, patterns):
    """Return True if *path* matches at least one glob in *patterns*.

    A pattern whose match attempt raises ``TypeError`` is skipped.
    """
    for pattern in patterns:
        try:
            matched = PurePath(path).match(pattern)
        except TypeError:
            continue
        if matched:
            return True
    return False
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def parts(my_path):
    """Return the components of *my_path* as given by ``PurePath.parts``."""
    pure = PurePath(my_path)
    return pure.parts
项目:deb-python-django-pyscss    作者:openstack    | 项目源码 | 文件源码
def handle_import(self, name, compilation, rule):
    """
    Re-implementation of the core Sass import mechanism, which looks for
    files using the staticfiles storage and staticfiles finders.

    Tries every combination of the ``_`` / empty prefix and the compiler's
    dynamic extensions and returns a ``SourceFile`` for the first candidate
    that ``get_file_and_storage`` can locate. Returns ``None`` when nothing
    is found.
    """
    requested = PurePath(name)
    extensions = list(compilation.compiler.dynamic_extensions)

    # Drop the extension only when it is one the compiler searches for
    has_known_ext = requested.suffix and requested.suffix in extensions
    basename = requested.stem if has_known_ext else requested.name

    if requested.is_absolute():
        # Remove the beginning slash
        search_path = requested.relative_to('/').parent
    elif rule.source_file.origin:
        search_path = rule.source_file.origin
        # NOTE(review): pure paths define no falsy value, so this condition
        # looks always true -- confirm.
        if requested.parent:
            search_path = os.path.normpath(str(search_path / requested.parent))
    else:
        search_path = requested.parent

    for prefix, suffix in product(('_', ''), extensions):
        filename = PurePath(prefix + basename + suffix)
        candidate = str(search_path / filename)

        full_filename, storage = get_file_and_storage(candidate)
        if not full_filename:
            continue

        with storage.open(full_filename) as f:
            return SourceFile.from_file(f, origin=search_path, relpath=filename)

    return None
项目:deb-python-django-pyscss    作者:openstack    | 项目源码 | 文件源码
def compile(self, *paths):
    """Compile the files at *paths* and return the compilation result.

    Absolute paths are made relative to ``/`` before being looked up with
    ``get_file_and_storage``. Each file is added to one fresh compilation,
    which is then run through ``call_and_catch_errors``.
    """
    compilation = self.make_compilation()
    for raw_path in paths:
        source_path = PurePath(raw_path)
        if source_path.is_absolute():
            source_path = source_path.relative_to('/')

        filename, storage = get_file_and_storage(str(source_path))
        with storage.open(filename) as handle:
            source = SourceFile.from_file(
                handle,
                origin=source_path.parent,
                relpath=PurePath(source_path.name),
            )
        compilation.add_source(source)

    return self.call_and_catch_errors(compilation.run)
项目:deb-python-django-pyscss    作者:openstack    | 项目源码 | 文件源码
def compile_string(self, string, filename=None):
    """Compile source given as a string and return the compilation result.

    When *filename* is given, the source is wrapped in a ``StringIO`` and
    registered with that file's parent directory as origin and its name as
    relative path; otherwise it is registered as an anonymous string source.
    """
    compilation = self.make_compilation()

    if filename is None:
        source = SourceFile.from_string(string)
    else:
        buffer = StringIO(string)
        filename = PurePath(filename)
        source = SourceFile.from_file(
            buffer,
            origin=filename.parent,
            relpath=PurePath(filename.name),
        )

    compilation.add_source(source)
    return self.call_and_catch_errors(compilation.run)
项目:kit-ilias-fuse    作者:brantsch    | 项目源码 | 文件源码
def __path_to_object(self, path):
    """
    Map from paths to objects.

    Starts at ``self.dashboard`` and descends one child per path component,
    skipping the first component. Returns the node that was reached, or the
    first falsy lookup result when a component cannot be resolved.
    """
    components = pathlib.PurePath(path).parts
    current = self.dashboard
    for name in components[1:]:
        current = current.get_child_by_name(name)
        if not current:
            # stop descending as soon as a child is missing
            return current
    return current
项目:aioworkers    作者:aioworkers    | 项目源码 | 文件源码
def raw_key(self, *key):
    """Translate a (possibly nested) key into a path below ``self._path``.

    The key pieces are flattened with ``flat``, joined, normalised and
    passed through ``self.path_transform`` before being appended to the
    storage root.

    Raises ``ValueError`` when the result, relative to the root, compares
    equal to ``'.'``.
    """
    relative = os.path.normpath(str(PurePath(*flat(key))))
    transformed = self.path_transform(relative)
    # NOTE(review): ``.normpath`` is not a pathlib attribute, so ``_path``
    # is presumably a project-specific path type -- confirm.
    path = self._path.joinpath(transformed).normpath
    if path.relative_to(self._path) == '.':
        raise ValueError('Access denied: %s' % path)
    return path
项目:lxdock    作者:lxdock    | 项目源码 | 文件源码
def test_can_copy_file(self):
    """Copying a file creates the parent directory, then uploads the bytes."""
    class DummyGuest(Guest):
        name = 'dummy'

    guest = DummyGuest(FakeContainer())
    guest.lxd_container.files.put.return_value = True

    with tempfile.NamedTemporaryFile() as source_file:
        source_file.write(b'dummy file')
        source_file.flush()
        source = pathlib.Path(source_file.name)
        destination = pathlib.PurePath('/a/b/c')
        guest.copy_file(source, destination)

    execute = guest.lxd_container.execute
    assert execute.call_count == 1
    assert execute.call_args[0] == (['mkdir', '-p', '/a/b'], )

    put = guest.lxd_container.files.put
    assert put.call_count == 1
    assert put.call_args[0] == ('/a/b/c', b'dummy file')
项目:python-humble-utils    作者:webyneter    | 项目源码 | 文件源码
def test_generate_random_dir_path(subdir_count: int):
    """The generated path has *subdir_count* components after the first."""
    generated = generate_random_dir_path(subdir_count)
    components = PurePath(generated).parts
    # The randomness of the underlying UUIDs is deliberately not tested:
    # it is an implementation detail we do not want to rely upon.
    assert len(components[1:]) == subdir_count
项目:copaspedia    作者:selesdepselesnul    | 项目源码 | 文件源码
def set_content_image(self, list_image, des_dir):
    """Show the downloaded images in the content browser.

    For every entry in *list_image* an ``<img>`` tag pointing at
    ``des_dir/<file name>`` is appended to the browser, with the path
    HTML-escaped. Afterwards an information dialog reports the directory.

    :param list_image: iterable of image locations; only the final path
        component of each one is used.
    :param des_dir: directory the images were stored in.
    """
    self.content_text_browser.clear()
    self.content_text_browser.setEnabled(True)
    for image in list_image:
        full_path = html.escape(des_dir + '/' + PurePath(image).name)
        self.content_text_browser.append(
            "<img src='{}' title='store at : {}'/><br/>".format(full_path, full_path))

    self.__load_finished()
    # Fixed the "donwload" typo in the user-facing message.
    QMessageBox.information(self, 'Download Completed',
        'All of your download images store at : {}'.format(des_dir))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _check_str_subclass(self, *args):
    """Build a path from ``str`` subclass instances and verify the result.

    Issue #21127: it should be possible to construct a PurePath object
    from an str subclass instance, and it then gets converted to
    a pure str object.
    """
    class StrSubclass(str):
        pass

    path_cls = self.cls
    wrapped_args = [StrSubclass(arg) for arg in args]
    built = path_cls(*wrapped_args)

    self.assertEqual(built, path_cls(*args))
    for component in built.parts:
        self.assertIs(type(component), str)
项目:sammy    作者:capless    | 项目源码 | 文件源码
def setUp(self):
    """Load the YAML template named by ``self.template_name``.

    The template is read from the ``examples/yaml`` directory next to this
    file and stored, parsed, in ``self.template_dict``.
    """
    examples_dir = pl.PurePath(os.path.abspath(__file__)).parent / 'examples/yaml'
    template_path = '{}/{}'.format(examples_dir, self.template_name)
    with open(template_path, 'r') as f:
        self.template_dict = yaml.load(f)
项目:bedevere    作者:python    | 项目源码 | 文件源码
async def check_news(event, gh, *args, **kwargs):
    """Post a commit status saying whether a pull request has a news entry.

    The files of the pull request are scanned for an entry below
    ``NEWS_NEXT_DIR`` that sits exactly five path components deep and whose
    name matches ``FILENAME_RE``. If none is found, the status is either
    the "skip news" status (when the issue carries that label) or a failure
    describing what is wrong.

    Declared ``async`` because the body uses ``async for`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param event: webhook event; ``event.data`` must provide ``number`` and
        ``pull_request``.
    :param gh: GitHub API client providing ``getiter`` and ``post``.
    """
    pr_number = event.data['number']
    pull_request = event.data['pull_request']
    # For some unknown reason there isn't any files URL in a pull request
    # payload.
    files_url = f'/repos/python/cpython/pulls/{pr_number}/files'
    # Could have collected all the filenames in an async set comprehension,
    # but doing it this way potentially minimizes the number of API calls.
    in_next_dir = file_found = False
    async for file_data in gh.getiter(files_url):
        filename = file_data['filename']
        if not filename.startswith(NEWS_NEXT_DIR):
            continue
        in_next_dir = True
        file_path = pathlib.PurePath(filename)
        if len(file_path.parts) != 5:  # Misc, NEWS.d, next, <subsection>, <entry>
            continue
        file_found = True
        if FILENAME_RE.match(file_path.name):
            status = create_status(util.StatusState.SUCCESS,
                                   description='News entry found in Misc/NEWS.d')
            break
    else:
        # The loop finished without finding a well-formed entry.
        issue = await util.issue_for_PR(gh, pull_request)
        if util.skip("news", issue):
            status = SKIP_LABEL_STATUS
        else:
            if not in_next_dir:
                description = f'No news entry in {NEWS_NEXT_DIR} or "skip news" label found'
            elif not file_found:
                description = "News entry not in an appropriate directory"
            else:
                description = "News entry file name incorrectly formatted"
            status = create_status(util.StatusState.FAILURE,
                                   description=description,
                                   target_url=DEVGUIDE_URL)

    await gh.post(pull_request['statuses_url'], data=status)
项目:PyGObject-examples    作者:VatsalP    | 项目源码 | 文件源码
def __init__(self):
    """Build the editor UI from ``simpletexteditor.glade``.

    Connects the signal handlers, binds the widgets used by the handlers to
    instance attributes and initialises the document state (no current
    file, no pending changes).
    """
    self.builder = Gtk.Builder()
    self.builder.add_from_file("simpletexteditor.glade")
    self.builder.connect_signals(self)

    # (attribute name, glade object id); notsave / notsavetwo are the
    # message dialogs asking whether to save the current text buffer.
    widget_ids = (
        ("window", "window"),
        ("textview", "textview"),
        ("textbuffer", "textbuffer"),
        ("about", "about_page"),
        ("fileopen", "fileopen"),
        ("filesave", "filesave"),
        ("fontchooser", "fontchooser"),
        ("notsave", "notsave"),
        ("notsavetwo", "notsavetwo"),
    )
    for attribute, object_id in widget_ids:
        setattr(self, attribute, self.builder.get_object(object_id))

    self.currentfile = ""
    self.path = pathlib.PurePath(self.currentfile)
    title = self.path.parts[-1] if self.path.parts else "Untitled document"
    self.window.set_title(title)

    self.filechanged = False
    self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
项目:PyGObject-examples    作者:VatsalP    | 项目源码 | 文件源码
def file_open(self, button):
    """Handle the "open file" action.

    If there are unsaved changes the ``notsavetwo`` dialog is run first and
    the content saved on YES. Then the ``fileopen`` dialog is run; when it
    is confirmed with a file name, that file becomes the current file, the
    window title is updated and its UTF-8 content is loaded into the buffer.

    :param button: the widget that triggered the action (unused here).
    """
    if self.filechanged:
        self.notsavetwo.set_transient_for(self.window)
        self.response = self.notsavetwo.run()

        if self.response == Gtk.ResponseType.YES:
            self.save(self.filesave)

        self.notsavetwo.hide()

    self.fileopen.set_transient_for(self.window)
    self.response = self.fileopen.run()

    if self.response == Gtk.ResponseType.OK:
        self.file_to_open = self.fileopen.get_filename()
        if self.file_to_open:
            self.currentfile = self.file_to_open
            self.path = pathlib.PurePath(self.currentfile)
            # The title is only set behind the ``parts`` guard; the former
            # unguarded duplicate call indexed ``parts[-1]`` before the
            # check and so defeated it.
            if self.path.parts:
                self.window.set_title(self.path.parts[-1])
            else:
                self.window.set_title("Untitled document")
            with codecs.open(self.file_to_open,
                             'r',
                             encoding="utf-8") as f:
                self.content_file = f.read()
                self.textbuffer.set_text(self.content_file)

    self.fileopen.hide()
项目:vminspect    作者:noxdafox    | 项目源码 | 文件源码
def _extract_files(self, disk, files, path):
    """Extract *files* from a disk into ``<path>/extracted_files``.

    The target directory is created first. The outcome is logged and
    returned as the ``(extracted, failed)`` pair produced by
    ``extract_files``.
    """
    target_dir = str(PurePath(path, 'extracted_files'))
    makedirs(target_dir)

    filesystem = self.filesystems[disk]
    extracted, failed = extract_files(filesystem, files, target_dir)

    self.logger.info("Files extracted into %s.", target_dir)
    if failed:
        failed_listing = '\n'.join(failed.values())
        self.logger.warning(
            "The following files could not be extracted: %s",
            failed_listing)

    return extracted, failed
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _check_str_subclass(self, *args):
    """Build a path from ``str`` subclass instances and verify the result.

    Issue #21127: it should be possible to construct a PurePath object
    from an str subclass instance, and it then gets converted to
    a pure str object.
    """
    class StrSubclass(str):
        pass

    path_cls = self.cls
    wrapped_args = [StrSubclass(arg) for arg in args]
    built = path_cls(*wrapped_args)

    self.assertEqual(built, path_cls(*args))
    for component in built.parts:
        self.assertIs(type(component), str)
项目:py    作者:pytest-dev    | 项目源码 | 文件源码
def fspath(path):
    """
    Return the string representation of the path.

    Text and bytes objects are returned unchanged. Any other object is
    converted through its type's ``__fspath__``; ``pathlib.PurePath``
    instances lacking that method are converted to text.

    This code comes from PEP 519, modified to support earlier versions of
    python. It is required for python < 3.6.

    Raises ``TypeError`` for anything that cannot be converted.
    """
    if isinstance(path, (py.builtin.text, py.builtin.bytes)):
        return path

    # Work from the object's type to match method resolution of other magic
    # methods.
    cls = type(path)
    try:
        return cls.__fspath__(path)
    except AttributeError:
        # An AttributeError raised from inside an existing __fspath__ is a
        # genuine error and must propagate.
        if hasattr(cls, '__fspath__'):
            raise

        try:
            import pathlib
        except ImportError:
            pathlib = None

        if pathlib is not None and isinstance(path, pathlib.PurePath):
            return py.builtin.text(path)

        raise TypeError("expected str, bytes or os.PathLike object, not "
                        + cls.__name__)
项目:flexCE    作者:bretthandrews    | 项目源码 | 文件源码
def substitute_dir_in_path(path, olddir, newdir):
    """Return *path* with every component equal to *olddir* replaced.

    :param path: path to rewrite.
    :param olddir: component name to look for (compared for equality with
        each whole component).
    :param newdir: replacement component.
    :returns: the rewritten path as a string; an empty path is returned in
        its pathlib form (``'.'``).
    """
    pp = PurePath(path)
    parts = [newdir if p == olddir else p for p in pp.parts]
    if not parts:
        # An empty path has no components, and os.path.join() cannot be
        # called without arguments.
        return str(pp)
    return os.path.join(*parts)
项目:azure-data-lake-store-python    作者:Azure    | 项目源码 | 文件源码
def globless_prefix(self):
    """Return the leading path components that contain no ``*`` or ``?``.

    Components are taken from ``self.parts`` up to, but not including, the
    first one holding a glob quantifier, and joined into a ``PurePath``.
    """
    prefix = []
    for component in self.parts:
        if '*' in component or '?' in component:
            break
        prefix.append(component)
    return pathlib.PurePath(*prefix)
项目:ceiba-dl    作者:lantw44    | 项目源码 | 文件源码
def download_link(self, path, node, retry, dcb, ecb):
    """Create a local symlink mirroring a link node.

    The link is created at *path* (leading slashes stripped, so relative to
    the working directory) and points at ``node.read_link()``. An existing
    symlink that already has the right target is left alone; one with a
    different target is replaced. Up to *retry* attempts are made.

    :param dcb: callback invoked as ``dcb(path, False, None, None, None)``
        before and ``dcb(path, True, None, None, None)`` after the link is
        created.
    :param ecb: callback invoked as ``ecb(path)`` after a successful
        creation and after every attempt that failed with ``IOError``.
    :returns: True if the link exists with the expected target, else False.
    """
    disk_path_object = pathlib.Path(path.lstrip('/'))
    disk_path = str(disk_path_object)
    if self.vfs.is_internal_link(node):
        link_target_path = str(pathlib.PurePath(node.read_link()))
    else:
        assert False

    if disk_path_object.is_symlink():
        if os.readlink(disk_path) == link_target_path:
            self.logger.info('???????????????? {}'.format(disk_path))
            return True

    def create_link():
        # One complete creation cycle, reported through the callbacks.
        dcb(path, False, None, None, None)
        disk_path_object.symlink_to(link_target_path)
        dcb(path, True, None, None, None)
        ecb(path)

    download_ok = False
    for attempt in range(retry):
        try:
            if attempt != 0:
                self.logger.error('???????? {}?????? {} ?'.format(
                    disk_path, attempt + 1))
            try:
                create_link()
            except FileExistsError:
                if not disk_path_object.is_symlink():
                    # Something that is not a symlink is in the way; leave
                    # it alone and move on to the next attempt.
                    continue
                disk_path_object.unlink()
                create_link()
            download_ok = True
            break
        except IOError as err:
            ecb(path)
            self.logger.error(err)

    return download_ok
项目:qiime2    作者:qiime2    | 项目源码 | 文件源码
def import_data(cls, type, view, view_type=None):
    """Import *view* as data of the given semantic *type*.

    :param type: semantic type, or a string parsed with
        ``qiime2.sdk.parse_type``.
    :param view: the data to import. When no ``view_type`` is given and this
        is a ``str`` or a pure path, it is treated as a filesystem path.
    :param view_type: type of the view, or a string parsed with
        ``qiime2.sdk.parse_format``. When omitted it is derived from the
        directory format registered for the type (for paths) or from
        ``type(view)``.
    :raises qiime2.plugin.ValidationError: when a single file is given for
        a format that needs a directory, or the path does not exist.
    :returns: the result of ``cls._from_view``.
    """
    # The ``type`` parameter shadows the builtin, which is still needed
    # below. Fetch it from the ``builtins`` module: ``__builtins__`` is an
    # implementation detail that may be a dict or a module, so subscripting
    # it is not reliable.
    import builtins

    type_, type = type, builtins.type

    is_format = False
    if isinstance(type_, str):
        type_ = qiime2.sdk.parse_type(type_)

    if isinstance(view_type, str):
        view_type = qiime2.sdk.parse_format(view_type)
        is_format = True

    if view_type is None:
        if type(view) is str or isinstance(view, pathlib.PurePath):
            is_format = True
            pm = qiime2.sdk.PluginManager()
            output_dir_fmt = pm.get_directory_format(type_)
            if pathlib.Path(view).is_file():
                # A lone file is only acceptable for single-file formats.
                if not issubclass(output_dir_fmt,
                                  model.SingleFileDirectoryFormatBase):
                    raise qiime2.plugin.ValidationError(
                        "Importing %r requires a directory, not %s"
                        % (output_dir_fmt.__name__, view))
                view_type = output_dir_fmt.file.format
            else:
                view_type = output_dir_fmt
        else:
            view_type = type(view)

    format_ = None
    md5sums = None
    if is_format:
        # Record checksums of the imported file(s) for provenance.
        path = pathlib.Path(view)
        if path.is_file():
            md5sums = {path.name: util.md5sum(path)}
        elif path.is_dir():
            md5sums = util.md5sum_directory(path)
        else:
            raise qiime2.plugin.ValidationError(
                "Path '%s' does not exist." % path)
        format_ = view_type

    provenance_capture = archive.ImportProvenanceCapture(format_, md5sums)
    return cls._from_view(type_, view, view_type, provenance_capture)
项目:copaspedia    作者:selesdepselesnul    | 项目源码 | 文件源码
def _extract_from_wiki(self):
    """Fetch the requested part of a Wikipedia page in a background thread.

    Reads the title, page kind and language from the widgets. With an empty
    title the content browser is cleared and disabled. Otherwise the
    progress bar is switched to its busy state and a worker thread is
    started that emits one of the ``content_*_arrived`` signals, or
    ``error_occurred`` on any failure.
    """
    title = self.title_line_edit.text()
    if title:
        page = self.page_combo_box.currentText()
        wikipedia.set_lang(self.lang_combo_box.currentText())
        self.load_progressbar.setMinimum(0)
        self.load_progressbar.setMaximum(0)

        class ProgressThread(QThread, QWidget):

            content_link_arrived = pyqtSignal([list])
            content_text_arrived = pyqtSignal(['QString'])
            content_image_arrived = pyqtSignal([list, 'QString'])
            error_occurred = pyqtSignal()
            valid_images = []

            def run(self):
                try:
                    wiki = wikipedia.page(title=title)
                    # The template is never read. It is still opened so a
                    # missing template keeps ending in error_occurred, but
                    # the handle is closed right away instead of leaking.
                    with open('templates/template.html'):
                        pass
                    if page == 'Content':
                        self.content_text_arrived.emit(wiki.content)
                    elif page == 'Images':

                        print(wiki.images)

                        self.des_dir = Preferences.output_path + '/' + title
                        self.valid_images = []
                        if not os.path.exists(self.des_dir):
                            print(self.des_dir)
                            os.mkdir(self.des_dir)

                        # Download only images with an accepted suffix.
                        for i in wiki.images:
                            if PurePath(i).suffix in Preferences.valid_image_formats:
                                print(i)
                                print(self.des_dir)
                                wget.download(i, out=self.des_dir)
                                self.valid_images.append(i)
                        self.content_image_arrived.emit(self.valid_images, self.des_dir)

                    elif page == 'Summary':
                        self.content_text_arrived.emit(wiki.summary)
                    elif page == 'Images Links':
                        self.content_link_arrived.emit(wiki.images)
                    elif page == 'References Links':
                        self.content_link_arrived.emit(wiki.references)

                except Exception:
                    # Any failure is reported to the UI. Unlike a bare
                    # ``except:``, this lets SystemExit and
                    # KeyboardInterrupt propagate.
                    self.error_occurred.emit()

        self.progress_thread = ProgressThread()
        self.progress_thread.content_link_arrived.connect(self.set_content_link)
        self.progress_thread.content_text_arrived.connect(self.set_content_text)
        self.progress_thread.content_image_arrived.connect(self.set_content_image)
        self.progress_thread.error_occurred.connect(self.handle_error_occurred)
        self.progress_thread.start()
    else:
        self.content_text_browser.clear()
        self.content_text_browser.setEnabled(False)