Java 类com.amazonaws.services.sqs.model.BatchResultErrorEntry 实例源码

项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSender.java   
private String buildErrorMessage(List<SendMessageBatchRequestEntry> batchRequestEntries,
    List<BatchResultErrorEntry> errors) {
    StringBuilder errorMessage = new StringBuilder();
    int count = 0;
    for (BatchResultErrorEntry error : errors) {
        if (count > 0) {
            errorMessage.append(",");
        }
        SendMessageBatchRequestEntry failedRequestEventEntry =
            findRequestEventEntryById(batchRequestEntries, error.getId());
        String messageBody = failedRequestEventEntry == null ? null : failedRequestEventEntry.getMessageBody();
        errorMessage.append("[" + error.toString() + ",{messageBody:" + "\"" + messageBody + "\"}]");
        count++;
    }
    return errorMessage.toString();
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSenderTest.java   
private SendMessageBatchResult mockBatchResult(int batchSize, int expectedSuccessCount) {
    SendMessageBatchResult mockResult = Mockito.mock(SendMessageBatchResult.class);

    List<SendMessageBatchResultEntry> successfulEntries = new ArrayList<SendMessageBatchResultEntry>();
    for (int i = 0; i < expectedSuccessCount; i++) {
        successfulEntries.add(new SendMessageBatchResultEntry().withId(String.valueOf(i + 1)));
    }
    when(mockResult.getSuccessful()).thenReturn(successfulEntries);
    List<BatchResultErrorEntry> failedEntries = new ArrayList<BatchResultErrorEntry>();
    for (int i = expectedSuccessCount; i < batchSize; i++) {
        failedEntries.add(
            new BatchResultErrorEntry().withId(String.valueOf(i + 1)).withCode("401").withSenderFault(true)
                .withMessage("Invalid binary character"));
    }
    when(mockResult.getFailed()).thenReturn(failedEntries);
    return mockResult;
}
项目:awslocal    文件:DirectorySQS.java   
@Override
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false);
    //lists for reporting
    List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>();
    List<DeleteMessageBatchResultEntry> batchResultEntries = new ArrayList<>();
    //attempt delete on each
    for (DeleteMessageBatchRequestEntry batchRequestEntry : deleteMessageBatchRequest.getEntries()) {
        try {
            queue.delete(batchRequestEntry.getReceiptHandle());
            batchResultEntries.add(new DeleteMessageBatchResultEntry().withId(batchRequestEntry.getId()));
        } catch (IOException e) {
            BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry().
                    withSenderFault(true).
                    withId(batchRequestEntry.getId()).
                    withMessage(e.getMessage());
            batchResultErrorEntries.add(batchResultErrorEntry);
        }
    }
    return new DeleteMessageBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries);
}
项目:awslocal    文件:DirectorySQS.java   
@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(changeMessageVisibilityBatchRequest.getQueueUrl(), false);
    //lists for reporting
    List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>();
    List<ChangeMessageVisibilityBatchResultEntry> batchResultEntries = new ArrayList<>();
    //attempt to change the visibility on each
    for (ChangeMessageVisibilityBatchRequestEntry batchRequestEntry : changeMessageVisibilityBatchRequest.getEntries()) {
        try {
            queue.changeVisibility(batchRequestEntry.getReceiptHandle(), batchRequestEntry.getVisibilityTimeout());
            batchResultEntries.add(new ChangeMessageVisibilityBatchResultEntry().withId(batchRequestEntry.getId()));
        } catch (Exception e) {
            BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry().
                    withSenderFault(true).
                    withId(batchRequestEntry.getId()).
                    withMessage(e.getMessage());
            batchResultErrorEntries.add(batchResultErrorEntry);
        }
    }
    return new ChangeMessageVisibilityBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries);
}
项目:flume-ng-aws-sqs-sink    文件:BatchSQSMsgSender.java   
/**
 * Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail
 * and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS.
 * Currently, this method does just logs errors and skips the messages in case some messages from the batched failed
 * to be delivered but some succeeded (i.e., partial batch failure).
 * <p>
 * TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure
 *
 * @param batchRequest The SQS SendMessageBatchRequest
 * @param batchResult The SQS SendMessageBatchResult
 *
 * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS
 */
