Python zipfile 模块,ZipFile() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用zipfile.ZipFile()

项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def fetch_data():
    """Download the MTGJSON archive and return its decoded JSON payload.

    The primary URL is requested first; if that raises a connection error,
    the fallback URL is requested instead. The archive must hold exactly
    one member, which is read, decoded as UTF-8 and parsed as JSON.

    Raises:
        RuntimeError: If the archive does not contain exactly one file.
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw = archive.read(members[0])

    return json.loads(raw.decode('utf-8'))
项目:quizbot-2017    作者:pycontw    | 项目源码 | 文件源码
def generate_info():
    """Yield one ``Registration`` per row of every CSV in the tickets archive.

    ``ensure_data_file`` is called with the archive path and the tickets URL
    before the archive is opened. Members whose extension is not ``.csv``
    are skipped.
    """
    archive_path = ROOT_DIR_PATH.joinpath('tickets.zip')
    ensure_data_file(archive_path, DATA_FILE_INFO['TICKETS_URL'])

    with zipfile.ZipFile(str(archive_path)) as archive:
        for member in archive.namelist():
            extension = os.path.splitext(member)[1]
            if extension != '.csv':
                continue
            with archive.open(member) as raw:
                # Archive members are opened in binary mode, while csv wants
                # text, so the stream is wrapped in a decoder first.
                # See <https://stackoverflow.com/questions/5627954>.
                reader = csv.DictReader(
                    io.TextIOWrapper(raw, encoding='utf8', newline=''))
                for record in reader:
                    yield Registration(record)
项目:AutoML5    作者:djajetic    | 项目源码 | 文件源码
def zipdir(archivename, basedir):
    '''Zip the files below ``basedir`` into ``archivename``.

    Based on a recipe by J.F. Sebastian (http://stackoverflow.com/).

    Files are stored under their path relative to ``basedir``. Files whose
    name ends in ``.zip`` are skipped, and empty directories are not
    recorded.

    Parameters
    ----------
    archivename : path of the archive to create (opened in "w" mode).
    basedir : existing directory whose files are archived; a trailing
        path separator is accepted.
    '''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: empty directories are ignored
            for fn in files:
                if fn.endswith('.zip'):
                    continue
                absfn = os.path.join(root, fn)
                # relpath keeps member names intact even when basedir ends
                # with a separator, where slicing by length would eat the
                # first character of each name.
                z.write(absfn, os.path.relpath(absfn, basedir))

# ================ Inventory input data and create data structure =================
项目:distributional_perspective_on_RL    作者:Kiwoo    | 项目源码 | 文件源码
def pickle_load(path, compression=False):
    """Load a pickled object from ``path``, optionally from inside a zip.

    Parameters
    ----------
    path: str
        path to the file to read
    compression: bool
        if true, ``path`` is opened as a zip archive and the pickle is read
        from its member named ``"data"``; otherwise ``path`` is read as a
        plain pickle file.

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE(review): pickle.load executes arbitrary code from the stream;
    # only use this on files from a trusted source.
    if not compression:
        with open(path, "rb") as handle:
            return pickle.load(handle)

    with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as archive:
        with archive.open("data") as member:
            return pickle.load(member)
