Python getpass 模块,getpass() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 getpass.getpass()。

项目:pyability    作者:syedur-rahman    | 项目源码 | 文件源码
def user_credentials_prompt():
    """Ask the operator for router login details on the console.

    Prints a short instruction, reads the user name with echo, then reads
    the user password and the enable password without echo.

    Returns:
        tuple: ``(user, user_pw, enable_pw)`` exactly as typed.
    """
    print("Please type in your router credentials.")

    login_name = input('User: ')
    login_secret = getpass.getpass('User Password: ')
    enable_secret = getpass.getpass('Enable Password: ')

    # Blank line so whatever is printed next is visually separated.
    print('')

    return login_name, login_secret, enable_secret
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def get_user_api_token(logger):
    """
    Generate an iAuditor API token from interactively entered credentials.

    Prompts for the username (echoed) and the password (hidden), then sends
    a password-grant POST request to the SafetyCulture auth endpoint.

    :param logger:  the logger used to report a failed request
    :return:        API Token if authenticated else None
    """
    username = input("iAuditor username: ")
    password = getpass()
    generate_token_url = "https://api.safetyculture.io/auth"
    # Hand the fields over as a mapping so requests form-encodes them.
    # Building the body by string concatenation corrupts credentials that
    # contain reserved characters such as '&', '=', '+' or '%'.
    payload = {
        'username': username,
        'password': password,
        'grant_type': 'password',
    }
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache",
    }
    response = requests.request("POST", generate_token_url, data=payload, headers=headers)
    if response.status_code == requests.codes.ok:
        return response.json()['access_token']

    logger.error('An error occurred calling ' + generate_token_url + ': ' + str(response.json()))
    return None
项目:service-fabric-cli    作者:Azure    | 项目源码 | 文件源码
def repo_creds(username, encrypted_password, has_pass):
    """Build the container repository credentials object, if any were given.

    Returns None when no username, no encrypted password and no password
    flag were supplied. Raises CLIError when a password (encrypted or to be
    prompted) is requested without a username. When ``has_pass`` is set and
    no encrypted password was given, the plain password is read from the
    console without echo.
    """
    from azure.servicefabric.models import RegistryCredential
    from getpass import getpass

    # An empty string is a legitimate encrypted passphrase, so its presence
    # is decided by an explicit None comparison rather than truthiness.
    has_encrypted = encrypted_password is not None

    if not (username or has_encrypted or has_pass):
        return None

    # From here on some kind of password was requested, so a username is
    # mandatory.
    if not username:
        raise CLIError('Missing container repository username')

    if has_encrypted:
        return RegistryCredential(registry_user_name=username,
                                  registry_password=encrypted_password,
                                  password_encrypted=True)

    if not has_pass:
        return RegistryCredential(registry_user_name=username)

    passphrase = getpass(prompt='Container repository password: ')
    return RegistryCredential(registry_user_name=username,
                              registry_password=passphrase,
                              password_encrypted=False)
项目:okta-awscli    作者:jmhale    | 项目源码 | 文件源码
def __init__(self, okta_profile, verbose):
    """Load Okta settings for *okta_profile* from ``~/.okta-aws``.

    ``base-url``, ``username`` and ``password`` are read from the profile's
    section. A missing username or password is asked for on the console
    (the password without echo). A missing base-url is only reported;
    ``self.base_url`` is then left unset.
    """
    config_path = os.path.expanduser('~') + '/.okta-aws'
    config = RawConfigParser()
    config.read(config_path)

    def configured(option):
        # True when the selected profile defines the given option.
        return config.has_option(okta_profile, option)

    if configured('base-url'):
        self.base_url = "https://%s" % config.get(okta_profile, 'base-url')
    else:
        print("No base-url set in ~/.okta-aws")

    if not configured('username'):
        self.username = raw_input('Enter username: ')
    else:
        self.username = config.get(okta_profile, 'username')
        if verbose:
            print("Authenticating as: %s" % self.username)

    if not configured('password'):
        self.password = getpass('Enter password: ')
    else:
        self.password = config.get(okta_profile, 'password')

    self.verbose = verbose
