Python pycurl 模块,IPRESOLVE_V4 实例源码

我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用pycurl.IPRESOLVE_V4

项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def _process_queue(self):
        while True:
            started = 0
            while self._free_list and self._requests:
                started += 1
                curl = self._free_list.pop()
                (request, callback) = self._requests.popleft()
                curl.info = {
                    "headers": httputil.HTTPHeaders(),
                    "buffer": cStringIO.StringIO(),
                    "request": request,
                    "callback": callback,
                    "start_time": time.time(),
                }
                # Disable IPv6 to mitigate the effects of this bug
                # on curl versions <= 7.21.0
                # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
                if pycurl.version_info()[2] <= 0x71500:  # 7.21.0
                    curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
                _curl_setup_request(curl, request, curl.info["buffer"],
                                    curl.info["headers"])
                self._multi.add_handle(curl)

            if not started:
                break
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def setInterface(self, options):

        interface, proxy, ipv6 = options["interface"], options["proxies"], options["ipv6"]

        if interface and interface.lower() != "none":
            self.c.setopt(pycurl.INTERFACE, str(interface))

        if proxy:
            if proxy["type"] == "socks4":
                self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
            elif proxy["type"] == "socks5":
                self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
            else:
                self.c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)

            self.c.setopt(pycurl.PROXY, str(proxy["address"]))
            self.c.setopt(pycurl.PROXYPORT, proxy["port"])

            if proxy["username"]:
                self.c.setopt(pycurl.PROXYUSERPWD, str("%s:%s" % (proxy["username"], proxy["password"])))

        if ipv6:
            self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
        else:
            self.c.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

        if "auth" in options:
            self.c.setopt(pycurl.USERPWD, str(options["auth"]))

        if "timeout" in options:
            self.c.setopt(pycurl.LOW_SPEED_TIME, options["timeout"])
项目:time2go    作者:twitchyliquid64    | 项目源码 | 文件源码
def _process_queue(self):
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback) = self._requests.popleft()
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": cStringIO.StringIO(),
                        "request": request,
                        "callback": callback,
                        "curl_start_time": time.time(),
                    }
                    # Disable IPv6 to mitigate the effects of this bug
                    # on curl versions <= 7.21.0
                    # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
                    if pycurl.version_info()[2] <= 0x71500:  # 7.21.0
                        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
                    _curl_setup_request(curl, request, curl.info["buffer"],
                                        curl.info["headers"])
                    self._multi.add_handle(curl)

                if not started:
                    break
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def _process_queue(self):
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback) = self._requests.popleft()
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "curl_start_time": time.time(),
                    }
                    # Disable IPv6 to mitigate the effects of this bug
                    # on curl versions <= 7.21.0
                    # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
                    if pycurl.version_info()[2] <= 0x71500:  # 7.21.0
                        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
                    _curl_setup_request(curl, request, curl.info["buffer"],
                                        curl.info["headers"])
                    self._multi.add_handle(curl)

                if not started:
                    break
项目:deprecated_thedap    作者:unitedvote    | 项目源码 | 文件源码
def _process_queue(self):
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback) = self._requests.popleft()
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": cStringIO.StringIO(),
                        "request": request,
                        "callback": callback,
                        "curl_start_time": time.time(),
                    }
                    # Disable IPv6 to mitigate the effects of this bug
                    # on curl versions <= 7.21.0
                    # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
                    if pycurl.version_info()[2] <= 0x71500:  # 7.21.0
                        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
                    _curl_setup_request(curl, request, curl.info["buffer"],
                                        curl.info["headers"])
                    self._multi.add_handle(curl)

                if not started:
                    break
项目:get_started_with_respeaker    作者:respeaker    | 项目源码 | 文件源码
def _process_queue(self):
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback) = self._requests.popleft()
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "curl_start_time": time.time(),
                    }
                    # Disable IPv6 to mitigate the effects of this bug
                    # on curl versions <= 7.21.0
                    # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
                    if pycurl.version_info()[2] <= 0x71500:  # 7.21.0
                        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
                    _curl_setup_request(curl, request, curl.info["buffer"],
                                        curl.info["headers"])
                    self._multi.add_handle(curl)

                if not started:
                    break
项目:pyload-requests    作者:pyload    | 项目源码 | 文件源码
def set_interface(self, options):
        interface, proxy, ipv6 = options[
            'interface'], options['proxies'], options['ipv6']

        if interface and interface.lower() != "none":
            self.setopt(pycurl.INTERFACE, interface)

        if proxy:
            if proxy['type'] == "socks4":
                self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
            elif proxy['type'] == "socks5":
                self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
            else:
                self.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)

            self.setopt(pycurl.PROXY, proxy['host'])
            self.setopt(pycurl.PROXYPORT, proxy['port'])

            if proxy['username']:
                userpwd = "{0}:{1}".format(
                    proxy['username'], proxy['password'])
                self.setopt(pycurl.PROXYUSERPWD, userpwd)

        if ipv6:
            self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
        else:
            self.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

        if "timeout" in options:
            self.setopt(pycurl.LOW_SPEED_TIME, options['timeout'])

        if "auth" in options:
            self.setopt(pycurl.USERPWD, self.options['auth'])
项目:madodl    作者:miezak    | 项目源码 | 文件源码
def curl_common_init(buf):
    handle = pycurl.Curl()

    handle.setopt(pycurl.WRITEDATA, buf)
    handle.setopt(pycurl.HEADERFUNCTION, curl_hdr)
    handle.setopt(pycurl.DEBUGFUNCTION, curl_debug)

    handle.setopt(pycurl.USERPWD, '{}:{}'.format(_g.conf._user,_g.conf._pass))

    handle.setopt(pycurl.FOLLOWLOCATION, True)

    # avoid FTP CWD for fastest directory transversal
    handle.setopt(pycurl.FTP_FILEMETHOD, pycurl.FTPMETHOD_NOCWD)

    # we always set this flag and let the logging module
    # handle filtering.
    handle.setopt(pycurl.VERBOSE, True)

    # use ipv4 for VPNs
    handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
    handle.setopt(pycurl.USE_SSL, True)
    handle.setopt(pycurl.SSL_VERIFYPEER, False)
    # XXX
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)

    return handle
项目:JSParser    作者:nahamsec    | 项目源码 | 文件源码
def __init__(self, handle=None, options=None):
        self.setCurlHandle(handle)

        if options == None:
            options = Options()

        self.setOptions(options)

        # To start with, disable FOLLOWLOCATION since we'll handle it
        self._handle.setopt(pycurl.FOLLOWLOCATION, False)

        # Force IPv4, since this class isn't yet compatible with IPv6
        self._handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)