Python requests module: ReadTimeout() example source code

The following 16 code examples, extracted from open source Python projects, illustrate how to use requests.ReadTimeout.

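Before the project examples, here is a minimal, self-contained sketch of the basic pattern; the URL and timeout values are illustrative assumptions and are not taken from any of the projects below. requests.ReadTimeout is raised when the connection succeeds but the server does not send a response within the read timeout, so it is usually caught alongside ConnectTimeout and ConnectionError.

import requests

# Hypothetical URL used purely for illustration.
URL = 'https://example.com/slow-endpoint'

try:
    # timeout=(3, 5) sets the connect timeout and the read timeout separately;
    # a single number such as timeout=5 applies the same limit to both.
    response = requests.get(URL, timeout=(3, 5))
except requests.ConnectTimeout:
    print('could not establish a connection in time')
except requests.ReadTimeout:
    print('connected, but no response arrived before the read timeout')
except requests.ConnectionError:
    print('network problem (DNS failure, connection refused, ...)')
else:
    print('received %d bytes' % len(response.content))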
Project: icheckin    Author: chunkhang
def update_status():
   ''' 
   -> True  : Version is latest; no update needed
   -> False : Check for update failed
   -> str   : Latest version to update to
   '''
   try:
      r = requests.get(constants.UPDATES, timeout=5)
   except (requests.ConnectionError, requests.ConnectTimeout, 
      requests.ReadTimeout):
      return False
   else:
      latestVersion = r.json()['latest_version']
      if constants.VERSION == latestVersion:
         return True
      else:
         return latestVersion
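A hypothetical caller for the function above, based only on the return values documented in its docstring (the surrounding application and constants module are not shown here):

result = update_status()
if result is True:
   print('already on the latest version')
elif result is False:
   print('could not check for updates')
else:
   print('new version available: ' + result)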
Project: docker-enforcer    Author: piontec
def get_params(self, container_id: str) -> Optional[Dict[str, Any]]:
        if self._config.cache_params and container_id in self._params_cache:
            logger.debug("Returning cached params for container {0}".format(container_id))
            return self._params_cache[container_id]

        logger.debug("[{0}] Starting to fetch params for {1}".format(threading.current_thread().name, container_id))
        try:
            params = self._client.inspect_container(container_id)
        except NotFound as e:
            logger.warning("Container {0} not found - {1}.".format(container_id, e))
            return None
        except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
            logger.error("Communication error when fetching params for container {0}: {1}".format(container_id, e))
            return {}
        except Exception as e:
            logger.error("Unexpected error when fetching params for container {0}: {1}".format(container_id, e))
            return {}
        logger.debug("[{0}] Params fetched for {1}".format(threading.current_thread().name, container_id))

        if not self._config.cache_params:
            return params

        logger.debug("[{0}] Storing params of {1} in cache".format(threading.current_thread().name, container_id))
        self._params_cache[container_id] = params
        return params
Project: ImagesOfNetwork    Author: amici-ursi
async def _run_once(self):
        try:
            if self.settings.DO_INBOX or self.settings.DO_FALSEPOS:
                await self._process_messages()
                await asyncio.sleep(5)

            for multi in settings.MULTIREDDITS:
                if self.settings.DO_OC:
                    await asyncio.sleep(5)
                    await self._process_oc_stream(multi)

            for multi in settings.MULTIREDDITS + [settings.PARENT_SUB]:
                if self.settings.DO_MODLOG:
                    await asyncio.sleep(5)
                    await self._process_network_modlog(multi)

        except (HTTPException, requests.ReadTimeout,
                requests.ConnectionError) as ex:
            LOG.error('%s: %s', type(ex), ex)
        else:
            LOG.debug('All tasks processed.')

    # ======================================================
Project: monitoring-scripts    Author: CntoDev
def main(url, timeout=30, redirect_unknown=True, debug=False):
    """Actual monitoring execution"""

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)

    if debug:  # pragma: no cover
        logger.setLevel(logging.DEBUG)
        logger.info('debug logging enabled')

    # Check if URL is valid
    logger.debug('perform URL validation check')
    if not valid_http_url(url):
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'provided URL is not valid')

    # Send a HEAD request
    logger.debug('send HEAD request')
    try:
        response = requests.head(url, timeout=timeout)
    except requests.ConnectTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL, 'connection timeout')
    except requests.ReadTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL, 'no response received before '
                                                  'timeout')
    except requests.ConnectionError:
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'connection error')
    else:
        logger.debug('response received')
        if response.status_code == requests.codes.ok:
            # Response is OK
            nagios.plugin_exit(nagios.Codes.OK,
                               'status code is %d' % response.status_code)
        elif redirect_unknown and response.status_code == requests.codes.found:
            # Redirect considered as UNKNOWN
            nagios.plugin_exit(nagios.Codes.UNKNOWN,
                               'redirection with code %d' %
                               response.status_code)
        else:
            # Other code, considered not working
            nagios.plugin_exit(nagios.Codes.CRITICAL,
                               'status code is %d' % response.status_code)
Project: zoomdata-tools    Author: Zoomdata
def __request_data(self):
        if self.__is_cache_valid():
            return True

        # requesting data
        try:
            # setting 5 sec timeouts for connect and read
            r = requests.get(self.data_url, verify=False, allow_redirects=True, timeout=(5, 5))
        except requests.ConnectionError as e:
            print("Unable to connect to ", self.data_url, " error is ", e, file=sys.stderr)
            return False
        except requests.ConnectTimeout as e:
            print("Timed out connection to ", self.data_url, " error is ", e, file=sys.stderr)
            return False
        except requests.ReadTimeout as e:
            print("Timed out while reading data from ", self.data_url, " error is ", e, file=sys.stderr)
            return False

        if r.status_code == 200:
            # got HTTP/200 for request - storing it in cache
            try:
                open(self.temp_file_name, mode="w").write(json.dumps(r.json()))
            except IOError as e:
                print("IO error while trying to store cache into file ", self.temp_file_name, " error is ",
                      e, file=sys.stderr)
                return False
            return True
        else:
            return False
Project: quant    Author: yutiansut
def _download(self, request, spider):
        def _retry():
            if self.retry_on_download_timeout:
                self.logger.debug('Read timed out, retry request {}'.format(request))
                self.crawl(request, spider)

        try:
            self._process_request(request, spider)

            if request is None:
                return

            method = request.method.upper()

            resp = None
            kw_params = {
                'timeout': self.download_timeout,
                'cookies': request.cookies,
                'headers': request.headers,
                'proxies': {
                    'http': request.proxy,
                    'https': request.proxy
                }
            }

            self.logger.debug('[{}]<{} {}>'.format(spider.name, method, request.url))

            if method == 'GET':
                resp = requests.get(request.url, **kw_params)
            elif method == 'POST':
                resp = requests.post(request.url, request.data, **kw_params)

            self._responses_queue.put((Response(resp.url, resp.status_code, resp.content, request,
                                                resp.cookies), spider))
        except (requests.ReadTimeout, requests.ConnectTimeout, requests.ConnectionError):
            _retry()
        except Exception as err:
            self.logger.error(err, exc_info=True)
Project: docker-enforcer    Author: piontec
def check_container(self, container_id: str, check_source: CheckSource, remove_from_cache: bool=False) \
            -> Optional[Container]:
        try:
            if remove_from_cache:
                self.remove_from_cache(container_id)

            if not self._config.disable_params:
                params = self.get_params(container_id)
            else:
                params = {}
            if not self._config.disable_metrics:
                logger.debug("[{0}] Starting to fetch metrics for {1}".format(threading.current_thread().name,
                                                                              container_id))
                metrics = self._client.stats(container=container_id, decode=True, stream=False)
            else:
                metrics = {}
            logger.debug("[{0}] Fetched data for container {1}".format(threading.current_thread().name, container_id))
        except NotFound as e:
            logger.warning("Container {0} not found - {1}.".format(container_id, e))
            return None
        except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
            logger.error("Communication error when fetching info about container {0}: {1}".format(container_id, e))
            return None
        except Exception as e:
            logger.error("Unexpected error when fetching info about container {0}: {1}".format(container_id, e))
            return None
        if params is None or metrics is None:
            logger.warning("Params or metrics were not fetched for container {}. Not returning container."
                           .format(container_id))
            return None
        return Container(container_id, params, metrics, 0, check_source)
Project: docker-enforcer    Author: piontec
def check_containers(self, check_source: CheckSource) -> Iterable[Container]:
        with self._padlock:
            if self._check_in_progress:
                logger.warning("[{0}] Previous check did not yet complete, consider increasing CHECK_INTERVAL_S"
                               .format(threading.current_thread().name))
                return
            self._check_in_progress = True
        logger.debug("Periodic check start: connecting to get the list of containers")
        self.last_check_containers_run_start_timestamp = datetime.datetime.utcnow()
        try:
            containers = self._client.containers(quiet=True)
            logger.debug("[{0}] Fetched containers list from docker daemon".format(threading.current_thread().name))
        except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
            logger.error("Timeout while trying to get list of containers from docker: {0}".format(e))
            with self._padlock:
                self._check_in_progress = False
            self.last_periodic_run_ok = False
            return
        except Exception as e:
            logger.error("Unexpected error while trying to get list of containers from docker: {0}".format(e))
            with self._padlock:
                self._check_in_progress = False
            self.last_periodic_run_ok = False
            return
        ids = [container['Id'] for container in containers]
        for container_id in ids:
            container = self.check_container(container_id, check_source)
            if container is None:
                continue
            yield container
        logger.debug("Containers checked")
        if self._config.cache_params:
            logger.debug("Purging cache")
            self.purge_cache(ids)
        self.last_periodic_run_ok = True
        self.last_check_containers_run_end_timestamp = datetime.datetime.utcnow()
        self.last_check_containers_run_time = self.last_check_containers_run_end_timestamp \
            - self.last_check_containers_run_start_timestamp
        logger.debug("Periodic check done")
        with self._padlock:
            self._check_in_progress = False
Project: docker-enforcer    Author: piontec
def get_events_observable(self) -> Iterable[Any]:
        successful = False
        ev = None
        while not successful:
            try:
                ev = self._client.events(decode=True)
                successful = True
            except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
                logger.error("Communication error when subscribing for container events, retrying in 5s: {0}".format(e))
                time.sleep(5)
            except Exception as e:
                logger.error("Unexpected error when subscribing for container events, retrying in 5s: {0}".format(e))
                time.sleep(5)
        return ev
Project: docker-enforcer    Author: piontec
def kill_container(self, container: Container) -> None:
        try:
            self._client.stop(container.params['Id'])
        except (ReadTimeout, ProtocolError) as e:
            logger.error("Communication error when stopping container {0}: {1}".format(container.cid, e))
        except Exception as e:
            logger.error("Unexpected error when stopping container {0}: {1}".format(container.cid, e))
Project: ImagesOfNetwork    Author: amici-ursi
def run(self):
        while True:
            stream = submission_stream(self.r, 'all', verbosity=0)

            try:
                for post in stream:
                    self._do_post(post)
            except (HTTPException, requests.ReadTimeout,
                    requests.ConnectionError) as e:
                LOG.error('{}: {}'.format(type(e), e))
            else:
                LOG.error('Stream ended.')

            LOG.info('Sleeping for {} minutes.'.format(RETRY_MINUTES))
            sleep(60 * RETRY_MINUTES)
Project: hass_config    Author: azogue
def _get_remote_svg_tile(hass, host, port, prefix, name, width_tiles, c1, c2):
    """Get remote SVG file."""
    url_tile = URL_TILE_MASK.format(host, port, prefix, name, width_tiles)
    ok, r_svg, status = False, None, -1
    try:
        r_svg = yield from hass.async_add_job(
            partial(requests.get, url_tile, timeout=15))
        status = r_svg.status_code
        ok = r_svg.ok
    except (requests.ReadTimeout, requests.ConnectionError):
        pass
    if ok:
        # yield from asyncio.sleep(0)
        color1 = ', '.join([str(x) for x in c1])
        color2 = ', '.join([str(x) for x in c2])
        mask_bg = ('background-image: radial-gradient('
                   'farthest-corner at 70% 70%, rgba({}), rgba({}));'
                   .format(color1, color2))
        svg_text_sub = RG_TILE_BACKGROUND.sub(
            '{}{}'.format(MASK_SVG_STYLE, mask_bg),
            r_svg.content.decode(), count=1)
        # svg_text_sub = RG_TILE_SIZE.sub(
        #     # 'viewBox="0 0 600 400"',
        #     'viewBox="0 0 300 250"',
        #     svg_text_sub, count=1)
        # LOGGER.warning(svg_text_sub[:300])
        return svg_text_sub.encode()
    LOGGER.info('TILE REQUEST ERROR [code:{}]: {} - {}'
                .format(status, r_svg, url_tile))
    return None
Project: PyPlanet    Author: PyPlanet
def execute(self, method, *args):
        payload = dumps(args, methodname=method, allow_none=True)
        body = gzip.compress(payload.encode('utf8'))
        try:
            res = await self.loop.run_in_executor(None, self.__request, body)
            data, _ = loads(res.text, use_datetime=True)
            if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
                if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
                    raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
                self.retries = 0
                return data[0]
            raise DedimaniaTransportException('Invalid response from dedimania!')
        except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e:
            raise DedimaniaTransportException(e) from e
        except ConnectTimeout as e:
            raise DedimaniaTransportException(e) from e
        except DedimaniaTransportException:
            # Try to setup new session.
            self.retries += 1
            if self.retries > 5:
                raise DedimaniaTransportException('Dedimania didn\'t gave the right answer after few retries!')
            self.client = requests.session()
            try:
                await self.authenticate()
                return await self.execute(method, *args)
            except Exception as e:
                logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
                handle_exception(e, __name__, 'execute')
                raise DedimaniaTransportException('Could not retrieve data from dedimania!')
        except DedimaniaFault as e:
            if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
                try:
                    self.retries += 1
                    if self.retries > 5:
                        raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
                    await self.authenticate()
                    return await self.execute(method, *args)
                except:
                    return
            logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
            handle_exception(e, __name__, 'execute', extra_data={
                'dedimania_retries': self.retries,
            })
            raise DedimaniaTransportException('Could not retrieve data from dedimania!')
Project: feincms3    Author: matthiask
def oembed_json(url, cache_failures=True):
    """oembed_json(url, *, cache_failures=True)
    Asks Noembed_ for the embedding HTML code for arbitrary URLs. Sites
    supported include Youtube, Vimeo, Twitter and many others.

    Successful embeds are always cached for 30 days.

    Failures are cached if ``cache_failures`` is ``True`` (the default). The
    durations are as follows:

    - Connection errors are cached 60 seconds with the hope that the connection
      failure is only transient.
    - HTTP error codes and responses in an unexpected format (no JSON) are
      cached for 24 hours.

    The return value is always a dictionary, but it may be empty.
    """
    # Thundering herd problem etc...
    key = 'oembed-url-%s-data' % md5(url.encode('utf-8')).hexdigest()
    data = cache.get(key)
    if data is not None:
        return data

    try:
        data = requests.get(
            'https://noembed.com/embed',
            params={
                'url': url,
                'nowrap': 'on',
                'maxwidth': 1200,
                'maxheight': 800,
            },
            timeout=2,
        ).json()
    except (requests.ConnectionError, requests.ReadTimeout):
        # Connection failed? Hopefully temporary, try again soon.
        timeout = 60
    except (ValueError, requests.HTTPError):
        # Oof... HTTP error code, or no JSON? Try again tomorrow,
        # and we should really log this.
        timeout = 86400
    else:
        # Perfect, cache for 30 days
        cache.set(key, data, timeout=30 * 86400)
        return data

    if cache_failures:
        cache.set(key, {}, timeout=timeout)
    return {}
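A short usage sketch for the function above; the URL is illustrative, and the 'html' key is only an assumption about the typical shape of an oEmbed response:

data = oembed_json('https://www.youtube.com/watch?v=dQw4w9WgXcQ')
if data:
    embed_html = data.get('html', '')  # oEmbed responses usually carry the markup here
else:
    embed_html = ''  # lookup failed or the failure was cached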
Project: SpiderConfig    Author: brady-chen
def get_data(self):
        self.get_url_and_html()
        for html in self.list_html:
            try:
                soup = BeautifulSoup(str(html), 'lxml')
                # use BeautifulSoup to parse the table row and pull out the ip and port
                if soup.find("tr").findAll("td"):
                    ip_and_port = (
                        soup.find("tr").findAll("td")[1].get_text() +
                        ":" + soup.find("tr").findAll("td")[2].get_text()
                    )
                    proxies = {
                        "http": ip_and_port,
                        "https": ip_and_port
                    }
                    # test this proxy ip against an IP-echo page, with a 2 second timeout
                    response = requests.get(
                        "http://1212.ip138.com/ic.asp",
                        headers=headers,
                        proxies=proxies,
                        timeout=2
                    )
                    if response.status_code == 200:
                        self.ip_and_port = ip_and_port
                        print "ip???????" + self.ip_and_port
                        print "??????:{},?????:{},?????:{},?????:{},?????:{},?????:{}".format(
                            str(soup.find("tr").findAll("td")[3].get_text()).replace("\n", ""),
                            soup.find("tr").findAll("td")[4].get_text(),
                            soup.find("tr").findAll("td")[5].get_text(),
                            soup.find("tr").findAll("td")[6].find({"div", "title"}).attrs["title"],
                            soup.find("tr").findAll("td")[7].find({"div", "title"}).attrs["title"],
                            soup.find("tr").findAll("td")[8].get_text(),
                            soup.find("tr").findAll("td")[9].get_text()
                        )
                        break
                    else:
                        print "http????200"
                        raise requests.ConnectionError
            except requests.ReadTimeout:
                print "?ip??????????????ip"
            except requests.ConnectionError:
                print "?ip????"
            except Exception as e:
                print "??????????:%(errorName)s\n?????:\n%(detailInfo)s" % {
                    "errorName": e, "detailInfo": traceback.format_exc()}
Project: icheckin    Author: chunkhang
def checkin(cred, code):
   '''
   -> 0: Successful check-in
   -> 1: No internet connection
   -> 2: Invalid credentials
   -> 3: Not connected to SunwayEdu Wi-Fi
   -> 4: Invalid code
   -> 5: Wrong class
   -> 6: Already checked-in
   '''
   # Start a session
   session = requests.Session()
   # Login to iZone
   payload = {
      'form_action': 'submitted',
      'student_uid': cred[0],
      'password': cred[1],
   }
   try:
      r = session.post(constants.LOGIN, data=payload)
   except requests.ConnectionError:
      return 1
   if not r.history:
      return 2
   # Check for SunwayEdu Wi-Fi
   try:
      r = requests.get(constants.WIFI, timeout=2)
   except requests.ConnectTimeout:
      return 3
   except requests.ConnectionError:
      return 1
   # Check-in with code
   try:
      r = session.post(constants.CHECKIN, data={'checkin_code': code}, 
         timeout=2)
   except (requests.ReadTimeout, requests.ConnectionError):
      return 1
   if 'Checkin code not valid.' in r.text or \
      'The specified URL cannot be found.' in r.text:
      return 4
   if 'You cannot check in to a class you are not a part of.' in r.text:
      return 5
   if 'You have already checked in' in r.text:
      return 6
   return 0