Python tarfile 模块,open() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用tarfile.open()

项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def read_text_file(filename):
    """Return the contents of *filename*.

    Try to decode the file contents with utf-8, the preferred system encoding
    (e.g., cp1252 on some Windows machines), and latin1, in that order.
    Decoding a byte string with latin1 will never raise an error. In the worst
    case, the returned string will contain some garbage characters.

    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
    for enc in encodings:
        try:
            data = data.decode(enc)
        except UnicodeDecodeError:
            continue
        break

    assert type(data) != bytes  # Latin1 should have worked.
    return data
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def read_text_file(filename):
    """Return the contents of *filename*.

    Try to decode the file contents with utf-8, the preferred system encoding
    (e.g., cp1252 on some Windows machines), and latin1, in that order.
    Decoding a byte string with latin1 will never raise an error. In the worst
    case, the returned string will contain some garbage characters.

    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
    for enc in encodings:
        try:
            data = data.decode(enc)
        except UnicodeDecodeError:
            continue
        break

    assert type(data) != bytes  # Latin1 should have worked.
    return data
项目:ml    作者:hohoins    | 项目源码 | 文件源码
def maybe_download_and_extract():
  """Download and extract the tarball from Alex's website."""
  dest_directory = FLAGS.data_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
          float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath,
                                             reporthook=_progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    tarfile.open(filepath, 'r:gz').extractall(dest_directory)
项目:ml    作者:hohoins    | 项目源码 | 文件源码
def maybe_download_and_extract():
  """Download and extract the tarball from Alex's website."""
  dest_directory = FLAGS.data_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
          float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath,
                                             reporthook=_progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    tarfile.open(filepath, 'r:gz').extractall(dest_directory)
项目:android-backup-tools    作者:bluec0re    | 项目源码 | 文件源码
def __init__(self, fname=None, password=None, stream=True):
        """
        :param fname: The filename of the backup file or a file-like object
        :param password: The password to use for the en-/decryption
        :param stream: Open the backup file in stream mode. Reduces memory usage
                       but allows only sequential reads. Default: True 
        """
        self.fp = None
        self.version = None
        self.compression = None
        self.encryption = None
        self.stream = stream
        self.password = password
        # position of the actual file data (after the header)
        self.__data_start = 0

        if isinstance(fname, str):
            self.open(fname)
        else:
            self.fp = fname
项目:android-backup-tools    作者:bluec0re    | 项目源码 | 文件源码
def read_data(self, password=None):
        """
        Helper function which decrypts and decompresses the data if necessary
        and returns a tarfile.TarFile to interact with
        """
        fp = self.fp
        fp.seek(self.__data_start)

        if self.is_encrypted():
            fp = self._decrypt(fp, password=password)

        if self.compression == CompressionType.ZLIB:
            fp = self._decompress(fp)

        if self.stream:
            mode = 'r|*'
        else:
            mode = 'r:*'
        tar = tarfile.open(fileobj=fp, mode=mode)
        return tar
