Python ruamel.yaml 模块,load() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ruamel.yaml.load()

项目:artman    作者:googleapis    | 项目源码 | 文件源码
def load_config_spec(config_spec, config_sections, repl_vars, language):
    """Load selected sections of a YAML config file as one merged config.

    Args:
        config_spec: Either ``path`` or ``path:section|section``. When the
            section list is present it replaces ``config_sections``.
        config_sections: Names of the top-level sections to merge, used
            when ``config_spec`` carries no section list.
        repl_vars: Handed to ``replace_vars`` for the final substitution.
        language: Name of an extra section merged last; a missing section
            contributes an empty dict.

    Returns:
        Whatever ``replace_vars`` returns for the merged configuration.
    """
    spec_parts = config_spec.strip().split(':')
    path = spec_parts[0]
    # A section list embedded in the spec wins over the argument.
    if len(spec_parts) > 1:
        config_sections = spec_parts[1].split('|')

    with open(path) as stream:
        loaded = yaml.load(stream, Loader=yaml.Loader)

    # Gather only the sections in use, then the per-language section.
    pieces = []
    for section in config_sections:
        pieces.append(loaded[section])
    pieces.append(loaded.get(language, {}))

    merged = merge(*pieces)
    return replace_vars(merged, repl_vars)
项目:nlppln    作者:nlppln    | 项目源码 | 文件源码
def generate_cwl_documentation(_):
    """Write ``tools.rst`` next to this file, one entry per CWL tool.

    The tool files are the ``run`` attributes of the steps in the
    ``WorkflowGenerator`` steps library, sorted alphabetically. Each entry
    shows the file's base name, a ``+`` underline and the tool's ``doc``
    field (``'No documentation'`` when absent). Files that raise
    ``yaml.YAMLError`` are left out. The single argument is ignored.
    """
    here = os.path.abspath(os.path.dirname(__file__))

    with WorkflowGenerator() as wf:
        step_paths = [step.run for step in wf.steps_library.steps.values()]
    step_paths = sorted(step_paths)

    entry = '\n{}\n{}\n\n{}\n'
    target = os.path.join(here, 'tools.rst')
    with codecs.open(target, 'wb', encoding='utf-8') as out:
        out.write('Tools\n=====\n')
        out.write('\n``nlppln`` contains the following tools:\n')
        for path in step_paths:
            title = os.path.basename(path)
            underline = '+' * len(title)
            with codecs.open(path) as src:
                try:
                    parsed = yaml.load(src, Loader=yaml.RoundTripLoader)
                    description = parsed.get('doc', 'No documentation')
                    out.write(entry.format(title, underline, description))
                except yaml.YAMLError:
                    # Tool files that cannot be parsed get no entry.
                    pass
项目:hatchery    作者:ajk8    | 项目源码 | 文件源码
def from_yaml():
    """ Build the configuration from the bundled defaults plus user files

    Starts from the ``hatchery.yml`` snippet and overlays every existing
    file in ``CONFIG_LOCATIONS`` in order. Empty files are skipped. A key
    that is not present in the defaults raises ``ConfigError``.
    """
    defaults_text = snippets.get_snippet_content('hatchery.yml')
    merged = yaml.load(defaults_text, Loader=yaml.RoundTripLoader)
    for location in CONFIG_LOCATIONS:
        location = os.path.expanduser(location)
        if not os.path.isfile(location):
            continue
        with open(location) as stream:
            overrides = yaml.load(stream, Loader=yaml.RoundTripLoader)
            if overrides is None:
                continue
            for key, value in overrides.items():
                # Only keys known to the defaults may be overridden.
                if key not in merged:
                    raise ConfigError(
                        'found garbage key "{}" in {}'.format(key, location)
                    )
                merged[key] = value
    return merged
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def from_parmed(cls, path, *args, **kwargs):
        """
        Build an instance from a file parsed by ``parmed.load_file``.

        Not guaranteed to work for every format, but handy when it does.

        Arguments
        ---------
        path : str
            Path to file that ParmEd can load
        """
        st = parmed.load_file(path, structure=True, *args, **kwargs)
        # Explicit keyword arguments take precedence over the attributes
        # found on the loaded structure.
        picked = {}
        for attr in ('box', 'velocities', 'positions'):
            picked[attr] = kwargs.pop(attr, getattr(st, attr, None))
        return cls(master=st, topology=st.topology,
                   positions=picked['positions'], box=picked['box'],
                   velocities=picked['velocities'], path=path, **kwargs)
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def _pickle_load(path):
        """
        Load a pickled topology from ``path``.

        Only files ending in ``.pickle``, ``.pickle2`` or ``.pickle3`` are
        read. Careful with Python versions though: the running interpreter
        must understand the protocol the file was written with.

        Arguments
        ---------
        path : str
            Path to the pickle file.

        Raises
        ------
        ValueError
            If the extension is not recognised or the file unpickles to
            ``None``.
        """
        _, ext = os.path.splitext(path)
        if ext not in ('.pickle', '.pickle2', '.pickle3'):
            raise ValueError('File {} is not compatible with this version'.format(path))
        with open(path, 'rb') as f:
            # pickle.load() takes no ``protocol`` argument; the protocol is
            # read from the stream itself.
            topology = pickle.load(f)
        if topology is None:
            raise ValueError('File {} is not compatible with this version'.format(path))
        return topology
