Python oslo_config.cfg 模块,StrOpt() 实例源码

我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用oslo_config.cfg.StrOpt()

项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_repositories_options():
    """Register the ``repositories`` option group on the global ``cfg.CONF``.

    The group carries two string options, ``offsets`` and
    ``data_driven_specs``, whose defaults are ``module:Class`` style
    strings, and the integer option ``offsets_max_revisions``
    (default 10).
    """
    group = cfg.OptGroup(name='repositories', title='repositories')

    offsets_opt = cfg.StrOpt(
        'offsets',
        default='monasca_transform.offset_specs:JSONOffsetSpecs',
        help='Repository for offset persistence')
    specs_opt = cfg.StrOpt(
        'data_driven_specs',
        default=('monasca_transform.data_driven_specs.'
                 'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo'),
        help='Repository for metric and event data_driven_specs')
    max_revisions_opt = cfg.IntOpt(
        'offsets_max_revisions',
        default=10,
        help="Max revisions of offsets for each application")

    # The group has to be known to CONF before options are attached to it.
    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(
        [offsets_opt, specs_opt, max_revisions_opt], group=group)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_messaging_options():
    """Register the ``messaging`` option group on the global ``cfg.CONF``.

    Six string options are registered: the adapter, topic and brokers
    used for messaging, the project id used when publishing, and the
    adapter and topic used for the pre-hourly stage.
    """
    group = cfg.OptGroup(name='messaging', title='messaging')

    adapter_opt = cfg.StrOpt(
        'adapter',
        default=('monasca_transform.messaging.adapter:'
                 'KafkaMessageAdapter'),
        help='Message adapter implementation')
    topic_opt = cfg.StrOpt(
        'topic', default='metrics', help='Messaging topic')
    brokers_opt = cfg.StrOpt(
        'brokers', default='192.168.10.4:9092', help='Messaging brokers')
    project_id_opt = cfg.StrOpt(
        'publish_kafka_project_id',
        default='111111',
        help='publish aggregated metrics tenant')
    pre_hourly_adapter_opt = cfg.StrOpt(
        'adapter_pre_hourly',
        default=('monasca_transform.messaging.adapter:'
                 'KafkaMessageAdapterPreHourly'),
        help='Message adapter implementation')
    pre_hourly_topic_opt = cfg.StrOpt(
        'topic_pre_hourly',
        default='metrics_pre_hourly',
        help='Messaging topic pre hourly')

    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(
        [
            adapter_opt,
            topic_opt,
            brokers_opt,
            project_id_opt,
            pre_hourly_adapter_opt,
            pre_hourly_topic_opt,
        ],
        group=group)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_pre_hourly_processor_options():
    """Register the ``pre_hourly_processor`` group on the global ``cfg.CONF``.

    Only ``late_metric_slack_time``, ``data_provider`` and
    ``effective_batch_revision`` are given explicit defaults here; the
    remaining options are declared without one.
    """
    group = cfg.OptGroup(name='pre_hourly_processor',
                         title='pre_hourly_processor')

    data_provider_default = ('monasca_transform.processor.'
                             'pre_hourly_processor:'
                             'PreHourlyProcessorDataProvider')
    processor_opts = [
        cfg.IntOpt('late_metric_slack_time', default=600),
        cfg.StrOpt('data_provider', default=data_provider_default),
        cfg.BoolOpt('enable_instance_usage_df_cache'),
        cfg.StrOpt('instance_usage_df_cache_storage_level'),
        cfg.BoolOpt('enable_batch_time_filtering'),
        cfg.IntOpt('effective_batch_revision', default=2),
    ]

    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(processor_opts, group=group)
项目:panko    作者:openstack    | 项目源码 | 文件源码
def list_opts():
    """Return this module's options paired with their group names.

    Returns:
        A list of ``(group_name, options)`` tuples for the ``DEFAULT``,
        ``api``, ``database`` and ``storage`` groups, in that order.
    """
    default_opts = [
        # FIXME(jd) Move to [api]
        cfg.StrOpt(
            'api_paste_config',
            default="api_paste.ini",
            help="Configuration file for WSGI definition of API."),
    ]
    api_opts = [
        cfg.IntOpt(
            'default_api_return_limit',
            min=1,
            default=100,
            help=('Default maximum number of '
                  'items returned by API request.')),
    ]
    return [
        ('DEFAULT', default_opts),
        ('api', api_opts),
        ('database', panko.storage.OPTS),
        ('storage', STORAGE_OPTS),
    ]
