Python requests 模块,Session() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.Session()

项目:crepriceSpider    作者:zhousenbiao    | 项目源码 | 文件源码
def get_cookie(account, password):
    s = requests.Session()
    payload = {
        'login_uid1': account,
        'login_pwd1': password,
        'agreeRule': "1",
        'loginsubmit': "??",
        # 'redirect_to': "http://www.creprice.cn",
        # 'testcookie': "1"
    }
    response = s.post(login_url, data=payload, allow_redirects=False)
    cookies = response.cookies.get_dict()
    logger.warning("get cookie success!!!(account is:%s)" % account)
    return json.dumps(cookies)

# ?Cookies??Redis???
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def upload_prediction(self, file_path):
        filename, signedRequest, headers, status_code = self.authorize(file_path)
        if status_code!=200:
            return status_code

        dataset_id, comp_id, status_code = self.get_current_competition()
        if status_code!=200:
            return status_code

        with open(file_path, 'rb') as fp:
            r = requests.Request('PUT', signedRequest, data=fp.read())
            prepped = r.prepare()
            s = requests.Session()
            resp = s.send(prepped)
            if resp.status_code!=200:
                return resp.status_code

        r = requests.post(self._submissions_url,
                    data={'competition_id':comp_id, 'dataset_id':dataset_id, 'filename':filename},
                    headers=headers)

        return r.status_code
项目:QQBot    作者:springhack    | 项目源码 | 文件源码
def prepareLogin(self):
        self.clientid = 53999199
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:27.0) Gecko/20100101 Firefox/27.0',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
        })    
        self.urlGet(
            'https://ui.ptlogin2.qq.com/cgi-bin/login?daid=164&target=self&style=16&mibao_css=m_webqq&' + \
            'appid=501004106&enable_qlogin=0&no_verifyimg=1&s_url=http%3A%2F%2Fw.qq.com%2Fproxy.html&' + \
            'f_url=loginerroralert&strong_login=1&login_state=10&t=20131024001'
        )
        self.session.cookies.update(dict(
            RK='OfeLBai4FB', ptcz='ad3bf14f9da2738e09e498bfeb93dd9da7540dea2b7a71acfb97ed4d3da4e277',
            pgv_pvi='911366144', pgv_info='ssid pgv_pvid=1051433466',
            qrsig='hJ9GvNx*oIvLjP5I5dQ19KPa3zwxNI62eALLO*g2JLbKPYsZIRsnbJIxNe74NzQQ'
        ))
        self.getAuthStatus()
        self.session.cookies.pop('qrsig')
