Python oauth2 模块,SignatureMethod_HMAC_SHA1() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用oauth2.SignatureMethod_HMAC_SHA1()

项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_set_signature_method(self):
        consumer = oauth.Consumer('key', 'secret')
        client = oauth.Client(consumer)

        class Blah:
            pass

        try:
            client.set_signature_method(Blah())
            self.fail("Client.set_signature_method() accepted invalid method.")
        except ValueError:
            pass

        m = oauth.SignatureMethod_HMAC_SHA1()
        client.set_signature_method(m)
        self.assertEqual(m, client.method)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def sign_request(self, url, url_params={}):
        oauth_request = oauth2.Request(
            method="GET",
            url=url,
            parameters=url_params
        )
        oauth_request.update(
            {
                'oauth_nonce': oauth2.generate_nonce(),
                'oauth_timestamp': oauth2.generate_timestamp(),
                'oauth_token': self.token.key,
                'oauth_consumer_key': self.consumer.key
            }
        )
        oauth_request.sign_request(
            oauth2.SignatureMethod_HMAC_SHA1(),
            self.consumer,
            self.token
        )
        return oauth_request.to_url()
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def verifySignature(self, secret):
        """See L{IOAuthCredentials#verifySignature}."""
        consumer = Consumer(key=self.consumerKey, secret=secret)
        oauthRequest = Request.from_request(
            self.method, self.url, headers=self.headers,
            query_string=self.arguments)

        # verify the request has been oauth authorized, we only support
        # HMAC-SHA1, reject OAuth signatures if they use a different method
        if self.signatureMethod != 'HMAC-SHA1':
            raise NotImplementedError(
                'Unknown signature method: %s' % self.signatureMethod)
        signatureMethod = SignatureMethod_HMAC_SHA1()
        result = signatureMethod.check(oauthRequest, consumer, None,
                                       self.signature)
        return result
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_set_signature_method(self):
        consumer = oauth.Consumer('key', 'secret')
        client = oauth.Client(consumer)

        class Blah:
            pass

        try:
            client.set_signature_method(Blah())
            self.fail("Client.set_signature_method() accepted invalid method.")
        except ValueError:
            pass

        m = oauth.SignatureMethod_HMAC_SHA1()
        client.set_signature_method(m)
        self.assertEqual(m, client.method)
项目:oasis    作者:mhfowler    | 项目源码 | 文件源码
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
        self.access_token_key = access_token_key
        self.access_token_secret = access_token_secret
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

        _debug = 0

        self.oauth_token = oauth.Token(key=self.access_token_key, secret=self.access_token_secret)
        self.oauth_consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret)

        self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()


        self.http_handler  = urllib.HTTPHandler(debuglevel=_debug)
        self.https_handler = urllib.HTTPSHandler(debuglevel=_debug)
项目:MusicGenreClassification    作者:mlachmish    | 项目源码 | 文件源码
def signed_request(url, access_token=None):
    ''' return a signed request. Usually not used, save for specific
        functions like deriving a preview-clip URL.
    '''
    consumer = _consumer()

    req = oauth.Request.from_consumer_and_token(
                consumer,
                http_url=url,
                is_form_encoded=True,
                parameters={'country':api_settings.country})

    signing_method = oauth.SignatureMethod_HMAC_SHA1()
    req.sign_request(signing_method, consumer, access_token)

    return req
