Python click 模块,style() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用click.style()

项目:hlsclt    作者:benjmarshall    | 项目源码 | 文件源码
def print_project_status(ctx):
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    project_status = gather_project_status(ctx)
    # Print out a 'pretty' message showing project status, first up some project details
    click.secho("Project Details", bold=True)
    click.echo("  Project Name: " + config["project_name"])
    click.echo("  Number of solutions generated: " + str(solution_num))
    click.echo("  Latest Solution Folder: '" + config["project_name"] + "/solution" + str(solution_num) + "'")
    click.echo("  Language Choice: " + config["language"])
    # And now details about what builds have been run/are passing.
    # This section uses lots (too many!) 'conditional expressions' to embed formatting into the output.
    click.secho("Build Status", bold=True)
    click.echo("  C Simulation: " + (click.style("Pass", fg='green') if "csim_pass" in project_status else (click.style("Fail", fg='red') if "csim_fail" in project_status else (click.style("Run (Can't get status)", fg='yellow') if "csim_done" in project_status else click.style("Not Run", fg='yellow')))))
    click.echo("  C Synthesis:  " + (click.style("Run", fg='green') if "syn_done" in project_status else click.style("Not Run", fg='yellow')))
    click.echo("  Cosimulation: " + (click.style("Pass", fg='green') if "cosim_pass" in project_status else (click.style("Fail", fg='red') if "cosim_fail" in project_status else (click.style("Run (Can't get status)", fg='yellow') if "cosim_done" in project_status else click.style("Not Run", fg='yellow')))))
    click.echo("  Export:" )
    click.echo("    IP Catalog:        " + (click.style("Run", fg='green') if "export_ip_done" in project_status else click.style("Not Run", fg='yellow')))
    click.echo("    System Generator:  " + (click.style("Run", fg='green') if "export_sysgen_done" in project_status else click.style("Not Run", fg='yellow')))
    click.echo("    Export Evaluation: " + (click.style("Run", fg='green') if "evaluate_done" in project_status else click.style("Not Run", fg='yellow')))

### Click Command Definitions ###
# Report Command
项目:bottery    作者:rougeth    | 项目源码 | 文件源码
def run(self):
        click.echo('{now}\n{bottery} version {version}'.format(
            now=datetime.now().strftime('%B %m, %Y -  %H:%M:%S'),
            bottery=click.style('Bottery', fg='green'),
            version=bottery.__version__
        ))

        self.loop.run_until_complete(self.configure())

        if self._server is not None:
            handler = self.server.make_handler()
            setup_server = self.loop.create_server(handler, '0.0.0.0', 7000)
            self.loop.run_until_complete(setup_server)
            click.echo('Server running at http://localhost:7000')

        if not self.tasks:
            click.secho('No tasks found.', fg='red')
            self.stop()
            sys.exit(1)

        for task in self.tasks:
            self.loop.create_task(task())

        click.echo('Quit the bot with CONTROL-C')
        self.loop.run_forever()
项目:dlcli    作者:outlyerapp    | 项目源码 | 文件源码
def metrics(period, resolution, tag, threads, metricsfile):
    try:
        pool = Pool(processes=threads)
        period_seconds = period * 3600
        resolution_seconds = resolution * 3600

        if metricsfile:
            with open(metricsfile) as fp:
                m = json.loads(fp.read().replace('][', ','))
        else:
            m = metrics_api.get_tag_metrics(tag_name=tag, **context.settings)

        click.echo(click.style('Found: %s metrics', fg='green') % (len(m)))

        expire = partial(_expire_metric_path, period_seconds, resolution_seconds, tag)
        expired_paths = tqdm(pool.imap_unordered(expire, m))
        expired_paths = sum(filter(None, expired_paths))
        click.echo(click.style('Expired: %s metric paths', fg='green') % expired_paths)

    except Exception, e:
        print 'Cleanup metrics failed. %s' % e

    finally:
        pool.terminate()
        pool.join()
项目:flactory    作者:mehdy    | 项目源码 | 文件源码
def check_name(ctx, param, value):
    """
        Checks the project name to be a valid name
    :param ctx: app context
    :param param: parameter
    :param value: the parameter value
    :return:
    """
    regex = re.compile(r'^[a-zA-Z_]*')
    if regex.match(value):
        return value
    while True:
        try:
            click.echo(
                click.style("{}".format(value), bold=True) +
                ' is not a valid python package name. try something else',
                err=True)
            value = param.prompt_for_value(ctx)
            if regex.match(value):
                return value
            continue
        except (KeyboardInterrupt, EOFError):
            raise ctx.Abort()
