Python prompt_toolkit 模块,prompt() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 prompt_toolkit.prompt()。

项目:Slack-Gitsin    作者:yasintoy    | 项目源码 | 文件源码
def post_message(self, channel_name):
    """Prompt for a message and post it to a Slack channel.

    The text is read with completion over ``users`` and sent through the
    ``chat.postMessage`` Web API method with ``as_user=true``.

    :param channel_name: channel name, resolved to an id with
        ``self.find_channel_id``.
    """
    # self.channels_history(channel_name)
    os.system("echo '\u001b[1m\u001b[31m  To mention a user write @ while chatting \u001b[0m'")
    text = prompt("your message > ", completer=WordCompleter(users))
    channel_id = self.find_channel_id(channel_name)
    # Pass the query as a mapping so requests percent-encodes it; formatting
    # the text straight into the URL cut the message at the first "&" or "#".
    query = {
        "token": settings.token,
        "channel": channel_id,
        "text": text,
        "as_user": "true",
        "link_names": 1,
    }
    response = requests.get("https://slack.com/api/chat.postMessage",
                            params=query).json()
    # TODO : retrieve message history and print to screen while chatting
    if response["ok"]:
        os.system("figlet 'Sent' | lolcat")
        time.sleep(2)
        os.system("clear")
    else:
        print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
项目:Slack-Gitsin    作者:yasintoy    | 项目源码 | 文件源码
def channels_invite(self, channel_name):
    """Prompt for user names and invite each one to a channel.

    Names are separated by whitespace and may carry a leading/trailing "@".
    One ``channels.invite`` request is made per name; the outcome of each
    request is reported before the next one is sent.

    :param channel_name: channel name, resolved to an id with
        ``self.find_channel_id``.
    """
    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    channel_id = self.find_channel_id(channel_name)
    invites = prompt("send invites -> ", completer=WordCompleter(users),
                     style=DocumentStyle)
    # split() without a separator drops the empty names that repeated or
    # trailing spaces produced with split(" ").
    for i in invites.split():
        user_id = self.find_user_id(i.strip("@"))
        # A params mapping lets requests percent-encode every value.
        response = requests.get(
            "https://slack.com/api/channels.invite",
            params={
                "token": settings.token,
                "channel": channel_id,
                "user": user_id,
            },
        ).json()
        if response["ok"]:
            # The name was typed by the user: quote it so the shell cannot
            # interpret quotes or metacharacters inside it.
            os.system("figlet " + quote("Invited " + i) + " | lolcat")
            time.sleep(2)
            os.system("clear")
        else:
            print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
项目:Slack-Gitsin    作者:yasintoy    | 项目源码 | 文件源码
def channels_create(self):
    """Ask for a channel name and purpose and create the channel.

    The user is also asked whom to invite, but that answer is not used yet
    (see the TODO below). The channel is created with ``channels.create``;
    a failure is reported with the error returned by the API.
    """
    os.system("clear")
    name = raw_input("\u001b[1m\u001b[31m channel name -> \u001b[0m")
    purpose = raw_input("\u001b[1m\u001b[31m purpose of the channel(OPTINAL) : \u001b[0m")
    # Answer intentionally discarded for now; the question is kept so the
    # dialog the user sees does not change.
    prompt("send invites -> ", completer=WordCompleter(users),
           style=DocumentStyle)

    # A params mapping lets requests percent-encode the name and purpose,
    # which previously broke the URL when they contained "&", "#" or spaces.
    response = requests.get(
        "https://slack.com/api/channels.create",
        params={
            "token": settings.token,
            "name": name,
            "purpose": purpose,
        },
    ).json()
    # TODO :  send channel and user_id to channels_invite
    if response["ok"]:
        os.system("figlet 'Created' | lolcat")
        time.sleep(2)
        os.system("clear")
    else:
        # Report the failure the same way the other Slack calls do instead
        # of returning silently.
        print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
项目:Slack-Gitsin    作者:yasintoy    | 项目源码 | 文件源码
def main():
    """Run the Slack client's read/dispatch loop.

    Clears the screen and prints the banner, then keeps reading a line
    from the ``slack> `` prompt and running it as a ``Slack`` command.
    The loop has no exit condition of its own.
    """
    os.system("clear; figlet 'Slack Gitsin' | lolcat")
    history = FileHistory(os.path.expanduser("~/.slackHistory"))
    while True:
        # Built afresh for every line, exactly like inline keyword arguments.
        options = dict(
            history=history,
            auto_suggest=AutoSuggestFromHistory(),
            on_abort=AbortAction.RETRY,
            style=DocumentStyle,
            completer=Completer(fuzzy_match=False, text_utils=TextUtils()),
            complete_while_typing=Always(),
            get_bottom_toolbar_tokens=get_bottom_toolbar_tokens,
            key_bindings_registry=manager.registry,
            accept_action=AcceptAction.RETURN_DOCUMENT,
        )
        Slack(prompt("slack> ", **options)).run_command()
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def config_settings(event):
    """Ask the layout questions and store the answers in the configuration.

    Each question is repeated until the answer is ``y`` or ``n`` (any
    case). ``PROMPTING`` is set to True while the questions are asked.
    """
    global PROMPTING
    telemetry.track_key('F1')

    PROMPTING = True
    config = azclishell.configuration.CONFIGURATION
    questions = {
        "Do you want command descriptions" : "command_description",
        "Do you want parameter descriptions" : "param_description",
        "Do you want examples" : "examples"
    }
    for question, option in questions.items():
        answer = ""
        while answer.lower() not in ('y', 'n'):
            answer = prompt(u'\n%s (y/n): ' % question)
        config.set_val('Layout', option, format_response(answer))

    PROMPTING = False
    print("\nPlease restart shell for changes to take effect.\n\n")
    event.cli.set_return_value(event.cli.current_buffer)