项目:Box    作者:cdgriffith    | 项目源码 | 文件源码
def from_json(cls, json_string=None, filename=None,
                  encoding="utf-8", errors="strict", **kwargs):
        """
        Build a Box from a JSON object, given as a string or a file.

        For JSON whose top level is a list use BoxList.from_json instead.

        :param json_string: string to pass to `json.loads`
        :param filename: filename to open and pass to `json.load`
        :param encoding: File encoding
        :param errors: How to handle encoding errors
        :param kwargs: parameters to pass to `Box()` or `json.loads`
        :return: Box object from json data
        :raises BoxError: if the decoded data is not a dictionary
        """
        # Keyword arguments named in BOX_PARAMETERS go to the constructor;
        # everything else is forwarded to the JSON loader.
        bx_args = {name: kwargs.pop(name)
                   for name in list(kwargs) if name in BOX_PARAMETERS}

        data = _from_json(json_string, filename=filename,
                          encoding=encoding, errors=errors, **kwargs)

        if isinstance(data, dict):
            return cls(data, **bx_args)
        raise BoxError('json data not returned as a dictionary, '
                       'but rather a {0}'.format(type(data).__name__))
项目:Box    作者:cdgriffith    | 项目源码 | 文件源码
def from_yaml(cls, yaml_string=None, filename=None,
                      encoding="utf-8", errors="strict",
                      **kwargs):
            """
            Transform a yaml object string into a Box object.

            :param yaml_string: string to pass to `yaml.load`
            :param filename: filename to open and pass to `yaml.load`
            :param encoding: File encoding
            :param errors: How to handle encoding errors
            :param kwargs: parameters to pass to `Box()` or `yaml.load`
            :return: Box object from yaml data
            :raises BoxError: if the loaded data is not a dictionary
            """
            # Keyword arguments named in BOX_PARAMETERS go to the
            # constructor; the rest is forwarded to the YAML loader.
            bx_args = {}
            for arg in kwargs.copy():
                if arg in BOX_PARAMETERS:
                    bx_args[arg] = kwargs.pop(arg)

            data = _from_yaml(yaml_string=yaml_string, filename=filename,
                              encoding=encoding, errors=errors, **kwargs)
            if not isinstance(data, dict):
                # The trailing ", " keeps the two fragments from running
                # together as "dictionarybut rather".
                raise BoxError('yaml data not returned as a dictionary, '
                               'but rather a {0}'.format(type(data).__name__))
            return cls(data, **bx_args)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def main():
    """Parse the command line, load and validate the config, run the tester.

    Logging is set to DEBUG with the ``botocore`` logger raised to WARNING.
    The config file is read with the safe YAML loader and checked against
    ``CONFIG_SCHEMA`` before defaults are filled in.
    """
    options = setup_parser().parse_args()

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logging.getLogger('botocore').setLevel(logging.WARNING)

    with open(options.config) as fh:
        raw_config = fh.read()
    config = yaml.load(raw_config, Loader=yaml.SafeLoader)

    jsonschema.validate(config, CONFIG_SCHEMA)
    setup_defaults(config)

    MailerTester(
        options.MESSAGE_FILE, config, msg_plain=options.plain,
        json_dump_file=options.json_dump_file
    ).run(options.dry_run, options.print_only)
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def _ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    """
    Load YAML with mappings built by ``object_pairs_hook``

    Use this instead of yaml.load/yaml.safe_load to get an OrderedDict.
    Usage example: ``_ordered_load(stream, yaml.SafeLoader)``

    :param stream: stream to read from
    :param Loader: yaml-loader class to derive from
    :param object_pairs_hook: callable receiving the (key, value) pairs of
        every mapping

    :return: the loaded structure
    """

    def _build_mapping(loader, node):
        loader.flatten_mapping(node)
        pairs = loader.construct_pairs(node)
        return object_pairs_hook(pairs)

    # The mapping constructor is registered on a local subclass of Loader.
    class OrderedLoader(Loader):
        pass

    OrderedLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _build_mapping)
    return yaml.load(stream, OrderedLoader)
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def yaml_load_roundtrip(filename):
    """
    Read a yaml file with the round-trip loader so it can be edited

    Every newline in the file is doubled before parsing.

    :param filename: name of the yaml file to load; ``YAML_FILE`` is appended
    :return: loaded data structure, ``{}`` if reading or parsing failed,
        ``None`` if editing is disabled
    """

    if not EDITING_ENABLED:
        return None

    try:
        with open(filename+YAML_FILE, 'r') as stream:
            text = stream.read()
        return yaml.load(text.replace('\n', '\n\n'), yaml.RoundTripLoader)
    except Exception as e:
        logger.error("yaml_load_roundtrip: YAML-file load error: '%s'" % (e))
        return {}
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def writeBackToFile(filename, itempath, itemattr, value):
    """
    Set one attribute of one item in a yaml-file and save the file

    The existing file is loaded first when it is present on disk.

    :param filename: name of the yaml-file (without the .yaml extension!)
    :param itempath: path of the item to modify
    :param itemattr: name of the item's attribute to modify
    :param value: new value for the attribute

    :return: None
    """

    target = yamlfile(filename)
    already_on_disk = os.path.isfile(filename+YAML_FILE)
    if already_on_disk:
        target.load()
    target.setleafvalue(itempath, itemattr, value)
    target.save()


# ==================================================================================
#   class yamlfile (for editing multiple entries at a time)
#
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def yaml_save(filename, data):
    """
    Converter helper: write an OrderedDict structure to a yaml file

    The data is converted to yaml text, optionally dumped verbatim to
    ``<filename>_raw.yaml``, then re-parsed with the round-trip loader and
    saved through ``_yaml_save_roundtrip``.

    :param filename: name of the yaml file to save to
    :param data: OrderedDict to save
    """

    yaml_text = convert_yaml(data)

    shown_name = os.path.basename(filename)+'.yaml'
    print(", saving to '{}'".format(shown_name))
    if store_raw_output == True:
        raw_name = filename+'_raw.yaml'
        with open(raw_name, 'w') as outfile:
            outfile.write(yaml_text)

    # Parse the generated text again so the round-trip writer gets
    # round-trip data.
    reparsed = yaml.load(yaml_text, yaml.RoundTripLoader)
    _yaml_save_roundtrip(filename, reparsed)