项目:coverage2sql    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
    """Set up a configuration pointing at a SQLite test database.

    Registers the ``database`` group options and the CLI migration
    options, overrides the connection default with the SQLite connect
    string, drops any existing database and schedules ``self.cleanup``.
    """
    super(SqliteConfFixture, self).setUp()

    database_opts = (
        cfg.StrOpt('connection', default=''),
        cfg.IntOpt('max_pool_size', default=None),
        cfg.IntOpt('idle_timeout', default=None),
    )
    for database_opt in database_opts:
        self.register_opt(database_opt, group='database')
    self.register_opts(cli.MIGRATION_OPTS)

    self.url = db_test_utils.get_connect_string("sqlite")
    self.set_default('connection', self.url, group='database')
    self.set_default('disable_microsecond_data_migration', False)

    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
    self.addCleanup(self.cleanup)
项目:valet    作者:openstack    | 项目源码 | 文件源码
def _register_opts(self):
    """Register the ``valet`` option group and its API client options.

    Adds ``url`` (string, no default), ``read_timeout`` (integer,
    default 5) and ``retries`` (integer, default 3) to ``CONF``.
    """
    valet_group = cfg.OptGroup('valet')
    api_opts = [
        cfg.StrOpt('url', default=None,
                   help=_('API endpoint url')),
        cfg.IntOpt('read_timeout', default=5,
                   help=_('API read timeout in seconds')),
        cfg.IntOpt('retries', default=3,
                   help=_('API request retries')),
    ]

    CONF.register_group(valet_group)
    CONF.register_opts(api_opts, group=valet_group)
项目:Assistant-for-Software-Defined-Infrastructure    作者:shank7485    | 项目源码 | 文件源码
def __init__(self, conf_path):
    """Load the keystone endpoint from the configuration file.

    Registers the ``endpoint`` group with its single ``endpoint``
    option on the global ``cfg.CONF``, parses ``conf_path`` as the
    default configuration file and stores the resulting value in
    ``self.AUTH_ENDPOINT``.

    :param conf_path: path of the configuration file to read.
    """
    self.conf_path = conf_path

    self.opt_group = cfg.OptGroup(
        name='endpoint',
        title='Get the endpoints for keystone')

    # NOTE: the default is the literal string 'None', not the None object.
    endpoint_opt = cfg.StrOpt(
        'endpoint',
        default='None',
        help=('URL or IP address where OpenStack keystone runs.'))
    self.endpoint_opts = [endpoint_opt]

    conf = cfg.CONF
    conf.register_group(self.opt_group)
    conf.register_opts(self.endpoint_opts, self.opt_group)

    # Calling the config object parses the given file.
    conf(default_config_files=[self.conf_path])
    self.AUTH_ENDPOINT = conf.endpoint.endpoint
项目:valet    作者:att-comdev    | 项目源码 | 文件源码
def _register_opts(self):
    """Register the Valet filter options on the global ``cfg.CONF``.

    Option and group names are taken from the ``opt_*_str`` attributes
    of this instance. The failure mode is limited to ``reject`` or
    ``yield`` (default ``reject``); the remaining options default to
    ``None``.
    """
    opts = [
        cfg.StrOpt(self.opt_failure_mode_str,
                   choices=['reject', 'yield'], default='reject',
                   help=_('Mode to operate in if Valet planning fails '
                          'for any reason.')),
        cfg.StrOpt(self.opt_project_name_str, default=None,
                   help=_('Valet Project Name')),
        cfg.StrOpt(self.opt_username_str, default=None,
                   help=_('Valet Username')),
        # secret=True keeps the password out of logged option values.
        cfg.StrOpt(self.opt_password_str, default=None, secret=True,
                   help=_('Valet Password')),
        cfg.StrOpt(self.opt_auth_uri_str, default=None,
                   help=_('Keystone Authorization API Endpoint')),
    ]

    opt_group = cfg.OptGroup(self.opt_group_str)
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(opts, group=opt_group)

    # TODO(JD): Factor out common code between this and the cinder filter