项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def prepare_zip():
    """Create (or overwrite) ``gimel.zip`` in the current directory.

    The archive receives a ``config.json`` entry serialised from ``config``
    (external attributes set to ``0o664 << 16`` -- presumably Unix
    permission bits, confirm), the package's ``config.py``, ``gimel.py``
    and ``logger.py``, and every file below the package's ``vendor``
    resource directory, stored relative to the package root.
    """
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as bundle:
        config_entry = ZipInfo('config.json')
        config_entry.external_attr = 0o664 << 16
        bundle.writestr(config_entry, dumps(config))

        for module_name in ('config.py', 'gimel.py', 'logger.py'):
            bundle.write(resource('gimel', module_name), module_name)

        for folder, _subdirs, names in os.walk(resource('gimel', 'vendor')):
            for name in names:
                source = os.path.join(folder, name)
                bundle.write(
                    source,
                    os.path.relpath(source, resource('gimel', '')))
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def download_current_dataset(self, dest_path='.', unzip=True):
        """Download the current dataset archive and optionally extract it.

        The archive is saved as ``numerai_dataset_<YYYYMMDD>.zip`` inside
        ``dest_path``.

        Args:
            dest_path: Directory the archive is written to and, when
                ``unzip`` is true, extracted into.
            unzip: Whether to extract the archive after downloading.

        Returns:
            The HTTP status code of the download request. Nothing is
            written when the status is not 200.
        """
        now = datetime.now().strftime('%Y%m%d')
        file_name = 'numerai_dataset_{0}.zip'.format(now)
        dest_file_path = '{0}/{1}'.format(dest_path, file_name)

        r = requests.get(self._dataset_url)
        if r.status_code != 200:
            return r.status_code

        # Write the payload in a single call: iterating a bytes object
        # yields ints on Python 3, which file.write() rejects.
        with open(dest_file_path, "wb") as fp:
            fp.write(r.content)

        if unzip:
            with zipfile.ZipFile(dest_file_path, "r") as z:
                z.extractall(dest_path)
        return r.status_code
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def __init__(self, file):
        """Open ``file`` for reading and store the stream on ``self.infile``.

        ``file`` selects the source:
          * ``''``                      -> standard input
          * ``http://`` / ``https://``  -> URL opened through ``urllib23``
          * name ending in ``.zip``     -> first member of the archive,
            opened with the password ``C2BIP3('infected')``
          * anything else               -> local file opened in binary mode

        On an HTTP error, or any error while opening a zip or local file,
        a message is printed and the process exits via ``sys.exit()``.
        """
        self.file = file
        if file == '':
            self.infile = sys.stdin
        else:
            lowered = file.lower()
            if lowered.startswith(('http://', 'https://')):
                # The timeout keyword is only passed on interpreters at or
                # above hex version 0x020601F0.
                options = {'timeout': 5} if sys.hexversion >= 0x020601F0 else {}
                try:
                    self.infile = urllib23.urlopen(file, **options)
                except urllib23.HTTPError:
                    print('Error accessing URL %s' % file)
                    print(sys.exc_info()[1])
                    sys.exit()
            elif lowered.endswith('.zip'):
                try:
                    self.zipfile = zipfile.ZipFile(file, 'r')
                    first_member = self.zipfile.infolist()[0]
                    self.infile = self.zipfile.open(first_member, 'r', C2BIP3('infected'))
                except:
                    print('Error opening file %s' % file)
                    print(sys.exc_info()[1])
                    sys.exit()
            else:
                try:
                    self.infile = open(file, 'rb')
                except:
                    print('Error opening file %s' % file)
                    print(sys.exc_info()[1])
                    sys.exit()
        self.ungetted = []
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Read through a context manager so the handle is closed promptly.
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if not filename.endswith('.whl'):
                    continue
                whl = ZipFile(os.path.join(dirname, filename))
                # Close every wheel, even when validation raises.
                try:
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
                finally:
                    whl.close()
    assert valid > 0, "No metadata.json found"
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_zipfile_attributes():
    # Members are added with ZipFile.writestr() rather than .write(), so
    # their attributes have to be set manually; check that they were.
    with temporary_directory() as tempdir:
        expected_modes = (('foo', 0o644), ('bar', 0o755))
        for name, mode in expected_modes:
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
            os.chmod(target, mode)

        archive_path = wheel.archive.make_wheelfile_inner(
            os.path.join(tempdir, 'dummy'), tempdir)

        with readable_zipfile(archive_path) as zf:
            for name, mode in expected_modes:
                member = zf.getinfo(os.path.join(tempdir, name))
                assert member.external_attr == (mode | 0o100000) << 16
                assert member.compress_type == zipfile.ZIP_DEFLATED
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def handle_file(self, fname):
        """Extract an archive into ``self.name`` and record what it contains.

        If ``fname`` starts with the bytes ``dex\\n`` it is first converted
        with ``dex2jar`` and the resulting jar is processed instead. Then,
        for every archive member:
          * ``.class`` -> dotted class name appended to ``self.classes``
          * ``.dex``   -> the extracted file is handled recursively
          * ``.proto`` -> decoded text stored in ``self.bonus_protos``
          * ``.so``    -> ``walk_binary`` results merged into
            ``self.bonus_protos``
        """
        with open(fname, 'rb') as stream:
            magic = stream.read(4)
            if magic == b'dex\n':
                converted = self.name + '/classes-dex2jar.jar'
                run([dex2jar, fname, '-f', '-o', converted], cwd=self.name, stderr=DEVNULL)
                fname = converted

        with ZipFile(fname) as jar:
            jar.extractall(self.name)

            for entry in jar.namelist():
                if entry.endswith('.class'):
                    # Drop the 6-character ".class" suffix after dotting.
                    self.classes.append(entry.replace('/', '.')[:-6])
                elif entry.endswith('.dex'):
                    self.handle_file(self.name + '/' + entry)
                elif entry.endswith('.proto'):
                    self.bonus_protos[entry] = jar.read(entry).decode('utf8')
                elif entry.endswith('.so'):
                    self.bonus_protos.update(walk_binary(self.name + '/' + entry))
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def load_names_data():
    """Return one DataFrame of all ``yob*`` members of the names archive.

    The archive is cached as ``ZIP_NAME`` in the system temp directory and
    downloaded from ``URL_NAMES`` only when the cached file is missing.
    Each member whose name starts with ``yob`` is read as a header-less CSV
    with columns ``name``, ``gender`` and ``count``; characters 3-6 of the
    member name give the ``year`` column. The frames are concatenated in
    member-name order and indexed by ``name``.
    """
    archive_path = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(archive_path):
        response = requests.get(URL_NAMES)
        with open(archive_path, 'wb') as cache:
            cache.write(response.content)

    frames = collections.OrderedDict()
    with zipfile.ZipFile(archive_path) as zf:
        members = sorted(zf.infolist(), key=lambda info: info.filename)
        for member in members:
            member_name = member.filename
            if not member_name.startswith('yob'):
                continue
            year = int(member_name[3:7])
            frame = pd.read_csv(
                zf.open(member),
                header=None,
                names=('name', 'gender', 'count'))
            frame['year'] = year
            frames[year] = frame

        combined = pd.concat(frames.values())
        combined.set_index('name', inplace=True, drop=True)
        return combined
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def load_names_data():
    """Return one DataFrame of all ``yob*`` members of the names archive.

    The archive is cached as ``ZIP_NAME`` in the system temp directory and
    downloaded from ``URL_NAMES`` only when the cached file is missing.
    Each member whose name starts with ``yob`` is read as a header-less CSV
    with columns ``name``, ``gender`` and ``count``; characters 3-6 of the
    member name give the ``year`` column. The frames are concatenated in
    member-name order and indexed by ``name``.
    """
    archive_path = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(archive_path):
        response = requests.get(URL_NAMES)
        with open(archive_path, 'wb') as cache:
            cache.write(response.content)

    frames = collections.OrderedDict()
    with zipfile.ZipFile(archive_path) as zf:
        members = sorted(zf.infolist(), key=lambda info: info.filename)
        for member in members:
            member_name = member.filename
            if not member_name.startswith('yob'):
                continue
            year = int(member_name[3:7])
            frame = pd.read_csv(
                zf.open(member),
                header=None,
                names=('name', 'gender', 'count'))
            frame['year'] = year
            frames[year] = frame

        combined = pd.concat(frames.values())
        combined.set_index('name', inplace=True, drop=True)
        return combined
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def file(self):
        """
        Returns a file pointer to this binary

        The binary is fetched from ``/api/v1/binary/<md5sum>`` as a zip
        archive and the returned object is the archive's ``filedata``
        member, opened for reading.

        :example:

        >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
        >>> binary_obj = process_obj.binary
        >>> print(binary_obj.file.read(2))
        MZ
        """
        # TODO: I don't like reaching through to the session...
        url = "/api/v1/binary/{0:s}".format(self.md5sum)
        with closing(self._cb.session.get(url, stream=True)) as response:
            archive = ZipFile(StringIO(response.content))
            return archive.open('filedata')
