Python pycurl 模块:TIMEOUT_MS 实例源码

我们从 Python 开源项目中提取了以下 3 个代码示例,用于说明如何使用 pycurl.TIMEOUT_MS。

项目:xtdpy    作者:psycofdj    | 项目源码 | 文件源码
def __init__(self, p_request, p_timeoutMs = 1000, p_curlOpts=None):
    """Bind a request to a new pycurl handle and apply the base options.

    Args:
      p_request: request object to send; a plain ``str`` is wrapped as
        ``HTTPRequest(p_url=p_request)``.
      p_timeoutMs: value given to ``pycurl.TIMEOUT_MS``; the option is
        left untouched when this is falsy (``0`` or ``None``).
      p_curlOpts: stored as ``m_opts``; an empty dict is used when
        ``None`` is passed.
    """
    # Accept a bare URL string as shorthand for a request object.
    self.m_request   = (HTTPRequest(p_url=p_request)
                        if isinstance(p_request, str) else p_request)
    self.m_timeoutMs = p_timeoutMs

    # Every result slot starts out empty; the handle is created right after.
    self.m_response = self.m_handle = self.m_data = self.m_headers = None
    self.m_handle   = pycurl.Curl()
    self.m_opts     = {} if p_curlOpts is None else p_curlOpts

    # Same sequence as before: reset first, then the per-aspect setup.
    self.cleanup()
    self._init_opt()
    self._init_url()
    self._init_method()
    self._init_headers()

    l_handle = self.m_handle
    l_handle.setopt(pycurl.USERAGENT, self.m_request.m_agent)
    l_handle.setopt(pycurl.HEADERFUNCTION, self._read_header)
    if self.m_timeoutMs:
      l_handle.setopt(pycurl.TIMEOUT_MS, self.m_timeoutMs)
    l_handle.setopt(pycurl.FOLLOWLOCATION, True)
项目:searx-stats2    作者:dalf    | 项目源码 | 文件源码
def set_timeout(self, timeout):
        """Store ``timeout`` as an int and apply it to the curl handler.

        The same value is used for both ``pycurl.CONNECTTIMEOUT_MS`` and
        ``pycurl.TIMEOUT_MS``.
        """
        timeout_ms = int(timeout)
        self.timeout = timeout_ms
        handler = self.curl_handler
        for option in (pycurl.CONNECTTIMEOUT_MS, pycurl.TIMEOUT_MS):
            handler.setopt(option, timeout_ms)
项目:searx-stats2    作者:dalf    | 项目源码 | 文件源码
def __init__(self,
             url,
             curl_handler,
             method='GET',
             headers=None,
             cookies=None,
             callback=None,
             callback_parameters=None,
             data=None,
             timeout=5000,
             ssl_verification=True,
             proxy=None):
        """Record the request settings and configure ``curl_handler`` for them.

        Args:
            url: target URL, set as the handler's ``URL`` option.
            curl_handler: handle object configured in place; it must provide
                ``setopt``/``unsetopt`` and the option constants used below.
            method: accepted for interface compatibility; this constructor
                does not reference it.
            headers: mapping sent as ``HTTPHEADER`` lines (``None`` -> empty).
            cookies: mapping joined into the ``COOKIE`` option; when empty or
                ``None`` the ``COOKIE`` option is unset instead.
            callback: stored on the instance; defaults to
                ``callback_get_response``.
            callback_parameters: stored on the instance; defaults to ``()``.
            data: when not ``None``, url-encoded and set as ``POSTFIELDS``.
            timeout: applied to both ``CONNECTTIMEOUT_MS`` and ``TIMEOUT_MS``.
            ssl_verification: converted with ``int()`` for ``SSL_VERIFYPEER``.
            proxy: when not ``None``, set as the ``PROXY`` option.
        """
        # Resolve the None placeholders to their real defaults.
        headers = {} if headers is None else headers
        cookies = {} if cookies is None else cookies
        callback = callback_get_response if callback is None else callback
        callback_parameters = (() if callback_parameters is None
                               else callback_parameters)

        # Instance state.
        self.url = url
        self.timeout = timeout
        self.callback = callback
        self.callback_parameters = callback_parameters
        self.curl_handler = curl_handler
        self._response_buffer = BytesIO()
        self.response = None

        # Handler configuration, applied in the same order as before.
        setopt = curl_handler.setopt
        setopt(curl_handler.URL, url)
        # The response body is written into the in-memory buffer.
        setopt(curl_handler.WRITEFUNCTION, self._response_buffer.write)
        setopt(curl_handler.SSL_VERIFYPEER, int(ssl_verification))
        setopt(curl_handler.CONNECTTIMEOUT_MS, self.timeout)
        setopt(curl_handler.TIMEOUT_MS, self.timeout)

        header_lines = ['{0}: {1}'.format(name, value)
                        for name, value in headers.items()]
        setopt(curl_handler.HTTPHEADER, header_lines)

        if data is not None:
            setopt(curl_handler.POSTFIELDS, urlencode(data))

        if proxy is not None:
            setopt(curl_handler.PROXY, proxy)

        if not cookies:
            # No cookies for this request: clear the option on the handler.
            curl_handler.unsetopt(curl_handler.COOKIE)
        else:
            cookie_pairs = ('{0}={1}'.format(name, value)
                            for name, value in cookies.items())
            setopt(curl_handler.COOKIE, '; '.join(cookie_pairs))