我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ruamel.yaml.load()。
def load_config_spec(config_spec, config_sections, repl_vars, language):
    """Load, merge and variable-substitute YAML config sections.

    ``config_spec`` is ``path`` or ``path:sec1|sec2``; a section list
    embedded in the spec overrides the ``config_sections`` argument.
    """
    parts = config_spec.strip().split(':')
    config_path = parts[0]
    if len(parts) > 1:
        config_sections = parts[1].split('|')
    with open(config_path) as config_file:
        all_config_data = yaml.load(config_file, Loader=yaml.Loader)
    # Keep only the sections actually requested, plus the
    # language-specific section (empty dict when absent).
    segments = [all_config_data[name] for name in config_sections]
    segments.append(all_config_data.get(language, {}))
    # Collapse everything into one dict, then substitute variables.
    merged = merge(*segments)
    return replace_vars(merged, repl_vars)
def generate_cwl_documentation(_): cur_dir = os.path.abspath(os.path.dirname(__file__)) # find all cwl files with WorkflowGenerator() as wf: cwl_files = [step.run for step in wf.steps_library.steps.values()] # sort alphabetically cwl_files.sort() tools_file = os.path.join(cur_dir, 'tools.rst') tool_template = '\n{}\n{}\n\n{}\n' with codecs.open(tools_file, 'wb', encoding='utf-8') as f: f.write('Tools\n=====\n') f.write('\n``nlppln`` contains the following tools:\n') for cwl in cwl_files: tool_name = os.path.basename(cwl) plusses = '+'*len(tool_name) with codecs.open(cwl) as c: try: cwl_yaml = yaml.load(c, Loader=yaml.RoundTripLoader) doc = cwl_yaml.get('doc', 'No documentation') f.write(tool_template.format(tool_name, plusses, doc)) except yaml.YAMLError: pass
def from_yaml():
    """
    Load configuration from yaml source(s), cached to only run once
    """
    # Start from the packaged defaults, then layer user files on top.
    default_yaml_str = snippets.get_snippet_content('hatchery.yml')
    ret = yaml.load(default_yaml_str, Loader=yaml.RoundTripLoader)
    for config_path in CONFIG_LOCATIONS:
        config_path = os.path.expanduser(config_path)
        if os.path.isfile(config_path):
            with open(config_path) as config_file:
                config_dict = yaml.load(config_file, Loader=yaml.RoundTripLoader)
                if config_dict is None:
                    # Empty YAML file: nothing to merge.
                    continue
                for k, v in config_dict.items():
                    # Only keys already present in the defaults are legal.
                    if k not in ret.keys():
                        raise ConfigError(
                            'found garbage key "{}" in {}'.format(k, config_path)
                        )
                    ret[k] = v
    return ret
def from_parmed(cls, path, *args, **kwargs):
    """
    Try to load a file automatically with ParmEd. Not guaranteed to work,
    but might be useful if it succeeds.

    Arguments
    ---------
    path : str
        Path to file that ParmEd can load
    """
    st = parmed.load_file(path, structure=True, *args, **kwargs)
    # NOTE(review): box/velocities/positions are popped *after* kwargs was
    # already forwarded to parmed.load_file, so ParmEd receives them too —
    # confirm that is intended. Defaults fall back to the loaded structure.
    box = kwargs.pop('box', getattr(st, 'box', None))
    velocities = kwargs.pop('velocities', getattr(st, 'velocities', None))
    positions = kwargs.pop('positions', getattr(st, 'positions', None))
    return cls(master=st, topology=st.topology, positions=positions,
               box=box, velocities=velocities, path=path, **kwargs)
def _pickle_load(path):
    """Load a pickled topology from *path*.

    The extension encodes the protocol the file was written with:
    ``.pickle2`` for protocol 2, ``.pickle3``/``.pickle`` for protocol 3.

    Bug fix: the original called ``pickle.load(f, protocol=3)``, which
    raises TypeError — ``pickle.load`` has no ``protocol`` parameter; the
    protocol is auto-detected from the stream.

    :raises ValueError: when the extension is incompatible with the
        running interpreter (e.g. a protocol-3 file under Python 2).
    """
    _, ext = os.path.splitext(path)
    topology = None
    if sys.version_info.major == 2:
        # Python 2 can only read protocol <= 2 files.
        if ext == '.pickle2':
            with open(path, 'rb') as f:
                topology = pickle.load(f)
    elif sys.version_info.major == 3:
        # Python 3 reads any protocol; detection is automatic.
        if ext in ('.pickle2', '.pickle3', '.pickle'):
            with open(path, 'rb') as f:
                topology = pickle.load(f)
    if topology is None:
        raise ValueError('File {} is not compatible with this version'.format(path))
    return topology
