Python tarfile 模块:TarError() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 tarfile.TarError()。

项目:jd4    作者:vijos    | 项目源码 | 文件源码
def try_open_tar(file):
    """Open `file` as a tar archive and return a function that opens members.

    Args:
        file: File object passed to ``tarfile.open(fileobj=...)``.

    Returns:
        ``None`` if tarfile raises ``TarError`` while opening `file`.
        Otherwise a callable ``open_member(name)`` that returns a file object
        for the member `name`.  Member lookup and every ``read()`` on the
        returned object run while holding one shared ``RLock``, because all
        members are read from the same underlying archive stream.

    The returned callable raises ``FileNotFoundError(name)`` when the archive
    has no member called `name`, or when the member exists but has no
    readable content (``extractfile()`` returned ``None``).
    """
    try:
        tar_file = tarfile.open(fileobj=file)
    except tarfile.TarError:
        return None
    lock = RLock()

    def open_member(name):
        try:
            with lock:
                member_file = tar_file.extractfile(name)
        except KeyError:
            raise FileNotFoundError(name) from None
        if member_file is None:
            # extractfile() yields None for directories and other members
            # without data; previously this crashed on `.read` below.
            raise FileNotFoundError(name)
        unlocked_read = member_file.read

        def locked_read(n=-1):
            with lock:
                return unlocked_read(n)

        # Only read() is wrapped; other methods of the file object are not
        # guarded by the lock.
        member_file.read = locked_read
        return member_file

    return open_member
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def copy_revision(self, id_or_num: t.Union[int, str], sub_dir: str, dest_dirs: t.List[str]):
        """
        Copy the `sub_dir` part of a revision into every destination directory.

        The revision is exported with ``git archive`` into a temporary tar file
        below ``Settings()["tmp_dir"]`` and the matching members are extracted
        into each destination. The temporary tar file is removed afterwards,
        also when extraction fails.

        :param id_or_num: revision id or number; ``-1`` and ``"HEAD"`` additionally
            trigger ``self._copy_dir(sub_dir, dest_dirs)``
        :param sub_dir: sub directory to copy, joined onto ``self.base_path``
        :param dest_dirs: a single destination directory or a list of them
        :raises VCSError: if tarfile reports an error for the exported archive
        """
        typecheck_locals(id_or_num=self.id_type, dest_dirs=List(Str())|Str())
        if isinstance(dest_dirs, str):
            dest_dirs = [dest_dirs]
        if id_or_num == -1 or id_or_num == "HEAD":
            # NOTE(review): there is no return here, so the git-archive export
            # below still runs for -1/"HEAD" -- confirm this is intended.
            self._copy_dir(sub_dir, dest_dirs)
        sub_dir = os.path.join(self.base_path, sub_dir)
        tar_file = os.path.abspath(os.path.join(Settings()["tmp_dir"], "tmp.tar"))
        cmd = "git archive --format tar --output {} {}".format(tar_file, self._commit_number_to_id(id_or_num))
        self._exec_command(cmd)
        try:
            with tarfile.open(tar_file) as tar:
                for dest in dest_dirs:
                    if sub_dir == ".":
                        tar.extractall(os.path.abspath(dest))
                    else:
                        # Compare names by value: `is` only matched when both
                        # strings happened to be the same object.
                        subdir_and_files = [
                            tarinfo for tarinfo in tar.getmembers()
                            if tarinfo.name == sub_dir or tarinfo.name.startswith(sub_dir + "/")
                        ]
                        tar.extractall(members=subdir_and_files, path=os.path.abspath(dest))
        except tarfile.TarError as err:
            raise VCSError(str(err))
        finally:
            # Clean up the temporary archive on every exit path.
            if os.path.exists(tar_file):
                os.remove(tar_file)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def extract(self, password=None):
        """Extract the whole tar archive ``self.filename`` into ``self.dest``.

        ``self.verify(password)`` is called first. On success the member names
        are stored in ``self.files`` and returned.

        :param password: forwarded to ``self.verify``; not used otherwise here.
        :return: list of member names, or ``None`` when an ``ExtractError``
            was logged through ``self.log_warning``.
        :raises CRCError: tarfile raised ``CompressionError``.
        :raises ArchiveError: tarfile raised another ``TarError`` or an ``OSError``.
        """
        self.verify(password)

        try:
            with tarfile.open(self.filename, errorlevel=2) as t:
                t.extractall(self.dest)
                self.files = t.getnames()
            return self.files

        # `except X as e` is accepted by Python 2.6+ and Python 3 alike; the
        # former `except X, e` form is a syntax error on Python 3.
        except tarfile.ExtractError as e:
            self.log_warning(e)

        except tarfile.CompressionError as e:
            raise CRCError(e)

        except (OSError, tarfile.TarError) as e:
            raise ArchiveError(e)
