Python zlib 模块,decompressobj() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zlib.decompressobj()

项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def icompressed2ibytes(source):
    """
    Lazily inflate a stream of gzip-compressed chunks.

    :param source: GENERATOR OF COMPRESSED BYTES
    :return: GENERATOR OF BYTES (one decompressed chunk per input chunk)
    """
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
    reported_total = 0  # Total at the time of the last debug note
    running_total = 0
    for chunk in source:
        try:
            data = inflater.decompress(chunk)
        except Exception as e:
            Log.error("problem", cause=e)
        running_total += len(data)
        # Only report when the two totals floor to different 1000000 steps,
        # so the debug log is not flooded with one line per chunk.
        crossed_step = (
            Math.floor(reported_total, 1000000) != Math.floor(running_total, 1000000)
        )
        if crossed_step:
            reported_total = running_total
            if DEBUG:
                Log.note("bytes={{bytes}}", bytes=running_total)
        yield data
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def decompress(self, data):
        """Inflate one chunk of deflate data, detecting the framing lazily.

        While ``self._first_try`` is set, input is also accumulated in
        ``self._data`` so that it can be replayed through a raw-deflate
        decompressor (``-zlib.MAX_WBITS``) if ``self._obj`` rejects it.
        Once a chunk produces output, buffering stops.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        """
        if not data:
            return data

        if self._first_try:
            self._data += data
            try:
                chunk = self._obj.decompress(data)
            except zlib.error:
                # First guess was wrong: retry everything as raw deflate.
                self._first_try = False
                self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
                try:
                    return self.decompress(self._data)
                finally:
                    self._data = None
            else:
                if chunk:
                    # Framing confirmed; the replay buffer is not needed.
                    self._first_try = False
                    self._data = None
                return chunk

        return self._obj.decompress(data)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def decompress(self, data):
        """Inflate one chunk of deflate data, detecting the framing lazily.

        While ``self._first_try`` is set, input is also accumulated in
        ``self._data`` so that it can be replayed through a raw-deflate
        decompressor (``-zlib.MAX_WBITS``) if ``self._obj`` rejects it.
        Once a chunk produces output, buffering stops.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        """
        if not data:
            return data

        if self._first_try:
            self._data += data
            try:
                chunk = self._obj.decompress(data)
            except zlib.error:
                # First guess was wrong: retry everything as raw deflate.
                self._first_try = False
                self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
                try:
                    return self.decompress(self._data)
                finally:
                    self._data = None
            else:
                if chunk:
                    # Framing confirmed; the replay buffer is not needed.
                    self._first_try = False
                    self._data = None
                return chunk

        return self._obj.decompress(data)
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def decompress(self, data):
        """Inflate one chunk of deflate data, detecting the framing lazily.

        While ``self._first_try`` is set, input is also accumulated in
        ``self._data`` so that it can be replayed through a raw-deflate
        decompressor (``-zlib.MAX_WBITS``) if ``self._obj`` rejects it.
        Once a chunk produces output, buffering stops.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        """
        if not data:
            return data

        if self._first_try:
            self._data += data
            try:
                chunk = self._obj.decompress(data)
            except zlib.error:
                # First guess was wrong: retry everything as raw deflate.
                self._first_try = False
                self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
                try:
                    return self.decompress(self._data)
                finally:
                    self._data = None
            else:
                if chunk:
                    # Framing confirmed; the replay buffer is not needed.
                    self._first_try = False
                    self._data = None
                return chunk

        return self._obj.decompress(data)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def decompress(self, data):
        """Inflate one chunk of deflate data, detecting the framing lazily.

        While ``self._first_try`` is set, input is also accumulated in
        ``self._data`` so that it can be replayed through a raw-deflate
        decompressor (``-zlib.MAX_WBITS``) if ``self._obj`` rejects it.
        Once a chunk produces output, buffering stops.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        """
        if not data:
            return data

        if self._first_try:
            self._data += data
            try:
                chunk = self._obj.decompress(data)
            except zlib.error:
                # First guess was wrong: retry everything as raw deflate.
                self._first_try = False
                self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
                try:
                    return self.decompress(self._data)
                finally:
                    self._data = None
            else:
                if chunk:
                    # Framing confirmed; the replay buffer is not needed.
                    self._first_try = False
                    self._data = None
                return chunk

        return self._obj.decompress(data)
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:Pyrlang    作者:esl    | 项目源码 | 文件源码
def binary_to_term(data: bytes, options: dict = None):
    """ Strip 131 header and unpack if the data was compressed.

        :param data: The incoming encoded data with the 131 byte
        :param options: See description on top of the module. ``None``
            (the default) means an empty options dict.
        :raises ETFDecodeException: when the tag is not 131, or when the
            decompressed size differs from the size stored in the header
    """
    # A fresh dict per call: a ``{}`` default would be one shared object that
    # any callee mutation would leak into later calls.
    if options is None:
        options = {}

    if data[0] != ETF_VERSION_TAG:
        raise ETFDecodeException("Unsupported external term version")

    if data[1] == TAG_COMPRESSED:
        do = decompressobj()
        # Bytes 2..5 hold the expected uncompressed size; the compressed
        # payload starts at offset 6.
        decomp_size = util.u32(data, 2)
        decomp = do.decompress(data[6:]) + do.flush()
        if len(decomp) != decomp_size:
            # Data corruption?
            raise ETFDecodeException("Compressed size mismatch with actual")

        return binary_to_term_2(decomp, options)

    return binary_to_term_2(data[1:], options)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def decompress(self, data):
        """Inflate one chunk of deflate data, detecting the framing lazily.

        While ``self._first_try`` is set, input is also accumulated in
        ``self._data`` so that it can be replayed through a raw-deflate
        decompressor (``-zlib.MAX_WBITS``) if ``self._obj`` rejects it.
        Once a chunk produces output, buffering stops.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        """
        if not data:
            return data

        if self._first_try:
            self._data += data
            try:
                chunk = self._obj.decompress(data)
            except zlib.error:
                # First guess was wrong: retry everything as raw deflate.
                self._first_try = False
                self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
                try:
                    return self.decompress(self._data)
                finally:
                    self._data = None
            else:
                if chunk:
                    # Framing confirmed; the replay buffer is not needed.
                    self._first_try = False
                    self._data = None
                return chunk

        return self._obj.decompress(data)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def decompress(self, data):
        """Inflate one chunk of deflate data, detecting the framing lazily.

        While ``self._first_try`` is set, input is also accumulated in
        ``self._data`` so that it can be replayed through a raw-deflate
        decompressor (``-zlib.MAX_WBITS``) if ``self._obj`` rejects it.
        Once a chunk produces output, buffering stops.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        """
        if not data:
            return data

        if self._first_try:
            self._data += data
            try:
                chunk = self._obj.decompress(data)
            except zlib.error:
                # First guess was wrong: retry everything as raw deflate.
                self._first_try = False
                self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
                try:
                    return self.decompress(self._data)
                finally:
                    self._data = None
            else:
                if chunk:
                    # Framing confirmed; the replay buffer is not needed.
                    self._first_try = False
                    self._data = None
                return chunk

        return self._obj.decompress(data)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:alphy    作者:maximepeschard    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:YoWhenReady    作者:jnsdrtlf    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:alexa-stackoverflow    作者:benvand    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:hgvm-builder    作者:BD2KGenomics    | 项目源码 | 文件源码