项目:sopel-modules    作者:phixion    | 项目源码 | 文件源码
def hltb(bot,trigger):
    if not trigger.group(2):
        return bot.say("Enter a game name to search.")
    game = trigger.group(2)
    url = "http://howlongtobeat.com/search_main.php?page=1"
    payload = {"queryString":game,"t":"games","sorthead":"popular","sortd":"Normal Order","length_type":"main","detail":"0"}
    test = {'Content-type':'application/x-www-form-urlencoded', 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.97 Safari/537.36','origin':'https://howlongtobeat.com','referer':'https://howlongtobeat.com'}
    session = requests.Session()
    session.post(url, headers=test, data=payload)
    r = session.post(url, headers=test, data=payload)
    if len(r.content) < 250:
        return bot.say("No results.")
    bs = BeautifulSoup(r.content)
    first = bs.findAll("div", {"class":"search_list_details"})[0]
    name = first.a.text
    time = first.findAll('div')[3].text
    bot.say('{} - {}'.format(name, time))
项目:ccu_and_eccu_publish    作者:gaofubin    | 项目源码 | 文件源码
def AkamaiEdgeGridConfig_Setup(config_file, section):
    config_file = os.path.expanduser(config_file)   
    if debug: print "DEBUG: config_file", config_file

    #Currently unused.
    required_options = ['client_token','client_secret','host','access_token']
    EdgeGridConfig = {}

    if os.path.isfile(config_file):
        config = ConfigParser.ConfigParser()
        config.readfp(open(config_file))
        for key, value in config.items(section):
            # ConfigParser lowercases magically
            EdgeGridConfig[key] = value
    else:
        print "Missing configuration file.  Run python gen_creds.py to get your credentials file set up once you've provisioned credentials in LUNA."
        exit()

    EdgeGridConfig['host'] = '%s://%s' % ('https', EdgeGridConfig['host'])

    if debug: print EdgeGridConfig
    return EdgeGridConfig

#Setup a EdgeGrid Session using the EdgeGridConfig previously loaded.
项目:python-cachetclient    作者:dmsimard    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """
        Initialize the class, get the necessary parameters
        """
        self.user_agent = 'python-cachetclient'
        try:
            self.endpoint = kwargs['endpoint']
        except KeyError:
            raise KeyError('Cachet API endpoint is required')

        self.api_token = kwargs.get('api_token', None)
        self.timeout = kwargs.get('timeout', None)
        self.verify = kwargs.get('verify', None)
        self.pagination = kwargs.get('pagination', False)

        self.http = requests.Session()
项目:imageDownloader    作者:whcacademy    | 项目源码 | 文件源码
def setupSession():
    session = requests.Session()
    session.header = { 'User-Agent': "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:34.0) Gecko/20100101 Firefox/34.0","Accept-Encoding": "gzip, deflate, sdch"}
    return session
项目:pyTBA    作者:Thing342    | 项目源码 | 文件源码
def __init__(self, name: str = None, description: str = None, version: str = None):
        self.app_id = {'X-TBA-App-Id': ""}
        self.session = requests.Session()
        self.session = CacheControl(self.session, heuristic=LastModified())
        self.session.headers.update(self.app_id)
        if name is not None: self.set_api_key(name, description, version)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def __init__(self):
        Analyzer.__init__(self)
        self.service = self.get_param('config.service', None, 'EmergingThreats service is missing')
        self.apikey = self.get_param('config.key', None, 'EmergingThreats apikey is missing')
        self.session = requests.Session()
        self.session.headers.update({"Authorization": self.apikey})
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def __init__(self, key, client_id, client_version='0.1'):
        self.api_key = key
        self.session = requests.Session()
        self.url = 'https://safebrowsing.googleapis.com/v4/threatMatches:find?key={}'.format(key)
        self.client_id = client_id
        self.client_version = client_version
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(path, quiet=False):
    """
    Downloads all available hash files to a given path.

    :param path: Path to download directory
    :param quiet: If set to True, no progressbar is displayed
    """
    if os.path.isdir(path):
        session = requests.Session()
        session.headers = {'User-agent': 'Mozilla/5.0 Chrome/57.0.2987.110'}
        max_num = max(list(map(int, re.sub(r'[\<\>]',
                                           '',
                                           '\n'.join(re.findall(r'\>[1-9][0-9]{2}\<',
                                                                session.get('https://virusshare.com/hashes.4n6').text
                                                                )
                                                     )
                                           ).split('\n')
                               )
                           )
                      )
        if not quiet:
            p = progressbar.ProgressBar(max_value=max_num)
        for i in range(max_num):
            filename = str(i).zfill(3) + '.md5'
            if os.path.exists(os.path.join(path, filename)):
                continue
            if not quiet:
                p.update(i)
            url = URL + filename
            head = session.head(url)
            if head.status_code == 200:
                body = session.get(url, stream=True)
                with io.open(os.path.join(path, str(i).zfill(3) + '.md5'), mode='wb') as afile:
                    for chunk in body.iter_content(chunk_size=1024):
                        afile.write(b'' + chunk)
                body.close()
    else:
        print('Given path is not a directory.')
        sys.exit(1)
项目:Zoom2Youtube    作者:Welltory    | 项目源码 | 文件源码
def session(self, email, password):
        session = requests.Session()
        session.headers.update({
            'content-type': 'application/x-www-form-urlencoded'
        })
        response = session.post(
            ZoomClient.SIGNIN_URL, data={'email': email, 'password': password}
        )
        return session, response
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def check(auth_ref, args):
    # We call get_keystone_client here as there is some logic within to get a
    # new token if previous one is bad.
    keystone = get_keystone_client(auth_ref)
    auth_token = keystone.auth_token
    registry_endpoint = 'http://{ip}:9191'.format(ip=args.ip)

    s = requests.Session()

    s.headers.update(
        {'Content-type': 'application/json',
         'x-auth-token': auth_token})

    try:
        # /images returns a list of public, non-deleted images
        r = s.get('%s/images' % registry_endpoint, verify=False, timeout=10)
        is_up = r.ok
    except (exc.ConnectionError, exc.HTTPError, exc.Timeout):
        is_up = False
    except Exception as e:
        status_err(str(e))

    metric_values = dict()

    status_ok()
    metric_bool('glance_registry_local_status', is_up)
    # only want to send other metrics if api is up
    if is_up:
        milliseconds = r.elapsed.total_seconds() * 1000
        metric('glance_registry_local_response_time', 'double',
               '%.3f' % milliseconds, 'ms')
        metric_values['glance_registry_local_response_time'] = ('%.3f' % milliseconds)
        metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def check(args):
    metadata_endpoint = ('http://{ip}:8775'.format(ip=args.ip))
    is_up = True

    s = requests.Session()

    try:
        # looks like we can only get / (ec2 versions) without specifying
        # an instance ID and other headers
        versions = s.get('%s/' % metadata_endpoint,
                         verify=False,
                         timeout=10)
        milliseconds = versions.elapsed.total_seconds() * 1000
        if not versions.ok or '1.0' not in versions.content.splitlines():
            is_up = False
    except (exc.ConnectionError, exc.HTTPError, exc.Timeout) as e:
        is_up = False
    except Exception as e:
        status_err(str(e))

    metric_values = dict()

    status_ok()
    metric_bool('nova_api_metadata_local_status', is_up)
    # only want to send other metrics if api is up
    if is_up:
        metric('nova_api_metadata_local_response_time',
               'double',
               '%.3f' % milliseconds,
               'ms')

        metric_values['nova_api_metadata_local_response_time'] = ('%.3f' % milliseconds)
        metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
项目:service-fabric-cli    作者:Azure    | 项目源码 | 文件源码
def upload(path, imagestore_string='fabric:ImageStore', show_progress=False):  # pylint: disable=too-many-locals,missing-docstring
    from sfctl.config import (client_endpoint, no_verify_setting, ca_cert_info,
                              cert_info)
    import requests

    abspath = validate_app_path(path)
    basename = os.path.basename(abspath)

    endpoint = client_endpoint()
    cert = cert_info()
    ca_cert = True
    if no_verify_setting():
        ca_cert = False
    elif ca_cert_info():
        ca_cert = ca_cert_info()

    if all([no_verify_setting(), ca_cert_info()]):
        raise CLIError('Cannot specify both CA cert info and no verify')


    # Upload to either to a folder, or native image store only
    if 'file:' in imagestore_string:
        dest_path = path_from_imagestore_string(imagestore_string)
        upload_to_fileshare(abspath, os.path.join(dest_path, basename),
                            show_progress)
    elif imagestore_string == 'fabric:ImageStore':
        with requests.Session() as sesh:
            sesh.verify = ca_cert
            sesh.cert = cert
            upload_to_native_imagestore(sesh, endpoint, abspath, basename,
                                        show_progress)
    else:
        raise CLIError('Unsupported image store connection string')
项目:consul-pg    作者:adamcstephens    | 项目源码 | 文件源码
def __init__(self, service, role_source, configfile=DEFAULT_CONFIGFILE):
        self.service = service
        self.role_source = role_source

        self.api_endpoint = 'http://127.0.0.1:8500/v1'
        self.api_session = requests.Session()

        self.hostname = gethostname()
        self.short_hostname = self.hostname.split('.')[0]

        self.update_service = False
        self.valid_states = ['master', 'slave', 'fail']

        self.configfile = configfile
        self.leader_uri = self.api_endpoint + '/kv/session/' + self.service + '/leader'
项目:consul-pg    作者:adamcstephens    | 项目源码 | 文件源码
def current_leader_session_id(self):
        check_current_leader = self.api_session.get(self.leader_uri)
        if check_current_leader.status_code == 200:
            return check_current_leader.json()[0].get('Session')
项目:OpenCouture-Dev    作者:9-9-0    | 项目源码 | 文件源码
def __init__(self):
        self.URL_vendor      = 'http://shop.bdgastore.com/'
        self.URL_product     = 'http://shop.bdgastore.com/collections/footwear/products/y-3-pureboost-zg'
        self.URL_addToCart   = 'http://shop.bdgastore.com/cart/add.js'
        self.URL_cart        = 'http://shop.bdgastore.com/cart'
        self.user_size       = '8'
        self.user_session    = requests.Session()
项目:stalkerGKSU    作者:zense    | 项目源码 | 文件源码
def get_data(username, no):
    if no == 0:
        z = 'followers'
    else:
        z = 'following'
    # these lines of code gets the list of followers or the following on the first page
    # when there are no further pages of followers or following. And if there are go forward with the next page
    s = requests.Session()
    final=[]
    x = 1
    pages = [""]
    data = []
    while(pages != [] and x <= max_number/no_per_page):
        r = s.get('https://github.com/' + username + '?page=' +  str(x) + '&tab=' + z) #first getting all the followers for z=0, and following for z=1
        soup = BeautifulSoup(r.text)
        data = data + soup.find_all("div", {"class" : "d-table col-12 width-full py-4 border-bottom border-gray-light"})
        pages = soup.find_all("div", {"class" : "pagination"})
        x += 1
   # getting company and area.
    for i in data:
        username = i.find_all("a")[0]['href']
        try:
            company = i.find_all("span", {"class" : "mr-3"})[0].text.strip()
        except:
            company = "xxxxx"
        try:
            area = i.find_all("p", {"class" : "text-gray text-small mb-0"})[0].text.strip()
        except:
            area = "xxxxx"
        soup2 = BeautifulSoup(str(i))
        name = soup2.find_all("span",{"class" : "f4 link-gray-dark"})[0].text
        final.append([username,company,area,name])
    return final
项目:stalkerGKSU    作者:zense    | 项目源码 | 文件源码
def scrape_org(org,main_list,organisation):
     s = requests.Session()
     r = s.get('https://github.com/orgs/'+org+'/people')
     soup = BeautifulSoup(r.text)
     data = soup.find_all("li", {"class" : "table-list-item member-list-item js-bulk-actions-item "})

     for i in data:
         soup2=BeautifulSoup(str(i))
         data2=soup2.find_all("div",{"class" : "table-list-cell py-3 pl-3 v-align-middle member-avatar-cell css-truncate pr-0"})
         username = data2[0].find_all("a")[0]['href']
         data3 = soup2.find_all("div",{"class" : "table-list-cell py-3 v-align-middle member-info css-truncate pl-3"})
         name = data3[0].find_all("a")[0].text.strip()
         main_list.append([username,name])
项目:stalkerGKSU    作者:zense    | 项目源码 | 文件源码
def update_org_list(main_list,organisation):
    s = requests.Session()
    for i in main_list:
         r = s.get('https://github.com/'+i[0])
         soup = BeautifulSoup(r.text)
         data = soup.find_all("li",{"aria-label":"Organization"})
         try:
             if data[0].text not in organisation:
                 organisation.append(data[0].text)
         except:
             continue
    return organisation
项目:stalkerGKSU    作者:zense    | 项目源码 | 文件源码
def scrape_org_general(org,main_list,organisation):
    org.replace(" ","+")
    s = requests.Session()
    count = 1
    k = "https://github.com/search?p="+str(count)+"&q="+org+"+type%3Auser&type=Users&utf8=%E2%9C%93"
    r = s.get(k)

    soup = BeautifulSoup(r.text,"lxml")
    data = soup.find_all("div",{"class":"user-list-info ml-2"})
    while data!=[]:
        for i in data:
            username = i.find_all("a")[0]['href']
            name = i.find_all("span",{"class":"f4 ml-1"})[0].text.strip()
            main_list.append([username,name])
        count+=1
        k = "https://github.com/search?p="+str(count)+"&q="+org+"+type%3Auser&type=Users&utf8=%E2%9C%93"
        r = s.get(k)
        soup = BeautifulSoup(r.text,"lxml")
        data = soup.find_all("div",{"class":"user-list-info ml-2"})

# scraping the github pages
项目:potatoygg    作者:Ripolin    | 项目源码 | 文件源码
def setUp(self, conf='/test.cfg'):
        settings = Settings()
        settings.setFile(base_path + conf)
        Env.set('settings', settings)
        Env.set('http_opener', requests.Session())
        Env.set('cache', NoCache())
        YGG.log.logger.setLevel('DEBUG')
        YGG.log.logger.addHandler(handler)
        return YGG()
项目:javID-SearchTool    作者:VegetableCat    | 项目源码 | 文件源码
def get_av_magnet(avcode):




    Referer={
    "Referer": "123"
    }

    s = requests.Session()
    gid,uc = download_image(avcode)

    params={
    'gid':gid,
    'uc':uc,
    'lang':'zh'
    }

    r2 = s.get("http://www.javbus.com/ajax/uncledatoolsbyajax.php",params=params, proxies=proxy, headers=Referer)
    soup = BeautifulSoup(r2.content.decode('utf-8', 'ignore'),'html.parser')

    trs  = soup.findAll('tr',attrs={"height":"35px"})
    print '[*] get magnet link'
    for tr in trs:
        trsoup = BeautifulSoup(str(tr).decode('utf-8', 'ignore'),'html.parser')
        td2 = trsoup.findAll('td',attrs={"style":"text-align:center;white-space:nowrap"})
        a = td2[0].find('a')
        magnet = a.get("href") #unicode object
        size = a.text.strip()
        print '[*] '+magnet,size

    os.chdir("../..")
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def _connect(self):
        self._session = requests.Session()
        adaptator = requests.adapters.HTTPAdapter()
        adaptator.max_retries = HttpRetry(
            read=self.READ_MAX_RETRIES,
            connect=self.CONN_MAX_RETRIES,
            backoff_factor=self.BACKOFF_FACTOR)
        self._session.mount(str(self.url), adaptator)
        self.__conn = self._session.get(
            self.url,
            stream=True,
            timeout=(self.CONN_TIMEOUT, self.READ_TIMEOUT))
项目:xxetimes    作者:ropnop    | 项目源码 | 文件源码
def makeRequestSession(self):
        host = self.requestHandler.headers.getheader('host',None)
        path = self.requestHandler.path
        self.url = self.uri + "://" + host + path

        session = requests.Session()

        for header in self.requestHandler.headers.keys():
            if header != 'content-length':
                session.headers.update({header : self.requestHandler.headers.getheader(header)})

        if self.proxies:
            session.proxies = self.proxies
        return session
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def setUp(self):
        self.tls_adapter = CbAPISessionAdapter(force_tls_1_2=True)
        self.session = requests.Session()
        self.session.mount("https://", self.tls_adapter)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def _url_to_key(self, url):
        session = requests.Session()
        return self.create_key(session.prepare_request(requests.Request('GET', url)))
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def uninstall_cache():
    """ Restores ``requests.Session`` and disables cache
    """
    _patch_session_factory(OriginalSession)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def __init__(self, server, ssl_verify=True, token=None, ignore_system_proxy=False,
                 use_https_proxy=None, ssl_verify_hostname=True, use_http_proxy=None):
        """ Requires:
                server -    URL to the Carbon Black server.  Usually the same as
                            the web GUI.
                ssl_verify - verify server SSL certificate
                token - this is for CLI API interface
        """

        # We will uncomment this once cbapi 1.0.0 is released
        # warn("CbApi is deprecated and will be removed as of cbapi 2.0.0.", DeprecationWarning)

        if not server.startswith("http"):
            raise TypeError("Server must be URL: e.g, http://cb.example.com")

        if token is None:
            raise TypeError("Missing required authentication token.")

        self.server = server.rstrip("/")
        self.ssl_verify = ssl_verify
        self.token = token
        self.token_header = {'X-Auth-Token': self.token}
        self.session = requests.Session()

        if not ssl_verify_hostname:
            self.session.mount("https://", HostNameIgnoringAdapter())

        self.proxies = {}
        if ignore_system_proxy:         # see https://github.com/kennethreitz/requests/issues/879
            self.proxies = {
                'no': 'pass'
            }
        else:
            if use_http_proxy:
                self.proxies['http'] = use_http_proxy
            if use_https_proxy:
                self.proxies['https'] = use_https_proxy
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def send(self, *a, **kw):
        a[0].url = a[0].url.replace(urllib.quote("<"), "<")
        a[0].url = a[0].url.replace(urllib.quote(" "), " ")
        a[0].url = a[0].url.replace(urllib.quote(">"), ">")
        return requests.Session.send(self, *a, **kw)
项目:dashboard-duty    作者:jrasell    | 项目源码 | 文件源码
def index():

    try:
        api_key = os.environ['DASHBOARD_DUTY_KEY']
        service_key = os.environ['DASHBOARD_DUTY_SERVICE']
    except KeyError:
        logging.error('Missing Environment Variable(s)')
        exit(1)

    session = requests.Session()
    d_session = dashboard_duty.Core(session, api_key, service_key)

    service = d_session.service()
    incidents = d_session.incident(service['id'])

    if service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'schedule_reference':
        service_id = service['escalation_policy']['escalation_rules'][0]['targets'][0]['id']
        oncall = d_session.oncall_schedule_policy(service_id)

    elif service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'user_reference':
        username = service['escalation_policy']['escalation_rules'][0]['targets'][0]['summary']
        oncall = d_session.oncall_user_policy(username)
    else:
        logging.error('Unable to handle oncall policy for %s' % service_key)
        exit(1)

    return render_template('index.html', service=service, incidents=incidents, oncall=oncall)
项目:Radiojavan    作者:nimasaj    | 项目源码 | 文件源码
def list_DL(List):
    List=List[1:]
    List2=[]
    ID_list=[]
    List_dl=[]
    k=0
    j=0
    p=0

    while p*2<len(List):
        List2.append(List[p*2])
        p+=1

    for i in List2:
        #--------------------
        s1=requests.Session()
        s1.get(url0)
        url_list=s1.get(List2[j], headers=headers)
        #--------------------
        #a1=requests.get(List2[j])
        #html=a1.text
        html=url_list.text
        a2=html.find('<a href="javascript:void(0)" link="')
        a2_len=len('<a href="javascript:void(0)" link="')
        if a2<0:
            a2=html.find("RJ.currentMP3Url = 'mp3/")
            a2_len=len("RJ.currentMP3Url = 'mp3/")
        a3=html.find('" target="_blank" class="mp3_download_link">')
        if a3<0:
            a3=html.find("RJ.currentMP3 = '")
        a4=html[a2+a2_len:a3]
        if a4.find("'")>0:
            a4=a4[:a4.find("'")]
            ID_list.append(a4+'.mp3')
            while k < len(ID_list):
                List_dl.append(check_host(l_mp3[:4],ID_list[k])[0])
                k+=1
        else:
            List_dl.append(a4)
        j+=1
    return(List_dl)
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def __init__(self, service_urls=None, user_agent=DEFAULT_USER_AGENT):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': user_agent,
        })
        self.service_urls = service_urls or ['translate.google.com']
        self.token_acquirer = TokenAcquirer(session=self.session, host=self.service_urls[0])

        # Use HTTP2 Adapter if hyper is installed
        try:  # pragma: nocover
            from hyper.contrib import HTTP20Adapter
            self.session.mount(urls.BASE, HTTP20Adapter())
        except ImportError:  # pragma: nocover
            pass
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def __init__(self, tkk='0', session=None, host='translate.google.com'):
        self.session = session or requests.Session()
        self.tkk = tkk
        self.host = host if 'http' in host else 'https://' + host
