Python zlib 模块,compressobj() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zlib.compressobj()

项目:pytoshop    作者:mdboom    | 项目源码 | 文件源码
def compress_zip(fd,      # type: BinaryIO
                 image,   # type: np.ndarray
                 depth,   # type: int
                 version  # type: int
                 ):       # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) compressed stream.

{}
    """
    pixels = normalize_image(image, depth)

    if not util.needs_byteswap(pixels):
        # No byte-order fix-up required: compress the whole array in one call.
        fd.write(zlib.compress(pixels))
        return

    # Byte-swap one row at a time and feed a single streaming compressor,
    # writing each compressed piece as soon as it is produced.
    stream = zlib.compressobj()
    for row in pixels:
        fd.write(stream.compress(util.do_byteswap(row)))
    fd.write(stream.flush())
项目:tools    作者:pwnaccelerator    | 项目源码 | 文件源码
def gzip_encode_add_padding(content):
    """Gzip-compress ``content`` followed by replayed blocks, filler and padding.

    The single gzip stream produced contains, in order:

    1. ``content`` itself;
    2. ``reorder(...)`` applied to the whole-chunk prefix of ``content``;
    3. chunks of ``"A"`` filler until ``WRAP_SIZE`` (minus the replayed data
       and ten chunks) has been reached;
    4. the last ten chunks of ``content`` (the original padding).

    Progress messages are printed along the way.

    :param content: byte string to compress.
    :return: the complete gzip stream as a byte string.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("* Compressing content...")
    # Floor division keeps the chunk count an int even under true division.
    num_chunks = len(content) // CHUNK_SIZE  # let's not care about remainders
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    pieces = [gzip_compress.compress(content)]

    replay = reorder(content[0:num_chunks * CHUNK_SIZE], arg_blocks)
    assert len(replay) % CHUNK_SIZE == 0
    num_chunks = len(replay) // CHUNK_SIZE  # update the blocks
    print("** Duplicating content (CBC attack)...")
    # duplicate cipher, should result in duplicate plaintext
    # (prefixed by some garbage)
    pieces.append(gzip_compress.compress(replay))

    filler = b"A" * CHUNK_SIZE
    comp_cnt = 0
    while comp_cnt < WRAP_SIZE - (num_chunks * CHUNK_SIZE + 10 * CHUNK_SIZE):
        pieces.append(gzip_compress.compress(filler))
        comp_cnt += CHUNK_SIZE

    print("** Copy original padding...")
    # copy valid PKCS7 padding
    pieces.append(gzip_compress.compress(
        content[len(content) - 10 * CHUNK_SIZE:len(content)]))
    pieces.append(gzip_compress.flush())
    print("*** Finished")
    return b"".join(pieces)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def compress (filename, input, output):
    """Write the data read from ``input`` to ``output`` as a gzip member.

    The header records ``filename`` and its modification time; the trailer
    holds the CRC-32 of the data read and the file size reported by
    ``os.stat(filename)``.
    """
    emit = output.write

    emit('\037\213\010')                # magic bytes + compression method
    emit(chr(FNAME))                    # flag byte

    statval = os.stat(filename)
    write32(output, statval[8])         # modification time
    emit('\002')                        # slowest compression alg.
    emit('\377')                        # OS (=unknown)
    emit(filename+'\000')               # original filename, NUL-terminated

    checksum = zlib.crc32("")
    # Negative wbits: raw deflate data, the gzip framing is written by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)
    for chunk in iter(lambda: input.read(1024), ""):
        checksum = zlib.crc32(chunk, checksum)
        emit(deflater.compress(chunk))
    emit(deflater.flush())

    write32(output, checksum)           # the CRC
    write32(output, statval[6])         # and the file size
项目:ted-editor    作者:tarnheld    | 项目源码 | 文件源码
def deflate(data, compresslevel=9):
    """Raw-deflate ``data``, prepend the 8-byte header and return it xor-ed.

    The header is the 4-byte magic followed by the negated length of the
    deflated payload, one byte per position produced by ``byte(...)``.
    """
    # Compress (negative wbits: no zlib header or trailer)
    compressor = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    payload = compressor.compress(data) + compressor.flush()

    # Add PD's compression magic and negated file length (8 bytes)
    negated = int(-len(payload))
    header = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
    for position, shift in enumerate((0, 8, 0x10, 0x18), 4):
        header[position] = byte(negated >> shift, 0)

    return xor(bytes(header) + payload)

# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
项目:compression-identifier    作者:koutto    | 项目源码 | 文件源码
def compress_gzip(data):
    """Return ``data`` compressed as a gzip stream, or None on failure.

    Any exception raised while compressing is reported through
    ``print_error`` instead of being propagated.
    """
    encoder = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    try:
        return encoder.compress(data) + encoder.flush()
    except Exception as e:
        print_error('Error when compressing with Gzip: {0}'.format(e))
    return None

# def decompress_gzip(data):
#   decompressed=None
#   try:
#       decompressed = zlib.decompress(gzip_data, zlib.MAX_WBITS|16)
#   except:
#       pass
#   return decompressed
项目:sphinx-fakeinv    作者:dahlia    | 项目源码 | 文件源码
def print_inventory(file_, package, version, objects):
    """Write a Sphinx version-2 inventory for ``objects`` to ``file_``.

    ``file_`` is a binary stream. A plain-text header naming ``package`` and
    ``version`` is written first, then one zlib-compressed line per object.
    Each item of ``objects`` is a 4-tuple whose last element is ignored.
    """
    emit = file_.write

    emit(b'# Sphinx inventory version 2\n')
    emit(b'# Project: ')
    emit(package.encode('utf-8'))
    emit(b'\n')
    emit(b'# Version: ')
    emit(version.encode('utf-8'))
    emit(b'\n')
    emit(b'# The remainder of this file is compressed using zlib.\n')

    compressor = zlib.compressobj()
    for name, kind, mysterious_number, _ in objects:
        entry = '{0} {1} {2} . -\n'.format(name, kind, mysterious_number)
        emit(compressor.compress(entry.encode('utf-8')))
    emit(compressor.flush())
    file_.flush()
项目:3dstools    作者:ObsidianX    | 项目源码 | 文件源码
def compress_file(self):
        """Replace ``self.filename`` with a zlib-compressed copy of itself.

        The compressed data is streamed into ``<filename>.zlib``: first the
        original size as a big-endian unsigned 32-bit integer, then the zlib
        stream. That file is then renamed over the original.

        Both file handles are closed even when reading, compressing or
        writing fails. As before, ``self.file`` is left referring to the
        (now closed) source handle.
        """
        # stream-compress to another file then overwrite original
        compressed_filename = '%s.zlib' % self.filename
        compressor = zlib.compressobj(self.compression_level)

        self.file = open(self.filename, 'rb')
        try:
            with open(compressed_filename, 'wb') as compressed_file:
                compressed_file.write(
                    struct.pack('>I', os.stat(self.filename).st_size))

                data = self.file.read(READ_AMOUNT)
                while len(data) > 0:
                    compressed_file.write(compressor.compress(data))
                    data = self.file.read(READ_AMOUNT)

                compressed_file.write(compressor.flush(zlib.Z_FINISH))
        finally:
            self.file.close()

        # Only reached when the compressed copy was written completely.
        os.rename(compressed_filename, self.filename)
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def compress_readable_output(src_file, compress_level=6):
    """Yield the contents of ``src_file`` as a gzip stream, piece by piece.

    ``src_file`` is read in ``DEFAULT_BUFFER_SIZE`` blocks. The bytes from
    ``gzip_prefix()`` are prepended to the first piece yielded, and the last
    piece carries the flushed deflate data plus the 8-byte trailer
    (CRC-32 and size, both little-endian and reduced modulo 2**32).

    :param src_file: object with a ``read(size)`` method returning bytes.
    :param compress_level: zlib compression level.
    """
    crc = zlib.crc32(b"")
    size = 0
    # Negative wbits: raw deflate data; the gzip framing is added by hand.
    zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
                            zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
    prefix_written = False
    while True:
        data = src_file.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        size += len(data)
        crc = zlib.crc32(data, crc)
        data = zobj.compress(data)
        if not prefix_written:
            prefix_written = True
            data = gzip_prefix() + data
        yield data

    # Mask both fields to 32 bits: crc32 may be negative on Python 2, and an
    # unmasked size of 4 GiB or more would make struct.pack raise.
    tail = zobj.flush() + struct.pack(
        b"<LL", crc & 0xffffffff, size & 0xffffffff)
    if not prefix_written:
        # Empty input: the loop never ran, so the prefix still has to be
        # emitted or the output would not start with the gzip header.
        tail = gzip_prefix() + tail
    yield tail
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _newKeys(self):
        """
        Called back by a subclass once a I{MSG_NEWKEYS} message has been
        received.  Switches to the pending encryption settings, creates fresh
        zlib (de)compression objects for each direction that negotiated
        C{b'zlib'}, resets the key exchange state and finally sends every
        message that was queued while key exchange was in progress.
        """
        log.msg('NEW KEYS')
        self.currentEncryptions = self.nextEncryptions

        if self.outgoingCompressionType == b'zlib':
            self.outgoingCompression = zlib.compressobj(6)
        if self.incomingCompressionType == b'zlib':
            self.incomingCompression = zlib.decompressobj()

        self._keyExchangeState = self._KEY_EXCHANGE_NONE

        # Detach the queue before replaying it.
        queued, self._blockedByKeyExchange = self._blockedByKeyExchange, None
        for messageType, payload in queued:
            self.sendPacket(messageType, payload)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def add_compression_filter(self, encoding='deflate', *,
                               EOF_MARKER=EOF_MARKER, EOL_MARKER=EOL_MARKER):
        """Compress incoming stream with deflate or gzip encoding."""
        if encoding == 'gzip':
            window_bits = 16 + zlib.MAX_WBITS
        else:
            window_bits = -zlib.MAX_WBITS
        compressor = zlib.compressobj(wbits=window_bits)

        chunk = yield
        while True:
            if chunk is not EOF_MARKER:
                # Regular data: hand back the compressed bytes, then the
                # end-of-chunk marker, and wait for the next chunk.
                yield compressor.compress(chunk)
                chunk = yield EOL_MARKER
                continue

            # End of stream: emit whatever the compressor still buffers.
            yield compressor.flush()
            chunk = yield EOF_MARKER
