Python gzip 模块,decompress() 实例源码

我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 gzip.decompress()。

项目:xuexi    作者:liwanlei    | 项目源码 | 文件源码
def show():
        """Look up weather for the city typed into entry widget ``e1`` and
        render the result in text widget ``e2``.

        NOTE(review): depends on module-level names ``e1``, ``e2``, ``tk``,
        ``urllib``, ``gzip`` and ``BeautifulSoup`` defined elsewhere in the
        file. Several string literals are mojibake in the original source;
        they are runtime strings and are left untouched.
        """
        city=e1.get()
        # The weather API returns gzip-compressed XML; URL-quote the city name.
        url='http://wthrcdn.etouch.cn/WeatherApi?city='+urllib.parse.quote(city)
        weather= urllib.request.urlopen(url).read()
        weather_data = gzip.decompress(weather).decode('utf-8')
        try:
                soup=BeautifulSoup(weather_data)
                wheater=soup.find_all('weather')
                # Build the display tuple from humidity/wind/high/low/type fields.
                Text = (('??:%s'%soup.shidu.text),('??:%s'%soup.fengli.text),wheater[0].high.text,wheater[0].low.text,('??:%s'%wheater[0].type.text))
                # Temporarily enable the read-only text widget so it can be updated.
                e2['state']= 'normal'
                e2.delete(1.0,tk.END)
                e2.insert(tk.END,Text)
                e2['state']= 'disabled'
        except:
                # Bare except: any network/parse failure falls back to an error message.
                Text ='??????????????????'
                e2['state']= 'normal'
                e2.delete(1.0,tk.END)
                e2.insert(tk.END,Text)
                e2['state']= 'disabled'
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def download_sifts_xml(pdb_id, outdir='', outfile=''):
    """Download the SIFTS mapping XML for a PDB ID from the EBI FTP server.

    Args:
        pdb_id: PDB identifier to fetch.
        outdir: Directory to write the file into (default: current dir).
        outfile: Optional explicit output filename; when empty, the name
            ``<pdb_id>.sifts.xml`` is derived automatically.

    Returns:
        Path of the (possibly pre-existing) decompressed XML file.

    """
    base_url = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/'
    gz_name = '{}.xml.gz'.format(pdb_id)

    if outfile:
        outfile = op.join(outdir, outfile)
    else:
        # Derive "<pdb_id>.sifts.xml" from the remote archive name.
        outfile = op.join(outdir, gz_name.split('.')[0] + '.sifts.xml')

    # Skip the download when a previous run already produced the file.
    if not op.exists(outfile):
        response = urlopen(base_url + gz_name)
        with open(outfile, 'wb') as fh:
            fh.write(gzip.decompress(response.read()))

    return outfile
项目:forget    作者:codl    | 项目源码 | 文件源码
def test_brotli_dynamic_cache(br_client):
    """A repeat request for a dynamic page should be served from the brotli
    cache (x-brotli-cache: HIT) with a br-encoded, round-trippable body.

    NOTE(review): relies on the ``br_client`` fixture defined elsewhere; the
    0.5 s sleep presumably lets a background task populate the cache after
    the priming request -- confirm against the middleware implementation.
    """
    from brotli import decompress
    from time import sleep

    # Prime the cache with a first request.
    br_client.get(
            '/',
            headers=[('accept-encoding', 'gzip, br')])

    sleep(0.5)

    resp = br_client.get(
            '/',
            headers=[('accept-encoding', 'gzip, br')])

    assert resp.headers.get('x-brotli-cache') == 'HIT'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
项目:n-gram    作者:sunshineclt    | 项目源码 | 文件源码
def saveContentOfURL(target_url):
    """Fetch an article page, extract its <p> paragraph text and append the
    cleaned text to the output file.

    NOTE(review): the original comments were mojibake and have been rewritten
    in English from the code's behavior. Depends on module-level names
    ``searched_url``, ``forEachMatch``, ``file_operator`` and ``cleanArticle``.
    """
    # Skip URLs that have already been processed.
    if target_url in searched_url:
        return
    try:
        # Issue a GET request for target_url.
        article_response = urllib.request.urlopen(target_url)
        raw_data = article_response.read()
        # Transparently gunzip the body when the server compressed it.
        if article_response.getheader("Content-Encoding") == "gzip":
            raw_data = gzip.decompress(raw_data)
        # Decode as gb2312 (Chinese site), ignoring undecodable bytes.
        article_data = raw_data.decode('gb2312', 'ignore')
        # Write every cleaned <p>...</p> match to the output file.
        forEachMatch(pattern_str='<p>(.*?)</p>', to_match_str=article_data,
                     func=lambda match: file_operator.writeFile(cleanArticle(match.group(1))))
    except urllib.error.URLError:
        print(target_url, 'is a wrong url')
    except BaseException as message:
        print(message)
    # Remember this URL so it is not fetched again.
    searched_url.add(target_url)


