Python requests 模块,auth() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用requests.auth()

项目:cerebros_bot    作者:jslim18    | 项目源码 | 文件源码
def getESCXBalance(address):
    """Return the ESCX balance recorded for ``address``, or 0 on any failure.

    Posts a JSON-RPC ``get_balances`` request, filtered on the given address
    and the ``ESCX`` asset, using the module-level ``url``, ``headers`` and
    ``auth`` values. The ``quantity`` of the last entry in ``result`` is
    divided by 100000000 and returned.

    This is a best-effort lookup: any error (request failure, unexpected
    response shape, empty result list) yields 0 instead of raising.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [
                {"field": "address", "op": "==", "value": address},
                {"field": "asset", "op": "==", "value": "ESCX"},
            ],
            "filterop": "and",
        },
        "jsonrpc": "2.0",
        "id": 0,
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return json_data['result'].pop()['quantity'] / 100000000
    except Exception:
        # Still deliberately broad, but unlike a bare ``except:`` this does
        # not swallow KeyboardInterrupt or SystemExit.
        return 0
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def auth():
    """Demonstrate the ways an ``auth`` argument can be passed to ``requests.get``."""
    github_user_url = 'https://api.github.com/user'

    # Basic authentication: first with an explicit HTTPBasicAuth object,
    # then with the (user, password) tuple form.
    requests.get(github_user_url, auth=HTTPBasicAuth('user', 'pass'))
    requests.get(github_user_url, auth=('user', 'pass'))

    # Digest authentication.
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 authentication (the original note mentions requests-oauthlib).
    twitter_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    oauth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(twitter_url, auth=oauth)
项目:cerebros_bot    作者:jslim18    | 项目源码 | 文件源码
def getESCXBalance(address):
    """Return the ESCX balance recorded for ``address``, or 0 on any failure.

    Posts a JSON-RPC ``get_balances`` request, filtered on the given address
    and the ``ESCX`` asset, using the module-level ``url``, ``headers`` and
    ``auth`` values. The ``quantity`` of the last entry in ``result`` is
    divided by 100000000 and returned.

    This is a best-effort lookup: any error (request failure, unexpected
    response shape, empty result list) yields 0 instead of raising.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [
                {"field": "address", "op": "==", "value": address},
                {"field": "asset", "op": "==", "value": "ESCX"},
            ],
            "filterop": "and",
        },
        "jsonrpc": "2.0",
        "id": 0,
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return json_data['result'].pop()['quantity'] / 100000000
    except Exception:
        # Still deliberately broad, but unlike a bare ``except:`` this does
        # not swallow KeyboardInterrupt or SystemExit.
        return 0
项目:upstox-python    作者:upstox    | 项目源码 | 文件源码
def retrieve_access_token(self):
    """Exchange the authorization code for an access token.

    Once you have the authorization code, call this to get the
    access_token. The access_token gives you full access to the API and is
    valid throughout the day.

    Returns:
        The ``access_token`` value from the JSON response body.

    Raises:
        TypeError: if ``api_key``, ``redirect_uri``, ``api_secret`` or
            ``code`` is None.
        SystemError: if the response body contains no ``access_token``;
            the decoded body is attached to the exception.
    """
    # ``raise (TypeError, 'msg')`` raises a tuple, which loses the message;
    # the exception has to be instantiated for the text to reach the caller.
    if self.api_key is None:
        raise TypeError('Value api_key cannot be None. Please go to the Developer Console to get this value')
    if self.redirect_uri is None:
        raise TypeError('Value redirect_uri cannot be None. Please go to the Developer Console to get this value')
    if self.api_secret is None:
        raise TypeError('Value api_secret cannot be None. Please go to the Developer Console to get this value')
    if self.code is None:
        raise TypeError('Value code cannot be None. Please visit the login URL to generate a code')

    params = {'code': self.code, 'redirect_uri': self.redirect_uri, 'grant_type': 'authorization_code'}

    url = self.config['host'] + self.config['routes']['accessToken']
    headers = {"Content-Type": "application/json", "x-api-key": self.api_key}
    # The key/secret pair is sent as HTTP basic auth alongside the JSON body.
    r = requests.post(url, auth=(self.api_key, self.api_secret), data=json.dumps(params), headers=headers)
    body = json.loads(r.text)
    if 'access_token' not in body:
        raise SystemError(body)
    return body['access_token']
项目:fauxmo-plugins    作者:n8henrie    | 项目源码 | 文件源码
def get_state(self) -> str:
    """Query the device and report its state.

    No request is made when ``state_cmd`` is None. Otherwise the response
    text is searched for the configured "off" marker first, then for the
    "on" marker.

    Returns:
        "on", "off", or "unknown"

    """
    if self.state_cmd is not None:
        reply = requests.request(self.state_method, self.state_cmd,
                                 data=self.state_data, json=self.state_json,
                                 headers=self.headers, auth=self.auth)
        body = reply.text
        if self.state_response_off in body:
            return "off"
        if self.state_response_on in body:
            return "on"
    return "unknown"