项目:pytest-vts    作者:bhodorog    | 项目源码 | 文件源码
def test_recording_gzipped_responses_as_text(vts_rec_on, httpserver):
    """A gzip-encoded response body is stored in the cassette as plain text."""
    expected_text = "Hello!"
    # http://stackoverflow.com/a/22310760
    encoder = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    payload = encoder.compress(expected_text.encode()) + encoder.flush()
    httpserver.serve_content(
        payload, 200, headers={"Content-Encoding": "gzip"})

    url = "{}/".format(httpserver.url)
    resp = requests.get(url)

    assert resp.status_code == 200
    assert resp.text == expected_text
    assert len(vts_rec_on.cassette) == 1

    recorded = vts_rec_on.cassette[0]
    assert recorded['request']['url'] == url
    assert "Content-Encoding" in recorded['response']['headers']
    assert recorded['response']['body'] == expected_text


# enable pytester fixture which allows running pytests within tests
项目:aiohttp-tokio    作者:fafhrd91    | 项目源码 | 文件源码
def test_response_with_precompressed_body_gzip(loop, test_client):
    """A body the handler already gzip-compressed is readable by the client."""

    @asyncio.coroutine
    def handler(request):
        encoder = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
        body = encoder.compress(b'mydata') + encoder.flush()
        return web.Response(body=body,
                            headers={'Content-Encoding': 'gzip'})

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)

    resp = yield from client.get('/')
    assert 200 == resp.status
    received = yield from resp.read()
    assert b'mydata' == received
    assert resp.headers.get('Content-Encoding') == 'gzip'
项目:aiohttp-tokio    作者:fafhrd91    | 项目源码 | 文件源码
def test_response_with_precompressed_body_deflate(loop, test_client):
    """A body the handler already raw-deflated is readable by the client."""

    @asyncio.coroutine
    def handler(request):
        encoder = zlib.compressobj(wbits=-zlib.MAX_WBITS)
        body = encoder.compress(b'mydata') + encoder.flush()
        return web.Response(body=body,
                            headers={'Content-Encoding': 'deflate'})

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)

    resp = yield from client.get('/')
    assert 200 == resp.status
    received = yield from resp.read()
    assert b'mydata' == received
    assert resp.headers.get('Content-Encoding') == 'deflate'
