Python requests 模块,hooks() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用requests.hooks()

项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def __init__(self):
        self.headers = default_headers()
        self.auth = None
        self.proxies = {}
        self.hooks = default_hooks()
        self.params = {}
        self.stream = False
        self.verify = True
        self.cert = None
        self.max_redirects = DEFAULT_REDIRECT_LIMIT
        self.trust_env = True
        self.cookies = cookiejar_from_dict({})
        self.adapter = HTTPAdapter()
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def prepare_request(self, request):
        cookies = request.cookies or {}

        # Bootstrap CookieJar.
        if not isinstance(cookies, cookielib.CookieJar):
            cookies = cookiejar_from_dict(cookies)

        # Merge with session cookies
        merged_cookies = merge_cookies(
            merge_cookies(RequestsCookieJar(), self.cookies), cookies)

        # Set environment's basic authentication if not explicitly set.
        # auth = request.auth
        # if self.trust_env and not auth and not self.auth:
        #     auth = get_netrc_auth(request.url)

        p = PreparedRequest()
        p.prepare(
            method=request.method.upper(),
            url=request.url,
            files=request.files,
            data=request.data,
            json=request.json,
            headers=merge_setting(request.headers,
                self.headers, dict_class=CaseInsensitiveDict),
            params=merge_setting(request.params, self.params),
            auth=merge_setting(request.auth, self.auth),
            cookies=merged_cookies,
            hooks=merge_hooks(request.hooks, self.hooks),
        )
        return p
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def request(self, method, url,
            params=None, data=None, headers=None, cookies=None, files=None,
            auth=None, timeout=None, allow_redirects=True, proxies=None,
            hooks=None, stream=None, verify=None, cert=None, json=None):

        req = Request(
            method=method.upper(),
            url=url,
            headers=headers,
            files=files,
            data=data or {},
            json=json,
            params=params or {},
            auth=auth,
            cookies=cookies,
            hooks=hooks,
        )
        request = self.prepare_request(req)

        proxies = proxies or {}

        settings = self.merge_environment_settings(
            request.url, proxies, stream, verify, cert
        )

        # Send the request.
        send_kwargs = {
            'timeout': timeout,
            'allow_redirects': allow_redirects,
        }
        send_kwargs.update(settings)

        return self.send(request, **send_kwargs)
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_hook_receives_request_arguments(self, httpbin):
        def hook(resp, **kwargs):
            assert resp is not None
            assert kwargs != {}

        s = requests.Session()
        r = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = s.prepare_request(r)
        s.send(prep)
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
        hook = lambda x, *args, **kwargs: x
        s = requests.Session()
        s.hooks['response'].append(hook)
        r = requests.Request('GET', httpbin())
        prep = s.prepare_request(r)
        assert prep.hooks['response'] != []
        assert prep.hooks['response'] == [hook]
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
        hook1 = lambda x, *args, **kwargs: x
        hook2 = lambda x, *args, **kwargs: x
        assert hook1 is not hook2
        s = requests.Session()
        s.hooks['response'].append(hook2)
        r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
        prep = s.prepare_request(r)
        assert prep.hooks['response'] == [hook1]
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_prepared_request_hook(self, httpbin):
        def hook(resp, **kwargs):
            resp.hook_working = True
            return resp

        req = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = req.prepare()

        s = requests.Session()
        s.proxies = getproxies()
        resp = s.send(prep)

        assert hasattr(resp, 'hook_working')
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_data_argument_accepts_tuples(data):
    """Ensure that the data argument will accept tuples of strings
    and properly encode them.
    """
    p = PreparedRequest()
    p.prepare(
        method='GET',
        url='http://www.example.com',
        data=data,
        hooks=default_hooks()
    )
    assert p.body == urlencode(data)
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_hook_receives_request_arguments(self, httpbin):
        def hook(resp, **kwargs):
            assert resp is not None
            assert kwargs != {}

        s = requests.Session()
        r = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = s.prepare_request(r)
        s.send(prep)
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
        hook = lambda x, *args, **kwargs: x
        s = requests.Session()
        s.hooks['response'].append(hook)
        r = requests.Request('GET', httpbin())
        prep = s.prepare_request(r)
        assert prep.hooks['response'] != []
        assert prep.hooks['response'] == [hook]
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
        hook1 = lambda x, *args, **kwargs: x
        hook2 = lambda x, *args, **kwargs: x
        assert hook1 is not hook2
        s = requests.Session()
        s.hooks['response'].append(hook2)
        r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
        prep = s.prepare_request(r)
        assert prep.hooks['response'] == [hook1]
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_prepared_request_hook(self, httpbin):
        def hook(resp, **kwargs):
            resp.hook_working = True
            return resp

        req = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = req.prepare()

        s = requests.Session()
        s.proxies = getproxies()
        resp = s.send(prep)

        assert hasattr(resp, 'hook_working')
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_data_argument_accepts_tuples(data):
    """Ensure that the data argument will accept tuples of strings
    and properly encode them.
    """
    p = PreparedRequest()
    p.prepare(
        method='GET',
        url='http://www.example.com',
        data=data,
        hooks=default_hooks()
    )
    assert p.body == urlencode(data)
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_hook_receives_request_arguments(self, httpbin):
        def hook(resp, **kwargs):
            assert resp is not None
            assert kwargs != {}

        s = requests.Session()
        r = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = s.prepare_request(r)
        s.send(prep)
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
        hook = lambda x, *args, **kwargs: x
        s = requests.Session()
        s.hooks['response'].append(hook)
        r = requests.Request('GET', httpbin())
        prep = s.prepare_request(r)
        assert prep.hooks['response'] != []
        assert prep.hooks['response'] == [hook]
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
        hook1 = lambda x, *args, **kwargs: x
        hook2 = lambda x, *args, **kwargs: x
        assert hook1 is not hook2
        s = requests.Session()
        s.hooks['response'].append(hook2)
        r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
        prep = s.prepare_request(r)
        assert prep.hooks['response'] == [hook1]
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_prepared_request_hook(self, httpbin):
        def hook(resp, **kwargs):
            resp.hook_working = True
            return resp

        req = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = req.prepare()

        s = requests.Session()
        s.proxies = getproxies()
        resp = s.send(prep)

        assert hasattr(resp, 'hook_working')
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_data_argument_accepts_tuples(data):
    """Ensure that the data argument will accept tuples of strings
    and properly encode them.
    """
    p = PreparedRequest()
    p.prepare(
        method='GET',
        url='http://www.example.com',
        data=data,
        hooks=default_hooks()
    )
    assert p.body == urlencode(data)
