Python requests.auth 模块,HTTPDigestAuth() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用requests.auth.HTTPDigestAuth()

项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def auth():
    """Issue sample GET requests using basic, digest and OAuth2 credentials.

    Every response is discarded; the function only shows the different
    values that can be handed to the ``auth`` argument of ``requests.get``.
    """
    github_user = 'https://api.github.com/user'

    # Basic authentication: explicit handler object, then a plain tuple.
    requests.get(github_user, auth=HTTPBasicAuth('user', 'pass'))
    requests.get(github_user, auth=('user', 'pass'))

    # Digest authentication.
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 authentication (OAuth2 presumably comes from requests-oauthlib).
    oauth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(
        'https://api.twitter.com/1.1/account/verify_credentials.json',
        auth=oauth,
    )
项目:cc-server    作者:curious-containers    | 项目源码 | 文件源码
def _auth(http_auth):
    if not http_auth:
        return None

    if http_auth['auth_type'] == 'basic':
        return HTTPBasicAuth(
            http_auth['username'],
            http_auth['password']
        )

    if http_auth['auth_type'] == 'digest':
        return HTTPDigestAuth(
            http_auth['username'],
            http_auth['password']
        )

    raise Exception('Authorization information is not valid.')
项目:moore    作者:UTNkar    | 项目源码 | 文件源码
def update_membership_status():
    """Refresh the status of every Member from the register.utn.se list.

    Fetches the list with digest authentication, then marks each member as
    'member' or 'nonmember' depending on whether their person number (with
    dashes removed) appears in the decoded response. Returns early, without
    touching any member, when the response body is not valid JSON.
    """
    credentials = HTTPDigestAuth(
        settings.MEMBERSHIP_API_USER,
        settings.MEMBERSHIP_API_PASSWORD,
    )
    response = requests.get(
        'https://register.utn.se/api.php',
        auth=credentials,
        params={'action': 'list'},
    )
    try:
        registered = response.json()
    except ValueError:
        return

    for member in Member.objects.all():
        number = member.person_number().replace('-', '')
        member.update_status(
            data='member' if number in registered else 'nonmember'
        )
        # Persist only the two status columns through a queryset update.
        Member.objects.filter(pk=member.pk).update(
            status=member.status, status_changed=member.status_changed
        )
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def __init__(self):
        """Start with no base URL and no digest handler configured."""
        CradlepointClient.__init__(self)

        # Per the original notes, the base class sets up the bookkeeping for
        # the last request/response (last_url, last_reply).

        # Base URL used by request calls, e.g. "http://192.168.1.1/api".
        self.base_url = None

        # Will hold the HTTPDigestAuth() handler once the user name and
        # password have been set.
        self.digest = None
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def set_user_password(self, user_name, password):
        """
        Build and store the DIGEST authentication handler used for the
        Cradlepoint's challenge/response on Router API (CURL-like) calls.

        :param str user_name: must be a str
        :param str password: must be a str
        :return: None
        """
        from requests.auth import HTTPDigestAuth

        # isinstance() already rejects None, so one check per argument
        # covers both conditions.
        assert isinstance(user_name, str)
        assert isinstance(password, str)

        self.digest = HTTPDigestAuth(user_name, password)
