Python requests 模块,session() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.session()

项目:ArticleSpider    作者:mtianyan    | 项目源码 | 文件源码
def get_captcha():
    """Download the Zhihu login captcha, display it, and return the typed answer.

    The image is requested through the module-level ``session`` with the
    module-level ``header`` and written to ``captcha.jpg`` in the current
    working directory.

    Returns:
        str: The text the user entered at the prompt.
    """
    import time

    # Millisecond timestamp sent as the ``r`` query parameter.
    timestamp = str(int(time.time() * 1000))
    captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(timestamp)
    response = session.get(captcha_url, headers=header)
    # The ``with`` block closes the file; no explicit close() is needed.
    with open("captcha.jpg", "wb") as f:
        f.write(response.content)

    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except Exception:
        # Showing the image is best effort. Catching ``Exception`` instead of
        # using a bare ``except`` lets KeyboardInterrupt/SystemExit propagate.
        pass

    captcha = input("?????\n>")
    return captcha
项目:PGO-mapscan-opt    作者:seikur0    | 项目源码 | 文件源码
def new_session(account):
    """Attach a ready-to-use requests session to ``account`` and reset its state.

    If the account has no session yet, one is created (TLS verification on,
    'Niantic App' user agent, optional proxies). Otherwise the existing
    session is closed and its cookies are cleared. In both cases the
    per-session bookkeeping fields of ``account`` are reset.

    Args:
        account: dict-like account record; must contain the key ``'proxy'``
            when no session exists yet.
    """
    existing = account.get('session', None)
    if existing is not None:
        # Keep the same session object, but drop connections and cookies.
        existing.close()
        existing.cookies.clear()
    else:
        fresh = requests.session()
        fresh.verify = True
        fresh.headers.update({'User-Agent': 'Niantic App'})
        proxy = account['proxy']
        if proxy is not None:
            fresh.proxies.update(proxy)
        account['session'] = fresh

    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)

    account['api_url'] = API_URL
    account['auth_ticket'] = None
项目:gee-bridge    作者:francbartoli    | 项目源码 | 文件源码
def get_upload_url(self,session):
        """Ask the appspot endpoint for the upload URL.

        Args:
            session: Object with a requests-style ``get(url)`` method whose
                response exposes ``text``.

        Returns:
            The ``'url'`` entry of the Python literal parsed from the
            response body.

        Exits the process with status 1 when the response is an HTML page
        starting with ``'\\n<!DOCTYPE html>'``.
        """
        r = session.get ( DataManagement.__APPSPOT_URL )
        if r.text.startswith ( '\n<!DOCTYPE html>' ):
            self.logger.debug ( 'Incorrect credentials. Probably. If you are sure the credentials are OK, '
                                'refresh the authentication token. If it did not work report a problem. '
                                'They might have changed something in the Matrix.' )
            sys.exit ( 1 )
        elif r.text.startswith ( '<HTML>' ):
            self.logger.debug ( 'Redirecting to upload URL' )
            # Same request issued a second time.
            r = session.get ( DataManagement.__APPSPOT_URL )

        # Parse whichever response we ended up with. Previously the parse only
        # happened inside the '<HTML>' branch, so any other body reached the
        # return statement with ``d`` unbound (UnboundLocalError).
        d = ast.literal_eval ( r.text )
        return d['url']
项目:bitrader    作者:jr-minnaar    | 项目源码 | 文件源码
def __init__(self, cache: bool = False, future: bool = True):
        """Set up the HTTP session(s) and the base URL.

        Args:
            cache: When true, use a redis-backed ``CachedSession`` (GET
                requests with status 200 only, 30-day expiry); otherwise a
                plain session.
            future: When true, also create ``self.future_session``, a
                ``FuturesSession`` with 10 workers wrapping ``self.session``.
        """
        if not cache:
            self.session = session()
        else:
            thirty_days = 60 * 60 * 24 * 30
            self.session = requests_cache.core.CachedSession(
                cache_name='api_cache',
                backend='redis',
                expire_after=thirty_days,
                allowable_codes=(200,),
                allowable_methods=('GET',),
                old_data_on_error=False,
                connection=redis.StrictRedis(host='redis'),
            )

        if future:
            self.future_session = FuturesSession(
                max_workers=10,
                session=self.session,
            )

        self.url = self.url_template.format(resource='', token=self.token)
项目:sinal2    作者:observerss    | 项目源码 | 文件源码
def get_token(self, symbols, wlist):
        """Request an auth token from the Sina sign-code service, retrying until one is returned.

        Args:
            symbols: Not used by this method; kept for interface compatibility.
            wlist: Value interpolated into the ``list`` query parameter.

        Returns:
            str: The ``result`` value extracted from the service response.

        Every failed attempt is logged with the raw response text. Retries
        are done in a loop rather than by self-recursion, so a long run of
        failures can no longer end in ``RecursionError``.
        """
        # Raw string: ``\d`` in a normal string literal is an invalid escape.
        pat = re.compile(r'result:"([^"]+)",timeout:(\d+)')
        while True:
            # IP and random callback suffix are regenerated on every attempt.
            ip = Helper.get_ip()
            url = ('https://current.sina.com.cn/auth/api/jsonp.php/'
                   + 'var%20KKE_auth_{}=/'.format(Helper.random_string(9))
                   + 'AuthSign_Service.getSignCode?'
                   + 'query=hq_pjb&ip={}&list={}&kick=1'.format(ip, wlist))
            resp = self.session.get(url)
            m = pat.search(resp.text)
            if m:
                # Only the token is returned; the timeout group is not used.
                return m.group(1)
            log.error('token error: {}'.format(resp.text))
项目:pyfilmweb    作者:lopezloo    | 项目源码 | 文件源码
def login(self, name, password):
      """Log a user in and return the resulting :class:`LoggedUser`.

      A fresh requests session is stored on ``self.session`` before the
      ``login`` request is sent.

      :param str name: account name
      :param str password: account password

      .. note::
         Throws :class:`exceptions.RequestFailed` (20, 'badCreadentials') if bad credentials are passed.
      """
      self.session = requests.session()

      # The "remember" flag is always on and is sent as an integer.
      credentials = [name, password, int(True)]
      data = self._request('login', credentials, hmethod='POST')

      return LoggedUser(
         fw=self,
         uid=data[3],
         name=data[0],
         img=common.img_path_to_relative(data[1]),
         sex=data[4],
         birth_date=data[5],
      )

   # This method is one big TODO.