项目:kozinaki    作者:compunova    | 项目源码 | 文件源码
def load_config(self):
    """Register the AWS provider options and return their config group.

    The options are read from the section named by ``self.config_name``
    of the nova config file or the command line (for example:
    /etc/nova/nova.conf).

    Sample settings in nova config (placeholder values):
        [<config_name>]
        aws_access_key_id=<access key id>
        aws_secret_access_key=<secret key>
        region=<region name>

    :returns: the ``cfg.CONF`` group registered under ``self.config_name``.
    """
    section = self.config_name
    secret_key_opt = cfg.StrOpt(
        'aws_secret_access_key', help='AWS secret key', secret=True)
    access_key_opt = cfg.StrOpt(
        'aws_access_key_id', help='AWS access key id', secret=True)
    region_opt = cfg.StrOpt('region', help='AWS region name')

    cfg.CONF.register_opts(
        [secret_key_opt, access_key_opt, region_opt], section)
    return cfg.CONF[self.config_name]
项目:kozinaki    作者:compunova    | 项目源码 | 文件源码
def load_config(self):
    """Register the Google provider options and return their config group.

    The options are read from the section named by ``self.config_name``
    of the nova config file or the command line (for example:
    /etc/nova/nova.conf).

    Sample settings in nova config (placeholder values):
        [<config_name>]
        path_to_json_token=<path to the json token file>
        project=<project id>
        zone=<zone name>

    :returns: the ``cfg.CONF`` group registered under ``self.config_name``.
    """
    section = self.config_name
    token_opt = cfg.StrOpt(
        'path_to_json_token', help='Google API json token file', secret=True)
    project_opt = cfg.StrOpt('project', help='Google project id')
    zone_opt = cfg.StrOpt('zone', help='Google zone name')

    cfg.CONF.register_opts([token_opt, project_opt, zone_opt], section)
    return cfg.CONF[self.config_name]
