Python click 模块,UsageError() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用click.UsageError()

项目:rio-l8qa    作者:mapbox    | 项目源码 | 文件源码
def main(qatif, stats, outdir, cloudmask):
    # Reads `qatif` with rasterio (`read(1)` plus the dataset profile) and then,
    # depending on the options, writes a cloud mask and/or echoes summary
    # statistics as indented JSON.
    if not (stats or outdir or cloudmask):
        raise click.UsageError(
            "Specify --stats, --cloudmask MASK, or --outdir DIR")

    # Asking for an output directory implies that statistics are wanted.
    want_stats = True if outdir else stats

    with rasterio.open(qatif) as dataset:
        band = dataset.read(1)
        raster_profile = dataset.profile

    if cloudmask:
        write_cloud_mask(band, profile=raster_profile, cloudmask=cloudmask)

    if not want_stats:
        return

    report = summary_stats(
        band,
        basename=os.path.basename(qatif),
        outdir=outdir,
        profile=raster_profile)
    click.echo(json.dumps(report, indent=2))

    if outdir:
        # The notice goes to stderr so the JSON on stdout stays clean.
        click.echo(
            "QA variables written as uint8 tifs to {}".format(outdir),
            err=True)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def rename_command(source, destination):
    """
    Executor for `globus rename`.

    ``source`` and ``destination`` are each unpacked as an
    ``(endpoint, path)`` pair. Both endpoints must be equal, otherwise a
    ``click.UsageError`` is raised before any client call is made.
    """
    (src_endpoint, old_path), (dst_endpoint, new_path) = source, destination

    if src_endpoint != dst_endpoint:
        template = ('rename requires that the source and dest '
                    'endpoints are the same, {} != {}')
        raise click.UsageError(template.format(src_endpoint, dst_endpoint))

    transfer_client = get_client()
    autoactivate(transfer_client, src_endpoint, if_expires_in=60)

    result = transfer_client.operation_rename(
        src_endpoint, oldpath=old_path, newpath=new_path)
    formatted_print(
        result, text_format=FORMAT_TEXT_RAW, response_key='message')
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def endpoint_search(filter_fulltext, filter_owner_id, filter_scope):
    """
    Executor for `globus endpoint search`.

    Rejects an 'all'-scope search that has no full-text filter, resolves the
    owner filter (when given) through ``maybe_lookup_identity_id`` and prints
    the search results.
    """
    searching_everything = filter_scope == 'all'
    if searching_everything and not filter_fulltext:
        raise click.UsageError(
            'When searching all endpoints (--filter-scope=all, the default), '
            'a full-text search filter is required. Other scopes (e.g. '
            '--filter-scope=recently-used) may be used without specifying '
            'an additional filter.')

    client = get_client()

    # Only a truthy owner filter is looked up; a falsy one is passed through as-is.
    resolved_owner = (maybe_lookup_identity_id(filter_owner_id)
                      if filter_owner_id else filter_owner_id)

    results = client.endpoint_search(
        filter_fulltext=filter_fulltext,
        filter_scope=filter_scope,
        filter_owner_id=resolved_owner)

    formatted_print(
        results,
        fields=ENDPOINT_LIST_FIELDS,
        json_converter=iterable_response_to_dict)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def role_create(role, principal, endpoint_id):
    """
    Executor for creating a role on an endpoint.

    :param role: role value placed in the role document
    :param principal: ``(principal_type, principal_value)`` pair
    :param endpoint_id: endpoint passed to ``client.add_endpoint_role``
    :raises click.UsageError: when an ``identity`` principal cannot be
        resolved by ``maybe_lookup_identity_id``
    """
    principal_type, principal_val = principal

    client = get_client()

    if principal_type == 'identity':
        principal_val = maybe_lookup_identity_id(principal_val)
        if not principal_val:
            raise click.UsageError(
                'Identity does not exist. '
                'Use --provision-identity to auto-provision an identity.')
    elif principal_type == 'provision-identity':
        # A provisioned identity is stored as a plain 'identity' principal.
        principal_val = maybe_lookup_identity_id(principal_val, provision=True)
        principal_type = 'identity'

    role_doc = assemble_generic_doc(
        'role', principal_type=principal_type, principal=principal_val,
        role=role)

    res = client.add_endpoint_role(endpoint_id, role_doc)
    formatted_print(res, simple_text='ID: {}'.format(res['id']))
项目:zalando-deploy-cli    作者:robin-wittler    | 项目源码 | 文件源码
def copy_template(template_path: Path, path: Path, variables: dict):
    """
    Recursively render the files below ``template_path`` into ``path``.

    Each file is read, passed through ``string.Template.safe_substitute`` with
    ``variables`` and written to the mirrored location. Existing target files
    are never overwritten; encountering one raises ``click.UsageError``.
    """
    for entry in template_path.iterdir():
        destination = path / entry.relative_to(template_path)

        if entry.is_dir():
            copy_template(entry, destination, variables)
            continue

        if destination.exists():
            # better not overwrite any existing files!
            raise click.UsageError(
                'Target file "{}" already exists. Aborting!'.format(destination))

        with Action('Writing {}..'.format(destination)):
            destination.parent.mkdir(parents=True, exist_ok=True)
            with entry.open() as source_file:
                raw_text = source_file.read()
            rendered = string.Template(raw_text).safe_substitute(variables)
            with destination.open('w') as target_file:
                target_file.write(rendered)
项目:crc-diagram    作者:IuryAlves    | 项目源码 | 文件源码
def main(source, out, raw, format, view):
    """
    Generate CRCDiagrams from SOURCE saving they as OUT.
    \n
    The default output format is png.
    \n
    Example:\n
    crc-diagram source_file.py output.png
    """
    # Directories and single files are parsed by different crc_diagram helpers.
    if os.path.isdir(source):
        parse = crc_diagram.folder_to_crc
    else:
        parse = crc_diagram.to_crc
    crc_cards = parse(source)

    if raw:
        # Raw mode dumps the cards as JSON; stdout is used when OUT is not given.
        stream = path_to_stream(out or sys.stdout, 'w')
        json.dump([card.to_dict() for card in crc_cards], stream, indent=4)
        return

    # Rendering needs an explicit output target.
    if out is None:
        raise click.UsageError('Missing argument "out".')
    DotRender(crc_cards, format=format).render(out, view=view)