项目:aws-greengrass-mini-fulfillment    作者:awslabs    | 项目源码 | 文件源码
def _create_lambda(arn, func_name, func_desc, lambda_handler, lambda_main,
                   runtime):
    """Create and publish a Lambda function from ``temp_deploy_zip``.

    Args:
        arn: Role ARN passed as the function's ``Role``.
        func_name: Name of the function to create.
        func_desc: Function description.
        lambda_handler: Handler attribute name; combined with
            ``lambda_main`` as ``"<lambda_main>.<lambda_handler>"``.
        lambda_main: Module part of the handler path.
        runtime: Lambda runtime identifier.

    Returns:
        The ``create_function`` response, or ``None`` when a
        ``ClientError`` was raised (the error is logged instead).
    """
    func = dict()
    lamb = boto3.client('lambda')
    # The deployment package is binary data, so it must be read as bytes;
    # text mode would decode (or corrupt) the archive.
    with open(temp_deploy_zip, 'rb') as deploy:
        func['ZipFile'] = deploy.read()
    try:
        resp = lamb.create_function(
            FunctionName=func_name, Runtime=runtime, Publish=True,
            Description=func_desc,
            Role=arn, Code=func, Handler='{0}.{1}'.format(
                lambda_main, lambda_handler
            ))
        logging.info("Create Lambda Function resp:{0}".format(
            json.dumps(resp, indent=4, sort_keys=True))
        )
        return resp
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            logging.warning("Validation Error {0} creating function '{1}'.".format(
                ce, func_name))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
项目:aws-greengrass-mini-fulfillment    作者:awslabs    | 项目源码 | 文件源码
def _update_lambda_function(zip_file, func_name):
    """Upload new code for ``func_name`` and publish it.

    Args:
        zip_file: Readable object whose ``read()`` supplies the package.
        func_name: Name of the Lambda function to update.

    Returns:
        The ``Version`` field of the update response, or ``None`` when a
        ``ClientError`` was raised (the error is logged instead).
    """
    client = boto3.client('lambda')
    try:
        response = client.update_function_code(
            FunctionName=func_name,
            ZipFile=zip_file.read(),
            Publish=True
        )
        return response['Version']
    except ClientError as ce:
        error_code = ce.response['Error']['Code']
        if error_code != 'ValidationException':
            logging.error("Unexpected Error: {0}".format(ce))
        else:
            logging.warning(
                "Validation Error {0} updating function '{1}'.".format(
                    ce, func_name))
项目:PyWebRunner    作者:IntuitiveWebSolutions    | 项目源码 | 文件源码
def download_driver_file(whichbin, url, base_path):
    """Download a driver archive, unpack it and mark the driver executable.

    The archive is saved as ``/tmp/pwr_temp.tar.gz`` or
    ``/tmp/pwr_temp.zip`` depending on the URL's suffix and extracted into
    ``base_path``. Afterwards ``geckodriver`` (when ``whichbin`` is
    ``'wires'``) or ``chromedriver`` (otherwise) gets mode ``0o775``.
    """
    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    print("Downloading from: {}".format(url))
    archive_path = '/tmp/pwr_temp{}'.format(ext)
    download_file(url, archive_path)

    target_dir = '{}/'.format(base_path)
    if ext == '.tar.gz':
        import tarfile
        tar = tarfile.open(archive_path, "r:gz")
        tar.extractall(target_dir)
        tar.close()
    else:
        import zipfile
        with zipfile.ZipFile(archive_path, "r") as z:
            z.extractall(target_dir)

    # if whichbin == 'wires' and '/v{}/'.format(latest_gecko_driver) in url:
    #     os.rename('{}/geckodriver'.format(base_path),
    #               '{}/wires'.format(base_path))
    #     os.chmod('{}/wires'.format(base_path), 0o775)
    binary_template = '{}/geckodriver' if whichbin == 'wires' else '{}/chromedriver'
    os.chmod(binary_template.format(base_path), 0o775)
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def reseed(self, netdb):
        """Compress netdb entries and set content

        Collects every ``.dat`` file below ``netdb`` (a random sample of 75
        when there are more), zips them under their base names with
        deflate compression, and stores the archive bytes and their length
        on the instance together with the file/content type codes.

        Raises PyseederException when no ``.dat`` file is found.
        """
        # TODO check modified time
        # may be not older than 10h
        dat_files = [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(netdb)
            for name in names
            if name.endswith(".dat")
        ]

        if not dat_files:
            raise PyseederException("Can't get enough netDb entries")
        if len(dat_files) > 75:
            dat_files = random.sample(dat_files, 75)

        buffer = io.BytesIO()
        with ZipFile(buffer, "w", compression=ZIP_DEFLATED) as zf:
            for path in dat_files:
                zf.write(path, arcname=os.path.basename(path))

        self.FILE_TYPE = 0x00
        self.CONTENT_TYPE = 0x03
        self.CONTENT = buffer.getvalue()
        self.CONTENT_LENGTH = len(self.CONTENT)
