Java 类com.amazonaws.services.sqs.model.MessageAttributeValue 实例源码

项目:Camel    文件:SqsEndpoint.java   
/**
 * Creates an exchange whose IN message mirrors the given SQS message.
 *
 * The body is the SQS body. The headers start as a copy of the SQS system
 * attributes and are then extended with the message id, body MD5, receipt
 * handle, the raw attribute map and the raw message-attribute map. Finally each
 * SQS message attribute is translated and added as its own header unless the
 * endpoint's header filter strategy rejects it.
 *
 * @param pattern the exchange pattern handed to the parent factory method
 * @param msg     the SQS message to expose
 * @return the populated exchange
 */
private Exchange createExchange(ExchangePattern pattern, com.amazonaws.services.sqs.model.Message msg) {
    final Exchange created = super.createExchange(pattern);
    final Message in = created.getIn();

    in.setBody(msg.getBody());
    in.setHeaders(new HashMap<String, Object>(msg.getAttributes()));

    in.setHeader(SqsConstants.MESSAGE_ID, msg.getMessageId());
    in.setHeader(SqsConstants.MD5_OF_BODY, msg.getMD5OfBody());
    in.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle());
    in.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
    in.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes());

    // The header filter strategy decides which translated attributes become headers.
    final HeaderFilterStrategy filter = getHeaderFilterStrategy();

    // Expose every SQS message attribute as a plain header value so that the
    // SQS MessageAttributeValue type does not leak to the client.
    for (Entry<String, MessageAttributeValue> attribute : msg.getMessageAttributes().entrySet()) {
        final String name = attribute.getKey();
        final Object translated = translateValue(attribute.getValue());
        if (filter.applyFilterToExternalHeaders(name, translated, created)) {
            continue;
        }
        in.setHeader(name, translated);
    }
    return created;
}
项目:herd    文件:SqsDaoImpl.java   
/**
 * Sends a text message to the named queue, converting each message header into
 * an SQS message attribute of data type "String".
 *
 * When no headers are supplied the attribute map passed on is {@code null}.
 */
@Override
public SendMessageResult sendMessage(AwsParamsDto awsParamsDto, String queueName, String messageText, List<MessageHeader> messageHeaders)
{
    Map<String, MessageAttributeValue> attributesByName = null;

    if (CollectionUtils.isNotEmpty(messageHeaders))
    {
        attributesByName = new HashMap<>();

        for (MessageHeader header : messageHeaders)
        {
            String attributeName = header.getKey();
            MessageAttributeValue attributeValue = new MessageAttributeValue();
            attributeValue.setDataType("String");
            attributeValue.setStringValue(header.getValue());
            attributesByName.put(attributeName, attributeValue);
        }
    }

    return sqsOperations.sendMessage(queueName, messageText, attributesByName, awsClientFactory.getAmazonSQSClient(awsParamsDto));
}
项目:herd    文件:MockSqsOperationsImpl.java   
/**
 * Mock send: raises a throttling error or a queue-not-found error for two
 * reserved queue names and otherwise returns a result carrying a fixed message id.
 */
@Override
public SendMessageResult sendMessage(String queueName, String messageText, Map<String, MessageAttributeValue> messageAttributes, AmazonSQS amazonSQS)
{
    final boolean simulateThrottling = queueName.equals(MockAwsOperationsHelper.AMAZON_THROTTLING_EXCEPTION);
    final boolean simulateMissingQueue = queueName.equals(MOCK_SQS_QUEUE_NOT_FOUND_NAME);

    if (simulateThrottling)
    {
        // Reserved queue name used by tests to trigger a throttling error.
        AmazonServiceException simulated = new AmazonServiceException("test throttling exception");
        simulated.setErrorCode("ThrottlingException");
        throw simulated;
    }
    else if (simulateMissingQueue)
    {
        // Reserved queue name used by tests to trigger the "queue not found" path.
        throw new IllegalStateException(String.format("AWS SQS queue with \"%s\" name not found.", queueName));
    }

    // Normal case: nothing is stored because the unit tests never read published messages back.
    SendMessageResult result = new SendMessageResult();
    return result.withMessageId(AbstractDaoTest.MESSAGE_ID);
}
项目:dropwizard-sqs-bundle    文件:SqsSenderTest.java   
/**
 * Verifies that sending a text body forwards the queue URL, body and both
 * message attributes to the SQS client in one request.
 */
@Test
public void shouldSendMessageWithCorrectAttributes() {
    //GIVEN
    final String body = "Sample text message";
    final MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType("String")
            .withStringValue("value1");
    final MessageAttributeValue numberAttribute = new MessageAttributeValue()
            .withDataType("Number")
            .withStringValue("230.000000000000000001");
    final Map<String, MessageAttributeValue> attributes = new HashMap<>();
    attributes.put("attribute1", stringAttribute);
    attributes.put("attribute2", numberAttribute);

    //WHEN
    sender.send(body, attributes);

    //THEN
    final SendMessageRequest expected = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(body)
            .withMessageAttributes(attributes);
    verify(sqs).sendMessage(expected);
}
项目:dropwizard-sqs-bundle    文件:SqsSenderTest.java   
/**
 * Verifies that sending an object serializes it with the object mapper and
 * forwards the resulting body plus both message attributes to the SQS client.
 */