项目:deep_rl_vizdoom    作者:mihahauke    | 项目源码 | 文件源码
def load_settings(default_settings_file, override_settings_files):
    """Merge the common defaults, the given defaults and all override files.

    Files are applied in that order with ``dict.update``, so later files
    win. Duplicate keys inside a single file are rejected: the error is
    logged and the process exits with status 1.

    :param default_settings_file: path of the default settings file
    :param override_settings_files: iterable of override file paths
    :return: dict with the merged settings
    """
    yaml = ruamel.yaml.YAML()
    yaml.allow_duplicate_keys = False

    def _load_file(path):
        # Close every settings file as soon as it has been parsed.
        with open(path) as stream:
            return yaml.load(stream)

    try:
        log("Loading common default settings from: " + DEFAULT_COMMON_SETTINGS_FILE)
        settings = dict(_load_file(DEFAULT_COMMON_SETTINGS_FILE))
        log("Loading default settings from: " + default_settings_file)
        settings.update(_load_file(default_settings_file))

        for settings_fpath in override_settings_files:
            log("Loading settings from: " + settings_fpath)
            override_settings = _load_file(settings_fpath)
            settings.update(override_settings)
        log("Loaded settings.")
    except ruamel.yaml.constructor.DuplicateKeyError as ex:
        log(red(ex))
        log(red("Aborting!"))
        exit(1)

    return settings