项目:ccu_and_eccu_publish    作者:gaofubin    | 项目源码 | 文件源码
def AkamaiEdgeGridSession_Setup(AkamaiEdgeGridConfig):
    session = requests.Session()

    session.auth = EdgeGridAuth(
                client_token=AkamaiEdgeGridConfig['client_token'],
                client_secret=AkamaiEdgeGridConfig['client_secret'],
                access_token=AkamaiEdgeGridConfig['access_token']
    )

    return session

#Actually call akamai and return the result.
项目:py-tendermint    作者:davebryson    | 项目源码 | 文件源码
def __init__(self, host="127.0.0.1", port=46657):
        # Tendermint endpoint
        self.uri = "http://{}:{}".format(host, port)

        # Keep a session
        self.session = requests.Session()

        # Request counter for json-rpc
        self.request_counter = itertools.count()

        # request headers
        self.headers = {
            'user-agent': AGENT,
            'Content-Type': 'application/json'
        }
项目:webtzite    作者:materialsproject    | 项目源码 | 文件源码
def __init__(self, api_key=None, endpoint=None):
        if api_key is None or endpoint is None:
            try:
                from pymatgen import SETTINGS
            except ImportError:
                warnings.warn('MPResterBase: not using pymatgen SETTINGS!')
                SETTINGS = {}
        if api_key is not None:
            self.api_key = api_key
        else:
            self.api_key = SETTINGS.get("PMG_MAPI_KEY", "")
        if endpoint is not None:
            self.preamble = endpoint
        else:
            self.preamble = SETTINGS.get(
                "PMG_MAPI_ENDPOINT", "https://www.materialsproject.org/rest/v2"
            )
        if not self.api_key:
            raise ValueError('API key not set. Run `pmg config --add PMG_MAPI_KEY <USER_API_KEY>`.')
        self.session = requests.Session()
        self.session.headers = {"x-api-key": self.api_key}
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
        super().__init__()

        self._endpoint = broker
        self._encoding = "utf-8"
        if enc_key == None or enc_iv == None:
            self._transport_enc = False
            self._transport_enc_key = None
            self._transport_enc_iv = None
            self._cipher = None
        else:
            self._transport_enc = True
            self._transport_enc_key = enc_key
            self._transport_enc_iv = enc_iv
            backend = default_backend()
            self._cipher = Cipher(algorithms.AES(
                enc_key), modes.CBC(enc_iv), backend=backend)

        self._session = requests.Session()
        self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
                            'query_data': self.on_query_data, 'send_order': self.on_insert_order,
                            'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
        self.client_id = ''
        self.account_id = ''
