我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib2.ProxyHandler()。
def get(self, url, proxy=None):
    """Fetch ``url`` and return the response body.

    If ``proxy`` is truthy, an opener that routes ``http`` requests through
    it is installed process-wide with ``urllib2.install_opener`` before the
    request is made.

    The status of the request is stored on ``self.status_code``:

    * on success, the response code;
    * on ``HTTPError``, the error code (the error body is returned);
    * on a plain ``URLError`` (no HTTP response was received), ``None``,
      and an empty string is returned.
    """
    if proxy:
        proxy_handler = urllib2.ProxyHandler({'http': proxy})
        urllib2.install_opener(urllib2.build_opener(proxy_handler))
    try:
        response = urllib2.urlopen(url)
    except HTTPError as e:
        # HTTPError doubles as a response object, so it has read() and code.
        resp = e.read()
        self.status_code = e.code
    except URLError:
        # URLError only carries ``reason``; calling read() or code on it (as
        # this branch used to do) raised AttributeError instead of reporting
        # the failure.
        resp = ''
        self.status_code = None
    else:
        self.status_code = response.code
        resp = response.read()
    return resp
def download_from_url(url):
    """Open ``url`` through a ``tc.ServerThread`` and return the result.

    When the proxy settings from ``env_server.get_proxy()`` are enabled, an
    opener using that proxy (with the login/password embedded in the proxy
    URL) is installed globally first.

    Returns ``False`` if the treated thread result reports failure,
    otherwise the thread's ``result``.
    """
    proxy = env_server.get_proxy()
    if proxy['enabled']:
        server = proxy['server'].replace('http://', '')
        proxy_url = 'http://{login}:{pass}@{0}'.format(server, **proxy)
        opener = urllib2.build_opener(
            urllib2.ProxyHandler({'http': proxy_url}),
            urllib2.HTTPBasicAuthHandler(),
            urllib2.HTTPHandler,
        )
        urllib2.install_opener(opener)

    # The actual request is executed by the thread helper, not called here.
    worker = tc.ServerThread(env_inst.ui_main)
    worker.kwargs = {'url': url, 'timeout': 1}
    worker.routine = urllib2.urlopen
    worker.run()

    outcome = tc.treat_result(worker, silent=True)
    return False if outcome.isFailed() else outcome.result
def ipcheck(proxy): try: pxhandle = urllib2.ProxyHandler({"http": proxy}) opener = urllib2.build_opener(pxhandle) urllib2.install_opener(opener) myip = urllib2.urlopen('http://www.whatismyip.com/automation/n09230945.asp').read() xs = re.findall(('\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}'), StripTags(myip)) if xs[0] == myipadress or myipadress == myip: trans_list.append(proxy) print proxy[:-1],"\t- ALIVE -", timer(), "- TRANSPARENT" elif xs == None: pass else: anon_list.append(proxy) print proxy[:-1],"\t- ALIVE -", timer(), "- EXT-iP :",xs[0] except KeyboardInterrupt: print "\n\nCTRL+C - check temporary proxylist file\n\n" sys.exit(0) except: pass
def download_vcpython27(self):
    """
    Download vcpython27 since some Windows 7 boxes have it and some don't.

    The installer is read completely before ``vcpython27.msi`` in
    ``DOWNLOADS_DIR`` is opened for writing, so a failed download does not
    leave an empty or truncated file behind.  When ``self.PROXY`` is set, an
    opener using it for http and https is installed globally first.

    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of vcpython27... this may take a few minutes...')

    if self.PROXY is not None:
        opener = urllib2.build_opener(
            urllib2.HTTPHandler(),
            urllib2.HTTPSHandler(),
            urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
        )
        urllib2.install_opener(opener)

    response = urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT)
    try:
        payload = response.read()
    finally:
        # Release the connection even if reading fails.
        response.close()

    with open(os.path.join(DOWNLOADS_DIR, 'vcpython27.msi'), 'wb') as f:
        f.write(payload)
    logger.debug('Download of vcpython27 complete')
def download_python(self):
    """
    Download Python

    The installer is read completely before ``python-installer.msi`` in
    ``DOWNLOADS_DIR`` is opened for writing, so a failed download does not
    leave an empty or truncated file behind.  When ``self.PROXY`` is set, an
    opener using it for http and https is installed globally first.

    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of python')

    if self.PROXY is not None:
        opener = urllib2.build_opener(
            urllib2.HTTPHandler(),
            urllib2.HTTPSHandler(),
            urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
        )
        urllib2.install_opener(opener)

    response = urllib2.urlopen(self.PYTHON_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT)
    try:
        payload = response.read()
    finally:
        # Release the connection even if reading fails.
        response.close()

    with open(os.path.join(DOWNLOADS_DIR, 'python-installer.msi'), 'wb') as f:
        f.write(payload)
    logger.debug('Download of python complete')