项目:flactory    作者:mehdy    | 项目源码 | 文件源码
def check_template(ctx, _, value):
    """
        Checks the template to be valid template
    :param ctx: app context
    :param value: the parameter value
    :return:
    """
    if not value:
        # TODO: get list and show
        raise ctx.abort()
    else:
        url = urlparse(value)
        if not url.netloc:
            url = url._replace(netloc='github.com')
        if url.path[-4:] == '.git':
            url = url._replace(path=url.path[:-4])
        path = os.path.join(os.environ['HOME'],
                            '.flactory/templates',
                            url.netloc, url.path)
        if os.path.isdir(path):
            return path
        # TODO: if not exist pull it automatically
        repr_name = click.style("[{}] {}".format(url.netloc, url.path[:-4]),
                                bold=True)
        raise ctx.fail(repr_name + " doesn't exist.")
项目:apio    作者:FPGAwars    | 项目源码 | 文件源码
def list_boards(self):
        """Return a list with all the supported boards"""

        # Print table
        click.echo('\nSupported boards:\n')

        BOARDLIST_TPL = ('{board:22} {fpga:20} {type:<5} {size:<5} {pack:<10}')
        terminal_width, _ = click.get_terminal_size()

        click.echo('-' * terminal_width)
        click.echo(BOARDLIST_TPL.format(
            board=click.style('Board', fg='cyan'), fpga='FPGA', type='Type',
            size='Size', pack='Pack'))
        click.echo('-' * terminal_width)

        for board in self.boards:
            fpga = self.boards[board]['fpga']
            click.echo(BOARDLIST_TPL.format(
                board=click.style(board, fg='cyan'),
                fpga=fpga,
                type=self.fpgas[fpga]['type'],
                size=self.fpgas[fpga]['size'],
                pack=self.fpgas[fpga]['pack']))

        click.secho(BOARDS_MSG, fg='green')
项目:apio    作者:FPGAwars    | 项目源码 | 文件源码
def list_fpgas(self):
        """Return a list with all the supported FPGAs"""

        # Print table
        click.echo('\nSupported FPGAs:\n')

        FPGALIST_TPL = ('{fpga:30} {type:<5} {size:<5} {pack:<10}')
        terminal_width, _ = click.get_terminal_size()

        click.echo('-' * terminal_width)
        click.echo(FPGALIST_TPL.format(
            fpga=click.style('FPGA', fg='cyan'), type='Type',
            size='Size', pack='Pack'))
        click.echo('-' * terminal_width)

        for fpga in self.fpgas:
            click.echo(FPGALIST_TPL.format(
                fpga=click.style(fpga, fg='cyan'),
                type=self.fpgas[fpga]['type'],
                size=self.fpgas[fpga]['size'],
                pack=self.fpgas[fpga]['pack']))
项目:zotero-cli    作者:jbaiter    | 项目源码 | 文件源码
def pick_item(zot, item_id):
    if not ID_PAT.match(item_id):
        items = tuple(zot.search(item_id))
        if len(items) > 1:
            click.echo("Multiple matches available.")
            item_descriptions = []
            for it in items:
                desc = click.style(it.title, fg='blue')
                if it.creator:
                    desc = click.style(it.creator + u': ', fg="cyan") + desc
                if it.date:
                    desc += click.style(" ({})".format(it.date), fg='yellow')
                item_descriptions.append(desc)
            return select(zip(items, item_descriptions)).key
        elif items:
            return items[0].key
        else:
            raise ValueError("Could not find any items for the query.")
    return item_id
项目:acsoo    作者:acsone    | 项目源码 | 文件源码
def _failures_to_str(fails, no_fails):
    def _r(l):
        for msg, count, expected_count in l:
            res.append('  {}: {}'.format(msg, count))
            if expected_count is not None:
                res.append(' (expected {})'.format(expected_count))
            res.append('\n')

    res = []
    if no_fails:
        res.append(click.style(
            'messages that did not cause failure:\n',
            bold=True))
        _r(no_fails)
    if fails:
        res.append(click.style(
            'messages that caused failure:\n',
            bold=True))
        _r(fails)
    return ''.join(res)
