Python lzma 模块,decompress() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用lzma.decompress()

项目:Charcoal    作者:somebody1234    | 项目源码 | 文件源码
def DecompressBrotli(string):
    """
    DecompressBrotli(string) -> str
    Returns the original form of the given string compressed \
using Google's brotli compression method., passed without delimiters.

    """
    number = 0
    for character in string:
        ordinal = OrdinalLookup.get(character, ord(character))
        number = number * 255 + ordinal - (ordinal > gap)
    compressed = []
    while number > 1:
        compressed = [number % 256] + compressed
        number //= 256
    return brotli.decompress(bytes(compressed)).decode("ascii")
项目:slider    作者:llllllllll    | 项目源码 | 文件源码
def _consume_actions(buffer):
    compressed_byte_count = _consume_int(buffer)
    compressed_data = buffer[:compressed_byte_count]
    del buffer[:compressed_byte_count]
    decompressed_data = lzma.decompress(compressed_data)

    out = []
    offset = 0
    for raw_action in decompressed_data.split(b','):
        if not raw_action:
            continue
        raw_offset, x, y, raw_action_mask = raw_action.split(b'|')
        action_mask = ActionBitMask.unpack(int(raw_action_mask))
        offset += int(raw_offset)
        out.append(Action(
            datetime.timedelta(milliseconds=offset),
            Position(float(x), float(y)),
            action_mask['m1'],
            action_mask['m2'],
            action_mask['k1'],
            action_mask['k2'],
        ))
    return out
项目:pynix    作者:adnelson    | 项目源码 | 文件源码
def import_to_store(self, compressed_nar):
        """Given a compressed NAR, extract it and import it into the nix store.

        :param compressed_nar: The bytes of a NAR, compressed.
        :type  compressed_nar: ``str``
        """
        # Figure out how to extract the content.
        if self.compression.lower() in ("xz", "xzip"):
            data = lzma.decompress(compressed_nar)
        elif self.compression.lower() in ("bz2", "bzip2"):
            data = bz2.decompress(compressed_nar)
        else:
            data = gzip.decompress(compressed_nar)

        # Once extracted, convert it into a nix export object and import.
        export = self.nar_to_export(data)
        imported_path = export.import_to_store()
