Python click 模块,Context() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用click.Context()

项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def _write_requirements(ctx: click.Context, packages_list, outfile, prefix_list=None):
    """
    Compile a requirements file with ``pip-compile``.

    The package specifiers are written to a scratch file named ``temp`` in the
    current working directory, ``pip-compile`` is run on that file through
    ``do_ex`` and the captured standard output is written to ``outfile``.

    Args:
        ctx: Click context, forwarded to ``do_ex``
        packages_list: iterable of requirement specifier strings
        outfile: path of the requirements file to (over)write
        prefix_list: optional lines written to ``outfile`` before the
            compiled packages
    """
    with open('temp', 'w') as source_file:
        source_file.write('\n'.join(packages_list))
    try:
        packages, _, _ = do_ex(
            ctx,
            [
                'pip-compile',
                '--index',
                '--upgrade',
                '--annotate',
                '--no-header',
                '-n',
                'temp'
            ]
        )
    finally:
        # Remove the scratch file even when the command fails or exits, so
        # that a stale "temp" never lingers in the working directory.
        os.remove('temp')
    with open(outfile, 'w') as req_file:
        if prefix_list:
            for prefix in prefix_list:
                req_file.write(f'{prefix}\n')
        for package in packages.splitlines():
            req_file.write(f'{package}\n')
项目:click-man    作者:click-contrib    | 项目源码 | 文件源码
def generate_man_page(ctx, version=None):
    """
    Generate documentation for the given command.

    :param click.Context ctx: the click context for the
                              cli application.
    :param version: value stored as the man page version (may be ``None``).

    :rtype: str
    :returns: the generate man page from the given click Context.
    """
    command = ctx.command

    # Create man page with the details from the given context
    man_page = ManPage(ctx.command_path)
    man_page.version = version
    man_page.short_help = command.short_help
    man_page.description = command.help
    man_page.synopsis = ' '.join(command.collect_usage_pieces(ctx))

    # Hand the real context to click: help records are built from it (for
    # example default/envvar display), so ``None`` is not a safe stand-in.
    # Options that produce no record (hidden ones) are left out.
    help_records = (
        param.get_help_record(ctx)
        for param in command.params
        if isinstance(param, click.Option)
    )
    man_page.options = [record for record in help_records if record is not None]

    commands = getattr(command, 'commands', None)
    if commands:
        man_page.commands = [(k, v.short_help) for k, v in commands.items()]

    return str(man_page)
项目:click-man    作者:click-contrib    | 项目源码 | 文件源码
def write_man_pages(name, cli, parent_ctx=None, version=None, target_dir=None):
    """
    Write one man page file for ``cli`` and, recursively, one for each of
    its sub-commands.

    :param str name: the cli name
    :param cli: the cli instance
    :param click.Context parent_ctx: the parent click context
    :param version: version forwarded to ``generate_man_page``
    :param str target_dir: the directory where the generated
                           man pages are stored.
    """
    ctx = click.Context(cli, info_name=name, parent=parent_ctx)
    page_text = generate_man_page(ctx, version)

    # "<command path with dashes>.1", optionally placed inside target_dir
    file_name = ctx.command_path.replace(' ', '-') + '.1'
    destination = os.path.join(target_dir, file_name) if target_dir else file_name

    with open(destination, 'w+') as handle:
        handle.write(page_text)

    # Plain commands have no ``commands`` attribute; groups expose a mapping
    for sub_name, sub_command in getattr(cli, 'commands', {}).items():
        write_man_pages(sub_name, sub_command, parent_ctx=ctx,
                        version=version, target_dir=target_dir)
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def main(ctx,               # type: click.Context
         directory,         # type: click.Path
         quiet,             # type: bool
         extension,         # type: str
         source_directory,  # type: str
         build_directory,   # type: str
         ):
    # type: (...) -> None
    """reqwire: micromanages your requirements."""
    # Normalise the requirements directory to a pathlib.Path up front.
    requirements_dir = pathlib.Path(str(directory))
    # Console output is verbose unless --quiet was given.
    console.verbose = not quiet
    # Shared options stored on the context object.
    ctx.obj = dict(
        build_dir=build_directory,
        directory=requirements_dir,
        extension=extension,
        source_dir=source_directory,
    )
项目:chalice    作者:aws    | 项目源码 | 文件源码
def deploy(ctx, autogen_policy, profile, api_gateway_stage, stage,
           connection_timeout):
    # type: (click.Context, Optional[bool], str, str, str, int) -> None
    # Deploy the application for ``stage`` using the factory stored on the
    # context, then record the deployed values in .chalice/deployed.json.
    cli_factory = ctx.obj['factory']  # type: CLIFactory
    cli_factory.profile = profile
    app_config = cli_factory.create_config_obj(
        chalice_stage_name=stage,
        autogen_policy=autogen_policy,
        api_gateway_stage=api_gateway_stage,
    )
    botocore_session = cli_factory.create_botocore_session(
        connection_timeout=connection_timeout)
    deployer = cli_factory.create_default_deployer(
        session=botocore_session, ui=UI())
    deployed_values = deployer.deploy(app_config, chalice_stage_name=stage)
    deployed_file = os.path.join(
        app_config.project_dir, '.chalice', 'deployed.json')
    record_deployed_values(deployed_values, deployed_file)
项目:chalice    作者:aws    | 项目源码 | 文件源码
def url(ctx, stage):
    # type: (click.Context, str) -> None
    # Print the execute-api URL recorded for ``stage``; raise a
    # ClickException with exit code 2 when nothing has been recorded.
    factory = ctx.obj['factory']  # type: CLIFactory
    config = factory.create_config_obj(stage)
    deployed = config.deployed_resources(stage)
    if deployed is None:
        error = click.ClickException(
            "Could not find a record of deployed values to chalice stage: '%s'"
            % stage)
        error.exit_code = 2
        raise error
    url_template = (
        "https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/"
    )
    click.echo(url_template.format(
        api_id=deployed.rest_api_id,
        region=deployed.region,
        stage=deployed.api_gateway_stage,
    ))