项目:AwesomenautsFileDumper    作者:Nodja    | 项目源码 | 文件源码
def deflate(data):
    """Compress ``data`` with zlib, ending with a sync flush.

    The stream is flushed with ``Z_SYNC_FLUSH`` rather than finished, so the
    result carries no end-of-stream marker or checksum trailer.
    """
    compressor = zlib.compressobj()
    head = compressor.compress(data)
    return head + compressor.flush(zlib.Z_SYNC_FLUSH)
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def start_compressing(self):
        """start_compressing()
        Enable deflate compression on the socket (RFC 4978)."""

        # rfc 1951 - pure DEFLATE, so use -15 for both windows
        raw_window = -15
        self.decompressor = zlib.decompressobj(raw_window)
        self.compressor = zlib.compressobj(
            zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, raw_window)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _create_compressor(self):
        """Return a new raw-deflate compressor using ``self._max_wbits``."""
        level = tornado.web.GZipContentEncoding.GZIP_LEVEL
        # Negative window bits: no zlib header or trailer.
        return zlib.compressobj(level, zlib.DEFLATED, -self._max_wbits)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _create_compressor(self):
        """Return a new raw-deflate compressor using ``self._max_wbits``."""
        level = tornado.web.GZipContentEncoding.GZIP_LEVEL
        # Negative window bits: no zlib header or trailer.
        return zlib.compressobj(level, zlib.DEFLATED, -self._max_wbits)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _create_compressor(self):
        """Return a new raw-deflate compressor using ``self._max_wbits``."""
        level = tornado.web.GZipContentEncoding.GZIP_LEVEL
        # Negative window bits: no zlib header or trailer.
        return zlib.compressobj(level, zlib.DEFLATED, -self._max_wbits)
项目:histwords    作者:williamleif    | 项目源码 | 文件源码
def write_response(handler, code, headers, data=""):
    """Send a status line, headers and an optionally gzip-compressed body.

    :param handler: request handler providing ``send_response``,
        ``send_header``, ``end_headers`` and a writable ``wfile``.
    :param code: HTTP status code to send.
    :param headers: iterable of ``"Name:value"`` strings; each is split at
        the first colon (a string without a colon raises ``ValueError``).
    :param data: response body. It is sent gzip-compressed, with
        ``Content-Encoding`` and ``Content-Length`` headers, only when that
        makes it smaller; otherwise it is sent unchanged.
    """
    # Honour the status requested by the caller instead of a fixed 200.
    handler.send_response(code)
    for header in headers:
        i = header.index(":")
        handler.send_header(header[:i], header[i + 1:])

    if not data:
        # No body: the header section still has to be terminated.
        handler.end_headers()
        return

    zlib_encode = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    content = zlib_encode.compress(data) + zlib_encode.flush()

    if len(content) < len(data):
        handler.send_header('Content-Encoding', 'gzip')
        handler.send_header('Content-Length', len(content))
    else:
        # Compression did not help; fall back to the original bytes.
        content = data

    handler.end_headers()
    handler.wfile.write(content)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def ssh_NEWKEYS(self, packet):
        """Handle a NEWKEYS message by adopting the pending settings.

        A non-empty ``packet`` triggers a protocol-error disconnect request;
        processing then continues (there is no early return). The pending
        encryptions become current, and a new zlib object is created for
        each direction whose compression type is ``'zlib'``.
        """
        if packet != '':
            self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")

        self.currentEncryptions = self.nextEncryptions

        outgoing_kind = self.outgoingCompressionType
        if outgoing_kind == 'zlib':
            self.outgoingCompression = zlib.compressobj(6)

        incoming_kind = self.incomingCompressionType
        if incoming_kind == 'zlib':
            self.incomingCompression = zlib.decompressobj()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def ssh_NEWKEYS(self, packet):
        """Handle a NEWKEYS message by adopting the pending settings.

        A non-empty ``packet`` triggers a protocol-error disconnect request;
        processing then continues. When the pending encryptions have no
        block size yet, only ``_gotNewKeys`` is set and the method returns.
        Otherwise the pending encryptions become current, zlib objects are
        created for each direction whose compression type is ``'zlib'``, and
        ``connectionSecure()`` is called.
        """
        if packet != '':
            self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")

        pending = self.nextEncryptions
        if not pending.enc_block_size:
            # Remember that NEWKEYS arrived and stop here.
            self._gotNewKeys = 1
            return

        self.currentEncryptions = self.nextEncryptions

        outgoing_kind = self.outgoingCompressionType
        if outgoing_kind == 'zlib':
            self.outgoingCompression = zlib.compressobj(6)

        incoming_kind = self.incomingCompressionType
        if incoming_kind == 'zlib':
            self.incomingCompression = zlib.decompressobj()

        self.connectionSecure()
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def handle(self):
        """Stream the file named by the client back to it, zlib-compressed.

        The first message received (up to 1024 bytes, UTF-8) is taken as the
        file name. The file is read in ``BLOCK_SIZE`` pieces; compressed
        output is sent as soon as the compressor produces any, and whatever
        it still buffers at the end is sent in ``BLOCK_SIZE`` slices.
        """
        compressor = zlib.compressobj(1)

        # Find out what file the client wants
        filename = self.request.recv(1024).decode('utf-8')
        self.logger.debug('client asked for: %r', filename)

        # Send chunks of the file as they are compressed
        with open(filename, 'rb') as source:
            for block in iter(lambda: source.read(BLOCK_SIZE), b''):
                self.logger.debug('RAW %r', block)
                compressed = compressor.compress(block)
                if not compressed:
                    self.logger.debug('BUFFERING')
                    continue
                self.logger.debug(
                    'SENDING %r',
                    binascii.hexlify(compressed))
                self.request.send(compressed)

        # Send any data being buffered by the compressor
        remaining = compressor.flush()
        for start in range(0, len(remaining), BLOCK_SIZE):
            to_send = remaining[start:start + BLOCK_SIZE]
            self.logger.debug('FLUSHING %r',
                              binascii.hexlify(to_send))
            self.request.send(to_send)