def wait_xxnet_exit():
    """Request ``/quit`` on the local control port until it stops answering.

    Up to 20 requests are made, one second apart.  Returns ``True`` as soon
    as a request fails, and ``False`` if every one of the 20 requests was
    answered.
    """
    def http_request(url, method="GET"):
        # The opener is built with an empty proxy mapping.
        opener = urllib2.build_opener(urllib2.ProxyHandler({}))
        try:
            return opener.open(url)
        except Exception:
            # Any failure is reported to the caller as False.
            return False

    for _attempt in range(20):
        control_port = config.get(["modules", "launcher", "control_port"], 8085)
        quit_url = "http://127.0.0.1:{port}/quit".format(port=control_port)
        reply = http_request(quit_url)
        if reply == False:
            return True
        time.sleep(1)
    return False
def __init__(self, url, proxy, cafile):
    """Store the target URL and prepare proxy and TLS settings.

    A truthy ``proxy`` is installed globally as the ``https`` proxy.  A
    truthy ``cafile`` is loaded into a default SSL context, stored under
    ``self.kwargs['context']``, when the ``ssl`` module provides
    ``create_default_context``.
    """
    self.url = url
    self.proxy = proxy
    if proxy:
        logging.info("Using HTTPS proxy: " + proxy)
        https_proxy = urllib2.ProxyHandler({'https': proxy})
        urllib2.install_opener(urllib2.build_opener(https_proxy))

    self.kwargs = {}
    if cafile and hasattr(ssl, "create_default_context"):
        logging.info("Using CA file: " + cafile)
        ssl_context = ssl.create_default_context()
        ssl_context.load_verify_locations(cafile = cafile)
        self.kwargs['context'] = ssl_context