项目:neres    作者:glogiotatidis    | 项目源码 | 文件源码
def add_monitor(ctx, name, uri, location, frequency, email, validation_string,
                bypass_head_request, verify_ssl, redirect_is_failure, sla_threshold, raw):
    if validation_string:
        # We must bypass head request we're to validate string.
        bypass_head_request = True

    with Spinner('Creating monitor: ', remove_message=raw):
        status, message, monitor = newrelic.create_monitor(
            ctx.obj['ACCOUNT'], name, uri, frequency, location, email,
            validation_string, bypass_head_request, verify_ssl, redirect_is_failure,
            sla_threshold)

    if raw:
        print(json.dumps(monitor))
        return

    if status == 0:
        print(click.style(u'OK', fg='green', bold=True))
        print('Monitor: ' + message)
    else:
        print(click.style(u'Error', fg='red', bold=True))
        raise click.ClickException(message)
项目:neres    作者:glogiotatidis    | 项目源码 | 文件源码
def delete_monitor(ctx, monitor, confirm):
    if not confirm:
        confirm = click.prompt('''
 ! WARNING: Destructive Action
 ! This command will destroy the monitor: {monitor}
 ! To proceed, type "{monitor}" or
   re-run this command with --confirm={monitor}

'''.format(monitor=monitor), prompt_suffix='> ')
    if confirm.strip() != monitor:
        print('abort')
        sys.exit(1)

    with Spinner('Deleting monitor {}: '.format(monitor), remove_message=False):
        newrelic.delete_monitor(ctx.obj['ACCOUNT'], monitor)
    print(click.style(u'OK', fg='green', bold=True))
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_vm_options():
    """ Get user-selected config options for the 21 VM.
    """

    logger.info(click.style("Configure 21 virtual machine.", fg=TITLE_COLOR))
    logger.info("Press return to accept defaults.")

    default_disk = Two1MachineVirtual.DEFAULT_VDISK_SIZE
    default_memory = Two1MachineVirtual.DEFAULT_VM_MEMORY
    default_port = Two1MachineVirtual.DEFAULT_SERVICE_PORT

    disk_size = click.prompt("  Virtual disk size in MB (default = %s)" % default_disk,
                             type=int, default=default_disk, show_default=False)
    vm_memory = click.prompt("  Virtual machine memory in MB (default = %s)" % default_memory,
                             type=int, default=default_memory, show_default=False)
    server_port = click.prompt("  Port for micropayments server (default = %s)" % default_port,
                               type=int, default=default_port, show_default=False)

    return VmConfiguration(disk_size=disk_size,
                           vm_memory=vm_memory,
                           server_port=server_port)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_zerotier_address(marketplace):
    """ Gets the zerotier IP address from the given marketplace name

    Args:
        marketplace (str): name of the marketplace network to lookup ip

    Returns:
        str: a string representation of the zerotier IP address

    Raises:
        UnloggedException: if the zt network doesn't exist
    """
    logger.info("You might need to enter your superuser password.")
    address = zerotier.get_address(marketplace)
    if not address:
        join_cmd = click.style("21 join", bold=True, reset=False)
        no_zt_network = click.style(
            "You are not part of the {}. Use {} to join the market.",
            fg="red")
        raise UnloggedException(no_zt_network.format(marketplace, join_cmd))

    return address
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def add(ctx, service_name, docker_image_name):
    """
Add a new service to 21 sell

\b
Adding a new service
$ 21 sell add <service_name> <docker_image_name>
"""
    manager = ctx.obj['manager']
    logger.info(click.style("Adding services.", fg=cli_helpers.TITLE_COLOR))

    def service_successfully_added_hook(tag):
        cli_helpers.print_str(tag, ["Added"], "TRUE", True)

    def service_already_exists_hook(tag):
        cli_helpers.print_str(tag, ["Already exists"], "FALSE", False)

    def service_failed_to_add_hook(tag):
        cli_helpers.print_str(tag, ["Failed to add"], "FALSE", False)

    manager.add_service(service_name, docker_image_name, service_successfully_added_hook, service_already_exists_hook,
                        service_failed_to_add_hook)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def _leave(client, network):
    """Join the given network.

    Args:
        client (two1.server.rest_client.TwentyOneRestClient) an object
            for sending authenticated requests to the TwentyOne
            backend
        network (str): the name of the network being joined. Defaults
        to 21mkt
    """
    # ensures the zerotier daemon is running
    zerotier.start_daemon()
    try:
        nwid = zerotier.get_network_id(network)
    except KeyError:
        logger.info('not in network')
        return {'left': False, 'reason': 'not in network'}
    try:
        zerotier.leave_network(nwid)
        logger.info(uxstring.UxString.successful_leave.format(click.style(network, fg="magenta")))
        return {'left': True}
    except subprocess.CalledProcessError as e:
        logger.info(str(e))
        return {'left': False, 'reason': str(e)}
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_or_create_wallet(wallet_path):
    """Create a new wallet or return the currently existing one."""
    data_provider = twentyone_provider.TwentyOneProvider(two1.TWO1_PROVIDER_HOST)

    if wallet.Two1Wallet.check_wallet_file(wallet_path):
        return wallet.Wallet(wallet_path=wallet_path, data_provider=data_provider)

    # configure wallet with default options
    click.pause(uxstring.UxString.create_wallet)

    wallet_options = dict(data_provider=data_provider, wallet_path=wallet_path)

    if not wallet.Two1Wallet.configure(wallet_options):
        raise click.ClickException(uxstring.UxString.Error.create_wallet_failed)

    # Display the wallet mnemonic and tell user to back it up.
    # Read the wallet JSON file and extract it.
    with open(wallet_path, 'r') as f:
        wallet_config = json.load(f)
        mnemonic = wallet_config['master_seed']

    click.pause(uxstring.UxString.create_wallet_done % click.style(mnemonic, fg='green'))

    if wallet.Two1Wallet.check_wallet_file(wallet_path):
        return wallet.Wallet(wallet_path=wallet_path, data_provider=data_provider)
