Python zlib 模块,DEF_MEM_LEVEL 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用zlib.DEF_MEM_LEVEL

项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def compress(filename, input, output):
    """Write the contents of ``input`` to ``output`` as a gzip member.

    ``filename`` is stat'ed for the modification time and is also stored in
    the header as the original file name.  ``input`` is read in 1024-byte
    chunks until it returns an empty result; ``output`` receives the header,
    the raw deflate stream and the CRC/size trailer.

    Relies on ``FNAME`` and ``write32`` being defined elsewhere in the module.
    """
    output.write('\037\213\010')        # Write the header, ...
    output.write(chr(FNAME))            # ... flag byte ...

    statval = os.stat(filename)         # ... modification time ...
    mtime = statval[8]
    write32(output, mtime)
    output.write('\002')                # ... slowest compression alg. ...
    output.write('\377')                # ... OS (=unknown) ...
    output.write(filename+'\000')       # ... original filename ...

    crcval = zlib.crc32("")
    # Negative window bits: raw deflate stream, no zlib header/trailer.
    compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                               zlib.DEF_MEM_LEVEL, 0)
    size = 0
    while True:
        data = input.read(1024)
        # Truthiness detects EOF for both str and bytes reads.
        if not data:
            break
        size += len(data)
        crcval = zlib.crc32(data, crcval)
        output.write(compobj.compress(data))
    output.write(compobj.flush())
    write32(output, crcval)             # ... the CRC ...
    # The size field must describe the bytes actually compressed, modulo
    # 2**32, so count them instead of trusting the earlier stat() result.
    write32(output, size & 0xffffffff)  # and the file size.
项目:ted-editor    作者:tarnheld    | 项目源码 | 文件源码
def deflate(data, compresslevel=9):
    """Raw-deflate ``data`` and wrap it with an 8-byte header.

    The header is four fixed magic bytes followed by four bytes produced by
    the external ``byte`` helper from the negated compressed length (shifted
    by 0, 8, 16 and 24 bits).  The header plus payload is then passed through
    the external ``xor`` helper and that result is returned.
    """
    # Negative window bits: raw deflate stream, no zlib header/trailer.
    packer = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS,
                              zlib.DEF_MEM_LEVEL, 0)
    payload = packer.compress(data) + packer.flush()

    # Magic prefix with four placeholder bytes for the negated length.
    negated_len = -len(payload)
    header = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
    for position, shift in enumerate((0, 8, 0x10, 0x18), 4):
        header[position] = byte(negated_len >> shift, 0)

    return xor(bytes(header) + payload)

# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def compress_readable_output(src_file, compress_level=6):
    """Yield the contents of ``src_file`` as a gzip stream, chunk by chunk.

    ``src_file`` is read in ``DEFAULT_BUFFER_SIZE`` pieces until ``read``
    returns an empty result.  The first yielded chunk is prefixed with the
    result of ``gzip_prefix()`` (defined elsewhere; presumably the gzip
    member header), and the last chunk carries the flushed deflate data plus
    the little-endian CRC32 and length trailer.

    The prefix is emitted even when ``src_file`` is empty, so the output is
    always framed the same way.
    """
    crc = zlib.crc32(b"")
    size = 0
    # Negative window bits: raw deflate stream, framing is added by hand.
    zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
                            zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
    prefix_written = False
    while True:
        data = src_file.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        size += len(data)
        crc = zlib.crc32(data, crc)
        data = zobj.compress(data)
        if not prefix_written:
            prefix_written = True
            data = gzip_prefix() + data
        yield data
    # Both trailer fields are 32-bit: mask them so a signed CRC or an input
    # of 4 GiB or more cannot make struct.pack fail.
    tail = zobj.flush() + struct.pack(b"<LL", crc & 0xffffffff,
                                      size & 0xffffffff)
    if not prefix_written:
        # Empty input: the loop never ran, so the prefix is still missing.
        tail = gzip_prefix() + tail
    yield tail
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def __init__ (self, level = 5):
        """Initialise the running size/CRC counters and a raw-deflate compressor."""
        # Negative window bits: raw deflate stream, no zlib header/trailer.
        raw_window_bits = -zlib.MAX_WBITS
        empty_crc = zlib.crc32(b"")

        self.size = 0
        self.crc = empty_crc
        self.compressor = zlib.compressobj(
            level,
            zlib.DEFLATED,
            raw_window_bits,
            zlib.DEF_MEM_LEVEL,
            0,
        )
        self.first_data = True
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Lazily re-encode the chunks of ``app_iter`` as a gzip stream.

    Yields ``_gzip_header`` (defined elsewhere) first, then one raw-deflate
    chunk per input item, then the flushed remainder, and finally the
    little-endian CRC32 and total length, both masked to 32 bits.
    """
    mask32 = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask32)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Lazily re-encode the chunks of ``app_iter`` as a gzip stream.

    Yields ``_gzip_header`` (defined elsewhere) first, then one raw-deflate
    chunk per input item, then the flushed remainder, and finally the
    little-endian CRC32 and total length, both masked to 32 bits.
    """
    mask32 = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask32)