项目:meetup-facebook-bot    作者:Stark-Mountain    | 项目源码 | 文件源码
def bootstrap(branch='master'):
    """Provision a host from scratch and bring the application up.

    Asks for the sudo password and the domain name, then runs the
    provisioning helpers in a fixed order, fetches *branch* into
    PROJECT_FOLDER, starts the uWSGI and nginx services, runs the setup
    scripts and finally reports the service status.
    """
    env.sudo_password = getpass('Initial value for env.sudo_password: ')
    env.domain_name = prompt('Enter your domain name:', default='meetup_facebook_bot')

    # Filesystem layout and the database server come first.
    for prepare in (create_permanent_folder, create_log_folder, install_postgres):
        prepare()

    db_url = setup_postgres(username=env.user, database_name=env.user)
    renew_ini_file(db_url)

    install_python()
    fetch_sources_from_repo(branch, PROJECT_FOLDER)

    # Application environment, web server and firewall, in that order.
    for step in (reinstall_venv,
                 install_modules,
                 install_nginx,
                 configure_letsencrypt_if_necessary,
                 add_nginx_reload_crontab_job,
                 configure_nginx_if_necessary,
                 setup_ufw):
        step()

    for service in (UWSGI_SERVICE_NAME, 'nginx'):
        start_systemctl_service(service)

    run_setup_scripts()
    status()
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def keygen(args):
    """Sub-command to generate keys.

    Derives a default certificate path from the signer id when none was
    given, checks that both output files are writable, and asks for a
    private key password unless ``args.no_encryption`` is set.
    """
    if not args.cert:
        args.cert = "data/{}.crt".format(args.signer_id.replace("@", "_at_"))

    for target in (args.cert, args.private_key):
        check_writable(target)

    from pyseeder.crypto import keygen as generate_keys

    priv_key_password = None
    if not args.no_encryption:
        from getpass import getpass
        priv_key_password = getpass("Set private key password: ").encode("utf-8")

    generate_keys(args.cert, args.private_key, args.signer_id, priv_key_password)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def prompt_pass(name, default=None):
    """
    Read hidden (password) input from the command line.

    Keeps asking until something is typed, unless a default is available,
    in which case an empty answer yields the default.

    :param name: prompt text
    :param default: value returned when the user enters nothing
    """
    hint = ' [%s]' % default if default else ''
    # A prompt phrased as a question gets a space, anything else a colon.
    separator = ' ' if name.endswith('?') else ': '
    text = name + hint + separator

    while True:
        entered = getpass.getpass(text)
        if entered:
            return entered
        if default is not None:
            return default
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def popPeek(server, user, port=110):
    """Interactively review the messages waiting in a POP3 mailbox.

    Connects to *server*:*port*, logs in as *user* with a password read via
    getpass, prints the header lines of every message and asks whether the
    message should be deleted. If anything fails during the review the
    session is reset before quitting, so no deletion takes effect.

    Exits the process with status 1 when connecting or logging in fails.
    """
    try:
        P = poplib.POP3(server, port)
        P.user(user)
        P.pass_(getpass.getpass())
    except (Exception, KeyboardInterrupt):
        print("Failed to connect to server.")
        sys.exit(1)

    deleted = 0
    # Defined before the try block so the failure handler can always report
    # a count; previously a failing LIST raised NameError in the handler.
    msgcount = 0

    try:
        listing = P.list()
        msgcount = len(listing[1])
        for msg in range(1, msgcount + 1):
            # TOP with 0 body lines returns only the message headers.
            top = P.top(msg, 0)
            for line in top[1]:
                print(line)
            answer = raw_input("D to delete, any other key to leave message on server: ")
            if answer == "D":
                P.dele(msg)
                deleted += 1
        P.quit()
        print("%d messages deleted. %d messages left on server" % (deleted, msgcount-deleted))
    except (Exception, KeyboardInterrupt):
        # Undo pending deletions (including on Ctrl-C) before disconnecting.
        P.rset()
        P.quit()
        deleted = 0
        print("\n%d messages deleted. %d messages left on server" % (deleted, msgcount-deleted))
项目:cli    作者:sparkl    | 项目源码 | 文件源码
def login(args):
    """
    Log in the user named in ``args.user``.

    The password is read from the console without echo when ``args.password``
    is empty. ``args.token`` is included in the request when set. Returns the
    decoded JSON response, or raises CliException when the request yields a
    falsy response.
    """
    if not args.password:
        args.password = getpass.getpass("Password: ")

    credentials = dict(email=args.user, password=args.password)
    if args.token:
        credentials["token"] = args.token

    response = sync_request(args, "POST", "sse_cfg/user", data=credentials)
    if not response:
        raise CliException("Failed to login {User}".format(User=args.user))

    return response.json()
