Python pycurl 模块,HTTPGET 实例源码

我们从 Python 开源项目中提取了以下 7 个代码示例,用于说明如何使用 pycurl.HTTPGET。

项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def scan(self, time):
        """Fetch the scan closest in time to ``time`` and return it as an mzScan.

        The nearest entry of ``self.scans()`` is selected by absolute time
        difference; its scan name with '.txt' appended is requested over HTTP
        GET. Every line of the response is split on whitespace and converted
        to a tuple of floats.
        """
        # Select the scan_info entry whose time (first field) is nearest.
        nearest = min(self.scans(), key=lambda info: abs(time - info[0]))
        scan_time, mz, scan_name, _scan_type, scan_mode = nearest

        self.crl.setopt(pycurl.HTTPGET, True)
        self.crl.setopt(pycurl.URL, str(scan_name + '.txt'))

        body = cStringIO.StringIO()
        self.crl.setopt(pycurl.WRITEFUNCTION, body.write)

        # Perform the transfer up to five times, stopping as soon as any
        # data has been received.
        attempts = 0
        while attempts < 5:
            self.crl.perform()
            attempts += 1
            if body.getvalue():
                break

        points = []
        for line in body.getvalue().splitlines():
            points.append(tuple(float(field) for field in line.split()))

        # how to get charge for an mzURL scan?
        return mzScan(points, scan_time, mode=scan_mode, mz=mz)
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
        """Request the '/ric/<times>/<mzs>.txt' resource and parse the response.

        The URL is built from ``self.data_file`` plus the time and m/z ranges.
        Returns a list with one tuple of floats per response line (fields are
        whitespace-separated). ``filter`` is accepted but not used here.
        """
        base = self.data_file
        suffix = '/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
                                           start_mz, stop_mz)
        xic_url = str(base + suffix)

        self.crl.setopt(pycurl.HTTPGET, True)
        self.crl.setopt(pycurl.URL, xic_url)

        body = cStringIO.StringIO()
        self.crl.setopt(pycurl.WRITEFUNCTION, body.write)

        # Perform the transfer up to five times, stopping as soon as any
        # data has been received.
        attempts = 0
        while attempts < 5:
            self.crl.perform()
            attempts += 1
            if body.getvalue():
                break

        points = []
        for line in body.getvalue().splitlines():
            points.append(tuple(float(field) for field in line.split()))
        return points
项目:xtdpy    作者:psycofdj    | 项目源码 | 文件源码
def _init_method(self):
    """Set the curl option matching the request's HTTP method.

    Handles GET, PUT, POST, HEAD and DELETE; for any other value of
    ``self.m_request.m_method`` no option is set.
    """
    method = self.m_request.m_method

    if method == "GET":
      self.m_handle.setopt(pycurl.HTTPGET, 1)
      return

    if method == "PUT":
      self.m_handle.setopt(pycurl.PUT, 1)
      return

    if method == "POST":
      payload = self.m_request.m_data
      if not payload:
        # No body to send: only the request verb is set.
        self.m_handle.setopt(pycurl.CUSTOMREQUEST, "POST")
      else:
        self.m_handle.setopt(pycurl.POSTFIELDS, payload)
      return

    if method == "HEAD":
      self.m_handle.setopt(pycurl.NOBODY, 1)
      return

    if method == "DELETE":
      self.m_handle.setopt(pycurl.CUSTOMREQUEST, "DELETE")
项目:sogaQuant    作者:idoplay    | 项目源码 | 文件源码
def curl_get(self, url, refUrl=None):
        """Fetch ``url`` with a one-off curl handle and return the body.

        url    -- address to request.
        refUrl -- optional value for the Referer header.

        Returns the response body when the HTTP status is 200, otherwise an
        empty string. Errors raised by pycurl propagate to the caller; the
        curl handle is closed in every case.
        """
        buf = cStringIO.StringIO()
        curl = pycurl.Curl()
        try:
            # No method option is set: libcurl issues a GET by default.
            curl.setopt(curl.URL, url)
            curl.setopt(curl.WRITEFUNCTION, buf.write)
            # NOTE(review): peer certificate verification is disabled here,
            # which exposes HTTPS requests to man-in-the-middle attacks.
            # Kept as-is to preserve behavior; confirm it is intentional.
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl.setopt(pycurl.VERBOSE, 0)
            curl.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:46.0) Gecko/20100101 Firefox/46.0')
            # Cookies are read from and written back to the same file.
            curl.setopt(pycurl.COOKIEJAR, '/htdocs/logs/py_cookie.txt')
            curl.setopt(pycurl.COOKIEFILE, '/htdocs/logs/py_cookie.txt')
            if refUrl:
                curl.setopt(pycurl.REFERER, refUrl)
            curl.perform()
            if curl.getinfo(pycurl.RESPONSE_CODE) == 200:
                return buf.getvalue()
            return ''
        finally:
            # Release the handle even when setopt()/perform() raises.
            curl.close()
