Python requests.exceptions 模块,RequestException() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.exceptions.RequestException()

项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _call_api(self, api, params=None, request_body=None):
        """Build a request from ``api`` and run it with retries.

        :param api: callable invoked with the request parameters as keyword
            arguments; its return value is the request to execute.
        :param params: optional mapping of keyword arguments for ``api``.
            It is copied, so the caller's mapping is never modified.
        :param request_body: optional body; when truthy it is passed to
            ``api`` as the ``body`` keyword argument.
        :returns: whatever ``retry_on_exception`` returns for
            ``self._get_api_result(request=request)``.
        """
        # Copy first: adding 'body' below must not leak into the caller's dict.
        request_params = dict(params) if params else {}
        if request_body:
            request_params['body'] = request_body
        request = api(**request_params)
        # Exponential backoff with jitter; the retry ceiling comes from config.
        retry_policy = RetryPolicy(
            ExpBackoffPolicy(with_jitter=True),
            max_retry_count=get_config().schematizer_client_max_connection_retry
        )
        return retry_on_exception(
            retry_policy=retry_policy,
            retry_exceptions=RequestException,
            func_to_retry=self._get_api_result,
            request=request
        )
项目:danmu-bilibili    作者:saberxxy    | 项目源码 | 文件源码
def _getpage(self, page):
        """Fetch one page of the member's submitted-video listing.

        Returns the response body text when the server answers 200.
        Returns None for any other status code or when the request raises
        RequestException.
        """
        # Example of the resulting URL:
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        query = {
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        }
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(query)
        try:
            response = requests.get(url)
            return response.text if response.status_code == 200 else None
        except RequestException:
            return None
项目:danmu-bilibili    作者:saberxxy    | 项目源码 | 文件源码
def _getpage(self, page):
        """Fetch one page of the member's submitted-video listing.

        Returns the response body text when the server answers 200.
        Returns None for any other status code or when the request raises
        RequestException.
        """
        # Example of the resulting URL:
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        query = {
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        }
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(query)
        try:
            response = requests.get(url)
            return response.text if response.status_code == 200 else None
        except RequestException:
            return None
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def add_txt_record(self, domain, record_name, record_content):
        """
        Create a TXT record through the DNS provider.

        :param str domain: The domain to use to look up the managed zone.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        :raises errors.PluginError: if an error occurs communicating with the DNS Provider API
        """
        self._find_domain_id(domain)

        record = {'type': 'TXT', 'name': record_name, 'content': record_content}
        try:
            self.provider.create_record(**record)
        except RequestException as e:
            logger.debug('Encountered error adding TXT record: %s', e, exc_info=True)
            raise errors.PluginError('Error adding TXT record: {0}'.format(e))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def del_txt_record(self, domain, record_name, record_content):
        """
        Remove a TXT record through the DNS provider (best effort).

        Failures are only logged at debug level: a PluginError while looking
        up the domain ends the call early, and a RequestException while
        deleting is swallowed.

        :param str domain: The domain to use to look up the managed zone.
        :param str record_name: The record name (typically beginning with '_acme-challenge.').
        :param str record_content: The record content (typically the challenge validation).
        """
        try:
            self._find_domain_id(domain)
        except errors.PluginError as e:
            logger.debug('Encountered error finding domain_id during deletion: %s', e,
                         exc_info=True)
            return

        record = {'type': 'TXT', 'name': record_name, 'content': record_content}
        try:
            self.provider.delete_record(**record)
        except RequestException as e:
            logger.debug('Encountered error deleting TXT record: %s', e, exc_info=True)
项目:marconibot    作者:s4w3d0ff    | 项目源码 | 文件源码
def _retry(func):
        """Decorator: re-run ``func`` after each delay in ``retryDelays``.

        Only RequestException triggers a retry. Once every delay has been
        used, the last failure is reported through RetryException.
        """
        @_wraps(func)
        def retrying(*args, **kwargs):
            failures = []
            # The trailing None marks the final attempt (no sleep afterwards).
            for delay in _chain(retryDelays, [None]):
                try:
                    return func(*args, **kwargs)
                except RequestException as problem:
                    failures.append(problem)
                    if delay is not None:
                        # log exception and wait before the next attempt
                        logger.debug(problem)
                        logger.info("-- retrying in %ds", delay)
                        sleep(delay)
                        continue
                    logger.debug(failures)
                    raise RetryException(
                        'retryDelays exhausted ' + str(problem))
        return retrying
项目:webspider    作者:GuozhuHe    | 项目源码 | 文件源码
def requests_company_detail_data(company_id):
    """Download a company's detail page and extract its tag fields.

    Returns the result of ``format_tag`` applied to the extracted advantage,
    address, size and introduction nodes plus ``company_id``.
    Raises RequestsError when the HTTP request raises RequestException.
    """
    http_headers = generate_http_header()
    crawler_sleep()
    try:
        detail_url = constants.COMPANY_DETAIL_URL.format(company_id=company_id)
        page = requests.get(
            url=detail_url,
            headers=http_headers,
            cookies=Cookies.get_random_cookies(),
            allow_redirects=False,
            timeout=constants.TIMEOUT)
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)

    tree = etree.HTML(page.text)
    advantage = tree.xpath('//div[@id="tags_container"]//li/text()')
    size = tree.xpath('//div[@id="basic_container"]//li[3]/span/text()')
    address = tree.xpath('//p[@class="mlist_li_desc"]/text()')
    introduce = tree.xpath('//span[@class="company_content"]//text()')
    return format_tag(advantage, address, size, introduce, company_id)
项目:webspider    作者:GuozhuHe    | 项目源码 | 文件源码
def request_company_json(url, page_no):
    """Request one page of the company listing and return the decoded JSON.

    A payload without a 'totalCount' key is treated as a bad response: the
    cookies used for the request are discarded and RequestsError is raised.
    RequestsError is also raised when the request raises RequestException.
    """
    query = {
        'first': False,
        'pn': page_no,
        'sortField': 1,
        'havemark': 0,
    }
    http_headers = generate_http_header()
    crawler_sleep()
    try:
        cookies = Cookies.get_random_cookies()
        reply = requests.get(
            url=url,
            params=query,
            headers=http_headers,
            cookies=cookies,
            allow_redirects=False,
            timeout=constants.TIMEOUT)
        response_json = reply.json()
        if 'totalCount' not in response_json:
            Cookies.remove_cookies(cookies)
            raise RequestsError(error_log='wrong response content')
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)
    return response_json