项目:cli    作者:sparkl    | 项目源码 | 文件源码
def register(args):
    """
    Register the user named in ``args.user``.

    When ``args.password`` is empty the password is asked for twice without
    echo and both entries must match. Returns the decoded JSON response, or
    raises CliException on a mismatch or a falsy response.
    """
    if not args.password:
        args.password = getpass.getpass("Password: ")
        repeated = getpass.getpass("Repeat: ")
        if repeated != args.password:
            raise CliException("Passwords do not match")

    payload = {"email": args.user, "password": args.password}
    response = sync_request(args, "POST", "sse_cfg/register", data=payload)
    if not response:
        raise CliException("Failed to register {User}".format(User=args.user))

    return response.json()
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def _test_resource_paths(my):
    """Check the resource paths reported by the server stub.

    Verifies that the admin resource path contains ``etc/admin.tacticrc``,
    that the created resource paths include the rc file of the current
    system login, and that the server login is ``admin``.
    """
    admin_path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in admin_path)

    created_paths = my.server.create_resource_paths()
    sys_login = getpass.getuser()
    home_dir = my.server.get_home_dir()
    home_is_usable = os.access(home_dir, os.W_OK) and os.path.isdir(home_dir)

    # Writable home directory first, otherwise a platform-specific fallback.
    if home_dir and home_is_usable:
        etc_dir = "%s/.tactic/etc" % home_dir
    elif os.name == 'nt':
        etc_dir = 'C:/sthpw/etc'
    else:
        etc_dir = '/tmp/sthpw/etc'

    expected_rc = '%s/%s.tacticrc' % (etc_dir, sys_login)
    my.assertEquals(True, expected_rc in created_paths)

    # since we use admin to get resource path , my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def read_private():
    """Load and decrypt the keystore kept in ``private.json``.

    Asks for the keystore password first if none has been set yet. Returns
    a ``(keystore, salt)`` tuple. When the file does not exist, an empty
    keystore and a freshly generated base64 salt are returned. Raises
    Exception when decryption fails with ValueError.
    """
    global global_password
    if global_password is None:
        setpassword(getpass.getpass("Please enter the password to decrypt your keystore: "))

    if not os.path.exists('private.json'):
        # file doesn't exist, just invent a salt
        fresh_salt = base64.b64encode(crypto.generate_random_key())
        return {'revoked_keys':[]}, fresh_salt

    with open('private.json','r') as f:
        stored = json.load(f)

    key = crypto.kdf(global_password, stored['salt'])
    try:
        plain = crypto.decrypt(stored['priv'], key)
    except ValueError:
        raise Exception("Invalid password for keystore")

    return json.loads(plain), stored['salt']
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def init_mtls(config):
    """Build the SSL context used for mutual TLS.

    Resolves the CA directory from ``config["ca_dir"]`` (relative paths are
    taken relative to ``common.WORK_DIR``), asks for the keystore password,
    creates a certificate for ``config["ip"]`` if none exists yet, and
    returns a context that loads that certificate and requires peer
    certificates signed by the CA.
    """
    logger.info("Setting up mTLS...")

    tls_dir = config["ca_dir"]
    if tls_dir[0]!='/':
        tls_dir = os.path.abspath('%s/%s'%(common.WORK_DIR,tls_dir))

    # We need to securely pull in the ca password
    my_key_pw = getpass.getpass("Please enter the password to decrypt your keystore: ")
    ca_util.setpassword(my_key_pw)

    ip = config["ip"]
    my_cert = "%s/%s-cert.crt" % (tls_dir, ip)

    # Create HIL Server Connect certs (if not already present)
    if not os.path.exists(my_cert):
        logger.info("Generating new Node Monitor TLS Certs in %s for connecting"%tls_dir)
        ca_util.cmd_mkcert(tls_dir, ip)

    ca_path = "%s/cacert.crt" % tls_dir
    my_priv_key = "%s/%s-private.pem" % (tls_dir, ip)

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_verify_locations(cafile=ca_path)
    context.load_cert_chain(certfile=my_cert, keyfile=my_priv_key, password=my_key_pw)
    context.verify_mode = ssl.CERT_REQUIRED
    return context
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def validate_password():
    """Ask for a password twice and return it once it is acceptable.

    The two entries must match and the password must pass cracklib's
    ``VeryFascistCheck``; otherwise the reason is printed and the user is
    asked again.
    """
    while True:
        passwd = getpass.getpass()
        print("Again")
        passwd2 = getpass.getpass()

        if passwd != passwd2:
            print("Password mismatch")
            continue

        # Check against cracklib to avoid simple passwords
        try:
            cracklib.VeryFascistCheck(passwd)
        except ValueError as msg:
            print(msg)
            continue

        return passwd
项目:questionnaire    作者:kylebebak    | 项目源码 | 文件源码
def raw(prompt, *args, **kwargs):
    """Read an arbitrary string from the user and convert it.

    Keyword options: ``go_back`` (default ``'<'``) is the input that raises
    QuestionnaireGoBack, ``type`` (default ``str``) converts the answer, and
    ``secret`` hides the input. A conversion that raises ValueError is
    reported and the question is asked again. Works in both Python 2 and 3.
    """
    back_token = kwargs.get('go_back', '<')
    convert = kwargs.get('type', str)
    secret = kwargs.get('secret', False)

    def read_once():
        # Pick the reader matching the secrecy flag and interpreter version.
        if secret:
            return getpass.getpass(prompt)
        if sys.version_info < (3, 0):
            return raw_input(prompt)
        return input(prompt)

    with stdout_redirected(sys.stderr):
        while True:
            try:
                answer = read_once()
                if answer == back_token:
                    raise QuestionnaireGoBack
                return convert(answer)
            except ValueError:
                eprint('\n`{}` is not a valid `{}`\n'.format(answer, convert))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def changePassPhrase(options):
    """Re-encrypt a private key file under a new passphrase.

    *options* is a mapping with the keys ``filename``, ``pass`` (current
    passphrase) and ``newpass``; missing values are asked for on the
    console and stored back into *options*. The key file is rewritten in
    place with the new passphrase.

    Re-raises ``keys.BadKeyError`` for any failure other than a missing
    passphrase on an encrypted key.
    """
    if not options['filename']:
        filename = os.path.expanduser('~/.ssh/id_rsa')
        # The prompt advertises a default, so honour it on empty input
        # instead of continuing with an empty file name.
        entered = raw_input('Enter file in which the key is (%s): ' % filename)
        options['filename'] = entered or filename
    try:
        key = keys.getPrivateKeyObject(options['filename'])
    except keys.BadKeyError as e:
        if e.args[0] != 'encrypted key with no passphrase':
            raise
        if not options['pass']:
            options['pass'] = getpass.getpass('Enter old passphrase: ')
        key = keys.getPrivateKeyObject(options['filename'], passphrase = options['pass'])
    if not options['newpass']:
        while 1:
            p1 = getpass.getpass('Enter new passphrase (empty for no passphrase): ')
            p2 = getpass.getpass('Enter same passphrase again: ')
            if p1 == p2:
                break
            print('Passphrases do not match.  Try again.')
        options['newpass'] = p1
    # Serialize BEFORE opening the file: opening with 'w' truncates it, so a
    # serialization error would otherwise destroy the existing key.
    key_data = keys.makePrivateKeyString(key, passphrase=options['newpass'])
    with open(options['filename'], 'w') as key_file:
        key_file.write(key_data)
    print('Your identification has been saved with the new passphrase.')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getGenericAnswers(self, name, instruction, prompts):
    """Collect answers to a list of prompts directly from the terminal.

    stdin and stdout are temporarily pointed at ``/dev/tty`` so the prompts
    reach the user even when the standard streams are redirected. *prompts*
    is an iterable of ``(prompt, echo)`` pairs; answers to prompts with a
    false ``echo`` are read without echo.

    Returns ``defer.succeed`` wrapping the list of answers, in prompt order.
    """
    responses = []
    oldout, oldin = sys.stdout, sys.stdin
    tty = open('/dev/tty', 'r+')
    try:
        sys.stdin = sys.stdout = tty
        if name:
            print(name)
        if instruction:
            print(instruction)
        for prompt, echo in prompts:
            if echo:
                responses.append(raw_input(prompt))
            else:
                responses.append(getpass.getpass(prompt))
    finally:
        sys.stdout, sys.stdin = oldout, oldin
        # The terminal handle used to be leaked; release it once the
        # standard streams are restored.
        tty.close()
    return defer.succeed(responses)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def defConv(items):
    """Answer a sequence of ``(message, kind)`` conversation items.

    Kind 1 reads a hidden answer, kind 2 reads an echoed answer, kinds 3 and
    4 only print the message and answer with an empty string. Any other kind
    aborts with ``defer.fail('foo')``. Otherwise the list of
    ``(answer, 0)`` pairs is returned via ``defer.succeed``.
    """
    resp = []
    for message, kind in items:
        if kind == 1:  # password
            resp.append((getpass.getpass(message), 0))
        elif kind == 2:  # text
            resp.append((raw_input(message), 0))
        elif kind in (3, 4):
            print(message)
            resp.append(("", 0))
        else:
            return defer.fail('foo')
    return defer.succeed(resp)
