Python bz2 模块,compress() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bz2.compress()

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyBytesIO(io.BytesIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super(MyBytesIO, self).read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super(MyBytesIO, self).seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:taf    作者:taf3    | 项目源码 | 文件源码
def encode(payload):
    """Encode a string for tranmission over the network using base64 encoding of the bz2 compressed string.

    We bz2 compress because we can and also to counteract the inefficiency of the base64 encoding.

    Args:
        payload(str): string we want to transmit over the wire

    Returns:
        str: base64 encoded, bz2 compressed string

    """
    return base64.urlsafe_b64encode(bz2.compress(payload.encode('utf-8'))).decode('utf-8')


# max is 1024**127
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyBytesIO(io.BytesIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super(MyBytesIO, self).read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super(MyBytesIO, self).seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:covertutils    作者:operatorequals    | 项目源码 | 文件源码
def compress( self, message ) :
        """
This funtion performs all provided compression algorithm to the *message* parameter and decides which does the most efficient compression.
It does so by comparing the output lengths.

:param str message: The data to be compressed in raw bytes.
:rtype: str
:return: Data compressed by most efficient available algorithm.

"""
        zips = []
        for comp in self.comps :
            zfile = comp( message )
            zips.append( zfile )

        sorted_zips = sorted( zips, key = lambda tup:len( tup ) )
        winner = sorted_zips[0]
        return winner
项目:conda-mirror    作者:maxpoint    | 项目源码 | 文件源码
def _write_repodata(package_dir, repodata_dict):
    data = json.dumps(repodata_dict, indent=2, sort_keys=True)
    # strip trailing whitespace
    data = '\n'.join(line.rstrip() for line in data.splitlines())
    # make sure we have newline at the end
    if not data.endswith('\n'):
        data += '\n'

    with open(os.path.join(package_dir,
                           'repodata.json'), 'w') as fo:
        fo.write(data)

    # compress repodata.json into the bz2 format. some conda commands still
    # need it
    bz2_path = os.path.join(package_dir, 'repodata.json.bz2')
    with open(bz2_path, 'wb') as fo:
        fo.write(bz2.compress(data.encode('utf-8')))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyBytesIO(io.BytesIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super(MyBytesIO, self).read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super(MyBytesIO, self).seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)
            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:voltha    作者:opencord    | 项目源码 | 文件源码
def load(cls, branch, kv_store, msg_cls, hash):
        #  Update the branch's config store
        blob = kv_store[hash]
        if cls.compress:
            blob = decompress(blob)
        data = loads(blob)

        config_hash = data['config']
        config_data = cls.load_config(kv_store, msg_cls, config_hash)

        children_list = data['children']
        assembled_children = {}
        node = branch._node
        for field_name, meta in children_fields(msg_cls).iteritems():
            child_msg_cls = tmp_cls_loader(meta.module, meta.type)
            children = []
            for child_hash in children_list[field_name]:
                child_node = node._mknode(child_msg_cls)
                child_node.load_latest(child_hash)
                child_rev = child_node.latest
                children.append(child_rev)
            assembled_children[field_name] = children
        rev = cls(branch, config_data, assembled_children)
        return rev
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test_partial_input(self, mode):
        class MyBytesIO(io.BytesIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super(MyBytesIO, self).read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super(MyBytesIO, self).seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
项目:DroidWatcher    作者:suemi994    | 项目源码 | 文件源码
def __init__(self, path="./libsimilarity/libsimilarity.so"):
        super(SIMILARITYNative, self).__init__(True)

        self._u = cdll.LoadLibrary( path )

        self._u.compress.restype = c_uint
        self._u.ncd.restype = c_int
        self._u.ncs.restype = c_int
        self._u.cmid.restype = c_int
        self._u.entropy.restype = c_double
        self._u.levenshtein.restype = c_uint

        self._u.kolmogorov.restype = c_uint
        self._u.bennett.restype = c_double
        self._u.RDTSC.restype = c_double

        self.__libsim_t = LIBSIMILARITY_T()

        self.set_compress_type( ZLIB_COMPRESS )
项目:DroidWatcher    作者:suemi994    | 项目源码 | 文件源码
def _ncd(self, s1, s2, s1size=0, s2size=0):
        if s1size == 0:
            s1size = self.compress(s1)

        if s2size == 0:
            s2size = self.compress(s2)

        s3size = self.compress(s1+s2)

        smax = max(s1size, s2size)
        smin = min(s1size, s2size)

        res = (abs(s3size - smin)) / float(smax)
        if res > 1.0:
            res = 1.0

        return res, s1size, s2size, 0
项目:DroidWatcher    作者:suemi994    | 项目源码 | 文件源码
def __init__(self, path="./libsimilarity/libsimilarity.so"):
        super(SIMILARITYNative, self).__init__(True)

        self._u = cdll.LoadLibrary( path )

        self._u.compress.restype = c_uint
        self._u.ncd.restype = c_int
        self._u.ncs.restype = c_int
        self._u.cmid.restype = c_int
        self._u.entropy.restype = c_double
        self._u.levenshtein.restype = c_uint

        self._u.kolmogorov.restype = c_uint
        self._u.bennett.restype = c_double
        self._u.RDTSC.restype = c_double

        self.__libsim_t = LIBSIMILARITY_T()

        self.set_compress_type( ZLIB_COMPRESS )
项目:DroidWatcher    作者:suemi994    | 项目源码 | 文件源码
def _ncd(self, s1, s2, s1size=0, s2size=0):
        if s1size == 0:
            s1size = self.compress(s1)

        if s2size == 0:
            s2size = self.compress(s2)

        s3size = self.compress(s1+s2)

        smax = max(s1size, s2size)
        smin = min(s1size, s2size)

        res = (abs(s3size - smin)) / float(smax)
        if res > 1.0:
            res = 1.0

        return res, s1size, s2size, 0
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_queueMessages_processes_files_message_instantly(self):
        worker = StatusWorkerService(sentinel.dbtasks)
        mock_processMessage = self.patch(worker, "_processMessage")
        contents = b'These are the contents of the file.'
        encoded_content = encode_as_base64(bz2.compress(contents))
        message = self.make_message()
        message['files'] = [
            {
                "path": "sample.txt",
                "encoding": "uuencode",
                "compression": "bzip2",
                "content": encoded_content
            }
        ]
        nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
        node, token = nodes_with_tokens[0]
        yield worker.queueMessage(token.key, message)
        self.assertThat(
            mock_processMessage,
            MockCalledOnceWith(node, message))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_queueMessages_handled_invalid_nodekey_with_instant_msg(self):
        worker = StatusWorkerService(sentinel.dbtasks)
        mock_processMessage = self.patch(worker, "_processMessage")
        contents = b'These are the contents of the file.'
        encoded_content = encode_as_base64(bz2.compress(contents))
        message = self.make_message()
        message['files'] = [
            {
                "path": "sample.txt",
                "encoding": "uuencode",
                "compression": "bzip2",
                "content": encoded_content
            }
        ]
        nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
        node, token = nodes_with_tokens[0]
        yield deferToDatabase(token.delete)
        yield worker.queueMessage(token.key, message)
        self.assertThat(
            mock_processMessage, MockNotCalled())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_status_with_file_bad_encoder_fails(self):
        node = factory.make_Node(
            interface=True, status=NODE_STATUS.COMMISSIONING)
        contents = b'These are the contents of the file.'
        encoded_content = encode_as_base64(bz2.compress(contents))
        payload = {
            'event_type': 'finish',
            'result': 'FAILURE',
            'origin': 'curtin',
            'name': 'commissioning',
            'description': 'Commissioning',
            'timestamp': datetime.utcnow(),
            'files': [
                {
                    "path": "sample.txt",
                    "encoding": "uuencode",
                    "compression": "bzip2",
                    "content": encoded_content
                }
            ]
        }
        with ExpectedException(ValueError):
            self.processMessage(node, payload)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_status_with_file_bad_compression_fails(self):
        node = factory.make_Node(
            interface=True, status=NODE_STATUS.COMMISSIONING)
        contents = b'These are the contents of the file.'
        encoded_content = encode_as_base64(bz2.compress(contents))
        payload = {
            'event_type': 'finish',
            'result': 'FAILURE',
            'origin': 'curtin',
            'name': 'commissioning',
            'description': 'Commissioning',
            'timestamp': datetime.utcnow(),
            'files': [
                {
                    "path": "sample.txt",
                    "encoding": "base64",
                    "compression": "jpeg",
                    "content": encoded_content
                }
            ]
        }
        with ExpectedException(ValueError):
            self.processMessage(node, payload)
项目:lksh-2015-winter-as-agar    作者:burunduk3    | 项目源码 | 文件源码
def accept (server, mask):
    global poll, cnt
    if mask & selectors.EVENT_READ:
        while True:
            try:
                conn, addr = sock.accept ()     
                print("Connected: " + str(addr))
                v["id"] = cnt                                  
                # conn.send(compress(bytes(json.dumps(v), 'utf-8')))
                conn.send(bytes(json.dumps(v), 'utf-8'))
            except BlockingIOError:
                break
            conn.setblocking (False)
            poll.register (conn, selectors.EVENT_READ, read)
            clients[conn] = cnt
            #FIXED THIS
            cnt += 1
            #cnt = random.randint(1, 10**40) 
        mask &=~ selectors.EVENT_READ
    assert mask == 0
项目:lksh-2015-winter-as-agar    作者:burunduk3    | 项目源码 | 文件源码
def sendMap(id, data):
    global localServer
    data = json.dumps(data)
    # if DEBUG_PROTOCOL_PRINT:
    #     print(data)
    q = clients
    for x in q:
        try:
            if (q[x] == id):
                # x.send(compress(bytes(data + '\n', 'utf-8')))
                x.send(bytes(data + '\n', 'utf-8'))
                break
        except:
            clientsLock.acquire()
            if x in clients:
                print("deleting user " + str(clients[x]))
                poll.unregister(x)
                x.close()
                del clients[x]
            localServer.UserExit(id)
            clientsLock.release()
            break
项目:lksh-2015-winter-as-agar    作者:burunduk3    | 项目源码 | 文件源码
def registerMe(name):     
    global sock, tcp_ip, tcp_port

    inf = open('config.txt', 'r')
    config = inf.readline()
    tcp_ip, tcp_port = config.split(' ')
    tcp_port = int(tcp_port)

    sock.connect((tcp_ip, tcp_port))
    data = sock.recv(MAX_LENGTH)
    # id = json.loads(str(decompress(data), 'utf-8'))['id']
    id = json.loads(str(data, 'utf-8'))['id']
    jdata = dict()
    jdata['name'] = name
    s = json.dumps(jdata) 
    # sock.send(compress(bytes(s + '\n', 'utf-8')))
    sock.send(bytes(s + '\n', 'utf-8'))
    return id
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def save(self, path=None):
        "Saves the database to path or most recently known path."
        if path is None:
            assert self.__path is not None, 'Path must be provided!'
            path = self.__path
        with open(path, 'wb') as file:
            file.write(bz2.compress(pickle.dumps(self)))
        self.__path = path
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def bz2_pack(source):
    "Returns 'source' as a bzip2-compressed, self-extracting python script."
    import bz2, base64
    out = ""
    compressed_source = bz2.compress(source)
    out += 'import bz2, base64\n'
    out += "exec bz2.decompress(base64.b64decode('"
    out += base64.b64encode((compressed_source))
    out += "'))\n"
    return out
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def gz_pack(source):
    "Returns 'source' as a gzip-compressed, self-extracting python script."
    import zlib, base64
    out = ""
    compressed_source = zlib.compress(source)
    out += 'import zlib, base64\n'
    out += "exec zlib.decompress(base64.b64decode('"
    out += base64.b64encode((compressed_source))
    out += "'))\n"
    return out

# The test.+() functions below are for testing pyminifer...
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def bz2_pack(source):
    """
    Returns 'source' as a bzip2-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import bz2, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'): # Make it python3
                first_line = first_line.rstrip()
                first_line += '3' #!/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = bz2.compress(source.encode('utf-8'))
    out += 'import bz2, base64\n'
    out += "exec(bz2.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def gz_pack(source):
    """
    Returns 'source' as a gzip-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import zlib, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'): # Make it python3
                first_line = first_line.rstrip()
                first_line += '3' #!/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = zlib.compress(source.encode('utf-8'))
    out += 'import zlib, base64\n'
    out += "exec(zlib.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def lzma_pack(source):
    """
    Returns 'source' as a lzma-compressed, self-extracting python script.

    .. note::

        This method uses up more space than the zip_pack method but it has the
        advantage in that the resulting .py file can still be imported into a
        python program.
    """
    import lzma, base64
    out = ""
    # Preserve shebangs (don't care about encodings for this)
    first_line = source.split('\n')[0]
    if analyze.shebang.match(first_line):
        if py3:
            if first_line.rstrip().endswith('python'): # Make it python3
                first_line = first_line.rstrip()
                first_line += '3' #!/usr/bin/env python3
        out = first_line + '\n'
    compressed_source = lzma.compress(source.encode('utf-8'))
    out += 'import lzma, base64\n'
    out += "exec(lzma.decompress(base64.b64decode('"
    out += base64.b64encode(compressed_source).decode('utf-8')
    out += "')))\n"
    return out
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data):
        return data
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return compresso.compresso.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return neuroglancer.neuroglancer.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return bz2.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        if type(data) is np.ndarray:
            str_data = data.tobytes()
        elif type(data) is str:
            str_data = data
        else:
            raise ValueError('Data type not supported')

        dictionary = lz78.lz78.compress(str_data, *args, **kwargs)

        array = np.zeros(len(dictionary), dtype=np.uint32)
        retry = False
        for ie, entry in enumerate(dictionary):
            if entry[1] == '':
                if (entry[0] >= 2**24):
                    retry = True
                    break
                array[ie] = (entry[0] << 8)
            else:
                if (entry[0] >= 2**24):
                    retry =  True
                    break
                array[ie] = (entry[0] << 8) + ord(entry[1])

        if not retry: return array
        else:
            array = np.zeros(len(dictionary), dtype=np.uint64)
            for ie, entry in enumerate(dictionary):
                if entry[1] == '':
                    array[ie] = (entry[0] << 8)
                else:
                    array[ie] = (entry[0] << 8) + ord(entry[1])
            return array
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return lzma.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return lzo.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):        
        if type(data) is np.ndarray:
            str_data = data.tobytes()
        elif type(data) is str:
            str_data = data
        else:
            raise ValueError('Data type not supported')

        # create an empty dictionary
        dict_size = 2**8
        dictionary = dict((chr(i), i) for i in xrange(dict_size))

        w = ''
        result = []
        for c in str_data:
            wc = w + c
            if wc in dictionary:
                w = wc
            else:
                result.append(dictionary[w])
                dictionary[wc] = dict_size
                dict_size += 1
                w = c

        if w:
            result.append(dictionary[w])

        return np.array(result, dtype=np.uint32)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return zlib.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return zstd.compress(data, *args, **kwargs)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return _png.compress(data)
项目:compresso    作者:VCG    | 项目源码 | 文件源码
def compress(data, *args, **kwargs):
        return x264.compress(data)
项目:py2p    作者:p2p-today    | 项目源码 | 文件源码
def string(self):
        # type: (InternalMessage) -> bytes
        """Returns a :py:class:`bytes` representation of the message

        Raises:
            TypeError: See :py:func:`~py2p.base.InternalMessage._InternalMessage__non_len_string`
        """
        if not all((self.__id, self.__string, self.__full_string)):
            id_ = self.id
            ret = b''.join((id_, self.__non_len_string))
            compression_used = self.compression_used
            if compression_used:
                ret = compress(ret, compression_used)
            self.__full_string = b''.join((pack_value(4, len(ret)), ret))
        return self.__full_string
项目:arclib    作者:kirbyfan64    | 项目源码 | 文件源码
def test_incremental_decompress():
    basic_test_d(bz2.Decompressor(), compress)
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def compress(
        data,
    ):
        compressed_object = bz2.compress(data)

        return compressed_object
项目:py-cloud-compute-cannon    作者:Autodesk    | 项目源码 | 文件源码
def _contents(self, value):
        import bz2
        self._contents = bz2.compress(value, 1)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def decode(payload_str):
    """Decode base64 encoded bz2 compress network payload

    Args:
        payload_str(str): base64 encoded bz2 compressed byte stream

    Returns:
        str: original byte-stream

    """
    return bz2.decompress(base64.urlsafe_b64decode(payload_str.encode('utf-8')).decode('utf-8'))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg, )
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg, )
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg, )
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg, )