项目:kozinaki    作者:compunova    | 项目源码 | 文件源码
def load_config(self):
    """Register options derived from the libcloud driver and return them.

    One string option is created for every positional argument name of
    the driver class ``__init__``, plus ``location``, ``root_password``
    and ``project_id``. They are registered under the section named by
    ``self.config_name`` of the nova config file or the command line
    (for example: /etc/nova/nova.conf).

    :returns: the ``cfg.CONF`` group registered under ``self.config_name``.
    """
    provider_cls = get_libcloud_driver(getattr(Provider, self.provider_name))

    # inspect.getargspec() was removed in Python 3.11; prefer
    # getfullargspec() and only fall back where it does not exist.
    # Both expose the positional argument names as ``.args``.
    get_spec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    init_arg_names = get_spec(provider_cls.__init__).args

    # NOTE(review): the argument list of a bound-style __init__ includes
    # 'self', so an option with that name is registered as well; kept
    # as-is to preserve existing behavior - confirm whether it is wanted.
    provider_opts = [cfg.StrOpt(arg) for arg in init_arg_names]
    provider_opts.extend([
        cfg.StrOpt('location'),
        cfg.StrOpt('root_password'),
        cfg.StrOpt('project_id'),
    ])

    cfg.CONF.register_opts(provider_opts, self.config_name)
    return cfg.CONF[self.config_name]
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_database_options():
    """Register the ``database`` option group on the global ``cfg.CONF``.

    All options are declared without a default except ``use_ssl``,
    which defaults to ``False``.
    """
    db_opts = [
        cfg.StrOpt('server_type'),
        cfg.StrOpt('host'),
        cfg.StrOpt('database_name'),
        cfg.StrOpt('username'),
        # secret=True keeps the password out of logged option values.
        cfg.StrOpt('password', secret=True),
        cfg.BoolOpt('use_ssl', default=False),
        cfg.StrOpt('ca_file'),
    ]
    mysql_group = cfg.OptGroup(name='database', title='database')
    cfg.CONF.register_group(mysql_group)
    cfg.CONF.register_opts(db_opts, group=mysql_group)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_service_options():
    """Register the ``service`` option group on the global ``cfg.CONF``.

    Only ``enable_debug_log_entries`` (``False``) and
    ``service_log_filename`` (``monasca-transform.log``) have defaults;
    every other option is declared without one.
    """
    service_opts = [
        cfg.StrOpt('coordinator_address'),
        cfg.StrOpt('coordinator_group'),
        cfg.FloatOpt('election_polling_frequency'),
        # A real boolean default: the string 'false' relies on the
        # library converting defaults and is truthy if it is returned
        # unconverted.
        cfg.BoolOpt('enable_debug_log_entries', default=False),
        cfg.StrOpt('setup_file'),
        cfg.StrOpt('setup_target'),
        cfg.StrOpt('spark_driver'),
        cfg.StrOpt('service_log_path'),
        cfg.StrOpt('service_log_filename',
                   default='monasca-transform.log'),
        cfg.StrOpt('spark_event_logging_dest'),
        cfg.StrOpt('spark_event_logging_enabled'),
        cfg.StrOpt('spark_jars_list'),
        cfg.StrOpt('spark_master_list'),
        cfg.StrOpt('spark_python_files'),
        cfg.IntOpt('stream_interval'),
        cfg.StrOpt('work_dir'),
        cfg.StrOpt('spark_home'),
        cfg.BoolOpt('enable_record_store_df_cache'),
        cfg.StrOpt('record_store_df_cache_storage_level'),
    ]
    service_group = cfg.OptGroup(name='service', title='service')
    cfg.CONF.register_group(service_group)
    cfg.CONF.register_opts(service_opts, group=service_group)
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def get_source_opts(type_=None, location=None, reference=None):
    """Build the three options that describe a source install.

    :param type_: default for the ``type`` option, which is limited to
        ``local``, ``git`` or ``url``.
    :param location: default for the ``location`` option.
    :param reference: default for the ``reference`` option.
    :returns: list with the ``type``, ``location`` and ``reference``
        options, in that order.
    """
    type_opt = cfg.StrOpt(
        'type',
        choices=['local', 'git', 'url'],
        default=type_,
        help='Source location type')
    location_opt = cfg.StrOpt(
        'location',
        default=location,
        help='The location for source install')
    reference_opt = cfg.StrOpt(
        'reference',
        default=reference,
        help=('Git reference to pull, commit sha, tag '
              'or branch name'))
    return [type_opt, location_opt, reference_opt]
项目:freezer-dr    作者:openstack    | 项目源码 | 文件源码
def __metric_opts(self):
    """Build the options used in metric defined sections.

    :returns: list with the ``metric_name``, ``dimensions``,
        ``aggregate`` and ``undetermined`` options, in that order.
    """
    metric_name_opt = cfg.StrOpt(
        "metric_name",
        help="Metric Name used to log monitoring information"
             " in Monasca",
        required=True)

    dimensions_opt = cfg.DictOpt(
        "dimensions",
        default={},
        help="Dict that contains dimensions information. "
             "component:nova-compute,service:compute")

    aggregate_opt = cfg.StrOpt(
        "aggregate",
        choices=["any", "all"],
        help="How to consider the compute node is down. If you "
             "metric reports many states, like checking "
             "different services on the compute host, should we"
             " consider if one component down all are down or"
             " only if all components are down. Default is all."
             " This means if all components fail, freezer-dr"
             " will consider the host failed",
        default='all')

    undetermined_opt = cfg.StrOpt(
        "undetermined",
        choices=['OK', 'ALARM'],
        default='ALARM',
        help="How to handle UNDETERMINED states. It can be "
             "ignored, will be considered OK state or can be "
             "considered ALARM. Default is ALARM")

    return [metric_name_opt, dimensions_opt, aggregate_opt, undetermined_opt]
