Python tarfile 模块,TarInfo() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tarfile.TarInfo()

项目:camera_calibration_frontend    作者:groundmelon    | 项目源码 | 文件源码
def do_tarfile_save(self, tf):
        """ Write images and calibration solution to a tarfile object """

        def taradd(name, buf):
            s = StringIO(buf)
            ti = tarfile.TarInfo(name)
            ti.size = len(s.getvalue())
            ti.uname = 'calibrator'
            ti.mtime = int(time.time())
            tf.addfile(tarinfo=ti, fileobj=s)

        ims = [("left-%04d.png" % i, im) for i,(_, im) in enumerate(self.db)]
        for (name, im) in ims:
            taradd(name, cv2.imencode(".png", im)[1].tostring())

        if self.calibrated:
            taradd('ost.yaml', self.yaml())
            taradd('ost.txt', self.ost())
        else:
            print("Doing none-calibration tarfile save!")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:rules_docker    作者:bazelbuild    | 项目源码 | 文件源码
def create_bundle(output, tag_to_config, diffid_to_blobsum,
                  blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy):
  """Creates a Docker image from a list of layers.

  Args:
    output: the name of the docker image file to create.
    layers: the layers (tar files) to join to the image.
    tag_to_layer: a map from docker_name.Tag to the layer id it references.
    layer_to_tags: a map from the name of the layer tarball as it appears
            in our archives to the list of tags applied to it.
  """

  with tarfile.open(output, 'w') as tar:
    def add_file(filename, contents):
      info = tarfile.TarInfo(filename)
      info.size = len(contents)
      tar.addfile(tarinfo=info, fileobj=cStringIO.StringIO(contents))

    tag_to_image = {}
    for (tag, config) in six.iteritems(tag_to_config):
      tag_to_image[tag] = FromParts(
          config, diffid_to_blobsum,
          blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy)

    v2_2_save.multi_image_tarball(tag_to_image, tar)
项目:django-dbbackup-ui    作者:timonweb    | 项目源码 | 文件源码
def backup_media():
    extension = "tar.gz"
    filename = utils.filename_generate(extension, content_type='media')
    # Create tarball
    media_storage = get_storage_class()()
    outputfile = utils.create_spooled_temporary_file()
    tar_file = tarfile.open(name=filename, fileobj=outputfile, mode='w:gz')
    for media_filename in explore_storage(media_storage):
        tarinfo = tarfile.TarInfo(media_filename)
        media_file = media_storage.open(media_filename)
        tarinfo.size = len(media_file)
        tar_file.addfile(tarinfo, media_file)
    # Close the TAR for writing
    tar_file.close()
    # Store backup
    outputfile.seek(0)

    return outputfile, filename
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def tarball(
    name,
    image,
    tar
):
  """Produce a "docker save" compatible tarball from the DockerImage.

  Args:
    name: The tag name to write into the repositories file.
    image: a docker image to save.
    tar: the open tarfile into which we are writing the image tarball.
  """
  def add_file(filename, contents):
    info = tarfile.TarInfo(filename)
    info.size = len(contents)
    tar.addfile(tarinfo=info, fileobj=cStringIO.StringIO(contents))

  multi_image_tarball({name: image}, tar)

  # Add our convenience file with the top layer's ID.
  add_file('top', image.top())