项目:webspider    作者:GuozhuHe    | 项目源码 | 文件源码
def requests_job_detail_data(job_id):
    """Download a job's detail page and extract its tag fields.

    Returns the result of ``format_tag`` applied to the extracted department,
    description and keyword nodes plus ``job_id``.
    Raises RequestsError when the HTTP request raises RequestException.
    """
    http_headers = generate_http_header()
    crawler_sleep()
    try:
        detail_url = constants.JOB_DETAIL_URL.format(job_id=job_id)
        page = requests.get(
            url=detail_url,
            headers=http_headers,
            cookies=Cookies.get_random_cookies(),
            allow_redirects=False,
            timeout=constants.TIMEOUT)
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)

    tree = etree.HTML(page.text)
    department = tree.xpath('//div[@class="job-name"]/div[@class="company"]/text()')
    description = tree.xpath('//dd[@class="job_bt"]/div//text()')
    keywords = tree.xpath('//dd[@class="job_request"]//li[@class="labels"]/text()')
    return format_tag(department, description, keywords, job_id)
项目:webspider    作者:GuozhuHe    | 项目源码 | 文件源码
def request_job_json(company_id, page_no):
    """Request one page of a company's job listing and return the decoded JSON.

    A payload without a 'content' key is treated as a bad response: the
    cookies used for the request are discarded and RequestsError is raised.
    RequestsError is also raised when the request raises RequestException.
    """
    query = {
        'companyId': company_id,
        'positionFirstType': u"??",
        'pageNo': page_no,
        'pageSize': 10,
    }
    http_headers = generate_http_header()
    crawler_sleep()
    try:
        cookies = Cookies.get_random_cookies()
        reply = requests.get(
            url=constants.COMPANY_JOB_URL,
            params=query,
            headers=http_headers,
            cookies=cookies,
            timeout=constants.TIMEOUT)
        response_json = reply.json()
        if 'content' not in response_json:
            Cookies.remove_cookies(cookies)
            raise RequestsError(error_log='wrong response content')
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)
    return response_json
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def store_companies_house_profile_in_session_and_validate(
        session, company_number
):
    """Store the Companies House profile in the session, then validate it.

    :param session: session object handed to the storage/lookup helpers.
    :param company_number: company number to fetch and validate.
    :raises ValidationError: with MESSAGE_COMPANY_NOT_FOUND when the lookup
        failed with an HTTP 404, or MESSAGE_COMPANY_ERROR for any other
        RequestException (including failures that produced no HTTP response).
        The ``validators.company_active`` / ``company_unique`` checks run
        only after a successful lookup.
    """
    try:
        store_companies_house_profile_in_session(
            session=session,
            company_number=company_number,
        )
    except RequestException as error:
        # `response` is None when the request failed before any HTTP reply
        # arrived (connection error, timeout). Compare against None
        # explicitly: a Response object is falsy for error status codes.
        response = getattr(error, 'response', None)
        if response is not None and response.status_code == http.client.NOT_FOUND:
            raise ValidationError(validators.MESSAGE_COMPANY_NOT_FOUND)
        raise ValidationError(validators.MESSAGE_COMPANY_ERROR)
    else:
        company_status = get_company_status_from_session(
            session
        )
        validators.company_active(company_status)
        validators.company_unique(company_number)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_submit_enrolment_handles_api_company_not_found(client):
    """A 404 from the Companies House lookup shows "Company not found"."""
    not_found = RequestException(
        response=Mock(status_code=http.client.NOT_FOUND),
        request=Mock(),
    )
    query = {
        'company_number': '12345678',
        'has_exported_before': 'True',
    }

    with patch('enrolment.helpers.get_company_from_companies_house',
               side_effect=not_found):
        response = client.get(reverse('register-submit'), query)

    assert "Company not found" in str(response.content)
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
        """Upload torrent contents to uTorrent through its 'add-file' action.

        The data is written to a temporary file, which is then reopened by
        name and posted as the ``torrent_file`` form field.

        :param torrent_data: raw torrent contents to upload.
        :param download_dir: accepted for interface compatibility; unused here.
        :returns: True when the JSON reply has no 'error' entry; False when it
            does or when the request raises RequestException.
        """
        self.total_size = 0
        self.expected_torrent_name = ''

        params = {'action': 'add-file', 'token': self.token}
        # Context managers guarantee both handles are closed on every path,
        # including exceptions other than RequestException.
        with NamedTemporaryFile() as lf:
            lf.write(torrent_data)
            # Flush so the bytes are on disk before the file is reopened by
            # name; otherwise the upload may read an empty or partial file.
            lf.flush()
            with open(lf.name, 'rb') as torrent_file:
                try:
                    response = requests.post(
                        self.UTORRENT_URL,
                        auth=self.auth,
                        params=params,
                        files={'torrent_file': torrent_file},
                        timeout=25).json()
                except RequestException:
                    return False
        return 'error' not in response
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def add_url(self, url: str, download_dir: str=None) -> bool:
        """Ask uTorrent to fetch a torrent from ``url`` ('add-url' action).

        Returns True when the JSON reply has no 'error' entry; False when it
        does or when the request raises RequestException.
        ``download_dir`` is not used by this implementation.
        """
        self.total_size = 0
        self.expected_torrent_name = ''

        query = {'action': 'add-url', 'token': self.token, 's': url}
        try:
            reply = requests.get(
                self.UTORRENT_URL,
                auth=self.auth,
                # cookies=self.cookies,
                params=query,
                timeout=25).json()
            return 'error' not in reply
        except RequestException:
            return False