项目:stereo    作者:suda    | 项目源码 | 文件源码
def generate(filename, data_file, output_dir, template_file, skip_first_row):
    # Loads a layout class from the Python file `filename`, instantiates it
    # with the remaining arguments and calls `generate_document` once per row
    # of the layout's CSV data file.
    #
    # Raises click.UsageError when the layout file is missing, cannot be
    # imported, or defines no layout class.
    if not os.path.exists(filename):
        raise click.UsageError("Layout not found: %s\n" % filename)
    try:
        module = _import_file(filename)
    except (ImportError, ValueError) as e:
        raise click.UsageError("Unable to load %r: %s\n" % (filename, e))
    layout_classes = list(_iter_layout_classes(module))
    if not layout_classes:
        raise click.UsageError("No layout found in file: %s\n" % filename)
    # When several layouts are found, the last one yielded is used.
    layout_cls = layout_classes.pop()
    click.secho('Generating documents...', fg='white')
    layout = layout_cls(data_file, output_dir, template_file, skip_first_row)

    # Check files/paths
    layout._check_paths()

    # The context manager closes the data file even if a row fails to render.
    with open(layout.data_file) as data_handle:
        data = csv.reader(data_handle)
        if layout.skip_first_row:
            # The default keeps an empty file from raising StopIteration.
            next(data, None)

        # TODO: Show info about rows count
        for row in data:
            layout.generate_document(row)
项目:incubator-ariatosca    作者:apache    | 项目源码 | 文件源码
def resolve_command(self, ctx, args):
    """
    Delegate to the parent ``resolve_command``; when it raises a usage error,
    re-raise it with *Did you mean ...* suggestions appended to the message.

    Suggestions are the close matches (via ``difflib``) of the first argument
    among ``self.list_commands(ctx)``.
    """
    try:
        return super(AliasedGroup, self).resolve_command(ctx, args)
    except click.exceptions.UsageError as error:
        message = str(error)
        attempted_name = click.utils.make_str(args[0])
        suggestions = difflib.get_close_matches(
            attempted_name,
            self.list_commands(ctx),
            n=self.max_suggestions,
            cutoff=self.cutoff)
        if suggestions:
            # One suggestion per line, indented by four spaces.
            line_prefix = '{0}    '.format(os.linesep)
            message += '{0}{0}Did you mean one of these?{0}    {1}'.format(
                os.linesep, line_prefix.join(suggestions))
        raise click.exceptions.UsageError(message, error.ctx)
项目:elsa    作者:pyvec    | 项目源码 | 文件源码
def freeze_app(app, freezer, path, base_url):
    # Configures `app` for freezing into `path` under `base_url`, then runs
    # `freezer.freeze()`. Frozen-Flask warnings are promoted to errors and
    # terminate the process with exit status 1.
    if not base_url:
        raise click.UsageError('No base URL provided, use --base-url')
    print('Generating HTML...')

    settings = app.config
    settings['FREEZER_DESTINATION'] = path
    settings['FREEZER_BASE_URL'] = base_url
    settings['SERVER_NAME'] = urllib.parse.urlparse(base_url).netloc

    # make sure Frozen Flask warnings are treated as errors
    warnings.filterwarnings('error', category=flask_frozen.FrozenFlaskWarning)

    try:
        freezer.freeze()
    except flask_frozen.FrozenFlaskWarning as warning:
        print('Error:', warning, file=sys.stderr)
        sys.exit(1)
项目:docker-service-registrator-route53    作者:mvanholsteijn    | 项目源码 | 文件源码
def get_dns_name_for_hosted_zone_id(self, hosted_zone_id):
    """
    Store ``hosted_zone_id`` on ``self`` and look up its DNS name via
    ``self.route53.get_hosted_zone``.

    ensure:
        self.hosted_zone_id == hosted_zone_id
        self.dns_name[-1] == '.'

    A ``ClientError`` from the lookup is re-raised as ``click.UsageError``.
    """
    assert hosted_zone_id is not None
    self.hosted_zone_id = hosted_zone_id
    try:
        hosted_zone = self.route53.get_hosted_zone(Id=hosted_zone_id)['HostedZone']
        self.dns_name = hosted_zone['Name']
    except ClientError as e:
        raise click.UsageError('%s' % e)

    found_message = 'found hosted zone %s for dns name %s' % (
        self.hosted_zone_id, self.dns_name)
    log.info(found_message)

    # Postconditions from the contract above.
    assert self.dns_name[-1] == '.'
    assert self.hosted_zone_id is not None
项目:docker-service-registrator-route53    作者:mvanholsteijn    | 项目源码 | 文件源码
def get_hosted_zone_id_for_dns_name(self, dns_name):
    """
    Look up the hosted zone id for ``dns_name`` and store both on ``self``.

    require:
        there is exactly 1 hosted zone for 'dns_name' in Route53.
    ensure:
        self.hosted_zone_id is not None
        self.dns_name[-1] == '.'
        self.dns_name is 'dns_name' with a trailing '.' appended if missing

    :raises click.UsageError: when zero or more than one hosted zone has
        exactly this DNS name
    """
    self.dns_name = dns_name if dns_name[-1] == '.' else '%s.' % dns_name
    response = self.route53.list_hosted_zones_by_name(DNSName=self.dns_name)
    # Materialise a list: on Python 3 filter() returns an iterator, which
    # supports neither len() nor indexing.
    zones = [zone for zone in response['HostedZones']
             if zone['Name'] == self.dns_name]
    if len(zones) == 1:
        # The zone 'Id' is split on '/' and only the last segment is kept.
        self.hosted_zone_id = zones[0]['Id'].split('/')[-1]
    elif len(zones) > 1:
        raise click.UsageError('There are %d hosted zones for the DNS name %s, please specify --hosted-zone-id' % (len(zones), self.dns_name))
    else:
        raise click.UsageError('There is no hosted zones for the DNS name %s' % self.dns_name)

    assert self.dns_name[-1] == '.'
    assert self.hosted_zone_id is not None

    log.info('found dns name %s for hosted zone id %s' %
             (self.dns_name, self.hosted_zone_id))
