Python click 模块,group() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用click.group()

项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def delete(experiment, project):
    """Delete experiment."""
    # `project` may be empty; get_project_or_local() resolves it to a
    # (user, project_name) pair.
    user, project_name = get_project_or_local(project)

    # Deletion is destructive, so require an explicit confirmation first.
    if not click.confirm("Are you sure you want to delete experiment `{}`".format(experiment)):
        click.echo('Exiting without deleting experiment.')
        sys.exit(1)

    try:
        response = PolyaxonClients().experiment.delete_experiment(
            user, project_name, experiment)
    except (PolyaxonHTTPError, PolyaxonShouldExitError) as e:
        Printer.print_error('Could not delete experiment `{}`.'.format(experiment))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    # Only HTTP 204 is reported as success; other statuses print nothing.
    if response.status_code == 204:
        Printer.print_success("Experiment `{}` was deleted successfully".format(experiment))
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def get(group, project):
    """Get experiment group by uuid.

    Examples:

    ```
    polyaxon group get 13
    ```
    """
    # `project` may be empty; get_project_or_local() resolves it to a
    # (user, project_name) pair.
    user, project_name = get_project_or_local(project)
    try:
        response = PolyaxonClients().experiment_group.get_experiment_group(
            user, project_name, group)
    except (PolyaxonHTTPError, PolyaxonShouldExitError) as e:
        Printer.print_error('Could not get experiment group `{}`.'.format(group))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    # Convert the response to its light dict form and print it as a table.
    response = response.to_light_dict()
    Printer.print_header("Experiment group info:")
    dict_tabulate(response)

项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def delete(group, project):
    """Delete experiment group."""
    # `project` may be empty; get_project_or_local() resolves it to a
    # (user, project_name) pair.
    user, project_name = get_project_or_local(project)

    # Deletion is destructive, so require an explicit confirmation first.
    # Declining is not an error here, hence exit status 0.
    if not click.confirm("Are you sure you want to delete experiment group `{}`".format(group)):
        click.echo('Exiting without deleting experiment group.')
        sys.exit(0)

    try:
        response = PolyaxonClients().experiment_group.delete_experiment_group(
            user, project_name, group)
    except (PolyaxonHTTPError, PolyaxonShouldExitError) as e:
        Printer.print_error('Could not delete experiment group `{}`.'.format(group))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    # Only HTTP 204 is reported as success; other statuses print nothing.
    if response.status_code == 204:
        Printer.print_success("Experiment group `{}` was deleted successfully".format(group))
项目:kubey    作者:bradrf    | 项目源码 | 文件源码
def webui(obj):
    '''List dashboard links for matching pods (if only one matched, URL is opened in browser).'''
    kubectl = obj.kubey.kubectl
    # Strip ANSI styling from the `cluster-info` output before searching it.
    cluster_info = click.unstyle(kubectl.call_capture('cluster-info'))
    dash_endpoint = re.search(r'kubernetes-dashboard.*?(http\S+)', cluster_info).group(1)

    # One dashboard URL per pod, built from the pod's namespace and name.
    urls = [
        dash_endpoint + '/#/pod/{0}/{1}?namespace={0}'.format(pod.namespace, pod.name)
        for pod in obj.kubey.each_pod(obj.maximum)
    ]

    for url in urls:
        click.echo(url)
    # A single match is unambiguous, so it is also opened directly.
    if len(urls) == 1:
        click.launch(urls[0])
项目:adefa    作者:butomo1989    | 项目源码 | 文件源码
def group(name, project, device):
    """
    Create a device group / pool.
    :param name: group name
    :param project: project id
    :param device: device id
    """
    # Render the ids as a bracketed list of double-quoted strings, e.g.
    # '["a", "b"]'. Building it with join also closes the bracket when
    # `device` is empty ('[]'), which the manual loop did not.
    device_str = '[{}]'.format(', '.join('"{}"'.format(item) for item in device))
    rules = [{'attribute': 'ARN', 'operator': 'IN', 'value': device_str}]
    res = client.create_device_pool(name=name, projectArn=project, rules=rules)
    # Print the ARN of the newly created pool.
    print(res.get('devicePool').get('arn'))
项目:adefa    作者:butomo1989    | 项目源码 | 文件源码
def run(name, project, app, type, test, group):
    """
    Schedule test to be run.
    :param name: run name
    :param project: project id
    :param app: app id
    :param type: run type
    :param test: test id
    :param group: device group id
    """
    schedule_args = {
        'name': name,
        'projectArn': project,
        'test': {'type': type, 'testPackageArn': test},
        'devicePoolArn': group,
    }
    # The app ARN is only sent when an app was actually supplied.
    if app:
        schedule_args['appArn'] = app

    res = client.schedule_run(**schedule_args)
    print(res.get('run').get('arn'))