项目:python-cwlgen    作者:common-workflow-language    | 项目源码 | 文件源码
def import_cwl(self, cwl_path):
        """
        Read a CWL file and build the matching cwlgen tool object.

        Each top-level key ``k`` is handed to ``self._load_k(tool, value)``.
        An ``AttributeError`` during that step is logged as a warning and
        the key is skipped.

        :param cwl_path: Path of the CWL tool to be loaded.
        :type cwl_path: STRING
        :return: CWL tool content in cwlgen model.
        :rtype: :class:`cwlgen.CommandLineTool`
        """
        with open(cwl_path) as stream:
            parsed = ryaml.load(stream, Loader=ryaml.Loader)
        tool = self._init_tool(parsed)
        for key, element in parsed.items():
            handler_name = '_load_{}'.format(key)
            try:
                handler = getattr(self, handler_name)
                handler(tool, element)
            except AttributeError:
                logger.warning(key + " content is not processed (yet).")
        return tool
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def main(*args):
    """Entry point of artman.

    With no arguments the command line is taken from ``sys.argv``. The
    pipeline runs locally when ``flags.local`` is set and in a Docker
    instance otherwise.
    """
    # Called through the entry point: fall back to the process arguments.
    if not args:
        args = sys.argv[1:]

    # Normalize the arguments.
    flags = parse_args(*args)
    user_config = read_user_config(flags)
    _adjust_root_dir(flags.root_dir)
    pipeline_name, pipeline_kwargs = normalize_flags(flags, user_config)

    if not flags.local:
        support.check_docker_requirements(flags.image)
        # Note: artman currently won't work if input directory doesn't contain
        # shared configuration files (e.g. gapic/packaging/dependencies.yaml).
        # This will make artman less useful for non-Google APIs.
        # TODO(ethanbao): Fix that by checking the input directory and
        # pulling the shared configuration files if necessary.
        logger.info('Running artman command in a Docker instance.')
        _run_artman_in_docker(flags)
        return

    try:
        pipeline = pipeline_factory.make_pipeline(pipeline_name, False,
                                                  **pipeline_kwargs)
        # The serial engine is hardcoded here, though it need not be.
        engine = engines.load(
            pipeline.flow, engine='serial', store=pipeline.kwargs)
        engine.run()
    except:
        logger.fatal(traceback.format_exc())
    finally:
        # Runs whether or not the pipeline succeeded.
        _change_owner(flags, pipeline_name, pipeline_kwargs)
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def read_user_config(flags):
    """Read the user config from disk and return it.

    Exits the process with status 64 when the file is missing or empty.

    Args:
        flags (argparse.Namespace): The flags from sys.argv.

    Returns:
        dict: The user config.
    """
    # Load the user configuration if it exists and save a dictionary.
    user_config = {}
    user_config_file = os.path.realpath(os.path.expanduser(flags.user_config))
    if os.path.isfile(user_config_file):
        with io.open(user_config_file) as ucf:
            # An empty file loads as None; treat it as no configuration.
            user_config = yaml.load(ucf.read(), Loader=yaml.Loader) or {}

    # Sanity check: Is there a configuration? If not, abort.
    if not user_config:
        setup_logging(INFO)
        logger.critical('No user configuration found.')
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning('This is probably your first time running Artman.')
        logger.warning('Run `configure-artman` to get yourself set up.')
        sys.exit(64)

    # Done; return the user config.
    return user_config
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def execute(self, gapic_code_dir, grpc_code_dir, proto_code_dir, gapic_api_yaml):
        """Copy the generated proto and grpc ``.cs`` files into the package.

        The package name is read from ``language_settings.csharp.package_name``
        in the first file of ``gapic_api_yaml``; the destination is
        ``<gapic_code_dir>/<package_name>/<package_name>``.
        """
        with open(gapic_api_yaml[0]) as stream:
            gapic_config = yaml.load(stream, Loader=yaml.Loader)
        csharp_settings = gapic_config.get('language_settings').get('csharp')
        package_name = csharp_settings.get('package_name')
        package_root = '{0}/{1}'.format(gapic_code_dir, package_name)
        prod_dir = '{0}/{1}'.format(package_root, package_name)
        # Proto files first, then grpc files, both into the prod directory.
        for source_dir in (proto_code_dir, grpc_code_dir):
            copy_command = 'cp {0}/*.cs {1}'.format(source_dir, prod_dir)
            self.exec_command(['sh', '-c', copy_command])
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def execute(self, src_proto_path, import_proto_path, common_protos_yaml,
                organization_name):
        """Copy and transform the proto directories into a temp directory.

        Returns a pair ``(src_paths, import_paths)``: the list of paths
        collected while transforming ``src_proto_path`` and a one-element
        list holding the new proto directory.
        """
        self._organization_name = organization_name

        with io.open(common_protos_yaml) as stream:
            common_protos_data = yaml.load(stream, Loader=yaml.Loader)

        # Treat google.protobuf, google.iam as a common proto package, even
        # though they are not included in the common-protos we generate.
        #
        # TODO (geigerj): remove 'google.iam' when it is included in the common
        # protos package.
        common_protos = ['google.protobuf', 'google.iam']
        common_protos.extend(
            'google.' + package['name'].replace('/', '.')
            for package in common_protos_data['packages'])

        # One temp directory per run, named after the current timestamp.
        stamp = str(int(time.time()))
        tmpdir = os.path.join(tempfile.gettempdir(), 'artman-python', stamp)
        new_proto_dir = os.path.join(tmpdir, 'proto')
        collected_src = set()

        self._copy_and_transform_directories(
            src_proto_path, new_proto_dir, common_protos, paths=collected_src)
        self._copy_and_transform_directories(
            import_proto_path, new_proto_dir, common_protos)

        # Updated src_proto_path, import_proto_path
        return list(collected_src), [new_proto_dir]
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def read_header(path):
        """Return the ``header`` entry of the document stored at ``path``.

        The file is read as UTF-8 text and parsed with ``yload``.
        """
        text = Path(path).text(encoding='utf8')
        return yload(text)['header']
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def _parse_data(self):
        """Return the object decoded from the JSON file ``self.filename``."""
        with open(self.filename, 'r') as handle:
            return json.load(handle)
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def __init__(self, data=None, converts_none_to_str=True):
        """Build the config from nothing, a file, another config or a dict

        :param data: ``None`` for an empty config, the name of an existing
            JSON or YAML file, a SequanaConfig instance, or a dictionary.
        :param converts_none_to_str: not used in this method.

        SEQUANA config files must have some specific fields::

            input_directory
            input_samples...
        """
        # self._yaml_code holds the YAML view of the data. When the input
        # is a JSON file or a dictionary structure a CommentedMap (which
        # works like a dictionary) stands in for it. Be aware that the
        # update method will lose the comments
        if data is None:
            self.config = AttrDict()
            self._yaml_code = comments.CommentedMap()
        elif isinstance(data, str):
            # A string must name an existing file.
            if not os.path.exists(data):
                raise IOError("input string must be an existing file (%s)" % data)
            if data.endswith((".yaml", ".yml")):
                with open(data, "r") as fh:
                    self._yaml_code = ruamel.yaml.load(
                        fh.read(), ruamel.yaml.RoundTripLoader)
            else:
                # Anything else is read as JSON and passed through yaml.
                import yaml
                with open(data, "r") as fh:
                    decoded = json.loads(fh.read())
                    self._yaml_code = yaml.load(json.dumps(decoded))
            config = load_configfile(data)
            self.config = AttrDict(**config)
        else:
            # Either another SequanaConfig or a plain dictionary.
            if isinstance(data, SequanaConfig):
                source = data.config
            else:
                source = data
            self.config = AttrDict(**source)
            self._yaml_code = comments.CommentedMap(self.config.copy())
        self.cleanup_config()
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def add_stats_summary_json(json_list, parser):
    """Store ``parser.stats`` under the ``stats`` key of each JSON file.

    Every file in ``json_list`` is read, updated and rewritten in place as
    a single line followed by a newline. Nothing happens when
    ``parser.stats`` is falsy.
    """
    if not parser.stats:
        return
    for path in json_list:
        with open(path, 'r') as source:
            payload = json.load(source)
        payload['stats'] = parser.stats
        serialized = json.dumps(payload)
        with open(path, 'w') as sink:
            sink.write(serialized + '\n')
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def load_config(config_path, loader=yaml.Loader, verify_version=True):
    """Load the YAML config at ``config_path``.

    :param config_path: path of the config file, read as UTF-8
    :param loader: YAML loader class handed to ``yaml.load``
    :param verify_version: when true, the loaded config is passed through
        ``config_version_verify`` and its result is returned
    :return: the loaded config, or ``False`` (after logging an error) when
        the file does not exist
    """
    if not os.path.exists(config_path):
        # The placeholder is a named field, so it must be filled by keyword;
        # a positional argument raises KeyError.
        system_log.error(_("config.yml not found in {config_path}").format(config_path=config_path))
        return False
    with codecs.open(config_path, encoding="utf-8") as stream:
        config = yaml.load(stream, loader)
    if verify_version:
        config = config_version_verify(config, config_path)
    return config
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def load_config(config_path, loader=yaml.Loader):
    """Load the YAML config at ``config_path``.

    :param config_path: path of the config file, read as UTF-8, or ``None``
    :param loader: YAML loader class handed to ``yaml.load``
    :return: ``{}`` when ``config_path`` is ``None``; ``False`` (after
        logging an error) when the file does not exist; otherwise the
        loaded config
    """
    if config_path is None:
        return {}
    if not os.path.exists(config_path):
        # The placeholder is a named field, so it must be filled by keyword;
        # a positional argument raises KeyError.
        system_log.error(_(u"config.yml not found in {config_path}").format(config_path=config_path))
        return False
    with codecs.open(config_path, encoding="utf-8") as stream:
        config = yaml.load(stream, loader)
    return config
项目:Auspex    作者:BBN-Q    | 项目源码 | 文件源码
def __init__(self, filename):
        """Remember ``filename`` and load its YAML content into ``self.data``.

        The file is parsed with the round-trip loader.
        """
        self.filename = filename
        with open(filename, 'r') as stream:
            loaded = yaml.load(stream, Loader=yaml.RoundTripLoader)
        self.data = loaded