# given an infoMap returned by the local node, call up the home server
def download(url, headers, proxy, num_retries, data=None): print 'Downloading:', url request = urllib2.Request(url, data, headers) opener = urllib2.build_opener() if proxy: proxy_params = {urlparse.urlparse(url).scheme: proxy} opener.add_handler(urllib2.ProxyHandler(proxy_params)) try: response = opener.open(request) html = response.read() code = response.code except urllib2.URLError as e: print 'Download error:', e.reason html = '' if hasattr(e, 'code'): code = e.code if num_retries > 0 and 500 <= code < 600: # retry 5XX HTTP errors html = download(url, headers, proxy, num_retries-1, data) else: code = None return html
def download(url, headers, proxy, num_retries, data=None): print 'Downloading:', url request = urllib2.Request(url, data, headers) opener = urllib2.build_opener() if proxy: proxy_params = {urlparse.urlparse(url).scheme: proxy} opener.add_handler(urllib2.ProxyHandler(proxy_params)) try: response = opener.open(request) html = response.read() code = response.code except urllib2.URLError as e: print 'Download error:', e.reason html = '' if hasattr(e, 'code'): code = e.code if num_retries > 0 and 500 <= code < 600: # retry 5XX HTTP errors return download(url, headers, proxy, num_retries-1, data) else: code = None return html
def download5(url, user_agent='wswp', proxy=None, num_retries=2): """Download function with support for proxies""" print 'Downloading:', url headers = {'User-agent': user_agent} request = urllib2.Request(url, headers=headers) opener = urllib2.build_opener() if proxy: proxy_params = {urlparse.urlparse(url).scheme: proxy} opener.add_handler(urllib2.ProxyHandler(proxy_params)) try: html = opener.open(request).read() except urllib2.URLError as e: print 'Download error:', e.reason html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # retry 5XX HTTP errors html = download5(url, user_agent, proxy, num_retries-1) return html
def download(self, url, headers, proxy, num_retries, data=None): print 'Downloading:', url request = urllib2.Request(url, data, headers or {}) opener = self.opener or urllib2.build_opener() if proxy: proxy_params = {urlparse.urlparse(url).scheme: proxy} opener.add_handler(urllib2.ProxyHandler(proxy_params)) try: response = opener.open(request) html = response.read() code = response.code except Exception as e: print 'Download error:', str(e) html = '' if hasattr(e, 'code'): code = e.code if num_retries > 0 and 500 <= code < 600: # retry 5XX HTTP errors return self._get(url, headers, proxy, num_retries-1, data) else: code = None return {'html': html, 'code': code}
def add_proxy(self, addr, proxy_type='all', user=None, password=None):
    """Add proxy

    With ``proxy_type='all'`` the proxy mapping is replaced so that http,
    https and ftp all use ``addr``; otherwise only the given protocol entry
    is set.  The opener is rebuilt, the proxy handler (and, when both
    ``user`` and ``password`` are given, a proxy basic-auth handler) is
    added to it, and the opener is installed globally.
    """
    if proxy_type != 'all':
        self.proxies[proxy_type] = addr
    else:
        self.proxies = dict.fromkeys(('http', 'https', 'ftp'), addr)

    handler = urllib2.ProxyHandler(self.proxies)
    self.__build_opener()
    self.opener.add_handler(handler)

    if user and password:
        credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
        credentials.add_password(None, addr, user, password)
        self.opener.add_handler(urllib2.ProxyBasicAuthHandler(credentials))

    urllib2.install_opener(self.opener)
def check_single_proxy_status(self, proxy_address, domain_check):
    """Request ``domain_check`` through ``proxy_address`` and report status.

    Returns ``'ok'`` on success (after logging the response time), the HTTP
    error code on ``urllib2.HTTPError``, and ``1`` on any other exception.
    The opener built here is installed globally.
    """
    try:
        parts = urlparse(proxy_address)
        scheme = parts.scheme
        host_port = str(parts.hostname) + ':' + str(parts.port)

        opener = urllib2.build_opener(urllib2.ProxyHandler({scheme: host_port}))
        opener.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')]
        urllib2.install_opener(opener)

        request = urllib2.Request(domain_check)
        started = time.time()
        urllib2.urlopen(request)
        elapsed = round(time.time() - started, 3)

        message = "{}[+] {} OK! Response Time : {}s".format(Y, proxy_address, str(elapsed), W )
        log.console_log(Y + message)
        return 'ok'
    except urllib2.HTTPError as e:
        print('Error code: ' + str(e.code))
        return e.code
    except Exception as detail:
        print('ERROR ' + str(detail))
        return 1
def _update_opener(self):
    '''
    Builds and installs a new opener to be used by all future calls to
    :func:`urllib2.urlopen`.

    The opener always gets a cookie processor for ``self._cj``, a basic-auth
    handler and an HTTP handler (with ``debuglevel=1`` when
    ``self._http_debug`` is set); a proxy handler for ``http`` is included
    when ``self._proxy`` is truthy.
    '''
    http = urllib2.HTTPHandler(debuglevel=1) if self._http_debug else urllib2.HTTPHandler()

    handlers = [urllib2.HTTPCookieProcessor(self._cj)]
    if self._proxy:
        handlers.append(urllib2.ProxyHandler({'http': self._proxy}))
    handlers.append(urllib2.HTTPBasicAuthHandler())
    handlers.append(http)

    urllib2.install_opener(urllib2.build_opener(*handlers))