项目:Charcoal    作者:somebody1234    | 项目源码 | 文件源码
def DecompressLZMA(string):
    """
    DecompressBrotli(string) -> str
    Returns the original form of the given string compressed \
using Google's brotli compression method., passed without delimiters.

    """
    number = 0
    for character in string:
        ordinal = OrdinalLookup.get(character, ord(character))
        number = number * 255 + ordinal - (ordinal > gap)
    compressed = []
    while number > 1:
        compressed = [number % 256] + compressed
        number //= 256
    return lzma.decompress(bytes(compressed)).decode("ascii")
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def decompress(data):
        return lzma.decompress(data)
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def _lzma(self):
        '''LZMA processor'''
        try:
            archive = lzma.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except:
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def _bzip(self):
        '''BZip2 processor'''
        try:
            archive = bz2.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except:
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def bz2_pack(source):
    """
    Returns 'source' as a bzip2-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import bz2, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'): # Make it python3
                first_line = first_line.rstrip()
                first_line += '3' #!/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = bz2.compress(source.encode('utf-8'))
    out += 'import bz2, base64\n'
    out += "exec(bz2.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def gz_pack(source):
    """
    Returns 'source' as a gzip-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import zlib, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'): # Make it python3
                first_line = first_line.rstrip()
                first_line += '3' #!/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = zlib.compress(source.encode('utf-8'))
    out += 'import zlib, base64\n'
    out += "exec(zlib.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def lzma_pack(source):
    """
    Returns 'source' as a lzma-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import lzma, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'): # Make it python3
                first_line = first_line.rstrip()
                first_line += '3' #!/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = lzma.compress(source.encode('utf-8'))
    out += 'import lzma, base64\n'
    out += "exec(lzma.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def get(self, key: str, format: str = 'html') -> Optional[bytes]:
        loop = asyncio.get_event_loop()
        cache_get = self._cache.get
        data = await loop.run_in_executor(None, cache_get, key + format)
        if data is not None:
            res = await loop.run_in_executor(None, lzma.decompress, data)
            return res
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def __decompress_string(cls, string, compressions=None):
        # type: (Any, bytes, Union[None, Iterable[int]]) -> Tuple[bytes, bool]
        """Returns a tuple containing the decompressed :py:class:`bytes` and a
        :py:class:`bool` as to whether decompression failed or not

        Args:
            string:         The possibly-compressed message you wish to parse
            compressions:   A list of the standard compression methods this
                                message may be under (default: ``[]``)

        Returns:
            A decompressed version of the message

        Raises:
           ValueError:  Unrecognized compression method fed in compressions

        Warning:
            Do not feed it with the size header, it will throw errors
        """
        compression_fail = False
        # second is module scope compression
        for method in intersect(compressions, compression):
            try:
                string = decompress(string, method)
                compression_fail = False
                break
            except:
                compression_fail = True
                continue
        return (string, compression_fail)
项目:osu-replay-parser    作者:kszlim    | 项目源码 | 文件源码
def parse_play_data(self, replay_data):
        offset_end = self.offset+self.__replay_length
        if self.game_mode != GameMode.Standard:
            self.play_data = None
        else:
            datastring = lzma.decompress(replay_data[self.offset:offset_end], format=lzma.FORMAT_AUTO).decode('ascii')[:-1]
            events = [eventstring.split('|') for eventstring in datastring.split(',')]
            self.play_data = [ReplayEvent(int(event[0]), float(event[1]), float(event[2]), int(event[3])) for event in events]
        self.offset = offset_end
项目:arclib    作者:kirbyfan64    | 项目源码 | 文件源码
def test_incremental_compress():
    basic_test_c(lzma.Compressor(), decompress)
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def decompress(
        data,
    ):
        decompressed_object = lzma.decompress(data)

        return decompressed_object
项目:osurender    作者:jamesguessis    | 项目源码 | 文件源码
def parseReplay(filename):
    osr = open(filename, 'rb').read()
    offset = 0
    data = {}
    data['mode'], offset = parseNum(osr, offset, 1)
    data['version'], offset = parseNum(osr, offset, 4)
    data['beatmap_md5'], offset = parseString(osr, offset)
    data['player'], offset = parseString(osr, offset)
    data['player_lower'] = data['player'].lower()
    data['replay_md5'], offset = parseString(osr, offset)
    data['num_300'], offset = parseNum(osr, offset, 2)
    data['num_100'], offset = parseNum(osr, offset, 2)
    data['num_50'], offset = parseNum(osr, offset, 2)
    data['num_geki'], offset = parseNum(osr, offset, 2)
    data['num_katu'], offset = parseNum(osr, offset, 2)
    data['num_miss'], offset = parseNum(osr, offset, 2)
    data['score'], offset = parseNum(osr, offset, 4)
    data['max_combo'], offset = parseNum(osr, offset, 2)
    data['perfect_combo'], offset = parseNum(osr, offset, 1)
    mods, offset = parseNum(osr, offset, 4)
    data['mods'] = parseMods(mods)
    data['mods_bitwise'] = mods
    graphString, offset = parseString(osr, offset)
    data['life_graph'] = parseLifeGraph(graphString)
    data['time_stamp'], offset = parseDate(osr, offset)
    data_len, offset = parseNum(osr, offset, 4)
    replay_str = str(lzma.decompress(osr[offset:offset+data_len]), 'utf-8')
    data['replay_data'] = parseReplayString(replay_str)

    return data
项目:edi    作者:lueschem    | 项目源码 | 文件源码
def _gz_decompress(data):
    return zlib.decompress(data, 16+zlib.MAX_WBITS)
项目:edi    作者:lueschem    | 项目源码 | 文件源码
def decompress(data):
    for item in decompressor_from_magic:
        if data.startswith(item[0]):
            return item[1](data)
    raise FatalError("Unknown compression type!")
项目:shellsploit-library    作者:riusksk    | 项目源码 | 文件源码
def gz_pack(source):
    """
    Returns 'source' as a gzip-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import zlib
    import base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'):  # Make it python3
                first_line = first_line.rstrip()
                first_line += '3'  # !/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = zlib.compress(source.encode('utf-8'))
    out += 'import zlib, base64\n'
    out += "exec(zlib.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:shellsploit-library    作者:riusksk    | 项目源码 | 文件源码