项目:piqueserver    作者:piqueserver    | 项目源码 | 文件源码
def __init__(self, map_, parent=False):
        """Set up the generator of ``map_`` and a matching zlib compressor."""
        # parent=True enables saving all data sent instead of just
        # deleting it afterwards.
        self.parent = parent
        generator = map_.get_generator()
        self.generator = generator
        self.compressor = zlib.compressobj(COMPRESSION_LEVEL)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def __init__(self):
        """Create the streaming compressor ``self.z`` at compression level 9."""
        self.z = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def _CalculateCompressedSize(file_path):
  """Return the number of bytes the file occupies after zlib compression.

  The file is read in 256 KiB chunks and fed to one streaming compressor
  with default settings; only the lengths of its output are added up.

  Args:
    file_path: Path of the file to measure.

  Returns:
    Total size in bytes of the compressed stream.
  """
  CHUNK_SIZE = 256 * 1024
  compressor = zlib.compressobj()
  total_size = 0
  with open(file_path, 'rb') as f:
    # The file is opened in binary mode, so end-of-file is b''. A '' sentinel
    # never compares equal to it on Python 3 and the loop would not end.
    for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
      total_size += len(compressor.compress(chunk))
  total_size += len(compressor.flush())
  return total_size
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def __init__ (self, level = 5):
        """Create the zlib compressor used by this object.

        :param level: zlib compression level. It was previously ignored in
            favour of a hard-coded 5; the default keeps that behaviour.
        """
        self.compressor = zlib.compressobj (level, zlib.DEFLATED)
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def __init__ (self, level = 5):
        """Reset the counters and create a raw-deflate compressor.

        ``size`` starts at 0, ``crc`` at the CRC-32 of empty input, and
        ``first_data`` is True until changed by other code.
        """
        self.size = 0
        self.crc = zlib.crc32(b"")
        # Negative wbits: no zlib header or trailer around the deflate data.
        raw_window = -zlib.MAX_WBITS
        self.compressor = zlib.compressobj (
            level, zlib.DEFLATED, raw_window, zlib.DEF_MEM_LEVEL,
            zlib.Z_DEFAULT_STRATEGY)
        self.first_data = True
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def __init__ (self, producer, level=6):
        """Wrap ``producer`` with a zlib compressor, then call ``override()``."""
        self.producer = producer
        deflater = zlib.compressobj (level, zlib.DEFLATED)
        self.compressor = deflater
        self.override ()
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def __init__(self, out):
        """Store ``out`` and create a level-9 gzip-format compressor."""
        # Adding 16 to the window bits selects gzip framing.
        gzip_window = zlib.MAX_WBITS + 16
        self.compressor = zlib.compressobj(
            zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, gzip_window)
        self.out = out
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def compress(buff):
    """Encode the text ``buff`` as UTF-8 and return it gzip-compressed."""
    raw = buff.encode('utf-8')
    gzipper = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    return gzipper.compress(raw) + gzipper.flush()

