Python oslo_serialization.jsonutils 模块,load() 实例源码

我们从 Python 开源项目中提取了以下 8 个代码示例,用于说明如何使用 oslo_serialization.jsonutils.load()。

项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def get_configuration(self, filename=None):
        """Return the cached JSON options, reloading the file when it is stale.

        :param filename: path of the JSON file; when falsy, the value of
            CONF.scheduler_json_config_location is used instead.
        :returns: self.data (replaced by an empty dict if it ends up falsy
            after a reload check).
        """
        path = filename or CONF.scheduler_json_config_location
        if not path:
            # Nothing configured: hand back whatever is cached.
            return self.data

        if self.last_checked:
            elapsed = self._get_time_now() - self.last_checked
            if elapsed < datetime.timedelta(minutes=5):
                # Checked less than five minutes ago; skip the timestamp probe.
                return self.data
        # NOTE(review): self.last_checked is not assigned anywhere in this
        # method -- confirm it is refreshed elsewhere.

        timestamp = self._get_file_timestamp(path)
        # Up to date only when both timestamps are known and the file is not
        # newer than the copy that was loaded last.
        up_to_date = (timestamp and self.last_modified and
                      not timestamp > self.last_modified)
        if not up_to_date:
            self.data = self._load_file(self._get_file_handle(path))
            self.last_modified = timestamp

        if not self.data:
            self.data = {}
        return self.data
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Load vendor data from the configured JSON file into self._data.

        The path is read from CONF.vendordata_jsonfile_path. When no path is
        configured, self._data is left as an empty dict. Otherwise the file
        is parsed with jsonutils.load().

        :raises IOError: if the file cannot be opened or read; a warning is
            logged first (a distinct message is used for a missing file).
        :raises ValueError: if jsonutils.load() rejects the file contents; a
            warning is logged first.
        """
        super(JsonFileVendorData, self).__init__(*args, **kwargs)
        data = {}
        fpath = CONF.vendordata_jsonfile_path
        logprefix = "%s[%s]:" % (file_opt.name, fpath)
        if fpath:
            try:
                with open(fpath, "r") as fp:
                    data = jsonutils.load(fp)
            except IOError as e:
                if e.errno == errno.ENOENT:
                    LOG.warning(_LW("%(logprefix)s file does not exist"),
                                {'logprefix': logprefix})
                else:
                    LOG.warning(_LW("%(logprefix)s unexpected IOError when "
                                    "reading"), {'logprefix': logprefix})
                # Bare raise keeps the original traceback and matches the
                # ValueError handler below.
                raise
            except ValueError:
                LOG.warning(_LW("%(logprefix)s failed to load json"),
                            {'logprefix': logprefix})
                raise

        self._data = data
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def run():
    """Run the CNI plugin: read its config from stdin and execute a runner.

    The JSON document on stdin is wrapped in utils.CNIConfig, configuration
    and logging are initialised, a runner is chosen according to
    CONF.cni_daemon.daemon_enabled, and the runner is executed under a
    SIGALRM timeout of _CNI_TIMEOUT. The process exits with the runner's
    status when that status is truthy.
    """
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]

    # 'debug' may be absent from the CNI config; treat that as disabled.
    if getattr(cni_conf, 'debug', False):
        args.append('-d')
    config.init(args)
    config.setup_logging()

    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()

    runner = (cni_api.CNIDaemonizedRunner()
              if CONF.cni_daemon.daemon_enabled
              else cni_api.CNIStandaloneRunner(K8sCNIPlugin()))
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _on_timeout(signum, frame):
        # Report the timeout on stdout through the runner, then exit.
        payload = {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        }
        runner._write_dict(sys.stdout, payload)
        LOG.debug('timed out')
        sys.exit(1)

    signal.signal(signal.SIGALRM, _on_timeout)
    signal.alarm(_CNI_TIMEOUT)
    exit_status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", exit_status)
    if exit_status:
        sys.exit(exit_status)
项目:ara    作者:openstack    | 项目源码 | 文件源码
def test_playbook_persistence(self):
        # The playbook id stored in ARA_TMP_DIR/ara.json must equal the id of
        # the first playbook returned by the query.
        expected = m.Playbook.query.first()
        json_path = os.path.join(self.app.config['ARA_TMP_DIR'], 'ara.json')

        with open(json_path, 'rb') as handle:
            persisted = jsonutils.load(handle)
        self.assertEqual(expected.id, persisted['playbook']['id'])
项目:ara-archive    作者:dmsimard    | 项目源码 | 文件源码
def test_playbook_persistence(self):
        # Compare the id of the first queried playbook against the id
        # recorded in the ara.json file under ARA_TMP_DIR.
        first_playbook = m.Playbook.query.first()
        tmp_dir = self.app.config['ARA_TMP_DIR']
        dump_path = os.path.join(tmp_dir, 'ara.json')

        with open(dump_path, 'rb') as stream:
            loaded = jsonutils.load(stream)
        self.assertEqual(first_playbook.id, loaded['playbook']['id'])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _load_file(self, handle):
        """Parse the JSON document readable from *handle*.

        Kept as a separate method so tests can replace it.

        :returns: the decoded object, or an empty dict when decoding raises
            ValueError (the failure is logged with its traceback).
        """
        try:
            decoded = jsonutils.load(handle)
        except ValueError:
            LOG.exception(_LE("Could not decode scheduler options"))
            decoded = {}
        return decoded
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _prepare_policy(self):
        """Write a copy of the policy file in which every action needs self.role.

        The policy is read from CONF.oslo_policy.policy_file, each action's
        rule is replaced with 'role:<self.role>', and the result is dumped to
        policy.json inside a fresh TempDir fixture. The fixture is stored in
        self.policy_dir and the file path in self.policy_file.
        """
        # Read through a context manager so the source file is closed as soon
        # as it has been parsed instead of being left to the garbage collector.
        with open(CONF.oslo_policy.policy_file) as source:
            policy = jsonutils.load(source)

        # Convert all actions to require specified role
        # (only existing keys are reassigned, so the dict size is unchanged
        # while it is being iterated).
        for action in policy:
            policy[action] = 'role:%s' % self.role

        self.policy_dir = self.useFixture(fixtures.TempDir())
        self.policy_file = os.path.join(self.policy_dir.path,
                                            'policy.json')
        with open(self.policy_file, 'w') as f:
            jsonutils.dump(policy, f)
项目:networking-vpp    作者:openstack    | 项目源码 | 文件源码
def smart_config(conf):
    """Create roles, users and permissions from a JSON description file.

    :param conf: path of a JSON file with a 'compute' and a 'network'
        mapping; each entry maps a node name to a dict holding 'role',
        'username' and 'password'.

    For every compute node a role and user are created, with "read" on
    /networking-vpp/nodes/<node>/* and "readwrite" on
    /networking-vpp/state/<node>/*. For every network controller a role and
    user are created, with "readwrite" on /networking-vpp/nodes/* and "read"
    on /networking-vpp/state/*. Finally the operator is asked whether
    authentication should be enabled.
    """
    # list existing compute nodes
    # Read inside a context manager so the file handle is closed right after
    # parsing rather than leaked.
    with open(conf) as conf_file:
        json_conf = jsonutils.load(conf_file)

    compute_nodes = json_conf['compute']
    network_controllers = json_conf['network']

    for c in compute_nodes:
        print("[+]\tFound Compute {}".format(c))

    for c in network_controllers:
        print("[+]\tFound Network {}".format(c))

    # create a role and user per compute node and per network node
    for compute_node, param in compute_nodes.items():
        rolename = param['role']
        username = param['username']
        password = param['password']
        create_role(rolename)
        create_user(username, password)
        set_user_role(username, rolename)
        print("[+] creating user '{}', role '{}'".format(
            username, rolename))
        # PERMISSIONS
        # Compute nodes are scoped to their own subtree.
        set_role_permission(
            rolename,
            "/networking-vpp/nodes/{}/*".format(compute_node), "read")
        set_role_permission(
            rolename,
            "/networking-vpp/state/{}/*".format(compute_node), "readwrite")

    for network_controller, param in network_controllers.items():
        rolename = param['role']
        username = param['username']
        password = param['password']
        create_role(rolename)
        create_user(username, password)
        print("[+] creating user '{}', role '{}'".format(
            username, rolename))

        set_user_role(username, rolename)
        # PERMISSION
        # Network controllers get access across all nodes.
        set_role_permission(
            rolename,
            "/networking-vpp/nodes/*", "readwrite")
        set_role_permission(
            rolename,
            "/networking-vpp/state/*", "read")

    if click.confirm('Enable ETCD authentication ?'):
        print("[*] Enabling ETCD authentication")
        enable_authentication()