def from_json(cls, json_string=None, filename=None, encoding="utf-8",
              errors="strict", **kwargs):
    """
    Transform a json object string into a Box object. If the incoming
    json is a list, you must use BoxList.from_json.

    :param json_string: string to pass to `json.loads`
    :param filename: filename to open and pass to `json.load`
    :param encoding: File encoding
    :param errors: How to handle encoding errors
    :param kwargs: parameters to pass to `Box()` or `json.loads`
    :return: Box object from json data
    """
    # Split kwargs: Box-specific options go to the constructor, the
    # remainder is forwarded to the json parser.
    box_kwargs = {name: kwargs.pop(name) for name in list(kwargs)
                  if name in BOX_PARAMETERS}
    data = _from_json(json_string, filename=filename, encoding=encoding,
                      errors=errors, **kwargs)
    if not isinstance(data, dict):
        raise BoxError('json data not returned as a dictionary, '
                       'but rather a {0}'.format(type(data).__name__))
    return cls(data, **box_kwargs)
def from_yaml(cls, yaml_string=None, filename=None, encoding="utf-8",
              errors="strict", **kwargs):
    """
    Transform a yaml object string into a Box object.

    :param yaml_string: string to pass to `yaml.load`
    :param filename: filename to open and pass to `yaml.load`
    :param encoding: File encoding
    :param errors: How to handle encoding errors
    :param kwargs: parameters to pass to `Box()` or `yaml.load`
    :return: Box object from yaml data
    """
    # Box-specific kwargs go to the constructor; the rest to yaml.load.
    bx_args = {}
    for arg in kwargs.copy():
        if arg in BOX_PARAMETERS:
            bx_args[arg] = kwargs.pop(arg)
    data = _from_yaml(yaml_string=yaml_string, filename=filename,
                      encoding=encoding, errors=errors, **kwargs)
    if not isinstance(data, dict):
        # Bug fix: the literals previously concatenated to
        # "...dictionarybut rather..." — the separating space was missing.
        raise BoxError('yaml data not returned as a dictionary '
                       'but rather a {0}'.format(type(data).__name__))
    return cls(data, **bx_args)
def main():
    """CLI entry point: validate the mailer config, then run a test send."""
    parser = setup_parser()
    options = parser.parse_args()
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)
    # botocore is extremely chatty at DEBUG; keep it at WARNING.
    logging.getLogger('botocore').setLevel(logging.WARNING)
    with open(options.config) as fh:
        # SafeLoader: the config file is external input.
        config = yaml.load(fh.read(), Loader=yaml.SafeLoader)
    jsonschema.validate(config, CONFIG_SCHEMA)
    setup_defaults(config)
    tester = MailerTester(
        options.MESSAGE_FILE, config, msg_plain=options.plain,
        json_dump_file=options.json_dump_file
    )
    tester.run(options.dry_run, options.print_only)
def _ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    """Parse YAML from *stream*, building mappings via *object_pairs_hook*.

    Example: ``_ordered_load(stream, yaml.SafeLoader)`` returns
    OrderedDicts instead of plain dicts, preserving key order.

    :param stream: stream to read from
    :param Loader: yaml loader class to derive the local loader from
    :param object_pairs_hook: mapping factory fed with (key, value) pairs
    :return: parsed data with mappings built by *object_pairs_hook*
    """
    class _HookedLoader(Loader):
        # Subclass so the added constructor stays local to this call.
        pass

    def _make_mapping(loader, node):
        loader.flatten_mapping(node)
        return object_pairs_hook(loader.construct_pairs(node))

    _HookedLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _make_mapping)
    return yaml.load(stream, _HookedLoader)
def yaml_load_roundtrip(filename):
    """
    Load contents of a yaml file into an dict structure for editing (using Roundtrip Loader)

    :param filename: name of the yaml file to load (extension is appended)
    :return: data structure loaded from file; {} on parse error,
        None when editing is disabled
    """
    if not EDITING_ENABLED:
        return None
    y = None
    try:
        with open(filename+YAML_FILE, 'r') as stream:
            sdata = stream.read()
            # NOTE(review): every newline is doubled before parsing —
            # presumably to give ruamel room for comment round-tripping
            # on save; confirm this is intentional.
            sdata = sdata.replace('\n', '\n\n')
            y = yaml.load(sdata, yaml.RoundTripLoader)
    except Exception as e:
        logger.error("yaml_load_roundtrip: YAML-file load error: '%s'" % (e))
        y = {}
    return y
def writeBackToFile(filename, itempath, itemattr, value):
    """
    write the value of an item's attribute back to the yaml-file

    :param filename: name of the yaml-file (without the .yaml extension!)
    :param itempath: path of the item to modify
    :param itemattr: name of the item's attribute to modify
    :param value: new value for the attribute
    :return: formatted string
    """
    itemyamlfile = yamlfile(filename)
    # Only touch existing files; a missing file is silently ignored.
    if os.path.isfile(filename+YAML_FILE):
        itemyamlfile.load()
        itemyamlfile.setleafvalue(itempath, itemattr, value)
        itemyamlfile.save()