项目:do-audit    作者:omni-digital    | 项目源码 | 文件源码
def click_echo_kvp(key, value, padding=20, color='green'):
    """
    Helper function for pretty printing key value pairs in click

    :param key: item key
    :type key: str
    :param value: item value
    :type value: any
    :param padding: key padding
    :type padding: int
    :param color: key color (ANSI compliant)
    :type color: str
    :returns: proper `click.echo` call to stdout
    :rtype: None
    """
    return click.echo(
        click.style('{key:<{padding}}'.format(
            key=key + ':',
            padding=padding
        ), fg=color) +
        str(value)
    )
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def authenticate_account(get_auth_url=False, code=None, for_business=False):
    if for_business:
        error(translator['od_pref.authenticate_account.for_business_unsupported'])
        return
    authenticator = od_auth.OneDriveAuthenticator()
    click.echo(translator['od_pref.authenticate_account.permission_note'])
    if code is None:
        click.echo(translator['od_pref.authenticate_account.paste_url_note'])
        click.echo('\n' + click.style(authenticator.get_auth_url(), underline=True) + '\n')
        if get_auth_url:
            return
        click.echo(translator['od_pref.authenticate_account.paste_url_instruction'].format(
            redirect_url=click.style(authenticator.APP_REDIRECT_URL, bold=True)))
        url = click.prompt(translator['od_pref.authenticate_account.paste_url_prompt'], type=str)
        code = extract_qs_param(url, 'code')
        if code is None:
            error(translator['od_pref.authenticate_account.error.code_not_found_in_url'])
            return

    try:
        authenticator.authenticate(code)
        success(translator['od_pref.authenticate_account.success.authorized'])
        save_account(authenticator)
    except Exception as e:
        error(translator['od_pref.authenticate_account.error.authorization'].format(error_message=str(e)))
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def print_saved_drives():
    click.echo(click.style('Drives that have been set up:', bold=True))
    all_drive_ids = context.all_drives()
    if len(all_drive_ids) > 0:
        click.echo()
        for i, drive_id in enumerate(all_drive_ids):
            curr_drive_config = context.get_drive(drive_id)
            curr_account = context.get_account(curr_drive_config.account_id)
            click.echo(' ' + click.style('#%d - Drive "%s":' % (i, curr_drive_config.drive_id), underline=True))
            click.echo('   Account:     %s (%s)' % (curr_account.account_email, curr_drive_config.account_id))
            click.echo('   Local root:  %s' % curr_drive_config.localroot_path)
            click.echo('   Ignore file: %s' % curr_drive_config.ignorefile_path)
            click.echo()
    else:
        click.echo(' No Drive has been setup with onedrived.\n')
    return all_drive_ids
项目:coreapi-cli    作者:core-api    | 项目源码 | 文件源码
def debug_request(request):
    def request_echo(fmt, *args):
        click.echo(click.style('> ', fg='blue') + expand_args(fmt, args))

    headers = request.headers
    headers['host'] = urlparse.urlparse(request.url).hostname

    request_echo(click.style('%s %s HTTP/1.1', bold=True), request.method, request.path_url)
    for key, value in sorted(headers.items()):
        request_echo('%s: %s', key.title(), value)
    if request.body:
        body_text = request.body
        if isinstance(body_text, bytes):
            body_text = body_text.decode('utf-8')
        request_echo('')
        for line in body_text.splitlines():
            request_echo(line)