项目:Firefly    作者:Firefly-Automation    | 项目源码 | 文件源码
def get_devices(self):
    '''
    Fetch every device, together with its configuration, from Indigo.

    Returns:
      devices (dict): Device JSON keyed by 'indigo-<name>'; each entry also
        carries the 'restURL' it was fetched from.

    '''
    listing = requests.get(
        self.url + '/devices.json',
        auth=HTTPDigestAuth(self.user, self.password),
        verify=False,
    ).json()

    devices = {}
    for entry in listing:
      rest_url = entry.get('restURL')
      key = 'indigo-%s' % entry.get('name')
      # One extra request per listed device to pull its own record.
      detail = requests.get(
          self.url + rest_url,
          auth=HTTPDigestAuth(self.user, self.password),
          verify=False,
      ).json()
      detail['restURL'] = rest_url
      devices[key] = detail

    return devices
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def __init__(self, hass, device_info):
        """Initialize a generic camera from its configuration mapping."""
        super().__init__()
        self.hass = hass
        self._authentication = device_info.get(CONF_AUTHENTICATION)
        self._name = device_info.get(CONF_NAME)
        self._still_image_url = device_info[CONF_STILL_IMAGE_URL]
        self._still_image_url.hass = hass
        self._limit_refetch = device_info[CONF_LIMIT_REFETCH_TO_URL_CHANGE]

        username = device_info.get(CONF_USERNAME)
        password = device_info.get(CONF_PASSWORD)

        # An auth handler is only built when both credentials are present;
        # digest uses requests' handler, anything else aiohttp's BasicAuth.
        if not (username and password):
            self._auth = None
        elif self._authentication == HTTP_DIGEST_AUTHENTICATION:
            self._auth = HTTPDigestAuth(username, password)
        else:
            self._auth = aiohttp.BasicAuth(username, password=password)

        self._last_url = None
        self._last_image = None
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def target_function(self, url, user, password):
        """Try one user/password pair against ``url`` with digest auth.

        Any response other than HTTP 401 counts as success: the pair is
        printed and appended to self.credentials, and StopThreadPoolExecutor
        is raised when self.stop_on_success is set. A missing response or a
        401 is reported as a failure.
        """
        thread_name = threading.current_thread().name

        user = user.encode('utf-8').strip()
        password = password.encode('utf-8').strip()

        response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

        # Guard clause: no response at all, or a 401, is a failed attempt.
        if response is None or response.status_code == 401:
            print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
            return

        print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        self.credentials.append((self.target, self.port, user, password))
        if self.stop_on_success:
            raise StopThreadPoolExecutor
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def target_function(self, url, creds):
        """Try one (user, password) pair against ``url`` with digest auth.

        Any response other than HTTP 401 counts as success: the pair is
        printed and appended to self.credentials, and StopThreadPoolExecutor
        is raised when self.stop_on_success is set. A missing response or a
        401 is reported as a failure.
        """
        thread_name = threading.current_thread().name
        user, password = creds
        user = user.encode('utf-8').strip()
        password = password.encode('utf-8').strip()

        response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

        # Guard clause: no response at all, or a 401, is a failed attempt.
        if response is None or response.status_code == 401:
            print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
            return

        print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        self.credentials.append((self.target, self.port, user, password))
        if self.stop_on_success:
            raise StopThreadPoolExecutor
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def target_function(self, url, user, password):
        """Try one user/password pair against ``url`` with digest auth.

        Any response other than HTTP 401 counts as success: the pair is
        printed and appended to self.credentials, and StopThreadPoolExecutor
        is raised when self.stop_on_success is set. A missing response or a
        401 is reported as a failure.
        """
        thread_name = threading.current_thread().name

        user = user.encode('utf-8').strip()
        password = password.encode('utf-8').strip()

        response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

        # Guard clause: no response at all, or a 401, is a failed attempt.
        if response is None or response.status_code == 401:
            print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
            return

        print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        self.credentials.append((self.target, self.port, user, password))
        if self.stop_on_success:
            raise StopThreadPoolExecutor
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def target_function(self, url, creds):
        """Try one (user, password) pair against ``url`` with digest auth.

        Any response other than HTTP 401 counts as success: the pair is
        printed and appended to self.credentials, and StopThreadPoolExecutor
        is raised when self.stop_on_success is set. A missing response or a
        401 is reported as a failure.
        """
        thread_name = threading.current_thread().name
        user, password = creds
        user = user.encode('utf-8').strip()
        password = password.encode('utf-8').strip()

        response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

        # Guard clause: no response at all, or a 401, is a failed attempt.
        if response is None or response.status_code == 401:
            print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
            return

        print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        self.credentials.append((self.target, self.port, user, password))
        if self.stop_on_success:
            raise StopThreadPoolExecutor