项目:PyWebRunner    作者:IntuitiveWebSolutions    | 项目源码 | 文件源码
def download_driver_file(whichbin, url, base_path):
    if url.endswith('.tar.gz'):
        ext = '.tar.gz'
    else:
        ext = '.zip'
    print("Downloading from: {}".format(url))
    download_file(url, '/tmp/pwr_temp{}'.format(ext))
    if ext == '.tar.gz':
        import tarfile
        tar = tarfile.open('/tmp/pwr_temp{}'.format(ext), "r:gz")
        tar.extractall('{}/'.format(base_path))
        tar.close()
    else:
        import zipfile
        with zipfile.ZipFile('/tmp/pwr_temp{}'.format(ext), "r") as z:
            z.extractall('{}/'.format(base_path))

    # if whichbin == 'wires' and '/v{}/'.format(latest_gecko_driver) in url:
    #     os.rename('{}/geckodriver'.format(base_path),
    #               '{}/wires'.format(base_path))
    #     os.chmod('{}/wires'.format(base_path), 0o775)
    if whichbin == 'wires':
        os.chmod('{}/geckodriver'.format(base_path), 0o775)
    else:
        os.chmod('{}/chromedriver'.format(base_path), 0o775)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testUploadPackageNoFail(self):
        """The nofail option must prevent fatal error on upload failures"""

        archive = self.__getArchiveInstance({"flags" : ["upload", "download", "nofail"]})
        archive.wantUpload(True)
        with TemporaryDirectory() as tmp:
            # create simple workspace
            audit = os.path.join(tmp, "audit.json.gz")
            content = os.path.join(tmp, "workspace")
            with open(audit, "wb") as f:
                f.write(b"AUDIT")
            os.mkdir(content)
            with open(os.path.join(content, "data"), "wb") as f:
                f.write(b"DATA")

            # must not throw
            archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, 0)
            archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, 1)

        # also live-build-id upload errors must not throw with nofail
        archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', 0)
        archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', 1)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testUploadJenkinsNoFail(self):
        """The nofail option must prevent fatal error on upload failures"""

        archive = self.__getArchiveInstance({"flags" : ["upload", "download", "nofail"]})
        archive.wantUpload(True)

        with TemporaryDirectory() as tmp:
            with open(os.path.join(tmp, "error.buildid"), "wb") as f:
                f.write(ERROR_UPLOAD_ARTIFACT)
            self.__createArtifactByName(os.path.join(tmp, "result.tgz"))

            # these uploads must not fail even though they do not succeed
            script = archive.upload(None, "error.buildid", "result.tgz")
            callJenkinsScript(script, tmp)
            script = archive.uploadJenkinsLiveBuildId(None, "error.buildid", "test.buildid")
            callJenkinsScript(script, tmp)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testInvalidServer(self):
        """Test download on non-existent server"""

        spec = { 'url' : "https://127.1.2.3:7257" }
        archive = SimpleHttpArchive(spec)
        archive.wantDownload(True)
        archive.wantUpload(True)

        # Local
        archive.downloadPackage(b'\x00'*20, "unused", "unused", 0)
        archive.downloadPackage(b'\x00'*20, "unused", "unused", 1)
        self.assertEqual(archive.downloadLocalLiveBuildId(b'\x00'*20, 0), None)

        # Jenkins
        with TemporaryDirectory() as workspace:
            with open(os.path.join(workspace, "test.buildid"), "wb") as f:
                f.write(b'\x00'*20)
            script = archive.download(None, "test.buildid", "result.tgz")
            callJenkinsScript(script, workspace)
项目:packagecore    作者:BytePackager    | 项目源码 | 文件源码
def makeSrcTar(self, container):
        # this function violates the chroot encapsulation quite badly...
        filename = os.path.join(self._pkgBuildDir, self._pkgTar)
        if filename.endswith(".tar"):
            mode = "w"
        elif filename.endswith(".tar.gz"):
            mode = "w:gz"
        else:
            raise Exception("Unknown tar format '%s'" % filename)
        # make tarball from source
        with tarfile.open(filename, mode) as tar:
            tar.add(container.getSourceDir(),
                    arcname=os.path.basename(container.getSourceDir()))

    ##
    # @brief Write a control file for building the debian package.
    #
    # @return None
项目:factotum    作者:Denubis    | 项目源码 | 文件源码
def safeInstall():
    FACTORIOPATH = getFactorioPath()

    try:
        if not os.path.isdir("%s" % (FACTORIOPATH) ):       

            if os.access("%s/.." % (FACTORIOPATH), os.W_OK):
                os.mkdir(FACTORIOPATH, 0o777)
            else:
                subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH])
                subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH])


            os.mkdir(os.path.join(FACTORIOPATH, "saves"))
            os.mkdir(os.path.join(FACTORIOPATH, "config"))
            with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
                lines = bashrc.read()

                if lines.find("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") == -1:
                    bashrc.write("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n")
                    print("You'll want to restart your shell for command autocompletion. Tab is your friend.")
        updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
        sys.exit(1)
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def setUp(self):
        self.dir = tempfile.mkdtemp()
        setup = os.path.join(self.dir, 'setup.py')
        f = open(setup, 'w')
        f.write(SETUP_PY)
        f.close()
        self.old_cwd = os.getcwd()
        os.chdir(self.dir)

        self.old_enable_site = site.ENABLE_USER_SITE
        self.old_file = easy_install_pkg.__file__
        self.old_base = site.USER_BASE
        site.USER_BASE = tempfile.mkdtemp()
        self.old_site = site.USER_SITE
        site.USER_SITE = tempfile.mkdtemp()
        easy_install_pkg.__file__ = site.USER_SITE
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a
    setup.py, the contents of which are provided by the setup_py string.
    """

    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    setup_py_file.size = len(setup_py_bytes.getvalue())
    dist = tarfile.open(dist_path, 'w:gz')
    try:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
    finally:
        dist.close()
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def read_text_file(filename):
    """Return the contents of *filename*.

    Try to decode the file contents with utf-8, the preferred system encoding
    (e.g., cp1252 on some Windows machines), and latin1, in that order.
    Decoding a byte string with latin1 will never raise an error. In the worst
    case, the returned string will contain some garbage characters.

    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
    for enc in encodings:
        try:
            data = data.decode(enc)
        except UnicodeDecodeError:
            continue
        break

    assert type(data) != bytes  # Latin1 should have worked.
    return data
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def _tar(self):
        '''Tar processor'''
        archive = tarfile.open(mode='r', fileobj=self.cur_attachment.file_obj)
        loc_attach = []
        for subfile in archive.getmembers():
            f = archive.extractfile(subfile)
            if f is None:
                # Directory
                continue
            try:
                cur_file = File(f.read(), subfile.name)
                self.process_payload(cur_file)
                loc_attach.append(self.cur_attachment)
            except Exception:
                self.cur_attachment.make_dangerous()
                return self.cur_attachment
        return loc_attach