# ==================================================================================
#   class yamlfile (for editing multiple entries at a time)
#
def yaml_save(filename, data):
    """
    ***Converter Special ***

    Save contents of an OrderedDict structure to a yaml file

    :param filename: name of the yaml file to save to (without extension)
    :param data: OrderedDict to save
    """
    sdata = convert_yaml(data)
    print(", saving to '{}'".format(os.path.basename(filename)+'.yaml'))
    # Idiom fix: truthiness instead of "== True".
    if store_raw_output:
        with open(filename+'_raw.yaml', 'w') as outfile:
            outfile.write(sdata)
    # Test if roundtrip gives the same result; use a fresh name instead of
    # shadowing the `data` parameter.
    reloaded = yaml.load(sdata, yaml.RoundTripLoader)
    _yaml_save_roundtrip(filename, reloaded)
def load_settings(default_settings_file, override_settings_files):
    """Build the settings dict layer by layer: common defaults, run
    defaults, then each override file in order (later files win).

    Exits the process on duplicate keys within a single YAML document.
    """
    yaml = ruamel.yaml.YAML()
    yaml.allow_duplicate_keys = False
    try:
        log("Loading common default settings from: " + DEFAULT_COMMON_SETTINGS_FILE)
        # Bug fix: files were opened via yaml.load(open(...)) and never
        # closed; context managers now release every handle.
        with open(DEFAULT_COMMON_SETTINGS_FILE) as fh:
            settings = dict(yaml.load(fh))
        log("Loading default settings from: " + default_settings_file)
        with open(default_settings_file) as fh:
            settings.update(yaml.load(fh))
        for settings_fpath in override_settings_files:
            log("Loading settings from: " + settings_fpath)
            with open(settings_fpath) as fh:
                override_settings = yaml.load(fh)
            settings.update(override_settings)
        log("Loaded settings.")
    except ruamel.yaml.constructor.DuplicateKeyError as ex:
        log(red(ex))
        log(red("Aborting!"))
        exit(1)
    return settings
def import_cwl(self, cwl_path):
    """
    Load content of cwl into the :class:`cwlgen.CommandLineTool` object.

    :param cwl_path: Path of the CWL tool to be loaded.
    :type cwl_path: STRING
    :return: CWL tool content in cwlgen model.
    :rtype: :class:`cwlgen.CommandLineTool`
    """
    with open(cwl_path) as yaml_file:
        cwl_dict = ryaml.load(yaml_file, Loader=ryaml.Loader)
    tool = self._init_tool(cwl_dict)
    # Dispatch each top-level CWL key to a matching _load_<key> method.
    for key, element in cwl_dict.items():
        try:
            getattr(self, '_load_{}'.format(key))(tool, element)
        except AttributeError:
            # No loader for this key — warn and continue.
            # NOTE(review): this also masks AttributeErrors raised *inside*
            # a loader method, not just missing loaders.
            logger.warning(key + " content is not processed (yet).")
    return tool
def main(*args): """Main method of artman.""" # If no arguments are sent, we are using the entry point; derive # them from sys.argv. if not args: args = sys.argv[1:] # Get to a normalized set of arguments. flags = parse_args(*args) user_config = read_user_config(flags) _adjust_root_dir(flags.root_dir) pipeline_name, pipeline_kwargs = normalize_flags(flags, user_config) if flags.local: try: pipeline = pipeline_factory.make_pipeline(pipeline_name, False, **pipeline_kwargs) # Hardcoded to run pipeline in serial engine, though not necessarily. engine = engines.load( pipeline.flow, engine='serial', store=pipeline.kwargs) engine.run() except: logger.fatal(traceback.format_exc()) finally: _change_owner(flags, pipeline_name, pipeline_kwargs) else: support.check_docker_requirements(flags.image) # Note: artman currently won't work if input directory doesn't contain # shared configuration files (e.g. gapic/packaging/dependencies.yaml). # This will make artman less useful for non-Google APIs. # TODO(ethanbao): Fix that by checking the input directory and # pulling the shared configuration files if necessary. logger.info('Running artman command in a Docker instance.') _run_artman_in_docker(flags)
def read_user_config(flags): """Read the user config from disk and return it. Args: flags (argparse.Namespace): The flags from sys.argv. Returns: dict: The user config. """ # Load the user configuration if it exists and save a dictionary. user_config = {} user_config_file = os.path.realpath(os.path.expanduser(flags.user_config)) if os.path.isfile(user_config_file): with io.open(user_config_file) as ucf: user_config = yaml.load(ucf.read(), Loader=yaml.Loader) or {} # Sanity check: Is there a configuration? If not, abort. if not user_config: setup_logging(INFO) logger.critical('No user configuration found.') logger.warn('This is probably your first time running Artman.') logger.warn('Run `configure-artman` to get yourself set up.') sys.exit(64) # Done; return the user config. return user_config
def execute(self, gapic_code_dir, grpc_code_dir, proto_code_dir, gapic_api_yaml):
    """Copy generated proto/grpc C# sources into the GAPIC package tree.

    The package name is read from the first GAPIC API yaml file's
    ``language_settings.csharp.package_name``.
    """
    with open(gapic_api_yaml[0]) as f:
        gapic_config = yaml.load(f, Loader=yaml.Loader)
    package_name = gapic_config.get('language_settings').get('csharp').get('package_name')
    # Layout: <gapic_code_dir>/<package>/<package> is the production dir.
    package_root = '{0}/{1}'.format(gapic_code_dir, package_name)
    prod_dir = '{0}/{1}'.format(package_root, package_name)
    # Copy proto/grpc .cs files into prod directory
    self.exec_command(['sh', '-c', 'cp {0}/*.cs {1}'.format(proto_code_dir, prod_dir)])
    self.exec_command(['sh', '-c', 'cp {0}/*.cs {1}'.format(grpc_code_dir, prod_dir)])
