Python click 模块,Path() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用click.Path()

项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def parse_requirement_line(line, filename, line_number, session, finder):
    """Turn a single requirements-file line into an InstallRequirement.

    :param line:         One line from a requirements.txt file.
    :param filename:     Path to a requirements.txt file.
    :param line_number:  The integer line number of the current line.
    :param session:      Instance of pip.download.PipSession.
    :param finder:       Instance of pip.download.PackageFinder.
    :returns:            The first requirement produced for the line, or
                         ``None`` when the line is empty or yields nothing.
    """
    if not line:
        return None

    parsed = req_file.process_line(
        line, filename, line_number, session=session, finder=finder)
    # The parser output is fully consumed before the first item is picked.
    for requirement in list(parsed):
        return requirement
    return None
项目:lopocs    作者:Oslandia    | 项目源码 | 文件源码
def create_potree_page(work_dir, server_url, tablename, column):
    '''Write an html demo page that displays ``tablename.column`` with potree.

    The potree archive is downloaded and unpacked into ``work_dir`` only when
    ``work_dir/potree`` does not exist yet. The page is written to
    ``work_dir/potree-<table>.html`` where ``<table>`` is the table name
    without its schema prefix.
    '''
    potree_dir = os.path.join(work_dir, 'potree')
    if not os.path.exists(potree_dir):
        archive = os.path.join(work_dir, 'potree.zip')
        download('Getting potree code', 'http://3d.oslandia.com/potree.zip', archive)
        # unpack next to the archive
        with ZipFile(archive) as bundle:
            bundle.extractall(path=work_dir)

    table_only = tablename.split('.')[-1]
    page_path = os.path.join(work_dir, 'potree-{}.html'.format(table_only))
    page_url = 'file://{}'.format(str(Path(page_path).absolute()))
    pending('Creating a potree demo page : ' + page_url)

    resource = '{}.{}'.format(tablename, column)
    # the template receives the server address without the http scheme
    host = server_url.replace('http://', '')
    with io.open(page_path, 'wb') as html:
        rendered = potree_page.format(resource=resource, server_url=host)
        html.write(rendered.encode())
    ok()
项目:lopocs    作者:Oslandia    | 项目源码 | 文件源码
def create_cesium_page(work_dir, tablename, column):
    '''Write an html demo page that displays ``tablename.column`` with cesium.

    The cesium archive is downloaded and unpacked into ``work_dir`` only when
    ``work_dir/cesium`` does not exist yet. The page is written to
    ``work_dir/cesium-<table>.html`` where ``<table>`` is the table name
    without its schema prefix.
    '''
    cesium_dir = os.path.join(work_dir, 'cesium')
    if not os.path.exists(cesium_dir):
        archive = os.path.join(work_dir, 'cesium.zip')
        download('Getting cesium code', 'http://3d.oslandia.com/cesium.zip', archive)
        # unpack next to the archive
        with ZipFile(archive) as bundle:
            bundle.extractall(path=work_dir)

    table_only = tablename.split('.')[-1]
    page_path = os.path.join(work_dir, 'cesium-{}.html'.format(table_only))
    page_url = 'file://{}'.format(str(Path(page_path).absolute()))
    pending('Creating a cesium demo page : ' + page_url)

    resource = '{}.{}'.format(tablename, column)
    with io.open(page_path, 'wb') as html:
        rendered = cesium_page.format(resource=resource)
        html.write(rendered.encode())
    ok()
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def up(ctx, targets, run_id):
    """
    Provisions nodes from the given target(s) in the given PinFile.

    pinfile:    Path to pinfile (Default: workspace path)

    targets:    Provision ONLY the listed target(s). If omitted, ALL targets in
    the appropriate PinFile will be provisioned.
    """

    pinfile_path = _get_pinfile_path()

    try:
        outcome = lpcli.lp_up(pinfile_path, targets, run_id=run_id)
        return_code, results = outcome
        _handle_results(ctx, results, return_code)
    except LinchpinError as error:
        # log the failure, then terminate with a non-zero exit status
        ctx.log_state(error)
        sys.exit(1)
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def destroy(ctx, targets, run_id):
    """
    Destroys nodes from the given target(s) in the given PinFile.

    pinfile:    Path to pinfile (Default: workspace path)

    targets:    Destroy ONLY the listed target(s). If omitted, ALL targets in
    the appropriate PinFile will be destroyed.

    """

    pinfile_path = _get_pinfile_path()

    try:
        outcome = lpcli.lp_destroy(pinfile_path, targets, run_id=run_id)
        return_code, results = outcome
        _handle_results(ctx, results, return_code)
    except LinchpinError as error:
        # log the failure, then terminate with a non-zero exit status
        ctx.log_state(error)
        sys.exit(1)
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def get_click_options(self):
        """Yield the single click option that accepts metadata file paths.

        The option is named after ``self.cli_name``, may be given multiple
        times, and is described as optional when ``self.default`` is ``None``
        and as required otherwise.
        """
        import click
        import q2cli
        import q2cli.core

        # every supplied value has to be an existing, readable regular file
        file_type = click.Path(exists=True, file_okay=True, dir_okay=False,
                               readable=True)
        help_text = ('Metadata file or artifact viewable as metadata. This '
                     'option may be supplied multiple times to merge metadata.')
        requirement = '[optional]' if self.default is None else '[required]'

        option = q2cli.Option(
            ['--' + self.cli_name],
            type=q2cli.core.MultipleType(file_type),
            help=help_text,
            multiple=True)
        yield self._add_description(option, requirement)
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def main(ctx,               # type: click.Context
         directory,         # type: click.Path
         quiet,             # type: bool
         extension,         # type: str
         source_directory,  # type: str
         build_directory,   # type: str
         ):
    # type: (...) -> None
    """reqwire: micromanages your requirements."""
    # Normalise the requirements directory to a pathlib object up front.
    base_dir = pathlib.Path(str(directory))
    console.verbose = not quiet
    # Shared state stored on the click context object.
    ctx.obj = dict(
        build_dir=build_directory,
        directory=base_dir,
        extension=extension,
        source_dir=source_directory,
    )