@Test
public void shouldSendObjectMessageWithCorrectAttributes() throws JsonProcessingException {
    //GIVEN
    final DummyObject bodyObject = new DummyObject();
    final MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType("String")
            .withStringValue("value1");
    final MessageAttributeValue numberAttribute = new MessageAttributeValue()
            .withDataType("Number")
            .withStringValue("230.000000000000000001");
    final Map<String, MessageAttributeValue> attributes = new HashMap<>();
    attributes.put("attribute1", stringAttribute);
    attributes.put("attribute2", numberAttribute);

    //WHEN
    sender.send(bodyObject, attributes);

    //THEN
    final SendMessageRequest expected = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(objectMapper.writeValueAsString(bodyObject))
            .withMessageAttributes(attributes);
    verify(sqs).sendMessage(expected);
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClient.java   
/**
 * Sums the size in bytes of all message attributes: for each entry the
 * attribute name, its data type (if set), its string value (if set) and its
 * binary value (if set).
 *
 * @param msgAttributes the attributes to measure
 * @return the accumulated size in bytes
 */
private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) {
    int totalMsgAttributesSize = 0;
    for (Entry<String, MessageAttributeValue> entry : msgAttributes.entrySet()) {
        totalMsgAttributesSize += getStringSizeInBytes(entry.getKey());

        MessageAttributeValue entryVal = entry.getValue();
        if (entryVal.getDataType() != null) {
            totalMsgAttributesSize += getStringSizeInBytes(entryVal.getDataType());
        }

        String stringVal = entryVal.getStringValue();
        if (stringVal != null) {
            totalMsgAttributesSize += getStringSizeInBytes(stringVal);
        }

        ByteBuffer binaryVal = entryVal.getBinaryValue();
        if (binaryVal != null) {
            // remaining() counts only the bytes between position and limit and,
            // unlike array(), also works for read-only and direct buffers.
            totalMsgAttributesSize += binaryVal.remaining();
        }
    }
    return totalMsgAttributesSize;
}
项目:micro-genie    文件:SqsProducer.java   
/**
 * Groups the given messages into per-queue lists of batch request entries.
 *
 * Each message becomes one entry carrying its id, its converted message
 * attributes and its body; entries are collected under the message's queue.
 *
 * @param messages the messages to categorize
 * @return the batch entries keyed by queue
 */
private Map<String, List<SendMessageBatchRequestEntry>> createBatchesForQueues(final List<Message> messages) {

    final Map<String, List<SendMessageBatchRequestEntry>> batchesByQueue = new HashMap<String, List<SendMessageBatchRequestEntry>>();

    for (final Message current : messages) {
        final Map<String, MessageAttributeValue> attrs = this.toMessageAttrs(current);

        final SendMessageBatchRequestEntry batchEntry = new SendMessageBatchRequestEntry();
        batchEntry.setId(current.getId());
        batchEntry.setMessageAttributes(attrs);
        batchEntry.setMessageBody(current.getBody());

        // Lazily create the list for a queue the first time it is seen.
        final String queue = current.getQueue();
        List<SendMessageBatchRequestEntry> batch = batchesByQueue.get(queue);
        if (batch == null) {
            batch = new ArrayList<SendMessageBatchRequestEntry>();
            batchesByQueue.put(queue, batch);
        }
        batch.add(batchEntry);
    }
    return batchesByQueue;
}
项目:spring-cloud-aws    文件:QueueMessageChannel.java   
/**
 * Builds the SQS send request for a message: the payload's string form is the
 * body, and the group id, deduplication id and delay are copied from the
 * corresponding headers when present. Message attributes are attached only
 * when at least one was derived from the headers.
 */
private SendMessageRequest prepareSendMessageRequest(Message<?> message) {
    final SendMessageRequest request = new SendMessageRequest(this.queueUrl, String.valueOf(message.getPayload()));
    final MessageHeaders headers = message.getHeaders();

    if (headers.containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)) {
        String groupId = headers.get(SqsMessageHeaders.SQS_GROUP_ID_HEADER, String.class);
        request.setMessageGroupId(groupId);
    }

    if (headers.containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)) {
        String deduplicationId = headers.get(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, String.class);
        request.setMessageDeduplicationId(deduplicationId);
    }

    if (headers.containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)) {
        Integer delaySeconds = headers.get(SqsMessageHeaders.SQS_DELAY_HEADER, Integer.class);
        request.setDelaySeconds(delaySeconds);
    }

    final Map<String, MessageAttributeValue> attributes = getMessageAttributes(message);
    if (!attributes.isEmpty()) {
        request.setMessageAttributes(attributes);
    }

    return request;
}
项目:spring-cloud-aws    文件:QueueMessageUtils.java   
/**
 * Converts the SQS message attributes into a header map.
 *
 * The content-type attribute becomes a MimeType and the id attribute a UUID.
 * Any other attribute is mapped by data type: String to its string value,
 * Number-prefixed types to the parsed number (skipped when the parse result is
 * null), Binary to its binary value. Other data types are ignored.
 */
