Python click 模块,clear() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用click.clear()

项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def resources(cls, jobs_resources):
        """Clear the screen and print one table row of resource usage per job.

        :param jobs_resources: a single resources payload or a collection of
            them; it is normalised through ``to_list`` and each item is parsed
            with ``ContainerResourcesConfig.from_dict``.
        """
        # Divisor applied to the memory figures.
        # NOTE(review): presumably converts bytes to GiB -- confirm the units.
        scale = 1024 ** 3

        payloads = to_list(jobs_resources)
        click.clear()

        rows = [['Job', 'Mem Usage/Limit', 'CPU%-CPUs', 'GPU Mem', 'GPU Usage']]
        for payload in payloads:
            config = ContainerResourcesConfig.from_dict(payload)
            job_id = config.job_uuid.hex
            memory_cell = '{} / {}'.format(config.memory_used / scale,
                                           config.memory_limit / scale)
            cpu_cell = '{} - {}'.format(config.cpu_percentage,
                                        len(config.percpu_percentage))
            row = [job_id, memory_cell, cpu_cell]
            if config.gpu_resources:
                # Placeholder: the GPU columns are declared in the header but
                # nothing is appended for them yet.
                pass
            rows.append(row)

        click.echo(tabulate(rows, headers="firstrow"))
        sys.stdout.flush()
项目:habu    作者:portantier    | 项目源码 | 文件源码
def procpkt(pkt):
    """Record the sender of an ARP packet and redraw the list of seen hosts.

    Packets without an ARP layer are ignored. For ARP packets the sender's
    entry in the module-level ``hosts`` mapping is replaced, the screen is
    cleared and every known host is printed as a tab-separated line, sorted
    by IP key.

    :param pkt: captured packet; indexed with ``ARP`` to reach the ARP layer.
    """
    now = time()
    output = '{seconds}\t{ip}\t{hwaddr}\t{vendor}'

    if 'ARP' not in pkt:
        return

    arp_layer = pkt[ARP]

    # Start from a fresh record so stale fields never survive an update.
    record = {}
    hosts[arp_layer.psrc] = record
    record['hwaddr'] = arp_layer.hwsrc
    record['vendor'] = mac2vendor.get_comment(arp_layer.hwsrc)
    record['time'] = time()

    click.clear()

    for address in sorted(hosts):
        details = hosts[address]
        print(output.format(
            seconds=int(now - details['time']),
            ip=address,
            hwaddr=details['hwaddr'],
            vendor=details['vendor'],
        ))
项目:valohai-cli    作者:valohai    | 项目源码 | 文件源码
def draw(self):
        """Build the full-screen layout for the execution and paint it.

        The layout holds the header, the stat row and a divider, followed by
        as many of the most recent events as fit in the remaining height.
        The screen is cleared only once the layout is complete.
        """
        execution = self.log_manager.execution
        events = self.events

        layout = Layout()
        layout.add(self.get_header_flex(execution))
        layout.add(self.get_stat_flex(execution))
        layout.add(Divider('='))

        # Rows still free after the fixed rows, minus a margin of two.
        available_height = layout.height - len(layout.rows) - 2
        visible_events = events[-available_height:] if available_height > 0 else []

        for event in visible_events:
            row = Flex(style=stream_styles.get(event['stream']))
            # Keep only the part after the 'T' separator, minus its last four
            # characters, followed by two spaces of padding.
            timestamp = event['time'].split('T')[1][:-4] + '  '
            row = row.add(timestamp, flex=0)
            row = row.add(event['message'], flex=4)
            layout.add(row)

        click.clear()
        layout.draw()