项目:chalice    作者:aws    | 项目源码 | 文件源码
def generate_sdk(ctx, sdk_type, stage, outdir):
    # type: (click.Context, str, str, str) -> None
    # Download the SDK of type ``sdk_type`` for the deployed REST API of
    # ``stage`` into ``outdir``; abort when the stage has no deployment.
    factory = ctx.obj['factory']  # type: CLIFactory
    config = factory.create_config_obj(stage)
    session = factory.create_botocore_session()
    client = TypedAWSClient(session)
    deployed = config.deployed_resources(stage)
    if deployed is None:
        click.echo("Could not find API ID, has this application "
                   "been deployed?", err=True)
        raise click.Abort()
    client.download_sdk(
        deployed.rest_api_id,
        outdir,
        api_gateway_stage=deployed.api_gateway_stage,
        sdk_type=sdk_type,
    )
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def validate(type_scheme: Type) -> t.Callable[[click.Context, str, t.Any], t.Any]:
    """
    Build a click option callback that checks a value against a type scheme.

    The returned function has the ``(ctx, param, value)`` signature that click
    expects for the ``callback`` parameter. It returns the value unchanged
    when ``verbose_isinstance`` reports success and raises
    ``click.BadParameter`` (carrying the string form of the check result)
    otherwise.

    :param type_scheme: type scheme the validator validates against
    :return: the validator function
    """
    def func(ctx, param, value):
        # The parameter's readable name, without dashes, labels the value
        readable_name = param.human_readable_name.replace("-", "")
        outcome = verbose_isinstance(value, type_scheme, value_name=readable_name)
        if outcome:
            return value
        raise click.BadParameter(str(outcome))
    return func
项目:bocco-api-python    作者:YUKAI    | 项目源码 | 文件源码
def messages(ctx,
             room_uuid,
             newer_than,
             older_than,
             limit,
             verbose):
    # type: (click.Context, str, int, int, int, bool) -> None
    """Show the messages of a room."""
    api = ctx.obj['api']
    fetched = api.get_messages(uuid.UUID(room_uuid),
                               newer_than=newer_than,
                               older_than=older_than)
    if verbose:
        template = u'''
{m[date]} {m[user][nickname]} {m[text]}
\tid: {m[id]}
\tunique_id: {m[unique_id]}
\tmedia: {m[media]}
\tmessage_type: {m[message_type]}
\tdictated: {m[dictated]}
'''.strip()
    else:
        template = u'{m[date]} {m[user][nickname]} {m[text]}'
    # Only the last ``limit`` messages are printed (note that a limit of 0
    # slices as ``[-0:]``, i.e. the whole list).
    for message in fetched[-limit:]:
        click.echo(template.format(m=message))  # type: ignore
项目:sphinx-click    作者:click-contrib    | 项目源码 | 文件源码
def test_no_parameters(self):
        """Check the formatted output for a bare `click.Group`.

        The group under test declares *no* arguments, *no* options and *no*
        environment variables, so only the program header and the usage
        block are expected.
        """

        @click.group()
        def cli():
            """A sample command group."""

        expected = textwrap.dedent("""
        .. program:: cli
        .. code-block:: shell

            cli [OPTIONS] COMMAND [ARGS]...
        """).lstrip()

        ctx = click.Context(cli, info_name='cli')
        rendered = '\n'.join(list(ext._format_command(ctx, show_nested=False)))

        self.assertEqual(expected, rendered)
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def require_executable(executable):
    """Build a decorator that refuses to run the wrapped function unless
    ``executable`` can be located with ``utils.which``; when it cannot,
    a message is printed and the context exits with ``OTHER_FAILURE``.
    """
    def decorator(f):
        @click.pass_context
        def new_func(ctx, *args, **kwargs):
            """
            :param click.Context ctx:
            """
            location = utils.which(executable)
            if location is None:
                message = '{} is not installed. Please install it.'.format(executable)
                click.echo(message)
                ctx.exit(code=exit_codes.OTHER_FAILURE)
            return ctx.invoke(f, *args, **kwargs)
        return update_wrapper(new_func, f)
    return decorator
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def ensure_executable_exists(name, get_executable):
    """Build a decorator that makes sure the private executable ``name``
    is present before the wrapped function runs. When the executable path
    does not exist, ``get_executable`` (a callable that installs it) is
    invoked and the path is asserted to exist afterwards.
    """
    def decorator(f):
        @click.pass_context
        def new_func(ctx, *args, **kwargs):
            """
            @type ctx: click.Context
            """
            executable_path = path_utils.executable_path(name)
            already_installed = os.path.exists(executable_path)
            if not already_installed:
                heading = 'Installing {}.'.format(name)
                echo_heading(heading, marker='-', marker_color='magenta')
                get_executable()
                assert os.path.exists(executable_path)
            return ctx.invoke(f, *args, **kwargs)
        return update_wrapper(new_func, f)
    return decorator
项目:sceptre    作者:cloudreach    | 项目源码 | 文件源码
def get_stack_or_env(ctx, path):
    """
    Build either a stack or an environment object from ``path``.

    A path with a file extension is handed to ``construct_stack``; any other
    path is handed to ``construct_environment``. The slot that does not apply
    is ``None``.

    :param ctx: Cli context.
    :type ctx: click.Context
    :param path: Path to either stack config or environment folder.
    :type path: str
    :returns: a ``(stack, env)`` tuple in which exactly one item is set.
    """
    config_reader = ConfigReader(ctx.obj["sceptre_dir"], ctx.obj["options"])

    _, extension = os.path.splitext(path)
    if extension:
        return (config_reader.construct_stack(path), None)
    return (None, config_reader.construct_environment(path))
