Python ruamel.yaml 模块,safe_load() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ruamel.yaml.safe_load()

项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def __init__(self, configfile, dryrun=False, debug=False):
        self.configfile = configfile
        self.config = yaml_load(open(configfile))
        logger.info("Loaded configuration file: %s" % configfile)
        src_root = self.config.get("src_root", "/")
        if os.path.isabs(src_root):
            self.src_root = src_root
            logger.info("Source root directory: %s" % self.src_root)
        else:
            raise ValueError("Source root must be an absolute path")
        self.syspath = syspath.union(self.config.get("syspath", []))
        logger.info("Protected system paths: {0}".format(self.syspath))
        dest_root = os.path.expanduser(self.config["dest_root"])
        logger.info("Check backup destination against protected paths ...")
        self.dest_root = self.check_dest(dest_root)
        logger.info("Backup destination: %s" % self.dest_root)
        self.dryrun = dryrun
        logger.info("Dry run mode: %s" % dryrun)
        self.debug = debug
        logger.info("Show DEBUG information: %s" % debug)
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def load_schema(schema_file=None):
    schema_file = schema_file or get_configuration().schema_loc
    schema_file = os.path.abspath(schema_file)

    schema_cache = getattr(load_schema, '_schemas', {})
    if schema_file in schema_cache:
        return schema_cache[schema_file]

    with open(schema_file, 'r') as sf:
        schema = yaml.safe_load(sf)

    if 'tables' not in schema:
        raise ValueError('Tables list missing from schema.')

    if 'types' not in schema:
        raise ValueError('Types missing from schema.')

    schema_cache[schema_file] = schema

    return schema_cache[schema_file]
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def get_parsed_context(context_arg):
    """Parse input context string and returns context as dictionary."""
    assert context_arg, ("pipeline must be invoked with --context set. For "
                         "this yaml parser you're looking for something "
                         "like --context './myyamlfile.yaml'")
    logger.debug("starting")
    logger.debug(f"attempting to open file: {context_arg}")
    with open(context_arg) as yaml_file:
        payload = yaml.safe_load(yaml_file)

    logger.debug(f"yaml file parsed. Count: {len(payload)}")

    if not isinstance(payload, MutableMapping):
        raise TypeError("yaml input should describe a dictionary at the top "
                        "level. You should have something like "
                        "\n'key1: value1'\n key2: value2'\n"
                        "in the yaml top-level, not \n'- value1\n - value2'")

    logger.debug("done")
    return payload
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def prepare_git_user(self):
        """
        Tries to read the git name and email, so all git commits have correct author.
        Requests /api/user-git to check which user is behind the current configured ssh key.
        """
        import aetros.api
        try:
            response = aetros.api.request('user-git')
            if response:
                user = yaml.safe_load(response)

                self.git_name = user['name']
                self.git_email = user['email']
            else:
                self.go_offline()
        except ApiConnectionError as e:
            self.go_offline()
项目:InfoBot    作者:gdude2002    | 项目源码 | 文件源码
def load_server(self, server_id) -> bool:
        if not os.path.exists("data/{}".format(server_id)):
            return False

        log.debug("Loading server: {}".format(server_id))

        config = yaml.safe_load(open("data/{}/config.yml".format(server_id), "r"))
        sections = yaml.safe_load(open("data/{}/sections.yml".format(server_id), "r"))

        if os.path.exists("data/{}/notes.yml".format(server_id)):
            notes = yaml.safe_load(open("data/{}/notes.yml".format(server_id), "r"))
        else:
            notes = DEFAULT_NOTES

        if "notes_channel" not in config:
            config["notes_channel"] = None

        self.data[server_id] = {
            "config": config,
            "sections": self.load_sections(sections)
        }

        self.notes[server_id] = notes

        return True
项目:lightflow    作者:AustralianSynchrotron    | 项目源码 | 文件源码
def _update_from_file(self, filename):
        """ Helper method to update an existing configuration with the values from a file.

        Loads a configuration file and replaces all values in the existing configuration
        dictionary with the values from the file.

        Args:
            filename (str): The path and name to the configuration file.
        """
        if os.path.exists(filename):
            try:
                with open(filename, 'r') as config_file:
                    yaml_dict = yaml.safe_load(config_file.read())
                    if yaml_dict is not None:
                        self._update_dict(self._config, yaml_dict)
            except IsADirectoryError:
                raise ConfigLoadError(
                    'The specified configuration file is a directory not a file')
        else:
            raise ConfigLoadError('The config file {} does not exist'.format(filename))