def execute(self, src_proto_path, import_proto_path, common_protos_yaml,
            organization_name):
    """Copy protos into a fresh temp tree, transforming paths so common
    protos are handled specially; returns (new src paths, new import path).
    """
    self._organization_name = organization_name
    with io.open(common_protos_yaml) as file_:
        common_protos_data = yaml.load(file_, Loader=yaml.Loader)
    # Treat google.protobuf, google.iam as a common proto package, even
    # though they are not included in the common-protos we generate.
    #
    # TODO (geigerj): remove 'google.iam' when it is included in the common
    # protos package.
    common_protos = ['google.protobuf', 'google.iam']
    for package in common_protos_data['packages']:
        common_protos.append('google.' + package['name'].replace('/', '.'))
    # Unique-per-run scratch dir keyed on the current timestamp.
    tmpdir = os.path.join(
        tempfile.gettempdir(), 'artman-python', str(int(time.time())))
    new_proto_dir = os.path.join(tmpdir, 'proto')
    new_src_path = set()
    new_import_path = [new_proto_dir]
    self._copy_and_transform_directories(
        src_proto_path, new_proto_dir, common_protos, paths=new_src_path)
    self._copy_and_transform_directories(
        import_proto_path, new_proto_dir, common_protos)
    # Update src_proto_path, import_proto_path
    return list(new_src_path), new_import_path
def read_header(path):
    """Return the ``header`` mapping from the YAML file at *path*."""
    # Path here is path.py's Path (has .text()), not pathlib.
    contents = Path(path).text(encoding='utf8')
    return yload(contents)['header']
def _parse_data(self):
    """Read ``self.filename`` and return its parsed JSON content."""
    with open(self.filename, 'r') as handle:
        parsed = json.load(handle)
    return parsed
def __init__(self, data=None, converts_none_to_str=True): """Could be a JSON or a YAML file :param str filename: filename to a config file in json or YAML format. SEQUANA config files must have some specific fields:: input_directory input_samples... """ # Create a dummy YAML code to hold data in case the input is a json # or a dictionary structure. We use a CommentedMap that works like # a dictionary. Be aware that the update method will lose the comments if data is None: self.config = AttrDict() self._yaml_code = comments.CommentedMap() elif isinstance(data, str): # else is it a filename ? if os.path.exists(data): if data.endswith(".yaml") or data.endswith(".yml"): with open(data, "r") as fh: self._yaml_code = ruamel.yaml.load( fh.read(), ruamel.yaml.RoundTripLoader) else: # read a JSON import yaml with open(data, "r") as fh: self._yaml_code = yaml.load(json.dumps( json.loads(fh.read()))) config = load_configfile(data) else: raise IOError("input string must be an existing file (%s)" % data) self.config = AttrDict(**config) elif isinstance(data, SequanaConfig): # else maybe a SequanaConfig ? self.config = AttrDict(**data.config) self._yaml_code = comments.CommentedMap(self.config.copy()) else: # or a pure dictionary ? self.config = AttrDict(**data) self._yaml_code = comments.CommentedMap(self.config.copy()) self.cleanup_config()
def add_stats_summary_json(json_list, parser):
    """Inject ``parser.stats`` into each listed JSON file as a ``stats``
    key, rewriting the files in place.

    No-op when the parser collected no stats (falsy ``parser.stats``).
    """
    if not parser.stats:
        return
    for json_path in json_list:
        with open(json_path, 'r') as fp:
            contents = json.load(fp)
        contents['stats'] = parser.stats
        serialized = json.dumps(contents)
        with open(json_path, 'w') as fp:
            print(serialized, file=fp)
def load_config(config_path, loader=yaml.Loader, verify_version=True):
    """Load a YAML config file, optionally verifying its version.

    :return: the parsed config, or ``False`` when the file is missing.
    """
    if not os.path.exists(config_path):
        # Bug fix: the template uses the *named* field {config_path}, so
        # format() must receive it by keyword — the positional call raised
        # KeyError instead of logging the message.
        system_log.error(_("config.yml not found in {config_path}").format(
            config_path=config_path))
        return False
    with codecs.open(config_path, encoding="utf-8") as stream:
        config = yaml.load(stream, loader)
    if verify_version:
        config = config_version_verify(config, config_path)
    return config
