ActiveMQ的selectors


【 JMS Selectors 】

JMS Selectors用于在 订阅 中,基于消息属性对消息进行过滤。

以下是一个Selectors的例子(Java代码):

consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");

在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,

例如: LIKE '12%3' ('123' true,'12993' true,'1234' false); LIKE 'l_se' ('lose' true,'loose' false); LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)。其中 % 匹配任意长度的字符序列,_ 匹配任意单个字符。

需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。

另外表达式中的属性不会自动进行类型转换,例如执行 myMessage.setStringProperty("NumberOfOrders", "2"); 之后,表达式 "NumberOfOrders > 1" 的求值结果是false(因为该属性是字符串而不是数值)。

关于JMS Selectors的详细文档请参考javax.jms.Message的javadoc。 上一小节介绍的Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,例如: 设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是"A"、"B"和"C"。然后令consumer A使用"JMSXGroupID = 'A'"作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:• producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。• 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。

【消息过滤实例】

【生产者】

package test2.activemq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Demo JMS producer that publishes sample messages to the {@code bestQueue} queue
 * of a local ActiveMQ broker.
 *
 * <p>Each map message carries two kinds of data: map <em>body</em> entries written with
 * {@code setString} ({@code name}, {@code age}) and message <em>properties</em> written with
 * {@code setIntProperty}/{@code setStringProperty} ({@code money}, {@code color}).
 * Only the properties can be referenced from a JMS message selector.
 *
 * <p>Call {@link #close()} when finished so the broker connection is released.
 */
public class Producer {

    /** Broker endpoint used by this demo. */
    private static final String BROKER_URL = "tcp://localhost:61616";
    /** Name of the queue every message is sent to. */
    private static final String QUEUE_NAME = "bestQueue";
    /** Time-to-live applied to every message: 10 minutes, in milliseconds. */
    private static final long TIME_TO_LIVE_MS = 1000 * 60 * 10L;

    // 1. Connection factory
    private ConnectionFactory connectionFactory;
    // 2. Connection
    private Connection connection;
    // 3. Session
    private Session session;
    // 4. Message producer
    private MessageProducer messageProducer;

    /**
     * Connects to the broker, starts the connection and creates a non-transacted,
     * auto-acknowledge session plus a producer with no default destination
     * (the destination is supplied on every {@code send} call).
     *
     * @throws IllegalStateException if the connection, session or producer cannot be
     *         created; the original {@link JMSException} is attached as the cause
     */
    public Producer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    BROKER_URL);
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
        } catch (JMSException e) {
            // Fail fast instead of leaving a half-initialised object whose send methods
            // would later die with a NullPointerException; release whatever was opened.
            releaseConnection();
            throw new IllegalStateException("Unable to set up JMS producer for " + BROKER_URL, e);
        }
    }

    /**
     * Returns the session created by the constructor.
     *
     * @return the JMS session used by this producer
     */
    public Session getSession() {
        return this.session;
    }

    /**
     * Sends five sample map messages to the queue, each NON_PERSISTENT, with its own
     * priority and a 10-minute time-to-live. All five messages are built before the
     * first one is sent. A {@link JMSException} is reported on stderr and aborts the
     * remaining work.
     */
    public void sendMapMessage() {
        try {
            Destination destination = this.session.createQueue(QUEUE_NAME);

            MapMessage msg1 = newPersonMessage("Higgin", "25", 100, "blue");
            MapMessage msg2 = newPersonMessage("Higgin2", "18", 10, "yellow");
            MapMessage msg3 = newPersonMessage("Zhansan", "28", 100, "red");
            MapMessage msg4 = newPersonMessage("Lisi", "33", 120, "blue");
            MapMessage msg5 = newPersonMessage("Wangwu", "35", 60, "blue");

            this.messageProducer.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 2, TIME_TO_LIVE_MS);
            this.messageProducer.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 6, TIME_TO_LIVE_MS);
            this.messageProducer.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 3, TIME_TO_LIVE_MS);
            this.messageProducer.send(destination, msg4, DeliveryMode.NON_PERSISTENT, 8, TIME_TO_LIVE_MS);
            this.messageProducer.send(destination, msg5, DeliveryMode.NON_PERSISTENT, 1, TIME_TO_LIVE_MS);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one sample text message to the queue (NON_PERSISTENT, priority 9,
     * 10-minute time-to-live). A {@link JMSException} is reported on stderr.
     */
    public void sendTextMessage() {
        try {
            Destination destination = this.session.createQueue(QUEUE_NAME);
            TextMessage textMessage = this.session.createTextMessage("text Message Ha Ha Ha");
            this.messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, 9, TIME_TO_LIVE_MS);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the broker connection. Closing a JMS connection also closes the sessions
     * and producers created from it, so they are not closed individually. Safe to call
     * when the connection was never opened; a failure while closing is reported on stderr.
     */
    public void close() {
        releaseConnection();
    }

    /** Builds one sample message: name/age go into the map body, money/color become properties. */
    private MapMessage newPersonMessage(String name, String age, int money, String color)
            throws JMSException {
        MapMessage message = this.session.createMapMessage();
        message.setString("name", name);
        message.setString("age", age);              // body entry: NOT visible to selectors
        message.setIntProperty("money", money);     // property: usable in selectors
        message.setStringProperty("color", color);  // property: usable in selectors
        return message;
    }

    /** Closes the connection if one was opened; private so the constructor can call it safely. */
    private void releaseConnection() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends the sample map messages and then releases the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Producer p = new Producer();
        try {
            p.sendMapMessage();
        } finally {
            p.close();
        }
    }

}

【消费者】

package test2.activemq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Demo JMS consumer that listens on the {@code bestQueue} queue of a local ActiveMQ
 * broker and only receives messages matching a JMS message selector
 * ({@link #SELECTOR_1} in this demo).
 *
 * <p>Call {@link #close()} to release the broker connection.
 */
public class Consumer {
    /**
     * A selector that does NOT match the demo messages: the producer stores {@code age}
     * in the map body via {@code setString()}, and only values set as message properties
     * (e.g. {@code setIntProperty} / {@code setStringProperty}) can satisfy a selector.
     */
    public static final String SELECTOR_0 = "age = 25";

    /** Matches messages whose {@code color} property is {@code blue}. */
    public static final String SELECTOR_1 = "color = 'blue'";

    /** Matches messages whose {@code color} property is {@code blue} and whose {@code money} property exceeds 100. */
    public static final String SELECTOR_2 = "color = 'blue' AND money > 100 ";

    /** Matches messages whose {@code receiver} property is {@code A}. */
    public static final String SELECTOR_3 = "receiver = 'A'";

    /** Broker endpoint used by this demo. */
    private static final String BROKER_URL = "tcp://localhost:61616";
    /** Name of the queue to consume from. */
    private static final String QUEUE_NAME = "bestQueue";

    // 1. Connection factory
    private ConnectionFactory connectionFactory;
    // 2. Connection
    private Connection connection;
    // 3. Session
    private Session session;
    // 4. Message consumer
    private MessageConsumer messageConsumer;
    // 5. Queue
    private Destination destination;

    /**
     * Connects to the broker, starts the connection, creates a non-transacted,
     * auto-acknowledge session and a consumer on the queue filtered by {@link #SELECTOR_1}.
     *
     * @throws IllegalStateException if any JMS object cannot be created; the original
     *         {@link JMSException} is attached as the cause
     */
    public Consumer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    BROKER_URL);
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue(QUEUE_NAME);
            // Besides the queue, a selector (filter) can be supplied when creating the consumer.
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR_1);
        } catch (JMSException e) {
            // Fail fast instead of leaving messageConsumer null, which would make
            // receiver() die with a NullPointerException; release whatever was opened.
            releaseConnection();
            throw new IllegalStateException("Unable to set up JMS consumer for " + BROKER_URL, e);
        }
    }

    /**
     * Registers a {@link Listener} on the consumer so matching messages are delivered
     * asynchronously. A {@link JMSException} is reported on stderr.
     */
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the broker connection. Closing a JMS connection also closes the sessions
     * and consumers created from it, so they are not closed individually. Safe to call
     * when the connection was never opened; a failure while closing is reported on stderr.
     */
    public void close() {
        releaseConnection();
    }

    /** Closes the connection if one was opened; private so the constructor can call it safely. */
    private void releaseConnection() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints every received {@link MapMessage}: first its {@code toString()} form, then its
     * {@code name} and {@code age} body entries and its {@code color} property.
     * Messages of any other type are ignored.
     */
    class Listener implements MessageListener {

        @Override
        public void onMessage(Message msg) {
            try {
                if (msg instanceof MapMessage) {
                    MapMessage result = (MapMessage) msg;
                    System.out.println(result.toString());
                    System.out.println("被消费的消息:" + result.getString("name") + "--" + result.getString("age") + "--" + result.getStringProperty("color"));
                }
            } catch (Exception e) {
                // Callback boundary: report any failure here rather than letting it
                // propagate into the JMS provider's delivery thread.
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the consumer and registers a shutdown hook that closes the broker
     * connection when the JVM terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Consumer c = new Consumer();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                c.close();
            }
        }));
        c.receiver();
    }
}

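【消费者SELECTOR_1的运行结果】

使用 SELECTOR_1(color = 'blue')时,只有 color 属性为 blue 的三条消息(name 分别为 Higgin、Lisi、Wangwu)会被消费,color 为 yellow 和 red 的两条消息不会被该消费者接收。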


原文链接:https://www.cnblogs.com/HigginCui/p/8469367.html