项目:Auspex    作者:BBN-Q    | 项目源码 | 文件源码
def load_meas_file(filename=None):
    """Load the measurement file and set the module-level directories.

    Uses ``filename`` when given, otherwise the result of
    ``find_meas_file()``. The globals ``AWGDir``, ``KernelDir`` and
    ``LogDir`` are set to the absolute paths from the ``config`` section;
    ``KernelDir`` and ``LogDir`` are created when they do not exist.

    :return: the loaded file content
    :raises KeyError: when the ``config`` section or one of the three
        directory entries is missing
    """
    global LogDir, KernelDir, AWGDir, meas_file

    meas_file = filename if filename else find_meas_file()

    with open(meas_file, 'r') as fid:
        Loader.add_constructor('!include', Loader.include)
        reader = Loader(fid)
        code = reader.get_single_data()
        reader.dispose()

    # Pull the config values out of the measurement file.
    if 'config' not in code.keys():
        raise KeyError("Could not find config section of the yaml file.")
    settings = code['config']

    if 'AWGDir' not in settings.keys():
        raise KeyError("Could not find AWGDir in the YAML config section")
    AWGDir = os.path.abspath(settings['AWGDir'])

    if 'KernelDir' not in settings.keys():
        raise KeyError("Could not find KernelDir in the YAML config section")
    KernelDir = os.path.abspath(settings['KernelDir'])

    if 'LogDir' not in settings.keys():
        raise KeyError("Could not find LogDir in the YAML config section")
    LogDir = os.path.abspath(settings['LogDir'])

    # Create the kernel and log directories if necessary
    for directory in (KernelDir, LogDir):
        if not os.path.isdir(directory):
            os.mkdir(directory)

    return code
项目:jd4    作者:vijos    | 项目源码 | 文件源码
async def _init():
    """Fill the sandbox pool and register a build function per language.

    Declared ``async`` because the body awaits ``create_sandboxes``; an
    ``await`` inside a plain ``def`` does not compile.

    The language file is read with the round-trip loader. Entries of type
    ``compiler`` and ``interpreter`` are stored in ``_langs``; any other
    type is logged as an error. A missing language file is logged and ends
    the process with status 1.
    """
    parallelism = config.get('parallelism', 1)
    logger.info('Using parallelism: %d', parallelism)
    for sandbox in await create_sandboxes(parallelism):
        _sandbox_pool.put_nowait(sandbox)

    try:
        with open(_LANGS_FILE) as file:
            langs_config = yaml.load(file, Loader=yaml.RoundTripLoader)
    except FileNotFoundError:
        logger.error('Language file %s not found.', _LANGS_FILE)
        exit(1)
    for lang_name, lang_config in langs_config.items():
        if lang_config['type'] == 'compiler':
            compiler = Compiler(lang_config['compiler_file'],
                                shlex.split(lang_config['compiler_args']),
                                lang_config['code_file'],
                                lang_config['execute_file'],
                                shlex.split(lang_config['execute_args']))
            # Limits are configured in ms / kb and converted to ns / bytes.
            _langs[lang_name] = partial(
                _compiler_build, compiler,
                time_limit_ns=lang_config.get('time_limit_ms', DEFAULT_TIME_MS) * 1000000,
                memory_limit_bytes=lang_config.get('memory_limit_kb', DEFAULT_MEM_KB) * 1024,
                process_limit=lang_config.get('process_limit', PROCESS_LIMIT))
        elif lang_config['type'] == 'interpreter':
            interpreter = Interpreter(lang_config['code_file'],
                                      lang_config['execute_file'],
                                      shlex.split(lang_config['execute_args']))
            _langs[lang_name] = partial(_interpreter_build, interpreter)
        else:
            logger.error('Unknown type %s', lang_config['type'])
项目:jd4    作者:vijos    | 项目源码 | 文件源码
def _load_config():
    """Return the content of ``_CONFIG_FILE`` parsed as round-trip YAML.

    A missing file is logged and ends the process with status 1.
    """
    try:
        with open(_CONFIG_FILE, encoding='utf-8') as stream:
            parsed = yaml.load(stream, Loader=yaml.RoundTripLoader)
    except FileNotFoundError:
        logger.error('Config file %s not found.', _CONFIG_FILE)
        exit(1)
    else:
        return parsed
项目:pyrpl    作者:lneuhaus    | 项目源码 | 文件源码
def load(f):
        """Parse ``f`` with ruamel's round-trip loader and return the result."""
        round_trip = ruamel.yaml.RoundTripLoader
        return ruamel.yaml.load(f, round_trip)
项目:pyrpl    作者:lneuhaus    | 项目源码 | 文件源码
def load(stream, Loader=yaml.SafeLoader, object_pairs_hook=OrderedDict):
        """Load YAML with every mapping built by ``object_pairs_hook``.

        The mapping constructor is registered on a local subclass of
        ``Loader``, which is then used to parse ``stream``.
        """
        def _build_mapping(loader, node):
            loader.flatten_mapping(node)
            pairs = loader.construct_pairs(node)
            return object_pairs_hook(pairs)

        class OrderedLoader(Loader):
            pass

        OrderedLoader.add_constructor(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _build_mapping)
        return yaml.load(stream, OrderedLoader)