def check_gn_proxy(proxy, protocal_type='HTTP'):
    """Return True if icanhazip.com, fetched via ``proxy``, echoes its host.

    ``proxy`` is a ``host:port`` string.  The https endpoint is used when
    ``protocal_type`` is ``'HTTPS'``, the http one otherwise.  Any exception
    raised while opening or reading yields ``False``.
    """
    if protocal_type == 'HTTPS':
        target = 'https://icanhazip.com'
    else:
        target = 'http://icanhazip.com'

    proxies = {'http': 'http://' + proxy, 'https': 'https://' + proxy}
    opener = urllib2.build_opener(urllib2.ProxyHandler(proxies), urllib2.HTTPHandler)
    try:
        response = opener.open(target, timeout=3)
        reported_ip = response.read().strip()
        return response.code == 200 and reported_ip == proxy.split(':')[0]
    except Exception:
        return False
def checkAlive(self,ip,port,protocol): testUrl = "https://www.baidu.com/" req_timeout = 3 cookies = urllib2.HTTPCookieProcessor() proxyHost = "" if protocol == 'HTTP' or protocol == 'HTTPS': proxyHost = {"http":r'http://%s:%s' % (ip, port)} #print proxyHost proxyHandler = urllib2.ProxyHandler(proxyHost) opener = urllib2.build_opener(cookies, proxyHandler) opener.addheaders = [('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')] try: req = opener.open(testUrl, timeout=req_timeout) result = req.read() #print result gevent.sleep(2) return True except urllib2.HTTPError as e: print e.message return False
def open(aurl,post='',Referer=''):
    """Open ``aurl`` with ``urllib2.urlopen`` and return the response.

    A non-empty ``post`` is url-encoded and sent as the request body.  A
    ``Referer`` header is added when ``Referer`` is non-empty, and a
    ``Cookie`` header when the module-level ``aspxsession`` is non-empty.
    """
    #proxy = 'http://127.0.0.1:8088'
    #opener = urllib2.build_opener( urllib2.ProxyHandler({'http':proxy}) )
    #urllib2.install_opener(opener)
    if post == '':
        req = urllib2.Request(url=aurl)
    else:
        req = urllib2.Request(url=aurl, data=urllib.urlencode(post))

    if Referer != '':
        req.add_header('Referer', Referer)
    if aspxsession != "":
        req.add_header('Cookie', aspxsession)

    return urllib2.urlopen(req)
#????????session
def setUp(self):
    """Start a loopback digest-auth proxy and build an opener that uses it.

    ``NO_PROXY`` / ``no_proxy`` are blanked for the duration of the test;
    a cleanup restores the original environment.
    """
    super(ProxyAuthTests, self).setUp()

    # Ignore proxy bypass settings in the environment.
    def restore_environ(old_environ):
        os.environ.clear()
        os.environ.update(old_environ)

    self.addCleanup(restore_environ, os.environ.copy())
    for bypass_var in ('NO_PROXY', 'no_proxy'):
        os.environ[bypass_var] = ''

    self.digest_auth_handler = DigestAuthHandler()
    self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    self.digest_auth_handler.set_realm(self.REALM)

    # With Digest Authentication
    def create_fake_proxy_handler(*args, **kwargs):
        return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

    self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    self.server.start()
    self.server.ready.wait()

    proxy_handler = urllib2.ProxyHandler(
        {"http" : "http://127.0.0.1:%d" % self.server.port})
    self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
    self.opener = urllib2.build_opener(proxy_handler, self.proxy_digest_handler)
def test_proxy(self):
    """An http request is redirected to the proxy configured for http."""
    director = OpenerDirector()
    director.add_handler(
        urllib2.ProxyHandler({"http": "proxy.example.com:3128"}))

    handlers = add_ordered_mock_handlers(
        director, [[("http_open", "return response")]])

    req = Request("http://acme.example.com/")
    self.assertEqual(req.get_host(), "acme.example.com")
    director.open(req)
    # After opening, the request's host is the proxy's.
    self.assertEqual(req.get_host(), "proxy.example.com:3128")

    self.assertEqual([(handlers[0], "http_open")],
                     [tup[0:2] for tup in director.calls])
