Python requests.auth 模块,HTTPBasicAuth() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用requests.auth.HTTPBasicAuth()

项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def auth():
    """Issue sample GET requests using basic, digest and OAuth2 credentials.

    Every response is discarded; the function returns ``None``.
    """
    github_user_url = 'https://api.github.com/user'

    # Basic authentication: once with an explicit HTTPBasicAuth object and
    # once with a plain (username, password) tuple.
    basic_credentials = HTTPBasicAuth('user', 'pass')
    requests.get(github_user_url, auth=basic_credentials)
    requests.get(github_user_url, auth=('user', 'pass'))

    # Digest authentication
    digest_credentials = HTTPDigestAuth('user', 'pass')
    requests.get('http://httpbin.org/digest-auth/auth/user/pass',
                 auth=digest_credentials)

    # OAuth2 authentication (the original comment points at requests-oauthlib)
    oauth_credentials = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get('https://api.twitter.com/1.1/account/verify_credentials.json',
                 auth=oauth_credentials)
项目:aiolocust    作者:kpidata    | 项目源码 | 文件源码
def __init__(self, base_url, *args, **kwargs):
        """Initialise the session and lift inline credentials out of ``base_url``.

        When ``base_url`` carries both a username and a password, they are
        removed from the stored URL and installed as HTTP basic auth instead.

        :param base_url: base URL for this session
        :param args: positional arguments forwarded to ``requests.Session.__init__``
        :param kwargs: keyword arguments forwarded to ``requests.Session.__init__``
        """
        requests.Session.__init__(self, *args, **kwargs)
        self.base_url = base_url

        parts = urlparse(self.base_url)
        if not (parts.username and parts.password):
            # No complete credentials embedded in the URL; nothing to rewrite.
            return

        # Rebuild the network location without the "user:password@" prefix.
        host = parts.hostname
        if parts.port:
            host = host + ":%d" % parts.port

        credential_free = (parts.scheme, host, parts.path,
                           parts.params, parts.query, parts.fragment)
        self.base_url = urlunparse(credential_free)
        self.auth = HTTPBasicAuth(parts.username, parts.password)
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
        """
        Store the API URL and the authentication settings on the instance.

        :param api_url: str Moira API URL (a trailing '/' is appended if missing)
        :param auth_custom: dict auth custom headers, merged over the default header
        :param auth_user: str auth user
        :param auth_pass: str auth password
        :param login: str auth login, sent as the 'X-Webauth-User' header value
        """
        self.api_url = api_url if api_url.endswith('/') else api_url + '/'

        # Basic auth is only configured when both user and password are given.
        have_credentials = auth_user and auth_pass
        self.auth = HTTPBasicAuth(auth_user, auth_pass) if have_credentials else None

        headers = {'X-Webauth-User': login}
        if auth_custom:
            headers.update(auth_custom)
        self.auth_custom = headers
项目:pact-broker-client    作者:Babylonpartners    | 项目源码 | 文件源码
def __init__(
        self,
        *,
        broker_url,
        pact_dir='.',
        user=None,
        password=None,
        authentication=False
    ):
        """Configure the broker client.

        :param broker_url: broker base URL; trailing slashes are stripped
        :param pact_dir: stored as ``self.pact_dir``
        :param user: username, stored as ``self.username``
        :param password: password, stored as ``self.password``
        :param authentication: when truthy, HTTP basic auth is built from
            ``user`` and ``password``
        :raises ValueError: if ``authentication`` is truthy but the username
            or the password is missing
        """
        self.broker_url = broker_url.rstrip('/')
        self.username, self.password = user, password
        self.pact_dir = pact_dir
        self.authentication = authentication
        self._auth = None

        if not self.authentication:
            return

        if self.username and self.password:
            self._auth = HTTPBasicAuth(self.username, self.password)
            return

        raise ValueError(
            'When authentication is True, username and password '
            'must be provided.'
        )
项目:pact-broker-client    作者:Babylonpartners    | 项目源码 | 文件源码
def test_pull_pact_authentication(mock_get, pull_pact_response):
    """Pulling a pact with authentication enabled passes basic-auth credentials."""
    mock_get.return_value = pull_pact_response

    client_options = {
        'broker_url': settings.PACT_BROKER_URL,
        'user': 'user',
        'password': 'password',
        'authentication': True,
    }
    client = BrokerClient(**client_options)
    client.pull_pact(provider=PROVIDER, consumer=CONSUMER)

    expected_auth = HTTPBasicAuth('user', 'password')
    mock_get.assert_called_once_with(EXPECTED_PULL_PACT_URL, auth=expected_auth)