项目:pyload-plugins    作者:pyload    | 项目源码 | 文件源码
def extract(self, password=None):
        """Unpack every member of the tar file ``self.filename`` below ``self.dest``.

        Runs ``self.verify(password)`` before touching the archive. After a
        successful extraction ``self.files`` holds the member names, which are
        also the return value.

        :param password: handed to ``self.verify`` only.
        :return: list of member names; ``None`` if an ``ExtractError`` was
            reported via ``self.log_warning``.
        :raises CRCError: on ``tarfile.CompressionError``.
        :raises ArchiveError: on any other ``tarfile.TarError`` or on ``OSError``.
        """
        self.verify(password)

        try:
            with tarfile.open(self.filename, errorlevel=2) as t:
                t.extractall(self.dest)
                self.files = t.getnames()
            return self.files

        # The `as` form parses on Python 2.6+ and on Python 3; the comma form
        # it replaces only parsed on Python 2.
        except tarfile.ExtractError as e:
            self.log_warning(e)

        except tarfile.CompressionError as e:
            raise CRCError(e)

        except (OSError, tarfile.TarError) as e:
            raise ArchiveError(e)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __find_extract_offsets(self):
                """Private helper: record the offset of every archive member
                so members can be extracted individually.

                Does nothing when offsets have already been recorded.
                """

                if self.__extract_offsets:
                        return

                # Listing the members reads the entire archive; there is no
                # other way to learn every member's offset.
                try:
                        entries = self.__arc_tfile.getmembers()
                        for entry in entries:
                                self.__extract_offsets[entry.name] = \
                                    entry.offset
                except tf.TarError:
                        # The archive could not be read.
                        raise InvalidArchive(self.__arc_name)
                except EnvironmentError as e:
                        raise apx._convert_error(e)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def downloadPackage(self, buildId, audit, content, verbose):
        """Fetch a binary artifact and unpack it into `audit` / `content`.

        Prints a coloured progress line to stdout. Returns False when local
        download is not possible, the artifact is not found, or the download
        fails; returns True after a successful extraction. OSError and
        tarfile.TarError are converted to BuildError; a BuildError raised
        inside is re-raised unchanged.
        """
        if not self.canDownloadLocal():
            return False

        if verbose > 0:
            banner = "   DOWNLOAD  {} from {} .. ".format(
                content, self._remoteName(buildId, ARTIFACT_SUFFIX))
        else:
            banner = "   DOWNLOAD  {} .. ".format(content)
        print(colorize(banner, "32"), end="")

        try:
            with self._openDownloadFile(buildId, ARTIFACT_SUFFIX) as (name, fileobj):
                with tarfile.open(name, "r|*", fileobj=fileobj, errorlevel=1) as tar:
                    # Replace any existing audit file and content directory.
                    removePath(audit)
                    removePath(content)
                    os.makedirs(content)
                    self.__extractPackage(tar, audit, content)
            print(colorize("ok", "32"))
            return True
        except ArtifactNotFoundError:
            print(colorize("not found", "33"))
            return False
        except ArtifactDownloadError as e:
            print(colorize(e.reason, "33"))
            return False
        except BuildError:
            print(colorize("error", "31"))
            raise
        except OSError as e:
            print(colorize("error", "31"))
            raise BuildError("Cannot download artifact: " + str(e))
        except tarfile.TarError as e:
            print(colorize("error", "31"))
            raise BuildError("Error extracting binary artifact: " + str(e))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def uploadPackage(self, buildId, audit, content, verbose):
        """Pack `audit` and `content` into a gzip'ed PAX tar and upload it.

        Returns immediately when local upload is not possible. Progress is
        printed to stdout. An already existing artifact is reported as
        skipped. Upload, tar and OS errors are printed and, unless errors are
        ignored, re-raised as BuildError.
        """
        if not self.canUploadLocal():
            return

        shown = False
        try:
            with self._openUploadFile(buildId, ARTIFACT_SUFFIX) as (name, fileobj):
                if verbose > 0:
                    banner = "   UPLOAD    {} to {} .. ".format(
                        content, self._remoteName(buildId, ARTIFACT_SUFFIX))
                else:
                    banner = "   UPLOAD    {} .. ".format(content)
                print(colorize(banner, "32"), end="")
                shown = True
                pax = { 'bob-archive-vsn' : "1" }
                with gzip.open(name or fileobj, 'wb', 6) as gzf, \
                     tarfile.open(name, "w", fileobj=gzf,
                                  format=tarfile.PAX_FORMAT, pax_headers=pax) as tar:
                    tar.add(audit, "meta/" + os.path.basename(audit))
                    tar.add(content, arcname="content")
            print(colorize("ok", "32"))
        except ArtifactExistsError:
            # The banner may not have been printed yet; pick the matching text.
            template = ("skipped ({} exists in archive)" if shown
                        else "   UPLOAD    skipped ({} exists in archive)")
            print(template.format(content))
        except (ArtifactUploadError, tarfile.TarError, OSError) as e:
            if shown:
                detail = "error ("+str(e)+")" if verbose > 0 else "error"
                print(colorize(detail, "31"))
            if not self.__ignoreErrors:
                raise BuildError("Cannot upload artifact: " + str(e))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _generate_tar(dir_path):
    """Private helper: add the local directory `dir_path` to a new tar archive at `dir_path + '.tar'`.

    A tarfile.TarError is reported through ``stderr(..., exit=1)``.
    """
    archive_path = dir_path + '.tar'
    try:
        archive = tarfile.open(archive_path, 'w')
        with archive:
            archive.add(dir_path)
    except tarfile.TarError as e:
        stderr("Error: tar archive creation failed [" + str(e) + "]", exit=1)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _generate_tar(dir_path):
    """Private helper: write the local directory `dir_path` into the tar file `dir_path + '.tar'`.

    If tarfile raises TarError, the failure is passed to ``stderr(..., exit=1)``.
    """
    target = dir_path + '.tar'
    try:
        tar_out = tarfile.open(target, 'w')
        with tar_out:
            tar_out.add(dir_path)
    except tarfile.TarError as e:
        stderr("Error: tar archive creation failed [" + str(e) + "]", exit=1)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _extract_tar(self, file_obj, path):
            """Extract the gzip-compressed tar read from `file_obj` into `path`.

            Raises ``self.BadArchive`` when tarfile reports a TarError.
            """
            try:
                tar = tarfile.open(fileobj=file_obj, mode='r:gz')
                with tar:
                    tar.extractall(path)
            except tarfile.TarError:
                raise self.BadArchive
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2 file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:ingestors    作者:alephdata    | 项目源码 | 文件源码
def unpack(self, file_path, temp_dir):
        """Extract the tar archive at `file_path` (any compression tarfile detects) into `temp_dir`.

        Raises ProcessingException when tarfile reports a TarError.
        """
        try:
            archive = tarfile.open(name=file_path, mode='r:*')
            with archive:
                archive.extractall(temp_dir)
        except tarfile.TarError as err:
            raise ProcessingException('Invalid Tar file: %s' % err)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def verify(self, password=None):
        """Check that ``self.filename`` can be opened as a tar archive.

        The archive is opened and closed again; nothing is extracted.

        :param password: accepted for interface compatibility; unused here.
        :raises CRCError: tarfile raised ``CompressionError``.
        :raises ArchiveError: tarfile raised another ``TarError`` or an ``OSError``.
        """
        try:
            t = tarfile.open(self.filename, errorlevel=1)

        # `except X as e` parses on Python 2.6+ and Python 3; the former
        # `except X, e` form is a syntax error on Python 3.
        except tarfile.CompressionError as e:
            raise CRCError(e)

        except (OSError, tarfile.TarError) as e:
            raise ArchiveError(e)

        else:
            t.close()
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:NeuroMobile    作者:AndrewADykman    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2 file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:devpi    作者:devpi    | 项目源码 | 文件源码
def Archive(path_or_file):
    """ Return an in-memory Archive object (ZipArchive or TarArchive) that
    offers uniform methods.

    `path_or_file` is used directly when it has a ``seek`` attribute;
    otherwise it is converted with ``str()`` and opened in binary mode.
    If an error is raised, the file is closed -- also one that was passed in.
    An Archive instance acts as a context manager, so you can write::

        with Archive(...) as archive:
            archive.extract(...)  # or other methods

    and be sure that file handles will be closed.  Without the context
    manager you must call archive.close() yourself.
    """
    stream = (path_or_file if hasattr(path_or_file, "seek")
              else open(str(path_or_file), "rb"))
    try:
        try:
            return ZipArchive(stream)
        except zipfile.BadZipfile:
            # Not a zip file: rewind and try to read it as a tar archive.
            stream.seek(0)
            try:
                return TarArchive(stream)
            except tarfile.TarError:
                raise UnsupportedArchive()
    except Exception:
        stream.close()
        raise