项目:pyrpl    作者:lneuhaus    | 项目源码 | 文件源码
def save(data, stream=None, Dumper=yaml.SafeDumper,
             default_flow_style=False,
             encoding='utf-8',
             **kwds):
        """Dump ``data`` as YAML through a local subclass of ``Dumper``.

        The subclass gets representers for OrderedDict (as a plain
        mapping), ``np.float64`` (as float), ``complex`` and
        ``np.complex128`` (as their ``str``) and ``np.ndarray`` (as list).
        Extra keyword arguments are forwarded to ``yaml.dump``.
        """
        class OrderedDumper(Dumper):
            pass

        def _as_mapping(dumper, value):
            return dumper.represent_mapping(
                yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
                value.items())

        def _as_float(dumper, value):
            return dumper.represent_float(float(value))

        def _as_str(dumper, value):
            return dumper.represent_str(str(value))

        def _as_list(dumper, value):
            return dumper.represent_list(list(value))

        representers = (
            (OrderedDict, _as_mapping),
            (np.float64, _as_float),
            (complex, _as_str),
            (np.complex128, _as_str),
            (np.ndarray, _as_list),
        )
        for handled_type, representer in representers:
            OrderedDumper.add_representer(handled_type, representer)

        # Plain dicts are converted so the OrderedDict representer applies;
        # added for compatibility with pyinstruments and in principle
        # removable.
        if isinstance(data, dict) and not isinstance(data, OrderedDict):
            data = OrderedDict(data)
        return yaml.dump(data,
                         stream=stream,
                         Dumper=OrderedDumper,
                         default_flow_style=default_flow_style,
                         encoding=encoding,
                         **kwds)

    # usage example:
    # load(stream, yaml.SafeLoader)
    # save(data, stream=f, Dumper=yaml.SafeDumper)
项目:pyrpl    作者:lneuhaus    | 项目源码 | 文件源码
def _set_yml(self, yml_content):
        """
        Replace this branch with the parsed ``yml_content`` and save.

        :param yml_content: yaml text (or stream) handed to ``load``
        :return: None
        """
        parent_data = self._parent._data
        parent_data[self._branch] = load(yml_content)
        self._save()
项目:pyrpl    作者:lneuhaus    | 项目源码 | 文件源码
def _load(self):
        """ Reload ``self._data`` from the config file, if one is set.

        Also records the file's modification time and the time of this
        reload, and mirrors the top-level entries into ``self.__dict__``.
        """
        if self._filename is None:
            # no backing file: nothing to do
            return
        logger.debug("Loading config file %s", self._filename)
        with open(self._filename) as stream:
            self._data = load(stream)
        # remember which version of the file was read
        self._mtime = os.path.getmtime(self._filename)
        # the reload timeout counts from now
        self._lastreload = time()
        # an empty file loads as None
        if self._data is None:
            self._data = OrderedDict()
        # drop public attributes that no longer exist in the data
        stale = [name for name in self.__dict__
                 if not name.startswith('_') and name not in self._data]
        for name in stale:
            self.__dict__.pop(name)
        # expose the branches as attributes for auto-completion
        self.__dict__.update(self._data)
项目:shipmaster    作者:damoti    | 项目源码 | 文件源码
def from_workspace(cls, path):
        """Build an instance from ``<path>/.shipmaster.yaml``.

        Returns ``None`` when that file does not exist; otherwise the
        file's top-level entries are passed to ``cls.from_kwargs`` as
        keyword arguments.
        """
        filename = os.path.join(path, '.shipmaster.yaml')
        if os.path.exists(filename):
            with open(filename, 'r') as stream:
                settings = yaml.load(stream, yaml.RoundTripLoader)
                return cls.from_kwargs(path, **settings)
        return None
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def test_fileformatyaml_pass_no_substitutions():
    """Relative path to file should succeed.

     Strictly speaking not a unit test: it runs the step against real
     files and reads the output back from disk.
    """
    context = Context({
        'ok1': 'ov1',
        'fileFormatYamlIn': './tests/testfiles/test.yaml',
        'fileFormatYamlOut': './tests/testfiles/out/out.yaml'})

    fileformat.run_step(context)

    # The step must leave the context untouched.
    assert context, "context shouldn't be None"
    assert len(context) == 3, "context should have 3 items"
    assert context['ok1'] == 'ov1'
    assert context['fileFormatYamlIn'] == './tests/testfiles/test.yaml'
    assert context['fileFormatYamlOut'] == './tests/testfiles/out/out.yaml'

    with open('./tests/testfiles/out/out.yaml') as outfile:
        outcontents = yaml.load(outfile, Loader=yaml.RoundTripLoader)

    assert len(outcontents) == 3
    assert outcontents['key'] == 'value1 !£$%# *'
    assert outcontents['key2'] == 'blah'
    assert outcontents['key3'] == ['l1',
                                   '!£$% *',
                                   'l2',
                                   [
                                       'l31',
                                       {'l32': ['l321', 'l322']}
                                   ]
                                   ]

    # atrociously lazy test clean-up
    os.remove('./tests/testfiles/out/out.yaml')