项目:kelctl    作者:kelproject    | 项目源码 | 文件源码
def show_obj(group, manifest, kind):
    """
    Write a Layer 1 object to stdout.
    """
    # `manifest` selects an entry of COMPONENTS; `group` and `manifest` are
    # passed to its get_api_objs(), and `kind` picks the objects to print.
    if not os.path.exists("cluster.yml"):
        error("cluster.yml does not exist. Did you configure?")
    with open("cluster.yml") as fp:
        # safe_load only builds plain YAML types; yaml.load() without an
        # explicit Loader is unsafe and is rejected by PyYAML >= 6.
        config = yaml.safe_load(fp)
    cluster = Cluster(config)
    cluster.key_keeper = KeyKeeper("./keys")
    Resource = COMPONENTS.get(manifest)
    if Resource is None:
        click.echo('No component named "{}"'.format(manifest))
        sys.exit(1)
    objs = Resource(cluster).get_api_objs(group, manifest)[kind]
    for obj in objs:
        click.echo(json.dumps(obj.obj, indent=2))
项目:StrepHit    作者:Wikidata    | 项目源码 | 文件源码
def parse_and_save(text, separator, out_file, url):
    """Split ``text`` into records and write them to ``out_file`` as JSON lines.

    A line matching the ``separator`` regex starts a new record whose name is
    the regex's first capture group; following non-empty lines are appended
    (without any separator) to that record's ``bio``. Lines before the first
    separator match are ignored.

    :param text: full text to split, lines separated by ``\\n``
    :param separator: regex (applied with ``re.match``) with one capture group
    :param out_file: writable text file object
    :param url: value stored under ``url`` in every record
    """
    def _write(item):
        # One JSON document per line.
        json.dump(item, out_file)
        out_file.write('\n')

    current_item = None
    for line in text.split('\n'):
        if not line:
            continue
        match = re.match(separator, line)
        if match:
            # A new header closes the record collected so far.
            if current_item:
                _write(current_item)

            print('%s' % match.group(1))
            current_item = {
                'url': url,
                'name': match.group(1),
                'bio': '',
            }
        elif current_item:
            current_item['bio'] += line

    # The final record has no following header to trigger its write.
    if current_item:
        _write(current_item)
项目:kostka    作者:pixers    | 项目源码 | 文件源码
def run_hooks(type, *args):
    """Run every executable hook of the given ``type``, in sorted path order.

    Hooks are collected from the ``hooks/<type>`` directory next to this
    module, from ``/usr/lib/kostka/hooks/<type>``, from
    ``/etc/kostka/hooks/<type>`` and from the ``hooks/<type>`` directory of
    every distribution exposing a ``kostka`` entry point. ``args`` are passed
    to each hook as command-line arguments.
    """
    here = os.path.dirname(__file__)
    search_dirs = [
        os.path.join(here, 'hooks', type),
        os.path.join('/usr/lib/kostka/hooks', type),
        os.path.join('/etc/kostka/hooks', type),
    ]

    hooks = []
    for directory in search_dirs:
        if os.path.exists(directory):
            hooks.extend(os.path.join(directory, entry)
                         for entry in os.listdir(directory))

    for ep in pkg_resources.iter_entry_points(group='kostka'):
        plugin_dir = os.path.join(ep.dist.location,
                                  ep.dist.key.replace('-', '_'), 'hooks', type)
        if os.path.exists(plugin_dir):
            hooks.extend(os.path.join(plugin_dir, entry)
                         for entry in os.listdir(plugin_dir))

    extra_args = list(args)
    for hook in sorted(hooks):
        runnable = os.path.isfile(hook) and os.access(hook, os.X_OK)
        if runnable:
            subprocess.call([hook] + extra_args)
        else:
            # Skipped hooks are reported on stderr with a hint.
            print("NOT running hook {path} "
                  "(`chmod +x {path}`?)".format(path=hook), file=sys.stderr)
项目:sphinx-click    作者:click-contrib    | 项目源码 | 文件源码
def test_no_parameters(self):
        """Check the output generated for a bare `click.Group`.

        The group under test declares *no* arguments, *no* options and *no*
        environment variables.
        """

        @click.group()
        def cli():
            """A sample command group."""
            pass

        expected = textwrap.dedent("""
        .. program:: cli
        .. code-block:: shell

            cli [OPTIONS] COMMAND [ARGS]...
        """).lstrip()

        context = click.Context(cli, info_name='cli')
        rendered = '\n'.join(ext._format_command(context, show_nested=False))

        self.assertEqual(expected, rendered)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def filter_creation_date(groups, start, end):
    """Keep the log groups that were created no later than ``end``.

    Each kept group is updated in place: its ``exportStart`` is set to the
    later of its creation date and ``start``. ``creationTime`` is read as a
    timestamp in milliseconds.
    """
    kept = []
    for log_group in groups:
        created_at = datetime.fromtimestamp(log_group['creationTime'] / 1000.0)
        if created_at <= end:
            log_group['exportStart'] = created_at if created_at > start else start
            kept.append(log_group)
    return kept
