Python tarfile 模块,TarFile() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tarfile.TarFile()

项目:py-cloud-compute-cannon    作者:Autodesk    | 项目源码 | 文件源码
def make_tar_stream(sdict):
    """ Create a file-like tarfile object

    Args:
        sdict (Mapping[str, pyccc.FileReferenceBase]): dict mapping filenames to file references

    Returns:
        Filelike: TarFile stream
    """
    # TODO: this is currently done in memory - bad for big files!
    tarbuffer = io.BytesIO()
    tf = tarfile.TarFile(fileobj=tarbuffer, mode='w')
    for name, fileobj in sdict.items():
        tar_add_bytes(tf, name, fileobj.read('rb'))
    tf.close()
    tarbuffer.seek(0)
    return tarbuffer
项目:android-backup-tools    作者:bluec0re    | 项目源码 | 文件源码
def read_data(self, password=None):
        """
        Helper function which decrypts and decompresses the data if necessary
        and returns a tarfile.TarFile to interact with
        """
        fp = self.fp
        fp.seek(self.__data_start)

        if self.is_encrypted():
            fp = self._decrypt(fp, password=password)

        if self.compression == CompressionType.ZLIB:
            fp = self._decompress(fp)

        if self.stream:
            mode = 'r|*'
        else:
            mode = 'r:*'
        tar = tarfile.open(fileobj=fp, mode=mode)
        return tar
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:apocalypse    作者:dhoomakethu    | 项目源码 | 文件源码
def copy_to_container(self, container, source, dest):
        tar_stream = BytesIO()
        tar_file = tarfile.TarFile(fileobj=tar_stream, mode='w')
        file_data = open(source, mode='rb').read()
        fil_size = os.stat(source).st_size
        tarinfo = tarfile.TarInfo(name=os.path.basename(source))
        tarinfo.size = fil_size
        tarinfo.mtime = time.time()
        # tarinfo.mode = 0600
        tar_file.addfile(tarinfo, BytesIO(file_data))
        tar_file.close()
        tar_stream.seek(0)
        res = self.client.put_archive(container=container['Id'],
                                      path=dest,
                                      data=tar_stream
                                      )
        return res
项目:dockerscan    作者:cr0hn    | 项目源码 | 文件源码
def extract_layer_in_tmp_dir(img: tarfile.TarFile,
                             layer_digest: str) -> str:
    """
    This context manager allow to extract a selected layer into a temporal
    directory and yield the directory path

    >>> with open_docker_image(image_path) as (img,
                                               top_layer,
                                               _,
                                               manifest):
            last_layer_digest = get_last_image_layer(manifest)
            with extract_layer_in_tmp_dir(img, last_layer_digest) as d:
                print(d)
    """
    with tempfile.TemporaryDirectory() as d:
        log.debug(" > Extracting layer content in temporal "
                  "dir: {}".format(d))

        extract_docker_layer(img, layer_digest, d)

        yield d
项目:dockerscan    作者:cr0hn    | 项目源码 | 文件源码
def get_root_json_from_image(img: tarfile.TarFile) -> Tuple[str, dict]:
    """
    Every docker image has a root .json file with the metadata information.
    this function locate this file, load it and return the value of it and
    their name

    >>> get_docker_image_layers(img)
    ('db079554b4d2f7c65c4df3adae88cb72d051c8c3b8613eb44e86f60c945b1ca7', dict(...))
    """
    for f in img.getmembers():
        if f.name.endswith("json") and "/" not in f.name:
            c = img.extractfile(f.name).read()
            if hasattr(c, "decode"):
                c = c.decode()

            return f.name.split(".")[0], json.loads(c)

    return None, None