项目:ins-account    作者:E0han    | 项目源码 | 文件源码
def first_get(self):
            """Open a new global session against instagram.com and run the follow-up steps.

            The module-level ``_session`` is replaced, the main page is
            requested through ``self.use_proxy``, and cookies are saved. The
            remaining steps only run when the file ``cookiefile`` exists.
            """
            global _session
            _session = requests.session()
            _session.get('https://www.instagram.com', proxies=self.use_proxy, verify=True)
            self.save_cookies()

            if not os.path.exists('cookiefile'):
                # No cookie file: nothing more to do.
                return

            self.csrf = self.read_cookies()
            self.data = self.create_ajax()
            print(self.data)
            self.ins()
            time.sleep(5)  # wait for 5 seconds
            self.my_selfie = get_pic.get_pic()
            self.my_selfie.get_selfie()  # download random selfie picture to local folder
            self.upload()  # upload the selfie
项目:Imagetyperz-Python-API-examples    作者:zphanjakidze    | 项目源码 | 文件源码
def __init__(self, username, password, timeout = 120, ref_id = 0):
        """Store credentials and request settings and open a new session.

        Args:
            username: Account user name.
            password: Account password.
            timeout: Stored as ``self._timeout``.
            ref_id: Stored as a string in ``self._ref_id``.
        """
        # Credentials and request settings.
        self._username = username
        self._password = password
        self._timeout = timeout
        self._ref_id = '{}'.format(ref_id)  # kept as str

        # New session for this instance.
        self._session = session()

        # Last solved captchas and last error; nothing yet.
        self._normal_captcha = None
        self._recaptcha = None
        self._error = None

        # Headers carrying the module-level user agent.
        self._headers = {'User-Agent': USER_AGENT}

    # solve normal captcha
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def _get_log_schema(self):
        """
        Fetch the log schema for this SMC version.

        Nothing is requested unless both ``self.session`` and
        ``self.session_id`` are truthy.

        :return: decoded JSON body when the status code is 200 or 201,
            otherwise None
        """
        if not (self.session and self.session_id):
            return None

        schema_url = '{}/{}/monitoring/log/schemas'.format(self.url, self.api_version)
        request_headers = {
            'cookie': self.session_id,
            'content-type': 'application/json',
        }
        response = self.session.get(url=schema_url, headers=request_headers)

        if response.status_code not in (200, 201):
            return None
        return response.json()
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def available_api_versions(base_url, timeout=10, verify=True):
    """
    List the API versions advertised by the SMC at ``base_url``.

    :param base_url: URL prefix; ``/api`` is appended to it
    :param timeout: request timeout passed to ``requests.get``
    :param verify: TLS verification setting passed to ``requests.get``
    :raises SMCConnectionError: on a non-200 status or a requests failure
    :return: the ``rel`` value of every entry under ``version``
    :rtype: list
    """
    try:
        # A plain request is enough here; no session is required.
        r = requests.get('%s/api' % base_url, timeout=timeout, verify=verify)

        if r.status_code != 200:
            raise SMCConnectionError(
                'Invalid status received while getting entry points from SMC. '
                'Status code received %s. Reason: %s' % (r.status_code, r.reason))

        return [entry['rel'] for entry in json.loads(r.text)['version']]

    except requests.exceptions.RequestException as e:
        raise SMCConnectionError(e)
项目:Recruit    作者:Weiyanyu    | 项目源码 | 文件源码
def getListProxies(self):
        """Scrape http://www.xicidaili.com/nn and return its rows as proxy dicts.

        Returns:
            list: One ``{'https': '<cell 1>:<cell 2>'}`` dict per matched
            table row, built from the row's second and third ``td`` cells.
        """
        session = requests.session()
        self.getRandomUserAgent()
        page = session.get("http://www.xicidaili.com/nn", headers=self.headers)
        soup = BeautifulSoup(page.text, 'lxml')

        rows = soup.find_all('tr', attrs={'class': re.compile("(odd)|()")})
        cells_per_row = (row.find_all('td') for row in rows)
        return [
            {'https': cells[1].string + ':' + cells[2].string}
            for cells in cells_per_row
        ]

    #????????????????????
项目:Preston    作者:Celeo    | 项目源码 | 文件源码
def __init__(self, key=None, code=None, user_agent='', cache=None):
        """
        Base object for accessing the XML API. Attributes can be
        called on it to build a path to an endpoint.

        Args:
            key (str) - optional authentication keyID
            code (str) - optional authentication vCode
            user_agent (str) - optional (recommended) User-Agent
                                to use when making web requests;
                                a default is used when empty
            cache (preston.xmlapi.cache.Cache) - page cache; a new
                                Cache() is created when falsy

        Returns:
            None
        """
        self.key = key
        self.code = code
        self.cache = cache or Cache()
        self.user_agent = user_agent or 'Preston (github.com/Celeo/Preston)'

        # All requests go through one session carrying the User-Agent.
        http = requests.session()
        http.headers.update({'User-Agent': self.user_agent})
        self.session = http
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def __init__(self, api_key, api_secret, redirect_uri, token=None):
        """Configure the Weibo OAuth2 client.

        Args:
            api_key: Stored as ``self.client_id``.
            api_secret: Stored as ``self.client_secret``.
            redirect_uri: Stored as ``self.redirect_uri``.
            token: Optional token; when truthy it is passed to
                ``self.set_token`` right away.
        """
        # Application credentials.
        self.client_id = api_key
        self.client_secret = api_secret
        self.redirect_uri = redirect_uri

        # Endpoint URLs, all derived from the site root.
        site = 'https://api.weibo.com/'
        self.site = site
        self.authorization_url = site + 'oauth2/authorize'
        self.token_url = site + 'oauth2/access_token'
        self.api_url = site + '2/'

        self.session = requests.session()

        # Activate the client immediately when a token is supplied.
        if token:
            self.set_token(token)
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def __init__(self, api_key, api_secret, redirect_uri, token=None):
        """Configure the Weibo OAuth2 client.

        Args:
            api_key: Stored as ``self.client_id``.
            api_secret: Stored as ``self.client_secret``.
            redirect_uri: Stored as ``self.redirect_uri``.
            token: Optional token; when truthy it is passed to
                ``self.set_token`` right away.
        """
        # Application credentials.
        self.client_id = api_key
        self.client_secret = api_secret
        self.redirect_uri = redirect_uri

        # Endpoint URLs, all derived from the site root.
        site = 'https://api.weibo.com/'
        self.site = site
        self.authorization_url = site + 'oauth2/authorize'
        self.token_url = site + 'oauth2/access_token'
        self.api_url = site + '2/'

        self.session = requests.session()

        # Activate the client immediately when a token is supplied.
        if token:
            self.set_token(token)
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def __init__(self, api_key, api_secret, redirect_uri, token=None):
        """Configure the Weibo OAuth2 client.

        Args:
            api_key: Stored as ``self.client_id``.
            api_secret: Stored as ``self.client_secret``.
            redirect_uri: Stored as ``self.redirect_uri``.
            token: Optional token; when truthy it is passed to
                ``self.set_token`` right away.
        """
        # Application credentials.
        self.client_id = api_key
        self.client_secret = api_secret
        self.redirect_uri = redirect_uri

        # Endpoint URLs, all derived from the site root.
        site = 'https://api.weibo.com/'
        self.site = site
        self.authorization_url = site + 'oauth2/authorize'
        self.token_url = site + 'oauth2/access_token'
        self.api_url = site + '2/'

        self.session = requests.session()

        # Activate the client immediately when a token is supplied.
        if token:
            self.set_token(token)
