Python oauth2 模块,Token() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用oauth2.Token()

项目:socialauth    作者:emilyhorsman    | 项目源码 | 文件源码
def decode_oauth_token_secret(self):
        if not self.token_cookie:
            raise Error('No token cookie given')

        try:
            payload = jwt.decode(self.token_cookie,
                                 self.token_secret,
                                 algorithm = 'HS256')
        except:
            raise Error('Failed to retrieve oauth_token_secret from token')

        self.oauth_token_secret = payload.get('data', {}).get('id', None)
        if not self.oauth_token_secret:
            raise Error('Token does not have an oauth_token_secret')

        return self.oauth_token_secret
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def oauth_login(self, url, oauth_token, oauth_token_secret,
                    consumer_key='anonymous', consumer_secret='anonymous'):
        """Authenticate using the OAUTH method.

        This only works with IMAP servers that support OAUTH (e.g. Gmail).
        """
        if oauth_module:
            token = oauth_module.Token(oauth_token, oauth_token_secret)
            consumer = oauth_module.Consumer(consumer_key, consumer_secret)
            xoauth_callable = lambda x: oauth_module.build_xoauth_string(url,
                                                                         consumer,
                                                                         token)
            return self._command_and_check('authenticate', 'XOAUTH',
                                           xoauth_callable, unpack=True)
        else:
            raise self.Error(
                'The optional oauth2 package is needed for OAUTH authentication')
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def get_token_and_auth_url(self, callback_url=None):
        """First step is to get the token and then send the request that
        provides the auth URL

        Returns a tuple of token and the authorisation URL.

        """

        client = oauth.Client(self.consumer)

        params = {}
        params['oauth_callback'] = callback_url or 'oob'

        request = oauth.Request(parameters=params)
        url = REQUEST_TOKEN_URL
        resp, content = client.request(url, "POST", request.to_postdata())

        if resp.get('status') == '200':
            token = oauth.Token.from_string(content)
            yql_logger.debug("token: %s", token)
            data = dict(parse_qsl(content))
            yql_logger.debug("data: %s", data)
            return token, data['xoauth_request_auth_url']
        else:
            raise YQLError(resp, content, url)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def get_token_and_auth_url(self, callback_url=None):
        """First step is to get the token and then send the request that
        provides the auth URL

        Returns a tuple of token and the authorisation URL.

        """

        client = oauth.Client(self.consumer)

        params = {}
        params['oauth_callback'] = callback_url or 'oob'

        request = oauth.Request(parameters=params)
        url = REQUEST_TOKEN_URL
        resp, content = client.request(url, "POST", request.to_postdata())

        if resp.get('status') == '200':
            token = oauth.Token.from_string(content)
            yql_logger.debug("token: %s", token)
            data = dict(parse_qsl(content))
            yql_logger.debug("data: %s", data)
            return token, data['xoauth_request_auth_url']
        else:
            raise YQLError, (resp, content, url)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_from_string(self):
        self.assertRaises(ValueError, lambda: oauth.Token.from_string(''))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('blahblahblah'))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('blah=blah'))

        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret=asfdasf'))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret='))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=asfdasf'))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token='))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=&oauth_token_secret='))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=tooken%26oauth_token_secret=seecret'))

        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)

        self.token.set_callback('http://www.example.com/my-callback')
        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_from_token_and_callback(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        req = oauth.Request.from_token_and_callback(tok)
        self.assertFalse('oauth_callback' in req)
        self.assertEqual(req['oauth_token'], tok.key)

        req = oauth.Request.from_token_and_callback(tok, callback=url)
        self.assertTrue('oauth_callback' in req)
        self.assertEqual(req['oauth_callback'], url)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def setUp(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        self.consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = self.token.key
        params['oauth_consumer_key'] = self.consumer.key
        self.request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        self.request.sign_request(signature_method, self.consumer, self.token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_init(self):
        class Blah():
            pass

        try:
            client = oauth.Client(Blah())
            self.fail("Client.__init__() accepted invalid Consumer.")
        except ValueError:
            pass

        consumer = oauth.Consumer('token', 'secret')
        try:
            client = oauth.Client(consumer, Blah())
            self.fail("Client.__init__() accepted invalid Token.")
        except ValueError:
            pass
项目:trump2cash    作者:maxbbraun    | 项目源码 | 文件源码
def make_request(self, url, method="GET", body="", headers=None):
        """Makes a request to the TradeKing API."""

        consumer = Consumer(key=TRADEKING_CONSUMER_KEY,
                            secret=TRADEKING_CONSUMER_SECRET)
        token = Token(key=TRADEKING_ACCESS_TOKEN,
                      secret=TRADEKING_ACCESS_TOKEN_SECRET)
        client = Client(consumer, token)

        self.logs.debug("TradeKing request: %s %s %s %s" %
                        (url, method, body, headers))
        response, content = client.request(url, method=method, body=body,
                                           headers=headers)
        self.logs.debug("TradeKing response: %s %s" % (response, content))

        try:
            return loads(content)
        except ValueError:
            self.logs.error("Failed to decode JSON response: %s" % content)
            return None
项目:socialauth    作者:emilyhorsman    | 项目源码 | 文件源码
def get_user_information(self):
        token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
        token.set_verifier(self.oauth_verifier)
        client = oauth2.Client(self.consumer, token)

        url = 'https://api.twitter.com/oauth/access_token'
        resp, content = client.request(url, 'POST')
        if resp.status != 200:
            raise Error('{} from Twitter'.format(resp.status))

        provider_user = dict(parse_qsl(content.decode('utf-8')))
        if 'user_id' not in provider_user:
            raise Error('No user_id from Twitter')

        self.status    = 200
        self.user_id   = provider_user.get('user_id')
        self.user_name = provider_user.get('screen_name', None)
        return (self.user_id, self.user_name,)
项目:ewe_ebooks    作者:jaymcgrath    | 项目源码 | 文件源码
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
        """
        Func for final leg of three-legged oauth,
        takes token and secret returned in oauth_callback GET dict and
         exchanges them for the permanent access token and secret
        returns an access_token dict with two keys, oauth_token and oauth_token_secret
        """
        token = oauth.Token(oauth_token, oauth_token_secret)
        token.set_verifier(oauth_verifier)
        consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)
        client = oauth.Client(consumer, token)

        # Now we fire the request at access token url instead of request token
        resp, content = client.request(self.ACCESS_TOKEN_URL, "POST")

        if resp['status'] != '200':
            raise Exception("Invalid response %s." % resp['status'])

        access_token = dict(urllib.parse.parse_qsl(content))

        # urllib returns bytes, which will need to be decoded using the string.decode() method before use
        access_token = {key.decode(): value.decode() for key, value in access_token.items()}

        # Return the token dict containing token and secret
        return access_token
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_from_string(self):
        self.assertRaises(ValueError, lambda: oauth.Token.from_string(''))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('blahblahblah'))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('blah=blah'))

        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret=asfdasf'))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret='))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=asfdasf'))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token='))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=&oauth_token_secret='))
        self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=tooken%26oauth_token_secret=seecret'))

        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)

        self.token.set_callback('http://www.example.com/my-callback')
        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_from_token_and_callback(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': "137131200",
            'oauth_consumer_key': "0685bd9184jfhq22",
            'oauth_signature_method': "HMAC-SHA1",
            'oauth_token': "ad180jjd733klru7",
            'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
        }

        tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        req = oauth.Request.from_token_and_callback(tok)
        self.assertFalse('oauth_callback' in req)
        self.assertEqual(req['oauth_token'], tok.key)

        req = oauth.Request.from_token_and_callback(tok, callback=url)
        self.assertTrue('oauth_callback' in req)
        self.assertEqual(req['oauth_callback'], url)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def setUp(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        self.consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = self.token.key
        params['oauth_consumer_key'] = self.consumer.key
        self.request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        self.request.sign_request(signature_method, self.consumer, self.token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_init(self):
        class Blah():
            pass

        try:
            client = oauth.Client(Blah())
            self.fail("Client.__init__() accepted invalid Consumer.")
        except ValueError:
            pass

        consumer = oauth.Consumer('token', 'secret')
        try:
            client = oauth.Client(consumer, Blah())
            self.fail("Client.__init__() accepted invalid Token.")
        except ValueError:
            pass
项目:oasis    作者:mhfowler    | 项目源码 | 文件源码
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
        self.access_token_key = access_token_key
        self.access_token_secret = access_token_secret
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

        _debug = 0

        self.oauth_token = oauth.Token(key=self.access_token_key, secret=self.access_token_secret)
        self.oauth_consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret)

        self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()


        self.http_handler  = urllib.HTTPHandler(debuglevel=_debug)
        self.https_handler = urllib.HTTPSHandler(debuglevel=_debug)
项目:TumblrVideos    作者:moedje    | 项目源码 | 文件源码
def __init__(self, consumer_key, consumer_secret="", oauth_token="", oauth_secret="", host="https://api.tumblr.com",
                 proxy_url=None):
        self.host = host
        self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
        self.token = oauth.Token(key=oauth_token, secret=oauth_secret)
        self.proxy_url = proxy_url
        if proxy_url:
            print("Generating Proxy From proxy_url")
            self.proxy_info = httplib2.proxy_info_from_url("https://" + proxy_url, 'http')
            self.proxy_info.proxy_rdns = True
            # uri = urlparse(proxy_url)
            # self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP,uri.hostname,uri.port,proxy_rdns=True)
        else:
            print("Generating proxy from ENV")
            proxy_url = os.environ.get('HTTPS_PROXY', None)
            if proxy_url:
                uri = urlparse(proxy_url)
                self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP, uri.hostname, uri.port, proxy_rdns=True)
            else:
                self.proxy_info = None
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def twitter_oauth2(oauth_token, oauth_verifier):
    auth_tokens = db.OAuthToken.query(db.OAuthToken.temp_oauth_token == oauth_token, db.OAuthToken.application == db.APP_TWITTER).fetch(1)
    if not auth_tokens:
        return None
    auth_token = auth_tokens[0]
    # Step 3: Once the consumer has redirected the user back to the oauth_callback
    # URL you can request the access token the user has approved. You use the
    # request token to sign this request. After this is done you throw away the
    # request token and use the access token returned. You should store this
    # access token somewhere safe, like a database, for future use.
    token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
    token.set_verifier(oauth_verifier)
    consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret)
    client = oauth.Client(consumer, token)

    resp, content = client.request(access_token_url, "POST")
    access_token = dict(urlparse.parse_qsl(content))
    auth_token.oauth_token = access_token['oauth_token']
    auth_token.oauth_token_secret = access_token['oauth_token_secret']
    auth_token.valid_token = True
    auth_token.time_between_posts = 5 * 60  # every 5 minutes
    auth_token.put()
    return auth_token