项目:SmoothStreamsTV-playlist    作者:stvhwrd    | 项目源码 | 文件源码
def getCredentials():
    '''Prompt the user for a username and a password.

    Prints a hint about storing the credentials in the script file, then
    reads the username with echo and the password without.
    Returns the pair ``(username, password)``.
    '''
    storage_hint = (
        'You may wish to store your credentials and server '
        'preferences in this file by opening it in a text editor '
        'and filling in the username, password, and server '
        'fields.\nIf you choose not to do this, you will be '
        'prompted for this information on each run of this script.'
    )
    colourPrint('bold', storage_hint)

    colourPrint('yellow', '\nPlease enter your username for SmoothStreamsTV:')
    username = input('')
    colourPrint('green', '\nThank you, ' + username + '.\n')

    colourPrint('yellow', '\nPlease enter your password for SmoothStreamsTV:')
    password = getpass('')

    return username, password
# end getCredentials()
项目:SDK    作者:Keypirinha    | 项目源码 | 文件源码
def ask(message, ofile=sys.stderr, ifile=sys.stdin, style=Fore.MAGENTA,
        noecho=False, accept_empty=True):
    """
    Write *message* to *ofile* and return the answer read from *ifile*.

    With *noecho* the answer is read through :py:func:`getpass.getpass`,
    which is only allowed when *ifile* is ``sys.stdin``; otherwise
    :py:meth:`io.TextIOBase.readline` is used and the trailing line break
    is removed. When *accept_empty* is false, the question is repeated
    until the answer contains something other than whitespace.

    *style* may be ``None`` for non-colored output (this does not override
    the behavior setup by :py:func:`enable_colors`).
    """
    if noecho and ifile != sys.stdin:
        raise ValueError("noecho option implies input from stdin")

    while True:
        with ScopedColoredStream(ofile, style, flush_on_exit=True) as stream:
            stream.write(message)

        # The message was already written above, so getpass gets no prompt.
        ans = (getpass.getpass(prompt="", stream=ofile) if noecho
               else ifile.readline().rstrip("\n\r"))

        if accept_empty or ans.strip():
            return ans
项目:NordVPN-NetworkManager    作者:Chadsr    | 项目源码 | 文件源码
def save_new_credentials(self):
    """Ask for the NordVPN username and password and persist them.

    The prompts repeat until both values are non-empty. The values are then
    written to the ``SECTION_TITLE`` section of ``self.config`` (created
    if missing) and saved through ``self.save()``.
    """
    username = None
    password = None

    print("\nPlease input your NordVPN credentials:")

    # Keep asking until BOTH values are present. The previous condition
    # (`not username and not password`) stopped as soon as either one was
    # given, so an empty username or password could be saved.
    while not (username and password):
        username = input("Username/Email: ")
        password = getpass.getpass("Password: ")

    if not self.config.has_section(self.SECTION_TITLE):
        self.config.add_section(self.SECTION_TITLE)

    self.config.set(self.SECTION_TITLE, 'username', username)
    self.config.set(self.SECTION_TITLE, 'password', password)
    self.save()
    self.logger.info("New credentials saved successfully!")
项目:Leics    作者:LeicsFrameWork    | 项目源码 | 文件源码
def adduser():
    """Create a user from console input and store it in the database.

    Asks for username, email, role id and (hidden) password, adds the new
    ``User`` to the session, commits, and prints a confirmation.
    """
    from getpass import getpass

    username = raw_input("\_username: ")
    email = raw_input("\_email: ")
    role_id = raw_input("\_[1:moderator 2:admin 3:user]: ")
    password = getpass("\_password: ")

    new_user = User(email=email, username=username,
                    password=password, role_id=role_id)
    db.session.add(new_user)
    db.session.commit()

    print("<user %s add in database>" % username)