项目:bptc_wallet    作者:ceddie    | 项目源码 | 文件源码
def _process_input(self):
    """Read one command line, parse it and dispatch it to ``cmd_<name>``.

    The line is split on single spaces; the first word selects the
    ``cmd_<word>`` method and the whole word list is given to
    ``self.parser``. An empty first word is ignored. A truthy return value
    of the command is printed. ``SystemExit`` raised while parsing or
    running the command is swallowed.
    """
    line = prompt('> ', get_bottom_toolbar_tokens=self._get_toolbar,
                  style=self.style, history=self.history,
                  completer=self.completer, complete_while_typing=False,
                  key_bindings_registry=self.registry)
    words = line.split(' ')
    command = words[0]
    if command == '':
        return
    try:
        parsed = self.parser.parse_args(words)
        handler = getattr(self, 'cmd_{}'.format(command))
        output = handler(parsed)
        if output:
            print(output)
    except SystemExit:
        pass
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def interactive(self, context=None):
    """Runs interactive command line chat between user and bot. Runs
    until EOF or a keyboard interrupt (Ctrl-C) is received at the prompt.

    Every line read is stripped of trailing whitespace, passed to
    ``self.message`` together with the context, and the result is printed.

    context -- optional initial context. Set to {} if omitted
    """
    if context is None:
        context = {}

    # The former raw_input/input selection was never used: all input goes
    # through prompt_toolkit's prompt().
    history = InMemoryHistory()
    while True:
        try:
            message = prompt(INTERACTIVE_PROMPT, history=history, mouse_support=True).rstrip()
        except (KeyboardInterrupt, EOFError):
            return
        print(self.message(message, context))
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def BootstrapBlockchain():
    """Ask for confirmation and then run the blockchain bootstrap.

    Exits with status 0 when no bootstrap file is configured or when the
    user types anything other than ``confirm``. Otherwise returns the
    result of ``do_bootstrap()``.
    """
    chain_dir = settings.LEVELDB_PATH

    if settings.BOOTSTRAP_FILE is None:
        print("no bootstrap file specified.  Please update your configuration file.")
        sys.exit(0)

    print("This will overwrite any data currently in %s.\nType 'confirm' to continue" % chain_dir)

    if prompt("[confirm]> ", is_password=False) == 'confirm':
        return do_bootstrap()

    print("bootstrap cancelled")
    sys.exit(0)
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def _migrate_legacy_app_data_if_just_upgraded_and_user_agrees():
    """Offer to migrate application data left by a previous Indy version.

    Nothing happens unless the CLI base dir is untouched and a legacy base
    dir exists. An empty answer or one starting with "y"/"Y" starts the
    migration; any error raised by it is printed with its traceback and
    not propagated.
    """
    if not (is_cli_base_dir_untouched() and legacy_base_dir_exists()):
        return

    print('Application data from previous Indy version has been found')
    answer = prompt('Do you want to migrate it? [Y/n] ')

    if answer and not answer.upper().startswith('Y'):
        print('Application data was not migrated')
        return

    try:
        combined_migration.migrate()
        # Invalidate config caches to pick up overridden config
        # parameters from migrated application data
        invalidate_config_caches()
        print('Application data has been migrated')
    except Exception as e:
        print('Error occurred when trying to migrate'
              ' application data: {}'.format(e))
        traceback.print_exc()
        print('Application data has not been migrated')
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_yesno(msg: str, default: bool = None, meta_dict: t.Dict[str, str] = None) -> bool:
    """
    Ask a yes/no question and return the answer as a boolean.

    When a default is given it is appended to the message as ``[y]`` or
    ``[n]`` and an empty answer is accepted and mapped to it.

    :param msg: asked question
    :param default: value returned for an empty answer (None: no default)
    :param meta_dict: mapping 'yes' or 'no' to further explanations
    :return: True if the answer starts with "y" (any case), else False
    """
    accepted = ["yes", "no", "y", "n"]
    if default is not None:
        msg = msg + "[" + ("y" if default else "n") + "] "
        accepted.append("")
    answer = prompt(
        msg,
        completer=WordCompleter(["yes", "no"], ignore_case=True, meta_dict=meta_dict),
        display_completions_in_columns=True,
        validator=WordValidator(accepted, ignore_case=True),
    )
    if answer == "":
        return default
    return answer.lower().startswith("y")
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def default_prompt(msg: str, default: t.Optional = None, **kwargs):
    """
    Prompt with a message that shows a non-editable default value.
    An empty answer means "use the default value".

    If a default is given and the passed validator is a TypeValidator or a
    WordValidator, its ``allow_empty`` flag is switched on so that the
    empty answer passes validation.

    :param msg: message
    :param default: default value
    :param kwargs: arguments passed directly to the prompt function
    :return: user input, or the default for an empty answer
    """
    msg = message(msg, default)
    has_default = default is not None
    if has_default:
        validator = kwargs.get("validator")
        if isinstance(validator, (TypeValidator, WordValidator)):
            validator.allow_empty = True
    answer = prompt(msg, **kwargs)
    return default if (answer == "" and has_default) else answer
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_attributes_dict(default_description: str = None) -> t.Dict[str, str]:
    """
    Ask for a description and then for any number of further attributes.

    Attribute names already entered are offered as completions and their
    current value is pre-filled. A keyboard interrupt while adding
    attributes ends the dialog and keeps what has been entered so far.

    :param default_description: default value for the description attribute
    :return: attributes dict
    """
    question = "Give a description for the current block: "
    if default_description is None:
        description = prompt(question, validator=NonEmptyValidator())
    else:
        description = default_prompt(question, default_description,
                                     completer=WordCompleter([default_description]))
    attributes = {"description": description}
    try:
        while prompt_yesno("Do you want to set or add another attribute? ", default=False):
            known_names = WordCompleter(sorted(attributes), meta_dict=attributes)
            name = prompt("Attribute name: ", validator=NonEmptyValidator(),
                          completer=known_names)
            attributes[name] = prompt("Attribute value: ",
                                      default=attributes.get(name, ""),
                                      validator=NonEmptyValidator())
    except KeyboardInterrupt:
        pass
    return attributes
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_rusage_exec_dict(run_dict: dict) -> dict:
    """
    Ask which rusage properties the rusage exec runner should obtain.

    :param run_dict: run config dict (without the runner part); not read here
    :return: runner config with the chosen names under "properties"
    """

    class RusagePropertiesValidator(Validator):
        """Accepts a comma separated list of valid rusage property names."""

        def validate(self, document: Document):
            names = [part.strip() for part in document.text.split(",")]
            verdict = verbose_isinstance(names, ValidRusagePropertyList())
            if verdict:
                return
            # The falsy verdict is rendered as the validation message.
            raise ValidationError(message=str(verdict), cursor_position=len(document.text))

    defaults = RusageExecRunner.misc_options["properties"].get_default()
    answer = prompt(
        "Which properties should be obtained from getrusage(1)? ",
        validator=RusagePropertiesValidator(),
        default=", ".join(defaults),
        completer=WordCompleter(
            sorted(set(get_av_rusage_properties().keys())),
            meta_dict=get_av_rusage_properties(),
            ignore_case=False,
            WORD=True,
        ),
    )
    return {"properties": [name.strip() for name in answer.split(",")]}
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_time_exec_dict(run_dict: dict) -> dict:
    """
    Prompt for the config of the time exec runner.

    :param run_dict: run config dict (without the runner part); not read here
    :return: runner config with the chosen property names under "properties"
    """
    runner_dict = {}
    default_props = ", ".join(TimeExecRunner.misc_options["properties"].get_default())

    class TimePropertiesValidator(Validator):
        """Accepts a comma separated list of valid gnu time property names."""

        def validate(self, document: Document):
            vals = [elem.strip() for elem in document.text.split(",")]
            ret = verbose_isinstance(vals, ValidTimePropertyList())
            if not ret:
                raise ValidationError(message=str(ret), cursor_position=len(document.text))

    # The completion words and their descriptions must come from the same
    # table: the descriptions used to be taken from the rusage properties,
    # so time property names got no (or the wrong) explanation.
    time_properties = get_av_time_properties()
    props = prompt("Which properties should be obtained from gnu time? ",
                   validator=TimePropertiesValidator(), default=default_props,
                   completer=WordCompleter(sorted(set(time_properties.keys())),
                                           meta_dict=time_properties, ignore_case=False, WORD=True))
    runner_dict["properties"] = [prop.strip() for prop in props.split(",")]
    return runner_dict