# plain "inflate"
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def compress_alt(buff):
    """Encode the text ``buff`` as UTF-8 and return it as raw deflate data.

    The data is compressed in zlib format first; the 2-byte header and the
    4-byte trailer are then sliced off.
    """
    raw = buff.encode('utf-8')
    deflater = zlib.compressobj(6, zlib.DEFLATED)
    wrapped = deflater.compress(raw) + deflater.flush()
    # drop the zlib header (first 2 bytes) and trailer (last 4 bytes)
    return wrapped[2:-4]

# Brotli
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def frame_outbound(self, proto, opcode, rsv, data, fin):
        """Compress the payload of an outgoing frame when its opcode allows.

        Returns ``(rsv, data)``. Frames with a non-compressible opcode are
        returned untouched. Otherwise the first reserved bit is set on every
        frame that is not a continuation, the payload is passed through the
        (lazily created) raw-deflate compressor, and on the final frame the
        output is sync-flushed and its last four bytes are removed.
        """
        if not self._compressible_opcode(opcode):
            return (rsv, data)

        is_continuation = opcode is Opcode.CONTINUATION
        if not is_continuation:
            rsv = RsvBits(True, *rsv[1:])

        if self._compressor is None:
            # A compressor can only be missing at the start of a message.
            assert not is_continuation
            bits = (self.client_max_window_bits if proto.client
                    else self.server_max_window_bits)
            self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                                zlib.DEFLATED, -int(bits))

        payload = self._compressor.compress(bytes(data))
        if not fin:
            return (rsv, payload)

        payload += self._compressor.flush(zlib.Z_SYNC_FLUSH)
        # Strip the four bytes at the end of the sync-flushed output.
        payload = payload[:-4]

        drop_context = (self.client_no_context_takeover if proto.client
                        else self.server_no_context_takeover)
        if drop_context:
            # Start the next message with a fresh compressor.
            self._compressor = None

        return (rsv, payload)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def write_to_stdout(data):
        """Write ``data`` plus a newline to standard output and flush it."""
        stream = sys.stdout
        stream.write(data + '\n')
        stream.flush()

    # The standard zlib.compressobj() accepts only positional arguments.
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                         strategy=zlib.Z_DEFAULT_STRATEGY):
        """Keyword-friendly wrapper that calls ``zlib.compressobj`` positionally."""
        positional = (level, method, wbits, memlevel, strategy)
        return zlib.compressobj(*positional)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Yield the items of ``app_iter`` re-encoded as one gzip stream.

    Yields ``_gzip_header`` first, then one compressed piece per input item,
    then the flushed remainder, and finally the 8-byte trailer holding the
    CRC-32 and the total input size (both reduced to 32 bits).
    """
    mask = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask
    # Negative wbits: raw deflate; header and trailer are produced here.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for item in app_iter:
        total += len(item)
        checksum = zlib.crc32(item, checksum) & mask
        yield deflater.compress(item)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def write_to_stdout(data):
        """Write ``data`` plus a newline to standard output and flush it."""
        stream = sys.stdout
        stream.write(data + '\n')
        stream.flush()

    # The standard zlib.compressobj() accepts only positional arguments.
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                         strategy=zlib.Z_DEFAULT_STRATEGY):
        """Keyword-friendly wrapper that calls ``zlib.compressobj`` positionally."""
        positional = (level, method, wbits, memlevel, strategy)
        return zlib.compressobj(*positional)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Yield the items of ``app_iter`` re-encoded as one gzip stream.

    Yields ``_gzip_header`` first, then one compressed piece per input item,
    then the flushed remainder, and finally the 8-byte trailer holding the
    CRC-32 and the total input size (both reduced to 32 bits).
    """
    mask = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask
    # Negative wbits: raw deflate; header and trailer are produced here.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for item in app_iter:
        total += len(item)
        checksum = zlib.crc32(item, checksum) & mask
        yield deflater.compress(item)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask)