private static Map<String, Object> getMessageAttributesAsMessageHeaders(com.amazonaws.services.sqs.model.Message message) {
    Map<String, Object> headers = new HashMap<>();
    for (Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
        String name = entry.getKey();
        MessageAttributeValue attribute = entry.getValue();

        // Two header names get a dedicated conversion regardless of data type.
        if (MessageHeaders.CONTENT_TYPE.equals(name)) {
            headers.put(MessageHeaders.CONTENT_TYPE, MimeType.valueOf(attribute.getStringValue()));
            continue;
        }
        if (MessageHeaders.ID.equals(name)) {
            headers.put(MessageHeaders.ID, UUID.fromString(attribute.getStringValue()));
            continue;
        }

        String dataType = attribute.getDataType();
        if (MessageAttributeDataTypes.STRING.equals(dataType)) {
            headers.put(name, attribute.getStringValue());
        } else if (dataType.startsWith(MessageAttributeDataTypes.NUMBER)) {
            Object parsed = getNumberValue(attribute);
            if (parsed != null) {
                headers.put(name, parsed);
            }
        } else if (MessageAttributeDataTypes.BINARY.equals(dataType)) {
            headers.put(name, attribute.getBinaryValue());
        }
    }

    return headers;
}
项目:spring-cloud-aws    文件:QueueMessageChannelTest.java   
/**
 * A received content-type message attribute must show up as a MimeType header.
 */
@Test
public void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8"));

    MessageAttributeValue contentTypeAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(mimeType.toString());
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentTypeAttribute));
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    assertEquals(mimeType, receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE));
}
项目:spring-cloud-aws    文件:QueueMessageChannelTest.java   
/**
 * A received String message attribute must show up as a header with the same
 * name and string value.
 */
@Test
public void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    String headerValue = "Header value";
    String headerName = "MyHeader";

    MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(headerValue);
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(headerName, stringAttribute));
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    assertEquals(headerValue, receivedMessage.getHeaders().get(headerName));
}
项目:spring-cloud-aws    文件:QueueMessageChannelTest.java   
/**
 * Receiving a Number attribute whose target class is AtomicInteger is expected
 * to fail with an IllegalArgumentException carrying the conversion message.
 */
@Test
public void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    this.expectedException.expect(IllegalArgumentException.class);
    this.expectedException.expectMessage("Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]");

    AtomicInteger atomicInteger = new AtomicInteger(17);
    MessageAttributeValue atomicIntegerAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.NUMBER + ".java.util.concurrent.atomic.AtomicInteger")
            .withStringValue(String.valueOf(atomicInteger));
    HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    messageAttributes.put("atomicInteger", atomicIntegerAttribute);

    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(messageAttributes);
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    messageChannel.receive();
}
项目:spring-cloud-aws    文件:QueueMessageChannelTest.java   
/**
 * Receiving a Number attribute whose target class cannot be loaded is expected
 * to fail with a MessagingException carrying the "target class was not found" message.
 */
@Test
public void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    this.expectedException.expect(MessagingException.class);
    this.expectedException.expectMessage("Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted" +
            " into a Number because target class was not found.");

    MessageAttributeValue unknownClassAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found")
            .withStringValue("12");
    HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    messageAttributes.put("classNotFound", unknownClassAttribute);

    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(messageAttributes);
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    messageChannel.receive();
}
项目:spring-cloud-aws    文件:QueueMessageChannelTest.java   
/**
 * A received Binary message attribute must show up as a header holding the
 * same ByteBuffer content.
 */
@Test
public void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes());
    String headerName = "MyHeader";

    MessageAttributeValue binaryAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.BINARY)
            .withBinaryValue(headerValue);
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(headerName, binaryAttribute));
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    assertEquals(headerValue, receivedMessage.getHeaders().get(headerName));
}
项目:spring-cloud-aws    文件:QueueMessageChannelTest.java   
/**
 * An id message attribute sent as a String must be exposed as a UUID header
 * equal to the original UUID.
 */
@Test
public void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception {
    // Arrange
    AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
    UUID uuid = UUID.randomUUID();

    MessageAttributeValue idAttribute = new MessageAttributeValue()
            .withDataType(MessageAttributeDataTypes.STRING)
            .withStringValue(uuid.toString());
    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message()
            .withBody("Hello")
            .withMessageAttributes(Collections.singletonMap(MessageHeaders.ID, idAttribute));
    ReceiveMessageRequest expectedRequest = new ReceiveMessageRequest("http://testQueue")
            .withWaitTimeSeconds(0)
            .withMaxNumberOfMessages(1)
            .withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES)
            .withMessageAttributeNames("All");
    when(amazonSqs.receiveMessage(expectedRequest))
            .thenReturn(new ReceiveMessageResult().withMessages(sqsMessage));

    PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue");

    // Act
    Message<?> receivedMessage = messageChannel.receive();

    // Assert
    Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID);
    assertTrue(UUID.class.isInstance(idMessageHeader));
    assertEquals(uuid, idMessageHeader);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducer.java   
/**
 * Writes the reply-to queue name and URL into the message attributes when the
 * message's JMS reply-to destination is an SQS queue destination; otherwise
 * leaves the attributes untouched.
 */
private void addReplyToQueueReservedAttributes(Map<String, MessageAttributeValue> messageAttributes,
                                               SQSMessage message) throws JMSException {

    final Destination replyTo = message.getJMSReplyTo();
    if (!(replyTo instanceof SQSQueueDestination)) {
        return;
    }

    final SQSQueueDestination replyQueue = (SQSQueueDestination) replyTo;

    /*
     * Existing entries under these keys are replaced. Everything prefixed with
     * JMS_ is reserved for the JMS provider, so a user-set value is overwritten.
     */
    addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME, replyQueue.getQueueName());
    addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL, replyQueue.getQueueUrl());
}
项目:Camel    文件:SqsEndpoint.java   
/**
 * Unwraps an SQS message attribute value: the string value when present,
 * otherwise the binary value, which may be {@code null}.
 */