项目:n4    作者:technige    | 项目源码 | 文件源码
def read(self):
    """Read the next input through prompt().

    If ``self.multi_line`` is set, the flag is cleared and a multi-line
    prompt with an empty message is shown. Otherwise the prompt shows
    ``-> ``, or ``-(<tx_counter>)-> `` when ``self.tx`` is not None.
    """
    if self.multi_line:
        self.multi_line = False
        return prompt(u"", multiline=True, **self.prompt_args)

    def get_prompt_tokens(_):
        if self.tx is None:
            return [(Token.Prompt, "\n-> ")]
        return [
            (Token.Prompt, "\n-("),
            (Token.TxCounter, "{}".format(self.tx_counter)),
            (Token.Prompt, ")-> "),
        ]

    return prompt(get_prompt_tokens=get_prompt_tokens, **self.prompt_args)
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def config_settings(event):
    """Ask the layout questions and store the answers in the configuration.

    Each question is repeated until the answer is ``y`` or ``n`` (any
    case). ``PROMPTING`` is set to True while the questions are asked.
    """
    global PROMPTING
    shell_telemetry.track_key('F1')

    PROMPTING = True
    config = azclishell.configuration.CONFIGURATION
    questions = {
        "Do you want command descriptions": "command_description",
        "Do you want parameter descriptions": "param_description",
        "Do you want examples": "examples"
    }
    for question, option in questions.items():
        answer = ""
        while answer.lower() not in ('y', 'n'):
            answer = prompt(u'\n%s (y/n): ' % question)
        config.set_val('Layout', option, format_response(answer))

    PROMPTING = False
    print("\nPlease restart the interactive mode for changes to take effect.\n\n")
    event.cli.set_return_value(event.cli.current_buffer)
项目:meetup-utilities    作者:igauravsehrawat    | 项目源码 | 文件源码
def auto_complete_prompt(allowed_extensions, file_type):
    """Ask for a file name, completing over matching files in the cwd.

    Only files directly inside the current directory whose extension is in
    ``allowed_extensions`` are offered; the answer itself is not checked.

    ``file_type`` is only used in the wording of the question. Returns
    the text the user entered.
    """
    # The first entry produced by os.walk(".") describes "." itself.
    _, _, names = next(os.walk("."), (".", [], []))
    candidates = [
        name for name in names
        if os.path.splitext(name)[1] in allowed_extensions
    ]
    question = "Which {0} file you like to trim: ".format(file_type)
    return prompt(
        question,
        completer=WordCompleter(candidates, ignore_case=True),
        complete_while_typing=True)
项目:meetup-utilities    作者:igauravsehrawat    | 项目源码 | 文件源码
def trim_the_video():
    """Ask for a video and a time range and cut that range out with ffmpeg.

    The output is written to ``trimmed-<input name>.mp4``. The process
    exits if the chosen file does not have one of the supported extensions.

    Returns the name of the output file.
    """
    allowed_extensions = [".mp4", ".mov", ".webm"]
    video_file_name = auto_complete_prompt(allowed_extensions, "video")
    start_time = prompt("Start time for the video: ")
    end_time = prompt("End time for the video: ")
    extension = os.path.splitext(video_file_name)[1]
    if extension not in allowed_extensions:
        print("The file you specified isn't supported at the moment.")
        print("Exiting now.")
        exit()
    output_name = "trimmed-{0}.mp4".format(video_file_name)
    # Build the argument list directly: formatting a command string and
    # re-splitting it broke on file names containing spaces or quotes.
    args = [
        "ffmpeg", "-i", video_file_name,
        "-r", "24",
        "-ss", start_time.strip(),
        "-to", end_time.strip(),
        output_name,
    ]
    with Halo(text="Generating preface video...", spinner='earth'):
        subprocess.run(args)
    return output_name
项目:papis    作者:alejandrogallo    | 项目源码 | 文件源码
def confirm(prompt, yes=True):
    """Confirm with user input

    The default applies to every answer that is not the explicit opposite:
    with ``yes=True`` only "n"/"N" stops, with ``yes=False`` only "y"/"Y"
    goes ahead.

    :param prompt: Question or text that the user gets.
    :type  prompt: str
    :param yes: If yes should be the default.
    :type  yes: bool
    :returns: True if go ahead, False if stop
    :rtype:  bool

    """
    import prompt_toolkit
    result = prompt_toolkit.prompt(
        prompt + ' (%s): ' % ('Y/n' if yes else 'y/N')
    )
    if yes:
        return result not in ['N', 'n']
    # Default is "no": go ahead only on an explicit yes. This branch used
    # to be inverted (empty input confirmed, "y" declined).
    return result in ['Y', 'y']