项目:moniqueio-tools    作者:monique-io    | 项目源码 | 文件源码
def import_(ctx, dirs):
    # Imports every `*.json` file of each directory in `dirs` as a report
    # instance. The report name is the last '/'-separated component of the
    # directory path. All paths are validated before anything is imported.
    for candidate in dirs:
        if not os.path.isdir(candidate):
            raise click.UsageError('<%s> must be a directory' % candidate)

    def load_instances(directory):
        # Lazily yield the parsed JSON content of each file in the directory.
        pattern = os.path.join(directory, '*.json')
        for json_path in glob.glob(pattern):
            with open(json_path) as handle:
                yield json.loads(handle.read())

    def import_all(report_name, report_instances):
        imported = 0
        try:
            for item in report_instances:
                import_report_instance(ctx, report_name, item)
                imported += 1
        finally:
            # The count is reported even when an import fails part-way.
            echo_info('Imported %d %s instances' % (imported, report_name))

    for directory in dirs:
        name = directory.rstrip('/').split('/')[-1]
        import_all(name, load_instances(directory))
项目:jsoncsv    作者:alingse    | 项目源码 | 文件源码
def jsoncsv(output, input, expand_, restore_, safe, separator):
    # Converts each JSON line from `input` with `expand` (default) or
    # `restore` and writes the result to `output`, one JSON document per
    # line; both streams are closed at the end.
    if expand_ and restore_:
        raise click.UsageError('can not choose both, default is `-e`')

    transform = restore if restore_ else expand

    for raw_line in input:
        converted = transform(
            json.loads(raw_line), separator=separator, safe=safe)
        encoded = json.dumps(converted, ensure_ascii=False).encode('utf-8')
        # NOTE(review): this writes encoded bytes and then a str newline to the
        # same stream, which looks like it assumes a Python 2 style file object
        # -- confirm against the caller.
        output.write(encoded)
        output.write('\n')

    input.close()
    output.close()
项目:go2mapillary    作者:enricofer    | 项目源码 | 文件源码
def parent(ctx, input, depth):
    """Takes a [x, y, z] tile as input and writes its parent to stdout
    in the same form.

    $ echo "[486, 332, 10]" | mercantile parent

    Output:

    [243, 166, 9]
    """
    for line in iter_lines(normalize_input(input)):
        # Only the first three members (x, y, z) of each JSON array are used.
        tile = json.loads(line)[:3]
        ancestor_zoom = tile[2] - depth
        if ancestor_zoom < 0:
            raise click.UsageError(
                "Invalid parent level: {0}".format(ancestor_zoom))
        # Walk up one level per requested depth step.
        for _ in range(depth):
            tile = mercantile.parent(tile)
        click.echo(json.dumps(tile))
项目:kael    作者:360skyeye    | 项目源码 | 文件源码
def run(s, p):
    # Starts one of two things depending on the arguments:
    #   * `s` truthy  -> a micro server named "s1" with a single "foobar" service
    #   * `p` truthy  -> a WORK_FRAME configured from the YAML file at path `p`
    # With neither, a click.UsageError is raised.
    # NOTE(review): this block uses the Python 2 `print` statement.
    if s:
        server = micro_server("s1", auri=AMQ_URI)

        @server.service("foobar")
        def h(a):
            # Prints the argument and the current process id, then returns
            # the argument wrapped under key "b".
            print a, os.getpid()
            return {"b": a}

        server.start_service(2, daemon=False)
    elif p:
        # `p` must be an existing regular file whose extension is exactly '.yaml'.
        if not os.path.isfile(p) or os.path.splitext(p)[1] != '.yaml':
            raise click.BadParameter(
                'the param must be yaml config')
        w = WORK_FRAME(auri=AMQ_URI, service_group_conf=p)
        w.frame_start()
    else:
        raise click.UsageError(
            'Could not find other command. You can run kael run --help to see more information')
项目:valohai-cli    作者:valohai    | 项目源码 | 文件源码
def stop(counters, all=False):
    """
    Stop one or more in-progress executions.
    """
    project = get_project(require=True)
    query = {'project': project.id}

    # Execution numbers and `--all` are mutually exclusive selectors.
    if counters and all:
        raise click.UsageError('Pass either an execution # or `--all`, not both.')

    if counters:
        query['counter'] = sorted(IntegerRange.parse(counters).as_set())
    elif all:
        query['status'] = 'incomplete'
    else:
        warn('Nothing to stop (pass #s or `--all`)')
        return 1

    listing = request('get', '/api/v0/executions/', params=query).json()
    for execution in listing['results']:
        click.echo(
            'Stopping #{counter}... '.format(counter=execution['counter']),
            nl=False)
        stop_response = request('post', execution['urls']['stop'])
        click.echo(stop_response.text)
    success('Done.')
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def validate_git_specifier(refspec, branch, commit, tag):
    """
    Check that the combination of git specifiers is consistent.

    Accepted combinations:
     - only a branch
     - only a commit
     - only a tag
     - a refspec together with a non-master target branch

    :param refspec: provided refspec like 'pull/1/head'
    :param branch: provided branch like 'master'
    :param commit: provided commit SHA like '2cbd73cbd5aacc965ecfa480fa90164a85191489'
    :param tag: provided tag like 'v1.3.0-rc2'
    :raises UsageError: for any other combination
    """
    # A commit or a tag must stand alone.
    if commit and any((refspec, branch, tag)):
        raise UsageError('If a commit is specified, neither a refspec, branch, or tag can also be specified.')

    if tag and any((commit, refspec, branch)):
        raise UsageError('If a tag is specified, neither a refspec, branch, or commit can also be specified.')

    if refspec:
        # A refspec needs a branch to be created for it, and not master.
        if not branch:
            raise UsageError('If a refspec is specified, the name of the branch to create for it is required.')
        if branch == 'master':
            raise UsageError('The branch specified for a refspec cannot be the master branch.')