项目:hearthscan-bot    作者:d-schmidt    | 项目源码 | 文件源码
def test_hearthhead(self):
        with requests.Session() as s:
            self.assertEqual(scrape.getHearthHeadId('Quick Shot', 'Spell', s),
                'quick-shot')
            self.assertEqual(scrape.getHearthHeadId('Undercity Valiant',
                'Minion', s), 'undercity-valiant')
            self.assertEqual(scrape.getHearthHeadId('Gorehowl', 'Weapon', s),
                'gorehowl')
            self.assertEqual(scrape.getHearthHeadId('V-07-TR-0N',
                'Minion', s), 'v-07-tr-0n')
            self.assertEqual(scrape.getHearthHeadId("Al'Akir the Windlord",
                'Minion', s), 'alakir-the-windlord')
项目:hearthscan-bot    作者:d-schmidt    | 项目源码 | 文件源码
def test_Hearthpwn(self):
        with requests.Session() as s:
            self.assertEqual(scrape.getHearthpwnIdAndUrl('Quick Shot',
                    'Blackrock Mountain', 'Spell', False, s),
                    (14459, 'https://media-hearth.cursecdn.com/avatars/328/302/14459.png'))
            self.assertEqual(scrape.getHearthpwnIdAndUrl('Upgrade!',
                    'Classic', 'Spell', False, s),
                    (638, 'https://media-hearth.cursecdn.com/avatars/330/899/638.png'))