项目:papis    作者:alejandrogallo    | 项目源码 | 文件源码
def input(prompt, default=""):
    """Prompt user for input, showing the default in parentheses

    :param prompt: Question or text that the user gets.
    :type  prompt: str
    :param default: Value returned if the user does not input anything
    :type  default: str
    :returns: User input, or the default for an empty answer
    :rtype:  str

    """
    import prompt_toolkit
    question = prompt + ' (%s): ' % (default)
    return prompt_toolkit.prompt(question) or default
项目:baseline    作者:dpressel    | 项目源码 | 文件源码
def tagger_repl(tagger, **kwargs):
    """Read lines until ``quit`` and print the tagger's prediction for each.

    Recognised keyword arguments: ``mxlen`` and ``maxw`` (both default
    100, passed to ``tagger.predict_text``), ``prompt`` (default
    ``'class> '``) and ``history_file`` (default ``'.history'``).
    An exception raised while predicting is logged and the loop goes on.
    """
    mxlen = int(kwargs.get('mxlen', 100))
    maxw = int(kwargs.get('maxw', 100))
    #zeropad = int(kwargs.get('zeropad', 0))
    prompt_name = kwargs.get('prompt', 'class> ')
    history = FileHistory(kwargs.get('history_file', '.history'))

    def read_line():
        return prompt(prompt_name, history=history).strip()

    # iter() with a sentinel stops as soon as a line equals 'quit'.
    for text in iter(read_line, 'quit'):
        try:
            print(tagger.predict_text(text.split(' '), mxlen=mxlen, maxw=maxw))
        except Exception:
            logging.exception('Error')
项目:bothub-cli    作者:bothub-studio    | 项目源码 | 文件源码
def test(self):
    """Feed lines typed at the ``BotHub> `` prompt to the loaded bot.

    Every non-empty line is turned into an event and handled with a fresh,
    empty context. EOF or a keyboard interrupt ends the loop; any other
    exception is printed with its traceback and the loop continues.
    """
    self._load_auth()
    history = FileHistory('.history')

    # The id itself is not used below; the call is kept as in the original.
    self._get_current_project_id()
    bot = self._load_bot()

    while True:
        try:
            line = prompt('BotHub> ', history=history)
            if line:
                bot.handle_message(make_event(line), {})
        except (EOFError, KeyboardInterrupt):
            break
        except Exception:
            traceback.print_exc()
项目:pygcam    作者:JGCRI    | 项目源码 | 文件源码
def askYesNo(msg, default=None):
    """Ask a yes/no question until a valid answer is given.

    ``default`` may be 'y' or 'n' (any case); it is shown capitalised in
    the hint and returned for an empty answer. Accepted answers are
    y, n, yes and no in any case. Returns True for yes, False for no.
    """
    default = default and default.lower()
    yes_hint = 'Y' if default == 'y' else 'y'
    no_hint = 'N' if default == 'n' else 'n'
    question = msg + ' (%s/%s)? ' % (yes_hint, no_hint)

    while True:
        value = input(question).lower()
        if value == '' and default:
            return default == 'y'
        if value in ('y', 'n', 'yes', 'no'):
            return value in ('y', 'yes')
项目:pocket2quiver    作者:AnsonT    | 项目源码 | 文件源码
def prompt_path_if_none(arguments, key, key_desc, force_prompt=False, extension=""):
    """Return the path stored under ``key``, asking for it when needed.

    The value comes from ``arguments[key]`` or, failing that, from ``jkv``.
    If it is empty or ``force_prompt`` is set, the user is asked for a
    path. The answer is made absolute, its extension is replaced by
    ``extension`` and it is printed. A path that does not exist must be
    confirmed, otherwise the question is repeated. The result is written
    back to ``jkv[key]``.
    """
    value = arguments[key] or jkv.get(key, None)
    if not value or force_prompt:
        while True:
            typed = prompt(
                message=key_desc+": ",
                validator=NotEmptyValidator(error=key_desc+' required'),
                completer=PathCompleter(
                    expanduser=True,
                    file_filter=has_extension(extension)))
            root, _ = splitext(abspath(expanduser(typed)))
            value = root + extension
            print(value)
            if exists(value) or prompt_yn('Create {}? (Y/n): '.format(value)):
                break
    jkv[key] = value
    return value
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def set_prompt_tokens(self, device_info: tuple) -> None:
    """
        Build ``self.prompt_tokens`` from a four element device info
        tuple: device name, system name, model and system version.
        The system name is not used in the prompt.

        :param device_info:
        :return:
    """

    name, _system_name, model, version = device_info

    tokens = [
        (Token.Applicationname, name),
        (Token.On, ' on '),
    ]
    tokens.append((Token.Devicetype, '(' + model + ': '))
    tokens.append((Token.Version, version + ') '))
    tokens.append(
        (Token.Connection, '[' + state_connection.get_comms_type_string() + '] # '))

    self.prompt_tokens = tokens
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def get_prompt_tokens(self, _) -> list:
    """
        Return the prompt tokens for the cli app.

        When ``self.prompt_tokens`` is empty or unset (assumed to mean
        the connection failed) a placeholder prompt for an unknown
        application is returned instead.

        :param _:
        :return:
    """

    tokens = self.prompt_tokens
    if not tokens:
        tokens = [
            (Token.Applicationname, 'unknown application'),
            (Token.On, ''),
            (Token.Devicetype, ''),
            (Token.Version, ' '),
            (Token.Connection, '[' + state_connection.get_comms_type_string() + '] # '),
        ]

    return tokens
项目:uhu    作者:updatehub    | 项目源码 | 文件源码
def _get_object_option_value_message(option, set_=None):
    """Returns the message for the object_option_value prompt.

    The message is the title-cased verbose name of the option, followed
    by its default in brackets (``Y/n`` or ``y/N`` for boolean options)
    and, when ``set_`` is given, by the installation set.
    """
    default = option.default
    if default is None:
        label = '{}'.format(option.verbose_name.title())
    else:
        if option.type_name == 'boolean':
            default = 'Y/n' if default else 'y/N'
        label = '{} [{}]'.format(option.verbose_name.title(), default)
    suffix = '' if set_ is None else ' (installation set {})'.format(set_)
    return '{}{}: '.format(label, suffix)