项目:gopythongo    作者:gopythongo    | 项目源码 | 文件源码
def validate_shared_args(args: configargparse.Namespace) -> None:
    """Validate the Docker-related arguments and check the Docker API responds.

    Raises ErrorMessage when a configured TLS file is missing or unreadable,
    when ``docker_ssl_version`` is not an integer from 1 to 5, or when the
    Docker client's ``info()`` call raises RequestException.
    """
    # Both TLS options are optional, but a configured path must be readable.
    for option in ('docker_tls_verify', 'docker_tls_client_cert'):
        path = getattr(args, option)
        if path and not (os.path.isfile(path) and os.access(path, os.R_OK)):
            raise ErrorMessage("File not found: %s (or not readable)" % highlight(path))

    try:
        ssl_version = int(args.docker_ssl_version)
        if not 1 <= ssl_version <= 5:
            raise ErrorMessage("Unknown value %s for SSL Protocol version. Valid are values 1-5." %
                               args.docker_ssl_version)
    except ValueError:
        raise ErrorMessage("Parameter to --docker-ssl-version must be an integer between 1 and 5")

    # A successful info() round trip is the connectivity check; the result
    # itself is not needed.
    client = get_docker_client(args)
    try:
        client.info()
    except RequestException as e:
        raise ErrorMessage("GoPythonGo can't talk to the Docker API at %s (Error was: %s)" %
                           (highlight(args.docker_api), str(e))) from e
项目:ras-backstage    作者:ONSdigital    | 项目源码 | 文件源码
def request_handler(method, url, params=None, auth=None, headers=None, json=None, data=None):
    """Send an HTTP request with ``requests`` and return the response.

    'POST' and 'PUT' forward ``json`` and ``data``; any other method is sent
    as a GET without a body. Raises ApiError(url) when the request raises
    RequestException.
    """
    shared = dict(params=params, auth=auth, headers=headers)
    try:
        if method == 'POST':
            response = requests.post(url, json=json, data=data, **shared)
        elif method == 'PUT':
            response = requests.put(url, json=json, data=data, **shared)
        else:
            response = requests.get(url, **shared)
    except RequestException as e:
        logger.exception('Failed to connect to external service',
                         method=method, url=url, exception=str(e))
        raise ApiError(url)

    logger.debug('Request response', method=method, url=url, status=response.status_code)
    return response
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def _send_cloud_req(req_method, req_path, err_prefix, **kwargs):
    """Call the registry at ``req_path`` and return the decoded JSON body.

    ``req_method`` is invoked as ``req_method(url, **kwargs)``. A 200 reply
    returns ``json.loads`` of its text. Any other status raises
    RegistrationServerError; socket errors and RequestException raise
    RegistrationClientError.
    """
    url = conf['registry'] + req_path
    try:
        reply = req_method(url, **kwargs)
        if reply.status_code == 200:
            return json.loads(reply.text)
        failure = RegistrationServerError(reply, err_prefix)
    except socket.error as ex:
        failure = RegistrationClientError(('socket error connecting to %s' %
                                           (url, )),
                                          ex, err_prefix)
    except RequestException as ex:
        failure = RegistrationClientError('request to %s failed' % (url, ),
                                          ex, err_prefix)
    raise failure
项目:bumblebee-status    作者:tobi-wan-kenobi    | 项目源码 | 文件源码
def getfromkrak(coin, currency):
    """Return ``"<coin>: <price>"`` from the Kraken public ticker.

    The price is the average of the first ask and first bid entries,
    rendered with ``str`` and cut to six characters. Returns None for a coin
    that is not in the lookup table and "No connection" when fetching or
    decoding the ticker raises.
    """
    # coin -> [pair prefix used in the query, prefix of the result key]
    prefixes = {
        "Btc": ["xbt", "XXBTZ"],
        "Eth": ["eth", "XETHZ"],
        "Ltc": ["ltc", "XLTCZ"],
    }
    if coin not in prefixes:
        return None
    query_prefix, result_prefix = prefixes[coin]
    epair = "{}{}".format(query_prefix, currency)
    tickname = "{}{}".format(result_prefix, currency.upper())
    try:
        ticker = requests.get('https://api.kraken.com/0/public/Ticker?pair='+epair).json()
    except (RequestException, Exception):
        return "No connection"
    quote = ticker['result'][tickname]
    ask = float(quote['a'][0])
    bid = float(quote['b'][0])
    midpoint = (ask + bid) / 2
    return coin + ": " + str(midpoint)[0:6]
项目:bumblebee-status    作者:tobi-wan-kenobi    | 项目源码 | 文件源码
def update(self, widgets):
        """Refresh the cached weather once the next-check time has passed.

        With location "auto" the coordinates and city come from ipinfo.io;
        otherwise the configured location is sent as the ``q`` parameter.
        On success the temperature, weather name and ``_valid = True`` are
        stored; any exception sets ``_valid = False``.
        """
        timestamp = int(time.time())
        if not self._nextcheck < timestamp:
            return
        try:
            # Schedule the next refresh first so failures are not retried
            # on every call.
            self._nextcheck = timestamp + self._interval*60
            weather_url = "http://api.openweathermap.org/data/2.5/weather?appid={}".format(self._apikey)
            weather_url = "{}&units={}".format(weather_url, self._unit)
            if self._location != "auto":
                weather_url = "{url}&q={city}".format(url=weather_url, city=self._location)
            else:
                location = json.loads(requests.get("http://ipinfo.io/json").text)
                coord = location["loc"].split(",")
                self._city = location["city"]
                weather_url = "{url}&lat={lat}&lon={lon}".format(url=weather_url, lat=coord[0], lon=coord[1])
            weather = json.loads(requests.get(weather_url).text)
            self._temperature = int(weather['main']['temp'])
            self._weather = weather['weather'][0]['main'].lower()
            self._valid = True
        except RequestException:
            self._valid = False
        except Exception:
            self._valid = False

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """Ping the server in an endless loop.

        Options read (each with a module-level default): ``interval`` is the
        number of minutes to sleep after a successful ping, ``checkrate`` the
        number of minutes to sleep after a failed one, and ``server`` the
        value passed to ``self.perform_ping``.

        ConnectionError, Timeout and any other RequestException are logged
        as warnings and followed by a retry; other exceptions propagate.
        """
        interval = float(options.get("interval", DEFAULT_PING_INTERVAL))
        checkrate = float(options.get("checkrate", DEFAULT_PING_CHECKRATE))
        server = options.get("server", DEFAULT_PING_SERVER_URL)

        self.started = datetime.now()

        while True:
            try:
                logging.info("Attempting a ping.")
                data = self.perform_ping(server)
                logging.info("Ping succeeded! (response: {}) Sleeping for {} minutes.".format(data, interval))
                time.sleep(interval * 60)
                continue
            # logging.warning replaces the deprecated logging.warn alias.
            except ConnectionError:
                logging.warning("Ping failed (could not connect). Trying again in {} minutes.".format(checkrate))
            except Timeout:
                logging.warning("Ping failed (connection timed out). Trying again in {} minutes.".format(checkrate))
            except RequestException as e:
                logging.warning("Ping failed ({})! Trying again in {} minutes.".format(e, checkrate))
            # Only reached after a failed ping (success path uses `continue`).
            time.sleep(checkrate * 60)