项目:deligan    作者:val-iisc    | 项目源码 | 文件源码
def maybe_download_and_extract(data_dir, url='http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'):
    if not os.path.exists(os.path.join(data_dir, 'cifar-10-batches-py')):
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
        filename = url.split('/')[-1]
        filepath = os.path.join(data_dir, filename)
        if not os.path.exists(filepath):
            def _progress(count, block_size, total_size):
                sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
                    float(count * block_size) / float(total_size) * 100.0))
                sys.stdout.flush()
            filepath, _ = urllib.request.urlretrieve(url, filepath, _progress)
            print()
            statinfo = os.stat(filepath)
            print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
            tarfile.open(filepath, 'r:gz').extractall(data_dir)
项目:otest    作者:rohe    | 项目源码 | 文件源码
def create_tar_archive(issuer, test_profile):
    mk_tar_dir(issuer, test_profile)

    wd = os.getcwd()
    _dir = os.path.join(wd, "tar", issuer)
    os.chdir(_dir)

    tar = tarfile.open("{}.tar".format(test_profile), "w")

    for item in os.listdir(test_profile):
        if item.startswith("."):
            continue

        fn = os.path.join(test_profile, item)

        if os.path.isfile(fn):
            tar.add(fn)
    tar.close()
    os.chdir(wd)
项目:Semi_Supervised_GAN    作者:ChunyuanLI    | 项目源码 | 文件源码
def maybe_download_and_extract(data_dir, url='http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'):
    if not os.path.exists(os.path.join(data_dir, 'cifar-10-batches-py')):
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
        filename = url.split('/')[-1]
        filepath = os.path.join(data_dir, filename)
        if not os.path.exists(filepath):
            def _progress(count, block_size, total_size):
                sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
                    float(count * block_size) / float(total_size) * 100.0))
                sys.stdout.flush()
            filepath, _ = urllib.request.urlretrieve(url, filepath, _progress)
            print()
            statinfo = os.stat(filepath)
            print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
            tarfile.open(filepath, 'r:gz').extractall(data_dir)
项目:poco    作者:shiwaforce    | 项目源码 | 文件源码
def unpack(self):
        EnvironmentUtils.check_docker()
        poco_file = None
        for file in next(os.walk(os.getcwd()))[2]:
            if file.endswith(".poco"):
                poco_file = file
                tar = tarfile.open(file)
                tar.extractall()
                tar.close()
                break
        if poco_file is None:
            ColorPrint.exit_after_print_messages(message=".poco file not exists in current directory")

        cmd = list()
        cmd.append("docker")
        cmd.append("load")
        cmd.append("-i")
        cmd.append(poco_file.rstrip("poco") + "tar")
        self.run_script(cmd=cmd)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def set_admin_token(admin_token='None'):
    """Set admin token according to deployment config or use a randomly
       generated token if none is specified (default).
    """
    if admin_token != 'None':
        log('Configuring Keystone to use a pre-configured admin token.')
        token = admin_token
    else:
        log('Configuring Keystone to use a random admin token.')
        if os.path.isfile(STORED_TOKEN):
            msg = 'Loading a previously generated' \
                  ' admin token from %s' % STORED_TOKEN
            log(msg)
            with open(STORED_TOKEN, 'r') as f:
                token = f.read().strip()
        else:
            token = pwgen(length=64)
            with open(STORED_TOKEN, 'w') as out:
                out.write('%s\n' % token)
    return(token)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def set_admin_token(admin_token='None'):
    """Set admin token according to deployment config or use a randomly
       generated token if none is specified (default).
    """
    if admin_token != 'None':
        log('Configuring Keystone to use a pre-configured admin token.')
        token = admin_token
    else:
        log('Configuring Keystone to use a random admin token.')
        if os.path.isfile(STORED_TOKEN):
            msg = 'Loading a previously generated' \
                  ' admin token from %s' % STORED_TOKEN
            log(msg)
            with open(STORED_TOKEN, 'r') as f:
                token = f.read().strip()
        else:
            token = pwgen(length=64)
            with open(STORED_TOKEN, 'w') as out:
                out.write('%s\n' % token)
    return(token)
