Python lzma 模块,LZMACompressor() 实例源码

我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用lzma.LZMACompressor()

项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_simple_bad_args(self):
        self.assertRaises(TypeError, LZMACompressor, [])
        self.assertRaises(TypeError, LZMACompressor, format=3.45)
        self.assertRaises(TypeError, LZMACompressor, check="")
        self.assertRaises(TypeError, LZMACompressor, preset="asdf")
        self.assertRaises(TypeError, LZMACompressor, filters=3)
        # Can't specify FORMAT_AUTO when compressing.
        self.assertRaises(ValueError, LZMACompressor, format=lzma.FORMAT_AUTO)
        # Can't specify a preset and a custom filter chain at the same time.
        with self.assertRaises(ValueError):
            LZMACompressor(preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

        self.assertRaises(TypeError, LZMADecompressor, ())
        self.assertRaises(TypeError, LZMADecompressor, memlimit=b"qw")
        with self.assertRaises(TypeError):
            LZMADecompressor(lzma.FORMAT_RAW, filters="zzz")
        # Cannot specify a memory limit with FILTER_RAW.
        with self.assertRaises(ValueError):
            LZMADecompressor(lzma.FORMAT_RAW, memlimit=0x1000000)
        # Can only specify a custom filter chain with FILTER_RAW.
        self.assertRaises(ValueError, LZMADecompressor, filters=FILTERS_RAW_1)
        with self.assertRaises(ValueError):
            LZMADecompressor(format=lzma.FORMAT_XZ, filters=FILTERS_RAW_1)
        with self.assertRaises(ValueError):
            LZMADecompressor(format=lzma.FORMAT_ALONE, filters=FILTERS_RAW_1)

        lzc = LZMACompressor()
        self.assertRaises(TypeError, lzc.compress)
        self.assertRaises(TypeError, lzc.compress, b"foo", b"bar")
        self.assertRaises(TypeError, lzc.flush, b"blah")
        empty = lzc.flush()
        self.assertRaises(ValueError, lzc.compress, b"quux")
        self.assertRaises(ValueError, lzc.flush)

        lzd = LZMADecompressor()
        self.assertRaises(TypeError, lzd.decompress)
        self.assertRaises(TypeError, lzd.decompress, b"foo", b"bar")
        lzd.decompress(empty)
        self.assertRaises(EOFError, lzd.decompress, b"quux")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_bad_filter_spec(self):
        self.assertRaises(TypeError, LZMACompressor, filters=[b"wobsite"])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"xyzzy": 3}])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"id": 98765}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_LZMA2, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_DELTA, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_X86, "foo": 0}])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_decompressor_bad_input(self):
        lzd = LZMADecompressor()
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_RAW_1)

        lzd = LZMADecompressor(lzma.FORMAT_XZ)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_ALONE)

        lzd = LZMADecompressor(lzma.FORMAT_ALONE)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ)

        lzd = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ)

    # Test that LZMACompressor->LZMADecompressor preserves the input data.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_roundtrip_xz(self):
        lzc = LZMACompressor()
        cdata = lzc.compress(INPUT) + lzc.flush()
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_roundtrip_alone(self):
        lzc = LZMACompressor(lzma.FORMAT_ALONE)
        cdata = lzc.compress(INPUT) + lzc.flush()
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_NONE)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_roundtrip_chunks(self):
        lzc = LZMACompressor()
        cdata = []
        for i in range(0, len(INPUT), 10):
            cdata.append(lzc.compress(INPUT[i:i+10]))
        cdata.append(lzc.flush())
        cdata = b"".join(cdata)
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)

    # LZMADecompressor intentionally does not handle concatenated streams.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_compressor_bigmem(self, size):
        lzc = LZMACompressor()
        cdata = lzc.compress(b"x" * size) + lzc.flush()
        ddata = lzma.decompress(cdata)
        try:
            self.assertEqual(len(ddata), size)
            self.assertEqual(len(ddata.strip(b"x")), 0)
        finally:
            ddata = None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_simple_bad_args(self):
        self.assertRaises(TypeError, LZMACompressor, [])
        self.assertRaises(TypeError, LZMACompressor, format=3.45)
        self.assertRaises(TypeError, LZMACompressor, check="")
        self.assertRaises(TypeError, LZMACompressor, preset="asdf")
        self.assertRaises(TypeError, LZMACompressor, filters=3)
        # Can't specify FORMAT_AUTO when compressing.
        self.assertRaises(ValueError, LZMACompressor, format=lzma.FORMAT_AUTO)
        # Can't specify a preset and a custom filter chain at the same time.
        with self.assertRaises(ValueError):
            LZMACompressor(preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

        self.assertRaises(TypeError, LZMADecompressor, ())
        self.assertRaises(TypeError, LZMADecompressor, memlimit=b"qw")
        with self.assertRaises(TypeError):
            LZMADecompressor(lzma.FORMAT_RAW, filters="zzz")
        # Cannot specify a memory limit with FILTER_RAW.
        with self.assertRaises(ValueError):
            LZMADecompressor(lzma.FORMAT_RAW, memlimit=0x1000000)
        # Can only specify a custom filter chain with FILTER_RAW.
        self.assertRaises(ValueError, LZMADecompressor, filters=FILTERS_RAW_1)
        with self.assertRaises(ValueError):
            LZMADecompressor(format=lzma.FORMAT_XZ, filters=FILTERS_RAW_1)
        with self.assertRaises(ValueError):
            LZMADecompressor(format=lzma.FORMAT_ALONE, filters=FILTERS_RAW_1)

        lzc = LZMACompressor()
        self.assertRaises(TypeError, lzc.compress)
        self.assertRaises(TypeError, lzc.compress, b"foo", b"bar")
        self.assertRaises(TypeError, lzc.flush, b"blah")
        empty = lzc.flush()
        self.assertRaises(ValueError, lzc.compress, b"quux")
        self.assertRaises(ValueError, lzc.flush)

        lzd = LZMADecompressor()
        self.assertRaises(TypeError, lzd.decompress)
        self.assertRaises(TypeError, lzd.decompress, b"foo", b"bar")
        lzd.decompress(empty)
        self.assertRaises(EOFError, lzd.decompress, b"quux")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_bad_filter_spec(self):
        self.assertRaises(TypeError, LZMACompressor, filters=[b"wobsite"])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"xyzzy": 3}])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"id": 98765}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_LZMA2, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_DELTA, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_X86, "foo": 0}])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_decompressor_bad_input(self):
        lzd = LZMADecompressor()
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_RAW_1)

        lzd = LZMADecompressor(lzma.FORMAT_XZ)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_ALONE)

        lzd = LZMADecompressor(lzma.FORMAT_ALONE)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ)

        lzd = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ)

    # Test that LZMACompressor->LZMADecompressor preserves the input data.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_roundtrip_xz(self):
        lzc = LZMACompressor()
        cdata = lzc.compress(INPUT) + lzc.flush()
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_roundtrip_alone(self):
        lzc = LZMACompressor(lzma.FORMAT_ALONE)
        cdata = lzc.compress(INPUT) + lzc.flush()
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_NONE)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_roundtrip_chunks(self):
        lzc = LZMACompressor()
        cdata = []
        for i in range(0, len(INPUT), 10):
            cdata.append(lzc.compress(INPUT[i:i+10]))
        cdata.append(lzc.flush())
        cdata = b"".join(cdata)
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)

    # LZMADecompressor intentionally does not handle concatenated streams.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_compressor_bigmem(self, size):
        lzc = LZMACompressor()
        cdata = lzc.compress(b"x" * size) + lzc.flush()
        ddata = lzma.decompress(cdata)
        try:
            self.assertEqual(len(ddata), size)
            self.assertEqual(len(ddata.strip(b"x")), 0)
        finally:
            ddata = None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(LZMACompressor(), proto)
            with self.assertRaises(TypeError):
                pickle.dumps(LZMADecompressor(), proto)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_simple_bad_args(self):
        self.assertRaises(TypeError, LZMACompressor, [])
        self.assertRaises(TypeError, LZMACompressor, format=3.45)
        self.assertRaises(TypeError, LZMACompressor, check="")
        self.assertRaises(TypeError, LZMACompressor, preset="asdf")
        self.assertRaises(TypeError, LZMACompressor, filters=3)
        # Can't specify FORMAT_AUTO when compressing.
        self.assertRaises(ValueError, LZMACompressor, format=lzma.FORMAT_AUTO)
        # Can't specify a preset and a custom filter chain at the same time.
        with self.assertRaises(ValueError):
            LZMACompressor(preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

        self.assertRaises(TypeError, LZMADecompressor, ())
        self.assertRaises(TypeError, LZMADecompressor, memlimit=b"qw")
        with self.assertRaises(TypeError):
            LZMADecompressor(lzma.FORMAT_RAW, filters="zzz")
        # Cannot specify a memory limit with FILTER_RAW.
        with self.assertRaises(ValueError):
            LZMADecompressor(lzma.FORMAT_RAW, memlimit=0x1000000)
        # Can only specify a custom filter chain with FILTER_RAW.
        self.assertRaises(ValueError, LZMADecompressor, filters=FILTERS_RAW_1)
        with self.assertRaises(ValueError):
            LZMADecompressor(format=lzma.FORMAT_XZ, filters=FILTERS_RAW_1)
        with self.assertRaises(ValueError):
            LZMADecompressor(format=lzma.FORMAT_ALONE, filters=FILTERS_RAW_1)

        lzc = LZMACompressor()
        self.assertRaises(TypeError, lzc.compress)
        self.assertRaises(TypeError, lzc.compress, b"foo", b"bar")
        self.assertRaises(TypeError, lzc.flush, b"blah")
        empty = lzc.flush()
        self.assertRaises(ValueError, lzc.compress, b"quux")
        self.assertRaises(ValueError, lzc.flush)

        lzd = LZMADecompressor()
        self.assertRaises(TypeError, lzd.decompress)
        self.assertRaises(TypeError, lzd.decompress, b"foo", b"bar")
        lzd.decompress(empty)
        self.assertRaises(EOFError, lzd.decompress, b"quux")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_bad_filter_spec(self):
        self.assertRaises(TypeError, LZMACompressor, filters=[b"wobsite"])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"xyzzy": 3}])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"id": 98765}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_LZMA2, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_DELTA, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_X86, "foo": 0}])
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_decompressor_bad_input(self):
        lzd = LZMADecompressor()
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_RAW_1)

        lzd = LZMADecompressor(lzma.FORMAT_XZ)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_ALONE)

        lzd = LZMADecompressor(lzma.FORMAT_ALONE)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ)

        lzd = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1)
        self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ)

    # Test that LZMACompressor->LZMADecompressor preserves the input data.
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_roundtrip_xz(self):
        lzc = LZMACompressor()
        cdata = lzc.compress(INPUT) + lzc.flush()
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_roundtrip_alone(self):
        lzc = LZMACompressor(lzma.FORMAT_ALONE)
        cdata = lzc.compress(INPUT) + lzc.flush()
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_NONE)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_roundtrip_chunks(self):
        lzc = LZMACompressor()
        cdata = []
        for i in range(0, len(INPUT), 10):
            cdata.append(lzc.compress(INPUT[i:i+10]))
        cdata.append(lzc.flush())
        cdata = b"".join(cdata)
        lzd = LZMADecompressor()
        self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)

    # LZMADecompressor intentionally does not handle concatenated streams.
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_compressor_bigmem(self, size):
        lzc = LZMACompressor()
        cdata = lzc.compress(b"x" * size) + lzc.flush()
        ddata = lzma.decompress(cdata)
        try:
            self.assertEqual(len(ddata), size)
            self.assertEqual(len(ddata.strip(b"x")), 0)
        finally:
            ddata = None
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pickle(self):
        with self.assertRaises(TypeError):
            pickle.dumps(LZMACompressor())
        with self.assertRaises(TypeError):
            pickle.dumps(LZMADecompressor())
