我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bz2.BZ2Decompressor()。
def _decompress_bz2(filename, blocksize=900*1024): """ Decompress .tar.bz2 to .tar on disk (for faster access) Use TemporaryFile to guarentee write access. """ if not filename.endswith('.tar.bz2'): return filename fd, path = mkstemp() with os.fdopen(fd, 'wb') as fo: with open(filename, 'rb') as fi: z = bz2.BZ2Decompressor() for block in iter(lambda: fi.read(blocksize), b''): fo.write(z.decompress(block)) return path
def init(self): import lzma self.pos = 0 if self.mode == "r": self.cmpobj = lzma.BZ2Decompressor() self.fileobj.seek(0) self.buf = "" else: self.cmpobj = lzma.BZ2Compressor() # class _XZProxy #------------------------ # Extraction file object #------------------------
def test_stream_padding(self): # Test for bug #1543303. tar = tarfile.open(tmpname, self.mode) tar.close() if self.mode.endswith("gz"): with gzip.GzipFile(tmpname) as fobj: data = fobj.read() elif self.mode.endswith("bz2"): dec = bz2.BZ2Decompressor() with open(tmpname, "rb") as fobj: data = fobj.read() data = dec.decompress(data) self.assertTrue(len(dec.unused_data) == 0, "found trailing data") else: with open(tmpname, "rb") as fobj: data = fobj.read() self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, "incorrect zero padding")
def test_stream_padding(self): # Test for bug #1543303. tar = tarfile.open(tmpname, self.mode) tar.close() if self.mode.endswith("gz"): with gzip.GzipFile(tmpname) as fobj: data = fobj.read() elif self.mode.endswith("bz2"): dec = bz2.BZ2Decompressor() with open(tmpname, "rb") as fobj: data = fobj.read() data = dec.decompress(data) self.assertTrue(len(dec.unused_data) == 0, "found trailing data") else: with open(tmpname, "rb") as fobj: data = fobj.read() self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, "incorrect zero padding")
def bz2_compress(self,file,type=True): # Compress/Decompress files into/from the bz2 format. compress if type else decompess if not os.path.exists(file) or os.path.isdir(file): return False try: filesize = os.path.getsize(file) except: return False if not type and not file.endswith(".bz2"): return False blocksize = 102400 if type: compressor = bz2.BZ2Compressor() else: decompressor = bz2.BZ2Decompressor() handle1 = open(file,"rb") handle2 = open(file+".bz2","wb") if type else open(file[:-4],"wb") for i in range(int(math.ceil(float(filesize)/blocksize))): if type: handle2.write(compressor.compress(handle1.read(blocksize))) else: handle2.write(decompressor.decompress(handle1.read(blocksize))) if type: handle2.write(compressor.flush()) handle1.close(); handle2.close() self.debug("Successfully "+("" if type else "de")+"compressed file : "+file) return True ################################################## Client Behaviour Functions ##################################################
def test_stream_padding(self): # Test for bug #1543303. tar = tarfile.open(tmpname, self.mode) tar.close() if self.mode.endswith("gz"): fobj = gzip.GzipFile(tmpname) data = fobj.read() fobj.close() elif self.mode.endswith("bz2"): dec = bz2.BZ2Decompressor() with open(tmpname, "rb") as fobj: data = fobj.read() data = dec.decompress(data) self.assertTrue(len(dec.unused_data) == 0, "found trailing data") else: fobj = open(tmpname, "rb") data = fobj.read() fobj.close() self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, "incorrect zero padding")
def test_stream_padding(self): # Test for bug #1543303. tar = tarfile.open(tmpname, self.mode) tar.close() if self.mode.endswith("gz"): fobj = gzip.GzipFile(tmpname) data = fobj.read() fobj.close() elif self.mode.endswith("bz2"): dec = bz2.BZ2Decompressor() data = open(tmpname, "rb").read() data = dec.decompress(data) self.assertTrue(len(dec.unused_data) == 0, "found trailing data") else: fobj = open(tmpname, "rb") data = fobj.read() fobj.close() self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, "incorrect zero padding")
def init(self): import bz2 self.pos = 0 if self.mode == "r": self.bz2obj = bz2.BZ2Decompressor() self.fileobj.seek(0) self.buf = b"" else: self.bz2obj = bz2.BZ2Compressor()
def init(self): import bz2 self.pos = 0 if self.mode == "r": self.bz2obj = bz2.BZ2Decompressor() self.fileobj.seek(0) self.buf = "" else: self.bz2obj = bz2.BZ2Compressor()
def decompress(chunks, compression): """Decompress :param __generator[bytes] chunks: compressed body chunks. :param str compression: compression constant. :rtype: __generator[bytes] :return: decompressed chunks. :raise: TypeError, DecompressError """ if compression not in SUPPORTED_COMPRESSIONS: raise TypeError('Unsupported compression type: %s' % (compression,)) try: de_compressor = DECOMPRESSOR_FACTORIES[compression]() for chunk in chunks: try: yield de_compressor.decompress(chunk) except OSError as err: # BZ2Decompressor: invalid data stream raise DecompressError(err) from None # BZ2Decompressor does not support flush() interface. if hasattr(de_compressor, 'flush'): yield de_compressor.flush() except zlib.error as err: raise DecompressError(err) from None
def __init__(self, fileobj): self._fileobj = fileobj self._bz2 = bz2.BZ2Decompressor() self._line_buffer = collections.deque([""]) self._current_line = "" self._current_offset = 0
def init(self): import bz2 self.pos = 0 if self.mode == "r": self.cmpobj = bz2.BZ2Decompressor() self.fileobj.seek(0) self.buf = "" else: self.cmpobj = bz2.BZ2Compressor() # class _BZ2Proxy