项目:Zappa    作者:Miserlou    | 项目源码 | 文件源码
def package(self, output=None):
        """
        Only build the package
        """
        # Make sure we're in a venv.
        self.check_venv()

        # force not to delete the local zip
        self.override_stage_config_setting('delete_local_zip', False)
        # Execute the prebuild script
        if self.prebuild_script:
            self.execute_prebuild_script()
        # Create the Lambda Zip
        self.create_package(output)
        self.callback('zip')
        size = human_size(os.path.getsize(self.zip_path))
        click.echo(click.style("Package created", fg="green", bold=True) + ": " + click.style(self.zip_path, bold=True) + " (" + size + ")")
项目:Zappa    作者:Miserlou    | 项目源码 | 文件源码
def collision_warning(self, item):
        """
        Given a string, print a warning if this could
        collide with a Zappa core package module.

        Use for app functions and events.
        """

        namespace_collisions = [
            "zappa.", "wsgi.", "middleware.", "handler.", "util.", "letsencrypt.", "cli."
        ]
        for namespace_collision in namespace_collisions:
            if namespace_collision in item:
                click.echo(click.style("Warning!", fg="red", bold=True) +
                           " You may have a namespace collision with " + click.style(item, bold=True) +
                           "! You may want to rename that file.")
项目:yadage    作者:diana-hep    | 项目源码 | 文件源码
def decide_rule(rule, controller, idbased):
    click.secho('we could extend DAG with rule', fg='blue')
    if idbased:
        rule = next(x for x in controller.adageobj.rules if x.identifier == rule)

    click.secho('rule: {}/{} ({})'.format(rule.offset,
                                          rule.rule.name, rule.identifier))
    # IPython.embed()
    try:
        resp = raw_input(click.style("Shall we? (y/N) ", fg='blue'))
    except NameError:
        resp = input(click.style("Shall we? (y/N) ", fg='blue'))

    shall = resp.lower() == 'y'
    if shall:
        click.secho('ok we will extend.', fg='green')
    else:
        click.secho('maybe another time... your response: {}'.format(
            resp), fg='yellow')
    return shall
项目:yadage    作者:diana-hep    | 项目源码 | 文件源码
def decide_step(node, controller, idbased):


    if idbased:
        node = controller.adageobj.dag.getNode(node)

    click.echo('we could submit a DAG node {}'.format(node))
    # IPython.embed()
    try:
        resp = raw_input(click.style("Shall we? (y/N) ", fg='magenta'))
    except NameError:
        resp = input(click.style("Shall we? (y/N) ", fg='magenta'))

    shall = resp.lower() == 'y'
    if shall:
        click.secho('ok we will submit.', fg='green')
    else:
        click.secho('will not submit for now... your response: {}'.format(
            resp), fg='yellow')
    return shall
项目:python-cireqs    作者:trustpilot    | 项目源码 | 文件源码
def cli(ctx, dirpath, pythonversion, timeout):
    ctx.obj = conf(
        python_version=pythonversion,
        dir_path=dirpath or getcwd(),
        timeout=timeout
    )
    cireqs.set_log_level(click_log.get_level())

    splash = r"""
           o8o
           `"'
 .ooooo.  oooo  oooo d8b  .ooooo.   .ooooo oo  .oooo.o
d88' `"Y8 `888  `888""8P d88' `88b d88' `888  d88(  "8
888        888   888     888ooo888 888   888  `"Y88b.
888   .o8  888   888     888    .o 888   888  o.  )88b
`Y8bod8P' o888o d888b    `Y8bod8P' `V8bod888  8""888P'
                                         888.
                                         8P'  """
    splash = click.style(splash, fg='green') + click.style(
        "v{}".format(cireqs.__version__.__version__), fg='red') + "\n"

    if ctx.invoked_subcommand is None:
        click.echo("\n".join([splash, ctx.get_help()]))
        return
