Python click 模块,BadOptionUsage() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用click.BadOptionUsage()

项目:csvtotable    作者:vividvilla    | 项目源码 | 文件源码
def cli(*args, **kwargs):
    """
    CSVtoTable commandline utility.
    """
    # The conversion always runs first; what happens to the result depends
    # on the "serve" / "output_file" options below.
    rendered = convert.convert(kwargs["input_file"], **kwargs)

    # Serving takes precedence over writing to a file.
    if kwargs["serve"]:
        convert.serve(rendered)
        return

    destination = kwargs["output_file"]
    if not destination:
        # Neither serving nor an output file was requested.
        raise click.BadOptionUsage("Missing argument \"output_file\".")

    # Only ask for confirmation when overwriting was not explicitly allowed.
    if not (kwargs["overwrite"] or prompt_overwrite(destination)):
        raise click.Abort()

    convert.save(destination, rendered)
    click.secho("File converted successfully: {}".format(destination),
                fg="green")
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def onlycopy(ctx, srcdir, srcfile, checkdir, relpath, failonerror, include_regex, include_src_regex, include_dst_regex):
    """
    onlycopy command prints list of all the files that aren't in the srcdir
    """

    # At least one source (directory or file) is required. The message used
    # to be a copy of the regex-compilation error, which was misleading.
    # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name, message);
    # confirm which click version this project pins.
    if not (srcdir or srcfile):
        raise click.BadOptionUsage("One of srcdir or srcfile must be supplied.")

    # Validate the regex options. A non-None include_regex overrides both
    # the source and the destination regex.
    check_regex(include_regex)
    if include_regex is not None:
        include_src_regex = include_regex
        include_dst_regex = include_regex
    check_regex(include_src_regex)
    check_regex(include_dst_regex)

    # The base path is only needed (and only computed) for relative output.
    basepath = os.getcwd() if relpath else None
    for path in onlycopy_yield(srcdir, checkdir, failonerror,
                               src_regex=include_src_regex,
                               dst_regex=include_dst_regex,
                               srcfile=srcfile):
        click.echo(os.path.relpath(path, basepath) if relpath else path)
