Java 类javax.jms.MessageConsumer 实例源码

项目:tangyuan2    文件:ActiveMqReceiver.java   
private void startSyncReceiveThread(final MessageConsumer messageConsumer, final long receiveTimeout) {
    syncReceiveThread = new SyncReceiveThread() {
        @Override
        public void run() {
            log.info("start listen to the " + typeStr + "[" + queue.getName() + "].");
            while (running) {
                try {
                    Message message = messageConsumer.receive(receiveTimeout);
                    processMessage(message);
                } catch (Throwable e) {
                    // 如果是关闭的时候,可能会报InterruptedException
                    log.error("listen to the [" + queue.getName() + "] error.", e);
                }
            }
            closed = true;
        }
    };
    syncReceiveThread.start();
}
项目:xtf    文件:JmsClient.java   
public MessageConsumer createTopicConsumer(String selector) throws JMSException {
    if (isQueue) {
        throw new IllegalArgumentException("Only for topic, not queue");
    }
    String consumerId = "consumer-" + UUID.randomUUID();
    topicConnection = startConnection(consumerId);
    Session session = topicConnection.createSession(isTransacted, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(destinationName);
    if (isDurable) {
        if (selector != null) {
            return session.createDurableSubscriber(topic, consumerId, selector, true);
        } else {
            return session.createDurableSubscriber(topic, consumerId);
        }
    } else {
        if (selector != null) {
            return session.createConsumer(topic, selector);
        } else {
            return session.createConsumer(topic);
        }
    }
}
项目:jaffa-framework    文件:JmsBrowser.java   
/** Deletes the input Message from the given Queue, by creating a temporary consumer for that Message
 * @param session the JMS Session.
 * @param message the JMS Message.
 * @param queueName the Queue to consume from.
 * @throws FrameworkException in case any internal error occurs.
 * @throws ApplicationExceptions Indicates application error(s).
 */
static void consumeMessage(Session session, Message message, String queueName) throws FrameworkException, ApplicationExceptions {
    try {
        // Creates a consumer on the session for the given queueName, and specifying a selector having HEADER_JMS_MESSAGE_ID as the given messageId
        String selector = new StringBuilder(HEADER_JMS_MESSAGE_ID)
        .append("='")
        .append(message.getJMSMessageID())
        .append('\'')
        .toString();
        MessageConsumer consumer = session.createConsumer(JmsClientHelper.obtainQueue(queueName), selector);

        // Consume the message. Wait for 10 seconds at most
        Message m = consumer.receive(10000);
        if (m == null)
            throw new ApplicationExceptions(new JaffaMessagingApplicationException(JaffaMessagingApplicationException.MESSAGE_NOT_FOUND));
        consumer.close();
    } catch (JMSException e) {
        log.error("Error in consuming a JMS Message", e);
        throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.DELETE_ERROR, null, e);
    }
}
项目:pooled-jms    文件:JmsPoolMessageConusmerTest.java   
@Test
public void testReceive() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
    Session session = connection.createSession();
    Queue queue = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(queue, "Color = Red");

    assertNull(consumer.receive());

    consumer.close();

    try {
        consumer.receive();
        fail("Should not be able to interact with closed consumer");
    } catch (IllegalStateException ise) {}
}
项目:pooled-jms    文件:JmsPoolMessageConusmerTest.java   
@Test
public void testReceiveTimed() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
    Session session = connection.createSession();
    Queue queue = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(queue, "Color = Red");

    assertNull(consumer.receive(1));

    consumer.close();

    try {
        consumer.receive(1);
        fail("Should not be able to interact with closed consumer");
    } catch (IllegalStateException ise) {}
}
项目:pooled-jms    文件:PooledConnectionTempQueueTest.java   
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException {
    Connection connection = cf.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryQueue tempQueue = session.createTemporaryQueue();
    TextMessage msg = session.createTextMessage("Request");
    msg.setJMSReplyTo(tempQueue);
    MessageProducer producer = session.createProducer(session.createQueue(serviceQueue));
    producer.send(msg);

    MessageConsumer consumer = session.createConsumer(tempQueue);
    Message replyMsg = consumer.receive();
    assertNotNull(replyMsg);

    LOG.debug("Reply message: {}", replyMsg);

    consumer.close();

    producer.close();
    session.close();
    connection.close();
}
项目:pooled-jms    文件:PooledConnectionTempQueueTest.java   
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
    final javax.jms.Message inMessage = consumer.receive();

    String requestMessageId = inMessage.getJMSMessageID();
    LOG.debug("Received message " + requestMessageId);
    final TextMessage replyMessage = session.createTextMessage("Result");
    replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID());
    final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo());
    LOG.debug("Sending reply to " + inMessage.getJMSReplyTo());
    producer.send(replyMessage);

    producer.close();
    consumer.close();
    session.close();
    connection.close();
}
项目:solace-integration-guides    文件:JMSConnectionFactoryProviderTest.java   
/**
 * This test simply validates that {@link ConnectionFactory} can be setup by
 * pointing to the location of the client libraries at runtime. It uses
 * ActiveMQ which is not present at the POM but instead pulled from Maven
 * repo using {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which
 * implies that for this test to run the computer must be connected to the
 * Internet. If computer is not connected to the Internet, this test will
 * quietly fail logging a message.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String libPath = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
        runner.addControllerService("cfProvider", cfProvider);
        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath);
        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        runner.enableControllerService(cfProvider);
        runner.assertValid(cfProvider);

        Connection connection = cfProvider.getConnectionFactory().createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("myqueue");
        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);

        TextMessage message = session.createTextMessage("Hello");
        producer.send(message);
        assertEquals("Hello", ((TextMessage) consumer.receive()).getText());
        connection.stop();
        connection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:axon-jms    文件:JmsMessageSource.java   
/**
 * JMS Consumer and Converter to convert retrieve and convert Message.
 * @param consumer The consumer
 * @param converter The converter
 */