项目:pytoshop    作者:mdboom    | 项目源码 | 文件源码
def compress_zip_prediction(fd,      # type: BinaryIO
                            image,   # type: np.ndarray
                            depth,   # type: int
                            version  # type: int
                            ):       # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) with prediction compressed
    stream.

    Not supported for 1- or 32-bit images.

{}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not supported for 1-bit images")
    elif depth == 32:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not implemented for 32-bit images")
    elif depth == 8:
        encoder = packbits.encode_prediction_8bit
    elif depth == 16:
        encoder = packbits.encode_prediction_16bit
    else:
        # Any other depth used to leave ``encoder`` unbound, failing later
        # with UnboundLocalError (or not at all for an empty image).
        raise ValueError(
            "zip with prediction is not supported for {}-bit images".format(
                depth))

    compressor = zlib.compressobj()
    for row in image:
        # NOTE(review): the encoder's return value is discarded and it is
        # given ``row.flatten()`` -- confirm that the encoder's effect is
        # meant to reach ``row`` itself.
        encoder(row.flatten())
        row = util.ensure_bigendian(row)
        fd.write(compressor.compress(row))
    fd.write(compressor.flush())
项目:chromium-build    作者:discordapp    | 项目源码 | 文件源码
def _CalculateCompressedSize(file_path):
  """Return the number of bytes the file occupies after zlib compression.

  The file is read in 256 KiB chunks and fed to one streaming compressor
  with default settings; only the lengths of its output are added up.

  Args:
    file_path: Path of the file to measure.

  Returns:
    Total size in bytes of the compressed stream.
  """
  CHUNK_SIZE = 256 * 1024
  compressor = zlib.compressobj()
  total_size = 0
  with open(file_path, 'rb') as f:
    # The file is opened in binary mode, so end-of-file is b''. A '' sentinel
    # never compares equal to it on Python 3 and the loop would not end.
    for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
      total_size += len(compressor.compress(chunk))
  total_size += len(compressor.flush())
  return total_size
