Python urllib.error module: URLError() usage examples

The following code examples, collected from open-source Python projects, illustrate how to use urllib.error.URLError().
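Before the project snippets, here is a minimal sketch of the basic handling pattern (the URL and timeout are placeholders). HTTPError is a subclass of URLError, so it must be caught first or its except clause is unreachable; it exposes the HTTP status via e.code, while a plain URLError carries the underlying cause (DNS failure, refused connection, timeout) in e.reason.

from urllib import error, request

try:
    with request.urlopen('https://example.com', timeout=5) as response:
        body = response.read()
except error.HTTPError as e:
    # the server answered, but with an HTTP error status (4xx/5xx)
    print('HTTP error:', e.code, e.reason)
except error.URLError as e:
    # the server could not be reached at all
    print('URL error:', e.reason)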

Project: SmallReptileTraining | Author: yanbober
def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
        if url is None:
            return None
        try:
            req = request.Request(url, headers=headers or {}, data=data)
            cookie = cookiejar.CookieJar()
            cookie_process = request.HTTPCookieProcessor(cookie)
            opener = request.build_opener(cookie_process)
            if proxy:
                proxies = {urlparse(url).scheme: proxy}
                opener.add_handler(request.ProxyHandler(proxies))
            content = opener.open(req).read()
        except error.URLError as e:
            print('HtmlDownLoader download error:', e.reason)
            content = None
            if retry_count > 0:
                if hasattr(e, 'code') and 500 <= e.code < 600:
                    # Retry on HTTP 5xx status codes: a server-side error may be transient
                    return self.download(url, retry_count-1, headers, proxy, data)
        return content
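A hedged usage sketch (the HtmlDownLoader class name is inferred from the error message above; the URL and header values are placeholders):

downloader = HtmlDownLoader()
html = downloader.download('https://example.com/page',
                           headers={'User-Agent': 'Mozilla/5.0'})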
Project: python- | Author: secondtonone1
def requestData(url, user_agent):
    content = None
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the response bytes to str
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'reason'):
            print(e.reason)

    return content
Project: python- | Author: secondtonone1
def getAbstractInfo(self):

        try:
            content = requestData(self.url, self.user_agent)
            self.getDetailList(content)

        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python- | Author: secondtonone1
def requestData(self, url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=8)
            # decode the response bytes to str
            content = response.read().decode('utf-8')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python- | Author: secondtonone1
def requestData(self, url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=3)
            # decode the response bytes to str
            content = response.read().decode('gbk')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'reason'):
                print(e.reason)
Project: pyseeder | Author: PurpleI2P
def download(url, filename):
    """Download .su3 file, return True on success"""
    USER_AGENT = "Wget/1.11.4"

    url = "{}i2pseeds.su3".format(url)
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})

    try:
        with urllib.request.urlopen(req) as resp:
            with open(filename, 'wb') as f:
                f.write(resp.read())

        # treat an empty file as a failed download
        return os.stat(filename).st_size > 0
    except URLError:
        return False
Project: SoCFoundationFlow | Author: mattaw
def compute_dependencies(self, filename=REQUIRES):
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                ret = response.read()
                try:
                    ret = ret.decode('utf-8')
                except Exception:
                    pass
                self.trace(ret)
                self.constraints = parse_constraints(ret)
        self.check_errors()
Project: thesaurus_query.vim | Author: Ron89
def _woxikon_de_url_handler(target):
    '''
    Query woxikon for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'), timeout = time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1   # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content
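Note the layered timeout handling above: a connection timeout usually surfaces as a URLError whose reason attribute is a socket.timeout instance, but depending on where it fires the timeout can also propagate as a bare socket.timeout, so the function checks both paths.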
Project: thesaurus_query.vim | Author: Ron89
def _jeck_ru_url_handler(target):
    '''
    Query jiport for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'), timeout = time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1   # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content
Project: AutoTriageBot | Author: salesforce
def sendResponse(reportID: str, actionName: str, respURL: str) -> None:
    if actionName == 'body':
        text = getBody(reportID)
    elif actionName == 'metadata':
        text = getMetadata(reportID)
    else:
        raise ValueError("Button %s not defined!" % actionName)
    ephemeralJson = {'response_type': 'ephemeral',
                     'replace_original': False,
                     'text': text}
    # Even if an attacker gets the verification token, we will still refuse to post the data to non-slack.com URLs
    if urlparse(respURL).hostname.endswith('slack.com'):
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
    else:
        ephemeralJson['text'] = (('Failed URL check, respURL=%s which is not on the slack.com domain name! This check '
                                  'is theoretically not required (since we verify the verification token), but done as '
                                  'an extra defensive step. To disable this, edit slackServer.py in the project root.')
                                 % respURL)
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
        raise URLError("respURL=%s not on slack.com domain!" % respURL)
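Note that URLError is not raised by urllib itself here: the function reuses it as an application-level exception to signal that the response URL failed the slack.com hostname check.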
Project: AutoTriageBot | Author: salesforce
def testGETOpenRedirect(url: str, cookies: Mapping[str, str]) -> Optional[str]:
    """ If the given URL redirects when accessed with the given cookies via GET, return the new URL, otherwise
        return None """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)

    try:
        driver.get(url)

        time.sleep(config.timeout)

        if driver.current_url == url:
            driver.reset()
            return None
        else:
            url = driver.current_url
            driver.reset()
            return url
    except (TimeoutException, URLError):
        driver.reset()
        return None
Project: AutoTriageBot | Author: salesforce
def testPOSTOpenRedirect(url: str, cookies: Mapping[str, str], data: Mapping[str, str]) -> Optional[str]:
    """ If the given URL redirects when accessed with the given cookies via POST, return the new URL, otherwise
        return None """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)
    try:
        driver.post(url, data)

        time.sleep(config.timeout)

        if driver.current_url == url:
            driver.reset()
            return None
        else:
            url = driver.current_url
            driver.reset()
            return url
    except (TimeoutException, URLError):
        driver.reset()
        return None
Project: AutoTriageBot | Author: salesforce
def testPOSTXSSDriver(url: str, cookies: Mapping[str, str], data: Mapping[str, str], driver: webdriver) -> \
        Optional[str]:
    """ If the given URL pops an alert box when accessed with the given cookies, return the contents of the alert box,
        otherwise return None """
    driver.setCookies(url, cookies)

    try:
        driver.post(url, data)

        WebDriverWait(driver, config.timeout).until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        #   - http://selenium-python.readthedocs.io/navigating.html#popup-dialogs
        alert = driver.switch_to_alert()
        text = alert.text
        driver.reset()
        return text
    except (TimeoutException, URLError):
        driver.reset()
        return None
Project: geocoder-ie | Author: devgateway
def query(location, cty_codes, query_method, fuzzy):
    results = []
    try:
        base_url = get_geonames_base_url()
        username = get_geonames_user_name()
        query_string = base_url + 'username={user}&{query_method}={name}&' \
                                  'style=FULL&orderby={order}&startRow=0&maxRows=5&fuzzy={fuzzy}' \
            .format(user=username, query_method=query_method, name=quote(location), order='relevance', fuzzy=fuzzy)
        if cty_codes and len(cty_codes) > 0:
            query_string = query_string + '&' + '&'.join([('country={}'.format(c)) for c in cty_codes])

        json_decode = json.JSONDecoder()  # used to parse json response
        response = urlopen(query_string)
        response_string = response.read().decode('utf-8')
        parsed_response = json_decode.decode(response_string)
        if parsed_response.get('geonames') and len(parsed_response.get('geonames')) > 0:
            for item in parsed_response['geonames']:
                results.append(parse(item))

    except URLError as e:
        logger.info("Oops!  something didn't go well")
        logger.info(e)

    return results
Project: ask_data_science | Author: AngelaVC
def __init__(self, url=None):
        self.url = url
        self.html = None
        self.links = []
        self.soup = None
        self.text = None
        self.title = None

        req = Request(self.url, headers={'User-Agent': "Magic Browser"})
        try:
            self.html = urlopen(req)
        except URLError as e:
            if hasattr(e, 'reason'):
                print('We failed to reach a server.')
                print('Reason: ', e.reason)

            elif hasattr(e, 'code'):
                print('The server couldn\'t fulfill the request.')
                print('Error code: ', e.code)
Project: MailFail | Author: m0rtem
def load_url(url, timeout):
    # Build URL query to email signup page
    urlquery = "http://" + url + "/m-users-a-email_list-job-add-email-" + targetEmail + "-source-2.htm"
    print_out(Style.BRIGHT + Fore.WHITE + "Sending request to: " + url)
    # Build the request
    req = urllib.request.Request(
        urlquery, 
        data=None, 
        headers={
            'User-Agent': random.choice(useragents),
            'Host': url
        }
    )
    # Send
    try:
        f = urllib.request.urlopen(req)
        print_out(Style.BRIGHT + Fore.GREEN + "Successfully sent!")
        f.close()
    except urllib.error.URLError as e:
        # e.reason may be an exception object rather than a string, so convert it
        print_out(Style.BRIGHT + Fore.RED + str(e.reason))
Project: dataScryer | Author: Griesbacher
def run(self):
        if len(self.__update_rates) == 0:
            return

        # wait up to 120 seconds, to get some distortion
        self.__stop_event.wait(randint(0, 120))

        while not self.__stop_event.is_set():
            start = time.time()
            for update in self.__update_rates:
                rate = update[0]
                now = time.time()
                time_to_wait = round(start - now + rate / 1000, 0)
                interrupt = self.__stop_event.wait(time_to_wait)
                if interrupt:
                    return
                try:
                    self.start_calculation(update[1])
                except URLError as e:
                    logging.getLogger(__name__).error("Could not connect to InfluxDB: " + str(e))
                except:
                    logging.getLogger(__name__).error("Job execution failed", exc_info=True)
Project: UKCensusAPI | Author: virgesmith
def __fetch_json(self, path, query_params):
    # add API KEY to params
    query_params["uid"] = Nomisweb.KEY

    query_string = Nomisweb.URL + path + str(urlencode(query_params))

    #print(query_string)
    reply = {}
    try:
      response = request.urlopen(query_string, timeout=Nomisweb.Timeout)
    except (HTTPError, URLError) as error:
      print('ERROR: ', error, '\n', query_string)
    except timeout:
      print('ERROR: request timed out\n', query_string)
    else:
      reply = json.loads(response.read().decode("utf-8"))
    return reply

  # save metadata as JSON for future reference
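The try/except/else layout above keeps the success path out of the exception handlers: the else block runs only when urlopen() raised nothing, so on any error the function falls through and returns the empty reply dict initialized before the call.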
Project: Spider_Hub | Author: WiseDoge
def getLinks(articleUrl):
    try:
        html = urlopen("http://en.wikipedia.org"+articleUrl)
    except HTTPError:
        ServerLog.writeLog("HTTPError")
        return None
    except URLError:
        ServerLog.writeLog("URLError")
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen("http://en.wikipedia.org"+articleUrl)
    bsObj = BeautifulSoup(html, "lxml")
    return bsObj.find("div", {"id":"bodyContent"}).findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))


# Fetch IP addresses
Project: Spider_Hub | Author: WiseDoge
def getHistoryIPs(pageUrl):
    pageUrl = pageUrl.replace("/wiki/", "")
    historyUrl = "http://en.wikipedia.org/w/index.php?title="+pageUrl+"&action=history"
    print("history url:", historyUrl)

    time.sleep(SLEEP_TIME)

    try:
        html = urlopen(historyUrl)
    except HTTPError:
        return None
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen(historyUrl)
    bsObj = BeautifulSoup(html, "lxml")
    ipAddresses = bsObj.findAll("a", {"class":"mw-anonuserlink"})

    addressList = set()
    for ipAddress in ipAddresses:
        print(pageUrl+": "+ipAddress.get_text())
        addressList.add(ipAddress.get_text())
    return addressList  # the set of editor IP addresses

# Look up the country for each IP address
Project: Spider_Hub | Author: WiseDoge
def getCountry(ipAddress):
    '''
    Look up the country code for the given IP address
    '''
    try:
        response = urlopen("http://freegeoip.net/json/" +
                           ipAddress).read().decode('utf-8')
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        response = urlopen("http://freegeoip.net/json/" +
                           ipAddress).read().decode('utf-8')
    except:
        return 'Unknown'
    responseJson = json.loads(response)
    return responseJson.get("country_code")  # the two-letter country code
Project: Spider_Hub | Author: WiseDoge
def getLinks(articleUrl):
    '''
    Collect all article links from the page body
    '''
    try:
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    except HTTPError:
        return None
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    bsObj = BeautifulSoup(html, "lxml")
    return bsObj.find("div", {"id": "bodyContent"}).findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))


Project: forget | Author: codl
def refresh_posts(posts):
    if not posts:
        return posts

    t = get_twitter_for_acc(posts[0].author)
    if not t:
        return
    try:
        tweets = t.statuses.lookup(
                    _id=",".join((post.twitter_id for post in posts)),
                    trim_user=True, tweet_mode='extended')
    except (URLError, TwitterError) as e:
        handle_error(e)
    refreshed_posts = list()
    for post in posts:
        tweet = next(
            (tweet for tweet in tweets if tweet['id_str'] == post.twitter_id),
            None)
        if not tweet:
            db.session.delete(post)
        else:
            post = db.session.merge(post_from_api_tweet_object(tweet))
            refreshed_posts.append(post)

    return refreshed_posts
Project: stephanie-va | Author: SlapBot
def give_a_summary(self):
        self.assistant.say("What would you like to know about?")
        text = self.assistant.listen().decipher()
        text = text.strip().replace(" ", "%20")
        request = Request(
            'https://en.wikipedia.org/w/api.php?'
            'format=json&action=query&prop=extracts&exintro=&explaintext=&titles=' + text
        )
        try:
            response = urlopen(request)
            data = json.loads(
                response.read().decode(
                    response.info().get_param('charset') or 'utf-8'
                )
            )
            output = data["query"]["pages"]
            final = output[list(output.keys())[0]]["extract"]
            return final

        except URLError:
            return "Unable to search your given query."
Project: amadown2py | Author: aesuli
def download_page(url, referer, maxretries, timeout, pause):
    tries = 0
    htmlpage = None
    while tries < maxretries and htmlpage is None:
        try:
            code = 404
            req = request.Request(url)
            req.add_header('Referer', referer)
            req.add_header('User-agent',
                           'Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.30 (KHTML, like Gecko) Ubuntu/11.04 Chromium/12.0.742.91 Chrome/12.0.742.91 Safari/534.30')
            with closing(request.urlopen(req, timeout=timeout)) as f:
                code = f.getcode()
                htmlpage = f.read()
                sleep(pause)
        except (urlerror.URLError, socket.timeout, socket.error):
            tries += 1
    if htmlpage:
        return htmlpage.decode('utf-8'), code
    else:
        return None, code
Project: steamwatch | Author: akeil
def _get(url):
    LOG.debug('GET {u!r}'.format(u=url))
    # TODO proper error handling - or none
    try:
        response = urlopen(url)
    except HTTPError:
        raise
    except ContentTooShortError:
        raise
    except URLError:
        raise
    except Exception:
        raise

    LOG.debug('{} {}'.format(response.status, response.reason))

    if response.status not in (200,):
        raise ValueError('{} {}'.format(response.status, response.reason))

    return response
Project: tumanov_castleoaks | Author: Roamdev
def geocode(address, timeout=5.0):
    """ ?????????? ?????? ????????? (longtitude, latitude,) ?? ?????? ?????? """
    params = parse.urlencode({'sensor': False, 'address': address})
    try:
        response = request.urlopen(conf.GEOCODE_URL + params, timeout=timeout)
    except error.URLError:
        return None

    try:
        dom = xml.dom.minidom.parseString(response.read())
        location_elem = dom.getElementsByTagName('location')[0]
        lng = location_elem.getElementsByTagName('lng')[0]
        lat = location_elem.getElementsByTagName('lat')[0]
    except IndexError:
        return None

    return lng.firstChild.data, lat.firstChild.data
Project: tumanov_castleoaks | Author: Roamdev
def get_info(cls, video_key):
        req = request.Request('http://rutube.ru/api/video/%s/?format=xml' % video_key, method='GET')
        try:
            logger.debug('{0.method} {0.full_url}'.format(req))
            response = request.urlopen(req, timeout=3)
        except error.URLError:
            return None

        if response.status != 200:
            return None

        dom = minidom.parseString(response.read())

        title = dom.getElementsByTagName('title').item(0)
        description = dom.getElementsByTagName('description').item(0)
        thumbnail = dom.getElementsByTagName('thumbnail_url').item(0)
        embed = dom.getElementsByTagName('html').item(0)

        return {
            'title': title.firstChild.data,
            'description': description.firstChild.data,
            'preview_url': thumbnail.firstChild.data,
            'embed': embed.firstChild.data
        }
Project: zippy | Author: securesystemslab
def testPasswordProtectedSite(self):
        support.requires('network')
        with support.transient_internet('mueblesmoraleda.com'):
            url = 'http://mueblesmoraleda.com'
            robots_url = url + "/robots.txt"
            # First check the URL is usable for our purposes, since the
            # test site is a bit flaky.
            try:
                urlopen(robots_url)
            except HTTPError as e:
                if e.code not in {401, 403}:
                    self.skipTest(
                        "%r should return a 401 or 403 HTTP error, not %r"
                        % (robots_url, e.code))
            else:
                self.skipTest(
                    "%r should return a 401 or 403 HTTP error, not succeed"
                    % (robots_url))
            parser = urllib.robotparser.RobotFileParser()
            parser.set_url(url)
            try:
                parser.read()
            except URLError:
                self.skipTest('%s is unavailable' % url)
            self.assertEqual(parser.can_fetch("*", robots_url), False)
Project: zippy | Author: securesystemslab
def handle(self, fn_name, action, *args, **kwds):
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib.error.URLError("blah")
        assert False
Project: zippy | Author: securesystemslab
def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from urllib.error import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(urllib.request.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme+"://example.com/")
Project: zippy | Author: securesystemslab
def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(urllib.error.URLError, o.open, req)
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         #  but should really be tested anyway...
Project: pytorch | Author: tylergenter
def download_file(url, path, binary=True):
    if sys.version_info < (3,):
        import urllib2
        request = urllib2
        error = urllib2
    else:
        import urllib.request
        import urllib.error
        request = urllib.request
        error = urllib.error

    if os.path.exists(path):
        return True
    try:
        data = request.urlopen(url, timeout=15).read()
        with open(path, 'wb' if binary else 'w') as f:
            f.write(data)
        return True
    except error.URLError as e:
        return False
Project: django-cra-helper | Author: MasterKale
def is_server_live(server_path: str) -> bool:
    # Ignore the server check if we're in production
    if settings.DEBUG:
        try:
            resp = request.urlopen(server_path)
            if resp.status == 200:
                logging.info('CRA liveserver is running')
                return True
            else:
                logging.warning('CRA liveserver is up but not serving bundle.js')
                return False
        except url_error.URLError as err:
            logging.warning('CRA liveserver is not running')
            return False
    else:
        return False
Project: pytorch-coriander | Author: hughperkins (same download_file() helper as in the pytorch snippet above)
Project: pclcmd | Author: abbat
def pcl_put(options, source, target):
    """
    Retry wrapper around the single-file upload routine (pcl_put_retry)
    """
    pcl_verbose("Transfer: {0} ({1}) -> {2}".format(source, pcl_human(os.path.getsize(source)), target), options.verbose)

    retry = 0
    while True:
        try:
            pcl_put_retry(options, source, target)
            break
        except (pclURLError, pclBadStatusLine, pclCannotSendRequest, ssl.SSLError, socket.error, pclError) as e:
            pcl_can_query_retry(e)
            retry += 1
            pcl_debug("Retry {0}/{1}: {2}".format(retry, options.retries, e), options.debug)
            if retry >= options.retries:
                raise pclError(1, e)
            time.sleep(options.delay)
Project: pclcmd | Author: abbat
def pcl_get(options, source, target):
    """
    Retry wrapper around the single-file download routine (pcl_get_retry)
    """
    pcl_verbose("Transfer: {0} -> {1}".format(source, target), options.verbose)

    retry = 0
    while True:
        try:
            pcl_get_retry(options, source, target)
            break
        except (pclURLError, pclBadStatusLine, pclCannotSendRequest, ssl.SSLError, socket.error, pclError) as e:
            pcl_can_query_retry(e)
            retry += 1
            pcl_debug("Retry {0}/{1}: {2}".format(retry, options.retries, e), options.debug)
            if retry >= options.retries:
                raise pclError(1, e)
            time.sleep(options.delay)
Project: charm-plumgrid-gateway | Author: openstack
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
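One caveat with this helper: install_opener() mutates process-global urllib state, so after a download with basic-auth credentials, every subsequent urlopen() call in the process goes through the same authenticating opener.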
Project: charm-swift-proxy | Author: openstack (same download() method as in the charm-plumgrid-gateway snippet above)
Project: BioDownloader | Author: biomadeira
def _download(self):
        try:
            try:
                import urllib.request
                from urllib.error import URLError, HTTPError
                with urllib.request.urlopen(self.url) as response, \
                        open(self.outputfile_origin, 'wb') as outfile:
                    shutil.copyfileobj(response, outfile)
            except (AttributeError, ImportError):
                import urllib
                urllib.urlretrieve(self.url, self.outputfile_origin)
        except Exception as e:  # covers URLError/HTTPError/IOError; avoids a NameError when the py3 imports failed
            logger.debug("Unable to retrieve %s for %s", self.url, e)
Project: os-xenapi | Author: openstack
def test_download_failed_URLError(self, mock_urlopen):
        mock_urlopen.side_effect = URLError(None)
        fake_request = urllib2.Request('http://fakeurl.com')

        self.assertRaises(
            self.glance.RetryableError,
            self.glance._download_tarball_and_verify,
            fake_request, 'fake_staging_path')
Project: charm-heat | Author: openstack (same download() method as in the charm-plumgrid-gateway snippet above)