项目:proxenos    作者:darvid    | 项目源码 | 文件源码
def main(ctx):
    # type: (click.Context) -> None
    """proxenos-cli: Consistently select nodes from service discovery."""
    # The function has no statements besides its docstring, so calling it
    # performs no work and ``ctx`` is not used.
    # NOTE(review): presumably registered as the root click group by a
    # decorator that is not visible here -- confirm against the full file.
项目:SIDR    作者:damurdock    | 项目源码 | 文件源码
def validate_taxdump(value, method):
    """
    Check that the directory ``value`` contains every required taxdump file.

    The files ``names.dmp``, ``nodes.dmp``, ``merged.dmp`` and
    ``delnodes.dmp`` must all exist directly inside ``value``.

    :param value: path of the taxdump directory to check
    :param method: click command whose help text is printed on failure
    :returns: ``value`` unchanged when all files are present
    :raises click.BadParameter: when at least one file is missing; the
        message names the missing files
    """
    required_files = ("names.dmp", "nodes.dmp", "merged.dmp", "delnodes.dmp")
    # "%s/%s" formatting (rather than os.path.join) keeps a ``None`` value on
    # the regular "file not found" path instead of raising a TypeError.
    missing = [name for name in required_files
               if not os.path.isfile("%s/%s" % (value, name))]
    if not missing:
        return value
    with click.Context(method) as ctx:
        click.echo(ctx.get_help())
        raise click.BadParameter(
            "Could not find %s in taxdump, specify a value or make sure "
            "the files are present" % ", ".join(missing))
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def cd(path):
    """
    Generator that switches the working directory to ``path``, yields once,
    and switches back to the previous directory when it is resumed or closed.

    Args:
        path: working directory to cd into
    """
    # NOTE(review): no decorator is visible here; to be used in a ``with``
    # statement this generator presumably gets wrapped by
    # ``contextlib.contextmanager`` -- confirm against the full file.
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Restore the original directory even if the caller raised
        os.chdir(previous_dir)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def _print_version(ctx: click.Context, param, value):
    if not value or ctx.resilient_parsing:
        return

    ensure_repo()

    _get_version(ctx)
    exit(0)


# @click.group(invoke_without_command=True)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def reqs(ctx: click.Context, prod, test, dev):
    """
    Write requirements files
    """
    # Each flag selects one output file:
    #   prod -> requirements.txt       (from setup.install_requires)
    #   test -> requirements-test.txt  (from setup.test_requires)
    #   dev  -> requirements-dev.txt   (from setup.dev_requires)
    if not find_executable('pip-compile'):
        click.secho('Missing module "pip-tools".\n'
                    'Install it manually with: "pip install pip-tools"\n'
                    'Or install all dependencies with: "pip install -r requirements-dev.txt"',
                    err=True, fg='red')
        # ``exit()`` is a ``site`` helper meant for interactive sessions;
        # end the command through click instead.
        ctx.exit(-1)

    # ``setup`` is imported from the current directory. Make that directory
    # importable for every flag (not only ``prod``) and always undo the
    # change, even when compiling one of the files fails.
    project_root = os.path.abspath('.')
    sys.path.insert(0, project_root)
    try:
        if prod:
            from setup import install_requires
            _write_requirements(
                ctx,
                packages_list=install_requires,
                outfile='requirements.txt'
            )
        if test:
            from setup import test_requires
            _write_requirements(
                ctx,
                packages_list=test_requires,
                outfile='requirements-test.txt',
                prefix_list=['-r requirements.txt']
            )
        if dev:
            from setup import dev_requires
            _write_requirements(
                ctx,
                packages_list=dev_requires,
                outfile='requirements-dev.txt',
                prefix_list=['-r requirements.txt', '-r requirements-test.txt']
            )
    finally:
        if project_root in sys.path:
            sys.path.remove(project_root)
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def pip_install(ctx, *specifiers):
    # type: (click.Context, str) -> None
    # Run ``pip install`` for the given specifiers, echoing its merged
    # stdout/stderr as it is produced; abort the context when pip fails.
    try:
        pip_output = sh.pip.install(*specifiers, _err_to_out=True, _iter=True)
        for output_line in pip_output:
            click.echo(output_line, nl=False)
    except sh.ErrorReturnCode:
        ctx.abort()
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def main_build(ctx,                  # type: click.Context
               options,              # type: Dict[str, Any]
               all,                  # type: bool
               tag,                  # type: Iterable[str]
               pip_compile_options,  # type: Iterable[str]
               ):
    # type: (...) -> None
    """Build requirements with pip-compile."""
    # The requirements directory has to be initialised beforehand.
    if not options['directory'].exists():
        console.error('run `{} init\' first', ctx.find_root().info_name)
        ctx.abort()
    if not (all or tag):
        console.error('either --all or --tag must be provided.')
        ctx.abort()

    src_dir = options['directory'] / options['source_dir']
    dest_dir = options['directory'] / options['build_dir']
    if not dest_dir.exists():
        dest_dir.mkdir()

    # Without explicit tags, build every source file with the extension.
    if not tag:
        tag = (source.stem
               for source in src_dir.glob('*{}'.format(options['extension'])))

    for tag_name in tag:
        src = src_dir / ''.join((tag_name, options['extension']))
        dest = dest_dir / '{}.txt'.format(tag_name)
        console.info('building {}', click.format_filename(str(dest)))
        compile_args = ['-r', str(src)] + list(pip_compile_options)
        with atomicwrites.AtomicWriter(str(dest), 'w', True).open() as f:
            f.write(reqwire.scaffold.MODELINES_HEADER)
            # pip-compile's stdout is streamed into the destination file;
            # its own ``-o`` output goes to a throwaway temporary file.
            with tempfile.NamedTemporaryFile() as temp_file:
                compile_args = compile_args + ['-o', temp_file.name]
                sh.pip_compile(*compile_args, _out=f, _tty_out=False)
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def main_init(ctx,              # type: click.Context
              options,          # type: Dict[str, Any]
              force,            # type: bool
              index_url,        # type: str
              tag,              # type: Iterable[str]
              extra_index_url,  # type: Tuple[str]
              ):
    # type: (...) -> None
    """Initialize reqwire in the current directory."""
    # Refuse to touch an existing requirements directory unless forced.
    if not force and options['directory'].exists():
        console.error('requirements directory already exists')
        ctx.abort()

    # Create the source directory first, then the build directory.
    for directory_option in ('source_dir', 'build_dir'):
        created_dir = reqwire.scaffold.init_source_dir(
            options['directory'],
            exist_ok=force,
            name=options[directory_option])
        console.info('created {}', click.format_filename(str(created_dir)))

    # Fall back to the default tag set when none was requested.
    for tag_name in tag or ('docs', 'main', 'qa', 'test'):
        filename = reqwire.scaffold.init_source_file(
            working_directory=options['directory'],
            tag_name=tag_name,
            extension=options['extension'],
            index_url=index_url,
            extra_index_urls=extra_index_url)
        console.info('created {}', click.format_filename(str(filename)))
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_full_name(ctx):
    """
    Build the underscore-separated, fully qualified name of a command.

    The root context (the one without a parent) always contributes ``21``;
    every context below it contributes the name of its command.

    >>> main_ctx = click.Context(click.Command('main'))
    >>> market_ctx = click.Context(click.Command('market'), parent=main_ctx)
    >>> join_ctx = click.Context(click.Command('join'), parent=market_ctx)
    >>> get_full_name(join_ctx)
    '21_market_join'
    """
    # Walk up from the given context, collecting names leaf-first
    names = []
    current = ctx
    while current.parent is not None:
        names.append(current.command.name)
        current = current.parent
    names.append('21')  # the root is `two1.cli.main`
    return '_'.join(reversed(names))
