Python pycurl 模块,HEADER 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用pycurl.HEADER

项目:Instagram-API    作者:danleyb2    | 项目源码 | 文件源码
def request(self, endpoint, post=None):
        buffer = BytesIO()

        ch = pycurl.Curl()
        ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        ch.setopt(pycurl.HEADER, True)
        ch.setopt(pycurl.VERBOSE, False)
        ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
        ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))

        if post is not None:
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, post)

        if self.proxy:
            ch.setopt(pycurl.PROXY, self.proxyHost)
            if self.proxyAuth:
                ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

        ch.perform()
        resp = buffer.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
        header = resp[0: header_len]
        body = resp[header_len:]

        ch.close()

        if self.debug:
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print("DATA: " + str(post))
            print("RESPONSE: " + body)

        return [header, json_decode(body)]
项目:falsy    作者:pingf    | 项目源码 | 文件源码
def setup_curl_for_post(c, p, data_buf, headers=None, share=None):
    setup_curl_basic(c, p, data_buf, headers, share)
    httpheader = p.get('httpheader', ['Accept: application/json', "Content-type: application/json"])
    if httpheader:
        # c.setopt(pycurl.HEADER, p.get('header', 1))
        c.setopt(pycurl.HTTPHEADER, httpheader)
    post301 = getattr(pycurl, 'POST301', None)
    if post301 is not None:
        # Added in libcurl 7.17.1.
        c.setopt(post301, True)
    c.setopt(pycurl.POST, 1)
    postfields = p.get('postfields')
    if postfields:
        postfields = json.dumps(postfields, indent=2, ensure_ascii=False)
        c.setopt(pycurl.POSTFIELDS, postfields)
    return c
项目:Instagram-API    作者:danleyb2    | 项目源码 | 文件源码
def request(self, endpoint, headers=None, post=None, first=True):
        buffer = BytesIO()

        ch = pycurl.Curl()

        ch.setopt(pycurl.URL, endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        ch.setopt(pycurl.HEADER, True)
        if headers:
            ch.setopt(pycurl.HTTPHEADER, headers)

        ch.setopt(pycurl.VERBOSE, self.debug)
        ch.setopt(pycurl.SSL_VERIFYPEER, False)
        ch.setopt(pycurl.SSL_VERIFYHOST, False)
        ch.setopt(pycurl.COOKIEFILE, self.settingsPath + self.username + '-cookies.dat')
        ch.setopt(pycurl.COOKIEJAR, self.settingsPath + self.username + '-cookies.dat')

        if post:
            import urllib
            ch.setopt(pycurl.POST, len(post))
            ch.setopt(pycurl.POSTFIELDS, urllib.urlencode(post))

        ch.perform()
        resp = buffer.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
        header = resp[0: header_len]
        body = resp[header_len:]
        ch.close()

        if self.debug:
            import urllib
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print('DATA: ' + urllib.unquote_plus(json.dumps(post)))
            print("RESPONSE: " + body + "\n")

        return [header, json_decode(body)]
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def __init__(self):
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.SSL_VERIFYHOST, False)
        self.curl.setopt(pycurl.SSL_VERIFYPEER, False)
        # ?????header
        self.curl.setopt(pycurl.HEADER, False)
项目:AppBackend    作者:540871129    | 项目源码 | 文件源码
def handle_request(self):
        curl_handle = pycurl.Curl()
        # set default options.
        curl_handle.setopt(pycurl.URL, self.request_url)
        curl_handle.setopt(pycurl.REFERER, self.request_url)
        curl_handle.setopt(pycurl.USERAGENT, self.useragent)
        curl_handle.setopt(pycurl.TIMEOUT, self.curlopts['TIMEOUT'])
        curl_handle.setopt(pycurl.CONNECTTIMEOUT, self.curlopts['CONNECTTIMEOUT'])
        curl_handle.setopt(pycurl.HEADER, True)
        #curl_handle.setopt(pycurl.VERBOSE, 1)
        curl_handle.setopt(pycurl.FOLLOWLOCATION, 1)
        curl_handle.setopt(pycurl.MAXREDIRS, 5)
        if(self.request_headers and len(self.request_headers) > 0):
            tmplist = list()
            for(key, value) in self.request_headers.items():
                tmplist.append(key + ':' + value)
            curl_handle.setopt(pycurl.HTTPHEADER, tmplist)
        #??????POST
        curl_handle.setopt(pycurl.HTTPPROXYTUNNEL, 1)
        curl_handle.setopt(pycurl.POSTFIELDS, self.request_body)

        response = StringIO.StringIO()
        curl_handle.setopt(pycurl.WRITEFUNCTION, response.write)

        try:
            curl_handle.perform()
        except pycurl.error as error:
            raise ChannelException(error, 5)

        self.response_code = curl_handle.getinfo(curl_handle.HTTP_CODE)
        header_size = curl_handle.getinfo(curl_handle.HEADER_SIZE)
        resp_str = response.getvalue()
        self.response_headers = resp_str[0 : header_size]
        self.response_body = resp_str[header_size : ]

        response.close()
        curl_handle.close()