private Object translateValue(MessageAttributeValue mav) {
    final String text = mav.getStringValue();
    if (text != null) {
        return text;
    }
    return mav.getBinaryValue();
}
项目:herd    文件:SqsOperationsImpl.java   
/**
 * Resolves the queue URL for the given queue name and sends the message text
 * with the given attributes. A missing queue is reported as an
 * IllegalStateException that keeps the original exception as its cause.
 */
@Override
public SendMessageResult sendMessage(String queueName, String messageText, Map<String, MessageAttributeValue> messageAttributes, AmazonSQS amazonSQS)
{
    try
    {
        String resolvedQueueUrl = amazonSQS.getQueueUrl(queueName).getQueueUrl();

        SendMessageRequest request = new SendMessageRequest();
        request.setQueueUrl(resolvedQueueUrl);
        request.setMessageBody(messageText);
        request.setMessageAttributes(messageAttributes);

        return amazonSQS.sendMessage(request);
    }
    catch (QueueDoesNotExistException e)
    {
        throw new IllegalStateException(String.format("AWS SQS queue with \"%s\" name not found.", queueName), e);
    }
}
项目:dropwizard-sqs-bundle    文件:SqsSender.java   
/**
 * Serializes the object with the object mapper and sends the resulting string
 * with the given attributes. A serialization failure is logged and nothing is sent.
 */
public void send(Object object, Map<String, MessageAttributeValue> attributes) {
    final String serialized;
    try {
        serialized = objectMapper.writeValueAsString(object);
    } catch (JsonProcessingException e) {
        LOGGER.error("Could not send message to SQS, cause is " + e.getMessage(), e);
        return;
    }
    send(serialized, attributes);
}
项目:dropwizard-sqs-bundle    文件:SqsSender.java   
/**
 * Sends the body to the configured queue URL, adding every given attribute to
 * the request one entry at a time.
 */
public void send(String body, Map<String, MessageAttributeValue> attributes) {
    final SendMessageRequest request = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(body);
    for (Map.Entry<String, MessageAttributeValue> attribute : attributes.entrySet()) {
        final String name = attribute.getKey();
        final MessageAttributeValue value = attribute.getValue();
        request.addMessageAttributesEntry(name, value);
    }
    sqs.sendMessage(request);
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClient.java   
/**
 * Moves the body of a batch entry to S3 and replaces it with a JSON pointer.
 *
 * After checking the entry's attributes, the body size is recorded in a
 * reserved "Number" attribute, the body is stored under a random UUID key, and
 * the entry's body is replaced by the JSON form of the S3 pointer.
 *
 * @param batchEntry the entry to rewrite in place
 * @return the same entry instance
 */
private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) {

    checkMessageAttributes(batchEntry.getMessageAttributes());

    final String s3Key = UUID.randomUUID().toString();

    // Capture the original payload and its size before the body is replaced.
    final String originalBody = batchEntry.getMessageBody();
    final Long originalBodySize = getStringSizeInBytes(originalBody);

    // Flag the entry with a reserved attribute holding the payload size.
    final MessageAttributeValue sizeAttribute = new MessageAttributeValue()
            .withDataType("Number")
            .withStringValue(originalBodySize.toString());
    batchEntry.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, sizeAttribute);

    // Upload the payload.
    storeTextInS3(s3Key, originalBody, originalBodySize);

    final String bucketName = clientConfiguration.getS3BucketName();
    LOG.info("S3 object created, Bucket name: " + bucketName + ", Object key: " + s3Key
            + ".");

    // The message body now carries the JSON-encoded S3 pointer instead of the payload.
    final String pointerJson = getJSONFromS3Pointer(new MessageS3Pointer(bucketName, s3Key));
    batchEntry.setMessageBody(pointerJson);

    return batchEntry;
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClient.java   
/**
 * Moves the body of a send request to S3 and replaces it with a JSON pointer.
 *
 * After checking the request's attributes, the body size is recorded in a
 * reserved "Number" attribute, the body is stored under a random UUID key, and
 * the request's body is replaced by the JSON form of the S3 pointer.
 *
 * @param sendMessageRequest the request to rewrite in place
 * @return the same request instance
 */
private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageRequest) {

    checkMessageAttributes(sendMessageRequest.getMessageAttributes());

    final String s3Key = UUID.randomUUID().toString();

    // Capture the original payload and its size before the body is replaced.
    final String originalBody = sendMessageRequest.getMessageBody();
    final Long originalBodySize = getStringSizeInBytes(originalBody);

    // Flag the request with a reserved attribute holding the payload size.
    final MessageAttributeValue sizeAttribute = new MessageAttributeValue()
            .withDataType("Number")
            .withStringValue(originalBodySize.toString());
    sendMessageRequest.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, sizeAttribute);

    // Upload the payload.
    storeTextInS3(s3Key, originalBody, originalBodySize);

    final String bucketName = clientConfiguration.getS3BucketName();
    LOG.info("S3 object created, Bucket name: " + bucketName + ", Object key: " + s3Key
            + ".");

    // The message body now carries the JSON-encoded S3 pointer instead of the payload.
    final String pointerJson = getJSONFromS3Pointer(new MessageS3Pointer(bucketName, s3Key));
    sendMessageRequest.setMessageBody(pointerJson);

    return sendMessageRequest;
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClientTest.java   
/**
 * A body below the SQS size limit must reach the backend without any message attributes.
 */
