Python marshmallow.fields 模块,UUID 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用marshmallow.fields.UUID

项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_send(header_timestamp_mock, pubsub_client_mock):
    messaging = queue_messaging.Messaging.create_from_dict({
        'TOPIC': 'test-topic',
    })
    model = FancyEvent(
        uuid_field=uuid.UUID('cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e'),
        string_field='Just testing!'
    )
    header_timestamp_mock.return_value = datetime.datetime(
        2016, 12, 10, 11, 15, 45, 123456, tzinfo=datetime.timezone.utc)

    messaging.send(model)

    topic_mock = pubsub_client_mock.return_value.topic
    publish_mock = topic_mock.return_value.publish
    topic_mock.assert_called_with('test-topic')
    publish_mock.assert_called_with(
        test_utils.EncodedJson({
            "uuid_field": "cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e",
            "string_field": "Just testing!"
        }),
        timestamp='2016-12-10T11:15:45.123456Z',
        type='FancyEvent'
    )
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_encode_raises_exception_with_invalid_data_and_strict_schema():
    class StrictSchema(marshmallow.Schema):
        uuid_field = fields.UUID(required=True)

        class Meta:
            strict = True

    class Event(structures.Model):
        class Meta:
            schema = StrictSchema
            type_name = 'Event'

    data = Event(uuid_field='not an uuid')
    with pytest.raises(exceptions.EncodingError) as excinfo:
        encoding.encode(data)
    assert str(excinfo.value) == (
        "({'uuid_field': ['Not a valid UUID.']}, '')")
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def _add_column_kwargs(self, kwargs, column):
        """Add keyword arguments to kwargs (in-place) based on the passed in
        `Column <sqlalchemy.schema.Column>`.
        """
        if column.nullable:
            kwargs['allow_none'] = True
        kwargs['required'] = not column.nullable and not _has_default(column)

        if hasattr(column.type, 'enums'):
            kwargs['validate'].append(validate.OneOf(choices=column.type.enums))

        # Add a length validator if a max length is set on the column
        # Skip UUID columns
        # (see https://github.com/marshmallow-code/marshmallow-sqlalchemy/issues/54)
        if hasattr(column.type, 'length'):
            try:
                python_type = column.type.python_type
            except (AttributeError, NotImplementedError):
                python_type = None
            if not python_type or not issubclass(python_type, uuid.UUID):
                kwargs['validate'].append(validate.Length(max=column.type.length))

        if hasattr(column.type, 'scale'):
            kwargs['places'] = getattr(column.type, 'scale', None)
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_receive(pubsub_client_mock):
    messaging = queue_messaging.Messaging.create_from_dict({
        'SUBSCRIPTION': 'test-subscription',
        'MESSAGE_TYPES': [
            FancyEvent,
        ],
    })
    topic_mock = pubsub_client_mock.return_value.topic.return_value
    subscription_mock = topic_mock.subscription.return_value
    mocked_message = mock.MagicMock(
        data=(b'{"uuid_field": "cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e", '
              b'"string_field": "Just testing!"}'),
        message_id=1,
        attributes={
            'timestamp': '2016-12-10T11:15:45.123456Z',
            'type': 'FancyEvent',
        }
    )
    subscription_mock.pull.return_value = [
        (123, mocked_message)
    ]

    envelope = messaging.receive()

    topic_mock.subscription.assert_called_with('test-subscription')
    assert envelope.model == FancyEvent(
        uuid_field=uuid.UUID('cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e'),
        string_field='Just testing!'
    )

    envelope.acknowledge()

    subscription_mock.acknowledge.assert_called_with([123])
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_encoding_payload_valid():
    data = FancyEvent(
        string_field='123456789',
        uuid_field=uuid.UUID('72d9a041-f401-42b6-8556-72b3c00e43d8'),
    )
    encoded_data = encoding.encode(
        model=data,
    )
    assert (json.loads(encoded_data)
            == json.loads('{"string_field": "123456789", "uuid_field": "72d9a041-f401-42b6-8556-72b3c00e43d8"}'))
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_encode_raises_exception_with_invalid_data_and_not_strict_schema():
    class NotStrictSchema(marshmallow.Schema):
        uuid_field = fields.UUID(required=True)

    class Event(structures.Model):
        class Meta:
            schema = NotStrictSchema
            type_name = 'Event'

    data = Event(uuid_field='not an uuid')
    with pytest.raises(exceptions.EncodingError) as excinfo:
        encoding.encode(data)
    assert str(excinfo.value) == (
        "({'uuid_field': ['Not a valid UUID.']}, '')")
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_create_attributes_uses_current_date_for_timestamp(get_now_with_utc_timezone):
    data = FancyEvent(
        string_field='123456789',
        uuid_field=uuid.UUID('72d9a041-f401-42b6-8556-72b3c00e43d8'),
    )
    get_now_with_utc_timezone.return_value = datetime.datetime(
        2016, 12, 10, 11, 15, 45, tzinfo=datetime.timezone.utc)
    attributes = encoding.create_attributes(data)
    assert attributes == {
        'type': 'FancyEvent',
        'timestamp': '2016-12-10T11:15:45.000000Z',
    }
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def schema_class(self):
        class Schema(marshmallow.Schema):
            uuid_field = fields.UUID(required=True)
            string_field = fields.String(required=False)
        return Schema
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_works(self, event_class):
        data = ('{"uuid_field": "72d9a041-f401-42b6-8556-72b3c00e43d8", '
                '"string_field": "123456789"}')
        result = encoding.decode(type=event_class, encoded_data=data)
        assert result == event_class(
            uuid_field=uuid.UUID('72d9a041-f401-42b6-8556-72b3c00e43d8'),
            string_field='123456789'
        )
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_works_when_optional_field_is_missing(self, event_class):
        data = '{"uuid_field": "72d9a041-f401-42b6-8556-72b3c00e43d8"}'
        result = encoding.decode(type=event_class, encoded_data=data)
        assert result == event_class(
            uuid_field=uuid.UUID('72d9a041-f401-42b6-8556-72b3c00e43d8')
        )
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_works_when_additional_field_is_present(self, event_class):
        data = ('{"uuid_field": "72d9a041-f401-42b6-8556-72b3c00e43d8", '
                '"string_field": "123456789", '
                '"new_field": "does it work?"}')
        result = encoding.decode(type=event_class, encoded_data=data)
        assert result == event_class(
            uuid_field=uuid.UUID('72d9a041-f401-42b6-8556-72b3c00e43d8'),
            string_field='123456789'
        )
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_raises_exception_with_invalid_data_and_not_strict_schema(self):
        class NotStrictSchema(marshmallow.Schema):
            uuid_field = fields.UUID(required=True)

        class Event(structures.Model):
            class Meta:
                schema = NotStrictSchema
                type_name = 'Event'

        data = '{"uuid_field": "not an uuid"}'
        with pytest.raises(exceptions.DecodingError) as excinfo:
            encoding.decode(type=Event, encoded_data=data)
        assert str(excinfo.value) == (
            "({'uuid_field': ['Not a valid UUID.']}, '')")