项目:buildhub    作者:mozilla-services    | 项目源码 | 文件源码
def initialize_kinto(loop, kinto_client, bucket, collection):
    """
    Initialize the remote server with the initialization.yml file.
    """
    # Leverage kinto-wizard async client.
    thread_pool = ThreadPoolExecutor()
    async_client = AsyncKintoClient(kinto_client, loop, thread_pool)

    initialization_manifest = pkgutil.get_data('buildhub', 'initialization.yml')
    config = yaml.safe_load(initialization_manifest)

    # Check that we push the records at the right place.
    if bucket not in config:
        raise ValueError(f"Bucket '{bucket}' not specified in `initialization.yml`.")
    if collection not in config[bucket]['collections']:
        raise ValueError(f"Collection '{collection}' not specified in `initialization.yml`.")

    await initialize_server(async_client,
                            config,
                            bucket=bucket,
                            collection=collection,
                            force=False)
项目:autosklearn-zeroconf    作者:paypal    | 项目源码 | 文件源码
def setup_logging(
        default_path='./parameter/logger.yml',
        default_level=logging.INFO,
        env_key='LOG_CFG'
):
    """Setup logging configuration

    """
    path = os.path.abspath(default_path)
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(os.path.abspath(path)):
        with open(path, 'rt') as f:
            config = yaml.safe_load(f.read())
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def task_example():
    '''
    cp|strip config.yml -> config.yml.example
    '''
    apikey = '82_CHAR_APIKEY'
    punch = fmt('''
    authorities:
        digicert:
            apikey: {apikey}
    destinations:
        zeus:
            apikey: {apikey}
    ''')
    return {
        'actions': [
            fmt('cp {CONFIG_YML}.example {CONFIG_YML}.bak'),
            fmt('cp {CONFIG_YML} {CONFIG_YML}.example'),
            lambda: _update_config(CONFIG_YML+'.example', yaml.safe_load(punch)),
        ],
    }
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def unbundle(dirpath, cert_name):
    key, csr, crt, yml, readme = [None] * 5
    tarpath = fmt('{dirpath}/{cert_name}.tar.gz')
    with tarfile.open(tarpath, 'r:gz') as tar:
        for info in tar.getmembers():
            if info.name.endswith('.key'):
                key = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.csr'):
                csr = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.crt'):
                crt = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.yml'):
                yml = tar.extractfile(info.name).read().decode('utf-8')
                yml = yaml.safe_load(yml)
            elif info.name == 'README':
                readme = tar.extractfile(info.name).read().decode('utf-8')
    return key, csr, crt, yml, readme
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def unbundle(dirpath, cert_name):
    key, csr, crt, yml, readme = [None] * 5
    tarpath = fmt('{dirpath}/{cert_name}.tar.gz')
    with tarfile.open(tarpath, 'r:gz') as tar:
        for info in tar.getmembers():
            if info.name.endswith('.key'):
                key = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.csr'):
                csr = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.crt'):
                crt = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.yml'):
                yml = tar.extractfile(info.name).read().decode('utf-8')
                yml = yaml.safe_load(yml)
            elif info.name == 'README':
                readme = tar.extractfile(info.name).read().decode('utf-8')
    return key, csr, crt, yml, readme
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def unbundle(dirpath, cert_name):
    key, csr, crt, yml, readme = [None] * 5
    tarpath = fmt('{dirpath}/{cert_name}.tar.gz')
    with tarfile.open(tarpath, 'r:gz') as tar:
        for info in tar.getmembers():
            if info.name.endswith('.key'):
                key = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.csr'):
                csr = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.crt'):
                crt = tar.extractfile(info.name).read().decode('utf-8')
            elif info.name.endswith('.yml'):
                yml = tar.extractfile(info.name).read().decode('utf-8')
                yml = yaml.safe_load(yml)
            elif info.name == 'README':
                readme = tar.extractfile(info.name).read().decode('utf-8')
    return key, csr, crt, yml, readme
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _fixup(obj):
    if isinstance(obj, dict):
        d = deepcopy(obj)
        for k,v in obj.items():
            if isinstance(v, str):
                if 'url' in k:
                    d[k] = URL(v)
                elif 'path' in k:
                    d[k] = Path(v)
                elif 'auth' == k:
                    with open(fmt('{CONFIG_DIR}/{v}'), 'r') as f:
                        d[k] = yaml.safe_load(f.read())
            elif isinstance(v, dict):
                d[k] = _fixup(v)
        return d
    return obj