项目:oio-sds-utils    作者:open-io    | 项目源码 | 文件源码
def maybe_download_and_extract():
  """Download and extract model tar file."""
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def add_tar_file(self, x, tar):
        """
        Add a file to the tar archive. Transform symlinks into files if the files lie out of the project tree.
        """
        p = self.get_tar_path(x)
        tinfo = tar.gettarinfo(name=p, arcname=self.get_tar_prefix() + '/' + x.path_from(self.base_path))
        tinfo.uid   = 0
        tinfo.gid   = 0
        tinfo.uname = 'root'
        tinfo.gname = 'root'

        fu = None
        try:
            fu = open(p, 'rb')
            tar.addfile(tinfo, fileobj=fu)
        finally:
            if fu:
                fu.close()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def make_tarfile(self, filename, files, **kw):
        if kw.get('add_to_package', True):
            self.files.append(filename)

        with tarfile.open(filename, TARFORMAT) as tar:
            endname = os.path.split(filename)[-1]
            endname = endname.split('.')[0] + '/'
            for x in files:
                tarinfo = tar.gettarinfo(x, x)
                tarinfo.uid   = tarinfo.gid   = 0
                tarinfo.uname = tarinfo.gname = 'root'
                tarinfo.size = os.stat(x).st_size

                # TODO - more archive creation options?
                if kw.get('bare', True):
                    tarinfo.name = os.path.split(x)[1]
                else:
                    tarinfo.name = endname + x # todo, if tuple, then..
                Logs.debug("adding %r to %s" % (tarinfo.name, filename))
                with open(x, 'rb') as f:
                    tar.addfile(tarinfo, f)
        Logs.info('Created %s' % filename)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def add_tar_file(self, x, tar):
        """
        Add a file to the tar archive. Transform symlinks into files if the files lie out of the project tree.
        """
        p = self.get_tar_path(x)
        tinfo = tar.gettarinfo(name=p, arcname=self.get_tar_prefix() + '/' + x.path_from(self.base_path))
        tinfo.uid   = 0
        tinfo.gid   = 0
        tinfo.uname = 'root'
        tinfo.gname = 'root'

        fu = None
        try:
            fu = open(p, 'rb')
            tar.addfile(tinfo, fileobj=fu)
        finally:
            if fu:
                fu.close()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def make_tarfile(self, filename, files, **kw):
        if kw.get('add_to_package', True):
            self.files.append(filename)

        with tarfile.open(filename, TARFORMAT) as tar:
            endname = os.path.split(filename)[-1]
            endname = endname.split('.')[0] + '/'
            for x in files:
                tarinfo = tar.gettarinfo(x, x)
                tarinfo.uid   = tarinfo.gid   = 0
                tarinfo.uname = tarinfo.gname = 'root'
                tarinfo.size = os.stat(x).st_size

                # TODO - more archive creation options?
                if kw.get('bare', True):
                    tarinfo.name = os.path.split(x)[1]
                else:
                    tarinfo.name = endname + x # todo, if tuple, then..
                Logs.debug("adding %r to %s" % (tarinfo.name, filename))
                with open(x, 'rb') as f:
                    tar.addfile(tarinfo, f)
        Logs.info('Created %s' % filename)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def add_tar_file(self, x, tar):
        """
        Add a file to the tar archive. Transform symlinks into files if the files lie out of the project tree.
        """
        p = self.get_tar_path(x)
        tinfo = tar.gettarinfo(name=p, arcname=self.get_tar_prefix() + '/' + x.path_from(self.base_path))
        tinfo.uid   = 0
        tinfo.gid   = 0
        tinfo.uname = 'root'
        tinfo.gname = 'root'

        fu = None
        try:
            fu = open(p, 'rb')
            tar.addfile(tinfo, fileobj=fu)
        finally:
            if fu:
                fu.close()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def make_tarfile(self, filename, files, **kw):
        if kw.get('add_to_package', True):
            self.files.append(filename)

        with tarfile.open(filename, TARFORMAT) as tar:
            endname = os.path.split(filename)[-1]
            endname = endname.split('.')[0] + '/'
            for x in files:
                tarinfo = tar.gettarinfo(x, x)
                tarinfo.uid   = tarinfo.gid   = 0
                tarinfo.uname = tarinfo.gname = 'root'
                tarinfo.size = os.stat(x).st_size

                # TODO - more archive creation options?
                if kw.get('bare', True):
                    tarinfo.name = os.path.split(x)[1]
                else:
                    tarinfo.name = endname + x # todo, if tuple, then..
                Logs.debug("adding %r to %s" % (tarinfo.name, filename))
                with open(x, 'rb') as f:
                    tar.addfile(tarinfo, f)
        Logs.info('Created %s' % filename)
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def open_compressed(filename, open_flag='r', compression_type='bz2'):
  """Opens a compressed HDF5File with the given opening flags.
  For the 'r' flag, the given compressed file will be extracted to a local space.
  For 'w', an empty HDF5File is created.
  In any case, the opened HDF5File is returned, which needs to be closed using the close_compressed() function.
  """
  # create temporary HDF5 file name
  hdf5_file_name = tempfile.mkstemp('.hdf5', 'bob_')[1]

  if open_flag == 'r':
    # extract the HDF5 file from the given file name into a temporary file name
    tar = tarfile.open(filename, mode="r:" + compression_type)
    memory_file = tar.extractfile(tar.next())
    real_file = open(hdf5_file_name, 'wb')
    real_file.write(memory_file.read())
    del memory_file
    real_file.close()
    tar.close()

  return bob.io.base.HDF5File(hdf5_file_name, open_flag)
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def close_compressed(filename, hdf5_file, compression_type='bz2', create_link=False):
  """Closes the compressed hdf5_file that was opened with open_compressed.
  When the file was opened for writing (using the 'w' flag in open_compressed), the created HDF5 file is compressed into the given file name.
  To be able to read the data using the real tools, a link with the correct extension might is created, when create_link is set to True.
  """
  hdf5_file_name = hdf5_file.filename
  is_writable = hdf5_file.writable
  hdf5_file.close()

  if is_writable:
    # create compressed tar file
    tar = tarfile.open(filename, mode="w:" + compression_type)
    tar.add(hdf5_file_name, os.path.basename(filename))
    tar.close()

  if create_link:
    extension = {'': '.tar', 'bz2': '.tar.bz2',
                 'gz': 'tar.gz'}[compression_type]
    link_file = filename + extension
    if not os.path.exists(link_file):
      os.symlink(os.path.basename(filename), link_file)

  # clean up locally generated files
  os.remove(hdf5_file_name)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def read_text_file(filename):
    """Return the contents of *filename*.

    Try to decode the file contents with utf-8, the preferred system encoding
    (e.g., cp1252 on some Windows machines), and latin1, in that order.
    Decoding a byte string with latin1 will never raise an error. In the worst
    case, the returned string will contain some garbage characters.

    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
    for enc in encodings:
        try:
            data = data.decode(enc)
        except UnicodeDecodeError:
            continue
        break

    assert type(data) != bytes  # Latin1 should have worked.
    return data
项目:rules_docker    作者:bazelbuild    | 项目源码 | 文件源码
def add_pkg_metadata(self, metadata_tar, deb):
    try:
      with tarfile.open(metadata_tar) as tar:
        # Metadata is expected to be in a file.
        control_file_member = filter(lambda f: os.path.basename(f.name) == TarFile.PKG_METADATA_FILE, tar.getmembers())
        if not control_file_member:
           raise self.DebError(deb + ' does not Metadata File!')
        control_file = tar.extractfile(control_file_member[0])
        metadata = ''.join(control_file.readlines())
        destination_file = os.path.join(TarFile.DPKG_STATUS_DIR,
                                        TarFile.parse_pkg_name(metadata, deb))
        with self.write_temp_file(data=metadata) as metadata_file:
          self.add_file(metadata_file, destination_file)
    except (KeyError, TypeError) as e:
      raise self.DebError(deb + ' contains invalid Metadata! Exeception {0}'.format(e))
    except Exception as e:
      raise self.DebError('Unknown Exception {0}. Please report an issue at'
                          ' github.com/bazelbuild/rules_docker.'.format(e))
项目:rules_docker    作者:bazelbuild    | 项目源码 | 文件源码
def create_bundle(output, tag_to_config, diffid_to_blobsum,
                  blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy):
  """Creates a Docker image from a list of layers.

  Args:
    output: the name of the docker image file to create.
    layers: the layers (tar files) to join to the image.
    tag_to_layer: a map from docker_name.Tag to the layer id it references.
    layer_to_tags: a map from the name of the layer tarball as it appears
            in our archives to the list of tags applied to it.
  """

  with tarfile.open(output, 'w') as tar:
    def add_file(filename, contents):
      info = tarfile.TarInfo(filename)
      info.size = len(contents)
      tar.addfile(tarinfo=info, fileobj=cStringIO.StringIO(contents))

    tag_to_image = {}
    for (tag, config) in six.iteritems(tag_to_config):
      tag_to_image[tag] = FromParts(
          config, diffid_to_blobsum,
          blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy)

    v2_2_save.multi_image_tarball(tag_to_image, tar)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def extract_tarfile(archive_name, destpath):
    "Unpack a tar archive, optionally compressed"
    archive = tarfile.open(archive_name)
    archive.extractall(destpath)
项目:GELUs    作者:hendrycks    | 项目源码 | 文件源码
def load_batch(fpath):
    with open(fpath, 'rb') as f:
        d = pickle.load(f, encoding='latin1')
    data = d["data"]
    labels = d["labels"]
    return data, labels
项目:GELUs    作者:hendrycks    | 项目源码 | 文件源码
def untar(fname):
    if (fname.endswith("tar.gz")):
        tar = tarfile.open(fname)
        tar.extractall()
        tar.close()
        print("File Extracted in Current Directory")
    else:
        print("Not a tar.gz file: '%s '" % sys.argv[0])
项目:confusion    作者:abhimanyudubey    | 项目源码 | 文件源码
def write_set_file(fout, labels):
    with open(fout, 'w+') as f:
        for label in labels:
            f.write('%s/%s %s\n' % (cwd, label[0], label[1]))

# Images are ordered by species, so shuffle them
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def copy_stream(self, instream, outfile, encoding=None):
        assert not os.path.isdir(outfile)
        self.ensure_dir(os.path.dirname(outfile))
        logger.info('Copying stream %s to %s', instream, outfile)
        if not self.dry_run:
            if encoding is None:
                outstream = open(outfile, 'wb')
            else:
                outstream = codecs.open(outfile, 'w', encoding=encoding)
            try:
                shutil.copyfileobj(instream, outstream)
            finally:
                outstream.close()
        self.record_as_written(outfile)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def write_binary_file(self, path, data):
        self.ensure_dir(os.path.dirname(path))
        if not self.dry_run:
            with open(path, 'wb') as f:
                f.write(data)
        self.record_as_written(path)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def write_text_file(self, path, data, encoding):
        self.ensure_dir(os.path.dirname(path))
        if not self.dry_run:
            with open(path, 'wb') as f:
                f.write(data.encode(encoding))
        self.record_as_written(path)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _csv_open(fn, mode, **kwargs):
    if sys.version_info[0] < 3:
        mode += 'b'
    else:
        kwargs['newline'] = ''
    return open(fn, mode, **kwargs)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def file_contents(filename):
    with open(filename, 'rb') as fp:
        return fp.read().decode('utf-8')
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_terminal_size():
    """Returns a tuple (x, y) representing the width(x) and the height(x)
    in characters of the terminal window."""
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack(
                'hh',
                fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            )
        except:
            return None
        if cr == (0, 0):
            return None
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
    return int(cr[1]), int(cr[0])
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack zip `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
    by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.
    """

    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    with ContextualZipFile(filename) as z:
        for info in z.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            target = progress_filter(name, target)
            if not target:
                continue
            if name.endswith('/'):
                # directory
                ensure_directory(target)
            else:
                # file
                ensure_directory(target)
                data = z.read(info.filename)
                with open(target, 'wb') as f:
                    f.write(data)
            unix_attributes = info.external_attr >> 16
            if unix_attributes:
                os.chmod(target, unix_attributes)