Python oslo_utils.uuidutils 模块,generate_uuid() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_utils.uuidutils.generate_uuid()

项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_add_another_offset(self):
        """Add one offset spec and check it is stored for its app name.

        The spec is built from random values, added through
        ``self.kafka_offset_specs.add`` and read back with
        ``get_kafka_offsets``; exactly one entry is expected.
        """
        topic = uuidutils.generate_uuid()
        partition = random.randint(0, 1024)
        until_offset = random.randint(0, sys.maxsize)
        from_offset = random.randint(0, sys.maxsize)
        app_name = uuidutils.generate_uuid()
        offset_key = "%s_%s_%s" % (app_name, topic, partition)
        batch_time = self.get_dummy_batch_time()

        self.kafka_offset_specs.add(
            topic=topic,
            partition=partition,
            app_name=app_name,
            from_offset=from_offset,
            until_offset=until_offset,
            batch_time_info=batch_time)

        # The values the stored spec is compared against.
        expected_spec = {
            "topic": topic,
            "partition": partition,
            "app_name": app_name,
            "from_offset": from_offset,
            "until_offset": until_offset,
        }

        stored_specs = self.kafka_offset_specs.get_kafka_offsets(app_name)
        self.assertions_on_offset(
            used_value=expected_spec,
            offset_value=stored_specs.get(offset_key))

        refetched_specs = self.kafka_offset_specs.get_kafka_offsets(app_name)
        self.assertEqual(1, len(refetched_specs))
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_send_magic_packets(self, mock_socket):
        """Verify the socket calls made by ``wol_power._send_magic_packets``.

        ``mock_socket`` is a mock whose return value stands in for the
        created socket (presumably injected by a patch decorator that is
        outside this excerpt -- confirm against the full file).
        """
        fake_socket = mock.Mock(spec=socket, spec_set=True)
        mock_socket.return_value = fake_socket()
        # Register one port for the node used by this test.
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)
        with task_manager.acquire(
                self.context, self.node.uuid, shared=True) as task:
            wol_power._send_magic_packets(task, '255.255.255.255', 9)

            # Calls expected on fake_socket, in order: the call made above,
            # enabling SO_BROADCAST, two sendto() calls to the broadcast
            # address on port 9, and finally close().
            expected_calls = [
                mock.call(),
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1),
                mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
                mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
                mock.call().close()]

            fake_socket.assert_has_calls(expected_calls)
            # The socket factory itself must have been invoked exactly once.
            self.assertEqual(1, mock_socket.call_count)
项目:watcher-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def test_create_audit_template(self):
        """Create an audit template and verify the data that comes back.

        The body returned by the create call is compared with the expected
        field values and then with the template fetched again by its UUID.
        """
        goal_name = "dummy"
        _resp, goal = self.client.show_goal(goal_name)

        template_name = 'my at name %s' % uuidutils.generate_uuid()
        template_description = 'my at description'
        goal_uuid = goal['uuid']

        _resp, body = self.create_audit_template(
            name=template_name,
            description=template_description,
            goal=goal_uuid)
        # No strategy was passed, so both strategy fields are expected
        # to be None.
        self.assert_expected(
            {'name': template_name,
             'description': template_description,
             'goal_uuid': goal_uuid,
             'goal_name': goal_name,
             'strategy_uuid': None,
             'strategy_name': None},
            body)

        _resp, fetched = self.client.show_audit_template(body['uuid'])
        self.assert_expected(fetched, body)
项目:watcher-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def test_create_audit_template_unicode_description(self):
        """Create an audit template whose description is non-ASCII text.

        The created body is compared with the expected field values and
        then with the template fetched again by its UUID.
        """
        goal_name = "dummy"
        _resp, goal = self.client.show_goal(goal_name)

        template_name = 'my at name %s' % uuidutils.generate_uuid()
        # Use a unicode string for testing:
        template_description = 'my àt déscrïptïôn'
        goal_uuid = goal['uuid']

        _resp, body = self.create_audit_template(
            name=template_name,
            description=template_description,
            goal=goal_uuid)
        self.assert_expected(
            {'name': template_name,
             'description': template_description,
             'goal_uuid': goal_uuid,
             'goal_name': goal_name,
             'strategy_uuid': None,
             'strategy_name': None},
            body)

        _resp, fetched = self.client.show_audit_template(body['uuid'])
        self.assert_expected(fetched, body)
项目:watcher-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def test_update_audit_template_remove(self):
        """Remove the description of an audit template with a patch.

        A template is created with a description, a ``remove`` operation is
        sent for ``/description``, and the template is fetched again to
        check that only the description changed.
        """
        description = 'my at description'
        name = 'my at name %s' % uuidutils.generate_uuid()
        params = {'name': name,
                  'description': description,
                  'goal': self.goal['uuid']}

        _, audit_template = self.create_audit_template(**params)

        # Removing the description
        self.client.update_audit_template(
            audit_template['uuid'],
            [{'path': '/description', 'op': 'remove'}])

        _, body = self.client.show_audit_template(audit_template['uuid'])
        # .get() tolerates a missing key as well as an explicit None.
        self.assertIsNone(body.get('description'))

        # Assert nothing else was changed
        self.assertEqual(name, body['name'])
        # NOTE(review): this indexes the key directly, so unlike the .get()
        # check above it also requires 'description' to be present.
        self.assertIsNone(body['description'])
        self.assertEqual(self.goal['uuid'], body['goal_uuid'])
项目:craton    作者:openstack    | 项目源码 | 文件源码
def post(self, context, request_data):
        """Create a new user. Requires project admin privileges.

        A fresh API key is generated and stored in ``request_data`` before
        the user is created. Returns a ``(body, 201, headers)`` tuple whose
        headers carry the ``Location`` of the new user.
        """
        # NOTE(sulo): Instead of using context project_id from
        # header, here we always ensure, user create gets project_id
        # from request param.
        project_id = request_data["project_id"]
        dbapi.projects_get_by_id(context, project_id)

        request_data["api_key"] = uuidutils.generate_uuid()
        user_obj = dbapi.users_create(context, request_data)

        location = v1.api.url_for(UserById, id=user_obj.id, _external=True)
        return jsonutils.to_primitive(user_obj), 201, {'Location': location}
