我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.exceptions.RequestException()。
def _call_api(self, api, params=None, request_body=None):
    """Build an API request and fetch its result, retrying on request errors.

    Args:
        api: Callable invoked with the request parameters as keyword
            arguments; its return value is handed on as ``request``.
        params: Optional mapping of keyword arguments for ``api``.
        request_body: Optional payload; when truthy it is passed to ``api``
            as the ``body`` keyword argument.

    Returns:
        Whatever ``retry_on_exception`` returns for ``self._get_api_result``.
    """
    # Work on a copy so that adding ``body`` never mutates the caller's dict.
    request_params = dict(params) if params else {}
    if request_body:
        request_params['body'] = request_body
    request = api(**request_params)
    retry_policy = RetryPolicy(
        ExpBackoffPolicy(with_jitter=True),
        max_retry_count=get_config().schematizer_client_max_connection_retry
    )
    return retry_on_exception(
        retry_policy=retry_policy,
        retry_exceptions=RequestException,
        func_to_retry=self._get_api_result,
        request=request
    )
def _getpage(self, page):
    """Fetch one page of the member's submitted-videos listing.

    Returns the response text when the server answers with HTTP 200, and
    ``None`` for any other status code or when requests raises a
    RequestException.
    """
    query = {
        'mid': str(self._mid),
        'pagesize': '30',
        'tid': '0',
        'page': str(page),
        'keyword': '',
        'order': 'senddate',
        '_': '1496812411295'
    }
    # Example of the resulting URL:
    # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
    # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
    url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(query)
    try:
        response = requests.get(url)
        return response.text if response.status_code == 200 else None
    except RequestException:
        return None
def add_txt_record(self, domain, record_name, record_content):
    """
    Create a TXT record through the configured provider.

    :param str domain: The domain passed to ``_find_domain_id`` before the record is created.
    :param str record_name: The record name (typically beginning with '_acme-challenge.').
    :param str record_content: The record content (typically the challenge validation).
    :raises errors.PluginError: if the provider call raises a RequestException
    """
    self._find_domain_id(domain)

    record = {'type': 'TXT', 'name': record_name, 'content': record_content}
    try:
        self.provider.create_record(**record)
    except RequestException as e:
        logger.debug('Encountered error adding TXT record: %s', e, exc_info=True)
        raise errors.PluginError('Error adding TXT record: {0}'.format(e))
def del_txt_record(self, domain, record_name, record_content):
    """
    Remove a TXT record through the configured provider (best effort).

    Failures are only logged at debug level: a PluginError from the domain
    lookup ends the call early, and a RequestException from the provider is
    swallowed.

    :param str domain: The domain passed to ``_find_domain_id`` before the record is deleted.
    :param str record_name: The record name (typically beginning with '_acme-challenge.').
    :param str record_content: The record content (typically the challenge validation).
    """
    try:
        self._find_domain_id(domain)
    except errors.PluginError as e:
        logger.debug('Encountered error finding domain_id during deletion: %s', e, exc_info=True)
        return

    record = {'type': 'TXT', 'name': record_name, 'content': record_content}
    try:
        self.provider.delete_record(**record)
    except RequestException as e:
        logger.debug('Encountered error deleting TXT record: %s', e, exc_info=True)
def _retry(func):
    """Decorator that calls ``func`` again after a RequestException.

    One attempt is made per entry of ``retryDelays`` plus a final attempt;
    after each failed attempt but the last, the wrapper sleeps for the
    corresponding delay. When the final attempt fails a RetryException is
    raised.
    """
    @_wraps(func)
    def retrying(*args, **kwargs):
        problems = []
        # The trailing None marks the last attempt, after which we give up.
        for delay in _chain(retryDelays, [None]):
            try:
                return func(*args, **kwargs)
            except RequestException as problem:
                problems.append(problem)
                if delay is not None:
                    # log exception and wait before the next attempt
                    logger.debug(problem)
                    logger.info("-- retrying in %ds", delay)
                    sleep(delay)
                    continue
                logger.debug(problems)
                raise RetryException(
                    'retryDelays exhausted ' + str(problem))
    return retrying
def requests_company_detail_data(company_id):
    """Download a company's detail page and extract its descriptive fields.

    Calls ``crawler_sleep()`` and then requests
    ``constants.COMPANY_DETAIL_URL`` for ``company_id`` with a random cookie
    set and redirects disabled.

    Returns:
        The result of ``format_tag`` applied to the extracted advantage,
        address, size and introduction nodes plus ``company_id``.

    Raises:
        RequestsError: when requests raises a RequestException.
    """
    headers = generate_http_header()
    crawler_sleep()
    detail_url = constants.COMPANY_DETAIL_URL.format(company_id=company_id)
    try:
        response = requests.get(
            url=detail_url,
            headers=headers,
            cookies=Cookies.get_random_cookies(),
            allow_redirects=False,
            timeout=constants.TIMEOUT)
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)

    page = etree.HTML(response.text)
    advantage = page.xpath('//div[@id="tags_container"]//li/text()')
    size = page.xpath('//div[@id="basic_container"]//li[3]/span/text()')
    address = page.xpath('//p[@class="mlist_li_desc"]/text()')
    introduce = page.xpath('//span[@class="company_content"]//text()')
    return format_tag(advantage, address, size, introduce, company_id)