项目:CloudPrint    作者:William-An    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:dymo-m10-python    作者:pbrf    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2 file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:projeto    作者:BarmyPenguin    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2/tar.xz file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar/tar.gz/tar.bz2 file `filename` into `extract_dir`.

    Raises ReadError when tarfile.open() fails with a TarError.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)
    try:
        archive.extractall(extract_dir)
    finally:
        # Close the archive whether or not extraction succeeded.
        archive.close()
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    ``UnrecognizedFormat`` is raised when ``tarfile.open()`` cannot read
    `filename` as a tar archive.  `progress_filter` is called with each
    member's archive name and proposed destination; a falsy result skips the
    member, otherwise the returned path is used as the destination (see
    ``unpack_archive()`` for details).  Returns ``True``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        # Never change ownership of extracted files.
        archive.chown = lambda *args: None
        for member in archive:
            name = member.name
            parts = name.split('/')
            # Skip absolute paths and names with a ".." component.
            if name.startswith('/') or '..' in parts:
                continue
            prelim_dst = os.path.join(extract_dir, *parts)

            # Follow hard and symbolic links so that the link target is
            # extracted as a normal file.
            while member is not None and (member.islnk() or member.issym()):
                target = member.linkname
                if member.issym():
                    target = posixpath.normpath(
                        posixpath.join(posixpath.dirname(member.name), target))
                member = archive._getmember(target)

            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