项目:plugin.video.jen    作者:midraal    | 项目源码 | 文件源码
def send(self, request, **kwargs):
        if (self._is_cache_disabled
            or request.method not in self._cache_allowable_methods):
            response = super(CachedSession, self).send(request, **kwargs)
            response.from_cache = False
            return response

        cache_key = self.cache.create_key(request)

        def send_request_and_cache_response():
            response = super(CachedSession, self).send(request, **kwargs)
            if response.status_code in self._cache_allowable_codes:
                self.cache.save_response(cache_key, response)
            response.from_cache = False
            return response

        response, timestamp = self.cache.get_response_and_time(cache_key)
        if response is None:
            return send_request_and_cache_response()

        if self._cache_expire_after is not None:
            is_expired = datetime.utcnow() - timestamp > self._cache_expire_after
            if is_expired:
                if not self._return_old_data_on_error:
                    self.cache.delete(cache_key)
                    return send_request_and_cache_response()
                try:
                    new_response = send_request_and_cache_response()
                except Exception:
                    return response
                else:
                    if new_response.status_code not in self._cache_allowable_codes:
                        return response
                    return new_response

        # dispatch hook here, because we've removed it before pickling
        response.from_cache = True
        response = dispatch_hook('response', request.hooks, response, **kwargs)
        return response