项目:pypyr-aws    作者:pypyr    | 项目源码 | 文件源码
def run_step(context):
    """Fetch a json file from s3 and put the json values into context.

    Args:
        - context: pypyr.context.Context. Mandatory. Should contain keys for:
            - s3Fetch: dict. mandatory. Must contain:
                - Bucket: string. s3 bucket name.
                - Key: string. s3 key name.

    json parsed from the s3 file will be merged into the
    context. This will overwrite existing values if the same keys are already
    in there. I.e if s3 json has {'eggs' : 'boiled'} and context
    {'eggs': 'fried'} already exists, returned context['eggs'] will be
    'boiled'.
    """
    logger.debug("started")
    response = pypyraws.aws.s3.get_payload(context)

    payload = yaml.safe_load(response)
    logger.debug("successfully parsed yaml from s3 response bytes")
    context.update(payload)
    logger.info("loaded s3 yaml into pypyr context")

    logger.debug("done")
项目:scriptcwl    作者:NLeSC    | 项目源码 | 文件源码
def load_yaml(filename):
    with open(filename) as myfile:
        content = myfile.read()
        if "win" in sys.platform:
            content = content.replace("\\", "/")
        return yaml.safe_load(content)  # myfile.read())
项目:Zabbix-Network-Weathermap    作者:Prototype-X    | 项目源码 | 文件源码
def load(self, path_cfg: str):
        with open(path_cfg, 'r') as stream:
            try:
                self.cfg_dict = yaml3ed.safe_load(stream)
            except yaml3ed.YAMLError as exc:
                print(exc)
        self.check()
        self.zbx = ZabbixAgent(self.cfg_dict['zabbix']['url'], self.cfg_dict['zabbix']['login'],
                               self.cfg_dict['zabbix']['password'])
        log.debug('Config loaded')
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_package_metadata_config_gen_task(self):
        task = package_metadata_tasks.PackageMetadataConfigGenTask()
        repo_root = os.path.abspath('.')

        package_dependencies_yaml = os.path.join(
            repo_root,
            'test/testdata/googleapis_test/gapic/packaging/dependencies.yaml')
        package_defaults_yaml = os.path.join(
            repo_root,
            'test/testdata/googleapis_test/gapic/packaging/api_defaults.yaml')

        task.execute(
            api_name='fake',
            api_version='v1',
            gapic_api_yaml=[],
            language='python',
            local_paths={
                'googleapis': '%s/googleapis' % repo_root,
                'reporoot': repo_root,
            },
            organization_name='google-cloud',
            output_dir=str(self.output_dir),
            package_dependencies_yaml=package_dependencies_yaml,
            package_defaults_yaml=package_defaults_yaml,
            proto_deps=['googleapis-common-protos'],
            package_type="grpc_client",
            src_proto_path=['path/to/protos'],
            generated_package_version={'lower': '0.17.29', 'upper': '0.18dev'},
            release_level='beta'
        )
        with open(os.path.join(str(self.output_dir),
                               'google-cloud-fake-v1_package.yaml')) as f:
            actual = yaml.safe_load(f)
        with open('test/testdata/google-cloud-fake-v1_package.yaml') as f:
            expected = yaml.safe_load(f)
        # Don't compare files directly because yaml doesn't preserve ordering
        self.assertDictEqual(actual, expected)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def load(self, data):
        self.data = yload(data)