# pylint: disable=too-many-arguments
项目:Slack-Gitsin    作者:yasintoy    | 项目源码 | 文件源码
def file_upload(self, channel_name):
    """Collect upload details from the user and call ``files.upload``.

    A file is chosen in a zenity dialog, then content, filename, title
    and an initial comment are asked for, and the request is sent for
    the given channel.

    :param channel_name: channel name, resolved to an id with
        ``self.find_channel_id``.
    """
    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    os.system("clear")
    channel_id = self.find_channel_id(channel_name)

    os.system("echo 'opening the file dialog. wait...' ")
    # zenity ends the chosen path with a newline; drop it.
    selected = subprocess.check_output(['zenity', '--file-selection']).strip()
    # Quote the path for the shell: the old command appended it after the
    # closing quote and added a stray "'", which made the echo fail.
    os.system("echo '\u001b[1m\u001b[31m file : \u001b[0m'" + quote(selected))
    # NOTE(review): the selected path is only echoed; the request below
    # sends the typed content, not the file. Confirm this is intended.
    content = raw_input("\u001b[1m\u001b[31m content : \u001b[0m")
    filename = raw_input("\u001b[1m\u001b[31m filename : \u001b[0m")
    title = raw_input("\u001b[1m\u001b[31m title : \u001b[0m")
    initial_comment = prompt("add comment : ", completer=WordCompleter(users),
                             style=DocumentStyle)

    # A params mapping lets requests percent-encode the free-text values.
    response = requests.get(
        "https://slack.com/api/files.upload",
        params={
            "token": settings.token,
            "content": content,
            "filename": filename,
            "channels": channel_id,
            "title": title,
            "initial_comment": initial_comment,
        },
    ).json()
    if response["ok"]:
        os.system("figlet 'Uploaded!' | lolcat")
        time.sleep(2)
        os.system("clear")
    else:
        print("something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)")
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def ask_user_for_telemetry():
    """Ask whether telemetry may be sent until the answer is yes or no.

    An empty answer counts as ``yes``. The answer is returned as typed,
    so it may be in any case.
    """
    while True:
        answer = prompt(u'\nDo you agree to sending telemetry (yes/no)? Default answer is yes: ')
        if answer == '':
            answer = 'yes'
        if answer.lower() in ('yes', 'no'):
            return answer
项目:scm    作者:rookiebulls    | 项目源码 | 文件源码
def login(url, username, password, update_apidoc, display_meta):
    """Log in to the REST API and run the interactive command loop.

    A failed login is reported and ends the function. Otherwise the
    tokens from the login reply become the session headers, completions
    are refreshed if ``update_apidoc`` is set, and every line read is
    handed to ``process_command`` until EOF.
    """
    session = requests.Session()
    reply = session.post('{}/api/rest/auth/login'.format(url),
                         json={'username': username, 'password': password}).json()
    if reply['status'] == 'login.failed':
        click.secho('Fail to login, wrong username or password!', fg='red')
        return
    session.headers = {item: reply[item]
                       for item in ('token', 'apiToken', 'apiLicenseToken')}

    if update_apidoc:
        update_completions(session, url)

    click.echo('Syntax: <command> [params] [options]')
    click.echo('Press `Ctrl+D` to exit')
    history = InMemoryHistory()
    for example in ('scm apiname', 'help apiname'):
        history.append(example)
    while True:
        try:
            text = prompt(get_prompt_tokens=get_prompt_tokens,
                          completer=SCMCompleter(TextUtils(display_meta)),
                          auto_suggest=AutoSuggestFromHistory(),
                          style=DocumentStyle,
                          history=history,
                          on_abort=AbortAction.RETRY)
        except EOFError:
            return
        process_command(session, url, text)
项目:bptc_wallet    作者:ceddie    | 项目源码 | 文件源码
def _create_keybindings_registry(self):
    """Return a key binding registry with ``self.keybindings`` added.

    ``self.keybindings`` is iterated as (keystroke, callback) pairs. Each
    callback is called with ``None`` when its keystroke is pressed.
    """
    registry = load_key_bindings_for_prompt()

    def make_handler(callback):
        # A factory gives every handler its own ``callback``. Defining the
        # handler directly in the loop made all of them share the loop
        # variable, so every key triggered the last callback.
        def handler(event):
            """
            We use ``run_in_terminal``, because that ensures that the prompt is
            hidden right before something gets printed and it's drawn again
            after it. (Otherwise this would destroy the output.)
            """
            event.cli.run_in_terminal(lambda: callback(None))
        return handler

    for keystroke, callback in self.keybindings:
        registry.add_binding(keystroke)(make_handler(callback))
    return registry
