Python amulet 模块,FAIL 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用amulet.FAIL

项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_100_services(self):
        """Check that the expected services run on the cinder and keystone units.

        Builds a unit -> service-name-list mapping and passes it to
        u.validate_services_by_name; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        expected = {
            self.cinder_sentry: ['cinder-scheduler',
                                 'cinder-volume']
        }
        # NOTE(review): the assignment below replaces the list above instead
        # of extending it, so only the API service name ends up being
        # validated for the cinder unit -- confirm that this is intended.
        on_ocata_or_later = (
            self._get_openstack_release() >= self.xenial_ocata)
        expected[self.cinder_sentry] = (
            ['apache2'] if on_ocata_or_later else ['cinder-api'])

        # Every keystone unit is mapped to the same init_services value.
        for index in range(self.keystone_num_units):
            expected[self.keystone_sentries[index]] = self.init_services

        failure = u.validate_services_by_name(expected)
        if failure:
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def validate_keystone_tenants(self, client):
        """Compare the tenants/projects known to keystone with the expected set.

        :param client: keystone client; ``client.tenants`` is listed when
            ``self.keystone_api_version`` is 2, ``client.projects`` otherwise.

        A truthy result from u.validate_tenant_data is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone tenants...')

        def _entry(name, description):
            # All expected entries share the same shape; only the name and
            # description vary.
            return {'name': name,
                    'enabled': True,
                    'description': description,
                    'id': u.not_null}

        wanted = [
            _entry('services', 'Created by Juju'),
            _entry('demoTenant', 'demo tenant'),
            _entry('admin', 'Created by Juju'),
        ]

        if self.keystone_api_version == 2:
            manager = client.tenants
        else:
            manager = client.projects
        found = manager.list()

        failure = u.validate_tenant_data(wanted, found)
        if failure:
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_138_service_catalog(self):
        """Validate the endpoint entries of the keystone v2 service catalog.

        Switches to API version 2, fetches the catalog endpoints from
        self.keystone_v2 and checks them with
        u.validate_svc_catalog_endpoint_data; a truthy result is reported
        through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone service catalog...')
        self.set_api_version(2)

        endpoint_spec = {
            'adminURL': u.valid_url,
            'id': u.not_null,
            'region': 'RegionOne',
            'publicURL': u.valid_url,
            'internalURL': u.valid_url
        }
        catalog_spec = {
            'volume': [endpoint_spec],
            'identity': [endpoint_spec]
        }
        if self._get_openstack_release() >= self.xenial_pike:
            # From xenial_pike on, 'volumev2' is expected instead of 'volume'.
            del catalog_spec['volume']
            catalog_spec['volumev2'] = [endpoint_spec]

        catalog = self.keystone_v2.service_catalog.get_endpoints()
        failure = u.validate_svc_catalog_endpoint_data(catalog_spec, catalog)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_140_keystone_endpoint(self):
        """Validate the keystone entry in the keystone v2 endpoint list.

        The endpoint list is checked with u.validate_endpoint_data against
        admin port '35357' and internal/public port '5000'; a truthy result
        is reported through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone api endpoint data...')
        endpoint_list = self.keystone_v2.endpoints.list()

        admin = '35357'
        internal = '5000'
        public = internal
        endpoint_spec = {
            'id': u.not_null,
            'region': 'RegionOne',
            'adminurl': u.valid_url,
            'internalurl': u.valid_url,
            'publicurl': u.valid_url,
            'service_id': u.not_null
        }

        failure = u.validate_endpoint_data(
            endpoint_list, admin, internal, public, endpoint_spec)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL,
                            msg='keystone endpoint: {}'.format(failure))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_142_cinder_endpoint(self):
        """Validate the cinder entry in the keystone v2 endpoint list.

        The endpoint list is checked with u.validate_endpoint_data using
        port '8776' for the admin, internal and public endpoints; a truthy
        result is reported through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking cinder endpoint...')
        endpoint_list = self.keystone_v2.endpoints.list()

        # The same port is used for all three endpoint types.
        port = '8776'
        endpoint_spec = {
            'id': u.not_null,
            'region': 'RegionOne',
            'adminurl': u.valid_url,
            'internalurl': u.valid_url,
            'publicurl': u.valid_url,
            'service_id': u.not_null
        }

        failure = u.validate_endpoint_data(
            endpoint_list, port, port, port, endpoint_spec)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL,
                            msg='cinder endpoint: {}'.format(failure))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_202_keystone_cinder_identity_service_relation(self):
        """Validate identity-service relation data on every keystone unit.

        Each unit in self.keystone_sentries is checked with
        u.validate_relation_data; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone to cinder id relation data...')
        relation = ['identity-service', 'cinder:identity-service']
        wanted = {
            'service_protocol': 'http',
            'service_tenant': 'services',
            'admin_token': 'ubuntutesting',
            'service_password': u.not_null,
            'service_port': '5000',
            'auth_port': '35357',
            'auth_protocol': 'http',
            'private-address': u.valid_ip,
            'auth_host': u.valid_ip,
            'service_username': 'cinder_cinderv2',
            'service_tenant_id': u.not_null,
            'service_host': u.valid_ip
        }
        if self._get_openstack_release() >= self.xenial_pike:
            # A different service user name is expected from xenial_pike on.
            wanted.update(service_username='cinderv3_cinderv2')

        for sentry in self.keystone_sentries:
            failure = u.validate_relation_data(sentry, relation, wanted)
            if not failure:
                continue
            amulet.raise_status(
                amulet.FAIL,
                msg=u.relation_error('keystone identity-service', failure))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_302_keystone_logging_config(self):
        """Verify the data in the keystone logging config file.

        For every keystone unit, each expected section of
        self.logging_config is checked with u.validate_config_data; a truthy
        result is reported through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone config file...')
        expected = {
            'logger_root': {
                'level': 'WARNING',
                'handlers': 'file,production',
            },
            'handlers': {
                'keys': 'production,file,devel'
            },
            'handler_file': {
                'level': 'DEBUG',
                'args': "('{}', 'a')".format(self.log_file)
            }
        }

        for unit in self.keystone_sentries:
            # dict.items() exists on both Python 2 and 3, whereas
            # dict.iteritems() was removed in Python 3.
            for section, pairs in expected.items():
                ret = u.validate_config_data(unit, self.logging_config,
                                             section, pairs)
                if ret:
                    message = "keystone logging config error: {}".format(ret)
                    amulet.raise_status(amulet.FAIL, msg=message)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_100_ceph_processes(self):
        """Check the number of ceph-mon processes on each of the three ceph units.

        PIDs are gathered with u.get_unit_process_ids and checked with
        u.validate_unit_process_ids; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        # Process name -> number of processes expected on a unit.
        per_unit = {
            'ceph-mon': 1
        }

        # All three ceph units share the same expectation.
        ceph_units = (self.ceph0_sentry, self.ceph1_sentry, self.ceph2_sentry)
        wanted = {sentry: per_unit for sentry in ceph_units}

        found_pids = u.get_unit_process_ids(wanted)
        failure = u.validate_unit_process_ids(wanted, found_pids)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_306_nova_rbd_config(self):
        """Verify the nova config file data regarding ceph.

        Each expected section of /etc/nova/nova.conf on self.nova_sentry is
        checked with u.validate_config_data; a truthy result is reported
        through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking nova (rbd) config file data...')
        unit = self.nova_sentry
        conf = '/etc/nova/nova.conf'
        expected = {
            'libvirt': {
                'rbd_user': 'nova-compute',
                'rbd_secret_uuid': u.not_null
            }
        }
        # dict.items() exists on both Python 2 and 3, whereas
        # dict.iteritems() was removed in Python 3.
        for section, pairs in expected.items():
            ret = u.validate_config_data(unit, conf, section, pairs)
            if ret:
                message = "nova (rbd) config error: {}".format(ret)
                amulet.raise_status(amulet.FAIL, msg=message)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_402_pause_resume_actions(self):
        """Verify that the pause-health and resume-health actions work.

        After 'pause-health' the output of "ceph -s" on self.ceph0_sentry
        must contain both 'nodown' and 'noout'; after 'resume-health' it
        must contain neither.  Violations are reported through
        amulet.raise_status with amulet.FAIL.
        """
        status_cmd = "ceph -s"
        unit = self.ceph0_sentry

        u.log.debug("Testing pause")
        pause_id = u.run_action(unit, 'pause-health')
        assert u.wait_on_action(pause_id), "Pause health action failed."

        status, _ = unit.run(status_cmd)
        if not ('nodown' in status and 'noout' in status):
            amulet.raise_status(amulet.FAIL, msg="Missing noout,nodown")

        u.log.debug("Testing resume")
        resume_id = u.run_action(unit, 'resume-health')
        assert u.wait_on_action(resume_id), "Resume health action failed."

        status, _ = unit.run(status_cmd)
        if 'nodown' in status or 'noout' in status:
            amulet.raise_status(amulet.FAIL, msg="Still has noout,nodown")
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def find_pool(sentry_unit, pool_name):
        """
        Run "ceph osd dump" on a unit and search the output for a pool.

        :param sentry_unit: The unit to run this command from.
        :param pool_name: str.  The name of the Ceph pool to query
        :return: str or None.  The matching line of the dump output, or
                 None if no line names the requested pool
        """
        output, dump_code = sentry_unit.run("ceph osd dump")
        # Compare by value: "is not 0" tests object identity and only works
        # by accident of CPython's small-integer caching.
        if dump_code != 0:
            amulet.raise_status(
                amulet.FAIL,
                msg="ceph osd dump failed with output: {}".format(
                    output))
        # Compile once instead of looking the pattern up for every line.
        pool_re = re.compile(r"pool\s+\d+\s+'(?P<pool_name>.*)'")
        for line in output.split('\n'):
            match = pool_re.search(line)
            if match and match.group('pool_name') == pool_name:
                return line
        return None
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_499_ceph_cmds_exit_zero(self):
        """Run a set of basic ceph CLI commands against all three ceph units.

        The commands are checked with u.check_commands_on_units; a truthy
        result is reported through amulet.raise_status with amulet.FAIL.
        """
        ceph_units = [self.ceph0_sentry, self.ceph1_sentry, self.ceph2_sentry]
        cli_checks = ['sudo ceph {}'.format(sub) for sub in (
            'health',
            'mds stat',
            'pg stat',
            'osd stat',
            'mon stat',
        )]
        failure = u.check_commands_on_units(cli_checks, ceph_units)
        if failure:
            amulet.raise_status(amulet.FAIL, msg=failure)

            # FYI: No restart check as ceph services do not restart
            # when charm config changes, unless monitor count increases.
项目:charm-odl-controller    作者:openstack    | 项目源码 | 文件源码
def test_102_service_catalog(self):
        """Validate the endpoint entries of the keystone service catalog.

        The network, compute and identity entries returned by
        self.keystone.service_catalog.get_endpoints() are checked with
        u.validate_svc_catalog_endpoint_data; a truthy result is reported
        through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone service catalog...')
        endpoint_spec = {
            'adminURL': u.valid_url,
            'id': u.not_null,
            'region': 'RegionOne',
            'publicURL': u.valid_url,
            'internalURL': u.valid_url
        }
        catalog_spec = {
            'network': [endpoint_spec],
            'compute': [endpoint_spec],
            'identity': [endpoint_spec]
        }
        catalog = self.keystone.service_catalog.get_endpoints()

        failure = u.validate_svc_catalog_endpoint_data(catalog_spec, catalog)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-odl-controller    作者:openstack    | 项目源码 | 文件源码