项目:renga-python    作者:SwissDataScienceCenter    | 项目源码 | 文件源码
def stop_executions(config, context_id, endpoint, all_contexts):
    """Stop running executions."""
    client = from_config(config, endpoint=endpoint)

    # Exactly one selector must be given: explicit ids XOR --all-contexts.
    exactly_one_selector = bool(context_id) ^ all_contexts
    if not exactly_one_selector:
        raise click.UsageError(
            'Either specify context id or use --all-contexts')

    if context_id:
        selected = (client.contexts[cid] for cid in context_id)
    else:
        selected = client.contexts

    for context in selected:
        for execution in context.executions:
            try:
                banner = 'Stopping execution {0.id} on context {1.id} ... '.format(
                    execution, context)
                click.echo(banner, nl=False)
                execution.stop()
                click.secho('OK', fg='green')
            except errors.APIError:
                # A failed stop is reported and the loop moves on.
                click.secho('FAIL', fg='red')
项目:planb-cassandra    作者:zalando-stups    | 项目源码 | 文件源码
def extend(from_region: str,
           to_region: str,
           cluster_name: str,
           ring_size: int,
           dc_suffix: str,
           num_tokens: int,
           instance_type: str,
           volume_type: str,
           volume_size: int,
           volume_iops: int,
           no_termination_protection: bool,
           use_dmz: bool,
           hosted_zone: str,
           artifact_name: str,
           docker_image: str,
           environment: list,
           sns_topic: str,
           sns_email: str):

    # Extending into a different region is only allowed together with --use-dmz.
    if from_region != to_region and not(use_dmz):
        raise click.UsageError('Extending to a new region requires --use-dmz')

    # All parameters are forwarded as one dict via locals(). Do not introduce
    # local variables above this call: they would end up in `options` too.
    extend_cluster(options=locals())
项目:planb-cassandra    作者:zalando-stups    | 项目源码 | 文件源码
def validate_artifact_version(options: dict) -> dict:
    """
    Resolve the docker image and its version from ``options``.

    With an explicit ``docker_image`` the version is the text after the last
    ':' and ``artifact_name`` must be unset. Otherwise the image is built
    from ``artifact_name`` (defaulting to 'planb-cassandra-3.0', which is
    written back into ``options``) and the latest published version.

    Returns a copy of ``options`` with ``docker_image`` and ``image_version``
    set. Raises ``click.UsageError`` when both options are given.
    """
    explicit_image = options['docker_image']

    if explicit_image:
        if options['artifact_name']:
            raise click.UsageError(
                'Conflicting options: --artifact-name and\n'
                '--docker-image cannot be specified at the same time')
        docker_image = explicit_image
        image_version = explicit_image.split(':')[-1]
    else:
        if not options['artifact_name']:
            options['artifact_name'] = 'planb-cassandra-3.0'
        artifact = options['artifact_name']
        image_version = get_latest_docker_image_version(artifact)
        docker_image = 'registry.opensource.zalan.do/stups/{}:{}'.format(
            artifact, image_version)
        info('Using docker image: {}'.format(docker_image))

    resolved = dict(options)
    resolved.update(docker_image=docker_image, image_version=image_version)
    return resolved
