我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 requests.auth.HTTPDigestAuth()。
def auth():
    """Demonstrate several ways of passing credentials to ``requests.get``.

    Four GET requests are issued in sequence: two against the GitHub user
    endpoint (an ``HTTPBasicAuth`` object, then a plain ``(user, pass)``
    tuple), one against an httpbin digest endpoint with ``HTTPDigestAuth``,
    and one against a Twitter endpoint with an ``OAuth2`` object.  All
    responses are discarded and nothing is returned.
    """
    # Basic authentication.
    requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass'))
    requests.get('https://api.github.com/user', auth=('user', 'pass'))

    # Digest authentication.
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 authentication (OAuth2 presumably comes from requests-oauthlib;
    # confirm against the file's imports).
    verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    oauth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(verify_url, auth=oauth)
def _auth(http_auth): if not http_auth: return None if http_auth['auth_type'] == 'basic': return HTTPBasicAuth( http_auth['username'], http_auth['password'] ) if http_auth['auth_type'] == 'digest': return HTTPDigestAuth( http_auth['username'], http_auth['password'] ) raise Exception('Authorization information is not valid.')
def update_membership_status():
    """Refresh the membership status of every ``Member`` from the register API.

    Requests the ``list`` action from the register endpoint using digest
    authentication.  If the response body is not valid JSON the function
    returns without touching any member.  Otherwise each member is marked
    ``'member'`` or ``'nonmember'`` depending on whether their person number
    (dashes removed) is contained in the decoded response, and the resulting
    ``status`` / ``status_changed`` values are written with a queryset
    ``update``.
    """
    response = requests.get(
        'https://register.utn.se/api.php',
        auth=HTTPDigestAuth(settings.MEMBERSHIP_API_USER,
                            settings.MEMBERSHIP_API_PASSWORD),
        params={'action': 'list'},
    )

    try:
        registered = response.json()
    except ValueError:
        return

    for member in Member.objects.all():
        person_number = member.person_number().replace('-', '')
        new_status = 'member' if person_number in registered else 'nonmember'
        member.update_status(data=new_status)

        # Persist only the two status columns via a queryset update.
        Member.objects.filter(pk=member.pk).update(
            status=member.status,
            status_changed=member.status_changed,
        )
def __init__(self):
    """Initialise the base client and clear the URL / digest placeholders."""
    CradlepointClient.__init__(self)

    # The last request/response (self.last_url, self.last_reply) are
    # initialised by the base class, so they are not set here.

    # Base URL used by request calls, e.g. "http://192.168.1.1/api".
    # (self.router_ip is intentionally not initialised here.)
    self.base_url = None

    # Holds the HTTPDigestAuth() handler once the user/password are set.
    self.digest = None
def set_user_password(self, user_name, password):
    """
    Build the digest authentication handler used for Router API calls and
    store it on ``self.digest``.

    :param str user_name: login name; must be a ``str``
    :param str password: login password; must be a ``str``
    :return: None
    """
    from requests.auth import HTTPDigestAuth

    # isinstance() already rejects None, so one check per argument suffices.
    assert isinstance(user_name, str)
    assert isinstance(password, str)

    self.digest = HTTPDigestAuth(user_name, password)

    # Debug logging of the user name is currently disabled:
    # if self._logger is not None:
    #     self._logger.debug("CSClient() set user={}, pass=*****".format(
    #         user_name))
def get_devices(self):
    '''
    Fetch the device list from Indigo and then each device's own document.

    Returns:
      devices (dict): maps ``'indigo-<name>'`` to the decoded JSON of that
        device, with the device's ``restURL`` added under ``'restURL'``.
    '''
    def fetch_json(target):
        # Every call builds a fresh digest handler; TLS verification is off.
        return requests.get(
            target,
            auth=HTTPDigestAuth(self.user, self.password),
            verify=False,
        ).json()

    devices = {}
    for entry in fetch_json(self.url + '/devices.json'):
        rest_url = entry.get('restURL')
        device_url = self.url + rest_url
        key = 'indigo-%s' % entry.get('name')

        details = fetch_json(device_url)
        details['restURL'] = rest_url
        devices[key] = details

    return devices
def __init__(self, hass, device_info):
    """Set up the camera from its configuration mapping.

    Reads the authentication mode, name, still-image URL template and the
    refetch-limit flag from ``device_info``, then builds the auth object:
    digest when the configured mode equals ``HTTP_DIGEST_AUTHENTICATION``,
    aiohttp basic auth otherwise, or ``None`` when either the username or
    the password is missing/empty.
    """
    super().__init__()
    self.hass = hass

    self._authentication = device_info.get(CONF_AUTHENTICATION)
    self._name = device_info.get(CONF_NAME)
    self._still_image_url = device_info[CONF_STILL_IMAGE_URL]
    # The URL object gets a reference to hass as well.
    self._still_image_url.hass = hass
    self._limit_refetch = device_info[CONF_LIMIT_REFETCH_TO_URL_CHANGE]

    username = device_info.get(CONF_USERNAME)
    password = device_info.get(CONF_PASSWORD)

    if not (username and password):
        self._auth = None
    elif self._authentication == HTTP_DIGEST_AUTHENTICATION:
        self._auth = HTTPDigestAuth(username, password)
    else:
        self._auth = aiohttp.BasicAuth(username, password=password)

    self._last_url = None
    self._last_image = None
def target_function(self, url, user, password):
    """Try one username/password pair against ``url`` using digest auth.

    Both credentials are UTF-8 encoded and stripped before use.  Any
    non-``None`` response whose status is not 401 counts as a success: the
    pair is reported, appended to ``self.credentials`` and, when
    ``self.stop_on_success`` is set, ``StopThreadPoolExecutor`` is raised.
    Otherwise a failure line is printed.
    """
    thread_name = threading.current_thread().name
    user = user.encode('utf-8').strip()
    password = password.encode('utf-8').strip()

    response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

    if response is None or response.status_code == 401:
        print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        return

    print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
    self.credentials.append((self.target, self.port, user, password))

    if self.stop_on_success:
        raise StopThreadPoolExecutor
def target_function(self, url, creds):
    """Try one ``(user, password)`` pair against ``url`` using digest auth.

    ``creds`` is unpacked into user and password, both of which are UTF-8
    encoded and stripped before use.  Any non-``None`` response whose status
    is not 401 counts as a success: the pair is reported, appended to
    ``self.credentials`` and, when ``self.stop_on_success`` is set,
    ``StopThreadPoolExecutor`` is raised.  Otherwise a failure line is
    printed.
    """
    thread_name = threading.current_thread().name
    user, password = creds
    user = user.encode('utf-8').strip()
    password = password.encode('utf-8').strip()

    response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

    if response is None or response.status_code == 401:
        print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        return

    print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
    self.credentials.append((self.target, self.port, user, password))

    if self.stop_on_success:
        raise StopThreadPoolExecutor
def http_get(self,ip,port,user,password,path,payload,auth_mode=0):
    """Issue an authenticated GET and return the response body.

    :param ip: host name or address of the device
    :param port: TCP port of the device
    :param user: login name
    :param password: login password
    :param path: URL path appended after ``http://ip:port/``
    :param payload: query parameters passed to ``requests.get``
    :param auth_mode: ``0`` for HTTP Basic (default), ``1`` for HTTP Digest
    :return: the response text on HTTP 200, otherwise ``False`` (the
        problem is reported through ``self.send_error``)
    """
    url = "http://{}:{}/{}".format(ip, port, path)
    # %s (not %d) so that a malformed auth_mode reaches the explicit
    # "Unknown auth_mode" report below instead of failing while logging.
    self.logger.debug("http_get: Sending: %s %s auth_mode=%s", url, payload, auth_mode)

    if auth_mode == 0:
        auth = HTTPBasicAuth(user, password)
    elif auth_mode == 1:
        auth = HTTPDigestAuth(user, password)
    else:
        self.send_error(
            "Unknown auth_mode '%s' for request '%s'. Must be 0 for 'basic' or 1 for 'digest'."
            % (auth_mode, url)
        )
        return False

    try:
        response = requests.get(url, auth=auth, params=payload, timeout=5)
    # RequestException is the base class of the exceptions requests raises.
    except requests.exceptions.RequestException as e:
        self.send_error("Connection error for %s: %s" % (url, e))
        return False

    self.logger.debug("http_get: Got: code=%s", response.status_code)

    if response.status_code == 200:
        return response.text
    elif response.status_code == 400:
        self.send_error("Bad request: %s" % (url))
    elif response.status_code == 404:
        self.send_error("Not Found: %s" % (url))
    elif response.status_code == 401:
        # Authentication error
        self.send_error("Failed to authenticate, please check your username and password")
    else:
        self.send_error("Unknown response %s: %s" % (response.status_code, url))
    return False
def get_books(self, url, username, password, vipGroups, picture_path, conn_auth="basic", conn_verify=True, debug=False):
    """Download the vCards referenced at ``url`` and convert them to books.

    :param url: address of the XML listing to query
    :param username: login name
    :param password: login password
    :param vipGroups: forwarded to ``get_books_by_cards``
    :param picture_path: forwarded to ``get_books_by_cards``
    :param conn_auth: ``"basic"`` (default) or ``"digest"``; any other value
        sends the requests without credentials
    :param conn_verify: value for the requests ``verify`` setting
    :param debug: print progress information and forward the flag to helpers
    :return: the books produced by ``fritzbox.VCF.Import().get_books_by_cards``
    """
    if debug:
        print("get_books(%s)" % url)

    # Scheme + host part of the URL; the hrefs are appended to it below.
    url_split = urllib.parse.urlparse(url)
    url_base = url_split.scheme + '://' + url_split.netloc

    # Authentication settings passed along with every request.
    settings = {"verify": conn_verify}
    if conn_auth == "basic":
        settings["auth"] = (username, password)
    elif conn_auth == "digest":
        from requests.auth import HTTPDigestAuth
        # Use this function's own parameters; the former names
        # (user, passwd) were undefined and raised NameError.
        settings["auth"] = HTTPDigestAuth(username, password)

    session = requests.session()
    xml = self._get_xml(session, url, settings, debug)
    hrefs = self._process_xml(xml, debug)

    # Convert every referenced entry into a vCard object.
    cards = [
        self._get_vcard(session, url_base + href, settings, debug)
        for href in hrefs.keys()
    ]

    vcf = fritzbox.VCF.Import()
    books = vcf.get_books_by_cards(cards, vipGroups, picture_path, debug=debug)
    return books
def bruteForce(password, username):
    """Try one credential pair against the management endpoint on port 9990.

    Sends a POST with HTTP Digest authentication to
    ``http://<target>:9990/management`` (``target`` is a module-level name)
    and prints a message when the server answers with status 415.

    :param password: password to try (coerced with ``str``)
    :param username: user name to try (coerced with ``str``)
    :return: None
    """
    # Coerce once so the request and the report use the same values.
    username = str(username)
    password = str(password)

    tryComb = requests.post(
        "http://" + target + ":9990/management",
        auth=HTTPDigestAuth(username, password),
    )

    # NOTE(review): 415 is treated as "credentials accepted" -- presumably
    # the body-less POST is only rejected for its media type after
    # authentication succeeded; confirm against the target service.
    if tryComb.status_code == 415:
        # print() with a single argument behaves the same on Python 2 and 3.
        print("\033[92m\033[1m[+]\033[0m Credential Found => " + username + ":" + password)
def _http_digest_auth(self, auth_id, auth_pw):
    """Create an ``HTTPDigestAuth`` handler and keep it on ``self._auth``.

    :param auth_id: identifier passed as the digest user name
    :param auth_pw: password passed to the digest handler
    """
    digest_handler = HTTPDigestAuth(auth_id, auth_pw)
    self._auth = digest_handler
def _get_query():
    """Build a ``DockerRegistryQuery`` from the module-level ``args``.

    The registry name becomes the base URL (``https://`` is prepended unless
    it already starts with ``http``).  Client options are collected from the
    command-line arguments: explicit user credentials (digest or basic), or
    else credentials found through ``_get_auth_config`` when the Docker
    config file exists; TLS verification; client certificate; and the
    ``use_get_manifest`` flag.  A cache file, if present and ``args.refresh``
    is not set, is loaded into the query before it is returned.
    """
    registry = args.registry
    if not registry:
        parser.error("No registry provided, neither as a command argument nor through the environment variable "
                     "REGISTRY.")

    base_url = registry if registry.startswith('http') else 'https://{0}'.format(registry)

    client_options = {}

    # Credentials: explicit user first, Docker config file as the fallback.
    if args.user:
        auth_cls = HTTPDigestAuth if args.digest_auth else HTTPBasicAuth
        client_options['auth'] = auth_cls(args.user, args.password)
    elif os.path.isfile(DOCKER_CONFIG_FILE):
        config_auth = _get_auth_config(registry)
        if config_auth:
            client_options['auth'] = HTTPBase64Auth(config_auth)

    if args.verify is not None:
        client_options['verify'] = value_or_false(args.verify)

    if args.client_cert:
        # A (cert, key) pair when a key was given, the bare cert otherwise.
        if args.client_key:
            client_options['cert'] = (args.client_cert, args.client_key)
        else:
            client_options['cert'] = args.client_cert

    if args.use_get_manifest:
        client_options['use_get_manifest'] = True

    client = DockerRegistryClient(base_url, **client_options)
    query = DockerRegistryQuery(client)

    cache_fn = _get_cache_name(registry)
    if cache_fn and os.path.isfile(cache_fn) and not args.refresh:
        with open(cache_fn) as cache_file:
            query.load(cache_file)

    return query
def execute(self, **kwargs):
    """
    Post this action to the FritzBox as a SOAP request and return the
    parsed response.

    The keyword arguments are passed to ``self._body_builder`` to fill the
    envelope.  Digest authentication is used only when ``self.password``
    is set.
    """
    headers = self.header.copy()
    headers['soapaction'] = '%s#%s' % (self.service_type, self.name)

    body = self.envelope.strip() % self._body_builder(kwargs)
    url = 'http://%s:%s%s' % (self.address, self.port, self.control_url)

    auth = HTTPDigestAuth(self.user, self.password) if self.password else None

    response = requests.post(url, data=body, headers=headers, auth=auth)

    # The parser is given bytes (response.content), not response.text.
    return self.parse_response(response.content)
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
    """Digest credentials yield 200; the same URL without them yields 401."""
    digest = HTTPDigestAuth('user', 'pass')
    endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

    # Per-request auth object.
    with_auth = requests.get(endpoint, auth=digest)
    assert with_auth.status_code == 200

    # No credentials at all.
    without_auth = requests.get(endpoint)
    assert without_auth.status_code == 401

    # Auth object attached to a session.
    session = requests.session()
    session.auth = HTTPDigestAuth('user', 'pass')
    via_session = session.get(endpoint)
    assert via_session.status_code == 200
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
    """The unauthenticated response carries the ``fake`` cookie; the
    authenticated request succeeds with 200."""
    endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
    digest = HTTPDigestAuth('user', 'pass')

    challenge = requests.get(endpoint)
    assert challenge.cookies['fake'] == 'fake_value'

    authenticated = requests.get(endpoint, auth=digest)
    assert authenticated.status_code == 200
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
    """After a digest-authenticated GET the session holds the ``fake`` cookie."""
    endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
    digest = HTTPDigestAuth('user', 'pass')

    session = requests.Session()
    session.get(endpoint, auth=digest)

    assert session.cookies['fake'] == 'fake_value'
def test_DIGEST_STREAM(self, httpbin):
    """With ``stream=True`` the raw body is still readable; with
    ``stream=False`` reading ``raw`` yields empty bytes."""
    digest = HTTPDigestAuth('user', 'pass')
    endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

    streamed = requests.get(endpoint, auth=digest, stream=True)
    assert streamed.raw.read() != b''

    buffered = requests.get(endpoint, auth=digest, stream=False)
    assert buffered.raw.read() == b''
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
    """A wrong digest password yields 401 per request, without auth, and
    through a session."""
    wrong_digest = HTTPDigestAuth('user', 'wrongpass')
    endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

    # Wrong password passed per request.
    rejected = requests.get(endpoint, auth=wrong_digest)
    assert rejected.status_code == 401

    # No credentials at all.
    anonymous = requests.get(endpoint)
    assert anonymous.status_code == 401

    # Same wrong credentials attached to a session.
    session = requests.session()
    session.auth = wrong_digest
    via_session = session.get(endpoint)
    assert via_session.status_code == 401
def __init__(self):
    """Read the CloudApp credentials from ``config`` and prepare a session.

    :raises StorageError: when the username or the password is missing/empty.
    """
    username = config.CLOUDAPP_USERNAME
    password = config.CLOUDAPP_PASSWORD
    if not (username and password):
        raise StorageError("Set 'cloudapp_username' and 'cloudapp_password' configs.")

    self._cloudapp_auth = HTTPDigestAuth(username, password)

    # The session's headers are replaced by a single Accept header.
    session = requests.Session()
    session.headers = {
        'Accept': 'application/json',
    }
    self._session = session
def __init__(self, url, username, password, user):
    """Store connection details and derive the endpoint URLs.

    :param url: base URL the endpoint paths are joined onto
    :param username: login name for digest authentication
    :param password: password for digest authentication
    :param user: object whose ``username`` names the local forms directory
    """
    self.url = url
    self.user = user
    self.auth = HTTPDigestAuth(username, password)

    # Endpoints, all resolved relative to the base URL.
    self.form_list_url = urljoin(self.url, 'formList')
    self.submission_list_url = urljoin(self.url, 'view/submissionList')
    self.download_submission_url = urljoin(self.url, 'view/downloadSubmission')

    # Local directory layout: <username>/briefcase/forms
    self.forms_path = os.path.join(self.user.username, 'briefcase', 'forms')

    self.resumption_cursor = 0
    self.logger = logging.getLogger('console_logger')
def get_auth():
    """Prompt for MODSCAG credentials and return a digest auth object.

    The user name is read from standard input and the password with
    ``getpass`` (no echo).

    :return: ``requests.auth.HTTPDigestAuth`` built from the entered values.
    """
    import getpass
    from requests.auth import HTTPDigestAuth

    # Pick the line-reading function for the running interpreter.  It is
    # bound to a separate local name on purpose: assigning to ``input``
    # inside this function would make ``input`` a local variable, so on
    # Python 3 (where raw_input does not exist) the later call would raise
    # UnboundLocalError.
    try:
        prompt = raw_input  # Python 2
    except NameError:
        prompt = input  # Python 3

    uname = prompt("MODSCAG Username:")
    pw = getpass.getpass("MODSCAG Password:")
    auth = HTTPDigestAuth(uname, pw)
    # Equivalent wget invocation:
    # wget -A'h8v4*snow_fraction.tif' --user=uname --password=pw
    return auth
def auth(self):
    """
    Build a digest auth object from this instance's username and password.
    """
    digest = HTTPDigestAuth(self.username, self.password)
    return digest
def run(self, data):
    """Probe an HTTP service and try every user/password combination.

    ``data`` supplies ``"port"``, ``"users"`` and ``"passwords"``.  A first
    unauthenticated GET decides what happens next:

    * status other than 401 -> returns a "not protected" message;
    * status 401 -> the ``WWW-Authenticate`` header selects basic or digest
      authentication and every combination is requested; each 200 response
      appends a success entry to the returned message.

    Returns ``None`` when a ``requests.ConnectionError`` occurs while trying
    credentials, or when the scheme is neither basic nor digest and there is
    at least one combination to try.  Returns
    ``"Error. Credentials not found"`` when nothing succeeded.
    """
    port = data["port"]
    users = data["users"]
    passwords = data["passwords"]

    url = "http://{0}:{1}".format(self.ip, port)
    response = requests.get(url)
    message = ""

    if response is not None:
        if response.status_code != 401:
            return "{} is not protected by authentication system".format(self.ip)

        # 'Basic' is checked before 'Digest', as a header may mention both.
        challenge = response.headers['WWW-Authenticate']
        if 'Basic' in challenge:
            scheme = "basic"
        elif 'Digest' in challenge:
            scheme = "digest"
        else:
            scheme = None

        for user in users:
            for password in passwords:
                try:
                    if scheme == "basic":
                        credentials = HTTPBasicAuth(user, password.strip())
                    elif scheme == "digest":
                        credentials = HTTPDigestAuth(user, password.strip())
                    else:
                        return None
                    response = requests.get(url, auth=credentials)
                    if response.status_code == 200:
                        message += "Success!\nCredentials found\nUsername: {0} Password: {1}\n".format(user, password)
                except requests.ConnectionError:
                    return None

    if message == "":
        message = "Error. Credentials not found"
    return message
def setup_cluster(self):
    """Initialise the boot host, join the other hosts and optionally
    rename and couple the cluster.

    Works entirely on instance attributes (``host``, ``boothost``,
    ``adminuser``/``adminpass``, ``name``, ``couple``,
    ``couple_user``/``couple_pass``) and prints progress messages.

    NOTE(review): the block nesting below was reconstructed from a source
    whose indentation was lost -- confirm against the original file.
    """
    # Coupling credentials default to the admin credentials when no
    # coupling password was supplied.
    if self.couple is not None and self.couple_pass is None:
        self.couple_pass = self.adminpass
        self.couple_user = self.adminuser

    # Without an explicit boot host, the first listed host is used and
    # removed from the list of hosts to join (mutates self.host).
    if self.boothost is None:
        self.boothost = self.host[0]
        self.host.remove(self.boothost)

    # Security setup runs only when ml_init() returns a truthy value.
    if self.ml_init(self.boothost):
        self.ml_security(self.boothost)

    conn = Connection(self.boothost, HTTPDigestAuth(self.adminuser, self.adminpass))
    self.marklogic = MarkLogic(conn)

    # Initialise every remaining host and join it to the boot host.
    for hostname in self.host:
        self.ml_init(hostname)
        self.ml_join(self.boothost, hostname)

    if self.name is not None:
        print("{0}: rename cluster...".format(self.boothost))
        cluster = self.marklogic.cluster()
        cluster.set_cluster_name(self.name)
        cluster.update()

    if self.couple is not None:
        for couple in self.couple:
            print("{0}: couple with {1}...".format(self.boothost, couple))
            # Each foreign cluster gets its own connection using the
            # coupling credentials.
            altconn = Connection(couple, HTTPDigestAuth(self.couple_user, self.couple_pass))
            altml = MarkLogic(altconn)
            altcluster = altml.cluster()
            cluster = self.marklogic.cluster()
            cluster.couple(altcluster)

    print("Finished")
def _get_http_auth(username: str, password: str, auth_type: str) -> AuthBase:
    """Return the requests auth object matching ``auth_type``.

    :param username: login name
    :param password: login password
    :param auth_type: ``'basic'`` or ``'digest'``
    :raises RetsClientError: for any other ``auth_type``
    """
    if auth_type == 'basic':
        auth_cls = HTTPBasicAuth
    elif auth_type == 'digest':
        auth_cls = HTTPDigestAuth
    else:
        raise RetsClientError('unknown auth type %s' % auth_type)
    return auth_cls(username, password)
def update_status(self, data=None):
    """Update ``self.status`` and ``self.status_changed``.

    :param data: ``'member'``, ``'nonmember'`` or another value; when
        ``None`` the status is looked up through the register API using
        this member's person number.

    Returns early (without changing anything) when ``data`` is ``None`` and
    the person number is empty, or when the API response is not valid JSON.
    """
    if data is None:
        # Nothing to look up without a person number.
        if self.person_number() == '':
            return
        try:
            r = requests.get(
                'https://register.utn.se/api.php',
                auth=HTTPDigestAuth(settings.MEMBERSHIP_API_USER, settings.MEMBERSHIP_API_PASSWORD),
                params={
                    'action': 'check',
                    # The person number is sent with its dashes removed.
                    'person_number': self.person_number().replace('-', '')
                },
            )
            data = r.json().get('status')
        except requests.exceptions.ConnectionError:
            # A connection failure is treated as an 'unknown' result.
            data = 'unknown'
        except ValueError:
            return

    if data == 'member':
        self.status = 'member'
    elif data == 'nonmember':
        # A previous status other than unknown/nonmember becomes 'alumnus'.
        if self.status in ['unknown', 'nonmember']:
            self.status = 'nonmember'
        else:
            self.status = 'alumnus'
    # Any other value of data leaves self.status untouched.

    # NOTE(review): the original indentation was lost; this layout assumes
    # the timestamp is refreshed on every call that reaches this point,
    # whether or not the status changed -- confirm.
    self.status_changed = timezone.now()
def get_auth_obj(method, user, password):
    """
    Build the authentication object for the requested method.

    Supported methods (compared case-insensitively):

    * "basic"
    * "digest"
    * "ntlm"

    .. warning: This function may be removed in future versions of GoLismero.

    :param method: Auth method: basic, digest, ntlm.
    :type method: str

    :param user: string with user text
    :type user: str

    :param password: string with password text
    :type password: str

    :return: an authentication object, or None when the method is empty or
        not one of the supported names.
    """
    if not method:
        return None

    wanted = method.lower()
    if wanted == "basic":
        return HTTPBasicAuth(user, password)
    if wanted == "digest":
        return HTTPDigestAuth(user, password)
    if wanted == "ntlm":
        return HttpNtlmAuth(user, password)
    return None


#------------------------------------------------------------------------------
def get_digest():
    """Build a digest auth handler from the module-level dev-client
    username and password."""
    digest = HTTPDigestAuth(g_dev_client_username, g_dev_client_password)
    return digest


# Returns boolean to indicate if the NCOS device is
# in DEV mode