项目:hearthscan-bot    作者:d-schmidt    | 项目源码 | 文件源码
def loadTokens(tokens = {}, wantedTokens = {}):
    resultCards = {}
    with requests.Session() as session:
        for name, ids in wantedTokens.items():
            card = None

            if 'id' in ids:
                card = tokens[ids['id']]
                if name != card['name']:
                    log.warning('loadTokens() names do not match: %s - %s', name, tokens[ids['id']]['name'])

            if 'id' not in ids:
                for token in tokens.values():
                    if name == token['name']:
                        if card:
                            log.warning('loadTokens() found token again: %s', name)
                        card = token

            if not card:
                log.warning('loadTokens() could not find: %s', name)
                exit()

            r = session.get('http://www.hearthpwn.com/cards/{}'.format(ids['hpwn']))
            r.raise_for_status()
            image = fromstring(r.text).xpath('//img[@class="hscard-static"]')[0].get('src')
            if not image:
                image = 'https://media-hearth.cursecdn.com/avatars/148/738/687.png'

            card['cdn'] = image.replace('http://', 'https://').lower()
            card['hpwn'] = ids['hpwn']
            card['head'] = getHearthHeadId(card['name'], "ignored", "ignored")

            # since jade golem: overwrite scraped stats with prepared ones
            card['atk'] = ids.get('atk', card['atk'])
            card['cost'] = ids.get('cost', card['cost'])
            card['hp'] = ids.get('hp', card['hp'])

            resultCards[card['name']] = card
            print('.', end='')

    return resultCards