项目:craton    作者:openstack    | 项目源码 | 文件源码
def start(conf):
    """Start a taskflow conductor loop in a separate thread.

    :param conf: configuration object; ``conf.taskflow.db_upgrade`` and
        ``conf.max_simultaneous_jobs`` are read here, and ``conf`` is also
        handed to the persistence and jobboard helpers.
    :returns: a ``(persistence, board, conductor)`` tuple.
    """
    persistence = _get_persistence_backend(conf)

    # Optionally upgrade the persistence schema before anything uses it;
    # the connection is closed again as soon as the upgrade returns.
    if conf.taskflow.db_upgrade:
        with contextlib.closing(persistence.get_connection()) as conn:
            LOG.info('Checking for database schema upgrade')
            conn.upgrade()

    # A freshly generated UUID serves as this conductor's name.
    my_name = uuidutils.generate_uuid()
    LOG.info('I am %s', my_name)

    board = _get_jobboard_backend(conf, persistence=persistence)

    conductor = conductors.fetch(
        'nonblocking', my_name, board,
        engine='parallel',
        max_simultaneous_jobs=conf.max_simultaneous_jobs,
        persistence=persistence)

    # The board is connected before the conductor thread is started.
    board.connect()
    LOG.debug('Starting taskflow conductor loop')
    # The thread object is not kept; callers only get the conductor back.
    threading.Thread(target=conductor.run).start()

    return persistence, board, conductor
项目:cyborg    作者:openstack    | 项目源码 | 文件源码
def accelerator_create(self, context, values):
        """Store a new accelerator built from ``values``.

        A missing or falsy ``uuid`` is replaced by a generated one and a
        missing or falsy ``description`` by an empty string; ``values`` is
        modified in place.

        :returns: the ``models.Accelerator`` that was added.
        :raises AcceleratorAlreadyExists: when the flush reports a
            duplicate entry.
        """
        if not values.get('uuid'):
            values['uuid'] = uuidutils.generate_uuid()
        if not values.get('description'):
            values['description'] = ''

        new_accelerator = models.Accelerator()
        new_accelerator.update(values)

        with _session_for_write() as session:
            try:
                session.add(new_accelerator)
                session.flush()
            except db_exc.DBDuplicateEntry:
                raise exception.AcceleratorAlreadyExists(uuid=values['uuid'])
            else:
                return new_accelerator
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def create_board(self, values):
        """Save a new board built from ``values`` and return it.

        ``uuid`` and ``status`` are filled in (in place) when the keys are
        absent.

        :raises DuplicateCode: when the duplicate-entry error names the
            ``code`` column.
        :raises BoardAlreadyExists: for any other duplicate-entry error.
        """
        # Only absent keys get a default; present keys are left untouched.
        if 'uuid' not in values:
            values['uuid'] = uuidutils.generate_uuid()
        if 'status' not in values:
            values['status'] = states.REGISTERED

        new_board = models.Board()
        new_board.update(values)
        try:
            new_board.save()
        except db_exc.DBDuplicateEntry as exc:
            if 'code' not in exc.columns:
                raise exception.BoardAlreadyExists(uuid=values['uuid'])
            raise exception.DuplicateCode(code=values['code'])
        else:
            return new_board
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def register_job(context, _type, resource_id):
    """Create a running job record inside its own session transaction.

    :returns: the created job, or ``None`` when the insert hits a
        duplicate entry or a deadlock (the transaction is rolled back).

    The session is closed in every case.
    """
    try:
        context.session.begin()
        job = core.create_resource(
            context, models.Job,
            {'id': uuidutils.generate_uuid(),
             'type': _type,
             'status': constants.JS_Running,
             'resource_id': resource_id,
             'extra_id': constants.SP_EXTRA_ID})
        context.session.commit()
    except (db_exc.DBDuplicateEntry, db_exc.DBDeadlock):
        # Both failures are handled the same way: undo and report nothing.
        context.session.rollback()
        return None
    else:
        return job
    finally:
        context.session.close()
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def test_update_subproject_not_in_hierarchy(self):
        """Updating quota for a project of another hierarchy is forbidden.

        A second hierarchy (E with child F) is registered next to the
        fixture's own projects. An update for A succeeds; after the
        operation is pointed at F the same update must raise
        ``HTTPForbiddenError``.
        """

        # Create another project hierarchy
        E = self.FakeProject(id=uuidutils.generate_uuid(), parent_id=None)
        F = self.FakeProject(id=uuidutils.generate_uuid(), parent_id=E.id)
        E.subtree = {F.id: F.subtree}
        self.project_by_id[E.id] = E
        self.project_by_id[F.id] = F

        # Project lookups made by the operation go through the test's
        # own _get_project.
        qso = quota.QuotaSetOperation(self.A.id)
        qso._get_project = mock.Mock()
        qso._get_project.side_effect = self._get_project
        self.ctx.project_id = self.A.id

        updated = _make_body(tenant_id=None, root=True,
                             **self.test_class_quota)
        expected = _make_body(tenant_id=None, root=True,
                              **self.test_class_expected_result)
        result = qso.update(self.ctx, **updated)
        self.assertDictMatch(expected, result)

        # Point the operation at F (from the second hierarchy) while the
        # context still carries A's project id; the update must be refused.
        qso.update_hierarchy(F.id)
        self.assertRaises(exceptions.HTTPForbiddenError, qso.update,
                          self.ctx, **updated)