项目:lightflow    作者:AustralianSynchrotron    | 项目源码 | 文件源码
def _style(enabled, text, **kwargs):
    """ Helper function to enable/disable styled output text.

    Args:
        enable (bool): Turn on or off styling.
        text (string): The string that should be styled.
        kwargs (dict): Parameters that are passed through to click.style

    Returns:
        string: The input with either the styling applied (enabled=True)
                or just the text (enabled=False)
    """
    if enabled:
        return click.style(text, **kwargs)
    else:
        return text
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def follow(ctx, nick, url, force):
    """Add a new source to your followings."""
    source = Source(nick, url)
    sources = ctx.obj['conf'].following

    if not force:
        if source.nick in (source.nick for source in sources):
            click.confirm("? You’re already following {0}. Overwrite?".format(
                click.style(source.nick, bold=True)), default=False, abort=True)

        _, status = get_remote_status([source])[0]
        if not status or status.status_code != 200:
            click.confirm("? The feed of {0} at {1} is not available. Follow anyway?".format(
                click.style(source.nick, bold=True),
                click.style(source.url, bold=True)), default=False, abort=True)

    ctx.obj['conf'].add_source(source)
    click.echo("? You’re now following {0}.".format(
        click.style(source.nick, bold=True)))
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def unfollow(ctx, nick):
    """Remove an existing source from your followings."""
    source = ctx.obj['conf'].get_source_by_nick(nick)

    try:
        with Cache.discover() as cache:
            cache.remove_tweets(source.url)
    except OSError as e:
        logger.debug(e)

    ret_val = ctx.obj['conf'].remove_source_by_nick(nick)
    if ret_val:
        click.echo("? You’ve unfollowed {0}.".format(
            click.style(source.nick, bold=True)))
    else:
        click.echo("? You’re not following {0}.".format(
            click.style(nick, bold=True)))
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def style_tweet(tweet, porcelain=False):
    conf = click.get_current_context().obj["conf"]
    limit = conf.character_limit

    if porcelain:
        return "{nick}\t{url}\t{tweet}".format(
            nick=tweet.source.nick,
            url=tweet.source.url,
            tweet=str(tweet))
    else:
        if sys.stdout.isatty() and not tweet.text.isprintable():
            return None
        styled_text = format_mentions(tweet.text)
        len_styling = len(styled_text) - len(click.unstyle(styled_text))
        final_text = textwrap.shorten(styled_text, limit + len_styling) if limit else styled_text
        timestamp = tweet.absolute_datetime if conf.use_abs_time else tweet.relative_datetime
        return "? {nick} ({time}):\n{tweet}".format(
            nick=click.style(tweet.source.nick, bold=True),
            tweet=final_text,
            time=click.style(timestamp, dim=True))
项目:twtxt    作者:buckket    | 项目源码 | 文件源码
def style_source_with_status(source, status, porcelain=False):
    if porcelain:
        return "{nick}\t{url}\t{status}\t{content_length}\t{last_modified}".format(
            nick=source.nick,
            url=source.url,
            status=status.status_code,
            content_length=status.content_length,
            last_modified=status.last_modified)
    else:
        if status.status_code == 200:
            scolor, smessage = "green", str(status.status_code)
        elif status:
            scolor, smessage = "red", str(status.status_code)
        else:
            scolor, smessage = "red", "ERROR"
        return "? {nick} @ {url} [{content_length}, {last_modified}] ({status})".format(
            nick=click.style(source.nick, bold=True, fg=scolor),
            url=source.url,
            status=click.style(smessage, fg=scolor),
            content_length=status.natural_content_length,
            last_modified=status.natural_last_modified)