项目:smartcontainers    作者:crcresearch    | 项目源码 | 文件源码
def fileCopyOut(self, containerid, filename, path):
        """Copy file from container to the local machine.

        Args:
            containerid: Container ID
            filename: Name of file to be copied in.
            path: Path in the container where file should be copied.
        Returns: Nothing.

        """
        # Copies file from container to local machine.
        # File transmits as a tar stream. Saves to local disk as tar.
        # Extracts file for local manipulation.
        tarObj, stats = super(scClient,
                              self).get_archive(container=containerid,
                                                path=path + filename)
        with open('temp.tar', 'w') as destination:
            for line in tarObj:
                destination.write(line)
            destination.seek(0)
            thisTar = tarfile.TarFile(destination.name)
            thisTar.extract(filename)
            os.remove('temp.tar')
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write("")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write("")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def _tar_add_string_file(self, tarobj, fpath, content):
        """
        Given a tarfile object, add a file to it at ``fpath``, with content
        ``content``.

        Largely based on: http://stackoverflow.com/a/40392022

        :param tarobj: the tarfile to add to
        :type tarobj: tarfile.TarFile
        :param fpath: path to put the file at in the archive
        :type fpath: str
        :param content: file content
        :type content: str
        """
        logger.debug('Adding %d-length string to tarfile at %s',
                     len(content), fpath)
        data = content.encode('utf-8')
        f = BytesIO(data)
        info = tarfile.TarInfo(name=fpath)
        info.size = len(data)
        tarobj.addfile(tarinfo=info, fileobj=f)
项目:web3py    作者:web2py    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:virt-backup    作者:Anthony25    | 项目源码 | 文件源码
def backup_img(self, disk, target, target_filename=None):
        """
        Backup a disk image

        :param disk: path of the image to backup
        :param target: dir or filename to copy into/as. If self.compress,
                       target has to be a tarfile.TarFile
        :param target_filename: destination file will have this name, or keep
                                the original one. target has to be a dir
                                (if not exists, will be created)
        :returns backup_path: complete path of our backup
        """
        if self.compression:
            backup_path = self._add_img_to_tarfile(
                disk, target, target_filename
            )
        else:
            backup_path = self._copy_img_to_file(disk, target, target_filename)

        logger.debug("{} successfully copied".format(disk))
        return os.path.abspath(backup_path)
项目:virt-backup    作者:Anthony25    | 项目源码 | 文件源码
def _add_img_to_tarfile(self, img, target, target_filename):
        """
        :param img: source img path
        :param target: tarfile.TarFile where img will be added
        :param target_filename: img name in the tarfile
        """
        total_size = os.path.getsize(img)
        tqdm_kwargs = {
            "total": total_size, "unit": "B", "unit_scale": True,
            "ncols": 0, "mininterval": 0.5
        }
        logger.debug("Copy {}…".format(img))
        if self.compression == "xz":
            backup_path = target.fileobj._fp.name
        else:
            backup_path = target.fileobj.name
        self.pending_info["tar"] = os.path.basename(backup_path)
        self._dump_pending_info()

        with tqdm(**tqdm_kwargs) as pbar:
            target.fileobject = get_progress_bar_tar(pbar)
            target.add(img, arcname=target_filename)

        return backup_path
项目:patentdata    作者:benhoyle    | 项目源码 | 文件源码
def get_archive_names(self, filename):
        """ Return names of files within archive having filename. """
        try:
            if filename.lower().endswith(".zip"):
                with zipfile.ZipFile(
                    os.path.join(self.path, filename), "r"
                ) as z:
                    names = z.namelist()
            elif filename.lower().endswith(".tar"):
                with tarfile.TarFile(
                    os.path.join(self.path, filename), "r"
                ) as t:
                    names = t.getnames()
        except Exception:
            logging.exception(
                "Exception opening file:" +
                str(os.path.join(self.path, filename))
            )
            names = []
        return names