# (mojibake comment) -- appears to describe extracting/cleaning the text matched between <p></p> tags; see the function above.
项目:pynix    作者:adnelson    | 项目源码 | 文件源码
def import_to_store(self, compressed_nar):
        """Given a compressed NAR, extract it and import it into the nix store.

        :param compressed_nar: The bytes of a NAR, compressed.
        :type  compressed_nar: ``bytes``

        :return: The resulting store path (whatever ``import_to_store`` on
                 the export object returns).
        """
        # Normalize once; self.compression names the codec the cache used.
        # (The original called .lower() twice; the docstring also mis-typed
        # the payload as ``str`` -- it is bytes.)
        compression = self.compression.lower()
        if compression in ("xz", "xzip"):
            data = lzma.decompress(compressed_nar)
        elif compression in ("bz2", "bzip2"):
            data = bz2.decompress(compressed_nar)
        else:
            # Anything else (including "gzip"/"gz") is assumed to be gzip data.
            data = gzip.decompress(compressed_nar)

        # Once extracted, convert it into a nix export object and import.
        export = self.nar_to_export(data)
        imported_path = export.import_to_store()
        # Return the path instead of dropping it (original left it unused).
        return imported_path
项目:repo-checker    作者:1dot75cm    | 项目源码 | 文件源码
def get_page(self, _url):
        ''' Asynchronously fetch a page and return its body.

        NOTE(review): the original docstring was mojibake and claimed
        "return str"; in fact the url_type "4" branch and the OSError
        fallback return ``bytes``. Depends on module-level ``semaphore``,
        ``opts``, ``random`` and ``aiohttp``.
        '''

        header = { 'Accept-Encoding': 'gzip' }
        # Pick a random User-Agent, unless one is forced via global opts.
        header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist)-1)]
        if opts['user_agent']: header['User-Agent'] = opts['user_agent']

        # The global semaphore bounds the number of concurrent requests.
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', _url, headers = header)
            page = yield from response.read()

        try:
            if self.url_type == "2": return "None Content"
            # Type "4" pages are gb2312-encoded; re-encode them as UTF-8 bytes.
            if self.url_type == "4": return gzip.decompress(page).decode('gb2312').encode('utf-8')
            else:                    return gzip.decompress(page)
        except OSError:
            # Body was not actually gzipped; return it unchanged.
            return page
项目:Seedbox-Statistics-For-InfluxDB    作者:barrycarey    | 项目源码 | 文件源码
def _process_response(self, res):
        """
        Take the response object and return JSON

        :param res: HTTP response object exposing ``headers`` and ``read()``
        :return: parsed JSON (dict/list)
        """
        # Bug fix: use .get() so a response without a Content-Encoding header
        # no longer raises KeyError (the original indexed res.headers directly).
        encoding = res.headers.get('Content-Encoding', '')
        if encoding == 'gzip':
            self.send_log('Detected gzipped response', 'debug')
            raw_output = gzip.decompress(res.read()).decode('utf-8')
        else:
            self.send_log('Detected other type of response encoding: {}'.format(encoding), 'debug')
            raw_output = res.read().decode('utf-8')

        json_output = json.loads(raw_output)

        return json_output
项目:tm-manifesting    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def _load_data():
    """Populate the module-level ``_data`` dict with Debian package metadata.

    Downloads and parses ``Packages.gz`` for every configured area/arch of
    the configured mirror, mapping package name -> deb822 paragraph.
    Depends on module-level ``BP`` (flask blueprint/config), ``HTTP_REQUESTS``,
    ``BytesIO`` and ``Packages`` (python-debian).
    """
    # https://github.com/raumkraut/python-debian/blob/master/README.deb822

    global _data
    mirror = BP.config['L4TM_MIRROR']
    release = BP.config['L4TM_RELEASE']
    # The %% placeholders survive this interpolation for area/arch below.
    repo = '%s/dists/%s/%%s/%%s/Packages.gz' % (mirror, release)

    _data = {}
    for area in BP.config['L4TM_AREAS']:
        for arch in ('binary-all', 'binary-arm64'):
            BP.logger.info('Loading/processing %s/%s/Packages.gz...' % (
                area, arch))
            pkgarea = repo % (area, arch)
            pkgresp = HTTP_REQUESTS.get(pkgarea)
            if pkgresp.status_code != 200:
                # Missing arch is tolerated; just skip it.
                BP.logger.error('%s not found' % arch)
                continue
            BP.logger.debug('Uncompressing %s bytes' % pkgresp.headers['content-length'])

            unzipped = gzip.decompress(pkgresp.content) # bytes all around
            BP.logger.debug('Parsing %d bytes of package data' % len(unzipped))
            unzipped = BytesIO(unzipped)    # the next step needs read()
            tmp = [ src for src in Packages.iter_paragraphs(unzipped) ]
            # Last one wins if a package name appears in multiple areas/arches.
            _data.update(dict((pkg['Package'], pkg) for pkg in tmp))