项目:defplorex    作者:trendmicro    | 项目源码 | 文件源码
def monitor(index, delta, query_string):
    """Poll an index every five seconds and show progress on a bar.

    :param index: name of the index to query.
    :param delta: when true, progress is the initial number of matching
        records minus the current number of matching records; when false,
        progress is the current number of matching records and the bar's
        maximum is the total size of the index.
    :param query_string: query-string expression used to count records.

    The loop runs until interrupted (for example with CTRL-C). The progress
    bar is always finalised on the way out, and the interrupting exception
    still propagates to the caller.
    """
    click.clear()

    def cnt():
        """Return how many records in ``index`` match ``query_string``."""
        q = Q('query_string', query=query_string)
        s = Search(
                using=es.client,
                index=index).query(q)
        return s.count()

    N = cnt()
    tot = Search(using=es.client, index=index).count()

    if not delta:
        N = tot

    log.info('Processing %d records (total: %d)', N, tot)

    click.echo('You can exit by CTRL-C: results will still process')

    bar = SlowOverallFancyBar('', max=N, grand_total=tot)
    try:
        while True:
            time.sleep(5.0)
            try:
                n = cnt()
                if isinstance(n, int):
                    if delta:
                        done = N - n
                    else:
                        done = n
                    bar.goto(done)
            except Exception as e:
                # Best effort: a failed count must not stop the monitor.
                log.warning('Cannot count: %s', e)
    finally:
        # The loop only ends through an exception, so this is the only place
        # where the bar can be finalised.
        bar.finish()
项目:defplorex    作者:trendmicro    | 项目源码 | 文件源码
def delete_index(index):
    """Delete an index after the user explicitly confirms.

    :param index: name of the index to delete.

    Declining the confirmation aborts (``abort=True``), so the delete call
    is only reached after a positive answer.
    """
    click.clear()

    question = click.style(
        'Really DELETE index "%s"?' % index, fg='white', bg='red')
    click.confirm(question, abort=True)

    es.client.indices.delete(index=index)

    log.info('Index deleted')
项目:defplorex    作者:trendmicro    | 项目源码 | 文件源码
def clone_index(use_helper, from_index, to_index):
    """Clone an index.

    :param use_helper: when true, copy with ``elasticsearch.helpers.reindex``;
        otherwise submit a reindex request through the client without
        waiting for completion.
    :param from_index: name of the index to copy from.
    :param to_index: name of the index to copy into.
    :returns: ``1`` when ``to_index`` does not exist, otherwise ``None``.

    The user is asked for confirmation before anything is written; declining
    aborts.
    """
    from elasticsearch_dsl import Search
    from elasticsearch.helpers import reindex

    click.clear()

    # NOTE(review): the destination index is required to exist beforehand --
    # confirm this is intended rather than a check on ``from_index``.
    if not es.client.indices.exists(index=to_index):
        # printf-style placeholder needs the % operator; str.format would
        # leave the literal "%s" in the message.
        click.secho('%s not existing!' % to_index, fg='red')
        return 1

    cnt = Search(using=es.client, index=to_index).count()
    message = 'Index %s already exists (%d records). Overwrite?' % (
            to_index, cnt)

    click.confirm(message, abort=True)

    if use_helper:
        reindex(
                client=es.client,
                source_index=from_index,
                target_index=to_index)
    else:
        es.client.reindex(
                body=dict(
                    source=dict(index=from_index),
                    dest=dict(index=to_index)),
                wait_for_completion=False)