def load_config(config_path, loader=yaml.Loader):
    """Load a YAML config file.

    :return: ``{}`` when no path is given, ``False`` when the file is
        missing, otherwise the parsed config.
    """
    if config_path is None:
        return {}
    if not os.path.exists(config_path):
        # Bug fix: named placeholder {config_path} requires a keyword
        # argument — the positional format() call raised KeyError.
        system_log.error(_(u"config.yml not found in {config_path}").format(
            config_path=config_path))
        return False
    with codecs.open(config_path, encoding="utf-8") as stream:
        config = yaml.load(stream, loader)
    return config
def __init__(self, filename):
    """Remember *filename* and load it with the round-trip loader so
    comments and formatting are preserved for later re-serialization."""
    self.filename = filename
    with open(filename, 'r') as handle:
        self.data = yaml.load(handle, Loader=yaml.RoundTripLoader)
def load_meas_file(filename=None):
    """Load the measurement YAML file and prime module-level paths.

    Side effects: rebinds the module globals ``LogDir``, ``KernelDir``,
    ``AWGDir`` and ``meas_file``, and creates the kernel/log directories
    when missing.

    :param filename: explicit path; when falsy the file is discovered
        via ``find_meas_file()``.
    :return: the parsed YAML document.
    :raises KeyError: when the config section or a required *Dir key
        is absent.
    """
    global LogDir, KernelDir, AWGDir, meas_file
    if filename:
        meas_file = filename
    else:
        meas_file = find_meas_file()
    with open(meas_file, 'r') as fid:
        # Project Loader supports custom "!include" tags.
        Loader.add_constructor('!include', Loader.include)
        load = Loader(fid)
        code = load.get_single_data()
        load.dispose()
    # Get the config values out of the measure_file.
    if not 'config' in code.keys():
        raise KeyError("Could not find config section of the yaml file.")
    if 'AWGDir' in code['config'].keys():
        AWGDir = os.path.abspath(code['config']['AWGDir'])
    else:
        raise KeyError("Could not find AWGDir in the YAML config section")
    if 'KernelDir' in code['config'].keys():
        KernelDir = os.path.abspath(code['config']['KernelDir'])
    else:
        raise KeyError("Could not find KernelDir in the YAML config section")
    if 'LogDir' in code['config'].keys():
        LogDir = os.path.abspath(code['config']['LogDir'])
    else:
        raise KeyError("Could not find LogDir in the YAML config section")
    # Create directories if necessary
    for d in [KernelDir, LogDir]:
        if not os.path.isdir(d):
            os.mkdir(d)
    return code
async def _init():
    """Initialise the sandbox pool and the language build table.

    Bug fix: the body uses ``await create_sandboxes(...)``, which is a
    SyntaxError inside a plain ``def`` — the function must be declared
    ``async def`` (the ``async`` keyword appears to have been lost).
    """
    parallelism = config.get('parallelism', 1)
    logger.info('Using parallelism: %d', parallelism)
    for sandbox in await create_sandboxes(parallelism):
        _sandbox_pool.put_nowait(sandbox)
    try:
        with open(_LANGS_FILE) as file:
            langs_config = yaml.load(file, Loader=yaml.RoundTripLoader)
    except FileNotFoundError:
        logger.error('Language file %s not found.', _LANGS_FILE)
        exit(1)
    for lang_name, lang_config in langs_config.items():
        if lang_config['type'] == 'compiler':
            compiler = Compiler(lang_config['compiler_file'],
                                shlex.split(lang_config['compiler_args']),
                                lang_config['code_file'],
                                lang_config['execute_file'],
                                shlex.split(lang_config['execute_args']))
            # Limits fall back to module defaults; ms -> ns, kB -> bytes.
            _langs[lang_name] = partial(
                _compiler_build, compiler,
                time_limit_ns=lang_config.get('time_limit_ms',
                                              DEFAULT_TIME_MS) * 1000000,
                memory_limit_bytes=lang_config.get('memory_limit_kb',
                                                   DEFAULT_MEM_KB) * 1024,
                process_limit=lang_config.get('process_limit', PROCESS_LIMIT))
        elif lang_config['type'] == 'interpreter':
            interpreter = Interpreter(lang_config['code_file'],
                                      lang_config['execute_file'],
                                      shlex.split(lang_config['execute_args']))
            _langs[lang_name] = partial(_interpreter_build, interpreter)
        else:
            logger.error('Unknown type %s', lang_config['type'])
def _load_config():
    """Parse the global YAML config file with the round-trip loader.

    Exits the process with status 1 when the file does not exist.
    """
    try:
        stream = open(_CONFIG_FILE, encoding='utf-8')
    except FileNotFoundError:
        logger.error('Config file %s not found.', _CONFIG_FILE)
        exit(1)
    with stream:
        return yaml.load(stream, Loader=yaml.RoundTripLoader)
def load(f):
    """Round-trip-load YAML from *f* (file object or string)."""
    loader = ruamel.yaml.RoundTripLoader
    return ruamel.yaml.load(f, loader)