项目:chalice    作者:aws    | 项目源码 | 文件源码
def cli(ctx, project_dir, debug=False):
    # type: (click.Context, str, bool) -> None
    # Store the project directory, the debug flag and a CLIFactory on the
    # context object, then change into the project directory. The current
    # working directory is used when no project directory is given.
    resolved_dir = os.getcwd() if project_dir is None else project_dir
    ctx.obj['project_dir'] = resolved_dir
    ctx.obj['debug'] = debug
    ctx.obj['factory'] = CLIFactory(resolved_dir, debug)
    os.chdir(resolved_dir)
项目:chalice    作者:aws    | 项目源码 | 文件源码
def local(ctx, host='127.0.0.1', port=8000, stage=DEFAULT_STAGE_NAME):
    # type: (click.Context, str, int, str) -> None
    # Start the local server with the factory stored on the context and the
    # current process environment.
    cli_factory = ctx.obj['factory']  # type: CLIFactory
    environment = os.environ
    run_local_server(cli_factory, host, port, stage, environment)
项目:chalice    作者:aws    | 项目源码 | 文件源码
def delete(ctx, profile, stage):
    # type: (click.Context, str, str) -> None
    # Delete the deployment of ``stage`` and drop that stage from
    # .chalice/deployed.json.
    cli_factory = ctx.obj['factory']  # type: CLIFactory
    cli_factory.profile = profile
    app_config = cli_factory.create_config_obj(chalice_stage_name=stage)
    botocore_session = cli_factory.create_botocore_session()
    deployer = cli_factory.create_default_deployer(
        session=botocore_session, ui=UI())
    deployer.delete(app_config, chalice_stage_name=stage)
    deployed_file = os.path.join(
        app_config.project_dir, '.chalice', 'deployed.json')
    remove_stage_from_deployed_values(stage, deployed_file)
项目:chalice    作者:aws    | 项目源码 | 文件源码
def logs(ctx, num_entries, include_lambda_messages, stage, profile):
    # type: (click.Context, int, bool, str, str) -> None
    # Write the logs of the deployed API handler for ``stage`` to stdout.
    # Nothing is printed when the stage has no recorded deployment.
    cli_factory = ctx.obj['factory']  # type: CLIFactory
    cli_factory.profile = profile
    app_config = cli_factory.create_config_obj(stage, False)
    deployed = app_config.deployed_resources(stage)
    if deployed is None:
        return
    botocore_session = cli_factory.create_botocore_session()
    retriever = cli_factory.create_log_retriever(
        botocore_session, deployed.api_handler_arn)
    display_logs(retriever, num_entries, include_lambda_messages,
                 sys.stdout)
项目:chalice    作者:aws    | 项目源码 | 文件源码
def package(ctx, single_file, stage, out):
    # type: (click.Context, bool, str, str) -> None
    # Package the application for ``stage``. Normally the package is written
    # straight to ``out``; with ``single_file`` it is built in a temporary
    # directory that is zipped into ``out`` and then removed.
    cli_factory = ctx.obj['factory']  # type: CLIFactory
    app_config = cli_factory.create_config_obj(stage)
    packager = cli_factory.create_app_packager(app_config)
    if not single_file:
        packager.package_app(app_config, out)
        return
    staging_dir = tempfile.mkdtemp()
    try:
        packager.package_app(app_config, staging_dir)
        create_zip_file(source_dir=staging_dir, outfile=out)
    finally:
        shutil.rmtree(staging_dir)
