Python click 模块,echo_via_pager() 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用click.echo_via_pager()

项目:freckles    作者:makkus    | 项目源码 | 文件源码
def output(python_object, format="raw", pager=False):
    if format == 'yaml':
        output_string = yaml.safe_dump(python_object, default_flow_style=False, encoding='utf-8', allow_unicode=True)
    elif format == 'json':
        output_string = json.dumps(python_object, sort_keys=4, indent=4)
    elif format == 'raw':
        output_string = str(python_object)
    elif format == 'pformat':
        output_string = pprint.pformat(python_object)
    else:
        raise Exception("No valid output format provided. Supported: 'yaml', 'json', 'raw', 'pformat'")

    if pager:
        click.echo_via_pager(output_string)
    else:
        click.echo(output_string)
项目:http-prompt    作者:eliangcs    | 项目源码 | 文件源码
def write(self, data):
        if isinstance(data, six.binary_type):
            data = data.decode('utf-8')

        # echo_via_pager() already appends a '\n' at the end of text,
        # so we use rstrip() to remove extra newlines (#89)
        click.echo_via_pager(data.rstrip())
项目:EasyEuler    作者:Encrylize    | 项目源码 | 文件源码
def cli(sort):
    """ Lists all available problems. """

    problems = sorted(data.problems, key=lambda problem: problem[sort.lower()])
    problem_list = ((problem['id'], problem['name'],
                     '%d%%' % problem['difficulty']) for problem in problems)

    table = tabulate(problem_list, TABLE_HEADERS, tablefmt='fancy_grid')
    click.echo_via_pager(table)
项目:grab-screen    作者:andrei-shabanski    | 项目源码 | 文件源码
def config_list():
    lines = []

    for option in config:
        value = config.get(option)
        lines.append('{} = {}'.format(option, value))

    output = '\n'.join(lines)
    click.echo_via_pager(output)
项目:inspire-crawler    作者:inspirehep    | 项目源码 | 文件源码
def _show_file(file_path, header_name='Shown'):
    if file_path.startswith('file:/'):
        file_path = file_path[6:]

    click.secho("%s file: %s" % (header_name, file_path), fg='blue')
    if not os.path.exists(file_path):
        click.secho('    The file does not exist', fg='yellow')
    else:
        with open(file_path) as fd:
            click.echo_via_pager(fd.read())
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def log(ctx, execution_id, client):
    """Get execution log (plain text from ansible-playbook) for a certain
    execution."""

    response = client.get_execution_log(execution_id,
                                        headers={"Content-Type": "text/plain"},
                                        raw_response=True, stream=True)
    if ctx.obj["pager"]:
        click.echo_via_pager(response.text)
    else:
        for line in response.iter_lines(decode_unicode=True):
            click.echo(line)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def format_output_json(ctx, response, error=False):
    response = json_dumps(response)
    response = colorize(response, ctx.obj["color"], "json")

    if error:
        click.echo(response, err=True)
    elif ctx.obj["pager"]:
        click.echo_via_pager(response)
    else:
        click.echo(response)
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def build_table(self, view_entries, limit, pager, format_method,
                    build_urls=True, print_output=True):
        """Build the table used for the gh view command.

        :type view_entries: list
        :param view_entries: A list of `github3` items.

        :type limit: int
        :param limit: Determines the number of items to show.

        :type format_method: callable
        :param format_method: A method called to format each item in the table.

        :type build_urls: bool
        :param build_urls: Determines whether to build urls for the
                gh view # command.

        :type print_output: bool
        :param print_output: determines whether to print the output
                (True) or return the output as a string (False).

        :rtype: str
        :return: the output if print_output is True, else, return None.
        """
        if build_urls:
            self.build_table_urls(view_entries)
        index = 0
        output = ''
        for view_entry in view_entries:
            index += 1
            view_entry.index = index
            output += format_method(view_entry) + '\n'
            if index >= limit:
                break
        if build_urls:
            if len(view_entries) > limit:
                output += click.style(('       <Hiding ' +
                                       str(len(view_entries) - limit) +
                                       ' item(s) with the -l/--limit flag>\n'),
                                      fg=self.config.clr_message)
        if index == 0:
            output += click.style('No results found',
                                  fg=self.config.clr_message)
        elif build_urls:
            output += click.style(self.create_tip(index))
        else:
            output += click.style('')
        if print_output:
            if pager:
                color = None
                if platform.system() == 'Windows':
                    color = True
                    # Strip out Unicode, which seems to have issues on
                    # Windows with click.echo_via_pager.
                    output = re.sub(r'[^\x00-\x7F]+', '', output)
                click.echo_via_pager(output, color)
            else:
                click.secho(output)
            return None
        else:
            return output
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def generate_url_contents(self, url):
        """Generate the formatted contents of the given item's url.

        Converts the HTML to text using HTML2Text, colors it, then displays
            the output in a pager.

        :type url: str
        :param url: The url whose contents to fetch.

        :rtype: str
        :return: The string representation of the formatted url contents.
        """
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}  # NOQA
            raw_response = requests.get(url, headers=headers,
                                        verify=self.config.verify_ssl)
        except (requests.exceptions.SSLError,
                requests.exceptions.ConnectionError) as e:
            contents = 'Error: ' + str(e) + '\n'
            contents += 'Try running gh view # with the --browser/-b flag\n'
            return contents
        contents = self.html_to_text.handle(raw_response.text)
        # Strip out Unicode, which seems to have issues when html2txt is
        # coupled with click.echo_via_pager.
        contents = re.sub(r'[^\x00-\x7F]+', '', contents)
        contents = self.format_markdown(contents)
        return contents
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def view_url(self, url):
        """View the given url.

        :type index: int
        :param index: The index for the given item, used with the
            gh view [index] commend.

        :type url: str
        :param url: The url to view
        """
        contents = self.generate_url_contents(url)
        header = click.style('Viewing ' + url + '\n\n',
                             fg=self.config.clr_message)
        contents = header + contents
        contents += click.style(('\nView this article in a browser with'
                                 ' the -b/--browser flag.\n'),
                                fg=self.config.clr_message)
        contents += click.style(('\nPress q to quit viewing this '
                                 'article.\n'),
                                fg=self.config.clr_message)
        if contents == '{"error":"Not Found"}\n':
            click.secho('Invalid user/repo combination.')
            return
        color = None
        if platform.system() == 'Windows':
            color = True
        click.echo_via_pager(contents, color)