def load(stream, Loader=yaml.SafeLoader, object_pairs_hook=OrderedDict):
    """Load YAML from *stream*, building mappings with *object_pairs_hook*.

    Defaults to ``SafeLoader`` + ``OrderedDict`` so key order is kept
    without executing arbitrary tags.
    """
    class _OrderedLoader(Loader):
        # Local subclass keeps the extra constructor out of Loader itself.
        pass

    def _mapping_constructor(loader, node):
        loader.flatten_mapping(node)
        return object_pairs_hook(loader.construct_pairs(node))

    _OrderedLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
        _mapping_constructor)
    return yaml.load(stream, _OrderedLoader)
def save(data, stream=None, Dumper=yaml.SafeDumper,
         default_flow_style=False,
         encoding='utf-8',
         **kwds):
    """Dump *data* as YAML with representers for OrderedDict and common
    numpy / complex scalar types.

    :param data: mapping to dump (plain dicts are promoted to OrderedDict)
    :param stream: target stream; when None the serialized text is returned
    :return: the ``yaml.dump`` result
    """
    class OrderedDumper(Dumper):
        pass

    def _dict_representer(dumper, data):
        return dumper.represent_mapping(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
            data.items())
    OrderedDumper.add_representer(OrderedDict, _dict_representer)
    # numpy / complex scalars are rendered as plain floats or strings.
    OrderedDumper.add_representer(
        np.float64,
        lambda dumper, data: dumper.represent_float(float(data)))
    OrderedDumper.add_representer(
        complex,
        lambda dumper, data: dumper.represent_str(str(data)))
    OrderedDumper.add_representer(
        np.complex128,
        lambda dumper, data: dumper.represent_str(str(data)))
    OrderedDumper.add_representer(
        np.ndarray,
        lambda dumper, data: dumper.represent_list(list(data)))
    # I added the following two lines to make pyrpl compatible with pyinstruments. In principle they can be erased
    if isinstance(data, dict) and not isinstance(data, OrderedDict):
        data = OrderedDict(data)
    return yaml.dump(data,
                     stream=stream,
                     Dumper=OrderedDumper,
                     default_flow_style=default_flow_style,
                     encoding=encoding,
                     **kwds)

# usage example:
# load(stream, yaml.SafeLoader)
# save(data, stream=f, Dumper=yaml.SafeDumper)
def _set_yml(self, yml_content):
    """
    :param yml_content: sets the branch to yml_content
    :return: None
    """
    # Parse the YAML text, graft it into the parent's data tree at this
    # branch, then persist immediately.
    branch = load(yml_content)
    self._parent._data[self._branch] = branch
    self._save()
def _load(self):
    """ loads data from file """
    if self._filename is None:
        # if no file is used, just ignore this call
        return
    logger.debug("Loading config file %s", self._filename)
    # read file from disc
    with open(self._filename) as f:
        self._data = load(f)
    # store the modification time of this file version
    self._mtime = os.path.getmtime(self._filename)
    # make sure that reload timeout starts from this moment
    self._lastreload = time()
    # empty file gives _data=None
    if self._data is None:
        self._data = OrderedDict()
    # update dict of the MemoryTree object
    to_remove = []
    # remove all obsolete entries
    for name in self.__dict__:
        if not name.startswith('_') and name not in self._data:
            to_remove.append(name)
    for name in to_remove:
        self.__dict__.pop(name)
    # insert the branches into the object __dict__ for auto-completion
    self.__dict__.update(self._data)
def from_workspace(cls, path):
    """Build an instance from ``.shipmaster.yaml`` found in *path*.

    Returns None when the workspace has no such file.
    """
    config_file = os.path.join(path, '.shipmaster.yaml')
    if not os.path.exists(config_file):
        return None
    with open(config_file, 'r') as stream:
        loaded = yaml.load(stream, yaml.RoundTripLoader)
    return cls.from_kwargs(path, **loaded)
def test_fileformatyaml_pass_no_substitutions():
    """Relative path to file should succeed.

    Strictly speaking not a unit test.
    """
    context = Context({
        'ok1': 'ov1',
        'fileFormatYamlIn': './tests/testfiles/test.yaml',
        'fileFormatYamlOut': './tests/testfiles/out/out.yaml'})
    fileformat.run_step(context)
    assert context, "context shouldn't be None"
    # Bug fix: the failure message said "2 items" while asserting 3.
    assert len(context) == 3, "context should have 3 items"
    assert context['ok1'] == 'ov1'
    assert context['fileFormatYamlIn'] == './tests/testfiles/test.yaml'
    assert context['fileFormatYamlOut'] == './tests/testfiles/out/out.yaml'
    with open('./tests/testfiles/out/out.yaml') as outfile:
        outcontents = yaml.load(outfile, Loader=yaml.RoundTripLoader)
    assert len(outcontents) == 3
    assert outcontents['key'] == 'value1 !£$%# *'
    assert outcontents['key2'] == 'blah'
    assert outcontents['key3'] == ['l1', '!£$% *', 'l2',
                                   ['l31', {'l32': ['l321', 'l322']}]]
    # atrociously lazy test clean-up
    os.remove('./tests/testfiles/out/out.yaml')
