Python oauth2 模块，Request() 实例源码

我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 oauth2.Request()。

项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def __two_legged_request(self, parameters=None, method=None):
        """Build and sign an OAuth request for two-legged authentication.

        The request targets ``self.endpoint``. Its parameters start from
        ``self.get_base_params()`` and are extended with ``parameters`` when
        given. ``method`` falls back to "GET" when not supplied.

        Returns the signed ``oauth.Request``.
        """
        merged_params = self.get_base_params()
        if parameters:
            merged_params.update(parameters)
        endpoint_url = self.endpoint
        yql_logger.debug("params: %s", merged_params)
        yql_logger.debug("endpoint_url: %s", endpoint_url)
        http_method = method or "GET"

        consumer = oauth.Consumer(self.api_key, self.secret)
        signed_request = oauth.Request(
            method=http_method, url=endpoint_url, parameters=merged_params)
        signature = self.get_signature(endpoint_url)
        yql_logger.debug("signature: %s", signature)
        # Two-legged flow: there is no token, so None is passed for it.
        signed_request.sign_request(signature, consumer, None)
        return signed_request
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def get_token_and_auth_url(self, callback_url=None):
        """Fetch a request token together with the authorisation URL.

        POSTs to REQUEST_TOKEN_URL with ``oauth_callback`` set to
        ``callback_url``, or to 'oob' when no callback is supplied.

        Returns a ``(token, auth_url)`` tuple when the response status is
        '200'; raises YQLError for any other status.
        """
        http_client = oauth.Client(self.consumer)

        token_request = oauth.Request(
            parameters={'oauth_callback': callback_url or 'oob'})
        url = REQUEST_TOKEN_URL
        resp, content = http_client.request(
            url, "POST", token_request.to_postdata())

        # Guard clause: anything other than a 200 is reported as an error.
        if resp.get('status') != '200':
            raise YQLError(resp, content, url)

        token = oauth.Token.from_string(content)
        yql_logger.debug("token: %s", token)
        response_fields = dict(parse_qsl(content))
        yql_logger.debug("data: %s", response_fields)
        return token, response_fields['xoauth_request_auth_url']
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def __two_legged_request(self, resource_url, parameters=None, method=None):
        """Create a signed oauth.Request for two-legged authentication.

        ``parameters`` (if any) are merged over ``self.get_base_params()`` and
        ``method`` falls back to "GET" when not supplied. The request for
        ``resource_url`` is signed with ``self.hmac_sha1_signature`` and no
        token, then returned.
        """
        request_params = self.get_base_params()
        if parameters:
            request_params.update(parameters)

        yql_logger.debug("params: %s", request_params)
        yql_logger.debug("resource_url: %s", resource_url)
        http_method = method or "GET"

        consumer = oauth.Consumer(self.api_key, self.secret)
        signed_request = oauth.Request(
            method=http_method, url=resource_url, parameters=request_params)
        # Two-legged flow: there is no token, so None is passed for it.
        signed_request.sign_request(self.hmac_sha1_signature, consumer, None)
        return signed_request
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def get_token_and_auth_url(self, callback_url=None):
        """First step is to get the token and then send the request that
        provides the auth URL.

        The request is POSTed to REQUEST_TOKEN_URL with ``oauth_callback``
        set to ``callback_url``, or to 'oob' when no callback is given.

        Returns a tuple of token and the authorisation URL.

        Raises YQLError when the response status is not '200'.
        """

        client = oauth.Client(self.consumer)

        params = {}
        params['oauth_callback'] = callback_url or 'oob'

        request = oauth.Request(parameters=params)
        url = REQUEST_TOKEN_URL
        resp, content = client.request(url, "POST", request.to_postdata())

        if resp.get('status') == '200':
            token = oauth.Token.from_string(content)
            yql_logger.debug("token: %s", token)
            data = dict(parse_qsl(content))
            yql_logger.debug("data: %s", data)
            return token, data['xoauth_request_auth_url']
        else:
            # Call form instead of ``raise Cls, (args)``: the comma form is a
            # SyntaxError on Python 3 and means the same thing on Python 2.
            raise YQLError(resp, content, url)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_nonoauth_parameters(self):
        """get_nonoauth_parameters() returns exactly the non-oauth entries."""
        expected = {
            u('foo'): u('baz'),
            u('bar'): u('foo'),
            u('multi'): [u('FOO'), u('BAR')],
            u('uni_utf8'): u(b'\xae', 'latin1'),
            u('uni_unicode'): _UGLYPH,
            u('uni_unicode_2'):
                u(b'\xc3\xa5\xc3\x85\xc3\xb8\xc3\x98', 'latin1'), # 'åÅøØ'
        }

        # The request receives one oauth_* entry plus all expected entries.
        all_params = {
            'oauth_consumer': 'asdfasdfasdf'
        }
        all_params.update(expected)

        req = oauth.Request("GET", "http://example.com", all_params)
        self.assertEqual(expected, req.get_nonoauth_parameters())
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_to_postdata(self):
        """to_postdata() expands a list value into one key/value pair per item."""
        realm = "http://sp.example.com/"

        params = {
            'multi': ['FOO','BAR'],
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", realm, params)

        def by_name(pair):
            # Sort on the parameter name only; the sort is stable, so the
            # relative order of the two 'multi' pairs is preserved.
            return pair[0]

        # Expected pairs: 'multi' flattened, every other entry unchanged.
        expected_pairs = [('multi','FOO'),('multi','BAR')]
        expected_pairs += [
            (name, value) for name, value in params.items() if name != 'multi']

        actual_pairs = parse_qsl(req.to_postdata())
        self.assertEqual(
                sorted(expected_pairs, key=by_name),
                sorted(actual_pairs, key=by_name))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_to_url(self):
        """to_url() keeps scheme, host and path and carries the parameters as the query."""
        base_url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", base_url, params)
        expected = urlparse("%s?%s" % (base_url, urlencode(params)))
        actual = urlparse(req.to_url())
        for component in ('scheme', 'netloc', 'path'):
            self.assertEqual(getattr(expected, component),
                             getattr(actual, component))

        # Compare parsed queries so that parameter ordering is irrelevant.
        self.assertEqual(parse_qs(expected.query), parse_qs(actual.query))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_normalized_parameters_from_url(self):
        """Parameters given only in the URL query come back normalized."""
        # example copied from
        # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
        # which in turns says that it was copied from
        # http://oauth.net/core/1.0/#sig_base_example .
        query = "&".join([
            "file=vacation.jpg",
            "oauth_consumer_key=dpf43f3p2l4k3l03",
            "oauth_nonce=kllo9940pd9333jh",
            "oauth_signature_method=HMAC-SHA1",
            "oauth_timestamp=1191242096",
            "oauth_token=nnch734d00sl2jdk",
            "oauth_version=1.0",
            "size=original",
        ])

        req = oauth.Request(
            "GET", "http://photos.example.net/photos?" + query)

        # The query is already in the expected order, so the normalized
        # string must equal it unchanged.
        self.assertEqual(query, req.get_normalized_parameters())
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_normalized_parameters_ignores_auth_signature(self):
        """get_normalized_parameters() leaves oauth_signature out of its result."""
        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
            'oauth_token': "ad180jjd733klru7",
        }

        req = oauth.Request("GET", "http://sp.example.com/", params)
        normalized = req.get_normalized_parameters()

        # With the signature included the encoded string must differ ...
        self.assertNotEqual(urlencode(sorted(params.items())), normalized)

        # ... and with it removed the encoded string must match.
        unsigned = dict(
            (name, value) for name, value in params.items()
            if name != "oauth_signature")
        self.assertEqual(urlencode(sorted(unsigned.items())), normalized)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_from_token_and_callback(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        req = oauth.Request.from_token_and_callback(tok)
        self.assertFalse('oauth_callback' in req)
        self.assertEqual(req['oauth_token'], tok.key)

        req = oauth.Request.from_token_and_callback(tok, callback=url)
        self.assertTrue('oauth_callback' in req)
        self.assertEqual(req['oauth_callback'], url)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def setUp(self):
        """Create a consumer, a token and a signed GET request as fixtures."""
        self.consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59,
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
        self.request = oauth.Request(
            method="GET", url="http://sp.example.com/",
            parameters=request_params)

        # Sign with HMAC-SHA1 using both the consumer and the token.
        self.request.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def sign_request(self, url, url_params=None):
        """Return ``url`` as a GET URL signed with HMAC-SHA1.

        Args:
            url: The URL to sign.
            url_params: Optional dict of request parameters. Defaults to an
                empty dict created per call.

        Returns:
            The signed request rendered with ``to_url()``.

        The nonce and timestamp are freshly generated; the token key and
        consumer key come from ``self.token`` and ``self.consumer``.
        """
        # A dict literal as default would be one object shared by all calls.
        if url_params is None:
            url_params = {}
        oauth_request = oauth2.Request(
            method="GET",
            url=url,
            parameters=url_params
        )
        oauth_request.update(
            {
                'oauth_nonce': oauth2.generate_nonce(),
                'oauth_timestamp': oauth2.generate_timestamp(),
                'oauth_token': self.token.key,
                'oauth_consumer_key': self.consumer.key
            }
        )
        oauth_request.sign_request(
            oauth2.SignatureMethod_HMAC_SHA1(),
            self.consumer,
            self.token
        )
        return oauth_request.to_url()
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_get_nonoauth_parameters(self):
        """get_nonoauth_parameters() returns exactly the non-oauth entries."""
        expected = {
            u('foo'): u('baz'),
            u('bar'): u('foo'),
            u('multi'): [u('FOO'), u('BAR')],
            u('uni_utf8'): u(b'\xae', 'latin1'),
            u('uni_unicode'): _UGLYPH,
            u('uni_unicode_2'):
                u(b'\xc3\xa5\xc3\x85\xc3\xb8\xc3\x98', 'latin1'), # 'åÅøØ'
        }

        # The request receives one oauth_* entry plus all expected entries.
        all_params = {
            'oauth_consumer': 'asdfasdfasdf'
        }
        all_params.update(expected)

        req = oauth.Request("GET", "http://example.com", all_params)
        self.assertEqual(expected, req.get_nonoauth_parameters())
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_to_postdata(self):
        """to_postdata() expands a list value into one key/value pair per item."""
        realm = "http://sp.example.com/"

        params = {
            'multi': ['FOO','BAR'],
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", realm, params)

        def by_name(pair):
            # Sort on the parameter name only; the sort is stable, so the
            # relative order of the two 'multi' pairs is preserved.
            return pair[0]

        # Expected pairs: 'multi' flattened, every other entry unchanged.
        expected_pairs = [('multi','FOO'),('multi','BAR')]
        expected_pairs += [
            (name, value) for name, value in params.items() if name != 'multi']

        actual_pairs = parse_qsl(req.to_postdata())
        self.assertEqual(
                sorted(expected_pairs, key=by_name),
                sorted(actual_pairs, key=by_name))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_to_url(self):
        """to_url() keeps scheme, host and path and carries the parameters as the query."""
        base_url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", base_url, params)
        expected = urlparse("%s?%s" % (base_url, urlencode(params)))
        actual = urlparse(req.to_url())
        for component in ('scheme', 'netloc', 'path'):
            self.assertEqual(getattr(expected, component),
                             getattr(actual, component))

        # Compare parsed queries so that parameter ordering is irrelevant.
        self.assertEqual(parse_qs(expected.query), parse_qs(actual.query))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_get_normalized_parameters_from_url(self):
        """Parameters given only in the URL query come back normalized."""
        # example copied from
        # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
        # which in turns says that it was copied from
        # http://oauth.net/core/1.0/#sig_base_example .
        query = "&".join([
            "file=vacation.jpg",
            "oauth_consumer_key=dpf43f3p2l4k3l03",
            "oauth_nonce=kllo9940pd9333jh",
            "oauth_signature_method=HMAC-SHA1",
            "oauth_timestamp=1191242096",
            "oauth_token=nnch734d00sl2jdk",
            "oauth_version=1.0",
            "size=original",
        ])

        req = oauth.Request(
            "GET", "http://photos.example.net/photos?" + query)

        # The query is already in the expected order, so the normalized
        # string must equal it unchanged.
        self.assertEqual(query, req.get_normalized_parameters())
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_get_normalized_parameters_ignores_auth_signature(self):
        """get_normalized_parameters() leaves oauth_signature out of its result."""
        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
            'oauth_token': "ad180jjd733klru7",
        }

        req = oauth.Request("GET", "http://sp.example.com/", params)
        normalized = req.get_normalized_parameters()

        # With the signature included the encoded string must differ ...
        self.assertNotEqual(urlencode(sorted(params.items())), normalized)

        # ... and with it removed the encoded string must match.
        unsigned = dict(
            (name, value) for name, value in params.items()
            if name != "oauth_signature")
        self.assertEqual(urlencode(sorted(unsigned.items())), normalized)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_from_token_and_callback(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        req = oauth.Request.from_token_and_callback(tok)
        self.assertFalse('oauth_callback' in req)
        self.assertEqual(req['oauth_token'], tok.key)

        req = oauth.Request.from_token_and_callback(tok, callback=url)
        self.assertTrue('oauth_callback' in req)
        self.assertEqual(req['oauth_callback'], url)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def setUp(self):
        """Create a consumer, a token and a signed GET request as fixtures."""
        self.consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59,
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
        self.request = oauth.Request(
            method="GET", url="http://sp.example.com/",
            parameters=request_params)

        # Sign with HMAC-SHA1 using both the consumer and the token.
        self.request.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def refresh_token(self, token):
        """Exchange an expired access token for a fresh one.

        Access tokens only last for one hour from the point of being issued;
        once expired they have to be refreshed through this method.

        ``token`` may be a token object or a token string. Anything without a
        ``key`` attribute is parsed with ``YahooToken.from_string``.

        Returns the new access token with ``timestamp`` set to the request's
        ``oauth_timestamp``; raises YQLError when the response status is not
        '200'.
        """
        expired = token
        if not hasattr(expired, "key"):
            expired = YahooToken.from_string(expired)

        request_params = self.get_base_params()
        request_params['oauth_token'] = expired.key
        request_params['oauth_token_secret'] = expired.secret
        request_params['oauth_session_handle'] = expired.session_handle

        oauth_request = oauth.Request.from_consumer_and_token(
            self.consumer,
            token=expired,
            http_url=ACCESS_TOKEN_URL,
            http_method="POST",
            parameters=request_params)

        yql_logger.debug("oauth_request: %s", oauth_request)
        oauth_request.sign_request(
            self.hmac_sha1_signature, self.consumer, expired)

        url = oauth_request.to_url()
        yql_logger.debug("oauth_url: %s", url)
        postdata = oauth_request.to_postdata()
        yql_logger.debug("oauth_postdata: %s", postdata)
        resp, content = self.http.request(url, "POST", postdata)

        # Guard clause: anything other than a 200 is reported as an error.
        if resp.get('status') != '200':
            raise YQLError(resp, content, url)

        refreshed = YahooToken.from_string(content)
        yql_logger.debug("oauth_access_token: %s", refreshed)
        refreshed.timestamp = oauth_request['oauth_timestamp']
        return refreshed
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def get_uri(self, query, params=None, **kwargs):
        """Return the signed request URL for ``query``.

        A string ``query`` is wrapped in ``YQLQuery`` first. The ``token``
        keyword argument is required; when it has a ``yahoo_guid`` attribute
        that value is sent as ``oauth_yahoo_guid``.

        Raises ValueError when no token is supplied.
        """
        yql_query = YQLQuery(query) if isinstance(query, basestring) else query
        query_params = self.get_query_params(yql_query, params, **kwargs)

        token = kwargs.get("token")

        if hasattr(token, "yahoo_guid"):
            query_params["oauth_yahoo_guid"] = token.yahoo_guid

        if not token:
            raise ValueError(
                "Without a token three-legged-auth cannot be carried out")

        yql_logger.debug("query_params: %s", query_params)
        endpoint = self.endpoint
        oauth_request = oauth.Request.from_consumer_and_token(
            self.consumer,
            http_url=endpoint,
            token=token,
            parameters=query_params,
            http_method=yql_query.get_http_method())
        yql_logger.debug("oauth_request: %s", oauth_request)

        # Sign request
        oauth_request.sign_request(
            self.get_signature(endpoint), self.consumer, token)
        yql_logger.debug("oauth_signed_request: %s", oauth_request)

        signed_url = clean_url(oauth_request.to_url())
        return signed_url.replace('+', '%20').replace('%7E', '~')
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def refresh_token(self, token):
        """Access Tokens only last for one hour from the point of being issued.

        When a token has expired it needs to be refreshed this method takes an
        expired token and refreshes it.

        token parameter can be either a token object or a token string.

        Returns the new access token, with ``timestamp`` set to the
        ``oauth_timestamp`` of the refresh request.

        Raises YQLError when the response status is not '200'.
        """
        # Anything without a ``key`` attribute is treated as a token string.
        if not hasattr(token, "key"):
            token = YahooToken.from_string(token)

        params = self.get_base_params()
        params['oauth_token'] = token.key
        params['oauth_token_secret'] = token.secret
        params['oauth_session_handle'] = token.session_handle

        oauth_request = oauth.Request.from_consumer_and_token(
                               self.consumer, token=token,
                               http_url=ACCESS_TOKEN_URL,
                               http_method="POST",
                               parameters=params)

        yql_logger.debug("oauth_request: %s", oauth_request)
        oauth_request.sign_request(
                self.hmac_sha1_signature, self.consumer, token)

        url = oauth_request.to_url()
        yql_logger.debug("oauth_url: %s", url)
        postdata = oauth_request.to_postdata()
        yql_logger.debug("oauth_postdata: %s", postdata)
        resp, content = self.http.request(url, "POST", postdata)

        if resp.get('status') == '200':
            access_token = YahooToken.from_string(content)
            yql_logger.debug("oauth_access_token: %s", access_token)
            access_token.timestamp = oauth_request['oauth_timestamp']
            return access_token
        else:
            # Call form instead of ``raise Cls, (args)``: the comma form is a
            # SyntaxError on Python 3 and means the same thing on Python 2.
            raise YQLError(resp, content, url)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def get_uri(self, query, params=None, **kwargs):
        """Get the request url.

        The ``token`` keyword argument is required. When the token has a
        ``yahoo_guid`` attribute, that value is sent as ``oauth_yahoo_guid``.

        Returns ``self.uri`` followed by the signed request's post data as the
        query string, with '+' replaced by '%20' and '%7E' by '~'.

        Raises ValueError when no token is supplied.
        """
        query_params = self.get_query_params(query, params, **kwargs)

        token = kwargs.get("token")

        if hasattr(token, "yahoo_guid"):
            query_params["oauth_yahoo_guid"] = getattr(token, "yahoo_guid")

        if not token:
            # Call form instead of ``raise Cls, "msg"``: the comma form is a
            # SyntaxError on Python 3 and means the same thing on Python 2.
            raise ValueError("Without a token three-legged-auth cannot be"
                             " carried out")

        yql_logger.debug("query_params: %s", query_params)
        http_method = get_http_method(query)
        oauth_request = oauth.Request.from_consumer_and_token(
                                        self.consumer, http_url=self.uri,
                                        token=token, parameters=query_params,
                                        http_method=http_method)
        yql_logger.debug("oauth_request: %s", oauth_request)
        # Sign request
        oauth_request.sign_request(
                            self.hmac_sha1_signature, self.consumer, token)

        yql_logger.debug("oauth_signed_request: %s", oauth_request)
        uri = "%s?%s" % (self.uri,  oauth_request.to_postdata())
        return uri.replace('+', '%20').replace('%7E', '~')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test__init__(self):
        """A Request built without a URL has neither url nor normalized_url."""
        req = oauth.Request("GET")
        url_attrs = ('url', 'normalized_url')
        # Neither name is stored on the instance ...
        for attr in url_attrs:
            self.assertFalse(attr in req.__dict__)
        # ... and reading either one raises AttributeError.
        for attr in url_attrs:
            self.assertRaises(AttributeError, getattr, req, attr)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_setter(self):
        """Assigning Request.url also updates normalized_url."""
        base = "http://example.com"
        req = oauth.Request("GET", base)
        self.assertEqual(req.url, base)
        self.assertEqual(req.normalized_url, base)

        # The query string stays on url but is absent from normalized_url.
        with_query = base + '/?foo=bar'
        req.url = with_query
        self.assertEqual(req.url, with_query)
        self.assertEqual(req.normalized_url, base + '/')

        # Assigning None resets both attributes to None.
        req.url = None
        self.assertEqual(req.url, None)
        self.assertEqual(req.normalized_url, None)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_deleter(self):
        """After ``del req.url`` reading the attribute raises AttributeError."""
        req = oauth.Request("GET", "http://example.com")
        del req.url
        self.assertRaises(AttributeError, getattr, req, 'url')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_url(self):
        """normalized_url drops the default port while url keeps the original."""
        cases = (
            ("http://example.com:80/foo.php", "http://example.com/foo.php"),
            ("https://example.com:443/foo.php", "https://example.com/foo.php"),
        )

        for raw_url, expected_normalized in cases:
            req = oauth.Request("GET", raw_url)
            self.assertEqual(req.normalized_url, expected_normalized)
            self.assertEqual(req.url, raw_url)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_bad_url(self):
        """Assigning a URL with an ftp scheme must raise ValueError."""
        req = oauth.Request()
        try:
            req.url = "ftp://example.com"
        except ValueError:
            # Expected: the scheme was rejected.
            pass
        else:
            self.fail("Invalid URL scheme was accepted.")
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_no_url_set(self):
        """Signing a Request that has no URL raises ValueError."""
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        unsigned = oauth.Request()
        # Built outside the protected region so that only sign_request itself
        # is expected to raise.
        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        with self.assertRaises(ValueError):
            unsigned.sign_request(signature_method, consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_url_query(self):
        """url keeps the query string; normalized_url is scheme, host and path only."""
        full_url = ("https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10")
        scheme, netloc, path = urlparse(full_url)[:3]
        expected_normalized = urlunparse(
            (scheme, netloc, path, None, None, None))

        req = oauth.Request("GET", full_url)
        self.assertEqual(req.url, full_url)
        self.assertEqual(req.normalized_url, expected_normalized)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_parameter(self):
        """get_parameter() returns known values and raises oauth.Error otherwise."""
        req = oauth.Request(
            "GET", "http://example.com",
            parameters={'oauth_consumer' : 'asdf'})

        self.assertEqual(req.get_parameter('oauth_consumer'), 'asdf')
        # A name that was never set is an error, not a default value.
        self.assertRaises(oauth.Error, req.get_parameter, 'blah')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_to_url_nonascii(self):
        url = "http://sp.example.com/"

        params = {
            'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", url, params)
        res = urlparse(req.to_url())

        params['nonasciithing'] = params['nonasciithing'].encode('utf-8')
        exp = urlparse("%s?%s" % (url, urlencode(params)))

        self.assertEquals(exp.netloc, res.netloc)
        self.assertEquals(exp.path, res.path)

        a = parse_qs(exp.query)
        b = parse_qs(res.query)
        self.assertEquals(a, b)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_to_header(self):
        realm = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", realm, params)
        header, value = list(req.to_header(realm).items())[0]

        parts = value.split('OAuth ')
        vars = parts[1].split(', ')
        self.assertTrue(len(vars), (len(params) + 1))

        res = {}
        for v in vars:
            var, val = v.split('=')
            res[var] = unquote(val.strip('"'))

        self.assertEqual(realm, res['realm'])
        del res['realm']

        self.assertTrue(len(res), len(params))

        for key, val in res.items():
            self.assertEqual(val, params.get(key))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_to_postdata_nonascii(self):
        """to_postdata() percent-encodes a non-ASCII parameter value."""
        realm = "http://sp.example.com/"

        params = {
            'nonasciithing': u('q\xbfu\xe9 ,aasp u?..a.s', 'latin1'),
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", realm, params)

        # Expected body, one encoded pair per entry, in the order the
        # original literal listed them.
        expected_postdata = '&'.join([
            'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s',
            'oauth_consumer_key=0685bd9184jfhq22',
            'oauth_nonce=4572616e48616d6d65724c61686176',
            'oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D',
            'oauth_signature_method=HMAC-SHA1',
            'oauth_timestamp=137131200',
            'oauth_token=ad180jjd733klru7',
            'oauth_version=1.0',
        ])

        self.assertReallyEqual(req.to_postdata(), expected_postdata)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_to_url_with_query(self):
        """to_url() keeps the URL's own query parameters next to the oauth ones."""
        base_url = ("https://www.google.com/m8/feeds/contacts/default/full/"
                    "?alt=json&max-contacts=10")

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        req = oauth.Request("GET", base_url, params)
        # Note: the url above already has query parameters, so append new
        # ones with &
        expected = urlparse("%s&%s" % (base_url, urlencode(params)))
        actual = urlparse(req.to_url())
        for component in ('scheme', 'netloc', 'path'):
            self.assertEqual(getattr(expected, component),
                             getattr(actual, component))

        expected_query = parse_qs(expected.query)
        actual_query = parse_qs(actual.query)
        # The two parameters of the original URL must be present ...
        for name in ('alt', 'max-contacts'):
            self.assertTrue(name in actual_query)
        # ... with their original values ...
        self.assertEqual(actual_query['alt'], ['json'])
        self.assertEqual(actual_query['max-contacts'], ['10'])
        # ... and the complete query must match the expected one.
        self.assertEqual(expected_query, actual_query)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_string_bytes_nonascii_nonutf8(self):
        """Sign a bytes URL whose query embeds the raw ``_B2766`` bytes."""
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url_prefix = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                      b'?q=monkeys&category=animal'
                      b'&address=41+Decatur+St,+San+Francisc')
        raw_url = url_prefix + _B2766 + b',+CA'
        expected_normalized = u(
            'http://api.simplegeo.com/1.0/places/address.json')
        expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='

        req = oauth.Request("GET", raw_url)
        self.assertReallyEqual(req.normalized_url, expected_normalized)

        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        # XXX marker carried over from the original assertion.
        self.assertReallyEqual(req['oauth_signature'], expected_signature)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
        """Sign a bytes URL whose non-ASCII query part is percent-encoded."""
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        raw_url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                   b'?q=monkeys&category=animal'
                   b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
        expected_normalized = u(
            'http://api.simplegeo.com/1.0/places/address.json')
        expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='

        req = oauth.Request("GET", raw_url)
        self.assertReallyEqual(req.normalized_url, expected_normalized)

        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual(req['oauth_signature'], expected_signature)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_unicode_nonascii_nonutf8_url_encoded(self):
        """Sign a GET whose text URL carries a percent-encoded ``%E2%9D%A6``.

        The URL is built through the ``u()`` helper rather than as bytes. The
        test expects ``normalized_url`` to drop the ``:80`` port and the query
        string, and expects the HMAC-SHA1 signature (consumer only, no token)
        to equal a fixed value.
        """
        text_url = u(
            'http://api.simplegeo.com:80/1.0/places/address.json'
            '?q=monkeys&category=animal'
            '&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA'
        )
        request = oauth.Request("GET", text_url)

        expected_normalized = u(
            'http://api.simplegeo.com/1.0/places/address.json')
        self.assertReallyEqual(request.normalized_url, expected_normalized)

        signer = oauth.Consumer('consumer_token', 'consumer_secret')
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), signer, None)
        self.assertReallyEqual(
            request['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_string_with_query(self):
        """Check normalization of a request whose URL already has a query.

        Verifies that ``normalized_url`` drops the query string while ``url``
        keeps it, and that the normalized parameter string contains every
        explicit parameter except ``oauth_signature`` plus the two query
        parameters taken from the URL (``alt`` and ``max-contacts``).
        """
        url = ("https://www.google.com/m8/feeds/contacts/default/full/"
               "?alt=json&max-contacts=10")
        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }
        req = oauth.Request("GET", url, params)
        self.assertEqual(
            req.normalized_url,
            'https://www.google.com/m8/feeds/contacts/default/full/')
        self.assertEqual(req.url, url)

        normalized_params = parse_qsl(req.get_normalized_parameters())
        # The signature itself is not part of the string that gets signed, so
        # it must not be counted; the URL contributes two extra parameters.
        # (The original used assertTrue(a, b), which treated the expected
        # count as a failure message and never compared anything.)
        signed_keys = [key for key in params if key != 'oauth_signature']
        self.assertEqual(len(normalized_params), len(signed_keys) + 2)

        normalized_params = dict(normalized_params)
        for key in signed_keys:
            self.assertEqual(params[key], normalized_params[key])
        self.assertEqual(normalized_params['alt'], 'json')
        self.assertEqual(normalized_params['max-contacts'], '10')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_normalized_parameters_empty(self):
        """A query parameter with an empty value is expected as ``empty=``."""
        request = oauth.Request("GET", "http://sp.example.com/?empty=")
        normalized = request.get_normalized_parameters()
        self.assertEqual('empty=', normalized)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_normalized_parameters_multiple(self):
        """Normalize a URL whose query repeats the ``tag`` parameter.

        The expected string lists the parameters in sorted order, keeps both
        ``tag`` values, and omits the ``oauth_signature`` present in the URL.
        """
        request_url = (
            "http://example.com/v2/search/videos"
            "?oauth_nonce=79815175"
            "&oauth_timestamp=1295397962"
            "&oauth_consumer_key=mykey"
            "&oauth_signature_method=HMAC-SHA1"
            "&oauth_version=1.0"
            "&offset=10"
            "&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
            "&tag=one&tag=two"
        )
        wanted = '&'.join([
            'oauth_consumer_key=mykey',
            'oauth_nonce=79815175',
            'oauth_signature_method=HMAC-SHA1',
            'oauth_timestamp=1295397962',
            'oauth_version=1.0',
            'offset=10',
            'tag=one',
            'tag=two',
        ])

        normalized = oauth.Request("GET", request_url).get_normalized_parameters()

        self.assertEqual(wanted, normalized)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signing_base(self):
        """Compare the HMAC-SHA1 signature base string with a known example.

        Only the raw base string returned by ``signing_base`` is checked; the
        key it returns alongside is ignored.
        """
        # Example copied from
        # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
        # which in turn says that it was copied from
        # http://oauth.net/core/1.0/#sig_base_example .
        query = "&".join([
            "file=vacation.jpg",
            "oauth_consumer_key=dpf43f3p2l4k3l03",
            "oauth_nonce=kllo9940pd9333jh",
            "oauth_signature_method=HMAC-SHA1",
            "oauth_timestamp=1191242096",
            "oauth_token=nnch734d00sl2jdk",
            "oauth_version=1.0",
            "size=original",
        ])
        request = oauth.Request(
            "GET", "http://photos.example.net/photos?" + query)

        method = oauth.SignatureMethod_HMAC_SHA1()
        consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')
        _, base_string = method.signing_base(request, consumer, None)

        # In the base string the parameter separator '&' appears as '%26'
        # and '=' as '%3D'.
        escaped_query = '%26'.join([
            'file%3Dvacation.jpg',
            'oauth_consumer_key%3Ddpf43f3p2l4k3l03',
            'oauth_nonce%3Dkllo9940pd9333jh',
            'oauth_signature_method%3DHMAC-SHA1',
            'oauth_timestamp%3D1191242096',
            'oauth_token%3Dnnch734d00sl2jdk',
            'oauth_version%3D1.0',
            'size%3Doriginal',
        ])
        wanted = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos&'
                   + escaped_query)
        self.assertEqual(wanted, base_string)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_normalized_parameters(self):
        """Normalize list-valued, bytes and ``_UGLYPH`` parameter values.

        The expected string has keys in sorted order, list values expanded
        into repeated ``key=value`` pairs, and both the ``b'\\xc2\\xae'`` bytes
        and ``_UGLYPH`` rendered as ``%C2%AE``.
        """
        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
        }
        # Non-OAuth parameters exercising multi-valued and non-ASCII input.
        params.update({
            'multi': ['FOO', 'BAR', _UGLYPH, b'\xc2\xae'],
            'multi_same': ['FOO', 'FOO'],
            'uni_utf8_bytes': b'\xc2\xae',
            'uni_unicode_object': _UGLYPH,
        })

        request = oauth.Request("GET", "http://sp.example.com/", params)
        normalized = request.get_normalized_parameters()

        wanted = '&'.join([
            'multi=BAR',
            'multi=FOO',
            'multi=%C2%AE',
            'multi=%C2%AE',
            'multi_same=FOO',
            'multi_same=FOO',
            'oauth_consumer_key=0685bd9184jfhq22',
            'oauth_nonce=4572616e48616d6d65724c61686176',
            'oauth_signature_method=HMAC-SHA1',
            'oauth_timestamp=137131200',
            'oauth_token=ad180jjd733klru7',
            'oauth_version=1.0',
            'uni_unicode_object=%C2%AE',
            'uni_utf8_bytes=%C2%AE',
        ])
        self.assertEqual(wanted, normalized)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_get_normalized_string_escapes_spaces_properly(self):
        """Spaces in values are expected as ``%20`` in the normalized string.

        The reference value is ``urlencode`` of the sorted parameters with
        every ``+`` replaced by ``%20``.
        """
        number = random.randint(100, 1000)
        sentence = ("This data with a random number (%d) has spaces!"
                    % random.randint(1000, 2000))
        params = {"some_random_data": number, "data": sentence}

        request = oauth.Request("GET", "http://sp.example.com/", params)
        normalized = request.get_normalized_parameters()

        ordered_pairs = sorted(params.items())
        wanted = urlencode(ordered_pairs).replace('+', '%20')
        self.assertEqual(wanted, normalized)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_from_request_works_with_wsgi(self):
        """Make sure WSGI header HTTP_AUTHORIZATION is detected correctly."""
        url = "http://sp.example.com/"
        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        # Serialize the parameters into an Authorization header, then move
        # that header under the WSGI-style key.
        wsgi_headers = oauth.Request("GET", url, params).to_header()
        wsgi_headers['HTTP_AUTHORIZATION'] = wsgi_headers.pop('Authorization')

        # Rebuild a request from the munged headers and compare it with the
        # original inputs.
        rebuilt = oauth.Request.from_request("GET", url, wsgi_headers)
        self.assertEqual(rebuilt.method, "GET")
        self.assertEqual(rebuilt.url, url)
        self.assertEqual(params, rebuilt.copy())
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_from_consumer_and_token(self):
        """``from_consumer_and_token`` copies keys and verifier into the request.

        Checks the resulting ``oauth_token``, ``oauth_consumer_key`` and
        ``oauth_verifier`` parameters against the token and consumer used.
        """
        consumer = oauth.Consumer(key="con-test-key", secret="con-test-secret")
        token = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        token.set_verifier('this_is_a_test_verifier')

        request = oauth.Request.from_consumer_and_token(
            consumer,
            token=token,
            http_method="GET",
            http_url="http://sp.example.com/",
        )

        self.assertEqual(request['oauth_token'], token.key)
        self.assertEqual(request['oauth_consumer_key'], consumer.key)
        self.assertEqual(token.verifier, request['oauth_verifier'])
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_no_version(self):
        """Verify a signed request that carries no ``oauth_version`` parameter.

        The consumer, token and request are stored on the test instance. The
        test makes no assertion on the result; it only requires that
        ``Server.verify_request`` completes without raising.
        """
        self.consumer = oauth.Consumer(key="consumer-key",
                                       secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO', 'BAR'],
            'foo': 59,
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
        self.request = oauth.Request(
            method="GET",
            url="http://sp.example.com/",
            parameters=request_params,
        )
        self.request.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        verifier.verify_request(self.request, self.consumer, self.token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_invalid_signature_method(self):
        """Verification must fail for a request signed with ``SignatureMethod_Bad``.

        The server only has HMAC-SHA1 registered, and ``verify_request`` is
        expected to raise ``oauth.Error``.
        """
        consumer = oauth.Consumer(key="consumer-key",
                                  secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO', 'BAR'],
            'foo': 59,
            'oauth_token': token.key,
            'oauth_consumer_key': consumer.key,
        }
        request = oauth.Request(
            method="GET",
            url="http://sp.example.com/",
            parameters=request_params,
        )
        request.sign_request(SignatureMethod_Bad(), consumer, token)

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(
            oauth.Error, verifier.verify_request, request, consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_missing_signature(self):
        """Verification must fail once ``oauth_signature`` has been removed.

        The request is signed normally, the signature parameter is deleted,
        and ``verify_request`` is expected to raise ``oauth.MissingSignature``.
        """
        consumer = oauth.Consumer(key="consumer-key",
                                  secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO', 'BAR'],
            'foo': 59,
            'oauth_token': token.key,
            'oauth_consumer_key': consumer.key,
        }
        request = oauth.Request(
            method="GET",
            url="http://sp.example.com/",
            parameters=request_params,
        )
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
        # Strip the signature that was just computed.
        del request['oauth_signature']

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(
            oauth.MissingSignature,
            verifier.verify_request, request, consumer, token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_url_with_query_string(self, mockHttpRequest):
        """Check what ``Client.request`` passes on for a URI with a query string.

        ``mockHttpRequest`` is given ``mockrequest`` as its ``side_effect``;
        the assertions inside ``mockrequest`` run when ``client.request`` ends
        up calling the mock.
        NOTE(review): ``mockHttpRequest`` is presumably injected by a
        ``mock.patch`` decorator that is not visible in this snippet -- confirm.
        """
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1,100)

        def mockrequest(cl, ur, **kw):
            # Stand-in for the patched call: validates the client instance,
            # the keyword arguments and the URL it receives.
            self.assertTrue(cl is client)
            # Exactly these keyword arguments are expected, no more, no less.
            self.assertEqual(frozenset(kw.keys()),
                                 frozenset(['method', 'body', 'redirections',
                                            'connection_type', 'headers']))
            self.assertEqual(kw['body'], b'')
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'GET')
            self.assertEqual(kw['redirections'],
                                 httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            # Build and sign an independent reference request for the same URI
            # so its query string can be compared with the one received.
            req = oauth.Request.from_consumer_and_token(self.consumer, None,
                    http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                             self.consumer, None)
            expected = parse_qsl(
                            urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse(ur).query)
            self.assertEqual(len(expected), len(actual))
            actual = dict(actual)
            for key, value in expected:
                # Signature, nonce and timestamp are excluded from the
                # comparison; they are not expected to match between two
                # separately built requests.
                if key not in ('oauth_signature',
                               'oauth_nonce', 'oauth_timestamp'):
                    self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')