def request_company_json(url, page_no):
    """Request one page of the company listing and return the decoded JSON.

    A response without a ``totalCount`` key is treated as invalid: the
    cookies used for the request are removed and RequestsError is raised.

    Raises:
        RequestsError: on an invalid response body, or when requests raises
            a RequestException.
    """
    query = {
        'first': False,
        'pn': page_no,
        'sortField': 1,
        'havemark': 0,
    }
    headers = generate_http_header()
    crawler_sleep()
    try:
        cookies = Cookies.get_random_cookies()
        response = requests.get(
            url=url,
            params=query,
            headers=headers,
            cookies=cookies,
            allow_redirects=False,
            timeout=constants.TIMEOUT)
        response_json = response.json()
        if 'totalCount' not in response_json:
            Cookies.remove_cookies(cookies)
            raise RequestsError(error_log='wrong response content')
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)
    return response_json
def requests_job_detail_data(job_id):
    """Download a job's detail page and extract its descriptive fields.

    Calls ``crawler_sleep()`` and then requests ``constants.JOB_DETAIL_URL``
    for ``job_id`` with a random cookie set and redirects disabled.

    Returns:
        The result of ``format_tag`` applied to the extracted department,
        description and keyword nodes plus ``job_id``.

    Raises:
        RequestsError: when requests raises a RequestException.
    """
    headers = generate_http_header()
    crawler_sleep()
    detail_url = constants.JOB_DETAIL_URL.format(job_id=job_id)
    try:
        response = requests.get(
            url=detail_url,
            headers=headers,
            cookies=Cookies.get_random_cookies(),
            allow_redirects=False,
            timeout=constants.TIMEOUT)
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)

    page = etree.HTML(response.text)
    department = page.xpath('//div[@class="job-name"]/div[@class="company"]/text()')
    description = page.xpath('//dd[@class="job_bt"]/div//text()')
    keywords = page.xpath('//dd[@class="job_request"]//li[@class="labels"]/text()')
    return format_tag(department, description, keywords, job_id)
def request_job_json(company_id, page_no):
    """Request one page of a company's job listing and return the decoded JSON.

    A response without a ``content`` key is treated as invalid: the cookies
    used for the request are removed and RequestsError is raised.

    Raises:
        RequestsError: on an invalid response body, or when requests raises
            a RequestException.
    """
    query = {
        'companyId': company_id,
        'positionFirstType': u"??",
        'pageNo': page_no,
        'pageSize': 10,
    }
    headers = generate_http_header()
    crawler_sleep()
    try:
        cookies = Cookies.get_random_cookies()
        response = requests.get(
            url=constants.COMPANY_JOB_URL,
            params=query,
            headers=headers,
            cookies=cookies,
            timeout=constants.TIMEOUT)
        response_json = response.json()
        if 'content' not in response_json:
            Cookies.remove_cookies(cookies)
            raise RequestsError(error_log='wrong response content')
    except RequestException as e:
        logging.error(e)
        raise RequestsError(error_log=e)
    return response_json
def store_companies_house_profile_in_session_and_validate(
    session, company_number
):
    """Store the Companies House profile in the session, then validate it.

    Args:
        session: Session object handed to the storing/reading helpers.
        company_number: Company number to look up and validate.

    Raises:
        ValidationError: with ``MESSAGE_COMPANY_NOT_FOUND`` when the lookup
            fails with HTTP 404, or ``MESSAGE_COMPANY_ERROR`` for any other
            RequestException. The ``validators`` calls made on success are
            invoked without catching anything they raise.
    """
    try:
        store_companies_house_profile_in_session(
            session=session,
            company_number=company_number,
        )
    except RequestException as error:
        # A RequestException raised before any HTTP reply arrived (e.g. a
        # connection failure) carries ``response=None``; treat that as a
        # generic company error instead of crashing with AttributeError.
        response = getattr(error, 'response', None)
        if response is not None and response.status_code == http.client.NOT_FOUND:
            raise ValidationError(validators.MESSAGE_COMPANY_NOT_FOUND)
        raise ValidationError(validators.MESSAGE_COMPANY_ERROR)
    else:
        company_status = get_company_status_from_session(
            session
        )
        validators.company_active(company_status)
        validators.company_unique(company_number)
def test_submit_enrolment_handles_api_company_not_found(client):
    """The submit view shows "Company not found" when the lookup returns 404."""
    not_found = RequestException(
        response=Mock(status_code=http.client.NOT_FOUND),
        request=Mock(),
    )
    query = {
        'company_number': '12345678',
        'has_exported_before': 'True',
    }
    with patch('enrolment.helpers.get_company_from_companies_house') as lookup:
        lookup.side_effect = not_found
        response = client.get(reverse('register-submit'), query)

    assert "Company not found" in str(response.content)
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
    """Upload torrent data to the uTorrent endpoint as a file.

    The data is written to a temporary file which is then posted as the
    ``torrent_file`` form field.

    Args:
        torrent_data: Raw torrent content written to the temporary file.
            NOTE(review): the temp file is opened in binary mode, so a
            ``str`` argument will fail on write under Python 3 -- confirm
            what callers pass.
        download_dir: Unused by this method.

    Returns:
        ``True`` when the decoded JSON response has no ``error`` key,
        ``False`` when it has one or when requests raises a
        RequestException.
    """
    self.total_size = 0
    self.expected_torrent_name = ''
    params = {'action': 'add-file', 'token': self.token}

    # Context managers guarantee both handles are closed on every path,
    # including when ``.json()`` raises.
    with NamedTemporaryFile() as lf:
        lf.write(torrent_data)
        # Flush so the bytes are on disk before the file is reopened by name;
        # otherwise an empty or truncated file could be uploaded.
        lf.flush()
        with open(lf.name, 'rb') as torrent_file:
            try:
                response = requests.post(
                    self.UTORRENT_URL,
                    auth=self.auth,
                    params=params,
                    files={'torrent_file': torrent_file},
                    timeout=25).json()
            except RequestException:
                return False
    return 'error' not in response