def load(config_path):
    """Parse the config at *config_path* into (environments, zones)."""
    with config_path.open() as fp:
        document = yaml.load(fp.read(), yaml.RoundTripLoader)
    environments = dict(_parse_environments(document.get('environments', {})))
    zones = dict(_parse_zones(document.get('zones', {})))
    return environments, zones
def construct_include(self, node):
    """Include file referenced at node."""
    # Resolve the include target relative to this loader's root dir.
    filename = os.path.join(self._root, self.construct_scalar(node))
    filename = os.path.abspath(filename)
    extension = os.path.splitext(filename)[1].lstrip('.')
    with open(filename, 'r') as f:
        if extension in ('yaml', 'yml'):
            # Recursive load with this same loader so nested
            # include tags keep working.
            return yaml.load(f, Loader=self)
        else:
            # Non-YAML files are included verbatim as one string.
            return ''.join(f.readlines())
def load(cls, path, *args, **kwargs):
    """Load *path* via the loader registered for its extension.

    Unknown extensions fall back to ParmEd; unreadable files raise
    IOError with the path in the message.
    """
    extension = os.path.splitext(path)[1]
    try:
        handler = cls._loaders(extension.lstrip('.'))
        return handler(path, *args, **kwargs)
    except KeyError:
        logger.error('! Unknown loader for format %s. '
                     'Trying with ParmEd as fallback', extension)
        return cls.from_parmed(path, *args, **kwargs)
    except IOError:
        raise IOError('Could not access file {}'.format(path))
def __init__(self, master=None, **kwargs):
    """Store the pre-loaded *master* object.

    :param master: an already-constructed object; passing a path string
        is rejected — use ``.load()`` or a ``.from_*()`` constructor.
    :param kwargs: forwarded to ``InputContainer``; ``path`` is kept.
    """
    InputContainer.__init__(self, **kwargs)
    if isinstance(master, str):
        raise ValueError('To instantiate from file, use .load() or '
                         'one of the .from_*() methods.')
    self.master = master
    self._path = kwargs.get('path')
def prepare_handler(cfg):
    """ Load all files into single object. """
    positions, velocities, box = None, None, None
    _path = cfg['_path']
    forcefield = cfg.pop('forcefield', None)
    topology = sanitize_path_for_file(cfg.pop('topology'), _path)
    # A checkpoint supplies positions/velocities/box in one go; the
    # dedicated keys below may then override individual pieces.
    if 'checkpoint' in cfg:
        restart_path = sanitize_path_for_file(cfg['checkpoint'], _path)
        restart = Restart.load(restart_path)
        positions = restart.positions
        velocities = restart.velocities
        box = restart.box
    if 'positions' in cfg:
        positions_path = sanitize_path_for_file(cfg.pop('positions'), _path)
        positions = Positions.load(positions_path)
        # The positions file also carries box vectors.
        box = BoxVectors.load(positions_path)
    if 'velocities' in cfg:
        velocities_path = sanitize_path_for_file(cfg.pop('velocities'), _path)
        velocities = Velocities.load(velocities_path)
    if 'box' in cfg:
        box_path = sanitize_path_for_file(cfg.pop('box'), _path)
        box = BoxVectors.load(box_path)
    options = {}
    # NOTE: reads the local variables bound above *by name* via locals();
    # do not rename positions/velocities/box/forcefield.
    for key in 'positions velocities box forcefield'.split():
        value = locals()[key]
        if value is not None:
            options[key] = value
    return SystemHandler.load(topology, **options)
def statexml2pdb(topology, state, output=None):
    """
    Given an OpenMM xml file containing the state of the simulation,
    generate a PDB snapshot for easy visualization.
    """
    restart = Restart.from_xml(state)
    handler = SystemHandler.load(topology, positions=restart.positions)
    pdb_path = topology + '.pdb' if output is None else output
    handler.write_pdb(pdb_path)
def assertParametersConverted(self, actual, expected):
    """Assert that *actual*'s parameters, once normalised by
    lose_parameters_to_full, equal the round-trip-loaded *expected* YAML.
    """
    print(yaml.load(expected, Loader=yaml.RoundTripLoader)['parameters'])
    print(lose_parameters_to_full(
        yaml.load(actual, Loader=yaml.RoundTripLoader)['parameters']))
    # Idiom fix: assertEquals is a deprecated alias of assertEqual.
    self.assertEqual(
        yaml.load(expected, Loader=yaml.RoundTripLoader),
        {'parameters': lose_parameters_to_full(
            yaml.load(actual, Loader=yaml.RoundTripLoader)['parameters'])}
    )
def load(self, vb=False):
    """Read ``config.yaml`` into ``self.doc`` using the round-trip loader.

    :param vb: verbosity flag (currently unused)
    :return: self on success; implicitly None when loading failed
    """
    try:
        with open('config.yaml', 'r') as fp:
            self.doc = yaml.load(fp, Loader=yaml.RoundTripLoader)
    except IOError as e:
        log.err("Could not open config.yaml: " + str(e))
    except Exception as e:
        # Bug fix: the message previously rendered as
        # "...type: Xhas occurred..." (missing space) and misspelled
        # "unexcpected".
        log.err("An unexpected exception of type: " + type(e).__name__ +
                " has occurred: " + str(e))
    else:
        return self