def test_proxy_https_proxy_authorization(self):
    """Proxy-Authorization stays on the request, not on the tunneled headers."""
    director = OpenerDirector()
    director.add_handler(
        urllib2.ProxyHandler({'https': 'proxy.example.com:3128'}))
    https_handler = MockHTTPSHandler()
    director.add_handler(https_handler)

    req = Request("https://www.example.com/")
    for header, value in (("Proxy-Authorization", "FooBar"),
                          ("User-Agent", "Grail")):
        req.add_header(header, value)
    self.assertEqual(req.get_host(), "www.example.com")
    self.assertIsNone(req._tunnel_host)

    director.open(req)

    # Verify Proxy-Authorization gets tunneled to request.
    # httpsconn req_headers do not have the Proxy-Authorization header but
    # the req will have.
    sent_headers = https_handler.httpconn.req_headers
    self.assertNotIn(("Proxy-Authorization","FooBar"), sent_headers)
    self.assertIn(("User-Agent","Grail"), sent_headers)
    self.assertIsNotNone(req._tunnel_host)
    self.assertEqual(req.get_host(), "proxy.example.com:3128")
    self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
def test_proxy_basic_auth(self):
    """Run the shared basic-auth scenario against a proxy answering 407."""
    realm = "ACME Networks"

    opener = OpenerDirector()
    opener.add_handler(
        urllib2.ProxyHandler({"http": "proxy.example.com:3128"}))

    password_manager = MockPasswordManager()
    auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(
        407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                          realm, http_handler, password_manager,
                          "http://acme.example.com:3128/protected",
                          "proxy.example.com:3128",
                          )
def check_proxy(self, proxy_list): ava_list = [] test_url = "http://www.baidu.com/" for host, port in proxy_list: ret = False host_port = "%s:%s" % (host, port) proxy = { "http": "http://%s" %(host_port), "https": "https://%s" %(host_port), } proxy_handler = urllib2.ProxyHandler(proxy) opener = urllib2.build_opener(proxy_handler) try: conn = opener.open(test_url, timeout=2.5) data = conn.read() if "??" in data: ret = True ava_list.append((host, port)) except Exception as e: # print e ret = False print "checking proxy: %s ---> %s" % (host_port, str(ret)) return ava_list
def cache_resource(self, url):
    """Download ``url``, record it via ``update_resource_params``, return it.

    Returns ``(stream, resource_type)``.  Raises
    ``UnsupportedResourceFormat`` when the response's Content-Type is not a
    key of ``MIME_TYPES`` or maps to a falsy value.  When ``self.proxy_url``
    is set, an opener using it for http is installed globally first.
    """
    if self.proxy_url is not None:
        proxy_handler = urllib2.ProxyHandler({'http': self.proxy_url})
        urllib2.install_opener(urllib2.build_opener(proxy_handler))

    request = urllib2.Request(url)
    request.add_header(
        'User-Agent',
        'Mozilla/5.0 (X11; Linux i686) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.35 Safari/535.1')
    response = urllib2.urlopen(request, timeout=self.http_timeout)
    headers = response.headers

    try:
        resource_type = MIME_TYPES[headers.get('Content-Type')]
        if not resource_type:
            raise UnsupportedResourceFormat("Resource format not found")
    except KeyError:
        raise UnsupportedResourceFormat("Resource format not supported")

    etag = headers.get('ETag')
    last_modified = headers.get('Last-Modified')
    resource_key = self.get_resource_key(url)
    stream = response.read()
    self.update_resource_params(resource_key, resource_type, etag,
                                last_modified, stream)
    return stream, resource_type
def __init__(self, configuration):
    """Apply ``configuration`` and install a proxy opener when configured.

    ``self.echo`` is ``configuration['ECHO']`` when that key is present,
    else ``None``.  If proxy scheme, host and port are all set, an opener
    using that proxy (with ``user:password@`` embedded when both are set)
    is installed globally.
    """
    self.setup(configuration)
    self.echo = None
    if "ECHO" in configuration:
        self.echo = configuration['ECHO']

    if self.proxy_scheme is None or self.proxy_host is None or \
            self.proxy_port is None:
        return

    credentials = ""
    if self.proxy_username is not None and self.proxy_password is not None:
        credentials = self.proxy_username + ":" + self.proxy_password + "@"

    proxy_url = (self.proxy_scheme + "://" + credentials +
                 self.proxy_host + ":" + self.proxy_port)
    handlers = [urllib2.ProxyHandler({self.proxy_scheme: proxy_url})]
    if credentials != '':
        handlers.append(urllib2.HTTPBasicAuthHandler())
        handlers.append(urllib2.HTTPHandler)
    urllib2.install_opener(urllib2.build_opener(*handlers))