项目:repository.midraal    作者:midraal    | 项目源码 | 文件源码
def send(self, request, **kwargs):
        if (self._is_cache_disabled
            or request.method not in self._cache_allowable_methods):
            response = super(CachedSession, self).send(request, **kwargs)
            response.from_cache = False
            return response

        cache_key = self.cache.create_key(request)

        def send_request_and_cache_response():
            response = super(CachedSession, self).send(request, **kwargs)
            if response.status_code in self._cache_allowable_codes:
                self.cache.save_response(cache_key, response)
            response.from_cache = False
            return response

        response, timestamp = self.cache.get_response_and_time(cache_key)
        if response is None:
            return send_request_and_cache_response()

        if self._cache_expire_after is not None:
            is_expired = datetime.utcnow() - timestamp > self._cache_expire_after
            if is_expired:
                if not self._return_old_data_on_error:
                    self.cache.delete(cache_key)
                    return send_request_and_cache_response()
                try:
                    new_response = send_request_and_cache_response()
                except Exception:
                    return response
                else:
                    if new_response.status_code not in self._cache_allowable_codes:
                        return response
                    return new_response

        # dispatch hook here, because we've removed it before pickling
        response.from_cache = True
        response = dispatch_hook('response', request.hooks, response, **kwargs)
        return response
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_hook_receives_request_arguments(self, httpbin):
        def hook(resp, **kwargs):
            assert resp is not None
            assert kwargs != {}

        requests.Request('GET', httpbin(), hooks={'response': hook})
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
        hook = lambda x, *args, **kwargs: x
        s = requests.Session()
        s.hooks['response'].append(hook)
        r = requests.Request('GET', httpbin())
        prep = s.prepare_request(r)
        assert prep.hooks['response'] != []
        assert prep.hooks['response'] == [hook]
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
        hook1 = lambda x, *args, **kwargs: x
        hook2 = lambda x, *args, **kwargs: x
        assert hook1 is not hook2
        s = requests.Session()
        s.hooks['response'].append(hook2)
        r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
        prep = s.prepare_request(r)
        assert prep.hooks['response'] == [hook1]
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_prepared_request_hook(self, httpbin):
        def hook(resp, **kwargs):
            resp.hook_working = True
            return resp

        req = requests.Request('GET', httpbin(), hooks={'response': hook})
        prep = req.prepare()

        s = requests.Session()
        s.proxies = getproxies()
        resp = s.send(prep)

        assert hasattr(resp, 'hook_working')
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_data_argument_accepts_tuples(list_of_tuples):
    """
    Ensure that the data argument will accept tuples of strings
    and properly encode them.
    """
    for data in list_of_tuples:
        p = PreparedRequest()
        p.prepare(
            method='GET',
            url='http://www.example.com',
            data=data,
            hooks=default_hooks()
        )
        assert p.body == urlencode(data)
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_prepared_request_no_cookies_copy():
    p = PreparedRequest()
    p.prepare(
        method='GET',
        url='http://www.example.com',
        data='foo=bar',
        hooks=default_hooks()
    )
    assert_copy(p, p.copy())
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_prepared_request_complete_copy():
    p = PreparedRequest()
    p.prepare(
        method='GET',
        url='http://www.example.com',
        data='foo=bar',
        hooks=default_hooks(),
        cookies={'foo': 'bar'}
    )
    assert_copy(p, p.copy())
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def send(self, request, **kwargs):
        if (self._is_cache_disabled
            or request.method not in self._cache_allowable_methods):
            response = super(CachedSession, self).send(request, **kwargs)
            response.from_cache = False
            return response

        cache_key = self.cache.create_key(request)

        def send_request_and_cache_response():
            if self._deny_outbound:
                print(request.url)
                raise Exception(("ERROR: OutBound communication was attempted,"
                                 " but deny_outbound was set to True"))

            cache_response = True
            response = super(CachedSession, self).send(request, **kwargs)
            if response.status_code in self._cache_allowable_codes:

                #
                # Special case for cblr:
                # if we get a status of pending then don't cache
                #
                try:
                    if request.url.find('cblr') != -1 and request.method == 'GET':
                        if isinstance(response.json(), dict) and response.json().get('status', '') == 'pending':
                            cache_response = False
                except:
                    cache_response = True
                if cache_response:
                    self.cache.save_response(cache_key, response)

            response.from_cache = False
            return response

        response = self.cache.get_response(cache_key)
        if response is None:
            return send_request_and_cache_response()

        if 'Content-Encoding' in response.headers:
            del response.headers['Content-Encoding']

        adapter = HTTPAdapter()
        response = adapter.build_response(request, response)


        # dispatch hook here, because we've removed it before pickling
        response.from_cache = True
        response = dispatch_hook('response', request.hooks, response, **kwargs)
        return response
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def send(self, req, **kwargs):
        """Send a given PreparedRequest.

        :rtype: trip.gen.Future
        """

        if not isinstance(req, PreparedRequest):
            raise ValueError('You can only send PreparedRequests.')

        allow_redirects = kwargs.pop('allow_redirects', True)
        start_time = preferred_clock()

        r = yield self.adapter.send(req, **kwargs)

        if isinstance(r, Exception):
            raise gen.Return(r)
        else:
            r = self.prepare_response(req, r)

        r.elapsed = timedelta(seconds=(preferred_clock()-start_time))

        # Response manipulation hooks
        r = dispatch_hook('response', req.hooks, r, **kwargs)

        # Persist cookies
        if r.history:
            # If the hooks create history then we want those cookies too
            for resp in r.history:
                self.cookies.extract_cookies(
                    MockResponse(HTTPHeaderDict(resp.headers)), MockRequest(resp.request))

        self.cookies.extract_cookies(
            MockResponse(HTTPHeaderDict(r.headers)), MockRequest(req))

        # Redirect resolving generator.
        redirect_gen = self.resolve_redirects(r, req, **kwargs)

        # Resolve redirects if allowed.
        history = []
        if allow_redirects:
            for resp in redirect_gen:
                resp = yield resp
                history.append(resp)

        # Shuffle things around if there's history.
        if history:
            # Insert the first (original) request at the start
            history.insert(0, r)
            # Get the last request made
            r = history.pop()
            r.history = history

        # If redirects aren't being followed, store the response on the Request for Response.next().
        if not allow_redirects:
            try:
                r._next = next(self.resolve_redirects(r, req, yield_requests=True, **kwargs))
            except StopIteration:
                pass

        raise gen.Return(r)