项目:jd4    作者:vijos    | 项目源码 | 文件源码
def read_yaml_cases(config, open):
    for case in yaml.safe_load(config)['cases']:
        time = TIME_RE.fullmatch(case['time'])
        if not time:
            raise FormatError(case['time'], 'error parsing time')
        memory = MEMORY_RE.fullmatch(case['memory'])
        if not memory:
            raise FormatError(case['memory'], 'error parsing memory')
        yield DefaultCase(
            partial(open, case['input']),
            partial(open, case['output']),
            int(float(time.group(1)) * TIME_UNITS[time.group(2)]),
            int(float(memory.group(1)) * MEMORY_UNITS[memory.group(2)]),
            int(case['score']))
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def load_table(table_loc, table_type):
    """
    Loads a table of type ``table_type`` from the YAML file ``table_loc``.
    """
    with open(table_loc, 'r') as yf:
        table_file = yaml.safe_load(yf)

    assert table_file['db_version'] == DB_VERSION
    table_list = table_file['data']

    raw_table = (table_type(**params) for params in table_list)
    table_by_id = {x.id: x for x in raw_table}

    return table_by_id
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def from_file(cls, file_loc, **kwargs):
        if not os.path.exists(file_loc):
            raise IOError('File not found: {}'.format(file_loc))

        with open(file_loc, 'r') as yf:
            config = yaml.safe_load(yf)

        config.update(kwargs)

        return cls(config_loc_=file_loc, **config)
项目:shipmaster    作者:damoti    | 项目源码 | 文件源码
def from_string(cls, src):
        return cls.from_kwargs(None, **yaml.safe_load(src, yaml.RoundTripLoader))
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def load():
    def _load(yaml_string):
        return ryaml.safe_load(dedent(yaml_string))
    return _load
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def from_filename(cls, filename: str) -> 'ShanghaiConfiguration':
        with open(filename, 'r', encoding='utf-8') as f:
            yaml_config = ryaml.safe_load(f)
        return cls(yaml_config)
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self._defs = True
        super(Definition, self).__init__(*args, **kwargs)

        self.data = pkg_resources.resource_string(__name__,
                                                  "provisioning.yaml")
        self._parsed_defs = yaml.safe_load(self.data)
        self.value = '_NS/provisioning/definitions'
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def get_parsed_defs(self):
        if self._parsed_defs:
            return self._parsed_defs
        self._parsed_defs = yaml.safe_load(self.data)
        return self._parsed_defs
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self._defs = {}
        super(Definition, self).__init__(*args, **kwargs)

        self.data = pkg_resources.resource_string(__name__, "node_agent.yaml")
        self._parsed_defs = yaml.safe_load(self.data)
        self.value = '_NS/node_agent/definitions'
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def get_parsed_defs(self):
        if self._parsed_defs:
            return self._parsed_defs
        self._parsed_defs = yaml.safe_load(self.data)
        return self._parsed_defs
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self._defs = {}
        super(CompiledDefinitions, self).__init__(*args, **kwargs)

        self.data = definitions.data
        self._parsed_defs = yaml.safe_load(self.data)
        self.value = '_NS/node_agent/compiled_definitions'
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self._defs = {}
        super(Definition, self).__init__(*args, **kwargs)

        self.data = pkg_resources.resource_string(__name__, "gluster.yaml")
        self._parsed_defs = yaml.safe_load(self.data)
        self.value = '_NS/integrations/gluster/definitions'
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def get_parsed_defs(self):
        if self._parsed_defs:
            return self._parsed_defs
        self._parsed_defs = yaml.safe_load(self.data)
        return self._parsed_defs
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self._defs = {}
        super(Definition, self).__init__(*args, **kwargs)

        self.data = pkg_resources.resource_string(__name__, "ceph.yaml")
        self._parsed_defs = yaml.safe_load(self.data)
        self.value = '_NS/integrations/ceph/definitions'
项目:node-agent    作者:Tendrl    | 项目源码 | 文件源码
def get_parsed_defs(self):
        if self._parsed_defs:
            return self._parsed_defs
        self._parsed_defs = yaml.safe_load(self.data)
        return self._parsed_defs