@Test
public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() {
    final String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT);

    extendedSqsWithDefaultConfig.sendMessage(new SendMessageRequest(SQS_QUEUE_URL, messageBody));

    // Capture what was actually forwarded to the mocked backend.
    final ArgumentCaptor<SendMessageRequest> captor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(mockSqsBackend).sendMessage(captor.capture());

    final SendMessageRequest forwarded = captor.getValue();
    final Map<String, MessageAttributeValue> attributes = forwarded.getMessageAttributes();
    Assert.assertTrue(attributes.isEmpty());
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClientTest.java   
/**
 * A body above the SQS size limit must reach the backend with the reserved
 * attribute of type "Number" holding the original body length.
 */
@Test
public void testWhenLargeMessgaeIsSentThenAttributeWithPayloadSizeIsAdded() {
    final int messageLength = MORE_THAN_SQS_SIZE_LIMIT;
    final String messageBody = generateStringWithLength(messageLength);

    extendedSqsWithDefaultConfig.sendMessage(new SendMessageRequest(SQS_QUEUE_URL, messageBody));

    // Capture what was actually forwarded to the mocked backend.
    final ArgumentCaptor<SendMessageRequest> captor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(mockSqsBackend).sendMessage(captor.capture());

    final Map<String, MessageAttributeValue> attributes = captor.getValue().getMessageAttributes();
    final String reservedName = SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME;
    Assert.assertEquals("Number", attributes.get(reservedName).getDataType());
    Assert.assertEquals(messageLength, Integer.parseInt(attributes.get(reservedName).getStringValue()));
}
项目:micro-genie    文件:SqsProducer.java   
/**
 * Convert Message headers from {@link Message} to SQS {@link MessageAttributeValue} Map entries.
 *
 * Every header becomes a message attribute of data type "String". The data
 * type is set explicitly because SQS requires one on every message attribute.
 *
 * @param message the message whose headers are converted; may be {@code null}
 * @return the message attributes, or {@code null} when the message is
 *         {@code null} or has no headers
 */
private Map<String, MessageAttributeValue> toMessageAttrs(Message message){
    if(message!=null && message.getHeaders() != null && message.getHeaders().size()>0){
        final Map<String, MessageAttributeValue> messageAttrs = Maps.newHashMap();
        for(Entry<String, String> attr : message.getHeaders().entrySet()){
            messageAttrs.put(attr.getKey(), new MessageAttributeValue()
                    .withDataType("String")
                    .withStringValue(attr.getValue()));
        }
        return messageAttrs;
    }
    return null;
}
项目:aws-sdk-java-resources    文件:SQSResourcesIntegrationTest.java   
/**
 * Sends a message carrying one String message attribute and asserts that the
 * received message matches the send result and contains that attribute. It
 * then changes the message's visibility, polls the queue once more and deletes
 * the message.
 */
@Test
@Ignore
public void testSendReceiveMessageAttributes() throws InterruptedException {

    final MessageAttributeValue testAttribute = new MessageAttributeValue()
            .withDataType("String")
            .withStringValue("testAttributeValue");
    final SendMessageRequest sendRequest = new SendMessageRequest()
            .withMessageBody(TEST_MESSAGE_ATTRIBUTES)
            .withMessageAttributes(ImmutableMapParameter.of("testAttribute", testAttribute));
    final SendMessageResult sendMessageResult = queue.sendMessage(sendRequest);

    final ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
            .withMessageAttributeNames("testAttribute");
    List<Message> messages = waitForMessagesFromQueue(receiveRequest);

    assertNotNull(messages);
    assertEquals(1, messages.size());
    final Message message = messages.get(0);
    assertMessage(TEST_MESSAGE_ATTRIBUTES,
            sendMessageResult.getMessageId(),
            sendMessageResult.getMD5OfMessageBody(), message);

    final Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes();
    assertNotNull(messageAttributes);
    assertTrue(messageAttributes.containsKey("testAttribute"));
    assertEquals(messageAttributes.get("testAttribute").getStringValue(),
            "testAttributeValue");

    message.changeVisibility(10);

    // Poll again after the visibility change; the result itself is not inspected.
    messages = waitForMessagesFromQueue(null);
    message.delete();
}
项目:spring-cloud-aws    文件:QueueMessageChannel.java   
/**
 * Converts the message headers into SQS message attributes.
 *
 * Headers flagged by {@code isSkipHeader} are ignored. Non-null content-type
 * and id headers get dedicated conversions; otherwise String, Number and
 * ByteBuffer values are converted by type. Any other header is logged with a
 * warning and left out.
 */
private Map<String, MessageAttributeValue> getMessageAttributes(Message<?> message) {
    HashMap<String, MessageAttributeValue> attributes = new HashMap<>();
    for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
        String name = header.getKey();
        Object value = header.getValue();

        if (isSkipHeader(name)) {
            continue;
        }

        boolean hasValue = value != null;

        if (MessageHeaders.CONTENT_TYPE.equals(name) && hasValue) {
            // The helper's result is stored as returned.
            attributes.put(name, getContentTypeMessageAttribute(value));
            continue;
        }
        if (MessageHeaders.ID.equals(name) && hasValue) {
            attributes.put(name, getStringMessageAttribute(value.toString()));
            continue;
        }
        if (value instanceof String) {
            attributes.put(name, getStringMessageAttribute((String) value));
            continue;
        }
        if (value instanceof Number) {
            attributes.put(name, getNumberMessageAttribute(value));
            continue;
        }
        if (value instanceof ByteBuffer) {
            attributes.put(name, getBinaryMessageAttribute((ByteBuffer) value));
            continue;
        }

        // Nothing matched: report the header type (empty for a null value) and drop it.
        String typeName = hasValue ? value.getClass().getName() : "";
        this.logger.warn(String.format("Message header with name '%s' and type '%s' cannot be sent as" +
                " message attribute because it is not supported by SQS.", name, typeName));
    }

    return attributes;
}
项目:spring-cloud-aws    文件:QueueMessageChannel.java   
/**
 * Builds a String-typed message attribute from a content-type header value.
 *
 * @param messageHeaderValue a MimeType (its string form is used) or a String
 * @return the attribute, or {@code null} for any other value type
 */