项目:hh-page-classifier    作者:TeamHG-Memex    | 项目源码 | 文件源码
def extract_features(doc):
    """Return the extracted text and detected language of a crawled document.

    Regular samples store base64-encoded gzipped HTML; extra-sampled docs
    (and legacy uncompressed records) keep plain HTML, hence the fallback.
    """
    raw_html = doc['html'] or ''
    if not doc_is_extra_sampled(doc):
        try:
            raw_html = gzip.decompress(base64.b64decode(raw_html)).decode('utf8')
        except Exception:
            pass  # support not compressed html too
    extracted = html_text.extract_text(raw_html)
    try:
        language = langdetect.detect(extracted)
    except LangDetectException:
        # Detection fails on empty/ambiguous text; report no language.
        language = None
    return {
        'text': extracted,
        'language': language,
    }
项目:defuse_division    作者:lelandbatey    | 项目源码 | 文件源码
def msg_recv(conn, sendfunc, closefunc):
    '''
    Function msg_recv reads a SEP-delimited series of bytes from `conn`, which
    is a socket. Each complete message is gzip-decompressed, de-serialized
    from JSON, and passed to `sendfunc`.
    `closefunc` is called if/when the socket `conn` is closed.

    NOTE(review): the original docstring said "null-delimited"; the code
    actually splits on the module-level SEP constant.
    '''
    buf = bytes()
    while True:
        try:
            data = conn.recv(8192)

            # No data means the connection is closed
            if not data:
                closefunc()
                return

            # Prepend any partial message left over from the previous recv.
            inbuf = buf + data
            if SEP in inbuf:
                parts = inbuf.split(SEP)
                # logging.debug("Length of parts: {}".format(len(parts)))
                # Dispatch every complete message; keep the trailing fragment
                # (parts[-1]) as the new buffer.
                tosend = [parts[0]]
                for p in parts[1:-1]:
                    tosend.append(p)
                buf = parts[-1]
                for msg in tosend:
                    m = gzip.decompress(msg)
                    m = m.decode('utf-8')
                    # Truncate long messages in the debug log.
                    logging.debug("Msg: {}".format(m[:150]+'...' if len(m) > 150 else m))
                    obj = json.loads(m)
                    sendfunc(obj)
            else:
                buf += data
        except Exception as e:
            # Keep the receive loop alive on malformed messages.
            logging.exception(e)
项目:BitBot    作者:crack00r    | 项目源码 | 文件源码
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
        """Unpack a gzip-packed Telegram message and re-dispatch it.

        The payload wraps another serialized message: skip the constructor
        code, gunzip the bytes and feed them back through _process_msg.
        """
        self._logger.debug('Handling gzip packed data')
        reader.read_int(signed=False)  # code
        packed_data = reader.tgread_bytes()
        unpacked_data = gzip.decompress(packed_data)

        with BinaryReader(unpacked_data) as compressed_reader:
            return self._process_msg(
                msg_id, sequence, compressed_reader, updates)

    # endregion
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def _lzma(self):
        '''LZMA processor

        Decompress the current attachment and re-scan the result via
        process_payload; any failure (bad archive, etc.) marks the
        attachment as dangerous. The bare except is this file's deliberate
        best-effort style.
        '''
        try:
            archive = lzma.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except:
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def _bzip(self):
        '''BZip2 processor

        Decompress the current attachment and re-scan the result via
        process_payload; any failure marks the attachment as dangerous.
        Mirrors the structure of the sibling ``_lzma`` processor.
        '''
        try:
            archive = bz2.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except:
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def download_biological_assemblies(pdb_id, outdir):
    """Downloads biological assembly file from:
    `ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/`

    Args:
        pdb_id (str): PDB ID whose assembly should be fetched
        outdir (str): Output directory of the decompressed assembly

    Returns:
        str: Path of the downloaded file, or None when no directory entry
        matched ``pdb_id``.

    """

    # TODO: not tested yet
    if not op.exists(outdir):
        raise ValueError('{}: output directory does not exist'.format(outdir))

    # Assemblies are sharded by the middle two characters of the PDB ID.
    folder = pdb_id[1:3]
    server = 'ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/{}/'.format(folder)
    html_folder = urlopen(server).readlines()
    for line in html_folder:
        if pdb_id in str(line).strip():
            file_name = '%s' % (pdb_id + str(line).strip().split(pdb_id)[1].split('\r\n')[0])
            outfile_name = file_name.replace('.', '_')
            outfile_name = outfile_name.replace('_gz', '.pdb')
            # Renamed from 'f' to avoid shadowing the file handle below.
            response = urlopen(op.join(server, file_name))
            # 16 + MAX_WBITS tells zlib to expect a gzip-framed stream.
            decompressed_data = zlib.decompress(response.read(), 16 + zlib.MAX_WBITS)
            with open(op.join(outdir, outfile_name), 'wb') as f:
                f.write(decompressed_data)
                # (removed redundant f.close(); the with-block closes it)
            # Bug fix: the original logged a literal '{}' -- .format was missing.
            log.debug('{}: downloaded biological assembly'.format(pdb_id))
            return op.join(outdir, outfile_name)