项目:MusicGenreClassification    作者:mlachmish    | 项目源码 | 文件源码
def signed_request(url, access_token=None):
    ''' return a signed request. Usually not used, save for specific
        functions like deriving a preview-clip URL.
    '''
    consumer = _consumer()

    req = oauth.Request.from_consumer_and_token(
                consumer,
                http_url=url,
                is_form_encoded=True,
                parameters={'country':api_settings.country})

    signing_method = oauth.SignatureMethod_HMAC_SHA1()
    req.sign_request(signing_method, consumer, access_token)

    return req
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def __init__(self, api_key, shared_secret, httplib2_inst=None):
        """Override init to ensure required args"""
        super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst)
        self.endpoint = PRIVATE_ENDPOINT
        self.hmac_sha1_signature = oauth.SignatureMethod_HMAC_SHA1()
        self.plaintext_signature = oauth.SignatureMethod_PLAINTEXT()
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def __init__(self, api_key, shared_secret, httplib2_inst=None):
        """Override init to ensure required args"""
        super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst)
        self.endpoint = PRIVATE_ENDPOINT
        self.scheme = HTTPS_SCHEME
        self.hmac_sha1_signature = oauth.SignatureMethod_HMAC_SHA1()
        self.plaintext_signature = oauth.SignatureMethod_PLAINTEXT()
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_unset_consumer_and_token(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request("GET", "http://example.com/fetch.php")
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer,
            token)

        self.assertEqual(consumer.key, request['oauth_consumer_key'])
        self.assertEqual(token.key, request['oauth_token'])
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_no_url_set(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request()
        self.assertRaises(ValueError,
                          request.sign_request,
                          oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_unicode_nonascii(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url = u('http://api.simplegeo.com:80/1.0/places/address.json'
                '?q=monkeys&category=animal'
                '&address=41+Decatur+St,+San+Francisc') + _U2766 + u(',+CA')
        req = oauth.Request("GET", url)
        self.assertReallyEqual(
            req.normalized_url,
            u('http://api.simplegeo.com/1.0/places/address.json'))
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual(
            req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_string_bytes_nonascii_nonutf8(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
               b'?q=monkeys&category=animal'
               b'&address=41+Decatur+St,+San+Francisc') + _B2766 + b',+CA'
        req = oauth.Request("GET", url)
        self.assertReallyEqual(
            req.normalized_url,
            u('http://api.simplegeo.com/1.0/places/address.json'))
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual( #XXX
            req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
               b'?q=monkeys&category=animal'
               b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
        req = oauth.Request("GET", url)
        self.assertReallyEqual(
            req.normalized_url,
            u('http://api.simplegeo.com/1.0/places/address.json'))
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual(
            req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_signing_base(self):
        # example copied from
        # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
        # which in turns says that it was copied from
        # http://oauth.net/core/1.0/#sig_base_example .
        url = ("http://photos.example.net/photos?file=vacation.jpg"
               "&oauth_consumer_key=dpf43f3p2l4k3l03"
               "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
               "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
               "&oauth_version=1.0&size=original")

        req = oauth.Request("GET", url)

        sm = oauth.SignatureMethod_HMAC_SHA1()

        consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')
        key, raw = sm.signing_base(req, consumer, None)

        expected = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos'
                     '&file%3Dvacation.jpg'
                     '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03'
                     '%26oauth_nonce%3Dkllo9940pd9333jh'
                     '%26oauth_signature_method%3DHMAC-SHA1'
                     '%26oauth_timestamp%3D1191242096'
                     '%26oauth_token%3Dnnch734d00sl2jdk'
                     '%26oauth_version%3D1.0%26size%3Doriginal')
        self.assertEqual(expected, raw)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_init(self):
        server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()})
        self.assertTrue('HMAC-SHA1' in server.signature_methods)
        self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
            oauth.SignatureMethod_HMAC_SHA1))

        server = oauth.Server()
        self.assertEqual(server.signature_methods, {})
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_add_signature_method(self):
        server = oauth.Server()
        res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.assertTrue(len(res) == 1)
        self.assertTrue('HMAC-SHA1' in res)
        self.assertTrue(isinstance(res['HMAC-SHA1'],
            oauth.SignatureMethod_HMAC_SHA1))

        res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
        self.assertTrue(len(res) == 2)
        self.assertTrue('PLAINTEXT' in res)
        self.assertTrue(isinstance(res['PLAINTEXT'],
            oauth.SignatureMethod_PLAINTEXT))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_verify_request(self):
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        parameters = server.verify_request(self.request, self.consumer,
            self.token)

        self.assertTrue('bar' in parameters)
        self.assertTrue('foo' in parameters)
        self.assertTrue('multi' in parameters)
        self.assertEqual(parameters['bar'], 'blerg')
        self.assertEqual(parameters['foo'], 59)
        self.assertEqual(parameters['multi'], ['FOO','BAR'])
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_verify_request_invalid_signature(self):
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.request['oauth_signature'] = 'BOGUS'

        self.assertRaises(oauth.Error,
               server.verify_request, self.request, self.consumer, self.token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_verify_request_invalid_timestamp(self):
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.request['oauth_timestamp'] -= 86400

        self.assertRaises(oauth.Error,
               server.verify_request, self.request, self.consumer, self.token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_invalid_version(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '222.9922',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['foo','bar'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_invalid_signature_method(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = SignatureMethod_Bad()
        request.sign_request(signature_method, consumer, token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.Error, server.verify_request, request,
            consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_missing_signature(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)
        del request['oauth_signature']

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.MissingSignature, server.verify_request,
            request, consumer, token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_url_with_query_string(self, mockHttpRequest):
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1,100)

        def mockrequest(cl, ur, **kw):
            self.assertTrue(cl is client)
            self.assertEqual(frozenset(kw.keys()),
                                 frozenset(['method', 'body', 'redirections',
                                            'connection_type', 'headers']))
            self.assertEqual(kw['body'], b'')
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'GET')
            self.assertEqual(kw['redirections'],
                                 httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            req = oauth.Request.from_consumer_and_token(self.consumer, None,
                    http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                             self.consumer, None)
            expected = parse_qsl(
                            urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse(ur).query)
            self.assertEqual(len(expected), len(actual))
            actual = dict(actual)
            for key, value in expected:
                if key not in ('oauth_signature',
                               'oauth_nonce', 'oauth_timestamp'):
                    self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def __init__(self, key: str, secret: str) -> None:
        super(RequestValidatorMixin, self).__init__()
        self.consumer_key = key
        self.consumer_secret = secret

        self.oauth_server = oauth2.Server()
        signature_method = oauth2.SignatureMethod_HMAC_SHA1()
        self.oauth_server.add_signature_method(signature_method)
        self.oauth_consumer = oauth2.Consumer(
            self.consumer_key, self.consumer_secret
        )
项目:TwitterCompetitionBot    作者:BenAllenUK    | 项目源码 | 文件源码
def SetCredentials(self,
                       consumer_key,
                       consumer_secret,
                       access_token_key=None,
                       access_token_secret=None):
        '''Set the consumer_key and consumer_secret for this instance

        Args:
          consumer_key:
            The consumer_key of the twitter account.
          consumer_secret:
            The consumer_secret for the twitter account.
          access_token_key:
            The oAuth access token key value you retrieved
            from running get_access_token.py.
          access_token_secret:
            The oAuth access token's secret, also retrieved
            from the get_access_token.py run.
        '''
        self._consumer_key        = consumer_key
        self._consumer_secret     = consumer_secret
        self._access_token_key    = access_token_key
        self._access_token_secret = access_token_secret
        self._oauth_consumer      = None

        if consumer_key is not None and consumer_secret is not None and \
                        access_token_key is not None and access_token_secret is not None:
            self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
            self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

            self._oauth_token    = oauth.Token(key=access_token_key, secret=access_token_secret)
            self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
项目:gif-disco    作者:futurice    | 项目源码 | 文件源码
def SetCredentials(self,
                     consumer_key,
                     consumer_secret,
                     access_token_key=None,
                     access_token_secret=None):
    '''Set the consumer_key and consumer_secret for this instance

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key        = consumer_key
    self._consumer_secret     = consumer_secret
    self._access_token_key    = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer      = None

    if consumer_key is not None and consumer_secret is not None and \
       access_token_key is not None and access_token_secret is not None:
      self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
      self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

      self._oauth_token    = oauth.Token(key=access_token_key, secret=access_token_secret)
      self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_unset_consumer_and_token(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request("GET", "http://example.com/fetch.php")
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer,
            token)

        self.assertEqual(consumer.key, request['oauth_consumer_key'])
        self.assertEqual(token.key, request['oauth_token'])
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_no_url_set(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request()
        self.assertRaises(ValueError,
                          request.sign_request,
                          oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_signature_base_unicode_nonascii(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url = u('http://api.simplegeo.com:80/1.0/places/address.json'
                '?q=monkeys&category=animal'
                '&address=41+Decatur+St,+San+Francisc') + _U2766 + u(',+CA')
        req = oauth.Request("GET", url)
        self.assertReallyEqual(
            req.normalized_url,
            u('http://api.simplegeo.com/1.0/places/address.json'))
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual(
            req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_signature_base_string_bytes_nonascii_nonutf8(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
               b'?q=monkeys&category=animal'
               b'&address=41+Decatur+St,+San+Francisc') + _B2766 + b',+CA'
        req = oauth.Request("GET", url)
        self.assertReallyEqual(
            req.normalized_url,
            u('http://api.simplegeo.com/1.0/places/address.json'))
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual( #XXX
            req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
               b'?q=monkeys&category=animal'
               b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
        req = oauth.Request("GET", url)
        self.assertReallyEqual(
            req.normalized_url,
            u('http://api.simplegeo.com/1.0/places/address.json'))
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.assertReallyEqual(
            req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_signing_base(self):
        # example copied from
        # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
        # which in turns says that it was copied from
        # http://oauth.net/core/1.0/#sig_base_example .
        url = ("http://photos.example.net/photos?file=vacation.jpg"
               "&oauth_consumer_key=dpf43f3p2l4k3l03"
               "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
               "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
               "&oauth_version=1.0&size=original")

        req = oauth.Request("GET", url)

        sm = oauth.SignatureMethod_HMAC_SHA1()

        consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')
        key, raw = sm.signing_base(req, consumer, None)

        expected = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos'
                     '&file%3Dvacation.jpg'
                     '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03'
                     '%26oauth_nonce%3Dkllo9940pd9333jh'
                     '%26oauth_signature_method%3DHMAC-SHA1'
                     '%26oauth_timestamp%3D1191242096'
                     '%26oauth_token%3Dnnch734d00sl2jdk'
                     '%26oauth_version%3D1.0%26size%3Doriginal')
        self.assertEqual(expected, raw)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_init(self):
        server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()})
        self.assertTrue('HMAC-SHA1' in server.signature_methods)
        self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
            oauth.SignatureMethod_HMAC_SHA1))

        server = oauth.Server()
        self.assertEqual(server.signature_methods, {})
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_add_signature_method(self):
        server = oauth.Server()
        res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.assertTrue(len(res) == 1)
        self.assertTrue('HMAC-SHA1' in res)
        self.assertTrue(isinstance(res['HMAC-SHA1'],
            oauth.SignatureMethod_HMAC_SHA1))

        res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
        self.assertTrue(len(res) == 2)
        self.assertTrue('PLAINTEXT' in res)
        self.assertTrue(isinstance(res['PLAINTEXT'],
            oauth.SignatureMethod_PLAINTEXT))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_verify_request(self):
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        parameters = server.verify_request(self.request, self.consumer,
            self.token)

        self.assertTrue('bar' in parameters)
        self.assertTrue('foo' in parameters)
        self.assertTrue('multi' in parameters)
        self.assertEqual(parameters['bar'], 'blerg')
        self.assertEqual(parameters['foo'], 59)
        self.assertEqual(parameters['multi'], ['FOO','BAR'])
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_verify_request_invalid_signature(self):
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.request['oauth_signature'] = 'BOGUS'

        self.assertRaises(oauth.Error,
               server.verify_request, self.request, self.consumer, self.token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_verify_request_invalid_timestamp(self):
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.request['oauth_timestamp'] -= 86400

        self.assertRaises(oauth.Error,
               server.verify_request, self.request, self.consumer, self.token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_invalid_version(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '222.9922',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['foo','bar'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_invalid_signature_method(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = SignatureMethod_Bad()
        request.sign_request(signature_method, consumer, token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.Error, server.verify_request, request,
            consumer, token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_missing_signature(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)
        del request['oauth_signature']

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.MissingSignature, server.verify_request,
            request, consumer, token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_url_with_query_string(self, mockHttpRequest):
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1,100)

        def mockrequest(cl, ur, **kw):
            self.assertTrue(cl is client)
            self.assertEqual(frozenset(kw.keys()),
                                 frozenset(['method', 'body', 'redirections',
                                            'connection_type', 'headers']))
            self.assertEqual(kw['body'], b'')
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'GET')
            self.assertEqual(kw['redirections'],
                                 httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            req = oauth.Request.from_consumer_and_token(self.consumer, None,
                    http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                             self.consumer, None)
            expected = parse_qsl(
                            urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse(ur).query)
            self.assertEqual(len(expected), len(actual))
            actual = dict(actual)
            for key, value in expected:
                if key not in ('oauth_signature',
                               'oauth_nonce', 'oauth_timestamp'):
                    self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')
项目:TumblrVideos    作者:moedje    | 项目源码 | 文件源码
def post_multipart(self, url, params, files):
        """
        Generates and issues a multipart request for data files

        :param url: a string, the url you are requesting
        :param params: a dict, a key-value of all the parameters
        :param files:  a list, the list of tuples for your data

        :returns: a dict parsed from the JSON response
        """
        # combine the parameters with the generated oauth params
        params = dict(params.items() + self.generate_oauth_params().items())
        faux_req = oauth.Request(method="POST", url=url, parameters=params)
        faux_req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)
        params = dict(parse_qsl(faux_req.to_postdata()))

        content_type, body = self.encode_multipart_formdata(params, files)
        headers = {'Content-Type': content_type, 'Content-Length': str(len(body))}

        # Do a bytearray of the body and everything seems ok
        r = urllib2.Request(url, body, headers)
        if self.proxy_url:
            proxy = urllib2.ProxyHandler({'http': self.proxy_url, 'https': self.proxy_url})
            opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)
        content = urllib2.urlopen(r).read()
        return self.json_parse(content)
项目:Chirps    作者:vered1986    | 项目源码 | 文件源码
def SetCredentials(self,
                     consumer_key,
                     consumer_secret,
                     access_token_key=None,
                     access_token_secret=None):
    '''Set the consumer_key and consumer_secret for this instance

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key        = consumer_key
    self._consumer_secret     = consumer_secret
    self._access_token_key    = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer      = None

    if consumer_key is not None and consumer_secret is not None and \
       access_token_key is not None and access_token_secret is not None:
      self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
      self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

      self._oauth_token    = oauth.Token(key=access_token_key, secret=access_token_secret)
      self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
项目:deep-playlist    作者:anushabala    | 项目源码 | 文件源码
def get_preview_url(api_key, api_secret, sdl_id):
    consumer = oauth.Consumer(api_key, api_secret)
    request_url = "http://previews.7digital.com/clip/%s" % sdl_id

    req = oauth.Request(method="GET", url=request_url, is_form_encoded=True)

    req['oauth_timestamp'] = oauth.Request.make_timestamp()
    req['oauth_nonce'] = oauth.Request.make_nonce()
    req['country'] = 'US'
    sig_method = oauth.SignatureMethod_HMAC_SHA1()

    req.sign_request(sig_method, consumer, token=None)

    url = req.to_url()
    return url, False
项目:api_stuff    作者:tgebhart    | 项目源码 | 文件源码
def superSecureRequest(self, host, path, url_params=None):
        """Prepares OAuth authentication and sends the request to the API.
        Args:
            host (str): The domain host of the API.
            path (str): The path of the API after the domain.
            url_params (dict): An optional set of query parameters in the request.
        Returns:
            dict: The JSON response from the request.
        Raises:
            urllib2.HTTPError: An error occurs from the HTTP request.
        """
        url_params = url_params or {}
        url = 'https://{0}{1}?'.format(host, urllib.quote(path.encode('utf8')))

        consumer = oauth2.Consumer(self.oath_consumer_key, self.oath_consumer_secret)
        oauth_request = oauth2.Request(
            method="GET", url=url, parameters=url_params)

        oauth_request.update(
            {
                'oauth_nonce': oauth2.generate_nonce(),
                'oauth_timestamp': oauth2.generate_timestamp(),
                'oauth_token': self.oath_token,
                'oauth_consumer_key': self.oath_consumer_key
            }
        )
        token = oauth2.Token(self.oath_token, self.oath_token_secret)
        oauth_request.sign_request(
            oauth2.SignatureMethod_HMAC_SHA1(), consumer, token)
        signed_url = oauth_request.to_url()

        conn = urllib2.urlopen(signed_url, None)
        try:
            response = json.loads(conn.read().decode())
        finally:
            conn.close()

        return response