项目:lightflow    作者:AustralianSynchrotron    | 项目源码 | 文件源码
def config_examples(dest, user_dir):
    """ Copy the example workflows to a directory.

    \b
    DEST: Path to which the examples should be copied.
    """
    source_dir = Path(lightflow.__file__).parents[1] / 'examples'
    if not source_dir.exists():
        click.echo('The examples source path does not exist')
        return

    # Unless the user asked for the directory as-is, copy into an
    # 'examples' sub-directory of the destination.
    target_dir = Path(dest).resolve()
    if not user_dir:
        target_dir = target_dir / 'examples'

    if not target_dir.exists():
        target_dir.mkdir()
    elif not click.confirm('Directory already exists. Overwrite existing files?',
                           default=True, abort=True):
        return

    for example in source_dir.glob('*.py'):
        shutil.copy(str(example), str(target_dir / example.name))
    click.echo('Copied examples to {}'.format(str(target_dir)))
项目:pytgasu    作者:alemonmk    | 项目源码 | 文件源码
def prepare(sets):
    """Prepare sticker sets to be uploaded by this tool.

    Reads any given directories, process any file Telegram won't accept.
    Generates and overwrites existing .ssd file.
    """
    # TODO: do first-run configurations to setting paths to executables of pngquant and/or waifu2x

    from pytgasu.prepare import SetDefGenerator, PrepareImageFiles

    # Image preparation runs before the set definition is generated,
    # one directory at a time.
    for raw_dir in sets:
        resolved_dir = Path(raw_dir).resolve()
        PrepareImageFiles(set_dir=resolved_dir)
        SetDefGenerator(set_dir=resolved_dir)

    print(NOTICE_GO_EDIT_DEFS)