项目:pyload-plugins    作者:pyload    | 项目源码 | 文件源码
def verify(self, password=None):
        """Check that ``self.filename`` can be opened as a tar archive.

        The archive is opened with ``errorlevel=1`` and closed again right
        away; no member is extracted.  `password` is accepted but not used
        in this method.

        Raises ``CRCError`` on ``tarfile.CompressionError`` and
        ``ArchiveError`` on ``OSError`` or any other ``tarfile.TarError``.
        """
        try:
            t = tarfile.open(self.filename, errorlevel=1)

        # "except X as e" is valid on Python 2.6+ and Python 3, whereas the
        # older "except X, e" spelling is a syntax error on Python 3.
        except tarfile.CompressionError as e:
            raise CRCError(e)

        except (OSError, tarfile.TarError) as e:
            raise ArchiveError(e)

        else:
            t.close()
项目:gitlab_tools    作者:isginf    | 项目源码 | 文件源码
def archivate(src_dir, dest_dir, prefix="", console=False):
    """
    Pack src_dir into a gzipped tar archive inside dest_dir.

    The archive is named ``<prefix><basename of src_dir>.tgz``.  By default
    it is written with the tarfile module; if that fails the function calls
    itself once more with console=True to use the external ``tar`` program.
    A failure of the console run is reported through error().

    :param src_dir: directory whose contents are archived
    :param dest_dir: directory the archive is written to
    :param prefix: string prepended to the archive file name
    :param console: use the external ``tar`` command instead of tarfile
    """
    filename = os.path.join(dest_dir, "%s%s.tgz" % (prefix, os.path.basename(src_dir)))
    error_msg = None

    try:
        if console:
            # Argument list instead of a command string: Popen without a
            # shell cannot run a single string, and a list keeps paths with
            # spaces intact.  "c" creates the archive (the old "x" extracted);
            # -C/"." mirrors arcname="." of the tarfile branch.  The archive
            # path is made absolute so -C cannot change where it is written.
            tar_cmd = ["tar", "czf", os.path.abspath(filename), "-C", src_dir, "."]
            debug("Running " + " ".join(tar_cmd))

            tar = subprocess.Popen(tar_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
            try:
                # communicate() drains stderr while waiting, so tar cannot
                # block on a full pipe the way it could with wait().
                _, stderr_data = tar.communicate(timeout=TAR_TIMEOUT)
            except subprocess.TimeoutExpired:
                # Do not leave the child running; the timeout still
                # propagates to the caller as before.
                tar.kill()
                tar.communicate()
                raise
            tar_error = stderr_data.decode(errors="replace").lower()

            if "fatal" in tar_error or "error" in tar_error:
                error_msg = tar_error
        else:
            debug("Creating tar archive %s from %s" % (filename, src_dir))
            # The with-block closes the archive even if add() fails.
            with tarfile.open(filename, "w:gz") as tar:
                tar.add(src_dir, arcname=".", recursive=True)
    except FileExistsError:
        os.unlink(filename)
        archivate(src_dir, dest_dir, prefix, console)
    except FileNotFoundError as e:
        log("Failed to tar %s with Python lib. Trying to use console tar. Error was %s" % (src_dir, str(e)))
        # Record the failure so the fallback announced above really runs.
        error_msg = str(e)
    except (tarfile.TarError, OSError) as e:
        error_msg = str(e)

    if error_msg and not console:
        archivate(src_dir, dest_dir, prefix, console=True)
    elif error_msg:
        error(error_msg)
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Extract all members of the tar archive `filename` into `extract_dir`.

    Handles tar/tar.gz/tar.bz2/tar.xz as recognised by ``tarfile.open()``.
    A ``tarfile.TarError`` raised while opening is reported as ``ReadError``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)

    # Close the archive whether or not extraction succeeds.
    try:
        archive.extractall(extract_dir)
    finally:
        archive.close()
