Java 类com.amazonaws.services.sqs.model.OverLimitException 实例源码

项目:spring-cloud-aws    文件:SimpleMessageListenerContainerTest.java   
@Test
public void stop_withContainerHavingMultipleQueuesRunning_shouldStopQueuesInParallel() throws Exception {
    // Arrange
    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);
    applicationContext.registerSingleton("anotherTestMessageListener", AnotherTestMessageListener.class);

    CountDownLatch testQueueCountdownLatch = new CountDownLatch(1);
    CountDownLatch anotherTestQueueCountdownLatch = new CountDownLatch(1);
    CountDownLatch spinningThreadsStarted = new CountDownLatch(2);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {

        @Override
        public void stopQueue(String logicalQueueName) {
            if ("testQueue".equals(logicalQueueName)) {
                testQueueCountdownLatch.countDown();
            } else if ("anotherTestQueue".equals(logicalQueueName)) {
                anotherTestQueueCountdownLatch.countDown();
            }

            super.stopQueue(logicalQueueName);
        }
    };

    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);
    container.setBackOffTime(100);
    container.setQueueStopTimeout(5000);

    QueueMessageHandler messageHandler = new QueueMessageHandler();
    messageHandler.setApplicationContext(applicationContext);
    container.setMessageHandler(messageHandler);

    mockGetQueueUrl(sqs, "testQueue", "http://testQueue.amazonaws.com");
    mockGetQueueUrl(sqs, "anotherTestQueue", "http://anotherTestQueue.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs, "http://testQueue.amazonaws.com");
    mockGetQueueAttributesWithEmptyResult(sqs, "http://anotherTestQueue.amazonaws.com");

    when(sqs.receiveMessage(new ReceiveMessageRequest("http://testQueue.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenAnswer((Answer<ReceiveMessageResult>) invocation -> {
                spinningThreadsStarted.countDown();
                testQueueCountdownLatch.await(1, TimeUnit.SECONDS);
                throw new OverLimitException("Boom");
            });

    when(sqs.receiveMessage(new ReceiveMessageRequest("http://anotherTestQueue.amazonaws.com").withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)))
            .thenAnswer((Answer<ReceiveMessageResult>) invocation -> {
                spinningThreadsStarted.countDown();
                anotherTestQueueCountdownLatch.await(1, TimeUnit.SECONDS);
                throw new OverLimitException("Boom");
            });

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();
    spinningThreadsStarted.await(1, TimeUnit.SECONDS);
    StopWatch stopWatch = new StopWatch();

    // Act
    stopWatch.start();
    container.stop();
    stopWatch.stop();

    // Assert
    assertTrue("Stop time must be shorter than stopping one queue after the other", stopWatch.getTotalTimeMillis() < 200);
}