项目:chalice    作者:aws    | 项目源码 | 文件源码
def generate_pipeline(ctx, codebuild_image, source, buildspec_file, filename):
    # type: (click.Context, str, str, str, str) -> None
    """Generate a cloudformation template for a starter CD pipeline.

    This command will write a starter cloudformation template to
    the filename you provide.  It contains a CodeCommit repo,
    a CodeBuild stage for packaging your chalice app, and a
    CodePipeline stage to deploy your application using cloudformation.

    You can use any AWS SDK or the AWS CLI to deploy this stack.
    Here's an example using the AWS CLI:

        \b
        $ chalice generate-pipeline pipeline.json
        $ aws cloudformation deploy --stack-name mystack \b
            --template-file pipeline.json --capabilities CAPABILITY_IAM
    """
    # Imported here rather than at module level, as in the original code.
    from chalice import pipeline
    cli_factory = ctx.obj['factory']  # type: CLIFactory
    app_config = cli_factory.create_config_obj()
    template_builder = pipeline.CreatePipelineTemplate()
    template = template_builder.create_template(
        pipeline.PipelineParameters(
            app_name=app_config.app_name,
            lambda_python_version=app_config.lambda_python_version,
            codebuild_image=codebuild_image,
            code_source=source,
        )
    )
    # Optionally pull the buildspec out of the template into its own file;
    # this happens before the template itself is serialized and written.
    if buildspec_file:
        buildspec_text = pipeline.BuildSpecExtractor().extract_buildspec(template)
        with open(buildspec_file, 'w') as buildspec_handle:
            buildspec_handle.write(buildspec_text)
    with open(filename, 'w') as template_handle:
        template_handle.write(serialize_to_json(template))
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def convert(self, value, param, ctx: click.Context) -> t.List[str]:
        """
        click ``ParamType`` hook: turn a comma separated string into a list.

        Values that already pass ``isinstance(value, self)`` are returned
        untouched; strings are split on ``,``; anything else is rejected
        through ``self.fail``.
        """
        if isinstance(value, self):
            return value
        if isinstance(value, str):
            return str(value).split(",")
        message = "{} is no valid comma separated string list".format(value)
        self.fail(message, param, ctx)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def convert(self, value, param, ctx: click.Context) -> t.Optional[bool]:
        """
        click ``ParamType`` hook: parse ``true`` / ``false`` / ``none``.

        Values that already pass ``isinstance(value, self)`` are returned
        untouched. Strings are compared case-insensitively against the three
        keywords; every other input is rejected through ``self.fail``.
        """
        if isinstance(value, self):
            return value
        if isinstance(value, str):
            keywords = {"true": True, "false": False, "none": None}
            # The lower-cased text is also what the failure message shows
            value = value.lower()
            if value in keywords:
                return keywords[value]
        self.fail("{} is no valid bool or 'none'".format(value), param, ctx)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def convert(self, value, param, ctx: click.Context) -> int:
        """
        click ``ParamType`` hook: accept only values that already pass
        ``isinstance(value, self)``; everything else is rejected through
        ``self.fail``.
        """
        accepted = isinstance(value, self)
        if accepted:
            return value
        message = "{} is no valid time span".format(value)
        self.fail(message, param, ctx)
项目:Zappa    作者:Miserlou    | 项目源码 | 文件源码
def disable_click_colors():
    """
    Push a Click context whose ``color`` attribute is False.

    A throwaway BaseCommand is created only because the Context constructor
    needs a command. The intended side-effect is that click.echo() consults
    the pushed context and suppresses colors.
    https://github.com/pallets/click/blob/e1aa43a3/click/globals.py#L39
    """
    placeholder_command = BaseCommand('AllYourBaseAreBelongToUs')
    colorless_ctx = Context(placeholder_command)
    colorless_ctx.color = False
    push_context(colorless_ctx)
项目:bocco-api-python    作者:YUKAI    | 项目源码 | 文件源码
def cli(ctx, config, access_token):
    # type: (click.Context, str, str) -> None
    """Command-line client for the BOCCO API (http://api-docs.bocco.me/)."""
    # Defaults used when no config file is supplied
    debug = False
    downloads = None
    if config:
        # A JSON config file provides all three settings and overrides the
        # access token passed as an argument.
        with open(config, 'r') as config_file:
            settings_json = json.load(config_file)
        debug = settings_json['debug']
        downloads = settings_json['downloads']
        access_token = settings_json['access_token']

    ctx.obj['api'] = Client(access_token)
    ctx.obj['debug'] = debug
    ctx.obj['downloads'] = downloads
项目:bocco-api-python    作者:YUKAI    | 项目源码 | 文件源码
def rooms(ctx, verbose):
    # type: (click.Context, bool) -> None
    """List the rooms."""
    api = ctx.obj['api']
    if verbose:
        template = u'''
{index}. {r[name]}
\tUUID: {r[uuid]}
\tMembers({members_count}): {members}
\tSensors({sensors_count}): {sensors}
\tLast message: {last_message_id}
\tUpdated at: {r[updated_at]}
'''.strip()
    else:
        template = u'{index}. {r[name]}\n\t{r[uuid]}'

    # Rooms are numbered from 1 in the output
    for position, room in enumerate(api.get_rooms(), 1):
        member_names = [member['user']['nickname'] for member in room['members']]
        sensor_names = [sensor['nickname'] for sensor in room['sensors']]
        # 0 stands in when the room has no messages
        if len(room['messages']) > 0:
            last_message_id = room['messages'][0]['id']
        else:
            last_message_id = 0
        line = template.format(  # type: ignore
            index=position,
            r=room,
            members_count=len(member_names),
            members=u', '.join(member_names),
            sensors_count=len(sensor_names),
            last_message_id=last_message_id,
            sensors=u', '.join(sensor_names))
        click.echo(line)
项目:bocco-api-python    作者:YUKAI    | 项目源码 | 文件源码
def send(ctx, room_uuid, text):
    # type: (click.Context, str, str) -> None
    """Send a text message to a room."""
    client = ctx.obj['api']
    # Progress notice printed before the request is made
    click.echo(u'????????...')  # type: ignore
    target_room = uuid.UUID(room_uuid)
    client.post_text_message(target_room, text)