项目:core-workflow    作者:python    | 项目源码 | 文件源码
def backport(self):
        """
        Cherry-pick ``self.commit_sha1`` onto every branch in
        ``self.sorted_branches``.

        Raises ``click.UsageError`` when ``self.branches`` is empty. A
        ``CherryPickException`` is re-raised after the exit message has been
        printed; a ``subprocess.CalledProcessError`` is only reported.
        """
        if not self.branches:
            raise click.UsageError("At least one branch must be specified.")
        self.fetch_upstream()

        for maint_branch in self.sorted_branches:
            click.echo(f"Now backporting '{self.commit_sha1}' into '{maint_branch}'")

            cherry_pick_branch = self.get_cherry_pick_branch(maint_branch)
            self.checkout_branch(maint_branch)
            commit_message = ""
            try:
                self.cherry_pick()
                commit_message = self.amend_commit_message(cherry_pick_branch)
            except subprocess.CalledProcessError as cpe:
                # Reported but not re-raised: the loop continues with the next branch.
                click.echo(cpe.output)
                click.echo(self.get_exit_message(maint_branch))
            except CherryPickException:
                click.echo(self.get_exit_message(maint_branch))
                raise
            else:
                # Reached only when the cherry-pick and the amend both succeeded.
                if self.push:
                    self.push_to_remote(maint_branch,
                                        cherry_pick_branch,
                                        commit_message)
                    self.cleanup_branch(cherry_pick_branch)
                else:
                    # Nothing is pushed; print instructions for continuing or aborting.
                    click.echo(\
f"""
Finished cherry-pick {self.commit_sha1} into {cherry_pick_branch} \U0001F600
--no-push option used.
... Stopping here.
To continue and push the changes:
    $ cherry_picker --continue

To abort the cherry-pick and cleanup:
    $ cherry_picker --abort
""")
项目:acsoo    作者:acsone    | 项目源码 | 文件源码
def do_pylintcmd(load_plugins, rcfile, module, expected, pylint_options):
    # Runs pylint on `module`, or on every installable addon found in the
    # candidate addons directories when `module` is empty. The results are
    # compared with the expected counts, and click.ClickException is raised
    # when `_get_failures` reports failures.
    if not module:
        search_dirs = (
            opj('odoo', 'addons'),
            'odoo_addons',
            '.',
        )
        module = [
            opj(addons_dir, addon)
            for addons_dir in search_dirs
            if os.path.isdir(addons_dir)
            for addon in get_installable_addons(addons_dir)
        ]
        if not module:
            raise click.UsageError("Please provide module or package "
                                   "to lint (--module).")

    cmd = ['--load-plugins', load_plugins, '--rcfile', rcfile]
    cmd.extend(pylint_options)
    cmd.extend(module)

    log_cmd(['pylint'] + cmd, level=logging.INFO)
    # pylint receives its own copy of the argument list.
    lint_res = pylint.lint.Run(cmd[:], exit=False)
    sys.stdout.flush()
    sys.stderr.flush()

    expected = _consolidate_expected(rcfile, expected)
    fails, no_fails = _get_failures(lint_res.linter.stats, expected)

    if fails or no_fails:
        report = cmd_string(['pylint'] + cmd) + '\n' + _failures_to_str(fails, no_fails)
        click.echo('\n')
        click.echo(report)

    if fails:
        raise click.ClickException("pylint errors detected.")
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def remove(ctx, service_names, is_all):
    """
Remove a service from 21 sell

\b
Removing a service
$ 21 sell remove <service_name>

\b
Removing all services from 21 sell
$ 21 sell remove --all
"""
    # Either explicit service names or --all must be supplied.
    if not service_names and is_all is False:
        raise click.UsageError('No service selected.', ctx=ctx)

    manager = ctx.obj['manager']
    logger.info(click.style("Removing services.", fg=cli_helpers.TITLE_COLOR))

    # Reporting callbacks passed to the manager for each possible outcome.
    def on_removed(tag):
        cli_helpers.print_str(tag, ["Removed"], "TRUE", True)

    def on_missing(tag):
        cli_helpers.print_str(tag, ["Doesn't exist"], "FALSE", False)

    def on_failure(tag):
        cli_helpers.print_str(tag, ["Failed to remove"], "FALSE", False)

    targets = manager.available_user_services() if is_all else service_names

    for service_name in targets:
        manager.remove_service(service_name, on_removed, on_missing, on_failure)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def task_event_list(task_id, limit, filter_errors, filter_non_errors):
    """
    Executor for `globus task-event-list`.

    Prints the events of ``task_id``, optionally restricted to error or
    non-error events; asking for both at once is a usage error.
    """
    client = get_client()

    # cannot filter by both errors and non errors
    if filter_errors and filter_non_errors:
        raise click.UsageError("Cannot filter by both errors and non errors")

    filter_string = ""
    if filter_errors:
        filter_string = "is_error:1"
    elif filter_non_errors:
        filter_string = "is_error:0"

    events = client.task_event_list(
        task_id, num_results=limit, filter=filter_string)

    columns = (('Time', 'time'), ('Code', 'code'),
               ('Is Error', 'is_error'), ('Details', 'details'))
    formatted_print(events,
                    fields=columns,
                    json_converter=iterable_response_to_dict)
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def create_command(principal, permissions, endpoint_plus_path):
    """
    Executor for `globus endpoint permission create`.

    Builds an access rule for ``principal`` on the given endpoint path and
    adds it through ``client.add_endpoint_acl_rule``. A missing principal or
    an unresolvable identity raises ``click.UsageError``.
    """
    if not principal:
        raise click.UsageError(
            'A security principal is required for this command')

    endpoint_id, path = endpoint_plus_path
    principal_type, principal_val = principal

    client = get_client()

    if principal_type == 'provision-identity':
        # A provisioned identity is stored as a plain 'identity' principal.
        principal_val = maybe_lookup_identity_id(principal_val, provision=True)
        principal_type = 'identity'
    elif principal_type == 'identity':
        principal_val = maybe_lookup_identity_id(principal_val)
        if not principal_val:
            raise click.UsageError(
                'Identity does not exist. '
                'Use --provision-identity to auto-provision an identity.')

    rule_data = assemble_generic_doc(
        'access',
        permissions=permissions,
        principal=principal_val,
        principal_type=principal_type,
        path=path)

    result = client.add_endpoint_acl_rule(endpoint_id, rule_data)
    formatted_print(
        result,
        text_format=FORMAT_TEXT_RECORD,
        fields=[('Message', 'message'), ('Rule ID', 'access_id')])
项目:luda    作者:ryanolson    | 项目源码 | 文件源码
def exclusive(ctx_params, exclusive_params, error_message):
    """
    Enforce that at most one of a set of options is set.
    https://gist.github.com/thebopshoobop/51c4b6dce31017e797699030e3975dbf

    :param ctx_params: mapping of parameter name to value
    :param exclusive_params: names of the mutually exclusive parameters
    :param error_message: message of the raised ``click.UsageError``
    :return: None
    """
    truthy_count = sum(bool(ctx_params[name]) for name in exclusive_params)
    if truthy_count > 1:
        raise click.UsageError(error_message)
项目:homely    作者:phodge    | 项目源码 | 文件源码
def _globals(command):
    """
    Wrap ``command`` so that it also accepts the global options
    ``--verbose``, ``--alwaysprompt`` and ``--neverprompt``.

    The wrapper consumes those three options, applies them through
    ``setverbose``/``setwantprompt`` and forwards every other keyword
    argument to ``command``. It takes over ``command``'s name and docstring.
    """
    def proxy(verbose, alwaysprompt, neverprompt, **kwargs):
        # handle these global options
        setverbose(verbose)
        if alwaysprompt and neverprompt:
            raise UsageError("--alwaysprompt and --neverprompt options"
                             " cannot be used together")
        if alwaysprompt:
            setwantprompt(PROMPT_ALWAYS)
        elif neverprompt:
            setwantprompt(PROMPT_NEVER)
        # pass all other arguments on to the command
        command(**kwargs)

    proxy.__name__ = command.__name__
    proxy.__doc__ = command.__doc__

    # Applied in this exact order: -a, then -n, then -v.
    option_specs = (
        (('-a', '--alwaysprompt'),
         "Always prompt the user to answer questions, even"
         " named questions that they have answered on previous runs"),
        (('-n', '--neverprompt'),
         "Never prompt the user to answer questions. Questions"
         " will be answered automatically using the user's previous"
         " answer or the `noprompt` value."),
        (('-v', '--verbose', 'verbose'),
         "Produce extra output"),
    )
    for declarations, help_text in option_specs:
        proxy = option(*declarations, is_flag=True, help=help_text)(proxy)
    return proxy