项目:saffron    作者:Lamden    | 项目源码 | 文件源码
def deploy(self, cwd=False):
    """Record this contract in the database and send its creation transaction.

    The user picks one of the node's accounts from a numbered list and
    enters its password to unlock it. On success the transaction carrying
    the bytecode is sent and the result is stored in ``self.address`` and
    ``self.instance``.

    :param cwd: passed through to ``database.insert_contract``
    :return: the result of ``update_contract``
    :raises AssertionError: if the contract is already deployed or has no
        solidity code loaded
    :raises Exception: if the chosen account could not be unlocked
    """
    assert not self.is_deployed, 'This contract already exists on the chain.'
    assert self.sol, 'No solidity code loaded into this object'
    database.insert_contract(self.name,
                             self.abi,
                             self.bytecode,
                             self.gas_estimates,
                             self.method_identifiers,
                             cwd)
    okay = web3.personal.Personal(self.web3)
    # Numbered menu of the accounts known to the node.
    options = 'Unlock: \n' + '\n'.join([' '.join([str(i),':',x]) for i, x in enumerate(okay.listAccounts)]) + '\n'
    self.defaultAccount = okay.listAccounts[int(input(options))]
    result = okay.unlockAccount(self.defaultAccount, getpass.getpass('\nPassword:'), 5000)
    if result:
        self.address = self.web3.eth.sendTransaction(transaction={'data' : '0x' + self.bytecode, 'from': self.defaultAccount, 'gaslimit': 30000})
        self.instance = self.web3.eth.contract(self.address)
    else:
        # Was misspelled `Exepction`, which raised NameError instead.
        raise Exception('unable to unlock account')
    #update the deployed and address to the db and an instance for pulling and interacting with the contract again
    return update_contract(json.dumps(self.address), self.method_identifiers, self.name)
项目:SDK    作者:ZoomEye    | 项目源码 | 文件源码
def zoomeye_api_test():
    """Log in to ZoomEye interactively and run a few sample searches.

    Prints the account's resource info, then shows site/IP results for
    three dork queries and IP/port results for a host-resource query.
    """
    zoomeye = ZoomEye()
    zoomeye.username = raw_input('ZoomEye Username: ')
    zoomeye.password = getpass.getpass(prompt='ZoomEye Password: ')
    zoomeye.login()
    print(zoomeye.resources_info())

    for dork in ('solr', 'country:cn', 'solr country:cn'):
        data = zoomeye.dork_search(dork)
        show_site_ip(data)

    host_data = zoomeye.dork_search('solr country:cn', resource='host')
    show_ip_port(host_data)
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def siteSign(self, address, privatekey=None, inner_path="content.json", publish=False):
    """Sign a site's content file and optionally publish it.

    When no private key is passed in, the one stored for the site in the
    current user's data is used; if there is none either, it is asked for
    on the console with hidden input. The site is published only when
    signing succeeded and *publish* is true.
    """
    from Site import Site
    logging.info("Signing site: %s..." % address)
    site = Site(address, allow_create=False)

    if not privatekey:  # No private key supplied by the caller
        from User import UserManager
        user = UserManager.user_manager.get()
        privatekey = user.getSiteData(address).get("privatekey") if user else None
        if not privatekey:
            # Not found in users.json, ask from console
            import getpass
            privatekey = getpass.getpass("Private key (input hidden):")

    succ = site.content_manager.sign(inner_path=inner_path, privatekey=privatekey, update_changed_files=True)
    if succ and publish:
        self.sitePublish(address, inner_path=inner_path)
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def _configure_github(github):
    """Determine and return the GitHub configuration.

    Works on a copy of *github* and keeps prompting until both a username
    and a token are present; the token is read with hidden input.

    Args:
        github (dict): The current GitHub configuration.

    Returns:
        dict: The new GitHub configuration.
    """
    answer = copy(github)
    logger.info('Since you intend to publish to GitHub, you need to '
                'supply credentials.')
    logger.info('Create an access token at: '
                'https://github.com/settings/tokens')
    logger.info('It needs the "repo" scope and nothing else.')

    # (key, reader, prompt) for every value that must end up non-empty.
    required = (
        ('username', six.moves.input, 'GitHub username: '),
        ('token', getpass.getpass, 'GitHub token (input is hidden): '),
    )
    for key, reader, text in required:
        while not answer.get(key):
            answer[key] = reader(text)
    return answer
项目:DualisWatcher    作者:LucaVazz    | 项目源码 | 文件源码
def interactively_acquire_token(self) -> str:
    """
    Ask for Dualis credentials until a login token is obtained, then store it.

    A rejected login or a communication error (ValueError / RuntimeError) is
    reported and the credentials are asked for again. The token is saved via
    the config helper under the ``token`` property.
    @return: The Token for Dualis.
    """
    print('[The following Input is not saved, it is only used temporarily to generate a login token.]')

    while True:
        dualis_username = input('Username for Dualis:   ')
        dualis_password = getpass('Password for Dualis [no output]:   ')
        try:
            token = login_helper.obtain_login_token(dualis_username, dualis_password)
        except RequestRejectedError as error:
            print('Login Failed! (%s) Please try again.' % error)
            continue
        except (ValueError, RuntimeError) as error:
            print('Error while communicating with the Dualis System! (%s) Please try again.' % error)
            continue
        if token is not None:
            break

    self.config_helper.set_property('token', token)

    return token
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def get_password(max_password_prompts=3):
    """Read a password from the TTY.

    Returns None when stdin is not a TTY, when input ends (Ctrl-D), or when
    no non-empty password was entered within *max_password_prompts* tries.
    If the ``OS_VERIFY_PASSWORD`` setting is true, the password must be
    typed twice identically.
    """
    verify = strutils.bool_from_string(env("OS_VERIFY_PASSWORD"))

    if not (hasattr(sys.stdin, "isatty") and sys.stdin.isatty()):
        return None

    try:
        for __ in moves.range(max_password_prompts):
            first = getpass.getpass("OS Password: ")
            second = getpass.getpass("Please verify: ") if verify else first
            if first and first == second:
                return first
    except EOFError:
        # Ctrl-D ends the prompting without a password.
        pass
    return None