项目:postmarker    作者:Stranger6667    | 项目源码 | 文件源码
def decode(value, decompress=False):
    """
    Decodes response from Base64 encoded string.

    When ``decompress`` is true the payload is gunzipped before being
    parsed as JSON.
    """
    payload = base64.b64decode(value)
    payload = gzip.decompress(payload) if decompress else payload
    return json.loads(payload.decode())
项目:Telethon    作者:LonamiWebs    | 项目源码 | 文件源码
def read(reader):
        """Read a GzipPacked construct: assert the constructor ID matches,
        then return the gunzipped payload bytes.

        NOTE(review): presumably a @staticmethod on GzipPacked -- the
        decorator and class header are outside this view.
        """
        assert reader.read_int(signed=False) == GzipPacked.CONSTRUCTOR_ID
        return gzip.decompress(reader.tgread_bytes())
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get_vxstream_report(sha256, envid, type_):
    """Fetch a VxStream sandbox result for a sample.

    404s (via first_or_404) when the sample is unknown. XML/HTML/BIN/PCAP
    results arrive gzipped and are decompressed before returning; other
    types are passed through as-is.
    """
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream Sandbox API Client'}
    params = {'type': type_, 'environmentId': envid}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get_fireeye_report(sha256, envid, type):
    """FireEye report endpoint -- not implemented.

    NOTE(review): the unconditional ApiException(501) makes everything
    below it unreachable; the dead code mirrors the VxStream variant and
    looks kept as a template for a future implementation.
    """
    raise ApiException({}, 501)
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'FireEye Sandbox API Client'}
    params = {'type': type, 'environmentId': envid}
    vx = fireeye.api.get('result/{}'.format(sha256),
                         params=params, headers=headers)
    if type in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get_cp_vxstream_report(sha256, envid, type_):
    """Customer-portal variant of the VxStream report fetch.

    Same as ``get_vxstream_report`` but additionally scopes the sample
    lookup to the authenticated user (``g.user``).
    """
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256, user_id=g.user.id).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream Sandbox API Client'}
    params = {'type': type_, 'environmentId': envid}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
项目:google-crawler    作者:xibowang    | 项目源码 | 文件源码
def fetch_page(query):
    """Fetch a search result page for ``query`` and return the gunzipped body.

    NOTE(review): assumes the server always honours the gzip
    Accept-Encoding -- gzip.decompress would raise OSError on an identity
    response; confirm before relying on this. The prints look like
    leftover debugging. Depends on module-level REQUEST_URL/BASE_URL and
    ``_random_user_agent``.
    """
    url = REQUEST_URL.format(BASE_URL, query)
    request = urllib.request.Request(url)
    # Spoof a browser-like request to avoid bot detection.
    request.add_header('User-agent', _random_user_agent())
    request.add_header('connection', 'keep-alive')
    request.add_header('Accept-Encoding', 'gzip, deflate, sdch, br')
    request.add_header('referer', REQUEST_URL.format(BASE_URL, ""))
    print(url)
    response = urllib.request.urlopen(request)

    data = response.read()
    print(type(data))
    return gzip.decompress(data)
项目:google-crawler    作者:xibowang    | 项目源码 | 文件源码
def read_bytes_data():
    """Read the locally saved ``sample.bytes`` file and gunzip its contents.

    NOTE(review): the decompressed data is currently unused -- the original
    left its BeautifulSoup parsing experiments commented out; they have
    been dropped here. Returns None.
    """
    with open('sample.bytes', 'rb') as file:
        content = file.read()
        data = gzip.decompress(content)
项目:mugen    作者:PeterDing    | 项目源码 | 文件源码
def decode_gzip(content):
    """Decompress a gzip-encoded payload.

    Args:
        content: gzip-compressed ``bytes`` (asserted).

    Returns:
        The decompressed ``bytes``.
    """
    assert isinstance(content, bytes)
    return gzip.decompress(content)
项目:mugen    作者:PeterDing    | 项目源码 | 文件源码
def decode_deflate(content):
    """Inflate *content*, accepting both zlib-wrapped and raw deflate data.

    Servers disagree on whether "deflate" means a zlib stream or a raw
    deflate stream; try the zlib framing first, then fall back to raw.
    """
    assert isinstance(content, bytes)
    try:
        return zlib.decompress(content)
    except Exception:
        # Negative wbits => raw deflate stream without zlib header/checksum.
        return zlib.decompress(content, -zlib.MAX_WBITS)
项目:forget    作者:codl    | 项目源码 | 文件源码
def test_brotli_static_gzip(br_client):
    """A client that only accepts gzip gets a gzip-encoded static file whose
    decompressed body still contains the expected content."""
    from gzip import decompress

    gzip_resp = br_client.get(
            '/static/bee.txt',
            headers=[('accept-encoding', 'gzip')])
    assert gzip_resp.headers.get('content-encoding') == 'gzip'

    assert BEE_SCRIPT in decompress(gzip_resp.data)