项目:coverage2sql    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
    """Set up a configuration pointing at a MySQL test database.

    Registers the ``database`` group options, overrides the connection
    default with the MySQL connect string and drops any existing
    database.
    """
    super(MySQLConfFixture, self).setUp()

    database_opts = (
        cfg.IntOpt('max_pool_size', default=20),
        cfg.IntOpt('idle_timeout', default=3600),
        cfg.StrOpt('connection', default=''),
    )
    for database_opt in database_opts:
        self.register_opt(database_opt, group='database')

    self.url = db_test_utils.get_connect_string("mysql")
    self.set_default('connection', self.url, group='database')

    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
项目:coverage2sql    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
    """Set up a configuration pointing at a PostgreSQL test database.

    Registers the ``database`` group options and the CLI migration
    options, overrides the connection default with the PostgreSQL
    connect string and drops any existing database.
    """
    super(PostgresConfFixture, self).setUp()

    database_opts = (
        cfg.StrOpt('connection', default=''),
        cfg.IntOpt('max_pool_size', default=20),
        cfg.IntOpt('idle_timeout', default=3600),
    )
    for database_opt in database_opts:
        self.register_opt(database_opt, group='database')
    self.register_opts(cli.MIGRATION_OPTS)

    self.url = db_test_utils.get_connect_string("postgres")
    self.set_default('connection', self.url, group='database')
    self.set_default('disable_microsecond_data_migration', False)

    lockutils.set_defaults(lock_path='/tmp')
    self._drop_db()
项目:synergy-scheduler-manager    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
    """Create the manager named ``QueueManager`` and declare its options.

    ``self.config_opts`` holds the required ``db_connection`` URL and
    three optional integer pool settings; ``self.queues`` starts empty.
    """
    super(QueueManager, self).__init__(name="QueueManager")

    connection_opt = cfg.StrOpt(
        "db_connection", help="the DB url", required=True)
    pool_opts = [
        cfg.IntOpt('db_pool_size', default=10, required=False),
        cfg.IntOpt('db_pool_recycle', default=30, required=False),
        cfg.IntOpt('db_max_overflow', default=5, required=False),
    ]

    self.config_opts = [connection_opt] + pool_opts
    self.queues = {}
项目:synergy-scheduler-manager    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
    """Create the manager named ``ProjectManager`` and declare its options.

    ``self.config_opts`` holds the default TTL and share, the required
    ``db_connection`` URL and three optional integer pool settings;
    ``self.projects`` starts empty.
    """
    super(ProjectManager, self).__init__(name="ProjectManager")

    project_default_opts = [
        cfg.IntOpt("default_TTL", default=1440, required=False),
        cfg.FloatOpt("default_share", default=10.0, required=False),
    ]
    database_opts = [
        cfg.StrOpt("db_connection", help="the DB url", required=True),
        cfg.IntOpt('db_pool_size', default=10, required=False),
        cfg.IntOpt('db_pool_recycle', default=30, required=False),
        cfg.IntOpt('db_max_overflow', default=5, required=False),
    ]

    self.config_opts = project_default_opts + database_opts
    self.projects = {}
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def get_options(self):
    """Return the parent class options extended with ``url`` and ``token``.

    :returns: the list obtained from the parent ``get_options()`` with
        the two extra options appended.
    """
    options = super(TokenEndpoint, self).get_options()

    # Maintain name 'url' for compatibility
    url_opt = cfg.StrOpt('url', help='Specific service endpoint to use')
    token_opt = cfg.StrOpt(
        'token', secret=True, help='Authentication token to use')

    options.extend([url_opt, token_opt])
    return options