项目:chainercv    作者:chainer    | 项目源码 | 文件源码
def extractall(file_path, destination, ext):
    """Extracts an archive file.

    This function extracts an archive file to a destination.

    Args:
        file_path (str): The path of a file to be extracted.
        destination (str): A directory path. The archive file
            will be extracted under this directory.
        ext (str): An extension suffix of the archive file.
            This function supports :obj:`'.zip'`, :obj:`'.tar'`,
            :obj:`'.gz'` and :obj:`'.tgz'`.

    """

    if ext == '.zip':
        with zipfile.ZipFile(file_path, 'r') as z:
            z.extractall(destination)
    elif ext == '.tar':
        with tarfile.TarFile(file_path, 'r') as t:
            t.extractall(destination)
    elif ext == '.gz' or ext == '.tgz':
        with tarfile.open(file_path, 'r:gz') as t:
            t.extractall(destination)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            test_support.unlink(empty)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$',
                 exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True,
                        exclude_content_from=exclude_content_from):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            fobj = _open(tmpname, "wb")
            fobj.write(char * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())
            fobj.close()

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" % char)
            tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        open(empty, "wb").write("")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os.remove(empty)
项目:hop    作者:dudadornelles    | 项目源码 | 文件源码
def copy_to_container(src, dest, owner, group, container):

    def create_tar_stream(file_content, file_name):
        # metadata for internal file
        tarinfo = tarfile.TarInfo(name=file_name)
        tarinfo.size = len(file_content)
        tarinfo.mtime = time.time()

        tarstream = BytesIO()
        tar = tarfile.TarFile(fileobj=tarstream, mode='w')
        tar.addfile(tarinfo, BytesIO(file_content))
        tar.close()
        tarstream.seek(0)
        return tarstream

    log.info('copying to container: %s', locals())
    dest_file = os.path.basename(dest)
    dest_dir = os.path.dirname(dest)
    file_data = open(src).read().encode('utf-8')

    tar_stream = create_tar_stream(file_content=file_data, file_name=dest_file)
    container.put_archive(path=dest_dir, data=tar_stream)
    container.exec_run('chown {0}:{1} -R {2}'.format(owner, group, dest))
项目:GMAN    作者:iDurugkar    | 项目源码 | 文件源码
def _download_cifar(out_dir):
    url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
    print(url)
    # if set_name == 'test':
    #     out_name = 'test_lmdb.zip'
    # else:
    #     out_name = '{category}_{set_name}_lmdb.zip'.format(**locals())

    file_path = os.path.join(out_dir, 'cifar-10-python.tar.gz')
    if not os.path.exists(file_path):
        cmd = ['wget', url, '-P', out_dir]
        print('Downloading CIFAR')
        subprocess.call(cmd)
    # tfile = tarfile.TarFile(file_path)
    with tarfile.open(name=file_path, mode='r:gz') as tfile:
        tfile.extractall(path=out_dir)
项目:ray-legacy    作者:ray-project    | 项目源码 | 文件源码
def load_chunk(tarfile, size=None):
  """Load a number of images from a single imagenet .tar file.

  This function also converts the image from grayscale to RGB if necessary.

  Args:
    tarfile (tarfile.TarFile): The archive from which the files get loaded.
    size (Optional[Tuple[int, int]]): Resize the image to this size if provided.

  Returns:
    numpy.ndarray: Contains the image data in format [batch, w, h, c]
  """
  result = []
  filenames = []
  for member in tarfile.getmembers():
    filename = member.path
    content = tarfile.extractfile(member)
    img = Image.open(content)
    rgbimg = Image.new("RGB", img.size)
    rgbimg.paste(img)
    if size != None:
      rgbimg = rgbimg.resize(size, Image.ANTIALIAS)
    result.append(np.array(rgbimg).reshape(1, rgbimg.size[0], rgbimg.size[1], 3))
    filenames.append(filename)
  return np.concatenate(result), filenames
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
项目:correct_shit_f    作者:PrieureDeSion    | 项目源码 | 文件源码
def tar_error(tmpdir):
    def fixture(filename):
        path = os.path.join(str(tmpdir), filename)

        def reset(path):
            os.mkdir('d')
            with tarfile.TarFile(path, 'w') as archive:
                for file in ('a', 'b', 'c', 'd/e'):
                    with open(file, 'w') as f:
                        f.write('*')

                    archive.add(file)

                    os.remove(file)

            with tarfile.TarFile(path, 'r') as archive:
                archive.extractall()

        os.chdir(str(tmpdir))
        reset(path)

        assert set(os.listdir('.')) == {filename, 'a', 'b', 'c', 'd'}
        assert set(os.listdir('./d')) == {'e'}

    return fixture