项目:stereo    作者:suda    | 项目源码 | 文件源码
def _check_paths(self):
    """
    Validate the layout's input paths and make sure the output directory
    exists.

    Raises ``click.UsageError`` when the data file is missing, when a
    template file is configured but missing, or when the existing output
    directory is not writable. A missing output directory is created.
    """
    if not os.path.exists(self.data_file):
        raise click.UsageError("Data file not found: %s\n" % self.data_file)

    template_configured = self.template_file is not None
    if template_configured and not os.path.exists(self.template_file):
        raise click.UsageError("Template file not found: %s\n" % self.template_file)

    if os.path.exists(self.output_dir):
        if not os.access(self.output_dir, os.W_OK):
            raise click.UsageError("Output is not writable: %s\n" % self.output_dir)
    else:
        os.makedirs(self.output_dir)
项目:incubator-ariatosca    作者:apache    | 项目源码 | 文件源码
def handle_parse_result(self, ctx, opts, args):
    """
    Reject this option when it is given together with any option named in
    ``self.mutually_exclusive``; otherwise defer to the parent implementation.
    """
    if self.name in opts:
        if self.mutually_exclusive.intersection(opts):
            raise click.UsageError('Illegal usage: {0}'.format(self._message))
    return super(MutuallyExclusiveOption, self).handle_parse_result(ctx, opts, args)
项目:photon    作者:metacloud    | 项目源码 | 文件源码
def main(az, workflow, resume, config):
    # Builds the Config and runs the Provisioner with it. A ConfigError is
    # reported as a click usage error carrying the same message.
    try:
        settings = Config(az=az, workflow=workflow, config_file=config)
    except ConfigError as e:
        raise click.UsageError(str(e))
    else:
        Provisioner(settings).run(resume)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def validate_parent_id(ctx, param, value):
    """
    Require a value when the ``resource_id`` parameter starts with 'sg-'.

    The ``(ctx, param, value)`` signature matches click's parameter-callback
    convention; ``value`` is returned unchanged when it is acceptable.
    """
    is_security_group = ctx.params['resource_id'].startswith('sg-')
    if is_security_group and not value:
        raise click.UsageError(
            "Security Group lock status requires --parent-id flag")
    return value
项目:moniqueio-tools    作者:monique-io    | 项目源码 | 文件源码
def check_resp(r):
    """
    Return silently when the JSON body of ``r`` has a truthy 'success' field;
    otherwise raise ``click.UsageError`` describing the API failure.
    """
    if r.json()['success']:
        return

    if r.status_code == 401:
        message = 'API call error: wrong API key'
    else:
        message = 'API call error:\n%s' % pformat(r.json()['details'])
    raise click.UsageError(message)