项目:pypyr-cli    作者:pypyr    | 项目源码 | 文件源码
def run_step(context):
    """Loads a yaml file into the pypyr context.

    Yaml parsed from the file will be merged into the pypyr context. This will
    overwrite existing values if the same keys are already in there.
    I.e if file yaml has {'eggs' : 'boiled'} and context {'eggs': 'fried'}
    already exists, returned context['eggs'] will be 'boiled'.

    Args:
        context: pypyr.context.Context. Mandatory.
                 The following context key must exist
                - fetchYamlPath. path-like. Path to file on disk.

    Returns:
        None. updates context arg.

    Raises:
        FileNotFoundError: take a guess
        pypyr.errors.KeyNotInContextError: fetchYamlPath missing in context.
        pypyr.errors.KeyInContextHasNoValueError: fetchYamlPath exists but is
                                                  None.
    """
    logger.debug("started")
    context.assert_key_has_value(key='fetchYamlPath', caller=__name__)

    file_path = context.get_formatted('fetchYamlPath')

    logger.debug(f"attempting to open file: {file_path}")
    with open(file_path) as yaml_file:
        payload = yaml.safe_load(yaml_file)

    if not isinstance(payload, MutableMapping):
        raise TypeError("yaml input should describe a dictionary at the top "
                        "level. You should have something like "
                        "\n'key1: value1'\n key2: value2'\n"
                        "in the yaml top-level, not \n'- value1\n - value2'")

    logger.debug("yaml file loaded. Merging into pypyr context. . .")
    context.update(payload)
    logger.info(f"yaml file merged into pypyr context. Count: {len(payload)}")
    logger.debug("done")
项目:PollBot    作者:mozilla    | 项目源码 | 文件源码
def render_yaml_file(filename):
    with open(os.path.join(HERE, "..", filename)) as stream:
        content = yaml.safe_load(stream)
    return web.json_response(content)
项目:PollBot    作者:mozilla    | 项目源码 | 文件源码
def check_yaml_resource(cli, url, filename):
    with open(os.path.join(HERE, "..", "pollbot", filename)) as stream:
        content = yaml.safe_load(stream)
    resp = await cli.get(url)
    assert await resp.json() == content
项目:PollBot    作者:mozilla    | 项目源码 | 文件源码
def test_oas_spec():
    with open(os.path.join(HERE, "..", "pollbot", "api.yaml"), 'r') as stream:
        oas_spec = yaml.safe_load(stream)
    # example for swagger spec v2.0
    validate_spec(oas_spec)
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def read_config(path = 'aetros.yml', logger=None):
    path = os.path.normpath(os.path.expanduser(path))

    config = {
        'dockerfile': None,
        'command': None,
        'install': None,
        'ignore': None,
        'image': None,
        'server': None,
        'parameters': {},
        'servers': None,
        'before_command': [],
    }

    if os.path.exists(path):
        f = open(path, 'r')

        custom_config = yaml.safe_load(f)
        if custom_config is None:
            custom_config = {}

        if 'storage_dir' in custom_config:
            del custom_config['storage_dir']

        config.update(custom_config)

        logger and logger.debug('Config loaded from ' + os.path.realpath(path))

    if 'parameters' not in config:
        config['parameters'] = {}

    return config
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def main(self, args):
        import aetros.const

        parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
            prog=aetros.const.__prog__ + ' run')
        parser.add_argument('name', nargs='?', help="Model name")
        parser.add_argument('--private', action='store_true', help="Make the model private. Example: aetros init my-model --private")

        home_config = read_home_config()
        parsed_args = parser.parse_args(args)
        if not parsed_args.name:
            parser.print_help()
            sys.exit(1)

        if os.path.exists('aetros.yml'):
            config = yaml.safe_load(open('aetros.yml', 'r'))
            if isinstance(config, dict) and 'model' in config:
                print("failed: aetros.yml already exists with a linked model to " + config['model'])
                sys.exit(1)

        name = api.create_model(parsed_args.name or (os.path.basename(os.getcwd())), parsed_args.private)

        with open('aetros.yml', 'w') as f:
            f.write('model: ' + name)

        print("aetros.yml created linked with model " + name + ' in ' + os.getcwd())
        print("Open AETROS Trainer to see the model at https://" + home_config['host'] + '/model/' + name)
