ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ActiveMQ消息选择器Selector

2022-01-16 14:04:47  阅读:370  来源: 互联网

标签:jms Selector javax connection session import null ActiveMQ 选择器


一、前言

  消息发送到Broker,消费者通过Destination可以订阅消费某个特定的通道内的消息。一些特殊情况下,需要消费者对消息过滤下再进行消费,也就是筛选出某些特定消息。ActiveMQ提供了SQL92表达式语法的自定义消息筛选功能。非常方便快捷的能够开发出具有消息筛选功能的应用。

  ActiveMQ 支持:

  1. 数字表达式: >,>=,<,<=,BETWEEN,=.
  2. 字符表达式:=,<>,IN.
  3. IS NULL 或则 IS NOT NULL.
  4. 逻辑AND, 逻辑OR, 逻辑NOT.

  常数类型:

  1. 数字:3.1415926, 5。
  2. 字符: ‘a’,必须带有单引号。
  3. NULL,特别的常量。
  4. 布尔类型: TRUEFALSE

二、程序案例

  生产者:

复制代码

package com.cfang.prebo.activemq.selector;

import java.util.Scanner;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageProducer producer = null;
        Message message = null;
        try {
            Scanner scanner = new Scanner(System.in);
            connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
            connection = connectionFactory.createConnection(null, null);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TP_Q_TEST_SELECTOR00");
            producer = session.createProducer(destination);
            while(true) {
                String line = scanner.nextLine();
                if("exit".equals(line)) {
                    break;
                }
                message = session.createTextMessage(line);
                message.setIntProperty("applicationName", line.length());
                message.setStringProperty("result", "RT");
                producer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(producer != null){ // 回收消息发送者
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session != null){ // 回收会话对象
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(connection != null){ // 回收连接对象
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

复制代码

  如上,生产者还可以设置更多的条件,ActiveMQ也提供了全基本类型的 setXXXXXProperty方法去设置条件。

  消费者:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

package com.cfang.prebo.activemq.selector;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerA {

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null;

        Connection connection = null;

        Session session = null;

        Destination destination = null;

        MessageConsumer consumer = null;

        try {

            connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");

            connection = connectionFactory.createConnection(nullnull);

            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            destination = session.createQueue("TP_Q_TEST_SELECTOR00");

            consumer = session.createConsumer(destination,"applicationName=2 and result='RT'");

            consumer.setMessageListener(new MessageListener() {

                public void onMessage(Message message) {

                    System.out.println(message);

                }

            });

        catch (Exception e) {

            e.printStackTrace();

        finally {

             

        }

    }

}

  如上,消费者就只消费  applicationName = 2 且  result = 'RT' 的消息。

三、小结

  1、提供了筛选功能,可以减少 destination 的数量。可以用于实现特定机器,特定消息(灰度?)。

  2、如果同时两个消费者的话,一个异常不能消费了,那么消息就会产生积压。对另一个正常的消费者而言,性能会下降,消费时间可能会变长。

标签:jms,Selector,javax,connection,session,import,null,ActiveMQ,选择器
来源: https://blog.csdn.net/qq_39459412/article/details/122522582

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有