项目:knesset-data-pipelines    作者:hasadna    | 项目源码 | 文件源码
def _save_url(self, url, bucket, object_name, num_retries, seconds_between_retries, retry_num=1):
        try:
            res = self._reuqests_get(url)
        except RequestException as e:
            if retry_num < num_retries:
                logging.exception(e)
                logging.info("retry {} / {}, waiting {} seconds before retrying...".format(retry_num, num_retries,
                                                                                           seconds_between_retries))
                time.sleep(seconds_between_retries)
                return self._save_url(url, bucket, object_name, num_retries, seconds_between_retries, retry_num + 1)
            else:
                raise
        if res.status_code == 200:
            object_storage.write(self.s3, bucket, object_name, res.content)
            return True
        else:
            return False
项目:knesset-data-pipelines    作者:hasadna    | 项目源码 | 文件源码
def _save_url(self, url, bucket, object_name, num_retries, seconds_between_retries, retry_num=1):
        try:
            res = self._reuqests_get(url)
        except RequestException as e:
            if retry_num < num_retries:
                logging.exception(e)
                logging.info("retry {} / {}, waiting {} seconds before retrying...".format(retry_num, num_retries, seconds_between_retries))
                time.sleep(seconds_between_retries)
                return self._save_url(url, bucket, object_name, num_retries, seconds_between_retries, retry_num+1)
            else:
                raise
        if res.status_code == 200:
            object_storage.write(self.s3, bucket, object_name, res.content, public_bucket=True)
            return True
        else:
            return False
项目:antioch    作者:pytube    | 项目源码 | 文件源码
def download_other(obj, timeout=0, chunk_size=config.DEFAULT_CHUNKSIZE):
    """Stream the file at ``obj['url']`` into a newly generated filename.

    Returns the filename written, or None when ``obj`` has no URL or the
    request raises RequestException (which is logged as an error).
    """
    url = obj.get('url', None)
    if url is None:
        return None

    logger.info('downloading video: ' + url)

    try:
        response = requests.get(url, stream=True, timeout=timeout)
    except RequestErrors.RequestException as e:
        logger.error(e)
        return None

    filename = gen_filename(config.DROP_FOLDER_LOCATION, extension='movie')
    with open(filename, 'wb') as out_file:
        # Write chunk by chunk so the whole body is never held in memory.
        for chunk in response.iter_content(chunk_size=chunk_size):
            out_file.write(chunk)
    return filename
项目:python3_crawl    作者:princewen    | 项目源码 | 文件源码
def get_page_index(offset,keyword):
    """Fetch one page of Toutiao search results as raw text.

    Returns the response text on HTTP 200. Returns None for any other status
    or when the request raises RequestException (a message is printed).
    """
    query = {
        'offset':offset,
        'format':'json',
        'keyword':keyword,
        'autoload':'true',
        'count':'20',
        'cur_tab':3
    }
    url = 'http://www.toutiao.com/search_content?'+urlencode(query)
    try:
        response = requests.get(url)
        return response.text if response.status_code==200 else None
    except RequestException:
        print ("??????")
        return None
项目:merge-bot    作者:jasonkuster    | 项目源码 | 文件源码
def fetch_prs(self):
        """Fetch the pull requests of the configured repository.

        Returns:
            A tuple of (pr_list, error).
            pr_list holds one GithubPR per fetched entry on success and is
            empty on failure.
            error is None on success, otherwise a message describing the
            HTTP error, timeout, or other requests failure.
        """
        try:
            fetched = self.get(GITHUB_PULLS_ENDPOINT)
            wrapped = [GithubPR(self, pr) for pr in fetched]
            return wrapped, None
        except HTTPError as exc:
            message = 'Non-200 HTTP Code Received: {}'.format(exc)
        except Timeout as exc:
            message = 'Request timed out: {}'.format(exc)
        except RequestException as exc:
            message = 'Catastrophic error in requests: {}'.format(exc)
        return [], message
项目:merge-bot    作者:jasonkuster    | 项目源码 | 文件源码
def get(self, endpoint):
        """get makes a GET request against a specified endpoint.

        Args:
            endpoint: URL to which to make the GET request. URL is relative
            to https://api.github.com/repos/{self.org}/{self.repo}/
        Returns:
            JSON object retrieved from the endpoint when the status is 200.
            None when the status is neither 200 nor one that
            raise_for_status() turns into an exception.
        Raises:
            HTTPError: if we get an HTTP error status.
            Timeout: for timeouts.
            RequestException: for other assorted exceptional cases.
        """
        url = urlparse.urljoin(self.repo_url, endpoint)
        resp = requests.get(url, auth=(BOT_NAME, self.bot_key))
        # Compare by value: `is` tests object identity, which only happens to
        # work for small ints as a CPython implementation detail.
        if resp.status_code == 200:
            return resp.json()
        resp.raise_for_status()
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def task_user_callback_cb(task_id, parent_task_id, cb, **kwargs):
    """
    Task that calls the remote URL of a user-defined callback.

    The outcome is written to the task log only when ``cb['cb_log']`` is
    truthy: FAILURE with the exception as detail when the request raises
    RequestException, otherwise SUCCESS with "<status code>: <reason>".
    """
    try:
        obj = get_vms_object(kwargs)
    except ObjectDoesNotExist:
        obj = None

    user = User.objects.get(id=user_id_from_task_id(parent_task_id))
    # The status returned here is replaced by the callback outcome below.
    payload, status = get_task_status(parent_task_id)

    callback = UserCallback(parent_task_id)
    try:
        response = callback.request(cb, user.callback_key, payload)
    except RequestException as ex:
        status, details = states.FAILURE, ex
    else:
        status = states.SUCCESS
        details = str(response.status_code) + ': ' + response.reason

    if not cb.get('cb_log'):
        return
    task_log(parent_task_id, LOG_REMOTE_CALLBACK, obj=obj, task_status=status, detail=details)