项目:tfs    作者:devopshq    | 项目源码 | 文件源码
def __init__(self, server_url, project="DefaultCollection", user=None, password=None, verify=False,
                 auth_type=HTTPBasicAuth,
                 connect_timeout=20, read_timeout=180, ):
        """
        Create the REST client used to talk to the TFS server.

        :param server_url: url to TFS server, e.g. https://tfs.example.com/
        :param project: Collection or Collection\\Project
        :param user: username, or DOMAIN\\username
        :param password: password
        :param verify: True|False - verify HTTPS cert
        :param auth_type: forwarded to TFSHTTPClient as ``auth_type``
        :param connect_timeout: Requests CONNECTION timeout, sec or None
        :param read_timeout: Requests READ timeout, sec or None
        :raises ValueError: if ``user`` or ``password`` is None
        """
        credentials_missing = user is None or password is None
        if credentials_missing:
            raise ValueError('User name and api-key must be specified!')

        # The two timeouts are handed over as one (connect, read) tuple.
        timeouts = (connect_timeout, read_timeout)
        self.rest_client = TFSHTTPClient(
            server_url,
            project=project,
            user=user,
            password=password,
            verify=verify,
            timeout=timeouts,
            auth_type=auth_type,
        )
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _compose_args(self, method, data=None):
        """Build the keyword arguments for one request.

        :param method: HTTP method name; ``'get'`` puts the payload under
            ``'params'``, anything else under ``'data'``
        :param data: optional mapping merged over the structure's
            ``'common-params'``
        :returns: dict with the payload, ``'headers'`` and, depending on the
            structure, ``'auth'`` and/or ``'verify'``
        """
        query = 'params' if method == 'get' else 'data'

        # Work on copies: updating the dicts stored in self._structure in place
        # would leak this call's data (and credentials) into every later call.
        payload = dict(self._structure.get('common-params', {}))
        if data is not None:
            payload.update(data)

        args = {
            query: payload,
            'headers': dict(self._structure.get('common-headers', {})),
        }

        auth = self._structure.get('auth')
        if auth == 'params':
            # Credentials travel inside the request payload.
            payload.update({'username': self.billing_username,
                            'password': self.billing_password})
        elif auth == 'headers':
            # Credentials travel as HTTP basic auth.
            args['auth'] = HTTPBasicAuth(
                self.billing_username, self.billing_password)

        # Equality (not identity) is kept on purpose so a configured 0 still
        # disables verification, exactly as before.
        if self._structure.get('verify') == False:  # noqa: E712
            args['verify'] = False
        return args
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def grant_access_token(self, code):
        """Exchange an authorization code for Bitbucket OAuth2 tokens.

        The access token, refresh token and token type from the response are
        stored on the instance.

        :param code: authorization code sent as the ``code`` form field
        :returns: the decoded JSON body of the token response
        :raises BitbucketAPIError: if the token request fails with a
            ``requests.RequestException``
        """
        form = {
            'grant_type': 'authorization_code',
            'code': code,
        }
        client_credentials = HTTPBasicAuth(self._oauth_key, self._oauth_secret)
        res = self._session.post(
            'https://bitbucket.org/site/oauth2/access_token',
            data=form,
            auth=client_credentials
        )

        try:
            res.raise_for_status()
        except requests.RequestException as reqe:
            # The error body is expected to be JSON describing the failure.
            details = res.json()
            raise BitbucketAPIError(
                res.status_code,
                details.get('error', ''),
                details.get('error_description', ''),
                request=reqe.request,
                response=reqe.response
            )

        data = res.json()
        # Mirror each token field onto the matching private attribute.
        for field in ('access_token', 'refresh_token', 'token_type'):
            setattr(self, '_' + field, data[field])
        return data
项目:spotify-api    作者:steinitzu    | 项目源码 | 文件源码
def refresh_token(self):
        """
        Refresh token regardless of when it expires.

        The new token gets an absolute ``expires_at`` timestamp and keeps the
        previous refresh token when the response does not include one.

        Raises HTTPError on any non 2xx response code
        """
        previous_refresh = self._token['refresh_token']
        client_credentials = HTTPBasicAuth(self.client_id, self.client_secret)

        response = self.session.post(
            self.TOKEN_URL,
            data={
                'refresh_token': previous_refresh,
                'grant_type': 'refresh_token',
            },
            auth=client_credentials,
        )
        response.raise_for_status()

        fresh = response.json()
        fresh['expires_at'] = int(time.time()) + fresh['expires_in']
        fresh.setdefault('refresh_token', previous_refresh)
        self.token = fresh
项目:picopayments-cli-python    作者:CounterpartyXCP    | 项目源码 | 文件源码
def jsonrpc_call(url, method, params={}, verify_ssl_cert=True,
                 username=None, password=None):
    """POST a JSON-RPC 2.0 request and return the ``result`` member.

    The time spent in the HTTP call is added to
    ``method_cumulative_calltime[method]``.

    :param url: endpoint to post to
    :param method: JSON-RPC method name
    :param params: JSON-RPC params (only read, never modified here)
    :param verify_ssl_cert: passed to requests as ``verify``
    :param username: basic-auth user; used only together with ``password``
    :param password: basic-auth password; used only together with ``username``
    :raises JsonRpcCallFailed: if the response has no ``"result"`` key
    """
    payload = {"method": method, "params": params, "jsonrpc": "2.0", "id": 0}
    request_args = {
        "url": url,
        "headers": {'content-type': 'application/json'},
        "data": json.dumps(payload),
        "verify": verify_ssl_cert,
    }
    if username and password:
        request_args["auth"] = HTTPBasicAuth(username, password)

    started = time.time()
    response = requests.post(**request_args).json()
    elapsed = time.time() - started
    method_cumulative_calltime[method] += elapsed

    if "result" in response:
        return response["result"]
    raise JsonRpcCallFailed(payload, response)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def _handle_basic_auth_401(self, r, kwargs):
        """Re-send the request behind response ``r`` with HTTP basic credentials.

        :param r: response whose request is copied and sent again
        :param kwargs: keyword arguments forwarded to ``r.connection.send``
        :returns: the response to the re-sent request, with ``r`` appended to
            its ``history`` and ``request`` set to the re-sent request
        """
        # Move the request body back to the saved position, if one was recorded.
        if self.pos is not None:
            r.request.body.seek(self.pos)

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.raw.release_conn()
        prep = r.request.copy()
        # Carry cookies from the first response over to the copied request.
        if not hasattr(prep, '_cookies'):
            prep._cookies = cookies.RequestsCookieJar()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        # Build basic credentials, keep them on self.auth and apply them to
        # the copied request.
        self.auth = auth.HTTPBasicAuth(self.username, self.password)
        prep = self.auth(prep)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def add_strategy(self, domain, strategy):
        """Register the authentication strategy to use for a domain.

        :param str domain: The domain you wish to match against. For example:
            ``'https://api.github.com'``
        :param strategy: The authentication strategy you wish to use for
            that domain. For example: ``('username', 'password')`` or
            ``requests.HTTPDigestAuth('username', 'password')``. A tuple is
            converted into an ``HTTPBasicAuth`` object.

        .. code-block:: python

            a = AuthHandler({})
            a.add_strategy('https://api.github.com', ('username', 'password'))

        """
        is_credentials_tuple = isinstance(strategy, tuple)
        resolved = HTTPBasicAuth(*strategy) if is_credentials_tuple else strategy
        self.strategies[self._key_from_url(domain)] = resolved
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def get_strategy_for(self, url):
        """Look up the authentication strategy registered for a URL.

        :param str url: The full URL you will be making a request against. For
            example, ``'https://api.github.com/user'``
        :returns: The strategy stored for the URL's key, or a new
            ``NullAuthStrategy`` when nothing is registered for it.

        .. code-block:: python

            import requests
            a = AuthHandler({'example.com': ('foo', 'bar')})
            strategy = a.get_strategy_for('http://example.com/example')
            assert isinstance(strategy, requests.auth.HTTPBasicAuth)

        """
        lookup_key = self._key_from_url(url)
        fallback = NullAuthStrategy()
        return self.strategies.get(lookup_key, fallback)