项目:MusicBot    作者:BjoernPetersen    | 项目源码 | 文件源码
def request_secret(secret_key, message, hidden=True):
    """
    Return the secret stored under *secret_key*, asking the user if needed.

    A secret that is already known is returned as is. Otherwise the user is
    prompted, the stripped answer is stored and the secrets are saved.
    :param secret_key: the key the input should be stored under in the secrets
    :param message: the message to show to the user
    :param hidden: hide the input (recommended for passwords and such)
    :return the secret
    """
    if secret_key in _secrets:
        return _secrets[secret_key]

    reader = getpass if hidden else input
    secret = reader(message).strip()

    _secrets[secret_key] = secret
    save_secrets()
    return secret
项目:txt2evernote    作者:Xunius    | 项目源码 | 文件源码
def rawInput(message, isPass=False):
    """Read one line from the console and pass it through ``tools.stdinEncode``.

    With *isPass* the input is read without echo.
    """
    reader = getpass.getpass if isPass else raw_input
    return tools.stdinEncode(reader(message))
项目:rust_pypi_example    作者:mckaymatt    | 项目源码 | 文件源码
def main(args):
    """Encrypt the PyPI password with the repository's public key.

    Uses ``args.password`` when given, otherwise asks for the password with
    hidden input, then hands the encrypted value to
    ``update_travis_deploy_password``.
    """
    repo_key = fetch_public_key(args.repo)

    plaintext = args.password
    if not plaintext:
        plaintext = getpass('PyPI password: ')

    encrypted = encrypt(repo_key, plaintext.encode())
    update_travis_deploy_password(encrypted)
    print("Wrote encrypted password to .travis.yml -- you're ready to deploy")
项目:HTP    作者:nklose    | 项目源码 | 文件源码
def login(self):
    """Authenticate the user against the ``users`` table.

    Asks for username and password until the password matches the stored
    hash, then records the username in ``self.name`` and calls
    ``self.lookup()``. After five failed attempts the database is closed
    and the process exits.
    """
    db = Database()
    max_attempts = 5
    failures = 0
    while True:
        username = raw_input('\n\tUsername: ')
        password = getpass('\tPassword: ')

        sql = 'SELECT password FROM users WHERE username = %s'
        password_hash = db.get_query(sql, [username])

        # Every entry is checked before the attempt limit is applied.
        # Previously the fifth entry was prompted for but never verified.
        if len(password_hash) != 0 and gc.check_hash(password, password_hash[0][0]):
            gc.success('\nCredentials validated. Logging in...')
            self.name = username
            self.lookup()
            break

        failures += 1
        time.sleep(2)
        if failures >= max_attempts:
            gc.error('\nDisconnecting after 5 failed login attempts.')
            # Release the connection before leaving the process.
            db.close()
            exit()
        gc.error('\nInvalid credentials. Please try again.')

    # close database
    db.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def handle_401(self, resp, **kwargs):
    """Retry a request that came back 401 with credentials from the user.

    Any other response, or a 401 while prompting is disabled, is returned
    unchanged. Otherwise username and password are asked for, remembered
    per network location when either is non-empty, and the original
    request is re-sent with HTTP basic auth. The original response is
    appended to the new response's history.
    """
    # Only an unauthorized response in a session that may prompt is handled.
    if resp.status_code != 401 or not self.prompting:
        return resp

    netloc = urllib_parse.urlparse(resp.url).netloc

    username = six.moves.input("User for %s: " % netloc)
    password = getpass.getpass("Password: ")

    # Remember what was entered for future requests to the same host.
    if username or password:
        self.passwords[netloc] = (username, password)

    # Consume content and release the original connection so the retried
    # request can reuse it.
    resp.content
    resp.raw.release_conn()

    authed_request = HTTPBasicAuth(username or "", password or "")(resp.request)

    retried = resp.connection.send(authed_request, **kwargs)
    retried.history.append(resp)
    return retried
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def finalize_options(self):
    """Fill in the upload username and password after base finalization.

    The username falls back to the current system login. The password is
    taken from the first source that yields a truthy value: the existing
    option, the keyring, then an interactive prompt.
    """
    orig.upload.finalize_options(self)

    username = self.username
    if not username:
        username = getpass.getuser()
    self.username = username

    # Stop at the first password source that succeeds.
    password = self.password
    if not password:
        password = self._load_password_from_keyring()
    if not password:
        password = self._prompt_for_password()
    self.password = password
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _prompt_for_password(self):
    """
    Ask for a password on the tty.

    Any exception, including a keyboard interrupt, is suppressed and
    results in ``None``.
    """
    try:
        entered = getpass.getpass()
    except (Exception, KeyboardInterrupt):
        return None
    return entered