项目:valet    作者:openstack    | 项目源码 | 文件源码
def logger_conf(logger_name):
    """Build the logging options for the logger called ``logger_name``.

    :param logger_name: base name; ``.log`` is appended to it to form
        the default of the ``logger_name`` option.
    :returns: list of eight options covering the output format, whether
        to store, the level, the directory, the file name and the size
        and count limits.
    """
    format_opt = cfg.StrOpt(
        'output_format',
        default="%(asctime)s - %(levelname)s - %(message)s")
    store_opt = cfg.BoolOpt('store', default=True)
    level_opt = cfg.StrOpt('logging_level', default='debug')
    directory_opt = cfg.StrOpt('logging_dir', default='/var/log/valet/')
    name_opt = cfg.StrOpt('logger_name', default=logger_name + ".log")
    limit_opts = [
        cfg.IntOpt('max_main_log_size', default=5000000),
        cfg.IntOpt('max_log_size', default=1000000),
        cfg.IntOpt('max_num_of_logs', default=3),
    ]
    return [format_opt, store_opt, level_opt,
            directory_opt, name_opt] + limit_opts
项目:valet    作者:openstack    | 项目源码 | 文件源码
def _register_opts(self):
    """Register additional options specific to this filter plugin.

    The options go into the ``valet`` group of the global ``cfg.CONF``.
    ``failure_mode`` is limited to ``reject`` or ``yield`` (default
    ``reject``); the credential options default to ``None``.
    """
    opts = [
        cfg.StrOpt('failure_mode',
                   choices=['reject', 'yield'], default='reject',
                   help=_('Mode to operate in if Valet '
                          'planning fails for any reason.')),

        # In the filter plugin space, there's no access to Nova's
        # keystone credentials, so we have to specify our own.
        # This also means we can't act as the user making the request
        # at scheduling-time.
        cfg.StrOpt('admin_tenant_name', default=None,
                   help=_('Valet Project Name')),
        cfg.StrOpt('admin_username', default=None,
                   help=_('Valet Username')),
        # secret=True keeps the password out of logged option values.
        cfg.StrOpt('admin_password', default=None, secret=True,
                   help=_('Valet Password')),
        cfg.StrOpt('admin_auth_url', default=None,
                   help=_('Keystone Authorization API Endpoint')),
    ]

    opt_group = cfg.OptGroup('valet')
    cfg.CONF.register_group(opt_group)
    cfg.CONF.register_opts(opts, group=opt_group)
项目:deb-oslotest    作者:openstack    | 项目源码 | 文件源码
def get_config_parser():
    """Create a fresh config object with the ``repo_root`` CLI option.

    :returns: a new ``cfg.ConfigOpts`` instance on which ``repo_root``
        (default ``.``) is registered as a command line option.
    """
    repo_root_opt = cfg.StrOpt(
        'repo_root',
        default='.',
        help='directory containing the git repositories',
    )
    parser = cfg.ConfigOpts()
    parser.register_cli_opt(repo_root_opt)
    return parser
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def injector():
    """Create metrics and push random measures for them from worker threads.

    The number of metrics, the archive policy, the creator and the
    batch sizes come from command line options. One task is submitted
    per metric to a thread pool with as many workers as metrics.
    """
    cli_opts = [
        cfg.IntOpt("metrics", default=1, min=1),
        cfg.StrOpt("archive-policy-name", default="low"),
        cfg.StrOpt("creator", default="admin"),
        cfg.IntOpt("batch-of-measures", default=1000),
        cfg.IntOpt("measures-per-batch", default=10),
    ]
    conf = cfg.ConfigOpts()
    conf.register_cli_opts(cli_opts)
    conf = service.prepare_service(conf=conf)

    index = indexer.get_driver(conf)
    instore = incoming.get_driver(conf)

    def inject_one_metric():
        """Create one metric and add all of its measure batches."""
        metric = index.create_metric(
            uuid.uuid4(),
            creator=conf.creator,
            archive_policy_name=conf.archive_policy_name)

        for _ in six.moves.range(conf.batch_of_measures):
            measures = []
            for __ in six.moves.range(conf.measures_per_batch):
                timestamp = utils.dt_in_unix_ns(utils.utcnow())
                measures.append(incoming.Measure(timestamp, random.random()))
            instore.add_measures(metric, measures)

    # The returned futures are not kept; leaving the ``with`` block is
    # what waits for the submitted tasks.
    with futures.ThreadPoolExecutor(max_workers=conf.metrics) as executor:
        for _ in six.moves.range(conf.metrics):
            executor.submit(inject_one_metric)