项目:forget    作者:codl    | 项目源码 | 文件源码
def test_brotli_static_br(br_client):
    """A client that accepts brotli gets a br-encoded static file whose
    decompressed body still contains the expected content."""
    from brotli import decompress

    br_resp = br_client.get(
            '/static/bee.txt',
            headers=[('accept-encoding', 'gzip, br')])
    assert br_resp.headers.get('content-encoding') == 'br'

    assert BEE_SCRIPT in decompress(br_resp.data)
项目:forget    作者:codl    | 项目源码 | 文件源码
def test_brotli_dynamic(br_client):
    """The first request for a dynamic page is brotli-compressed on the fly
    (cache MISS) and its decompressed body matches the expected content."""
    from brotli import decompress

    resp = br_client.get(
            '/',
            headers=[('accept-encoding', 'gzip, br')])

    assert resp.headers.get('x-brotli-cache') == 'MISS'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
项目:miracle    作者:mozilla    | 项目源码 | 文件源码
def test_main(bucket, crypto, processor):
    """End-to-end check of the session upload pipeline.

    Each processed record should land in the bucket under the expected
    date/user prefix as a gzipped JSON object; a second upload should add
    a second object rather than overwrite the first.
    NOTE(review): relies on the bucket/crypto/processor fixtures and the
    module-level ``_payload``, ``make_record`` and ``main`` helpers.
    """
    _p = functools.partial(_payload, crypto)
    today = date.today()
    user = 'foo'
    prefix = 'v2/sessions/%s/%s/%s/' % (today.year, today.month, user)

    records = [
        make_record('2', _p({'user': user, 'extra': 1})),
    ]
    assert main(processor, records) == ('2', 0)

    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 1
    assert objs[0].key.endswith('.json.gz')

    # The stored object must be gzipped JSON with the right metadata.
    obj = bucket.get(objs[0].key)
    assert obj['ContentEncoding'] == 'gzip'
    assert obj['ContentType'] == 'application/json'
    body = obj['Body'].read()
    obj['Body'].close()

    body = json.loads(gzip.decompress(body).decode('utf-8'))
    assert body == {'user': user, 'extra': 1}

    # Upload a second time
    records = [
        make_record('3', _p({'user': user, 'extra': 2})),
    ]
    assert main(processor, records) == ('3', 0)

    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 2
项目:floto    作者:babbel    | 项目源码 | 文件源码
def _decompress_result(self, compressed_result):
        result_bytes = bytes([int(c, 16) for c in compressed_result.split('x')])
        result = gzip.decompress(result_bytes).decode()
        result = json.loads(result)
        return result
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def decompress(
        data,
    ):
        """Gunzip ``data`` and return the resulting bytes."""
        return gzip.decompress(data)
项目:analytics-platform-ops    作者:ministryofjustice    | 项目源码 | 文件源码
def log_file_s3_object(record):
    """Fetch and gunzip the S3 log object referenced by an S3 event record.

    NOTE(review): ``s3`` is a module-level client defined elsewhere; the
    record follows the S3 event notification schema (bucket name + key).
    """
    return gzip.decompress(s3.get_object(
        Bucket=record['s3']['bucket']['name'],
        Key=record['s3']['object']['key'])['Body'].read())