项目:sc8pr    作者:dmaccarthy    | 项目源码 | 文件源码
def _load(self, fn, notify=False):
        """Load the video from a ZIP file

        After ``_loadMeta`` has read the archive, members named "0", "1",
        ... are read in order into ``self._costumes``. Each non-empty
        member is split into everything but its last 12 bytes and those
        last 12 bytes; an empty member repeats the previous entry. Loading
        stops at the first exception of any kind. When ``notify`` is
        given it is called as ``notify(fn, index, self)`` before each read
        and as ``notify(fn, None, self)`` once loading stops.
        """
        with ZipFile(fn) as zf:
            self._loadMeta(zf)
            self._costumes = []
            index = 0
            while True:
                try:
                    if notify:
                        notify(fn, index, self)
                    raw = zf.read(str(index))
                    if raw:
                        frame = raw[:-12], raw[-12:]
                    else:
                        frame = self._costumes[index - 1]
                    index += 1
                    self._costumes.append(frame)
                except:
                    if notify:
                        notify(fn, None, self)
                    break
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def compare_component_output(self, input_path, expected_output_path):
        """Check a sketch archive renders to the expected output tree.

        The archive is processed twice: once straight into a directory and
        once into a zip that is then extracted. Both results must match
        ``expected_output_path``. Temporary output is removed afterwards.
        """
        rendering_engine = self.get_rendering_engine()
        temp_dir = tempfile.gettempdir()
        output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
        process_sketch_archive(zip_path=input_path, compress_zip=False,
                               output_path=output_dir, engine=rendering_engine)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        storage.clear()
        output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
        process_sketch_archive(zip_path=input_path, compress_zip=True,
                               output_path=output_zip, engine=rendering_engine)
        # Close the archive before it is deleted below; an open handle
        # leaks and blocks os.remove on Windows.
        with zipfile.ZipFile(output_zip) as z:
            z.extractall(output_dir)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        os.remove(output_zip)
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def write_file_to_zip_at_path(zip_path=None, archive=None, file_path='',
                              content='', encode=False):
    """Add ``content`` to a zip archive under the member name ``file_path``.

    Args:
        zip_path: Path of an archive to open in append mode. Takes
            precedence over ``archive``; the archive is closed before
            returning.
        archive: An already open archive to write into; it is left open.
        file_path: Member name inside the archive.
        content: Data to store.
        encode: When true, ``content`` is encoded as UTF-8 first.

    Raises:
        Exception: If neither ``zip_path`` nor ``archive`` is provided.
    """
    if zip_path:
        ensure_directories_for_path(zip_path)
        z = zipfile.ZipFile(zip_path, mode="a")
    elif archive:
        z = archive
    else:
        raise Exception('One of zip_path, archive must be provided.')
    try:
        print("Adding {} to archive ...".format(file_path))
        if encode:
            z.writestr(file_path, content.encode('utf-8'))
        else:
            z.writestr(file_path, content)
        storage.file_paths.add(file_path)
    finally:
        # Only archives opened here are closed, and they are closed even
        # when the write fails; a caller-supplied archive stays open.
        if zip_path:
            z.close()