项目:cc-server    作者:curious-containers    | 项目源码 | 文件源码
def _auth(http_auth):
    """Translate an ``http_auth`` mapping into a requests auth object.

    :param http_auth: mapping with ``auth_type``, ``username`` and
        ``password`` keys, or a falsy value for "no authentication"
    :returns: ``None`` for falsy input, otherwise an ``HTTPBasicAuth`` or
        ``HTTPDigestAuth`` instance
    :raises Exception: if ``auth_type`` is neither ``'basic'`` nor ``'digest'``
    """
    if not http_auth:
        return None

    kind = http_auth['auth_type']
    if kind == 'basic':
        auth_cls = HTTPBasicAuth
    elif kind == 'digest':
        auth_cls = HTTPDigestAuth
    else:
        raise Exception('Authorization information is not valid.')

    return auth_cls(http_auth['username'], http_auth['password'])
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def _handle_basic_auth_401(self, r, kwargs):
        """Re-send the request behind response ``r`` with HTTP basic credentials.

        :param r: response whose request is copied and sent again
        :param kwargs: keyword arguments forwarded to ``r.connection.send``
        :returns: the response to the re-sent request, with ``r`` appended to
            its ``history`` and ``request`` set to the re-sent request
        """
        # Move the request body back to the saved position, if one was recorded.
        if self.pos is not None:
            r.request.body.seek(self.pos)

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.raw.release_conn()
        prep = r.request.copy()
        # Carry cookies from the first response over to the copied request.
        if not hasattr(prep, '_cookies'):
            prep._cookies = cookies.RequestsCookieJar()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        # Build basic credentials, keep them on self.auth and apply them to
        # the copied request.
        self.auth = auth.HTTPBasicAuth(self.username, self.password)
        prep = self.auth(prep)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def add_strategy(self, domain, strategy):
        """Register the authentication strategy to use for a domain.

        :param str domain: The domain you wish to match against. For example:
            ``'https://api.github.com'``
        :param strategy: The authentication strategy you wish to use for
            that domain. For example: ``('username', 'password')`` or
            ``requests.HTTPDigestAuth('username', 'password')``. A tuple is
            converted into an ``HTTPBasicAuth`` object.

        .. code-block:: python

            a = AuthHandler({})
            a.add_strategy('https://api.github.com', ('username', 'password'))

        """
        is_credentials_tuple = isinstance(strategy, tuple)
        resolved = HTTPBasicAuth(*strategy) if is_credentials_tuple else strategy
        self.strategies[self._key_from_url(domain)] = resolved
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def get_strategy_for(self, url):
        """Look up the authentication strategy registered for a URL.

        :param str url: The full URL you will be making a request against. For
            example, ``'https://api.github.com/user'``
        :returns: The strategy stored for the URL's key, or a new
            ``NullAuthStrategy`` when nothing is registered for it.

        .. code-block:: python

            import requests
            a = AuthHandler({'example.com': ('foo', 'bar')})
            strategy = a.get_strategy_for('http://example.com/example')
            assert isinstance(strategy, requests.auth.HTTPBasicAuth)

        """
        lookup_key = self._key_from_url(url)
        fallback = NullAuthStrategy()
        return self.strategies.get(lookup_key, fallback)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
        """pod_status reports OFFLINE whenever the HTTP GET raises.

        Covers ConnectionError, SSLError and Timeout, and checks that each
        attempt results in exactly one additional GET call.
        """
        mock_get.side_effect = requests.ConnectionError
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        # This used to be spelt ``asset_called_once_with``. On a Mock a
        # misspelt name is just an auto-created attribute, so nothing was
        # actually verified.
        # NOTE(review): if pod_status passes further keyword arguments to the
        # GET call, extend the expected call below accordingly.
        mock_get.assert_called_once_with('url',
                                         auth=auth.HTTPBasicAuth('username',
                                                                 'password'))
        # SSL Error
        mock_get.side_effect = requests.exceptions.SSLError
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        self.assertEqual(mock_get.call_count, 2)
        # Timeout
        mock_get.side_effect = requests.Timeout
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        self.assertEqual(mock_get.call_count, 3)
项目:LinkR    作者:Toure    | 项目源码 | 文件源码
def _upload(url, data_file, username, password):
        """
        Upload will submit a results file via polarion ReST interface.

        :param url: target URL; a URL pointing at localhost is treated as
            "not configured" and terminates the program with exit status 1
        :param data_file: payload passed to the POST request as ``data``
        :param username: basic-auth user name
        :param password: basic-auth password
        :returns: the HTTP status code when it equals ``codes.ok``
        :raises exceptions.RequestException: for any other status code
        """
        # Raw string: the previous pattern relied on invalid escape sequences
        # ("\:" and "\/"), which newer Python versions warn about. This
        # pattern matches the same URLs.
        if re.search(r'https?://localhost', url):
            print("Please configure url settings.")
            # SystemExit directly instead of the interactive-only exit() helper.
            raise SystemExit(1)

        polarion_request = post(url,
                                data=data_file,
                                auth=auth.HTTPBasicAuth(username,
                                                        password))
        status_code = polarion_request.status_code
        if status_code == codes.ok:
            return status_code

        print("Results upload failed with the follow: {}".format(status_code))
        raise exceptions.RequestException
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def division_standings():
    """Fetch division standings and store each team's rank in SQLite.

    Downloads ``division_team_standings.json`` relative to ``base_url`` with
    HTTP basic auth and inserts one ``(team_id, rank)`` row per team into the
    ``division_standings`` table of ``nflpool.sqlite``.

    All rows are written through a single connection and committed together,
    so a failing insert no longer leaves a half-written batch or an unclosed
    connection behind.
    """
    url = base_url + 'division_team_standings.json'
    response = requests.get(url,
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for division in data['divisionteamstandings']['division']
        for team in division['teamentry']
    ]
    if not rows:
        # Nothing to store; as before, the database is not touched at all.
        return

    conn = sqlite3.connect('nflpool.sqlite')
    try:
        conn.executemany('''INSERT INTO division_standings(team_id, rank)
                VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()


# Get Playoff Standings for each team (need number 5 & 6 in each conference for Wild Card picks)
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def playoff_standings():
    """Fetch playoff standings and store each team's rank in SQLite.

    Downloads ``playoff_team_standings.json`` relative to ``base_url`` with
    HTTP basic auth and inserts one ``(team_id, rank)`` row per team into the
    ``playoff_rankings`` table of ``nflpool.sqlite``.

    All rows are written through a single connection and committed together,
    so a failing insert no longer leaves a half-written batch or an unclosed
    connection behind.
    """
    url = base_url + 'playoff_team_standings.json'
    response = requests.get(url,
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for conference in data['playoffteamstandings']['conference']
        for team in conference['teamentry']
    ]
    if not rows:
        # Nothing to store; as before, the database is not touched at all.
        return

    conn = sqlite3.connect('nflpool.sqlite')
    try:
        conn.executemany('''INSERT INTO playoff_rankings(team_id, rank)
                VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()


# Get individual statistics for each category
项目:igmonplugins    作者:innogames    | 项目源码 | 文件源码
def parse_auth_argument(args):
    """Turn the parsed ``--auth`` choice into an authentication object.

    :param args: parsed arguments; ``args.auth`` selects the mode
    :returns: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged

    Exits with status 3 when the selected mode lacks required parameters.
    """
    mode = args.auth

    if mode == 'basic':
        if args.username and args.password:
            return HTTPBasicAuth(args.username, args.password)
        print("For basic authentication, 'username' and 'password' "
              "parameter are needed")
        exit(3)

    if mode == 'oauth':
        if args.consumer_key and args.private_key:
            return get_oauth1session(args.consumer_key, args.consumer_secret,
                                     args.private_key, args.passphrase)
        print("For oauth authentication, 'consumer-key' "
              "and 'private-key' parameter are needed")
        exit(3)

    return mode
项目:igmonplugins    作者:innogames    | 项目源码 | 文件源码
def parse_auth_argument(args):
    """Turn the parsed ``--auth`` choice into an authentication object.

    :param args: parsed arguments; ``args.auth`` selects the mode
    :returns: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged

    Exits with status 3 when the selected mode lacks required parameters.
    """
    mode = args.auth

    if mode == 'basic':
        if args.username and args.password:
            return HTTPBasicAuth(args.username, args.password)
        print("For basic authentication, 'username' and 'password' "
              "parameter are needed")
        exit(3)

    if mode == 'oauth':
        if args.consumer_key and args.private_key:
            return get_oauth1session(args.consumer_key, args.consumer_secret,
                                     args.private_key, args.passphrase)
        print("For oauth authentication, 'consumer-key' "
              "and 'private-key' parameter are needed")
        exit(3)

    return mode
项目:igmonplugins    作者:innogames    | 项目源码 | 文件源码
def parse_auth_argument(args):
    """Turn the parsed ``--auth`` choice into an authentication object.

    :param args: parsed arguments; ``args.auth`` selects the mode
    :returns: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged

    Exits with status 3 when the selected mode lacks required parameters.
    """
    mode = args.auth

    if mode == 'basic':
        if args.username and args.password:
            return HTTPBasicAuth(args.username, args.password)
        print("For basic authentication, 'username' and 'password' "
              "parameter are needed")
        exit(3)

    if mode == 'oauth':
        if args.consumer_key and args.private_key:
            return get_oauth1session(args.consumer_key, args.consumer_secret,
                                     args.private_key, args.passphrase)
        print("For oauth authentication, 'consumer-key' "
              "and 'private-key' parameter are needed")
        exit(3)

    return mode
项目:concourse-resource-bitbucket    作者:Karunamon    | 项目源码 | 文件源码
def post_result(url, user, password, verify, data, debug):
    """POST ``data`` as JSON with basic auth and report problems through ``err``.

    :param url: endpoint to post to
    :param user: basic-auth user name
    :param password: basic-auth password
    :param verify: passed to requests as ``verify``
    :param data: object sent as the JSON body
    :param debug: when truthy, the response object is reported via ``err``
    :returns: the response object
    """
    credentials = HTTPBasicAuth(user, password)
    r = requests.post(url, json=data, verify=verify, auth=credentials)

    if debug:
        err("Request result: " + str(r))

    status = r.status_code
    hints = {
        403: "HTTP 403 Forbidden - Does your bitbucket user have rights to the repo?",
        401: "HTTP 401 Unauthorized - Are your bitbucket credentials correct?",
    }
    if status in hints:
        err(hints[status])

    # All other errors, just dump the JSON
    if status != 204:  # 204 is a success per Bitbucket docs
        err(json_pp(r.json()))

    return r

# Stop all this from executing if we were imported, say, for testing.
项目:adarnauth-esi    作者:Adarnof    | 项目源码 | 文件源码
def refresh(self, session=None, auth=None):
        """
        Refreshes the token.

        On success the new access token and creation time are stored and the
        instance is saved.

        :param session: :class:`requests_oauthlib.OAuth2Session` for refreshing token with.
        :param auth: :class:`requests.auth.HTTPBasicAuth`
        :raises NotRefreshableTokenError: if ``self.can_refresh`` is falsy
        :raises TokenInvalidError: on InvalidGrantError or MissingTokenError
        :raises ImproperlyConfigured: on InvalidClientError
        """
        if not self.can_refresh:
            raise NotRefreshableTokenError()

        # Fall back to objects built from the application settings.
        session = session or OAuth2Session(app_settings.ESI_SSO_CLIENT_ID)
        auth = auth or HTTPBasicAuth(app_settings.ESI_SSO_CLIENT_ID, app_settings.ESI_SSO_CLIENT_SECRET)

        try:
            token_data = session.refresh_token(
                app_settings.ESI_TOKEN_URL,
                refresh_token=self.refresh_token,
                auth=auth,
            )
            self.access_token = token_data['access_token']
            self.created = timezone.now()
            self.save()
        except (InvalidGrantError, MissingTokenError):
            raise TokenInvalidError()
        except InvalidClientError:
            raise ImproperlyConfigured('Verify ESI_SSO_CLIENT_ID and ESI_SSO_CLIENT_SECRET settings.')
项目:ptauto    作者:IntegralDefense    | 项目源码 | 文件源码
def whois_search(self, query='', field=''):
        """
        Run a WHOIS search against the PassiveTotal API.

        :param str query: the search term
        :param str field: WHOIS field to execute the search on: domain, email,
                          name, organization, address, phone, nameserver
        :returns: the response body decoded with json.loads() on HTTP 200,
                  otherwise None (the status code is logged as an error)
        """
        log.debug('Permforming WHOIS search with query: {0} and field: '
                  '{1}'.format(query, field))
        search_params = {'query': query, 'field': field}
        credentials = HTTPBasicAuth(self.username, self.apikey)
        r = requests.get('https://api.passivetotal.org/v2/whois/search',
                         params=search_params,
                         proxies=self.proxies,
                         auth=credentials)

        if r.status_code != 200:
            log.error('HTTP code {0} returned during WHOIS search '
                      'request.'.format(r.status_code))
            return None

        return json.loads(r.text)
项目:ptauto    作者:IntegralDefense    | 项目源码 | 文件源码
def whois_search(self, query='', field=''):
        """
        Run a WHOIS search against the PassiveTotal API.

        :param str query: the search term
        :param str field: WHOIS field to execute the search on: domain, email,
                          name, organization, address, phone, nameserver
        :returns: the response body decoded with json.loads() on HTTP 200,
                  otherwise None (the status code is logged as an error)
        """
        log.debug('Permforming WHOIS search with query: {0} and field: '
                  '{1}'.format(query, field))
        search_params = {'query': query, 'field': field}
        credentials = HTTPBasicAuth(self.username, self.apikey)
        r = requests.get('https://api.passivetotal.org/v2/whois/search',
                         params=search_params,
                         proxies=self.proxies,
                         auth=credentials)

        if r.status_code != 200:
            log.error('HTTP code {0} returned during WHOIS search '
                      'request.'.format(r.status_code))
            return None

        return json.loads(r.text)
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def cuckoo_check_if_dup(self, sha256):
        """
        Check if file already was analyzed by cuckoo

        :param sha256: SHA-256 digest used in the lookup URL
        :returns: True when the lookup answers with HTTP 200, otherwise False
            (including when the request raises; the exception is printed)
        """
        try:
            print("Looking for tasks for: {}".format(sha256))
            lookup_url = urljoin(self.url_base, "/files/view/sha256/{}".format(sha256))
            credentials = HTTPBasicAuth(self.api_user,self.api_passwd)
            res = requests.get(lookup_url,
                verify=False,
                auth=credentials,
                timeout=60)

            if not (res and res.ok and res.status_code == 200):
                return False

            sample_id = res.json().get("sample", {}).get("id", 0)
            print("Sample found in Sandbox, with ID: {}".format(sample_id))
            return True
        except Exception as e:
            print(e)

        return False
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def postfile(self, artifact, fileName):
        """
        Send a file to Cuckoo

        :param artifact: path of the file to upload
        :param fileName: file name reported in the multipart upload
        :returns: None; the outcome is only printed

        Errors while opening or reading ``artifact`` propagate to the caller;
        errors during the HTTP request are caught and printed.
        """
        # Read through a context manager so the handle is closed right away
        # instead of being left to the garbage collector.
        with open(artifact, "rb") as handle:
            files = {"file": (fileName, handle.read())}
        try:
            res = requests.post(urljoin(self.url_base, "tasks/create/file").encode("utf-8"), files=files, auth=HTTPBasicAuth(
                            self.api_user,
                            self.api_passwd
                        ),
                        verify=False)
            if res and res.ok:
                print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
            else:
                print("Cuckoo Request failed: {}".format(res.status_code))
        except Exception as e:
            print("Cuckoo Request failed: {}".format(e))
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def posturl(self, scanUrl):
        """
        Send a URL to Cuckoo

        :param scanUrl: URL submitted as the ``url`` form field
        :returns: None; the outcome (or any exception) is only printed
        """
        form = {"url": scanUrl}
        try:
            endpoint = urljoin(self.url_base, "tasks/create/url").encode("utf-8")
            credentials = HTTPBasicAuth(self.api_user, self.api_passwd)
            res = requests.post(endpoint, data=form, auth=credentials, verify=False)

            if not (res and res.ok):
                print("Cuckoo Request failed: {}".format(res.status_code))
            else:
                print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
        except Exception as e:
            print("Cuckoo Request failed: {}".format(e))
项目:rankedftw    作者:andersroos    | 项目源码 | 文件源码
def run(self, args, logger):
        """Request a client-credentials access token and write it to a file.

        :param args: parsed arguments; ``args.filename`` is the output path
        :param logger: logger used for the error / info messages
        :returns: 0 on success, 1 when the token endpoint does not answer 200
        """
        credentials = HTTPBasicAuth(config.API_KEY, config.API_SECRET)
        response = request('POST',
                           'https://eu.battle.net/oauth/token',
                           auth=credentials,
                           params={'grant_type': 'client_credentials'},
                           allow_redirects=False)

        if response.status_code == 200:
            data = response.json()
            token_text = data['access_token']
            logger.info("writing access_token to %s, expires in %s" % (args.filename, data['expires_in']))
            with open(args.filename, 'w') as out:
                out.write(token_text)
            return 0

        logger.error("failed to get access token got %s: %s" % (response.status_code, response.content))
        return 1
项目:ringzer0    作者:F00ker    | 项目源码 | 文件源码
def ch138():
    """Repeat a fetch-captcha / submit-captcha round trip 1001 times.

    Each round reads the expected captcha value out of the form page's
    script, requests a new captcha, posts the value back and prints the text
    of the first "alert" element in the reply.

    NOTE(review): Python 2 code (``xrange``, ``print`` statement).
    """
    # NOTE(review): ``ek`` is not used anywhere in this function.
    ek = 'RZ_CH138_PW'

    s = requests.Session()
    auth = HTTPBasicAuth('captcha', 'QJc9U6wxD4SFT0u')
    url = 'http://captcha.ringzer0team.com:7421'
    for _ in xrange(1001):
        # Short pause between rounds.
        time.sleep(0.10)
        r = s.get('{0}/form1.php'.format(url), auth=auth)
        # The expected value appears in the page as: if (A == "<value>")
        m = re.search(r'if \(A == "([a-z0-9]*)"\)', r.text)
        # NOTE(review): raises AttributeError if the pattern is not found.
        captcha = m.group(1)
        r = s.get('{0}/captcha/captchabroken.php?new'.format(url), auth=auth)
        payload = {'captcha':captcha}
        r = s.post('{0}/captcha1.php'.format(url), auth=auth, data=payload)
        doc = lxml.html.document_fromstring(r.text)
        # First element whose class attribute contains "alert".
        alert = doc.xpath('//div[contains(@class, "alert")]')[0]
        msg = alert.text_content().strip()
        print msg
项目:opendxl-epo-service-python    作者:opendxl    | 项目源码 | 文件源码
def __init__(self, host, port, username, password, verify):
        """
        Set up the epoRemote with the connection details of the target ePO.

        :param host: the hostname of the ePO to run remote commands on
        :param port: the port of the desired ePO
        :param username: the username to run the remote commands as
        :param password: the password for the ePO user
        :param verify: Whether to verify the ePO server's certificate
        """

        logger.debug('Initializing epoRemote for ePO {} on port {} with user {}'.format(host, port, username))

        base_url = 'https://{}:{}/remote'.format(host, port)
        credentials = HTTPBasicAuth(username, password)

        self._baseurl = base_url
        self._auth = credentials
        self._session = requests.Session()
        self._verify = verify
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def add_strategy(self, domain, strategy):
        """Register the authentication strategy to use for a domain.

        :param str domain: The domain you wish to match against. For example:
            ``'https://api.github.com'``
        :param strategy: The authentication strategy you wish to use for
            that domain. For example: ``('username', 'password')`` or
            ``requests.HTTPDigestAuth('username', 'password')``. A tuple is
            converted into an ``HTTPBasicAuth`` object.

        .. code-block:: python

            a = AuthHandler({})
            a.add_strategy('https://api.github.com', ('username', 'password'))

        """
        is_credentials_tuple = isinstance(strategy, tuple)
        resolved = HTTPBasicAuth(*strategy) if is_credentials_tuple else strategy
        self.strategies[self._key_from_url(domain)] = resolved
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def get_strategy_for(self, url):
        """Look up the authentication strategy registered for a URL.

        :param str url: The full URL you will be making a request against. For
            example, ``'https://api.github.com/user'``
        :returns: The strategy stored for the URL's key, or a new
            ``NullAuthStrategy`` when nothing is registered for it.

        .. code-block:: python

            import requests
            a = AuthHandler({'example.com': ('foo', 'bar')})
            strategy = a.get_strategy_for('http://example.com/example')
            assert isinstance(strategy, requests.auth.HTTPBasicAuth)

        """
        lookup_key = self._key_from_url(url)
        fallback = NullAuthStrategy()
        return self.strategies.get(lookup_key, fallback)
项目:ansible-module-artifactory    作者:William1444    | 项目源码 | 文件源码
def artifactory_repo_present(data):
    """PUT the repository described by ``data`` to Artifactory.

    ``state``, ``user`` and ``password`` are removed from ``data`` (in place)
    before the remaining keys are sent as the JSON body.

    :param data: dict with ``state``, ``user``, ``password``, ``artifactory``
        and ``key`` plus the repository settings
    :returns: 3-tuple ``(True on failure, True only for HTTP 200, details)``
        -- presumably (is_error, has_changed, meta); verify against caller
    """
    del data['state']

    user, password = data['user'], data['password']
    del data['user'], data['password']

    url = "{}/{}/{}".format(data['artifactory'], 'api/repositories', data['key'])
    result = requests.put(
        url,
        json.dumps(data),
        headers={"Content-Type": "application/json"},
        auth=HTTPBasicAuth(user, password),
    )

    status = result.status_code
    if status == 200:
        return False, True, {"status": status}

    if status == 400 and 'errors' in result.json():
        # An already existing key is reported as "nothing changed", not as an error.
        already_exists = any(
            'key already exists' in error['message']
            for error in result.json()['errors']
        )
        if already_exists:
            return False, False, result.json()

    # default: something went wrong
    return True, False, {"status": status, 'response': result.json()}
项目:ansible-module-artifactory    作者:William1444    | 项目源码 | 文件源码
def artifactory_license(data):
    """POST ``data`` to the Artifactory license endpoint.

    ``user`` and ``password`` are removed from ``data`` (in place) before the
    remaining keys are sent as the JSON body.

    :param data: dict with ``user``, ``password``, ``artifactory`` plus the
        fields to submit
    :returns: ``(False, True, response JSON)`` on HTTP 200, otherwise
        ``(True, False, {"status": ..., "response": ...})``
    """
    user, password = data['user'], data['password']
    del data['user'], data['password']

    url = "{}/{}".format(data['artifactory'], 'api/system/license')
    result = requests.post(
        url,
        json.dumps(data),
        headers={"Content-Type": "application/json"},
        auth=HTTPBasicAuth(user, password),
    )

    if result.status_code != 200:
        # default: something went wrong
        return True, False, {"status": result.status_code, 'response': result.json()}

    return False, True, result.json()
项目:solidfire-sdk-python    作者:solidfire    | 项目源码 | 文件源码
def post(self, data):
        """
        Post data to the associated endpoint and await the server's response.

        :param data: the data to be posted.
        :type data: str or json
        :returns: the response text, or for an empty body a dict with the
            status code, reason and an empty message
        :raises ValueError: if the username or the password is not set
        """
        credentials_missing = self._username is None or self._password is None
        if credentials_missing:
            raise ValueError("Username or Password is not set")

        resp = requests.post(self._endpoint, data=data, json=None,
                             verify=self._verify_ssl, timeout=self._timeout,
                             auth=HTTPBasicAuth(self._username, self._password))

        if resp.text != '':
            return resp.text
        return {"code": resp.status_code, "name":  resp.reason, "message": ""}
项目:prbuilds    作者:guardian    | 项目源码 | 文件源码
def overwrite_comment(self, url, body):
    """Overwrite the comment at the given API url with ``body``.

    Sends a PATCH request whose data is the JSON document
    ``{"body": body}``, authenticated with HTTP basic auth built from
    ``self.name`` and ``self.token``.

    :param url: API url of the comment to overwrite.
    :param body: new text of the comment.
    :raises: whatever ``raise_for_status()`` raises for an error response
        (presumably ``requests.HTTPError`` -- ``self.requests`` looks like
        the ``requests`` module; confirm against the class).
    """
    # The body was previously indented inconsistently (4 vs 8 spaces), which
    # is an IndentationError; every statement now sits at one level.
    payload = {"body": body}

    res = self.requests.patch(
        url,
        data=json.dumps(payload),
        auth=HTTPBasicAuth(
            self.name,
            self.token
        )
    )

    res.raise_for_status()
项目:prbuilds    作者:guardian    | 项目源码 | 文件源码
def post_comment(self, url, body):
    """Add a GitHub comment using the given url endpoint.

    Sends a POST request whose data is the JSON document
    ``{"body": body}``, authenticated with HTTP basic auth built from
    ``self.name`` and ``self.token``.

    :param url: API url the comment is posted to.
    :param body: text of the comment.
    :raises: whatever ``raise_for_status()`` raises for an error response
        (presumably ``requests.HTTPError`` -- ``self.requests`` looks like
        the ``requests`` module; confirm against the class).
    """
    # The body was previously indented inconsistently (4 vs 8 spaces), which
    # is an IndentationError; every statement now sits at one level.
    payload = {"body": body}

    res = self.requests.post(
        url,
        data=json.dumps(payload),
        auth=HTTPBasicAuth(
            self.name,
            self.token
        )
    )

    res.raise_for_status()
项目:djangomailup    作者:simobasso    | 项目源码 | 文件源码
def _get_access_token(self):
    """Fetch an access token from mailup and remember the refresh token.

    Posts a ``password`` grant (``self.username`` / ``self.password``) to
    ``ENDPOINT["token"]``, authenticating the client itself with HTTP basic
    auth (``self.client_id`` / ``self.client_secret``).

    Side effect: stores the ``refresh_token`` of the parsed response on
    ``self.refresh_token``.

    :return: the ``access_token`` entry of the parsed response.
    """
    grant = {
        "grant_type": "password",
        "username": self.username,
        "password": self.password,
    }

    response = self.post(
        ENDPOINT["token"],
        data=grant,
        auth=HTTPBasicAuth(self.client_id, self.client_secret),
    )
    tokens = response_parser(response)

    # Keep the refresh token on the instance; only the access token is returned.
    self.refresh_token = tokens["refresh_token"]
    return tokens["access_token"]
项目:cloudcenter-content    作者:datacenter    | 项目源码 | 文件源码
def get_host_id(name):
    """Return the id of the single host called ``name``.

    Queries ``<tower_base_url>hosts/`` filtered by ``name``, authenticating
    with ``args.tower_username`` / ``args.tower_password``.

    The process is terminated with exit status 1 (after printing a message)
    when no host or more than one host matches.

    NOTE(review): the request is made with ``verify=False``, i.e. TLS
    certificate verification is switched off.

    :param name: host name to look up.
    :return: the ``id`` field of the only matching result.
    """
    url = tower_base_url+"hosts/"

    headers = {
        'content-type': "application/json"
    }
    querystring = {"name": name}

    # Use the session as a context manager so its pooled connections are
    # released when we are done instead of lingering until garbage collection.
    with requests.Session() as s:
        response = s.request("GET", url, headers=headers, params=querystring, verify=False,
                             auth=HTTPBasicAuth(args.tower_username, args.tower_password))
        results = response.json()['results']

    if len(results) < 1:
        print("No host found with that name.")
        sys.exit(1)
    elif len(results) > 1:
        print("Multiple hosts found with that name, so I won't remove any of them.")
        sys.exit(1)
    else:
        return results[0]['id']
项目:cloudcenter-content    作者:datacenter    | 项目源码 | 文件源码
def add_host(host_name, inventory):
    """Create an enabled host called ``host_name`` in ``inventory``.

    POSTs a JSON document with ``name``, ``enabled`` (always ``True``) and
    ``inventory`` to ``<tower_base_url>hosts/``, authenticating with
    ``args.tower_username`` / ``args.tower_password``.

    The response is not inspected, so a rejected request goes unnoticed.

    NOTE(review): the request is made with ``verify=False``, i.e. TLS
    certificate verification is switched off.

    :param host_name: name of the host to create.
    :param inventory: value sent as the host's ``inventory`` field.
    :return: None
    """
    url = tower_base_url+"hosts/"

    headers = {
        'content-type': "application/json"
    }

    payload = {
        "name": host_name,
        "enabled": True,
        "inventory": inventory
    }

    # Use the session as a context manager so its pooled connections are
    # released when we are done instead of lingering until garbage collection.
    with requests.Session() as s:
        s.request("POST", url, headers=headers, data=json.dumps(payload), verify=False,
                  auth=HTTPBasicAuth(args.tower_username, args.tower_password))
项目:lain-sdk    作者:laincloud    | 项目源码 | 文件源码
def get_jwt_for_registry(auth_url, registry, appname):
    """Fetch a push/pull token for ``appname`` from ``auth_url``.

    The username and password are resolved from the loaded auth config for
    ``registry`` (lower-case keys are preferred, capitalised keys are the
    fallback) and sent as HTTP basic auth. The service is always
    ``lain.local``.

    :param auth_url: url of the token endpoint.
    :param registry: registry whose stored credentials are used.
    :param appname: repository name placed in the requested scope.
    :return: the ``token`` of the response when the status is below 400 and
        the token is truthy; ``None`` when the status is 400 or above or
        the token is falsy; ``''`` (after a warning) when any exception is
        raised along the way.
    """
    try:
        # get auth username and password from dockercfg
        cfg = auth.resolve_authconfig(auth.load_config(), registry=registry)
        if 'username' in cfg:
            username = cfg['username']
        else:
            username = cfg['Username']
        if 'password' in cfg:
            password = cfg['password']
        else:
            password = cfg['Password']

        # only use `lain.local` as service
        url = "%s?service=%s&scope=repository:%s:push,pull&account=%s" % (
            auth_url, "lain.local", appname, username)
        response = requests.get(url, auth=HTTPBasicAuth(username, password))

        # The body is only decoded for non-error responses.
        if response.status_code >= 400:
            return None
        if response.json()['token']:
            return response.json()['token']
    except Exception as e:
        warn("can not load registry auth config : %s, need lain login first." % e)
        return ''
    return None
项目:cg-uaa-extras    作者:18F    | 项目源码 | 文件源码
def test_oauth_token(self):
    """oauth_token() makes a POST to /oauth/token with the appropriate headers and query params"""
    client = UAAClient('http://example.com', 'foo', False)
    # Swap the low-level request method for a mock so the call can be inspected.
    request_mock = Mock()
    client._request = request_mock

    client.oauth_token('foo', 'bar', 'baz')

    positional, keywords = request_mock.call_args
    expected_params = {
        'code': 'foo',
        'grant_type': 'authorization_code',
        'response_type': 'token'
    }

    assert positional == ('/oauth/token', 'POST')
    assert keywords['params'] == expected_params

    # The last two arguments must arrive as basic-auth credentials.
    basic_auth = keywords['auth']
    assert isinstance(basic_auth, HTTPBasicAuth)
    assert basic_auth.username == 'bar'
    assert basic_auth.password == 'baz'
项目:cg-uaa-extras    作者:18F    | 项目源码 | 文件源码
def test_get_client_token(self):
    """_get_client_token() makes a POST to /oauth/token with the appropriate headers and query params"""
    client = UAAClient('http://example.com', 'foo', False)
    # Swap the low-level request method for a mock so the call can be inspected.
    request_mock = Mock()
    client._request = request_mock

    client._get_client_token('bar', 'baz')

    positional, keywords = request_mock.call_args
    expected_params = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }

    assert positional == ('/oauth/token', 'POST')
    assert keywords['params'] == expected_params

    # Both arguments must arrive as basic-auth credentials.
    basic_auth = keywords['auth']
    assert isinstance(basic_auth, HTTPBasicAuth)
    assert basic_auth.username == 'bar'
    assert basic_auth.password == 'baz'