public JmsMessageSource(MessageConsumer consumer, JmsMessageConverter converter) {
  try {
    this.converter = converter;
    consumer.setMessageListener(this);
  } catch (JMSException ex) {
    throw new RuntimeException(ex);
  }
}
项目:pooled-jms    文件:MockJMSQueueSession.java   
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
    if (destination instanceof Topic) {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }
    return super.createConsumer(destination, messageSelector);
}
项目:lemon    文件:ProxyConnectionFactory.java   
public MessageConsumer createConsumer(Destination destination,
        ProxySession session) throws JMSException {
    String destinationName = destination.toString();
    ProxyMessageConsumer messageConsumer = new ProxyMessageConsumer(session);
    messageConsumer.setDestination(destination);

    if (destination instanceof Topic) {
        this.messageHandler.registerToTopic(destinationName,
                messageConsumer.getId());
    }

    return messageConsumer;
}
项目:pooled-jms    文件:MockJMSTopicSession.java   
/**
 * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String)
 */
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
    if (destination instanceof Queue) {
        throw new IllegalStateException("Operation not supported by a TopicSession");
    }
    return super.createConsumer(destination, messageSelector, noLocal);
}
项目:pooled-jms    文件:JmsPoolMessageConusmerTest.java   
@Test
public void testToString() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
    Session session = connection.createSession();
    Queue queue = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(queue);

    assertNotNull(consumer.toString());
}
项目:pooled-jms    文件:JmsPoolMessageConusmerTest.java   
@Test
public void testCloseMoreThanOnce() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
    Session session = connection.createSession();
    Queue queue = session.createTemporaryQueue();
    MessageConsumer consumer = session.createConsumer(queue);

    consumer.close();
    consumer.close();
}
项目:flume-release-1.7.0    文件:JMSMessageConsumerTestBase.java   
@Before
public void setup() throws Exception {
  beforeSetup();
  connectionFactory = mock(ConnectionFactory.class);
  connection = mock(Connection.class);
  session = mock(Session.class);
  queue = mock(Queue.class);
  topic = mock(Topic.class);
  messageConsumer = mock(MessageConsumer.class);
  message = mock(TextMessage.class);
  when(message.getPropertyNames()).thenReturn(new Enumeration<Object>() {
    @Override
    public boolean hasMoreElements() {
      return false;
    }
    @Override
    public Object nextElement() {
      throw new UnsupportedOperationException();
    }
  });
  when(message.getText()).thenReturn(TEXT);
  when(connectionFactory.createConnection(USERNAME, PASSWORD)).thenReturn(connection);
  when(connection.createSession(true, Session.SESSION_TRANSACTED)).thenReturn(session);
  when(session.createQueue(destinationName)).thenReturn(queue);
  when(session.createConsumer(any(Destination.class), anyString())).thenReturn(messageConsumer);
  when(messageConsumer.receiveNoWait()).thenReturn(message);
  when(messageConsumer.receive(anyLong())).thenReturn(message);
  destinationName = DESTINATION_NAME;
  destinationType = JMSDestinationType.QUEUE;
  destinationLocator = JMSDestinationLocator.CDI;
  messageSelector = SELECTOR;
  batchSize = 10;
  pollTimeout = 500L;
  context = new Context();
  converter = new DefaultJMSMessageConverter.Builder().build(context);
  event = converter.convert(message).iterator().next();
  userName = Optional.of(USERNAME);
  password = Optional.of(PASSWORD);
  afterSetup();
}
项目:pooled-jms    文件:PooledConnectionSessionCleanupTest.java   
@Test(timeout = 60000)
public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws Exception {

    produceMessages();

    Session pooledSession1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
    pooledSession1.createConsumer(queue);

    final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());

    assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() {

        @Override
        public boolean isSatisfied() throws Exception {
            return view.getInFlightCount() == MESSAGE_COUNT;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25)));

    // While all the message are in flight we should get anything on this consumer.
    Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(queue);
    assertNull(consumer.receive(500));

    pooledConn1.close();

    assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() {

        @Override
        public boolean isSatisfied() throws Exception {
            return view.getSubscriptions().length == 1;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25)));

    // Now we'd expect that the message stuck in the prefetch of the pooled session's
    // consumer would be rerouted to the non-pooled session's consumer.
    assertNotNull(consumer.receive(10000));
}
项目:pooled-jms    文件:PooledConnectionSessionCleanupTest.java   
@Test(timeout = 60000)
public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages() throws Exception {

    produceMessages();

    Session directSession = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
    directSession.createConsumer(queue);

    final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());

    assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() {

        @Override
        public boolean isSatisfied() throws Exception {
            return view.getInFlightCount() == MESSAGE_COUNT;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25)));

    // While all the message are in flight we should get anything on this consumer.
    Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(queue);
    assertNull(consumer.receive(500));

    directConn2.close();

    assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() {

        @Override
        public boolean isSatisfied() throws Exception {
            return view.getSubscriptions().length == 1;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(25)));

    // Now we'd expect that the message stuck in the prefetch of the first session's
    // consumer would be rerouted to the alternate session's consumer.
    assertNotNull(consumer.receive(10000));
}
项目:nifi-jms-jndi    文件:JMSConnectionFactoryProviderTest.java   
/**
 * This test simply validates that {@link ConnectionFactory} can be setup by
 * pointing to the location of the client libraries at runtime. It uses
 * ActiveMQ which is not present at the POM but instead pulled from Maven
 * repo using {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which
 * implies that for this test to run the computer must be connected to the
 * Internet. If computer is not connected to the Internet, this test will
 * quietly fail logging a message.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String libPath = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
        runner.addControllerService("cfProvider", cfProvider);
        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath);
        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        runner.enableControllerService(cfProvider);
        runner.assertValid(cfProvider);

        Connection connection = cfProvider.getConnectionFactory().createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("myqueue");
        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);

        TextMessage message = session.createTextMessage("Hello");
        producer.send(message);
        assertEquals("Hello", ((TextMessage) consumer.receive()).getText());
        connection.stop();
        connection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:nifi-jms-jndi    文件:JNDIConnectionFactoryProviderTest.java   
/**
 * This test simply validates that {@link ConnectionFactory} can be setup by pointing to the location of the client
 * libraries at runtime. It uses ActiveMQ which is not present at the POM but instead pulled from Maven repo using
 * {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which implies that for this test to run the computer must
 * be connected to the Internet. If computer is not connected to the Internet, this test will quietly fail logging a
 * message.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String libPath = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
        JNDIConnectionFactoryProvider cfProvider = new JNDIConnectionFactoryProvider();
        runner.addControllerService("cfProvider", cfProvider);
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory");
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath);
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        runner.enableControllerService(cfProvider);
        runner.assertValid(cfProvider);

        Connection connection = cfProvider.getConnectionFactory().createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("myqueue");
        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);

        TextMessage message = session.createTextMessage("Hello");
        producer.send(message);
        assertEquals("Hello", ((TextMessage) consumer.receive()).getText());
        connection.stop();
        connection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:testee.fi    文件:TestEEfiSession.java   
@Override
public MessageConsumer createConsumer(
        final Destination destination,
        final String messageSelector
) throws JMSException {
    return notImplemented();
}
项目:testee.fi    文件:TestEEfiSession.java   
@Override
public MessageConsumer createConsumer(
        final Destination destination,
        final String messageSelector,
        final boolean noLocal
) throws JMSException {
    return notImplemented();
}
项目:testee.fi    文件:TestEEfiSession.java   
@Override
public MessageConsumer createSharedConsumer(
        final Topic topic,
        final String sharedSubscriptionName,
        final String messageSelector
) throws JMSException {
    return notImplemented();
}
项目:testee.fi    文件:TestEEfiSession.java   
@Override
public MessageConsumer createDurableConsumer(
        final Topic topic,
        final String name,
        final String messageSelector,
        final boolean noLocal
) throws JMSException {
    return notImplemented();
}
项目:SpringTutorial    文件:HelloWorldConsumer.java   
public static void main(String[] args) {

        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://Toshiba:61616");

            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();


            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("HELLOWORLD.TESTQ");

            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);

            // Wait for a message
            Message message = consumer.receive(1000);

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }

            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
项目:SpringTutorial    文件:HelloWorldConsumerThread.java   
public void run() {
    try {

        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://Toshiba:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        connection.setExceptionListener(this);

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("HELLOWORLD.TESTQ");

        // Create a MessageConsumer from the Session to the Topic or Queue
        MessageConsumer consumer = session.createConsumer(destination);

        // Wait for a message
        Message message = consumer.receive(1000);

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Received: " + text);
        } else {
            System.out.println("Received: " + message);
        }

        consumer.close();
        session.close();
        connection.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}
项目:aem-orchestrator    文件:AwsConfig.java   
@Bean
public MessageConsumer sqsMessageConsumer(final SQSConnection connection) throws JMSException {

    /*
     * Create the session and use UNORDERED_ACKNOWLEDGE mode. Acknowledging
     * messages deletes them from the queue. Each message must be individually
     * acknowledged
     */
    Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

    return session.createConsumer(session.createQueue(queueName));
}
项目:aem-stack-manager    文件:AwsConfig.java   
@Bean
public MessageConsumer sqsMessageConsumer(SQSConnection connection) throws JMSException {

    /*
     * Create the session and use CLIENT_ACKNOWLEDGE mode. Acknowledging
     * messages deletes them from the queue
     */
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    return session.createConsumer(session.createQueue(queueName));
}
项目:message-broker    文件:SslTransportTest.java   
@Parameters({ "broker-ssl-port"})
@Test
public void testConsumerProducerWithSsl(String port) throws Exception {
    String queueName = "testConsumerProducerWithAutoAck";
    InitialContext initialContextForQueue = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .enableSsl()
            .withQueue(queueName)
            .build();

    ConnectionFactory connectionFactory
            = (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
    Connection connection = connectionFactory.createConnection();
    connection.start();

    // publish 100 messages
    Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = producerSession.createQueue(queueName);
    MessageProducer producer = producerSession.createProducer(queue);

    int numberOfMessages = 100;
    for (int i = 0; i < numberOfMessages; i++) {
        producer.send(producerSession.createTextMessage("Test message " + i));
    }
    producerSession.close();

    // Consume published messages
    Session subscriberSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination subscriberDestination = (Destination) initialContextForQueue.lookup(queueName);
    MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        Message message = consumer.receive(1000);
        Assert.assertNotNull(message, "Message #" + i + " was not received");
    }

    connection.close();
}
项目:java-jms    文件:TracingActiveMQTest.java   
@Test
public void sendAndReceive() throws Exception {
  Destination destination = session.createQueue("TEST.FOO");

  MessageProducer messageProducer = session.createProducer(destination);
  messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

  // Instrument MessageProducer with TracingMessageProducer
  TracingMessageProducer producer =
      new TracingMessageProducer(messageProducer, mockTracer);

  MessageConsumer messageConsumer = session.createConsumer(destination);

  // Instrument MessageConsumer with TracingMessageConsumer
  TracingMessageConsumer consumer = new TracingMessageConsumer(messageConsumer, mockTracer);

  TextMessage message = session.createTextMessage("Hello world");

  producer.send(message);

  TextMessage received = (TextMessage) consumer.receive(5000);
  assertEquals("Hello world", received.getText());

  List<MockSpan> mockSpans = mockTracer.finishedSpans();
  assertEquals(2, mockSpans.size());

  checkSpans(mockSpans);
  assertNull(mockTracer.activeSpan());
}
项目:JavaSamples    文件:MessageListenerContainer.java   
@Override
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException {
    //消费MQ限速
    if (!rateLimiter.tryAcquire()) {
        return false;
    }

    return super.receiveAndExecute(invoker, session, consumer);
}
项目:ats-framework    文件:JmsClient.java   
private void doCleanupQueue( final Session session, final Destination destination ) throws JMSException {

        try {
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = null;
            do {
                message = consumer.receiveNoWait();
                if (message != null) {
                    message.acknowledge();
                }
            } while (message != null);
        } finally {
            releaseSession(false);
        }
    }