项目:pytestlab    作者:sangoma    | 项目源码 | 文件源码
def load(config_path):
    """Read the config file and return its environments and zones.

    :param config_path: object with an ``open()`` method returning the file
    :return: tuple ``(envs, zones)`` of dicts built from the
        ``environments`` and ``zones`` sections (each defaults to ``{}``)
    """
    with config_path.open() as fp:
        text = fp.read()
        config = yaml.load(text, yaml.RoundTripLoader)

    raw_environments = config.get('environments', {})
    envs = dict(_parse_environments(raw_environments))
    raw_zones = config.get('zones', {})
    zones = dict(_parse_zones(raw_zones))
    return envs, zones
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def construct_include(self, node):
        """Return the content of the file named by ``node``.

        The name is resolved against ``self._root``. Files ending in
        ``yaml`` or ``yml`` are parsed with ``yaml.load``; any other file
        is returned as its raw text.
        """

        relative = self.construct_scalar(node)
        filename = os.path.abspath(os.path.join(self._root, relative))
        extension = os.path.splitext(filename)[1].lstrip('.')

        with open(filename, 'r') as f:
            if extension not in ('yaml', 'yml'):
                return f.read()
            return yaml.load(f, Loader=self)
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def load(cls, path, *args, **kwargs):
        """Load ``path`` with the loader registered for its extension.

        A ``KeyError`` from the lookup or the loader is logged and the
        file is retried through ``cls.from_parmed``. An ``IOError`` is
        re-raised with a message naming the file.
        """
        fmt = os.path.splitext(path)[1]
        try:
            loader = cls._loaders(fmt.lstrip('.'))
            return loader(path, *args, **kwargs)
        except KeyError:
            logger.error('! Unknown loader for format %s. '
                         'Trying with ParmEd as fallback', fmt)
            return cls.from_parmed(path, *args, **kwargs)
        except IOError:
            raise IOError('Could not access file {}'.format(path))
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def __init__(self, master=None, **kwargs):
        """Store ``master`` and the optional ``path`` keyword.

        All keyword arguments are first handed to ``InputContainer``.
        A string ``master`` is rejected with ``ValueError``: files are
        loaded through ``.load()`` or the ``.from_*()`` methods.
        """
        InputContainer.__init__(self, **kwargs)
        master_is_filename = isinstance(master, str)
        if master_is_filename:
            raise ValueError('To instantiate from file, use .load() or '
                             'one of the .from_*() methods.')
        self._path = kwargs.get('path')
        self.master = master
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def prepare_handler(cfg):
    """
    Load every input file named in ``cfg`` into a single system handler.

    A ``checkpoint`` provides positions, velocities and box first; the
    ``positions``, ``velocities`` and ``box`` entries then override them
    in that order. Except for ``checkpoint`` and ``_path``, the entries
    used here are removed from ``cfg``.
    """
    positions = velocities = box = None
    _path = cfg['_path']
    forcefield = cfg.pop('forcefield', None)
    topology = sanitize_path_for_file(cfg.pop('topology'), _path)

    if 'checkpoint' in cfg:
        restart = Restart.load(sanitize_path_for_file(cfg['checkpoint'], _path))
        positions, velocities, box = (
            restart.positions, restart.velocities, restart.box)

    if 'positions' in cfg:
        positions_path = sanitize_path_for_file(cfg.pop('positions'), _path)
        positions = Positions.load(positions_path)
        # The positions file is also read for box vectors.
        box = BoxVectors.load(positions_path)

    if 'velocities' in cfg:
        velocities = Velocities.load(
            sanitize_path_for_file(cfg.pop('velocities'), _path))

    if 'box' in cfg:
        box = BoxVectors.load(sanitize_path_for_file(cfg.pop('box'), _path))

    # Forward only the values that were actually found.
    found = (
        ('positions', positions),
        ('velocities', velocities),
        ('box', box),
        ('forcefield', forcefield),
    )
    options = {key: value for key, value in found if value is not None}

    return SystemHandler.load(topology, **options)
项目:ommprotocol    作者:insilichem    | 项目源码 | 文件源码
def statexml2pdb(topology, state, output=None):
    """
    Given an OpenMM xml file containing the state of the simulation,
    generate a PDB snapshot for easy visualization.

    :param topology: topology passed to ``SystemHandler.load``; also used to
                     build the default output name
    :param state: xml state passed to ``Restart.from_xml``
    :param output: destination for the PDB; defaults to ``topology + '.pdb'``
    """
    restart = Restart.from_xml(state)
    handler = SystemHandler.load(topology, positions=restart.positions)
    pdb_path = topology + '.pdb' if output is None else output
    handler.write_pdb(pdb_path)
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def assertParametersConverted(self, actual, expected):
        """
        Assert that the loose parameters in ``actual`` expand to ``expected``.

        Both arguments are YAML documents. The ``parameters`` entry of
        ``actual`` is run through ``lose_parameters_to_full`` and the result,
        wrapped as ``{'parameters': ...}``, must equal the whole ``expected``
        document. Both parameter sets are printed to ease debugging.

        :param actual: YAML text holding the loose ``parameters`` definition
        :param expected: YAML text holding the fully expanded document
        """
        # Parse each document once and reuse it for printing and comparing.
        expected_doc = yaml.load(expected, Loader=yaml.RoundTripLoader)
        print(expected_doc['parameters'])

        actual_doc = yaml.load(actual, Loader=yaml.RoundTripLoader)
        converted = lose_parameters_to_full(actual_doc['parameters'])
        print(converted)

        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(expected_doc, {'parameters': converted})
项目:petal    作者:hdmifish    | 项目源码 | 文件源码
def load(self, vb=False):
        """
        Read ``config.yaml`` from the current directory into ``self.doc``.

        Errors are logged through ``log.err`` rather than raised.

        :param vb: not used by this method
        :return: ``self`` on success, ``None`` if the file could not be
                 opened or parsed
        """
        try:
            with open('config.yaml', 'r') as fp:
                self.doc = yaml.load(fp, Loader=yaml.RoundTripLoader)
        except IOError as e:
            log.err("Could not open config.yaml: " + str(e))
        except Exception as e:
            # Best-effort loader: report anything else (e.g. a parse error)
            # instead of propagating it.
            log.err("An unexpected exception of type: "
                    + type(e).__name__
                    + " has occurred: " + str(e))
        else:
            return self
        # Failure path: the error has been logged above.
        return None
项目:Box    作者:cdgriffith    | 项目源码 | 文件源码
def _from_json(json_string=None, filename=None,
               encoding="utf-8", errors="strict", multiline=False, **kwargs):
    """
    Parse JSON from a file or from a string and return the decoded data.

    ``filename`` takes precedence over ``json_string`` when both are given.

    :param json_string: JSON text handed to ``json.loads``
    :param filename: path of a file to open and decode
    :param encoding: encoding used to open ``filename``
    :param errors: decoding error policy used to open ``filename``
    :param multiline: when reading a file, decode one JSON value per line,
                      skipping blank lines and lines starting with ``#``,
                      and return them as a list
    :param kwargs: extra arguments for ``json.load`` / ``json.loads``
    :return: the decoded data
    :raises BoxError: if neither a filename nor a string is supplied
    """
    if not filename:
        if not json_string:
            raise BoxError('from_json requires a string or filename')
        return json.loads(json_string, **kwargs)

    with open(filename, 'r', encoding=encoding, errors=errors) as handle:
        if not multiline:
            return json.load(handle, **kwargs)

        records = []
        for raw_line in handle:
            stripped = raw_line.strip()
            if stripped and not stripped.startswith("#"):
                records.append(json.loads(stripped, **kwargs))
        return records