项目:zellij    作者:nedbat    | 项目源码 | 文件源码
def debug_type(s):
    """Parse a delimited list of debug names and store it in ``DEBUGS``.

    ``s`` is split on runs of spaces, commas and semicolons; empty pieces
    are discarded. Each remaining name must be in ``VALID_DEBUGS``;
    otherwise ``click.BadOptionUsage`` is raised and ``DEBUGS`` is left
    unchanged. Returns the list of parsed names.
    """
    # There is probably a more click-native way to do this.
    global DEBUGS
    requested = []
    for piece in re.split(r"[ ,;]+", s):
        name = piece.strip()
        if name:
            requested.append(name)

    for name in requested:
        if name in VALID_DEBUGS:
            continue
        raise click.BadOptionUsage(f"--debug={name}?? Choose from {', '.join(VALID_DEBUGS)}")

    # Only publish the list once every entry has been validated.
    DEBUGS = requested
    return requested
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def required_exactly_one_of(*arg_names):
    """Return a decorator requiring exactly one of ``arg_names`` to be set.

    A name counts as set when it appears in the call's keyword arguments
    with a value other than ``None``. If the number of set names is not
    exactly one, ``click.BadOptionUsage`` is raised instead of calling the
    wrapped function.
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            supplied = [name for name in arg_names
                        if kwargs.get(name) is not None]
            if len(supplied) != 1:
                raise click.BadOptionUsage('Please specify exactly one of %s '
                                           % ', '.join(arg_names))
            return fn(*args, **kwargs)

        return wrapper

    return decorator
项目:elevation    作者:bopen    | 项目源码 | 文件源码
def clip(bounds, reference, **kwargs):
    # Clip using explicit bounds, or bounds imported from a reference when
    # no bounds were given. At least one of the two is required.
    if not (bounds or reference):
        raise click.BadOptionUsage("One of --bounds or --reference must be supplied.")
    # Explicit bounds win; the reference is only consulted as a fallback.
    effective_bounds = bounds if bounds else spatial.import_bounds(reference)
    elevation.clip(effective_bounds, **kwargs)
项目:bellows    作者:rcloran    | 项目源码 | 文件源码
def channel_mask(channels):
    """Build a bitmask with bit ``c`` set for every channel ``c`` given.

    Every channel must lie in the inclusive range 11..26; the first one
    that does not raises ``click.BadOptionUsage``. An empty iterable
    yields ``0``.
    """
    bits = 0
    for ch in channels:
        if 11 <= ch <= 26:
            bits = bits | (1 << ch)
            continue
        raise click.BadOptionUsage("channels must be from 11 to 26")
    return bits
项目:jsoncsv    作者:alingse    | 项目源码 | 文件源码
def separator_type(sep):
    """Validate a separator option value and return it unchanged.

    Raises ``click.BadOptionUsage`` when ``sep`` is longer than one
    character or equals the module-level ``unit_char``.
    """
    import click

    # The length check runs first, matching the original precedence.
    if len(sep) > 1:
        problem = 'separator can only be a char'
    elif sep == unit_char:
        problem = 'separator can not be `\\` '
    else:
        return sep
    raise click.BadOptionUsage(problem)
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def check_regex(regexstring):
    """Check that ``regexstring`` compiles as a regular expression.

    ``None`` is accepted and skipped. The pattern is compiled with
    ``re.IGNORECASE`` only to validate it; the compiled object is
    discarded. Returns ``None``.

    Raises:
        click.BadOptionUsage: if ``re.compile`` rejects the pattern.
    """
    if regexstring is None:
        return
    try:
        re.compile(regexstring, flags=re.IGNORECASE)
    # Catch only pattern errors. A bare ``except:`` would also swallow
    # KeyboardInterrupt and SystemExit. OverflowError is what the parser
    # raises for an oversized repetition count.
    except (re.error, OverflowError):
        # NOTE(review): click >= 7.0 expects BadOptionUsage(option_name,
        # message); confirm which click version this project pins.
        raise click.BadOptionUsage("The regex didn't compile. For correct usage"
                                   " see:  https://docs.python.org/2/library/re.html")
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def revoke(ctx, image, user, revoke_all, allow_home, allow_view, volume):
    """Revokes access to application identified by IMAGE to a specific
    user USER and specified parameters."""
    # A volume string, when given, turns on allow_common for the policy
    # lookup and supplies the source/target/mode triple.
    source = target = mode = None
    allow_common = volume is not None

    if allow_common:
        try:
            source, target, mode = parse_volume_string(volume)
        except ValueError as e:
            raise click.BadOptionUsage("volume", str(e))

    session = ctx.obj.session
    with orm.transaction(session):
        # .one() raises unless exactly one row matches.
        app_query = session.query(orm.Application)
        orm_app = app_query.filter(orm.Application.image == image).one()

        user_query = session.query(orm.User)
        orm_user = user_query.filter(orm.User.name == user).one()

        # With revoke_all every accounting row for (app, user) is deleted;
        # otherwise the delete is narrowed to the one matching policy.
        policy_criteria = []
        if not revoke_all:
            orm_policy = session.query(orm.ApplicationPolicy).filter(
                orm.ApplicationPolicy.allow_home == allow_home,
                orm.ApplicationPolicy.allow_common == allow_common,
                orm.ApplicationPolicy.allow_view == allow_view,
                orm.ApplicationPolicy.volume_source == source,
                orm.ApplicationPolicy.volume_target == target,
                orm.ApplicationPolicy.volume_mode == mode).one()
            policy_criteria.append(
                orm.Accounting.application_policy == orm_policy)

        session.query(orm.Accounting).filter(
            orm.Accounting.application == orm_app,
            orm.Accounting.user == orm_user,
            *policy_criteria
        ).delete()
项目:anchore    作者:anchore    | 项目源码 | 文件源码
def audit(anchore_config, ctx, image, imagefile, include_allanchore):
    """
    Image IDs can be specified as hash ids, repo names (e.g. centos), or tags (e.g. centos:latest).
    """

    global config, imagelist, nav
    config = anchore_config
    # Not read anywhere in the visible part of this function.
    ecode = 0
    success = True

    if image and imagefile:
        raise click.BadOptionUsage('Can only use one of --image, --imagefile')

    # When neither --image nor --imagefile is given, build_image_list is
    # asked for all local images instead.
    use_all_local = not (image or imagefile)
    try:
        imagedict = build_image_list(anchore_config, image, imagefile, use_all_local, include_allanchore)
        imagelist = imagedict.keys()
        # Any failure here, including ValueError from discovery, ends up in
        # the handler below.
        imagelist = anchore_utils.discover_imageIds(imagelist)
    except Exception:
        anchore_print_err("could not load input images")
        sys.exit(1)
项目:apimas    作者:grnet    | 项目源码 | 文件源码
def get_command(self, ctx, name):
    """Resolve the sub-command ``name`` from the configured API spec.

    The configuration is read from the path stored under the ``config``
    key of ``ctx.params``. A configuration failure
    (``ex.ApimasException``) is re-raised as ``click.BadOptionUsage``.
    With ``name`` set to ``None`` the CLI's base command is returned;
    otherwise the endpoint group registered under ``name`` (``None`` if
    there is none).
    """
    config_path = ctx.params.get('config')
    try:
        conf = config.configure(path=config_path)
    except ex.ApimasException as e:
        raise click.BadOptionUsage(str(e))

    cli = _construct_cli(conf['root'], conf['spec'])
    if name is not None:
        return cli.endpoint_groups.get(name)
    return cli.get_base_command()
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def main(paths, pipelines, pipelines_skip, pipeline_tags, infra_provider,
         live_log, all_tests, cluster_debug, junit_xml, test):
    # Validates the option combination, discovers and filters the registered
    # pipelines, runs them on a thread pool, then reports the results.
    # Exits with status 1 when any test failed.

    # Equal truthiness means both or neither of --all-tests / --test were
    # given; exactly one is required.
    if bool(all_tests) == bool(test):
        raise click.BadOptionUsage(
            'Should specify either --test NAME or --all-tests')

    # Include and exclude lists are mutually exclusive.
    if bool(pipelines) and bool(pipelines_skip):
        raise click.BadOptionUsage(
            'Can not use both --pipelines and --pipelines-skip')

    if cluster_debug and (infra_provider == 'aws'):
        # FIXME in AC-5210
        raise click.BadOptionUsage(
            'AWS does not have an automatic cleaner thus debug is forbidden.'
        )

    # closing() calls .close() on the handler when the block exits.
    with closing(multilogger.init_handler(logger, live_log)) as multilog:
        # Fall back to the default integration tests path when no paths
        # were passed.
        discovered = discover_integration_tests(
            paths or [INTEGRATION_TESTS_PATH])

        print_msg(u'Discovered tests in:\n{}\n'.format('\n'.join(discovered)))

        # Narrow the registered pipelines step by step: provider, tags,
        # include/exclude lists, then test name.
        filtered = _filter_by_infra_provider(registered_pipelines,
                                             infra_provider)
        if not pipelines:
            # Take effect only if pipelines to run are not set explicitly.
            filtered = _filter_by_pipeline_tags(filtered, pipeline_tags)
        filtered = _filter_by_include_exclude(filtered, pipelines,
                                              pipelines_skip)
        filtered = _filter_by_test_name(filtered, test)

        # More than one pipeline is only allowed when BUILD_CLUSTER=1.
        if os.environ.get('BUILD_CLUSTER') != '1' and len(filtered) > 1:
            sys.exit('Can not run multiple pipelines without BUILD_CLUSTER')

        # Raises KeyError if the provider has no entry in
        # infra_provider_slots.
        slots = infra_provider_slots[infra_provider]
        print_msg(u'Requested pipelines: {}'.format(len(filtered)))
        print_msg(u'Infra provider "{}" slots: {}'.format(infra_provider,
                                                          slots))
        # Pipelines per slot, rounded up.
        q_len = int(math.ceil(len(filtered) / float(slots)))
        if q_len > 1:
            print_msg(u'Pipelines will be queued into slots.')
            print_msg(u'Estimated longest queue: {} pipelines'.format(q_len))

        # Leaving the executor context waits for all submitted pipelines.
        # NOTE(review): the futures returned by submit() are discarded, so
        # an exception raised inside run_tests_in_a_pipeline would not
        # surface here -- confirm failures are recorded in test_results.
        with ThreadPoolExecutor(max_workers=slots) as executor:
            for pipe, tests in filtered.items():
                # Each key of `filtered` is unpacked into two name parts.
                full_name = u'{}_{}'.format(*pipe)
                executor.submit(run_tests_in_a_pipeline,
                                full_name, tests, cluster_debug)

        _print_logs(multilog, live_log)

        if junit_xml:
            write_junit_xml(junit_xml, test_results)

        if test_results.has_any_failures():
            sys.exit(1)
项目:adbons    作者:dbaelz    | 项目源码 | 文件源码
def __determine_device_id(ctx_params):
    """Pick the id of the attached device that commands should target.

    Resolution order: the explicit ``device`` option, the explicit
    ``index`` option, the default device stored in the config, and
    finally the only attached device if there is exactly one.

    Raises:
        click.ClickException: when no device is attached, the requested
            device or index is unavailable, or no single best match exists.
        click.BadOptionUsage: when both ``device`` and ``index`` are given.
    """
    attached = Adb.get_devices_as_list()
    if not attached:
        raise click.ClickException("No devices attached")

    requested_id = ctx_params["device"]
    requested_index = ctx_params["index"]

    # The two explicit selectors are mutually exclusive.
    if requested_id is not None and requested_index is not None:
        raise click.BadOptionUsage(
            "Only one of the options '--device' or '--index' is allowed")

    # Explicit device id: accepted only if that device is attached.
    if requested_id is not None:
        if not __is_device_id_attached(attached, requested_id):
            raise click.ClickException(
                "The device '%s' isn't attached" % requested_id)
        return requested_id

    # Explicit index: the id is the first element of the indexed entry.
    if requested_index is not None:
        try:
            return attached[requested_index][0]
        except IndexError:
            raise click.ClickException(
                "No device with the index '%s' available" % requested_index)

    # No explicit option: fall back to the configured default device.
    configured_id = Config.read_value(Config.SECTION_DEVICE,
                                      Config.KEY_DEFAULT)
    if configured_id and __is_device_id_attached(attached, configured_id):
        return configured_id

    # Last resort: a single attached device is unambiguous.
    if len(attached) != 1:
        raise click.ClickException("Can't determine the best matching device")
    return attached[0][0]
项目:bellows    作者:rcloran    | 项目源码 | 文件源码
def config(ctx, config, all_):
    """Get/set configuration on the NCP"""
    # Modes, chosen from the arguments:
    #   * all_            -> print every configuration value as NAME=VALUE
    #   * config "K=V"    -> set K (numeric id or enum name) to V
    #   * config "K"      -> print the value for K
    click.secho(
        "NOTE: Configuration changes do not persist across resets",
        fg='red'
    )
    if config and all_:
        raise click.BadOptionUsage("Specify a config or --all, not both")

    if not (config or all_):
        raise click.BadOptionUsage("One of config or --all must be specified")

    s = yield from util.setup(ctx.obj['device'], ctx.obj['baudrate'], util.print_cb)

    # Close the connection on every exit path. Previously the plain "get"
    # path and the argument-validation errors below left it open.
    try:
        if all_:
            for config_id in t.EzspConfigId:
                v = yield from s.getConfigurationValue(config_id)
                # Skip ids the device reports as invalid.
                if v[0] == t.EzspStatus.ERROR_INVALID_ID:
                    continue
                click.echo("%s=%s" % (config_id.name, v[1]))
            return

        if '=' in config:
            config, value = config.split("=", 1)
            # The key may be given as a numeric id or as an enum member name.
            if config.isdigit():
                try:
                    config = t.EzspConfigId(int(config))
                except ValueError:
                    raise click.BadArgumentUsage("Invalid config ID: %s" % (
                        config,
                    ))
            else:
                try:
                    config = t.EzspConfigId[config]
                except KeyError:
                    raise click.BadArgumentUsage("Invalid config name: %s" % (
                        config,
                    ))
            try:
                value = t.uint16_t(value)
                if not (0 <= value <= 65535):
                    raise ValueError("%s out of allowed range 0..65535" % (
                        value,
                    ))
            except ValueError as e:
                raise click.BadArgumentUsage("Invalid value: %s" % (e, ))

            v = yield from s.setConfigurationValue(config, value)
            click.echo(v)
            return

        # NOTE(review): here `config` is still the raw string from the
        # command line, not an EzspConfigId -- confirm that
        # getConfigurationValue accepts it.
        v = yield from s.getConfigurationValue(config)
        click.echo(v)
    finally:
        s.close()
项目:anchore    作者:anchore    | 项目源码 | 文件源码
def query(anchore_config, image, imagefile, include_allanchore, module):
    """
    Image IDs can be specified as hash ids, repo names (e.g. centos), or tags (e.g. centos:latest).

    Execute the specified query (module) with any parameters it requires. Modules are scripts in a specific location.

    Each query has its own parameters and outputs.

    Examples using pre-defined queries:

    'anchore query --image nginx:latest list-packages all'
    'anchore query has-package wget'
    'anchore query --image nginx:latest list-files-detail all'
    'anchore query cve-scan all'

    """

    global config, imagelist, nav
    config = anchore_config
    # Exit status: 0 on success, 1 when the query fails, 2 on warnings.
    ecode = 0

    # The image list is only resolved when a query module was given.
    if module:
        if image and imagefile:
            raise click.BadOptionUsage('Can only use one of --image, --imagefile')

        # When neither --image nor --imagefile is given, build_image_list
        # is asked for all local images instead.
        use_all_local = not (image or imagefile)
        try:
            imagedict = build_image_list(anchore_config, image, imagefile, use_all_local, include_allanchore)
            imagelist = imagedict.keys()
            # Any failure here, including ValueError from discovery, ends
            # up in the handler below.
            imagelist = anchore_utils.discover_imageIds(imagelist)
        except Exception:
            anchore_print_err("could not load input images")
            sys.exit(1)

    try:
        nav = init_nav_contexts()

        result = nav.run_query(list(module))
        if result:
            anchore_utils.print_result(config, result)

        has_warnings = nav.check_for_warnings(result)
        if has_warnings:
            ecode = 2

    # NOTE(review): this bare except also traps KeyboardInterrupt and
    # SystemExit; left as-is to keep behaviour unchanged.
    except:
        anchore_print_err("query operation failed")
        ecode = 1

    contexts['anchore_allimages'].clear()
    sys.exit(ecode)
项目:anchore    作者:anchore    | 项目源码 | 文件源码
def build_image_list(config, image, imagefile, all_local, include_allanchore, dockerfile=None, exclude_file=None):
    """Given option inputs from the cli, construct the set of image ids to process.

    Args:
        config: not used by this function.
        image: a single image id; stored together with ``dockerfile``.
        imagefile: path handed to ``anchore_utils.read_kvfile_tolist``; each
            entry supplies an image id and, optionally, a dockerfile.
        all_local: when truthy, add the non-dangling images reported by the
            docker client, except those already collected or excluded.
        include_allanchore: when truthy, add every image known to the
            anchore db.
        dockerfile: dockerfile recorded for ``image``.
        exclude_file: path to a file with one image name per line; images
            docker reports under those names are removed from the result.

    Returns:
        dict mapping image id to ``{'dockerfile': <path or None>}``.

    Raises:
        click.BadOptionUsage: if no image source is given, or both
            ``image`` and ``imagefile`` are given.
        Exception: if the docker client is needed but unavailable.
    """

    if not image and not (imagefile or all_local):
        raise click.BadOptionUsage('No input found for image source. One of <image>, <imagefile>, or <all> must be specified')

    if image and imagefile:
        raise click.BadOptionUsage('Only one of <image> and <imagefile> can be specified')

    filter_images = []
    if exclude_file:
        with open(exclude_file) as f:
            filter_images = [line.strip() for line in f]

    imagelist = {}
    if image:
        imagelist[image] = {'dockerfile': dockerfile}

    if imagefile:
        for entry in anchore_utils.read_kvfile_tolist(imagefile):
            imageId = entry[0]
            # The dockerfile column is optional. Only a missing column is
            # tolerated; other errors are no longer swallowed.
            try:
                dfile = entry[1]
            except IndexError:
                dfile = None
            imagelist[imageId] = {'dockerfile': dfile}

    if all_local:
        docker_cli = contexts['docker_cli']
        if not docker_cli:
            raise Exception("Could not load any images from local docker host - is docker running?")
        for local_id in docker_cli.images(all=True, quiet=True, filters={'dangling': False}):
            if local_id not in imagelist and local_id not in filter_images:
                imagelist[local_id] = {'dockerfile': None}

    if include_allanchore:
        ret = contexts['anchore_db'].load_all_images().keys()
        if ret:
            # NOTE(review): this also resets the dockerfile of images that
            # were already collected above -- confirm that is intended.
            for image_id in set(imagelist) | set(ret):
                imagelist[image_id] = {'dockerfile': None}

    # Remove excluded items. The docker client is only needed, and only
    # checked, when there is something to exclude.
    if filter_images:
        docker_cli = contexts['docker_cli']
        if not docker_cli:
            raise Exception("Could not query docker - is docker running?")
        for excluded in filter_images:
            for img in docker_cli.images(name=excluded, quiet=True):
                imagelist.pop(img, None)

    return imagelist