项目:Video-Downloader    作者:EvilCult    | 项目源码 | 文件源码
def getPage (self, url, requestHeader = []) :
        resultFormate = StringIO.StringIO()

        fakeIp = self.fakeIp()
        requestHeader.append('CLIENT-IP:' + fakeIp)
        requestHeader.append('X-FORWARDED-FOR:' + fakeIp)

        try:
            curl = pycurl.Curl()
            curl.setopt(pycurl.URL, url.strip())
            curl.setopt(pycurl.ENCODING, 'gzip,deflate')
            curl.setopt(pycurl.HEADER, 1)
            curl.setopt(pycurl.TIMEOUT, 120)
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)   
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            curl.setopt(pycurl.HTTPHEADER, requestHeader)
            curl.setopt(pycurl.WRITEFUNCTION, resultFormate.write)
            curl.perform()
            headerSize = curl.getinfo(pycurl.HEADER_SIZE)
            curl.close()

            header = resultFormate.getvalue()[0 : headerSize].split('\r\n')
            body = resultFormate.getvalue()[headerSize : ]
        except Exception, e:
            header = ''
            body = ''

        return header, body
项目:falsy    作者:pingf    | 项目源码 | 文件源码
def setup_curl_for_get(c, p, data_buf, headers=None, share=None):
    setup_curl_basic(c, p, data_buf, headers, share)
    httpheader = p.get('httpheader')
    if httpheader:
        # c.setopt(pycurl.HEADER, p.get('header', 1))
        c.setopt(c.HTTPHEADER, httpheader)
    return c
项目:xmusic-crawler    作者:rockers7414    | 项目源码 | 文件源码
def get_new_release_albums(self):
        self.logger.info("Get albums of the new release from Spotify.")
        service_host = "https://api.spotify.com"
        """
            Get client_id and client_secret value from local.
        """
        config = Config()
        client_id = config.spotify_client_id
        client_secret = config.spotify_client_secret

        access_token = Spotify(client_id, client_secret).get_token()
        headers = ["Authorization: Bearer " + access_token]

        offset = 0
        new_release = []
        while True:
            params = {
                "offset": offset,
                "limit": 50
            }

            buf = BytesIO()

            client = pycurl.Curl()
            client.setopt(pycurl.HEADER, False)
            client.setopt(pycurl.HTTPHEADER, headers)
            client.setopt(pycurl.URL, service_host +
                          "/v1/browse/new-releases" + "?" + urlencode(params))
            client.setopt(pycurl.WRITEFUNCTION, buf.write)
            client.perform()
            client.close()

            body = json.loads(buf.getvalue().decode("utf-8"))
            buf.close()

            if body["albums"]["total"] > 0:
                for data in body["albums"]["items"]:
                    album = Album(data["name"], None)

                    images = []
                    for image_data in data["images"]:
                        images.append(Image(image_data["url"],
                                            image_data["width"],
                                            image_data["height"]))

                    album.images = images

                    album.provider = self._provider
                    album.provider_res_id = data["id"]

                    artist_id = data["artists"][0]["id"] \
                        if len(data["artists"]) > 0 else ""
                    new_release.append((artist_id, album))

            if body["albums"]["total"] \
                    > body["albums"]["limit"] * (offset + 1):
                offset = offset + 1
            else:
                break

        return new_release
项目:obplayer    作者:openbroadcaster    | 项目源码 | 文件源码
def version_update(self):
        if not obplayer.Config.setting('sync_url'):
            return
        obplayer.Log.log('sending player version to server: ' + obplayer.Config.version, 'sync')

        postfields = {}
        postfields['id'] = obplayer.Config.setting('sync_device_id')
        postfields['pw'] = obplayer.Config.setting('sync_device_password')
        postfields['version'] = obplayer.Config.version
        postfields['longitude'] = obplayer.Config.setting('location_longitude')
        postfields['latitude'] = obplayer.Config.setting('location_latitude')

        curl = pycurl.Curl()

        enc_postfields = urllib.urlencode(postfields)

        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
        curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=version')
        curl.setopt(pycurl.HEADER, False)
        curl.setopt(pycurl.POST, True)
        curl.setopt(pycurl.POSTFIELDS, enc_postfields)

        curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
        curl.setopt(pycurl.LOW_SPEED_TIME, 60)

        curl.setopt(pycurl.NOPROGRESS, 0)
        curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)

        class CurlResponse:
            def __init__(self):
                self.buffer = u''
            def __call__(self, data):
                self.buffer += data.decode('utf-8')

        curl_response = CurlResponse()
        curl.setopt(pycurl.WRITEFUNCTION, curl_response)

        try:
            curl.perform()
        except:
            obplayer.Log.log("exception in VersionUpdate thread", 'error')
            obplayer.Log.log(traceback.format_exc(), 'error')

        curl.close()

        if curl_response.buffer:
            version = json.loads(curl_response.buffer)
            obplayer.Log.log("server version reported as " + str(version), 'sync')
            if not self.check_min_version(version):
                obplayer.Log.log("minimum server version " + str(MIN_SERVER_VERSION) + " is required. Please update server software before continuing", 'error')
        else:
            obplayer.Log.log("server did not report a version number", 'warning')
