Java 类com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry 实例源码

项目:async-sqs    文件:SendMessageBatchAction.java   
@VisibleForTesting
static SendMessageBatchRequest createRequest(String queueUrl, Map<String, SendMessageEntry> entries) {
    return new SendMessageBatchRequest()
            .withQueueUrl(queueUrl)
            .withEntries(entries.entrySet().stream().map(keyValue -> {
                        SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry()
                                .withId(keyValue.getKey())
                                .withMessageBody(keyValue.getValue().getBody());

                        keyValue.getValue().getDelay()
                                .ifPresent((delay) -> entry.setDelaySeconds((int) delay.getSeconds()));

                        return entry;
                    }).collect(Collectors.toList())
            );
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSender.java   
private String buildErrorMessage(List<SendMessageBatchRequestEntry> batchRequestEntries,
    List<BatchResultErrorEntry> errors) {
    StringBuilder errorMessage = new StringBuilder();
    int count = 0;
    for (BatchResultErrorEntry error : errors) {
        if (count > 0) {
            errorMessage.append(",");
        }
        SendMessageBatchRequestEntry failedRequestEventEntry =
            findRequestEventEntryById(batchRequestEntries, error.getId());
        String messageBody = failedRequestEventEntry == null ? null : failedRequestEventEntry.getMessageBody();
        errorMessage.append("[" + error.toString() + ",{messageBody:" + "\"" + messageBody + "\"}]");
        count++;
    }
    return errorMessage.toString();
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests happy path scenario.
 * <p>
 * <pre>
 * Inputs:
 *  channel = never empty
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *  each message size = 2 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 5
 * </pre>
 */
@Test
public void testCreateBatches() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(5, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is empty after first event.
 * <p>
 * <pre>
 * Inputs:
 *  channel = 1 Event (Empty after first take)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 1
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterFirstTake() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent).thenReturn(null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(1, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is empty after the last take for the batch.
 * <p>
 * <pre>
 * Inputs:
 *  channel = 5 Events (Empty after 5th take)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 5
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterLastTake() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, mockEvent, mockEvent, null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(5, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is empty in the middle of taking events for the batch
 * <p>
 * <pre>
 * Inputs:
 *  channel = 3 Events (Empty after 3rd take)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 3
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelInTheMiddle() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(3, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is not empty but contains events with empty body in the middle of taking events for the batch
 * <p>
 * <pre>
 * Inputs:
 *  channel = 4 Events (3 Events with Body and 4th Event empty)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 3
 * </pre>
 */
@Test
public void testCreateBatchesEmptyEventInTheMiddle() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    byte[] mockEmptyMsgPayload = {};
    Event mockEvent = Mockito.mock(Event.class);
    Event mockEmptyEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);
    when(mockEmptyEvent.getBody()).thenReturn(mockEmptyMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, mockEmptyEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(3, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
项目:micro-genie    文件:SqsProducer.java   
/***
 * Categorize the messages into batches per queue
 * @param messages
 * @return messageBatches - belonging to one or more queues
 */
private Map<String, List<SendMessageBatchRequestEntry>> createBatchesForQueues(final List<Message> messages) {

    final Map<String, List<SendMessageBatchRequestEntry>> messageBatches = new HashMap<String, List<SendMessageBatchRequestEntry>>();

    for(Message message : messages){
        final Map<String, MessageAttributeValue> attributes = this.toMessageAttrs(message);

        final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry() 
        .withId(message.getId())
        .withMessageAttributes(attributes)
        .withMessageBody(message.getBody());

        if(!messageBatches.containsKey(message.getQueue())){
            messageBatches.put(message.getQueue(), new ArrayList<SendMessageBatchRequestEntry>());
        }
        messageBatches.get(message.getQueue()).add(entry);
    }
    return messageBatches;
}
项目:izettle-toolbox    文件:QueueServiceSender.java   
/**
 * Posts many messages to queue, with a message envelope that makes them look like they
 * were sent through Amazon SNS.
 *
 * @param messages list of messages to post
 * @param eventName the value that will be used as "subject" in the SNS envelope
 * @throws MessagingException Failed to post messages.
 */
@Override
public <T> void postBatch(Collection<T> messages, String eventName) throws MessagingException {
    if (empty(eventName)) {
        throw new MessagingException("Cannot publish message with empty eventName!");
    }
    try {
        Collection<SendMessageBatchRequestEntry> allEntries = new ArrayList<>(messages.size());
        int messageIdInBatch = 0;
        for (T message : messages) {
            ++messageIdInBatch;
            String messageBody = wrapInSNSMessage(message, eventName);
            allEntries.add(new SendMessageBatchRequestEntry(String.valueOf(messageIdInBatch), messageBody));
        }
        sendMessageBatch(allEntries);
    } catch (AmazonServiceException | IOException | CryptographyException e) {
        throw new MessagingException("Failed to post messages: " + messages.getClass(), e);
    }
}
项目:conductor    文件:SQSObservableQueue.java   
void publishMessages(List<Message> messages) {
    logger.info("Sending {} messages", messages.size());
    SendMessageBatchRequest batch = new SendMessageBatchRequest(queueURL);
    messages.stream().forEach(msg -> {
        SendMessageBatchRequestEntry sendr = new SendMessageBatchRequestEntry(msg.getId(), msg.getPayload());
        batch.getEntries().add(sendr);
    });
    logger.info("sending {}", batch.getEntries().size());
    SendMessageBatchResult result = client.sendMessageBatch(batch);
    logger.info("send result {}", result.getFailed().toString());
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSender.java   
/**
 * Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail
 * and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS.
 * Currently, this method does just logs errors and skips the messages in case some messages from the batched failed
 * to be delivered but some succeeded (i.e., partial batch failure).
 * <p>
 * TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure
 *
 * @param batchRequest The SQS SendMessageBatchRequest
 * @param batchResult The SQS SendMessageBatchResult
 *
 * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS
 */
protected void handleResult(SendMessageBatchRequest batchRequest, SendMessageBatchResult batchResult)
    throws EventDeliveryException {

    List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries();
    List<BatchResultErrorEntry> errors = batchResult.getFailed();

    int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size();
    int errorCount = errors == null ? 0 : errors.size();

    if (errorCount > 0) {
        String errorMessage = buildErrorMessage(batchRequestEntries, errors);

        if (attemptedCount == errorCount) {
            // if it was a non-empty batch and if all the messages in the batch have errors then fail the whole
            // batch and let flume rollback the transaction and retry it
            // Just throw the EventDeliveryException. This will eventually cause the channel's transaction to
            // rollback.
            throw new EventDeliveryException(errorMessage);
        }
        else {
            // TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure

            // Just log the error message and let flume drop failed messages in case of partial batch failures
            LOG.error(errorMessage);
        }
    }
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSender.java   
private SendMessageBatchRequestEntry findRequestEventEntryById(List<SendMessageBatchRequestEntry> entries,
    String id) {
    SendMessageBatchRequestEntry foundEntry = null;
    if (entries != null) {
        for (SendMessageBatchRequestEntry entry : entries) {
            if (entry.getId().equals(id)) {
                foundEntry = entry;
                break;
            }
        }
    }
    return foundEntry;
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests invalid characters not
 * allowed by the SQS. See [http://docs.aws.amazon
 * .com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html]
 * for list of valid characters allowed by SQS.
 * <p>
 * <p>
 * <pre>
 * Inputs:
 *  channel = never empty. with messages containing invalid characters.
 *
 * Expected Output:
 *   The sink messages should not contain invalid characters
 * </pre>
 */
@Test
public void testInvalidCharacters() throws Exception {
    // See
    // http://stackoverflow.com/questions/16688523/aws-sqs-valid-characters
    // http://stackoverflow.com/questions/1169754/amazon-sqs-invalid-binary-character-in-message-body
    // https://forums.aws.amazon.com/thread.jspa?messageID=459090
    // http://stackoverflow.com/questions/16329695/invalid-binary-character-when-transmitting-protobuf-net
    // -messages-over-aws-sqs
    byte invalidCharByte = 0x1C;
    String mockMsg = "Test with some invalid chars at the end 0%2F>^F";
    byte[] origPayloadWithInvalidChars = ArrayUtils.add(mockMsg.getBytes(), invalidCharByte);

    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 1,
            origPayloadWithInvalidChars.length);

    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(origPayloadWithInvalidChars);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    assertCorrectPayloadInEntries(new String(origPayloadWithInvalidChars).trim().getBytes(), msgEntries);

    // Make sure that the message being sent by the sink doesn't contain the invalid characters
    for (SendMessageBatchRequestEntry entry : msgEntries) {
        Assert.assertNotNull(entry);
        Assert.assertTrue(ArrayUtils.contains(new String(origPayloadWithInvalidChars).getBytes(), invalidCharByte));
        Assert.assertTrue(!ArrayUtils.contains(entry.getMessageBody().getBytes(), invalidCharByte));
    }
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * specified <i>batchSize</i> can not be fit into the specified <i>maxMessageSize</i>
 * <p>
 * <pre>
 * Inputs:
 *  channel = never empty
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *  each message size = 3 Bytes
 *
 * Expected Output:
 *  number of batches = 2
 *  number of messages in batch 1 = 3
 *  number of messages in batch 2 = 2
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSize() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    byte[] mockMsgPayload = {'A', 'b', '~'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(2, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries1 = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries1);
    Assert.assertEquals(3, msgEntries1.size());

    List<SendMessageBatchRequestEntry> msgEntries2 = batches.get(1).getEntries();
    Assert.assertNotNull(msgEntries2);
    Assert.assertEquals(2, msgEntries2.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries2);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * specified <i>batchSize</i> can not fit into the specified <i>maxMessageSize</i> and channel gets empty after
 * certain number of events "takes".
 * <p>
 * <pre>
 * Inputs:
 *  channel = 4 Events
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *  each message size = 3 Bytes
 *
 * Expected Output:
 *  number of batches = 2
 *  number of messages in batch 1 = 3
 *  number of messages in batch 2 = 1
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSizeLimitedChannel() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    byte[] mockMsgPayload = {'^', '@', '~'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, mockEvent, null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(2, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries1 = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries1);
    Assert.assertEquals(3, msgEntries1.size());

    List<SendMessageBatchRequestEntry> msgEntries2 = batches.get(1).getEntries();
    Assert.assertNotNull(msgEntries2);
    Assert.assertEquals(1, msgEntries2.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries2);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
private void assertCorrectPayloadInEntries(byte[] mockMsgPayload, List<SendMessageBatchRequestEntry> msgEntries)
    throws UnsupportedEncodingException {
    for (SendMessageBatchRequestEntry entry : msgEntries) {
        Assert.assertNotNull(entry);
        Assert.assertEquals(new String(mockMsgPayload, "UTF-8"), entry.getMessageBody());
    }
}
项目:aws-doc-sdk-examples    文件:SendReceiveMessages.java   
public static void main(String[] args)
{
    final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

    try {
        CreateQueueResult create_result = sqs.createQueue(QUEUE_NAME);
    } catch (AmazonSQSException e) {
        if (!e.getErrorCode().equals("QueueAlreadyExists")) {
            throw e;
        }
    }

    String queueUrl = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();

    SendMessageRequest send_msg_request = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody("hello world")
            .withDelaySeconds(5);
    sqs.sendMessage(send_msg_request);


    // Send multiple messages to the queue
    SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()
            .withQueueUrl(queueUrl)
            .withEntries(
                    new SendMessageBatchRequestEntry(
                            "msg_1", "Hello from message 1"),
                    new SendMessageBatchRequestEntry(
                            "msg_2", "Hello from message 2")
                            .withDelaySeconds(10));
    sqs.sendMessageBatch(send_batch_request);

    // receive messages from the queue
    List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();

    // delete messages from the queue
    for (Message m : messages) {
        sqs.deleteMessage(queueUrl, m.getReceiptHandle());
    }
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClient.java   
private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) {

        checkMessageAttributes(batchEntry.getMessageAttributes());

        String s3Key = UUID.randomUUID().toString();

        // Read the content of the message from message body
        String messageContentStr = batchEntry.getMessageBody();

        Long messageContentSize = getStringSizeInBytes(messageContentStr);

        // Add a new message attribute as a flag
        MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
        messageAttributeValue.setDataType("Number");
        messageAttributeValue.setStringValue(messageContentSize.toString());
        batchEntry.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue);

        // Store the message content in S3.
        storeTextInS3(s3Key, messageContentStr, messageContentSize);

        LOG.info("S3 object created, Bucket name: " + clientConfiguration.getS3BucketName() + ", Object key: " + s3Key
                + ".");

        // Convert S3 pointer (bucket name, key, etc) to JSON string
        MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key);
        String s3PointerStr = getJSONFromS3Pointer(s3Pointer);

        // Storing S3 pointer in the message body.
        batchEntry.setMessageBody(s3PointerStr);

        return batchEntry;
    }
项目:CliDispatcher    文件:SQSClient.java   
/**
 * @todo Roba schifosa!
 */
public void sendBulkMessage(List<String> messages){
    List<SendMessageBatchRequestEntry> entries = new ArrayList<>(10);
    Integer i=0;
    for(String m : messages){
        entries.add(new SendMessageBatchRequestEntry((i++).toString(), m));
    }
    c.sendMessageBatch(queueUrl, entries);
}
项目:amazon-cloudengine    文件:ServerThread.java   
public void remoteBatchSend(BufferedReader in) throws ParseException{
    //Batch sending task to remote workers 
List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>();
      String message;
      final int batchSize = 10; 

      try {
        JSONParser parser=new JSONParser();

    while ((message = in.readLine()) != null) {

        JSONArray taskList = (JSONArray)parser.parse(message);

        for(int i=0; i< taskList.size(); i++){
            JSONObject task = (JSONObject)taskList.get(i);
            msg_cnt++;

            entries.add(new SendMessageBatchRequestEntry()
            .withId(Integer.toString(msg_cnt))
            .withMessageBody(task.toString())); 

        }

        if(entries.size() == batchSize){
            jobQ.batchSend(entries);
            entries.clear();                
        }

    }

     if(!entries.isEmpty()){
            jobQ.batchSend(entries);
            entries.clear();
     }

} catch (IOException e) {           
    e.printStackTrace();
}

  }
项目:para    文件:AWSQueueUtils.java   
/**
 * Pushes a number of messages in batch to an SQS queue.
 * @param queueURL the URL of the SQS queue
 * @param messages the massage bodies
 */
public static void pushMessages(String queueURL, List<String> messages) {
    if (!StringUtils.isBlank(queueURL) && messages != null) {
        // only allow strings - ie JSON
        try {
            int j = 0;
            List<SendMessageBatchRequestEntry> msgs = new ArrayList<>(MAX_MESSAGES);
            for (int i = 0; i < messages.size(); i++) {
                String message = messages.get(i);
                if (!StringUtils.isBlank(message)) {
                    msgs.add(new SendMessageBatchRequestEntry().
                            withMessageBody(message).
                            withId(Integer.toString(i)));
                }
                if (++j >= MAX_MESSAGES || i == messages.size() - 1) {
                    if (!msgs.isEmpty()) {
                        getClient().sendMessageBatch(queueURL, msgs);
                        msgs.clear();
                    }
                    j = 0;
                }
            }
        } catch (AmazonServiceException ase) {
            logException(ase);
        } catch (AmazonClientException ace) {
            logger.error("Could not reach SQS. {}", ace.toString());
        }
    }
}
项目:micro-genie    文件:SqsProducer.java   
/***
 * Submit the batches of messages
 * @param messageBatches
 */
private void submitBatches(
        final Map<String, List<SendMessageBatchRequestEntry>> messageBatches) {
    for(Entry<String, List<SendMessageBatchRequestEntry>> queueBatchEntry : messageBatches.entrySet()){
        final String queueUrl = this.queueAdmin.getQueueUrl(queueBatchEntry.getKey());
        final SendMessageBatchRequest batch = new SendMessageBatchRequest()
        .withQueueUrl(queueUrl)
        .withEntries(queueBatchEntry.getValue());
        final SendMessageBatchResult batchResult = this.sqs.sendMessageBatch(batch);
        this.logFailures(batchResult.getFailed());
    }
}
项目:aws-sdk-java-resources    文件:QueueImpl.java   
@Override
public SendMessageBatchResult sendMessages(
        List<SendMessageBatchRequestEntry> entries) {

    return sendMessages(entries,
            (ResultCapture<SendMessageBatchResult>)null);
}
项目:aws-sdk-java-resources    文件:QueueImpl.java   
@Override
public SendMessageBatchResult sendMessages(
        List<SendMessageBatchRequestEntry> entries,
        ResultCapture<SendMessageBatchResult> extractor) {

    SendMessageBatchRequest request = new SendMessageBatchRequest()
        .withEntries(entries);
    return sendMessages(request, extractor);
}
项目:aws-sdk-java-resources    文件:SQSResourcesIntegrationTest.java   
/**
 * Tests sending messages using batch operation and retrieve them. Also
 * tests setting the queue attributes and retrieving them.
 */
@Test
@Ignore
public void testQueueSubResourceAndAttributes() throws InterruptedException {

    /**
     * Trying to get the message which is deleted. Here there is no service
     * call made, a new sub resource is created with the given handle. So,
     * this wont be returning null.
     */
    Message message = queue.getMessage("invalid-recepient-handle");
    assertNotNull(message);
    try {
        message.getAttributes();
        fail("An unsupported operation exception must be thrown as load operation is no supported on message attribute");
    } catch (UnsupportedOperationException use) { }

    SendMessageBatchResult sendMessageBatchResult = queue
            .sendMessages(new SendMessageBatchRequest()
                    .withEntries(new SendMessageBatchRequestEntry("msg1",
                            TEST_MESSAGE)));
    SendMessageBatchResultEntry sendMessageBatchResultEntry = sendMessageBatchResult
            .getSuccessful().get(0);
    List<Message> messages = waitForMessagesFromQueue(null);

    assertNotNull(messages);
    assertEquals(1, messages.size());
    message = messages.get(0);
    assertMessage(TEST_MESSAGE, sendMessageBatchResultEntry.getMessageId(),
            sendMessageBatchResultEntry.getMD5OfMessageBody(), message);

    queue.setAttributes(ImmutableMapParameter.of("MaximumMessageSize",
            "2048"));

    assertTrue(queue.getAttributes().containsKey("MaximumMessageSize"));
}
项目:spring-cloud-aws    文件:MessageListenerContainerAwsTest.java   
@Override
public void run() {
    List<SendMessageBatchRequestEntry> messages = new ArrayList<>();
    for (int i = 0; i < BATCH_MESSAGE_SIZE; i++) {
        messages.add(new SendMessageBatchRequestEntry(Integer.toString(i), new StringBuilder().append("message_").append(i).toString()));
    }
    this.amazonSqs.sendMessageBatch(new SendMessageBatchRequest(this.queueUrl, messages));
    this.countDownLatch.countDown();
}
项目:izettle-toolbox    文件:QueueServiceSender.java   
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
    for (Collection<SendMessageBatchRequestEntry> batch : partition(messages, MAX_BATCH_SIZE)) {
        final SendMessageBatchResult sendMessageBatchResult =
            amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl, new ArrayList<>(batch)));
        final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
        if (!failed.isEmpty()) {
            try {
                Set<String> failedMessageIds =
                    failed.stream().map(BatchResultErrorEntry::getId).collect(Collectors.toSet());
                final Map<String, SendMessageBatchRequestEntry> failedMessageIdToMessage =
                    batch.stream().filter(failedMessageIds::contains).collect(Collectors.toMap(
                        SendMessageBatchRequestEntry::getId,
                        Function.identity()
                    ));
                failed.stream().forEach(failMessage -> {
                    final SendMessageBatchRequestEntry failedEntry =
                        failedMessageIdToMessage.get(failMessage.getId());
                    if (failedEntry != null) {
                        final String messageBody = failedEntry.getMessageBody();
                        LOG.error(
                            "Failed to send message, due to {}, message content : {} ",
                            failMessage,
                            messageBody
                        );
                    }
                });
            } catch (Exception e) {
                LOG.error("Failed to log failed to send messages", e);
            }
        }
    }
}
项目:izettle-toolbox    文件:QueueServiceSenderTest.java   
@Test
public void postBatchShouldSendMessagesWithSNSEnvelope() throws Exception {
    // Arrange
    when(mockAmazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mock(SendMessageBatchResult.class));
    ArgumentCaptor<SendMessageBatchRequest> captor = ArgumentCaptor.forClass(SendMessageBatchRequest.class);

    // Act
    messagePublisher.postBatch(
        Arrays.asList(
            new TestMessage("Hello"), new TestMessage("world")
        ), "subject"
    );

    // Assert
    verify(mockAmazonSQS).sendMessageBatch(captor.capture());

    SendMessageBatchRequest sendMessageBatchRequest = captor.getValue();
    assertThat(sendMessageBatchRequest.getQueueUrl()).isEqualTo("queueUrl");

    List<SendMessageBatchRequestEntry> entries = sendMessageBatchRequest.getEntries();
    assertThat(entries.size()).isEqualTo(2);

    ObjectMapper mapper = new ObjectMapper();
    AmazonSNSMessage msg1 = mapper.readValue(entries.get(0).getMessageBody(), AmazonSNSMessage.class);
    assertThat(msg1.getSubject()).isEqualTo("subject");
    assertThat(msg1.getMessage()).isEqualTo("{\"message\":\"Hello\"}");

    AmazonSNSMessage msg2 = mapper.readValue(entries.get(1).getMessageBody(), AmazonSNSMessage.class);
    assertThat(msg2.getSubject()).isEqualTo("subject");
    assertThat(msg2.getMessage()).isEqualTo("{\"message\":\"world\"}");
}
项目:awslocal    文件:DirectorySQS.java   
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false);
    //lists for reporting
    List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>();
    List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>();
    //attempt to change the visibility on each
    for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) {
        try {
            final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(), 0);//0 is amazon spec default
            Message sentMessage = queue.send(batchRequestEntry.getMessageBody(), invisibilityDelay);
            batchResultEntries.add(new SendMessageBatchResultEntry().
                    withId(batchRequestEntry.getId()).
                    withMessageId(sentMessage.getMessageId()).
                    withMD5OfMessageBody(sentMessage.getMD5OfBody()));
        } catch (IOException e) {
            BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry().
                    withSenderFault(false).
                    withId(batchRequestEntry.getId()).
                    withMessage(e.getMessage());
            batchResultErrorEntries.add(batchResultErrorEntry);
        }
    }
    return new SendMessageBatchResult().
            withFailed(batchResultErrorEntries).
            withSuccessful(batchResultEntries);
}
项目:unitstack    文件:MockSqsTest.java   
@Test
public void testBulkSendDelete_shouldWork() {
  // create queue
  CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue"));
  // send batch
  SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one")
      .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}");
  SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two")
      .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}");
  SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three")
      .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}");
  // verify send batch result
  SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl())
      .withEntries(ImmutableList.of(firstRequest,secondRequest, thirdRequest)));
  assertNotNull("verify that batch send returned ok", sendResult);
  assertTrue("no request failed",sendResult.getFailed().isEmpty());
  assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size());
  SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get();
  assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody());
  assertNotNull("verify message id exists",firstResultEntry.getMessageId());

  ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4));
  // delete batch
  List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>();
  deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
  deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle()));
  deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); 

  DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests));

  // verify delete batch result
  assertNotNull("verify that batch delete returned ok", deleteBatchResult);
  assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty());
  assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size());
  assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty());
  for(Message message : receivedMessagesResult.getMessages()) {
    assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty());
  }

  // cleanup
  getQueues().remove("tea-earl-grey-queue");
}
项目:reactive-sqs-client    文件:ReactiveSqsClient.java   
public Observable<SendMessageBatchResult> sendMessageBatchAsync(String queueUrl, List<SendMessageBatchRequestEntry> entries) {
    return Observable.from(sqsClient.sendMessageBatchAsync(queueUrl, entries));
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClient.java   
private boolean isLarge(SendMessageBatchRequestEntry batchEntry) {
    int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes());
    long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody());
    long totalMsgSize = msgAttributesSize + msgBodySize;
    return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
}
项目:amazon-cloudengine    文件:SQSService.java   
public void batchSend(List<SendMessageBatchRequestEntry> entries){

       try {        
        // Send batch messages
           //System.out.println("\nSending a message to jobQueue.\n");

           SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl);         
        batchRequest.setEntries(entries);

        SendMessageBatchResult batchResult = sqs.sendMessageBatch(batchRequest);

        // sendMessageBatch can return successfully, and yet individual batch
        // items fail. So, make sure to retry the failed ones.
        if (!batchResult.getFailed().isEmpty()) {
            //System.out.println("Retry sending failed messages...");

            List<SendMessageBatchRequestEntry> failedEntries = new ArrayList<SendMessageBatchRequestEntry>();
            Iterator<SendMessageBatchRequestEntry> iter = entries.iterator();

            while(iter.hasNext()){
                if(batchResult.getFailed().contains(iter.next())){
                    failedEntries.add((SendMessageBatchRequestEntry) iter.next());
                }
            }   

            batchRequest.setEntries(failedEntries);
            sqs.sendMessageBatch(batchRequest);
        }

       } catch (AmazonServiceException ase) {
           System.out.println("Caught an AmazonServiceException, which means your request made it " +
                   "to Amazon SQS, but was rejected with an error response for some reason.");
           System.out.println("Error Message:    " + ase.getMessage());
           System.out.println("HTTP Status Code: " + ase.getStatusCode());
           System.out.println("AWS Error Code:   " + ase.getErrorCode());
           System.out.println("Error Type:       " + ase.getErrorType());
           System.out.println("Request ID:       " + ase.getRequestId());
       } catch (AmazonClientException ace) {
           System.out.println("Caught an AmazonClientException, which means the client encountered " +
                   "a serious internal problem while trying to communicate with SQS, such as not " +
                   "being able to access the network.");
           System.out.println("Error Message: " + ace.getMessage());
       }

   }
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClientBase.java   
/**
 * <p>
 * Delivers up to ten messages to the specified queue. This is a batch
 * version of SendMessage. The result of the send action on each message is
 * reported individually in the response. The maximum allowed individual
 * message size is 256 KB (262,144 bytes).
 * </p>
 * <p>
 * The maximum total payload size (i.e., the sum of all a batch's individual
 * message lengths) is also 256 KB (262,144 bytes).
 * </p>
 * <p>
 * If the <code>DelaySeconds</code> parameter is not specified for an entry,
 * the default for the queue is used.
 * </p>
 * <p>
 * <b>IMPORTANT:</b>The following list shows the characters (in Unicode)
 * that are allowed in your message, according to the W3C XML specification.
 * For more information, go to http://www.faqs.org/rfcs/rfc1321.html. If you
 * send any characters that are not included in the list, your request will
 * be rejected. #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] |
 * [#x10000 to #x10FFFF]
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b>Some API actions take lists of parameters. These lists are
 * specified using the param.n notation. Values of n are integers starting
 * from 1. For example, a parameter list with two elements looks like this:
 * </p>
 * <p>
 * <code>&Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&Attribute.2=that</code>
 * </p>
 * 
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of <a>SendMessageBatchRequestEntry</a> items.
 * 
 * @return The response from the SendMessageBatch service method, as
 *         returned by AmazonSQS.
 * 
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws BatchRequestTooLongException
 * @throws UnsupportedOperationException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries)
        throws AmazonServiceException, AmazonClientException {

    return amazonSqsToBeExtended.sendMessageBatch(queueUrl, entries);
}
项目:amazon-sqs-java-extended-client-lib    文件:AmazonSQSExtendedClient.java   
/**
 * <p>
 * Delivers up to ten messages to the specified queue. This is a batch
 * version of SendMessage. The result of the send action on each message is
 * reported individually in the response. Uploads message payloads to Amazon
 * S3 when necessary.
 * </p>
 * <p>
 * If the <code>DelaySeconds</code> parameter is not specified for an entry,
 * the default for the queue is used.
 * </p>
 * <p>
 * <b>IMPORTANT:</b>The following list shows the characters (in Unicode)
 * that are allowed in your message, according to the W3C XML specification.
 * For more information, go to http://www.faqs.org/rfcs/rfc1321.html. If you
 * send any characters that are not included in the list, your request will
 * be rejected. #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] |
 * [#x10000 to #x10FFFF]
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b>Some API actions take lists of parameters. These lists are
 * specified using the param.n notation. Values of n are integers starting
 * from 1. For example, a parameter list with two elements looks like this:
 * </p>
 * <p>
 * <code>&Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of <a>SendMessageBatchRequestEntry</a> items.
 *
 * @return The response from the SendMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws BatchRequestTooLongException
 * @throws UnsupportedOperationException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries) {
    SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(queueUrl, entries);
    return sendMessageBatch(sendMessageBatchRequest);
}
项目:micro-genie    文件:SqsProducer.java   
/**
 * Submit a batch of messages to SQS. The messages can be destined for different queues. This
 * method will categories the messages in batches according to what queue they around bound for. After
 * categorization of the messages into batches, each batch will be sent serially.
 * 
 * failures will be written to the error log.
 * 
 * @param messages - The batches of messages which can be destined for one or more queues
 */
@Override
public void submitBatch(final List<Message> messages) {
    final Map<String, List<SendMessageBatchRequestEntry>> messageBatches = this.createBatchesForQueues(messages);
    this.submitBatches(messageBatches);
}
项目:aws-sdk-java-resources    文件:Queue.java   
/**
 * The convenient method form for the <code>SendMessages</code> action.
 *
 * @see #sendMessages(SendMessageBatchRequest)
 */
SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry>
        entries);
项目:aws-sdk-java-resources    文件:Queue.java   
/**
 * The convenient method form for the <code>SendMessages</code> action.
 *
 * @see #sendMessages(SendMessageBatchRequest, ResultCapture)
 */
SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry>
        entries, ResultCapture<SendMessageBatchResult> extractor);