项目:litchi    作者:245967906    | 项目源码 | 文件源码
def interactive_loop(self):
    """Read commands forever and dispatch on their first word.

    The line is stripped, lower-cased and split on whitespace. ``help``
    and ``show`` print information, ``attach <n>`` opens a shell for the
    integer serial ``n`` and ``exit`` leaves the program, as do EOF and a
    keyboard interrupt. Unknown commands and empty lines are ignored.
    """
    while True:
        try:
            words = prompt(
                get_prompt_tokens=self._get_prompt_tokens,
                style=self._get_prompt_style(),
                history=self._history,
                auto_suggest=AutoSuggestFromHistory(),
                completer=self._completer,
                complete_while_typing=True,
                validator=ReplValidator(),
            ).strip().lower().split()
        except (EOFError, KeyboardInterrupt):
            sys.stdout.write('\n')
            sys.exit(0)

        if not words:
            continue
        option, parameters = words[0], words[1:]

        if option == 'help':
            self._show_help_info()
        elif option == 'show':
            self._show_inventory()
        elif option == 'attach':
            # Raises if the argument is missing or not an integer.
            self._invoke_shell(int(parameters[0]))
        elif option == 'exit':
            sys.stdout.write('\n')
            sys.exit(0)
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def do_open(self, arguments):
    """Open the wallet file named in ``arguments`` after asking for its password.

    A wallet that is already open is closed first. The first argument
    must be ``wallet`` and the second an existing path. On success the
    wallet's ``ProcessBlocks`` is scheduled as a looping call; a failure
    to open is printed, not raised.
    """
    if self.Wallet:
        self.do_close_wallet()

    item = get_arg(arguments)
    if not (item and item == 'wallet'):
        print("please specify something to open")
        return

    path = get_arg(arguments, 1)
    if not path:
        print("Please specify a path")
        return

    if not os.path.exists(path):
        print("wallet file not found")
        return

    passwd = prompt("[Password]> ", is_password=True)

    try:
        self.Wallet = UserWallet.Open(path, passwd)

        self._walletdb_loop = task.LoopingCall(self.Wallet.ProcessBlocks)
        self._walletdb_loop.start(1)
        print("Opened wallet at %s" % path)
    except Exception as e:
        print("could not open wallet: %s " % e)
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def do_create(self, arguments):
    """Create a new wallet file at the path named in ``arguments``.

    The first argument must be ``wallet`` (anything else is ignored) and
    the second a path that does not exist yet. The password is asked for
    twice and must match and be at least 10 characters long. If creation
    fails, the error is printed and a partially written file is removed.
    On success the wallet's ``ProcessBlocks`` is scheduled as a looping
    call.
    """
    item = get_arg(arguments)
    if not (item and item == 'wallet'):
        return

    path = get_arg(arguments, 1)
    if not path:
        print("Please specify a path")
        return

    if os.path.exists(path):
        print("File already exists")
        return

    passwd1 = prompt("[Password 1]> ", is_password=True)
    passwd2 = prompt("[Password 2]> ", is_password=True)

    if passwd1 != passwd2 or len(passwd1) < 10:
        print("please provide matching passwords that are at least 10 characters long")
        return

    try:
        self.Wallet = UserWallet.Create(path=path, password=passwd1)
        contract = self.Wallet.GetDefaultContract()
        key = self.Wallet.GetKey(contract.PublicKeyHash)
        print("Wallet %s " % json.dumps(self.Wallet.ToJson(), indent=4))
        print("pubkey %s " % key.PublicKey.encode_point(True))
    except Exception as e:
        print("Exception creating wallet: %s " % e)
        self.Wallet = None
        if os.path.isfile(path):
            try:
                os.remove(path)
            except Exception as remove_error:
                print("Could not remove {}: {}".format(path, remove_error))
        return

    self._walletdb_loop = task.LoopingCall(self.Wallet.ProcessBlocks)
    self._walletdb_loop.start(1)
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def test_invoke_contract(self, args):
    """Test-invoke a contract and, after a password check, invoke it.

    Requires an open wallet and non-empty ``args``. The result of the
    test invoke is printed; the real invoke is only made when the
    password entered validates against the wallet.
    """
    if not self.Wallet:
        print("please open a wallet")
        return

    if not (args and len(args) > 0):
        print("please specify a contract to invoke")
        return

    tx, fee, results, num_ops = TestInvokeContract(self.Wallet, args)

    if tx is None or results is None:
        print("Error testing contract invoke")
        return

    print("\n-------------------------------------------------------------------------------------------------------------------------------------")
    print("Test invoke successful")
    print("Total operations: %s " % num_ops)
    print("Results %s " % [str(item) for item in results])
    print("Invoke TX gas cost: %s " % (tx.Gas.value / Fixed8.D))
    print("Invoke TX Fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------------------------------------------------------------------------------\n")
    print("Enter your password to continue and invoke on the network\n")

    passwd = prompt("[password]> ", is_password=True)
    if not self.Wallet.ValidatePassword(passwd):
        print("Incorrect password")
        return

    InvokeContract(self.Wallet, tx, fee)
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def load_smart_contract(self, args):
    """
    Load a contract, gather its details, test-invoke the resulting script
    and, once the user confirms with the wallet password, deploy it.

    :param args: command arguments; everything after the first element is
                 handed to ``LoadContract``
    :return: always ``None``; progress and errors are reported via ``print``
    """
    if not self.Wallet:
        print("please open wallet")
        return

    function_code = LoadContract(args[1:])
    if not function_code:
        return

    contract_script = GatherContractDetails(function_code, self)
    if contract_script is None:
        return

    tx, fee, results, num_ops = test_invoke(contract_script, self.Wallet, [])

    if tx is None or results is None:
        print("test ivoke failed")
        print("tx is, results are %s %s " % (tx, results))
        return

    # Summary of the test deploy, shown before asking for confirmation.
    print("\n-------------------------------------------------------------------------------------------------------------------------------------")
    print("Test deploy invoke successful")
    print("Total operations executed: %s " % num_ops)
    print("Results %s " % [str(entry) for entry in results])
    print("Deploy Invoke TX gas cost: %s " % (tx.Gas.value / Fixed8.D))
    print("Deploy Invoke TX Fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------------------------------------------------------------------------------\n")
    print("Enter your password to continue and deploy this contract")

    entered = prompt("[password]> ", is_password=True)
    if not self.Wallet.ValidatePassword(entered):
        print("Incorrect password")
        return

    # The deploy is invoked with a zero fee rather than the test-invoke fee.
    InvokeContract(self.Wallet, tx, Fixed8.Zero())
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def token_send_from(wallet, args, prompt_passwd=True):
    """
    Transfer tokens on behalf of another address, within its approved allowance.

    :param wallet: wallet used for the lookup, the test invoke and the final invoke
    :param args: exactly four items: token symbol, from address, to address, amount
    :param prompt_passwd: ask for (and validate) the wallet password before invoking
    :return: the result of ``InvokeContract`` on success, ``False`` otherwise
    """
    if len(args) != 4:
        print("please provide a token symbol, from address, to address, and amount")
        return False

    token = get_asset_id(wallet, args[0])
    send_from = args[1]
    send_to = args[2]
    amount = amount_from_string(token, args[3])

    allowance = token_get_allowance(wallet, args[:-1], verbose=False)

    if not (allowance and allowance >= amount):
        print("Requestest transfer from is greater than allowance")
        return False

    tx, fee, results = token.TransferFrom(wallet, send_from, send_to, amount)

    # A failed test invoke is reported separately: the allowance was sufficient,
    # so the allowance message would be misleading here.
    if tx is None or results is None or len(results) == 0 or results[0].GetBigInteger() != 1:
        print("could not transfer tokens")
        return False

    if prompt_passwd:

        print("\n-----------------------------------------------------------")
        print("Transfer of %s %s from %s to %s" % (
            string_from_amount(token, amount), token.symbol, send_from, send_to))
        print("Transfer fee: %s " % (fee.value / Fixed8.D))
        print("-------------------------------------------------------------\n")

        passwd = prompt("[Password]> ", is_password=True)

        if not wallet.ValidatePassword(passwd):
            print("incorrect password")
            return False

    return InvokeContract(wallet, tx, fee)
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def token_approve_allowance(wallet, args, prompt_passwd=True):
    """
    Approve an allowance that lets one address spend tokens of another.

    :param wallet: wallet used for the lookup, the test invoke and the final invoke
    :param args: exactly four items: token symbol, from address, to address, amount
    :param prompt_passwd: ask for (and validate) the wallet password before invoking
    :return: the result of ``InvokeContract`` on success, ``False`` otherwise
    """
    if len(args) != 4:
        print("please provide a token symbol, from address, to address, and amount")
        return False

    token = get_asset_id(wallet, args[0])
    approve_from = args[1]
    approve_to = args[2]
    amount = amount_from_string(token, args[3])

    tx, fee, results = token.Approve(wallet, approve_from, approve_to, amount)

    if tx is not None and results is not None and len(results) > 0:

        if results[0].GetBigInteger() == 1:
            print("\n-----------------------------------------------------------")
            print("Approve allowance of %s %s from %s to %s" % (string_from_amount(token, amount), token.symbol, approve_from, approve_to))
            print("Transfer fee: %s " % (fee.value / Fixed8.D))
            print("-------------------------------------------------------------\n")

            if prompt_passwd:
                passwd = prompt("[Password]> ", is_password=True)

                if not wallet.ValidatePassword(passwd):
                    print("incorrect password")
                    # Explicit False so every failure path returns the same value.
                    return False

            return InvokeContract(wallet, tx, fee)

    print("could not transfer tokens")
    return False
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def token_mint(wallet, args, prompt_passwd=True):
    """
    Mint tokens to an address, attaching the given assets.

    :param wallet: wallet used for the lookup, the test invoke and the final invoke
    :param args: token symbol, address to mint to, then one or more asset attachments
    :param prompt_passwd: ask for (and validate) the wallet password before invoking
    :return: the result of ``InvokeWithTokenVerificationScript`` on success,
             ``False`` otherwise
    :raises Exception: if fewer than three arguments are given
    """
    # Check the length before indexing, so short input raises the intended
    # message instead of an IndexError.
    if len(args) < 3:
        raise Exception("please specify assets to attach")

    token = get_asset_id(wallet, args[0])
    mint_to_addr = args[1]
    asset_attachments = args[2:]

    tx, fee, results = token.Mint(wallet, mint_to_addr, asset_attachments)

    # Same guard as the sibling token helpers: a failed test invoke may yield
    # no transaction or no results, which must not be indexed.
    if tx is None or not results:
        print("Could not register addresses: %s " % str(results))
        return False

    if results[0].GetBigInteger() > 0:
        print("\n-----------------------------------------------------------")
        print("[%s] Will mint tokens to address: %s " % (token.symbol, mint_to_addr))
        print("Fee: %s " % (fee.value / Fixed8.D))
        print("-------------------------------------------------------------\n")

        if prompt_passwd:
            passwd = prompt("[Password]> ", is_password=True)

            if not wallet.ValidatePassword(passwd):
                print("incorrect password")
                return False

        return InvokeWithTokenVerificationScript(wallet, tx, token, fee)

    print("Could not register addresses: %s " % str(results[0]))
    return False
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def do_token_transfer(token, wallet, from_address, to_address, amount, prompt_passwd=True):
    """
    Transfer ``amount`` of ``token`` from one address to another.

    :param token: token object providing ``Transfer`` and ``symbol``
    :param wallet: wallet used for the test invoke and the final invoke
    :param from_address: sending address; must not be ``None``
    :param to_address: receiving address
    :param amount: amount to transfer
    :param prompt_passwd: ask for (and validate) the wallet password before invoking
    :return: the result of ``InvokeContract`` on success, ``False`` otherwise
    """
    if from_address is None:
        print("Please specify --from-addr={addr} to send NEP5 tokens")
        return False

    tx, fee, results = token.Transfer(wallet, from_address, to_address, amount)

    test_invoke_ok = (
        tx is not None
        and results is not None
        and len(results) > 0
        and results[0].GetBigInteger() == 1
    )
    if not test_invoke_ok:
        print("could not transfer tokens")
        return False

    print("\n-----------------------------------------------------------")
    print("Will transfer %s %s from %s to %s" % (string_from_amount(token, amount), token.symbol, from_address, to_address))
    print("Transfer fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------\n")

    if prompt_passwd:
        entered = prompt("[Password]> ", is_password=True)
        if not wallet.ValidatePassword(entered):
            print("incorrect password")
            return False

    return InvokeContract(wallet, tx, fee)