项目:bcloud    作者:wangYanJava    | 项目源码 | 文件源码
def urlopen(url, headers={}, data=None, retries=RETRIES, timeout=TIMEOUT):
    '''Open an HTTP request and return the response object, retrying on error.

    headers is a dict of extra HTTP headers (User-Agent, Referer, ...)
    merged over the module-level default_headers. gzip/deflate response
    bodies are transparently decompressed and stored on the returned
    object as ``req.data``. Returns None after all retries fail.

    NOTE(review): original docstring was mojibake; rewritten from the code.
    The mutable default ``headers={}`` is only read here, but is a latent
    pitfall.
    '''
    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    opener = urllib.request.build_opener(ForbiddenHandler)
    opener.addheaders = [(k, v) for k,v in headers_merged.items()]

    for i in range(retries):
        try:
            req = opener.open(url, data=data, timeout=timeout)
            encoding = req.headers.get('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                # Negative wbits => raw deflate stream without zlib header.
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except OSError:
            logger.error(traceback.format_exc())

        except:
            # Bare except: log and keep retrying on any other error.
            logger.error(traceback.format_exc())

    return None
项目:bcloud    作者:wangYanJava    | 项目源码 | 文件源码
def post_multipart(url, headers, fields, files, retries=RETRIES):
    """POST a multipart/form-data request, retrying up to ``retries`` times.

    Returns the HTTPResponse with the (gzip/deflate-decompressed) body
    attached as ``req.data``, or None when every attempt fails. Depends on
    module-level ``encode_multipart_formdata`` and ``default_headers``.
    """
    content_type, body = encode_multipart_formdata(fields, files)
    schema = urllib.parse.urlparse(url)

    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    headers_merged['Content-Type'] = content_type
    headers_merged['Content-length'] = str(len(body))

    for i in range(retries):
        try:
            h = http.client.HTTPConnection(schema.netloc)
            h.request('POST', url, body=body, headers=headers_merged)
            req = h.getresponse()
            encoding = req.getheader('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                # Negative wbits => raw deflate stream without zlib header.
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except OSError:
            logger.error(traceback.format_exc())
        except:
            # Bare except: log and keep retrying on any other error.
            logger.error(traceback.format_exc())
            #return None
    return None
项目:amprolla    作者:parazyd    | 项目源码 | 文件源码
def rehash_release(_filelist, fdesc, rmstr):
    """
    Calculates checksums of a given filelist and writes them to the given
    file descriptor. Takes rmstr as the third argument, which is a string to
    remove from the path of the hashed file when writing it to a file.

    Relies on module-level helpers: ``checksums`` (list of dicts with a
    display ``name`` and hash constructor ``f``), ``info``, ``lzma_comp``
    and ``gzip_decomp``.
    """
    info('Hashing checksums')
    for csum in checksums:
        fdesc.write('%s:\n' % csum['name'])
        for i in _filelist:
            if isfile(i):
                # Hash the file as-is. Context managers close the handles
                # promptly (the original leaked every opened file).
                with open(i, 'rb') as fh:
                    cont = fh.read()
                fdesc.write(' %s %8s %s\n' % (csum['f'](cont).hexdigest(),
                                              getsize(i),
                                              i.replace(rmstr+'/', '')))
            elif i.endswith('.xz') and isfile(i.replace('.xz', '.gz')):
                # Missing .xz: recompress the .gz sibling with lzma, hash that.
                with open(i.replace('.xz', '.gz'), 'rb') as fh:
                    xzstr = lzma_comp(fh.read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](xzstr).hexdigest(),
                                              len(xzstr),
                                              i.replace(rmstr+'/', '')))
            elif not i.endswith('.gz') and isfile(i+'.gz'):
                # Missing plain file: hash the decompressed .gz sibling.
                with open(i+'.gz', 'rb') as fh:
                    uncomp = gzip_decomp(fh.read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](uncomp).hexdigest(),
                                              len(uncomp),
                                              i.replace(rmstr+'/', '')))
    return
项目:PythonBasicDemo    作者:actanble    | 项目源码 | 文件源码
def ungzip(data):
    """Best-effort gunzip: return decompressed bytes, or ``data`` unchanged
    when it is not gzip-compressed.

    NOTE(review): the print literals are mojibake in the original source;
    they are runtime strings and are left untouched.
    """
    try:        # decompression fails for non-gzip input
        print('????.....')
        data = gzip.decompress(data)
        print('????!')
    except:
        print('????, ????')
    return data
项目:PythonBasicDemo    作者:actanble    | 项目源码 | 文件源码
def ungzip(data):
    """Best-effort gunzip: return decompressed bytes, or ``data`` unchanged
    when it is not gzip-compressed.

    NOTE(review): duplicate of the sibling ``ungzip``; mojibake print
    literals are runtime strings and are left untouched.
    """
    try:        # decompression fails for non-gzip input
        print('????.....')
        data = gzip.decompress(data)
        print('????!')
    except:
        print('????, ????')
    return data
项目:office_helper    作者:mikumikulch    | 项目源码 | 文件源码
def __ungzip(self, data):
        """Best-effort gunzip of ``data``; returns the input unchanged on
        failure.

        Failures are logged via the module-level ``logger`` (the message
        text is mojibake in the original source and left untouched).
        """
        try:  # decompression fails for non-gzip input
            # print('????.....')
            data = gzip.decompress(data)
            # print('????!')
        except:
            logger.error('?????????????????')
        # print('????, ????')
        return data
项目:xuexi    作者:liwanlei    | 项目源码 | 文件源码
def Get_weather(city):
    """Query the etouch weather API for ``city`` and return the parsed
    BeautifulSoup document (the API responds with gzipped XML)."""
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    weather = urllib.request.urlopen(url).read()
    weather_data = gzip.decompress(weather).decode('utf-8')
    soup = BeautifulSoup(weather_data)
    return soup