项目:CausalGAN    作者:mkocaoglu    | 项目源码 | 文件源码
def download_celeb_a(base_path):
    """Download and unpack the celeb-A images below ``base_path``.

    Nothing happens when ``<base_path>/celebA`` already exists. Otherwise
    the archive is downloaded (unless it is already present), extracted
    into ``base_path``, its ``img_align_celeba`` directory is moved to
    ``celebA/images``, the archive is deleted and the attribute file is
    fetched with ``download_attr_file``.
    """
    data_path = os.path.join(base_path, 'celebA')
    if os.path.exists(data_path):
        print('[!] Found celeb-A - skip')
        return

    images_path = os.path.join(data_path, 'images')
    filename = "img_align_celeba.zip"
    drive_id = "0B7EVK8r0v71pZjFTYXZWM3FlRnM"
    save_path = os.path.join(base_path, filename)

    if not os.path.exists(save_path):
        download_file_from_google_drive(drive_id, save_path)
    else:
        print('[*] {} already exists'.format(save_path))

    with zipfile.ZipFile(save_path) as zf:
        # The first member name is looked up but not used afterwards; the
        # lookup is kept so that an empty archive still fails here.
        zip_dir = zf.namelist()[0]
        zf.extractall(base_path)

    if not os.path.exists(data_path):
        os.mkdir(data_path)
    extracted_dir = os.path.join(base_path, "img_align_celeba")
    os.rename(extracted_dir, images_path)
    os.remove(save_path)

    download_attr_file(data_path)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def listdir(path):
    """Replacement for os.listdir that works in frozen environments.

    Outside a frozen environment, or when ``splitZip`` reports no archive
    component for ``path``, this is plain ``os.listdir``. Otherwise the
    unique first path components found below the archive-internal path are
    returned, in no particular order.
    """
    if not hasattr(sys, 'frozen'):
        return os.listdir(path)

    zipPath, archivePath = splitZip(path)
    if archivePath is None:
        return os.listdir(path)

    with zipfile.ZipFile(zipPath, "r") as zipobj:
        contents = zipobj.namelist()

    prefix_length = len(archivePath)
    # components in zip archive paths are always separated by forward slash
    entries = {
        member[prefix_length:].split('/')[0]
        for member in contents
        if member.startswith(archivePath) and len(member) > prefix_length
    }
    return list(entries)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def listdir(path):
    """Replacement for os.listdir that works in frozen environments.

    Outside a frozen environment, or when ``splitZip`` reports no archive
    component for ``path``, this is plain ``os.listdir``. Otherwise the
    unique first path components found below the archive-internal path are
    returned, in no particular order.
    """
    if not hasattr(sys, 'frozen'):
        return os.listdir(path)

    zipPath, archivePath = splitZip(path)
    if archivePath is None:
        return os.listdir(path)

    with zipfile.ZipFile(zipPath, "r") as zipobj:
        contents = zipobj.namelist()

    prefix_length = len(archivePath)
    # components in zip archive paths are always separated by forward slash
    entries = {
        member[prefix_length:].split('/')[0]
        for member in contents
        if member.startswith(archivePath) and len(member) > prefix_length
    }
    return list(entries)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def find_package(self, package):
        """Locate ``package`` on the search paths.

        Each entry of ``self.paths()`` is checked in order: first for an
        existing ``<path>/<package>`` on disk, then -- when the entry is a
        zip file rather than a directory -- for a ``<package>/__init__.py``
        member inside it.

        Returns:
            A ``(package, full_path)`` tuple for the first match.

        Raises:
            InstallationError: If no search path provides the package.
        """
        for path in self.paths():
            full = os.path.join(path, package)
            if os.path.exists(full):
                return package, full
            if os.path.isdir(path) or not zipfile.is_zipfile(path):
                continue
            archive = zipfile.ZipFile(path, 'r')
            # The finally clause closes the archive on every outcome,
            # including read errors other than a missing member.
            try:
                archive.read(os.path.join(package, '__init__.py'))
            except KeyError:
                found = False
            else:
                found = True
            finally:
                archive.close()
            if found:
                return package, full
        ## FIXME: need special error for package.py case:
        raise InstallationError(
            'No package with the name %s found' % package)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def create_zipfile(self, filename):
        """Zip every file below ``self.target_dir`` into ``filename``.

        Members are stored under their path relative to the target
        directory. A ``DistutilsOptionError`` is raised when the target
        directory itself contains no files.
        """
        with zipfile.ZipFile(filename, "w") as archive:
            self.mkpath(self.target_dir)  # just in case
            prefix_length = len(self.target_dir)
            for folder, _subdirs, names in os.walk(self.target_dir):
                if folder == self.target_dir and not names:
                    raise DistutilsOptionError(
                        "no files found in upload directory '%s'"
                        % self.target_dir)
                relative = folder[prefix_length:].lstrip(os.path.sep)
                for name in names:
                    archive.write(
                        os.path.join(folder, name),
                        os.path.join(relative, name))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def setUp(self):
        """Unpack the test data archive and change into its directory.

        Does nothing when ``datafile`` or ``dataname`` is ``None``. When
        the data file does not exist, only ``old_cwd`` is set (to
        ``None``). Otherwise the current directory is remembered, every
        archive member is extracted with ``_extract`` into a fresh temp
        directory, and the process changes into ``<temp_dir>/<dataname>``.
        """
        if self.datafile is None or self.dataname is None:
            return

        if not os.path.isfile(self.datafile):
            self.old_cwd = None
            return

        self.old_cwd = os.getcwd()
        self.temp_dir = tempfile.mkdtemp()

        with zipfile.ZipFile(self.datafile) as archive:
            for member in archive.namelist():
                _extract(archive, member, self.temp_dir)

        os.chdir(os.path.join(self.temp_dir, self.dataname))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_create_zipfile(self):
        # Zip creation must cope with the common cases, which explicitly
        # includes a folder that contains an empty folder.
        dist = Distribution()
        cmd = upload_docs(dist)
        cmd.upload_dir = self.upload_dir
        cmd.target_dir = self.upload_dir

        scratch_dir = tempfile.mkdtemp()
        archive_path = os.path.join(scratch_dir, 'foo.zip')
        try:
            cmd.create_zipfile(archive_path)
            assert zipfile.is_zipfile(archive_path)

            # Re-open the freshly written archive to inspect its members.
            archive = zipfile.ZipFile(archive_path)
            assert archive.namelist() == ['index.html']
            archive.close()
        finally:
            shutil.rmtree(scratch_dir)
项目:fluxpart    作者:usda-ars-ussl    | 项目源码 | 文件源码
def tutor_fpout():
    """Return the tutorial results, loading the pickled copy when present.

    When the pickle in ``RESDIR`` is missing, the data archive is
    extracted into ``DATADIR``, ``tutor_example`` is run,
    ``make_clean_dat`` is called, and the result is pickled for next time.
    """
    pklout = os.path.join(RESDIR, TUTORPKL)
    if not os.path.exists(pklout):
        print('re-creating fp results ... this could take a few minutes')
        with zipfile.ZipFile(os.path.join(DATADIR, ZIPFILE), 'r') as zfile:
            zfile.extractall(DATADIR)
        fpout = tutor_example()
        make_clean_dat()
        os.makedirs(RESDIR, exist_ok=True)
        with open(pklout, 'wb') as sink:
            pickle.dump(fpout, sink)
        return fpout

    with open(pklout, 'rb') as cached:
        return pickle.load(cached)