项目:obplayer    作者:openbroadcaster    | 项目源码 | 文件源码
def now_playing_update_thread(self, playlist_id, playlist_end, media_id, media_end, show_name):

        if not obplayer.Config.setting('sync_url'):
            return

        postfields = {}
        postfields['id'] = obplayer.Config.setting('sync_device_id')
        postfields['pw'] = obplayer.Config.setting('sync_device_password')

        postfields['playlist_id'] = playlist_id
        postfields['media_id'] = media_id
        postfields['show_name'] = show_name

        if playlist_end != '':
            postfields['playlist_end'] = int(round(playlist_end))
        else:
            postfields['playlist_end'] = ''

        if media_end != '':
            postfields['media_end'] = int(round(media_end))
        else:
            postfields['media_end'] = ''

        curl = pycurl.Curl()

        enc_postfields = urllib.urlencode(postfields)

        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
        curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=now_playing')
        curl.setopt(pycurl.HEADER, False)
        curl.setopt(pycurl.POST, True)
        curl.setopt(pycurl.POSTFIELDS, enc_postfields)
        #curl.setopt(pycurl.FOLLOWLOCATION, 1)

        curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
        curl.setopt(pycurl.LOW_SPEED_TIME, 60)

        curl.setopt(pycurl.NOPROGRESS, 0)
        curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)

        try:
            curl.perform()
        except:
            obplayer.Log.log("exception in NowPlayingUpdate thread", 'error')
            obplayer.Log.log(traceback.format_exc(), 'error')

        curl.close()

    #
    # Request sync data from web application.
    # This is used by sync (with request_type='schedule') and sync_priority_broadcasts (with request_type='emerg').
    # Function outputs XML response from server.
    #
项目:obplayer    作者:openbroadcaster    | 项目源码 | 文件源码
def sync_request(self, request_type='', data=False):
        sync_url = obplayer.Config.setting('sync_url')
        if not sync_url:
            obplayer.Log.log("sync url is blank, skipping sync request", 'sync')
            return ''

        curl = pycurl.Curl()

        postfields = {}
        postfields['id'] = obplayer.Config.setting('sync_device_id')
        postfields['pw'] = obplayer.Config.setting('sync_device_password')
        postfields['hbuffer'] = obplayer.Config.setting('sync_buffer')
        if data:
            postfields['data'] = data

        enc_postfields = urllib.urlencode(postfields)

        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
        curl.setopt(pycurl.URL, sync_url + '?action=' + request_type)
        curl.setopt(pycurl.HEADER, False)
        curl.setopt(pycurl.POST, True)
        curl.setopt(pycurl.POSTFIELDS, enc_postfields)

        # some options so that it'll abort the transfer if the speed is too low (i.e., network problem)
        # low speed abort set to 0.01Kbytes/s for 60 seconds).
        curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
        curl.setopt(pycurl.LOW_SPEED_TIME, 60)

        curl.setopt(pycurl.NOPROGRESS, 0)
        curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)

        class CurlResponse:
            def __init__(self):
                self.buffer = u''
            def __call__(self, data):
                self.buffer += data.decode('utf-8')

        curl_response = CurlResponse()
        curl.setopt(pycurl.WRITEFUNCTION, curl_response)

        try:
            curl.perform()
        #except pycurl.error as error:
        #    (errno, errstr) = error
        #    obplayer.Log.log('network error: ' + errstr, 'error')
        except:
            obplayer.Log.log("exception in sync " + request_type + " thread", 'error')
            obplayer.Log.log(traceback.format_exc(), 'error')

        curl.close()

        return curl_response.buffer

    #
    # Fetch media from web application.  Saves under media directory.
    # media_id : id of the media we want
    # filename : filename to save under.
    #