项目:mlens    作者:flennerhag    | 项目源码 | 文件源码
def __init__(self, filename, mode="rb", compresslevel=9):
        """Set up a zlib-compressed binary file for reading or writing.

        ``filename`` is either a path (any ``_basestring`` instance), which is
        opened here with ``io.open`` and recorded in ``self._closefp``, or an
        object that has a ``read`` or ``write`` attribute, which is used as-is.
        ``mode`` must be ``"rb"`` or ``"wb"`` and ``compresslevel`` an int
        between 1 and 9.

        Raises ValueError for a bad ``compresslevel`` or ``mode`` and
        TypeError when ``filename`` is neither a path nor file-like.
        """
        # This lock must be recursive, so that BufferedIOBase's
        # readline(), readlines() and writelines() don't deadlock.
        self._lock = RLock()
        self._fp, self._closefp = None, False
        self._mode = _MODE_CLOSED
        self._pos, self._size = 0, -1

        level_is_valid = (isinstance(compresslevel, int)
                          and 1 <= compresslevel <= 9)
        if not level_is_valid:
            raise ValueError("'compresslevel' must be an integer "
                             "between 1 and 9. You provided 'compresslevel={}'"
                             .format(compresslevel))

        if mode == "rb":
            mode_code = _MODE_READ
            self._decompressor = zlib.decompressobj(self.wbits)
            # Pending decompressed bytes and the read offset into them.
            self._buffer, self._buffer_offset = b"", 0
        elif mode == "wb":
            mode_code = _MODE_WRITE
            self._compressor = zlib.compressobj(
                compresslevel, zlib.DEFLATED, self.wbits,
                zlib.DEF_MEM_LEVEL, 0)
        else:
            raise ValueError("Invalid mode: %r" % (mode,))

        if isinstance(filename, _basestring):
            # The underlying file is opened by this constructor.
            self._fp = io.open(filename, mode)
            self._closefp = True
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
        else:
            raise TypeError("filename must be a str or bytes object, "
                            "or a file")
        # Only leave the closed state once a usable file object is attached.
        self._mode = mode_code
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Lazily re-encode the chunks of ``app_iter`` as a gzip stream.

    Yields ``_gzip_header`` (defined elsewhere) first, then one raw-deflate
    chunk per input item, then the flushed remainder, and finally the
    little-endian CRC32 and total length, both masked to 32 bits.
    """
    mask32 = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask32)
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def compress(body, compress_level):
    """Yield 'body' as a gzip stream compressed at 'compress_level'.

    'body' is an iterable of byte chunks.  The ten header bytes are yielded
    field by field, followed by one deflate chunk per input chunk, the
    flushed remainder, and the CRC32 and size trailer fields.
    """
    import zlib

    mask32 = 0xFFFFFFFF

    # Header layout: see http://www.gzip.org/zlib/rfc-gzip.html
    # ID1+ID2 (gzip marker), CM (compression method), FLG (none set).
    for field in ('\x1f\x8b', '\x08', '\x00'):
        yield ntob(field)
    # MTIME: 4 bytes, current time truncated to 32 bits.
    yield struct.pack('<L', int(time.time()) & mask32)
    # XFL (max compression, slowest algo) and OS (unknown).
    for field in ('\x02', '\xff'):
        yield ntob(field)

    checksum = zlib.crc32(ntob(''))
    total = 0
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(compress_level,
                                zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)
    for chunk in body:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum)
        yield deflater.compress(chunk)
    yield deflater.flush()

    # Trailer: CRC32 then ISIZE, 4 little-endian bytes each.
    for value in (checksum, total):
        yield struct.pack('<L', value & mask32)
项目:vcfpy    作者:bihealth    | 项目源码 | 文件源码
def _write_block(self, block):
        """Compress ``block`` and write it to ``self._handle`` as one BGZF block.

        ``block`` holds at most 65536 bytes of uncompressed data.  It is
        deflated at ``self.compresslevel`` and written as
        ``_bgzf_header + block size + deflate data + CRC32 + input length``.

        Raises AssertionError if ``block`` is too long or its compressed form
        reaches 65536 bytes.
        """
        # print("Saving %i bytes" % len(block))
        assert len(block) <= 65536
        # Giving a negative window bits means no gzip/zlib headers,
        # -15 used in samtools
        c = zlib.compressobj(self.compresslevel,
                             zlib.DEFLATED,
                             -15,
                             zlib.DEF_MEM_LEVEL,
                             0)
        compressed = c.compress(block) + c.flush()
        del c
        # NOTE(review): the size field below packs len(compressed) + 25 into
        # 16 bits, so lengths from 65511 to 65535 pass this assert but then
        # fail in struct.pack -- confirm whether the bound should be tighter.
        assert len(compressed) < 65536, \
            "TODO - Didn't compress enough, try less data in this block"
        bsize = struct.pack("<H", len(compressed) + 25)  # includes -1
        # Mask so the CRC packs as unsigned whichever sign crc32 returns.
        crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff)
        uncompressed_length = struct.pack("<I", len(block))
        # Fixed 16 bytes,
        # gzip magic bytes (4) mod time (4),
        # gzip flag (1), os (1), extra length which is six (2),
        # sub field which is BC (2), sub field length of two (2),
        # Variable data,
        # 2 bytes: block length as BC sub field (2)
        # X bytes: the data
        # 8 bytes: crc (4), uncompressed data length (4)
        data = _bgzf_header + bsize + compressed + crc + uncompressed_length
        self._handle.write(data)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Lazily re-encode the chunks of ``app_iter`` as a gzip stream.

    Yields ``_gzip_header`` (defined elsewhere) first, then one raw-deflate
    chunk per input item, then the flushed remainder, and finally the
    little-endian CRC32 and total length, both masked to 32 bits.
    """
    mask32 = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask32)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def gzip_app_iter(app_iter):
    """Lazily re-encode the chunks of ``app_iter`` as a gzip stream.

    Yields ``_gzip_header`` (defined elsewhere) first, then one raw-deflate
    chunk per input item, then the flushed remainder, and finally the
    little-endian CRC32 and total length, both masked to 32 bits.
    """
    mask32 = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask32
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask32)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def compress(body, compress_level):
    """Yield 'body' as a gzip stream compressed at 'compress_level'.

    'body' is an iterable of byte chunks.  The ten header bytes are yielded
    field by field, followed by one deflate chunk per input chunk, the
    flushed remainder, and the CRC32 and size trailer fields.
    """
    import zlib

    mask32 = 0xFFFFFFFF

    # Header layout: see http://www.gzip.org/zlib/rfc-gzip.html
    # ID1+ID2 (gzip marker), CM (compression method), FLG (none set).
    for field in ('\x1f\x8b', '\x08', '\x00'):
        yield ntob(field)
    # MTIME: 4 bytes, current time truncated to 32 bits.
    yield struct.pack("<L", int(time.time()) & mask32)
    # XFL (max compression, slowest algo) and OS (unknown).
    for field in ('\x02', '\xff'):
        yield ntob(field)

    checksum = zlib.crc32(ntob(""))
    total = 0
    # Negative window bits: raw deflate stream, framing is added by hand.
    deflater = zlib.compressobj(compress_level,
                                zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)
    for chunk in body:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum)
        yield deflater.compress(chunk)
    yield deflater.flush()

    # Trailer: CRC32 then ISIZE, 4 little-endian bytes each.
    for value in (checksum, total):
        yield struct.pack("<L", value & mask32)