项目:sfm-weibo-harvester    作者:gwu-libraries    | 项目源码 | 文件源码
def get(self, uri, **kwargs):
        """
        GET ``<api_url><uri>.json`` with ``kwargs`` as query parameters.

        The decoded JSON body is handed to ``self._assert_error`` only for
        status codes 200, 400 and 403; the response object is returned in
        every case.
        """
        endpoint = "{0}{1}.json".format(self.api_url, uri)
        res = self.session.get(endpoint, params=kwargs)

        # Per the original notes: 403 covers invalid access token and rate
        # limit, 400 carries expired-token information. Other server-side
        # error codes are left to the caller.
        checked_codes = (200, 400, 403)
        if res.status_code in checked_codes:
            self._assert_error(res.json())
        return res
项目:ZhihuSpider    作者:KEN-LJQ    | 项目源码 | 文件源码
def create_session_proxy(self, proxy_info):
        """Build a proxied session and add it to the session pool.

        Args:
            proxy_info: Mapping with the keys ``'ip'``, ``'port'`` and
                ``'protocol'``; ip and port are joined with ``':'`` and the
                protocol is lower-cased to form the proxies dict.
        """
        # Proxy settings in the {scheme: "ip:port"} form.
        address = proxy_info['ip'] + ':' + proxy_info['port']
        scheme = proxy_info['protocol'].lower()

        # New session with shared headers, auth cookies and the proxy.
        proxied = requests.session()
        proxied.headers = requestHeader
        proxied.cookies.update(self.account_manager.get_auth_token())
        proxied.proxies = {scheme: address}

        # Hand the session to the pool and bump both counters by one.
        self.session_pool.put(proxied)
        self.created_session_num_change(1)
        self.available_session_num_change(1)

    # ????????? session
项目:ZhihuSpider    作者:KEN-LJQ    | 项目源码 | 文件源码
def fetch_proxy_data(self, page):
        """Download one page of the proxy listing, retrying on failure.

        Args:
            page: Page identifier appended (as a string) to ``requestUrl``.

        Returns:
            The response text, or None once ``NETWORK_RETRY_TIMES`` attempts
            have failed.
        """
        self.session.headers = requestHeader
        url = requestUrl + str(page)

        retry_time = 0
        while retry_time < NETWORK_RETRY_TIMES:
            try:
                response = self.session.get(url, timeout=CONNECT_TIMEOUT)
                return response.text
            except Exception:
                retry_time += 1
                # Pause before the next attempt. The sleep used to sit after
                # the loop, so retries ran back-to-back and the only pause
                # happened once every attempt had already failed.
                time.sleep(NETWORK_RECONNECT_INTERVAL)

        # Every attempt failed.
        return None
项目:pyChomikBox    作者:JuniorJPDJ    | 项目源码 | 文件源码
def __init__(self, url, name=None, requests_session=None, timeout=30):
        """Probe ``url`` with a ranged HEAD request and record its metadata.

        Args:
            url: Address of the remote resource.
            name: Optional name; when None, the ``filename`` parameter of
                the ``Content-Disposition`` header is used if present.
            requests_session: Session to use; a new one is created when None.
            timeout: Timeout for the HEAD request, also kept on ``self.timeout``.
        """
        IOBase.__init__(self)
        self.url = url
        if requests_session is None:
            requests_session = requests.session()
        self.sess = requests_session
        self._seekable = False
        self.timeout = timeout

        probe = self.sess.head(url, headers={'Range': 'bytes=0-'}, timeout=timeout)
        # A 206 answer carrying Content-Range marks the object as seekable.
        if probe.status_code == 206 and 'Content-Range' in probe.headers:
            self._seekable = True
        self.len = int(probe.headers["Content-Length"])

        # NOTE(review): when ``name`` is None and the header gives no
        # filename, ``self.name`` is never assigned here.
        if name is not None:
            self.name = name
        elif "Content-Disposition" in probe.headers:
            value, params = cgi.parse_header(probe.headers["Content-Disposition"])
            if "filename" in params:
                self.name = params["filename"]

        probe.close()
        self._pos = 0
        self._r = None
项目:TapNews    作者:AaronTL    | 项目源码 | 文件源码
def extract_news(news_url):
    """Download ``news_url`` and return the text selected by ``GET_CNN_NEWS_XPATH``.

    Args:
        news_url: URL of the page to fetch.

    Returns:
        The XPath results joined into one string, or ``{}`` when parsing or
        extraction raises.
    """
    # Fetch html
    session_requests = requests.session()
    response = session_requests.get(news_url, headers=getHeaders())

    try:
        # Parse html
        tree = html.fromstring(response.content)
        # Extract information
        news = tree.xpath(GET_CNN_NEWS_XPATH)
        news = ''.join(news)
    except Exception as e:
        # Report what went wrong; the previous bare ``print`` statement
        # discarded the exception entirely.
        print(e)
        return {}

    return news