项目:panko    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Open the storage connection and patch happybase table handling.

        ``happybase.Connection.table`` is replaced by a factory returning
        ``mocks.MockHBaseTable`` objects bound to a per-test data prefix;
        the table disable/delete/create methods become ``MagicMock``s.
        """
        super(HBaseManager, self).setUp()
        self.connection = storage.get_connection(self.url, self.conf)

        # Unique prefix for each test to keep data is distinguished because
        # all test data is stored in one table
        data_prefix = uuidutils.generate_uuid(dashed=False)

        def table(conn, name):
            return mocks.MockHBaseTable(name, conn, data_prefix)

        # Mock only real HBase connection, MConnection "table" method
        # stays origin.
        mock.patch('happybase.Connection.table', new=table).start()

        # We shouldn't delete data and tables after each test,
        # because it last for too long.
        # All tests tables will be deleted in setup-test-env.sh
        for target in ("happybase.Connection.disable_table",
                       "happybase.Connection.delete_table",
                       "happybase.Connection.create_table"):
            mock.patch(target, new=mock.MagicMock()).start()
项目:panko    作者:openstack    | 项目源码 | 文件源码
def _generate_models(self):
        """Record twenty ``foo.bar`` events, one second apart.

        Each event carries four traits (text, int, float, datetime) and a
        generated message id. ``self.s_time`` is set to the start time and
        ``self.trait_time`` ends one second past the last event.
        """
        base = 0
        self.s_time = datetime.datetime(2013, 12, 31, 5, 0)
        self.trait_time = datetime.datetime(2013, 12, 31, 5, 0)

        event_models = []
        for _ in range(20):
            # (name, type, value) triples, passed positionally to Trait.
            trait_specs = [
                ('trait_A', models.Trait.TEXT_TYPE, "my_text"),
                ('trait_B', models.Trait.INT_TYPE, base + 1),
                ('trait_C', models.Trait.FLOAT_TYPE, float(base) + 0.123456),
                ('trait_D', models.Trait.DATETIME_TYPE, self.trait_time),
            ]
            trait_models = [models.Trait(*spec) for spec in trait_specs]

            event = models.Event(
                message_id=uuidutils.generate_uuid(),
                event_type='foo.bar',
                generated=self.trait_time,
                traits=trait_models,
                raw={'status': {'nested': 'started'}})
            event_models.append(event)
            self.trait_time += datetime.timedelta(seconds=1)

        self.conn.record_events(event_models)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def create_container(self, context, values):
        """Save a new container built from ``values`` and return it.

        A missing or falsy ``uuid`` is replaced (in place) by a generated
        one. When a truthy ``name`` is given it is first checked with
        ``_validate_unique_container_name``.

        :raises ContainerAlreadyExists: when saving reports a duplicate
            entry.
        """
        # ensure defaults are present for new containers
        if not values.get('uuid'):
            values['uuid'] = uuidutils.generate_uuid()

        requested_name = values.get('name')
        if requested_name:
            self._validate_unique_container_name(context, values['name'])

        new_container = models.Container()
        new_container.update(values)
        try:
            new_container.save()
        except db_exc.DBDuplicateEntry:
            raise exception.ContainerAlreadyExists(field='UUID',
                                                   value=values['uuid'])
        else:
            return new_container
项目:zun    作者:openstack    | 项目源码 | 文件源码
def create_container(self, context, container_data):
        """Create and save a container from ``container_data``.

        A UUID is generated when ``container_data`` has no truthy ``uuid``
        (the mapping is modified in place). When a truthy ``name`` is
        present it is first checked with
        ``_validate_unique_container_name``.

        :param context: request context, forwarded to the name check.
        :param container_data: mapping of container fields.
        :returns: the saved ``models.Container``.
        :raises: whatever the name check or ``save()`` raises; nothing is
            caught or translated here.
        """
        # ensure defaults are present for new containers
        if not container_data.get('uuid'):
            container_data['uuid'] = uuidutils.generate_uuid()

        if container_data.get('name'):
            self._validate_unique_container_name(context,
                                                 container_data['name'])

        container = models.Container(container_data)
        # Errors from save() propagate to the caller unchanged.
        container.save()

        return container
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_container_actions_get_by_container(self):
        """Ensure we can get actions by UUID.

        Two actions are started for one container UUID and one for
        another; only the first two are expected back.
        """
        uuid1 = uuidutils.generate_uuid()

        expected = []

        action_values = self._create_action_values(uuid1)
        action = dbapi.action_start(self.context, action_values)
        expected.append(action)

        # The same values dict is reused with a different action name.
        action_values['action'] = 'test-action'
        action = dbapi.action_start(self.context, action_values)
        expected.append(action)

        # Create an other container action.
        uuid2 = uuidutils.generate_uuid()
        action_values = self._create_action_values(uuid2, 'test-action')
        dbapi.action_start(self.context, action_values)

        # Only the actions started for uuid1 are expected in the result.
        actions = dbapi.actions_get(self.context, uuid1)
        self._assertEqualListsOfObjects(expected, actions)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_container_action_get_by_container_and_request(self):
        """Ensure we can get an action by container UUID and request_id.

        Two actions are started for the same container with different
        request ids; the lookup uses the first request id.
        """
        uuid1 = uuidutils.generate_uuid()

        action_values = self._create_action_values(uuid1)
        dbapi.action_start(self.context, action_values)
        # Remember the first request id before the dict is modified below.
        request_id = action_values['request_id']

        # An other action using a different req id
        action_values['action'] = 'test-action'
        action_values['request_id'] = 'req-00000000-7522-4d99-7ff-111111111111'
        dbapi.action_start(self.context, action_values)

        action = dbapi.action_get_by_request_id(self.context, uuid1,
                                                request_id)
        self.assertEqual('create_container', action['action'])
        # NOTE(review): compared with the context's request id, not the
        # local ``request_id``; presumably they are equal -- confirm against
        # _create_action_values.
        self.assertEqual(self.context.request_id, action['request_id'])
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_container_action_event_start(self):
        """Create a container action event.

        An action is started first, then an event for the same container
        UUID; the stored event is compared with the values that were sent.
        """
        uuid = uuidutils.generate_uuid()

        action_values = self._create_action_values(uuid)
        action = dbapi.action_start(self.context, action_values)

        event_values = self._create_event_values(uuid)
        event = dbapi.action_event_start(self.context, event_values)

        # The expected values are completed with the action id only after
        # the event was created; the finish-related fields are excluded
        # from the comparison.
        event_values['action_id'] = action['id']
        ignored_keys = self.IGNORED_FIELDS + ['finish_time', 'traceback',
                                              'result']
        self._assertEqualObjects(event_values, event, ignored_keys)

        self._assertActionEventSaved(event, action['id'])
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_container_action_event_finish_success(self):
        """Finish a container action event.

        An action and an event are started, the event is finished with a
        'Success' result, and the action's message must not be 'Error'.
        """
        uuid = uuidutils.generate_uuid()

        action = dbapi.action_start(self.context,
                                    self._create_action_values(uuid))

        dbapi.action_event_start(self.context,
                                 self._create_event_values(uuid))

        # Extra fields for the finish call: a finish time five seconds
        # after now and a successful result.
        event_values = {
            'finish_time': timeutils.utcnow() + datetime.timedelta(seconds=5),
            'result': 'Success'
        }

        # ``event_values`` is rebound to the full event dict built from the
        # extras above.
        event_values = self._create_event_values(uuid, extra=event_values)
        event = dbapi.action_event_finish(self.context, event_values)

        self._assertActionEventSaved(event, action['id'])
        action = dbapi.action_get_by_request_id(self.context, uuid,
                                                self.context.request_id)
        self.assertNotEqual('Error', action['message'])
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_capsules_with_filters(self):
        """Filter the capsule list by UUID.

        Each capsule's UUID must select exactly that capsule and an
        unknown UUID must select nothing.
        """
        capsule1 = utils.create_test_capsule(
            name='capsule1',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        capsule2 = utils.create_test_capsule(
            name='capsule2',
            uuid=uuidutils.generate_uuid(),
            context=self.context)

        def ids_for_uuid(capsule_uuid):
            # Ids of the capsules returned for a uuid filter.
            found = dbapi.list_capsules(
                self.context, filters={'uuid': capsule_uuid})
            return [capsule.id for capsule in found]

        found_ids = ids_for_uuid(capsule1.uuid)
        self.assertEqual([capsule1.id], found_ids)

        found_ids = ids_for_uuid(capsule2.uuid)
        self.assertEqual([capsule2.id], found_ids)

        found_ids = ids_for_uuid('unknow-uuid')
        self.assertEqual([], found_ids)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_capsules(self, mock_write, mock_read):
        """List capsules with the backend read and write calls mocked.

        ``mock_write`` and ``mock_read`` are presumably supplied by patch
        decorators outside this excerpt -- confirm against the full file.
        """
        uuids = []
        capsules = []
        # While the fixtures are created, reads are set to raise
        # EtcdKeyNotFound.
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(1, 6):
            capsule = utils.create_test_capsule(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='capsule' + str(i))
            capsules.append(capsule.as_dict())
            uuids.append(six.text_type(capsule['uuid']))
        # From here on every read returns the five capsule dicts.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            capsules)
        res = dbapi.list_capsules(self.context)
        res_uuids = [r.uuid for r in res]
        # Order is not checked; both sides are sorted before comparing.
        self.assertEqual(sorted(uuids), sorted(res_uuids))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_inventories(self):
        """Create five inventories and check they are all listed.

        Totals are compared after sorting, so listing order is irrelevant.
        """
        expected_totals = []
        for index in range(1, 6):
            provider = utils.create_test_resource_provider(
                id=index,
                uuid=uuidutils.generate_uuid(),
                context=self.context)
            inventory = utils.create_test_inventory(
                id=index,
                resource_provider_id=provider.id,
                total=index,
                context=self.context)
            expected_totals.append(inventory['total'])

        listed = dbapi.list_inventories(self.context)
        listed_totals = [inv.total for inv in listed]
        self.assertEqual(sorted(expected_totals), sorted(listed_totals))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_inventories_sorted(self):
        """List inventories sorted by ``total`` and reject a bad sort key.

        Totals are created in descending order (10 down to 6) so that the
        listing has to reorder them.
        """
        expected_totals = []
        for index in range(5):
            provider = utils.create_test_resource_provider(
                id=index,
                uuid=uuidutils.generate_uuid(),
                context=self.context)
            inventory = utils.create_test_inventory(
                id=index,
                resource_provider_id=provider.id,
                total=10 - index,
                context=self.context)
            expected_totals.append(inventory['total'])

        listed = dbapi.list_inventories(self.context, sort_key='total')
        listed_totals = [inv.total for inv in listed]
        self.assertEqual(sorted(expected_totals), listed_totals)

        # An unknown sort key must be refused.
        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_inventories,
                          self.context, sort_key='foo')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_volume_mappings(self, mock_write, mock_read):
        """List volume mappings with the backend read/write calls mocked.

        ``mock_write`` and ``mock_read`` are presumably supplied by patch
        decorators outside this excerpt -- confirm against the full file.
        """
        uuids = []
        volume_mappings = []
        # While the fixtures are created, reads are set to raise
        # EtcdKeyNotFound.
        mock_read.side_effect = etcd.EtcdKeyNotFound
        # Six mappings are created (indices 0..5).
        for i in range(0, 6):
            volume_mapping = utils.create_test_volume_mapping(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='volume_mapping' + str(i))
            volume_mappings.append(volume_mapping.as_dict())
            uuids.append(six.text_type(volume_mapping['uuid']))
        # From here on every read returns all created mapping dicts.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            volume_mappings)
        res = dbapi.list_volume_mappings(self.context)
        res_uuids = [r.uuid for r in res]
        # Order is not checked; both sides are sorted before comparing.
        self.assertEqual(sorted(uuids), sorted(res_uuids))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_volume_mappings_sorted(self, mock_write, mock_read):
        """List volume mappings sorted by UUID and reject a bad sort key.

        ``mock_write`` and ``mock_read`` are presumably supplied by patch
        decorators outside this excerpt -- confirm against the full file.
        """
        uuids = []
        volume_mappings = []
        # While the fixtures are created, reads are set to raise
        # EtcdKeyNotFound.
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(0, 6):
            volume_mapping = utils.create_test_volume_mapping(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='volume_mapping' + str(i))
            volume_mappings.append(volume_mapping.as_dict())
            uuids.append(six.text_type(volume_mapping['uuid']))
        # From here on every read returns all created mapping dicts.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            volume_mappings)
        res = dbapi.list_volume_mappings(self.context, sort_key='uuid')
        res_uuids = [r.uuid for r in res]
        # The result itself must already be in sorted order.
        self.assertEqual(sorted(uuids), res_uuids)
        # An unknown sort key must be refused.
        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_volume_mappings,
                          self.context,
                          sort_key='wrong_key')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_containers_sorted(self):
        """List containers sorted by UUID and reject an unknown sort key."""
        created_uuids = []
        for index in range(5):
            new_uuid = uuidutils.generate_uuid()
            container = utils.create_test_container(
                uuid=new_uuid,
                context=self.context,
                name='container' + str(index))
            created_uuids.append(six.text_type(container.uuid))

        listed = dbapi.list_containers(self.context, sort_key='uuid')
        listed_uuids = [container.uuid for container in listed]
        # The listing itself has to come back in sorted order.
        self.assertEqual(sorted(created_uuids), listed_uuids)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_containers,
                          self.context, sort_key='foo')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_update_container_with_the_same_name(self):
        """Renaming a second container to an already used name must fail.

        The ``unique_container_name_scope`` option of the ``compute`` group
        is overridden to ``project`` for this test.
        """
        CONF.set_override("unique_container_name_scope", "project",
                          group="compute")
        container1 = utils.create_test_container(
            name='container-one',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        container2 = utils.create_test_container(
            name='container-two',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        new_name = 'new_name'
        # The first rename goes through ...
        dbapi.update_container(self.context, container1.id,
                               {'name': new_name})
        # ... the second one, to the same name, must be refused.
        self.assertRaises(exception.ContainerAlreadyExists,
                          dbapi.update_container, self.context,
                          container2.id, {'name': new_name})
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_containers(self, mock_write, mock_read):
        """List containers with the backend read and write calls mocked.

        ``mock_write`` and ``mock_read`` are presumably supplied by patch
        decorators outside this excerpt -- confirm against the full file.
        """
        uuids = []
        containers = []
        # While the fixtures are created, reads are set to raise
        # EtcdKeyNotFound.
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(1, 6):
            container = utils.create_test_container(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='cont' + str(i))
            containers.append(container.as_dict())
            uuids.append(six.text_type(container['uuid']))
        # From here on every read returns the five container dicts.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            containers)
        res = dbapi.list_containers(self.context)
        res_uuids = [r.uuid for r in res]
        # Order is not checked; both sides are sorted before comparing.
        self.assertEqual(sorted(uuids), sorted(res_uuids))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_update_container_with_the_same_name(self, mock_update,
                                                 mock_write, mock_read):
        """Renaming a container to an existing name must fail (mocked I/O).

        The three mocks are presumably supplied by patch decorators outside
        this excerpt -- confirm against the full file.
        """
        CONF.set_override("unique_container_name_scope", "project",
                          group="compute")
        # While the fixtures are created, reads are set to raise
        # EtcdKeyNotFound.
        mock_read.side_effect = etcd.EtcdKeyNotFound
        container1 = utils.create_test_container(
            name='container-one',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        container2 = utils.create_test_container(
            name='container-two',
            uuid=uuidutils.generate_uuid(),
            context=self.context)

        # From here on every read returns both containers.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            [container1.as_dict(), container2.as_dict()])
        # Giving container2 the name already used by container1 is refused.
        self.assertRaises(exception.ContainerAlreadyExists,
                          dbapi.update_container, self.context,
                          container2.uuid, {'name': 'container-one'})
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_resource_classes_sorted(self):
        """List resource classes sorted by name; reject a bad sort key."""
        created_names = []
        for index in range(5):
            resource_class = utils.create_test_resource_class(
                context=self.context,
                uuid=uuidutils.generate_uuid(),
                name='class' + str(index))
            created_names.append(six.text_type(resource_class.name))

        listed = dbapi.list_resource_classes(self.context, sort_key='name')
        listed_names = [resource_class.name for resource_class in listed]
        # The listing itself has to come back in sorted order.
        self.assertEqual(sorted(created_names), listed_names)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_resource_classes,
                          self.context, sort_key='foo')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_allocations(self):
        """Create five allocations and check they are all listed.

        Consumer ids are compared after sorting, so listing order is
        irrelevant.
        """
        expected_consumers = []
        for index in range(1, 6):
            provider = utils.create_test_resource_provider(
                id=index,
                uuid=uuidutils.generate_uuid(),
                context=self.context)
            allocation = utils.create_test_allocation(
                id=index,
                resource_provider_id=provider.id,
                consumer_id=uuidutils.generate_uuid(),
                context=self.context)
            expected_consumers.append(allocation['consumer_id'])

        listed = dbapi.list_allocations(self.context)
        listed_consumers = [allocation.consumer_id for allocation in listed]
        self.assertEqual(sorted(expected_consumers),
                         sorted(listed_consumers))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_compute_nodes_sorted(self):
        """List compute nodes sorted by UUID; reject an unknown sort key."""
        created_uuids = []
        for index in range(5):
            new_uuid = uuidutils.generate_uuid()
            node = utils.create_test_compute_node(
                uuid=new_uuid,
                context=self.context,
                hostname='node' + str(index))
            created_uuids.append(six.text_type(node.uuid))

        listed = dbapi.list_compute_nodes(self.context, sort_key='uuid')
        listed_uuids = [node.uuid for node in listed]
        # The listing itself has to come back in sorted order.
        self.assertEqual(sorted(created_uuids), listed_uuids)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_compute_nodes,
                          self.context, sort_key='foo')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_images_with_filters(self):
        """Filter the image list by ``repo``.

        Each repo must select exactly its own image, an unknown repo must
        select nothing, and filtering by ``image1.repo`` must select the
        first image again.
        """
        image1 = utils.create_test_image(
            context=self.context, repo='image-one',
            uuid=uuidutils.generate_uuid())
        image2 = utils.create_test_image(
            context=self.context, repo='image-two',
            uuid=uuidutils.generate_uuid())

        def ids_for_repo(repo):
            # Ids of the images returned for a repo filter.
            found = self.dbapi.list_images(self.context,
                                           filters={'repo': repo})
            return [image.id for image in found]

        found_ids = ids_for_repo('image-one')
        self.assertEqual([image1.id], found_ids)

        found_ids = ids_for_repo('image-two')
        self.assertEqual([image2.id], found_ids)

        found_ids = ids_for_repo('bad-image')
        self.assertEqual([], found_ids)

        found_ids = ids_for_repo(image1.repo)
        self.assertEqual([image1.id], found_ids)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_list_resource_providers_sorted(self):
        """List resource providers sorted by UUID; reject a bad sort key."""
        created_uuids = []
        for index in range(5):
            new_uuid = uuidutils.generate_uuid()
            provider = utils.create_test_resource_provider(
                uuid=new_uuid,
                context=self.context,
                name='provider' + str(index))
            created_uuids.append(six.text_type(provider.uuid))

        listed = dbapi.list_resource_providers(self.context, sort_key='uuid')
        listed_uuids = [provider.uuid for provider in listed]
        # The listing itself has to come back in sorted order.
        self.assertEqual(sorted(created_uuids), listed_uuids)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_resource_providers,
                          self.context, sort_key='foo')
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_images_with_pagination_marker(self, mock_image_list
                                                   ):
        """Request images after a marker and expect only the last one.

        ``mock_image_list`` is presumably supplied by a patch decorator
        outside this excerpt -- confirm against the full file.
        """
        image_list = []
        for id_ in range(4):
            test_image = utils.create_test_image(
                context=self.context,
                id=id_,
                repo='testrepo' + str(id_),
                uuid=uuidutils.generate_uuid())
            image_list.append(objects.Image(self.context, **test_image))
        # The mocked listing returns just the fourth image, i.e. the one
        # that follows the marker (the third image) used below.
        mock_image_list.return_value = image_list[-1:]
        response = self.get('/v1/images/?limit=3&marker=%s'
                            % image_list[2].uuid)

        self.assertEqual(200, response.status_int)
        actual_images = response.json['images']
        self.assertEqual(1, len(actual_images))
        self.assertEqual(image_list[-1].uuid,
                         actual_images[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_hosts_with_pagination_marker(self, mock_host_list,
                                                  mock_policy):
        """Request hosts after a marker and expect only the last one.

        Both mocks are presumably supplied by patch decorators outside this
        excerpt -- confirm against the full file.
        """
        mock_policy.return_value = True
        host_list = []
        for id_ in range(4):
            test_host = utils.create_test_compute_node(
                context=self.context,
                uuid=uuidutils.generate_uuid())
            # Replace the 'numa_topology' entry with a NUMATopology built
            # from it before constructing the ComputeNode object.
            numat = numa.NUMATopology._from_dict(test_host['numa_topology'])
            test_host['numa_topology'] = numat
            host = objects.ComputeNode(self.context, **test_host)
            host_list.append(host)
        # The mocked listing returns just the fourth host, i.e. the one
        # that follows the marker (the third host) used below.
        mock_host_list.return_value = host_list[-1:]
        response = self.get('/v1/hosts?limit=3&marker=%s'
                            % host_list[2].uuid)

        self.assertEqual(200, response.status_int)
        actual_hosts = response.json['hosts']
        self.assertEqual(1, len(actual_hosts))
        self.assertEqual(host_list[-1].uuid,
                         actual_hosts[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_get_all_containers_with_pagination_marker(self,
                                                       mock_container_list,
                                                       mock_container_show):
        """Request containers after a marker and expect only the last one.

        Both mocks are presumably supplied by patch decorators outside this
        excerpt -- confirm against the full file.
        """
        container_list = []
        for id_ in range(4):
            test_container = utils.create_test_container(
                id=id_, uuid=uuidutils.generate_uuid(),
                name='container' + str(id_), context=self.context)
            container_list.append(objects.Container(self.context,
                                                    **test_container))
        # The mocked listing and show calls both yield the fourth
        # container, i.e. the one after the marker (the third) used below.
        mock_container_list.return_value = container_list[-1:]
        mock_container_show.return_value = container_list[-1]
        response = self.get('/v1/containers/?limit=3&marker=%s'
                            % container_list[2].uuid)

        self.assertEqual(200, response.status_int)
        actual_containers = response.json['containers']
        self.assertEqual(1, len(actual_containers))
        self.assertEqual(container_list[-1].uuid,
                         actual_containers[0].get('uuid'))
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_refresh(self):
        """Check that refresh() re-reads a VolumeMapping through the DB API.

        The patched ``get_volume_mapping_by_uuid`` yields the fake record on
        the first call and the same record with a new uuid on the second.
        """
        original_uuid = self.fake_volume_mapping['uuid']
        refreshed_uuid = uuidutils.generate_uuid()
        db_results = [
            dict(self.fake_volume_mapping, uuid=original_uuid),
            dict(self.fake_volume_mapping, uuid=refreshed_uuid),
        ]
        # Both lookups are expected to be made with the original uuid.
        expected_calls = [
            mock.call(self.context, original_uuid),
            mock.call(self.context, original_uuid),
        ]
        patcher = mock.patch.object(self.dbapi, 'get_volume_mapping_by_uuid',
                                    side_effect=db_results, autospec=True)
        with patcher as mock_get_volume_mapping:
            mapping = objects.VolumeMapping.get_by_uuid(self.context,
                                                        original_uuid)
            self.assertEqual(original_uuid, mapping.uuid)

            mapping.refresh()
            self.assertEqual(refreshed_uuid, mapping.uuid)
            self.assertEqual(expected_calls,
                             mock_get_volume_mapping.call_args_list)
            self.assertEqual(self.context, mapping._context)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_refresh(self):
        """Check that refresh() re-reads a Container through the DB API.

        The patched ``get_container_by_uuid`` yields the fake record on the
        first call and the same record with a new uuid on the second.
        """
        original_uuid = self.fake_container['uuid']
        refreshed_uuid = uuidutils.generate_uuid()
        db_results = [
            dict(self.fake_container, uuid=original_uuid),
            dict(self.fake_container, uuid=refreshed_uuid),
        ]
        # Both lookups are expected to be made with the original uuid.
        expected_calls = [
            mock.call(self.context, original_uuid),
            mock.call(self.context, original_uuid),
        ]
        patcher = mock.patch.object(self.dbapi, 'get_container_by_uuid',
                                    side_effect=db_results, autospec=True)
        with patcher as mock_get_container:
            loaded = objects.Container.get_by_uuid(self.context,
                                                   original_uuid)
            self.assertEqual(original_uuid, loaded.uuid)

            loaded.refresh()
            self.assertEqual(refreshed_uuid, loaded.uuid)
            self.assertEqual(expected_calls,
                             mock_get_container.call_args_list)
            self.assertEqual(self.context, loaded._context)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def container_attach(self, context, container):
        """Return a websocket proxy access URL for attaching to a container.

        Asks the driver for the container's websocket URL, generates a fresh
        token, stores both on the container and saves it.

        :param context: request context passed to the driver and to save().
        :param container: container object to attach to.
        :returns: ``<base_url>?token=<token>&uuid=<container uuid>``.
        :raises: any exception from the steps above, after logging it.
        """
        LOG.debug('Get websocket url from the container: %s', container.uuid)
        try:
            websocket_url = self.driver.get_websocket_url(context, container)
            access_token = uuidutils.generate_uuid()
            url_parts = (CONF.websocket_proxy.base_url, access_token,
                         container.uuid)
            access_url = '%s?token=%s&uuid=%s' % url_parts
            container.websocket_url = websocket_url
            container.websocket_token = access_token
            container.save(context)
        except Exception as exc:
            # Log for operators, then let the caller see the original error.
            LOG.error(
                "Error occurred while calling get websocket url function: %s",
                six.text_type(exc))
            raise
        else:
            return access_url
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def test_update_logical_router_port(self):
        """Updating a router port with a relay service issues the right PUT.

        ``get`` is patched to return the fake port and the NSX version is
        pinned to '2.0.0'; the PUT body must carry the relay service binding.
        """
        router_port = test_constants.FAKE_ROUTER_PORT.copy()
        port_id = router_port['id']
        relay_service_id = uuidutils.generate_uuid()
        lrport = self.get_mocked_resource()
        with mock.patch.object(lrport, 'get', return_value=router_port):
            with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                            return_value='2.0.0'):
                lrport.update(port_id, relay_service_uuid=relay_service_id)

                # Expected body is built only after update() has run.
                relay_binding = {
                    'service_id': {
                        'target_type': 'LogicalService',
                        'target_id': relay_service_id,
                    },
                }
                expected_body = {
                    'id': port_id,
                    'display_name': router_port['display_name'],
                    'logical_router_id': router_port['logical_router_id'],
                    'resource_type': router_port['resource_type'],
                    "revision": 0,
                    'service_bindings': [relay_binding],
                }
                expected_url = (
                    'https://1.2.3.4/api/v1/logical-router-ports/%s'
                    % port_id)

                test_client.assert_json_call(
                    'put', lrport, expected_url,
                    data=jsonutils.dumps(expected_body, sort_keys=True),
                    headers=self.default_headers())
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def test_get_default_network_id(self, mock_get_port_from_host, mock_conf):
        """NestedDriver reports the network id of the port found on 'eth0'."""
        mock_conf.binding.link_iface = 'eth0'

        endpoint_id = lib_utils.get_hash()
        port_id = uuidutils.generate_uuid()
        net_id = uuidutils.generate_uuid()
        v4_subnet_id = uuidutils.generate_uuid()
        v6_subnet_id = uuidutils.generate_uuid()

        port_response = self._get_fake_port(
            endpoint_id, net_id, port_id, lib_const.PORT_STATUS_ACTIVE,
            v4_subnet_id, v6_subnet_id)
        vm_port = port_response['port']
        mock_get_port_from_host.return_value = vm_port

        driver = nested.NestedDriver()
        host_network_id = driver.get_default_network_id()

        # The lookup must use the configured link interface.
        mock_get_port_from_host.assert_called_with('eth0')
        self.assertEqual(host_network_id, vm_port['network_id'])
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def test_get_segmentation_id(self, mock_alloc_seg_id, mock_trunk_port,
                                 mock_vlan_check):
        """A segmentation id is allocated once per port and then reused."""
        mock_trunk_port.return_value = None
        mock_vlan_check.return_value = None
        mock_alloc_seg_id.side_effect = [1, 2]
        first_port_id = uuidutils.generate_uuid()
        second_port_id = uuidutils.generate_uuid()

        driver = vlan.VlanDriver()

        # First request for a port goes through the allocator.
        seg_id = driver._get_segmentation_id(first_port_id)
        mock_alloc_seg_id.assert_called_once()
        self.assertEqual(seg_id, 1)

        # Asking again for the same port must not allocate a new id.
        mock_alloc_seg_id.reset_mock()
        seg_id = driver._get_segmentation_id(first_port_id)
        mock_alloc_seg_id.assert_not_called()
        self.assertEqual(seg_id, 1)

        # A different port triggers a second allocation.
        seg_id = driver._get_segmentation_id(second_port_id)
        mock_alloc_seg_id.assert_called_once()
        self.assertEqual(seg_id, 2)
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def test_create_host_iface(self, mock_port_bind):
        """VethDriver.create_host_iface binds the port and returns its output.

        The mocked port_bind returns a 3-tuple; the driver is expected to
        hand back only the third element.
        """
        driver = veth.VethDriver()
        endpoint_id = lib_utils.get_hash()
        neutron_port = uuidutils.generate_uuid()
        net_id = uuidutils.generate_uuid()
        v4_subnet_id = uuidutils.generate_uuid()
        v6_subnet_id = uuidutils.generate_uuid()

        subnets = self._get_fake_subnets(
            endpoint_id, net_id, v4_subnet_id, v6_subnet_id)
        network = mock.sentinel.binding_network
        exec_response = ('fake_stdout', '')
        mock_port_bind.return_value = (
            'fake_host_ifname', 'fake_container_ifname', exec_response)

        result = driver.create_host_iface(
            endpoint_id, neutron_port, subnets, network)

        mock_port_bind.assert_called_with(
            endpoint_id, neutron_port, subnets, network)
        self.assertEqual(result, exec_response)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_add_offset(self):
        """Add one Kafka offset spec and read it back by application name."""
        topic = uuidutils.generate_uuid()
        partition = random.randint(0, 1024)
        until_offset = random.randint(0, sys.maxsize)
        from_offset = random.randint(0, sys.maxsize)
        app_name = uuidutils.generate_uuid()
        # Stored specs are looked up under "<app_name>_<topic>_<partition>".
        offset_key = "%s_%s_%s" % (app_name, topic, partition)

        batch_time = self.get_dummy_batch_time()

        self.kafka_offset_specs.add(topic=topic, partition=partition,
                                    app_name=app_name,
                                    from_offset=from_offset,
                                    until_offset=until_offset,
                                    batch_time_info=batch_time)
        expected_value = {
            "topic": topic,
            "partition": partition,
            "app_name": app_name,
            "from_offset": from_offset,
            "until_offset": until_offset,
        }

        stored_offsets = self.kafka_offset_specs.get_kafka_offsets(app_name)
        stored_value = stored_offsets.get(offset_key)
        self.assertions_on_offset(used_value=expected_value,
                                  offset_value=stored_value)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_update_offset_values(self):
        """Re-adding a spec for the same key replaces its offsets.

        A spec is added twice for the same app/topic/partition with
        different from/until offsets; the stored value must hold the
        second pair.
        """
        topic = uuidutils.generate_uuid()
        partition = random.randint(0, 1024)
        first_until = random.randint(0, sys.maxsize)
        first_from = random.randint(0, sys.maxsize)
        app_name = uuidutils.generate_uuid()
        offset_key = "%s_%s_%s" % (app_name, topic, partition)

        batch_time = self.get_dummy_batch_time()

        # Arguments that stay the same across both add() calls.
        spec_identity = {
            'topic': topic,
            'partition': partition,
            'app_name': app_name,
            'batch_time_info': batch_time,
        }

        self.kafka_offset_specs.add(from_offset=first_from,
                                    until_offset=first_until,
                                    **spec_identity)

        # Draw replacement offsets, retrying until each differs from the
        # value it replaces.
        while True:
            second_until = random.randint(0, sys.maxsize)
            if second_until != first_until:
                break

        while True:
            second_from = random.randint(0, sys.maxsize)
            if second_from != first_from:
                break

        self.kafka_offset_specs.add(from_offset=second_from,
                                    until_offset=second_until,
                                    **spec_identity)

        stored_offsets = self.kafka_offset_specs.get_kafka_offsets(app_name)
        updated_value = stored_offsets.get(offset_key)
        self.assertEqual(second_from, updated_value.get_from_offset())
        self.assertEqual(second_until, updated_value.get_until_offset())
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_neutron_to_osvif_network(self):
        """id, name and mtu of a neutron network map to id, label and mtu."""
        net_id = uuidutils.generate_uuid()
        net_name = 'test-net'
        net_mtu = 1500
        neutron_network = dict(id=net_id, name=net_name, mtu=net_mtu)

        osvif_network = ovu.neutron_to_osvif_network(neutron_network)

        self.assertEqual(net_id, osvif_network.id)
        self.assertEqual(net_name, osvif_network.label)
        self.assertEqual(net_mtu, osvif_network.mtu)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_neutron_to_osvif_network_no_name(self):
        """A neutron network without 'name' leaves the label unset."""
        neutron_network = dict(id=uuidutils.generate_uuid(), mtu=1500)

        osvif_network = ovu.neutron_to_osvif_network(neutron_network)

        self.assertFalse(osvif_network.obj_attr_is_set('label'))
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_neutron_to_osvif_network_no_mtu(self):
        """A neutron network without 'mtu' converts with mtu set to None."""
        neutron_network = dict(id=uuidutils.generate_uuid(), name='test-net')

        osvif_network = ovu.neutron_to_osvif_network(neutron_network)

        self.assertIsNone(osvif_network.mtu)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_neutron_to_osvif_vif_ovs_no_bridge(self):
        """Expect RequiredOptError from neutron_to_osvif_vif_ovs.

        The converter is called with the 'ovs' plugin name, a port that only
        carries an id, and no subnets.
        """
        # NOTE(review): per the test name, this presumably relies on the OVS
        # bridge option being unset in the test configuration -- confirm.
        vif_plugin = 'ovs'
        port = {'id': uuidutils.generate_uuid()}
        subnets = {}

        self.assertRaises(o_cfg.RequiredOptError,
                          ovu.neutron_to_osvif_vif_ovs,
                          vif_plugin, port, subnets)