项目:java-jms    文件:TracingJmsTemplate.java   
@Override
protected MessageConsumer createConsumer(Session session, Destination destination,
    String messageSelector)
    throws JMSException {
  return new TracingMessageConsumer(super.createConsumer(session, destination, messageSelector),
      tracer);
}
项目:solace-integration-guides    文件:JNDIConnectionFactoryProviderTest.java   
/**
 * This test simply validates that {@link ConnectionFactory} can be setup by pointing to the location of the client
 * libraries at runtime. It uses ActiveMQ which is not present at the POM but instead pulled from Maven repo using
 * {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which implies that for this test to run the computer must
 * be connected to the Internet. If computer is not connected to the Internet, this test will quietly fail logging a
 * message.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        String libPath = TestUtils.setupActiveMqLibForTesting(true);

        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
        JNDIConnectionFactoryProvider cfProvider = new JNDIConnectionFactoryProvider();
        runner.addControllerService("cfProvider", cfProvider);
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory");
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath);
        runner.setProperty(cfProvider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        runner.enableControllerService(cfProvider);
        runner.assertValid(cfProvider);

        Connection connection = cfProvider.getConnectionFactory().createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("myqueue");
        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);

        TextMessage message = session.createTextMessage("Hello");
        producer.send(message);
        assertEquals("Hello", ((TextMessage) consumer.receive()).getText());
        connection.stop();
        connection.close();
    } catch (Exception e) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e);
    }
}
项目:jmsclient    文件:JMSTopicClient.java   
@Override
public MessageConsumer createSubscriber() throws JMSException
{
    TopicSubscriber recv = ((TopicSession) session).createSubscriber((Topic) topic, messageSelector, true);
    log.debug("Created non-durable subscriber");
    return recv;
}
项目:servicebuilder    文件:ActiveMqListener.java   
@Override
public void requeueFailedMessages() {
    try {
        ActiveMQConnection connection = ActiveMqUtils.openConnection(user, password, url);
        Session session = ActiveMqUtils.startSession(connection);

        int count = getQueueSize(session, queueError);

        if (count < 1) {
            return;
        }

        log.info("Requeuing {} failed messages...", count);

        Queue queueErr = session.createQueue(queueError);
        MessageConsumer consumer = session.createConsumer(queueErr);

        Queue queueRetry = session.createQueue(queueInput);
        MessageProducer producer = session.createProducer(queueRetry);

        for (int consumed = 0; consumed < count; consumed++) {
            TextMessage message = (TextMessage) consumer.receive(REQUEUE_TIMEOUT);

            if (message == null) {
                continue;
            }

            String text = message.getText();
            String requestId = message.getJMSCorrelationID();

            log.info("Requeuing message '{}'", text);

            try {
                TextMessage newMessage = session.createTextMessage(text);
                newMessage.setJMSCorrelationID(requestId);

                producer.send(newMessage);
            } catch (Exception e) {
                log.error("Failed to requeue message", e);
            }

            message.acknowledge();
            session.commit();
        }

        producer.close();
        consumer.close();
    } catch (JMSException ex) {
        throw new MessageQueueException("Failed to requeue failed messages", ex);
    }
}
项目:pooled-jms    文件:JmsPoolSession.java   
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
    return addConsumer(getInternalSession().createConsumer(destination));
}
项目:pooled-jms    文件:JmsPoolSession.java   
@Override
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
    return addConsumer(getInternalSession().createConsumer(destination, selector));
}
项目:pooled-jms    文件:JmsPoolSession.java   
@Override
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
    return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
}
项目:pooled-jms    文件:JmsPoolSession.java   
@Override
public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException {
    PooledSessionHolder state = safeGetSessionHolder();
    state.getConnection().checkClientJMSVersionSupport(2, 0);
    return addConsumer(state.getSession().createSharedConsumer(topic, sharedSubscriptionName, messageSelector));
}
项目:pooled-jms    文件:JmsPoolSession.java   
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
    PooledSessionHolder state = safeGetSessionHolder();
    state.getConnection().checkClientJMSVersionSupport(2, 0);
    return addConsumer(state.getSession().createDurableConsumer(topic, name));
}