项目:camera-polyglot    作者:jimboca    | 项目源码 | 文件源码
def http_get(self, ip, port, user, password, path, payload, auth_mode=0):
        """
        Send an authenticated HTTP GET and return the response body.

        :param ip: host part of the URL
        :param port: port part of the URL
        :param user: user name used for authentication
        :param password: password used for authentication
        :param path: URL path, appended after "http://<ip>:<port>/"
        :param payload: handed to requests.get as the query ``params``
        :param auth_mode: 0 for basic authentication, 1 for digest
        :return: the response text on HTTP 200; otherwise False, after the
                 problem has been reported through self.send_error
        """
        url = "http://{}:{}/{}".format(ip, port, path)

        # %s rather than %d: an auth_mode of an unexpected type must reach
        # the error branch below instead of failing inside this debug line.
        self.logger.debug("http_get: Sending: %s %s auth_mode=%s", url, payload, auth_mode)
        if auth_mode == 0:
            auth = HTTPBasicAuth(user, password)
        elif auth_mode == 1:
            auth = HTTPDigestAuth(user, password)
        else:
            # The message now matches the branches above (0=basic, 1=digest).
            self.send_error("Unknown auth_mode '%s' for request '%s'.  Must be 0 for 'basic' or 1 for 'digest'." % (auth_mode, url))
            return False

        try:
            response = requests.get(
                url,
                auth=auth,
                params=payload,
                timeout=5
            )
        # RequestException is the base class of the requests exceptions.
        except requests.exceptions.RequestException as e:
            self.send_error("Connection error for %s: %s" % (url, e))
            return False
        self.logger.debug("http_get: Got: code=%s", response.status_code)
        if response.status_code == 200:
            return response.text
        elif response.status_code == 400:
            self.send_error("Bad request: %s" % (url))
        elif response.status_code == 404:
            self.send_error("Not Found: %s" % (url))
        elif response.status_code == 401:
            # Authentication error
            self.send_error(
                "Failed to authenticate, please check your username and password")
        else:
            self.send_error("Unknown response %s: %s" % (response.status_code, url))
        return False
项目:python-fritzbox    作者:pamapa    | 项目源码 | 文件源码
def get_books(self, url, username, password, vipGroups, picture_path,
                conn_auth="basic", conn_verify=True, debug=False):
    """Download the vCards referenced at ``url`` and convert them to books.

    :param url: address that is fetched through self._get_xml
    :param username: user name for HTTP authentication
    :param password: password for HTTP authentication
    :param vipGroups: forwarded to get_books_by_cards
    :param picture_path: forwarded to get_books_by_cards
    :param conn_auth: "basic" or "digest"; any other value sends no
        credentials
    :param conn_verify: stored as the "verify" request setting
    :param debug: print/forward debug output when true
    :return: the result of fritzbox.VCF.Import().get_books_by_cards(...)
    """
    if debug: print("get_books(%s)" % url)

    # url base (scheme + host), used to resolve the hrefs found in the XML
    url_split = urllib.parse.urlparse(url)
    url_base = url_split.scheme + '://' + url_split.netloc

    # authentication
    settings = {"verify": conn_verify}
    if conn_auth == "basic":
      settings["auth"] = (username, password)
    elif conn_auth == "digest":
      from requests.auth import HTTPDigestAuth
      # Use this function's own parameters; the names previously passed
      # here (user, passwd) are not defined in this scope.
      settings["auth"] = HTTPDigestAuth(username, password)

    session = requests.session()
    xml = self._get_xml(session, url, settings, debug)
    hrefs = self._process_xml(xml, debug)

    # convert into vcard objects
    cards = [
      self._get_vcard(session, url_base + href, settings, debug)
      for href in hrefs.keys()
    ]

    vcf = fritzbox.VCF.Import()
    books = vcf.get_books_by_cards(cards, vipGroups, picture_path, debug=debug)
    return books
项目:wildPwn    作者:hlldz    | 项目源码 | 文件源码
def bruteForce(password, username):
        """
        Try one credential pair against the management endpoint on port 9990.

        POSTs to "http://<target>:9990/management" with digest authentication
        and prints the pair when the server answers with HTTP 415.

        :param password: candidate password
        :param username: candidate user name
        """
        tryComb     = requests.post("http://" + target +":9990/management", auth=HTTPDigestAuth(str(username), str(password)))
        statusCode  = tryComb.status_code
        # NOTE(review): 415 is what the code treats as "credentials accepted";
        # presumably the server only complains about the empty POST body
        # after authentication has passed - confirm against the target.
        if statusCode == 415:
            # print() with a single argument behaves the same on Python 2
            # and 3; str() mirrors the conversion used for the request above
            # so non-string candidates cannot break the concatenation.
            print("\033[92m\033[1m[+]\033[0m Credential Found => " + str(username) + ":" + str(password))