项目:tempy    作者:Dascr32    | 项目源码 | 文件源码
def delete(config, a, se):
    """
    Deletes all the directory content (files, dirs)
    """
    if a and click.confirm("Delete all contents of " + config.dir_to_use + " ?"):
        click.echo("Attempting to delete: " + str(analyzer.get_entries_count()) + " entries...\n")

        cleaner.delete_dir_content(config.dir_to_use)
        filemanager.write_cleanup_report(cleaner.cleanup_data, config.app_dir)
        filemanager.pickle_data("last-cleanup", cleaner.cleanup_data, config.app_dir)  # Make clean up data persistent

        click.echo("\nDeletion complete!")
        click.echo("* Deletions: " + str(cleaner.cleanup_data["deletions"]))
        click.echo("* Deletion size: " + converter.human_readable_size(cleaner.cleanup_data["size"]))
        click.echo("* Errors: " + str(cleaner.cleanup_data["error_count"]))

    if se:
        try:
            last_cleanup = filemanager.unpickle_data("last-cleanup")

            click.echo("Errors encountered during the last deletion [" + last_cleanup["datetime"] + "]:")
            click.echo("Total: " + str(last_cleanup["error_count"]) + "\n")
            click.echo_via_pager("\n\n".join("* %s" % error
                                             for error in last_cleanup["errors"]))
        except FileNotFoundError:
            click.echo("No error data was found.")
项目:bootcamp_cli    作者:jokamjohn    | 项目源码 | 文件源码
def get_news_articles(source):
    data = requests.get(
        "https://newsapi.org/v1/articles?source=" + source + "&apiKey=ba4d03d4a6f84f10962a79fd977e43b2")
    if data.status_code == 200:
        news_json = data.json()
        articles = news_json["articles"]
        for article in articles:
            author = article['author']
            title = article['title']
            description = article['description']

            click.echo_via_pager(" TITLE: {} \n DESCRIPTION: {} \n AUTHOR: {}".format(title, description, author))
            click.secho("=" * 130, fg='blue')
    else:
        click.echo("An error occurred try again later")
项目:cuvner    作者:meejah    | 项目源码 | 文件源码
def readme():
    """
    View the README
    """
    click.echo_via_pager(
        highlight(
            pkg_resources.resource_string("cuv", "README.rst"),
            get_lexer_by_name('rst'),
            formatter=TerminalFormatter(),
        )
    )