private MessageAttributeValue getContentTypeMessageAttribute(Object messageHeaderValue) {
    final String contentType;
    if (messageHeaderValue instanceof MimeType) {
        contentType = messageHeaderValue.toString();
    } else if (messageHeaderValue instanceof String) {
        contentType = (String) messageHeaderValue;
    } else {
        return null;
    }
    return new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(contentType);
}
项目:spring-cloud-aws    文件:QueueMessageUtils.java   
/**
 * Parses the string value of a Number-typed message attribute.
 *
 * The data type is expected to be either the bare number type or the number
 * type followed by a separator character and a fully qualified Number subclass
 * name. With a class-name suffix the value is parsed into that class. Without
 * one the value is parsed with {@code Number.class} as the target, instead of
 * failing with a StringIndexOutOfBoundsException while extracting the suffix.
 *
 * @param value the attribute whose data type starts with the number type
 * @return the parsed number
 * @throws MessagingException if the class named in the data type cannot be found
 */
private static Object getNumberValue(MessageAttributeValue value) {
    String dataType = value.getDataType();

    // Bare number type: there is no class-name suffix to extract.
    if (MessageAttributeDataTypes.NUMBER.equals(dataType)) {
        return NumberUtils.parseNumber(value.getStringValue(), Number.class);
    }

    // Skip the number type prefix plus the one separator character.
    String numberType = dataType.substring(MessageAttributeDataTypes.NUMBER.length() + 1);
    try {
        Class<? extends Number> numberTypeClass = Class.forName(numberType).asSubclass(Number.class);
        return NumberUtils.parseNumber(value.getStringValue(), numberTypeClass);
    } catch (ClassNotFoundException e) {
        throw new MessagingException(String.format("Message attribute with value '%s' and data type '%s' could not be converted " +
                "into a Number because target class was not found.", value.getStringValue(), value.getDataType()), e);
    }
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducer.java   
/**
 * Converts the JMS properties of a message into SQS message attributes.
 *
 * Attribute names are not validated here. SQS attribute names must consist of
 * letters or digits of the basic multilingual plane plus '_', '-' and '.'; no
 * name component may be empty, so a name may neither start nor end with '.'
 * nor contain "..".
 *
 * @param message the message whose properties are converted
 * @return one attribute per property, except the delivery-count and
 *         deduplication-id properties, which are skipped
 */
Map<String, MessageAttributeValue> propertyToMessageAttribute(SQSMessage message)
        throws JMSException {
    Map<String, MessageAttributeValue> attributes = new HashMap<String, MessageAttributeValue>();

    for (Enumeration<String> names = message.getPropertyNames(); names.hasMoreElements();) {
        String name = names.nextElement();

        // JMSXDeliveryCount is generated from the SQS attribute "ApproximateReceiveCount".
        // The deduplication id is passed as the DeduplicationId argument of the
        // SendMessage call and mapped back to this JMS property on receive.
        boolean reserved = name.equals(SQSMessagingClientConstants.JMSX_DELIVERY_COUNT)
                || name.equals(SQSMessagingClientConstants.JMS_SQS_DEDUPLICATION_ID);
        if (reserved) {
            continue;
        }

        // JMSXGroupID and JMSXGroupSeq are always stored as message properties so
        // they survive send and receive, even though SQS Classic does not respect
        // them when returning messages and SQS FIFO understands message groups
        // differently.
        JMSMessagePropertyValue property = message.getJMSMessagePropertyValue(name);

        MessageAttributeValue attribute = new MessageAttributeValue();
        attribute.setDataType(property.getType());
        attribute.setStringValue(property.getStringMessageAttributeValue());
        attributes.put(name, attribute);
    }
    return attributes;
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducer.java   
/**
 * Stores a single String-typed attribute under the given key, replacing any
 * existing entry for that key.
 */
private void addStringAttribute(Map<String, MessageAttributeValue> messageAttributes,
                                String key, String value) {
    final MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType(SQSMessagingClientConstants.STRING)
            .withStringValue(value);
    messageAttributes.put(key, stringAttribute);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Test propertyToMessageAttribute with empty messages of different type.
 *
 * A freshly constructed text, object or bytes message carries no properties,
 * so the resulting attribute map is expected to be empty in every case.
 */
@Test
public void testPropertyToMessageAttributeWithEmpty() throws JMSException {

    /*
     * Test Empty text message default attribute
     */
    SQSMessage sqsText = new SQSTextMessage();
    Map<String, MessageAttributeValue> messageAttributeText = producer.propertyToMessageAttribute(sqsText);

    assertEquals(0, messageAttributeText.size());

    /*
     * Test Empty object message default attribute
     */
    SQSMessage sqsObject = new SQSObjectMessage();
    Map<String, MessageAttributeValue> messageAttributeObject = producer.propertyToMessageAttribute(sqsObject);

    assertEquals(0, messageAttributeObject.size());

    /*
     * Test Empty byte message default attribute
     */
    SQSMessage sqsByte = new SQSBytesMessage();
    Map<String, MessageAttributeValue> messageAttributeByte = producer.propertyToMessageAttribute(sqsByte);

    // Assert on the bytes-message result itself; re-checking the object
    // message map here would leave this case untested.
    assertEquals(0, messageAttributeByte.size());
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Sends one SQSTextMessage twice through sendInternal, changing its text in
 * between, and verifies both requests and the ids written back to the message.
 */
@Test
public void testSendInternalSQSTextMessage() throws JMSException {

    String firstBody = "MyText1";
    String secondBody = "MyText2";
    List<String> expectedBodies = Arrays.asList(firstBody, secondBody);
    Map<String, MessageAttributeValue> expectedAttributes = createMessageAttribute("text");

    SQSTextMessage textMessage = spy(new SQSTextMessage(firstBody));

    // The client hands out a different message id for each of the two sends.
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    // First send with the original text.
    producer.sendInternal(destination, textMessage);

    // Re-send the same message object with a new text.
    textMessage.setText(secondBody);
    producer.sendInternal(destination, textMessage);

    verify(amazonSQSClient, times(2)).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, expectedAttributes)));
    verify(textMessage, times(2)).setJMSDestination(destination);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(textMessage).setSQSMessageId(MESSAGE_ID_1);
    verify(textMessage).setSQSMessageId(MESSAGE_ID_2);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Test sendInternal with an SQSTextMessage that was built from a received
 * SQS message rather than created locally.
 */
@Test
public void testSendInternalSQSTextMessageFromReceivedMessage() throws JMSException {

    // Message attributes of the raw SQS message: only the JMS message type marker.
    Map<String, MessageAttributeValue> typeAttributes = new HashMap<String, MessageAttributeValue>();
    typeAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, new MessageAttributeValue()
            .withStringValue(SQSMessage.TEXT_MESSAGE_TYPE)
            .withDataType(SQSMessagingClientConstants.STRING));

    // System attributes of the raw SQS message.
    Map<String, String> systemAttributes = new HashMap<String, String>();
    systemAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");

    com.amazonaws.services.sqs.model.Message received = new com.amazonaws.services.sqs.model.Message();
    received.withMessageAttributes(typeAttributes);
    received.withAttributes(systemAttributes);
    received.withBody("MessageBody");

    SQSTextMessage textMessage = spy(new SQSTextMessage(acknowledger, QUEUE_URL, received));

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));

    producer.sendInternal(destination, textMessage);

    List<String> expectedBodies = Arrays.asList("MessageBody");
    verify(amazonSQSClient).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, typeAttributes)));
    verify(textMessage).setJMSDestination(destination);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(textMessage).setSQSMessageId(MESSAGE_ID_1);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Test sendInternal input with SQSObjectMessage.
 *
 * The same message is sent twice with a different payload each time. Both
 * captured requests are checked for the serialized body and for the
 * expected "object" message-type attribute.
 */
