Python zlib module: compress() code examples

The following 50 code examples, extracted from open-source Python projects, illustrate how to use zlib.compress().
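Before the project excerpts, here is a minimal, self-contained sketch (not taken from any of the projects below) showing the basic compress/decompress round trip and the optional compression level:

import zlib

payload = b"example payload " * 64                 # zlib.compress() takes bytes, not str
packed = zlib.compress(payload, 6)                 # level 0-9; the default (-1) maps to 6
assert zlib.decompress(packed) == payload          # the round trip is lossless
print(len(payload), "->", len(packed))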

Project: openstack-deploy    Author: yaoice    | project source | file source
def copy_from_host(module):
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as f:
        raw_data = f.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    data = zlib.compress(raw_data) if compress else raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
                     source=src)
Project: openstack-deploy    Author: yaoice    | project source | file source
def main():
    argument_spec = dict(
        compress=dict(default=True, type='bool'),
        dest=dict(type='str'),
        mode=dict(default='0644', type='str'),
        sha1=dict(default=None, type='str'),
        src=dict(required=True, type='str')
    )
    module = AnsibleModule(argument_spec)

    dest = module.params.get('dest')

    try:
        if dest:
            copy_to_host(module)
        else:
            copy_from_host(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
Project: os-xenapi    Author: openstack    | project source | file source
def get_console_log(session, arg_dict):
    try:
        raw_dom_id = arg_dict['dom_id']
    except KeyError:
        raise dom0_pluginlib.PluginError("Missing dom_id")
    try:
        dom_id = int(raw_dom_id)
    except ValueError:
        raise dom0_pluginlib.PluginError("Invalid dom_id")

    logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb')
    try:
        try:
            log_content = _last_bytes(logfile)
        except IOError, e:  # noqa
            msg = "Error reading console: %s" % e
            logging.debug(msg)
            raise dom0_pluginlib.PluginError(msg)
    finally:
        logfile.close()

    return base64.b64encode(zlib.compress(log_content))
Project: Tinychat-Bot--Discontinued    Author: Tinychat    | project source | file source
def test_compressed(self):
        """
        ByteArrays can be compressed. Test the C{compressed} attribute for
        validity.
        """
        try:
            import zlib
        except ImportError:
            self.skipTest('zlib is missing')

        ba = amf3.ByteArray()

        self.assertFalse(ba.compressed)

        z = zlib.compress('b' * 100)
        ba = amf3.ByteArray(z)

        self.assertTrue(ba.compressed)

        z = zlib.compress('\x00' * 100)
        ba = amf3.ByteArray(z)

        self.assertTrue(ba.compressed)
Project: CodingDojo    Author: ComputerSocietyUNB    | project source | file source
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, -1))
                f.write(zlib.compress(pickle.dumps(value), -1))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            if not renamed:
                os.remove(tmp_path)
Project: NarshaTech    Author: KimJangHyeon    | project source | file source
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL))
                f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            if not renamed:
                os.remove(tmp_path)
Project: kolla-kubernetes-personal    Author: rthallisey    | project source | file source
def copy_from_host(module):
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as f:
        raw_data = f.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    data = zlib.compress(raw_data) if compress else raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
                     source=src)
Project: kolla-kubernetes-personal    Author: rthallisey    | project source | file source
def main():
    argument_spec = dict(
        compress=dict(default=True, type='bool'),
        dest=dict(type='str'),
        mode=dict(default='0644', type='str'),
        sha1=dict(default=None, type='str'),
        src=dict(required=True, type='str')
    )
    module = AnsibleModule(argument_spec)

    dest = module.params.get('dest')

    try:
        if dest:
            copy_to_host(module)
        else:
            copy_from_host(module)
    except Exception as e:
        module.exit_json(failed=True, changed=True, msg=repr(e))


# import module snippets
Project: django-lrucache-backend    Author: kogan    | project source | file source
def test_binary_string(self):
        # Binary strings should be cacheable
        cache = self.cache
        from zlib import compress, decompress
        value = 'value_to_be_compressed'
        compressed_value = compress(value.encode())

        # Test set
        cache.set('binary1', compressed_value)
        compressed_result = cache.get('binary1')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test add
        cache.add('binary1-add', compressed_value)
        compressed_result = cache.get('binary1-add')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test set_many
        cache.set_many({'binary1-set_many': compressed_value})
        compressed_result = cache.get('binary1-set_many')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())
Project: CoBL-public    Author: lingdb    | project source | file source
def compressedField(field):
    # Decorator for compressed fields:

    def fget(self):
        data = getattr(self, field)
        if data is None:
            return None
        return zlib.decompress(data)

    def fset(self, value):
        setattr(self, field, zlib.compress(value.encode()))

    def fdel(self):
        delattr(self, field)
    return {'doc': "The compression property for %s." % field,
            'fget': fget,
            'fset': fset,
            'fdel': fdel}
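The dict returned by compressedField() is meant to be unpacked into property(). A hypothetical usage sketch, reusing compressedField from the excerpt above (the class and attribute names here are invented for illustration):

import zlib

class Lexeme(object):                      # hypothetical model class
    data_blob = None                       # assumed attribute holding the compressed bytes
    data = property(**compressedField('data_blob'))

lex = Lexeme()
lex.data = 'some text'                     # fset stores zlib-compressed bytes in data_blob
assert lex.data == b'some text'            # fget returns the decompressed bytes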
Project: Scrum    Author: prakharchoudhary    | project source | file source
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL))
                f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            if not renamed:
                os.remove(tmp_path)
Project: radar    Author: amoose136    | project source | file source
def _open(self, fps=12, loop=True, html=False, compress=False): 
            if not _swf:
                load_lib()

            self._arg_fps = int(fps)
            self._arg_loop = bool(loop)
            self._arg_html = bool(html)
            self._arg_compress = bool(compress)

            self._fp = self.request.get_file()
            self._framecounter = 0
            self._framesize = (100, 100)

            # For compress, we use an in-memory file object
            if self._arg_compress:
                self._fp_real = self._fp
                self._fp = BytesIO()
Project: OSPTF    Author: xSploited    | project source | file source
def get_encoded_library_string(arch):
    filepath=None
    if arch=="x86":
        filepath=os.path.join("resources","libraryx86.zip")
    elif arch=="x64":
        filepath=os.path.join("resources","libraryx64.zip")
    else:
        raise Exception("unknown arch %s"%arch)
    f = StringIO.StringIO()
    f.write(open(filepath, "rb").read())

    zip = zipfile.ZipFile(f)

    modules = dict([(z.filename, zip.open(z.filename,).read()) for z in zip.infolist() if os.path.splitext(z.filename)[1] in [".py",".pyd",".dll",".pyc",".pyo"]])

    return zlib.compress(marshal.dumps(modules),9)
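A hypothetical decoder for the blob produced above (not part of the OSPTF project) simply reverses the two steps, zlib first and then marshal:

import marshal
import zlib

def decode_library_string(blob):
    # Inverse of zlib.compress(marshal.dumps(modules), 9): yields the
    # {filename: file contents} dict packed by get_encoded_library_string().
    return marshal.loads(zlib.decompress(blob))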
Project: lan-ichat    Author: Forec    | project source | file source
def run(self):
        print("VEDIO client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("VEDIO client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx)
            data = pickle.dumps(sframe)
            zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION)
            try:
                self.sock.sendall(struct.pack("L", len(zdata)) + zdata)
            except:
                break
            for i in range(self.interval):
                self.cap.read()
Project: bpy_lambda    Author: bcongdon    | project source | file source
def _add_array_helper(self, data, array_type, prop_type):
        assert(isinstance(data, array.array))
        assert(data.typecode == array_type)

        length = len(data)

        if _IS_BIG_ENDIAN:
            data = data[:]
            data.byteswap()
        data = data.tobytes()

        # mimic behavior of fbxconverter (also common sense)
        # we could make this configurable.
        encoding = 0 if len(data) <= 128 else 1
        if encoding == 0:
            pass
        elif encoding == 1:
            data = zlib.compress(data, 1)

        comp_len = len(data)

        data = pack('<3I', length, encoding, comp_len) + data

        self.props_type.append(prop_type)
        self.props.append(data)
Project: django    Author: alexsukhrin    | project source | file source
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL))
                f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            if not renamed:
                os.remove(tmp_path)
Project: Auspex    Author: BBN-Q    | project source | file source
async def push(self, data):
        if hasattr(data, 'size'):
            self.points_taken += data.size
        else:
            try:
                self.points_taken += len(data)
            except:
                try:
                    junk = data + 1.0
                    self.points_taken += 1
                except:
                    raise ValueError("Got data {} that is neither an array nor a float".format(data))
        if self.compression == 'zlib':
            message = {"type": "data", "compression": "zlib", "data": zlib.compress(pickle.dumps(data, -1))}
        else:
            message = {"type": "data", "compression": "none", "data": data}

        # This can be replaced with some other serialization method
        # and also should support sending via zmq.
        await self.queue.put(message)
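On the receiving side, a consumer of this queue would branch on the "compression" field before unpickling. A rough, hypothetical sketch assuming the message layout used in push() above:

import pickle
import zlib

async def pull(queue):
    # Hypothetical consumer: undo zlib + pickle only for compressed messages.
    message = await queue.get()
    if message["compression"] == "zlib":
        return pickle.loads(zlib.decompress(message["data"]))
    return message["data"]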
Project: kolla-ansible    Author: openstack    | project source | file source
def copy_from_host(module):
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as f:
        raw_data = f.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    data = zlib.compress(raw_data) if compress else raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
                     source=src)
Project: kolla-ansible    Author: openstack    | project source | file source
def main():
    argument_spec = dict(
        compress=dict(default=True, type='bool'),
        dest=dict(type='str'),
        mode=dict(default='0644', type='str'),
        sha1=dict(default=None, type='str'),
        src=dict(required=True, type='str')
    )
    module = AnsibleModule(argument_spec)

    dest = module.params.get('dest')

    try:
        if dest:
            copy_to_host(module)
        else:
            copy_from_host(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
Project: mitogen    Author: dw    | project source | file source
def __init__(self, context, core_src):
        self._context = context
        self._present = {'mitogen': [
            'mitogen.ansible',
            'mitogen.compat',
            'mitogen.compat.pkgutil',
            'mitogen.fakessh',
            'mitogen.master',
            'mitogen.ssh',
            'mitogen.sudo',
            'mitogen.utils',
        ]}
        self.tls = threading.local()
        self._cache = {}
        if core_src:
            self._cache['mitogen.core'] = (
                None,
                'mitogen/core.py',
                zlib.compress(core_src),
            )
Project: codemap    Author: c0demap    | project source | file source
def SaveModuleBP():
    global codemap
    try:
        modname = AskStr('', 'module name : ')
        bpo = ''
        for e in Functions():
            func = e.startEA
            length = e.endEA - e.startEA
            if length < codemap.func_min_size:
                continue
            offset = func - get_imagebase()
            bpo += str(offset) + '\n'
        print 'bp offset generation complete! ' + str(len(bpo))
        payload = bpo
        with open(codemap.homedir + modname + '.bpo', 'wb') as f:
            f.write(zlib.compress(payload))
    except:
        traceback.print_exc(file=sys.stdout)
Project: contrail-ansible    Author: Juniper    | project source | file source
def copy_from_host(module):
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as f:
        raw_data = f.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    data = zlib.compress(raw_data) if compress else raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
                     source=src)
Project: contrail-ansible    Author: Juniper    | project source | file source
def main():
    argument_spec = dict(
        compress=dict(default=True, type='bool'),
        dest=dict(type='str'),
        mode=dict(default='0644', type='str'),
        sha1=dict(default=None, type='str'),
        src=dict(required=True, type='str')
    )
    module = AnsibleModule(argument_spec)

    dest = module.params.get('dest')

    try:
        if dest:
            copy_to_host(module)
        else:
            copy_from_host(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
Project: pytoshop    Author: mdboom    | project source | file source
def compress_zip(fd,      # type: BinaryIO
                 image,   # type: np.ndarray
                 depth,   # type: int
                 version  # type: int
                 ):       # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) compressed stream.

{}
    """
    image = normalize_image(image, depth)
    if util.needs_byteswap(image):
        compressor = zlib.compressobj()
        for row in image:
            row = util.do_byteswap(row)
            fd.write(compressor.compress(row))
        fd.write(compressor.flush())
    else:
        fd.write(zlib.compress(image))
Project: pytoshop    Author: mdboom    | project source | file source
def compress_constant_zip(fd,      # type: BinaryIO
                          value,   # type: int
                          width,   # type: int
                          rows,    # type: int
                          depth,   # type: int
                          version  # type: int
                          ):       # type: (...) -> None
    """
    Write a virtual image containing a constant to a zip compressed
    stream.

{}
    """
    if depth == 1:
        image = _make_onebit_constant(value, width, rows)
        compress_zip(fd, image, depth, version)
    else:
        row = _make_constant_row(value, width, depth)
        row = row.tobytes()
        fd.write(zlib.compress(row * rows))
Project: Stitch    Author: nathanlopez    | project source | file source
def to_png(self, data, output):
        ''' Dump data to the image file. Data is bytes(RGBRGB...RGB).
            Pure python PNG implementation.
            http://inaps.org/journal/comment-fonctionne-le-png
        '''

        p__ = pack
        line = self.width * 3
        png_filter = p__('>B', 0)
        scanlines = b''.join(
            [png_filter + data[y * line:y * line + line]
             for y in range(self.height)])

        magic = p__('>8B', 137, 80, 78, 71, 13, 10, 26, 10)

        # Header: size, marker, data, CRC32
        ihdr = [b'', b'IHDR', b'', b'']
        ihdr[2] = p__('>2I5B', self.width, self.height, 8, 2, 0, 0, 0)
        ihdr[3] = p__('>I', crc32(b''.join(ihdr[1:3])) & 0xffffffff)
        ihdr[0] = p__('>I', len(ihdr[2]))

        # Data: size, marker, data, CRC32
        idat = [b'', b'IDAT', compress(scanlines), b'']
        idat[3] = p__('>I', crc32(b''.join(idat[1:3])) & 0xffffffff)
        idat[0] = p__('>I', len(idat[2]))

        # Footer: size, marker, None, CRC32
        iend = [b'', b'IEND', b'', b'']
        iend[3] = p__('>I', crc32(iend[1]) & 0xffffffff)
        iend[0] = p__('>I', len(iend[2]))

        with open(output, 'wb') as fileh:
            fileh.write(magic)
            fileh.write(b''.join(ihdr))
            fileh.write(b''.join(idat))
            fileh.write(b''.join(iend))
            return

        err = 'Error writing data to "{0}".'.format(output)
        raise ScreenshotError(err)
Project: ekko    Author: openstack    | project source | file source
def compress(data):
        return zlib.compress(data)
Project: openstack-deploy    Author: yaoice    | project source | file source
def read_file(filename):
    filename_path = os.path.join('/etc/ceph', filename)

    if not os.path.exists(filename_path):
        json_exit("file not found: {}".format(filename_path), failed=True)
    if not os.access(filename_path, os.R_OK):
        json_exit("file not readable: {}".format(filename_path), failed=True)

    with open(filename_path, 'rb') as f:
        raw_data = f.read()

    return {'content': base64.b64encode(zlib.compress(raw_data)),
            'sha1': hashlib.sha1(raw_data).hexdigest(),
            'filename': filename}
Project: openstack-deploy    Author: yaoice    | project source | file source
def copy_to_host(module):
    compress = module.params.get('compress')
    dest = module.params.get('dest')
    mode = int(module.params.get('mode'), 0)
    sha1 = module.params.get('sha1')
    src = module.params.get('src')

    data = base64.b64decode(src)
    raw_data = zlib.decompress(data) if compress else data

    if sha1:
        if os.path.exists(dest):
            if os.access(dest, os.R_OK):
                with open(dest, 'rb') as f:
                    if hashlib.sha1(f.read()).hexdigest() == sha1:
                        module.exit_json(changed=False)
            else:
                module.exit_json(failed=True, changed=False,
                                 msg='file is not accessible: {}'.format(dest))

        if sha1 != hashlib.sha1(raw_data).hexdigest():
            module.exit_json(failed=True, changed=False,
                             msg='sha1 sum does not match data')

    with os.fdopen(os.open(dest, os.O_WRONLY | os.O_CREAT, mode), 'wb') as f:
        f.write(raw_data)

    module.exit_json(changed=True)
Project: cbapi-python    Author: carbonblack    | project source | file source
def dumps(self, response, body=None):

        if body is None:
            body = response.read(decode_content=False)

            # NOTE: 99% sure this is dead code. I'm only leaving it
            #       here b/c I don't have a test yet to prove
            #       it. Basically, before using
            #       `cachecontrol.filewrapper.CallbackFileWrapper`,
            #       this made an effort to reset the file handle. The
            #       `CallbackFileWrapper` short circuits this code by
            #       setting the body as the content is consumed, the
            #       result being a `body` argument is *always* passed
            #       into cache_response, and in turn,
            #       `Serializer.dump`.
            response._fp = io.BytesIO(body)

        data = {
            "response": {
                "body": _b64_encode_bytes(body),
                "headers": dict(
                    (_b64_encode(k), _b64_encode(v))
                    for k, v in response.headers.items()
                ),
                "status": response.status,
                "version": response.version,
                "reason": _b64_encode_str(response.reason),
                "strict": response.strict,
                "decode_content": response.decode_content,
            },
        }

        return zlib.compress(
                json.dumps(
                    data, separators=(",", ":"), sort_keys=True,
                ).encode("utf8"),
                )
Project: whatstyle    Author: mikr    | project source | file source
def set_internal(self, key, value):
        # type: (str, bytes) -> None
        value = zlib.compress(value, self.ZLIB_COMPRESSION_LEVEL)
        self.kvstore.put(key, value)
Project: cloud-volume    Author: seung-lab    | project source | file source
def encode_npz(subvol):
    """
    This file format is unrelated to np.savez
    We are just saving as .npy and the compressing
    using zlib. 
    The .npy format contains metadata indicating
    shape and dtype, instead of np.tobytes which doesn't
    contain any metadata.
    """
    fileobj = io.BytesIO()
    if len(subvol.shape) == 3:
        subvol = np.expand_dims(subvol, 0)
    np.save(fileobj, subvol)
    cdz = zlib.compress(fileobj.getvalue())
    return cdz
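The matching decoder is not shown in this excerpt; a hypothetical decode_npz() that inverts the steps above would look roughly like this:

import io
import zlib
import numpy as np

def decode_npz(data):
    # Hypothetical inverse of encode_npz(): zlib-decompress, then let np.load
    # recover shape and dtype from the embedded .npy header.
    return np.load(io.BytesIO(zlib.decompress(data)))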
Project: imagepaste    Author: robinchenyu    | project source | file source
def add_itxt(self, key, value, lang="", tkey="", zip=False):
        """Appends an iTXt chunk.

        :param key: latin-1 encodable text key name
        :param value: value for this key
        :param lang: language code
        :param tkey: UTF-8 version of the key name
        :param zip: compression flag

        """

        if not isinstance(key, bytes):
            key = key.encode("latin-1", "strict")
        if not isinstance(value, bytes):
            value = value.encode("utf-8", "strict")
        if not isinstance(lang, bytes):
            lang = lang.encode("utf-8", "strict")
        if not isinstance(tkey, bytes):
            tkey = tkey.encode("utf-8", "strict")

        if zip:
            self.add(b"iTXt", key + b"\0\x01\0" + lang + b"\0" + tkey + b"\0" +
                     zlib.compress(value))
        else:
            self.add(b"iTXt", key + b"\0\0\0" + lang + b"\0" + tkey + b"\0" +
                     value)
Project: imagepaste    Author: robinchenyu    | project source | file source
def add_text(self, key, value, zip=0):
        """Appends a text chunk.

        :param key: latin-1 encodable text key name
        :param value: value for this key, text or an
           :py:class:`PIL.PngImagePlugin.iTXt` instance
        :param zip: compression flag

        """
        if isinstance(value, iTXt):
            return self.add_itxt(key, value, value.lang, value.tkey, bool(zip))

        # The tEXt chunk stores latin-1 text
        if not isinstance(value, bytes):
            try:
                value = value.encode('latin-1', 'strict')
            except UnicodeError:
                return self.add_itxt(key, value, zip=bool(zip))

        if not isinstance(key, bytes):
            key = key.encode('latin-1', 'strict')

        if zip:
            self.add(b"zTXt", key + b"\0\0" + zlib.compress(value))
        else:
            self.add(b"tEXt", key + b"\0" + value)


# --------------------------------------------------------------------
# PNG image stream (IHDR/IEND)
Project: otRebuilder    Author: Pal3love    | project source | file source
def compress(data, level=ZLIB_COMPRESSION_LEVEL):
    """ Compress 'data' to Zlib format. If 'USE_ZOPFLI' variable is True,
    zopfli is used instead of the zlib module.
    The compression 'level' must be between 0 and 9. 1 gives best speed,
    9 gives best compression (0 gives no compression at all).
    The default value is a compromise between speed and compression (6).
    """
    if not (0 <= level <= 9):
        raise ValueError('Bad compression level: %s' % level)
    if not USE_ZOPFLI or level == 0:
        from zlib import compress
        return compress(data, level)
    else:
        from zopfli.zlib import compress
        return compress(data, numiterations=ZOPFLI_LEVELS[level])
Project: otRebuilder    Author: Pal3love    | project source | file source
def encodeData(self, data):
        self.origLength = len(data)
        if not self.uncompressed:
            compressedData = compress(data, self.zlibCompressionLevel)
        if self.uncompressed or len(compressedData) >= self.origLength:
            # Encode uncompressed
            rawData = data
            self.length = self.origLength
        else:
            rawData = compressedData
            self.length = len(rawData)
        return rawData
Project: oscars2016    Author: 0x0ece    | project source | file source
def picklechops(chops):
    """Pickles and base64encodes it's argument chops"""

    value = zlib.compress(dumps(chops))
    encoded = base64.encodestring(value)
    return encoded.strip()
Project: sc8pr    Author: dmaccarthy    | project source | file source
def surfaceData(srf, compress=zlib.compress):
    "Convert surface to bytes data with optional compression"
    if not isinstance(srf, pygame.Surface): srf = srf.image
    w, h = srf.get_size()
    a = hasAlpha(srf)
    mode = (1 if a else 0) + (2 if compress else 0)
    mode = struct.pack("!3I", mode, w, h)
    data = pygame.image.tostring(srf, "RGBA" if a else "RGB")
    return (compress(data) if compress else data), mode
Project: Projects    Author: it2school    | project source | file source
def add_itxt(self, key, value, lang="", tkey="", zip=False):
        """Appends an iTXt chunk.

        :param key: latin-1 encodable text key name
        :param value: value for this key
        :param lang: language code
        :param tkey: UTF-8 version of the key name
        :param zip: compression flag

        """

        if not isinstance(key, bytes):
            key = key.encode("latin-1", "strict")
        if not isinstance(value, bytes):
            value = value.encode("utf-8", "strict")
        if not isinstance(lang, bytes):
            lang = lang.encode("utf-8", "strict")
        if not isinstance(tkey, bytes):
            tkey = tkey.encode("utf-8", "strict")

        if zip:
            self.add(b"iTXt", key + b"\0\x01\0" + lang + b"\0" + tkey + b"\0" +
                     zlib.compress(value))
        else:
            self.add(b"iTXt", key + b"\0\0\0" + lang + b"\0" + tkey + b"\0" +
                     value)
Project: Projects    Author: it2school    | project source | file source
def add_text(self, key, value, zip=0):
        """Appends a text chunk.

        :param key: latin-1 encodable text key name
        :param value: value for this key, text or an
           :py:class:`PIL.PngImagePlugin.iTXt` instance
        :param zip: compression flag

        """
        if isinstance(value, iTXt):
            return self.add_itxt(key, value, value.lang, value.tkey, bool(zip))

        # The tEXt chunk stores latin-1 text
        if not isinstance(value, bytes):
            try:
                value = value.encode('latin-1', 'strict')
            except UnicodeError:
                return self.add_itxt(key, value, zip=bool(zip))

        if not isinstance(key, bytes):
            key = key.encode('latin-1', 'strict')

        if zip:
            self.add(b"zTXt", key + b"\0\0" + zlib.compress(value))
        else:
            self.add(b"tEXt", key + b"\0" + value)


# --------------------------------------------------------------------
# PNG image stream (IHDR/IEND)
Project: Projects    Author: it2school    | project source | file source
def testExtraPixels(self):
        """Test file that contains too many pixels."""

        def eachchunk(chunk):
            if chunk[0] != 'IDAT':
                return chunk
            data = zlib.decompress(chunk[1])
            data += strtobytes('\x00garbage')
            data = zlib.compress(data)
            chunk = (chunk[0], data)
            return chunk
        self.assertRaises(FormatError, self.helperFormat, eachchunk)
Project: Projects    Author: it2school    | project source | file source
def testNotEnoughPixels(self):
        def eachchunk(chunk):
            if chunk[0] != 'IDAT':
                return chunk
            # Remove last byte.
            data = zlib.decompress(chunk[1])
            data = data[:-1]
            data = zlib.compress(data)
            return (chunk[0], data)
        self.assertRaises(FormatError, self.helperFormat, eachchunk)
Project: Projects    Author: it2school    | project source | file source
def testBadFilter(self):
        def eachchunk(chunk):
            if chunk[0] != 'IDAT':
                return chunk
            data = zlib.decompress(chunk[1])
            # Corrupt the first filter byte
            data = strtobytes('\x99') + data[1:]
            data = zlib.compress(data)
            return (chunk[0], data)
        self.assertRaises(FormatError, self.helperFormat, eachchunk)
Project: Flask_Blog    Author: sugarguo    | project source | file source
def dump_payload(self, obj):
        json = super(URLSafeSerializerMixin, self).dump_payload(obj)
        is_compressed = False
        compressed = zlib.compress(json)
        if len(compressed) < (len(json) - 1):
            json = compressed
            is_compressed = True
        base64d = base64_encode(json)
        if is_compressed:
            base64d = b'.' + base64d
        return base64d
Project: code    Author: ActiveState    | project source | file source
def _save(path, obj):
        "Save an object to the specified path."
        data = zlib.compress(pickletools.optimize(pickle.dumps(obj)), 9)
        with open(path, 'wb') as file:
            file.write(data)
Project: code    Author: ActiveState    | project source | file source
def quit_game(event=None):
    # Save HST and quit program.
    file(HST_FILE, 'wb').write(zlib.compress(repr(HS_database), 9))
    root.quit()

################################################################################

# HST PREPARATION FUNCTIONS
Project: code    Author: ActiveState    | project source | file source
def asciiCompress(data, level=9):
    """ compress data to printable ascii-code """

    code = zlib.compress(data,level)
    csum = zlib.crc32(code)
    code = base64.encodestring(code)
    return code, csum
Project: code    Author: ActiveState    | project source | file source
def bz2_pack(source):
    "Returns 'source' as a bzip2-compressed, self-extracting python script."
    import bz2, base64
    out = ""
    compressed_source = bz2.compress(source)
    out += 'import bz2, base64\n'
    out += "exec bz2.decompress(base64.b64decode('"
    out += base64.b64encode((compressed_source))
    out += "'))\n"
    return out
Project: code    Author: ActiveState    | project source | file source
def gz_pack(source):
    "Returns 'source' as a gzip-compressed, self-extracting python script."
    import zlib, base64
    out = ""
    compressed_source = zlib.compress(source)
    out += 'import zlib, base64\n'
    out += "exec zlib.decompress(base64.b64decode('"
    out += base64.b64encode((compressed_source))
    out += "'))\n"
    return out

# The test.+() functions below are for testing pyminifer...
Project: code    Author: ActiveState    | project source | file source
def load(cls, path):
        # Loads programs and handles optimized files.
        ws = path + '.ws'
        cp = path + '.wso'
        compiled = False
        if os.path.isfile(cp):
            compiled = True
            if os.path.isfile(ws):
                if os.path.getmtime(ws) > os.path.getmtime(cp):
                    compiled = False
        final = cls._final()
        cls._check(final)
        if compiled:
            try:
                with open(cp, 'rb') as file:
                    code = file.read(len(final))
                    cls._check(code)
                    data = file.read()
                return cls(pickle.loads(zlib.decompress(data)))
            except:
                pass
        data = load(ws)
        code = trinary(data)
        program = parse(code)
        serialized = pickle.dumps(program, pickle.HIGHEST_PROTOCOL)
        optimized = zlib.compress(serialized, 9)
        with open(cp, 'wb') as file:
            file.write(final + optimized)
        return cls(program)