项目:fluxpart    作者:usda-ars-ussl    | 项目源码 | 文件源码
def hfdata_plot():
    """Read the quick-look data file from the archive and plot it.

    ``make_quickdata`` is called first. Columns ``q``, ``c`` and ``P`` get
    converters built from the factors 1e-3, 1e-6 and 1e3 with a second
    argument of 0; ``T`` gets one built from 1 and 273.15.

    Returns the figure produced by ``plot_hfdata``.
    """
    make_quickdata()

    scale_factors = {'q': 1e-3, 'c': 1e-6, 'P': 1e3}
    converters = {}
    for column, factor in scale_factors.items():
        converters[column] = _converter_func(float(factor), 0.)
    converters['T'] = _converter_func(1., 273.15)

    archive_path = os.path.join(DATADIR, ZIPFILE)
    with zipfile.ZipFile(archive_path, 'r') as zarch:
        with zarch.open(QUICKFILE, 'r') as datafile:
            data = HFData(
                fname=datafile,
                cols=(2, 3, 4, 6, 5, 7, 8),
                converters=converters,
                delimiter=",",
                skip_header=4)
    return plot_hfdata(data)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def extract_zip_file(zipFilePath, extractDir):
    """Extract every member of a zip archive, printing progress.

    For each member a line with the cumulative percentage of uncompressed
    bytes handled so far and the member name is written to stdout.

    Args:
        zipFilePath: Path of the archive to extract.
        extractDir: Destination directory; created (with parents) when it
            does not exist.
    """
    if not os.path.exists(extractDir):
        os.makedirs(extractDir)
    # sys.stdout.write behaves the same on Python 2 and Python 3, unlike
    # the print statement.
    sys.stdout.write('''Extracting
    %s 
to
    %s...\n\n''' % (zipFilePath, extractDir))

    zfile = zipfile.ZipFile(zipFilePath)
    try:
        members = zfile.infolist()
        uncompress_size = sum(member.file_size for member in members)
        extracted_size = 0

        for member in members:
            extracted_size += member.file_size
            # An archive holding only empty files has a total size of 0;
            # report 100% instead of dividing by zero.
            if uncompress_size:
                percent = extracted_size * 100 // uncompress_size
            else:
                percent = 100
            sys.stdout.write("    %s%%\t\t%s\n" % (percent, member.filename))
            zfile.extract(member, path=extractDir)
    finally:
        # Release the archive handle even when extraction fails.
        zfile.close()
    sys.stdout.write('Ok\n')
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def get_client_version(my):
        '''API Function: get_client_version()

        Reads the version from the VERSION member of the enclosing zip
        archive when this module is loaded from one, otherwise from the
        VERSION file next to this module.

        @return:
            string - Version of TACTIC that this client came from'''

        # may use pkg_resources in 2.6
        if '.zip' in __file__:
            import zipfile
            zip_name = '%s.zip' % __file__.split('.zip')[0]
            if zipfile.is_zipfile(zip_name):
                z = zipfile.ZipFile(zip_name)
                try:
                    return z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION').strip()
                finally:
                    z.close()

        # Reached when the module is not inside a zip, and also when the
        # path merely contains ".zip" without being an archive; in both
        # cases the VERSION file beside the module is used.
        version_path = os.path.join(os.path.dirname(__file__), 'VERSION')
        f = open(version_path, 'r')
        try:
            return f.readline().strip()
        finally:
            f.close()
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def get_client_api_version(my):
        '''API Function: get_client_api_version()

        Reads the version from the VERSION_API member of the enclosing zip
        archive when this module is loaded from one, otherwise from the
        VERSION_API file next to this module.

        @return:
            string - client api version'''

        # may use pkg_resources in 2.6
        if '.zip' in __file__:
            import zipfile
            zip_name = '%s.zip' % __file__.split('.zip')[0]
            if zipfile.is_zipfile(zip_name):
                z = zipfile.ZipFile(zip_name)
                try:
                    return z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION_API').strip()
                finally:
                    z.close()

        # Reached when the module is not inside a zip, and also when the
        # path merely contains ".zip" without being an archive; in both
        # cases the VERSION_API file beside the module is used.
        version_path = os.path.join(os.path.dirname(__file__), 'VERSION_API')
        f = open(version_path, 'r')
        try:
            return f.readline().strip()
        finally:
            f.close()
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_export_result_csv_no_tasks_returns_empty_file(self):
        """Test WEB export Result to CSV returns empty file if no results in
        project."""
        project = ProjectFactory.create(short_name='no_tasks_here')
        uri = "/project/%s/tasks/export?type=result&format=csv" % project.short_name
        res = self.app.get(uri, follow_redirects=True)
        archive = zipfile.ZipFile(StringIO(res.data))
        extracted_filename = archive.namelist()[0]

        csv_content = StringIO(archive.read(extracted_filename))
        rows = list(unicode_csv_reader(csv_content))

        # Assert on the collected rows directly. A flag assigned as
        # ``False, line`` would be a non-empty tuple, which is always
        # truthy, so such a check could never fail.
        assert not rows, rows
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if not filename.endswith('.whl'):
                    continue
                whl = ZipFile(os.path.join(dirname, filename))
                # Close every wheel, even when validation raises.
                try:
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
                finally:
                    whl.close()
    assert valid > 0, "No metadata.json found"
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def test_zipfile_attributes():
    # Members are added with ZipFile.writestr() rather than .write(), so
    # their attributes have to be set manually; check that they were.
    with temporary_directory() as tempdir:
        expected_modes = (('foo', 0o644), ('bar', 0o755))
        for name, mode in expected_modes:
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
            os.chmod(target, mode)

        archive_path = wheel.archive.make_wheelfile_inner(
            os.path.join(tempdir, 'dummy'), tempdir)

        with readable_zipfile(archive_path) as zf:
            for name, mode in expected_modes:
                member = zf.getinfo(os.path.join(tempdir, name))
                assert member.external_attr == (mode | 0o100000) << 16
                assert member.compress_type == zipfile.ZIP_DEFLATED
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def init_zoneinfo():
    """
    Add each zone info to the datastore. This will overwrite existing zones.

    Every member of the ``zoneinfo.zip`` that sits next to this module is
    stored as a ``Zoneinfo`` entity keyed by the member's file name.

    This must be called before the AppengineTimezoneLoader will work.
    """
    import os
    import logging
    from zipfile import ZipFile

    module_dir = os.path.dirname(__file__)
    zoneinfo_path = os.path.abspath(os.path.join(module_dir, 'zoneinfo.zip'))

    with ZipFile(zoneinfo_path) as zf:
        zoneobjs = [
            Zoneinfo(
                key=ndb.Key('Zoneinfo', zfi.filename, namespace=NDB_NAMESPACE),
                data=zf.read(zfi))
            for zfi in zf.filelist
        ]

    logging.info("Adding %d timezones to the pytz-appengine database" %
        len(zoneobjs)
        )

    ndb.put_multi(zoneobjs)