项目:try    作者:timofurrer    | 项目源码 | 文件源码
def cli(packages, virtualenv, python, use_ipython, shell, keep, use_editor, tmpdir, index):  # pylint: disable=too-many-arguments
    """Easily try out python packages."""
    if not packages:
        raise click.BadArgumentUsage("At least one package is required.")

    if not shell and use_ipython:
        shell = "ipython"

    click.echo("==> Use python {0}".format(click.style(python, bold=True)))
    if shell:
        click.echo("==> Use shell {0}".format(click.style(shell, bold=True)))
    click.echo("[*] Downloading packages: {0}".format(click.style(",".join(p.url for p in packages), bold=True)))

    try:
        envdir = try_packages(packages, virtualenv, python, shell, use_editor, keep, tmpdir, index)
    except TryError as error:
        click.secho("[*] {0}".format(error), fg="red")
        sys.exit(1)

    if keep:
        click.echo("==> Have a look at the try environment at: {0}".format(envdir))
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _github_ask_credentials():
    click.echo('''
We need your GitHub credentials to create an access token to be stored in
AerisCloud. Your credentials won't be stored anywhere.

The access token will allow AerisCloud to access your private repositories
as well as those owned by any organization you might be a member of.

You can revoke this access token at any time by going to this URL:
%s
''' % (click.style('https://github.com/settings/applications', bold=True)))

    user = None
    pwd = None
    while not user:
        user = click.prompt('Github username')
    while not pwd:
        pwd = click.prompt('Github password', hide_input=True)

    return user, pwd
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _show_service_list(service_list, enabled_services):
    """
    Show the service list, coloring enabled services, return the list of non
    default services

    TODO: find a better command name, as it doesn't just show services
    """
    if not enabled_services:
        enabled_services = []

    service_complete = {}
    for service, service_info in sorted(service_list.iteritems()):
        if not service_info['default']:
            service_complete[service] = service_info['description']
            if service in enabled_services:
                click.secho('* %s - %s' % (
                    style(service, bold=True, fg='green'),
                    service_info['description']
                ))
            else:
                click.secho('* %s - %s' % (
                    style(service, bold=True),
                    service_info['description']
                ))
    return service_complete
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _up(box, make):
    click.echo('\nStarting box %s\n' %
               style(box.name(), bold=True))

    if not box.is_running():
        start_box(box)
    else:
        box.vagrant('provision')

    if make:
        click.echo('\nRunning make %s in your box\n' %
                   style(make, bold=True))
        if box.ssh_shell('make %s' % make) != 0:
            fatal('make command failed!')

        if sync(box, 'down') is False:
            # sync failed, message should already be displayed, exit
            sys.exit(1)
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _clone_project(project_name):
    gh = Github()

    (gh_repo, forked) = gh.get_repo(project_name, fork=True)
    if not gh_repo:
        click.echo('error: no repository named %s was found on '
                   'your user and configured organizations' %
                   click.style(project_name, bold=True))
        sys.exit(1)

    if forked:
        click.echo('We found a repository named %s and forked it to your '
                   'user, it now accessible at the following url:\n'
                   '%s' % (forked.full_name, gh_repo.html_url))

    dest_path = os.path.join(projects_path(), project_name)
    click.echo('We will clone %s in %s\n' % (gh_repo.ssh_url, dest_path))

    vgit('clone', gh_repo.ssh_url, dest_path)

    # add our upstream remote
    if gh_repo.parent:
        repo = Repo(dest_path)
        repo.create_remote('upstream', gh_repo.parent.ssh_url)
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def list(output_json):
    """
    List the available base boxes
    """
    boxes = baseboxes()

    if output_json:
        json.dump(boxes, sys.stdout)
        return

    click.secho('The following boxes are available:\n', fg='yellow')

    if not boxes:
        click.secho('  No box found', fg='cyan')
        sys.exit(0)

    for infra in boxes:
        click.secho('* %s:' % (infra), fg='blue', bold=True)
        for box in boxes[infra]:
            click.echo('\t* %s' % (click.style(box, fg='green')))
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def cli(boxes):
    """
    Suspend a running box, see resume
    """
    for project, project_boxes in boxes.iteritems():
        running_boxes = project_boxes.running()
        project_name = click.style(project.name(), fg='magenta')

        if not running_boxes:
            click.secho('No running boxes found for %s' % project_name,
                        fg='yellow', bold=True)
            continue

        click.secho('Suspending boxes for %s' % project_name,
                    fg='blue', bold=True)
        for box in running_boxes:
            box.suspend()
            click.echo(''.join([
                click.style('\tBox ', fg='green'),
                click.style(box.name(), bold=True),
                click.style(' has been suspended.', fg='green')
            ]))
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def queues():
    """Report on progress by queues."""
    conn = worker.connection
    failure_q = None

    def _repr(q):
        return "running:%d pending:%d finished:%d" % (
            StartedJobRegistry(q.name, conn).count,
            q.count,
            FinishedJobRegistry(q.name, conn).count)
    for q in Queue.all(conn):
        if q.name == 'failed':
            failure_q = q
            continue
        click.echo("%s %s" % (q.name, _repr(q)))
    if failure_q:
        click.echo(
            click.style(failure_q.name, fg='red') + ' %s' % _repr(failure_q))
项目:serplint    作者:beaugunderson    | 项目源码 | 文件源码
def log_message(self, line, character, error, message, reposition=True):
        """
        Log a linter message to stderr, ignoring duplicates.
        """
        if error[0] == 'E':
            formatted_code = click.style(error, fg='red')
        else:
            formatted_code = click.style(error, fg='yellow')

        if reposition:
            line, character = self.reposition(line, character)

        message = '{}:{}:{} {} {}'.format(self.filename,
                                          line,
                                          character,
                                          formatted_code,
                                          message)

        if message in self.logged_messages:
            return

        self.exit_code = 1
        self.logged_messages.append(message)

        click.echo(message)