项目:xuexi    作者:liwanlei    | 项目源码 | 文件源码
def get_huochepiao(url1):
    """Fetch Qunar train-ticket search results from ``url1`` and print each
    train's stations, times, duration and seat prices.

    NOTE(review): the response is a compressed JSONP-style payload; the
    [46:-2] slice strips the callback wrapper before json.loads -- fragile,
    confirm against the live endpoint. Most printed literals (and the
    Cookie header) are mojibake/session data from the original source and
    are runtime strings, left byte-identical.
    """
    headers={'Accept':'text/javascript, application/javascript, application/ecmascript, application/x-ecmascript, */*; q=0.01',
    'Accept-Encoding':'gzip, deflate, sdch',
    'Accept-Language':'zh-CN,zh;q=0.8',
    'Connection':'keep-alive',
    'Cookie':'QN99=3733; QN1=eIQiP1dEAASbOq51LyeNAg==; QunarGlobal=192.168.31.105_-4c70ffc_154e15d8dc8_2de|1464074245311; ag_fid=AsWRT9vZYYLJ23qF; __ag_cm_=1464074247005; QN269=906FD473217F11E6B393C4346BAC1530; PHPSESSID=epq85mhbfeg12b3t6q8rkic702; QN25=5cfd26dc-8670-44ec-aafc-94923235a6fc-9f992f90; QN42=zbua0851; _q=U.ryzxozi0081; _t=24542281; csrfToken=QxdjaQNPcDnkhaMMMwxbGbpwWeKXNtET; _s=s_2QHWQF6G6AI3QWPVO6UBTX2LZE; _v=-8JqPkXGW-Vsgcr1koBOn0mWlXDIk6gdgRyueLvJJO3C0Ru2ALnLJw7DFu6Y6FUrAWf8tU-PZtj1Dc2l_o50sSp6YyMnlDQ4dVpPmDi0QMz_XOGK0loLwpTeCoe0wvE0aHJKPGHtArx4jlrdtgWSX9O2IfI8qnNi3-wHXEY6rVEN; QN44=ryzxozi0081; _i=RBTjeomvkDExEx-xsOrmQxSvMXex; _vi=7AZYnlCS385W7Z8-IQdjp5sbVR1PFm8kL0-Qi39HR1-wvJEvexvDP9L5vcTyfiBM9AUeWbCi1osGa2UEs6aMSu-IrejFGqde7L7Y04s8z115RVvdF0h-VmYrWg5Ni-nNZVw8xz3rFA7Jcv-ASn9aff2fhGbtS_0JFDKWQkwggWMx; Hm_lvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472535537; Hm_lpvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472541016; QN268=|1472541016285_e1523dd1fcbd8c01',
    'Host':'train.qunar.com',
    'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.80 Safari/537.36',
    'X-Requested-With':'XMLHttpRequest'
    }
    req=urllib.request.Request(url1,headers=headers)
    html=urllib.request.urlopen(req).read()
    # 16 + MAX_WBITS tells zlib to expect a gzip-framed stream.
    decompressed_data = zlib.decompress(html ,16+zlib.MAX_WBITS)
    text = decompressed_data.decode("utf-8")
    soup=BeautifulSoup(text)
    # Strip the JSONP callback wrapper, then parse the JSON payload.
    m=str(soup)[46:-2]
    htm=json.loads(m)
    try:
        lines=htm['data']['s2sBeanList']
        print('??????%s??'%len(lines))
        i =1
        # Print one block per train, then iterate its seat classes.
        for item in lines:
            print("-------------?%s?????-------------"%i)
            print('--------------??????--------------')
            print('?????%s'%item['dptStationName']),\
            print('?????%s'%item['arrStationName']),\
            print('??:%s'%item['trainNo']),\
            print('????:%s'%item['dptTime']),\
            print('??%s'%item['arrTime']),\
            print('???%s'%item['extraBeanMap']['interval']),\
            print('?????%s'%item['extraBeanMap']['ticketType']),print('?????%s'%item['extraBeanMap']['stationType'])
            b=item['seats']
            for key ,value in b.items():
                    print('?????%s,???%s,??:%s'%(key,value['price'],value['count']))
            i+=1
    except:
        # Bare except: any schema change / empty result prints a fallback.
        print('????????????????????????')
#--------??????
项目:django-brotli    作者:illagrenan    | 项目源码 | 文件源码
def test_middleware_compress_response(self):
        """BrotliMiddleware should brotli-compress the response for a client
        that accepts br; decompressing must round-trip the original body."""
        fake_request = FakeRequestAcceptsBrotli()
        response_content = UTF8_LOREM_IPSUM_IN_CZECH
        fake_response = FakeResponse(content=response_content)

        brotli_middleware = BrotliMiddleware()
        brotli_response = brotli_middleware.process_response(fake_request, fake_response)

        decompressed_response = brotli.decompress(data=brotli_response.content)  # type: bytes
        self.assertEqual(response_content, decompressed_response.decode(encoding='utf-8'))
项目:django-brotli    作者:illagrenan    | 项目源码 | 文件源码
def test_etag_is_updated_if_present(self):
        """Compressing a response that carries an ETag must rewrite the ETag
        (appending the ';br' marker) while still round-tripping the body."""
        fake_request = FakeRequestAcceptsBrotli()
        # Enlarge the body so it clears any minimum-size compression threshold.
        response_content = UTF8_LOREM_IPSUM_IN_CZECH * 5
        fake_etag_content = "\"foo\""
        fake_response = FakeResponse(content=response_content, headers={"ETag": fake_etag_content})

        self.assertEqual(fake_response['ETag'], fake_etag_content)

        brotli_middleware = BrotliMiddleware()
        brotli_response = brotli_middleware.process_response(fake_request, fake_response)

        decompressed_response = brotli.decompress(data=brotli_response.content)  # type: bytes
        self.assertEqual(response_content, decompressed_response.decode(encoding='utf-8'))

        self.assertEqual(brotli_response['ETag'], '"foo;br\\"')