项目:pandahouse    作者:kszucs    | 项目源码 | 文件源码
def execute(query, connection=None, data=None, external=None, stream=False):
    """POST a query to the server and return its response.

    Returns ``response.raw`` when ``stream`` is truthy, otherwise
    ``response.content``. On an HTTP error, raises ClickhouseException with
    the response body when there is one; otherwise re-raises the original
    exception.
    """
    host, params, files = prepare(query, connection, external=external)

    response = requests.post(host, params=params, data=data,
                             stream=stream, files=files)

    try:
        response.raise_for_status()
    except RequestException as e:
        if not response.content:
            raise e
        raise ClickhouseException(response.content)

    return response.raw if stream else response.content
项目:spider_python3_scrapy    作者:chaiwenjun000    | 项目源码 | 文件源码
def get_page_index(offset, keyword):
    """Fetch one page of Toutiao search results as raw text.

    Prints the process id and URL before requesting. Returns the response
    text on HTTP 200. Returns None for any other status or when the request
    raises RequestException (a message is printed).
    """
    query = {
        'offset':offset,
        'format':'json',
        'keyword':keyword,
        'autoload':'true',
        'count':20,
        'cur_tab':3
    }
    url = 'http://www.toutiao.com/search_content/?' + urlencode(query)
    print('([%d] ??????? %s' % (os.getpid(), url))
    try:
        response = requests.get(url)
        return response.text if response.status_code == 200 else None
    except RequestException:
        print('???????', url)
        return None
项目:inspace    作者:mochaoss    | 项目源码 | 文件源码
def get_site_description(url):
    """Return a short description of the page at ``url``.

    The ``content`` of the ``<meta name="description">`` tag inside
    ``<head>`` is preferred; when there is no such tag, the string of the
    first ``<p>`` element is used instead.

    :param url: address of the page to inspect (fetched with a 1s timeout).
    :returns: the description, or '' when the request fails, the body is
        empty, or no usable description is found. A string is always
        returned, never None.
    """
    try:
        site_content = requests.get(url, timeout=1).content
    except RequestException:
        # logger.warning replaces the deprecated logger.warn alias.
        logger.warning('Connection error trying to connect to {}'.format(url))
        return ''

    if not site_content:
        return ''

    target_markup = BeautifulSoup(site_content, 'html.parser')
    # Documents without a <head> expose `head` as None; guard before .find().
    head = target_markup.head
    meta_description = None
    if head is not None:
        meta_description = head.find('meta', {'name': 'description'})

    if meta_description:
        description = meta_description.get('content')
    else:
        first_paragraph = target_markup.find('p')
        description = first_paragraph.string if first_paragraph else None
    # A meta tag without `content`, or a paragraph with nested markup,
    # yields None; normalise to '' so callers always receive a string.
    return description or ''
项目:elastalert-ui    作者:steelheaddigital    | 项目源码 | 文件源码
def alert(self, matches):
        """Send a 'trigger' event for ``matches`` to PagerDuty.

        Raises EAException when the POST fails or returns an error status.
        """
        body = self.create_alert_body(matches)

        # post to pagerduty
        headers = {'content-type': 'application/json'}
        payload = {
            'service_key': self.pagerduty_service_key,
            'description': self.rule['name'],
            'event_type': 'trigger',
            'incident_key': self.pagerduty_incident_key,
            'client': self.pagerduty_client_name,
            'details': {
                "information": body.encode('UTF-8'),
            },
        }

        # set https proxy, if it was provided
        if self.pagerduty_proxy:
            proxies = {'https': self.pagerduty_proxy}
        else:
            proxies = None

        try:
            encoded = json.dumps(payload, cls=DateTimeEncoder, ensure_ascii=False)
            response = requests.post(self.url, data=encoded, headers=headers, proxies=proxies)
            response.raise_for_status()
        except RequestException as e:
            raise EAException("Error posting to pagerduty: %s" % e)
        elastalert_logger.info("Trigger sent to PagerDuty")
项目:elastalert-ui    作者:steelheaddigital    | 项目源码 | 文件源码
def alert(self, matches):
        """Send an alert for ``matches`` to VictorOps.

        Raises EAException when the POST fails or returns an error status.
        """
        body = self.create_alert_body(matches)

        # post to victorops
        headers = {'content-type': 'application/json'}
        # set https proxy, if it was provided
        if self.victorops_proxy:
            proxies = {'https': self.victorops_proxy}
        else:
            proxies = None
        payload = {
            "message_type": self.victorops_message_type,
            "entity_display_name": self.victorops_entity_display_name,
            "monitoring_tool": "ElastAlert",
            "state_message": body
        }

        try:
            encoded = json.dumps(payload, cls=DateTimeEncoder)
            response = requests.post(self.url, data=encoded, headers=headers, proxies=proxies)
            response.raise_for_status()
        except RequestException as e:
            raise EAException("Error posting to VictorOps: %s" % e)
        elastalert_logger.info("Trigger sent to VictorOps")