项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def logout(self):
        '''
        Log out by resetting the session cookies and requesting ``logout_url``.

        :returns: True when the expected pattern is found in the logout
                  response (login flag cleared, cookies removed), otherwise False
        '''
        # Start from an empty cookie jar before calling the logout URL.
        self.session.cookies = cookielib.CookieJar()
        response = self.get_response(logout_url)

        # NOTE(review): the pattern literal below looks mis-encoded; as
        # written, '????' is not a valid regular expression.
        if not re.findall('????', response):
            return False

        self.logined = False
        self.remove_cookies()
        return True
项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def save_cookies(self):
        '''
        Write the session cookies to ``cookie.list`` as JSON.

        :returns: True on success, False when anything goes wrong (the
                  traceback and an error message are passed to ``utils.show_msg``)
        '''

        try:
            if os.path.isfile('cookie.list'):
                os.remove('cookie.list')

            cookie_list = requests.utils.dict_from_cookiejar(self.session.cookies)

            # json.dumps returns text, so the file is opened in text mode
            # (it was "wb+", which rejects str on Python 3). ``with``
            # guarantees the handle is closed even if the write fails.
            with open('cookie.list', 'w') as fd:
                fd.write(json.dumps(cookie_list))

            return True
        except Exception:
            # format_exc() returns the traceback text; print_exc() returns
            # None, so the message shown used to be empty.
            utils.show_msg(traceback.format_exc())
            utils.show_msg('???Can\'t save cookie list file.')
            return False
项目:spider    作者:aweng126    | 项目源码 | 文件源码
def get_all_pic_url(question_num, answer_offset,answer_limit):
    """Collect image URLs from one page of answers to a Zhihu question.

    Args:
        question_num: Question id placed in the API path.
        answer_offset: Value of the ``offset`` query parameter.
        answer_limit: Value of the ``limit`` query parameter.

    Returns:
        list: ``https:`` URLs rebuilt from every second ``data-original``
        match in each answer's ``content``.
    """
    url = ('https://www.zhihu.com/api/v4/questions/{qnum}/answers?include=data%5B*%5D.is_normal%2Cis_collapsed%2Cannotation_action%'
           '2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%'
           '2Cmark_infos%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B*%5D.author.follower_count%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&'
           'offset={offset}&limit={limit}&sort_by=default')
    request_url = url.format(qnum=question_num, offset=answer_offset, limit=answer_limit)
    response = session.get(request_url, headers=headers, allow_redirects=False)

    print('json_response', response)
    answers = response.json()['data']

    pattern = re.compile(r'data-original=\"https\:(.*?)\.(jpg|png)"')
    urls = []
    for entry in answers:
        found = pattern.findall(entry['content'])
        # Only every second match is kept.
        # NOTE(review): the URL is rebuilt with ".jpg" even when the matched
        # extension was "png" - confirm this is intended.
        urls.extend("https:" + path + ".jpg" for path, _ext in found[1::2])
    return urls
项目:fac    作者:mickael9    | 项目源码 | 文件源码
def login(self, username, password, require_ownership=False):
        """Post the credentials to ``self.login_url`` and return the first element of the JSON reply.

        Args:
            username: Account user name.
            password: Account password.
            require_ownership: Sent as the integer query parameter
                ``require_game_ownership``.

        Returns:
            Element 0 of the decoded JSON response.

        Raises:
            OwnershipError: HTTP error whose JSON message is
                'Insufficient membership'.
            AuthError: HTTP error with any other JSON message.
            requests.HTTPError: HTTP error without a usable JSON message.
        """
        resp = self.session.post(
            self.login_url,
            params=dict(require_game_ownership=int(require_ownership)),
            data=dict(username=username, password=password)
        )

        # A body that is not JSON makes ``resp.json()`` raise ValueError.
        # Catch only that (it used to be a bare ``except``) and avoid
        # shadowing the ``json`` module name with the payload.
        try:
            payload = resp.json()
        except ValueError:
            payload = None

        try:
            resp.raise_for_status()
        except requests.HTTPError:
            if isinstance(payload, dict) and 'message' in payload:
                if payload['message'] == 'Insufficient membership':
                    raise OwnershipError(payload['message'])
                raise AuthError(payload['message'])
            raise

        return payload[0]
项目:ptp    作者:fordham-css    | 项目源码 | 文件源码
def login(self, username, password):
        """Log in and store the returned token in the Authorization header.

        Args:
            username: Account user name (also stored on ``self.username``).
            password: Account password (also stored on ``self.password``).

        Returns:
            bool: True when the JSON response contains ``'token'``, else False.
        """
        self.username = username
        self.password = password
        # Pass the fields as a dict so requests form-encodes them. The old
        # hand-built "password=%s&username=%s" string sent credentials
        # containing '&', '=' or '%' unescaped, corrupting the form body.
        data = {'password': self.password, 'username': self.username}
        res = self.session.post(self.endpoints['login'], data=data)
        res = res.json()
        try:
            self.auth_token = res['token']
        except KeyError:
            return False
        self.headers['Authorization'] = 'Token '+self.auth_token
        return True


    ##############################
    #GET DATA 
    ##############################
项目:ICP    作者:initiate6    | 项目源码 | 文件源码
def run(self):
            """POST the stored username/password to ``http://<self.url>/Home/Save`` and print the outcome.

            The target URL, the form data, the status code and the response
            body are printed. Any exception is printed instead of raised.
            """
            try:
                    target = 'http://%s/Home/Save' % (self.url)
                    form = { 'u' : self.username, 'p' : self.password }
                    print(target)
                    print(form)

                    # Certificate verification is disabled for this request.
                    reply = requests.session().post(target, form, verify=False)

                    print(reply.status_code, reply.content )
                    print('-----------------------------------------------------')

            except Exception as e:
                    print(e)

                #self.parent and self.parent.on_thread_finished(self, 'done')

# top-level domains
项目:plugin.video.telekom-sport    作者:asciidisco    | 项目源码 | 文件源码
def __init__(self, constants, util, settings):
        """
        Stores the injected instances, reads the SSL and cookie-path
        settings & loads the initial session

        :param constants: Constants instance
        :type constants: resources.lib.Constants
        :param util: Utils instance
        :type util: resources.lib.Utils
        :param settings: Settings instance
        :type settings: resources.lib.Settings
        """
        self.constants = constants
        self.utils = util
        self.settings = settings

        # SSL verification is on only when the addon setting is the exact
        # string 'True'.
        ssl_setting = self.utils.get_addon().getSetting('verifyssl')
        self.session_file = self.utils.get_addon_data().get('cookie_path')
        self.verify_ssl = ssl_setting == 'True'

        self._session = self.load_session()