项目:CausalGAN    作者:mkocaoglu    | 项目源码 | 文件源码
def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()

    response = session.get(URL, params={ 'id': id }, stream=True)
    token = get_confirm_token(response)

    if token:
        params = { 'id' : id, 'confirm' : token }
        response = session.get(URL, params=params, stream=True)

    save_response_content(response, destination)
项目:getTV    作者:mattsta    | 项目源码 | 文件源码
def __init__(self, config):
        self.dbFilename = "downloads.db"
        self.showsFilename = "SHOWS"
        self.sourceIP = ""
        self.transmissionHostRemote = ""
        self.userpass = ""
        self.downloadQuality = [720, 1080]
        self.speakDownload = True

        # We don't retain 'proxs' or 'requestsFromSource' in this
        # instance since they get stored/retained inside TorrentApiController
        proxs = {}
        requestsFromSource = requests.Session()

        self.establishConfiguration(config, requestsFromSource, proxs)
        self.torrentController = TorrentApiController(requestsFromSource, proxs)

        self.establishDatabase()
项目:cvprac    作者:aristanetworks    | 项目源码 | 文件源码
def _reset_session(self):
        ''' Get a new request session and try logging into the current
            CVP node. If the login succeeded None will be returned and
            self.session will be valid. If the login failed then an
            exception error will be returned and self.session will
            be set to None.
        '''
        self.session = requests.Session()
        error = None
        try:
            self._login()
        except (ConnectionError, CvpApiError, CvpRequestError,
                CvpSessionLogOutError, HTTPError, ReadTimeout, Timeout,
                TooManyRedirects) as error:
            self.log.error(error)
            # Any error that occurs during login is a good reason not to use
            # this CVP node.
            self.session = None
        return error
