Python shutil module: unpack_archive() example source code

From open-source Python projects, we extracted the following code examples to illustrate how to use shutil.unpack_archive().
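
For reference before the project examples: shutil.unpack_archive(filename, extract_dir=None, format=None) infers the archive format from the filename extension when format is omitted, and extracts into the current working directory when extract_dir is omitted. A minimal sketch of the round trip, with hypothetical 'data' and 'backup' paths:

import shutil

# Formats this Python build can unpack, e.g. ('zip', ['.zip'], 'ZIP file'),
# ('gztar', ['.tar.gz', '.tgz'], "gzip'ed tar-file"), ...
for name, extensions, description in shutil.get_unpack_formats():
    print(name, extensions, description)

# Pack a directory, then restore it elsewhere; on the way back out the
# format of the resulting backup.tar.gz is inferred from its extension.
archive = shutil.make_archive('backup', 'gztar', root_dir='data')
shutil.unpack_archive(archive, 'restored')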

Project: dcss_single_cell    Author: srmcc
def wishbone_my_setup(setup_dir):
    # install GSEA, diffusion components, and download data.
    tools_dir = setup_dir + '/tools'
    if not os.path.exists(tools_dir + '/DiffusionGeometry/'):
        shutil.unpack_archive(tools_dir + '/DiffusionGeometry.zip', tools_dir +
                              '/DiffusionGeometry/')
    if not os.path.exists(tools_dir + '/mouse/'):
        shutil.unpack_archive(tools_dir + '/mouse_gene_sets.tar.gz', tools_dir)
    if not os.path.exists(tools_dir + '/human/'):
        shutil.unpack_archive(tools_dir + '/human_gene_sets.tar.gz', tools_dir)
    if not os.path.exists(setup_dir + '/data/GSE72857_umitab.txt.gz'):
        # download the mouse UMI table from GSE72857, "Transcriptional heterogeneity
        # and lineage commitment in myeloid progenitors [single cell RNA-seq]"
        os.system("wget -m -nH -nd -P " + setup_dir + "/data/ ftp://ftp.ncbi.nlm.nih.gov/geo/series/GSE72nnn/GSE72857/suppl/GSE72857%5Fumitab%2Etxt%2Egz")
    x = pd.read_csv(setup_dir + '/data/GSE72857_umitab.txt.gz', sep='\t', compression='gzip')
    y = pd.read_csv(setup_dir + '/data/sample_scseq_data.csv', index_col=[0])
    scdata_raw = x.T.loc[y.index]
    scdata_raw = wishbone.wb.SCData(scdata_raw.astype('float'), data_type='sc-seq')
    return scdata_raw
Project: indy-node    Author: hyperledger
def _restore_from_backup(self, version):
    logger.debug('Restoring from backup for {}'.format(version))
    for file_path in self.files_to_preserve:
        try:
            shutil.copy2(os.path.join(self.backup_target, file_path),
                         os.path.join(self.tmp_dir, file_path))
        except IOError as e:
            logger.warning('Copying {} failed due to {}'.format(file_path, e))
    shutil.unpack_archive(self._backup_name_ext(version),
                          self.backup_target, self.backup_format)
    for file_path in self.files_to_preserve:
        try:
            shutil.copy2(os.path.join(self.tmp_dir, file_path),
                         os.path.join(self.backup_target, file_path))
        except IOError as e:
            logger.warning('Copying {} failed due to {}'.format(file_path, e))
    shutil.rmtree(self.tmp_dir, ignore_errors=True)
Project: UPBGE-CommunityAddon    Author: elmeunick9
def execute(self, context):
    name = os.path.basename(self.filepath)

    # TODO: move this to another file/module

    def checkProjectFile(path):
        if os.path.basename(path) != "project.json": return False
        if not os.path.isfile(path): return False

        utils.loadProjectFile(path)

        if 'bge-project' not in utils.project_data: return False

        return True

    def getMainBlend(path):
        path = os.path.dirname(path) + os.sep + "project" + os.sep + "main.blend"
        if os.path.isfile(path): return path

    # If we were given an archive file, extract it to a temporary directory first
    if os.path.isfile(self.filepath):
        # archive extensions unpack_archive() can infer a format from
        for x in [".zip", ".tar", ".tar.gz", ".tar.bz2", ".tar.xz"]:
            if self.filepath.endswith(x):
                tpath = tempfile.gettempdir() + os.sep + os.path.basename(self.filepath)[:-len(x)] + os.sep
                shutil.unpack_archive(self.filepath, tpath)
                self.filepath = tpath
                print("Extracted to: ", tpath)

    # Once we have a folder, look for project.json in the likely locations
    candidates = [self.filepath,
                  self.filepath + "project.json",
                  os.path.dirname(self.filepath) + os.sep + "project.json",
                  os.path.dirname(self.filepath) + os.sep + "../" + "project.json"]

    endpath = None
    for path in candidates:
        if checkProjectFile(path): endpath = getMainBlend(path)

    if endpath is None:
        self.report({'ERROR_INVALID_INPUT'}, "Error loading project: project file not found.")
        return {'CANCELLED'}

    bpy.ops.wm.open_mainfile(filepath=endpath)
    return {'FINISHED'}
Project: ParlAI    Author: facebookresearch
def untar(path, fname, deleteTar=True):
    """Unpacks the given archive file to the same directory, then (by default)
    deletes the archive file.
    """
    print('unpacking ' + fname)
    fullpath = os.path.join(path, fname)
    shutil.unpack_archive(fullpath, path)
    if deleteTar:
        os.remove(fullpath)
Project: polyglot-server    Author: MontrealCorpusTools
def extract_neo4j(database_name, archive_path):
    database_directory = os.path.join(settings.POLYGLOT_DATA_DIRECTORY, database_name)
    neo4j_directory = os.path.join(database_directory, 'neo4j')
    if os.path.exists(neo4j_directory):
        return False
    shutil.unpack_archive(archive_path, database_directory)
    for d in os.listdir(database_directory):
        if d.startswith('neo4j'):
            os.rename(os.path.join(database_directory, d), neo4j_directory)
    return True
Project: polyglot-server    Author: MontrealCorpusTools
def extract_influxdb(database_name, archive_path):
    database_directory = os.path.join(settings.POLYGLOT_DATA_DIRECTORY, database_name)
    influxdb_directory = os.path.join(database_directory, 'influxdb')
    if os.path.exists(influxdb_directory):
        return False
    shutil.unpack_archive(archive_path, database_directory)
    for d in os.listdir(database_directory):
        if d.startswith('influxdb'):
            os.rename(os.path.join(database_directory, d), influxdb_directory)
    return True
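
The two helpers above share one pattern: unpack the archive next to its final location, then rename the versioned top-level directory it creates (e.g. neo4j-community-3.x) to a fixed name. A generic sketch of that pattern; the function and parameter names are hypothetical, not from the project:

import os
import shutil

def unpack_to_fixed_name(archive_path, parent_dir, prefix, fixed_name):
    """Unpack archive_path into parent_dir, then rename the first top-level
    entry whose name starts with prefix to fixed_name."""
    target = os.path.join(parent_dir, fixed_name)
    if os.path.exists(target):
        return False
    shutil.unpack_archive(archive_path, parent_dir)
    for entry in os.listdir(parent_dir):
        if entry.startswith(prefix):
            os.rename(os.path.join(parent_dir, entry), target)
            break
    return True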
Project: zeex    Author: zbarge
def zipfile_unzip(file_path=None, extract_dir=None, **filedialog_kwargs):
    if file_path is None:
        file_path = QtGui.QFileDialog.getOpenFileName(**filedialog_kwargs)[0]
    if extract_dir is None:
        extract_dir = os.path.dirname(file_path)
    shutil.unpack_archive(file_path, extract_dir=extract_dir)
Project: taktyk    Author: kosior
def unpack_archive(file, extract_dir, format_, msg):
    try:
        shutil.unpack_archive(file, extract_dir=extract_dir, format=format_)
    except (ValueError, OSError) as err:
        logging.debug(traceback.format_exc())
        logging.critical(err)
        logging.critical(msg)
        raise SystemExit
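
For context on the except clause above: shutil.ReadError, which unpack_archive() raises when the format cannot be guessed from the extension or the file cannot be read, subclasses OSError, while an explicitly passed but unregistered format raises ValueError, so (ValueError, OSError) covers both failure modes. A minimal sketch, with a hypothetical 'notes.txt' path:

import shutil

try:
    # '.txt' is not a registered archive extension, so this raises
    # shutil.ReadError (an OSError subclass); an explicit but unregistered
    # format= argument would raise ValueError instead.
    shutil.unpack_archive('notes.txt', 'out')
except (ValueError, OSError) as err:
    print('unpack failed:', err)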
Project: pypi2pkgbuild    Author: anntzer
def _get_url_unpacked_path_or_null(url):
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme == "file" and parsed.path.endswith(".whl"):
        return Path("/dev/null")
    try:
        cache_dir, packed_path = _get_url_impl(url)
    except CalledProcessError:
        return Path("/dev/null")
    if packed_path.is_file():  # pip://
        shutil.unpack_archive(str(packed_path), cache_dir.name)
    unpacked_path, = (
        path for path in Path(cache_dir.name).iterdir() if path.is_dir())
    return unpacked_path
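
A side note on the .whl short-circuit above: wheels are zip archives, but unpack_archive() will not infer that from the .whl extension. If such files should go through unpack_archive() anyway, shutil.register_unpack_format() can map the extension to a handler. A minimal sketch; the wheel filename is hypothetical:

import shutil
import zipfile

def _unpack_zip(filename, extract_dir):
    # shutil calls the registered hook as function(filename, extract_dir).
    with zipfile.ZipFile(filename) as zf:
        zf.extractall(extract_dir)

shutil.register_unpack_format('whl', ['.whl'], _unpack_zip, description='wheel (zip) file')
shutil.unpack_archive('example-1.0-py3-none-any.whl', 'unpacked_wheel')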
Project: personalized-dialog    Author: chaitjo
def untar(fname):
    print('unpacking ' + fname)
    fullpath = os.path.join(fname)
    shutil.unpack_archive(fullpath)
    os.remove(fullpath)
Project: personalized-dialog    Author: chaitjo
def untar(path, fname):
    print('unpacking ' + fname)
    fullpath = os.path.join(path, fname)
    shutil.unpack_archive(fullpath, path)
    os.remove(fullpath)
Project: cxflow    Author: Cognexa
def maybe_download_and_extract(data_root: str, url: str) -> None:
    """
    Maybe download the specified file to ``data_root`` and try to unpack it with ``shutil.unpack_archive``.

    :param data_root: data root to download the files to
    :param url: url to download from
    """

    # make sure data_root exists
    os.makedirs(data_root, exist_ok=True)

    filename = os.path.basename(url)

    # check whether the archive already exists
    filepath = os.path.join(data_root, filename)
    if os.path.exists(filepath):
        logging.info('\t`%s` already exists; skipping', filepath)
        return

    # download with progressbar
    logging.info('\tdownloading %s', filepath)
    req = requests.get(url, stream=True)
    expected_size = int(req.headers.get('content-length'))
    chunk_size = 1024
    with open(filepath, 'wb') as f_out, \
         click.progressbar(req.iter_content(chunk_size=chunk_size),
                           length=expected_size // chunk_size) as bar:
        for chunk in bar:
            if chunk:
                f_out.write(chunk)
                f_out.flush()

    # extract
    try:
        shutil.unpack_archive(filepath, data_root)
    except (shutil.ReadError, ValueError):
        logging.info('File `%s` could not be extracted by `shutil.unpack_archive`. Please process it manually.',
                     filepath)
Project: bpy_lambda    Author: bcongdon
def execute(self, context):
    import html.parser
    import urllib.request

    remote_platforms = []

    ps = context.scene.ge_publish_settings

    # create lib folder if not already available
    lib_path = bpy.path.abspath(ps.lib_path)
    if not os.path.exists(lib_path):
        os.makedirs(lib_path)

    print("Retrieving list of platforms from blender.org...", end=" ", flush=True)

    class AnchorParser(html.parser.HTMLParser):
        def handle_starttag(self, tag, attrs):
            if tag == 'a':
                for key, value in attrs:
                    if key == 'href' and value.startswith('blender'):
                        remote_platforms.append(value)

    url = 'http://download.blender.org/release/Blender' + bpy.app.version_string.split()[0]
    parser = AnchorParser()
    data = urllib.request.urlopen(url).read()
    parser.feed(str(data))

    print("done", flush=True)

    print("Downloading files (this will take a while depending on your internet connection speed).", flush=True)
    for i in remote_platforms:
        src = '/'.join((url, i))
        dst = os.path.join(lib_path, i)

        dst_dir = '.'.join([part for part in dst.split('.') if part not in {'zip', 'tar', 'bz2'}])
        if not os.path.exists(dst) and not os.path.exists(dst.split('.')[0]):
            print("Downloading " + src + "...", end=" ", flush=True)
            urllib.request.urlretrieve(src, dst)
            print("done", flush=True)
        else:
            print("Reusing existing file: " + dst, flush=True)

        print("Unpacking " + dst + "...", end=" ", flush=True)
        if os.path.exists(dst_dir):
            shutil.rmtree(dst_dir)
        shutil.unpack_archive(dst, dst_dir)
        print("done", flush=True)

    print("Creating platform from libs...", flush=True)
    bpy.ops.scene.publish_auto_platforms()
    return {'FINISHED'}
Project: jesse-james    Author: zeroSteiner
def fetch(source, destination, allow_file=False):
    """
    Fetch a group of files either from a file archive or version control
    repository.

    Supported URL schemes:
      - file
      - ftp
      - ftps
      - git
      - git+http
      - git+https
      - git+ssh
      - http
      - https

    :param str source: The source URL to retrieve.
    :param str destination: The directory into which the files should be placed.
    :param bool allow_file: Whether or not to permit the file:// URL for processing local resources.
    :return: The destination directory that was used.
    :rtype: str
    """
    source = source.strip()
    if os.path.exists(destination):
        raise ValueError('destination must not be an existing directory')

    parsed_url = urllib.parse.urlparse(source, scheme='file')
    if parsed_url.username is not None:
        # if the username is not None, then the password will be a string
        creds = Creds(parsed_url.username, parsed_url.password or '')
    else:
        creds = Creds(None, None)

    parsed_url = collections.OrderedDict(zip(('scheme', 'netloc', 'path', 'params', 'query', 'fragment'), parsed_url))
    parsed_url['netloc'] = parsed_url['netloc'].split('@', 1)[-1]
    parsed_url['scheme'] = parsed_url['scheme'].lower()

    if parsed_url['scheme'] == 'file':
        if not allow_file:
            raise RuntimeError('file: URLs are not allowed to be processed')
        tmp_path = parsed_url['path']
        if os.path.isdir(tmp_path):
            shutil.copytree(tmp_path, destination, symlinks=True)
        elif os.path.isfile(tmp_path):
            shutil.unpack_archive(tmp_path, destination)
    else:
        tmp_fd, tmp_path = tempfile.mkstemp(suffix='_' + os.path.basename(parsed_url['path']))
        os.close(tmp_fd)
        tmp_file = open(tmp_path, 'wb')
        try:
            _fetch_remote(source, destination, parsed_url, creds, tmp_file, tmp_path)
            if os.stat(tmp_path).st_size:
                shutil.unpack_archive(tmp_path, destination)
        finally:
            tmp_file.close()
            os.remove(tmp_path)
    return destination