def do_GET(self): filename = self.path.split('/')[-1] print "ProxyHandler: url:", self.path, " localfile:", filename if os.path.exists(filename): print "ProxyHandler: local file found:",filename self.copyfile(open(filename), self.wfile) else: filenamejson = filename + ".json" print "ProxyHandler: local file NOT found:", filename, " trying: ", filenamejson if os.path.exists(filenamejson): print "ProxyHandler: local file found:",filenamejson self.copyfile(open(filenamejson), self.wfile) else: print "ProxyHandler: trying url:",self.path proxy_handler = urllib2.ProxyHandler({}) opener = urllib2.build_opener(proxy_handler) try: req = urllib2.Request(self.path) self.copyfile(opener.open(req), self.wfile) except: print "ProxyHandler: file not found:", self.path
def _init_urllib(self, secure, debuglevel=0):
    """Install a global opener with cookies, no proxies and multipart POST.

    When ``secure`` is falsy a warning is logged and the HTTPS handler gets
    an SSL context with hostname checking and certificate verification
    disabled; otherwise the handler is created with ``context=None``.
    """
    cookie_jar = cookielib.CookieJar()
    no_proxy_support = urllib2.ProxyHandler({})
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)

    if secure:
        ssl_context = None
    else:
        self._logger.info('[WARNING] Skip certificate verification.')
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    opener = urllib2.build_opener(
        no_proxy_support,
        cookie_handler,
        urllib2.HTTPSHandler(debuglevel=debuglevel, context=ssl_context),
        MultipartPostHandler.MultipartPostHandler,
    )
    opener.addheaders = [('User-agent', API_USER_AGENT)]
    urllib2.install_opener(opener)
def setUp(self):
    """Start a loopback digest-auth proxy and build an opener that uses it."""
    super(ProxyAuthTests, self).setUp()

    self.digest_auth_handler = DigestAuthHandler()
    self.digest_auth_handler.set_users({self.USER: self.PASSWD})
    self.digest_auth_handler.set_realm(self.REALM)

    # With Digest Authentication
    def create_fake_proxy_handler(*args, **kwargs):
        return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

    self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
    self.server.start()
    self.server.ready.wait()

    proxy_handler = urllib2.ProxyHandler(
        {"http" : "http://127.0.0.1:%d" % self.server.port})
    self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
    self.opener = urllib2.build_opener(proxy_handler, self.proxy_digest_handler)
def proxy_open(self, req, proxy, type):
    """Point ``req`` at ``proxy``; skip the superclass step for https.

    The request gets a ``Proxy-authorization`` header when the proxy URL
    carries both user and password, and its proxy is set.  For requests
    whose original type is ``https`` this returns ``None``; otherwise the
    result of ``urllib2.ProxyHandler.proxy_open`` is returned.
    """
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    proxy_type = orig_type if proxy_type is None else proxy_type

    if user and password:
        user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
        # Later calls overwrite earlier calls for the same header
        req.add_header("Proxy-authorization",
                       "Basic " + base64.b64encode(user_pass).strip())

    req.set_proxy(urllib2.unquote(hostport), proxy_type)

    # This condition is the change
    if orig_type != "https":
        return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
    return None