项目:plugin.video.telekom-sport    作者:asciidisco    | 项目源码 | 文件源码
def load_session(self):
        """
        Builds a session object with the default headers and, when the
        cookie file exists, restores the pickled cookies into it

        :returns:  requests.session -- Session object
        """
        _session = session()
        _session.headers.update({
            'User-Agent': self.utils.get_user_agent(),
            'Accept-Encoding': 'gzip'
        })
        if path.isfile(self.session_file):
            # Pickle data is binary: open with 'rb'. Text mode ('r') makes
            # pickle.load fail on Python 3 and can corrupt the stream on
            # platforms that translate newlines.
            # NOTE(review): pickle.load runs arbitrary code from the file;
            # only acceptable while the cookie file is written by this
            # addon alone.
            with open(self.session_file, 'rb') as handle:
                try:
                    _cookies = utils.cookiejar_from_dict(pickle.load(handle))
                except EOFError:
                    # Empty file: fall back to an empty cookie jar.
                    _cookies = utils.cookiejar_from_dict({})
                _session.cookies = _cookies
        return _session
项目:jingdong618    作者:hupujrs2017    | 项目源码 | 文件源码
def getCoupon():
    """Wait for the scheduled minute, then request the coupon URL once and log the reply.

    The loop polls the current time (formatted to the minute) until it
    equals ``sched_Timer``. It then installs the cookies returned by
    ``get_cookie()`` on the module-level ``session``, issues the GET with
    the Referer header, and logs the response text.
    """
    # Minute at which the request is fired.
    sched_Timer="2017-06-14 20:00"
    # Coupon request URL.
    couPonUrl="https://api.m.jd.com/client.action?functionId=newBabelAwardCollection&body=%7B%22activityId%22%3A%223tPzkSJZdNRuhgmowhPn7917dcq1%22%2C%22scene%22%3A%221%22%2C%22args%22%3A%22key%3D898c3948b1a44f36b032c8619e2514eb%2CroleId%3D6983488%2Cto%3Dpro.m.jd.com%2Fmall%2Factive%2F3tPzkSJZdNRuhgmowhPn7917dcq1%2Findex.html%22%2C%22mitemAddrId%22%3A%22%22%2C%22geo%22%3A%7B%22lng%22%3A%22%22%2C%22lat%22%3A%22%22%7D%7D&client=wh5&clientVersion=1.0.0&sid=dce17971eb6cbfcc2275dded296bcb58&uuid=1506710045&area=&_=1497422307569&callback=jsonp5"
    # Referer header sent with the request.
    referer="https://pro.m.jd.com/mall/active/3tPzkSJZdNRuhgmowhPn7917dcq1/index.html"

    while True:
        current_minute = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
        if current_minute != sched_Timer:
            continue

        session.cookies = requests.utils.cookiejar_from_dict(get_cookie())
        resp = session.get(url=couPonUrl, headers={'Referer': referer})
        logger.info(resp.text)
        break
项目:Fuck_Login    作者:devzhan    | 项目源码 | 文件源码
def getCaptcha(self):
        """Download the captcha image, open it in the platform viewer, and return the typed answer.

        The image is saved to ``CaptchaImagePath``. It is opened with
        ``open`` on darwin, ``xdg-open`` on linux and ``os.startfile``
        elsewhere.

        Returns:
            str: The captcha text entered by the user.
        """
        captchaImgUrl = 'https://passport.lagou.com/vcode/create?from=register&refresh=%s' % time.time()

        # Download first, then write inside ``with``: the file used to be
        # opened before the request, so a failed request leaked the handle
        # and left an empty image file behind.
        image_bytes = self.session.get(captchaImgUrl, headers=HEADERS).content
        with open(CaptchaImagePath, 'wb') as f:
            f.write(image_bytes)

        # Show the image with whatever the current platform provides.
        if sys.platform.find('darwin') >= 0:
            subprocess.call(['open', CaptchaImagePath])
        elif sys.platform.find('linux') >= 0:
            subprocess.call(['xdg-open', CaptchaImagePath])
        else:
            os.startfile(CaptchaImagePath)

        # Ask the user for the captcha text.
        captcha = input("???????(% s)????: " % CaptchaImagePath)
        print('????????:% s' % captcha)
        return captcha
项目:Fuck_Login    作者:devzhan    | 项目源码 | 文件源码
def get_vdcode():
    """Download the bilibili login captcha, try to display it, and return the typed answer.

    The image is fetched through the module-level ``session`` and written
    to ``captcha.jpg`` in the current working directory.

    Returns:
        str: The text the user entered at the prompt.
    """
    # Millisecond timestamp sent as the ``r`` query parameter.
    t = str(int(time.time()*1000))
    captcha_url = 'https://passport.bilibili.com/captcha.gif?r=' + t + "&type=login"
    r = session.get(captcha_url)
    # The ``with`` block closes the file; no explicit close() is needed.
    with open('captcha.jpg', 'wb') as f:
        f.write(r.content)

    # Display the image via Image (from pillow, per the original comment).
    # If that fails, print the fallback message with the file's path.
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        print(u'?? %s ????captcha.jpg ????' % os.path.abspath('captcha.jpg'))
    captcha = input("please input the captcha\n>")
    return captcha
项目:apimatic-cli    作者:apimatic    | 项目源码 | 文件源码
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
        """Create the HTTP client.

        Args:
            timeout (float): The default global timeout(seconds).
            cache: When truthy, the session is wrapped in ``CacheControl``.
            max_retries: Total retries; used only together with
                ``retry_interval``.
            retry_interval: Backoff factor; used only together with
                ``max_retries``.

        """
        self.timeout = timeout
        self.session = requests.session()

        if max_retries and retry_interval:
            retries = Retry(total=max_retries, backoff_factor=retry_interval)
            # One adapter per scheme, both sharing the same retry policy.
            for prefix in ('http://', 'https://'):
                self.session.mount(prefix, HTTPAdapter(max_retries=retries))

        if cache:
            self.session = CacheControl(self.session)