def add_url(self, url: str, download_dir: str=None) -> bool:
    """Ask the uTorrent endpoint to add a torrent by URL.

    Returns ``True`` when the decoded JSON response has no ``error`` key,
    ``False`` when it has one or when requests raises a RequestException.
    ``download_dir`` is not used by this method.
    """
    self.total_size = 0
    self.expected_torrent_name = ''
    query = {'action': 'add-url', 'token': self.token, 's': url}
    try:
        reply = requests.get(
            self.UTORRENT_URL,
            auth=self.auth,
            params=query,
            timeout=25)
        response = reply.json()
        return 'error' not in response
    except RequestException:
        return False
def validate_shared_args(args: configargparse.Namespace) -> None:
    """Validate the Docker-related command line arguments.

    Checks that the TLS files (when given) exist and are readable, that
    ``docker_ssl_version`` is an integer from 1 to 5, and that the Docker
    API answers an ``info()`` call.

    Raises:
        ErrorMessage: when any of these checks fails.
    """
    for tls_path in (args.docker_tls_verify, args.docker_tls_client_cert):
        if tls_path and not (os.path.isfile(tls_path) and os.access(tls_path, os.R_OK)):
            raise ErrorMessage("File not found: %s (or not readable)" % highlight(tls_path))

    try:
        ssl_version = int(args.docker_ssl_version)
        if not 1 <= ssl_version <= 5:
            raise ErrorMessage("Unknown value %s for SSL Protocol version. Valid are values 1-5." %
                               args.docker_ssl_version)
    except ValueError:
        raise ErrorMessage("Parameter to --docker-ssl-version must be an integer between 1 and 5")

    dcl = get_docker_client(args)
    try:
        # Only reachability matters here; the returned info is discarded.
        dcl.info()
    except RequestException as e:
        raise ErrorMessage("GoPythonGo can't talk to the Docker API at %s (Error was: %s)" %
                           (highlight(args.docker_api), str(e))) from e
def request_handler(method, url, params=None, auth=None, headers=None, json=None, data=None):
    """Send an HTTP request and return the response.

    ``'POST'`` and ``'PUT'`` are sent with ``json`` and ``data``; any other
    ``method`` value results in a GET without a body.

    Raises:
        ApiError: when requests raises a RequestException.
    """
    try:
        if method in ('POST', 'PUT'):
            send = requests.post if method == 'POST' else requests.put
            response = send(url, params=params, auth=auth, headers=headers, json=json, data=data)
        else:
            response = requests.get(url, params=params, auth=auth, headers=headers)
    except RequestException as e:
        logger.exception('Failed to connect to external service', method=method, url=url, exception=str(e))
        raise ApiError(url)

    logger.debug('Request response', method=method, url=url, status=response.status_code)
    return response
def _send_cloud_req(req_method, req_path, err_prefix, **kwargs):
    """Call the registry and return the decoded JSON body of a 200 response.

    Args:
        req_method: Callable invoked as ``req_method(url, **kwargs)``.
        req_path: Path appended to ``conf['registry']``.
        err_prefix: Passed through to the raised error objects.

    Raises:
        RegistrationServerError: when the status code is not 200.
        RegistrationClientError: on ``socket.error`` or RequestException.
    """
    url = conf['registry'] + req_path
    try:
        r = req_method(url, **kwargs)
        if r.status_code == 200:
            return json.loads(r.text)
        failure = RegistrationServerError(r, err_prefix)
    except socket.error as ex:
        failure = RegistrationClientError(('socket error connecting to %s' % (url, )), ex, err_prefix)
    except RequestException as ex:
        failure = RegistrationClientError('request to %s failed' % (url, ), ex, err_prefix)
    raise failure
def getfromkrak(coin, currency):
    """Return the Kraken mid price for ``coin`` as a short display string.

    Returns ``None`` for a coin that is not in the abbreviation table,
    ``"No connection"`` when fetching or decoding the ticker fails, and
    otherwise ``"<coin>: <price>"`` where the price is the average of the
    first ask and bid values truncated to six characters.
    """
    abbrev = {
        "Btc": ["xbt", "XXBTZ"],
        "Eth": ["eth", "XETHZ"],
        "Ltc": ["ltc", "XLTCZ"],
    }
    data = abbrev.get(coin, None)
    if not data:
        return None

    pair_code, tick_prefix = data
    epair = "{}{}".format(pair_code, currency)
    tickname = "{}{}".format(tick_prefix, currency.upper())
    try:
        krakenget = requests.get('https://api.kraken.com/0/public/Ticker?pair='+epair).json()
    except (RequestException, Exception):
        return "No connection"

    ticker = krakenget['result'][tickname]
    ask = float(ticker['a'][0])
    bid = float(ticker['b'][0])
    return coin+": "+str((ask+bid)/2)[0:6]