项目:alfred-10000ft-scripts    作者:jceelen    | 项目源码 | 文件源码
def get(self, url="", params=None):
        """Send a GET request for ``url`` and return the result of the request.

        When ``params`` is truthy it is URL-encoded and appended to ``url``
        after a '?'.
        """
        if params:
            query = urllib_parse.urlencode(params)
            url = url + "?" + query
        self.set_option(pycurl.HTTPGET, 1)
        return self.__request(url)
项目:oriskami-python    作者:oriskami    | 项目源码 | 文件源码
def request(self, method, url, headers, post_data=None):
        """Perform an HTTP request with pycurl.

        method    -- 'get', 'post', or any other verb (sent upper-cased as a
                     custom request).
        url       -- target URL; passed through util.utf8 before use.
        headers   -- mapping of header names to values.
        post_data -- body used when method is 'post'.

        Returns a tuple (response body, status code, parsed headers).
        pycurl errors are handed to self._handle_request_error.
        """
        body_buf = util.StringIO.StringIO()
        header_buf = util.StringIO.StringIO()
        curl = pycurl.Curl()

        proxy = self._get_proxy(url)
        if proxy:
            if proxy.hostname:
                curl.setopt(pycurl.PROXY, proxy.hostname)
            if proxy.port:
                curl.setopt(pycurl.PROXYPORT, proxy.port)
            if proxy.username or proxy.password:
                credentials = "%s:%s" % (proxy.username, proxy.password)
                curl.setopt(pycurl.PROXYUSERPWD, credentials)

        if method == 'post':
            curl.setopt(pycurl.POST, 1)
            curl.setopt(pycurl.POSTFIELDS, post_data)
        elif method == 'get':
            curl.setopt(pycurl.HTTPGET, 1)
        else:
            curl.setopt(pycurl.CUSTOMREQUEST, method.upper())

        # pycurl doesn't like unicode URLs
        curl.setopt(pycurl.URL, util.utf8(url))

        # Output sinks, signal handling and timeouts (in seconds).
        fixed_options = (
            (pycurl.WRITEFUNCTION, body_buf.write),
            (pycurl.HEADERFUNCTION, header_buf.write),
            (pycurl.NOSIGNAL, 1),
            (pycurl.CONNECTTIMEOUT, 30),
            (pycurl.TIMEOUT, 80),
        )
        for option, value in fixed_options:
            curl.setopt(option, value)

        header_lines = ['%s: %s' % (name, value)
                        for name, value in headers.items()]
        curl.setopt(pycurl.HTTPHEADER, header_lines)

        if not self._verify_ssl_certs:
            curl.setopt(pycurl.SSL_VERIFYHOST, False)
        else:
            # Use the CA bundle located next to this module.
            ca_bundle = os.path.join(
                os.path.dirname(__file__), 'data/ca-certificates.crt')
            curl.setopt(pycurl.CAINFO, ca_bundle)

        try:
            curl.perform()
        except pycurl.error as err:
            self._handle_request_error(err)

        response_body = body_buf.getvalue()
        status_code = curl.getinfo(pycurl.RESPONSE_CODE)
        parsed_headers = self.parse_headers(header_buf.getvalue())
        return response_body, status_code, parsed_headers
项目:multiplierz    作者:BlaisProteomics    | 项目源码 | 文件源码
def scans(self):
        '''Return the scan_info table, fetching it from the server on first use.

        The result is cached on ``self._scans`` to save extra trips to the
        server. Each entry is a tuple
        ``(time, mz, scan_url, scan_type, scan_mode)`` where ``time`` and
        ``mz`` are floats, ``scan_type`` is upper-cased and ``scan_mode`` is
        lower-cased. The list is sorted.

        Raises ValueError when a response line does not consist of exactly
        four whitespace-separated fields or its time/mz are not numeric; in
        that case nothing is cached.
        '''

        if not self._scans:
            self.crl.setopt(pycurl.HTTPGET, True)
            self.crl.setopt(pycurl.URL, str(self.data_file + '/scan_info'))

            # the scan_name is the URL of that scan, which is just the time
            scan_url = self.data_file + '/scans/%s'

            response = cStringIO.StringIO()
            self.crl.setopt(pycurl.WRITEFUNCTION, response.write)

            # Perform the transfer up to five times, stopping as soon as any
            # data has been received.
            for _ in range(5):
                self.crl.perform()
                if response.getvalue():
                    break

            # Parse into a local list and publish it only once every line has
            # been handled. Filling self._scans in place would leave a
            # truncated table cached if a malformed line raised, and later
            # calls would then silently return that partial result.
            parsed = []
            for scan_line in response.getvalue().splitlines():
                time, mz, scan_type, scan_mode = scan_line.split()
                parsed.append((float(time),
                               float(mz),
                               scan_url % time,
                               scan_type.upper(),
                               scan_mode.lower()))

            parsed.sort()
            self._scans = parsed

        return self._scans