def proxy_identify(proxy, url): cookie = cookielib.LWPCookieJar() handler = urllib2.HTTPCookieProcessor(cookie) proxy_support = urllib2.ProxyHandler({'http': proxy}) opener = urllib2.build_opener(proxy_support, handler) try: response = opener.open(url, timeout=3) if response.code == 200: c = '' for item in cookie: c += item.name+'='+item.value+';' print c IpProxy.sogou_cookie.append(c) return True except Exception, error: print error return False
def checker():
    """Loop forever taking proxies from ``proxyq`` and testing each one.

    Each proxy is installed as the global http proxy and icanhazip.com is
    fetched through it.  For every response line that is a substring of the
    proxy URL, the proxy is reported on ``printq`` and appended to
    ``working.txt``.  Any exception marks the proxy as bad.
    """
    while True:
        if proxyq.empty() is True:
            continue

        proxy = "http://{}".format( proxyq.get() )
        url = "http://icanhazip.com"
        opener = urllib2.build_opener( urllib2.ProxyHandler( { "http" : proxy } ) )
        urllib2.install_opener( opener )
        printq.put( "[>] Trying {}".format( proxy ) )
        try:
            lines = urllib2.urlopen( url, timeout=3 ).readlines()
            for line in lines:
                if line.rstrip( "\n" ) not in proxy:
                    continue
                printq.put( "[+] Working proxy: {}".format( proxy ) )
                # The with-block closes the file on exit.
                with open( "working.txt", "a" ) as log:
                    log.write( "{}\n".format( proxy ) )
        except Exception:
            printq.put( "[!] Bad proxy: {}".format( proxy ) )
        proxyq.task_done()
def init_options(proxy=None, cookie=None, ua=None, referer=None):
    """Set the module-level ``_headers`` and install the global opener.

    ``_headers`` holds only the header pairs with a truthy value; the user
    agent falls back to ``NAME``.  When ``proxy`` is truthy an opener using
    it for http is installed, otherwise ``None`` is passed to
    ``urllib2.install_opener``.
    """
    candidates = ((COOKIE, cookie), (UA, ua or NAME), (REFERER, referer))
    globals()["_headers"] = dict(pair for pair in candidates if pair[1])

    if proxy:
        opener = urllib2.build_opener(urllib2.ProxyHandler({'http': proxy}))
    else:
        opener = None
    urllib2.install_opener(opener)

# if __name__ == "__main__":
#     print "%s #v%s\n by: %s\n" % (NAME, VERSION, AUTHOR)
#     parser = optparse.OptionParser(version=VERSION)
#     parser.add_option("-u", "--url", dest="url", help="Target URL (e.g. \"http://www.target.com/page.php?id=1\")")
#     parser.add_option("--data", dest="data", help="POST data (e.g. \"query=test\")")
#     parser.add_option("--cookie", dest="cookie", help="HTTP Cookie header value")
#     parser.add_option("--user-agent", dest="ua", help="HTTP User-Agent header value")
#     parser.add_option("--referer", dest="referer", help="HTTP Referer header value")
#     parser.add_option("--proxy", dest="proxy", help="HTTP proxy address (e.g. \"http://127.0.0.1:8080\")")
#     options, _ = parser.parse_args()
#     if options.url:
#         init_options(options.proxy, options.cookie, options.ua, options.referer)
#         result = scan_page(options.url if options.url.startswith("http") else "http://%s" % options.url, options.data)
#         print "\nscan results: %s vulnerabilities found" % ("possible" if result else "no")
#     else:
#         parser.print_help()
def setUp(self):
    """Prepare the fixture server and, if requested, disable all proxies.

    When ``self.no_proxies`` is set, openers built with an empty proxy
    mapping are installed for both mechanize and urllib2; a teardown
    re-installs the previously installed openers.
    """
    mechanize._testcase.TestCase.setUp(self)
    self.test_uri = urljoin(self.uri, "test_fixtures")
    self.server = self.get_cached_fixture("server")
    if not self.no_proxies:
        return

    previous_mechanize = mechanize._opener._opener
    previous_urllib2 = urllib2._opener
    mechanize.install_opener(
        mechanize.build_opener(mechanize.ProxyHandler(proxies={})))
    urllib2.install_opener(
        urllib2.build_opener(urllib2.ProxyHandler(proxies={})))

    def revert_install():
        mechanize.install_opener(previous_mechanize)
        urllib2.install_opener(previous_urllib2)

    self.add_teardown(revert_install)