项目:stackstorm-ghost2logger    作者:StackStorm-Exchange    | 项目源码 | 文件源码
def _ghost2loggergetGUID(self):
    """Fetch the GUID from the Ghost2logger ``/v1/guid`` endpoint.

    Returns:
        The last entry of the response's ``pattern`` list as a string when
        the response's ``host`` field equals "GUID"; "" when ``host`` has
        another value or ``pattern`` is empty; None when the request or the
        response handling raises (the error is logged at info level).
    """
    HEADERS = {"Content-Type": 'application/json'}

    _URL = self._ghost_url + "/v1/guid"
    try:
        # verify=False: TLS certificate verification is disabled for this call.
        r = requests.get(_URL, headers=HEADERS, verify=False,
                         auth=(self._username, self._password))
        _data = r.json()
        if _data['host'] != "GUID":
            return ""

        # Start from "" so an empty pattern list yields "no GUID" instead of
        # an unbound-variable error being logged as a fetch failure.
        _guid = ""
        for _item in _data['pattern']:
            _guid = str(_item)
        return _guid

    except Exception as e:
        self._logger.info('[Ghost2logger]: Cannot get GUID: ' +
                          str(e))
        return None
项目:stackstorm-ghost2logger    作者:StackStorm-Exchange    | 项目源码 | 文件源码
def _ghost2loggergpurge(self):
    """Send a DELETE to the Ghost2logger ``/v1/all`` endpoint and log the outcome.

    Any exception raised by the request or by decoding the reply is logged
    at info level rather than propagated.
    """
    self._logger.info('[Ghost2logger]: Purge Ghost2logger')

    purge_url = self._ghost_url + "/v1/all"
    json_headers = {"Content-Type": 'application/json'}
    try:
        # verify=False: TLS certificate verification is disabled for this call.
        reply = requests.delete(purge_url, headers=json_headers, verify=False,
                                auth=(self._username, self._password))
        decoded = reply.json()
        self._logger.info('[Ghost2logger]: Purged ghost2logger' + str(decoded))
    except Exception as err:
        self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' + str(err))
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def __getAppSpaces(self):
    """Load the appspace names available on FileNet.

    On success the decoded JSON is stored in ``self.appspaces`` and its
    keys in ``self.apps``. If the status check or the decoding raises, the
    error and the response's ``UserMessage`` text are printed and the
    decoded response body is stored in ``self.apps`` instead.

    A failure of the HTTP request itself (no response obtained) propagates
    to the caller.
    """
    # Issued outside the ``try``: the handler below inspects the response,
    # so it must only run once a response actually exists. Previously a
    # connection failure surfaced as an unbound-variable error.
    appspaces = requests.get(self.baseurl + 'appspacenames',
                             auth=self.cred)
    try:
        appspaces.raise_for_status()
        names = appspaces.json()
        self.appspaces = names
        self.apps = names.keys()
    except Exception as e:
        error_body = appspaces.json()
        print(str(e) + ':\n' + str(error_body['UserMessage']['Text']))
        print(error_body)
        self.apps = error_body
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def __getQueues(self):
    """Collect workbasket URIs into ``self.queue_urls`` and ``self.workbaskets``.

    Every key of ``self.roles`` is combined with every role found in any of
    its values; for each combination whose role request succeeds, each
    workbasket URI is appended to ``self.queue_urls`` and stored in
    ``self.workbaskets`` under the last path segment of the URI.
    """
    # NOTE(review): keys are paired with *all* value lists, not only their
    # own - presumably non-matching pairs return a non-OK status; confirm.
    for app_name in self.roles.keys():
        for role_names in self.roles.values():
            if not role_names:
                continue
            for role_name in role_names:
                role_url = (self.baseurl + 'appspaces/' + app_name
                            + '/roles/' + role_name)
                reply = requests.get(role_url, auth=self.cred)
                if not reply.ok:
                    continue
                for basket in reply.json()['workbaskets'].values():
                    self.queue_urls.append(basket['URI'])
                    basket_name = basket['URI'].split('/')[-1]
                    self.workbaskets[basket_name] = basket['URI']
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def getQueue(self, work_basket):
    """Return the queue of a given workbasket, including its task count.

    The workbasket's URL is looked up in ``self.client.workbaskets``; the
    decoded queue JSON is returned with an added ``count`` entry taken from
    the queue's ``/queueelements/count`` resource.

    Usage:
    >>> my_queue = pe.getQueue('workbasket_name')
    >>> my_queue.get('count')->Variable with the total tasks in this Queue.
    """
    basket_reply = requests.get(
        self.client.baseurl + self.client.workbaskets.get(work_basket),
        auth=self.client.cred)
    count_reply = requests.get(basket_reply.url + '/queueelements/count',
                               auth=self.client.cred)
    total = count_reply.json()['count']

    result = basket_reply.json()
    result['count'] = total
    return result
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def getAllTasks(self):
    """Return the tasks of every queue as one flat list.

    Each URL in ``self.client.queue_urls`` is fetched and passed through
    ``self.getTasks``; queues for which that returns nothing are skipped.

    Usage:
    >>> tasks = pe.getAllTasks()
    """
    collected = []
    for queue_uri in self.client.queue_urls:
        reply = requests.get(self.client.baseurl + queue_uri,
                             auth=self.client.cred)
        batch = self.getTasks(reply.json())
        if batch:
            collected.extend(batch)
    return collected
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def lockTask(self, task):
    """Lock a task so that other users cannot work on it at the same time.

    ``task`` is a task dictionary as obtained from getTasks(). The step
    element is fetched first to read its ``ETag`` header, which is then
    sent back (as the ``If-Match`` query parameter) with the ``lock``
    action. Nothing is returned.

    Usage:
    >>> pe.lockTask(task)
    """
    step_url = self.client.baseurl + task['stepElement']
    credentials = self.client.cred

    current = requests.get(step_url, auth=credentials)
    lock_params = {'action': 'lock', 'If-Match': current.headers['ETag']}
    requests.put(step_url, auth=credentials, params=lock_params)
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def endTask(self, task, comment=None):
    """Finish a task by dispatching its step element.

    If the step offers responses but none has been selected, nothing is
    dispatched and an explanatory message string is returned. Otherwise
    the task is (optionally saved with ``comment`` first, then) locked and
    dispatched; in that case nothing is returned.

    Usage:
    >>> pe.endTask(task) #or
    >>> pe.endTask(task, u'Completed the task!')

    """
    system_props = self.getStep(task).get('systemProperties')
    if system_props.get('responses') and not system_props.get('selectedResponse'):
        return "This task needs to be updated. Check the updateTask method."

    dispatch_params = {'action': 'dispatch', 'selectedResponse': 1}

    if comment:
        task = self.saveAndUnlockTask(task, comment)

    self.lockTask(task)
    dispatch_params['If-Match'] = task['ETag']
    requests.put(self.client.baseurl + task['stepElement'],
                 auth=self.client.cred,
                 params=dispatch_params)
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def getUser(self, search_string):
    """Search the directory service for users matching ``search_string``.

    Returns the display names of all matches (the request asks for at most
    50 results), or the string "User not Found" when the response contains
    no users.

    Usage:
    >>> users = pe.getUser('user_name')
    """
    query = {'searchPattern': search_string,
             'searchType': 4, 'limit': 50}
    reply = requests.get(self.client.baseurl + 'users',
                         auth=self.client.cred,
                         params=query)
    matches = reply.json().get('users')
    if not matches:
        return "User not Found"
    return [entry['displayName'] for entry in matches]
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def getGroup(self, search_string):
    """Search the directory service for groups matching ``search_string``.

    Returns the display names of all matches (the request asks for at most
    3000 results), or the string "Group not Found" when the response
    contains no groups.

    Usage:
    >>> groups = pe.getGroup('group_name')
    """
    query = {'searchPattern': search_string,
             'searchType': 4, 'limit': 3000}
    reply = requests.get(self.client.baseurl + 'groups',
                         auth=self.client.cred,
                         params=query)
    matches = reply.json().get('groups')
    if not matches:
        return "Group not Found"
    return [entry['displayName'] for entry in matches]
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def sendMessage() -> str:
    """Post a comment (internal or external) on a HackerOne report.

    Reads ``message``, ``internal`` and ``id`` from the JSON body of the
    current request, forwards them as an ``activity-comment`` to the
    HackerOne API and returns the API's JSON reply re-serialized as a
    string.
    """
    payload = request.get_json(force=True)
    message = payload['message']
    internal = payload['internal']
    report_id = payload['id']

    if config.DEBUG:
        print("/v1/sendMessage: id=%s, internal=%s" % (report_id, internal))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    comment = {
        'data': {
            'type': 'activity-comment',
            'attributes': {'message': message, 'internal': internal},
        }
    }
    endpoint = 'https://api.hackerone.com/v1/reports/%s/activities' % report_id
    resp = requests.post(endpoint,
                         headers={'Content-Type': 'application/json'},
                         data=json.dumps(comment).encode('utf-8'),
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def changeStatus() -> str:
    """Change the state of a HackerOne report.

    Reads ``status``, ``message`` and ``id`` from the JSON body of the
    current request, forwards them as a ``state-change`` to the HackerOne
    API and returns the API's JSON reply re-serialized as a string.
    """
    payload = request.get_json(force=True)
    status = payload['status']
    message = payload['message']
    report_id = payload['id']

    if config.DEBUG:
        print("/v1/changeStatus: id=%s, status=%s" % (report_id, status))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    state_change = {
        'data': {
            'type': 'state-change',
            'attributes': {'message': message, 'state': status},
        }
    }
    endpoint = 'https://api.hackerone.com/v1/reports/%s/state_changes' % report_id
    resp = requests.post(endpoint,
                         headers={'Content-Type': 'application/json'},
                         data=json.dumps(state_change).encode('utf-8'),
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """Post the message of ``vti`` as a comment on the report with the given ID.

    - ``internal=True`` posts the comment as an internal one
    - ``addStopMessage=True`` appends the stop message to the comment

    When ``config.triageOnReproduce`` is set and ``vti.reproduced`` is true,
    the report is additionally moved to the 'triaged' status. Returns the
    decoded JSON reply of the sendMessage call.
    """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))

    message = (vti.message + '\n\n' + constants.disableMessage) if addStopMessage else vti.message

    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this

    body = {'message': message, 'internal': internal, 'id': id}
    credentials = HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken)
    resp = requests.post('http://api:8080/v1/sendMessage', json=body, auth=credentials)

    if config.triageOnReproduce and vti.reproduced:
        changeStatus(id, 'triaged')

    return resp.json()
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
    """
    Store the API URL (with a trailing slash) and the auth settings.

    :param api_url: str Moira API URL
    :param auth_custom: dict auth custom headers, merged over the default
        ``X-Webauth-User`` header
    :param auth_user: str auth user
    :param auth_pass: str auth password
    :param login: str auth login, sent as the ``X-Webauth-User`` header
    """
    self.api_url = api_url if api_url.endswith('/') else api_url + '/'

    # Basic auth is only configured when both user and password are given.
    self.auth = None
    if auth_user and auth_pass:
        self.auth = HTTPBasicAuth(auth_user, auth_pass)

    self.auth_custom = {'X-Webauth-User': login}
    if auth_custom:
        self.auth_custom.update(auth_custom)
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def get(self, path='', **kwargs):
    """
    Send a GET request to the API and return the decoded JSON reply.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response

    :raises: HTTPError
    :raises: InvalidJSONError
    """
    response = requests.get(self._path_join(path),
                            headers=self.auth_custom,
                            auth=self.auth,
                            **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def delete(self, path='', **kwargs):
    """
    Send a DELETE request to the API and return the decoded JSON reply.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response

    :raises: HTTPError
    :raises: InvalidJSONError
    """
    response = requests.delete(self._path_join(path),
                               headers=self.auth_custom,
                               auth=self.auth,
                               **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def put(self, path='', **kwargs):
    """
    Send a PUT request to the API and return the decoded JSON reply.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response

    :raises: HTTPError
    :raises: InvalidJSONError
    """
    response = requests.put(self._path_join(path),
                            headers=self.auth_custom,
                            auth=self.auth,
                            **kwargs)
    response.raise_for_status()
    try:
        decoded = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return decoded
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
    """Upload torrent file contents to the client via the ``add-file`` action.

    Resets ``total_size`` and ``expected_torrent_name``, writes
    ``torrent_data`` to a temporary file and posts that file to
    ``UTORRENT_URL``.

    :param torrent_data: contents of the torrent file.
    :param download_dir: accepted for interface compatibility; not used here.
    :return: True when the JSON reply has no ``error`` key; False when it
        has one or when the request raises ``RequestException``.
    """
    self.total_size = 0
    self.expected_torrent_name = ''

    params = {'action': 'add-file', 'token': self.token}
    # The context manager removes the temporary file on every exit path,
    # including exceptions other than RequestException.
    with NamedTemporaryFile() as lf:
        lf.write(torrent_data)
        # Flush and rewind so the upload reads the complete payload from
        # this same handle. Reopening the file by name could see buffered,
        # not-yet-written data as missing and left a second handle open.
        lf.flush()
        lf.seek(0)
        try:
            response = requests.post(
                self.UTORRENT_URL,
                auth=self.auth,
                params=params,
                files={'torrent_file': lf},
                timeout=25).json()
        except RequestException:
            return False
    return 'error' not in response
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def add_url(self, url: str, download_dir: str=None) -> bool:
    """Ask the client to add a torrent by URL via the ``add-url`` action.

    Resets ``total_size`` and ``expected_torrent_name`` before the request.

    :param url: torrent URL, sent as the ``s`` query parameter.
    :param download_dir: accepted for interface compatibility; not used here.
    :return: True when the JSON reply has no ``error`` key; False when it
        has one or when the request raises ``RequestException``.
    """
    self.total_size = 0
    self.expected_torrent_name = ''

    query = {'action': 'add-url', 'token': self.token, 's': url}
    try:
        payload = requests.get(
            self.UTORRENT_URL,
            auth=self.auth,
            params=query,
            timeout=25,
        ).json()
    except RequestException:
        return False
    return 'error' not in payload
项目:pact-broker-client    作者:Babylonpartners    | 项目源码 | 文件源码
def push_pact(self, *, pact_file, provider, consumer, consumer_version):
    """Upload the pact stored in ``pact_file`` to the broker.

    The file is parsed as JSON and PUT to the URL built from
    ``urls.PUSH_PACT_URL``. Raises if the broker answers with an error
    status. Returns a ``(response, message)`` tuple.
    """
    target = urls.PUSH_PACT_URL.format(
        broker_url=self.broker_url,
        provider=provider,
        consumer=consumer,
        consumer_version=consumer_version,
    )

    with open(pact_file) as handle:
        contract = json.load(handle)

    reply = requests.put(target, auth=self._auth, json=contract)
    reply.raise_for_status()

    summary = f'Pact between {consumer} and {provider} pushed.'
    return reply, summary
项目:pact-broker-client    作者:Babylonpartners    | 项目源码 | 文件源码
def tag_consumer(self, *, provider, consumer, consumer_version, tag):
    """Tag a consumer version on the broker.

    Sends a PUT to the URL built from ``urls.TAG_CONSUMER_URL`` (``provider``
    is accepted but does not take part in the URL). Raises if the broker
    answers with an error status. Returns a ``(response, message)`` tuple.
    """
    target = urls.TAG_CONSUMER_URL.format(
        broker_url=self.broker_url,
        consumer=consumer,
        consumer_version=consumer_version,
        tag=tag,
    )
    json_headers = {'Content-Type': 'application/json'}

    reply = requests.put(target, headers=json_headers, auth=self._auth)
    reply.raise_for_status()

    summary = f'{consumer} version {consumer_version} tagged as {tag}'
    return reply, summary
项目:tfs    作者:devopshq    | 项目源码 | 文件源码
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
    """Prepare the API base URLs and an authenticated HTTP session.

    :param base_url: server URL; a trailing slash is added when missing.
    :param project: project name, split by ``get_collection_and_project``
        into a collection and an optional project part.
    :param user: user name passed to the auth class.
    :param password: password passed to the auth class.
    :param verify: stored as ``self._verify``; when falsy, urllib3's
        InsecureRequestWarning is silenced.
    :param timeout: stored as ``self.timeout``.
    :param auth_type: callable building the session's auth object from
        ``(user, password)``; defaults to HTTP basic auth when None.
    """
    if not base_url.endswith('/'):
        base_url += '/'

    collection, project = self.get_collection_and_project(project)
    # Collection-level URL (e.g. DefaultCollection/MyProject => DefaultCollection);
    # the project-level URL falls back to it when no project part is given.
    self._url = base_url + '%s/_apis/' % collection
    if project:
        self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
    else:
        self._url_prj = self._url

    self.http_session = requests.Session()
    # The declared default (None) is not callable; fall back to basic auth
    # instead of failing with "'NoneType' object is not callable".
    if auth_type is None:
        auth_type = requests.auth.HTTPBasicAuth
    self.http_session.auth = auth_type(user, password)

    self.timeout = timeout
    self._verify = verify
    if not self._verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _compose_args(self, method, data=None):
    """Build the keyword arguments for a request described by ``self._structure``.

    :param method: HTTP method name; 'get' puts the payload under
        ``params``, anything else under ``data``.
    :param data: optional dict merged over the structure's ``common-params``.
    :return: dict with the payload key, ``headers`` and, depending on the
        structure, ``auth`` and/or ``verify``.
    """
    if data is None:
        data = {}
    query = 'params' if method == 'get' else 'data'

    args = {}
    # Work on copies: updating the structure's own dicts would leak this
    # call's data (and the credentials below) into every later call.
    args[query] = dict(self._structure.get('common-params', {}))
    args[query].update(data)
    args['headers'] = dict(self._structure.get('common-headers', {}))

    auth = self._structure.get('auth')
    if auth == 'params':
        # Credentials travel inside the payload itself.
        args[query].update({'username': self.billing_username,
                            'password': self.billing_password})
    elif auth == 'headers':
        args['auth'] = HTTPBasicAuth(
            self.billing_username, self.billing_password)
    # Equality (not identity) is kept on purpose so 0 keeps matching too.
    if self._structure.get('verify') == False:  # noqa: E712
        args['verify'] = False
    return args
项目:pathman-sr    作者:CiscoDevNet    | 项目源码 | 文件源码
def get_url(url):
    '''GET ``url`` from the controller and return ``(ok, payload)``.

    Uses the credentials from the module-level ``controller`` mapping and
    disables TLS certificate verification.

    Returns ``(True, decoded_json)`` for status 200, ``(False, body_text)``
    for any other status and ``(False, error_text)`` on a connection error.
    '''
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    logging.info(url)
    try:
        response = requests.get(url, headers=headers, auth=auth, verify=False)
        logging.info("Url GET Status: %s", response.status_code)

        if response.status_code in [200]:
            return True, response.json()
        return False, str(response.text)
    # ``except X as e`` and ``str(e)`` work on Python 2.6+ and 3; the old
    # ``except X, e`` form and ``e.message`` are Python 2 only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s', e)
        return False, str(e)
项目:pathman-sr    作者:CiscoDevNet    | 项目源码 | 文件源码
def post_xml(url, data):
    '''POST ``data`` as XML to ``url`` and return ``(ok, payload)``.

    Uses the credentials from the module-level ``controller`` mapping and
    disables TLS certificate verification.

    Returns ``(True, decoded_json)`` for status 200/204 with a body,
    ``(True, {})`` for 200/204 without a body, ``(False, body_text)`` for
    any other status and ``(False, error_text)`` on a connection error.
    '''
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.post(url, data=data, auth=auth, headers=headers, verify=False)
        logging.info("Url POST Status: %s", response.status_code)
        if response.status_code in [200, 204]:
            if len(response.text) > 0:
                return True, response.json()
            return True, {}
        return False, str(response.text)
    # ``except X as e`` and ``str(e)`` work on Python 2.6+ and 3; the old
    # ``except X, e`` form and ``e.message`` are Python 2 only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s', e)
        return False, str(e)
项目:pathman-sr    作者:CiscoDevNet    | 项目源码 | 文件源码
def put_xml(url, data):
    """PUT ``data`` as XML to ``url`` and return ``(ok, payload)``.

    Uses the credentials from the module-level ``controller`` mapping and
    disables TLS certificate verification.

    Returns ``(True, decoded_json)`` for status 200/201/204 with a body,
    ``(True, {})`` for those statuses without a body, ``(False, body_text)``
    for any other status and ``(False, error_text)`` on a connection error.
    """
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.put(url, data=data, auth=auth, headers=headers, verify=False)
        logging.info("Url PUT Status: %s", response.status_code)
        if response.status_code in [200, 201, 204]:
            if len(response.text) > 0:
                return True, response.json()
            return True, {}
        return False, str(response.text)
    # ``except X as e`` and ``str(e)`` work on Python 2.6+ and 3; the old
    # ``except X, e`` form and ``e.message`` are Python 2 only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s', e)
        return False, str(e)
项目:pathman-sr    作者:CiscoDevNet    | 项目源码 | 文件源码
def put_json(url, data):
    """PUT ``data`` serialized as JSON to ``url`` and return ``(ok, payload)``.

    Uses the credentials from the module-level ``controller`` mapping and
    disables TLS certificate verification.

    Returns ``(True, decoded_json)`` for status 200/204 with a body,
    ``(True, {})`` for 200/204 without a body, ``(False, body_text)`` for
    any other status and ``(False, error_text)`` on a connection error.
    """
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.put(url, data=json.dumps(data), auth=auth, headers=headers, verify=False)
        logging.info("Url PUT Status: %s", response.status_code)
        if response.status_code in [200, 204]:
            if len(response.text) > 0:
                return True, response.json()
            return True, {}
        return False, str(response.text)
    # ``except X as e`` and ``str(e)`` work on Python 2.6+ and 3; the old
    # ``except X, e`` form and ``e.message`` are Python 2 only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s', e)
        return False, str(e)
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def grant_access_token(self, code):
    """Exchange an authorization ``code`` for Bitbucket OAuth2 tokens.

    On success the access token, refresh token and token type from the
    reply are stored on the instance and the decoded reply is returned.
    An error status is re-raised as ``BitbucketAPIError`` carrying the
    status code and the error fields of the reply.
    """
    form = {
        'grant_type': 'authorization_code',
        'code': code,
    }
    credentials = HTTPBasicAuth(self._oauth_key, self._oauth_secret)
    res = self._session.post(
        'https://bitbucket.org/site/oauth2/access_token',
        data=form,
        auth=credentials,
    )

    try:
        res.raise_for_status()
    except requests.RequestException as reqe:
        error_info = res.json()
        error_code = error_info.get('error', '')
        error_text = error_info.get('error_description', '')
        raise BitbucketAPIError(
            res.status_code,
            error_code,
            error_text,
            request=reqe.request,
            response=reqe.response
        )

    data = res.json()
    self._access_token = data['access_token']
    self._refresh_token = data['refresh_token']
    self._token_type = data['token_type']
    return data
项目:spotify-api    作者:steinitzu    | 项目源码 | 文件源码
def refresh_token(self):
    """
    Refresh token regardless of when it expires.

    The reply gets an ``expires_at`` timestamp computed from the current
    time and its ``expires_in``; when it carries no new refresh token the
    previous one is kept. The result is assigned to ``self.token``.

    Raises HTTPError on any non 2xx response code
    """
    previous = self._token['refresh_token']
    form = {
        'refresh_token': previous,
        'grant_type': 'refresh_token'
    }
    credentials = HTTPBasicAuth(self.client_id, self.client_secret)

    response = self.session.post(self.TOKEN_URL, data=form, auth=credentials)
    response.raise_for_status()

    fresh = response.json()
    fresh['expires_at'] = int(time.time()) + fresh['expires_in']
    fresh.setdefault('refresh_token', previous)
    self.token = fresh
项目:picopayments-cli-python    作者:CounterpartyXCP    | 项目源码 | 文件源码
def jsonrpc_call(url, method, params={}, verify_ssl_cert=True,
                 username=None, password=None):
    """Perform a JSON-RPC 2.0 call and return its ``result``.

    Basic auth is added only when both ``username`` and ``password`` are
    given. The time spent in the request is added to
    ``method_cumulative_calltime[method]``. The ``params`` default is a
    shared dict, which this function only reads.

    Raises ``JsonRpcCallFailed`` (with the payload and the decoded reply)
    when the reply has no ``result`` key.
    """
    global method_cumulative_calltime

    payload = {"method": method, "params": params, "jsonrpc": "2.0", "id": 0}
    request_kwargs = dict(
        url=url,
        headers={'content-type': 'application/json'},
        data=json.dumps(payload),
        verify=verify_ssl_cert,
    )
    if username and password:
        request_kwargs["auth"] = HTTPBasicAuth(username, password)

    started = time.time()
    response = requests.post(**request_kwargs).json()
    method_cumulative_calltime[method] += (time.time() - started)

    if "result" in response:
        return response["result"]
    raise JsonRpcCallFailed(payload, response)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def get_strategy_for(self, url):
    """Look up the authentication strategy registered for a URL.

    :param str url: The full URL you will be making a request against. For
        example, ``'https://api.github.com/user'``
    :returns: Callable that adds authentication to a request; a
        ``NullAuthStrategy`` when nothing is registered for the URL's key.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)

    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
项目:cc-server    作者:curious-containers    | 项目源码 | 文件源码
def notify(tee, servers, meta_data):
    """POST a notification to every server in ``servers`` (best effort).

    :param tee: callable receiving a message string for each failed server.
    :param servers: iterable of dicts, each with a ``connector_access``
        mapping (``url``, optional ``json_data``, ``add_meta_data``,
        ``auth`` and ``ssl_verify``).
    :param meta_data: dict merged into the JSON body of servers that set
        ``add_meta_data``.

    Failures are reported through ``tee`` together with the traceback and
    do not stop the remaining servers from being notified.
    """
    for server in servers:
        connector_access = server['connector_access']
        try:
            json_data = connector_access.get('json_data')
            if connector_access.get('add_meta_data'):
                # Merge into a copy so the server's configured json_data is
                # not permanently extended with this call's metadata.
                json_data = dict(json_data or {})
                json_data.update(meta_data)

            r = requests.post(
                connector_access['url'],
                json=json_data,
                auth=_auth(connector_access.get('auth')),
                verify=connector_access.get('ssl_verify', True)
            )
            r.raise_for_status()
        except Exception:
            # Broad on purpose (one bad server must not abort the loop), but
            # no longer a bare ``except:`` that also traps KeyboardInterrupt.
            tee('Could not notify server {}: {}'.format(connector_access['url'], format_exc()))
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def get_strategy_for(self, url):
    """Look up the authentication strategy registered for a URL.

    :param str url: The full URL you will be making a request against. For
        example, ``'https://api.github.com/user'``
    :returns: Callable that adds authentication to a request; a
        ``NullAuthStrategy`` when nothing is registered for the URL's key.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)

    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_prepared_from_session(self, httpbin):
    """A session-level auth object is applied to requests prepared via the session."""

    class DummyAuth(requests.auth.AuthBase):
        # Marks the outgoing request with a header the server echoes back.
        def __call__(self, r):
            r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
            return r

    req = requests.Request('GET', httpbin('headers'))
    assert not req.auth

    session = requests.Session()
    session.auth = DummyAuth()

    response = session.send(session.prepare_request(req))
    echoed_headers = response.json()['headers']

    assert echoed_headers['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
项目:rets    作者:refindlyllc    | 项目源码 | 文件源码
def _user_agent_digest_hash(self):
    """
    Hash the user agent and user agent password
    Section 3.10 of https://www.nar.realtor/retsorg.nsf/retsproto1.7d6.pdf

    The digest is md5("<a1>:<request id, left empty>:<session id>:<version>")
    where a1 is md5("<user agent>:<user agent password>"), the session id
    is empty when none is set and the version has its "RETS/" prefix removed.

    :return: md5 hex digest
    :raises MissingVersion: if no version is set on the session
    """
    if not self.version:
        # Trailing spaces matter: adjacent literals are concatenated as-is.
        raise MissingVersion("A version is required for user agent auth. The RETS server should set this "
                             "automatically but it has not. Please instantiate the session with a version argument "
                             "to provide the version.")
    # Remove the literal prefix; str.strip('RETS/') would instead strip any
    # of those characters from both ends of the string.
    prefix = 'RETS/'
    version_number = self.version
    if version_number.startswith(prefix):
        version_number = version_number[len(prefix):]
    user_str = '{0!s}:{1!s}'.format(self.user_agent, self.user_agent_password).encode('utf-8')
    a1 = hashlib.md5(user_str).hexdigest()
    session_id = self.session_id if self.session_id is not None else ''
    digest_str = '{0!s}::{1!s}:{2!s}'.format(a1, session_id, version_number).encode('utf-8')
    digest = hashlib.md5(digest_str).hexdigest()
    return digest
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def division_standings():
    """Download the division standings and store each team's rank.

    Fetches ``division_team_standings.json`` (relative to ``base_url``) with
    HTTP basic auth, then inserts one ``(team_id, rank)`` row per team entry
    into the ``division_standings`` table of ``nflpool.sqlite``.
    """
    url = base_url + 'division_team_standings.json'
    response = requests.get(url,
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))

    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for division in data['divisionteamstandings']['division']
        for team in division['teamentry']
    ]
    if not rows:
        # Nothing to store; do not touch the database at all.
        return

    # One connection and one transaction for the whole batch instead of a
    # connect/commit/close cycle per team; the connection is closed even if
    # the insert fails.
    conn = sqlite3.connect('nflpool.sqlite')
    try:
        conn.executemany('''INSERT INTO division_standings(team_id, rank)
                VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()


# Get Playoff Standings for each team (need number 5 & 6 in each conference for Wild Card picks)
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def playoff_standings():
    """Download the playoff standings and store each team's rank.

    Fetches ``playoff_team_standings.json`` (relative to ``base_url``) with
    HTTP basic auth, then inserts one ``(team_id, rank)`` row per team entry
    into the ``playoff_rankings`` table of ``nflpool.sqlite``.
    """
    url = base_url + 'playoff_team_standings.json'
    response = requests.get(url,
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))

    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for conference in data['playoffteamstandings']['conference']
        for team in conference['teamentry']
    ]
    if not rows:
        # Nothing to store; do not touch the database at all.
        return

    # One connection and one transaction for the whole batch instead of a
    # connect/commit/close cycle per team; the connection is closed even if
    # the insert fails.
    conn = sqlite3.connect('nflpool.sqlite')
    try:
        conn.executemany('''INSERT INTO playoff_rankings(team_id, rank)
                VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()


# Get individual statistics for each category
项目:igmonplugins    作者:innogames    | 项目源码 | 文件源码
def parse_auth_argument(args):
    """Turn the parsed command-line auth options into an auth object.

    :param args: parsed arguments providing ``auth`` plus, depending on its
        value, ``username``/``password`` or ``consumer_key``/
        ``consumer_secret``/``private_key``/``passphrase``
    :return: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged
    :raises SystemExit: with status 3 when the options required by the
        selected method are missing
    """
    # sys.exit is always available; the bare ``exit`` helper is injected by
    # the ``site`` module and is missing under ``python -S`` or frozen builds.
    import sys

    auth = args.auth
    if auth == 'basic':
        if not (args.username and args.password):
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            sys.exit(3)
        auth = HTTPBasicAuth(args.username, args.password)
    elif auth == 'oauth':
        if not (args.consumer_key and args.private_key):
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            sys.exit(3)
        auth = get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)

    return auth
项目:igmonplugins    作者:innogames    | 项目源码 | 文件源码
def parse_auth_argument(args):
    """Turn the parsed command-line auth options into an auth object.

    :param args: parsed arguments providing ``auth`` plus, depending on its
        value, ``username``/``password`` or ``consumer_key``/
        ``consumer_secret``/``private_key``/``passphrase``
    :return: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged
    :raises SystemExit: with status 3 when the options required by the
        selected method are missing
    """
    # sys.exit is always available; the bare ``exit`` helper is injected by
    # the ``site`` module and is missing under ``python -S`` or frozen builds.
    import sys

    auth = args.auth
    if auth == 'basic':
        if not (args.username and args.password):
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            sys.exit(3)
        auth = HTTPBasicAuth(args.username, args.password)
    elif auth == 'oauth':
        if not (args.consumer_key and args.private_key):
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            sys.exit(3)
        auth = get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)

    return auth