def test_104_network_endpoint(self):
        """Verify the neutron network endpoint data.

        The keystone endpoint list is checked with u.validate_endpoint_data
        using port '9696' for the admin, internal and public endpoints; a
        truthy result is reported through amulet.raise_status with
        amulet.FAIL.
        """
        u.log.debug('Checking neutron network api endpoint data...')
        endpoints = self.keystone.endpoints.list()
        admin_port = internal_port = public_port = '9696'
        expected = {
            'id': u.not_null,
            'region': 'RegionOne',
            'adminurl': u.valid_url,
            'internalurl': u.valid_url,
            'publicurl': u.valid_url,
            'service_id': u.not_null
        }
        ret = u.validate_endpoint_data(endpoints, admin_port, internal_port,
                                       public_port, expected)

        if ret:
            # The failure label names the endpoint actually under test
            # (it previously said 'glance endpoint').
            amulet.raise_status(
                amulet.FAIL,
                msg='neutron network endpoint: {}'.format(ret))
项目:charm-odl-controller    作者:openstack    | 项目源码 | 文件源码
def test_200_odl_controller_controller_api_relation(self):
        """Validate the controller-api relation data on the odl-controller unit.

        The relation towards neutron-api-odl is checked with
        u.validate_relation_data; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking odl-controller to neutron-api-odl relation data')
        sentry = self.odl_controller_sentry
        relation = ['controller-api', 'neutron-api-odl:odl-controller']
        wanted = {
            'private-address': u.valid_ip,
            'username': 'admin',
            'password': 'admin',
            'port': '8080',
        }

        failure = u.validate_relation_data(sentry, relation, wanted)
        if not failure:
            return
        amulet.raise_status(
            amulet.FAIL,
            msg=u.relation_error('odl-controller controller-api', failure))
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def test_100_services(self):
        """Check that the expected services run on the cinder and keystone units.

        Builds a unit -> service-name-list mapping and passes it to
        u.validate_services_by_name; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        expected = {
            self.cinder_sentry: ['cinder-scheduler',
                                 'cinder-volume']
        }
        # NOTE(review): the assignment below replaces the list above instead
        # of extending it, so only the API service name ends up being
        # validated for the cinder unit -- confirm that this is intended.
        on_ocata_or_later = (
            self._get_openstack_release() >= self.xenial_ocata)
        expected[self.cinder_sentry] = (
            ['apache2'] if on_ocata_or_later else ['cinder-api'])

        if not self.is_liberty_or_newer():
            # Before liberty only the first keystone unit is checked.
            expected[self.keystone_sentries[0]] = ['keystone']
        else:
            for index in range(self.keystone_num_units):
                expected[self.keystone_sentries[index]] = ['apache2']

        failure = u.validate_services_by_name(expected)
        if failure:
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def validate_keystone_tenants(self, client):
        """Compare the tenants/projects known to keystone with the expected set.

        :param client: keystone client; ``client.tenants`` is listed when
            ``self.keystone_api_version`` is 2, ``client.projects`` otherwise.

        A truthy result from u.validate_tenant_data is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone tenants...')

        def _entry(name, description):
            # All expected entries share the same shape; only the name and
            # description vary.
            return {'name': name,
                    'enabled': True,
                    'description': description,
                    'id': u.not_null}

        wanted = [
            _entry('services', 'Created by Juju'),
            _entry('demoTenant', 'demo tenant'),
            _entry('admin', 'Created by Juju'),
        ]

        if self.keystone_api_version == 2:
            manager = client.tenants
        else:
            manager = client.projects
        found = manager.list()

        failure = u.validate_tenant_data(wanted, found)
        if failure:
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def test_138_service_catalog(self):
        """Validate the endpoint entries of the keystone v2 service catalog.

        Switches to API version 2, fetches the catalog endpoints from
        self.keystone_v2 and checks the volume and identity entries with
        u.validate_svc_catalog_endpoint_data; a truthy result is reported
        through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone service catalog...')
        self.set_api_version(2)

        endpoint_spec = {
            'adminURL': u.valid_url,
            'id': u.not_null,
            'region': 'RegionOne',
            'publicURL': u.valid_url,
            'internalURL': u.valid_url
        }
        catalog_spec = {
            'volume': [endpoint_spec],
            'identity': [endpoint_spec]
        }

        catalog = self.keystone_v2.service_catalog.get_endpoints()
        failure = u.validate_svc_catalog_endpoint_data(catalog_spec, catalog)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def test_140_keystone_endpoint(self):
        """Validate the keystone entry in the keystone v2 endpoint list.

        The endpoint list is checked with u.validate_endpoint_data against
        admin port '35357' and internal/public port '5000'; a truthy result
        is reported through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone api endpoint data...')
        endpoint_list = self.keystone_v2.endpoints.list()

        admin = '35357'
        internal = '5000'
        public = internal
        endpoint_spec = {
            'id': u.not_null,
            'region': 'RegionOne',
            'adminurl': u.valid_url,
            'internalurl': u.valid_url,
            'publicurl': u.valid_url,
            'service_id': u.not_null
        }

        failure = u.validate_endpoint_data(
            endpoint_list, admin, internal, public, endpoint_spec)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL,
                            msg='keystone endpoint: {}'.format(failure))
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def test_142_cinder_endpoint(self):
        """Validate the cinder entry in the keystone v2 endpoint list.

        The endpoint list is checked with u.validate_endpoint_data using
        port '8776' for the admin, internal and public endpoints; a truthy
        result is reported through amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking cinder endpoint...')
        endpoint_list = self.keystone_v2.endpoints.list()

        # The same port is used for all three endpoint types.
        port = '8776'
        endpoint_spec = {
            'id': u.not_null,
            'region': 'RegionOne',
            'adminurl': u.valid_url,
            'internalurl': u.valid_url,
            'publicurl': u.valid_url,
            'service_id': u.not_null
        }

        failure = u.validate_endpoint_data(
            endpoint_list, port, port, port, endpoint_spec)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL,
                            msg='cinder endpoint: {}'.format(failure))
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def test_202_keystone_cinder_identity_service_relation(self):
        """Validate identity-service relation data on every keystone unit.

        Each unit in self.keystone_sentries is checked with
        u.validate_relation_data; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone to cinder id relation data...')
        relation = ['identity-service', 'cinder:identity-service']
        wanted = {
            'service_protocol': 'http',
            'service_tenant': 'services',
            'admin_token': 'ubuntutesting',
            'service_password': u.not_null,
            'service_port': '5000',
            'auth_port': '35357',
            'auth_protocol': 'http',
            'private-address': u.valid_ip,
            'auth_host': u.valid_ip,
            'service_username': 'cinder_cinderv2',
            'service_tenant_id': u.not_null,
            'service_host': u.valid_ip
        }

        for sentry in self.keystone_sentries:
            failure = u.validate_relation_data(sentry, relation, wanted)
            if not failure:
                continue
            amulet.raise_status(
                amulet.FAIL,
                msg=u.relation_error('keystone identity-service', failure))
项目:equlipse    作者:konono    | 项目源码 | 文件源码
def test_203_cinder_keystone_identity_service_relation(self):
        """Validate the identity-service relation data on the cinder unit.

        The relation towards keystone is checked with
        u.validate_relation_data; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking cinder to keystone id relation data...')
        sentry = self.cinder_sentry
        relation = ['identity-service', 'keystone:identity-service']
        wanted = {
            'cinder_service': 'cinder',
            'cinder_region': 'RegionOne',
            'cinder_public_url': u.valid_url,
            'cinder_internal_url': u.valid_url,
            'cinder_admin_url': u.valid_url,
            'cinderv2_service': 'cinderv2',
            'cinderv2_region': 'RegionOne',
            'cinderv2_public_url': u.valid_url,
            'cinderv2_internal_url': u.valid_url,
            'cinderv2_admin_url': u.valid_url,
            'private-address': u.valid_ip,
        }

        failure = u.validate_relation_data(sentry, relation, wanted)
        if not failure:
            return
        amulet.raise_status(
            amulet.FAIL,
            msg=u.relation_error('cinder identity-service', failure))