项目:cuvner    作者:meejah    | 项目源码 | 文件源码
def __exit__(self, a, b, c):
        msg = ''.join(self._lines)
        click.echo_via_pager(msg, color=True)
项目:cuvner    作者:meejah    | 项目源码 | 文件源码
def paged_echo():
    """
    Unfortunately, to use .echo_via_pager() in Click, you have to feed
    it all of the lines at once.

    This returns a context-manager that lets you incrementally call
    '.echo' (same as 'click.echo') incrementally, only outputting (to
    the pager) when the context closes.
    """
    return _PagedEcho()
项目:databrewer    作者:rmax    | 项目源码 | 文件源码
def _echo(output, min_lines=10):
    ctx = click.get_current_context()
    if ctx.obj.get('use_pager') and output.count('\n') > min_lines:
        _func = click.echo_via_pager
    else:
        _func = click.echo
    _func(output, sys.stdout)
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def pager(self, text, end=None):
        return echo_via_pager(text)
项目:qypi    作者:jwodder    | 项目源码 | 文件源码
def readme(packages):
    """
    View packages' long descriptions.

    If stdout is a terminal, the descriptions are passed to a pager program
    (e.g., `less(1)`).

    Packages can be specified as either ``packagename`` to show the latest
    version or as ``packagename==version`` to show the long description for
    ``version``.
    """
    for pkg in packages:
        click.echo_via_pager(pkg["info"]["description"])
项目:ecxclient    作者:catalogicsoftware    | 项目源码 | 文件源码
def list(ctx, **kwargs):
    jobs = JobAPI(ecx_session=ctx.ecx_session).list()
    if ctx.json:
        ctx.print_response(jobs)
        return

    job_table_info = [(x['name'], x['id'], x['status'], format_last_run_time(x['lastRunTime'])) for x in jobs]
    if not job_table_info:
        return

    print
    click.echo_via_pager(tabulate(job_table_info, headers=["Name","ID", "Status", "Last run"]))
    print
项目:ecxclient    作者:catalogicsoftware    | 项目源码 | 文件源码
def list(ctx, **kwargs):
    resp = ctx.ecx_session.get(restype=ctx.restype, endpoint=ctx.endpoint)

    list_field = kwargs.get('listfield') or resource_to_listfield.get(ctx.restype) or (ctx.restype + 's')

    if ctx.json or list_field not in resp:
        ctx.print_response(resp)
        return

    resources = resp[list_field]
    if kwargs['fields']:
        fields = [x.strip() for x in kwargs['fields'].split(',')]
    else:
        fields = ["name", "id"]

    table_data = []
    for res in resources:
        row = []
        for field in fields:
            row.append(res.get(field, None))

        table_data.append(row)

    if not table_data:
        return

    print
    click.echo_via_pager(tabulate(table_data, headers=fields))
    print
项目:ecxclient    作者:catalogicsoftware    | 项目源码 | 文件源码
def usedby(ctx, id, **kwargs):
    resp = AssociationAPI(ecx_session=ctx.ecx_session).get_using_resources(ctx.restype, id)["resources"]
    if ctx.json:
        ctx.print_response(resp)
        return

    table_data = [(x["type"], x["resourceId"], x["name"]) for x in resp]

    print
    click.echo_via_pager(tabulate(table_data, headers=["Type", "ID", "Name"]))
    print
项目:ecxclient    作者:catalogicsoftware    | 项目源码 | 文件源码
def print_response(self, resp):
        if not self.links:
            remove_links(resp)

        click.echo_via_pager(json.dumps(resp, indent=4))
项目:ghutil    作者:jwodder    | 项目源码 | 文件源码
def cli(issue, since):
    """ View comments on an issue/PR """
    output = show_comment(issue.data)
    for comment in issue.comments.get(params={"since": since}):
        output += '\n' + show_comment(comment)
    # echo_via_pager adds a newline, so remove the "extra" newline at the end
    click.echo_via_pager(output.rstrip('\r\n'))
项目:ghutil    作者:jwodder    | 项目源码 | 文件源码
def cli(issue, since):
    """ View comments on an issue/PR """
    output = show_comment(issue.data)
    for comment in issue.comments.get(params={"since": since}):
        output += '\n' + show_comment(comment)
    # echo_via_pager adds a newline, so remove the "extra" newline at the end
    click.echo_via_pager(output.rstrip('\r\n'))