项目:skymod    作者:DelusionalLogic    | 项目源码 | 文件源码
def getSession(self):
        if self._session is None:
            self._session = requests.Session()
        return self._session
项目:db_wlan_manager    作者:sistason    | 项目源码 | 文件源码
def __init__(self, user_mode):
        self.user_mode = user_mode
        self.interface = None
        self.quota = None

        self.is_online = None
        self.new_api = None

        self.json_decoder = json.JSONDecoder()
        self.session = requests.Session()
        self.csrf_token = None

        self.resolver = dns.resolver.Resolver()
        self.resolver.nameservers = ['172.16.0.1']
        self.api_host_ip = self.resolve_url(self.api_host)
        self.api_host_new_ip = self.resolve_url(self.api_host_new)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def gethtml(link):
    user_agent = "Mozilla/5.0 (Windows NT 6.1; rv:38.0) Gecko/20100101 Firefox/38.0"
    headers={'user-agent':user_agent}
    s = requests.Session()
    try:
        res = s.get(link,headers=headers).text
    except requests.exceptions.InvalidSchema:
        req=urllib2.Request(link,None,headers)
        r = urllib2.urlopen(req)
        res = r.read().decode('utf8')
    return res
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def main():
    # Start a session so we can have persistant cookies

    # Session() &gt;&gt; http://docs.python-requests.org/en/latest/api/#request-sessions
    session = requests.Session()

    # This is the form data that the page sends when logging in

    login_data = {
    'username': RegisterNumber,
    'password': DateofBirth,
    'submit': 'id',
    }
    print login_data

    # Authenticate
    r = session.post(URL, data = login_data)

    # Try accessing a page that requires you to be logged in
    r = session.get('http://sdpdr.nic.in/annauniv/result')

    web = QWebView()
    web.load(QUrl("http://sdpdr.nic.in/annauniv/result"))
    #web.show()

    printer = QPrinter()
    printer.setPageSize(QPrinter.A4)
    printer.setOutputFormat(QPrinter.PdfFormat)
    printer.setOutputFileName("result.pdf")