def _from_json(json_string=None, filename=None, encoding="utf-8",
               errors="strict", multiline=False, **kwargs):
    """Parse JSON from a file or a string.

    With ``multiline`` set, every non-empty line of the file that does
    not start with ``#`` is parsed as its own JSON document and a list
    of documents is returned.
    """
    if filename:
        with open(filename, 'r', encoding=encoding, errors=errors) as f:
            if multiline:
                documents = []
                for raw_line in f:
                    stripped = raw_line.strip()
                    if stripped and not stripped.startswith("#"):
                        documents.append(json.loads(stripped, **kwargs))
                data = documents
            else:
                data = json.load(f, **kwargs)
    elif json_string:
        data = json.loads(json_string, **kwargs)
    else:
        raise BoxError('from_json requires a string or filename')
    return data
def _from_yaml(yaml_string=None, filename=None, encoding="utf-8",
               errors="strict", **kwargs):
    """Parse YAML from a file or a string via ``yaml.load``.

    Exactly one of *yaml_string* / *filename* must be provided.
    """
    if filename:
        with open(filename, 'r', encoding=encoding, errors=errors) as f:
            return yaml.load(f, **kwargs)
    if yaml_string:
        return yaml.load(yaml_string, **kwargs)
    raise BoxError('from_yaml requires a string or filename')


# Helper functions
def from_json(cls, json_string=None, filename=None, encoding="utf-8",
              errors="strict", multiline=False, **kwargs):
    """
    Transform a json object string into a BoxList object. If the incoming
    json is a dict, you must use Box.from_json.

    :param json_string: string to pass to `json.loads`
    :param filename: filename to open and pass to `json.load`
    :param encoding: File encoding
    :param errors: How to handle encoding errors
    :param multiline: One object per line
    :param kwargs: parameters to pass to `BoxList()` or `json.loads`
    :return: BoxList object from json data
    """
    # Split kwargs: Box-specific options go to the constructor, the
    # remainder is forwarded to the json parser.
    box_kwargs = {name: kwargs.pop(name) for name in list(kwargs)
                  if name in BOX_PARAMETERS}
    data = _from_json(json_string, filename=filename, encoding=encoding,
                      errors=errors, multiline=multiline, **kwargs)
    if not isinstance(data, list):
        raise BoxError('json data not returned as a list, '
                       'but rather a {0}'.format(type(data).__name__))
    return cls(data, **box_kwargs)
def read_parameter(parameter_file, parameter):
    """Merge the YAML mapping in *parameter_file* into *parameter*.

    Bug fix: the file handle was opened and never closed; a ``with``
    block now guarantees release.

    :return: a new dict combining *parameter* with the file's contents.
    """
    with open(parameter_file, "r") as fr:
        param = yaml.load(fr, yaml.RoundTripLoader)
    return merge_two_dicts(parameter, param)
def get_and_validate_mailer_config(args):
    """Load the mailer YAML config, schema-validate it, apply defaults."""
    with open(args.config) as fh:
        raw = fh.read()
    # SafeLoader: the config file is external input.
    config = yaml.load(raw, Loader=yaml.SafeLoader)
    jsonschema.validate(config, CONFIG_SCHEMA)
    utils.setup_defaults(config)
    return config
def yaml_load_fromstring(string, ordered=False):
    """
    Load contents of a string into an dict/OrderedDict structure. The string has to be valid yaml

    :param string: yaml document text to parse
    :type string: str
    :param ordered: load to an OrderedDict? Default=False
    :type ordered: bool

    :return: tuple of (parsed data or None on error, error description string)
    :rtype: (Dict | OrderedDict | None, str)
    """
    dict_type = 'dict'
    if ordered:
        dict_type = 'OrderedDict'
    logger.info("Loading '{}' to '{}'".format(string, dict_type))
    y = None
    estr = ''
    try:
        sdata = string
        # sdata = sdata.replace('\n', '\n\n')
        if ordered:
            y = _ordered_load(sdata, yaml.SafeLoader)
        else:
            y = yaml.load(sdata, yaml.SafeLoader)
    except Exception as e:
        # Rewrite the two most common parse errors into friendlier text.
        estr = str(e)
        if "found character '\\t'" in estr:
            estr = estr[estr.find('line'):]
            estr = 'TABs are not allowed in YAML files, use spaces for indentation instead!\nError in ' + estr
        # NOTE(review): the library message is usually "could not find
        # expected ':'" — this "could not found" substring may never match;
        # verify against the yaml lib in use.
        if ("while scanning a simple key" in estr) and ("could not found expected ':'" in estr):
            estr = estr[estr.find('column'):estr.find('could not')]
            estr = 'The colon (:) following a key has to be followed by a space. The space is missing!\nError in ' + estr
    return y, estr
def load(self):
    """ load the contents of the yaml-file to the data-structure """
    # Delegates to the module-level round-trip loader so comments survive.
    self.data = yaml_load_roundtrip(self.filename)