项目:splitwise    作者:namaggarwal    | 项目源码 | 文件源码
def __init__(self,consumer_key,consumer_secret,access_token=None):
        """ Initializes the splitwise class. Sets consumer and access token

        Args:
            consumer_key (str) : Consumer Key provided by Spliwise
            consumer_secret (str): Consumer Secret provided by Splitwise
            access_token (:obj: `dict`) Access Token is a combination of oauth_token and oauth_token_secret

        Returns:
            A Splitwise Object
        """
        self.consumer = oauth.Consumer(consumer_key, consumer_secret)

        #If access token is present then set the Access token
        if access_token:
            self.setAccessToken(access_token)
项目:splitwise    作者:namaggarwal    | 项目源码 | 文件源码
def getAccessToken(self,oauth_token,oauth_token_secret,oauth_verifier):

        token = oauth.Token(oauth_token,oauth_token_secret)
        token.set_verifier(oauth_verifier)
        client = oauth.Client(self.consumer, token)

        resp, content = client.request(Splitwise.ACCESS_TOKEN_URL, "POST")

        if Splitwise.isDebug():
            print(resp, content)

        #Check if the response is correct
        if resp['status'] != '200':
            raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])

        access_token = dict(parse_qsl(content.decode("utf-8")))

        return access_token
