Python requests module: requests.ConnectTimeout() code examples

From open-source Python projects, we have extracted the following 26 code examples to illustrate how to use requests.ConnectTimeout().

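Before the project snippets, here is a minimal sketch of the pattern they all share: pass a timeout to the request call and catch requests.ConnectTimeout, usually alongside requests.ReadTimeout and requests.ConnectionError. The URL below is a placeholder.

import requests

def fetch(url, timeout=5):
    """Return the response body, or None if the host could not be reached in time."""
    try:
        response = requests.get(url, timeout=timeout)
    except requests.ConnectTimeout:
        # No TCP connection could be established within `timeout` seconds.
        return None
    except (requests.ReadTimeout, requests.ConnectionError):
        # Connected but the server stalled, or the connection failed outright.
        return None
    return response.text

# Usage (placeholder URL):
# body = fetch('https://example.com/', timeout=5)
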
Project: icheckin    Author: chunkhang    | Project Source | File Source
def update_status():
   ''' 
   -> True  : Version is latest; no update needed
   -> False : Check for update failed
   -> str   : Latest version to update to
   '''
   try:
      r = requests.get(constants.UPDATES, timeout=5)
   except (requests.ConnectionError, requests.ConnectTimeout, 
      requests.ReadTimeout):
      return False
   else:
      latestVersion = r.json()['latest_version']
      if constants.VERSION == latestVersion:
         return True
      else:
         return latestVersion
Project: my-tools    Author: fiht    | Project Source | File Source
def get_title_by_url(url, timeout=5, pattern='<title>(.*?)</title>'):
        """Return {url: title}; if no title is found, return {url: None}."""
        try:
            raw_http = requests.get(url, timeout=timeout)
            raw_http.encoding = raw_http.apparent_encoding
        except (requests.ConnectionError, requests.ConnectTimeout):
            logger_util.log_debug('Connect failed to %s ' % url)
            return
        title = re.findall(pattern, raw_http.text)
        if not title:
            logger_util.log_debug('This page does not have a title: %s' % url)
            return {url: None}
        else:
            return {url: title[0]}

    # ----------------------------------------------------------------------
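
A note on multi-exception handlers before moving on: when catching more than one requests exception, the classes must be listed in a tuple. Writing "except A or B:" evaluates A or B as an ordinary boolean expression, which yields A, so only the first class is ever handled. A small self-contained sketch of the difference:

import requests

# Buggy form: `requests.ConnectTimeout or requests.ConnectionError`
# evaluates to requests.ConnectTimeout before the except clause sees it.
try:
    raise requests.ConnectionError('simulated failure')
except requests.ConnectTimeout or requests.ConnectionError:
    print('never reached: this clause only catches ConnectTimeout')
except requests.ConnectionError:
    print('the simulated ConnectionError lands here instead')

# Correct form: list the classes in a tuple.
try:
    raise requests.ConnectionError('simulated failure')
except (requests.ConnectTimeout, requests.ConnectionError):
    print('either exception class is handled')
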
Project: uhu    Author: updatehub    | Project Source | File Source
def test_raises_error_with_any_other_requests_exception(self, mock):
        exceptions = [
            requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError,
            requests.exceptions.ProxyError,
            requests.exceptions.SSLError,
            requests.exceptions.Timeout,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
            requests.exceptions.URLRequired,
            requests.exceptions.TooManyRedirects,
            requests.exceptions.MissingSchema,
            requests.exceptions.InvalidSchema,
            requests.exceptions.InvalidURL,
            requests.exceptions.InvalidHeader,
            requests.exceptions.ChunkedEncodingError,
            requests.exceptions.ContentDecodingError,
            requests.exceptions.StreamConsumedError,
            requests.exceptions.RetryError,
            requests.exceptions.UnrewindableBodyError,
        ]
        mock.side_effect = exceptions
        for _ in exceptions:
            with self.assertRaises(HTTPError):
                request('GET', 'foo')
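
The test above leans on a unittest.mock behaviour: when side_effect is a list, each successive call to the mock raises (or returns) the next element, so a single patched call site can exercise every exception class in turn. A minimal standalone sketch:

import requests
from unittest import mock

fake_get = mock.Mock(side_effect=[requests.ConnectTimeout, requests.ConnectionError])

for expected in (requests.ConnectTimeout, requests.ConnectionError):
    try:
        fake_get('http://example.com/')
    except expected:
        print('raised', expected.__name__)
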
Project: learn-python    Author: ankitpokhrel    | Project Source | File Source
def crawl(self, job: Job):
        try:
            worker = Worker(job)
            worker.start()
            worker.join()
        except ValueError as e:
            print("Couldn't parse url: ", job.url, e)
            pass
        except (requests.ConnectionError, requests.ConnectTimeout, requests.exceptions.SSLError) as e:
            print("Couldn't parse url: ", job.url, e.strerror)
            pass
        else:
            while not linkparser.links.empty():
                job = linkparser.links.get()

                if job.priority < self._depth:
                    self.crawl(job)

            linkparser.links.task_done()
Project: learn-python    Author: ankitpokhrel    | Project Source | File Source
def crawl(self, job: Job):
        try:
            r = requests.get(job.url)

            link_parser = LinkParser(job)

            link_parser.parse(r.text)

            links = link_parser.get_links()
        except ValueError as e:
            print("Couldn't parse url: ", job.url, e)
            pass
        except (requests.ConnectionError, requests.ConnectTimeout, requests.exceptions.SSLError) as e:
            print("Couldn't parse url: ", job.url, e.strerror)
            pass
        else:
            while not links.empty():
                job = links.get()

                self.print_status(job, links)

                if job.priority < self._depth:
                    self.crawl(job)

            links.task_done()
Project: dayworkspace    Author: copie    | Project Source | File Source
def __call__(self):
        try:
            req = requests.get(self.url, headers=self.header,
                               timeout=10, proxies=self.proxies)
        except (requests.ConnectTimeout, requests.exceptions.ReadTimeout) as e:
            print(f"Request to {self.url} timed out")
            self.status = False
            return {"status": self.status, 'html': ''}
        try:
            encoding = chardet.detect(req.content)['encoding']
            html = req.content.decode(encoding, errors='replace')
        except Exception as e:
            print(e)
            print("Failed to decode the page, skipping it......")
            self.status = False
            return {"status": self.status, 'html': ''}
        return {"status": self.status, 'html': html}
Project: postmarker    Author: Stranger6667    | Project Source | File Source
def test_on_exception(self, catch_signal):
        with patch('requests.Session.request', side_effect=ConnectTimeout):
            with catch_signal(on_exception) as handler:
                send_mail(fail_silently=True, **SEND_KWARGS)
        assert handler.called
        kwargs = handler.call_args[1]
        assert kwargs['sender'] == EmailBackend
        assert kwargs['signal'] == on_exception
        assert isinstance(kwargs['exception'], ConnectTimeout)
        assert len(kwargs['raw_messages']) == 1
Project: jumpserver-python-sdk    Author: jumpserver    | Project Source | File Source
def request(self, api_name=None, pk=None, method='get', use_auth=True,
                data=None, params=None, content_type='application/json'):

        if api_name in self.api_url_mapping:
            path = self.api_url_mapping.get(api_name)
            if pk and '%s' in path:
                path = path % pk
        else:
            path = '/'

        url = self.endpoint.rstrip('/') + path
        self.req = req = Request(url, method=method, data=data,
                                 params=params, content_type=content_type,
                                 app_name=self.app_name)
        if use_auth:
            if not self._auth:
                raise RequestError('Authentication required')
            else:
                self._auth.sign_request(req)
        try:
            result = req.request()
            if result.status_code > 500:
                logging.warning('Server internal error')
        except (requests.ConnectionError, requests.ConnectTimeout):
            result = FakeResponse()
            logging.warning('Connect endpoint: {} error'.format(self.endpoint))
        return self.parse_result(result)
Project: monitoring-scripts    Author: CntoDev    | Project Source | File Source
def main(url, timeout=30, redirect_unknown=True, debug=False):
    """Actual monitoring execution"""

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)

    if debug:  # pragma: no cover
        logger.setLevel(logging.DEBUG)
        logger.info('debug logging enabled')

    # Check if URL is valid
    logger.debug('perform URL validation check')
    if not valid_http_url(url):
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'provided URL is not valid')

    # Send a HEAD request
    logger.debug('send HEAD request')
    try:
        response = requests.head(url, timeout=timeout)
    except requests.ConnectTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL, 'connection timeout')
    except requests.ReadTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL, 'no response received before '
                                                  'timeout')
    except requests.ConnectionError:
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'connection error')
    else:
        logger.debug('response received')
        if response.status_code == requests.codes.ok:
            # Response is OK
            nagios.plugin_exit(nagios.Codes.OK,
                               'status code is %d' % response.status_code)
        elif redirect_unknown and response.status_code == requests.codes.found:
            # Redirect considered as UNKNOWN
            nagios.plugin_exit(nagios.Codes.UNKNOWN,
                               'redirection with code %d' %
                               response.status_code)
        else:
            # Other code, considered not working
            nagios.plugin_exit(nagios.Codes.CRITICAL,
                               'status code is %d' % response.status_code)
Project: zoomdata-tools    Author: Zoomdata    | Project Source | File Source
def __request_data(self):
        if self.__is_cache_valid():
            return True

        # requesting data
        try:
            # setting 5 sec timeouts for connect and read
            r = requests.get(self.data_url, verify=False, allow_redirects=True, timeout=(5, 5))
        except requests.ConnectTimeout as e:
            print("Timed out connection to ", self.data_url, " error is ", e, file=sys.stderr)
            return False
        except requests.ReadTimeout as e:
            print("Timed out while reading data from ", self.data_url, " error is ", e, file=sys.stderr)
            return False
        except requests.ConnectionError as e:
            # ConnectionError comes last: ConnectTimeout is a subclass and
            # would otherwise be swallowed by this broader handler.
            print("Unable to connect to ", self.data_url, " error is ", e, file=sys.stderr)
            return False

        if r.status_code == 200:
            # got HTTP/200 for request - storing it in cache
            try:
                open(self.temp_file_name, mode="w").write(json.dumps(r.json()))
            except IOError as e:
                print("IO error while trying to store cache into file ", self.temp_file_name, " error is ",
                      e, file=sys.stderr)
                return False
            return True
        else:
            return False
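
The timeout=(5, 5) argument above is the requests idiom for setting the connect and read timeouts independently: the first element bounds connection establishment (governing ConnectTimeout), the second bounds the wait for response data (governing ReadTimeout). A short sketch with a placeholder URL:

import requests

try:
    # 3 seconds to establish the connection, 10 seconds to receive data.
    r = requests.get('https://example.com/data.json', timeout=(3, 10))
except requests.ConnectTimeout:
    print('could not connect within 3 seconds')
except requests.ReadTimeout:
    print('connected, but no data arrived within 10 seconds')
else:
    print('fetched with status', r.status_code)
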
Project: sherlock    Author: vandalvnl    | Project Source | File Source
def isActiveLink(link):
        try:
            request = requests.get(link)
            return request.status_code
        except (requests.ConnectionError, requests.ConnectTimeout):
            return False
Project: Uxie    Author: Protectator    | Project Source | File Source
def downloadFile(url):
    tries = 0
    while tries < 3:
        try:
            r = requests.get(BASE_URL + url)
            r.encoding = "utf-8"
            fileContent = r.text
            return fileContent.strip(), len(fileContent)
        except (requests.ConnectionError, requests.ConnectTimeout) as e:
            tries += 1
            if tries >= 3:
                raise e


# Decorator
Project: quant    Author: yutiansut    | Project Source | File Source
def _download(self, request, spider):
        def _retry():
            if self.retry_on_download_timeout:
                self.logger.debug('Read timed out, retry request {}'.format(request))
                self.crawl(request, spider)

        try:
            self._process_request(request, spider)

            if request is None:
                return

            method = request.method.upper()

            resp = None
            kw_params = {
                'timeout': self.download_timeout,
                'cookies': request.cookies,
                'headers': request.headers,
                'proxies': {
                    'http': request.proxy,
                    'https': request.proxy
                }
            }

            self.logger.debug('[{}]<{} {}>'.format(spider.name, method, request.url))

            if method == 'GET':
                resp = requests.get(request.url, **kw_params)
            elif method == 'POST':
                resp = requests.post(request.url, request.data, **kw_params)

            self._responses_queue.put((Response(resp.url, resp.status_code, resp.content, request,
                                                resp.cookies), spider))
        except (requests.ReadTimeout, requests.ConnectTimeout, requests.ConnectionError):
            _retry()
        except Exception as err:
            self.logger.error(err, exc_info=True)
Project: subDomainsBrute    Author: 0xa-saline    | Project Source | File Source
def search_by_baidu(self, url, index):
        # print 'baidu', GetNowTime()
        if url is None:
            url = r'https://www.baidu.com/s?&wd=site%3A' + self.domain
        try:
            baidu_rsp = requests.get(url=url, headers=headers, timeout=5)
        except (requests.ConnectTimeout, requests.ConnectionError):
            # print e.message, "request to Baidu failed"
            return url, index
Project: subDomainsBrute    Author: 0xa-saline    | Project Source | File Source
def is_https1(self):
        url = self.PREFIX_URL2 + self.raw_domain
        try:
            requests.get(url, timeout=4)
        except (requests.ConnectionError, requests.ConnectTimeout):
            return False
Project: subDomainsBrute    Author: 0xa-saline    | Project Source | File Source
def is_https2(self):
        url = self.PREFIX_URL1 + self.raw_domain
        try:
            requests.get(url, timeout=4)
        except (requests.ConnectionError, requests.ConnectTimeout):
            return False
Project: subDomainsBrute    Author: 0xa-saline    | Project Source | File Source
def get_html(url):
    for i in range(3):
        try:
            head=headers
            head['Referer']='https://www.bing.com'
            rsp = requests.get(url, headers=head, timeout=5)
        except (requests.ConnectionError, requests.ConnectTimeout):
            # print e.message, 'Bing request failed, retrying in 5 seconds'
            time.sleep(5)
            continue
        if rsp.status_code == 200:
            root = etree.HTML(rsp.text)
            return root
        else:
            time.sleep(5)
Project: builder    Author: elifesciences    | Project Source | File Source
def acme_enabled(url):
    "if the given url can be hit and it looks like the acme hidden dir exists, return True."
    url = 'http://' + url + "/.well-known/acme-challenge/" # ll: http://lax.elifesciences.org/.well-known/acme-challenge
    try:
        resp = requests.head(url, allow_redirects=False)
        if 'crm.elifesciences' in url:
            return resp.status_code == 404 # apache behaves differently to nginx
        return resp.status_code == 403 # forbidden rather than not found.
    except (requests.ConnectionError, requests.ConnectTimeout):
        # couldn't connect for whatever reason
        return False
Project: yjspider    Author: junyu1991    | Project Source | File Source
def _download(self,url='http://zt.bdinfo.net/speedtest/wo3G.rar',filepath="/work/test/tempdown"):
        '''
        Download a file from a website to a local file
        @param url: download url
        @param filepath: the directory where you want to save the file
        @return: None
        '''
        if not url:
            return
        print('downloading:%s' % url)
        #r=requests.get(url,stream=True,timeout=5)
        filename=os.path.join(filepath,'.'+urlparse.urlsplit(url).path)
        chunk_size=1024*1024
        print filename
        if not os.path.exists(os.path.dirname(filename)):
            os.makedirs(os.path.dirname(filename))
        try:
            r=requests.get(url,stream=True,timeout=5)
            with open(filename,'wb')  as f:
                for data in r.iter_content(chunk_size=chunk_size):
                    #data=temp.read(1024*1024)
                    f.write(data)
            self._add_downloaded(url)
        except (requests.ConnectTimeout, requests.ReadTimeout):
            print("Download of %s timed out; it will be re-downloaded later.\n" % (url))
            if self._redis_enable:
                self._r.lpush(self._download_list,url)
Project: yjspider    Author: junyu1991    | Project Source | File Source
def _get_response(self,url,method='get',headers={},files=[],data=None,cookies=None,cert=None,timeout=30,**kwargs):
        method=method.upper()
        #self._s.headers.update(headers)
        pre=requests.Request(method=method,url=url,data=data,files=files)
        prepped=self._s.prepare_request(pre)
        try:
            with self._s.send(prepped,stream=True,cert=cert,timeout=timeout) as resp:
                #self._header.parse_header(dict(resp.headers))
                #self._s.headers.update(self._header.get_default_header())
                #content_type=resp.headers.get('content-type')
                #encoding=self._get_content_encoding(content_type)
                #regx=re.compile('.*(text\/html|text\/xml).*')
                if resp.status_code==requests.codes.OK:
                    #Don't handle redirect url for now
                    '''
                    with open('temp.txt','wb') as f:
                        f.write(resp.content)
                    '''
                    self._handler.handle_data(resp)
                elif resp.status_code!=requests.codes.OK:
                    print("Connected url %s \t %d" % (url,resp.status_code))
                else:
                    # If the response is not html or xml, save the url to redis
                    pass
        except (requests.ConnectTimeout, requests.ReadTimeout):
            print('Connect %s timeout .\n' % (url))
            if self._redis_enable:
                self._r.lpush(self._start,url)
            else:
                task.url_task.append(url)
Project: uhu    Author: updatehub    | Project Source | File Source
def request(method, url, *args, sign=True, **kwargs):
    custom_ca_certs_file = get_custom_ca_certs_file()

    if custom_ca_certs_file is not None and 'verify' not in kwargs:
        kwargs['verify'] = custom_ca_certs_file

    try:
        if sign:
            response = Request(url, method, *args, **kwargs).send()
        else:
            response = requests.request(
                method, url, *args, timeout=30, **kwargs)
    except HTTPError as error:
        raise error
    except (requests.exceptions.MissingSchema,
            requests.exceptions.InvalidSchema,
            requests.exceptions.URLRequired,
            requests.exceptions.InvalidURL):
        raise HTTPError('You have provided an invalid server URL.')
    except requests.ConnectTimeout:
        raise HTTPError('Connection timed out. Try again later.')
    except requests.ConnectionError:
        raise HTTPError('Server is not available. Try again later.')
    except requests.RequestException:
        raise HTTPError(UNKNOWN_ERROR)
    if response.status_code == 401:
        raise HTTPError('Unauthorized. Did you set your credentials?')
    elif not response.ok:
        raise HTTPError(format_server_error(response))
    return response
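
The ordering of the except clauses above is deliberate: requests.ConnectTimeout inherits from both requests.ConnectionError and requests.Timeout, so its handler must appear before the ConnectionError one or it would never run. A minimal sketch of the hierarchy:

import requests

# ConnectTimeout is a subclass of both ConnectionError and Timeout.
assert issubclass(requests.ConnectTimeout, requests.ConnectionError)
assert issubclass(requests.ConnectTimeout, requests.Timeout)

try:
    raise requests.ConnectTimeout('simulated')
except requests.ConnectTimeout:
    print('the specific handler runs')
except requests.ConnectionError:
    print('never reached for a ConnectTimeout')
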
Project: uhu    Author: updatehub    | Project Source | File Source
def test_raises_error_when_server_is_unavailable(self, mock):
        exceptions = [requests.ConnectionError, requests.ConnectTimeout]
        mock.side_effect = exceptions
        for _ in exceptions:
            with self.assertRaises(HTTPError):
                request('GET', 'foo')
Project: PyPlanet    Author: PyPlanet    | Project Source | File Source
def execute(self, method, *args):
        payload = dumps(args, methodname=method, allow_none=True)
        body = gzip.compress(payload.encode('utf8'))
        try:
            res = await self.loop.run_in_executor(None, self.__request, body)
            data, _ = loads(res.text, use_datetime=True)
            if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
                if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
                    raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
                self.retries = 0
                return data[0]
            raise DedimaniaTransportException('Invalid response from dedimania!')
        except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e:
            raise DedimaniaTransportException(e) from e
        except ConnectTimeout as e:
            raise DedimaniaTransportException(e) from e
        except DedimaniaTransportException:
            # Try to setup new session.
            self.retries += 1
            if self.retries > 5:
                raise DedimaniaTransportException('Dedimania didn\'t give the right answer after a few retries!')
            self.client = requests.session()
            try:
                await self.authenticate()
                return await self.execute(method, *args)
            except Exception as e:
                logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
                handle_exception(e, __name__, 'execute')
                raise DedimaniaTransportException('Could not retrieve data from dedimania!')
        except DedimaniaFault as e:
            if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
                try:
                    self.retries += 1
                    if self.retries > 5:
                        raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
                    await self.authenticate()
                    return await self.execute(method, *args)
                except:
                    return
            logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
            handle_exception(e, __name__, 'execute', extra_data={
                'dedimania_retries': self.retries,
            })
            raise DedimaniaTransportException('Could not retrieve data from dedimania!')
Project: SpiderConfig    Author: brady-chen    | Project Source | File Source
def get_urls(self, get_proxie_or_not=False):
        """
        :type get_proxie_or_not: bool
        :param get_proxie_or_not: whether to fetch and use a proxy IP
        :return: the list of URLs collected from the start page
        """
        list_url = []
        try:
            if get_proxie_or_not:
                p = Proxies()
                p.get_ip_and_port()
                self.session.proxies = {
                    "http": p.ip_and_port,
                    "https": p.ip_and_port
                }
            response = self.session.get(self.start_url, timeout=30)
            if response.status_code == 200:
                html = response.content
            else:
                # Fall back to selenium + PhantomJS to render the page
                # Set custom request headers for PhantomJS
                desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
                headers = self.headers
                for key, value in headers.iteritems():
                    desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
                driver = webdriver.PhantomJS(
                    desired_capabilities=desired_capabilities
                )
                driver.get(self.start_url)
                html = driver.page_source
                driver.quit()
            soup = BeautifulSoup(html, 'lxml')
            # Select the target links with BeautifulSoup; a generic anchor
            # selector is used here as a placeholder
            urls = soup.find_all('a', href=True)
            assert urls is not None
            repeat_num = 0
            for url in urls:
                if url['href'] not in list_url:
                    list_url.append(url['href'])
                else:
                    repeat_num += 1
            print "Skipped %d duplicate URLs" % repeat_num
        except requests.ConnectTimeout:
            print "Request for the start URL timed out"

        if list_url:
            return list_url
        else:
            print "No URLs were collected from the start page"
            raise ValueError
Project: SpiderConfig    Author: brady-chen    | Project Source | File Source
def get_htmls(self, urls, get_proxie_or_not=False):
        """
        :type urls: list
        :type get_proxie_or_not: bool
        :param urls: the list of URLs to fetch
        :param get_proxie_or_not: whether to fetch and use a proxy IP
        :return: a list of parsed HTML documents (BeautifulSoup objects)
        """
        list_html = []
        for url in urls:
            try:
                if get_proxie_or_not:
                    p = Proxies()
                    p.get_ip_and_port()
                    self.session.proxies = {
                        "http": p.ip_and_port,
                        "https": p.ip_and_port
                    }
                response = self.session.get(url, timeout=30)
                if response.status_code == 200:
                    html = response.content
                else:
                    # Fall back to selenium + PhantomJS to render the page
                    # Set custom request headers for PhantomJS
                    desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
                    headers = self.headers
                    for key, value in headers.iteritems():
                        desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
                    driver = webdriver.PhantomJS(
                        desired_capabilities=desired_capabilities
                    )
                    driver.get(url)
                    html = driver.page_source
                    driver.quit()
                assert html is not None
                list_html.append(BeautifulSoup(html, 'lxml'))
            except requests.ConnectTimeout:
                print "Request for %s timed out" % url
        if list_html:
            return list_html
        else:
            print "No HTML documents could be fetched"
            raise ValueError
Project: icheckin    Author: chunkhang    | Project Source | File Source
def checkin(cred, code):
   '''
   -> 0: Successful check-in
   -> 1: No internet connection
   -> 2: Invalid credentials
   -> 3: Not connected to SunwayEdu Wi-Fi
   -> 4: Invalid code
   -> 5: Wrong class
   -> 6: Already checked-in
   '''
   # Start a session
   session = requests.Session()
   # Login to iZone
   payload = {
      'form_action': 'submitted',
      'student_uid': cred[0],
      'password': cred[1],
   }
   try:
      r = session.post(constants.LOGIN, data=payload)
   except requests.ConnectionError:
      return 1
   if not r.history:
      return 2
   # Check for SunwayEdu Wi-Fi
   try:
      r = requests.get(constants.WIFI, timeout=2)
   except requests.ConnectTimeout:
      return 3
   except requests.ConnectionError:
      return 1
   # Check-in with code
   try:
      r = session.post(constants.CHECKIN, data={'checkin_code': code}, 
         timeout=2)
   except (requests.ReadTimeout, requests.ConnectionError):
      return 1
   if 'Checkin code not valid.' in r.text or \
      'The specified URL cannot be found.' in r.text:
      return 4
   if 'You cannot check in to a class you are not a part of.' in r.text:
      return 5
   if 'You have already checked in' in r.text:
      return 6
   return 0
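
The snippet keeps a requests.Session so that the cookies set by the login POST are sent automatically with the later check-in request. A minimal sketch of that pattern, with placeholder URLs and form fields:

import requests

session = requests.Session()
try:
    # Cookies set by the login response are stored on the session...
    session.post('https://example.com/login',
                 data={'user': 'u', 'password': 'p'}, timeout=5)
    # ...and sent automatically with every later request on the same session.
    r = session.post('https://example.com/checkin',
                     data={'code': '1234'}, timeout=5)
except (requests.ConnectTimeout, requests.ConnectionError):
    print('could not reach the server')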