项目:cdbcli    作者:kevinjqiu    | 项目源码 | 文件源码
def main(host, port, username, password, askpass, tls, version, database):
    """
    Entry point: print the version, or start the REPL against a CouchDB server.

    When ``askpass`` is set the password is read interactively and replaces
    the ``password`` argument.

    :return: ``0`` after printing the version, otherwise the REPL's result
    """
    if version:
        print(get_version())
        return 0

    secret = prompt('Enter password: ', is_password=True) if askpass else password

    cfg = Config(host, port, username, secret, tls, database)
    server = couchdb.Server(cfg.url)
    return repl.Repl(server, cfg).run()
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def _getPromptUsage():
    """Return the usage lines of the ``prompt`` command as a list of strings."""
    usage_lines = ["prompt <principal name>"]
    return usage_lines
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def getActiveEnv(self):
    """
    Return the environment part of what ``PlenumCli.getPromptAndEnv`` yields
    for this CLI's name and current prompt text.
    """
    prompt_and_env = PlenumCli.getPromptAndEnv(self.name, self.currPromptText)
    _prompt_text, active_env = prompt_and_env
    return active_env
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_bash(msg: str, allow_empty: bool) -> str:
    """
    Prompts for bash shell code.

    :param msg: shown message
    :param allow_empty: allow an empty string?
    :return: user input
    """
    from pygments.lexers.shell import BashLexer
    validator = None if allow_empty else NonEmptyValidator()
    # The validator must be handed to prompt(); otherwise allow_empty has no effect.
    return prompt(msg, lexer=PygmentsLexer(BashLexer), completer=SystemCompleter(),
                  validator=validator)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_python(msg: str, get_globals: t.Callable[[str], t.Any], get_locals: t.Callable[[str], t.Any]) -> str:
    """
    Ask for python code using a multiline, mouse-enabled prompt with
    python syntax highlighting and completion.

    :param msg: shown message
    :param get_globals: callable passed to the completer to obtain the global variables
    :param get_locals: callable passed to the completer to obtain the local variables
    :return: user input
    """
    from ptpython.completer import PythonCompleter
    from pygments.lexers.python import Python3Lexer
    completer = PythonCompleter(get_globals, get_locals)
    lexer = PygmentsLexer(Python3Lexer)
    return prompt(msg, multiline=True, mouse_support=True, lexer=lexer, completer=completer)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_perf_stat_exec_dict(run_dict: dict) -> dict:
    """
    Prompt for the config of the perf stat exec runner.

    :param run_dict: run config dict (without the runner part)
    :return: runner config with the keys ``repeat`` and ``properties``
    """
    runner_dict = {}
    default_repeat = PerfStatExecRunner.misc_options["repeat"].get_default()
    runner_dict["repeat"] = int(default_prompt("How many times should perf stat itself repeat the measurement? ",
                                               default=default_repeat, validator=TypeValidator(PositiveInt())))
    default_props = ", ".join(PerfStatExecRunner.misc_options["properties"].get_default())

    class PerfStatPropertiesValidator(Validator):
        """Accepts a comma separated property list only if ``perf stat`` accepts it."""

        def validate(self, document: Document):
            vals = [elem.strip() for elem in document.text.split(",")]
            # Pass an argument list instead of a shell string, so quotes or
            # shell metacharacters in the typed text cannot alter the command.
            cmd = ["perf", "stat", "-x", ";", "-e", ",".join(vals), "--", "/bin/echo"]
            try:
                proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL,
                                        stderr=subprocess.PIPE, universal_newlines=True)
                _, err = proc.communicate()
            except OSError as ex:
                # e.g. perf is not installed: report it as a validation error,
                # as the shell's "not found" message was reported before.
                raise ValidationError(message=str(ex), cursor_position=len(document.text)) from ex
            # Any non-zero exit status is a failure, including negative ones
            # (process terminated by a signal).
            if proc.returncode != 0:
                msg = str(err).split("\n")[0].strip()
                raise ValidationError(message=msg, cursor_position=len(document.text))

    props = prompt("Which properties should perf stat measure? ",
                   validator=PerfStatPropertiesValidator(), default=default_props,
                   completer=WordCompleter(sorted(set(get_av_perf_stat_properties())), ignore_case=False, WORD=True))
    runner_dict["properties"] = [prop.strip() for prop in props.split(",")]
    return runner_dict
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def prompt_spec_exec_dict(run_dict: dict) -> dict:
    """
    Ask for the settings of the spec exec runner.

    :param run_dict: run config dict (without the runner part)
    :return: runner config with the keys ``file``, ``base_path``,
             ``path_regexp`` and ``code``
    """
    result_file = default_prompt("SPEC like result file to use: ",
                                 validator=TypeValidator(FileName()),
                                 completer=PathCompleter())
    base_path = prompt("Property base path: ")
    path_regexp = prompt("Regexp matching the property path for each measured property: ",
                         validator=NonEmptyValidator())

    def get(sub_path: str = ""):  # just a mock, only offered to the completer
        """
        Get the value of the property with the given path.
        :param sub_path: given path relative to the base path
        :return: value of the property
        """

    for hint in ("The python code is entered via a multiline input. ",
                 "Press [Meta+Enter] or [Esc] followed by [Enter] to accept input.",
                 "You can click with the mouse in order to select text.",
                 "Use the get(sub_path: str) -> str function to obtain a properties value."):
        print(hint)

    code = prompt_python("The python is executed for each measured property: \n",
                         lambda: {}, lambda: {"get": get})

    return {
        "file": result_file,
        "base_path": base_path,
        "path_regexp": path_regexp,
        "code": code,
    }