项目:defplorex    作者:trendmicro    | 项目源码 | 文件源码
def monitor_clone_index(from_index, to_index):
    """Monitor the size of an index while it is being filled.

    :param from_index: index whose record count is used as the bar maximum.
    :param to_index: index whose record count is polled every two seconds.

    The loop runs until interrupted. The progress bar is always finalised on
    the way out, and the interrupting exception still propagates.
    """
    from elasticsearch_dsl import Search

    click.clear()

    cnt = Search(using=es.client, index=from_index).count()

    bar = SlowFancyBar('', max=cnt)
    try:
        while True:
            time.sleep(2.0)
            _cnt = Search(using=es.client, index=to_index).count()
            bar.goto(_cnt)
    finally:
        # The loop only ends through an exception, so finalise the bar here.
        bar.finish()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def monitor(app):
    """Set up application monitoring.

    Opens the Heroku dashboard and the MTurk HIT management page in the
    browser, opens the app logs and the database URI, then reprints the
    experiment summary every ten seconds while ``_keep_running()`` is true.

    :param app: experiment identifier, passed to ``HerokuApp`` as
        ``dallinger_uid`` and to ``get_summary``.
    """
    heroku_app = HerokuApp(dallinger_uid=app)

    pages = (
        heroku_app.dashboard_url,
        "https://requester.mturk.com/mturk/manageHITs",
    )
    for page in pages:
        webbrowser.open(page)

    heroku_app.open_logs()
    check_call(["open", heroku_app.db_uri])

    while _keep_running():
        # Fetch first so the screen is cleared only once new data is ready.
        summary = get_summary(app)
        click.clear()
        sections = (header, "\nExperiment {}\n".format(app), summary)
        for section in sections:
            click.echo(section)
        time.sleep(10)
项目:flashcards    作者:zergov    | 项目源码 | 文件源码
def start(self, cards):
        """
        Run a study session over the given cards.

        The number of cards is stored in ``question_num`` and the running
        counter ``question_count`` is reset to 1. Each card is then presented
        on a freshly cleared screen: question first, answer second.

        :param cards: sized collection of cards; each card exposes
            ``question`` and ``answer``.
        """
        # The counter is 1-based because it is meant for display.
        self.question_num, self.question_count = len(cards), 1

        for flashcard in cards:
            click.clear()
            self.show_question(flashcard.question)
            self.show_answer(flashcard.answer)
项目:CrickFev    作者:sipah00    | 项目源码 | 文件源码
def main(ch):
    """Clear the terminal screen, then pass ``ch`` on to ``getScore``.

    :param ch: forwarded unchanged to ``getScore``; its meaning is defined
        there and is not visible in this block.
    """
    click.clear()
    getScore(ch)
项目:CrickFev    作者:sipah00    | 项目源码 | 文件源码
def getLiveEvents():
    """Clear the screen and print a numbered list of the live events.

    Reads the module-level ``tableHeads`` sequence and prints the ``h2`` text
    of each entry, numbered from 1.
    """
    click.clear()
    click.secho("Events Going On Live Right Now", bold=True, fg='green')
    for position, head in enumerate(tableHeads, 1):
        entry = "\t" + str(position) + "| " + str(head.h2.text)
        click.secho(entry, fg='cyan')
项目:spoppy    作者:sindrig    | 项目源码 | 文件源码
def navigate_to(self, going):
        """Run the menu loop for ``going`` until the user leaves it.

        On every pass the screen is cleared, the header and the UI of
        ``going`` are printed, and a response is read from ``going``.
        Nested screens are handled by calling this method recursively.

        :param going: screen object to show; this method uses its
            ``initialize()``, ``get_ui()`` and ``get_response()``.
        """
        logger.debug('navigating to: %s' % going)
        self.session.process_events()
        going.initialize()
        while self.navigating:
            self.check_spotipy_me()
            click.clear()
            self.print_header()
            self.print_menu(going.get_ui())
            response = going.get_response()
            if callable(response):
                # A callable response is evaluated to get the real response.
                response = response()
                logger.debug('Got response %s after evaluation' % response)
            if response == responses.QUIT:
                click.clear()
                click.echo('Thanks, bye!')
                # Clearing the flag also ends the loops of any enclosing
                # navigate_to calls once control returns to them.
                self.navigating = False
                return
            elif response == responses.UP:
                # Leave this screen; the caller's loop takes over again.
                break
            elif response == responses.NOOP:
                # Redraw without re-initializing ``going``.
                continue
            elif response == responses.PLAYER:
                self.navigate_to(self.player)
            elif response != going:
                # Any other response is treated as the next screen to show.
                self.navigate_to(response)
            # This happens when the `going` instance gets control again. We
            # don't want to remember the query and we want to rebuild the
            # menu's options
            # (and possibly something else?)
            going.initialize()