项目:apocalypse    作者:dhoomakethu    | 项目源码 | 文件源码
def copy_to_container(self, container, source, dest):
        tar_stream = BytesIO()
        tar_file = tarfile.TarFile(fileobj=tar_stream, mode='w')
        file_data = open(source, mode='rb').read()
        fil_size = os.stat(source).st_size
        tarinfo = tarfile.TarInfo(name=os.path.basename(source))
        tarinfo.size = fil_size
        tarinfo.mtime = time.time()
        # tarinfo.mode = 0600
        tar_file.addfile(tarinfo, BytesIO(file_data))
        tar_file.close()
        tar_stream.seek(0)
        res = self.client.put_archive(container=container['Id'],
                                      path=dest,
                                      data=tar_stream
                                      )
        return res
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def tarbuilder(asset_list=None):
        """Create a tar file from rendered assets.

        Add each asset in ``asset_list`` to a tar file with the defined
        path and permission. The assets need to have the rendered_bytes field
        populated. Return a tarfile.TarFile.

        :param hostname: the hostname the tar is destined for
        :param balltype: the type of assets being included
        :param asset_list: list of objects.BootActionAsset instances
        """
        tarbytes = io.BytesIO()
        tarball = tarfile.open(
            mode='w:gz', fileobj=tarbytes, format=tarfile.GNU_FORMAT)
        asset_list = asset_list or []
        for a in asset_list:
            fileobj = io.BytesIO(a.rendered_bytes)
            tarasset = tarfile.TarInfo(name=a.path)
            tarasset.size = len(a.rendered_bytes)
            tarasset.mode = a.permissions if a.permissions else 0o600
            tarasset.uid = 0
            tarasset.gid = 0
            tarball.addfile(tarasset, fileobj=fileobj)
        tarball.close()
        return tarbytes.getvalue()