项目:chirribackup    作者:killabytenow    | 项目源码 | 文件源码
def __init__(self, algorithm, target_file = None):
        self.algorithm = algorithm
        self.bytes_in  = 0
        self.bytes_out = 0
        if self.algorithm is None:
            self.compressor = None
        elif self.algorithm == "lzma":
            if not lzma_available:
                raise BadCompressionException("LZMA (xz) compressor not available.")
            self.compressor = lzma.LZMACompressor()
        else:
            raise BadCompressionException("Uknown compression algorithm '%s'." % self.algorithm)

        self.out_file = open(target_file, "wb") \
                            if target_file is not None else None
项目:crypto-detector    作者:Wind-River    | 项目源码 | 文件源码
def __init__(self, name, mode, comptype, fileobj, bufsize):
        """Construct a _Stream object.
        """
        self._extfileobj = True
        if fileobj is None:
            fileobj = _LowLevelFile(name, mode)
            self._extfileobj = False

        if comptype == '*':
            # Enable transparent compression detection for the
            # stream interface
            fileobj = _StreamProxy(fileobj)
            comptype = fileobj.getcomptype()

        self.name     = name or ""
        self.mode     = mode
        self.comptype = comptype
        self.fileobj  = fileobj
        self.bufsize  = bufsize
        self.buf      = ""
        self.pos      = 0
        self.closed   = False

        if comptype == "gz":
            try:
                import zlib
            except ImportError:
                raise CompressionError("zlib module is not available")
            self.zlib = zlib
            self.crc = zlib.crc32("")
            if mode == "r":
                self._init_read_gz()
            else:
                self._init_write_gz()

        if comptype == "bz2":
            try:
                import bz2
            except ImportError:
                raise CompressionError("bz2 module is not available")
            if mode == "r":
                self.dbuf = ""
                self.cmp = bz2.BZ2Decompressor()
            else:
                self.cmp = bz2.BZ2Compressor()

        if comptype == "xz":
            try:
                import lzma
            except ImportError:
                raise CompressionError("lzma module is not available")
            if mode == "r":
                self.dbuf = ""
                self.cmp = lzma.LZMADecompressor()
            else:
                self.cmp = lzma.LZMACompressor()