# convertion of page to pdf format
项目:imageDownloader    作者:whcacademy    | 项目源码 | 文件源码
def __init__(self, word, dirpath=None, processNum=30):
        #if " " in word:
            #raise AttributeError("This Script Only Support Single Keyword!")
        self.word = word
        self.char_table = {ord(key): ord(value)
                           for key, value in BaiduImgDownloader.char_table.items()}
        if not dirpath:
            dirpath = os.path.join(sys.path[0], 'results')
        self.dirpath = dirpath
        self.jsonUrlFile = os.path.join(sys.path[0], 'jsonUrl.txt')
        self.logFile = os.path.join(sys.path[0], 'logInfo.txt')
        self.errorFile = os.path.join(sys.path[0], 'errorUrl.txt')
        if os.path.exists(self.errorFile):
            os.remove(self.errorFile)
        if not os.path.exists(self.dirpath):
            os.mkdir(self.dirpath)
        self.pool = Pool(30)
        self.session = requests.Session()
        self.session.headers = BaiduImgDownloader.headers
        self.queue = Queue()
        self.messageQueue = Queue()
        self.index = 0
        self.promptNum = 10
        self.lock = threading.Lock()
        self.delay = 1.5
        self.QUIT = "QUIT"
        self.printPrefix = "**"
        self.processNum = processNum