项目:cg-uaa-extras    作者:18F    | 项目源码 | 文件源码
def _get_client_token(self, client_id, client_secret):
    """ Request a token with the ``client_credentials`` grant

    Args:
        client_id: The oauth client id, sent as the basic-auth username
        client_secret: The secret for the client_id above, sent as the
            basic-auth password

    Raises:
        UAAError: presumably raised by ``self._request`` when the token
            cannot be obtained (not visible in this method -- confirm)

    Returns:
        The ``access_token`` entry of the response, or None when the
        response has no such entry

    """
    query = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    token_response = self._request(
        '/oauth/token',
        'POST',
        params=query,
        auth=HTTPBasicAuth(client_id, client_secret)
    )
    return token_response.get('access_token', None)
项目:bravialib    作者:8none1    | 项目源码 | 文件源码
def complete_pair(self, pin):
    """Finish the pairing process with the PIN shown on the screen.

    Posts an ``actRegister`` payload to ``/sony/accessControl`` using basic
    auth with an empty username and ``pin`` as the password. The auth
    object is kept on ``self.auth`` in case it is needed again later.

    On a 200 response ``self.paired`` is set and ``self.connect()`` is
    called again to get the cookies set up.

    :param pin: the PIN the user reads off the screen.
    :return: ``(response, True)`` when pairing succeeded and the second
        value returned by ``connect()`` is ``True``; ``(response, False)``
        when pairing succeeded but it is not; ``(None, False)`` when the
        response status is not 200.
    """
    registration = [
        {"clientid":self.device_id, "nickname":self.nickname},
        [{"value":"no", "function":"WOL"}],
    ]
    payload = self._build_json_payload("actRegister", registration)

    # Going to keep this in the object, just in case we need it again later
    self.auth = HTTPBasicAuth('', pin)
    r = self.do_POST(url='/sony/accessControl', payload=payload, auth=self.auth)

    if r.status_code != 200:
        return None, False

    print("have paired")
    self.paired = True
    # let's call connect again to get the cookies all set up properly
    _, connected = self.connect()
    return r, connected is True