项目:alterchef    作者:libremesh    | 项目源码 | 文件源码
def test_load_from_tar(self):

        def generate_inmemory_tar(content):
            import StringIO
            import tarfile
            tar_sio = StringIO.StringIO()
            with tarfile.TarFile(fileobj=tar_sio, mode="w") as tar:
                content_sio = StringIO.StringIO()
                content_sio.write(content)
                content_sio.seek(0)
                info = tarfile.TarInfo(name=u"fóø".encode("utf-8"))
                info.size = len(content_sio.buf)
                tar.addfile(tarinfo=info, fileobj=content_sio)
            tar_sio.seek(0)
            tar_sio.name = "foo"
            return tar_sio

        content = u"testing únicódè"
        inc_files = IncludeFiles.load_from_tar(generate_inmemory_tar(content.encode("utf-8")))
        self.assertEqual(inc_files.files.values()[0], content)

        # if encoding is not utf-8 then we cant load the file
        self.assertRaises(UnicodeDecodeError, IncludeFiles.load_from_tar,
                          generate_inmemory_tar(content.encode("latin-1")))
项目:convirt    作者:fromanirh    | 项目源码 | 文件源码
def setUp(self):
        self.pid = 0
        testdir = os.path.dirname(os.path.abspath(__file__))
        self.root = os.path.join(testdir, 'fake')

        self.procfsroot = os.path.join(
            self.root, convirt.metrics.cgroups.PROCFS
        )
        self.cgroupfsroot = os.path.join(
            self.root, convirt.metrics.cgroups.CGROUPFS
        )

        with move_into(self.root):
            cgroupsdata = os.path.join(self.root, 'cgroups.tgz')
            with gzip.GzipFile(cgroupsdata) as gz:
                tar = tarfile.TarFile(fileobj=gz)
                tar.extractall()

        self.patch = monkey.Patch([
            (convirt.metrics.cgroups, '_PROCBASE', self.procfsroot),
            (convirt.metrics.cgroups, '_CGROUPBASE', self.cgroupfsroot),
        ])
        self.patch.apply()
项目:dagda    作者:eliasgranderubio    | 项目源码 | 文件源码
def extract_filesystem_bundle(docker_driver, container_id=None, image_name=None):
    temporary_dir = tempfile.mkdtemp()
    # Get and save filesystem bundle
    if container_id is not None:
        data = docker_driver.get_docker_client().export(container=container_id).data
        name = container_id
    else:
        data = docker_driver.get_docker_client().get_image(image=image_name).data
        name = image_name.replace('/', '_').replace(':', '_')
    with open(temporary_dir + "/" + name + ".tar", "wb") as file:
        file.write(data)
    # Untar filesystem bundle
    tarfile = TarFile(temporary_dir + "/" + name + ".tar")
    tarfile.extractall(temporary_dir)
    os.remove(temporary_dir + "/" + name + ".tar")
    if image_name is not None:
        layers = _get_layers_from_manifest(temporary_dir)
        _untar_layers(temporary_dir, layers)
    # Return
    return temporary_dir


# Clean the temporary directory
项目:dagda    作者:eliasgranderubio    | 项目源码 | 文件源码
def _untar_layers(dir, layers):
    output = {}
    # Untar layer filesystem bundle
    for layer in layers:
        tarfile = TarFile(dir + "/" + layer)
        for member in tarfile.getmembers():
            output[member.name] = member
    for member_name in output:
        try:
            tarfile.extract(output[member_name], path=dir, set_attrs=False)
        except (ValueError, ReadError):
            pass

    # Clean up
    for layer in layers:
        clean_up(dir + "/" + layer[:-10])