项目:tools    作者:pwnaccelerator    | 项目源码 | 文件源码
def gzip_encode(content):
    """Return ``content`` as a gzip stream compressed at level 9."""
    encoder = zlib.compressobj(
        zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    head = encoder.compress(content)
    return head + encoder.flush()
项目:gn_build    作者:realcome    | 项目源码 | 文件源码
def _CalculateCompressedSize(file_path):
  """Return the number of bytes the file occupies after zlib compression.

  The file is read in 256 KiB chunks and fed to one streaming compressor
  with default settings; only the lengths of its output are added up.

  Args:
    file_path: Path of the file to measure.

  Returns:
    Total size in bytes of the compressed stream.
  """
  CHUNK_SIZE = 256 * 1024
  compressor = zlib.compressobj()
  total_size = 0
  with open(file_path, 'rb') as f:
    # The file is opened in binary mode, so end-of-file is b''. A '' sentinel
    # never compares equal to it on Python 3 and the loop would not end.
    for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
      total_size += len(compressor.compress(chunk))
  total_size += len(compressor.flush())
  return total_size
项目:SublimeRemoteGDB    作者:summerwinter    | 项目源码 | 文件源码
def __init__(self):
        """Create the streaming compressor ``self.z`` at compression level 9."""
        self.z = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def main():
    if len(sys.argv) > 1:
        filename = sys.argv[1]
    else:
        filename = sys.argv[0]
    print 'Reading', filename

    f = open(filename, 'rb')           # Get the data to compress
    s = f.read()
    f.close()

    # First, we'll compress the string in one step
    comptext = zlib.compress(s, 1)
    decomp = zlib.decompress(comptext)

    print '1-step compression: (level 1)'
    print '    Original:', len(s), 'Compressed:', len(comptext),
    print 'Uncompressed:', len(decomp)

    # Now, let's compress the string in stages; set chunk to work in smaller steps

    chunk = 256
    compressor = zlib.compressobj(9)
    decompressor = zlib.decompressobj()
    comptext = decomp = ''
    for i in range(0, len(s), chunk):
        comptext = comptext+compressor.compress(s[i:i+chunk])
    # Don't forget to call flush()!!
    comptext = comptext + compressor.flush()

    for i in range(0, len(comptext), chunk):
        decomp = decomp + decompressor.decompress(comptext[i:i+chunk])
    decomp=decomp+decompressor.flush()

    print 'Progressive compression (level 9):'
    print '    Original:', len(s), 'Compressed:', len(comptext),
    print 'Uncompressed:', len(decomp)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def _init_zlib(self):
    '''
    Internal method that creates the zlib objects for both directions:
    a decompressor for reading and a compressor, at ``self.compresslevel``,
    for writing.
    '''
    reader = zlib.decompressobj()
    self._zcomp_read = reader
    writer = zlib.compressobj(self.compresslevel)
    self._zcomp_write = writer
项目:chalktalk_docs    作者:loremIpsum1771    | 项目源码 | 文件源码
def dump_inventory(self):
        """Write the object inventory file into ``self.outdir``.

        A plain-text header with project name and version is followed by one
        zlib-compressed line per object of every domain in
        ``self.env.domains``.
        """
        self.info(bold('dumping object inventory... '), nonl=True)
        with open(path.join(self.outdir, INVENTORY_FILENAME), 'wb') as f:
            header = (u'# Sphinx inventory version 2\n'
                      u'# Project: %s\n'
                      u'# Version: %s\n'
                      u'# The remainder of this file is compressed using zlib.\n'
                      % (self.config.project, self.config.version))
            f.write(header.encode('utf-8'))

            compressor = zlib.compressobj(9)
            for domainname, domain in iteritems(self.env.domains):
                for name, dispname, objtype, docname, anchor, prio in \
                        sorted(domain.get_objects()):
                    if anchor.endswith(name):
                        # this can shorten the inventory by as much as 25%
                        anchor = anchor[:-len(name)] + '$'
                    uri = self.get_target_uri(docname) + '#' + anchor
                    if dispname == name:
                        dispname = u'-'
                    entry = u'%s %s:%s %s %s %s\n' % (name, domainname, objtype,
                                                     prio, uri, dispname)
                    f.write(compressor.compress(entry.encode('utf-8')))
            f.write(compressor.flush())
        self.info('done')
项目:simple_ws    作者:WSnettverksprog    | 项目源码 | 文件源码
def __init__(self):
        """Create a level-6 raw-deflate compressor with memory level 8."""
        level, mem_level = 6, 8
        # Negative wbits: no zlib header or trailer.
        raw_window = -zlib.MAX_WBITS
        self.compressor = zlib.compressobj(level, zlib.DEFLATED, raw_window, mem_level)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def ssh_NEWKEYS(self, packet):
        """Handle a NEWKEYS message by adopting the pending settings.

        A non-empty ``packet`` triggers a protocol-error disconnect request;
        processing then continues (there is no early return). The pending
        encryptions become current, and a new zlib object is created for
        each direction whose compression type is ``'zlib'``.
        """
        if packet != '':
            self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")

        self.currentEncryptions = self.nextEncryptions

        outgoing_kind = self.outgoingCompressionType
        if outgoing_kind == 'zlib':
            self.outgoingCompression = zlib.compressobj(6)

        incoming_kind = self.incomingCompressionType
        if incoming_kind == 'zlib':
            self.incomingCompression = zlib.decompressobj()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def ssh_NEWKEYS(self, packet):
        """Handle a NEWKEYS message by adopting the pending settings.

        A non-empty ``packet`` triggers a protocol-error disconnect request;
        processing then continues. When the pending encryptions have no
        block size yet, only ``_gotNewKeys`` is set and the method returns.
        Otherwise the pending encryptions become current, zlib objects are
        created for each direction whose compression type is ``'zlib'``, and
        ``connectionSecure()`` is called.
        """
        if packet != '':
            self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")

        pending = self.nextEncryptions
        if not pending.enc_block_size:
            # Remember that NEWKEYS arrived and stop here.
            self._gotNewKeys = 1
            return

        self.currentEncryptions = self.nextEncryptions

        outgoing_kind = self.outgoingCompressionType
        if outgoing_kind == 'zlib':
            self.outgoingCompression = zlib.compressobj(6)

        incoming_kind = self.incomingCompressionType
        if incoming_kind == 'zlib':
            self.incomingCompression = zlib.decompressobj()

        self.connectionSecure()
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def _create_compressor(self):
        """Return a new raw-deflate compressor using ``self._max_wbits``."""
        level = tornado.web.GZipContentEncoding.GZIP_LEVEL
        # Negative window bits: no zlib header or trailer.
        return zlib.compressobj(level, zlib.DEFLATED, -self._max_wbits)