项目:django-brotli    作者:illagrenan    | 项目源码 | 文件源码
def test_middleware_wont_compress_if_response_is_already_compressed(self):
        """A response already gzipped by Django's GZipMiddleware must pass
        through BrotliMiddleware untouched (still gzip, not double-coded)."""
        fake_request = FakeRequestAcceptsBrotli()
        response_content = UTF8_LOREM_IPSUM_IN_CZECH
        fake_response = FakeResponse(content=response_content)

        brotli_middleware = BrotliMiddleware()
        django_gzip_middleware = GZipMiddleware()

        # gzip first, then run brotli over the already-compressed response.
        gzip_response = django_gzip_middleware.process_response(fake_request, fake_response)
        brotli_response = brotli_middleware.process_response(fake_request, gzip_response)

        self.assertEqual(response_content, gzip.decompress(brotli_response.content).decode(encoding='utf-8'))
项目:GLaDOS2    作者:TheComet    | 项目源码 | 文件源码
def get_json_response(url):
    """GET ``url``, gunzip the body and parse it as JSON.

    NOTE(review): assumes the server always gzips the response --
    gzip.decompress raises on an identity-encoded body; confirm.
    """
    with urllib.request.urlopen(url) as response:
        body = gzip.decompress(response.read())
    return json.loads(body.decode('utf-8'))
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def ungzip(data,url):
    """Best-effort gunzip: return decompressed bytes, or ``data`` unchanged
    when it is not gzip-compressed. ``url`` is only used in log prints.

    NOTE(review): the print literals are mojibake in the original source;
    they are runtime strings and are left untouched.
    """
    try:
        print(url,"?????...")
        data = gzip.decompress(data)
        print(url,"????...")
    except:
        print(url,"?????????...")
    return data
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def ungzip(data,url):
    """Best-effort gunzip: return decompressed bytes, or ``data`` unchanged
    when it is not gzip-compressed. Variant with the progress prints
    commented out; ``url`` is unused at runtime.
    """
    try:
        #print(url,"?????...")
        data = gzip.decompress(data)
        #print(url,"????...")
    except:
        #print(url,"?????????...")
        pass
    return data
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def ungzip(data,url):
    """Best-effort gunzip: return decompressed bytes, or ``data`` unchanged
    when it is not gzip-compressed. ``url`` is only used in log prints.

    NOTE(review): duplicate of a sibling helper; mojibake print literals
    are runtime strings and are left untouched.
    """
    try:
        print(url,"?????...")
        data = gzip.decompress(data)
        print(url,"????...")
    except:
        print(url,"?????????...")
    return data
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def ungzip(data,url):
    """Best-effort gunzip: return the decompressed bytes, or ``data``
    unchanged when it is not gzip-compressed.

    ``url`` is unused; kept only for signature parity with the sibling
    logging variants of this helper.
    """
    try:
        return gzip.decompress(data)
    except:
        # Not gzip data -- hand the payload back untouched.
        return data
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def ungzip(data):
    """Best-effort gunzip: return decompressed bytes, or ``data`` unchanged
    when it is not gzip-compressed.

    NOTE(review): the failure print literal is mojibake in the original
    source; it is a runtime string and left untouched.
    """
    try:
        #print("?????...")
        data = gzip.decompress(data)
        #print("????...")
    except:
        print("?????????...")
    return data

#CSDN???
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def verify_unit_response(zip_ext_file, min_lines):
    """Assert that a gzipped systemd-unit log stored inside a ZIP archive
    decompresses to at least ``min_lines`` lines of text."""
    assert isinstance(zip_ext_file, zipfile.ZipExtFile)
    unit_output = gzip.decompress(zip_ext_file.read())
    line_count = len(unit_output.decode().split('\n'))
    assert line_count >= min_lines, 'Expect at least {} lines. Full unit output {}'.format(
        min_lines, unit_output)
项目:IoT-Client    作者:suquark    | 项目源码 | 文件源码
def resolve_msg(msg_bytes):
    """Decrypt, gunzip and JSON-parse an incoming IoT message.

    Returns the message dict only when its ``proto`` field is ``'iddp'``;
    returns None (after logging) for anything invalid. Depends on the
    module-level ``crypto`` helper and ``simplejson``.
    """
    try:
        msg = simplejson.loads(gzip.decompress(crypto.decrypt(msg_bytes)))
        return msg if msg['proto'] == 'iddp' else None
    except Exception as e:
        logging.error("Received an invalid message: %s" % str(e))
        return None


# iot_force_download_msg = {'proto': 'iddp', 'role': 'force_download', 'routine': ':19001/your_file'}
项目:etlpy    作者:ferventdesert    | 项目源码 | 文件源码
def ungzip(data):
    """Gunzip ``data`` and return the decompressed bytes."""
    return gzip.decompress(data)