项目:murano-plugin-networking-sfc    作者:openstack    | 项目源码 | 文件源码
def init_config(conf):
    """Register the ``networking_sfc`` options on ``conf`` and return them.

    :param conf: config object to register the options on.
    :returns: the ``networking_sfc`` group of ``conf``.
    """
    endpoint_type_opt = cfg.StrOpt('endpoint_type', default='publicURL')
    conf.register_opts([endpoint_type_opt], group="networking_sfc")
    return conf.networking_sfc
项目:valet    作者:att-comdev    | 项目源码 | 文件源码
def logger_conf(logger_name):
    """Build the logging options for the logger called ``logger_name``.

    :param logger_name: base name; ``.log`` is appended to it to form
        the default of the ``logger_name`` option.
    :returns: list of eight options covering the output format, whether
        to store, the level, the directory, the file name and the size
        and count limits.
    """
    format_opt = cfg.StrOpt(
        'output_format',
        default="%(asctime)s - %(levelname)s - %(message)s")
    store_opt = cfg.BoolOpt('store', default=True)
    level_opt = cfg.StrOpt('logging_level', default='debug')
    directory_opt = cfg.StrOpt('logging_dir', default='/var/log/valet/')
    name_opt = cfg.StrOpt('logger_name', default=logger_name + ".log")
    limit_opts = [
        cfg.IntOpt('max_main_log_size', default=5000000),
        cfg.IntOpt('max_log_size', default=1000000),
        cfg.IntOpt('max_num_of_logs', default=3),
    ]
    return [format_opt, store_opt, level_opt,
            directory_opt, name_opt] + limit_opts
项目:valet    作者:att-comdev    | 项目源码 | 文件源码
def _register_opts(self):
    """Register the Valet API plugin options on the global ``cfg.CONF``.

    Option and group names are taken from attributes of this instance.
    The endpoint option has no default; the connect and read timeouts
    default to 3 and 5.
    """
    group = cfg.OptGroup(self.opt_group_str)
    plugin_opts = [
        cfg.StrOpt(self.opt_name_str, default=None,
                   help=_('Valet API endpoint')),
        cfg.IntOpt(self.opt_conn_timeout, default=3,
                   help=_('Valet Plugin Connect Timeout')),
        cfg.IntOpt(self.opt_read_timeout, default=5,
                   help=_('Valet Plugin Read Timeout')),
    ]

    cfg.CONF.register_group(group)
    cfg.CONF.register_opts(plugin_opts, group=group)

    # TODO(JD): Keep stack param for now. We may need it again.
项目:kolla    作者:sieve-microservices    | 项目源码 | 文件源码
def get_source_opts(type_=None, location=None, reference=None):
    """Build the three options that describe a source install.

    :param type_: default for the ``type`` option, which is limited to
        ``local``, ``git`` or ``url``.
    :param location: default for the ``location`` option.
    :param reference: default for the ``reference`` option.
    :returns: list with the ``type``, ``location`` and ``reference``
        options, in that order.
    """
    type_opt = cfg.StrOpt(
        'type',
        choices=['local', 'git', 'url'],
        default=type_,
        help='Source location type')
    location_opt = cfg.StrOpt(
        'location',
        default=location,
        help='The location for source install')
    reference_opt = cfg.StrOpt(
        'reference',
        default=reference,
        help=('Git reference to pull, commit sha, tag '
              'or branch name'))
    return [type_opt, location_opt, reference_opt]
项目:kolla    作者:sieve-microservices    | 项目源码 | 文件源码
def get_user_opts(uid, gid):
    """Build the ``uid`` and ``gid`` string options.

    :param uid: default for the ``uid`` option.
    :param gid: default for the ``gid`` option.
    :returns: list with the ``uid`` option followed by the ``gid`` option.
    """
    uid_opt = cfg.StrOpt('uid', default=uid, help='The user id')
    gid_opt = cfg.StrOpt('gid', default=gid, help='The group id')
    return [uid_opt, gid_opt]