项目:pyFootball    作者:JustnAugr    | 项目源码 | 文件源码
def main(user, flag):
    """Show the manager's office prompt and dispatch the chosen action.

    Stores ``user`` and the user's league in the module globals ``_user``
    and ``_league``, then prompts for one of the office commands.

    :param user: the manager object; provides ``get_league()`` and
        ``get_team()``.
    :param flag: when true, the screen is cleared and the office banner is
        printed before prompting.
    """
    global _user, _league
    _user = user
    _league = user.get_league()

    if flag:
        click.clear()
        team_name = _user.get_team().get_name()
        print('%s Managerial office' % team_name)
        print('-' * 39)

    commands = ['help','inbox','schedule','squad','standings','personal', 'save', 'exit']
    nav = click.prompt('What would you like to do?', type=click.Choice(commands))

    if nav == 'exit':
        # Nothing to do: returning ends the office loop.
        return

    if nav == 'help':
        print('\nWhile in your office, you have a variety of resources available to you. You may:\n\n'
                'inbox      :  access any new mail you\'ve received, whether they be newsletters, injury news, player communications, or transfer offers\n'
                'schedule   :  take a look at upcoming games and past results of both your team and other teams in the league\n'
                'squad      :  check on how your players are doing, including their stats based on recent training sessions\n'
                'standings  :  see how your team ranks up on the table, along with other useful information and stats\n'
                'personal   :  see your own personal stats and information\n'
                'save       :  save your in-game progress\n'
                'exit       :  exit out of the game, although why would you do that?\n')
        # Prompt again without clearing, so the help text stays visible.
        main(_user, False)
        return

    if nav == 'inbox':
        inbox()
    elif nav == 'schedule':
        schedule()
    elif nav == 'squad':
        squad()
    elif nav == 'standings':
        standings()
    elif nav == 'personal':
        personal()
    elif nav == 'save':
        save()
项目:pyFootball    作者:JustnAugr    | 项目源码 | 文件源码
def inbox():
    """Clear the terminal screen.

    NOTE(review): only the screen clear is visible in this excerpt; the
    function may be truncated or still a stub -- confirm against the
    project source.
    """
    click.clear()
项目:pyFootball    作者:JustnAugr    | 项目源码 | 文件源码
def new_game():
    """Create the manager for a new game and list the teams to choose from.

    Prompts for the manager's name (with confirmation), stores the new
    ``gen.Manager`` in the module global ``user``, prints the first ten
    teams of the league numbered from 1, then hands over to
    ``team_selection()``.
    """
    global user

    manager_name = click.prompt(
        '\nCongratulations on becoming a manager! What is your name?',
        confirmation_prompt=True)
    user = gen.Manager(manager_name)

    print('\n')
    click.clear()
    print('Here are the list of teams in the league:')

    for number in range(1, 11):
        team = user.get_league().get_team_list()[number - 1]
        print('%s) %s' % (str(number), team.get_name()))

    team_selection()