def __init__(self, stream):
        """
        Wrap the given stream so it can be read through an unzipper.
        """

        # The wrapped source stream.
        self.stream = stream

        # MAX_WBITS + 16 is the "accept a gzip header" flag; see
        # <http://stackoverflow.com/a/22311297/402891>. This attribute is
        # None'd out once decompression has finished and been flushed.
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS + 16)

        # Tri-state: None until the first block is attempted, True if it
        # decompressed successfully, False if it did not (in which case no
        # further blocks need to be decompressed).
        self.header_read = None

        # Pending data for the line splitting this class does itself.
        self.buffer = ""

        # Throughput counters.
        self.compressed_bytes = self.uncompressed_bytes = 0
项目:danmu-bilibili    作者:saberxxy    | 项目源码 | 文件源码
def getDanmu(cid):
    """Download and inflate the comment XML for the given ``cid``.

    :param cid: identifier substituted into the
        ``http://comment.bilibili.com/<cid>.xml`` URL
    :returns: the decoded XML text; the placeholder string when ``cid`` is
        falsy; ``None`` when downloading, inflating or decoding fails
    """
    if not cid:
        return "?????"
    try:
        cid_url = "http://comment.bilibili.com/%s.xml" % cid
        # The context manager closes the HTTP response even if read() fails.
        with urllib.request.urlopen(cid_url) as response:
            danmu_xml = response.read()
        # Negative wbits: the payload is inflated as a raw deflate stream
        # (no zlib or gzip header).
        xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode()
        return xml
    except Exception:
        # Deliberate best effort: any failure is reported to the caller as
        # None instead of an exception.
        return None




# ??xml??????????????
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:GoToMeetingTools    作者:plongitudes    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:behelper    作者:istommao    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:alfred-workflows    作者:arthurhammer    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:ghostlines-robofont    作者:ghostlines    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:ghostlines-robofont    作者:ghostlines    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:alfred-zebra    作者:r0x73    | 项目源码 | 文件源码
def content(self):
        """Return the raw response body, reading it on first use.

        The body comes from ``self.raw.read()`` and is inflated with a
        gzip-aware zlib decompressor when ``self._gzipped`` is set. The
        result is stored in ``self._content``; a falsy stored value
        (including an empty body) triggers another read.

        :returns: Body of HTTP response

        """
        if self._content:
            return self._content

        if self._gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            body = inflater.decompress(self.raw.read())
        else:
            body = self.raw.read()

        self._content = body
        self._content_loaded = True
        return self._content
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:atc_alexa    作者:ckuzma    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed
项目:WeiboPictureWorkflow    作者:cielpy    | 项目源码 | 文件源码
def decompress(self, data):
        """Decompress one chunk of deflate data, auto-detecting the framing.

        The current ``self._obj`` is tried first. If zlib rejects the input,
        the decoder switches to a raw-deflate decompressor
        (``-zlib.MAX_WBITS``) and replays everything buffered in
        ``self._data``.

        :param data: next chunk of compressed bytes; falsy input is returned
            unchanged
        :returns: the bytes decompressed from this chunk
        :raises zlib.error: if the data is still invalid after the fallback
        """
        if not data:
            return data

        if not self._first_try:
            return self._obj.decompress(data)

        # Keep the raw input so it can be replayed if the first guess fails.
        self._data += data
        try:
            decompressed = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None

        # Output proves the framing guess was right: stop buffering, otherwise
        # ``_data`` keeps growing with every chunk of the compressed stream.
        if decompressed:
            self._first_try = False
            self._data = None
        return decompressed