项目:python-gerrit    作者:gillarkod    | 项目源码 | 文件源码
def _http_digest_auth(self, auth_id, auth_pw):
        """Store on self._auth a digest handler built from id and password."""
        digest = HTTPDigestAuth(auth_id, auth_pw)
        self._auth = digest
项目:docker-registry-util    作者:merll    | 项目源码 | 文件源码
def _get_query():
    """Build a DockerRegistryQuery from the module-level ``args``.

    Resolves the registry base URL, chooses the authentication handler
    (explicit user, else the Docker config file), forwards the TLS-related
    options, and loads a cached query state unless a refresh was requested.

    :return: the configured DockerRegistryQuery
    """
    registry = args.registry
    if not registry:
        parser.error("No registry provided, neither as a command argument nor through the environment variable "
                     "REGISTRY.")
    # Match the complete scheme prefix: a bare host name that merely begins
    # with "http" (e.g. "httpd.example.com") still needs a scheme added.
    if registry.startswith(('http://', 'https://')):
        base_url = registry
    else:
        base_url = 'https://{0}'.format(registry)
    kwargs = {}
    if args.user:
        if args.digest_auth:
            kwargs['auth'] = HTTPDigestAuth(args.user, args.password)
        else:
            kwargs['auth'] = HTTPBasicAuth(args.user, args.password)
    elif os.path.isfile(DOCKER_CONFIG_FILE):
        # No explicit user: fall back to credentials from the Docker config.
        config_auth = _get_auth_config(registry)
        if config_auth:
            kwargs['auth'] = HTTPBase64Auth(config_auth)
    if args.verify is not None:
        kwargs['verify'] = value_or_false(args.verify)
    if args.client_cert:
        if args.client_key:
            kwargs['cert'] = (args.client_cert, args.client_key)
        else:
            kwargs['cert'] = args.client_cert
    if args.use_get_manifest:
        kwargs['use_get_manifest'] = True

    client = DockerRegistryClient(base_url, **kwargs)
    query = DockerRegistryQuery(client)
    cache_fn = _get_cache_name(registry)
    # Reuse cached state when a cache file exists and no refresh is wanted.
    if cache_fn and os.path.isfile(cache_fn) and not args.refresh:
        with open(cache_fn) as f:
            query.load(f)
    return query