项目:MyFriend-Rob    作者:lcheniv    | 项目源码 | 文件源码
def _unpack_tarfile(filename, extract_dir):
    """Unpack the tar archive `filename` (tar/tar.gz/tar.bz2/tar.xz) below
    `extract_dir`.

    Raises ``ReadError`` when ``tarfile.open()`` rejects the file with a
    ``tarfile.TarError``.
    """
    try:
        tar_handle = tarfile.open(filename)
    except tarfile.TarError:
        reason = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(reason)

    try:
        tar_handle.extractall(extract_dir)
    finally:
        # always release the underlying file
        tar_handle.close()
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        tarobj.chown = lambda *args: None   # don't do any chowning!
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    linkpath = posixpath.join(posixpath.dirname(member.name), linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this member,
                    # so the links form a cycle and would loop forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                tarobj._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        tarobj.close()
项目:introspection    作者:navidshaikh    | 项目源码 | 文件源码
def extract_tarpath(self, tarpath, destpath):
        """
        Extract the tar archive at `tarpath` into the directory `destpath`.

        Logs an error and raises
        ``introexceptions.ImageLoadErrorFromTarfile`` when opening or
        extracting the archive fails with ``tarfile.TarError``.
        """
        try:
            # The with-block closes the archive even when extraction fails;
            # previously the handle was never closed.
            with tarfile.open(tarpath) as tar:
                tar.extractall(path=destpath)
        except tarfile.TarError:
            msg = "Can not open the tar/gz file: %s" % tarpath
            log.error(msg)
            raise introexceptions.ImageLoadErrorFromTarfile(msg)
项目:BD_T2    作者:jfmolano1587    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        tarobj.chown = lambda *args: None   # don't do any chowning!
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    linkpath = posixpath.join(posixpath.dirname(member.name), linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this member,
                    # so the links form a cycle and would loop forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                tarobj._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        tarobj.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __scan(self, fileName, verbose):
        """Bring the cache entry for the archive `fileName` up to date.

        The cache key is built from the hex digits at positions 0-1, 3-4 and
        6-41 of `fileName`.  If ``self.__db`` already holds an entry whose
        stored stat equals the current ``binStat(fileName)`` nothing is
        done; otherwise the stale entry is dropped, the audit trail
        ``meta/audit.json.gz`` is read from the archive and a fresh pickled
        entry is stored.  Archives whose pax header ``bob-archive-vsn`` is
        not "1" are reported and ignored.  With `verbose` the file name is
        printed before the archive is read.

        Raises ``Error`` if the archive has no audit trail and ``BobError``
        on ``tarfile.TarError`` or ``OSError``.
        """
        try:
            currentStat = binStat(fileName)
            hexId = fileName[0:2] + fileName[3:5] + fileName[6:42]
            buildId = bytes.fromhex(hexId)

            # A cached entry is only trusted while the file stat is unchanged.
            # NOTE(review): pickle.loads assumes the db content is trusted.
            if buildId in self.__db:
                cached = pickle.loads(self.__db[buildId])
                if cached['stat'] == currentStat:
                    return
                del self.__db[buildId]

            if verbose:
                print(fileName)

            with tarfile.open(fileName, errorlevel=1) as tar:
                # Only archives carrying the expected version marker count.
                if tar.pax_headers.get('bob-archive-vsn') != "1":
                    print("Not a Bob archive:", fileName, "Ignored!")
                    return

                # Walk the members until the audit trail shows up.
                entry = tar.next()
                while entry and entry.name != "meta/audit.json.gz":
                    entry = tar.next()
                if not entry:
                    raise Error("Missing audit trail!")

                # Decode the gzip-compressed audit trail.
                compressed = tar.extractfile(entry)
                audit = Audit.fromByteStream(gzip.GzipFile(fileobj=compressed))

            # Collect the data that gets stored in the cache.
            artifact = audit.getArtifact()
            record = {
                'stat' : currentStat,
                'refs' : audit.getReferencedBuildIds(),
                'vars' : {
                    'meta' : artifact.getMetaData(),
                    'build' : artifact.getBuildInfo(),
                    'metaEnv' : artifact.getMetaEnv(),
                }
            }
            self.__db[buildId] = pickle.dumps(record)
        except tarfile.TarError as e:
            raise BobError("Cannot read {}: {}".format(fileName, str(e)))
        except OSError as e:
            raise BobError(str(e))
项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def export(local_root, commit, target):
    """Export git commit to directory. "Extracts" all files at the commit to the target directory.

    Set mtime of RST files to last commit date.

    Archive entries that resolve to a path outside of the target directory are ignored with a warning.

    :raise CalledProcessError: Unhandled git command failure.

    :param str local_root: Local path to git root directory.
    :param str commit: Git commit SHA to export.
    :param str target: Directory to export to.
    """
    log = logging.getLogger(__name__)
    target = os.path.realpath(target)
    # Prefix for the containment test. The trailing separator keeps a sibling directory such as "<target>-old" from
    # passing as "inside target", which a bare startswith(target) would allow.
    target_prefix = target.rstrip(os.sep) + os.sep
    mtimes = []

    # Define extract function.
    def extract(stdout):
        """Extract tar archive from "git archive" stdout.

        :param file stdout: Handle to git's stdout pipe.
        """
        queued_links = []
        try:
            with tarfile.open(fileobj=stdout, mode='r|') as tar:
                for info in tar:
                    log.debug('name: %s; mode: %d; size: %s; type: %s', info.name, info.mode, info.size, info.type)
                    path = os.path.realpath(os.path.join(target, info.name))
                    if path != target and not path.startswith(target_prefix):  # Handle bad paths.
                        log.warning('Ignoring tar object path %s outside of target directory.', info.name)
                    elif info.isdir():  # Handle directories.
                        if not os.path.exists(path):
                            os.makedirs(path, mode=info.mode)
                    elif info.issym() or info.islnk():  # Queue links.
                        queued_links.append(info)
                    else:  # Handle files.
                        tar.extract(member=info, path=target)
                        if os.path.splitext(info.name)[1].lower() == '.rst':
                            mtimes.append(info.name)
                # Links are created last and only when their target already exists.
                for info in queued_links:
                    if os.path.exists(os.path.join(target, info.linkname)):
                        tar.extract(member=info, path=target)
        except tarfile.TarError as exc:
            log.debug('Failed to extract output from "git archive" command: %s', str(exc))

    # Run command.
    run_command(local_root, ['git', 'archive', '--format=tar', commit], pipeto=extract)

    # Set mtime.
    for file_path in mtimes:
        last_committed = int(run_command(local_root, ['git', 'log', '-n1', '--format=%at', commit, '--', file_path]))
        os.utime(os.path.join(target, file_path), (last_committed, last_committed))
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` component are
    skipped.  Links are resolved inside the archive and their targets are
    extracted as normal files or directories; a link whose chain ends in a
    missing member or loops back on itself is skipped.  Returns ``True``
    after all members were processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(tarobj):
        # don't do any chowning!
        tarobj.chown = lambda *args: None
        for member in tarobj:
            name = member.name
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))

            # resolve any links and extract the link targets as normal
            # files
            looked_up = set()
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    base = posixpath.dirname(member.name)
                    linkpath = posixpath.join(base, linkpath)
                    linkpath = posixpath.normpath(linkpath)
                if linkpath in looked_up:
                    # The same target was already looked up for this
                    # member, so the links form a cycle and would loop
                    # forever.
                    member = None
                    break
                looked_up.add(linkpath)
                member = tarobj._getmember(linkpath)

            if member is None or not (member.isfile() or member.isdir()):
                continue

            # a false value from the filter means "skip this member"
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                tarobj._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
        return True