def update(self, widgets):
    """Refresh the cached weather once the next-check time has passed.

    When a refresh is due, the next check is scheduled ``self._interval``
    minutes ahead, the weather is fetched (for the IP-derived location when
    ``self._location`` is ``"auto"``), and ``self._valid`` records whether
    the refresh succeeded. ``widgets`` is not used.
    """
    now = int(time.time())
    if self._nextcheck >= now:
        return

    try:
        self._nextcheck = now + self._interval*60
        weather_url = "http://api.openweathermap.org/data/2.5/weather?appid={}".format(self._apikey)
        weather_url = "{}&units={}".format(weather_url, self._unit)
        if self._location == "auto":
            location_url = "http://ipinfo.io/json"
            location_reply = requests.get(location_url)
            location = json.loads(location_reply.text)
            lat, lon = location["loc"].split(",")[:2]
            self._city = location["city"]
            weather_url = "{url}&lat={lat}&lon={lon}".format(url=weather_url, lat=lat, lon=lon)
        else:
            weather_url = "{url}&q={city}".format(url=weather_url, city=self._location)
        weather_reply = requests.get(weather_url)
        weather = json.loads(weather_reply.text)
        self._temperature = int(weather['main']['temp'])
        self._weather = weather['weather'][0]['main'].lower()
        self._valid = True
    except RequestException:
        self._valid = False
    except Exception:
        self._valid = False

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
def handle(self, *args, **options):
    """Ping the server forever, sleeping between attempts.

    Options read (all optional): ``interval`` -- minutes to sleep after a
    successful ping; ``checkrate`` -- minutes to sleep after a failed ping;
    ``server`` -- value passed to ``self.perform_ping``.

    This method never returns; ConnectionError, Timeout and other
    RequestException failures are logged as warnings and retried.
    """
    interval = float(options.get("interval", DEFAULT_PING_INTERVAL))
    checkrate = float(options.get("checkrate", DEFAULT_PING_CHECKRATE))
    server = options.get("server", DEFAULT_PING_SERVER_URL)

    self.started = datetime.now()

    while True:
        try:
            logging.info("Attempting a ping.")
            data = self.perform_ping(server)
            logging.info("Ping succeeded! (response: {}) Sleeping for {} minutes.".format(data, interval))
            time.sleep(interval * 60)
            continue
        # ``logging.warn`` is a deprecated alias; use ``logging.warning``.
        except ConnectionError:
            logging.warning("Ping failed (could not connect). Trying again in {} minutes.".format(checkrate))
        except Timeout:
            logging.warning("Ping failed (connection timed out). Trying again in {} minutes.".format(checkrate))
        except RequestException as e:
            logging.warning("Ping failed ({})! Trying again in {} minutes.".format(e, checkrate))
        # Only reached after a failed ping (success path ``continue``s above).
        time.sleep(checkrate * 60)
def _save_url(self, url, bucket, object_name, num_retries, seconds_between_retries, retry_num=1): try: res = self._reuqests_get(url) except RequestException as e: if retry_num < num_retries: logging.exception(e) logging.info("retry {} / {}, waiting {} seconds before retrying...".format(retry_num, num_retries, seconds_between_retries)) time.sleep(seconds_between_retries) return self._save_url(url, bucket, object_name, num_retries, seconds_between_retries, retry_num + 1) else: raise if res.status_code == 200: object_storage.write(self.s3, bucket, object_name, res.content) return True else: return False
def _save_url(self, url, bucket, object_name, num_retries, seconds_between_retries, retry_num=1): try: res = self._reuqests_get(url) except RequestException as e: if retry_num < num_retries: logging.exception(e) logging.info("retry {} / {}, waiting {} seconds before retrying...".format(retry_num, num_retries, seconds_between_retries)) time.sleep(seconds_between_retries) return self._save_url(url, bucket, object_name, num_retries, seconds_between_retries, retry_num+1) else: raise if res.status_code == 200: object_storage.write(self.s3, bucket, object_name, res.content, public_bucket=True) return True else: return False
def download_other(obj, timeout=0, chunk_size=config.DEFAULT_CHUNKSIZE):
    """Stream the video referenced by ``obj['url']`` into a new file.

    Args:
        obj: Mapping that may contain a ``url`` entry.
        timeout: Request timeout in seconds. ``0`` (the default) or ``None``
            means "no timeout".
        chunk_size: Size of the chunks read from the response.

    Returns:
        The generated file name, or ``None`` when ``obj`` has no URL or
        requests raises a RequestException while opening the download.
    """
    url = obj.get('url', None)
    if url is None:
        return None

    logger.info('downloading video: ' + url)

    try:
        # A timeout of 0 is rejected with ValueError by the HTTP stack, so
        # the default would fail every call; map it to None (no timeout).
        response = requests.get(url, stream=True, timeout=timeout or None)
    except RequestErrors.RequestException as e:
        logger.error(e)
        return None

    filename = gen_filename(config.DROP_FOLDER_LOCATION, extension='movie')
    with open(filename, 'wb') as out_file:
        for chunk in response.iter_content(chunk_size=chunk_size):
            out_file.write(chunk)
    return filename
def get_page_index(offset,keyword):
    """Fetch one page of Toutiao search results as raw text.

    Returns the response text on HTTP 200, otherwise ``None`` (also when
    requests raises a RequestException, after printing a message).
    """
    query = {
        'offset':offset,
        'format':'json',
        'keyword':keyword,
        'autoload':'true',
        'count':'20',
        'cur_tab':3
    }
    url = 'http://www.toutiao.com/search_content?'+urlencode(query)
    try:
        response = requests.get(url)
        return response.text if response.status_code==200 else None
    except RequestException:
        print ("??????")
        return None
def fetch_prs(self):
    """Fetch the pull requests from ``GITHUB_PULLS_ENDPOINT``.

    Returns:
        A tuple ``(pr_list, error)``. On success ``pr_list`` is a list of
        GithubPR objects and ``error`` is None; on failure ``pr_list`` is
        empty and ``error`` is a message describing what went wrong.
    """
    pr_list, error = [], None
    try:
        pr_list = [GithubPR(self, raw_pr) for raw_pr in self.get(GITHUB_PULLS_ENDPOINT)]
    except HTTPError as exc:
        error = 'Non-200 HTTP Code Received: {}'.format(exc)
    except Timeout as exc:
        error = 'Request timed out: {}'.format(exc)
    except RequestException as exc:
        error = 'Catastrophic error in requests: {}'.format(exc)
    return pr_list, error