项目:pytgasu    作者:alemonmk    | 项目源码 | 文件源码
def logout():
    """Logout from Telegram."""
    if not Path(PATH_TGSESSION_FILE).exists():
        # Return early because TelegramClient creates a session first,
        # but only when you can log_out() the session file gets deleted.
        # If it's not there, don't create it and waste time talking to Telegram.
        print(ERROR_NOT_LOGGEDIN)
        return

    # Imported here so the early-return path above does not import them.
    from telethon import TelegramClient
    from pytgasu.upload import CustomisedSession

    tc = TelegramClient(
        session=CustomisedSession.try_load_or_create_new(),
        api_id=TG_API_ID,
        api_hash=TG_API_HASH)
    tc.connect()
    # I guess even if user is not authorised, invoking LogOutRequest does not cause problems
    tc.log_out()
    # NOTE(review): this calls unlink() on the *parent* of the session file.
    # Assuming Path is pathlib.Path and the parent is a directory, unlink()
    # raises instead of removing it (rmdir() is the directory counterpart).
    # Confirm whether the session file or its directory should be cleaned up.
    Path(PATH_TGSESSION_FILE).parent.unlink()
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def copy_plugin_contents(path, plugin):
    """Copy the resources of a plugin's top-level package into ``path/plugin``.

    The ``plugin`` directory is created under ``path``; every resource entry
    of the package except ``__pycache__`` is copied into it. Directories are
    copied as trees with symlinks kept as symlinks, and files are copied
    without following symlinks.
    """
    package = plugin.module_name.split(".", 1)[0]
    target_dir = path.joinpath("plugin")
    target_dir.mkdir()

    for entry in plugin.dist.resource_listdir(package):
        if entry == "__pycache__":
            continue

        resource = plugin.dist.get_resource_filename(
            pkg_resources._manager, os.path.join(package, entry))
        source = pathlib.Path(resource).absolute()
        destination = target_dir.joinpath(source.name)

        if source.is_dir():
            shutil.copytree(source.as_posix(), destination.as_posix(),
                            symlinks=True)
            continue
        shutil.copy2(source.as_posix(), destination.as_posix(),
                     follow_symlinks=False)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return top level command handler."""

    @click.command()
    @click.option('--api', required=False, help='API url to use.',
                  envvar='TREADMILL_RESTAPI')
    @click.option('-m', '--manifest', help='App manifest file (stream)',
                  type=click.Path(exists=True, readable=True))
    @click.option('--delete', help='Delete the app.',
                  is_flag=True, default=False)
    @click.argument('appname', required=False)
    @cli.handle_exceptions(restclient.CLI_REST_EXCEPTIONS)
    def configure(api, manifest, delete, appname):
        """Configure a Treadmill app"""
        restapi = context.GLOBAL.admin_api(api)
        # Without an app name there is nothing to configure or delete:
        # list instead.
        if not appname:
            return _list(restapi)
        if delete:
            return _delete(restapi, appname)
        return _configure(restapi, manifest, appname)

    return configure
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return the top level command handler (the ``interpolate`` command)."""

    @click.command()
    @click.argument('inputfile', type=click.Path(exists=True))
    @click.argument('params', nargs=-1,
                    type=click.Path(exists=True, readable=True))
    def interpolate(inputfile, params):
        """Interpolate input file template."""
        # Templates are resolved relative to the directory of the input file.
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(os.path.dirname(inputfile)),
            keep_trailing_newline=True
        )

        # Parameter files are merged in the order given, so keys from later
        # files replace keys from earlier ones.
        data = {}
        for param in params:
            with io.open(param, 'rb') as fd:
                # NOTE(review): yaml.load without an explicit safe loader can
                # construct arbitrary objects from untrusted input - consider
                # yaml.safe_load. An empty file loads as None, which
                # dict.update() rejects.
                data.update(yaml.load(stream=fd))

        cli.out(env.get_template(os.path.basename(inputfile)).render(data))

    return interpolate
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return the top level command handler (the ``collect`` command)."""

    @click.command()
    @click.option('--treadmill-root', type=click.Path(exists=True),
                  envvar='TREADMILL_APPROOT', required=True,
                  help='Treadmill root path.')
    @click.option('--upload-user',
                  envvar='TREADMILL_ID', required=True,
                  help='Upload postmortem statistics with this user.')
    @click.option('--upload-url',
                  help='Upload postmortem statistics to this file url.')
    def collect(treadmill_root, upload_user, upload_url):
        """Collect Treadmill node data"""
        # Export the upload user through the environment before the run;
        # presumably postmortem.run reads it from there - confirm.
        os.environ['TREADMILL_ID'] = upload_user
        postmortem.run(treadmill_root, upload_url)

    return collect
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return the top level command handler (the ``finish`` command)."""

    @click.command()
    @click.option('--approot', type=click.Path(exists=True),
                  envvar='TREADMILL_APPROOT', required=True)
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('container_dir', type=click.Path(exists=True))
    def finish(approot, runtime, container_dir):
        """Finish treadmill application on the node."""
        # Run with finish context as finish runs in cleanup.
        # The log context is labelled with the container directory's basename.
        with lc.LogContext(_LOGGER, os.path.basename(container_dir),
                           lc.ContainerAdapter) as log:
            log.info('finish (approot %s)', approot)
            tm_env = appenv.AppEnvironment(approot)

            # Look up the runtime selected by name and call its finish().
            app_runtime.get_runtime(runtime, tm_env, container_dir).finish()

    return finish
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def init():
    """Return the top level command handler (the ``configure`` command)."""

    @click.command()
    @click.option('--approot', type=click.Path(exists=True),
                  envvar='TREADMILL_APPROOT', required=True)
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('eventfile', type=click.Path(exists=True))
    def configure(approot, runtime, eventfile):
        """Configure local manifest and schedule app to run."""
        tm_env = appenv.AppEnvironment(root=approot)

        # app_cfg.configure returns the container directory, which is only
        # logged here.
        container_dir = app_cfg.configure(tm_env, eventfile, runtime)
        _LOGGER.info('Configured %r', container_dir)

    return configure
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def clone(force):
    """Clone a copy of the TigerHost project to
    a private location and set the project path
    to the cloned location.
    """
    path = default_project_path()
    if os.path.exists(path):
        if not force:
            # abort=True makes a negative answer stop the command here
            question = 'Path {} already exists. Continuing will remove this path.'.format(path)
            click.confirm(question, default=True, abort=True)
        # directories are removed as a tree, anything else as a single file
        remover = shutil.rmtree if os.path.isdir(path) else os.remove
        remover(path)
    click.echo('Cloning to {}...'.format(path), nl=False)
    clone_project()
    click.secho('Done', fg='black', bg='green')
    save_project_path(path)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
    """Load a mod-pack from a file, hand it out, then write it back.

    Keyword arguments:
        path: Path to the existing ModPack file, which should be provided.

    Yields:
        ModPack loaded from path. If no exception occurs, the provided modpack
        is written (with changes) back to the file on context exit.
    """

    with path.open(mode='r', encoding='utf-8') as source:
        modpack = ModPack.load(source)

    # The file is closed while the caller works with the pack; the write-back
    # below only runs when execution resumes normally after the yield.
    yield modpack

    with path.open(mode='w', encoding='utf-8') as sink:
        modpack.dump(sink)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def new(ctx, pack, path, gamever):
    """Create and initialize a new mod-pack."""

    # Check file system state
    pack_path, mods_path = Path(pack), Path(path)
    pack_dir = pack_path.parent

    if not pack_dir.exists():
        raise UserReport(
            _('Mod-pack directory does not exists: {}').format(pack_dir)
        )
    # Mods path existence is checked by click

    # Setup game for the mod-pack
    game = ctx['default_game']
    if gamever is not None:
        game.version = gamever

    # The mods directory is stored relative to the mod-pack file's directory.
    with pack_path.open(mode='w', encoding='utf-8') as stream:
        ModPack(game, mods_path.relative_to(pack_dir)).dump(stream)
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def install(ctx, pack, release, mod):
    """Install new MOD into a mod-pack."""

    with modpack_file(Path(pack)) as modpack:
        # Resolve the requested mod through the game's database session.
        wanted = Mod.find(modpack.game.database.session(), mod)

        # Requests go through a session authorized with the stored token.
        proxy_session = requests.Session()
        with ctx['token_path'].open(encoding='utf-8') as token:
            proxy_session.auth = Authorization.load(token)

        minimum = Release[release.capitalize()]
        modpack.apply(modpack.install_changes(
            mod=wanted,
            min_release=minimum,
            session=proxy_session,
        ))
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def upgrade(ctx, pack, release, mod):
    """Upgrade MOD and its dependencies."""

    with modpack_file(Path(pack)) as modpack:
        # Resolve the requested mod through the game's database session.
        wanted = Mod.find(modpack.game.database.session(), mod)

        # Requests go through a session authorized with the stored token.
        proxy_session = requests.Session()
        with ctx['token_path'].open(encoding='utf-8') as token:
            proxy_session.auth = Authorization.load(token)

        minimum = Release[release.capitalize()]
        changes = modpack.upgrade_changes(
            mod=wanted,
            min_release=minimum,
            session=proxy_session,
        )
        # No changes means there is nothing newer to upgrade to.
        if not changes:
            raise AlreadyUpToDate(wanted.name)

        modpack.apply(changes)