@Test
public void testSendInternalSQSObjectMessage() throws JMSException {

    HashSet<String> set1 = new HashSet<String>();
    set1.add("data1");
    HashSet<String> set2 = new HashSet<String>();
    set2.add("data2");

    SQSObjectMessage msg = spy(new SQSObjectMessage(set1));
    String megBody1 = msg.getMessageBody();

    Map<String, MessageAttributeValue> messageAttributes = createMessageAttribute("object");

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, msg);

    /*
     * Re send the message
     */
    msg.clearBody();
    msg.setObject(set2);
    String megBody2 = msg.getMessageBody();
    producer.sendInternal(destination, msg);

    ArgumentCaptor<SendMessageRequest> argumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(amazonSQSClient, times(2)).sendMessage(argumentCaptor.capture());

    List<SendMessageRequest> requests = argumentCaptor.getAllValues();
    assertEquals(megBody1, requests.get(0).getMessageBody());
    assertEquals(megBody2, requests.get(1).getMessageBody());

    // The expected attributes used to be built but never checked; verify
    // them on both requests, as the text and bytes tests do via the matcher.
    assertEquals(messageAttributes, requests.get(0).getMessageAttributes());
    assertEquals(messageAttributes, requests.get(1).getMessageAttributes());

    verify(msg, times(2)).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_2);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Test sendInternal input with SQSBytesMessage.
 *
 * The same message is sent twice with a different payload each time; the
 * expected request bodies are the Base64 strings "AA==" and "AAAAKg==".
 */
@Test
public void testSendInternalSQSByteMessage() throws JMSException {

    SQSBytesMessage msg = spy(new SQSBytesMessage());
    msg.writeByte((byte)0);
    msg.reset();

    Map<String, MessageAttributeValue> messageAttributes = createMessageAttribute("byte");

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, msg);

    /*
     * Re send the message
     */
    msg.clearBody();
    msg.writeInt(42);
    producer.sendInternal(destination, msg);

    List<String> messagesBody = Arrays.asList("AA==", "AAAAKg==");
    verify(amazonSQSClient, times(2)).sendMessage(argThat(new sendMessageRequestMatcher(QUEUE_URL, messagesBody,
                                                                                        messageAttributes)));

    verify(msg, times(2)).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_2);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Test sendInternal with a message rebuilt from a received SQS message whose
 * body is a Base64-encoded byte array.
 *
 * NOTE(review): the received message is tagged with the byte message type,
 * yet it is wrapped in an SQSObjectMessage and the request is expected to
 * carry the "object" type attribute. Confirm whether SQSBytesMessage / "byte"
 * was intended.
 */