项目:histonets-cv    作者:sul-cidr    | 项目源码 | 文件源码
def test_pair_options_to_argument(self):
        """Run a small click script in a subprocess and check its JSON output.

        The script decorates a command with ``utils.pair_options_to_argument``
        and echoes ``(arg, option)`` as JSON; the test asserts on that output.
        """
        # Script source; it is piped to `python -` below, so it must stay at
        # column 0 inside the string literal.
        code = """
import json
import click
from histonets import utils
@click.group()
def main():
    pass
@main.command()
@click.argument('img')
@click.argument('arg', nargs=-1, required=True)
@click.option('-o', '--option', multiple=True)
@utils.pair_options_to_argument('arg', {'option': 0}, args_slice=(2, None))
def command(img, arg, option):
    click.echo(json.dumps((arg, option)))
main()
        """
        # The script is echoed through the shell into the interpreter's stdin;
        # `-o` is given after t1 and after t3 but not after t2.
        cmd = ("echo \"{}\" "
               "| python - command im t1 -o 1 t2 t3 -o 3".format(code))
        # stderr is merged into stdout so error text would show up in `output`.
        ps = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT)
        output = ps.communicate()[0].decode()
        assert 'Error' not in output
        # Expected pairing: t2 has no option on the command line and gets 0.
        assert [["t1", "t2", "t3"], ["1", 0, "3"]] == json.loads(output)
项目:nokdoc    作者:hellt    | 项目源码 | 文件源码
def get_json_resp(responce):
    '''
    Return the JSON payload of ``responce`` as Python data.

    Sometimes documentation server returns junk data before
    json responce. This will raise decode exception.
    Strip this data if any
    for example this one gives trouble if using .json() instead
    nokdoc -l rdodin getlinks -p nuage-vns -r 4.0.r6

    :param responce: object exposing ``.text`` and ``.json()``
    :returns: the decoded JSON document
    :raises json.decoder.JSONDecodeError: when the body holds no JSON object
        at all, or the extracted object is still not valid JSON
    '''
    try:
        logging.debug(pformat(responce.text))
        return responce.json()
    except json.decoder.JSONDecodeError:
        # Fall back to the outermost {...} span of the raw body.
        match = re.search(r'{.*}', responce.text, flags=re.DOTALL)
        if match is None:
            # Nothing that looks like a JSON object: surface the original
            # decode error instead of an AttributeError on None.
            raise
        json_str = match.group().replace('\n', '').replace('\r', '')
        return json.loads(json_str)
项目:solidfire-cli    作者:solidfire    | 项目源码 | 文件源码
def disableauthentication(ctx):
    """The DisableLdapAuthentication method enables you to disable LDAP authentication and remove all LDAP configuration settings. This method does not remove any configured cluster admin accounts (user or group). However, those cluster admin accounts will no longer be able to log in."""



    # Set up the connection before ctx.element is used below.
    cli_utils.establish_connection(ctx)


    # Logs the literal string ": ;".
    # NOTE(review): looks like a parameter-logging template with no parameters - confirm.
    ctx.logger.info(""": """+""";"""+"")
    try:
        _DisableLdapAuthenticationResult = ctx.element.disable_ldap_authentication()
    except common.ApiServerError as e:
        # API failure: log the error's message and stop the process.
        ctx.logger.error(e.message)
        exit()
    except BaseException as e:
        # Any other failure is logged via its string form; exit() passes no status.
        ctx.logger.error(e.__str__())
        exit()
    if ctx.json:
        # JSON mode: re-serialise the result with 4-space indentation.
        print(simplejson.dumps(simplejson.loads(_DisableLdapAuthenticationResult), indent=4))
        return
    else:
        # Otherwise delegate formatting to the shared result printer.
        cli_utils.print_result(_DisableLdapAuthenticationResult, ctx.logger, as_json=ctx.json, as_pickle=ctx.pickle, depth=ctx.depth, filter_tree=ctx.filter_tree)