项目:moniqueio-tools    作者:monique-io    | 项目源码 | 文件源码
def req_api_key(self):
    """Return ``self.api_key``, raising ``click.UsageError`` when it is None."""
    key = self.api_key
    if key is None:
        raise click.UsageError('"moniqueio --api-key" argument is not specified')
    return key
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def _check_reserve_usage(empty, memory, cpu, disk):
    """Reject --empty combined with any of --memory, --cpu or --disk."""
    if not empty:
        return

    # Checked in this order so the first conflicting option is the one reported.
    for flag, given in (('--memory', memory), ('--cpu', cpu), ('--disk', disk)):
        if given:
            raise click.UsageError(
                'Cannot combine --empty and {}'.format(flag))
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def _check_tenant_exists(restapi, allocation):
    """
    Verify that the tenant for ``allocation`` exists.

    Issues a GET for ``/tenant/<allocation>``; a ``restclient.NotFoundError``
    is turned into a ``click.UsageError`` telling the user how to configure
    the allocation.
    """
    url = '/tenant/{}'.format(allocation)

    try:
        # The response body is decoded but otherwise unused.
        restclient.get(restapi, url).json()
    except restclient.NotFoundError:
        hint = ('Allocation not found, '
                'run allocation configure {} --systems ...')
        raise click.UsageError(hint.format(allocation))
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler."""

    @click.command()
    @click.option('--run/--no-run', is_flag=True, default=False)
    @click.option('--treadmill-id', help='Treadmill admin user.')
    @click.pass_context
    def spawn(ctx, treadmill_id, run):
        """Installs Treadmill spawn."""
        params = ctx.obj['PARAMS']
        params['zookeeper'] = context.GLOBAL.zk.url
        params['ldap'] = context.GLOBAL.ldap.url

        dst_dir = params['dir']
        profile = params.get('profile')

        bootstrap.wipe(
            os.path.join(dst_dir, 'wipe_me'),
            os.path.join(dst_dir, 'bin', 'wipe_spawn.sh')
        )

        # The run script is only passed on when --run was requested.
        run_script = os.path.join(dst_dir, 'bin', 'run.sh') if run else None

        # An explicit --treadmill-id overrides whatever the context provides.
        if treadmill_id:
            params['treadmillid'] = treadmill_id

        if not params.get('treadmillid'):
            raise click.UsageError(
                '--treadmill-id is required, '
                'unable to derive treadmill-id from context.')

        bootstrap.install(
            'spawn',
            dst_dir,
            params,
            run=run_script,
            profile=profile,
        )

    return spawn
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def _blackout_server(zkclient, server, reason):
    """
    Blackout ``server``: record ``reason`` at the server's blacked-out
    ZooKeeper path, then kill the node's presence.

    Raises ``click.UsageError`` when no reason is given.
    """
    if not reason:
        raise click.UsageError('--reason is required.')

    blackout_path = z.path.blackedout_server(server)
    host_acl = zkutils.make_host_acl(server, 'rwcda')
    zkutils.ensure_exists(
        zkclient, blackout_path, acl=[host_acl], data=str(reason))
    presence.kill_node(zkclient, server)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def handle_parse_result(self, ctx, opts, args):
        """
        Enforce mutual exclusivity and, for the options-file option, load the
        command's options from a YAML manifest.

        Raises ``click.UsageError`` when this option is combined with one of
        ``self.mutually_exclusive`` and ``click.BadParameter`` when the
        manifest has no entry for the current command name.
        """
        if self.mutually_exclusive.intersection(opts) and \
           self.name in opts:
            raise click.UsageError(
                "Illegal usage: `{}` is mutually exclusive with "
                "arguments `{}`.".format(
                    self.name,
                    ', '.join(self.mutually_exclusive)
                )
            )
        if self.name == _OPTIONS_FILE and self.name in opts:
            _file = opts.pop(_OPTIONS_FILE)
            # Seed every declared parameter with its default, its envvar value
            # or '' before the manifest values are applied on top.
            for _param in ctx.command.params:
                opts[_param.name] = _param.default or \
                    _param.value_from_envvar(ctx) or ''
            # NOTE(review): yaml.load without an explicit Loader can construct
            # arbitrary objects; if the manifest may be untrusted, consider
            # yaml.safe_load.
            with open(_file, 'r') as stream:
                data = yaml.load(stream)

            _command_name = ctx.command.name
            # The manifest is expected to be keyed by command name.
            if data.get(_command_name, None):
                opts.update(data[_command_name])
            else:
                raise click.BadParameter(
                    'Manifest file should have %s scope' % _command_name
                )
            # The 'vpc_name' entry is moved to the 'vpc_id' key.
            opts['vpc_id'] = opts.pop('vpc_name')
            ctx.params = opts

        return super().handle_parse_result(ctx, opts, args)
项目:restcli    作者:dustinrohde    | 项目源码 | 文件源码
def error(self, message):
    """Report ``message`` by raising it as a ``click.UsageError``."""
    usage_error = click.UsageError(message)
    raise usage_error
项目:pipenv    作者:pypa    | 项目源码 | 文件源码
def get_auto_shell():
    """Return the shell that is calling this process"""
    try:
        import psutil
        caller = psutil.Process(os.getpid()).parent()
        if platform.system() == 'Windows':
            # On Windows go one level further up when a grandparent exists.
            caller = caller.parent() or caller
        # Strip the executable suffix so e.g. 'name.exe' becomes 'name'.
        return caller.name().replace('.exe', '')
    except ImportError:
        raise click.UsageError("Please explicitly give the shell type or install the psutil package to activate the"
                               " automatic shell detection.")
项目:molminer    作者:gorgitko    | 项目源码 | 文件源码
def ner(**kwargs):
    # Run ChemSpot on text piped through stdin or on an input file and print
    # the result. A '#' comment is used instead of a docstring so that the
    # command's --help text is not altered.
    #
    # Raises click.UsageError when stdin provides no text and no input file
    # is given.

    # The "no_*" switches are inverted in place before being passed on.
    kwargs["no_standardize"] = not kwargs["no_standardize"]
    kwargs["no_convert_ions"] = not kwargs["no_convert_ions"]
    kwargs["no_header"] = not kwargs["no_header"]
    kwargs["no_normalize_text"] = not kwargs["no_normalize_text"]
    kwargs["no_annotation"] = not kwargs["no_annotation"]

    is_output_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    if not stdin.isatty():
        # Piped stdin wins: the input file setting is cleared.
        kwargs["input_file"] = ""
        input_text = click.get_text_stream("stdin").read().strip()
    # This check must run for an interactive stdin as well (as `convert`
    # does); nested inside the branch above it never fired when nothing was
    # piped and no input file was given.
    if not input_text and not kwargs.get("input_file"):
        raise click.UsageError("Cannot perform NER: stdin is empty and input file is not provided.")

    kwargs["opsin_types"] = get_opsin_types(kwargs["opsin_types"])

    init_kwargs = get_kwargs(kwargs, KWARGS_CHS_INIT)
    process_kwargs = get_kwargs(kwargs, KWARGS_CHS_PROCESS)

    chemspot = ChemSpot(**init_kwargs)
    result = chemspot.process(input_text=input_text, **process_kwargs)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    if not is_output_file:
        # "no_header" was inverted above, so True here means "write a header".
        print(dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"]))
项目:molminer    作者:gorgitko    | 项目源码 | 文件源码
def convert(**kwargs):
    # Run OPSIN on text piped through stdin or on an input file, requesting
    # SMILES, InChI and InChIKey output, and print the result. A '#' comment
    # is used instead of a docstring so the command's --help text stays as is.

    # Invert each "no_*" switch in place.
    for negated_flag in (
        "no_header",
        "no_normalize_plurals",
        "no_standardize",
        "opsin_no_allow_acids_without_acid",
        "opsin_no_detailed_failure_analysis",
        "opsin_no_allow_radicals",
        "opsin_no_allow_uninterpretable_stereo",
    ):
        kwargs[negated_flag] = not kwargs[negated_flag]

    writes_to_file = bool(kwargs["output"])

    stdin = click.get_text_stream("stdin")
    input_text = ""
    piped = not stdin.isatty()
    if piped:
        # Piped stdin wins: the input file setting is cleared.
        kwargs["input_file"] = ""
        input_text = click.get_text_stream("stdin").read().strip()
    if not (input_text or kwargs["input_file"]):
        raise click.UsageError("Cannot do conversion: stdin is empty and input file is not provided.")

    opsin_init_args = get_kwargs(kwargs, KWARGS_OPSIN_INIT)
    opsin_process_args = get_kwargs(kwargs, KWARGS_OPSIN_PROCESS)

    converter = OPSIN(**opsin_init_args)
    result = converter.process(input=input_text, output_formats=["smiles", "inchi", "inchikey"], **opsin_process_args)

    if kwargs["dry_run"]:
        print(result)
        exit(0)

    if kwargs["raw_output"]:
        print(result["stdout"])
        eprint(result["stderr"])
        exit(0)

    if not writes_to_file:
        # "no_header" was inverted above, so True here means "write a header".
        csv_text = dict_to_csv(result["content"], csv_delimiter=kwargs["delimiter"], write_header=kwargs["no_header"])
        print(csv_text)
项目:keedi    作者:t-mart    | 项目源码 | 文件源码
def main(word, word_dist, word_dist_rate, keyboard_name):
    """
    keedi, keyboard usage stats for words

    Invoke keedi on a word with the --word <word> option or pass words in on
    stdin.
    """
    # Words come from --word if given, otherwise from piped stdin.
    if word:
        words = [word]
    elif not sys.stdin.isatty():
        words = filter(None, (transform_word(w) for w in sys.stdin))
    else:
        # The exception must be raised, not merely constructed; otherwise
        # `words` is unbound and the loop below fails with a NameError.
        raise click.ClickException("no --word specified or standard input given.")

    for w in words:
        word_dist_computation = None
        keyboard = KEYBOARDS[keyboard_name]

        wd = None
        if word_dist:
            wd = word_distance(w, keyboard)

        wdr = None
        if word_dist_rate:
            try:
                wdr = word_distance_rate(w, keyboard,
                                         precomputation=word_dist_computation)
            except IncomputableRateException as e:
                # Show the problem; abort when stdin is a terminal, otherwise
                # skip this word and carry on with the rest of the stream.
                click.UsageError(e).show()
                if sys.stdin.isatty():
                    sys.exit(1)
                else:
                    continue
        # One tab-separated line per word: distance, rate, word.
        click.echo('\t'.join(item for item in (str(wd), str(wdr), w) if item))
    sys.exit(0)
项目:connect    作者:openhsr    | 项目源码 | 文件源码
def main():
    """Register the subcommands on ``cli`` and run it.

    ``cli`` is invoked with ``standalone_mode=False``. A ``click.UsageError``
    is shown via its own ``show()``; any other ``click.ClickException`` or
    ``exceptions.Error`` is logged at error level, and again at debug level
    with the traceback. Both cases exit with status 1.
    """
    try:
        for subcommand in (sync_command, update_password, daemon, edit,
                           browserhelp):
            cli.add_command(subcommand)
        cli(standalone_mode=False)
    except click.UsageError as usage_err:
        usage_err.show()
        exit(1)
    except (exceptions.Error, click.ClickException) as failure:
        logger.error(failure)
        logger.debug(failure, exc_info=True)
        exit(1)
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def init(ctx):
    """Initializes the database."""
    database = ctx.obj.db
    url = database.url

    # Guard: refuse to reset when the URL points at an sqlite database that
    # is already present.
    sqlite_db_exists = is_sqlitedb_url(url) and sqlitedb_present(url)
    if sqlite_db_exists:
        raise click.UsageError("Refusing to overwrite database "
                               "at {}".format(url))
    database.reset()


# -------------------------------------------------------------------------
# User commands
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def user(ctx):
    """Subcommand to manage users."""
    db_url = ctx.obj.db.url

    # The sqlite driver for sqlalchemy creates an empty file on commit as a
    # side effect. To avoid that, bail out up front when the database file
    # is missing and therefore cannot have been initialized.
    sqlite_db_missing = is_sqlitedb_url(db_url) and not sqlitedb_present(db_url)
    if sqlite_db_missing:
        raise click.UsageError("Could not find database at {}".format(db_url))
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def app(ctx):
    """Subcommand to manage applications."""
    db_url = ctx.obj.db.url

    # Bail out when the URL is an sqlite URL but no database is present.
    sqlite_db_missing = is_sqlitedb_url(db_url) and not sqlitedb_present(db_url)
    if sqlite_db_missing:
        raise click.UsageError("Could not find database at {}".format(db_url))
项目:cuvner    作者:meejah    | 项目源码 | 文件源码
def cuv(ctx, coverage_fname, exclude, branch):
    """
    Cuv'ner provides ways to visualize your project's coverage data.

    Everything works on the console and assumes a unicode and
    256-color capable terminal. There must be a .coverage file which
    is loaded for coverage data; it is assumed to be in the top level
    of your source code checkout.
    """
    # Imported locally so the block does not rely on a module-level import.
    import shutil

    if coverage_fname is None:
        coverage_fname = find_coverage_data('.')
        # coverage_fname still could be None

    # The Config object is shared with subcommands through ctx.obj.
    cfg = Config()
    ctx.obj = cfg

    # click.get_terminal_size() was removed in click 8.1; the standard
    # library call returns (columns, lines) as well. Width is capped at 80.
    cfg.nice_width = min(80, shutil.get_terminal_size()[0])
    cfg.exclude = exclude

    cfg.branch = branch
    if coverage_fname is None:
        raise click.UsageError(
            "No coverage data. Do you have a .coverage file?"
        )
    cfg.data = coverage.Coverage(data_file=coverage_fname)
    cfg.data.load()
项目:jira_pub_sync    作者:d-lobanov    | 项目源码 | 文件源码
def url_validation(cls, url):
        """Return *url* unchanged when it matches the expected URL shape.

        Accepted shape: ``http://`` or ``https://``, a host made of word
        characters, dashes and dots ending in a dot plus 2-6 lowercase
        letters, then optional ``/``-separated path segments that contain
        only word characters and dots.

        Raises ``click.UsageError`` when *url* does not match.
        """
        # Raw string: the pattern is identical, but the regex escapes are no
        # longer invalid Python string escapes (SyntaxWarning on 3.12+).
        r = re.match(r"^https?:\/\/[\w\-\.]+\.[a-z]{2,6}\.?(\/[\w\.]*)*\/?$", url)
        if r is None:
            raise click.UsageError('Please, type valid URL')

        return url
项目:sodium11    作者:trbs    | 项目源码 | 文件源码
def test_persistfile_exists(self, filename, add_postfix=True):
        """Raise ``click.UsageError`` if the persist file for *filename* exists.

        *filename* and *add_postfix* are passed to
        ``self._get_persist_filename`` to obtain the path that is checked.
        """
        # From here on `filename` is the resolved persist path.
        filename = self._get_persist_filename(filename, add_postfix=add_postfix)
        if os.path.isfile(filename):
            raise click.UsageError("File '%s' already exists" % filename)