项目:DataFS    作者:ClimateImpactLab    | 项目源码 | 文件源码
def log(self):

        history = self.get_history()

        outputs = []

        for i, record in enumerate(history):
            output = ''

            record['timestamp'] = time.strftime(
                '%a, %d %b %Y %H:%M:%S +0000',
                time.strptime(record['updated'], '%Y%m%d-%H%M%S'))

            checksum = (
                ' ({algorithm} {checksum})\nDate:      {timestamp}'.format(
                    **record))

            if self.versioned:
                output = output + '\n' + (
                    click.style(
                        'version {}'.format(record['version']) + checksum,
                        fg='green'))

            else:
                output = output + '\n' + (
                    click.style(
                        'update {}'.format(len(history) - i) + checksum,
                        fg='green'))

            for attr, val in sorted(record['user_config'].items()):
                output = output + '\n' + (
                    '{:<10} {}'.format(attr+':', val))

            if record.get('message', None) is not None:
                wrapper = textwrap.TextWrapper(
                    initial_indent='    ',
                    subsequent_indent='    ',
                    width=66)

                output = output + '\n\n'
                output = output + '\n'.join(
                    wrapper.wrap(str(record['message'])))

            outputs.append(output)

        click.echo_via_pager('\n\n'.join(reversed(outputs)) + '\n')
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def emit(self, record):
        """ Echos the record.msg string by using click.echo()

            The record will have attributes set to it when a user logs a message
            with any kwargs given. This function looks for any attributes that
            are in ECHO_KWARGS. If record has any kwargs attributes, the record will
            be echoed with the given kwargs. This makes click logging with a logger very easy.

            A user can add {"pager": True} as an extra param to any of the log commands to print
            the content to a pager.

        Args:
            record (logging.LogRecord): record which gets echoed with click.echo()
        """
        try:
            # first format the record which adds the click style
            formatted_record = self.format(record)

            # user wants to use a pager to show message
            if hasattr(record, "pager") and record.pager:
                # save the original eviron dict
                original_env = dict(os.environ)

                # Force system to use pager and add default prompt at bottom left of the screen
                os.environ['PAGER'] = "less"
                os.environ['LESS'] = LESS_ENV
                try:
                    click.echo_via_pager(formatted_record.msg,
                                         color=record.color if hasattr(record, "color") else None)

                # being paranoid here because we do NOT want to mess with people's environment
                except:
                    os.environ.clear()
                    os.environ.update(original_env)
                    raise
                else:
                    os.environ.clear()
                    os.environ.update(original_env)
            else:
                # Sets the default kwargs
                kwargs = dict()

                # if user added a known kwarg, change the defaults
                for kwarg_name in self.ECHO_KWARGS:
                    if hasattr(record, kwarg_name):
                        kwargs[kwarg_name] = getattr(record, kwarg_name)

                # echo to console
                click.echo(formatted_record.msg, **kwargs)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def timeline(ctx, pager, limit, twtfile, sorting, timeout, porcelain, source, cache, force_update):
    """Retrieve your personal timeline."""
    if source:
        source_obj = ctx.obj["conf"].get_source_by_nick(source)
        if not source_obj:
            logger.debug("Not following {0}, trying as URL".format(source))
            source_obj = Source(source, source)
        sources = [source_obj]
    else:
        sources = ctx.obj["conf"].following

    tweets = []

    if cache:
        try:
            with Cache.discover(update_interval=ctx.obj["conf"].timeline_update_interval) as cache:
                force_update = force_update or not cache.is_valid
                if force_update:
                    tweets = get_remote_tweets(sources, limit, timeout, cache)
                else:
                    logger.debug("Multiple calls to 'timeline' within {0} seconds. Skipping update".format(
                        cache.update_interval))
                    # Behold, almighty list comprehensions! (I might have gone overboard here…)
                    tweets = list(chain.from_iterable([cache.get_tweets(source.url) for source in sources]))
        except OSError as e:
            logger.debug(e)
            tweets = get_remote_tweets(sources, limit, timeout)
    else:
        tweets = get_remote_tweets(sources, limit, timeout)

    if twtfile and not source:
        source = Source(ctx.obj["conf"].nick, ctx.obj["conf"].twturl, file=twtfile)
        tweets.extend(get_local_tweets(source, limit))

    if not tweets:
        return

    tweets = sort_and_truncate_tweets(tweets, sorting, limit)

    if pager:
        click.echo_via_pager(style_timeline(tweets, porcelain))
    else:
        click.echo(style_timeline(tweets, porcelain))