项目:koi    作者:openpermissions    | 项目源码 | 文件源码
def get_command(self, ctx, name):
        """Look up command ``name`` in the commands dir, else the default group.

        A command lives in ``<commands_dir>/<name>.py`` and must define a
        module-level ``cli`` object. When no commands dir is configured, or
        the file cannot be opened, the lookup is delegated to ``defaults``.
        """
        if not self._commands_dir:
            return defaults.get_command(ctx, name)

        path = os.path.join(self._commands_dir, name + '.py')
        try:
            with open(path) as source:
                compiled = compile(source.read(), path, 'exec')
        except IOError:
            return defaults.get_command(ctx, name)

        # NOTE(review): this executes whatever Python file sits in the
        # commands dir; the directory must be trusted.
        namespace = {}
        eval(compiled, namespace, namespace)

        CLI = 'cli'
        if CLI in namespace:
            return namespace[CLI]
        ctx.fail('`{}` not defined in "{}"'.format(CLI, name))
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler"""

    @click.group()
    @click.option('--cell', required=True,
                  envvar='TREADMILL_CELL',
                  is_eager=True, callback=cli.handle_context_opt,
                  expose_value=False)
    @click.option('--zookeeper', required=False,
                  envvar='TREADMILL_ZOOKEEPER',
                  callback=cli.handle_context_opt,
                  expose_value=False)
    def master_group():
        """Manage Treadmill master data"""
        pass

    # Each helper is called with the parent group, in this order.
    registrars = (
        cell_group,
        bucket_group,
        server_group,
        app_group,
        monitor_group,
        identity_group_group,
    )
    for register in registrars:
        register(master_group)

    return master_group
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler."""

    @click.group()
    @click.option('--aliases-path', required=False,
                  envvar='TREADMILL_ALIASES_PATH',
                  help='Colon separated command alias paths')
    def node_group(aliases_path):
        """Manage Treadmill node data"""
        # Export the option through the environment when it is set.
        if aliases_path:
            os.environ['TREADMILL_ALIASES_PATH'] = aliases_path

    # Each helper is called with the parent group, in this order.
    for register in (lvm_group, benchmark_group):
        register(node_group)

    return node_group
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler."""

    @click.group()
    @click.option('--zookeeper', required=False,
                  envvar='TREADMILL_ZOOKEEPER',
                  callback=cli.handle_context_opt,
                  expose_value=False)
    @click.option('--cell', required=True,
                  envvar='TREADMILL_CELL',
                  callback=cli.handle_context_opt,
                  expose_value=False)
    def top():
        """Report scheduler state."""
        pass

    # Each helper is called with the parent group, in this order.
    for register in (view_group, explain_group):
        register(top)

    return top
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler"""

    @click.group()
    def ldap_group():
        """Manage Treadmill LDAP data"""
        pass

    # Each helper is called with the parent group, in this order.
    registrars = (
        cell_group,
        server_group,
        app_group,
        dns_group,
        app_groups_group,
        ldap_tenant_group,
        ldap_allocations_group,
        partition_group,
        haproxy_group,
        # Low level ldap access.
        direct_group,
        schema_group,
        init_group,
    )
    for register in registrars:
        register(ldap_group)

    return ldap_group
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler."""

    @click.group(cls=cli.make_commands(__name__))
    @click.pass_context
    def run(ctx):
        """Admin commands."""
        cli.init_logger('admin.conf')

        # DEBUG when the context object has 'logging.debug' set, else WARN.
        debug_enabled = ctx.obj.get('logging.debug')
        log_level = logging.DEBUG if debug_enabled else logging.WARN

        # Apply the level to the 'treadmill' logger, then the root logger.
        for logger in (logging.getLogger('treadmill'), logging.getLogger()):
            logger.setLevel(log_level)

    return run
项目:config-sesame    作者:1and1    | 项目源码 | 文件源码
def license_option(*param_decls, **attrs):
    """``--license`` option that prints license information and then exits."""
    def decorator(func):
        "decorator inner wrapper"
        def callback(ctx, _dummy, value):
            "click option callback"
            if value and not ctx.resilient_parsing:
                from . import __doc__ as raw_text

                def embolden(match):
                    # Render ``literal`` spans of the package docstring in bold.
                    return click.style(match.group(1), bold=True)

                click.echo(re.sub(r"``([^`]+?)``", embolden, raw_text))
                ctx.exit()

        # Caller-supplied attrs win; these are only fallbacks.
        fallbacks = (
            ('is_flag', True),
            ('expose_value', False),
            ('is_eager', True),
            ('help', 'Show the license and exit.'),
        )
        for key, default in fallbacks:
            attrs.setdefault(key, default)
        attrs['callback'] = callback

        decls = param_decls or ('--license',)
        return click.option(*decls, **attrs)(func)

    return decorator


# Main command (root)
项目:service-level-reporting    作者:zalando-zmon    | 项目源码 | 文件源码
def product_group_update(obj, name, new_name, department):
    """Update product group"""
    client = get_client(obj)

    matches = client.product_group_list(name)
    if not matches:
        fatal_error('Product group {} does not exist'.format(name))

    with Action('Updating product_group: {}'.format(name), nl=True):
        # Only the first match is updated, and only with the values given.
        pg = matches[0]
        for field, value in (('name', new_name), ('department', department)):
            if value:
                pg[field] = value

        updated = client.product_group_update(pg)

        print(json.dumps(updated, indent=4))
项目:service-level-reporting    作者:zalando-zmon    | 项目源码 | 文件源码
def product_create(obj, name, product_group_name, email):
    """Create new product"""
    client = get_client(obj)

    with Action('Creating product: {}'.format(name), nl=True) as act:
        try:
            candidates = client.product_group_list(name=product_group_name)
            if not candidates:
                act.fatal_error('Product group does not exist!')

            # The product is attached to the first matching group.
            group_uri = candidates[0]['uri']
            product = client.product_create(name, product_group_uri=group_uri, email=email)

            print(json.dumps(product, indent=4))
        except SLRClientError as e:
            act.error(e)
项目:lambkin    作者:jarpy    | 项目源码 | 文件源码
def main():
    """Build the click command group, register the subcommands and run it.

    Exits with status 1 on Windows.
    """
    if platform.system() == 'Windows':
        # print() with one argument behaves the same on Python 2 and 3;
        # the former print statement was a SyntaxError on Python 3.
        print("Lambkin doesn't run on Windows yet. Sorry.")
        sys.exit(1)

    @click.group(invoke_without_command=True, no_args_is_help=True)
    @click.pass_context
    @click.option('--version', help='Show the version.', is_flag=True)
    def cli(ctx, version):
        # --version is only honoured when no subcommand was given.
        if ctx.invoked_subcommand is None and version:
            click.echo(VERSION)

    subcommands = [create, list_published, build, publish, run, schedule,
                   unpublish]
    for cmd in subcommands:
        cli.add_command(cmd)
    cli()
项目:click-didyoumean    作者:click-contrib    | 项目源码 | 文件源码
def test_basic_functionality_with_group(runner):
    """Invoking the unknown command "barr" lists "barrr" and "bar" as suggestions."""
    @click.group(cls=DYMGroup)
    def cli():
        pass

    @cli.command()
    def foo():
        pass

    @cli.command()
    def bar():
        pass

    @cli.command()
    def barrr():
        pass

    expected_output = (
        "Usage: cli [OPTIONS] COMMAND [ARGS]...\n\n"
        "Error: No such command \"barr\".\n\n"
        "Did you mean one of these?\n"
        "    barrr\n"
        "    bar\n"
    )

    outcome = runner.invoke(cli, ["barr"])
    assert outcome.output == expected_output
项目:click-didyoumean    作者:click-contrib    | 项目源码 | 文件源码
def test_cutoff_factor(runner):
    """With ``cutoff=1.0`` the unknown command "barr" gets no suggestions."""
    @click.group(cls=DYMGroup, max_suggestions=3, cutoff=1.0)
    def cli():
        pass

    @cli.command()
    def foo():
        pass

    @cli.command()
    def bar():
        pass

    @cli.command()
    def barrr():
        pass

    # if cutoff factor is 1.0 the match must be perfect.
    expected_output = (
        "Usage: cli [OPTIONS] COMMAND [ARGS]...\n\n"
        "Error: No such command \"barr\".\n"
    )

    outcome = runner.invoke(cli, ["barr"])
    assert outcome.output == expected_output
项目:shellcraft    作者:maebert    | 项目源码 | 文件源码
def main(game_path=None):
    """Load the game, restrict the CLI to the enabled commands, and run it."""
    game = get_game(game_path)

    # Cheat mode, properly hardcoded.
    debug_requested = len(sys.argv) > 2 and sys.argv[1] == "debug"
    if debug_requested:
        handle_debug(game)
        sys.exit(0)

    # Remove all commands from the main group that are not enabled in the game yet.
    if not game.state.debug:
        kept = {}
        for name, command in cli.commands.items():
            if name in game.state.commands_enabled:
                kept[name] = command
        cli.commands = kept

    # Wrap the callbacks of these commands with action_step, when present.
    for name in ('mine', 'craft', 'research'):
        if name not in cli.commands:
            continue
        command = cli.commands[name]
        command.callback = action_step(command.callback, game)
    cli()
项目:apimas    作者:grnet    | 项目源码 | 文件源码
def get_or_create_endpoint_group(self, endpoint):
        """
        Return the command group of an endpoint, creating it on first use.

        A group created here is registered on the base command under the
        endpoint's name and cached in ``self.endpoint_groups``.

        :param endpoint: Name of the node which is an endpoint.
        :returns: A group of commands. Each command corresponds to an action
        performed on a collection which belongs to the endpoint.
        """
        if endpoint in self.endpoint_groups:
            return self.endpoint_groups[endpoint]

        def group():
            pass

        created = self.get_base_command().group(name=endpoint)(group)
        self.endpoint_groups[endpoint] = created
        return created
项目:ipbb    作者:ipbus    | 项目源码 | 文件源码
def dep(ctx, proj):
    '''Dependencies command group'''

    env = ctx.obj

    # An explicit project wins; otherwise use the environment's project.
    target = env.project if proj is None else proj

    if target is None:
        if env.project is None:
            raise click.ClickException('Project area not defined. Move into a project area and try again')
        return

    # Change directory before executing subcommand
    from .proj import cd
    ctx.invoke(cd, projname=target)

# ------------------------------------------------------------------------------


# ------------------------------------------------------------------------------
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def cli(ctx, config, verbose, noprompt):
    # Set up logging, then store the flags, the NNTP settings and the NNTP
    # manager on ctx.obj.
    ctx.obj['verbose'] = verbose

    # Add our handlers at the parent level
    for logger_name in (SQLALCHEMY_LOGGER, NEWSREAP_LOGGER):
        add_handler(logging.getLogger(logger_name))

    # Handle Verbosity
    set_verbosity(verbose)

    ctx.obj['noprompt'] = noprompt

    # An explicitly named config file must exist.
    config_missing = config is not None and not isfile(config)
    if config_missing:
        logger.error(
            "The YAML config file '%s' was not found." % config,
        )
        exit(1)

    # NNTPSettings() for storing and retrieving settings
    ctx.obj['NNTPSettings'] = NNTPSettings(cfg_file=config)

    settings_valid = ctx.obj['NNTPSettings'].is_valid()
    if not settings_valid:
        # our configuration was invalid
        logger.error("No valid config.yaml file was found.")
        exit(1)

    # NNTPManager() for interacting with all configured NNTP Servers
    ctx.obj['NNTPManager'] = NNTPManager(
        settings=ctx.obj['NNTPSettings'],
    )

# Dynamically Build CLI List; This is done by iterating through
# plugin directories and looking for CLI_PLUGINS_MAPPING
# which is expected to be a dictionary containing the mapping of
# the cli group (the key) to the function prefixes defined.
#
# If we can load it we'll save it here
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def group():
    """Commands for experiment groups."""
    # Intentionally empty: the function only carries its docstring.
    return None
项目:polyaxon-cli    作者:polyaxon    | 项目源码 | 文件源码
def update(group, project, description):
    """Update experiment group.

    Example:

    ```
    polyaxon group update 2 --description="new description for my experiments"
    ```
    """
    # `project` may be empty; get_project_or_local() resolves it to a
    # (user, project_name) pair.
    user, project_name = get_project_or_local(project)
    update_dict = {}

    if description:
        update_dict['description'] = description

    # Nothing to send: warn and stop without treating it as an error.
    if not update_dict:
        Printer.print_warning('No argument was provided to update the experiment group.')
        sys.exit(0)

    try:
        response = PolyaxonClients().experiment_group.update_experiment_group(
            user, project_name, group, update_dict)
    except (PolyaxonHTTPError, PolyaxonShouldExitError) as e:
        Printer.print_error('Could not update experiment group `{}`.'.format(group))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    Printer.print_success("Experiment group updated.")
    # Convert the response to its light dict form and print it as a table.
    response = response.to_light_dict()
    Printer.print_header("Experiment group info:")
    dict_tabulate(response)

项目:hlsclt    作者:benjmarshall    | 项目源码 | 文件源码
def do_end_build_stuff(ctx,sub_command_returns,report):
    """Open the report of every stage that ran, when reporting is requested.

    With no sub-command results (the default run) all four stages are used.
    """
    # Check for reporting flag
    if not report:
        return
    # Must be on the default run when empty: add all stages manually
    stages = sub_command_returns or ['csim','syn','cosim','export']
    for stage in stages:
        open_report(ctx,stage)

### Click Command Definitions ###
# Build group entry point
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def stage():
    """
    CLI 'stage' group command
    """
    # Intentionally empty: the function only carries its docstring.
    return None
项目:adefa    作者:butomo1989    | 项目源码 | 文件源码
def delete(type, arn):
    """
    Delete data by arn.
    :param type: project | upload | group | run
    :param arn: id
    """
    # Resource type -> name of the client method that deletes it.
    # Unknown types fall through without calling anything.
    handlers = (
        ('project', 'delete_project'),
        ('upload', 'delete_upload'),
        ('group', 'delete_device_pool'),
        ('run', 'delete_run'),
    )
    for kind, method_name in handlers:
        if type == kind:
            getattr(client, method_name)(arn=arn)
            break
项目:dractor    作者:VerizonDigital    | 项目源码 | 文件源码
def cli(ctx, hostname, username, password, port, quiet, verbose): # pylint: disable=too-many-arguments
    """ Main entry point for application """

    #
    # Configure Logging: --verbose wins over --quiet, INFO is the default
    #
    level = logging.INFO
    if verbose:
        level = logging.DEBUG
    elif quiet:
        level = logging.WARNING

    logging.basicConfig(level=level)

    debugging = level == logging.DEBUG
    if not debugging:
        # Make requests quiet unless we are debugging
        logging.getLogger("requests").setLevel(logging.WARNING)

    # Connect up front; any failure is reported and ends the process.
    try:
        client = Client(hostname, port, username, password)
        client.connect()
    except WSMANConnectionError:
        print("\nFailed to connect to iDRAC")
        sys.exit(1)
    except Exception:
        print("\nGeneral problem creating client")
        sys.exit(1)

    # Store the connected client on the context object.
    ctx.obj['client'] = client

#
# Recipe Commands
#

#
# RAID group
#
项目:bosh-azure-template    作者:cf-platform-eng    | 项目源码 | 文件源码
def cli(ctx, index):
    # Group entry point: loads settings, redirects stdout/stderr through
    # `tee` into a log file, then (with no subcommand given) invokes every
    # command of the group in sorted name order.
    # NOTE(review): Python 2 only as written (print statement, unbuffered fdopen).

    # Get settings from CustomScriptForLinux extension configurations
    waagent.LoggerInit('/var/log/waagent.log', '/dev/null')
    ctx.meta["settings"] = settings = get_settings()
    ctx.meta["index-file"] = index

    group = ctx.command
    # Sorted command names define the execution order below.
    commands = sorted([c for c in group.commands])

    # start logging to stdout and install.log
    # Buffer size 0 reopens stdout unbuffered.
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

    username = settings["username"]
    log_file = os.path.join("/home", username, "pcf_install.log")

    # Point the stdout and stderr descriptors at tee's stdin so everything
    # written to either goes through `tee` into log_file.
    tee = Popen(["tee", log_file], stdin=PIPE)
    os.dup2(tee.stdin.fileno(), sys.stdout.fileno())
    os.dup2(tee.stdin.fileno(), sys.stderr.fileno())

    if ctx.invoked_subcommand is None:
        for command in commands:
            # Drop every "NN_" (two digits plus underscore) from the name.
            command = re.sub('\d{2}_', '', command)
            print "Running {0}...".format(command)
            # NOTE(review): eval() turns the name into whatever object it
            # resolves to in this scope; confirm the names are trusted.
            ctx.invoke(eval(command))
    else:
        pass
项目:aiotasks    作者:cr0hn    | 项目源码 | 文件源码
def __call__(self, f):
        """Decorate ``f`` with the global options and turn it into a click group.

        The entries of ``global_options_list`` are applied in reverse order,
        each one to the result of the previous one, so decorators that return
        a new callable are not lost. The group accepts ``-h`` and ``--help``
        and uses ``self.invoke_without_command``.

        :param f: the function to decorate
        :returns: the resulting click group
        """
        fn = f
        for option in reversed(global_options_list):
            # Chain on `fn`, not on the original `f`.
            fn = option(fn)

        return click.group(
            context_settings={'help_option_names': ['-h', '--help']},
            invoke_without_command=self.invoke_without_command)(fn)


#
# HERE MORE EXAMPLES OF CMD OPTIONS
#
# -------------------------------------------------------------------------
# Options for "auto" command
# -------------------------------------------------------------------------
#
# auto_options_list = (
#     click.option('-T', '--timeout', 'timeout', type=int, default=60,
#                  help="max time to wait until actions are available"),
# )
#
#
# class auto_options(object):
#     def __call__(self, f):
#         def wrapped_f(*args):
#             fn = f
#             for option in reversed(auto_options_list):
#                 fn = option(f)
#
#             return fn
#
#         return wrapped_f()
项目:gluon    作者:openstack    | 项目源码 | 文件源码
def main():
    """Build a click group, let proc_model() populate it, and run it."""
    # Fresh function object sharing dummy's code but with empty globals.
    # `__code__` exists on Python 2.6+ and Python 3; `func_code` was Python 2 only.
    cli = types.FunctionType(dummy.__code__, {})
    cli = click.group()(cli)
    model_list = get_model_list()
    model = get_api_model(sys.argv, model_list)
    proc_model(cli,
               package_name="gluon",
               model_dir="models",
               api_model=model,
               hostenv="OS_PROTON_HOST",
               portenv="OS_PROTON_PORT",
               hostdefault=CONF.api.host,
               portdefault=CONF.api.port)
    cli()
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def _print_version(ctx: click.Context, param, value):
    """Run the version routine and exit with status 0 when ``value`` is set.

    Does nothing when ``value`` is falsy or click is in resilient parsing mode.
    """
    should_run = value and not ctx.resilient_parsing
    if not should_run:
        return

    ensure_repo()

    _get_version(ctx)
    exit(0)


# @click.group(invoke_without_command=True)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def new_recruit(self, match):
        """Dispatched to by notify(). If a recruitment request has been issued,
        open a browser window for the a new participant (in this case the
        person doing local debugging).
        """
        log = self.out.log
        log("new recruitment request!")
        url = match.group(1)
        if self.proxy_port is not None:
            log("Using proxy port {}".format(self.proxy_port))
            # Swap the configured base port in the URL for the proxy port.
            base_port = str(get_config().get('base_port'))
            url = url.replace(base_port, self.proxy_port)
        webbrowser.open(url, new=1, autoraise=True)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def start_replay(self, match):
        """Dispatched to by notify(). Log that the replay is ready and open
        the URL captured in the first group of ``match`` in a browser window.
        """
        self.out.log("replay ready!")
        url = match.group(1)
        webbrowser.open(url, new=1, autoraise=True)
项目:endpointer    作者:devjack    | 项目源码 | 文件源码
def cli(ctx, debug=False):
    """
    Base application/group. Subcommands 'extend' the options/args.
    """
    # Store a Config carrying the debug flag on the context object.
    shared_config = Config(debug=debug)
    ctx.obj = shared_config
项目:endpointer    作者:devjack    | 项目源码 | 文件源码
def cli(ctx, debug=False):
    """
    Base application/group. Subcommands 'extend' the options/args.
    """
    # Store a Config carrying the debug flag on the context object.
    shared_config = Config(debug=debug)
    ctx.obj = shared_config
项目:yoda    作者:yoda-pa    | 项目源码 | 文件源码
def dev():
    """
        Dev command group:\n
        contains commands helpful for developers
    """
    # No body beyond the docstring.
    # NOTE(review): presumably registered as a click group elsewhere, with the
    # docstring used as its help text - confirm.
项目:yoda    作者:yoda-pa    | 项目源码 | 文件源码
def life():
    """
        Life command group:\n
        contains helpful commands to organize your life
    """
    # No body beyond the docstring.
    # NOTE(review): presumably registered as a click group elsewhere, with the
    # docstring used as its help text - confirm.
项目:zotero-cli    作者:jbaiter    | 项目源码 | 文件源码
def find_storage_directories():
    """Yield ``(profile_name, storage_path)`` pairs for Zotero storage dirs.

    Profile directories are looked up in the platform-specific locations
    under the user's home directory. A candidate is yielded when its name
    matches ``PROFILE_PAT`` and it contains ``zotero/storage``; the profile
    name is the pattern's second group. On Linux an existing
    ``~/Zotero/storage`` is yielded first, as ``'default'``.
    """
    # Path.home() also works where HOME is unset (typically Windows), which
    # os.environ['HOME'] did not.
    home_dir = pathlib.Path.home()
    candidates = []
    if sys.platform.startswith("linux"):
        firefox_dir = home_dir/".mozilla"/"firefox"
        if firefox_dir.exists():
            candidates.append(firefox_dir.iterdir())
        zotero_dir = home_dir/".zotero"
        if zotero_dir.exists():
            candidates.append(zotero_dir.iterdir())
        zotero5_dir = home_dir/"Zotero/storage"
        if zotero5_dir.exists():
            yield ('default', zotero5_dir)
    elif sys.platform == "win32":
        win_support_dir = home_dir/"AppData"/"Roaming"
        zotero_win_dir = win_support_dir/"Zotero"/"Zotero"/"Profiles"
        if zotero_win_dir.exists():
            candidates.append(zotero_win_dir.iterdir())
    elif sys.platform == "darwin":
        osx_support_dir = home_dir/"Library"/"Application Support"
        zotero_osx_dir = osx_support_dir/"Zotero"/"Profiles"
        if zotero_osx_dir.exists():
            candidates.append(zotero_osx_dir.iterdir())

    for fpath in itertools.chain.from_iterable(candidates):
        if not fpath.is_dir():
            continue
        match = PROFILE_PAT.match(fpath.name)
        if match:
            storage_path = fpath/"zotero"/"storage"
            if storage_path.exists():
                yield (match.group(2), storage_path)
项目:dockerscan    作者:cr0hn    | 项目源码 | 文件源码
def __call__(self, f):
        """Decorate ``f`` with the global options and turn it into a click group.

        The entries of ``global_options_list`` are applied in reverse order,
        each one to the result of the previous one, so decorators that return
        a new callable are not lost. The group accepts ``-h`` and ``--help``
        and uses ``self.invoke_without_command``.

        :param f: the function to decorate
        :returns: the resulting click group
        """
        fn = f
        for option in reversed(global_options_list):
            # Chain on `fn`, not on the original `f`.
            fn = option(fn)

        return click.group(context_settings={'help_option_names': ['-h', '--help']},
                           invoke_without_command=self.invoke_without_command)(fn)


#
# HERE MORE EXAMPLES OF CMD OPTIONS
#
# --------------------------------------------------------------------------
# Options for "auto" command
# --------------------------------------------------------------------------
#
# auto_options_list = (
#     click.option('-T', '--timeout', 'timeout', type=int, default=60,
#                  help="max time to wait until actions are available"),
# )
#
#
# class auto_options(object):
#     def __call__(self, f):
#         def wrapped_f(*args):
#             fn = f
#             for option in reversed(auto_options_list):
#                 fn = option(f)
#
#             return fn
#
#         return wrapped_f()
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
async def perform_group_add_identities(self, intent):
        """Add ``intent.identities`` to the stored group named ``intent.name``.

        The identities go into the group's ``admins`` list when
        ``intent.admin`` is true, otherwise into ``users``; duplicates are
        removed and the resulting order is not defined.

        Declared ``async`` because the body uses ``async with`` and ``await``,
        which are a SyntaxError inside a plain ``def``.

        :raises GroupNotFound: when no row exists for ``intent.name``
        """
        async with self.connection.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute('SELECT body FROM groups WHERE id=%s', (intent.name, ))
                ret = await cur.fetchone()
                if ret is None:
                    raise GroupNotFound('Group not found.')
                group = ejson_loads(ret[0])
                group_entry = 'admins' if intent.admin else 'users'
                # Set union removes duplicates.
                group[group_entry] = list(set(group[group_entry]) | set(intent.identities))
                await cur.execute('UPDATE groups SET body=%s WHERE id=%s',
                    (ejson_dumps(group), intent.name))
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
async def perform_group_remove_identities(self, intent):
        """Remove ``intent.identities`` from the stored group named ``intent.name``.

        The identities are removed from the group's ``admins`` list when
        ``intent.admin`` is true, otherwise from ``users``; the order of the
        remaining entries is kept.

        Declared ``async`` because the body uses ``async with`` and ``await``,
        which are a SyntaxError inside a plain ``def``.

        :raises GroupNotFound: when no row exists for ``intent.name``
        """
        async with self.connection.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute('SELECT body FROM groups WHERE id=%s', (intent.name, ))
                ret = await cur.fetchone()
                if ret is None:
                    raise GroupNotFound('Group not found.')
                group = ejson_loads(ret[0])
                group_entry = 'admins' if intent.admin else 'users'
                group[group_entry] = [identity for identity in group[group_entry]
                                      if identity not in intent.identities]
                await cur.execute('UPDATE groups SET body=%s WHERE id=%s',
                    (ejson_dumps(group), intent.name))