项目:kozinaki    作者:compunova    | 项目源码 | 文件源码
def load_config(self):
    """Register the Azure provider options and return their config group.

    The options are read from the section named by ``self.config_name``
    of the nova config file or the command line (for example:
    /etc/nova/nova.conf).

    Sample settings in nova config (placeholder values):
        [<config_name>]
        subscription_id=<subscription id>
        key_file=<path to the key file>
        username=<default vm username>
        password=<default instance password>

    :returns: the ``cfg.CONF`` group registered under ``self.config_name``.
    """

    provider_opts = [
        cfg.StrOpt('subscription_id', help='Subscribe is from azure portal settings'),
        cfg.StrOpt('key_file', help='API key to work with the cloud provider', secret=True),
        cfg.StrOpt('username', help='Default vm username'),
        # secret=True keeps credentials out of logged option values.
        cfg.StrOpt('password', secret=True,
                   help='Azure: default instance password. '
                        'Password must be 6-72 characters long'),
        cfg.StrOpt('app_client_id', help='Azure app client id'),
        cfg.StrOpt('app_secret', help='Azure app secret', secret=True),
        cfg.StrOpt('app_tenant', help='Azure app tenant'),
        cfg.StrOpt('resource_group_name', help='Azure resource group name'),
        cfg.StrOpt('location', help='VM location'),
        cfg.StrOpt('storage_account_name', help='Azure storage account name'),
        cfg.StrOpt('os_disk_name', help='VM default disk name'),
        cfg.StrOpt('vnet_name', help='Azure default virtual network'),
        cfg.StrOpt('subnet_name', help='Azure default subnet name'),
        cfg.StrOpt('ip_config_name', help='Azure default ip config name'),
        cfg.StrOpt('nic_name', help='Azure default nic name'),
        cfg.StrOpt('cloud_service_name', help='Azure default cloud service name'),
        cfg.StrOpt('deployment_name', help='Azure default deployment name'),
    ]

    cfg.CONF.register_opts(provider_opts, self.config_name)
    return cfg.CONF[self.config_name]
项目:synergy-scheduler-manager    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Create the manager named ``KeystoneManager`` and declare its options.

        ``self.config_opts`` lists the Keystone credentials and endpoint,
        the HTTP timeout and clock skew, the optional SSL files and the
        AMQP notification settings.
        """
        super(KeystoneManager, self).__init__("KeystoneManager")

        self.config_opts = [
            cfg.StrOpt("auth_url",
                       help="the Keystone url (v3 only)",
                       required=True),
            cfg.StrOpt("username",
                       help="the name of user with admin role",
                       required=True),
            cfg.StrOpt("user_domain_name",
                       help="the user domain",
                       default="default",
                       required=False),
            # secret=True keeps the password out of logged option values.
            cfg.StrOpt("password",
                       help="the password of user with admin role",
                       required=True,
                       secret=True),
            cfg.StrOpt("project_name",
                       help="the project to request authorization on",
                       required=True),
            cfg.StrOpt("project_domain_name",
                       help="the project domain",
                       default="default",
                       required=False),
            cfg.StrOpt("project_id",
                       help="the project id to request authorization on",
                       required=False),
            cfg.IntOpt("timeout",
                       help="set the http connection timeout",
                       default=60,
                       required=False),
            cfg.IntOpt("clock_skew",
                       help="set the clock skew (seconds)",
                       default=60,
                       required=False),
            cfg.StrOpt("ssl_ca_file",
                       help="set the PEM encoded Certificate Authority to "
                            "use when verifying HTTPs connections",
                       default=None,
                       required=False),
            cfg.StrOpt("ssl_cert_file",
                       help="set the SSL client certificate (PEM encoded)",
                       default=None,
                       required=False),
            cfg.StrOpt("amqp_url",
                       help="the amqp transport url",
                       default=None,
                       required=True),
            cfg.StrOpt("amqp_exchange",
                       help="the amqp exchange",
                       default="keystone",
                       required=False),
            cfg.StrOpt("amqp_topic",
                       help="the notification topic",
                       default="keystone_notification",
                       required=False)
        ]