项目:we-get    作者:rachmadaniHaryono    | 项目源码 | 文件源码
def __init__(self):
    """Start with no prompt, parsed arguments or items, and link display off."""
    # prompt, pargs and items are only filled in later.
    self.prompt = self.pargs = self.items = None
    self.show_links = False
项目:we-get    作者:rachmadaniHaryono    | 项目源码 | 文件源码
def shell(self, items, pargs):
    """
    Run the interactive ``we-get`` shell until a command asks to stop.

    :param items: mapping whose keys are offered as completions
    :param pargs: parsed arguments, stored on the instance
    """
    self.pargs = pargs
    self.items = items
    # When the fetching message ends, a \n is needed after its \r.
    stdout.write('\n')
    self.prompt_show_items()
    history = InMemoryHistory()

    while True:
        line = prompt(
            u'we-get > ', history=history,
            auto_suggest=AutoSuggestFromHistory(),
            completer=WGCompleter(list(self.items.keys())),
            style=we_get_prompt_style
        )

        if self.prompt_no_command(line):
            continue

        if self.prompt_is_single_command(line):
            command, args = line, None
        else:
            # First word is the command, the rest is re-joined as its argument string.
            words = line.split()
            command = words[0]
            args = ' '.join(words[1:])

        if not self.prompt_verify_command(command, args):
            continue
        if not self.prompt_parse_command(command, args):
            break
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def ask_user_for_telemetry():
    """
    Ask the user whether telemetry may be collected.

    Keeps prompting until the answer is "yes" or "no" (case-insensitive,
    surrounding whitespace ignored); an empty answer counts as "yes".

    :return: the normalised answer, always exactly ``'yes'`` or ``'no'``
    """
    answer = None
    while answer not in ('yes', 'no'):
        reply = prompt(u'\nDo you agree to sending telemetry (yes/no)? Default answer is yes: ')

        # Normalise before comparing AND before returning, so callers never
        # see variants such as "YES" or " no ".
        answer = reply.strip().lower()
        if answer == '':
            answer = 'yes'

    return answer