项目:Zipatoapi    作者:ggruner    | 项目源码 | 文件源码
def execute(self, **kwargs):
        """
        Call the FritzBox action and return the parsed response
        (documented upstream as a dictionary with the arguments).
        """
        soap_headers = self.header.copy()
        soap_headers['soapaction'] = '%s#%s' % (self.service_type, self.name)
        body = self.envelope.strip() % self._body_builder(kwargs)
        endpoint = 'http://%s:%s%s' % (self.address, self.port, self.control_url)
        # Digest credentials are only attached when a password is configured.
        digest = HTTPDigestAuth(self.user, self.password) if self.password else None
        response = requests.post(endpoint, data=body, headers=soap_headers, auth=digest)
        # response.content (bytes) rather than response.text: lxml needs bytes
        return self.parse_response(response.content)
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
        """Digest auth gives 200 per request and on a session; no auth 401."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Credentials passed on the call itself.
        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Credentials attached to a session.
        session = requests.session()
        session.auth = HTTPDigestAuth('user', 'pass')
        via_session = session.get(endpoint)
        assert via_session.status_code == 200
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
        """Unauthenticated reply carries the 'fake' cookie; authed one is 200."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')

        challenge = requests.get(endpoint)
        assert challenge.cookies['fake'] == 'fake_value'

        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
        """A digest-authenticated GET leaves the 'fake' cookie on the session."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')
        session = requests.Session()
        session.get(endpoint, auth=digest)
        assert session.cookies['fake'] == 'fake_value'
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_DIGEST_STREAM(self, httpbin):
        """stream=True leaves the raw body readable; stream=False reads empty."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        streamed = requests.get(endpoint, auth=digest, stream=True)
        assert streamed.raw.read() != b''

        buffered = requests.get(endpoint, auth=digest, stream=False)
        assert buffered.raw.read() == b''
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
        """A wrong password yields 401 per request and on a session, as does no auth."""
        bad_digest = HTTPDigestAuth('user', 'wrongpass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Wrong credentials passed on the call itself.
        rejected = requests.get(endpoint, auth=bad_digest)
        assert rejected.status_code == 401

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Wrong credentials attached to a session.
        session = requests.session()
        session.auth = bad_digest
        via_session = session.get(endpoint)
        assert via_session.status_code == 401
项目:grab-screen    作者:andrei-shabanski    | 项目源码 | 文件源码
def __init__(self):
        """Read the CloudApp credentials from config and prepare a session.

        :raises StorageError: when the username or the password is not set.
        """
        username = config.CLOUDAPP_USERNAME
        password = config.CLOUDAPP_PASSWORD
        if not (username and password):
            raise StorageError("Set 'cloudapp_username' and 'cloudapp_password' configs.")

        self._cloudapp_auth = HTTPDigestAuth(username, password)
        self._session = requests.Session()
        # Replaces the session's default headers with a single Accept header.
        self._session.headers = {'Accept': 'application/json'}
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def __init__(self, url, username, password, user):
        """Store the server URL, digest credentials and derived paths.

        :param url: base URL the endpoint URLs are joined onto
        :param username: user name for digest authentication
        :param password: password for digest authentication
        :param user: object whose ``username`` names the local forms folder
        """
        self.url = url
        self.user = user
        self.auth = HTTPDigestAuth(username, password)

        # Endpoints resolved against the base URL.
        self.form_list_url = urljoin(self.url, 'formList')
        self.submission_list_url = urljoin(self.url, 'view/submissionList')
        self.download_submission_url = urljoin(
            self.url, 'view/downloadSubmission')

        # Relative folder "<username>/briefcase/forms".
        self.forms_path = os.path.join(self.user.username, 'briefcase', 'forms')

        self.resumption_cursor = 0
        self.logger = logging.getLogger('console_logger')
项目:pygeotools    作者:dshean    | 项目源码 | 文件源码
def get_auth():
    """Prompt for MODSCAG credentials and return a digest auth handler.

    The user name is read from standard input and the password through
    getpass.

    :return: HTTPDigestAuth built from the entered user name and password
    """
    import getpass
    from requests.auth import HTTPDigestAuth
    # Pick the line-reading function under a separate local name. Assigning
    # to ``input`` itself would make it a local variable for the whole
    # function, so on Python 3 (where raw_input does not exist) the later
    # call would fail with UnboundLocalError.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    uname = read_line("MODSCAG Username:")
    pw = getpass.getpass("MODSCAG Password:")
    auth = HTTPDigestAuth(uname, pw)
    #wget -A'h8v4*snow_fraction.tif' --user=uname --password=pw
    return auth
项目:gerrit.py    作者:yumminhuang    | 项目源码 | 文件源码
def auth(self):
        """Build a digest handler from this object's username and password."""
        credentials = HTTPDigestAuth(self.username, self.password)
        return credentials
项目:rexploit    作者:DaniLabs    | 项目源码 | 文件源码
def run(self, data):
        """
        Probe the HTTP service on self.ip and try every user/password pair.

        :param data: mapping with the keys "port", "users" and "passwords"
        :return: a text message (target not protected, credentials found, or
                 none found); None when the challenge names neither Basic nor
                 Digest, or when a connection error interrupts the attempts
        """
        port = data["port"]
        users = data["users"]
        passwords = data["passwords"]
        url = "http://{0}:{1}".format(self.ip, port)
        response = requests.get(url)
        message = ""

        # Bound up front so the loop below never meets an undefined name.
        auth = None
        if response is not None:
            if response.status_code != 401:
                message = "{} is not protected by authentication system".format(self.ip)
                return message

            # .get(): a 401 without a WWW-Authenticate header is treated like
            # an unsupported scheme instead of raising KeyError.
            challenge = response.headers.get('WWW-Authenticate', '')
            if 'Basic' in challenge:
                auth = "basic"
            elif 'Digest' in challenge:
                auth = "digest"

        for user in users:
            for password in passwords:
                if auth is None:
                    return None
                handler_cls = HTTPBasicAuth if auth == "basic" else HTTPDigestAuth
                # Strip once so the reported password is exactly the one sent.
                candidate = password.strip()
                try:
                    response = requests.get(url, auth=handler_cls(user, candidate))
                except requests.ConnectionError:
                    return None

                if response.status_code == 200:
                    message += "Success!\nCredentials found\nUsername: {0} Password: {1}\n".format(user, candidate)

        if message == "":
            message = "Error. Credentials not found"

        return message
项目:ml-docker    作者:DALDEI    | 项目源码 | 文件源码
def setup_cluster(self):
        """Initialise the boot host, join the other hosts, then rename/couple.

        Steps, in order: default the coupling credentials to the admin ones,
        pick a boot host if none is set, initialise (and, when ml_init
        reports true, secure) it, connect with digest authentication, join
        every remaining host, optionally rename the cluster, and optionally
        couple with each configured peer.
        """
        # Coupling credentials fall back to the admin credentials.
        if self.couple is not None and self.couple_pass is None:
            self.couple_pass = self.adminpass
            self.couple_user = self.adminuser

        # Without an explicit boot host, the first listed host is promoted
        # and taken out of the list of hosts that will join.
        if self.boothost is None:
            self.boothost = self.host[0]
            self.host.remove(self.boothost)

        if self.ml_init(self.boothost):
            self.ml_security(self.boothost)

        boot_conn = Connection(
            self.boothost,
            HTTPDigestAuth(self.adminuser, self.adminpass),
        )
        self.marklogic = MarkLogic(boot_conn)

        for joining in self.host:
            self.ml_init(joining)
            self.ml_join(self.boothost, joining)

        if self.name is not None:
            print("{0}: rename cluster...".format(self.boothost))
            local_cluster = self.marklogic.cluster()
            local_cluster.set_cluster_name(self.name)
            local_cluster.update()

        if self.couple is not None:
            for peer in self.couple:
                print("{0}: couple with {1}...".format(self.boothost, peer))
                peer_auth = HTTPDigestAuth(self.couple_user, self.couple_pass)
                peer_cluster = MarkLogic(Connection(peer, peer_auth)).cluster()
                # A fresh handle on the local cluster is taken for each peer.
                self.marklogic.cluster().couple(peer_cluster)

        print("Finished")
项目:rets    作者:opendoor-labs    | 项目源码 | 文件源码
def _get_http_auth(username: str, password: str, auth_type: str) -> AuthBase:
    """Return the auth handler matching ``auth_type``.

    :param username: user name given to the handler
    :param password: password given to the handler
    :param auth_type: 'basic' or 'digest'
    :raises RetsClientError: for any other ``auth_type``
    """
    if auth_type == 'basic':
        handler = HTTPBasicAuth(username, password)
    elif auth_type == 'digest':
        handler = HTTPDigestAuth(username, password)
    else:
        raise RetsClientError('unknown auth type %s' % auth_type)
    return handler
项目:moore    作者:UTNkar    | 项目源码 | 文件源码
def update_status(self, data=None):
        """Update self.status from ``data`` or from the membership register.

        :param data: 'member', 'nonmember' or another value; when None, the
            status is looked up at register.utn.se using the person number.

        Returns early, changing nothing, when the person number is empty or
        the register reply is not valid JSON. A connection error is treated
        as the status 'unknown'. In every other case status_changed is set
        to the current time.
        """
        if data is None:
            if self.person_number() == '':
                return
            credentials = HTTPDigestAuth(
                settings.MEMBERSHIP_API_USER,
                settings.MEMBERSHIP_API_PASSWORD,
            )
            try:
                reply = requests.get(
                    'https://register.utn.se/api.php',
                    auth=credentials,
                    params={
                        'action': 'check',
                        'person_number': self.person_number().replace('-', '')
                    },
                )
                data = reply.json().get('status')
            except requests.exceptions.ConnectionError:
                data = 'unknown'
            except ValueError:
                return

        if data == 'member':
            self.status = 'member'
        elif data == 'nonmember':
            # Someone who was previously something other than unknown or
            # nonmember is recorded as an alumnus instead.
            was_never_member = self.status in ('unknown', 'nonmember')
            self.status = 'nonmember' if was_never_member else 'alumnus'

        self.status_changed = timezone.now()
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
        """Digest auth gives 200 per request and on a session; no auth 401."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Credentials passed on the call itself.
        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Credentials attached to a session.
        session = requests.session()
        session.auth = HTTPDigestAuth('user', 'pass')
        via_session = session.get(endpoint)
        assert via_session.status_code == 200
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
        """Unauthenticated reply carries the 'fake' cookie; authed one is 200."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')

        challenge = requests.get(endpoint)
        assert challenge.cookies['fake'] == 'fake_value'

        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
        """A digest-authenticated GET leaves the 'fake' cookie on the session."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')
        session = requests.Session()
        session.get(endpoint, auth=digest)
        assert session.cookies['fake'] == 'fake_value'
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_STREAM(self, httpbin):
        """stream=True leaves the raw body readable; stream=False reads empty."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        streamed = requests.get(endpoint, auth=digest, stream=True)
        assert streamed.raw.read() != b''

        buffered = requests.get(endpoint, auth=digest, stream=False)
        assert buffered.raw.read() == b''
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
        """A wrong password yields 401 per request and on a session, as does no auth."""
        bad_digest = HTTPDigestAuth('user', 'wrongpass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Wrong credentials passed on the call itself.
        rejected = requests.get(endpoint, auth=bad_digest)
        assert rejected.status_code == 401

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Wrong credentials attached to a session.
        session = requests.session()
        session.auth = bad_digest
        via_session = session.get(endpoint)
        assert via_session.status_code == 401
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
        """Digest auth gives 200 per request and on a session; no auth 401."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Credentials passed on the call itself.
        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Credentials attached to a session.
        session = requests.session()
        session.auth = HTTPDigestAuth('user', 'pass')
        via_session = session.get(endpoint)
        assert via_session.status_code == 200
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
        """Unauthenticated reply carries the 'fake' cookie; authed one is 200."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')

        challenge = requests.get(endpoint)
        assert challenge.cookies['fake'] == 'fake_value'

        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
        """A digest-authenticated GET leaves the 'fake' cookie on the session."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')
        session = requests.Session()
        session.get(endpoint, auth=digest)
        assert session.cookies['fake'] == 'fake_value'
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGEST_STREAM(self, httpbin):
        """stream=True leaves the raw body readable; stream=False reads empty."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        streamed = requests.get(endpoint, auth=digest, stream=True)
        assert streamed.raw.read() != b''

        buffered = requests.get(endpoint, auth=digest, stream=False)
        assert buffered.raw.read() == b''
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
        """A wrong password yields 401 per request and on a session, as does no auth."""
        bad_digest = HTTPDigestAuth('user', 'wrongpass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Wrong credentials passed on the call itself.
        rejected = requests.get(endpoint, auth=bad_digest)
        assert rejected.status_code == 401

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Wrong credentials attached to a session.
        session = requests.session()
        session.auth = bad_digest
        via_session = session.get(endpoint)
        assert via_session.status_code == 401
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def get_auth_obj(method, user, password):
    """
    Build the authentication object matching the requested method.

    Recognised methods (compared case-insensitively):

    * "basic"
    * "digest"
    * "ntlm"

    .. warning:
       This function may be removed in future versions of GoLismero.

    :param method: Auth method: basic, digest, ntlm.
    :type method: str

    :param user: string with user text
    :type user: str

    :param password: string with password text
    :type password: str

    :return: the authentication object, or None when the method is empty
        or not one of the recognised names.
    """
    if not method:
        return None

    scheme = method.lower()
    if scheme == "basic":
        return HTTPBasicAuth(user, password)
    if scheme == "digest":
        return HTTPDigestAuth(user, password)
    if scheme == "ntlm":
        return HttpNtlmAuth(user, password)

    # Unrecognised method name.
    return None


#------------------------------------------------------------------------------
项目:isf    作者:dark-lbp    | 项目源码 | 文件源码
def target_function(self, url, user, password):
        """Try one user/password pair against ``url`` with digest auth.

        Any response other than HTTP 401 counts as success: the pair is
        printed and appended to self.credentials, and StopThreadPoolExecutor
        is raised when self.stop_on_success is set. A missing response or a
        401 is reported as a failure.
        """
        thread_name = threading.current_thread().name

        user = user.encode('utf-8').strip()
        password = password.encode('utf-8').strip()

        response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

        # Guard clause: no response at all, or a 401, is a failed attempt.
        if response is None or response.status_code == 401:
            print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
            return

        print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        self.credentials.append((self.target, self.port, user, password))
        if self.stop_on_success:
            raise StopThreadPoolExecutor
项目:isf    作者:dark-lbp    | 项目源码 | 文件源码
def target_function(self, url, creds):
        """Try one (user, password) pair against ``url`` with digest auth.

        Any response other than HTTP 401 counts as success: the pair is
        printed and appended to self.credentials, and StopThreadPoolExecutor
        is raised when self.stop_on_success is set. A missing response or a
        401 is reported as a failure.
        """
        thread_name = threading.current_thread().name
        user, password = creds
        user = user.encode('utf-8').strip()
        password = password.encode('utf-8').strip()

        response = http_request(method="GET", url=url, auth=HTTPDigestAuth(user, password))

        # Guard clause: no response at all, or a 401, is a failed attempt.
        if response is None or response.status_code == 401:
            print_error("Target: {}:{} {}: Authentication Failed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
            return

        print_success("Target: {}:{} {}: Authentication Succeed - Username: '{}' Password: '{}'".format(self.target, self.port, thread_name, user, password), verbose=self.verbosity)
        self.credentials.append((self.target, self.port, user, password))
        if self.stop_on_success:
            raise StopThreadPoolExecutor
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def get_digest():
    """Build a digest handler from the module-level dev-client credentials."""
    digest = HTTPDigestAuth(g_dev_client_username, g_dev_client_password)
    return digest


# Returns boolean to indicate if the NCOS device is
# in DEV mode
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def __init__(self, url, username, password, user):
        """Store the server URL, digest credentials and derived paths.

        :param url: base URL the endpoint URLs are joined onto
        :param username: user name for digest authentication
        :param password: password for digest authentication
        :param user: object whose ``username`` names the local forms folder
        """
        self.url = url
        self.user = user
        self.auth = HTTPDigestAuth(username, password)

        # Endpoints resolved against the base URL.
        self.form_list_url = urljoin(self.url, 'formList')
        self.submission_list_url = urljoin(self.url, 'view/submissionList')
        self.download_submission_url = urljoin(
            self.url, 'view/downloadSubmission')

        # Relative folder "<username>/briefcase/forms".
        self.forms_path = os.path.join(self.user.username, 'briefcase', 'forms')

        self.resumption_cursor = 0
        self.logger = logging.getLogger('console_logger')
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
        """Digest auth gives 200 per request and on a session; no auth 401."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        # Credentials passed on the call itself.
        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200

        # No credentials at all.
        anonymous = requests.get(endpoint)
        assert anonymous.status_code == 401

        # Credentials attached to a session.
        session = requests.session()
        session.auth = HTTPDigestAuth('user', 'pass')
        via_session = session.get(endpoint)
        assert via_session.status_code == 200
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
        """Unauthenticated reply carries the 'fake' cookie; authed one is 200."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')

        challenge = requests.get(endpoint)
        assert challenge.cookies['fake'] == 'fake_value'

        authed = requests.get(endpoint, auth=digest)
        assert authed.status_code == 200
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
        """A digest-authenticated GET leaves the 'fake' cookie on the session."""
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')
        digest = HTTPDigestAuth('user', 'pass')
        session = requests.Session()
        session.get(endpoint, auth=digest)
        assert session.cookies['fake'] == 'fake_value'
项目:Codeforces-Sublime-Plugin    作者:karunk    | 项目源码 | 文件源码
def test_DIGEST_STREAM(self, httpbin):
        """stream=True leaves the raw body readable; stream=False reads empty."""
        digest = HTTPDigestAuth('user', 'pass')
        endpoint = httpbin('digest-auth', 'auth', 'user', 'pass')

        streamed = requests.get(endpoint, auth=digest, stream=True)
        assert streamed.raw.read() != b''

        buffered = requests.get(endpoint, auth=digest, stream=False)
        assert buffered.raw.read() == b''