@Test
public void testSendInternalSQSByteMessageFromReceivedMessage() throws JMSException, IOException {

    // Message attributes of the raw SQS message: only the JMS message type marker.
    Map<String, MessageAttributeValue> receivedTypeAttributes = new HashMap<String, MessageAttributeValue>();
    receivedTypeAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, new MessageAttributeValue()
            .withStringValue(SQSMessage.BYTE_MESSAGE_TYPE)
            .withDataType(SQSMessagingClientConstants.STRING));

    // System attributes of the raw SQS message.
    Map<String, String> systemAttributes = new HashMap<String, String>();
    systemAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");

    String encodedBody = Base64.encodeAsString(new byte[] { 1, 0, 'a', 65 });

    com.amazonaws.services.sqs.model.Message received = new com.amazonaws.services.sqs.model.Message();
    received.withMessageAttributes(receivedTypeAttributes);
    received.withAttributes(systemAttributes);
    received.withBody(encodedBody);

    SQSObjectMessage wrapped = spy(new SQSObjectMessage(acknowledger, QUEUE_URL, received));

    Map<String, MessageAttributeValue> expectedAttributes = createMessageAttribute("object");

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, wrapped);

    List<String> expectedBodies = Arrays.asList(encodedBody);
    verify(amazonSQSClient).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, expectedAttributes)));
    verify(wrapped).setJMSDestination(destination);
    verify(wrapped).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(wrapped).setSQSMessageId(MESSAGE_ID_1);
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerTest.java   
/**
 * Builds the attribute map a request is expected to carry for a given JMS
 * message type: a single string attribute keyed by JMS_SQS_MESSAGE_TYPE.
 *
 * @param type string value stored in the message-type attribute
 * @return a new mutable map holding only that attribute
 */
private Map<String, MessageAttributeValue> createMessageAttribute(String type) {
    Map<String, MessageAttributeValue> expected = new HashMap<String, MessageAttributeValue>();
    expected.put(SQSMessage.JMS_SQS_MESSAGE_TYPE,
            new MessageAttributeValue().withDataType("String").withStringValue(type));
    return expected;
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerFifoTest.java   
/**
 * Checks that propertyToMessageAttribute yields an empty attribute map for a
 * freshly created text, object and bytes message.
 */
@Test
public void testPropertyToMessageAttributeWithEmpty() throws JMSException {

    // Empty text message
    Map<String, MessageAttributeValue> fromText = producer.propertyToMessageAttribute(new SQSTextMessage());
    assertEquals(0, fromText.size());

    // Empty object message
    Map<String, MessageAttributeValue> fromObject = producer.propertyToMessageAttribute(new SQSObjectMessage());
    assertEquals(0, fromObject.size());

    // Empty bytes message
    Map<String, MessageAttributeValue> fromBytes = producer.propertyToMessageAttribute(new SQSBytesMessage());
    assertEquals(0, fromBytes.size());
}
项目:amazon-sqs-java-messaging-lib    文件:SQSMessageProducerFifoTest.java   
/**
 * Test sendInternal with an SQSTextMessage built from a received SQS message
 * that carries group id, deduplication id and sequence number attributes.
 */
@Test
public void testSendInternalSQSTextMessageFromReceivedMessage() throws JMSException {

    // Message attributes of the raw SQS message: only the JMS message type marker.
    Map<String, MessageAttributeValue> typeAttributes = new HashMap<String, MessageAttributeValue>();
    typeAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, new MessageAttributeValue()
            .withStringValue(SQSMessage.TEXT_MESSAGE_TYPE)
            .withDataType(SQSMessagingClientConstants.STRING));

    // System attributes of the raw SQS message, including the FIFO-related ones.
    Map<String, String> systemAttributes = new HashMap<String, String>();
    systemAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");
    systemAttributes.put(SQSMessagingClientConstants.MESSAGE_GROUP_ID, GROUP_ID);
    systemAttributes.put(SQSMessagingClientConstants.MESSAGE_DEDUPLICATION_ID, DEDUP_ID);
    systemAttributes.put(SQSMessagingClientConstants.SEQUENCE_NUMBER, SEQ_NUMBER);

    com.amazonaws.services.sqs.model.Message received = new com.amazonaws.services.sqs.model.Message();
    received.withMessageAttributes(typeAttributes);
    received.withAttributes(systemAttributes);
    received.withBody("MessageBody");

    SQSTextMessage textMessage = spy(new SQSTextMessage(acknowledger, QUEUE_URL, received));

    // The stubbed result carries a new sequence number for the re-sent message.
    SendMessageResult sendResult = new SendMessageResult()
            .withMessageId(MESSAGE_ID)
            .withSequenceNumber(SEQ_NUMBER_2);
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendResult);

    producer.sendInternal(destination, textMessage);

    verify(amazonSQSClient).sendMessage(argThat(
            new sendMessageRequestMatcher(QUEUE_URL, "MessageBody", SQSMessage.TEXT_MESSAGE_TYPE, GROUP_ID, DEDUP_ID)));
    verify(textMessage).setJMSDestination(destination);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID);
    verify(textMessage).setSQSMessageId(MESSAGE_ID);
    verify(textMessage).setSequenceNumber(SEQ_NUMBER_2);
}