项目:Box    作者:cdgriffith    | 项目源码 | 文件源码
def _from_yaml(yaml_string=None, filename=None,
               encoding="utf-8", errors="strict",
               **kwargs):
    """
    Parse YAML from a file or from a string and return the loaded data.

    ``filename`` takes precedence over ``yaml_string`` when both are given.

    :param yaml_string: YAML text handed to ``yaml.load``
    :param filename: path of a file to open and load
    :param encoding: encoding used to open ``filename``
    :param errors: decoding error policy used to open ``filename``
    :param kwargs: extra arguments for ``yaml.load``
    :return: the loaded data
    :raises BoxError: if neither a filename nor a string is supplied
    """
    # NOTE(review): no Loader is chosen here; which loader runs depends
    # entirely on what the caller passes in kwargs. Callers handling
    # untrusted YAML should pass a safe loader explicitly.
    if not filename:
        if not yaml_string:
            raise BoxError('from_yaml requires a string or filename')
        return yaml.load(yaml_string, **kwargs)

    with open(filename, 'r', encoding=encoding, errors=errors) as stream:
        return yaml.load(stream, **kwargs)

# Helper functions
项目:Box    作者:cdgriffith    | 项目源码 | 文件源码
def from_json(cls, json_string=None, filename=None, encoding="utf-8",
                  errors="strict", multiline=False, **kwargs):
        """
        Build a BoxList from JSON text or a JSON file. The decoded JSON
        must be a list; for a JSON dict use Box.from_json instead.

        :param json_string: string to pass to `json.loads`
        :param filename: filename to open and pass to `json.load`
        :param encoding: File encoding
        :param errors: How to handle encoding errors
        :param multiline: One object per line
        :param kwargs: parameters to pass to `Box()` or `json.loads`
        :return: BoxList object from json data
        :raises BoxError: if the decoded JSON is not a list
        """
        # Keyword arguments named in BOX_PARAMETERS go to the constructor;
        # whatever is left in kwargs is forwarded to the JSON decoder.
        box_options = {name: kwargs.pop(name)
                       for name in list(kwargs) if name in BOX_PARAMETERS}

        parsed = _from_json(json_string, filename=filename, encoding=encoding,
                            errors=errors, multiline=multiline, **kwargs)

        if isinstance(parsed, list):
            return cls(parsed, **box_options)
        raise BoxError('json data not returned as a list, '
                       'but rather a {0}'.format(type(parsed).__name__))
项目:autosklearn-zeroconf    作者:paypal    | 项目源码 | 文件源码
def read_parameter(parameter_file, parameter):
    """
    Load a YAML parameter file and merge it with ``parameter``.

    :param parameter_file: path of the YAML file to read
    :param parameter: dict passed as first argument to ``merge_two_dicts``
    :return: result of ``merge_two_dicts(parameter, <loaded file content>)``
    """
    # Use a context manager so the file handle is closed even if parsing
    # fails; previously the handle was never closed.
    with open(parameter_file, "r") as fr:
        param = yaml.load(fr, yaml.RoundTripLoader)
    return merge_two_dicts(parameter, param)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def get_and_validate_mailer_config(args):
    """
    Read the mailer configuration file, validate it and fill in defaults.

    :param args: object whose ``config`` attribute is the path of the
                 YAML configuration file
    :return: the loaded configuration, after ``jsonschema.validate`` against
             ``CONFIG_SCHEMA`` and ``utils.setup_defaults`` have been applied
    """
    with open(args.config) as config_file:
        raw_text = config_file.read()

    mailer_config = yaml.load(raw_text, Loader=yaml.SafeLoader)
    jsonschema.validate(mailer_config, CONFIG_SCHEMA)
    utils.setup_defaults(mailer_config)
    return mailer_config
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def yaml_load_fromstring(string, ordered=False):
    """
    Load the contents of a string into a dict/OrderedDict structure.
    The string has to be valid yaml.

    Parse errors are not raised; they are reported through the second
    element of the returned tuple.

    :param string: the yaml text to parse (the content itself, not a file name)
    :type string: str
    :param ordered: load to an OrderedDict? Default=False
    :type ordered: bool

    :return: tuple ``(data, error)``: ``data`` is the loaded structure (None
             if an error occurred), ``error`` is an error description
             (empty string if loading succeeded)
    :rtype: tuple
    """

    dict_type = 'OrderedDict' if ordered else 'dict'
    logger.info("Loading '{}' to '{}'".format(string, dict_type))

    y = None
    estr = ''
    try:
        if ordered:
            y = _ordered_load(string, yaml.SafeLoader)
        else:
            y = yaml.load(string, yaml.SafeLoader)
    except Exception as e:
        # Deliberately broad: any failure is turned into an error string
        # for the caller instead of being propagated.
        estr = str(e)
        if "found character '\\t'" in estr:
            estr = estr[estr.find('line'):]
            estr = 'TABs are not allowed in YAML files, use spaces for indentation instead!\nError in ' + estr
        # Older scanner versions spell this message "could not found",
        # newer ones "could not find"; accept both.
        missing_colon = ("could not found expected ':'" in estr
                         or "could not find expected ':'" in estr)
        if ("while scanning a simple key" in estr) and missing_colon:
            estr = estr[estr.find('column'):estr.find('could not')]
            estr = 'The colon (:) following a key has to be followed by a space. The space is missing!\nError in ' + estr

    return y, estr
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def load(self):
        """
        load the contents of the yaml-file to the data-structure
        """
        self.data = yaml_load_roundtrip(self.filename)