项目:dango.py    作者:khazhyk    | 项目源码 | 文件源码
def make_zerobin_payload(string_content):
    """Build an encrypted payload and its key from ``string_content``.

    The text is run through the external ``mangle_string`` helper, stored as
    a raw deflate stream (compression level 0, strategy 4), base64-encoded
    and encrypted with ``sjcl`` under a fresh random 32-byte key.

    Returns ``(encrypted_data, encoded_key)``, where ``encoded_key`` is the
    URL-safe base64 form of the key.
    """
    # Negative window bits: raw deflate stream, no zlib header/trailer.
    deflater = zlib.compressobj(
        0, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 4)
    plaintext = bytes(mangle_string(string_content))
    deflated = deflater.compress(plaintext) + deflater.flush()
    payload_b64 = base64.b64encode(deflated)

    encoded_key = base64.urlsafe_b64encode(os.urandom(32))
    return sjcl.SJCL().encrypt(payload_b64, encoded_key), encoded_key
项目:Proxy-Factory    作者:ping99    | 项目源码 | 文件源码
def filter(self, handler):
        """Answer ``GET`` requests for the PAC file; ignore everything else.

        Returns ``None`` when the request is not a ``GET`` for
        ``common.PAC_FILE``.  Otherwise returns ``('mock', response)`` where
        ``response`` is a dict with ``status``, ``headers`` and ``body``:

        * ``?flush`` from a non-local client gets a 403 response; from a
          local client it starts a background PAC update.
        * A PAC file older than ``common.PAC_EXPIRED`` is refreshed in the
          background once the system uptime exceeds 30 minutes.
        * Non-local clients receive the file with ``127.0.0.1`` replaced by
          the address they should use.
        * The body is gzip-encoded when the client accepts gzip.
        """
        is_local_client = handler.client_address[0] in ('127.0.0.1', '::1')
        pacfile = os.path.join(os.path.dirname(os.path.abspath(__file__)), common.PAC_FILE)
        urlparts = urlparse.urlsplit(handler.path)
        if handler.command != 'GET' or urlparts.path.lstrip('/') != common.PAC_FILE:
            # Not a PAC request.
            return None

        if urlparts.query == 'flush':
            if not is_local_client:
                return 'mock', {'status': 403, 'headers': {'Content-Type': 'text/plain'}, 'body': 'client address %r not allowed' % handler.client_address[0]}
            thread.start_new_thread(PacUtil.update_pacfile, (pacfile,))

        if time.time() - os.path.getmtime(pacfile) > common.PAC_EXPIRED:
            # check system uptime > 30 minutes
            uptime = get_uptime()
            if uptime and uptime > 1800:
                # os.utime() returns None, so the `or` always goes on to run
                # the update; the mtime is bumped first so the file no longer
                # looks expired while the update is in progress.
                thread.start_new_thread(lambda: os.utime(pacfile, (time.time(), time.time())) or PacUtil.update_pacfile(pacfile), tuple())

        # Read the file and release the handle before any further processing.
        with open(pacfile, 'rb') as fp:
            content = fp.read()

        if not is_local_client:
            serving_addr = urlparts.hostname or ProxyUtil.get_listen_ip()
            content = content.replace('127.0.0.1', serving_addr)

        headers = {'Content-Type': 'text/plain'}
        if 'gzip' in handler.headers.get('Accept-Encoding', ''):
            headers['Content-Encoding'] = 'gzip'
            # Negative window bits: raw deflate stream, framing added by hand.
            compressobj = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
            dataio = io.BytesIO()
            # Fixed gzip header: magic, deflate, no flags, zero mtime,
            # XFL=2, OS=unknown.
            dataio.write(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff')
            dataio.write(compressobj.compress(content))
            dataio.write(compressobj.flush())
            # Trailer: CRC32 and length of the uncompressed body, 32 bits each.
            dataio.write(struct.pack('<LL', zlib.crc32(content) & 0xFFFFFFFF, len(content) & 0xFFFFFFFF))
            content = dataio.getvalue()
        return 'mock', {'status': 200, 'headers': headers, 'body': content}