项目:swagger-aggregator    作者:Trax-air    | 项目源码 | 文件源码
def get_aggregate_swagger(self):
        """Load the swagger spec of every configured API not loaded yet.

        Each API under the 'apis' key of the config that is missing from
        ``self.swagger_apis`` is fetched and stored with its parsed URL.
        URLs that fail to load are recorded once in ``self.errors``; a URL
        that loads successfully is removed from it.

        Returns:
            A dict of swagger spec.
        """
        if 'apis' not in self.yaml_file:  # nothing configured
            return self.swagger_apis

        for api_name, api_url in self.yaml_file['apis'].items():
            if api_name in self.swagger_apis:
                continue
            # Get the swagger.json
            try:
                self.swagger_apis[api_name] = {'spec': self.get_swagger_from_url(api_url),
                                               'url': self.parse_value(api_url)}
                self.errors.remove(api_url)
            except (JSONDecodeError, RequestException) as exc:
                if api_url not in self.errors:
                    self.errors.append(api_url)
                logger.warning(u'Cannot get swagger from {0}: {1}'.format(api_url, repr(exc)))
            except ValueError:
                # list.remove raises ValueError when the URL was never an error
                logger.info(u'Cannot remove {0} from errors'.format(api_url))
        return self.swagger_apis
项目:cryptocoin-quotes    作者:Sayan98    | 项目源码 | 文件源码
def get_ticker(self):
        """Refresh the cached ticker response when it is missing or stale.

        Nothing is requested while a cached response exists and the time
        since the last update is below ``update_interval``. When the request
        raises RequestException and no response has ever been cached,
        ``internet_available`` is set to False.
        """
        cache_is_fresh = (
            self._last_update is not None
            and self._time_since_last_update() < self.update_interval
            and self._cached_response is not None
        )
        if cache_is_fresh:
            return

        # First run, or the cache expired.
        # Limits for the API:
        # Please limit requests to no more than 10 per minute.
        # Endpoints update every 5 minutes.
        try:
            self._cached_response = requests.get(self._url + self._currency).json()
            self._last_update = datetime.datetime.utcnow()
            self.internet_available = True
        except RequestException:
            if self._cached_response is None:
                self.internet_available = False
项目:python-sdk    作者:optimizely    | 项目源码 | 文件源码
def test_dispatch_event__handle_request_exception(self):
    """ Test that a failing POST is logged as an error by dispatch_event. """

    url = 'https://www.optimizely.com'
    params = {
      'accountId': '111001',
      'eventName': 'test_event',
      'eventEntityId': '111028',
      'visitorId': 'oeutest_user'
    }
    event = event_builder.Event(url, params, http_verb='POST', headers={'Content-Type': 'application/json'})

    failure = request_exception.RequestException('Failed Request')
    with mock.patch('requests.post', side_effect=failure) as mock_request_post:
      with mock.patch('logging.error') as mock_log_error:
        event_dispatcher.EventDispatcher.dispatch_event(event)

    mock_request_post.assert_called_once_with(url, data=json.dumps(params),
                                              headers={'Content-Type': 'application/json'},
                                              timeout=event_dispatcher.REQUEST_TIMEOUT)
    mock_log_error.assert_called_once_with('Dispatch event failed. Error: Failed Request')
项目:iris    作者:linkedin    | 项目源码 | 文件源码
def call_oncall(self, url):
        """GET ``self.endpoint + url`` and return the decoded JSON.

        Returns None (after logging) when the request raises
        RequestException, the status code is not 200, or the body is not
        valid JSON.
        """
        url = str(self.endpoint + url)
        try:
            reply = self.requests.get(url)
        except RequestException:
            logger.exception('Failed hitting oncall-api for url "%s"', url)
            return None

        status = reply.status_code
        if status != 200:
            logger.error('Invalid response from oncall-api for URL "%s". Code: %s. Content: "%s"', url, status, reply.content)
            return None

        try:
            decoded = reply.json()
        except ValueError:
            logger.exception('Failed decoding json from oncall-api. URL: "%s" Code: %s', url, reply.status_code)
            return None
        return decoded
项目:iris    作者:linkedin    | 项目源码 | 文件源码
def send_metrics(self, metrics):
        """Write one InfluxDB point per metric (best effort).

        Does nothing when ``self.enable_metrics`` is falsy. Every point is
        named ``<appname>_<metric>``, shares one timestamp, carries the
        metric under the ``value`` field and is tagged with
        ``self.extra_tags`` when those are set.

        :param metrics: mapping of metric name to value.
        :returns: None. Request and InfluxDB client/server errors are logged
            and not propagated.
        """
        if not self.enable_metrics:
            return
        now = str(datetime.now())
        payload = []
        # .items() works on both Python 2 and 3; dict.iteritems() is py2-only.
        for metric, value in metrics.items():
            data = {
                'measurement': self.appname + '_' + metric,
                'tags': {},
                'time': now,
                'fields': {
                    'value': value
                }
            }
            if self.extra_tags:
                data['tags'].update(self.extra_tags)
            payload.append(data)

        try:
            self.client.write_points(payload)
        except (RequestException, InfluxDBClientError, InfluxDBServerError):
            logger.exception('Failed to send metrics to influxdb')