def get(self, endpoint):
    """Make a GET request against the given endpoint.

    Args:
        endpoint: URL to which to make the GET request, joined onto
            ``self.repo_url``.

    Returns:
        The decoded JSON body when the status code is 200. For any other
        status that ``raise_for_status`` does not treat as an error, the
        method falls through and returns None.

    Raises:
        HTTPError: if we get an HTTP error status.
        Timeout: for timeouts.
        RequestException: for other assorted exceptional cases.
    """
    url = urlparse.urljoin(self.repo_url, endpoint)
    resp = requests.get(url, auth=(BOT_NAME, self.bot_key))
    # Compare by value: ``is`` tests object identity and only appears to
    # work for small ints because of an interpreter implementation detail.
    if resp.status_code == 200:
        return resp.json()
    resp.raise_for_status()
def task_user_callback_cb(task_id, parent_task_id, cb, **kwargs):
    """
    Call the remote URL of a user-defined callback and optionally log the outcome.

    The payload comes from ``get_task_status(parent_task_id)``. The logged
    status is FAILURE when the request raises a RequestException and SUCCESS
    otherwise; a log entry is written only when ``cb`` has a truthy
    ``cb_log`` entry.
    """
    try:
        obj = get_vms_object(kwargs)
    except ObjectDoesNotExist:
        obj = None

    user = User.objects.get(id=user_id_from_task_id(parent_task_id))
    # The status returned here is replaced below by the callback outcome.
    payload, status = get_task_status(parent_task_id)

    try:
        response = UserCallback(parent_task_id).request(cb, user.callback_key, payload)
    except RequestException as ex:
        status, details = states.FAILURE, ex
    else:
        status = states.SUCCESS
        code_text = str(response.status_code)
        details = code_text + ': ' + response.reason

    if cb.get('cb_log'):
        task_log(parent_task_id, LOG_REMOTE_CALLBACK, obj=obj, task_status=status, detail=details)
def execute(query, connection=None, data=None, external=None, stream=False):
    """POST a query and return the response body.

    Returns:
        ``response.raw`` when ``stream`` is true, otherwise
        ``response.content``.

    Raises:
        ClickhouseException: when the status check fails and the response
            has a body (the body becomes the exception argument).
        RequestException: when the status check fails and the body is empty.
    """
    host, params, files = prepare(query, connection, external=external)
    response = requests.post(host, params=params, data=data, stream=stream, files=files)

    try:
        response.raise_for_status()
    except RequestException as e:
        if not response.content:
            raise e
        raise ClickhouseException(response.content)

    return response.raw if stream else response.content
def get_page_index(offset, keyword):
    """Fetch one page of Toutiao search results as raw text.

    Prints the process id and URL before the request. Returns the response
    text on HTTP 200, otherwise ``None`` (also when requests raises a
    RequestException, after printing a message with the URL).
    """
    query = {
        'offset':offset,
        'format':'json',
        'keyword':keyword,
        'autoload':'true',
        'count':20,
        'cur_tab':3
    }
    url = 'http://www.toutiao.com/search_content/?' + urlencode(query)
    print('([%d] ??????? %s' % (os.getpid(), url))
    try:
        response = requests.get(url)
        if response.status_code != 200:
            return None
        return response.text
    except RequestException:
        print('???????', url)
        return None
def get_site_description(url):
    """Return a short description extracted from the page at ``url``.

    The ``content`` of the ``<meta name="description">`` tag in the document
    head is preferred; otherwise the string of the first ``<p>`` element is
    used. An empty string is returned when the page cannot be fetched, is
    empty, or contains neither element.
    """
    site_content = description = ''
    try:
        site_content = requests.get(url, timeout=1).content
    except RequestException:
        # ``warn`` is a deprecated alias of ``warning``.
        logger.warning('Connection error trying to connect to {}'.format(url))

    if site_content:
        target_markup = BeautifulSoup(site_content, 'html.parser')
        # Documents without a <head> give ``head is None``; skip the meta
        # lookup then instead of raising AttributeError.
        head = target_markup.head
        meta_description = None
        if head is not None:
            meta_description = head.find('meta', {'name': 'description'})
        if meta_description:
            description = meta_description.get('content')
        else:
            first_paragraph = target_markup.find('p')
            if first_paragraph:
                description = first_paragraph.string
    return description
def alert(self, matches):
    """Post a trigger event for ``matches`` to PagerDuty.

    Raises:
        EAException: when the POST fails or returns an error status.
    """
    body = self.create_alert_body(matches)

    # post to pagerduty
    payload = {
        'service_key': self.pagerduty_service_key,
        'description': self.rule['name'],
        'event_type': 'trigger',
        'incident_key': self.pagerduty_incident_key,
        'client': self.pagerduty_client_name,
        'details': {
            "information": body.encode('UTF-8'),
        },
    }
    headers = {'content-type': 'application/json'}

    # set https proxy, if it was provided
    proxies = None
    if self.pagerduty_proxy:
        proxies = {'https': self.pagerduty_proxy}

    try:
        encoded = json.dumps(payload, cls=DateTimeEncoder, ensure_ascii=False)
        response = requests.post(self.url, data=encoded, headers=headers, proxies=proxies)
        response.raise_for_status()
    except RequestException as e:
        raise EAException("Error posting to pagerduty: %s" % e)
    elastalert_logger.info("Trigger sent to PagerDuty")