项目:apimatic-cli    作者:apimatic    | 项目源码 | 文件源码
def execute_as_string(self, request):
        """Send the given HttpRequest and convert the reply with the binary flag off.

        Args:
            request (HttpRequest): The given HttpRequest to execute.

        Returns:
            HttpResponse: The response of the HttpRequest.

        """
        method = HttpMethodEnum.to_string(request.http_method)
        raw = self.session.request(
            method,
            request.query_url,
            headers=request.headers,
            params=request.query_parameters,
            data=request.parameters,
            files=request.files,
            timeout=self.timeout,
        )
        # False is passed as the second argument of convert_response.
        return self.convert_response(raw, False)
项目:apimatic-cli    作者:apimatic    | 项目源码 | 文件源码
def execute_as_binary(self, request):
        """Send the given HttpRequest and convert the reply with the binary flag on.

        Args:
            request (HttpRequest): The given HttpRequest to execute.

        Returns:
            HttpResponse: The response of the HttpRequest.

        """
        method = HttpMethodEnum.to_string(request.http_method)
        raw = self.session.request(
            method,
            request.query_url,
            headers=request.headers,
            params=request.query_parameters,
            data=request.parameters,
            files=request.files,
            timeout=self.timeout,
        )
        # True is passed as the second argument of convert_response.
        return self.convert_response(raw, True)
项目:ArticleSpider    作者:mtianyan    | 项目源码 | 文件源码
def is_login():
    """Return True when the probe page answers with status 200.

    The page is requested through the module-level ``session`` with
    redirects disabled, so a redirect response counts as False.
    """
    inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
    response = session.get(inbox_url, headers=header, allow_redirects=False)
    return response.status_code == 200
项目:ArticleSpider    作者:mtianyan    | 项目源码 | 文件源码
def get_xsrf():
    """Fetch the Zhihu front page and extract the ``_xsrf`` form value.

    Returns:
        str: The token, or an empty string when the page does not contain
        one.
    """
    response = session.get("https://www.zhihu.com", headers=header)
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text, re.DOTALL)
    if match_obj:
        return match_obj.group(1)
    # The ``return`` used to sit inside the ``if``, so a miss returned None
    # instead of the empty-string default the function had prepared.
    return ''
项目:ArticleSpider    作者:mtianyan    | 项目源码 | 文件源码
def get_index():
    """Save the Zhihu front page to ``index_page.html`` (UTF-8) and print "ok"."""
    page = session.get("https://www.zhihu.com", headers=header)
    with open("index_page.html", "wb") as out:
        out.write(page.text.encode("utf-8"))
    print ("ok")
项目:ArticleSpider    作者:mtianyan    | 项目源码 | 文件源码
def zhihu_login(account, password):
    """Log in to Zhihu with a phone number or an e-mail address and save the cookies.

    Args:
        account: Phone number (starts with 1 followed by ten digits) or a
            string containing "@".
        password: Account password.

    Raises:
        ValueError: ``account`` is neither form. Previously this case hit
            the POST with ``post_url``/``post_data`` unbound.
    """
    # Raw string: ``\d`` in a normal string literal is an invalid escape.
    if re.match(r"^1\d{10}", account):
        print ("??????")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha":get_captcha()
        }
    elif "@" in account:
        print("??????")
        post_url = "https://www.zhihu.com/login/email"
        post_data = {
            "_xsrf": get_xsrf(),
            "email": account,
            "password": password
        }
    else:
        raise ValueError(
            "account must be a phone number or an e-mail address: %r" % (account,))

    session.post(post_url, data=post_data, headers=header)
    session.cookies.save()

# get_index()
# is_login()
# get_captcha()
项目:taobao_bra_crawler    作者:nladuo    | 项目源码 | 文件源码
def get_http_client():
    """Return an HTTP session, routed through the local Tor SOCKS5 proxy when configured.

    With ``config['use_tor_proxy']`` truthy, a ``requesocks`` session is
    returned with both http and https pointing at
    ``127.0.0.1:<tor_proxy_port>``; otherwise a plain ``requests`` session.
    """
    if not config['use_tor_proxy']:
        return requests.session()

    session = requesocks.session()
    tor_address = 'socks5://127.0.0.1:%d' % config['tor_proxy_port']
    session.proxies = {'http': tor_address, 'https': tor_address}
    return session


# ??get??
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def __init__(self, username=None, password=None,
                 auto_login=False, get_devices=False, get_automations=False):
        """Init Abode object.

        Optionally logs in (both credentials given and ``auto_login``
        truthy) and pre-loads devices and automations when the matching
        flags are set.
        """
        self._username = username
        self._password = password
        self._session = None
        self._token = None
        self._panel = None
        self._user = None

        self._event_controller = AbodeEventController(self)

        self._default_alarm_mode = CONST.MODE_AWAY

        self._devices = None
        self._automations = None

        # The requests session keeps the cookies between calls.
        self._session = requests.session()

        has_credentials = (self._username is not None and
                           self._password is not None)
        if has_credentials and auto_login:
            self.login()

        if get_devices:
            self.get_devices()

        if get_automations:
            self.get_automations()
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def logout(self):
        """Explicit Abode logout.

        Returns True right away when there is no token. Otherwise the local
        state is reset, the logout URL is POSTed with the old token, and
        True is returned on a 200 answer. An OSError during the request
        yields False; any other status raises AbodeAuthenticationException.
        """
        if not self._token:
            return True

        header_data = {'ABODE-API-KEY': self._token}

        # Local state is reset before the request is sent.
        self._session = requests.session()
        self._token = None
        self._panel = None
        self._user = None
        self._devices = None
        self._automations = None

        try:
            response = self._session.post(
                CONST.LOGOUT_URL, headers=header_data)
            response_object = json.loads(response.text)
        except OSError as exc:
            _LOGGER.warning("Caught exception during logout: %s", str(exc))
            return False

        if response.status_code != 200:
            raise AbodeAuthenticationException(
                (response.status_code, response_object['message']))

        _LOGGER.debug("Logout Response: %s", response.text)
        _LOGGER.info("Logout successful")
        return True
项目:DropboxConnect    作者:raguay    | 项目源码 | 文件源码
def pinned_session(pool_maxsize=8):
    """Return a requests session whose https:// traffic goes through ``_SSLAdapter``.

    Args:
        pool_maxsize: Passed to the adapter as ``pool_maxsize``;
            ``pool_connections`` is fixed at 4.
    """
    ssl_adapter = _SSLAdapter(pool_connections=4, pool_maxsize=pool_maxsize)
    pinned = requests.session()
    pinned.mount('https://', ssl_adapter)
    return pinned