项目:TradingTensors    作者:Henry-bee    | 项目源码 | 文件源码
def get_instrument_precision(self, INSTRUMENT):
        """Return ``10 ** pipLocation`` for the requested instrument.

        Queries the account's instruments endpoint, filtered by
        ``INSTRUMENT``, and reads ``pipLocation`` from the first entry.
        Returns None (after printing the error) when the request raises
        RequestException.
        """
        QUERY_URL = self.DEFAULT_URL + '/v3/accounts/{}/instruments'.format(ID)
        query = {'instruments': INSTRUMENT}

        try:
            payload = requests.get(QUERY_URL, headers=DEFAULT_HEADERS, params=query).json()
        except RequestException as e:
            print ("Error while retrieving instrument information: %s"%e)
            return None

        first_instrument = payload['instruments'][0]
        return 10 ** first_instrument['pipLocation']
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def link_ice_entry_to_study(self, user_token, strain, study):
    """
    Task runs the code to register a link between an ICE entry and an EDD study.

    Nothing is done when the strain is no longer attached to the study.
    Communication errors (``RequestException``) are turned into a retry via
    ``self.retry``; any other exception propagates unchanged.
    NOTE(review): ``self`` is presumably a bound Celery task -- confirm.

    :param user_token: the token used to identify a user to ICE
    :param strain: the primary key of the EDD main.models.Strain in the link
    :param study: the primary key of the EDD main.models.Study in the link
    :throws Exception: for any errors other than communication errors to ICE instance
    """
    # check that strain and study are still linked
    query = models.Strain.objects.filter(pk=strain, line__study__pk=study)
    if not query.exists():
        return
    try:
        ice = create_ice_connection(user_token)
        record = query.annotate(
            study_slug=F('line__study__slug'),
            study_name=F('line__study__name'),
        ).distinct().get()
        url = build_study_url(record.study_slug)
        ice.link_entry_to_study(record.registry_id, study, url, record.study_name)
    except RequestException as e:
        # Retry when there are errors communicating with ICE
        raise self.retry(exc=e, countdown=delay_calculation(self), max_retries=10)
    # No catch-all handler: every other exception simply propagates, which is
    # what the former ``except Exception as e: raise e`` did anyway.
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def unlink_ice_entry_from_study(self, user_token, strain, study):
    """
    Task runs the code to de-register a link between an ICE entry and an EDD study.

    Nothing is done while the strain is still attached to the study.
    Communication errors (``RequestException``) are turned into a retry via
    ``self.retry``; any other exception propagates unchanged.
    NOTE(review): ``self`` is presumably a bound Celery task -- confirm.

    :param user_token: the token used to identify a user to ICE
    :param strain: the primary key of the EDD main.models.Strain in the former link
    :param study: the primary key of the EDD main.models.Study in the former link
    :throws Exception: for any errors other than communication errors to ICE instance
    """
    # only unlink once no line of the study references the strain any more
    query = models.Strain.objects.filter(pk=strain, line__study__pk=study)
    if query.exists():
        return
    try:
        ice = create_ice_connection(user_token)
        record = models.Strain.objects.get(pk=strain)
        study_obj = models.Study.objects.get(pk=study)
        url = build_study_url(study_obj.slug)
        ice.unlink_entry_from_study(record.registry_id, study, url)
    except RequestException as e:
        # Retry when there are errors communicating with ICE
        raise self.retry(exc=e, countdown=delay_calculation(self), max_retries=10)
    # No catch-all handler: every other exception simply propagates, which is
    # what the former ``except Exception as e: raise e`` did anyway.
项目:spider    作者:devzhan    | 项目源码 | 文件源码
def get_page_index(offset, keyword):
    """Fetch one page of Toutiao search results as raw response text.

    :param offset: value sent as the ``offset`` query parameter.
    :param keyword: search keyword sent as the ``keyword`` query parameter.
    :returns: the response body when the server answers 200, otherwise
        ``None`` (also ``None`` when the request itself fails).
    """
    data = {
        "offset": offset,
        "format": "json",
        "keyword": keyword,
        "autoload": "true",
        "count": 20,
        "cur_tab": 3
    }
    url = "http://www.toutiao.com/search_content/?" + urlencode(data)
    try:
        # The request is the statement that raises RequestException, so it
        # has to live inside the try block for the handler to be reachable.
        response = requests.get(url)
    except RequestException:
        print("????")
        return None
    if response.status_code == 200:
        return response.text
    return None
项目:orange3-imageanalytics    作者:biolab    | 项目源码 | 文件源码
def _load_image_from_url_or_local_path(self, file_path):
        """Open the image referenced by *file_path*.

        ``http``/``https`` locations are streamed through ``self._session``,
        ``ftp``/``data`` locations go through ``urlopen`` and anything else is
        handed to ``open_image`` unchanged.

        :returns: whatever ``open_image`` returns, or ``None`` when fetching
            or opening fails (a warning with the traceback is logged).
        """
        scheme = urlparse(file_path).scheme
        if scheme in ('http', 'https'):
            try:
                source = self._session.get(file_path, stream=True).raw
            except RequestException:
                log.warning("Image skipped", exc_info=True)
                return None
        elif scheme in ("ftp", "data"):
            try:
                source = urlopen(file_path)
            except (URLError, ) + ftplib.all_errors:
                log.warning("Image skipped", exc_info=True)
                return None
        else:
            # No recognised scheme: pass the value through as given.
            source = file_path

        try:
            image = open_image(source)
        except (IOError, ValueError):
            log.warning("Image skipped", exc_info=True)
            return None
        return image
项目:qgis-mapstory-plugin    作者:boundlessgeo    | 项目源码 | 文件源码
def search(self):
        """Run a map search for the text in the search box and show the result.

        An empty (whitespace-only) search box is ignored.  Matching entries
        are rendered as an HTML list in ``self.browser``; request failures are
        reported to the user in a warning message box.
        """
        text = self.searchBox.text().strip()
        if not text:
            return

        try:
            query = {"type__in": "map", "limit": 50, "q": text}
            response = execute(lambda: requests.get("http://mapstory.org/api/base/search", params=query))
            response.raise_for_status()
            mapstories = response.json()["objects"]
            if not mapstories:
                html = "<h2>No maps matching your search criteria were found.</h2>"
            else:
                items = []
                for mapstory in mapstories:
                    link = "<a href='%s' class='button'>Open</a>" % (mapstory["id"])
                    items.append("<li>%s <h3>%s</h3> %s <br> </li>" % (link, mapstory["title"], mapstory["abstract"]))
                html = "<div><ul>" + "".join(items) + "</ul></div>"
            self.browser.setHtml(html)
        except RequestException as e:
            QMessageBox.warning(
                self,
                "Search",
                u"There has been a problem performing the search:\n" + str(e.args[0]),
                QMessageBox.Ok,
            )
项目:Tattle    作者:nickmaccarthy    | 项目源码 | 文件源码
def fire(self):
        """Post a ``trigger`` event for this alert to ``self.url``.

        The event is sent as a JSON body.  A failed request is logged and
        otherwise ignored; ``self.firemsg`` is set afterwards in either case.
        """
        event = {
            'service_key': self.pagerduty_service_key,
            'description': self.title,
            'event_type': 'trigger',
            'incident_key': tattle.make_md5(self.title),
            'client': self.pagerduty_client_name,
            'client_url': self.client_url,
            'details': {
                'alert-info': self.alert,
                'matches': self.matches
            },
        }
        json_headers = {'content-type': 'application/json'}

        try:
            reply = requests.post(
                self.url,
                data=json.dumps(event, ensure_ascii=False),
                headers=json_headers,
            )
            reply.raise_for_status()
        except RequestException as e:
            logger.error('Error posting message to pagerduty: %s', e)

        # NOTE(review): this message is recorded even when the post failed.
        self.firemsg = """msg="{}", tale_name="{}", title="{}" """.format("PagerDuty Alert Sent", self.alert.get('name'), self.title)