def alert(self, matches):
    """Post an alert message for ``matches`` to VictorOps.

    Raises:
        EAException: when the POST fails or returns an error status.
    """
    body = self.create_alert_body(matches)

    # post to victorops
    headers = {'content-type': 'application/json'}

    # set https proxy, if it was provided
    proxies = None
    if self.victorops_proxy:
        proxies = {'https': self.victorops_proxy}

    payload = {
        "message_type": self.victorops_message_type,
        "entity_display_name": self.victorops_entity_display_name,
        "monitoring_tool": "ElastAlert",
        "state_message": body
    }

    try:
        encoded = json.dumps(payload, cls=DateTimeEncoder)
        response = requests.post(self.url, data=encoded, headers=headers, proxies=proxies)
        response.raise_for_status()
    except RequestException as e:
        raise EAException("Error posting to VictorOps: %s" % e)
    elastalert_logger.info("Trigger sent to VictorOps")
def get_aggregate_swagger(self):
    """Load the swagger spec of every configured API not loaded yet.

    URLs that fail to load are recorded in ``self.errors``; a URL that loads
    successfully is removed from it.

    Returns:
        ``self.swagger_apis``, a dict mapping API name to its spec and URL.
    """
    # Nothing to do when the config file declares no apis.
    if 'apis' not in self.yaml_file:
        return self.swagger_apis

    for api_name, api_url in self.yaml_file['apis'].items():
        if api_name in self.swagger_apis:
            continue
        # Get the swagger.json
        try:
            spec = self.get_swagger_from_url(api_url)
            self.swagger_apis[api_name] = {'spec': spec,
                                           'url': self.parse_value(api_url)}
            self.errors.remove(api_url)
        except (JSONDecodeError, RequestException) as exc:
            if api_url not in self.errors:
                self.errors.append(api_url)
            logger.warning(u'Cannot get swagger from {0}: {1}'.format(api_url, repr(exc)))
        except ValueError:
            # Raised by ``errors.remove`` when the URL was never recorded.
            logger.info(u'Cannot remove {0} from errors'.format(api_url))
    return self.swagger_apis
def get_ticker(self):
    """Refresh the cached ticker response when it is missing or stale.

    A refresh happens on the first run, when the update interval has
    elapsed, or when nothing is cached. On a RequestException the previous
    cache is kept; ``internet_available`` becomes False only when there is
    no cached response at all.
    """
    # Limits for the API
    # Please limit requests to no more than 10 per minute.
    # Endpoints update every 5 minutes.
    refresh_due = (
        self._last_update is None
        or self._time_since_last_update() >= self.update_interval
        or self._cached_response is None
    )
    if not refresh_due:
        return

    try:
        self._cached_response = requests.get(self._url + self._currency).json()
        self._last_update = datetime.datetime.utcnow()
        self.internet_available = True
    except (RequestException, ):
        if self._cached_response is None:
            self.internet_available = False
def test_dispatch_event__handle_request_exception(self):
    """ Test that dispatch event handles exceptions and logs error. """

    url = 'https://www.optimizely.com'
    params = {
        'accountId': '111001',
        'eventName': 'test_event',
        'eventEntityId': '111028',
        'visitorId': 'oeutest_user'
    }
    event = event_builder.Event(url, params, http_verb='POST', headers={'Content-Type': 'application/json'})
    failure = request_exception.RequestException('Failed Request')

    with mock.patch('requests.post', side_effect=failure) as mock_request_post,\
            mock.patch('logging.error') as mock_log_error:
        event_dispatcher.EventDispatcher.dispatch_event(event)

    mock_request_post.assert_called_once_with(
        url,
        data=json.dumps(params),
        headers={'Content-Type': 'application/json'},
        timeout=event_dispatcher.REQUEST_TIMEOUT)
    mock_log_error.assert_called_once_with('Dispatch event failed. Error: Failed Request')
def call_oncall(self, url):
    """GET ``self.endpoint + url`` and return the decoded JSON body.

    Returns ``None`` (after logging) when the request raises a
    RequestException, the status code is not 200, or the body is not valid
    JSON.
    """
    full_url = str(self.endpoint + url)

    try:
        reply = self.requests.get(full_url)
    except RequestException:
        logger.exception('Failed hitting oncall-api for url "%s"', full_url)
        return None

    if reply.status_code != 200:
        logger.error('Invalid response from oncall-api for URL "%s". Code: %s. Content: "%s"',
                     full_url, reply.status_code, reply.content)
        return None

    try:
        decoded = reply.json()
    except ValueError:
        logger.exception('Failed decoding json from oncall-api. URL: "%s" Code: %s', full_url, reply.status_code)
        return None
    return decoded
def send_metrics(self, metrics):
    """Write one point per metric to the InfluxDB client.

    Does nothing when ``self.enable_metrics`` is falsy. Each point is named
    ``<appname>_<metric>``, carries ``self.extra_tags`` (when set) as tags,
    and shares a single timestamp taken at the start of the call. Client and
    request errors are logged and swallowed.

    Args:
        metrics: Mapping of metric name to value.
    """
    if not self.enable_metrics:
        return

    now = str(datetime.now())
    payload = []
    # ``items()`` works on both Python 2 and 3; ``iteritems()`` is Python 2 only.
    for metric, value in metrics.items():
        data = {
            'measurement': self.appname + '_' + metric,
            'tags': {},
            'time': now,
            'fields': {
                'value': value
            }
        }
        if self.extra_tags:
            data['tags'].update(self.extra_tags)
        payload.append(data)

    try:
        self.client.write_points(payload)
    except (RequestException, InfluxDBClientError, InfluxDBServerError):
        logger.exception('Failed to send metrics to influxdb')
