RabbitMQ之路由模式
作者:互联网
概念
简单来说就是控制消费者拿到特定条件的消息
比如一个情景:生产者生产日志消息 然后低级别的日志交给一号消费者处理 严重的交给二号消费者处理
简单例子
生产者代码和之前的订阅模式 区别在于交换机模式改为DIRECT
同时要给出routekey 即判断的标准
然后生成消息发送时需要给出routekey
public class RouteProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.198.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection("生产者");
//获取通道
channel = connection.createChannel();
//创建交换机以及两个队列 同时绑定关系
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,false,false,false,null);
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,false,false,false,null);
channel.queueDeclare(queue2Name,false,false,false,null);
//绑定关系 第三个参数为routingKey 绑定规则 fanout使用""
channel.queueBind(queue1Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//测试发送两条信息 给出对应的routeKey
String message = "error消息测试";
channel.basicPublish(exchangeName,"error",null,message.getBytes(StandardCharsets.UTF_8));
message = "warning消息测试";
channel.basicPublish(exchangeName,"warning",null,message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
finally {
//关闭通道
if(channel != null && channel.isOpen()){
try {
channel.close();
}
catch (Exception e){
e.printStackTrace();
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
}
catch (Exception e){
e.printStackTrace();
}
}
}
}
}
消费者这边写法没改变 队列名字换一下即可
public class RouteConsumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.198.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection("消费者");
//获取通道
channel = connection.createChannel();
//通过通道声明队列,创建交换机等一系列事情
channel.basicConsume("test_direct_queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("1号消费者接受到的消息为 " + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("收取消息失败");
}
});
//卡一下
System.out.println("键盘输入关闭消费者");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
finally {
//关闭通道
if(channel != null && channel.isOpen()){
try {
channel.close();
}
catch (Exception e){
e.printStackTrace();
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
}
catch (Exception e){
e.printStackTrace();
}
}
}
}
}
运行程序:
可以看到消费者只能拿到对应routekey规则的信息
标签:connectionFactory,false,String,channel,模式,connection,RabbitMQ,null,路由 来源: https://www.cnblogs.com/OfflineBoy/p/15366394.html