项目:nebula-cmd    作者:nebula-orchestrator    | 项目源码 | 文件源码
def check_api(self):
        reply = self.connection.check_api()
        if reply.status_code == 200:
            reply_json = reply.json()
            if reply_json == {'api_available': 'True'}:
                click.echo("nebula responding as expected")
            else:
                click.echo(
                    click.style(
                        "nebula api not responding, are you logged in?",
                        fg="red"))
        else:
            click.echo(
                click.style(
                    "nebula api not responding, are you logged in?",
                    fg="red"))
项目:pGo-create    作者:diksm8    | 项目源码 | 文件源码
def worker_accountCreator(counters):
    while counters[0] != counters[3]:
        Account = creatorQueue.get()
        Account.setupMailbox()
        Account = makeClubAccount(Account)
        if Account.errorState == None:
            logQueue.put(click.style('Created account %s.' % Account.username, fg = 'green', bold=True))
            Account.save()
            counters[0]+=1
            logQueue.put('WRITE BLYAD')
            tosQueue.put(Account)
            verifierQueue.put(Account)
        else:
            logQueue.put(False)
            logQueue.put(click.style('Error occured at stage %d when creating account %s' % (Account.errorState, Account.username), fg='red', bold=True))
    return True
项目:pGo-create    作者:diksm8    | 项目源码 | 文件源码
def accept(self):
        counter = 0
        while counter != 60:
            emails = requests.get(self.inbox, verify=False).text
            if len(emails) <= 1:
                time.sleep(0.2)
                counter += 1
                continue
            for line in emails.split('\n'):
                if line.startswith('https://'):
                    try:
                        if 'Thank you for signing up! Your account is now active.' in requests.get(line).text:
                            self.accepted = True
                            return True
                    except requests.ConnectionError:
                        time.sleep(requestSleepTimerB)
                        if requestSleepTimerB == 5.1:
                            logQueue.put(click.style('[Mail accepter] Received slow down warning. Using max delay of 5.1 already.', fg='red', bold=True))
                        else:
                            logQueue.put(click.style('[Mail accepter] Received slow down warning. Increasing delay from %d to %d.' % (requestSleepTimerB, requestSleepTimerB+0.2), fg='red', bold=True))
                            requestSleepTimerB += 0.2
                    return False
        return False
项目:client    作者:wandb    | 项目源码 | 文件源码
def push(ctx, run, project, description, entity, force, files):
    # TODO: do we support the case of a run with the same name as a file?
    if os.path.exists(run):
        raise BadParameter("Run id is required if files are specified.")
    project, run = api.parse_slug(run, project=project)

    click.echo("Updating run: {project}/{run}".format(
        project=click.style(project, bold=True), run=run))

    candidates = []
    if len(files) == 0:
        raise BadParameter("No files specified")

    # TODO: Deal with files in a sub directory
    api.push(project, files=[f.name for f in files], run=run,
             description=description, entity=entity, force=force, progress=sys.stdout)


#@cli.command(context_settings=CONTEXT, help="Pull files from Weights & Biases")
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def add_color(value, color):
        return click.style('{}'.format(value), fg=color)
项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def format(self, record):
        import click
        s = super(ColorFormatter, self).format(record)
        if not record.exc_info:
            level = record.levelname.lower()
            if level in self.colors:
                s = click.style(s, **self.colors[level])
        return s
项目:defplorex    作者:trendmicro    | 项目源码 | 文件源码
def delete_index(index):
    """Delete an index"""
    click.clear()

    click.confirm(
            click.style(
                'Really DELETE index "%s"?' % index,
                fg='white',
                bg='red'), abort=True)

    es.client.indices.delete(index=index)

    log.info('Index deleted')
项目:findcve    作者:garethr    | 项目源码 | 文件源码
def print_vulns(package, vulns):
    if vulns:
        click.secho("%s has vulnerabilities" % package, fg="green")
        click.echo("   Currently installed %s" % vulns[0]["installed"])
        click.echo("   Latest version %s" % vulns[0]["latest"])
        for vuln in vulns:
            click.echo("   %s is %s" % (vuln["cve"], click.style(vuln["urgency"], fg=COLOR_MAPPING[vuln["urgency"].replace("*", "")])))
            if vuln["fixed"]:
                click.echo("      Vulnerability fixed in %s" % vuln["fixed"])
项目:albt    作者:geothird    | 项目源码 | 文件源码
def colored_text(text, color, bold=False):
        """
        Color text
        :param text:
        :param color:
        :param bold:
        :return:
        """
        return click.style(str(text), fg=color, bold=bold)
项目:albt    作者:geothird    | 项目源码 | 文件源码
def version():
    """
    Display albt version
    """
    click.echo(click.style('albt: ', fg='white') + click.style('1.0.2b4', fg='green'))