项目:calendary    作者:DavidHickman    | 项目源码 | 文件源码
def main(args):
    """Encrypt the PyPI password with the repository's public key.

    Uses ``args.password`` when given, otherwise asks for the password with
    hidden input, then hands the encrypted value to
    ``update_travis_deploy_password``.
    """
    repo_key = fetch_public_key(args.repo)

    plaintext = args.password
    if not plaintext:
        plaintext = getpass('PyPI password: ')

    encrypted = encrypt(repo_key, plaintext.encode())
    update_travis_deploy_password(encrypted)
    print("Wrote encrypted password to .travis.yml -- you're ready to deploy")
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def handle_401(self, resp, **kwargs):
    """Retry a request that came back 401 with credentials from the user.

    Any other response, or a 401 while prompting is disabled, is returned
    unchanged. Otherwise username and password are asked for, remembered
    per network location when either is non-empty, and the original
    request is re-sent with HTTP basic auth. The original response is
    appended to the new response's history.
    """
    # Only an unauthorized response in a session that may prompt is handled.
    if resp.status_code != 401 or not self.prompting:
        return resp

    netloc = urllib_parse.urlparse(resp.url).netloc

    username = six.moves.input("User for %s: " % netloc)
    password = getpass.getpass("Password: ")

    # Remember what was entered for future requests to the same host.
    if username or password:
        self.passwords[netloc] = (username, password)

    # Consume content and release the original connection so the retried
    # request can reuse it.
    resp.content
    resp.raw.release_conn()

    authed_request = HTTPBasicAuth(username or "", password or "")(resp.request)

    retried = resp.connection.send(authed_request, **kwargs)
    retried.history.append(resp)
    return retried
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def finalize_options(self):
    """Fill in the upload username and password after base finalization.

    The username falls back to the current system login. The password is
    taken from the first source that yields a truthy value: the existing
    option, the keyring, then an interactive prompt.
    """
    orig.upload.finalize_options(self)

    username = self.username
    if not username:
        username = getpass.getuser()
    self.username = username

    # Stop at the first password source that succeeds.
    password = self.password
    if not password:
        password = self._load_password_from_keyring()
    if not password:
        password = self._prompt_for_password()
    self.password = password
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _prompt_for_password(self):
    """
    Ask for a password on the tty.

    Any exception, including a keyboard interrupt, is suppressed and
    results in ``None``.
    """
    try:
        entered = getpass.getpass()
    except (Exception, KeyboardInterrupt):
        return None
    return entered
项目:vsphere_ansible_inventory    作者:mikesimos    | 项目源码 | 文件源码
def main():
    """Entry point of the vSphere dynamic inventory script.

    Merges command line arguments with the configuration (arguments win),
    asking for the vSphere password on the console when neither provides
    one. Prints an empty JSON object for host/guest queries, or the
    (possibly cached) inventory for list / reload-cache requests, then
    exits with status 0.
    """
    # - Get command line args and config args.
    args = get_args()
    (filters, cache_path, cache_ttl, vcenter_host, vsphere_user, vsphere_pass, vsphere_port, cert_check) \
        = parse_config()

    # - Override settings with arg parameters if defined
    if not args.password:
        if not vsphere_pass:
            import getpass
            vsphere_pass = getpass.getpass()
        setattr(args, 'password', vsphere_pass)
    if not args.username:
        setattr(args, 'username', vsphere_user)
    if not args.hostname:
        setattr(args, 'hostname', vcenter_host)
    if not args.port:
        setattr(args, 'port', vsphere_port)
    if not args.no_cert_check:
        setattr(args, 'cert_check', cert_check)
    else:
        setattr(args, 'cert_check', False)

    # - Perform requested operations (list, host/guest, reload cache)
    if args.host or args.guest:
        print ('{}')
        exit(0)
    elif args.list or args.reload_cache:
        # Use the port resolved above instead of a hard-coded 443, which
        # silently ignored both the argument and the configuration.
        # 443 stays the fallback when neither sets a port.
        # NOTE(review): assumes VSphere accepts the port in the type
        # produced by get_args()/parse_config() - confirm.
        v = VSphere(args.hostname, args.username, args.password,
                    vsphere_port=args.port or 443, cert_check=args.cert_check)
        data = v.cached_inventory(filters, cache_path=cache_path, cache_ttl=cache_ttl, refresh=args.reload_cache)
        print ("{}".format(dumps(data)))
        exit(0)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """Run a database conversion with the settings given in *options*.

    Asks for the source database password with hidden input when it was
    not supplied, builds the converter for the selected pool's instance
    and runs ``self.convert`` to completion on the instance's event loop.
    """
    if options['source_db_password'] is None:
        options['source_db_password'] = getpass('Database Password: ')

    instance = Controller.prepare(options['pool']).instance

    source_format = options['source_format']
    # Connection settings of the source database, keyed as the converter
    # expects them.
    source_settings = {
        'db_name': options['source_db_name'],
        'db_type': options['source_db_type'],
        'db_user': options['source_db_username'],
        'db_port': options['source_db_port'],
        'db_password': options['source_db_password'],
        'db_host': options['source_db_host'],
        'prefix': options['source_db_prefix'],
    }
    converter = get_converter(source_format, instance=instance, **source_settings)

    instance.loop.run_until_complete(self.convert(instance, converter))
