Python click 模块,File() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用click.File()

项目:chainerboard    作者:koreyou    | 项目源码 | 文件源码
def run(infile, outfile, time_interval, quiet):
    """Replay a JSON document into ``outfile`` one element at a time.

    The whole document is loaded from ``infile`` first. Then, for every
    element, the function sleeps ``time_interval`` seconds and rewrites
    ``outfile`` with a prefix of the data that is one element longer than
    the previous dump.

    :param infile: path of the JSON file to read; its top-level value is
        sliced, so it is expected to be a list
    :param outfile: path of the JSON file that is overwritten on each step
    :param time_interval: seconds to sleep before each dump
    :param quiet: when true, log at WARN level instead of INFO
    """
    logging.basicConfig(level=logging.WARN if quiet else logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info('loading input file %s ...', infile)
    with open(infile) as fin:
        # Do not use click.File because we want to close the file asap
        data = json.load(fin)
    n = len(data)
    logger.info('loading input file %s done. %d data found.', infile, n)
    # range() works on Python 2 and 3 (xrange() is Python 2 only); counting
    # from 1 gives the prefix length directly.
    for count in range(1, n + 1):
        logger.info('Sleeping for %d sec [%d/%d] ...', time_interval, count, n)
        time.sleep(time_interval)
        with open(outfile, 'w') as fout:
            json.dump(data[:count], fout)
        logger.info('Dumped %dth/%d data to %s', count, n, outfile)
项目:cfdilib    作者:Vauxoo    | 项目源码 | 文件源码
def cfdv32mx(config):
    """Format cfdi v3.2 for Mexico.

    \b
    File where the files will be written document.xml.
        cfdicli --in_file /path/to/yout/json/documnt.json cfdv32mx

    \b
    File where the files will be written from document.json.
        cfdicli --out_file ./document.xml cfdv32mx
    """
    # TODO: look for a secure option for eval.
    #       Or simply the CLI only should manage json?
    # TODO: Implement json option also.
    # NOTE(review): eval() executes whatever expression the input file
    # contains; only feed this command files you trust.
    invoice = cfdv32.get_invoice(eval(config.in_file.read()))

    # Invalid invoice: report the validation message and write nothing.
    if not invoice.valid:
        click.echo(invoice.ups.message)
        return

    destination = config.out_file
    destination.write(invoice.document)
    destination.flush()
    click.echo('Document %s has been created.' % destination.name)
项目:client    作者:wandb    | 项目源码 | 文件源码
def pull(project, run, kind, entity):
    """Download the files of a run that ``api.file_current`` rejects.

    The project/run pair is first normalised through ``api.parse_slug``.
    Each entry returned by ``api.download_urls`` is then checked with
    ``api.file_current`` against its ``md5`` value; entries that fail the
    check are streamed to a local file of the same name while a progress
    bar is updated. ``kind`` is not used by this function.

    Raises ``ClickException`` when the run has no files.
    """
    project, run = api.parse_slug(run, project=project)

    urls = api.download_urls(project, run=run, entity=entity)
    if len(urls) == 0:
        raise ClickException("Run has no files")

    heading = "Downloading: {project}/{run}".format(
        project=click.style(project, bold=True), run=run
    )
    click.echo(heading)

    for name in urls:
        meta = urls[name]
        if api.file_current(name, meta['md5']):
            click.echo("File %s is up to date" % name)
            continue

        length, response = api.download_file(meta['url'])
        progress = click.progressbar(
            length=length,
            label='File %s' % name,
            fill_char=click.style('&', fg='green'),
        )
        with progress as bar, open(name, "wb") as f:
            # Stream in 4 KiB chunks so the bar advances as data arrives.
            for chunk in response.iter_content(chunk_size=4096):
                f.write(chunk)
                bar.update(len(chunk))
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def codegen_options(f):
    """Decorate ``f`` with the click options shared by code-generation commands.

    Options are applied in this order: ``--categories``, ``--param_file``,
    ``--plugin``, ``--target`` and ``--status_update_interval``.

    :param f: the function (or already decorated command) to wrap
    :return: ``f`` wrapped with all of the options above
    """
    f = click.option(
        "-c", "--categories", multiple=True, default=default_categories,
        type=click.Choice(all_categories),
        help="A list of the categories of inputs and outputs that should "
        "be enabled"
    )(f)
    # A stray "code" literal used to follow this help text; implicit string
    # concatenation glued it onto the message ("...launch file.code").
    f = click.option(
        "-f", "--param_file",
        type=click.File(),
        help="""YAML or JSON file describing the firmware module configuration to be flashed.
        This is the same file that is used for rosparam in the launch file."""
    )(f)
    f = click.option(
        "-p", "--plugin", multiple=True, help="Enable a specific plugin"
    )(f)
    f = click.option(
        "-t", "--target", help="PlatformIO target (e.g.  upload)"
    )(f)
    f = click.option(
        "--status_update_interval", default=5,
        help="Minimum interval between driver status updates (in seconds)"
    )(f)
    return f
项目:openbadges-bakery    作者:concentricsky    | 项目源码 | 文件源码
def bake(input_file, output_file, data):
    """
    This command bakes Open Badges data into a file and saves
    the result to an output file.

    Positional Arguments:

    \b
      Input filename:    File must exist.
    \b
      Output filename:   If file exists, it will be overwritten.
    """
    # The return value of utils.bake is not needed here; only the call matters.
    utils.bake(input_file, data, output_file)

    message = "{} is done baking. Remember to let it cool".format(
        output_file.name
    )
    click.echo(message)
项目:StrepHit    作者:Wikidata    | 项目源码 | 文件源码
def generate_crowdflower_interface_template(input_csv, output_html):
    """ Generate CrowFlower interface template based on input data spreadsheet

    The header row of the CSV decides what is rendered: every ``chunk_NN``
    column produces one ``TOKEN_TEMPLATE`` question block, and each question
    block embeds one ``FE_TEMPLATE`` block per ``fe_NN`` column. The result
    is wrapped in ``HEADER`` and ``FOOTER``. An input without any header row
    yields just ``HEADER + FOOTER``.

    :param file input_csv: CSV file with the input data
    :param output_html: File in which to write the output
    :type output_html: file
    :return: 0 on success
    """
    # Get the field names of the input data spreadsheet. DictReader reports
    # None when the input is completely empty, so fall back to no fields.
    sheet = csv.DictReader(input_csv)
    fields = sheet.fieldnames or []
    # Get "fe_[0-9][0-9]" fields
    fe_fields = [f for f in fields if re.match(r'fe_[0-9]{2}$', f)]
    # Get "chunk_[0-9][0-9]" fields
    token_fields = [f for f in fields if re.match(r'chunk_[0-9]{2}$', f)]
    # The fe blocks are identical for every question, so render them once.
    fe_blocks = ''.join(
        FE_TEMPLATE % {'fe_field': fe_field} for fe_field in fe_fields
    )
    # Collect the pieces and join once instead of growing a string in a loop.
    pieces = [HEADER]
    for question_num, token_field in enumerate(token_fields, start=1):
        pieces.append(TOKEN_TEMPLATE % {
            'question_num': question_num,
            'token_field': token_field,
            'fe_blocks': fe_blocks,
        })
    pieces.append(FOOTER)
    output_html.write(''.join(pieces))
    return 0
项目:coreapi-cli    作者:core-api    | 项目源码 | 文件源码
def parse_files(ctx, param, values):
    """Turn ``FIELD=PATH`` strings into ``(field, open binary file)`` pairs.

    Each value is split on the first ``=``; the right-hand side is opened
    through ``click.File('rb')``.

    :param ctx: click context, passed on to the file converter
    :param param: click parameter, passed on to the file converter
    :param values: iterable of ``FIELD=PATH`` strings
    :return: list of ``(field, file object)`` tuples, in input order
    :raises click.BadParameter: when a value contains no ``=``
    """
    ret = []
    converter = click.File('rb')

    for item in values:
        if '=' not in item:
            # Interpolate the offending value; the message previously showed
            # a literal "%s".
            raise click.BadParameter(
                'String parameter "%s" should be in form of FIELD=VALUE' % item
            )
        field, value = item.split('=', 1)
        input_file = converter.convert(value, param, ctx)
        ret.append((field, input_file))

    return ret
项目:swag-client    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def file(ctx, data_dir, data_file):
    """Use the File SWAG Backend"""
    # Only fill in values the context does not already carry. The data-file
    # check used to read ``ctx.file``, an attribute this function never
    # sets, unlike the parallel data_dir check below.
    if not ctx.data_file:
        ctx.data_file = data_file

    if not ctx.data_dir:
        ctx.data_dir = data_dir

    ctx.type = 'file'
项目:k2mosaic    作者:KeplerGO    | 项目源码 | 文件源码
def tpflist(campaign, channel, sc, wget):
    """Prints the Target Pixel File URLS for a given CAMPAIGN/QUARTER and ccd CHANNEL.

    CAMPAIGN can refer to a K2 Campaign (e.g. 'C4') or a Kepler Quarter (e.g. 'Q4').
    """
    try:
        urls = mast.get_tpf_urls(campaign, channel=channel, short_cadence=sc)
        if not wget:
            # Plain listing: one URL per line.
            print('\n'.join(urls))
        else:
            # Prefix every URL with a ready-to-run wget invocation.
            wget_prefix = 'wget -nH --cut-dirs=6 -c -N '
            commands = [wget_prefix + url for url in urls]
            print('\n'.join(commands))
    except mast.NoDataFoundException as e:
        click.echo(e)
项目:DeepLearnTute    作者:DouglasOrr    | 项目源码 | 文件源码
def load_hdf5(path):
    '''Read the 'x', 'y' and 'vocab' datasets of an HDF5 file into a Dataset.

    'x' is flattened to two dimensions (first axis kept); its second and
    third axis sizes are passed on as height and width. 'y' is cast to int32.
    '''
    with h5py.File(path, 'r') as f:
        images = f['x'][...]
        flat_images = images.reshape(images.shape[0], -1)
        labels = f['y'][...].astype(np.int32)
        vocab = f['vocab'][...]
        return Dataset(
            x=flat_images,
            y=labels,
            vocab=vocab,
            height=images.shape[1],
            width=images.shape[2]
        )
项目:DeepLearnTute    作者:DouglasOrr    | 项目源码 | 文件源码
def cli_render(input, output, size):
    '''Render a JSONlines dataset to numpy arrays, saved in an HDF5 file.

    Every input line is a JSON object with a 'target' character and a list
    of 'strokes'. The strokes are rendered to an image with ``render``, and
    the targets are mapped to indices into the sorted set of distinct
    targets. Three datasets are written: 'vocab', 'x' (images) and 'y'
    (target indices).

    :param input: iterable of JSON lines
    :param output: HDF5 target passed to ``h5py.File`` (opened in 'a' mode)
    :param size: image size passed to ``render``; images are stored with
        shape (size, size)
    '''
    chars = []
    images = []
    for line in input:
        datum = json.loads(line)
        chars.append(datum['target'])
        images.append(render(
            [np.array(s) for s in datum['strokes']],
            size))

    # sorted() already returns a list.
    vocab = sorted(set(chars))
    char_to_index = {ch: y for y, ch in enumerate(vocab)}

    with h5py.File(output, 'a') as f:
        str_dt = h5py.special_dtype(vlen=str)
        f.require_dataset(
            'vocab', (len(vocab),), dtype=str_dt
        )[...] = vocab
        f.require_dataset(
            'x', shape=(len(images), size, size), dtype=np.float32
        )[...] = np.array(images)
        # np.int was only an alias of the builtin int and was removed in
        # NumPy 1.24; the builtin selects the same dtype.
        f.require_dataset(
            'y', shape=(len(chars),), dtype=int
        )[...] = np.array([char_to_index[ch] for ch in chars])
项目:histonets-cv    作者:sul-cidr    | 项目源码 | 文件源码
def convert(self, param=None, ctx=None, value=None):
        """Return the content behind ``value``, a local path or a URL.

        * No value: the lines of standard input are returned as a list.
        * Existing local path: the file is opened with ``self.mode``.
        * Otherwise ``value`` is treated as a URL and opened with
          ``urlopen``; its scheme must be in ``self.SUPPORTED_SCHEMES``.

        Names ending in ``.gz`` are decompressed; unless ``self.mode`` is a
        binary mode, the decompressed bytes go through ``local_decode``.

        :raises click.BadParameter: when the value is neither an existing
            file nor a URL with a supported scheme
        """
        if not value:
            return sys.stdin.readlines()

        is_compressed = value.endswith('.gz')
        is_binary = self.mode.endswith('b')
        if os.path.exists(value):
            # Compressed files must be read as bytes for gzip to work.
            local_mode = 'rb' if is_compressed else self.mode
            file_obj = open(value, local_mode)
        else:
            url = urlparse(value)
            if not url.scheme:
                # Report the value the user typed, not the parsed URL object.
                raise click.BadParameter(
                    'File \'{}\' not found'.format(value))
            if url.scheme not in self.SUPPORTED_SCHEMES:
                raise click.BadParameter(
                    'Scheme \'{}\' not supported'.format(url.scheme))
            file_obj = urlopen(value)

        # Close the underlying file even if reading or decoding fails.
        try:
            if is_compressed:
                with gzip.GzipFile(mode=self.mode, fileobj=file_obj) as gz_file:
                    content = gz_file.read()
                if not is_binary:
                    content = local_decode(content)
            else:
                content = file_obj.read()
        finally:
            file_obj.close()
        return content
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def ssh_command(func):
    """Add the SSH-related options to ``func`` and prepare the click context.

    Two options are attached: ``--identity-file`` (an eagerly opened file,
    defaulting to ``get_private_key_path()``) and ``--batch-size``. The
    returned wrapper imports the private key from the file, stores the key,
    the batch size (``None`` when not positive) and the asyncio event loop
    in ``ctx.obj``, then calls ``func`` with the remaining arguments.
    """
    identity_option = click.option(
        "-i", "--identity-file",
        type=click.File(lazy=False),
        default=str(get_private_key_path()),
        help="Path to the private key file",
        show_default=True
    )
    func = identity_option(func)

    batch_option = click.option(
        "-b", "--batch-size",
        type=int,
        default=20,
        help="By default, command won't connect to all servers "
             "simultaneously, it is trying to process servers in batches. "
             "Negative number or 0 means connect to all hosts",
        show_default=True,
    )
    func = batch_option(func)

    @functools.wraps(func)
    @click.pass_context
    def decorator(ctx, identity_file, batch_size, *args, **kwargs):
        key_material = identity_file.read()
        private_key = asyncssh.import_private_key(key_material)
        # Zero or a negative number means "no batching".
        if not batch_size > 0:
            batch_size = None
        identity_file.close()

        shared = ctx.obj
        shared["private_key"] = private_key
        shared["batch_size"] = batch_size
        shared["event_loop"] = asyncio.get_event_loop()

        return func(*args, **kwargs)

    return decorator
项目:ns-bot    作者:eigenein    | 项目源码 | 文件源码
def main(
    telegram_token: str,
    ns_login: str,
    ns_password: str,
    log_file: click.File,
    verbose: bool,
):
    """Configure logging, build the bot and run it on the event loop forever.

    Log records go to ``log_file`` when one is given, otherwise to stderr.
    The Telegram client, the NS client and the bot are all closed when the
    surrounding ``ExitStack`` unwinds; ``bot.stop()`` is called first.
    """
    log_level = logging.DEBUG if verbose else logging.INFO
    log_stream = log_file or click.get_text_stream("stderr")
    logging.basicConfig(
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s [%(levelname).1s] %(message)s",
        level=log_level,
        stream=log_stream,
    )

    logging.info("Starting bot…")
    with ExitStack() as exit_stack:

        def managed(resource):
            # Register the resource so it is closed when the stack exits.
            return exit_stack.enter_context(closing(resource))

        telegram = managed(Telegram(telegram_token))
        ns = managed(Ns(ns_login, ns_password))
        bot = managed(Bot(telegram, ns))
        try:
            asyncio.ensure_future(bot.run())
            loop = asyncio.get_event_loop()
            loop.run_forever()
        finally:
            bot.stop()


# Bot response phrases.
# ----------------------------------------------------------------------------------------------------------------------
项目:redbiom    作者:biocore    | 项目源码 | 文件源码
def fetch_samples_from_obserations(features, exact, from_, output,
                                   context):
    """Fetch sample data containing features."""
    # Imports stay local to the command, in the original order.
    import redbiom.util
    import redbiom.fetch
    import h5py

    feature_ids = redbiom.util.from_or_nargs(from_, features)
    table, ambiguity_map = redbiom.fetch.data_from_features(
        context, feature_ids, exact)

    with h5py.File(output, 'w') as hdf5_file:
        table.to_hdf5(hdf5_file, 'redbiom')

    _write_ambig(ambiguity_map, output)
项目:redbiom    作者:biocore    | 项目源码 | 文件源码
def fetch_samples_from_samples(samples, from_, output, context):
    """Fetch sample data."""
    # Imports stay local to the command, in the original order.
    import redbiom.util
    import redbiom.fetch
    import h5py

    sample_ids = redbiom.util.from_or_nargs(from_, samples)
    fetched_table, ambiguity_map = redbiom.fetch.data_from_samples(
        context, sample_ids)

    with h5py.File(output, 'w') as hdf5_file:
        fetched_table.to_hdf5(hdf5_file, 'redbiom')

    _write_ambig(ambiguity_map, output)
项目:sodium11    作者:trbs    | 项目源码 | 文件源码
def test_persistfile_exists(self, filename, add_postfix=True):
        """Raise ``click.UsageError`` when the persist file already exists.

        The name is first resolved through ``self._get_persist_filename``.
        """
        persist_path = self._get_persist_filename(filename, add_postfix=add_postfix)
        if not os.path.isfile(persist_path):
            return
        raise click.UsageError("File '%s' already exists" % persist_path)
项目:sodium11    作者:trbs    | 项目源码 | 文件源码
def cli_decrypt(ctx, filename, key_file, passphrase, public_keyfile, verify, keep, progress, leave_progress_bar):
    """Decrypt file(s) with private key."""
    # Default the progress display to "on" only for interactive terminals.
    if progress is None:
        progress = sys.stdout.isatty()

    if passphrase is None:
        # Prompting is only attempted when progress output is enabled.
        if not progress:
            raise click.ClickException("No passphrase given.")
        passphrase = click.prompt('Passphrase', hide_input=True)

    ed_prv = load_private_keyfile(key_file, passphrase)
    sender_pubkey = load_public_keyfile(public_keyfile) if public_keyfile else None

    # The source file is removed afterwards only when verify is exactly
    # False and --keep was not requested.
    remove_source = verify is False and not keep

    for fh in filename:
        encrypted_name = fh.name
        if not encrypted_name.endswith(".s11"):
            # TODO: implement output-filename as cli option
            raise click.UsageError("File '%s' does not end with .s11" % encrypted_name)

        _decrypt(
            f=fh,
            ed_prv=ed_prv,
            sender_pubkey=sender_pubkey,
            verify=verify,
            progress=progress,
            leave_progress_bar=leave_progress_bar,
            # Output name is the input name without its ".s11" suffix.
            output_filename=encrypted_name[:-4],
        )
        fh.close()
        if remove_source:
            os.unlink(encrypted_name)
项目:openbadges-bakery    作者:concentricsky    | 项目源码 | 文件源码
def unbake(input_file, output_file):
    """
    This command extracts Open Badges data from an image and
    prints it to a file or the standard output.

    Positional Arguments:

    \b
      Input filename:    File must exist.
    \b
      Output filename:   If file exists, it will be overwritten.
    """
    # Blank lines are echoed before and after the extracted data.
    click.echo('')
    badge_data = utils.unbake(input_file)
    output_file.write(badge_data)
    click.echo('\n')
项目:client    作者:wandb    | 项目源码 | 文件源码
def status(run, settings, project):
    """Print the login state and the current settings.

    Nothing is printed unless ``settings`` is truthy. ``run`` and
    ``project`` are not used by the active code.
    """
    if settings:
        login_line = click.style("Logged in?", bold=True) + " %s" % bool(
            api.api_key)
        click.echo(login_line)

        settings_line = click.style("Current Settings", bold=True) + \
            " (%s)" % api.settings_file
        click.echo(settings_line)

        # Dump the settings as stable, human-readable JSON.
        current = api.settings()
        rendered = json.dumps(
            current,
            sort_keys=True,
            indent=2,
            separators=(',', ': ')
        )
        click.echo(rendered)

    # project, run = api.parse_slug(run, project=project)
    # existing = set()  # TODO: populate this set with the current files in the run dir
    # remote = api.download_urls(project, run)
    # not_synced = set()
    # remote_names = set([name for name in remote])
    # for file in existing:
    #    meta = remote.get(file)
    #    if meta and not api.file_current(file, meta['md5']):
    #        not_synced.add(file)
    #    elif not meta:
    #        not_synced.add(file)
    # TODO: remove items that exists and have the md5
    # only_remote = remote_names.difference(existing)
    # up_to_date = existing.difference(only_remote).difference(not_synced)
    # click.echo('File status for ' + click.style('"%s/%s" ' %
    #                                            (project, run), bold=True))
    # if len(not_synced) > 0:
    #    click.echo(click.style('Push needed: ', bold=True) +
    #               click.style(", ".join(not_synced), fg="red"))
    # if len(only_remote) > 0:
    #    click.echo(click.style('Pull needed: ', bold=True) +
    #               click.style(", ".join(only_remote), fg="red"))
    # if len(up_to_date) > 0:
    #    click.echo(click.style('Up to date: ', bold=True) +
    #               click.style(", ".join(up_to_date), fg="green"))


#@cli.command(context_settings=CONTEXT, help="Store notes for a future training run")
项目:ampy    作者:adafruit    | 项目源码 | 文件源码
def put(local, remote):
    """Put a file or folder and its contents on the board.

    Put will upload a local file or folder  to the board.  If the file already
    exists on the board it will be overwritten with no warning!  You must pass
    at least one argument which is the path to the local file/folder to
    upload.  If the item to upload is a folder then it will be copied to the
    board recursively with its entire child structure.  You can pass a second
    optional argument which is the path and name of the file/folder to put to
    on the connected board.

    For example to upload a main.py from the current directory to the board's
    root run:

      ampy --port /board/serial/port put main.py

    Or to upload a board_boot.py from a ./foo subdirectory and save it as boot.py
    in the board's root run:

      ampy --port /board/serial/port put ./foo/board_boot.py boot.py

    To upload a local folder adafruit_library and all of its child files/folders
    as an item under the board's root run:

      ampy --port /board/serial/port put adafruit_library

    Or to put a local folder adafruit_library on the board under the path
    /lib/adafruit_library on the board run:

      ampy --port /board/serial/port put adafruit_library /lib/adafruit_library
    """
    # Use the local filename if no remote filename is provided.
    if remote is None:
        remote = os.path.basename(os.path.abspath(local))
    # Check if path is a folder and do recursive copy of everything inside it.
    # Otherwise it's a file and should simply be copied over.
    if os.path.isdir(local):
        # Directory copy, create the directory and walk all children to copy
        # over the files.
        board_files = files.Files(_board)
        for parent, child_dirs, child_files in os.walk(local):
            # Create board filesystem absolute path to parent directory.
            remote_parent = posixpath.normpath(posixpath.join(remote, os.path.relpath(parent, local)))
            try:
                # Create remote parent directory.
                board_files.mkdir(remote_parent)
            except files.DirectoryExistsError:
                # Ignore errors for directories that already exist. Only the
                # mkdir call is guarded: the files below must still be
                # uploaded when the directory is already on the board.
                pass
            # Loop through all the files and put them on the board too.
            for filename in child_files:
                with open(os.path.join(parent, filename), 'rb') as infile:
                    remote_filename = posixpath.join(remote_parent, filename)
                    board_files.put(remote_filename, infile.read())

    else:
        # File copy, open the file and copy its contents to the board.
        # Put the file on the board.
        with open(local, 'rb') as infile:
            board_files = files.Files(_board)
            board_files.put(remote, infile.read())