项目:PyZWare    作者:Z-WavePublic    | 项目源码 | 文件源码
def zw_init(url='https://127.0.0.1/', user='test_user', pswd='test_password'):
        """Open the global ZWare session, log in, and return the result of ``zw_api('zw_version')``.

        Args:
            url: Base URL of the server; after login the suffix
                ``'cgi/zcgi/networks//'`` is appended to the stored URL.
            user: Login name, sent as ``usrname``.
            pswd: Password, sent as ``passwd``.
        """
        zwareGlobals.zwareSession = requests.session()
        zwareGlobals.zwareUrl = url
        # apache requires this content type
        form_header = {'Content-Type':'application/x-www-form-urlencoded'}
        zwareGlobals.zwareSession.headers.update(form_header)

        credentials = 'usrname=' + user + '&passwd=' + pswd
        zw_api('register/login.php', credentials)

        zwareGlobals.zwareUrl += 'cgi/zcgi/networks//'
        return zw_api('zw_version')
项目:Crawler_and_Share    作者:f496328mm    | 项目源码 | 文件源码
def main_craw_ptt(i,ptt_class_name,sql_name,bo):
    """Crawl one PTT board index page and hand each reachable article to ``craw_ptt_data_fun``.

    Args:
        i: Index page number; note the name is reused as the row index
            inside the loop.
        ptt_class_name: Board name, e.g. 'Soft_Job'.
        sql_name: Passed through to the history lookup and the crawler.
        bo: 'new' (the history lookup result is used as the max date time)
            or 'his' (max date time is 0).
    """
    index_name = 'http://www.ptt.cc'
    index_url = index_name + '/bbs/' + ptt_class_name + '/index' + str(i) + '.html'

    res = requests.get(index_url, verify=True)
    soup = BeautifulSoup(res.text, "lxml")
    temp = soup.find_all("", {'class': 'r-ent'})

    for i, entry in enumerate(temp):
        link = entry.find('a')
        if str(link) == 'None':
            # Row without a link.
            print('error')
            continue

        article_url = index_name + link['href']
        title = link.get_text()
        response = requests.session().get(article_url)

        if response.status_code == 404:
            print(404)
        elif re.search('[??]', title):
            print('[??]')
        elif response.status_code == 200:
            if bo == 'new':
                # Looked up again for every article.
                max_date_time = catch_ptt_history_date_time(ptt_class_name, sql_name)
            elif bo == 'his':
                max_date_time = 0
            craw_ptt_data_fun(article_url, temp, i, index_url, sql_name, max_date_time, bo)
#---------------------------------------------------------------------------------  
# ???? data, ????????? index=100 ?, ?5????error, ???6?????
项目:Crawler_and_Share    作者:f496328mm    | 项目源码 | 文件源码
def fix_data(i,ptt_class_name,sql_name,bo,j):
    """Crawl one PTT board index page starting from entry position ``j``.

    Builds ``http://www.ptt.cc/bbs/<ptt_class_name>/index<i>.html``, collects
    every element with class ``r-ent`` and processes the entries at
    positions ``j .. len - 1``.  Entries answering 404 and entries whose
    title matches the skip pattern are only reported on stdout; entries
    answering 200 are forwarded to ``craw_ptt_data_fun``.

    Args:
        i: Index page number, interpolated into the index URL.
        ptt_class_name: Board name, e.g. ``'Soft_Job'``.
        sql_name: Passed through to ``catch_ptt_history_date_time`` and
            ``craw_ptt_data_fun``.
        bo: ``'new'`` to use the maximum of the values returned by
            ``catch_ptt_history_date_time`` as cutoff, ``'his'`` to use a
            cutoff of ``0``.
        j: Position of the first entry on the page to process.

    Raises:
        ValueError: If an article answers 200 and ``bo`` is neither
            ``'new'`` nor ``'his'``.
    """
    index_name = 'http://www.ptt.cc'
    index_class = '/bbs/' + ptt_class_name + '/index'
    index_url = index_name + index_class + str(i) + '.html'

    res = requests.get(index_url, verify=True)
    soup = BeautifulSoup(res.text, "lxml")
    temp = soup.find_all("", {'class': 'r-ent'})

    # One session for the whole page: connections are pooled and released
    # when the block exits, instead of one unclosed session per article.
    with requests.session() as session:
        for row_idx in range(j, len(temp)):
            link = temp[row_idx].find('a')
            if link is None:
                # Entry without a link; nothing to fetch.
                print('error')
                continue

            article_url = index_name + link['href']
            title = link.get_text()
            response = session.get(article_url)

            if response.status_code == 404:
                print(404)
            # NOTE(review): the pattern text looks mis-encoded in this copy;
            # as written it is a character class matching a literal '?'.
            # Confirm the intended title tag.
            elif re.search('[??]', title):
                print('[??]')
            elif response.status_code == 200:
                if bo == 'new':
                    date_time = catch_ptt_history_date_time(ptt_class_name, sql_name)
                    max_date_time = max(date_time)
                elif bo == 'his':
                    max_date_time = 0
                else:
                    raise ValueError("bo must be 'new' or 'his', got %r" % (bo,))
                # The scraper receives the entry's position on the page,
                # not the page number.
                craw_ptt_data_fun(article_url, temp, row_idx, index_url,
                                  sql_name, max_date_time, bo)
#---------------------------------------------------------------------------------              
# ?????, ??????
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def __init__(self, instance, server_login, dedi_code, path, pack_mask, server_version, server_build, game='TM2'):
        """
        Set up the state of the dedi api client.

        :param instance: ControllerInstance
        :param server_login: Stored as ``self.server_login``.
        :param dedi_code: Stored as ``self.dedimania_code``.
        :param path: Stored as ``self.path``.
        :param pack_mask: Stored as ``self.pack_mask``.
        :param server_version: Stored as ``self.server_version``.
        :param server_build: Stored as ``self.server_build``.
        :param game: Game info
        :type instance: pyplanet.core.instance.Instance
        """
        # Controller handles.
        self.instance = instance
        self.loop = instance.loop

        # HTTP client and the header set kept alongside it.
        self.client = requests.session()
        user_agent = 'PyPlanet/{}'.format(version)
        xml_content_type = 'text/xml; charset=UTF-8'
        self.headers = {
            'User-Agent': user_agent,
            'Accept': 'text/xml',
            'Accept-Encoding': 'gzip',
            'Content-Type': xml_content_type,
            'Content-Encoding': 'gzip',
            'Keep-Alive': 'timeout=600, max=2000',
            'Connection': 'Keep-Alive',
        }

        # Values supplied by the caller.
        self.game = game
        self.path = path
        self.pack_mask = pack_mask
        self.server_login = server_login
        self.dedimania_code = dedi_code
        self.server_version = server_version
        self.server_build = server_build

        # Runtime state, nothing started yet.
        self.retries = 0
        self.session_id = None
        self.update_task = None