项目:netease-dl    作者:ziwenxie    | 项目源码 | 文件源码
def login(method):
    """Require user to login.

    Decorator for callables whose first positional argument exposes a
    ``crawler`` attribute.  Before the wrapped callable runs, the saved
    cookies are loaded when the cookie file exists and its first
    ``YYYY-MM-DD`` date lies in the future; otherwise ``crawler.login()`` is
    called.  A ``RequestException`` during this step prints a hint and exits
    the process with status 1.

    :param method: the callable to protect.
    :returns: the wrapping function, carrying ``method``'s metadata.
    """
    import functools  # local import keeps the block self-contained

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        crawler = args[0].crawler  # args[0] is a NetEase object

        try:
            cookie_is_valid = False
            if os.path.isfile(cookie_path):
                with open(cookie_path, 'r') as cookie_file:
                    cookie = cookie_file.read()
                expire_dates = re.findall(r'\d{4}-\d{2}-\d{2}', cookie)
                today = time.strftime('%Y-%m-%d')
                # ISO dates order correctly as plain strings.  A cookie file
                # without any date is treated as expired instead of raising
                # IndexError.
                cookie_is_valid = bool(expire_dates) and expire_dates[0] > today
            if cookie_is_valid:
                crawler.session.cookies.load()
            else:
                crawler.login()
        except RequestException:
            click.echo('Maybe password error, please try again.')
            sys.exit(1)
        return method(*args, **kwargs)

    return wrapper
项目:netease-dl    作者:ziwenxie    | 项目源码 | 文件源码
def download_song_by_id(self, song_id, song_name, folder='.'):
        """Fetch a single song through the crawler and store it on disk.

        ``/`` and ``.`` are stripped from the song name before it is handed
        to the crawler.  Lyrics are requested only when ``self.lyric`` is set.
        A ``RequestException`` is echoed instead of being raised.

        :params song_id: song id.
        :params song_name: song name.
        :params folder: storage path.
        """
        try:
            song_url = self.crawler.get_song_url(song_id)
            # use old api
            lyrics = self.crawler.get_song_lyric(song_id) if self.lyric else None
            safe_name = song_name.replace('/', '').replace('.', '')
            self.crawler.get_song_by_url(song_url, safe_name, folder, lyrics)
        except RequestException as exception:
            click.echo(exception)
项目:netease-dl    作者:ziwenxie    | 项目源码 | 文件源码
def download_artist_by_id(self, artist_id, artist_name):
        """Download the hot songs the crawler returns for an artist.

        Songs are saved below ``self.folder`` in a sub-folder named after the
        artist.  A ``RequestException`` while listing the songs is echoed and
        nothing is downloaded.

        :params artist_id: artist id.
        :params artist_name: artist name.
        """
        try:
            # use old api
            hot_songs = self.crawler.get_artists_hot_songs(artist_id)
        except RequestException as exception:
            click.echo(exception)
            return

        target_dir = os.path.join(self.folder, artist_name)
        for track in hot_songs:
            self.download_song_by_id(track.song_id, track.song_name, target_dir)
项目:netease-dl    作者:ziwenxie    | 项目源码 | 文件源码
def download_playlist_by_id(self, playlist_id, playlist_name):
        """Download every song the crawler returns for a playlist.

        Songs are saved below ``self.folder`` in a sub-folder named after the
        playlist.  A ``RequestException`` while listing the songs is echoed
        and nothing is downloaded.

        :params playlist_id: playlist id.
        :params playlist_name: playlist name.
        """
        try:
            playlist_songs = self.crawler.get_playlist_songs(playlist_id)
        except RequestException as exception:
            click.echo(exception)
            return

        target_dir = os.path.join(self.folder, playlist_name)
        for track in playlist_songs:
            self.download_song_by_id(track.song_id, track.song_name, target_dir)
项目:netease-dl    作者:ziwenxie    | 项目源码 | 文件源码
def exception_handle(method):
    """Handle exception raised by requests library.

    Decorator that logs the failure (with traceback) and re-raises a fresh
    exception of the same class carrying a short explanatory message.  The
    handlers are ordered from most to least specific.

    :param method: the callable to wrap.
    :returns: the wrapping function, carrying ``method``'s metadata.
    """
    import functools  # local import keeps the block self-contained

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        try:
            return method(*args, **kwargs)
        except ProxyError:
            LOG.exception('ProxyError when try to get %s.', args)
            raise ProxyError('A proxy error occurred.')
        except ConnectionException:
            LOG.exception('ConnectionError when try to get %s.', args)
            raise ConnectionException('DNS failure, refused connection, etc.')
        except Timeout:
            LOG.exception('Timeout when try to get %s', args)
            raise Timeout('The request timed out.')
        except RequestException:
            LOG.exception('RequestException when try to get %s.', args)
            raise RequestException('Please check out your network.')

    return wrapper
项目:money-to-prisoners-send-money    作者:ministryofjustice    | 项目源码 | 文件源码
def get_govuk_payment(self, govuk_id):
        """Fetch the payment identified by *govuk_id*.

        :param govuk_id: identifier appended to the ``/payments/`` path.
        :returns: the decoded JSON payload, with ``email`` reset to ``None``
            when it fails ``validate_email``; ``None`` on a 404 response.
        :raises RequestException: on any other non-200 status, or when the
            response body cannot be parsed.
        """
        response = requests.get(
            govuk_url('/payments/%s' % govuk_id),
            headers=govuk_headers(),
            timeout=15
        )

        status = response.status_code
        if status == 404:
            return None
        if status != 200:
            raise RequestException(
                'Unexpected status code: %s' % status,
                response=response
            )

        try:
            payment = response.json()
            try:
                validate_email(payment.get('email'))
            except ValidationError:
                # Keep the payment but drop the unusable address.
                payment['email'] = None
        except (ValueError, KeyError):
            raise RequestException('Cannot parse response', response=response)
        return payment