项目:bocco-api-python    作者:YUKAI    | 项目源码 | 文件源码
def web(ctx):
    # type: (click.Context) -> None
    """Run the web application, backed by the API client."""
    shared = ctx.obj
    api_client = shared['api']
    debug_flag = shared['debug']
    downloads_dir = shared['downloads']

    # Hand the settings and the client to the app object, then start it
    app.config.update({'DEBUG': debug_flag, 'DOWNLOADS': downloads_dir})
    app.api = api_client
    app.run()
项目:sphinx-click    作者:click-contrib    | 项目源码 | 文件源码
def _get_ctx():
        # Build a context for a sample group ``cli`` that has a single
        # sub-command ``hello``.

        @click.group()
        def cli():
            """A sample command group."""

        @cli.command()
        def hello():
            """A sample command."""

        sample_ctx = click.Context(cli, info_name='cli')
        return sample_ctx
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def ensure_project_path(f):
    """Decorator that makes sure a project path is configured before ``f``
    runs. When none is configured, the user is asked to either clone the
    repository to the default location or point to an existing checkout.
    """
    def _clone_to_default_path():
        # Clone into the default location, replacing whatever is there
        # (after confirmation), and remember the path.
        path = default_project_path()
        if os.path.exists(path):
            click.confirm('Path {} already exists. Continuing will remove this path.'.format(
                path), default=True, abort=True)
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        click.echo('Cloning to {}...'.format(path), nl=False)
        clone_project()
        click.secho('Done', fg='black', bg='green')
        save_project_path(path)

    def _use_existing_path(ctx):
        # Ask for an existing directory, confirm it and remember the path.
        answer = click.prompt(
            'Please enter the path to your project', type=str)
        project_dir = path_utils.canonical_path(answer)
        if not (os.path.exists(project_dir) and os.path.isdir(project_dir)):
            click.echo('This directory does not exist.')
            ctx.exit(code=exit_codes.OTHER_FAILURE)
        click.confirm('Is your project at {}?'.format(
            project_dir), default=True, abort=True)
        save_project_path(project_dir)

    @click.pass_context
    def new_func(ctx, *args, **kwargs):
        """
        :param click.Context ctx:
        """
        if get_project_path() is None:
            click.echo('Config project_path not set.')
            click.echo('You can do one of the following:')
            options = [
                'Clone the repository to {}.'.format(default_project_path()),
                'Specify the path to an existing repo.'
            ]
            selected = click_utils.prompt_choices(options)
            if selected == 0:
                _clone_to_default_path()
            else:
                _use_existing_path(ctx)
        return ctx.invoke(f, *args, **kwargs)
    return update_wrapper(new_func, f)
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def option_hosted_zone_id(f):
    """Similar to click.option for hosted-zone-id, but in the event
    that the user does not specify the option, try to retrieve
    the ID from AWS and only error out if that fails or is ambiguous.

    The wrapped function is invoked with the resolved ID passed as the
    ``hosted_zone_id`` keyword argument.
    """
    @click.option('--hosted-zone-id', '-hz', default=None, help='Route 53 Hosted Zone ID for {}'.format(settings.DOMAIN_NAME))
    @click.pass_context
    def new_func(ctx, hosted_zone_id, *args, **kwargs):
        """
        :param click.Context ctx:
        """
        if hosted_zone_id is None:
            echo_heading('Trying to find hosted zone for {}.'.format(settings.DOMAIN_NAME), marker='-', marker_color='magenta')
            client = boto3.client('route53')
            response = client.list_hosted_zones_by_name(
                DNSName=settings.DOMAIN_NAME
            )
            zones = response['HostedZones']
            # The messages below carry a ``{}`` placeholder for the domain
            # name, so they have to be formatted before being printed.
            if len(zones) == 0:
                click.echo('Unable to find a Hosted Zone for {}. Please specify it explicitly by passing --hosted-zone-id/-hz.'.format(settings.DOMAIN_NAME))
                ctx.exit(code=exit_codes.OTHER_FAILURE)
            elif len(zones) > 1:
                click.echo('Found multiple Hosted Zones for {}. Please specify one explicitly by passing --hosted-zone-id/-hz.'.format(settings.DOMAIN_NAME))
                ctx.exit(code=exit_codes.OTHER_FAILURE)
            # Keep only the last path segment of the returned Id
            hosted_zone_id = zones[0]['Id'].split('/')[-1]
            click.echo('Done.')
        return ctx.invoke(f, *args, hosted_zone_id=hosted_zone_id, **kwargs)
    return update_wrapper(new_func, f)
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def skip_if_debug(f):
    # Decorator: when ``settings.DEBUG`` is true, print a notice and exit the
    # click context instead of running ``f``.
    @click.pass_context
    def new_func(ctx, *args, **kwargs):
        """
        :param click.Context ctx:
        """
        debug_enabled = settings.DEBUG
        if debug_enabled:
            click.echo('Skipping because DEBUG is True.')
            ctx.exit()
        return ctx.invoke(f, *args, **kwargs)
    wrapped = update_wrapper(new_func, f)
    return wrapped
项目:sceptre    作者:cloudreach    | 项目源码 | 文件源码
def get_stack(ctx, path):
    """
    Build the stack object for ``path``.

    A ``ConfigReader`` is created from the ``sceptre_dir`` and ``options``
    entries of the context object and asked to construct the stack.

    :param ctx: Cli context.
    :type ctx: click.Context
    :param path: Path handed to ``ConfigReader.construct_stack``.
    :type path: str
    :returns: whatever ``construct_stack`` returns for ``path``.
    """
    config_reader = ConfigReader(ctx.obj["sceptre_dir"], ctx.obj["options"])
    return config_reader.construct_stack(path)
项目:logstapo    作者:ThiefMaster    | 项目源码 | 文件源码
def test_current_config():
    # ``config.current_config`` should reflect the ``config`` entry of the
    # params of the context that is currently in scope.
    scoped_context = click.Context(main).scope()
    with scoped_context as ctx:
        ctx.params['config'] = dict(foo='bar')
        assert config.current_config == {'foo': 'bar'}
