Python gzip module: compress() example source code

From open-source Python projects, we extracted the following 27 code examples to illustrate how to use gzip.compress().
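
As a quick orientation before the project snippets, here is a minimal, self-contained round trip using only the standard library. gzip.compress() takes an optional compresslevel between 0 and 9 (default 9), which several of the examples below set explicitly:

import gzip

payload = 'hello gzip'.encode('utf-8')
blob = gzip.compress(payload, 6)         # bytes with gzip header and trailer
assert gzip.decompress(blob) == payload  # lossless round trip
print(len(payload), '->', len(blob))     # tiny inputs can grow after compression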

Project: miracle    Author: mozilla    | Project source | File source
def upload(processor, data):
    user_token = data['user']
    today = date.today()
    blob = json.dumps(data, separators=(',', ':')).encode('utf-8')
    blob = gzip.compress(blob, 7)
    name = 'v2/sessions/%s/%s/%s/%s.json.gz' % (
        today.year, today.month, user_token, uuid.uuid1().hex)
    try:
        processor.bucket.put(
            name, blob,
            ContentEncoding='gzip',
            ContentType='application/json')
    except ClientError:  # pragma: no cover
        processor.raven.captureException()
        return False

    return True
Project: IoT-Client    Author: suquark    | Project source | File source
def generate_msg(role, msg_dict=None):
    """
    See `IoT Protocol Specification`
    :param msg_dict: The data dict.
    :param role:
    :return: Encrypted bytes
    """
    logging.debug("Generating a message ...")
    msg = {'proto': 'iddp',
           'role': role,
           'timestamp': datetime.datetime.utcnow().timestamp(),
           'id': identity,
           'data': msg_dict}

    msg = simplejson.dumps(msg, separators=(',', ':')).encode()
    return crypto.encrypt(gzip.compress(msg))[0]
Project: kOVHernetes    Author: antoineco    | Project source | File source
def gen_kubeconfig(self, component, server='localhost'):
        """Generate kubeconfig"""

        kubeconfig = loads(files['kubeconfig'].decode(), object_pairs_hook=OrderedDict)
        kubeconfig['users'][0]['user']['client-certificate'] = 'tls/client/{}.crt'.format(component)
        kubeconfig['clusters'][0]['cluster']['server'] = 'https://' + server + ':6443'

        kubeconfig = compress((dumps(kubeconfig, indent=2) + '\n').encode())

        self.add_files([
            {
                'filesystem': 'root',
                'path': '/etc/kubernetes/kubeconfig-' + component,
                'mode': 416, # 0640
                'contents': {
                    'source': 'data:,' + quote(kubeconfig),
                    'compression': 'gzip'
                }
            }
        ])
Project: kOVHernetes    Author: antoineco    | Project source | File source
def gen_kubemanifest(self, component, tag):
        """Generate Kubernetes Pod manifest"""

        manifest = loads(files[component].decode(), object_pairs_hook=OrderedDict)
        manifest['spec']['containers'][0]['image'] = 'quay.io/coreos/hyperkube:' + tag

        manifest = compress((dumps(manifest, indent=2) + '\n').encode())

        self.add_files([
            {
                'filesystem': 'root',
                'path': '/etc/kubernetes/manifests/kube-{}.json'.format(component),
                'mode': 416, # 0640
                'contents': {
                    'source': 'data:,' + quote(manifest),
                    'compression': 'gzip'
                }
            }
        ])
Project: aws-emr-streaming-templates    Author: GINK03    | Project source | File source
def _map1(name):
  key_term_freq = {}
  print(name)
  for line in open(name):
    line = line.strip()
    #print(line)
    key, val = line.split('\t')
    val = json.loads(val)

    if key_term_freq.get(key) is None:
      key_term_freq[key] = {}
    for term, freq in val.items():
      if key_term_freq[key].get(term) is None:
        key_term_freq[key][term] = 0
      key_term_freq[key][term] += freq
    #print( term, key_term_freq[key][term] )
  save_name = 'shrink/{}.pkl.gz'.format(name.split('/').pop())
  #print( key_term_freq )
  with open(save_name, 'wb') as outfile:
    outfile.write(gzip.compress(pickle.dumps(key_term_freq)))
Project: flask_api    Author: nullcc    | Project source | File source
def after_request(self, response):
        accept_encoding = request.headers.get('Accept-Encoding', '')
        if not accept_encoding:
            return response

        encodings = [encoding.strip() for encoding in accept_encoding.split(',')]
        if 'gzip' not in encodings:
            return response

        # Skip error responses, tiny bodies, and responses that are already encoded.
        if (not (200 <= response.status_code < 300) or len(response.data) < 500
                or 'Content-Encoding' in response.headers):
            return response

        response.data = gzip.compress(response.data, compresslevel=self.compress_level)
        response.headers['Content-Encoding'] = 'gzip'
        response.headers['Content-Length'] = str(len(response.data))

        return response
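
How the hook above gets registered is outside the excerpt. A hedged wiring sketch; the class name GzipMiddleware and its constructor argument are illustrative assumptions, not the project's API:

from flask import Flask

app = Flask(__name__)                          # illustrative application
compressor = GzipMiddleware(compress_level=6)  # hypothetical class from the excerpt's module
app.after_request(compressor.after_request)    # Flask calls the hook with each response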
Project: defuse_division    Author: lelandbatey    | Project source | File source
def send(conn, obj):
    msg = json_dump(obj)
    msg = msg.encode('utf-8')
    msg = gzip.compress(msg)
    msg += SEP
    conn.sendall(msg)
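
A plausible receiving counterpart, sketched here rather than taken from the project; it assumes SEP is the module-level delimiter used by send() and that json.loads mirrors json_dump:

def recv(conn, buffer=b''):
    # Sketch: accumulate bytes until the delimiter, then undo each step of send().
    while SEP not in buffer:
        chunk = conn.recv(4096)
        if not chunk:
            raise ConnectionError('socket closed before message delimiter')
        buffer += chunk
    msg, _, rest = buffer.partition(SEP)
    return json.loads(gzip.decompress(msg).decode('utf-8')), rest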
Project: scibot    Author: SciCrunch    | Project source | File source
def export(request):
    print('starting csv export')
    output_rows, DATE = export_impl()
    data = StringIO()
    writer = csv.writer(data)
    writer.writerows(sorted(output_rows))

    r = Response(gzip.compress(data.getvalue().encode()))
    r.content_type = 'text/csv'
    r.headers.update({
        'Content-Disposition':'attachment;filename = RRID-data-%s.csv' % DATE,
        'Content-Encoding':'gzip'
        })

    return r
Project: scibot    Author: SciCrunch    | Project source | File source
def export_json(request):
    print('starting json export')
    output_json, DATE = export_json_impl()
    data = json.dumps(output_json, sort_keys=True, indent=4)

    r = Response(gzip.compress(data.encode()))
    r.content_type = 'application/json'
    r.headers.update({
        'Content-Encoding':'gzip'
        })

    return r
Project: postmarker    Author: Stranger6667    | Project source | File source
def encode(value, compress=False):
    """
    Converts dict to JSON and encodes it to Base64.
    """
    encoded = json.dumps(value, separators=(',', ':')).encode()
    if compress:
        encoded = gzip.compress(encoded)
    return base64.b64encode(encoded).decode()
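
The excerpt does not show the inverse; a hedged sketch of a matching decode(), under the same conventions and not the library's actual API:

def decode(value, compressed=False):
    # Sketch: Base64 -> optional gunzip -> JSON.
    raw = base64.b64decode(value)
    if compressed:
        raw = gzip.decompress(raw)
    return json.loads(raw.decode())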
Project: Telethon    Author: LonamiWebs    | Project source | File source
def __bytes__(self):
        # TODO Maybe compress level could be an option
        return struct.pack('<I', GzipPacked.CONSTRUCTOR_ID) + \
               TLObject.serialize_bytes(gzip.compress(self.data))
Project: opentsdb-py    Author: scarchik    | Project source | File source
def sendall(self, *metrics) -> dict:
        logger.debug("Send metrics:\n %s", '\n'.join(str(m) for m in metrics))
        response = self.connect.post(
            self.tsdb_urls.put,
            data=gzip.compress(json.dumps(metrics).encode()) if self.compression else json.dumps(metrics),
            timeout=self.SEND_TIMEOUT)
        return response.json()
Project: floto    Author: babbel    | Project source | File source
def compress_generator_result(result):
    serializable = [t.serializable() for t in result]
    if result and COMPRESS_GENERATOR_RESULT:
        j = floto.specs.JSONEncoder.dump_object(serializable)
        z = gzip.compress(j.encode())
        z = 'x'.join([format(c, 'x') for c in z])
        return z
    else:
        return serializable
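
The final step renders each compressed byte as bare hex joined by 'x' (for example, b'\x1f\x8b' becomes '1fx8b'), keeping the value JSON-safe. A hedged sketch of the inverse, with json.loads standing in for the project's own decoder:

def decompress_generator_result(z):
    # Sketch: undo the 'x'-separated hex encoding, then gunzip and parse.
    raw = bytes(int(h, 16) for h in z.split('x'))
    return json.loads(gzip.decompress(raw).decode())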
Project: tasker    Author: wavenator    | Project source | File source
def compress(data):
        compressed_object = gzip.compress(data)

        return compressed_object
Project: kOVHernetes    Author: antoineco    | Project source | File source
def res_gzip(resource):
    """Returns package data as gzipped bytes"""
    return compress(res_plain(resource))

Project: kOVHernetes    Author: antoineco    | Project source | File source
def gen_etc_hosts(self, client, net):
        """Generate /etc/hosts file containing all subnet hosts

        Makes it possible to register k8s nodes by hostname.
        Disgusting hack to make up for OVH's terrible DNS.
        """
        from ipaddress import IPv4Network

        subnet = client.get('/cloud/project/{}/network/private/{}/subnet'.format(client._project, net))[0]
        hosts = IPv4Network(subnet['cidr']).hosts()
        hosts_content = compress(
            ('127.0.0.1\tlocalhost\n' + '::1\t\tlocalhost\n' +
             '\n'.join(['{}\t{}'.format(ip, 'host-'+str(ip).replace('.', '-')) for ip in hosts]) + '\n').encode()
        )

        self.add_files([
            {
                'filesystem': 'root',
                'path': '/etc/hosts',
                'mode': 420, # 0644
                'contents': {
                    'source': 'data:,' + quote(hosts_content),
                    'compression': 'gzip'
                }
            }
        ])
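
For context, IPv4Network(...).hosts() yields every usable address in the subnet, excluding the network and broadcast addresses, so the generated file covers each possible node. An illustrative run on a made-up /29:

from ipaddress import IPv4Network

for ip in IPv4Network('192.168.0.0/29').hosts():
    print('{}\t{}'.format(ip, 'host-' + str(ip).replace('.', '-')))
# prints 192.168.0.1 host-192-168-0-1 through 192.168.0.6 host-192-168-0-6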
Project: mobai    Author: eguven    | Project source | File source
def serialize(gamestate):
        return gzip.compress(pickle.dumps(gamestate), compresslevel=1)
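
A hedged sketch of the inverse, which the excerpt does not include; note that unpickling is only safe on trusted data:

def deserialize(blob):
    # Sketch: gunzip, then unpickle.
    return pickle.loads(gzip.decompress(blob))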
Project: quilt    Author: quiltdata    | Project source | File source
def gzip_compress(data):
    """
    Compress binary data. Same as gzip.compress in Python 3.
    """
    buf = BytesIO()
    with gzip.GzipFile(fileobj=buf, mode='wb') as fd:
        fd.write(data)
    return buf.getvalue()
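
A quick sanity check (a sketch, assuming the excerpt's imports of gzip and BytesIO): apart from header metadata such as the modification time, both paths produce interchangeable gzip streams, so each must round-trip through gzip.decompress():

data = b'quilt payload'
assert gzip.decompress(gzip_compress(data)) == data
assert gzip.decompress(gzip.compress(data)) == data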
Project: Codex    Author: TomCrypto    | Project source | File source
def save(self, dataset):
        with open(dataset, 'wb') as datafile:
            datafile.write(gzip.compress(pickle.dumps((
                self.markers, self.vectors,
                self.classes, self.classif,
                self.threshold))))


Project: xphyle    Author: jdidion    | Project source | File source
def test_xopen_compressed_stream(self):
        # Try autodetect compressed
        with intercept_stdin(gzip.compress(b'foo\n'), is_bytes=True):
            with xopen(
                    STDIN, 'rt', compression=True, context_wrapper=True) as i:
                self.assertEqual(i.compression, 'gzip')
                self.assertEqual(i.read(), 'foo\n')
Project: maas    Author: maas    | Project source | File source
def test__raises_when_inner_encapsulation_is_not_bson(self):
        self.write_secret()
        payload = fernet_encrypt_psk(compress(b"\n\n"), raw=True)
        packet = _make_beacon_payload(payload=payload)
        with ExpectedException(
                InvalidBeaconingPacket, ".*beacon payload is not BSON.*"):
            read_beacon_payload(packet)
Project: pyoffers    Author: Stranger6667    | Project source | File source
def body(self, value):
        if self.is_gzip:
            value = gzip.compress(value)
        self.data['response']['body']['base64_string'] = base64.b64encode(value).decode()
Project: hocrviewer-mirador    Author: jbaiter    | Project source | File source
def _update_search_index(self, doc_id, autocomplete_min_count):
        # FIXME: This is a bit unwieldy and I'd prefer there was a nicely
        #        scalable in-SQL solution, but unfortunately keeping the
        #        term frequencies for each document in a table makes
        #        the database size explode, so gzipped json-dumped counters
        #        it is for now :/
        with self._db as cur:
            terms_before = Counter(dict(
                cur.execute("SELECT term, cnt FROM text_vocab").fetchall()))
            cur.execute(UPDATE_INDEX_SINGLE_DOCUMENT, {'document_id': doc_id})
            terms_after = Counter(dict(
                cur.execute("SELECT term, cnt FROM text_vocab").fetchall()))
            doc_terms = Counter(dict(
                (term, cnt_after - terms_before.get(term, 0))
                for term, cnt_after in terms_after.items()
                if cnt_after != terms_before.get(term)))
            # Purge terms below threshold to save on size
            to_purge = []
            for term, cnt in doc_terms.items():
                if cnt < autocomplete_min_count:
                    to_purge.append(term)
            for term in to_purge:
                del doc_terms[term]
            cur.execute(
                "INSERT INTO lexica (document_id, counter) VALUES (?, ?)",
                (doc_id, gzip.compress(json.dumps(doc_terms).encode('utf8'))))
Project: PyPlanet    Author: PyPlanet    | Project source | File source
def execute(self, method, *args):
        payload = dumps(args, methodname=method, allow_none=True)
        body = gzip.compress(payload.encode('utf8'))
        try:
            res = await self.loop.run_in_executor(None, self.__request, body)
            data, _ = loads(res.text, use_datetime=True)
            if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
                if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
                    raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
                self.retries = 0
                return data[0]
            raise DedimaniaTransportException('Invalid response from dedimania!')
        except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e:
            raise DedimaniaTransportException(e) from e
        except ConnectTimeout as e:
            raise DedimaniaTransportException(e) from e
        except DedimaniaTransportException:
            # Try to setup new session.
            self.retries += 1
            if self.retries > 5:
                raise DedimaniaTransportException('Dedimania didn\'t give the right answer after a few retries!')
            self.client = requests.session()
            try:
                await self.authenticate()
                return await self.execute(method, *args)
            except Exception as e:
                logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
                handle_exception(e, __name__, 'execute')
                raise DedimaniaTransportException('Could not retrieve data from dedimania!')
        except DedimaniaFault as e:
            if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
                try:
                    self.retries += 1
                    if self.retries > 5:
                        raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
                    await self.authenticate()
                    return await self.execute(method, *args)
                except Exception:
                    return
            logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
            handle_exception(e, __name__, 'execute', extra_data={
                'dedimania_retries': self.retries,
            })
            raise DedimaniaTransportException('Could not retrieve data from dedimania!')
Project: rpi-can-logger    Author: JonnoFTW    | Project source | File source
def export_files(sock):
    print("currently writing", writing_to['name'])
    for fname in sorted(glob(log_folder + "/*.json*")):
        if fname.endswith(writing_to['name']):
            print(fname, "is currently being written to")
            continue
        if fname.endswith('.done'):
            print("Skipping", fname)
            sock.send("$skipping={}!\n".format(fname))
            continue
        # we will send base64 encoded gzipped json
        with open(fname, 'rb') as infile:
            file_bytes = infile.read()
            if fname.endswith(".json"):
                if len(file_bytes) == 0:
                    print("Skipping empty file:", fname)
                    os.remove(fname)
                    continue
                json_gzip_bytes = gzip.compress(file_bytes)
            else:
                json_gzip_bytes = file_bytes
            json_gzip_base64 = base64.b64encode(json_gzip_bytes)
            try:

                if struct.unpack('<I', json_gzip_bytes[-4:])[0] == 0:
                    # don't send empty files
                    logging.warning("Skipping empty file: " + pathlib.Path(fname).name)
                    os.remove(fname)
                    continue
            except struct.error:
                logging.warning("Not a GZIP file: " + fname)
                continue
            msg = '$export={}={}!\n'.format(len(json_gzip_bytes), pathlib.Path(fname).name)
            print(msg, end='')
            sock.send(msg)
            n = 900
            to_send_str = str(json_gzip_base64, 'ascii')
            lines = [to_send_str[i:i + n] for i in range(0, len(to_send_str), n)]
            for line in lines:
                sock.send("$export={}\n".format(line))
            sock.send("$done\n")
            os.rename(fname, fname+'.done')
        sock.send('$export=done\n')
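
The struct.unpack check works because a gzip stream ends with the ISIZE trailer: four little-endian bytes holding the uncompressed length modulo 2**32, so a value of zero flags an empty payload. A quick illustration:

import gzip
import struct

assert struct.unpack('<I', gzip.compress(b'abc')[-4:])[0] == 3
assert struct.unpack('<I', gzip.compress(b'')[-4:])[0] == 0  # empty input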
Project: sadm    Author: prologin    | Project source | File source
async def spawn_server(config, rep_addr, pub_addr, nb_players, sockets_dir,
                       opts, file_opts):
    # Build command
    cmd = [config['path']['stechec_server'],
           "--rules", config['path']['rules'],
           "--rep_addr", rep_addr,
           "--pub_addr", pub_addr,
           "--nb_clients", str(nb_players),
           "--time", "3000",
           "--socket_timeout", "45000",
           "--dump", "/box/dump.json",
           "--verbose", "1"]

    if opts is not None:
        cmd += opts
    if file_opts is not None:
        fopts, tmp_files = create_file_opts(file_opts)
        cmd.extend(fopts)

    # Create the isolator
    limits = {'wall-time': config['timeout'].get('server', 400)}
    isolator = isolate.Isolator(
        limits, allowed_dirs=['/var', '/tmp', sockets_dir + ':rw'])
    async with isolator:
        # Run the isolated server
        await isolator.run(cmd, merge_outputs=True)

        # Retrieve the dump and gz-compress it
        try:
            dump_path = isolator.path / 'dump.json'
            with dump_path.open('rb') as dump:
                gzdump = gzip.compress(dump.read())
        except FileNotFoundError:
            raise_isolate_error("server: dump.json was not created.\n", cmd,
                                isolator)

    # Retrieve the output
    output = get_output(isolator)
    if isolator.isolate_retcode != 0:
        raise_isolate_error("server: exited with a non-zero code", cmd,
                            isolator)
    return output, gzdump
Project: UM3NetworkPrintingPlugin    Author: Ultimaker    | Project source | File source
def startPrint(self):
        try:
            self._send_gcode_start = time()
            self._progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), 0, False, -1)
            self._progress_message.addAction("Abort", i18n_catalog.i18nc("@action:button", "Cancel"), None, "")
            self._progress_message.actionTriggered.connect(self._progressMessageActionTrigger)
            self._progress_message.show()
            Logger.log("d", "Started sending g-code to remote printer.")
            self._compressing_print = True
            ## Mash the data into a single byte string
            byte_array_file_data = b""
            for line in self._gcode:
                if not self._compressing_print:
                    self._progress_message.hide()
                    return  # Stop trying to zip, abort was called.
                if self._use_gzip:
                    byte_array_file_data += gzip.compress(line.encode("utf-8"))
                    QCoreApplication.processEvents()  # Ensure that the GUI does not freeze.
                    # Pretend that this is a response, as zipping might take a bit of time.
                    self._last_response_time = time()
                else:
                    byte_array_file_data += line.encode("utf-8")

            if self._use_gzip:
                file_name = "%s.gcode.gz" % Application.getInstance().getPrintInformation().jobName
            else:
                file_name = "%s.gcode" % Application.getInstance().getPrintInformation().jobName

            self._compressing_print = False
            ##  Create multi_part request
            self._post_multi_part = QHttpMultiPart(QHttpMultiPart.FormDataType)

            ##  Create part (to be placed inside multipart)
            self._post_part = QHttpPart()
            self._post_part.setHeader(QNetworkRequest.ContentDispositionHeader,
                           "form-data; name=\"file\"; filename=\"%s\"" % file_name)
            self._post_part.setBody(byte_array_file_data)
            self._post_multi_part.append(self._post_part)

            url = QUrl("http://" + self._address + self._api_prefix + "print_job")

            ##  Create the QT request
            self._post_request = QNetworkRequest(url)

            ##  Post request + data
            self._post_reply = self._manager.post(self._post_request, self._post_multi_part)
            self._post_reply.uploadProgress.connect(self._onUploadProgress)

        except IOError:
            self._progress_message.hide()
            self._error_message = Message(i18n_catalog.i18nc("@info:status", "Unable to send data to printer. Is another job still active?"))
            self._error_message.show()
        except Exception as e:
            self._progress_message.hide()
            Logger.log("e", "An exception occurred in network connection: %s" % str(e))

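One detail worth noting in the loop above: compressing each g-code line separately and concatenating the results yields a multi-member gzip stream, which RFC 1952 decompressors, including Python's own gzip.decompress(), reassemble transparently. A quick check (sketch):

import gzip

parts = [gzip.compress(line.encode('utf-8')) for line in ('G28\n', 'G1 X10\n')]
assert gzip.decompress(b''.join(parts)) == b'G28\nG1 X10\n'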