Python os module: replace() code examples

We extracted the following 50 code examples from open-source Python projects to illustrate how to use os.replace().
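
For context, os.replace(src, dst) (Python 3.3+) renames src to dst and, unlike os.rename() on Windows, overwrites an existing dst; on POSIX systems the rename is atomic when both paths sit on the same filesystem. A minimal sketch of the write-temp-then-replace pattern most of the examples below follow (file names are illustrative):

import os

def atomic_write_text(path, text):
    """Write text to path atomically: write a temp file, then os.replace() it."""
    tmp_path = path + ".tmp"              # illustrative temp-file name
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(text)
        f.flush()
        os.fsync(f.fileno())              # force the data to disk before renaming
    os.replace(tmp_path, path)            # atomically swaps the new file into place

atomic_write_text("settings.ini", "value = 1\n")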

Project: bob    Author: BobBuildTool    | Project source | File source
def __save(self):
        if self.__asynchronous == 0:
            state = {
                "version" : _BobState.CUR_VERSION,
                "byNameDirs" : self.__byNameDirs,
                "results" : self.__results,
                "inputs" : self.__inputs,
                "jenkins" : self.__jenkins,
                "dirStates" : self.__dirStates,
                "buildState" : self.__buildState,
            }
            tmpFile = self.__path+".new"
            try:
                with open(tmpFile, "wb") as f:
                    pickle.dump(state, f)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(tmpFile, self.__path)
            except OSError as e:
                raise ParseError("Error saving workspace state: " + str(e))
            self.__dirty = False
        else:
            self.__dirty = True
Project: youtube    Author: FishyFing    | Project source | File source
def save_json(self, filename, data):
        """Atomically saves json file"""
        rnd = randint(1000, 9999)
        path, ext = os.path.splitext(filename)
        tmp_file = "{}-{}.tmp".format(path, rnd)
        self._save_json(tmp_file, data)
        try:
            self._read_json(tmp_file)
        except json.decoder.JSONDecodeError:
            self.logger.exception("Attempted to write file {} but JSON "
                                  "integrity check on tmp file has failed. "
                                  "The original file is unaltered."
                                  "".format(filename))
            return False
        os.replace(tmp_file, filename)
        return True
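
The _save_json and _read_json helpers above belong to the surrounding class and are not shown in the excerpt. A standalone sketch of the same validate-then-replace idea, with illustrative names:

import json
import os
from random import randint

def save_json_atomic(filename, data):
    """Dump JSON to a temp file, verify it parses, then os.replace() it into place."""
    path, _ext = os.path.splitext(filename)
    tmp_file = "{}-{}.tmp".format(path, randint(1000, 9999))
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(data, f)
    try:
        with open(tmp_file, "r", encoding="utf-8") as f:
            json.load(f)                  # integrity check against the temp file
    except json.decoder.JSONDecodeError:
        os.remove(tmp_file)               # the original file stays untouched
        return False
    os.replace(tmp_file, filename)
    return True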
Project: Quiver    Author: DeflatedPickle    | Project source | File source
def zip_file(self):
        amount = functions.folder_files(self.parent.directory)
        progress = dialog.ProgressWindow(self.parent, title="Zipping Pack", maximum=amount)

        count = 0

        with zipfile.ZipFile(self.parent.directory + ".zip", "w") as z:
            for root, dirs, files in os.walk(self.parent.directory.replace("\\", "/"), topdown=True):
                new_root = root.replace("\\", "/").split("/")
                # print(root, dirs, files)
                for name in files:
                    z.write(os.path.join(root, name),
                            "/".join(new_root[new_root.index(self.parent.directory.split("/")[-1]) + 1:]) + "/" + name)

                    count += 1
                    progress.variable_name.set("Current File: " + name)
                    progress.variable_percent.set("{}% Complete".format(round(100 * float(count) / float(amount))))
                    progress.variable_progress.set(progress.variable_progress.get() + 1)

            z.close()

        progress.destroy()
        messagebox.showinfo(title="Information", message="Zipping complete.")
Project: srctools    Author: TeamSpen210    | Project source | File source
def __exit__(self, exc_type, exc_value, tback):
        # Pass to tempfile, which also closes().
        temp_path = self.temp.name
        self.temp.__exit__(exc_type, exc_value, tback)
        self.temp = None
        if exc_type is not None:
            # An exception occurred, clean up.
            try:
                _os.remove(temp_path)
            except FileNotFoundError:
                pass
        else:
            # No exception, commit changes
            _os.replace(temp_path, self.filename)

        return False  # Don't cancel the exception.


# Import these, so people can reference 'srctools.Vec' instead of
# 'srctools.vec.Vec'.
# Should be done after other code, so everything's initialised.
# Not all classes are imported, just most-used ones.
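
This __exit__ belongs to srctools' atomic-writer context manager; the enclosing class and its __enter__ are outside the excerpt. A hedged usage sketch, assuming the class is exposed as srctools.AtomicWriter and yields a writable file object:

# Assumed API: AtomicWriter(filename) opens a temp file next to filename;
# on a clean exit the temp file is os.replace()'d over filename, while on
# an exception the temp file is deleted and the original is left untouched.
from srctools import AtomicWriter

with AtomicWriter("gameinfo.txt") as f:
    f.write("updated contents\n")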
Project: selfbot    Author: Discord-ian    | Project source | File source
def save_json(self, filename, data):
        """Atomically saves json file"""
        rnd = randint(1000, 9999)
        path, ext = os.path.splitext(filename)
        tmp_file = "{}-{}.tmp".format(path, rnd)
        self._save_json(tmp_file, data)
        try:
            self._read_json(tmp_file)
        except json.decoder.JSONDecodeError:
            self.logger.exception("Attempted to write file {} but JSON "
                                  "integrity check on tmp file has failed. "
                                  "The original file is unaltered."
                                  "".format(filename))
            return False
        os.replace(tmp_file, filename)
        return True
Project: hatch    Author: ofek    | Project source | File source
def temp_move_path(path, d):
    if os.path.exists(path):
        dst = shutil.move(path, d)

        try:
            yield dst
        finally:
            try:
                os.replace(dst, path)
            except OSError:  # no cov
                shutil.move(dst, path)
    else:
        try:
            yield
        finally:
            remove_path(path)
Project: nimp    Author: dontnod    | Project source | File source
def _run_fileset(self, env, file_mapper):
        stash_dir = env.format('{root_dir}/.nimp/stash')
        nimp.system.safe_makedirs(stash_dir)

        stash_file = os.path.join(stash_dir, env.fileset)
        nimp.system.safe_delete(stash_file)

        with open(stash_file, 'w') as stash:
            for src, _ in file_mapper():
                src = nimp.system.sanitize_path(src)
                if not os.path.isfile(src):
                    continue
                if src.endswith('.stash'):
                    continue
                md5 = hashlib.md5(src.encode('utf8')).hexdigest()
                os.replace(src, os.path.join(stash_dir, md5))
                logging.info('Stashing %s as %s', src, md5)
                stash.write('%s %s\n' % (md5, src))

        return True
Project: nimp    Author: dontnod    | Project source | File source
def _run_fileset(self, env, file_mapper):
        stash_dir = env.format('{root_dir}/.nimp/stash')
        stash_file = os.path.join(stash_dir, env.fileset)

        success = True
        with open(stash_file, 'r') as stash:
            for dst in stash.readlines():
                try:
                    md5, dst = dst.strip().split()
                    src = os.path.join(stash_dir, md5)
                    logging.info('Unstashing %s as %s', md5, dst)
                    nimp.system.safe_delete(dst)
                    os.replace(src, dst)
                except Exception as ex: #pylint: disable=broad-except
                    logging.error(ex)
                    success = False
        nimp.system.safe_delete(stash_file)

        return success
Project: gym    Author: openai    | Project source | File source
def atomic_write(filepath, binary=False, fsync=False):
    """ Writeable file object that atomically updates a file (using a temporary file). In some cases (namely Python < 3.3 on Windows), this could result in an existing file being temporarily unlinked.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """

    tmppath = filepath + '~'
    while os.path.isfile(tmppath):
        tmppath += '~'
    try:
        with open(tmppath, 'wb' if binary else 'w') as file:
            yield file
            if fsync:
                file.flush()
                os.fsync(file.fileno())
        replace(tmppath, filepath)
    finally:
        try:
            os.remove(tmppath)
        except (IOError, OSError):
            pass
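
In the gym sources this function is wrapped with contextlib.contextmanager and the unqualified replace is bound to os.replace (or a fallback) at module level; both are outside the excerpt. A hedged usage sketch under those assumptions:

# Assumes atomic_write is decorated with @contextlib.contextmanager and that
# the module-level `replace` is os.replace on Python 3.3+.
with atomic_write("results.json", binary=False, fsync=True) as f:
    f.write('{"episodes": 10}')
# On success the temp file ("results.json~") replaces results.json; the
# finally block then removes the temp path if it still exists.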
Project: goldmine    Author: Armored-Dragon    | Project source | File source
def save_json(self, filename, data):
        """Atomically saves json file"""
        rnd = randint(1000, 9999)
        path, ext = os.path.splitext(filename)
        tmp_file = "{}-{}.tmp".format(path, rnd)
        self._save_json(tmp_file, data)
        try:
            self._read_json(tmp_file)
        except json.decoder.JSONDecodeError:
            self.logger.exception("Attempted to write file {} but JSON "
                                  "integrity check on tmp file has failed. "
                                  "The original file is unaltered."
                                  "".format(filename))
            return False
        os.replace(tmp_file, filename)
        return True
Project: Shallus-Bot    Author: cgropp    | Project source | File source
def save_json(self, filename, data):
        """Atomically saves json file"""
        rnd = randint(1000, 9999)
        path, ext = os.path.splitext(filename)
        tmp_file = "{}-{}.tmp".format(path, rnd)
        self._save_json(tmp_file, data)
        try:
            self._read_json(tmp_file)
        except json.decoder.JSONDecodeError:
            self.logger.exception("Attempted to write file {} but JSON "
                                  "integrity check on tmp file has failed. "
                                  "The original file is unaltered."
                                  "".format(filename))
            return False
        os.replace(tmp_file, filename)
        return True
Project: showroom    Author: wlerin    | Project source | File source
def rename(srcpath, destpath, data):
    import glob
    import os

    episodes = sorted(data.keys())
    name_pattern = '{date} Showroom - AKB48 no Myonichi Yoroshiku! #{ep} ({name}).mp4'
    long_date_pattern = '20{}-{}-{}'

    for file in glob.glob('{}/*.mp4'.format(srcpath)):
        match = file_re.match(os.path.basename(file))
        date = match.groupdict()['date']
        long_date = long_date_pattern.format(*[date[i:i+2] for i in range(0, 6, 2)])
        new_file = name_pattern.format(
            date=date,
            ep=episodes.index(long_date)+1,
            name=data[long_date]['engName'],
        )
        os.replace(
            file,
            '{}/{}'.format(destpath, new_file)
        )
Project: showroom    Author: wlerin    | Project source | File source
def prune_folder(folder, needed_list):
    oldcwd = os.getcwd()
    os.chdir(folder)

    os.makedirs('unneeded'.format(folder), exist_ok=True)
    files = glob.glob('*.mp4'.format(folder))
    for file in files:
        # print(repr(file))
        if file not in needed_list:
            # print('{} -> {}'.format(file, 'unneeded/{}'.format(file)))
            os.replace(file, 'unneeded/{}'.format(file))
        else:
            # print('Needed:', repr(file))
            pass

    os.chdir(oldcwd)
Project: showroom    Author: wlerin    | Project source | File source
def update_streaming_url_web(self):
        """Updates streaming urls from the showroom website.

        Fallback if api changes again"""
        r = self._session.get(self._room.long_url)

        if r.ok:
            match = hls_url_re1.search(r.text)
            # TODO: check if there was a match
            if not match:
                # no url found in the page
                # probably the stream has ended but is_live returned true
                # just don't update the urls
                # except what happens if they are still "" ?
                return
            hls_url = match.group(0)
            rtmps_url = match.group(1).replace('https', 'rtmps')
            rtmp_url = "rtmp://{}.{}.{}.{}:1935/liveedge/{}".format(*match.groups()[1:])
            with self._lock:
                self._rtmp_url = rtmp_url
                self._hls_url = hls_url
                self._rtmps_url = rtmps_url
Project: mlens    Author: flennerhag    | Project source | File source
def concurrency_safe_rename(src, dst):
        """Renames ``src`` into ``dst`` overwriting ``dst`` if it exists.

        On Windows os.replace (or for Python 2.7 its implementation
        through MoveFileExW) can yield permission errors if executed by
        two different processes.
        """
        max_sleep_time = 1
        total_sleep_time = 0
        sleep_time = 0.001
        while total_sleep_time < max_sleep_time:
            try:
                replace(src, dst)
                break
            except Exception as exc:
                if getattr(exc, 'winerror', None) == error_access_denied:
                    time.sleep(sleep_time)
                    total_sleep_time += sleep_time
                    sleep_time *= 2
                else:
                    raise
        else:
            raise
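
In the surrounding module this retrying wrapper is only defined on Windows; error_access_denied is the Windows access-denied code (5 in the upstream joblib code this appears to be adapted from) and replace refers to os.replace or a compatibility shim on older interpreters. Note that the bare raise in the while/else branch only runs if the retries are exhausted without a break. An illustrative call:

# Retry the rename for up to ~1 second if a concurrent process holds dst open.
concurrency_safe_rename("cache/output.tmp", "cache/output.pkl")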
Project: pipenv    Author: pypa    | Project source | File source
def replace(src, dst):
        # argument names match stdlib docs, docstring below
        try:
            # ReplaceFile fails if the dest file does not exist, so
            # first try to rename it into position
            os.rename(src, dst)
            return
        except WindowsError as we:
            if we.errno == errno.EEXIST:
                pass  # continue with the ReplaceFile logic below
            else:
                raise

        src = path_to_unicode(src)
        dst = path_to_unicode(dst)
        res = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src),
                           None, 0, None, None)
        if not res:
            raise OSError('failed to replace %r with %r' % (dst, src))
        return
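
This shim gives Python 2 on Windows os.replace-like overwrite semantics: it first tries os.rename and falls back to the Win32 ReplaceFile call when the destination already exists (_ReplaceFile and path_to_unicode are defined elsewhere in the same vendored module). An illustrative call:

# Overwrites config.json even on Windows, where plain os.rename would
# fail if the destination already exists.
replace("config.json.tmp", "config.json")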
Project: AI-Fight-the-Landlord    Author: YoungGer    | Project source | File source
def atomic_write(filepath, binary=False, fsync=False):
    """ Writeable file object that atomically updates a file (using a temporary file). In some cases (namely Python < 3.3 on Windows), this could result in an existing file being temporarily unlinked.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """

    tmppath = filepath + '~'
    while os.path.isfile(tmppath):
        tmppath += '~'
    try:
        with open(tmppath, 'wb' if binary else 'w') as file:
            yield file
            if fsync:
                file.flush()
                os.fsync(file.fileno())
        replace(tmppath, filepath)
    finally:
        try:
            os.remove(tmppath)
        except (IOError, OSError):
            pass
Project: bob    Author: BobBuildTool    | Project source | File source
def close(self):
            try:
                if self.__inFile:
                    self.__inFile.close()
                if self.__outFile:
                    self.__outFile.close()
                    os.replace(self.__outFile.name, self.__cachePath)
            except OSError as e:
                raise BuildError("Error closing hash cache: " + str(e))
Project: bob    Author: BobBuildTool    | Project source | File source
def __generatePackages(self, nameFormatter, env, cacheKey, sandboxEnabled):
        # use separate caches with and without sandbox
        if sandboxEnabled:
            cacheName = ".bob-packages-sb.pickle"
        else:
            cacheName = ".bob-packages.pickle"

        # try to load the persisted packages
        states = { n:s() for (n,s) in self.__states.items() }
        rootPkg = Package()
        rootPkg.construct("<root>", [], nameFormatter, None, [], [], states,
            {}, {}, None, None, [], {}, -1)
        try:
            with open(cacheName, "rb") as f:
                persistedCacheKey = f.read(len(cacheKey))
                if cacheKey == persistedCacheKey:
                    tmp = PackageUnpickler(f, self.getRecipe, self.__plugins,
                                           nameFormatter).load()
                    return tmp.toStep(nameFormatter, rootPkg).getPackage()
        except (EOFError, OSError, pickle.UnpicklingError):
            pass

        # not cached -> calculate packages
        result = self.__rootRecipe.prepare(nameFormatter, env, sandboxEnabled,
                                           states)[0]

        # save package tree for next invocation
        tmp = CoreStepRef(rootPkg, result.getPackageStep())
        try:
            newCacheName = cacheName + ".new"
            with open(newCacheName, "wb") as f:
                f.write(cacheKey)
                PackagePickler(f, nameFormatter).dump(tmp)
            os.replace(newCacheName, cacheName)
        except OSError as e:
            print("Error saving internal state:", str(e), file=sys.stderr)

        return result
Project: swjtu-pyscraper    Author: Desgard    | Project source | File source
def _make_text_stream(stream, encoding, errors):
    if encoding is None:
        encoding = get_best_encoding(stream)
    if errors is None:
        errors = 'replace'
    return _NonClosingTextIOWrapper(stream, encoding, errors,
                                    line_buffering=True)
Project: swjtu-pyscraper    Author: Desgard    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
Project: swjtu-pyscraper    Author: Desgard    | Project source | File source
def _force_correct_text_reader(text_reader, encoding, errors):
        if _is_binary_reader(text_reader, False):
            binary_reader = text_reader
        else:
            # If there is no target encoding set, we need to verify that the
            # reader is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_reader):
                return text_reader

            if _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # If the reader has no encoding, we try to find the underlying
            # binary reader for it.  If that fails because the environment is
            # misconfigured, we silently go with the same reader because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_reader, encoding, errors)
Project: swjtu-pyscraper    Author: Desgard    | Project source | File source
def _force_correct_text_writer(text_writer, encoding, errors):
        if _is_binary_writer(text_writer, False):
            binary_writer = text_writer
        else:
            # If there is no target encoding set, we need to verify that the
            # writer is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_writer):
                return text_writer

            if _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # If the writer has no encoding, we try to find the underlying
            # binary writer for it.  If that fails because the environment is
            # misconfigured, we silently go with the same writer because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_writer = _find_binary_writer(text_writer)
            if binary_writer is None:
                return text_writer

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_writer, encoding, errors)
Project: swjtu-pyscraper    Author: Desgard    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        else:
            value = value.encode('utf-8', 'surrogateescape') \
                .decode('utf-8', 'replace')
        return value
Project: Sci-Finder    Author: snverse    | Project source | File source
def _make_text_stream(stream, encoding, errors):
    if encoding is None:
        encoding = get_best_encoding(stream)
    if errors is None:
        errors = 'replace'
    return _NonClosingTextIOWrapper(stream, encoding, errors,
                                    line_buffering=True)
Project: Sci-Finder    Author: snverse    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
Project: Sci-Finder    Author: snverse    | Project source | File source
def _force_correct_text_reader(text_reader, encoding, errors):
        if _is_binary_reader(text_reader, False):
            binary_reader = text_reader
        else:
            # If there is no target encoding set, we need to verify that the
            # reader is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_reader):
                return text_reader

            if _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # If the reader has no encoding, we try to find the underlying
            # binary reader for it.  If that fails because the environment is
            # misconfigured, we silently go with the same reader because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_reader, encoding, errors)
Project: Sci-Finder    Author: snverse    | Project source | File source
def _force_correct_text_writer(text_writer, encoding, errors):
        if _is_binary_writer(text_writer, False):
            binary_writer = text_writer
        else:
            # If there is no target encoding set, we need to verify that the
            # writer is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_writer):
                return text_writer

            if _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # If the writer has no encoding, we try to find the underlying
            # binary writer for it.  If that fails because the environment is
            # misconfigured, we silently go with the same writer because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_writer = _find_binary_writer(text_writer)
            if binary_writer is None:
                return text_writer

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_writer, encoding, errors)
Project: Sci-Finder    Author: snverse    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        else:
            value = value.encode('utf-8', 'surrogateescape') \
                .decode('utf-8', 'replace')
        return value
Project: Sci-Finder    Author: snverse    | Project source | File source
def _make_text_stream(stream, encoding, errors):
    if encoding is None:
        encoding = get_best_encoding(stream)
    if errors is None:
        errors = 'replace'
    return _NonClosingTextIOWrapper(stream, encoding, errors,
                                    line_buffering=True)
Project: Sci-Finder    Author: snverse    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
Project: Sci-Finder    Author: snverse    | Project source | File source
def _force_correct_text_reader(text_reader, encoding, errors):
        if _is_binary_reader(text_reader, False):
            binary_reader = text_reader
        else:
            # If there is no target encoding set, we need to verify that the
            # reader is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_reader):
                return text_reader

            if _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # If the reader has no encoding, we try to find the underlying
            # binary reader for it.  If that fails because the environment is
            # misconfigured, we silently go with the same reader because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_reader, encoding, errors)
Project: Sci-Finder    Author: snverse    | Project source | File source
def _force_correct_text_writer(text_writer, encoding, errors):
        if _is_binary_writer(text_writer, False):
            binary_writer = text_writer
        else:
            # If there is no target encoding set, we need to verify that the
            # writer is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_writer):
                return text_writer

            if _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # If the writer has no encoding, we try to find the underlying
            # binary writer for it.  If that fails because the environment is
            # misconfigured, we silently go with the same writer because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_writer = _find_binary_writer(text_writer)
            if binary_writer is None:
                return text_writer

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_writer, encoding, errors)
Project: Sci-Finder    Author: snverse    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        else:
            value = value.encode('utf-8', 'surrogateescape') \
                .decode('utf-8', 'replace')
        return value
Project: Quiver    Author: DeflatedPickle    | Project source | File source
def close(self):
        if self.directory_real:
            # print(self.directory, self.directory_real)
            with zipfile.ZipFile(self.directory_real, "w") as z:
                for root, dirs, files in os.walk(self.directory.replace("\\", "/"), topdown=True):
                    new_root = root.replace("\\", "/").split("/")
                    # print(root, dirs, files)
                    for name in files:
                        z.write(os.path.join(root, name), "/".join(new_root[new_root.index(
                            self.directory.replace("\\", "/").split("/")[-1]) + 1:]) + "/" + name)

            self.d.cleanup()

        self.destroy()
Project: Quiver    Author: DeflatedPickle    | Project source | File source
def replace_file(self):
        old_file = self.widget_tree.item(self.widget_tree.focus())["tags"][0]
        file = filedialog.askopenfile()

        if file:
            new_file = file.name

            # os.replace(new_file, old_file)
            shutil.copy2(new_file, old_file)
            self.cmd.tree_refresh()
Project: inplace    Author: jwodder    | Project source | File source
def force_rename(oldpath, newpath):
    """
    Move the file at ``oldpath`` to ``newpath``, deleting ``newpath``
    beforehand if necessary
    """
    if hasattr(os, 'replace'):  # Python 3.3+
        os.replace(oldpath, newpath)
    else:
        if sys.platform.startswith('win'):
            try_unlink(newpath)
        os.rename(oldpath, newpath)
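
A brief usage sketch with illustrative paths: on Python 3.3+ this is simply os.replace, while the legacy branch unlinks the destination first on Windows because os.rename there refuses to overwrite an existing file.

# Swap a finished temp file into place, clobbering any previous copy.
force_rename("report.txt.tmp", "report.txt")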
Project: CrowdAnki    Author: Stvad    | Project source | File source
def parse_parts(self, parts):
        if six.PY2:
            parts = _py2_fsencode(parts)
        parsed = []
        sep = self.sep
        altsep = self.altsep
        drv = root = ''
        it = reversed(parts)
        for part in it:
            if not part:
                continue
            if altsep:
                part = part.replace(altsep, sep)
            drv, root, rel = self.splitroot(part)
            if sep in rel:
                for x in reversed(rel.split(sep)):
                    if x and x != '.':
                        parsed.append(intern(x))
            else:
                if rel and rel != '.':
                    parsed.append(intern(rel))
            if drv or root:
                if not drv:
                    # If no drive is present, try to find one in the previous
                    # parts. This makes the result of parsing e.g.
                    # ("C:", "/", "a") reasonably intuitive.
                    for part in it:
                        if not part:
                            continue
                        if altsep:
                            part = part.replace(altsep, sep)
                        drv = self.splitroot(part)[0]
                        if drv:
                            break
                break
        if drv or root:
            parsed.append(drv + root)
        parsed.reverse()
        return drv, root, parsed
Project: CrowdAnki    Author: Stvad    | Project source | File source
def as_posix(self):
        """Return the string representation of the path with forward (/)
        slashes."""
        f = self._flavour
        return str(self).replace(f.sep, '/')
Project: CrowdAnki    Author: Stvad    | Project source | File source
def replace(self, target):
        """
        Rename this path to the given path, clobbering the existing
        destination if it exists.
        """
        if sys.version_info < (3, 3):
            raise NotImplementedError("replace() is only available "
                                      "with Python 3.3 and later")
        if self._closed:
            self._raise_closed()
        self._accessor.replace(self, target)
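
This is a backported pathlib-style Path.replace(). With the standard-library pathlib on Python 3 the equivalent looks like this (file names illustrative):

from pathlib import Path

tmp = Path("deck.json.tmp")
tmp.write_text('{"notes": []}')          # Path.write_text needs Python 3.5+
tmp.replace("deck.json")                 # clobbers deck.json if it already exists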
Project: Inkxbot    Author: InkxtheSquid    | Project source | File source
def _dump(self):
        temp = '%s-%s.tmp' % (uuid.uuid4(), self.name)
        with open(temp, 'w', encoding='utf-8') as tmp:
            json.dump(self._db.copy(), tmp, ensure_ascii=True, cls=self.encoder, separators=(',', ':'))

        # atomically move the file
        os.replace(temp, self.name)
Project: RPoint    Author: george17-meet    | Project source | File source
def _make_text_stream(stream, encoding, errors):
    if encoding is None:
        encoding = get_best_encoding(stream)
    if errors is None:
        errors = 'replace'
    return _NonClosingTextIOWrapper(stream, encoding, errors,
                                    line_buffering=True)
Project: RPoint    Author: george17-meet    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
Project: RPoint    Author: george17-meet    | Project source | File source
def _force_correct_text_reader(text_reader, encoding, errors):
        if _is_binary_reader(text_reader, False):
            binary_reader = text_reader
        else:
            # If there is no target encoding set, we need to verify that the
            # reader is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_reader):
                return text_reader

            if _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # If the reader has no encoding, we try to find the underlying
            # binary reader for it.  If that fails because the environment is
            # misconfigured, we silently go with the same reader because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_reader, encoding, errors)
Project: RPoint    Author: george17-meet    | Project source | File source
def _force_correct_text_writer(text_writer, encoding, errors):
        if _is_binary_writer(text_writer, False):
            binary_writer = text_writer
        else:
            # If there is no target encoding set, we need to verify that the
            # writer is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_writer):
                return text_writer

            if _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # If the writer has no encoding, we try to find the underlying
            # binary writer for it.  If that fails because the environment is
            # misconfigured, we silently go with the same writer because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_writer = _find_binary_writer(text_writer)
            if binary_writer is None:
                return text_writer

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_writer, encoding, errors)
Project: RPoint    Author: george17-meet    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        else:
            value = value.encode('utf-8', 'surrogateescape') \
                .decode('utf-8', 'replace')
        return value
Project: subpar    Author: google    | Project source | File source
def create_final_from_temp(self, temp_parfile_name):
        """Move newly created parfile to its final filename."""
        # Python 2 doesn't have os.replace, so use os.rename which is
        # not atomic in all cases.
        os.chmod(temp_parfile_name, 0o0755)
        os.rename(temp_parfile_name, self.output_filename)
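
On Python 3 the same step could use os.replace, which overwrites the destination atomically on the same filesystem; a hedged sketch of that variant (not what subpar itself does, since it also supports Python 2):

import os

def create_final_from_temp_py3(temp_parfile_name, output_filename):
    # Hypothetical Python 3-only variant of the method above.
    os.chmod(temp_parfile_name, 0o0755)
    os.replace(temp_parfile_name, output_filename)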
Project: oa_qian    Author: sunqb    | Project source | File source
def _make_text_stream(stream, encoding, errors):
    if encoding is None:
        encoding = get_best_encoding(stream)
    if errors is None:
        errors = 'replace'
    return _NonClosingTextIOWrapper(stream, encoding, errors,
                                    line_buffering=True)
Project: oa_qian    Author: sunqb    | Project source | File source
def filename_to_ui(value):
        if isinstance(value, bytes):
            value = value.decode(get_filesystem_encoding(), 'replace')
        return value
Project: oa_qian    Author: sunqb    | Project source | File source
def _force_correct_text_reader(text_reader, encoding, errors):
        if _is_binary_reader(text_reader, False):
            binary_reader = text_reader
        else:
            # If there is no target encoding set, we need to verify that the
            # reader is not actually misconfigured.
            if encoding is None and not _stream_is_misconfigured(text_reader):
                return text_reader

            if _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # If the reader has no encoding, we try to find the underlying
            # binary reader for it.  If that fails because the environment is
            # misconfigured, we silently go with the same reader because this
            # is too common to happen.  In that case, mojibake is better than
            # exceptions.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # At this point, we default the errors to replace instead of strict
        # because nobody handles those errors anyways and at this point
        # we're so fundamentally fucked that nothing can repair it.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(binary_reader, encoding, errors)