def get_instrument_precision(self, INSTRUMENT):
    """Return ``10 ** pipLocation`` for the given instrument.

    The instrument details are requested from the account's instruments
    endpoint. Returns ``None`` (after printing the error) when requests
    raises a RequestException.
    """
    query_url = self.DEFAULT_URL + '/v3/accounts/{}/instruments'.format(ID)
    query = {
        'instruments': INSTRUMENT
    }

    try:
        payload = requests.get(query_url, headers=DEFAULT_HEADERS, params=query).json()
    except RequestException as e:
        print ("Error while retrieving instrument information: %s"%e)
        return None

    first_instrument = payload['instruments'][0]
    return 10 ** first_instrument['pipLocation']
def link_ice_entry_to_study(self, user_token, strain, study):
    """
    Task that registers a link between an ICE entry and an EDD study.

    Nothing is done when the strain is no longer linked to the study.

    :param user_token: the token used to identify a user to ICE
    :param strain: the primary key of the EDD main.models.Strain in the link
    :param study: the primary key of the EDD main.models.Study in the link
    :throws Exception: for any errors other than communication errors to ICE instance
    """
    # check that strain and study are still linked
    query = models.Strain.objects.filter(pk=strain, line__study__pk=study)
    if not query.exists():
        return

    try:
        ice = create_ice_connection(user_token)
        annotated = query.annotate(
            study_slug=F('line__study__slug'),
            study_name=F('line__study__name'),
        )
        record = annotated.distinct().get()
        url = build_study_url(record.study_slug)
        ice.link_entry_to_study(record.registry_id, study, url, record.study_name)
    except RequestException as e:
        # Retry when there are errors communicating with ICE
        raise self.retry(exc=e, countdown=delay_calculation(self), max_retries=10)
    except Exception as e:
        raise e
def unlink_ice_entry_from_study(self, user_token, strain, study):
    """
    Task that de-registers a link between an ICE entry and an EDD study.

    Nothing is done while the strain is still linked to the study.

    :param user_token: the token used to identify a user to ICE
    :param strain: the primary key of the EDD main.models.Strain in the former link
    :param study: the primary key of the EDD main.models.Study in the former link
    :throws Exception: for any errors other than communication errors to ICE instance
    """
    still_linked = models.Strain.objects.filter(pk=strain, line__study__pk=study)
    if still_linked.exists():
        return

    try:
        ice = create_ice_connection(user_token)
        record = models.Strain.objects.get(pk=strain)
        study_obj = models.Study.objects.get(pk=study)
        url = build_study_url(study_obj.slug)
        ice.unlink_entry_from_study(record.registry_id, study, url)
    except RequestException as e:
        # Retry when there are errors communicating with ICE
        raise self.retry(exc=e, countdown=delay_calculation(self), max_retries=10)
    except Exception as e:
        raise e
def get_page_index(offset,keyword ):
    """Fetch one page of Toutiao search results as raw text.

    Args:
        offset: Value sent as the ``offset`` query parameter.
        keyword: Search keyword.

    Returns:
        The response text on HTTP 200; ``None`` for any other status code or
        when requests raises a RequestException (a message is printed).
    """
    data={
        "offset":offset,
        "format":"json",
        "keyword":keyword,
        "autoload":"true",
        "count":20,
        "cur_tab":3
    }
    url="http://www.toutiao.com/search_content/?"+urlencode(data)
    try:
        # The request itself must be inside the ``try``: it is the call that
        # raises RequestException, so outside it the handler was dead code.
        response=requests.get(url)
        if response.status_code==200:
            return response.text
        return None
    except RequestException:
        print("????")
        return None
def _load_image_from_url_or_local_path(self, file_path):
    """Open an image given as an http(s)/ftp/data URL or a local path.

    Returns the result of ``open_image`` or ``None`` (after logging a
    warning) when the source cannot be fetched or opened.
    """
    scheme = urlparse(file_path).scheme

    if scheme in ('http', 'https'):
        try:
            source = self._session.get(file_path, stream=True).raw
        except RequestException:
            log.warning("Image skipped", exc_info=True)
            return None
    elif scheme in ("ftp", "data"):
        try:
            source = urlopen(file_path)
        except (URLError, ) + ftplib.all_errors:
            log.warning("Image skipped", exc_info=True)
            return None
    else:
        # Anything else is handed to open_image unchanged.
        source = file_path

    try:
        return open_image(source)
    except (IOError, ValueError):
        log.warning("Image skipped", exc_info=True)
        return None
def search(self):
    """Search mapstory.org for the search-box text and render the results.

    Nothing happens for an empty search text. Results are shown as an HTML
    list in ``self.browser``; a RequestException is reported in a warning
    message box.
    """
    text = self.searchBox.text().strip()
    if not text:
        return

    try:
        query = {"type__in":"map", "limit":50, "q": text}
        r = execute(lambda: requests.get("http://mapstory.org/api/base/search", params=query))
        r.raise_for_status()
        mapstories = r.json()["objects"]
        if mapstories:
            items = []
            for mapstory in mapstories:
                link = "<a href='%s' class='button'>Open</a>" % (mapstory["id"])
                items.append("<li>%s <h3>%s</h3> %s <br> </li>" % (link, mapstory["title"], mapstory["abstract"]))
            s = "<div><ul>" + "".join(items) + "</ul></div>"
        else:
            s = "<h2>No maps matching your search criteria were found.</h2>"
        self.browser.setHtml(s)
    except RequestException as e:
        QMessageBox.warning(self, "Search",
            u"There has been a problem performing the search:\n" + str(e.args[0]),
            QMessageBox.Ok)