项目:kube-shell    作者:cloudnativelabs    | 项目源码 | 文件源码
def run_cli(self):
        """Run the interactive kube-shell prompt loop until the user exits.

        On every pass the kubeconfig is re-parsed (failures are logged and
        ignored), the completer namespace is refreshed, and one line is read
        from the prompt. ``clear`` clears the screen, ``exit`` or
        EOF/CTRL-C terminates the process, a leading ``!`` is stripped, and
        any non-empty input is then executed through the system shell.
        """

        def get_title():
            """Return the terminal title shown while the prompt is active."""
            return "kube-shell"

        logger.info("running kube-shell event loop")
        if not os.path.exists(os.path.expanduser("~/.kube/config")):
            click.secho('Kube-shell uses ~/.kube/config for server side completion. Could not find ~/.kube/config. '
                    'Server side completion functionality may not work.', fg='red', blink=True, bold=True)
        while True:
            global inline_help
            try:
                Kubeshell.clustername, Kubeshell.user, Kubeshell.namespace = KubeConfig.parse_kubeconfig()
            except Exception:
                # Best effort: keep the shell usable with a broken kubeconfig.
                # Catching Exception (not a bare except) lets SystemExit and
                # KeyboardInterrupt through. The traceback comes from
                # exc_info, so the message needs no placeholder.
                logger.error("unable to parse ~/.kube/config", exc_info=1)
            completer.set_namespace(self.namespace)

            try:
                user_input = prompt('kube-shell> ',
                            history=self.history,
                            auto_suggest=AutoSuggestFromHistory(),
                            style=StyleFactory("vim").style,
                            lexer=KubectlLexer,
                            get_title=get_title,
                            enable_history_search=False,
                            get_bottom_toolbar_tokens=self.toolbar.handler,
                            vi_mode=True,
                            key_bindings_registry=registry,
                            completer=completer)
            except (EOFError, KeyboardInterrupt):
                sys.exit()

            if user_input == "clear":
                click.clear()
            elif user_input == "exit":
                sys.exit()

            # if execute shell command then strip "!"
            if user_input.startswith("!"):
                user_input = user_input[1:]

            if user_input:
                # Colourise JSON output by piping it through pygmentize.
                if '-o' in user_input and 'json' in user_input:
                    user_input += ' | pygmentize -l json'
                # NOTE(review): shell=True on typed input is the purpose of
                # this shell; do not feed this path untrusted input.
                p = subprocess.Popen(user_input, shell=True)
                p.communicate()
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def watch(limit):
    """watch scan rates across the cluster

    Every ``period`` seconds a fresh snapshot is taken from ``db.db()`` and
    compared with the previous one. Buckets whose scanned count changed (or
    that are new) are listed with their scan rate over the period, sorted by
    that rate in descending order, below a ``totals`` row.

    :param limit: when truthy, only the first ``limit`` buckets are shown
        (the totals row is always added on top).

    The loop never returns; it ends only when interrupted.
    """
    period = 5.0
    prev = db.db()
    prev_totals = None

    while True:
        time.sleep(period)
        cur = db.db()
        # Per-bucket rate over the last period, keyed by bucket id.
        # NOTE(review): the writes below go through ``b.data`` -- presumably
        # the same mapping as ``cur.data``; confirm.
        cur.data['gkrate'] = {}
        progress = []
        prev_buckets = {b.bucket_id: b for b in prev.buckets()}

        totals = {'scanned': 0, 'krate': 0, 'lrate': 0, 'bucket_id': 'totals'}

        for b in cur.buckets():
            if not b.scanned:
                continue

            totals['scanned'] += b.scanned
            totals['krate'] += b.krate
            totals['lrate'] += b.lrate

            if b.bucket_id not in prev_buckets:
                # New bucket: everything scanned so far counts for this period.
                b.data['gkrate'][b.bucket_id] = b.scanned / period
            elif b.scanned == prev_buckets[b.bucket_id].scanned:
                # No progress since the last snapshot: leave it out.
                continue
            else:
                b.data['gkrate'][b.bucket_id] =  (
                    b.scanned - prev_buckets[b.bucket_id].scanned) / period
            progress.append(b)

        if prev_totals is None:
            # No earlier totals to compare against on the first pass.
            totals['gkrate'] = '...'
        else:
            totals['gkrate'] = (totals['scanned'] - prev_totals['scanned']) / period
        prev = cur
        prev_totals = totals

        progress = sorted(progress, key=lambda x: x.gkrate, reverse=True)

        if limit:
            progress = progress[:limit]

        progress.insert(0, utils.Bag(totals))
        # Clear right before drawing so the table stays on screen for the
        # whole sleep; clearing at the top of the loop erased it at once.
        click.clear()
        format_plain(
            progress, None,
            explicit_only=True,
            keys=['bucket_id', 'scanned', 'gkrate', 'lrate', 'krate'])