项目:igmonplugins    作者:innogames    | 项目源码 | 文件源码
def parse_auth_argument(args):
    """Turn the parsed command-line auth options into an auth object.

    :param args: parsed arguments providing ``auth`` plus, depending on its
        value, ``username``/``password`` or ``consumer_key``/
        ``consumer_secret``/``private_key``/``passphrase``
    :return: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged
    :raises SystemExit: with status 3 when the options required by the
        selected method are missing
    """
    # sys.exit is always available; the bare ``exit`` helper is injected by
    # the ``site`` module and is missing under ``python -S`` or frozen builds.
    import sys

    auth = args.auth
    if auth == 'basic':
        if not (args.username and args.password):
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            sys.exit(3)
        auth = HTTPBasicAuth(args.username, args.password)
    elif auth == 'oauth':
        if not (args.consumer_key and args.private_key):
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            sys.exit(3)
        auth = get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)

    return auth
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(self):
        """Query the remote API for the observable and report the JSON result.

        Builds the query from ``self.data_type``:

        * ``hash``     -> ``scan/<hash>``
        * ``file``     -> ``scan/<sha256>``, using the 64-character entry of
          ``attachment.hashes`` when present, otherwise hashing the file
        * ``filename`` -> ``search?query=filename:<name>``

        Any other data type is handed to ``self.notSupported()``. The request
        is repeated after a 60 second pause for as long as the API answers
        with its rate-limit error. A ``ValueError`` raised along the way is
        passed to ``self.unexpectedError``.
        """
        rate_limit_message = "Exceeded maximum API requests per minute(5). Please try again later."

        try:
            if self.data_type == 'hash':
                query_url = 'scan/'
                query_data = self.getParam('data', None, 'Hash is missing')

            elif self.data_type == 'file':
                query_url = 'scan/'
                hashes = self.getParam('attachment.hashes', None)

                if hashes is None:
                    filepath = self.getParam('file', None, 'File is missing')
                    # Hash the raw bytes: text mode would decode/translate the
                    # content and hashlib rejects str on Python 3. The context
                    # manager also closes the handle.
                    with open(filepath, 'rb') as sample:
                        query_data = hashlib.sha256(sample.read()).hexdigest()
                else:
                    # find SHA256 hash
                    query_data = next(h for h in hashes if len(h) == 64)

            elif self.data_type == 'filename':
                query_url = 'search?query=filename:'
                query_data = self.getParam('data', None, 'Filename is missing')
            else:
                self.notSupported()

            url = str(self.basic_url) + str(query_url) + str(query_data)

            while True:
                # NOTE(review): verify=False disables TLS certificate checks.
                r = requests.get(url, headers=self.headers, auth=HTTPBasicAuth(self.api_key, self.secret), verify=False)
                body = r.json()
                response = body.get('response')
                # The original chained comparison (`"error" in x == msg`) could
                # never be true. Accept the message either directly or under an
                # "error" key -- TODO confirm the exact response shape.
                error_text = response.get('error') if isinstance(response, dict) else response
                if error_text != rate_limit_message:
                    break
                time.sleep(60)

            self.report({'results': body})

        except ValueError as e:
            self.unexpectedError(e)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_token(self, token_only=True, scopes=None):
        """Obtain a client-credentials token for the given scopes.

        :param token_only: when true, return just the access token string and
            use the cache; when false, return the full JSON token response
            from a fresh request
        :param scopes: list of scope names; defaults to
            ``['send_notification', 'view_room']``
        :raises OauthClientInvalidError: if the token endpoint answers 401
        :raises Exception: for any other non-200 answer
        """
        if scopes is None:
            scopes = ['send_notification', 'view_room']

        cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

        def request_token():
            # Ask the token endpoint for a fresh token and return its JSON body.
            form = {
                'grant_type': 'client_credentials',
                'scope': ' '.join(scopes),
            }
            credentials = HTTPBasicAuth(self.id, self.secret)
            resp = requests.post(self.token_url, data=form, auth=credentials, timeout=10)
            status = resp.status_code
            if status == 200:
                return resp.json()
            if status == 401:
                raise OauthClientInvalidError(self)
            raise Exception('Invalid token: %s' % resp.text)

        if not token_only:
            return request_token()

        cached = cache.get(cache_key)
        if cached:
            return cached

        payload = request_token()
        access_token = payload['access_token']
        # Cache for 20 seconds less than the lifetime the server reports.
        cache.set(cache_key, access_token, payload['expires_in'] - 20)
        return access_token
项目:home-cluster    作者:fabianvf    | 项目源码 | 文件源码
def _get_session(self):
        """Return the shared HTTP session, creating it on first use.

        A new session gets basic auth from ``foreman_user``/``foreman_pw``
        and its ``verify`` setting from ``foreman_ssl_verify``.
        """
        if self.session:
            return self.session

        self.session = requests.session()
        credentials = HTTPBasicAuth(self.foreman_user, self.foreman_pw)
        self.session.auth = credentials
        self.session.verify = self.foreman_ssl_verify
        return self.session