项目:whither    作者:Antergos    | 项目源码 | 文件源码
def load_config(self, key: str) -> dict:
        try:
            data = pkg_resources.resource_string(self.load_from, 'whither.yml').decode('utf-8')
            data = self._filter_data(key, data)
            config = yaml.safe_load(data)
            config = config[key]
        except Exception:
            data = open(self.load_from, 'r').read()
            data = self._filter_data(key, data)
            config = yaml.safe_load(data)
            config = config[key]

        return {key: value for key, value in config.items()}
项目:twlived    作者:tausackhn    | 项目源码 | 文件源码
def load(path: str) -> Dict:
    with open(path, 'rt') as file:
        config = yaml_load(file.read())
    validate(config)
    return config
项目:twlived    作者:tausackhn    | 项目源码 | 文件源码
def setup_logging() -> None:
    with open('log_config.yaml', 'r') as file:
        config_dict = yaml_load(file.read())
    logging.config.dictConfig(config_dict)
项目:monitoring-integration    作者:Tendrl    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self._defs = {}
        super(Definition, self).__init__(*args, **kwargs)

        self.data = pkg_resources.resource_string(
            __name__,
            "monitoring_integration.yaml"
        )
        self._parsed_defs = yaml.safe_load(self.data)
        self.value = "_NS/monitoring/definitions"
项目:monitoring-integration    作者:Tendrl    | 项目源码 | 文件源码
def get_parsed_defs(self):
        if self._parsed_defs:
            return self._parsed_defs
        self._parsed_defs = yaml.safe_load(self.data)
        return self._parsed_defs
项目:InfoBot    作者:gdude2002    | 项目源码 | 文件源码
def __init__(self, *, loop=None, **options):
        super().__init__(loop=loop, **options)

        self.banned_ids = []
        self.config = yaml.safe_load(open("config.yml", "r"))
        self.data_manager = DataManager()
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def load_yaml(self, template_file):
        with open(template_file) as f:
            try:
                return yaml.safe_load(f)
            except yaml.YAMLError as e:
                print(e)
                return []
项目:lightflow    作者:AustralianSynchrotron    | 项目源码 | 文件源码
def set_to_default(self):
        """ Overwrite the configuration with the default configuration. """
        self._config = yaml.safe_load(self.default())
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def task_config():
    '''
    write config.yml -> .config.yml
    '''
    log_level = 'WARNING'
    filename = '{0}/LOG_LEVEL'.format(os.path.dirname(__file__))
    if os.path.isfile(filename):
        log_level = open(filename).read().strip()
    log_level = get_var('LOG_LEVEL', log_level)
    if log_level not in LOG_LEVELS:
        raise UnknownLogLevelError(log_level)
    punch = fmt('''
    logging:
        loggers:
            api:
                level: {log_level}
        handlers:
            console:
                level: {log_level}
    ''')
    return {
        'actions': [
            fmt('echo "cp {CONFIG_YML}\n-> {DOT_CONFIG_YML}"'),
            fmt('echo "setting LOG_LEVEL={log_level}"'),
            fmt('cp {CONFIG_YML} {DOT_CONFIG_YML}'),
            lambda: _update_config(DOT_CONFIG_YML, yaml.safe_load(punch)),
        ]
    }
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _load_config(cfgs):
    config = {}
    for cfg in cfgs:
        cfg = os.path.expanduser(cfg)
        if os.path.isfile(cfg):
            with open(cfg, 'r') as f:
                yml = yaml.safe_load(f)
                if yml:
                    config.update(yml)
    return AttrDict(config)
项目:autocert    作者:mozilla-it    | 项目源码 | 文件源码
def _load_config(filename=DOT_CONFIG_YML, roundtrip=False, fixup=True):
    cfg = {}
    if os.path.isfile(filename):
        try:
            with open(filename, 'r') as f:
                if roundtrip:
                    cfg = yaml.round_trip_load(f.read())
                else:
                    cfg = yaml.safe_load(f.read())
            if fixup:
                cfg = _fixup(cfg)
        except Exception as ex:
            print('ex =', ex)
            raise ConfigLoadError(filename, errors=[ex])
    return AttrDict(cfg)