项目:BitBot    作者:crack00r    | 项目源码 | 文件源码
def __init__(self, session_user_id, user_phone, api_id, api_hash,
             proxy=None):
    """Connect to Telegram and make sure the session is authorized.

    The connection is attempted twice; if both attempts fail, the
    constructor returns early. An unauthorized session triggers a code
    request for *user_phone* and the code is asked for until sign-in
    yields a user. When two-step verification is enabled, the password is
    asked for with hidden input.
    """
    print_title('Initialization')

    print('Initializing interactive example...')
    super().__init__(session_user_id, api_id, api_hash, proxy)

    # Store all the found media in memory here,
    # so it can be downloaded if the user wants
    self.found_media = set()

    print('Connecting to Telegram servers...')
    connected = self.connect()
    if not connected:
        print('Initial connection failed. Retrying...')
        connected = self.connect()
    if not connected:
        print('Could not connect to Telegram servers.')
        return

    # Nothing more to do when the stored session is already authorized.
    if self.is_user_authorized():
        return

    print('First run. Sending code request...')
    self.send_code_request(user_phone)

    self_user = None
    while self_user is None:
        code = input('Enter the code you just received: ')
        try:
            self_user = self.sign_in(user_phone, code)
        except SessionPasswordNeededError:
            # Two-step verification may be enabled
            pw = getpass('Two step verification is enabled. '
                         'Please enter your password: ')
            self_user = self.sign_in(password=pw)
项目:Facebook-Photos-Download-Bot    作者:NalinG    | 项目源码 | 文件源码
def __call__(self, parser, namespace, values, option_string):
    """Store the option value, asking for it with hidden input if omitted.

    When the option was given without a value (*values* is None), the value
    is read via getpass; the result is stored on *namespace* under
    ``self.dest``.
    """
    supplied = getpass.getpass() if values is None else values
    setattr(namespace, self.dest, supplied)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def prompt_user_passwd(self, host, realm):
    """Override this in a GUI environment!

    Asks on the console for the username (echoed) and password (hidden) for
    *realm* at *host* and returns them as ``(user, passwd)``. A keyboard
    interrupt yields ``(None, None)``.
    """
    import getpass
    try:
        user = raw_input("Enter username for %s at %s: " % (realm, host))
        password_prompt = "Enter password for %s in %s at %s: " % (user, realm, host)
        return user, getpass.getpass(password_prompt)
    except KeyboardInterrupt:
        # Python 2 print statement: ends the interrupted prompt line.
        print
        return None, None


# Utility functions
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def get_api_key(cb, parser, args):
    """Print the API token of ``args.username``.

    The password comes from ``args.password`` or, when that is empty, from
    a hidden console prompt. The token is requested from the server URL in
    ``cb.credentials`` using its SSL verification setting.
    """
    password = args.password
    if not password:
        password = getpass.getpass("Password for {0}: ".format(args.username))

    token = get_api_token(cb.credentials.url, args.username, password,
                          verify=cb.credentials.ssl_verify)
    print("API token for user {0}: {1}".format(args.username, token))
项目:tokenize-uk    作者:lang-uk    | 项目源码 | 文件源码
def main(args):
    """Encrypt the PyPI password with the repository's public key.

    Uses ``args.password`` when given, otherwise asks for the password with
    hidden input, then hands the encrypted value to
    ``update_travis_deploy_password``.
    """
    repo_key = fetch_public_key(args.repo)

    plaintext = args.password
    if not plaintext:
        plaintext = getpass('PyPI password: ')

    encrypted = encrypt(repo_key, plaintext.encode())
    update_travis_deploy_password(encrypted)
    print("Wrote encrypted password to .travis.yml -- you're ready to deploy")
项目:libcloud.api    作者:tonybaloney    | 项目源码 | 文件源码
def main(args):
    """Encrypt the PyPI password with the repository's public key.

    Uses ``args.password`` when given, otherwise asks for the password with
    hidden input, then hands the encrypted value to
    ``update_travis_deploy_password``.
    """
    repo_key = fetch_public_key(args.repo)

    plaintext = args.password
    if not plaintext:
        plaintext = getpass('PyPI password: ')

    encrypted = encrypt(repo_key, plaintext.encode())
    update_travis_deploy_password(encrypted)
    print("Wrote encrypted password to .travis.yml -- you're ready to deploy")
项目:meetup-facebook-bot    作者:Stark-Mountain    | 项目源码 | 文件源码
def deploy(branch='master'):
    """Deploy *branch* to an already provisioned host.

    Asks whether dependencies should be updated and for the sudo password,
    fetches the sources, optionally rebuilds the virtualenv and reinstalls
    the modules, starts the uWSGI and nginx services and reports the
    service status.
    """
    refresh_deps = confirm('Update dependencies?')
    print('OK, deploying branch %s' % branch)
    env.sudo_password = getpass('Initial value for env.sudo_password: ')

    fetch_sources_from_repo(branch, PROJECT_FOLDER)
    if refresh_deps:
        reinstall_venv()
        install_modules()

    for service in (UWSGI_SERVICE_NAME, 'nginx'):
        start_systemctl_service(service)
    status()
项目:meetup-facebook-bot    作者:Stark-Mountain    | 项目源码 | 文件源码
def status():
    """Show the systemd status of the uWSGI service via sudo.

    Asks for the sudo password first when ``env.sudo_password`` is None.
    """
    if env.sudo_password is None:
        env.sudo_password = getpass('Initial value for env.sudo_password: ')

    command = 'systemctl status %s' % UWSGI_SERVICE_NAME
    sudo(command)