protected void handleResult(SendMessageBatchRequest batchRequest, SendMessageBatchResult batchResult)
    throws EventDeliveryException {

    List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries();
    List<BatchResultErrorEntry> errors = batchResult.getFailed();

    int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size();
    int errorCount = errors == null ? 0 : errors.size();

    if (errorCount > 0) {
        String errorMessage = buildErrorMessage(batchRequestEntries, errors);

        if (attemptedCount == errorCount) {
            // if it was a non-empty batch and if all the messages in the batch have errors then fail the whole
            // batch and let flume rollback the transaction and retry it
            // Just throw the EventDeliveryException. This will eventually cause the channel's transaction to
            // rollback.
            throw new EventDeliveryException(errorMessage);
        }
        else {
            // TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure

            // Just log the error message and let flume drop failed messages in case of partial batch failures
            LOG.error(errorMessage);
        }
    }
}
项目:micro-genie    文件:SqsProducer.java   
/***
 * Logging Failures
 * @param failed 
 */
private void logFailures(final List<BatchResultErrorEntry> failed) {
    if(failed != null){
        for(BatchResultErrorEntry batchError :  failed){
            LOGGER.error("Failed to submit sqs batch message entry - Id: {} - Code: {} - Message: {}, isSenders fault: {}",  batchError.getId(), batchError.getCode(), batchError.getMessage(), batchError.getSenderFault());
        }           
    }
}
项目:izettle-toolbox    文件:QueueServiceSender.java   
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
    for (Collection<SendMessageBatchRequestEntry> batch : partition(messages, MAX_BATCH_SIZE)) {
        final SendMessageBatchResult sendMessageBatchResult =
            amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl, new ArrayList<>(batch)));
        final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
        if (!failed.isEmpty()) {
            try {
                Set<String> failedMessageIds =
                    failed.stream().map(BatchResultErrorEntry::getId).collect(Collectors.toSet());
                final Map<String, SendMessageBatchRequestEntry> failedMessageIdToMessage =
                    batch.stream().filter(failedMessageIds::contains).collect(Collectors.toMap(
                        SendMessageBatchRequestEntry::getId,
                        Function.identity()
                    ));
                failed.stream().forEach(failMessage -> {
                    final SendMessageBatchRequestEntry failedEntry =
                        failedMessageIdToMessage.get(failMessage.getId());
                    if (failedEntry != null) {
                        final String messageBody = failedEntry.getMessageBody();
                        LOG.error(
                            "Failed to send message, due to {}, message content : {} ",
                            failMessage,
                            messageBody
                        );
                    }
                });
            } catch (Exception e) {
                LOG.error("Failed to log failed to send messages", e);
            }
        }
    }
}
项目:awslocal    文件:DirectorySQS.java   
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false);
    //lists for reporting
    List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>();
    List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>();
    //attempt to change the visibility on each
    for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) {
        try {
            final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(), 0);//0 is amazon spec default
            Message sentMessage = queue.send(batchRequestEntry.getMessageBody(), invisibilityDelay);
            batchResultEntries.add(new SendMessageBatchResultEntry().
                    withId(batchRequestEntry.getId()).
                    withMessageId(sentMessage.getMessageId()).
                    withMD5OfMessageBody(sentMessage.getMD5OfBody()));
        } catch (IOException e) {
            BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry().
                    withSenderFault(false).
                    withId(batchRequestEntry.getId()).
                    withMessage(e.getMessage());
            batchResultErrorEntries.add(batchResultErrorEntry);
        }
    }
    return new SendMessageBatchResult().
            withFailed(batchResultErrorEntries).
            withSuccessful(batchResultEntries);
}