def fire(self):
    """Post a trigger event to PagerDuty and record the fire message.

    A RequestException is logged as an error rather than raised;
    ``self.firemsg`` is set in either case.
    """
    payload = {
        'service_key': self.pagerduty_service_key,
        'description': self.title,
        'event_type': 'trigger',
        'incident_key': tattle.make_md5(self.title),
        'client': self.pagerduty_client_name,
        'client_url': self.client_url,
        'details': {
            'alert-info': self.alert,
            'matches': self.matches
        },
    }
    headers = {'content-type': 'application/json'}

    try:
        encoded = json.dumps(payload, ensure_ascii=False)
        response = requests.post(self.url, data=encoded, headers=headers)
        response.raise_for_status()
    except RequestException as e:
        # Deliberately best effort: log instead of propagating.
        logger.error('Error posting message to pagerduty: %s' % e)

    self.firemsg = """msg="{}", tale_name="{}", title="{}" """.format("PagerDuty Alert Sent", self.alert.get('name'), self.title)
def login(method):
    """Decorator that makes sure the crawler is logged in before ``method`` runs.

    When the cookie file exists and the first ``YYYY-MM-DD`` date found in
    it is later than today, the saved cookies are loaded; otherwise
    ``crawler.login()`` is called. On a RequestException a hint is echoed
    and the process exits with status 1.
    """
    from functools import wraps

    @wraps(method)  # keep the wrapped method's name and docstring
    def wrapper(*args, **kwargs):
        crawler = args[0].crawler  # args[0] is a NetEase object

        try:
            if os.path.isfile(cookie_path):
                with open(cookie_path, 'r') as cookie_file:
                    cookie = cookie_file.read()
                expire_time = re.compile(r'\d{4}-\d{2}-\d{2}').findall(cookie)
                now = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                # A cookie file without any date is treated as expired
                # instead of failing with IndexError.
                if expire_time and expire_time[0] > now:
                    crawler.session.cookies.load()
                else:
                    crawler.login()
            else:
                crawler.login()
        except RequestException:
            click.echo('Maybe password error, please try again.')
            sys.exit(1)
        return method(*args, **kwargs)
    return wrapper
def download_song_by_id(self, song_id, song_name, folder='.'):
    """Download a song by id and save it to disk.

    Slashes and dots are stripped from the song name before saving. A
    RequestException is echoed instead of raised.

    :params song_id: song id.
    :params song_name: song name.
    :params folder: storage path.
    """
    try:
        url = self.crawler.get_song_url(song_id)
        # use old api for the lyric lookup
        lyric_info = self.crawler.get_song_lyric(song_id) if self.lyric else None
        song_name = song_name.replace('/', '').replace('.', '')
        self.crawler.get_song_by_url(url, song_name, folder, lyric_info)
    except RequestException as exception:
        click.echo(exception)
def download_artist_by_id(self, artist_id, artist_name):
    """Download an artist's hot songs into a folder named after the artist.

    A RequestException while listing the songs is echoed and ends the call.

    :params artist_id: artist id.
    :params artist_name: artist name.
    """
    try:
        # use old api
        songs = self.crawler.get_artists_hot_songs(artist_id)
    except RequestException as exception:
        click.echo(exception)
        return

    target_folder = os.path.join(self.folder, artist_name)
    for song in songs:
        self.download_song_by_id(song.song_id, song.song_name, target_folder)
def download_playlist_by_id(self, playlist_id, playlist_name):
    """Download a playlist's songs into a folder named after the playlist.

    A RequestException while listing the songs is echoed and ends the call.

    :params playlist_id: playlist id.
    :params playlist_name: playlist name.
    """
    try:
        playlist_songs = self.crawler.get_playlist_songs(playlist_id)
    except RequestException as exception:
        click.echo(exception)
        return

    destination = os.path.join(self.folder, playlist_name)
    for entry in playlist_songs:
        self.download_song_by_id(entry.song_id, entry.song_name, destination)
def exception_handle(method):
    """Decorator that logs requests-library errors and re-raises them.

    ProxyError, ConnectionException, Timeout and any other RequestException
    raised by ``method`` are logged with a traceback and replaced by a new
    exception of the same type carrying a fixed message. The result of
    ``method`` is returned unchanged when no such error occurs.
    """
    from functools import wraps

    @wraps(method)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        try:
            return method(*args, **kwargs)
        # Handlers are ordered from most to least specific.
        except ProxyError:
            LOG.exception('ProxyError when try to get %s.', args)
            raise ProxyError('A proxy error occurred.')
        except ConnectionException:
            LOG.exception('ConnectionError when try to get %s.', args)
            raise ConnectionException('DNS failure, refused connection, etc.')
        except Timeout:
            LOG.exception('Timeout when try to get %s', args)
            raise Timeout('The request timed out.')
        except RequestException:
            LOG.exception('RequestException when try to get %s.', args)
            raise RequestException('Please check out your network.')

    return wrapper
def get_govuk_payment(self, govuk_id):
    """Fetch a GOV.UK payment and return its decoded JSON data.

    Returns:
        ``None`` when the payment is not found (HTTP 404); otherwise the
        decoded data, with ``email`` set to ``None`` when it fails
        ``validate_email``.

    Raises:
        RequestException: for any other non-200 status, or when decoding
            the response raises ValueError or KeyError.
    """
    response = requests.get(
        govuk_url('/payments/%s' % govuk_id),
        headers=govuk_headers(),
        timeout=15
    )

    status = response.status_code
    if status == 404:
        return None
    if status != 200:
        raise RequestException(
            'Unexpected status code: %s' % status,
            response=response
        )

    try:
        data = response.json()
        try:
            validate_email(data.get('email'))
        except ValidationError:
            data['email'] = None
        return data
    except (ValueError, KeyError):
        raise RequestException('Cannot parse response', response=response)