项目:pipetree    作者:pipetree    | 项目源码 | 文件源码
def test_get_config_path(self):
        # The config path is expected under ``.pipetree/config.json`` inside
        # the project directory stored on the context object.
        command = click.Command('cli')
        ctx = click.Context(command)
        ctx.obj = dict(project_dir='foo/bar/baz/')
        self.assertEqual(_get_config_path(ctx),
                         'foo/bar/baz/.pipetree/config.json')
项目:proxenos    作者:darvid    | 项目源码 | 文件源码
def cmd_select(ctx,                  # type: click.Context
               backend,              # type: str
               host,                 # type: str
               port,                 # type: int
               filter_address,       # type: str
               filter_service_name,  # type: str
               filter_service_tags,  # type: str
               hash_method,          # type: proxenos.rendezvous.HashMethod
               key,                  # type: str
               ):
    # type: (...) -> None
    """Selects a node from a cluster."""
    # Fall back to the backend's default port
    if port is None:
        port = DEFAULT_PORTS[backend]
    driver_manager = stevedore.driver.DriverManager(
        namespace=proxenos.mappers.NAMESPACE,
        name=backend,
        invoke_on_load=False)
    try:
        mapper = driver_manager.driver(host=host, port=port)
    except proxenos.errors.ServiceDiscoveryConnectionError as err:
        click.secho(str(err), fg='red')
        ctx.exit(proxenos.const.ExitCode.CONNECTION_FAILURE)
    mapper.update()
    cluster = mapper.cluster.copy()
    # TODO(darvid): Add filtering support in the mapper API itself
    # Each entry pairs a regular expression (possibly empty) with the test a
    # service has to pass; they are applied in order: tags, name, address.
    service_filters = (
        (filter_service_tags,
         lambda svc, regex: all(regex.match(tag) for tag in svc.tags)),
        (filter_service_name,
         lambda svc, regex: regex.match(svc.name)),
        (filter_address,
         lambda svc, regex: regex.match(str(svc.socket_address))),
    )
    for expression, accepts in service_filters:
        if not expression:
            continue
        pattern = re.compile(expression)
        cluster = {candidate for candidate in cluster
                   if accepts(candidate, pattern)}
    service = proxenos.rendezvous.select_node(
        cluster, key, hash_method=hash_method)
    if service is None:
        ctx.exit(proxenos.const.ExitCode.SERVICES_NOT_FOUND)
    click.echo(str(service.socket_address))
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def do_ex(ctx: click.Context, cmd: typing.List[str], cwd: str = '.') -> typing.Tuple[str, str, int]:
    """
    Executes a given command

    The executable named by ``cmd[0]`` is resolved with ``find_executable``
    and ``cmd[0]`` is replaced IN PLACE by the resolved path, so the caller's
    list reflects the command that was actually run.

    Args:
        ctx: Click context (not used by this function)
        cmd: command to run, as a list whose first item is the executable
        cwd: working directory (defaults to ".")

    Returns: stdout, stderr, exit_code

    Raises:
        SystemExit: with code -1 when the executable cannot be found
    """

    def _ensure_stripped_str(str_or_bytes):
        # Pipes yield bytes; undecodable bytes are escaped rather than
        # aborting the whole run ('surrogateescape' is the codec handler name).
        if not isinstance(str_or_bytes, str):
            str_or_bytes = str_or_bytes.decode('utf-8', 'surrogateescape')
        return '\n'.join(str_or_bytes.strip().splitlines())

    exe = find_executable(cmd[0])
    if not exe:
        click.secho(f'executable not found: {cmd[0]}', err=True, fg='red')
        # The bare ``exit`` builtin is injected by the ``site`` module and is
        # not guaranteed to exist; raising SystemExit is always available.
        raise SystemExit(-1)
    cmd[0] = exe

    # On Windows and Python 2, environment values must be strings and not
    # unicode, hence the explicit str() on every value.
    env = {
        key: str(value)
        for key, value in dict(
            os.environ,
            # try to disable i18n
            LC_ALL='C',
            LANGUAGE='',
            HGPLAIN='1',
        ).items()
    }

    click.secho(f'{cmd}', nl=False, fg='magenta')
    p = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=str(cwd),
        env=env,
    )
    out, err = p.communicate()
    click.secho(f' -> {p.returncode}', fg='magenta')
    return _ensure_stripped_str(out), _ensure_stripped_str(err), p.returncode
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def do(
    ctx: click.Context,
    cmd: typing.List[str],
    cwd: str = '.',
    mute_stdout: bool = False,
    mute_stderr: bool = False,
    # @formatter:off
    filter_output: typing.Union[None, typing.Iterable[str]]=None
    # @formatter:on
) -> str:
    """
    Executes a command and returns the result

    Args:
        ctx: click context
        cmd: command to execute; a list/tuple of arguments, or a single string
            that is split with ``shlex.split``
        cwd: working directory (defaults to ".")
        mute_stdout: if true, stdout will not be printed
        mute_stderr: if true, stderr will not be printed
        filter_output: gives a list of partial strings to filter out from the output (stdout or stderr)

    Returns: stdout (unfiltered)

    Raises:
        SystemExit: with the command's exit code when that code is non-zero
    """
    # Materialise the filters once: a one-shot iterable (e.g. a generator)
    # would otherwise be exhausted after the first output line was checked.
    unwanted = None if filter_output is None else tuple(filter_output)

    def _filter_output(input_):
        if unwanted is None:
            return input_
        return '\n'.join(
            line for line in input_.split('\n')
            if not any(fragment in line for fragment in unwanted)
        )

    if isinstance(cmd, (list, tuple)):
        # do_ex rewrites its command list in place, so hand it a private
        # list: tuples become usable and the caller's sequence is untouched.
        cmd = list(cmd)
    else:
        cmd = shlex.split(cmd)

    out, err, ret = do_ex(ctx, cmd, cwd)
    if out and not mute_stdout:
        click.secho(f'{_filter_output(out)}', fg='cyan')
    if err and not mute_stderr:
        click.secho(f'{_filter_output(err)}', fg='red')
    if ret:
        click.secho(f'command failed: {cmd}', err=True, fg='red')
        # The bare ``exit`` builtin only exists when the ``site`` module is
        # loaded; raising SystemExit directly has the same effect everywhere.
        raise SystemExit(ret)
    return out
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def main_add(ctx,                      # type: click.Context
             options,                  # type: Dict[str, Any]
             build,                    # type: bool
             editable,                 # type: Iterable[str]
             tag,                      # type: Iterable[str]
             install,                  # type: bool
             pre,                      # type: bool
             resolve_canonical_names,  # type: bool
             resolve_versions,         # type: bool
             specifiers,               # type: Tuple[str, ...]
             ):
    # type: (...) -> None
    """Record new package specifiers in the tagged requirement source files.

    Optionally installs the packages first (``install``) and invokes the
    build command for the same tags afterwards (``build``).
    """
    if not options['directory'].exists():
        console.error('run `{} init\' first', ctx.find_root().info_name)
        ctx.abort()

    # Editable requirements go in front of the plain specifiers.
    editable_specifiers = tuple('-e {}'.format(e) for e in editable)
    specifiers = editable_specifiers + specifiers

    if install:
        pre_flag = ('--pre',) if pre else ()
        pip_install(ctx, *(pre_flag + tuple(shlex.split(' '.join(specifiers)))))

    # No explicit tag means the 'main' tag.
    tag = tag or ('main',)

    _pip_options, _session = reqwire.helpers.requirements.build_pip_session()
    src_dir = options['directory'] / options['source_dir']

    # Gather the index URLs declared by the source files that already exist.
    lookup_index_urls = set()  # type: Set[str]
    for tag_name in tag:
        source_path = src_dir / ''.join((tag_name, options['extension']))
        if source_path.exists():
            _, finder = reqwire.helpers.requirements.parse_requirements(
                filename=str(source_path))
            lookup_index_urls.update(finder.index_urls)

    try:
        for tag_name in tag:
            console.info('saving requirement(s) to {}', tag_name)
            reqwire.scaffold.extend_source_file(
                working_directory=str(options['directory']),
                tag_name=tag_name,
                specifiers=specifiers,
                extension=options['extension'],
                lookup_index_urls=lookup_index_urls,
                prereleases=pre,
                resolve_canonical_names=resolve_canonical_names,
                resolve_versions=resolve_versions,
            )
    except piptools.exceptions.NoCandidateFound as err:
        console.error(str(err))
        ctx.abort()

    if build:
        ctx.invoke(main_build, all=False, tag=tag)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def catch_all(func):
    """
    Catch (nearly) all exceptions raised by a command and exit with status 1.

    ``click.ClickException`` is re-raised untouched. Any other ``Exception``
    is replaced by a generic message on stderr; the function name, arguments
    and stack trace are only dumped when the debug flag is set.

    Args:
        func (function): function being decorated; it is called with the
            click context as its first positional argument

    Returns:
        function: the wrapping function, carrying ``func``'s metadata
    """
    def stderr(msg):
        click.echo(click.style(msg, fg='red'), file=sys.stderr)

    def debug_enabled(ctx, kwargs):
        """Return True if debug was requested via kwargs or ``ctx.obj``."""
        if kwargs.get('debug'):
            return True
        # ctx.obj may be None or lack the key; a failure here must not
        # replace the error currently being reported.
        try:
            return bool(ctx.obj['debug'])
        except (AttributeError, KeyError, TypeError):
            return False

    @functools.wraps(func)
    def _catch_all(ctx, *args, **kwargs):
        """
        Args:
            ctx (click.Context): cli context object
            args (tuple): tuple of args of the function
            kwargs (dict): keyword args of the function
        """
        try:
            return func(ctx, *args, **kwargs)
        except click.Abort:
            # on SIGINT click.prompt raises click.Abort
            logger.error('')  # just to get a newline

        except click.ClickException:
            raise  # Let click deal with it

        except Exception:
            # generic error string
            stderr(
                'You have experienced a client-side technical error.')

            # only dump the stack traces if the debug flag is set
            if debug_enabled(ctx, kwargs):
                stderr("\nFunction: {}.{}".format(func.__module__, func.__name__))
                stderr("Args: {}".format(args))
                stderr("Kwargs: {}".format(kwargs))
                stderr("{}".format(traceback.format_exc()))
            else:
                stderr(
                    'For more detail, run your command with the debug flag: '
                    '`21 --debug <command>.`')
        # Reached only after a handled Abort or generic exception.
        sys.exit(1)

    return _catch_all
项目:sphinx-click    作者:click-contrib    | 项目源码 | 文件源码
def test_basic_parameters(self):
        """Check the output for a group that mixes parameter kinds.

        The group under test declares one option and one argument, each
        bound to an environment variable, so the options, arguments and
        environment-variable sections are all compared.
        """

        @click.group()
        @click.option('--param', envvar='PARAM', help='A sample option')
        @click.argument('ARG', envvar='ARG')
        def cli():
            """A sample command group."""

        # Expected reStructuredText, with the leading blank line stripped.
        expected = textwrap.dedent("""
        .. program:: cli
        .. code-block:: shell

            cli [OPTIONS] ARG COMMAND [ARGS]...

        .. rubric:: Options

        .. option:: --param <param>

            A sample option

        .. rubric:: Arguments

        .. option:: ARG

            Required argument

        .. rubric:: Environment variables

        .. envvar:: PARAM

            Provide a default for :option:`--param`

        .. envvar:: ARG

            Provide a default for :option:`ARG`
        """).lstrip()

        context = click.Context(cli, info_name='cli')
        rendered = '\n'.join(ext._format_command(context, show_nested=False))

        self.assertEqual(expected, rendered)