项目:charm-openvswitch-odl    作者:openstack    | 项目源码 | 文件源码
def test_services(self):
        """Check that the expected services run on the compute, gateway and
        odl-controller units.

        The unit -> service-name-list mapping is passed to
        u.validate_services_by_name; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        expected = {
            self.compute_sentry: ['nova-compute',
                                  'openvswitch-switch'],
            self.gateway_sentry: ['openvswitch-switch',
                                  'neutron-dhcp-agent',
                                  'neutron-l3-agent',
                                  'neutron-metadata-agent',
                                  'neutron-metering-agent',
                                  'neutron-lbaas-agent',
                                  'nova-api-metadata'],
            self.odl_controller_sentry: ['odl-controller'],
        }

        if self._get_openstack_release() >= self.xenial_newton:
            # From xenial_newton on, the v2 lbaas agent is expected instead.
            gateway_services = expected[self.gateway_sentry]
            gateway_services.remove('neutron-lbaas-agent')
            gateway_services.append('neutron-lbaasv2-agent')

        failure = u.validate_services_by_name(expected)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-neutron-api-odl    作者:openstack    | 项目源码 | 文件源码
def test_services(self):
        """Check that the expected services run on the compute, gateway and
        odl-controller units.

        The unit -> service-name-list mapping is passed to
        u.validate_services_by_name; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        expected = {
            self.compute_sentry: ['nova-compute',
                                  'openvswitch-switch'],
            self.gateway_sentry: ['openvswitch-switch',
                                  'neutron-dhcp-agent',
                                  'neutron-l3-agent',
                                  'neutron-metadata-agent',
                                  'neutron-metering-agent',
                                  'neutron-lbaas-agent',
                                  'nova-api-metadata'],
            self.odl_controller_sentry: ['odl-controller'],
        }

        if self._get_openstack_release() >= self.xenial_newton:
            # From xenial_newton on, the v2 lbaas agent is expected instead.
            gateway_services = expected[self.gateway_sentry]
            gateway_services.remove('neutron-lbaas-agent')
            gateway_services.append('neutron-lbaasv2-agent')

        failure = u.validate_services_by_name(expected)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        :param this_service: dict describing the local charm under test.
            Must have a 'name' key; 'units' (defaults to 1 and is written
            back into the dict) and 'constraints' are optional.
        :param other_services: iterable of dicts for the other services
            used by the local amulet tests.  Same keys as this_service,
            plus an optional 'location' key naming the charm to deploy.

        Reports amulet.FAIL through amulet.raise_status when the base name
        of the current working directory differs from this_service['name'].
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma after this
                # expression would turn it into a one-element tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment and wait on its sentry.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (900 when unset) and applied to both steps.
        An amulet.helpers.TimeoutError is reported through
        amulet.raise_status with amulet.FAIL; any other exception
        propagates to the caller.
        """
        limit = int(os.environ.get('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        :param this_service: dict describing the local charm under test.
            Must have a 'name' key; 'units' (defaults to 1 and is written
            back into the dict) and 'constraints' are optional.
        :param other_services: iterable of dicts for the other services
            used by the local amulet tests.  Same keys as this_service,
            plus an optional 'location' key naming the charm to deploy.

        Reports amulet.FAIL through amulet.raise_status when the base name
        of the current working directory differs from this_service['name'].
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma after this
                # expression would turn it into a one-element tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment and wait on its sentry.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (900 when unset) and applied to both steps.
        An amulet.helpers.TimeoutError is reported through
        amulet.raise_status with amulet.FAIL; any other exception
        propagates to the caller.
        """
        limit = int(os.environ.get('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        :param this_service: dict describing the local charm under test.
            Must have a 'name' key; 'units' (defaults to 1 and is written
            back into the dict) and 'constraints' are optional.
        :param other_services: iterable of dicts for the other services
            used by the local amulet tests.  Same keys as this_service,
            plus an optional 'location' key naming the charm to deploy.

        Reports amulet.FAIL through amulet.raise_status when the base name
        of the current working directory differs from this_service['name'].
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma after this
                # expression would turn it into a one-element tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment and wait on its sentry.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (900 when unset) and applied to both steps.
        An amulet.helpers.TimeoutError is reported through
        amulet.raise_status with amulet.FAIL; any other exception
        propagates to the caller.
        """
        limit = int(os.environ.get('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def validate_keystone_roles(self, client):
        """Compare the roles known to keystone with the expected set.

        :param client: keystone client whose ``roles`` manager is listed.

        A truthy result from u.validate_role_data is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking keystone roles...')
        wanted = [{'name': role_name, 'id': u.not_null}
                  for role_name in ('demoRole', 'Admin')]
        found = client.roles.list()

        failure = u.validate_role_data(wanted, found)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_121_keystone_demo_domain_admin_access(self):
        """Check that the end-user domain admin cannot list domains.

        Guards against regressions like LP#1651989.  Does nothing unless
        self.is_mitaka_or_newer() is true.  If listing domains succeeds,
        amulet.FAIL is reported through amulet.raise_status.
        """
        if not self.is_mitaka_or_newer():
            return

        u.log.debug('Checking keystone end-user domain admin access...')
        self.set_api_version(3)

        # Authenticate against the first keystone unit as the end-user
        # domain admin.
        keystone_address = self.keystone_sentries[0].info['public-address']
        client = u.authenticate_keystone(
            keystone_address,
            username=self.demo_domain_admin,
            password='password',
            api_version=3,
            user_domain_name=self.demo_domain,
            domain_name=self.demo_domain,
        )

        try:
            # This call is expected to be rejected.
            client.domains.list()
        except Exception as e:
            u.log.debug('Retrieve domain list as end-user domain admin '
                        'NOT allowed...OK ({})'.format(e))
        else:
            failure = ('Retrieve domain list as end-user domain admin '
                       'allowed')
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_122_keystone_project_scoped_admin_access(self):
        """Check that admin in admin_domain can list domains with a project
        scoped token.

        Does nothing unless self.is_mitaka_or_newer() is true.  If listing
        domains raises, amulet.FAIL is reported through amulet.raise_status.
        """
        if not self.is_mitaka_or_newer():
            return

        u.log.debug('Checking keystone project scoped admin access...')
        self.set_api_version(3)

        # Authenticate against the first keystone unit as user 'admin' of
        # 'admin_domain', scoped to project 'admin'.
        keystone_address = self.keystone_sentries[0].info['public-address']
        client = u.authenticate_keystone(
            keystone_address,
            username='admin',
            password='openstack',
            api_version=3,
            admin_port=True,
            user_domain_name='admin_domain',
            project_domain_name='admin_domain',
            project_name='admin',
        )

        try:
            client.domains.list()
            u.log.debug('OK')
        except Exception as e:
            failure = ('Retrieve domain list as admin with project scoped '
                       'token FAILED. ({})'.format(e))
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_201_mysql_keystone_shared_db_relation(self):
        """Validate the shared-db relation data on self.pxc_sentry.

        The relation towards keystone is checked with
        u.validate_relation_data; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking mysql to keystone db relation data...')
        sentry = self.pxc_sentry
        relation = ['shared-db', 'keystone:shared-db']
        wanted = {
            'private-address': u.valid_ip,
            'password': u.not_null,
            'db_host': u.valid_ip
        }

        failure = u.validate_relation_data(sentry, relation, wanted)
        if not failure:
            return
        amulet.raise_status(amulet.FAIL,
                            msg=u.relation_error('mysql shared-db', failure))
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def test_203_cinder_keystone_identity_service_relation(self):
        """Verify the cinder identity-service relation data.

        The relation from self.cinder_sentry towards keystone is checked
        with u.validate_relation_data; a truthy result is reported through
        amulet.raise_status with amulet.FAIL.
        """
        u.log.debug('Checking cinder to keystone id relation data...')
        unit = self.cinder_sentry
        relation = ['identity-service', 'keystone:identity-service']
        expected = {
            'cinder_service': 'cinder',
            'cinder_region': 'RegionOne',
            'cinder_public_url': u.valid_url,
            'cinder_internal_url': u.valid_url,
            'cinder_admin_url': u.valid_url,
            'cinderv2_service': 'cinderv2',
            'cinderv2_region': 'RegionOne',
            'cinderv2_public_url': u.valid_url,
            'cinderv2_internal_url': u.valid_url,
            'cinderv2_admin_url': u.valid_url,
            'private-address': u.valid_ip,
        }

        if self._get_openstack_release() >= self.xenial_pike:
            # From xenial_pike on, the 'cinder_*' keys are no longer
            # expected and 'cinderv3_*' keys are expected in addition to the
            # unchanged 'cinderv2_*' ones.
            for key in ('cinder_region',
                        'cinder_service',
                        'cinder_public_url',
                        'cinder_admin_url',
                        'cinder_internal_url'):
                expected.pop(key)
            # Each key is listed once: the previous literal repeated
            # 'cinderv3_region' and re-set 'cinderv2_region' to the value
            # it already had.
            expected.update({
                'cinderv3_region': 'RegionOne',
                'cinderv3_service': 'cinderv3',
                'cinderv3_public_url': u.valid_url,
                'cinderv3_internal_url': u.valid_url,
                'cinderv3_admin_url': u.valid_url})

        ret = u.validate_relation_data(unit, relation, expected)
        if ret:
            message = u.relation_error('cinder identity-service', ret)
            amulet.raise_status(amulet.FAIL, msg=message)
项目:charm-nova-compute    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        ``this_service`` is the local charm that we're testing and
        ``other_services`` are the other services used by the local amulet
        tests.

        :param this_service: dict with a 'name' key and optional 'units'
            and 'constraints' keys.  A missing 'units' key is filled in
            with 1 (the dict is modified in place).
        :param other_services: iterable of dicts shaped like
            ``this_service``; each may also carry a 'location' key that is
            passed through as the charm location.  A missing 'units' key
            is filled in with 1 (in place).

        If ``this_service['name']`` differs from the basename of the
        current working directory, ``amulet.FAIL`` is reported through
        ``amulet.raise_status``.
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma on this line
                # would turn the value into a 1-tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-nova-compute    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment, then wait for all hooks to finish.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (default 900) and is applied to both the
        setup call and the sentry wait.  An amulet timeout is reported as
        ``amulet.FAIL``; any other exception propagates unchanged.
        """
        limit = int(os.getenv('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        ``this_service`` is the local charm that we're testing and
        ``other_services`` are the other services used by the local amulet
        tests.

        :param this_service: dict with a 'name' key and optional 'units'
            and 'constraints' keys.  A missing 'units' key is filled in
            with 1 (the dict is modified in place).
        :param other_services: iterable of dicts shaped like
            ``this_service``; each may also carry a 'location' key that is
            passed through as the charm location.  A missing 'units' key
            is filled in with 1 (in place).

        If ``this_service['name']`` differs from the basename of the
        current working directory, ``amulet.FAIL`` is reported through
        ``amulet.raise_status``.
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma on this line
                # would turn the value into a 1-tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment, then wait for all hooks to finish.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (default 900) and is applied to both the
        setup call and the sentry wait.  An amulet timeout is reported as
        ``amulet.FAIL``; any other exception propagates unchanged.
        """
        limit = int(os.getenv('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-glance    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        ``this_service`` is the local charm that we're testing and
        ``other_services`` are the other services used by the local amulet
        tests.

        :param this_service: dict with a 'name' key and optional 'units'
            and 'constraints' keys.  A missing 'units' key is filled in
            with 1 (the dict is modified in place).
        :param other_services: iterable of dicts shaped like
            ``this_service``; each may also carry a 'location' key that is
            passed through as the charm location.  A missing 'units' key
            is filled in with 1 (in place).

        If ``this_service['name']`` differs from the basename of the
        current working directory, ``amulet.FAIL`` is reported through
        ``amulet.raise_status``.
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma on this line
                # would turn the value into a 1-tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-glance    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment, then wait for all hooks to finish.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (default 900) and is applied to both the
        setup call and the sentry wait.  An amulet timeout is reported as
        ``amulet.FAIL``; any other exception propagates unchanged.
        """
        limit = int(os.getenv('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-neutron-api    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        ``this_service`` is the local charm that we're testing and
        ``other_services`` are the other services used by the local amulet
        tests.

        :param this_service: dict with a 'name' key and optional 'units'
            and 'constraints' keys.  A missing 'units' key is filled in
            with 1 (the dict is modified in place).
        :param other_services: iterable of dicts shaped like
            ``this_service``; each may also carry a 'location' key that is
            passed through as the charm location.  A missing 'units' key
            is filled in with 1 (in place).

        If ``this_service['name']`` differs from the basename of the
        current working directory, ``amulet.FAIL`` is reported through
        ``amulet.raise_status``.
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma on this line
                # would turn the value into a 1-tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-neutron-api    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment, then wait for all hooks to finish.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (default 900) and is applied to both the
        setup call and the sentry wait.  An amulet timeout is reported as
        ``amulet.FAIL``; any other exception propagates unchanged.
        """
        limit = int(os.getenv('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def _add_services(self, this_service, other_services):
        """Add services to the deployment.

        ``this_service`` is the local charm that we're testing and
        ``other_services`` are the other services used by the local amulet
        tests.

        :param this_service: dict with a 'name' key and optional 'units'
            and 'constraints' keys.  A missing 'units' key is filled in
            with 1 (the dict is modified in place).
        :param other_services: iterable of dicts shaped like
            ``this_service``; each may also carry a 'location' key that is
            passed through as the charm location.  A missing 'units' key
            is filled in with 1 (in place).

        If ``this_service['name']`` differs from the basename of the
        current working directory, ``amulet.FAIL`` is reported through
        ``amulet.raise_status``.
        """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        this_service.setdefault('units', 1)

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                # Must be a plain string: a trailing comma on this line
                # would turn the value into a 1-tuple.
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            svc.setdefault('units', 1)

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def _deploy(self):
        """Set up the deployment, then wait for all hooks to finish.

        The timeout in seconds is read from the AMULET_SETUP_TIMEOUT
        environment variable (default 900) and is applied to both the
        setup call and the sentry wait.  An amulet timeout is reported as
        ``amulet.FAIL``; any other exception propagates unchanged.
        """
        limit = int(os.getenv('AMULET_SETUP_TIMEOUT', 900))
        try:
            self.d.setup(timeout=limit)
            self.d.sentry.wait(timeout=limit)
        except amulet.helpers.TimeoutError:
            failure = "Deployment timed out ({}s)".format(limit)
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_102_services(self):
        """Check that each service unit runs the services expected of it.

        The expectation table is adjusted according to the value returned
        by ``self._get_openstack_release()`` and then handed to
        ``u.validate_services_by_name``; a truthy result is reported as
        ``amulet.FAIL``.
        """
        wanted = {
            self.rabbitmq_sentry: ['rabbitmq-server'],
            self.nova_sentry: ['nova-compute'],
            self.keystone_sentry: ['keystone'],
            self.glance_sentry: ['glance-registry', 'glance-api'],
            self.cinder_sentry: ['cinder-scheduler', 'cinder-volume'],
        }

        # cinder-api is only expected on releases before xenial_ocata.
        if self._get_openstack_release() < self.xenial_ocata:
            wanted[self.cinder_sentry].append('cinder-api')

        if self._get_openstack_release() < self.xenial_mitaka:
            # For upstart systems only.  Ceph services under systemd
            # are checked by process name instead.
            mon_services = ['ceph-mon-all', 'ceph-mon id=`hostname`']
            wanted[self.ceph0_sentry] = mon_services
            wanted[self.ceph1_sentry] = mon_services
            wanted[self.ceph2_sentry] = mon_services

            wanted[self.ceph_osd_sentry] = [
                'ceph-osd id={}'.format(u.get_ceph_osd_id_cmd(osd_index))
                for osd_index in (0, 1)
            ]

        # From trusty_liberty onwards keystone is expected as apache2.
        if self._get_openstack_release() >= self.trusty_liberty:
            wanted[self.keystone_sentry] = ['apache2']

        failure = u.validate_services_by_name(wanted)
        if failure:
            amulet.raise_status(amulet.FAIL, msg=failure)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_200_ceph_nova_client_relation(self):
        """Check the ceph to nova ceph-client relation data.

        Validates the 'client' relation of ``self.ceph0_sentry`` towards
        'nova-compute:ceph'; a validation error is reported as
        ``amulet.FAIL``.
        """
        u.log.debug('Checking ceph:nova-compute ceph-mon relation data...')
        sentry = self.ceph0_sentry
        endpoints = ['client', 'nova-compute:ceph']
        wanted = {
            'private-address': u.valid_ip,
            'auth': 'none',
            'key': u.not_null
        }

        problem = u.validate_relation_data(sentry, endpoints, wanted)
        if not problem:
            return
        amulet.raise_status(
            amulet.FAIL,
            msg=u.relation_error('ceph-mon to nova ceph-client', problem))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_202_ceph_glance_client_relation(self):
        """Check the ceph to glance ceph-client relation data.

        Validates the 'client' relation of ``self.ceph1_sentry`` towards
        'glance:ceph'; a validation error is reported as ``amulet.FAIL``.
        """
        u.log.debug('Checking ceph:glance client relation data...')
        sentry = self.ceph1_sentry
        endpoints = ['client', 'glance:ceph']
        wanted = {
            'private-address': u.valid_ip,
            'auth': 'none',
            'key': u.not_null
        }

        problem = u.validate_relation_data(sentry, endpoints, wanted)
        if not problem:
            return
        amulet.raise_status(
            amulet.FAIL,
            msg=u.relation_error('ceph to glance ceph-client', problem))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_203_glance_ceph_client_relation(self):
        """Check the glance to ceph client relation data.

        Validates the 'ceph' relation of ``self.glance_sentry`` towards
        'ceph-mon:client'; only 'private-address' is checked.  A
        validation error is reported as ``amulet.FAIL``.
        """
        u.log.debug('Checking glance:ceph client relation data...')
        sentry = self.glance_sentry
        endpoints = ['ceph', 'ceph-mon:client']
        wanted = {'private-address': u.valid_ip}

        problem = u.validate_relation_data(sentry, endpoints, wanted)
        if not problem:
            return
        amulet.raise_status(
            amulet.FAIL,
            msg=u.relation_error('glance to ceph ceph-client', problem))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def test_204_ceph_cinder_client_relation(self):
        """Check the ceph to cinder ceph-client relation data.

        Validates the 'client' relation of ``self.ceph2_sentry`` towards
        'cinder-ceph:ceph'; a validation error is reported as
        ``amulet.FAIL``.
        """
        u.log.debug('Checking ceph:cinder ceph relation data...')
        sentry = self.ceph2_sentry
        endpoints = ['client', 'cinder-ceph:ceph']
        wanted = {
            'private-address': u.valid_ip,
            'auth': 'none',
            'key': u.not_null
        }

        problem = u.validate_relation_data(sentry, endpoints, wanted)
        if not problem:
            return
        amulet.raise_status(
            amulet.FAIL,
            msg=u.relation_error('ceph to cinder ceph-client', problem))