项目:virt-bootstrap    作者:virt-manager    | 项目源码 | 文件源码
def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0):
        """
        Add members to tar file.
        """
        for member_name in members:
            member_data = ''
            permissions = DEFAULT_FILE_MODE
            if isinstance(member_name, tuple):
                if len(member_name) == 3:
                    member_data = member_name[2]
                member_name, permissions = member_name[:2]
            data_encoded = member_data.encode('utf-8')

            t_info = tarfile.TarInfo(member_name)
            t_info.type = m_type
            t_info.mode = permissions
            t_info.uid = uid
            t_info.gid = gid
            t_info.size = len(data_encoded)

            tar_handle.addfile(t_info, io.BytesIO(data_encoded))
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
        if self.format == tarfile.PAX_FORMAT:
            # PAX_FORMAT ignores encoding in write mode.
            return

        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1")
        try:
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

            if self.format != tarfile.PAX_FORMAT:
                tar.close()
                tar = tarfile.open(tmpname, encoding="ascii")
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
        finally:
            tar.close()
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_premature_end_of_archive(self):
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, StringIO.StringIO("a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                l = tar.getmembers()[0].linkname
                self.assertTrue(link == l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertTrue(name == n, "PAX longname creation failed")
        finally:
            tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = u"\xe4\xf6\xfc"     # non-ASCII
            t.uid = 8**8        # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            if self.format == tarfile.PAX_FORMAT:
                self.assertRaises(UnicodeError, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

            tarinfo.name = u"\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = u"\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_premature_end_of_archive(self):
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, StringIO.StringIO("a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                l = tar.getmembers()[0].linkname
                self.assertTrue(link == l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertTrue(name == n, "PAX longname creation failed")
        finally:
            tar.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = u"\xe4\xf6\xfc"     # non-ASCII
            t.uid = 8**8        # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            if self.format == tarfile.PAX_FORMAT:
                self.assertRaises(UnicodeError, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

            tarinfo.name = u"\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = u"\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def add(self, *, path, data, mode):
        if path.startswith('/'):
            path = path[1:]

        tar_info = tarfile.TarInfo(name=path)
        if isinstance(data, str):
            data_bytes = data.encode('utf-8')
        else:
            data_bytes = data
        tar_info.size = len(data_bytes)
        tar_info.mode = mode

        if tar_info.size > 0:
            # Ignore bandit false positive: B303:blacklist
            # This is a basic checksum for debugging not a secure hash.
            LOG.debug(  # nosec
                'Adding file path=%s size=%s md5=%s', path, tar_info.size,
                hashlib.md5(data_bytes).hexdigest())
        else:
            LOG.warning('Zero length file added to path=%s', path)

        self._tf.addfile(tar_info, io.BytesIO(data_bytes))
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:NeuroMobile    作者:AndrewADykman    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
项目:navigator    作者:naviga-tor    | 项目源码 | 文件源码
def write(self, worker, probe, dest):
        """
        Serialize probe data, compress it and write it exclusively
        to output file.
        """
        data = StringIO()
        data.write(compress(dumps(probe, HIGHEST_PROTOCOL)))
        data.seek(0)
        info = tarfile.TarInfo()
        info.name = 'Probe_%s.lzo' % dest
        info.uid = 0
        info.gid = 0
        info.size = len(data.buf)
        info.mode = S_IMODE(0o0444)
        info.mtime = mktime(probe.circs[0].created.timetuple())
        with self._lock:
            # Maximum file size is about 1 GB
            if self._bytes_written >= 1 * 1000 * 1000 * 1000:
                self._tar.close()
                self._tar = self._create_tar_file()
                self._bytes_written = 0
            self._tar.addfile(tarinfo=info, fileobj=data)
            self._bytes_written += info.size
            self._threads_finished.append(worker)
            self._worker_finished.set()
项目:cmdchallenge-site    作者:jarv    | 项目源码 | 文件源码
def mkbuildcontext(dockerfile):
    f = tempfile.NamedTemporaryFile()
    t = tarfile.open(mode='w', fileobj=f)
    if isinstance(dockerfile, io.StringIO):
        dfinfo = tarfile.TarInfo('Dockerfile')
        if six.PY3:
            raise TypeError('Please use io.BytesIO to create in-memory '
                            'Dockerfiles with Python 3')
        else:
            dfinfo.size = len(dockerfile.getvalue())
            dockerfile.seek(0)
    elif isinstance(dockerfile, io.BytesIO):
        dfinfo = tarfile.TarInfo('Dockerfile')
        dfinfo.size = len(dockerfile.getvalue())
        dockerfile.seek(0)
    else:
        dfinfo = t.gettarinfo(fileobj=dockerfile, arcname='Dockerfile')
    t.addfile(dfinfo, dockerfile)
    t.close()
    f.seek(0)
    return f
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def _tar_add_string_file(self, tarobj, fpath, content):
        """
        Given a tarfile object, add a file to it at ``fpath``, with content
        ``content``.

        Largely based on: http://stackoverflow.com/a/40392022

        :param tarobj: the tarfile to add to
        :type tarobj: tarfile.TarFile
        :param fpath: path to put the file at in the archive
        :type fpath: str
        :param content: file content
        :type content: str
        """
        logger.debug('Adding %d-length string to tarfile at %s',
                     len(content), fpath)
        data = content.encode('utf-8')
        f = BytesIO(data)
        info = tarfile.TarInfo(name=fpath)
        info.size = len(data)
        tarobj.addfile(tarinfo=info, fileobj=f)
项目:AJudge    作者:AJudge-team    | 项目源码 | 文件源码
def write_files_in_sandbox(self, files: List[Tuple[str, bytes]], dest_dir: str) -> bool:
        tar_file = BytesIO()  # use BytesIO to store tarfile in memory

        tarobj = tarfile.open(fileobj=tar_file, mode="w")
        for file_name, file_contents in files:
            tarinfo = tarfile.TarInfo(name=file_name) # set file name
            tarinfo.size = len(file_contents)  # set file size
            tarobj.addfile(tarinfo, BytesIO(file_contents))  # add file to archive

        tarobj.close()

        tar_file.seek(0)  # set stream position is start of the stream
        data = tar_file.read()
        return self.docker_client.put_archive(self.container.get('Id'), dest_dir, data)

    # get file from sandbox
    # file_path : A path to a file or a directory
    # For a directory, file_path should be end with '/' or '/.'
    # If path ends in /. then this indicates that only the contents of the path directory should be copied.
    # A symlink is always resolved to its target.
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = u"äöü"     # non-ASCII
            t.uid = 8**8        # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "äöü"
            if self.format == tarfile.PAX_FORMAT:
                self.assertRaises(UnicodeError, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

            tarinfo.name = u"äöü"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = u"äöü"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_uname_unicode(self):
        for name in (u"äöü", "äöü"):
            t = tarfile.TarInfo("foo")
            t.uname = name
            t.gname = name

            fobj = StringIO.StringIO()
            tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
            tar.addfile(t)
            tar.close()
            fobj.seek(0)

            tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "äöü")
            self.assertEqual(t.gname, "äöü")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:dymo-m10-python    作者:pbrf    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
            finally:
                tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_premature_end_of_archive(self):
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                l = tar.getmembers()[0].linkname
                self.assertEqual(link, l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertEqual(name, n, "PAX longname creation failed")
        finally:
            tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()