我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.exceptions.ReadTimeout()。
def get_cids(self, cas): """ Use the PubChem API to get the CID :param cas: string - CAS identifier :return: list of CIDs """ uri = "http://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/%s/cids/json" \ "?email=%s" try: response = get((uri % (cas, app.config['ADMIN_EMAIL']))).json() try: cids = response['IdentifierList']['CID'] return cids except KeyError: return None except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout, exceptions.ConnectTimeout, exceptions.ReadTimeout) as e: # Error. return the error and the CAS number that this error occured on sys.stderr.write("Error: %s. Occurred on CAS: %s", (e, cas)) sys.stderr.flush() sys.stdout.flush()
def execute(self): try: self.system.run() except (ReadTimeout, ConnectionError, JSONDecodeError): pass except exceptions.TradingSystemException as e: curr = datetime.now() print('{time} - {text}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e))) except Exception as e: curr = datetime.now() print('{time} - {text} - {args}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e), args=e.args)) traceback.print_exc() if self.interval: threading.Timer(self.interval, self.execute).start()
def test_make_request_timeout(self): """ Test request timeout exception raised if hit on multiple nodes. """ self.clnt.session = Mock() self.clnt.session.return_value = True self.clnt.session.get.side_effect = ReadTimeout('Timeout') self.clnt._create_session = Mock() self.clnt.NUM_RETRY_REQUESTS = 3 self.clnt.connect_timeout = 2 self.clnt.node_cnt = 3 self.clnt.url_prefix = 'https://1.1.1.1:7777/web' self.clnt._is_good_response = Mock(return_value='Good') self.assertIsNone(self.clnt.last_used_node) with self.assertRaises(ReadTimeout): self.clnt._make_request('GET', 'url', 2, {'data': 'data'}) self.assertEqual(self.clnt.last_used_node, '1.1.1.1')
def delete_user_from_group(self,uname,gid): """ ??????????????????? """ uid = "" for user in self.group_members[gid]: if user['NickName'] == uname: uid = user['UserName'] if uid == "": return False url = self.base_uri + '/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % self.pass_ticket params ={ "DelMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def send_msg_by_uid(self, word, dst='filehelper'): url = self.base_uri + '/webwxsendmsg?pass_ticket=%s' % self.pass_ticket msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '') word = self.to_unicode(word) params = { 'BaseRequest': self.base_request, 'Msg': { "Type": 1, "Content": word, "FromUserName": self.my_account['UserName'], "ToUserName": dst, "LocalID": msg_id, "ClientMsgId": msg_id } } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def do_work(job): tstart = time.perf_counter() status_code = None timeout = False connection_error = False expect_not_present = False try: result = requests.get(job.get('url'), timeout=job.get('timeout')) status_code = result.status_code if job.get('expect'): if job.get('expect') not in result.text: expect_not_present = True except ReadTimeout: timeout = True except socket_error: connection_error = True return {'status_code': status_code, 'ms': (time.perf_counter()-tstart)*1000, 'timeout': timeout, 'connection_error': connection_error, 'tstart': tstart, 'tstop': time.perf_counter(), 'expect_not_present': expect_not_present}
def delete_user_from_group(self, uname, gid): """ ??????????????????? """ uid = "" for user in self.group_members[gid]: if user['NickName'] == uname: uid = user['UserName'] if uid == "": return False url = self.base_uri + '/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % self.pass_ticket params = { "DelMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def makeRequest(url, logger, redirect=True, proxies={}): packages.urllib3.disable_warnings() try: response = get(url, allow_redirects=redirect, verify=False, proxies=proxies, timeout=25) newurl = "{}/{}".format(url, meta_redirect(response.text)) while newurl: response = get(newurl, allow_redirects=redirect, verify=False, proxies=proxies, timeout=25) newurl = meta_redirect(response.text) except exceptions.ConnectionError, e: logger.debug("[-] {} - {}".format(url, str(e))) return except exceptions.ReadTimeout, e: logger.debug("[-] {} - {}".format(url, str(e))) return else: return parseResponse(response)
def single_thread_solve(): """ ???????????????, ????????, ?????????? """ with open("solve_result.txt", "w") as f: check_codes = dict() for i in range(355, 1000 + 1): # ??? 1 ? 1000 ?????????? check_code = get_verify_code(i, check_codes, f) # ???? url = ("http://www.qlcoder.com/train/handsomerank?_token=d4texP05ci7veIAztvnwe5yETOFhlLWkSaBYC51B" "&user=w%40tch&checkcode={}".format(check_code)) while True: try: response = requests.get(url, timeout=10) if "?????" not in response.text: print("[+] ????? {} ?".format(i), file=f) except (ConnectTimeout, ReadTimeout, ValueError, ConnectionError, TooManyRedirects): print("[-] ??? {} ???".format(i), file=f) else: break
def set_group_name(self,gid,gname): """ ?????? """ url = self.base_uri + '/webwxupdatechatroom?fun=modtopic&pass_ticket=%s' % self.pass_ticket params ={ "NewTopic": gname, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def common_compile(container, package_config): docker_cli.start(container) timeout = int(package_config['limits']['timeout'] / 1000) try: code = docker_cli.wait(container, timeout=timeout) except ReadTimeout: status = 'timeout' else: if code == 0: status = 'ok' else: status = 'error' stdout, stderr = destroy_container(container) return CompileStatus(status, stderr + stdout)
def pubchem_counter(self, cid, collection): """ Use the SDQAgent that PubChem uses on their compound pages to get counts for a collection. cid: integer. The pubchem compound identifier Collection. String. One of the pubchem collections. E.g. "bioactivity" or "biocollection" Returns: Integer count """ uri = 'https://pubchem.ncbi.nlm.nih.gov/sdq/sdqagent.cgi?' \ 'infmt=json&outfmt=json' \ '&query={"select":["*"],"collection":"%s",' \ '"where":{"ors":{"cid":"%s"}},"start":1,"limit":1}' % (collection, cid) try: response = get(uri).json() try: count = response['SDQOutputSet'][0]['totalCount'] sys.stdout.write(str(count) + "\n") sys.stdout.flush() return count except KeyError: return None except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout, exceptions.ConnectTimeout, exceptions.ReadTimeout) as e: # Error. return the error and the CID number that this error occured on # Save what have so far sys.stderr.write("Error: %s. Occurred on CID: %s", (e, cid)) sys.stderr.flush() sys.stdout.flush() quit() except exceptions.ChunkedEncodingError as e: sys.stderr.write("Error: %s. Occurred on CID: %s", (e, cid)) sys.stderr.flush() quit()
def test_retry_api_call(self, schematizer, biz_schema): with mock.patch.object( schematizer, '_get_api_result', side_effect=[ConnectionError, ReadTimeout, None] ) as api_spy: schematizer._call_api( api=schematizer._client.schemas.get_schema_by_id, params={'schema_id': biz_schema.schema_id} ) assert api_spy.call_count == 3
def run(self): try: while not self.rt.main_thread.quit_event.is_set(): try: log.info('Waiting for wake word...') try: self.recognizer.wait_for_wake_word() except SkipActivationSignal: pass self.rt.formats.faceplate.command('mouth.listen') if isfile(self.start_listening_file): play_audio(self.start_listening_file) log.info('Recording...') recording = self.recognizer.record_phrase() log.info('Done recording.') self.rt.formats.faceplate.command('mouth.reset') if isfile(self.stop_listening_file): play_audio(self.stop_listening_file) try: utterance = self.stt.transcribe(recording) except (HTTPError, ValueError, ReadTimeout): log.exception('Speech Client') utterance = '' log.info('Utterance: ' + utterance) self.send_query(utterance) except NewQuerySignal: pass except SystemExit: pass
def apply_useradd_requests(self,RecommendInfo): url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN' params = { "BaseRequest": self.base_request, "Opcode": 3, "VerifyUserListSize": 1, "VerifyUserList": [ { "Value": RecommendInfo['UserName'], "VerifyUserTicket": RecommendInfo['Ticket'] } ], "VerifyContent": "", "SceneListCount": 1, "SceneList": [ 33 ], "skey": self.skey } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def add_groupuser_to_friend_by_uid(self,uid,VerifyContent): """ ??????????????????? uid-?????uid VerifyContent-?????? ??????????????????????????????????????? """ if self.is_contact(uid): return True url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN' params ={ "BaseRequest": self.base_request, "Opcode": 2, "VerifyUserListSize": 1, "VerifyUserList": [ { "Value": uid, "VerifyUserTicket": "" } ], "VerifyContent": VerifyContent, "SceneListCount": 1, "SceneList": [ 33 ], "skey": self.skey } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def add_friend_to_group(self,uid,group_name): """ ????????? """ gid = '' #???????id,???????????????? for group in self.group_list: if group['NickName'] == group_name: gid = group['UserName'] if gid == '': return False #?????????????? group_num=len(self.group_members[gid]) print '[DEBUG] group_name:%s group_num:%s' % (group_name,group_num) #???id??uid????? for user in self.group_members[gid]: if user['UserName'] == uid: #???????,???? return True if group_num<=100: url = self.base_uri + '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % self.pass_ticket params ={ "AddMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } else: url = self.base_uri + '/webwxupdatechatroom?fun=invitemember' params ={ "InviteMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def invite_friend_to_group(self,uid,group_name): """ ???????????????????????? ???????????add_friend_to_group????????(Ret=1)??????????? """ gid = '' # ???????id,???????????????? for group in self.group_list: if group['NickName'] == group_name: gid = group['UserName'] if gid == '': return False # ???id??uid????? for user in self.group_members[gid]: if user['UserName'] == uid: # ???????,???? return True url = self.base_uri + '/webwxupdatechatroom?fun=invitemember&pass_ticket=%s' % self.pass_ticket params = { "InviteMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def add_friend_to_group(self,uid,group_name): """ ????????? """ gid = '' #???????id,???????????????? for group in self.group_list: if group['NickName'] == group_name: gid = group['UserName'] if gid == '': return False #???id??uid????? for user in self.group_members[gid]: if user['UserName'] == uid: #???????,???? return True url = self.base_uri + '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % self.pass_ticket params ={ "AddMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def _soap_call(self, service, params): try: params = utils.add_extra_info(service, params) if Configuration.sanitize: params = utils._check_sanitize(params=params, is_root=True) if not params.get('psp_ClientSession'): params = utils.add_secure_hash(params, Configuration.secret_key) response = getattr(self._client.service, service)(params) return response except ReadTimeout: raise ApiException except ConnectTimeout: raise ApiException
def azure_destroy_classic(creds, name): logger = logging.getLogger(__name__) self = azure.AzureClassic() driver = self._get_user_driver(**creds) logger.info('Waiting for server to be destroyed...') while self._find_server(driver, name) is not None: sleep(0.5) logger.info('Removing cloud service...') driver.ex_destroy_cloud_service(name) if len([cloud for cloud in driver.ex_list_cloud_services() if cloud.service_name.startswith(name.rsplit('-', 1)[0])]) < 1: logger.info('Removing storage service...') while True: try: driver.ex_destroy_storage_service(name.rsplit('-', 1)[0].replace('-', '')) except (libcloud.common.types.LibcloudError, ReadTimeout) as e: if 'has some active image(s)' not in repr(e): logger.info(repr(e)) except AttributeError: # Generally caused by "Too Many Requests" response not being parsed sleep(2) else: break finally: sleep(0.5)
def apply_useradd_requests(self, RecommendInfo): url = self.base_uri + '/webwxverifyuser?r=' + str(int(time.time())) + '&lang=zh_CN' params = { "BaseRequest": self.base_request, "Opcode": 3, "VerifyUserListSize": 1, "VerifyUserList": [ { "Value": RecommendInfo['UserName'], "VerifyUserTicket": RecommendInfo['Ticket']} ], "VerifyContent": "", "SceneListCount": 1, "SceneList": [ 33 ], "skey": self.skey } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def add_groupuser_to_friend_by_uid(self, uid, VerifyContent): """ ??????????????????? uid-?????uid VerifyContent-?????? ??????????????????????????????????????? """ if self.is_contact(uid): return True url = self.base_uri + '/webwxverifyuser?r=' + str(int(time.time())) + '&lang=zh_CN' params = { "BaseRequest": self.base_request, "Opcode": 2, "VerifyUserListSize": 1, "VerifyUserList": [ { "Value": uid, "VerifyUserTicket": "" } ], "VerifyContent": VerifyContent, "SceneListCount": 1, "SceneList": [ 33 ], "skey": self.skey } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def add_friend_to_group(self, uid, group_name): """ ????????? """ gid = '' # ???????id,???????????????? for group in self.group_list: if group['NickName'] == group_name: gid = group['UserName'] if gid == '': return False # ???id??uid????? for user in self.group_members[gid]: if user['UserName'] == uid: # ???????,???? return True url = self.base_uri + '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % self.pass_ticket params = { "AddMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0
def invite_friend_to_group(self, uid, group_name): """ ???????????????????????? ???????????add_friend_to_group????????(Ret=1)??????????? """ gid = '' # ???????id,???????????????? for group in self.group_list: if group['NickName'] == group_name: gid = group['UserName'] if gid == '': return False # ???id??uid????? for user in self.group_members[gid]: if user['UserName'] == uid: # ???????,???? return True url = self.base_uri + '/webwxupdatechatroom?fun=invitemember&pass_ticket=%s' % self.pass_ticket params = { "InviteMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0