项目:gns3-documentation-template    作者:GNS3    | 项目源码 | 文件源码
def get_appliances():
    """
    Download an archive of all GNS3 appliances and load them into a
    dictionary.

    Every ``.gns3a`` member of the registry archive is parsed as JSON.
    Appliances whose ``status`` is ``"broken"`` are left out.

    Returns a dict mapping the appliance id (the member's base name up to
    its first dot) to the parsed appliance.
    """
    appliances = {}

    url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
    # Both the HTTP response and the archive are closed once read.
    with urllib.request.urlopen(url) as response:
        payload = io.BytesIO(response.read())
    with zipfile.ZipFile(payload) as z:
        for path in z.namelist():
            if not path.endswith('.gns3a'):
                continue
            with z.open(path, 'r') as f:
                appliance = json.loads(f.read().decode())
            if appliance["status"] != "broken":
                appliance_id = os.path.basename(path).split('.')[0]
                appliances[appliance_id] = appliance
    return appliances
项目:gns3-documentation-template    作者:GNS3    | 项目源码 | 文件源码
def get_appliances():
    """
    Download an archive of all GNS3 appliances and load them into a
    dictionary.

    Every ``.gns3a`` member of the registry archive is parsed as JSON.
    Appliances whose ``status`` is ``"broken"`` are left out.

    Returns a dict mapping the appliance id (the member's base name up to
    its first dot) to the parsed appliance.
    """
    appliances = {}

    url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
    # Both the HTTP response and the archive are closed once read.
    with urllib.request.urlopen(url) as response:
        payload = io.BytesIO(response.read())
    with zipfile.ZipFile(payload) as z:
        for path in z.namelist():
            if not path.endswith('.gns3a'):
                continue
            with z.open(path, 'r') as f:
                appliance = json.loads(f.read().decode())
            if appliance["status"] != "broken":
                appliance_id = os.path.basename(path).split('.')[0]
                appliances[appliance_id] = appliance
    return appliances
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Read through a context manager so the handle is closed promptly.
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if not filename.endswith('.whl'):
                    continue
                whl = ZipFile(os.path.join(dirname, filename))
                # Close every wheel, even when validation raises.
                try:
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
                finally:
                    whl.close()
    assert valid > 0, "No metadata.json found"
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def create_zipfile(self, filename):
        """Write every file found under ``self.target_dir`` into zip `filename`.

        Members are stored under their path relative to ``self.target_dir``.
        Raises DistutilsOptionError when the top level of the target
        directory contains no files. The archive is closed on every exit
        path.
        """
        archive = zipfile.ZipFile(filename, "w")
        try:
            self.mkpath(self.target_dir)  # just in case
            prefix_len = len(self.target_dir)
            for root, dirs, files in os.walk(self.target_dir):
                if not files and root == self.target_dir:
                    raise DistutilsOptionError(
                        "no files found in upload directory '%s'"
                        % self.target_dir)
                # Directory part of the member name, relative to target_dir.
                rel_dir = root[prefix_len:].lstrip(os.path.sep)
                for name in files:
                    archive.write(os.path.join(root, name),
                                  os.path.join(rel_dir, name))
        finally:
            archive.close()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def extract_zipfile(archive_name, destpath):
    """Unpack a zip file.

    Extracts every member of the archive at `archive_name` into the
    directory `destpath`. The archive is closed once extraction finishes,
    including when extraction raises.
    """
    with zipfile.ZipFile(archive_name) as archive:
        archive.extractall(destpath)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def get_zip_class():
    """
    Supplement ZipFile class to support context manager for Python 2.6

    Returns ``zipfile.ZipFile`` itself when it already defines ``__exit__``;
    otherwise returns a subclass that adds ``__enter__``/``__exit__`` and
    closes the archive on leaving the ``with`` block.
    """
    class ContextualZipFile(zipfile.ZipFile):
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, tb):
            # Must be a call: a bare ``self.close`` only looks the method up
            # and leaves the archive open.
            self.close()

    if hasattr(zipfile.ZipFile, '__exit__'):
        return zipfile.ZipFile
    return ContextualZipFile
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def download_glove(glove):
    """Download the GloVe 42B/300d archive and extract its text file.

    Nothing happens when the path `glove` already exists. Otherwise the zip
    is streamed into an anonymous temporary file and the member
    ``glove.42B.300d.txt`` is extracted into the directory part of `glove`.
    """
    if not os.path.exists(glove):
        print('Downloading glove...')
        destination_dir = os.path.dirname(glove)
        with tempfile.TemporaryFile() as spool:
            # Stream the download to disk rather than holding it in memory.
            with urllib.request.urlopen('http://nlp.stanford.edu/data/glove.42B.300d.zip') as response:
                shutil.copyfileobj(response, spool)
            with zipfile.ZipFile(spool, 'r') as archive:
                archive.extract('glove.42B.300d.txt', path=destination_dir)
        print('Done')
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _unpack_zipfile(filename, extract_dir):
    """Extract the members of zip archive `filename` beneath `extract_dir`.

    Raises ReadError when the zipfile module cannot be imported or when
    `filename` is not a zip file. Members whose name starts with '/' or
    contains '..' are skipped.
    """
    try:
        import zipfile
    except ImportError:
        raise ReadError('zlib not supported, cannot unpack this archive.')

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    archive = zipfile.ZipFile(filename)
    try:
        for member in archive.infolist():
            member_name = member.filename

            # Skip absolute paths and anything containing '..'.
            if member_name.startswith('/') or '..' in member_name:
                continue

            target = os.path.join(extract_dir, *member_name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            if member_name.endswith('/'):
                # Directory entry: there is no payload to write.
                continue

            payload = archive.read(member.filename)
            with open(target, 'wb') as out:
                out.write(payload)
    finally:
        archive.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __new__(cls, *args, **kwargs):
        """
        Build a plain ZipFile when it already supports ``with``; otherwise
        allocate a ContextualZipFile instance.
        """
        if not hasattr(zipfile.ZipFile, '__exit__'):
            return super(ContextualZipFile, cls).__new__(cls)
        return zipfile.ZipFile(*args, **kwargs)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Extract the zip archive at path `filename` into the directory `location`.

    Files are written with system defaults and the current umask, so stored
    permissions are not preserved. The one exception is a regular-file
    member carrying any execute bit (user, group, or world): it gets the
    equivalent of "chmod +x" after being written. On Windows, execute
    changes made through os.chmod are no-ops per the Python docs.

    When `flatten` is true and has_leading_dir() reports a leading directory
    for the member names, split_leading_dir() is applied to each name before
    it is joined to `location`.
    """
    ensure_dir(location)
    with open(filename, 'rb') as archive_fp:
        archive = zipfile.ZipFile(archive_fp, allowZip64=True)
        strip_leading = has_leading_dir(archive.namelist()) and flatten
        for member in archive.infolist():
            member_name = member.filename
            payload = archive.read(member_name)
            if strip_leading:
                rel_path = split_leading_dir(member_name)[1]
            else:
                rel_path = member_name
            dest = os.path.join(location, rel_path)
            if dest.endswith(('/', '\\')):
                # A directory
                ensure_dir(dest)
                continue
            ensure_dir(os.path.dirname(dest))
            out = open(dest, 'wb')
            try:
                out.write(payload)
            finally:
                out.close()
                perms = member.external_attr >> 16
                # Only regular files that carry an execute bit for
                # user/group/world are touched.
                if perms and stat.S_ISREG(perms) and perms & 0o111:
                    # chmod +x for user/group/world; a no-op on Windows
                    # per the Python docs.
                    os.chmod(dest, (0o777 - current_umask() | 0o111))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def readable_zipfile(path):
    """Yield a ZipFile opened for reading on `path`; close it afterwards.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    not visible in this excerpt -- confirm at the definition site.
    """
    # zipfile.ZipFile() isn't a context manager under Python 2.
    archive = zipfile.ZipFile(path, 'r')
    try:
        yield archive
    finally:
        archive.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_verifying_zipfile():
    """Exercise VerifyingZipFile hash checking in default and strict mode."""
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = StringIO()
    zf = zipfile.ZipFile(sio, 'w')
    for member, payload in (("one", b"first file"),
                            ("two", b"second file"),
                            ("three", b"third file")):
        zf.writestr(member, payload)
    zf.close()

    def expect_bad_wheel(member):
        # Reading `member` must fail verification with BadWheelFile.
        try:
            vzf.open(member).read()
        except wheel.install.BadWheelFile:
            pass
        else:
            raise Exception("expected exception 'BadWheelFile()'")

    # Default mode: only files registered through set_expected_hash() are
    # verified when read; unregistered files are read without a check.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    expect_bad_wheel("three")

    # Strict mode: every file that is read must have been registered
    # through set_expected_hash().
    vzf.strict = True
    expect_bad_wheel("two")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def open(self, name_or_info, mode="r", pwd=None):
        """Return file-like object for 'name'.

        When a non-None expected hash is registered for the member, the
        returned object's ``_update_crc`` hook is wrapped so that every chunk
        read also feeds a running hash. Once the end of the member is
        reached, a digest that differs from the expected one raises
        BadWheelFile. In strict mode, opening a member that has no entry in
        the expected hashes raises BadWheelFile immediately.

        If the underlying file object has no ``_update_crc`` attribute, a
        warning is issued and the object is returned unverified.
        """
        # A non-monkey-patched version would contain most of zipfile.py
        ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
        if isinstance(name_or_info, zipfile.ZipInfo):
            name = name_or_info.filename
        else:
            name = name_or_info
        # None means either "no entry" or "entry explicitly exempt from
        # verification"; the strict check below tells the two apart.
        expected_hash = self._expected_hashes.get(name)
        if expected_hash is not None:
            try:
                _update_crc_orig = ef._update_crc
            except AttributeError:
                warnings.warn('Need ZipExtFile._update_crc to implement '
                              'file hash verification (in Python >= 2.7)')
                return ef
            running_hash = self._hash_algorithm()
            if hasattr(ef, '_eof'):  # py33
                # End of data is signalled through the ``_eof`` attribute.
                def _update_crc(data):
                    _update_crc_orig(data)
                    running_hash.update(data)
                    if ef._eof and running_hash.digest() != expected_hash:
                        raise BadWheelFile("Bad hash for file %r" % ef.name)
            else:
                # Older signature: end of data arrives as an ``eof`` argument.
                def _update_crc(data, eof=None):
                    _update_crc_orig(data, eof=eof)
                    running_hash.update(data)
                    if eof and running_hash.digest() != expected_hash:
                        raise BadWheelFile("Bad hash for file %r" % ef.name)
            ef._update_crc = _update_crc
        elif self.strict and name not in self._expected_hashes:
            raise BadWheelFile("No expected hash for file %r" % ef.name)
        return ef