项目:gee-bridge    作者:francbartoli    | 项目源码 | 文件源码
def create_google_session(self):
        """Open a requests session and submit the Google account login form.

        Fetches the account page, copies every named ``<input>`` that carries
        a value from its first ``<form>`` into the payload, adds the stored
        credentials and the ``continue`` target taken from the
        ``X-Auto-Login`` response header, then POSTs the payload to the
        authentication URL.  The response to that POST is not inspected.

        Returns:
            The session object the two requests were made with.
        """
        session = requests.session ()
        login_html = session.get ( DataManagement.__GOOGLE_ACCOUNT_URL )

        # Check cookies returned because there is an issue with the authentication.
        # GAPS , GALX , NID - These cookies are used to identify the user when using Google + functionality.
        # GAPS is still provided.
        cookie_names = session.cookies.get_dict ().keys ()
        self.logger.debug ( cookie_names )
        # Membership test on the plain dict view instead of a bare `except:`
        # around a lookup whose value was never used.
        if 'GALX' not in cookie_names:
            self.logger.error ( 'No cookie GALX' )

        login_form = BeautifulSoup ( login_html.content , 'html.parser' ).find ( 'form' )
        payload = {}
        for field in login_form.find_all ( 'input' ):
            # Inputs without a name cannot be submitted; skip them instead of
            # failing with KeyError.
            if field.has_attr ( 'name' ) and field.has_attr ( 'value' ):
                payload[field['name']] = field['value']

        payload['Email'] = self.__username
        payload['Passwd'] = self.__password

        # NOTE(review): if the response carries no X-Auto-Login header, `auto`
        # is None and the unquote call below will fail -- confirm whether that
        # case needs handling.
        auto = login_html.headers.get ( 'X-Auto-Login' )
        # The header value is unquoted twice; the text after the last
        # 'continue=' is the follow-up URL.
        follow_up = unquote ( unquote ( auto ) ).split ( 'continue=' )[-1]
        payload['continue'] = follow_up

        # The GALX value is intentionally not sent in the payload, as suggested in
        # https://github.com/tracek/gee_asset_manager/issues/36

        session.post ( DataManagement.__AUTHENTICATION_URL , data=payload )
        return session
项目:gee-bridge    作者:francbartoli    | 项目源码 | 文件源码
def __upload_image(self,file_path,session,upload_url,image_name,properties,nodata):
        """Post a local file to the upload URL and start its ingestion.

        Args:
            file_path: Path of the file, opened in binary mode.
            session: Object whose ``post`` method sends the file.
            upload_url: URL the file is posted to.
            image_name: Used as the ``id`` of the ingestion request.
            properties: Used as the ``properties`` of the ingestion request.
            nodata: Used as the ``missingData`` value of the ingestion request.
        """
        with open ( file_path , 'rb' ) as image_file:
            upload_response = session.post ( upload_url , files={'file': image_file} )
            # First element of the JSON reply becomes the primary source path.
            gsid = upload_response.json ()[0]

            source = {"primaryPath": gsid , "additionalPaths": []}
            tileset = {"sources": [source]}
            asset_data = {
                "id": image_name ,
                "tilesets": [tileset] ,
                "bands": [] ,
                "properties": properties ,
                "missingData": {"value": nodata} ,
            }

            new_task_ids = ee.data.newTaskId ( 1 )
            ee.data.startIngestion ( new_task_ids[0] , asset_data )
项目:gee-bridge    作者:francbartoli    | 项目源码 | 文件源码
def data_management(self,session,upload_url,assets_names,file_path, properties, nodata):
        """Upload a local file as an asset, replacing an asset of the same name.

        The asset name is the file name up to its first dot, appended to
        ``self.asset_path``.  If that name is already listed in
        ``assets_names`` the existing asset is deleted before the upload.

        A missing local file is rejected up front.  Previously this case fell
        into the "already uploaded" branch, which deleted the remote asset
        and only then failed while opening the file.

        Args:
            session: Passed through to the upload helper.
            upload_url: Passed through to the upload helper.
            assets_names: Collection of names checked for the file's root name.
            file_path: '/'-separated path of the file to upload.
            properties: Passed through to the upload helper.
            nodata: Passed through to the upload helper.

        Raises:
            IOError: With ``errno.ENOENT`` if ``file_path`` does not exist
                (``FileNotFoundError`` on Python 3).
        """
        import errno

        file_root = file_path.split ( "/" )[-1].split ( "." )[0]
        image_name = self.asset_path + '/%s' % file_root

        # Check the local file first so nothing remote is touched when there
        # is nothing to upload.
        if not os.path.exists ( file_path ):
            self.logger.error ( '%s does not exist - nothing to upload' % file_path )
            raise IOError ( errno.ENOENT , os.strerror ( errno.ENOENT ) , file_path )

        # If the name is already in that asset collection the upload throws an
        # error, so the old asset is removed first.
        if file_root in assets_names:
            self.logger.error("%s already in collection" % file_root)
            self.logger.debug('%s already uploaded in GEE - Deleting old file' % file_root)
            self.__delete_image(image_name)

        self.__upload_image ( file_path , session , upload_url , image_name , properties , nodata )
项目:home-cluster    作者:fabianvf    | 项目源码 | 文件源码
def __init__(self):
        self.inventory = defaultdict(list)  # A list of groups and the hosts in that group
        self.cache = dict()   # Details about hosts in the inventory
        self.params = dict()  # Params of each host
        self.facts = dict()   # Facts of each host
        self.hostgroups = dict()  # host groups
        self.session = None   # Requests session
        self.config_paths = [
            "/etc/ansible/foreman.ini",
            os.path.dirname(os.path.realpath(__file__)) + '/foreman.ini',
        ]
        env_value = os.environ.get('FOREMAN_INI_PATH')
        if env_value is not None:
            self.config_paths.append(os.path.expanduser(os.path.expandvars(env_value)))