项目:Build-a-Bot    作者:ShanTulshi    | 项目源码 | 文件源码
def get_tweets(twitter_handle):
    CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
    CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
    ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
    ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'

    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
    client = oauth.Client(consumer, access_token)

    endpoint = "https://api.twitter.com/1.1/search/tweets.json?q=from%3A" + twitter_handle + "&result_type=all&count=100"
    response, data = client.request(endpoint)

    tweet_data = json.loads(data.decode('utf-8'))
    tweets = []
    for tweet in tweet_data['statuses']:
        tweets.append(re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n'))
    return tweets

#example
# print(get_tweets("Cmdr_Hadfield"))
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        self.docmd('AUTH', 'XOAUTH %s' % \
            base64.b64encode(oauth2.build_xoauth_string(url, consumer, token)))
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
            lambda x: oauth2.build_xoauth_string(url, consumer, token))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        self.docmd('AUTH', 'XOAUTH %s' % \
            base64.b64encode(oauth2.build_xoauth_string(url, consumer, token)))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
            lambda x: oauth2.build_xoauth_string(url, consumer, token))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def setUp(self):
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.token = oauth.Token(self.key, self.secret)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_basic(self):
        self.assertRaises(ValueError, lambda: oauth.Token(None, None))
        self.assertRaises(ValueError, lambda: oauth.Token('asf', None))
        self.assertRaises(ValueError, lambda: oauth.Token(None, 'dasf'))
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_unset_consumer_and_token(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request("GET", "http://example.com/fetch.php")
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer,
            token)

        self.assertEqual(consumer.key, request['oauth_consumer_key'])
        self.assertEqual(token.key, request['oauth_token'])
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_no_url_set(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request()
        self.assertRaises(ValueError,
                          request.sign_request,
                          oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_from_consumer_and_token(self):
        url = "http://sp.example.com/"

        tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        tok.set_verifier('this_is_a_test_verifier')
        con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
        req = oauth.Request.from_consumer_and_token(con, token=tok,
            http_method="GET", http_url=url)

        self.assertEqual(req['oauth_token'], tok.key)
        self.assertEqual(req['oauth_consumer_key'], con.key)
        self.assertEqual(tok.verifier, req['oauth_verifier'])
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_no_version(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        self.consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = self.token.key
        params['oauth_consumer_key'] = self.consumer.key
        self.request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        self.request.sign_request(signature_method, self.consumer, self.token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        parameters = server.verify_request(self.request, self.consumer,
            self.token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_invalid_version(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '222.9922',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['foo','bar'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def test_missing_signature(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)
        del request['oauth_signature']

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.MissingSignature, server.verify_request,
            request, consumer, token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def __init__(
        self,
        consumer_key,
        consumer_secret,
        token,
        token_secret
    ):
        self.consumer = oauth2.Consumer(consumer_key, consumer_secret)
        self.token = oauth2.Token(token, token_secret)
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def is_valid_request(
        self,
        request: t.Any,
        parameters: t.MutableMapping[str, str] = {},
        fake_method: t.Any = None,
        handle_error: bool = True
    ) -> bool:
        '''
        Validates an OAuth request using the python-oauth2 library:
            https://github.com/simplegeo/python-oauth2
        '''

        def handle(e: oauth2.Error) -> bool:
            if handle_error:
                return False
            else:
                raise e

        try:
            method, url, headers, parameters = self.parse_request(
                request, parameters, fake_method
            )

            oauth_request = oauth2.Request.from_request(
                method, url, headers=headers, parameters=parameters
            )

            oauth2.Token
            self.oauth_server.verify_request(
                oauth_request, self.oauth_consumer, {}
            )

        except oauth2.Error as e:
            return handle(e)
        except ValueError as e:
            return handle(e)
        # Signature was valid
        return True
项目:TwitterCompetitionBot    作者:BenAllenUK    | 项目源码 | 文件源码
def SetCredentials(self,
                       consumer_key,
                       consumer_secret,
                       access_token_key=None,
                       access_token_secret=None):
        '''Set the consumer_key and consumer_secret for this instance

        Args:
          consumer_key:
            The consumer_key of the twitter account.
          consumer_secret:
            The consumer_secret for the twitter account.
          access_token_key:
            The oAuth access token key value you retrieved
            from running get_access_token.py.
          access_token_secret:
            The oAuth access token's secret, also retrieved
            from the get_access_token.py run.
        '''
        self._consumer_key        = consumer_key
        self._consumer_secret     = consumer_secret
        self._access_token_key    = access_token_key
        self._access_token_secret = access_token_secret
        self._oauth_consumer      = None

        if consumer_key is not None and consumer_secret is not None and \
                        access_token_key is not None and access_token_secret is not None:
            self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
            self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

            self._oauth_token    = oauth.Token(key=access_token_key, secret=access_token_secret)
            self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
项目:gif-disco    作者:futurice    | 项目源码 | 文件源码
def SetCredentials(self,
                     consumer_key,
                     consumer_secret,
                     access_token_key=None,
                     access_token_secret=None):
    '''Set the consumer_key and consumer_secret for this instance

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key        = consumer_key
    self._consumer_secret     = consumer_secret
    self._access_token_key    = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer      = None

    if consumer_key is not None and consumer_secret is not None and \
       access_token_key is not None and access_token_secret is not None:
      self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
      self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

      self._oauth_token    = oauth.Token(key=access_token_key, secret=access_token_secret)
      self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        self.docmd('AUTH', 'XOAUTH %s' % \
            base64.b64encode(oauth2.build_xoauth_string(url, consumer, token)))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
            lambda x: oauth2.build_xoauth_string(url, consumer, token))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def setUp(self):
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.token = oauth.Token(self.key, self.secret)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_basic(self):
        self.assertRaises(ValueError, lambda: oauth.Token(None, None))
        self.assertRaises(ValueError, lambda: oauth.Token('asf', None))
        self.assertRaises(ValueError, lambda: oauth.Token(None, 'dasf'))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_unset_consumer_and_token(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request("GET", "http://example.com/fetch.php")
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer,
            token)

        self.assertEqual(consumer.key, request['oauth_consumer_key'])
        self.assertEqual(token.key, request['oauth_token'])
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_no_url_set(self):
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        request = oauth.Request()
        self.assertRaises(ValueError,
                          request.sign_request,
                          oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_from_consumer_and_token(self):
        url = "http://sp.example.com/"

        tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        tok.set_verifier('this_is_a_test_verifier')
        con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
        req = oauth.Request.from_consumer_and_token(con, token=tok,
            http_method="GET", http_url=url)

        self.assertEqual(req['oauth_token'], tok.key)
        self.assertEqual(req['oauth_consumer_key'], con.key)
        self.assertEqual(tok.verifier, req['oauth_verifier'])
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_no_version(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        self.consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = self.token.key
        params['oauth_consumer_key'] = self.consumer.key
        self.request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        self.request.sign_request(signature_method, self.consumer, self.token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        parameters = server.verify_request(self.request, self.consumer,
            self.token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_invalid_version(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '222.9922',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['foo','bar'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_missing_signature(self):
        url = "http://sp.example.com/"

        params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59
        }

        consumer = oauth.Consumer(key="consumer-key",
            secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params['oauth_token'] = token.key
        params['oauth_consumer_key'] = consumer.key
        request = oauth.Request(method="GET", url=url, parameters=params)

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)
        del request['oauth_signature']

        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(oauth.MissingSignature, server.verify_request,
            request, consumer, token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
项目:social-vuln-scanner    作者:Betawolf    | 项目源码 | 文件源码
def __init__( self,  
                consumer_key,
                consumer_secret,
                user_token,
                user_secret,
                logger):
    logger.info("OAuth:\nConsumer Key:{}\nConsumer Secret:{}\nUser Token:{}\nUser Secret:{}".format(consumer_key,consumer_secret,user_token,user_secret))
    consumer = oauth.Consumer(consumer_key,consumer_secret)
    client = oauth.Client(consumer)
    access_token = oauth.Token(key=user_token,
                               secret=user_secret)
    client = oauth.Client(consumer,access_token,timeout=MediaConnection.timeout)
    self.client = client
    super().__init__(logger)