项目:bdocker    作者:indigo-dc    | 项目源码 | 文件源码
def write_tar_raw_data_stream(path, stream, uid, gid):
    """Extract data from tar raw in stream data.

    It extract the data from a stream to a file, and
    change owner to the uid with gid.

    :param path: file path
    :param stream: data in stream.
    :param uid: user uid
    :param gid: group uid
    """
    try:
        stream_io = io.BytesIO(stream)
    except TypeError:
        stream_io = io.StringIO(stream)
    my_tar = tarfile.TarFile(fileobj=stream_io)
    my_tar.extractall(path=path)
    change_owner_dir(path, uid, gid)
项目:buckit    作者:facebookexperimental    | 项目源码 | 文件源码
def test_tarball(self):
        with self._temp_filesystem() as fs_path:
            fs_prefix = fs_path.lstrip('/')

            def strip_fs_prefix(tarinfo):
                if tarinfo.path.startswith(fs_prefix + '/'):
                    tarinfo.path = tarinfo.path[len(fs_prefix) + 1:]
                elif fs_prefix == tarinfo.path:
                    tarinfo.path = '.'
                else:
                    raise AssertionError(
                        f'{tarinfo.path} must start with {fs_prefix}'
                    )
                return tarinfo

            with tempfile.NamedTemporaryFile() as t:
                with tarfile.TarFile(t.name, 'w') as tar_obj:
                    tar_obj.add(fs_path, filter=strip_fs_prefix)

                self._check_item(
                    TarballItem(from_target='t', into_dir='y', tarball=t.name),
                    self._temp_filesystem_provides('y'),
                    {require_directory('y')},
                    f'tar --directory=y {t.name}',
                )
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_size(self):
        tar_file = self.mox.CreateMock(tarfile.TarFile)
        tar_info = self.mox.CreateMock(tarfile.TarInfo)

        image = utils.RawTGZImage(None)

        self.mox.StubOutWithMock(image, '_as_tarfile')

        image._as_tarfile().AndReturn(tar_file)
        tar_file.next().AndReturn(tar_info)
        tar_info.size = 124

        self.mox.ReplayAll()

        result = image.get_size()

        self.assertEqual(124, result)
        self.assertEqual(image._tar_info, tar_info)
        self.assertEqual(image._tar_file, tar_file)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_size_called_twice(self):
        tar_file = self.mox.CreateMock(tarfile.TarFile)
        tar_info = self.mox.CreateMock(tarfile.TarInfo)

        image = utils.RawTGZImage(None)

        self.mox.StubOutWithMock(image, '_as_tarfile')

        image._as_tarfile().AndReturn(tar_file)
        tar_file.next().AndReturn(tar_info)
        tar_info.size = 124

        self.mox.ReplayAll()

        image.get_size()
        result = image.get_size()

        self.assertEqual(124, result)
        self.assertEqual(image._tar_info, tar_info)
        self.assertEqual(image._tar_file, tar_file)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_stream_to_without_size_retrieved(self):
        source_tar = self.mox.CreateMock(tarfile.TarFile)
        first_tarinfo = self.mox.CreateMock(tarfile.TarInfo)
        target_file = self.mox.CreateMock(open)
        source_file = self.mox.CreateMock(open)

        image = utils.RawTGZImage(None)
        image._image_service_and_image_id = ('service', 'id')

        self.mox.StubOutWithMock(image, '_as_tarfile', source_tar)
        self.mox.StubOutWithMock(utils.shutil, 'copyfileobj')

        image._as_tarfile().AndReturn(source_tar)
        source_tar.next().AndReturn(first_tarinfo)
        source_tar.extractfile(first_tarinfo).AndReturn(source_file)
        utils.shutil.copyfileobj(source_file, target_file)
        source_tar.close()

        self.mox.ReplayAll()

        image.stream_to(target_file)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_stream_to_with_size_retrieved(self):
        source_tar = self.mox.CreateMock(tarfile.TarFile)
        first_tarinfo = self.mox.CreateMock(tarfile.TarInfo)
        target_file = self.mox.CreateMock(open)
        source_file = self.mox.CreateMock(open)
        first_tarinfo.size = 124

        image = utils.RawTGZImage(None)
        image._image_service_and_image_id = ('service', 'id')

        self.mox.StubOutWithMock(image, '_as_tarfile', source_tar)
        self.mox.StubOutWithMock(utils.shutil, 'copyfileobj')

        image._as_tarfile().AndReturn(source_tar)
        source_tar.next().AndReturn(first_tarinfo)
        source_tar.extractfile(first_tarinfo).AndReturn(source_file)
        utils.shutil.copyfileobj(source_file, target_file)
        source_tar.close()

        self.mox.ReplayAll()

        image.get_size()
        image.stream_to(target_file)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_start(self):
        outf = six.StringIO()
        producer = vdi_through_dev.TarGzProducer('fpath', outf,
            '100', 'fname')

        tfile = self.mox.CreateMock(tarfile.TarFile)
        tinfo = self.mox.CreateMock(tarfile.TarInfo)

        inf = self.mox.CreateMock(open)

        self.mox.StubOutWithMock(vdi_through_dev, 'tarfile')
        self.mox.StubOutWithMock(producer, '_open_file')

        vdi_through_dev.tarfile.TarInfo(name='fname').AndReturn(tinfo)
        vdi_through_dev.tarfile.open(fileobj=outf, mode='w|gz').AndReturn(
            fake_context(tfile))
        producer._open_file('fpath', 'rb').AndReturn(fake_context(inf))
        tfile.addfile(tinfo, fileobj=inf)
        outf.close()

        self.mox.ReplayAll()

        producer.start()

        self.assertEqual(100, tinfo.size)
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def tar_compiled(file, dir, expression='^.+$'):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """

    tar = tarfile.TarFile(file, 'w')
    for file in listdir(dir, expression, add_dirs=True):
        filename = os.path.join(dir, file)
        if os.path.islink(filename):
            continue
        if os.path.isfile(filename) and file[-4:] != '.pyc':
            if file[:6] == 'models':
                continue
            if file[:5] == 'views':
                continue
            if file[:11] == 'controllers':
                continue
            if file[:7] == 'modules':
                continue
        tar.add(filename, file, False)
    tar.close()
项目:android-backup-tools    作者:bluec0re    | 项目源码 | 文件源码
def unpack(self, target_dir=None, password=None, pickle_fname=None):
        """
        High level function for unpacking a backup file into the given
        target directory (will be generated based on the filename if not given).

        Creates also a filename.pickle file containing the exact order of the included files
        (required for repacking).

        :param target_dir: the directory to extract the backup file into
                           (default: filename + _unpacked)
        :param password: optional password for decrypting the backup
                         (can also be set in the constructor)
        """

        if target_dir is None:
           target_dir = os.path.basename(self.fp.name) + '_unpacked'
        if pickle_fname is None:
            pickle_fname = os.path.basename(fname) + '.pickle'
        if not os.path.exists(target_dir):
            os.mkdir(target_dir)

        tar = self.read_data(password)
        members = tar.getmembers()

        # reopen stream (TarFile is not able to seek)
        tar = self.read_data(password)

        tar.extractall(path=target_dir, members=members)

        with open(pickle_fname, 'wb') as fp:
            pickle.dump(members, fp)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def tar_directory(path):
    cwd = os.getcwd()
    parent = os.path.dirname(path)
    directory = os.path.basename(path)
    tmp = tempfile.TemporaryFile()
    os.chdir(parent)
    tarball = tarfile.TarFile(fileobj=tmp, mode='w')
    tarball.add(directory)
    tarball.close()
    tmp.seek(0)
    out = tmp.read()
    tmp.close()
    os.chdir(cwd)
    return out
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def tar_directory(path):
    cwd = os.getcwd()
    parent = os.path.dirname(path)
    directory = os.path.basename(path)
    tmp = tempfile.TemporaryFile()
    os.chdir(parent)
    tarball = tarfile.TarFile(fileobj=tmp, mode='w')
    tarball.add(directory)
    tarball.close()
    tmp.seek(0)
    out = tmp.read()
    tmp.close()
    os.chdir(cwd)
    return out