def lzma_pack(source):
    """
    Returns 'source' as a lzma-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import lzma
    import base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'):  # Make it python3
                first_line = first_line.rstrip()
                first_line += '3'  # !/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = lzma.compress(source.encode('utf-8'))
    out += 'import lzma, base64\n'
    out += "exec(lzma.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:django-asyncio-redis    作者:mackeyja92    | 项目源码 | 文件源码
def decompress(self, data):
        try:
            return lzma.decompress(data)
        except lzma.LZMAError as e:
            raise CompressorError(e)
项目:SoS    作者:vatlab    | 项目源码 | 文件源码
def loadTask(filename):
    try:
        with open(filename, 'rb') as task:
            try:
                header = task.readline().decode()
                if header.startswith('SOSTASK1.1'):
                    # ignore the tags
                    task.readline()
                    return pickle.load(task)
                elif header.startswith('SOSTASK1.2'):
                    task.readline()
                    try:
                        return pickle.loads(lzma.decompress(task.read()))
                    except:
                        # at some point, the task files were compressed with zlib
                        import zlib
                        return pickle.loads(zlib.decompress(task.read()))
                else:
                    raise ValueError('Try old format')
            except:
                # old format
                task.seek(0)
                param = pickle.load(task)
                # old format does not have tags
                param.tags = []
                return param
    except ImportError as e:
        raise RuntimeError(
            f'Failed to load task {os.path.basename(filename)}, which is likely caused by incompatible python modules between local and remote hosts: {e}')
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def _gzip(self):
        '''GZip processor'''
        try:
            archive = gzip.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except:
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def decompress(msg, method):
    # type: (bytes, int) -> bytes
    """Shortcut method for decompression

    Args:
        msg:    The message you wish to decompress, the type required is
                    defined by the requested method
        method: The decompression method you wish to use. Supported
                    (assuming installed):

                    - :py:data:`~py2p.flags.gzip`
                    - :py:data:`~py2p.flags.zlib`
                    - :py:data:`~py2p.flags.bz2`
                    - :py:data:`~py2p.flags.lzma`
                    - :py:data:`~py2p.flags.snappy`

    Returns:
        Defined by the decompression method, but typically the bytes of the
        compressed message

    Warning:
        The types fed are dependent on which decompression method you use.
        Best to assume most values are :py:class:`bytes` or
        :py:class:`bytearray`

    Raises:
        ValueError: if there is an unknown compression method, or a
            method-specific error
    """
    if method in (flags.gzip, flags.zlib):
        return zlib.decompress(msg, zlib.MAX_WBITS | 32)
    elif method == flags.bz2:
        return bz2.decompress(msg)
    elif method == flags.lzma:
        return lzma.decompress(msg)
    elif method == flags.snappy:
        return snappy.decompress(msg)
    else:  # pragma: no cover
        raise ValueError('Unknown decompression method')


# This should be in order of preference, with None being implied as last
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def feed_string(
        cls,  # type: Any
        string,  # type: Union[bytes, bytearray, str]
        sizeless=False,  # type: bool
        compressions=None  # type: Union[None, Iterable[int]]
    ):  # type: (...) -> InternalMessage
        """Constructs a :py:class:`~py2p.messages.InternalMessage` from a string
        or :py:class:`bytes` object.

        Args:
            string:         The string you wish to parse
            sizeless:       A :py:class:`bool` which describes whether this
                                string has its size header (default: it does)
            compressions:   A iterable containing the standardized compression
                                methods this message might be under
                                (default: ``[]``)

        Returns:
            A :py:class:`~py2p.messages.InternalMessage` from the given string

        Raises:
           AttributeError: Fed a non-string, non-:py:class:`bytes` argument
           AssertionError: Initial size header is incorrect
           ValueError:     Unrecognized compression method fed in compressions
           IndexError:     Packet headers are incorrect OR
                               unrecognized compression
        """
        # First section checks size header
        _string = cls.__sanitize_string(string, sizeless)
        # Then we attempt to decompress
        _string, compression_fail = cls.__decompress_string(
            _string, compressions)
        id_ = _string[0:32]
        serialized = _string[32:]
        checksum = sha256(serialized).digest()
        assert id_ == checksum, "Checksum failed: {} != {}".format(
            id_, checksum)
        packets = unpackb(serialized)
        msg = cls(
            packets[0], packets[1], packets[3:], compression=compressions)
        msg.time = packets[2]
        msg.compression_fail = compression_fail
        msg._InternalMessage__id = checksum
        msg._InternalMessage__string = serialized
        # msg.__string = _string
        return msg
项目:shellsploit-library    作者:riusksk    | 项目源码 | 文件源码
def bz2_pack(source):
    """
    Returns 'source' as a bzip2-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import bz2
    import base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'):  # Make it python3
                first_line = first_line.rstrip()
                first_line += '3'  # !/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = bz2.compress(source.encode('utf-8'))
    out += 'import bz2, base64\n'
    out += "exec(bz2.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out