项目:yui    作者:item4    | 项目源码 | 文件源码
def load_config(func):
    """Decorator that adds a ``--config`` option and loads the given file.

    The wrapped function receives the loaded configuration as its ``config``
    keyword argument instead of the file name. When no file name is provided
    an error is printed and the program exits with status 1.
    """

    @functools.wraps(func)
    def internal(*args, **kwargs):
        config_file = kwargs.pop('config')
        if config_file is None:
            click.echo('--config option is required.', err=True)
            raise SystemExit(1)

        # swap the file name for the loaded configuration
        kwargs['config'] = load(pathlib.Path(config_file))
        return func(*args, **kwargs)

    add_option = click.option(
        '--config',
        '-c',
        type=click.Path(exists=True),
        envvar='YUI_CONFIG_FILE_PATH',
    )
    return add_option(internal)
项目:git-big    作者:vertexai    | 项目源码 | 文件源码
def cmd_status(self):
        """Print the active branch and one status row per entry.

        Each row shows three flags (working tree, cache, depot), the first
        eight characters of the digest, the human readable size and the
        relative path of the entry.
        """
        click.echo('On branch %s' % self.repo.active_branch)
        click.echo()
        # column legend, one header line per flag column
        for legend in ('  Working',
                       '    Cache',
                       '      Depot',
                       '          SHA-256    Size Path'):
            click.echo(legend)
        for entry in self._entries():
            if self.depot:
                self.depot.get_status(entry)
            # working-tree flag: linked beats merely present
            if entry.is_linked:
                working = 'W'
            elif entry.in_working:
                working = '*'
            else:
                working = ' '
            cached = 'C' if entry.in_cache else ' '
            stored = 'D' if entry.in_depot else ' '
            row = '[ {} {} {} ] {} {:>6} {}'.format(
                working, cached, stored, entry.digest[:8],
                human_size(entry.size), entry.rel_path)
            click.echo(row)
        click.echo()
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def sync_source_option(func):
    """
    Attach the ``--src``/``-s`` option (parameter ``sync_source``) to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # the value must be an existing directory and is resolved to a full path
    source_type = Path(
        exists=True,
        file_okay=False,
        resolve_path=True,
    )
    decorate = option(
        '--src',
        '-s',
        'sync_source',
        type=source_type,
        help='Local directory from which to sync. Optional.',
    )
    return decorate(func)
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def sync_destination_option(func):
    """
    Attach the ``--dest``/``-d`` option (parameter ``sync_destination``) to a
    command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # plain files are rejected; existence is not required
    destination_type = Path(
        file_okay=False,
    )
    decorate = option(
        '--dest',
        '-d',
        'sync_destination',
        type=destination_type,
        help='Remote directory to sync to. Optional.',
    )
    return decorate(func)
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def make_directory_override_option(func):
    """
    Attach the ``--dest``/``-d`` option (parameter ``make_destination``) to a
    command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # plain files are rejected; existence is not required
    destination_type = Path(
        file_okay=False,
    )
    decorate = option(
        '--dest',
        '-d',
        'make_destination',
        type=destination_type,
        help='Remote directory to run make in. Optional.',
    )
    return decorate(func)
项目:open-wob-api    作者:openstate    | 项目源码 | 文件源码
def extract_start(source_id, sources_config):
    """
    Start extraction for a pipeline specified by ``source_id`` defined in
    ``--sources-config``. ``--sources-config`` defaults to ``settings.SOURCES_CONFIG_FILE``.

    :param sources_config: Path to file containing pipeline definitions. Defaults to the value of ``settings.SOURCES_CONFIG_FILE``
    :param source_id: identifier used in ``--sources_config`` to describe pipeline
    """
    sources = load_sources_config(sources_config)

    # Find the requested source definition in the list of available sources;
    # the search stops at the first definition whose id matches.
    source = next(
        (candidate for candidate in sources if candidate['id'] == source_id),
        None,
    )

    # Without a config we can't do anything, notify the user and exit
    if not source:
        click.echo('Error: unable to find source with id "%s" in sources '
                   'config' % source_id)
        return

    setup_pipeline(source)
项目:memote    作者:opencobra    | 项目源码 | 文件源码
def run(model, collect, filename, directory, ignore_git, pytest_args):
    """
    Run the test suite and collect results.

    MODEL: Path to model file. Can also be supplied via the environment variable
    MEMOTE_MODEL or configured in 'setup.cfg' or 'memote.ini'.
    """
    if ignore_git:
        repo = None
    else:
        repo = callbacks.probe_git()
    # Work on a private list: the arguments may arrive as a tuple, and the
    # caller's sequence must not be modified by the defaults added below.
    pytest_args = list(pytest_args)
    if not any(a.startswith("--tb") for a in pytest_args):
        pytest_args = ["--tb", "short"] + pytest_args
    if not any(a.startswith("-v") for a in pytest_args):
        pytest_args.append("-vv")
    if collect:
        # Inside a git repository with a target directory, the result file is
        # named after the commit hash of the active branch.
        if repo is not None and directory is not None:
            filename = join(directory,
                            "{}.json".format(repo.active_branch.commit.hexsha))
        code = api.test_model(model, filename, pytest_args=pytest_args)
    else:
        code = api.test_model(model, pytest_args=pytest_args)
    sys.exit(code)
项目:hocrviewer-mirador    作者:jbaiter    | 项目源码 | 文件源码
def index_documents(ctx, hocr_files, autocomplete_min_count):
    # Ingest every given hOCR file into the repository while showing a
    # progress bar. A failure on one file is logged and the run continues.
    global repository

    def show_fn(hocr_path):
        # label shown next to the progress bar for the current item
        return '' if hocr_path is None else hocr_path.name

    # the repository is created on first use from the configured DB path
    if repository is None:
        repository = DatabaseRepository(ctx.obj['DB_PATH'])

    paths = tuple(pathlib.Path(p) for p in hocr_files)
    with click.progressbar(paths, item_show_func=show_fn) as progress:
        for hocr_path in progress:
            try:
                repository.ingest_document(hocr_path, autocomplete_min_count)
            except Exception as e:
                logger.error("Could not ingest {}".format(hocr_path))
                logger.exception(e)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def get_requirements_and_latest(filename, force=False):
    """Walk a requirements file and look up the latest version per requirement.

    Yields a tuple of (original line, InstallRequirement instance,
    spec_versions, latest_version). Lines that do not produce a usable
    requirement are yielded as (original line, None, None, None).

    :param filename:  Path to a requirements.txt file.
    :param force:     Force getting latest version even for packages without
                      a version specified.
    """
    session = PipSession()
    finder = PackageFinder(
        session=session, find_links=[], index_urls=[PyPI.simple_url])

    _, content = get_file_content(filename, session=session)
    for line_number, line, orig_line in yield_lines(content):
        # comments and surrounding whitespace are dropped before parsing
        cleaned = req_file.COMMENT_RE.sub('', line).strip()
        req = parse_requirement_line(
            cleaned, filename, line_number, session, finder)

        unusable = (req is None or req.name is None
                    or req_file.SCHEME_RE.match(req.name))
        if unusable:
            yield (orig_line, None, None, None)
            continue

        spec_ver = current_version(req)
        if not (spec_ver or force):
            continue
        yield (orig_line, req, spec_ver, latest_version(req, session, finder))
项目:lopocs    作者:Oslandia    | 项目源码 | 文件源码
def tileset(table, column, server_url, work_dir):
    """
    (Re)build a tileset.json for a given table
    """
    # initialize flask application
    create_app()

    work_dir = Path(work_dir)

    # tables given without a schema live in "public"
    if '.' not in table:
        table = 'public.{}'.format(table)

    lpsession = Session(table, column)
    # initialize range for level of details:
    # all minimum corners first, then all maximum corners
    extent = lpsession.boundingbox
    bbox = [extent[axis + bound]
            for bound in ('min', 'max')
            for axis in ('x', 'y', 'z')]

    pending('Building tileset from database')
    hierarchy = threedtiles.build_hierarchy_from_pg(lpsession, server_url, bbox)
    ok()

    filename = 'tileset-{}.{}.json'.format(table, column)
    tileset_path = os.path.join(str(work_dir.resolve()), filename)
    pending('Writing tileset to disk')
    with io.open(tileset_path, 'wb') as out:
        out.write(hierarchy.encode())
    ok()
项目:lopocs    作者:Oslandia    | 项目源码 | 文件源码
def demo(sample, work_dir, server_url, usewith):
    '''
    download sample lidar data, load it into pgpointcloud
    '''
    # a sample is either a bare download link or an (srid, link) sequence
    entry = samples[sample]
    if isinstance(entry, (list, tuple)):
        srid, download_link = entry[0], entry[1]
    else:
        srid, download_link = None, entry

    filepath = Path(download_link)
    pending('Using sample data {}: {}'.format(sample, filepath.name))
    dest = os.path.join(work_dir, filepath.name)
    ok()

    # the download is skipped when the file is already in the work directory
    if not os.path.exists(dest):
        download('Downloading sample', download_link, dest)

    # now load data; the srid is only forwarded when one is set
    extra = {'srid': srid} if srid else {}
    _load(dest, sample, 'points', work_dir, server_url, 400, usewith, **extra)

    click.echo('Now you can test lopocs server by executing "lopocs serve"')
项目:yamlpal    作者:jorisroovers    | 项目源码 | 文件源码
def find_element(yaml_dict, search_str):
    """ Given a dictionary representing a yaml document and a yaml path string, find the specified element in the
        dictionary.

        The path is split on "/". A part written as ``name[3]`` is treated as the key ``name`` followed by the
        list index ``3``.

        :param yaml_dict: Parsed yaml document to search in.
        :param search_str: Path string such as ``foo/bar[0]/baz``.
        :return: The node found at the path, with its ``key`` attribute set to the last part of the path.
        :raises exceptions.InvalidSearchStringException: If the path cannot be followed in the document.
        :raises SystemExit: With status 1 when the node found has no ``line`` attribute."""

    # First split on / to determine which yaml dict we are searching in
    dict_parts = search_str.split("/")
    parsed_parts = []

    for dict_part in dict_parts:
        # Raw string: "\[" is not a valid escape sequence in a regular string literal.
        matches = re.match(r"(.*)(\[([0-9]+)\])", dict_part)
        if matches:
            list_name = matches.groups()[0]
            list_index = int(matches.groups()[2])
            parsed_parts.append(list_name)
            parsed_parts.append(list_index)
        else:
            parsed_parts.append(dict_part)

    # traverse the yaml path
    node = yaml_dict
    try:
        for key in parsed_parts:
            node = node[key]

    except (KeyError, IndexError, TypeError):
        raise exceptions.InvalidSearchStringException(search_str)

    # Try accessing the line of the path we are currently on. If we can't access it,
    # it means that the user has specified a path to a dict or list, without indicating an item within the
    # dictionary or list.
    try:
        node.line
        node.key = parsed_parts[-1]  # add the last parsed key as the node's key
    except AttributeError:
        click.echo("ERROR: Path exists but not specific enough (%s)." % search_str, err=True)
        # Raised directly so the exit does not depend on the site-provided exit() helper.
        raise SystemExit(1)
    return node
项目:py-noisemaker    作者:aayars    | 项目源码 | 文件源码
def clut_option(**attrs):
    """Build the ``--clut`` option; the value must be an existing file and is resolved to a full path."""
    attrs.setdefault("help", "Color lookup table (path to PNG or JPEG image)")
    lookup_file = click.Path(exists=True, dir_okay=False, resolve_path=True)

    return str_option("--clut", type=lookup_file, **attrs)
项目:py-noisemaker    作者:aayars    | 项目源码 | 文件源码
def input_dir_option(**attrs):
    """Build the ``--input-dir`` option; the value must be an existing directory and is resolved to a full path."""
    attrs.setdefault("help", "Input directory containing .jpg and/or .png images, for collage functions")
    image_dir = click.Path(exists=True, file_okay=False, resolve_path=True)

    return str_option("--input-dir", type=image_dir, **attrs)
项目:py-noisemaker    作者:aayars    | 项目源码 | 文件源码
def name_option(default=None, **attrs):
    """Build the ``--name`` option; a falsy ``default`` falls back to ``noise.png``."""
    attrs.setdefault("help", "Filename for image output (should end with .png or .jpg)")
    output_file = click.Path(dir_okay=False)
    fallback = default or "noise.png"

    return str_option("--name", type=output_file, default=fallback, **attrs)
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def get_click_options(self):
        """Yield the click option naming the directory for unspecified results."""
        import click
        import q2cli

        # a writable directory path that does not have to exist yet
        directory_type = click.Path(exists=False, dir_okay=True,
                                    file_okay=False, writable=True)
        option_name = '--' + self.cli_name
        yield q2cli.Option(
            [option_name],
            type=directory_type,
            help='Output unspecified results to a directory')
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def get_click_options(self):
        """Yield the click option naming a config file with command options."""
        import click
        import q2cli

        # an existing, readable regular file
        config_type = click.Path(exists=True, dir_okay=False,
                                 file_okay=True, readable=True)
        option_name = '--' + self.cli_name
        yield q2cli.Option(
            [option_name],
            type=config_type,
            help='Use config file for command options')
项目:pib    作者:datawire    | 项目源码 | 文件源码
def load_envfile(config_path):
    """Read the Envfile.yaml at the given Path and build an envfile.System.

    Validation errors are printed one by one and the program exits with
    status 1.
    """
    try:
        with config_path.open() as stream:
            document = safe_load(stream.read())
            return _load_envfile(document)
    except ValidationError as failure:
        click.echo("Error loading Envfile.yaml:")
        for problem in failure.errors:
            click.echo("---\n" + problem)
        exit(1)
项目:pib    作者:datawire    | 项目源码 | 文件源码
def cli_deploy(logfile, directory, envfile_path):
    # Load the Envfile, redeploy from the given directory and print the
    # service URL.
    system = load_envfile(Path(envfile_path))
    source_dir = Path(directory)
    runner = start(logfile)
    redeploy(runner, system, source_dir)
    print_service_url(runner, system)
项目:pib    作者:datawire    | 项目源码 | 文件源码
def cli_watch(logfile, directory, envfile_path):
    # Same start-up sequence as a one-off deploy, followed by watching the
    # directory.
    system = load_envfile(Path(envfile_path))
    source_dir = Path(directory)
    runner = start(logfile)
    redeploy(runner, system, source_dir)
    print_service_url(runner, system)
    watch(runner, system, source_dir)
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def antlr():
    """generate a new parser based on the grammar using antlr"""
    # antlr is run from inside the parser package directory
    parser_dir = Path('qface/idl/parser').abspath()
    command = 'antlr4 -Dlanguage=Python3 -Werror -package qface.idl.parser -o . -listener -visitor T.g4'
    sh(command, cwd=str(parser_dir))
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def on_any_event(self, event):
        """Run the test suite when the event concerns a ``.py`` file.

        Directory events are ignored.
        """
        is_python_file = (not event.is_directory
                          and Path(event.src_path).ext == '.py')
        if is_python_file:
            sh('python3 -m pytest')
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def run(self):
        """Run ``self.script`` in the current working directory.

        Does nothing while a previous run is still marked as in progress.
        The in-progress marker is cleared even when the command raises, so a
        failing script cannot block all later runs.
        """
        if self.is_running:
            return
        self.is_running = True
        try:
            sh(self.script, cwd=Path.getcwd())
        finally:
            self.is_running = False
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def reload(script, input, output):
    """
    reloads the generator script when the script files
    or the input files changes
    """
    script_path = Path(script).expand().abspath()
    output_dir = Path(output).expand().abspath()
    # a single input is wrapped so that a sequence is always passed on
    if not isinstance(input, (list, tuple)):
        input = [input]
    output_dir.makedirs_p()
    _script_reload(script_path, input, output_dir)
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def _script_reload(script, input, output):
    """run the named generator and monitor the input and generator folder"""
    sources = [Path(entry).expand().abspath() for entry in input]
    target = Path(output).expand().abspath()
    command = 'python3 {0} {1} {2}'.format(script, ' '.join(sources), target)
    handler = RunScriptChangeHandler(command)
    handler.run()  # run always once
    observer = Observer()

    def watch(folder):
        # announce the folder and register it recursively with the observer
        click.secho('watch: {0}'.format(folder), fg='blue')
        observer.schedule(handler, folder, recursive=True)

    # watched: the script's folder, each input's folder and the 'qface'
    # folder next to this file
    watch(script.dirname().expand().abspath())
    for source in sources:
        watch(source.dirname().expand().abspath())
    watch(Path(__file__).parent / 'qface')
    observer.start()

    # idle until interrupted from the keyboard, then shut the observer down
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def upload():
    # Upload everything under dist/ with twine, then remove the build
    # directory.
    sh('twine upload dist/*')
    Path('build').rmtree_p()
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def pack():
    # Start from clean build/dist folders, build the wheel and list its
    # contents.
    for stale in ('build', 'dist'):
        Path(stale).rmtree_p()
    for command in ('python3 setup.py bdist_wheel', 'unzip -l dist/*.whl'):
        sh(command)
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def clean():
    # Remove the packaging artefact folders.
    for artefact in ('build', 'dist', 'qface.egg-info'):
        Path(artefact).rmtree_p()
项目:millilauncher    作者:fhfuih    | 项目源码 | 文件源码
def _wizard():
    """Interactively prompt for the launcher settings and save them.

    Every prompt offers the value currently held by ``config`` as its
    default. The answers are stored back on ``config`` and persisted with
    ``config.save()``.
    """
    # Adjacent literals replace a backslash continuation inside the string,
    # which embedded the next source line's indentation in the message.
    click.echo('Running the setup wizard. On each line, a default value is shown in the brackets if valid. '
               'Leave blank to use it, or enter a new value.')
    # both paths must exist for the prompt to accept them
    config.mc_dir = click.prompt('Your \'.minecraft\' folder path', show_default=True, default=config.mc_dir, type=click.Path(exists=True))
    config.java_dir = click.prompt('Your \'javaw\' file path', show_default=True, default=config.java_dir, type=click.Path(exists=True))
    config.max_mem = click.prompt('Maximum memory allocated to Minecraft in MB', show_default=True, default=config.max_mem, type=int)
    config.username = click.prompt('Your Minecraft username', show_default=True, default=config.username)
    click.echo('Done! More entries can be reached later manually.\n')
    config.save()